1 /* 2 * Multifd common code 3 * 4 * Copyright (c) 2019-2020 Red Hat Inc 5 * 6 * Authors: 7 * Juan Quintela <quintela@redhat.com> 8 * 9 * This work is licensed under the terms of the GNU GPL, version 2 or later. 10 * See the COPYING file in the top-level directory. 11 */ 12 13 #include "qemu/osdep.h" 14 #include "qemu/rcu.h" 15 #include "exec/target_page.h" 16 #include "sysemu/sysemu.h" 17 #include "exec/ramblock.h" 18 #include "qemu/error-report.h" 19 #include "qapi/error.h" 20 #include "fd.h" 21 #include "file.h" 22 #include "migration.h" 23 #include "migration-stats.h" 24 #include "socket.h" 25 #include "tls.h" 26 #include "qemu-file.h" 27 #include "trace.h" 28 #include "multifd.h" 29 #include "threadinfo.h" 30 #include "options.h" 31 #include "qemu/yank.h" 32 #include "io/channel-file.h" 33 #include "io/channel-socket.h" 34 #include "yank_functions.h" 35 36 /* Multiple fd's */ 37 38 #define MULTIFD_MAGIC 0x11223344U 39 #define MULTIFD_VERSION 1 40 41 typedef struct { 42 uint32_t magic; 43 uint32_t version; 44 unsigned char uuid[16]; /* QemuUUID */ 45 uint8_t id; 46 uint8_t unused1[7]; /* Reserved for future use */ 47 uint64_t unused2[4]; /* Reserved for future use */ 48 } __attribute__((packed)) MultiFDInit_t; 49 50 struct { 51 MultiFDSendParams *params; 52 /* array of pages to sent */ 53 MultiFDPages_t *pages; 54 /* 55 * Global number of generated multifd packets. 56 * 57 * Note that we used 'uintptr_t' because it'll naturally support atomic 58 * operations on both 32bit / 64 bits hosts. It means on 32bit systems 59 * multifd will overflow the packet_num easier, but that should be 60 * fine. 61 * 62 * Another option is to use QEMU's Stat64 then it'll be 64 bits on all 63 * hosts, however so far it does not support atomic fetch_add() yet. 64 * Make it easy for now. 65 */ 66 uintptr_t packet_num; 67 /* 68 * Synchronization point past which no more channels will be 69 * created. 70 */ 71 QemuSemaphore channels_created; 72 /* send channels ready */ 73 QemuSemaphore channels_ready; 74 /* 75 * Have we already run terminate threads. There is a race when it 76 * happens that we got one error while we are exiting. 77 * We will use atomic operations. Only valid values are 0 and 1. 78 */ 79 int exiting; 80 /* multifd ops */ 81 MultiFDMethods *ops; 82 } *multifd_send_state; 83 84 struct { 85 MultiFDRecvParams *params; 86 MultiFDRecvData *data; 87 /* number of created threads */ 88 int count; 89 /* 90 * This is always posted by the recv threads, the migration thread 91 * uses it to wait for recv threads to finish assigned tasks. 92 */ 93 QemuSemaphore sem_sync; 94 /* global number of generated multifd packets */ 95 uint64_t packet_num; 96 int exiting; 97 /* multifd ops */ 98 MultiFDMethods *ops; 99 } *multifd_recv_state; 100 101 static bool multifd_use_packets(void) 102 { 103 return !migrate_mapped_ram(); 104 } 105 106 void multifd_send_channel_created(void) 107 { 108 qemu_sem_post(&multifd_send_state->channels_created); 109 } 110 111 static void multifd_set_file_bitmap(MultiFDSendParams *p) 112 { 113 MultiFDPages_t *pages = p->pages; 114 115 assert(pages->block); 116 117 for (int i = 0; i < p->pages->num; i++) { 118 ramblock_set_file_bmap_atomic(pages->block, pages->offset[i]); 119 } 120 } 121 122 /* Multifd without compression */ 123 124 /** 125 * nocomp_send_setup: setup send side 126 * 127 * @p: Params for the channel that we are using 128 * @errp: pointer to an error 129 */ 130 static int nocomp_send_setup(MultiFDSendParams *p, Error **errp) 131 { 132 if (migrate_zero_copy_send()) { 133 p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY; 134 } 135 136 return 0; 137 } 138 139 /** 140 * nocomp_send_cleanup: cleanup send side 141 * 142 * For no compression this function does nothing. 143 * 144 * @p: Params for the channel that we are using 145 * @errp: pointer to an error 146 */ 147 static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp) 148 { 149 return; 150 } 151 152 static void multifd_send_prepare_iovs(MultiFDSendParams *p) 153 { 154 MultiFDPages_t *pages = p->pages; 155 156 for (int i = 0; i < pages->num; i++) { 157 p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i]; 158 p->iov[p->iovs_num].iov_len = p->page_size; 159 p->iovs_num++; 160 } 161 162 p->next_packet_size = pages->num * p->page_size; 163 } 164 165 /** 166 * nocomp_send_prepare: prepare date to be able to send 167 * 168 * For no compression we just have to calculate the size of the 169 * packet. 170 * 171 * Returns 0 for success or -1 for error 172 * 173 * @p: Params for the channel that we are using 174 * @errp: pointer to an error 175 */ 176 static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp) 177 { 178 bool use_zero_copy_send = migrate_zero_copy_send(); 179 int ret; 180 181 if (!multifd_use_packets()) { 182 multifd_send_prepare_iovs(p); 183 multifd_set_file_bitmap(p); 184 185 return 0; 186 } 187 188 if (!use_zero_copy_send) { 189 /* 190 * Only !zerocopy needs the header in IOV; zerocopy will 191 * send it separately. 192 */ 193 multifd_send_prepare_header(p); 194 } 195 196 multifd_send_prepare_iovs(p); 197 p->flags |= MULTIFD_FLAG_NOCOMP; 198 199 multifd_send_fill_packet(p); 200 201 if (use_zero_copy_send) { 202 /* Send header first, without zerocopy */ 203 ret = qio_channel_write_all(p->c, (void *)p->packet, 204 p->packet_len, errp); 205 if (ret != 0) { 206 return -1; 207 } 208 } 209 210 return 0; 211 } 212 213 /** 214 * nocomp_recv_setup: setup receive side 215 * 216 * For no compression this function does nothing. 217 * 218 * Returns 0 for success or -1 for error 219 * 220 * @p: Params for the channel that we are using 221 * @errp: pointer to an error 222 */ 223 static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp) 224 { 225 return 0; 226 } 227 228 /** 229 * nocomp_recv_cleanup: setup receive side 230 * 231 * For no compression this function does nothing. 232 * 233 * @p: Params for the channel that we are using 234 */ 235 static void nocomp_recv_cleanup(MultiFDRecvParams *p) 236 { 237 } 238 239 /** 240 * nocomp_recv: read the data from the channel 241 * 242 * For no compression we just need to read things into the correct place. 243 * 244 * Returns 0 for success or -1 for error 245 * 246 * @p: Params for the channel that we are using 247 * @errp: pointer to an error 248 */ 249 static int nocomp_recv(MultiFDRecvParams *p, Error **errp) 250 { 251 uint32_t flags; 252 253 if (!multifd_use_packets()) { 254 return multifd_file_recv_data(p, errp); 255 } 256 257 flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK; 258 259 if (flags != MULTIFD_FLAG_NOCOMP) { 260 error_setg(errp, "multifd %u: flags received %x flags expected %x", 261 p->id, flags, MULTIFD_FLAG_NOCOMP); 262 return -1; 263 } 264 for (int i = 0; i < p->normal_num; i++) { 265 p->iov[i].iov_base = p->host + p->normal[i]; 266 p->iov[i].iov_len = p->page_size; 267 } 268 return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp); 269 } 270 271 static MultiFDMethods multifd_nocomp_ops = { 272 .send_setup = nocomp_send_setup, 273 .send_cleanup = nocomp_send_cleanup, 274 .send_prepare = nocomp_send_prepare, 275 .recv_setup = nocomp_recv_setup, 276 .recv_cleanup = nocomp_recv_cleanup, 277 .recv = nocomp_recv 278 }; 279 280 static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = { 281 [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops, 282 }; 283 284 void multifd_register_ops(int method, MultiFDMethods *ops) 285 { 286 assert(0 < method && method < MULTIFD_COMPRESSION__MAX); 287 multifd_ops[method] = ops; 288 } 289 290 /* Reset a MultiFDPages_t* object for the next use */ 291 static void multifd_pages_reset(MultiFDPages_t *pages) 292 { 293 /* 294 * We don't need to touch offset[] array, because it will be 295 * overwritten later when reused. 296 */ 297 pages->num = 0; 298 pages->block = NULL; 299 } 300 301 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) 302 { 303 MultiFDInit_t msg = {}; 304 size_t size = sizeof(msg); 305 int ret; 306 307 msg.magic = cpu_to_be32(MULTIFD_MAGIC); 308 msg.version = cpu_to_be32(MULTIFD_VERSION); 309 msg.id = p->id; 310 memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid)); 311 312 ret = qio_channel_write_all(p->c, (char *)&msg, size, errp); 313 if (ret != 0) { 314 return -1; 315 } 316 stat64_add(&mig_stats.multifd_bytes, size); 317 return 0; 318 } 319 320 static int multifd_recv_initial_packet(QIOChannel *c, Error **errp) 321 { 322 MultiFDInit_t msg; 323 int ret; 324 325 ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp); 326 if (ret != 0) { 327 return -1; 328 } 329 330 msg.magic = be32_to_cpu(msg.magic); 331 msg.version = be32_to_cpu(msg.version); 332 333 if (msg.magic != MULTIFD_MAGIC) { 334 error_setg(errp, "multifd: received packet magic %x " 335 "expected %x", msg.magic, MULTIFD_MAGIC); 336 return -1; 337 } 338 339 if (msg.version != MULTIFD_VERSION) { 340 error_setg(errp, "multifd: received packet version %u " 341 "expected %u", msg.version, MULTIFD_VERSION); 342 return -1; 343 } 344 345 if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) { 346 char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid); 347 char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid); 348 349 error_setg(errp, "multifd: received uuid '%s' and expected " 350 "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id); 351 g_free(uuid); 352 g_free(msg_uuid); 353 return -1; 354 } 355 356 if (msg.id > migrate_multifd_channels()) { 357 error_setg(errp, "multifd: received channel id %u is greater than " 358 "number of channels %u", msg.id, migrate_multifd_channels()); 359 return -1; 360 } 361 362 return msg.id; 363 } 364 365 static MultiFDPages_t *multifd_pages_init(uint32_t n) 366 { 367 MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1); 368 369 pages->allocated = n; 370 pages->offset = g_new0(ram_addr_t, n); 371 372 return pages; 373 } 374 375 static void multifd_pages_clear(MultiFDPages_t *pages) 376 { 377 multifd_pages_reset(pages); 378 pages->allocated = 0; 379 g_free(pages->offset); 380 pages->offset = NULL; 381 g_free(pages); 382 } 383 384 void multifd_send_fill_packet(MultiFDSendParams *p) 385 { 386 MultiFDPacket_t *packet = p->packet; 387 MultiFDPages_t *pages = p->pages; 388 uint64_t packet_num; 389 int i; 390 391 packet->flags = cpu_to_be32(p->flags); 392 packet->pages_alloc = cpu_to_be32(p->pages->allocated); 393 packet->normal_pages = cpu_to_be32(pages->num); 394 packet->next_packet_size = cpu_to_be32(p->next_packet_size); 395 396 packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num); 397 packet->packet_num = cpu_to_be64(packet_num); 398 399 if (pages->block) { 400 strncpy(packet->ramblock, pages->block->idstr, 256); 401 } 402 403 for (i = 0; i < pages->num; i++) { 404 /* there are architectures where ram_addr_t is 32 bit */ 405 uint64_t temp = pages->offset[i]; 406 407 packet->offset[i] = cpu_to_be64(temp); 408 } 409 410 p->packets_sent++; 411 p->total_normal_pages += pages->num; 412 413 trace_multifd_send(p->id, packet_num, pages->num, p->flags, 414 p->next_packet_size); 415 } 416 417 static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) 418 { 419 MultiFDPacket_t *packet = p->packet; 420 int i; 421 422 packet->magic = be32_to_cpu(packet->magic); 423 if (packet->magic != MULTIFD_MAGIC) { 424 error_setg(errp, "multifd: received packet " 425 "magic %x and expected magic %x", 426 packet->magic, MULTIFD_MAGIC); 427 return -1; 428 } 429 430 packet->version = be32_to_cpu(packet->version); 431 if (packet->version != MULTIFD_VERSION) { 432 error_setg(errp, "multifd: received packet " 433 "version %u and expected version %u", 434 packet->version, MULTIFD_VERSION); 435 return -1; 436 } 437 438 p->flags = be32_to_cpu(packet->flags); 439 440 packet->pages_alloc = be32_to_cpu(packet->pages_alloc); 441 /* 442 * If we received a packet that is 100 times bigger than expected 443 * just stop migration. It is a magic number. 444 */ 445 if (packet->pages_alloc > p->page_count) { 446 error_setg(errp, "multifd: received packet " 447 "with size %u and expected a size of %u", 448 packet->pages_alloc, p->page_count) ; 449 return -1; 450 } 451 452 p->normal_num = be32_to_cpu(packet->normal_pages); 453 if (p->normal_num > packet->pages_alloc) { 454 error_setg(errp, "multifd: received packet " 455 "with %u pages and expected maximum pages are %u", 456 p->normal_num, packet->pages_alloc) ; 457 return -1; 458 } 459 460 p->next_packet_size = be32_to_cpu(packet->next_packet_size); 461 p->packet_num = be64_to_cpu(packet->packet_num); 462 p->packets_recved++; 463 p->total_normal_pages += p->normal_num; 464 465 trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->flags, 466 p->next_packet_size); 467 468 if (p->normal_num == 0) { 469 return 0; 470 } 471 472 /* make sure that ramblock is 0 terminated */ 473 packet->ramblock[255] = 0; 474 p->block = qemu_ram_block_by_name(packet->ramblock); 475 if (!p->block) { 476 error_setg(errp, "multifd: unknown ram block %s", 477 packet->ramblock); 478 return -1; 479 } 480 481 p->host = p->block->host; 482 for (i = 0; i < p->normal_num; i++) { 483 uint64_t offset = be64_to_cpu(packet->offset[i]); 484 485 if (offset > (p->block->used_length - p->page_size)) { 486 error_setg(errp, "multifd: offset too long %" PRIu64 487 " (max " RAM_ADDR_FMT ")", 488 offset, p->block->used_length); 489 return -1; 490 } 491 p->normal[i] = offset; 492 } 493 494 return 0; 495 } 496 497 static bool multifd_send_should_exit(void) 498 { 499 return qatomic_read(&multifd_send_state->exiting); 500 } 501 502 static bool multifd_recv_should_exit(void) 503 { 504 return qatomic_read(&multifd_recv_state->exiting); 505 } 506 507 /* 508 * The migration thread can wait on either of the two semaphores. This 509 * function can be used to kick the main thread out of waiting on either of 510 * them. Should mostly only be called when something wrong happened with 511 * the current multifd send thread. 512 */ 513 static void multifd_send_kick_main(MultiFDSendParams *p) 514 { 515 qemu_sem_post(&p->sem_sync); 516 qemu_sem_post(&multifd_send_state->channels_ready); 517 } 518 519 /* 520 * How we use multifd_send_state->pages and channel->pages? 521 * 522 * We create a pages for each channel, and a main one. Each time that 523 * we need to send a batch of pages we interchange the ones between 524 * multifd_send_state and the channel that is sending it. There are 525 * two reasons for that: 526 * - to not have to do so many mallocs during migration 527 * - to make easier to know what to free at the end of migration 528 * 529 * This way we always know who is the owner of each "pages" struct, 530 * and we don't need any locking. It belongs to the migration thread 531 * or to the channel thread. Switching is safe because the migration 532 * thread is using the channel mutex when changing it, and the channel 533 * have to had finish with its own, otherwise pending_job can't be 534 * false. 535 * 536 * Returns true if succeed, false otherwise. 537 */ 538 static bool multifd_send_pages(void) 539 { 540 int i; 541 static int next_channel; 542 MultiFDSendParams *p = NULL; /* make happy gcc */ 543 MultiFDPages_t *pages = multifd_send_state->pages; 544 545 if (multifd_send_should_exit()) { 546 return false; 547 } 548 549 /* We wait here, until at least one channel is ready */ 550 qemu_sem_wait(&multifd_send_state->channels_ready); 551 552 /* 553 * next_channel can remain from a previous migration that was 554 * using more channels, so ensure it doesn't overflow if the 555 * limit is lower now. 556 */ 557 next_channel %= migrate_multifd_channels(); 558 for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { 559 if (multifd_send_should_exit()) { 560 return false; 561 } 562 p = &multifd_send_state->params[i]; 563 /* 564 * Lockless read to p->pending_job is safe, because only multifd 565 * sender thread can clear it. 566 */ 567 if (qatomic_read(&p->pending_job) == false) { 568 next_channel = (i + 1) % migrate_multifd_channels(); 569 break; 570 } 571 } 572 573 /* 574 * Make sure we read p->pending_job before all the rest. Pairs with 575 * qatomic_store_release() in multifd_send_thread(). 576 */ 577 smp_mb_acquire(); 578 assert(!p->pages->num); 579 multifd_send_state->pages = p->pages; 580 p->pages = pages; 581 /* 582 * Making sure p->pages is setup before marking pending_job=true. Pairs 583 * with the qatomic_load_acquire() in multifd_send_thread(). 584 */ 585 qatomic_store_release(&p->pending_job, true); 586 qemu_sem_post(&p->sem); 587 588 return true; 589 } 590 591 static inline bool multifd_queue_empty(MultiFDPages_t *pages) 592 { 593 return pages->num == 0; 594 } 595 596 static inline bool multifd_queue_full(MultiFDPages_t *pages) 597 { 598 return pages->num == pages->allocated; 599 } 600 601 static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset) 602 { 603 pages->offset[pages->num++] = offset; 604 } 605 606 /* Returns true if enqueue successful, false otherwise */ 607 bool multifd_queue_page(RAMBlock *block, ram_addr_t offset) 608 { 609 MultiFDPages_t *pages; 610 611 retry: 612 pages = multifd_send_state->pages; 613 614 /* If the queue is empty, we can already enqueue now */ 615 if (multifd_queue_empty(pages)) { 616 pages->block = block; 617 multifd_enqueue(pages, offset); 618 return true; 619 } 620 621 /* 622 * Not empty, meanwhile we need a flush. It can because of either: 623 * 624 * (1) The page is not on the same ramblock of previous ones, or, 625 * (2) The queue is full. 626 * 627 * After flush, always retry. 628 */ 629 if (pages->block != block || multifd_queue_full(pages)) { 630 if (!multifd_send_pages()) { 631 return false; 632 } 633 goto retry; 634 } 635 636 /* Not empty, and we still have space, do it! */ 637 multifd_enqueue(pages, offset); 638 return true; 639 } 640 641 /* Multifd send side hit an error; remember it and prepare to quit */ 642 static void multifd_send_set_error(Error *err) 643 { 644 /* 645 * We don't want to exit each threads twice. Depending on where 646 * we get the error, or if there are two independent errors in two 647 * threads at the same time, we can end calling this function 648 * twice. 649 */ 650 if (qatomic_xchg(&multifd_send_state->exiting, 1)) { 651 return; 652 } 653 654 if (err) { 655 MigrationState *s = migrate_get_current(); 656 migrate_set_error(s, err); 657 if (s->state == MIGRATION_STATUS_SETUP || 658 s->state == MIGRATION_STATUS_PRE_SWITCHOVER || 659 s->state == MIGRATION_STATUS_DEVICE || 660 s->state == MIGRATION_STATUS_ACTIVE) { 661 migrate_set_state(&s->state, s->state, 662 MIGRATION_STATUS_FAILED); 663 } 664 } 665 } 666 667 static void multifd_send_terminate_threads(void) 668 { 669 int i; 670 671 trace_multifd_send_terminate_threads(); 672 673 /* 674 * Tell everyone we're quitting. No xchg() needed here; we simply 675 * always set it. 676 */ 677 qatomic_set(&multifd_send_state->exiting, 1); 678 679 /* 680 * Firstly, kick all threads out; no matter whether they are just idle, 681 * or blocked in an IO system call. 682 */ 683 for (i = 0; i < migrate_multifd_channels(); i++) { 684 MultiFDSendParams *p = &multifd_send_state->params[i]; 685 686 qemu_sem_post(&p->sem); 687 if (p->c) { 688 qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); 689 } 690 } 691 692 /* 693 * Finally recycle all the threads. 694 */ 695 for (i = 0; i < migrate_multifd_channels(); i++) { 696 MultiFDSendParams *p = &multifd_send_state->params[i]; 697 698 if (p->tls_thread_created) { 699 qemu_thread_join(&p->tls_thread); 700 } 701 702 if (p->thread_created) { 703 qemu_thread_join(&p->thread); 704 } 705 } 706 } 707 708 static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp) 709 { 710 if (p->c) { 711 migration_ioc_unregister_yank(p->c); 712 /* 713 * The object_unref() cannot guarantee the fd will always be 714 * released because finalize() of the iochannel is only 715 * triggered on the last reference and it's not guaranteed 716 * that we always hold the last refcount when reaching here. 717 * 718 * Closing the fd explicitly has the benefit that if there is any 719 * registered I/O handler callbacks on such fd, that will get a 720 * POLLNVAL event and will further trigger the cleanup to finally 721 * release the IOC. 722 * 723 * FIXME: It should logically be guaranteed that all multifd 724 * channels have no I/O handler callback registered when reaching 725 * here, because migration thread will wait for all multifd channel 726 * establishments to complete during setup. Since 727 * migrate_fd_cleanup() will be scheduled in main thread too, all 728 * previous callbacks should guarantee to be completed when 729 * reaching here. See multifd_send_state.channels_created and its 730 * usage. In the future, we could replace this with an assert 731 * making sure we're the last reference, or simply drop it if above 732 * is more clear to be justified. 733 */ 734 qio_channel_close(p->c, &error_abort); 735 object_unref(OBJECT(p->c)); 736 p->c = NULL; 737 } 738 qemu_sem_destroy(&p->sem); 739 qemu_sem_destroy(&p->sem_sync); 740 g_free(p->name); 741 p->name = NULL; 742 multifd_pages_clear(p->pages); 743 p->pages = NULL; 744 p->packet_len = 0; 745 g_free(p->packet); 746 p->packet = NULL; 747 g_free(p->iov); 748 p->iov = NULL; 749 multifd_send_state->ops->send_cleanup(p, errp); 750 751 return *errp == NULL; 752 } 753 754 static void multifd_send_cleanup_state(void) 755 { 756 file_cleanup_outgoing_migration(); 757 fd_cleanup_outgoing_migration(); 758 socket_cleanup_outgoing_migration(); 759 qemu_sem_destroy(&multifd_send_state->channels_created); 760 qemu_sem_destroy(&multifd_send_state->channels_ready); 761 g_free(multifd_send_state->params); 762 multifd_send_state->params = NULL; 763 multifd_pages_clear(multifd_send_state->pages); 764 multifd_send_state->pages = NULL; 765 g_free(multifd_send_state); 766 multifd_send_state = NULL; 767 } 768 769 void multifd_send_shutdown(void) 770 { 771 int i; 772 773 if (!migrate_multifd()) { 774 return; 775 } 776 777 multifd_send_terminate_threads(); 778 779 for (i = 0; i < migrate_multifd_channels(); i++) { 780 MultiFDSendParams *p = &multifd_send_state->params[i]; 781 Error *local_err = NULL; 782 783 if (!multifd_send_cleanup_channel(p, &local_err)) { 784 migrate_set_error(migrate_get_current(), local_err); 785 error_free(local_err); 786 } 787 } 788 789 multifd_send_cleanup_state(); 790 } 791 792 static int multifd_zero_copy_flush(QIOChannel *c) 793 { 794 int ret; 795 Error *err = NULL; 796 797 ret = qio_channel_flush(c, &err); 798 if (ret < 0) { 799 error_report_err(err); 800 return -1; 801 } 802 if (ret == 1) { 803 stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1); 804 } 805 806 return ret; 807 } 808 809 int multifd_send_sync_main(void) 810 { 811 int i; 812 bool flush_zero_copy; 813 814 if (!migrate_multifd()) { 815 return 0; 816 } 817 if (multifd_send_state->pages->num) { 818 if (!multifd_send_pages()) { 819 error_report("%s: multifd_send_pages fail", __func__); 820 return -1; 821 } 822 } 823 824 flush_zero_copy = migrate_zero_copy_send(); 825 826 for (i = 0; i < migrate_multifd_channels(); i++) { 827 MultiFDSendParams *p = &multifd_send_state->params[i]; 828 829 if (multifd_send_should_exit()) { 830 return -1; 831 } 832 833 trace_multifd_send_sync_main_signal(p->id); 834 835 /* 836 * We should be the only user so far, so not possible to be set by 837 * others concurrently. 838 */ 839 assert(qatomic_read(&p->pending_sync) == false); 840 qatomic_set(&p->pending_sync, true); 841 qemu_sem_post(&p->sem); 842 } 843 for (i = 0; i < migrate_multifd_channels(); i++) { 844 MultiFDSendParams *p = &multifd_send_state->params[i]; 845 846 if (multifd_send_should_exit()) { 847 return -1; 848 } 849 850 qemu_sem_wait(&multifd_send_state->channels_ready); 851 trace_multifd_send_sync_main_wait(p->id); 852 qemu_sem_wait(&p->sem_sync); 853 854 if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) { 855 return -1; 856 } 857 } 858 trace_multifd_send_sync_main(multifd_send_state->packet_num); 859 860 return 0; 861 } 862 863 static void *multifd_send_thread(void *opaque) 864 { 865 MultiFDSendParams *p = opaque; 866 MigrationThread *thread = NULL; 867 Error *local_err = NULL; 868 int ret = 0; 869 bool use_packets = multifd_use_packets(); 870 871 thread = migration_threads_add(p->name, qemu_get_thread_id()); 872 873 trace_multifd_send_thread_start(p->id); 874 rcu_register_thread(); 875 876 if (use_packets) { 877 if (multifd_send_initial_packet(p, &local_err) < 0) { 878 ret = -1; 879 goto out; 880 } 881 } 882 883 while (true) { 884 qemu_sem_post(&multifd_send_state->channels_ready); 885 qemu_sem_wait(&p->sem); 886 887 if (multifd_send_should_exit()) { 888 break; 889 } 890 891 /* 892 * Read pending_job flag before p->pages. Pairs with the 893 * qatomic_store_release() in multifd_send_pages(). 894 */ 895 if (qatomic_load_acquire(&p->pending_job)) { 896 MultiFDPages_t *pages = p->pages; 897 898 p->iovs_num = 0; 899 assert(pages->num); 900 901 ret = multifd_send_state->ops->send_prepare(p, &local_err); 902 if (ret != 0) { 903 break; 904 } 905 906 if (migrate_mapped_ram()) { 907 ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num, 908 p->pages->block, &local_err); 909 } else { 910 ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, 911 NULL, 0, p->write_flags, 912 &local_err); 913 } 914 915 if (ret != 0) { 916 break; 917 } 918 919 stat64_add(&mig_stats.multifd_bytes, 920 p->next_packet_size + p->packet_len); 921 922 multifd_pages_reset(p->pages); 923 p->next_packet_size = 0; 924 925 /* 926 * Making sure p->pages is published before saying "we're 927 * free". Pairs with the smp_mb_acquire() in 928 * multifd_send_pages(). 929 */ 930 qatomic_store_release(&p->pending_job, false); 931 } else { 932 /* 933 * If not a normal job, must be a sync request. Note that 934 * pending_sync is a standalone flag (unlike pending_job), so 935 * it doesn't require explicit memory barriers. 936 */ 937 assert(qatomic_read(&p->pending_sync)); 938 939 if (use_packets) { 940 p->flags = MULTIFD_FLAG_SYNC; 941 multifd_send_fill_packet(p); 942 ret = qio_channel_write_all(p->c, (void *)p->packet, 943 p->packet_len, &local_err); 944 if (ret != 0) { 945 break; 946 } 947 /* p->next_packet_size will always be zero for a SYNC packet */ 948 stat64_add(&mig_stats.multifd_bytes, p->packet_len); 949 p->flags = 0; 950 } 951 952 qatomic_set(&p->pending_sync, false); 953 qemu_sem_post(&p->sem_sync); 954 } 955 } 956 957 out: 958 if (ret) { 959 assert(local_err); 960 trace_multifd_send_error(p->id); 961 multifd_send_set_error(local_err); 962 multifd_send_kick_main(p); 963 error_free(local_err); 964 } 965 966 rcu_unregister_thread(); 967 migration_threads_remove(thread); 968 trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages); 969 970 return NULL; 971 } 972 973 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque); 974 975 typedef struct { 976 MultiFDSendParams *p; 977 QIOChannelTLS *tioc; 978 } MultiFDTLSThreadArgs; 979 980 static void *multifd_tls_handshake_thread(void *opaque) 981 { 982 MultiFDTLSThreadArgs *args = opaque; 983 984 qio_channel_tls_handshake(args->tioc, 985 multifd_new_send_channel_async, 986 args->p, 987 NULL, 988 NULL); 989 g_free(args); 990 991 return NULL; 992 } 993 994 static bool multifd_tls_channel_connect(MultiFDSendParams *p, 995 QIOChannel *ioc, 996 Error **errp) 997 { 998 MigrationState *s = migrate_get_current(); 999 const char *hostname = s->hostname; 1000 MultiFDTLSThreadArgs *args; 1001 QIOChannelTLS *tioc; 1002 1003 tioc = migration_tls_client_create(ioc, hostname, errp); 1004 if (!tioc) { 1005 return false; 1006 } 1007 1008 /* 1009 * Ownership of the socket channel now transfers to the newly 1010 * created TLS channel, which has already taken a reference. 1011 */ 1012 object_unref(OBJECT(ioc)); 1013 trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname); 1014 qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing"); 1015 1016 args = g_new0(MultiFDTLSThreadArgs, 1); 1017 args->tioc = tioc; 1018 args->p = p; 1019 1020 p->tls_thread_created = true; 1021 qemu_thread_create(&p->tls_thread, "multifd-tls-handshake-worker", 1022 multifd_tls_handshake_thread, args, 1023 QEMU_THREAD_JOINABLE); 1024 return true; 1025 } 1026 1027 void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc) 1028 { 1029 qio_channel_set_delay(ioc, false); 1030 1031 migration_ioc_register_yank(ioc); 1032 /* Setup p->c only if the channel is completely setup */ 1033 p->c = ioc; 1034 1035 p->thread_created = true; 1036 qemu_thread_create(&p->thread, p->name, multifd_send_thread, p, 1037 QEMU_THREAD_JOINABLE); 1038 } 1039 1040 /* 1041 * When TLS is enabled this function is called once to establish the 1042 * TLS connection and a second time after the TLS handshake to create 1043 * the multifd channel. Without TLS it goes straight into the channel 1044 * creation. 1045 */ 1046 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) 1047 { 1048 MultiFDSendParams *p = opaque; 1049 QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task)); 1050 Error *local_err = NULL; 1051 bool ret; 1052 1053 trace_multifd_new_send_channel_async(p->id); 1054 1055 if (qio_task_propagate_error(task, &local_err)) { 1056 ret = false; 1057 goto out; 1058 } 1059 1060 trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)), 1061 migrate_get_current()->hostname); 1062 1063 if (migrate_channel_requires_tls_upgrade(ioc)) { 1064 ret = multifd_tls_channel_connect(p, ioc, &local_err); 1065 if (ret) { 1066 return; 1067 } 1068 } else { 1069 multifd_channel_connect(p, ioc); 1070 ret = true; 1071 } 1072 1073 out: 1074 /* 1075 * Here we're not interested whether creation succeeded, only that 1076 * it happened at all. 1077 */ 1078 multifd_send_channel_created(); 1079 1080 if (ret) { 1081 return; 1082 } 1083 1084 trace_multifd_new_send_channel_async_error(p->id, local_err); 1085 multifd_send_set_error(local_err); 1086 /* 1087 * For error cases (TLS or non-TLS), IO channel is always freed here 1088 * rather than when cleanup multifd: since p->c is not set, multifd 1089 * cleanup code doesn't even know its existence. 1090 */ 1091 object_unref(OBJECT(ioc)); 1092 error_free(local_err); 1093 } 1094 1095 static bool multifd_new_send_channel_create(gpointer opaque, Error **errp) 1096 { 1097 if (!multifd_use_packets()) { 1098 return file_send_channel_create(opaque, errp); 1099 } 1100 1101 socket_send_channel_create(multifd_new_send_channel_async, opaque); 1102 return true; 1103 } 1104 1105 bool multifd_send_setup(void) 1106 { 1107 MigrationState *s = migrate_get_current(); 1108 Error *local_err = NULL; 1109 int thread_count, ret = 0; 1110 uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); 1111 bool use_packets = multifd_use_packets(); 1112 uint8_t i; 1113 1114 if (!migrate_multifd()) { 1115 return true; 1116 } 1117 1118 thread_count = migrate_multifd_channels(); 1119 multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); 1120 multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); 1121 multifd_send_state->pages = multifd_pages_init(page_count); 1122 qemu_sem_init(&multifd_send_state->channels_created, 0); 1123 qemu_sem_init(&multifd_send_state->channels_ready, 0); 1124 qatomic_set(&multifd_send_state->exiting, 0); 1125 multifd_send_state->ops = multifd_ops[migrate_multifd_compression()]; 1126 1127 for (i = 0; i < thread_count; i++) { 1128 MultiFDSendParams *p = &multifd_send_state->params[i]; 1129 1130 qemu_sem_init(&p->sem, 0); 1131 qemu_sem_init(&p->sem_sync, 0); 1132 p->id = i; 1133 p->pages = multifd_pages_init(page_count); 1134 1135 if (use_packets) { 1136 p->packet_len = sizeof(MultiFDPacket_t) 1137 + sizeof(uint64_t) * page_count; 1138 p->packet = g_malloc0(p->packet_len); 1139 p->packet->magic = cpu_to_be32(MULTIFD_MAGIC); 1140 p->packet->version = cpu_to_be32(MULTIFD_VERSION); 1141 1142 /* We need one extra place for the packet header */ 1143 p->iov = g_new0(struct iovec, page_count + 1); 1144 } else { 1145 p->iov = g_new0(struct iovec, page_count); 1146 } 1147 p->name = g_strdup_printf("multifdsend_%d", i); 1148 p->page_size = qemu_target_page_size(); 1149 p->page_count = page_count; 1150 p->write_flags = 0; 1151 1152 if (!multifd_new_send_channel_create(p, &local_err)) { 1153 return false; 1154 } 1155 } 1156 1157 /* 1158 * Wait until channel creation has started for all channels. The 1159 * creation can still fail, but no more channels will be created 1160 * past this point. 1161 */ 1162 for (i = 0; i < thread_count; i++) { 1163 qemu_sem_wait(&multifd_send_state->channels_created); 1164 } 1165 1166 for (i = 0; i < thread_count; i++) { 1167 MultiFDSendParams *p = &multifd_send_state->params[i]; 1168 1169 ret = multifd_send_state->ops->send_setup(p, &local_err); 1170 if (ret) { 1171 break; 1172 } 1173 } 1174 1175 if (ret) { 1176 migrate_set_error(s, local_err); 1177 error_report_err(local_err); 1178 migrate_set_state(&s->state, MIGRATION_STATUS_SETUP, 1179 MIGRATION_STATUS_FAILED); 1180 return false; 1181 } 1182 1183 return true; 1184 } 1185 1186 bool multifd_recv(void) 1187 { 1188 int i; 1189 static int next_recv_channel; 1190 MultiFDRecvParams *p = NULL; 1191 MultiFDRecvData *data = multifd_recv_state->data; 1192 1193 /* 1194 * next_channel can remain from a previous migration that was 1195 * using more channels, so ensure it doesn't overflow if the 1196 * limit is lower now. 1197 */ 1198 next_recv_channel %= migrate_multifd_channels(); 1199 for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) { 1200 if (multifd_recv_should_exit()) { 1201 return false; 1202 } 1203 1204 p = &multifd_recv_state->params[i]; 1205 1206 if (qatomic_read(&p->pending_job) == false) { 1207 next_recv_channel = (i + 1) % migrate_multifd_channels(); 1208 break; 1209 } 1210 } 1211 1212 /* 1213 * Order pending_job read before manipulating p->data below. Pairs 1214 * with qatomic_store_release() at multifd_recv_thread(). 1215 */ 1216 smp_mb_acquire(); 1217 1218 assert(!p->data->size); 1219 multifd_recv_state->data = p->data; 1220 p->data = data; 1221 1222 /* 1223 * Order p->data update before setting pending_job. Pairs with 1224 * qatomic_load_acquire() at multifd_recv_thread(). 1225 */ 1226 qatomic_store_release(&p->pending_job, true); 1227 qemu_sem_post(&p->sem); 1228 1229 return true; 1230 } 1231 1232 MultiFDRecvData *multifd_get_recv_data(void) 1233 { 1234 return multifd_recv_state->data; 1235 } 1236 1237 static void multifd_recv_terminate_threads(Error *err) 1238 { 1239 int i; 1240 1241 trace_multifd_recv_terminate_threads(err != NULL); 1242 1243 if (qatomic_xchg(&multifd_recv_state->exiting, 1)) { 1244 return; 1245 } 1246 1247 if (err) { 1248 MigrationState *s = migrate_get_current(); 1249 migrate_set_error(s, err); 1250 if (s->state == MIGRATION_STATUS_SETUP || 1251 s->state == MIGRATION_STATUS_ACTIVE) { 1252 migrate_set_state(&s->state, s->state, 1253 MIGRATION_STATUS_FAILED); 1254 } 1255 } 1256 1257 for (i = 0; i < migrate_multifd_channels(); i++) { 1258 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1259 1260 /* 1261 * The migration thread and channels interact differently 1262 * depending on the presence of packets. 1263 */ 1264 if (multifd_use_packets()) { 1265 /* 1266 * The channel receives as long as there are packets. When 1267 * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the 1268 * channel waits for the migration thread to sync. If the 1269 * sync never happens, do it here. 1270 */ 1271 qemu_sem_post(&p->sem_sync); 1272 } else { 1273 /* 1274 * The channel waits for the migration thread to give it 1275 * work. When the migration thread runs out of work, it 1276 * releases the channel and waits for any pending work to 1277 * finish. If we reach here (e.g. due to error) before the 1278 * work runs out, release the channel. 1279 */ 1280 qemu_sem_post(&p->sem); 1281 } 1282 1283 /* 1284 * We could arrive here for two reasons: 1285 * - normal quit, i.e. everything went fine, just finished 1286 * - error quit: We close the channels so the channel threads 1287 * finish the qio_channel_read_all_eof() 1288 */ 1289 if (p->c) { 1290 qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); 1291 } 1292 } 1293 } 1294 1295 void multifd_recv_shutdown(void) 1296 { 1297 if (migrate_multifd()) { 1298 multifd_recv_terminate_threads(NULL); 1299 } 1300 } 1301 1302 static void multifd_recv_cleanup_channel(MultiFDRecvParams *p) 1303 { 1304 migration_ioc_unregister_yank(p->c); 1305 object_unref(OBJECT(p->c)); 1306 p->c = NULL; 1307 qemu_mutex_destroy(&p->mutex); 1308 qemu_sem_destroy(&p->sem_sync); 1309 qemu_sem_destroy(&p->sem); 1310 g_free(p->name); 1311 p->name = NULL; 1312 p->packet_len = 0; 1313 g_free(p->packet); 1314 p->packet = NULL; 1315 g_free(p->iov); 1316 p->iov = NULL; 1317 g_free(p->normal); 1318 p->normal = NULL; 1319 multifd_recv_state->ops->recv_cleanup(p); 1320 } 1321 1322 static void multifd_recv_cleanup_state(void) 1323 { 1324 qemu_sem_destroy(&multifd_recv_state->sem_sync); 1325 g_free(multifd_recv_state->params); 1326 multifd_recv_state->params = NULL; 1327 g_free(multifd_recv_state->data); 1328 multifd_recv_state->data = NULL; 1329 g_free(multifd_recv_state); 1330 multifd_recv_state = NULL; 1331 } 1332 1333 void multifd_recv_cleanup(void) 1334 { 1335 int i; 1336 1337 if (!migrate_multifd()) { 1338 return; 1339 } 1340 multifd_recv_terminate_threads(NULL); 1341 for (i = 0; i < migrate_multifd_channels(); i++) { 1342 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1343 1344 if (p->thread_created) { 1345 qemu_thread_join(&p->thread); 1346 } 1347 } 1348 for (i = 0; i < migrate_multifd_channels(); i++) { 1349 multifd_recv_cleanup_channel(&multifd_recv_state->params[i]); 1350 } 1351 multifd_recv_cleanup_state(); 1352 } 1353 1354 void multifd_recv_sync_main(void) 1355 { 1356 int thread_count = migrate_multifd_channels(); 1357 bool file_based = !multifd_use_packets(); 1358 int i; 1359 1360 if (!migrate_multifd()) { 1361 return; 1362 } 1363 1364 /* 1365 * File-based channels don't use packets and therefore need to 1366 * wait for more work. Release them to start the sync. 1367 */ 1368 if (file_based) { 1369 for (i = 0; i < thread_count; i++) { 1370 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1371 1372 trace_multifd_recv_sync_main_signal(p->id); 1373 qemu_sem_post(&p->sem); 1374 } 1375 } 1376 1377 /* 1378 * Initiate the synchronization by waiting for all channels. 1379 * 1380 * For socket-based migration this means each channel has received 1381 * the SYNC packet on the stream. 1382 * 1383 * For file-based migration this means each channel is done with 1384 * the work (pending_job=false). 1385 */ 1386 for (i = 0; i < thread_count; i++) { 1387 trace_multifd_recv_sync_main_wait(i); 1388 qemu_sem_wait(&multifd_recv_state->sem_sync); 1389 } 1390 1391 if (file_based) { 1392 /* 1393 * For file-based loading is done in one iteration. We're 1394 * done. 1395 */ 1396 return; 1397 } 1398 1399 /* 1400 * Sync done. Release the channels for the next iteration. 1401 */ 1402 for (i = 0; i < thread_count; i++) { 1403 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1404 1405 WITH_QEMU_LOCK_GUARD(&p->mutex) { 1406 if (multifd_recv_state->packet_num < p->packet_num) { 1407 multifd_recv_state->packet_num = p->packet_num; 1408 } 1409 } 1410 trace_multifd_recv_sync_main_signal(p->id); 1411 qemu_sem_post(&p->sem_sync); 1412 } 1413 trace_multifd_recv_sync_main(multifd_recv_state->packet_num); 1414 } 1415 1416 static void *multifd_recv_thread(void *opaque) 1417 { 1418 MultiFDRecvParams *p = opaque; 1419 Error *local_err = NULL; 1420 bool use_packets = multifd_use_packets(); 1421 int ret; 1422 1423 trace_multifd_recv_thread_start(p->id); 1424 rcu_register_thread(); 1425 1426 while (true) { 1427 uint32_t flags = 0; 1428 bool has_data = false; 1429 p->normal_num = 0; 1430 1431 if (use_packets) { 1432 if (multifd_recv_should_exit()) { 1433 break; 1434 } 1435 1436 ret = qio_channel_read_all_eof(p->c, (void *)p->packet, 1437 p->packet_len, &local_err); 1438 if (ret == 0 || ret == -1) { /* 0: EOF -1: Error */ 1439 break; 1440 } 1441 1442 qemu_mutex_lock(&p->mutex); 1443 ret = multifd_recv_unfill_packet(p, &local_err); 1444 if (ret) { 1445 qemu_mutex_unlock(&p->mutex); 1446 break; 1447 } 1448 1449 flags = p->flags; 1450 /* recv methods don't know how to handle the SYNC flag */ 1451 p->flags &= ~MULTIFD_FLAG_SYNC; 1452 has_data = !!p->normal_num; 1453 qemu_mutex_unlock(&p->mutex); 1454 } else { 1455 /* 1456 * No packets, so we need to wait for the vmstate code to 1457 * give us work. 1458 */ 1459 qemu_sem_wait(&p->sem); 1460 1461 if (multifd_recv_should_exit()) { 1462 break; 1463 } 1464 1465 /* pairs with qatomic_store_release() at multifd_recv() */ 1466 if (!qatomic_load_acquire(&p->pending_job)) { 1467 /* 1468 * Migration thread did not send work, this is 1469 * equivalent to pending_sync on the sending 1470 * side. Post sem_sync to notify we reached this 1471 * point. 1472 */ 1473 qemu_sem_post(&multifd_recv_state->sem_sync); 1474 continue; 1475 } 1476 1477 has_data = !!p->data->size; 1478 } 1479 1480 if (has_data) { 1481 ret = multifd_recv_state->ops->recv(p, &local_err); 1482 if (ret != 0) { 1483 break; 1484 } 1485 } 1486 1487 if (use_packets) { 1488 if (flags & MULTIFD_FLAG_SYNC) { 1489 qemu_sem_post(&multifd_recv_state->sem_sync); 1490 qemu_sem_wait(&p->sem_sync); 1491 } 1492 } else { 1493 p->total_normal_pages += p->data->size / qemu_target_page_size(); 1494 p->data->size = 0; 1495 /* 1496 * Order data->size update before clearing 1497 * pending_job. Pairs with smp_mb_acquire() at 1498 * multifd_recv(). 1499 */ 1500 qatomic_store_release(&p->pending_job, false); 1501 } 1502 } 1503 1504 if (local_err) { 1505 multifd_recv_terminate_threads(local_err); 1506 error_free(local_err); 1507 } 1508 1509 rcu_unregister_thread(); 1510 trace_multifd_recv_thread_end(p->id, p->packets_recved, p->total_normal_pages); 1511 1512 return NULL; 1513 } 1514 1515 int multifd_recv_setup(Error **errp) 1516 { 1517 int thread_count; 1518 uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); 1519 bool use_packets = multifd_use_packets(); 1520 uint8_t i; 1521 1522 /* 1523 * Return successfully if multiFD recv state is already initialised 1524 * or multiFD is not enabled. 1525 */ 1526 if (multifd_recv_state || !migrate_multifd()) { 1527 return 0; 1528 } 1529 1530 thread_count = migrate_multifd_channels(); 1531 multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); 1532 multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); 1533 1534 multifd_recv_state->data = g_new0(MultiFDRecvData, 1); 1535 multifd_recv_state->data->size = 0; 1536 1537 qatomic_set(&multifd_recv_state->count, 0); 1538 qatomic_set(&multifd_recv_state->exiting, 0); 1539 qemu_sem_init(&multifd_recv_state->sem_sync, 0); 1540 multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()]; 1541 1542 for (i = 0; i < thread_count; i++) { 1543 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1544 1545 qemu_mutex_init(&p->mutex); 1546 qemu_sem_init(&p->sem_sync, 0); 1547 qemu_sem_init(&p->sem, 0); 1548 p->pending_job = false; 1549 p->id = i; 1550 1551 p->data = g_new0(MultiFDRecvData, 1); 1552 p->data->size = 0; 1553 1554 if (use_packets) { 1555 p->packet_len = sizeof(MultiFDPacket_t) 1556 + sizeof(uint64_t) * page_count; 1557 p->packet = g_malloc0(p->packet_len); 1558 } 1559 p->name = g_strdup_printf("multifdrecv_%d", i); 1560 p->iov = g_new0(struct iovec, page_count); 1561 p->normal = g_new0(ram_addr_t, page_count); 1562 p->page_count = page_count; 1563 p->page_size = qemu_target_page_size(); 1564 } 1565 1566 for (i = 0; i < thread_count; i++) { 1567 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1568 int ret; 1569 1570 ret = multifd_recv_state->ops->recv_setup(p, errp); 1571 if (ret) { 1572 return ret; 1573 } 1574 } 1575 return 0; 1576 } 1577 1578 bool multifd_recv_all_channels_created(void) 1579 { 1580 int thread_count = migrate_multifd_channels(); 1581 1582 if (!migrate_multifd()) { 1583 return true; 1584 } 1585 1586 if (!multifd_recv_state) { 1587 /* Called before any connections created */ 1588 return false; 1589 } 1590 1591 return thread_count == qatomic_read(&multifd_recv_state->count); 1592 } 1593 1594 /* 1595 * Try to receive all multifd channels to get ready for the migration. 1596 * Sets @errp when failing to receive the current channel. 1597 */ 1598 void multifd_recv_new_channel(QIOChannel *ioc, Error **errp) 1599 { 1600 MultiFDRecvParams *p; 1601 Error *local_err = NULL; 1602 bool use_packets = multifd_use_packets(); 1603 int id; 1604 1605 if (use_packets) { 1606 id = multifd_recv_initial_packet(ioc, &local_err); 1607 if (id < 0) { 1608 multifd_recv_terminate_threads(local_err); 1609 error_propagate_prepend(errp, local_err, 1610 "failed to receive packet" 1611 " via multifd channel %d: ", 1612 qatomic_read(&multifd_recv_state->count)); 1613 return; 1614 } 1615 trace_multifd_recv_new_channel(id); 1616 } else { 1617 id = qatomic_read(&multifd_recv_state->count); 1618 } 1619 1620 p = &multifd_recv_state->params[id]; 1621 if (p->c != NULL) { 1622 error_setg(&local_err, "multifd: received id '%d' already setup'", 1623 id); 1624 multifd_recv_terminate_threads(local_err); 1625 error_propagate(errp, local_err); 1626 return; 1627 } 1628 p->c = ioc; 1629 object_ref(OBJECT(ioc)); 1630 1631 p->thread_created = true; 1632 qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, 1633 QEMU_THREAD_JOINABLE); 1634 qatomic_inc(&multifd_recv_state->count); 1635 } 1636