1 /* 2 * Multifd zlib compression implementation 3 * 4 * Copyright (c) 2020 Red Hat Inc 5 * 6 * Authors: 7 * Juan Quintela <quintela@redhat.com> 8 * 9 * This work is licensed under the terms of the GNU GPL, version 2 or later. 10 * See the COPYING file in the top-level directory. 11 */ 12 13 #include "qemu/osdep.h" 14 #include <zstd.h> 15 #include "qemu/rcu.h" 16 #include "exec/ramblock.h" 17 #include "exec/target_page.h" 18 #include "qapi/error.h" 19 #include "migration.h" 20 #include "trace.h" 21 #include "options.h" 22 #include "multifd.h" 23 24 struct zstd_data { 25 /* stream for compression */ 26 ZSTD_CStream *zcs; 27 /* stream for decompression */ 28 ZSTD_DStream *zds; 29 /* buffers */ 30 ZSTD_inBuffer in; 31 ZSTD_outBuffer out; 32 /* compressed buffer */ 33 uint8_t *zbuff; 34 /* size of compressed buffer */ 35 uint32_t zbuff_len; 36 }; 37 38 /* Multifd zstd compression */ 39 40 /** 41 * zstd_send_setup: setup send side 42 * 43 * Setup each channel with zstd compression. 44 * 45 * Returns 0 for success or -1 for error 46 * 47 * @p: Params for the channel that we are using 48 * @errp: pointer to an error 49 */ 50 static int zstd_send_setup(MultiFDSendParams *p, Error **errp) 51 { 52 struct zstd_data *z = g_new0(struct zstd_data, 1); 53 int res; 54 55 z->zcs = ZSTD_createCStream(); 56 if (!z->zcs) { 57 g_free(z); 58 error_setg(errp, "multifd %u: zstd createCStream failed", p->id); 59 return -1; 60 } 61 62 res = ZSTD_initCStream(z->zcs, migrate_multifd_zstd_level()); 63 if (ZSTD_isError(res)) { 64 ZSTD_freeCStream(z->zcs); 65 g_free(z); 66 error_setg(errp, "multifd %u: initCStream failed with error %s", 67 p->id, ZSTD_getErrorName(res)); 68 return -1; 69 } 70 /* This is the maximum size of the compressed buffer */ 71 z->zbuff_len = ZSTD_compressBound(MULTIFD_PACKET_SIZE); 72 z->zbuff = g_try_malloc(z->zbuff_len); 73 if (!z->zbuff) { 74 ZSTD_freeCStream(z->zcs); 75 g_free(z); 76 error_setg(errp, "multifd %u: out of memory for zbuff", p->id); 77 return -1; 78 } 79 p->compress_data = z; 80 81 /* Needs 2 IOVs, one for packet header and one for compressed data */ 82 p->iov = g_new0(struct iovec, 2); 83 return 0; 84 } 85 86 /** 87 * zstd_send_cleanup: cleanup send side 88 * 89 * Close the channel and return memory. 90 * 91 * @p: Params for the channel that we are using 92 * @errp: pointer to an error 93 */ 94 static void zstd_send_cleanup(MultiFDSendParams *p, Error **errp) 95 { 96 struct zstd_data *z = p->compress_data; 97 98 ZSTD_freeCStream(z->zcs); 99 z->zcs = NULL; 100 g_free(z->zbuff); 101 z->zbuff = NULL; 102 g_free(p->compress_data); 103 p->compress_data = NULL; 104 105 g_free(p->iov); 106 p->iov = NULL; 107 } 108 109 /** 110 * zstd_send_prepare: prepare date to be able to send 111 * 112 * Create a compressed buffer with all the pages that we are going to 113 * send. 114 * 115 * Returns 0 for success or -1 for error 116 * 117 * @p: Params for the channel that we are using 118 * @errp: pointer to an error 119 */ 120 static int zstd_send_prepare(MultiFDSendParams *p, Error **errp) 121 { 122 MultiFDPages_t *pages = p->pages; 123 struct zstd_data *z = p->compress_data; 124 int ret; 125 uint32_t i; 126 127 if (!multifd_send_prepare_common(p)) { 128 goto out; 129 } 130 131 z->out.dst = z->zbuff; 132 z->out.size = z->zbuff_len; 133 z->out.pos = 0; 134 135 for (i = 0; i < pages->normal_num; i++) { 136 ZSTD_EndDirective flush = ZSTD_e_continue; 137 138 if (i == pages->normal_num - 1) { 139 flush = ZSTD_e_flush; 140 } 141 z->in.src = p->pages->block->host + pages->offset[i]; 142 z->in.size = p->page_size; 143 z->in.pos = 0; 144 145 /* 146 * Welcome to compressStream2 semantics 147 * 148 * We need to loop while: 149 * - return is > 0 150 * - there is input available 151 * - there is output space free 152 */ 153 do { 154 ret = ZSTD_compressStream2(z->zcs, &z->out, &z->in, flush); 155 } while (ret > 0 && (z->in.size - z->in.pos > 0) 156 && (z->out.size - z->out.pos > 0)); 157 if (ret > 0 && (z->in.size - z->in.pos > 0)) { 158 error_setg(errp, "multifd %u: compressStream buffer too small", 159 p->id); 160 return -1; 161 } 162 if (ZSTD_isError(ret)) { 163 error_setg(errp, "multifd %u: compressStream error %s", 164 p->id, ZSTD_getErrorName(ret)); 165 return -1; 166 } 167 } 168 p->iov[p->iovs_num].iov_base = z->zbuff; 169 p->iov[p->iovs_num].iov_len = z->out.pos; 170 p->iovs_num++; 171 p->next_packet_size = z->out.pos; 172 173 out: 174 p->flags |= MULTIFD_FLAG_ZSTD; 175 multifd_send_fill_packet(p); 176 return 0; 177 } 178 179 /** 180 * zstd_recv_setup: setup receive side 181 * 182 * Create the compressed channel and buffer. 183 * 184 * Returns 0 for success or -1 for error 185 * 186 * @p: Params for the channel that we are using 187 * @errp: pointer to an error 188 */ 189 static int zstd_recv_setup(MultiFDRecvParams *p, Error **errp) 190 { 191 struct zstd_data *z = g_new0(struct zstd_data, 1); 192 int ret; 193 194 p->compress_data = z; 195 z->zds = ZSTD_createDStream(); 196 if (!z->zds) { 197 g_free(z); 198 error_setg(errp, "multifd %u: zstd createDStream failed", p->id); 199 return -1; 200 } 201 202 ret = ZSTD_initDStream(z->zds); 203 if (ZSTD_isError(ret)) { 204 ZSTD_freeDStream(z->zds); 205 g_free(z); 206 error_setg(errp, "multifd %u: initDStream failed with error %s", 207 p->id, ZSTD_getErrorName(ret)); 208 return -1; 209 } 210 211 /* To be safe, we reserve twice the size of the packet */ 212 z->zbuff_len = MULTIFD_PACKET_SIZE * 2; 213 z->zbuff = g_try_malloc(z->zbuff_len); 214 if (!z->zbuff) { 215 ZSTD_freeDStream(z->zds); 216 g_free(z); 217 error_setg(errp, "multifd %u: out of memory for zbuff", p->id); 218 return -1; 219 } 220 return 0; 221 } 222 223 /** 224 * zstd_recv_cleanup: setup receive side 225 * 226 * For no compression this function does nothing. 227 * 228 * @p: Params for the channel that we are using 229 */ 230 static void zstd_recv_cleanup(MultiFDRecvParams *p) 231 { 232 struct zstd_data *z = p->compress_data; 233 234 ZSTD_freeDStream(z->zds); 235 z->zds = NULL; 236 g_free(z->zbuff); 237 z->zbuff = NULL; 238 g_free(p->compress_data); 239 p->compress_data = NULL; 240 } 241 242 /** 243 * zstd_recv: read the data from the channel into actual pages 244 * 245 * Read the compressed buffer, and uncompress it into the actual 246 * pages. 247 * 248 * Returns 0 for success or -1 for error 249 * 250 * @p: Params for the channel that we are using 251 * @errp: pointer to an error 252 */ 253 static int zstd_recv(MultiFDRecvParams *p, Error **errp) 254 { 255 uint32_t in_size = p->next_packet_size; 256 uint32_t out_size = 0; 257 uint32_t expected_size = p->normal_num * p->page_size; 258 uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK; 259 struct zstd_data *z = p->compress_data; 260 int ret; 261 int i; 262 263 if (flags != MULTIFD_FLAG_ZSTD) { 264 error_setg(errp, "multifd %u: flags received %x flags expected %x", 265 p->id, flags, MULTIFD_FLAG_ZSTD); 266 return -1; 267 } 268 269 multifd_recv_zero_page_process(p); 270 271 if (!p->normal_num) { 272 assert(in_size == 0); 273 return 0; 274 } 275 276 ret = qio_channel_read_all(p->c, (void *)z->zbuff, in_size, errp); 277 278 if (ret != 0) { 279 return ret; 280 } 281 282 z->in.src = z->zbuff; 283 z->in.size = in_size; 284 z->in.pos = 0; 285 286 for (i = 0; i < p->normal_num; i++) { 287 ramblock_recv_bitmap_set_offset(p->block, p->normal[i]); 288 z->out.dst = p->host + p->normal[i]; 289 z->out.size = p->page_size; 290 z->out.pos = 0; 291 292 /* 293 * Welcome to decompressStream semantics 294 * 295 * We need to loop while: 296 * - return is > 0 297 * - there is input available 298 * - we haven't put out a full page 299 */ 300 do { 301 ret = ZSTD_decompressStream(z->zds, &z->out, &z->in); 302 } while (ret > 0 && (z->in.size - z->in.pos > 0) 303 && (z->out.pos < p->page_size)); 304 if (ret > 0 && (z->out.pos < p->page_size)) { 305 error_setg(errp, "multifd %u: decompressStream buffer too small", 306 p->id); 307 return -1; 308 } 309 if (ZSTD_isError(ret)) { 310 error_setg(errp, "multifd %u: decompressStream returned %s", 311 p->id, ZSTD_getErrorName(ret)); 312 return ret; 313 } 314 out_size += z->out.pos; 315 } 316 if (out_size != expected_size) { 317 error_setg(errp, "multifd %u: packet size received %u size expected %u", 318 p->id, out_size, expected_size); 319 return -1; 320 } 321 return 0; 322 } 323 324 static MultiFDMethods multifd_zstd_ops = { 325 .send_setup = zstd_send_setup, 326 .send_cleanup = zstd_send_cleanup, 327 .send_prepare = zstd_send_prepare, 328 .recv_setup = zstd_recv_setup, 329 .recv_cleanup = zstd_recv_cleanup, 330 .recv = zstd_recv 331 }; 332 333 static void multifd_zstd_register(void) 334 { 335 multifd_register_ops(MULTIFD_COMPRESSION_ZSTD, &multifd_zstd_ops); 336 } 337 338 migration_init(multifd_zstd_register); 339