/*
 * Multifd common code
 *
 * Copyright (c) 2019-2020 Red Hat Inc
 *
 * Authors:
 *  Juan Quintela <quintela@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */

#include "qemu/osdep.h"
#include "qemu/cutils.h"
#include "qemu/rcu.h"
#include "exec/target_page.h"
#include "sysemu/sysemu.h"
#include "exec/ramblock.h"
#include "qemu/error-report.h"
#include "qapi/error.h"
#include "fd.h"
#include "file.h"
#include "migration.h"
#include "migration-stats.h"
#include "socket.h"
#include "tls.h"
#include "qemu-file.h"
#include "trace.h"
#include "multifd.h"
#include "threadinfo.h"
#include "options.h"
#include "qemu/yank.h"
#include "io/channel-file.h"
#include "io/channel-socket.h"
#include "yank_functions.h"

/* Multiple fd's */

#define MULTIFD_MAGIC 0x11223344U
#define MULTIFD_VERSION 1

typedef struct {
    uint32_t magic;
    uint32_t version;
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;
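
/*
 * Each new send channel starts by transmitting one MultiFDInit_t
 * handshake message (see multifd_send_initial_packet() below).  The
 * magic/version pair lets the destination reject mismatched peers, the
 * uuid ties the channel to this particular VM's migration, and the id
 * tells the destination which channel slot the connection belongs to.
 */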

struct {
    MultiFDSendParams *params;
    /* array of pages to be sent */
    MultiFDPages_t *pages;
    /*
     * Global number of generated multifd packets.
     *
     * Note that we used 'uintptr_t' because it'll naturally support atomic
     * operations on both 32bit / 64 bits hosts.  It means on 32bit systems
     * multifd will overflow the packet_num more easily, but that should be
     * fine.
     *
     * Another option is to use QEMU's Stat64, which would be 64 bits on
     * all hosts; however, so far it does not support atomic fetch_add().
     * Make it easy for now.
     */
    uintptr_t packet_num;
    /*
     * Synchronization point past which no more channels will be
     * created.
     */
    QemuSemaphore channels_created;
    /* send channels ready */
    QemuSemaphore channels_ready;
    /*
     * Have we already run terminate threads?  There is a race where we
     * may get an error while we are exiting.
     * We will use atomic operations.  Only valid values are 0 and 1.
     */
    int exiting;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_send_state;

struct {
    MultiFDRecvParams *params;
    MultiFDRecvData *data;
    /* number of created threads */
    int count;
    /*
     * This is always posted by the recv threads, the migration thread
     * uses it to wait for recv threads to finish assigned tasks.
     */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    int exiting;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_recv_state;

static bool multifd_use_packets(void)
{
    return !migrate_mapped_ram();
}

void multifd_send_channel_created(void)
{
    qemu_sem_post(&multifd_send_state->channels_created);
}

static void multifd_set_file_bitmap(MultiFDSendParams *p)
{
    MultiFDPages_t *pages = p->pages;

    assert(pages->block);

    for (int i = 0; i < p->pages->normal_num; i++) {
        ramblock_set_file_bmap_atomic(pages->block, pages->offset[i], true);
    }

    /*
     * The zero page offsets are stored after the normal ones in
     * offset[]; clear their bits so zero pages are not written out.
     */
    for (int i = p->pages->normal_num; i < p->pages->num; i++) {
        ramblock_set_file_bmap_atomic(pages->block, pages->offset[i], false);
    }
}

/* Multifd without compression */

/**
 * nocomp_send_setup: setup send side
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
{
    if (migrate_zero_copy_send()) {
        p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
    }

    return 0;
}

/**
 * nocomp_send_cleanup: cleanup send side
 *
 * For no compression this function does nothing.
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
{
    return;
}

static void multifd_send_prepare_iovs(MultiFDSendParams *p)
{
    MultiFDPages_t *pages = p->pages;

    for (int i = 0; i < pages->normal_num; i++) {
        p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
        p->iov[p->iovs_num].iov_len = p->page_size;
        p->iovs_num++;
    }

    p->next_packet_size = pages->normal_num * p->page_size;
}

/**
 * nocomp_send_prepare: prepare data to be able to send
 *
 * For no compression we just have to calculate the size of the
 * packet.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
{
    bool use_zero_copy_send = migrate_zero_copy_send();
    int ret;

    multifd_send_zero_page_detect(p);

    if (!multifd_use_packets()) {
        multifd_send_prepare_iovs(p);
        multifd_set_file_bitmap(p);

        return 0;
    }

    if (!use_zero_copy_send) {
        /*
         * Only !zerocopy needs the header in IOV; zerocopy will
         * send it separately.
         */
        multifd_send_prepare_header(p);
    }

    multifd_send_prepare_iovs(p);
    p->flags |= MULTIFD_FLAG_NOCOMP;

    multifd_send_fill_packet(p);

    if (use_zero_copy_send) {
        /* Send header first, without zerocopy */
        ret = qio_channel_write_all(p->c, (void *)p->packet,
                                    p->packet_len, errp);
        if (ret != 0) {
            return -1;
        }
    }

    return 0;
}

/**
 * nocomp_recv_setup: setup receive side
 *
 * For no compression this function does nothing.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
{
    return 0;
}

/**
 * nocomp_recv_cleanup: cleanup receive side
 *
 * For no compression this function does nothing.
 *
 * @p: Params for the channel that we are using
 */
static void nocomp_recv_cleanup(MultiFDRecvParams *p)
{
}

/**
 * nocomp_recv: read the data from the channel
 *
 * For no compression we just need to read things into the correct place.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_recv(MultiFDRecvParams *p, Error **errp)
{
    uint32_t flags;

    if (!multifd_use_packets()) {
        return multifd_file_recv_data(p, errp);
    }

    flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;

    if (flags != MULTIFD_FLAG_NOCOMP) {
        error_setg(errp, "multifd %u: flags received %x flags expected %x",
                   p->id, flags, MULTIFD_FLAG_NOCOMP);
        return -1;
    }

    multifd_recv_zero_page_process(p);

    if (!p->normal_num) {
        return 0;
    }

    for (int i = 0; i < p->normal_num; i++) {
        p->iov[i].iov_base = p->host + p->normal[i];
        p->iov[i].iov_len = p->page_size;
    }
    return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
}

static MultiFDMethods multifd_nocomp_ops = {
    .send_setup = nocomp_send_setup,
    .send_cleanup = nocomp_send_cleanup,
    .send_prepare = nocomp_send_prepare,
    .recv_setup = nocomp_recv_setup,
    .recv_cleanup = nocomp_recv_cleanup,
    .recv = nocomp_recv
};

static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
    [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
};

void multifd_register_ops(int method, MultiFDMethods *ops)
{
    assert(0 < method && method < MULTIFD_COMPRESSION__MAX);
    multifd_ops[method] = ops;
}
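
/*
 * Compression backends install their hooks via multifd_register_ops()
 * at module-init time.  A minimal sketch of what such a registration
 * looks like, modeled on the zlib backend (the zlib_* hook names here
 * are illustrative; see multifd-zlib.c for the real ones):
 *
 *   static MultiFDMethods multifd_zlib_ops = {
 *       .send_setup = zlib_send_setup,
 *       .send_cleanup = zlib_send_cleanup,
 *       .send_prepare = zlib_send_prepare,
 *       .recv_setup = zlib_recv_setup,
 *       .recv_cleanup = zlib_recv_cleanup,
 *       .recv = zlib_recv,
 *   };
 *
 *   static void multifd_zlib_register(void)
 *   {
 *       multifd_register_ops(MULTIFD_COMPRESSION_ZLIB, &multifd_zlib_ops);
 *   }
 *
 *   migration_init(multifd_zlib_register);
 */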

/* Reset a MultiFDPages_t* object for the next use */
static void multifd_pages_reset(MultiFDPages_t *pages)
{
    /*
     * We don't need to touch offset[] array, because it will be
     * overwritten later when reused.
     */
    pages->num = 0;
    pages->normal_num = 0;
    pages->block = NULL;
}

static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
{
    MultiFDInit_t msg = {};
    size_t size = sizeof(msg);
    int ret;

    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
    msg.version = cpu_to_be32(MULTIFD_VERSION);
    msg.id = p->id;
    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));

    ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
    if (ret != 0) {
        return -1;
    }
    stat64_add(&mig_stats.multifd_bytes, size);
    return 0;
}

static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
{
    MultiFDInit_t msg;
    int ret;

    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
    if (ret != 0) {
        return -1;
    }

    msg.magic = be32_to_cpu(msg.magic);
    msg.version = be32_to_cpu(msg.version);

    if (msg.magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet magic %x "
                   "expected %x", msg.magic, MULTIFD_MAGIC);
        return -1;
    }

    if (msg.version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet version %u "
                   "expected %u", msg.version, MULTIFD_VERSION);
        return -1;
    }

    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
        char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);

        error_setg(errp, "multifd: received uuid '%s' and expected "
                   "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
        g_free(uuid);
        g_free(msg_uuid);
        return -1;
    }

    /* A valid id indexes params[], so id == channel count is also bogus */
    if (msg.id >= migrate_multifd_channels()) {
        error_setg(errp, "multifd: received channel id %u is out of range "
                   "(number of channels %u)", msg.id,
                   migrate_multifd_channels());
        return -1;
    }

    return msg.id;
}

static MultiFDPages_t *multifd_pages_init(uint32_t n)
{
    MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);

    pages->allocated = n;
    pages->offset = g_new0(ram_addr_t, n);

    return pages;
}

static void multifd_pages_clear(MultiFDPages_t *pages)
{
    multifd_pages_reset(pages);
    pages->allocated = 0;
    g_free(pages->offset);
    pages->offset = NULL;
    g_free(pages);
}

void multifd_send_fill_packet(MultiFDSendParams *p)
{
    MultiFDPacket_t *packet = p->packet;
    MultiFDPages_t *pages = p->pages;
    uint64_t packet_num;
    uint32_t zero_num = pages->num - pages->normal_num;
    int i;

    packet->flags = cpu_to_be32(p->flags);
    packet->pages_alloc = cpu_to_be32(p->pages->allocated);
    packet->normal_pages = cpu_to_be32(pages->normal_num);
    packet->zero_pages = cpu_to_be32(zero_num);
    packet->next_packet_size = cpu_to_be32(p->next_packet_size);

    packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
    packet->packet_num = cpu_to_be64(packet_num);

    if (pages->block) {
        strncpy(packet->ramblock, pages->block->idstr, 256);
    }

    for (i = 0; i < pages->num; i++) {
        /* there are architectures where ram_addr_t is 32 bit */
        uint64_t temp = pages->offset[i];

        packet->offset[i] = cpu_to_be64(temp);
    }

    p->packets_sent++;
    p->total_normal_pages += pages->normal_num;
    p->total_zero_pages += zero_num;

    trace_multifd_send(p->id, packet_num, pages->normal_num, zero_num,
                       p->flags, p->next_packet_size);
}
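
/*
 * For reference, the rough on-wire shape of the packet that the code
 * above fills in and multifd_recv_unfill_packet() parses back.  The
 * MultiFDPacket_t layout itself is defined in multifd.h; all
 * multi-byte fields are big-endian on the wire:
 *
 *   magic, version, flags
 *   pages_alloc, normal_pages, zero_pages
 *   next_packet_size, packet_num
 *   ramblock idstr (256 bytes)
 *   offset[0 .. normal_pages-1]      offsets of normal pages
 *   offset[normal_pages .. num-1]    offsets of zero pages
 *
 * The page payload (next_packet_size bytes) follows the header on the
 * same channel.
 */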

static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
{
    MultiFDPacket_t *packet = p->packet;
    int i;

    packet->magic = be32_to_cpu(packet->magic);
    if (packet->magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet "
                   "magic %x and expected magic %x",
                   packet->magic, MULTIFD_MAGIC);
        return -1;
    }

    packet->version = be32_to_cpu(packet->version);
    if (packet->version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet "
                   "version %u and expected version %u",
                   packet->version, MULTIFD_VERSION);
        return -1;
    }

    p->flags = be32_to_cpu(packet->flags);

    packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
    /*
     * If the packet claims more pages than this channel was set up to
     * carry, just stop migration.
     */
    if (packet->pages_alloc > p->page_count) {
        error_setg(errp, "multifd: received packet "
                   "with size %u and expected a size of %u",
                   packet->pages_alloc, p->page_count);
        return -1;
    }

    p->normal_num = be32_to_cpu(packet->normal_pages);
    if (p->normal_num > packet->pages_alloc) {
        error_setg(errp, "multifd: received packet "
                   "with %u normal pages, expected maximum %u",
                   p->normal_num, packet->pages_alloc);
        return -1;
    }

    p->zero_num = be32_to_cpu(packet->zero_pages);
    if (p->zero_num > packet->pages_alloc - p->normal_num) {
        error_setg(errp, "multifd: received packet "
                   "with %u zero pages, expected maximum %u",
                   p->zero_num, packet->pages_alloc - p->normal_num);
        return -1;
    }

    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
    p->packet_num = be64_to_cpu(packet->packet_num);
    p->packets_recved++;
    p->total_normal_pages += p->normal_num;
    p->total_zero_pages += p->zero_num;

    trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->zero_num,
                       p->flags, p->next_packet_size);

    if (p->normal_num == 0 && p->zero_num == 0) {
        return 0;
    }

    /* make sure that ramblock is 0 terminated */
    packet->ramblock[255] = 0;
    p->block = qemu_ram_block_by_name(packet->ramblock);
    if (!p->block) {
        error_setg(errp, "multifd: unknown ram block %s",
                   packet->ramblock);
        return -1;
    }

    p->host = p->block->host;
    for (i = 0; i < p->normal_num; i++) {
        uint64_t offset = be64_to_cpu(packet->offset[i]);

        if (offset > (p->block->used_length - p->page_size)) {
            error_setg(errp, "multifd: offset too long %" PRIu64
                       " (max " RAM_ADDR_FMT ")",
                       offset, p->block->used_length);
            return -1;
        }
        p->normal[i] = offset;
    }

    for (i = 0; i < p->zero_num; i++) {
        uint64_t offset = be64_to_cpu(packet->offset[p->normal_num + i]);

        if (offset > (p->block->used_length - p->page_size)) {
            error_setg(errp, "multifd: offset too long %" PRIu64
                       " (max " RAM_ADDR_FMT ")",
                       offset, p->block->used_length);
            return -1;
        }
        p->zero[i] = offset;
    }

    return 0;
}

static bool multifd_send_should_exit(void)
{
    return qatomic_read(&multifd_send_state->exiting);
}

static bool multifd_recv_should_exit(void)
{
    return qatomic_read(&multifd_recv_state->exiting);
}

/*
 * The migration thread can wait on either of the two semaphores.  This
 * function can be used to kick the main thread out of waiting on either of
 * them.  Should mostly only be called when something wrong happened with
 * the current multifd send thread.
 */
static void multifd_send_kick_main(MultiFDSendParams *p)
{
    qemu_sem_post(&p->sem_sync);
    qemu_sem_post(&multifd_send_state->channels_ready);
}

/*
 * How do we use multifd_send_state->pages and channel->pages?
 *
 * We create a 'pages' struct for each channel, and a main one.  Each
 * time that we need to send a batch of pages we interchange the ones
 * between multifd_send_state and the channel that is sending it.  There
 * are two reasons for that:
 *    - to not have to do so many mallocs during migration
 *    - to make it easier to know what to free at the end of migration
 *
 * This way we always know who is the owner of each "pages" struct,
 * and we don't need any locking.  It belongs to the migration thread
 * or to the channel thread.  Switching is safe because the migration
 * thread is using the channel mutex when changing it, and the channel
 * has to have finished with its own, otherwise pending_job can't be
 * false.
 *
 * Returns true on success, false otherwise.
 */
static bool multifd_send_pages(void)
{
    int i;
    static int next_channel;
    MultiFDSendParams *p = NULL; /* make gcc happy */
    MultiFDPages_t *pages = multifd_send_state->pages;

    if (multifd_send_should_exit()) {
        return false;
    }

    /* We wait here, until at least one channel is ready */
    qemu_sem_wait(&multifd_send_state->channels_ready);

    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_channel %= migrate_multifd_channels();
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_send_should_exit()) {
            return false;
        }
        p = &multifd_send_state->params[i];
        /*
         * Lockless read to p->pending_job is safe, because only multifd
         * sender thread can clear it.
         */
        if (qatomic_read(&p->pending_job) == false) {
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Make sure we read p->pending_job before all the rest.  Pairs with
     * qatomic_store_release() in multifd_send_thread().
     */
    smp_mb_acquire();
    assert(!p->pages->num);
    multifd_send_state->pages = p->pages;
    p->pages = pages;
    /*
     * Making sure p->pages is setup before marking pending_job=true.  Pairs
     * with the qatomic_load_acquire() in multifd_send_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}

static inline bool multifd_queue_empty(MultiFDPages_t *pages)
{
    return pages->num == 0;
}

static inline bool multifd_queue_full(MultiFDPages_t *pages)
{
    return pages->num == pages->allocated;
}

static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset)
{
    pages->offset[pages->num++] = offset;
}
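
/*
 * A rough sketch of the producer side, as seen from the RAM save path
 * (the surrounding loop and error handling here are illustrative, not
 * the actual ram.c code):
 *
 *   // for each dirty page found in a ramblock:
 *   if (!multifd_queue_page(block, offset)) {
 *       return -1;                     // a send channel hit an error
 *   }
 *   ...
 *   // once per iteration, flush and sync all channels:
 *   if (multifd_send_sync_main() < 0) {
 *       return -1;
 *   }
 */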

/* Returns true if enqueue successful, false otherwise */
bool multifd_queue_page(RAMBlock *block, ram_addr_t offset)
{
    MultiFDPages_t *pages;

retry:
    pages = multifd_send_state->pages;

    /* If the queue is empty, we can already enqueue now */
    if (multifd_queue_empty(pages)) {
        pages->block = block;
        multifd_enqueue(pages, offset);
        return true;
    }

    /*
     * Not empty, meanwhile we need a flush.  It can be because of either:
     *
     * (1) The page is not on the same ramblock as previous ones, or,
     * (2) The queue is full.
     *
     * After flush, always retry.
     */
    if (pages->block != block || multifd_queue_full(pages)) {
        if (!multifd_send_pages()) {
            return false;
        }
        goto retry;
    }

    /* Not empty, and we still have space, do it! */
    multifd_enqueue(pages, offset);
    return true;
}

/* Multifd send side hit an error; remember it and prepare to quit */
static void multifd_send_set_error(Error *err)
{
    /*
     * We don't want to exit the threads twice.  Depending on where
     * we get the error, or if there are two independent errors in two
     * threads at the same time, we can end up calling this function
     * twice.
     */
    if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
            s->state == MIGRATION_STATUS_DEVICE ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }
}

static void multifd_send_terminate_threads(void)
{
    int i;

    trace_multifd_send_terminate_threads();

    /*
     * Tell everyone we're quitting.  No xchg() needed here; we simply
     * always set it.
     */
    qatomic_set(&multifd_send_state->exiting, 1);

    /*
     * Firstly, kick all threads out; no matter whether they are just idle,
     * or blocked in an IO system call.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_sem_post(&p->sem);
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }

    /*
     * Finally recycle all the threads.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (p->tls_thread_created) {
            qemu_thread_join(&p->tls_thread);
        }

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
}

static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
{
    if (p->c) {
        migration_ioc_unregister_yank(p->c);
        /*
         * The object_unref() cannot guarantee the fd will always be
         * released because finalize() of the iochannel is only
         * triggered on the last reference and it's not guaranteed
         * that we always hold the last refcount when reaching here.
         *
         * Closing the fd explicitly has the benefit that if there is any
         * registered I/O handler callbacks on such fd, that will get a
         * POLLNVAL event and will further trigger the cleanup to finally
         * release the IOC.
         *
         * FIXME: It should logically be guaranteed that all multifd
         * channels have no I/O handler callback registered when reaching
         * here, because migration thread will wait for all multifd channel
         * establishments to complete during setup.  Since
         * migrate_fd_cleanup() will be scheduled in main thread too, all
         * previous callbacks should guarantee to be completed when
         * reaching here.  See multifd_send_state.channels_created and its
         * usage.  In the future, we could replace this with an assert
         * making sure we're the last reference, or simply drop it if above
         * is more clear to be justified.
         */
        qio_channel_close(p->c, &error_abort);
        object_unref(OBJECT(p->c));
        p->c = NULL;
    }
    qemu_sem_destroy(&p->sem);
    qemu_sem_destroy(&p->sem_sync);
    g_free(p->name);
    p->name = NULL;
    multifd_pages_clear(p->pages);
    p->pages = NULL;
    p->packet_len = 0;
    g_free(p->packet);
    p->packet = NULL;
    g_free(p->iov);
    p->iov = NULL;
    multifd_send_state->ops->send_cleanup(p, errp);

    return *errp == NULL;
}

static void multifd_send_cleanup_state(void)
{
    file_cleanup_outgoing_migration();
    fd_cleanup_outgoing_migration();
    socket_cleanup_outgoing_migration();
    qemu_sem_destroy(&multifd_send_state->channels_created);
    qemu_sem_destroy(&multifd_send_state->channels_ready);
    g_free(multifd_send_state->params);
    multifd_send_state->params = NULL;
    multifd_pages_clear(multifd_send_state->pages);
    multifd_send_state->pages = NULL;
    g_free(multifd_send_state);
    multifd_send_state = NULL;
}

void multifd_send_shutdown(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }

    multifd_send_terminate_threads();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        if (!multifd_send_cleanup_channel(p, &local_err)) {
            migrate_set_error(migrate_get_current(), local_err);
            error_free(local_err);
        }
    }

    multifd_send_cleanup_state();
}

static int multifd_zero_copy_flush(QIOChannel *c)
{
    int ret;
    Error *err = NULL;

    ret = qio_channel_flush(c, &err);
    if (ret < 0) {
        error_report_err(err);
        return -1;
    }
    if (ret == 1) {
        stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
    }

    return ret;
}

int multifd_send_sync_main(void)
{
    int i;
    bool flush_zero_copy;

    if (!migrate_multifd()) {
        return 0;
    }
    if (multifd_send_state->pages->num) {
        if (!multifd_send_pages()) {
            error_report("%s: multifd_send_pages fail", __func__);
            return -1;
        }
    }

    flush_zero_copy = migrate_zero_copy_send();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        trace_multifd_send_sync_main_signal(p->id);

        /*
         * We should be the only user so far, so it's not possible for
         * it to be set by others concurrently.
         */
        assert(qatomic_read(&p->pending_sync) == false);
        qatomic_set(&p->pending_sync, true);
        qemu_sem_post(&p->sem);
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        qemu_sem_wait(&multifd_send_state->channels_ready);
        trace_multifd_send_sync_main_wait(p->id);
        qemu_sem_wait(&p->sem_sync);

        if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
            return -1;
        }
    }
    trace_multifd_send_sync_main(multifd_send_state->packet_num);

    return 0;
}
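
/*
 * The per-channel sender loop.  Each iteration blocks on p->sem until
 * the migration thread posts work: either a batch of pages to write
 * out (pending_job) or a sync request (pending_sync, which becomes a
 * MULTIFD_FLAG_SYNC packet on packet-based channels).  On error the
 * thread records it and kicks the main thread before exiting.
 */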
static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    MigrationThread *thread = NULL;
    Error *local_err = NULL;
    int ret = 0;
    bool use_packets = multifd_use_packets();

    thread = migration_threads_add(p->name, qemu_get_thread_id());

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (use_packets) {
        if (multifd_send_initial_packet(p, &local_err) < 0) {
            ret = -1;
            goto out;
        }
    }

    while (true) {
        qemu_sem_post(&multifd_send_state->channels_ready);
        qemu_sem_wait(&p->sem);

        if (multifd_send_should_exit()) {
            break;
        }

        /*
         * Read pending_job flag before p->pages.  Pairs with the
         * qatomic_store_release() in multifd_send_pages().
         */
        if (qatomic_load_acquire(&p->pending_job)) {
            MultiFDPages_t *pages = p->pages;

            p->iovs_num = 0;
            assert(pages->num);

            ret = multifd_send_state->ops->send_prepare(p, &local_err);
            if (ret != 0) {
                break;
            }

            if (migrate_mapped_ram()) {
                ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
                                              p->pages->block, &local_err);
            } else {
                ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
                                                  NULL, 0, p->write_flags,
                                                  &local_err);
            }

            if (ret != 0) {
                break;
            }

            stat64_add(&mig_stats.multifd_bytes,
                       p->next_packet_size + p->packet_len);
            stat64_add(&mig_stats.normal_pages, pages->normal_num);
            stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num);

            multifd_pages_reset(p->pages);
            p->next_packet_size = 0;

            /*
             * Making sure p->pages is published before saying "we're
             * free".  Pairs with the smp_mb_acquire() in
             * multifd_send_pages().
             */
            qatomic_store_release(&p->pending_job, false);
        } else {
            /*
             * If not a normal job, must be a sync request.  Note that
             * pending_sync is a standalone flag (unlike pending_job), so
             * it doesn't require explicit memory barriers.
             */
            assert(qatomic_read(&p->pending_sync));

            if (use_packets) {
                p->flags = MULTIFD_FLAG_SYNC;
                multifd_send_fill_packet(p);
                ret = qio_channel_write_all(p->c, (void *)p->packet,
                                            p->packet_len, &local_err);
                if (ret != 0) {
                    break;
                }
                /* p->next_packet_size will always be zero for a SYNC packet */
                stat64_add(&mig_stats.multifd_bytes, p->packet_len);
                p->flags = 0;
            }

            qatomic_set(&p->pending_sync, false);
            qemu_sem_post(&p->sem_sync);
        }
    }

out:
    if (ret) {
        assert(local_err);
        trace_multifd_send_error(p->id);
        multifd_send_set_error(local_err);
        multifd_send_kick_main(p);
        error_free(local_err);
    }

    rcu_unregister_thread();
    migration_threads_remove(thread);
    trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages,
                                  p->total_zero_pages);

    return NULL;
}

static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);

typedef struct {
    MultiFDSendParams *p;
    QIOChannelTLS *tioc;
} MultiFDTLSThreadArgs;

static void *multifd_tls_handshake_thread(void *opaque)
{
    MultiFDTLSThreadArgs *args = opaque;

    qio_channel_tls_handshake(args->tioc,
                              multifd_new_send_channel_async,
                              args->p,
                              NULL,
                              NULL);
    g_free(args);

    return NULL;
}

static bool multifd_tls_channel_connect(MultiFDSendParams *p,
                                        QIOChannel *ioc,
                                        Error **errp)
{
    MigrationState *s = migrate_get_current();
    const char *hostname = s->hostname;
    MultiFDTLSThreadArgs *args;
    QIOChannelTLS *tioc;

    tioc = migration_tls_client_create(ioc, hostname, errp);
    if (!tioc) {
        return false;
    }

    /*
     * Ownership of the socket channel now transfers to the newly
     * created TLS channel, which has already taken a reference.
     */
    object_unref(OBJECT(ioc));
    trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
    qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");

    args = g_new0(MultiFDTLSThreadArgs, 1);
    args->tioc = tioc;
    args->p = p;

    p->tls_thread_created = true;
    qemu_thread_create(&p->tls_thread, "multifd-tls-handshake-worker",
                       multifd_tls_handshake_thread, args,
                       QEMU_THREAD_JOINABLE);
    return true;
}

void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
{
    qio_channel_set_delay(ioc, false);

    migration_ioc_register_yank(ioc);
    /* Setup p->c only if the channel is completely setup */
    p->c = ioc;

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
                       QEMU_THREAD_JOINABLE);
}

/*
 * When TLS is enabled this function is called once to establish the
 * TLS connection and a second time after the TLS handshake to create
 * the multifd channel.  Without TLS it goes straight into the channel
 * creation.
 */
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
    Error *local_err = NULL;
    bool ret;

    trace_multifd_new_send_channel_async(p->id);

    if (qio_task_propagate_error(task, &local_err)) {
        ret = false;
        goto out;
    }

    trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
                                       migrate_get_current()->hostname);

    if (migrate_channel_requires_tls_upgrade(ioc)) {
        ret = multifd_tls_channel_connect(p, ioc, &local_err);
        if (ret) {
            return;
        }
    } else {
        multifd_channel_connect(p, ioc);
        ret = true;
    }

out:
    /*
     * Here we're not interested in whether creation succeeded, only
     * that it was attempted at all.
     */
    multifd_send_channel_created();

    if (ret) {
        return;
    }

    trace_multifd_new_send_channel_async_error(p->id, local_err);
    multifd_send_set_error(local_err);
    /*
     * For error cases (TLS or non-TLS), the IO channel is always freed
     * here rather than during multifd cleanup: since p->c is not set,
     * the multifd cleanup code doesn't even know of its existence.
     */
    object_unref(OBJECT(ioc));
    error_free(local_err);
}

static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
{
    if (!multifd_use_packets()) {
        return file_send_channel_create(opaque, errp);
    }

    socket_send_channel_create(multifd_new_send_channel_async, opaque);
    return true;
}

bool multifd_send_setup(void)
{
    MigrationState *s = migrate_get_current();
    Error *local_err = NULL;
    int thread_count, ret = 0;
    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    if (!migrate_multifd()) {
        return true;
    }

    thread_count = migrate_multifd_channels();
    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
    multifd_send_state->pages = multifd_pages_init(page_count);
    qemu_sem_init(&multifd_send_state->channels_created, 0);
    qemu_sem_init(&multifd_send_state->channels_ready, 0);
    qatomic_set(&multifd_send_state->exiting, 0);
    multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_sem_init(&p->sem, 0);
        qemu_sem_init(&p->sem_sync, 0);
        p->id = i;
        p->pages = multifd_pages_init(page_count);

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                          + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
            p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
            p->packet->version = cpu_to_be32(MULTIFD_VERSION);

            /* We need one extra place for the packet header */
            p->iov = g_new0(struct iovec, page_count + 1);
        } else {
            p->iov = g_new0(struct iovec, page_count);
        }
        p->name = g_strdup_printf("multifdsend_%d", i);
        p->page_size = qemu_target_page_size();
        p->page_count = page_count;
        p->write_flags = 0;

        if (!multifd_new_send_channel_create(p, &local_err)) {
            return false;
        }
    }

    /*
     * Wait until channel creation has started for all channels.
     * The creation can still fail, but no more channels will be created
     * past this point.
     */
    for (i = 0; i < thread_count; i++) {
        qemu_sem_wait(&multifd_send_state->channels_created);
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        ret = multifd_send_state->ops->send_setup(p, &local_err);
        if (ret) {
            break;
        }
    }

    if (ret) {
        migrate_set_error(s, local_err);
        error_report_err(local_err);
        migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
                          MIGRATION_STATUS_FAILED);
        return false;
    }

    return true;
}

bool multifd_recv(void)
{
    int i;
    static int next_recv_channel;
    MultiFDRecvParams *p = NULL;
    MultiFDRecvData *data = multifd_recv_state->data;

    /*
     * next_recv_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_recv_channel %= migrate_multifd_channels();
    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_recv_should_exit()) {
            return false;
        }

        p = &multifd_recv_state->params[i];

        if (qatomic_read(&p->pending_job) == false) {
            next_recv_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Order pending_job read before manipulating p->data below.  Pairs
     * with qatomic_store_release() at multifd_recv_thread().
     */
    smp_mb_acquire();

    assert(!p->data->size);
    multifd_recv_state->data = p->data;
    p->data = data;

    /*
     * Order p->data update before setting pending_job.  Pairs with
     * qatomic_load_acquire() at multifd_recv_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}

MultiFDRecvData *multifd_get_recv_data(void)
{
    return multifd_recv_state->data;
}

static void multifd_recv_terminate_threads(Error *err)
{
    int i;

    trace_multifd_recv_terminate_threads(err != NULL);

    if (qatomic_xchg(&multifd_recv_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        /*
         * The migration thread and channels interact differently
         * depending on the presence of packets.
         */
        if (multifd_use_packets()) {
            /*
             * The channel receives as long as there are packets.  When
             * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
             * channel waits for the migration thread to sync.  If the
             * sync never happens, do it here.
             */
            qemu_sem_post(&p->sem_sync);
        } else {
            /*
             * The channel waits for the migration thread to give it
             * work.  When the migration thread runs out of work, it
             * releases the channel and waits for any pending work to
             * finish.  If we reach here (e.g. due to error) before the
             * work runs out, release the channel.
             */
            qemu_sem_post(&p->sem);
        }

        /*
         * We could arrive here for two reasons:
         *  - normal quit, i.e. everything went fine, just finished
         *  - error quit: we close the channels so the channel threads
         *    finish the qio_channel_read_all_eof()
         */
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }
}

void multifd_recv_shutdown(void)
{
    if (migrate_multifd()) {
        multifd_recv_terminate_threads(NULL);
    }
}

static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
{
    migration_ioc_unregister_yank(p->c);
    object_unref(OBJECT(p->c));
    p->c = NULL;
    qemu_mutex_destroy(&p->mutex);
    qemu_sem_destroy(&p->sem_sync);
    qemu_sem_destroy(&p->sem);
    g_free(p->name);
    p->name = NULL;
    p->packet_len = 0;
    g_free(p->packet);
    p->packet = NULL;
    g_free(p->iov);
    p->iov = NULL;
    g_free(p->normal);
    p->normal = NULL;
    g_free(p->zero);
    p->zero = NULL;
    multifd_recv_state->ops->recv_cleanup(p);
}

static void multifd_recv_cleanup_state(void)
{
    qemu_sem_destroy(&multifd_recv_state->sem_sync);
    g_free(multifd_recv_state->params);
    multifd_recv_state->params = NULL;
    g_free(multifd_recv_state->data);
    multifd_recv_state->data = NULL;
    g_free(multifd_recv_state);
    multifd_recv_state = NULL;
}

void multifd_recv_cleanup(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }
    multifd_recv_terminate_threads(NULL);
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
    }
    multifd_recv_cleanup_state();
}
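
/*
 * Called by the migration thread on the destination to synchronize
 * with all recv channels, matching multifd_send_sync_main() on the
 * source side.
 */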
void multifd_recv_sync_main(void)
{
    int thread_count = migrate_multifd_channels();
    bool file_based = !multifd_use_packets();
    int i;

    if (!migrate_multifd()) {
        return;
    }

    /*
     * File-based channels don't use packets and therefore need to
     * wait for more work.  Release them to start the sync.
     */
    if (file_based) {
        for (i = 0; i < thread_count; i++) {
            MultiFDRecvParams *p = &multifd_recv_state->params[i];

            trace_multifd_recv_sync_main_signal(p->id);
            qemu_sem_post(&p->sem);
        }
    }

    /*
     * Initiate the synchronization by waiting for all channels.
     *
     * For socket-based migration this means each channel has received
     * the SYNC packet on the stream.
     *
     * For file-based migration this means each channel is done with
     * the work (pending_job=false).
     */
    for (i = 0; i < thread_count; i++) {
        trace_multifd_recv_sync_main_wait(i);
        qemu_sem_wait(&multifd_recv_state->sem_sync);
    }

    if (file_based) {
        /*
         * For file-based migration, loading is done in one iteration.
         * We're done.
         */
        return;
    }

    /*
     * Sync done.  Release the channels for the next iteration.
     */
    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        WITH_QEMU_LOCK_GUARD(&p->mutex) {
            if (multifd_recv_state->packet_num < p->packet_num) {
                multifd_recv_state->packet_num = p->packet_num;
            }
        }
        trace_multifd_recv_sync_main_signal(p->id);
        qemu_sem_post(&p->sem_sync);
    }
    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
}
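
/*
 * The per-channel receiver loop.  With packets it reads one header at
 * a time and dispatches the payload to the recv method; without
 * packets (mapped-ram) it blocks on p->sem until the migration thread
 * hands it a chunk of work through p->data.
 */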
static void *multifd_recv_thread(void *opaque)
{
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int ret;

    trace_multifd_recv_thread_start(p->id);
    rcu_register_thread();

    while (true) {
        uint32_t flags = 0;
        bool has_data = false;
        p->normal_num = 0;

        if (use_packets) {
            if (multifd_recv_should_exit()) {
                break;
            }

            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
                                           p->packet_len, &local_err);
            if (ret == 0 || ret == -1) {   /* 0: EOF, -1: error */
                break;
            }

            qemu_mutex_lock(&p->mutex);
            ret = multifd_recv_unfill_packet(p, &local_err);
            if (ret) {
                qemu_mutex_unlock(&p->mutex);
                break;
            }

            flags = p->flags;
            /* recv methods don't know how to handle the SYNC flag */
            p->flags &= ~MULTIFD_FLAG_SYNC;
            has_data = p->normal_num || p->zero_num;
            qemu_mutex_unlock(&p->mutex);
        } else {
            /*
             * No packets, so we need to wait for the vmstate code to
             * give us work.
             */
            qemu_sem_wait(&p->sem);

            if (multifd_recv_should_exit()) {
                break;
            }

            /* pairs with qatomic_store_release() at multifd_recv() */
            if (!qatomic_load_acquire(&p->pending_job)) {
                /*
                 * Migration thread did not send work, this is
                 * equivalent to pending_sync on the sending
                 * side.  Post sem_sync to notify we reached this
                 * point.
                 */
                qemu_sem_post(&multifd_recv_state->sem_sync);
                continue;
            }

            has_data = !!p->data->size;
        }

        if (has_data) {
            ret = multifd_recv_state->ops->recv(p, &local_err);
            if (ret != 0) {
                break;
            }
        }

        if (use_packets) {
            if (flags & MULTIFD_FLAG_SYNC) {
                qemu_sem_post(&multifd_recv_state->sem_sync);
                qemu_sem_wait(&p->sem_sync);
            }
        } else {
            p->total_normal_pages += p->data->size / qemu_target_page_size();
            p->data->size = 0;
            /*
             * Order data->size update before clearing
             * pending_job.  Pairs with smp_mb_acquire() at
             * multifd_recv().
             */
            qatomic_store_release(&p->pending_job, false);
        }
    }

    if (local_err) {
        multifd_recv_terminate_threads(local_err);
        error_free(local_err);
    }

    rcu_unregister_thread();
    trace_multifd_recv_thread_end(p->id, p->packets_recved,
                                  p->total_normal_pages,
                                  p->total_zero_pages);

    return NULL;
}

int multifd_recv_setup(Error **errp)
{
    int thread_count;
    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    /*
     * Return successfully if multiFD recv state is already initialised
     * or multiFD is not enabled.
     */
    if (multifd_recv_state || !migrate_multifd()) {
        return 0;
    }

    thread_count = migrate_multifd_channels();
    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);

    multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
    multifd_recv_state->data->size = 0;

    qatomic_set(&multifd_recv_state->count, 0);
    qatomic_set(&multifd_recv_state->exiting, 0);
    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
    multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        qemu_mutex_init(&p->mutex);
        qemu_sem_init(&p->sem_sync, 0);
        qemu_sem_init(&p->sem, 0);
        p->pending_job = false;
        p->id = i;

        p->data = g_new0(MultiFDRecvData, 1);
        p->data->size = 0;

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                          + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
        }
        p->name = g_strdup_printf("multifdrecv_%d", i);
        p->iov = g_new0(struct iovec, page_count);
        p->normal = g_new0(ram_addr_t, page_count);
        p->zero = g_new0(ram_addr_t, page_count);
        p->page_count = page_count;
        p->page_size = qemu_target_page_size();
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];
        int ret;

        ret = multifd_recv_state->ops->recv_setup(p, errp);
        if (ret) {
            return ret;
        }
    }
    return 0;
}

bool multifd_recv_all_channels_created(void)
{
    int thread_count = migrate_multifd_channels();

    if (!migrate_multifd()) {
        return true;
    }

    if (!multifd_recv_state) {
        /* Called before any connections created */
        return false;
    }

    return thread_count == qatomic_read(&multifd_recv_state->count);
}

/*
 * Try to receive all multifd channels to get ready for the migration.
 * Sets @errp when failing to receive the current channel.
 */
void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
{
    MultiFDRecvParams *p;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int id;

    if (use_packets) {
        id = multifd_recv_initial_packet(ioc, &local_err);
        if (id < 0) {
            multifd_recv_terminate_threads(local_err);
            error_propagate_prepend(errp, local_err,
                                    "failed to receive packet"
                                    " via multifd channel %d: ",
                                    qatomic_read(&multifd_recv_state->count));
            return;
        }
        trace_multifd_recv_new_channel(id);
    } else {
        id = qatomic_read(&multifd_recv_state->count);
    }

    p = &multifd_recv_state->params[id];
    if (p->c != NULL) {
        error_setg(&local_err, "multifd: received id '%d' already setup",
                   id);
        multifd_recv_terminate_threads(local_err);
        error_propagate(errp, local_err);
        return;
    }
    p->c = ioc;
    object_ref(OBJECT(ioc));

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                       QEMU_THREAD_JOINABLE);
    qatomic_inc(&multifd_recv_state->count);
}

bool multifd_send_prepare_common(MultiFDSendParams *p)
{
    multifd_send_zero_page_detect(p);

    if (!p->pages->normal_num) {
        p->next_packet_size = 0;
        return false;
    }

    multifd_send_prepare_header(p);

    return true;
}