xref: /openbmc/qemu/migration/multifd.c (revision 4c107870)
1d32ca5adSJuan Quintela /*
2d32ca5adSJuan Quintela  * Multifd common code
3d32ca5adSJuan Quintela  *
4d32ca5adSJuan Quintela  * Copyright (c) 2019-2020 Red Hat Inc
5d32ca5adSJuan Quintela  *
6d32ca5adSJuan Quintela  * Authors:
7d32ca5adSJuan Quintela  *  Juan Quintela <quintela@redhat.com>
8d32ca5adSJuan Quintela  *
9d32ca5adSJuan Quintela  * This work is licensed under the terms of the GNU GPL, version 2 or later.
10d32ca5adSJuan Quintela  * See the COPYING file in the top-level directory.
11d32ca5adSJuan Quintela  */
12d32ca5adSJuan Quintela 
13d32ca5adSJuan Quintela #include "qemu/osdep.h"
14303e6f54SHao Xiang #include "qemu/cutils.h"
15d32ca5adSJuan Quintela #include "qemu/rcu.h"
16d32ca5adSJuan Quintela #include "exec/target_page.h"
17d32ca5adSJuan Quintela #include "sysemu/sysemu.h"
18d32ca5adSJuan Quintela #include "exec/ramblock.h"
19d32ca5adSJuan Quintela #include "qemu/error-report.h"
20d32ca5adSJuan Quintela #include "qapi/error.h"
21b7b03eb6SFabiano Rosas #include "file.h"
22d32ca5adSJuan Quintela #include "migration.h"
23947701ccSJuan Quintela #include "migration-stats.h"
24d32ca5adSJuan Quintela #include "socket.h"
2529647140SChuan Zheng #include "tls.h"
26d32ca5adSJuan Quintela #include "qemu-file.h"
27d32ca5adSJuan Quintela #include "trace.h"
28d32ca5adSJuan Quintela #include "multifd.h"
291b1f4ab6SJiang Jiacheng #include "threadinfo.h"
30b4bc342cSJuan Quintela #include "options.h"
31b5eea99eSLukas Straub #include "qemu/yank.h"
32b7b03eb6SFabiano Rosas #include "io/channel-file.h"
33b5eea99eSLukas Straub #include "io/channel-socket.h"
341a92d6d5SLukas Straub #include "yank_functions.h"
35b5eea99eSLukas Straub 
36d32ca5adSJuan Quintela /* Multiple fd's */
37d32ca5adSJuan Quintela 
#define MULTIFD_MAGIC 0x11223344U
#define MULTIFD_VERSION 1

/*
 * Handshake message written once at the start of every multifd channel
 * (multifd_send_initial_packet) and checked by the receiver
 * (multifd_recv_initial_packet).  magic and version travel big-endian.
 * The struct is packed so that sizeof() is exactly the on-wire size.
 */
typedef struct {
    uint32_t magic;         /* must be MULTIFD_MAGIC */
    uint32_t version;       /* must be MULTIFD_VERSION */
    unsigned char uuid[16]; /* sender's QemuUUID, compared by the receiver */
    uint8_t id;             /* channel id of the sending channel */
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;
49d32ca5adSJuan Quintela 
5098ea497dSPeter Xu struct {
5198ea497dSPeter Xu     MultiFDSendParams *params;
5298ea497dSPeter Xu     /* array of pages to sent */
5398ea497dSPeter Xu     MultiFDPages_t *pages;
5498ea497dSPeter Xu     /*
5598ea497dSPeter Xu      * Global number of generated multifd packets.
5698ea497dSPeter Xu      *
5798ea497dSPeter Xu      * Note that we used 'uintptr_t' because it'll naturally support atomic
5898ea497dSPeter Xu      * operations on both 32bit / 64 bits hosts.  It means on 32bit systems
5998ea497dSPeter Xu      * multifd will overflow the packet_num easier, but that should be
6098ea497dSPeter Xu      * fine.
6198ea497dSPeter Xu      *
6298ea497dSPeter Xu      * Another option is to use QEMU's Stat64 then it'll be 64 bits on all
6398ea497dSPeter Xu      * hosts, however so far it does not support atomic fetch_add() yet.
6498ea497dSPeter Xu      * Make it easy for now.
6598ea497dSPeter Xu      */
6698ea497dSPeter Xu     uintptr_t packet_num;
6793fa9dc2SFabiano Rosas     /*
6893fa9dc2SFabiano Rosas      * Synchronization point past which no more channels will be
6993fa9dc2SFabiano Rosas      * created.
7093fa9dc2SFabiano Rosas      */
7193fa9dc2SFabiano Rosas     QemuSemaphore channels_created;
7298ea497dSPeter Xu     /* send channels ready */
7398ea497dSPeter Xu     QemuSemaphore channels_ready;
7498ea497dSPeter Xu     /*
7598ea497dSPeter Xu      * Have we already run terminate threads.  There is a race when it
7698ea497dSPeter Xu      * happens that we got one error while we are exiting.
7798ea497dSPeter Xu      * We will use atomic operations.  Only valid values are 0 and 1.
7898ea497dSPeter Xu      */
7998ea497dSPeter Xu     int exiting;
8098ea497dSPeter Xu     /* multifd ops */
8198ea497dSPeter Xu     MultiFDMethods *ops;
8298ea497dSPeter Xu } *multifd_send_state;
8398ea497dSPeter Xu 
8411dd7be5SFabiano Rosas struct {
8511dd7be5SFabiano Rosas     MultiFDRecvParams *params;
86d117ed06SFabiano Rosas     MultiFDRecvData *data;
8711dd7be5SFabiano Rosas     /* number of created threads */
8811dd7be5SFabiano Rosas     int count;
89d117ed06SFabiano Rosas     /*
90d117ed06SFabiano Rosas      * This is always posted by the recv threads, the migration thread
91d117ed06SFabiano Rosas      * uses it to wait for recv threads to finish assigned tasks.
92d117ed06SFabiano Rosas      */
9311dd7be5SFabiano Rosas     QemuSemaphore sem_sync;
9411dd7be5SFabiano Rosas     /* global number of generated multifd packets */
9511dd7be5SFabiano Rosas     uint64_t packet_num;
9611dd7be5SFabiano Rosas     int exiting;
9711dd7be5SFabiano Rosas     /* multifd ops */
9811dd7be5SFabiano Rosas     MultiFDMethods *ops;
9911dd7be5SFabiano Rosas } *multifd_recv_state;
10011dd7be5SFabiano Rosas 
/*
 * Packets (headers plus page payload) are used on every transport except
 * mapped-ram, which goes through the file-based path instead.
 */
static bool multifd_use_packets(void)
{
    bool mapped_ram = migrate_mapped_ram();

    return mapped_ram ? false : true;
}
10506833d83SFabiano Rosas 
multifd_send_channel_created(void)106a8a3e710SFabiano Rosas void multifd_send_channel_created(void)
107a8a3e710SFabiano Rosas {
108a8a3e710SFabiano Rosas     qemu_sem_post(&multifd_send_state->channels_created);
109a8a3e710SFabiano Rosas }
110a8a3e710SFabiano Rosas 
/*
 * Update the RAMBlock file bitmap for the current batch: the first
 * normal_num offsets are marked as present (true), the remaining
 * offsets up to num (zero pages) are marked as absent (false).
 */
static void multifd_set_file_bitmap(MultiFDSendParams *p)
{
    MultiFDPages_t *batch = p->pages;
    int idx = 0;

    assert(batch->block);

    /* normal pages: data is written, so set their bit */
    for (; idx < batch->normal_num; idx++) {
        ramblock_set_file_bmap_atomic(batch->block, batch->offset[idx], true);
    }

    /* zero pages: nothing is written, so clear their bit */
    for (; idx < batch->num; idx++) {
        ramblock_set_file_bmap_atomic(batch->block, batch->offset[idx], false);
    }
}
125f427d90bSFabiano Rosas 
126ab7cbb0bSJuan Quintela /* Multifd without compression */
127ab7cbb0bSJuan Quintela 
128ab7cbb0bSJuan Quintela /**
129ab7cbb0bSJuan Quintela  * nocomp_send_setup: setup send side
130ab7cbb0bSJuan Quintela  *
131ab7cbb0bSJuan Quintela  * @p: Params for the channel that we are using
132ab7cbb0bSJuan Quintela  * @errp: pointer to an error
133ab7cbb0bSJuan Quintela  */
nocomp_send_setup(MultiFDSendParams * p,Error ** errp)134ab7cbb0bSJuan Quintela static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
135ab7cbb0bSJuan Quintela {
13625a1f878SPeter Xu     if (migrate_zero_copy_send()) {
13725a1f878SPeter Xu         p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
13825a1f878SPeter Xu     }
13925a1f878SPeter Xu 
140d9d3e4f2SYuan Liu     if (multifd_use_packets()) {
141d9d3e4f2SYuan Liu         /* We need one extra place for the packet header */
142d9d3e4f2SYuan Liu         p->iov = g_new0(struct iovec, p->page_count + 1);
143d9d3e4f2SYuan Liu     } else {
144d9d3e4f2SYuan Liu         p->iov = g_new0(struct iovec, p->page_count);
145d9d3e4f2SYuan Liu     }
146d9d3e4f2SYuan Liu 
147ab7cbb0bSJuan Quintela     return 0;
148ab7cbb0bSJuan Quintela }
149ab7cbb0bSJuan Quintela 
150ab7cbb0bSJuan Quintela /**
151ab7cbb0bSJuan Quintela  * nocomp_send_cleanup: cleanup send side
152ab7cbb0bSJuan Quintela  *
153ab7cbb0bSJuan Quintela  * For no compression this function does nothing.
154ab7cbb0bSJuan Quintela  *
155ab7cbb0bSJuan Quintela  * @p: Params for the channel that we are using
15618ede636SJuan Quintela  * @errp: pointer to an error
157ab7cbb0bSJuan Quintela  */
nocomp_send_cleanup(MultiFDSendParams * p,Error ** errp)158ab7cbb0bSJuan Quintela static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
159ab7cbb0bSJuan Quintela {
160d9d3e4f2SYuan Liu     g_free(p->iov);
161d9d3e4f2SYuan Liu     p->iov = NULL;
162ab7cbb0bSJuan Quintela     return;
163ab7cbb0bSJuan Quintela }
164ab7cbb0bSJuan Quintela 
/*
 * Append one iovec per normal page of the current batch, pointing at the
 * page inside the RAMBlock's host memory, and record the payload size.
 */
static void multifd_send_prepare_iovs(MultiFDSendParams *p)
{
    MultiFDPages_t *batch = p->pages;

    for (int k = 0; k < batch->normal_num; k++) {
        struct iovec *slot = &p->iov[p->iovs_num++];

        slot->iov_base = batch->block->host + batch->offset[k];
        slot->iov_len = p->page_size;
    }

    /* only normal pages carry data; zero pages are described by offset */
    p->next_packet_size = batch->normal_num * p->page_size;
}
17706833d83SFabiano Rosas 
178ab7cbb0bSJuan Quintela /**
179ab7cbb0bSJuan Quintela  * nocomp_send_prepare: prepare date to be able to send
180ab7cbb0bSJuan Quintela  *
181ab7cbb0bSJuan Quintela  * For no compression we just have to calculate the size of the
182ab7cbb0bSJuan Quintela  * packet.
183ab7cbb0bSJuan Quintela  *
184ab7cbb0bSJuan Quintela  * Returns 0 for success or -1 for error
185ab7cbb0bSJuan Quintela  *
186ab7cbb0bSJuan Quintela  * @p: Params for the channel that we are using
187ab7cbb0bSJuan Quintela  * @errp: pointer to an error
188ab7cbb0bSJuan Quintela  */
nocomp_send_prepare(MultiFDSendParams * p,Error ** errp)18902fb8104SJuan Quintela static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
190ab7cbb0bSJuan Quintela {
19125a1f878SPeter Xu     bool use_zero_copy_send = migrate_zero_copy_send();
19225a1f878SPeter Xu     int ret;
19325a1f878SPeter Xu 
194303e6f54SHao Xiang     multifd_send_zero_page_detect(p);
195303e6f54SHao Xiang 
19606833d83SFabiano Rosas     if (!multifd_use_packets()) {
19706833d83SFabiano Rosas         multifd_send_prepare_iovs(p);
198f427d90bSFabiano Rosas         multifd_set_file_bitmap(p);
199f427d90bSFabiano Rosas 
20006833d83SFabiano Rosas         return 0;
20106833d83SFabiano Rosas     }
20206833d83SFabiano Rosas 
20325a1f878SPeter Xu     if (!use_zero_copy_send) {
20425a1f878SPeter Xu         /*
20525a1f878SPeter Xu          * Only !zerocopy needs the header in IOV; zerocopy will
20625a1f878SPeter Xu          * send it separately.
20725a1f878SPeter Xu          */
20825a1f878SPeter Xu         multifd_send_prepare_header(p);
20925a1f878SPeter Xu     }
210226468baSJuan Quintela 
21106833d83SFabiano Rosas     multifd_send_prepare_iovs(p);
212ab7cbb0bSJuan Quintela     p->flags |= MULTIFD_FLAG_NOCOMP;
21325a1f878SPeter Xu 
21425a1f878SPeter Xu     multifd_send_fill_packet(p);
21525a1f878SPeter Xu 
21625a1f878SPeter Xu     if (use_zero_copy_send) {
21725a1f878SPeter Xu         /* Send header first, without zerocopy */
21825a1f878SPeter Xu         ret = qio_channel_write_all(p->c, (void *)p->packet,
21925a1f878SPeter Xu                                     p->packet_len, errp);
22025a1f878SPeter Xu         if (ret != 0) {
22125a1f878SPeter Xu             return -1;
22225a1f878SPeter Xu         }
22325a1f878SPeter Xu     }
22425a1f878SPeter Xu 
225ab7cbb0bSJuan Quintela     return 0;
226ab7cbb0bSJuan Quintela }
227ab7cbb0bSJuan Quintela 
228ab7cbb0bSJuan Quintela /**
229ab7cbb0bSJuan Quintela  * nocomp_recv_setup: setup receive side
230ab7cbb0bSJuan Quintela  *
231ab7cbb0bSJuan Quintela  * For no compression this function does nothing.
232ab7cbb0bSJuan Quintela  *
233ab7cbb0bSJuan Quintela  * Returns 0 for success or -1 for error
234ab7cbb0bSJuan Quintela  *
235ab7cbb0bSJuan Quintela  * @p: Params for the channel that we are using
236ab7cbb0bSJuan Quintela  * @errp: pointer to an error
237ab7cbb0bSJuan Quintela  */
nocomp_recv_setup(MultiFDRecvParams * p,Error ** errp)238ab7cbb0bSJuan Quintela static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
239ab7cbb0bSJuan Quintela {
240d9d3e4f2SYuan Liu     p->iov = g_new0(struct iovec, p->page_count);
241ab7cbb0bSJuan Quintela     return 0;
242ab7cbb0bSJuan Quintela }
243ab7cbb0bSJuan Quintela 
244ab7cbb0bSJuan Quintela /**
245ab7cbb0bSJuan Quintela  * nocomp_recv_cleanup: setup receive side
246ab7cbb0bSJuan Quintela  *
247ab7cbb0bSJuan Quintela  * For no compression this function does nothing.
248ab7cbb0bSJuan Quintela  *
249ab7cbb0bSJuan Quintela  * @p: Params for the channel that we are using
250ab7cbb0bSJuan Quintela  */
nocomp_recv_cleanup(MultiFDRecvParams * p)251ab7cbb0bSJuan Quintela static void nocomp_recv_cleanup(MultiFDRecvParams *p)
252ab7cbb0bSJuan Quintela {
253d9d3e4f2SYuan Liu     g_free(p->iov);
254d9d3e4f2SYuan Liu     p->iov = NULL;
255ab7cbb0bSJuan Quintela }
256ab7cbb0bSJuan Quintela 
257ab7cbb0bSJuan Quintela /**
2589db19125SFabiano Rosas  * nocomp_recv: read the data from the channel
259ab7cbb0bSJuan Quintela  *
260ab7cbb0bSJuan Quintela  * For no compression we just need to read things into the correct place.
261ab7cbb0bSJuan Quintela  *
262ab7cbb0bSJuan Quintela  * Returns 0 for success or -1 for error
263ab7cbb0bSJuan Quintela  *
264ab7cbb0bSJuan Quintela  * @p: Params for the channel that we are using
265ab7cbb0bSJuan Quintela  * @errp: pointer to an error
266ab7cbb0bSJuan Quintela  */
nocomp_recv(MultiFDRecvParams * p,Error ** errp)2679db19125SFabiano Rosas static int nocomp_recv(MultiFDRecvParams *p, Error **errp)
268ab7cbb0bSJuan Quintela {
26906833d83SFabiano Rosas     uint32_t flags;
27006833d83SFabiano Rosas 
27106833d83SFabiano Rosas     if (!multifd_use_packets()) {
272a49d15a3SFabiano Rosas         return multifd_file_recv_data(p, errp);
27306833d83SFabiano Rosas     }
27406833d83SFabiano Rosas 
27506833d83SFabiano Rosas     flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
276ab7cbb0bSJuan Quintela 
277ab7cbb0bSJuan Quintela     if (flags != MULTIFD_FLAG_NOCOMP) {
27804e11404SJuan Quintela         error_setg(errp, "multifd %u: flags received %x flags expected %x",
279ab7cbb0bSJuan Quintela                    p->id, flags, MULTIFD_FLAG_NOCOMP);
280ab7cbb0bSJuan Quintela         return -1;
281ab7cbb0bSJuan Quintela     }
282303e6f54SHao Xiang 
283303e6f54SHao Xiang     multifd_recv_zero_page_process(p);
284303e6f54SHao Xiang 
285303e6f54SHao Xiang     if (!p->normal_num) {
286303e6f54SHao Xiang         return 0;
287303e6f54SHao Xiang     }
288303e6f54SHao Xiang 
289cf2d4aa8SJuan Quintela     for (int i = 0; i < p->normal_num; i++) {
290faf60935SJuan Quintela         p->iov[i].iov_base = p->host + p->normal[i];
291ddec20f8SJuan Quintela         p->iov[i].iov_len = p->page_size;
2925ef7e26bSYuan Liu         ramblock_recv_bitmap_set_offset(p->block, p->normal[i]);
293226468baSJuan Quintela     }
294cf2d4aa8SJuan Quintela     return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
295ab7cbb0bSJuan Quintela }
296ab7cbb0bSJuan Quintela 
297ab7cbb0bSJuan Quintela static MultiFDMethods multifd_nocomp_ops = {
298ab7cbb0bSJuan Quintela     .send_setup = nocomp_send_setup,
299ab7cbb0bSJuan Quintela     .send_cleanup = nocomp_send_cleanup,
300ab7cbb0bSJuan Quintela     .send_prepare = nocomp_send_prepare,
301ab7cbb0bSJuan Quintela     .recv_setup = nocomp_recv_setup,
302ab7cbb0bSJuan Quintela     .recv_cleanup = nocomp_recv_cleanup,
3039db19125SFabiano Rosas     .recv = nocomp_recv
304ab7cbb0bSJuan Quintela };
305ab7cbb0bSJuan Quintela 
306ab7cbb0bSJuan Quintela static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
307ab7cbb0bSJuan Quintela     [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
308ab7cbb0bSJuan Quintela };
309ab7cbb0bSJuan Quintela 
/*
 * Install the hook table for compression @method.  Index 0 is rejected,
 * presumably because it is the built-in MULTIFD_COMPRESSION_NONE slot.
 */
void multifd_register_ops(int method, MultiFDMethods *ops)
{
    assert(method > 0);
    assert(method < MULTIFD_COMPRESSION__MAX);

    multifd_ops[method] = ops;
}
3157ec2c2b3SJuan Quintela 
316836eca47SPeter Xu /* Reset a MultiFDPages_t* object for the next use */
multifd_pages_reset(MultiFDPages_t * pages)317836eca47SPeter Xu static void multifd_pages_reset(MultiFDPages_t *pages)
318836eca47SPeter Xu {
319836eca47SPeter Xu     /*
320836eca47SPeter Xu      * We don't need to touch offset[] array, because it will be
321836eca47SPeter Xu      * overwritten later when reused.
322836eca47SPeter Xu      */
323836eca47SPeter Xu     pages->num = 0;
324303e6f54SHao Xiang     pages->normal_num = 0;
325836eca47SPeter Xu     pages->block = NULL;
326836eca47SPeter Xu }
327836eca47SPeter Xu 
multifd_send_initial_packet(MultiFDSendParams * p,Error ** errp)328d32ca5adSJuan Quintela static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
329d32ca5adSJuan Quintela {
330d32ca5adSJuan Quintela     MultiFDInit_t msg = {};
331cbec7eb7SJuan Quintela     size_t size = sizeof(msg);
332d32ca5adSJuan Quintela     int ret;
333d32ca5adSJuan Quintela 
334d32ca5adSJuan Quintela     msg.magic = cpu_to_be32(MULTIFD_MAGIC);
335d32ca5adSJuan Quintela     msg.version = cpu_to_be32(MULTIFD_VERSION);
336d32ca5adSJuan Quintela     msg.id = p->id;
337d32ca5adSJuan Quintela     memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
338d32ca5adSJuan Quintela 
339cbec7eb7SJuan Quintela     ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
340d32ca5adSJuan Quintela     if (ret != 0) {
341d32ca5adSJuan Quintela         return -1;
342d32ca5adSJuan Quintela     }
343cbec7eb7SJuan Quintela     stat64_add(&mig_stats.multifd_bytes, size);
344d32ca5adSJuan Quintela     return 0;
345d32ca5adSJuan Quintela }
346d32ca5adSJuan Quintela 
multifd_recv_initial_packet(QIOChannel * c,Error ** errp)347d32ca5adSJuan Quintela static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
348d32ca5adSJuan Quintela {
349d32ca5adSJuan Quintela     MultiFDInit_t msg;
350d32ca5adSJuan Quintela     int ret;
351d32ca5adSJuan Quintela 
352d32ca5adSJuan Quintela     ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
353d32ca5adSJuan Quintela     if (ret != 0) {
354d32ca5adSJuan Quintela         return -1;
355d32ca5adSJuan Quintela     }
356d32ca5adSJuan Quintela 
357d32ca5adSJuan Quintela     msg.magic = be32_to_cpu(msg.magic);
358d32ca5adSJuan Quintela     msg.version = be32_to_cpu(msg.version);
359d32ca5adSJuan Quintela 
360d32ca5adSJuan Quintela     if (msg.magic != MULTIFD_MAGIC) {
361d32ca5adSJuan Quintela         error_setg(errp, "multifd: received packet magic %x "
362d32ca5adSJuan Quintela                    "expected %x", msg.magic, MULTIFD_MAGIC);
363d32ca5adSJuan Quintela         return -1;
364d32ca5adSJuan Quintela     }
365d32ca5adSJuan Quintela 
366d32ca5adSJuan Quintela     if (msg.version != MULTIFD_VERSION) {
36704e11404SJuan Quintela         error_setg(errp, "multifd: received packet version %u "
36804e11404SJuan Quintela                    "expected %u", msg.version, MULTIFD_VERSION);
369d32ca5adSJuan Quintela         return -1;
370d32ca5adSJuan Quintela     }
371d32ca5adSJuan Quintela 
372d32ca5adSJuan Quintela     if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
373d32ca5adSJuan Quintela         char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
374d32ca5adSJuan Quintela         char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
375d32ca5adSJuan Quintela 
376d32ca5adSJuan Quintela         error_setg(errp, "multifd: received uuid '%s' and expected "
377d32ca5adSJuan Quintela                    "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
378d32ca5adSJuan Quintela         g_free(uuid);
379d32ca5adSJuan Quintela         g_free(msg_uuid);
380d32ca5adSJuan Quintela         return -1;
381d32ca5adSJuan Quintela     }
382d32ca5adSJuan Quintela 
383d32ca5adSJuan Quintela     if (msg.id > migrate_multifd_channels()) {
384c77b4085SAvihai Horon         error_setg(errp, "multifd: received channel id %u is greater than "
385c77b4085SAvihai Horon                    "number of channels %u", msg.id, migrate_multifd_channels());
386d32ca5adSJuan Quintela         return -1;
387d32ca5adSJuan Quintela     }
388d32ca5adSJuan Quintela 
389d32ca5adSJuan Quintela     return msg.id;
390d32ca5adSJuan Quintela }
391d32ca5adSJuan Quintela 
multifd_pages_init(uint32_t n)3926074f816SFabiano Rosas static MultiFDPages_t *multifd_pages_init(uint32_t n)
393d32ca5adSJuan Quintela {
394d32ca5adSJuan Quintela     MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
395d32ca5adSJuan Quintela 
3966074f816SFabiano Rosas     pages->allocated = n;
3976074f816SFabiano Rosas     pages->offset = g_new0(ram_addr_t, n);
398d32ca5adSJuan Quintela 
399d32ca5adSJuan Quintela     return pages;
400d32ca5adSJuan Quintela }
401d32ca5adSJuan Quintela 
/* Release a MultiFDPages_t obtained from multifd_pages_init(). */
static void multifd_pages_clear(MultiFDPages_t *pages)
{
    ram_addr_t *offsets;

    multifd_pages_reset(pages);
    pages->allocated = 0;

    offsets = pages->offset;
    g_free(offsets);
    pages->offset = NULL;

    g_free(pages);
}
410d32ca5adSJuan Quintela 
/*
 * Encode the current batch of @p into its packet header (big-endian
 * fields), assign it the next global packet number and update the
 * channel's packet/page counters.
 */
void multifd_send_fill_packet(MultiFDSendParams *p)
{
    MultiFDPacket_t *hdr = p->packet;
    MultiFDPages_t *batch = p->pages;
    uint32_t zero_num = batch->num - batch->normal_num;
    uint64_t seq;

    hdr->flags = cpu_to_be32(p->flags);
    hdr->pages_alloc = cpu_to_be32(batch->allocated);
    hdr->normal_pages = cpu_to_be32(batch->normal_num);
    hdr->zero_pages = cpu_to_be32(zero_num);
    hdr->next_packet_size = cpu_to_be32(p->next_packet_size);

    /* packet numbers are shared by all send channels, hence the atomic */
    seq = qatomic_fetch_inc(&multifd_send_state->packet_num);
    hdr->packet_num = cpu_to_be64(seq);

    if (batch->block) {
        /*
         * strncpy() zero-pads the rest of the field; the receiver forces
         * NUL termination on its side.
         */
        strncpy(hdr->ramblock, batch->block->idstr, 256);
    }

    for (int slot = 0; slot < batch->num; slot++) {
        /* widen first: ram_addr_t is only 32 bits on some architectures */
        uint64_t off = batch->offset[slot];

        hdr->offset[slot] = cpu_to_be64(off);
    }

    p->packets_sent++;
    p->total_normal_pages += batch->normal_num;
    p->total_zero_pages += zero_num;

    trace_multifd_send(p->id, seq, batch->normal_num, zero_num,
                       p->flags, p->next_packet_size);
}
446d32ca5adSJuan Quintela 
multifd_recv_unfill_packet(MultiFDRecvParams * p,Error ** errp)447d32ca5adSJuan Quintela static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
448d32ca5adSJuan Quintela {
449d32ca5adSJuan Quintela     MultiFDPacket_t *packet = p->packet;
450d32ca5adSJuan Quintela     int i;
451d32ca5adSJuan Quintela 
452d32ca5adSJuan Quintela     packet->magic = be32_to_cpu(packet->magic);
453d32ca5adSJuan Quintela     if (packet->magic != MULTIFD_MAGIC) {
454d32ca5adSJuan Quintela         error_setg(errp, "multifd: received packet "
455d32ca5adSJuan Quintela                    "magic %x and expected magic %x",
456d32ca5adSJuan Quintela                    packet->magic, MULTIFD_MAGIC);
457d32ca5adSJuan Quintela         return -1;
458d32ca5adSJuan Quintela     }
459d32ca5adSJuan Quintela 
460d32ca5adSJuan Quintela     packet->version = be32_to_cpu(packet->version);
461d32ca5adSJuan Quintela     if (packet->version != MULTIFD_VERSION) {
462d32ca5adSJuan Quintela         error_setg(errp, "multifd: received packet "
46304e11404SJuan Quintela                    "version %u and expected version %u",
464d32ca5adSJuan Quintela                    packet->version, MULTIFD_VERSION);
465d32ca5adSJuan Quintela         return -1;
466d32ca5adSJuan Quintela     }
467d32ca5adSJuan Quintela 
468d32ca5adSJuan Quintela     p->flags = be32_to_cpu(packet->flags);
469d32ca5adSJuan Quintela 
470d32ca5adSJuan Quintela     packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
471d32ca5adSJuan Quintela     /*
472d32ca5adSJuan Quintela      * If we received a packet that is 100 times bigger than expected
473d32ca5adSJuan Quintela      * just stop migration.  It is a magic number.
474d32ca5adSJuan Quintela      */
475d6f45ebaSJuan Quintela     if (packet->pages_alloc > p->page_count) {
476d32ca5adSJuan Quintela         error_setg(errp, "multifd: received packet "
477cf2d4aa8SJuan Quintela                    "with size %u and expected a size of %u",
478d6f45ebaSJuan Quintela                    packet->pages_alloc, p->page_count) ;
479d32ca5adSJuan Quintela         return -1;
480d32ca5adSJuan Quintela     }
481d32ca5adSJuan Quintela 
4828c0ec0b2SJuan Quintela     p->normal_num = be32_to_cpu(packet->normal_pages);
483cf2d4aa8SJuan Quintela     if (p->normal_num > packet->pages_alloc) {
484d32ca5adSJuan Quintela         error_setg(errp, "multifd: received packet "
485303e6f54SHao Xiang                    "with %u normal pages and expected maximum pages are %u",
486cf2d4aa8SJuan Quintela                    p->normal_num, packet->pages_alloc) ;
487d32ca5adSJuan Quintela         return -1;
488d32ca5adSJuan Quintela     }
489d32ca5adSJuan Quintela 
490303e6f54SHao Xiang     p->zero_num = be32_to_cpu(packet->zero_pages);
491303e6f54SHao Xiang     if (p->zero_num > packet->pages_alloc - p->normal_num) {
492303e6f54SHao Xiang         error_setg(errp, "multifd: received packet "
493303e6f54SHao Xiang                    "with %u zero pages and expected maximum zero pages are %u",
494303e6f54SHao Xiang                    p->zero_num, packet->pages_alloc - p->normal_num) ;
495303e6f54SHao Xiang         return -1;
496303e6f54SHao Xiang     }
497303e6f54SHao Xiang 
498d32ca5adSJuan Quintela     p->next_packet_size = be32_to_cpu(packet->next_packet_size);
499d32ca5adSJuan Quintela     p->packet_num = be64_to_cpu(packet->packet_num);
50005b7ec18SPeter Xu     p->packets_recved++;
501db7e1cc5SPeter Xu     p->total_normal_pages += p->normal_num;
502303e6f54SHao Xiang     p->total_zero_pages += p->zero_num;
503d32ca5adSJuan Quintela 
504303e6f54SHao Xiang     trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->zero_num,
505303e6f54SHao Xiang                        p->flags, p->next_packet_size);
5068a9ef173SPeter Xu 
507303e6f54SHao Xiang     if (p->normal_num == 0 && p->zero_num == 0) {
508d32ca5adSJuan Quintela         return 0;
509d32ca5adSJuan Quintela     }
510d32ca5adSJuan Quintela 
511d32ca5adSJuan Quintela     /* make sure that ramblock is 0 terminated */
512d32ca5adSJuan Quintela     packet->ramblock[255] = 0;
5135d1d1fcfSLukas Straub     p->block = qemu_ram_block_by_name(packet->ramblock);
5145d1d1fcfSLukas Straub     if (!p->block) {
515d32ca5adSJuan Quintela         error_setg(errp, "multifd: unknown ram block %s",
516d32ca5adSJuan Quintela                    packet->ramblock);
517d32ca5adSJuan Quintela         return -1;
518d32ca5adSJuan Quintela     }
519d32ca5adSJuan Quintela 
5205d1d1fcfSLukas Straub     p->host = p->block->host;
521cf2d4aa8SJuan Quintela     for (i = 0; i < p->normal_num; i++) {
522d32ca5adSJuan Quintela         uint64_t offset = be64_to_cpu(packet->offset[i]);
523d32ca5adSJuan Quintela 
5245d1d1fcfSLukas Straub         if (offset > (p->block->used_length - p->page_size)) {
525d32ca5adSJuan Quintela             error_setg(errp, "multifd: offset too long %" PRIu64
526d32ca5adSJuan Quintela                        " (max " RAM_ADDR_FMT ")",
5275d1d1fcfSLukas Straub                        offset, p->block->used_length);
528d32ca5adSJuan Quintela             return -1;
529d32ca5adSJuan Quintela         }
530cf2d4aa8SJuan Quintela         p->normal[i] = offset;
531d32ca5adSJuan Quintela     }
532d32ca5adSJuan Quintela 
533303e6f54SHao Xiang     for (i = 0; i < p->zero_num; i++) {
534303e6f54SHao Xiang         uint64_t offset = be64_to_cpu(packet->offset[p->normal_num + i]);
535303e6f54SHao Xiang 
536303e6f54SHao Xiang         if (offset > (p->block->used_length - p->page_size)) {
537303e6f54SHao Xiang             error_setg(errp, "multifd: offset too long %" PRIu64
538303e6f54SHao Xiang                        " (max " RAM_ADDR_FMT ")",
539303e6f54SHao Xiang                        offset, p->block->used_length);
540303e6f54SHao Xiang             return -1;
541303e6f54SHao Xiang         }
542303e6f54SHao Xiang         p->zero[i] = offset;
543303e6f54SHao Xiang     }
544303e6f54SHao Xiang 
545d32ca5adSJuan Quintela     return 0;
546d32ca5adSJuan Quintela }
547d32ca5adSJuan Quintela 
multifd_send_should_exit(void)54815f3f21dSPeter Xu static bool multifd_send_should_exit(void)
54915f3f21dSPeter Xu {
55015f3f21dSPeter Xu     return qatomic_read(&multifd_send_state->exiting);
55115f3f21dSPeter Xu }
55215f3f21dSPeter Xu 
multifd_recv_should_exit(void)55311dd7be5SFabiano Rosas static bool multifd_recv_should_exit(void)
55411dd7be5SFabiano Rosas {
55511dd7be5SFabiano Rosas     return qatomic_read(&multifd_recv_state->exiting);
55611dd7be5SFabiano Rosas }
55711dd7be5SFabiano Rosas 
558d32ca5adSJuan Quintela /*
55948c0f5d5SPeter Xu  * The migration thread can wait on either of the two semaphores.  This
56048c0f5d5SPeter Xu  * function can be used to kick the main thread out of waiting on either of
56148c0f5d5SPeter Xu  * them.  Should mostly only be called when something wrong happened with
56248c0f5d5SPeter Xu  * the current multifd send thread.
56348c0f5d5SPeter Xu  */
multifd_send_kick_main(MultiFDSendParams * p)56448c0f5d5SPeter Xu static void multifd_send_kick_main(MultiFDSendParams *p)
56548c0f5d5SPeter Xu {
56648c0f5d5SPeter Xu     qemu_sem_post(&p->sem_sync);
56748c0f5d5SPeter Xu     qemu_sem_post(&multifd_send_state->channels_ready);
56848c0f5d5SPeter Xu }
56948c0f5d5SPeter Xu 
57048c0f5d5SPeter Xu /*
571d32ca5adSJuan Quintela  * How we use multifd_send_state->pages and channel->pages?
572d32ca5adSJuan Quintela  *
573d32ca5adSJuan Quintela  * We create a pages for each channel, and a main one.  Each time that
574d32ca5adSJuan Quintela  * we need to send a batch of pages we interchange the ones between
575d32ca5adSJuan Quintela  * multifd_send_state and the channel that is sending it.  There are
576d32ca5adSJuan Quintela  * two reasons for that:
577d32ca5adSJuan Quintela  *    - to not have to do so many mallocs during migration
578d32ca5adSJuan Quintela  *    - to make easier to know what to free at the end of migration
579d32ca5adSJuan Quintela  *
580d32ca5adSJuan Quintela  * This way we always know who is the owner of each "pages" struct,
581d32ca5adSJuan Quintela  * and we don't need any locking.  It belongs to the migration thread
582d32ca5adSJuan Quintela  * or to the channel thread.  Switching is safe because the migration
583d32ca5adSJuan Quintela  * thread is using the channel mutex when changing it, and the channel
584d32ca5adSJuan Quintela  * have to had finish with its own, otherwise pending_job can't be
585d32ca5adSJuan Quintela  * false.
5863b40964aSPeter Xu  *
5873b40964aSPeter Xu  * Returns true if succeed, false otherwise.
588d32ca5adSJuan Quintela  */
multifd_send_pages(void)5893b40964aSPeter Xu static bool multifd_send_pages(void)
590d32ca5adSJuan Quintela {
591d32ca5adSJuan Quintela     int i;
592d32ca5adSJuan Quintela     static int next_channel;
593d32ca5adSJuan Quintela     MultiFDSendParams *p = NULL; /* make happy gcc */
594d32ca5adSJuan Quintela     MultiFDPages_t *pages = multifd_send_state->pages;
595d32ca5adSJuan Quintela 
59615f3f21dSPeter Xu     if (multifd_send_should_exit()) {
5973b40964aSPeter Xu         return false;
598d32ca5adSJuan Quintela     }
599d32ca5adSJuan Quintela 
600e3cce9afSPeter Xu     /* We wait here, until at least one channel is ready */
601d32ca5adSJuan Quintela     qemu_sem_wait(&multifd_send_state->channels_ready);
602e3cce9afSPeter Xu 
6037e89a140SLaurent Vivier     /*
6047e89a140SLaurent Vivier      * next_channel can remain from a previous migration that was
6057e89a140SLaurent Vivier      * using more channels, so ensure it doesn't overflow if the
6067e89a140SLaurent Vivier      * limit is lower now.
6077e89a140SLaurent Vivier      */
6087e89a140SLaurent Vivier     next_channel %= migrate_multifd_channels();
609d32ca5adSJuan Quintela     for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
61015f3f21dSPeter Xu         if (multifd_send_should_exit()) {
6113b40964aSPeter Xu             return false;
612d32ca5adSJuan Quintela         }
61315f3f21dSPeter Xu         p = &multifd_send_state->params[i];
614e3cce9afSPeter Xu         /*
615e3cce9afSPeter Xu          * Lockless read to p->pending_job is safe, because only multifd
616e3cce9afSPeter Xu          * sender thread can clear it.
617e3cce9afSPeter Xu          */
618f5f48a78SPeter Xu         if (qatomic_read(&p->pending_job) == false) {
619d32ca5adSJuan Quintela             next_channel = (i + 1) % migrate_multifd_channels();
620d32ca5adSJuan Quintela             break;
621d32ca5adSJuan Quintela         }
622d32ca5adSJuan Quintela     }
623e3cce9afSPeter Xu 
624e3cce9afSPeter Xu     /*
625488c84acSPeter Xu      * Make sure we read p->pending_job before all the rest.  Pairs with
626488c84acSPeter Xu      * qatomic_store_release() in multifd_send_thread().
627e3cce9afSPeter Xu      */
628488c84acSPeter Xu     smp_mb_acquire();
629488c84acSPeter Xu     assert(!p->pages->num);
630d32ca5adSJuan Quintela     multifd_send_state->pages = p->pages;
631d32ca5adSJuan Quintela     p->pages = pages;
632488c84acSPeter Xu     /*
633488c84acSPeter Xu      * Making sure p->pages is setup before marking pending_job=true. Pairs
634488c84acSPeter Xu      * with the qatomic_load_acquire() in multifd_send_thread().
635488c84acSPeter Xu      */
636488c84acSPeter Xu     qatomic_store_release(&p->pending_job, true);
637d32ca5adSJuan Quintela     qemu_sem_post(&p->sem);
638d32ca5adSJuan Quintela 
6393b40964aSPeter Xu     return true;
640d32ca5adSJuan Quintela }
641d32ca5adSJuan Quintela 
/* True when no page offsets are currently queued in @pages. */
static inline bool multifd_queue_empty(MultiFDPages_t *pages)
{
    return !pages->num;
}
646f88f86c4SPeter Xu 
/* True when every slot allocated in @pages is already in use. */
static inline bool multifd_queue_full(MultiFDPages_t *pages)
{
    return pages->allocated == pages->num;
}
651f88f86c4SPeter Xu 
/*
 * Append @offset at the tail of @pages.  There is no bounds check here:
 * callers must make sure the queue is not full (multifd_queue_full()).
 */
static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset)
{
    pages->offset[pages->num] = offset;
    pages->num++;
}
656f88f86c4SPeter Xu 
657d6556d17SPeter Xu /* Returns true if enqueue successful, false otherwise */
multifd_queue_page(RAMBlock * block,ram_addr_t offset)658d6556d17SPeter Xu bool multifd_queue_page(RAMBlock *block, ram_addr_t offset)
659d32ca5adSJuan Quintela {
660f88f86c4SPeter Xu     MultiFDPages_t *pages;
661d32ca5adSJuan Quintela 
662f88f86c4SPeter Xu retry:
663f88f86c4SPeter Xu     pages = multifd_send_state->pages;
664f88f86c4SPeter Xu 
665f88f86c4SPeter Xu     /* If the queue is empty, we can already enqueue now */
666f88f86c4SPeter Xu     if (multifd_queue_empty(pages)) {
667d32ca5adSJuan Quintela         pages->block = block;
668f88f86c4SPeter Xu         multifd_enqueue(pages, offset);
669d6556d17SPeter Xu         return true;
670d32ca5adSJuan Quintela     }
671d32ca5adSJuan Quintela 
672f88f86c4SPeter Xu     /*
673f88f86c4SPeter Xu      * Not empty, meanwhile we need a flush.  It can because of either:
674f88f86c4SPeter Xu      *
675f88f86c4SPeter Xu      * (1) The page is not on the same ramblock of previous ones, or,
676f88f86c4SPeter Xu      * (2) The queue is full.
677f88f86c4SPeter Xu      *
678f88f86c4SPeter Xu      * After flush, always retry.
679f88f86c4SPeter Xu      */
680f88f86c4SPeter Xu     if (pages->block != block || multifd_queue_full(pages)) {
6813b40964aSPeter Xu         if (!multifd_send_pages()) {
682d6556d17SPeter Xu             return false;
683d32ca5adSJuan Quintela         }
684f88f86c4SPeter Xu         goto retry;
685d32ca5adSJuan Quintela     }
686d32ca5adSJuan Quintela 
687f88f86c4SPeter Xu     /* Not empty, and we still have space, do it! */
688f88f86c4SPeter Xu     multifd_enqueue(pages, offset);
689d6556d17SPeter Xu     return true;
690d32ca5adSJuan Quintela }
691d32ca5adSJuan Quintela 
6923ab4441dSPeter Xu /* Multifd send side hit an error; remember it and prepare to quit */
multifd_send_set_error(Error * err)6933ab4441dSPeter Xu static void multifd_send_set_error(Error *err)
694d32ca5adSJuan Quintela {
69515f3f21dSPeter Xu     /*
69615f3f21dSPeter Xu      * We don't want to exit each threads twice.  Depending on where
69715f3f21dSPeter Xu      * we get the error, or if there are two independent errors in two
69815f3f21dSPeter Xu      * threads at the same time, we can end calling this function
69915f3f21dSPeter Xu      * twice.
70015f3f21dSPeter Xu      */
70115f3f21dSPeter Xu     if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
70215f3f21dSPeter Xu         return;
70315f3f21dSPeter Xu     }
70415f3f21dSPeter Xu 
705d32ca5adSJuan Quintela     if (err) {
706d32ca5adSJuan Quintela         MigrationState *s = migrate_get_current();
707d32ca5adSJuan Quintela         migrate_set_error(s, err);
708d32ca5adSJuan Quintela         if (s->state == MIGRATION_STATUS_SETUP ||
709d32ca5adSJuan Quintela             s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
710d32ca5adSJuan Quintela             s->state == MIGRATION_STATUS_DEVICE ||
711d32ca5adSJuan Quintela             s->state == MIGRATION_STATUS_ACTIVE) {
712d32ca5adSJuan Quintela             migrate_set_state(&s->state, s->state,
713d32ca5adSJuan Quintela                               MIGRATION_STATUS_FAILED);
714d32ca5adSJuan Quintela         }
715d32ca5adSJuan Quintela     }
7163ab4441dSPeter Xu }
717d32ca5adSJuan Quintela 
multifd_send_terminate_threads(void)7183ab4441dSPeter Xu static void multifd_send_terminate_threads(void)
7193ab4441dSPeter Xu {
7203ab4441dSPeter Xu     int i;
7213ab4441dSPeter Xu 
7223ab4441dSPeter Xu     trace_multifd_send_terminate_threads();
7233ab4441dSPeter Xu 
7243ab4441dSPeter Xu     /*
7253ab4441dSPeter Xu      * Tell everyone we're quitting.  No xchg() needed here; we simply
7263ab4441dSPeter Xu      * always set it.
7273ab4441dSPeter Xu      */
7283ab4441dSPeter Xu     qatomic_set(&multifd_send_state->exiting, 1);
72912808db3SPeter Xu 
73012808db3SPeter Xu     /*
73112808db3SPeter Xu      * Firstly, kick all threads out; no matter whether they are just idle,
73212808db3SPeter Xu      * or blocked in an IO system call.
73312808db3SPeter Xu      */
734d32ca5adSJuan Quintela     for (i = 0; i < migrate_multifd_channels(); i++) {
735d32ca5adSJuan Quintela         MultiFDSendParams *p = &multifd_send_state->params[i];
736d32ca5adSJuan Quintela 
737d32ca5adSJuan Quintela         qemu_sem_post(&p->sem);
738077fbb59SLi Zhang         if (p->c) {
739077fbb59SLi Zhang             qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
740077fbb59SLi Zhang         }
741d32ca5adSJuan Quintela     }
74212808db3SPeter Xu 
74312808db3SPeter Xu     /*
74412808db3SPeter Xu      * Finally recycle all the threads.
74512808db3SPeter Xu      */
74612808db3SPeter Xu     for (i = 0; i < migrate_multifd_channels(); i++) {
74712808db3SPeter Xu         MultiFDSendParams *p = &multifd_send_state->params[i];
74812808db3SPeter Xu 
749e1921f10SFabiano Rosas         if (p->tls_thread_created) {
750e1921f10SFabiano Rosas             qemu_thread_join(&p->tls_thread);
751e1921f10SFabiano Rosas         }
752e1921f10SFabiano Rosas 
753a2a63c4aSFabiano Rosas         if (p->thread_created) {
75412808db3SPeter Xu             qemu_thread_join(&p->thread);
75512808db3SPeter Xu         }
75612808db3SPeter Xu     }
757d32ca5adSJuan Quintela }
758d32ca5adSJuan Quintela 
multifd_send_cleanup_channel(MultiFDSendParams * p,Error ** errp)75912808db3SPeter Xu static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
760d32ca5adSJuan Quintela {
7610518b5d8SPeter Xu     if (p->c) {
76220171ea8SLukas Straub         migration_ioc_unregister_yank(p->c);
7631a6e217cSPeter Xu         /*
76461dec060SFabiano Rosas          * The object_unref() cannot guarantee the fd will always be
76561dec060SFabiano Rosas          * released because finalize() of the iochannel is only
76661dec060SFabiano Rosas          * triggered on the last reference and it's not guaranteed
76761dec060SFabiano Rosas          * that we always hold the last refcount when reaching here.
7681a6e217cSPeter Xu          *
76961dec060SFabiano Rosas          * Closing the fd explicitly has the benefit that if there is any
77061dec060SFabiano Rosas          * registered I/O handler callbacks on such fd, that will get a
77161dec060SFabiano Rosas          * POLLNVAL event and will further trigger the cleanup to finally
77261dec060SFabiano Rosas          * release the IOC.
77361dec060SFabiano Rosas          *
77461dec060SFabiano Rosas          * FIXME: It should logically be guaranteed that all multifd
77561dec060SFabiano Rosas          * channels have no I/O handler callback registered when reaching
77661dec060SFabiano Rosas          * here, because migration thread will wait for all multifd channel
77761dec060SFabiano Rosas          * establishments to complete during setup.  Since
77861dec060SFabiano Rosas          * migrate_fd_cleanup() will be scheduled in main thread too, all
77961dec060SFabiano Rosas          * previous callbacks should guarantee to be completed when
78061dec060SFabiano Rosas          * reaching here.  See multifd_send_state.channels_created and its
78161dec060SFabiano Rosas          * usage.  In the future, we could replace this with an assert
78261dec060SFabiano Rosas          * making sure we're the last reference, or simply drop it if above
78361dec060SFabiano Rosas          * is more clear to be justified.
7841a6e217cSPeter Xu          */
785b7b03eb6SFabiano Rosas         qio_channel_close(p->c, &error_abort);
786c9a7e83cSPeter Xu         object_unref(OBJECT(p->c));
787d32ca5adSJuan Quintela         p->c = NULL;
7880518b5d8SPeter Xu     }
789d32ca5adSJuan Quintela     qemu_sem_destroy(&p->sem);
790d32ca5adSJuan Quintela     qemu_sem_destroy(&p->sem_sync);
791d32ca5adSJuan Quintela     g_free(p->name);
792d32ca5adSJuan Quintela     p->name = NULL;
793d32ca5adSJuan Quintela     multifd_pages_clear(p->pages);
794d32ca5adSJuan Quintela     p->pages = NULL;
795d32ca5adSJuan Quintela     p->packet_len = 0;
796d32ca5adSJuan Quintela     g_free(p->packet);
797d32ca5adSJuan Quintela     p->packet = NULL;
79812808db3SPeter Xu     multifd_send_state->ops->send_cleanup(p, errp);
79912808db3SPeter Xu 
80012808db3SPeter Xu     return *errp == NULL;
801ab7cbb0bSJuan Quintela }
80212808db3SPeter Xu 
multifd_send_cleanup_state(void)80312808db3SPeter Xu static void multifd_send_cleanup_state(void)
80412808db3SPeter Xu {
805b7b03eb6SFabiano Rosas     file_cleanup_outgoing_migration();
80672b90b96SPeter Xu     socket_cleanup_outgoing_migration();
80793fa9dc2SFabiano Rosas     qemu_sem_destroy(&multifd_send_state->channels_created);
808d32ca5adSJuan Quintela     qemu_sem_destroy(&multifd_send_state->channels_ready);
809d32ca5adSJuan Quintela     g_free(multifd_send_state->params);
810d32ca5adSJuan Quintela     multifd_send_state->params = NULL;
811d32ca5adSJuan Quintela     multifd_pages_clear(multifd_send_state->pages);
812d32ca5adSJuan Quintela     multifd_send_state->pages = NULL;
813d32ca5adSJuan Quintela     g_free(multifd_send_state);
814d32ca5adSJuan Quintela     multifd_send_state = NULL;
815d32ca5adSJuan Quintela }
816d32ca5adSJuan Quintela 
multifd_send_shutdown(void)817cde85c37SPeter Xu void multifd_send_shutdown(void)
81812808db3SPeter Xu {
81912808db3SPeter Xu     int i;
82012808db3SPeter Xu 
82112808db3SPeter Xu     if (!migrate_multifd()) {
82212808db3SPeter Xu         return;
82312808db3SPeter Xu     }
82412808db3SPeter Xu 
82512808db3SPeter Xu     multifd_send_terminate_threads();
82612808db3SPeter Xu 
82712808db3SPeter Xu     for (i = 0; i < migrate_multifd_channels(); i++) {
82812808db3SPeter Xu         MultiFDSendParams *p = &multifd_send_state->params[i];
82912808db3SPeter Xu         Error *local_err = NULL;
83012808db3SPeter Xu 
83112808db3SPeter Xu         if (!multifd_send_cleanup_channel(p, &local_err)) {
83212808db3SPeter Xu             migrate_set_error(migrate_get_current(), local_err);
83312808db3SPeter Xu             error_free(local_err);
83412808db3SPeter Xu         }
83512808db3SPeter Xu     }
83612808db3SPeter Xu 
83712808db3SPeter Xu     multifd_send_cleanup_state();
83812808db3SPeter Xu }
83912808db3SPeter Xu 
/*
 * Flush pending zero-copy writes on @c.
 *
 * Returns -1 on failure (the error is reported), otherwise the value from
 * qio_channel_flush().  A result of 1 is counted in
 * mig_stats.dirty_sync_missed_zero_copy.
 */
static int multifd_zero_copy_flush(QIOChannel *c)
{
    Error *err = NULL;
    int ret = qio_channel_flush(c, &err);

    if (ret == 1) {
        stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
    } else if (ret < 0) {
        error_report_err(err);
        return -1;
    }

    return ret;
}
8564cc47b43SLeonardo Bras 
multifd_send_sync_main(void)8579346fa18SFabiano Rosas int multifd_send_sync_main(void)
858d32ca5adSJuan Quintela {
859d32ca5adSJuan Quintela     int i;
8605b1d9babSLeonardo Bras     bool flush_zero_copy;
861d32ca5adSJuan Quintela 
86251b07548SJuan Quintela     if (!migrate_multifd()) {
86333d70973SLeonardo Bras         return 0;
864d32ca5adSJuan Quintela     }
86590a3d2f9SJuan Quintela     if (multifd_send_state->pages->num) {
8663b40964aSPeter Xu         if (!multifd_send_pages()) {
867d32ca5adSJuan Quintela             error_report("%s: multifd_send_pages fail", __func__);
86833d70973SLeonardo Bras             return -1;
869d32ca5adSJuan Quintela         }
870d32ca5adSJuan Quintela     }
8715b1d9babSLeonardo Bras 
872b4bc342cSJuan Quintela     flush_zero_copy = migrate_zero_copy_send();
8735b1d9babSLeonardo Bras 
874d32ca5adSJuan Quintela     for (i = 0; i < migrate_multifd_channels(); i++) {
875d32ca5adSJuan Quintela         MultiFDSendParams *p = &multifd_send_state->params[i];
876d32ca5adSJuan Quintela 
87715f3f21dSPeter Xu         if (multifd_send_should_exit()) {
87833d70973SLeonardo Bras             return -1;
879d32ca5adSJuan Quintela         }
880d32ca5adSJuan Quintela 
88115f3f21dSPeter Xu         trace_multifd_send_sync_main_signal(p->id);
88215f3f21dSPeter Xu 
883f5f48a78SPeter Xu         /*
884f5f48a78SPeter Xu          * We should be the only user so far, so not possible to be set by
885f5f48a78SPeter Xu          * others concurrently.
886f5f48a78SPeter Xu          */
887f5f48a78SPeter Xu         assert(qatomic_read(&p->pending_sync) == false);
888f5f48a78SPeter Xu         qatomic_set(&p->pending_sync, true);
889d32ca5adSJuan Quintela         qemu_sem_post(&p->sem);
890d32ca5adSJuan Quintela     }
891d32ca5adSJuan Quintela     for (i = 0; i < migrate_multifd_channels(); i++) {
892d32ca5adSJuan Quintela         MultiFDSendParams *p = &multifd_send_state->params[i];
893d32ca5adSJuan Quintela 
89415f3f21dSPeter Xu         if (multifd_send_should_exit()) {
89515f3f21dSPeter Xu             return -1;
89615f3f21dSPeter Xu         }
89715f3f21dSPeter Xu 
898d2026ee1SJuan Quintela         qemu_sem_wait(&multifd_send_state->channels_ready);
899d32ca5adSJuan Quintela         trace_multifd_send_sync_main_wait(p->id);
900d32ca5adSJuan Quintela         qemu_sem_wait(&p->sem_sync);
901ebfc5787SZhenzhong Duan 
902ebfc5787SZhenzhong Duan         if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
903ebfc5787SZhenzhong Duan             return -1;
904ebfc5787SZhenzhong Duan         }
905d32ca5adSJuan Quintela     }
906d32ca5adSJuan Quintela     trace_multifd_send_sync_main(multifd_send_state->packet_num);
90733d70973SLeonardo Bras 
90833d70973SLeonardo Bras     return 0;
909d32ca5adSJuan Quintela }
910d32ca5adSJuan Quintela 
multifd_send_thread(void * opaque)911d32ca5adSJuan Quintela static void *multifd_send_thread(void *opaque)
912d32ca5adSJuan Quintela {
913d32ca5adSJuan Quintela     MultiFDSendParams *p = opaque;
9141b1f4ab6SJiang Jiacheng     MigrationThread *thread = NULL;
915d32ca5adSJuan Quintela     Error *local_err = NULL;
916d32ca5adSJuan Quintela     int ret = 0;
91706833d83SFabiano Rosas     bool use_packets = multifd_use_packets();
918d32ca5adSJuan Quintela 
919788fa680SFabiano Rosas     thread = migration_threads_add(p->name, qemu_get_thread_id());
9201b1f4ab6SJiang Jiacheng 
921d32ca5adSJuan Quintela     trace_multifd_send_thread_start(p->id);
922d32ca5adSJuan Quintela     rcu_register_thread();
923d32ca5adSJuan Quintela 
92406833d83SFabiano Rosas     if (use_packets) {
925d32ca5adSJuan Quintela         if (multifd_send_initial_packet(p, &local_err) < 0) {
926d32ca5adSJuan Quintela             ret = -1;
927d32ca5adSJuan Quintela             goto out;
928d32ca5adSJuan Quintela         }
92906833d83SFabiano Rosas     }
930d32ca5adSJuan Quintela 
931d32ca5adSJuan Quintela     while (true) {
932d2026ee1SJuan Quintela         qemu_sem_post(&multifd_send_state->channels_ready);
933d32ca5adSJuan Quintela         qemu_sem_wait(&p->sem);
934d32ca5adSJuan Quintela 
93515f3f21dSPeter Xu         if (multifd_send_should_exit()) {
936d32ca5adSJuan Quintela             break;
937d32ca5adSJuan Quintela         }
938d32ca5adSJuan Quintela 
939488c84acSPeter Xu         /*
940488c84acSPeter Xu          * Read pending_job flag before p->pages.  Pairs with the
941488c84acSPeter Xu          * qatomic_store_release() in multifd_send_pages().
942488c84acSPeter Xu          */
943488c84acSPeter Xu         if (qatomic_load_acquire(&p->pending_job)) {
944efd8c543SPeter Xu             MultiFDPages_t *pages = p->pages;
945d32ca5adSJuan Quintela 
946b7dbdd8eSLeonardo Bras             p->iovs_num = 0;
94783c560fbSPeter Xu             assert(pages->num);
94883c560fbSPeter Xu 
94902fb8104SJuan Quintela             ret = multifd_send_state->ops->send_prepare(p, &local_err);
950ab7cbb0bSJuan Quintela             if (ret != 0) {
951ab7cbb0bSJuan Quintela                 break;
952ab7cbb0bSJuan Quintela             }
95383c560fbSPeter Xu 
954f427d90bSFabiano Rosas             if (migrate_mapped_ram()) {
955f427d90bSFabiano Rosas                 ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
956f427d90bSFabiano Rosas                                               p->pages->block, &local_err);
957f427d90bSFabiano Rosas             } else {
958f427d90bSFabiano Rosas                 ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
959f427d90bSFabiano Rosas                                                   NULL, 0, p->write_flags,
960f427d90bSFabiano Rosas                                                   &local_err);
961f427d90bSFabiano Rosas             }
962f427d90bSFabiano Rosas 
963d32ca5adSJuan Quintela             if (ret != 0) {
964d32ca5adSJuan Quintela                 break;
965d32ca5adSJuan Quintela             }
966d32ca5adSJuan Quintela 
96768b6e000SElena Ufimtseva             stat64_add(&mig_stats.multifd_bytes,
96868b6e000SElena Ufimtseva                        p->next_packet_size + p->packet_len);
969303e6f54SHao Xiang             stat64_add(&mig_stats.normal_pages, pages->normal_num);
970303e6f54SHao Xiang             stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num);
971836eca47SPeter Xu 
972836eca47SPeter Xu             multifd_pages_reset(p->pages);
9731618f552SElena Ufimtseva             p->next_packet_size = 0;
974488c84acSPeter Xu 
975488c84acSPeter Xu             /*
976488c84acSPeter Xu              * Making sure p->pages is published before saying "we're
977488c84acSPeter Xu              * free".  Pairs with the smp_mb_acquire() in
978488c84acSPeter Xu              * multifd_send_pages().
979488c84acSPeter Xu              */
980488c84acSPeter Xu             qatomic_store_release(&p->pending_job, false);
981859ebaf3SPeter Xu         } else {
982488c84acSPeter Xu             /*
983488c84acSPeter Xu              * If not a normal job, must be a sync request.  Note that
984488c84acSPeter Xu              * pending_sync is a standalone flag (unlike pending_job), so
985488c84acSPeter Xu              * it doesn't require explicit memory barriers.
986488c84acSPeter Xu              */
987859ebaf3SPeter Xu             assert(qatomic_read(&p->pending_sync));
98806833d83SFabiano Rosas 
98906833d83SFabiano Rosas             if (use_packets) {
990f5f48a78SPeter Xu                 p->flags = MULTIFD_FLAG_SYNC;
991f5f48a78SPeter Xu                 multifd_send_fill_packet(p);
992f5f48a78SPeter Xu                 ret = qio_channel_write_all(p->c, (void *)p->packet,
993f5f48a78SPeter Xu                                             p->packet_len, &local_err);
994f5f48a78SPeter Xu                 if (ret != 0) {
995f5f48a78SPeter Xu                     break;
996d32ca5adSJuan Quintela                 }
997f5f48a78SPeter Xu                 /* p->next_packet_size will always be zero for a SYNC packet */
998f5f48a78SPeter Xu                 stat64_add(&mig_stats.multifd_bytes, p->packet_len);
999f5f48a78SPeter Xu                 p->flags = 0;
100006833d83SFabiano Rosas             }
100106833d83SFabiano Rosas 
1002f5f48a78SPeter Xu             qatomic_set(&p->pending_sync, false);
1003f5f48a78SPeter Xu             qemu_sem_post(&p->sem_sync);
1004d32ca5adSJuan Quintela         }
1005d32ca5adSJuan Quintela     }
1006d32ca5adSJuan Quintela 
1007d32ca5adSJuan Quintela out:
1008ee8a7c9cSFabiano Rosas     if (ret) {
1009ee8a7c9cSFabiano Rosas         assert(local_err);
1010d32ca5adSJuan Quintela         trace_multifd_send_error(p->id);
10113ab4441dSPeter Xu         multifd_send_set_error(local_err);
101248c0f5d5SPeter Xu         multifd_send_kick_main(p);
1013ee8a7c9cSFabiano Rosas         error_free(local_err);
1014d32ca5adSJuan Quintela     }
1015d32ca5adSJuan Quintela 
1016d32ca5adSJuan Quintela     rcu_unregister_thread();
1017788fa680SFabiano Rosas     migration_threads_remove(thread);
1018303e6f54SHao Xiang     trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages,
1019303e6f54SHao Xiang                                   p->total_zero_pages);
1020d32ca5adSJuan Quintela 
1021d32ca5adSJuan Quintela     return NULL;
1022d32ca5adSJuan Quintela }
1023d32ca5adSJuan Quintela 
10242576ae48SFabiano Rosas static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);
102529647140SChuan Zheng 
10269221e3c6SPeter Xu typedef struct {
10279221e3c6SPeter Xu     MultiFDSendParams *p;
10289221e3c6SPeter Xu     QIOChannelTLS *tioc;
10299221e3c6SPeter Xu } MultiFDTLSThreadArgs;
10309221e3c6SPeter Xu 
multifd_tls_handshake_thread(void * opaque)1031a1af605bSChuan Zheng static void *multifd_tls_handshake_thread(void *opaque)
1032a1af605bSChuan Zheng {
10339221e3c6SPeter Xu     MultiFDTLSThreadArgs *args = opaque;
1034a1af605bSChuan Zheng 
10359221e3c6SPeter Xu     qio_channel_tls_handshake(args->tioc,
10362576ae48SFabiano Rosas                               multifd_new_send_channel_async,
10379221e3c6SPeter Xu                               args->p,
1038a1af605bSChuan Zheng                               NULL,
1039a1af605bSChuan Zheng                               NULL);
10409221e3c6SPeter Xu     g_free(args);
10419221e3c6SPeter Xu 
1042a1af605bSChuan Zheng     return NULL;
1043a1af605bSChuan Zheng }
1044a1af605bSChuan Zheng 
multifd_tls_channel_connect(MultiFDSendParams * p,QIOChannel * ioc,Error ** errp)1045967e3889SFabiano Rosas static bool multifd_tls_channel_connect(MultiFDSendParams *p,
104629647140SChuan Zheng                                         QIOChannel *ioc,
104729647140SChuan Zheng                                         Error **errp)
104829647140SChuan Zheng {
104929647140SChuan Zheng     MigrationState *s = migrate_get_current();
10507f692ec7SPeter Xu     const char *hostname = s->hostname;
10519221e3c6SPeter Xu     MultiFDTLSThreadArgs *args;
105229647140SChuan Zheng     QIOChannelTLS *tioc;
105329647140SChuan Zheng 
10540deb7e9bSJuan Quintela     tioc = migration_tls_client_create(ioc, hostname, errp);
105529647140SChuan Zheng     if (!tioc) {
1056967e3889SFabiano Rosas         return false;
105729647140SChuan Zheng     }
105829647140SChuan Zheng 
10592576ae48SFabiano Rosas     /*
10602576ae48SFabiano Rosas      * Ownership of the socket channel now transfers to the newly
10612576ae48SFabiano Rosas      * created TLS channel, which has already taken a reference.
10622576ae48SFabiano Rosas      */
10639e842408SChuan Zheng     object_unref(OBJECT(ioc));
1064894f0214SChuan Zheng     trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
106529647140SChuan Zheng     qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");
10669221e3c6SPeter Xu 
10679221e3c6SPeter Xu     args = g_new0(MultiFDTLSThreadArgs, 1);
10689221e3c6SPeter Xu     args->tioc = tioc;
10699221e3c6SPeter Xu     args->p = p;
1070e1921f10SFabiano Rosas 
1071e1921f10SFabiano Rosas     p->tls_thread_created = true;
107260ce4767SPeter Xu     qemu_thread_create(&p->tls_thread, "mig/src/tls",
10739221e3c6SPeter Xu                        multifd_tls_handshake_thread, args,
1074a1af605bSChuan Zheng                        QEMU_THREAD_JOINABLE);
1075967e3889SFabiano Rosas     return true;
107629647140SChuan Zheng }
107729647140SChuan Zheng 
/*
 * Finish setting up a sender channel on @ioc: configure the channel,
 * register it for yank, publish it as p->c and start the sender thread.
 */
void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
{
    /* Ask the channel not to delay small writes. */
    qio_channel_set_delay(ioc, false);
    migration_ioc_register_yank(ioc);

    /* The channel is completely set up only now, so publish it. */
    p->c = ioc;

    /* Recorded before creation so the thread is joined on termination. */
    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
                       QEMU_THREAD_JOINABLE);
}
109029647140SChuan Zheng 
10912576ae48SFabiano Rosas /*
10922576ae48SFabiano Rosas  * When TLS is enabled this function is called once to establish the
10932576ae48SFabiano Rosas  * TLS connection and a second time after the TLS handshake to create
10942576ae48SFabiano Rosas  * the multifd channel. Without TLS it goes straight into the channel
10952576ae48SFabiano Rosas  * creation.
10962576ae48SFabiano Rosas  */
multifd_new_send_channel_async(QIOTask * task,gpointer opaque)1097d32ca5adSJuan Quintela static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
1098d32ca5adSJuan Quintela {
1099d32ca5adSJuan Quintela     MultiFDSendParams *p = opaque;
11000e92f644SFabiano Rosas     QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
1101d32ca5adSJuan Quintela     Error *local_err = NULL;
11022576ae48SFabiano Rosas     bool ret;
1103d32ca5adSJuan Quintela 
1104d32ca5adSJuan Quintela     trace_multifd_new_send_channel_async(p->id);
11052576ae48SFabiano Rosas 
11062576ae48SFabiano Rosas     if (qio_task_propagate_error(task, &local_err)) {
11072576ae48SFabiano Rosas         ret = false;
11082576ae48SFabiano Rosas         goto out;
1109bca762c2SLi Zhang     }
111003c7a42dSChuan Zheng 
11112576ae48SFabiano Rosas     trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
11122576ae48SFabiano Rosas                                        migrate_get_current()->hostname);
11132576ae48SFabiano Rosas 
11142576ae48SFabiano Rosas     if (migrate_channel_requires_tls_upgrade(ioc)) {
11152576ae48SFabiano Rosas         ret = multifd_tls_channel_connect(p, ioc, &local_err);
111693fa9dc2SFabiano Rosas         if (ret) {
111793fa9dc2SFabiano Rosas             return;
111893fa9dc2SFabiano Rosas         }
11192576ae48SFabiano Rosas     } else {
1120770de49cSPeter Xu         multifd_channel_connect(p, ioc);
1121770de49cSPeter Xu         ret = true;
11222576ae48SFabiano Rosas     }
11232576ae48SFabiano Rosas 
112493fa9dc2SFabiano Rosas out:
112593fa9dc2SFabiano Rosas     /*
112693fa9dc2SFabiano Rosas      * Here we're not interested whether creation succeeded, only that
112793fa9dc2SFabiano Rosas      * it happened at all.
112893fa9dc2SFabiano Rosas      */
1129a8a3e710SFabiano Rosas     multifd_send_channel_created();
113093fa9dc2SFabiano Rosas 
11312576ae48SFabiano Rosas     if (ret) {
11322576ae48SFabiano Rosas         return;
11332576ae48SFabiano Rosas     }
11342576ae48SFabiano Rosas 
1135967e3889SFabiano Rosas     trace_multifd_new_send_channel_async_error(p->id, local_err);
11363ab4441dSPeter Xu     multifd_send_set_error(local_err);
11372576ae48SFabiano Rosas     /*
11389221e3c6SPeter Xu      * For error cases (TLS or non-TLS), IO channel is always freed here
11399221e3c6SPeter Xu      * rather than when cleanup multifd: since p->c is not set, multifd
11409221e3c6SPeter Xu      * cleanup code doesn't even know its existence.
11412576ae48SFabiano Rosas      */
114215f3f21dSPeter Xu     object_unref(OBJECT(ioc));
114315f3f21dSPeter Xu     error_free(local_err);
11440e92f644SFabiano Rosas }
11450e92f644SFabiano Rosas 
multifd_new_send_channel_create(gpointer opaque,Error ** errp)1146b7b03eb6SFabiano Rosas static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
11470e92f644SFabiano Rosas {
1148b7b03eb6SFabiano Rosas     if (!multifd_use_packets()) {
1149b7b03eb6SFabiano Rosas         return file_send_channel_create(opaque, errp);
1150b7b03eb6SFabiano Rosas     }
1151b7b03eb6SFabiano Rosas 
11520e92f644SFabiano Rosas     socket_send_channel_create(multifd_new_send_channel_async, opaque);
1153b7b03eb6SFabiano Rosas     return true;
1154d32ca5adSJuan Quintela }
1155d32ca5adSJuan Quintela 
multifd_send_setup(void)1156bd8b0a8fSFabiano Rosas bool multifd_send_setup(void)
1157d32ca5adSJuan Quintela {
1158bd8b0a8fSFabiano Rosas     MigrationState *s = migrate_get_current();
1159bd8b0a8fSFabiano Rosas     int thread_count, ret = 0;
1160d32ca5adSJuan Quintela     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
116106833d83SFabiano Rosas     bool use_packets = multifd_use_packets();
1162d32ca5adSJuan Quintela     uint8_t i;
1163d32ca5adSJuan Quintela 
116451b07548SJuan Quintela     if (!migrate_multifd()) {
1165bd8b0a8fSFabiano Rosas         return true;
1166d32ca5adSJuan Quintela     }
1167b7acd657SLi Zhijian 
1168d32ca5adSJuan Quintela     thread_count = migrate_multifd_channels();
1169d32ca5adSJuan Quintela     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
1170d32ca5adSJuan Quintela     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
1171d32ca5adSJuan Quintela     multifd_send_state->pages = multifd_pages_init(page_count);
117293fa9dc2SFabiano Rosas     qemu_sem_init(&multifd_send_state->channels_created, 0);
1173d32ca5adSJuan Quintela     qemu_sem_init(&multifd_send_state->channels_ready, 0);
1174d73415a3SStefan Hajnoczi     qatomic_set(&multifd_send_state->exiting, 0);
1175ab7cbb0bSJuan Quintela     multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
1176d32ca5adSJuan Quintela 
1177d32ca5adSJuan Quintela     for (i = 0; i < thread_count; i++) {
1178d32ca5adSJuan Quintela         MultiFDSendParams *p = &multifd_send_state->params[i];
11790bd5b928SFabiano Rosas         Error *local_err = NULL;
1180d32ca5adSJuan Quintela 
1181d32ca5adSJuan Quintela         qemu_sem_init(&p->sem, 0);
1182d32ca5adSJuan Quintela         qemu_sem_init(&p->sem_sync, 0);
1183d32ca5adSJuan Quintela         p->id = i;
1184d32ca5adSJuan Quintela         p->pages = multifd_pages_init(page_count);
118506833d83SFabiano Rosas 
118606833d83SFabiano Rosas         if (use_packets) {
1187d32ca5adSJuan Quintela             p->packet_len = sizeof(MultiFDPacket_t)
1188d32ca5adSJuan Quintela                           + sizeof(uint64_t) * page_count;
1189d32ca5adSJuan Quintela             p->packet = g_malloc0(p->packet_len);
1190d32ca5adSJuan Quintela             p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
1191d32ca5adSJuan Quintela             p->packet->version = cpu_to_be32(MULTIFD_VERSION);
119206833d83SFabiano Rosas         }
119360ce4767SPeter Xu         p->name = g_strdup_printf("mig/src/send_%d", i);
1194ddec20f8SJuan Quintela         p->page_size = qemu_target_page_size();
1195d6f45ebaSJuan Quintela         p->page_count = page_count;
11965b1d9babSLeonardo Bras         p->write_flags = 0;
1197b7b03eb6SFabiano Rosas 
1198b7b03eb6SFabiano Rosas         if (!multifd_new_send_channel_create(p, &local_err)) {
11990bd5b928SFabiano Rosas             migrate_set_error(s, local_err);
12000bd5b928SFabiano Rosas             ret = -1;
1201b7b03eb6SFabiano Rosas         }
1202d32ca5adSJuan Quintela     }
1203ab7cbb0bSJuan Quintela 
120493fa9dc2SFabiano Rosas     /*
120593fa9dc2SFabiano Rosas      * Wait until channel creation has started for all channels. The
120693fa9dc2SFabiano Rosas      * creation can still fail, but no more channels will be created
120793fa9dc2SFabiano Rosas      * past this point.
120893fa9dc2SFabiano Rosas      */
120993fa9dc2SFabiano Rosas     for (i = 0; i < thread_count; i++) {
121093fa9dc2SFabiano Rosas         qemu_sem_wait(&multifd_send_state->channels_created);
121193fa9dc2SFabiano Rosas     }
121293fa9dc2SFabiano Rosas 
12130bd5b928SFabiano Rosas     if (ret) {
12140bd5b928SFabiano Rosas         goto err;
12150bd5b928SFabiano Rosas     }
12160bd5b928SFabiano Rosas 
1217ab7cbb0bSJuan Quintela     for (i = 0; i < thread_count; i++) {
1218ab7cbb0bSJuan Quintela         MultiFDSendParams *p = &multifd_send_state->params[i];
12190bd5b928SFabiano Rosas         Error *local_err = NULL;
1220ab7cbb0bSJuan Quintela 
1221bd8b0a8fSFabiano Rosas         ret = multifd_send_state->ops->send_setup(p, &local_err);
1222ab7cbb0bSJuan Quintela         if (ret) {
1223bd8b0a8fSFabiano Rosas             migrate_set_error(s, local_err);
12240bd5b928SFabiano Rosas             goto err;
12250bd5b928SFabiano Rosas         }
1226bd8b0a8fSFabiano Rosas     }
1227bd8b0a8fSFabiano Rosas 
1228bd8b0a8fSFabiano Rosas     return true;
12290bd5b928SFabiano Rosas 
12300bd5b928SFabiano Rosas err:
12310bd5b928SFabiano Rosas     migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
12320bd5b928SFabiano Rosas                       MIGRATION_STATUS_FAILED);
12330bd5b928SFabiano Rosas     return false;
1234d32ca5adSJuan Quintela }
1235d32ca5adSJuan Quintela 
multifd_recv(void)1236d117ed06SFabiano Rosas bool multifd_recv(void)
1237d117ed06SFabiano Rosas {
1238d117ed06SFabiano Rosas     int i;
1239d117ed06SFabiano Rosas     static int next_recv_channel;
1240d117ed06SFabiano Rosas     MultiFDRecvParams *p = NULL;
1241d117ed06SFabiano Rosas     MultiFDRecvData *data = multifd_recv_state->data;
1242d117ed06SFabiano Rosas 
1243d117ed06SFabiano Rosas     /*
1244d117ed06SFabiano Rosas      * next_channel can remain from a previous migration that was
1245d117ed06SFabiano Rosas      * using more channels, so ensure it doesn't overflow if the
1246d117ed06SFabiano Rosas      * limit is lower now.
1247d117ed06SFabiano Rosas      */
1248d117ed06SFabiano Rosas     next_recv_channel %= migrate_multifd_channels();
1249d117ed06SFabiano Rosas     for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
1250d117ed06SFabiano Rosas         if (multifd_recv_should_exit()) {
1251d117ed06SFabiano Rosas             return false;
1252d117ed06SFabiano Rosas         }
1253d117ed06SFabiano Rosas 
1254d117ed06SFabiano Rosas         p = &multifd_recv_state->params[i];
1255d117ed06SFabiano Rosas 
1256d117ed06SFabiano Rosas         if (qatomic_read(&p->pending_job) == false) {
1257d117ed06SFabiano Rosas             next_recv_channel = (i + 1) % migrate_multifd_channels();
1258d117ed06SFabiano Rosas             break;
1259d117ed06SFabiano Rosas         }
1260d117ed06SFabiano Rosas     }
1261d117ed06SFabiano Rosas 
1262d117ed06SFabiano Rosas     /*
1263d117ed06SFabiano Rosas      * Order pending_job read before manipulating p->data below. Pairs
1264d117ed06SFabiano Rosas      * with qatomic_store_release() at multifd_recv_thread().
1265d117ed06SFabiano Rosas      */
1266d117ed06SFabiano Rosas     smp_mb_acquire();
1267d117ed06SFabiano Rosas 
1268d117ed06SFabiano Rosas     assert(!p->data->size);
1269d117ed06SFabiano Rosas     multifd_recv_state->data = p->data;
1270d117ed06SFabiano Rosas     p->data = data;
1271d117ed06SFabiano Rosas 
1272d117ed06SFabiano Rosas     /*
1273d117ed06SFabiano Rosas      * Order p->data update before setting pending_job. Pairs with
1274d117ed06SFabiano Rosas      * qatomic_load_acquire() at multifd_recv_thread().
1275d117ed06SFabiano Rosas      */
1276d117ed06SFabiano Rosas     qatomic_store_release(&p->pending_job, true);
1277d117ed06SFabiano Rosas     qemu_sem_post(&p->sem);
1278d117ed06SFabiano Rosas 
1279d117ed06SFabiano Rosas     return true;
1280d117ed06SFabiano Rosas }
1281d117ed06SFabiano Rosas 
multifd_get_recv_data(void)1282d117ed06SFabiano Rosas MultiFDRecvData *multifd_get_recv_data(void)
1283d117ed06SFabiano Rosas {
1284d117ed06SFabiano Rosas     return multifd_recv_state->data;
1285d117ed06SFabiano Rosas }
1286d117ed06SFabiano Rosas 
multifd_recv_terminate_threads(Error * err)1287d32ca5adSJuan Quintela static void multifd_recv_terminate_threads(Error *err)
1288d32ca5adSJuan Quintela {
1289d32ca5adSJuan Quintela     int i;
1290d32ca5adSJuan Quintela 
1291d32ca5adSJuan Quintela     trace_multifd_recv_terminate_threads(err != NULL);
1292d32ca5adSJuan Quintela 
129311dd7be5SFabiano Rosas     if (qatomic_xchg(&multifd_recv_state->exiting, 1)) {
129411dd7be5SFabiano Rosas         return;
129511dd7be5SFabiano Rosas     }
129611dd7be5SFabiano Rosas 
1297d32ca5adSJuan Quintela     if (err) {
1298d32ca5adSJuan Quintela         MigrationState *s = migrate_get_current();
1299d32ca5adSJuan Quintela         migrate_set_error(s, err);
1300d32ca5adSJuan Quintela         if (s->state == MIGRATION_STATUS_SETUP ||
1301d32ca5adSJuan Quintela             s->state == MIGRATION_STATUS_ACTIVE) {
1302d32ca5adSJuan Quintela             migrate_set_state(&s->state, s->state,
1303d32ca5adSJuan Quintela                               MIGRATION_STATUS_FAILED);
1304d32ca5adSJuan Quintela         }
1305d32ca5adSJuan Quintela     }
1306d32ca5adSJuan Quintela 
1307d32ca5adSJuan Quintela     for (i = 0; i < migrate_multifd_channels(); i++) {
1308d32ca5adSJuan Quintela         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1309d32ca5adSJuan Quintela 
1310d32ca5adSJuan Quintela         /*
1311d117ed06SFabiano Rosas          * The migration thread and channels interact differently
1312d117ed06SFabiano Rosas          * depending on the presence of packets.
1313d13f0026SFabiano Rosas          */
131406833d83SFabiano Rosas         if (multifd_use_packets()) {
1315d117ed06SFabiano Rosas             /*
1316d117ed06SFabiano Rosas              * The channel receives as long as there are packets. When
1317d117ed06SFabiano Rosas              * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
1318d117ed06SFabiano Rosas              * channel waits for the migration thread to sync. If the
1319d117ed06SFabiano Rosas              * sync never happens, do it here.
1320d117ed06SFabiano Rosas              */
1321d13f0026SFabiano Rosas             qemu_sem_post(&p->sem_sync);
1322d117ed06SFabiano Rosas         } else {
1323d117ed06SFabiano Rosas             /*
1324d117ed06SFabiano Rosas              * The channel waits for the migration thread to give it
1325d117ed06SFabiano Rosas              * work. When the migration thread runs out of work, it
1326d117ed06SFabiano Rosas              * releases the channel and waits for any pending work to
1327d117ed06SFabiano Rosas              * finish. If we reach here (e.g. due to error) before the
1328d117ed06SFabiano Rosas              * work runs out, release the channel.
1329d117ed06SFabiano Rosas              */
1330d117ed06SFabiano Rosas             qemu_sem_post(&p->sem);
133106833d83SFabiano Rosas         }
1332d13f0026SFabiano Rosas 
1333d13f0026SFabiano Rosas         /*
1334d32ca5adSJuan Quintela          * We could arrive here for two reasons:
1335d32ca5adSJuan Quintela          *  - normal quit, i.e. everything went fine, just finished
1336d32ca5adSJuan Quintela          *  - error quit: We close the channels so the channel threads
1337d32ca5adSJuan Quintela          *    finish the qio_channel_read_all_eof()
1338d32ca5adSJuan Quintela          */
1339d32ca5adSJuan Quintela         if (p->c) {
1340d32ca5adSJuan Quintela             qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
1341d32ca5adSJuan Quintela         }
1342d32ca5adSJuan Quintela     }
1343d32ca5adSJuan Quintela }
1344d32ca5adSJuan Quintela 
/*
 * Stop the multifd recv threads without recording an error.
 * Does nothing when multifd is disabled.
 */
void multifd_recv_shutdown(void)
{
    if (!migrate_multifd()) {
        return;
    }

    multifd_recv_terminate_threads(NULL);
}
1351cfc3bcf3SLeonardo Bras 
/*
 * Release everything owned by one recv channel.
 *
 * The IO channel is unregistered from yank and our reference to it is
 * dropped. The channel's mutex and semaphores are destroyed and its heap
 * buffers freed, with every pointer left NULL. Finally the compression
 * method's recv_cleanup() hook runs for any method-specific state.
 */
static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
{
    /* IO channel: unregister from yank before dropping the reference. */
    migration_ioc_unregister_yank(p->c);
    object_unref(OBJECT(p->c));
    p->c = NULL;

    /* Synchronization primitives. */
    qemu_mutex_destroy(&p->mutex);
    qemu_sem_destroy(&p->sem_sync);
    qemu_sem_destroy(&p->sem);

    /* Heap-allocated buffers. */
    g_clear_pointer(&p->data, g_free);
    g_clear_pointer(&p->name, g_free);
    p->packet_len = 0;
    g_clear_pointer(&p->packet, g_free);
    g_clear_pointer(&p->normal, g_free);
    g_clear_pointer(&p->zero, g_free);

    /* Method-specific state goes last. */
    multifd_recv_state->ops->recv_cleanup(p);
}
13735e6ea8a1SPeter Xu 
multifd_recv_cleanup_state(void)13745e6ea8a1SPeter Xu static void multifd_recv_cleanup_state(void)
13755e6ea8a1SPeter Xu {
13765e6ea8a1SPeter Xu     qemu_sem_destroy(&multifd_recv_state->sem_sync);
13775e6ea8a1SPeter Xu     g_free(multifd_recv_state->params);
13785e6ea8a1SPeter Xu     multifd_recv_state->params = NULL;
1379d117ed06SFabiano Rosas     g_free(multifd_recv_state->data);
1380d117ed06SFabiano Rosas     multifd_recv_state->data = NULL;
13815e6ea8a1SPeter Xu     g_free(multifd_recv_state);
13825e6ea8a1SPeter Xu     multifd_recv_state = NULL;
13835e6ea8a1SPeter Xu }
13845e6ea8a1SPeter Xu 
multifd_recv_cleanup(void)1385cde85c37SPeter Xu void multifd_recv_cleanup(void)
1386d32ca5adSJuan Quintela {
1387d32ca5adSJuan Quintela     int i;
1388d32ca5adSJuan Quintela 
138951b07548SJuan Quintela     if (!migrate_multifd()) {
1390e5bac1f5SLeonardo Bras         return;
1391d32ca5adSJuan Quintela     }
1392d32ca5adSJuan Quintela     multifd_recv_terminate_threads(NULL);
1393d32ca5adSJuan Quintela     for (i = 0; i < migrate_multifd_channels(); i++) {
1394d32ca5adSJuan Quintela         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1395d32ca5adSJuan Quintela 
1396a2a63c4aSFabiano Rosas         if (p->thread_created) {
139710351fbaSLeonardo Bras             qemu_thread_join(&p->thread);
1398d32ca5adSJuan Quintela         }
1399a2a63c4aSFabiano Rosas     }
1400d32ca5adSJuan Quintela     for (i = 0; i < migrate_multifd_channels(); i++) {
14015e6ea8a1SPeter Xu         multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
1402d32ca5adSJuan Quintela     }
14035e6ea8a1SPeter Xu     multifd_recv_cleanup_state();
1404d32ca5adSJuan Quintela }
1405d32ca5adSJuan Quintela 
multifd_recv_sync_main(void)1406d32ca5adSJuan Quintela void multifd_recv_sync_main(void)
1407d32ca5adSJuan Quintela {
14084aac6b1eSFabiano Rosas     int thread_count = migrate_multifd_channels();
1409a49d15a3SFabiano Rosas     bool file_based = !multifd_use_packets();
1410d32ca5adSJuan Quintela     int i;
1411d32ca5adSJuan Quintela 
1412a49d15a3SFabiano Rosas     if (!migrate_multifd()) {
1413d32ca5adSJuan Quintela         return;
1414d32ca5adSJuan Quintela     }
1415d32ca5adSJuan Quintela 
14164aac6b1eSFabiano Rosas     /*
1417a49d15a3SFabiano Rosas      * File-based channels don't use packets and therefore need to
1418a49d15a3SFabiano Rosas      * wait for more work. Release them to start the sync.
1419a49d15a3SFabiano Rosas      */
1420a49d15a3SFabiano Rosas     if (file_based) {
1421a49d15a3SFabiano Rosas         for (i = 0; i < thread_count; i++) {
1422a49d15a3SFabiano Rosas             MultiFDRecvParams *p = &multifd_recv_state->params[i];
1423a49d15a3SFabiano Rosas 
1424a49d15a3SFabiano Rosas             trace_multifd_recv_sync_main_signal(p->id);
1425a49d15a3SFabiano Rosas             qemu_sem_post(&p->sem);
1426a49d15a3SFabiano Rosas         }
1427a49d15a3SFabiano Rosas     }
1428a49d15a3SFabiano Rosas 
1429a49d15a3SFabiano Rosas     /*
14304aac6b1eSFabiano Rosas      * Initiate the synchronization by waiting for all channels.
1431a49d15a3SFabiano Rosas      *
14324aac6b1eSFabiano Rosas      * For socket-based migration this means each channel has received
14334aac6b1eSFabiano Rosas      * the SYNC packet on the stream.
1434a49d15a3SFabiano Rosas      *
1435a49d15a3SFabiano Rosas      * For file-based migration this means each channel is done with
1436a49d15a3SFabiano Rosas      * the work (pending_job=false).
14374aac6b1eSFabiano Rosas      */
14384aac6b1eSFabiano Rosas     for (i = 0; i < thread_count; i++) {
14394aac6b1eSFabiano Rosas         trace_multifd_recv_sync_main_wait(i);
1440d32ca5adSJuan Quintela         qemu_sem_wait(&multifd_recv_state->sem_sync);
1441d32ca5adSJuan Quintela     }
14424aac6b1eSFabiano Rosas 
1443a49d15a3SFabiano Rosas     if (file_based) {
1444a49d15a3SFabiano Rosas         /*
1445a49d15a3SFabiano Rosas          * For file-based loading is done in one iteration. We're
1446a49d15a3SFabiano Rosas          * done.
1447a49d15a3SFabiano Rosas          */
1448a49d15a3SFabiano Rosas         return;
1449a49d15a3SFabiano Rosas     }
1450a49d15a3SFabiano Rosas 
14514aac6b1eSFabiano Rosas     /*
14524aac6b1eSFabiano Rosas      * Sync done. Release the channels for the next iteration.
14534aac6b1eSFabiano Rosas      */
14544aac6b1eSFabiano Rosas     for (i = 0; i < thread_count; i++) {
1455d32ca5adSJuan Quintela         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1456d32ca5adSJuan Quintela 
14576e8a355dSDaniel Brodsky         WITH_QEMU_LOCK_GUARD(&p->mutex) {
1458d32ca5adSJuan Quintela             if (multifd_recv_state->packet_num < p->packet_num) {
1459d32ca5adSJuan Quintela                 multifd_recv_state->packet_num = p->packet_num;
1460d32ca5adSJuan Quintela             }
14616e8a355dSDaniel Brodsky         }
1462d32ca5adSJuan Quintela         trace_multifd_recv_sync_main_signal(p->id);
1463d32ca5adSJuan Quintela         qemu_sem_post(&p->sem_sync);
1464d32ca5adSJuan Quintela     }
1465d32ca5adSJuan Quintela     trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
1466d32ca5adSJuan Quintela }
1467d32ca5adSJuan Quintela 
multifd_recv_thread(void * opaque)1468d32ca5adSJuan Quintela static void *multifd_recv_thread(void *opaque)
1469d32ca5adSJuan Quintela {
1470d32ca5adSJuan Quintela     MultiFDRecvParams *p = opaque;
1471d32ca5adSJuan Quintela     Error *local_err = NULL;
147206833d83SFabiano Rosas     bool use_packets = multifd_use_packets();
1473d32ca5adSJuan Quintela     int ret;
1474d32ca5adSJuan Quintela 
1475d32ca5adSJuan Quintela     trace_multifd_recv_thread_start(p->id);
1476d32ca5adSJuan Quintela     rcu_register_thread();
1477d32ca5adSJuan Quintela 
1478d32ca5adSJuan Quintela     while (true) {
147906833d83SFabiano Rosas         uint32_t flags = 0;
14809db19125SFabiano Rosas         bool has_data = false;
14819db19125SFabiano Rosas         p->normal_num = 0;
1482d32ca5adSJuan Quintela 
1483d117ed06SFabiano Rosas         if (use_packets) {
148411dd7be5SFabiano Rosas             if (multifd_recv_should_exit()) {
1485d32ca5adSJuan Quintela                 break;
1486d32ca5adSJuan Quintela             }
1487d32ca5adSJuan Quintela 
1488d32ca5adSJuan Quintela             ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
1489d32ca5adSJuan Quintela                                            p->packet_len, &local_err);
1490bca762c2SLi Zhang             if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
1491d32ca5adSJuan Quintela                 break;
1492d32ca5adSJuan Quintela             }
1493d32ca5adSJuan Quintela 
1494d32ca5adSJuan Quintela             qemu_mutex_lock(&p->mutex);
1495d32ca5adSJuan Quintela             ret = multifd_recv_unfill_packet(p, &local_err);
1496d32ca5adSJuan Quintela             if (ret) {
1497d32ca5adSJuan Quintela                 qemu_mutex_unlock(&p->mutex);
1498d32ca5adSJuan Quintela                 break;
1499d32ca5adSJuan Quintela             }
1500d32ca5adSJuan Quintela 
1501d32ca5adSJuan Quintela             flags = p->flags;
1502ab7cbb0bSJuan Quintela             /* recv methods don't know how to handle the SYNC flag */
1503ab7cbb0bSJuan Quintela             p->flags &= ~MULTIFD_FLAG_SYNC;
1504303e6f54SHao Xiang             has_data = p->normal_num || p->zero_num;
1505d32ca5adSJuan Quintela             qemu_mutex_unlock(&p->mutex);
1506d117ed06SFabiano Rosas         } else {
1507d117ed06SFabiano Rosas             /*
1508d117ed06SFabiano Rosas              * No packets, so we need to wait for the vmstate code to
1509d117ed06SFabiano Rosas              * give us work.
1510d117ed06SFabiano Rosas              */
1511d117ed06SFabiano Rosas             qemu_sem_wait(&p->sem);
1512d117ed06SFabiano Rosas 
1513d117ed06SFabiano Rosas             if (multifd_recv_should_exit()) {
1514d117ed06SFabiano Rosas                 break;
1515d117ed06SFabiano Rosas             }
1516d117ed06SFabiano Rosas 
1517d117ed06SFabiano Rosas             /* pairs with qatomic_store_release() at multifd_recv() */
1518d117ed06SFabiano Rosas             if (!qatomic_load_acquire(&p->pending_job)) {
1519d117ed06SFabiano Rosas                 /*
1520d117ed06SFabiano Rosas                  * Migration thread did not send work, this is
1521d117ed06SFabiano Rosas                  * equivalent to pending_sync on the sending
1522d117ed06SFabiano Rosas                  * side. Post sem_sync to notify we reached this
1523d117ed06SFabiano Rosas                  * point.
1524d117ed06SFabiano Rosas                  */
1525d117ed06SFabiano Rosas                 qemu_sem_post(&multifd_recv_state->sem_sync);
1526d117ed06SFabiano Rosas                 continue;
1527d117ed06SFabiano Rosas             }
1528d117ed06SFabiano Rosas 
1529d117ed06SFabiano Rosas             has_data = !!p->data->size;
153006833d83SFabiano Rosas         }
1531d32ca5adSJuan Quintela 
15329db19125SFabiano Rosas         if (has_data) {
15339db19125SFabiano Rosas             ret = multifd_recv_state->ops->recv(p, &local_err);
1534d32ca5adSJuan Quintela             if (ret != 0) {
1535d32ca5adSJuan Quintela                 break;
1536d32ca5adSJuan Quintela             }
1537d32ca5adSJuan Quintela         }
1538d32ca5adSJuan Quintela 
153906833d83SFabiano Rosas         if (use_packets) {
1540d32ca5adSJuan Quintela             if (flags & MULTIFD_FLAG_SYNC) {
1541d32ca5adSJuan Quintela                 qemu_sem_post(&multifd_recv_state->sem_sync);
1542d32ca5adSJuan Quintela                 qemu_sem_wait(&p->sem_sync);
1543d32ca5adSJuan Quintela             }
1544d117ed06SFabiano Rosas         } else {
1545d117ed06SFabiano Rosas             p->total_normal_pages += p->data->size / qemu_target_page_size();
1546d117ed06SFabiano Rosas             p->data->size = 0;
1547d117ed06SFabiano Rosas             /*
1548d117ed06SFabiano Rosas              * Order data->size update before clearing
1549d117ed06SFabiano Rosas              * pending_job. Pairs with smp_mb_acquire() at
1550d117ed06SFabiano Rosas              * multifd_recv().
1551d117ed06SFabiano Rosas              */
1552d117ed06SFabiano Rosas             qatomic_store_release(&p->pending_job, false);
1553d32ca5adSJuan Quintela         }
155406833d83SFabiano Rosas     }
1555d32ca5adSJuan Quintela 
1556d32ca5adSJuan Quintela     if (local_err) {
1557d32ca5adSJuan Quintela         multifd_recv_terminate_threads(local_err);
155813f2cb21SPan Nengyuan         error_free(local_err);
1559d32ca5adSJuan Quintela     }
1560d32ca5adSJuan Quintela 
1561d32ca5adSJuan Quintela     rcu_unregister_thread();
1562303e6f54SHao Xiang     trace_multifd_recv_thread_end(p->id, p->packets_recved,
1563303e6f54SHao Xiang                                   p->total_normal_pages,
1564303e6f54SHao Xiang                                   p->total_zero_pages);
1565d32ca5adSJuan Quintela 
1566d32ca5adSJuan Quintela     return NULL;
1567d32ca5adSJuan Quintela }
1568d32ca5adSJuan Quintela 
multifd_recv_setup(Error ** errp)1569cde85c37SPeter Xu int multifd_recv_setup(Error **errp)
1570d32ca5adSJuan Quintela {
1571d32ca5adSJuan Quintela     int thread_count;
1572d32ca5adSJuan Quintela     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
157306833d83SFabiano Rosas     bool use_packets = multifd_use_packets();
1574d32ca5adSJuan Quintela     uint8_t i;
1575d32ca5adSJuan Quintela 
15766720c2b3Smanish.mishra     /*
15776720c2b3Smanish.mishra      * Return successfully if multiFD recv state is already initialised
15786720c2b3Smanish.mishra      * or multiFD is not enabled.
15796720c2b3Smanish.mishra      */
158051b07548SJuan Quintela     if (multifd_recv_state || !migrate_multifd()) {
1581d32ca5adSJuan Quintela         return 0;
1582d32ca5adSJuan Quintela     }
15836720c2b3Smanish.mishra 
1584d32ca5adSJuan Quintela     thread_count = migrate_multifd_channels();
1585d32ca5adSJuan Quintela     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
1586d32ca5adSJuan Quintela     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
1587d117ed06SFabiano Rosas 
1588d117ed06SFabiano Rosas     multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
1589d117ed06SFabiano Rosas     multifd_recv_state->data->size = 0;
1590d117ed06SFabiano Rosas 
1591d73415a3SStefan Hajnoczi     qatomic_set(&multifd_recv_state->count, 0);
159211dd7be5SFabiano Rosas     qatomic_set(&multifd_recv_state->exiting, 0);
1593d32ca5adSJuan Quintela     qemu_sem_init(&multifd_recv_state->sem_sync, 0);
1594ab7cbb0bSJuan Quintela     multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
1595d32ca5adSJuan Quintela 
1596d32ca5adSJuan Quintela     for (i = 0; i < thread_count; i++) {
1597d32ca5adSJuan Quintela         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1598d32ca5adSJuan Quintela 
1599d32ca5adSJuan Quintela         qemu_mutex_init(&p->mutex);
1600d32ca5adSJuan Quintela         qemu_sem_init(&p->sem_sync, 0);
1601d117ed06SFabiano Rosas         qemu_sem_init(&p->sem, 0);
1602d117ed06SFabiano Rosas         p->pending_job = false;
1603d32ca5adSJuan Quintela         p->id = i;
160406833d83SFabiano Rosas 
1605d117ed06SFabiano Rosas         p->data = g_new0(MultiFDRecvData, 1);
1606d117ed06SFabiano Rosas         p->data->size = 0;
1607d117ed06SFabiano Rosas 
160806833d83SFabiano Rosas         if (use_packets) {
1609d32ca5adSJuan Quintela             p->packet_len = sizeof(MultiFDPacket_t)
1610d32ca5adSJuan Quintela                 + sizeof(uint64_t) * page_count;
1611d32ca5adSJuan Quintela             p->packet = g_malloc0(p->packet_len);
161206833d83SFabiano Rosas         }
161360ce4767SPeter Xu         p->name = g_strdup_printf("mig/dst/recv_%d", i);
1614cf2d4aa8SJuan Quintela         p->normal = g_new0(ram_addr_t, page_count);
1615303e6f54SHao Xiang         p->zero = g_new0(ram_addr_t, page_count);
1616d6f45ebaSJuan Quintela         p->page_count = page_count;
1617ddec20f8SJuan Quintela         p->page_size = qemu_target_page_size();
1618d32ca5adSJuan Quintela     }
1619ab7cbb0bSJuan Quintela 
1620ab7cbb0bSJuan Quintela     for (i = 0; i < thread_count; i++) {
1621ab7cbb0bSJuan Quintela         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1622ab7cbb0bSJuan Quintela         int ret;
1623ab7cbb0bSJuan Quintela 
16243fc58efaSAvihai Horon         ret = multifd_recv_state->ops->recv_setup(p, errp);
1625ab7cbb0bSJuan Quintela         if (ret) {
1626ab7cbb0bSJuan Quintela             return ret;
1627ab7cbb0bSJuan Quintela         }
1628ab7cbb0bSJuan Quintela     }
1629d32ca5adSJuan Quintela     return 0;
1630d32ca5adSJuan Quintela }
1631d32ca5adSJuan Quintela 
multifd_recv_all_channels_created(void)1632d32ca5adSJuan Quintela bool multifd_recv_all_channels_created(void)
1633d32ca5adSJuan Quintela {
1634d32ca5adSJuan Quintela     int thread_count = migrate_multifd_channels();
1635d32ca5adSJuan Quintela 
163651b07548SJuan Quintela     if (!migrate_multifd()) {
1637d32ca5adSJuan Quintela         return true;
1638d32ca5adSJuan Quintela     }
1639d32ca5adSJuan Quintela 
1640a59136f3SDr. David Alan Gilbert     if (!multifd_recv_state) {
1641a59136f3SDr. David Alan Gilbert         /* Called before any connections created */
1642a59136f3SDr. David Alan Gilbert         return false;
1643a59136f3SDr. David Alan Gilbert     }
1644a59136f3SDr. David Alan Gilbert 
1645d73415a3SStefan Hajnoczi     return thread_count == qatomic_read(&multifd_recv_state->count);
1646d32ca5adSJuan Quintela }
1647d32ca5adSJuan Quintela 
1648d32ca5adSJuan Quintela /*
1649d32ca5adSJuan Quintela  * Try to receive all multifd channels to get ready for the migration.
16506720c2b3Smanish.mishra  * Sets @errp when failing to receive the current channel.
1651d32ca5adSJuan Quintela  */
multifd_recv_new_channel(QIOChannel * ioc,Error ** errp)16526720c2b3Smanish.mishra void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
1653d32ca5adSJuan Quintela {
1654d32ca5adSJuan Quintela     MultiFDRecvParams *p;
1655d32ca5adSJuan Quintela     Error *local_err = NULL;
165606833d83SFabiano Rosas     bool use_packets = multifd_use_packets();
1657d32ca5adSJuan Quintela     int id;
1658d32ca5adSJuan Quintela 
165906833d83SFabiano Rosas     if (use_packets) {
1660d32ca5adSJuan Quintela         id = multifd_recv_initial_packet(ioc, &local_err);
1661d32ca5adSJuan Quintela         if (id < 0) {
1662d32ca5adSJuan Quintela             multifd_recv_terminate_threads(local_err);
1663d32ca5adSJuan Quintela             error_propagate_prepend(errp, local_err,
1664d32ca5adSJuan Quintela                                     "failed to receive packet"
1665d32ca5adSJuan Quintela                                     " via multifd channel %d: ",
1666d73415a3SStefan Hajnoczi                                     qatomic_read(&multifd_recv_state->count));
16676720c2b3Smanish.mishra             return;
1668d32ca5adSJuan Quintela         }
1669d32ca5adSJuan Quintela         trace_multifd_recv_new_channel(id);
167006833d83SFabiano Rosas     } else {
16712dd7ee7aSFabiano Rosas         id = qatomic_read(&multifd_recv_state->count);
167206833d83SFabiano Rosas     }
1673d32ca5adSJuan Quintela 
1674d32ca5adSJuan Quintela     p = &multifd_recv_state->params[id];
1675d32ca5adSJuan Quintela     if (p->c != NULL) {
1676d32ca5adSJuan Quintela         error_setg(&local_err, "multifd: received id '%d' already setup'",
1677d32ca5adSJuan Quintela                    id);
1678d32ca5adSJuan Quintela         multifd_recv_terminate_threads(local_err);
1679d32ca5adSJuan Quintela         error_propagate(errp, local_err);
16806720c2b3Smanish.mishra         return;
1681d32ca5adSJuan Quintela     }
1682d32ca5adSJuan Quintela     p->c = ioc;
1683d32ca5adSJuan Quintela     object_ref(OBJECT(ioc));
1684d32ca5adSJuan Quintela 
1685a2a63c4aSFabiano Rosas     p->thread_created = true;
1686d32ca5adSJuan Quintela     qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
1687d32ca5adSJuan Quintela                        QEMU_THREAD_JOINABLE);
1688d73415a3SStefan Hajnoczi     qatomic_inc(&multifd_recv_state->count);
1689d32ca5adSJuan Quintela }
1690303e6f54SHao Xiang 
/*
 * Shared first step for send-side compression methods.
 *
 * Runs multifd_send_zero_page_detect() on @p. If normal pages remain in
 * p->pages, it calls multifd_send_prepare_header() and returns true.
 * Otherwise it sets p->next_packet_size to 0 and returns false.
 */
bool multifd_send_prepare_common(MultiFDSendParams *p)
{
    multifd_send_zero_page_detect(p);

    if (p->pages->normal_num) {
        multifd_send_prepare_header(p);
        return true;
    }

    p->next_packet_size = 0;
    return false;
}
1703303e6f54SHao Xiang }
1704