/*
 * Multifd common code
 *
 * Copyright (c) 2019-2020 Red Hat Inc
 *
 * Authors:
 *  Juan Quintela <quintela@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */

#include "qemu/osdep.h"
#include "qemu/rcu.h"
#include "exec/target_page.h"
#include "sysemu/sysemu.h"
#include "exec/ramblock.h"
#include "qemu/error-report.h"
#include "qapi/error.h"
#include "ram.h"
#include "migration.h"
#include "socket.h"
#include "tls.h"
#include "qemu-file.h"
#include "trace.h"
#include "multifd.h"

#include "qemu/yank.h"
#include "io/channel-socket.h"
#include "yank_functions.h"

/* Multiple fd's */

#define MULTIFD_MAGIC 0x11223344U
#define MULTIFD_VERSION 1

typedef struct {
    uint32_t magic;
    uint32_t version;
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;

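/*
 * On-wire layout of the per-channel initial packet, as implied by the
 * packed struct above and by multifd_send_initial_packet() below (a
 * sketch for the reader; the struct itself is authoritative):
 *
 *   offset  size  field
 *        0     4  magic    (big endian, MULTIFD_MAGIC)
 *        4     4  version  (big endian, MULTIFD_VERSION)
 *        8    16  uuid     (raw QemuUUID of the source VM)
 *       24     1  id       (channel number, a single byte)
 *       25     7  unused1
 *       32    32  unused2
 *
 * Total: 64 bytes, sent exactly once per channel at connect time.
 */
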
/* Multifd without compression */

/**
 * nocomp_send_setup: setup send side
 *
 * For no compression this function does nothing.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
{
    return 0;
}

/**
 * nocomp_send_cleanup: cleanup send side
 *
 * For no compression this function does nothing.
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
{
    return;
}

/**
 * nocomp_send_prepare: prepare data to be able to send
 *
 * For no compression we just have to calculate the size of the
 * packet.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
{
    MultiFDPages_t *pages = p->pages;

    for (int i = 0; i < p->normal_num; i++) {
        p->iov[p->iovs_num].iov_base = pages->block->host + p->normal[i];
        p->iov[p->iovs_num].iov_len = p->page_size;
        p->iovs_num++;
    }

    p->next_packet_size = p->normal_num * p->page_size;
    p->flags |= MULTIFD_FLAG_NOCOMP;
    return 0;
}

/**
 * nocomp_recv_setup: setup receive side
 *
 * For no compression this function does nothing.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
{
    return 0;
}

/**
 * nocomp_recv_cleanup: cleanup receive side
 *
 * For no compression this function does nothing.
 *
 * @p: Params for the channel that we are using
 */
static void nocomp_recv_cleanup(MultiFDRecvParams *p)
{
}

/**
 * nocomp_recv_pages: read the data from the channel into actual pages
 *
 * For no compression we just need to read things into the correct place.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_recv_pages(MultiFDRecvParams *p, Error **errp)
{
    uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;

    if (flags != MULTIFD_FLAG_NOCOMP) {
        error_setg(errp, "multifd %u: flags received %x flags expected %x",
                   p->id, flags, MULTIFD_FLAG_NOCOMP);
        return -1;
    }
    for (int i = 0; i < p->normal_num; i++) {
        p->iov[i].iov_base = p->host + p->normal[i];
        p->iov[i].iov_len = p->page_size;
    }
    return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
}

static MultiFDMethods multifd_nocomp_ops = {
    .send_setup = nocomp_send_setup,
    .send_cleanup = nocomp_send_cleanup,
    .send_prepare = nocomp_send_prepare,
    .recv_setup = nocomp_recv_setup,
    .recv_cleanup = nocomp_recv_cleanup,
    .recv_pages = nocomp_recv_pages
};

static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
    [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
};

void multifd_register_ops(int method, MultiFDMethods *ops)
{
    assert(0 < method && method < MULTIFD_COMPRESSION__MAX);
    multifd_ops[method] = ops;
}

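/*
 * How a compression backend plugs in (a sketch, modelled on the nocomp
 * table above; the real zlib/zstd backends live in multifd-zlib.c and
 * multifd-zstd.c and register themselves from a module constructor):
 *
 *     static MultiFDMethods multifd_zlib_ops = {
 *         .send_setup = zlib_send_setup,
 *         ... the remaining five callbacks ...
 *     };
 *
 *     static void multifd_zlib_register(void)
 *     {
 *         multifd_register_ops(MULTIFD_COMPRESSION_ZLIB, &multifd_zlib_ops);
 *     }
 *     migration_init(multifd_zlib_register);
 *
 * Slot 0 (MULTIFD_COMPRESSION_NONE) is filled statically above, which
 * is why multifd_register_ops() asserts method > 0.
 */
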
static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
{
    MultiFDInit_t msg = {};
    int ret;

    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
    msg.version = cpu_to_be32(MULTIFD_VERSION);
    msg.id = p->id;
    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));

    ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp);
    if (ret != 0) {
        return -1;
    }
    return 0;
}

static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
{
    MultiFDInit_t msg;
    int ret;

    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
    if (ret != 0) {
        return -1;
    }

    msg.magic = be32_to_cpu(msg.magic);
    msg.version = be32_to_cpu(msg.version);

    if (msg.magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet magic %x "
                   "expected %x", msg.magic, MULTIFD_MAGIC);
        return -1;
    }

    if (msg.version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet version %u "
                   "expected %u", msg.version, MULTIFD_VERSION);
        return -1;
    }

    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
        char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);

        error_setg(errp, "multifd: received uuid '%s' and expected "
                   "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
        g_free(uuid);
        g_free(msg_uuid);
        return -1;
    }

    /* Channel ids are zero based, so the channel count itself is bogus */
    if (msg.id >= migrate_multifd_channels()) {
        error_setg(errp, "multifd: received channel id %u "
                   "expected a value below %u",
                   msg.id, migrate_multifd_channels());
        return -1;
    }

    return msg.id;
}

static MultiFDPages_t *multifd_pages_init(size_t size)
{
    MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);

    pages->allocated = size;
    pages->offset = g_new0(ram_addr_t, size);

    return pages;
}

static void multifd_pages_clear(MultiFDPages_t *pages)
{
    pages->num = 0;
    pages->allocated = 0;
    pages->packet_num = 0;
    pages->block = NULL;
    g_free(pages->offset);
    pages->offset = NULL;
    g_free(pages);
}

static void multifd_send_fill_packet(MultiFDSendParams *p)
{
    MultiFDPacket_t *packet = p->packet;
    int i;

    packet->flags = cpu_to_be32(p->flags);
    packet->pages_alloc = cpu_to_be32(p->pages->allocated);
    packet->normal_pages = cpu_to_be32(p->normal_num);
    packet->next_packet_size = cpu_to_be32(p->next_packet_size);
    packet->packet_num = cpu_to_be64(p->packet_num);

    if (p->pages->block) {
        strncpy(packet->ramblock, p->pages->block->idstr, 256);
    }

    for (i = 0; i < p->normal_num; i++) {
        /* there are architectures where ram_addr_t is 32 bit */
        uint64_t temp = p->normal[i];

        packet->offset[i] = cpu_to_be64(temp);
    }
}

static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
{
    MultiFDPacket_t *packet = p->packet;
    RAMBlock *block;
    int i;

    packet->magic = be32_to_cpu(packet->magic);
    if (packet->magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet "
                   "magic %x and expected magic %x",
                   packet->magic, MULTIFD_MAGIC);
        return -1;
    }

    packet->version = be32_to_cpu(packet->version);
    if (packet->version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet "
                   "version %u and expected version %u",
                   packet->version, MULTIFD_VERSION);
        return -1;
    }

    p->flags = be32_to_cpu(packet->flags);

    packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
    /*
     * If the packet claims more pages than this channel was set up
     * for, something is wrong on the wire: just stop migration.
     */
    if (packet->pages_alloc > p->page_count) {
        error_setg(errp, "multifd: received packet "
                   "with size %u and expected a size of %u",
                   packet->pages_alloc, p->page_count);
        return -1;
    }

    p->normal_num = be32_to_cpu(packet->normal_pages);
    if (p->normal_num > packet->pages_alloc) {
        error_setg(errp, "multifd: received packet "
                   "with %u pages and expected maximum pages are %u",
                   p->normal_num, packet->pages_alloc);
        return -1;
    }

    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
    p->packet_num = be64_to_cpu(packet->packet_num);

    if (p->normal_num == 0) {
        return 0;
    }

    /* make sure that ramblock is 0 terminated */
    packet->ramblock[255] = 0;
    block = qemu_ram_block_by_name(packet->ramblock);
    if (!block) {
        error_setg(errp, "multifd: unknown ram block %s",
                   packet->ramblock);
        return -1;
    }

    p->host = block->host;
    for (i = 0; i < p->normal_num; i++) {
        uint64_t offset = be64_to_cpu(packet->offset[i]);

        if (offset > (block->used_length - p->page_size)) {
            error_setg(errp, "multifd: offset too long %" PRIu64
                       " (max " RAM_ADDR_FMT ")",
                       offset, block->used_length);
            return -1;
        }
        p->normal[i] = offset;
    }

    return 0;
}

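/*
 * A sketch of the data packet that fill/unfill above agree on (the
 * authoritative definition of MultiFDPacket_t lives in multifd.h).
 * All multi-byte fields travel big endian:
 *
 *   magic / version      same sanity check as the initial packet
 *   flags                MULTIFD_FLAG_* (compression type, SYNC)
 *   pages_alloc          sender's batch capacity, in pages
 *   normal_pages         pages actually carried by this packet
 *   next_packet_size     bytes of page data that follow the header
 *   packet_num           global sequence number of the packet
 *   ramblock[256]        idstr of the RAMBlock, NUL padded
 *   offset[]             per-page offsets into that RAMBlock
 *
 * The page data itself is not part of the struct: it is sent right
 * after the header, either straight from the guest pages (nocomp) or
 * from a compression buffer.
 */
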
struct {
    MultiFDSendParams *params;
    /* array of pages to send */
    MultiFDPages_t *pages;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    /* send channels ready */
    QemuSemaphore channels_ready;
    /*
     * Have we already terminated the threads?  There is a race if an
     * error happens while we are exiting, so we use atomic operations
     * on it.  The only valid values are 0 and 1.
     */
    int exiting;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_send_state;

/*
 * How do we use multifd_send_state->pages and channel->pages?
 *
 * We create a "pages" struct for each channel, plus a main one.  Each
 * time that we need to send a batch of pages we interchange the one in
 * multifd_send_state with the one of the channel that is sending it.
 * There are two reasons for that:
 *    - to avoid doing lots of mallocs during migration
 *    - to make it easier to know what to free at the end of migration
 *
 * This way we always know who owns each "pages" struct, and we don't
 * need any locking.  It belongs either to the migration thread or to
 * the channel thread.  Switching is safe because the migration thread
 * holds the channel mutex while changing it, and the channel thread
 * must have finished with its own copy, otherwise pending_job couldn't
 * be false.
 */

static int multifd_send_pages(QEMUFile *f)
{
    int i;
    static int next_channel;
    MultiFDSendParams *p = NULL; /* keep gcc happy */
    MultiFDPages_t *pages = multifd_send_state->pages;
    uint64_t transferred;

    if (qatomic_read(&multifd_send_state->exiting)) {
        return -1;
    }

    qemu_sem_wait(&multifd_send_state->channels_ready);
    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_channel %= migrate_multifd_channels();
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        p = &multifd_send_state->params[i];

        qemu_mutex_lock(&p->mutex);
        if (p->quit) {
            error_report("%s: channel %d has already quit!", __func__, i);
            qemu_mutex_unlock(&p->mutex);
            return -1;
        }
        if (!p->pending_job) {
            p->pending_job++;
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
        qemu_mutex_unlock(&p->mutex);
    }
    assert(!p->pages->num);
    assert(!p->pages->block);

    p->packet_num = multifd_send_state->packet_num++;
    multifd_send_state->pages = p->pages;
    p->pages = pages;
    transferred = ((uint64_t) pages->num) * p->page_size + p->packet_len;
    qemu_file_acct_rate_limit(f, transferred);
    ram_counters.multifd_bytes += transferred;
    stat64_add(&ram_atomic_counters.transferred, transferred);
    qemu_mutex_unlock(&p->mutex);
    qemu_sem_post(&p->sem);

    return 1;
}

int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
{
    MultiFDPages_t *pages = multifd_send_state->pages;

    if (!pages->block) {
        pages->block = block;
    }

    if (pages->block == block) {
        pages->offset[pages->num] = offset;
        pages->num++;

        if (pages->num < pages->allocated) {
            return 1;
        }
    }

    if (multifd_send_pages(f) < 0) {
        return -1;
    }

    if (pages->block != block) {
        return multifd_queue_page(f, block, offset);
    }

    return 1;
}

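/*
 * Usage sketch (hypothetical caller; in the real tree the RAM
 * migration loop in ram.c drives this): dirty pages are queued one at
 * a time and the batching is handled here:
 *
 *     if (multifd_queue_page(file, block, offset) < 0) {
 *         return -1;    error on some channel, abort migration
 *     }
 *
 * A batch is flushed to a channel either when the main "pages" struct
 * fills up or when the caller switches to a different RAMBlock (the
 * recursive call above then re-queues the page that didn't fit).
 */
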
static void multifd_send_terminate_threads(Error *err)
{
    int i;

    trace_multifd_send_terminate_threads(err != NULL);

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
            s->state == MIGRATION_STATUS_DEVICE ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }

    /*
     * We don't want to terminate the threads twice.  Depending on
     * where we get the error, or if two threads hit independent
     * errors at the same time, we can end up calling this function
     * twice.
     */
    if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
        return;
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_mutex_lock(&p->mutex);
        p->quit = true;
        qemu_sem_post(&p->sem);
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
        qemu_mutex_unlock(&p->mutex);
    }
}

void multifd_save_cleanup(void)
{
    int i;

    if (!migrate_use_multifd() || !migrate_multi_channels_is_allowed()) {
        return;
    }
    multifd_send_terminate_threads(NULL);
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (p->running) {
            qemu_thread_join(&p->thread);
        }
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        if (p->registered_yank) {
            migration_ioc_unregister_yank(p->c);
        }
        socket_send_channel_destroy(p->c);
        p->c = NULL;
        qemu_mutex_destroy(&p->mutex);
        qemu_sem_destroy(&p->sem);
        qemu_sem_destroy(&p->sem_sync);
        g_free(p->name);
        p->name = NULL;
        multifd_pages_clear(p->pages);
        p->pages = NULL;
        p->packet_len = 0;
        g_free(p->packet);
        p->packet = NULL;
        g_free(p->iov);
        p->iov = NULL;
        g_free(p->normal);
        p->normal = NULL;
        multifd_send_state->ops->send_cleanup(p, &local_err);
        if (local_err) {
            migrate_set_error(migrate_get_current(), local_err);
            error_free(local_err);
        }
    }
    qemu_sem_destroy(&multifd_send_state->channels_ready);
    g_free(multifd_send_state->params);
    multifd_send_state->params = NULL;
    multifd_pages_clear(multifd_send_state->pages);
    multifd_send_state->pages = NULL;
    g_free(multifd_send_state);
    multifd_send_state = NULL;
}

static int multifd_zero_copy_flush(QIOChannel *c)
{
    int ret;
    Error *err = NULL;

    ret = qio_channel_flush(c, &err);
    if (ret < 0) {
        error_report_err(err);
        return -1;
    }
    if (ret == 1) {
        dirty_sync_missed_zero_copy();
    }

    return ret;
}

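/*
 * Note on the return value above: qio_channel_flush() blocks until all
 * writes queued with QIO_CHANNEL_WRITE_FLAG_ZERO_COPY have completed.
 * A return of 1 reports that the kernel fell back to copying instead
 * of sending zero-copy, which we account via
 * dirty_sync_missed_zero_copy() so the migration statistics show how
 * often zero-copy actually worked.
 */
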
int multifd_send_sync_main(QEMUFile *f)
{
    int i;
    bool flush_zero_copy;

    if (!migrate_use_multifd()) {
        return 0;
    }
    if (multifd_send_state->pages->num) {
        if (multifd_send_pages(f) < 0) {
            error_report("%s: multifd_send_pages failed", __func__);
            return -1;
        }
    }

    /*
     * When using zero-copy, it's necessary to flush the pages before any of
     * the pages can be sent again, so we'll make sure the new version of the
     * pages will always arrive _later_ than the old pages.
     *
     * Currently we achieve this by flushing the zero-copy requested writes
     * per ram iteration, but in the future we could potentially optimize it
     * to be less frequent, e.g. only after we finished one whole scanning of
     * all the dirty bitmaps.
     */

    flush_zero_copy = migrate_use_zero_copy_send();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        trace_multifd_send_sync_main_signal(p->id);

        qemu_mutex_lock(&p->mutex);

        if (p->quit) {
            error_report("%s: channel %d has already quit", __func__, i);
            qemu_mutex_unlock(&p->mutex);
            return -1;
        }

        p->packet_num = multifd_send_state->packet_num++;
        p->flags |= MULTIFD_FLAG_SYNC;
        p->pending_job++;
        qemu_file_acct_rate_limit(f, p->packet_len);
        ram_counters.multifd_bytes += p->packet_len;
        stat64_add(&ram_atomic_counters.transferred, p->packet_len);
        qemu_mutex_unlock(&p->mutex);
        qemu_sem_post(&p->sem);

        if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
            return -1;
        }
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        trace_multifd_send_sync_main_wait(p->id);
        qemu_sem_wait(&p->sem_sync);
    }
    trace_multifd_send_sync_main(multifd_send_state->packet_num);

    return 0;
}

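/*
 * Synchronization protocol, as implemented above and in
 * multifd_send_thread() below: the main thread raises
 * MULTIFD_FLAG_SYNC on every channel and posts p->sem; each channel
 * thread sends one (possibly empty) packet carrying that flag and
 * posts p->sem_sync back; the main thread then waits on all of them.
 * When this function returns, every channel has pushed everything
 * queued before the sync point onto the wire.
 */
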
static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    Error *local_err = NULL;
    int ret = 0;
    bool use_zero_copy_send = migrate_use_zero_copy_send();

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (multifd_send_initial_packet(p, &local_err) < 0) {
        ret = -1;
        goto out;
    }
    /* initial packet */
    p->num_packets = 1;

    while (true) {
        qemu_sem_wait(&p->sem);

        if (qatomic_read(&multifd_send_state->exiting)) {
            break;
        }
        qemu_mutex_lock(&p->mutex);

        if (p->pending_job) {
            uint64_t packet_num = p->packet_num;
            uint32_t flags = p->flags;
            p->normal_num = 0;

            if (use_zero_copy_send) {
                p->iovs_num = 0;
            } else {
                p->iovs_num = 1;
            }

            for (int i = 0; i < p->pages->num; i++) {
                p->normal[p->normal_num] = p->pages->offset[i];
                p->normal_num++;
            }

            if (p->normal_num) {
                ret = multifd_send_state->ops->send_prepare(p, &local_err);
                if (ret != 0) {
                    qemu_mutex_unlock(&p->mutex);
                    break;
                }
            }
            multifd_send_fill_packet(p);
            p->flags = 0;
            p->num_packets++;
            p->total_normal_pages += p->normal_num;
            p->pages->num = 0;
            p->pages->block = NULL;
            qemu_mutex_unlock(&p->mutex);

            trace_multifd_send(p->id, packet_num, p->normal_num, flags,
                               p->next_packet_size);

            if (use_zero_copy_send) {
                /* Send header first, without zerocopy */
                ret = qio_channel_write_all(p->c, (void *)p->packet,
                                            p->packet_len, &local_err);
                if (ret != 0) {
                    break;
                }
            } else {
                /* Send header using the same writev call */
                p->iov[0].iov_len = p->packet_len;
                p->iov[0].iov_base = p->packet;
            }

            ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL,
                                              0, p->write_flags, &local_err);
            if (ret != 0) {
                break;
            }

            qemu_mutex_lock(&p->mutex);
            p->pending_job--;
            qemu_mutex_unlock(&p->mutex);

            if (flags & MULTIFD_FLAG_SYNC) {
                qemu_sem_post(&p->sem_sync);
            }
            qemu_sem_post(&multifd_send_state->channels_ready);
        } else if (p->quit) {
            qemu_mutex_unlock(&p->mutex);
            break;
        } else {
            qemu_mutex_unlock(&p->mutex);
            /* sometimes there are spurious wakeups */
        }
    }

out:
    if (local_err) {
        trace_multifd_send_error(p->id);
        multifd_send_terminate_threads(local_err);
        error_free(local_err);
    }

    /*
     * We are exiting because of an error, but we can't just leave:
     * first wake up whoever may be waiting on us.
     */
    if (ret != 0) {
        qemu_sem_post(&p->sem_sync);
        qemu_sem_post(&multifd_send_state->channels_ready);
    }

    qemu_mutex_lock(&p->mutex);
    p->running = false;
    qemu_mutex_unlock(&p->mutex);

    rcu_unregister_thread();
    trace_multifd_send_thread_end(p->id, p->num_packets, p->total_normal_pages);

    return NULL;
}

static bool multifd_channel_connect(MultiFDSendParams *p,
                                    QIOChannel *ioc,
                                    Error *error);

static void multifd_tls_outgoing_handshake(QIOTask *task,
                                           gpointer opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
    Error *err = NULL;

    if (qio_task_propagate_error(task, &err)) {
        trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err));
    } else {
        trace_multifd_tls_outgoing_handshake_complete(ioc);
    }

    if (!multifd_channel_connect(p, ioc, err)) {
        /*
         * An error happened: mark multifd_send_thread as 'quit' even
         * though it was never created, then wake up whoever is
         * waiting on it.
         */
        p->quit = true;
        qemu_sem_post(&multifd_send_state->channels_ready);
        qemu_sem_post(&p->sem_sync);
    }
}

static void *multifd_tls_handshake_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannelTLS *tioc = QIO_CHANNEL_TLS(p->c);

    qio_channel_tls_handshake(tioc,
                              multifd_tls_outgoing_handshake,
                              p,
                              NULL,
                              NULL);
    return NULL;
}

static void multifd_tls_channel_connect(MultiFDSendParams *p,
                                        QIOChannel *ioc,
                                        Error **errp)
{
    MigrationState *s = migrate_get_current();
    const char *hostname = s->hostname;
    QIOChannelTLS *tioc;

    tioc = migration_tls_client_create(s, ioc, hostname, errp);
    if (!tioc) {
        return;
    }

    object_unref(OBJECT(ioc));
    trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
    qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");
    p->c = QIO_CHANNEL(tioc);
    qemu_thread_create(&p->thread, "multifd-tls-handshake-worker",
                       multifd_tls_handshake_thread, p,
                       QEMU_THREAD_JOINABLE);
}

static bool multifd_channel_connect(MultiFDSendParams *p,
                                    QIOChannel *ioc,
                                    Error *error)
{
    trace_multifd_set_outgoing_channel(
        ioc, object_get_typename(OBJECT(ioc)),
        migrate_get_current()->hostname, error);

    if (!error) {
        if (migrate_channel_requires_tls_upgrade(ioc)) {
            multifd_tls_channel_connect(p, ioc, &error);
            if (!error) {
                /*
                 * tls_channel_connect will call back to this
                 * function after the TLS handshake,
                 * so we mustn't call multifd_send_thread until then
                 */
                return true;
            } else {
                return false;
            }
        } else {
            migration_ioc_register_yank(ioc);
            p->registered_yank = true;
            p->c = ioc;
            qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
                               QEMU_THREAD_JOINABLE);
        }
        return true;
    }

    return false;
}

static void multifd_new_send_channel_cleanup(MultiFDSendParams *p,
                                             QIOChannel *ioc, Error *err)
{
    migrate_set_error(migrate_get_current(), err);
    /* An error happened: wake up whoever is waiting on this channel */
    qemu_sem_post(&multifd_send_state->channels_ready);
    qemu_sem_post(&p->sem_sync);
    /*
     * Although multifd_send_thread was never created, the main
     * migration thread still needs to know whether this channel is
     * running, so we need to mark its status.
     */
    p->quit = true;
    object_unref(OBJECT(ioc));
    error_free(err);
}

static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
    Error *local_err = NULL;

    trace_multifd_new_send_channel_async(p->id);
    if (qio_task_propagate_error(task, &local_err)) {
        goto cleanup;
    } else {
        p->c = QIO_CHANNEL(sioc);
        qio_channel_set_delay(p->c, false);
        p->running = true;
        if (!multifd_channel_connect(p, sioc, local_err)) {
            goto cleanup;
        }
        return;
    }

cleanup:
    multifd_new_send_channel_cleanup(p, sioc, local_err);
}

int multifd_save_setup(Error **errp)
{
    int thread_count;
    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    uint8_t i;

    if (!migrate_use_multifd()) {
        return 0;
    }
    if (!migrate_multi_channels_is_allowed()) {
        error_setg(errp, "multifd is not supported by current protocol");
        return -1;
    }

    thread_count = migrate_multifd_channels();
    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
    multifd_send_state->pages = multifd_pages_init(page_count);
    qemu_sem_init(&multifd_send_state->channels_ready, 0);
    qatomic_set(&multifd_send_state->exiting, 0);
    multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_mutex_init(&p->mutex);
        qemu_sem_init(&p->sem, 0);
        qemu_sem_init(&p->sem_sync, 0);
        p->quit = false;
        p->pending_job = 0;
        p->id = i;
        p->pages = multifd_pages_init(page_count);
        p->packet_len = sizeof(MultiFDPacket_t)
                      + sizeof(uint64_t) * page_count;
        p->packet = g_malloc0(p->packet_len);
        p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
        p->packet->version = cpu_to_be32(MULTIFD_VERSION);
        p->name = g_strdup_printf("multifdsend_%d", i);
        /* We need one extra place for the packet header */
        p->iov = g_new0(struct iovec, page_count + 1);
        p->normal = g_new0(ram_addr_t, page_count);
        p->page_size = qemu_target_page_size();
        p->page_count = page_count;

        if (migrate_use_zero_copy_send()) {
            p->write_flags = QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
        } else {
            p->write_flags = 0;
        }

        socket_send_channel_create(multifd_new_send_channel_async, p);
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;
        int ret;

        ret = multifd_send_state->ops->send_setup(p, &local_err);
        if (ret) {
            error_propagate(errp, local_err);
            return ret;
        }
    }
    return 0;
}

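/*
 * Ownership summary for the send side: everything allocated per
 * channel in multifd_save_setup() above (mutex, semaphores, pages,
 * packet, name, iov, normal) is released by multifd_save_cleanup(),
 * which also gives the compression backend its matching send_cleanup()
 * callback.  Channel creation itself is asynchronous:
 * socket_send_channel_create() completes in
 * multifd_new_send_channel_async(), possibly after a TLS handshake,
 * and only then is multifd_send_thread started.
 */
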
struct {
    MultiFDRecvParams *params;
    /* number of created threads */
    int count;
    /* syncs main thread and channels */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_recv_state;

static void multifd_recv_terminate_threads(Error *err)
{
    int i;

    trace_multifd_recv_terminate_threads(err != NULL);

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        qemu_mutex_lock(&p->mutex);
        p->quit = true;
        /*
         * We could arrive here for two reasons:
         *  - normal quit, i.e. everything went fine, just finished
         *  - error quit: We close the channels so the channel threads
         *    finish the qio_channel_read_all_eof()
         */
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
        qemu_mutex_unlock(&p->mutex);
    }
}

int multifd_load_cleanup(Error **errp)
{
    int i;

    if (!migrate_use_multifd() || !migrate_multi_channels_is_allowed()) {
        return 0;
    }
    multifd_recv_terminate_threads(NULL);
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        if (p->running) {
            p->quit = true;
            /*
             * multifd_recv_thread may be blocked in the
             * MULTIFD_FLAG_SYNC handling code; waking it up is
             * harmless during the cleanup phase.
             */
            qemu_sem_post(&p->sem_sync);
            qemu_thread_join(&p->thread);
        }
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        migration_ioc_unregister_yank(p->c);
        object_unref(OBJECT(p->c));
        p->c = NULL;
        qemu_mutex_destroy(&p->mutex);
        qemu_sem_destroy(&p->sem_sync);
        g_free(p->name);
        p->name = NULL;
        p->packet_len = 0;
        g_free(p->packet);
        p->packet = NULL;
        g_free(p->iov);
        p->iov = NULL;
        g_free(p->normal);
        p->normal = NULL;
        multifd_recv_state->ops->recv_cleanup(p);
    }
    qemu_sem_destroy(&multifd_recv_state->sem_sync);
    g_free(multifd_recv_state->params);
    multifd_recv_state->params = NULL;
    g_free(multifd_recv_state);
    multifd_recv_state = NULL;

    return 0;
}

void multifd_recv_sync_main(void)
{
    int i;

    if (!migrate_use_multifd()) {
        return;
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        trace_multifd_recv_sync_main_wait(p->id);
        qemu_sem_wait(&multifd_recv_state->sem_sync);
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        WITH_QEMU_LOCK_GUARD(&p->mutex) {
            if (multifd_recv_state->packet_num < p->packet_num) {
                multifd_recv_state->packet_num = p->packet_num;
            }
        }
        trace_multifd_recv_sync_main_signal(p->id);
        qemu_sem_post(&p->sem_sync);
    }
    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
}

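/*
 * The receive-side counterpart of multifd_send_sync_main(): when a
 * channel thread sees MULTIFD_FLAG_SYNC it posts
 * multifd_recv_state->sem_sync and then parks on its own p->sem_sync.
 * The function above first collects one post per channel (so every
 * channel has drained up to the sync point), then records the highest
 * packet_num seen and releases all the channel threads again.
 */
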
static void *multifd_recv_thread(void *opaque)
{
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    int ret;

    trace_multifd_recv_thread_start(p->id);
    rcu_register_thread();

    while (true) {
        uint32_t flags;

        if (p->quit) {
            break;
        }

        ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
                                       p->packet_len, &local_err);
        if (ret == 0) {   /* EOF */
            break;
        }
        if (ret == -1) {  /* Error */
            break;
        }

        qemu_mutex_lock(&p->mutex);
        ret = multifd_recv_unfill_packet(p, &local_err);
        if (ret) {
            qemu_mutex_unlock(&p->mutex);
            break;
        }

        flags = p->flags;
        /* recv methods don't know how to handle the SYNC flag */
        p->flags &= ~MULTIFD_FLAG_SYNC;
        trace_multifd_recv(p->id, p->packet_num, p->normal_num, flags,
                           p->next_packet_size);
        p->num_packets++;
        p->total_normal_pages += p->normal_num;
        qemu_mutex_unlock(&p->mutex);

        if (p->normal_num) {
            ret = multifd_recv_state->ops->recv_pages(p, &local_err);
            if (ret != 0) {
                break;
            }
        }

        if (flags & MULTIFD_FLAG_SYNC) {
            qemu_sem_post(&multifd_recv_state->sem_sync);
            qemu_sem_wait(&p->sem_sync);
        }
    }

    if (local_err) {
        multifd_recv_terminate_threads(local_err);
        error_free(local_err);
    }
    qemu_mutex_lock(&p->mutex);
    p->running = false;
    qemu_mutex_unlock(&p->mutex);

    rcu_unregister_thread();
    trace_multifd_recv_thread_end(p->id, p->num_packets, p->total_normal_pages);

    return NULL;
}

int multifd_load_setup(Error **errp)
{
    int thread_count;
    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    uint8_t i;

    if (!migrate_use_multifd()) {
        return 0;
    }
    if (!migrate_multi_channels_is_allowed()) {
        error_setg(errp, "multifd is not supported by current protocol");
        return -1;
    }
    thread_count = migrate_multifd_channels();
    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
    qatomic_set(&multifd_recv_state->count, 0);
    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
    multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        qemu_mutex_init(&p->mutex);
        qemu_sem_init(&p->sem_sync, 0);
        p->quit = false;
        p->id = i;
        p->packet_len = sizeof(MultiFDPacket_t)
                      + sizeof(uint64_t) * page_count;
        p->packet = g_malloc0(p->packet_len);
        p->name = g_strdup_printf("multifdrecv_%d", i);
        p->iov = g_new0(struct iovec, page_count);
        p->normal = g_new0(ram_addr_t, page_count);
        p->page_count = page_count;
        p->page_size = qemu_target_page_size();
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];
        Error *local_err = NULL;
        int ret;

        ret = multifd_recv_state->ops->recv_setup(p, &local_err);
        if (ret) {
            error_propagate(errp, local_err);
            return ret;
        }
    }
    return 0;
}

bool multifd_recv_all_channels_created(void)
{
    int thread_count = migrate_multifd_channels();

    if (!migrate_use_multifd()) {
        return true;
    }

    if (!multifd_recv_state) {
        /* Called before any connections created */
        return false;
    }

    return thread_count == qatomic_read(&multifd_recv_state->count);
}

/*
 * Try to receive all multifd channels to get ready for the migration.
 * - Return true and do not set @errp when correctly receiving all channels;
 * - Return false and do not set @errp when correctly receiving the current one;
 * - Return false and set @errp when failing to receive the current channel.
 */
bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
{
    MultiFDRecvParams *p;
    Error *local_err = NULL;
    int id;

    id = multifd_recv_initial_packet(ioc, &local_err);
    if (id < 0) {
        multifd_recv_terminate_threads(local_err);
        error_propagate_prepend(errp, local_err,
                                "failed to receive packet"
                                " via multifd channel %d: ",
                                qatomic_read(&multifd_recv_state->count));
        return false;
    }
    trace_multifd_recv_new_channel(id);

    p = &multifd_recv_state->params[id];
    if (p->c != NULL) {
        error_setg(&local_err, "multifd: received id '%d' already setup",
                   id);
        multifd_recv_terminate_threads(local_err);
        error_propagate(errp, local_err);
        return false;
    }
    p->c = ioc;
    object_ref(OBJECT(ioc));
    /* initial packet */
    p->num_packets = 1;

    p->running = true;
    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                       QEMU_THREAD_JOINABLE);
    qatomic_inc(&multifd_recv_state->count);
    return qatomic_read(&multifd_recv_state->count) ==
           migrate_multifd_channels();
}