1 /* 2 * Multifd common code 3 * 4 * Copyright (c) 2019-2020 Red Hat Inc 5 * 6 * Authors: 7 * Juan Quintela <quintela@redhat.com> 8 * 9 * This work is licensed under the terms of the GNU GPL, version 2 or later. 10 * See the COPYING file in the top-level directory. 11 */ 12 13 #include "qemu/osdep.h" 14 #include "qemu/rcu.h" 15 #include "exec/target_page.h" 16 #include "sysemu/sysemu.h" 17 #include "exec/ramblock.h" 18 #include "qemu/error-report.h" 19 #include "qapi/error.h" 20 #include "ram.h" 21 #include "migration.h" 22 #include "socket.h" 23 #include "qemu-file.h" 24 #include "trace.h" 25 #include "multifd.h" 26 27 /* Multiple fd's */ 28 29 #define MULTIFD_MAGIC 0x11223344U 30 #define MULTIFD_VERSION 1 31 32 typedef struct { 33 uint32_t magic; 34 uint32_t version; 35 unsigned char uuid[16]; /* QemuUUID */ 36 uint8_t id; 37 uint8_t unused1[7]; /* Reserved for future use */ 38 uint64_t unused2[4]; /* Reserved for future use */ 39 } __attribute__((packed)) MultiFDInit_t; 40 41 /* Multifd without compression */ 42 43 /** 44 * nocomp_send_setup: setup send side 45 * 46 * For no compression this function does nothing. 47 * 48 * Returns 0 for success or -1 for error 49 * 50 * @p: Params for the channel that we are using 51 * @errp: pointer to an error 52 */ 53 static int nocomp_send_setup(MultiFDSendParams *p, Error **errp) 54 { 55 return 0; 56 } 57 58 /** 59 * nocomp_send_cleanup: cleanup send side 60 * 61 * For no compression this function does nothing. 62 * 63 * @p: Params for the channel that we are using 64 */ 65 static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp) 66 { 67 return; 68 } 69 70 /** 71 * nocomp_send_prepare: prepare date to be able to send 72 * 73 * For no compression we just have to calculate the size of the 74 * packet. 75 * 76 * Returns 0 for success or -1 for error 77 * 78 * @p: Params for the channel that we are using 79 * @used: number of pages used 80 * @errp: pointer to an error 81 */ 82 static int nocomp_send_prepare(MultiFDSendParams *p, uint32_t used, 83 Error **errp) 84 { 85 p->next_packet_size = used * qemu_target_page_size(); 86 p->flags |= MULTIFD_FLAG_NOCOMP; 87 return 0; 88 } 89 90 /** 91 * nocomp_send_write: do the actual write of the data 92 * 93 * For no compression we just have to write the data. 94 * 95 * Returns 0 for success or -1 for error 96 * 97 * @p: Params for the channel that we are using 98 * @used: number of pages used 99 * @errp: pointer to an error 100 */ 101 static int nocomp_send_write(MultiFDSendParams *p, uint32_t used, Error **errp) 102 { 103 return qio_channel_writev_all(p->c, p->pages->iov, used, errp); 104 } 105 106 /** 107 * nocomp_recv_setup: setup receive side 108 * 109 * For no compression this function does nothing. 110 * 111 * Returns 0 for success or -1 for error 112 * 113 * @p: Params for the channel that we are using 114 * @errp: pointer to an error 115 */ 116 static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp) 117 { 118 return 0; 119 } 120 121 /** 122 * nocomp_recv_cleanup: setup receive side 123 * 124 * For no compression this function does nothing. 125 * 126 * @p: Params for the channel that we are using 127 */ 128 static void nocomp_recv_cleanup(MultiFDRecvParams *p) 129 { 130 } 131 132 /** 133 * nocomp_recv_pages: read the data from the channel into actual pages 134 * 135 * For no compression we just need to read things into the correct place. 136 * 137 * Returns 0 for success or -1 for error 138 * 139 * @p: Params for the channel that we are using 140 * @used: number of pages used 141 * @errp: pointer to an error 142 */ 143 static int nocomp_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **errp) 144 { 145 uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK; 146 147 if (flags != MULTIFD_FLAG_NOCOMP) { 148 error_setg(errp, "multifd %d: flags received %x flags expected %x", 149 p->id, flags, MULTIFD_FLAG_NOCOMP); 150 return -1; 151 } 152 return qio_channel_readv_all(p->c, p->pages->iov, used, errp); 153 } 154 155 static MultiFDMethods multifd_nocomp_ops = { 156 .send_setup = nocomp_send_setup, 157 .send_cleanup = nocomp_send_cleanup, 158 .send_prepare = nocomp_send_prepare, 159 .send_write = nocomp_send_write, 160 .recv_setup = nocomp_recv_setup, 161 .recv_cleanup = nocomp_recv_cleanup, 162 .recv_pages = nocomp_recv_pages 163 }; 164 165 static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = { 166 [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops, 167 }; 168 169 void multifd_register_ops(int method, MultiFDMethods *ops) 170 { 171 assert(0 < method && method < MULTIFD_COMPRESSION__MAX); 172 multifd_ops[method] = ops; 173 } 174 175 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) 176 { 177 MultiFDInit_t msg = {}; 178 int ret; 179 180 msg.magic = cpu_to_be32(MULTIFD_MAGIC); 181 msg.version = cpu_to_be32(MULTIFD_VERSION); 182 msg.id = p->id; 183 memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid)); 184 185 ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp); 186 if (ret != 0) { 187 return -1; 188 } 189 return 0; 190 } 191 192 static int multifd_recv_initial_packet(QIOChannel *c, Error **errp) 193 { 194 MultiFDInit_t msg; 195 int ret; 196 197 ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp); 198 if (ret != 0) { 199 return -1; 200 } 201 202 msg.magic = be32_to_cpu(msg.magic); 203 msg.version = be32_to_cpu(msg.version); 204 205 if (msg.magic != MULTIFD_MAGIC) { 206 error_setg(errp, "multifd: received packet magic %x " 207 "expected %x", msg.magic, MULTIFD_MAGIC); 208 return -1; 209 } 210 211 if (msg.version != MULTIFD_VERSION) { 212 error_setg(errp, "multifd: received packet version %d " 213 "expected %d", msg.version, MULTIFD_VERSION); 214 return -1; 215 } 216 217 if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) { 218 char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid); 219 char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid); 220 221 error_setg(errp, "multifd: received uuid '%s' and expected " 222 "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id); 223 g_free(uuid); 224 g_free(msg_uuid); 225 return -1; 226 } 227 228 if (msg.id > migrate_multifd_channels()) { 229 error_setg(errp, "multifd: received channel version %d " 230 "expected %d", msg.version, MULTIFD_VERSION); 231 return -1; 232 } 233 234 return msg.id; 235 } 236 237 static MultiFDPages_t *multifd_pages_init(size_t size) 238 { 239 MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1); 240 241 pages->allocated = size; 242 pages->iov = g_new0(struct iovec, size); 243 pages->offset = g_new0(ram_addr_t, size); 244 245 return pages; 246 } 247 248 static void multifd_pages_clear(MultiFDPages_t *pages) 249 { 250 pages->used = 0; 251 pages->allocated = 0; 252 pages->packet_num = 0; 253 pages->block = NULL; 254 g_free(pages->iov); 255 pages->iov = NULL; 256 g_free(pages->offset); 257 pages->offset = NULL; 258 g_free(pages); 259 } 260 261 static void multifd_send_fill_packet(MultiFDSendParams *p) 262 { 263 MultiFDPacket_t *packet = p->packet; 264 int i; 265 266 packet->flags = cpu_to_be32(p->flags); 267 packet->pages_alloc = cpu_to_be32(p->pages->allocated); 268 packet->pages_used = cpu_to_be32(p->pages->used); 269 packet->next_packet_size = cpu_to_be32(p->next_packet_size); 270 packet->packet_num = cpu_to_be64(p->packet_num); 271 272 if (p->pages->block) { 273 strncpy(packet->ramblock, p->pages->block->idstr, 256); 274 } 275 276 for (i = 0; i < p->pages->used; i++) { 277 /* there are architectures where ram_addr_t is 32 bit */ 278 uint64_t temp = p->pages->offset[i]; 279 280 packet->offset[i] = cpu_to_be64(temp); 281 } 282 } 283 284 static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) 285 { 286 MultiFDPacket_t *packet = p->packet; 287 uint32_t pages_max = MULTIFD_PACKET_SIZE / qemu_target_page_size(); 288 RAMBlock *block; 289 int i; 290 291 packet->magic = be32_to_cpu(packet->magic); 292 if (packet->magic != MULTIFD_MAGIC) { 293 error_setg(errp, "multifd: received packet " 294 "magic %x and expected magic %x", 295 packet->magic, MULTIFD_MAGIC); 296 return -1; 297 } 298 299 packet->version = be32_to_cpu(packet->version); 300 if (packet->version != MULTIFD_VERSION) { 301 error_setg(errp, "multifd: received packet " 302 "version %d and expected version %d", 303 packet->version, MULTIFD_VERSION); 304 return -1; 305 } 306 307 p->flags = be32_to_cpu(packet->flags); 308 309 packet->pages_alloc = be32_to_cpu(packet->pages_alloc); 310 /* 311 * If we received a packet that is 100 times bigger than expected 312 * just stop migration. It is a magic number. 313 */ 314 if (packet->pages_alloc > pages_max * 100) { 315 error_setg(errp, "multifd: received packet " 316 "with size %d and expected a maximum size of %d", 317 packet->pages_alloc, pages_max * 100) ; 318 return -1; 319 } 320 /* 321 * We received a packet that is bigger than expected but inside 322 * reasonable limits (see previous comment). Just reallocate. 323 */ 324 if (packet->pages_alloc > p->pages->allocated) { 325 multifd_pages_clear(p->pages); 326 p->pages = multifd_pages_init(packet->pages_alloc); 327 } 328 329 p->pages->used = be32_to_cpu(packet->pages_used); 330 if (p->pages->used > packet->pages_alloc) { 331 error_setg(errp, "multifd: received packet " 332 "with %d pages and expected maximum pages are %d", 333 p->pages->used, packet->pages_alloc) ; 334 return -1; 335 } 336 337 p->next_packet_size = be32_to_cpu(packet->next_packet_size); 338 p->packet_num = be64_to_cpu(packet->packet_num); 339 340 if (p->pages->used == 0) { 341 return 0; 342 } 343 344 /* make sure that ramblock is 0 terminated */ 345 packet->ramblock[255] = 0; 346 block = qemu_ram_block_by_name(packet->ramblock); 347 if (!block) { 348 error_setg(errp, "multifd: unknown ram block %s", 349 packet->ramblock); 350 return -1; 351 } 352 353 for (i = 0; i < p->pages->used; i++) { 354 uint64_t offset = be64_to_cpu(packet->offset[i]); 355 356 if (offset > (block->used_length - qemu_target_page_size())) { 357 error_setg(errp, "multifd: offset too long %" PRIu64 358 " (max " RAM_ADDR_FMT ")", 359 offset, block->max_length); 360 return -1; 361 } 362 p->pages->iov[i].iov_base = block->host + offset; 363 p->pages->iov[i].iov_len = qemu_target_page_size(); 364 } 365 366 return 0; 367 } 368 369 struct { 370 MultiFDSendParams *params; 371 /* array of pages to sent */ 372 MultiFDPages_t *pages; 373 /* global number of generated multifd packets */ 374 uint64_t packet_num; 375 /* send channels ready */ 376 QemuSemaphore channels_ready; 377 /* 378 * Have we already run terminate threads. There is a race when it 379 * happens that we got one error while we are exiting. 380 * We will use atomic operations. Only valid values are 0 and 1. 381 */ 382 int exiting; 383 /* multifd ops */ 384 MultiFDMethods *ops; 385 } *multifd_send_state; 386 387 /* 388 * How we use multifd_send_state->pages and channel->pages? 389 * 390 * We create a pages for each channel, and a main one. Each time that 391 * we need to send a batch of pages we interchange the ones between 392 * multifd_send_state and the channel that is sending it. There are 393 * two reasons for that: 394 * - to not have to do so many mallocs during migration 395 * - to make easier to know what to free at the end of migration 396 * 397 * This way we always know who is the owner of each "pages" struct, 398 * and we don't need any locking. It belongs to the migration thread 399 * or to the channel thread. Switching is safe because the migration 400 * thread is using the channel mutex when changing it, and the channel 401 * have to had finish with its own, otherwise pending_job can't be 402 * false. 403 */ 404 405 static int multifd_send_pages(QEMUFile *f) 406 { 407 int i; 408 static int next_channel; 409 MultiFDSendParams *p = NULL; /* make happy gcc */ 410 MultiFDPages_t *pages = multifd_send_state->pages; 411 uint64_t transferred; 412 413 if (atomic_read(&multifd_send_state->exiting)) { 414 return -1; 415 } 416 417 qemu_sem_wait(&multifd_send_state->channels_ready); 418 /* 419 * next_channel can remain from a previous migration that was 420 * using more channels, so ensure it doesn't overflow if the 421 * limit is lower now. 422 */ 423 next_channel %= migrate_multifd_channels(); 424 for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { 425 p = &multifd_send_state->params[i]; 426 427 qemu_mutex_lock(&p->mutex); 428 if (p->quit) { 429 error_report("%s: channel %d has already quit!", __func__, i); 430 qemu_mutex_unlock(&p->mutex); 431 return -1; 432 } 433 if (!p->pending_job) { 434 p->pending_job++; 435 next_channel = (i + 1) % migrate_multifd_channels(); 436 break; 437 } 438 qemu_mutex_unlock(&p->mutex); 439 } 440 assert(!p->pages->used); 441 assert(!p->pages->block); 442 443 p->packet_num = multifd_send_state->packet_num++; 444 multifd_send_state->pages = p->pages; 445 p->pages = pages; 446 transferred = ((uint64_t) pages->used) * qemu_target_page_size() 447 + p->packet_len; 448 qemu_file_update_transfer(f, transferred); 449 ram_counters.multifd_bytes += transferred; 450 ram_counters.transferred += transferred;; 451 qemu_mutex_unlock(&p->mutex); 452 qemu_sem_post(&p->sem); 453 454 return 1; 455 } 456 457 int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset) 458 { 459 MultiFDPages_t *pages = multifd_send_state->pages; 460 461 if (!pages->block) { 462 pages->block = block; 463 } 464 465 if (pages->block == block) { 466 pages->offset[pages->used] = offset; 467 pages->iov[pages->used].iov_base = block->host + offset; 468 pages->iov[pages->used].iov_len = qemu_target_page_size(); 469 pages->used++; 470 471 if (pages->used < pages->allocated) { 472 return 1; 473 } 474 } 475 476 if (multifd_send_pages(f) < 0) { 477 return -1; 478 } 479 480 if (pages->block != block) { 481 return multifd_queue_page(f, block, offset); 482 } 483 484 return 1; 485 } 486 487 static void multifd_send_terminate_threads(Error *err) 488 { 489 int i; 490 491 trace_multifd_send_terminate_threads(err != NULL); 492 493 if (err) { 494 MigrationState *s = migrate_get_current(); 495 migrate_set_error(s, err); 496 if (s->state == MIGRATION_STATUS_SETUP || 497 s->state == MIGRATION_STATUS_PRE_SWITCHOVER || 498 s->state == MIGRATION_STATUS_DEVICE || 499 s->state == MIGRATION_STATUS_ACTIVE) { 500 migrate_set_state(&s->state, s->state, 501 MIGRATION_STATUS_FAILED); 502 } 503 } 504 505 /* 506 * We don't want to exit each threads twice. Depending on where 507 * we get the error, or if there are two independent errors in two 508 * threads at the same time, we can end calling this function 509 * twice. 510 */ 511 if (atomic_xchg(&multifd_send_state->exiting, 1)) { 512 return; 513 } 514 515 for (i = 0; i < migrate_multifd_channels(); i++) { 516 MultiFDSendParams *p = &multifd_send_state->params[i]; 517 518 qemu_mutex_lock(&p->mutex); 519 p->quit = true; 520 qemu_sem_post(&p->sem); 521 qemu_mutex_unlock(&p->mutex); 522 } 523 } 524 525 void multifd_save_cleanup(void) 526 { 527 int i; 528 529 if (!migrate_use_multifd()) { 530 return; 531 } 532 multifd_send_terminate_threads(NULL); 533 for (i = 0; i < migrate_multifd_channels(); i++) { 534 MultiFDSendParams *p = &multifd_send_state->params[i]; 535 536 if (p->running) { 537 qemu_thread_join(&p->thread); 538 } 539 } 540 for (i = 0; i < migrate_multifd_channels(); i++) { 541 MultiFDSendParams *p = &multifd_send_state->params[i]; 542 Error *local_err = NULL; 543 544 socket_send_channel_destroy(p->c); 545 p->c = NULL; 546 qemu_mutex_destroy(&p->mutex); 547 qemu_sem_destroy(&p->sem); 548 qemu_sem_destroy(&p->sem_sync); 549 g_free(p->name); 550 p->name = NULL; 551 multifd_pages_clear(p->pages); 552 p->pages = NULL; 553 p->packet_len = 0; 554 g_free(p->packet); 555 p->packet = NULL; 556 multifd_send_state->ops->send_cleanup(p, &local_err); 557 if (local_err) { 558 migrate_set_error(migrate_get_current(), local_err); 559 error_free(local_err); 560 } 561 } 562 qemu_sem_destroy(&multifd_send_state->channels_ready); 563 g_free(multifd_send_state->params); 564 multifd_send_state->params = NULL; 565 multifd_pages_clear(multifd_send_state->pages); 566 multifd_send_state->pages = NULL; 567 g_free(multifd_send_state); 568 multifd_send_state = NULL; 569 } 570 571 void multifd_send_sync_main(QEMUFile *f) 572 { 573 int i; 574 575 if (!migrate_use_multifd()) { 576 return; 577 } 578 if (multifd_send_state->pages->used) { 579 if (multifd_send_pages(f) < 0) { 580 error_report("%s: multifd_send_pages fail", __func__); 581 return; 582 } 583 } 584 for (i = 0; i < migrate_multifd_channels(); i++) { 585 MultiFDSendParams *p = &multifd_send_state->params[i]; 586 587 trace_multifd_send_sync_main_signal(p->id); 588 589 qemu_mutex_lock(&p->mutex); 590 591 if (p->quit) { 592 error_report("%s: channel %d has already quit", __func__, i); 593 qemu_mutex_unlock(&p->mutex); 594 return; 595 } 596 597 p->packet_num = multifd_send_state->packet_num++; 598 p->flags |= MULTIFD_FLAG_SYNC; 599 p->pending_job++; 600 qemu_file_update_transfer(f, p->packet_len); 601 ram_counters.multifd_bytes += p->packet_len; 602 ram_counters.transferred += p->packet_len; 603 qemu_mutex_unlock(&p->mutex); 604 qemu_sem_post(&p->sem); 605 } 606 for (i = 0; i < migrate_multifd_channels(); i++) { 607 MultiFDSendParams *p = &multifd_send_state->params[i]; 608 609 trace_multifd_send_sync_main_wait(p->id); 610 qemu_sem_wait(&p->sem_sync); 611 } 612 trace_multifd_send_sync_main(multifd_send_state->packet_num); 613 } 614 615 static void *multifd_send_thread(void *opaque) 616 { 617 MultiFDSendParams *p = opaque; 618 Error *local_err = NULL; 619 int ret = 0; 620 uint32_t flags = 0; 621 622 trace_multifd_send_thread_start(p->id); 623 rcu_register_thread(); 624 625 if (multifd_send_initial_packet(p, &local_err) < 0) { 626 ret = -1; 627 goto out; 628 } 629 /* initial packet */ 630 p->num_packets = 1; 631 632 while (true) { 633 qemu_sem_wait(&p->sem); 634 635 if (atomic_read(&multifd_send_state->exiting)) { 636 break; 637 } 638 qemu_mutex_lock(&p->mutex); 639 640 if (p->pending_job) { 641 uint32_t used = p->pages->used; 642 uint64_t packet_num = p->packet_num; 643 flags = p->flags; 644 645 if (used) { 646 ret = multifd_send_state->ops->send_prepare(p, used, 647 &local_err); 648 if (ret != 0) { 649 qemu_mutex_unlock(&p->mutex); 650 break; 651 } 652 } 653 multifd_send_fill_packet(p); 654 p->flags = 0; 655 p->num_packets++; 656 p->num_pages += used; 657 p->pages->used = 0; 658 p->pages->block = NULL; 659 qemu_mutex_unlock(&p->mutex); 660 661 trace_multifd_send(p->id, packet_num, used, flags, 662 p->next_packet_size); 663 664 ret = qio_channel_write_all(p->c, (void *)p->packet, 665 p->packet_len, &local_err); 666 if (ret != 0) { 667 break; 668 } 669 670 if (used) { 671 ret = multifd_send_state->ops->send_write(p, used, &local_err); 672 if (ret != 0) { 673 break; 674 } 675 } 676 677 qemu_mutex_lock(&p->mutex); 678 p->pending_job--; 679 qemu_mutex_unlock(&p->mutex); 680 681 if (flags & MULTIFD_FLAG_SYNC) { 682 qemu_sem_post(&p->sem_sync); 683 } 684 qemu_sem_post(&multifd_send_state->channels_ready); 685 } else if (p->quit) { 686 qemu_mutex_unlock(&p->mutex); 687 break; 688 } else { 689 qemu_mutex_unlock(&p->mutex); 690 /* sometimes there are spurious wakeups */ 691 } 692 } 693 694 out: 695 if (local_err) { 696 trace_multifd_send_error(p->id); 697 multifd_send_terminate_threads(local_err); 698 error_free(local_err); 699 } 700 701 /* 702 * Error happen, I will exit, but I can't just leave, tell 703 * who pay attention to me. 704 */ 705 if (ret != 0) { 706 qemu_sem_post(&p->sem_sync); 707 qemu_sem_post(&multifd_send_state->channels_ready); 708 } 709 710 qemu_mutex_lock(&p->mutex); 711 p->running = false; 712 qemu_mutex_unlock(&p->mutex); 713 714 rcu_unregister_thread(); 715 trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages); 716 717 return NULL; 718 } 719 720 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) 721 { 722 MultiFDSendParams *p = opaque; 723 QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task)); 724 Error *local_err = NULL; 725 726 trace_multifd_new_send_channel_async(p->id); 727 if (qio_task_propagate_error(task, &local_err)) { 728 migrate_set_error(migrate_get_current(), local_err); 729 /* Error happen, we need to tell who pay attention to me */ 730 qemu_sem_post(&multifd_send_state->channels_ready); 731 qemu_sem_post(&p->sem_sync); 732 /* 733 * Although multifd_send_thread is not created, but main migration 734 * thread needs to judge whether it is running, so we need to mark 735 * its status. 736 */ 737 p->quit = true; 738 object_unref(OBJECT(sioc)); 739 error_free(local_err); 740 } else { 741 p->c = QIO_CHANNEL(sioc); 742 qio_channel_set_delay(p->c, false); 743 p->running = true; 744 qemu_thread_create(&p->thread, p->name, multifd_send_thread, p, 745 QEMU_THREAD_JOINABLE); 746 } 747 } 748 749 int multifd_save_setup(Error **errp) 750 { 751 int thread_count; 752 uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); 753 uint8_t i; 754 755 if (!migrate_use_multifd()) { 756 return 0; 757 } 758 thread_count = migrate_multifd_channels(); 759 multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); 760 multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); 761 multifd_send_state->pages = multifd_pages_init(page_count); 762 qemu_sem_init(&multifd_send_state->channels_ready, 0); 763 atomic_set(&multifd_send_state->exiting, 0); 764 multifd_send_state->ops = multifd_ops[migrate_multifd_compression()]; 765 766 for (i = 0; i < thread_count; i++) { 767 MultiFDSendParams *p = &multifd_send_state->params[i]; 768 769 qemu_mutex_init(&p->mutex); 770 qemu_sem_init(&p->sem, 0); 771 qemu_sem_init(&p->sem_sync, 0); 772 p->quit = false; 773 p->pending_job = 0; 774 p->id = i; 775 p->pages = multifd_pages_init(page_count); 776 p->packet_len = sizeof(MultiFDPacket_t) 777 + sizeof(uint64_t) * page_count; 778 p->packet = g_malloc0(p->packet_len); 779 p->packet->magic = cpu_to_be32(MULTIFD_MAGIC); 780 p->packet->version = cpu_to_be32(MULTIFD_VERSION); 781 p->name = g_strdup_printf("multifdsend_%d", i); 782 socket_send_channel_create(multifd_new_send_channel_async, p); 783 } 784 785 for (i = 0; i < thread_count; i++) { 786 MultiFDSendParams *p = &multifd_send_state->params[i]; 787 Error *local_err = NULL; 788 int ret; 789 790 ret = multifd_send_state->ops->send_setup(p, &local_err); 791 if (ret) { 792 error_propagate(errp, local_err); 793 return ret; 794 } 795 } 796 return 0; 797 } 798 799 struct { 800 MultiFDRecvParams *params; 801 /* number of created threads */ 802 int count; 803 /* syncs main thread and channels */ 804 QemuSemaphore sem_sync; 805 /* global number of generated multifd packets */ 806 uint64_t packet_num; 807 /* multifd ops */ 808 MultiFDMethods *ops; 809 } *multifd_recv_state; 810 811 static void multifd_recv_terminate_threads(Error *err) 812 { 813 int i; 814 815 trace_multifd_recv_terminate_threads(err != NULL); 816 817 if (err) { 818 MigrationState *s = migrate_get_current(); 819 migrate_set_error(s, err); 820 if (s->state == MIGRATION_STATUS_SETUP || 821 s->state == MIGRATION_STATUS_ACTIVE) { 822 migrate_set_state(&s->state, s->state, 823 MIGRATION_STATUS_FAILED); 824 } 825 } 826 827 for (i = 0; i < migrate_multifd_channels(); i++) { 828 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 829 830 qemu_mutex_lock(&p->mutex); 831 p->quit = true; 832 /* 833 * We could arrive here for two reasons: 834 * - normal quit, i.e. everything went fine, just finished 835 * - error quit: We close the channels so the channel threads 836 * finish the qio_channel_read_all_eof() 837 */ 838 if (p->c) { 839 qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); 840 } 841 qemu_mutex_unlock(&p->mutex); 842 } 843 } 844 845 int multifd_load_cleanup(Error **errp) 846 { 847 int i; 848 849 if (!migrate_use_multifd()) { 850 return 0; 851 } 852 multifd_recv_terminate_threads(NULL); 853 for (i = 0; i < migrate_multifd_channels(); i++) { 854 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 855 856 if (p->running) { 857 p->quit = true; 858 /* 859 * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code, 860 * however try to wakeup it without harm in cleanup phase. 861 */ 862 qemu_sem_post(&p->sem_sync); 863 qemu_thread_join(&p->thread); 864 } 865 } 866 for (i = 0; i < migrate_multifd_channels(); i++) { 867 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 868 869 object_unref(OBJECT(p->c)); 870 p->c = NULL; 871 qemu_mutex_destroy(&p->mutex); 872 qemu_sem_destroy(&p->sem_sync); 873 g_free(p->name); 874 p->name = NULL; 875 multifd_pages_clear(p->pages); 876 p->pages = NULL; 877 p->packet_len = 0; 878 g_free(p->packet); 879 p->packet = NULL; 880 multifd_recv_state->ops->recv_cleanup(p); 881 } 882 qemu_sem_destroy(&multifd_recv_state->sem_sync); 883 g_free(multifd_recv_state->params); 884 multifd_recv_state->params = NULL; 885 g_free(multifd_recv_state); 886 multifd_recv_state = NULL; 887 888 return 0; 889 } 890 891 void multifd_recv_sync_main(void) 892 { 893 int i; 894 895 if (!migrate_use_multifd()) { 896 return; 897 } 898 for (i = 0; i < migrate_multifd_channels(); i++) { 899 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 900 901 trace_multifd_recv_sync_main_wait(p->id); 902 qemu_sem_wait(&multifd_recv_state->sem_sync); 903 } 904 for (i = 0; i < migrate_multifd_channels(); i++) { 905 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 906 907 WITH_QEMU_LOCK_GUARD(&p->mutex) { 908 if (multifd_recv_state->packet_num < p->packet_num) { 909 multifd_recv_state->packet_num = p->packet_num; 910 } 911 } 912 trace_multifd_recv_sync_main_signal(p->id); 913 qemu_sem_post(&p->sem_sync); 914 } 915 trace_multifd_recv_sync_main(multifd_recv_state->packet_num); 916 } 917 918 static void *multifd_recv_thread(void *opaque) 919 { 920 MultiFDRecvParams *p = opaque; 921 Error *local_err = NULL; 922 int ret; 923 924 trace_multifd_recv_thread_start(p->id); 925 rcu_register_thread(); 926 927 while (true) { 928 uint32_t used; 929 uint32_t flags; 930 931 if (p->quit) { 932 break; 933 } 934 935 ret = qio_channel_read_all_eof(p->c, (void *)p->packet, 936 p->packet_len, &local_err); 937 if (ret == 0) { /* EOF */ 938 break; 939 } 940 if (ret == -1) { /* Error */ 941 break; 942 } 943 944 qemu_mutex_lock(&p->mutex); 945 ret = multifd_recv_unfill_packet(p, &local_err); 946 if (ret) { 947 qemu_mutex_unlock(&p->mutex); 948 break; 949 } 950 951 used = p->pages->used; 952 flags = p->flags; 953 /* recv methods don't know how to handle the SYNC flag */ 954 p->flags &= ~MULTIFD_FLAG_SYNC; 955 trace_multifd_recv(p->id, p->packet_num, used, flags, 956 p->next_packet_size); 957 p->num_packets++; 958 p->num_pages += used; 959 qemu_mutex_unlock(&p->mutex); 960 961 if (used) { 962 ret = multifd_recv_state->ops->recv_pages(p, used, &local_err); 963 if (ret != 0) { 964 break; 965 } 966 } 967 968 if (flags & MULTIFD_FLAG_SYNC) { 969 qemu_sem_post(&multifd_recv_state->sem_sync); 970 qemu_sem_wait(&p->sem_sync); 971 } 972 } 973 974 if (local_err) { 975 multifd_recv_terminate_threads(local_err); 976 error_free(local_err); 977 } 978 qemu_mutex_lock(&p->mutex); 979 p->running = false; 980 qemu_mutex_unlock(&p->mutex); 981 982 rcu_unregister_thread(); 983 trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages); 984 985 return NULL; 986 } 987 988 int multifd_load_setup(Error **errp) 989 { 990 int thread_count; 991 uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); 992 uint8_t i; 993 994 if (!migrate_use_multifd()) { 995 return 0; 996 } 997 thread_count = migrate_multifd_channels(); 998 multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); 999 multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); 1000 atomic_set(&multifd_recv_state->count, 0); 1001 qemu_sem_init(&multifd_recv_state->sem_sync, 0); 1002 multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()]; 1003 1004 for (i = 0; i < thread_count; i++) { 1005 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1006 1007 qemu_mutex_init(&p->mutex); 1008 qemu_sem_init(&p->sem_sync, 0); 1009 p->quit = false; 1010 p->id = i; 1011 p->pages = multifd_pages_init(page_count); 1012 p->packet_len = sizeof(MultiFDPacket_t) 1013 + sizeof(uint64_t) * page_count; 1014 p->packet = g_malloc0(p->packet_len); 1015 p->name = g_strdup_printf("multifdrecv_%d", i); 1016 } 1017 1018 for (i = 0; i < thread_count; i++) { 1019 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 1020 Error *local_err = NULL; 1021 int ret; 1022 1023 ret = multifd_recv_state->ops->recv_setup(p, &local_err); 1024 if (ret) { 1025 error_propagate(errp, local_err); 1026 return ret; 1027 } 1028 } 1029 return 0; 1030 } 1031 1032 bool multifd_recv_all_channels_created(void) 1033 { 1034 int thread_count = migrate_multifd_channels(); 1035 1036 if (!migrate_use_multifd()) { 1037 return true; 1038 } 1039 1040 return thread_count == atomic_read(&multifd_recv_state->count); 1041 } 1042 1043 /* 1044 * Try to receive all multifd channels to get ready for the migration. 1045 * - Return true and do not set @errp when correctly receiving all channels; 1046 * - Return false and do not set @errp when correctly receiving the current one; 1047 * - Return false and set @errp when failing to receive the current channel. 1048 */ 1049 bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp) 1050 { 1051 MultiFDRecvParams *p; 1052 Error *local_err = NULL; 1053 int id; 1054 1055 id = multifd_recv_initial_packet(ioc, &local_err); 1056 if (id < 0) { 1057 multifd_recv_terminate_threads(local_err); 1058 error_propagate_prepend(errp, local_err, 1059 "failed to receive packet" 1060 " via multifd channel %d: ", 1061 atomic_read(&multifd_recv_state->count)); 1062 return false; 1063 } 1064 trace_multifd_recv_new_channel(id); 1065 1066 p = &multifd_recv_state->params[id]; 1067 if (p->c != NULL) { 1068 error_setg(&local_err, "multifd: received id '%d' already setup'", 1069 id); 1070 multifd_recv_terminate_threads(local_err); 1071 error_propagate(errp, local_err); 1072 return false; 1073 } 1074 p->c = ioc; 1075 object_ref(OBJECT(ioc)); 1076 /* initial packet */ 1077 p->num_packets = 1; 1078 1079 p->running = true; 1080 qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, 1081 QEMU_THREAD_JOINABLE); 1082 atomic_inc(&multifd_recv_state->count); 1083 return atomic_read(&multifd_recv_state->count) == 1084 migrate_multifd_channels(); 1085 } 1086