xref: /openbmc/qemu/migration/colo.c (revision 4a9b31b8)
1 /*
2  * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
3  * (a.k.a. Fault Tolerance or Continuous Replication)
4  *
5  * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
6  * Copyright (c) 2016 FUJITSU LIMITED
7  * Copyright (c) 2016 Intel Corporation
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2 or
10  * later.  See the COPYING file in the top-level directory.
11  */
12 
13 #include "qemu/osdep.h"
14 #include "sysemu/sysemu.h"
15 #include "qapi/error.h"
16 #include "qapi/qapi-commands-migration.h"
17 #include "qemu-file-channel.h"
18 #include "migration.h"
19 #include "qemu-file.h"
20 #include "savevm.h"
21 #include "migration/colo.h"
22 #include "block.h"
23 #include "io/channel-buffer.h"
24 #include "trace.h"
25 #include "qemu/error-report.h"
26 #include "migration/failover.h"
27 #include "replication.h"
28 #include "net/colo-compare.h"
29 #include "net/colo.h"
30 #include "block/block.h"
31 #include "qapi/qapi-events-migration.h"
32 #include "qapi/qmp/qerror.h"
33 #include "sysemu/cpus.h"
34 #include "net/filter.h"
35 
/*
 * Set to true by colo_process_incoming_thread() right before it loads the
 * buffered device state and cleared once the checkpoint has been applied
 * (or when the thread leaves its loop).  secondary_vm_do_failover() reads
 * it and, while it is set, only moves the failover state to
 * FAILOVER_STATUS_RELAUNCH instead of performing the failover.
 */
36 static bool vmstate_loading;
/*
 * Notifier handed to colo_compare_register_notifier() by
 * colo_process_checkpoint(); its callback is
 * colo_compare_notify_checkpoint().
 */
37 static Notifier packets_compare_notifier;
38 
/*
 * Size (4 MiB) passed to qio_channel_buffer_new() when the Primary and the
 * Secondary create the channel buffer used to hold device state.
 */
39 #define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024)
40 
41 bool migration_in_colo_state(void)
42 {
43     MigrationState *s = migrate_get_current();
44 
45     return (s->state == MIGRATION_STATUS_COLO);
46 }
47 
48 bool migration_incoming_in_colo_state(void)
49 {
50     MigrationIncomingState *mis = migration_incoming_get_current();
51 
52     return mis && (mis->state == MIGRATION_STATUS_COLO);
53 }
54 
55 static bool colo_runstate_is_stopped(void)
56 {
57     return runstate_check(RUN_STATE_COLO) || !runstate_is_running();
58 }
59 
60 static void secondary_vm_do_failover(void)
61 {
62     int old_state;
63     MigrationIncomingState *mis = migration_incoming_get_current();
64     Error *local_err = NULL;
65 
66     /* Can not do failover during the process of VM's loading VMstate, Or
67      * it will break the secondary VM.
68      */
69     if (vmstate_loading) {
70         old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
71                         FAILOVER_STATUS_RELAUNCH);
72         if (old_state != FAILOVER_STATUS_ACTIVE) {
73             error_report("Unknown error while do failover for secondary VM,"
74                          "old_state: %s", FailoverStatus_str(old_state));
75         }
76         return;
77     }
78 
79     migrate_set_state(&mis->state, MIGRATION_STATUS_COLO,
80                       MIGRATION_STATUS_COMPLETED);
81 
82     replication_stop_all(true, &local_err);
83     if (local_err) {
84         error_report_err(local_err);
85     }
86 
87     /* Notify all filters of all NIC to do checkpoint */
88     colo_notify_filters_event(COLO_EVENT_FAILOVER, &local_err);
89     if (local_err) {
90         error_report_err(local_err);
91     }
92 
93     if (!autostart) {
94         error_report("\"-S\" qemu option will be ignored in secondary side");
95         /* recover runstate to normal migration finish state */
96         autostart = true;
97     }
98     /*
99      * Make sure COLO incoming thread not block in recv or send,
100      * If mis->from_src_file and mis->to_src_file use the same fd,
101      * The second shutdown() will return -1, we ignore this value,
102      * It is harmless.
103      */
104     if (mis->from_src_file) {
105         qemu_file_shutdown(mis->from_src_file);
106     }
107     if (mis->to_src_file) {
108         qemu_file_shutdown(mis->to_src_file);
109     }
110 
111     old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
112                                    FAILOVER_STATUS_COMPLETED);
113     if (old_state != FAILOVER_STATUS_ACTIVE) {
114         error_report("Incorrect state (%s) while doing failover for "
115                      "secondary VM", FailoverStatus_str(old_state));
116         return;
117     }
118     /* Notify COLO incoming thread that failover work is finished */
119     qemu_sem_post(&mis->colo_incoming_sem);
120     /* For Secondary VM, jump to incoming co */
121     if (mis->migration_incoming_co) {
122         qemu_coroutine_enter(mis->migration_incoming_co);
123     }
124 }
125 
126 static void primary_vm_do_failover(void)
127 {
128     MigrationState *s = migrate_get_current();
129     int old_state;
130     Error *local_err = NULL;
131 
132     migrate_set_state(&s->state, MIGRATION_STATUS_COLO,
133                       MIGRATION_STATUS_COMPLETED);
134     /*
135      * kick COLO thread which might wait at
136      * qemu_sem_wait(&s->colo_checkpoint_sem).
137      */
138     colo_checkpoint_notify(migrate_get_current());
139 
140     /*
141      * Wake up COLO thread which may blocked in recv() or send(),
142      * The s->rp_state.from_dst_file and s->to_dst_file may use the
143      * same fd, but we still shutdown the fd for twice, it is harmless.
144      */
145     if (s->to_dst_file) {
146         qemu_file_shutdown(s->to_dst_file);
147     }
148     if (s->rp_state.from_dst_file) {
149         qemu_file_shutdown(s->rp_state.from_dst_file);
150     }
151 
152     old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
153                                    FAILOVER_STATUS_COMPLETED);
154     if (old_state != FAILOVER_STATUS_ACTIVE) {
155         error_report("Incorrect state (%s) while doing failover for Primary VM",
156                      FailoverStatus_str(old_state));
157         return;
158     }
159 
160     replication_stop_all(true, &local_err);
161     if (local_err) {
162         error_report_err(local_err);
163         local_err = NULL;
164     }
165 
166     /* Notify COLO thread that failover work is finished */
167     qemu_sem_post(&s->colo_exit_sem);
168 }
169 
170 COLOMode get_colo_mode(void)
171 {
172     if (migration_in_colo_state()) {
173         return COLO_MODE_PRIMARY;
174     } else if (migration_incoming_in_colo_state()) {
175         return COLO_MODE_SECONDARY;
176     } else {
177         return COLO_MODE_NONE;
178     }
179 }
180 
181 void colo_do_failover(MigrationState *s)
182 {
183     /* Make sure VM stopped while failover happened. */
184     if (!colo_runstate_is_stopped()) {
185         vm_stop_force_state(RUN_STATE_COLO);
186     }
187 
188     if (get_colo_mode() == COLO_MODE_PRIMARY) {
189         primary_vm_do_failover();
190     } else {
191         secondary_vm_do_failover();
192     }
193 }
194 
195 void qmp_xen_set_replication(bool enable, bool primary,
196                              bool has_failover, bool failover,
197                              Error **errp)
198 {
199 #ifdef CONFIG_REPLICATION
200     ReplicationMode mode = primary ?
201                            REPLICATION_MODE_PRIMARY :
202                            REPLICATION_MODE_SECONDARY;
203 
204     if (has_failover && enable) {
205         error_setg(errp, "Parameter 'failover' is only for"
206                    " stopping replication");
207         return;
208     }
209 
210     if (enable) {
211         replication_start_all(mode, errp);
212     } else {
213         if (!has_failover) {
214             failover = NULL;
215         }
216         replication_stop_all(failover, failover ? NULL : errp);
217     }
218 #else
219     abort();
220 #endif
221 }
222 
223 ReplicationStatus *qmp_query_xen_replication_status(Error **errp)
224 {
225 #ifdef CONFIG_REPLICATION
226     Error *err = NULL;
227     ReplicationStatus *s = g_new0(ReplicationStatus, 1);
228 
229     replication_get_error_all(&err);
230     if (err) {
231         s->error = true;
232         s->has_desc = true;
233         s->desc = g_strdup(error_get_pretty(err));
234     } else {
235         s->error = false;
236     }
237 
238     error_free(err);
239     return s;
240 #else
241     abort();
242 #endif
243 }
244 
245 void qmp_xen_colo_do_checkpoint(Error **errp)
246 {
247 #ifdef CONFIG_REPLICATION
248     replication_do_checkpoint_all(errp);
249 #else
250     abort();
251 #endif
252 }
253 
254 COLOStatus *qmp_query_colo_status(Error **errp)
255 {
256     COLOStatus *s = g_new0(COLOStatus, 1);
257 
258     s->mode = get_colo_mode();
259 
260     switch (failover_get_state()) {
261     case FAILOVER_STATUS_NONE:
262         s->reason = COLO_EXIT_REASON_NONE;
263         break;
264     case FAILOVER_STATUS_REQUIRE:
265         s->reason = COLO_EXIT_REASON_REQUEST;
266         break;
267     default:
268         s->reason = COLO_EXIT_REASON_ERROR;
269     }
270 
271     return s;
272 }
273 
274 static void colo_send_message(QEMUFile *f, COLOMessage msg,
275                               Error **errp)
276 {
277     int ret;
278 
279     if (msg >= COLO_MESSAGE__MAX) {
280         error_setg(errp, "%s: Invalid message", __func__);
281         return;
282     }
283     qemu_put_be32(f, msg);
284     qemu_fflush(f);
285 
286     ret = qemu_file_get_error(f);
287     if (ret < 0) {
288         error_setg_errno(errp, -ret, "Can't send COLO message");
289     }
290     trace_colo_send_message(COLOMessage_str(msg));
291 }
292 
293 static void colo_send_message_value(QEMUFile *f, COLOMessage msg,
294                                     uint64_t value, Error **errp)
295 {
296     Error *local_err = NULL;
297     int ret;
298 
299     colo_send_message(f, msg, &local_err);
300     if (local_err) {
301         error_propagate(errp, local_err);
302         return;
303     }
304     qemu_put_be64(f, value);
305     qemu_fflush(f);
306 
307     ret = qemu_file_get_error(f);
308     if (ret < 0) {
309         error_setg_errno(errp, -ret, "Failed to send value for message:%s",
310                          COLOMessage_str(msg));
311     }
312 }
313 
314 static COLOMessage colo_receive_message(QEMUFile *f, Error **errp)
315 {
316     COLOMessage msg;
317     int ret;
318 
319     msg = qemu_get_be32(f);
320     ret = qemu_file_get_error(f);
321     if (ret < 0) {
322         error_setg_errno(errp, -ret, "Can't receive COLO message");
323         return msg;
324     }
325     if (msg >= COLO_MESSAGE__MAX) {
326         error_setg(errp, "%s: Invalid message", __func__);
327         return msg;
328     }
329     trace_colo_receive_message(COLOMessage_str(msg));
330     return msg;
331 }
332 
333 static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg,
334                                        Error **errp)
335 {
336     COLOMessage msg;
337     Error *local_err = NULL;
338 
339     msg = colo_receive_message(f, &local_err);
340     if (local_err) {
341         error_propagate(errp, local_err);
342         return;
343     }
344     if (msg != expect_msg) {
345         error_setg(errp, "Unexpected COLO message %d, expected %d",
346                           msg, expect_msg);
347     }
348 }
349 
350 static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg,
351                                            Error **errp)
352 {
353     Error *local_err = NULL;
354     uint64_t value;
355     int ret;
356 
357     colo_receive_check_message(f, expect_msg, &local_err);
358     if (local_err) {
359         error_propagate(errp, local_err);
360         return 0;
361     }
362 
363     value = qemu_get_be64(f);
364     ret = qemu_file_get_error(f);
365     if (ret < 0) {
366         error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s",
367                          COLOMessage_str(expect_msg));
368     }
369     return value;
370 }
371 
372 static int colo_do_checkpoint_transaction(MigrationState *s,
373                                           QIOChannelBuffer *bioc,
374                                           QEMUFile *fb)
375 {
376     Error *local_err = NULL;
377     int ret = -1;
378 
379     colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST,
380                       &local_err);
381     if (local_err) {
382         goto out;
383     }
384 
385     colo_receive_check_message(s->rp_state.from_dst_file,
386                     COLO_MESSAGE_CHECKPOINT_REPLY, &local_err);
387     if (local_err) {
388         goto out;
389     }
390     /* Reset channel-buffer directly */
391     qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
392     bioc->usage = 0;
393 
394     qemu_mutex_lock_iothread();
395     if (failover_get_state() != FAILOVER_STATUS_NONE) {
396         qemu_mutex_unlock_iothread();
397         goto out;
398     }
399     vm_stop_force_state(RUN_STATE_COLO);
400     qemu_mutex_unlock_iothread();
401     trace_colo_vm_state_change("run", "stop");
402     /*
403      * Failover request bh could be called after vm_stop_force_state(),
404      * So we need check failover_request_is_active() again.
405      */
406     if (failover_get_state() != FAILOVER_STATUS_NONE) {
407         goto out;
408     }
409 
410     colo_notify_compares_event(NULL, COLO_EVENT_CHECKPOINT, &local_err);
411     if (local_err) {
412         goto out;
413     }
414 
415     /* Disable block migration */
416     migrate_set_block_enabled(false, &local_err);
417     qemu_mutex_lock_iothread();
418     replication_do_checkpoint_all(&local_err);
419     if (local_err) {
420         qemu_mutex_unlock_iothread();
421         goto out;
422     }
423 
424     colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err);
425     if (local_err) {
426         qemu_mutex_unlock_iothread();
427         goto out;
428     }
429     /* Note: device state is saved into buffer */
430     ret = qemu_save_device_state(fb);
431 
432     qemu_mutex_unlock_iothread();
433     if (ret < 0) {
434         goto out;
435     }
436     /*
437      * Only save VM's live state, which not including device state.
438      * TODO: We may need a timeout mechanism to prevent COLO process
439      * to be blocked here.
440      */
441     qemu_savevm_live_state(s->to_dst_file);
442 
443     qemu_fflush(fb);
444 
445     /*
446      * We need the size of the VMstate data in Secondary side,
447      * With which we can decide how much data should be read.
448      */
449     colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE,
450                             bioc->usage, &local_err);
451     if (local_err) {
452         goto out;
453     }
454 
455     qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage);
456     qemu_fflush(s->to_dst_file);
457     ret = qemu_file_get_error(s->to_dst_file);
458     if (ret < 0) {
459         goto out;
460     }
461 
462     colo_receive_check_message(s->rp_state.from_dst_file,
463                        COLO_MESSAGE_VMSTATE_RECEIVED, &local_err);
464     if (local_err) {
465         goto out;
466     }
467 
468     colo_receive_check_message(s->rp_state.from_dst_file,
469                        COLO_MESSAGE_VMSTATE_LOADED, &local_err);
470     if (local_err) {
471         goto out;
472     }
473 
474     ret = 0;
475 
476     qemu_mutex_lock_iothread();
477     vm_start();
478     qemu_mutex_unlock_iothread();
479     trace_colo_vm_state_change("stop", "run");
480 
481 out:
482     if (local_err) {
483         error_report_err(local_err);
484     }
485     return ret;
486 }
487 
488 static void colo_compare_notify_checkpoint(Notifier *notifier, void *data)
489 {
490     colo_checkpoint_notify(data);
491 }
492 
493 static void colo_process_checkpoint(MigrationState *s)
494 {
495     QIOChannelBuffer *bioc;
496     QEMUFile *fb = NULL;
497     int64_t current_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
498     Error *local_err = NULL;
499     int ret;
500 
501     failover_init_state();
502 
503     s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file);
504     if (!s->rp_state.from_dst_file) {
505         error_report("Open QEMUFile from_dst_file failed");
506         goto out;
507     }
508 
509     packets_compare_notifier.notify = colo_compare_notify_checkpoint;
510     colo_compare_register_notifier(&packets_compare_notifier);
511 
512     /*
513      * Wait for Secondary finish loading VM states and enter COLO
514      * restore.
515      */
516     colo_receive_check_message(s->rp_state.from_dst_file,
517                        COLO_MESSAGE_CHECKPOINT_READY, &local_err);
518     if (local_err) {
519         goto out;
520     }
521     bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
522     fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc));
523     object_unref(OBJECT(bioc));
524 
525     qemu_mutex_lock_iothread();
526     replication_start_all(REPLICATION_MODE_PRIMARY, &local_err);
527     if (local_err) {
528         qemu_mutex_unlock_iothread();
529         goto out;
530     }
531 
532     vm_start();
533     qemu_mutex_unlock_iothread();
534     trace_colo_vm_state_change("stop", "run");
535 
536     timer_mod(s->colo_delay_timer,
537             current_time + s->parameters.x_checkpoint_delay);
538 
539     while (s->state == MIGRATION_STATUS_COLO) {
540         if (failover_get_state() != FAILOVER_STATUS_NONE) {
541             error_report("failover request");
542             goto out;
543         }
544 
545         qemu_sem_wait(&s->colo_checkpoint_sem);
546 
547         if (s->state != MIGRATION_STATUS_COLO) {
548             goto out;
549         }
550         ret = colo_do_checkpoint_transaction(s, bioc, fb);
551         if (ret < 0) {
552             goto out;
553         }
554     }
555 
556 out:
557     /* Throw the unreported error message after exited from loop */
558     if (local_err) {
559         error_report_err(local_err);
560     }
561 
562     if (fb) {
563         qemu_fclose(fb);
564     }
565 
566     /*
567      * There are only two reasons we can get here, some error happened
568      * or the user triggered failover.
569      */
570     switch (failover_get_state()) {
571     case FAILOVER_STATUS_NONE:
572         qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
573                                   COLO_EXIT_REASON_ERROR);
574         break;
575     case FAILOVER_STATUS_REQUIRE:
576         qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
577                                   COLO_EXIT_REASON_REQUEST);
578         break;
579     default:
580         abort();
581     }
582 
583     /* Hope this not to be too long to wait here */
584     qemu_sem_wait(&s->colo_exit_sem);
585     qemu_sem_destroy(&s->colo_exit_sem);
586 
587     /*
588      * It is safe to unregister notifier after failover finished.
589      * Besides, colo_delay_timer and colo_checkpoint_sem can't be
590      * released befor unregister notifier, or there will be use-after-free
591      * error.
592      */
593     colo_compare_unregister_notifier(&packets_compare_notifier);
594     timer_del(s->colo_delay_timer);
595     timer_free(s->colo_delay_timer);
596     qemu_sem_destroy(&s->colo_checkpoint_sem);
597 
598     /*
599      * Must be called after failover BH is completed,
600      * Or the failover BH may shutdown the wrong fd that
601      * re-used by other threads after we release here.
602      */
603     if (s->rp_state.from_dst_file) {
604         qemu_fclose(s->rp_state.from_dst_file);
605     }
606 }
607 
608 void colo_checkpoint_notify(void *opaque)
609 {
610     MigrationState *s = opaque;
611     int64_t next_notify_time;
612 
613     qemu_sem_post(&s->colo_checkpoint_sem);
614     s->colo_checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
615     next_notify_time = s->colo_checkpoint_time +
616                     s->parameters.x_checkpoint_delay;
617     timer_mod(s->colo_delay_timer, next_notify_time);
618 }
619 
620 void migrate_start_colo_process(MigrationState *s)
621 {
622     qemu_mutex_unlock_iothread();
623     qemu_sem_init(&s->colo_checkpoint_sem, 0);
624     s->colo_delay_timer =  timer_new_ms(QEMU_CLOCK_HOST,
625                                 colo_checkpoint_notify, s);
626 
627     qemu_sem_init(&s->colo_exit_sem, 0);
628     migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
629                       MIGRATION_STATUS_COLO);
630     colo_process_checkpoint(s);
631     qemu_mutex_lock_iothread();
632 }
633 
634 static void colo_wait_handle_message(QEMUFile *f, int *checkpoint_request,
635                                      Error **errp)
636 {
637     COLOMessage msg;
638     Error *local_err = NULL;
639 
640     msg = colo_receive_message(f, &local_err);
641     if (local_err) {
642         error_propagate(errp, local_err);
643         return;
644     }
645 
646     switch (msg) {
647     case COLO_MESSAGE_CHECKPOINT_REQUEST:
648         *checkpoint_request = 1;
649         break;
650     default:
651         *checkpoint_request = 0;
652         error_setg(errp, "Got unknown COLO message: %d", msg);
653         break;
654     }
655 }
656 
657 void *colo_process_incoming_thread(void *opaque)
658 {
659     MigrationIncomingState *mis = opaque;
660     QEMUFile *fb = NULL;
661     QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */
662     uint64_t total_size;
663     uint64_t value;
664     Error *local_err = NULL;
665     int ret;
666 
667     rcu_register_thread();
668     qemu_sem_init(&mis->colo_incoming_sem, 0);
669 
670     migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
671                       MIGRATION_STATUS_COLO);
672 
673     failover_init_state();
674 
675     mis->to_src_file = qemu_file_get_return_path(mis->from_src_file);
676     if (!mis->to_src_file) {
677         error_report("COLO incoming thread: Open QEMUFile to_src_file failed");
678         goto out;
679     }
680     /*
681      * Note: the communication between Primary side and Secondary side
682      * should be sequential, we set the fd to unblocked in migration incoming
683      * coroutine, and here we are in the COLO incoming thread, so it is ok to
684      * set the fd back to blocked.
685      */
686     qemu_file_set_blocking(mis->from_src_file, true);
687 
688     bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
689     fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc));
690     object_unref(OBJECT(bioc));
691 
692     qemu_mutex_lock_iothread();
693     replication_start_all(REPLICATION_MODE_SECONDARY, &local_err);
694     if (local_err) {
695         qemu_mutex_unlock_iothread();
696         goto out;
697     }
698     vm_start();
699     trace_colo_vm_state_change("stop", "run");
700     qemu_mutex_unlock_iothread();
701 
702     colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY,
703                       &local_err);
704     if (local_err) {
705         goto out;
706     }
707 
708     while (mis->state == MIGRATION_STATUS_COLO) {
709         int request = 0;
710 
711         colo_wait_handle_message(mis->from_src_file, &request, &local_err);
712         if (local_err) {
713             goto out;
714         }
715         assert(request);
716         if (failover_get_state() != FAILOVER_STATUS_NONE) {
717             error_report("failover request");
718             goto out;
719         }
720 
721         qemu_mutex_lock_iothread();
722         vm_stop_force_state(RUN_STATE_COLO);
723         trace_colo_vm_state_change("run", "stop");
724         qemu_mutex_unlock_iothread();
725 
726         /* FIXME: This is unnecessary for periodic checkpoint mode */
727         colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
728                      &local_err);
729         if (local_err) {
730             goto out;
731         }
732 
733         colo_receive_check_message(mis->from_src_file,
734                            COLO_MESSAGE_VMSTATE_SEND, &local_err);
735         if (local_err) {
736             goto out;
737         }
738 
739         qemu_mutex_lock_iothread();
740         cpu_synchronize_all_pre_loadvm();
741         ret = qemu_loadvm_state_main(mis->from_src_file, mis);
742         qemu_mutex_unlock_iothread();
743 
744         if (ret < 0) {
745             error_report("Load VM's live state (ram) error");
746             goto out;
747         }
748 
749         value = colo_receive_message_value(mis->from_src_file,
750                                  COLO_MESSAGE_VMSTATE_SIZE, &local_err);
751         if (local_err) {
752             goto out;
753         }
754 
755         /*
756          * Read VM device state data into channel buffer,
757          * It's better to re-use the memory allocated.
758          * Here we need to handle the channel buffer directly.
759          */
760         if (value > bioc->capacity) {
761             bioc->capacity = value;
762             bioc->data = g_realloc(bioc->data, bioc->capacity);
763         }
764         total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
765         if (total_size != value) {
766             error_report("Got %" PRIu64 " VMState data, less than expected"
767                         " %" PRIu64, total_size, value);
768             goto out;
769         }
770         bioc->usage = total_size;
771         qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
772 
773         colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
774                      &local_err);
775         if (local_err) {
776             goto out;
777         }
778 
779         qemu_mutex_lock_iothread();
780         vmstate_loading = true;
781         ret = qemu_load_device_state(fb);
782         if (ret < 0) {
783             error_report("COLO: load device state failed");
784             qemu_mutex_unlock_iothread();
785             goto out;
786         }
787 
788         replication_get_error_all(&local_err);
789         if (local_err) {
790             qemu_mutex_unlock_iothread();
791             goto out;
792         }
793         /* discard colo disk buffer */
794         replication_do_checkpoint_all(&local_err);
795         if (local_err) {
796             qemu_mutex_unlock_iothread();
797             goto out;
798         }
799 
800         /* Notify all filters of all NIC to do checkpoint */
801         colo_notify_filters_event(COLO_EVENT_CHECKPOINT, &local_err);
802 
803         if (local_err) {
804             qemu_mutex_unlock_iothread();
805             goto out;
806         }
807 
808         vmstate_loading = false;
809         vm_start();
810         trace_colo_vm_state_change("stop", "run");
811         qemu_mutex_unlock_iothread();
812 
813         if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
814             failover_set_state(FAILOVER_STATUS_RELAUNCH,
815                             FAILOVER_STATUS_NONE);
816             failover_request_active(NULL);
817             goto out;
818         }
819 
820         colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
821                      &local_err);
822         if (local_err) {
823             goto out;
824         }
825     }
826 
827 out:
828     vmstate_loading = false;
829     /* Throw the unreported error message after exited from loop */
830     if (local_err) {
831         error_report_err(local_err);
832     }
833 
834     switch (failover_get_state()) {
835     case FAILOVER_STATUS_NONE:
836         qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
837                                   COLO_EXIT_REASON_ERROR);
838         break;
839     case FAILOVER_STATUS_REQUIRE:
840         qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
841                                   COLO_EXIT_REASON_REQUEST);
842         break;
843     default:
844         abort();
845     }
846 
847     if (fb) {
848         qemu_fclose(fb);
849     }
850 
851     /* Hope this not to be too long to loop here */
852     qemu_sem_wait(&mis->colo_incoming_sem);
853     qemu_sem_destroy(&mis->colo_incoming_sem);
854     /* Must be called after failover BH is completed */
855     if (mis->to_src_file) {
856         qemu_fclose(mis->to_src_file);
857     }
858     migration_incoming_disable_colo();
859 
860     rcu_unregister_thread();
861     return NULL;
862 }
863