1 /* 2 * Multifd common code 3 * 4 * Copyright (c) 2019-2020 Red Hat Inc 5 * 6 * Authors: 7 * Juan Quintela <quintela@redhat.com> 8 * 9 * This work is licensed under the terms of the GNU GPL, version 2 or later. 10 * See the COPYING file in the top-level directory. 11 */ 12 13 #include "qemu/osdep.h" 14 #include "qemu/rcu.h" 15 #include "exec/target_page.h" 16 #include "sysemu/sysemu.h" 17 #include "exec/ramblock.h" 18 #include "qemu/error-report.h" 19 #include "qapi/error.h" 20 #include "ram.h" 21 #include "migration.h" 22 #include "migration-stats.h" 23 #include "socket.h" 24 #include "tls.h" 25 #include "qemu-file.h" 26 #include "trace.h" 27 #include "multifd.h" 28 #include "threadinfo.h" 29 #include "options.h" 30 #include "qemu/yank.h" 31 #include "io/channel-socket.h" 32 #include "yank_functions.h" 33 34 /* Multiple fd's */ 35 36 #define MULTIFD_MAGIC 0x11223344U 37 #define MULTIFD_VERSION 1 38 39 typedef struct { 40 uint32_t magic; 41 uint32_t version; 42 unsigned char uuid[16]; /* QemuUUID */ 43 uint8_t id; 44 uint8_t unused1[7]; /* Reserved for future use */ 45 uint64_t unused2[4]; /* Reserved for future use */ 46 } __attribute__((packed)) MultiFDInit_t; 47 48 struct { 49 MultiFDSendParams *params; 50 /* array of pages to sent */ 51 MultiFDPages_t *pages; 52 /* 53 * Global number of generated multifd packets. 54 * 55 * Note that we used 'uintptr_t' because it'll naturally support atomic 56 * operations on both 32bit / 64 bits hosts. It means on 32bit systems 57 * multifd will overflow the packet_num easier, but that should be 58 * fine. 59 * 60 * Another option is to use QEMU's Stat64 then it'll be 64 bits on all 61 * hosts, however so far it does not support atomic fetch_add() yet. 62 * Make it easy for now. 63 */ 64 uintptr_t packet_num; 65 /* 66 * Synchronization point past which no more channels will be 67 * created. 68 */ 69 QemuSemaphore channels_created; 70 /* send channels ready */ 71 QemuSemaphore channels_ready; 72 /* 73 * Have we already run terminate threads. There is a race when it 74 * happens that we got one error while we are exiting. 75 * We will use atomic operations. Only valid values are 0 and 1. 76 */ 77 int exiting; 78 /* multifd ops */ 79 MultiFDMethods *ops; 80 } *multifd_send_state; 81 82 struct { 83 MultiFDRecvParams *params; 84 /* number of created threads */ 85 int count; 86 /* syncs main thread and channels */ 87 QemuSemaphore sem_sync; 88 /* global number of generated multifd packets */ 89 uint64_t packet_num; 90 int exiting; 91 /* multifd ops */ 92 MultiFDMethods *ops; 93 } *multifd_recv_state; 94 95 /* Multifd without compression */ 96 97 /** 98 * nocomp_send_setup: setup send side 99 * 100 * @p: Params for the channel that we are using 101 * @errp: pointer to an error 102 */ 103 static int nocomp_send_setup(MultiFDSendParams *p, Error **errp) 104 { 105 if (migrate_zero_copy_send()) { 106 p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY; 107 } 108 109 return 0; 110 } 111 112 /** 113 * nocomp_send_cleanup: cleanup send side 114 * 115 * For no compression this function does nothing. 116 * 117 * @p: Params for the channel that we are using 118 * @errp: pointer to an error 119 */ 120 static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp) 121 { 122 return; 123 } 124 125 /** 126 * nocomp_send_prepare: prepare date to be able to send 127 * 128 * For no compression we just have to calculate the size of the 129 * packet. 130 * 131 * Returns 0 for success or -1 for error 132 * 133 * @p: Params for the channel that we are using 134 * @errp: pointer to an error 135 */ 136 static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp) 137 { 138 bool use_zero_copy_send = migrate_zero_copy_send(); 139 MultiFDPages_t *pages = p->pages; 140 int ret; 141 142 if (!use_zero_copy_send) { 143 /* 144 * Only !zerocopy needs the header in IOV; zerocopy will 145 * send it separately. 146 */ 147 multifd_send_prepare_header(p); 148 } 149 150 for (int i = 0; i < pages->num; i++) { 151 p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i]; 152 p->iov[p->iovs_num].iov_len = p->page_size; 153 p->iovs_num++; 154 } 155 156 p->next_packet_size = pages->num * p->page_size; 157 p->flags |= MULTIFD_FLAG_NOCOMP; 158 159 multifd_send_fill_packet(p); 160 161 if (use_zero_copy_send) { 162 /* Send header first, without zerocopy */ 163 ret = qio_channel_write_all(p->c, (void *)p->packet, 164 p->packet_len, errp); 165 if (ret != 0) { 166 return -1; 167 } 168 } 169 170 return 0; 171 } 172 173 /** 174 * nocomp_recv_setup: setup receive side 175 * 176 * For no compression this function does nothing. 177 * 178 * Returns 0 for success or -1 for error 179 * 180 * @p: Params for the channel that we are using 181 * @errp: pointer to an error 182 */ 183 static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp) 184 { 185 return 0; 186 } 187 188 /** 189 * nocomp_recv_cleanup: setup receive side 190 * 191 * For no compression this function does nothing. 192 * 193 * @p: Params for the channel that we are using 194 */ 195 static void nocomp_recv_cleanup(MultiFDRecvParams *p) 196 { 197 } 198 199 /** 200 * nocomp_recv_pages: read the data from the channel into actual pages 201 * 202 * For no compression we just need to read things into the correct place. 203 * 204 * Returns 0 for success or -1 for error 205 * 206 * @p: Params for the channel that we are using 207 * @errp: pointer to an error 208 */ 209 static int nocomp_recv_pages(MultiFDRecvParams *p, Error **errp) 210 { 211 uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK; 212 213 if (flags != MULTIFD_FLAG_NOCOMP) { 214 error_setg(errp, "multifd %u: flags received %x flags expected %x", 215 p->id, flags, MULTIFD_FLAG_NOCOMP); 216 return -1; 217 } 218 for (int i = 0; i < p->normal_num; i++) { 219 p->iov[i].iov_base = p->host + p->normal[i]; 220 p->iov[i].iov_len = p->page_size; 221 } 222 return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp); 223 } 224 225 static MultiFDMethods multifd_nocomp_ops = { 226 .send_setup = nocomp_send_setup, 227 .send_cleanup = nocomp_send_cleanup, 228 .send_prepare = nocomp_send_prepare, 229 .recv_setup = nocomp_recv_setup, 230 .recv_cleanup = nocomp_recv_cleanup, 231 .recv_pages = nocomp_recv_pages 232 }; 233 234 static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = { 235 [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops, 236 }; 237 238 void multifd_register_ops(int method, MultiFDMethods *ops) 239 { 240 assert(0 < method && method < MULTIFD_COMPRESSION__MAX); 241 multifd_ops[method] = ops; 242 } 243 244 /* Reset a MultiFDPages_t* object for the next use */ 245 static void multifd_pages_reset(MultiFDPages_t *pages) 246 { 247 /* 248 * We don't need to touch offset[] array, because it will be 249 * overwritten later when reused. 250 */ 251 pages->num = 0; 252 pages->block = NULL; 253 } 254 255 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) 256 { 257 MultiFDInit_t msg = {}; 258 size_t size = sizeof(msg); 259 int ret; 260 261 msg.magic = cpu_to_be32(MULTIFD_MAGIC); 262 msg.version = cpu_to_be32(MULTIFD_VERSION); 263 msg.id = p->id; 264 memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid)); 265 266 ret = qio_channel_write_all(p->c, (char *)&msg, size, errp); 267 if (ret != 0) { 268 return -1; 269 } 270 stat64_add(&mig_stats.multifd_bytes, size); 271 return 0; 272 } 273 274 static int multifd_recv_initial_packet(QIOChannel *c, Error **errp) 275 { 276 MultiFDInit_t msg; 277 int ret; 278 279 ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp); 280 if (ret != 0) { 281 return -1; 282 } 283 284 msg.magic = be32_to_cpu(msg.magic); 285 msg.version = be32_to_cpu(msg.version); 286 287 if (msg.magic != MULTIFD_MAGIC) { 288 error_setg(errp, "multifd: received packet magic %x " 289 "expected %x", msg.magic, MULTIFD_MAGIC); 290 return -1; 291 } 292 293 if (msg.version != MULTIFD_VERSION) { 294 error_setg(errp, "multifd: received packet version %u " 295 "expected %u", msg.version, MULTIFD_VERSION); 296 return -1; 297 } 298 299 if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) { 300 char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid); 301 char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid); 302 303 error_setg(errp, "multifd: received uuid '%s' and expected " 304 "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id); 305 g_free(uuid); 306 g_free(msg_uuid); 307 return -1; 308 } 309 310 if (msg.id > migrate_multifd_channels()) { 311 error_setg(errp, "multifd: received channel id %u is greater than " 312 "number of channels %u", msg.id, migrate_multifd_channels()); 313 return -1; 314 } 315 316 return msg.id; 317 } 318 319 static MultiFDPages_t *multifd_pages_init(uint32_t n) 320 { 321 MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1); 322 323 pages->allocated = n; 324 pages->offset = g_new0(ram_addr_t, n); 325 326 return pages; 327 } 328 329 static void multifd_pages_clear(MultiFDPages_t *pages) 330 { 331 multifd_pages_reset(pages); 332 pages->allocated = 0; 333 g_free(pages->offset); 334 pages->offset = NULL; 335 g_free(pages); 336 } 337 338 void multifd_send_fill_packet(MultiFDSendParams *p) 339 { 340 MultiFDPacket_t *packet = p->packet; 341 MultiFDPages_t *pages = p->pages; 342 uint64_t packet_num; 343 int i; 344 345 packet->flags = cpu_to_be32(p->flags); 346 packet->pages_alloc = cpu_to_be32(p->pages->allocated); 347 packet->normal_pages = cpu_to_be32(pages->num); 348 packet->next_packet_size = cpu_to_be32(p->next_packet_size); 349 350 packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num); 351 packet->packet_num = cpu_to_be64(packet_num); 352 353 if (pages->block) { 354 strncpy(packet->ramblock, pages->block->idstr, 256); 355 } 356 357 for (i = 0; i < pages->num; i++) { 358 /* there are architectures where ram_addr_t is 32 bit */ 359 uint64_t temp = pages->offset[i]; 360 361 packet->offset[i] = cpu_to_be64(temp); 362 } 363 364 p->packets_sent++; 365 p->total_normal_pages += pages->num; 366 367 trace_multifd_send(p->id, packet_num, pages->num, p->flags, 368 p->next_packet_size); 369 } 370 371 static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) 372 { 373 MultiFDPacket_t *packet = p->packet; 374 int i; 375 376 packet->magic = be32_to_cpu(packet->magic); 377 if (packet->magic != MULTIFD_MAGIC) { 378 error_setg(errp, "multifd: received packet " 379 "magic %x and expected magic %x", 380 packet->magic, MULTIFD_MAGIC); 381 return -1; 382 } 383 384 packet->version = be32_to_cpu(packet->version); 385 if (packet->version != MULTIFD_VERSION) { 386 error_setg(errp, "multifd: received packet " 387 "version %u and expected version %u", 388 packet->version, MULTIFD_VERSION); 389 return -1; 390 } 391 392 p->flags = be32_to_cpu(packet->flags); 393 394 packet->pages_alloc = be32_to_cpu(packet->pages_alloc); 395 /* 396 * If we received a packet that is 100 times bigger than expected 397 * just stop migration. It is a magic number. 398 */ 399 if (packet->pages_alloc > p->page_count) { 400 error_setg(errp, "multifd: received packet " 401 "with size %u and expected a size of %u", 402 packet->pages_alloc, p->page_count) ; 403 return -1; 404 } 405 406 p->normal_num = be32_to_cpu(packet->normal_pages); 407 if (p->normal_num > packet->pages_alloc) { 408 error_setg(errp, "multifd: received packet " 409 "with %u pages and expected maximum pages are %u", 410 p->normal_num, packet->pages_alloc) ; 411 return -1; 412 } 413 414 p->next_packet_size = be32_to_cpu(packet->next_packet_size); 415 p->packet_num = be64_to_cpu(packet->packet_num); 416 p->packets_recved++; 417 p->total_normal_pages += p->normal_num; 418 419 trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->flags, 420 p->next_packet_size); 421 422 if (p->normal_num == 0) { 423 return 0; 424 } 425 426 /* make sure that ramblock is 0 terminated */ 427 packet->ramblock[255] = 0; 428 p->block = qemu_ram_block_by_name(packet->ramblock); 429 if (!p->block) { 430 error_setg(errp, "multifd: unknown ram block %s", 431 packet->ramblock); 432 return -1; 433 } 434 435 p->host = p->block->host; 436 for (i = 0; i < p->normal_num; i++) { 437 uint64_t offset = be64_to_cpu(packet->offset[i]); 438 439 if (offset > (p->block->used_length - p->page_size)) { 440 error_setg(errp, "multifd: offset too long %" PRIu64 441 " (max " RAM_ADDR_FMT ")", 442 offset, p->block->used_length); 443 return -1; 444 } 445 p->normal[i] = offset; 446 } 447 448 return 0; 449 } 450 451 static bool multifd_send_should_exit(void) 452 { 453 return qatomic_read(&multifd_send_state->exiting); 454 } 455 456 static bool multifd_recv_should_exit(void) 457 { 458 return qatomic_read(&multifd_recv_state->exiting); 459 } 460 461 /* 462 * The migration thread can wait on either of the two semaphores. This 463 * function can be used to kick the main thread out of waiting on either of 464 * them. Should mostly only be called when something wrong happened with 465 * the current multifd send thread. 466 */ 467 static void multifd_send_kick_main(MultiFDSendParams *p) 468 { 469 qemu_sem_post(&p->sem_sync); 470 qemu_sem_post(&multifd_send_state->channels_ready); 471 } 472 473 /* 474 * How we use multifd_send_state->pages and channel->pages? 475 * 476 * We create a pages for each channel, and a main one. Each time that 477 * we need to send a batch of pages we interchange the ones between 478 * multifd_send_state and the channel that is sending it. There are 479 * two reasons for that: 480 * - to not have to do so many mallocs during migration 481 * - to make easier to know what to free at the end of migration 482 * 483 * This way we always know who is the owner of each "pages" struct, 484 * and we don't need any locking. It belongs to the migration thread 485 * or to the channel thread. Switching is safe because the migration 486 * thread is using the channel mutex when changing it, and the channel 487 * have to had finish with its own, otherwise pending_job can't be 488 * false. 489 * 490 * Returns true if succeed, false otherwise. 491 */ 492 static bool multifd_send_pages(void) 493 { 494 int i; 495 static int next_channel; 496 MultiFDSendParams *p = NULL; /* make happy gcc */ 497 MultiFDPages_t *pages = multifd_send_state->pages; 498 499 if (multifd_send_should_exit()) { 500 return false; 501 } 502 503 /* We wait here, until at least one channel is ready */ 504 qemu_sem_wait(&multifd_send_state->channels_ready); 505 506 /* 507 * next_channel can remain from a previous migration that was 508 * using more channels, so ensure it doesn't overflow if the 509 * limit is lower now. 510 */ 511 next_channel %= migrate_multifd_channels(); 512 for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { 513 if (multifd_send_should_exit()) { 514 return false; 515 } 516 p = &multifd_send_state->params[i]; 517 /* 518 * Lockless read to p->pending_job is safe, because only multifd 519 * sender thread can clear it. 520 */ 521 if (qatomic_read(&p->pending_job) == false) { 522 next_channel = (i + 1) % migrate_multifd_channels(); 523 break; 524 } 525 } 526 527 /* 528 * Make sure we read p->pending_job before all the rest. Pairs with 529 * qatomic_store_release() in multifd_send_thread(). 530 */ 531 smp_mb_acquire(); 532 assert(!p->pages->num); 533 multifd_send_state->pages = p->pages; 534 p->pages = pages; 535 /* 536 * Making sure p->pages is setup before marking pending_job=true. Pairs 537 * with the qatomic_load_acquire() in multifd_send_thread(). 538 */ 539 qatomic_store_release(&p->pending_job, true); 540 qemu_sem_post(&p->sem); 541 542 return true; 543 } 544 545 static inline bool multifd_queue_empty(MultiFDPages_t *pages) 546 { 547 return pages->num == 0; 548 } 549 550 static inline bool multifd_queue_full(MultiFDPages_t *pages) 551 { 552 return pages->num == pages->allocated; 553 } 554 555 static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset) 556 { 557 pages->offset[pages->num++] = offset; 558 } 559 560 /* Returns true if enqueue successful, false otherwise */ 561 bool multifd_queue_page(RAMBlock *block, ram_addr_t offset) 562 { 563 MultiFDPages_t *pages; 564 565 retry: 566 pages = multifd_send_state->pages; 567 568 /* If the queue is empty, we can already enqueue now */ 569 if (multifd_queue_empty(pages)) { 570 pages->block = block; 571 multifd_enqueue(pages, offset); 572 return true; 573 } 574 575 /* 576 * Not empty, meanwhile we need a flush. It can because of either: 577 * 578 * (1) The page is not on the same ramblock of previous ones, or, 579 * (2) The queue is full. 580 * 581 * After flush, always retry. 582 */ 583 if (pages->block != block || multifd_queue_full(pages)) { 584 if (!multifd_send_pages()) { 585 return false; 586 } 587 goto retry; 588 } 589 590 /* Not empty, and we still have space, do it! */ 591 multifd_enqueue(pages, offset); 592 return true; 593 } 594 595 /* Multifd send side hit an error; remember it and prepare to quit */ 596 static void multifd_send_set_error(Error *err) 597 { 598 /* 599 * We don't want to exit each threads twice. Depending on where 600 * we get the error, or if there are two independent errors in two 601 * threads at the same time, we can end calling this function 602 * twice. 603 */ 604 if (qatomic_xchg(&multifd_send_state->exiting, 1)) { 605 return; 606 } 607 608 if (err) { 609 MigrationState *s = migrate_get_current(); 610 migrate_set_error(s, err); 611 if (s->state == MIGRATION_STATUS_SETUP || 612 s->state == MIGRATION_STATUS_PRE_SWITCHOVER || 613 s->state == MIGRATION_STATUS_DEVICE || 614 s->state == MIGRATION_STATUS_ACTIVE) { 615 migrate_set_state(&s->state, s->state, 616 MIGRATION_STATUS_FAILED); 617 } 618 } 619 } 620 621 static void multifd_send_terminate_threads(void) 622 { 623 int i; 624 625 trace_multifd_send_terminate_threads(); 626 627 /* 628 * Tell everyone we're quitting. No xchg() needed here; we simply 629 * always set it. 630 */ 631 qatomic_set(&multifd_send_state->exiting, 1); 632 633 /* 634 * Firstly, kick all threads out; no matter whether they are just idle, 635 * or blocked in an IO system call. 636 */ 637 for (i = 0; i < migrate_multifd_channels(); i++) { 638 MultiFDSendParams *p = &multifd_send_state->params[i]; 639 640 qemu_sem_post(&p->sem); 641 if (p->c) { 642 qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); 643 } 644 } 645 646 /* 647 * Finally recycle all the threads. 648 */ 649 for (i = 0; i < migrate_multifd_channels(); i++) { 650 MultiFDSendParams *p = &multifd_send_state->params[i]; 651 652 if (p->tls_thread_created) { 653 qemu_thread_join(&p->tls_thread); 654 } 655 656 if (p->thread_created) { 657 qemu_thread_join(&p->thread); 658 } 659 } 660 } 661 662 static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp) 663 { 664 if (p->c) { 665 migration_ioc_unregister_yank(p->c); 666 object_unref(OBJECT(p->c)); 667 p->c = NULL; 668 } 669 qemu_sem_destroy(&p->sem); 670 qemu_sem_destroy(&p->sem_sync); 671 g_free(p->name); 672 p->name = NULL; 673 multifd_pages_clear(p->pages); 674 p->pages = NULL; 675 p->packet_len = 0; 676 g_free(p->packet); 677 p->packet = NULL; 678 g_free(p->iov); 679 p->iov = NULL; 680 multifd_send_state->ops->send_cleanup(p, errp); 681 682 return *errp == NULL; 683 } 684 685 static void multifd_send_cleanup_state(void) 686 { 687 socket_cleanup_outgoing_migration(); 688 qemu_sem_destroy(&multifd_send_state->channels_created); 689 qemu_sem_destroy(&multifd_send_state->channels_ready); 690 g_free(multifd_send_state->params); 691 multifd_send_state->params = NULL; 692 multifd_pages_clear(multifd_send_state->pages); 693 multifd_send_state->pages = NULL; 694 g_free(multifd_send_state); 695 multifd_send_state = NULL; 696 } 697 698 void multifd_send_shutdown(void) 699 { 700 int i; 701 702 if (!migrate_multifd()) { 703 return; 704 } 705 706 multifd_send_terminate_threads(); 707 708 for (i = 0; i < migrate_multifd_channels(); i++) { 709 MultiFDSendParams *p = &multifd_send_state->params[i]; 710 Error *local_err = NULL; 711 712 if (!multifd_send_cleanup_channel(p, &local_err)) { 713 migrate_set_error(migrate_get_current(), local_err); 714 error_free(local_err); 715 } 716 } 717 718 multifd_send_cleanup_state(); 719 } 720 721 static int multifd_zero_copy_flush(QIOChannel *c) 722 { 723 int ret; 724 Error *err = NULL; 725 726 ret = qio_channel_flush(c, &err); 727 if (ret < 0) { 728 error_report_err(err); 729 return -1; 730 } 731 if (ret == 1) { 732 stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1); 733 } 734 735 return ret; 736 } 737 738 int multifd_send_sync_main(void) 739 { 740 int i; 741 bool flush_zero_copy; 742 743 if (!migrate_multifd()) { 744 return 0; 745 } 746 if (multifd_send_state->pages->num) { 747 if (!multifd_send_pages()) { 748 error_report("%s: multifd_send_pages fail", __func__); 749 return -1; 750 } 751 } 752 753 flush_zero_copy = migrate_zero_copy_send(); 754 755 for (i = 0; i < migrate_multifd_channels(); i++) { 756 MultiFDSendParams *p = &multifd_send_state->params[i]; 757 758 if (multifd_send_should_exit()) { 759 return -1; 760 } 761 762 trace_multifd_send_sync_main_signal(p->id); 763 764 /* 765 * We should be the only user so far, so not possible to be set by 766 * others concurrently. 767 */ 768 assert(qatomic_read(&p->pending_sync) == false); 769 qatomic_set(&p->pending_sync, true); 770 qemu_sem_post(&p->sem); 771 } 772 for (i = 0; i < migrate_multifd_channels(); i++) { 773 MultiFDSendParams *p = &multifd_send_state->params[i]; 774 775 if (multifd_send_should_exit()) { 776 return -1; 777 } 778 779 qemu_sem_wait(&multifd_send_state->channels_ready); 780 trace_multifd_send_sync_main_wait(p->id); 781 qemu_sem_wait(&p->sem_sync); 782 783 if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) { 784 return -1; 785 } 786 } 787 trace_multifd_send_sync_main(multifd_send_state->packet_num); 788 789 return 0; 790 } 791 792 static void *multifd_send_thread(void *opaque) 793 { 794 MultiFDSendParams *p = opaque; 795 MigrationThread *thread = NULL; 796 Error *local_err = NULL; 797 int ret = 0; 798 799 thread = migration_threads_add(p->name, qemu_get_thread_id()); 800 801 trace_multifd_send_thread_start(p->id); 802 rcu_register_thread(); 803 804 if (multifd_send_initial_packet(p, &local_err) < 0) { 805 ret = -1; 806 goto out; 807 } 808 809 while (true) { 810 qemu_sem_post(&multifd_send_state->channels_ready); 811 qemu_sem_wait(&p->sem); 812 813 if (multifd_send_should_exit()) { 814 break; 815 } 816 817 /* 818 * Read pending_job flag before p->pages. Pairs with the 819 * qatomic_store_release() in multifd_send_pages(). 820 */ 821 if (qatomic_load_acquire(&p->pending_job)) { 822 MultiFDPages_t *pages = p->pages; 823 824 p->iovs_num = 0; 825 assert(pages->num); 826 827 ret = multifd_send_state->ops->send_prepare(p, &local_err); 828 if (ret != 0) { 829 break; 830 } 831 832 ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL, 833 0, p->write_flags, &local_err); 834 if (ret != 0) { 835 break; 836 } 837 838 stat64_add(&mig_stats.multifd_bytes, 839 p->next_packet_size + p->packet_len); 840 841 multifd_pages_reset(p->pages); 842 p->next_packet_size = 0; 843 844 /* 845 * Making sure p->pages is published before saying "we're 846 * free". Pairs with the smp_mb_acquire() in 847 * multifd_send_pages(). 848 */ 849 qatomic_store_release(&p->pending_job, false); 850 } else { 851 /* 852 * If not a normal job, must be a sync request. Note that 853 * pending_sync is a standalone flag (unlike pending_job), so 854 * it doesn't require explicit memory barriers. 855 */ 856 assert(qatomic_read(&p->pending_sync)); 857 p->flags = MULTIFD_FLAG_SYNC; 858 multifd_send_fill_packet(p); 859 ret = qio_channel_write_all(p->c, (void *)p->packet, 860 p->packet_len, &local_err); 861 if (ret != 0) { 862 break; 863 } 864 /* p->next_packet_size will always be zero for a SYNC packet */ 865 stat64_add(&mig_stats.multifd_bytes, p->packet_len); 866 p->flags = 0; 867 qatomic_set(&p->pending_sync, false); 868 qemu_sem_post(&p->sem_sync); 869 } 870 } 871 872 out: 873 if (ret) { 874 assert(local_err); 875 trace_multifd_send_error(p->id); 876 multifd_send_set_error(local_err); 877 multifd_send_kick_main(p); 878 error_free(local_err); 879 } 880 881 rcu_unregister_thread(); 882 migration_threads_remove(thread); 883 trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages); 884 885 return NULL; 886 } 887 888 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque); 889 890 typedef struct { 891 MultiFDSendParams *p; 892 QIOChannelTLS *tioc; 893 } MultiFDTLSThreadArgs; 894 895 static void *multifd_tls_handshake_thread(void *opaque) 896 { 897 MultiFDTLSThreadArgs *args = opaque; 898 899 qio_channel_tls_handshake(args->tioc, 900 multifd_new_send_channel_async, 901 args->p, 902 NULL, 903 NULL); 904 g_free(args); 905 906 return NULL; 907 } 908 909 static bool multifd_tls_channel_connect(MultiFDSendParams *p, 910 QIOChannel *ioc, 911 Error **errp) 912 { 913 MigrationState *s = migrate_get_current(); 914 const char *hostname = s->hostname; 915 MultiFDTLSThreadArgs *args; 916 QIOChannelTLS *tioc; 917 918 tioc = migration_tls_client_create(ioc, hostname, errp); 919 if (!tioc) { 920 return false; 921 } 922 923 /* 924 * Ownership of the socket channel now transfers to the newly 925 * created TLS channel, which has already taken a reference. 926 */ 927 object_unref(OBJECT(ioc)); 928 trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname); 929 qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing"); 930 931 args = g_new0(MultiFDTLSThreadArgs, 1); 932 args->tioc = tioc; 933 args->p = p; 934 935 p->tls_thread_created = true; 936 qemu_thread_create(&p->tls_thread, "multifd-tls-handshake-worker", 937 multifd_tls_handshake_thread, args, 938 QEMU_THREAD_JOINABLE); 939 return true; 940 } 941 942 static void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc) 943 { 944 qio_channel_set_delay(ioc, false); 945 946 migration_ioc_register_yank(ioc); 947 /* Setup p->c only if the channel is completely setup */ 948 p->c = ioc; 949 950 p->thread_created = true; 951 qemu_thread_create(&p->thread, p->name, multifd_send_thread, p, 952 QEMU_THREAD_JOINABLE); 953 } 954 955 /* 956 * When TLS is enabled this function is called once to establish the 957 * TLS connection and a second time after the TLS handshake to create 958 * the multifd channel. Without TLS it goes straight into the channel 959 * creation. 960 */ 961 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) 962 { 963 MultiFDSendParams *p = opaque; 964 QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task)); 965 Error *local_err = NULL; 966 bool ret; 967 968 trace_multifd_new_send_channel_async(p->id); 969 970 if (qio_task_propagate_error(task, &local_err)) { 971 ret = false; 972 goto out; 973 } 974 975 trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)), 976 migrate_get_current()->hostname); 977 978 if (migrate_channel_requires_tls_upgrade(ioc)) { 979 ret = multifd_tls_channel_connect(p, ioc, &local_err); 980 if (ret) { 981 return; 982 } 983 } else { 984 multifd_channel_connect(p, ioc); 985 ret = true; 986 } 987 988 out: 989 /* 990 * Here we're not interested whether creation succeeded, only that 991 * it happened at all. 992 */ 993 qemu_sem_post(&multifd_send_state->channels_created); 994 995 if (ret) { 996 return; 997 } 998 999 trace_multifd_new_send_channel_async_error(p->id, local_err); 1000 multifd_send_set_error(local_err); 1001 /* 1002 * For error cases (TLS or non-TLS), IO channel is always freed here 1003 * rather than when cleanup multifd: since p->c is not set, multifd 1004 * cleanup code doesn't even know its existence. 1005 */ 1006 object_unref(OBJECT(ioc)); 1007 error_free(local_err); 1008 } 1009 1010 static void multifd_new_send_channel_create(gpointer opaque) 1011 { 1012 socket_send_channel_create(multifd_new_send_channel_async, opaque); 1013 } 1014 1015 bool multifd_send_setup(void) 1016 { 1017 MigrationState *s = migrate_get_current(); 1018 Error *local_err = NULL; 1019 int thread_count, ret = 0; 1020 uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); 1021 uint8_t i; 1022 1023 if (!migrate_multifd()) { 1024 return true; 1025 } 1026 1027 thread_count = migrate_multifd_channels(); 1028 multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); 1029 multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); 1030 multifd_send_state->pages = multifd_pages_init(page_count); 1031 qemu_sem_init(&multifd_send_state->channels_created, 0); 1032 qemu_sem_init(&multifd_send_state->channels_ready, 0); 1033 qatomic_set(&multifd_send_state->exiting, 0); 1034 multifd_send_state->ops = multifd_ops[migrate_multifd_compression()]; 1035 1036 for (i = 0; i < thread_count; i++) { 1037 MultiFDSendParams *p = &multifd_send_state->params[i]; 1038 1039 qemu_sem_init(&p->sem, 0); 1040 qemu_sem_init(&p->sem_sync, 0); 1041 p->id = i; 1042 p->pages = multifd_pages_init(page_count); 1043 p->packet_len = sizeof(MultiFDPacket_t) 1044 + sizeof(uint64_t) * page_count; 1045 p->packet = g_malloc0(p->packet_len); 1046 p->packet->magic = cpu_to_be32(MULTIFD_MAGIC); 1047 p->packet->version = cpu_to_be32(MULTIFD_VERSION); 1048 p->name = g_strdup_printf("multifdsend_%d", i); 1049 /* We need one extra place for the packet header */ 1050 p->iov = g_new0(struct iovec, page_count + 1); 1051 p->page_size = qemu_target_page_size(); 1052 p->page_count = page_count; 1053 p->write_flags = 0; 1054 multifd_new_send_channel_create(p); 1055 } 1056 1057 /* 1058 * Wait until channel creation has started for all channels. The 1059 * creation can still fail, but no more channels will be created 1060 * past this point. 1061 */ 1062 for (i = 0; i < thread_count; i++) { 1063 qemu_sem_wait(&multifd_send_state->channels_created); 1064 } 1065 1066 for (i = 0; i < thread_count; i++) { 1067 MultiFDSendParams *p = &multifd_send_state->params[i]; 1068 1069 ret = multifd_send_state->ops->send_setup(p, &local_err); 1070 if (ret) { 1071 break; 1072 } 1073 } 1074 1075 if (ret) { 1076 migrate_set_error(s, local_err); 1077 error_report_err(local_err); 1078 migrate_set_state(&s->state, MIGRATION_STATUS_SETUP, 1079 MIGRATION_STATUS_FAILED); 1080 return false; 1081 } 1082 1083 return true; 1084 } 1085 1086 static void multifd_recv_terminate_threads(Error *err) 1087 { 1088 int i; 1089 1090 trace_multifd_recv_terminate_threads(err != NULL); 1091 1092 if (qatomic_xchg(&multifd_recv_state->exiting, 1)) { 1093 return; 1094 } 1095 1096 if (err) { 1097 MigrationState *s = migrate_get_current(); 1098 migrate_set_error(s, err); 1099 if (s->state == MIGRATION_STATUS_SETUP || 1100 s->state == MIGRATION_STATUS_ACTIVE) { 1101 migrate_set_state(&s->state, s->state, 1102 MIGRATION_STATUS_FAILED); 1103 } 1104 } 1105 1106 for (i = 0; i < migrate_multifd_channels(); i++) { 1107 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1108 1109 /* 1110 * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code, 1111 * however try to wakeup it without harm in cleanup phase. 1112 */ 1113 qemu_sem_post(&p->sem_sync); 1114 1115 /* 1116 * We could arrive here for two reasons: 1117 * - normal quit, i.e. everything went fine, just finished 1118 * - error quit: We close the channels so the channel threads 1119 * finish the qio_channel_read_all_eof() 1120 */ 1121 if (p->c) { 1122 qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); 1123 } 1124 } 1125 } 1126 1127 void multifd_recv_shutdown(void) 1128 { 1129 if (migrate_multifd()) { 1130 multifd_recv_terminate_threads(NULL); 1131 } 1132 } 1133 1134 static void multifd_recv_cleanup_channel(MultiFDRecvParams *p) 1135 { 1136 migration_ioc_unregister_yank(p->c); 1137 object_unref(OBJECT(p->c)); 1138 p->c = NULL; 1139 qemu_mutex_destroy(&p->mutex); 1140 qemu_sem_destroy(&p->sem_sync); 1141 g_free(p->name); 1142 p->name = NULL; 1143 p->packet_len = 0; 1144 g_free(p->packet); 1145 p->packet = NULL; 1146 g_free(p->iov); 1147 p->iov = NULL; 1148 g_free(p->normal); 1149 p->normal = NULL; 1150 multifd_recv_state->ops->recv_cleanup(p); 1151 } 1152 1153 static void multifd_recv_cleanup_state(void) 1154 { 1155 qemu_sem_destroy(&multifd_recv_state->sem_sync); 1156 g_free(multifd_recv_state->params); 1157 multifd_recv_state->params = NULL; 1158 g_free(multifd_recv_state); 1159 multifd_recv_state = NULL; 1160 } 1161 1162 void multifd_recv_cleanup(void) 1163 { 1164 int i; 1165 1166 if (!migrate_multifd()) { 1167 return; 1168 } 1169 multifd_recv_terminate_threads(NULL); 1170 for (i = 0; i < migrate_multifd_channels(); i++) { 1171 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1172 1173 if (p->thread_created) { 1174 qemu_thread_join(&p->thread); 1175 } 1176 } 1177 for (i = 0; i < migrate_multifd_channels(); i++) { 1178 multifd_recv_cleanup_channel(&multifd_recv_state->params[i]); 1179 } 1180 multifd_recv_cleanup_state(); 1181 } 1182 1183 void multifd_recv_sync_main(void) 1184 { 1185 int i; 1186 1187 if (!migrate_multifd()) { 1188 return; 1189 } 1190 for (i = 0; i < migrate_multifd_channels(); i++) { 1191 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1192 1193 trace_multifd_recv_sync_main_wait(p->id); 1194 qemu_sem_wait(&multifd_recv_state->sem_sync); 1195 } 1196 for (i = 0; i < migrate_multifd_channels(); i++) { 1197 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1198 1199 WITH_QEMU_LOCK_GUARD(&p->mutex) { 1200 if (multifd_recv_state->packet_num < p->packet_num) { 1201 multifd_recv_state->packet_num = p->packet_num; 1202 } 1203 } 1204 trace_multifd_recv_sync_main_signal(p->id); 1205 qemu_sem_post(&p->sem_sync); 1206 } 1207 trace_multifd_recv_sync_main(multifd_recv_state->packet_num); 1208 } 1209 1210 static void *multifd_recv_thread(void *opaque) 1211 { 1212 MultiFDRecvParams *p = opaque; 1213 Error *local_err = NULL; 1214 int ret; 1215 1216 trace_multifd_recv_thread_start(p->id); 1217 rcu_register_thread(); 1218 1219 while (true) { 1220 uint32_t flags; 1221 1222 if (multifd_recv_should_exit()) { 1223 break; 1224 } 1225 1226 ret = qio_channel_read_all_eof(p->c, (void *)p->packet, 1227 p->packet_len, &local_err); 1228 if (ret == 0 || ret == -1) { /* 0: EOF -1: Error */ 1229 break; 1230 } 1231 1232 qemu_mutex_lock(&p->mutex); 1233 ret = multifd_recv_unfill_packet(p, &local_err); 1234 if (ret) { 1235 qemu_mutex_unlock(&p->mutex); 1236 break; 1237 } 1238 1239 flags = p->flags; 1240 /* recv methods don't know how to handle the SYNC flag */ 1241 p->flags &= ~MULTIFD_FLAG_SYNC; 1242 qemu_mutex_unlock(&p->mutex); 1243 1244 if (p->normal_num) { 1245 ret = multifd_recv_state->ops->recv_pages(p, &local_err); 1246 if (ret != 0) { 1247 break; 1248 } 1249 } 1250 1251 if (flags & MULTIFD_FLAG_SYNC) { 1252 qemu_sem_post(&multifd_recv_state->sem_sync); 1253 qemu_sem_wait(&p->sem_sync); 1254 } 1255 } 1256 1257 if (local_err) { 1258 multifd_recv_terminate_threads(local_err); 1259 error_free(local_err); 1260 } 1261 1262 rcu_unregister_thread(); 1263 trace_multifd_recv_thread_end(p->id, p->packets_recved, p->total_normal_pages); 1264 1265 return NULL; 1266 } 1267 1268 int multifd_recv_setup(Error **errp) 1269 { 1270 int thread_count; 1271 uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); 1272 uint8_t i; 1273 1274 /* 1275 * Return successfully if multiFD recv state is already initialised 1276 * or multiFD is not enabled. 1277 */ 1278 if (multifd_recv_state || !migrate_multifd()) { 1279 return 0; 1280 } 1281 1282 thread_count = migrate_multifd_channels(); 1283 multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); 1284 multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); 1285 qatomic_set(&multifd_recv_state->count, 0); 1286 qatomic_set(&multifd_recv_state->exiting, 0); 1287 qemu_sem_init(&multifd_recv_state->sem_sync, 0); 1288 multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()]; 1289 1290 for (i = 0; i < thread_count; i++) { 1291 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1292 1293 qemu_mutex_init(&p->mutex); 1294 qemu_sem_init(&p->sem_sync, 0); 1295 p->id = i; 1296 p->packet_len = sizeof(MultiFDPacket_t) 1297 + sizeof(uint64_t) * page_count; 1298 p->packet = g_malloc0(p->packet_len); 1299 p->name = g_strdup_printf("multifdrecv_%d", i); 1300 p->iov = g_new0(struct iovec, page_count); 1301 p->normal = g_new0(ram_addr_t, page_count); 1302 p->page_count = page_count; 1303 p->page_size = qemu_target_page_size(); 1304 } 1305 1306 for (i = 0; i < thread_count; i++) { 1307 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1308 int ret; 1309 1310 ret = multifd_recv_state->ops->recv_setup(p, errp); 1311 if (ret) { 1312 return ret; 1313 } 1314 } 1315 return 0; 1316 } 1317 1318 bool multifd_recv_all_channels_created(void) 1319 { 1320 int thread_count = migrate_multifd_channels(); 1321 1322 if (!migrate_multifd()) { 1323 return true; 1324 } 1325 1326 if (!multifd_recv_state) { 1327 /* Called before any connections created */ 1328 return false; 1329 } 1330 1331 return thread_count == qatomic_read(&multifd_recv_state->count); 1332 } 1333 1334 /* 1335 * Try to receive all multifd channels to get ready for the migration. 1336 * Sets @errp when failing to receive the current channel. 1337 */ 1338 void multifd_recv_new_channel(QIOChannel *ioc, Error **errp) 1339 { 1340 MultiFDRecvParams *p; 1341 Error *local_err = NULL; 1342 int id; 1343 1344 id = multifd_recv_initial_packet(ioc, &local_err); 1345 if (id < 0) { 1346 multifd_recv_terminate_threads(local_err); 1347 error_propagate_prepend(errp, local_err, 1348 "failed to receive packet" 1349 " via multifd channel %d: ", 1350 qatomic_read(&multifd_recv_state->count)); 1351 return; 1352 } 1353 trace_multifd_recv_new_channel(id); 1354 1355 p = &multifd_recv_state->params[id]; 1356 if (p->c != NULL) { 1357 error_setg(&local_err, "multifd: received id '%d' already setup'", 1358 id); 1359 multifd_recv_terminate_threads(local_err); 1360 error_propagate(errp, local_err); 1361 return; 1362 } 1363 p->c = ioc; 1364 object_ref(OBJECT(ioc)); 1365 1366 p->thread_created = true; 1367 qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, 1368 QEMU_THREAD_JOINABLE); 1369 qatomic_inc(&multifd_recv_state->count); 1370 } 1371