/*
 * Multifd common code
 *
 * Copyright (c) 2019-2020 Red Hat Inc
 *
 * Authors:
 *  Juan Quintela <quintela@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */

#include "qemu/osdep.h"
#include "qemu/rcu.h"
#include "exec/target_page.h"
#include "sysemu/sysemu.h"
#include "exec/ramblock.h"
#include "qemu/error-report.h"
#include "qapi/error.h"
#include "ram.h"
#include "migration.h"
#include "socket.h"
#include "tls.h"
#include "qemu-file.h"
#include "trace.h"
#include "multifd.h"

#include "qemu/yank.h"
#include "io/channel-socket.h"
#include "yank_functions.h"

/* Multiple fd's */

#define MULTIFD_MAGIC 0x11223344U
#define MULTIFD_VERSION 1

typedef struct {
    uint32_t magic;
    uint32_t version;
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;

/* Multifd without compression */

/**
 * nocomp_send_setup: setup send side
 *
 * For no compression this function does nothing.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
{
    return 0;
}

/**
 * nocomp_send_cleanup: cleanup send side
 *
 * For no compression this function does nothing.
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
{
    return;
}

/**
 * nocomp_send_prepare: prepare data to be able to send
 *
 * For no compression we just have to calculate the size of the
 * packet.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @used: number of pages used
 * @errp: pointer to an error
 */
static int nocomp_send_prepare(MultiFDSendParams *p, uint32_t used,
                               Error **errp)
{
    p->next_packet_size = used * qemu_target_page_size();
    p->flags |= MULTIFD_FLAG_NOCOMP;
    return 0;
}

/**
 * nocomp_send_write: do the actual write of the data
 *
 * For no compression we just have to write the data.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @used: number of pages used
 * @errp: pointer to an error
 */
static int nocomp_send_write(MultiFDSendParams *p, uint32_t used, Error **errp)
{
    return qio_channel_writev_all(p->c, p->pages->iov, used, errp);
}

/**
 * nocomp_recv_setup: setup receive side
 *
 * For no compression this function does nothing.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
{
    return 0;
}

/**
 * nocomp_recv_cleanup: cleanup receive side
 *
 * For no compression this function does nothing.
 *
 * @p: Params for the channel that we are using
 */
static void nocomp_recv_cleanup(MultiFDRecvParams *p)
{
}

/**
 * nocomp_recv_pages: read the data from the channel into actual pages
 *
 * For no compression we just need to read things into the correct place.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @used: number of pages used
 * @errp: pointer to an error
 */
static int nocomp_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **errp)
{
    uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;

    if (flags != MULTIFD_FLAG_NOCOMP) {
        error_setg(errp, "multifd %d: flags received %x flags expected %x",
                   p->id, flags, MULTIFD_FLAG_NOCOMP);
        return -1;
    }
    return qio_channel_readv_all(p->c, p->pages->iov, used, errp);
}

static MultiFDMethods multifd_nocomp_ops = {
    .send_setup = nocomp_send_setup,
    .send_cleanup = nocomp_send_cleanup,
    .send_prepare = nocomp_send_prepare,
    .send_write = nocomp_send_write,
    .recv_setup = nocomp_recv_setup,
    .recv_cleanup = nocomp_recv_cleanup,
    .recv_pages = nocomp_recv_pages
};

static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
    [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
};

void multifd_register_ops(int method, MultiFDMethods *ops)
{
    assert(0 < method && method < MULTIFD_COMPRESSION__MAX);
    multifd_ops[method] = ops;
}
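
/*
 * Compression backends plug into the multifd_ops[] table above at
 * startup.  A minimal sketch of what a backend registration looks
 * like (the real zlib/zstd backends in multifd-zlib.c and
 * multifd-zstd.c follow this pattern; "mycomp" and
 * MULTIFD_COMPRESSION_MYCOMP are made-up names for illustration):
 *
 *     static MultiFDMethods multifd_mycomp_ops = {
 *         .send_setup = mycomp_send_setup,
 *         .send_prepare = mycomp_send_prepare,
 *         ... all seven hooks must be filled in ...
 *     };
 *
 *     static void multifd_mycomp_register(void)
 *     {
 *         multifd_register_ops(MULTIFD_COMPRESSION_MYCOMP,
 *                              &multifd_mycomp_ops);
 *     }
 *     migration_init(multifd_mycomp_register);
 */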
static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
{
    MultiFDInit_t msg = {};
    int ret;

    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
    msg.version = cpu_to_be32(MULTIFD_VERSION);
    msg.id = p->id;
    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));

    ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp);
    if (ret != 0) {
        return -1;
    }
    return 0;
}

static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
{
    MultiFDInit_t msg;
    int ret;

    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
    if (ret != 0) {
        return -1;
    }

    msg.magic = be32_to_cpu(msg.magic);
    msg.version = be32_to_cpu(msg.version);

    if (msg.magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet magic %x "
                   "expected %x", msg.magic, MULTIFD_MAGIC);
        return -1;
    }

    if (msg.version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet version %d "
                   "expected %d", msg.version, MULTIFD_VERSION);
        return -1;
    }

    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
        char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);

        error_setg(errp, "multifd: received uuid '%s' and expected "
                   "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
        g_free(uuid);
        g_free(msg_uuid);
        return -1;
    }

    if (msg.id >= migrate_multifd_channels()) {
        error_setg(errp, "multifd: received channel id %d "
                   "expected a value below %d", msg.id,
                   migrate_multifd_channels());
        return -1;
    }

    return msg.id;
}

static MultiFDPages_t *multifd_pages_init(size_t size)
{
    MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);

    pages->allocated = size;
    pages->iov = g_new0(struct iovec, size);
    pages->offset = g_new0(ram_addr_t, size);

    return pages;
}

static void multifd_pages_clear(MultiFDPages_t *pages)
{
    pages->used = 0;
    pages->allocated = 0;
    pages->packet_num = 0;
    pages->block = NULL;
    g_free(pages->iov);
    pages->iov = NULL;
    g_free(pages->offset);
    pages->offset = NULL;
    g_free(pages);
}
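
/*
 * Data packets travel as a MultiFDPacket_t (see multifd.h for the
 * authoritative definition).  From the way it is filled in and parsed
 * below, the layout is, with every multi-byte field in big-endian:
 *
 *   magic / version      stream sanity check
 *   flags                compression method, MULTIFD_FLAG_SYNC
 *   pages_alloc          size of the offset[] array on the sender
 *   pages_used           number of offsets actually valid
 *   next_packet_size     bytes of page data that follow this packet
 *   packet_num           global sequence number
 *   ramblock[256]        NUL-terminated RAMBlock idstr
 *   offset[]             page offsets inside that RAMBlock
 */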
static void multifd_send_fill_packet(MultiFDSendParams *p)
{
    MultiFDPacket_t *packet = p->packet;
    int i;

    packet->flags = cpu_to_be32(p->flags);
    packet->pages_alloc = cpu_to_be32(p->pages->allocated);
    packet->pages_used = cpu_to_be32(p->pages->used);
    packet->next_packet_size = cpu_to_be32(p->next_packet_size);
    packet->packet_num = cpu_to_be64(p->packet_num);

    if (p->pages->block) {
        strncpy(packet->ramblock, p->pages->block->idstr, 256);
    }

    for (i = 0; i < p->pages->used; i++) {
        /* there are architectures where ram_addr_t is 32 bit */
        uint64_t temp = p->pages->offset[i];

        packet->offset[i] = cpu_to_be64(temp);
    }
}

static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
{
    MultiFDPacket_t *packet = p->packet;
    uint32_t pages_max = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    RAMBlock *block;
    int i;

    packet->magic = be32_to_cpu(packet->magic);
    if (packet->magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet "
                   "magic %x and expected magic %x",
                   packet->magic, MULTIFD_MAGIC);
        return -1;
    }

    packet->version = be32_to_cpu(packet->version);
    if (packet->version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet "
                   "version %d and expected version %d",
                   packet->version, MULTIFD_VERSION);
        return -1;
    }

    p->flags = be32_to_cpu(packet->flags);

    packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
    /*
     * If we received a packet that is 100 times bigger than expected,
     * just stop migration.  The factor of 100 is an arbitrary limit.
     */
    if (packet->pages_alloc > pages_max * 100) {
        error_setg(errp, "multifd: received packet "
                   "with size %d and expected a maximum size of %d",
                   packet->pages_alloc, pages_max * 100);
        return -1;
    }
    /*
     * We received a packet that is bigger than expected but inside
     * reasonable limits (see previous comment).  Just reallocate.
     */
    if (packet->pages_alloc > p->pages->allocated) {
        multifd_pages_clear(p->pages);
        p->pages = multifd_pages_init(packet->pages_alloc);
    }

    p->pages->used = be32_to_cpu(packet->pages_used);
    if (p->pages->used > packet->pages_alloc) {
        error_setg(errp, "multifd: received packet "
                   "with %d pages and expected maximum of %d pages",
                   p->pages->used, packet->pages_alloc);
        return -1;
    }

    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
    p->packet_num = be64_to_cpu(packet->packet_num);

    if (p->pages->used == 0) {
        return 0;
    }

    /* make sure that ramblock is 0 terminated */
    packet->ramblock[255] = 0;
    block = qemu_ram_block_by_name(packet->ramblock);
    if (!block) {
        error_setg(errp, "multifd: unknown ram block %s",
                   packet->ramblock);
        return -1;
    }

    for (i = 0; i < p->pages->used; i++) {
        uint64_t offset = be64_to_cpu(packet->offset[i]);

        if (offset > (block->used_length - qemu_target_page_size())) {
            error_setg(errp, "multifd: offset too long %" PRIu64
                       " (max " RAM_ADDR_FMT ")",
                       offset, block->used_length);
            return -1;
        }
        p->pages->iov[i].iov_base = block->host + offset;
        p->pages->iov[i].iov_len = qemu_target_page_size();
    }

    return 0;
}
328 */ 329 if (packet->pages_alloc > p->pages->allocated) { 330 multifd_pages_clear(p->pages); 331 p->pages = multifd_pages_init(packet->pages_alloc); 332 } 333 334 p->pages->used = be32_to_cpu(packet->pages_used); 335 if (p->pages->used > packet->pages_alloc) { 336 error_setg(errp, "multifd: received packet " 337 "with %d pages and expected maximum pages are %d", 338 p->pages->used, packet->pages_alloc) ; 339 return -1; 340 } 341 342 p->next_packet_size = be32_to_cpu(packet->next_packet_size); 343 p->packet_num = be64_to_cpu(packet->packet_num); 344 345 if (p->pages->used == 0) { 346 return 0; 347 } 348 349 /* make sure that ramblock is 0 terminated */ 350 packet->ramblock[255] = 0; 351 block = qemu_ram_block_by_name(packet->ramblock); 352 if (!block) { 353 error_setg(errp, "multifd: unknown ram block %s", 354 packet->ramblock); 355 return -1; 356 } 357 358 for (i = 0; i < p->pages->used; i++) { 359 uint64_t offset = be64_to_cpu(packet->offset[i]); 360 361 if (offset > (block->used_length - qemu_target_page_size())) { 362 error_setg(errp, "multifd: offset too long %" PRIu64 363 " (max " RAM_ADDR_FMT ")", 364 offset, block->used_length); 365 return -1; 366 } 367 p->pages->iov[i].iov_base = block->host + offset; 368 p->pages->iov[i].iov_len = qemu_target_page_size(); 369 } 370 371 return 0; 372 } 373 374 struct { 375 MultiFDSendParams *params; 376 /* array of pages to sent */ 377 MultiFDPages_t *pages; 378 /* global number of generated multifd packets */ 379 uint64_t packet_num; 380 /* send channels ready */ 381 QemuSemaphore channels_ready; 382 /* 383 * Have we already run terminate threads. There is a race when it 384 * happens that we got one error while we are exiting. 385 * We will use atomic operations. Only valid values are 0 and 1. 386 */ 387 int exiting; 388 /* multifd ops */ 389 MultiFDMethods *ops; 390 } *multifd_send_state; 391 392 /* 393 * How we use multifd_send_state->pages and channel->pages? 394 * 395 * We create a pages for each channel, and a main one. Each time that 396 * we need to send a batch of pages we interchange the ones between 397 * multifd_send_state and the channel that is sending it. There are 398 * two reasons for that: 399 * - to not have to do so many mallocs during migration 400 * - to make easier to know what to free at the end of migration 401 * 402 * This way we always know who is the owner of each "pages" struct, 403 * and we don't need any locking. It belongs to the migration thread 404 * or to the channel thread. Switching is safe because the migration 405 * thread is using the channel mutex when changing it, and the channel 406 * have to had finish with its own, otherwise pending_job can't be 407 * false. 408 */ 409 410 static int multifd_send_pages(QEMUFile *f) 411 { 412 int i; 413 static int next_channel; 414 MultiFDSendParams *p = NULL; /* make happy gcc */ 415 MultiFDPages_t *pages = multifd_send_state->pages; 416 uint64_t transferred; 417 418 if (qatomic_read(&multifd_send_state->exiting)) { 419 return -1; 420 } 421 422 qemu_sem_wait(&multifd_send_state->channels_ready); 423 /* 424 * next_channel can remain from a previous migration that was 425 * using more channels, so ensure it doesn't overflow if the 426 * limit is lower now. 
427 */ 428 next_channel %= migrate_multifd_channels(); 429 for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { 430 p = &multifd_send_state->params[i]; 431 432 qemu_mutex_lock(&p->mutex); 433 if (p->quit) { 434 error_report("%s: channel %d has already quit!", __func__, i); 435 qemu_mutex_unlock(&p->mutex); 436 return -1; 437 } 438 if (!p->pending_job) { 439 p->pending_job++; 440 next_channel = (i + 1) % migrate_multifd_channels(); 441 break; 442 } 443 qemu_mutex_unlock(&p->mutex); 444 } 445 assert(!p->pages->used); 446 assert(!p->pages->block); 447 448 p->packet_num = multifd_send_state->packet_num++; 449 multifd_send_state->pages = p->pages; 450 p->pages = pages; 451 transferred = ((uint64_t) pages->used) * qemu_target_page_size() 452 + p->packet_len; 453 qemu_file_update_transfer(f, transferred); 454 ram_counters.multifd_bytes += transferred; 455 ram_counters.transferred += transferred; 456 qemu_mutex_unlock(&p->mutex); 457 qemu_sem_post(&p->sem); 458 459 return 1; 460 } 461 462 int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset) 463 { 464 MultiFDPages_t *pages = multifd_send_state->pages; 465 466 if (!pages->block) { 467 pages->block = block; 468 } 469 470 if (pages->block == block) { 471 pages->offset[pages->used] = offset; 472 pages->iov[pages->used].iov_base = block->host + offset; 473 pages->iov[pages->used].iov_len = qemu_target_page_size(); 474 pages->used++; 475 476 if (pages->used < pages->allocated) { 477 return 1; 478 } 479 } 480 481 if (multifd_send_pages(f) < 0) { 482 return -1; 483 } 484 485 if (pages->block != block) { 486 return multifd_queue_page(f, block, offset); 487 } 488 489 return 1; 490 } 491 492 static void multifd_send_terminate_threads(Error *err) 493 { 494 int i; 495 496 trace_multifd_send_terminate_threads(err != NULL); 497 498 if (err) { 499 MigrationState *s = migrate_get_current(); 500 migrate_set_error(s, err); 501 if (s->state == MIGRATION_STATUS_SETUP || 502 s->state == MIGRATION_STATUS_PRE_SWITCHOVER || 503 s->state == MIGRATION_STATUS_DEVICE || 504 s->state == MIGRATION_STATUS_ACTIVE) { 505 migrate_set_state(&s->state, s->state, 506 MIGRATION_STATUS_FAILED); 507 } 508 } 509 510 /* 511 * We don't want to exit each threads twice. Depending on where 512 * we get the error, or if there are two independent errors in two 513 * threads at the same time, we can end calling this function 514 * twice. 
515 */ 516 if (qatomic_xchg(&multifd_send_state->exiting, 1)) { 517 return; 518 } 519 520 for (i = 0; i < migrate_multifd_channels(); i++) { 521 MultiFDSendParams *p = &multifd_send_state->params[i]; 522 523 qemu_mutex_lock(&p->mutex); 524 p->quit = true; 525 qemu_sem_post(&p->sem); 526 qemu_mutex_unlock(&p->mutex); 527 } 528 } 529 530 void multifd_save_cleanup(void) 531 { 532 int i; 533 534 if (!migrate_use_multifd() || !migrate_multifd_is_allowed()) { 535 return; 536 } 537 multifd_send_terminate_threads(NULL); 538 for (i = 0; i < migrate_multifd_channels(); i++) { 539 MultiFDSendParams *p = &multifd_send_state->params[i]; 540 541 if (p->running) { 542 qemu_thread_join(&p->thread); 543 } 544 } 545 for (i = 0; i < migrate_multifd_channels(); i++) { 546 MultiFDSendParams *p = &multifd_send_state->params[i]; 547 Error *local_err = NULL; 548 549 if (p->registered_yank) { 550 migration_ioc_unregister_yank(p->c); 551 } 552 socket_send_channel_destroy(p->c); 553 p->c = NULL; 554 qemu_mutex_destroy(&p->mutex); 555 qemu_sem_destroy(&p->sem); 556 qemu_sem_destroy(&p->sem_sync); 557 g_free(p->name); 558 p->name = NULL; 559 g_free(p->tls_hostname); 560 p->tls_hostname = NULL; 561 multifd_pages_clear(p->pages); 562 p->pages = NULL; 563 p->packet_len = 0; 564 g_free(p->packet); 565 p->packet = NULL; 566 multifd_send_state->ops->send_cleanup(p, &local_err); 567 if (local_err) { 568 migrate_set_error(migrate_get_current(), local_err); 569 error_free(local_err); 570 } 571 } 572 qemu_sem_destroy(&multifd_send_state->channels_ready); 573 g_free(multifd_send_state->params); 574 multifd_send_state->params = NULL; 575 multifd_pages_clear(multifd_send_state->pages); 576 multifd_send_state->pages = NULL; 577 g_free(multifd_send_state); 578 multifd_send_state = NULL; 579 } 580 581 void multifd_send_sync_main(QEMUFile *f) 582 { 583 int i; 584 585 if (!migrate_use_multifd()) { 586 return; 587 } 588 if (multifd_send_state->pages->used) { 589 if (multifd_send_pages(f) < 0) { 590 error_report("%s: multifd_send_pages fail", __func__); 591 return; 592 } 593 } 594 for (i = 0; i < migrate_multifd_channels(); i++) { 595 MultiFDSendParams *p = &multifd_send_state->params[i]; 596 597 trace_multifd_send_sync_main_signal(p->id); 598 599 qemu_mutex_lock(&p->mutex); 600 601 if (p->quit) { 602 error_report("%s: channel %d has already quit", __func__, i); 603 qemu_mutex_unlock(&p->mutex); 604 return; 605 } 606 607 p->packet_num = multifd_send_state->packet_num++; 608 p->flags |= MULTIFD_FLAG_SYNC; 609 p->pending_job++; 610 qemu_file_update_transfer(f, p->packet_len); 611 ram_counters.multifd_bytes += p->packet_len; 612 ram_counters.transferred += p->packet_len; 613 qemu_mutex_unlock(&p->mutex); 614 qemu_sem_post(&p->sem); 615 } 616 for (i = 0; i < migrate_multifd_channels(); i++) { 617 MultiFDSendParams *p = &multifd_send_state->params[i]; 618 619 trace_multifd_send_sync_main_wait(p->id); 620 qemu_sem_wait(&p->sem_sync); 621 } 622 trace_multifd_send_sync_main(multifd_send_state->packet_num); 623 } 624 625 static void *multifd_send_thread(void *opaque) 626 { 627 MultiFDSendParams *p = opaque; 628 Error *local_err = NULL; 629 int ret = 0; 630 uint32_t flags = 0; 631 632 trace_multifd_send_thread_start(p->id); 633 rcu_register_thread(); 634 635 if (multifd_send_initial_packet(p, &local_err) < 0) { 636 ret = -1; 637 goto out; 638 } 639 /* initial packet */ 640 p->num_packets = 1; 641 642 while (true) { 643 qemu_sem_wait(&p->sem); 644 645 if (qatomic_read(&multifd_send_state->exiting)) { 646 break; 647 } 648 qemu_mutex_lock(&p->mutex); 
static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    Error *local_err = NULL;
    int ret = 0;
    uint32_t flags = 0;

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (multifd_send_initial_packet(p, &local_err) < 0) {
        ret = -1;
        goto out;
    }
    /* initial packet */
    p->num_packets = 1;

    while (true) {
        qemu_sem_wait(&p->sem);

        if (qatomic_read(&multifd_send_state->exiting)) {
            break;
        }
        qemu_mutex_lock(&p->mutex);

        if (p->pending_job) {
            uint32_t used = p->pages->used;
            uint64_t packet_num = p->packet_num;
            flags = p->flags;

            if (used) {
                ret = multifd_send_state->ops->send_prepare(p, used,
                                                            &local_err);
                if (ret != 0) {
                    qemu_mutex_unlock(&p->mutex);
                    break;
                }
            }
            multifd_send_fill_packet(p);
            p->flags = 0;
            p->num_packets++;
            p->num_pages += used;
            p->pages->used = 0;
            p->pages->block = NULL;
            qemu_mutex_unlock(&p->mutex);

            trace_multifd_send(p->id, packet_num, used, flags,
                               p->next_packet_size);

            ret = qio_channel_write_all(p->c, (void *)p->packet,
                                        p->packet_len, &local_err);
            if (ret != 0) {
                break;
            }

            if (used) {
                ret = multifd_send_state->ops->send_write(p, used, &local_err);
                if (ret != 0) {
                    break;
                }
            }

            qemu_mutex_lock(&p->mutex);
            p->pending_job--;
            qemu_mutex_unlock(&p->mutex);

            if (flags & MULTIFD_FLAG_SYNC) {
                qemu_sem_post(&p->sem_sync);
            }
            qemu_sem_post(&multifd_send_state->channels_ready);
        } else if (p->quit) {
            qemu_mutex_unlock(&p->mutex);
            break;
        } else {
            qemu_mutex_unlock(&p->mutex);
            /* sometimes there are spurious wakeups */
        }
    }

out:
    if (local_err) {
        trace_multifd_send_error(p->id);
        multifd_send_terminate_threads(local_err);
        error_free(local_err);
    }

    /*
     * We are exiting because of an error; we can't just leave, so
     * notify whoever is waiting on us before we go.
     */
    if (ret != 0) {
        qemu_sem_post(&p->sem_sync);
        qemu_sem_post(&multifd_send_state->channels_ready);
    }

    qemu_mutex_lock(&p->mutex);
    p->running = false;
    qemu_mutex_unlock(&p->mutex);

    rcu_unregister_thread();
    trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages);

    return NULL;
}

static bool multifd_channel_connect(MultiFDSendParams *p,
                                    QIOChannel *ioc,
                                    Error *error);
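
/*
 * With TLS enabled, a channel takes a detour before the send thread
 * is started:
 *
 *   multifd_channel_connect()
 *     -> multifd_tls_channel_connect()      wraps ioc in a QIOChannelTLS
 *       -> multifd_tls_handshake_thread()   runs the TLS handshake
 *         -> multifd_tls_outgoing_handshake()
 *           -> multifd_channel_connect()    again, now with a TLS
 *                                           channel, which finally
 *                                           creates multifd_send_thread()
 */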
static void multifd_tls_outgoing_handshake(QIOTask *task,
                                           gpointer opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
    Error *err = NULL;

    if (qio_task_propagate_error(task, &err)) {
        trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err));
    } else {
        trace_multifd_tls_outgoing_handshake_complete(ioc);
    }

    if (!multifd_channel_connect(p, ioc, err)) {
        /*
         * An error happened.  Mark multifd_send_thread as quitting,
         * even though it was never created, and then notify whoever
         * is waiting on us.
         */
        p->quit = true;
        qemu_sem_post(&multifd_send_state->channels_ready);
        qemu_sem_post(&p->sem_sync);
    }
}

static void *multifd_tls_handshake_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannelTLS *tioc = QIO_CHANNEL_TLS(p->c);

    qio_channel_tls_handshake(tioc,
                              multifd_tls_outgoing_handshake,
                              p,
                              NULL,
                              NULL);
    return NULL;
}

static void multifd_tls_channel_connect(MultiFDSendParams *p,
                                        QIOChannel *ioc,
                                        Error **errp)
{
    MigrationState *s = migrate_get_current();
    const char *hostname = p->tls_hostname;
    QIOChannelTLS *tioc;

    tioc = migration_tls_client_create(s, ioc, hostname, errp);
    if (!tioc) {
        return;
    }

    object_unref(OBJECT(ioc));
    trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
    qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");
    p->c = QIO_CHANNEL(tioc);
    qemu_thread_create(&p->thread, "multifd-tls-handshake-worker",
                       multifd_tls_handshake_thread, p,
                       QEMU_THREAD_JOINABLE);
}

static bool multifd_channel_connect(MultiFDSendParams *p,
                                    QIOChannel *ioc,
                                    Error *error)
{
    MigrationState *s = migrate_get_current();

    trace_multifd_set_outgoing_channel(
        ioc, object_get_typename(OBJECT(ioc)), p->tls_hostname, error);

    if (!error) {
        if (s->parameters.tls_creds &&
            *s->parameters.tls_creds &&
            !object_dynamic_cast(OBJECT(ioc),
                                 TYPE_QIO_CHANNEL_TLS)) {
            multifd_tls_channel_connect(p, ioc, &error);
            if (!error) {
                /*
                 * tls_channel_connect will call back to this
                 * function after the TLS handshake,
                 * so we mustn't call multifd_send_thread until then
                 */
                return true;
            } else {
                return false;
            }
        } else {
            migration_ioc_register_yank(ioc);
            p->registered_yank = true;
            p->c = ioc;
            qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
                               QEMU_THREAD_JOINABLE);
        }
        return true;
    }

    return false;
}
static void multifd_new_send_channel_cleanup(MultiFDSendParams *p,
                                             QIOChannel *ioc, Error *err)
{
    migrate_set_error(migrate_get_current(), err);
    /* An error happened; notify whoever is waiting on this channel */
    qemu_sem_post(&multifd_send_state->channels_ready);
    qemu_sem_post(&p->sem_sync);
    /*
     * Although multifd_send_thread was never created, the main
     * migration thread still needs to check whether it is running,
     * so mark its status here.
     */
    p->quit = true;
    object_unref(OBJECT(ioc));
    error_free(err);
}

static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
    Error *local_err = NULL;

    trace_multifd_new_send_channel_async(p->id);
    if (qio_task_propagate_error(task, &local_err)) {
        goto cleanup;
    } else {
        p->c = QIO_CHANNEL(sioc);
        qio_channel_set_delay(p->c, false);
        p->running = true;
        if (!multifd_channel_connect(p, sioc, local_err)) {
            goto cleanup;
        }
        return;
    }

cleanup:
    multifd_new_send_channel_cleanup(p, sioc, local_err);
}

static bool migrate_allow_multifd = true;
void migrate_protocol_allow_multifd(bool allow)
{
    migrate_allow_multifd = allow;
}

bool migrate_multifd_is_allowed(void)
{
    return migrate_allow_multifd;
}

int multifd_save_setup(Error **errp)
{
    int thread_count;
    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    uint8_t i;
    MigrationState *s;

    if (!migrate_use_multifd()) {
        return 0;
    }
    if (!migrate_multifd_is_allowed()) {
        error_setg(errp, "multifd is not supported by current protocol");
        return -1;
    }

    s = migrate_get_current();
    thread_count = migrate_multifd_channels();
    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
    multifd_send_state->pages = multifd_pages_init(page_count);
    qemu_sem_init(&multifd_send_state->channels_ready, 0);
    qatomic_set(&multifd_send_state->exiting, 0);
    multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_mutex_init(&p->mutex);
        qemu_sem_init(&p->sem, 0);
        qemu_sem_init(&p->sem_sync, 0);
        p->quit = false;
        p->pending_job = 0;
        p->id = i;
        p->pages = multifd_pages_init(page_count);
        p->packet_len = sizeof(MultiFDPacket_t)
                      + sizeof(uint64_t) * page_count;
        p->packet = g_malloc0(p->packet_len);
        p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
        p->packet->version = cpu_to_be32(MULTIFD_VERSION);
        p->name = g_strdup_printf("multifdsend_%d", i);
        p->tls_hostname = g_strdup(s->hostname);
        socket_send_channel_create(multifd_new_send_channel_async, p);
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;
        int ret;

        ret = multifd_send_state->ops->send_setup(p, &local_err);
        if (ret) {
            error_propagate(errp, local_err);
            return ret;
        }
    }
    return 0;
}

struct {
    MultiFDRecvParams *params;
    /* number of created threads */
    int count;
    /* syncs main thread and channels */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_recv_state;
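
/*
 * The receive side is simpler than the send side: each channel thread
 * reads packets into its own p->pages, so there is no central "pages"
 * struct to swap and no channels_ready semaphore; sem_sync above is
 * only used for the MULTIFD_FLAG_SYNC handshake.
 */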
static void multifd_recv_terminate_threads(Error *err)
{
    int i;

    trace_multifd_recv_terminate_threads(err != NULL);

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        qemu_mutex_lock(&p->mutex);
        p->quit = true;
        /*
         * We could arrive here for two reasons:
         *  - normal quit, i.e. everything went fine, just finished
         *  - error quit: We close the channels so the channel threads
         *    finish the qio_channel_read_all_eof()
         */
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
        qemu_mutex_unlock(&p->mutex);
    }
}

int multifd_load_cleanup(Error **errp)
{
    int i;

    if (!migrate_use_multifd() || !migrate_multifd_is_allowed()) {
        return 0;
    }
    multifd_recv_terminate_threads(NULL);
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        if (p->running) {
            p->quit = true;
            /*
             * multifd_recv_thread may be blocked in the
             * MULTIFD_FLAG_SYNC handling code; waking it up here is
             * harmless during the cleanup phase.
             */
            qemu_sem_post(&p->sem_sync);
            qemu_thread_join(&p->thread);
        }
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        migration_ioc_unregister_yank(p->c);
        object_unref(OBJECT(p->c));
        p->c = NULL;
        qemu_mutex_destroy(&p->mutex);
        qemu_sem_destroy(&p->sem_sync);
        g_free(p->name);
        p->name = NULL;
        multifd_pages_clear(p->pages);
        p->pages = NULL;
        p->packet_len = 0;
        g_free(p->packet);
        p->packet = NULL;
        multifd_recv_state->ops->recv_cleanup(p);
    }
    qemu_sem_destroy(&multifd_recv_state->sem_sync);
    g_free(multifd_recv_state->params);
    multifd_recv_state->params = NULL;
    g_free(multifd_recv_state);
    multifd_recv_state = NULL;

    return 0;
}

void multifd_recv_sync_main(void)
{
    int i;

    if (!migrate_use_multifd()) {
        return;
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        trace_multifd_recv_sync_main_wait(p->id);
        qemu_sem_wait(&multifd_recv_state->sem_sync);
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        WITH_QEMU_LOCK_GUARD(&p->mutex) {
            if (multifd_recv_state->packet_num < p->packet_num) {
                multifd_recv_state->packet_num = p->packet_num;
            }
        }
        trace_multifd_recv_sync_main_signal(p->id);
        qemu_sem_post(&p->sem_sync);
    }
    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
}
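
/*
 * Sync point from the receiving thread's perspective: on seeing
 * MULTIFD_FLAG_SYNC it posts multifd_recv_state->sem_sync (one post
 * per channel, collected by multifd_recv_sync_main() above) and then
 * blocks on its own p->sem_sync until the main thread releases it.
 */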
static void *multifd_recv_thread(void *opaque)
{
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    int ret;

    trace_multifd_recv_thread_start(p->id);
    rcu_register_thread();

    while (true) {
        uint32_t used;
        uint32_t flags;

        if (p->quit) {
            break;
        }

        ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
                                       p->packet_len, &local_err);
        if (ret == 0) {   /* EOF */
            break;
        }
        if (ret == -1) {  /* Error */
            break;
        }

        qemu_mutex_lock(&p->mutex);
        ret = multifd_recv_unfill_packet(p, &local_err);
        if (ret) {
            qemu_mutex_unlock(&p->mutex);
            break;
        }

        used = p->pages->used;
        flags = p->flags;
        /* recv methods don't know how to handle the SYNC flag */
        p->flags &= ~MULTIFD_FLAG_SYNC;
        trace_multifd_recv(p->id, p->packet_num, used, flags,
                           p->next_packet_size);
        p->num_packets++;
        p->num_pages += used;
        qemu_mutex_unlock(&p->mutex);

        if (used) {
            ret = multifd_recv_state->ops->recv_pages(p, used, &local_err);
            if (ret != 0) {
                break;
            }
        }

        if (flags & MULTIFD_FLAG_SYNC) {
            qemu_sem_post(&multifd_recv_state->sem_sync);
            qemu_sem_wait(&p->sem_sync);
        }
    }

    if (local_err) {
        multifd_recv_terminate_threads(local_err);
        error_free(local_err);
    }
    qemu_mutex_lock(&p->mutex);
    p->running = false;
    qemu_mutex_unlock(&p->mutex);

    rcu_unregister_thread();
    trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages);

    return NULL;
}

int multifd_load_setup(Error **errp)
{
    int thread_count;
    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    uint8_t i;

    if (!migrate_use_multifd()) {
        return 0;
    }
    if (!migrate_multifd_is_allowed()) {
        error_setg(errp, "multifd is not supported by current protocol");
        return -1;
    }
    thread_count = migrate_multifd_channels();
    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
    qatomic_set(&multifd_recv_state->count, 0);
    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
    multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        qemu_mutex_init(&p->mutex);
        qemu_sem_init(&p->sem_sync, 0);
        p->quit = false;
        p->id = i;
        p->pages = multifd_pages_init(page_count);
        p->packet_len = sizeof(MultiFDPacket_t)
                      + sizeof(uint64_t) * page_count;
        p->packet = g_malloc0(p->packet_len);
        p->name = g_strdup_printf("multifdrecv_%d", i);
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];
        Error *local_err = NULL;
        int ret;

        ret = multifd_recv_state->ops->recv_setup(p, &local_err);
        if (ret) {
            error_propagate(errp, local_err);
            return ret;
        }
    }
    return 0;
}

bool multifd_recv_all_channels_created(void)
{
    int thread_count = migrate_multifd_channels();

    if (!migrate_use_multifd()) {
        return true;
    }

    if (!multifd_recv_state) {
        /* Called before any connections created */
        return false;
    }

    return thread_count == qatomic_read(&multifd_recv_state->count);
}
1199 */ 1200 bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp) 1201 { 1202 MultiFDRecvParams *p; 1203 Error *local_err = NULL; 1204 int id; 1205 1206 id = multifd_recv_initial_packet(ioc, &local_err); 1207 if (id < 0) { 1208 multifd_recv_terminate_threads(local_err); 1209 error_propagate_prepend(errp, local_err, 1210 "failed to receive packet" 1211 " via multifd channel %d: ", 1212 qatomic_read(&multifd_recv_state->count)); 1213 return false; 1214 } 1215 trace_multifd_recv_new_channel(id); 1216 1217 p = &multifd_recv_state->params[id]; 1218 if (p->c != NULL) { 1219 error_setg(&local_err, "multifd: received id '%d' already setup'", 1220 id); 1221 multifd_recv_terminate_threads(local_err); 1222 error_propagate(errp, local_err); 1223 return false; 1224 } 1225 p->c = ioc; 1226 object_ref(OBJECT(ioc)); 1227 /* initial packet */ 1228 p->num_packets = 1; 1229 1230 p->running = true; 1231 qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, 1232 QEMU_THREAD_JOINABLE); 1233 qatomic_inc(&multifd_recv_state->count); 1234 return qatomic_read(&multifd_recv_state->count) == 1235 migrate_multifd_channels(); 1236 } 1237