/*
 * Multifd common code
 *
 * Copyright (c) 2019-2020 Red Hat Inc
 *
 * Authors:
 *  Juan Quintela <quintela@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */

#include "qemu/osdep.h"
#include "qemu/rcu.h"
#include "exec/target_page.h"
#include "sysemu/sysemu.h"
#include "exec/ramblock.h"
#include "qemu/error-report.h"
#include "qapi/error.h"
#include "fd.h"
#include "file.h"
#include "migration.h"
#include "migration-stats.h"
#include "socket.h"
#include "tls.h"
#include "qemu-file.h"
#include "trace.h"
#include "multifd.h"
#include "threadinfo.h"
#include "options.h"
#include "qemu/yank.h"
#include "io/channel-file.h"
#include "io/channel-socket.h"
#include "yank_functions.h"

/* Multiple fd's */

#define MULTIFD_MAGIC 0x11223344U
#define MULTIFD_VERSION 1

typedef struct {
    uint32_t magic;
    uint32_t version;
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;

struct {
    MultiFDSendParams *params;
    /* array of pages to be sent */
    MultiFDPages_t *pages;
    /*
     * Global number of generated multifd packets.
     *
     * Note that we use 'uintptr_t' because it naturally supports atomic
     * operations on both 32 bit and 64 bit hosts.  On 32 bit systems
     * multifd will overflow packet_num sooner, but that should be fine.
     *
     * Another option is QEMU's Stat64, which would be 64 bits on all
     * hosts, but so far it does not support atomic fetch_add().  Keep
     * it simple for now.
     */
    uintptr_t packet_num;
    /*
     * Synchronization point past which no more channels will be
     * created.
     */
    QemuSemaphore channels_created;
    /* send channels ready */
    QemuSemaphore channels_ready;
    /*
     * Have we already terminated the threads?  There is a race where we
     * may get one more error while we are exiting.  We use atomic
     * operations; the only valid values are 0 and 1.
     */
    int exiting;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_send_state;

struct {
    MultiFDRecvParams *params;
    MultiFDRecvData *data;
    /* number of created threads */
    int count;
    /*
     * This is always posted by the recv threads; the migration thread
     * uses it to wait for recv threads to finish assigned tasks.
     */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    int exiting;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_recv_state;
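
/*
 * Build-time sanity check (illustrative hardening; assumes
 * QEMU_BUILD_BUG_ON from "qemu/osdep.h" is in scope): the handshake
 * header above is wire protocol, so its packed size
 * (4 + 4 + 16 + 1 + 7 + 32 = 64 bytes) must never change silently.
 */
QEMU_BUILD_BUG_ON(sizeof(MultiFDInit_t) != 64);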

static bool multifd_use_packets(void)
{
    return !migrate_mapped_ram();
}

void multifd_send_channel_created(void)
{
    qemu_sem_post(&multifd_send_state->channels_created);
}

static void multifd_set_file_bitmap(MultiFDSendParams *p)
{
    MultiFDPages_t *pages = p->pages;

    assert(pages->block);

    for (int i = 0; i < p->pages->num; i++) {
        ramblock_set_file_bmap_atomic(pages->block, pages->offset[i]);
    }
}

/* Multifd without compression */

/**
 * nocomp_send_setup: setup send side
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
{
    if (migrate_zero_copy_send()) {
        p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
    }

    return 0;
}

/**
 * nocomp_send_cleanup: cleanup send side
 *
 * For no compression this function does nothing.
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
{
    return;
}

static void multifd_send_prepare_iovs(MultiFDSendParams *p)
{
    MultiFDPages_t *pages = p->pages;

    for (int i = 0; i < pages->num; i++) {
        p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
        p->iov[p->iovs_num].iov_len = p->page_size;
        p->iovs_num++;
    }

    p->next_packet_size = pages->num * p->page_size;
}

/**
 * nocomp_send_prepare: prepare data to be able to send
 *
 * For no compression we just have to calculate the size of the
 * packet.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
{
    bool use_zero_copy_send = migrate_zero_copy_send();
    int ret;

    if (!multifd_use_packets()) {
        multifd_send_prepare_iovs(p);
        multifd_set_file_bitmap(p);

        return 0;
    }

    if (!use_zero_copy_send) {
        /*
         * Only !zerocopy needs the header in IOV; zerocopy will
         * send it separately.
         */
        multifd_send_prepare_header(p);
    }

    multifd_send_prepare_iovs(p);
    p->flags |= MULTIFD_FLAG_NOCOMP;

    multifd_send_fill_packet(p);

    if (use_zero_copy_send) {
        /* Send header first, without zerocopy */
        ret = qio_channel_write_all(p->c, (void *)p->packet,
                                    p->packet_len, errp);
        if (ret != 0) {
            return -1;
        }
    }

    return 0;
}

/**
 * nocomp_recv_setup: setup receive side
 *
 * For no compression this function does nothing.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
{
    return 0;
}

/**
 * nocomp_recv_cleanup: cleanup receive side
 *
 * For no compression this function does nothing.
 *
 * @p: Params for the channel that we are using
 */
static void nocomp_recv_cleanup(MultiFDRecvParams *p)
{
}
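
/*
 * Illustrative note (not functional code): in the packet-based nocomp
 * path the iov array prepared above ends up laid out as
 *
 *   iov[0]       -> packet header (only when zerocopy is off)
 *   iov[1..num]  -> one guest page each, p->page_size bytes
 *
 * With zerocopy the header is written separately first, so the pages
 * start at iov[0].
 */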

/**
 * nocomp_recv: read the data from the channel
 *
 * For no compression we just need to read things into the correct place.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_recv(MultiFDRecvParams *p, Error **errp)
{
    uint32_t flags;

    if (!multifd_use_packets()) {
        return multifd_file_recv_data(p, errp);
    }

    flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;

    if (flags != MULTIFD_FLAG_NOCOMP) {
        error_setg(errp, "multifd %u: flags received %x flags expected %x",
                   p->id, flags, MULTIFD_FLAG_NOCOMP);
        return -1;
    }
    for (int i = 0; i < p->normal_num; i++) {
        p->iov[i].iov_base = p->host + p->normal[i];
        p->iov[i].iov_len = p->page_size;
    }
    return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
}

static MultiFDMethods multifd_nocomp_ops = {
    .send_setup = nocomp_send_setup,
    .send_cleanup = nocomp_send_cleanup,
    .send_prepare = nocomp_send_prepare,
    .recv_setup = nocomp_recv_setup,
    .recv_cleanup = nocomp_recv_cleanup,
    .recv = nocomp_recv
};

static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
    [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
};

void multifd_register_ops(int method, MultiFDMethods *ops)
{
    assert(0 < method && method < MULTIFD_COMPRESSION__MAX);
    multifd_ops[method] = ops;
}

/* Reset a MultiFDPages_t* object for the next use */
static void multifd_pages_reset(MultiFDPages_t *pages)
{
    /*
     * We don't need to touch the offset[] array, because it will be
     * overwritten later when reused.
     */
    pages->num = 0;
    pages->block = NULL;
}
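
/*
 * Illustrative sketch (this lives in the compression modules, not
 * here; hook names are hypothetical): a method such as zlib registers
 * its hooks at startup, e.g.
 *
 *   static MultiFDMethods multifd_zlib_ops = {
 *       .send_setup = zlib_send_setup,
 *       ...
 *   };
 *
 *   multifd_register_ops(MULTIFD_COMPRESSION_ZLIB, &multifd_zlib_ops);
 *
 * MULTIFD_COMPRESSION_NONE is pre-wired above and must not be
 * re-registered, hence the "0 < method" assertion.
 */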

static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
{
    MultiFDInit_t msg = {};
    size_t size = sizeof(msg);
    int ret;

    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
    msg.version = cpu_to_be32(MULTIFD_VERSION);
    msg.id = p->id;
    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));

    ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
    if (ret != 0) {
        return -1;
    }
    stat64_add(&mig_stats.multifd_bytes, size);
    return 0;
}

static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
{
    MultiFDInit_t msg;
    int ret;

    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
    if (ret != 0) {
        return -1;
    }

    msg.magic = be32_to_cpu(msg.magic);
    msg.version = be32_to_cpu(msg.version);

    if (msg.magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet magic %x "
                   "expected %x", msg.magic, MULTIFD_MAGIC);
        return -1;
    }

    if (msg.version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet version %u "
                   "expected %u", msg.version, MULTIFD_VERSION);
        return -1;
    }

    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
        char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);

        error_setg(errp, "multifd: received uuid '%s' and expected "
                   "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
        g_free(uuid);
        g_free(msg_uuid);
        return -1;
    }

    /*
     * Channel ids are zero-based, so an id equal to the channel count
     * is already out of range for the params[] array.
     */
    if (msg.id >= migrate_multifd_channels()) {
        error_setg(errp, "multifd: received channel id %u is not smaller "
                   "than number of channels %u", msg.id,
                   migrate_multifd_channels());
        return -1;
    }

    return msg.id;
}

static MultiFDPages_t *multifd_pages_init(uint32_t n)
{
    MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);

    pages->allocated = n;
    pages->offset = g_new0(ram_addr_t, n);

    return pages;
}

static void multifd_pages_clear(MultiFDPages_t *pages)
{
    multifd_pages_reset(pages);
    pages->allocated = 0;
    g_free(pages->offset);
    pages->offset = NULL;
    g_free(pages);
}

void multifd_send_fill_packet(MultiFDSendParams *p)
{
    MultiFDPacket_t *packet = p->packet;
    MultiFDPages_t *pages = p->pages;
    uint64_t packet_num;
    int i;

    packet->flags = cpu_to_be32(p->flags);
    packet->pages_alloc = cpu_to_be32(p->pages->allocated);
    packet->normal_pages = cpu_to_be32(pages->num);
    packet->next_packet_size = cpu_to_be32(p->next_packet_size);

    packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
    packet->packet_num = cpu_to_be64(packet_num);

    if (pages->block) {
        strncpy(packet->ramblock, pages->block->idstr, 256);
    }

    for (i = 0; i < pages->num; i++) {
        /* there are architectures where ram_addr_t is 32 bit */
        uint64_t temp = pages->offset[i];

        packet->offset[i] = cpu_to_be64(temp);
    }

    p->packets_sent++;
    p->total_normal_pages += pages->num;

    trace_multifd_send(p->id, packet_num, pages->num, p->flags,
                       p->next_packet_size);
}
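
/*
 * Illustrative note on the wire format, as visible from the code
 * above (field layout is declared in multifd.h; reserved fields are
 * omitted here): magic and version are pre-filled once at setup time,
 * while each data packet carries flags, pages_alloc, normal_pages,
 * next_packet_size, packet_num, the ramblock name, and the offset[]
 * array, all integers in big-endian order.  The packet header is then
 * followed on the stream by next_packet_size bytes of page payload.
 */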
"magic %x and expected magic %x", 426 packet->magic, MULTIFD_MAGIC); 427 return -1; 428 } 429 430 packet->version = be32_to_cpu(packet->version); 431 if (packet->version != MULTIFD_VERSION) { 432 error_setg(errp, "multifd: received packet " 433 "version %u and expected version %u", 434 packet->version, MULTIFD_VERSION); 435 return -1; 436 } 437 438 p->flags = be32_to_cpu(packet->flags); 439 440 packet->pages_alloc = be32_to_cpu(packet->pages_alloc); 441 /* 442 * If we received a packet that is 100 times bigger than expected 443 * just stop migration. It is a magic number. 444 */ 445 if (packet->pages_alloc > p->page_count) { 446 error_setg(errp, "multifd: received packet " 447 "with size %u and expected a size of %u", 448 packet->pages_alloc, p->page_count) ; 449 return -1; 450 } 451 452 p->normal_num = be32_to_cpu(packet->normal_pages); 453 if (p->normal_num > packet->pages_alloc) { 454 error_setg(errp, "multifd: received packet " 455 "with %u pages and expected maximum pages are %u", 456 p->normal_num, packet->pages_alloc) ; 457 return -1; 458 } 459 460 p->next_packet_size = be32_to_cpu(packet->next_packet_size); 461 p->packet_num = be64_to_cpu(packet->packet_num); 462 p->packets_recved++; 463 p->total_normal_pages += p->normal_num; 464 465 trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->flags, 466 p->next_packet_size); 467 468 if (p->normal_num == 0) { 469 return 0; 470 } 471 472 /* make sure that ramblock is 0 terminated */ 473 packet->ramblock[255] = 0; 474 p->block = qemu_ram_block_by_name(packet->ramblock); 475 if (!p->block) { 476 error_setg(errp, "multifd: unknown ram block %s", 477 packet->ramblock); 478 return -1; 479 } 480 481 p->host = p->block->host; 482 for (i = 0; i < p->normal_num; i++) { 483 uint64_t offset = be64_to_cpu(packet->offset[i]); 484 485 if (offset > (p->block->used_length - p->page_size)) { 486 error_setg(errp, "multifd: offset too long %" PRIu64 487 " (max " RAM_ADDR_FMT ")", 488 offset, p->block->used_length); 489 return -1; 490 } 491 p->normal[i] = offset; 492 } 493 494 return 0; 495 } 496 497 static bool multifd_send_should_exit(void) 498 { 499 return qatomic_read(&multifd_send_state->exiting); 500 } 501 502 static bool multifd_recv_should_exit(void) 503 { 504 return qatomic_read(&multifd_recv_state->exiting); 505 } 506 507 /* 508 * The migration thread can wait on either of the two semaphores. This 509 * function can be used to kick the main thread out of waiting on either of 510 * them. Should mostly only be called when something wrong happened with 511 * the current multifd send thread. 512 */ 513 static void multifd_send_kick_main(MultiFDSendParams *p) 514 { 515 qemu_sem_post(&p->sem_sync); 516 qemu_sem_post(&multifd_send_state->channels_ready); 517 } 518 519 /* 520 * How we use multifd_send_state->pages and channel->pages? 521 * 522 * We create a pages for each channel, and a main one. Each time that 523 * we need to send a batch of pages we interchange the ones between 524 * multifd_send_state and the channel that is sending it. There are 525 * two reasons for that: 526 * - to not have to do so many mallocs during migration 527 * - to make easier to know what to free at the end of migration 528 * 529 * This way we always know who is the owner of each "pages" struct, 530 * and we don't need any locking. It belongs to the migration thread 531 * or to the channel thread. 

/*
 * How do we use multifd_send_state->pages and channel->pages?
 *
 * We create one "pages" struct for each channel, plus a main one.  Each
 * time we need to send a batch of pages, we interchange the one in
 * multifd_send_state with the one owned by the channel that is sending
 * it.  There are two reasons for that:
 *    - to avoid repeated mallocs during migration
 *    - to make it easier to know what to free at the end of migration
 *
 * This way we always know who is the owner of each "pages" struct, and
 * we don't need any locking: it belongs either to the migration thread
 * or to the channel thread.  Switching is safe because the migration
 * thread only touches a channel's pages while pending_job is false,
 * and the channel thread only sets pending_job back to false once it
 * is done with them.
 *
 * Returns true on success, false otherwise.
 */
static bool multifd_send_pages(void)
{
    int i;
    static int next_channel;
    MultiFDSendParams *p = NULL; /* make gcc happy */
    MultiFDPages_t *pages = multifd_send_state->pages;

    if (multifd_send_should_exit()) {
        return false;
    }

    /* We wait here, until at least one channel is ready */
    qemu_sem_wait(&multifd_send_state->channels_ready);

    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_channel %= migrate_multifd_channels();
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_send_should_exit()) {
            return false;
        }
        p = &multifd_send_state->params[i];
        /*
         * Lockless read of p->pending_job is safe, because only the
         * multifd sender thread can clear it.
         */
        if (qatomic_read(&p->pending_job) == false) {
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Make sure we read p->pending_job before all the rest.  Pairs with
     * qatomic_store_release() in multifd_send_thread().
     */
    smp_mb_acquire();
    assert(!p->pages->num);
    multifd_send_state->pages = p->pages;
    p->pages = pages;
    /*
     * Making sure p->pages is set up before marking pending_job=true.
     * Pairs with the qatomic_load_acquire() in multifd_send_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}

static inline bool multifd_queue_empty(MultiFDPages_t *pages)
{
    return pages->num == 0;
}

static inline bool multifd_queue_full(MultiFDPages_t *pages)
{
    return pages->num == pages->allocated;
}

static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset)
{
    pages->offset[pages->num++] = offset;
}

/* Returns true if enqueue successful, false otherwise */
bool multifd_queue_page(RAMBlock *block, ram_addr_t offset)
{
    MultiFDPages_t *pages;

retry:
    pages = multifd_send_state->pages;

    /* If the queue is empty, we can already enqueue now */
    if (multifd_queue_empty(pages)) {
        pages->block = block;
        multifd_enqueue(pages, offset);
        return true;
    }

    /*
     * Not empty, meanwhile we need a flush.  It can be because of either:
     *
     * (1) The page is not on the same ramblock as previous ones, or,
     * (2) The queue is full.
     *
     * After flush, always retry.
     */
    if (pages->block != block || multifd_queue_full(pages)) {
        if (!multifd_send_pages()) {
            return false;
        }
        goto retry;
    }

    /* Not empty, and we still have space, do it! */
    multifd_enqueue(pages, offset);
    return true;
}
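
/*
 * Illustrative caller sketch (the real caller lives in ram.c): the
 * migration thread feeds dirty pages one at a time and only needs to
 * handle the false (error/exiting) case:
 *
 *   if (!multifd_queue_page(block, offset)) {
 *       return -1;
 *   }
 */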

/* Multifd send side hit an error; remember it and prepare to quit */
static void multifd_send_set_error(Error *err)
{
    /*
     * We don't want to exit the threads twice.  Depending on where we
     * get the error, or if there are two independent errors in two
     * threads at the same time, we can end up calling this function
     * twice.
     */
    if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
            s->state == MIGRATION_STATUS_DEVICE ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }
}

static void multifd_send_terminate_threads(void)
{
    int i;

    trace_multifd_send_terminate_threads();

    /*
     * Tell everyone we're quitting.  No xchg() needed here; we simply
     * always set it.
     */
    qatomic_set(&multifd_send_state->exiting, 1);

    /*
     * Firstly, kick all threads out; no matter whether they are just idle,
     * or blocked in an IO system call.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_sem_post(&p->sem);
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }

    /*
     * Finally recycle all the threads.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (p->tls_thread_created) {
            qemu_thread_join(&p->tls_thread);
        }

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
}
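
/*
 * Note on the ordering above: the kick loop must finish for every
 * channel before any join is attempted.  A sender thread may be
 * blocked inside a write on p->c; only qio_channel_shutdown() forces
 * such a call to return, so joining first could deadlock on a stuck
 * channel.
 */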

static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
{
    if (p->c) {
        migration_ioc_unregister_yank(p->c);
        /*
         * An explicit close() on the channel here is normally not
         * required, but can be helpful for "file:" iochannels, where it
         * will include fdatasync() to make sure the data is flushed to the
         * disk backend.
         *
         * The object_unref() cannot guarantee that because: (1) finalize()
         * of the iochannel is only triggered on the last reference, and
         * it's not guaranteed that we always hold the last refcount when
         * reaching here, and, (2) even if finalize() is invoked, it only
         * does a close(fd) without data flush.
         */
        qio_channel_close(p->c, &error_abort);
        object_unref(OBJECT(p->c));
        p->c = NULL;
    }
    qemu_sem_destroy(&p->sem);
    qemu_sem_destroy(&p->sem_sync);
    g_free(p->name);
    p->name = NULL;
    multifd_pages_clear(p->pages);
    p->pages = NULL;
    p->packet_len = 0;
    g_free(p->packet);
    p->packet = NULL;
    g_free(p->iov);
    p->iov = NULL;
    multifd_send_state->ops->send_cleanup(p, errp);

    return *errp == NULL;
}

static void multifd_send_cleanup_state(void)
{
    file_cleanup_outgoing_migration();
    fd_cleanup_outgoing_migration();
    socket_cleanup_outgoing_migration();
    qemu_sem_destroy(&multifd_send_state->channels_created);
    qemu_sem_destroy(&multifd_send_state->channels_ready);
    g_free(multifd_send_state->params);
    multifd_send_state->params = NULL;
    multifd_pages_clear(multifd_send_state->pages);
    multifd_send_state->pages = NULL;
    g_free(multifd_send_state);
    multifd_send_state = NULL;
}

void multifd_send_shutdown(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }

    multifd_send_terminate_threads();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        if (!multifd_send_cleanup_channel(p, &local_err)) {
            migrate_set_error(migrate_get_current(), local_err);
            error_free(local_err);
        }
    }

    multifd_send_cleanup_state();
}

static int multifd_zero_copy_flush(QIOChannel *c)
{
    int ret;
    Error *err = NULL;

    ret = qio_channel_flush(c, &err);
    if (ret < 0) {
        error_report_err(err);
        return -1;
    }
    if (ret == 1) {
        stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
    }

    return ret;
}
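
/**
 * multifd_send_sync_main: flush queued pages and sync all channels
 *
 * Flushes any pages still sitting in multifd_send_state->pages, then
 * makes every channel emit a SYNC packet (when packets are in use) and
 * waits for all of them.  With zero-copy enabled this is also where
 * pending copies are flushed via multifd_zero_copy_flush().
 *
 * Returns 0 on success, -1 on error.
 */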
int multifd_send_sync_main(void)
{
    int i;
    bool flush_zero_copy;

    if (!migrate_multifd()) {
        return 0;
    }
    if (multifd_send_state->pages->num) {
        if (!multifd_send_pages()) {
            error_report("%s: multifd_send_pages fail", __func__);
            return -1;
        }
    }

    flush_zero_copy = migrate_zero_copy_send();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        trace_multifd_send_sync_main_signal(p->id);

        /*
         * We should be the only user so far, so it's not possible for
         * anyone else to set it concurrently.
         */
        assert(qatomic_read(&p->pending_sync) == false);
        qatomic_set(&p->pending_sync, true);
        qemu_sem_post(&p->sem);
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        qemu_sem_wait(&multifd_send_state->channels_ready);
        trace_multifd_send_sync_main_wait(p->id);
        qemu_sem_wait(&p->sem_sync);

        if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
            return -1;
        }
    }
    trace_multifd_send_sync_main(multifd_send_state->packet_num);

    return 0;
}
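
/*
 * Per-channel sender loop.  Each iteration handles exactly one of two
 * requests from the migration thread: a batch of pages (pending_job)
 * or a sync request (pending_sync).  The channels_ready post at the
 * top of the loop is what multifd_send_pages() waits on to find a
 * free channel.
 */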
static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    MigrationThread *thread = NULL;
    Error *local_err = NULL;
    int ret = 0;
    bool use_packets = multifd_use_packets();

    thread = migration_threads_add(p->name, qemu_get_thread_id());

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (use_packets) {
        if (multifd_send_initial_packet(p, &local_err) < 0) {
            ret = -1;
            goto out;
        }
    }

    while (true) {
        qemu_sem_post(&multifd_send_state->channels_ready);
        qemu_sem_wait(&p->sem);

        if (multifd_send_should_exit()) {
            break;
        }

        /*
         * Read pending_job flag before p->pages.  Pairs with the
         * qatomic_store_release() in multifd_send_pages().
         */
        if (qatomic_load_acquire(&p->pending_job)) {
            MultiFDPages_t *pages = p->pages;

            p->iovs_num = 0;
            assert(pages->num);

            ret = multifd_send_state->ops->send_prepare(p, &local_err);
            if (ret != 0) {
                break;
            }

            if (migrate_mapped_ram()) {
                ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
                                              p->pages->block, &local_err);
            } else {
                ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
                                                  NULL, 0, p->write_flags,
                                                  &local_err);
            }

            if (ret != 0) {
                break;
            }

            stat64_add(&mig_stats.multifd_bytes,
                       p->next_packet_size + p->packet_len);

            multifd_pages_reset(p->pages);
            p->next_packet_size = 0;

            /*
             * Making sure p->pages is published before saying "we're
             * free".  Pairs with the smp_mb_acquire() in
             * multifd_send_pages().
             */
            qatomic_store_release(&p->pending_job, false);
        } else {
            /*
             * If not a normal job, must be a sync request.  Note that
             * pending_sync is a standalone flag (unlike pending_job), so
             * it doesn't require explicit memory barriers.
             */
            assert(qatomic_read(&p->pending_sync));

            if (use_packets) {
                p->flags = MULTIFD_FLAG_SYNC;
                multifd_send_fill_packet(p);
                ret = qio_channel_write_all(p->c, (void *)p->packet,
                                            p->packet_len, &local_err);
                if (ret != 0) {
                    break;
                }
                /* p->next_packet_size will always be zero for a SYNC packet */
                stat64_add(&mig_stats.multifd_bytes, p->packet_len);
                p->flags = 0;
            }

            qatomic_set(&p->pending_sync, false);
            qemu_sem_post(&p->sem_sync);
        }
    }

out:
    if (ret) {
        assert(local_err);
        trace_multifd_send_error(p->id);
        multifd_send_set_error(local_err);
        multifd_send_kick_main(p);
        error_free(local_err);
    }

    rcu_unregister_thread();
    migration_threads_remove(thread);
    trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages);

    return NULL;
}

static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);

typedef struct {
    MultiFDSendParams *p;
    QIOChannelTLS *tioc;
} MultiFDTLSThreadArgs;

static void *multifd_tls_handshake_thread(void *opaque)
{
    MultiFDTLSThreadArgs *args = opaque;

    qio_channel_tls_handshake(args->tioc,
                              multifd_new_send_channel_async,
                              args->p,
                              NULL,
                              NULL);
    g_free(args);

    return NULL;
}

static bool multifd_tls_channel_connect(MultiFDSendParams *p,
                                        QIOChannel *ioc,
                                        Error **errp)
{
    MigrationState *s = migrate_get_current();
    const char *hostname = s->hostname;
    MultiFDTLSThreadArgs *args;
    QIOChannelTLS *tioc;

    tioc = migration_tls_client_create(ioc, hostname, errp);
    if (!tioc) {
        return false;
    }

    /*
     * Ownership of the socket channel now transfers to the newly
     * created TLS channel, which has already taken a reference.
     */
    object_unref(OBJECT(ioc));
    trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
    qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");

    args = g_new0(MultiFDTLSThreadArgs, 1);
    args->tioc = tioc;
    args->p = p;

    p->tls_thread_created = true;
    qemu_thread_create(&p->tls_thread, "multifd-tls-handshake-worker",
                       multifd_tls_handshake_thread, args,
                       QEMU_THREAD_JOINABLE);
    return true;
}
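
/*
 * Note: the handshake runs in its own joinable thread because it may
 * block on IO; doing it inline would stall the thread that is
 * creating the channels.  Once the handshake completes,
 * multifd_new_send_channel_async() is invoked a second time, this
 * time with the TLS channel as the task source.
 */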
void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
{
    qio_channel_set_delay(ioc, false);

    migration_ioc_register_yank(ioc);
    /* Set up p->c only once the channel is completely set up */
    p->c = ioc;

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
                       QEMU_THREAD_JOINABLE);
}

/*
 * When TLS is enabled this function is called once to establish the
 * TLS connection and a second time after the TLS handshake to create
 * the multifd channel.  Without TLS it goes straight into the channel
 * creation.
 */
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
    Error *local_err = NULL;
    bool ret;

    trace_multifd_new_send_channel_async(p->id);

    if (qio_task_propagate_error(task, &local_err)) {
        ret = false;
        goto out;
    }

    trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
                                       migrate_get_current()->hostname);

    if (migrate_channel_requires_tls_upgrade(ioc)) {
        ret = multifd_tls_channel_connect(p, ioc, &local_err);
        if (ret) {
            return;
        }
    } else {
        multifd_channel_connect(p, ioc);
        ret = true;
    }

out:
    /*
     * Here we're not interested in whether creation succeeded, only
     * that it happened at all.
     */
    multifd_send_channel_created();

    if (ret) {
        return;
    }

    trace_multifd_new_send_channel_async_error(p->id, local_err);
    multifd_send_set_error(local_err);
    /*
     * For error cases (TLS or non-TLS), the IO channel is always freed
     * here rather than at multifd cleanup time: since p->c was never
     * set, the multifd cleanup code doesn't even know of its existence.
     */
    object_unref(OBJECT(ioc));
    error_free(local_err);
}

static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
{
    if (!multifd_use_packets()) {
        return file_send_channel_create(opaque, errp);
    }

    socket_send_channel_create(multifd_new_send_channel_async, opaque);
    return true;
}
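
/*
 * Set up everything the send side needs before migration starts:
 * global state, per-channel params, and one connection attempt per
 * channel.  Channel creation is asynchronous; this function only
 * returns once creation has at least been attempted for every
 * channel.  Returns true on success, false on failure.
 */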
bool multifd_send_setup(void)
{
    MigrationState *s = migrate_get_current();
    Error *local_err = NULL;
    int thread_count, ret = 0;
    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    if (!migrate_multifd()) {
        return true;
    }

    thread_count = migrate_multifd_channels();
    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
    multifd_send_state->pages = multifd_pages_init(page_count);
    qemu_sem_init(&multifd_send_state->channels_created, 0);
    qemu_sem_init(&multifd_send_state->channels_ready, 0);
    qatomic_set(&multifd_send_state->exiting, 0);
    multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_sem_init(&p->sem, 0);
        qemu_sem_init(&p->sem_sync, 0);
        p->id = i;
        p->pages = multifd_pages_init(page_count);

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                          + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
            p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
            p->packet->version = cpu_to_be32(MULTIFD_VERSION);

            /* We need one extra place for the packet header */
            p->iov = g_new0(struct iovec, page_count + 1);
        } else {
            p->iov = g_new0(struct iovec, page_count);
        }
        p->name = g_strdup_printf("multifdsend_%d", i);
        p->page_size = qemu_target_page_size();
        p->page_count = page_count;
        p->write_flags = 0;

        if (!multifd_new_send_channel_create(p, &local_err)) {
            return false;
        }
    }

    /*
     * Wait until channel creation has started for all channels.  The
     * creation can still fail, but no more channels will be created
     * past this point.
     */
    for (i = 0; i < thread_count; i++) {
        qemu_sem_wait(&multifd_send_state->channels_created);
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        ret = multifd_send_state->ops->send_setup(p, &local_err);
        if (ret) {
            break;
        }
    }

    if (ret) {
        migrate_set_error(s, local_err);
        error_report_err(local_err);
        migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
                          MIGRATION_STATUS_FAILED);
        return false;
    }

    return true;
}

bool multifd_recv(void)
{
    int i;
    static int next_recv_channel;
    MultiFDRecvParams *p = NULL;
    MultiFDRecvData *data = multifd_recv_state->data;

    /*
     * next_recv_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_recv_channel %= migrate_multifd_channels();
    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_recv_should_exit()) {
            return false;
        }

        p = &multifd_recv_state->params[i];

        if (qatomic_read(&p->pending_job) == false) {
            next_recv_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Order pending_job read before manipulating p->data below.  Pairs
     * with qatomic_store_release() at multifd_recv_thread().
     */
    smp_mb_acquire();

    assert(!p->data->size);
    multifd_recv_state->data = p->data;
    p->data = data;

    /*
     * Order p->data update before setting pending_job.  Pairs with
     * qatomic_load_acquire() at multifd_recv_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}

MultiFDRecvData *multifd_get_recv_data(void)
{
    return multifd_recv_state->data;
}
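
/*
 * Illustrative caller sketch (hypothetical shape; the real caller is
 * the mapped-ram load path): the migration thread fills the shared
 * MultiFDRecvData and hands it to an idle channel:
 *
 *   MultiFDRecvData *data = multifd_get_recv_data();
 *   ...fill in the remaining fields...
 *   data->size = size;
 *   if (!multifd_recv()) {
 *       ...bail out, a recv channel hit an error...
 *   }
 */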
static void multifd_recv_terminate_threads(Error *err)
{
    int i;

    trace_multifd_recv_terminate_threads(err != NULL);

    if (qatomic_xchg(&multifd_recv_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        /*
         * The migration thread and channels interact differently
         * depending on the presence of packets.
         */
        if (multifd_use_packets()) {
            /*
             * The channel receives as long as there are packets.  When
             * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
             * channel waits for the migration thread to sync.  If the
             * sync never happens, do it here.
             */
            qemu_sem_post(&p->sem_sync);
        } else {
            /*
             * The channel waits for the migration thread to give it
             * work.  When the migration thread runs out of work, it
             * releases the channel and waits for any pending work to
             * finish.  If we reach here (e.g. due to error) before the
             * work runs out, release the channel.
             */
            qemu_sem_post(&p->sem);
        }

        /*
         * We could arrive here for two reasons:
         *  - normal quit, i.e. everything went fine, just finished
         *  - error quit: we close the channels so the channel threads
         *    finish the qio_channel_read_all_eof()
         */
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }
}

void multifd_recv_shutdown(void)
{
    if (migrate_multifd()) {
        multifd_recv_terminate_threads(NULL);
    }
}

static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
{
    migration_ioc_unregister_yank(p->c);
    object_unref(OBJECT(p->c));
    p->c = NULL;
    qemu_mutex_destroy(&p->mutex);
    qemu_sem_destroy(&p->sem_sync);
    qemu_sem_destroy(&p->sem);
    g_free(p->name);
    p->name = NULL;
    p->packet_len = 0;
    g_free(p->packet);
    p->packet = NULL;
    g_free(p->iov);
    p->iov = NULL;
    g_free(p->normal);
    p->normal = NULL;
    multifd_recv_state->ops->recv_cleanup(p);
}

static void multifd_recv_cleanup_state(void)
{
    qemu_sem_destroy(&multifd_recv_state->sem_sync);
    g_free(multifd_recv_state->params);
    multifd_recv_state->params = NULL;
    g_free(multifd_recv_state->data);
    multifd_recv_state->data = NULL;
    g_free(multifd_recv_state);
    multifd_recv_state = NULL;
}

void multifd_recv_cleanup(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }
    multifd_recv_terminate_threads(NULL);
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
    }
    multifd_recv_cleanup_state();
}
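
/*
 * Synchronization point between the migration thread and all recv
 * channels.  In packet mode this corresponds to every channel having
 * seen a MULTIFD_FLAG_SYNC packet; in file (mapped-ram) mode it means
 * every channel has drained its assigned work.
 */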
void multifd_recv_sync_main(void)
{
    int thread_count = migrate_multifd_channels();
    bool file_based = !multifd_use_packets();
    int i;

    if (!migrate_multifd()) {
        return;
    }

    /*
     * File-based channels don't use packets and therefore need to
     * wait for more work.  Release them to start the sync.
     */
    if (file_based) {
        for (i = 0; i < thread_count; i++) {
            MultiFDRecvParams *p = &multifd_recv_state->params[i];

            trace_multifd_recv_sync_main_signal(p->id);
            qemu_sem_post(&p->sem);
        }
    }

    /*
     * Initiate the synchronization by waiting for all channels.
     *
     * For socket-based migration this means each channel has received
     * the SYNC packet on the stream.
     *
     * For file-based migration this means each channel is done with
     * the work (pending_job=false).
     */
    for (i = 0; i < thread_count; i++) {
        trace_multifd_recv_sync_main_wait(i);
        qemu_sem_wait(&multifd_recv_state->sem_sync);
    }

    if (file_based) {
        /*
         * For file-based migration, loading is done in one iteration.
         * We're done.
         */
        return;
    }

    /*
     * Sync done.  Release the channels for the next iteration.
     */
    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        WITH_QEMU_LOCK_GUARD(&p->mutex) {
            if (multifd_recv_state->packet_num < p->packet_num) {
                multifd_recv_state->packet_num = p->packet_num;
            }
        }
        trace_multifd_recv_sync_main_signal(p->id);
        qemu_sem_post(&p->sem_sync);
    }
    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
}

static void *multifd_recv_thread(void *opaque)
{
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int ret;

    trace_multifd_recv_thread_start(p->id);
    rcu_register_thread();

    while (true) {
        uint32_t flags = 0;
        bool has_data = false;
        p->normal_num = 0;

        if (use_packets) {
            if (multifd_recv_should_exit()) {
                break;
            }

            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
                                           p->packet_len, &local_err);
            if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
                break;
            }

            qemu_mutex_lock(&p->mutex);
            ret = multifd_recv_unfill_packet(p, &local_err);
            if (ret) {
                qemu_mutex_unlock(&p->mutex);
                break;
            }

            flags = p->flags;
            /* recv methods don't know how to handle the SYNC flag */
            p->flags &= ~MULTIFD_FLAG_SYNC;
            has_data = !!p->normal_num;
            qemu_mutex_unlock(&p->mutex);
        } else {
            /*
             * No packets, so we need to wait for the vmstate code to
             * give us work.
             */
            qemu_sem_wait(&p->sem);

            if (multifd_recv_should_exit()) {
                break;
            }

            /* pairs with qatomic_store_release() at multifd_recv() */
            if (!qatomic_load_acquire(&p->pending_job)) {
                /*
                 * Migration thread did not send work, this is
                 * equivalent to pending_sync on the sending
                 * side.  Post sem_sync to notify we reached this
                 * point.
                 */
                qemu_sem_post(&multifd_recv_state->sem_sync);
                continue;
            }

            has_data = !!p->data->size;
        }

        if (has_data) {
            ret = multifd_recv_state->ops->recv(p, &local_err);
            if (ret != 0) {
                break;
            }
        }

        if (use_packets) {
            if (flags & MULTIFD_FLAG_SYNC) {
                qemu_sem_post(&multifd_recv_state->sem_sync);
                qemu_sem_wait(&p->sem_sync);
            }
        } else {
            p->total_normal_pages += p->data->size / qemu_target_page_size();
            p->data->size = 0;
            /*
             * Order data->size update before clearing
             * pending_job.  Pairs with smp_mb_acquire() at
             * multifd_recv().
             */
            qatomic_store_release(&p->pending_job, false);
        }
    }

    if (local_err) {
        multifd_recv_terminate_threads(local_err);
        error_free(local_err);
    }

    rcu_unregister_thread();
    trace_multifd_recv_thread_end(p->id, p->packets_recved, p->total_normal_pages);

    return NULL;
}
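
/*
 * Illustrative timeline of one packet-mode sync round, per channel:
 *
 *   recv thread:  reads a packet carrying MULTIFD_FLAG_SYNC
 *                 -> posts multifd_recv_state->sem_sync
 *                 -> blocks on p->sem_sync
 *   migration thread (multifd_recv_sync_main):
 *                 waits on sem_sync once per channel
 *                 -> updates packet_num under p->mutex
 *                 -> posts p->sem_sync to release each channel
 */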
int multifd_recv_setup(Error **errp)
{
    int thread_count;
    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    /*
     * Return successfully if multiFD recv state is already initialised
     * or multiFD is not enabled.
     */
    if (multifd_recv_state || !migrate_multifd()) {
        return 0;
    }

    thread_count = migrate_multifd_channels();
    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);

    multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
    multifd_recv_state->data->size = 0;

    qatomic_set(&multifd_recv_state->count, 0);
    qatomic_set(&multifd_recv_state->exiting, 0);
    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
    multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        qemu_mutex_init(&p->mutex);
        qemu_sem_init(&p->sem_sync, 0);
        qemu_sem_init(&p->sem, 0);
        p->pending_job = false;
        p->id = i;

        p->data = g_new0(MultiFDRecvData, 1);
        p->data->size = 0;

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                          + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
        }
        p->name = g_strdup_printf("multifdrecv_%d", i);
        p->iov = g_new0(struct iovec, page_count);
        p->normal = g_new0(ram_addr_t, page_count);
        p->page_count = page_count;
        p->page_size = qemu_target_page_size();
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];
        int ret;

        ret = multifd_recv_state->ops->recv_setup(p, errp);
        if (ret) {
            return ret;
        }
    }
    return 0;
}

bool multifd_recv_all_channels_created(void)
{
    int thread_count = migrate_multifd_channels();

    if (!migrate_multifd()) {
        return true;
    }

    if (!multifd_recv_state) {
        /* Called before any connections created */
        return false;
    }

    return thread_count == qatomic_read(&multifd_recv_state->count);
}

/*
 * Try to receive all multifd channels to get ready for the migration.
 * Sets @errp when failing to receive the current channel.
 */
void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
{
    MultiFDRecvParams *p;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int id;

    if (use_packets) {
        id = multifd_recv_initial_packet(ioc, &local_err);
        if (id < 0) {
            multifd_recv_terminate_threads(local_err);
            error_propagate_prepend(errp, local_err,
                                    "failed to receive packet"
                                    " via multifd channel %d: ",
                                    qatomic_read(&multifd_recv_state->count));
            return;
        }
        trace_multifd_recv_new_channel(id);
    } else {
        id = qatomic_read(&multifd_recv_state->count);
    }

    p = &multifd_recv_state->params[id];
    if (p->c != NULL) {
        error_setg(&local_err, "multifd: received id '%d' already set up",
                   id);
        multifd_recv_terminate_threads(local_err);
        error_propagate(errp, local_err);
        return;
    }
    p->c = ioc;
    object_ref(OBJECT(ioc));

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                       QEMU_THREAD_JOINABLE);
    qatomic_inc(&multifd_recv_state->count);
}
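
/*
 * Informational sketch of the call order, as implemented above:
 *
 *   src: multifd_send_setup() -> multifd_queue_page() /
 *        multifd_send_sync_main() per iteration -> multifd_send_shutdown()
 *   dst: multifd_recv_setup() -> multifd_recv_new_channel() per
 *        connection -> multifd_recv_sync_main() per iteration ->
 *        multifd_recv_shutdown() / multifd_recv_cleanup()
 */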