xref: /openbmc/qemu/migration/colo.c (revision 795c40b8)
1 /*
2  * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
3  * (a.k.a. Fault Tolerance or Continuous Replication)
4  *
5  * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
6  * Copyright (c) 2016 FUJITSU LIMITED
7  * Copyright (c) 2016 Intel Corporation
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2 or
10  * later.  See the COPYING file in the top-level directory.
11  */
12 
13 #include "qemu/osdep.h"
14 #include "qemu/timer.h"
15 #include "sysemu/sysemu.h"
16 #include "migration/colo.h"
17 #include "io/channel-buffer.h"
18 #include "trace.h"
19 #include "qemu/error-report.h"
20 #include "qapi/error.h"
21 #include "migration/failover.h"
22 #include "replication.h"
23 #include "qmp-commands.h"
24 
/*
 * True while the COLO incoming thread is inside qemu_loadvm_state().
 * secondary_vm_do_failover() checks it and postpones the failover
 * instead of running it in the middle of a VM state load.
 */
static bool vmstate_loading;

/* Initial size handed to qio_channel_buffer_new() for the COLO buffers */
#define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024)
28 
/*
 * Report whether COLO is available.
 *
 * Returns: true, unconditionally.
 */
bool colo_supported(void)
{
    return true;
}
33 
34 bool migration_in_colo_state(void)
35 {
36     MigrationState *s = migrate_get_current();
37 
38     return (s->state == MIGRATION_STATUS_COLO);
39 }
40 
41 bool migration_incoming_in_colo_state(void)
42 {
43     MigrationIncomingState *mis = migration_incoming_get_current();
44 
45     return mis && (mis->state == MIGRATION_STATUS_COLO);
46 }
47 
48 static bool colo_runstate_is_stopped(void)
49 {
50     return runstate_check(RUN_STATE_COLO) || !runstate_is_running();
51 }
52 
53 static void secondary_vm_do_failover(void)
54 {
55     int old_state;
56     MigrationIncomingState *mis = migration_incoming_get_current();
57 
58     /* Can not do failover during the process of VM's loading VMstate, Or
59      * it will break the secondary VM.
60      */
61     if (vmstate_loading) {
62         old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
63                         FAILOVER_STATUS_RELAUNCH);
64         if (old_state != FAILOVER_STATUS_ACTIVE) {
65             error_report("Unknown error while do failover for secondary VM,"
66                          "old_state: %s", FailoverStatus_lookup[old_state]);
67         }
68         return;
69     }
70 
71     migrate_set_state(&mis->state, MIGRATION_STATUS_COLO,
72                       MIGRATION_STATUS_COMPLETED);
73 
74     if (!autostart) {
75         error_report("\"-S\" qemu option will be ignored in secondary side");
76         /* recover runstate to normal migration finish state */
77         autostart = true;
78     }
79     /*
80      * Make sure COLO incoming thread not block in recv or send,
81      * If mis->from_src_file and mis->to_src_file use the same fd,
82      * The second shutdown() will return -1, we ignore this value,
83      * It is harmless.
84      */
85     if (mis->from_src_file) {
86         qemu_file_shutdown(mis->from_src_file);
87     }
88     if (mis->to_src_file) {
89         qemu_file_shutdown(mis->to_src_file);
90     }
91 
92     old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
93                                    FAILOVER_STATUS_COMPLETED);
94     if (old_state != FAILOVER_STATUS_ACTIVE) {
95         error_report("Incorrect state (%s) while doing failover for "
96                      "secondary VM", FailoverStatus_lookup[old_state]);
97         return;
98     }
99     /* Notify COLO incoming thread that failover work is finished */
100     qemu_sem_post(&mis->colo_incoming_sem);
101     /* For Secondary VM, jump to incoming co */
102     if (mis->migration_incoming_co) {
103         qemu_coroutine_enter(mis->migration_incoming_co);
104     }
105 }
106 
107 static void primary_vm_do_failover(void)
108 {
109     MigrationState *s = migrate_get_current();
110     int old_state;
111 
112     migrate_set_state(&s->state, MIGRATION_STATUS_COLO,
113                       MIGRATION_STATUS_COMPLETED);
114 
115     /*
116      * Wake up COLO thread which may blocked in recv() or send(),
117      * The s->rp_state.from_dst_file and s->to_dst_file may use the
118      * same fd, but we still shutdown the fd for twice, it is harmless.
119      */
120     if (s->to_dst_file) {
121         qemu_file_shutdown(s->to_dst_file);
122     }
123     if (s->rp_state.from_dst_file) {
124         qemu_file_shutdown(s->rp_state.from_dst_file);
125     }
126 
127     old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
128                                    FAILOVER_STATUS_COMPLETED);
129     if (old_state != FAILOVER_STATUS_ACTIVE) {
130         error_report("Incorrect state (%s) while doing failover for Primary VM",
131                      FailoverStatus_lookup[old_state]);
132         return;
133     }
134     /* Notify COLO thread that failover work is finished */
135     qemu_sem_post(&s->colo_exit_sem);
136 }
137 
138 void colo_do_failover(MigrationState *s)
139 {
140     /* Make sure VM stopped while failover happened. */
141     if (!colo_runstate_is_stopped()) {
142         vm_stop_force_state(RUN_STATE_COLO);
143     }
144 
145     if (get_colo_mode() == COLO_MODE_PRIMARY) {
146         primary_vm_do_failover();
147     } else {
148         secondary_vm_do_failover();
149     }
150 }
151 
152 void qmp_xen_set_replication(bool enable, bool primary,
153                              bool has_failover, bool failover,
154                              Error **errp)
155 {
156 #ifdef CONFIG_REPLICATION
157     ReplicationMode mode = primary ?
158                            REPLICATION_MODE_PRIMARY :
159                            REPLICATION_MODE_SECONDARY;
160 
161     if (has_failover && enable) {
162         error_setg(errp, "Parameter 'failover' is only for"
163                    " stopping replication");
164         return;
165     }
166 
167     if (enable) {
168         replication_start_all(mode, errp);
169     } else {
170         if (!has_failover) {
171             failover = NULL;
172         }
173         replication_stop_all(failover, failover ? NULL : errp);
174     }
175 #else
176     abort();
177 #endif
178 }
179 
180 ReplicationStatus *qmp_query_xen_replication_status(Error **errp)
181 {
182 #ifdef CONFIG_REPLICATION
183     Error *err = NULL;
184     ReplicationStatus *s = g_new0(ReplicationStatus, 1);
185 
186     replication_get_error_all(&err);
187     if (err) {
188         s->error = true;
189         s->has_desc = true;
190         s->desc = g_strdup(error_get_pretty(err));
191     } else {
192         s->error = false;
193     }
194 
195     error_free(err);
196     return s;
197 #else
198     abort();
199 #endif
200 }
201 
202 void qmp_xen_colo_do_checkpoint(Error **errp)
203 {
204 #ifdef CONFIG_REPLICATION
205     replication_do_checkpoint_all(errp);
206 #else
207     abort();
208 #endif
209 }
210 
211 static void colo_send_message(QEMUFile *f, COLOMessage msg,
212                               Error **errp)
213 {
214     int ret;
215 
216     if (msg >= COLO_MESSAGE__MAX) {
217         error_setg(errp, "%s: Invalid message", __func__);
218         return;
219     }
220     qemu_put_be32(f, msg);
221     qemu_fflush(f);
222 
223     ret = qemu_file_get_error(f);
224     if (ret < 0) {
225         error_setg_errno(errp, -ret, "Can't send COLO message");
226     }
227     trace_colo_send_message(COLOMessage_lookup[msg]);
228 }
229 
230 static void colo_send_message_value(QEMUFile *f, COLOMessage msg,
231                                     uint64_t value, Error **errp)
232 {
233     Error *local_err = NULL;
234     int ret;
235 
236     colo_send_message(f, msg, &local_err);
237     if (local_err) {
238         error_propagate(errp, local_err);
239         return;
240     }
241     qemu_put_be64(f, value);
242     qemu_fflush(f);
243 
244     ret = qemu_file_get_error(f);
245     if (ret < 0) {
246         error_setg_errno(errp, -ret, "Failed to send value for message:%s",
247                          COLOMessage_lookup[msg]);
248     }
249 }
250 
251 static COLOMessage colo_receive_message(QEMUFile *f, Error **errp)
252 {
253     COLOMessage msg;
254     int ret;
255 
256     msg = qemu_get_be32(f);
257     ret = qemu_file_get_error(f);
258     if (ret < 0) {
259         error_setg_errno(errp, -ret, "Can't receive COLO message");
260         return msg;
261     }
262     if (msg >= COLO_MESSAGE__MAX) {
263         error_setg(errp, "%s: Invalid message", __func__);
264         return msg;
265     }
266     trace_colo_receive_message(COLOMessage_lookup[msg]);
267     return msg;
268 }
269 
270 static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg,
271                                        Error **errp)
272 {
273     COLOMessage msg;
274     Error *local_err = NULL;
275 
276     msg = colo_receive_message(f, &local_err);
277     if (local_err) {
278         error_propagate(errp, local_err);
279         return;
280     }
281     if (msg != expect_msg) {
282         error_setg(errp, "Unexpected COLO message %d, expected %d",
283                           msg, expect_msg);
284     }
285 }
286 
287 static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg,
288                                            Error **errp)
289 {
290     Error *local_err = NULL;
291     uint64_t value;
292     int ret;
293 
294     colo_receive_check_message(f, expect_msg, &local_err);
295     if (local_err) {
296         error_propagate(errp, local_err);
297         return 0;
298     }
299 
300     value = qemu_get_be64(f);
301     ret = qemu_file_get_error(f);
302     if (ret < 0) {
303         error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s",
304                          COLOMessage_lookup[expect_msg]);
305     }
306     return value;
307 }
308 
309 static int colo_do_checkpoint_transaction(MigrationState *s,
310                                           QIOChannelBuffer *bioc,
311                                           QEMUFile *fb)
312 {
313     Error *local_err = NULL;
314     int ret = -1;
315 
316     colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST,
317                       &local_err);
318     if (local_err) {
319         goto out;
320     }
321 
322     colo_receive_check_message(s->rp_state.from_dst_file,
323                     COLO_MESSAGE_CHECKPOINT_REPLY, &local_err);
324     if (local_err) {
325         goto out;
326     }
327     /* Reset channel-buffer directly */
328     qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
329     bioc->usage = 0;
330 
331     qemu_mutex_lock_iothread();
332     if (failover_get_state() != FAILOVER_STATUS_NONE) {
333         qemu_mutex_unlock_iothread();
334         goto out;
335     }
336     vm_stop_force_state(RUN_STATE_COLO);
337     qemu_mutex_unlock_iothread();
338     trace_colo_vm_state_change("run", "stop");
339     /*
340      * Failover request bh could be called after vm_stop_force_state(),
341      * So we need check failover_request_is_active() again.
342      */
343     if (failover_get_state() != FAILOVER_STATUS_NONE) {
344         goto out;
345     }
346 
347     /* Disable block migration */
348     s->params.blk = 0;
349     s->params.shared = 0;
350     qemu_savevm_state_header(fb);
351     qemu_savevm_state_begin(fb, &s->params);
352     qemu_mutex_lock_iothread();
353     qemu_savevm_state_complete_precopy(fb, false);
354     qemu_mutex_unlock_iothread();
355 
356     qemu_fflush(fb);
357 
358     colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err);
359     if (local_err) {
360         goto out;
361     }
362     /*
363      * We need the size of the VMstate data in Secondary side,
364      * With which we can decide how much data should be read.
365      */
366     colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE,
367                             bioc->usage, &local_err);
368     if (local_err) {
369         goto out;
370     }
371 
372     qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage);
373     qemu_fflush(s->to_dst_file);
374     ret = qemu_file_get_error(s->to_dst_file);
375     if (ret < 0) {
376         goto out;
377     }
378 
379     colo_receive_check_message(s->rp_state.from_dst_file,
380                        COLO_MESSAGE_VMSTATE_RECEIVED, &local_err);
381     if (local_err) {
382         goto out;
383     }
384 
385     colo_receive_check_message(s->rp_state.from_dst_file,
386                        COLO_MESSAGE_VMSTATE_LOADED, &local_err);
387     if (local_err) {
388         goto out;
389     }
390 
391     ret = 0;
392 
393     qemu_mutex_lock_iothread();
394     vm_start();
395     qemu_mutex_unlock_iothread();
396     trace_colo_vm_state_change("stop", "run");
397 
398 out:
399     if (local_err) {
400         error_report_err(local_err);
401     }
402     return ret;
403 }
404 
405 static void colo_process_checkpoint(MigrationState *s)
406 {
407     QIOChannelBuffer *bioc;
408     QEMUFile *fb = NULL;
409     int64_t current_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
410     Error *local_err = NULL;
411     int ret;
412 
413     failover_init_state();
414 
415     s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file);
416     if (!s->rp_state.from_dst_file) {
417         error_report("Open QEMUFile from_dst_file failed");
418         goto out;
419     }
420 
421     /*
422      * Wait for Secondary finish loading VM states and enter COLO
423      * restore.
424      */
425     colo_receive_check_message(s->rp_state.from_dst_file,
426                        COLO_MESSAGE_CHECKPOINT_READY, &local_err);
427     if (local_err) {
428         goto out;
429     }
430     bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
431     fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc));
432     object_unref(OBJECT(bioc));
433 
434     qemu_mutex_lock_iothread();
435     vm_start();
436     qemu_mutex_unlock_iothread();
437     trace_colo_vm_state_change("stop", "run");
438 
439     timer_mod(s->colo_delay_timer,
440             current_time + s->parameters.x_checkpoint_delay);
441 
442     while (s->state == MIGRATION_STATUS_COLO) {
443         if (failover_get_state() != FAILOVER_STATUS_NONE) {
444             error_report("failover request");
445             goto out;
446         }
447 
448         qemu_sem_wait(&s->colo_checkpoint_sem);
449 
450         ret = colo_do_checkpoint_transaction(s, bioc, fb);
451         if (ret < 0) {
452             goto out;
453         }
454     }
455 
456 out:
457     /* Throw the unreported error message after exited from loop */
458     if (local_err) {
459         error_report_err(local_err);
460     }
461 
462     if (fb) {
463         qemu_fclose(fb);
464     }
465 
466     timer_del(s->colo_delay_timer);
467 
468     /* Hope this not to be too long to wait here */
469     qemu_sem_wait(&s->colo_exit_sem);
470     qemu_sem_destroy(&s->colo_exit_sem);
471     /*
472      * Must be called after failover BH is completed,
473      * Or the failover BH may shutdown the wrong fd that
474      * re-used by other threads after we release here.
475      */
476     if (s->rp_state.from_dst_file) {
477         qemu_fclose(s->rp_state.from_dst_file);
478     }
479 }
480 
481 void colo_checkpoint_notify(void *opaque)
482 {
483     MigrationState *s = opaque;
484     int64_t next_notify_time;
485 
486     qemu_sem_post(&s->colo_checkpoint_sem);
487     s->colo_checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
488     next_notify_time = s->colo_checkpoint_time +
489                     s->parameters.x_checkpoint_delay;
490     timer_mod(s->colo_delay_timer, next_notify_time);
491 }
492 
493 void migrate_start_colo_process(MigrationState *s)
494 {
495     qemu_mutex_unlock_iothread();
496     qemu_sem_init(&s->colo_checkpoint_sem, 0);
497     s->colo_delay_timer =  timer_new_ms(QEMU_CLOCK_HOST,
498                                 colo_checkpoint_notify, s);
499 
500     qemu_sem_init(&s->colo_exit_sem, 0);
501     migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
502                       MIGRATION_STATUS_COLO);
503     colo_process_checkpoint(s);
504     qemu_mutex_lock_iothread();
505 }
506 
507 static void colo_wait_handle_message(QEMUFile *f, int *checkpoint_request,
508                                      Error **errp)
509 {
510     COLOMessage msg;
511     Error *local_err = NULL;
512 
513     msg = colo_receive_message(f, &local_err);
514     if (local_err) {
515         error_propagate(errp, local_err);
516         return;
517     }
518 
519     switch (msg) {
520     case COLO_MESSAGE_CHECKPOINT_REQUEST:
521         *checkpoint_request = 1;
522         break;
523     default:
524         *checkpoint_request = 0;
525         error_setg(errp, "Got unknown COLO message: %d", msg);
526         break;
527     }
528 }
529 
530 void *colo_process_incoming_thread(void *opaque)
531 {
532     MigrationIncomingState *mis = opaque;
533     QEMUFile *fb = NULL;
534     QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */
535     uint64_t total_size;
536     uint64_t value;
537     Error *local_err = NULL;
538 
539     qemu_sem_init(&mis->colo_incoming_sem, 0);
540 
541     migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
542                       MIGRATION_STATUS_COLO);
543 
544     failover_init_state();
545 
546     mis->to_src_file = qemu_file_get_return_path(mis->from_src_file);
547     if (!mis->to_src_file) {
548         error_report("COLO incoming thread: Open QEMUFile to_src_file failed");
549         goto out;
550     }
551     /*
552      * Note: the communication between Primary side and Secondary side
553      * should be sequential, we set the fd to unblocked in migration incoming
554      * coroutine, and here we are in the COLO incoming thread, so it is ok to
555      * set the fd back to blocked.
556      */
557     qemu_file_set_blocking(mis->from_src_file, true);
558 
559     bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
560     fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc));
561     object_unref(OBJECT(bioc));
562 
563     colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY,
564                       &local_err);
565     if (local_err) {
566         goto out;
567     }
568 
569     while (mis->state == MIGRATION_STATUS_COLO) {
570         int request = 0;
571 
572         colo_wait_handle_message(mis->from_src_file, &request, &local_err);
573         if (local_err) {
574             goto out;
575         }
576         assert(request);
577         if (failover_get_state() != FAILOVER_STATUS_NONE) {
578             error_report("failover request");
579             goto out;
580         }
581 
582         /* FIXME: This is unnecessary for periodic checkpoint mode */
583         colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
584                      &local_err);
585         if (local_err) {
586             goto out;
587         }
588 
589         colo_receive_check_message(mis->from_src_file,
590                            COLO_MESSAGE_VMSTATE_SEND, &local_err);
591         if (local_err) {
592             goto out;
593         }
594 
595         value = colo_receive_message_value(mis->from_src_file,
596                                  COLO_MESSAGE_VMSTATE_SIZE, &local_err);
597         if (local_err) {
598             goto out;
599         }
600 
601         /*
602          * Read VM device state data into channel buffer,
603          * It's better to re-use the memory allocated.
604          * Here we need to handle the channel buffer directly.
605          */
606         if (value > bioc->capacity) {
607             bioc->capacity = value;
608             bioc->data = g_realloc(bioc->data, bioc->capacity);
609         }
610         total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
611         if (total_size != value) {
612             error_report("Got %" PRIu64 " VMState data, less than expected"
613                         " %" PRIu64, total_size, value);
614             goto out;
615         }
616         bioc->usage = total_size;
617         qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
618 
619         colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
620                      &local_err);
621         if (local_err) {
622             goto out;
623         }
624 
625         qemu_mutex_lock_iothread();
626         qemu_system_reset(VMRESET_SILENT);
627         vmstate_loading = true;
628         if (qemu_loadvm_state(fb) < 0) {
629             error_report("COLO: loadvm failed");
630             qemu_mutex_unlock_iothread();
631             goto out;
632         }
633 
634         vmstate_loading = false;
635         qemu_mutex_unlock_iothread();
636 
637         if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
638             failover_set_state(FAILOVER_STATUS_RELAUNCH,
639                             FAILOVER_STATUS_NONE);
640             failover_request_active(NULL);
641             goto out;
642         }
643 
644         colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
645                      &local_err);
646         if (local_err) {
647             goto out;
648         }
649     }
650 
651 out:
652     vmstate_loading = false;
653     /* Throw the unreported error message after exited from loop */
654     if (local_err) {
655         error_report_err(local_err);
656     }
657 
658     if (fb) {
659         qemu_fclose(fb);
660     }
661 
662     /* Hope this not to be too long to loop here */
663     qemu_sem_wait(&mis->colo_incoming_sem);
664     qemu_sem_destroy(&mis->colo_incoming_sem);
665     /* Must be called after failover BH is completed */
666     if (mis->to_src_file) {
667         qemu_fclose(mis->to_src_file);
668     }
669     migration_incoming_exit_colo();
670 
671     return NULL;
672 }
673