/*
 * Multifd common code
 *
 * Copyright (c) 2019-2020 Red Hat Inc
 *
 * Authors:
 *  Juan Quintela <quintela@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */

#include "qemu/osdep.h"
#include "qemu/cutils.h"
#include "qemu/rcu.h"
#include "exec/target_page.h"
#include "sysemu/sysemu.h"
#include "exec/ramblock.h"
#include "qemu/error-report.h"
#include "qapi/error.h"
#include "file.h"
#include "migration.h"
#include "migration-stats.h"
#include "socket.h"
#include "tls.h"
#include "qemu-file.h"
#include "trace.h"
#include "multifd.h"
#include "threadinfo.h"
#include "options.h"
#include "qemu/yank.h"
#include "io/channel-file.h"
#include "io/channel-socket.h"
#include "yank_functions.h"

/* Multiple fd's */

#define MULTIFD_MAGIC 0x11223344U
#define MULTIFD_VERSION 1

typedef struct {
    uint32_t magic;
    uint32_t version;
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;

struct {
    MultiFDSendParams *params;
    /* array of pages to send */
    MultiFDPages_t *pages;
    /*
     * Global number of generated multifd packets.
     *
     * Note that we use 'uintptr_t' because it naturally supports atomic
     * operations on both 32-bit and 64-bit hosts.  It means that on
     * 32-bit systems multifd will overflow packet_num more easily, but
     * that should be fine.
     *
     * Another option would be QEMU's Stat64, which is 64 bits on all
     * hosts, but so far it does not support atomic fetch_add().  Keep it
     * simple for now.
     */
    uintptr_t packet_num;
    /*
     * Synchronization point past which no more channels will be
     * created.
     */
    QemuSemaphore channels_created;
    /* send channels ready */
    QemuSemaphore channels_ready;
    /*
     * Have we already terminated the threads?  There is a race when we
     * get an error while exiting, so this is accessed with atomic
     * operations.  The only valid values are 0 and 1.
     */
    int exiting;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_send_state;

struct {
    MultiFDRecvParams *params;
    MultiFDRecvData *data;
    /* number of created threads */
    int count;
    /*
     * This is always posted by the recv threads; the migration thread
     * uses it to wait for recv threads to finish their assigned tasks.
     */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    int exiting;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_recv_state;

static bool multifd_use_packets(void)
{
    return !migrate_mapped_ram();
}

void multifd_send_channel_created(void)
{
    qemu_sem_post(&multifd_send_state->channels_created);
}

static void multifd_set_file_bitmap(MultiFDSendParams *p)
{
    MultiFDPages_t *pages = p->pages;

    assert(pages->block);

    for (int i = 0; i < p->pages->normal_num; i++) {
        ramblock_set_file_bmap_atomic(pages->block, pages->offset[i], true);
    }

    for (int i = p->pages->normal_num; i < p->pages->num; i++) {
        ramblock_set_file_bmap_atomic(pages->block, pages->offset[i], false);
    }
}

/* Multifd without compression */

/**
 * nocomp_send_setup: setup send side
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
{
    if (migrate_zero_copy_send()) {
        p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
    }

    if (multifd_use_packets()) {
        /* We need one extra place for the packet header */
        p->iov = g_new0(struct iovec, p->page_count + 1);
    } else {
        p->iov = g_new0(struct iovec, p->page_count);
    }

    return 0;
}

/**
 * nocomp_send_cleanup: cleanup send side
 *
 * For no compression this only frees the iovec array.
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
{
    g_free(p->iov);
    p->iov = NULL;
}

static void multifd_send_prepare_iovs(MultiFDSendParams *p)
{
    MultiFDPages_t *pages = p->pages;

    for (int i = 0; i < pages->normal_num; i++) {
        p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
        p->iov[p->iovs_num].iov_len = p->page_size;
        p->iovs_num++;
    }

    p->next_packet_size = pages->normal_num * p->page_size;
}

/**
 * nocomp_send_prepare: prepare the data to be sent
 *
 * For no compression we just have to calculate the size of the
 * packet.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
{
    bool use_zero_copy_send = migrate_zero_copy_send();
    int ret;

    multifd_send_zero_page_detect(p);

    if (!multifd_use_packets()) {
        multifd_send_prepare_iovs(p);
        multifd_set_file_bitmap(p);

        return 0;
    }

    if (!use_zero_copy_send) {
        /*
         * Only !zerocopy needs the header in IOV; zerocopy will
         * send it separately.
         */
        multifd_send_prepare_header(p);
    }

    multifd_send_prepare_iovs(p);
    p->flags |= MULTIFD_FLAG_NOCOMP;

    multifd_send_fill_packet(p);

    if (use_zero_copy_send) {
        /* Send header first, without zerocopy */
        ret = qio_channel_write_all(p->c, (void *)p->packet,
                                    p->packet_len, errp);
        if (ret != 0) {
            return -1;
        }
    }

    return 0;
}

/**
 * nocomp_recv_setup: setup receive side
 *
 * For no compression this only allocates the iovec array.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
{
    p->iov = g_new0(struct iovec, p->page_count);
    return 0;
}

/**
 * nocomp_recv_cleanup: cleanup receive side
 *
 * For no compression this only frees the iovec array.
 *
 * @p: Params for the channel that we are using
 */
static void nocomp_recv_cleanup(MultiFDRecvParams *p)
{
    g_free(p->iov);
    p->iov = NULL;
}

/**
 * nocomp_recv: read the data from the channel
 *
 * For no compression we just need to read things into the correct place.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_recv(MultiFDRecvParams *p, Error **errp)
{
    uint32_t flags;

    if (!multifd_use_packets()) {
        return multifd_file_recv_data(p, errp);
    }

    flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;

    if (flags != MULTIFD_FLAG_NOCOMP) {
        error_setg(errp, "multifd %u: flags received %x flags expected %x",
                   p->id, flags, MULTIFD_FLAG_NOCOMP);
        return -1;
    }

    multifd_recv_zero_page_process(p);

    if (!p->normal_num) {
        return 0;
    }

    for (int i = 0; i < p->normal_num; i++) {
        p->iov[i].iov_base = p->host + p->normal[i];
        p->iov[i].iov_len = p->page_size;
        ramblock_recv_bitmap_set_offset(p->block, p->normal[i]);
    }
    return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
}

static MultiFDMethods multifd_nocomp_ops = {
    .send_setup = nocomp_send_setup,
    .send_cleanup = nocomp_send_cleanup,
    .send_prepare = nocomp_send_prepare,
    .recv_setup = nocomp_recv_setup,
    .recv_cleanup = nocomp_recv_cleanup,
    .recv = nocomp_recv
};

static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
    [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
};

void multifd_register_ops(int method, MultiFDMethods *ops)
{
    assert(0 < method && method < MULTIFD_COMPRESSION__MAX);
    multifd_ops[method] = ops;
}

/* Reset a MultiFDPages_t* object for the next use */
static void multifd_pages_reset(MultiFDPages_t *pages)
{
    /*
     * We don't need to touch the offset[] array, because it will be
     * overwritten later when reused.
     */
    pages->num = 0;
    pages->normal_num = 0;
    pages->block = NULL;
}

static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
{
    MultiFDInit_t msg = {};
    size_t size = sizeof(msg);
    int ret;

    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
    msg.version = cpu_to_be32(MULTIFD_VERSION);
    msg.id = p->id;
    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));

    ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
    if (ret != 0) {
        return -1;
    }
    stat64_add(&mig_stats.multifd_bytes, size);
    return 0;
}

static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
{
    MultiFDInit_t msg;
    int ret;

    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
    if (ret != 0) {
        return -1;
    }

    msg.magic = be32_to_cpu(msg.magic);
    msg.version = be32_to_cpu(msg.version);

    if (msg.magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet magic %x "
                   "expected %x", msg.magic, MULTIFD_MAGIC);
        return -1;
    }

    if (msg.version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet version %u "
                   "expected %u", msg.version, MULTIFD_VERSION);
        return -1;
    }

    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
        char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);

        error_setg(errp, "multifd: received uuid '%s' and expected "
                   "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
        g_free(uuid);
        g_free(msg_uuid);
        return -1;
    }

    if (msg.id > migrate_multifd_channels()) {
        error_setg(errp, "multifd: received channel id %u is greater than "
                   "number of channels %u", msg.id, migrate_multifd_channels());
        return -1;
    }

    return msg.id;
}

static MultiFDPages_t *multifd_pages_init(uint32_t n)
{
    MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);

    pages->allocated = n;
    pages->offset = g_new0(ram_addr_t, n);

    return pages;
}

static void multifd_pages_clear(MultiFDPages_t *pages)
{
    multifd_pages_reset(pages);
    pages->allocated = 0;
    g_free(pages->offset);
    pages->offset = NULL;
    g_free(pages);
}

void multifd_send_fill_packet(MultiFDSendParams *p)
{
    MultiFDPacket_t *packet = p->packet;
    MultiFDPages_t *pages = p->pages;
    uint64_t packet_num;
    uint32_t zero_num = pages->num - pages->normal_num;
    int i;

    packet->flags = cpu_to_be32(p->flags);
    packet->pages_alloc = cpu_to_be32(p->pages->allocated);
    packet->normal_pages = cpu_to_be32(pages->normal_num);
    packet->zero_pages = cpu_to_be32(zero_num);
    packet->next_packet_size = cpu_to_be32(p->next_packet_size);

    packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
    packet->packet_num = cpu_to_be64(packet_num);

    if (pages->block) {
        strncpy(packet->ramblock, pages->block->idstr, 256);
    }

    for (i = 0; i < pages->num; i++) {
        /* there are architectures where ram_addr_t is 32 bit */
        uint64_t temp = pages->offset[i];

        packet->offset[i] = cpu_to_be64(temp);
    }

    p->packets_sent++;
    p->total_normal_pages += pages->normal_num;
    p->total_zero_pages += zero_num;

    trace_multifd_send(p->id, packet_num, pages->normal_num, zero_num,
                       p->flags, p->next_packet_size);
}

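/*
 * Unpack a packet just read from the channel: convert the header fields
 * from big-endian, validate them against what this channel was set up
 * for, and fill p->normal[]/p->zero[] with the page offsets.
 *
 * Returns 0 for success or -1 (with @errp set) for error.
 */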
static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
{
    MultiFDPacket_t *packet = p->packet;
    int i;

    packet->magic = be32_to_cpu(packet->magic);
    if (packet->magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet "
                   "magic %x and expected magic %x",
                   packet->magic, MULTIFD_MAGIC);
        return -1;
    }

    packet->version = be32_to_cpu(packet->version);
    if (packet->version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet "
                   "version %u and expected version %u",
                   packet->version, MULTIFD_VERSION);
        return -1;
    }

    p->flags = be32_to_cpu(packet->flags);

    packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
    /*
     * If the packet claims more pages than this channel was set up
     * for, just stop the migration.
     */
    if (packet->pages_alloc > p->page_count) {
        error_setg(errp, "multifd: received packet "
                   "with size %u and expected a size of %u",
                   packet->pages_alloc, p->page_count);
        return -1;
    }

    p->normal_num = be32_to_cpu(packet->normal_pages);
    if (p->normal_num > packet->pages_alloc) {
        error_setg(errp, "multifd: received packet "
                   "with %u normal pages and expected maximum pages are %u",
                   p->normal_num, packet->pages_alloc);
        return -1;
    }

    p->zero_num = be32_to_cpu(packet->zero_pages);
    if (p->zero_num > packet->pages_alloc - p->normal_num) {
        error_setg(errp, "multifd: received packet "
                   "with %u zero pages and expected maximum zero pages are %u",
                   p->zero_num, packet->pages_alloc - p->normal_num);
        return -1;
    }

    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
    p->packet_num = be64_to_cpu(packet->packet_num);
    p->packets_recved++;
    p->total_normal_pages += p->normal_num;
    p->total_zero_pages += p->zero_num;

    trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->zero_num,
                       p->flags, p->next_packet_size);

    if (p->normal_num == 0 && p->zero_num == 0) {
        return 0;
    }

    /* make sure that ramblock is 0 terminated */
    packet->ramblock[255] = 0;
    p->block = qemu_ram_block_by_name(packet->ramblock);
    if (!p->block) {
        error_setg(errp, "multifd: unknown ram block %s",
                   packet->ramblock);
        return -1;
    }

    p->host = p->block->host;
    for (i = 0; i < p->normal_num; i++) {
        uint64_t offset = be64_to_cpu(packet->offset[i]);

        if (offset > (p->block->used_length - p->page_size)) {
            error_setg(errp, "multifd: offset too long %" PRIu64
                       " (max " RAM_ADDR_FMT ")",
                       offset, p->block->used_length);
            return -1;
        }
        p->normal[i] = offset;
    }

    for (i = 0; i < p->zero_num; i++) {
        uint64_t offset = be64_to_cpu(packet->offset[p->normal_num + i]);

        if (offset > (p->block->used_length - p->page_size)) {
            error_setg(errp, "multifd: offset too long %" PRIu64
                       " (max " RAM_ADDR_FMT ")",
                       offset, p->block->used_length);
            return -1;
        }
        p->zero[i] = offset;
    }

    return 0;
}

static bool multifd_send_should_exit(void)
{
    return qatomic_read(&multifd_send_state->exiting);
}

static bool multifd_recv_should_exit(void)
{
    return qatomic_read(&multifd_recv_state->exiting);
}

/*
 * The migration thread can wait on either of the two semaphores.  This
 * function can be used to kick the migration thread out of waiting on
 * either of them.  Should mostly only be called when something has gone
 * wrong with the current multifd send thread.
 */
static void multifd_send_kick_main(MultiFDSendParams *p)
{
    qemu_sem_post(&p->sem_sync);
    qemu_sem_post(&multifd_send_state->channels_ready);
}

/*
 * How do we use multifd_send_state->pages and channel->pages?
 *
 * We create one pages array for each channel, plus a main one.  Each
 * time we need to send a batch of pages we swap the main one with the
 * one of the channel that is sending it.  There are two reasons for
 * that:
 *    - to not have to do so many mallocs during migration
 *    - to make it easier to know what to free at the end of migration
 *
 * This way we always know who is the owner of each "pages" struct,
 * and we don't need any locking.  It belongs to the migration thread
 * or to the channel thread.  Switching is safe because the migration
 * thread is using the channel mutex when changing it, and the channel
 * has to have finished with its own, otherwise pending_job can't be
 * false.
 *
 * Returns true on success, false otherwise.
 */
static bool multifd_send_pages(void)
{
    int i;
    static int next_channel;
    MultiFDSendParams *p = NULL; /* make gcc happy */
    MultiFDPages_t *pages = multifd_send_state->pages;

    if (multifd_send_should_exit()) {
        return false;
    }

    /* We wait here, until at least one channel is ready */
    qemu_sem_wait(&multifd_send_state->channels_ready);

    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_channel %= migrate_multifd_channels();
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_send_should_exit()) {
            return false;
        }
        p = &multifd_send_state->params[i];
        /*
         * Lockless read to p->pending_job is safe, because only the
         * multifd sender thread can clear it.
         */
        if (qatomic_read(&p->pending_job) == false) {
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Make sure we read p->pending_job before all the rest.  Pairs with
     * qatomic_store_release() in multifd_send_thread().
     */
    smp_mb_acquire();
    assert(!p->pages->num);
    multifd_send_state->pages = p->pages;
    p->pages = pages;
    /*
     * Making sure p->pages is setup before marking pending_job=true.  Pairs
     * with the qatomic_load_acquire() in multifd_send_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}

static inline bool multifd_queue_empty(MultiFDPages_t *pages)
{
    return pages->num == 0;
}

static inline bool multifd_queue_full(MultiFDPages_t *pages)
{
    return pages->num == pages->allocated;
}

static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset)
{
    pages->offset[pages->num++] = offset;
}

/* Returns true if enqueue successful, false otherwise */
bool multifd_queue_page(RAMBlock *block, ram_addr_t offset)
{
    MultiFDPages_t *pages;

retry:
    pages = multifd_send_state->pages;

    /* If the queue is empty, we can already enqueue now */
    if (multifd_queue_empty(pages)) {
        pages->block = block;
        multifd_enqueue(pages, offset);
        return true;
    }

    /*
     * Not empty, but we need a flush first.  That can be because of
     * either:
     *
     * (1) The page is not on the same ramblock as the previous ones, or,
     * (2) The queue is full.
     *
     * After flush, always retry.
     */
    if (pages->block != block || multifd_queue_full(pages)) {
        if (!multifd_send_pages()) {
            return false;
        }
        goto retry;
    }

    /* Not empty, and we still have space, do it! */
    multifd_enqueue(pages, offset);
    return true;
}

/* Multifd send side hit an error; remember it and prepare to quit */
static void multifd_send_set_error(Error *err)
{
    /*
     * We don't want to exit the threads twice.  Depending on where we
     * get the error, or if there are two independent errors in two
     * threads at the same time, we can end up calling this function
     * twice.
     */
    if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
            s->state == MIGRATION_STATUS_DEVICE ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }
}

static void multifd_send_terminate_threads(void)
{
    int i;

    trace_multifd_send_terminate_threads();

    /*
     * Tell everyone we're quitting.  No xchg() needed here; we simply
     * always set it.
     */
    qatomic_set(&multifd_send_state->exiting, 1);

    /*
     * First, kick all threads out, no matter whether they are just idle
     * or blocked in an IO system call.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_sem_post(&p->sem);
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }

    /*
     * Finally recycle all the threads.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (p->tls_thread_created) {
            qemu_thread_join(&p->tls_thread);
        }

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
}

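/*
 * Tear down one send channel: close and release its IOChannel and free
 * the per-channel resources, including the compression-specific state
 * via the send_cleanup() hook.  Returns false (with *errp set) if the
 * hook reports an error.
 */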
static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
{
    if (p->c) {
        migration_ioc_unregister_yank(p->c);
        /*
         * The object_unref() cannot guarantee the fd will always be
         * released because finalize() of the iochannel is only
         * triggered on the last reference and it's not guaranteed
         * that we always hold the last refcount when reaching here.
         *
         * Closing the fd explicitly has the benefit that if there are any
         * registered I/O handler callbacks on such an fd, they will get a
         * POLLNVAL event and will further trigger the cleanup to finally
         * release the IOC.
         *
         * FIXME: It should logically be guaranteed that all multifd
         * channels have no I/O handler callback registered when reaching
         * here, because the migration thread will wait for all multifd
         * channel establishments to complete during setup.  Since
         * migrate_fd_cleanup() will be scheduled in the main thread too,
         * all previous callbacks should be guaranteed to have completed
         * by the time we reach here.  See
         * multifd_send_state.channels_created and its usage.  In the
         * future, we could replace this with an assert making sure we're
         * the last reference, or simply drop it if the above is clearly
         * justified.
         */
        qio_channel_close(p->c, &error_abort);
        object_unref(OBJECT(p->c));
        p->c = NULL;
    }
    qemu_sem_destroy(&p->sem);
    qemu_sem_destroy(&p->sem_sync);
    g_free(p->name);
    p->name = NULL;
    multifd_pages_clear(p->pages);
    p->pages = NULL;
    p->packet_len = 0;
    g_free(p->packet);
    p->packet = NULL;
    multifd_send_state->ops->send_cleanup(p, errp);

    return *errp == NULL;
}

static void multifd_send_cleanup_state(void)
{
    file_cleanup_outgoing_migration();
    socket_cleanup_outgoing_migration();
    qemu_sem_destroy(&multifd_send_state->channels_created);
    qemu_sem_destroy(&multifd_send_state->channels_ready);
    g_free(multifd_send_state->params);
    multifd_send_state->params = NULL;
    multifd_pages_clear(multifd_send_state->pages);
    multifd_send_state->pages = NULL;
    g_free(multifd_send_state);
    multifd_send_state = NULL;
}

void multifd_send_shutdown(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }

    multifd_send_terminate_threads();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        if (!multifd_send_cleanup_channel(p, &local_err)) {
            migrate_set_error(migrate_get_current(), local_err);
            error_free(local_err);
        }
    }

    multifd_send_cleanup_state();
}

static int multifd_zero_copy_flush(QIOChannel *c)
{
    int ret;
    Error *err = NULL;

    ret = qio_channel_flush(c, &err);
    if (ret < 0) {
        error_report_err(err);
        return -1;
    }
    if (ret == 1) {
        stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
    }

    return ret;
}

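/*
 * Flush any pages still queued in multifd_send_state->pages, then make
 * every send channel handle a sync request (emitting a MULTIFD_FLAG_SYNC
 * packet when packets are in use) and wait until all of them have done
 * so; with zero-copy send, also flush each channel.
 *
 * Returns 0 on success, -1 on error.
 */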
int multifd_send_sync_main(void)
{
    int i;
    bool flush_zero_copy;

    if (!migrate_multifd()) {
        return 0;
    }
    if (multifd_send_state->pages->num) {
        if (!multifd_send_pages()) {
            error_report("%s: multifd_send_pages fail", __func__);
            return -1;
        }
    }

    flush_zero_copy = migrate_zero_copy_send();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        trace_multifd_send_sync_main_signal(p->id);

        /*
         * We should be the only user so far, so it's not possible for it
         * to be set by others concurrently.
         */
        assert(qatomic_read(&p->pending_sync) == false);
        qatomic_set(&p->pending_sync, true);
        qemu_sem_post(&p->sem);
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        qemu_sem_wait(&multifd_send_state->channels_ready);
        trace_multifd_send_sync_main_wait(p->id);
        qemu_sem_wait(&p->sem_sync);

        if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
            return -1;
        }
    }
    trace_multifd_send_sync_main(multifd_send_state->packet_num);

    return 0;
}

static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    MigrationThread *thread = NULL;
    Error *local_err = NULL;
    int ret = 0;
    bool use_packets = multifd_use_packets();

    thread = migration_threads_add(p->name, qemu_get_thread_id());

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (use_packets) {
        if (multifd_send_initial_packet(p, &local_err) < 0) {
            ret = -1;
            goto out;
        }
    }

    while (true) {
        qemu_sem_post(&multifd_send_state->channels_ready);
        qemu_sem_wait(&p->sem);

        if (multifd_send_should_exit()) {
            break;
        }

        /*
         * Read the pending_job flag before p->pages.  Pairs with the
         * qatomic_store_release() in multifd_send_pages().
         */
        if (qatomic_load_acquire(&p->pending_job)) {
            MultiFDPages_t *pages = p->pages;

            p->iovs_num = 0;
            assert(pages->num);

            ret = multifd_send_state->ops->send_prepare(p, &local_err);
            if (ret != 0) {
                break;
            }

            if (migrate_mapped_ram()) {
                ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
                                              p->pages->block, &local_err);
            } else {
                ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
                                                  NULL, 0, p->write_flags,
                                                  &local_err);
            }

            if (ret != 0) {
                break;
            }

            stat64_add(&mig_stats.multifd_bytes,
                       p->next_packet_size + p->packet_len);
            stat64_add(&mig_stats.normal_pages, pages->normal_num);
            stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num);

            multifd_pages_reset(p->pages);
            p->next_packet_size = 0;

            /*
             * Making sure p->pages is published before saying "we're
             * free".  Pairs with the smp_mb_acquire() in
             * multifd_send_pages().
             */
            qatomic_store_release(&p->pending_job, false);
        } else {
            /*
             * If not a normal job, must be a sync request.  Note that
             * pending_sync is a standalone flag (unlike pending_job), so
             * it doesn't require explicit memory barriers.
             */
            assert(qatomic_read(&p->pending_sync));

            if (use_packets) {
                p->flags = MULTIFD_FLAG_SYNC;
                multifd_send_fill_packet(p);
                ret = qio_channel_write_all(p->c, (void *)p->packet,
                                            p->packet_len, &local_err);
                if (ret != 0) {
                    break;
                }
                /* p->next_packet_size will always be zero for a SYNC packet */
                stat64_add(&mig_stats.multifd_bytes, p->packet_len);
                p->flags = 0;
            }

            qatomic_set(&p->pending_sync, false);
            qemu_sem_post(&p->sem_sync);
        }
    }

out:
    if (ret) {
        assert(local_err);
        trace_multifd_send_error(p->id);
        multifd_send_set_error(local_err);
        multifd_send_kick_main(p);
        error_free(local_err);
    }

    rcu_unregister_thread();
    migration_threads_remove(thread);
    trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages,
                                  p->total_zero_pages);

    return NULL;
}

static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);

typedef struct {
    MultiFDSendParams *p;
    QIOChannelTLS *tioc;
} MultiFDTLSThreadArgs;

static void *multifd_tls_handshake_thread(void *opaque)
{
    MultiFDTLSThreadArgs *args = opaque;

    qio_channel_tls_handshake(args->tioc,
                              multifd_new_send_channel_async,
                              args->p,
                              NULL,
                              NULL);
    g_free(args);

    return NULL;
}

static bool multifd_tls_channel_connect(MultiFDSendParams *p,
                                        QIOChannel *ioc,
                                        Error **errp)
{
    MigrationState *s = migrate_get_current();
    const char *hostname = s->hostname;
    MultiFDTLSThreadArgs *args;
    QIOChannelTLS *tioc;

    tioc = migration_tls_client_create(ioc, hostname, errp);
    if (!tioc) {
        return false;
    }

    /*
     * Ownership of the socket channel now transfers to the newly
     * created TLS channel, which has already taken a reference.
     */
    object_unref(OBJECT(ioc));
    trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
    qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");

    args = g_new0(MultiFDTLSThreadArgs, 1);
    args->tioc = tioc;
    args->p = p;

    p->tls_thread_created = true;
    qemu_thread_create(&p->tls_thread, "multifd-tls-handshake-worker",
                       multifd_tls_handshake_thread, args,
                       QEMU_THREAD_JOINABLE);
    return true;
}

void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
{
    qio_channel_set_delay(ioc, false);

    migration_ioc_register_yank(ioc);
    /* Setup p->c only if the channel is completely setup */
    p->c = ioc;

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
                       QEMU_THREAD_JOINABLE);
}

/*
 * When TLS is enabled this function is called once to establish the
 * TLS connection and a second time after the TLS handshake to create
 * the multifd channel.  Without TLS it goes straight into the channel
 * creation.
 */
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
    Error *local_err = NULL;
    bool ret;

    trace_multifd_new_send_channel_async(p->id);

    if (qio_task_propagate_error(task, &local_err)) {
        ret = false;
        goto out;
    }

    trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
                                       migrate_get_current()->hostname);

    if (migrate_channel_requires_tls_upgrade(ioc)) {
        ret = multifd_tls_channel_connect(p, ioc, &local_err);
        if (ret) {
            return;
        }
    } else {
        multifd_channel_connect(p, ioc);
        ret = true;
    }

out:
    /*
     * Here we're not interested in whether creation succeeded, only that
     * it happened at all.
     */
    multifd_send_channel_created();

    if (ret) {
        return;
    }

    trace_multifd_new_send_channel_async_error(p->id, local_err);
    multifd_send_set_error(local_err);
    /*
     * For error cases (TLS or non-TLS), the IO channel is always freed
     * here rather than during multifd cleanup: since p->c is not set, the
     * multifd cleanup code doesn't even know of its existence.
     */
    object_unref(OBJECT(ioc));
    error_free(local_err);
}

static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
{
    if (!multifd_use_packets()) {
        return file_send_channel_create(opaque, errp);
    }

    socket_send_channel_create(multifd_new_send_channel_async, opaque);
    return true;
}

bool multifd_send_setup(void)
{
    MigrationState *s = migrate_get_current();
    Error *local_err = NULL;
    int thread_count, ret = 0;
    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    if (!migrate_multifd()) {
        return true;
    }

    thread_count = migrate_multifd_channels();
    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
    multifd_send_state->pages = multifd_pages_init(page_count);
    qemu_sem_init(&multifd_send_state->channels_created, 0);
    qemu_sem_init(&multifd_send_state->channels_ready, 0);
    qatomic_set(&multifd_send_state->exiting, 0);
    multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_sem_init(&p->sem, 0);
        qemu_sem_init(&p->sem_sync, 0);
        p->id = i;
        p->pages = multifd_pages_init(page_count);

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                          + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
            p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
            p->packet->version = cpu_to_be32(MULTIFD_VERSION);
        }
        p->name = g_strdup_printf("multifdsend_%d", i);
        p->page_size = qemu_target_page_size();
        p->page_count = page_count;
        p->write_flags = 0;

        if (!multifd_new_send_channel_create(p, &local_err)) {
            return false;
        }
    }

    /*
     * Wait until channel creation has started for all channels.  The
     * creation can still fail, but no more channels will be created
     * past this point.
     */
    for (i = 0; i < thread_count; i++) {
        qemu_sem_wait(&multifd_send_state->channels_created);
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        ret = multifd_send_state->ops->send_setup(p, &local_err);
        if (ret) {
            break;
        }
    }

    if (ret) {
        migrate_set_error(s, local_err);
        error_report_err(local_err);
        migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
                          MIGRATION_STATUS_FAILED);
        return false;
    }

    return true;
}

bool multifd_recv(void)
{
    int i;
    static int next_recv_channel;
    MultiFDRecvParams *p = NULL;
    MultiFDRecvData *data = multifd_recv_state->data;

    /*
     * next_recv_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_recv_channel %= migrate_multifd_channels();
    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_recv_should_exit()) {
            return false;
        }

        p = &multifd_recv_state->params[i];

        if (qatomic_read(&p->pending_job) == false) {
            next_recv_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Order the pending_job read before manipulating p->data below.  Pairs
     * with qatomic_store_release() at multifd_recv_thread().
     */
    smp_mb_acquire();

    assert(!p->data->size);
    multifd_recv_state->data = p->data;
    p->data = data;

    /*
     * Order the p->data update before setting pending_job.  Pairs with
     * qatomic_load_acquire() at multifd_recv_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}

MultiFDRecvData *multifd_get_recv_data(void)
{
    return multifd_recv_state->data;
}

static void multifd_recv_terminate_threads(Error *err)
{
    int i;

    trace_multifd_recv_terminate_threads(err != NULL);

    if (qatomic_xchg(&multifd_recv_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        /*
         * The migration thread and channels interact differently
         * depending on the presence of packets.
         */
        if (multifd_use_packets()) {
            /*
             * The channel receives as long as there are packets.  When
             * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
             * channel waits for the migration thread to sync.  If the
             * sync never happens, do it here.
             */
            qemu_sem_post(&p->sem_sync);
        } else {
            /*
             * The channel waits for the migration thread to give it
             * work.  When the migration thread runs out of work, it
             * releases the channel and waits for any pending work to
             * finish.  If we reach here (e.g. due to error) before the
             * work runs out, release the channel.
             */
            qemu_sem_post(&p->sem);
        }

        /*
         * We could arrive here for two reasons:
         *  - normal quit, i.e. everything went fine, just finished
         *  - error quit: we close the channels so the channel threads
         *    finish the qio_channel_read_all_eof()
         */
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }
}

void multifd_recv_shutdown(void)
{
    if (migrate_multifd()) {
        multifd_recv_terminate_threads(NULL);
    }
}

static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
{
    migration_ioc_unregister_yank(p->c);
    object_unref(OBJECT(p->c));
    p->c = NULL;
    qemu_mutex_destroy(&p->mutex);
    qemu_sem_destroy(&p->sem_sync);
    qemu_sem_destroy(&p->sem);
    g_free(p->name);
    p->name = NULL;
    p->packet_len = 0;
    g_free(p->packet);
    p->packet = NULL;
    g_free(p->normal);
    p->normal = NULL;
    g_free(p->zero);
    p->zero = NULL;
    multifd_recv_state->ops->recv_cleanup(p);
}

static void multifd_recv_cleanup_state(void)
{
    qemu_sem_destroy(&multifd_recv_state->sem_sync);
    g_free(multifd_recv_state->params);
    multifd_recv_state->params = NULL;
    g_free(multifd_recv_state->data);
    multifd_recv_state->data = NULL;
    g_free(multifd_recv_state);
    multifd_recv_state = NULL;
}

void multifd_recv_cleanup(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }
    multifd_recv_terminate_threads(NULL);
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
    }
    multifd_recv_cleanup_state();
}

void multifd_recv_sync_main(void)
{
    int thread_count = migrate_multifd_channels();
    bool file_based = !multifd_use_packets();
    int i;

    if (!migrate_multifd()) {
        return;
    }

    /*
     * File-based channels don't use packets and therefore need to
     * wait for more work.  Release them to start the sync.
     */
    if (file_based) {
        for (i = 0; i < thread_count; i++) {
            MultiFDRecvParams *p = &multifd_recv_state->params[i];

            trace_multifd_recv_sync_main_signal(p->id);
            qemu_sem_post(&p->sem);
        }
    }

    /*
     * Initiate the synchronization by waiting for all channels.
     *
     * For socket-based migration this means each channel has received
     * the SYNC packet on the stream.
     *
     * For file-based migration this means each channel is done with
     * the work (pending_job=false).
     */
    for (i = 0; i < thread_count; i++) {
        trace_multifd_recv_sync_main_wait(i);
        qemu_sem_wait(&multifd_recv_state->sem_sync);
    }

    if (file_based) {
        /*
         * For file-based migration, loading is done in one iteration.
         * We're done.
         */
        return;
    }

    /*
     * Sync done.  Release the channels for the next iteration.
     */
    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        WITH_QEMU_LOCK_GUARD(&p->mutex) {
            if (multifd_recv_state->packet_num < p->packet_num) {
                multifd_recv_state->packet_num = p->packet_num;
            }
        }
        trace_multifd_recv_sync_main_signal(p->id);
        qemu_sem_post(&p->sem_sync);
    }
    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
}

static void *multifd_recv_thread(void *opaque)
{
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int ret;

    trace_multifd_recv_thread_start(p->id);
    rcu_register_thread();

    while (true) {
        uint32_t flags = 0;
        bool has_data = false;
        p->normal_num = 0;

        if (use_packets) {
            if (multifd_recv_should_exit()) {
                break;
            }

            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
                                           p->packet_len, &local_err);
            if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
                break;
            }

            qemu_mutex_lock(&p->mutex);
            ret = multifd_recv_unfill_packet(p, &local_err);
            if (ret) {
                qemu_mutex_unlock(&p->mutex);
                break;
            }

            flags = p->flags;
            /* recv methods don't know how to handle the SYNC flag */
            p->flags &= ~MULTIFD_FLAG_SYNC;
            has_data = p->normal_num || p->zero_num;
            qemu_mutex_unlock(&p->mutex);
        } else {
            /*
             * No packets, so we need to wait for the vmstate code to
             * give us work.
             */
            qemu_sem_wait(&p->sem);

            if (multifd_recv_should_exit()) {
                break;
            }

            /* pairs with qatomic_store_release() at multifd_recv() */
            if (!qatomic_load_acquire(&p->pending_job)) {
                /*
                 * Migration thread did not send work, this is
                 * equivalent to pending_sync on the sending
                 * side.  Post sem_sync to notify we reached this
                 * point.
                 */
                qemu_sem_post(&multifd_recv_state->sem_sync);
                continue;
            }

            has_data = !!p->data->size;
        }

        if (has_data) {
            ret = multifd_recv_state->ops->recv(p, &local_err);
            if (ret != 0) {
                break;
            }
        }

        if (use_packets) {
            if (flags & MULTIFD_FLAG_SYNC) {
                qemu_sem_post(&multifd_recv_state->sem_sync);
                qemu_sem_wait(&p->sem_sync);
            }
        } else {
            p->total_normal_pages += p->data->size / qemu_target_page_size();
            p->data->size = 0;
            /*
             * Order the data->size update before clearing
             * pending_job.  Pairs with smp_mb_acquire() at
             * multifd_recv().
             */
            qatomic_store_release(&p->pending_job, false);
        }
    }

    if (local_err) {
        multifd_recv_terminate_threads(local_err);
        error_free(local_err);
    }

    rcu_unregister_thread();
    trace_multifd_recv_thread_end(p->id, p->packets_recved,
                                  p->total_normal_pages,
                                  p->total_zero_pages);

    return NULL;
}

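/*
 * Allocate and initialise the global receive state and the per-channel
 * parameters, including the compression-specific setup via the
 * recv_setup() hook.  Safe to call more than once; it is a no-op when
 * already initialised or when multifd is disabled.
 */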
int multifd_recv_setup(Error **errp)
{
    int thread_count;
    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    /*
     * Return successfully if the multifd recv state is already initialised
     * or multifd is not enabled.
     */
    if (multifd_recv_state || !migrate_multifd()) {
        return 0;
    }

    thread_count = migrate_multifd_channels();
    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);

    multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
    multifd_recv_state->data->size = 0;

    qatomic_set(&multifd_recv_state->count, 0);
    qatomic_set(&multifd_recv_state->exiting, 0);
    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
    multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        qemu_mutex_init(&p->mutex);
        qemu_sem_init(&p->sem_sync, 0);
        qemu_sem_init(&p->sem, 0);
        p->pending_job = false;
        p->id = i;

        p->data = g_new0(MultiFDRecvData, 1);
        p->data->size = 0;

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                          + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
        }
        p->name = g_strdup_printf("multifdrecv_%d", i);
        p->normal = g_new0(ram_addr_t, page_count);
        p->zero = g_new0(ram_addr_t, page_count);
        p->page_count = page_count;
        p->page_size = qemu_target_page_size();
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];
        int ret;

        ret = multifd_recv_state->ops->recv_setup(p, errp);
        if (ret) {
            return ret;
        }
    }
    return 0;
}

bool multifd_recv_all_channels_created(void)
{
    int thread_count = migrate_multifd_channels();

    if (!migrate_multifd()) {
        return true;
    }

    if (!multifd_recv_state) {
        /* Called before any connections created */
        return false;
    }

    return thread_count == qatomic_read(&multifd_recv_state->count);
}

/*
 * Try to receive all multifd channels to get ready for the migration.
 * Sets @errp when failing to receive the current channel.
 */
void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
{
    MultiFDRecvParams *p;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int id;

    if (use_packets) {
        id = multifd_recv_initial_packet(ioc, &local_err);
        if (id < 0) {
            multifd_recv_terminate_threads(local_err);
            error_propagate_prepend(errp, local_err,
                                    "failed to receive packet"
                                    " via multifd channel %d: ",
                                    qatomic_read(&multifd_recv_state->count));
            return;
        }
        trace_multifd_recv_new_channel(id);
    } else {
        id = qatomic_read(&multifd_recv_state->count);
    }

    p = &multifd_recv_state->params[id];
    if (p->c != NULL) {
        error_setg(&local_err, "multifd: received id '%d' already setup",
                   id);
        multifd_recv_terminate_threads(local_err);
        error_propagate(errp, local_err);
        return;
    }
    p->c = ioc;
    object_ref(OBJECT(ioc));

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                       QEMU_THREAD_JOINABLE);
    qatomic_inc(&multifd_recv_state->count);
}

bool multifd_send_prepare_common(MultiFDSendParams *p)
{
    multifd_send_zero_page_detect(p);

    if (!p->pages->normal_num) {
        p->next_packet_size = 0;
        return false;
    }

    multifd_send_prepare_header(p);

    return true;
}