xref: /openbmc/qemu/migration/multifd.c (revision 1a6e217c)
1d32ca5adSJuan Quintela /*
2d32ca5adSJuan Quintela  * Multifd common code
3d32ca5adSJuan Quintela  *
4d32ca5adSJuan Quintela  * Copyright (c) 2019-2020 Red Hat Inc
5d32ca5adSJuan Quintela  *
6d32ca5adSJuan Quintela  * Authors:
7d32ca5adSJuan Quintela  *  Juan Quintela <quintela@redhat.com>
8d32ca5adSJuan Quintela  *
9d32ca5adSJuan Quintela  * This work is licensed under the terms of the GNU GPL, version 2 or later.
10d32ca5adSJuan Quintela  * See the COPYING file in the top-level directory.
11d32ca5adSJuan Quintela  */
12d32ca5adSJuan Quintela 
13d32ca5adSJuan Quintela #include "qemu/osdep.h"
14d32ca5adSJuan Quintela #include "qemu/rcu.h"
15d32ca5adSJuan Quintela #include "exec/target_page.h"
16d32ca5adSJuan Quintela #include "sysemu/sysemu.h"
17d32ca5adSJuan Quintela #include "exec/ramblock.h"
18d32ca5adSJuan Quintela #include "qemu/error-report.h"
19d32ca5adSJuan Quintela #include "qapi/error.h"
20decdc767SFabiano Rosas #include "fd.h"
21b7b03eb6SFabiano Rosas #include "file.h"
22d32ca5adSJuan Quintela #include "migration.h"
23947701ccSJuan Quintela #include "migration-stats.h"
24d32ca5adSJuan Quintela #include "socket.h"
2529647140SChuan Zheng #include "tls.h"
26d32ca5adSJuan Quintela #include "qemu-file.h"
27d32ca5adSJuan Quintela #include "trace.h"
28d32ca5adSJuan Quintela #include "multifd.h"
291b1f4ab6SJiang Jiacheng #include "threadinfo.h"
30b4bc342cSJuan Quintela #include "options.h"
31b5eea99eSLukas Straub #include "qemu/yank.h"
32b7b03eb6SFabiano Rosas #include "io/channel-file.h"
33b5eea99eSLukas Straub #include "io/channel-socket.h"
341a92d6d5SLukas Straub #include "yank_functions.h"
35b5eea99eSLukas Straub 
36d32ca5adSJuan Quintela /* Multiple fd's */
37d32ca5adSJuan Quintela 
38d32ca5adSJuan Quintela #define MULTIFD_MAGIC 0x11223344U
39d32ca5adSJuan Quintela #define MULTIFD_VERSION 1
40d32ca5adSJuan Quintela 
/*
 * Initial message exchanged once per multifd channel: it is written by
 * multifd_send_initial_packet() and parsed by
 * multifd_recv_initial_packet().  The struct is packed and is transferred
 * as raw bytes; magic and version travel in big endian.
 */
typedef struct {
    uint32_t magic;         /* MULTIFD_MAGIC */
    uint32_t version;       /* MULTIFD_VERSION */
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;             /* channel id, taken from the sender's p->id */
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;
49d32ca5adSJuan Quintela 
/* Global state of the multifd send side. */
struct {
    /* per-channel parameters, indexed by channel number */
    MultiFDSendParams *params;
    /*
     * Array of pages to send.  This is the queue filled by
     * multifd_queue_page(); multifd_send_pages() swaps it with the
     * 'pages' of the channel that takes the job.
     */
    MultiFDPages_t *pages;
    /*
     * Global number of generated multifd packets.
     *
     * Note that we used 'uintptr_t' because it'll naturally support atomic
     * operations on both 32bit / 64 bits hosts.  It means on 32bit systems
     * multifd will overflow the packet_num easier, but that should be
     * fine.
     *
     * Another option is to use QEMU's Stat64 then it'll be 64 bits on all
     * hosts, however so far it does not support atomic fetch_add() yet.
     * Make it easy for now.
     */
    uintptr_t packet_num;
    /*
     * Synchronization point past which no more channels will be
     * created.  Posted by multifd_send_channel_created().
     */
    QemuSemaphore channels_created;
    /*
     * Send channels ready.  multifd_send_pages() waits on it before
     * looking for a channel without a pending job.
     */
    QemuSemaphore channels_ready;
    /*
     * Have we already run terminate threads.  There is a race when it
     * happens that we got one error while we are exiting.
     * We will use atomic operations.  Only valid values are 0 and 1.
     */
    int exiting;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_send_state;
8398ea497dSPeter Xu 
/* Global state of the multifd receive side. */
struct {
    /* per-channel parameters (presumably indexed by channel id - confirm) */
    MultiFDRecvParams *params;
    MultiFDRecvData *data;
    /* number of created threads */
    int count;
    /*
     * This is always posted by the recv threads, the migration thread
     * uses it to wait for recv threads to finish assigned tasks.
     */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    /* read atomically by multifd_recv_should_exit() */
    int exiting;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_recv_state;
10011dd7be5SFabiano Rosas 
/*
 * Tell whether multifd data is framed in packets.  Packets are used
 * whenever migrate_mapped_ram() reports false.
 */
static bool multifd_use_packets(void)
{
    if (migrate_mapped_ram()) {
        return false;
    }

    return true;
}
10506833d83SFabiano Rosas 
106a8a3e710SFabiano Rosas void multifd_send_channel_created(void)
107a8a3e710SFabiano Rosas {
108a8a3e710SFabiano Rosas     qemu_sem_post(&multifd_send_state->channels_created);
109a8a3e710SFabiano Rosas }
110a8a3e710SFabiano Rosas 
111f427d90bSFabiano Rosas static void multifd_set_file_bitmap(MultiFDSendParams *p)
112f427d90bSFabiano Rosas {
113f427d90bSFabiano Rosas     MultiFDPages_t *pages = p->pages;
114f427d90bSFabiano Rosas 
115f427d90bSFabiano Rosas     assert(pages->block);
116f427d90bSFabiano Rosas 
117f427d90bSFabiano Rosas     for (int i = 0; i < p->pages->num; i++) {
118f427d90bSFabiano Rosas         ramblock_set_file_bmap_atomic(pages->block, pages->offset[i]);
119f427d90bSFabiano Rosas     }
120f427d90bSFabiano Rosas }
121f427d90bSFabiano Rosas 
122ab7cbb0bSJuan Quintela /* Multifd without compression */
123ab7cbb0bSJuan Quintela 
124ab7cbb0bSJuan Quintela /**
125ab7cbb0bSJuan Quintela  * nocomp_send_setup: setup send side
126ab7cbb0bSJuan Quintela  *
127ab7cbb0bSJuan Quintela  * @p: Params for the channel that we are using
128ab7cbb0bSJuan Quintela  * @errp: pointer to an error
129ab7cbb0bSJuan Quintela  */
130ab7cbb0bSJuan Quintela static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
131ab7cbb0bSJuan Quintela {
13225a1f878SPeter Xu     if (migrate_zero_copy_send()) {
13325a1f878SPeter Xu         p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
13425a1f878SPeter Xu     }
13525a1f878SPeter Xu 
136ab7cbb0bSJuan Quintela     return 0;
137ab7cbb0bSJuan Quintela }
138ab7cbb0bSJuan Quintela 
139ab7cbb0bSJuan Quintela /**
140ab7cbb0bSJuan Quintela  * nocomp_send_cleanup: cleanup send side
141ab7cbb0bSJuan Quintela  *
142ab7cbb0bSJuan Quintela  * For no compression this function does nothing.
143ab7cbb0bSJuan Quintela  *
144ab7cbb0bSJuan Quintela  * @p: Params for the channel that we are using
14518ede636SJuan Quintela  * @errp: pointer to an error
146ab7cbb0bSJuan Quintela  */
147ab7cbb0bSJuan Quintela static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
148ab7cbb0bSJuan Quintela {
149ab7cbb0bSJuan Quintela     return;
150ab7cbb0bSJuan Quintela }
151ab7cbb0bSJuan Quintela 
15206833d83SFabiano Rosas static void multifd_send_prepare_iovs(MultiFDSendParams *p)
15306833d83SFabiano Rosas {
15406833d83SFabiano Rosas     MultiFDPages_t *pages = p->pages;
15506833d83SFabiano Rosas 
15606833d83SFabiano Rosas     for (int i = 0; i < pages->num; i++) {
15706833d83SFabiano Rosas         p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
15806833d83SFabiano Rosas         p->iov[p->iovs_num].iov_len = p->page_size;
15906833d83SFabiano Rosas         p->iovs_num++;
16006833d83SFabiano Rosas     }
16106833d83SFabiano Rosas 
16206833d83SFabiano Rosas     p->next_packet_size = pages->num * p->page_size;
16306833d83SFabiano Rosas }
16406833d83SFabiano Rosas 
165ab7cbb0bSJuan Quintela /**
166ab7cbb0bSJuan Quintela  * nocomp_send_prepare: prepare date to be able to send
167ab7cbb0bSJuan Quintela  *
168ab7cbb0bSJuan Quintela  * For no compression we just have to calculate the size of the
169ab7cbb0bSJuan Quintela  * packet.
170ab7cbb0bSJuan Quintela  *
171ab7cbb0bSJuan Quintela  * Returns 0 for success or -1 for error
172ab7cbb0bSJuan Quintela  *
173ab7cbb0bSJuan Quintela  * @p: Params for the channel that we are using
174ab7cbb0bSJuan Quintela  * @errp: pointer to an error
175ab7cbb0bSJuan Quintela  */
17602fb8104SJuan Quintela static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
177ab7cbb0bSJuan Quintela {
17825a1f878SPeter Xu     bool use_zero_copy_send = migrate_zero_copy_send();
17925a1f878SPeter Xu     int ret;
18025a1f878SPeter Xu 
18106833d83SFabiano Rosas     if (!multifd_use_packets()) {
18206833d83SFabiano Rosas         multifd_send_prepare_iovs(p);
183f427d90bSFabiano Rosas         multifd_set_file_bitmap(p);
184f427d90bSFabiano Rosas 
18506833d83SFabiano Rosas         return 0;
18606833d83SFabiano Rosas     }
18706833d83SFabiano Rosas 
18825a1f878SPeter Xu     if (!use_zero_copy_send) {
18925a1f878SPeter Xu         /*
19025a1f878SPeter Xu          * Only !zerocopy needs the header in IOV; zerocopy will
19125a1f878SPeter Xu          * send it separately.
19225a1f878SPeter Xu          */
19325a1f878SPeter Xu         multifd_send_prepare_header(p);
19425a1f878SPeter Xu     }
195226468baSJuan Quintela 
19606833d83SFabiano Rosas     multifd_send_prepare_iovs(p);
197ab7cbb0bSJuan Quintela     p->flags |= MULTIFD_FLAG_NOCOMP;
19825a1f878SPeter Xu 
19925a1f878SPeter Xu     multifd_send_fill_packet(p);
20025a1f878SPeter Xu 
20125a1f878SPeter Xu     if (use_zero_copy_send) {
20225a1f878SPeter Xu         /* Send header first, without zerocopy */
20325a1f878SPeter Xu         ret = qio_channel_write_all(p->c, (void *)p->packet,
20425a1f878SPeter Xu                                     p->packet_len, errp);
20525a1f878SPeter Xu         if (ret != 0) {
20625a1f878SPeter Xu             return -1;
20725a1f878SPeter Xu         }
20825a1f878SPeter Xu     }
20925a1f878SPeter Xu 
210ab7cbb0bSJuan Quintela     return 0;
211ab7cbb0bSJuan Quintela }
212ab7cbb0bSJuan Quintela 
213ab7cbb0bSJuan Quintela /**
214ab7cbb0bSJuan Quintela  * nocomp_recv_setup: setup receive side
215ab7cbb0bSJuan Quintela  *
216ab7cbb0bSJuan Quintela  * For no compression this function does nothing.
217ab7cbb0bSJuan Quintela  *
218ab7cbb0bSJuan Quintela  * Returns 0 for success or -1 for error
219ab7cbb0bSJuan Quintela  *
220ab7cbb0bSJuan Quintela  * @p: Params for the channel that we are using
221ab7cbb0bSJuan Quintela  * @errp: pointer to an error
222ab7cbb0bSJuan Quintela  */
223ab7cbb0bSJuan Quintela static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
224ab7cbb0bSJuan Quintela {
225ab7cbb0bSJuan Quintela     return 0;
226ab7cbb0bSJuan Quintela }
227ab7cbb0bSJuan Quintela 
228ab7cbb0bSJuan Quintela /**
229ab7cbb0bSJuan Quintela  * nocomp_recv_cleanup: setup receive side
230ab7cbb0bSJuan Quintela  *
231ab7cbb0bSJuan Quintela  * For no compression this function does nothing.
232ab7cbb0bSJuan Quintela  *
233ab7cbb0bSJuan Quintela  * @p: Params for the channel that we are using
234ab7cbb0bSJuan Quintela  */
235ab7cbb0bSJuan Quintela static void nocomp_recv_cleanup(MultiFDRecvParams *p)
236ab7cbb0bSJuan Quintela {
237ab7cbb0bSJuan Quintela }
238ab7cbb0bSJuan Quintela 
239ab7cbb0bSJuan Quintela /**
2409db19125SFabiano Rosas  * nocomp_recv: read the data from the channel
241ab7cbb0bSJuan Quintela  *
242ab7cbb0bSJuan Quintela  * For no compression we just need to read things into the correct place.
243ab7cbb0bSJuan Quintela  *
244ab7cbb0bSJuan Quintela  * Returns 0 for success or -1 for error
245ab7cbb0bSJuan Quintela  *
246ab7cbb0bSJuan Quintela  * @p: Params for the channel that we are using
247ab7cbb0bSJuan Quintela  * @errp: pointer to an error
248ab7cbb0bSJuan Quintela  */
2499db19125SFabiano Rosas static int nocomp_recv(MultiFDRecvParams *p, Error **errp)
250ab7cbb0bSJuan Quintela {
25106833d83SFabiano Rosas     uint32_t flags;
25206833d83SFabiano Rosas 
25306833d83SFabiano Rosas     if (!multifd_use_packets()) {
254a49d15a3SFabiano Rosas         return multifd_file_recv_data(p, errp);
25506833d83SFabiano Rosas     }
25606833d83SFabiano Rosas 
25706833d83SFabiano Rosas     flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
258ab7cbb0bSJuan Quintela 
259ab7cbb0bSJuan Quintela     if (flags != MULTIFD_FLAG_NOCOMP) {
26004e11404SJuan Quintela         error_setg(errp, "multifd %u: flags received %x flags expected %x",
261ab7cbb0bSJuan Quintela                    p->id, flags, MULTIFD_FLAG_NOCOMP);
262ab7cbb0bSJuan Quintela         return -1;
263ab7cbb0bSJuan Quintela     }
264cf2d4aa8SJuan Quintela     for (int i = 0; i < p->normal_num; i++) {
265faf60935SJuan Quintela         p->iov[i].iov_base = p->host + p->normal[i];
266ddec20f8SJuan Quintela         p->iov[i].iov_len = p->page_size;
267226468baSJuan Quintela     }
268cf2d4aa8SJuan Quintela     return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
269ab7cbb0bSJuan Quintela }
270ab7cbb0bSJuan Quintela 
/* Method table for multifd without compression (the nocomp_* functions). */
static MultiFDMethods multifd_nocomp_ops = {
    .send_setup = nocomp_send_setup,
    .send_cleanup = nocomp_send_cleanup,
    .send_prepare = nocomp_send_prepare,
    .recv_setup = nocomp_recv_setup,
    .recv_cleanup = nocomp_recv_cleanup,
    .recv = nocomp_recv
};
279ab7cbb0bSJuan Quintela 
/*
 * Method tables indexed by compression method.  Only the "none" entry
 * is filled in statically; multifd_register_ops() fills in the others.
 */
static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
    [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
};
283ab7cbb0bSJuan Quintela 
2847ec2c2b3SJuan Quintela void multifd_register_ops(int method, MultiFDMethods *ops)
2857ec2c2b3SJuan Quintela {
2867ec2c2b3SJuan Quintela     assert(0 < method && method < MULTIFD_COMPRESSION__MAX);
2877ec2c2b3SJuan Quintela     multifd_ops[method] = ops;
2887ec2c2b3SJuan Quintela }
2897ec2c2b3SJuan Quintela 
290836eca47SPeter Xu /* Reset a MultiFDPages_t* object for the next use */
291836eca47SPeter Xu static void multifd_pages_reset(MultiFDPages_t *pages)
292836eca47SPeter Xu {
293836eca47SPeter Xu     /*
294836eca47SPeter Xu      * We don't need to touch offset[] array, because it will be
295836eca47SPeter Xu      * overwritten later when reused.
296836eca47SPeter Xu      */
297836eca47SPeter Xu     pages->num = 0;
298836eca47SPeter Xu     pages->block = NULL;
299836eca47SPeter Xu }
300836eca47SPeter Xu 
301d32ca5adSJuan Quintela static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
302d32ca5adSJuan Quintela {
303d32ca5adSJuan Quintela     MultiFDInit_t msg = {};
304cbec7eb7SJuan Quintela     size_t size = sizeof(msg);
305d32ca5adSJuan Quintela     int ret;
306d32ca5adSJuan Quintela 
307d32ca5adSJuan Quintela     msg.magic = cpu_to_be32(MULTIFD_MAGIC);
308d32ca5adSJuan Quintela     msg.version = cpu_to_be32(MULTIFD_VERSION);
309d32ca5adSJuan Quintela     msg.id = p->id;
310d32ca5adSJuan Quintela     memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
311d32ca5adSJuan Quintela 
312cbec7eb7SJuan Quintela     ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
313d32ca5adSJuan Quintela     if (ret != 0) {
314d32ca5adSJuan Quintela         return -1;
315d32ca5adSJuan Quintela     }
316cbec7eb7SJuan Quintela     stat64_add(&mig_stats.multifd_bytes, size);
317d32ca5adSJuan Quintela     return 0;
318d32ca5adSJuan Quintela }
319d32ca5adSJuan Quintela 
320d32ca5adSJuan Quintela static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
321d32ca5adSJuan Quintela {
322d32ca5adSJuan Quintela     MultiFDInit_t msg;
323d32ca5adSJuan Quintela     int ret;
324d32ca5adSJuan Quintela 
325d32ca5adSJuan Quintela     ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
326d32ca5adSJuan Quintela     if (ret != 0) {
327d32ca5adSJuan Quintela         return -1;
328d32ca5adSJuan Quintela     }
329d32ca5adSJuan Quintela 
330d32ca5adSJuan Quintela     msg.magic = be32_to_cpu(msg.magic);
331d32ca5adSJuan Quintela     msg.version = be32_to_cpu(msg.version);
332d32ca5adSJuan Quintela 
333d32ca5adSJuan Quintela     if (msg.magic != MULTIFD_MAGIC) {
334d32ca5adSJuan Quintela         error_setg(errp, "multifd: received packet magic %x "
335d32ca5adSJuan Quintela                    "expected %x", msg.magic, MULTIFD_MAGIC);
336d32ca5adSJuan Quintela         return -1;
337d32ca5adSJuan Quintela     }
338d32ca5adSJuan Quintela 
339d32ca5adSJuan Quintela     if (msg.version != MULTIFD_VERSION) {
34004e11404SJuan Quintela         error_setg(errp, "multifd: received packet version %u "
34104e11404SJuan Quintela                    "expected %u", msg.version, MULTIFD_VERSION);
342d32ca5adSJuan Quintela         return -1;
343d32ca5adSJuan Quintela     }
344d32ca5adSJuan Quintela 
345d32ca5adSJuan Quintela     if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
346d32ca5adSJuan Quintela         char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
347d32ca5adSJuan Quintela         char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
348d32ca5adSJuan Quintela 
349d32ca5adSJuan Quintela         error_setg(errp, "multifd: received uuid '%s' and expected "
350d32ca5adSJuan Quintela                    "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
351d32ca5adSJuan Quintela         g_free(uuid);
352d32ca5adSJuan Quintela         g_free(msg_uuid);
353d32ca5adSJuan Quintela         return -1;
354d32ca5adSJuan Quintela     }
355d32ca5adSJuan Quintela 
356d32ca5adSJuan Quintela     if (msg.id > migrate_multifd_channels()) {
357c77b4085SAvihai Horon         error_setg(errp, "multifd: received channel id %u is greater than "
358c77b4085SAvihai Horon                    "number of channels %u", msg.id, migrate_multifd_channels());
359d32ca5adSJuan Quintela         return -1;
360d32ca5adSJuan Quintela     }
361d32ca5adSJuan Quintela 
362d32ca5adSJuan Quintela     return msg.id;
363d32ca5adSJuan Quintela }
364d32ca5adSJuan Quintela 
3656074f816SFabiano Rosas static MultiFDPages_t *multifd_pages_init(uint32_t n)
366d32ca5adSJuan Quintela {
367d32ca5adSJuan Quintela     MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
368d32ca5adSJuan Quintela 
3696074f816SFabiano Rosas     pages->allocated = n;
3706074f816SFabiano Rosas     pages->offset = g_new0(ram_addr_t, n);
371d32ca5adSJuan Quintela 
372d32ca5adSJuan Quintela     return pages;
373d32ca5adSJuan Quintela }
374d32ca5adSJuan Quintela 
375d32ca5adSJuan Quintela static void multifd_pages_clear(MultiFDPages_t *pages)
376d32ca5adSJuan Quintela {
377836eca47SPeter Xu     multifd_pages_reset(pages);
378d32ca5adSJuan Quintela     pages->allocated = 0;
379d32ca5adSJuan Quintela     g_free(pages->offset);
380d32ca5adSJuan Quintela     pages->offset = NULL;
381d32ca5adSJuan Quintela     g_free(pages);
382d32ca5adSJuan Quintela }
383d32ca5adSJuan Quintela 
38425a1f878SPeter Xu void multifd_send_fill_packet(MultiFDSendParams *p)
385d32ca5adSJuan Quintela {
386d32ca5adSJuan Quintela     MultiFDPacket_t *packet = p->packet;
387efd8c543SPeter Xu     MultiFDPages_t *pages = p->pages;
38898ea497dSPeter Xu     uint64_t packet_num;
389d32ca5adSJuan Quintela     int i;
390d32ca5adSJuan Quintela 
391d32ca5adSJuan Quintela     packet->flags = cpu_to_be32(p->flags);
392d32ca5adSJuan Quintela     packet->pages_alloc = cpu_to_be32(p->pages->allocated);
393efd8c543SPeter Xu     packet->normal_pages = cpu_to_be32(pages->num);
394d32ca5adSJuan Quintela     packet->next_packet_size = cpu_to_be32(p->next_packet_size);
39598ea497dSPeter Xu 
39698ea497dSPeter Xu     packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
39798ea497dSPeter Xu     packet->packet_num = cpu_to_be64(packet_num);
398d32ca5adSJuan Quintela 
399efd8c543SPeter Xu     if (pages->block) {
400efd8c543SPeter Xu         strncpy(packet->ramblock, pages->block->idstr, 256);
401d32ca5adSJuan Quintela     }
402d32ca5adSJuan Quintela 
403efd8c543SPeter Xu     for (i = 0; i < pages->num; i++) {
404d32ca5adSJuan Quintela         /* there are architectures where ram_addr_t is 32 bit */
405efd8c543SPeter Xu         uint64_t temp = pages->offset[i];
406d32ca5adSJuan Quintela 
407d32ca5adSJuan Quintela         packet->offset[i] = cpu_to_be64(temp);
408d32ca5adSJuan Quintela     }
40905b7ec18SPeter Xu 
41005b7ec18SPeter Xu     p->packets_sent++;
411db7e1cc5SPeter Xu     p->total_normal_pages += pages->num;
4128a9ef173SPeter Xu 
41398ea497dSPeter Xu     trace_multifd_send(p->id, packet_num, pages->num, p->flags,
4148a9ef173SPeter Xu                        p->next_packet_size);
415d32ca5adSJuan Quintela }
416d32ca5adSJuan Quintela 
417d32ca5adSJuan Quintela static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
418d32ca5adSJuan Quintela {
419d32ca5adSJuan Quintela     MultiFDPacket_t *packet = p->packet;
420d32ca5adSJuan Quintela     int i;
421d32ca5adSJuan Quintela 
422d32ca5adSJuan Quintela     packet->magic = be32_to_cpu(packet->magic);
423d32ca5adSJuan Quintela     if (packet->magic != MULTIFD_MAGIC) {
424d32ca5adSJuan Quintela         error_setg(errp, "multifd: received packet "
425d32ca5adSJuan Quintela                    "magic %x and expected magic %x",
426d32ca5adSJuan Quintela                    packet->magic, MULTIFD_MAGIC);
427d32ca5adSJuan Quintela         return -1;
428d32ca5adSJuan Quintela     }
429d32ca5adSJuan Quintela 
430d32ca5adSJuan Quintela     packet->version = be32_to_cpu(packet->version);
431d32ca5adSJuan Quintela     if (packet->version != MULTIFD_VERSION) {
432d32ca5adSJuan Quintela         error_setg(errp, "multifd: received packet "
43304e11404SJuan Quintela                    "version %u and expected version %u",
434d32ca5adSJuan Quintela                    packet->version, MULTIFD_VERSION);
435d32ca5adSJuan Quintela         return -1;
436d32ca5adSJuan Quintela     }
437d32ca5adSJuan Quintela 
438d32ca5adSJuan Quintela     p->flags = be32_to_cpu(packet->flags);
439d32ca5adSJuan Quintela 
440d32ca5adSJuan Quintela     packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
441d32ca5adSJuan Quintela     /*
442d32ca5adSJuan Quintela      * If we received a packet that is 100 times bigger than expected
443d32ca5adSJuan Quintela      * just stop migration.  It is a magic number.
444d32ca5adSJuan Quintela      */
445d6f45ebaSJuan Quintela     if (packet->pages_alloc > p->page_count) {
446d32ca5adSJuan Quintela         error_setg(errp, "multifd: received packet "
447cf2d4aa8SJuan Quintela                    "with size %u and expected a size of %u",
448d6f45ebaSJuan Quintela                    packet->pages_alloc, p->page_count) ;
449d32ca5adSJuan Quintela         return -1;
450d32ca5adSJuan Quintela     }
451d32ca5adSJuan Quintela 
4528c0ec0b2SJuan Quintela     p->normal_num = be32_to_cpu(packet->normal_pages);
453cf2d4aa8SJuan Quintela     if (p->normal_num > packet->pages_alloc) {
454d32ca5adSJuan Quintela         error_setg(errp, "multifd: received packet "
45504e11404SJuan Quintela                    "with %u pages and expected maximum pages are %u",
456cf2d4aa8SJuan Quintela                    p->normal_num, packet->pages_alloc) ;
457d32ca5adSJuan Quintela         return -1;
458d32ca5adSJuan Quintela     }
459d32ca5adSJuan Quintela 
460d32ca5adSJuan Quintela     p->next_packet_size = be32_to_cpu(packet->next_packet_size);
461d32ca5adSJuan Quintela     p->packet_num = be64_to_cpu(packet->packet_num);
46205b7ec18SPeter Xu     p->packets_recved++;
463db7e1cc5SPeter Xu     p->total_normal_pages += p->normal_num;
464d32ca5adSJuan Quintela 
4658a9ef173SPeter Xu     trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->flags,
4668a9ef173SPeter Xu                        p->next_packet_size);
4678a9ef173SPeter Xu 
468cf2d4aa8SJuan Quintela     if (p->normal_num == 0) {
469d32ca5adSJuan Quintela         return 0;
470d32ca5adSJuan Quintela     }
471d32ca5adSJuan Quintela 
472d32ca5adSJuan Quintela     /* make sure that ramblock is 0 terminated */
473d32ca5adSJuan Quintela     packet->ramblock[255] = 0;
4745d1d1fcfSLukas Straub     p->block = qemu_ram_block_by_name(packet->ramblock);
4755d1d1fcfSLukas Straub     if (!p->block) {
476d32ca5adSJuan Quintela         error_setg(errp, "multifd: unknown ram block %s",
477d32ca5adSJuan Quintela                    packet->ramblock);
478d32ca5adSJuan Quintela         return -1;
479d32ca5adSJuan Quintela     }
480d32ca5adSJuan Quintela 
4815d1d1fcfSLukas Straub     p->host = p->block->host;
482cf2d4aa8SJuan Quintela     for (i = 0; i < p->normal_num; i++) {
483d32ca5adSJuan Quintela         uint64_t offset = be64_to_cpu(packet->offset[i]);
484d32ca5adSJuan Quintela 
4855d1d1fcfSLukas Straub         if (offset > (p->block->used_length - p->page_size)) {
486d32ca5adSJuan Quintela             error_setg(errp, "multifd: offset too long %" PRIu64
487d32ca5adSJuan Quintela                        " (max " RAM_ADDR_FMT ")",
4885d1d1fcfSLukas Straub                        offset, p->block->used_length);
489d32ca5adSJuan Quintela             return -1;
490d32ca5adSJuan Quintela         }
491cf2d4aa8SJuan Quintela         p->normal[i] = offset;
492d32ca5adSJuan Quintela     }
493d32ca5adSJuan Quintela 
494d32ca5adSJuan Quintela     return 0;
495d32ca5adSJuan Quintela }
496d32ca5adSJuan Quintela 
49715f3f21dSPeter Xu static bool multifd_send_should_exit(void)
49815f3f21dSPeter Xu {
49915f3f21dSPeter Xu     return qatomic_read(&multifd_send_state->exiting);
50015f3f21dSPeter Xu }
50115f3f21dSPeter Xu 
50211dd7be5SFabiano Rosas static bool multifd_recv_should_exit(void)
50311dd7be5SFabiano Rosas {
50411dd7be5SFabiano Rosas     return qatomic_read(&multifd_recv_state->exiting);
50511dd7be5SFabiano Rosas }
50611dd7be5SFabiano Rosas 
507d32ca5adSJuan Quintela /*
50848c0f5d5SPeter Xu  * The migration thread can wait on either of the two semaphores.  This
50948c0f5d5SPeter Xu  * function can be used to kick the main thread out of waiting on either of
51048c0f5d5SPeter Xu  * them.  Should mostly only be called when something wrong happened with
51148c0f5d5SPeter Xu  * the current multifd send thread.
51248c0f5d5SPeter Xu  */
51348c0f5d5SPeter Xu static void multifd_send_kick_main(MultiFDSendParams *p)
51448c0f5d5SPeter Xu {
51548c0f5d5SPeter Xu     qemu_sem_post(&p->sem_sync);
51648c0f5d5SPeter Xu     qemu_sem_post(&multifd_send_state->channels_ready);
51748c0f5d5SPeter Xu }
51848c0f5d5SPeter Xu 
/*
 * How do we use multifd_send_state->pages and channel->pages?
 *
 * We create one "pages" struct for each channel, plus a main one.
 * Each time we need to send a batch of pages we swap the main one
 * (multifd_send_state->pages) with the one of the channel that is going
 * to send it.  There are two reasons for that:
 *    - to avoid doing many mallocs during migration
 *    - to make it easier to know what to free at the end of migration
 *
 * This way we always know who is the owner of each "pages" struct,
 * and we don't need any locking.  It belongs either to the migration
 * thread or to the channel thread.  The swap below is only done for a
 * channel whose pending_job is false, i.e. a channel that has finished
 * with its own "pages"; it is ordered against the channel thread by the
 * acquire/release pair on pending_job.
 *
 * The function waits for a ready channel, picks (round-robin, starting
 * at next_channel) the first channel without a pending job, hands it
 * the queued pages and wakes it up.
 *
 * Returns true on success, false if the send side is exiting.
 */
static bool multifd_send_pages(void)
{
    int i;
    /* channel to try first; persists across calls (and migrations) */
    static int next_channel;
    MultiFDSendParams *p = NULL; /* make happy gcc */
    MultiFDPages_t *pages = multifd_send_state->pages;

    if (multifd_send_should_exit()) {
        return false;
    }

    /* We wait here, until at least one channel is ready */
    qemu_sem_wait(&multifd_send_state->channels_ready);

    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_channel %= migrate_multifd_channels();
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_send_should_exit()) {
            return false;
        }
        p = &multifd_send_state->params[i];
        /*
         * Lockless read to p->pending_job is safe, because only multifd
         * sender thread can clear it.
         */
        if (qatomic_read(&p->pending_job) == false) {
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Make sure we read p->pending_job before all the rest.  Pairs with
     * qatomic_store_release() in multifd_send_thread().
     */
    smp_mb_acquire();
    /* an idle channel must own an empty "pages" struct */
    assert(!p->pages->num);
    /* swap: the channel gets the filled queue, we keep its empty one */
    multifd_send_state->pages = p->pages;
    p->pages = pages;
    /*
     * Making sure p->pages is setup before marking pending_job=true. Pairs
     * with the qatomic_load_acquire() in multifd_send_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}
590d32ca5adSJuan Quintela 
591f88f86c4SPeter Xu static inline bool multifd_queue_empty(MultiFDPages_t *pages)
592f88f86c4SPeter Xu {
593f88f86c4SPeter Xu     return pages->num == 0;
594f88f86c4SPeter Xu }
595f88f86c4SPeter Xu 
596f88f86c4SPeter Xu static inline bool multifd_queue_full(MultiFDPages_t *pages)
597f88f86c4SPeter Xu {
598f88f86c4SPeter Xu     return pages->num == pages->allocated;
599f88f86c4SPeter Xu }
600f88f86c4SPeter Xu 
601f88f86c4SPeter Xu static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset)
602f88f86c4SPeter Xu {
603f88f86c4SPeter Xu     pages->offset[pages->num++] = offset;
604f88f86c4SPeter Xu }
605f88f86c4SPeter Xu 
606d6556d17SPeter Xu /* Returns true if enqueue successful, false otherwise */
607d6556d17SPeter Xu bool multifd_queue_page(RAMBlock *block, ram_addr_t offset)
608d32ca5adSJuan Quintela {
609f88f86c4SPeter Xu     MultiFDPages_t *pages;
610d32ca5adSJuan Quintela 
611f88f86c4SPeter Xu retry:
612f88f86c4SPeter Xu     pages = multifd_send_state->pages;
613f88f86c4SPeter Xu 
614f88f86c4SPeter Xu     /* If the queue is empty, we can already enqueue now */
615f88f86c4SPeter Xu     if (multifd_queue_empty(pages)) {
616d32ca5adSJuan Quintela         pages->block = block;
617f88f86c4SPeter Xu         multifd_enqueue(pages, offset);
618d6556d17SPeter Xu         return true;
619d32ca5adSJuan Quintela     }
620d32ca5adSJuan Quintela 
621f88f86c4SPeter Xu     /*
622f88f86c4SPeter Xu      * Not empty, meanwhile we need a flush.  It can because of either:
623f88f86c4SPeter Xu      *
624f88f86c4SPeter Xu      * (1) The page is not on the same ramblock of previous ones, or,
625f88f86c4SPeter Xu      * (2) The queue is full.
626f88f86c4SPeter Xu      *
627f88f86c4SPeter Xu      * After flush, always retry.
628f88f86c4SPeter Xu      */
629f88f86c4SPeter Xu     if (pages->block != block || multifd_queue_full(pages)) {
6303b40964aSPeter Xu         if (!multifd_send_pages()) {
631d6556d17SPeter Xu             return false;
632d32ca5adSJuan Quintela         }
633f88f86c4SPeter Xu         goto retry;
634d32ca5adSJuan Quintela     }
635d32ca5adSJuan Quintela 
636f88f86c4SPeter Xu     /* Not empty, and we still have space, do it! */
637f88f86c4SPeter Xu     multifd_enqueue(pages, offset);
638d6556d17SPeter Xu     return true;
639d32ca5adSJuan Quintela }
640d32ca5adSJuan Quintela 
6413ab4441dSPeter Xu /* Multifd send side hit an error; remember it and prepare to quit */
6423ab4441dSPeter Xu static void multifd_send_set_error(Error *err)
643d32ca5adSJuan Quintela {
64415f3f21dSPeter Xu     /*
64515f3f21dSPeter Xu      * We don't want to exit each threads twice.  Depending on where
64615f3f21dSPeter Xu      * we get the error, or if there are two independent errors in two
64715f3f21dSPeter Xu      * threads at the same time, we can end calling this function
64815f3f21dSPeter Xu      * twice.
64915f3f21dSPeter Xu      */
65015f3f21dSPeter Xu     if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
65115f3f21dSPeter Xu         return;
65215f3f21dSPeter Xu     }
65315f3f21dSPeter Xu 
654d32ca5adSJuan Quintela     if (err) {
655d32ca5adSJuan Quintela         MigrationState *s = migrate_get_current();
656d32ca5adSJuan Quintela         migrate_set_error(s, err);
657d32ca5adSJuan Quintela         if (s->state == MIGRATION_STATUS_SETUP ||
658d32ca5adSJuan Quintela             s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
659d32ca5adSJuan Quintela             s->state == MIGRATION_STATUS_DEVICE ||
660d32ca5adSJuan Quintela             s->state == MIGRATION_STATUS_ACTIVE) {
661d32ca5adSJuan Quintela             migrate_set_state(&s->state, s->state,
662d32ca5adSJuan Quintela                               MIGRATION_STATUS_FAILED);
663d32ca5adSJuan Quintela         }
664d32ca5adSJuan Quintela     }
6653ab4441dSPeter Xu }
666d32ca5adSJuan Quintela 
6673ab4441dSPeter Xu static void multifd_send_terminate_threads(void)
6683ab4441dSPeter Xu {
6693ab4441dSPeter Xu     int i;
6703ab4441dSPeter Xu 
6713ab4441dSPeter Xu     trace_multifd_send_terminate_threads();
6723ab4441dSPeter Xu 
6733ab4441dSPeter Xu     /*
6743ab4441dSPeter Xu      * Tell everyone we're quitting.  No xchg() needed here; we simply
6753ab4441dSPeter Xu      * always set it.
6763ab4441dSPeter Xu      */
6773ab4441dSPeter Xu     qatomic_set(&multifd_send_state->exiting, 1);
67812808db3SPeter Xu 
67912808db3SPeter Xu     /*
68012808db3SPeter Xu      * Firstly, kick all threads out; no matter whether they are just idle,
68112808db3SPeter Xu      * or blocked in an IO system call.
68212808db3SPeter Xu      */
683d32ca5adSJuan Quintela     for (i = 0; i < migrate_multifd_channels(); i++) {
684d32ca5adSJuan Quintela         MultiFDSendParams *p = &multifd_send_state->params[i];
685d32ca5adSJuan Quintela 
686d32ca5adSJuan Quintela         qemu_sem_post(&p->sem);
687077fbb59SLi Zhang         if (p->c) {
688077fbb59SLi Zhang             qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
689077fbb59SLi Zhang         }
690d32ca5adSJuan Quintela     }
69112808db3SPeter Xu 
69212808db3SPeter Xu     /*
69312808db3SPeter Xu      * Finally recycle all the threads.
69412808db3SPeter Xu      */
69512808db3SPeter Xu     for (i = 0; i < migrate_multifd_channels(); i++) {
69612808db3SPeter Xu         MultiFDSendParams *p = &multifd_send_state->params[i];
69712808db3SPeter Xu 
698e1921f10SFabiano Rosas         if (p->tls_thread_created) {
699e1921f10SFabiano Rosas             qemu_thread_join(&p->tls_thread);
700e1921f10SFabiano Rosas         }
701e1921f10SFabiano Rosas 
702a2a63c4aSFabiano Rosas         if (p->thread_created) {
70312808db3SPeter Xu             qemu_thread_join(&p->thread);
70412808db3SPeter Xu         }
70512808db3SPeter Xu     }
706d32ca5adSJuan Quintela }
707d32ca5adSJuan Quintela 
70812808db3SPeter Xu static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
709d32ca5adSJuan Quintela {
7100518b5d8SPeter Xu     if (p->c) {
71120171ea8SLukas Straub         migration_ioc_unregister_yank(p->c);
712*1a6e217cSPeter Xu         /*
713*1a6e217cSPeter Xu          * An explicit close() on the channel here is normally not
714*1a6e217cSPeter Xu          * required, but can be helpful for "file:" iochannels, where it
715*1a6e217cSPeter Xu          * will include fdatasync() to make sure the data is flushed to the
716*1a6e217cSPeter Xu          * disk backend.
717*1a6e217cSPeter Xu          *
718*1a6e217cSPeter Xu          * The object_unref() cannot guarantee that because: (1) finalize()
719*1a6e217cSPeter Xu          * of the iochannel is only triggered on the last reference, and
720*1a6e217cSPeter Xu          * it's not guaranteed that we always hold the last refcount when
721*1a6e217cSPeter Xu          * reaching here, and, (2) even if finalize() is invoked, it only
722*1a6e217cSPeter Xu          * does a close(fd) without data flush.
723*1a6e217cSPeter Xu          */
724b7b03eb6SFabiano Rosas         qio_channel_close(p->c, &error_abort);
725c9a7e83cSPeter Xu         object_unref(OBJECT(p->c));
726d32ca5adSJuan Quintela         p->c = NULL;
7270518b5d8SPeter Xu     }
728d32ca5adSJuan Quintela     qemu_sem_destroy(&p->sem);
729d32ca5adSJuan Quintela     qemu_sem_destroy(&p->sem_sync);
730d32ca5adSJuan Quintela     g_free(p->name);
731d32ca5adSJuan Quintela     p->name = NULL;
732d32ca5adSJuan Quintela     multifd_pages_clear(p->pages);
733d32ca5adSJuan Quintela     p->pages = NULL;
734d32ca5adSJuan Quintela     p->packet_len = 0;
735d32ca5adSJuan Quintela     g_free(p->packet);
736d32ca5adSJuan Quintela     p->packet = NULL;
737226468baSJuan Quintela     g_free(p->iov);
738226468baSJuan Quintela     p->iov = NULL;
73912808db3SPeter Xu     multifd_send_state->ops->send_cleanup(p, errp);
74012808db3SPeter Xu 
74112808db3SPeter Xu     return *errp == NULL;
742ab7cbb0bSJuan Quintela }
74312808db3SPeter Xu 
74412808db3SPeter Xu static void multifd_send_cleanup_state(void)
74512808db3SPeter Xu {
746b7b03eb6SFabiano Rosas     file_cleanup_outgoing_migration();
747decdc767SFabiano Rosas     fd_cleanup_outgoing_migration();
74872b90b96SPeter Xu     socket_cleanup_outgoing_migration();
74993fa9dc2SFabiano Rosas     qemu_sem_destroy(&multifd_send_state->channels_created);
750d32ca5adSJuan Quintela     qemu_sem_destroy(&multifd_send_state->channels_ready);
751d32ca5adSJuan Quintela     g_free(multifd_send_state->params);
752d32ca5adSJuan Quintela     multifd_send_state->params = NULL;
753d32ca5adSJuan Quintela     multifd_pages_clear(multifd_send_state->pages);
754d32ca5adSJuan Quintela     multifd_send_state->pages = NULL;
755d32ca5adSJuan Quintela     g_free(multifd_send_state);
756d32ca5adSJuan Quintela     multifd_send_state = NULL;
757d32ca5adSJuan Quintela }
758d32ca5adSJuan Quintela 
759cde85c37SPeter Xu void multifd_send_shutdown(void)
76012808db3SPeter Xu {
76112808db3SPeter Xu     int i;
76212808db3SPeter Xu 
76312808db3SPeter Xu     if (!migrate_multifd()) {
76412808db3SPeter Xu         return;
76512808db3SPeter Xu     }
76612808db3SPeter Xu 
76712808db3SPeter Xu     multifd_send_terminate_threads();
76812808db3SPeter Xu 
76912808db3SPeter Xu     for (i = 0; i < migrate_multifd_channels(); i++) {
77012808db3SPeter Xu         MultiFDSendParams *p = &multifd_send_state->params[i];
77112808db3SPeter Xu         Error *local_err = NULL;
77212808db3SPeter Xu 
77312808db3SPeter Xu         if (!multifd_send_cleanup_channel(p, &local_err)) {
77412808db3SPeter Xu             migrate_set_error(migrate_get_current(), local_err);
77512808db3SPeter Xu             error_free(local_err);
77612808db3SPeter Xu         }
77712808db3SPeter Xu     }
77812808db3SPeter Xu 
77912808db3SPeter Xu     multifd_send_cleanup_state();
78012808db3SPeter Xu }
78112808db3SPeter Xu 
7824cc47b43SLeonardo Bras static int multifd_zero_copy_flush(QIOChannel *c)
7834cc47b43SLeonardo Bras {
7844cc47b43SLeonardo Bras     int ret;
7854cc47b43SLeonardo Bras     Error *err = NULL;
7864cc47b43SLeonardo Bras 
7874cc47b43SLeonardo Bras     ret = qio_channel_flush(c, &err);
7884cc47b43SLeonardo Bras     if (ret < 0) {
7894cc47b43SLeonardo Bras         error_report_err(err);
7904cc47b43SLeonardo Bras         return -1;
7914cc47b43SLeonardo Bras     }
7924cc47b43SLeonardo Bras     if (ret == 1) {
793aff3f660SJuan Quintela         stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
7944cc47b43SLeonardo Bras     }
7954cc47b43SLeonardo Bras 
7964cc47b43SLeonardo Bras     return ret;
7974cc47b43SLeonardo Bras }
7984cc47b43SLeonardo Bras 
7999346fa18SFabiano Rosas int multifd_send_sync_main(void)
800d32ca5adSJuan Quintela {
801d32ca5adSJuan Quintela     int i;
8025b1d9babSLeonardo Bras     bool flush_zero_copy;
803d32ca5adSJuan Quintela 
80451b07548SJuan Quintela     if (!migrate_multifd()) {
80533d70973SLeonardo Bras         return 0;
806d32ca5adSJuan Quintela     }
80790a3d2f9SJuan Quintela     if (multifd_send_state->pages->num) {
8083b40964aSPeter Xu         if (!multifd_send_pages()) {
809d32ca5adSJuan Quintela             error_report("%s: multifd_send_pages fail", __func__);
81033d70973SLeonardo Bras             return -1;
811d32ca5adSJuan Quintela         }
812d32ca5adSJuan Quintela     }
8135b1d9babSLeonardo Bras 
814b4bc342cSJuan Quintela     flush_zero_copy = migrate_zero_copy_send();
8155b1d9babSLeonardo Bras 
816d32ca5adSJuan Quintela     for (i = 0; i < migrate_multifd_channels(); i++) {
817d32ca5adSJuan Quintela         MultiFDSendParams *p = &multifd_send_state->params[i];
818d32ca5adSJuan Quintela 
81915f3f21dSPeter Xu         if (multifd_send_should_exit()) {
82033d70973SLeonardo Bras             return -1;
821d32ca5adSJuan Quintela         }
822d32ca5adSJuan Quintela 
82315f3f21dSPeter Xu         trace_multifd_send_sync_main_signal(p->id);
82415f3f21dSPeter Xu 
825f5f48a78SPeter Xu         /*
826f5f48a78SPeter Xu          * We should be the only user so far, so not possible to be set by
827f5f48a78SPeter Xu          * others concurrently.
828f5f48a78SPeter Xu          */
829f5f48a78SPeter Xu         assert(qatomic_read(&p->pending_sync) == false);
830f5f48a78SPeter Xu         qatomic_set(&p->pending_sync, true);
831d32ca5adSJuan Quintela         qemu_sem_post(&p->sem);
832d32ca5adSJuan Quintela     }
833d32ca5adSJuan Quintela     for (i = 0; i < migrate_multifd_channels(); i++) {
834d32ca5adSJuan Quintela         MultiFDSendParams *p = &multifd_send_state->params[i];
835d32ca5adSJuan Quintela 
83615f3f21dSPeter Xu         if (multifd_send_should_exit()) {
83715f3f21dSPeter Xu             return -1;
83815f3f21dSPeter Xu         }
83915f3f21dSPeter Xu 
840d2026ee1SJuan Quintela         qemu_sem_wait(&multifd_send_state->channels_ready);
841d32ca5adSJuan Quintela         trace_multifd_send_sync_main_wait(p->id);
842d32ca5adSJuan Quintela         qemu_sem_wait(&p->sem_sync);
843ebfc5787SZhenzhong Duan 
844ebfc5787SZhenzhong Duan         if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
845ebfc5787SZhenzhong Duan             return -1;
846ebfc5787SZhenzhong Duan         }
847d32ca5adSJuan Quintela     }
848d32ca5adSJuan Quintela     trace_multifd_send_sync_main(multifd_send_state->packet_num);
84933d70973SLeonardo Bras 
85033d70973SLeonardo Bras     return 0;
851d32ca5adSJuan Quintela }
852d32ca5adSJuan Quintela 
853d32ca5adSJuan Quintela static void *multifd_send_thread(void *opaque)
854d32ca5adSJuan Quintela {
855d32ca5adSJuan Quintela     MultiFDSendParams *p = opaque;
8561b1f4ab6SJiang Jiacheng     MigrationThread *thread = NULL;
857d32ca5adSJuan Quintela     Error *local_err = NULL;
858d32ca5adSJuan Quintela     int ret = 0;
85906833d83SFabiano Rosas     bool use_packets = multifd_use_packets();
860d32ca5adSJuan Quintela 
861788fa680SFabiano Rosas     thread = migration_threads_add(p->name, qemu_get_thread_id());
8621b1f4ab6SJiang Jiacheng 
863d32ca5adSJuan Quintela     trace_multifd_send_thread_start(p->id);
864d32ca5adSJuan Quintela     rcu_register_thread();
865d32ca5adSJuan Quintela 
86606833d83SFabiano Rosas     if (use_packets) {
867d32ca5adSJuan Quintela         if (multifd_send_initial_packet(p, &local_err) < 0) {
868d32ca5adSJuan Quintela             ret = -1;
869d32ca5adSJuan Quintela             goto out;
870d32ca5adSJuan Quintela         }
87106833d83SFabiano Rosas     }
872d32ca5adSJuan Quintela 
873d32ca5adSJuan Quintela     while (true) {
874d2026ee1SJuan Quintela         qemu_sem_post(&multifd_send_state->channels_ready);
875d32ca5adSJuan Quintela         qemu_sem_wait(&p->sem);
876d32ca5adSJuan Quintela 
87715f3f21dSPeter Xu         if (multifd_send_should_exit()) {
878d32ca5adSJuan Quintela             break;
879d32ca5adSJuan Quintela         }
880d32ca5adSJuan Quintela 
881488c84acSPeter Xu         /*
882488c84acSPeter Xu          * Read pending_job flag before p->pages.  Pairs with the
883488c84acSPeter Xu          * qatomic_store_release() in multifd_send_pages().
884488c84acSPeter Xu          */
885488c84acSPeter Xu         if (qatomic_load_acquire(&p->pending_job)) {
886efd8c543SPeter Xu             MultiFDPages_t *pages = p->pages;
887d32ca5adSJuan Quintela 
888b7dbdd8eSLeonardo Bras             p->iovs_num = 0;
88983c560fbSPeter Xu             assert(pages->num);
89083c560fbSPeter Xu 
89102fb8104SJuan Quintela             ret = multifd_send_state->ops->send_prepare(p, &local_err);
892ab7cbb0bSJuan Quintela             if (ret != 0) {
893ab7cbb0bSJuan Quintela                 break;
894ab7cbb0bSJuan Quintela             }
89583c560fbSPeter Xu 
896f427d90bSFabiano Rosas             if (migrate_mapped_ram()) {
897f427d90bSFabiano Rosas                 ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
898f427d90bSFabiano Rosas                                               p->pages->block, &local_err);
899f427d90bSFabiano Rosas             } else {
900f427d90bSFabiano Rosas                 ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
901f427d90bSFabiano Rosas                                                   NULL, 0, p->write_flags,
902f427d90bSFabiano Rosas                                                   &local_err);
903f427d90bSFabiano Rosas             }
904f427d90bSFabiano Rosas 
905d32ca5adSJuan Quintela             if (ret != 0) {
906d32ca5adSJuan Quintela                 break;
907d32ca5adSJuan Quintela             }
908d32ca5adSJuan Quintela 
90968b6e000SElena Ufimtseva             stat64_add(&mig_stats.multifd_bytes,
91068b6e000SElena Ufimtseva                        p->next_packet_size + p->packet_len);
911836eca47SPeter Xu 
912836eca47SPeter Xu             multifd_pages_reset(p->pages);
9131618f552SElena Ufimtseva             p->next_packet_size = 0;
914488c84acSPeter Xu 
915488c84acSPeter Xu             /*
916488c84acSPeter Xu              * Making sure p->pages is published before saying "we're
917488c84acSPeter Xu              * free".  Pairs with the smp_mb_acquire() in
918488c84acSPeter Xu              * multifd_send_pages().
919488c84acSPeter Xu              */
920488c84acSPeter Xu             qatomic_store_release(&p->pending_job, false);
921859ebaf3SPeter Xu         } else {
922488c84acSPeter Xu             /*
923488c84acSPeter Xu              * If not a normal job, must be a sync request.  Note that
924488c84acSPeter Xu              * pending_sync is a standalone flag (unlike pending_job), so
925488c84acSPeter Xu              * it doesn't require explicit memory barriers.
926488c84acSPeter Xu              */
927859ebaf3SPeter Xu             assert(qatomic_read(&p->pending_sync));
92806833d83SFabiano Rosas 
92906833d83SFabiano Rosas             if (use_packets) {
930f5f48a78SPeter Xu                 p->flags = MULTIFD_FLAG_SYNC;
931f5f48a78SPeter Xu                 multifd_send_fill_packet(p);
932f5f48a78SPeter Xu                 ret = qio_channel_write_all(p->c, (void *)p->packet,
933f5f48a78SPeter Xu                                             p->packet_len, &local_err);
934f5f48a78SPeter Xu                 if (ret != 0) {
935f5f48a78SPeter Xu                     break;
936d32ca5adSJuan Quintela                 }
937f5f48a78SPeter Xu                 /* p->next_packet_size will always be zero for a SYNC packet */
938f5f48a78SPeter Xu                 stat64_add(&mig_stats.multifd_bytes, p->packet_len);
939f5f48a78SPeter Xu                 p->flags = 0;
94006833d83SFabiano Rosas             }
94106833d83SFabiano Rosas 
942f5f48a78SPeter Xu             qatomic_set(&p->pending_sync, false);
943f5f48a78SPeter Xu             qemu_sem_post(&p->sem_sync);
944d32ca5adSJuan Quintela         }
945d32ca5adSJuan Quintela     }
946d32ca5adSJuan Quintela 
947d32ca5adSJuan Quintela out:
948ee8a7c9cSFabiano Rosas     if (ret) {
949ee8a7c9cSFabiano Rosas         assert(local_err);
950d32ca5adSJuan Quintela         trace_multifd_send_error(p->id);
9513ab4441dSPeter Xu         multifd_send_set_error(local_err);
95248c0f5d5SPeter Xu         multifd_send_kick_main(p);
953ee8a7c9cSFabiano Rosas         error_free(local_err);
954d32ca5adSJuan Quintela     }
955d32ca5adSJuan Quintela 
956d32ca5adSJuan Quintela     rcu_unregister_thread();
957788fa680SFabiano Rosas     migration_threads_remove(thread);
95805b7ec18SPeter Xu     trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages);
959d32ca5adSJuan Quintela 
960d32ca5adSJuan Quintela     return NULL;
961d32ca5adSJuan Quintela }
962d32ca5adSJuan Quintela 
9632576ae48SFabiano Rosas static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);
96429647140SChuan Zheng 
9659221e3c6SPeter Xu typedef struct {
9669221e3c6SPeter Xu     MultiFDSendParams *p;
9679221e3c6SPeter Xu     QIOChannelTLS *tioc;
9689221e3c6SPeter Xu } MultiFDTLSThreadArgs;
9699221e3c6SPeter Xu 
970a1af605bSChuan Zheng static void *multifd_tls_handshake_thread(void *opaque)
971a1af605bSChuan Zheng {
9729221e3c6SPeter Xu     MultiFDTLSThreadArgs *args = opaque;
973a1af605bSChuan Zheng 
9749221e3c6SPeter Xu     qio_channel_tls_handshake(args->tioc,
9752576ae48SFabiano Rosas                               multifd_new_send_channel_async,
9769221e3c6SPeter Xu                               args->p,
977a1af605bSChuan Zheng                               NULL,
978a1af605bSChuan Zheng                               NULL);
9799221e3c6SPeter Xu     g_free(args);
9809221e3c6SPeter Xu 
981a1af605bSChuan Zheng     return NULL;
982a1af605bSChuan Zheng }
983a1af605bSChuan Zheng 
984967e3889SFabiano Rosas static bool multifd_tls_channel_connect(MultiFDSendParams *p,
98529647140SChuan Zheng                                         QIOChannel *ioc,
98629647140SChuan Zheng                                         Error **errp)
98729647140SChuan Zheng {
98829647140SChuan Zheng     MigrationState *s = migrate_get_current();
9897f692ec7SPeter Xu     const char *hostname = s->hostname;
9909221e3c6SPeter Xu     MultiFDTLSThreadArgs *args;
99129647140SChuan Zheng     QIOChannelTLS *tioc;
99229647140SChuan Zheng 
9930deb7e9bSJuan Quintela     tioc = migration_tls_client_create(ioc, hostname, errp);
99429647140SChuan Zheng     if (!tioc) {
995967e3889SFabiano Rosas         return false;
99629647140SChuan Zheng     }
99729647140SChuan Zheng 
9982576ae48SFabiano Rosas     /*
9992576ae48SFabiano Rosas      * Ownership of the socket channel now transfers to the newly
10002576ae48SFabiano Rosas      * created TLS channel, which has already taken a reference.
10012576ae48SFabiano Rosas      */
10029e842408SChuan Zheng     object_unref(OBJECT(ioc));
1003894f0214SChuan Zheng     trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
100429647140SChuan Zheng     qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");
10059221e3c6SPeter Xu 
10069221e3c6SPeter Xu     args = g_new0(MultiFDTLSThreadArgs, 1);
10079221e3c6SPeter Xu     args->tioc = tioc;
10089221e3c6SPeter Xu     args->p = p;
1009e1921f10SFabiano Rosas 
1010e1921f10SFabiano Rosas     p->tls_thread_created = true;
1011e1921f10SFabiano Rosas     qemu_thread_create(&p->tls_thread, "multifd-tls-handshake-worker",
10129221e3c6SPeter Xu                        multifd_tls_handshake_thread, args,
1013a1af605bSChuan Zheng                        QEMU_THREAD_JOINABLE);
1014967e3889SFabiano Rosas     return true;
101529647140SChuan Zheng }
101629647140SChuan Zheng 
1017b7b03eb6SFabiano Rosas void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
101829647140SChuan Zheng {
10192576ae48SFabiano Rosas     qio_channel_set_delay(ioc, false);
1020967e3889SFabiano Rosas 
102120171ea8SLukas Straub     migration_ioc_register_yank(ioc);
10229221e3c6SPeter Xu     /* Setup p->c only if the channel is completely setup */
102329647140SChuan Zheng     p->c = ioc;
1024a2a63c4aSFabiano Rosas 
1025a2a63c4aSFabiano Rosas     p->thread_created = true;
102629647140SChuan Zheng     qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
102729647140SChuan Zheng                        QEMU_THREAD_JOINABLE);
102829647140SChuan Zheng }
102929647140SChuan Zheng 
10302576ae48SFabiano Rosas /*
10312576ae48SFabiano Rosas  * When TLS is enabled this function is called once to establish the
10322576ae48SFabiano Rosas  * TLS connection and a second time after the TLS handshake to create
10332576ae48SFabiano Rosas  * the multifd channel. Without TLS it goes straight into the channel
10342576ae48SFabiano Rosas  * creation.
10352576ae48SFabiano Rosas  */
1036d32ca5adSJuan Quintela static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
1037d32ca5adSJuan Quintela {
1038d32ca5adSJuan Quintela     MultiFDSendParams *p = opaque;
10390e92f644SFabiano Rosas     QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
1040d32ca5adSJuan Quintela     Error *local_err = NULL;
10412576ae48SFabiano Rosas     bool ret;
1042d32ca5adSJuan Quintela 
1043d32ca5adSJuan Quintela     trace_multifd_new_send_channel_async(p->id);
10442576ae48SFabiano Rosas 
10452576ae48SFabiano Rosas     if (qio_task_propagate_error(task, &local_err)) {
10462576ae48SFabiano Rosas         ret = false;
10472576ae48SFabiano Rosas         goto out;
1048bca762c2SLi Zhang     }
104903c7a42dSChuan Zheng 
10502576ae48SFabiano Rosas     trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
10512576ae48SFabiano Rosas                                        migrate_get_current()->hostname);
10522576ae48SFabiano Rosas 
10532576ae48SFabiano Rosas     if (migrate_channel_requires_tls_upgrade(ioc)) {
10542576ae48SFabiano Rosas         ret = multifd_tls_channel_connect(p, ioc, &local_err);
105593fa9dc2SFabiano Rosas         if (ret) {
105693fa9dc2SFabiano Rosas             return;
105793fa9dc2SFabiano Rosas         }
10582576ae48SFabiano Rosas     } else {
1059770de49cSPeter Xu         multifd_channel_connect(p, ioc);
1060770de49cSPeter Xu         ret = true;
10612576ae48SFabiano Rosas     }
10622576ae48SFabiano Rosas 
106393fa9dc2SFabiano Rosas out:
106493fa9dc2SFabiano Rosas     /*
106593fa9dc2SFabiano Rosas      * Here we're not interested whether creation succeeded, only that
106693fa9dc2SFabiano Rosas      * it happened at all.
106793fa9dc2SFabiano Rosas      */
1068a8a3e710SFabiano Rosas     multifd_send_channel_created();
106993fa9dc2SFabiano Rosas 
10702576ae48SFabiano Rosas     if (ret) {
10712576ae48SFabiano Rosas         return;
10722576ae48SFabiano Rosas     }
10732576ae48SFabiano Rosas 
1074967e3889SFabiano Rosas     trace_multifd_new_send_channel_async_error(p->id, local_err);
10753ab4441dSPeter Xu     multifd_send_set_error(local_err);
10762576ae48SFabiano Rosas     /*
10779221e3c6SPeter Xu      * For error cases (TLS or non-TLS), IO channel is always freed here
10789221e3c6SPeter Xu      * rather than when cleanup multifd: since p->c is not set, multifd
10799221e3c6SPeter Xu      * cleanup code doesn't even know its existence.
10802576ae48SFabiano Rosas      */
108115f3f21dSPeter Xu     object_unref(OBJECT(ioc));
108215f3f21dSPeter Xu     error_free(local_err);
10830e92f644SFabiano Rosas }
10840e92f644SFabiano Rosas 
1085b7b03eb6SFabiano Rosas static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
10860e92f644SFabiano Rosas {
1087b7b03eb6SFabiano Rosas     if (!multifd_use_packets()) {
1088b7b03eb6SFabiano Rosas         return file_send_channel_create(opaque, errp);
1089b7b03eb6SFabiano Rosas     }
1090b7b03eb6SFabiano Rosas 
10910e92f644SFabiano Rosas     socket_send_channel_create(multifd_new_send_channel_async, opaque);
1092b7b03eb6SFabiano Rosas     return true;
1093d32ca5adSJuan Quintela }
1094d32ca5adSJuan Quintela 
1095bd8b0a8fSFabiano Rosas bool multifd_send_setup(void)
1096d32ca5adSJuan Quintela {
1097bd8b0a8fSFabiano Rosas     MigrationState *s = migrate_get_current();
1098bd8b0a8fSFabiano Rosas     Error *local_err = NULL;
1099bd8b0a8fSFabiano Rosas     int thread_count, ret = 0;
1100d32ca5adSJuan Quintela     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
110106833d83SFabiano Rosas     bool use_packets = multifd_use_packets();
1102d32ca5adSJuan Quintela     uint8_t i;
1103d32ca5adSJuan Quintela 
110451b07548SJuan Quintela     if (!migrate_multifd()) {
1105bd8b0a8fSFabiano Rosas         return true;
1106d32ca5adSJuan Quintela     }
1107b7acd657SLi Zhijian 
1108d32ca5adSJuan Quintela     thread_count = migrate_multifd_channels();
1109d32ca5adSJuan Quintela     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
1110d32ca5adSJuan Quintela     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
1111d32ca5adSJuan Quintela     multifd_send_state->pages = multifd_pages_init(page_count);
111293fa9dc2SFabiano Rosas     qemu_sem_init(&multifd_send_state->channels_created, 0);
1113d32ca5adSJuan Quintela     qemu_sem_init(&multifd_send_state->channels_ready, 0);
1114d73415a3SStefan Hajnoczi     qatomic_set(&multifd_send_state->exiting, 0);
1115ab7cbb0bSJuan Quintela     multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
1116d32ca5adSJuan Quintela 
1117d32ca5adSJuan Quintela     for (i = 0; i < thread_count; i++) {
1118d32ca5adSJuan Quintela         MultiFDSendParams *p = &multifd_send_state->params[i];
1119d32ca5adSJuan Quintela 
1120d32ca5adSJuan Quintela         qemu_sem_init(&p->sem, 0);
1121d32ca5adSJuan Quintela         qemu_sem_init(&p->sem_sync, 0);
1122d32ca5adSJuan Quintela         p->id = i;
1123d32ca5adSJuan Quintela         p->pages = multifd_pages_init(page_count);
112406833d83SFabiano Rosas 
112506833d83SFabiano Rosas         if (use_packets) {
1126d32ca5adSJuan Quintela             p->packet_len = sizeof(MultiFDPacket_t)
1127d32ca5adSJuan Quintela                           + sizeof(uint64_t) * page_count;
1128d32ca5adSJuan Quintela             p->packet = g_malloc0(p->packet_len);
1129d32ca5adSJuan Quintela             p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
1130d32ca5adSJuan Quintela             p->packet->version = cpu_to_be32(MULTIFD_VERSION);
113106833d83SFabiano Rosas 
1132d48c3a04SJuan Quintela             /* We need one extra place for the packet header */
1133d48c3a04SJuan Quintela             p->iov = g_new0(struct iovec, page_count + 1);
113406833d83SFabiano Rosas         } else {
113506833d83SFabiano Rosas             p->iov = g_new0(struct iovec, page_count);
113606833d83SFabiano Rosas         }
113706833d83SFabiano Rosas         p->name = g_strdup_printf("multifdsend_%d", i);
1138ddec20f8SJuan Quintela         p->page_size = qemu_target_page_size();
1139d6f45ebaSJuan Quintela         p->page_count = page_count;
11405b1d9babSLeonardo Bras         p->write_flags = 0;
1141b7b03eb6SFabiano Rosas 
1142b7b03eb6SFabiano Rosas         if (!multifd_new_send_channel_create(p, &local_err)) {
1143b7b03eb6SFabiano Rosas             return false;
1144b7b03eb6SFabiano Rosas         }
1145d32ca5adSJuan Quintela     }
1146ab7cbb0bSJuan Quintela 
114793fa9dc2SFabiano Rosas     /*
114893fa9dc2SFabiano Rosas      * Wait until channel creation has started for all channels. The
114993fa9dc2SFabiano Rosas      * creation can still fail, but no more channels will be created
115093fa9dc2SFabiano Rosas      * past this point.
115193fa9dc2SFabiano Rosas      */
115293fa9dc2SFabiano Rosas     for (i = 0; i < thread_count; i++) {
115393fa9dc2SFabiano Rosas         qemu_sem_wait(&multifd_send_state->channels_created);
115493fa9dc2SFabiano Rosas     }
115593fa9dc2SFabiano Rosas 
1156ab7cbb0bSJuan Quintela     for (i = 0; i < thread_count; i++) {
1157ab7cbb0bSJuan Quintela         MultiFDSendParams *p = &multifd_send_state->params[i];
1158ab7cbb0bSJuan Quintela 
1159bd8b0a8fSFabiano Rosas         ret = multifd_send_state->ops->send_setup(p, &local_err);
1160ab7cbb0bSJuan Quintela         if (ret) {
1161bd8b0a8fSFabiano Rosas             break;
1162ab7cbb0bSJuan Quintela         }
1163ab7cbb0bSJuan Quintela     }
1164bd8b0a8fSFabiano Rosas 
1165bd8b0a8fSFabiano Rosas     if (ret) {
1166bd8b0a8fSFabiano Rosas         migrate_set_error(s, local_err);
1167bd8b0a8fSFabiano Rosas         error_report_err(local_err);
1168bd8b0a8fSFabiano Rosas         migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
1169bd8b0a8fSFabiano Rosas                           MIGRATION_STATUS_FAILED);
1170bd8b0a8fSFabiano Rosas         return false;
1171bd8b0a8fSFabiano Rosas     }
1172bd8b0a8fSFabiano Rosas 
1173bd8b0a8fSFabiano Rosas     return true;
1174d32ca5adSJuan Quintela }
1175d32ca5adSJuan Quintela 
1176d117ed06SFabiano Rosas bool multifd_recv(void)
1177d117ed06SFabiano Rosas {
1178d117ed06SFabiano Rosas     int i;
1179d117ed06SFabiano Rosas     static int next_recv_channel;
1180d117ed06SFabiano Rosas     MultiFDRecvParams *p = NULL;
1181d117ed06SFabiano Rosas     MultiFDRecvData *data = multifd_recv_state->data;
1182d117ed06SFabiano Rosas 
1183d117ed06SFabiano Rosas     /*
1184d117ed06SFabiano Rosas      * next_channel can remain from a previous migration that was
1185d117ed06SFabiano Rosas      * using more channels, so ensure it doesn't overflow if the
1186d117ed06SFabiano Rosas      * limit is lower now.
1187d117ed06SFabiano Rosas      */
1188d117ed06SFabiano Rosas     next_recv_channel %= migrate_multifd_channels();
1189d117ed06SFabiano Rosas     for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
1190d117ed06SFabiano Rosas         if (multifd_recv_should_exit()) {
1191d117ed06SFabiano Rosas             return false;
1192d117ed06SFabiano Rosas         }
1193d117ed06SFabiano Rosas 
1194d117ed06SFabiano Rosas         p = &multifd_recv_state->params[i];
1195d117ed06SFabiano Rosas 
1196d117ed06SFabiano Rosas         if (qatomic_read(&p->pending_job) == false) {
1197d117ed06SFabiano Rosas             next_recv_channel = (i + 1) % migrate_multifd_channels();
1198d117ed06SFabiano Rosas             break;
1199d117ed06SFabiano Rosas         }
1200d117ed06SFabiano Rosas     }
1201d117ed06SFabiano Rosas 
1202d117ed06SFabiano Rosas     /*
1203d117ed06SFabiano Rosas      * Order pending_job read before manipulating p->data below. Pairs
1204d117ed06SFabiano Rosas      * with qatomic_store_release() at multifd_recv_thread().
1205d117ed06SFabiano Rosas      */
1206d117ed06SFabiano Rosas     smp_mb_acquire();
1207d117ed06SFabiano Rosas 
1208d117ed06SFabiano Rosas     assert(!p->data->size);
1209d117ed06SFabiano Rosas     multifd_recv_state->data = p->data;
1210d117ed06SFabiano Rosas     p->data = data;
1211d117ed06SFabiano Rosas 
1212d117ed06SFabiano Rosas     /*
1213d117ed06SFabiano Rosas      * Order p->data update before setting pending_job. Pairs with
1214d117ed06SFabiano Rosas      * qatomic_load_acquire() at multifd_recv_thread().
1215d117ed06SFabiano Rosas      */
1216d117ed06SFabiano Rosas     qatomic_store_release(&p->pending_job, true);
1217d117ed06SFabiano Rosas     qemu_sem_post(&p->sem);
1218d117ed06SFabiano Rosas 
1219d117ed06SFabiano Rosas     return true;
1220d117ed06SFabiano Rosas }
1221d117ed06SFabiano Rosas 
1222d117ed06SFabiano Rosas MultiFDRecvData *multifd_get_recv_data(void)
1223d117ed06SFabiano Rosas {
1224d117ed06SFabiano Rosas     return multifd_recv_state->data;
1225d117ed06SFabiano Rosas }
1226d117ed06SFabiano Rosas 
1227d32ca5adSJuan Quintela static void multifd_recv_terminate_threads(Error *err)
1228d32ca5adSJuan Quintela {
1229d32ca5adSJuan Quintela     int i;
1230d32ca5adSJuan Quintela 
1231d32ca5adSJuan Quintela     trace_multifd_recv_terminate_threads(err != NULL);
1232d32ca5adSJuan Quintela 
123311dd7be5SFabiano Rosas     if (qatomic_xchg(&multifd_recv_state->exiting, 1)) {
123411dd7be5SFabiano Rosas         return;
123511dd7be5SFabiano Rosas     }
123611dd7be5SFabiano Rosas 
1237d32ca5adSJuan Quintela     if (err) {
1238d32ca5adSJuan Quintela         MigrationState *s = migrate_get_current();
1239d32ca5adSJuan Quintela         migrate_set_error(s, err);
1240d32ca5adSJuan Quintela         if (s->state == MIGRATION_STATUS_SETUP ||
1241d32ca5adSJuan Quintela             s->state == MIGRATION_STATUS_ACTIVE) {
1242d32ca5adSJuan Quintela             migrate_set_state(&s->state, s->state,
1243d32ca5adSJuan Quintela                               MIGRATION_STATUS_FAILED);
1244d32ca5adSJuan Quintela         }
1245d32ca5adSJuan Quintela     }
1246d32ca5adSJuan Quintela 
1247d32ca5adSJuan Quintela     for (i = 0; i < migrate_multifd_channels(); i++) {
1248d32ca5adSJuan Quintela         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1249d32ca5adSJuan Quintela 
1250d32ca5adSJuan Quintela         /*
1251d117ed06SFabiano Rosas          * The migration thread and channels interact differently
1252d117ed06SFabiano Rosas          * depending on the presence of packets.
1253d13f0026SFabiano Rosas          */
125406833d83SFabiano Rosas         if (multifd_use_packets()) {
1255d117ed06SFabiano Rosas             /*
1256d117ed06SFabiano Rosas              * The channel receives as long as there are packets. When
1257d117ed06SFabiano Rosas              * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
1258d117ed06SFabiano Rosas              * channel waits for the migration thread to sync. If the
1259d117ed06SFabiano Rosas              * sync never happens, do it here.
1260d117ed06SFabiano Rosas              */
1261d13f0026SFabiano Rosas             qemu_sem_post(&p->sem_sync);
1262d117ed06SFabiano Rosas         } else {
1263d117ed06SFabiano Rosas             /*
1264d117ed06SFabiano Rosas              * The channel waits for the migration thread to give it
1265d117ed06SFabiano Rosas              * work. When the migration thread runs out of work, it
1266d117ed06SFabiano Rosas              * releases the channel and waits for any pending work to
1267d117ed06SFabiano Rosas              * finish. If we reach here (e.g. due to error) before the
1268d117ed06SFabiano Rosas              * work runs out, release the channel.
1269d117ed06SFabiano Rosas              */
1270d117ed06SFabiano Rosas             qemu_sem_post(&p->sem);
127106833d83SFabiano Rosas         }
1272d13f0026SFabiano Rosas 
1273d13f0026SFabiano Rosas         /*
1274d32ca5adSJuan Quintela          * We could arrive here for two reasons:
1275d32ca5adSJuan Quintela          *  - normal quit, i.e. everything went fine, just finished
1276d32ca5adSJuan Quintela          *  - error quit: We close the channels so the channel threads
1277d32ca5adSJuan Quintela          *    finish the qio_channel_read_all_eof()
1278d32ca5adSJuan Quintela          */
1279d32ca5adSJuan Quintela         if (p->c) {
1280d32ca5adSJuan Quintela             qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
1281d32ca5adSJuan Quintela         }
1282d32ca5adSJuan Quintela     }
1283d32ca5adSJuan Quintela }
1284d32ca5adSJuan Quintela 
/*
 * Request termination of the receive channels without reporting an
 * error. Does nothing when multifd is not enabled.
 */
void multifd_recv_shutdown(void)
{
    if (!migrate_multifd()) {
        return;
    }

    multifd_recv_terminate_threads(NULL);
}
1291cfc3bcf3SLeonardo Bras 
12925e6ea8a1SPeter Xu static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
12935e6ea8a1SPeter Xu {
12945e6ea8a1SPeter Xu     migration_ioc_unregister_yank(p->c);
12955e6ea8a1SPeter Xu     object_unref(OBJECT(p->c));
12965e6ea8a1SPeter Xu     p->c = NULL;
12975e6ea8a1SPeter Xu     qemu_mutex_destroy(&p->mutex);
12985e6ea8a1SPeter Xu     qemu_sem_destroy(&p->sem_sync);
1299d117ed06SFabiano Rosas     qemu_sem_destroy(&p->sem);
13005e6ea8a1SPeter Xu     g_free(p->name);
13015e6ea8a1SPeter Xu     p->name = NULL;
13025e6ea8a1SPeter Xu     p->packet_len = 0;
13035e6ea8a1SPeter Xu     g_free(p->packet);
13045e6ea8a1SPeter Xu     p->packet = NULL;
13055e6ea8a1SPeter Xu     g_free(p->iov);
13065e6ea8a1SPeter Xu     p->iov = NULL;
13075e6ea8a1SPeter Xu     g_free(p->normal);
13085e6ea8a1SPeter Xu     p->normal = NULL;
13095e6ea8a1SPeter Xu     multifd_recv_state->ops->recv_cleanup(p);
13105e6ea8a1SPeter Xu }
13115e6ea8a1SPeter Xu 
13125e6ea8a1SPeter Xu static void multifd_recv_cleanup_state(void)
13135e6ea8a1SPeter Xu {
13145e6ea8a1SPeter Xu     qemu_sem_destroy(&multifd_recv_state->sem_sync);
13155e6ea8a1SPeter Xu     g_free(multifd_recv_state->params);
13165e6ea8a1SPeter Xu     multifd_recv_state->params = NULL;
1317d117ed06SFabiano Rosas     g_free(multifd_recv_state->data);
1318d117ed06SFabiano Rosas     multifd_recv_state->data = NULL;
13195e6ea8a1SPeter Xu     g_free(multifd_recv_state);
13205e6ea8a1SPeter Xu     multifd_recv_state = NULL;
13215e6ea8a1SPeter Xu }
13225e6ea8a1SPeter Xu 
1323cde85c37SPeter Xu void multifd_recv_cleanup(void)
1324d32ca5adSJuan Quintela {
1325d32ca5adSJuan Quintela     int i;
1326d32ca5adSJuan Quintela 
132751b07548SJuan Quintela     if (!migrate_multifd()) {
1328e5bac1f5SLeonardo Bras         return;
1329d32ca5adSJuan Quintela     }
1330d32ca5adSJuan Quintela     multifd_recv_terminate_threads(NULL);
1331d32ca5adSJuan Quintela     for (i = 0; i < migrate_multifd_channels(); i++) {
1332d32ca5adSJuan Quintela         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1333d32ca5adSJuan Quintela 
1334a2a63c4aSFabiano Rosas         if (p->thread_created) {
133510351fbaSLeonardo Bras             qemu_thread_join(&p->thread);
1336d32ca5adSJuan Quintela         }
1337a2a63c4aSFabiano Rosas     }
1338d32ca5adSJuan Quintela     for (i = 0; i < migrate_multifd_channels(); i++) {
13395e6ea8a1SPeter Xu         multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
1340d32ca5adSJuan Quintela     }
13415e6ea8a1SPeter Xu     multifd_recv_cleanup_state();
1342d32ca5adSJuan Quintela }
1343d32ca5adSJuan Quintela 
1344d32ca5adSJuan Quintela void multifd_recv_sync_main(void)
1345d32ca5adSJuan Quintela {
13464aac6b1eSFabiano Rosas     int thread_count = migrate_multifd_channels();
1347a49d15a3SFabiano Rosas     bool file_based = !multifd_use_packets();
1348d32ca5adSJuan Quintela     int i;
1349d32ca5adSJuan Quintela 
1350a49d15a3SFabiano Rosas     if (!migrate_multifd()) {
1351d32ca5adSJuan Quintela         return;
1352d32ca5adSJuan Quintela     }
1353d32ca5adSJuan Quintela 
13544aac6b1eSFabiano Rosas     /*
1355a49d15a3SFabiano Rosas      * File-based channels don't use packets and therefore need to
1356a49d15a3SFabiano Rosas      * wait for more work. Release them to start the sync.
1357a49d15a3SFabiano Rosas      */
1358a49d15a3SFabiano Rosas     if (file_based) {
1359a49d15a3SFabiano Rosas         for (i = 0; i < thread_count; i++) {
1360a49d15a3SFabiano Rosas             MultiFDRecvParams *p = &multifd_recv_state->params[i];
1361a49d15a3SFabiano Rosas 
1362a49d15a3SFabiano Rosas             trace_multifd_recv_sync_main_signal(p->id);
1363a49d15a3SFabiano Rosas             qemu_sem_post(&p->sem);
1364a49d15a3SFabiano Rosas         }
1365a49d15a3SFabiano Rosas     }
1366a49d15a3SFabiano Rosas 
1367a49d15a3SFabiano Rosas     /*
13684aac6b1eSFabiano Rosas      * Initiate the synchronization by waiting for all channels.
1369a49d15a3SFabiano Rosas      *
13704aac6b1eSFabiano Rosas      * For socket-based migration this means each channel has received
13714aac6b1eSFabiano Rosas      * the SYNC packet on the stream.
1372a49d15a3SFabiano Rosas      *
1373a49d15a3SFabiano Rosas      * For file-based migration this means each channel is done with
1374a49d15a3SFabiano Rosas      * the work (pending_job=false).
13754aac6b1eSFabiano Rosas      */
13764aac6b1eSFabiano Rosas     for (i = 0; i < thread_count; i++) {
13774aac6b1eSFabiano Rosas         trace_multifd_recv_sync_main_wait(i);
1378d32ca5adSJuan Quintela         qemu_sem_wait(&multifd_recv_state->sem_sync);
1379d32ca5adSJuan Quintela     }
13804aac6b1eSFabiano Rosas 
1381a49d15a3SFabiano Rosas     if (file_based) {
1382a49d15a3SFabiano Rosas         /*
1383a49d15a3SFabiano Rosas          * For file-based loading is done in one iteration. We're
1384a49d15a3SFabiano Rosas          * done.
1385a49d15a3SFabiano Rosas          */
1386a49d15a3SFabiano Rosas         return;
1387a49d15a3SFabiano Rosas     }
1388a49d15a3SFabiano Rosas 
13894aac6b1eSFabiano Rosas     /*
13904aac6b1eSFabiano Rosas      * Sync done. Release the channels for the next iteration.
13914aac6b1eSFabiano Rosas      */
13924aac6b1eSFabiano Rosas     for (i = 0; i < thread_count; i++) {
1393d32ca5adSJuan Quintela         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1394d32ca5adSJuan Quintela 
13956e8a355dSDaniel Brodsky         WITH_QEMU_LOCK_GUARD(&p->mutex) {
1396d32ca5adSJuan Quintela             if (multifd_recv_state->packet_num < p->packet_num) {
1397d32ca5adSJuan Quintela                 multifd_recv_state->packet_num = p->packet_num;
1398d32ca5adSJuan Quintela             }
13996e8a355dSDaniel Brodsky         }
1400d32ca5adSJuan Quintela         trace_multifd_recv_sync_main_signal(p->id);
1401d32ca5adSJuan Quintela         qemu_sem_post(&p->sem_sync);
1402d32ca5adSJuan Quintela     }
1403d32ca5adSJuan Quintela     trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
1404d32ca5adSJuan Quintela }
1405d32ca5adSJuan Quintela 
1406d32ca5adSJuan Quintela static void *multifd_recv_thread(void *opaque)
1407d32ca5adSJuan Quintela {
1408d32ca5adSJuan Quintela     MultiFDRecvParams *p = opaque;
1409d32ca5adSJuan Quintela     Error *local_err = NULL;
141006833d83SFabiano Rosas     bool use_packets = multifd_use_packets();
1411d32ca5adSJuan Quintela     int ret;
1412d32ca5adSJuan Quintela 
1413d32ca5adSJuan Quintela     trace_multifd_recv_thread_start(p->id);
1414d32ca5adSJuan Quintela     rcu_register_thread();
1415d32ca5adSJuan Quintela 
1416d32ca5adSJuan Quintela     while (true) {
141706833d83SFabiano Rosas         uint32_t flags = 0;
14189db19125SFabiano Rosas         bool has_data = false;
14199db19125SFabiano Rosas         p->normal_num = 0;
1420d32ca5adSJuan Quintela 
1421d117ed06SFabiano Rosas         if (use_packets) {
142211dd7be5SFabiano Rosas             if (multifd_recv_should_exit()) {
1423d32ca5adSJuan Quintela                 break;
1424d32ca5adSJuan Quintela             }
1425d32ca5adSJuan Quintela 
1426d32ca5adSJuan Quintela             ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
1427d32ca5adSJuan Quintela                                            p->packet_len, &local_err);
1428bca762c2SLi Zhang             if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
1429d32ca5adSJuan Quintela                 break;
1430d32ca5adSJuan Quintela             }
1431d32ca5adSJuan Quintela 
1432d32ca5adSJuan Quintela             qemu_mutex_lock(&p->mutex);
1433d32ca5adSJuan Quintela             ret = multifd_recv_unfill_packet(p, &local_err);
1434d32ca5adSJuan Quintela             if (ret) {
1435d32ca5adSJuan Quintela                 qemu_mutex_unlock(&p->mutex);
1436d32ca5adSJuan Quintela                 break;
1437d32ca5adSJuan Quintela             }
1438d32ca5adSJuan Quintela 
1439d32ca5adSJuan Quintela             flags = p->flags;
1440ab7cbb0bSJuan Quintela             /* recv methods don't know how to handle the SYNC flag */
1441ab7cbb0bSJuan Quintela             p->flags &= ~MULTIFD_FLAG_SYNC;
14429db19125SFabiano Rosas             has_data = !!p->normal_num;
1443d32ca5adSJuan Quintela             qemu_mutex_unlock(&p->mutex);
1444d117ed06SFabiano Rosas         } else {
1445d117ed06SFabiano Rosas             /*
1446d117ed06SFabiano Rosas              * No packets, so we need to wait for the vmstate code to
1447d117ed06SFabiano Rosas              * give us work.
1448d117ed06SFabiano Rosas              */
1449d117ed06SFabiano Rosas             qemu_sem_wait(&p->sem);
1450d117ed06SFabiano Rosas 
1451d117ed06SFabiano Rosas             if (multifd_recv_should_exit()) {
1452d117ed06SFabiano Rosas                 break;
1453d117ed06SFabiano Rosas             }
1454d117ed06SFabiano Rosas 
1455d117ed06SFabiano Rosas             /* pairs with qatomic_store_release() at multifd_recv() */
1456d117ed06SFabiano Rosas             if (!qatomic_load_acquire(&p->pending_job)) {
1457d117ed06SFabiano Rosas                 /*
1458d117ed06SFabiano Rosas                  * Migration thread did not send work, this is
1459d117ed06SFabiano Rosas                  * equivalent to pending_sync on the sending
1460d117ed06SFabiano Rosas                  * side. Post sem_sync to notify we reached this
1461d117ed06SFabiano Rosas                  * point.
1462d117ed06SFabiano Rosas                  */
1463d117ed06SFabiano Rosas                 qemu_sem_post(&multifd_recv_state->sem_sync);
1464d117ed06SFabiano Rosas                 continue;
1465d117ed06SFabiano Rosas             }
1466d117ed06SFabiano Rosas 
1467d117ed06SFabiano Rosas             has_data = !!p->data->size;
146806833d83SFabiano Rosas         }
1469d32ca5adSJuan Quintela 
14709db19125SFabiano Rosas         if (has_data) {
14719db19125SFabiano Rosas             ret = multifd_recv_state->ops->recv(p, &local_err);
1472d32ca5adSJuan Quintela             if (ret != 0) {
1473d32ca5adSJuan Quintela                 break;
1474d32ca5adSJuan Quintela             }
1475d32ca5adSJuan Quintela         }
1476d32ca5adSJuan Quintela 
147706833d83SFabiano Rosas         if (use_packets) {
1478d32ca5adSJuan Quintela             if (flags & MULTIFD_FLAG_SYNC) {
1479d32ca5adSJuan Quintela                 qemu_sem_post(&multifd_recv_state->sem_sync);
1480d32ca5adSJuan Quintela                 qemu_sem_wait(&p->sem_sync);
1481d32ca5adSJuan Quintela             }
1482d117ed06SFabiano Rosas         } else {
1483d117ed06SFabiano Rosas             p->total_normal_pages += p->data->size / qemu_target_page_size();
1484d117ed06SFabiano Rosas             p->data->size = 0;
1485d117ed06SFabiano Rosas             /*
1486d117ed06SFabiano Rosas              * Order data->size update before clearing
1487d117ed06SFabiano Rosas              * pending_job. Pairs with smp_mb_acquire() at
1488d117ed06SFabiano Rosas              * multifd_recv().
1489d117ed06SFabiano Rosas              */
1490d117ed06SFabiano Rosas             qatomic_store_release(&p->pending_job, false);
1491d32ca5adSJuan Quintela         }
149206833d83SFabiano Rosas     }
1493d32ca5adSJuan Quintela 
1494d32ca5adSJuan Quintela     if (local_err) {
1495d32ca5adSJuan Quintela         multifd_recv_terminate_threads(local_err);
149613f2cb21SPan Nengyuan         error_free(local_err);
1497d32ca5adSJuan Quintela     }
1498d32ca5adSJuan Quintela 
1499d32ca5adSJuan Quintela     rcu_unregister_thread();
150005b7ec18SPeter Xu     trace_multifd_recv_thread_end(p->id, p->packets_recved, p->total_normal_pages);
1501d32ca5adSJuan Quintela 
1502d32ca5adSJuan Quintela     return NULL;
1503d32ca5adSJuan Quintela }
1504d32ca5adSJuan Quintela 
1505cde85c37SPeter Xu int multifd_recv_setup(Error **errp)
1506d32ca5adSJuan Quintela {
1507d32ca5adSJuan Quintela     int thread_count;
1508d32ca5adSJuan Quintela     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
150906833d83SFabiano Rosas     bool use_packets = multifd_use_packets();
1510d32ca5adSJuan Quintela     uint8_t i;
1511d32ca5adSJuan Quintela 
15126720c2b3Smanish.mishra     /*
15136720c2b3Smanish.mishra      * Return successfully if multiFD recv state is already initialised
15146720c2b3Smanish.mishra      * or multiFD is not enabled.
15156720c2b3Smanish.mishra      */
151651b07548SJuan Quintela     if (multifd_recv_state || !migrate_multifd()) {
1517d32ca5adSJuan Quintela         return 0;
1518d32ca5adSJuan Quintela     }
15196720c2b3Smanish.mishra 
1520d32ca5adSJuan Quintela     thread_count = migrate_multifd_channels();
1521d32ca5adSJuan Quintela     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
1522d32ca5adSJuan Quintela     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
1523d117ed06SFabiano Rosas 
1524d117ed06SFabiano Rosas     multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
1525d117ed06SFabiano Rosas     multifd_recv_state->data->size = 0;
1526d117ed06SFabiano Rosas 
1527d73415a3SStefan Hajnoczi     qatomic_set(&multifd_recv_state->count, 0);
152811dd7be5SFabiano Rosas     qatomic_set(&multifd_recv_state->exiting, 0);
1529d32ca5adSJuan Quintela     qemu_sem_init(&multifd_recv_state->sem_sync, 0);
1530ab7cbb0bSJuan Quintela     multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
1531d32ca5adSJuan Quintela 
1532d32ca5adSJuan Quintela     for (i = 0; i < thread_count; i++) {
1533d32ca5adSJuan Quintela         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1534d32ca5adSJuan Quintela 
1535d32ca5adSJuan Quintela         qemu_mutex_init(&p->mutex);
1536d32ca5adSJuan Quintela         qemu_sem_init(&p->sem_sync, 0);
1537d117ed06SFabiano Rosas         qemu_sem_init(&p->sem, 0);
1538d117ed06SFabiano Rosas         p->pending_job = false;
1539d32ca5adSJuan Quintela         p->id = i;
154006833d83SFabiano Rosas 
1541d117ed06SFabiano Rosas         p->data = g_new0(MultiFDRecvData, 1);
1542d117ed06SFabiano Rosas         p->data->size = 0;
1543d117ed06SFabiano Rosas 
154406833d83SFabiano Rosas         if (use_packets) {
1545d32ca5adSJuan Quintela             p->packet_len = sizeof(MultiFDPacket_t)
1546d32ca5adSJuan Quintela                 + sizeof(uint64_t) * page_count;
1547d32ca5adSJuan Quintela             p->packet = g_malloc0(p->packet_len);
154806833d83SFabiano Rosas         }
1549d32ca5adSJuan Quintela         p->name = g_strdup_printf("multifdrecv_%d", i);
1550226468baSJuan Quintela         p->iov = g_new0(struct iovec, page_count);
1551cf2d4aa8SJuan Quintela         p->normal = g_new0(ram_addr_t, page_count);
1552d6f45ebaSJuan Quintela         p->page_count = page_count;
1553ddec20f8SJuan Quintela         p->page_size = qemu_target_page_size();
1554d32ca5adSJuan Quintela     }
1555ab7cbb0bSJuan Quintela 
1556ab7cbb0bSJuan Quintela     for (i = 0; i < thread_count; i++) {
1557ab7cbb0bSJuan Quintela         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1558ab7cbb0bSJuan Quintela         int ret;
1559ab7cbb0bSJuan Quintela 
15603fc58efaSAvihai Horon         ret = multifd_recv_state->ops->recv_setup(p, errp);
1561ab7cbb0bSJuan Quintela         if (ret) {
1562ab7cbb0bSJuan Quintela             return ret;
1563ab7cbb0bSJuan Quintela         }
1564ab7cbb0bSJuan Quintela     }
1565d32ca5adSJuan Quintela     return 0;
1566d32ca5adSJuan Quintela }
1567d32ca5adSJuan Quintela 
1568d32ca5adSJuan Quintela bool multifd_recv_all_channels_created(void)
1569d32ca5adSJuan Quintela {
1570d32ca5adSJuan Quintela     int thread_count = migrate_multifd_channels();
1571d32ca5adSJuan Quintela 
157251b07548SJuan Quintela     if (!migrate_multifd()) {
1573d32ca5adSJuan Quintela         return true;
1574d32ca5adSJuan Quintela     }
1575d32ca5adSJuan Quintela 
1576a59136f3SDr. David Alan Gilbert     if (!multifd_recv_state) {
1577a59136f3SDr. David Alan Gilbert         /* Called before any connections created */
1578a59136f3SDr. David Alan Gilbert         return false;
1579a59136f3SDr. David Alan Gilbert     }
1580a59136f3SDr. David Alan Gilbert 
1581d73415a3SStefan Hajnoczi     return thread_count == qatomic_read(&multifd_recv_state->count);
1582d32ca5adSJuan Quintela }
1583d32ca5adSJuan Quintela 
1584d32ca5adSJuan Quintela /*
1585d32ca5adSJuan Quintela  * Try to receive all multifd channels to get ready for the migration.
15866720c2b3Smanish.mishra  * Sets @errp when failing to receive the current channel.
1587d32ca5adSJuan Quintela  */
15886720c2b3Smanish.mishra void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
1589d32ca5adSJuan Quintela {
1590d32ca5adSJuan Quintela     MultiFDRecvParams *p;
1591d32ca5adSJuan Quintela     Error *local_err = NULL;
159206833d83SFabiano Rosas     bool use_packets = multifd_use_packets();
1593d32ca5adSJuan Quintela     int id;
1594d32ca5adSJuan Quintela 
159506833d83SFabiano Rosas     if (use_packets) {
1596d32ca5adSJuan Quintela         id = multifd_recv_initial_packet(ioc, &local_err);
1597d32ca5adSJuan Quintela         if (id < 0) {
1598d32ca5adSJuan Quintela             multifd_recv_terminate_threads(local_err);
1599d32ca5adSJuan Quintela             error_propagate_prepend(errp, local_err,
1600d32ca5adSJuan Quintela                                     "failed to receive packet"
1601d32ca5adSJuan Quintela                                     " via multifd channel %d: ",
1602d73415a3SStefan Hajnoczi                                     qatomic_read(&multifd_recv_state->count));
16036720c2b3Smanish.mishra             return;
1604d32ca5adSJuan Quintela         }
1605d32ca5adSJuan Quintela         trace_multifd_recv_new_channel(id);
160606833d83SFabiano Rosas     } else {
16072dd7ee7aSFabiano Rosas         id = qatomic_read(&multifd_recv_state->count);
160806833d83SFabiano Rosas     }
1609d32ca5adSJuan Quintela 
1610d32ca5adSJuan Quintela     p = &multifd_recv_state->params[id];
1611d32ca5adSJuan Quintela     if (p->c != NULL) {
1612d32ca5adSJuan Quintela         error_setg(&local_err, "multifd: received id '%d' already setup'",
1613d32ca5adSJuan Quintela                    id);
1614d32ca5adSJuan Quintela         multifd_recv_terminate_threads(local_err);
1615d32ca5adSJuan Quintela         error_propagate(errp, local_err);
16166720c2b3Smanish.mishra         return;
1617d32ca5adSJuan Quintela     }
1618d32ca5adSJuan Quintela     p->c = ioc;
1619d32ca5adSJuan Quintela     object_ref(OBJECT(ioc));
1620d32ca5adSJuan Quintela 
1621a2a63c4aSFabiano Rosas     p->thread_created = true;
1622d32ca5adSJuan Quintela     qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
1623d32ca5adSJuan Quintela                        QEMU_THREAD_JOINABLE);
1624d73415a3SStefan Hajnoczi     qatomic_inc(&multifd_recv_state->count);
1625d32ca5adSJuan Quintela }
1626