xref: /openbmc/qemu/migration/multifd.c (revision 488c84acb465c21b716c3fd14de27ab5ce388c85)
1d32ca5adSJuan Quintela /*
2d32ca5adSJuan Quintela  * Multifd common code
3d32ca5adSJuan Quintela  *
4d32ca5adSJuan Quintela  * Copyright (c) 2019-2020 Red Hat Inc
5d32ca5adSJuan Quintela  *
6d32ca5adSJuan Quintela  * Authors:
7d32ca5adSJuan Quintela  *  Juan Quintela <quintela@redhat.com>
8d32ca5adSJuan Quintela  *
9d32ca5adSJuan Quintela  * This work is licensed under the terms of the GNU GPL, version 2 or later.
10d32ca5adSJuan Quintela  * See the COPYING file in the top-level directory.
11d32ca5adSJuan Quintela  */
12d32ca5adSJuan Quintela 
13d32ca5adSJuan Quintela #include "qemu/osdep.h"
14d32ca5adSJuan Quintela #include "qemu/rcu.h"
15d32ca5adSJuan Quintela #include "exec/target_page.h"
16d32ca5adSJuan Quintela #include "sysemu/sysemu.h"
17d32ca5adSJuan Quintela #include "exec/ramblock.h"
18d32ca5adSJuan Quintela #include "qemu/error-report.h"
19d32ca5adSJuan Quintela #include "qapi/error.h"
20d32ca5adSJuan Quintela #include "ram.h"
21d32ca5adSJuan Quintela #include "migration.h"
22947701ccSJuan Quintela #include "migration-stats.h"
23d32ca5adSJuan Quintela #include "socket.h"
2429647140SChuan Zheng #include "tls.h"
25d32ca5adSJuan Quintela #include "qemu-file.h"
26d32ca5adSJuan Quintela #include "trace.h"
27d32ca5adSJuan Quintela #include "multifd.h"
281b1f4ab6SJiang Jiacheng #include "threadinfo.h"
29b4bc342cSJuan Quintela #include "options.h"
30b5eea99eSLukas Straub #include "qemu/yank.h"
31b5eea99eSLukas Straub #include "io/channel-socket.h"
321a92d6d5SLukas Straub #include "yank_functions.h"
33b5eea99eSLukas Straub 
34d32ca5adSJuan Quintela /* Multiple fd's */
35d32ca5adSJuan Quintela 
36d32ca5adSJuan Quintela #define MULTIFD_MAGIC 0x11223344U
37d32ca5adSJuan Quintela #define MULTIFD_VERSION 1
38d32ca5adSJuan Quintela 
/*
 * Handshake message exchanged once per multifd channel
 * (multifd_send_initial_packet() writes it, multifd_recv_initial_packet()
 * reads and validates it).  The layout is part of the wire format, hence
 * packed; magic and version travel in big-endian byte order.
 */
typedef struct __attribute__((packed)) {
    uint32_t magic;         /* MULTIFD_MAGIC */
    uint32_t version;       /* MULTIFD_VERSION */
    unsigned char uuid[16]; /* QemuUUID of the sending QEMU */
    uint8_t id;             /* id of the channel this message is sent on */
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} MultiFDInit_t;
47d32ca5adSJuan Quintela 
4898ea497dSPeter Xu struct {
4998ea497dSPeter Xu     MultiFDSendParams *params;
5098ea497dSPeter Xu     /* array of pages to sent */
5198ea497dSPeter Xu     MultiFDPages_t *pages;
5298ea497dSPeter Xu     /*
5398ea497dSPeter Xu      * Global number of generated multifd packets.
5498ea497dSPeter Xu      *
5598ea497dSPeter Xu      * Note that we used 'uintptr_t' because it'll naturally support atomic
5698ea497dSPeter Xu      * operations on both 32bit / 64 bits hosts.  It means on 32bit systems
5798ea497dSPeter Xu      * multifd will overflow the packet_num easier, but that should be
5898ea497dSPeter Xu      * fine.
5998ea497dSPeter Xu      *
6098ea497dSPeter Xu      * Another option is to use QEMU's Stat64 then it'll be 64 bits on all
6198ea497dSPeter Xu      * hosts, however so far it does not support atomic fetch_add() yet.
6298ea497dSPeter Xu      * Make it easy for now.
6398ea497dSPeter Xu      */
6498ea497dSPeter Xu     uintptr_t packet_num;
6598ea497dSPeter Xu     /* send channels ready */
6698ea497dSPeter Xu     QemuSemaphore channels_ready;
6798ea497dSPeter Xu     /*
6898ea497dSPeter Xu      * Have we already run terminate threads.  There is a race when it
6998ea497dSPeter Xu      * happens that we got one error while we are exiting.
7098ea497dSPeter Xu      * We will use atomic operations.  Only valid values are 0 and 1.
7198ea497dSPeter Xu      */
7298ea497dSPeter Xu     int exiting;
7398ea497dSPeter Xu     /* multifd ops */
7498ea497dSPeter Xu     MultiFDMethods *ops;
7598ea497dSPeter Xu } *multifd_send_state;
7698ea497dSPeter Xu 
77ab7cbb0bSJuan Quintela /* Multifd without compression */
78ab7cbb0bSJuan Quintela 
79ab7cbb0bSJuan Quintela /**
80ab7cbb0bSJuan Quintela  * nocomp_send_setup: setup send side
81ab7cbb0bSJuan Quintela  *
82ab7cbb0bSJuan Quintela  * @p: Params for the channel that we are using
83ab7cbb0bSJuan Quintela  * @errp: pointer to an error
84ab7cbb0bSJuan Quintela  */
85ab7cbb0bSJuan Quintela static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
86ab7cbb0bSJuan Quintela {
8725a1f878SPeter Xu     if (migrate_zero_copy_send()) {
8825a1f878SPeter Xu         p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
8925a1f878SPeter Xu     }
9025a1f878SPeter Xu 
91ab7cbb0bSJuan Quintela     return 0;
92ab7cbb0bSJuan Quintela }
93ab7cbb0bSJuan Quintela 
94ab7cbb0bSJuan Quintela /**
95ab7cbb0bSJuan Quintela  * nocomp_send_cleanup: cleanup send side
96ab7cbb0bSJuan Quintela  *
97ab7cbb0bSJuan Quintela  * For no compression this function does nothing.
98ab7cbb0bSJuan Quintela  *
99ab7cbb0bSJuan Quintela  * @p: Params for the channel that we are using
10018ede636SJuan Quintela  * @errp: pointer to an error
101ab7cbb0bSJuan Quintela  */
102ab7cbb0bSJuan Quintela static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
103ab7cbb0bSJuan Quintela {
104ab7cbb0bSJuan Quintela     return;
105ab7cbb0bSJuan Quintela }
106ab7cbb0bSJuan Quintela 
107ab7cbb0bSJuan Quintela /**
108ab7cbb0bSJuan Quintela  * nocomp_send_prepare: prepare date to be able to send
109ab7cbb0bSJuan Quintela  *
110ab7cbb0bSJuan Quintela  * For no compression we just have to calculate the size of the
111ab7cbb0bSJuan Quintela  * packet.
112ab7cbb0bSJuan Quintela  *
113ab7cbb0bSJuan Quintela  * Returns 0 for success or -1 for error
114ab7cbb0bSJuan Quintela  *
115ab7cbb0bSJuan Quintela  * @p: Params for the channel that we are using
116ab7cbb0bSJuan Quintela  * @errp: pointer to an error
117ab7cbb0bSJuan Quintela  */
11802fb8104SJuan Quintela static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
119ab7cbb0bSJuan Quintela {
12025a1f878SPeter Xu     bool use_zero_copy_send = migrate_zero_copy_send();
121226468baSJuan Quintela     MultiFDPages_t *pages = p->pages;
12225a1f878SPeter Xu     int ret;
12325a1f878SPeter Xu 
12425a1f878SPeter Xu     if (!use_zero_copy_send) {
12525a1f878SPeter Xu         /*
12625a1f878SPeter Xu          * Only !zerocopy needs the header in IOV; zerocopy will
12725a1f878SPeter Xu          * send it separately.
12825a1f878SPeter Xu          */
12925a1f878SPeter Xu         multifd_send_prepare_header(p);
13025a1f878SPeter Xu     }
131226468baSJuan Quintela 
132efd8c543SPeter Xu     for (int i = 0; i < pages->num; i++) {
133efd8c543SPeter Xu         p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
134ddec20f8SJuan Quintela         p->iov[p->iovs_num].iov_len = p->page_size;
135226468baSJuan Quintela         p->iovs_num++;
136226468baSJuan Quintela     }
137226468baSJuan Quintela 
138efd8c543SPeter Xu     p->next_packet_size = pages->num * p->page_size;
139ab7cbb0bSJuan Quintela     p->flags |= MULTIFD_FLAG_NOCOMP;
14025a1f878SPeter Xu 
14125a1f878SPeter Xu     multifd_send_fill_packet(p);
14225a1f878SPeter Xu 
14325a1f878SPeter Xu     if (use_zero_copy_send) {
14425a1f878SPeter Xu         /* Send header first, without zerocopy */
14525a1f878SPeter Xu         ret = qio_channel_write_all(p->c, (void *)p->packet,
14625a1f878SPeter Xu                                     p->packet_len, errp);
14725a1f878SPeter Xu         if (ret != 0) {
14825a1f878SPeter Xu             return -1;
14925a1f878SPeter Xu         }
15025a1f878SPeter Xu     }
15125a1f878SPeter Xu 
152ab7cbb0bSJuan Quintela     return 0;
153ab7cbb0bSJuan Quintela }
154ab7cbb0bSJuan Quintela 
155ab7cbb0bSJuan Quintela /**
156ab7cbb0bSJuan Quintela  * nocomp_recv_setup: setup receive side
157ab7cbb0bSJuan Quintela  *
158ab7cbb0bSJuan Quintela  * For no compression this function does nothing.
159ab7cbb0bSJuan Quintela  *
160ab7cbb0bSJuan Quintela  * Returns 0 for success or -1 for error
161ab7cbb0bSJuan Quintela  *
162ab7cbb0bSJuan Quintela  * @p: Params for the channel that we are using
163ab7cbb0bSJuan Quintela  * @errp: pointer to an error
164ab7cbb0bSJuan Quintela  */
165ab7cbb0bSJuan Quintela static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
166ab7cbb0bSJuan Quintela {
167ab7cbb0bSJuan Quintela     return 0;
168ab7cbb0bSJuan Quintela }
169ab7cbb0bSJuan Quintela 
170ab7cbb0bSJuan Quintela /**
171ab7cbb0bSJuan Quintela  * nocomp_recv_cleanup: setup receive side
172ab7cbb0bSJuan Quintela  *
173ab7cbb0bSJuan Quintela  * For no compression this function does nothing.
174ab7cbb0bSJuan Quintela  *
175ab7cbb0bSJuan Quintela  * @p: Params for the channel that we are using
176ab7cbb0bSJuan Quintela  */
177ab7cbb0bSJuan Quintela static void nocomp_recv_cleanup(MultiFDRecvParams *p)
178ab7cbb0bSJuan Quintela {
179ab7cbb0bSJuan Quintela }
180ab7cbb0bSJuan Quintela 
181ab7cbb0bSJuan Quintela /**
182ab7cbb0bSJuan Quintela  * nocomp_recv_pages: read the data from the channel into actual pages
183ab7cbb0bSJuan Quintela  *
184ab7cbb0bSJuan Quintela  * For no compression we just need to read things into the correct place.
185ab7cbb0bSJuan Quintela  *
186ab7cbb0bSJuan Quintela  * Returns 0 for success or -1 for error
187ab7cbb0bSJuan Quintela  *
188ab7cbb0bSJuan Quintela  * @p: Params for the channel that we are using
189ab7cbb0bSJuan Quintela  * @errp: pointer to an error
190ab7cbb0bSJuan Quintela  */
19140a4bfe9SJuan Quintela static int nocomp_recv_pages(MultiFDRecvParams *p, Error **errp)
192ab7cbb0bSJuan Quintela {
193ab7cbb0bSJuan Quintela     uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
194ab7cbb0bSJuan Quintela 
195ab7cbb0bSJuan Quintela     if (flags != MULTIFD_FLAG_NOCOMP) {
19604e11404SJuan Quintela         error_setg(errp, "multifd %u: flags received %x flags expected %x",
197ab7cbb0bSJuan Quintela                    p->id, flags, MULTIFD_FLAG_NOCOMP);
198ab7cbb0bSJuan Quintela         return -1;
199ab7cbb0bSJuan Quintela     }
200cf2d4aa8SJuan Quintela     for (int i = 0; i < p->normal_num; i++) {
201faf60935SJuan Quintela         p->iov[i].iov_base = p->host + p->normal[i];
202ddec20f8SJuan Quintela         p->iov[i].iov_len = p->page_size;
203226468baSJuan Quintela     }
204cf2d4aa8SJuan Quintela     return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
205ab7cbb0bSJuan Quintela }
206ab7cbb0bSJuan Quintela 
207ab7cbb0bSJuan Quintela static MultiFDMethods multifd_nocomp_ops = {
208ab7cbb0bSJuan Quintela     .send_setup = nocomp_send_setup,
209ab7cbb0bSJuan Quintela     .send_cleanup = nocomp_send_cleanup,
210ab7cbb0bSJuan Quintela     .send_prepare = nocomp_send_prepare,
211ab7cbb0bSJuan Quintela     .recv_setup = nocomp_recv_setup,
212ab7cbb0bSJuan Quintela     .recv_cleanup = nocomp_recv_cleanup,
213ab7cbb0bSJuan Quintela     .recv_pages = nocomp_recv_pages
214ab7cbb0bSJuan Quintela };
215ab7cbb0bSJuan Quintela 
216ab7cbb0bSJuan Quintela static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
217ab7cbb0bSJuan Quintela     [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
218ab7cbb0bSJuan Quintela };
219ab7cbb0bSJuan Quintela 
2207ec2c2b3SJuan Quintela void multifd_register_ops(int method, MultiFDMethods *ops)
2217ec2c2b3SJuan Quintela {
2227ec2c2b3SJuan Quintela     assert(0 < method && method < MULTIFD_COMPRESSION__MAX);
2237ec2c2b3SJuan Quintela     multifd_ops[method] = ops;
2247ec2c2b3SJuan Quintela }
2257ec2c2b3SJuan Quintela 
226836eca47SPeter Xu /* Reset a MultiFDPages_t* object for the next use */
227836eca47SPeter Xu static void multifd_pages_reset(MultiFDPages_t *pages)
228836eca47SPeter Xu {
229836eca47SPeter Xu     /*
230836eca47SPeter Xu      * We don't need to touch offset[] array, because it will be
231836eca47SPeter Xu      * overwritten later when reused.
232836eca47SPeter Xu      */
233836eca47SPeter Xu     pages->num = 0;
234836eca47SPeter Xu     pages->block = NULL;
235836eca47SPeter Xu }
236836eca47SPeter Xu 
237d32ca5adSJuan Quintela static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
238d32ca5adSJuan Quintela {
239d32ca5adSJuan Quintela     MultiFDInit_t msg = {};
240cbec7eb7SJuan Quintela     size_t size = sizeof(msg);
241d32ca5adSJuan Quintela     int ret;
242d32ca5adSJuan Quintela 
243d32ca5adSJuan Quintela     msg.magic = cpu_to_be32(MULTIFD_MAGIC);
244d32ca5adSJuan Quintela     msg.version = cpu_to_be32(MULTIFD_VERSION);
245d32ca5adSJuan Quintela     msg.id = p->id;
246d32ca5adSJuan Quintela     memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
247d32ca5adSJuan Quintela 
248cbec7eb7SJuan Quintela     ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
249d32ca5adSJuan Quintela     if (ret != 0) {
250d32ca5adSJuan Quintela         return -1;
251d32ca5adSJuan Quintela     }
252cbec7eb7SJuan Quintela     stat64_add(&mig_stats.multifd_bytes, size);
253d32ca5adSJuan Quintela     return 0;
254d32ca5adSJuan Quintela }
255d32ca5adSJuan Quintela 
256d32ca5adSJuan Quintela static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
257d32ca5adSJuan Quintela {
258d32ca5adSJuan Quintela     MultiFDInit_t msg;
259d32ca5adSJuan Quintela     int ret;
260d32ca5adSJuan Quintela 
261d32ca5adSJuan Quintela     ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
262d32ca5adSJuan Quintela     if (ret != 0) {
263d32ca5adSJuan Quintela         return -1;
264d32ca5adSJuan Quintela     }
265d32ca5adSJuan Quintela 
266d32ca5adSJuan Quintela     msg.magic = be32_to_cpu(msg.magic);
267d32ca5adSJuan Quintela     msg.version = be32_to_cpu(msg.version);
268d32ca5adSJuan Quintela 
269d32ca5adSJuan Quintela     if (msg.magic != MULTIFD_MAGIC) {
270d32ca5adSJuan Quintela         error_setg(errp, "multifd: received packet magic %x "
271d32ca5adSJuan Quintela                    "expected %x", msg.magic, MULTIFD_MAGIC);
272d32ca5adSJuan Quintela         return -1;
273d32ca5adSJuan Quintela     }
274d32ca5adSJuan Quintela 
275d32ca5adSJuan Quintela     if (msg.version != MULTIFD_VERSION) {
27604e11404SJuan Quintela         error_setg(errp, "multifd: received packet version %u "
27704e11404SJuan Quintela                    "expected %u", msg.version, MULTIFD_VERSION);
278d32ca5adSJuan Quintela         return -1;
279d32ca5adSJuan Quintela     }
280d32ca5adSJuan Quintela 
281d32ca5adSJuan Quintela     if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
282d32ca5adSJuan Quintela         char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
283d32ca5adSJuan Quintela         char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
284d32ca5adSJuan Quintela 
285d32ca5adSJuan Quintela         error_setg(errp, "multifd: received uuid '%s' and expected "
286d32ca5adSJuan Quintela                    "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
287d32ca5adSJuan Quintela         g_free(uuid);
288d32ca5adSJuan Quintela         g_free(msg_uuid);
289d32ca5adSJuan Quintela         return -1;
290d32ca5adSJuan Quintela     }
291d32ca5adSJuan Quintela 
292d32ca5adSJuan Quintela     if (msg.id > migrate_multifd_channels()) {
293c77b4085SAvihai Horon         error_setg(errp, "multifd: received channel id %u is greater than "
294c77b4085SAvihai Horon                    "number of channels %u", msg.id, migrate_multifd_channels());
295d32ca5adSJuan Quintela         return -1;
296d32ca5adSJuan Quintela     }
297d32ca5adSJuan Quintela 
298d32ca5adSJuan Quintela     return msg.id;
299d32ca5adSJuan Quintela }
300d32ca5adSJuan Quintela 
3016074f816SFabiano Rosas static MultiFDPages_t *multifd_pages_init(uint32_t n)
302d32ca5adSJuan Quintela {
303d32ca5adSJuan Quintela     MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
304d32ca5adSJuan Quintela 
3056074f816SFabiano Rosas     pages->allocated = n;
3066074f816SFabiano Rosas     pages->offset = g_new0(ram_addr_t, n);
307d32ca5adSJuan Quintela 
308d32ca5adSJuan Quintela     return pages;
309d32ca5adSJuan Quintela }
310d32ca5adSJuan Quintela 
311d32ca5adSJuan Quintela static void multifd_pages_clear(MultiFDPages_t *pages)
312d32ca5adSJuan Quintela {
313836eca47SPeter Xu     multifd_pages_reset(pages);
314d32ca5adSJuan Quintela     pages->allocated = 0;
315d32ca5adSJuan Quintela     g_free(pages->offset);
316d32ca5adSJuan Quintela     pages->offset = NULL;
317d32ca5adSJuan Quintela     g_free(pages);
318d32ca5adSJuan Quintela }
319d32ca5adSJuan Quintela 
32025a1f878SPeter Xu void multifd_send_fill_packet(MultiFDSendParams *p)
321d32ca5adSJuan Quintela {
322d32ca5adSJuan Quintela     MultiFDPacket_t *packet = p->packet;
323efd8c543SPeter Xu     MultiFDPages_t *pages = p->pages;
32498ea497dSPeter Xu     uint64_t packet_num;
325d32ca5adSJuan Quintela     int i;
326d32ca5adSJuan Quintela 
327d32ca5adSJuan Quintela     packet->flags = cpu_to_be32(p->flags);
328d32ca5adSJuan Quintela     packet->pages_alloc = cpu_to_be32(p->pages->allocated);
329efd8c543SPeter Xu     packet->normal_pages = cpu_to_be32(pages->num);
330d32ca5adSJuan Quintela     packet->next_packet_size = cpu_to_be32(p->next_packet_size);
33198ea497dSPeter Xu 
33298ea497dSPeter Xu     packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
33398ea497dSPeter Xu     packet->packet_num = cpu_to_be64(packet_num);
334d32ca5adSJuan Quintela 
335efd8c543SPeter Xu     if (pages->block) {
336efd8c543SPeter Xu         strncpy(packet->ramblock, pages->block->idstr, 256);
337d32ca5adSJuan Quintela     }
338d32ca5adSJuan Quintela 
339efd8c543SPeter Xu     for (i = 0; i < pages->num; i++) {
340d32ca5adSJuan Quintela         /* there are architectures where ram_addr_t is 32 bit */
341efd8c543SPeter Xu         uint64_t temp = pages->offset[i];
342d32ca5adSJuan Quintela 
343d32ca5adSJuan Quintela         packet->offset[i] = cpu_to_be64(temp);
344d32ca5adSJuan Quintela     }
34505b7ec18SPeter Xu 
34605b7ec18SPeter Xu     p->packets_sent++;
347db7e1cc5SPeter Xu     p->total_normal_pages += pages->num;
3488a9ef173SPeter Xu 
34998ea497dSPeter Xu     trace_multifd_send(p->id, packet_num, pages->num, p->flags,
3508a9ef173SPeter Xu                        p->next_packet_size);
351d32ca5adSJuan Quintela }
352d32ca5adSJuan Quintela 
353d32ca5adSJuan Quintela static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
354d32ca5adSJuan Quintela {
355d32ca5adSJuan Quintela     MultiFDPacket_t *packet = p->packet;
356d32ca5adSJuan Quintela     int i;
357d32ca5adSJuan Quintela 
358d32ca5adSJuan Quintela     packet->magic = be32_to_cpu(packet->magic);
359d32ca5adSJuan Quintela     if (packet->magic != MULTIFD_MAGIC) {
360d32ca5adSJuan Quintela         error_setg(errp, "multifd: received packet "
361d32ca5adSJuan Quintela                    "magic %x and expected magic %x",
362d32ca5adSJuan Quintela                    packet->magic, MULTIFD_MAGIC);
363d32ca5adSJuan Quintela         return -1;
364d32ca5adSJuan Quintela     }
365d32ca5adSJuan Quintela 
366d32ca5adSJuan Quintela     packet->version = be32_to_cpu(packet->version);
367d32ca5adSJuan Quintela     if (packet->version != MULTIFD_VERSION) {
368d32ca5adSJuan Quintela         error_setg(errp, "multifd: received packet "
36904e11404SJuan Quintela                    "version %u and expected version %u",
370d32ca5adSJuan Quintela                    packet->version, MULTIFD_VERSION);
371d32ca5adSJuan Quintela         return -1;
372d32ca5adSJuan Quintela     }
373d32ca5adSJuan Quintela 
374d32ca5adSJuan Quintela     p->flags = be32_to_cpu(packet->flags);
375d32ca5adSJuan Quintela 
376d32ca5adSJuan Quintela     packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
377d32ca5adSJuan Quintela     /*
378d32ca5adSJuan Quintela      * If we received a packet that is 100 times bigger than expected
379d32ca5adSJuan Quintela      * just stop migration.  It is a magic number.
380d32ca5adSJuan Quintela      */
381d6f45ebaSJuan Quintela     if (packet->pages_alloc > p->page_count) {
382d32ca5adSJuan Quintela         error_setg(errp, "multifd: received packet "
383cf2d4aa8SJuan Quintela                    "with size %u and expected a size of %u",
384d6f45ebaSJuan Quintela                    packet->pages_alloc, p->page_count) ;
385d32ca5adSJuan Quintela         return -1;
386d32ca5adSJuan Quintela     }
387d32ca5adSJuan Quintela 
3888c0ec0b2SJuan Quintela     p->normal_num = be32_to_cpu(packet->normal_pages);
389cf2d4aa8SJuan Quintela     if (p->normal_num > packet->pages_alloc) {
390d32ca5adSJuan Quintela         error_setg(errp, "multifd: received packet "
39104e11404SJuan Quintela                    "with %u pages and expected maximum pages are %u",
392cf2d4aa8SJuan Quintela                    p->normal_num, packet->pages_alloc) ;
393d32ca5adSJuan Quintela         return -1;
394d32ca5adSJuan Quintela     }
395d32ca5adSJuan Quintela 
396d32ca5adSJuan Quintela     p->next_packet_size = be32_to_cpu(packet->next_packet_size);
397d32ca5adSJuan Quintela     p->packet_num = be64_to_cpu(packet->packet_num);
39805b7ec18SPeter Xu     p->packets_recved++;
399db7e1cc5SPeter Xu     p->total_normal_pages += p->normal_num;
400d32ca5adSJuan Quintela 
4018a9ef173SPeter Xu     trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->flags,
4028a9ef173SPeter Xu                        p->next_packet_size);
4038a9ef173SPeter Xu 
404cf2d4aa8SJuan Quintela     if (p->normal_num == 0) {
405d32ca5adSJuan Quintela         return 0;
406d32ca5adSJuan Quintela     }
407d32ca5adSJuan Quintela 
408d32ca5adSJuan Quintela     /* make sure that ramblock is 0 terminated */
409d32ca5adSJuan Quintela     packet->ramblock[255] = 0;
4105d1d1fcfSLukas Straub     p->block = qemu_ram_block_by_name(packet->ramblock);
4115d1d1fcfSLukas Straub     if (!p->block) {
412d32ca5adSJuan Quintela         error_setg(errp, "multifd: unknown ram block %s",
413d32ca5adSJuan Quintela                    packet->ramblock);
414d32ca5adSJuan Quintela         return -1;
415d32ca5adSJuan Quintela     }
416d32ca5adSJuan Quintela 
4175d1d1fcfSLukas Straub     p->host = p->block->host;
418cf2d4aa8SJuan Quintela     for (i = 0; i < p->normal_num; i++) {
419d32ca5adSJuan Quintela         uint64_t offset = be64_to_cpu(packet->offset[i]);
420d32ca5adSJuan Quintela 
4215d1d1fcfSLukas Straub         if (offset > (p->block->used_length - p->page_size)) {
422d32ca5adSJuan Quintela             error_setg(errp, "multifd: offset too long %" PRIu64
423d32ca5adSJuan Quintela                        " (max " RAM_ADDR_FMT ")",
4245d1d1fcfSLukas Straub                        offset, p->block->used_length);
425d32ca5adSJuan Quintela             return -1;
426d32ca5adSJuan Quintela         }
427cf2d4aa8SJuan Quintela         p->normal[i] = offset;
428d32ca5adSJuan Quintela     }
429d32ca5adSJuan Quintela 
430d32ca5adSJuan Quintela     return 0;
431d32ca5adSJuan Quintela }
432d32ca5adSJuan Quintela 
43315f3f21dSPeter Xu static bool multifd_send_should_exit(void)
43415f3f21dSPeter Xu {
43515f3f21dSPeter Xu     return qatomic_read(&multifd_send_state->exiting);
43615f3f21dSPeter Xu }
43715f3f21dSPeter Xu 
438d32ca5adSJuan Quintela /*
43948c0f5d5SPeter Xu  * The migration thread can wait on either of the two semaphores.  This
44048c0f5d5SPeter Xu  * function can be used to kick the main thread out of waiting on either of
44148c0f5d5SPeter Xu  * them.  Should mostly only be called when something wrong happened with
44248c0f5d5SPeter Xu  * the current multifd send thread.
44348c0f5d5SPeter Xu  */
44448c0f5d5SPeter Xu static void multifd_send_kick_main(MultiFDSendParams *p)
44548c0f5d5SPeter Xu {
44648c0f5d5SPeter Xu     qemu_sem_post(&p->sem_sync);
44748c0f5d5SPeter Xu     qemu_sem_post(&multifd_send_state->channels_ready);
44848c0f5d5SPeter Xu }
44948c0f5d5SPeter Xu 
45048c0f5d5SPeter Xu /*
451d32ca5adSJuan Quintela  * How we use multifd_send_state->pages and channel->pages?
452d32ca5adSJuan Quintela  *
453d32ca5adSJuan Quintela  * We create a pages for each channel, and a main one.  Each time that
454d32ca5adSJuan Quintela  * we need to send a batch of pages we interchange the ones between
455d32ca5adSJuan Quintela  * multifd_send_state and the channel that is sending it.  There are
456d32ca5adSJuan Quintela  * two reasons for that:
457d32ca5adSJuan Quintela  *    - to not have to do so many mallocs during migration
458d32ca5adSJuan Quintela  *    - to make easier to know what to free at the end of migration
459d32ca5adSJuan Quintela  *
460d32ca5adSJuan Quintela  * This way we always know who is the owner of each "pages" struct,
461d32ca5adSJuan Quintela  * and we don't need any locking.  It belongs to the migration thread
462d32ca5adSJuan Quintela  * or to the channel thread.  Switching is safe because the migration
463d32ca5adSJuan Quintela  * thread is using the channel mutex when changing it, and the channel
464d32ca5adSJuan Quintela  * have to had finish with its own, otherwise pending_job can't be
465d32ca5adSJuan Quintela  * false.
4663b40964aSPeter Xu  *
4673b40964aSPeter Xu  * Returns true if succeed, false otherwise.
468d32ca5adSJuan Quintela  */
4693b40964aSPeter Xu static bool multifd_send_pages(void)
470d32ca5adSJuan Quintela {
471d32ca5adSJuan Quintela     int i;
472d32ca5adSJuan Quintela     static int next_channel;
473d32ca5adSJuan Quintela     MultiFDSendParams *p = NULL; /* make happy gcc */
474d32ca5adSJuan Quintela     MultiFDPages_t *pages = multifd_send_state->pages;
475d32ca5adSJuan Quintela 
47615f3f21dSPeter Xu     if (multifd_send_should_exit()) {
4773b40964aSPeter Xu         return false;
478d32ca5adSJuan Quintela     }
479d32ca5adSJuan Quintela 
480e3cce9afSPeter Xu     /* We wait here, until at least one channel is ready */
481d32ca5adSJuan Quintela     qemu_sem_wait(&multifd_send_state->channels_ready);
482e3cce9afSPeter Xu 
4837e89a140SLaurent Vivier     /*
4847e89a140SLaurent Vivier      * next_channel can remain from a previous migration that was
4857e89a140SLaurent Vivier      * using more channels, so ensure it doesn't overflow if the
4867e89a140SLaurent Vivier      * limit is lower now.
4877e89a140SLaurent Vivier      */
4887e89a140SLaurent Vivier     next_channel %= migrate_multifd_channels();
489d32ca5adSJuan Quintela     for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
49015f3f21dSPeter Xu         if (multifd_send_should_exit()) {
4913b40964aSPeter Xu             return false;
492d32ca5adSJuan Quintela         }
49315f3f21dSPeter Xu         p = &multifd_send_state->params[i];
494e3cce9afSPeter Xu         /*
495e3cce9afSPeter Xu          * Lockless read to p->pending_job is safe, because only multifd
496e3cce9afSPeter Xu          * sender thread can clear it.
497e3cce9afSPeter Xu          */
498f5f48a78SPeter Xu         if (qatomic_read(&p->pending_job) == false) {
499d32ca5adSJuan Quintela             next_channel = (i + 1) % migrate_multifd_channels();
500d32ca5adSJuan Quintela             break;
501d32ca5adSJuan Quintela         }
502d32ca5adSJuan Quintela     }
503e3cce9afSPeter Xu 
504e3cce9afSPeter Xu     /*
505*488c84acSPeter Xu      * Make sure we read p->pending_job before all the rest.  Pairs with
506*488c84acSPeter Xu      * qatomic_store_release() in multifd_send_thread().
507e3cce9afSPeter Xu      */
508*488c84acSPeter Xu     smp_mb_acquire();
509*488c84acSPeter Xu     assert(!p->pages->num);
510d32ca5adSJuan Quintela     multifd_send_state->pages = p->pages;
511d32ca5adSJuan Quintela     p->pages = pages;
512*488c84acSPeter Xu     /*
513*488c84acSPeter Xu      * Making sure p->pages is setup before marking pending_job=true. Pairs
514*488c84acSPeter Xu      * with the qatomic_load_acquire() in multifd_send_thread().
515*488c84acSPeter Xu      */
516*488c84acSPeter Xu     qatomic_store_release(&p->pending_job, true);
517d32ca5adSJuan Quintela     qemu_sem_post(&p->sem);
518d32ca5adSJuan Quintela 
5193b40964aSPeter Xu     return true;
520d32ca5adSJuan Quintela }
521d32ca5adSJuan Quintela 
522f88f86c4SPeter Xu static inline bool multifd_queue_empty(MultiFDPages_t *pages)
523f88f86c4SPeter Xu {
524f88f86c4SPeter Xu     return pages->num == 0;
525f88f86c4SPeter Xu }
526f88f86c4SPeter Xu 
527f88f86c4SPeter Xu static inline bool multifd_queue_full(MultiFDPages_t *pages)
528f88f86c4SPeter Xu {
529f88f86c4SPeter Xu     return pages->num == pages->allocated;
530f88f86c4SPeter Xu }
531f88f86c4SPeter Xu 
532f88f86c4SPeter Xu static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset)
533f88f86c4SPeter Xu {
534f88f86c4SPeter Xu     pages->offset[pages->num++] = offset;
535f88f86c4SPeter Xu }
536f88f86c4SPeter Xu 
537d6556d17SPeter Xu /* Returns true if enqueue successful, false otherwise */
538d6556d17SPeter Xu bool multifd_queue_page(RAMBlock *block, ram_addr_t offset)
539d32ca5adSJuan Quintela {
540f88f86c4SPeter Xu     MultiFDPages_t *pages;
541d32ca5adSJuan Quintela 
542f88f86c4SPeter Xu retry:
543f88f86c4SPeter Xu     pages = multifd_send_state->pages;
544f88f86c4SPeter Xu 
545f88f86c4SPeter Xu     /* If the queue is empty, we can already enqueue now */
546f88f86c4SPeter Xu     if (multifd_queue_empty(pages)) {
547d32ca5adSJuan Quintela         pages->block = block;
548f88f86c4SPeter Xu         multifd_enqueue(pages, offset);
549d6556d17SPeter Xu         return true;
550d32ca5adSJuan Quintela     }
551d32ca5adSJuan Quintela 
552f88f86c4SPeter Xu     /*
553f88f86c4SPeter Xu      * Not empty, meanwhile we need a flush.  It can because of either:
554f88f86c4SPeter Xu      *
555f88f86c4SPeter Xu      * (1) The page is not on the same ramblock of previous ones, or,
556f88f86c4SPeter Xu      * (2) The queue is full.
557f88f86c4SPeter Xu      *
558f88f86c4SPeter Xu      * After flush, always retry.
559f88f86c4SPeter Xu      */
560f88f86c4SPeter Xu     if (pages->block != block || multifd_queue_full(pages)) {
5613b40964aSPeter Xu         if (!multifd_send_pages()) {
562d6556d17SPeter Xu             return false;
563d32ca5adSJuan Quintela         }
564f88f86c4SPeter Xu         goto retry;
565d32ca5adSJuan Quintela     }
566d32ca5adSJuan Quintela 
567f88f86c4SPeter Xu     /* Not empty, and we still have space, do it! */
568f88f86c4SPeter Xu     multifd_enqueue(pages, offset);
569d6556d17SPeter Xu     return true;
570d32ca5adSJuan Quintela }
571d32ca5adSJuan Quintela 
5723ab4441dSPeter Xu /* Multifd send side hit an error; remember it and prepare to quit */
5733ab4441dSPeter Xu static void multifd_send_set_error(Error *err)
574d32ca5adSJuan Quintela {
57515f3f21dSPeter Xu     /*
57615f3f21dSPeter Xu      * We don't want to exit each threads twice.  Depending on where
57715f3f21dSPeter Xu      * we get the error, or if there are two independent errors in two
57815f3f21dSPeter Xu      * threads at the same time, we can end calling this function
57915f3f21dSPeter Xu      * twice.
58015f3f21dSPeter Xu      */
58115f3f21dSPeter Xu     if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
58215f3f21dSPeter Xu         return;
58315f3f21dSPeter Xu     }
58415f3f21dSPeter Xu 
585d32ca5adSJuan Quintela     if (err) {
586d32ca5adSJuan Quintela         MigrationState *s = migrate_get_current();
587d32ca5adSJuan Quintela         migrate_set_error(s, err);
588d32ca5adSJuan Quintela         if (s->state == MIGRATION_STATUS_SETUP ||
589d32ca5adSJuan Quintela             s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
590d32ca5adSJuan Quintela             s->state == MIGRATION_STATUS_DEVICE ||
591d32ca5adSJuan Quintela             s->state == MIGRATION_STATUS_ACTIVE) {
592d32ca5adSJuan Quintela             migrate_set_state(&s->state, s->state,
593d32ca5adSJuan Quintela                               MIGRATION_STATUS_FAILED);
594d32ca5adSJuan Quintela         }
595d32ca5adSJuan Quintela     }
5963ab4441dSPeter Xu }
597d32ca5adSJuan Quintela 
5983ab4441dSPeter Xu static void multifd_send_terminate_threads(void)
5993ab4441dSPeter Xu {
6003ab4441dSPeter Xu     int i;
6013ab4441dSPeter Xu 
6023ab4441dSPeter Xu     trace_multifd_send_terminate_threads();
6033ab4441dSPeter Xu 
6043ab4441dSPeter Xu     /*
6053ab4441dSPeter Xu      * Tell everyone we're quitting.  No xchg() needed here; we simply
6063ab4441dSPeter Xu      * always set it.
6073ab4441dSPeter Xu      */
6083ab4441dSPeter Xu     qatomic_set(&multifd_send_state->exiting, 1);
60912808db3SPeter Xu 
61012808db3SPeter Xu     /*
61112808db3SPeter Xu      * Firstly, kick all threads out; no matter whether they are just idle,
61212808db3SPeter Xu      * or blocked in an IO system call.
61312808db3SPeter Xu      */
614d32ca5adSJuan Quintela     for (i = 0; i < migrate_multifd_channels(); i++) {
615d32ca5adSJuan Quintela         MultiFDSendParams *p = &multifd_send_state->params[i];
616d32ca5adSJuan Quintela 
617d32ca5adSJuan Quintela         qemu_sem_post(&p->sem);
618077fbb59SLi Zhang         if (p->c) {
619077fbb59SLi Zhang             qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
620077fbb59SLi Zhang         }
621d32ca5adSJuan Quintela     }
62212808db3SPeter Xu 
62312808db3SPeter Xu     /*
62412808db3SPeter Xu      * Finally recycle all the threads.
62512808db3SPeter Xu      *
62612808db3SPeter Xu      * TODO: p->running is still buggy, e.g. we can reach here without the
62712808db3SPeter Xu      * corresponding multifd_new_send_channel_async() get invoked yet,
62812808db3SPeter Xu      * then a new thread can even be created after this function returns.
62912808db3SPeter Xu      */
63012808db3SPeter Xu     for (i = 0; i < migrate_multifd_channels(); i++) {
63112808db3SPeter Xu         MultiFDSendParams *p = &multifd_send_state->params[i];
63212808db3SPeter Xu 
63312808db3SPeter Xu         if (p->running) {
63412808db3SPeter Xu             qemu_thread_join(&p->thread);
63512808db3SPeter Xu         }
63612808db3SPeter Xu     }
637d32ca5adSJuan Quintela }
638d32ca5adSJuan Quintela 
6390e92f644SFabiano Rosas static int multifd_send_channel_destroy(QIOChannel *send)
6400e92f644SFabiano Rosas {
6410e92f644SFabiano Rosas     return socket_send_channel_destroy(send);
6420e92f644SFabiano Rosas }
6430e92f644SFabiano Rosas 
64412808db3SPeter Xu static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
645d32ca5adSJuan Quintela {
64620171ea8SLukas Straub     if (p->registered_yank) {
64720171ea8SLukas Straub         migration_ioc_unregister_yank(p->c);
64820171ea8SLukas Straub     }
6490e92f644SFabiano Rosas     multifd_send_channel_destroy(p->c);
650d32ca5adSJuan Quintela     p->c = NULL;
651d32ca5adSJuan Quintela     qemu_sem_destroy(&p->sem);
652d32ca5adSJuan Quintela     qemu_sem_destroy(&p->sem_sync);
653d32ca5adSJuan Quintela     g_free(p->name);
654d32ca5adSJuan Quintela     p->name = NULL;
655d32ca5adSJuan Quintela     multifd_pages_clear(p->pages);
656d32ca5adSJuan Quintela     p->pages = NULL;
657d32ca5adSJuan Quintela     p->packet_len = 0;
658d32ca5adSJuan Quintela     g_free(p->packet);
659d32ca5adSJuan Quintela     p->packet = NULL;
660226468baSJuan Quintela     g_free(p->iov);
661226468baSJuan Quintela     p->iov = NULL;
66212808db3SPeter Xu     multifd_send_state->ops->send_cleanup(p, errp);
66312808db3SPeter Xu 
66412808db3SPeter Xu     return *errp == NULL;
665ab7cbb0bSJuan Quintela }
66612808db3SPeter Xu 
66712808db3SPeter Xu static void multifd_send_cleanup_state(void)
66812808db3SPeter Xu {
669d32ca5adSJuan Quintela     qemu_sem_destroy(&multifd_send_state->channels_ready);
670d32ca5adSJuan Quintela     g_free(multifd_send_state->params);
671d32ca5adSJuan Quintela     multifd_send_state->params = NULL;
672d32ca5adSJuan Quintela     multifd_pages_clear(multifd_send_state->pages);
673d32ca5adSJuan Quintela     multifd_send_state->pages = NULL;
674d32ca5adSJuan Quintela     g_free(multifd_send_state);
675d32ca5adSJuan Quintela     multifd_send_state = NULL;
676d32ca5adSJuan Quintela }
677d32ca5adSJuan Quintela 
678cde85c37SPeter Xu void multifd_send_shutdown(void)
67912808db3SPeter Xu {
68012808db3SPeter Xu     int i;
68112808db3SPeter Xu 
68212808db3SPeter Xu     if (!migrate_multifd()) {
68312808db3SPeter Xu         return;
68412808db3SPeter Xu     }
68512808db3SPeter Xu 
68612808db3SPeter Xu     multifd_send_terminate_threads();
68712808db3SPeter Xu 
68812808db3SPeter Xu     for (i = 0; i < migrate_multifd_channels(); i++) {
68912808db3SPeter Xu         MultiFDSendParams *p = &multifd_send_state->params[i];
69012808db3SPeter Xu         Error *local_err = NULL;
69112808db3SPeter Xu 
69212808db3SPeter Xu         if (!multifd_send_cleanup_channel(p, &local_err)) {
69312808db3SPeter Xu             migrate_set_error(migrate_get_current(), local_err);
69412808db3SPeter Xu             error_free(local_err);
69512808db3SPeter Xu         }
69612808db3SPeter Xu     }
69712808db3SPeter Xu 
69812808db3SPeter Xu     multifd_send_cleanup_state();
69912808db3SPeter Xu }
70012808db3SPeter Xu 
7014cc47b43SLeonardo Bras static int multifd_zero_copy_flush(QIOChannel *c)
7024cc47b43SLeonardo Bras {
7034cc47b43SLeonardo Bras     int ret;
7044cc47b43SLeonardo Bras     Error *err = NULL;
7054cc47b43SLeonardo Bras 
7064cc47b43SLeonardo Bras     ret = qio_channel_flush(c, &err);
7074cc47b43SLeonardo Bras     if (ret < 0) {
7084cc47b43SLeonardo Bras         error_report_err(err);
7094cc47b43SLeonardo Bras         return -1;
7104cc47b43SLeonardo Bras     }
7114cc47b43SLeonardo Bras     if (ret == 1) {
712aff3f660SJuan Quintela         stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
7134cc47b43SLeonardo Bras     }
7144cc47b43SLeonardo Bras 
7154cc47b43SLeonardo Bras     return ret;
7164cc47b43SLeonardo Bras }
7174cc47b43SLeonardo Bras 
7189346fa18SFabiano Rosas int multifd_send_sync_main(void)
719d32ca5adSJuan Quintela {
720d32ca5adSJuan Quintela     int i;
7215b1d9babSLeonardo Bras     bool flush_zero_copy;
722d32ca5adSJuan Quintela 
72351b07548SJuan Quintela     if (!migrate_multifd()) {
72433d70973SLeonardo Bras         return 0;
725d32ca5adSJuan Quintela     }
72690a3d2f9SJuan Quintela     if (multifd_send_state->pages->num) {
7273b40964aSPeter Xu         if (!multifd_send_pages()) {
728d32ca5adSJuan Quintela             error_report("%s: multifd_send_pages fail", __func__);
72933d70973SLeonardo Bras             return -1;
730d32ca5adSJuan Quintela         }
731d32ca5adSJuan Quintela     }
7325b1d9babSLeonardo Bras 
733b4bc342cSJuan Quintela     flush_zero_copy = migrate_zero_copy_send();
7345b1d9babSLeonardo Bras 
735d32ca5adSJuan Quintela     for (i = 0; i < migrate_multifd_channels(); i++) {
736d32ca5adSJuan Quintela         MultiFDSendParams *p = &multifd_send_state->params[i];
737d32ca5adSJuan Quintela 
73815f3f21dSPeter Xu         if (multifd_send_should_exit()) {
73933d70973SLeonardo Bras             return -1;
740d32ca5adSJuan Quintela         }
741d32ca5adSJuan Quintela 
74215f3f21dSPeter Xu         trace_multifd_send_sync_main_signal(p->id);
74315f3f21dSPeter Xu 
744f5f48a78SPeter Xu         /*
745f5f48a78SPeter Xu          * We should be the only user so far, so not possible to be set by
746f5f48a78SPeter Xu          * others concurrently.
747f5f48a78SPeter Xu          */
748f5f48a78SPeter Xu         assert(qatomic_read(&p->pending_sync) == false);
749f5f48a78SPeter Xu         qatomic_set(&p->pending_sync, true);
750d32ca5adSJuan Quintela         qemu_sem_post(&p->sem);
751d32ca5adSJuan Quintela     }
752d32ca5adSJuan Quintela     for (i = 0; i < migrate_multifd_channels(); i++) {
753d32ca5adSJuan Quintela         MultiFDSendParams *p = &multifd_send_state->params[i];
754d32ca5adSJuan Quintela 
75515f3f21dSPeter Xu         if (multifd_send_should_exit()) {
75615f3f21dSPeter Xu             return -1;
75715f3f21dSPeter Xu         }
75815f3f21dSPeter Xu 
759d2026ee1SJuan Quintela         qemu_sem_wait(&multifd_send_state->channels_ready);
760d32ca5adSJuan Quintela         trace_multifd_send_sync_main_wait(p->id);
761d32ca5adSJuan Quintela         qemu_sem_wait(&p->sem_sync);
762ebfc5787SZhenzhong Duan 
763ebfc5787SZhenzhong Duan         if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
764ebfc5787SZhenzhong Duan             return -1;
765ebfc5787SZhenzhong Duan         }
766d32ca5adSJuan Quintela     }
767d32ca5adSJuan Quintela     trace_multifd_send_sync_main(multifd_send_state->packet_num);
76833d70973SLeonardo Bras 
76933d70973SLeonardo Bras     return 0;
770d32ca5adSJuan Quintela }
771d32ca5adSJuan Quintela 
772d32ca5adSJuan Quintela static void *multifd_send_thread(void *opaque)
773d32ca5adSJuan Quintela {
774d32ca5adSJuan Quintela     MultiFDSendParams *p = opaque;
7751b1f4ab6SJiang Jiacheng     MigrationThread *thread = NULL;
776d32ca5adSJuan Quintela     Error *local_err = NULL;
777d32ca5adSJuan Quintela     int ret = 0;
778d32ca5adSJuan Quintela 
779788fa680SFabiano Rosas     thread = migration_threads_add(p->name, qemu_get_thread_id());
7801b1f4ab6SJiang Jiacheng 
781d32ca5adSJuan Quintela     trace_multifd_send_thread_start(p->id);
782d32ca5adSJuan Quintela     rcu_register_thread();
783d32ca5adSJuan Quintela 
784d32ca5adSJuan Quintela     if (multifd_send_initial_packet(p, &local_err) < 0) {
785d32ca5adSJuan Quintela         ret = -1;
786d32ca5adSJuan Quintela         goto out;
787d32ca5adSJuan Quintela     }
788d32ca5adSJuan Quintela 
789d32ca5adSJuan Quintela     while (true) {
790d2026ee1SJuan Quintela         qemu_sem_post(&multifd_send_state->channels_ready);
791d32ca5adSJuan Quintela         qemu_sem_wait(&p->sem);
792d32ca5adSJuan Quintela 
79315f3f21dSPeter Xu         if (multifd_send_should_exit()) {
794d32ca5adSJuan Quintela             break;
795d32ca5adSJuan Quintela         }
796d32ca5adSJuan Quintela 
797*488c84acSPeter Xu         /*
798*488c84acSPeter Xu          * Read pending_job flag before p->pages.  Pairs with the
799*488c84acSPeter Xu          * qatomic_store_release() in multifd_send_pages().
800*488c84acSPeter Xu          */
801*488c84acSPeter Xu         if (qatomic_load_acquire(&p->pending_job)) {
802efd8c543SPeter Xu             MultiFDPages_t *pages = p->pages;
803d32ca5adSJuan Quintela 
804b7dbdd8eSLeonardo Bras             p->iovs_num = 0;
80583c560fbSPeter Xu             assert(pages->num);
80683c560fbSPeter Xu 
80702fb8104SJuan Quintela             ret = multifd_send_state->ops->send_prepare(p, &local_err);
808ab7cbb0bSJuan Quintela             if (ret != 0) {
809ab7cbb0bSJuan Quintela                 break;
810ab7cbb0bSJuan Quintela             }
81183c560fbSPeter Xu 
8125b1d9babSLeonardo Bras             ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL,
8135b1d9babSLeonardo Bras                                               0, p->write_flags, &local_err);
814d32ca5adSJuan Quintela             if (ret != 0) {
815d32ca5adSJuan Quintela                 break;
816d32ca5adSJuan Quintela             }
817d32ca5adSJuan Quintela 
81868b6e000SElena Ufimtseva             stat64_add(&mig_stats.multifd_bytes,
81968b6e000SElena Ufimtseva                        p->next_packet_size + p->packet_len);
820836eca47SPeter Xu 
821836eca47SPeter Xu             multifd_pages_reset(p->pages);
8221618f552SElena Ufimtseva             p->next_packet_size = 0;
823*488c84acSPeter Xu 
824*488c84acSPeter Xu             /*
825*488c84acSPeter Xu              * Making sure p->pages is published before saying "we're
826*488c84acSPeter Xu              * free".  Pairs with the smp_mb_acquire() in
827*488c84acSPeter Xu              * multifd_send_pages().
828*488c84acSPeter Xu              */
829*488c84acSPeter Xu             qatomic_store_release(&p->pending_job, false);
830859ebaf3SPeter Xu         } else {
831*488c84acSPeter Xu             /*
832*488c84acSPeter Xu              * If not a normal job, must be a sync request.  Note that
833*488c84acSPeter Xu              * pending_sync is a standalone flag (unlike pending_job), so
834*488c84acSPeter Xu              * it doesn't require explicit memory barriers.
835*488c84acSPeter Xu              */
836859ebaf3SPeter Xu             assert(qatomic_read(&p->pending_sync));
837f5f48a78SPeter Xu             p->flags = MULTIFD_FLAG_SYNC;
838f5f48a78SPeter Xu             multifd_send_fill_packet(p);
839f5f48a78SPeter Xu             ret = qio_channel_write_all(p->c, (void *)p->packet,
840f5f48a78SPeter Xu                                         p->packet_len, &local_err);
841f5f48a78SPeter Xu             if (ret != 0) {
842f5f48a78SPeter Xu                 break;
843d32ca5adSJuan Quintela             }
844f5f48a78SPeter Xu             /* p->next_packet_size will always be zero for a SYNC packet */
845f5f48a78SPeter Xu             stat64_add(&mig_stats.multifd_bytes, p->packet_len);
846f5f48a78SPeter Xu             p->flags = 0;
847f5f48a78SPeter Xu             qatomic_set(&p->pending_sync, false);
848f5f48a78SPeter Xu             qemu_sem_post(&p->sem_sync);
849d32ca5adSJuan Quintela         }
850d32ca5adSJuan Quintela     }
851d32ca5adSJuan Quintela 
852d32ca5adSJuan Quintela out:
853ee8a7c9cSFabiano Rosas     if (ret) {
854ee8a7c9cSFabiano Rosas         assert(local_err);
855d32ca5adSJuan Quintela         trace_multifd_send_error(p->id);
8563ab4441dSPeter Xu         multifd_send_set_error(local_err);
85748c0f5d5SPeter Xu         multifd_send_kick_main(p);
858ee8a7c9cSFabiano Rosas         error_free(local_err);
859d32ca5adSJuan Quintela     }
860d32ca5adSJuan Quintela 
861d32ca5adSJuan Quintela     p->running = false;
862d32ca5adSJuan Quintela     rcu_unregister_thread();
863788fa680SFabiano Rosas     migration_threads_remove(thread);
86405b7ec18SPeter Xu     trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages);
865d32ca5adSJuan Quintela 
866d32ca5adSJuan Quintela     return NULL;
867d32ca5adSJuan Quintela }
868d32ca5adSJuan Quintela 
86929647140SChuan Zheng static bool multifd_channel_connect(MultiFDSendParams *p,
87029647140SChuan Zheng                                     QIOChannel *ioc,
871967e3889SFabiano Rosas                                     Error **errp);
87229647140SChuan Zheng 
87329647140SChuan Zheng static void multifd_tls_outgoing_handshake(QIOTask *task,
87429647140SChuan Zheng                                            gpointer opaque)
87529647140SChuan Zheng {
87629647140SChuan Zheng     MultiFDSendParams *p = opaque;
87729647140SChuan Zheng     QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
87829647140SChuan Zheng     Error *err = NULL;
87929647140SChuan Zheng 
880967e3889SFabiano Rosas     if (!qio_task_propagate_error(task, &err)) {
881894f0214SChuan Zheng         trace_multifd_tls_outgoing_handshake_complete(ioc);
882967e3889SFabiano Rosas         if (multifd_channel_connect(p, ioc, &err)) {
883967e3889SFabiano Rosas             return;
884967e3889SFabiano Rosas         }
885894f0214SChuan Zheng     }
886fca67642SHao Wang 
887967e3889SFabiano Rosas     trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err));
888967e3889SFabiano Rosas 
8893ab4441dSPeter Xu     multifd_send_set_error(err);
89048c0f5d5SPeter Xu     multifd_send_kick_main(p);
8916ae208ceSAvihai Horon     error_free(err);
892fca67642SHao Wang }
89329647140SChuan Zheng 
894a1af605bSChuan Zheng static void *multifd_tls_handshake_thread(void *opaque)
895a1af605bSChuan Zheng {
896a1af605bSChuan Zheng     MultiFDSendParams *p = opaque;
897a1af605bSChuan Zheng     QIOChannelTLS *tioc = QIO_CHANNEL_TLS(p->c);
898a1af605bSChuan Zheng 
899a1af605bSChuan Zheng     qio_channel_tls_handshake(tioc,
900a1af605bSChuan Zheng                               multifd_tls_outgoing_handshake,
901a1af605bSChuan Zheng                               p,
902a1af605bSChuan Zheng                               NULL,
903a1af605bSChuan Zheng                               NULL);
904a1af605bSChuan Zheng     return NULL;
905a1af605bSChuan Zheng }
906a1af605bSChuan Zheng 
907967e3889SFabiano Rosas static bool multifd_tls_channel_connect(MultiFDSendParams *p,
90829647140SChuan Zheng                                         QIOChannel *ioc,
90929647140SChuan Zheng                                         Error **errp)
91029647140SChuan Zheng {
91129647140SChuan Zheng     MigrationState *s = migrate_get_current();
9127f692ec7SPeter Xu     const char *hostname = s->hostname;
91329647140SChuan Zheng     QIOChannelTLS *tioc;
91429647140SChuan Zheng 
9150deb7e9bSJuan Quintela     tioc = migration_tls_client_create(ioc, hostname, errp);
91629647140SChuan Zheng     if (!tioc) {
917967e3889SFabiano Rosas         return false;
91829647140SChuan Zheng     }
91929647140SChuan Zheng 
9209e842408SChuan Zheng     object_unref(OBJECT(ioc));
921894f0214SChuan Zheng     trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
92229647140SChuan Zheng     qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");
923a1af605bSChuan Zheng     p->c = QIO_CHANNEL(tioc);
924a1af605bSChuan Zheng     qemu_thread_create(&p->thread, "multifd-tls-handshake-worker",
925a1af605bSChuan Zheng                        multifd_tls_handshake_thread, p,
926a1af605bSChuan Zheng                        QEMU_THREAD_JOINABLE);
927967e3889SFabiano Rosas     return true;
92829647140SChuan Zheng }
92929647140SChuan Zheng 
93029647140SChuan Zheng static bool multifd_channel_connect(MultiFDSendParams *p,
93129647140SChuan Zheng                                     QIOChannel *ioc,
932967e3889SFabiano Rosas                                     Error **errp)
93329647140SChuan Zheng {
934894f0214SChuan Zheng     trace_multifd_set_outgoing_channel(
9357f692ec7SPeter Xu         ioc, object_get_typename(OBJECT(ioc)),
936967e3889SFabiano Rosas         migrate_get_current()->hostname);
937894f0214SChuan Zheng 
93885a8578eSPeter Xu     if (migrate_channel_requires_tls_upgrade(ioc)) {
93929647140SChuan Zheng         /*
94029647140SChuan Zheng          * tls_channel_connect will call back to this
94129647140SChuan Zheng          * function after the TLS handshake,
94229647140SChuan Zheng          * so we mustn't call multifd_send_thread until then
94329647140SChuan Zheng          */
944967e3889SFabiano Rosas         return multifd_tls_channel_connect(p, ioc, errp);
945a4395f5dSAvihai Horon     }
946967e3889SFabiano Rosas 
94720171ea8SLukas Straub     migration_ioc_register_yank(ioc);
94820171ea8SLukas Straub     p->registered_yank = true;
94929647140SChuan Zheng     p->c = ioc;
95029647140SChuan Zheng     qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
95129647140SChuan Zheng                        QEMU_THREAD_JOINABLE);
952a339149aSHao Wang     return true;
95329647140SChuan Zheng }
95429647140SChuan Zheng 
955d32ca5adSJuan Quintela static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
956d32ca5adSJuan Quintela {
957d32ca5adSJuan Quintela     MultiFDSendParams *p = opaque;
9580e92f644SFabiano Rosas     QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
959d32ca5adSJuan Quintela     Error *local_err = NULL;
960d32ca5adSJuan Quintela 
961d32ca5adSJuan Quintela     trace_multifd_new_send_channel_async(p->id);
962bca762c2SLi Zhang     if (!qio_task_propagate_error(task, &local_err)) {
9630a08c794SFabiano Rosas         qio_channel_set_delay(ioc, false);
964d32ca5adSJuan Quintela         p->running = true;
965967e3889SFabiano Rosas         if (multifd_channel_connect(p, ioc, &local_err)) {
96603c7a42dSChuan Zheng             return;
967d32ca5adSJuan Quintela         }
968bca762c2SLi Zhang     }
96903c7a42dSChuan Zheng 
970967e3889SFabiano Rosas     trace_multifd_new_send_channel_async_error(p->id, local_err);
9713ab4441dSPeter Xu     multifd_send_set_error(local_err);
97215f3f21dSPeter Xu     multifd_send_kick_main(p);
97315f3f21dSPeter Xu     object_unref(OBJECT(ioc));
97415f3f21dSPeter Xu     error_free(local_err);
9750e92f644SFabiano Rosas }
9760e92f644SFabiano Rosas 
9770e92f644SFabiano Rosas static void multifd_new_send_channel_create(gpointer opaque)
9780e92f644SFabiano Rosas {
9790e92f644SFabiano Rosas     socket_send_channel_create(multifd_new_send_channel_async, opaque);
980d32ca5adSJuan Quintela }
981d32ca5adSJuan Quintela 
982cde85c37SPeter Xu int multifd_send_setup(Error **errp)
983d32ca5adSJuan Quintela {
984d32ca5adSJuan Quintela     int thread_count;
985d32ca5adSJuan Quintela     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
986d32ca5adSJuan Quintela     uint8_t i;
987d32ca5adSJuan Quintela 
98851b07548SJuan Quintela     if (!migrate_multifd()) {
989d32ca5adSJuan Quintela         return 0;
990d32ca5adSJuan Quintela     }
991b7acd657SLi Zhijian 
992d32ca5adSJuan Quintela     thread_count = migrate_multifd_channels();
993d32ca5adSJuan Quintela     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
994d32ca5adSJuan Quintela     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
995d32ca5adSJuan Quintela     multifd_send_state->pages = multifd_pages_init(page_count);
996d32ca5adSJuan Quintela     qemu_sem_init(&multifd_send_state->channels_ready, 0);
997d73415a3SStefan Hajnoczi     qatomic_set(&multifd_send_state->exiting, 0);
998ab7cbb0bSJuan Quintela     multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
999d32ca5adSJuan Quintela 
1000d32ca5adSJuan Quintela     for (i = 0; i < thread_count; i++) {
1001d32ca5adSJuan Quintela         MultiFDSendParams *p = &multifd_send_state->params[i];
1002d32ca5adSJuan Quintela 
1003d32ca5adSJuan Quintela         qemu_sem_init(&p->sem, 0);
1004d32ca5adSJuan Quintela         qemu_sem_init(&p->sem_sync, 0);
1005d32ca5adSJuan Quintela         p->id = i;
1006d32ca5adSJuan Quintela         p->pages = multifd_pages_init(page_count);
1007d32ca5adSJuan Quintela         p->packet_len = sizeof(MultiFDPacket_t)
1008d32ca5adSJuan Quintela                       + sizeof(uint64_t) * page_count;
1009d32ca5adSJuan Quintela         p->packet = g_malloc0(p->packet_len);
1010d32ca5adSJuan Quintela         p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
1011d32ca5adSJuan Quintela         p->packet->version = cpu_to_be32(MULTIFD_VERSION);
1012d32ca5adSJuan Quintela         p->name = g_strdup_printf("multifdsend_%d", i);
1013d48c3a04SJuan Quintela         /* We need one extra place for the packet header */
1014d48c3a04SJuan Quintela         p->iov = g_new0(struct iovec, page_count + 1);
1015ddec20f8SJuan Quintela         p->page_size = qemu_target_page_size();
1016d6f45ebaSJuan Quintela         p->page_count = page_count;
10175b1d9babSLeonardo Bras         p->write_flags = 0;
10180e92f644SFabiano Rosas         multifd_new_send_channel_create(p);
1019d32ca5adSJuan Quintela     }
1020ab7cbb0bSJuan Quintela 
1021ab7cbb0bSJuan Quintela     for (i = 0; i < thread_count; i++) {
1022ab7cbb0bSJuan Quintela         MultiFDSendParams *p = &multifd_send_state->params[i];
1023ab7cbb0bSJuan Quintela         int ret;
1024ab7cbb0bSJuan Quintela 
10253fc58efaSAvihai Horon         ret = multifd_send_state->ops->send_setup(p, errp);
1026ab7cbb0bSJuan Quintela         if (ret) {
1027ab7cbb0bSJuan Quintela             return ret;
1028ab7cbb0bSJuan Quintela         }
1029ab7cbb0bSJuan Quintela     }
1030d32ca5adSJuan Quintela     return 0;
1031d32ca5adSJuan Quintela }
1032d32ca5adSJuan Quintela 
1033d32ca5adSJuan Quintela struct {
1034d32ca5adSJuan Quintela     MultiFDRecvParams *params;
1035d32ca5adSJuan Quintela     /* number of created threads */
1036d32ca5adSJuan Quintela     int count;
1037d32ca5adSJuan Quintela     /* syncs main thread and channels */
1038d32ca5adSJuan Quintela     QemuSemaphore sem_sync;
1039d32ca5adSJuan Quintela     /* global number of generated multifd packets */
1040d32ca5adSJuan Quintela     uint64_t packet_num;
1041ab7cbb0bSJuan Quintela     /* multifd ops */
1042ab7cbb0bSJuan Quintela     MultiFDMethods *ops;
1043d32ca5adSJuan Quintela } *multifd_recv_state;
1044d32ca5adSJuan Quintela 
1045d32ca5adSJuan Quintela static void multifd_recv_terminate_threads(Error *err)
1046d32ca5adSJuan Quintela {
1047d32ca5adSJuan Quintela     int i;
1048d32ca5adSJuan Quintela 
1049d32ca5adSJuan Quintela     trace_multifd_recv_terminate_threads(err != NULL);
1050d32ca5adSJuan Quintela 
1051d32ca5adSJuan Quintela     if (err) {
1052d32ca5adSJuan Quintela         MigrationState *s = migrate_get_current();
1053d32ca5adSJuan Quintela         migrate_set_error(s, err);
1054d32ca5adSJuan Quintela         if (s->state == MIGRATION_STATUS_SETUP ||
1055d32ca5adSJuan Quintela             s->state == MIGRATION_STATUS_ACTIVE) {
1056d32ca5adSJuan Quintela             migrate_set_state(&s->state, s->state,
1057d32ca5adSJuan Quintela                               MIGRATION_STATUS_FAILED);
1058d32ca5adSJuan Quintela         }
1059d32ca5adSJuan Quintela     }
1060d32ca5adSJuan Quintela 
1061d32ca5adSJuan Quintela     for (i = 0; i < migrate_multifd_channels(); i++) {
1062d32ca5adSJuan Quintela         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1063d32ca5adSJuan Quintela 
1064d32ca5adSJuan Quintela         qemu_mutex_lock(&p->mutex);
1065d32ca5adSJuan Quintela         p->quit = true;
1066d32ca5adSJuan Quintela         /*
1067d32ca5adSJuan Quintela          * We could arrive here for two reasons:
1068d32ca5adSJuan Quintela          *  - normal quit, i.e. everything went fine, just finished
1069d32ca5adSJuan Quintela          *  - error quit: We close the channels so the channel threads
1070d32ca5adSJuan Quintela          *    finish the qio_channel_read_all_eof()
1071d32ca5adSJuan Quintela          */
1072d32ca5adSJuan Quintela         if (p->c) {
1073d32ca5adSJuan Quintela             qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
1074d32ca5adSJuan Quintela         }
1075d32ca5adSJuan Quintela         qemu_mutex_unlock(&p->mutex);
1076d32ca5adSJuan Quintela     }
1077d32ca5adSJuan Quintela }
1078d32ca5adSJuan Quintela 
1079cde85c37SPeter Xu void multifd_recv_shutdown(void)
1080cfc3bcf3SLeonardo Bras {
108151b07548SJuan Quintela     if (migrate_multifd()) {
1082cfc3bcf3SLeonardo Bras         multifd_recv_terminate_threads(NULL);
1083cfc3bcf3SLeonardo Bras     }
1084cfc3bcf3SLeonardo Bras }
1085cfc3bcf3SLeonardo Bras 
10865e6ea8a1SPeter Xu static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
10875e6ea8a1SPeter Xu {
10885e6ea8a1SPeter Xu     migration_ioc_unregister_yank(p->c);
10895e6ea8a1SPeter Xu     object_unref(OBJECT(p->c));
10905e6ea8a1SPeter Xu     p->c = NULL;
10915e6ea8a1SPeter Xu     qemu_mutex_destroy(&p->mutex);
10925e6ea8a1SPeter Xu     qemu_sem_destroy(&p->sem_sync);
10935e6ea8a1SPeter Xu     g_free(p->name);
10945e6ea8a1SPeter Xu     p->name = NULL;
10955e6ea8a1SPeter Xu     p->packet_len = 0;
10965e6ea8a1SPeter Xu     g_free(p->packet);
10975e6ea8a1SPeter Xu     p->packet = NULL;
10985e6ea8a1SPeter Xu     g_free(p->iov);
10995e6ea8a1SPeter Xu     p->iov = NULL;
11005e6ea8a1SPeter Xu     g_free(p->normal);
11015e6ea8a1SPeter Xu     p->normal = NULL;
11025e6ea8a1SPeter Xu     multifd_recv_state->ops->recv_cleanup(p);
11035e6ea8a1SPeter Xu }
11045e6ea8a1SPeter Xu 
11055e6ea8a1SPeter Xu static void multifd_recv_cleanup_state(void)
11065e6ea8a1SPeter Xu {
11075e6ea8a1SPeter Xu     qemu_sem_destroy(&multifd_recv_state->sem_sync);
11085e6ea8a1SPeter Xu     g_free(multifd_recv_state->params);
11095e6ea8a1SPeter Xu     multifd_recv_state->params = NULL;
11105e6ea8a1SPeter Xu     g_free(multifd_recv_state);
11115e6ea8a1SPeter Xu     multifd_recv_state = NULL;
11125e6ea8a1SPeter Xu }
11135e6ea8a1SPeter Xu 
1114cde85c37SPeter Xu void multifd_recv_cleanup(void)
1115d32ca5adSJuan Quintela {
1116d32ca5adSJuan Quintela     int i;
1117d32ca5adSJuan Quintela 
111851b07548SJuan Quintela     if (!migrate_multifd()) {
1119e5bac1f5SLeonardo Bras         return;
1120d32ca5adSJuan Quintela     }
1121d32ca5adSJuan Quintela     multifd_recv_terminate_threads(NULL);
1122d32ca5adSJuan Quintela     for (i = 0; i < migrate_multifd_channels(); i++) {
1123d32ca5adSJuan Quintela         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1124d32ca5adSJuan Quintela 
1125d32ca5adSJuan Quintela         if (p->running) {
1126d32ca5adSJuan Quintela             /*
1127d32ca5adSJuan Quintela              * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code,
1128d32ca5adSJuan Quintela              * however try to wakeup it without harm in cleanup phase.
1129d32ca5adSJuan Quintela              */
1130d32ca5adSJuan Quintela             qemu_sem_post(&p->sem_sync);
1131d32ca5adSJuan Quintela         }
113210351fbaSLeonardo Bras 
113310351fbaSLeonardo Bras         qemu_thread_join(&p->thread);
1134d32ca5adSJuan Quintela     }
1135d32ca5adSJuan Quintela     for (i = 0; i < migrate_multifd_channels(); i++) {
11365e6ea8a1SPeter Xu         multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
1137d32ca5adSJuan Quintela     }
11385e6ea8a1SPeter Xu     multifd_recv_cleanup_state();
1139d32ca5adSJuan Quintela }
1140d32ca5adSJuan Quintela 
1141d32ca5adSJuan Quintela void multifd_recv_sync_main(void)
1142d32ca5adSJuan Quintela {
1143d32ca5adSJuan Quintela     int i;
1144d32ca5adSJuan Quintela 
114551b07548SJuan Quintela     if (!migrate_multifd()) {
1146d32ca5adSJuan Quintela         return;
1147d32ca5adSJuan Quintela     }
1148d32ca5adSJuan Quintela     for (i = 0; i < migrate_multifd_channels(); i++) {
1149d32ca5adSJuan Quintela         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1150d32ca5adSJuan Quintela 
1151d32ca5adSJuan Quintela         trace_multifd_recv_sync_main_wait(p->id);
1152d32ca5adSJuan Quintela         qemu_sem_wait(&multifd_recv_state->sem_sync);
1153d32ca5adSJuan Quintela     }
1154d32ca5adSJuan Quintela     for (i = 0; i < migrate_multifd_channels(); i++) {
1155d32ca5adSJuan Quintela         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1156d32ca5adSJuan Quintela 
11576e8a355dSDaniel Brodsky         WITH_QEMU_LOCK_GUARD(&p->mutex) {
1158d32ca5adSJuan Quintela             if (multifd_recv_state->packet_num < p->packet_num) {
1159d32ca5adSJuan Quintela                 multifd_recv_state->packet_num = p->packet_num;
1160d32ca5adSJuan Quintela             }
11616e8a355dSDaniel Brodsky         }
1162d32ca5adSJuan Quintela         trace_multifd_recv_sync_main_signal(p->id);
1163d32ca5adSJuan Quintela         qemu_sem_post(&p->sem_sync);
1164d32ca5adSJuan Quintela     }
1165d32ca5adSJuan Quintela     trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
1166d32ca5adSJuan Quintela }
1167d32ca5adSJuan Quintela 
1168d32ca5adSJuan Quintela static void *multifd_recv_thread(void *opaque)
1169d32ca5adSJuan Quintela {
1170d32ca5adSJuan Quintela     MultiFDRecvParams *p = opaque;
1171d32ca5adSJuan Quintela     Error *local_err = NULL;
1172d32ca5adSJuan Quintela     int ret;
1173d32ca5adSJuan Quintela 
1174d32ca5adSJuan Quintela     trace_multifd_recv_thread_start(p->id);
1175d32ca5adSJuan Quintela     rcu_register_thread();
1176d32ca5adSJuan Quintela 
1177d32ca5adSJuan Quintela     while (true) {
1178d32ca5adSJuan Quintela         uint32_t flags;
1179d32ca5adSJuan Quintela 
1180d32ca5adSJuan Quintela         if (p->quit) {
1181d32ca5adSJuan Quintela             break;
1182d32ca5adSJuan Quintela         }
1183d32ca5adSJuan Quintela 
1184d32ca5adSJuan Quintela         ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
1185d32ca5adSJuan Quintela                                        p->packet_len, &local_err);
1186bca762c2SLi Zhang         if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
1187d32ca5adSJuan Quintela             break;
1188d32ca5adSJuan Quintela         }
1189d32ca5adSJuan Quintela 
1190d32ca5adSJuan Quintela         qemu_mutex_lock(&p->mutex);
1191d32ca5adSJuan Quintela         ret = multifd_recv_unfill_packet(p, &local_err);
1192d32ca5adSJuan Quintela         if (ret) {
1193d32ca5adSJuan Quintela             qemu_mutex_unlock(&p->mutex);
1194d32ca5adSJuan Quintela             break;
1195d32ca5adSJuan Quintela         }
1196d32ca5adSJuan Quintela 
1197d32ca5adSJuan Quintela         flags = p->flags;
1198ab7cbb0bSJuan Quintela         /* recv methods don't know how to handle the SYNC flag */
1199ab7cbb0bSJuan Quintela         p->flags &= ~MULTIFD_FLAG_SYNC;
1200d32ca5adSJuan Quintela         qemu_mutex_unlock(&p->mutex);
1201d32ca5adSJuan Quintela 
1202cf2d4aa8SJuan Quintela         if (p->normal_num) {
120340a4bfe9SJuan Quintela             ret = multifd_recv_state->ops->recv_pages(p, &local_err);
1204d32ca5adSJuan Quintela             if (ret != 0) {
1205d32ca5adSJuan Quintela                 break;
1206d32ca5adSJuan Quintela             }
1207d32ca5adSJuan Quintela         }
1208d32ca5adSJuan Quintela 
1209d32ca5adSJuan Quintela         if (flags & MULTIFD_FLAG_SYNC) {
1210d32ca5adSJuan Quintela             qemu_sem_post(&multifd_recv_state->sem_sync);
1211d32ca5adSJuan Quintela             qemu_sem_wait(&p->sem_sync);
1212d32ca5adSJuan Quintela         }
1213d32ca5adSJuan Quintela     }
1214d32ca5adSJuan Quintela 
1215d32ca5adSJuan Quintela     if (local_err) {
1216d32ca5adSJuan Quintela         multifd_recv_terminate_threads(local_err);
121713f2cb21SPan Nengyuan         error_free(local_err);
1218d32ca5adSJuan Quintela     }
1219d32ca5adSJuan Quintela     qemu_mutex_lock(&p->mutex);
1220d32ca5adSJuan Quintela     p->running = false;
1221d32ca5adSJuan Quintela     qemu_mutex_unlock(&p->mutex);
1222d32ca5adSJuan Quintela 
1223d32ca5adSJuan Quintela     rcu_unregister_thread();
122405b7ec18SPeter Xu     trace_multifd_recv_thread_end(p->id, p->packets_recved, p->total_normal_pages);
1225d32ca5adSJuan Quintela 
1226d32ca5adSJuan Quintela     return NULL;
1227d32ca5adSJuan Quintela }
1228d32ca5adSJuan Quintela 
1229cde85c37SPeter Xu int multifd_recv_setup(Error **errp)
1230d32ca5adSJuan Quintela {
1231d32ca5adSJuan Quintela     int thread_count;
1232d32ca5adSJuan Quintela     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
1233d32ca5adSJuan Quintela     uint8_t i;
1234d32ca5adSJuan Quintela 
12356720c2b3Smanish.mishra     /*
12366720c2b3Smanish.mishra      * Return successfully if multiFD recv state is already initialised
12376720c2b3Smanish.mishra      * or multiFD is not enabled.
12386720c2b3Smanish.mishra      */
123951b07548SJuan Quintela     if (multifd_recv_state || !migrate_multifd()) {
1240d32ca5adSJuan Quintela         return 0;
1241d32ca5adSJuan Quintela     }
12426720c2b3Smanish.mishra 
1243d32ca5adSJuan Quintela     thread_count = migrate_multifd_channels();
1244d32ca5adSJuan Quintela     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
1245d32ca5adSJuan Quintela     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
1246d73415a3SStefan Hajnoczi     qatomic_set(&multifd_recv_state->count, 0);
1247d32ca5adSJuan Quintela     qemu_sem_init(&multifd_recv_state->sem_sync, 0);
1248ab7cbb0bSJuan Quintela     multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
1249d32ca5adSJuan Quintela 
1250d32ca5adSJuan Quintela     for (i = 0; i < thread_count; i++) {
1251d32ca5adSJuan Quintela         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1252d32ca5adSJuan Quintela 
1253d32ca5adSJuan Quintela         qemu_mutex_init(&p->mutex);
1254d32ca5adSJuan Quintela         qemu_sem_init(&p->sem_sync, 0);
1255d32ca5adSJuan Quintela         p->quit = false;
1256d32ca5adSJuan Quintela         p->id = i;
1257d32ca5adSJuan Quintela         p->packet_len = sizeof(MultiFDPacket_t)
1258d32ca5adSJuan Quintela                       + sizeof(uint64_t) * page_count;
1259d32ca5adSJuan Quintela         p->packet = g_malloc0(p->packet_len);
1260d32ca5adSJuan Quintela         p->name = g_strdup_printf("multifdrecv_%d", i);
1261226468baSJuan Quintela         p->iov = g_new0(struct iovec, page_count);
1262cf2d4aa8SJuan Quintela         p->normal = g_new0(ram_addr_t, page_count);
1263d6f45ebaSJuan Quintela         p->page_count = page_count;
1264ddec20f8SJuan Quintela         p->page_size = qemu_target_page_size();
1265d32ca5adSJuan Quintela     }
1266ab7cbb0bSJuan Quintela 
1267ab7cbb0bSJuan Quintela     for (i = 0; i < thread_count; i++) {
1268ab7cbb0bSJuan Quintela         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1269ab7cbb0bSJuan Quintela         int ret;
1270ab7cbb0bSJuan Quintela 
12713fc58efaSAvihai Horon         ret = multifd_recv_state->ops->recv_setup(p, errp);
1272ab7cbb0bSJuan Quintela         if (ret) {
1273ab7cbb0bSJuan Quintela             return ret;
1274ab7cbb0bSJuan Quintela         }
1275ab7cbb0bSJuan Quintela     }
1276d32ca5adSJuan Quintela     return 0;
1277d32ca5adSJuan Quintela }
1278d32ca5adSJuan Quintela 
1279d32ca5adSJuan Quintela bool multifd_recv_all_channels_created(void)
1280d32ca5adSJuan Quintela {
1281d32ca5adSJuan Quintela     int thread_count = migrate_multifd_channels();
1282d32ca5adSJuan Quintela 
128351b07548SJuan Quintela     if (!migrate_multifd()) {
1284d32ca5adSJuan Quintela         return true;
1285d32ca5adSJuan Quintela     }
1286d32ca5adSJuan Quintela 
1287a59136f3SDr. David Alan Gilbert     if (!multifd_recv_state) {
1288a59136f3SDr. David Alan Gilbert         /* Called before any connections created */
1289a59136f3SDr. David Alan Gilbert         return false;
1290a59136f3SDr. David Alan Gilbert     }
1291a59136f3SDr. David Alan Gilbert 
1292d73415a3SStefan Hajnoczi     return thread_count == qatomic_read(&multifd_recv_state->count);
1293d32ca5adSJuan Quintela }
1294d32ca5adSJuan Quintela 
1295d32ca5adSJuan Quintela /*
1296d32ca5adSJuan Quintela  * Try to receive all multifd channels to get ready for the migration.
12976720c2b3Smanish.mishra  * Sets @errp when failing to receive the current channel.
1298d32ca5adSJuan Quintela  */
12996720c2b3Smanish.mishra void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
1300d32ca5adSJuan Quintela {
1301d32ca5adSJuan Quintela     MultiFDRecvParams *p;
1302d32ca5adSJuan Quintela     Error *local_err = NULL;
1303d32ca5adSJuan Quintela     int id;
1304d32ca5adSJuan Quintela 
1305d32ca5adSJuan Quintela     id = multifd_recv_initial_packet(ioc, &local_err);
1306d32ca5adSJuan Quintela     if (id < 0) {
1307d32ca5adSJuan Quintela         multifd_recv_terminate_threads(local_err);
1308d32ca5adSJuan Quintela         error_propagate_prepend(errp, local_err,
1309d32ca5adSJuan Quintela                                 "failed to receive packet"
1310d32ca5adSJuan Quintela                                 " via multifd channel %d: ",
1311d73415a3SStefan Hajnoczi                                 qatomic_read(&multifd_recv_state->count));
13126720c2b3Smanish.mishra         return;
1313d32ca5adSJuan Quintela     }
1314d32ca5adSJuan Quintela     trace_multifd_recv_new_channel(id);
1315d32ca5adSJuan Quintela 
1316d32ca5adSJuan Quintela     p = &multifd_recv_state->params[id];
1317d32ca5adSJuan Quintela     if (p->c != NULL) {
1318d32ca5adSJuan Quintela         error_setg(&local_err, "multifd: received id '%d' already setup'",
1319d32ca5adSJuan Quintela                    id);
1320d32ca5adSJuan Quintela         multifd_recv_terminate_threads(local_err);
1321d32ca5adSJuan Quintela         error_propagate(errp, local_err);
13226720c2b3Smanish.mishra         return;
1323d32ca5adSJuan Quintela     }
1324d32ca5adSJuan Quintela     p->c = ioc;
1325d32ca5adSJuan Quintela     object_ref(OBJECT(ioc));
1326d32ca5adSJuan Quintela 
1327d32ca5adSJuan Quintela     p->running = true;
1328d32ca5adSJuan Quintela     qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
1329d32ca5adSJuan Quintela                        QEMU_THREAD_JOINABLE);
1330d73415a3SStefan Hajnoczi     qatomic_inc(&multifd_recv_state->count);
1331d32ca5adSJuan Quintela }
1332