xref: /openbmc/qemu/migration/colo.c (revision 7609ffb9)
1 /*
2  * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
3  * (a.k.a. Fault Tolerance or Continuous Replication)
4  *
5  * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
6  * Copyright (c) 2016 FUJITSU LIMITED
7  * Copyright (c) 2016 Intel Corporation
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2 or
10  * later.  See the COPYING file in the top-level directory.
11  */
12 
13 #include "qemu/osdep.h"
14 #include "qemu/timer.h"
15 #include "sysemu/sysemu.h"
16 #include "migration/colo.h"
17 #include "io/channel-buffer.h"
18 #include "trace.h"
19 #include "qemu/error-report.h"
20 #include "qapi/error.h"
21 #include "migration/failover.h"
22 #include "replication.h"
23 #include "qmp-commands.h"
24 
25 static bool vmstate_loading;
26 
27 #define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024)
28 
29 bool colo_supported(void)
30 {
31     return true;
32 }
33 
34 bool migration_in_colo_state(void)
35 {
36     MigrationState *s = migrate_get_current();
37 
38     return (s->state == MIGRATION_STATUS_COLO);
39 }
40 
41 bool migration_incoming_in_colo_state(void)
42 {
43     MigrationIncomingState *mis = migration_incoming_get_current();
44 
45     return mis && (mis->state == MIGRATION_STATUS_COLO);
46 }
47 
48 static bool colo_runstate_is_stopped(void)
49 {
50     return runstate_check(RUN_STATE_COLO) || !runstate_is_running();
51 }
52 
53 static void secondary_vm_do_failover(void)
54 {
55     int old_state;
56     MigrationIncomingState *mis = migration_incoming_get_current();
57 
58     /* Can not do failover during the process of VM's loading VMstate, Or
59      * it will break the secondary VM.
60      */
61     if (vmstate_loading) {
62         old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
63                         FAILOVER_STATUS_RELAUNCH);
64         if (old_state != FAILOVER_STATUS_ACTIVE) {
65             error_report("Unknown error while do failover for secondary VM,"
66                          "old_state: %s", FailoverStatus_lookup[old_state]);
67         }
68         return;
69     }
70 
71     migrate_set_state(&mis->state, MIGRATION_STATUS_COLO,
72                       MIGRATION_STATUS_COMPLETED);
73 
74     if (!autostart) {
75         error_report("\"-S\" qemu option will be ignored in secondary side");
76         /* recover runstate to normal migration finish state */
77         autostart = true;
78     }
79     /*
80      * Make sure COLO incoming thread not block in recv or send,
81      * If mis->from_src_file and mis->to_src_file use the same fd,
82      * The second shutdown() will return -1, we ignore this value,
83      * It is harmless.
84      */
85     if (mis->from_src_file) {
86         qemu_file_shutdown(mis->from_src_file);
87     }
88     if (mis->to_src_file) {
89         qemu_file_shutdown(mis->to_src_file);
90     }
91 
92     old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
93                                    FAILOVER_STATUS_COMPLETED);
94     if (old_state != FAILOVER_STATUS_ACTIVE) {
95         error_report("Incorrect state (%s) while doing failover for "
96                      "secondary VM", FailoverStatus_lookup[old_state]);
97         return;
98     }
99     /* Notify COLO incoming thread that failover work is finished */
100     qemu_sem_post(&mis->colo_incoming_sem);
101     /* For Secondary VM, jump to incoming co */
102     if (mis->migration_incoming_co) {
103         qemu_coroutine_enter(mis->migration_incoming_co);
104     }
105 }
106 
107 static void primary_vm_do_failover(void)
108 {
109     MigrationState *s = migrate_get_current();
110     int old_state;
111 
112     migrate_set_state(&s->state, MIGRATION_STATUS_COLO,
113                       MIGRATION_STATUS_COMPLETED);
114 
115     /*
116      * Wake up COLO thread which may blocked in recv() or send(),
117      * The s->rp_state.from_dst_file and s->to_dst_file may use the
118      * same fd, but we still shutdown the fd for twice, it is harmless.
119      */
120     if (s->to_dst_file) {
121         qemu_file_shutdown(s->to_dst_file);
122     }
123     if (s->rp_state.from_dst_file) {
124         qemu_file_shutdown(s->rp_state.from_dst_file);
125     }
126 
127     old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
128                                    FAILOVER_STATUS_COMPLETED);
129     if (old_state != FAILOVER_STATUS_ACTIVE) {
130         error_report("Incorrect state (%s) while doing failover for Primary VM",
131                      FailoverStatus_lookup[old_state]);
132         return;
133     }
134     /* Notify COLO thread that failover work is finished */
135     qemu_sem_post(&s->colo_exit_sem);
136 }
137 
138 void colo_do_failover(MigrationState *s)
139 {
140     /* Make sure VM stopped while failover happened. */
141     if (!colo_runstate_is_stopped()) {
142         vm_stop_force_state(RUN_STATE_COLO);
143     }
144 
145     if (get_colo_mode() == COLO_MODE_PRIMARY) {
146         primary_vm_do_failover();
147     } else {
148         secondary_vm_do_failover();
149     }
150 }
151 
152 void qmp_xen_set_replication(bool enable, bool primary,
153                              bool has_failover, bool failover,
154                              Error **errp)
155 {
156     ReplicationMode mode = primary ?
157                            REPLICATION_MODE_PRIMARY :
158                            REPLICATION_MODE_SECONDARY;
159 
160     if (has_failover && enable) {
161         error_setg(errp, "Parameter 'failover' is only for"
162                    " stopping replication");
163         return;
164     }
165 
166     if (enable) {
167         replication_start_all(mode, errp);
168     } else {
169         if (!has_failover) {
170             failover = NULL;
171         }
172         replication_stop_all(failover, failover ? NULL : errp);
173     }
174 }
175 
176 ReplicationStatus *qmp_query_xen_replication_status(Error **errp)
177 {
178     Error *err = NULL;
179     ReplicationStatus *s = g_new0(ReplicationStatus, 1);
180 
181     replication_get_error_all(&err);
182     if (err) {
183         s->error = true;
184         s->has_desc = true;
185         s->desc = g_strdup(error_get_pretty(err));
186     } else {
187         s->error = false;
188     }
189 
190     error_free(err);
191     return s;
192 }
193 
194 void qmp_xen_colo_do_checkpoint(Error **errp)
195 {
196     replication_do_checkpoint_all(errp);
197 }
198 
199 static void colo_send_message(QEMUFile *f, COLOMessage msg,
200                               Error **errp)
201 {
202     int ret;
203 
204     if (msg >= COLO_MESSAGE__MAX) {
205         error_setg(errp, "%s: Invalid message", __func__);
206         return;
207     }
208     qemu_put_be32(f, msg);
209     qemu_fflush(f);
210 
211     ret = qemu_file_get_error(f);
212     if (ret < 0) {
213         error_setg_errno(errp, -ret, "Can't send COLO message");
214     }
215     trace_colo_send_message(COLOMessage_lookup[msg]);
216 }
217 
218 static void colo_send_message_value(QEMUFile *f, COLOMessage msg,
219                                     uint64_t value, Error **errp)
220 {
221     Error *local_err = NULL;
222     int ret;
223 
224     colo_send_message(f, msg, &local_err);
225     if (local_err) {
226         error_propagate(errp, local_err);
227         return;
228     }
229     qemu_put_be64(f, value);
230     qemu_fflush(f);
231 
232     ret = qemu_file_get_error(f);
233     if (ret < 0) {
234         error_setg_errno(errp, -ret, "Failed to send value for message:%s",
235                          COLOMessage_lookup[msg]);
236     }
237 }
238 
239 static COLOMessage colo_receive_message(QEMUFile *f, Error **errp)
240 {
241     COLOMessage msg;
242     int ret;
243 
244     msg = qemu_get_be32(f);
245     ret = qemu_file_get_error(f);
246     if (ret < 0) {
247         error_setg_errno(errp, -ret, "Can't receive COLO message");
248         return msg;
249     }
250     if (msg >= COLO_MESSAGE__MAX) {
251         error_setg(errp, "%s: Invalid message", __func__);
252         return msg;
253     }
254     trace_colo_receive_message(COLOMessage_lookup[msg]);
255     return msg;
256 }
257 
258 static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg,
259                                        Error **errp)
260 {
261     COLOMessage msg;
262     Error *local_err = NULL;
263 
264     msg = colo_receive_message(f, &local_err);
265     if (local_err) {
266         error_propagate(errp, local_err);
267         return;
268     }
269     if (msg != expect_msg) {
270         error_setg(errp, "Unexpected COLO message %d, expected %d",
271                           msg, expect_msg);
272     }
273 }
274 
275 static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg,
276                                            Error **errp)
277 {
278     Error *local_err = NULL;
279     uint64_t value;
280     int ret;
281 
282     colo_receive_check_message(f, expect_msg, &local_err);
283     if (local_err) {
284         error_propagate(errp, local_err);
285         return 0;
286     }
287 
288     value = qemu_get_be64(f);
289     ret = qemu_file_get_error(f);
290     if (ret < 0) {
291         error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s",
292                          COLOMessage_lookup[expect_msg]);
293     }
294     return value;
295 }
296 
297 static int colo_do_checkpoint_transaction(MigrationState *s,
298                                           QIOChannelBuffer *bioc,
299                                           QEMUFile *fb)
300 {
301     Error *local_err = NULL;
302     int ret = -1;
303 
304     colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST,
305                       &local_err);
306     if (local_err) {
307         goto out;
308     }
309 
310     colo_receive_check_message(s->rp_state.from_dst_file,
311                     COLO_MESSAGE_CHECKPOINT_REPLY, &local_err);
312     if (local_err) {
313         goto out;
314     }
315     /* Reset channel-buffer directly */
316     qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
317     bioc->usage = 0;
318 
319     qemu_mutex_lock_iothread();
320     if (failover_get_state() != FAILOVER_STATUS_NONE) {
321         qemu_mutex_unlock_iothread();
322         goto out;
323     }
324     vm_stop_force_state(RUN_STATE_COLO);
325     qemu_mutex_unlock_iothread();
326     trace_colo_vm_state_change("run", "stop");
327     /*
328      * Failover request bh could be called after vm_stop_force_state(),
329      * So we need check failover_request_is_active() again.
330      */
331     if (failover_get_state() != FAILOVER_STATUS_NONE) {
332         goto out;
333     }
334 
335     /* Disable block migration */
336     s->params.blk = 0;
337     s->params.shared = 0;
338     qemu_savevm_state_header(fb);
339     qemu_savevm_state_begin(fb, &s->params);
340     qemu_mutex_lock_iothread();
341     qemu_savevm_state_complete_precopy(fb, false);
342     qemu_mutex_unlock_iothread();
343 
344     qemu_fflush(fb);
345 
346     colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err);
347     if (local_err) {
348         goto out;
349     }
350     /*
351      * We need the size of the VMstate data in Secondary side,
352      * With which we can decide how much data should be read.
353      */
354     colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE,
355                             bioc->usage, &local_err);
356     if (local_err) {
357         goto out;
358     }
359 
360     qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage);
361     qemu_fflush(s->to_dst_file);
362     ret = qemu_file_get_error(s->to_dst_file);
363     if (ret < 0) {
364         goto out;
365     }
366 
367     colo_receive_check_message(s->rp_state.from_dst_file,
368                        COLO_MESSAGE_VMSTATE_RECEIVED, &local_err);
369     if (local_err) {
370         goto out;
371     }
372 
373     colo_receive_check_message(s->rp_state.from_dst_file,
374                        COLO_MESSAGE_VMSTATE_LOADED, &local_err);
375     if (local_err) {
376         goto out;
377     }
378 
379     ret = 0;
380 
381     qemu_mutex_lock_iothread();
382     vm_start();
383     qemu_mutex_unlock_iothread();
384     trace_colo_vm_state_change("stop", "run");
385 
386 out:
387     if (local_err) {
388         error_report_err(local_err);
389     }
390     return ret;
391 }
392 
393 static void colo_process_checkpoint(MigrationState *s)
394 {
395     QIOChannelBuffer *bioc;
396     QEMUFile *fb = NULL;
397     int64_t current_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
398     Error *local_err = NULL;
399     int ret;
400 
401     failover_init_state();
402 
403     s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file);
404     if (!s->rp_state.from_dst_file) {
405         error_report("Open QEMUFile from_dst_file failed");
406         goto out;
407     }
408 
409     /*
410      * Wait for Secondary finish loading VM states and enter COLO
411      * restore.
412      */
413     colo_receive_check_message(s->rp_state.from_dst_file,
414                        COLO_MESSAGE_CHECKPOINT_READY, &local_err);
415     if (local_err) {
416         goto out;
417     }
418     bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
419     fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc));
420     object_unref(OBJECT(bioc));
421 
422     qemu_mutex_lock_iothread();
423     vm_start();
424     qemu_mutex_unlock_iothread();
425     trace_colo_vm_state_change("stop", "run");
426 
427     timer_mod(s->colo_delay_timer,
428             current_time + s->parameters.x_checkpoint_delay);
429 
430     while (s->state == MIGRATION_STATUS_COLO) {
431         if (failover_get_state() != FAILOVER_STATUS_NONE) {
432             error_report("failover request");
433             goto out;
434         }
435 
436         qemu_sem_wait(&s->colo_checkpoint_sem);
437 
438         ret = colo_do_checkpoint_transaction(s, bioc, fb);
439         if (ret < 0) {
440             goto out;
441         }
442     }
443 
444 out:
445     /* Throw the unreported error message after exited from loop */
446     if (local_err) {
447         error_report_err(local_err);
448     }
449 
450     if (fb) {
451         qemu_fclose(fb);
452     }
453 
454     timer_del(s->colo_delay_timer);
455 
456     /* Hope this not to be too long to wait here */
457     qemu_sem_wait(&s->colo_exit_sem);
458     qemu_sem_destroy(&s->colo_exit_sem);
459     /*
460      * Must be called after failover BH is completed,
461      * Or the failover BH may shutdown the wrong fd that
462      * re-used by other threads after we release here.
463      */
464     if (s->rp_state.from_dst_file) {
465         qemu_fclose(s->rp_state.from_dst_file);
466     }
467 }
468 
469 void colo_checkpoint_notify(void *opaque)
470 {
471     MigrationState *s = opaque;
472     int64_t next_notify_time;
473 
474     qemu_sem_post(&s->colo_checkpoint_sem);
475     s->colo_checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
476     next_notify_time = s->colo_checkpoint_time +
477                     s->parameters.x_checkpoint_delay;
478     timer_mod(s->colo_delay_timer, next_notify_time);
479 }
480 
481 void migrate_start_colo_process(MigrationState *s)
482 {
483     qemu_mutex_unlock_iothread();
484     qemu_sem_init(&s->colo_checkpoint_sem, 0);
485     s->colo_delay_timer =  timer_new_ms(QEMU_CLOCK_HOST,
486                                 colo_checkpoint_notify, s);
487 
488     qemu_sem_init(&s->colo_exit_sem, 0);
489     migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
490                       MIGRATION_STATUS_COLO);
491     colo_process_checkpoint(s);
492     qemu_mutex_lock_iothread();
493 }
494 
495 static void colo_wait_handle_message(QEMUFile *f, int *checkpoint_request,
496                                      Error **errp)
497 {
498     COLOMessage msg;
499     Error *local_err = NULL;
500 
501     msg = colo_receive_message(f, &local_err);
502     if (local_err) {
503         error_propagate(errp, local_err);
504         return;
505     }
506 
507     switch (msg) {
508     case COLO_MESSAGE_CHECKPOINT_REQUEST:
509         *checkpoint_request = 1;
510         break;
511     default:
512         *checkpoint_request = 0;
513         error_setg(errp, "Got unknown COLO message: %d", msg);
514         break;
515     }
516 }
517 
518 void *colo_process_incoming_thread(void *opaque)
519 {
520     MigrationIncomingState *mis = opaque;
521     QEMUFile *fb = NULL;
522     QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */
523     uint64_t total_size;
524     uint64_t value;
525     Error *local_err = NULL;
526 
527     qemu_sem_init(&mis->colo_incoming_sem, 0);
528 
529     migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
530                       MIGRATION_STATUS_COLO);
531 
532     failover_init_state();
533 
534     mis->to_src_file = qemu_file_get_return_path(mis->from_src_file);
535     if (!mis->to_src_file) {
536         error_report("COLO incoming thread: Open QEMUFile to_src_file failed");
537         goto out;
538     }
539     /*
540      * Note: the communication between Primary side and Secondary side
541      * should be sequential, we set the fd to unblocked in migration incoming
542      * coroutine, and here we are in the COLO incoming thread, so it is ok to
543      * set the fd back to blocked.
544      */
545     qemu_file_set_blocking(mis->from_src_file, true);
546 
547     bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
548     fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc));
549     object_unref(OBJECT(bioc));
550 
551     colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY,
552                       &local_err);
553     if (local_err) {
554         goto out;
555     }
556 
557     while (mis->state == MIGRATION_STATUS_COLO) {
558         int request = 0;
559 
560         colo_wait_handle_message(mis->from_src_file, &request, &local_err);
561         if (local_err) {
562             goto out;
563         }
564         assert(request);
565         if (failover_get_state() != FAILOVER_STATUS_NONE) {
566             error_report("failover request");
567             goto out;
568         }
569 
570         /* FIXME: This is unnecessary for periodic checkpoint mode */
571         colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
572                      &local_err);
573         if (local_err) {
574             goto out;
575         }
576 
577         colo_receive_check_message(mis->from_src_file,
578                            COLO_MESSAGE_VMSTATE_SEND, &local_err);
579         if (local_err) {
580             goto out;
581         }
582 
583         value = colo_receive_message_value(mis->from_src_file,
584                                  COLO_MESSAGE_VMSTATE_SIZE, &local_err);
585         if (local_err) {
586             goto out;
587         }
588 
589         /*
590          * Read VM device state data into channel buffer,
591          * It's better to re-use the memory allocated.
592          * Here we need to handle the channel buffer directly.
593          */
594         if (value > bioc->capacity) {
595             bioc->capacity = value;
596             bioc->data = g_realloc(bioc->data, bioc->capacity);
597         }
598         total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
599         if (total_size != value) {
600             error_report("Got %" PRIu64 " VMState data, less than expected"
601                         " %" PRIu64, total_size, value);
602             goto out;
603         }
604         bioc->usage = total_size;
605         qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
606 
607         colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
608                      &local_err);
609         if (local_err) {
610             goto out;
611         }
612 
613         qemu_mutex_lock_iothread();
614         qemu_system_reset(VMRESET_SILENT);
615         vmstate_loading = true;
616         if (qemu_loadvm_state(fb) < 0) {
617             error_report("COLO: loadvm failed");
618             qemu_mutex_unlock_iothread();
619             goto out;
620         }
621 
622         vmstate_loading = false;
623         qemu_mutex_unlock_iothread();
624 
625         if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
626             failover_set_state(FAILOVER_STATUS_RELAUNCH,
627                             FAILOVER_STATUS_NONE);
628             failover_request_active(NULL);
629             goto out;
630         }
631 
632         colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
633                      &local_err);
634         if (local_err) {
635             goto out;
636         }
637     }
638 
639 out:
640     vmstate_loading = false;
641     /* Throw the unreported error message after exited from loop */
642     if (local_err) {
643         error_report_err(local_err);
644     }
645 
646     if (fb) {
647         qemu_fclose(fb);
648     }
649 
650     /* Hope this not to be too long to loop here */
651     qemu_sem_wait(&mis->colo_incoming_sem);
652     qemu_sem_destroy(&mis->colo_incoming_sem);
653     /* Must be called after failover BH is completed */
654     if (mis->to_src_file) {
655         qemu_fclose(mis->to_src_file);
656     }
657     migration_incoming_exit_colo();
658 
659     return NULL;
660 }
661