1 /*
2 * Multifd QATzip compression implementation
3 *
4 * Copyright (c) Bytedance
5 *
6 * Authors:
7 * Bryan Zhang <bryan.zhang@bytedance.com>
8 * Hao Xiang <hao.xiang@bytedance.com>
9 * Yichen Wang <yichen.wang@bytedance.com>
10 *
11 * This work is licensed under the terms of the GNU GPL, version 2 or later.
12 * See the COPYING file in the top-level directory.
13 */
14
15 #include "qemu/osdep.h"
16 #include "exec/ramblock.h"
17 #include "qapi/error.h"
18 #include "qemu/error-report.h"
19 #include "qapi/qapi-types-migration.h"
20 #include "options.h"
21 #include "multifd.h"
22 #include <qatzip.h>
23
24 typedef struct {
25 /*
26 * Unique session for use with QATzip API
27 */
28 QzSession_T sess;
29
30 /*
31 * For compression: Buffer for pages to compress
32 * For decompression: Buffer for data to decompress
33 */
34 uint8_t *in_buf;
35 uint32_t in_len;
36
37 /*
38 * For compression: Output buffer of compressed data
39 * For decompression: Output buffer of decompressed data
40 */
41 uint8_t *out_buf;
42 uint32_t out_len;
43 } QatzipData;
44
45 /**
46 * qatzip_send_setup: Set up QATzip session and private buffers.
47 *
48 * @param p Multifd channel params
49 * @param errp Pointer to error, which will be set in case of error
50 * @return 0 on success, -1 on error (and *errp will be set)
51 */
qatzip_send_setup(MultiFDSendParams * p,Error ** errp)52 static int qatzip_send_setup(MultiFDSendParams *p, Error **errp)
53 {
54 QatzipData *q;
55 QzSessionParamsDeflate_T params;
56 const char *err_msg;
57 int ret;
58
59 q = g_new0(QatzipData, 1);
60 p->compress_data = q;
61 /* We need one extra place for the packet header */
62 p->iov = g_new0(struct iovec, 2);
63
64 /*
65 * Initialize QAT device with software fallback by default. This allows
66 * QATzip to use CPU path when QAT hardware reaches maximum throughput.
67 */
68 ret = qzInit(&q->sess, true);
69 if (ret != QZ_OK && ret != QZ_DUPLICATE) {
70 err_msg = "qzInit failed";
71 goto err;
72 }
73
74 ret = qzGetDefaultsDeflate(¶ms);
75 if (ret != QZ_OK) {
76 err_msg = "qzGetDefaultsDeflate failed";
77 goto err;
78 }
79
80 /* Make sure to use configured QATzip compression level. */
81 params.common_params.comp_lvl = migrate_multifd_qatzip_level();
82 ret = qzSetupSessionDeflate(&q->sess, ¶ms);
83 if (ret != QZ_OK && ret != QZ_DUPLICATE) {
84 err_msg = "qzSetupSessionDeflate failed";
85 goto err;
86 }
87
88 if (MULTIFD_PACKET_SIZE > UINT32_MAX) {
89 err_msg = "packet size too large for QAT";
90 goto err;
91 }
92
93 q->in_len = MULTIFD_PACKET_SIZE;
94 /*
95 * PINNED_MEM is an enum from qatzip headers, which means to use
96 * kzalloc_node() to allocate memory for QAT DMA purposes. When QAT device
97 * is not available or software fallback is used, the malloc flag needs to
98 * be set as COMMON_MEM.
99 */
100 q->in_buf = qzMalloc(q->in_len, 0, PINNED_MEM);
101 if (!q->in_buf) {
102 q->in_buf = qzMalloc(q->in_len, 0, COMMON_MEM);
103 if (!q->in_buf) {
104 err_msg = "qzMalloc failed";
105 goto err;
106 }
107 }
108
109 q->out_len = qzMaxCompressedLength(MULTIFD_PACKET_SIZE, &q->sess);
110 q->out_buf = qzMalloc(q->out_len, 0, PINNED_MEM);
111 if (!q->out_buf) {
112 q->out_buf = qzMalloc(q->out_len, 0, COMMON_MEM);
113 if (!q->out_buf) {
114 err_msg = "qzMalloc failed";
115 goto err;
116 }
117 }
118
119 return 0;
120
121 err:
122 error_setg(errp, "multifd %u: [sender] %s", p->id, err_msg);
123 return -1;
124 }
125
126 /**
127 * qatzip_send_cleanup: Tear down QATzip session and release private buffers.
128 *
129 * @param p Multifd channel params
130 * @param errp Pointer to error, which will be set in case of error
131 * @return None
132 */
qatzip_send_cleanup(MultiFDSendParams * p,Error ** errp)133 static void qatzip_send_cleanup(MultiFDSendParams *p, Error **errp)
134 {
135 QatzipData *q = p->compress_data;
136
137 if (q) {
138 if (q->in_buf) {
139 qzFree(q->in_buf);
140 }
141 if (q->out_buf) {
142 qzFree(q->out_buf);
143 }
144 (void)qzTeardownSession(&q->sess);
145 (void)qzClose(&q->sess);
146 g_free(q);
147 }
148
149 g_free(p->iov);
150 p->iov = NULL;
151 p->compress_data = NULL;
152 }
153
154 /**
155 * qatzip_send_prepare: Compress pages and update IO channel info.
156 *
157 * @param p Multifd channel params
158 * @param errp Pointer to error, which will be set in case of error
159 * @return 0 on success, -1 on error (and *errp will be set)
160 */
qatzip_send_prepare(MultiFDSendParams * p,Error ** errp)161 static int qatzip_send_prepare(MultiFDSendParams *p, Error **errp)
162 {
163 uint32_t page_size = multifd_ram_page_size();
164 MultiFDPages_t *pages = &p->data->u.ram;
165 QatzipData *q = p->compress_data;
166 int ret;
167 unsigned int in_len, out_len;
168
169 if (!multifd_send_prepare_common(p)) {
170 goto out;
171 }
172
173 /*
174 * Unlike other multifd compression implementations, we use a non-streaming
175 * API and place all the data into one buffer, rather than sending each
176 * page to the compression API at a time. Based on initial benchmarks, the
177 * non-streaming API outperforms the streaming API. Plus, the logic in QEMU
178 * is friendly to using the non-streaming API anyway. If either of these
179 * statements becomes no longer true, we can revisit adding a streaming
180 * implementation.
181 */
182 for (int i = 0; i < pages->normal_num; i++) {
183 memcpy(q->in_buf + (i * page_size),
184 pages->block->host + pages->offset[i],
185 page_size);
186 }
187
188 in_len = pages->normal_num * page_size;
189 if (in_len > q->in_len) {
190 error_setg(errp, "multifd %u: unexpectedly large input", p->id);
191 return -1;
192 }
193 out_len = q->out_len;
194
195 ret = qzCompress(&q->sess, q->in_buf, &in_len, q->out_buf, &out_len, 1);
196 if (ret != QZ_OK) {
197 error_setg(errp, "multifd %u: QATzip returned %d instead of QZ_OK",
198 p->id, ret);
199 return -1;
200 }
201 if (in_len != pages->normal_num * page_size) {
202 error_setg(errp, "multifd %u: QATzip failed to compress all input",
203 p->id);
204 return -1;
205 }
206
207 p->iov[p->iovs_num].iov_base = q->out_buf;
208 p->iov[p->iovs_num].iov_len = out_len;
209 p->iovs_num++;
210 p->next_packet_size = out_len;
211
212 out:
213 p->flags |= MULTIFD_FLAG_QATZIP;
214 multifd_send_fill_packet(p);
215 return 0;
216 }
217
218 /**
219 * qatzip_recv_setup: Set up QATzip session and allocate private buffers.
220 *
221 * @param p Multifd channel params
222 * @param errp Pointer to error, which will be set in case of error
223 * @return 0 on success, -1 on error (and *errp will be set)
224 */
qatzip_recv_setup(MultiFDRecvParams * p,Error ** errp)225 static int qatzip_recv_setup(MultiFDRecvParams *p, Error **errp)
226 {
227 QatzipData *q;
228 QzSessionParamsDeflate_T params;
229 const char *err_msg;
230 int ret;
231
232 q = g_new0(QatzipData, 1);
233 p->compress_data = q;
234
235 /*
236 * Initialize QAT device with software fallback by default. This allows
237 * QATzip to use CPU path when QAT hardware reaches maximum throughput.
238 */
239 ret = qzInit(&q->sess, true);
240 if (ret != QZ_OK && ret != QZ_DUPLICATE) {
241 err_msg = "qzInit failed";
242 goto err;
243 }
244
245 ret = qzGetDefaultsDeflate(¶ms);
246 if (ret != QZ_OK) {
247 err_msg = "qzGetDefaultsDeflate failed";
248 goto err;
249 }
250
251 ret = qzSetupSessionDeflate(&q->sess, ¶ms);
252 if (ret != QZ_OK && ret != QZ_DUPLICATE) {
253 err_msg = "qzSetupSessionDeflate failed";
254 goto err;
255 }
256
257 /*
258 * Reserve extra spaces for the incoming packets. Current implementation
259 * doesn't send uncompressed pages in case the compression gets too big.
260 */
261 q->in_len = MULTIFD_PACKET_SIZE * 2;
262 /*
263 * PINNED_MEM is an enum from qatzip headers, which means to use
264 * kzalloc_node() to allocate memory for QAT DMA purposes. When QAT device
265 * is not available or software fallback is used, the malloc flag needs to
266 * be set as COMMON_MEM.
267 */
268 q->in_buf = qzMalloc(q->in_len, 0, PINNED_MEM);
269 if (!q->in_buf) {
270 q->in_buf = qzMalloc(q->in_len, 0, COMMON_MEM);
271 if (!q->in_buf) {
272 err_msg = "qzMalloc failed";
273 goto err;
274 }
275 }
276
277 q->out_len = MULTIFD_PACKET_SIZE;
278 q->out_buf = qzMalloc(q->out_len, 0, PINNED_MEM);
279 if (!q->out_buf) {
280 q->out_buf = qzMalloc(q->out_len, 0, COMMON_MEM);
281 if (!q->out_buf) {
282 err_msg = "qzMalloc failed";
283 goto err;
284 }
285 }
286
287 return 0;
288
289 err:
290 error_setg(errp, "multifd %u: [receiver] %s", p->id, err_msg);
291 return -1;
292 }
293
294 /**
295 * qatzip_recv_cleanup: Tear down QATzip session and release private buffers.
296 *
297 * @param p Multifd channel params
298 * @return None
299 */
qatzip_recv_cleanup(MultiFDRecvParams * p)300 static void qatzip_recv_cleanup(MultiFDRecvParams *p)
301 {
302 QatzipData *q = p->compress_data;
303
304 if (q) {
305 if (q->in_buf) {
306 qzFree(q->in_buf);
307 }
308 if (q->out_buf) {
309 qzFree(q->out_buf);
310 }
311 (void)qzTeardownSession(&q->sess);
312 (void)qzClose(&q->sess);
313 g_free(q);
314 }
315 p->compress_data = NULL;
316 }
317
318
319 /**
320 * qatzip_recv: Decompress pages and copy them to the appropriate
321 * locations.
322 *
323 * @param p Multifd channel params
324 * @param errp Pointer to error, which will be set in case of error
325 * @return 0 on success, -1 on error (and *errp will be set)
326 */
qatzip_recv(MultiFDRecvParams * p,Error ** errp)327 static int qatzip_recv(MultiFDRecvParams *p, Error **errp)
328 {
329 QatzipData *q = p->compress_data;
330 int ret;
331 unsigned int in_len, out_len;
332 uint32_t in_size = p->next_packet_size;
333 uint32_t page_size = multifd_ram_page_size();
334 uint32_t expected_size = p->normal_num * page_size;
335 uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
336
337 if (in_size > q->in_len) {
338 error_setg(errp, "multifd %u: received unexpectedly large packet",
339 p->id);
340 return -1;
341 }
342
343 if (flags != MULTIFD_FLAG_QATZIP) {
344 error_setg(errp, "multifd %u: flags received %x flags expected %x",
345 p->id, flags, MULTIFD_FLAG_QATZIP);
346 return -1;
347 }
348
349 multifd_recv_zero_page_process(p);
350 if (!p->normal_num) {
351 assert(in_size == 0);
352 return 0;
353 }
354
355 ret = qio_channel_read_all(p->c, (void *)q->in_buf, in_size, errp);
356 if (ret != 0) {
357 return ret;
358 }
359
360 in_len = in_size;
361 out_len = q->out_len;
362 ret = qzDecompress(&q->sess, q->in_buf, &in_len, q->out_buf, &out_len);
363 if (ret != QZ_OK) {
364 error_setg(errp, "multifd %u: qzDecompress failed", p->id);
365 return -1;
366 }
367 if (out_len != expected_size) {
368 error_setg(errp, "multifd %u: packet size received %u size expected %u",
369 p->id, out_len, expected_size);
370 return -1;
371 }
372
373 /* Copy each page to its appropriate location. */
374 for (int i = 0; i < p->normal_num; i++) {
375 memcpy(p->host + p->normal[i], q->out_buf + page_size * i, page_size);
376 }
377 return 0;
378 }
379
380 static MultiFDMethods multifd_qatzip_ops = {
381 .send_setup = qatzip_send_setup,
382 .send_cleanup = qatzip_send_cleanup,
383 .send_prepare = qatzip_send_prepare,
384 .recv_setup = qatzip_recv_setup,
385 .recv_cleanup = qatzip_recv_cleanup,
386 .recv = qatzip_recv
387 };
388
multifd_qatzip_register(void)389 static void multifd_qatzip_register(void)
390 {
391 multifd_register_ops(MULTIFD_COMPRESSION_QATZIP, &multifd_qatzip_ops);
392 }
393
394 migration_init(multifd_qatzip_register);
395