1 /* 2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO) 3 * (a.k.a. Fault Tolerance or Continuous Replication) 4 * 5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD. 6 * Copyright (c) 2016 FUJITSU LIMITED 7 * Copyright (c) 2016 Intel Corporation 8 * 9 * This work is licensed under the terms of the GNU GPL, version 2 or 10 * later. See the COPYING file in the top-level directory. 11 */ 12 13 #include "qemu/osdep.h" 14 #include "sysemu/sysemu.h" 15 #include "qapi/error.h" 16 #include "qemu-file-channel.h" 17 #include "migration.h" 18 #include "qemu-file.h" 19 #include "savevm.h" 20 #include "migration/colo.h" 21 #include "block.h" 22 #include "io/channel-buffer.h" 23 #include "trace.h" 24 #include "qemu/error-report.h" 25 #include "migration/failover.h" 26 #include "replication.h" 27 #include "qmp-commands.h" 28 29 static bool vmstate_loading; 30 31 #define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024) 32 33 bool migration_in_colo_state(void) 34 { 35 MigrationState *s = migrate_get_current(); 36 37 return (s->state == MIGRATION_STATUS_COLO); 38 } 39 40 bool migration_incoming_in_colo_state(void) 41 { 42 MigrationIncomingState *mis = migration_incoming_get_current(); 43 44 return mis && (mis->state == MIGRATION_STATUS_COLO); 45 } 46 47 static bool colo_runstate_is_stopped(void) 48 { 49 return runstate_check(RUN_STATE_COLO) || !runstate_is_running(); 50 } 51 52 static void secondary_vm_do_failover(void) 53 { 54 int old_state; 55 MigrationIncomingState *mis = migration_incoming_get_current(); 56 57 /* Can not do failover during the process of VM's loading VMstate, Or 58 * it will break the secondary VM. 59 */ 60 if (vmstate_loading) { 61 old_state = failover_set_state(FAILOVER_STATUS_ACTIVE, 62 FAILOVER_STATUS_RELAUNCH); 63 if (old_state != FAILOVER_STATUS_ACTIVE) { 64 error_report("Unknown error while do failover for secondary VM," 65 "old_state: %s", FailoverStatus_str(old_state)); 66 } 67 return; 68 } 69 70 migrate_set_state(&mis->state, MIGRATION_STATUS_COLO, 71 MIGRATION_STATUS_COMPLETED); 72 73 if (!autostart) { 74 error_report("\"-S\" qemu option will be ignored in secondary side"); 75 /* recover runstate to normal migration finish state */ 76 autostart = true; 77 } 78 /* 79 * Make sure COLO incoming thread not block in recv or send, 80 * If mis->from_src_file and mis->to_src_file use the same fd, 81 * The second shutdown() will return -1, we ignore this value, 82 * It is harmless. 83 */ 84 if (mis->from_src_file) { 85 qemu_file_shutdown(mis->from_src_file); 86 } 87 if (mis->to_src_file) { 88 qemu_file_shutdown(mis->to_src_file); 89 } 90 91 old_state = failover_set_state(FAILOVER_STATUS_ACTIVE, 92 FAILOVER_STATUS_COMPLETED); 93 if (old_state != FAILOVER_STATUS_ACTIVE) { 94 error_report("Incorrect state (%s) while doing failover for " 95 "secondary VM", FailoverStatus_str(old_state)); 96 return; 97 } 98 /* Notify COLO incoming thread that failover work is finished */ 99 qemu_sem_post(&mis->colo_incoming_sem); 100 /* For Secondary VM, jump to incoming co */ 101 if (mis->migration_incoming_co) { 102 qemu_coroutine_enter(mis->migration_incoming_co); 103 } 104 } 105 106 static void primary_vm_do_failover(void) 107 { 108 MigrationState *s = migrate_get_current(); 109 int old_state; 110 111 migrate_set_state(&s->state, MIGRATION_STATUS_COLO, 112 MIGRATION_STATUS_COMPLETED); 113 114 /* 115 * Wake up COLO thread which may blocked in recv() or send(), 116 * The s->rp_state.from_dst_file and s->to_dst_file may use the 117 * same fd, but we still shutdown the fd for twice, it is harmless. 118 */ 119 if (s->to_dst_file) { 120 qemu_file_shutdown(s->to_dst_file); 121 } 122 if (s->rp_state.from_dst_file) { 123 qemu_file_shutdown(s->rp_state.from_dst_file); 124 } 125 126 old_state = failover_set_state(FAILOVER_STATUS_ACTIVE, 127 FAILOVER_STATUS_COMPLETED); 128 if (old_state != FAILOVER_STATUS_ACTIVE) { 129 error_report("Incorrect state (%s) while doing failover for Primary VM", 130 FailoverStatus_str(old_state)); 131 return; 132 } 133 /* Notify COLO thread that failover work is finished */ 134 qemu_sem_post(&s->colo_exit_sem); 135 } 136 137 void colo_do_failover(MigrationState *s) 138 { 139 /* Make sure VM stopped while failover happened. */ 140 if (!colo_runstate_is_stopped()) { 141 vm_stop_force_state(RUN_STATE_COLO); 142 } 143 144 if (get_colo_mode() == COLO_MODE_PRIMARY) { 145 primary_vm_do_failover(); 146 } else { 147 secondary_vm_do_failover(); 148 } 149 } 150 151 void qmp_xen_set_replication(bool enable, bool primary, 152 bool has_failover, bool failover, 153 Error **errp) 154 { 155 #ifdef CONFIG_REPLICATION 156 ReplicationMode mode = primary ? 157 REPLICATION_MODE_PRIMARY : 158 REPLICATION_MODE_SECONDARY; 159 160 if (has_failover && enable) { 161 error_setg(errp, "Parameter 'failover' is only for" 162 " stopping replication"); 163 return; 164 } 165 166 if (enable) { 167 replication_start_all(mode, errp); 168 } else { 169 if (!has_failover) { 170 failover = NULL; 171 } 172 replication_stop_all(failover, failover ? NULL : errp); 173 } 174 #else 175 abort(); 176 #endif 177 } 178 179 ReplicationStatus *qmp_query_xen_replication_status(Error **errp) 180 { 181 #ifdef CONFIG_REPLICATION 182 Error *err = NULL; 183 ReplicationStatus *s = g_new0(ReplicationStatus, 1); 184 185 replication_get_error_all(&err); 186 if (err) { 187 s->error = true; 188 s->has_desc = true; 189 s->desc = g_strdup(error_get_pretty(err)); 190 } else { 191 s->error = false; 192 } 193 194 error_free(err); 195 return s; 196 #else 197 abort(); 198 #endif 199 } 200 201 void qmp_xen_colo_do_checkpoint(Error **errp) 202 { 203 #ifdef CONFIG_REPLICATION 204 replication_do_checkpoint_all(errp); 205 #else 206 abort(); 207 #endif 208 } 209 210 static void colo_send_message(QEMUFile *f, COLOMessage msg, 211 Error **errp) 212 { 213 int ret; 214 215 if (msg >= COLO_MESSAGE__MAX) { 216 error_setg(errp, "%s: Invalid message", __func__); 217 return; 218 } 219 qemu_put_be32(f, msg); 220 qemu_fflush(f); 221 222 ret = qemu_file_get_error(f); 223 if (ret < 0) { 224 error_setg_errno(errp, -ret, "Can't send COLO message"); 225 } 226 trace_colo_send_message(COLOMessage_str(msg)); 227 } 228 229 static void colo_send_message_value(QEMUFile *f, COLOMessage msg, 230 uint64_t value, Error **errp) 231 { 232 Error *local_err = NULL; 233 int ret; 234 235 colo_send_message(f, msg, &local_err); 236 if (local_err) { 237 error_propagate(errp, local_err); 238 return; 239 } 240 qemu_put_be64(f, value); 241 qemu_fflush(f); 242 243 ret = qemu_file_get_error(f); 244 if (ret < 0) { 245 error_setg_errno(errp, -ret, "Failed to send value for message:%s", 246 COLOMessage_str(msg)); 247 } 248 } 249 250 static COLOMessage colo_receive_message(QEMUFile *f, Error **errp) 251 { 252 COLOMessage msg; 253 int ret; 254 255 msg = qemu_get_be32(f); 256 ret = qemu_file_get_error(f); 257 if (ret < 0) { 258 error_setg_errno(errp, -ret, "Can't receive COLO message"); 259 return msg; 260 } 261 if (msg >= COLO_MESSAGE__MAX) { 262 error_setg(errp, "%s: Invalid message", __func__); 263 return msg; 264 } 265 trace_colo_receive_message(COLOMessage_str(msg)); 266 return msg; 267 } 268 269 static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg, 270 Error **errp) 271 { 272 COLOMessage msg; 273 Error *local_err = NULL; 274 275 msg = colo_receive_message(f, &local_err); 276 if (local_err) { 277 error_propagate(errp, local_err); 278 return; 279 } 280 if (msg != expect_msg) { 281 error_setg(errp, "Unexpected COLO message %d, expected %d", 282 msg, expect_msg); 283 } 284 } 285 286 static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg, 287 Error **errp) 288 { 289 Error *local_err = NULL; 290 uint64_t value; 291 int ret; 292 293 colo_receive_check_message(f, expect_msg, &local_err); 294 if (local_err) { 295 error_propagate(errp, local_err); 296 return 0; 297 } 298 299 value = qemu_get_be64(f); 300 ret = qemu_file_get_error(f); 301 if (ret < 0) { 302 error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s", 303 COLOMessage_str(expect_msg)); 304 } 305 return value; 306 } 307 308 static int colo_do_checkpoint_transaction(MigrationState *s, 309 QIOChannelBuffer *bioc, 310 QEMUFile *fb) 311 { 312 Error *local_err = NULL; 313 int ret = -1; 314 315 colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST, 316 &local_err); 317 if (local_err) { 318 goto out; 319 } 320 321 colo_receive_check_message(s->rp_state.from_dst_file, 322 COLO_MESSAGE_CHECKPOINT_REPLY, &local_err); 323 if (local_err) { 324 goto out; 325 } 326 /* Reset channel-buffer directly */ 327 qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL); 328 bioc->usage = 0; 329 330 qemu_mutex_lock_iothread(); 331 if (failover_get_state() != FAILOVER_STATUS_NONE) { 332 qemu_mutex_unlock_iothread(); 333 goto out; 334 } 335 vm_stop_force_state(RUN_STATE_COLO); 336 qemu_mutex_unlock_iothread(); 337 trace_colo_vm_state_change("run", "stop"); 338 /* 339 * Failover request bh could be called after vm_stop_force_state(), 340 * So we need check failover_request_is_active() again. 341 */ 342 if (failover_get_state() != FAILOVER_STATUS_NONE) { 343 goto out; 344 } 345 346 /* Disable block migration */ 347 migrate_set_block_enabled(false, &local_err); 348 qemu_savevm_state_header(fb); 349 qemu_savevm_state_setup(fb); 350 qemu_mutex_lock_iothread(); 351 qemu_savevm_state_complete_precopy(fb, false, false); 352 qemu_mutex_unlock_iothread(); 353 354 qemu_fflush(fb); 355 356 colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err); 357 if (local_err) { 358 goto out; 359 } 360 /* 361 * We need the size of the VMstate data in Secondary side, 362 * With which we can decide how much data should be read. 363 */ 364 colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE, 365 bioc->usage, &local_err); 366 if (local_err) { 367 goto out; 368 } 369 370 qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage); 371 qemu_fflush(s->to_dst_file); 372 ret = qemu_file_get_error(s->to_dst_file); 373 if (ret < 0) { 374 goto out; 375 } 376 377 colo_receive_check_message(s->rp_state.from_dst_file, 378 COLO_MESSAGE_VMSTATE_RECEIVED, &local_err); 379 if (local_err) { 380 goto out; 381 } 382 383 colo_receive_check_message(s->rp_state.from_dst_file, 384 COLO_MESSAGE_VMSTATE_LOADED, &local_err); 385 if (local_err) { 386 goto out; 387 } 388 389 ret = 0; 390 391 qemu_mutex_lock_iothread(); 392 vm_start(); 393 qemu_mutex_unlock_iothread(); 394 trace_colo_vm_state_change("stop", "run"); 395 396 out: 397 if (local_err) { 398 error_report_err(local_err); 399 } 400 return ret; 401 } 402 403 static void colo_process_checkpoint(MigrationState *s) 404 { 405 QIOChannelBuffer *bioc; 406 QEMUFile *fb = NULL; 407 int64_t current_time = qemu_clock_get_ms(QEMU_CLOCK_HOST); 408 Error *local_err = NULL; 409 int ret; 410 411 failover_init_state(); 412 413 s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file); 414 if (!s->rp_state.from_dst_file) { 415 error_report("Open QEMUFile from_dst_file failed"); 416 goto out; 417 } 418 419 /* 420 * Wait for Secondary finish loading VM states and enter COLO 421 * restore. 422 */ 423 colo_receive_check_message(s->rp_state.from_dst_file, 424 COLO_MESSAGE_CHECKPOINT_READY, &local_err); 425 if (local_err) { 426 goto out; 427 } 428 bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE); 429 fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc)); 430 object_unref(OBJECT(bioc)); 431 432 qemu_mutex_lock_iothread(); 433 vm_start(); 434 qemu_mutex_unlock_iothread(); 435 trace_colo_vm_state_change("stop", "run"); 436 437 timer_mod(s->colo_delay_timer, 438 current_time + s->parameters.x_checkpoint_delay); 439 440 while (s->state == MIGRATION_STATUS_COLO) { 441 if (failover_get_state() != FAILOVER_STATUS_NONE) { 442 error_report("failover request"); 443 goto out; 444 } 445 446 qemu_sem_wait(&s->colo_checkpoint_sem); 447 448 ret = colo_do_checkpoint_transaction(s, bioc, fb); 449 if (ret < 0) { 450 goto out; 451 } 452 } 453 454 out: 455 /* Throw the unreported error message after exited from loop */ 456 if (local_err) { 457 error_report_err(local_err); 458 } 459 460 if (fb) { 461 qemu_fclose(fb); 462 } 463 464 timer_del(s->colo_delay_timer); 465 466 /* Hope this not to be too long to wait here */ 467 qemu_sem_wait(&s->colo_exit_sem); 468 qemu_sem_destroy(&s->colo_exit_sem); 469 /* 470 * Must be called after failover BH is completed, 471 * Or the failover BH may shutdown the wrong fd that 472 * re-used by other threads after we release here. 473 */ 474 if (s->rp_state.from_dst_file) { 475 qemu_fclose(s->rp_state.from_dst_file); 476 } 477 } 478 479 void colo_checkpoint_notify(void *opaque) 480 { 481 MigrationState *s = opaque; 482 int64_t next_notify_time; 483 484 qemu_sem_post(&s->colo_checkpoint_sem); 485 s->colo_checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST); 486 next_notify_time = s->colo_checkpoint_time + 487 s->parameters.x_checkpoint_delay; 488 timer_mod(s->colo_delay_timer, next_notify_time); 489 } 490 491 void migrate_start_colo_process(MigrationState *s) 492 { 493 qemu_mutex_unlock_iothread(); 494 qemu_sem_init(&s->colo_checkpoint_sem, 0); 495 s->colo_delay_timer = timer_new_ms(QEMU_CLOCK_HOST, 496 colo_checkpoint_notify, s); 497 498 qemu_sem_init(&s->colo_exit_sem, 0); 499 migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE, 500 MIGRATION_STATUS_COLO); 501 colo_process_checkpoint(s); 502 qemu_mutex_lock_iothread(); 503 } 504 505 static void colo_wait_handle_message(QEMUFile *f, int *checkpoint_request, 506 Error **errp) 507 { 508 COLOMessage msg; 509 Error *local_err = NULL; 510 511 msg = colo_receive_message(f, &local_err); 512 if (local_err) { 513 error_propagate(errp, local_err); 514 return; 515 } 516 517 switch (msg) { 518 case COLO_MESSAGE_CHECKPOINT_REQUEST: 519 *checkpoint_request = 1; 520 break; 521 default: 522 *checkpoint_request = 0; 523 error_setg(errp, "Got unknown COLO message: %d", msg); 524 break; 525 } 526 } 527 528 void *colo_process_incoming_thread(void *opaque) 529 { 530 MigrationIncomingState *mis = opaque; 531 QEMUFile *fb = NULL; 532 QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */ 533 uint64_t total_size; 534 uint64_t value; 535 Error *local_err = NULL; 536 537 qemu_sem_init(&mis->colo_incoming_sem, 0); 538 539 migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE, 540 MIGRATION_STATUS_COLO); 541 542 failover_init_state(); 543 544 mis->to_src_file = qemu_file_get_return_path(mis->from_src_file); 545 if (!mis->to_src_file) { 546 error_report("COLO incoming thread: Open QEMUFile to_src_file failed"); 547 goto out; 548 } 549 /* 550 * Note: the communication between Primary side and Secondary side 551 * should be sequential, we set the fd to unblocked in migration incoming 552 * coroutine, and here we are in the COLO incoming thread, so it is ok to 553 * set the fd back to blocked. 554 */ 555 qemu_file_set_blocking(mis->from_src_file, true); 556 557 bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE); 558 fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc)); 559 object_unref(OBJECT(bioc)); 560 561 colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY, 562 &local_err); 563 if (local_err) { 564 goto out; 565 } 566 567 while (mis->state == MIGRATION_STATUS_COLO) { 568 int request = 0; 569 570 colo_wait_handle_message(mis->from_src_file, &request, &local_err); 571 if (local_err) { 572 goto out; 573 } 574 assert(request); 575 if (failover_get_state() != FAILOVER_STATUS_NONE) { 576 error_report("failover request"); 577 goto out; 578 } 579 580 /* FIXME: This is unnecessary for periodic checkpoint mode */ 581 colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY, 582 &local_err); 583 if (local_err) { 584 goto out; 585 } 586 587 colo_receive_check_message(mis->from_src_file, 588 COLO_MESSAGE_VMSTATE_SEND, &local_err); 589 if (local_err) { 590 goto out; 591 } 592 593 value = colo_receive_message_value(mis->from_src_file, 594 COLO_MESSAGE_VMSTATE_SIZE, &local_err); 595 if (local_err) { 596 goto out; 597 } 598 599 /* 600 * Read VM device state data into channel buffer, 601 * It's better to re-use the memory allocated. 602 * Here we need to handle the channel buffer directly. 603 */ 604 if (value > bioc->capacity) { 605 bioc->capacity = value; 606 bioc->data = g_realloc(bioc->data, bioc->capacity); 607 } 608 total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value); 609 if (total_size != value) { 610 error_report("Got %" PRIu64 " VMState data, less than expected" 611 " %" PRIu64, total_size, value); 612 goto out; 613 } 614 bioc->usage = total_size; 615 qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL); 616 617 colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED, 618 &local_err); 619 if (local_err) { 620 goto out; 621 } 622 623 qemu_mutex_lock_iothread(); 624 qemu_system_reset(SHUTDOWN_CAUSE_NONE); 625 vmstate_loading = true; 626 if (qemu_loadvm_state(fb) < 0) { 627 error_report("COLO: loadvm failed"); 628 qemu_mutex_unlock_iothread(); 629 goto out; 630 } 631 632 vmstate_loading = false; 633 qemu_mutex_unlock_iothread(); 634 635 if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) { 636 failover_set_state(FAILOVER_STATUS_RELAUNCH, 637 FAILOVER_STATUS_NONE); 638 failover_request_active(NULL); 639 goto out; 640 } 641 642 colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED, 643 &local_err); 644 if (local_err) { 645 goto out; 646 } 647 } 648 649 out: 650 vmstate_loading = false; 651 /* Throw the unreported error message after exited from loop */ 652 if (local_err) { 653 error_report_err(local_err); 654 } 655 656 if (fb) { 657 qemu_fclose(fb); 658 } 659 660 /* Hope this not to be too long to loop here */ 661 qemu_sem_wait(&mis->colo_incoming_sem); 662 qemu_sem_destroy(&mis->colo_incoming_sem); 663 /* Must be called after failover BH is completed */ 664 if (mis->to_src_file) { 665 qemu_fclose(mis->to_src_file); 666 } 667 migration_incoming_exit_colo(); 668 669 return NULL; 670 } 671