1 /* 2 * Multifd common code 3 * 4 * Copyright (c) 2019-2020 Red Hat Inc 5 * 6 * Authors: 7 * Juan Quintela <quintela@redhat.com> 8 * 9 * This work is licensed under the terms of the GNU GPL, version 2 or later. 10 * See the COPYING file in the top-level directory. 11 */ 12 13 #include "qemu/osdep.h" 14 #include "qemu/cutils.h" 15 #include "qemu/iov.h" 16 #include "qemu/rcu.h" 17 #include "exec/target_page.h" 18 #include "system/system.h" 19 #include "system/ramblock.h" 20 #include "qemu/error-report.h" 21 #include "qapi/error.h" 22 #include "file.h" 23 #include "migration/misc.h" 24 #include "migration.h" 25 #include "migration-stats.h" 26 #include "savevm.h" 27 #include "socket.h" 28 #include "tls.h" 29 #include "qemu-file.h" 30 #include "trace.h" 31 #include "multifd.h" 32 #include "threadinfo.h" 33 #include "options.h" 34 #include "qemu/yank.h" 35 #include "io/channel-file.h" 36 #include "io/channel-socket.h" 37 #include "yank_functions.h" 38 39 typedef struct { 40 uint32_t magic; 41 uint32_t version; 42 unsigned char uuid[16]; /* QemuUUID */ 43 uint8_t id; 44 uint8_t unused1[7]; /* Reserved for future use */ 45 uint64_t unused2[4]; /* Reserved for future use */ 46 } __attribute__((packed)) MultiFDInit_t; 47 48 struct { 49 MultiFDSendParams *params; 50 51 /* multifd_send() body is not thread safe, needs serialization */ 52 QemuMutex multifd_send_mutex; 53 54 /* 55 * Global number of generated multifd packets. 56 * 57 * Note that we used 'uintptr_t' because it'll naturally support atomic 58 * operations on both 32bit / 64 bits hosts. It means on 32bit systems 59 * multifd will overflow the packet_num easier, but that should be 60 * fine. 61 * 62 * Another option is to use QEMU's Stat64 then it'll be 64 bits on all 63 * hosts, however so far it does not support atomic fetch_add() yet. 64 * Make it easy for now. 65 */ 66 uintptr_t packet_num; 67 /* 68 * Synchronization point past which no more channels will be 69 * created. 70 */ 71 QemuSemaphore channels_created; 72 /* send channels ready */ 73 QemuSemaphore channels_ready; 74 /* 75 * Have we already run terminate threads. There is a race when it 76 * happens that we got one error while we are exiting. 77 * We will use atomic operations. Only valid values are 0 and 1. 78 */ 79 int exiting; 80 /* multifd ops */ 81 const MultiFDMethods *ops; 82 } *multifd_send_state; 83 84 struct { 85 MultiFDRecvParams *params; 86 MultiFDRecvData *data; 87 /* number of created threads */ 88 int count; 89 /* 90 * This is always posted by the recv threads, the migration thread 91 * uses it to wait for recv threads to finish assigned tasks. 92 */ 93 QemuSemaphore sem_sync; 94 /* global number of generated multifd packets */ 95 uint64_t packet_num; 96 int exiting; 97 /* multifd ops */ 98 const MultiFDMethods *ops; 99 } *multifd_recv_state; 100 101 MultiFDSendData *multifd_send_data_alloc(void) 102 { 103 MultiFDSendData *new = g_new0(MultiFDSendData, 1); 104 105 multifd_ram_payload_alloc(&new->u.ram); 106 /* Device state allocates its payload on-demand */ 107 108 return new; 109 } 110 111 void multifd_send_data_clear(MultiFDSendData *data) 112 { 113 if (multifd_payload_empty(data)) { 114 return; 115 } 116 117 switch (data->type) { 118 case MULTIFD_PAYLOAD_DEVICE_STATE: 119 multifd_send_data_clear_device_state(&data->u.device_state); 120 break; 121 default: 122 /* Nothing to do */ 123 break; 124 } 125 126 data->type = MULTIFD_PAYLOAD_NONE; 127 } 128 129 void multifd_send_data_free(MultiFDSendData *data) 130 { 131 if (!data) { 132 return; 133 } 134 135 /* This also free's device state payload */ 136 multifd_send_data_clear(data); 137 138 multifd_ram_payload_free(&data->u.ram); 139 140 g_free(data); 141 } 142 143 static bool multifd_use_packets(void) 144 { 145 return !migrate_mapped_ram(); 146 } 147 148 void multifd_send_channel_created(void) 149 { 150 qemu_sem_post(&multifd_send_state->channels_created); 151 } 152 153 static const MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {}; 154 155 void multifd_register_ops(int method, const MultiFDMethods *ops) 156 { 157 assert(0 <= method && method < MULTIFD_COMPRESSION__MAX); 158 assert(!multifd_ops[method]); 159 multifd_ops[method] = ops; 160 } 161 162 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) 163 { 164 MultiFDInit_t msg = {}; 165 size_t size = sizeof(msg); 166 int ret; 167 168 msg.magic = cpu_to_be32(MULTIFD_MAGIC); 169 msg.version = cpu_to_be32(MULTIFD_VERSION); 170 msg.id = p->id; 171 memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid)); 172 173 ret = qio_channel_write_all(p->c, (char *)&msg, size, errp); 174 if (ret != 0) { 175 return -1; 176 } 177 stat64_add(&mig_stats.multifd_bytes, size); 178 return 0; 179 } 180 181 static int multifd_recv_initial_packet(QIOChannel *c, Error **errp) 182 { 183 MultiFDInit_t msg; 184 int ret; 185 186 ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp); 187 if (ret != 0) { 188 return -1; 189 } 190 191 msg.magic = be32_to_cpu(msg.magic); 192 msg.version = be32_to_cpu(msg.version); 193 194 if (msg.magic != MULTIFD_MAGIC) { 195 error_setg(errp, "multifd: received packet magic %x " 196 "expected %x", msg.magic, MULTIFD_MAGIC); 197 return -1; 198 } 199 200 if (msg.version != MULTIFD_VERSION) { 201 error_setg(errp, "multifd: received packet version %u " 202 "expected %u", msg.version, MULTIFD_VERSION); 203 return -1; 204 } 205 206 if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) { 207 char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid); 208 char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid); 209 210 error_setg(errp, "multifd: received uuid '%s' and expected " 211 "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id); 212 g_free(uuid); 213 g_free(msg_uuid); 214 return -1; 215 } 216 217 if (msg.id > migrate_multifd_channels()) { 218 error_setg(errp, "multifd: received channel id %u is greater than " 219 "number of channels %u", msg.id, migrate_multifd_channels()); 220 return -1; 221 } 222 223 return msg.id; 224 } 225 226 /* Fills a RAM multifd packet */ 227 void multifd_send_fill_packet(MultiFDSendParams *p) 228 { 229 MultiFDPacket_t *packet = p->packet; 230 uint64_t packet_num; 231 bool sync_packet = p->flags & MULTIFD_FLAG_SYNC; 232 233 memset(packet, 0, p->packet_len); 234 235 packet->hdr.magic = cpu_to_be32(MULTIFD_MAGIC); 236 packet->hdr.version = cpu_to_be32(MULTIFD_VERSION); 237 238 packet->hdr.flags = cpu_to_be32(p->flags); 239 packet->next_packet_size = cpu_to_be32(p->next_packet_size); 240 241 packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num); 242 packet->packet_num = cpu_to_be64(packet_num); 243 244 p->packets_sent++; 245 246 if (!sync_packet) { 247 multifd_ram_fill_packet(p); 248 } 249 250 trace_multifd_send_fill(p->id, packet_num, 251 p->flags, p->next_packet_size); 252 } 253 254 static int multifd_recv_unfill_packet_header(MultiFDRecvParams *p, 255 const MultiFDPacketHdr_t *hdr, 256 Error **errp) 257 { 258 uint32_t magic = be32_to_cpu(hdr->magic); 259 uint32_t version = be32_to_cpu(hdr->version); 260 261 if (magic != MULTIFD_MAGIC) { 262 error_setg(errp, "multifd: received packet magic %x, expected %x", 263 magic, MULTIFD_MAGIC); 264 return -1; 265 } 266 267 if (version != MULTIFD_VERSION) { 268 error_setg(errp, "multifd: received packet version %u, expected %u", 269 version, MULTIFD_VERSION); 270 return -1; 271 } 272 273 p->flags = be32_to_cpu(hdr->flags); 274 275 return 0; 276 } 277 278 static int multifd_recv_unfill_packet_device_state(MultiFDRecvParams *p, 279 Error **errp) 280 { 281 MultiFDPacketDeviceState_t *packet = p->packet_dev_state; 282 283 packet->instance_id = be32_to_cpu(packet->instance_id); 284 p->next_packet_size = be32_to_cpu(packet->next_packet_size); 285 286 return 0; 287 } 288 289 static int multifd_recv_unfill_packet_ram(MultiFDRecvParams *p, Error **errp) 290 { 291 const MultiFDPacket_t *packet = p->packet; 292 int ret = 0; 293 294 p->next_packet_size = be32_to_cpu(packet->next_packet_size); 295 p->packet_num = be64_to_cpu(packet->packet_num); 296 297 /* Always unfill, old QEMUs (<9.0) send data along with SYNC */ 298 ret = multifd_ram_unfill_packet(p, errp); 299 300 trace_multifd_recv_unfill(p->id, p->packet_num, p->flags, 301 p->next_packet_size); 302 303 return ret; 304 } 305 306 static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) 307 { 308 p->packets_recved++; 309 310 if (p->flags & MULTIFD_FLAG_DEVICE_STATE) { 311 return multifd_recv_unfill_packet_device_state(p, errp); 312 } 313 314 return multifd_recv_unfill_packet_ram(p, errp); 315 } 316 317 static bool multifd_send_should_exit(void) 318 { 319 return qatomic_read(&multifd_send_state->exiting); 320 } 321 322 static bool multifd_recv_should_exit(void) 323 { 324 return qatomic_read(&multifd_recv_state->exiting); 325 } 326 327 /* 328 * The migration thread can wait on either of the two semaphores. This 329 * function can be used to kick the main thread out of waiting on either of 330 * them. Should mostly only be called when something wrong happened with 331 * the current multifd send thread. 332 */ 333 static void multifd_send_kick_main(MultiFDSendParams *p) 334 { 335 qemu_sem_post(&p->sem_sync); 336 qemu_sem_post(&multifd_send_state->channels_ready); 337 } 338 339 /* 340 * multifd_send() works by exchanging the MultiFDSendData object 341 * provided by the caller with an unused MultiFDSendData object from 342 * the next channel that is found to be idle. 343 * 344 * The channel owns the data until it finishes transmitting and the 345 * caller owns the empty object until it fills it with data and calls 346 * this function again. No locking necessary. 347 * 348 * Switching is safe because both the migration thread and the channel 349 * thread have barriers in place to serialize access. 350 * 351 * Returns true if succeed, false otherwise. 352 */ 353 bool multifd_send(MultiFDSendData **send_data) 354 { 355 int i; 356 static int next_channel; 357 MultiFDSendParams *p = NULL; /* make happy gcc */ 358 MultiFDSendData *tmp; 359 360 if (multifd_send_should_exit()) { 361 return false; 362 } 363 364 QEMU_LOCK_GUARD(&multifd_send_state->multifd_send_mutex); 365 366 /* We wait here, until at least one channel is ready */ 367 qemu_sem_wait(&multifd_send_state->channels_ready); 368 369 /* 370 * next_channel can remain from a previous migration that was 371 * using more channels, so ensure it doesn't overflow if the 372 * limit is lower now. 373 */ 374 next_channel %= migrate_multifd_channels(); 375 for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { 376 if (multifd_send_should_exit()) { 377 return false; 378 } 379 p = &multifd_send_state->params[i]; 380 /* 381 * Lockless read to p->pending_job is safe, because only multifd 382 * sender thread can clear it. 383 */ 384 if (qatomic_read(&p->pending_job) == false) { 385 next_channel = (i + 1) % migrate_multifd_channels(); 386 break; 387 } 388 } 389 390 /* 391 * Make sure we read p->pending_job before all the rest. Pairs with 392 * qatomic_store_release() in multifd_send_thread(). 393 */ 394 smp_mb_acquire(); 395 396 assert(multifd_payload_empty(p->data)); 397 398 /* 399 * Swap the pointers. The channel gets the client data for 400 * transferring and the client gets back an unused data slot. 401 */ 402 tmp = *send_data; 403 *send_data = p->data; 404 p->data = tmp; 405 406 /* 407 * Making sure p->data is setup before marking pending_job=true. Pairs 408 * with the qatomic_load_acquire() in multifd_send_thread(). 409 */ 410 qatomic_store_release(&p->pending_job, true); 411 qemu_sem_post(&p->sem); 412 413 return true; 414 } 415 416 /* Multifd send side hit an error; remember it and prepare to quit */ 417 static void multifd_send_set_error(Error *err) 418 { 419 /* 420 * We don't want to exit each threads twice. Depending on where 421 * we get the error, or if there are two independent errors in two 422 * threads at the same time, we can end calling this function 423 * twice. 424 */ 425 if (qatomic_xchg(&multifd_send_state->exiting, 1)) { 426 return; 427 } 428 429 if (err) { 430 MigrationState *s = migrate_get_current(); 431 migrate_set_error(s, err); 432 if (s->state == MIGRATION_STATUS_SETUP || 433 s->state == MIGRATION_STATUS_PRE_SWITCHOVER || 434 s->state == MIGRATION_STATUS_DEVICE || 435 s->state == MIGRATION_STATUS_ACTIVE) { 436 migrate_set_state(&s->state, s->state, 437 MIGRATION_STATUS_FAILED); 438 } 439 } 440 } 441 442 /* 443 * Gracefully shutdown IOChannels. Only needed for successful migrations on 444 * top of TLS channels. Otherwise it is same to qio_channel_shutdown(). 445 * 446 * A successful migration also guarantees multifd sender threads are 447 * properly flushed and halted. It is only safe to send BYE in the 448 * migration thread here when we know there's no other thread writting to 449 * the channel, because GnuTLS doesn't support concurrent writers. 450 */ 451 static void migration_ioc_shutdown_gracefully(QIOChannel *ioc) 452 { 453 Error *local_err = NULL; 454 455 if (!migration_has_failed(migrate_get_current()) && 456 object_dynamic_cast((Object *)ioc, TYPE_QIO_CHANNEL_TLS)) { 457 458 /* 459 * The destination expects the TLS session to always be properly 460 * terminated. This helps to detect a premature termination in the 461 * middle of the stream. Note that older QEMUs always break the 462 * connection on the source and the destination always sees 463 * GNUTLS_E_PREMATURE_TERMINATION. 464 */ 465 migration_tls_channel_end(ioc, &local_err); 466 if (local_err) { 467 warn_reportf_err(local_err, 468 "Failed to gracefully terminate TLS connection: "); 469 } 470 } 471 472 qio_channel_shutdown(ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); 473 } 474 475 static void multifd_send_terminate_threads(void) 476 { 477 int i; 478 479 trace_multifd_send_terminate_threads(); 480 481 /* 482 * Tell everyone we're quitting. No xchg() needed here; we simply 483 * always set it. 484 */ 485 qatomic_set(&multifd_send_state->exiting, 1); 486 487 /* 488 * Firstly, kick all threads out; no matter whether they are just idle, 489 * or blocked in an IO system call. 490 */ 491 for (i = 0; i < migrate_multifd_channels(); i++) { 492 MultiFDSendParams *p = &multifd_send_state->params[i]; 493 494 qemu_sem_post(&p->sem); 495 if (p->c) { 496 migration_ioc_shutdown_gracefully(p->c); 497 } 498 } 499 500 /* 501 * Finally recycle all the threads. 502 */ 503 for (i = 0; i < migrate_multifd_channels(); i++) { 504 MultiFDSendParams *p = &multifd_send_state->params[i]; 505 506 if (p->tls_thread_created) { 507 qemu_thread_join(&p->tls_thread); 508 } 509 510 if (p->thread_created) { 511 qemu_thread_join(&p->thread); 512 } 513 } 514 } 515 516 static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp) 517 { 518 if (p->c) { 519 migration_ioc_unregister_yank(p->c); 520 /* 521 * The object_unref() cannot guarantee the fd will always be 522 * released because finalize() of the iochannel is only 523 * triggered on the last reference and it's not guaranteed 524 * that we always hold the last refcount when reaching here. 525 * 526 * Closing the fd explicitly has the benefit that if there is any 527 * registered I/O handler callbacks on such fd, that will get a 528 * POLLNVAL event and will further trigger the cleanup to finally 529 * release the IOC. 530 * 531 * FIXME: It should logically be guaranteed that all multifd 532 * channels have no I/O handler callback registered when reaching 533 * here, because migration thread will wait for all multifd channel 534 * establishments to complete during setup. Since 535 * migration_cleanup() will be scheduled in main thread too, all 536 * previous callbacks should guarantee to be completed when 537 * reaching here. See multifd_send_state.channels_created and its 538 * usage. In the future, we could replace this with an assert 539 * making sure we're the last reference, or simply drop it if above 540 * is more clear to be justified. 541 */ 542 qio_channel_close(p->c, &error_abort); 543 object_unref(OBJECT(p->c)); 544 p->c = NULL; 545 } 546 qemu_sem_destroy(&p->sem); 547 qemu_sem_destroy(&p->sem_sync); 548 g_free(p->name); 549 p->name = NULL; 550 g_clear_pointer(&p->data, multifd_send_data_free); 551 p->packet_len = 0; 552 g_clear_pointer(&p->packet_device_state, g_free); 553 g_free(p->packet); 554 p->packet = NULL; 555 multifd_send_state->ops->send_cleanup(p, errp); 556 assert(!p->iov); 557 558 return *errp == NULL; 559 } 560 561 static void multifd_send_cleanup_state(void) 562 { 563 file_cleanup_outgoing_migration(); 564 socket_cleanup_outgoing_migration(); 565 multifd_device_state_send_cleanup(); 566 qemu_sem_destroy(&multifd_send_state->channels_created); 567 qemu_sem_destroy(&multifd_send_state->channels_ready); 568 qemu_mutex_destroy(&multifd_send_state->multifd_send_mutex); 569 g_free(multifd_send_state->params); 570 multifd_send_state->params = NULL; 571 g_free(multifd_send_state); 572 multifd_send_state = NULL; 573 } 574 575 void multifd_send_shutdown(void) 576 { 577 int i; 578 579 if (!migrate_multifd()) { 580 return; 581 } 582 583 multifd_send_terminate_threads(); 584 585 for (i = 0; i < migrate_multifd_channels(); i++) { 586 MultiFDSendParams *p = &multifd_send_state->params[i]; 587 Error *local_err = NULL; 588 589 if (!multifd_send_cleanup_channel(p, &local_err)) { 590 migrate_set_error(migrate_get_current(), local_err); 591 error_free(local_err); 592 } 593 } 594 595 multifd_send_cleanup_state(); 596 } 597 598 static int multifd_zero_copy_flush(QIOChannel *c) 599 { 600 int ret; 601 Error *err = NULL; 602 603 ret = qio_channel_flush(c, &err); 604 if (ret < 0) { 605 error_report_err(err); 606 return -1; 607 } 608 if (ret == 1) { 609 stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1); 610 } 611 612 return ret; 613 } 614 615 int multifd_send_sync_main(MultiFDSyncReq req) 616 { 617 int i; 618 bool flush_zero_copy; 619 620 assert(req != MULTIFD_SYNC_NONE); 621 622 flush_zero_copy = migrate_zero_copy_send(); 623 624 for (i = 0; i < migrate_multifd_channels(); i++) { 625 MultiFDSendParams *p = &multifd_send_state->params[i]; 626 627 if (multifd_send_should_exit()) { 628 return -1; 629 } 630 631 trace_multifd_send_sync_main_signal(p->id); 632 633 /* 634 * We should be the only user so far, so not possible to be set by 635 * others concurrently. 636 */ 637 assert(qatomic_read(&p->pending_sync) == MULTIFD_SYNC_NONE); 638 qatomic_set(&p->pending_sync, req); 639 qemu_sem_post(&p->sem); 640 } 641 for (i = 0; i < migrate_multifd_channels(); i++) { 642 MultiFDSendParams *p = &multifd_send_state->params[i]; 643 644 if (multifd_send_should_exit()) { 645 return -1; 646 } 647 648 qemu_sem_wait(&multifd_send_state->channels_ready); 649 trace_multifd_send_sync_main_wait(p->id); 650 qemu_sem_wait(&p->sem_sync); 651 652 if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) { 653 return -1; 654 } 655 } 656 trace_multifd_send_sync_main(multifd_send_state->packet_num); 657 658 return 0; 659 } 660 661 static void *multifd_send_thread(void *opaque) 662 { 663 MultiFDSendParams *p = opaque; 664 MigrationThread *thread = NULL; 665 Error *local_err = NULL; 666 int ret = 0; 667 bool use_packets = multifd_use_packets(); 668 669 thread = migration_threads_add(p->name, qemu_get_thread_id()); 670 671 trace_multifd_send_thread_start(p->id); 672 rcu_register_thread(); 673 674 if (use_packets) { 675 if (multifd_send_initial_packet(p, &local_err) < 0) { 676 ret = -1; 677 goto out; 678 } 679 } 680 681 while (true) { 682 qemu_sem_post(&multifd_send_state->channels_ready); 683 qemu_sem_wait(&p->sem); 684 685 if (multifd_send_should_exit()) { 686 break; 687 } 688 689 /* 690 * Read pending_job flag before p->data. Pairs with the 691 * qatomic_store_release() in multifd_send(). 692 */ 693 if (qatomic_load_acquire(&p->pending_job)) { 694 bool is_device_state = multifd_payload_device_state(p->data); 695 size_t total_size; 696 int write_flags_masked = 0; 697 698 p->flags = 0; 699 p->iovs_num = 0; 700 assert(!multifd_payload_empty(p->data)); 701 702 if (is_device_state) { 703 multifd_device_state_send_prepare(p); 704 705 /* Device state packets cannot be sent via zerocopy */ 706 write_flags_masked |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY; 707 } else { 708 ret = multifd_send_state->ops->send_prepare(p, &local_err); 709 if (ret != 0) { 710 break; 711 } 712 } 713 714 /* 715 * The packet header in the zerocopy RAM case is accounted for 716 * in multifd_nocomp_send_prepare() - where it is actually 717 * being sent. 718 */ 719 total_size = iov_size(p->iov, p->iovs_num); 720 721 if (migrate_mapped_ram()) { 722 assert(!is_device_state); 723 724 ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num, 725 &p->data->u.ram, &local_err); 726 } else { 727 ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, 728 NULL, 0, 729 p->write_flags & ~write_flags_masked, 730 &local_err); 731 } 732 733 if (ret != 0) { 734 break; 735 } 736 737 stat64_add(&mig_stats.multifd_bytes, total_size); 738 739 p->next_packet_size = 0; 740 multifd_send_data_clear(p->data); 741 742 /* 743 * Making sure p->data is published before saying "we're 744 * free". Pairs with the smp_mb_acquire() in 745 * multifd_send(). 746 */ 747 qatomic_store_release(&p->pending_job, false); 748 } else { 749 MultiFDSyncReq req = qatomic_read(&p->pending_sync); 750 751 /* 752 * If not a normal job, must be a sync request. Note that 753 * pending_sync is a standalone flag (unlike pending_job), so 754 * it doesn't require explicit memory barriers. 755 */ 756 assert(req != MULTIFD_SYNC_NONE); 757 758 /* Only push the SYNC message if it involves a remote sync */ 759 if (req == MULTIFD_SYNC_ALL) { 760 p->flags = MULTIFD_FLAG_SYNC; 761 multifd_send_fill_packet(p); 762 ret = qio_channel_write_all(p->c, (void *)p->packet, 763 p->packet_len, &local_err); 764 if (ret != 0) { 765 break; 766 } 767 /* p->next_packet_size will always be zero for a SYNC packet */ 768 stat64_add(&mig_stats.multifd_bytes, p->packet_len); 769 } 770 771 qatomic_set(&p->pending_sync, MULTIFD_SYNC_NONE); 772 qemu_sem_post(&p->sem_sync); 773 } 774 } 775 776 out: 777 if (ret) { 778 assert(local_err); 779 trace_multifd_send_error(p->id); 780 multifd_send_set_error(local_err); 781 multifd_send_kick_main(p); 782 error_free(local_err); 783 } 784 785 rcu_unregister_thread(); 786 migration_threads_remove(thread); 787 trace_multifd_send_thread_end(p->id, p->packets_sent); 788 789 return NULL; 790 } 791 792 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque); 793 794 typedef struct { 795 MultiFDSendParams *p; 796 QIOChannelTLS *tioc; 797 } MultiFDTLSThreadArgs; 798 799 static void *multifd_tls_handshake_thread(void *opaque) 800 { 801 MultiFDTLSThreadArgs *args = opaque; 802 803 qio_channel_tls_handshake(args->tioc, 804 multifd_new_send_channel_async, 805 args->p, 806 NULL, 807 NULL); 808 g_free(args); 809 810 return NULL; 811 } 812 813 static bool multifd_tls_channel_connect(MultiFDSendParams *p, 814 QIOChannel *ioc, 815 Error **errp) 816 { 817 MigrationState *s = migrate_get_current(); 818 const char *hostname = s->hostname; 819 MultiFDTLSThreadArgs *args; 820 QIOChannelTLS *tioc; 821 822 tioc = migration_tls_client_create(ioc, hostname, errp); 823 if (!tioc) { 824 return false; 825 } 826 827 /* 828 * Ownership of the socket channel now transfers to the newly 829 * created TLS channel, which has already taken a reference. 830 */ 831 object_unref(OBJECT(ioc)); 832 trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname); 833 qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing"); 834 835 args = g_new0(MultiFDTLSThreadArgs, 1); 836 args->tioc = tioc; 837 args->p = p; 838 839 p->tls_thread_created = true; 840 qemu_thread_create(&p->tls_thread, MIGRATION_THREAD_SRC_TLS, 841 multifd_tls_handshake_thread, args, 842 QEMU_THREAD_JOINABLE); 843 return true; 844 } 845 846 void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc) 847 { 848 qio_channel_set_delay(ioc, false); 849 850 migration_ioc_register_yank(ioc); 851 /* Setup p->c only if the channel is completely setup */ 852 p->c = ioc; 853 854 p->thread_created = true; 855 qemu_thread_create(&p->thread, p->name, multifd_send_thread, p, 856 QEMU_THREAD_JOINABLE); 857 } 858 859 /* 860 * When TLS is enabled this function is called once to establish the 861 * TLS connection and a second time after the TLS handshake to create 862 * the multifd channel. Without TLS it goes straight into the channel 863 * creation. 864 */ 865 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) 866 { 867 MultiFDSendParams *p = opaque; 868 QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task)); 869 Error *local_err = NULL; 870 bool ret; 871 872 trace_multifd_new_send_channel_async(p->id); 873 874 if (qio_task_propagate_error(task, &local_err)) { 875 ret = false; 876 goto out; 877 } 878 879 trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)), 880 migrate_get_current()->hostname); 881 882 if (migrate_channel_requires_tls_upgrade(ioc)) { 883 ret = multifd_tls_channel_connect(p, ioc, &local_err); 884 if (ret) { 885 return; 886 } 887 } else { 888 multifd_channel_connect(p, ioc); 889 ret = true; 890 } 891 892 out: 893 /* 894 * Here we're not interested whether creation succeeded, only that 895 * it happened at all. 896 */ 897 multifd_send_channel_created(); 898 899 if (ret) { 900 return; 901 } 902 903 trace_multifd_new_send_channel_async_error(p->id, local_err); 904 multifd_send_set_error(local_err); 905 /* 906 * For error cases (TLS or non-TLS), IO channel is always freed here 907 * rather than when cleanup multifd: since p->c is not set, multifd 908 * cleanup code doesn't even know its existence. 909 */ 910 object_unref(OBJECT(ioc)); 911 error_free(local_err); 912 } 913 914 static bool multifd_new_send_channel_create(gpointer opaque, Error **errp) 915 { 916 if (!multifd_use_packets()) { 917 return file_send_channel_create(opaque, errp); 918 } 919 920 socket_send_channel_create(multifd_new_send_channel_async, opaque); 921 return true; 922 } 923 924 bool multifd_send_setup(void) 925 { 926 MigrationState *s = migrate_get_current(); 927 int thread_count, ret = 0; 928 uint32_t page_count = multifd_ram_page_count(); 929 bool use_packets = multifd_use_packets(); 930 uint8_t i; 931 932 if (!migrate_multifd()) { 933 return true; 934 } 935 936 thread_count = migrate_multifd_channels(); 937 multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); 938 multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); 939 qemu_mutex_init(&multifd_send_state->multifd_send_mutex); 940 qemu_sem_init(&multifd_send_state->channels_created, 0); 941 qemu_sem_init(&multifd_send_state->channels_ready, 0); 942 qatomic_set(&multifd_send_state->exiting, 0); 943 multifd_send_state->ops = multifd_ops[migrate_multifd_compression()]; 944 945 for (i = 0; i < thread_count; i++) { 946 MultiFDSendParams *p = &multifd_send_state->params[i]; 947 Error *local_err = NULL; 948 949 qemu_sem_init(&p->sem, 0); 950 qemu_sem_init(&p->sem_sync, 0); 951 p->id = i; 952 p->data = multifd_send_data_alloc(); 953 954 if (use_packets) { 955 p->packet_len = sizeof(MultiFDPacket_t) 956 + sizeof(uint64_t) * page_count; 957 p->packet = g_malloc0(p->packet_len); 958 p->packet_device_state = g_malloc0(sizeof(*p->packet_device_state)); 959 p->packet_device_state->hdr.magic = cpu_to_be32(MULTIFD_MAGIC); 960 p->packet_device_state->hdr.version = cpu_to_be32(MULTIFD_VERSION); 961 } 962 p->name = g_strdup_printf(MIGRATION_THREAD_SRC_MULTIFD, i); 963 p->write_flags = 0; 964 965 if (!multifd_new_send_channel_create(p, &local_err)) { 966 migrate_set_error(s, local_err); 967 error_free(local_err); 968 ret = -1; 969 } 970 } 971 972 /* 973 * Wait until channel creation has started for all channels. The 974 * creation can still fail, but no more channels will be created 975 * past this point. 976 */ 977 for (i = 0; i < thread_count; i++) { 978 qemu_sem_wait(&multifd_send_state->channels_created); 979 } 980 981 if (ret) { 982 goto err; 983 } 984 985 for (i = 0; i < thread_count; i++) { 986 MultiFDSendParams *p = &multifd_send_state->params[i]; 987 Error *local_err = NULL; 988 989 ret = multifd_send_state->ops->send_setup(p, &local_err); 990 if (ret) { 991 migrate_set_error(s, local_err); 992 error_free(local_err); 993 goto err; 994 } 995 assert(p->iov); 996 } 997 998 multifd_device_state_send_setup(); 999 1000 return true; 1001 1002 err: 1003 migrate_set_state(&s->state, MIGRATION_STATUS_SETUP, 1004 MIGRATION_STATUS_FAILED); 1005 return false; 1006 } 1007 1008 bool multifd_recv(void) 1009 { 1010 int i; 1011 static int next_recv_channel; 1012 MultiFDRecvParams *p = NULL; 1013 MultiFDRecvData *data = multifd_recv_state->data; 1014 1015 /* 1016 * next_channel can remain from a previous migration that was 1017 * using more channels, so ensure it doesn't overflow if the 1018 * limit is lower now. 1019 */ 1020 next_recv_channel %= migrate_multifd_channels(); 1021 for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) { 1022 if (multifd_recv_should_exit()) { 1023 return false; 1024 } 1025 1026 p = &multifd_recv_state->params[i]; 1027 1028 if (qatomic_read(&p->pending_job) == false) { 1029 next_recv_channel = (i + 1) % migrate_multifd_channels(); 1030 break; 1031 } 1032 } 1033 1034 /* 1035 * Order pending_job read before manipulating p->data below. Pairs 1036 * with qatomic_store_release() at multifd_recv_thread(). 1037 */ 1038 smp_mb_acquire(); 1039 1040 assert(!p->data->size); 1041 multifd_recv_state->data = p->data; 1042 p->data = data; 1043 1044 /* 1045 * Order p->data update before setting pending_job. Pairs with 1046 * qatomic_load_acquire() at multifd_recv_thread(). 1047 */ 1048 qatomic_store_release(&p->pending_job, true); 1049 qemu_sem_post(&p->sem); 1050 1051 return true; 1052 } 1053 1054 MultiFDRecvData *multifd_get_recv_data(void) 1055 { 1056 return multifd_recv_state->data; 1057 } 1058 1059 static void multifd_recv_terminate_threads(Error *err) 1060 { 1061 int i; 1062 1063 trace_multifd_recv_terminate_threads(err != NULL); 1064 1065 if (qatomic_xchg(&multifd_recv_state->exiting, 1)) { 1066 return; 1067 } 1068 1069 if (err) { 1070 MigrationState *s = migrate_get_current(); 1071 migrate_set_error(s, err); 1072 if (s->state == MIGRATION_STATUS_SETUP || 1073 s->state == MIGRATION_STATUS_ACTIVE) { 1074 migrate_set_state(&s->state, s->state, 1075 MIGRATION_STATUS_FAILED); 1076 } 1077 } 1078 1079 for (i = 0; i < migrate_multifd_channels(); i++) { 1080 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1081 1082 /* 1083 * The migration thread and channels interact differently 1084 * depending on the presence of packets. 1085 */ 1086 if (multifd_use_packets()) { 1087 /* 1088 * The channel receives as long as there are packets. When 1089 * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the 1090 * channel waits for the migration thread to sync. If the 1091 * sync never happens, do it here. 1092 */ 1093 qemu_sem_post(&p->sem_sync); 1094 } else { 1095 /* 1096 * The channel waits for the migration thread to give it 1097 * work. When the migration thread runs out of work, it 1098 * releases the channel and waits for any pending work to 1099 * finish. If we reach here (e.g. due to error) before the 1100 * work runs out, release the channel. 1101 */ 1102 qemu_sem_post(&p->sem); 1103 } 1104 1105 /* 1106 * We could arrive here for two reasons: 1107 * - normal quit, i.e. everything went fine, just finished 1108 * - error quit: We close the channels so the channel threads 1109 * finish the qio_channel_read_all_eof() 1110 */ 1111 if (p->c) { 1112 qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); 1113 } 1114 } 1115 } 1116 1117 void multifd_recv_shutdown(void) 1118 { 1119 if (migrate_multifd()) { 1120 multifd_recv_terminate_threads(NULL); 1121 } 1122 } 1123 1124 static void multifd_recv_cleanup_channel(MultiFDRecvParams *p) 1125 { 1126 migration_ioc_unregister_yank(p->c); 1127 object_unref(OBJECT(p->c)); 1128 p->c = NULL; 1129 qemu_mutex_destroy(&p->mutex); 1130 qemu_sem_destroy(&p->sem_sync); 1131 qemu_sem_destroy(&p->sem); 1132 g_free(p->data); 1133 p->data = NULL; 1134 g_free(p->name); 1135 p->name = NULL; 1136 p->packet_len = 0; 1137 g_free(p->packet); 1138 p->packet = NULL; 1139 g_clear_pointer(&p->packet_dev_state, g_free); 1140 g_free(p->normal); 1141 p->normal = NULL; 1142 g_free(p->zero); 1143 p->zero = NULL; 1144 multifd_recv_state->ops->recv_cleanup(p); 1145 } 1146 1147 static void multifd_recv_cleanup_state(void) 1148 { 1149 qemu_sem_destroy(&multifd_recv_state->sem_sync); 1150 g_free(multifd_recv_state->params); 1151 multifd_recv_state->params = NULL; 1152 g_free(multifd_recv_state->data); 1153 multifd_recv_state->data = NULL; 1154 g_free(multifd_recv_state); 1155 multifd_recv_state = NULL; 1156 } 1157 1158 void multifd_recv_cleanup(void) 1159 { 1160 int i; 1161 1162 if (!migrate_multifd()) { 1163 return; 1164 } 1165 multifd_recv_terminate_threads(NULL); 1166 for (i = 0; i < migrate_multifd_channels(); i++) { 1167 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1168 1169 if (p->thread_created) { 1170 qemu_thread_join(&p->thread); 1171 } 1172 } 1173 for (i = 0; i < migrate_multifd_channels(); i++) { 1174 multifd_recv_cleanup_channel(&multifd_recv_state->params[i]); 1175 } 1176 multifd_recv_cleanup_state(); 1177 } 1178 1179 void multifd_recv_sync_main(void) 1180 { 1181 int thread_count = migrate_multifd_channels(); 1182 bool file_based = !multifd_use_packets(); 1183 int i; 1184 1185 if (!migrate_multifd()) { 1186 return; 1187 } 1188 1189 /* 1190 * File-based channels don't use packets and therefore need to 1191 * wait for more work. Release them to start the sync. 1192 */ 1193 if (file_based) { 1194 for (i = 0; i < thread_count; i++) { 1195 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1196 1197 trace_multifd_recv_sync_main_signal(p->id); 1198 qemu_sem_post(&p->sem); 1199 } 1200 } 1201 1202 /* 1203 * Initiate the synchronization by waiting for all channels. 1204 * 1205 * For socket-based migration this means each channel has received 1206 * the SYNC packet on the stream. 1207 * 1208 * For file-based migration this means each channel is done with 1209 * the work (pending_job=false). 1210 */ 1211 for (i = 0; i < thread_count; i++) { 1212 trace_multifd_recv_sync_main_wait(i); 1213 qemu_sem_wait(&multifd_recv_state->sem_sync); 1214 } 1215 1216 if (file_based) { 1217 /* 1218 * For file-based loading is done in one iteration. We're 1219 * done. 1220 */ 1221 return; 1222 } 1223 1224 /* 1225 * Sync done. Release the channels for the next iteration. 1226 */ 1227 for (i = 0; i < thread_count; i++) { 1228 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1229 1230 WITH_QEMU_LOCK_GUARD(&p->mutex) { 1231 if (multifd_recv_state->packet_num < p->packet_num) { 1232 multifd_recv_state->packet_num = p->packet_num; 1233 } 1234 } 1235 trace_multifd_recv_sync_main_signal(p->id); 1236 qemu_sem_post(&p->sem_sync); 1237 } 1238 trace_multifd_recv_sync_main(multifd_recv_state->packet_num); 1239 } 1240 1241 static int multifd_device_state_recv(MultiFDRecvParams *p, Error **errp) 1242 { 1243 g_autofree char *dev_state_buf = NULL; 1244 int ret; 1245 1246 dev_state_buf = g_malloc(p->next_packet_size); 1247 1248 ret = qio_channel_read_all(p->c, dev_state_buf, p->next_packet_size, errp); 1249 if (ret != 0) { 1250 return ret; 1251 } 1252 1253 if (p->packet_dev_state->idstr[sizeof(p->packet_dev_state->idstr) - 1] 1254 != 0) { 1255 error_setg(errp, "unterminated multifd device state idstr"); 1256 return -1; 1257 } 1258 1259 if (!qemu_loadvm_load_state_buffer(p->packet_dev_state->idstr, 1260 p->packet_dev_state->instance_id, 1261 dev_state_buf, p->next_packet_size, 1262 errp)) { 1263 ret = -1; 1264 } 1265 1266 return ret; 1267 } 1268 1269 static void *multifd_recv_thread(void *opaque) 1270 { 1271 MigrationState *s = migrate_get_current(); 1272 MultiFDRecvParams *p = opaque; 1273 Error *local_err = NULL; 1274 bool use_packets = multifd_use_packets(); 1275 int ret; 1276 1277 trace_multifd_recv_thread_start(p->id); 1278 rcu_register_thread(); 1279 1280 if (!s->multifd_clean_tls_termination) { 1281 p->read_flags = QIO_CHANNEL_READ_FLAG_RELAXED_EOF; 1282 } 1283 1284 while (true) { 1285 MultiFDPacketHdr_t hdr; 1286 uint32_t flags = 0; 1287 bool is_device_state = false; 1288 bool has_data = false; 1289 uint8_t *pkt_buf; 1290 size_t pkt_len; 1291 1292 p->normal_num = 0; 1293 1294 if (use_packets) { 1295 struct iovec iov = { 1296 .iov_base = (void *)&hdr, 1297 .iov_len = sizeof(hdr) 1298 }; 1299 1300 if (multifd_recv_should_exit()) { 1301 break; 1302 } 1303 1304 ret = qio_channel_readv_full_all_eof(p->c, &iov, 1, NULL, NULL, 1305 p->read_flags, &local_err); 1306 if (!ret) { 1307 /* EOF */ 1308 assert(!local_err); 1309 break; 1310 } 1311 1312 if (ret == -1) { 1313 break; 1314 } 1315 1316 ret = multifd_recv_unfill_packet_header(p, &hdr, &local_err); 1317 if (ret) { 1318 break; 1319 } 1320 1321 is_device_state = p->flags & MULTIFD_FLAG_DEVICE_STATE; 1322 if (is_device_state) { 1323 pkt_buf = (uint8_t *)p->packet_dev_state + sizeof(hdr); 1324 pkt_len = sizeof(*p->packet_dev_state) - sizeof(hdr); 1325 } else { 1326 pkt_buf = (uint8_t *)p->packet + sizeof(hdr); 1327 pkt_len = p->packet_len - sizeof(hdr); 1328 } 1329 1330 ret = qio_channel_read_all_eof(p->c, (char *)pkt_buf, pkt_len, 1331 &local_err); 1332 if (!ret) { 1333 /* EOF */ 1334 error_setg(&local_err, "multifd: unexpected EOF after packet header"); 1335 break; 1336 } 1337 1338 if (ret == -1) { 1339 break; 1340 } 1341 1342 qemu_mutex_lock(&p->mutex); 1343 ret = multifd_recv_unfill_packet(p, &local_err); 1344 if (ret) { 1345 qemu_mutex_unlock(&p->mutex); 1346 break; 1347 } 1348 1349 flags = p->flags; 1350 /* recv methods don't know how to handle the SYNC flag */ 1351 p->flags &= ~MULTIFD_FLAG_SYNC; 1352 1353 if (is_device_state) { 1354 has_data = p->next_packet_size > 0; 1355 } else { 1356 /* 1357 * Even if it's a SYNC packet, this needs to be set 1358 * because older QEMUs (<9.0) still send data along with 1359 * the SYNC packet. 1360 */ 1361 has_data = p->normal_num || p->zero_num; 1362 } 1363 1364 qemu_mutex_unlock(&p->mutex); 1365 } else { 1366 /* 1367 * No packets, so we need to wait for the vmstate code to 1368 * give us work. 1369 */ 1370 qemu_sem_wait(&p->sem); 1371 1372 if (multifd_recv_should_exit()) { 1373 break; 1374 } 1375 1376 /* pairs with qatomic_store_release() at multifd_recv() */ 1377 if (!qatomic_load_acquire(&p->pending_job)) { 1378 /* 1379 * Migration thread did not send work, this is 1380 * equivalent to pending_sync on the sending 1381 * side. Post sem_sync to notify we reached this 1382 * point. 1383 */ 1384 qemu_sem_post(&multifd_recv_state->sem_sync); 1385 continue; 1386 } 1387 1388 has_data = !!p->data->size; 1389 } 1390 1391 if (has_data) { 1392 /* 1393 * multifd thread should not be active and receive data 1394 * when migration is in the Postcopy phase. Two threads 1395 * writing the same memory area could easily corrupt 1396 * the guest state. 1397 */ 1398 assert(!migration_in_postcopy()); 1399 if (is_device_state) { 1400 assert(use_packets); 1401 ret = multifd_device_state_recv(p, &local_err); 1402 } else { 1403 ret = multifd_recv_state->ops->recv(p, &local_err); 1404 } 1405 if (ret != 0) { 1406 break; 1407 } 1408 } else if (is_device_state) { 1409 error_setg(&local_err, 1410 "multifd: received empty device state packet"); 1411 break; 1412 } 1413 1414 if (use_packets) { 1415 if (flags & MULTIFD_FLAG_SYNC) { 1416 if (is_device_state) { 1417 error_setg(&local_err, 1418 "multifd: received SYNC device state packet"); 1419 break; 1420 } 1421 1422 qemu_sem_post(&multifd_recv_state->sem_sync); 1423 qemu_sem_wait(&p->sem_sync); 1424 } 1425 } else { 1426 p->data->size = 0; 1427 /* 1428 * Order data->size update before clearing 1429 * pending_job. Pairs with smp_mb_acquire() at 1430 * multifd_recv(). 1431 */ 1432 qatomic_store_release(&p->pending_job, false); 1433 } 1434 } 1435 1436 if (local_err) { 1437 multifd_recv_terminate_threads(local_err); 1438 error_free(local_err); 1439 } 1440 1441 rcu_unregister_thread(); 1442 trace_multifd_recv_thread_end(p->id, p->packets_recved); 1443 1444 return NULL; 1445 } 1446 1447 int multifd_recv_setup(Error **errp) 1448 { 1449 int thread_count; 1450 uint32_t page_count = multifd_ram_page_count(); 1451 bool use_packets = multifd_use_packets(); 1452 uint8_t i; 1453 1454 /* 1455 * Return successfully if multiFD recv state is already initialised 1456 * or multiFD is not enabled. 1457 */ 1458 if (multifd_recv_state || !migrate_multifd()) { 1459 return 0; 1460 } 1461 1462 thread_count = migrate_multifd_channels(); 1463 multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); 1464 multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); 1465 1466 multifd_recv_state->data = g_new0(MultiFDRecvData, 1); 1467 multifd_recv_state->data->size = 0; 1468 1469 qatomic_set(&multifd_recv_state->count, 0); 1470 qatomic_set(&multifd_recv_state->exiting, 0); 1471 qemu_sem_init(&multifd_recv_state->sem_sync, 0); 1472 multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()]; 1473 1474 for (i = 0; i < thread_count; i++) { 1475 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1476 1477 qemu_mutex_init(&p->mutex); 1478 qemu_sem_init(&p->sem_sync, 0); 1479 qemu_sem_init(&p->sem, 0); 1480 p->pending_job = false; 1481 p->id = i; 1482 1483 p->data = g_new0(MultiFDRecvData, 1); 1484 p->data->size = 0; 1485 1486 if (use_packets) { 1487 p->packet_len = sizeof(MultiFDPacket_t) 1488 + sizeof(uint64_t) * page_count; 1489 p->packet = g_malloc0(p->packet_len); 1490 p->packet_dev_state = g_malloc0(sizeof(*p->packet_dev_state)); 1491 } 1492 p->name = g_strdup_printf(MIGRATION_THREAD_DST_MULTIFD, i); 1493 p->normal = g_new0(ram_addr_t, page_count); 1494 p->zero = g_new0(ram_addr_t, page_count); 1495 } 1496 1497 for (i = 0; i < thread_count; i++) { 1498 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1499 int ret; 1500 1501 ret = multifd_recv_state->ops->recv_setup(p, errp); 1502 if (ret) { 1503 return ret; 1504 } 1505 } 1506 return 0; 1507 } 1508 1509 bool multifd_recv_all_channels_created(void) 1510 { 1511 int thread_count = migrate_multifd_channels(); 1512 1513 if (!migrate_multifd()) { 1514 return true; 1515 } 1516 1517 if (!multifd_recv_state) { 1518 /* Called before any connections created */ 1519 return false; 1520 } 1521 1522 return thread_count == qatomic_read(&multifd_recv_state->count); 1523 } 1524 1525 /* 1526 * Try to receive all multifd channels to get ready for the migration. 1527 * Sets @errp when failing to receive the current channel. 1528 */ 1529 void multifd_recv_new_channel(QIOChannel *ioc, Error **errp) 1530 { 1531 MultiFDRecvParams *p; 1532 Error *local_err = NULL; 1533 bool use_packets = multifd_use_packets(); 1534 int id; 1535 1536 if (use_packets) { 1537 id = multifd_recv_initial_packet(ioc, &local_err); 1538 if (id < 0) { 1539 multifd_recv_terminate_threads(local_err); 1540 error_propagate_prepend(errp, local_err, 1541 "failed to receive packet" 1542 " via multifd channel %d: ", 1543 qatomic_read(&multifd_recv_state->count)); 1544 return; 1545 } 1546 trace_multifd_recv_new_channel(id); 1547 } else { 1548 id = qatomic_read(&multifd_recv_state->count); 1549 } 1550 1551 p = &multifd_recv_state->params[id]; 1552 if (p->c != NULL) { 1553 error_setg(&local_err, "multifd: received id '%d' already setup'", 1554 id); 1555 multifd_recv_terminate_threads(local_err); 1556 error_propagate(errp, local_err); 1557 return; 1558 } 1559 p->c = ioc; 1560 object_ref(OBJECT(ioc)); 1561 1562 p->thread_created = true; 1563 qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, 1564 QEMU_THREAD_JOINABLE); 1565 qatomic_inc(&multifd_recv_state->count); 1566 } 1567