/*
 * Multifd common code
 *
 * Copyright (c) 2019-2020 Red Hat Inc
 *
 * Authors:
 *  Juan Quintela <quintela@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */

#include "qemu/osdep.h"
#include "qemu/rcu.h"
#include "exec/target_page.h"
#include "sysemu/sysemu.h"
#include "exec/ramblock.h"
#include "qemu/error-report.h"
#include "qapi/error.h"
#include "ram.h"
#include "migration.h"
#include "socket.h"
#include "tls.h"
#include "qemu-file.h"
#include "trace.h"
#include "multifd.h"

#include "qemu/yank.h"
#include "io/channel-socket.h"
#include "yank_functions.h"

/* Multiple fd's */

#define MULTIFD_MAGIC 0x11223344U
#define MULTIFD_VERSION 1

typedef struct {
    uint32_t magic;
    uint32_t version;
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;

/* Multifd without compression */

/**
 * nocomp_send_setup: setup send side
 *
 * For no compression this function does nothing.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
{
    return 0;
}

/**
 * nocomp_send_cleanup: cleanup send side
 *
 * For no compression this function does nothing.
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
{
    return;
}

/**
 * nocomp_send_prepare: prepare data to be able to send
 *
 * For no compression we just have to calculate the size of the
 * packet.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
{
    p->next_packet_size = p->pages->num * qemu_target_page_size();
    p->flags |= MULTIFD_FLAG_NOCOMP;
    return 0;
}

/**
 * nocomp_send_write: do the actual write of the data
 *
 * For no compression we just have to write the data.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @used: number of pages used
 * @errp: pointer to an error
 */
static int nocomp_send_write(MultiFDSendParams *p, uint32_t used, Error **errp)
{
    return qio_channel_writev_all(p->c, p->pages->iov, used, errp);
}

/**
 * nocomp_recv_setup: setup receive side
 *
 * For no compression this function does nothing.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
{
    return 0;
}

/**
 * nocomp_recv_cleanup: cleanup receive side
 *
 * For no compression this function does nothing.
 *
 * @p: Params for the channel that we are using
 */
static void nocomp_recv_cleanup(MultiFDRecvParams *p)
{
}

/**
 * nocomp_recv_pages: read the data from the channel into actual pages
 *
 * For no compression we just need to read things into the correct place.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_recv_pages(MultiFDRecvParams *p, Error **errp)
{
    uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;

    if (flags != MULTIFD_FLAG_NOCOMP) {
        error_setg(errp, "multifd %d: flags received %x flags expected %x",
                   p->id, flags, MULTIFD_FLAG_NOCOMP);
        return -1;
    }
    return qio_channel_readv_all(p->c, p->pages->iov, p->pages->num, errp);
}

static MultiFDMethods multifd_nocomp_ops = {
    .send_setup = nocomp_send_setup,
    .send_cleanup = nocomp_send_cleanup,
    .send_prepare = nocomp_send_prepare,
    .send_write = nocomp_send_write,
    .recv_setup = nocomp_recv_setup,
    .recv_cleanup = nocomp_recv_cleanup,
    .recv_pages = nocomp_recv_pages
};

static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
    [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
};

void multifd_register_ops(int method, MultiFDMethods *ops)
{
    assert(0 < method && method < MULTIFD_COMPRESSION__MAX);
    multifd_ops[method] = ops;
}

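/**
 * multifd_send_initial_packet: send the per-channel handshake packet
 *
 * Each channel starts by sending a MultiFDInit_t packet, so the
 * destination can validate the magic, version, source uuid and
 * channel id before any page data flows.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */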
static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
{
    MultiFDInit_t msg = {};
    int ret;

    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
    msg.version = cpu_to_be32(MULTIFD_VERSION);
    msg.id = p->id;
    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));

    ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp);
    if (ret != 0) {
        return -1;
    }
    return 0;
}

static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
{
    MultiFDInit_t msg;
    int ret;

    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
    if (ret != 0) {
        return -1;
    }

    msg.magic = be32_to_cpu(msg.magic);
    msg.version = be32_to_cpu(msg.version);

    if (msg.magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet magic %x "
                   "expected %x", msg.magic, MULTIFD_MAGIC);
        return -1;
    }

    if (msg.version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet version %d "
                   "expected %d", msg.version, MULTIFD_VERSION);
        return -1;
    }

    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
        char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);

        error_setg(errp, "multifd: received uuid '%s' and expected "
                   "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
        g_free(uuid);
        g_free(msg_uuid);
        return -1;
    }

    if (msg.id > migrate_multifd_channels()) {
        error_setg(errp, "multifd: received channel id %d "
                   "is greater than number of channels %d",
                   msg.id, migrate_multifd_channels());
        return -1;
    }

    return msg.id;
}

static MultiFDPages_t *multifd_pages_init(size_t size)
{
    MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);

    pages->allocated = size;
    pages->iov = g_new0(struct iovec, size);
    pages->offset = g_new0(ram_addr_t, size);

    return pages;
}

static void multifd_pages_clear(MultiFDPages_t *pages)
{
    pages->num = 0;
    pages->allocated = 0;
    pages->packet_num = 0;
    pages->block = NULL;
    g_free(pages->iov);
    pages->iov = NULL;
    g_free(pages->offset);
    pages->offset = NULL;
    g_free(pages);
}

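/**
 * multifd_send_fill_packet: fill the packet header for sending
 *
 * Convert the header fields and the page offsets to big endian wire
 * format.  Called with the channel mutex held.
 *
 * @p: Params for the channel that we are using
 */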
static void multifd_send_fill_packet(MultiFDSendParams *p)
{
    MultiFDPacket_t *packet = p->packet;
    int i;

    packet->flags = cpu_to_be32(p->flags);
    packet->pages_alloc = cpu_to_be32(p->pages->allocated);
    packet->pages_used = cpu_to_be32(p->pages->num);
    packet->next_packet_size = cpu_to_be32(p->next_packet_size);
    packet->packet_num = cpu_to_be64(p->packet_num);

    if (p->pages->block) {
        strncpy(packet->ramblock, p->pages->block->idstr, 256);
    }

    for (i = 0; i < p->pages->num; i++) {
        /* there are architectures where ram_addr_t is 32 bit */
        uint64_t temp = p->pages->offset[i];

        packet->offset[i] = cpu_to_be64(temp);
    }
}

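/**
 * multifd_recv_unfill_packet: parse and validate a received packet
 *
 * Convert the header back to host endianness and sanity-check the
 * magic, version, sizes and page offsets before filling p->pages.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */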
static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
{
    MultiFDPacket_t *packet = p->packet;
    size_t page_size = qemu_target_page_size();
    uint32_t pages_max = MULTIFD_PACKET_SIZE / page_size;
    RAMBlock *block;
    int i;

    packet->magic = be32_to_cpu(packet->magic);
    if (packet->magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet "
                   "magic %x and expected magic %x",
                   packet->magic, MULTIFD_MAGIC);
        return -1;
    }

    packet->version = be32_to_cpu(packet->version);
    if (packet->version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet "
                   "version %d and expected version %d",
                   packet->version, MULTIFD_VERSION);
        return -1;
    }

    p->flags = be32_to_cpu(packet->flags);

    packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
    /*
     * If we received a packet that is 100 times bigger than expected,
     * just stop migration.  The limit is a magic number.
     */
    if (packet->pages_alloc > pages_max * 100) {
        error_setg(errp, "multifd: received packet "
                   "with size %d and expected a maximum size of %d",
                   packet->pages_alloc, pages_max * 100);
        return -1;
    }
    /*
     * We received a packet that is bigger than expected but inside
     * reasonable limits (see previous comment).  Just reallocate.
     */
    if (packet->pages_alloc > p->pages->allocated) {
        multifd_pages_clear(p->pages);
        p->pages = multifd_pages_init(packet->pages_alloc);
    }

    p->pages->num = be32_to_cpu(packet->pages_used);
    if (p->pages->num > packet->pages_alloc) {
        error_setg(errp, "multifd: received packet "
                   "with %d pages and expected maximum pages are %d",
                   p->pages->num, packet->pages_alloc);
        return -1;
    }

    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
    p->packet_num = be64_to_cpu(packet->packet_num);

    if (p->pages->num == 0) {
        return 0;
    }

    /* make sure that the ramblock name is NUL terminated */
    packet->ramblock[255] = 0;
    block = qemu_ram_block_by_name(packet->ramblock);
    if (!block) {
        error_setg(errp, "multifd: unknown ram block %s",
                   packet->ramblock);
        return -1;
    }

    p->pages->block = block;
    for (i = 0; i < p->pages->num; i++) {
        uint64_t offset = be64_to_cpu(packet->offset[i]);

        if (offset > (block->used_length - page_size)) {
            error_setg(errp, "multifd: offset too long %" PRIu64
                       " (max " RAM_ADDR_FMT ")",
                       offset, block->used_length);
            return -1;
        }
        p->pages->offset[i] = offset;
        p->pages->iov[i].iov_base = block->host + offset;
        p->pages->iov[i].iov_len = page_size;
    }

    return 0;
}

struct {
    MultiFDSendParams *params;
    /* array of pages to send */
    MultiFDPages_t *pages;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    /* send channels ready */
    QemuSemaphore channels_ready;
    /*
     * Have we already run the thread-termination code?  There is a
     * race when an error happens while we are exiting.
     * Accessed with atomic operations; only valid values are 0 and 1.
     */
    int exiting;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_send_state;

/*
 * How do we use multifd_send_state->pages and channel->pages?
 *
 * We create one pages struct for each channel, plus a main one.  Each
 * time that we need to send a batch of pages, we interchange the one
 * in multifd_send_state with the one in the channel that is sending
 * it.  There are two reasons for that:
 *    - to avoid so many mallocs during migration
 *    - to make it easier to know what to free at the end of migration
 *
 * This way we always know who is the owner of each "pages" struct,
 * and we don't need any locking.  It belongs either to the migration
 * thread or to the channel thread.  Switching is safe because the
 * migration thread holds the channel mutex when changing it, and the
 * channel thread must have finished with its own copy, otherwise
 * pending_job can't be false.
 */

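/**
 * multifd_send_pages: hand the current batch of pages to a free channel
 *
 * Wait until one channel is ready, swap multifd_send_state->pages with
 * that channel's pages (see the ownership comment above) and wake up
 * the channel thread.
 *
 * Returns 1 for success or -1 for error
 *
 * @f: QEMUFile where to update the transfer counters
 */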
static int multifd_send_pages(QEMUFile *f)
{
    int i;
    static int next_channel;
    MultiFDSendParams *p = NULL; /* make gcc happy */
    MultiFDPages_t *pages = multifd_send_state->pages;
    uint64_t transferred;

    if (qatomic_read(&multifd_send_state->exiting)) {
        return -1;
    }

    qemu_sem_wait(&multifd_send_state->channels_ready);
    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_channel %= migrate_multifd_channels();
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        p = &multifd_send_state->params[i];

        qemu_mutex_lock(&p->mutex);
        if (p->quit) {
            error_report("%s: channel %d has already quit!", __func__, i);
            qemu_mutex_unlock(&p->mutex);
            return -1;
        }
        if (!p->pending_job) {
            p->pending_job++;
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
        qemu_mutex_unlock(&p->mutex);
    }
    assert(!p->pages->num);
    assert(!p->pages->block);

    p->packet_num = multifd_send_state->packet_num++;
    multifd_send_state->pages = p->pages;
    p->pages = pages;
    transferred = ((uint64_t) pages->num) * qemu_target_page_size()
                + p->packet_len;
    qemu_file_update_transfer(f, transferred);
    ram_counters.multifd_bytes += transferred;
    ram_counters.transferred += transferred;
    qemu_mutex_unlock(&p->mutex);
    qemu_sem_post(&p->sem);

    return 1;
}

int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
{
    MultiFDPages_t *pages = multifd_send_state->pages;

    if (!pages->block) {
        pages->block = block;
    }

    if (pages->block == block) {
        pages->offset[pages->num] = offset;
        pages->iov[pages->num].iov_base = block->host + offset;
        pages->iov[pages->num].iov_len = qemu_target_page_size();
        pages->num++;

        if (pages->num < pages->allocated) {
            return 1;
        }
    }

    if (multifd_send_pages(f) < 0) {
        return -1;
    }

    if (pages->block != block) {
        return multifd_queue_page(f, block, offset);
    }

    return 1;
}

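/**
 * multifd_send_terminate_threads: stop all send threads
 *
 * On error, move the migration to FAILED state; in all cases mark
 * every channel as quitting and shut down its ioc so the threads exit.
 * Safe to call more than once thanks to the atomic 'exiting' flag.
 *
 * @err: error to record, or NULL for a normal shutdown
 */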
static void multifd_send_terminate_threads(Error *err)
{
    int i;

    trace_multifd_send_terminate_threads(err != NULL);

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
            s->state == MIGRATION_STATUS_DEVICE ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }

    /*
     * We don't want to exit each thread twice.  Depending on where we
     * get the error, or if there are two independent errors in two
     * threads at the same time, we can end up calling this function
     * twice.
     */
    if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
        return;
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_mutex_lock(&p->mutex);
        p->quit = true;
        qemu_sem_post(&p->sem);
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
        qemu_mutex_unlock(&p->mutex);
    }
}

void multifd_save_cleanup(void)
{
    int i;

    if (!migrate_use_multifd() || !migrate_multifd_is_allowed()) {
        return;
    }
    multifd_send_terminate_threads(NULL);
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (p->running) {
            qemu_thread_join(&p->thread);
        }
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        if (p->registered_yank) {
            migration_ioc_unregister_yank(p->c);
        }
        socket_send_channel_destroy(p->c);
        p->c = NULL;
        qemu_mutex_destroy(&p->mutex);
        qemu_sem_destroy(&p->sem);
        qemu_sem_destroy(&p->sem_sync);
        g_free(p->name);
        p->name = NULL;
        g_free(p->tls_hostname);
        p->tls_hostname = NULL;
        multifd_pages_clear(p->pages);
        p->pages = NULL;
        p->packet_len = 0;
        g_free(p->packet);
        p->packet = NULL;
        multifd_send_state->ops->send_cleanup(p, &local_err);
        if (local_err) {
            migrate_set_error(migrate_get_current(), local_err);
            error_free(local_err);
        }
    }
    qemu_sem_destroy(&multifd_send_state->channels_ready);
    g_free(multifd_send_state->params);
    multifd_send_state->params = NULL;
    multifd_pages_clear(multifd_send_state->pages);
    multifd_send_state->pages = NULL;
    g_free(multifd_send_state);
    multifd_send_state = NULL;
}

void multifd_send_sync_main(QEMUFile *f)
{
    int i;

    if (!migrate_use_multifd()) {
        return;
    }
    if (multifd_send_state->pages->num) {
        if (multifd_send_pages(f) < 0) {
            error_report("%s: multifd_send_pages fail", __func__);
            return;
        }
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        trace_multifd_send_sync_main_signal(p->id);

        qemu_mutex_lock(&p->mutex);

        if (p->quit) {
            error_report("%s: channel %d has already quit", __func__, i);
            qemu_mutex_unlock(&p->mutex);
            return;
        }

        p->packet_num = multifd_send_state->packet_num++;
        p->flags |= MULTIFD_FLAG_SYNC;
        p->pending_job++;
        qemu_file_update_transfer(f, p->packet_len);
        ram_counters.multifd_bytes += p->packet_len;
        ram_counters.transferred += p->packet_len;
        qemu_mutex_unlock(&p->mutex);
        qemu_sem_post(&p->sem);
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        trace_multifd_send_sync_main_wait(p->id);
        qemu_sem_wait(&p->sem_sync);
    }
    trace_multifd_send_sync_main(multifd_send_state->packet_num);
}

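/**
 * multifd_send_thread: per-channel send thread
 *
 * Loop waiting on p->sem for jobs; for each one, prepare and write the
 * packet header plus the page data, then signal channels_ready so the
 * migration thread can pick this channel again.
 *
 * @opaque: MultiFDSendParams for this channel
 */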
static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    Error *local_err = NULL;
    int ret = 0;

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (multifd_send_initial_packet(p, &local_err) < 0) {
        ret = -1;
        goto out;
    }
    /* initial packet */
    p->num_packets = 1;

    while (true) {
        qemu_sem_wait(&p->sem);

        if (qatomic_read(&multifd_send_state->exiting)) {
            break;
        }
        qemu_mutex_lock(&p->mutex);

        if (p->pending_job) {
            uint32_t used = p->pages->num;
            uint64_t packet_num = p->packet_num;
            uint32_t flags = p->flags;

            if (used) {
                ret = multifd_send_state->ops->send_prepare(p, &local_err);
                if (ret != 0) {
                    qemu_mutex_unlock(&p->mutex);
                    break;
                }
            }
            multifd_send_fill_packet(p);
            p->flags = 0;
            p->num_packets++;
            p->num_pages += used;
            p->pages->num = 0;
            p->pages->block = NULL;
            qemu_mutex_unlock(&p->mutex);

            trace_multifd_send(p->id, packet_num, used, flags,
                               p->next_packet_size);

            ret = qio_channel_write_all(p->c, (void *)p->packet,
                                        p->packet_len, &local_err);
            if (ret != 0) {
                break;
            }

            if (used) {
                ret = multifd_send_state->ops->send_write(p, used, &local_err);
                if (ret != 0) {
                    break;
                }
            }

            qemu_mutex_lock(&p->mutex);
            p->pending_job--;
            qemu_mutex_unlock(&p->mutex);

            if (flags & MULTIFD_FLAG_SYNC) {
                qemu_sem_post(&p->sem_sync);
            }
            qemu_sem_post(&multifd_send_state->channels_ready);
        } else if (p->quit) {
            qemu_mutex_unlock(&p->mutex);
            break;
        } else {
            qemu_mutex_unlock(&p->mutex);
            /* sometimes there are spurious wakeups */
        }
    }

out:
    if (local_err) {
        trace_multifd_send_error(p->id);
        multifd_send_terminate_threads(local_err);
        error_free(local_err);
    }

    /*
     * An error happened and we are exiting, but we can't just leave:
     * notify whoever is waiting on us.
     */
    if (ret != 0) {
        qemu_sem_post(&p->sem_sync);
        qemu_sem_post(&multifd_send_state->channels_ready);
    }

    qemu_mutex_lock(&p->mutex);
    p->running = false;
    qemu_mutex_unlock(&p->mutex);

    rcu_unregister_thread();
    trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages);

    return NULL;
}

static bool multifd_channel_connect(MultiFDSendParams *p,
                                    QIOChannel *ioc,
                                    Error *error);

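/**
 * multifd_tls_outgoing_handshake: TLS handshake completion callback
 *
 * Runs when the outgoing TLS handshake finishes; on success this
 * re-enters multifd_channel_connect() with the TLS channel in place.
 *
 * @task: QIOTask of the handshake
 * @opaque: MultiFDSendParams for this channel
 */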
static void multifd_tls_outgoing_handshake(QIOTask *task,
                                           gpointer opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
    Error *err = NULL;

    if (qio_task_propagate_error(task, &err)) {
        trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err));
    } else {
        trace_multifd_tls_outgoing_handshake_complete(ioc);
    }

    if (!multifd_channel_connect(p, ioc, err)) {
        /*
         * An error happened: mark the multifd_send_thread status as
         * 'quit' even though it was never created, then notify whoever
         * is waiting on us.
         */
        p->quit = true;
        qemu_sem_post(&multifd_send_state->channels_ready);
        qemu_sem_post(&p->sem_sync);
    }
}

static void *multifd_tls_handshake_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannelTLS *tioc = QIO_CHANNEL_TLS(p->c);

    qio_channel_tls_handshake(tioc,
                              multifd_tls_outgoing_handshake,
                              p,
                              NULL,
                              NULL);
    return NULL;
}

static void multifd_tls_channel_connect(MultiFDSendParams *p,
                                        QIOChannel *ioc,
                                        Error **errp)
{
    MigrationState *s = migrate_get_current();
    const char *hostname = p->tls_hostname;
    QIOChannelTLS *tioc;

    tioc = migration_tls_client_create(s, ioc, hostname, errp);
    if (!tioc) {
        return;
    }

    object_unref(OBJECT(ioc));
    trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
    qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");
    p->c = QIO_CHANNEL(tioc);
    qemu_thread_create(&p->thread, "multifd-tls-handshake-worker",
                       multifd_tls_handshake_thread, p,
                       QEMU_THREAD_JOINABLE);
}

static bool multifd_channel_connect(MultiFDSendParams *p,
                                    QIOChannel *ioc,
                                    Error *error)
{
    MigrationState *s = migrate_get_current();

    trace_multifd_set_outgoing_channel(
        ioc, object_get_typename(OBJECT(ioc)), p->tls_hostname, error);

    if (!error) {
        if (s->parameters.tls_creds &&
            *s->parameters.tls_creds &&
            !object_dynamic_cast(OBJECT(ioc),
                                 TYPE_QIO_CHANNEL_TLS)) {
            multifd_tls_channel_connect(p, ioc, &error);
            if (!error) {
                /*
                 * multifd_tls_channel_connect will call back to this
                 * function after the TLS handshake, so we mustn't call
                 * multifd_send_thread until then.
                 */
                return true;
            } else {
                return false;
            }
        } else {
            migration_ioc_register_yank(ioc);
            p->registered_yank = true;
            p->c = ioc;
            qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
                               QEMU_THREAD_JOINABLE);
        }
        return true;
    }

    return false;
}

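/**
 * multifd_new_send_channel_cleanup: undo a failed channel creation
 *
 * Record the error, wake up everything that may be waiting on this
 * channel, and drop the ioc reference.
 *
 * @p: Params for the channel that we are using
 * @ioc: channel that failed to connect
 * @err: error to record (consumed)
 */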
static void multifd_new_send_channel_cleanup(MultiFDSendParams *p,
                                             QIOChannel *ioc, Error *err)
{
    migrate_set_error(migrate_get_current(), err);
    /* An error happened; notify whoever is waiting on us */
    qemu_sem_post(&multifd_send_state->channels_ready);
    qemu_sem_post(&p->sem_sync);
    /*
     * Although multifd_send_thread was never created, the main
     * migration thread still needs to judge whether it is running, so
     * we need to mark its status.
     */
    p->quit = true;
    object_unref(OBJECT(ioc));
    error_free(err);
}

static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
    Error *local_err = NULL;

    trace_multifd_new_send_channel_async(p->id);
    if (qio_task_propagate_error(task, &local_err)) {
        goto cleanup;
    } else {
        p->c = QIO_CHANNEL(sioc);
        qio_channel_set_delay(p->c, false);
        p->running = true;
        if (!multifd_channel_connect(p, sioc, local_err)) {
            goto cleanup;
        }
        return;
    }

cleanup:
    multifd_new_send_channel_cleanup(p, sioc, local_err);
}

static bool migrate_allow_multifd = true;
void migrate_protocol_allow_multifd(bool allow)
{
    migrate_allow_multifd = allow;
}

bool migrate_multifd_is_allowed(void)
{
    return migrate_allow_multifd;
}

int multifd_save_setup(Error **errp)
{
    int thread_count;
    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    uint8_t i;
    MigrationState *s;

    if (!migrate_use_multifd()) {
        return 0;
    }
    if (!migrate_multifd_is_allowed()) {
        error_setg(errp, "multifd is not supported by current protocol");
        return -1;
    }

    s = migrate_get_current();
    thread_count = migrate_multifd_channels();
    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
    multifd_send_state->pages = multifd_pages_init(page_count);
    qemu_sem_init(&multifd_send_state->channels_ready, 0);
    qatomic_set(&multifd_send_state->exiting, 0);
    multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_mutex_init(&p->mutex);
        qemu_sem_init(&p->sem, 0);
        qemu_sem_init(&p->sem_sync, 0);
        p->quit = false;
        p->pending_job = 0;
        p->id = i;
        p->pages = multifd_pages_init(page_count);
        p->packet_len = sizeof(MultiFDPacket_t)
                      + sizeof(uint64_t) * page_count;
        p->packet = g_malloc0(p->packet_len);
        p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
        p->packet->version = cpu_to_be32(MULTIFD_VERSION);
        p->name = g_strdup_printf("multifdsend_%d", i);
        p->tls_hostname = g_strdup(s->hostname);
        socket_send_channel_create(multifd_new_send_channel_async, p);
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;
        int ret;

        ret = multifd_send_state->ops->send_setup(p, &local_err);
        if (ret) {
            error_propagate(errp, local_err);
            return ret;
        }
    }
    return 0;
}

struct {
    MultiFDRecvParams *params;
    /* number of created threads */
    int count;
    /* syncs main thread and channels */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_recv_state;

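/**
 * multifd_recv_terminate_threads: stop all recv threads
 *
 * On error, move the migration to FAILED state; in all cases mark
 * every channel as quitting and shut down its ioc so that blocked
 * reads return.
 *
 * @err: error to record, or NULL for a normal shutdown
 */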
static void multifd_recv_terminate_threads(Error *err)
{
    int i;

    trace_multifd_recv_terminate_threads(err != NULL);

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        qemu_mutex_lock(&p->mutex);
        p->quit = true;
        /*
         * We could arrive here for two reasons:
         *  - normal quit, i.e. everything went fine, just finished
         *  - error quit: We close the channels so the channel threads
         *    finish the qio_channel_read_all_eof()
         */
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
        qemu_mutex_unlock(&p->mutex);
    }
}

int multifd_load_cleanup(Error **errp)
{
    int i;

    if (!migrate_use_multifd() || !migrate_multifd_is_allowed()) {
        return 0;
    }
    multifd_recv_terminate_threads(NULL);
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        if (p->running) {
            p->quit = true;
            /*
             * multifd_recv_thread may be blocked in the
             * MULTIFD_FLAG_SYNC handling code; waking it up here is
             * harmless in the cleanup phase.
             */
            qemu_sem_post(&p->sem_sync);
            qemu_thread_join(&p->thread);
        }
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        migration_ioc_unregister_yank(p->c);
        object_unref(OBJECT(p->c));
        p->c = NULL;
        qemu_mutex_destroy(&p->mutex);
        qemu_sem_destroy(&p->sem_sync);
        g_free(p->name);
        p->name = NULL;
        multifd_pages_clear(p->pages);
        p->pages = NULL;
        p->packet_len = 0;
        g_free(p->packet);
        p->packet = NULL;
        multifd_recv_state->ops->recv_cleanup(p);
    }
    qemu_sem_destroy(&multifd_recv_state->sem_sync);
    g_free(multifd_recv_state->params);
    multifd_recv_state->params = NULL;
    g_free(multifd_recv_state);
    multifd_recv_state = NULL;

    return 0;
}

void multifd_recv_sync_main(void)
{
    int i;

    if (!migrate_use_multifd()) {
        return;
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        trace_multifd_recv_sync_main_wait(p->id);
        qemu_sem_wait(&multifd_recv_state->sem_sync);
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        WITH_QEMU_LOCK_GUARD(&p->mutex) {
            if (multifd_recv_state->packet_num < p->packet_num) {
                multifd_recv_state->packet_num = p->packet_num;
            }
        }
        trace_multifd_recv_sync_main_signal(p->id);
        qemu_sem_post(&p->sem_sync);
    }
    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
}

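/**
 * multifd_recv_thread: per-channel receive thread
 *
 * Read packets from the channel until EOF, error or quit: unfill each
 * header, read the page data in place, and take part in the sync
 * protocol when MULTIFD_FLAG_SYNC is seen.
 *
 * @opaque: MultiFDRecvParams for this channel
 */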
static void *multifd_recv_thread(void *opaque)
{
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    int ret;

    trace_multifd_recv_thread_start(p->id);
    rcu_register_thread();

    while (true) {
        uint32_t used;
        uint32_t flags;

        if (p->quit) {
            break;
        }

        ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
                                       p->packet_len, &local_err);
        if (ret == 0) {   /* EOF */
            break;
        }
        if (ret == -1) {  /* Error */
            break;
        }

        qemu_mutex_lock(&p->mutex);
        ret = multifd_recv_unfill_packet(p, &local_err);
        if (ret) {
            qemu_mutex_unlock(&p->mutex);
            break;
        }

        used = p->pages->num;
        flags = p->flags;
        /* recv methods don't know how to handle the SYNC flag */
        p->flags &= ~MULTIFD_FLAG_SYNC;
        trace_multifd_recv(p->id, p->packet_num, used, flags,
                           p->next_packet_size);
        p->num_packets++;
        p->num_pages += used;
        qemu_mutex_unlock(&p->mutex);

        if (used) {
            ret = multifd_recv_state->ops->recv_pages(p, &local_err);
            if (ret != 0) {
                break;
            }
        }

        if (flags & MULTIFD_FLAG_SYNC) {
            qemu_sem_post(&multifd_recv_state->sem_sync);
            qemu_sem_wait(&p->sem_sync);
        }
    }

    if (local_err) {
        multifd_recv_terminate_threads(local_err);
        error_free(local_err);
    }
    qemu_mutex_lock(&p->mutex);
    p->running = false;
    qemu_mutex_unlock(&p->mutex);

    rcu_unregister_thread();
    trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages);

    return NULL;
}

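/**
 * multifd_load_setup: allocate recv side state
 *
 * Allocate multifd_recv_state and the per-channel parameters, then run
 * the compression method's recv_setup hook for each channel.  The recv
 * threads themselves are created later, in multifd_recv_new_channel().
 *
 * Returns 0 on success, a negative value on error
 *
 * @errp: pointer to an error
 */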
int multifd_load_setup(Error **errp)
{
    int thread_count;
    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    uint8_t i;

    if (!migrate_use_multifd()) {
        return 0;
    }
    if (!migrate_multifd_is_allowed()) {
        error_setg(errp, "multifd is not supported by current protocol");
        return -1;
    }
    thread_count = migrate_multifd_channels();
    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
    qatomic_set(&multifd_recv_state->count, 0);
    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
    multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        qemu_mutex_init(&p->mutex);
        qemu_sem_init(&p->sem_sync, 0);
        p->quit = false;
        p->id = i;
        p->pages = multifd_pages_init(page_count);
        p->packet_len = sizeof(MultiFDPacket_t)
                      + sizeof(uint64_t) * page_count;
        p->packet = g_malloc0(p->packet_len);
        p->name = g_strdup_printf("multifdrecv_%d", i);
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];
        Error *local_err = NULL;
        int ret;

        ret = multifd_recv_state->ops->recv_setup(p, &local_err);
        if (ret) {
            error_propagate(errp, local_err);
            return ret;
        }
    }
    return 0;
}

bool multifd_recv_all_channels_created(void)
{
    int thread_count = migrate_multifd_channels();

    if (!migrate_use_multifd()) {
        return true;
    }

    if (!multifd_recv_state) {
        /* Called before any connections created */
        return false;
    }

    return thread_count == qatomic_read(&multifd_recv_state->count);
}

/*
 * Try to receive all multifd channels to get ready for the migration.
 * - Return true and do not set @errp when correctly receiving all channels;
 * - Return false and do not set @errp when correctly receiving the current one;
 * - Return false and set @errp when failing to receive the current channel.
 */
bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
{
    MultiFDRecvParams *p;
    Error *local_err = NULL;
    int id;

    id = multifd_recv_initial_packet(ioc, &local_err);
    if (id < 0) {
        multifd_recv_terminate_threads(local_err);
        error_propagate_prepend(errp, local_err,
                                "failed to receive packet"
                                " via multifd channel %d: ",
                                qatomic_read(&multifd_recv_state->count));
        return false;
    }
    trace_multifd_recv_new_channel(id);

    p = &multifd_recv_state->params[id];
    if (p->c != NULL) {
        error_setg(&local_err, "multifd: received id '%d' already setup",
                   id);
        multifd_recv_terminate_threads(local_err);
        error_propagate(errp, local_err);
        return false;
    }
    p->c = ioc;
    object_ref(OBJECT(ioc));
    /* initial packet */
    p->num_packets = 1;

    p->running = true;
    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                       QEMU_THREAD_JOINABLE);
    qatomic_inc(&multifd_recv_state->count);
    return qatomic_read(&multifd_recv_state->count) ==
           migrate_multifd_channels();
}