1 /* 2 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO) 3 * (a.k.a. Fault Tolerance or Continuous Replication) 4 * 5 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD. 6 * Copyright (c) 2016 FUJITSU LIMITED 7 * Copyright (c) 2016 Intel Corporation 8 * 9 * This work is licensed under the terms of the GNU GPL, version 2 or 10 * later. See the COPYING file in the top-level directory. 11 */ 12 13 #include "qemu/osdep.h" 14 #include "qemu/timer.h" 15 #include "sysemu/sysemu.h" 16 #include "migration/colo.h" 17 #include "io/channel-buffer.h" 18 #include "trace.h" 19 #include "qemu/error-report.h" 20 #include "qapi/error.h" 21 #include "migration/failover.h" 22 23 #define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024) 24 25 bool colo_supported(void) 26 { 27 return false; 28 } 29 30 bool migration_in_colo_state(void) 31 { 32 MigrationState *s = migrate_get_current(); 33 34 return (s->state == MIGRATION_STATUS_COLO); 35 } 36 37 bool migration_incoming_in_colo_state(void) 38 { 39 MigrationIncomingState *mis = migration_incoming_get_current(); 40 41 return mis && (mis->state == MIGRATION_STATUS_COLO); 42 } 43 44 static void colo_send_message(QEMUFile *f, COLOMessage msg, 45 Error **errp) 46 { 47 int ret; 48 49 if (msg >= COLO_MESSAGE__MAX) { 50 error_setg(errp, "%s: Invalid message", __func__); 51 return; 52 } 53 qemu_put_be32(f, msg); 54 qemu_fflush(f); 55 56 ret = qemu_file_get_error(f); 57 if (ret < 0) { 58 error_setg_errno(errp, -ret, "Can't send COLO message"); 59 } 60 trace_colo_send_message(COLOMessage_lookup[msg]); 61 } 62 63 static void colo_send_message_value(QEMUFile *f, COLOMessage msg, 64 uint64_t value, Error **errp) 65 { 66 Error *local_err = NULL; 67 int ret; 68 69 colo_send_message(f, msg, &local_err); 70 if (local_err) { 71 error_propagate(errp, local_err); 72 return; 73 } 74 qemu_put_be64(f, value); 75 qemu_fflush(f); 76 77 ret = qemu_file_get_error(f); 78 if (ret < 0) { 79 error_setg_errno(errp, -ret, "Failed to send value for message:%s", 80 COLOMessage_lookup[msg]); 81 } 82 } 83 84 static COLOMessage colo_receive_message(QEMUFile *f, Error **errp) 85 { 86 COLOMessage msg; 87 int ret; 88 89 msg = qemu_get_be32(f); 90 ret = qemu_file_get_error(f); 91 if (ret < 0) { 92 error_setg_errno(errp, -ret, "Can't receive COLO message"); 93 return msg; 94 } 95 if (msg >= COLO_MESSAGE__MAX) { 96 error_setg(errp, "%s: Invalid message", __func__); 97 return msg; 98 } 99 trace_colo_receive_message(COLOMessage_lookup[msg]); 100 return msg; 101 } 102 103 static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg, 104 Error **errp) 105 { 106 COLOMessage msg; 107 Error *local_err = NULL; 108 109 msg = colo_receive_message(f, &local_err); 110 if (local_err) { 111 error_propagate(errp, local_err); 112 return; 113 } 114 if (msg != expect_msg) { 115 error_setg(errp, "Unexpected COLO message %d, expected %d", 116 msg, expect_msg); 117 } 118 } 119 120 static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg, 121 Error **errp) 122 { 123 Error *local_err = NULL; 124 uint64_t value; 125 int ret; 126 127 colo_receive_check_message(f, expect_msg, &local_err); 128 if (local_err) { 129 error_propagate(errp, local_err); 130 return 0; 131 } 132 133 value = qemu_get_be64(f); 134 ret = qemu_file_get_error(f); 135 if (ret < 0) { 136 error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s", 137 COLOMessage_lookup[expect_msg]); 138 } 139 return value; 140 } 141 142 static int colo_do_checkpoint_transaction(MigrationState *s, 143 QIOChannelBuffer *bioc, 144 QEMUFile *fb) 145 { 146 Error *local_err = NULL; 147 int ret = -1; 148 149 colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST, 150 &local_err); 151 if (local_err) { 152 goto out; 153 } 154 155 colo_receive_check_message(s->rp_state.from_dst_file, 156 COLO_MESSAGE_CHECKPOINT_REPLY, &local_err); 157 if (local_err) { 158 goto out; 159 } 160 /* Reset channel-buffer directly */ 161 qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL); 162 bioc->usage = 0; 163 164 qemu_mutex_lock_iothread(); 165 vm_stop_force_state(RUN_STATE_COLO); 166 qemu_mutex_unlock_iothread(); 167 trace_colo_vm_state_change("run", "stop"); 168 169 /* Disable block migration */ 170 s->params.blk = 0; 171 s->params.shared = 0; 172 qemu_savevm_state_header(fb); 173 qemu_savevm_state_begin(fb, &s->params); 174 qemu_mutex_lock_iothread(); 175 qemu_savevm_state_complete_precopy(fb, false); 176 qemu_mutex_unlock_iothread(); 177 178 qemu_fflush(fb); 179 180 colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err); 181 if (local_err) { 182 goto out; 183 } 184 /* 185 * We need the size of the VMstate data in Secondary side, 186 * With which we can decide how much data should be read. 187 */ 188 colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE, 189 bioc->usage, &local_err); 190 if (local_err) { 191 goto out; 192 } 193 194 qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage); 195 qemu_fflush(s->to_dst_file); 196 ret = qemu_file_get_error(s->to_dst_file); 197 if (ret < 0) { 198 goto out; 199 } 200 201 colo_receive_check_message(s->rp_state.from_dst_file, 202 COLO_MESSAGE_VMSTATE_RECEIVED, &local_err); 203 if (local_err) { 204 goto out; 205 } 206 207 colo_receive_check_message(s->rp_state.from_dst_file, 208 COLO_MESSAGE_VMSTATE_LOADED, &local_err); 209 if (local_err) { 210 goto out; 211 } 212 213 ret = 0; 214 215 qemu_mutex_lock_iothread(); 216 vm_start(); 217 qemu_mutex_unlock_iothread(); 218 trace_colo_vm_state_change("stop", "run"); 219 220 out: 221 if (local_err) { 222 error_report_err(local_err); 223 } 224 return ret; 225 } 226 227 static void colo_process_checkpoint(MigrationState *s) 228 { 229 QIOChannelBuffer *bioc; 230 QEMUFile *fb = NULL; 231 int64_t current_time, checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST); 232 Error *local_err = NULL; 233 int ret; 234 235 s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file); 236 if (!s->rp_state.from_dst_file) { 237 error_report("Open QEMUFile from_dst_file failed"); 238 goto out; 239 } 240 241 /* 242 * Wait for Secondary finish loading VM states and enter COLO 243 * restore. 244 */ 245 colo_receive_check_message(s->rp_state.from_dst_file, 246 COLO_MESSAGE_CHECKPOINT_READY, &local_err); 247 if (local_err) { 248 goto out; 249 } 250 bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE); 251 fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc)); 252 object_unref(OBJECT(bioc)); 253 254 qemu_mutex_lock_iothread(); 255 vm_start(); 256 qemu_mutex_unlock_iothread(); 257 trace_colo_vm_state_change("stop", "run"); 258 259 while (s->state == MIGRATION_STATUS_COLO) { 260 current_time = qemu_clock_get_ms(QEMU_CLOCK_HOST); 261 if (current_time - checkpoint_time < 262 s->parameters.x_checkpoint_delay) { 263 int64_t delay_ms; 264 265 delay_ms = s->parameters.x_checkpoint_delay - 266 (current_time - checkpoint_time); 267 g_usleep(delay_ms * 1000); 268 } 269 ret = colo_do_checkpoint_transaction(s, bioc, fb); 270 if (ret < 0) { 271 goto out; 272 } 273 checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST); 274 } 275 276 out: 277 /* Throw the unreported error message after exited from loop */ 278 if (local_err) { 279 error_report_err(local_err); 280 } 281 282 if (fb) { 283 qemu_fclose(fb); 284 } 285 286 if (s->rp_state.from_dst_file) { 287 qemu_fclose(s->rp_state.from_dst_file); 288 } 289 } 290 291 void migrate_start_colo_process(MigrationState *s) 292 { 293 qemu_mutex_unlock_iothread(); 294 migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE, 295 MIGRATION_STATUS_COLO); 296 colo_process_checkpoint(s); 297 qemu_mutex_lock_iothread(); 298 } 299 300 static void colo_wait_handle_message(QEMUFile *f, int *checkpoint_request, 301 Error **errp) 302 { 303 COLOMessage msg; 304 Error *local_err = NULL; 305 306 msg = colo_receive_message(f, &local_err); 307 if (local_err) { 308 error_propagate(errp, local_err); 309 return; 310 } 311 312 switch (msg) { 313 case COLO_MESSAGE_CHECKPOINT_REQUEST: 314 *checkpoint_request = 1; 315 break; 316 default: 317 *checkpoint_request = 0; 318 error_setg(errp, "Got unknown COLO message: %d", msg); 319 break; 320 } 321 } 322 323 void *colo_process_incoming_thread(void *opaque) 324 { 325 MigrationIncomingState *mis = opaque; 326 QEMUFile *fb = NULL; 327 QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */ 328 uint64_t total_size; 329 uint64_t value; 330 Error *local_err = NULL; 331 332 migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE, 333 MIGRATION_STATUS_COLO); 334 335 mis->to_src_file = qemu_file_get_return_path(mis->from_src_file); 336 if (!mis->to_src_file) { 337 error_report("COLO incoming thread: Open QEMUFile to_src_file failed"); 338 goto out; 339 } 340 /* 341 * Note: the communication between Primary side and Secondary side 342 * should be sequential, we set the fd to unblocked in migration incoming 343 * coroutine, and here we are in the COLO incoming thread, so it is ok to 344 * set the fd back to blocked. 345 */ 346 qemu_file_set_blocking(mis->from_src_file, true); 347 348 bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE); 349 fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc)); 350 object_unref(OBJECT(bioc)); 351 352 colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY, 353 &local_err); 354 if (local_err) { 355 goto out; 356 } 357 358 while (mis->state == MIGRATION_STATUS_COLO) { 359 int request; 360 361 colo_wait_handle_message(mis->from_src_file, &request, &local_err); 362 if (local_err) { 363 goto out; 364 } 365 assert(request); 366 /* FIXME: This is unnecessary for periodic checkpoint mode */ 367 colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY, 368 &local_err); 369 if (local_err) { 370 goto out; 371 } 372 373 colo_receive_check_message(mis->from_src_file, 374 COLO_MESSAGE_VMSTATE_SEND, &local_err); 375 if (local_err) { 376 goto out; 377 } 378 379 value = colo_receive_message_value(mis->from_src_file, 380 COLO_MESSAGE_VMSTATE_SIZE, &local_err); 381 if (local_err) { 382 goto out; 383 } 384 385 /* 386 * Read VM device state data into channel buffer, 387 * It's better to re-use the memory allocated. 388 * Here we need to handle the channel buffer directly. 389 */ 390 if (value > bioc->capacity) { 391 bioc->capacity = value; 392 bioc->data = g_realloc(bioc->data, bioc->capacity); 393 } 394 total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value); 395 if (total_size != value) { 396 error_report("Got %" PRIu64 " VMState data, less than expected" 397 " %" PRIu64, total_size, value); 398 goto out; 399 } 400 bioc->usage = total_size; 401 qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL); 402 403 colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED, 404 &local_err); 405 if (local_err) { 406 goto out; 407 } 408 409 qemu_mutex_lock_iothread(); 410 qemu_system_reset(VMRESET_SILENT); 411 if (qemu_loadvm_state(fb) < 0) { 412 error_report("COLO: loadvm failed"); 413 qemu_mutex_unlock_iothread(); 414 goto out; 415 } 416 qemu_mutex_unlock_iothread(); 417 418 colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED, 419 &local_err); 420 if (local_err) { 421 goto out; 422 } 423 } 424 425 out: 426 /* Throw the unreported error message after exited from loop */ 427 if (local_err) { 428 error_report_err(local_err); 429 } 430 431 if (fb) { 432 qemu_fclose(fb); 433 } 434 435 if (mis->to_src_file) { 436 qemu_fclose(mis->to_src_file); 437 } 438 migration_incoming_exit_colo(); 439 440 return NULL; 441 } 442