1 /* 2 * Multifd zlib compression implementation 3 * 4 * Copyright (c) 2020 Red Hat Inc 5 * 6 * Authors: 7 * Juan Quintela <quintela@redhat.com> 8 * 9 * This work is licensed under the terms of the GNU GPL, version 2 or later. 10 * See the COPYING file in the top-level directory. 11 */ 12 13 #include "qemu/osdep.h" 14 #include <zstd.h> 15 #include "qemu/rcu.h" 16 #include "exec/ramblock.h" 17 #include "exec/target_page.h" 18 #include "qapi/error.h" 19 #include "migration.h" 20 #include "trace.h" 21 #include "multifd.h" 22 23 struct zstd_data { 24 /* stream for compression */ 25 ZSTD_CStream *zcs; 26 /* stream for decompression */ 27 ZSTD_DStream *zds; 28 /* buffers */ 29 ZSTD_inBuffer in; 30 ZSTD_outBuffer out; 31 /* compressed buffer */ 32 uint8_t *zbuff; 33 /* size of compressed buffer */ 34 uint32_t zbuff_len; 35 }; 36 37 /* Multifd zstd compression */ 38 39 /** 40 * zstd_send_setup: setup send side 41 * 42 * Setup each channel with zstd compression. 43 * 44 * Returns 0 for success or -1 for error 45 * 46 * @p: Params for the channel that we are using 47 * @errp: pointer to an error 48 */ 49 static int zstd_send_setup(MultiFDSendParams *p, Error **errp) 50 { 51 struct zstd_data *z = g_new0(struct zstd_data, 1); 52 int res; 53 54 p->data = z; 55 z->zcs = ZSTD_createCStream(); 56 if (!z->zcs) { 57 g_free(z); 58 error_setg(errp, "multifd %d: zstd createCStream failed", p->id); 59 return -1; 60 } 61 62 res = ZSTD_initCStream(z->zcs, migrate_multifd_zstd_level()); 63 if (ZSTD_isError(res)) { 64 ZSTD_freeCStream(z->zcs); 65 g_free(z); 66 error_setg(errp, "multifd %d: initCStream failed with error %s", 67 p->id, ZSTD_getErrorName(res)); 68 return -1; 69 } 70 /* To be safe, we reserve twice the size of the packet */ 71 z->zbuff_len = MULTIFD_PACKET_SIZE * 2; 72 z->zbuff = g_try_malloc(z->zbuff_len); 73 if (!z->zbuff) { 74 ZSTD_freeCStream(z->zcs); 75 g_free(z); 76 error_setg(errp, "multifd %d: out of memory for zbuff", p->id); 77 return -1; 78 } 79 return 0; 80 } 81 82 /** 83 * zstd_send_cleanup: cleanup send side 84 * 85 * Close the channel and return memory. 86 * 87 * @p: Params for the channel that we are using 88 * @errp: pointer to an error 89 */ 90 static void zstd_send_cleanup(MultiFDSendParams *p, Error **errp) 91 { 92 struct zstd_data *z = p->data; 93 94 ZSTD_freeCStream(z->zcs); 95 z->zcs = NULL; 96 g_free(z->zbuff); 97 z->zbuff = NULL; 98 g_free(p->data); 99 p->data = NULL; 100 } 101 102 /** 103 * zstd_send_prepare: prepare date to be able to send 104 * 105 * Create a compressed buffer with all the pages that we are going to 106 * send. 107 * 108 * Returns 0 for success or -1 for error 109 * 110 * @p: Params for the channel that we are using 111 * @errp: pointer to an error 112 */ 113 static int zstd_send_prepare(MultiFDSendParams *p, Error **errp) 114 { 115 struct zstd_data *z = p->data; 116 size_t page_size = qemu_target_page_size(); 117 int ret; 118 uint32_t i; 119 120 z->out.dst = z->zbuff; 121 z->out.size = z->zbuff_len; 122 z->out.pos = 0; 123 124 for (i = 0; i < p->pages->num; i++) { 125 ZSTD_EndDirective flush = ZSTD_e_continue; 126 127 if (i == p->pages->num - 1) { 128 flush = ZSTD_e_flush; 129 } 130 z->in.src = p->pages->block->host + p->pages->offset[i]; 131 z->in.size = page_size; 132 z->in.pos = 0; 133 134 /* 135 * Welcome to compressStream2 semantics 136 * 137 * We need to loop while: 138 * - return is > 0 139 * - there is input available 140 * - there is output space free 141 */ 142 do { 143 ret = ZSTD_compressStream2(z->zcs, &z->out, &z->in, flush); 144 } while (ret > 0 && (z->in.size - z->in.pos > 0) 145 && (z->out.size - z->out.pos > 0)); 146 if (ret > 0 && (z->in.size - z->in.pos > 0)) { 147 error_setg(errp, "multifd %d: compressStream buffer too small", 148 p->id); 149 return -1; 150 } 151 if (ZSTD_isError(ret)) { 152 error_setg(errp, "multifd %d: compressStream error %s", 153 p->id, ZSTD_getErrorName(ret)); 154 return -1; 155 } 156 } 157 p->next_packet_size = z->out.pos; 158 p->flags |= MULTIFD_FLAG_ZSTD; 159 160 return 0; 161 } 162 163 /** 164 * zstd_send_write: do the actual write of the data 165 * 166 * Do the actual write of the comprresed buffer. 167 * 168 * Returns 0 for success or -1 for error 169 * 170 * @p: Params for the channel that we are using 171 * @used: number of pages used 172 * @errp: pointer to an error 173 */ 174 static int zstd_send_write(MultiFDSendParams *p, uint32_t used, Error **errp) 175 { 176 struct zstd_data *z = p->data; 177 178 return qio_channel_write_all(p->c, (void *)z->zbuff, p->next_packet_size, 179 errp); 180 } 181 182 /** 183 * zstd_recv_setup: setup receive side 184 * 185 * Create the compressed channel and buffer. 186 * 187 * Returns 0 for success or -1 for error 188 * 189 * @p: Params for the channel that we are using 190 * @errp: pointer to an error 191 */ 192 static int zstd_recv_setup(MultiFDRecvParams *p, Error **errp) 193 { 194 struct zstd_data *z = g_new0(struct zstd_data, 1); 195 int ret; 196 197 p->data = z; 198 z->zds = ZSTD_createDStream(); 199 if (!z->zds) { 200 g_free(z); 201 error_setg(errp, "multifd %d: zstd createDStream failed", p->id); 202 return -1; 203 } 204 205 ret = ZSTD_initDStream(z->zds); 206 if (ZSTD_isError(ret)) { 207 ZSTD_freeDStream(z->zds); 208 g_free(z); 209 error_setg(errp, "multifd %d: initDStream failed with error %s", 210 p->id, ZSTD_getErrorName(ret)); 211 return -1; 212 } 213 214 /* To be safe, we reserve twice the size of the packet */ 215 z->zbuff_len = MULTIFD_PACKET_SIZE * 2; 216 z->zbuff = g_try_malloc(z->zbuff_len); 217 if (!z->zbuff) { 218 ZSTD_freeDStream(z->zds); 219 g_free(z); 220 error_setg(errp, "multifd %d: out of memory for zbuff", p->id); 221 return -1; 222 } 223 return 0; 224 } 225 226 /** 227 * zstd_recv_cleanup: setup receive side 228 * 229 * For no compression this function does nothing. 230 * 231 * @p: Params for the channel that we are using 232 */ 233 static void zstd_recv_cleanup(MultiFDRecvParams *p) 234 { 235 struct zstd_data *z = p->data; 236 237 ZSTD_freeDStream(z->zds); 238 z->zds = NULL; 239 g_free(z->zbuff); 240 z->zbuff = NULL; 241 g_free(p->data); 242 p->data = NULL; 243 } 244 245 /** 246 * zstd_recv_pages: read the data from the channel into actual pages 247 * 248 * Read the compressed buffer, and uncompress it into the actual 249 * pages. 250 * 251 * Returns 0 for success or -1 for error 252 * 253 * @p: Params for the channel that we are using 254 * @errp: pointer to an error 255 */ 256 static int zstd_recv_pages(MultiFDRecvParams *p, Error **errp) 257 { 258 uint32_t in_size = p->next_packet_size; 259 uint32_t out_size = 0; 260 size_t page_size = qemu_target_page_size(); 261 uint32_t expected_size = p->pages->num * page_size; 262 uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK; 263 struct zstd_data *z = p->data; 264 int ret; 265 int i; 266 267 if (flags != MULTIFD_FLAG_ZSTD) { 268 error_setg(errp, "multifd %d: flags received %x flags expected %x", 269 p->id, flags, MULTIFD_FLAG_ZSTD); 270 return -1; 271 } 272 ret = qio_channel_read_all(p->c, (void *)z->zbuff, in_size, errp); 273 274 if (ret != 0) { 275 return ret; 276 } 277 278 z->in.src = z->zbuff; 279 z->in.size = in_size; 280 z->in.pos = 0; 281 282 for (i = 0; i < p->pages->num; i++) { 283 z->out.dst = p->pages->block->host + p->pages->offset[i]; 284 z->out.size = page_size; 285 z->out.pos = 0; 286 287 /* 288 * Welcome to decompressStream semantics 289 * 290 * We need to loop while: 291 * - return is > 0 292 * - there is input available 293 * - we haven't put out a full page 294 */ 295 do { 296 ret = ZSTD_decompressStream(z->zds, &z->out, &z->in); 297 } while (ret > 0 && (z->in.size - z->in.pos > 0) 298 && (z->out.pos < page_size)); 299 if (ret > 0 && (z->out.pos < page_size)) { 300 error_setg(errp, "multifd %d: decompressStream buffer too small", 301 p->id); 302 return -1; 303 } 304 if (ZSTD_isError(ret)) { 305 error_setg(errp, "multifd %d: decompressStream returned %s", 306 p->id, ZSTD_getErrorName(ret)); 307 return ret; 308 } 309 out_size += z->out.pos; 310 } 311 if (out_size != expected_size) { 312 error_setg(errp, "multifd %d: packet size received %d size expected %d", 313 p->id, out_size, expected_size); 314 return -1; 315 } 316 return 0; 317 } 318 319 static MultiFDMethods multifd_zstd_ops = { 320 .send_setup = zstd_send_setup, 321 .send_cleanup = zstd_send_cleanup, 322 .send_prepare = zstd_send_prepare, 323 .send_write = zstd_send_write, 324 .recv_setup = zstd_recv_setup, 325 .recv_cleanup = zstd_recv_cleanup, 326 .recv_pages = zstd_recv_pages 327 }; 328 329 static void multifd_zstd_register(void) 330 { 331 multifd_register_ops(MULTIFD_COMPRESSION_ZSTD, &multifd_zstd_ops); 332 } 333 334 migration_init(multifd_zstd_register); 335