/*
 * QEMU live migration
 *
 * Copyright IBM, Corp. 2008
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */

#include "qemu/osdep.h"
#include "qemu/cutils.h"
#include "qemu/error-report.h"
#include "qemu/main-loop.h"
#include "migration/migration.h"
#include "migration/qemu-file.h"
#include "sysemu/sysemu.h"
#include "block/block.h"
#include "qapi/qmp/qerror.h"
#include "qapi/util.h"
#include "qemu/sockets.h"
#include "qemu/rcu.h"
#include "migration/block.h"
#include "migration/postcopy-ram.h"
#include "qemu/thread.h"
#include "qmp-commands.h"
#include "trace.h"
#include "qapi-event.h"
#include "qom/cpu.h"
#include "exec/memory.h"
#include "exec/address-spaces.h"
#include "io/channel-buffer.h"
#include "io/channel-tls.h"

#define MAX_THROTTLE  (32 << 20)      /* Migration transfer speed throttling */

/* Amount of time to allocate to each "chunk" of bandwidth-throttled
 * data. */
#define BUFFER_DELAY     100
#define XFER_LIMIT_RATIO (1000 / BUFFER_DELAY)
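/*
 * Worked example of the throttling above: XFER_LIMIT_RATIO is
 * 1000 / 100 = 10 windows per second, and migrate_fd_connect() arms the
 * rate limiter with bandwidth_limit / XFER_LIMIT_RATIO bytes per window.
 * With the default MAX_THROTTLE of 32 MiB/s that is roughly 3.2 MiB per
 * 100 ms window; once a window's budget is spent, migration_thread()
 * sleeps until the window ends.
 */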

/* Default compression thread count */
#define DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT 8
/* Default decompression thread count, usually decompression is at
 * least 4 times as fast as compression.*/
#define DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT 2
/* 0: no compression, 1: best speed, ..., 9: best compression ratio */
#define DEFAULT_MIGRATE_COMPRESS_LEVEL 1
/* Define default autoconverge cpu throttle migration parameters */
#define DEFAULT_MIGRATE_CPU_THROTTLE_INITIAL 20
#define DEFAULT_MIGRATE_CPU_THROTTLE_INCREMENT 10

/* Migration XBZRLE default cache size */
#define DEFAULT_MIGRATE_CACHE_SIZE (64 * 1024 * 1024)

static NotifierList migration_state_notifiers =
    NOTIFIER_LIST_INITIALIZER(migration_state_notifiers);

static bool deferred_incoming;

/*
 * Current state of incoming postcopy; note this is not part of
 * MigrationIncomingState since its state is used during cleanup
 * at the end as MIS is being freed.
 */
static PostcopyState incoming_postcopy_state;

/* When we add fault tolerance, we could have several
   migrations at once.  For now we don't need to add
   dynamic creation of migration */

/* For outgoing */
MigrationState *migrate_get_current(void)
{
    static bool once;
    static MigrationState current_migration = {
        .state = MIGRATION_STATUS_NONE,
        .bandwidth_limit = MAX_THROTTLE,
        .xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE,
        .mbps = -1,
        .parameters = {
            .compress_level = DEFAULT_MIGRATE_COMPRESS_LEVEL,
            .compress_threads = DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT,
            .decompress_threads = DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT,
            .cpu_throttle_initial = DEFAULT_MIGRATE_CPU_THROTTLE_INITIAL,
            .cpu_throttle_increment = DEFAULT_MIGRATE_CPU_THROTTLE_INCREMENT,
        },
    };

    if (!once) {
        qemu_mutex_init(&current_migration.src_page_req_mutex);
        once = true;
    }
    return &current_migration;
}

/* For incoming */
static MigrationIncomingState *mis_current;

MigrationIncomingState *migration_incoming_get_current(void)
{
    return mis_current;
}

MigrationIncomingState *migration_incoming_state_new(QEMUFile* f)
{
    mis_current = g_new0(MigrationIncomingState, 1);
    mis_current->from_src_file = f;
    mis_current->state = MIGRATION_STATUS_NONE;
    QLIST_INIT(&mis_current->loadvm_handlers);
    qemu_mutex_init(&mis_current->rp_mutex);
    qemu_event_init(&mis_current->main_thread_load_event, false);

    return mis_current;
}

void migration_incoming_state_destroy(void)
{
    qemu_event_destroy(&mis_current->main_thread_load_event);
    loadvm_free_handlers(mis_current);
    g_free(mis_current);
    mis_current = NULL;
}

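/*
 * The "globalstate" section below carries the source's run state (by its
 * string name) to the destination, which uses it in
 * process_incoming_migration_bh() to decide whether to auto-start the
 * guest or to restore the original run state.
 */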
typedef struct {
    bool optional;
    uint32_t size;
    uint8_t runstate[100];
    RunState state;
    bool received;
} GlobalState;

static GlobalState global_state;

int global_state_store(void)
{
    if (!runstate_store((char *)global_state.runstate,
                        sizeof(global_state.runstate))) {
        error_report("runstate name too big: %s", global_state.runstate);
        trace_migrate_state_too_big();
        return -EINVAL;
    }
    return 0;
}

void global_state_store_running(void)
{
    const char *state = RunState_lookup[RUN_STATE_RUNNING];
    strncpy((char *)global_state.runstate,
            state, sizeof(global_state.runstate));
}

static bool global_state_received(void)
{
    return global_state.received;
}

static RunState global_state_get_runstate(void)
{
    return global_state.state;
}

void global_state_set_optional(void)
{
    global_state.optional = true;
}

static bool global_state_needed(void *opaque)
{
    GlobalState *s = opaque;
    char *runstate = (char *)s->runstate;

    /* If it is not optional, it is mandatory */

    if (s->optional == false) {
        return true;
    }

    /* If state is running or paused, it is not needed */

    if (strcmp(runstate, "running") == 0 ||
        strcmp(runstate, "paused") == 0) {
        return false;
    }

    /* for any other state it is needed */
    return true;
}

static int global_state_post_load(void *opaque, int version_id)
{
    GlobalState *s = opaque;
    Error *local_err = NULL;
    int r;
    char *runstate = (char *)s->runstate;

    s->received = true;
    trace_migrate_global_state_post_load(runstate);

    r = qapi_enum_parse(RunState_lookup, runstate, RUN_STATE__MAX,
                        -1, &local_err);

    if (r == -1) {
        if (local_err) {
            error_report_err(local_err);
        }
        return -EINVAL;
    }
    s->state = r;

    return 0;
}

static void global_state_pre_save(void *opaque)
{
    GlobalState *s = opaque;

    trace_migrate_global_state_pre_save((char *)s->runstate);
    s->size = strlen((char *)s->runstate) + 1;
}

static const VMStateDescription vmstate_globalstate = {
    .name = "globalstate",
    .version_id = 1,
    .minimum_version_id = 1,
    .post_load = global_state_post_load,
    .pre_save = global_state_pre_save,
    .needed = global_state_needed,
    .fields = (VMStateField[]) {
        VMSTATE_UINT32(size, GlobalState),
        VMSTATE_BUFFER(runstate, GlobalState),
        VMSTATE_END_OF_LIST()
    },
};

void register_global_state(void)
{
    /* We would use it independently of whether we receive it */
    strcpy((char *)&global_state.runstate, "");
    global_state.received = false;
    vmstate_register(NULL, 0, &vmstate_globalstate, &global_state);
}

static void migrate_generate_event(int new_state)
{
    if (migrate_use_events()) {
        qapi_event_send_migration(new_state, &error_abort);
    }
}

/*
 * Called on -incoming with a defer: uri.
 * The migration can be started later after any parameters have been
 * changed.
 */
static void deferred_incoming_migration(Error **errp)
{
    if (deferred_incoming) {
        error_setg(errp, "Incoming migration already deferred");
    }
    deferred_incoming = true;
}

/* Request a range of pages from the source VM at the given
 * start address.
 *   rbname: Name of the RAMBlock to request the page in, if NULL it's the same
 *           as the last request (a name must have been given previously)
 *   start: Address offset within the RB
 *   len: Length in bytes required - must be a multiple of pagesize
 */
void migrate_send_rp_req_pages(MigrationIncomingState *mis, const char *rbname,
                               ram_addr_t start, size_t len)
{
    uint8_t bufc[12 + 1 + 255]; /* start (8), len (4), rbname up to 256 */
    size_t msglen = 12; /* start + len */

    *(uint64_t *)bufc = cpu_to_be64((uint64_t)start);
    *(uint32_t *)(bufc + 8) = cpu_to_be32((uint32_t)len);

    if (rbname) {
        int rbname_len = strlen(rbname);
        assert(rbname_len < 256);

        bufc[msglen++] = rbname_len;
        memcpy(bufc + msglen, rbname, rbname_len);
        msglen += rbname_len;
        migrate_send_rp_message(mis, MIG_RP_MSG_REQ_PAGES_ID, msglen, bufc);
    } else {
        migrate_send_rp_message(mis, MIG_RP_MSG_REQ_PAGES, msglen, bufc);
    }
}

void qemu_start_incoming_migration(const char *uri, Error **errp)
{
    const char *p;

    qapi_event_send_migration(MIGRATION_STATUS_SETUP, &error_abort);
    if (!strcmp(uri, "defer")) {
        deferred_incoming_migration(errp);
    } else if (strstart(uri, "tcp:", &p)) {
        tcp_start_incoming_migration(p, errp);
#ifdef CONFIG_RDMA
    } else if (strstart(uri, "rdma:", &p)) {
        rdma_start_incoming_migration(p, errp);
#endif
    } else if (strstart(uri, "exec:", &p)) {
        exec_start_incoming_migration(p, errp);
    } else if (strstart(uri, "unix:", &p)) {
        unix_start_incoming_migration(p, errp);
    } else if (strstart(uri, "fd:", &p)) {
        fd_start_incoming_migration(p, errp);
    } else {
        error_setg(errp, "unknown migration protocol: %s", uri);
    }
}

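/*
 * Illustrative usage of the URI schemes parsed above: the destination is
 * typically started with "-incoming tcp:0:4444", or with "-incoming defer"
 * followed, once parameters are set, by the QMP command
 *   { "execute": "migrate-incoming", "arguments": { "uri": "tcp:0:4444" } }
 */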
static void process_incoming_migration_bh(void *opaque)
{
    Error *local_err = NULL;
    MigrationIncomingState *mis = opaque;

    /* Make sure all file formats flush their mutable metadata */
    bdrv_invalidate_cache_all(&local_err);
    if (local_err) {
        migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
                          MIGRATION_STATUS_FAILED);
        error_report_err(local_err);
        migrate_decompress_threads_join();
        exit(EXIT_FAILURE);
    }

    /*
     * This must happen after all error conditions are dealt with and
     * we're sure the VM is going to be running on this host.
     */
    qemu_announce_self();

    /* If global state section was not received or we are in running
       state, we need to obey autostart. Any other state is set with
       runstate_set. */

    if (!global_state_received() ||
        global_state_get_runstate() == RUN_STATE_RUNNING) {
        if (autostart) {
            vm_start();
        } else {
            runstate_set(RUN_STATE_PAUSED);
        }
    } else {
        runstate_set(global_state_get_runstate());
    }
    migrate_decompress_threads_join();
    /*
     * This must happen after any state changes since as soon as an external
     * observer sees this event they might start to prod at the VM assuming
     * it's ready to use.
     */
    migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
                      MIGRATION_STATUS_COMPLETED);
    qemu_bh_delete(mis->bh);
    migration_incoming_state_destroy();
}

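/*
 * The incoming stream is parsed inside a coroutine:
 * process_incoming_migration() below switches the channel to non-blocking
 * mode first, so a read that would block yields back to the main loop
 * instead of stalling it.
 */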
static void process_incoming_migration_co(void *opaque)
{
    QEMUFile *f = opaque;
    MigrationIncomingState *mis;
    PostcopyState ps;
    int ret;

    mis = migration_incoming_state_new(f);
    postcopy_state_set(POSTCOPY_INCOMING_NONE);
    migrate_set_state(&mis->state, MIGRATION_STATUS_NONE,
                      MIGRATION_STATUS_ACTIVE);
    ret = qemu_loadvm_state(f);

    ps = postcopy_state_get();
    trace_process_incoming_migration_co_end(ret, ps);
    if (ps != POSTCOPY_INCOMING_NONE) {
        if (ps == POSTCOPY_INCOMING_ADVISE) {
            /*
             * Where a migration had postcopy enabled (and thus went to advise)
             * but managed to complete within the precopy period, we can use
             * the normal exit.
             */
            postcopy_ram_incoming_cleanup(mis);
        } else if (ret >= 0) {
            /*
             * Postcopy was started, cleanup should happen at the end of the
             * postcopy thread.
             */
            trace_process_incoming_migration_co_postcopy_end_main();
            return;
        }
        /* Else if something went wrong then just fall out of the normal exit */
    }

    qemu_fclose(f);
    free_xbzrle_decoded_buf();

    if (ret < 0) {
        migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
                          MIGRATION_STATUS_FAILED);
        error_report("load of migration failed: %s", strerror(-ret));
        migrate_decompress_threads_join();
        exit(EXIT_FAILURE);
    }

    mis->bh = qemu_bh_new(process_incoming_migration_bh, mis);
    qemu_bh_schedule(mis->bh);
}

void process_incoming_migration(QEMUFile *f)
{
    Coroutine *co = qemu_coroutine_create(process_incoming_migration_co);

    migrate_decompress_threads_create();
    qemu_file_set_blocking(f, false);
    qemu_coroutine_enter(co, f);
}


void migration_set_incoming_channel(MigrationState *s,
                                    QIOChannel *ioc)
{
    trace_migration_set_incoming_channel(
        ioc, object_get_typename(OBJECT(ioc)));

    if (s->parameters.tls_creds &&
        !object_dynamic_cast(OBJECT(ioc),
                             TYPE_QIO_CHANNEL_TLS)) {
        Error *local_err = NULL;
        migration_tls_set_incoming_channel(s, ioc, &local_err);
        if (local_err) {
            error_report_err(local_err);
        }
    } else {
        QEMUFile *f = qemu_fopen_channel_input(ioc);
        process_incoming_migration(f);
    }
}


void migration_set_outgoing_channel(MigrationState *s,
                                    QIOChannel *ioc,
                                    const char *hostname)
{
    trace_migration_set_outgoing_channel(
        ioc, object_get_typename(OBJECT(ioc)), hostname);

    if (s->parameters.tls_creds &&
        !object_dynamic_cast(OBJECT(ioc),
                             TYPE_QIO_CHANNEL_TLS)) {
        Error *local_err = NULL;
        migration_tls_set_outgoing_channel(s, ioc, hostname, &local_err);
        if (local_err) {
            migrate_fd_error(s, local_err);
            error_free(local_err);
        }
    } else {
        QEMUFile *f = qemu_fopen_channel_output(ioc);

        s->to_dst_file = f;

        migrate_fd_connect(s);
    }
}


/*
 * Send a message on the return channel back to the source
 * of the migration.
 */
void migrate_send_rp_message(MigrationIncomingState *mis,
                             enum mig_rp_message_type message_type,
                             uint16_t len, void *data)
{
    trace_migrate_send_rp_message((int)message_type, len);
    qemu_mutex_lock(&mis->rp_mutex);
    qemu_put_be16(mis->to_src_file, (unsigned int)message_type);
    qemu_put_be16(mis->to_src_file, len);
    qemu_put_buffer(mis->to_src_file, data, len);
    qemu_fflush(mis->to_src_file);
    qemu_mutex_unlock(&mis->rp_mutex);
}

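/*
 * On the wire, every return-path message is therefore framed as:
 *
 *   [ be16 message_type | be16 len | len bytes of payload ]
 *
 * e.g. a PONG carries a 4-byte big-endian value as its payload.
 */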
/*
 * Send a 'SHUT' message on the return channel with the given value
 * to indicate that we've finished with the RP.  Non-0 value indicates
 * error.
 */
void migrate_send_rp_shut(MigrationIncomingState *mis,
                          uint32_t value)
{
    uint32_t buf;

    buf = cpu_to_be32(value);
    migrate_send_rp_message(mis, MIG_RP_MSG_SHUT, sizeof(buf), &buf);
}

/*
 * Send a 'PONG' message on the return channel with the given value
 * (normally in response to a 'PING')
 */
void migrate_send_rp_pong(MigrationIncomingState *mis,
                          uint32_t value)
{
    uint32_t buf;

    buf = cpu_to_be32(value);
    migrate_send_rp_message(mis, MIG_RP_MSG_PONG, sizeof(buf), &buf);
}

/* Maximum amount of time, in nanoseconds, that we are willing to have the
 * guest paused (downtime) during migration.  Nanoseconds are used because
 * that is the maximum resolution get_clock() can achieve.  This is an
 * internal measure; all user-visible units must be in seconds. */
static uint64_t max_downtime = 300000000;

uint64_t migrate_max_downtime(void)
{
    return max_downtime;
}

MigrationCapabilityStatusList *qmp_query_migrate_capabilities(Error **errp)
{
    MigrationCapabilityStatusList *head = NULL;
    MigrationCapabilityStatusList *caps;
    MigrationState *s = migrate_get_current();
    int i;

    caps = NULL; /* silence compiler warning */
    for (i = 0; i < MIGRATION_CAPABILITY__MAX; i++) {
        if (head == NULL) {
            head = g_malloc0(sizeof(*caps));
            caps = head;
        } else {
            caps->next = g_malloc0(sizeof(*caps));
            caps = caps->next;
        }
        caps->value =
            g_malloc(sizeof(*caps->value));
        caps->value->capability = i;
        caps->value->state = s->enabled_capabilities[i];
    }

    return head;
}

MigrationParameters *qmp_query_migrate_parameters(Error **errp)
{
    MigrationParameters *params;
    MigrationState *s = migrate_get_current();

    params = g_malloc0(sizeof(*params));
    params->compress_level = s->parameters.compress_level;
    params->compress_threads = s->parameters.compress_threads;
    params->decompress_threads = s->parameters.decompress_threads;
    params->cpu_throttle_initial = s->parameters.cpu_throttle_initial;
    params->cpu_throttle_increment = s->parameters.cpu_throttle_increment;
    params->tls_creds = g_strdup(s->parameters.tls_creds);
    params->tls_hostname = g_strdup(s->parameters.tls_hostname);

    return params;
}

/*
 * Return true if we're already in the middle of a migration
 * (i.e. any of the active or setup states)
 */
static bool migration_is_setup_or_active(int state)
{
    switch (state) {
    case MIGRATION_STATUS_ACTIVE:
    case MIGRATION_STATUS_POSTCOPY_ACTIVE:
    case MIGRATION_STATUS_SETUP:
        return true;

    default:
        return false;

    }
}

static void get_xbzrle_cache_stats(MigrationInfo *info)
{
    if (migrate_use_xbzrle()) {
        info->has_xbzrle_cache = true;
        info->xbzrle_cache = g_malloc0(sizeof(*info->xbzrle_cache));
        info->xbzrle_cache->cache_size = migrate_xbzrle_cache_size();
        info->xbzrle_cache->bytes = xbzrle_mig_bytes_transferred();
        info->xbzrle_cache->pages = xbzrle_mig_pages_transferred();
        info->xbzrle_cache->cache_miss = xbzrle_mig_pages_cache_miss();
        info->xbzrle_cache->cache_miss_rate = xbzrle_mig_cache_miss_rate();
        info->xbzrle_cache->overflow = xbzrle_mig_pages_overflow();
    }
}

MigrationInfo *qmp_query_migrate(Error **errp)
{
    MigrationInfo *info = g_malloc0(sizeof(*info));
    MigrationState *s = migrate_get_current();

    switch (s->state) {
    case MIGRATION_STATUS_NONE:
        /* no migration has happened ever */
        break;
    case MIGRATION_STATUS_SETUP:
        info->has_status = true;
        info->has_total_time = false;
        break;
    case MIGRATION_STATUS_ACTIVE:
    case MIGRATION_STATUS_CANCELLING:
        info->has_status = true;
        info->has_total_time = true;
        info->total_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME)
            - s->total_time;
        info->has_expected_downtime = true;
        info->expected_downtime = s->expected_downtime;
        info->has_setup_time = true;
        info->setup_time = s->setup_time;

        info->has_ram = true;
        info->ram = g_malloc0(sizeof(*info->ram));
        info->ram->transferred = ram_bytes_transferred();
        info->ram->remaining = ram_bytes_remaining();
        info->ram->total = ram_bytes_total();
        info->ram->duplicate = dup_mig_pages_transferred();
        info->ram->skipped = skipped_mig_pages_transferred();
        info->ram->normal = norm_mig_pages_transferred();
        info->ram->normal_bytes = norm_mig_bytes_transferred();
        info->ram->dirty_pages_rate = s->dirty_pages_rate;
        info->ram->mbps = s->mbps;
        info->ram->dirty_sync_count = s->dirty_sync_count;

        if (blk_mig_active()) {
            info->has_disk = true;
            info->disk = g_malloc0(sizeof(*info->disk));
            info->disk->transferred = blk_mig_bytes_transferred();
            info->disk->remaining = blk_mig_bytes_remaining();
            info->disk->total = blk_mig_bytes_total();
        }

        if (cpu_throttle_active()) {
            info->has_cpu_throttle_percentage = true;
            info->cpu_throttle_percentage = cpu_throttle_get_percentage();
        }

        get_xbzrle_cache_stats(info);
        break;
    case MIGRATION_STATUS_POSTCOPY_ACTIVE:
        /* Mostly the same as active; TODO add some postcopy stats */
        info->has_status = true;
        info->has_total_time = true;
        info->total_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME)
            - s->total_time;
        info->has_expected_downtime = true;
        info->expected_downtime = s->expected_downtime;
        info->has_setup_time = true;
        info->setup_time = s->setup_time;

        info->has_ram = true;
        info->ram = g_malloc0(sizeof(*info->ram));
        info->ram->transferred = ram_bytes_transferred();
        info->ram->remaining = ram_bytes_remaining();
        info->ram->total = ram_bytes_total();
        info->ram->duplicate = dup_mig_pages_transferred();
        info->ram->skipped = skipped_mig_pages_transferred();
        info->ram->normal = norm_mig_pages_transferred();
        info->ram->normal_bytes = norm_mig_bytes_transferred();
        info->ram->dirty_pages_rate = s->dirty_pages_rate;
        info->ram->mbps = s->mbps;
        info->ram->dirty_sync_count = s->dirty_sync_count;

        if (blk_mig_active()) {
            info->has_disk = true;
            info->disk = g_malloc0(sizeof(*info->disk));
            info->disk->transferred = blk_mig_bytes_transferred();
            info->disk->remaining = blk_mig_bytes_remaining();
            info->disk->total = blk_mig_bytes_total();
        }

        get_xbzrle_cache_stats(info);
        break;
    case MIGRATION_STATUS_COMPLETED:
        get_xbzrle_cache_stats(info);

        info->has_status = true;
        info->has_total_time = true;
        info->total_time = s->total_time;
        info->has_downtime = true;
        info->downtime = s->downtime;
        info->has_setup_time = true;
        info->setup_time = s->setup_time;

        info->has_ram = true;
        info->ram = g_malloc0(sizeof(*info->ram));
        info->ram->transferred = ram_bytes_transferred();
        info->ram->remaining = 0;
        info->ram->total = ram_bytes_total();
        info->ram->duplicate = dup_mig_pages_transferred();
        info->ram->skipped = skipped_mig_pages_transferred();
        info->ram->normal = norm_mig_pages_transferred();
        info->ram->normal_bytes = norm_mig_bytes_transferred();
        info->ram->mbps = s->mbps;
        info->ram->dirty_sync_count = s->dirty_sync_count;
        break;
    case MIGRATION_STATUS_FAILED:
        info->has_status = true;
        if (s->error) {
            info->has_error_desc = true;
            info->error_desc = g_strdup(error_get_pretty(s->error));
        }
        break;
    case MIGRATION_STATUS_CANCELLED:
        info->has_status = true;
        break;
    }
    info->status = s->state;

    return info;
}

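/*
 * Illustrative QMP exchange for the query above (response abridged,
 * field names per the QAPI schema):
 *
 *   -> { "execute": "query-migrate" }
 *   <- { "return": { "status": "active",
 *                    "ram": { "transferred": 123, "remaining": 123,
 *                             "total": 246, ... } } }
 */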
void qmp_migrate_set_capabilities(MigrationCapabilityStatusList *params,
                                  Error **errp)
{
    MigrationState *s = migrate_get_current();
    MigrationCapabilityStatusList *cap;

    if (migration_is_setup_or_active(s->state)) {
        error_setg(errp, QERR_MIGRATION_ACTIVE);
        return;
    }

    for (cap = params; cap; cap = cap->next) {
        s->enabled_capabilities[cap->value->capability] = cap->value->state;
    }

    if (migrate_postcopy_ram()) {
        if (migrate_use_compression()) {
            /* The decompression threads asynchronously write into RAM
             * rather than use the atomic copies needed to avoid
             * userfaulting.  It should be possible to fix the decompression
             * threads for compatibility in future.
             */
            error_report("Postcopy is not currently compatible with "
                         "compression");
            s->enabled_capabilities[MIGRATION_CAPABILITY_POSTCOPY_RAM] =
                false;
        }
    }
}

void qmp_migrate_set_parameters(bool has_compress_level,
                                int64_t compress_level,
                                bool has_compress_threads,
                                int64_t compress_threads,
                                bool has_decompress_threads,
                                int64_t decompress_threads,
                                bool has_cpu_throttle_initial,
                                int64_t cpu_throttle_initial,
                                bool has_cpu_throttle_increment,
                                int64_t cpu_throttle_increment,
                                bool has_tls_creds,
                                const char *tls_creds,
                                bool has_tls_hostname,
                                const char *tls_hostname,
                                Error **errp)
{
    MigrationState *s = migrate_get_current();

    if (has_compress_level && (compress_level < 0 || compress_level > 9)) {
        error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "compress_level",
                   "is invalid, it should be in the range of 0 to 9");
        return;
    }
    if (has_compress_threads &&
        (compress_threads < 1 || compress_threads > 255)) {
        error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
                   "compress_threads",
                   "is invalid, it should be in the range of 1 to 255");
        return;
    }
    if (has_decompress_threads &&
        (decompress_threads < 1 || decompress_threads > 255)) {
        error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
                   "decompress_threads",
                   "is invalid, it should be in the range of 1 to 255");
        return;
    }
    if (has_cpu_throttle_initial &&
        (cpu_throttle_initial < 1 || cpu_throttle_initial > 99)) {
        error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
                   "cpu_throttle_initial",
                   "an integer in the range of 1 to 99");
        return;
    }
    if (has_cpu_throttle_increment &&
        (cpu_throttle_increment < 1 || cpu_throttle_increment > 99)) {
        error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
                   "cpu_throttle_increment",
                   "an integer in the range of 1 to 99");
        return;
    }

    if (has_compress_level) {
        s->parameters.compress_level = compress_level;
    }
    if (has_compress_threads) {
        s->parameters.compress_threads = compress_threads;
    }
    if (has_decompress_threads) {
        s->parameters.decompress_threads = decompress_threads;
    }
    if (has_cpu_throttle_initial) {
        s->parameters.cpu_throttle_initial = cpu_throttle_initial;
    }
    if (has_cpu_throttle_increment) {
        s->parameters.cpu_throttle_increment = cpu_throttle_increment;
    }
    if (has_tls_creds) {
        g_free(s->parameters.tls_creds);
        s->parameters.tls_creds = g_strdup(tls_creds);
    }
    if (has_tls_hostname) {
        g_free(s->parameters.tls_hostname);
        s->parameters.tls_hostname = g_strdup(tls_hostname);
    }
}

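/*
 * Illustrative QMP usage for the command above (argument names per the
 * QAPI schema):
 *
 *   -> { "execute": "migrate-set-parameters",
 *        "arguments": { "compress-level": 1, "compress-threads": 8 } }
 *   <- { "return": {} }
 */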

void qmp_migrate_start_postcopy(Error **errp)
{
    MigrationState *s = migrate_get_current();

    if (!migrate_postcopy_ram()) {
        error_setg(errp, "Enable postcopy with migrate_set_capability before"
                   " the start of migration");
        return;
    }

    if (s->state == MIGRATION_STATUS_NONE) {
        error_setg(errp, "Postcopy must be started after migration has been"
                   " started");
        return;
    }
    /*
     * we don't error if migration has finished since that would be racy
     * with issuing this command.
     */
    atomic_set(&s->start_postcopy, true);
}

/* shared migration helpers */

void migrate_set_state(int *state, int old_state, int new_state)
{
    if (atomic_cmpxchg(state, old_state, new_state) == old_state) {
        trace_migrate_set_state(new_state);
        migrate_generate_event(new_state);
    }
}

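/*
 * The compare-and-swap above means a transition (and its MIGRATION event)
 * only happens if the state is still what the caller expected: e.g.
 * migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
 * MIGRATION_STATUS_FAILED) is a no-op if another thread has already moved
 * the state on from SETUP.
 */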
static void migrate_fd_cleanup(void *opaque)
{
    MigrationState *s = opaque;

    qemu_bh_delete(s->cleanup_bh);
    s->cleanup_bh = NULL;

    flush_page_queue(s);

    if (s->to_dst_file) {
        trace_migrate_fd_cleanup();
        qemu_mutex_unlock_iothread();
        if (s->migration_thread_running) {
            qemu_thread_join(&s->thread);
            s->migration_thread_running = false;
        }
        qemu_mutex_lock_iothread();

        migrate_compress_threads_join();
        qemu_fclose(s->to_dst_file);
        s->to_dst_file = NULL;
    }

    assert((s->state != MIGRATION_STATUS_ACTIVE) &&
           (s->state != MIGRATION_STATUS_POSTCOPY_ACTIVE));

    if (s->state == MIGRATION_STATUS_CANCELLING) {
        migrate_set_state(&s->state, MIGRATION_STATUS_CANCELLING,
                          MIGRATION_STATUS_CANCELLED);
    }

    notifier_list_notify(&migration_state_notifiers, s);
}

void migrate_fd_error(MigrationState *s, const Error *error)
{
    trace_migrate_fd_error(error ? error_get_pretty(error) : "");
    assert(s->to_dst_file == NULL);
    migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
                      MIGRATION_STATUS_FAILED);
    if (!s->error) {
        s->error = error_copy(error);
    }
    notifier_list_notify(&migration_state_notifiers, s);
}

static void migrate_fd_cancel(MigrationState *s)
{
    int old_state;
    QEMUFile *f = migrate_get_current()->to_dst_file;
    trace_migrate_fd_cancel();

    if (s->rp_state.from_dst_file) {
        /* shut down the rp socket, causing the rp thread to exit */
        qemu_file_shutdown(s->rp_state.from_dst_file);
    }

    do {
        old_state = s->state;
        if (!migration_is_setup_or_active(old_state)) {
            break;
        }
        migrate_set_state(&s->state, old_state, MIGRATION_STATUS_CANCELLING);
    } while (s->state != MIGRATION_STATUS_CANCELLING);

    /*
     * If we're unlucky the migration code might be stuck somewhere in a
     * send/write while the network has failed and is waiting to timeout;
     * if we've got shutdown(2) available then we can force it to quit.
     * The outgoing qemu file gets closed in migrate_fd_cleanup that is
     * called in a bh, so there is no race against this cancel.
     */
    if (s->state == MIGRATION_STATUS_CANCELLING && f) {
        qemu_file_shutdown(f);
    }
}

void add_migration_state_change_notifier(Notifier *notify)
{
    notifier_list_add(&migration_state_notifiers, notify);
}

void remove_migration_state_change_notifier(Notifier *notify)
{
    notifier_remove(notify);
}

bool migration_in_setup(MigrationState *s)
{
    return s->state == MIGRATION_STATUS_SETUP;
}

bool migration_has_finished(MigrationState *s)
{
    return s->state == MIGRATION_STATUS_COMPLETED;
}

bool migration_has_failed(MigrationState *s)
{
    return (s->state == MIGRATION_STATUS_CANCELLED ||
            s->state == MIGRATION_STATUS_FAILED);
}

bool migration_in_postcopy(MigrationState *s)
{
    return (s->state == MIGRATION_STATUS_POSTCOPY_ACTIVE);
}

bool migration_in_postcopy_after_devices(MigrationState *s)
{
    return migration_in_postcopy(s) && s->postcopy_after_devices;
}

MigrationState *migrate_init(const MigrationParams *params)
{
    MigrationState *s = migrate_get_current();

    /*
     * Reinitialise all migration state, except
     * parameters/capabilities that the user set, and
     * locks.
     */
    s->bytes_xfer = 0;
    s->xfer_limit = 0;
    s->cleanup_bh = 0;
    s->to_dst_file = NULL;
    s->state = MIGRATION_STATUS_NONE;
    s->params = *params;
    s->rp_state.from_dst_file = NULL;
    s->rp_state.error = false;
    s->mbps = 0.0;
    s->downtime = 0;
    s->expected_downtime = 0;
    s->dirty_pages_rate = 0;
    s->dirty_bytes_rate = 0;
    s->setup_time = 0;
    s->dirty_sync_count = 0;
    s->start_postcopy = false;
    s->postcopy_after_devices = false;
    s->migration_thread_running = false;
    s->last_req_rb = NULL;
    error_free(s->error);
    s->error = NULL;

    migrate_set_state(&s->state, MIGRATION_STATUS_NONE, MIGRATION_STATUS_SETUP);

    QSIMPLEQ_INIT(&s->src_page_requests);

    s->total_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
    return s;
}

static GSList *migration_blockers;

void migrate_add_blocker(Error *reason)
{
    migration_blockers = g_slist_prepend(migration_blockers, reason);
}

void migrate_del_blocker(Error *reason)
{
    migration_blockers = g_slist_remove(migration_blockers, reason);
}

void qmp_migrate_incoming(const char *uri, Error **errp)
{
    Error *local_err = NULL;
    static bool once = true;

    if (!deferred_incoming) {
        error_setg(errp, "For use with '-incoming defer'");
        return;
    }
    if (!once) {
        error_setg(errp, "The incoming migration has already been started");
        return;
    }

    qemu_start_incoming_migration(uri, &local_err);

    if (local_err) {
        error_propagate(errp, local_err);
        return;
    }

    once = false;
}

bool migration_is_blocked(Error **errp)
{
    if (qemu_savevm_state_blocked(errp)) {
        return true;
    }

    if (migration_blockers) {
        *errp = error_copy(migration_blockers->data);
        return true;
    }

    return false;
}

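/*
 * Illustrative QMP usage for the command below:
 *
 *   -> { "execute": "migrate",
 *        "arguments": { "uri": "tcp:dest.example.com:4444" } }
 *   <- { "return": {} }
 */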
void qmp_migrate(const char *uri, bool has_blk, bool blk,
                 bool has_inc, bool inc, bool has_detach, bool detach,
                 Error **errp)
{
    Error *local_err = NULL;
    MigrationState *s = migrate_get_current();
    MigrationParams params;
    const char *p;

    params.blk = has_blk && blk;
    params.shared = has_inc && inc;

    if (migration_is_setup_or_active(s->state) ||
        s->state == MIGRATION_STATUS_CANCELLING) {
        error_setg(errp, QERR_MIGRATION_ACTIVE);
        return;
    }
    if (runstate_check(RUN_STATE_INMIGRATE)) {
        error_setg(errp, "Guest is waiting for an incoming migration");
        return;
    }

    if (migration_is_blocked(errp)) {
        return;
    }

    s = migrate_init(&params);

    if (strstart(uri, "tcp:", &p)) {
        tcp_start_outgoing_migration(s, p, &local_err);
#ifdef CONFIG_RDMA
    } else if (strstart(uri, "rdma:", &p)) {
        rdma_start_outgoing_migration(s, p, &local_err);
#endif
    } else if (strstart(uri, "exec:", &p)) {
        exec_start_outgoing_migration(s, p, &local_err);
    } else if (strstart(uri, "unix:", &p)) {
        unix_start_outgoing_migration(s, p, &local_err);
    } else if (strstart(uri, "fd:", &p)) {
        fd_start_outgoing_migration(s, p, &local_err);
    } else {
        error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "uri",
                   "a valid migration protocol");
        migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
                          MIGRATION_STATUS_FAILED);
        return;
    }

    if (local_err) {
        migrate_fd_error(s, local_err);
        error_propagate(errp, local_err);
        return;
    }
}

void qmp_migrate_cancel(Error **errp)
{
    migrate_fd_cancel(migrate_get_current());
}

void qmp_migrate_set_cache_size(int64_t value, Error **errp)
{
    MigrationState *s = migrate_get_current();
    int64_t new_size;

    /* Check for truncation */
    if (value != (size_t)value) {
        error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "cache size",
                   "exceeding address space");
        return;
    }

    /* Cache should not be larger than guest ram size */
    if (value > ram_bytes_total()) {
        error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "cache size",
                   "exceeds guest ram size");
        return;
    }

    new_size = xbzrle_cache_resize(value);
    if (new_size < 0) {
        error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "cache size",
                   "is smaller than page size");
        return;
    }

    s->xbzrle_cache_size = new_size;
}

int64_t qmp_query_migrate_cache_size(Error **errp)
{
    return migrate_xbzrle_cache_size();
}

void qmp_migrate_set_speed(int64_t value, Error **errp)
{
    MigrationState *s;

    if (value < 0) {
        value = 0;
    }
    if (value > SIZE_MAX) {
        value = SIZE_MAX;
    }

    s = migrate_get_current();
    s->bandwidth_limit = value;
    if (s->to_dst_file) {
        qemu_file_set_rate_limit(s->to_dst_file,
                                 s->bandwidth_limit / XFER_LIMIT_RATIO);
    }
}

void qmp_migrate_set_downtime(double value, Error **errp)
{
    value *= 1e9;
    value = MAX(0, MIN(UINT64_MAX, value));
    max_downtime = (uint64_t)value;
}

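/*
 * qmp_migrate_set_downtime() above takes seconds and stores nanoseconds:
 * e.g. a value of 0.3 stores 300000000 ns, matching the max_downtime
 * default declared earlier in this file.
 */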
bool migrate_postcopy_ram(void)
{
    MigrationState *s;

    s = migrate_get_current();

    return s->enabled_capabilities[MIGRATION_CAPABILITY_POSTCOPY_RAM];
}

bool migrate_auto_converge(void)
{
    MigrationState *s;

    s = migrate_get_current();

    return s->enabled_capabilities[MIGRATION_CAPABILITY_AUTO_CONVERGE];
}

bool migrate_zero_blocks(void)
{
    MigrationState *s;

    s = migrate_get_current();

    return s->enabled_capabilities[MIGRATION_CAPABILITY_ZERO_BLOCKS];
}

bool migrate_use_compression(void)
{
    MigrationState *s;

    s = migrate_get_current();

    return s->enabled_capabilities[MIGRATION_CAPABILITY_COMPRESS];
}

int migrate_compress_level(void)
{
    MigrationState *s;

    s = migrate_get_current();

    return s->parameters.compress_level;
}

int migrate_compress_threads(void)
{
    MigrationState *s;

    s = migrate_get_current();

    return s->parameters.compress_threads;
}

int migrate_decompress_threads(void)
{
    MigrationState *s;

    s = migrate_get_current();

    return s->parameters.decompress_threads;
}

bool migrate_use_events(void)
{
    MigrationState *s;

    s = migrate_get_current();

    return s->enabled_capabilities[MIGRATION_CAPABILITY_EVENTS];
}

int migrate_use_xbzrle(void)
{
    MigrationState *s;

    s = migrate_get_current();

    return s->enabled_capabilities[MIGRATION_CAPABILITY_XBZRLE];
}

int64_t migrate_xbzrle_cache_size(void)
{
    MigrationState *s;

    s = migrate_get_current();

    return s->xbzrle_cache_size;
}

/* migration thread support */
/*
 * Something bad happened to the RP stream; mark an error.
 * The caller shall print or trace something to indicate why.
 */
static void mark_source_rp_bad(MigrationState *s)
{
    s->rp_state.error = true;
}

static struct rp_cmd_args {
    ssize_t     len; /* -1 = variable */
    const char *name;
} rp_cmd_args[] = {
    [MIG_RP_MSG_INVALID]        = { .len = -1, .name = "INVALID" },
    [MIG_RP_MSG_SHUT]           = { .len =  4, .name = "SHUT" },
    [MIG_RP_MSG_PONG]           = { .len =  4, .name = "PONG" },
    [MIG_RP_MSG_REQ_PAGES]      = { .len = 12, .name = "REQ_PAGES" },
    [MIG_RP_MSG_REQ_PAGES_ID]   = { .len = -1, .name = "REQ_PAGES_ID" },
    [MIG_RP_MSG_MAX]            = { .len = -1, .name = "MAX" },
};

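/*
 * Payload layout of the page-request messages, as built by
 * migrate_send_rp_req_pages() and parsed in the thread below:
 *
 *   REQ_PAGES:    [ be64 start | be32 len ]                        (12 bytes)
 *   REQ_PAGES_ID: [ be64 start | be32 len | u8 n | n-byte RAMBlock name ]
 */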
/*
 * Process a request for pages received on the return path,
 * We're allowed to send more than requested (e.g. to round to our page size)
 * and we don't need to send pages that have already been sent.
 */
static void migrate_handle_rp_req_pages(MigrationState *ms, const char* rbname,
                                        ram_addr_t start, size_t len)
{
    long our_host_ps = getpagesize();

    trace_migrate_handle_rp_req_pages(rbname, start, len);

    /*
     * Since we currently insist on matching page sizes, just sanity check
     * we're being asked for whole host pages.
     */
    if (start & (our_host_ps - 1) ||
       (len & (our_host_ps - 1))) {
        error_report("%s: Misaligned page request, start: " RAM_ADDR_FMT
                     " len: %zd", __func__, start, len);
        mark_source_rp_bad(ms);
        return;
    }

    if (ram_save_queue_pages(ms, rbname, start, len)) {
        mark_source_rp_bad(ms);
    }
}

/*
 * Handles messages sent on the return path towards the source VM
 *
 */
static void *source_return_path_thread(void *opaque)
{
    MigrationState *ms = opaque;
    QEMUFile *rp = ms->rp_state.from_dst_file;
    uint16_t header_len, header_type;
    uint8_t buf[512];
    uint32_t tmp32, sibling_error;
    ram_addr_t start = 0; /* =0 to silence warning */
    size_t len = 0, expected_len;
    int res;

    trace_source_return_path_thread_entry();
    while (!ms->rp_state.error && !qemu_file_get_error(rp) &&
           migration_is_setup_or_active(ms->state)) {
        trace_source_return_path_thread_loop_top();
        header_type = qemu_get_be16(rp);
        header_len = qemu_get_be16(rp);

        if (header_type >= MIG_RP_MSG_MAX ||
            header_type == MIG_RP_MSG_INVALID) {
            error_report("RP: Received invalid message 0x%04x length 0x%04x",
                         header_type, header_len);
            mark_source_rp_bad(ms);
            goto out;
        }

        if ((rp_cmd_args[header_type].len != -1 &&
             header_len != rp_cmd_args[header_type].len) ||
            header_len > sizeof(buf)) {
            error_report("RP: Received '%s' message (0x%04x) with "
                         "incorrect length %d expecting %zu",
                         rp_cmd_args[header_type].name, header_type, header_len,
                         (size_t)rp_cmd_args[header_type].len);
            mark_source_rp_bad(ms);
            goto out;
        }

        /* We know we've got a valid header by this point */
        res = qemu_get_buffer(rp, buf, header_len);
        if (res != header_len) {
            error_report("RP: Failed reading data for message 0x%04x"
                         " read %d expected %d",
                         header_type, res, header_len);
            mark_source_rp_bad(ms);
            goto out;
        }

        /* OK, we have the message and the data */
        switch (header_type) {
        case MIG_RP_MSG_SHUT:
            sibling_error = be32_to_cpup((uint32_t *)buf);
            trace_source_return_path_thread_shut(sibling_error);
            if (sibling_error) {
                error_report("RP: Sibling indicated error %d", sibling_error);
                mark_source_rp_bad(ms);
            }
            /*
             * We'll let the main thread deal with closing the RP;
             * we could do a shutdown(2) on it, but we're the only user
             * anyway, so there's nothing gained.
             */
            goto out;

        case MIG_RP_MSG_PONG:
            tmp32 = be32_to_cpup((uint32_t *)buf);
            trace_source_return_path_thread_pong(tmp32);
            break;

        case MIG_RP_MSG_REQ_PAGES:
            start = be64_to_cpup((uint64_t *)buf);
            len = be32_to_cpup((uint32_t *)(buf + 8));
            migrate_handle_rp_req_pages(ms, NULL, start, len);
            break;

        case MIG_RP_MSG_REQ_PAGES_ID:
            expected_len = 12 + 1; /* header + termination */

            if (header_len >= expected_len) {
                start = be64_to_cpup((uint64_t *)buf);
                len = be32_to_cpup((uint32_t *)(buf + 8));
                /* Now we expect an idstr */
                tmp32 = buf[12]; /* Length of the following idstr */
                buf[13 + tmp32] = '\0';
                expected_len += tmp32;
            }
            if (header_len != expected_len) {
                error_report("RP: Req_Page_id with length %d expecting %zd",
                             header_len, expected_len);
                mark_source_rp_bad(ms);
                goto out;
            }
            migrate_handle_rp_req_pages(ms, (char *)&buf[13], start, len);
            break;

        default:
            break;
        }
    }
    if (qemu_file_get_error(rp)) {
        trace_source_return_path_thread_bad_end();
        mark_source_rp_bad(ms);
    }

    trace_source_return_path_thread_end();
out:
    ms->rp_state.from_dst_file = NULL;
    qemu_fclose(rp);
    return NULL;
}

static int open_return_path_on_source(MigrationState *ms)
{

    ms->rp_state.from_dst_file = qemu_file_get_return_path(ms->to_dst_file);
    if (!ms->rp_state.from_dst_file) {
        return -1;
    }

    trace_open_return_path_on_source();
    qemu_thread_create(&ms->rp_state.rp_thread, "return path",
                       source_return_path_thread, ms, QEMU_THREAD_JOINABLE);

    trace_open_return_path_on_source_continue();

    return 0;
}

/* Returns 0 if the RP was ok, otherwise there was an error on the RP */
static int await_return_path_close_on_source(MigrationState *ms)
{
    /*
     * If this is a normal exit then the destination will send a SHUT and the
     * rp_thread will exit, however if there's an error we need to cause
     * it to exit.
     */
    if (qemu_file_get_error(ms->to_dst_file) && ms->rp_state.from_dst_file) {
        /*
         * shutdown(2), if we have it, will cause it to unblock if it's stuck
         * waiting for the destination.
         */
        qemu_file_shutdown(ms->rp_state.from_dst_file);
        mark_source_rp_bad(ms);
    }
    trace_await_return_path_close_on_source_joining();
    qemu_thread_join(&ms->rp_state.rp_thread);
    trace_await_return_path_close_on_source_close();
    return ms->rp_state.error;
}

/*
 * Switch from normal iteration to postcopy
 * Returns non-0 on error
 */
static int postcopy_start(MigrationState *ms, bool *old_vm_running)
{
    int ret;
    QIOChannelBuffer *bioc;
    QEMUFile *fb;
    int64_t time_at_stop = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
    migrate_set_state(&ms->state, MIGRATION_STATUS_ACTIVE,
                      MIGRATION_STATUS_POSTCOPY_ACTIVE);

    trace_postcopy_start();
    qemu_mutex_lock_iothread();
    trace_postcopy_start_set_run();

    qemu_system_wakeup_request(QEMU_WAKEUP_REASON_OTHER);
    *old_vm_running = runstate_is_running();
    global_state_store();
    ret = vm_stop_force_state(RUN_STATE_FINISH_MIGRATE);
    if (ret < 0) {
        goto fail;
    }

    ret = bdrv_inactivate_all();
    if (ret < 0) {
        goto fail;
    }

    /*
     * Cause any non-postcopiable, but iterative devices to
     * send out their final data.
     */
    qemu_savevm_state_complete_precopy(ms->to_dst_file, true);

    /*
     * in Finish migrate and with the io-lock held everything should
     * be quiet, but we've potentially still got dirty pages and we
     * need to tell the destination to throw any pages it's already received
     * that are dirty
     */
    if (ram_postcopy_send_discard_bitmap(ms)) {
        error_report("postcopy send discard bitmap failed");
        goto fail;
    }

    /*
     * send rest of state - note things that are doing postcopy
     * will notice we're in POSTCOPY_ACTIVE and not actually
     * wrap their state up here
     */
    qemu_file_set_rate_limit(ms->to_dst_file, INT64_MAX);
    /* Ping just for debugging, helps line traces up */
    qemu_savevm_send_ping(ms->to_dst_file, 2);

    /*
     * While loading the device state we may trigger page transfer
     * requests and the fd must be free to process those, and thus
     * the destination must read the whole device state off the fd before
     * it starts processing it.  Unfortunately the ad-hoc migration format
     * doesn't allow the destination to know the size to read without fully
     * parsing it through each device's load-state code (especially the open
     * coded devices that use get/put).
     * So we wrap the device state up in a package with a length at the start;
     * to do this we use a buffered QIOChannel to hold the whole of the
     * device state.
     */
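    /*
     * In outline the packaged blob travels as a single MIG_CMD_PACKAGED
     * savevm command, a 32-bit length followed by the buffered device
     * state; see qemu_savevm_send_packaged() for the exact framing.
     */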
    bioc = qio_channel_buffer_new(4096);
    fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc));
    object_unref(OBJECT(bioc));

    /*
     * Make sure the receiver can get incoming pages before we send the rest
     * of the state
     */
    qemu_savevm_send_postcopy_listen(fb);

    qemu_savevm_state_complete_precopy(fb, false);
    qemu_savevm_send_ping(fb, 3);

    qemu_savevm_send_postcopy_run(fb);

    /* <><> end of stuff going into the package */

    /* Now send that blob */
    if (qemu_savevm_send_packaged(ms->to_dst_file, bioc->data, bioc->usage)) {
        goto fail_closefb;
    }
    qemu_fclose(fb);

    /* Send a notify to give a chance for anything that needs to happen
     * at the transition to postcopy and after the device state; in particular
     * spice needs to trigger a transition now
     */
    ms->postcopy_after_devices = true;
    notifier_list_notify(&migration_state_notifiers, ms);

    ms->downtime = qemu_clock_get_ms(QEMU_CLOCK_REALTIME) - time_at_stop;

    qemu_mutex_unlock_iothread();

    /*
     * Although this ping is just for debug, it could potentially be
     * used for getting a better measurement of downtime at the source.
     */
    qemu_savevm_send_ping(ms->to_dst_file, 4);

    ret = qemu_file_get_error(ms->to_dst_file);
    if (ret) {
        error_report("postcopy_start: Migration stream errored");
        migrate_set_state(&ms->state, MIGRATION_STATUS_POSTCOPY_ACTIVE,
                          MIGRATION_STATUS_FAILED);
    }

    return ret;

fail_closefb:
    qemu_fclose(fb);
fail:
    migrate_set_state(&ms->state, MIGRATION_STATUS_POSTCOPY_ACTIVE,
                      MIGRATION_STATUS_FAILED);
    qemu_mutex_unlock_iothread();
    return -1;
}

/**
 * migration_completion: Used by migration_thread when there's not much left.
 *   The caller 'breaks' the loop when this returns.
 *
 * @s: Current migration state
 * @current_active_state: The migration state we expect to be in
 * @*old_vm_running: Pointer to old_vm_running flag
 * @*start_time: Pointer to time to update
 */
static void migration_completion(MigrationState *s, int current_active_state,
                                 bool *old_vm_running,
                                 int64_t *start_time)
{
    int ret;

    if (s->state == MIGRATION_STATUS_ACTIVE) {
        qemu_mutex_lock_iothread();
        *start_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
        qemu_system_wakeup_request(QEMU_WAKEUP_REASON_OTHER);
        *old_vm_running = runstate_is_running();
        ret = global_state_store();

        if (!ret) {
            ret = vm_stop_force_state(RUN_STATE_FINISH_MIGRATE);
            if (ret >= 0) {
                ret = bdrv_inactivate_all();
            }
            if (ret >= 0) {
                qemu_file_set_rate_limit(s->to_dst_file, INT64_MAX);
                qemu_savevm_state_complete_precopy(s->to_dst_file, false);
            }
        }
        qemu_mutex_unlock_iothread();

        if (ret < 0) {
            goto fail;
        }
    } else if (s->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
        trace_migration_completion_postcopy_end();

        qemu_savevm_state_complete_postcopy(s->to_dst_file);
        trace_migration_completion_postcopy_end_after_complete();
    }

    /*
     * If rp was opened we must clean up the thread before
     * cleaning everything else up (since if there are no failures
     * it will wait for the destination to send its status in
     * a SHUT command).
     * Postcopy opens rp if enabled (even if it's not activated).
     */
    if (migrate_postcopy_ram()) {
        int rp_error;
        trace_migration_completion_postcopy_end_before_rp();
        rp_error = await_return_path_close_on_source(s);
        trace_migration_completion_postcopy_end_after_rp(rp_error);
        if (rp_error) {
            goto fail_invalidate;
        }
    }

    if (qemu_file_get_error(s->to_dst_file)) {
        trace_migration_completion_file_err();
        goto fail_invalidate;
    }

    migrate_set_state(&s->state, current_active_state,
                      MIGRATION_STATUS_COMPLETED);
    return;

fail_invalidate:
    /* If not doing postcopy, vm_start() will be called: let's regain
     * control on images.
     */
    if (s->state == MIGRATION_STATUS_ACTIVE) {
        Error *local_err = NULL;

        bdrv_invalidate_cache_all(&local_err);
        if (local_err) {
            error_report_err(local_err);
        }
    }

fail:
    migrate_set_state(&s->state, current_active_state,
                      MIGRATION_STATUS_FAILED);
}

/*
 * Master migration thread on the source VM.
 * It drives the migration and pumps the data down the outgoing channel.
 */
static void *migration_thread(void *opaque)
{
    MigrationState *s = opaque;
    /* Used by the bandwidth calcs, updated later */
    int64_t initial_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
    int64_t setup_start = qemu_clock_get_ms(QEMU_CLOCK_HOST);
    int64_t initial_bytes = 0;
    int64_t max_size = 0;
    int64_t start_time = initial_time;
    int64_t end_time;
    bool old_vm_running = false;
    bool entered_postcopy = false;
    /* The active state we expect to be in; ACTIVE or POSTCOPY_ACTIVE */
    enum MigrationStatus current_active_state = MIGRATION_STATUS_ACTIVE;

    rcu_register_thread();

    qemu_savevm_state_header(s->to_dst_file);

    if (migrate_postcopy_ram()) {
        /* Now tell the dest that it should open its end so it can reply */
        qemu_savevm_send_open_return_path(s->to_dst_file);

        /* And do a ping that will make stuff easier to debug */
        qemu_savevm_send_ping(s->to_dst_file, 1);

        /*
         * Tell the destination that we *might* want to do postcopy later;
         * if the other end can't do postcopy it should fail now, nice and
         * early.
         */
        qemu_savevm_send_postcopy_advise(s->to_dst_file);
    }

    qemu_savevm_state_begin(s->to_dst_file, &s->params);

    s->setup_time = qemu_clock_get_ms(QEMU_CLOCK_HOST) - setup_start;
    current_active_state = MIGRATION_STATUS_ACTIVE;
    migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
                      MIGRATION_STATUS_ACTIVE);

    trace_migration_thread_setup_complete();

    while (s->state == MIGRATION_STATUS_ACTIVE ||
           s->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) {
        int64_t current_time;
        uint64_t pending_size;

        if (!qemu_file_rate_limit(s->to_dst_file)) {
            uint64_t pend_post, pend_nonpost;

            qemu_savevm_state_pending(s->to_dst_file, max_size, &pend_nonpost,
                                      &pend_post);
            pending_size = pend_nonpost + pend_post;
            trace_migrate_pending(pending_size, max_size,
                                  pend_post, pend_nonpost);
            if (pending_size && pending_size >= max_size) {
                /* Still a significant amount to transfer */

                if (migrate_postcopy_ram() &&
                    s->state != MIGRATION_STATUS_POSTCOPY_ACTIVE &&
                    pend_nonpost <= max_size &&
                    atomic_read(&s->start_postcopy)) {

                    if (!postcopy_start(s, &old_vm_running)) {
                        current_active_state = MIGRATION_STATUS_POSTCOPY_ACTIVE;
                        entered_postcopy = true;
                    }

                    continue;
                }
                /* Just another iteration step */
                qemu_savevm_state_iterate(s->to_dst_file, entered_postcopy);
            } else {
                trace_migration_thread_low_pending(pending_size);
                migration_completion(s, current_active_state,
                                     &old_vm_running, &start_time);
                break;
            }
        }

        if (qemu_file_get_error(s->to_dst_file)) {
            migrate_set_state(&s->state, current_active_state,
                              MIGRATION_STATUS_FAILED);
            trace_migration_thread_file_err();
            break;
        }
        current_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
        if (current_time >= initial_time + BUFFER_DELAY) {
            uint64_t transferred_bytes = qemu_ftell(s->to_dst_file) -
                                         initial_bytes;
            uint64_t time_spent = current_time - initial_time;
            double bandwidth = (double)transferred_bytes / time_spent;
            max_size = bandwidth * migrate_max_downtime() / 1000000;

            s->mbps = (((double) transferred_bytes * 8.0) /
                    ((double) time_spent / 1000.0)) / 1000.0 / 1000.0;

            trace_migrate_transferred(transferred_bytes, time_spent,
                                      bandwidth, max_size);
            /* If we haven't sent anything, we don't want to recalculate;
               10000 is a small enough number for our purposes */
            if (s->dirty_bytes_rate && transferred_bytes > 10000) {
                s->expected_downtime = s->dirty_bytes_rate / bandwidth;
            }

            qemu_file_reset_rate_limit(s->to_dst_file);
            initial_time = current_time;
            initial_bytes = qemu_ftell(s->to_dst_file);
        }
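        /*
         * Worked example of the max_size calculation above: bandwidth is
         * bytes per millisecond and migrate_max_downtime() is nanoseconds,
         * so at a measured 100000 bytes/ms (100 MB/s) with the default
         * 300000000 ns downtime, max_size = 100000 * 300 = 30000000 bytes;
         * completion is attempted once the remaining data fits in ~30 MB.
         */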
        if (qemu_file_rate_limit(s->to_dst_file)) {
            /* usleep expects microseconds */
            g_usleep((initial_time + BUFFER_DELAY - current_time) * 1000);
        }
    }

    trace_migration_thread_after_loop();
    /* If we enabled cpu throttling for auto-converge, turn it off. */
    cpu_throttle_stop();
    end_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);

    qemu_mutex_lock_iothread();
    qemu_savevm_state_cleanup();
    if (s->state == MIGRATION_STATUS_COMPLETED) {
        uint64_t transferred_bytes = qemu_ftell(s->to_dst_file);
        s->total_time = end_time - s->total_time;
        if (!entered_postcopy) {
            s->downtime = end_time - start_time;
        }
        if (s->total_time) {
            s->mbps = (((double) transferred_bytes * 8.0) /
                       ((double) s->total_time)) / 1000;
        }
        runstate_set(RUN_STATE_POSTMIGRATE);
    } else {
        if (old_vm_running && !entered_postcopy) {
            vm_start();
        }
    }
    qemu_bh_schedule(s->cleanup_bh);
    qemu_mutex_unlock_iothread();

    rcu_unregister_thread();
    return NULL;
}

void migrate_fd_connect(MigrationState *s)
{
    /* This is a best first approximation: ns to ms */
    s->expected_downtime = max_downtime / 1000000;
    s->cleanup_bh = qemu_bh_new(migrate_fd_cleanup, s);

    qemu_file_set_blocking(s->to_dst_file, true);
    qemu_file_set_rate_limit(s->to_dst_file,
                             s->bandwidth_limit / XFER_LIMIT_RATIO);

    /* Notify before starting migration thread */
    notifier_list_notify(&migration_state_notifiers, s);

    /*
     * Open the return path; currently for postcopy but other things might
     * also want it.
     */
    if (migrate_postcopy_ram()) {
        if (open_return_path_on_source(s)) {
            error_report("Unable to open return-path for postcopy");
            migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
                              MIGRATION_STATUS_FAILED);
            migrate_fd_cleanup(s);
            return;
        }
    }

    migrate_compress_threads_create();
    qemu_thread_create(&s->thread, "migration", migration_thread, s,
                       QEMU_THREAD_JOINABLE);
    s->migration_thread_running = true;
}

PostcopyState postcopy_state_get(void)
{
    return atomic_mb_read(&incoming_postcopy_state);
}

/* Set the state and return the old state */
PostcopyState postcopy_state_set(PostcopyState new_state)
{
    return atomic_xchg(&incoming_postcopy_state, new_state);
}