/*
 * Multifd common code
 *
 * Copyright (c) 2019-2020 Red Hat Inc
 *
 * Authors:
 *  Juan Quintela <quintela@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */

#include "qemu/osdep.h"
#include "qemu/cutils.h"
#include "qemu/rcu.h"
#include "exec/target_page.h"
#include "sysemu/sysemu.h"
#include "exec/ramblock.h"
#include "qemu/error-report.h"
#include "qapi/error.h"
#include "file.h"
#include "migration.h"
#include "migration-stats.h"
#include "socket.h"
#include "tls.h"
#include "qemu-file.h"
#include "trace.h"
#include "multifd.h"
#include "threadinfo.h"
#include "options.h"
#include "qemu/yank.h"
#include "io/channel-file.h"
#include "io/channel-socket.h"
#include "yank_functions.h"

/* Multiple fd's */

#define MULTIFD_MAGIC 0x11223344U
#define MULTIFD_VERSION 1

typedef struct {
    uint32_t magic;
    uint32_t version;
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;

struct {
    MultiFDSendParams *params;
    /* array of pages to be sent */
    MultiFDPages_t *pages;
    /*
     * Global number of generated multifd packets.
     *
     * Note that we use 'uintptr_t' because it will naturally support atomic
     * operations on both 32bit / 64bit hosts. It means on 32bit systems
     * multifd will overflow packet_num more easily, but that should be
     * fine.
     *
     * Another option is to use QEMU's Stat64, which would be 64 bits on all
     * hosts, however so far it does not support atomic fetch_add().
     * Keep it simple for now.
     */
    uintptr_t packet_num;
    /*
     * Synchronization point past which no more channels will be
     * created.
     */
    QemuSemaphore channels_created;
    /* send channels ready */
    QemuSemaphore channels_ready;
    /*
     * Have we already run terminate threads? There is a race where we
     * may get an error while we are exiting.
     * We will use atomic operations. Only valid values are 0 and 1.
     */
    int exiting;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_send_state;

struct {
    MultiFDRecvParams *params;
    MultiFDRecvData *data;
    /* number of created threads */
    int count;
    /*
     * This is always posted by the recv threads, the migration thread
     * uses it to wait for recv threads to finish assigned tasks.
     */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    int exiting;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_recv_state;

static bool multifd_use_packets(void)
{
    return !migrate_mapped_ram();
}

void multifd_send_channel_created(void)
{
    qemu_sem_post(&multifd_send_state->channels_created);
}

static void multifd_set_file_bitmap(MultiFDSendParams *p)
{
    MultiFDPages_t *pages = p->pages;

    assert(pages->block);

    for (int i = 0; i < p->pages->normal_num; i++) {
        ramblock_set_file_bmap_atomic(pages->block, pages->offset[i], true);
    }

    for (int i = p->pages->normal_num; i < p->pages->num; i++) {
        ramblock_set_file_bmap_atomic(pages->block, pages->offset[i], false);
    }
}

/* Multifd without compression */

/**
 * nocomp_send_setup: setup send side
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
{
    if (migrate_zero_copy_send()) {
        p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
    }

    return 0;
}

/**
 * nocomp_send_cleanup: cleanup send side
 *
 * For no compression this function does nothing.
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
{
    return;
}

static void multifd_send_prepare_iovs(MultiFDSendParams *p)
{
    MultiFDPages_t *pages = p->pages;

    for (int i = 0; i < pages->normal_num; i++) {
        p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
        p->iov[p->iovs_num].iov_len = p->page_size;
        p->iovs_num++;
    }

    p->next_packet_size = pages->normal_num * p->page_size;
}

/**
 * nocomp_send_prepare: prepare data to be able to send
 *
 * For no compression we just have to calculate the size of the
 * packet.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
{
    bool use_zero_copy_send = migrate_zero_copy_send();
    int ret;

    multifd_send_zero_page_detect(p);

    if (!multifd_use_packets()) {
        multifd_send_prepare_iovs(p);
        multifd_set_file_bitmap(p);

        return 0;
    }

    if (!use_zero_copy_send) {
        /*
         * Only !zerocopy needs the header in IOV; zerocopy will
         * send it separately.
         */
        multifd_send_prepare_header(p);
    }

    multifd_send_prepare_iovs(p);
    p->flags |= MULTIFD_FLAG_NOCOMP;

    multifd_send_fill_packet(p);

    if (use_zero_copy_send) {
        /* Send header first, without zerocopy */
        ret = qio_channel_write_all(p->c, (void *)p->packet,
                                    p->packet_len, errp);
        if (ret != 0) {
            return -1;
        }
    }

    return 0;
}

/**
 * nocomp_recv_setup: setup receive side
 *
 * For no compression this function does nothing.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
{
    return 0;
}

/**
 * nocomp_recv_cleanup: cleanup receive side
 *
 * For no compression this function does nothing.
 *
 * @p: Params for the channel that we are using
 */
static void nocomp_recv_cleanup(MultiFDRecvParams *p)
{
}

/**
 * nocomp_recv: read the data from the channel
 *
 * For no compression we just need to read things into the correct place.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_recv(MultiFDRecvParams *p, Error **errp)
{
    uint32_t flags;

    if (!multifd_use_packets()) {
        return multifd_file_recv_data(p, errp);
    }

    flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;

    if (flags != MULTIFD_FLAG_NOCOMP) {
        error_setg(errp, "multifd %u: flags received %x flags expected %x",
                   p->id, flags, MULTIFD_FLAG_NOCOMP);
        return -1;
    }

    multifd_recv_zero_page_process(p);

    if (!p->normal_num) {
        return 0;
    }

    for (int i = 0; i < p->normal_num; i++) {
        p->iov[i].iov_base = p->host + p->normal[i];
        p->iov[i].iov_len = p->page_size;
        ramblock_recv_bitmap_set_offset(p->block, p->normal[i]);
    }
    return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
}

static MultiFDMethods multifd_nocomp_ops = {
    .send_setup = nocomp_send_setup,
    .send_cleanup = nocomp_send_cleanup,
    .send_prepare = nocomp_send_prepare,
    .recv_setup = nocomp_recv_setup,
    .recv_cleanup = nocomp_recv_cleanup,
    .recv = nocomp_recv
};

static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
    [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
};

void multifd_register_ops(int method, MultiFDMethods *ops)
{
    assert(0 < method && method < MULTIFD_COMPRESSION__MAX);
    multifd_ops[method] = ops;
}

/* Reset a MultiFDPages_t* object for the next use */
static void multifd_pages_reset(MultiFDPages_t *pages)
{
    /*
     * We don't need to touch offset[] array, because it will be
     * overwritten later when reused.
     */
    pages->num = 0;
    pages->normal_num = 0;
    pages->block = NULL;
}

static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
{
    MultiFDInit_t msg = {};
    size_t size = sizeof(msg);
    int ret;

    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
    msg.version = cpu_to_be32(MULTIFD_VERSION);
    msg.id = p->id;
    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));

    ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
    if (ret != 0) {
        return -1;
    }
    stat64_add(&mig_stats.multifd_bytes, size);
    return 0;
}

static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
{
    MultiFDInit_t msg;
    int ret;

    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
    if (ret != 0) {
        return -1;
    }

    msg.magic = be32_to_cpu(msg.magic);
    msg.version = be32_to_cpu(msg.version);

    if (msg.magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet magic %x "
                   "expected %x", msg.magic, MULTIFD_MAGIC);
        return -1;
    }

    if (msg.version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet version %u "
                   "expected %u", msg.version, MULTIFD_VERSION);
        return -1;
    }

    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
        char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);

        error_setg(errp, "multifd: received uuid '%s' and expected "
                   "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
        g_free(uuid);
        g_free(msg_uuid);
        return -1;
    }

    if (msg.id > migrate_multifd_channels()) {
        error_setg(errp, "multifd: received channel id %u is greater than "
                   "number of channels %u", msg.id, migrate_multifd_channels());
        return -1;
    }

    return msg.id;
}

static MultiFDPages_t *multifd_pages_init(uint32_t n)
{
    MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);

    pages->allocated = n;
    pages->offset = g_new0(ram_addr_t, n);

    return pages;
}

static void multifd_pages_clear(MultiFDPages_t *pages)
{
    multifd_pages_reset(pages);
    pages->allocated = 0;
    g_free(pages->offset);
    pages->offset = NULL;
    g_free(pages);
}

void multifd_send_fill_packet(MultiFDSendParams *p)
{
    MultiFDPacket_t *packet = p->packet;
    MultiFDPages_t *pages = p->pages;
    uint64_t packet_num;
    uint32_t zero_num = pages->num - pages->normal_num;
    int i;

    packet->flags = cpu_to_be32(p->flags);
    packet->pages_alloc = cpu_to_be32(p->pages->allocated);
    packet->normal_pages = cpu_to_be32(pages->normal_num);
    packet->zero_pages = cpu_to_be32(zero_num);
    packet->next_packet_size = cpu_to_be32(p->next_packet_size);

    packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
    packet->packet_num = cpu_to_be64(packet_num);

    if (pages->block) {
        strncpy(packet->ramblock, pages->block->idstr, 256);
    }

    for (i = 0; i < pages->num; i++) {
        /* there are architectures where ram_addr_t is 32 bit */
        uint64_t temp = pages->offset[i];

        packet->offset[i] = cpu_to_be64(temp);
    }

    p->packets_sent++;
    p->total_normal_pages += pages->normal_num;
    p->total_zero_pages += zero_num;

    trace_multifd_send(p->id, packet_num, pages->normal_num, zero_num,
                       p->flags, p->next_packet_size);
}

static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
{
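    /*
     * All multi-byte header fields arrive in big-endian byte order and
     * are converted and sanity-checked against the limits established
     * at recv setup time before being used below.
     */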
    MultiFDPacket_t *packet = p->packet;
    int i;

    packet->magic = be32_to_cpu(packet->magic);
    if (packet->magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet "
                   "magic %x and expected magic %x",
                   packet->magic, MULTIFD_MAGIC);
        return -1;
    }

    packet->version = be32_to_cpu(packet->version);
    if (packet->version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet "
                   "version %u and expected version %u",
                   packet->version, MULTIFD_VERSION);
        return -1;
    }

    p->flags = be32_to_cpu(packet->flags);

    packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
    /*
     * If we received a packet that claims more pages than this channel
     * was set up to handle, just stop migration.
     */
    if (packet->pages_alloc > p->page_count) {
        error_setg(errp, "multifd: received packet "
                   "with size %u and expected a size of %u",
                   packet->pages_alloc, p->page_count);
        return -1;
    }

    p->normal_num = be32_to_cpu(packet->normal_pages);
    if (p->normal_num > packet->pages_alloc) {
        error_setg(errp, "multifd: received packet "
                   "with %u normal pages and expected maximum pages are %u",
                   p->normal_num, packet->pages_alloc);
        return -1;
    }

    p->zero_num = be32_to_cpu(packet->zero_pages);
    if (p->zero_num > packet->pages_alloc - p->normal_num) {
        error_setg(errp, "multifd: received packet "
                   "with %u zero pages and expected maximum zero pages are %u",
                   p->zero_num, packet->pages_alloc - p->normal_num);
        return -1;
    }

    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
    p->packet_num = be64_to_cpu(packet->packet_num);
    p->packets_recved++;
    p->total_normal_pages += p->normal_num;
    p->total_zero_pages += p->zero_num;

    trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->zero_num,
                       p->flags, p->next_packet_size);

    if (p->normal_num == 0 && p->zero_num == 0) {
        return 0;
    }

    /* make sure that ramblock is 0 terminated */
    packet->ramblock[255] = 0;
    p->block = qemu_ram_block_by_name(packet->ramblock);
    if (!p->block) {
        error_setg(errp, "multifd: unknown ram block %s",
                   packet->ramblock);
        return -1;
    }

    p->host = p->block->host;
    for (i = 0; i < p->normal_num; i++) {
        uint64_t offset = be64_to_cpu(packet->offset[i]);

        if (offset > (p->block->used_length - p->page_size)) {
            error_setg(errp, "multifd: offset too long %" PRIu64
                       " (max " RAM_ADDR_FMT ")",
                       offset, p->block->used_length);
            return -1;
        }
        p->normal[i] = offset;
    }

    for (i = 0; i < p->zero_num; i++) {
        uint64_t offset = be64_to_cpu(packet->offset[p->normal_num + i]);

        if (offset > (p->block->used_length - p->page_size)) {
            error_setg(errp, "multifd: offset too long %" PRIu64
                       " (max " RAM_ADDR_FMT ")",
                       offset, p->block->used_length);
            return -1;
        }
        p->zero[i] = offset;
    }

    return 0;
}

static bool multifd_send_should_exit(void)
{
    return qatomic_read(&multifd_send_state->exiting);
}

static bool multifd_recv_should_exit(void)
{
    return qatomic_read(&multifd_recv_state->exiting);
}

/*
 * The migration thread can wait on either of the two semaphores. This
 * function can be used to kick the main thread out of waiting on either of
 * them. Should mostly only be called when something wrong happened with
 * the current multifd send thread.
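 * It posts both semaphores, so the migration thread cannot be left
 * blocked on either of them once an error has been recorded.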
 */
static void multifd_send_kick_main(MultiFDSendParams *p)
{
    qemu_sem_post(&p->sem_sync);
    qemu_sem_post(&multifd_send_state->channels_ready);
}

/*
 * How we use multifd_send_state->pages and channel->pages?
 *
 * We create a pages struct for each channel, and a main one. Each time
 * that we need to send a batch of pages we interchange the ones between
 * multifd_send_state and the channel that is sending it. There are
 * two reasons for that:
 *    - to not have to do so many mallocs during migration
 *    - to make it easier to know what to free at the end of migration
 *
 * This way we always know who is the owner of each "pages" struct,
 * and we don't need any locking. It belongs to the migration thread
 * or to the channel thread. Switching is safe because the migration
 * thread is using the channel mutex when changing it, and the channel
 * has to have finished with its own, otherwise pending_job can't be
 * false.
 *
 * Returns true on success, false otherwise.
 */
static bool multifd_send_pages(void)
{
    int i;
    static int next_channel;
    MultiFDSendParams *p = NULL; /* make gcc happy */
    MultiFDPages_t *pages = multifd_send_state->pages;

    if (multifd_send_should_exit()) {
        return false;
    }

    /* We wait here, until at least one channel is ready */
    qemu_sem_wait(&multifd_send_state->channels_ready);

    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_channel %= migrate_multifd_channels();
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_send_should_exit()) {
            return false;
        }
        p = &multifd_send_state->params[i];
        /*
         * Lockless read to p->pending_job is safe, because only multifd
         * sender thread can clear it.
         */
        if (qatomic_read(&p->pending_job) == false) {
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Make sure we read p->pending_job before all the rest. Pairs with
     * qatomic_store_release() in multifd_send_thread().
     */
    smp_mb_acquire();
    assert(!p->pages->num);
    multifd_send_state->pages = p->pages;
    p->pages = pages;
    /*
     * Making sure p->pages is set up before marking pending_job=true. Pairs
     * with the qatomic_load_acquire() in multifd_send_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}

static inline bool multifd_queue_empty(MultiFDPages_t *pages)
{
    return pages->num == 0;
}

static inline bool multifd_queue_full(MultiFDPages_t *pages)
{
    return pages->num == pages->allocated;
}

static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset)
{
    pages->offset[pages->num++] = offset;
}

/* Returns true if enqueue successful, false otherwise */
bool multifd_queue_page(RAMBlock *block, ram_addr_t offset)
{
    MultiFDPages_t *pages;

retry:
    pages = multifd_send_state->pages;

    /* If the queue is empty, we can already enqueue now */
    if (multifd_queue_empty(pages)) {
        pages->block = block;
        multifd_enqueue(pages, offset);
        return true;
    }

    /*
     * Not empty, meanwhile we need a flush. It can be because of either:
     *
     * (1) The page is not on the same ramblock as the previous ones, or,
     * (2) The queue is full.
     *
     * After flush, always retry.
     */
    if (pages->block != block || multifd_queue_full(pages)) {
        if (!multifd_send_pages()) {
            return false;
        }
        goto retry;
    }

    /* Not empty, and we still have space, do it! */
    multifd_enqueue(pages, offset);
    return true;
}

/* Multifd send side hit an error; remember it and prepare to quit */
static void multifd_send_set_error(Error *err)
{
    /*
     * We don't want to exit each thread twice. Depending on where
     * we get the error, or if there are two independent errors in two
     * threads at the same time, we can end up calling this function
     * twice.
     */
    if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
            s->state == MIGRATION_STATUS_DEVICE ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }
}

static void multifd_send_terminate_threads(void)
{
    int i;

    trace_multifd_send_terminate_threads();

    /*
     * Tell everyone we're quitting. No xchg() needed here; we simply
     * always set it.
     */
    qatomic_set(&multifd_send_state->exiting, 1);

    /*
     * Firstly, kick all threads out; no matter whether they are just idle,
     * or blocked in an IO system call.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_sem_post(&p->sem);
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }

    /*
     * Finally recycle all the threads.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (p->tls_thread_created) {
            qemu_thread_join(&p->tls_thread);
        }

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
}

static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
{
    if (p->c) {
        migration_ioc_unregister_yank(p->c);
        /*
         * The object_unref() cannot guarantee the fd will always be
         * released because finalize() of the iochannel is only
         * triggered on the last reference and it's not guaranteed
         * that we always hold the last refcount when reaching here.
         *
         * Closing the fd explicitly has the benefit that if there is any
         * registered I/O handler callbacks on such fd, that will get a
         * POLLNVAL event and will further trigger the cleanup to finally
         * release the IOC.
         *
         * FIXME: It should logically be guaranteed that all multifd
         * channels have no I/O handler callback registered when reaching
         * here, because migration thread will wait for all multifd channel
         * establishments to complete during setup. Since
         * migrate_fd_cleanup() will be scheduled in main thread too, all
         * previous callbacks should guarantee to be completed when
         * reaching here. See multifd_send_state.channels_created and its
         * usage. In the future, we could replace this with an assert
         * making sure we're the last reference, or simply drop it if above
         * is more clear to be justified.
         */
        qio_channel_close(p->c, &error_abort);
        object_unref(OBJECT(p->c));
        p->c = NULL;
    }
    qemu_sem_destroy(&p->sem);
    qemu_sem_destroy(&p->sem_sync);
    g_free(p->name);
    p->name = NULL;
    multifd_pages_clear(p->pages);
    p->pages = NULL;
    p->packet_len = 0;
    g_free(p->packet);
    p->packet = NULL;
    g_free(p->iov);
    p->iov = NULL;
    multifd_send_state->ops->send_cleanup(p, errp);

    return *errp == NULL;
}

static void multifd_send_cleanup_state(void)
{
    file_cleanup_outgoing_migration();
    socket_cleanup_outgoing_migration();
    qemu_sem_destroy(&multifd_send_state->channels_created);
    qemu_sem_destroy(&multifd_send_state->channels_ready);
    g_free(multifd_send_state->params);
    multifd_send_state->params = NULL;
    multifd_pages_clear(multifd_send_state->pages);
    multifd_send_state->pages = NULL;
    g_free(multifd_send_state);
    multifd_send_state = NULL;
}

void multifd_send_shutdown(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }

    multifd_send_terminate_threads();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        if (!multifd_send_cleanup_channel(p, &local_err)) {
            migrate_set_error(migrate_get_current(), local_err);
            error_free(local_err);
        }
    }

    multifd_send_cleanup_state();
}

static int multifd_zero_copy_flush(QIOChannel *c)
{
    int ret;
    Error *err = NULL;

    ret = qio_channel_flush(c, &err);
    if (ret < 0) {
        error_report_err(err);
        return -1;
    }
    if (ret == 1) {
        stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
    }

    return ret;
}

int multifd_send_sync_main(void)
{
    int i;
    bool flush_zero_copy;

    if (!migrate_multifd()) {
        return 0;
    }
    if (multifd_send_state->pages->num) {
        if (!multifd_send_pages()) {
            error_report("%s: multifd_send_pages fail", __func__);
            return -1;
        }
    }

    flush_zero_copy = migrate_zero_copy_send();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        trace_multifd_send_sync_main_signal(p->id);

        /*
         * We should be the only user so far, so not possible to be set by
         * others concurrently.
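         * pending_sync is cleared by the sender thread just before it
         * posts p->sem_sync, which is what the loop below waits for.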
         */
        assert(qatomic_read(&p->pending_sync) == false);
        qatomic_set(&p->pending_sync, true);
        qemu_sem_post(&p->sem);
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        qemu_sem_wait(&multifd_send_state->channels_ready);
        trace_multifd_send_sync_main_wait(p->id);
        qemu_sem_wait(&p->sem_sync);

        if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
            return -1;
        }
    }
    trace_multifd_send_sync_main(multifd_send_state->packet_num);

    return 0;
}

static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    MigrationThread *thread = NULL;
    Error *local_err = NULL;
    int ret = 0;
    bool use_packets = multifd_use_packets();

    thread = migration_threads_add(p->name, qemu_get_thread_id());

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (use_packets) {
        if (multifd_send_initial_packet(p, &local_err) < 0) {
            ret = -1;
            goto out;
        }
    }

    while (true) {
        qemu_sem_post(&multifd_send_state->channels_ready);
        qemu_sem_wait(&p->sem);

        if (multifd_send_should_exit()) {
            break;
        }

        /*
         * Read pending_job flag before p->pages. Pairs with the
         * qatomic_store_release() in multifd_send_pages().
         */
        if (qatomic_load_acquire(&p->pending_job)) {
            MultiFDPages_t *pages = p->pages;

            p->iovs_num = 0;
            assert(pages->num);

            ret = multifd_send_state->ops->send_prepare(p, &local_err);
            if (ret != 0) {
                break;
            }

            if (migrate_mapped_ram()) {
                ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
                                              p->pages->block, &local_err);
            } else {
                ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
                                                  NULL, 0, p->write_flags,
                                                  &local_err);
            }

            if (ret != 0) {
                break;
            }

            stat64_add(&mig_stats.multifd_bytes,
                       p->next_packet_size + p->packet_len);
            stat64_add(&mig_stats.normal_pages, pages->normal_num);
            stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num);

            multifd_pages_reset(p->pages);
            p->next_packet_size = 0;

            /*
             * Making sure p->pages is published before saying "we're
             * free". Pairs with the smp_mb_acquire() in
             * multifd_send_pages().
             */
            qatomic_store_release(&p->pending_job, false);
        } else {
            /*
             * If not a normal job, must be a sync request. Note that
             * pending_sync is a standalone flag (unlike pending_job), so
             * it doesn't require explicit memory barriers.
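             * The request is acknowledged by posting p->sem_sync after
             * the (optional) SYNC packet has been written out.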
             */
            assert(qatomic_read(&p->pending_sync));

            if (use_packets) {
                p->flags = MULTIFD_FLAG_SYNC;
                multifd_send_fill_packet(p);
                ret = qio_channel_write_all(p->c, (void *)p->packet,
                                            p->packet_len, &local_err);
                if (ret != 0) {
                    break;
                }
                /* p->next_packet_size will always be zero for a SYNC packet */
                stat64_add(&mig_stats.multifd_bytes, p->packet_len);
                p->flags = 0;
            }

            qatomic_set(&p->pending_sync, false);
            qemu_sem_post(&p->sem_sync);
        }
    }

out:
    if (ret) {
        assert(local_err);
        trace_multifd_send_error(p->id);
        multifd_send_set_error(local_err);
        multifd_send_kick_main(p);
        error_free(local_err);
    }

    rcu_unregister_thread();
    migration_threads_remove(thread);
    trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages,
                                  p->total_zero_pages);

    return NULL;
}

static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);

typedef struct {
    MultiFDSendParams *p;
    QIOChannelTLS *tioc;
} MultiFDTLSThreadArgs;

static void *multifd_tls_handshake_thread(void *opaque)
{
    MultiFDTLSThreadArgs *args = opaque;

    qio_channel_tls_handshake(args->tioc,
                              multifd_new_send_channel_async,
                              args->p,
                              NULL,
                              NULL);
    g_free(args);

    return NULL;
}

static bool multifd_tls_channel_connect(MultiFDSendParams *p,
                                        QIOChannel *ioc,
                                        Error **errp)
{
    MigrationState *s = migrate_get_current();
    const char *hostname = s->hostname;
    MultiFDTLSThreadArgs *args;
    QIOChannelTLS *tioc;

    tioc = migration_tls_client_create(ioc, hostname, errp);
    if (!tioc) {
        return false;
    }

    /*
     * Ownership of the socket channel now transfers to the newly
     * created TLS channel, which has already taken a reference.
     */
    object_unref(OBJECT(ioc));
    trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
    qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");

    args = g_new0(MultiFDTLSThreadArgs, 1);
    args->tioc = tioc;
    args->p = p;

    p->tls_thread_created = true;
    qemu_thread_create(&p->tls_thread, "multifd-tls-handshake-worker",
                       multifd_tls_handshake_thread, args,
                       QEMU_THREAD_JOINABLE);
    return true;
}

void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
{
    qio_channel_set_delay(ioc, false);

    migration_ioc_register_yank(ioc);
    /* Set up p->c only when the channel is completely set up */
    p->c = ioc;

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
                       QEMU_THREAD_JOINABLE);
}

/*
 * When TLS is enabled this function is called once to establish the
 * TLS connection and a second time after the TLS handshake to create
 * the multifd channel. Without TLS it goes straight into the channel
 * creation.
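 *
 * Either way, multifd_send_channel_created() is posted once per channel,
 * regardless of whether the connection attempt succeeded.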
 */
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
    Error *local_err = NULL;
    bool ret;

    trace_multifd_new_send_channel_async(p->id);

    if (qio_task_propagate_error(task, &local_err)) {
        ret = false;
        goto out;
    }

    trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
                                       migrate_get_current()->hostname);

    if (migrate_channel_requires_tls_upgrade(ioc)) {
        ret = multifd_tls_channel_connect(p, ioc, &local_err);
        if (ret) {
            return;
        }
    } else {
        multifd_channel_connect(p, ioc);
        ret = true;
    }

out:
    /*
     * Here we're not interested in whether creation succeeded, only that
     * it happened at all.
     */
    multifd_send_channel_created();

    if (ret) {
        return;
    }

    trace_multifd_new_send_channel_async_error(p->id, local_err);
    multifd_send_set_error(local_err);
    /*
     * For error cases (TLS or non-TLS), the IO channel is always freed here
     * rather than when cleaning up multifd: since p->c is not set, the
     * multifd cleanup code doesn't even know of its existence.
     */
    object_unref(OBJECT(ioc));
    error_free(local_err);
}

static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
{
    if (!multifd_use_packets()) {
        return file_send_channel_create(opaque, errp);
    }

    socket_send_channel_create(multifd_new_send_channel_async, opaque);
    return true;
}

bool multifd_send_setup(void)
{
    MigrationState *s = migrate_get_current();
    Error *local_err = NULL;
    int thread_count, ret = 0;
    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    if (!migrate_multifd()) {
        return true;
    }

    thread_count = migrate_multifd_channels();
    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
    multifd_send_state->pages = multifd_pages_init(page_count);
    qemu_sem_init(&multifd_send_state->channels_created, 0);
    qemu_sem_init(&multifd_send_state->channels_ready, 0);
    qatomic_set(&multifd_send_state->exiting, 0);
    multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_sem_init(&p->sem, 0);
        qemu_sem_init(&p->sem_sync, 0);
        p->id = i;
        p->pages = multifd_pages_init(page_count);

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                          + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
            p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
            p->packet->version = cpu_to_be32(MULTIFD_VERSION);

            /* We need one extra place for the packet header */
            p->iov = g_new0(struct iovec, page_count + 1);
        } else {
            p->iov = g_new0(struct iovec, page_count);
        }
        p->name = g_strdup_printf("multifdsend_%d", i);
        p->page_size = qemu_target_page_size();
        p->page_count = page_count;
        p->write_flags = 0;

        if (!multifd_new_send_channel_create(p, &local_err)) {
            return false;
        }
    }

    /*
     * Wait until channel creation has started for all channels. The
     * creation can still fail, but no more channels will be created
     * past this point.
     */
    for (i = 0; i < thread_count; i++) {
        qemu_sem_wait(&multifd_send_state->channels_created);
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        ret = multifd_send_state->ops->send_setup(p, &local_err);
        if (ret) {
            break;
        }
    }

    if (ret) {
        migrate_set_error(s, local_err);
        error_report_err(local_err);
        migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
                          MIGRATION_STATUS_FAILED);
        return false;
    }

    return true;
}

bool multifd_recv(void)
{
    int i;
    static int next_recv_channel;
    MultiFDRecvParams *p = NULL;
    MultiFDRecvData *data = multifd_recv_state->data;

    /*
     * next_recv_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_recv_channel %= migrate_multifd_channels();
    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_recv_should_exit()) {
            return false;
        }

        p = &multifd_recv_state->params[i];

        if (qatomic_read(&p->pending_job) == false) {
            next_recv_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Order pending_job read before manipulating p->data below. Pairs
     * with qatomic_store_release() at multifd_recv_thread().
     */
    smp_mb_acquire();

    assert(!p->data->size);
    multifd_recv_state->data = p->data;
    p->data = data;

    /*
     * Order p->data update before setting pending_job. Pairs with
     * qatomic_load_acquire() at multifd_recv_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}

MultiFDRecvData *multifd_get_recv_data(void)
{
    return multifd_recv_state->data;
}

static void multifd_recv_terminate_threads(Error *err)
{
    int i;

    trace_multifd_recv_terminate_threads(err != NULL);

    if (qatomic_xchg(&multifd_recv_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        /*
         * The migration thread and channels interact differently
         * depending on the presence of packets.
         */
        if (multifd_use_packets()) {
            /*
             * The channel receives as long as there are packets. When
             * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
             * channel waits for the migration thread to sync. If the
             * sync never happens, do it here.
             */
            qemu_sem_post(&p->sem_sync);
        } else {
            /*
             * The channel waits for the migration thread to give it
             * work. When the migration thread runs out of work, it
             * releases the channel and waits for any pending work to
             * finish. If we reach here (e.g. due to error) before the
             * work runs out, release the channel.
             */
            qemu_sem_post(&p->sem);
        }

        /*
         * We could arrive here for two reasons:
         * - normal quit, i.e. everything went fine, just finished
         * - error quit: We close the channels so the channel threads
         *   finish the qio_channel_read_all_eof()
         */
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }
}

void multifd_recv_shutdown(void)
{
    if (migrate_multifd()) {
        multifd_recv_terminate_threads(NULL);
    }
}

static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
{
    migration_ioc_unregister_yank(p->c);
    object_unref(OBJECT(p->c));
    p->c = NULL;
    qemu_mutex_destroy(&p->mutex);
    qemu_sem_destroy(&p->sem_sync);
    qemu_sem_destroy(&p->sem);
    g_free(p->name);
    p->name = NULL;
    p->packet_len = 0;
    g_free(p->packet);
    p->packet = NULL;
    g_free(p->iov);
    p->iov = NULL;
    g_free(p->normal);
    p->normal = NULL;
    g_free(p->zero);
    p->zero = NULL;
    multifd_recv_state->ops->recv_cleanup(p);
}

static void multifd_recv_cleanup_state(void)
{
    qemu_sem_destroy(&multifd_recv_state->sem_sync);
    g_free(multifd_recv_state->params);
    multifd_recv_state->params = NULL;
    g_free(multifd_recv_state->data);
    multifd_recv_state->data = NULL;
    g_free(multifd_recv_state);
    multifd_recv_state = NULL;
}

void multifd_recv_cleanup(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }
    multifd_recv_terminate_threads(NULL);
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
    }
    multifd_recv_cleanup_state();
}

void multifd_recv_sync_main(void)
{
    int thread_count = migrate_multifd_channels();
    bool file_based = !multifd_use_packets();
    int i;

    if (!migrate_multifd()) {
        return;
    }

    /*
     * File-based channels don't use packets and therefore need to
     * wait for more work. Release them to start the sync.
     */
    if (file_based) {
        for (i = 0; i < thread_count; i++) {
            MultiFDRecvParams *p = &multifd_recv_state->params[i];

            trace_multifd_recv_sync_main_signal(p->id);
            qemu_sem_post(&p->sem);
        }
    }

    /*
     * Initiate the synchronization by waiting for all channels.
     *
     * For socket-based migration this means each channel has received
     * the SYNC packet on the stream.
     *
     * For file-based migration this means each channel is done with
     * the work (pending_job=false).
     */
    for (i = 0; i < thread_count; i++) {
        trace_multifd_recv_sync_main_wait(i);
        qemu_sem_wait(&multifd_recv_state->sem_sync);
    }

    if (file_based) {
        /*
         * For file-based migration, loading is done in one iteration.
         * We're done.
         */
        return;
    }

    /*
     * Sync done. Release the channels for the next iteration.
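     * While at it, publish the highest packet_num seen by any channel;
     * that is what the trace point at the end reports.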
     */
    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        WITH_QEMU_LOCK_GUARD(&p->mutex) {
            if (multifd_recv_state->packet_num < p->packet_num) {
                multifd_recv_state->packet_num = p->packet_num;
            }
        }
        trace_multifd_recv_sync_main_signal(p->id);
        qemu_sem_post(&p->sem_sync);
    }
    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
}

static void *multifd_recv_thread(void *opaque)
{
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int ret;

    trace_multifd_recv_thread_start(p->id);
    rcu_register_thread();

    while (true) {
        uint32_t flags = 0;
        bool has_data = false;
        p->normal_num = 0;

        if (use_packets) {
            if (multifd_recv_should_exit()) {
                break;
            }

            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
                                           p->packet_len, &local_err);
            if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
                break;
            }

            qemu_mutex_lock(&p->mutex);
            ret = multifd_recv_unfill_packet(p, &local_err);
            if (ret) {
                qemu_mutex_unlock(&p->mutex);
                break;
            }

            flags = p->flags;
            /* recv methods don't know how to handle the SYNC flag */
            p->flags &= ~MULTIFD_FLAG_SYNC;
            has_data = p->normal_num || p->zero_num;
            qemu_mutex_unlock(&p->mutex);
        } else {
            /*
             * No packets, so we need to wait for the vmstate code to
             * give us work.
             */
            qemu_sem_wait(&p->sem);

            if (multifd_recv_should_exit()) {
                break;
            }

            /* pairs with qatomic_store_release() at multifd_recv() */
            if (!qatomic_load_acquire(&p->pending_job)) {
                /*
                 * Migration thread did not send work, this is
                 * equivalent to pending_sync on the sending
                 * side. Post sem_sync to notify we reached this
                 * point.
                 */
                qemu_sem_post(&multifd_recv_state->sem_sync);
                continue;
            }

            has_data = !!p->data->size;
        }

        if (has_data) {
            ret = multifd_recv_state->ops->recv(p, &local_err);
            if (ret != 0) {
                break;
            }
        }

        if (use_packets) {
            if (flags & MULTIFD_FLAG_SYNC) {
                qemu_sem_post(&multifd_recv_state->sem_sync);
                qemu_sem_wait(&p->sem_sync);
            }
        } else {
            p->total_normal_pages += p->data->size / qemu_target_page_size();
            p->data->size = 0;
            /*
             * Order data->size update before clearing
             * pending_job. Pairs with smp_mb_acquire() at
             * multifd_recv().
             */
            qatomic_store_release(&p->pending_job, false);
        }
    }

    if (local_err) {
        multifd_recv_terminate_threads(local_err);
        error_free(local_err);
    }

    rcu_unregister_thread();
    trace_multifd_recv_thread_end(p->id, p->packets_recved,
                                  p->total_normal_pages,
                                  p->total_zero_pages);

    return NULL;
}

int multifd_recv_setup(Error **errp)
{
    int thread_count;
    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    /*
     * Return successfully if multiFD recv state is already initialised
     * or multiFD is not enabled.
     */
    if (multifd_recv_state || !migrate_multifd()) {
        return 0;
    }

    thread_count = migrate_multifd_channels();
    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);

    multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
    multifd_recv_state->data->size = 0;

    qatomic_set(&multifd_recv_state->count, 0);
    qatomic_set(&multifd_recv_state->exiting, 0);
    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
    multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        qemu_mutex_init(&p->mutex);
        qemu_sem_init(&p->sem_sync, 0);
        qemu_sem_init(&p->sem, 0);
        p->pending_job = false;
        p->id = i;

        p->data = g_new0(MultiFDRecvData, 1);
        p->data->size = 0;

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                          + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
        }
        p->name = g_strdup_printf("multifdrecv_%d", i);
        p->iov = g_new0(struct iovec, page_count);
        p->normal = g_new0(ram_addr_t, page_count);
        p->zero = g_new0(ram_addr_t, page_count);
        p->page_count = page_count;
        p->page_size = qemu_target_page_size();
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];
        int ret;

        ret = multifd_recv_state->ops->recv_setup(p, errp);
        if (ret) {
            return ret;
        }
    }
    return 0;
}

bool multifd_recv_all_channels_created(void)
{
    int thread_count = migrate_multifd_channels();

    if (!migrate_multifd()) {
        return true;
    }

    if (!multifd_recv_state) {
        /* Called before any connections created */
        return false;
    }

    return thread_count == qatomic_read(&multifd_recv_state->count);
}

/*
 * Try to receive all multifd channels to get ready for the migration.
 * Sets @errp when failing to receive the current channel.
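 *
 * On success the channel takes its own reference to @ioc and spawns the
 * corresponding multifd_recv_thread.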
 */
void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
{
    MultiFDRecvParams *p;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int id;

    if (use_packets) {
        id = multifd_recv_initial_packet(ioc, &local_err);
        if (id < 0) {
            multifd_recv_terminate_threads(local_err);
            error_propagate_prepend(errp, local_err,
                                    "failed to receive packet"
                                    " via multifd channel %d: ",
                                    qatomic_read(&multifd_recv_state->count));
            return;
        }
        trace_multifd_recv_new_channel(id);
    } else {
        id = qatomic_read(&multifd_recv_state->count);
    }

    p = &multifd_recv_state->params[id];
    if (p->c != NULL) {
        error_setg(&local_err, "multifd: received id '%d' already setup",
                   id);
        multifd_recv_terminate_threads(local_err);
        error_propagate(errp, local_err);
        return;
    }
    p->c = ioc;
    object_ref(OBJECT(ioc));

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                       QEMU_THREAD_JOINABLE);
    qatomic_inc(&multifd_recv_state->count);
}

bool multifd_send_prepare_common(MultiFDSendParams *p)
{
    multifd_send_zero_page_detect(p);

    if (!p->pages->normal_num) {
        p->next_packet_size = 0;
        return false;
    }

    multifd_send_prepare_header(p);

    return true;
}