1 /* 2 * Multifd common code 3 * 4 * Copyright (c) 2019-2020 Red Hat Inc 5 * 6 * Authors: 7 * Juan Quintela <quintela@redhat.com> 8 * 9 * This work is licensed under the terms of the GNU GPL, version 2 or later. 10 * See the COPYING file in the top-level directory. 11 */ 12 13 #include "qemu/osdep.h" 14 #include "qemu/cutils.h" 15 #include "qemu/rcu.h" 16 #include "exec/target_page.h" 17 #include "sysemu/sysemu.h" 18 #include "exec/ramblock.h" 19 #include "qemu/error-report.h" 20 #include "qapi/error.h" 21 #include "file.h" 22 #include "migration.h" 23 #include "migration-stats.h" 24 #include "socket.h" 25 #include "tls.h" 26 #include "qemu-file.h" 27 #include "trace.h" 28 #include "multifd.h" 29 #include "threadinfo.h" 30 #include "options.h" 31 #include "qemu/yank.h" 32 #include "io/channel-file.h" 33 #include "io/channel-socket.h" 34 #include "yank_functions.h" 35 36 /* Multiple fd's */ 37 38 #define MULTIFD_MAGIC 0x11223344U 39 #define MULTIFD_VERSION 1 40 41 typedef struct { 42 uint32_t magic; 43 uint32_t version; 44 unsigned char uuid[16]; /* QemuUUID */ 45 uint8_t id; 46 uint8_t unused1[7]; /* Reserved for future use */ 47 uint64_t unused2[4]; /* Reserved for future use */ 48 } __attribute__((packed)) MultiFDInit_t; 49 50 struct { 51 MultiFDSendParams *params; 52 /* array of pages to sent */ 53 MultiFDPages_t *pages; 54 /* 55 * Global number of generated multifd packets. 56 * 57 * Note that we used 'uintptr_t' because it'll naturally support atomic 58 * operations on both 32bit / 64 bits hosts. It means on 32bit systems 59 * multifd will overflow the packet_num easier, but that should be 60 * fine. 61 * 62 * Another option is to use QEMU's Stat64 then it'll be 64 bits on all 63 * hosts, however so far it does not support atomic fetch_add() yet. 64 * Make it easy for now. 65 */ 66 uintptr_t packet_num; 67 /* 68 * Synchronization point past which no more channels will be 69 * created. 70 */ 71 QemuSemaphore channels_created; 72 /* send channels ready */ 73 QemuSemaphore channels_ready; 74 /* 75 * Have we already run terminate threads. There is a race when it 76 * happens that we got one error while we are exiting. 77 * We will use atomic operations. Only valid values are 0 and 1. 78 */ 79 int exiting; 80 /* multifd ops */ 81 MultiFDMethods *ops; 82 } *multifd_send_state; 83 84 struct { 85 MultiFDRecvParams *params; 86 MultiFDRecvData *data; 87 /* number of created threads */ 88 int count; 89 /* 90 * This is always posted by the recv threads, the migration thread 91 * uses it to wait for recv threads to finish assigned tasks. 92 */ 93 QemuSemaphore sem_sync; 94 /* global number of generated multifd packets */ 95 uint64_t packet_num; 96 int exiting; 97 /* multifd ops */ 98 MultiFDMethods *ops; 99 } *multifd_recv_state; 100 101 static bool multifd_use_packets(void) 102 { 103 return !migrate_mapped_ram(); 104 } 105 106 void multifd_send_channel_created(void) 107 { 108 qemu_sem_post(&multifd_send_state->channels_created); 109 } 110 111 static void multifd_set_file_bitmap(MultiFDSendParams *p) 112 { 113 MultiFDPages_t *pages = p->pages; 114 115 assert(pages->block); 116 117 for (int i = 0; i < p->pages->normal_num; i++) { 118 ramblock_set_file_bmap_atomic(pages->block, pages->offset[i], true); 119 } 120 121 for (int i = p->pages->normal_num; i < p->pages->num; i++) { 122 ramblock_set_file_bmap_atomic(pages->block, pages->offset[i], false); 123 } 124 } 125 126 /* Multifd without compression */ 127 128 /** 129 * nocomp_send_setup: setup send side 130 * 131 * @p: Params for the channel that we are using 132 * @errp: pointer to an error 133 */ 134 static int nocomp_send_setup(MultiFDSendParams *p, Error **errp) 135 { 136 if (migrate_zero_copy_send()) { 137 p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY; 138 } 139 140 if (multifd_use_packets()) { 141 /* We need one extra place for the packet header */ 142 p->iov = g_new0(struct iovec, p->page_count + 1); 143 } else { 144 p->iov = g_new0(struct iovec, p->page_count); 145 } 146 147 return 0; 148 } 149 150 /** 151 * nocomp_send_cleanup: cleanup send side 152 * 153 * For no compression this function does nothing. 154 * 155 * @p: Params for the channel that we are using 156 * @errp: pointer to an error 157 */ 158 static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp) 159 { 160 g_free(p->iov); 161 p->iov = NULL; 162 return; 163 } 164 165 static void multifd_send_prepare_iovs(MultiFDSendParams *p) 166 { 167 MultiFDPages_t *pages = p->pages; 168 169 for (int i = 0; i < pages->normal_num; i++) { 170 p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i]; 171 p->iov[p->iovs_num].iov_len = p->page_size; 172 p->iovs_num++; 173 } 174 175 p->next_packet_size = pages->normal_num * p->page_size; 176 } 177 178 /** 179 * nocomp_send_prepare: prepare date to be able to send 180 * 181 * For no compression we just have to calculate the size of the 182 * packet. 183 * 184 * Returns 0 for success or -1 for error 185 * 186 * @p: Params for the channel that we are using 187 * @errp: pointer to an error 188 */ 189 static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp) 190 { 191 bool use_zero_copy_send = migrate_zero_copy_send(); 192 int ret; 193 194 multifd_send_zero_page_detect(p); 195 196 if (!multifd_use_packets()) { 197 multifd_send_prepare_iovs(p); 198 multifd_set_file_bitmap(p); 199 200 return 0; 201 } 202 203 if (!use_zero_copy_send) { 204 /* 205 * Only !zerocopy needs the header in IOV; zerocopy will 206 * send it separately. 207 */ 208 multifd_send_prepare_header(p); 209 } 210 211 multifd_send_prepare_iovs(p); 212 p->flags |= MULTIFD_FLAG_NOCOMP; 213 214 multifd_send_fill_packet(p); 215 216 if (use_zero_copy_send) { 217 /* Send header first, without zerocopy */ 218 ret = qio_channel_write_all(p->c, (void *)p->packet, 219 p->packet_len, errp); 220 if (ret != 0) { 221 return -1; 222 } 223 } 224 225 return 0; 226 } 227 228 /** 229 * nocomp_recv_setup: setup receive side 230 * 231 * For no compression this function does nothing. 232 * 233 * Returns 0 for success or -1 for error 234 * 235 * @p: Params for the channel that we are using 236 * @errp: pointer to an error 237 */ 238 static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp) 239 { 240 p->iov = g_new0(struct iovec, p->page_count); 241 return 0; 242 } 243 244 /** 245 * nocomp_recv_cleanup: setup receive side 246 * 247 * For no compression this function does nothing. 248 * 249 * @p: Params for the channel that we are using 250 */ 251 static void nocomp_recv_cleanup(MultiFDRecvParams *p) 252 { 253 g_free(p->iov); 254 p->iov = NULL; 255 } 256 257 /** 258 * nocomp_recv: read the data from the channel 259 * 260 * For no compression we just need to read things into the correct place. 261 * 262 * Returns 0 for success or -1 for error 263 * 264 * @p: Params for the channel that we are using 265 * @errp: pointer to an error 266 */ 267 static int nocomp_recv(MultiFDRecvParams *p, Error **errp) 268 { 269 uint32_t flags; 270 271 if (!multifd_use_packets()) { 272 return multifd_file_recv_data(p, errp); 273 } 274 275 flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK; 276 277 if (flags != MULTIFD_FLAG_NOCOMP) { 278 error_setg(errp, "multifd %u: flags received %x flags expected %x", 279 p->id, flags, MULTIFD_FLAG_NOCOMP); 280 return -1; 281 } 282 283 multifd_recv_zero_page_process(p); 284 285 if (!p->normal_num) { 286 return 0; 287 } 288 289 for (int i = 0; i < p->normal_num; i++) { 290 p->iov[i].iov_base = p->host + p->normal[i]; 291 p->iov[i].iov_len = p->page_size; 292 ramblock_recv_bitmap_set_offset(p->block, p->normal[i]); 293 } 294 return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp); 295 } 296 297 static MultiFDMethods multifd_nocomp_ops = { 298 .send_setup = nocomp_send_setup, 299 .send_cleanup = nocomp_send_cleanup, 300 .send_prepare = nocomp_send_prepare, 301 .recv_setup = nocomp_recv_setup, 302 .recv_cleanup = nocomp_recv_cleanup, 303 .recv = nocomp_recv 304 }; 305 306 static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = { 307 [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops, 308 }; 309 310 void multifd_register_ops(int method, MultiFDMethods *ops) 311 { 312 assert(0 < method && method < MULTIFD_COMPRESSION__MAX); 313 multifd_ops[method] = ops; 314 } 315 316 /* Reset a MultiFDPages_t* object for the next use */ 317 static void multifd_pages_reset(MultiFDPages_t *pages) 318 { 319 /* 320 * We don't need to touch offset[] array, because it will be 321 * overwritten later when reused. 322 */ 323 pages->num = 0; 324 pages->normal_num = 0; 325 pages->block = NULL; 326 } 327 328 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) 329 { 330 MultiFDInit_t msg = {}; 331 size_t size = sizeof(msg); 332 int ret; 333 334 msg.magic = cpu_to_be32(MULTIFD_MAGIC); 335 msg.version = cpu_to_be32(MULTIFD_VERSION); 336 msg.id = p->id; 337 memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid)); 338 339 ret = qio_channel_write_all(p->c, (char *)&msg, size, errp); 340 if (ret != 0) { 341 return -1; 342 } 343 stat64_add(&mig_stats.multifd_bytes, size); 344 return 0; 345 } 346 347 static int multifd_recv_initial_packet(QIOChannel *c, Error **errp) 348 { 349 MultiFDInit_t msg; 350 int ret; 351 352 ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp); 353 if (ret != 0) { 354 return -1; 355 } 356 357 msg.magic = be32_to_cpu(msg.magic); 358 msg.version = be32_to_cpu(msg.version); 359 360 if (msg.magic != MULTIFD_MAGIC) { 361 error_setg(errp, "multifd: received packet magic %x " 362 "expected %x", msg.magic, MULTIFD_MAGIC); 363 return -1; 364 } 365 366 if (msg.version != MULTIFD_VERSION) { 367 error_setg(errp, "multifd: received packet version %u " 368 "expected %u", msg.version, MULTIFD_VERSION); 369 return -1; 370 } 371 372 if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) { 373 char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid); 374 char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid); 375 376 error_setg(errp, "multifd: received uuid '%s' and expected " 377 "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id); 378 g_free(uuid); 379 g_free(msg_uuid); 380 return -1; 381 } 382 383 if (msg.id > migrate_multifd_channels()) { 384 error_setg(errp, "multifd: received channel id %u is greater than " 385 "number of channels %u", msg.id, migrate_multifd_channels()); 386 return -1; 387 } 388 389 return msg.id; 390 } 391 392 static MultiFDPages_t *multifd_pages_init(uint32_t n) 393 { 394 MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1); 395 396 pages->allocated = n; 397 pages->offset = g_new0(ram_addr_t, n); 398 399 return pages; 400 } 401 402 static void multifd_pages_clear(MultiFDPages_t *pages) 403 { 404 multifd_pages_reset(pages); 405 pages->allocated = 0; 406 g_free(pages->offset); 407 pages->offset = NULL; 408 g_free(pages); 409 } 410 411 void multifd_send_fill_packet(MultiFDSendParams *p) 412 { 413 MultiFDPacket_t *packet = p->packet; 414 MultiFDPages_t *pages = p->pages; 415 uint64_t packet_num; 416 uint32_t zero_num = pages->num - pages->normal_num; 417 int i; 418 419 packet->flags = cpu_to_be32(p->flags); 420 packet->pages_alloc = cpu_to_be32(p->pages->allocated); 421 packet->normal_pages = cpu_to_be32(pages->normal_num); 422 packet->zero_pages = cpu_to_be32(zero_num); 423 packet->next_packet_size = cpu_to_be32(p->next_packet_size); 424 425 packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num); 426 packet->packet_num = cpu_to_be64(packet_num); 427 428 if (pages->block) { 429 strncpy(packet->ramblock, pages->block->idstr, 256); 430 } 431 432 for (i = 0; i < pages->num; i++) { 433 /* there are architectures where ram_addr_t is 32 bit */ 434 uint64_t temp = pages->offset[i]; 435 436 packet->offset[i] = cpu_to_be64(temp); 437 } 438 439 p->packets_sent++; 440 p->total_normal_pages += pages->normal_num; 441 p->total_zero_pages += zero_num; 442 443 trace_multifd_send(p->id, packet_num, pages->normal_num, zero_num, 444 p->flags, p->next_packet_size); 445 } 446 447 static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) 448 { 449 MultiFDPacket_t *packet = p->packet; 450 int i; 451 452 packet->magic = be32_to_cpu(packet->magic); 453 if (packet->magic != MULTIFD_MAGIC) { 454 error_setg(errp, "multifd: received packet " 455 "magic %x and expected magic %x", 456 packet->magic, MULTIFD_MAGIC); 457 return -1; 458 } 459 460 packet->version = be32_to_cpu(packet->version); 461 if (packet->version != MULTIFD_VERSION) { 462 error_setg(errp, "multifd: received packet " 463 "version %u and expected version %u", 464 packet->version, MULTIFD_VERSION); 465 return -1; 466 } 467 468 p->flags = be32_to_cpu(packet->flags); 469 470 packet->pages_alloc = be32_to_cpu(packet->pages_alloc); 471 /* 472 * If we received a packet that is 100 times bigger than expected 473 * just stop migration. It is a magic number. 474 */ 475 if (packet->pages_alloc > p->page_count) { 476 error_setg(errp, "multifd: received packet " 477 "with size %u and expected a size of %u", 478 packet->pages_alloc, p->page_count) ; 479 return -1; 480 } 481 482 p->normal_num = be32_to_cpu(packet->normal_pages); 483 if (p->normal_num > packet->pages_alloc) { 484 error_setg(errp, "multifd: received packet " 485 "with %u normal pages and expected maximum pages are %u", 486 p->normal_num, packet->pages_alloc) ; 487 return -1; 488 } 489 490 p->zero_num = be32_to_cpu(packet->zero_pages); 491 if (p->zero_num > packet->pages_alloc - p->normal_num) { 492 error_setg(errp, "multifd: received packet " 493 "with %u zero pages and expected maximum zero pages are %u", 494 p->zero_num, packet->pages_alloc - p->normal_num) ; 495 return -1; 496 } 497 498 p->next_packet_size = be32_to_cpu(packet->next_packet_size); 499 p->packet_num = be64_to_cpu(packet->packet_num); 500 p->packets_recved++; 501 p->total_normal_pages += p->normal_num; 502 p->total_zero_pages += p->zero_num; 503 504 trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->zero_num, 505 p->flags, p->next_packet_size); 506 507 if (p->normal_num == 0 && p->zero_num == 0) { 508 return 0; 509 } 510 511 /* make sure that ramblock is 0 terminated */ 512 packet->ramblock[255] = 0; 513 p->block = qemu_ram_block_by_name(packet->ramblock); 514 if (!p->block) { 515 error_setg(errp, "multifd: unknown ram block %s", 516 packet->ramblock); 517 return -1; 518 } 519 520 p->host = p->block->host; 521 for (i = 0; i < p->normal_num; i++) { 522 uint64_t offset = be64_to_cpu(packet->offset[i]); 523 524 if (offset > (p->block->used_length - p->page_size)) { 525 error_setg(errp, "multifd: offset too long %" PRIu64 526 " (max " RAM_ADDR_FMT ")", 527 offset, p->block->used_length); 528 return -1; 529 } 530 p->normal[i] = offset; 531 } 532 533 for (i = 0; i < p->zero_num; i++) { 534 uint64_t offset = be64_to_cpu(packet->offset[p->normal_num + i]); 535 536 if (offset > (p->block->used_length - p->page_size)) { 537 error_setg(errp, "multifd: offset too long %" PRIu64 538 " (max " RAM_ADDR_FMT ")", 539 offset, p->block->used_length); 540 return -1; 541 } 542 p->zero[i] = offset; 543 } 544 545 return 0; 546 } 547 548 static bool multifd_send_should_exit(void) 549 { 550 return qatomic_read(&multifd_send_state->exiting); 551 } 552 553 static bool multifd_recv_should_exit(void) 554 { 555 return qatomic_read(&multifd_recv_state->exiting); 556 } 557 558 /* 559 * The migration thread can wait on either of the two semaphores. This 560 * function can be used to kick the main thread out of waiting on either of 561 * them. Should mostly only be called when something wrong happened with 562 * the current multifd send thread. 563 */ 564 static void multifd_send_kick_main(MultiFDSendParams *p) 565 { 566 qemu_sem_post(&p->sem_sync); 567 qemu_sem_post(&multifd_send_state->channels_ready); 568 } 569 570 /* 571 * How we use multifd_send_state->pages and channel->pages? 572 * 573 * We create a pages for each channel, and a main one. Each time that 574 * we need to send a batch of pages we interchange the ones between 575 * multifd_send_state and the channel that is sending it. There are 576 * two reasons for that: 577 * - to not have to do so many mallocs during migration 578 * - to make easier to know what to free at the end of migration 579 * 580 * This way we always know who is the owner of each "pages" struct, 581 * and we don't need any locking. It belongs to the migration thread 582 * or to the channel thread. Switching is safe because the migration 583 * thread is using the channel mutex when changing it, and the channel 584 * have to had finish with its own, otherwise pending_job can't be 585 * false. 586 * 587 * Returns true if succeed, false otherwise. 588 */ 589 static bool multifd_send_pages(void) 590 { 591 int i; 592 static int next_channel; 593 MultiFDSendParams *p = NULL; /* make happy gcc */ 594 MultiFDPages_t *pages = multifd_send_state->pages; 595 596 if (multifd_send_should_exit()) { 597 return false; 598 } 599 600 /* We wait here, until at least one channel is ready */ 601 qemu_sem_wait(&multifd_send_state->channels_ready); 602 603 /* 604 * next_channel can remain from a previous migration that was 605 * using more channels, so ensure it doesn't overflow if the 606 * limit is lower now. 607 */ 608 next_channel %= migrate_multifd_channels(); 609 for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { 610 if (multifd_send_should_exit()) { 611 return false; 612 } 613 p = &multifd_send_state->params[i]; 614 /* 615 * Lockless read to p->pending_job is safe, because only multifd 616 * sender thread can clear it. 617 */ 618 if (qatomic_read(&p->pending_job) == false) { 619 next_channel = (i + 1) % migrate_multifd_channels(); 620 break; 621 } 622 } 623 624 /* 625 * Make sure we read p->pending_job before all the rest. Pairs with 626 * qatomic_store_release() in multifd_send_thread(). 627 */ 628 smp_mb_acquire(); 629 assert(!p->pages->num); 630 multifd_send_state->pages = p->pages; 631 p->pages = pages; 632 /* 633 * Making sure p->pages is setup before marking pending_job=true. Pairs 634 * with the qatomic_load_acquire() in multifd_send_thread(). 635 */ 636 qatomic_store_release(&p->pending_job, true); 637 qemu_sem_post(&p->sem); 638 639 return true; 640 } 641 642 static inline bool multifd_queue_empty(MultiFDPages_t *pages) 643 { 644 return pages->num == 0; 645 } 646 647 static inline bool multifd_queue_full(MultiFDPages_t *pages) 648 { 649 return pages->num == pages->allocated; 650 } 651 652 static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset) 653 { 654 pages->offset[pages->num++] = offset; 655 } 656 657 /* Returns true if enqueue successful, false otherwise */ 658 bool multifd_queue_page(RAMBlock *block, ram_addr_t offset) 659 { 660 MultiFDPages_t *pages; 661 662 retry: 663 pages = multifd_send_state->pages; 664 665 /* If the queue is empty, we can already enqueue now */ 666 if (multifd_queue_empty(pages)) { 667 pages->block = block; 668 multifd_enqueue(pages, offset); 669 return true; 670 } 671 672 /* 673 * Not empty, meanwhile we need a flush. It can because of either: 674 * 675 * (1) The page is not on the same ramblock of previous ones, or, 676 * (2) The queue is full. 677 * 678 * After flush, always retry. 679 */ 680 if (pages->block != block || multifd_queue_full(pages)) { 681 if (!multifd_send_pages()) { 682 return false; 683 } 684 goto retry; 685 } 686 687 /* Not empty, and we still have space, do it! */ 688 multifd_enqueue(pages, offset); 689 return true; 690 } 691 692 /* Multifd send side hit an error; remember it and prepare to quit */ 693 static void multifd_send_set_error(Error *err) 694 { 695 /* 696 * We don't want to exit each threads twice. Depending on where 697 * we get the error, or if there are two independent errors in two 698 * threads at the same time, we can end calling this function 699 * twice. 700 */ 701 if (qatomic_xchg(&multifd_send_state->exiting, 1)) { 702 return; 703 } 704 705 if (err) { 706 MigrationState *s = migrate_get_current(); 707 migrate_set_error(s, err); 708 if (s->state == MIGRATION_STATUS_SETUP || 709 s->state == MIGRATION_STATUS_PRE_SWITCHOVER || 710 s->state == MIGRATION_STATUS_DEVICE || 711 s->state == MIGRATION_STATUS_ACTIVE) { 712 migrate_set_state(&s->state, s->state, 713 MIGRATION_STATUS_FAILED); 714 } 715 } 716 } 717 718 static void multifd_send_terminate_threads(void) 719 { 720 int i; 721 722 trace_multifd_send_terminate_threads(); 723 724 /* 725 * Tell everyone we're quitting. No xchg() needed here; we simply 726 * always set it. 727 */ 728 qatomic_set(&multifd_send_state->exiting, 1); 729 730 /* 731 * Firstly, kick all threads out; no matter whether they are just idle, 732 * or blocked in an IO system call. 733 */ 734 for (i = 0; i < migrate_multifd_channels(); i++) { 735 MultiFDSendParams *p = &multifd_send_state->params[i]; 736 737 qemu_sem_post(&p->sem); 738 if (p->c) { 739 qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); 740 } 741 } 742 743 /* 744 * Finally recycle all the threads. 745 */ 746 for (i = 0; i < migrate_multifd_channels(); i++) { 747 MultiFDSendParams *p = &multifd_send_state->params[i]; 748 749 if (p->tls_thread_created) { 750 qemu_thread_join(&p->tls_thread); 751 } 752 753 if (p->thread_created) { 754 qemu_thread_join(&p->thread); 755 } 756 } 757 } 758 759 static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp) 760 { 761 if (p->c) { 762 migration_ioc_unregister_yank(p->c); 763 /* 764 * The object_unref() cannot guarantee the fd will always be 765 * released because finalize() of the iochannel is only 766 * triggered on the last reference and it's not guaranteed 767 * that we always hold the last refcount when reaching here. 768 * 769 * Closing the fd explicitly has the benefit that if there is any 770 * registered I/O handler callbacks on such fd, that will get a 771 * POLLNVAL event and will further trigger the cleanup to finally 772 * release the IOC. 773 * 774 * FIXME: It should logically be guaranteed that all multifd 775 * channels have no I/O handler callback registered when reaching 776 * here, because migration thread will wait for all multifd channel 777 * establishments to complete during setup. Since 778 * migrate_fd_cleanup() will be scheduled in main thread too, all 779 * previous callbacks should guarantee to be completed when 780 * reaching here. See multifd_send_state.channels_created and its 781 * usage. In the future, we could replace this with an assert 782 * making sure we're the last reference, or simply drop it if above 783 * is more clear to be justified. 784 */ 785 qio_channel_close(p->c, &error_abort); 786 object_unref(OBJECT(p->c)); 787 p->c = NULL; 788 } 789 qemu_sem_destroy(&p->sem); 790 qemu_sem_destroy(&p->sem_sync); 791 g_free(p->name); 792 p->name = NULL; 793 multifd_pages_clear(p->pages); 794 p->pages = NULL; 795 p->packet_len = 0; 796 g_free(p->packet); 797 p->packet = NULL; 798 multifd_send_state->ops->send_cleanup(p, errp); 799 800 return *errp == NULL; 801 } 802 803 static void multifd_send_cleanup_state(void) 804 { 805 file_cleanup_outgoing_migration(); 806 socket_cleanup_outgoing_migration(); 807 qemu_sem_destroy(&multifd_send_state->channels_created); 808 qemu_sem_destroy(&multifd_send_state->channels_ready); 809 g_free(multifd_send_state->params); 810 multifd_send_state->params = NULL; 811 multifd_pages_clear(multifd_send_state->pages); 812 multifd_send_state->pages = NULL; 813 g_free(multifd_send_state); 814 multifd_send_state = NULL; 815 } 816 817 void multifd_send_shutdown(void) 818 { 819 int i; 820 821 if (!migrate_multifd()) { 822 return; 823 } 824 825 multifd_send_terminate_threads(); 826 827 for (i = 0; i < migrate_multifd_channels(); i++) { 828 MultiFDSendParams *p = &multifd_send_state->params[i]; 829 Error *local_err = NULL; 830 831 if (!multifd_send_cleanup_channel(p, &local_err)) { 832 migrate_set_error(migrate_get_current(), local_err); 833 error_free(local_err); 834 } 835 } 836 837 multifd_send_cleanup_state(); 838 } 839 840 static int multifd_zero_copy_flush(QIOChannel *c) 841 { 842 int ret; 843 Error *err = NULL; 844 845 ret = qio_channel_flush(c, &err); 846 if (ret < 0) { 847 error_report_err(err); 848 return -1; 849 } 850 if (ret == 1) { 851 stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1); 852 } 853 854 return ret; 855 } 856 857 int multifd_send_sync_main(void) 858 { 859 int i; 860 bool flush_zero_copy; 861 862 if (!migrate_multifd()) { 863 return 0; 864 } 865 if (multifd_send_state->pages->num) { 866 if (!multifd_send_pages()) { 867 error_report("%s: multifd_send_pages fail", __func__); 868 return -1; 869 } 870 } 871 872 flush_zero_copy = migrate_zero_copy_send(); 873 874 for (i = 0; i < migrate_multifd_channels(); i++) { 875 MultiFDSendParams *p = &multifd_send_state->params[i]; 876 877 if (multifd_send_should_exit()) { 878 return -1; 879 } 880 881 trace_multifd_send_sync_main_signal(p->id); 882 883 /* 884 * We should be the only user so far, so not possible to be set by 885 * others concurrently. 886 */ 887 assert(qatomic_read(&p->pending_sync) == false); 888 qatomic_set(&p->pending_sync, true); 889 qemu_sem_post(&p->sem); 890 } 891 for (i = 0; i < migrate_multifd_channels(); i++) { 892 MultiFDSendParams *p = &multifd_send_state->params[i]; 893 894 if (multifd_send_should_exit()) { 895 return -1; 896 } 897 898 qemu_sem_wait(&multifd_send_state->channels_ready); 899 trace_multifd_send_sync_main_wait(p->id); 900 qemu_sem_wait(&p->sem_sync); 901 902 if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) { 903 return -1; 904 } 905 } 906 trace_multifd_send_sync_main(multifd_send_state->packet_num); 907 908 return 0; 909 } 910 911 static void *multifd_send_thread(void *opaque) 912 { 913 MultiFDSendParams *p = opaque; 914 MigrationThread *thread = NULL; 915 Error *local_err = NULL; 916 int ret = 0; 917 bool use_packets = multifd_use_packets(); 918 919 thread = migration_threads_add(p->name, qemu_get_thread_id()); 920 921 trace_multifd_send_thread_start(p->id); 922 rcu_register_thread(); 923 924 if (use_packets) { 925 if (multifd_send_initial_packet(p, &local_err) < 0) { 926 ret = -1; 927 goto out; 928 } 929 } 930 931 while (true) { 932 qemu_sem_post(&multifd_send_state->channels_ready); 933 qemu_sem_wait(&p->sem); 934 935 if (multifd_send_should_exit()) { 936 break; 937 } 938 939 /* 940 * Read pending_job flag before p->pages. Pairs with the 941 * qatomic_store_release() in multifd_send_pages(). 942 */ 943 if (qatomic_load_acquire(&p->pending_job)) { 944 MultiFDPages_t *pages = p->pages; 945 946 p->iovs_num = 0; 947 assert(pages->num); 948 949 ret = multifd_send_state->ops->send_prepare(p, &local_err); 950 if (ret != 0) { 951 break; 952 } 953 954 if (migrate_mapped_ram()) { 955 ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num, 956 p->pages->block, &local_err); 957 } else { 958 ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, 959 NULL, 0, p->write_flags, 960 &local_err); 961 } 962 963 if (ret != 0) { 964 break; 965 } 966 967 stat64_add(&mig_stats.multifd_bytes, 968 p->next_packet_size + p->packet_len); 969 stat64_add(&mig_stats.normal_pages, pages->normal_num); 970 stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num); 971 972 multifd_pages_reset(p->pages); 973 p->next_packet_size = 0; 974 975 /* 976 * Making sure p->pages is published before saying "we're 977 * free". Pairs with the smp_mb_acquire() in 978 * multifd_send_pages(). 979 */ 980 qatomic_store_release(&p->pending_job, false); 981 } else { 982 /* 983 * If not a normal job, must be a sync request. Note that 984 * pending_sync is a standalone flag (unlike pending_job), so 985 * it doesn't require explicit memory barriers. 986 */ 987 assert(qatomic_read(&p->pending_sync)); 988 989 if (use_packets) { 990 p->flags = MULTIFD_FLAG_SYNC; 991 multifd_send_fill_packet(p); 992 ret = qio_channel_write_all(p->c, (void *)p->packet, 993 p->packet_len, &local_err); 994 if (ret != 0) { 995 break; 996 } 997 /* p->next_packet_size will always be zero for a SYNC packet */ 998 stat64_add(&mig_stats.multifd_bytes, p->packet_len); 999 p->flags = 0; 1000 } 1001 1002 qatomic_set(&p->pending_sync, false); 1003 qemu_sem_post(&p->sem_sync); 1004 } 1005 } 1006 1007 out: 1008 if (ret) { 1009 assert(local_err); 1010 trace_multifd_send_error(p->id); 1011 multifd_send_set_error(local_err); 1012 multifd_send_kick_main(p); 1013 error_free(local_err); 1014 } 1015 1016 rcu_unregister_thread(); 1017 migration_threads_remove(thread); 1018 trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages, 1019 p->total_zero_pages); 1020 1021 return NULL; 1022 } 1023 1024 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque); 1025 1026 typedef struct { 1027 MultiFDSendParams *p; 1028 QIOChannelTLS *tioc; 1029 } MultiFDTLSThreadArgs; 1030 1031 static void *multifd_tls_handshake_thread(void *opaque) 1032 { 1033 MultiFDTLSThreadArgs *args = opaque; 1034 1035 qio_channel_tls_handshake(args->tioc, 1036 multifd_new_send_channel_async, 1037 args->p, 1038 NULL, 1039 NULL); 1040 g_free(args); 1041 1042 return NULL; 1043 } 1044 1045 static bool multifd_tls_channel_connect(MultiFDSendParams *p, 1046 QIOChannel *ioc, 1047 Error **errp) 1048 { 1049 MigrationState *s = migrate_get_current(); 1050 const char *hostname = s->hostname; 1051 MultiFDTLSThreadArgs *args; 1052 QIOChannelTLS *tioc; 1053 1054 tioc = migration_tls_client_create(ioc, hostname, errp); 1055 if (!tioc) { 1056 return false; 1057 } 1058 1059 /* 1060 * Ownership of the socket channel now transfers to the newly 1061 * created TLS channel, which has already taken a reference. 1062 */ 1063 object_unref(OBJECT(ioc)); 1064 trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname); 1065 qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing"); 1066 1067 args = g_new0(MultiFDTLSThreadArgs, 1); 1068 args->tioc = tioc; 1069 args->p = p; 1070 1071 p->tls_thread_created = true; 1072 qemu_thread_create(&p->tls_thread, "mig/src/tls", 1073 multifd_tls_handshake_thread, args, 1074 QEMU_THREAD_JOINABLE); 1075 return true; 1076 } 1077 1078 void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc) 1079 { 1080 qio_channel_set_delay(ioc, false); 1081 1082 migration_ioc_register_yank(ioc); 1083 /* Setup p->c only if the channel is completely setup */ 1084 p->c = ioc; 1085 1086 p->thread_created = true; 1087 qemu_thread_create(&p->thread, p->name, multifd_send_thread, p, 1088 QEMU_THREAD_JOINABLE); 1089 } 1090 1091 /* 1092 * When TLS is enabled this function is called once to establish the 1093 * TLS connection and a second time after the TLS handshake to create 1094 * the multifd channel. Without TLS it goes straight into the channel 1095 * creation. 1096 */ 1097 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) 1098 { 1099 MultiFDSendParams *p = opaque; 1100 QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task)); 1101 Error *local_err = NULL; 1102 bool ret; 1103 1104 trace_multifd_new_send_channel_async(p->id); 1105 1106 if (qio_task_propagate_error(task, &local_err)) { 1107 ret = false; 1108 goto out; 1109 } 1110 1111 trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)), 1112 migrate_get_current()->hostname); 1113 1114 if (migrate_channel_requires_tls_upgrade(ioc)) { 1115 ret = multifd_tls_channel_connect(p, ioc, &local_err); 1116 if (ret) { 1117 return; 1118 } 1119 } else { 1120 multifd_channel_connect(p, ioc); 1121 ret = true; 1122 } 1123 1124 out: 1125 /* 1126 * Here we're not interested whether creation succeeded, only that 1127 * it happened at all. 1128 */ 1129 multifd_send_channel_created(); 1130 1131 if (ret) { 1132 return; 1133 } 1134 1135 trace_multifd_new_send_channel_async_error(p->id, local_err); 1136 multifd_send_set_error(local_err); 1137 /* 1138 * For error cases (TLS or non-TLS), IO channel is always freed here 1139 * rather than when cleanup multifd: since p->c is not set, multifd 1140 * cleanup code doesn't even know its existence. 1141 */ 1142 object_unref(OBJECT(ioc)); 1143 error_free(local_err); 1144 } 1145 1146 static bool multifd_new_send_channel_create(gpointer opaque, Error **errp) 1147 { 1148 if (!multifd_use_packets()) { 1149 return file_send_channel_create(opaque, errp); 1150 } 1151 1152 socket_send_channel_create(multifd_new_send_channel_async, opaque); 1153 return true; 1154 } 1155 1156 bool multifd_send_setup(void) 1157 { 1158 MigrationState *s = migrate_get_current(); 1159 int thread_count, ret = 0; 1160 uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); 1161 bool use_packets = multifd_use_packets(); 1162 uint8_t i; 1163 1164 if (!migrate_multifd()) { 1165 return true; 1166 } 1167 1168 thread_count = migrate_multifd_channels(); 1169 multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); 1170 multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); 1171 multifd_send_state->pages = multifd_pages_init(page_count); 1172 qemu_sem_init(&multifd_send_state->channels_created, 0); 1173 qemu_sem_init(&multifd_send_state->channels_ready, 0); 1174 qatomic_set(&multifd_send_state->exiting, 0); 1175 multifd_send_state->ops = multifd_ops[migrate_multifd_compression()]; 1176 1177 for (i = 0; i < thread_count; i++) { 1178 MultiFDSendParams *p = &multifd_send_state->params[i]; 1179 Error *local_err = NULL; 1180 1181 qemu_sem_init(&p->sem, 0); 1182 qemu_sem_init(&p->sem_sync, 0); 1183 p->id = i; 1184 p->pages = multifd_pages_init(page_count); 1185 1186 if (use_packets) { 1187 p->packet_len = sizeof(MultiFDPacket_t) 1188 + sizeof(uint64_t) * page_count; 1189 p->packet = g_malloc0(p->packet_len); 1190 p->packet->magic = cpu_to_be32(MULTIFD_MAGIC); 1191 p->packet->version = cpu_to_be32(MULTIFD_VERSION); 1192 } 1193 p->name = g_strdup_printf("mig/src/send_%d", i); 1194 p->page_size = qemu_target_page_size(); 1195 p->page_count = page_count; 1196 p->write_flags = 0; 1197 1198 if (!multifd_new_send_channel_create(p, &local_err)) { 1199 migrate_set_error(s, local_err); 1200 ret = -1; 1201 } 1202 } 1203 1204 /* 1205 * Wait until channel creation has started for all channels. The 1206 * creation can still fail, but no more channels will be created 1207 * past this point. 1208 */ 1209 for (i = 0; i < thread_count; i++) { 1210 qemu_sem_wait(&multifd_send_state->channels_created); 1211 } 1212 1213 if (ret) { 1214 goto err; 1215 } 1216 1217 for (i = 0; i < thread_count; i++) { 1218 MultiFDSendParams *p = &multifd_send_state->params[i]; 1219 Error *local_err = NULL; 1220 1221 ret = multifd_send_state->ops->send_setup(p, &local_err); 1222 if (ret) { 1223 migrate_set_error(s, local_err); 1224 goto err; 1225 } 1226 } 1227 1228 return true; 1229 1230 err: 1231 migrate_set_state(&s->state, MIGRATION_STATUS_SETUP, 1232 MIGRATION_STATUS_FAILED); 1233 return false; 1234 } 1235 1236 bool multifd_recv(void) 1237 { 1238 int i; 1239 static int next_recv_channel; 1240 MultiFDRecvParams *p = NULL; 1241 MultiFDRecvData *data = multifd_recv_state->data; 1242 1243 /* 1244 * next_channel can remain from a previous migration that was 1245 * using more channels, so ensure it doesn't overflow if the 1246 * limit is lower now. 1247 */ 1248 next_recv_channel %= migrate_multifd_channels(); 1249 for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) { 1250 if (multifd_recv_should_exit()) { 1251 return false; 1252 } 1253 1254 p = &multifd_recv_state->params[i]; 1255 1256 if (qatomic_read(&p->pending_job) == false) { 1257 next_recv_channel = (i + 1) % migrate_multifd_channels(); 1258 break; 1259 } 1260 } 1261 1262 /* 1263 * Order pending_job read before manipulating p->data below. Pairs 1264 * with qatomic_store_release() at multifd_recv_thread(). 1265 */ 1266 smp_mb_acquire(); 1267 1268 assert(!p->data->size); 1269 multifd_recv_state->data = p->data; 1270 p->data = data; 1271 1272 /* 1273 * Order p->data update before setting pending_job. Pairs with 1274 * qatomic_load_acquire() at multifd_recv_thread(). 1275 */ 1276 qatomic_store_release(&p->pending_job, true); 1277 qemu_sem_post(&p->sem); 1278 1279 return true; 1280 } 1281 1282 MultiFDRecvData *multifd_get_recv_data(void) 1283 { 1284 return multifd_recv_state->data; 1285 } 1286 1287 static void multifd_recv_terminate_threads(Error *err) 1288 { 1289 int i; 1290 1291 trace_multifd_recv_terminate_threads(err != NULL); 1292 1293 if (qatomic_xchg(&multifd_recv_state->exiting, 1)) { 1294 return; 1295 } 1296 1297 if (err) { 1298 MigrationState *s = migrate_get_current(); 1299 migrate_set_error(s, err); 1300 if (s->state == MIGRATION_STATUS_SETUP || 1301 s->state == MIGRATION_STATUS_ACTIVE) { 1302 migrate_set_state(&s->state, s->state, 1303 MIGRATION_STATUS_FAILED); 1304 } 1305 } 1306 1307 for (i = 0; i < migrate_multifd_channels(); i++) { 1308 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1309 1310 /* 1311 * The migration thread and channels interact differently 1312 * depending on the presence of packets. 1313 */ 1314 if (multifd_use_packets()) { 1315 /* 1316 * The channel receives as long as there are packets. When 1317 * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the 1318 * channel waits for the migration thread to sync. If the 1319 * sync never happens, do it here. 1320 */ 1321 qemu_sem_post(&p->sem_sync); 1322 } else { 1323 /* 1324 * The channel waits for the migration thread to give it 1325 * work. When the migration thread runs out of work, it 1326 * releases the channel and waits for any pending work to 1327 * finish. If we reach here (e.g. due to error) before the 1328 * work runs out, release the channel. 1329 */ 1330 qemu_sem_post(&p->sem); 1331 } 1332 1333 /* 1334 * We could arrive here for two reasons: 1335 * - normal quit, i.e. everything went fine, just finished 1336 * - error quit: We close the channels so the channel threads 1337 * finish the qio_channel_read_all_eof() 1338 */ 1339 if (p->c) { 1340 qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); 1341 } 1342 } 1343 } 1344 1345 void multifd_recv_shutdown(void) 1346 { 1347 if (migrate_multifd()) { 1348 multifd_recv_terminate_threads(NULL); 1349 } 1350 } 1351 1352 static void multifd_recv_cleanup_channel(MultiFDRecvParams *p) 1353 { 1354 migration_ioc_unregister_yank(p->c); 1355 object_unref(OBJECT(p->c)); 1356 p->c = NULL; 1357 qemu_mutex_destroy(&p->mutex); 1358 qemu_sem_destroy(&p->sem_sync); 1359 qemu_sem_destroy(&p->sem); 1360 g_free(p->data); 1361 p->data = NULL; 1362 g_free(p->name); 1363 p->name = NULL; 1364 p->packet_len = 0; 1365 g_free(p->packet); 1366 p->packet = NULL; 1367 g_free(p->normal); 1368 p->normal = NULL; 1369 g_free(p->zero); 1370 p->zero = NULL; 1371 multifd_recv_state->ops->recv_cleanup(p); 1372 } 1373 1374 static void multifd_recv_cleanup_state(void) 1375 { 1376 qemu_sem_destroy(&multifd_recv_state->sem_sync); 1377 g_free(multifd_recv_state->params); 1378 multifd_recv_state->params = NULL; 1379 g_free(multifd_recv_state->data); 1380 multifd_recv_state->data = NULL; 1381 g_free(multifd_recv_state); 1382 multifd_recv_state = NULL; 1383 } 1384 1385 void multifd_recv_cleanup(void) 1386 { 1387 int i; 1388 1389 if (!migrate_multifd()) { 1390 return; 1391 } 1392 multifd_recv_terminate_threads(NULL); 1393 for (i = 0; i < migrate_multifd_channels(); i++) { 1394 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1395 1396 if (p->thread_created) { 1397 qemu_thread_join(&p->thread); 1398 } 1399 } 1400 for (i = 0; i < migrate_multifd_channels(); i++) { 1401 multifd_recv_cleanup_channel(&multifd_recv_state->params[i]); 1402 } 1403 multifd_recv_cleanup_state(); 1404 } 1405 1406 void multifd_recv_sync_main(void) 1407 { 1408 int thread_count = migrate_multifd_channels(); 1409 bool file_based = !multifd_use_packets(); 1410 int i; 1411 1412 if (!migrate_multifd()) { 1413 return; 1414 } 1415 1416 /* 1417 * File-based channels don't use packets and therefore need to 1418 * wait for more work. Release them to start the sync. 1419 */ 1420 if (file_based) { 1421 for (i = 0; i < thread_count; i++) { 1422 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1423 1424 trace_multifd_recv_sync_main_signal(p->id); 1425 qemu_sem_post(&p->sem); 1426 } 1427 } 1428 1429 /* 1430 * Initiate the synchronization by waiting for all channels. 1431 * 1432 * For socket-based migration this means each channel has received 1433 * the SYNC packet on the stream. 1434 * 1435 * For file-based migration this means each channel is done with 1436 * the work (pending_job=false). 1437 */ 1438 for (i = 0; i < thread_count; i++) { 1439 trace_multifd_recv_sync_main_wait(i); 1440 qemu_sem_wait(&multifd_recv_state->sem_sync); 1441 } 1442 1443 if (file_based) { 1444 /* 1445 * For file-based loading is done in one iteration. We're 1446 * done. 1447 */ 1448 return; 1449 } 1450 1451 /* 1452 * Sync done. Release the channels for the next iteration. 1453 */ 1454 for (i = 0; i < thread_count; i++) { 1455 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1456 1457 WITH_QEMU_LOCK_GUARD(&p->mutex) { 1458 if (multifd_recv_state->packet_num < p->packet_num) { 1459 multifd_recv_state->packet_num = p->packet_num; 1460 } 1461 } 1462 trace_multifd_recv_sync_main_signal(p->id); 1463 qemu_sem_post(&p->sem_sync); 1464 } 1465 trace_multifd_recv_sync_main(multifd_recv_state->packet_num); 1466 } 1467 1468 static void *multifd_recv_thread(void *opaque) 1469 { 1470 MultiFDRecvParams *p = opaque; 1471 Error *local_err = NULL; 1472 bool use_packets = multifd_use_packets(); 1473 int ret; 1474 1475 trace_multifd_recv_thread_start(p->id); 1476 rcu_register_thread(); 1477 1478 while (true) { 1479 uint32_t flags = 0; 1480 bool has_data = false; 1481 p->normal_num = 0; 1482 1483 if (use_packets) { 1484 if (multifd_recv_should_exit()) { 1485 break; 1486 } 1487 1488 ret = qio_channel_read_all_eof(p->c, (void *)p->packet, 1489 p->packet_len, &local_err); 1490 if (ret == 0 || ret == -1) { /* 0: EOF -1: Error */ 1491 break; 1492 } 1493 1494 qemu_mutex_lock(&p->mutex); 1495 ret = multifd_recv_unfill_packet(p, &local_err); 1496 if (ret) { 1497 qemu_mutex_unlock(&p->mutex); 1498 break; 1499 } 1500 1501 flags = p->flags; 1502 /* recv methods don't know how to handle the SYNC flag */ 1503 p->flags &= ~MULTIFD_FLAG_SYNC; 1504 has_data = p->normal_num || p->zero_num; 1505 qemu_mutex_unlock(&p->mutex); 1506 } else { 1507 /* 1508 * No packets, so we need to wait for the vmstate code to 1509 * give us work. 1510 */ 1511 qemu_sem_wait(&p->sem); 1512 1513 if (multifd_recv_should_exit()) { 1514 break; 1515 } 1516 1517 /* pairs with qatomic_store_release() at multifd_recv() */ 1518 if (!qatomic_load_acquire(&p->pending_job)) { 1519 /* 1520 * Migration thread did not send work, this is 1521 * equivalent to pending_sync on the sending 1522 * side. Post sem_sync to notify we reached this 1523 * point. 1524 */ 1525 qemu_sem_post(&multifd_recv_state->sem_sync); 1526 continue; 1527 } 1528 1529 has_data = !!p->data->size; 1530 } 1531 1532 if (has_data) { 1533 ret = multifd_recv_state->ops->recv(p, &local_err); 1534 if (ret != 0) { 1535 break; 1536 } 1537 } 1538 1539 if (use_packets) { 1540 if (flags & MULTIFD_FLAG_SYNC) { 1541 qemu_sem_post(&multifd_recv_state->sem_sync); 1542 qemu_sem_wait(&p->sem_sync); 1543 } 1544 } else { 1545 p->total_normal_pages += p->data->size / qemu_target_page_size(); 1546 p->data->size = 0; 1547 /* 1548 * Order data->size update before clearing 1549 * pending_job. Pairs with smp_mb_acquire() at 1550 * multifd_recv(). 1551 */ 1552 qatomic_store_release(&p->pending_job, false); 1553 } 1554 } 1555 1556 if (local_err) { 1557 multifd_recv_terminate_threads(local_err); 1558 error_free(local_err); 1559 } 1560 1561 rcu_unregister_thread(); 1562 trace_multifd_recv_thread_end(p->id, p->packets_recved, 1563 p->total_normal_pages, 1564 p->total_zero_pages); 1565 1566 return NULL; 1567 } 1568 1569 int multifd_recv_setup(Error **errp) 1570 { 1571 int thread_count; 1572 uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); 1573 bool use_packets = multifd_use_packets(); 1574 uint8_t i; 1575 1576 /* 1577 * Return successfully if multiFD recv state is already initialised 1578 * or multiFD is not enabled. 1579 */ 1580 if (multifd_recv_state || !migrate_multifd()) { 1581 return 0; 1582 } 1583 1584 thread_count = migrate_multifd_channels(); 1585 multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); 1586 multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); 1587 1588 multifd_recv_state->data = g_new0(MultiFDRecvData, 1); 1589 multifd_recv_state->data->size = 0; 1590 1591 qatomic_set(&multifd_recv_state->count, 0); 1592 qatomic_set(&multifd_recv_state->exiting, 0); 1593 qemu_sem_init(&multifd_recv_state->sem_sync, 0); 1594 multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()]; 1595 1596 for (i = 0; i < thread_count; i++) { 1597 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1598 1599 qemu_mutex_init(&p->mutex); 1600 qemu_sem_init(&p->sem_sync, 0); 1601 qemu_sem_init(&p->sem, 0); 1602 p->pending_job = false; 1603 p->id = i; 1604 1605 p->data = g_new0(MultiFDRecvData, 1); 1606 p->data->size = 0; 1607 1608 if (use_packets) { 1609 p->packet_len = sizeof(MultiFDPacket_t) 1610 + sizeof(uint64_t) * page_count; 1611 p->packet = g_malloc0(p->packet_len); 1612 } 1613 p->name = g_strdup_printf("mig/dst/recv_%d", i); 1614 p->normal = g_new0(ram_addr_t, page_count); 1615 p->zero = g_new0(ram_addr_t, page_count); 1616 p->page_count = page_count; 1617 p->page_size = qemu_target_page_size(); 1618 } 1619 1620 for (i = 0; i < thread_count; i++) { 1621 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1622 int ret; 1623 1624 ret = multifd_recv_state->ops->recv_setup(p, errp); 1625 if (ret) { 1626 return ret; 1627 } 1628 } 1629 return 0; 1630 } 1631 1632 bool multifd_recv_all_channels_created(void) 1633 { 1634 int thread_count = migrate_multifd_channels(); 1635 1636 if (!migrate_multifd()) { 1637 return true; 1638 } 1639 1640 if (!multifd_recv_state) { 1641 /* Called before any connections created */ 1642 return false; 1643 } 1644 1645 return thread_count == qatomic_read(&multifd_recv_state->count); 1646 } 1647 1648 /* 1649 * Try to receive all multifd channels to get ready for the migration. 1650 * Sets @errp when failing to receive the current channel. 1651 */ 1652 void multifd_recv_new_channel(QIOChannel *ioc, Error **errp) 1653 { 1654 MultiFDRecvParams *p; 1655 Error *local_err = NULL; 1656 bool use_packets = multifd_use_packets(); 1657 int id; 1658 1659 if (use_packets) { 1660 id = multifd_recv_initial_packet(ioc, &local_err); 1661 if (id < 0) { 1662 multifd_recv_terminate_threads(local_err); 1663 error_propagate_prepend(errp, local_err, 1664 "failed to receive packet" 1665 " via multifd channel %d: ", 1666 qatomic_read(&multifd_recv_state->count)); 1667 return; 1668 } 1669 trace_multifd_recv_new_channel(id); 1670 } else { 1671 id = qatomic_read(&multifd_recv_state->count); 1672 } 1673 1674 p = &multifd_recv_state->params[id]; 1675 if (p->c != NULL) { 1676 error_setg(&local_err, "multifd: received id '%d' already setup'", 1677 id); 1678 multifd_recv_terminate_threads(local_err); 1679 error_propagate(errp, local_err); 1680 return; 1681 } 1682 p->c = ioc; 1683 object_ref(OBJECT(ioc)); 1684 1685 p->thread_created = true; 1686 qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, 1687 QEMU_THREAD_JOINABLE); 1688 qatomic_inc(&multifd_recv_state->count); 1689 } 1690 1691 bool multifd_send_prepare_common(MultiFDSendParams *p) 1692 { 1693 multifd_send_zero_page_detect(p); 1694 1695 if (!p->pages->normal_num) { 1696 p->next_packet_size = 0; 1697 return false; 1698 } 1699 1700 multifd_send_prepare_header(p); 1701 1702 return true; 1703 } 1704