1 /* 2 * Multifd common code 3 * 4 * Copyright (c) 2019-2020 Red Hat Inc 5 * 6 * Authors: 7 * Juan Quintela <quintela@redhat.com> 8 * 9 * This work is licensed under the terms of the GNU GPL, version 2 or later. 10 * See the COPYING file in the top-level directory. 11 */ 12 13 #include "qemu/osdep.h" 14 #include "qemu/rcu.h" 15 #include "exec/target_page.h" 16 #include "sysemu/sysemu.h" 17 #include "exec/ramblock.h" 18 #include "qemu/error-report.h" 19 #include "qapi/error.h" 20 #include "ram.h" 21 #include "migration.h" 22 #include "socket.h" 23 #include "qemu-file.h" 24 #include "trace.h" 25 #include "multifd.h" 26 27 /* Multiple fd's */ 28 29 #define MULTIFD_MAGIC 0x11223344U 30 #define MULTIFD_VERSION 1 31 32 typedef struct { 33 uint32_t magic; 34 uint32_t version; 35 unsigned char uuid[16]; /* QemuUUID */ 36 uint8_t id; 37 uint8_t unused1[7]; /* Reserved for future use */ 38 uint64_t unused2[4]; /* Reserved for future use */ 39 } __attribute__((packed)) MultiFDInit_t; 40 41 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) 42 { 43 MultiFDInit_t msg = {}; 44 int ret; 45 46 msg.magic = cpu_to_be32(MULTIFD_MAGIC); 47 msg.version = cpu_to_be32(MULTIFD_VERSION); 48 msg.id = p->id; 49 memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid)); 50 51 ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp); 52 if (ret != 0) { 53 return -1; 54 } 55 return 0; 56 } 57 58 static int multifd_recv_initial_packet(QIOChannel *c, Error **errp) 59 { 60 MultiFDInit_t msg; 61 int ret; 62 63 ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp); 64 if (ret != 0) { 65 return -1; 66 } 67 68 msg.magic = be32_to_cpu(msg.magic); 69 msg.version = be32_to_cpu(msg.version); 70 71 if (msg.magic != MULTIFD_MAGIC) { 72 error_setg(errp, "multifd: received packet magic %x " 73 "expected %x", msg.magic, MULTIFD_MAGIC); 74 return -1; 75 } 76 77 if (msg.version != MULTIFD_VERSION) { 78 error_setg(errp, "multifd: received packet version %d " 79 "expected %d", msg.version, MULTIFD_VERSION); 80 return -1; 81 } 82 83 if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) { 84 char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid); 85 char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid); 86 87 error_setg(errp, "multifd: received uuid '%s' and expected " 88 "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id); 89 g_free(uuid); 90 g_free(msg_uuid); 91 return -1; 92 } 93 94 if (msg.id > migrate_multifd_channels()) { 95 error_setg(errp, "multifd: received channel version %d " 96 "expected %d", msg.version, MULTIFD_VERSION); 97 return -1; 98 } 99 100 return msg.id; 101 } 102 103 static MultiFDPages_t *multifd_pages_init(size_t size) 104 { 105 MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1); 106 107 pages->allocated = size; 108 pages->iov = g_new0(struct iovec, size); 109 pages->offset = g_new0(ram_addr_t, size); 110 111 return pages; 112 } 113 114 static void multifd_pages_clear(MultiFDPages_t *pages) 115 { 116 pages->used = 0; 117 pages->allocated = 0; 118 pages->packet_num = 0; 119 pages->block = NULL; 120 g_free(pages->iov); 121 pages->iov = NULL; 122 g_free(pages->offset); 123 pages->offset = NULL; 124 g_free(pages); 125 } 126 127 static void multifd_send_fill_packet(MultiFDSendParams *p) 128 { 129 MultiFDPacket_t *packet = p->packet; 130 int i; 131 132 packet->flags = cpu_to_be32(p->flags); 133 packet->pages_alloc = cpu_to_be32(p->pages->allocated); 134 packet->pages_used = cpu_to_be32(p->pages->used); 135 packet->next_packet_size = cpu_to_be32(p->next_packet_size); 136 packet->packet_num = cpu_to_be64(p->packet_num); 137 138 if (p->pages->block) { 139 strncpy(packet->ramblock, p->pages->block->idstr, 256); 140 } 141 142 for (i = 0; i < p->pages->used; i++) { 143 /* there are architectures where ram_addr_t is 32 bit */ 144 uint64_t temp = p->pages->offset[i]; 145 146 packet->offset[i] = cpu_to_be64(temp); 147 } 148 } 149 150 static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) 151 { 152 MultiFDPacket_t *packet = p->packet; 153 uint32_t pages_max = MULTIFD_PACKET_SIZE / qemu_target_page_size(); 154 RAMBlock *block; 155 int i; 156 157 packet->magic = be32_to_cpu(packet->magic); 158 if (packet->magic != MULTIFD_MAGIC) { 159 error_setg(errp, "multifd: received packet " 160 "magic %x and expected magic %x", 161 packet->magic, MULTIFD_MAGIC); 162 return -1; 163 } 164 165 packet->version = be32_to_cpu(packet->version); 166 if (packet->version != MULTIFD_VERSION) { 167 error_setg(errp, "multifd: received packet " 168 "version %d and expected version %d", 169 packet->version, MULTIFD_VERSION); 170 return -1; 171 } 172 173 p->flags = be32_to_cpu(packet->flags); 174 175 packet->pages_alloc = be32_to_cpu(packet->pages_alloc); 176 /* 177 * If we received a packet that is 100 times bigger than expected 178 * just stop migration. It is a magic number. 179 */ 180 if (packet->pages_alloc > pages_max * 100) { 181 error_setg(errp, "multifd: received packet " 182 "with size %d and expected a maximum size of %d", 183 packet->pages_alloc, pages_max * 100) ; 184 return -1; 185 } 186 /* 187 * We received a packet that is bigger than expected but inside 188 * reasonable limits (see previous comment). Just reallocate. 189 */ 190 if (packet->pages_alloc > p->pages->allocated) { 191 multifd_pages_clear(p->pages); 192 p->pages = multifd_pages_init(packet->pages_alloc); 193 } 194 195 p->pages->used = be32_to_cpu(packet->pages_used); 196 if (p->pages->used > packet->pages_alloc) { 197 error_setg(errp, "multifd: received packet " 198 "with %d pages and expected maximum pages are %d", 199 p->pages->used, packet->pages_alloc) ; 200 return -1; 201 } 202 203 p->next_packet_size = be32_to_cpu(packet->next_packet_size); 204 p->packet_num = be64_to_cpu(packet->packet_num); 205 206 if (p->pages->used == 0) { 207 return 0; 208 } 209 210 /* make sure that ramblock is 0 terminated */ 211 packet->ramblock[255] = 0; 212 block = qemu_ram_block_by_name(packet->ramblock); 213 if (!block) { 214 error_setg(errp, "multifd: unknown ram block %s", 215 packet->ramblock); 216 return -1; 217 } 218 219 for (i = 0; i < p->pages->used; i++) { 220 uint64_t offset = be64_to_cpu(packet->offset[i]); 221 222 if (offset > (block->used_length - qemu_target_page_size())) { 223 error_setg(errp, "multifd: offset too long %" PRIu64 224 " (max " RAM_ADDR_FMT ")", 225 offset, block->max_length); 226 return -1; 227 } 228 p->pages->iov[i].iov_base = block->host + offset; 229 p->pages->iov[i].iov_len = qemu_target_page_size(); 230 } 231 232 return 0; 233 } 234 235 struct { 236 MultiFDSendParams *params; 237 /* array of pages to sent */ 238 MultiFDPages_t *pages; 239 /* global number of generated multifd packets */ 240 uint64_t packet_num; 241 /* send channels ready */ 242 QemuSemaphore channels_ready; 243 /* 244 * Have we already run terminate threads. There is a race when it 245 * happens that we got one error while we are exiting. 246 * We will use atomic operations. Only valid values are 0 and 1. 247 */ 248 int exiting; 249 } *multifd_send_state; 250 251 /* 252 * How we use multifd_send_state->pages and channel->pages? 253 * 254 * We create a pages for each channel, and a main one. Each time that 255 * we need to send a batch of pages we interchange the ones between 256 * multifd_send_state and the channel that is sending it. There are 257 * two reasons for that: 258 * - to not have to do so many mallocs during migration 259 * - to make easier to know what to free at the end of migration 260 * 261 * This way we always know who is the owner of each "pages" struct, 262 * and we don't need any locking. It belongs to the migration thread 263 * or to the channel thread. Switching is safe because the migration 264 * thread is using the channel mutex when changing it, and the channel 265 * have to had finish with its own, otherwise pending_job can't be 266 * false. 267 */ 268 269 static int multifd_send_pages(QEMUFile *f) 270 { 271 int i; 272 static int next_channel; 273 MultiFDSendParams *p = NULL; /* make happy gcc */ 274 MultiFDPages_t *pages = multifd_send_state->pages; 275 uint64_t transferred; 276 277 if (atomic_read(&multifd_send_state->exiting)) { 278 return -1; 279 } 280 281 qemu_sem_wait(&multifd_send_state->channels_ready); 282 for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { 283 p = &multifd_send_state->params[i]; 284 285 qemu_mutex_lock(&p->mutex); 286 if (p->quit) { 287 error_report("%s: channel %d has already quit!", __func__, i); 288 qemu_mutex_unlock(&p->mutex); 289 return -1; 290 } 291 if (!p->pending_job) { 292 p->pending_job++; 293 next_channel = (i + 1) % migrate_multifd_channels(); 294 break; 295 } 296 qemu_mutex_unlock(&p->mutex); 297 } 298 assert(!p->pages->used); 299 assert(!p->pages->block); 300 301 p->packet_num = multifd_send_state->packet_num++; 302 multifd_send_state->pages = p->pages; 303 p->pages = pages; 304 transferred = ((uint64_t) pages->used) * qemu_target_page_size() 305 + p->packet_len; 306 qemu_file_update_transfer(f, transferred); 307 ram_counters.multifd_bytes += transferred; 308 ram_counters.transferred += transferred;; 309 qemu_mutex_unlock(&p->mutex); 310 qemu_sem_post(&p->sem); 311 312 return 1; 313 } 314 315 int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset) 316 { 317 MultiFDPages_t *pages = multifd_send_state->pages; 318 319 if (!pages->block) { 320 pages->block = block; 321 } 322 323 if (pages->block == block) { 324 pages->offset[pages->used] = offset; 325 pages->iov[pages->used].iov_base = block->host + offset; 326 pages->iov[pages->used].iov_len = qemu_target_page_size(); 327 pages->used++; 328 329 if (pages->used < pages->allocated) { 330 return 1; 331 } 332 } 333 334 if (multifd_send_pages(f) < 0) { 335 return -1; 336 } 337 338 if (pages->block != block) { 339 return multifd_queue_page(f, block, offset); 340 } 341 342 return 1; 343 } 344 345 static void multifd_send_terminate_threads(Error *err) 346 { 347 int i; 348 349 trace_multifd_send_terminate_threads(err != NULL); 350 351 if (err) { 352 MigrationState *s = migrate_get_current(); 353 migrate_set_error(s, err); 354 if (s->state == MIGRATION_STATUS_SETUP || 355 s->state == MIGRATION_STATUS_PRE_SWITCHOVER || 356 s->state == MIGRATION_STATUS_DEVICE || 357 s->state == MIGRATION_STATUS_ACTIVE) { 358 migrate_set_state(&s->state, s->state, 359 MIGRATION_STATUS_FAILED); 360 } 361 } 362 363 /* 364 * We don't want to exit each threads twice. Depending on where 365 * we get the error, or if there are two independent errors in two 366 * threads at the same time, we can end calling this function 367 * twice. 368 */ 369 if (atomic_xchg(&multifd_send_state->exiting, 1)) { 370 return; 371 } 372 373 for (i = 0; i < migrate_multifd_channels(); i++) { 374 MultiFDSendParams *p = &multifd_send_state->params[i]; 375 376 qemu_mutex_lock(&p->mutex); 377 p->quit = true; 378 qemu_sem_post(&p->sem); 379 qemu_mutex_unlock(&p->mutex); 380 } 381 } 382 383 void multifd_save_cleanup(void) 384 { 385 int i; 386 387 if (!migrate_use_multifd()) { 388 return; 389 } 390 multifd_send_terminate_threads(NULL); 391 for (i = 0; i < migrate_multifd_channels(); i++) { 392 MultiFDSendParams *p = &multifd_send_state->params[i]; 393 394 if (p->running) { 395 qemu_thread_join(&p->thread); 396 } 397 } 398 for (i = 0; i < migrate_multifd_channels(); i++) { 399 MultiFDSendParams *p = &multifd_send_state->params[i]; 400 401 socket_send_channel_destroy(p->c); 402 p->c = NULL; 403 qemu_mutex_destroy(&p->mutex); 404 qemu_sem_destroy(&p->sem); 405 qemu_sem_destroy(&p->sem_sync); 406 g_free(p->name); 407 p->name = NULL; 408 multifd_pages_clear(p->pages); 409 p->pages = NULL; 410 p->packet_len = 0; 411 g_free(p->packet); 412 p->packet = NULL; 413 } 414 qemu_sem_destroy(&multifd_send_state->channels_ready); 415 g_free(multifd_send_state->params); 416 multifd_send_state->params = NULL; 417 multifd_pages_clear(multifd_send_state->pages); 418 multifd_send_state->pages = NULL; 419 g_free(multifd_send_state); 420 multifd_send_state = NULL; 421 } 422 423 void multifd_send_sync_main(QEMUFile *f) 424 { 425 int i; 426 427 if (!migrate_use_multifd()) { 428 return; 429 } 430 if (multifd_send_state->pages->used) { 431 if (multifd_send_pages(f) < 0) { 432 error_report("%s: multifd_send_pages fail", __func__); 433 return; 434 } 435 } 436 for (i = 0; i < migrate_multifd_channels(); i++) { 437 MultiFDSendParams *p = &multifd_send_state->params[i]; 438 439 trace_multifd_send_sync_main_signal(p->id); 440 441 qemu_mutex_lock(&p->mutex); 442 443 if (p->quit) { 444 error_report("%s: channel %d has already quit", __func__, i); 445 qemu_mutex_unlock(&p->mutex); 446 return; 447 } 448 449 p->packet_num = multifd_send_state->packet_num++; 450 p->flags |= MULTIFD_FLAG_SYNC; 451 p->pending_job++; 452 qemu_file_update_transfer(f, p->packet_len); 453 ram_counters.multifd_bytes += p->packet_len; 454 ram_counters.transferred += p->packet_len; 455 qemu_mutex_unlock(&p->mutex); 456 qemu_sem_post(&p->sem); 457 } 458 for (i = 0; i < migrate_multifd_channels(); i++) { 459 MultiFDSendParams *p = &multifd_send_state->params[i]; 460 461 trace_multifd_send_sync_main_wait(p->id); 462 qemu_sem_wait(&p->sem_sync); 463 } 464 trace_multifd_send_sync_main(multifd_send_state->packet_num); 465 } 466 467 static void *multifd_send_thread(void *opaque) 468 { 469 MultiFDSendParams *p = opaque; 470 Error *local_err = NULL; 471 int ret = 0; 472 uint32_t flags = 0; 473 474 trace_multifd_send_thread_start(p->id); 475 rcu_register_thread(); 476 477 if (multifd_send_initial_packet(p, &local_err) < 0) { 478 ret = -1; 479 goto out; 480 } 481 /* initial packet */ 482 p->num_packets = 1; 483 484 while (true) { 485 qemu_sem_wait(&p->sem); 486 487 if (atomic_read(&multifd_send_state->exiting)) { 488 break; 489 } 490 qemu_mutex_lock(&p->mutex); 491 492 if (p->pending_job) { 493 uint32_t used = p->pages->used; 494 uint64_t packet_num = p->packet_num; 495 flags = p->flags; 496 497 p->next_packet_size = used * qemu_target_page_size(); 498 multifd_send_fill_packet(p); 499 p->flags = 0; 500 p->num_packets++; 501 p->num_pages += used; 502 p->pages->used = 0; 503 p->pages->block = NULL; 504 qemu_mutex_unlock(&p->mutex); 505 506 trace_multifd_send(p->id, packet_num, used, flags, 507 p->next_packet_size); 508 509 ret = qio_channel_write_all(p->c, (void *)p->packet, 510 p->packet_len, &local_err); 511 if (ret != 0) { 512 break; 513 } 514 515 if (used) { 516 ret = qio_channel_writev_all(p->c, p->pages->iov, 517 used, &local_err); 518 if (ret != 0) { 519 break; 520 } 521 } 522 523 qemu_mutex_lock(&p->mutex); 524 p->pending_job--; 525 qemu_mutex_unlock(&p->mutex); 526 527 if (flags & MULTIFD_FLAG_SYNC) { 528 qemu_sem_post(&p->sem_sync); 529 } 530 qemu_sem_post(&multifd_send_state->channels_ready); 531 } else if (p->quit) { 532 qemu_mutex_unlock(&p->mutex); 533 break; 534 } else { 535 qemu_mutex_unlock(&p->mutex); 536 /* sometimes there are spurious wakeups */ 537 } 538 } 539 540 out: 541 if (local_err) { 542 trace_multifd_send_error(p->id); 543 multifd_send_terminate_threads(local_err); 544 } 545 546 /* 547 * Error happen, I will exit, but I can't just leave, tell 548 * who pay attention to me. 549 */ 550 if (ret != 0) { 551 qemu_sem_post(&p->sem_sync); 552 qemu_sem_post(&multifd_send_state->channels_ready); 553 } 554 555 qemu_mutex_lock(&p->mutex); 556 p->running = false; 557 qemu_mutex_unlock(&p->mutex); 558 559 rcu_unregister_thread(); 560 trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages); 561 562 return NULL; 563 } 564 565 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) 566 { 567 MultiFDSendParams *p = opaque; 568 QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task)); 569 Error *local_err = NULL; 570 571 trace_multifd_new_send_channel_async(p->id); 572 if (qio_task_propagate_error(task, &local_err)) { 573 migrate_set_error(migrate_get_current(), local_err); 574 /* Error happen, we need to tell who pay attention to me */ 575 qemu_sem_post(&multifd_send_state->channels_ready); 576 qemu_sem_post(&p->sem_sync); 577 /* 578 * Although multifd_send_thread is not created, but main migration 579 * thread neet to judge whether it is running, so we need to mark 580 * its status. 581 */ 582 p->quit = true; 583 } else { 584 p->c = QIO_CHANNEL(sioc); 585 qio_channel_set_delay(p->c, false); 586 p->running = true; 587 qemu_thread_create(&p->thread, p->name, multifd_send_thread, p, 588 QEMU_THREAD_JOINABLE); 589 } 590 } 591 592 int multifd_save_setup(Error **errp) 593 { 594 int thread_count; 595 uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); 596 uint8_t i; 597 598 if (!migrate_use_multifd()) { 599 return 0; 600 } 601 thread_count = migrate_multifd_channels(); 602 multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); 603 multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); 604 multifd_send_state->pages = multifd_pages_init(page_count); 605 qemu_sem_init(&multifd_send_state->channels_ready, 0); 606 atomic_set(&multifd_send_state->exiting, 0); 607 608 for (i = 0; i < thread_count; i++) { 609 MultiFDSendParams *p = &multifd_send_state->params[i]; 610 611 qemu_mutex_init(&p->mutex); 612 qemu_sem_init(&p->sem, 0); 613 qemu_sem_init(&p->sem_sync, 0); 614 p->quit = false; 615 p->pending_job = 0; 616 p->id = i; 617 p->pages = multifd_pages_init(page_count); 618 p->packet_len = sizeof(MultiFDPacket_t) 619 + sizeof(uint64_t) * page_count; 620 p->packet = g_malloc0(p->packet_len); 621 p->packet->magic = cpu_to_be32(MULTIFD_MAGIC); 622 p->packet->version = cpu_to_be32(MULTIFD_VERSION); 623 p->name = g_strdup_printf("multifdsend_%d", i); 624 socket_send_channel_create(multifd_new_send_channel_async, p); 625 } 626 return 0; 627 } 628 629 struct { 630 MultiFDRecvParams *params; 631 /* number of created threads */ 632 int count; 633 /* syncs main thread and channels */ 634 QemuSemaphore sem_sync; 635 /* global number of generated multifd packets */ 636 uint64_t packet_num; 637 } *multifd_recv_state; 638 639 static void multifd_recv_terminate_threads(Error *err) 640 { 641 int i; 642 643 trace_multifd_recv_terminate_threads(err != NULL); 644 645 if (err) { 646 MigrationState *s = migrate_get_current(); 647 migrate_set_error(s, err); 648 if (s->state == MIGRATION_STATUS_SETUP || 649 s->state == MIGRATION_STATUS_ACTIVE) { 650 migrate_set_state(&s->state, s->state, 651 MIGRATION_STATUS_FAILED); 652 } 653 } 654 655 for (i = 0; i < migrate_multifd_channels(); i++) { 656 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 657 658 qemu_mutex_lock(&p->mutex); 659 p->quit = true; 660 /* 661 * We could arrive here for two reasons: 662 * - normal quit, i.e. everything went fine, just finished 663 * - error quit: We close the channels so the channel threads 664 * finish the qio_channel_read_all_eof() 665 */ 666 if (p->c) { 667 qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); 668 } 669 qemu_mutex_unlock(&p->mutex); 670 } 671 } 672 673 int multifd_load_cleanup(Error **errp) 674 { 675 int i; 676 int ret = 0; 677 678 if (!migrate_use_multifd()) { 679 return 0; 680 } 681 multifd_recv_terminate_threads(NULL); 682 for (i = 0; i < migrate_multifd_channels(); i++) { 683 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 684 685 if (p->running) { 686 p->quit = true; 687 /* 688 * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code, 689 * however try to wakeup it without harm in cleanup phase. 690 */ 691 qemu_sem_post(&p->sem_sync); 692 qemu_thread_join(&p->thread); 693 } 694 } 695 for (i = 0; i < migrate_multifd_channels(); i++) { 696 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 697 698 object_unref(OBJECT(p->c)); 699 p->c = NULL; 700 qemu_mutex_destroy(&p->mutex); 701 qemu_sem_destroy(&p->sem_sync); 702 g_free(p->name); 703 p->name = NULL; 704 multifd_pages_clear(p->pages); 705 p->pages = NULL; 706 p->packet_len = 0; 707 g_free(p->packet); 708 p->packet = NULL; 709 } 710 qemu_sem_destroy(&multifd_recv_state->sem_sync); 711 g_free(multifd_recv_state->params); 712 multifd_recv_state->params = NULL; 713 g_free(multifd_recv_state); 714 multifd_recv_state = NULL; 715 716 return ret; 717 } 718 719 void multifd_recv_sync_main(void) 720 { 721 int i; 722 723 if (!migrate_use_multifd()) { 724 return; 725 } 726 for (i = 0; i < migrate_multifd_channels(); i++) { 727 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 728 729 trace_multifd_recv_sync_main_wait(p->id); 730 qemu_sem_wait(&multifd_recv_state->sem_sync); 731 } 732 for (i = 0; i < migrate_multifd_channels(); i++) { 733 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 734 735 qemu_mutex_lock(&p->mutex); 736 if (multifd_recv_state->packet_num < p->packet_num) { 737 multifd_recv_state->packet_num = p->packet_num; 738 } 739 qemu_mutex_unlock(&p->mutex); 740 trace_multifd_recv_sync_main_signal(p->id); 741 qemu_sem_post(&p->sem_sync); 742 } 743 trace_multifd_recv_sync_main(multifd_recv_state->packet_num); 744 } 745 746 static void *multifd_recv_thread(void *opaque) 747 { 748 MultiFDRecvParams *p = opaque; 749 Error *local_err = NULL; 750 int ret; 751 752 trace_multifd_recv_thread_start(p->id); 753 rcu_register_thread(); 754 755 while (true) { 756 uint32_t used; 757 uint32_t flags; 758 759 if (p->quit) { 760 break; 761 } 762 763 ret = qio_channel_read_all_eof(p->c, (void *)p->packet, 764 p->packet_len, &local_err); 765 if (ret == 0) { /* EOF */ 766 break; 767 } 768 if (ret == -1) { /* Error */ 769 break; 770 } 771 772 qemu_mutex_lock(&p->mutex); 773 ret = multifd_recv_unfill_packet(p, &local_err); 774 if (ret) { 775 qemu_mutex_unlock(&p->mutex); 776 break; 777 } 778 779 used = p->pages->used; 780 flags = p->flags; 781 trace_multifd_recv(p->id, p->packet_num, used, flags, 782 p->next_packet_size); 783 p->num_packets++; 784 p->num_pages += used; 785 qemu_mutex_unlock(&p->mutex); 786 787 if (used) { 788 ret = qio_channel_readv_all(p->c, p->pages->iov, 789 used, &local_err); 790 if (ret != 0) { 791 break; 792 } 793 } 794 795 if (flags & MULTIFD_FLAG_SYNC) { 796 qemu_sem_post(&multifd_recv_state->sem_sync); 797 qemu_sem_wait(&p->sem_sync); 798 } 799 } 800 801 if (local_err) { 802 multifd_recv_terminate_threads(local_err); 803 } 804 qemu_mutex_lock(&p->mutex); 805 p->running = false; 806 qemu_mutex_unlock(&p->mutex); 807 808 rcu_unregister_thread(); 809 trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages); 810 811 return NULL; 812 } 813 814 int multifd_load_setup(Error **errp) 815 { 816 int thread_count; 817 uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); 818 uint8_t i; 819 820 if (!migrate_use_multifd()) { 821 return 0; 822 } 823 thread_count = migrate_multifd_channels(); 824 multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); 825 multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); 826 atomic_set(&multifd_recv_state->count, 0); 827 qemu_sem_init(&multifd_recv_state->sem_sync, 0); 828 829 for (i = 0; i < thread_count; i++) { 830 MultiFDRecvParams *p = &multifd_recv_state->params[i]; 831 832 qemu_mutex_init(&p->mutex); 833 qemu_sem_init(&p->sem_sync, 0); 834 p->quit = false; 835 p->id = i; 836 p->pages = multifd_pages_init(page_count); 837 p->packet_len = sizeof(MultiFDPacket_t) 838 + sizeof(uint64_t) * page_count; 839 p->packet = g_malloc0(p->packet_len); 840 p->name = g_strdup_printf("multifdrecv_%d", i); 841 } 842 return 0; 843 } 844 845 bool multifd_recv_all_channels_created(void) 846 { 847 int thread_count = migrate_multifd_channels(); 848 849 if (!migrate_use_multifd()) { 850 return true; 851 } 852 853 return thread_count == atomic_read(&multifd_recv_state->count); 854 } 855 856 /* 857 * Try to receive all multifd channels to get ready for the migration. 858 * - Return true and do not set @errp when correctly receving all channels; 859 * - Return false and do not set @errp when correctly receiving the current one; 860 * - Return false and set @errp when failing to receive the current channel. 861 */ 862 bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp) 863 { 864 MultiFDRecvParams *p; 865 Error *local_err = NULL; 866 int id; 867 868 id = multifd_recv_initial_packet(ioc, &local_err); 869 if (id < 0) { 870 multifd_recv_terminate_threads(local_err); 871 error_propagate_prepend(errp, local_err, 872 "failed to receive packet" 873 " via multifd channel %d: ", 874 atomic_read(&multifd_recv_state->count)); 875 return false; 876 } 877 trace_multifd_recv_new_channel(id); 878 879 p = &multifd_recv_state->params[id]; 880 if (p->c != NULL) { 881 error_setg(&local_err, "multifd: received id '%d' already setup'", 882 id); 883 multifd_recv_terminate_threads(local_err); 884 error_propagate(errp, local_err); 885 return false; 886 } 887 p->c = ioc; 888 object_ref(OBJECT(ioc)); 889 /* initial packet */ 890 p->num_packets = 1; 891 892 p->running = true; 893 qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, 894 QEMU_THREAD_JOINABLE); 895 atomic_inc(&multifd_recv_state->count); 896 return atomic_read(&multifd_recv_state->count) == 897 migrate_multifd_channels(); 898 } 899 900