1 /* 2 * Multifd zlib compression implementation 3 * 4 * Copyright (c) 2020 Red Hat Inc 5 * 6 * Authors: 7 * Juan Quintela <quintela@redhat.com> 8 * 9 * This work is licensed under the terms of the GNU GPL, version 2 or later. 10 * See the COPYING file in the top-level directory. 11 */ 12 13 #include "qemu/osdep.h" 14 #include <zstd.h> 15 #include "qemu/rcu.h" 16 #include "exec/ramblock.h" 17 #include "exec/target_page.h" 18 #include "qapi/error.h" 19 #include "migration.h" 20 #include "trace.h" 21 #include "options.h" 22 #include "multifd.h" 23 24 struct zstd_data { 25 /* stream for compression */ 26 ZSTD_CStream *zcs; 27 /* stream for decompression */ 28 ZSTD_DStream *zds; 29 /* buffers */ 30 ZSTD_inBuffer in; 31 ZSTD_outBuffer out; 32 /* compressed buffer */ 33 uint8_t *zbuff; 34 /* size of compressed buffer */ 35 uint32_t zbuff_len; 36 }; 37 38 /* Multifd zstd compression */ 39 40 static int multifd_zstd_send_setup(MultiFDSendParams *p, Error **errp) 41 { 42 struct zstd_data *z = g_new0(struct zstd_data, 1); 43 int res; 44 45 z->zcs = ZSTD_createCStream(); 46 if (!z->zcs) { 47 g_free(z); 48 error_setg(errp, "multifd %u: zstd createCStream failed", p->id); 49 return -1; 50 } 51 52 res = ZSTD_initCStream(z->zcs, migrate_multifd_zstd_level()); 53 if (ZSTD_isError(res)) { 54 ZSTD_freeCStream(z->zcs); 55 g_free(z); 56 error_setg(errp, "multifd %u: initCStream failed with error %s", 57 p->id, ZSTD_getErrorName(res)); 58 return -1; 59 } 60 /* This is the maximum size of the compressed buffer */ 61 z->zbuff_len = ZSTD_compressBound(MULTIFD_PACKET_SIZE); 62 z->zbuff = g_try_malloc(z->zbuff_len); 63 if (!z->zbuff) { 64 ZSTD_freeCStream(z->zcs); 65 g_free(z); 66 error_setg(errp, "multifd %u: out of memory for zbuff", p->id); 67 return -1; 68 } 69 p->compress_data = z; 70 71 /* Needs 2 IOVs, one for packet header and one for compressed data */ 72 p->iov = g_new0(struct iovec, 2); 73 return 0; 74 } 75 76 static void multifd_zstd_send_cleanup(MultiFDSendParams *p, Error **errp) 77 { 78 struct zstd_data *z = p->compress_data; 79 80 ZSTD_freeCStream(z->zcs); 81 z->zcs = NULL; 82 g_free(z->zbuff); 83 z->zbuff = NULL; 84 g_free(p->compress_data); 85 p->compress_data = NULL; 86 87 g_free(p->iov); 88 p->iov = NULL; 89 } 90 91 static int multifd_zstd_send_prepare(MultiFDSendParams *p, Error **errp) 92 { 93 MultiFDPages_t *pages = &p->data->u.ram; 94 struct zstd_data *z = p->compress_data; 95 int ret; 96 uint32_t i; 97 98 if (!multifd_send_prepare_common(p)) { 99 goto out; 100 } 101 102 z->out.dst = z->zbuff; 103 z->out.size = z->zbuff_len; 104 z->out.pos = 0; 105 106 for (i = 0; i < pages->normal_num; i++) { 107 ZSTD_EndDirective flush = ZSTD_e_continue; 108 109 if (i == pages->normal_num - 1) { 110 flush = ZSTD_e_flush; 111 } 112 z->in.src = pages->block->host + pages->offset[i]; 113 z->in.size = multifd_ram_page_size(); 114 z->in.pos = 0; 115 116 /* 117 * Welcome to compressStream2 semantics 118 * 119 * We need to loop while: 120 * - return is > 0 121 * - there is input available 122 * - there is output space free 123 */ 124 do { 125 ret = ZSTD_compressStream2(z->zcs, &z->out, &z->in, flush); 126 } while (ret > 0 && (z->in.size - z->in.pos > 0) 127 && (z->out.size - z->out.pos > 0)); 128 if (ret > 0 && (z->in.size - z->in.pos > 0)) { 129 error_setg(errp, "multifd %u: compressStream buffer too small", 130 p->id); 131 return -1; 132 } 133 if (ZSTD_isError(ret)) { 134 error_setg(errp, "multifd %u: compressStream error %s", 135 p->id, ZSTD_getErrorName(ret)); 136 return -1; 137 } 138 } 139 p->iov[p->iovs_num].iov_base = z->zbuff; 140 p->iov[p->iovs_num].iov_len = z->out.pos; 141 p->iovs_num++; 142 p->next_packet_size = z->out.pos; 143 144 out: 145 p->flags |= MULTIFD_FLAG_ZSTD; 146 multifd_send_fill_packet(p); 147 return 0; 148 } 149 150 static int multifd_zstd_recv_setup(MultiFDRecvParams *p, Error **errp) 151 { 152 struct zstd_data *z = g_new0(struct zstd_data, 1); 153 int ret; 154 155 p->compress_data = z; 156 z->zds = ZSTD_createDStream(); 157 if (!z->zds) { 158 g_free(z); 159 error_setg(errp, "multifd %u: zstd createDStream failed", p->id); 160 return -1; 161 } 162 163 ret = ZSTD_initDStream(z->zds); 164 if (ZSTD_isError(ret)) { 165 ZSTD_freeDStream(z->zds); 166 g_free(z); 167 error_setg(errp, "multifd %u: initDStream failed with error %s", 168 p->id, ZSTD_getErrorName(ret)); 169 return -1; 170 } 171 172 /* To be safe, we reserve twice the size of the packet */ 173 z->zbuff_len = MULTIFD_PACKET_SIZE * 2; 174 z->zbuff = g_try_malloc(z->zbuff_len); 175 if (!z->zbuff) { 176 ZSTD_freeDStream(z->zds); 177 g_free(z); 178 error_setg(errp, "multifd %u: out of memory for zbuff", p->id); 179 return -1; 180 } 181 return 0; 182 } 183 184 static void multifd_zstd_recv_cleanup(MultiFDRecvParams *p) 185 { 186 struct zstd_data *z = p->compress_data; 187 188 ZSTD_freeDStream(z->zds); 189 z->zds = NULL; 190 g_free(z->zbuff); 191 z->zbuff = NULL; 192 g_free(p->compress_data); 193 p->compress_data = NULL; 194 } 195 196 static int multifd_zstd_recv(MultiFDRecvParams *p, Error **errp) 197 { 198 uint32_t in_size = p->next_packet_size; 199 uint32_t out_size = 0; 200 uint32_t page_size = multifd_ram_page_size(); 201 uint32_t expected_size = p->normal_num * page_size; 202 uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK; 203 struct zstd_data *z = p->compress_data; 204 int ret; 205 int i; 206 207 if (flags != MULTIFD_FLAG_ZSTD) { 208 error_setg(errp, "multifd %u: flags received %x flags expected %x", 209 p->id, flags, MULTIFD_FLAG_ZSTD); 210 return -1; 211 } 212 213 multifd_recv_zero_page_process(p); 214 215 if (!p->normal_num) { 216 assert(in_size == 0); 217 return 0; 218 } 219 220 ret = qio_channel_read_all(p->c, (void *)z->zbuff, in_size, errp); 221 222 if (ret != 0) { 223 return ret; 224 } 225 226 z->in.src = z->zbuff; 227 z->in.size = in_size; 228 z->in.pos = 0; 229 230 for (i = 0; i < p->normal_num; i++) { 231 ramblock_recv_bitmap_set_offset(p->block, p->normal[i]); 232 z->out.dst = p->host + p->normal[i]; 233 z->out.size = page_size; 234 z->out.pos = 0; 235 236 /* 237 * Welcome to decompressStream semantics 238 * 239 * We need to loop while: 240 * - return is > 0 241 * - there is input available 242 * - we haven't put out a full page 243 */ 244 do { 245 ret = ZSTD_decompressStream(z->zds, &z->out, &z->in); 246 } while (ret > 0 && (z->in.size - z->in.pos > 0) 247 && (z->out.pos < page_size)); 248 if (ret > 0 && (z->out.pos < page_size)) { 249 error_setg(errp, "multifd %u: decompressStream buffer too small", 250 p->id); 251 return -1; 252 } 253 if (ZSTD_isError(ret)) { 254 error_setg(errp, "multifd %u: decompressStream returned %s", 255 p->id, ZSTD_getErrorName(ret)); 256 return ret; 257 } 258 out_size += z->out.pos; 259 } 260 if (out_size != expected_size) { 261 error_setg(errp, "multifd %u: packet size received %u size expected %u", 262 p->id, out_size, expected_size); 263 return -1; 264 } 265 return 0; 266 } 267 268 static const MultiFDMethods multifd_zstd_ops = { 269 .send_setup = multifd_zstd_send_setup, 270 .send_cleanup = multifd_zstd_send_cleanup, 271 .send_prepare = multifd_zstd_send_prepare, 272 .recv_setup = multifd_zstd_recv_setup, 273 .recv_cleanup = multifd_zstd_recv_cleanup, 274 .recv = multifd_zstd_recv 275 }; 276 277 static void multifd_zstd_register(void) 278 { 279 multifd_register_ops(MULTIFD_COMPRESSION_ZSTD, &multifd_zstd_ops); 280 } 281 282 migration_init(multifd_zstd_register); 283