xref: /openbmc/qemu/migration/colo.c (revision c2b38b27)
1 /*
2  * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
3  * (a.k.a. Fault Tolerance or Continuous Replication)
4  *
5  * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
6  * Copyright (c) 2016 FUJITSU LIMITED
7  * Copyright (c) 2016 Intel Corporation
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2 or
10  * later.  See the COPYING file in the top-level directory.
11  */
12 
13 #include "qemu/osdep.h"
14 #include "qemu/timer.h"
15 #include "sysemu/sysemu.h"
16 #include "migration/colo.h"
17 #include "io/channel-buffer.h"
18 #include "trace.h"
19 #include "qemu/error-report.h"
20 #include "qapi/error.h"
21 #include "migration/failover.h"
22 
/*
 * True while the COLO incoming thread is inside qemu_loadvm_state().
 * secondary_vm_do_failover() consults it so that a failover request arriving
 * in that window is deferred (FAILOVER_STATUS_RELAUNCH) instead of being
 * carried out immediately.
 */
static bool vmstate_loading;

/* Initial capacity, in bytes, of the channel buffer caching the VM state (4 MiB) */
#define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024)
26 
/*
 * Report whether COLO is available.
 *
 * Returns: always true.
 */
bool colo_supported(void)
{
    const bool supported = true;

    return supported;
}
31 
32 bool migration_in_colo_state(void)
33 {
34     MigrationState *s = migrate_get_current();
35 
36     return (s->state == MIGRATION_STATUS_COLO);
37 }
38 
39 bool migration_incoming_in_colo_state(void)
40 {
41     MigrationIncomingState *mis = migration_incoming_get_current();
42 
43     return mis && (mis->state == MIGRATION_STATUS_COLO);
44 }
45 
46 static bool colo_runstate_is_stopped(void)
47 {
48     return runstate_check(RUN_STATE_COLO) || !runstate_is_running();
49 }
50 
51 static void secondary_vm_do_failover(void)
52 {
53     int old_state;
54     MigrationIncomingState *mis = migration_incoming_get_current();
55 
56     /* Can not do failover during the process of VM's loading VMstate, Or
57      * it will break the secondary VM.
58      */
59     if (vmstate_loading) {
60         old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
61                         FAILOVER_STATUS_RELAUNCH);
62         if (old_state != FAILOVER_STATUS_ACTIVE) {
63             error_report("Unknown error while do failover for secondary VM,"
64                          "old_state: %s", FailoverStatus_lookup[old_state]);
65         }
66         return;
67     }
68 
69     migrate_set_state(&mis->state, MIGRATION_STATUS_COLO,
70                       MIGRATION_STATUS_COMPLETED);
71 
72     if (!autostart) {
73         error_report("\"-S\" qemu option will be ignored in secondary side");
74         /* recover runstate to normal migration finish state */
75         autostart = true;
76     }
77     /*
78      * Make sure COLO incoming thread not block in recv or send,
79      * If mis->from_src_file and mis->to_src_file use the same fd,
80      * The second shutdown() will return -1, we ignore this value,
81      * It is harmless.
82      */
83     if (mis->from_src_file) {
84         qemu_file_shutdown(mis->from_src_file);
85     }
86     if (mis->to_src_file) {
87         qemu_file_shutdown(mis->to_src_file);
88     }
89 
90     old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
91                                    FAILOVER_STATUS_COMPLETED);
92     if (old_state != FAILOVER_STATUS_ACTIVE) {
93         error_report("Incorrect state (%s) while doing failover for "
94                      "secondary VM", FailoverStatus_lookup[old_state]);
95         return;
96     }
97     /* Notify COLO incoming thread that failover work is finished */
98     qemu_sem_post(&mis->colo_incoming_sem);
99     /* For Secondary VM, jump to incoming co */
100     if (mis->migration_incoming_co) {
101         qemu_coroutine_enter(mis->migration_incoming_co);
102     }
103 }
104 
105 static void primary_vm_do_failover(void)
106 {
107     MigrationState *s = migrate_get_current();
108     int old_state;
109 
110     migrate_set_state(&s->state, MIGRATION_STATUS_COLO,
111                       MIGRATION_STATUS_COMPLETED);
112 
113     /*
114      * Wake up COLO thread which may blocked in recv() or send(),
115      * The s->rp_state.from_dst_file and s->to_dst_file may use the
116      * same fd, but we still shutdown the fd for twice, it is harmless.
117      */
118     if (s->to_dst_file) {
119         qemu_file_shutdown(s->to_dst_file);
120     }
121     if (s->rp_state.from_dst_file) {
122         qemu_file_shutdown(s->rp_state.from_dst_file);
123     }
124 
125     old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
126                                    FAILOVER_STATUS_COMPLETED);
127     if (old_state != FAILOVER_STATUS_ACTIVE) {
128         error_report("Incorrect state (%s) while doing failover for Primary VM",
129                      FailoverStatus_lookup[old_state]);
130         return;
131     }
132     /* Notify COLO thread that failover work is finished */
133     qemu_sem_post(&s->colo_exit_sem);
134 }
135 
136 void colo_do_failover(MigrationState *s)
137 {
138     /* Make sure VM stopped while failover happened. */
139     if (!colo_runstate_is_stopped()) {
140         vm_stop_force_state(RUN_STATE_COLO);
141     }
142 
143     if (get_colo_mode() == COLO_MODE_PRIMARY) {
144         primary_vm_do_failover();
145     } else {
146         secondary_vm_do_failover();
147     }
148 }
149 
150 static void colo_send_message(QEMUFile *f, COLOMessage msg,
151                               Error **errp)
152 {
153     int ret;
154 
155     if (msg >= COLO_MESSAGE__MAX) {
156         error_setg(errp, "%s: Invalid message", __func__);
157         return;
158     }
159     qemu_put_be32(f, msg);
160     qemu_fflush(f);
161 
162     ret = qemu_file_get_error(f);
163     if (ret < 0) {
164         error_setg_errno(errp, -ret, "Can't send COLO message");
165     }
166     trace_colo_send_message(COLOMessage_lookup[msg]);
167 }
168 
169 static void colo_send_message_value(QEMUFile *f, COLOMessage msg,
170                                     uint64_t value, Error **errp)
171 {
172     Error *local_err = NULL;
173     int ret;
174 
175     colo_send_message(f, msg, &local_err);
176     if (local_err) {
177         error_propagate(errp, local_err);
178         return;
179     }
180     qemu_put_be64(f, value);
181     qemu_fflush(f);
182 
183     ret = qemu_file_get_error(f);
184     if (ret < 0) {
185         error_setg_errno(errp, -ret, "Failed to send value for message:%s",
186                          COLOMessage_lookup[msg]);
187     }
188 }
189 
190 static COLOMessage colo_receive_message(QEMUFile *f, Error **errp)
191 {
192     COLOMessage msg;
193     int ret;
194 
195     msg = qemu_get_be32(f);
196     ret = qemu_file_get_error(f);
197     if (ret < 0) {
198         error_setg_errno(errp, -ret, "Can't receive COLO message");
199         return msg;
200     }
201     if (msg >= COLO_MESSAGE__MAX) {
202         error_setg(errp, "%s: Invalid message", __func__);
203         return msg;
204     }
205     trace_colo_receive_message(COLOMessage_lookup[msg]);
206     return msg;
207 }
208 
209 static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg,
210                                        Error **errp)
211 {
212     COLOMessage msg;
213     Error *local_err = NULL;
214 
215     msg = colo_receive_message(f, &local_err);
216     if (local_err) {
217         error_propagate(errp, local_err);
218         return;
219     }
220     if (msg != expect_msg) {
221         error_setg(errp, "Unexpected COLO message %d, expected %d",
222                           msg, expect_msg);
223     }
224 }
225 
226 static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg,
227                                            Error **errp)
228 {
229     Error *local_err = NULL;
230     uint64_t value;
231     int ret;
232 
233     colo_receive_check_message(f, expect_msg, &local_err);
234     if (local_err) {
235         error_propagate(errp, local_err);
236         return 0;
237     }
238 
239     value = qemu_get_be64(f);
240     ret = qemu_file_get_error(f);
241     if (ret < 0) {
242         error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s",
243                          COLOMessage_lookup[expect_msg]);
244     }
245     return value;
246 }
247 
248 static int colo_do_checkpoint_transaction(MigrationState *s,
249                                           QIOChannelBuffer *bioc,
250                                           QEMUFile *fb)
251 {
252     Error *local_err = NULL;
253     int ret = -1;
254 
255     colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST,
256                       &local_err);
257     if (local_err) {
258         goto out;
259     }
260 
261     colo_receive_check_message(s->rp_state.from_dst_file,
262                     COLO_MESSAGE_CHECKPOINT_REPLY, &local_err);
263     if (local_err) {
264         goto out;
265     }
266     /* Reset channel-buffer directly */
267     qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
268     bioc->usage = 0;
269 
270     qemu_mutex_lock_iothread();
271     if (failover_get_state() != FAILOVER_STATUS_NONE) {
272         qemu_mutex_unlock_iothread();
273         goto out;
274     }
275     vm_stop_force_state(RUN_STATE_COLO);
276     qemu_mutex_unlock_iothread();
277     trace_colo_vm_state_change("run", "stop");
278     /*
279      * Failover request bh could be called after vm_stop_force_state(),
280      * So we need check failover_request_is_active() again.
281      */
282     if (failover_get_state() != FAILOVER_STATUS_NONE) {
283         goto out;
284     }
285 
286     /* Disable block migration */
287     s->params.blk = 0;
288     s->params.shared = 0;
289     qemu_savevm_state_header(fb);
290     qemu_savevm_state_begin(fb, &s->params);
291     qemu_mutex_lock_iothread();
292     qemu_savevm_state_complete_precopy(fb, false);
293     qemu_mutex_unlock_iothread();
294 
295     qemu_fflush(fb);
296 
297     colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err);
298     if (local_err) {
299         goto out;
300     }
301     /*
302      * We need the size of the VMstate data in Secondary side,
303      * With which we can decide how much data should be read.
304      */
305     colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE,
306                             bioc->usage, &local_err);
307     if (local_err) {
308         goto out;
309     }
310 
311     qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage);
312     qemu_fflush(s->to_dst_file);
313     ret = qemu_file_get_error(s->to_dst_file);
314     if (ret < 0) {
315         goto out;
316     }
317 
318     colo_receive_check_message(s->rp_state.from_dst_file,
319                        COLO_MESSAGE_VMSTATE_RECEIVED, &local_err);
320     if (local_err) {
321         goto out;
322     }
323 
324     colo_receive_check_message(s->rp_state.from_dst_file,
325                        COLO_MESSAGE_VMSTATE_LOADED, &local_err);
326     if (local_err) {
327         goto out;
328     }
329 
330     ret = 0;
331 
332     qemu_mutex_lock_iothread();
333     vm_start();
334     qemu_mutex_unlock_iothread();
335     trace_colo_vm_state_change("stop", "run");
336 
337 out:
338     if (local_err) {
339         error_report_err(local_err);
340     }
341     return ret;
342 }
343 
344 static void colo_process_checkpoint(MigrationState *s)
345 {
346     QIOChannelBuffer *bioc;
347     QEMUFile *fb = NULL;
348     int64_t current_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
349     Error *local_err = NULL;
350     int ret;
351 
352     failover_init_state();
353 
354     s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file);
355     if (!s->rp_state.from_dst_file) {
356         error_report("Open QEMUFile from_dst_file failed");
357         goto out;
358     }
359 
360     /*
361      * Wait for Secondary finish loading VM states and enter COLO
362      * restore.
363      */
364     colo_receive_check_message(s->rp_state.from_dst_file,
365                        COLO_MESSAGE_CHECKPOINT_READY, &local_err);
366     if (local_err) {
367         goto out;
368     }
369     bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
370     fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc));
371     object_unref(OBJECT(bioc));
372 
373     qemu_mutex_lock_iothread();
374     vm_start();
375     qemu_mutex_unlock_iothread();
376     trace_colo_vm_state_change("stop", "run");
377 
378     timer_mod(s->colo_delay_timer,
379             current_time + s->parameters.x_checkpoint_delay);
380 
381     while (s->state == MIGRATION_STATUS_COLO) {
382         if (failover_get_state() != FAILOVER_STATUS_NONE) {
383             error_report("failover request");
384             goto out;
385         }
386 
387         qemu_sem_wait(&s->colo_checkpoint_sem);
388 
389         ret = colo_do_checkpoint_transaction(s, bioc, fb);
390         if (ret < 0) {
391             goto out;
392         }
393     }
394 
395 out:
396     /* Throw the unreported error message after exited from loop */
397     if (local_err) {
398         error_report_err(local_err);
399     }
400 
401     if (fb) {
402         qemu_fclose(fb);
403     }
404 
405     timer_del(s->colo_delay_timer);
406 
407     /* Hope this not to be too long to wait here */
408     qemu_sem_wait(&s->colo_exit_sem);
409     qemu_sem_destroy(&s->colo_exit_sem);
410     /*
411      * Must be called after failover BH is completed,
412      * Or the failover BH may shutdown the wrong fd that
413      * re-used by other threads after we release here.
414      */
415     if (s->rp_state.from_dst_file) {
416         qemu_fclose(s->rp_state.from_dst_file);
417     }
418 }
419 
420 void colo_checkpoint_notify(void *opaque)
421 {
422     MigrationState *s = opaque;
423     int64_t next_notify_time;
424 
425     qemu_sem_post(&s->colo_checkpoint_sem);
426     s->colo_checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
427     next_notify_time = s->colo_checkpoint_time +
428                     s->parameters.x_checkpoint_delay;
429     timer_mod(s->colo_delay_timer, next_notify_time);
430 }
431 
432 void migrate_start_colo_process(MigrationState *s)
433 {
434     qemu_mutex_unlock_iothread();
435     qemu_sem_init(&s->colo_checkpoint_sem, 0);
436     s->colo_delay_timer =  timer_new_ms(QEMU_CLOCK_HOST,
437                                 colo_checkpoint_notify, s);
438 
439     qemu_sem_init(&s->colo_exit_sem, 0);
440     migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
441                       MIGRATION_STATUS_COLO);
442     colo_process_checkpoint(s);
443     qemu_mutex_lock_iothread();
444 }
445 
446 static void colo_wait_handle_message(QEMUFile *f, int *checkpoint_request,
447                                      Error **errp)
448 {
449     COLOMessage msg;
450     Error *local_err = NULL;
451 
452     msg = colo_receive_message(f, &local_err);
453     if (local_err) {
454         error_propagate(errp, local_err);
455         return;
456     }
457 
458     switch (msg) {
459     case COLO_MESSAGE_CHECKPOINT_REQUEST:
460         *checkpoint_request = 1;
461         break;
462     default:
463         *checkpoint_request = 0;
464         error_setg(errp, "Got unknown COLO message: %d", msg);
465         break;
466     }
467 }
468 
469 void *colo_process_incoming_thread(void *opaque)
470 {
471     MigrationIncomingState *mis = opaque;
472     QEMUFile *fb = NULL;
473     QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */
474     uint64_t total_size;
475     uint64_t value;
476     Error *local_err = NULL;
477 
478     qemu_sem_init(&mis->colo_incoming_sem, 0);
479 
480     migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
481                       MIGRATION_STATUS_COLO);
482 
483     failover_init_state();
484 
485     mis->to_src_file = qemu_file_get_return_path(mis->from_src_file);
486     if (!mis->to_src_file) {
487         error_report("COLO incoming thread: Open QEMUFile to_src_file failed");
488         goto out;
489     }
490     /*
491      * Note: the communication between Primary side and Secondary side
492      * should be sequential, we set the fd to unblocked in migration incoming
493      * coroutine, and here we are in the COLO incoming thread, so it is ok to
494      * set the fd back to blocked.
495      */
496     qemu_file_set_blocking(mis->from_src_file, true);
497 
498     bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
499     fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc));
500     object_unref(OBJECT(bioc));
501 
502     colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY,
503                       &local_err);
504     if (local_err) {
505         goto out;
506     }
507 
508     while (mis->state == MIGRATION_STATUS_COLO) {
509         int request = 0;
510 
511         colo_wait_handle_message(mis->from_src_file, &request, &local_err);
512         if (local_err) {
513             goto out;
514         }
515         assert(request);
516         if (failover_get_state() != FAILOVER_STATUS_NONE) {
517             error_report("failover request");
518             goto out;
519         }
520 
521         /* FIXME: This is unnecessary for periodic checkpoint mode */
522         colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
523                      &local_err);
524         if (local_err) {
525             goto out;
526         }
527 
528         colo_receive_check_message(mis->from_src_file,
529                            COLO_MESSAGE_VMSTATE_SEND, &local_err);
530         if (local_err) {
531             goto out;
532         }
533 
534         value = colo_receive_message_value(mis->from_src_file,
535                                  COLO_MESSAGE_VMSTATE_SIZE, &local_err);
536         if (local_err) {
537             goto out;
538         }
539 
540         /*
541          * Read VM device state data into channel buffer,
542          * It's better to re-use the memory allocated.
543          * Here we need to handle the channel buffer directly.
544          */
545         if (value > bioc->capacity) {
546             bioc->capacity = value;
547             bioc->data = g_realloc(bioc->data, bioc->capacity);
548         }
549         total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
550         if (total_size != value) {
551             error_report("Got %" PRIu64 " VMState data, less than expected"
552                         " %" PRIu64, total_size, value);
553             goto out;
554         }
555         bioc->usage = total_size;
556         qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
557 
558         colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
559                      &local_err);
560         if (local_err) {
561             goto out;
562         }
563 
564         qemu_mutex_lock_iothread();
565         qemu_system_reset(VMRESET_SILENT);
566         vmstate_loading = true;
567         if (qemu_loadvm_state(fb) < 0) {
568             error_report("COLO: loadvm failed");
569             qemu_mutex_unlock_iothread();
570             goto out;
571         }
572 
573         vmstate_loading = false;
574         qemu_mutex_unlock_iothread();
575 
576         if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
577             failover_set_state(FAILOVER_STATUS_RELAUNCH,
578                             FAILOVER_STATUS_NONE);
579             failover_request_active(NULL);
580             goto out;
581         }
582 
583         colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
584                      &local_err);
585         if (local_err) {
586             goto out;
587         }
588     }
589 
590 out:
591     vmstate_loading = false;
592     /* Throw the unreported error message after exited from loop */
593     if (local_err) {
594         error_report_err(local_err);
595     }
596 
597     if (fb) {
598         qemu_fclose(fb);
599     }
600 
601     /* Hope this not to be too long to loop here */
602     qemu_sem_wait(&mis->colo_incoming_sem);
603     qemu_sem_destroy(&mis->colo_incoming_sem);
604     /* Must be called after failover BH is completed */
605     if (mis->to_src_file) {
606         qemu_fclose(mis->to_src_file);
607     }
608     migration_incoming_exit_colo();
609 
610     return NULL;
611 }
612