/*
 * Multifd common code
 *
 * Copyright (c) 2019-2020 Red Hat Inc
 *
 * Authors:
 *  Juan Quintela <quintela@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */

#include "qemu/osdep.h"
#include "qemu/cutils.h"
#include "qemu/rcu.h"
#include "exec/target_page.h"
#include "sysemu/sysemu.h"
#include "exec/ramblock.h"
#include "qemu/error-report.h"
#include "qapi/error.h"
#include "file.h"
#include "migration.h"
#include "migration-stats.h"
#include "socket.h"
#include "tls.h"
#include "qemu-file.h"
#include "trace.h"
#include "multifd.h"
#include "threadinfo.h"
#include "options.h"
#include "qemu/yank.h"
#include "io/channel-file.h"
#include "io/channel-socket.h"
#include "yank_functions.h"

/* Multiple fd's */

#define MULTIFD_MAGIC 0x11223344U
#define MULTIFD_VERSION 1

typedef struct {
    uint32_t magic;
    uint32_t version;
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;
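/*
 * For orientation: since MultiFDInit_t is packed, the handshake packet
 * is a fixed 64 bytes on the wire, sent once per channel (see
 * multifd_send_initial_packet() below) before any data packets:
 *
 *   offset  size  field
 *        0     4  magic    (big endian, MULTIFD_MAGIC)
 *        4     4  version  (big endian, MULTIFD_VERSION)
 *        8    16  uuid     (QemuUUID of the source)
 *       24     1  id       (channel number)
 *       25     7  unused1
 *       32    32  unused2
 */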
struct {
    MultiFDSendParams *params;
    /* array of pages to send */
    MultiFDPages_t *pages;
    /*
     * Global number of generated multifd packets.
     *
     * Note that we use 'uintptr_t' because it naturally supports atomic
     * operations on both 32-bit and 64-bit hosts. It means that on
     * 32-bit systems multifd will overflow packet_num more easily, but
     * that should be fine.
     *
     * Another option is to use QEMU's Stat64, which would be 64 bits on
     * all hosts; however, so far it does not support atomic fetch_add().
     * Keep it simple for now.
     */
    uintptr_t packet_num;
    /*
     * Synchronization point past which no more channels will be
     * created.
     */
    QemuSemaphore channels_created;
    /* send channels ready */
    QemuSemaphore channels_ready;
    /*
     * Have we already terminated the threads? There is a race when we
     * get an error while we are exiting.
     * Accessed with atomic operations; the only valid values are 0 and 1.
     */
    int exiting;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_send_state;

struct {
    MultiFDRecvParams *params;
    MultiFDRecvData *data;
    /* number of created threads */
    int count;
    /*
     * This is always posted by the recv threads; the migration thread
     * uses it to wait for recv threads to finish assigned tasks.
     */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    int exiting;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_recv_state;

static bool multifd_use_packets(void)
{
    return !migrate_mapped_ram();
}

void multifd_send_channel_created(void)
{
    qemu_sem_post(&multifd_send_state->channels_created);
}

static void multifd_set_file_bitmap(MultiFDSendParams *p)
{
    MultiFDPages_t *pages = p->pages;

    assert(pages->block);

    for (int i = 0; i < pages->normal_num; i++) {
        ramblock_set_file_bmap_atomic(pages->block, pages->offset[i], true);
    }

    for (int i = pages->normal_num; i < pages->num; i++) {
        ramblock_set_file_bmap_atomic(pages->block, pages->offset[i], false);
    }
}

/* Multifd without compression */

/**
 * nocomp_send_setup: setup send side
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
{
    if (migrate_zero_copy_send()) {
        p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
    }

    return 0;
}

/**
 * nocomp_send_cleanup: cleanup send side
 *
 * For no compression this function does nothing.
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
{
    return;
}

static void multifd_send_prepare_iovs(MultiFDSendParams *p)
{
    MultiFDPages_t *pages = p->pages;

    for (int i = 0; i < pages->normal_num; i++) {
        p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
        p->iov[p->iovs_num].iov_len = p->page_size;
        p->iovs_num++;
    }

    p->next_packet_size = pages->normal_num * p->page_size;
}

/**
 * nocomp_send_prepare: prepare data to be able to send
 *
 * For no compression we just have to calculate the size of the
 * packet.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
{
    bool use_zero_copy_send = migrate_zero_copy_send();
    int ret;

    multifd_send_zero_page_detect(p);

    if (!multifd_use_packets()) {
        multifd_send_prepare_iovs(p);
        multifd_set_file_bitmap(p);

        return 0;
    }

    if (!use_zero_copy_send) {
        /*
         * Only !zerocopy needs the header in IOV; zerocopy will
         * send it separately.
         */
        multifd_send_prepare_header(p);
    }

    multifd_send_prepare_iovs(p);
    p->flags |= MULTIFD_FLAG_NOCOMP;

    multifd_send_fill_packet(p);

    if (use_zero_copy_send) {
        /* Send header first, without zerocopy */
        ret = qio_channel_write_all(p->c, (void *)p->packet,
                                    p->packet_len, errp);
        if (ret != 0) {
            return -1;
        }
    }

    return 0;
}

/**
 * nocomp_recv_setup: setup receive side
 *
 * For no compression this function does nothing.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
{
    return 0;
}
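/*
 * A sketch of the iov layout produced by nocomp_send_prepare() above
 * (assuming multifd_send_prepare_header() places the packet header as
 * the first iov entry, which is why p->iov is allocated with one extra
 * slot when packets are in use):
 *
 *   !zerocopy: iov[0] = packet header, iov[1..n] = guest pages,
 *              all sent in a single writev.
 *   zerocopy:  header sent separately via qio_channel_write_all();
 *              iov[0..n-1] = guest pages, sent with
 *              QIO_CHANNEL_WRITE_FLAG_ZERO_COPY so the kernel can pin
 *              the pages instead of copying them.
 */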
/**
 * nocomp_recv_cleanup: cleanup receive side
 *
 * For no compression this function does nothing.
 *
 * @p: Params for the channel that we are using
 */
static void nocomp_recv_cleanup(MultiFDRecvParams *p)
{
}

/**
 * nocomp_recv: read the data from the channel
 *
 * For no compression we just need to read things into the correct place.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_recv(MultiFDRecvParams *p, Error **errp)
{
    uint32_t flags;

    if (!multifd_use_packets()) {
        return multifd_file_recv_data(p, errp);
    }

    flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;

    if (flags != MULTIFD_FLAG_NOCOMP) {
        error_setg(errp, "multifd %u: flags received %x flags expected %x",
                   p->id, flags, MULTIFD_FLAG_NOCOMP);
        return -1;
    }

    multifd_recv_zero_page_process(p);

    if (!p->normal_num) {
        return 0;
    }

    for (int i = 0; i < p->normal_num; i++) {
        p->iov[i].iov_base = p->host + p->normal[i];
        p->iov[i].iov_len = p->page_size;
    }
    return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
}

static MultiFDMethods multifd_nocomp_ops = {
    .send_setup = nocomp_send_setup,
    .send_cleanup = nocomp_send_cleanup,
    .send_prepare = nocomp_send_prepare,
    .recv_setup = nocomp_recv_setup,
    .recv_cleanup = nocomp_recv_cleanup,
    .recv = nocomp_recv
};

static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
    [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
};

void multifd_register_ops(int method, MultiFDMethods *ops)
{
    assert(0 < method && method < MULTIFD_COMPRESSION__MAX);
    multifd_ops[method] = ops;
}

/* Reset a MultiFDPages_t* object for the next use */
static void multifd_pages_reset(MultiFDPages_t *pages)
{
    /*
     * We don't need to touch the offset[] array, because it will be
     * overwritten later when reused.
     */
    pages->num = 0;
    pages->normal_num = 0;
    pages->block = NULL;
}

static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
{
    MultiFDInit_t msg = {};
    size_t size = sizeof(msg);
    int ret;

    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
    msg.version = cpu_to_be32(MULTIFD_VERSION);
    msg.id = p->id;
    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));

    ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
    if (ret != 0) {
        return -1;
    }
    stat64_add(&mig_stats.multifd_bytes, size);
    return 0;
}

static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
{
    MultiFDInit_t msg;
    int ret;

    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
    if (ret != 0) {
        return -1;
    }

    msg.magic = be32_to_cpu(msg.magic);
    msg.version = be32_to_cpu(msg.version);

    if (msg.magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet magic %x "
                   "expected %x", msg.magic, MULTIFD_MAGIC);
        return -1;
    }

    if (msg.version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet version %u "
                   "expected %u", msg.version, MULTIFD_VERSION);
        return -1;
    }

    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
        char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);

        error_setg(errp, "multifd: received uuid '%s' and expected "
                   "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
        g_free(uuid);
        g_free(msg_uuid);
        return -1;
    }

    if (msg.id > migrate_multifd_channels()) {
        error_setg(errp, "multifd: received channel id %u is greater than "
                   "number of channels %u", msg.id, migrate_multifd_channels());
        return -1;
    }

    return msg.id;
}

static MultiFDPages_t *multifd_pages_init(uint32_t n)
{
    MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);

    pages->allocated = n;
    pages->offset = g_new0(ram_addr_t, n);

    return pages;
}

static void multifd_pages_clear(MultiFDPages_t *pages)
{
    multifd_pages_reset(pages);
    pages->allocated = 0;
    g_free(pages->offset);
    pages->offset = NULL;
    g_free(pages);
}

void multifd_send_fill_packet(MultiFDSendParams *p)
{
    MultiFDPacket_t *packet = p->packet;
    MultiFDPages_t *pages = p->pages;
    uint64_t packet_num;
    uint32_t zero_num = pages->num - pages->normal_num;
    int i;

    packet->flags = cpu_to_be32(p->flags);
    packet->pages_alloc = cpu_to_be32(pages->allocated);
    packet->normal_pages = cpu_to_be32(pages->normal_num);
    packet->zero_pages = cpu_to_be32(zero_num);
    packet->next_packet_size = cpu_to_be32(p->next_packet_size);

    packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
    packet->packet_num = cpu_to_be64(packet_num);

    if (pages->block) {
        strncpy(packet->ramblock, pages->block->idstr, 256);
    }

    for (i = 0; i < pages->num; i++) {
        /* there are architectures where ram_addr_t is 32 bit */
        uint64_t temp = pages->offset[i];

        packet->offset[i] = cpu_to_be64(temp);
    }

    p->packets_sent++;
    p->total_normal_pages += pages->normal_num;
    p->total_zero_pages += zero_num;

    trace_multifd_send(p->id, packet_num, pages->normal_num, zero_num,
                       p->flags, p->next_packet_size);
}
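/*
 * A sketch of the data packet as assembled above (field order and any
 * padding are defined by MultiFDPacket_t in multifd.h; every multi-byte
 * field is big endian on the wire):
 *
 *   magic / version    - same pair as the handshake packet
 *   flags              - MULTIFD_FLAG_* (compression type, SYNC)
 *   pages_alloc        - capacity of the offset[] array
 *   normal_pages       - pages whose contents follow this packet
 *   zero_pages         - pages that are all zeroes (not transferred)
 *   next_packet_size   - bytes of page data following the packet
 *   packet_num         - global sequence number
 *   ramblock           - name of the RAMBlock the offsets belong to
 *   offset[]           - normal page offsets first, then zero pages
 */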
static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
{
    MultiFDPacket_t *packet = p->packet;
    int i;

    packet->magic = be32_to_cpu(packet->magic);
    if (packet->magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet "
                   "magic %x and expected magic %x",
                   packet->magic, MULTIFD_MAGIC);
        return -1;
    }

    packet->version = be32_to_cpu(packet->version);
    if (packet->version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet "
                   "version %u and expected version %u",
                   packet->version, MULTIFD_VERSION);
        return -1;
    }

    p->flags = be32_to_cpu(packet->flags);

    packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
    /*
     * If the packet claims more pages than this channel was set up to
     * handle, the stream is malformed; just stop migration.
     */
    if (packet->pages_alloc > p->page_count) {
        error_setg(errp, "multifd: received packet "
                   "with size %u and expected a size of %u",
                   packet->pages_alloc, p->page_count);
        return -1;
    }

    p->normal_num = be32_to_cpu(packet->normal_pages);
    if (p->normal_num > packet->pages_alloc) {
        error_setg(errp, "multifd: received packet "
                   "with %u normal pages and expected maximum pages are %u",
                   p->normal_num, packet->pages_alloc);
        return -1;
    }

    p->zero_num = be32_to_cpu(packet->zero_pages);
    if (p->zero_num > packet->pages_alloc - p->normal_num) {
        error_setg(errp, "multifd: received packet "
                   "with %u zero pages and expected maximum zero pages are %u",
                   p->zero_num, packet->pages_alloc - p->normal_num);
        return -1;
    }

    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
    p->packet_num = be64_to_cpu(packet->packet_num);
    p->packets_recved++;
    p->total_normal_pages += p->normal_num;
    p->total_zero_pages += p->zero_num;

    trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->zero_num,
                       p->flags, p->next_packet_size);

    if (p->normal_num == 0 && p->zero_num == 0) {
        return 0;
    }

    /* make sure that the ramblock name is NUL-terminated */
    packet->ramblock[255] = 0;
    p->block = qemu_ram_block_by_name(packet->ramblock);
    if (!p->block) {
        error_setg(errp, "multifd: unknown ram block %s",
                   packet->ramblock);
        return -1;
    }

    p->host = p->block->host;
    for (i = 0; i < p->normal_num; i++) {
        uint64_t offset = be64_to_cpu(packet->offset[i]);

        if (offset > (p->block->used_length - p->page_size)) {
            error_setg(errp, "multifd: offset too long %" PRIu64
                       " (max " RAM_ADDR_FMT ")",
                       offset, p->block->used_length);
            return -1;
        }
        p->normal[i] = offset;
    }

    for (i = 0; i < p->zero_num; i++) {
        uint64_t offset = be64_to_cpu(packet->offset[p->normal_num + i]);

        if (offset > (p->block->used_length - p->page_size)) {
            error_setg(errp, "multifd: offset too long %" PRIu64
                       " (max " RAM_ADDR_FMT ")",
                       offset, p->block->used_length);
            return -1;
        }
        p->zero[i] = offset;
    }

    return 0;
}

static bool multifd_send_should_exit(void)
{
    return qatomic_read(&multifd_send_state->exiting);
}

static bool multifd_recv_should_exit(void)
{
    return qatomic_read(&multifd_recv_state->exiting);
}

/*
 * The migration thread can wait on either of the two semaphores. This
 * function can be used to kick the main thread out of waiting on either of
 * them. Should mostly only be called when something went wrong with the
 * current multifd send thread.
 */
static void multifd_send_kick_main(MultiFDSendParams *p)
{
    qemu_sem_post(&p->sem_sync);
    qemu_sem_post(&multifd_send_state->channels_ready);
}

/*
 * How do we use multifd_send_state->pages and channel->pages?
 *
 * We create a 'pages' struct for each channel, plus a main one. Each
 * time we need to send a batch of pages, we exchange the one in
 * multifd_send_state with the one in the channel that is sending it.
 * There are two reasons for that:
 *    - to avoid so many mallocs during migration
 *    - to make it easier to know what to free at the end of migration
 *
 * This way we always know who is the owner of each "pages" struct, and
 * we don't need any locking. It belongs either to the migration thread
 * or to the channel thread. Switching is safe because the migration
 * thread only hands pages to a channel whose pending_job is false, and
 * the channel thread must have finished with its own struct before it
 * clears pending_job.
 *
 * Returns true on success, false otherwise.
 */
static bool multifd_send_pages(void)
{
    int i;
    static int next_channel;
    MultiFDSendParams *p = NULL; /* make gcc happy */
    MultiFDPages_t *pages = multifd_send_state->pages;

    if (multifd_send_should_exit()) {
        return false;
    }

    /* We wait here, until at least one channel is ready */
    qemu_sem_wait(&multifd_send_state->channels_ready);

    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_channel %= migrate_multifd_channels();
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_send_should_exit()) {
            return false;
        }
        p = &multifd_send_state->params[i];
        /*
         * Lockless read to p->pending_job is safe, because only multifd
         * sender thread can clear it.
         */
        if (qatomic_read(&p->pending_job) == false) {
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Make sure we read p->pending_job before all the rest. Pairs with
     * qatomic_store_release() in multifd_send_thread().
     */
    smp_mb_acquire();
    assert(!p->pages->num);
    multifd_send_state->pages = p->pages;
    p->pages = pages;
    /*
     * Making sure p->pages is setup before marking pending_job=true. Pairs
     * with the qatomic_load_acquire() in multifd_send_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}
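/*
 * A sketch of the pending_job handshake between the migration thread
 * (multifd_send_pages() above) and a sender thread
 * (multifd_send_thread() below):
 *
 *   migration thread                      sender thread
 *   ----------------                      -------------
 *   smp_mb_acquire();                     qatomic_load_acquire(pending_job)
 *   swap pages with the channel           ... true: send p->pages ...
 *   store_release(pending_job, true)      multifd_pages_reset(p->pages)
 *   qemu_sem_post(&p->sem)                store_release(pending_job, false)
 *
 * The release/acquire pairs guarantee each side sees a fully published
 * p->pages before acting on it, without taking any lock.
 */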
static inline bool multifd_queue_empty(MultiFDPages_t *pages)
{
    return pages->num == 0;
}

static inline bool multifd_queue_full(MultiFDPages_t *pages)
{
    return pages->num == pages->allocated;
}

static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset)
{
    pages->offset[pages->num++] = offset;
}

/* Returns true if enqueue successful, false otherwise */
bool multifd_queue_page(RAMBlock *block, ram_addr_t offset)
{
    MultiFDPages_t *pages;

retry:
    pages = multifd_send_state->pages;

    /* If the queue is empty, we can already enqueue now */
    if (multifd_queue_empty(pages)) {
        pages->block = block;
        multifd_enqueue(pages, offset);
        return true;
    }

    /*
     * Not empty, but we need a flush. That can be because of either:
     *
     * (1) The page is not on the same ramblock as the previous ones, or,
     * (2) The queue is full.
     *
     * After a flush, always retry.
     */
    if (pages->block != block || multifd_queue_full(pages)) {
        if (!multifd_send_pages()) {
            return false;
        }
        goto retry;
    }

    /* Not empty, and we still have space, do it! */
    multifd_enqueue(pages, offset);
    return true;
}

/* Multifd send side hit an error; remember it and prepare to quit */
static void multifd_send_set_error(Error *err)
{
    /*
     * We don't want to terminate the threads twice. Depending on where
     * we get the error, or if two independent errors happen in two
     * threads at the same time, we can end up calling this function
     * twice.
     */
    if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
            s->state == MIGRATION_STATUS_DEVICE ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }
}

static void multifd_send_terminate_threads(void)
{
    int i;

    trace_multifd_send_terminate_threads();

    /*
     * Tell everyone we're quitting. No xchg() needed here; we simply
     * always set it.
     */
    qatomic_set(&multifd_send_state->exiting, 1);

    /*
     * First, kick all threads out, no matter whether they are just idle
     * or blocked in an I/O system call.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_sem_post(&p->sem);
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }

    /*
     * Finally, join all the threads.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (p->tls_thread_created) {
            qemu_thread_join(&p->tls_thread);
        }

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
}

static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
{
    if (p->c) {
        migration_ioc_unregister_yank(p->c);
        /*
         * The object_unref() cannot guarantee the fd will always be
         * released because finalize() of the iochannel is only
         * triggered on the last reference and it's not guaranteed
         * that we always hold the last refcount when reaching here.
         *
         * Closing the fd explicitly has the benefit that if there is any
         * registered I/O handler callbacks on such fd, that will get a
         * POLLNVAL event and will further trigger the cleanup to finally
         * release the IOC.
         *
         * FIXME: It should logically be guaranteed that all multifd
         * channels have no I/O handler callback registered when reaching
         * here, because migration thread will wait for all multifd channel
         * establishments to complete during setup. Since
         * migrate_fd_cleanup() will be scheduled in main thread too, all
         * previous callbacks should guarantee to be completed when
         * reaching here. See multifd_send_state.channels_created and its
         * usage. In the future, we could replace this with an assert
         * making sure we're the last reference, or simply drop it if the
         * above is justified more clearly.
         */
        qio_channel_close(p->c, &error_abort);
        object_unref(OBJECT(p->c));
        p->c = NULL;
    }
    qemu_sem_destroy(&p->sem);
    qemu_sem_destroy(&p->sem_sync);
    g_free(p->name);
    p->name = NULL;
    multifd_pages_clear(p->pages);
    p->pages = NULL;
    p->packet_len = 0;
    g_free(p->packet);
    p->packet = NULL;
    g_free(p->iov);
    p->iov = NULL;
    multifd_send_state->ops->send_cleanup(p, errp);

    return *errp == NULL;
}

static void multifd_send_cleanup_state(void)
{
    file_cleanup_outgoing_migration();
    socket_cleanup_outgoing_migration();
    qemu_sem_destroy(&multifd_send_state->channels_created);
    qemu_sem_destroy(&multifd_send_state->channels_ready);
    g_free(multifd_send_state->params);
    multifd_send_state->params = NULL;
    multifd_pages_clear(multifd_send_state->pages);
    multifd_send_state->pages = NULL;
    g_free(multifd_send_state);
    multifd_send_state = NULL;
}

void multifd_send_shutdown(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }

    multifd_send_terminate_threads();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        if (!multifd_send_cleanup_channel(p, &local_err)) {
            migrate_set_error(migrate_get_current(), local_err);
            error_free(local_err);
        }
    }

    multifd_send_cleanup_state();
}

static int multifd_zero_copy_flush(QIOChannel *c)
{
    int ret;
    Error *err = NULL;

    ret = qio_channel_flush(c, &err);
    if (ret < 0) {
        error_report_err(err);
        return -1;
    }
    if (ret == 1) {
        stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
    }

    return ret;
}

int multifd_send_sync_main(void)
{
    int i;
    bool flush_zero_copy;

    if (!migrate_multifd()) {
        return 0;
    }
    if (multifd_send_state->pages->num) {
        if (!multifd_send_pages()) {
            error_report("%s: multifd_send_pages fail", __func__);
            return -1;
        }
    }

    flush_zero_copy = migrate_zero_copy_send();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        trace_multifd_send_sync_main_signal(p->id);

        /*
         * We should be the only user so far, so it's not possible for
         * it to be set by others concurrently.
         */
        assert(qatomic_read(&p->pending_sync) == false);
        qatomic_set(&p->pending_sync, true);
        qemu_sem_post(&p->sem);
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        qemu_sem_wait(&multifd_send_state->channels_ready);
        trace_multifd_send_sync_main_wait(p->id);
        qemu_sem_wait(&p->sem_sync);

        if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
            return -1;
        }
    }
    trace_multifd_send_sync_main(multifd_send_state->packet_num);

    return 0;
}
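/*
 * A sketch of the sync protocol driven by multifd_send_sync_main()
 * above and served by multifd_send_thread() below:
 *
 *   1. Flush any partially filled pages batch.
 *   2. For each channel: set pending_sync and post p->sem.
 *   3. Each sender thread wakes, finds no pending_job, sends a packet
 *      with MULTIFD_FLAG_SYNC set, clears pending_sync and posts
 *      p->sem_sync.
 *   4. The main thread waits on channels_ready and p->sem_sync for
 *      every channel, so once this function returns, all channels have
 *      drained their queued pages to the wire.
 */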
static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    MigrationThread *thread = NULL;
    Error *local_err = NULL;
    int ret = 0;
    bool use_packets = multifd_use_packets();

    thread = migration_threads_add(p->name, qemu_get_thread_id());

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (use_packets) {
        if (multifd_send_initial_packet(p, &local_err) < 0) {
            ret = -1;
            goto out;
        }
    }

    while (true) {
        qemu_sem_post(&multifd_send_state->channels_ready);
        qemu_sem_wait(&p->sem);

        if (multifd_send_should_exit()) {
            break;
        }

        /*
         * Read pending_job flag before p->pages. Pairs with the
         * qatomic_store_release() in multifd_send_pages().
         */
        if (qatomic_load_acquire(&p->pending_job)) {
            MultiFDPages_t *pages = p->pages;

            p->iovs_num = 0;
            assert(pages->num);

            ret = multifd_send_state->ops->send_prepare(p, &local_err);
            if (ret != 0) {
                break;
            }

            if (migrate_mapped_ram()) {
                ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
                                              pages->block, &local_err);
            } else {
                ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
                                                  NULL, 0, p->write_flags,
                                                  &local_err);
            }

            if (ret != 0) {
                break;
            }

            stat64_add(&mig_stats.multifd_bytes,
                       p->next_packet_size + p->packet_len);
            stat64_add(&mig_stats.normal_pages, pages->normal_num);
            stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num);

            multifd_pages_reset(p->pages);
            p->next_packet_size = 0;

            /*
             * Making sure p->pages is published before saying "we're
             * free". Pairs with the smp_mb_acquire() in
             * multifd_send_pages().
             */
            qatomic_store_release(&p->pending_job, false);
        } else {
            /*
             * If it's not a normal job, it must be a sync request. Note
             * that pending_sync is a standalone flag (unlike pending_job),
             * so it doesn't require explicit memory barriers.
             */
            assert(qatomic_read(&p->pending_sync));

            if (use_packets) {
                p->flags = MULTIFD_FLAG_SYNC;
                multifd_send_fill_packet(p);
                ret = qio_channel_write_all(p->c, (void *)p->packet,
                                            p->packet_len, &local_err);
                if (ret != 0) {
                    break;
                }
                /* p->next_packet_size will always be zero for a SYNC packet */
                stat64_add(&mig_stats.multifd_bytes, p->packet_len);
                p->flags = 0;
            }

            qatomic_set(&p->pending_sync, false);
            qemu_sem_post(&p->sem_sync);
        }
    }

out:
    if (ret) {
        assert(local_err);
        trace_multifd_send_error(p->id);
        multifd_send_set_error(local_err);
        multifd_send_kick_main(p);
        error_free(local_err);
    }

    rcu_unregister_thread();
    migration_threads_remove(thread);
    trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages,
                                  p->total_zero_pages);

    return NULL;
}

static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);

typedef struct {
    MultiFDSendParams *p;
    QIOChannelTLS *tioc;
} MultiFDTLSThreadArgs;

static void *multifd_tls_handshake_thread(void *opaque)
{
    MultiFDTLSThreadArgs *args = opaque;

    qio_channel_tls_handshake(args->tioc,
                              multifd_new_send_channel_async,
                              args->p,
                              NULL,
                              NULL);
    g_free(args);

    return NULL;
}

static bool multifd_tls_channel_connect(MultiFDSendParams *p,
                                        QIOChannel *ioc,
                                        Error **errp)
{
    MigrationState *s = migrate_get_current();
    const char *hostname = s->hostname;
    MultiFDTLSThreadArgs *args;
    QIOChannelTLS *tioc;

    tioc = migration_tls_client_create(ioc, hostname, errp);
    if (!tioc) {
        return false;
    }

    /*
     * Ownership of the socket channel now transfers to the newly
     * created TLS channel, which has already taken a reference.
     */
    object_unref(OBJECT(ioc));
    trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
    qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");

    args = g_new0(MultiFDTLSThreadArgs, 1);
    args->tioc = tioc;
    args->p = p;

    p->tls_thread_created = true;
    qemu_thread_create(&p->tls_thread, "multifd-tls-handshake-worker",
                       multifd_tls_handshake_thread, args,
                       QEMU_THREAD_JOINABLE);
    return true;
}

void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
{
    qio_channel_set_delay(ioc, false);

    migration_ioc_register_yank(ioc);
    /* Set p->c only once the channel is completely set up */
    p->c = ioc;

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
                       QEMU_THREAD_JOINABLE);
}

/*
 * When TLS is enabled this function is called once to establish the
 * TLS connection and a second time after the TLS handshake to create
 * the multifd channel. Without TLS it goes straight into the channel
 * creation.
 */
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
    Error *local_err = NULL;
    bool ret;

    trace_multifd_new_send_channel_async(p->id);

    if (qio_task_propagate_error(task, &local_err)) {
        ret = false;
        goto out;
    }

    trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
                                       migrate_get_current()->hostname);

    if (migrate_channel_requires_tls_upgrade(ioc)) {
        ret = multifd_tls_channel_connect(p, ioc, &local_err);
        if (ret) {
            return;
        }
    } else {
        multifd_channel_connect(p, ioc);
        ret = true;
    }

out:
    /*
     * Here we're not interested in whether creation succeeded, only
     * that it was attempted at all.
     */
    multifd_send_channel_created();

    if (ret) {
        return;
    }

    trace_multifd_new_send_channel_async_error(p->id, local_err);
    multifd_send_set_error(local_err);
    /*
     * For error cases (TLS or non-TLS), the IO channel is always freed
     * here rather than during multifd cleanup: since p->c is not set, the
     * multifd cleanup code doesn't even know of its existence.
     */
    object_unref(OBJECT(ioc));
    error_free(local_err);
}

static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
{
    if (!multifd_use_packets()) {
        return file_send_channel_create(opaque, errp);
    }

    socket_send_channel_create(multifd_new_send_channel_async, opaque);
    return true;
}
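/*
 * A sketch of channel creation, as orchestrated by multifd_send_setup()
 * below:
 *
 *   multifd_send_setup()
 *     -> multifd_new_send_channel_create()       (per channel)
 *          -> file_send_channel_create()         (mapped-ram)
 *          -> socket_send_channel_create()       (sockets, async)
 *               -> multifd_new_send_channel_async()
 *                    -> multifd_tls_channel_connect()    (if TLS; the
 *                       handshake thread then re-enters
 *                       multifd_new_send_channel_async())
 *                    -> multifd_channel_connect()        (spawns sender)
 *
 * Every socket attempt, successful or not, eventually ends in
 * multifd_new_send_channel_async(), which posts channels_created;
 * multifd_send_setup() waits for one post per channel before it
 * proceeds.
 */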
bool multifd_send_setup(void)
{
    MigrationState *s = migrate_get_current();
    Error *local_err = NULL;
    int thread_count, ret = 0;
    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    if (!migrate_multifd()) {
        return true;
    }

    thread_count = migrate_multifd_channels();
    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
    multifd_send_state->pages = multifd_pages_init(page_count);
    qemu_sem_init(&multifd_send_state->channels_created, 0);
    qemu_sem_init(&multifd_send_state->channels_ready, 0);
    qatomic_set(&multifd_send_state->exiting, 0);
    multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_sem_init(&p->sem, 0);
        qemu_sem_init(&p->sem_sync, 0);
        p->id = i;
        p->pages = multifd_pages_init(page_count);

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                          + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
            p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
            p->packet->version = cpu_to_be32(MULTIFD_VERSION);

            /* We need one extra place for the packet header */
            p->iov = g_new0(struct iovec, page_count + 1);
        } else {
            p->iov = g_new0(struct iovec, page_count);
        }
        p->name = g_strdup_printf("multifdsend_%d", i);
        p->page_size = qemu_target_page_size();
        p->page_count = page_count;
        p->write_flags = 0;

        if (!multifd_new_send_channel_create(p, &local_err)) {
            return false;
        }
    }

    /*
     * Wait until channel creation has started for all channels. The
     * creation can still fail, but no more channels will be created
     * past this point.
     */
    for (i = 0; i < thread_count; i++) {
        qemu_sem_wait(&multifd_send_state->channels_created);
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        ret = multifd_send_state->ops->send_setup(p, &local_err);
        if (ret) {
            break;
        }
    }

    if (ret) {
        migrate_set_error(s, local_err);
        error_report_err(local_err);
        migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
                          MIGRATION_STATUS_FAILED);
        return false;
    }

    return true;
}

bool multifd_recv(void)
{
    int i;
    static int next_recv_channel;
    MultiFDRecvParams *p = NULL;
    MultiFDRecvData *data = multifd_recv_state->data;

    /*
     * next_recv_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_recv_channel %= migrate_multifd_channels();
    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_recv_should_exit()) {
            return false;
        }

        p = &multifd_recv_state->params[i];

        if (qatomic_read(&p->pending_job) == false) {
            next_recv_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Order pending_job read before manipulating p->data below. Pairs
     * with qatomic_store_release() at multifd_recv_thread().
     */
    smp_mb_acquire();

    assert(!p->data->size);
    multifd_recv_state->data = p->data;
    p->data = data;

    /*
     * Order p->data update before setting pending_job. Pairs with
     * qatomic_load_acquire() at multifd_recv_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}

MultiFDRecvData *multifd_get_recv_data(void)
{
    return multifd_recv_state->data;
}

static void multifd_recv_terminate_threads(Error *err)
{
    int i;

    trace_multifd_recv_terminate_threads(err != NULL);

    if (qatomic_xchg(&multifd_recv_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        /*
         * The migration thread and channels interact differently
         * depending on the presence of packets.
         */
        if (multifd_use_packets()) {
            /*
             * The channel receives as long as there are packets. When
             * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
             * channel waits for the migration thread to sync. If the
             * sync never happens, do it here.
             */
            qemu_sem_post(&p->sem_sync);
        } else {
            /*
             * The channel waits for the migration thread to give it
             * work. When the migration thread runs out of work, it
             * releases the channel and waits for any pending work to
             * finish. If we reach here (e.g. due to error) before the
             * work runs out, release the channel.
             */
            qemu_sem_post(&p->sem);
        }

        /*
         * We could arrive here for two reasons:
         *  - normal quit, i.e. everything went fine, just finished
         *  - error quit: we close the channels so the channel threads
         *    finish the qio_channel_read_all_eof()
         */
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }
}

void multifd_recv_shutdown(void)
{
    if (migrate_multifd()) {
        multifd_recv_terminate_threads(NULL);
    }
}

static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
{
    migration_ioc_unregister_yank(p->c);
    object_unref(OBJECT(p->c));
    p->c = NULL;
    qemu_mutex_destroy(&p->mutex);
    qemu_sem_destroy(&p->sem_sync);
    qemu_sem_destroy(&p->sem);
    g_free(p->name);
    p->name = NULL;
    p->packet_len = 0;
    g_free(p->packet);
    p->packet = NULL;
    g_free(p->iov);
    p->iov = NULL;
    g_free(p->normal);
    p->normal = NULL;
    g_free(p->zero);
    p->zero = NULL;
    multifd_recv_state->ops->recv_cleanup(p);
}

static void multifd_recv_cleanup_state(void)
{
    qemu_sem_destroy(&multifd_recv_state->sem_sync);
    g_free(multifd_recv_state->params);
    multifd_recv_state->params = NULL;
    g_free(multifd_recv_state->data);
    multifd_recv_state->data = NULL;
    g_free(multifd_recv_state);
    multifd_recv_state = NULL;
}

void multifd_recv_cleanup(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }
    multifd_recv_terminate_threads(NULL);
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
    }
    multifd_recv_cleanup_state();
}
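/*
 * A sketch of the receive-side sync, mirroring the send side (see
 * multifd_recv_sync_main() below and multifd_recv_thread() further
 * down):
 *
 *   packet (socket) mode:
 *     recv thread: sees MULTIFD_FLAG_SYNC in a packet, posts
 *                  multifd_recv_state->sem_sync, then waits on
 *                  p->sem_sync.
 *     main thread: waits for one sem_sync post per channel, then posts
 *                  every p->sem_sync to release the next iteration.
 *
 *   file (mapped-ram) mode:
 *     recv thread: finds no queued work (pending_job false), posts
 *                  multifd_recv_state->sem_sync.
 *     main thread: posts p->sem to release each channel, then waits for
 *                  the sem_sync posts; loading completes in one pass.
 */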
void multifd_recv_sync_main(void)
{
    int thread_count = migrate_multifd_channels();
    bool file_based = !multifd_use_packets();
    int i;

    if (!migrate_multifd()) {
        return;
    }

    /*
     * File-based channels don't use packets, so they sit waiting for
     * more work. Release them to start the sync.
     */
    if (file_based) {
        for (i = 0; i < thread_count; i++) {
            MultiFDRecvParams *p = &multifd_recv_state->params[i];

            trace_multifd_recv_sync_main_signal(p->id);
            qemu_sem_post(&p->sem);
        }
    }

    /*
     * Initiate the synchronization by waiting for all channels.
     *
     * For socket-based migration this means each channel has received
     * the SYNC packet on the stream.
     *
     * For file-based migration this means each channel is done with
     * the work (pending_job=false).
     */
    for (i = 0; i < thread_count; i++) {
        trace_multifd_recv_sync_main_wait(i);
        qemu_sem_wait(&multifd_recv_state->sem_sync);
    }

    if (file_based) {
        /*
         * For file-based migration, loading is done in one iteration.
         * We're done.
         */
        return;
    }

    /*
     * Sync done. Release the channels for the next iteration.
     */
    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        WITH_QEMU_LOCK_GUARD(&p->mutex) {
            if (multifd_recv_state->packet_num < p->packet_num) {
                multifd_recv_state->packet_num = p->packet_num;
            }
        }
        trace_multifd_recv_sync_main_signal(p->id);
        qemu_sem_post(&p->sem_sync);
    }
    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
}

static void *multifd_recv_thread(void *opaque)
{
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int ret;

    trace_multifd_recv_thread_start(p->id);
    rcu_register_thread();

    while (true) {
        uint32_t flags = 0;
        bool has_data = false;
        p->normal_num = 0;

        if (use_packets) {
            if (multifd_recv_should_exit()) {
                break;
            }

            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
                                           p->packet_len, &local_err);
            if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
                break;
            }

            qemu_mutex_lock(&p->mutex);
            ret = multifd_recv_unfill_packet(p, &local_err);
            if (ret) {
                qemu_mutex_unlock(&p->mutex);
                break;
            }

            flags = p->flags;
            /* recv methods don't know how to handle the SYNC flag */
            p->flags &= ~MULTIFD_FLAG_SYNC;
            has_data = p->normal_num || p->zero_num;
            qemu_mutex_unlock(&p->mutex);
        } else {
            /*
             * No packets, so we need to wait for the vmstate code to
             * give us work.
             */
            qemu_sem_wait(&p->sem);

            if (multifd_recv_should_exit()) {
                break;
            }

            /* pairs with qatomic_store_release() at multifd_recv() */
            if (!qatomic_load_acquire(&p->pending_job)) {
                /*
                 * The migration thread did not send work; this is
                 * equivalent to pending_sync on the sending side. Post
                 * sem_sync to notify that we reached this point.
                 */
                qemu_sem_post(&multifd_recv_state->sem_sync);
                continue;
            }

            has_data = !!p->data->size;
        }

        if (has_data) {
            ret = multifd_recv_state->ops->recv(p, &local_err);
            if (ret != 0) {
                break;
            }
        }

        if (use_packets) {
            if (flags & MULTIFD_FLAG_SYNC) {
                qemu_sem_post(&multifd_recv_state->sem_sync);
                qemu_sem_wait(&p->sem_sync);
            }
        } else {
            p->total_normal_pages += p->data->size / qemu_target_page_size();
            p->data->size = 0;
            /*
             * Order data->size update before clearing
             * pending_job. Pairs with smp_mb_acquire() at
             * multifd_recv().
             */
            qatomic_store_release(&p->pending_job, false);
        }
    }

    if (local_err) {
        multifd_recv_terminate_threads(local_err);
        error_free(local_err);
    }

    rcu_unregister_thread();
    trace_multifd_recv_thread_end(p->id, p->packets_recved,
                                  p->total_normal_pages,
                                  p->total_zero_pages);

    return NULL;
}

int multifd_recv_setup(Error **errp)
{
    int thread_count;
    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    /*
     * Return successfully if multifd recv state is already initialised
     * or multifd is not enabled.
     */
    if (multifd_recv_state || !migrate_multifd()) {
        return 0;
    }

    thread_count = migrate_multifd_channels();
    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);

    multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
    multifd_recv_state->data->size = 0;

    qatomic_set(&multifd_recv_state->count, 0);
    qatomic_set(&multifd_recv_state->exiting, 0);
    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
    multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        qemu_mutex_init(&p->mutex);
        qemu_sem_init(&p->sem_sync, 0);
        qemu_sem_init(&p->sem, 0);
        p->pending_job = false;
        p->id = i;

        p->data = g_new0(MultiFDRecvData, 1);
        p->data->size = 0;

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                          + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
        }
        p->name = g_strdup_printf("multifdrecv_%d", i);
        p->iov = g_new0(struct iovec, page_count);
        p->normal = g_new0(ram_addr_t, page_count);
        p->zero = g_new0(ram_addr_t, page_count);
        p->page_count = page_count;
        p->page_size = qemu_target_page_size();
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];
        int ret;

        ret = multifd_recv_state->ops->recv_setup(p, errp);
        if (ret) {
            return ret;
        }
    }
    return 0;
}

bool multifd_recv_all_channels_created(void)
{
    int thread_count = migrate_multifd_channels();

    if (!migrate_multifd()) {
        return true;
    }

    if (!multifd_recv_state) {
        /* Called before any connections created */
        return false;
    }

    return thread_count == qatomic_read(&multifd_recv_state->count);
}

/*
 * Try to receive all multifd channels to get ready for the migration.
 * Sets @errp when failing to receive the current channel.
 */
void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
{
    MultiFDRecvParams *p;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int id;

    if (use_packets) {
        id = multifd_recv_initial_packet(ioc, &local_err);
        if (id < 0) {
            multifd_recv_terminate_threads(local_err);
            error_propagate_prepend(errp, local_err,
                                    "failed to receive packet"
                                    " via multifd channel %d: ",
                                    qatomic_read(&multifd_recv_state->count));
            return;
        }
        trace_multifd_recv_new_channel(id);
    } else {
        id = qatomic_read(&multifd_recv_state->count);
    }

    p = &multifd_recv_state->params[id];
    if (p->c != NULL) {
        error_setg(&local_err, "multifd: received id '%d' already setup",
                   id);
        multifd_recv_terminate_threads(local_err);
        error_propagate(errp, local_err);
        return;
    }
    p->c = ioc;
    object_ref(OBJECT(ioc));

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                       QEMU_THREAD_JOINABLE);
    qatomic_inc(&multifd_recv_state->count);
}

bool multifd_send_prepare_common(MultiFDSendParams *p)
{
    multifd_send_zero_page_detect(p);

    if (!p->pages->normal_num) {
        p->next_packet_size = 0;
        return false;
    }

    multifd_send_prepare_header(p);

    return true;
}
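/*
 * A sketch of how a compression backend's send_prepare hook is expected
 * to use multifd_send_prepare_common() (illustrative only; the
 * "xyz"/MULTIFD_FLAG_XYZ names are placeholders, see multifd-zlib.c and
 * friends for the real implementations):
 *
 *   static int xyz_send_prepare(MultiFDSendParams *p, Error **errp)
 *   {
 *       if (!multifd_send_prepare_common(p)) {
 *           goto out;   // only zero pages in this batch, nothing to compress
 *       }
 *
 *       // compress the normal pages, filling p->iov and
 *       // p->next_packet_size
 *
 *   out:
 *       p->flags |= MULTIFD_FLAG_XYZ;
 *       multifd_send_fill_packet(p);
 *       return 0;
 *   }
 */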