xref: /openbmc/qemu/migration/colo.c (revision d89e666e)
1 /*
2  * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
3  * (a.k.a. Fault Tolerance or Continuous Replication)
4  *
5  * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
6  * Copyright (c) 2016 FUJITSU LIMITED
7  * Copyright (c) 2016 Intel Corporation
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2 or
10  * later.  See the COPYING file in the top-level directory.
11  */
12 
13 #include "qemu/osdep.h"
14 #include "qemu/timer.h"
15 #include "sysemu/sysemu.h"
16 #include "migration/colo.h"
17 #include "io/channel-buffer.h"
18 #include "trace.h"
19 #include "qemu/error-report.h"
20 #include "qapi/error.h"
21 #include "migration/failover.h"
22 
/*
 * Initial capacity, in bytes, handed to qio_channel_buffer_new() for the
 * channel buffers that hold a serialised checkpoint (4 MiB). The incoming
 * side enlarges its buffer when a checkpoint exceeds the current capacity.
 */
#define COLO_BUFFER_BASE_SIZE (4 << 20)
24 
/*
 * Tell callers whether COLO may be used.
 *
 * This build hard-wires the answer to "no".
 */
bool colo_supported(void)
{
    const bool supported = false;

    return supported;
}
29 
30 bool migration_in_colo_state(void)
31 {
32     MigrationState *s = migrate_get_current();
33 
34     return (s->state == MIGRATION_STATUS_COLO);
35 }
36 
37 bool migration_incoming_in_colo_state(void)
38 {
39     MigrationIncomingState *mis = migration_incoming_get_current();
40 
41     return mis && (mis->state == MIGRATION_STATUS_COLO);
42 }
43 
44 static void colo_send_message(QEMUFile *f, COLOMessage msg,
45                               Error **errp)
46 {
47     int ret;
48 
49     if (msg >= COLO_MESSAGE__MAX) {
50         error_setg(errp, "%s: Invalid message", __func__);
51         return;
52     }
53     qemu_put_be32(f, msg);
54     qemu_fflush(f);
55 
56     ret = qemu_file_get_error(f);
57     if (ret < 0) {
58         error_setg_errno(errp, -ret, "Can't send COLO message");
59     }
60     trace_colo_send_message(COLOMessage_lookup[msg]);
61 }
62 
63 static void colo_send_message_value(QEMUFile *f, COLOMessage msg,
64                                     uint64_t value, Error **errp)
65 {
66     Error *local_err = NULL;
67     int ret;
68 
69     colo_send_message(f, msg, &local_err);
70     if (local_err) {
71         error_propagate(errp, local_err);
72         return;
73     }
74     qemu_put_be64(f, value);
75     qemu_fflush(f);
76 
77     ret = qemu_file_get_error(f);
78     if (ret < 0) {
79         error_setg_errno(errp, -ret, "Failed to send value for message:%s",
80                          COLOMessage_lookup[msg]);
81     }
82 }
83 
84 static COLOMessage colo_receive_message(QEMUFile *f, Error **errp)
85 {
86     COLOMessage msg;
87     int ret;
88 
89     msg = qemu_get_be32(f);
90     ret = qemu_file_get_error(f);
91     if (ret < 0) {
92         error_setg_errno(errp, -ret, "Can't receive COLO message");
93         return msg;
94     }
95     if (msg >= COLO_MESSAGE__MAX) {
96         error_setg(errp, "%s: Invalid message", __func__);
97         return msg;
98     }
99     trace_colo_receive_message(COLOMessage_lookup[msg]);
100     return msg;
101 }
102 
103 static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg,
104                                        Error **errp)
105 {
106     COLOMessage msg;
107     Error *local_err = NULL;
108 
109     msg = colo_receive_message(f, &local_err);
110     if (local_err) {
111         error_propagate(errp, local_err);
112         return;
113     }
114     if (msg != expect_msg) {
115         error_setg(errp, "Unexpected COLO message %d, expected %d",
116                           msg, expect_msg);
117     }
118 }
119 
120 static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg,
121                                            Error **errp)
122 {
123     Error *local_err = NULL;
124     uint64_t value;
125     int ret;
126 
127     colo_receive_check_message(f, expect_msg, &local_err);
128     if (local_err) {
129         error_propagate(errp, local_err);
130         return 0;
131     }
132 
133     value = qemu_get_be64(f);
134     ret = qemu_file_get_error(f);
135     if (ret < 0) {
136         error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s",
137                          COLOMessage_lookup[expect_msg]);
138     }
139     return value;
140 }
141 
142 static int colo_do_checkpoint_transaction(MigrationState *s,
143                                           QIOChannelBuffer *bioc,
144                                           QEMUFile *fb)
145 {
146     Error *local_err = NULL;
147     int ret = -1;
148 
149     colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST,
150                       &local_err);
151     if (local_err) {
152         goto out;
153     }
154 
155     colo_receive_check_message(s->rp_state.from_dst_file,
156                     COLO_MESSAGE_CHECKPOINT_REPLY, &local_err);
157     if (local_err) {
158         goto out;
159     }
160     /* Reset channel-buffer directly */
161     qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
162     bioc->usage = 0;
163 
164     qemu_mutex_lock_iothread();
165     vm_stop_force_state(RUN_STATE_COLO);
166     qemu_mutex_unlock_iothread();
167     trace_colo_vm_state_change("run", "stop");
168 
169     /* Disable block migration */
170     s->params.blk = 0;
171     s->params.shared = 0;
172     qemu_savevm_state_header(fb);
173     qemu_savevm_state_begin(fb, &s->params);
174     qemu_mutex_lock_iothread();
175     qemu_savevm_state_complete_precopy(fb, false);
176     qemu_mutex_unlock_iothread();
177 
178     qemu_fflush(fb);
179 
180     colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err);
181     if (local_err) {
182         goto out;
183     }
184     /*
185      * We need the size of the VMstate data in Secondary side,
186      * With which we can decide how much data should be read.
187      */
188     colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE,
189                             bioc->usage, &local_err);
190     if (local_err) {
191         goto out;
192     }
193 
194     qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage);
195     qemu_fflush(s->to_dst_file);
196     ret = qemu_file_get_error(s->to_dst_file);
197     if (ret < 0) {
198         goto out;
199     }
200 
201     colo_receive_check_message(s->rp_state.from_dst_file,
202                        COLO_MESSAGE_VMSTATE_RECEIVED, &local_err);
203     if (local_err) {
204         goto out;
205     }
206 
207     colo_receive_check_message(s->rp_state.from_dst_file,
208                        COLO_MESSAGE_VMSTATE_LOADED, &local_err);
209     if (local_err) {
210         goto out;
211     }
212 
213     ret = 0;
214 
215     qemu_mutex_lock_iothread();
216     vm_start();
217     qemu_mutex_unlock_iothread();
218     trace_colo_vm_state_change("stop", "run");
219 
220 out:
221     if (local_err) {
222         error_report_err(local_err);
223     }
224     return ret;
225 }
226 
227 static void colo_process_checkpoint(MigrationState *s)
228 {
229     QIOChannelBuffer *bioc;
230     QEMUFile *fb = NULL;
231     int64_t current_time, checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
232     Error *local_err = NULL;
233     int ret;
234 
235     s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file);
236     if (!s->rp_state.from_dst_file) {
237         error_report("Open QEMUFile from_dst_file failed");
238         goto out;
239     }
240 
241     /*
242      * Wait for Secondary finish loading VM states and enter COLO
243      * restore.
244      */
245     colo_receive_check_message(s->rp_state.from_dst_file,
246                        COLO_MESSAGE_CHECKPOINT_READY, &local_err);
247     if (local_err) {
248         goto out;
249     }
250     bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
251     fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc));
252     object_unref(OBJECT(bioc));
253 
254     qemu_mutex_lock_iothread();
255     vm_start();
256     qemu_mutex_unlock_iothread();
257     trace_colo_vm_state_change("stop", "run");
258 
259     while (s->state == MIGRATION_STATUS_COLO) {
260         current_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
261         if (current_time - checkpoint_time <
262             s->parameters.x_checkpoint_delay) {
263             int64_t delay_ms;
264 
265             delay_ms = s->parameters.x_checkpoint_delay -
266                        (current_time - checkpoint_time);
267             g_usleep(delay_ms * 1000);
268         }
269         ret = colo_do_checkpoint_transaction(s, bioc, fb);
270         if (ret < 0) {
271             goto out;
272         }
273         checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
274     }
275 
276 out:
277     /* Throw the unreported error message after exited from loop */
278     if (local_err) {
279         error_report_err(local_err);
280     }
281 
282     if (fb) {
283         qemu_fclose(fb);
284     }
285 
286     if (s->rp_state.from_dst_file) {
287         qemu_fclose(s->rp_state.from_dst_file);
288     }
289 }
290 
291 void migrate_start_colo_process(MigrationState *s)
292 {
293     qemu_mutex_unlock_iothread();
294     migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
295                       MIGRATION_STATUS_COLO);
296     colo_process_checkpoint(s);
297     qemu_mutex_lock_iothread();
298 }
299 
300 static void colo_wait_handle_message(QEMUFile *f, int *checkpoint_request,
301                                      Error **errp)
302 {
303     COLOMessage msg;
304     Error *local_err = NULL;
305 
306     msg = colo_receive_message(f, &local_err);
307     if (local_err) {
308         error_propagate(errp, local_err);
309         return;
310     }
311 
312     switch (msg) {
313     case COLO_MESSAGE_CHECKPOINT_REQUEST:
314         *checkpoint_request = 1;
315         break;
316     default:
317         *checkpoint_request = 0;
318         error_setg(errp, "Got unknown COLO message: %d", msg);
319         break;
320     }
321 }
322 
323 void *colo_process_incoming_thread(void *opaque)
324 {
325     MigrationIncomingState *mis = opaque;
326     QEMUFile *fb = NULL;
327     QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */
328     uint64_t total_size;
329     uint64_t value;
330     Error *local_err = NULL;
331 
332     migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
333                       MIGRATION_STATUS_COLO);
334 
335     mis->to_src_file = qemu_file_get_return_path(mis->from_src_file);
336     if (!mis->to_src_file) {
337         error_report("COLO incoming thread: Open QEMUFile to_src_file failed");
338         goto out;
339     }
340     /*
341      * Note: the communication between Primary side and Secondary side
342      * should be sequential, we set the fd to unblocked in migration incoming
343      * coroutine, and here we are in the COLO incoming thread, so it is ok to
344      * set the fd back to blocked.
345      */
346     qemu_file_set_blocking(mis->from_src_file, true);
347 
348     bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
349     fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc));
350     object_unref(OBJECT(bioc));
351 
352     colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY,
353                       &local_err);
354     if (local_err) {
355         goto out;
356     }
357 
358     while (mis->state == MIGRATION_STATUS_COLO) {
359         int request;
360 
361         colo_wait_handle_message(mis->from_src_file, &request, &local_err);
362         if (local_err) {
363             goto out;
364         }
365         assert(request);
366         /* FIXME: This is unnecessary for periodic checkpoint mode */
367         colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
368                      &local_err);
369         if (local_err) {
370             goto out;
371         }
372 
373         colo_receive_check_message(mis->from_src_file,
374                            COLO_MESSAGE_VMSTATE_SEND, &local_err);
375         if (local_err) {
376             goto out;
377         }
378 
379         value = colo_receive_message_value(mis->from_src_file,
380                                  COLO_MESSAGE_VMSTATE_SIZE, &local_err);
381         if (local_err) {
382             goto out;
383         }
384 
385         /*
386          * Read VM device state data into channel buffer,
387          * It's better to re-use the memory allocated.
388          * Here we need to handle the channel buffer directly.
389          */
390         if (value > bioc->capacity) {
391             bioc->capacity = value;
392             bioc->data = g_realloc(bioc->data, bioc->capacity);
393         }
394         total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
395         if (total_size != value) {
396             error_report("Got %" PRIu64 " VMState data, less than expected"
397                         " %" PRIu64, total_size, value);
398             goto out;
399         }
400         bioc->usage = total_size;
401         qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
402 
403         colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
404                      &local_err);
405         if (local_err) {
406             goto out;
407         }
408 
409         qemu_mutex_lock_iothread();
410         qemu_system_reset(VMRESET_SILENT);
411         if (qemu_loadvm_state(fb) < 0) {
412             error_report("COLO: loadvm failed");
413             qemu_mutex_unlock_iothread();
414             goto out;
415         }
416         qemu_mutex_unlock_iothread();
417 
418         colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
419                      &local_err);
420         if (local_err) {
421             goto out;
422         }
423     }
424 
425 out:
426     /* Throw the unreported error message after exited from loop */
427     if (local_err) {
428         error_report_err(local_err);
429     }
430 
431     if (fb) {
432         qemu_fclose(fb);
433     }
434 
435     if (mis->to_src_file) {
436         qemu_fclose(mis->to_src_file);
437     }
438     migration_incoming_exit_colo();
439 
440     return NULL;
441 }
442