1 /* 2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO) 3 * (a.k.a. Fault Tolerance or Continuous Replication) 4 * 5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD. 6 * Copyright (c) 2016 FUJITSU LIMITED 7 * Copyright (c) 2016 Intel Corporation 8 * 9 * This work is licensed under the terms of the GNU GPL, version 2 or 10 * later. See the COPYING file in the top-level directory. 11 */ 12 13 #include "qemu/osdep.h" 14 #include "qemu/timer.h" 15 #include "sysemu/sysemu.h" 16 #include "qemu-file-channel.h" 17 #include "migration/migration.h" 18 #include "migration/qemu-file.h" 19 #include "migration/colo.h" 20 #include "migration/block.h" 21 #include "io/channel-buffer.h" 22 #include "trace.h" 23 #include "qemu/error-report.h" 24 #include "qapi/error.h" 25 #include "migration/failover.h" 26 #include "replication.h" 27 #include "qmp-commands.h" 28 29 static bool vmstate_loading; 30 31 #define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024) 32 33 bool colo_supported(void) 34 { 35 return true; 36 } 37 38 bool migration_in_colo_state(void) 39 { 40 MigrationState *s = migrate_get_current(); 41 42 return (s->state == MIGRATION_STATUS_COLO); 43 } 44 45 bool migration_incoming_in_colo_state(void) 46 { 47 MigrationIncomingState *mis = migration_incoming_get_current(); 48 49 return mis && (mis->state == MIGRATION_STATUS_COLO); 50 } 51 52 static bool colo_runstate_is_stopped(void) 53 { 54 return runstate_check(RUN_STATE_COLO) || !runstate_is_running(); 55 } 56 57 static void secondary_vm_do_failover(void) 58 { 59 int old_state; 60 MigrationIncomingState *mis = migration_incoming_get_current(); 61 62 /* Can not do failover during the process of VM's loading VMstate, Or 63 * it will break the secondary VM. 64 */ 65 if (vmstate_loading) { 66 old_state = failover_set_state(FAILOVER_STATUS_ACTIVE, 67 FAILOVER_STATUS_RELAUNCH); 68 if (old_state != FAILOVER_STATUS_ACTIVE) { 69 error_report("Unknown error while do failover for secondary VM," 70 "old_state: %s", FailoverStatus_lookup[old_state]); 71 } 72 return; 73 } 74 75 migrate_set_state(&mis->state, MIGRATION_STATUS_COLO, 76 MIGRATION_STATUS_COMPLETED); 77 78 if (!autostart) { 79 error_report("\"-S\" qemu option will be ignored in secondary side"); 80 /* recover runstate to normal migration finish state */ 81 autostart = true; 82 } 83 /* 84 * Make sure COLO incoming thread not block in recv or send, 85 * If mis->from_src_file and mis->to_src_file use the same fd, 86 * The second shutdown() will return -1, we ignore this value, 87 * It is harmless. 88 */ 89 if (mis->from_src_file) { 90 qemu_file_shutdown(mis->from_src_file); 91 } 92 if (mis->to_src_file) { 93 qemu_file_shutdown(mis->to_src_file); 94 } 95 96 old_state = failover_set_state(FAILOVER_STATUS_ACTIVE, 97 FAILOVER_STATUS_COMPLETED); 98 if (old_state != FAILOVER_STATUS_ACTIVE) { 99 error_report("Incorrect state (%s) while doing failover for " 100 "secondary VM", FailoverStatus_lookup[old_state]); 101 return; 102 } 103 /* Notify COLO incoming thread that failover work is finished */ 104 qemu_sem_post(&mis->colo_incoming_sem); 105 /* For Secondary VM, jump to incoming co */ 106 if (mis->migration_incoming_co) { 107 qemu_coroutine_enter(mis->migration_incoming_co); 108 } 109 } 110 111 static void primary_vm_do_failover(void) 112 { 113 MigrationState *s = migrate_get_current(); 114 int old_state; 115 116 migrate_set_state(&s->state, MIGRATION_STATUS_COLO, 117 MIGRATION_STATUS_COMPLETED); 118 119 /* 120 * Wake up COLO thread which may blocked in recv() or send(), 121 * The s->rp_state.from_dst_file and s->to_dst_file may use the 122 * same fd, but we still shutdown the fd for twice, it is harmless. 123 */ 124 if (s->to_dst_file) { 125 qemu_file_shutdown(s->to_dst_file); 126 } 127 if (s->rp_state.from_dst_file) { 128 qemu_file_shutdown(s->rp_state.from_dst_file); 129 } 130 131 old_state = failover_set_state(FAILOVER_STATUS_ACTIVE, 132 FAILOVER_STATUS_COMPLETED); 133 if (old_state != FAILOVER_STATUS_ACTIVE) { 134 error_report("Incorrect state (%s) while doing failover for Primary VM", 135 FailoverStatus_lookup[old_state]); 136 return; 137 } 138 /* Notify COLO thread that failover work is finished */ 139 qemu_sem_post(&s->colo_exit_sem); 140 } 141 142 void colo_do_failover(MigrationState *s) 143 { 144 /* Make sure VM stopped while failover happened. */ 145 if (!colo_runstate_is_stopped()) { 146 vm_stop_force_state(RUN_STATE_COLO); 147 } 148 149 if (get_colo_mode() == COLO_MODE_PRIMARY) { 150 primary_vm_do_failover(); 151 } else { 152 secondary_vm_do_failover(); 153 } 154 } 155 156 void qmp_xen_set_replication(bool enable, bool primary, 157 bool has_failover, bool failover, 158 Error **errp) 159 { 160 #ifdef CONFIG_REPLICATION 161 ReplicationMode mode = primary ? 162 REPLICATION_MODE_PRIMARY : 163 REPLICATION_MODE_SECONDARY; 164 165 if (has_failover && enable) { 166 error_setg(errp, "Parameter 'failover' is only for" 167 " stopping replication"); 168 return; 169 } 170 171 if (enable) { 172 replication_start_all(mode, errp); 173 } else { 174 if (!has_failover) { 175 failover = NULL; 176 } 177 replication_stop_all(failover, failover ? NULL : errp); 178 } 179 #else 180 abort(); 181 #endif 182 } 183 184 ReplicationStatus *qmp_query_xen_replication_status(Error **errp) 185 { 186 #ifdef CONFIG_REPLICATION 187 Error *err = NULL; 188 ReplicationStatus *s = g_new0(ReplicationStatus, 1); 189 190 replication_get_error_all(&err); 191 if (err) { 192 s->error = true; 193 s->has_desc = true; 194 s->desc = g_strdup(error_get_pretty(err)); 195 } else { 196 s->error = false; 197 } 198 199 error_free(err); 200 return s; 201 #else 202 abort(); 203 #endif 204 } 205 206 void qmp_xen_colo_do_checkpoint(Error **errp) 207 { 208 #ifdef CONFIG_REPLICATION 209 replication_do_checkpoint_all(errp); 210 #else 211 abort(); 212 #endif 213 } 214 215 static void colo_send_message(QEMUFile *f, COLOMessage msg, 216 Error **errp) 217 { 218 int ret; 219 220 if (msg >= COLO_MESSAGE__MAX) { 221 error_setg(errp, "%s: Invalid message", __func__); 222 return; 223 } 224 qemu_put_be32(f, msg); 225 qemu_fflush(f); 226 227 ret = qemu_file_get_error(f); 228 if (ret < 0) { 229 error_setg_errno(errp, -ret, "Can't send COLO message"); 230 } 231 trace_colo_send_message(COLOMessage_lookup[msg]); 232 } 233 234 static void colo_send_message_value(QEMUFile *f, COLOMessage msg, 235 uint64_t value, Error **errp) 236 { 237 Error *local_err = NULL; 238 int ret; 239 240 colo_send_message(f, msg, &local_err); 241 if (local_err) { 242 error_propagate(errp, local_err); 243 return; 244 } 245 qemu_put_be64(f, value); 246 qemu_fflush(f); 247 248 ret = qemu_file_get_error(f); 249 if (ret < 0) { 250 error_setg_errno(errp, -ret, "Failed to send value for message:%s", 251 COLOMessage_lookup[msg]); 252 } 253 } 254 255 static COLOMessage colo_receive_message(QEMUFile *f, Error **errp) 256 { 257 COLOMessage msg; 258 int ret; 259 260 msg = qemu_get_be32(f); 261 ret = qemu_file_get_error(f); 262 if (ret < 0) { 263 error_setg_errno(errp, -ret, "Can't receive COLO message"); 264 return msg; 265 } 266 if (msg >= COLO_MESSAGE__MAX) { 267 error_setg(errp, "%s: Invalid message", __func__); 268 return msg; 269 } 270 trace_colo_receive_message(COLOMessage_lookup[msg]); 271 return msg; 272 } 273 274 static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg, 275 Error **errp) 276 { 277 COLOMessage msg; 278 Error *local_err = NULL; 279 280 msg = colo_receive_message(f, &local_err); 281 if (local_err) { 282 error_propagate(errp, local_err); 283 return; 284 } 285 if (msg != expect_msg) { 286 error_setg(errp, "Unexpected COLO message %d, expected %d", 287 msg, expect_msg); 288 } 289 } 290 291 static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg, 292 Error **errp) 293 { 294 Error *local_err = NULL; 295 uint64_t value; 296 int ret; 297 298 colo_receive_check_message(f, expect_msg, &local_err); 299 if (local_err) { 300 error_propagate(errp, local_err); 301 return 0; 302 } 303 304 value = qemu_get_be64(f); 305 ret = qemu_file_get_error(f); 306 if (ret < 0) { 307 error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s", 308 COLOMessage_lookup[expect_msg]); 309 } 310 return value; 311 } 312 313 static int colo_do_checkpoint_transaction(MigrationState *s, 314 QIOChannelBuffer *bioc, 315 QEMUFile *fb) 316 { 317 Error *local_err = NULL; 318 int ret = -1; 319 320 colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST, 321 &local_err); 322 if (local_err) { 323 goto out; 324 } 325 326 colo_receive_check_message(s->rp_state.from_dst_file, 327 COLO_MESSAGE_CHECKPOINT_REPLY, &local_err); 328 if (local_err) { 329 goto out; 330 } 331 /* Reset channel-buffer directly */ 332 qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL); 333 bioc->usage = 0; 334 335 qemu_mutex_lock_iothread(); 336 if (failover_get_state() != FAILOVER_STATUS_NONE) { 337 qemu_mutex_unlock_iothread(); 338 goto out; 339 } 340 vm_stop_force_state(RUN_STATE_COLO); 341 qemu_mutex_unlock_iothread(); 342 trace_colo_vm_state_change("run", "stop"); 343 /* 344 * Failover request bh could be called after vm_stop_force_state(), 345 * So we need check failover_request_is_active() again. 346 */ 347 if (failover_get_state() != FAILOVER_STATUS_NONE) { 348 goto out; 349 } 350 351 /* Disable block migration */ 352 migrate_set_block_enabled(false, &local_err); 353 qemu_savevm_state_header(fb); 354 qemu_savevm_state_begin(fb); 355 qemu_mutex_lock_iothread(); 356 qemu_savevm_state_complete_precopy(fb, false); 357 qemu_mutex_unlock_iothread(); 358 359 qemu_fflush(fb); 360 361 colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err); 362 if (local_err) { 363 goto out; 364 } 365 /* 366 * We need the size of the VMstate data in Secondary side, 367 * With which we can decide how much data should be read. 368 */ 369 colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE, 370 bioc->usage, &local_err); 371 if (local_err) { 372 goto out; 373 } 374 375 qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage); 376 qemu_fflush(s->to_dst_file); 377 ret = qemu_file_get_error(s->to_dst_file); 378 if (ret < 0) { 379 goto out; 380 } 381 382 colo_receive_check_message(s->rp_state.from_dst_file, 383 COLO_MESSAGE_VMSTATE_RECEIVED, &local_err); 384 if (local_err) { 385 goto out; 386 } 387 388 colo_receive_check_message(s->rp_state.from_dst_file, 389 COLO_MESSAGE_VMSTATE_LOADED, &local_err); 390 if (local_err) { 391 goto out; 392 } 393 394 ret = 0; 395 396 qemu_mutex_lock_iothread(); 397 vm_start(); 398 qemu_mutex_unlock_iothread(); 399 trace_colo_vm_state_change("stop", "run"); 400 401 out: 402 if (local_err) { 403 error_report_err(local_err); 404 } 405 return ret; 406 } 407 408 static void colo_process_checkpoint(MigrationState *s) 409 { 410 QIOChannelBuffer *bioc; 411 QEMUFile *fb = NULL; 412 int64_t current_time = qemu_clock_get_ms(QEMU_CLOCK_HOST); 413 Error *local_err = NULL; 414 int ret; 415 416 failover_init_state(); 417 418 s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file); 419 if (!s->rp_state.from_dst_file) { 420 error_report("Open QEMUFile from_dst_file failed"); 421 goto out; 422 } 423 424 /* 425 * Wait for Secondary finish loading VM states and enter COLO 426 * restore. 427 */ 428 colo_receive_check_message(s->rp_state.from_dst_file, 429 COLO_MESSAGE_CHECKPOINT_READY, &local_err); 430 if (local_err) { 431 goto out; 432 } 433 bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE); 434 fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc)); 435 object_unref(OBJECT(bioc)); 436 437 qemu_mutex_lock_iothread(); 438 vm_start(); 439 qemu_mutex_unlock_iothread(); 440 trace_colo_vm_state_change("stop", "run"); 441 442 timer_mod(s->colo_delay_timer, 443 current_time + s->parameters.x_checkpoint_delay); 444 445 while (s->state == MIGRATION_STATUS_COLO) { 446 if (failover_get_state() != FAILOVER_STATUS_NONE) { 447 error_report("failover request"); 448 goto out; 449 } 450 451 qemu_sem_wait(&s->colo_checkpoint_sem); 452 453 ret = colo_do_checkpoint_transaction(s, bioc, fb); 454 if (ret < 0) { 455 goto out; 456 } 457 } 458 459 out: 460 /* Throw the unreported error message after exited from loop */ 461 if (local_err) { 462 error_report_err(local_err); 463 } 464 465 if (fb) { 466 qemu_fclose(fb); 467 } 468 469 timer_del(s->colo_delay_timer); 470 471 /* Hope this not to be too long to wait here */ 472 qemu_sem_wait(&s->colo_exit_sem); 473 qemu_sem_destroy(&s->colo_exit_sem); 474 /* 475 * Must be called after failover BH is completed, 476 * Or the failover BH may shutdown the wrong fd that 477 * re-used by other threads after we release here. 478 */ 479 if (s->rp_state.from_dst_file) { 480 qemu_fclose(s->rp_state.from_dst_file); 481 } 482 } 483 484 void colo_checkpoint_notify(void *opaque) 485 { 486 MigrationState *s = opaque; 487 int64_t next_notify_time; 488 489 qemu_sem_post(&s->colo_checkpoint_sem); 490 s->colo_checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST); 491 next_notify_time = s->colo_checkpoint_time + 492 s->parameters.x_checkpoint_delay; 493 timer_mod(s->colo_delay_timer, next_notify_time); 494 } 495 496 void migrate_start_colo_process(MigrationState *s) 497 { 498 qemu_mutex_unlock_iothread(); 499 qemu_sem_init(&s->colo_checkpoint_sem, 0); 500 s->colo_delay_timer = timer_new_ms(QEMU_CLOCK_HOST, 501 colo_checkpoint_notify, s); 502 503 qemu_sem_init(&s->colo_exit_sem, 0); 504 migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE, 505 MIGRATION_STATUS_COLO); 506 colo_process_checkpoint(s); 507 qemu_mutex_lock_iothread(); 508 } 509 510 static void colo_wait_handle_message(QEMUFile *f, int *checkpoint_request, 511 Error **errp) 512 { 513 COLOMessage msg; 514 Error *local_err = NULL; 515 516 msg = colo_receive_message(f, &local_err); 517 if (local_err) { 518 error_propagate(errp, local_err); 519 return; 520 } 521 522 switch (msg) { 523 case COLO_MESSAGE_CHECKPOINT_REQUEST: 524 *checkpoint_request = 1; 525 break; 526 default: 527 *checkpoint_request = 0; 528 error_setg(errp, "Got unknown COLO message: %d", msg); 529 break; 530 } 531 } 532 533 void *colo_process_incoming_thread(void *opaque) 534 { 535 MigrationIncomingState *mis = opaque; 536 QEMUFile *fb = NULL; 537 QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */ 538 uint64_t total_size; 539 uint64_t value; 540 Error *local_err = NULL; 541 542 qemu_sem_init(&mis->colo_incoming_sem, 0); 543 544 migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE, 545 MIGRATION_STATUS_COLO); 546 547 failover_init_state(); 548 549 mis->to_src_file = qemu_file_get_return_path(mis->from_src_file); 550 if (!mis->to_src_file) { 551 error_report("COLO incoming thread: Open QEMUFile to_src_file failed"); 552 goto out; 553 } 554 /* 555 * Note: the communication between Primary side and Secondary side 556 * should be sequential, we set the fd to unblocked in migration incoming 557 * coroutine, and here we are in the COLO incoming thread, so it is ok to 558 * set the fd back to blocked. 559 */ 560 qemu_file_set_blocking(mis->from_src_file, true); 561 562 bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE); 563 fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc)); 564 object_unref(OBJECT(bioc)); 565 566 colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY, 567 &local_err); 568 if (local_err) { 569 goto out; 570 } 571 572 while (mis->state == MIGRATION_STATUS_COLO) { 573 int request = 0; 574 575 colo_wait_handle_message(mis->from_src_file, &request, &local_err); 576 if (local_err) { 577 goto out; 578 } 579 assert(request); 580 if (failover_get_state() != FAILOVER_STATUS_NONE) { 581 error_report("failover request"); 582 goto out; 583 } 584 585 /* FIXME: This is unnecessary for periodic checkpoint mode */ 586 colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY, 587 &local_err); 588 if (local_err) { 589 goto out; 590 } 591 592 colo_receive_check_message(mis->from_src_file, 593 COLO_MESSAGE_VMSTATE_SEND, &local_err); 594 if (local_err) { 595 goto out; 596 } 597 598 value = colo_receive_message_value(mis->from_src_file, 599 COLO_MESSAGE_VMSTATE_SIZE, &local_err); 600 if (local_err) { 601 goto out; 602 } 603 604 /* 605 * Read VM device state data into channel buffer, 606 * It's better to re-use the memory allocated. 607 * Here we need to handle the channel buffer directly. 608 */ 609 if (value > bioc->capacity) { 610 bioc->capacity = value; 611 bioc->data = g_realloc(bioc->data, bioc->capacity); 612 } 613 total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value); 614 if (total_size != value) { 615 error_report("Got %" PRIu64 " VMState data, less than expected" 616 " %" PRIu64, total_size, value); 617 goto out; 618 } 619 bioc->usage = total_size; 620 qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL); 621 622 colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED, 623 &local_err); 624 if (local_err) { 625 goto out; 626 } 627 628 qemu_mutex_lock_iothread(); 629 qemu_system_reset(VMRESET_SILENT); 630 vmstate_loading = true; 631 if (qemu_loadvm_state(fb) < 0) { 632 error_report("COLO: loadvm failed"); 633 qemu_mutex_unlock_iothread(); 634 goto out; 635 } 636 637 vmstate_loading = false; 638 qemu_mutex_unlock_iothread(); 639 640 if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) { 641 failover_set_state(FAILOVER_STATUS_RELAUNCH, 642 FAILOVER_STATUS_NONE); 643 failover_request_active(NULL); 644 goto out; 645 } 646 647 colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED, 648 &local_err); 649 if (local_err) { 650 goto out; 651 } 652 } 653 654 out: 655 vmstate_loading = false; 656 /* Throw the unreported error message after exited from loop */ 657 if (local_err) { 658 error_report_err(local_err); 659 } 660 661 if (fb) { 662 qemu_fclose(fb); 663 } 664 665 /* Hope this not to be too long to loop here */ 666 qemu_sem_wait(&mis->colo_incoming_sem); 667 qemu_sem_destroy(&mis->colo_incoming_sem); 668 /* Must be called after failover BH is completed */ 669 if (mis->to_src_file) { 670 qemu_fclose(mis->to_src_file); 671 } 672 migration_incoming_exit_colo(); 673 674 return NULL; 675 } 676