xref: /openbmc/qemu/migration/colo.c (revision 7c08eefc)
1 /*
2  * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
3  * (a.k.a. Fault Tolerance or Continuous Replication)
4  *
5  * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
6  * Copyright (c) 2016 FUJITSU LIMITED
7  * Copyright (c) 2016 Intel Corporation
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2 or
10  * later.  See the COPYING file in the top-level directory.
11  */
12 
13 #include "qemu/osdep.h"
14 #include "sysemu/sysemu.h"
15 #include "qapi/error.h"
16 #include "qapi/qapi-commands-migration.h"
17 #include "migration.h"
18 #include "qemu-file.h"
19 #include "savevm.h"
20 #include "migration/colo.h"
21 #include "io/channel-buffer.h"
22 #include "trace.h"
23 #include "qemu/error-report.h"
24 #include "qemu/main-loop.h"
25 #include "qemu/rcu.h"
26 #include "migration/failover.h"
27 #include "migration/ram.h"
28 #include "block/replication.h"
29 #include "net/colo-compare.h"
30 #include "net/colo.h"
31 #include "block/block.h"
32 #include "qapi/qapi-events-migration.h"
33 #include "sysemu/cpus.h"
34 #include "sysemu/runstate.h"
35 #include "net/filter.h"
36 #include "options.h"
37 
/*
 * True while colo_incoming_process_checkpoint() is flushing the RAM cache and
 * loading device state; secondary_vm_do_failover() defers failover (moves the
 * failover state to RELAUNCH) when it sees this flag set.
 */
static bool vmstate_loading;
/*
 * Notifier registered with colo-compare by colo_process_checkpoint(); its
 * callback is colo_compare_notify_checkpoint().
 */
static Notifier packets_compare_notifier;

/*
 * COLO mode recorded by colo_do_failover() and reported as "last_mode" by
 * qmp_query_colo_status(), so users can still learn the mode after failover.
 */
static COLOMode last_colo_mode;

/* Initial capacity (4 MiB) passed to qio_channel_buffer_new() in this file. */
#define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024)
45 
46 bool migration_in_colo_state(void)
47 {
48     MigrationState *s = migrate_get_current();
49 
50     return (s->state == MIGRATION_STATUS_COLO);
51 }
52 
53 bool migration_incoming_in_colo_state(void)
54 {
55     MigrationIncomingState *mis = migration_incoming_get_current();
56 
57     return mis && (mis->state == MIGRATION_STATUS_COLO);
58 }
59 
60 static bool colo_runstate_is_stopped(void)
61 {
62     return runstate_check(RUN_STATE_COLO) || !runstate_is_running();
63 }
64 
65 static void colo_checkpoint_notify(void)
66 {
67     MigrationState *s = migrate_get_current();
68     int64_t next_notify_time;
69 
70     qemu_event_set(&s->colo_checkpoint_event);
71     s->colo_checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
72     next_notify_time = s->colo_checkpoint_time + migrate_checkpoint_delay();
73     timer_mod(s->colo_delay_timer, next_notify_time);
74 }
75 
/*
 * Timer callback installed on colo_delay_timer by
 * migrate_start_colo_process(); simply requests a checkpoint.
 *
 * @opaque: unused (the timer is created with a NULL opaque).
 */
static void colo_checkpoint_notify_timer(void *opaque)
{
    colo_checkpoint_notify();
}
80 
/*
 * Request a checkpoint (which also re-arms the delay timer) if the outgoing
 * migration is in the COLO state; otherwise do nothing.
 */
void colo_checkpoint_delay_set(void)
{
    if (!migration_in_colo_state()) {
        return;
    }
    colo_checkpoint_notify();
}
87 
88 static void secondary_vm_do_failover(void)
89 {
90 /* COLO needs enable block-replication */
91     int old_state;
92     MigrationIncomingState *mis = migration_incoming_get_current();
93     Error *local_err = NULL;
94 
95     /* Can not do failover during the process of VM's loading VMstate, Or
96      * it will break the secondary VM.
97      */
98     if (vmstate_loading) {
99         old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
100                         FAILOVER_STATUS_RELAUNCH);
101         if (old_state != FAILOVER_STATUS_ACTIVE) {
102             error_report("Unknown error while do failover for secondary VM,"
103                          "old_state: %s", FailoverStatus_str(old_state));
104         }
105         return;
106     }
107 
108     migrate_set_state(&mis->state, MIGRATION_STATUS_COLO,
109                       MIGRATION_STATUS_COMPLETED);
110 
111     replication_stop_all(true, &local_err);
112     if (local_err) {
113         error_report_err(local_err);
114         local_err = NULL;
115     }
116 
117     /* Notify all filters of all NIC to do checkpoint */
118     colo_notify_filters_event(COLO_EVENT_FAILOVER, &local_err);
119     if (local_err) {
120         error_report_err(local_err);
121     }
122 
123     if (!autostart) {
124         error_report("\"-S\" qemu option will be ignored in secondary side");
125         /* recover runstate to normal migration finish state */
126         autostart = true;
127     }
128     /*
129      * Make sure COLO incoming thread not block in recv or send,
130      * If mis->from_src_file and mis->to_src_file use the same fd,
131      * The second shutdown() will return -1, we ignore this value,
132      * It is harmless.
133      */
134     if (mis->from_src_file) {
135         qemu_file_shutdown(mis->from_src_file);
136     }
137     if (mis->to_src_file) {
138         qemu_file_shutdown(mis->to_src_file);
139     }
140 
141     old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
142                                    FAILOVER_STATUS_COMPLETED);
143     if (old_state != FAILOVER_STATUS_ACTIVE) {
144         error_report("Incorrect state (%s) while doing failover for "
145                      "secondary VM", FailoverStatus_str(old_state));
146         return;
147     }
148     /* Notify COLO incoming thread that failover work is finished */
149     qemu_sem_post(&mis->colo_incoming_sem);
150 
151     /* For Secondary VM, jump to incoming co */
152     if (mis->colo_incoming_co) {
153         qemu_coroutine_enter(mis->colo_incoming_co);
154     }
155 }
156 
157 static void primary_vm_do_failover(void)
158 {
159     MigrationState *s = migrate_get_current();
160     int old_state;
161     Error *local_err = NULL;
162 
163     migrate_set_state(&s->state, MIGRATION_STATUS_COLO,
164                       MIGRATION_STATUS_COMPLETED);
165     /*
166      * kick COLO thread which might wait at
167      * qemu_sem_wait(&s->colo_checkpoint_sem).
168      */
169     colo_checkpoint_notify();
170 
171     /*
172      * Wake up COLO thread which may blocked in recv() or send(),
173      * The s->rp_state.from_dst_file and s->to_dst_file may use the
174      * same fd, but we still shutdown the fd for twice, it is harmless.
175      */
176     if (s->to_dst_file) {
177         qemu_file_shutdown(s->to_dst_file);
178     }
179     if (s->rp_state.from_dst_file) {
180         qemu_file_shutdown(s->rp_state.from_dst_file);
181     }
182 
183     old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
184                                    FAILOVER_STATUS_COMPLETED);
185     if (old_state != FAILOVER_STATUS_ACTIVE) {
186         error_report("Incorrect state (%s) while doing failover for Primary VM",
187                      FailoverStatus_str(old_state));
188         return;
189     }
190 
191     replication_stop_all(true, &local_err);
192     if (local_err) {
193         error_report_err(local_err);
194         local_err = NULL;
195     }
196 
197     /* Notify COLO thread that failover work is finished */
198     qemu_sem_post(&s->colo_exit_sem);
199 }
200 
201 COLOMode get_colo_mode(void)
202 {
203     if (migration_in_colo_state()) {
204         return COLO_MODE_PRIMARY;
205     } else if (migration_incoming_in_colo_state()) {
206         return COLO_MODE_SECONDARY;
207     } else {
208         return COLO_MODE_NONE;
209     }
210 }
211 
212 void colo_do_failover(void)
213 {
214     /* Make sure VM stopped while failover happened. */
215     if (!colo_runstate_is_stopped()) {
216         vm_stop_force_state(RUN_STATE_COLO);
217     }
218 
219     switch (last_colo_mode = get_colo_mode()) {
220     case COLO_MODE_PRIMARY:
221         primary_vm_do_failover();
222         break;
223     case COLO_MODE_SECONDARY:
224         secondary_vm_do_failover();
225         break;
226     default:
227         error_report("colo_do_failover failed because the colo mode"
228                      " could not be obtained");
229     }
230 }
231 
232 void qmp_xen_set_replication(bool enable, bool primary,
233                              bool has_failover, bool failover,
234                              Error **errp)
235 {
236     ReplicationMode mode = primary ?
237                            REPLICATION_MODE_PRIMARY :
238                            REPLICATION_MODE_SECONDARY;
239 
240     if (has_failover && enable) {
241         error_setg(errp, "Parameter 'failover' is only for"
242                    " stopping replication");
243         return;
244     }
245 
246     if (enable) {
247         replication_start_all(mode, errp);
248     } else {
249         if (!has_failover) {
250             failover = NULL;
251         }
252         replication_stop_all(failover, failover ? NULL : errp);
253     }
254 }
255 
256 ReplicationStatus *qmp_query_xen_replication_status(Error **errp)
257 {
258     Error *err = NULL;
259     ReplicationStatus *s = g_new0(ReplicationStatus, 1);
260 
261     replication_get_error_all(&err);
262     if (err) {
263         s->error = true;
264         s->desc = g_strdup(error_get_pretty(err));
265     } else {
266         s->error = false;
267     }
268 
269     error_free(err);
270     return s;
271 }
272 
273 void qmp_xen_colo_do_checkpoint(Error **errp)
274 {
275     Error *err = NULL;
276 
277     replication_do_checkpoint_all(&err);
278     if (err) {
279         error_propagate(errp, err);
280         return;
281     }
282     /* Notify all filters of all NIC to do checkpoint */
283     colo_notify_filters_event(COLO_EVENT_CHECKPOINT, errp);
284 }
285 
286 COLOStatus *qmp_query_colo_status(Error **errp)
287 {
288     COLOStatus *s = g_new0(COLOStatus, 1);
289 
290     s->mode = get_colo_mode();
291     s->last_mode = last_colo_mode;
292 
293     switch (failover_get_state()) {
294     case FAILOVER_STATUS_NONE:
295         s->reason = COLO_EXIT_REASON_NONE;
296         break;
297     case FAILOVER_STATUS_COMPLETED:
298         s->reason = COLO_EXIT_REASON_REQUEST;
299         break;
300     default:
301         if (migration_in_colo_state()) {
302             s->reason = COLO_EXIT_REASON_PROCESSING;
303         } else {
304             s->reason = COLO_EXIT_REASON_ERROR;
305         }
306     }
307 
308     return s;
309 }
310 
311 static void colo_send_message(QEMUFile *f, COLOMessage msg,
312                               Error **errp)
313 {
314     int ret;
315 
316     if (msg >= COLO_MESSAGE__MAX) {
317         error_setg(errp, "%s: Invalid message", __func__);
318         return;
319     }
320     qemu_put_be32(f, msg);
321     ret = qemu_fflush(f);
322     if (ret < 0) {
323         error_setg_errno(errp, -ret, "Can't send COLO message");
324     }
325     trace_colo_send_message(COLOMessage_str(msg));
326 }
327 
328 static void colo_send_message_value(QEMUFile *f, COLOMessage msg,
329                                     uint64_t value, Error **errp)
330 {
331     Error *local_err = NULL;
332     int ret;
333 
334     colo_send_message(f, msg, &local_err);
335     if (local_err) {
336         error_propagate(errp, local_err);
337         return;
338     }
339     qemu_put_be64(f, value);
340     ret = qemu_fflush(f);
341     if (ret < 0) {
342         error_setg_errno(errp, -ret, "Failed to send value for message:%s",
343                          COLOMessage_str(msg));
344     }
345 }
346 
347 static COLOMessage colo_receive_message(QEMUFile *f, Error **errp)
348 {
349     COLOMessage msg;
350     int ret;
351 
352     msg = qemu_get_be32(f);
353     ret = qemu_file_get_error(f);
354     if (ret < 0) {
355         error_setg_errno(errp, -ret, "Can't receive COLO message");
356         return msg;
357     }
358     if (msg >= COLO_MESSAGE__MAX) {
359         error_setg(errp, "%s: Invalid message", __func__);
360         return msg;
361     }
362     trace_colo_receive_message(COLOMessage_str(msg));
363     return msg;
364 }
365 
366 static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg,
367                                        Error **errp)
368 {
369     COLOMessage msg;
370     Error *local_err = NULL;
371 
372     msg = colo_receive_message(f, &local_err);
373     if (local_err) {
374         error_propagate(errp, local_err);
375         return;
376     }
377     if (msg != expect_msg) {
378         error_setg(errp, "Unexpected COLO message %d, expected %d",
379                           msg, expect_msg);
380     }
381 }
382 
383 static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg,
384                                            Error **errp)
385 {
386     Error *local_err = NULL;
387     uint64_t value;
388     int ret;
389 
390     colo_receive_check_message(f, expect_msg, &local_err);
391     if (local_err) {
392         error_propagate(errp, local_err);
393         return 0;
394     }
395 
396     value = qemu_get_be64(f);
397     ret = qemu_file_get_error(f);
398     if (ret < 0) {
399         error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s",
400                          COLOMessage_str(expect_msg));
401     }
402     return value;
403 }
404 
405 static int colo_do_checkpoint_transaction(MigrationState *s,
406                                           QIOChannelBuffer *bioc,
407                                           QEMUFile *fb)
408 {
409     Error *local_err = NULL;
410     int ret = -1;
411 
412     colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST,
413                       &local_err);
414     if (local_err) {
415         goto out;
416     }
417 
418     colo_receive_check_message(s->rp_state.from_dst_file,
419                     COLO_MESSAGE_CHECKPOINT_REPLY, &local_err);
420     if (local_err) {
421         goto out;
422     }
423     /* Reset channel-buffer directly */
424     qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
425     bioc->usage = 0;
426 
427     bql_lock();
428     if (failover_get_state() != FAILOVER_STATUS_NONE) {
429         bql_unlock();
430         goto out;
431     }
432     vm_stop_force_state(RUN_STATE_COLO);
433     bql_unlock();
434     trace_colo_vm_state_change("run", "stop");
435     /*
436      * Failover request bh could be called after vm_stop_force_state(),
437      * So we need check failover_request_is_active() again.
438      */
439     if (failover_get_state() != FAILOVER_STATUS_NONE) {
440         goto out;
441     }
442     bql_lock();
443 
444     replication_do_checkpoint_all(&local_err);
445     if (local_err) {
446         bql_unlock();
447         goto out;
448     }
449 
450     colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err);
451     if (local_err) {
452         bql_unlock();
453         goto out;
454     }
455     /* Note: device state is saved into buffer */
456     ret = qemu_save_device_state(fb);
457 
458     bql_unlock();
459     if (ret < 0) {
460         goto out;
461     }
462 
463     if (migrate_auto_converge()) {
464         mig_throttle_counter_reset();
465     }
466     /*
467      * Only save VM's live state, which not including device state.
468      * TODO: We may need a timeout mechanism to prevent COLO process
469      * to be blocked here.
470      */
471     qemu_savevm_live_state(s->to_dst_file);
472 
473     qemu_fflush(fb);
474 
475     /*
476      * We need the size of the VMstate data in Secondary side,
477      * With which we can decide how much data should be read.
478      */
479     colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE,
480                             bioc->usage, &local_err);
481     if (local_err) {
482         goto out;
483     }
484 
485     qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage);
486     ret = qemu_fflush(s->to_dst_file);
487     if (ret < 0) {
488         goto out;
489     }
490 
491     colo_receive_check_message(s->rp_state.from_dst_file,
492                        COLO_MESSAGE_VMSTATE_RECEIVED, &local_err);
493     if (local_err) {
494         goto out;
495     }
496 
497     qemu_event_reset(&s->colo_checkpoint_event);
498     colo_notify_compares_event(NULL, COLO_EVENT_CHECKPOINT, &local_err);
499     if (local_err) {
500         goto out;
501     }
502 
503     colo_receive_check_message(s->rp_state.from_dst_file,
504                        COLO_MESSAGE_VMSTATE_LOADED, &local_err);
505     if (local_err) {
506         goto out;
507     }
508 
509     ret = 0;
510 
511     bql_lock();
512     vm_start();
513     bql_unlock();
514     trace_colo_vm_state_change("stop", "run");
515 
516 out:
517     if (local_err) {
518         error_report_err(local_err);
519     }
520     return ret;
521 }
522 
/*
 * Notifier callback installed on packets_compare_notifier by
 * colo_process_checkpoint(); requests a checkpoint.
 *
 * @notifier: unused.
 * @data: unused.
 */
static void colo_compare_notify_checkpoint(Notifier *notifier, void *data)
{
    colo_checkpoint_notify();
}
527 
/*
 * Main loop of the Primary-side COLO thread.
 *
 * Opens the return path, registers the colo-compare notifier, waits for the
 * Secondary's CHECKPOINT_READY, starts block replication and the VM, then
 * runs one checkpoint transaction each time colo_checkpoint_event is set,
 * until the migration leaves the COLO state, a failover request is seen, or
 * a transaction fails. Afterwards it emits the COLO exit event, waits on
 * colo_exit_sem and releases the resources created for COLO.
 *
 * @s: outgoing migration state; must already be in the COLO state.
 */
static void colo_process_checkpoint(MigrationState *s)
{
    QIOChannelBuffer *bioc;
    QEMUFile *fb = NULL;
    Error *local_err = NULL;
    int ret;

    /* NOTE(review): this early return skips all of the teardown below. */
    if (get_colo_mode() != COLO_MODE_PRIMARY) {
        error_report("COLO mode must be COLO_MODE_PRIMARY");
        return;
    }

    failover_init_state();

    s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file);
    if (!s->rp_state.from_dst_file) {
        error_report("Open QEMUFile from_dst_file failed");
        goto out;
    }

    /* From now on colo-compare can request checkpoints via the notifier. */
    packets_compare_notifier.notify = colo_compare_notify_checkpoint;
    colo_compare_register_notifier(&packets_compare_notifier);

    /*
     * Wait for Secondary finish loading VM states and enter COLO
     * restore.
     */
    colo_receive_check_message(s->rp_state.from_dst_file,
                       COLO_MESSAGE_CHECKPOINT_READY, &local_err);
    if (local_err) {
        goto out;
    }
    /*
     * Buffer that collects device state for each checkpoint. The local
     * reference to bioc is dropped right after fb is created from it.
     */
    bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
    fb = qemu_file_new_output(QIO_CHANNEL(bioc));
    object_unref(OBJECT(bioc));

    bql_lock();
    replication_start_all(REPLICATION_MODE_PRIMARY, &local_err);
    if (local_err) {
        bql_unlock();
        goto out;
    }

    vm_start();
    bql_unlock();
    trace_colo_vm_state_change("stop", "run");

    /* Arm the periodic checkpoint timer for the first time. */
    timer_mod(s->colo_delay_timer, qemu_clock_get_ms(QEMU_CLOCK_HOST) +
              migrate_checkpoint_delay());

    while (s->state == MIGRATION_STATUS_COLO) {
        if (failover_get_state() != FAILOVER_STATUS_NONE) {
            error_report("failover request");
            goto out;
        }

        /* Sleep until a checkpoint is requested (timer, compare, failover). */
        qemu_event_wait(&s->colo_checkpoint_event);

        /* The state may have changed while we were waiting. */
        if (s->state != MIGRATION_STATUS_COLO) {
            goto out;
        }
        ret = colo_do_checkpoint_transaction(s, bioc, fb);
        if (ret < 0) {
            goto out;
        }
    }

out:
    /* Throw the unreported error message after exited from loop */
    if (local_err) {
        error_report_err(local_err);
    }

    if (fb) {
        qemu_fclose(fb);
    }

    /*
     * There are only two reasons we can get here, some error happened
     * or the user triggered failover.
     */
    switch (failover_get_state()) {
    case FAILOVER_STATUS_COMPLETED:
        qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
                                  COLO_EXIT_REASON_REQUEST);
        break;
    default:
        qapi_event_send_colo_exit(COLO_MODE_PRIMARY,
                                  COLO_EXIT_REASON_ERROR);
    }

    /*
     * Block until primary_vm_do_failover() or colo_shutdown() posts
     * colo_exit_sem. Hope this not to be too long to wait here.
     */
    qemu_sem_wait(&s->colo_exit_sem);
    qemu_sem_destroy(&s->colo_exit_sem);

    /*
     * It is safe to unregister notifier after failover finished.
     * Besides, colo_delay_timer and colo_checkpoint_event can't be
     * released before unregister notifier, or there will be use-after-free
     * error.
     */
    colo_compare_unregister_notifier(&packets_compare_notifier);
    timer_free(s->colo_delay_timer);
    qemu_event_destroy(&s->colo_checkpoint_event);

    /*
     * Must be called after failover BH is completed,
     * Or the failover BH may shutdown the wrong fd that
     * re-used by other threads after we release here.
     */
    if (s->rp_state.from_dst_file) {
        qemu_fclose(s->rp_state.from_dst_file);
        s->rp_state.from_dst_file = NULL;
    }
}
643 
644 void migrate_start_colo_process(MigrationState *s)
645 {
646     bql_unlock();
647     qemu_event_init(&s->colo_checkpoint_event, false);
648     s->colo_delay_timer =  timer_new_ms(QEMU_CLOCK_HOST,
649                                 colo_checkpoint_notify_timer, NULL);
650 
651     qemu_sem_init(&s->colo_exit_sem, 0);
652     colo_process_checkpoint(s);
653     bql_lock();
654 }
655 
/*
 * Handle one checkpoint on the Secondary side, after CHECKPOINT_REQUEST has
 * been received.
 *
 * Sequence: stop the VM; send CHECKPOINT_REPLY; expect VMSTATE_SEND; load
 * the live state from from_src_file; receive the device-state size and then
 * the device state itself into @bioc; send VMSTATE_RECEIVED; under the BQL
 * flush the RAM cache, load device state from @fb, check and checkpoint
 * block replication and notify the net filters; restart the VM; send
 * VMSTATE_LOADED (skipped if a failover relaunch is pending).
 *
 * @mis: incoming migration state providing both channels.
 * @fb: QEMUFile reading from @bioc.
 * @bioc: channel buffer caching the device state; resized here if needed.
 * @errp: set on the first failure; the function returns right away.
 */
static void colo_incoming_process_checkpoint(MigrationIncomingState *mis,
                      QEMUFile *fb, QIOChannelBuffer *bioc, Error **errp)
{
    uint64_t total_size;
    uint64_t value;
    Error *local_err = NULL;
    int ret;

    bql_lock();
    vm_stop_force_state(RUN_STATE_COLO);
    bql_unlock();
    trace_colo_vm_state_change("run", "stop");

    /* FIXME: This is unnecessary for periodic checkpoint mode */
    colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
                 &local_err);
    if (local_err) {
        error_propagate(errp, local_err);
        return;
    }

    colo_receive_check_message(mis->from_src_file,
                       COLO_MESSAGE_VMSTATE_SEND, &local_err);
    if (local_err) {
        error_propagate(errp, local_err);
        return;
    }

    /* Load the live (non-device) state straight from the migration stream. */
    bql_lock();
    cpu_synchronize_all_states();
    ret = qemu_loadvm_state_main(mis->from_src_file, mis);
    bql_unlock();

    if (ret < 0) {
        error_setg(errp, "Load VM's live state (ram) error");
        return;
    }

    /* Size in bytes of the device state that follows. */
    value = colo_receive_message_value(mis->from_src_file,
                             COLO_MESSAGE_VMSTATE_SIZE, &local_err);
    if (local_err) {
        error_propagate(errp, local_err);
        return;
    }

    /*
     * Read VM device state data into channel buffer,
     * It's better to re-use the memory allocated.
     * Here we need to handle the channel buffer directly.
     *
     * NOTE(review): value comes from the peer and is not range-checked
     * before being used as the new buffer capacity.
     */
    if (value > bioc->capacity) {
        bioc->capacity = value;
        bioc->data = g_realloc(bioc->data, bioc->capacity);
    }
    total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
    if (total_size != value) {
        error_setg(errp, "Got %" PRIu64 " VMState data, less than expected"
                    " %" PRIu64, total_size, value);
        return;
    }
    /* Make the buffer readable from its start for qemu_load_device_state(). */
    bioc->usage = total_size;
    qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);

    colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
                 &local_err);
    if (local_err) {
        error_propagate(errp, local_err);
        return;
    }

    /*
     * From here until vmstate_loading is cleared, a failover request is
     * deferred by secondary_vm_do_failover(). Every exit path below clears
     * the flag and drops the BQL.
     */
    bql_lock();
    vmstate_loading = true;
    colo_flush_ram_cache();
    ret = qemu_load_device_state(fb);
    if (ret < 0) {
        error_setg(errp, "COLO: load device state failed");
        vmstate_loading = false;
        bql_unlock();
        return;
    }

    /* Abort the checkpoint if block replication has recorded an error. */
    replication_get_error_all(&local_err);
    if (local_err) {
        error_propagate(errp, local_err);
        vmstate_loading = false;
        bql_unlock();
        return;
    }

    /* discard colo disk buffer */
    replication_do_checkpoint_all(&local_err);
    if (local_err) {
        error_propagate(errp, local_err);
        vmstate_loading = false;
        bql_unlock();
        return;
    }
    /* Notify all filters of all NIC to do checkpoint */
    colo_notify_filters_event(COLO_EVENT_CHECKPOINT, &local_err);

    if (local_err) {
        error_propagate(errp, local_err);
        vmstate_loading = false;
        bql_unlock();
        return;
    }

    vmstate_loading = false;
    vm_start();
    bql_unlock();
    trace_colo_vm_state_change("stop", "run");

    /*
     * A failover was requested while the state was loading; skip the
     * VMSTATE_LOADED reply and let the caller relaunch the failover.
     */
    if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
        return;
    }

    colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
                 &local_err);
    error_propagate(errp, local_err);
}
776 
777 static void colo_wait_handle_message(MigrationIncomingState *mis,
778                 QEMUFile *fb, QIOChannelBuffer *bioc, Error **errp)
779 {
780     COLOMessage msg;
781     Error *local_err = NULL;
782 
783     msg = colo_receive_message(mis->from_src_file, &local_err);
784     if (local_err) {
785         error_propagate(errp, local_err);
786         return;
787     }
788 
789     switch (msg) {
790     case COLO_MESSAGE_CHECKPOINT_REQUEST:
791         colo_incoming_process_checkpoint(mis, fb, bioc, errp);
792         break;
793     default:
794         error_setg(errp, "Got unknown COLO message: %d", msg);
795         break;
796     }
797 }
798 
799 void colo_shutdown(void)
800 {
801     MigrationIncomingState *mis = NULL;
802     MigrationState *s = NULL;
803 
804     switch (get_colo_mode()) {
805     case COLO_MODE_PRIMARY:
806         s = migrate_get_current();
807         qemu_event_set(&s->colo_checkpoint_event);
808         qemu_sem_post(&s->colo_exit_sem);
809         break;
810     case COLO_MODE_SECONDARY:
811         mis = migration_incoming_get_current();
812         qemu_sem_post(&mis->colo_incoming_sem);
813         break;
814     default:
815         break;
816     }
817 }
818 
819 static void *colo_process_incoming_thread(void *opaque)
820 {
821     MigrationIncomingState *mis = opaque;
822     QEMUFile *fb = NULL;
823     QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */
824     Error *local_err = NULL;
825 
826     rcu_register_thread();
827     qemu_sem_init(&mis->colo_incoming_sem, 0);
828 
829     migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
830                       MIGRATION_STATUS_COLO);
831 
832     if (get_colo_mode() != COLO_MODE_SECONDARY) {
833         error_report("COLO mode must be COLO_MODE_SECONDARY");
834         return NULL;
835     }
836 
837     /* Make sure all file formats throw away their mutable metadata */
838     bql_lock();
839     bdrv_activate_all(&local_err);
840     bql_unlock();
841     if (local_err) {
842         error_report_err(local_err);
843         return NULL;
844     }
845 
846     failover_init_state();
847 
848     mis->to_src_file = qemu_file_get_return_path(mis->from_src_file);
849     if (!mis->to_src_file) {
850         error_report("COLO incoming thread: Open QEMUFile to_src_file failed");
851         goto out;
852     }
853     /*
854      * Note: the communication between Primary side and Secondary side
855      * should be sequential, we set the fd to unblocked in migration incoming
856      * coroutine, and here we are in the COLO incoming thread, so it is ok to
857      * set the fd back to blocked.
858      */
859     qemu_file_set_blocking(mis->from_src_file, true);
860 
861     colo_incoming_start_dirty_log();
862 
863     bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
864     fb = qemu_file_new_input(QIO_CHANNEL(bioc));
865     object_unref(OBJECT(bioc));
866 
867     bql_lock();
868     replication_start_all(REPLICATION_MODE_SECONDARY, &local_err);
869     if (local_err) {
870         bql_unlock();
871         goto out;
872     }
873     vm_start();
874     bql_unlock();
875     trace_colo_vm_state_change("stop", "run");
876 
877     colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY,
878                       &local_err);
879     if (local_err) {
880         goto out;
881     }
882 
883     while (mis->state == MIGRATION_STATUS_COLO) {
884         colo_wait_handle_message(mis, fb, bioc, &local_err);
885         if (local_err) {
886             error_report_err(local_err);
887             break;
888         }
889 
890         if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
891             failover_set_state(FAILOVER_STATUS_RELAUNCH,
892                             FAILOVER_STATUS_NONE);
893             failover_request_active(NULL);
894             break;
895         }
896 
897         if (failover_get_state() != FAILOVER_STATUS_NONE) {
898             error_report("failover request");
899             break;
900         }
901     }
902 
903 out:
904     /*
905      * There are only two reasons we can get here, some error happened
906      * or the user triggered failover.
907      */
908     switch (failover_get_state()) {
909     case FAILOVER_STATUS_COMPLETED:
910         qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
911                                   COLO_EXIT_REASON_REQUEST);
912         break;
913     default:
914         qapi_event_send_colo_exit(COLO_MODE_SECONDARY,
915                                   COLO_EXIT_REASON_ERROR);
916     }
917 
918     if (fb) {
919         qemu_fclose(fb);
920     }
921 
922     /* Hope this not to be too long to loop here */
923     qemu_sem_wait(&mis->colo_incoming_sem);
924     qemu_sem_destroy(&mis->colo_incoming_sem);
925 
926     rcu_unregister_thread();
927     return NULL;
928 }
929 
930 void coroutine_fn colo_incoming_co(void)
931 {
932     MigrationIncomingState *mis = migration_incoming_get_current();
933     QemuThread th;
934 
935     assert(bql_locked());
936     assert(migration_incoming_colo_enabled());
937 
938     qemu_thread_create(&th, "mig/dst/colo", colo_process_incoming_thread,
939                        mis, QEMU_THREAD_JOINABLE);
940 
941     mis->colo_incoming_co = qemu_coroutine_self();
942     qemu_coroutine_yield();
943     mis->colo_incoming_co = NULL;
944 
945     bql_unlock();
946     /* Wait checkpoint incoming thread exit before free resource */
947     qemu_thread_join(&th);
948     bql_lock();
949 
950     /* We hold the global BQL, so it is safe here */
951     colo_release_ram_cache();
952 }
953