1 /* 2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO) 3 * (a.k.a. Fault Tolerance or Continuous Replication) 4 * 5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD. 6 * Copyright (c) 2016 FUJITSU LIMITED 7 * Copyright (c) 2016 Intel Corporation 8 * 9 * This work is licensed under the terms of the GNU GPL, version 2 or 10 * later. See the COPYING file in the top-level directory. 11 */ 12 13 #include "qemu/osdep.h" 14 #include "sysemu/sysemu.h" 15 #include "qemu-file-channel.h" 16 #include "migration.h" 17 #include "qemu-file.h" 18 #include "savevm.h" 19 #include "migration/colo.h" 20 #include "block.h" 21 #include "io/channel-buffer.h" 22 #include "trace.h" 23 #include "qemu/error-report.h" 24 #include "migration/failover.h" 25 #include "replication.h" 26 #include "qmp-commands.h" 27 28 static bool vmstate_loading; 29 30 #define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024) 31 32 bool colo_supported(void) 33 { 34 return true; 35 } 36 37 bool migration_in_colo_state(void) 38 { 39 MigrationState *s = migrate_get_current(); 40 41 return (s->state == MIGRATION_STATUS_COLO); 42 } 43 44 bool migration_incoming_in_colo_state(void) 45 { 46 MigrationIncomingState *mis = migration_incoming_get_current(); 47 48 return mis && (mis->state == MIGRATION_STATUS_COLO); 49 } 50 51 static bool colo_runstate_is_stopped(void) 52 { 53 return runstate_check(RUN_STATE_COLO) || !runstate_is_running(); 54 } 55 56 static void secondary_vm_do_failover(void) 57 { 58 int old_state; 59 MigrationIncomingState *mis = migration_incoming_get_current(); 60 61 /* Can not do failover during the process of VM's loading VMstate, Or 62 * it will break the secondary VM. 63 */ 64 if (vmstate_loading) { 65 old_state = failover_set_state(FAILOVER_STATUS_ACTIVE, 66 FAILOVER_STATUS_RELAUNCH); 67 if (old_state != FAILOVER_STATUS_ACTIVE) { 68 error_report("Unknown error while do failover for secondary VM," 69 "old_state: %s", FailoverStatus_lookup[old_state]); 70 } 71 return; 72 } 73 74 migrate_set_state(&mis->state, MIGRATION_STATUS_COLO, 75 MIGRATION_STATUS_COMPLETED); 76 77 if (!autostart) { 78 error_report("\"-S\" qemu option will be ignored in secondary side"); 79 /* recover runstate to normal migration finish state */ 80 autostart = true; 81 } 82 /* 83 * Make sure COLO incoming thread not block in recv or send, 84 * If mis->from_src_file and mis->to_src_file use the same fd, 85 * The second shutdown() will return -1, we ignore this value, 86 * It is harmless. 87 */ 88 if (mis->from_src_file) { 89 qemu_file_shutdown(mis->from_src_file); 90 } 91 if (mis->to_src_file) { 92 qemu_file_shutdown(mis->to_src_file); 93 } 94 95 old_state = failover_set_state(FAILOVER_STATUS_ACTIVE, 96 FAILOVER_STATUS_COMPLETED); 97 if (old_state != FAILOVER_STATUS_ACTIVE) { 98 error_report("Incorrect state (%s) while doing failover for " 99 "secondary VM", FailoverStatus_lookup[old_state]); 100 return; 101 } 102 /* Notify COLO incoming thread that failover work is finished */ 103 qemu_sem_post(&mis->colo_incoming_sem); 104 /* For Secondary VM, jump to incoming co */ 105 if (mis->migration_incoming_co) { 106 qemu_coroutine_enter(mis->migration_incoming_co); 107 } 108 } 109 110 static void primary_vm_do_failover(void) 111 { 112 MigrationState *s = migrate_get_current(); 113 int old_state; 114 115 migrate_set_state(&s->state, MIGRATION_STATUS_COLO, 116 MIGRATION_STATUS_COMPLETED); 117 118 /* 119 * Wake up COLO thread which may blocked in recv() or send(), 120 * The s->rp_state.from_dst_file and s->to_dst_file may use the 121 * same fd, but we still shutdown the fd for twice, it is harmless. 122 */ 123 if (s->to_dst_file) { 124 qemu_file_shutdown(s->to_dst_file); 125 } 126 if (s->rp_state.from_dst_file) { 127 qemu_file_shutdown(s->rp_state.from_dst_file); 128 } 129 130 old_state = failover_set_state(FAILOVER_STATUS_ACTIVE, 131 FAILOVER_STATUS_COMPLETED); 132 if (old_state != FAILOVER_STATUS_ACTIVE) { 133 error_report("Incorrect state (%s) while doing failover for Primary VM", 134 FailoverStatus_lookup[old_state]); 135 return; 136 } 137 /* Notify COLO thread that failover work is finished */ 138 qemu_sem_post(&s->colo_exit_sem); 139 } 140 141 void colo_do_failover(MigrationState *s) 142 { 143 /* Make sure VM stopped while failover happened. */ 144 if (!colo_runstate_is_stopped()) { 145 vm_stop_force_state(RUN_STATE_COLO); 146 } 147 148 if (get_colo_mode() == COLO_MODE_PRIMARY) { 149 primary_vm_do_failover(); 150 } else { 151 secondary_vm_do_failover(); 152 } 153 } 154 155 void qmp_xen_set_replication(bool enable, bool primary, 156 bool has_failover, bool failover, 157 Error **errp) 158 { 159 #ifdef CONFIG_REPLICATION 160 ReplicationMode mode = primary ? 161 REPLICATION_MODE_PRIMARY : 162 REPLICATION_MODE_SECONDARY; 163 164 if (has_failover && enable) { 165 error_setg(errp, "Parameter 'failover' is only for" 166 " stopping replication"); 167 return; 168 } 169 170 if (enable) { 171 replication_start_all(mode, errp); 172 } else { 173 if (!has_failover) { 174 failover = NULL; 175 } 176 replication_stop_all(failover, failover ? NULL : errp); 177 } 178 #else 179 abort(); 180 #endif 181 } 182 183 ReplicationStatus *qmp_query_xen_replication_status(Error **errp) 184 { 185 #ifdef CONFIG_REPLICATION 186 Error *err = NULL; 187 ReplicationStatus *s = g_new0(ReplicationStatus, 1); 188 189 replication_get_error_all(&err); 190 if (err) { 191 s->error = true; 192 s->has_desc = true; 193 s->desc = g_strdup(error_get_pretty(err)); 194 } else { 195 s->error = false; 196 } 197 198 error_free(err); 199 return s; 200 #else 201 abort(); 202 #endif 203 } 204 205 void qmp_xen_colo_do_checkpoint(Error **errp) 206 { 207 #ifdef CONFIG_REPLICATION 208 replication_do_checkpoint_all(errp); 209 #else 210 abort(); 211 #endif 212 } 213 214 static void colo_send_message(QEMUFile *f, COLOMessage msg, 215 Error **errp) 216 { 217 int ret; 218 219 if (msg >= COLO_MESSAGE__MAX) { 220 error_setg(errp, "%s: Invalid message", __func__); 221 return; 222 } 223 qemu_put_be32(f, msg); 224 qemu_fflush(f); 225 226 ret = qemu_file_get_error(f); 227 if (ret < 0) { 228 error_setg_errno(errp, -ret, "Can't send COLO message"); 229 } 230 trace_colo_send_message(COLOMessage_lookup[msg]); 231 } 232 233 static void colo_send_message_value(QEMUFile *f, COLOMessage msg, 234 uint64_t value, Error **errp) 235 { 236 Error *local_err = NULL; 237 int ret; 238 239 colo_send_message(f, msg, &local_err); 240 if (local_err) { 241 error_propagate(errp, local_err); 242 return; 243 } 244 qemu_put_be64(f, value); 245 qemu_fflush(f); 246 247 ret = qemu_file_get_error(f); 248 if (ret < 0) { 249 error_setg_errno(errp, -ret, "Failed to send value for message:%s", 250 COLOMessage_lookup[msg]); 251 } 252 } 253 254 static COLOMessage colo_receive_message(QEMUFile *f, Error **errp) 255 { 256 COLOMessage msg; 257 int ret; 258 259 msg = qemu_get_be32(f); 260 ret = qemu_file_get_error(f); 261 if (ret < 0) { 262 error_setg_errno(errp, -ret, "Can't receive COLO message"); 263 return msg; 264 } 265 if (msg >= COLO_MESSAGE__MAX) { 266 error_setg(errp, "%s: Invalid message", __func__); 267 return msg; 268 } 269 trace_colo_receive_message(COLOMessage_lookup[msg]); 270 return msg; 271 } 272 273 static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg, 274 Error **errp) 275 { 276 COLOMessage msg; 277 Error *local_err = NULL; 278 279 msg = colo_receive_message(f, &local_err); 280 if (local_err) { 281 error_propagate(errp, local_err); 282 return; 283 } 284 if (msg != expect_msg) { 285 error_setg(errp, "Unexpected COLO message %d, expected %d", 286 msg, expect_msg); 287 } 288 } 289 290 static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg, 291 Error **errp) 292 { 293 Error *local_err = NULL; 294 uint64_t value; 295 int ret; 296 297 colo_receive_check_message(f, expect_msg, &local_err); 298 if (local_err) { 299 error_propagate(errp, local_err); 300 return 0; 301 } 302 303 value = qemu_get_be64(f); 304 ret = qemu_file_get_error(f); 305 if (ret < 0) { 306 error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s", 307 COLOMessage_lookup[expect_msg]); 308 } 309 return value; 310 } 311 312 static int colo_do_checkpoint_transaction(MigrationState *s, 313 QIOChannelBuffer *bioc, 314 QEMUFile *fb) 315 { 316 Error *local_err = NULL; 317 int ret = -1; 318 319 colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST, 320 &local_err); 321 if (local_err) { 322 goto out; 323 } 324 325 colo_receive_check_message(s->rp_state.from_dst_file, 326 COLO_MESSAGE_CHECKPOINT_REPLY, &local_err); 327 if (local_err) { 328 goto out; 329 } 330 /* Reset channel-buffer directly */ 331 qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL); 332 bioc->usage = 0; 333 334 qemu_mutex_lock_iothread(); 335 if (failover_get_state() != FAILOVER_STATUS_NONE) { 336 qemu_mutex_unlock_iothread(); 337 goto out; 338 } 339 vm_stop_force_state(RUN_STATE_COLO); 340 qemu_mutex_unlock_iothread(); 341 trace_colo_vm_state_change("run", "stop"); 342 /* 343 * Failover request bh could be called after vm_stop_force_state(), 344 * So we need check failover_request_is_active() again. 345 */ 346 if (failover_get_state() != FAILOVER_STATUS_NONE) { 347 goto out; 348 } 349 350 /* Disable block migration */ 351 migrate_set_block_enabled(false, &local_err); 352 qemu_savevm_state_header(fb); 353 qemu_savevm_state_setup(fb); 354 qemu_mutex_lock_iothread(); 355 qemu_savevm_state_complete_precopy(fb, false, false); 356 qemu_mutex_unlock_iothread(); 357 358 qemu_fflush(fb); 359 360 colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err); 361 if (local_err) { 362 goto out; 363 } 364 /* 365 * We need the size of the VMstate data in Secondary side, 366 * With which we can decide how much data should be read. 367 */ 368 colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE, 369 bioc->usage, &local_err); 370 if (local_err) { 371 goto out; 372 } 373 374 qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage); 375 qemu_fflush(s->to_dst_file); 376 ret = qemu_file_get_error(s->to_dst_file); 377 if (ret < 0) { 378 goto out; 379 } 380 381 colo_receive_check_message(s->rp_state.from_dst_file, 382 COLO_MESSAGE_VMSTATE_RECEIVED, &local_err); 383 if (local_err) { 384 goto out; 385 } 386 387 colo_receive_check_message(s->rp_state.from_dst_file, 388 COLO_MESSAGE_VMSTATE_LOADED, &local_err); 389 if (local_err) { 390 goto out; 391 } 392 393 ret = 0; 394 395 qemu_mutex_lock_iothread(); 396 vm_start(); 397 qemu_mutex_unlock_iothread(); 398 trace_colo_vm_state_change("stop", "run"); 399 400 out: 401 if (local_err) { 402 error_report_err(local_err); 403 } 404 return ret; 405 } 406 407 static void colo_process_checkpoint(MigrationState *s) 408 { 409 QIOChannelBuffer *bioc; 410 QEMUFile *fb = NULL; 411 int64_t current_time = qemu_clock_get_ms(QEMU_CLOCK_HOST); 412 Error *local_err = NULL; 413 int ret; 414 415 failover_init_state(); 416 417 s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file); 418 if (!s->rp_state.from_dst_file) { 419 error_report("Open QEMUFile from_dst_file failed"); 420 goto out; 421 } 422 423 /* 424 * Wait for Secondary finish loading VM states and enter COLO 425 * restore. 426 */ 427 colo_receive_check_message(s->rp_state.from_dst_file, 428 COLO_MESSAGE_CHECKPOINT_READY, &local_err); 429 if (local_err) { 430 goto out; 431 } 432 bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE); 433 fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc)); 434 object_unref(OBJECT(bioc)); 435 436 qemu_mutex_lock_iothread(); 437 vm_start(); 438 qemu_mutex_unlock_iothread(); 439 trace_colo_vm_state_change("stop", "run"); 440 441 timer_mod(s->colo_delay_timer, 442 current_time + s->parameters.x_checkpoint_delay); 443 444 while (s->state == MIGRATION_STATUS_COLO) { 445 if (failover_get_state() != FAILOVER_STATUS_NONE) { 446 error_report("failover request"); 447 goto out; 448 } 449 450 qemu_sem_wait(&s->colo_checkpoint_sem); 451 452 ret = colo_do_checkpoint_transaction(s, bioc, fb); 453 if (ret < 0) { 454 goto out; 455 } 456 } 457 458 out: 459 /* Throw the unreported error message after exited from loop */ 460 if (local_err) { 461 error_report_err(local_err); 462 } 463 464 if (fb) { 465 qemu_fclose(fb); 466 } 467 468 timer_del(s->colo_delay_timer); 469 470 /* Hope this not to be too long to wait here */ 471 qemu_sem_wait(&s->colo_exit_sem); 472 qemu_sem_destroy(&s->colo_exit_sem); 473 /* 474 * Must be called after failover BH is completed, 475 * Or the failover BH may shutdown the wrong fd that 476 * re-used by other threads after we release here. 477 */ 478 if (s->rp_state.from_dst_file) { 479 qemu_fclose(s->rp_state.from_dst_file); 480 } 481 } 482 483 void colo_checkpoint_notify(void *opaque) 484 { 485 MigrationState *s = opaque; 486 int64_t next_notify_time; 487 488 qemu_sem_post(&s->colo_checkpoint_sem); 489 s->colo_checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST); 490 next_notify_time = s->colo_checkpoint_time + 491 s->parameters.x_checkpoint_delay; 492 timer_mod(s->colo_delay_timer, next_notify_time); 493 } 494 495 void migrate_start_colo_process(MigrationState *s) 496 { 497 qemu_mutex_unlock_iothread(); 498 qemu_sem_init(&s->colo_checkpoint_sem, 0); 499 s->colo_delay_timer = timer_new_ms(QEMU_CLOCK_HOST, 500 colo_checkpoint_notify, s); 501 502 qemu_sem_init(&s->colo_exit_sem, 0); 503 migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE, 504 MIGRATION_STATUS_COLO); 505 colo_process_checkpoint(s); 506 qemu_mutex_lock_iothread(); 507 } 508 509 static void colo_wait_handle_message(QEMUFile *f, int *checkpoint_request, 510 Error **errp) 511 { 512 COLOMessage msg; 513 Error *local_err = NULL; 514 515 msg = colo_receive_message(f, &local_err); 516 if (local_err) { 517 error_propagate(errp, local_err); 518 return; 519 } 520 521 switch (msg) { 522 case COLO_MESSAGE_CHECKPOINT_REQUEST: 523 *checkpoint_request = 1; 524 break; 525 default: 526 *checkpoint_request = 0; 527 error_setg(errp, "Got unknown COLO message: %d", msg); 528 break; 529 } 530 } 531 532 void *colo_process_incoming_thread(void *opaque) 533 { 534 MigrationIncomingState *mis = opaque; 535 QEMUFile *fb = NULL; 536 QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */ 537 uint64_t total_size; 538 uint64_t value; 539 Error *local_err = NULL; 540 541 qemu_sem_init(&mis->colo_incoming_sem, 0); 542 543 migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE, 544 MIGRATION_STATUS_COLO); 545 546 failover_init_state(); 547 548 mis->to_src_file = qemu_file_get_return_path(mis->from_src_file); 549 if (!mis->to_src_file) { 550 error_report("COLO incoming thread: Open QEMUFile to_src_file failed"); 551 goto out; 552 } 553 /* 554 * Note: the communication between Primary side and Secondary side 555 * should be sequential, we set the fd to unblocked in migration incoming 556 * coroutine, and here we are in the COLO incoming thread, so it is ok to 557 * set the fd back to blocked. 558 */ 559 qemu_file_set_blocking(mis->from_src_file, true); 560 561 bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE); 562 fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc)); 563 object_unref(OBJECT(bioc)); 564 565 colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY, 566 &local_err); 567 if (local_err) { 568 goto out; 569 } 570 571 while (mis->state == MIGRATION_STATUS_COLO) { 572 int request = 0; 573 574 colo_wait_handle_message(mis->from_src_file, &request, &local_err); 575 if (local_err) { 576 goto out; 577 } 578 assert(request); 579 if (failover_get_state() != FAILOVER_STATUS_NONE) { 580 error_report("failover request"); 581 goto out; 582 } 583 584 /* FIXME: This is unnecessary for periodic checkpoint mode */ 585 colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY, 586 &local_err); 587 if (local_err) { 588 goto out; 589 } 590 591 colo_receive_check_message(mis->from_src_file, 592 COLO_MESSAGE_VMSTATE_SEND, &local_err); 593 if (local_err) { 594 goto out; 595 } 596 597 value = colo_receive_message_value(mis->from_src_file, 598 COLO_MESSAGE_VMSTATE_SIZE, &local_err); 599 if (local_err) { 600 goto out; 601 } 602 603 /* 604 * Read VM device state data into channel buffer, 605 * It's better to re-use the memory allocated. 606 * Here we need to handle the channel buffer directly. 607 */ 608 if (value > bioc->capacity) { 609 bioc->capacity = value; 610 bioc->data = g_realloc(bioc->data, bioc->capacity); 611 } 612 total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value); 613 if (total_size != value) { 614 error_report("Got %" PRIu64 " VMState data, less than expected" 615 " %" PRIu64, total_size, value); 616 goto out; 617 } 618 bioc->usage = total_size; 619 qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL); 620 621 colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED, 622 &local_err); 623 if (local_err) { 624 goto out; 625 } 626 627 qemu_mutex_lock_iothread(); 628 qemu_system_reset(SHUTDOWN_CAUSE_NONE); 629 vmstate_loading = true; 630 if (qemu_loadvm_state(fb) < 0) { 631 error_report("COLO: loadvm failed"); 632 qemu_mutex_unlock_iothread(); 633 goto out; 634 } 635 636 vmstate_loading = false; 637 qemu_mutex_unlock_iothread(); 638 639 if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) { 640 failover_set_state(FAILOVER_STATUS_RELAUNCH, 641 FAILOVER_STATUS_NONE); 642 failover_request_active(NULL); 643 goto out; 644 } 645 646 colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED, 647 &local_err); 648 if (local_err) { 649 goto out; 650 } 651 } 652 653 out: 654 vmstate_loading = false; 655 /* Throw the unreported error message after exited from loop */ 656 if (local_err) { 657 error_report_err(local_err); 658 } 659 660 if (fb) { 661 qemu_fclose(fb); 662 } 663 664 /* Hope this not to be too long to loop here */ 665 qemu_sem_wait(&mis->colo_incoming_sem); 666 qemu_sem_destroy(&mis->colo_incoming_sem); 667 /* Must be called after failover BH is completed */ 668 if (mis->to_src_file) { 669 qemu_fclose(mis->to_src_file); 670 } 671 migration_incoming_exit_colo(); 672 673 return NULL; 674 } 675