1 /* 2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO) 3 * (a.k.a. Fault Tolerance or Continuous Replication) 4 * 5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD. 6 * Copyright (c) 2016 FUJITSU LIMITED 7 * Copyright (c) 2016 Intel Corporation 8 * 9 * This work is licensed under the terms of the GNU GPL, version 2 or 10 * later. See the COPYING file in the top-level directory. 11 */ 12 13 #include "qemu/osdep.h" 14 #include "qemu/timer.h" 15 #include "sysemu/sysemu.h" 16 #include "qemu-file-channel.h" 17 #include "migration.h" 18 #include "qemu-file.h" 19 #include "savevm.h" 20 #include "migration/colo.h" 21 #include "block.h" 22 #include "io/channel-buffer.h" 23 #include "trace.h" 24 #include "qemu/error-report.h" 25 #include "qapi/error.h" 26 #include "migration/failover.h" 27 #include "replication.h" 28 #include "qmp-commands.h" 29 30 static bool vmstate_loading; 31 32 #define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024) 33 34 bool colo_supported(void) 35 { 36 return true; 37 } 38 39 bool migration_in_colo_state(void) 40 { 41 MigrationState *s = migrate_get_current(); 42 43 return (s->state == MIGRATION_STATUS_COLO); 44 } 45 46 bool migration_incoming_in_colo_state(void) 47 { 48 MigrationIncomingState *mis = migration_incoming_get_current(); 49 50 return mis && (mis->state == MIGRATION_STATUS_COLO); 51 } 52 53 static bool colo_runstate_is_stopped(void) 54 { 55 return runstate_check(RUN_STATE_COLO) || !runstate_is_running(); 56 } 57 58 static void secondary_vm_do_failover(void) 59 { 60 int old_state; 61 MigrationIncomingState *mis = migration_incoming_get_current(); 62 63 /* Can not do failover during the process of VM's loading VMstate, Or 64 * it will break the secondary VM. 65 */ 66 if (vmstate_loading) { 67 old_state = failover_set_state(FAILOVER_STATUS_ACTIVE, 68 FAILOVER_STATUS_RELAUNCH); 69 if (old_state != FAILOVER_STATUS_ACTIVE) { 70 error_report("Unknown error while do failover for secondary VM," 71 "old_state: %s", FailoverStatus_lookup[old_state]); 72 } 73 return; 74 } 75 76 migrate_set_state(&mis->state, MIGRATION_STATUS_COLO, 77 MIGRATION_STATUS_COMPLETED); 78 79 if (!autostart) { 80 error_report("\"-S\" qemu option will be ignored in secondary side"); 81 /* recover runstate to normal migration finish state */ 82 autostart = true; 83 } 84 /* 85 * Make sure COLO incoming thread not block in recv or send, 86 * If mis->from_src_file and mis->to_src_file use the same fd, 87 * The second shutdown() will return -1, we ignore this value, 88 * It is harmless. 89 */ 90 if (mis->from_src_file) { 91 qemu_file_shutdown(mis->from_src_file); 92 } 93 if (mis->to_src_file) { 94 qemu_file_shutdown(mis->to_src_file); 95 } 96 97 old_state = failover_set_state(FAILOVER_STATUS_ACTIVE, 98 FAILOVER_STATUS_COMPLETED); 99 if (old_state != FAILOVER_STATUS_ACTIVE) { 100 error_report("Incorrect state (%s) while doing failover for " 101 "secondary VM", FailoverStatus_lookup[old_state]); 102 return; 103 } 104 /* Notify COLO incoming thread that failover work is finished */ 105 qemu_sem_post(&mis->colo_incoming_sem); 106 /* For Secondary VM, jump to incoming co */ 107 if (mis->migration_incoming_co) { 108 qemu_coroutine_enter(mis->migration_incoming_co); 109 } 110 } 111 112 static void primary_vm_do_failover(void) 113 { 114 MigrationState *s = migrate_get_current(); 115 int old_state; 116 117 migrate_set_state(&s->state, MIGRATION_STATUS_COLO, 118 MIGRATION_STATUS_COMPLETED); 119 120 /* 121 * Wake up COLO thread which may blocked in recv() or send(), 122 * The s->rp_state.from_dst_file and s->to_dst_file may use the 123 * same fd, but we still shutdown the fd for twice, it is harmless. 124 */ 125 if (s->to_dst_file) { 126 qemu_file_shutdown(s->to_dst_file); 127 } 128 if (s->rp_state.from_dst_file) { 129 qemu_file_shutdown(s->rp_state.from_dst_file); 130 } 131 132 old_state = failover_set_state(FAILOVER_STATUS_ACTIVE, 133 FAILOVER_STATUS_COMPLETED); 134 if (old_state != FAILOVER_STATUS_ACTIVE) { 135 error_report("Incorrect state (%s) while doing failover for Primary VM", 136 FailoverStatus_lookup[old_state]); 137 return; 138 } 139 /* Notify COLO thread that failover work is finished */ 140 qemu_sem_post(&s->colo_exit_sem); 141 } 142 143 void colo_do_failover(MigrationState *s) 144 { 145 /* Make sure VM stopped while failover happened. */ 146 if (!colo_runstate_is_stopped()) { 147 vm_stop_force_state(RUN_STATE_COLO); 148 } 149 150 if (get_colo_mode() == COLO_MODE_PRIMARY) { 151 primary_vm_do_failover(); 152 } else { 153 secondary_vm_do_failover(); 154 } 155 } 156 157 void qmp_xen_set_replication(bool enable, bool primary, 158 bool has_failover, bool failover, 159 Error **errp) 160 { 161 #ifdef CONFIG_REPLICATION 162 ReplicationMode mode = primary ? 163 REPLICATION_MODE_PRIMARY : 164 REPLICATION_MODE_SECONDARY; 165 166 if (has_failover && enable) { 167 error_setg(errp, "Parameter 'failover' is only for" 168 " stopping replication"); 169 return; 170 } 171 172 if (enable) { 173 replication_start_all(mode, errp); 174 } else { 175 if (!has_failover) { 176 failover = NULL; 177 } 178 replication_stop_all(failover, failover ? NULL : errp); 179 } 180 #else 181 abort(); 182 #endif 183 } 184 185 ReplicationStatus *qmp_query_xen_replication_status(Error **errp) 186 { 187 #ifdef CONFIG_REPLICATION 188 Error *err = NULL; 189 ReplicationStatus *s = g_new0(ReplicationStatus, 1); 190 191 replication_get_error_all(&err); 192 if (err) { 193 s->error = true; 194 s->has_desc = true; 195 s->desc = g_strdup(error_get_pretty(err)); 196 } else { 197 s->error = false; 198 } 199 200 error_free(err); 201 return s; 202 #else 203 abort(); 204 #endif 205 } 206 207 void qmp_xen_colo_do_checkpoint(Error **errp) 208 { 209 #ifdef CONFIG_REPLICATION 210 replication_do_checkpoint_all(errp); 211 #else 212 abort(); 213 #endif 214 } 215 216 static void colo_send_message(QEMUFile *f, COLOMessage msg, 217 Error **errp) 218 { 219 int ret; 220 221 if (msg >= COLO_MESSAGE__MAX) { 222 error_setg(errp, "%s: Invalid message", __func__); 223 return; 224 } 225 qemu_put_be32(f, msg); 226 qemu_fflush(f); 227 228 ret = qemu_file_get_error(f); 229 if (ret < 0) { 230 error_setg_errno(errp, -ret, "Can't send COLO message"); 231 } 232 trace_colo_send_message(COLOMessage_lookup[msg]); 233 } 234 235 static void colo_send_message_value(QEMUFile *f, COLOMessage msg, 236 uint64_t value, Error **errp) 237 { 238 Error *local_err = NULL; 239 int ret; 240 241 colo_send_message(f, msg, &local_err); 242 if (local_err) { 243 error_propagate(errp, local_err); 244 return; 245 } 246 qemu_put_be64(f, value); 247 qemu_fflush(f); 248 249 ret = qemu_file_get_error(f); 250 if (ret < 0) { 251 error_setg_errno(errp, -ret, "Failed to send value for message:%s", 252 COLOMessage_lookup[msg]); 253 } 254 } 255 256 static COLOMessage colo_receive_message(QEMUFile *f, Error **errp) 257 { 258 COLOMessage msg; 259 int ret; 260 261 msg = qemu_get_be32(f); 262 ret = qemu_file_get_error(f); 263 if (ret < 0) { 264 error_setg_errno(errp, -ret, "Can't receive COLO message"); 265 return msg; 266 } 267 if (msg >= COLO_MESSAGE__MAX) { 268 error_setg(errp, "%s: Invalid message", __func__); 269 return msg; 270 } 271 trace_colo_receive_message(COLOMessage_lookup[msg]); 272 return msg; 273 } 274 275 static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg, 276 Error **errp) 277 { 278 COLOMessage msg; 279 Error *local_err = NULL; 280 281 msg = colo_receive_message(f, &local_err); 282 if (local_err) { 283 error_propagate(errp, local_err); 284 return; 285 } 286 if (msg != expect_msg) { 287 error_setg(errp, "Unexpected COLO message %d, expected %d", 288 msg, expect_msg); 289 } 290 } 291 292 static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg, 293 Error **errp) 294 { 295 Error *local_err = NULL; 296 uint64_t value; 297 int ret; 298 299 colo_receive_check_message(f, expect_msg, &local_err); 300 if (local_err) { 301 error_propagate(errp, local_err); 302 return 0; 303 } 304 305 value = qemu_get_be64(f); 306 ret = qemu_file_get_error(f); 307 if (ret < 0) { 308 error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s", 309 COLOMessage_lookup[expect_msg]); 310 } 311 return value; 312 } 313 314 static int colo_do_checkpoint_transaction(MigrationState *s, 315 QIOChannelBuffer *bioc, 316 QEMUFile *fb) 317 { 318 Error *local_err = NULL; 319 int ret = -1; 320 321 colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST, 322 &local_err); 323 if (local_err) { 324 goto out; 325 } 326 327 colo_receive_check_message(s->rp_state.from_dst_file, 328 COLO_MESSAGE_CHECKPOINT_REPLY, &local_err); 329 if (local_err) { 330 goto out; 331 } 332 /* Reset channel-buffer directly */ 333 qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL); 334 bioc->usage = 0; 335 336 qemu_mutex_lock_iothread(); 337 if (failover_get_state() != FAILOVER_STATUS_NONE) { 338 qemu_mutex_unlock_iothread(); 339 goto out; 340 } 341 vm_stop_force_state(RUN_STATE_COLO); 342 qemu_mutex_unlock_iothread(); 343 trace_colo_vm_state_change("run", "stop"); 344 /* 345 * Failover request bh could be called after vm_stop_force_state(), 346 * So we need check failover_request_is_active() again. 347 */ 348 if (failover_get_state() != FAILOVER_STATUS_NONE) { 349 goto out; 350 } 351 352 /* Disable block migration */ 353 migrate_set_block_enabled(false, &local_err); 354 qemu_savevm_state_header(fb); 355 qemu_savevm_state_begin(fb); 356 qemu_mutex_lock_iothread(); 357 qemu_savevm_state_complete_precopy(fb, false); 358 qemu_mutex_unlock_iothread(); 359 360 qemu_fflush(fb); 361 362 colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err); 363 if (local_err) { 364 goto out; 365 } 366 /* 367 * We need the size of the VMstate data in Secondary side, 368 * With which we can decide how much data should be read. 369 */ 370 colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE, 371 bioc->usage, &local_err); 372 if (local_err) { 373 goto out; 374 } 375 376 qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage); 377 qemu_fflush(s->to_dst_file); 378 ret = qemu_file_get_error(s->to_dst_file); 379 if (ret < 0) { 380 goto out; 381 } 382 383 colo_receive_check_message(s->rp_state.from_dst_file, 384 COLO_MESSAGE_VMSTATE_RECEIVED, &local_err); 385 if (local_err) { 386 goto out; 387 } 388 389 colo_receive_check_message(s->rp_state.from_dst_file, 390 COLO_MESSAGE_VMSTATE_LOADED, &local_err); 391 if (local_err) { 392 goto out; 393 } 394 395 ret = 0; 396 397 qemu_mutex_lock_iothread(); 398 vm_start(); 399 qemu_mutex_unlock_iothread(); 400 trace_colo_vm_state_change("stop", "run"); 401 402 out: 403 if (local_err) { 404 error_report_err(local_err); 405 } 406 return ret; 407 } 408 409 static void colo_process_checkpoint(MigrationState *s) 410 { 411 QIOChannelBuffer *bioc; 412 QEMUFile *fb = NULL; 413 int64_t current_time = qemu_clock_get_ms(QEMU_CLOCK_HOST); 414 Error *local_err = NULL; 415 int ret; 416 417 failover_init_state(); 418 419 s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file); 420 if (!s->rp_state.from_dst_file) { 421 error_report("Open QEMUFile from_dst_file failed"); 422 goto out; 423 } 424 425 /* 426 * Wait for Secondary finish loading VM states and enter COLO 427 * restore. 428 */ 429 colo_receive_check_message(s->rp_state.from_dst_file, 430 COLO_MESSAGE_CHECKPOINT_READY, &local_err); 431 if (local_err) { 432 goto out; 433 } 434 bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE); 435 fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc)); 436 object_unref(OBJECT(bioc)); 437 438 qemu_mutex_lock_iothread(); 439 vm_start(); 440 qemu_mutex_unlock_iothread(); 441 trace_colo_vm_state_change("stop", "run"); 442 443 timer_mod(s->colo_delay_timer, 444 current_time + s->parameters.x_checkpoint_delay); 445 446 while (s->state == MIGRATION_STATUS_COLO) { 447 if (failover_get_state() != FAILOVER_STATUS_NONE) { 448 error_report("failover request"); 449 goto out; 450 } 451 452 qemu_sem_wait(&s->colo_checkpoint_sem); 453 454 ret = colo_do_checkpoint_transaction(s, bioc, fb); 455 if (ret < 0) { 456 goto out; 457 } 458 } 459 460 out: 461 /* Throw the unreported error message after exited from loop */ 462 if (local_err) { 463 error_report_err(local_err); 464 } 465 466 if (fb) { 467 qemu_fclose(fb); 468 } 469 470 timer_del(s->colo_delay_timer); 471 472 /* Hope this not to be too long to wait here */ 473 qemu_sem_wait(&s->colo_exit_sem); 474 qemu_sem_destroy(&s->colo_exit_sem); 475 /* 476 * Must be called after failover BH is completed, 477 * Or the failover BH may shutdown the wrong fd that 478 * re-used by other threads after we release here. 479 */ 480 if (s->rp_state.from_dst_file) { 481 qemu_fclose(s->rp_state.from_dst_file); 482 } 483 } 484 485 void colo_checkpoint_notify(void *opaque) 486 { 487 MigrationState *s = opaque; 488 int64_t next_notify_time; 489 490 qemu_sem_post(&s->colo_checkpoint_sem); 491 s->colo_checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST); 492 next_notify_time = s->colo_checkpoint_time + 493 s->parameters.x_checkpoint_delay; 494 timer_mod(s->colo_delay_timer, next_notify_time); 495 } 496 497 void migrate_start_colo_process(MigrationState *s) 498 { 499 qemu_mutex_unlock_iothread(); 500 qemu_sem_init(&s->colo_checkpoint_sem, 0); 501 s->colo_delay_timer = timer_new_ms(QEMU_CLOCK_HOST, 502 colo_checkpoint_notify, s); 503 504 qemu_sem_init(&s->colo_exit_sem, 0); 505 migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE, 506 MIGRATION_STATUS_COLO); 507 colo_process_checkpoint(s); 508 qemu_mutex_lock_iothread(); 509 } 510 511 static void colo_wait_handle_message(QEMUFile *f, int *checkpoint_request, 512 Error **errp) 513 { 514 COLOMessage msg; 515 Error *local_err = NULL; 516 517 msg = colo_receive_message(f, &local_err); 518 if (local_err) { 519 error_propagate(errp, local_err); 520 return; 521 } 522 523 switch (msg) { 524 case COLO_MESSAGE_CHECKPOINT_REQUEST: 525 *checkpoint_request = 1; 526 break; 527 default: 528 *checkpoint_request = 0; 529 error_setg(errp, "Got unknown COLO message: %d", msg); 530 break; 531 } 532 } 533 534 void *colo_process_incoming_thread(void *opaque) 535 { 536 MigrationIncomingState *mis = opaque; 537 QEMUFile *fb = NULL; 538 QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */ 539 uint64_t total_size; 540 uint64_t value; 541 Error *local_err = NULL; 542 543 qemu_sem_init(&mis->colo_incoming_sem, 0); 544 545 migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE, 546 MIGRATION_STATUS_COLO); 547 548 failover_init_state(); 549 550 mis->to_src_file = qemu_file_get_return_path(mis->from_src_file); 551 if (!mis->to_src_file) { 552 error_report("COLO incoming thread: Open QEMUFile to_src_file failed"); 553 goto out; 554 } 555 /* 556 * Note: the communication between Primary side and Secondary side 557 * should be sequential, we set the fd to unblocked in migration incoming 558 * coroutine, and here we are in the COLO incoming thread, so it is ok to 559 * set the fd back to blocked. 560 */ 561 qemu_file_set_blocking(mis->from_src_file, true); 562 563 bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE); 564 fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc)); 565 object_unref(OBJECT(bioc)); 566 567 colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY, 568 &local_err); 569 if (local_err) { 570 goto out; 571 } 572 573 while (mis->state == MIGRATION_STATUS_COLO) { 574 int request = 0; 575 576 colo_wait_handle_message(mis->from_src_file, &request, &local_err); 577 if (local_err) { 578 goto out; 579 } 580 assert(request); 581 if (failover_get_state() != FAILOVER_STATUS_NONE) { 582 error_report("failover request"); 583 goto out; 584 } 585 586 /* FIXME: This is unnecessary for periodic checkpoint mode */ 587 colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY, 588 &local_err); 589 if (local_err) { 590 goto out; 591 } 592 593 colo_receive_check_message(mis->from_src_file, 594 COLO_MESSAGE_VMSTATE_SEND, &local_err); 595 if (local_err) { 596 goto out; 597 } 598 599 value = colo_receive_message_value(mis->from_src_file, 600 COLO_MESSAGE_VMSTATE_SIZE, &local_err); 601 if (local_err) { 602 goto out; 603 } 604 605 /* 606 * Read VM device state data into channel buffer, 607 * It's better to re-use the memory allocated. 608 * Here we need to handle the channel buffer directly. 609 */ 610 if (value > bioc->capacity) { 611 bioc->capacity = value; 612 bioc->data = g_realloc(bioc->data, bioc->capacity); 613 } 614 total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value); 615 if (total_size != value) { 616 error_report("Got %" PRIu64 " VMState data, less than expected" 617 " %" PRIu64, total_size, value); 618 goto out; 619 } 620 bioc->usage = total_size; 621 qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL); 622 623 colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED, 624 &local_err); 625 if (local_err) { 626 goto out; 627 } 628 629 qemu_mutex_lock_iothread(); 630 qemu_system_reset(SHUTDOWN_CAUSE_NONE); 631 vmstate_loading = true; 632 if (qemu_loadvm_state(fb) < 0) { 633 error_report("COLO: loadvm failed"); 634 qemu_mutex_unlock_iothread(); 635 goto out; 636 } 637 638 vmstate_loading = false; 639 qemu_mutex_unlock_iothread(); 640 641 if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) { 642 failover_set_state(FAILOVER_STATUS_RELAUNCH, 643 FAILOVER_STATUS_NONE); 644 failover_request_active(NULL); 645 goto out; 646 } 647 648 colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED, 649 &local_err); 650 if (local_err) { 651 goto out; 652 } 653 } 654 655 out: 656 vmstate_loading = false; 657 /* Throw the unreported error message after exited from loop */ 658 if (local_err) { 659 error_report_err(local_err); 660 } 661 662 if (fb) { 663 qemu_fclose(fb); 664 } 665 666 /* Hope this not to be too long to loop here */ 667 qemu_sem_wait(&mis->colo_incoming_sem); 668 qemu_sem_destroy(&mis->colo_incoming_sem); 669 /* Must be called after failover BH is completed */ 670 if (mis->to_src_file) { 671 qemu_fclose(mis->to_src_file); 672 } 673 migration_incoming_exit_colo(); 674 675 return NULL; 676 } 677