1 /* 2 * Multifd common code 3 * 4 * Copyright (c) 2019-2020 Red Hat Inc 5 * 6 * Authors: 7 * Juan Quintela <quintela@redhat.com> 8 * 9 * This work is licensed under the terms of the GNU GPL, version 2 or later. 10 * See the COPYING file in the top-level directory. 11 */ 12 13 #include "qemu/osdep.h" 14 #include "qemu/rcu.h" 15 #include "exec/target_page.h" 16 #include "sysemu/sysemu.h" 17 #include "exec/ramblock.h" 18 #include "qemu/error-report.h" 19 #include "qapi/error.h" 20 #include "ram.h" 21 #include "migration.h" 22 #include "migration-stats.h" 23 #include "socket.h" 24 #include "tls.h" 25 #include "qemu-file.h" 26 #include "trace.h" 27 #include "multifd.h" 28 #include "threadinfo.h" 29 #include "options.h" 30 #include "qemu/yank.h" 31 #include "io/channel-socket.h" 32 #include "yank_functions.h" 33 34 /* Multiple fd's */ 35 36 #define MULTIFD_MAGIC 0x11223344U 37 #define MULTIFD_VERSION 1 38 39 typedef struct { 40 uint32_t magic; 41 uint32_t version; 42 unsigned char uuid[16]; /* QemuUUID */ 43 uint8_t id; 44 uint8_t unused1[7]; /* Reserved for future use */ 45 uint64_t unused2[4]; /* Reserved for future use */ 46 } __attribute__((packed)) MultiFDInit_t; 47 48 struct { 49 MultiFDSendParams *params; 50 /* array of pages to sent */ 51 MultiFDPages_t *pages; 52 /* 53 * Global number of generated multifd packets. 54 * 55 * Note that we used 'uintptr_t' because it'll naturally support atomic 56 * operations on both 32bit / 64 bits hosts. It means on 32bit systems 57 * multifd will overflow the packet_num easier, but that should be 58 * fine. 59 * 60 * Another option is to use QEMU's Stat64 then it'll be 64 bits on all 61 * hosts, however so far it does not support atomic fetch_add() yet. 62 * Make it easy for now. 63 */ 64 uintptr_t packet_num; 65 /* 66 * Synchronization point past which no more channels will be 67 * created. 68 */ 69 QemuSemaphore channels_created; 70 /* send channels ready */ 71 QemuSemaphore channels_ready; 72 /* 73 * Have we already run terminate threads. There is a race when it 74 * happens that we got one error while we are exiting. 75 * We will use atomic operations. Only valid values are 0 and 1. 76 */ 77 int exiting; 78 /* multifd ops */ 79 MultiFDMethods *ops; 80 } *multifd_send_state; 81 82 /* Multifd without compression */ 83 84 /** 85 * nocomp_send_setup: setup send side 86 * 87 * @p: Params for the channel that we are using 88 * @errp: pointer to an error 89 */ 90 static int nocomp_send_setup(MultiFDSendParams *p, Error **errp) 91 { 92 if (migrate_zero_copy_send()) { 93 p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY; 94 } 95 96 return 0; 97 } 98 99 /** 100 * nocomp_send_cleanup: cleanup send side 101 * 102 * For no compression this function does nothing. 103 * 104 * @p: Params for the channel that we are using 105 * @errp: pointer to an error 106 */ 107 static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp) 108 { 109 return; 110 } 111 112 /** 113 * nocomp_send_prepare: prepare date to be able to send 114 * 115 * For no compression we just have to calculate the size of the 116 * packet. 117 * 118 * Returns 0 for success or -1 for error 119 * 120 * @p: Params for the channel that we are using 121 * @errp: pointer to an error 122 */ 123 static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp) 124 { 125 bool use_zero_copy_send = migrate_zero_copy_send(); 126 MultiFDPages_t *pages = p->pages; 127 int ret; 128 129 if (!use_zero_copy_send) { 130 /* 131 * Only !zerocopy needs the header in IOV; zerocopy will 132 * send it separately. 133 */ 134 multifd_send_prepare_header(p); 135 } 136 137 for (int i = 0; i < pages->num; i++) { 138 p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i]; 139 p->iov[p->iovs_num].iov_len = p->page_size; 140 p->iovs_num++; 141 } 142 143 p->next_packet_size = pages->num * p->page_size; 144 p->flags |= MULTIFD_FLAG_NOCOMP; 145 146 multifd_send_fill_packet(p); 147 148 if (use_zero_copy_send) { 149 /* Send header first, without zerocopy */ 150 ret = qio_channel_write_all(p->c, (void *)p->packet, 151 p->packet_len, errp); 152 if (ret != 0) { 153 return -1; 154 } 155 } 156 157 return 0; 158 } 159 160 /** 161 * nocomp_recv_setup: setup receive side 162 * 163 * For no compression this function does nothing. 164 * 165 * Returns 0 for success or -1 for error 166 * 167 * @p: Params for the channel that we are using 168 * @errp: pointer to an error 169 */ 170 static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp) 171 { 172 return 0; 173 } 174 175 /** 176 * nocomp_recv_cleanup: setup receive side 177 * 178 * For no compression this function does nothing. 179 * 180 * @p: Params for the channel that we are using 181 */ 182 static void nocomp_recv_cleanup(MultiFDRecvParams *p) 183 { 184 } 185 186 /** 187 * nocomp_recv_pages: read the data from the channel into actual pages 188 * 189 * For no compression we just need to read things into the correct place. 190 * 191 * Returns 0 for success or -1 for error 192 * 193 * @p: Params for the channel that we are using 194 * @errp: pointer to an error 195 */ 196 static int nocomp_recv_pages(MultiFDRecvParams *p, Error **errp) 197 { 198 uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK; 199 200 if (flags != MULTIFD_FLAG_NOCOMP) { 201 error_setg(errp, "multifd %u: flags received %x flags expected %x", 202 p->id, flags, MULTIFD_FLAG_NOCOMP); 203 return -1; 204 } 205 for (int i = 0; i < p->normal_num; i++) { 206 p->iov[i].iov_base = p->host + p->normal[i]; 207 p->iov[i].iov_len = p->page_size; 208 } 209 return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp); 210 } 211 212 static MultiFDMethods multifd_nocomp_ops = { 213 .send_setup = nocomp_send_setup, 214 .send_cleanup = nocomp_send_cleanup, 215 .send_prepare = nocomp_send_prepare, 216 .recv_setup = nocomp_recv_setup, 217 .recv_cleanup = nocomp_recv_cleanup, 218 .recv_pages = nocomp_recv_pages 219 }; 220 221 static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = { 222 [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops, 223 }; 224 225 void multifd_register_ops(int method, MultiFDMethods *ops) 226 { 227 assert(0 < method && method < MULTIFD_COMPRESSION__MAX); 228 multifd_ops[method] = ops; 229 } 230 231 /* Reset a MultiFDPages_t* object for the next use */ 232 static void multifd_pages_reset(MultiFDPages_t *pages) 233 { 234 /* 235 * We don't need to touch offset[] array, because it will be 236 * overwritten later when reused. 237 */ 238 pages->num = 0; 239 pages->block = NULL; 240 } 241 242 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) 243 { 244 MultiFDInit_t msg = {}; 245 size_t size = sizeof(msg); 246 int ret; 247 248 msg.magic = cpu_to_be32(MULTIFD_MAGIC); 249 msg.version = cpu_to_be32(MULTIFD_VERSION); 250 msg.id = p->id; 251 memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid)); 252 253 ret = qio_channel_write_all(p->c, (char *)&msg, size, errp); 254 if (ret != 0) { 255 return -1; 256 } 257 stat64_add(&mig_stats.multifd_bytes, size); 258 return 0; 259 } 260 261 static int multifd_recv_initial_packet(QIOChannel *c, Error **errp) 262 { 263 MultiFDInit_t msg; 264 int ret; 265 266 ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp); 267 if (ret != 0) { 268 return -1; 269 } 270 271 msg.magic = be32_to_cpu(msg.magic); 272 msg.version = be32_to_cpu(msg.version); 273 274 if (msg.magic != MULTIFD_MAGIC) { 275 error_setg(errp, "multifd: received packet magic %x " 276 "expected %x", msg.magic, MULTIFD_MAGIC); 277 return -1; 278 } 279 280 if (msg.version != MULTIFD_VERSION) { 281 error_setg(errp, "multifd: received packet version %u " 282 "expected %u", msg.version, MULTIFD_VERSION); 283 return -1; 284 } 285 286 if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) { 287 char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid); 288 char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid); 289 290 error_setg(errp, "multifd: received uuid '%s' and expected " 291 "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id); 292 g_free(uuid); 293 g_free(msg_uuid); 294 return -1; 295 } 296 297 if (msg.id > migrate_multifd_channels()) { 298 error_setg(errp, "multifd: received channel id %u is greater than " 299 "number of channels %u", msg.id, migrate_multifd_channels()); 300 return -1; 301 } 302 303 return msg.id; 304 } 305 306 static MultiFDPages_t *multifd_pages_init(uint32_t n) 307 { 308 MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1); 309 310 pages->allocated = n; 311 pages->offset = g_new0(ram_addr_t, n); 312 313 return pages; 314 } 315 316 static void multifd_pages_clear(MultiFDPages_t *pages) 317 { 318 multifd_pages_reset(pages); 319 pages->allocated = 0; 320 g_free(pages->offset); 321 pages->offset = NULL; 322 g_free(pages); 323 } 324 325 void multifd_send_fill_packet(MultiFDSendParams *p) 326 { 327 MultiFDPacket_t *packet = p->packet; 328 MultiFDPages_t *pages = p->pages; 329 uint64_t packet_num; 330 int i; 331 332 packet->flags = cpu_to_be32(p->flags); 333 packet->pages_alloc = cpu_to_be32(p->pages->allocated); 334 packet->normal_pages = cpu_to_be32(pages->num); 335 packet->next_packet_size = cpu_to_be32(p->next_packet_size); 336 337 packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num); 338 packet->packet_num = cpu_to_be64(packet_num); 339 340 if (pages->block) { 341 strncpy(packet->ramblock, pages->block->idstr, 256); 342 } 343 344 for (i = 0; i < pages->num; i++) { 345 /* there are architectures where ram_addr_t is 32 bit */ 346 uint64_t temp = pages->offset[i]; 347 348 packet->offset[i] = cpu_to_be64(temp); 349 } 350 351 p->packets_sent++; 352 p->total_normal_pages += pages->num; 353 354 trace_multifd_send(p->id, packet_num, pages->num, p->flags, 355 p->next_packet_size); 356 } 357 358 static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) 359 { 360 MultiFDPacket_t *packet = p->packet; 361 int i; 362 363 packet->magic = be32_to_cpu(packet->magic); 364 if (packet->magic != MULTIFD_MAGIC) { 365 error_setg(errp, "multifd: received packet " 366 "magic %x and expected magic %x", 367 packet->magic, MULTIFD_MAGIC); 368 return -1; 369 } 370 371 packet->version = be32_to_cpu(packet->version); 372 if (packet->version != MULTIFD_VERSION) { 373 error_setg(errp, "multifd: received packet " 374 "version %u and expected version %u", 375 packet->version, MULTIFD_VERSION); 376 return -1; 377 } 378 379 p->flags = be32_to_cpu(packet->flags); 380 381 packet->pages_alloc = be32_to_cpu(packet->pages_alloc); 382 /* 383 * If we received a packet that is 100 times bigger than expected 384 * just stop migration. It is a magic number. 385 */ 386 if (packet->pages_alloc > p->page_count) { 387 error_setg(errp, "multifd: received packet " 388 "with size %u and expected a size of %u", 389 packet->pages_alloc, p->page_count) ; 390 return -1; 391 } 392 393 p->normal_num = be32_to_cpu(packet->normal_pages); 394 if (p->normal_num > packet->pages_alloc) { 395 error_setg(errp, "multifd: received packet " 396 "with %u pages and expected maximum pages are %u", 397 p->normal_num, packet->pages_alloc) ; 398 return -1; 399 } 400 401 p->next_packet_size = be32_to_cpu(packet->next_packet_size); 402 p->packet_num = be64_to_cpu(packet->packet_num); 403 p->packets_recved++; 404 p->total_normal_pages += p->normal_num; 405 406 trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->flags, 407 p->next_packet_size); 408 409 if (p->normal_num == 0) { 410 return 0; 411 } 412 413 /* make sure that ramblock is 0 terminated */ 414 packet->ramblock[255] = 0; 415 p->block = qemu_ram_block_by_name(packet->ramblock); 416 if (!p->block) { 417 error_setg(errp, "multifd: unknown ram block %s", 418 packet->ramblock); 419 return -1; 420 } 421 422 p->host = p->block->host; 423 for (i = 0; i < p->normal_num; i++) { 424 uint64_t offset = be64_to_cpu(packet->offset[i]); 425 426 if (offset > (p->block->used_length - p->page_size)) { 427 error_setg(errp, "multifd: offset too long %" PRIu64 428 " (max " RAM_ADDR_FMT ")", 429 offset, p->block->used_length); 430 return -1; 431 } 432 p->normal[i] = offset; 433 } 434 435 return 0; 436 } 437 438 static bool multifd_send_should_exit(void) 439 { 440 return qatomic_read(&multifd_send_state->exiting); 441 } 442 443 /* 444 * The migration thread can wait on either of the two semaphores. This 445 * function can be used to kick the main thread out of waiting on either of 446 * them. Should mostly only be called when something wrong happened with 447 * the current multifd send thread. 448 */ 449 static void multifd_send_kick_main(MultiFDSendParams *p) 450 { 451 qemu_sem_post(&p->sem_sync); 452 qemu_sem_post(&multifd_send_state->channels_ready); 453 } 454 455 /* 456 * How we use multifd_send_state->pages and channel->pages? 457 * 458 * We create a pages for each channel, and a main one. Each time that 459 * we need to send a batch of pages we interchange the ones between 460 * multifd_send_state and the channel that is sending it. There are 461 * two reasons for that: 462 * - to not have to do so many mallocs during migration 463 * - to make easier to know what to free at the end of migration 464 * 465 * This way we always know who is the owner of each "pages" struct, 466 * and we don't need any locking. It belongs to the migration thread 467 * or to the channel thread. Switching is safe because the migration 468 * thread is using the channel mutex when changing it, and the channel 469 * have to had finish with its own, otherwise pending_job can't be 470 * false. 471 * 472 * Returns true if succeed, false otherwise. 473 */ 474 static bool multifd_send_pages(void) 475 { 476 int i; 477 static int next_channel; 478 MultiFDSendParams *p = NULL; /* make happy gcc */ 479 MultiFDPages_t *pages = multifd_send_state->pages; 480 481 if (multifd_send_should_exit()) { 482 return false; 483 } 484 485 /* We wait here, until at least one channel is ready */ 486 qemu_sem_wait(&multifd_send_state->channels_ready); 487 488 /* 489 * next_channel can remain from a previous migration that was 490 * using more channels, so ensure it doesn't overflow if the 491 * limit is lower now. 492 */ 493 next_channel %= migrate_multifd_channels(); 494 for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { 495 if (multifd_send_should_exit()) { 496 return false; 497 } 498 p = &multifd_send_state->params[i]; 499 /* 500 * Lockless read to p->pending_job is safe, because only multifd 501 * sender thread can clear it. 502 */ 503 if (qatomic_read(&p->pending_job) == false) { 504 next_channel = (i + 1) % migrate_multifd_channels(); 505 break; 506 } 507 } 508 509 /* 510 * Make sure we read p->pending_job before all the rest. Pairs with 511 * qatomic_store_release() in multifd_send_thread(). 512 */ 513 smp_mb_acquire(); 514 assert(!p->pages->num); 515 multifd_send_state->pages = p->pages; 516 p->pages = pages; 517 /* 518 * Making sure p->pages is setup before marking pending_job=true. Pairs 519 * with the qatomic_load_acquire() in multifd_send_thread(). 520 */ 521 qatomic_store_release(&p->pending_job, true); 522 qemu_sem_post(&p->sem); 523 524 return true; 525 } 526 527 static inline bool multifd_queue_empty(MultiFDPages_t *pages) 528 { 529 return pages->num == 0; 530 } 531 532 static inline bool multifd_queue_full(MultiFDPages_t *pages) 533 { 534 return pages->num == pages->allocated; 535 } 536 537 static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset) 538 { 539 pages->offset[pages->num++] = offset; 540 } 541 542 /* Returns true if enqueue successful, false otherwise */ 543 bool multifd_queue_page(RAMBlock *block, ram_addr_t offset) 544 { 545 MultiFDPages_t *pages; 546 547 retry: 548 pages = multifd_send_state->pages; 549 550 /* If the queue is empty, we can already enqueue now */ 551 if (multifd_queue_empty(pages)) { 552 pages->block = block; 553 multifd_enqueue(pages, offset); 554 return true; 555 } 556 557 /* 558 * Not empty, meanwhile we need a flush. It can because of either: 559 * 560 * (1) The page is not on the same ramblock of previous ones, or, 561 * (2) The queue is full. 562 * 563 * After flush, always retry. 564 */ 565 if (pages->block != block || multifd_queue_full(pages)) { 566 if (!multifd_send_pages()) { 567 return false; 568 } 569 goto retry; 570 } 571 572 /* Not empty, and we still have space, do it! */ 573 multifd_enqueue(pages, offset); 574 return true; 575 } 576 577 /* Multifd send side hit an error; remember it and prepare to quit */ 578 static void multifd_send_set_error(Error *err) 579 { 580 /* 581 * We don't want to exit each threads twice. Depending on where 582 * we get the error, or if there are two independent errors in two 583 * threads at the same time, we can end calling this function 584 * twice. 585 */ 586 if (qatomic_xchg(&multifd_send_state->exiting, 1)) { 587 return; 588 } 589 590 if (err) { 591 MigrationState *s = migrate_get_current(); 592 migrate_set_error(s, err); 593 if (s->state == MIGRATION_STATUS_SETUP || 594 s->state == MIGRATION_STATUS_PRE_SWITCHOVER || 595 s->state == MIGRATION_STATUS_DEVICE || 596 s->state == MIGRATION_STATUS_ACTIVE) { 597 migrate_set_state(&s->state, s->state, 598 MIGRATION_STATUS_FAILED); 599 } 600 } 601 } 602 603 static void multifd_send_terminate_threads(void) 604 { 605 int i; 606 607 trace_multifd_send_terminate_threads(); 608 609 /* 610 * Tell everyone we're quitting. No xchg() needed here; we simply 611 * always set it. 612 */ 613 qatomic_set(&multifd_send_state->exiting, 1); 614 615 /* 616 * Firstly, kick all threads out; no matter whether they are just idle, 617 * or blocked in an IO system call. 618 */ 619 for (i = 0; i < migrate_multifd_channels(); i++) { 620 MultiFDSendParams *p = &multifd_send_state->params[i]; 621 622 qemu_sem_post(&p->sem); 623 if (p->c) { 624 qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); 625 } 626 } 627 628 /* 629 * Finally recycle all the threads. 630 */ 631 for (i = 0; i < migrate_multifd_channels(); i++) { 632 MultiFDSendParams *p = &multifd_send_state->params[i]; 633 634 if (p->tls_thread_created) { 635 qemu_thread_join(&p->tls_thread); 636 } 637 638 if (p->thread_created) { 639 qemu_thread_join(&p->thread); 640 } 641 } 642 } 643 644 static int multifd_send_channel_destroy(QIOChannel *send) 645 { 646 return socket_send_channel_destroy(send); 647 } 648 649 static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp) 650 { 651 if (p->registered_yank) { 652 migration_ioc_unregister_yank(p->c); 653 } 654 multifd_send_channel_destroy(p->c); 655 p->c = NULL; 656 qemu_sem_destroy(&p->sem); 657 qemu_sem_destroy(&p->sem_sync); 658 g_free(p->name); 659 p->name = NULL; 660 multifd_pages_clear(p->pages); 661 p->pages = NULL; 662 p->packet_len = 0; 663 g_free(p->packet); 664 p->packet = NULL; 665 g_free(p->iov); 666 p->iov = NULL; 667 multifd_send_state->ops->send_cleanup(p, errp); 668 669 return *errp == NULL; 670 } 671 672 static void multifd_send_cleanup_state(void) 673 { 674 qemu_sem_destroy(&multifd_send_state->channels_created); 675 qemu_sem_destroy(&multifd_send_state->channels_ready); 676 g_free(multifd_send_state->params); 677 multifd_send_state->params = NULL; 678 multifd_pages_clear(multifd_send_state->pages); 679 multifd_send_state->pages = NULL; 680 g_free(multifd_send_state); 681 multifd_send_state = NULL; 682 } 683 684 void multifd_send_shutdown(void) 685 { 686 int i; 687 688 if (!migrate_multifd()) { 689 return; 690 } 691 692 multifd_send_terminate_threads(); 693 694 for (i = 0; i < migrate_multifd_channels(); i++) { 695 MultiFDSendParams *p = &multifd_send_state->params[i]; 696 Error *local_err = NULL; 697 698 if (!multifd_send_cleanup_channel(p, &local_err)) { 699 migrate_set_error(migrate_get_current(), local_err); 700 error_free(local_err); 701 } 702 } 703 704 multifd_send_cleanup_state(); 705 } 706 707 static int multifd_zero_copy_flush(QIOChannel *c) 708 { 709 int ret; 710 Error *err = NULL; 711 712 ret = qio_channel_flush(c, &err); 713 if (ret < 0) { 714 error_report_err(err); 715 return -1; 716 } 717 if (ret == 1) { 718 stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1); 719 } 720 721 return ret; 722 } 723 724 int multifd_send_sync_main(void) 725 { 726 int i; 727 bool flush_zero_copy; 728 729 if (!migrate_multifd()) { 730 return 0; 731 } 732 if (multifd_send_state->pages->num) { 733 if (!multifd_send_pages()) { 734 error_report("%s: multifd_send_pages fail", __func__); 735 return -1; 736 } 737 } 738 739 flush_zero_copy = migrate_zero_copy_send(); 740 741 for (i = 0; i < migrate_multifd_channels(); i++) { 742 MultiFDSendParams *p = &multifd_send_state->params[i]; 743 744 if (multifd_send_should_exit()) { 745 return -1; 746 } 747 748 trace_multifd_send_sync_main_signal(p->id); 749 750 /* 751 * We should be the only user so far, so not possible to be set by 752 * others concurrently. 753 */ 754 assert(qatomic_read(&p->pending_sync) == false); 755 qatomic_set(&p->pending_sync, true); 756 qemu_sem_post(&p->sem); 757 } 758 for (i = 0; i < migrate_multifd_channels(); i++) { 759 MultiFDSendParams *p = &multifd_send_state->params[i]; 760 761 if (multifd_send_should_exit()) { 762 return -1; 763 } 764 765 qemu_sem_wait(&multifd_send_state->channels_ready); 766 trace_multifd_send_sync_main_wait(p->id); 767 qemu_sem_wait(&p->sem_sync); 768 769 if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) { 770 return -1; 771 } 772 } 773 trace_multifd_send_sync_main(multifd_send_state->packet_num); 774 775 return 0; 776 } 777 778 static void *multifd_send_thread(void *opaque) 779 { 780 MultiFDSendParams *p = opaque; 781 MigrationThread *thread = NULL; 782 Error *local_err = NULL; 783 int ret = 0; 784 785 thread = migration_threads_add(p->name, qemu_get_thread_id()); 786 787 trace_multifd_send_thread_start(p->id); 788 rcu_register_thread(); 789 790 if (multifd_send_initial_packet(p, &local_err) < 0) { 791 ret = -1; 792 goto out; 793 } 794 795 while (true) { 796 qemu_sem_post(&multifd_send_state->channels_ready); 797 qemu_sem_wait(&p->sem); 798 799 if (multifd_send_should_exit()) { 800 break; 801 } 802 803 /* 804 * Read pending_job flag before p->pages. Pairs with the 805 * qatomic_store_release() in multifd_send_pages(). 806 */ 807 if (qatomic_load_acquire(&p->pending_job)) { 808 MultiFDPages_t *pages = p->pages; 809 810 p->iovs_num = 0; 811 assert(pages->num); 812 813 ret = multifd_send_state->ops->send_prepare(p, &local_err); 814 if (ret != 0) { 815 break; 816 } 817 818 ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL, 819 0, p->write_flags, &local_err); 820 if (ret != 0) { 821 break; 822 } 823 824 stat64_add(&mig_stats.multifd_bytes, 825 p->next_packet_size + p->packet_len); 826 827 multifd_pages_reset(p->pages); 828 p->next_packet_size = 0; 829 830 /* 831 * Making sure p->pages is published before saying "we're 832 * free". Pairs with the smp_mb_acquire() in 833 * multifd_send_pages(). 834 */ 835 qatomic_store_release(&p->pending_job, false); 836 } else { 837 /* 838 * If not a normal job, must be a sync request. Note that 839 * pending_sync is a standalone flag (unlike pending_job), so 840 * it doesn't require explicit memory barriers. 841 */ 842 assert(qatomic_read(&p->pending_sync)); 843 p->flags = MULTIFD_FLAG_SYNC; 844 multifd_send_fill_packet(p); 845 ret = qio_channel_write_all(p->c, (void *)p->packet, 846 p->packet_len, &local_err); 847 if (ret != 0) { 848 break; 849 } 850 /* p->next_packet_size will always be zero for a SYNC packet */ 851 stat64_add(&mig_stats.multifd_bytes, p->packet_len); 852 p->flags = 0; 853 qatomic_set(&p->pending_sync, false); 854 qemu_sem_post(&p->sem_sync); 855 } 856 } 857 858 out: 859 if (ret) { 860 assert(local_err); 861 trace_multifd_send_error(p->id); 862 multifd_send_set_error(local_err); 863 multifd_send_kick_main(p); 864 error_free(local_err); 865 } 866 867 rcu_unregister_thread(); 868 migration_threads_remove(thread); 869 trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages); 870 871 return NULL; 872 } 873 874 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque); 875 876 static void *multifd_tls_handshake_thread(void *opaque) 877 { 878 MultiFDSendParams *p = opaque; 879 QIOChannelTLS *tioc = QIO_CHANNEL_TLS(p->c); 880 881 qio_channel_tls_handshake(tioc, 882 multifd_new_send_channel_async, 883 p, 884 NULL, 885 NULL); 886 return NULL; 887 } 888 889 static bool multifd_tls_channel_connect(MultiFDSendParams *p, 890 QIOChannel *ioc, 891 Error **errp) 892 { 893 MigrationState *s = migrate_get_current(); 894 const char *hostname = s->hostname; 895 QIOChannelTLS *tioc; 896 897 tioc = migration_tls_client_create(ioc, hostname, errp); 898 if (!tioc) { 899 return false; 900 } 901 902 /* 903 * Ownership of the socket channel now transfers to the newly 904 * created TLS channel, which has already taken a reference. 905 */ 906 object_unref(OBJECT(ioc)); 907 trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname); 908 qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing"); 909 p->c = QIO_CHANNEL(tioc); 910 911 p->tls_thread_created = true; 912 qemu_thread_create(&p->tls_thread, "multifd-tls-handshake-worker", 913 multifd_tls_handshake_thread, p, 914 QEMU_THREAD_JOINABLE); 915 return true; 916 } 917 918 static bool multifd_channel_connect(MultiFDSendParams *p, 919 QIOChannel *ioc, 920 Error **errp) 921 { 922 qio_channel_set_delay(ioc, false); 923 924 migration_ioc_register_yank(ioc); 925 p->registered_yank = true; 926 p->c = ioc; 927 928 p->thread_created = true; 929 qemu_thread_create(&p->thread, p->name, multifd_send_thread, p, 930 QEMU_THREAD_JOINABLE); 931 return true; 932 } 933 934 /* 935 * When TLS is enabled this function is called once to establish the 936 * TLS connection and a second time after the TLS handshake to create 937 * the multifd channel. Without TLS it goes straight into the channel 938 * creation. 939 */ 940 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) 941 { 942 MultiFDSendParams *p = opaque; 943 QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task)); 944 Error *local_err = NULL; 945 bool ret; 946 947 trace_multifd_new_send_channel_async(p->id); 948 949 if (qio_task_propagate_error(task, &local_err)) { 950 ret = false; 951 goto out; 952 } 953 954 trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)), 955 migrate_get_current()->hostname); 956 957 if (migrate_channel_requires_tls_upgrade(ioc)) { 958 ret = multifd_tls_channel_connect(p, ioc, &local_err); 959 if (ret) { 960 return; 961 } 962 } else { 963 ret = multifd_channel_connect(p, ioc, &local_err); 964 } 965 966 out: 967 /* 968 * Here we're not interested whether creation succeeded, only that 969 * it happened at all. 970 */ 971 qemu_sem_post(&multifd_send_state->channels_created); 972 973 if (ret) { 974 return; 975 } 976 977 trace_multifd_new_send_channel_async_error(p->id, local_err); 978 multifd_send_set_error(local_err); 979 if (!p->c) { 980 /* 981 * If no channel has been created, drop the initial 982 * reference. Otherwise cleanup happens at 983 * multifd_send_channel_destroy() 984 */ 985 object_unref(OBJECT(ioc)); 986 } 987 error_free(local_err); 988 } 989 990 static void multifd_new_send_channel_create(gpointer opaque) 991 { 992 socket_send_channel_create(multifd_new_send_channel_async, opaque); 993 } 994 995 bool multifd_send_setup(void) 996 { 997 MigrationState *s = migrate_get_current(); 998 Error *local_err = NULL; 999 int thread_count, ret = 0; 1000 uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); 1001 uint8_t i; 1002 1003 if (!migrate_multifd()) { 1004 return true; 1005 } 1006 1007 thread_count = migrate_multifd_channels(); 1008 multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); 1009 multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); 1010 multifd_send_state->pages = multifd_pages_init(page_count); 1011 qemu_sem_init(&multifd_send_state->channels_created, 0); 1012 qemu_sem_init(&multifd_send_state->channels_ready, 0); 1013 qatomic_set(&multifd_send_state->exiting, 0); 1014 multifd_send_state->ops = multifd_ops[migrate_multifd_compression()]; 1015 1016 for (i = 0; i < thread_count; i++) { 1017 MultiFDSendParams *p = &multifd_send_state->params[i]; 1018 1019 qemu_sem_init(&p->sem, 0); 1020 qemu_sem_init(&p->sem_sync, 0); 1021 p->id = i; 1022 p->pages = multifd_pages_init(page_count); 1023 p->packet_len = sizeof(MultiFDPacket_t) 1024 + sizeof(uint64_t) * page_count; 1025 p->packet = g_malloc0(p->packet_len); 1026 p->packet->magic = cpu_to_be32(MULTIFD_MAGIC); 1027 p->packet->version = cpu_to_be32(MULTIFD_VERSION); 1028 p->name = g_strdup_printf("multifdsend_%d", i); 1029 /* We need one extra place for the packet header */ 1030 p->iov = g_new0(struct iovec, page_count + 1); 1031 p->page_size = qemu_target_page_size(); 1032 p->page_count = page_count; 1033 p->write_flags = 0; 1034 multifd_new_send_channel_create(p); 1035 } 1036 1037 /* 1038 * Wait until channel creation has started for all channels. The 1039 * creation can still fail, but no more channels will be created 1040 * past this point. 1041 */ 1042 for (i = 0; i < thread_count; i++) { 1043 qemu_sem_wait(&multifd_send_state->channels_created); 1044 } 1045 1046 for (i = 0; i < thread_count; i++) { 1047 MultiFDSendParams *p = &multifd_send_state->params[i]; 1048 1049 ret = multifd_send_state->ops->send_setup(p, &local_err); 1050 if (ret) { 1051 break; 1052 } 1053 } 1054 1055 if (ret) { 1056 migrate_set_error(s, local_err); 1057 error_report_err(local_err); 1058 migrate_set_state(&s->state, MIGRATION_STATUS_SETUP, 1059 MIGRATION_STATUS_FAILED); 1060 return false; 1061 } 1062 1063 return true; 1064 } 1065 1066 struct { 1067 MultiFDRecvParams *params; 1068 /* number of created threads */ 1069 int count; 1070 /* syncs main thread and channels */ 1071 QemuSemaphore sem_sync; 1072 /* global number of generated multifd packets */ 1073 uint64_t packet_num; 1074 /* multifd ops */ 1075 MultiFDMethods *ops; 1076 } *multifd_recv_state; 1077 1078 static void multifd_recv_terminate_threads(Error *err) 1079 { 1080 int i; 1081 1082 trace_multifd_recv_terminate_threads(err != NULL); 1083 1084 if (err) { 1085 MigrationState *s = migrate_get_current(); 1086 migrate_set_error(s, err); 1087 if (s->state == MIGRATION_STATUS_SETUP || 1088 s->state == MIGRATION_STATUS_ACTIVE) { 1089 migrate_set_state(&s->state, s->state, 1090 MIGRATION_STATUS_FAILED); 1091 } 1092 } 1093 1094 for (i = 0; i < migrate_multifd_channels(); i++) { 1095 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1096 1097 qemu_mutex_lock(&p->mutex); 1098 p->quit = true; 1099 /* 1100 * We could arrive here for two reasons: 1101 * - normal quit, i.e. everything went fine, just finished 1102 * - error quit: We close the channels so the channel threads 1103 * finish the qio_channel_read_all_eof() 1104 */ 1105 if (p->c) { 1106 qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); 1107 } 1108 qemu_mutex_unlock(&p->mutex); 1109 } 1110 } 1111 1112 void multifd_recv_shutdown(void) 1113 { 1114 if (migrate_multifd()) { 1115 multifd_recv_terminate_threads(NULL); 1116 } 1117 } 1118 1119 static void multifd_recv_cleanup_channel(MultiFDRecvParams *p) 1120 { 1121 migration_ioc_unregister_yank(p->c); 1122 object_unref(OBJECT(p->c)); 1123 p->c = NULL; 1124 qemu_mutex_destroy(&p->mutex); 1125 qemu_sem_destroy(&p->sem_sync); 1126 g_free(p->name); 1127 p->name = NULL; 1128 p->packet_len = 0; 1129 g_free(p->packet); 1130 p->packet = NULL; 1131 g_free(p->iov); 1132 p->iov = NULL; 1133 g_free(p->normal); 1134 p->normal = NULL; 1135 multifd_recv_state->ops->recv_cleanup(p); 1136 } 1137 1138 static void multifd_recv_cleanup_state(void) 1139 { 1140 qemu_sem_destroy(&multifd_recv_state->sem_sync); 1141 g_free(multifd_recv_state->params); 1142 multifd_recv_state->params = NULL; 1143 g_free(multifd_recv_state); 1144 multifd_recv_state = NULL; 1145 } 1146 1147 void multifd_recv_cleanup(void) 1148 { 1149 int i; 1150 1151 if (!migrate_multifd()) { 1152 return; 1153 } 1154 multifd_recv_terminate_threads(NULL); 1155 for (i = 0; i < migrate_multifd_channels(); i++) { 1156 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1157 1158 /* 1159 * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code, 1160 * however try to wakeup it without harm in cleanup phase. 1161 */ 1162 qemu_sem_post(&p->sem_sync); 1163 1164 if (p->thread_created) { 1165 qemu_thread_join(&p->thread); 1166 } 1167 } 1168 for (i = 0; i < migrate_multifd_channels(); i++) { 1169 multifd_recv_cleanup_channel(&multifd_recv_state->params[i]); 1170 } 1171 multifd_recv_cleanup_state(); 1172 } 1173 1174 void multifd_recv_sync_main(void) 1175 { 1176 int i; 1177 1178 if (!migrate_multifd()) { 1179 return; 1180 } 1181 for (i = 0; i < migrate_multifd_channels(); i++) { 1182 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1183 1184 trace_multifd_recv_sync_main_wait(p->id); 1185 qemu_sem_wait(&multifd_recv_state->sem_sync); 1186 } 1187 for (i = 0; i < migrate_multifd_channels(); i++) { 1188 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1189 1190 WITH_QEMU_LOCK_GUARD(&p->mutex) { 1191 if (multifd_recv_state->packet_num < p->packet_num) { 1192 multifd_recv_state->packet_num = p->packet_num; 1193 } 1194 } 1195 trace_multifd_recv_sync_main_signal(p->id); 1196 qemu_sem_post(&p->sem_sync); 1197 } 1198 trace_multifd_recv_sync_main(multifd_recv_state->packet_num); 1199 } 1200 1201 static void *multifd_recv_thread(void *opaque) 1202 { 1203 MultiFDRecvParams *p = opaque; 1204 Error *local_err = NULL; 1205 int ret; 1206 1207 trace_multifd_recv_thread_start(p->id); 1208 rcu_register_thread(); 1209 1210 while (true) { 1211 uint32_t flags; 1212 1213 if (p->quit) { 1214 break; 1215 } 1216 1217 ret = qio_channel_read_all_eof(p->c, (void *)p->packet, 1218 p->packet_len, &local_err); 1219 if (ret == 0 || ret == -1) { /* 0: EOF -1: Error */ 1220 break; 1221 } 1222 1223 qemu_mutex_lock(&p->mutex); 1224 ret = multifd_recv_unfill_packet(p, &local_err); 1225 if (ret) { 1226 qemu_mutex_unlock(&p->mutex); 1227 break; 1228 } 1229 1230 flags = p->flags; 1231 /* recv methods don't know how to handle the SYNC flag */ 1232 p->flags &= ~MULTIFD_FLAG_SYNC; 1233 qemu_mutex_unlock(&p->mutex); 1234 1235 if (p->normal_num) { 1236 ret = multifd_recv_state->ops->recv_pages(p, &local_err); 1237 if (ret != 0) { 1238 break; 1239 } 1240 } 1241 1242 if (flags & MULTIFD_FLAG_SYNC) { 1243 qemu_sem_post(&multifd_recv_state->sem_sync); 1244 qemu_sem_wait(&p->sem_sync); 1245 } 1246 } 1247 1248 if (local_err) { 1249 multifd_recv_terminate_threads(local_err); 1250 error_free(local_err); 1251 } 1252 1253 rcu_unregister_thread(); 1254 trace_multifd_recv_thread_end(p->id, p->packets_recved, p->total_normal_pages); 1255 1256 return NULL; 1257 } 1258 1259 int multifd_recv_setup(Error **errp) 1260 { 1261 int thread_count; 1262 uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); 1263 uint8_t i; 1264 1265 /* 1266 * Return successfully if multiFD recv state is already initialised 1267 * or multiFD is not enabled. 1268 */ 1269 if (multifd_recv_state || !migrate_multifd()) { 1270 return 0; 1271 } 1272 1273 thread_count = migrate_multifd_channels(); 1274 multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); 1275 multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); 1276 qatomic_set(&multifd_recv_state->count, 0); 1277 qemu_sem_init(&multifd_recv_state->sem_sync, 0); 1278 multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()]; 1279 1280 for (i = 0; i < thread_count; i++) { 1281 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1282 1283 qemu_mutex_init(&p->mutex); 1284 qemu_sem_init(&p->sem_sync, 0); 1285 p->quit = false; 1286 p->id = i; 1287 p->packet_len = sizeof(MultiFDPacket_t) 1288 + sizeof(uint64_t) * page_count; 1289 p->packet = g_malloc0(p->packet_len); 1290 p->name = g_strdup_printf("multifdrecv_%d", i); 1291 p->iov = g_new0(struct iovec, page_count); 1292 p->normal = g_new0(ram_addr_t, page_count); 1293 p->page_count = page_count; 1294 p->page_size = qemu_target_page_size(); 1295 } 1296 1297 for (i = 0; i < thread_count; i++) { 1298 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1299 int ret; 1300 1301 ret = multifd_recv_state->ops->recv_setup(p, errp); 1302 if (ret) { 1303 return ret; 1304 } 1305 } 1306 return 0; 1307 } 1308 1309 bool multifd_recv_all_channels_created(void) 1310 { 1311 int thread_count = migrate_multifd_channels(); 1312 1313 if (!migrate_multifd()) { 1314 return true; 1315 } 1316 1317 if (!multifd_recv_state) { 1318 /* Called before any connections created */ 1319 return false; 1320 } 1321 1322 return thread_count == qatomic_read(&multifd_recv_state->count); 1323 } 1324 1325 /* 1326 * Try to receive all multifd channels to get ready for the migration. 1327 * Sets @errp when failing to receive the current channel. 1328 */ 1329 void multifd_recv_new_channel(QIOChannel *ioc, Error **errp) 1330 { 1331 MultiFDRecvParams *p; 1332 Error *local_err = NULL; 1333 int id; 1334 1335 id = multifd_recv_initial_packet(ioc, &local_err); 1336 if (id < 0) { 1337 multifd_recv_terminate_threads(local_err); 1338 error_propagate_prepend(errp, local_err, 1339 "failed to receive packet" 1340 " via multifd channel %d: ", 1341 qatomic_read(&multifd_recv_state->count)); 1342 return; 1343 } 1344 trace_multifd_recv_new_channel(id); 1345 1346 p = &multifd_recv_state->params[id]; 1347 if (p->c != NULL) { 1348 error_setg(&local_err, "multifd: received id '%d' already setup'", 1349 id); 1350 multifd_recv_terminate_threads(local_err); 1351 error_propagate(errp, local_err); 1352 return; 1353 } 1354 p->c = ioc; 1355 object_ref(OBJECT(ioc)); 1356 1357 p->thread_created = true; 1358 qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, 1359 QEMU_THREAD_JOINABLE); 1360 qatomic_inc(&multifd_recv_state->count); 1361 } 1362