/*
 * Multifd zstd compression implementation
 *
 * Copyright (c) 2020 Red Hat Inc
 *
 * Authors:
 *  Juan Quintela <quintela@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */
1287dc6f5fSJuan Quintela
1387dc6f5fSJuan Quintela #include "qemu/osdep.h"
1487dc6f5fSJuan Quintela #include <zstd.h>
1587dc6f5fSJuan Quintela #include "qemu/rcu.h"
16f5ff5487SJuan Quintela #include "exec/ramblock.h"
1787dc6f5fSJuan Quintela #include "exec/target_page.h"
1887dc6f5fSJuan Quintela #include "qapi/error.h"
1987dc6f5fSJuan Quintela #include "migration.h"
2087dc6f5fSJuan Quintela #include "trace.h"
211dfc4b9eSJuan Quintela #include "options.h"
2287dc6f5fSJuan Quintela #include "multifd.h"
2387dc6f5fSJuan Quintela
2487dc6f5fSJuan Quintela struct zstd_data {
2587dc6f5fSJuan Quintela /* stream for compression */
2687dc6f5fSJuan Quintela ZSTD_CStream *zcs;
2787dc6f5fSJuan Quintela /* stream for decompression */
2887dc6f5fSJuan Quintela ZSTD_DStream *zds;
2987dc6f5fSJuan Quintela /* buffers */
3087dc6f5fSJuan Quintela ZSTD_inBuffer in;
3187dc6f5fSJuan Quintela ZSTD_outBuffer out;
3287dc6f5fSJuan Quintela /* compressed buffer */
3387dc6f5fSJuan Quintela uint8_t *zbuff;
3487dc6f5fSJuan Quintela /* size of compressed buffer */
3587dc6f5fSJuan Quintela uint32_t zbuff_len;
3687dc6f5fSJuan Quintela };
3787dc6f5fSJuan Quintela
3887dc6f5fSJuan Quintela /* Multifd zstd compression */
3987dc6f5fSJuan Quintela
multifd_zstd_send_setup(MultiFDSendParams * p,Error ** errp)406f848dacSFabiano Rosas static int multifd_zstd_send_setup(MultiFDSendParams *p, Error **errp)
4187dc6f5fSJuan Quintela {
4287dc6f5fSJuan Quintela struct zstd_data *z = g_new0(struct zstd_data, 1);
4387dc6f5fSJuan Quintela int res;
4487dc6f5fSJuan Quintela
4587dc6f5fSJuan Quintela z->zcs = ZSTD_createCStream();
4687dc6f5fSJuan Quintela if (!z->zcs) {
4787dc6f5fSJuan Quintela g_free(z);
4804e11404SJuan Quintela error_setg(errp, "multifd %u: zstd createCStream failed", p->id);
4987dc6f5fSJuan Quintela return -1;
5087dc6f5fSJuan Quintela }
5187dc6f5fSJuan Quintela
5287dc6f5fSJuan Quintela res = ZSTD_initCStream(z->zcs, migrate_multifd_zstd_level());
5387dc6f5fSJuan Quintela if (ZSTD_isError(res)) {
5487dc6f5fSJuan Quintela ZSTD_freeCStream(z->zcs);
5587dc6f5fSJuan Quintela g_free(z);
5604e11404SJuan Quintela error_setg(errp, "multifd %u: initCStream failed with error %s",
5787dc6f5fSJuan Quintela p->id, ZSTD_getErrorName(res));
5887dc6f5fSJuan Quintela return -1;
5987dc6f5fSJuan Quintela }
60d8b71d96SMichael Tokarev /* This is the maximum size of the compressed buffer */
61fc670522SJuan Quintela z->zbuff_len = ZSTD_compressBound(MULTIFD_PACKET_SIZE);
6287dc6f5fSJuan Quintela z->zbuff = g_try_malloc(z->zbuff_len);
6387dc6f5fSJuan Quintela if (!z->zbuff) {
6487dc6f5fSJuan Quintela ZSTD_freeCStream(z->zcs);
6587dc6f5fSJuan Quintela g_free(z);
6604e11404SJuan Quintela error_setg(errp, "multifd %u: out of memory for zbuff", p->id);
6787dc6f5fSJuan Quintela return -1;
6887dc6f5fSJuan Quintela }
69d9d3e4f2SYuan Liu p->compress_data = z;
70d9d3e4f2SYuan Liu
71d9d3e4f2SYuan Liu /* Needs 2 IOVs, one for packet header and one for compressed data */
72d9d3e4f2SYuan Liu p->iov = g_new0(struct iovec, 2);
7387dc6f5fSJuan Quintela return 0;
7487dc6f5fSJuan Quintela }
7587dc6f5fSJuan Quintela
multifd_zstd_send_cleanup(MultiFDSendParams * p,Error ** errp)766f848dacSFabiano Rosas static void multifd_zstd_send_cleanup(MultiFDSendParams *p, Error **errp)
7787dc6f5fSJuan Quintela {
78402dd7acSFabiano Rosas struct zstd_data *z = p->compress_data;
7987dc6f5fSJuan Quintela
8087dc6f5fSJuan Quintela ZSTD_freeCStream(z->zcs);
8187dc6f5fSJuan Quintela z->zcs = NULL;
8287dc6f5fSJuan Quintela g_free(z->zbuff);
8387dc6f5fSJuan Quintela z->zbuff = NULL;
84402dd7acSFabiano Rosas g_free(p->compress_data);
85402dd7acSFabiano Rosas p->compress_data = NULL;
86d9d3e4f2SYuan Liu
87d9d3e4f2SYuan Liu g_free(p->iov);
88d9d3e4f2SYuan Liu p->iov = NULL;
8987dc6f5fSJuan Quintela }
9087dc6f5fSJuan Quintela
multifd_zstd_send_prepare(MultiFDSendParams * p,Error ** errp)916f848dacSFabiano Rosas static int multifd_zstd_send_prepare(MultiFDSendParams *p, Error **errp)
9287dc6f5fSJuan Quintela {
939f0e1089SFabiano Rosas MultiFDPages_t *pages = &p->data->u.ram;
94402dd7acSFabiano Rosas struct zstd_data *z = p->compress_data;
9587dc6f5fSJuan Quintela int ret;
9687dc6f5fSJuan Quintela uint32_t i;
9787dc6f5fSJuan Quintela
98303e6f54SHao Xiang if (!multifd_send_prepare_common(p)) {
99303e6f54SHao Xiang goto out;
100303e6f54SHao Xiang }
10125a1f878SPeter Xu
10287dc6f5fSJuan Quintela z->out.dst = z->zbuff;
10387dc6f5fSJuan Quintela z->out.size = z->zbuff_len;
10487dc6f5fSJuan Quintela z->out.pos = 0;
10587dc6f5fSJuan Quintela
106303e6f54SHao Xiang for (i = 0; i < pages->normal_num; i++) {
10787dc6f5fSJuan Quintela ZSTD_EndDirective flush = ZSTD_e_continue;
10887dc6f5fSJuan Quintela
109303e6f54SHao Xiang if (i == pages->normal_num - 1) {
11087dc6f5fSJuan Quintela flush = ZSTD_e_flush;
11187dc6f5fSJuan Quintela }
112bc112a6cSFabiano Rosas z->in.src = pages->block->host + pages->offset[i];
11390fa121cSFabiano Rosas z->in.size = multifd_ram_page_size();
11487dc6f5fSJuan Quintela z->in.pos = 0;
11587dc6f5fSJuan Quintela
11687dc6f5fSJuan Quintela /*
11787dc6f5fSJuan Quintela * Welcome to compressStream2 semantics
11887dc6f5fSJuan Quintela *
11987dc6f5fSJuan Quintela * We need to loop while:
12087dc6f5fSJuan Quintela * - return is > 0
12187dc6f5fSJuan Quintela * - there is input available
12287dc6f5fSJuan Quintela * - there is output space free
12387dc6f5fSJuan Quintela */
12487dc6f5fSJuan Quintela do {
12587dc6f5fSJuan Quintela ret = ZSTD_compressStream2(z->zcs, &z->out, &z->in, flush);
126*cb0ed522SStefan Weil } while (ret > 0 && (z->in.size > z->in.pos)
127*cb0ed522SStefan Weil && (z->out.size > z->out.pos));
128*cb0ed522SStefan Weil if (ret > 0 && (z->in.size > z->in.pos)) {
12904e11404SJuan Quintela error_setg(errp, "multifd %u: compressStream buffer too small",
13087dc6f5fSJuan Quintela p->id);
13187dc6f5fSJuan Quintela return -1;
13287dc6f5fSJuan Quintela }
13387dc6f5fSJuan Quintela if (ZSTD_isError(ret)) {
13404e11404SJuan Quintela error_setg(errp, "multifd %u: compressStream error %s",
13587dc6f5fSJuan Quintela p->id, ZSTD_getErrorName(ret));
13687dc6f5fSJuan Quintela return -1;
13787dc6f5fSJuan Quintela }
13887dc6f5fSJuan Quintela }
1390a818b89SJuan Quintela p->iov[p->iovs_num].iov_base = z->zbuff;
1400a818b89SJuan Quintela p->iov[p->iovs_num].iov_len = z->out.pos;
1410a818b89SJuan Quintela p->iovs_num++;
14287dc6f5fSJuan Quintela p->next_packet_size = z->out.pos;
143303e6f54SHao Xiang
144303e6f54SHao Xiang out:
14587dc6f5fSJuan Quintela p->flags |= MULTIFD_FLAG_ZSTD;
14625a1f878SPeter Xu multifd_send_fill_packet(p);
14787dc6f5fSJuan Quintela return 0;
14887dc6f5fSJuan Quintela }
14987dc6f5fSJuan Quintela
multifd_zstd_recv_setup(MultiFDRecvParams * p,Error ** errp)1506f848dacSFabiano Rosas static int multifd_zstd_recv_setup(MultiFDRecvParams *p, Error **errp)
15187dc6f5fSJuan Quintela {
15287dc6f5fSJuan Quintela struct zstd_data *z = g_new0(struct zstd_data, 1);
15387dc6f5fSJuan Quintela int ret;
15487dc6f5fSJuan Quintela
155402dd7acSFabiano Rosas p->compress_data = z;
15687dc6f5fSJuan Quintela z->zds = ZSTD_createDStream();
15787dc6f5fSJuan Quintela if (!z->zds) {
15887dc6f5fSJuan Quintela g_free(z);
15904e11404SJuan Quintela error_setg(errp, "multifd %u: zstd createDStream failed", p->id);
16087dc6f5fSJuan Quintela return -1;
16187dc6f5fSJuan Quintela }
16287dc6f5fSJuan Quintela
16387dc6f5fSJuan Quintela ret = ZSTD_initDStream(z->zds);
16487dc6f5fSJuan Quintela if (ZSTD_isError(ret)) {
16587dc6f5fSJuan Quintela ZSTD_freeDStream(z->zds);
16687dc6f5fSJuan Quintela g_free(z);
16704e11404SJuan Quintela error_setg(errp, "multifd %u: initDStream failed with error %s",
16887dc6f5fSJuan Quintela p->id, ZSTD_getErrorName(ret));
16987dc6f5fSJuan Quintela return -1;
17087dc6f5fSJuan Quintela }
17187dc6f5fSJuan Quintela
17247a17824SJuan Quintela /* To be safe, we reserve twice the size of the packet */
17347a17824SJuan Quintela z->zbuff_len = MULTIFD_PACKET_SIZE * 2;
17487dc6f5fSJuan Quintela z->zbuff = g_try_malloc(z->zbuff_len);
17587dc6f5fSJuan Quintela if (!z->zbuff) {
17687dc6f5fSJuan Quintela ZSTD_freeDStream(z->zds);
17787dc6f5fSJuan Quintela g_free(z);
17804e11404SJuan Quintela error_setg(errp, "multifd %u: out of memory for zbuff", p->id);
17987dc6f5fSJuan Quintela return -1;
18087dc6f5fSJuan Quintela }
18187dc6f5fSJuan Quintela return 0;
18287dc6f5fSJuan Quintela }
18387dc6f5fSJuan Quintela
/**
 * multifd_zstd_recv_cleanup: release the zstd state of a receive channel
 *
 * Frees the decompression stream, the compressed-data buffer and the
 * state structure itself, then clears p->compress_data.
 *
 * Safe to call when the channel has no zstd state (p->compress_data is
 * NULL); in that case nothing is done.
 *
 * @p: Params for the channel being cleaned up
 */
static void multifd_zstd_recv_cleanup(MultiFDRecvParams *p)
{
    struct zstd_data *z = p->compress_data;

    if (!z) {
        return;
    }

    ZSTD_freeDStream(z->zds);
    g_free(z->zbuff);
    g_free(z);
    p->compress_data = NULL;
}
19587dc6f5fSJuan Quintela
multifd_zstd_recv(MultiFDRecvParams * p,Error ** errp)1966f848dacSFabiano Rosas static int multifd_zstd_recv(MultiFDRecvParams *p, Error **errp)
19787dc6f5fSJuan Quintela {
19887dc6f5fSJuan Quintela uint32_t in_size = p->next_packet_size;
19987dc6f5fSJuan Quintela uint32_t out_size = 0;
20090fa121cSFabiano Rosas uint32_t page_size = multifd_ram_page_size();
20190fa121cSFabiano Rosas uint32_t expected_size = p->normal_num * page_size;
20287dc6f5fSJuan Quintela uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
203402dd7acSFabiano Rosas struct zstd_data *z = p->compress_data;
20487dc6f5fSJuan Quintela int ret;
20587dc6f5fSJuan Quintela int i;
20687dc6f5fSJuan Quintela
20787dc6f5fSJuan Quintela if (flags != MULTIFD_FLAG_ZSTD) {
20804e11404SJuan Quintela error_setg(errp, "multifd %u: flags received %x flags expected %x",
20987dc6f5fSJuan Quintela p->id, flags, MULTIFD_FLAG_ZSTD);
21087dc6f5fSJuan Quintela return -1;
21187dc6f5fSJuan Quintela }
212303e6f54SHao Xiang
213303e6f54SHao Xiang multifd_recv_zero_page_process(p);
214303e6f54SHao Xiang
215303e6f54SHao Xiang if (!p->normal_num) {
216303e6f54SHao Xiang assert(in_size == 0);
217303e6f54SHao Xiang return 0;
218303e6f54SHao Xiang }
219303e6f54SHao Xiang
22087dc6f5fSJuan Quintela ret = qio_channel_read_all(p->c, (void *)z->zbuff, in_size, errp);
22187dc6f5fSJuan Quintela
22287dc6f5fSJuan Quintela if (ret != 0) {
22387dc6f5fSJuan Quintela return ret;
22487dc6f5fSJuan Quintela }
22587dc6f5fSJuan Quintela
22687dc6f5fSJuan Quintela z->in.src = z->zbuff;
22787dc6f5fSJuan Quintela z->in.size = in_size;
22887dc6f5fSJuan Quintela z->in.pos = 0;
22987dc6f5fSJuan Quintela
230cf2d4aa8SJuan Quintela for (i = 0; i < p->normal_num; i++) {
2315ef7e26bSYuan Liu ramblock_recv_bitmap_set_offset(p->block, p->normal[i]);
232faf60935SJuan Quintela z->out.dst = p->host + p->normal[i];
23390fa121cSFabiano Rosas z->out.size = page_size;
23487dc6f5fSJuan Quintela z->out.pos = 0;
23587dc6f5fSJuan Quintela
23687dc6f5fSJuan Quintela /*
23787dc6f5fSJuan Quintela * Welcome to decompressStream semantics
23887dc6f5fSJuan Quintela *
23987dc6f5fSJuan Quintela * We need to loop while:
24087dc6f5fSJuan Quintela * - return is > 0
24187dc6f5fSJuan Quintela * - there is input available
24287dc6f5fSJuan Quintela * - we haven't put out a full page
24387dc6f5fSJuan Quintela */
24487dc6f5fSJuan Quintela do {
24587dc6f5fSJuan Quintela ret = ZSTD_decompressStream(z->zds, &z->out, &z->in);
246*cb0ed522SStefan Weil } while (ret > 0 && (z->in.size > z->in.pos)
24790fa121cSFabiano Rosas && (z->out.pos < page_size));
24890fa121cSFabiano Rosas if (ret > 0 && (z->out.pos < page_size)) {
24904e11404SJuan Quintela error_setg(errp, "multifd %u: decompressStream buffer too small",
25087dc6f5fSJuan Quintela p->id);
25187dc6f5fSJuan Quintela return -1;
25287dc6f5fSJuan Quintela }
25387dc6f5fSJuan Quintela if (ZSTD_isError(ret)) {
25404e11404SJuan Quintela error_setg(errp, "multifd %u: decompressStream returned %s",
25587dc6f5fSJuan Quintela p->id, ZSTD_getErrorName(ret));
25687dc6f5fSJuan Quintela return ret;
25787dc6f5fSJuan Quintela }
25887dc6f5fSJuan Quintela out_size += z->out.pos;
25987dc6f5fSJuan Quintela }
26087dc6f5fSJuan Quintela if (out_size != expected_size) {
26104e11404SJuan Quintela error_setg(errp, "multifd %u: packet size received %u size expected %u",
26287dc6f5fSJuan Quintela p->id, out_size, expected_size);
26387dc6f5fSJuan Quintela return -1;
26487dc6f5fSJuan Quintela }
26587dc6f5fSJuan Quintela return 0;
26687dc6f5fSJuan Quintela }
26787dc6f5fSJuan Quintela
268308d165cSFabiano Rosas static const MultiFDMethods multifd_zstd_ops = {
2696f848dacSFabiano Rosas .send_setup = multifd_zstd_send_setup,
2706f848dacSFabiano Rosas .send_cleanup = multifd_zstd_send_cleanup,
2716f848dacSFabiano Rosas .send_prepare = multifd_zstd_send_prepare,
2726f848dacSFabiano Rosas .recv_setup = multifd_zstd_recv_setup,
2736f848dacSFabiano Rosas .recv_cleanup = multifd_zstd_recv_cleanup,
2746f848dacSFabiano Rosas .recv = multifd_zstd_recv
27587dc6f5fSJuan Quintela };
27687dc6f5fSJuan Quintela
multifd_zstd_register(void)27787dc6f5fSJuan Quintela static void multifd_zstd_register(void)
27887dc6f5fSJuan Quintela {
27987dc6f5fSJuan Quintela multifd_register_ops(MULTIFD_COMPRESSION_ZSTD, &multifd_zstd_ops);
28087dc6f5fSJuan Quintela }
28187dc6f5fSJuan Quintela
28287dc6f5fSJuan Quintela migration_init(multifd_zstd_register);
283