xref: /openbmc/qemu/migration/multifd.c (revision 7e4480dde246982f18b51d8f316a95a7dba2b825)
1d32ca5adSJuan Quintela /*
2d32ca5adSJuan Quintela  * Multifd common code
3d32ca5adSJuan Quintela  *
4d32ca5adSJuan Quintela  * Copyright (c) 2019-2020 Red Hat Inc
5d32ca5adSJuan Quintela  *
6d32ca5adSJuan Quintela  * Authors:
7d32ca5adSJuan Quintela  *  Juan Quintela <quintela@redhat.com>
8d32ca5adSJuan Quintela  *
9d32ca5adSJuan Quintela  * This work is licensed under the terms of the GNU GPL, version 2 or later.
10d32ca5adSJuan Quintela  * See the COPYING file in the top-level directory.
11d32ca5adSJuan Quintela  */
12d32ca5adSJuan Quintela 
13d32ca5adSJuan Quintela #include "qemu/osdep.h"
14303e6f54SHao Xiang #include "qemu/cutils.h"
15d32ca5adSJuan Quintela #include "qemu/rcu.h"
16d32ca5adSJuan Quintela #include "exec/target_page.h"
17d32ca5adSJuan Quintela #include "sysemu/sysemu.h"
18d32ca5adSJuan Quintela #include "exec/ramblock.h"
19d32ca5adSJuan Quintela #include "qemu/error-report.h"
20d32ca5adSJuan Quintela #include "qapi/error.h"
21b7b03eb6SFabiano Rosas #include "file.h"
22d32ca5adSJuan Quintela #include "migration.h"
23947701ccSJuan Quintela #include "migration-stats.h"
24d32ca5adSJuan Quintela #include "socket.h"
2529647140SChuan Zheng #include "tls.h"
26d32ca5adSJuan Quintela #include "qemu-file.h"
27d32ca5adSJuan Quintela #include "trace.h"
28d32ca5adSJuan Quintela #include "multifd.h"
291b1f4ab6SJiang Jiacheng #include "threadinfo.h"
30b4bc342cSJuan Quintela #include "options.h"
31b5eea99eSLukas Straub #include "qemu/yank.h"
32b7b03eb6SFabiano Rosas #include "io/channel-file.h"
33b5eea99eSLukas Straub #include "io/channel-socket.h"
341a92d6d5SLukas Straub #include "yank_functions.h"
35b5eea99eSLukas Straub 
36d32ca5adSJuan Quintela /* Multiple fd's */
37d32ca5adSJuan Quintela 
/* Constants carried in the header of every multifd message */
#define MULTIFD_MAGIC 0x11223344U
#define MULTIFD_VERSION 1

/*
 * Handshake message exchanged once per channel before any data flows.
 * The structure is packed and is written to / read from the channel as
 * raw bytes, so its layout must not change.
 */
typedef struct {
    uint32_t magic;         /* MULTIFD_MAGIC, big-endian on the wire */
    uint32_t version;       /* MULTIFD_VERSION, big-endian on the wire */
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;             /* channel id */
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;
49d32ca5adSJuan Quintela 
5098ea497dSPeter Xu struct {
5198ea497dSPeter Xu     MultiFDSendParams *params;
5298ea497dSPeter Xu     /*
5398ea497dSPeter Xu      * Global number of generated multifd packets.
5498ea497dSPeter Xu      *
5598ea497dSPeter Xu      * Note that we used 'uintptr_t' because it'll naturally support atomic
5698ea497dSPeter Xu      * operations on both 32bit / 64 bits hosts.  It means on 32bit systems
5798ea497dSPeter Xu      * multifd will overflow the packet_num easier, but that should be
5898ea497dSPeter Xu      * fine.
5998ea497dSPeter Xu      *
6098ea497dSPeter Xu      * Another option is to use QEMU's Stat64 then it'll be 64 bits on all
6198ea497dSPeter Xu      * hosts, however so far it does not support atomic fetch_add() yet.
6298ea497dSPeter Xu      * Make it easy for now.
6398ea497dSPeter Xu      */
6498ea497dSPeter Xu     uintptr_t packet_num;
6593fa9dc2SFabiano Rosas     /*
6693fa9dc2SFabiano Rosas      * Synchronization point past which no more channels will be
6793fa9dc2SFabiano Rosas      * created.
6893fa9dc2SFabiano Rosas      */
6993fa9dc2SFabiano Rosas     QemuSemaphore channels_created;
7098ea497dSPeter Xu     /* send channels ready */
7198ea497dSPeter Xu     QemuSemaphore channels_ready;
7298ea497dSPeter Xu     /*
7398ea497dSPeter Xu      * Have we already run terminate threads.  There is a race when it
7498ea497dSPeter Xu      * happens that we got one error while we are exiting.
7598ea497dSPeter Xu      * We will use atomic operations.  Only valid values are 0 and 1.
7698ea497dSPeter Xu      */
7798ea497dSPeter Xu     int exiting;
7898ea497dSPeter Xu     /* multifd ops */
79308d165cSFabiano Rosas     const MultiFDMethods *ops;
8098ea497dSPeter Xu } *multifd_send_state;
8198ea497dSPeter Xu 
8211dd7be5SFabiano Rosas struct {
8311dd7be5SFabiano Rosas     MultiFDRecvParams *params;
84d117ed06SFabiano Rosas     MultiFDRecvData *data;
8511dd7be5SFabiano Rosas     /* number of created threads */
8611dd7be5SFabiano Rosas     int count;
87d117ed06SFabiano Rosas     /*
88d117ed06SFabiano Rosas      * This is always posted by the recv threads, the migration thread
89d117ed06SFabiano Rosas      * uses it to wait for recv threads to finish assigned tasks.
90d117ed06SFabiano Rosas      */
9111dd7be5SFabiano Rosas     QemuSemaphore sem_sync;
9211dd7be5SFabiano Rosas     /* global number of generated multifd packets */
9311dd7be5SFabiano Rosas     uint64_t packet_num;
9411dd7be5SFabiano Rosas     int exiting;
9511dd7be5SFabiano Rosas     /* multifd ops */
96308d165cSFabiano Rosas     const MultiFDMethods *ops;
9711dd7be5SFabiano Rosas } *multifd_recv_state;
9811dd7be5SFabiano Rosas 
multifd_send_data_alloc(void)9940c9471eSFabiano Rosas MultiFDSendData *multifd_send_data_alloc(void)
1009f0e1089SFabiano Rosas {
1019f0e1089SFabiano Rosas     size_t max_payload_size, size_minus_payload;
1029f0e1089SFabiano Rosas 
1039f0e1089SFabiano Rosas     /*
1049f0e1089SFabiano Rosas      * MultiFDPages_t has a flexible array at the end, account for it
1059f0e1089SFabiano Rosas      * when allocating MultiFDSendData. Use max() in case other types
1069f0e1089SFabiano Rosas      * added to the union in the future are larger than
1079f0e1089SFabiano Rosas      * (MultiFDPages_t + flex array).
1089f0e1089SFabiano Rosas      */
1099f0e1089SFabiano Rosas     max_payload_size = MAX(multifd_ram_payload_size(), sizeof(MultiFDPayload));
1109f0e1089SFabiano Rosas 
1119f0e1089SFabiano Rosas     /*
1129f0e1089SFabiano Rosas      * Account for any holes the compiler might insert. We can't pack
1139f0e1089SFabiano Rosas      * the structure because that misaligns the members and triggers
1149f0e1089SFabiano Rosas      * Waddress-of-packed-member.
1159f0e1089SFabiano Rosas      */
1169f0e1089SFabiano Rosas     size_minus_payload = sizeof(MultiFDSendData) - sizeof(MultiFDPayload);
1179f0e1089SFabiano Rosas 
1189f0e1089SFabiano Rosas     return g_malloc0(size_minus_payload + max_payload_size);
1199f0e1089SFabiano Rosas }
1209f0e1089SFabiano Rosas 
/* Multifd packets are used whenever mapped-ram is not enabled */
static bool multifd_use_packets(void)
{
    bool mapped_ram = migrate_mapped_ram();

    return !mapped_ram;
}
12506833d83SFabiano Rosas 
multifd_send_channel_created(void)126a8a3e710SFabiano Rosas void multifd_send_channel_created(void)
127a8a3e710SFabiano Rosas {
128a8a3e710SFabiano Rosas     qemu_sem_post(&multifd_send_state->channels_created);
129a8a3e710SFabiano Rosas }
130a8a3e710SFabiano Rosas 
131308d165cSFabiano Rosas static const MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {};
132ab7cbb0bSJuan Quintela 
multifd_register_ops(int method,const MultiFDMethods * ops)133308d165cSFabiano Rosas void multifd_register_ops(int method, const MultiFDMethods *ops)
1347ec2c2b3SJuan Quintela {
135dc6327d9SFabiano Rosas     assert(0 <= method && method < MULTIFD_COMPRESSION__MAX);
136dc6327d9SFabiano Rosas     assert(!multifd_ops[method]);
1377ec2c2b3SJuan Quintela     multifd_ops[method] = ops;
1387ec2c2b3SJuan Quintela }
1397ec2c2b3SJuan Quintela 
multifd_send_initial_packet(MultiFDSendParams * p,Error ** errp)140d32ca5adSJuan Quintela static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
141d32ca5adSJuan Quintela {
142d32ca5adSJuan Quintela     MultiFDInit_t msg = {};
143cbec7eb7SJuan Quintela     size_t size = sizeof(msg);
144d32ca5adSJuan Quintela     int ret;
145d32ca5adSJuan Quintela 
146d32ca5adSJuan Quintela     msg.magic = cpu_to_be32(MULTIFD_MAGIC);
147d32ca5adSJuan Quintela     msg.version = cpu_to_be32(MULTIFD_VERSION);
148d32ca5adSJuan Quintela     msg.id = p->id;
149d32ca5adSJuan Quintela     memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
150d32ca5adSJuan Quintela 
151cbec7eb7SJuan Quintela     ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
152d32ca5adSJuan Quintela     if (ret != 0) {
153d32ca5adSJuan Quintela         return -1;
154d32ca5adSJuan Quintela     }
155cbec7eb7SJuan Quintela     stat64_add(&mig_stats.multifd_bytes, size);
156d32ca5adSJuan Quintela     return 0;
157d32ca5adSJuan Quintela }
158d32ca5adSJuan Quintela 
multifd_recv_initial_packet(QIOChannel * c,Error ** errp)159d32ca5adSJuan Quintela static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
160d32ca5adSJuan Quintela {
161d32ca5adSJuan Quintela     MultiFDInit_t msg;
162d32ca5adSJuan Quintela     int ret;
163d32ca5adSJuan Quintela 
164d32ca5adSJuan Quintela     ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
165d32ca5adSJuan Quintela     if (ret != 0) {
166d32ca5adSJuan Quintela         return -1;
167d32ca5adSJuan Quintela     }
168d32ca5adSJuan Quintela 
169d32ca5adSJuan Quintela     msg.magic = be32_to_cpu(msg.magic);
170d32ca5adSJuan Quintela     msg.version = be32_to_cpu(msg.version);
171d32ca5adSJuan Quintela 
172d32ca5adSJuan Quintela     if (msg.magic != MULTIFD_MAGIC) {
173d32ca5adSJuan Quintela         error_setg(errp, "multifd: received packet magic %x "
174d32ca5adSJuan Quintela                    "expected %x", msg.magic, MULTIFD_MAGIC);
175d32ca5adSJuan Quintela         return -1;
176d32ca5adSJuan Quintela     }
177d32ca5adSJuan Quintela 
178d32ca5adSJuan Quintela     if (msg.version != MULTIFD_VERSION) {
17904e11404SJuan Quintela         error_setg(errp, "multifd: received packet version %u "
18004e11404SJuan Quintela                    "expected %u", msg.version, MULTIFD_VERSION);
181d32ca5adSJuan Quintela         return -1;
182d32ca5adSJuan Quintela     }
183d32ca5adSJuan Quintela 
184d32ca5adSJuan Quintela     if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
185d32ca5adSJuan Quintela         char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
186d32ca5adSJuan Quintela         char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
187d32ca5adSJuan Quintela 
188d32ca5adSJuan Quintela         error_setg(errp, "multifd: received uuid '%s' and expected "
189d32ca5adSJuan Quintela                    "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
190d32ca5adSJuan Quintela         g_free(uuid);
191d32ca5adSJuan Quintela         g_free(msg_uuid);
192d32ca5adSJuan Quintela         return -1;
193d32ca5adSJuan Quintela     }
194d32ca5adSJuan Quintela 
195d32ca5adSJuan Quintela     if (msg.id > migrate_multifd_channels()) {
196c77b4085SAvihai Horon         error_setg(errp, "multifd: received channel id %u is greater than "
197c77b4085SAvihai Horon                    "number of channels %u", msg.id, migrate_multifd_channels());
198d32ca5adSJuan Quintela         return -1;
199d32ca5adSJuan Quintela     }
200d32ca5adSJuan Quintela 
201d32ca5adSJuan Quintela     return msg.id;
202d32ca5adSJuan Quintela }
203d32ca5adSJuan Quintela 
/*
 * Build the packet header of channel @p from its current flags and
 * next_packet_size, stamping it with the next global packet number.
 * The RAM part of the packet is filled in only when MULTIFD_FLAG_SYNC
 * is not set.
 */
void multifd_send_fill_packet(MultiFDSendParams *p)
{
    MultiFDPacket_t *hdr = p->packet;
    bool is_sync = p->flags & MULTIFD_FLAG_SYNC;
    uint64_t seq;

    /* Start from a clean packet so unset fields go out as zero */
    memset(hdr, 0, p->packet_len);

    /* Take a global sequence number and account for this packet */
    seq = qatomic_fetch_inc(&multifd_send_state->packet_num);
    p->packets_sent++;

    /* Header fields are big-endian on the wire */
    hdr->magic = cpu_to_be32(MULTIFD_MAGIC);
    hdr->version = cpu_to_be32(MULTIFD_VERSION);
    hdr->flags = cpu_to_be32(p->flags);
    hdr->next_packet_size = cpu_to_be32(p->next_packet_size);
    hdr->packet_num = cpu_to_be64(seq);

    if (!is_sync) {
        multifd_ram_fill_packet(p);
    }

    trace_multifd_send_fill(p->id, seq, p->flags, p->next_packet_size);
}
230d32ca5adSJuan Quintela 
multifd_recv_unfill_packet(MultiFDRecvParams * p,Error ** errp)23187bb9e95SFabiano Rosas static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
23287bb9e95SFabiano Rosas {
23381b0ed8aSFabiano Rosas     const MultiFDPacket_t *packet = p->packet;
23481b0ed8aSFabiano Rosas     uint32_t magic = be32_to_cpu(packet->magic);
23581b0ed8aSFabiano Rosas     uint32_t version = be32_to_cpu(packet->version);
23687bb9e95SFabiano Rosas     int ret = 0;
23787bb9e95SFabiano Rosas 
23881b0ed8aSFabiano Rosas     if (magic != MULTIFD_MAGIC) {
23981b0ed8aSFabiano Rosas         error_setg(errp, "multifd: received packet magic %x, expected %x",
24081b0ed8aSFabiano Rosas                    magic, MULTIFD_MAGIC);
24187bb9e95SFabiano Rosas         return -1;
24287bb9e95SFabiano Rosas     }
24387bb9e95SFabiano Rosas 
24481b0ed8aSFabiano Rosas     if (version != MULTIFD_VERSION) {
24581b0ed8aSFabiano Rosas         error_setg(errp, "multifd: received packet version %u, expected %u",
24681b0ed8aSFabiano Rosas                    version, MULTIFD_VERSION);
24787bb9e95SFabiano Rosas         return -1;
24887bb9e95SFabiano Rosas     }
24987bb9e95SFabiano Rosas 
25087bb9e95SFabiano Rosas     p->flags = be32_to_cpu(packet->flags);
25187bb9e95SFabiano Rosas     p->next_packet_size = be32_to_cpu(packet->next_packet_size);
25287bb9e95SFabiano Rosas     p->packet_num = be64_to_cpu(packet->packet_num);
25387bb9e95SFabiano Rosas     p->packets_recved++;
25487bb9e95SFabiano Rosas 
255*7e4480ddSFabiano Rosas     /* Always unfill, old QEMUs (<9.0) send data along with SYNC */
25687bb9e95SFabiano Rosas     ret = multifd_ram_unfill_packet(p, errp);
25787bb9e95SFabiano Rosas 
25887bb9e95SFabiano Rosas     trace_multifd_recv_unfill(p->id, p->packet_num, p->flags,
25987bb9e95SFabiano Rosas                               p->next_packet_size);
26087bb9e95SFabiano Rosas 
26187bb9e95SFabiano Rosas     return ret;
26287bb9e95SFabiano Rosas }
26387bb9e95SFabiano Rosas 
multifd_send_should_exit(void)26415f3f21dSPeter Xu static bool multifd_send_should_exit(void)
26515f3f21dSPeter Xu {
26615f3f21dSPeter Xu     return qatomic_read(&multifd_send_state->exiting);
26715f3f21dSPeter Xu }
26815f3f21dSPeter Xu 
multifd_recv_should_exit(void)26911dd7be5SFabiano Rosas static bool multifd_recv_should_exit(void)
27011dd7be5SFabiano Rosas {
27111dd7be5SFabiano Rosas     return qatomic_read(&multifd_recv_state->exiting);
27211dd7be5SFabiano Rosas }
27311dd7be5SFabiano Rosas 
274d32ca5adSJuan Quintela /*
27548c0f5d5SPeter Xu  * The migration thread can wait on either of the two semaphores.  This
27648c0f5d5SPeter Xu  * function can be used to kick the main thread out of waiting on either of
27748c0f5d5SPeter Xu  * them.  Should mostly only be called when something wrong happened with
27848c0f5d5SPeter Xu  * the current multifd send thread.
27948c0f5d5SPeter Xu  */
multifd_send_kick_main(MultiFDSendParams * p)28048c0f5d5SPeter Xu static void multifd_send_kick_main(MultiFDSendParams *p)
28148c0f5d5SPeter Xu {
28248c0f5d5SPeter Xu     qemu_sem_post(&p->sem_sync);
28348c0f5d5SPeter Xu     qemu_sem_post(&multifd_send_state->channels_ready);
28448c0f5d5SPeter Xu }
28548c0f5d5SPeter Xu 
28648c0f5d5SPeter Xu /*
287a71ef5c7SFabiano Rosas  * multifd_send() works by exchanging the MultiFDSendData object
288a71ef5c7SFabiano Rosas  * provided by the caller with an unused MultiFDSendData object from
289a71ef5c7SFabiano Rosas  * the next channel that is found to be idle.
290d32ca5adSJuan Quintela  *
291a71ef5c7SFabiano Rosas  * The channel owns the data until it finishes transmitting and the
292a71ef5c7SFabiano Rosas  * caller owns the empty object until it fills it with data and calls
293a71ef5c7SFabiano Rosas  * this function again. No locking necessary.
294d32ca5adSJuan Quintela  *
295a71ef5c7SFabiano Rosas  * Switching is safe because both the migration thread and the channel
296a71ef5c7SFabiano Rosas  * thread have barriers in place to serialize access.
2973b40964aSPeter Xu  *
2983b40964aSPeter Xu  * Returns true if succeed, false otherwise.
299d32ca5adSJuan Quintela  */
multifd_send(MultiFDSendData ** send_data)30040c9471eSFabiano Rosas bool multifd_send(MultiFDSendData **send_data)
301d32ca5adSJuan Quintela {
302d32ca5adSJuan Quintela     int i;
303d32ca5adSJuan Quintela     static int next_channel;
304d32ca5adSJuan Quintela     MultiFDSendParams *p = NULL; /* make happy gcc */
3059f0e1089SFabiano Rosas     MultiFDSendData *tmp;
306d32ca5adSJuan Quintela 
30715f3f21dSPeter Xu     if (multifd_send_should_exit()) {
3083b40964aSPeter Xu         return false;
309d32ca5adSJuan Quintela     }
310d32ca5adSJuan Quintela 
311e3cce9afSPeter Xu     /* We wait here, until at least one channel is ready */
312d32ca5adSJuan Quintela     qemu_sem_wait(&multifd_send_state->channels_ready);
313e3cce9afSPeter Xu 
3147e89a140SLaurent Vivier     /*
3157e89a140SLaurent Vivier      * next_channel can remain from a previous migration that was
3167e89a140SLaurent Vivier      * using more channels, so ensure it doesn't overflow if the
3177e89a140SLaurent Vivier      * limit is lower now.
3187e89a140SLaurent Vivier      */
3197e89a140SLaurent Vivier     next_channel %= migrate_multifd_channels();
320d32ca5adSJuan Quintela     for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
32115f3f21dSPeter Xu         if (multifd_send_should_exit()) {
3223b40964aSPeter Xu             return false;
323d32ca5adSJuan Quintela         }
32415f3f21dSPeter Xu         p = &multifd_send_state->params[i];
325e3cce9afSPeter Xu         /*
326e3cce9afSPeter Xu          * Lockless read to p->pending_job is safe, because only multifd
327e3cce9afSPeter Xu          * sender thread can clear it.
328e3cce9afSPeter Xu          */
329f5f48a78SPeter Xu         if (qatomic_read(&p->pending_job) == false) {
330d32ca5adSJuan Quintela             next_channel = (i + 1) % migrate_multifd_channels();
331d32ca5adSJuan Quintela             break;
332d32ca5adSJuan Quintela         }
333d32ca5adSJuan Quintela     }
334e3cce9afSPeter Xu 
335e3cce9afSPeter Xu     /*
336488c84acSPeter Xu      * Make sure we read p->pending_job before all the rest.  Pairs with
337488c84acSPeter Xu      * qatomic_store_release() in multifd_send_thread().
338e3cce9afSPeter Xu      */
339488c84acSPeter Xu     smp_mb_acquire();
3409f0e1089SFabiano Rosas 
341a71ef5c7SFabiano Rosas     assert(multifd_payload_empty(p->data));
3429f0e1089SFabiano Rosas 
343a71ef5c7SFabiano Rosas     /*
344a71ef5c7SFabiano Rosas      * Swap the pointers. The channel gets the client data for
345a71ef5c7SFabiano Rosas      * transferring and the client gets back an unused data slot.
346a71ef5c7SFabiano Rosas      */
347a71ef5c7SFabiano Rosas     tmp = *send_data;
348a71ef5c7SFabiano Rosas     *send_data = p->data;
3499f0e1089SFabiano Rosas     p->data = tmp;
350a71ef5c7SFabiano Rosas 
351488c84acSPeter Xu     /*
3529f0e1089SFabiano Rosas      * Making sure p->data is setup before marking pending_job=true. Pairs
353488c84acSPeter Xu      * with the qatomic_load_acquire() in multifd_send_thread().
354488c84acSPeter Xu      */
355488c84acSPeter Xu     qatomic_store_release(&p->pending_job, true);
356d32ca5adSJuan Quintela     qemu_sem_post(&p->sem);
357d32ca5adSJuan Quintela 
3583b40964aSPeter Xu     return true;
359d32ca5adSJuan Quintela }
360d32ca5adSJuan Quintela 
3613ab4441dSPeter Xu /* Multifd send side hit an error; remember it and prepare to quit */
multifd_send_set_error(Error * err)3623ab4441dSPeter Xu static void multifd_send_set_error(Error *err)
363d32ca5adSJuan Quintela {
36415f3f21dSPeter Xu     /*
36515f3f21dSPeter Xu      * We don't want to exit each threads twice.  Depending on where
36615f3f21dSPeter Xu      * we get the error, or if there are two independent errors in two
36715f3f21dSPeter Xu      * threads at the same time, we can end calling this function
36815f3f21dSPeter Xu      * twice.
36915f3f21dSPeter Xu      */
37015f3f21dSPeter Xu     if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
37115f3f21dSPeter Xu         return;
37215f3f21dSPeter Xu     }
37315f3f21dSPeter Xu 
374d32ca5adSJuan Quintela     if (err) {
375d32ca5adSJuan Quintela         MigrationState *s = migrate_get_current();
376d32ca5adSJuan Quintela         migrate_set_error(s, err);
377d32ca5adSJuan Quintela         if (s->state == MIGRATION_STATUS_SETUP ||
378d32ca5adSJuan Quintela             s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
379d32ca5adSJuan Quintela             s->state == MIGRATION_STATUS_DEVICE ||
380d32ca5adSJuan Quintela             s->state == MIGRATION_STATUS_ACTIVE) {
381d32ca5adSJuan Quintela             migrate_set_state(&s->state, s->state,
382d32ca5adSJuan Quintela                               MIGRATION_STATUS_FAILED);
383d32ca5adSJuan Quintela         }
384d32ca5adSJuan Quintela     }
3853ab4441dSPeter Xu }
386d32ca5adSJuan Quintela 
multifd_send_terminate_threads(void)3873ab4441dSPeter Xu static void multifd_send_terminate_threads(void)
3883ab4441dSPeter Xu {
3893ab4441dSPeter Xu     int i;
3903ab4441dSPeter Xu 
3913ab4441dSPeter Xu     trace_multifd_send_terminate_threads();
3923ab4441dSPeter Xu 
3933ab4441dSPeter Xu     /*
3943ab4441dSPeter Xu      * Tell everyone we're quitting.  No xchg() needed here; we simply
3953ab4441dSPeter Xu      * always set it.
3963ab4441dSPeter Xu      */
3973ab4441dSPeter Xu     qatomic_set(&multifd_send_state->exiting, 1);
39812808db3SPeter Xu 
39912808db3SPeter Xu     /*
40012808db3SPeter Xu      * Firstly, kick all threads out; no matter whether they are just idle,
40112808db3SPeter Xu      * or blocked in an IO system call.
40212808db3SPeter Xu      */
403d32ca5adSJuan Quintela     for (i = 0; i < migrate_multifd_channels(); i++) {
404d32ca5adSJuan Quintela         MultiFDSendParams *p = &multifd_send_state->params[i];
405d32ca5adSJuan Quintela 
406d32ca5adSJuan Quintela         qemu_sem_post(&p->sem);
407077fbb59SLi Zhang         if (p->c) {
408077fbb59SLi Zhang             qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
409077fbb59SLi Zhang         }
410d32ca5adSJuan Quintela     }
41112808db3SPeter Xu 
41212808db3SPeter Xu     /*
41312808db3SPeter Xu      * Finally recycle all the threads.
41412808db3SPeter Xu      */
41512808db3SPeter Xu     for (i = 0; i < migrate_multifd_channels(); i++) {
41612808db3SPeter Xu         MultiFDSendParams *p = &multifd_send_state->params[i];
41712808db3SPeter Xu 
418e1921f10SFabiano Rosas         if (p->tls_thread_created) {
419e1921f10SFabiano Rosas             qemu_thread_join(&p->tls_thread);
420e1921f10SFabiano Rosas         }
421e1921f10SFabiano Rosas 
422a2a63c4aSFabiano Rosas         if (p->thread_created) {
42312808db3SPeter Xu             qemu_thread_join(&p->thread);
42412808db3SPeter Xu         }
42512808db3SPeter Xu     }
426d32ca5adSJuan Quintela }
427d32ca5adSJuan Quintela 
multifd_send_cleanup_channel(MultiFDSendParams * p,Error ** errp)42812808db3SPeter Xu static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
429d32ca5adSJuan Quintela {
4300518b5d8SPeter Xu     if (p->c) {
43120171ea8SLukas Straub         migration_ioc_unregister_yank(p->c);
4321a6e217cSPeter Xu         /*
43361dec060SFabiano Rosas          * The object_unref() cannot guarantee the fd will always be
43461dec060SFabiano Rosas          * released because finalize() of the iochannel is only
43561dec060SFabiano Rosas          * triggered on the last reference and it's not guaranteed
43661dec060SFabiano Rosas          * that we always hold the last refcount when reaching here.
4371a6e217cSPeter Xu          *
43861dec060SFabiano Rosas          * Closing the fd explicitly has the benefit that if there is any
43961dec060SFabiano Rosas          * registered I/O handler callbacks on such fd, that will get a
44061dec060SFabiano Rosas          * POLLNVAL event and will further trigger the cleanup to finally
44161dec060SFabiano Rosas          * release the IOC.
44261dec060SFabiano Rosas          *
44361dec060SFabiano Rosas          * FIXME: It should logically be guaranteed that all multifd
44461dec060SFabiano Rosas          * channels have no I/O handler callback registered when reaching
44561dec060SFabiano Rosas          * here, because migration thread will wait for all multifd channel
44661dec060SFabiano Rosas          * establishments to complete during setup.  Since
44761dec060SFabiano Rosas          * migrate_fd_cleanup() will be scheduled in main thread too, all
44861dec060SFabiano Rosas          * previous callbacks should guarantee to be completed when
44961dec060SFabiano Rosas          * reaching here.  See multifd_send_state.channels_created and its
45061dec060SFabiano Rosas          * usage.  In the future, we could replace this with an assert
45161dec060SFabiano Rosas          * making sure we're the last reference, or simply drop it if above
45261dec060SFabiano Rosas          * is more clear to be justified.
4531a6e217cSPeter Xu          */
454b7b03eb6SFabiano Rosas         qio_channel_close(p->c, &error_abort);
455c9a7e83cSPeter Xu         object_unref(OBJECT(p->c));
456d32ca5adSJuan Quintela         p->c = NULL;
4570518b5d8SPeter Xu     }
458d32ca5adSJuan Quintela     qemu_sem_destroy(&p->sem);
459d32ca5adSJuan Quintela     qemu_sem_destroy(&p->sem_sync);
460d32ca5adSJuan Quintela     g_free(p->name);
461d32ca5adSJuan Quintela     p->name = NULL;
4629f0e1089SFabiano Rosas     g_free(p->data);
4639f0e1089SFabiano Rosas     p->data = NULL;
464d32ca5adSJuan Quintela     p->packet_len = 0;
465d32ca5adSJuan Quintela     g_free(p->packet);
466d32ca5adSJuan Quintela     p->packet = NULL;
46712808db3SPeter Xu     multifd_send_state->ops->send_cleanup(p, errp);
46890e0eeb9SFabiano Rosas     assert(!p->iov);
46912808db3SPeter Xu 
47012808db3SPeter Xu     return *errp == NULL;
471ab7cbb0bSJuan Quintela }
47212808db3SPeter Xu 
multifd_send_cleanup_state(void)47312808db3SPeter Xu static void multifd_send_cleanup_state(void)
47412808db3SPeter Xu {
475b7b03eb6SFabiano Rosas     file_cleanup_outgoing_migration();
47672b90b96SPeter Xu     socket_cleanup_outgoing_migration();
47793fa9dc2SFabiano Rosas     qemu_sem_destroy(&multifd_send_state->channels_created);
478d32ca5adSJuan Quintela     qemu_sem_destroy(&multifd_send_state->channels_ready);
479d32ca5adSJuan Quintela     g_free(multifd_send_state->params);
480d32ca5adSJuan Quintela     multifd_send_state->params = NULL;
481d32ca5adSJuan Quintela     g_free(multifd_send_state);
482d32ca5adSJuan Quintela     multifd_send_state = NULL;
483d32ca5adSJuan Quintela }
484d32ca5adSJuan Quintela 
multifd_send_shutdown(void)485cde85c37SPeter Xu void multifd_send_shutdown(void)
48612808db3SPeter Xu {
48712808db3SPeter Xu     int i;
48812808db3SPeter Xu 
48912808db3SPeter Xu     if (!migrate_multifd()) {
49012808db3SPeter Xu         return;
49112808db3SPeter Xu     }
49212808db3SPeter Xu 
49312808db3SPeter Xu     multifd_send_terminate_threads();
49412808db3SPeter Xu 
49512808db3SPeter Xu     for (i = 0; i < migrate_multifd_channels(); i++) {
49612808db3SPeter Xu         MultiFDSendParams *p = &multifd_send_state->params[i];
49712808db3SPeter Xu         Error *local_err = NULL;
49812808db3SPeter Xu 
49912808db3SPeter Xu         if (!multifd_send_cleanup_channel(p, &local_err)) {
50012808db3SPeter Xu             migrate_set_error(migrate_get_current(), local_err);
50112808db3SPeter Xu             error_free(local_err);
50212808db3SPeter Xu         }
50312808db3SPeter Xu     }
50412808db3SPeter Xu 
50512808db3SPeter Xu     multifd_send_cleanup_state();
50612808db3SPeter Xu }
50712808db3SPeter Xu 
/*
 * Flush channel @c.
 *
 * Returns -1 after reporting the error when the flush fails, otherwise
 * the non-negative value returned by qio_channel_flush().  A return of
 * 1 additionally bumps the dirty_sync_missed_zero_copy counter.
 */
static int multifd_zero_copy_flush(QIOChannel *c)
{
    Error *flush_err = NULL;
    int rc = qio_channel_flush(c, &flush_err);

    if (rc < 0) {
        error_report_err(flush_err);
        return -1;
    }

    /* NOTE(review): 1 presumably means zero copy was missed - confirm */
    if (rc == 1) {
        stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
    }

    return rc;
}
5244cc47b43SLeonardo Bras 
multifd_send_sync_main(void)525a0c78d81SFabiano Rosas int multifd_send_sync_main(void)
526a0c78d81SFabiano Rosas {
527a0c78d81SFabiano Rosas     int i;
528a0c78d81SFabiano Rosas     bool flush_zero_copy;
529a0c78d81SFabiano Rosas 
530b4bc342cSJuan Quintela     flush_zero_copy = migrate_zero_copy_send();
5315b1d9babSLeonardo Bras 
532d32ca5adSJuan Quintela     for (i = 0; i < migrate_multifd_channels(); i++) {
533d32ca5adSJuan Quintela         MultiFDSendParams *p = &multifd_send_state->params[i];
534d32ca5adSJuan Quintela 
53515f3f21dSPeter Xu         if (multifd_send_should_exit()) {
53633d70973SLeonardo Bras             return -1;
537d32ca5adSJuan Quintela         }
538d32ca5adSJuan Quintela 
53915f3f21dSPeter Xu         trace_multifd_send_sync_main_signal(p->id);
54015f3f21dSPeter Xu 
541f5f48a78SPeter Xu         /*
542f5f48a78SPeter Xu          * We should be the only user so far, so not possible to be set by
543f5f48a78SPeter Xu          * others concurrently.
544f5f48a78SPeter Xu          */
545f5f48a78SPeter Xu         assert(qatomic_read(&p->pending_sync) == false);
546f5f48a78SPeter Xu         qatomic_set(&p->pending_sync, true);
547d32ca5adSJuan Quintela         qemu_sem_post(&p->sem);
548d32ca5adSJuan Quintela     }
549d32ca5adSJuan Quintela     for (i = 0; i < migrate_multifd_channels(); i++) {
550d32ca5adSJuan Quintela         MultiFDSendParams *p = &multifd_send_state->params[i];
551d32ca5adSJuan Quintela 
55215f3f21dSPeter Xu         if (multifd_send_should_exit()) {
55315f3f21dSPeter Xu             return -1;
55415f3f21dSPeter Xu         }
55515f3f21dSPeter Xu 
556d2026ee1SJuan Quintela         qemu_sem_wait(&multifd_send_state->channels_ready);
557d32ca5adSJuan Quintela         trace_multifd_send_sync_main_wait(p->id);
558d32ca5adSJuan Quintela         qemu_sem_wait(&p->sem_sync);
559ebfc5787SZhenzhong Duan 
560ebfc5787SZhenzhong Duan         if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
561ebfc5787SZhenzhong Duan             return -1;
562ebfc5787SZhenzhong Duan         }
563d32ca5adSJuan Quintela     }
564d32ca5adSJuan Quintela     trace_multifd_send_sync_main(multifd_send_state->packet_num);
56533d70973SLeonardo Bras 
56633d70973SLeonardo Bras     return 0;
567d32ca5adSJuan Quintela }
568d32ca5adSJuan Quintela 
multifd_send_thread(void * opaque)569d32ca5adSJuan Quintela static void *multifd_send_thread(void *opaque)
570d32ca5adSJuan Quintela {
571d32ca5adSJuan Quintela     MultiFDSendParams *p = opaque;
5721b1f4ab6SJiang Jiacheng     MigrationThread *thread = NULL;
573d32ca5adSJuan Quintela     Error *local_err = NULL;
574d32ca5adSJuan Quintela     int ret = 0;
57506833d83SFabiano Rosas     bool use_packets = multifd_use_packets();
576d32ca5adSJuan Quintela 
577788fa680SFabiano Rosas     thread = migration_threads_add(p->name, qemu_get_thread_id());
5781b1f4ab6SJiang Jiacheng 
579d32ca5adSJuan Quintela     trace_multifd_send_thread_start(p->id);
580d32ca5adSJuan Quintela     rcu_register_thread();
581d32ca5adSJuan Quintela 
58206833d83SFabiano Rosas     if (use_packets) {
583d32ca5adSJuan Quintela         if (multifd_send_initial_packet(p, &local_err) < 0) {
584d32ca5adSJuan Quintela             ret = -1;
585d32ca5adSJuan Quintela             goto out;
586d32ca5adSJuan Quintela         }
58706833d83SFabiano Rosas     }
588d32ca5adSJuan Quintela 
589d32ca5adSJuan Quintela     while (true) {
590d2026ee1SJuan Quintela         qemu_sem_post(&multifd_send_state->channels_ready);
591d32ca5adSJuan Quintela         qemu_sem_wait(&p->sem);
592d32ca5adSJuan Quintela 
59315f3f21dSPeter Xu         if (multifd_send_should_exit()) {
594d32ca5adSJuan Quintela             break;
595d32ca5adSJuan Quintela         }
596d32ca5adSJuan Quintela 
597488c84acSPeter Xu         /*
5989f0e1089SFabiano Rosas          * Read pending_job flag before p->data.  Pairs with the
599a71ef5c7SFabiano Rosas          * qatomic_store_release() in multifd_send().
600488c84acSPeter Xu          */
601488c84acSPeter Xu         if (qatomic_load_acquire(&p->pending_job)) {
60200b4b216SMaciej S. Szmigiero             p->flags = 0;
603b7dbdd8eSLeonardo Bras             p->iovs_num = 0;
604a71ef5c7SFabiano Rosas             assert(!multifd_payload_empty(p->data));
60583c560fbSPeter Xu 
60602fb8104SJuan Quintela             ret = multifd_send_state->ops->send_prepare(p, &local_err);
607ab7cbb0bSJuan Quintela             if (ret != 0) {
608ab7cbb0bSJuan Quintela                 break;
609ab7cbb0bSJuan Quintela             }
61083c560fbSPeter Xu 
611f427d90bSFabiano Rosas             if (migrate_mapped_ram()) {
612f427d90bSFabiano Rosas                 ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
6139f0e1089SFabiano Rosas                                               &p->data->u.ram, &local_err);
614f427d90bSFabiano Rosas             } else {
615f427d90bSFabiano Rosas                 ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
616f427d90bSFabiano Rosas                                                   NULL, 0, p->write_flags,
617f427d90bSFabiano Rosas                                                   &local_err);
618f427d90bSFabiano Rosas             }
619f427d90bSFabiano Rosas 
620d32ca5adSJuan Quintela             if (ret != 0) {
621d32ca5adSJuan Quintela                 break;
622d32ca5adSJuan Quintela             }
623d32ca5adSJuan Quintela 
62468b6e000SElena Ufimtseva             stat64_add(&mig_stats.multifd_bytes,
6250926c002SDmitry Frolov                        (uint64_t)p->next_packet_size + p->packet_len);
626836eca47SPeter Xu 
6271618f552SElena Ufimtseva             p->next_packet_size = 0;
628a71ef5c7SFabiano Rosas             multifd_set_payload_type(p->data, MULTIFD_PAYLOAD_NONE);
629488c84acSPeter Xu 
630488c84acSPeter Xu             /*
6319f0e1089SFabiano Rosas              * Making sure p->data is published before saying "we're
632488c84acSPeter Xu              * free".  Pairs with the smp_mb_acquire() in
633a71ef5c7SFabiano Rosas              * multifd_send().
634488c84acSPeter Xu              */
635488c84acSPeter Xu             qatomic_store_release(&p->pending_job, false);
636859ebaf3SPeter Xu         } else {
637488c84acSPeter Xu             /*
638488c84acSPeter Xu              * If not a normal job, must be a sync request.  Note that
639488c84acSPeter Xu              * pending_sync is a standalone flag (unlike pending_job), so
640488c84acSPeter Xu              * it doesn't require explicit memory barriers.
641488c84acSPeter Xu              */
642859ebaf3SPeter Xu             assert(qatomic_read(&p->pending_sync));
64306833d83SFabiano Rosas 
64406833d83SFabiano Rosas             if (use_packets) {
645f5f48a78SPeter Xu                 p->flags = MULTIFD_FLAG_SYNC;
646f5f48a78SPeter Xu                 multifd_send_fill_packet(p);
647f5f48a78SPeter Xu                 ret = qio_channel_write_all(p->c, (void *)p->packet,
648f5f48a78SPeter Xu                                             p->packet_len, &local_err);
649f5f48a78SPeter Xu                 if (ret != 0) {
650f5f48a78SPeter Xu                     break;
651d32ca5adSJuan Quintela                 }
652f5f48a78SPeter Xu                 /* p->next_packet_size will always be zero for a SYNC packet */
653f5f48a78SPeter Xu                 stat64_add(&mig_stats.multifd_bytes, p->packet_len);
65406833d83SFabiano Rosas             }
65506833d83SFabiano Rosas 
656f5f48a78SPeter Xu             qatomic_set(&p->pending_sync, false);
657f5f48a78SPeter Xu             qemu_sem_post(&p->sem_sync);
658d32ca5adSJuan Quintela         }
659d32ca5adSJuan Quintela     }
660d32ca5adSJuan Quintela 
661d32ca5adSJuan Quintela out:
662ee8a7c9cSFabiano Rosas     if (ret) {
663ee8a7c9cSFabiano Rosas         assert(local_err);
664d32ca5adSJuan Quintela         trace_multifd_send_error(p->id);
6653ab4441dSPeter Xu         multifd_send_set_error(local_err);
66648c0f5d5SPeter Xu         multifd_send_kick_main(p);
667ee8a7c9cSFabiano Rosas         error_free(local_err);
668d32ca5adSJuan Quintela     }
669d32ca5adSJuan Quintela 
670d32ca5adSJuan Quintela     rcu_unregister_thread();
671788fa680SFabiano Rosas     migration_threads_remove(thread);
67296d396bfSFabiano Rosas     trace_multifd_send_thread_end(p->id, p->packets_sent);
673d32ca5adSJuan Quintela 
674d32ca5adSJuan Quintela     return NULL;
675d32ca5adSJuan Quintela }
676d32ca5adSJuan Quintela 
6772576ae48SFabiano Rosas static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);
67829647140SChuan Zheng 
6799221e3c6SPeter Xu typedef struct {
6809221e3c6SPeter Xu     MultiFDSendParams *p;
6819221e3c6SPeter Xu     QIOChannelTLS *tioc;
6829221e3c6SPeter Xu } MultiFDTLSThreadArgs;
6839221e3c6SPeter Xu 
multifd_tls_handshake_thread(void * opaque)684a1af605bSChuan Zheng static void *multifd_tls_handshake_thread(void *opaque)
685a1af605bSChuan Zheng {
6869221e3c6SPeter Xu     MultiFDTLSThreadArgs *args = opaque;
687a1af605bSChuan Zheng 
6889221e3c6SPeter Xu     qio_channel_tls_handshake(args->tioc,
6892576ae48SFabiano Rosas                               multifd_new_send_channel_async,
6909221e3c6SPeter Xu                               args->p,
691a1af605bSChuan Zheng                               NULL,
692a1af605bSChuan Zheng                               NULL);
6939221e3c6SPeter Xu     g_free(args);
6949221e3c6SPeter Xu 
695a1af605bSChuan Zheng     return NULL;
696a1af605bSChuan Zheng }
697a1af605bSChuan Zheng 
multifd_tls_channel_connect(MultiFDSendParams * p,QIOChannel * ioc,Error ** errp)698967e3889SFabiano Rosas static bool multifd_tls_channel_connect(MultiFDSendParams *p,
69929647140SChuan Zheng                                         QIOChannel *ioc,
70029647140SChuan Zheng                                         Error **errp)
70129647140SChuan Zheng {
70229647140SChuan Zheng     MigrationState *s = migrate_get_current();
7037f692ec7SPeter Xu     const char *hostname = s->hostname;
7049221e3c6SPeter Xu     MultiFDTLSThreadArgs *args;
70529647140SChuan Zheng     QIOChannelTLS *tioc;
70629647140SChuan Zheng 
7070deb7e9bSJuan Quintela     tioc = migration_tls_client_create(ioc, hostname, errp);
70829647140SChuan Zheng     if (!tioc) {
709967e3889SFabiano Rosas         return false;
71029647140SChuan Zheng     }
71129647140SChuan Zheng 
7122576ae48SFabiano Rosas     /*
7132576ae48SFabiano Rosas      * Ownership of the socket channel now transfers to the newly
7142576ae48SFabiano Rosas      * created TLS channel, which has already taken a reference.
7152576ae48SFabiano Rosas      */
7169e842408SChuan Zheng     object_unref(OBJECT(ioc));
717894f0214SChuan Zheng     trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
71829647140SChuan Zheng     qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");
7199221e3c6SPeter Xu 
7209221e3c6SPeter Xu     args = g_new0(MultiFDTLSThreadArgs, 1);
7219221e3c6SPeter Xu     args->tioc = tioc;
7229221e3c6SPeter Xu     args->p = p;
723e1921f10SFabiano Rosas 
724e1921f10SFabiano Rosas     p->tls_thread_created = true;
725e620b1e4SPeter Xu     qemu_thread_create(&p->tls_thread, MIGRATION_THREAD_SRC_TLS,
7269221e3c6SPeter Xu                        multifd_tls_handshake_thread, args,
727a1af605bSChuan Zheng                        QEMU_THREAD_JOINABLE);
728967e3889SFabiano Rosas     return true;
72929647140SChuan Zheng }
73029647140SChuan Zheng 
/*
 * Finish setting up a send channel and start its sender thread.
 *
 * @p: send channel parameters
 * @ioc: the fully established channel to send on
 *
 * Disables delay on @ioc, registers it for yank, stores it in p->c and
 * spawns a joinable thread running multifd_send_thread().
 */
void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
{
    qio_channel_set_delay(ioc, false);

    migration_ioc_register_yank(ioc);

    /* p->c is assigned only now that the channel is completely set up */
    p->c = ioc;

    /* Flag the thread as created before actually starting it */
    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
                       QEMU_THREAD_JOINABLE);
}
74329647140SChuan Zheng 
7442576ae48SFabiano Rosas /*
7452576ae48SFabiano Rosas  * When TLS is enabled this function is called once to establish the
7462576ae48SFabiano Rosas  * TLS connection and a second time after the TLS handshake to create
7472576ae48SFabiano Rosas  * the multifd channel. Without TLS it goes straight into the channel
7482576ae48SFabiano Rosas  * creation.
7492576ae48SFabiano Rosas  */
multifd_new_send_channel_async(QIOTask * task,gpointer opaque)750d32ca5adSJuan Quintela static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
751d32ca5adSJuan Quintela {
752d32ca5adSJuan Quintela     MultiFDSendParams *p = opaque;
7530e92f644SFabiano Rosas     QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
754d32ca5adSJuan Quintela     Error *local_err = NULL;
7552576ae48SFabiano Rosas     bool ret;
756d32ca5adSJuan Quintela 
757d32ca5adSJuan Quintela     trace_multifd_new_send_channel_async(p->id);
7582576ae48SFabiano Rosas 
7592576ae48SFabiano Rosas     if (qio_task_propagate_error(task, &local_err)) {
7602576ae48SFabiano Rosas         ret = false;
7612576ae48SFabiano Rosas         goto out;
762bca762c2SLi Zhang     }
76303c7a42dSChuan Zheng 
7642576ae48SFabiano Rosas     trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
7652576ae48SFabiano Rosas                                        migrate_get_current()->hostname);
7662576ae48SFabiano Rosas 
7672576ae48SFabiano Rosas     if (migrate_channel_requires_tls_upgrade(ioc)) {
7682576ae48SFabiano Rosas         ret = multifd_tls_channel_connect(p, ioc, &local_err);
76993fa9dc2SFabiano Rosas         if (ret) {
77093fa9dc2SFabiano Rosas             return;
77193fa9dc2SFabiano Rosas         }
7722576ae48SFabiano Rosas     } else {
773770de49cSPeter Xu         multifd_channel_connect(p, ioc);
774770de49cSPeter Xu         ret = true;
7752576ae48SFabiano Rosas     }
7762576ae48SFabiano Rosas 
77793fa9dc2SFabiano Rosas out:
77893fa9dc2SFabiano Rosas     /*
77993fa9dc2SFabiano Rosas      * Here we're not interested whether creation succeeded, only that
78093fa9dc2SFabiano Rosas      * it happened at all.
78193fa9dc2SFabiano Rosas      */
782a8a3e710SFabiano Rosas     multifd_send_channel_created();
78393fa9dc2SFabiano Rosas 
7842576ae48SFabiano Rosas     if (ret) {
7852576ae48SFabiano Rosas         return;
7862576ae48SFabiano Rosas     }
7872576ae48SFabiano Rosas 
788967e3889SFabiano Rosas     trace_multifd_new_send_channel_async_error(p->id, local_err);
7893ab4441dSPeter Xu     multifd_send_set_error(local_err);
7902576ae48SFabiano Rosas     /*
7919221e3c6SPeter Xu      * For error cases (TLS or non-TLS), IO channel is always freed here
7929221e3c6SPeter Xu      * rather than when cleanup multifd: since p->c is not set, multifd
7939221e3c6SPeter Xu      * cleanup code doesn't even know its existence.
7942576ae48SFabiano Rosas      */
79515f3f21dSPeter Xu     object_unref(OBJECT(ioc));
79615f3f21dSPeter Xu     error_free(local_err);
7970e92f644SFabiano Rosas }
7980e92f644SFabiano Rosas 
multifd_new_send_channel_create(gpointer opaque,Error ** errp)799b7b03eb6SFabiano Rosas static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
8000e92f644SFabiano Rosas {
801b7b03eb6SFabiano Rosas     if (!multifd_use_packets()) {
802b7b03eb6SFabiano Rosas         return file_send_channel_create(opaque, errp);
803b7b03eb6SFabiano Rosas     }
804b7b03eb6SFabiano Rosas 
8050e92f644SFabiano Rosas     socket_send_channel_create(multifd_new_send_channel_async, opaque);
806b7b03eb6SFabiano Rosas     return true;
807d32ca5adSJuan Quintela }
808d32ca5adSJuan Quintela 
multifd_send_setup(void)809bd8b0a8fSFabiano Rosas bool multifd_send_setup(void)
810d32ca5adSJuan Quintela {
811bd8b0a8fSFabiano Rosas     MigrationState *s = migrate_get_current();
812bd8b0a8fSFabiano Rosas     int thread_count, ret = 0;
81390fa121cSFabiano Rosas     uint32_t page_count = multifd_ram_page_count();
81406833d83SFabiano Rosas     bool use_packets = multifd_use_packets();
815d32ca5adSJuan Quintela     uint8_t i;
816d32ca5adSJuan Quintela 
81751b07548SJuan Quintela     if (!migrate_multifd()) {
818bd8b0a8fSFabiano Rosas         return true;
819d32ca5adSJuan Quintela     }
820b7acd657SLi Zhijian 
821d32ca5adSJuan Quintela     thread_count = migrate_multifd_channels();
822d32ca5adSJuan Quintela     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
823d32ca5adSJuan Quintela     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
82493fa9dc2SFabiano Rosas     qemu_sem_init(&multifd_send_state->channels_created, 0);
825d32ca5adSJuan Quintela     qemu_sem_init(&multifd_send_state->channels_ready, 0);
826d73415a3SStefan Hajnoczi     qatomic_set(&multifd_send_state->exiting, 0);
827ab7cbb0bSJuan Quintela     multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
828d32ca5adSJuan Quintela 
829d32ca5adSJuan Quintela     for (i = 0; i < thread_count; i++) {
830d32ca5adSJuan Quintela         MultiFDSendParams *p = &multifd_send_state->params[i];
8310bd5b928SFabiano Rosas         Error *local_err = NULL;
832d32ca5adSJuan Quintela 
833d32ca5adSJuan Quintela         qemu_sem_init(&p->sem, 0);
834d32ca5adSJuan Quintela         qemu_sem_init(&p->sem_sync, 0);
835d32ca5adSJuan Quintela         p->id = i;
8369f0e1089SFabiano Rosas         p->data = multifd_send_data_alloc();
83706833d83SFabiano Rosas 
83806833d83SFabiano Rosas         if (use_packets) {
839d32ca5adSJuan Quintela             p->packet_len = sizeof(MultiFDPacket_t)
840d32ca5adSJuan Quintela                           + sizeof(uint64_t) * page_count;
841d32ca5adSJuan Quintela             p->packet = g_malloc0(p->packet_len);
84206833d83SFabiano Rosas         }
843e620b1e4SPeter Xu         p->name = g_strdup_printf(MIGRATION_THREAD_SRC_MULTIFD, i);
8445b1d9babSLeonardo Bras         p->write_flags = 0;
845b7b03eb6SFabiano Rosas 
846b7b03eb6SFabiano Rosas         if (!multifd_new_send_channel_create(p, &local_err)) {
8470bd5b928SFabiano Rosas             migrate_set_error(s, local_err);
8480bd5b928SFabiano Rosas             ret = -1;
849b7b03eb6SFabiano Rosas         }
850d32ca5adSJuan Quintela     }
851ab7cbb0bSJuan Quintela 
85293fa9dc2SFabiano Rosas     /*
85393fa9dc2SFabiano Rosas      * Wait until channel creation has started for all channels. The
85493fa9dc2SFabiano Rosas      * creation can still fail, but no more channels will be created
85593fa9dc2SFabiano Rosas      * past this point.
85693fa9dc2SFabiano Rosas      */
85793fa9dc2SFabiano Rosas     for (i = 0; i < thread_count; i++) {
85893fa9dc2SFabiano Rosas         qemu_sem_wait(&multifd_send_state->channels_created);
85993fa9dc2SFabiano Rosas     }
86093fa9dc2SFabiano Rosas 
8610bd5b928SFabiano Rosas     if (ret) {
8620bd5b928SFabiano Rosas         goto err;
8630bd5b928SFabiano Rosas     }
8640bd5b928SFabiano Rosas 
865ab7cbb0bSJuan Quintela     for (i = 0; i < thread_count; i++) {
866ab7cbb0bSJuan Quintela         MultiFDSendParams *p = &multifd_send_state->params[i];
8670bd5b928SFabiano Rosas         Error *local_err = NULL;
868ab7cbb0bSJuan Quintela 
869bd8b0a8fSFabiano Rosas         ret = multifd_send_state->ops->send_setup(p, &local_err);
870ab7cbb0bSJuan Quintela         if (ret) {
871bd8b0a8fSFabiano Rosas             migrate_set_error(s, local_err);
8720bd5b928SFabiano Rosas             goto err;
8730bd5b928SFabiano Rosas         }
87490e0eeb9SFabiano Rosas         assert(p->iov);
875bd8b0a8fSFabiano Rosas     }
876bd8b0a8fSFabiano Rosas 
877bd8b0a8fSFabiano Rosas     return true;
8780bd5b928SFabiano Rosas 
8790bd5b928SFabiano Rosas err:
8800bd5b928SFabiano Rosas     migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
8810bd5b928SFabiano Rosas                       MIGRATION_STATUS_FAILED);
8820bd5b928SFabiano Rosas     return false;
883d32ca5adSJuan Quintela }
884d32ca5adSJuan Quintela 
multifd_recv(void)885d117ed06SFabiano Rosas bool multifd_recv(void)
886d117ed06SFabiano Rosas {
887d117ed06SFabiano Rosas     int i;
888d117ed06SFabiano Rosas     static int next_recv_channel;
889d117ed06SFabiano Rosas     MultiFDRecvParams *p = NULL;
890d117ed06SFabiano Rosas     MultiFDRecvData *data = multifd_recv_state->data;
891d117ed06SFabiano Rosas 
892d117ed06SFabiano Rosas     /*
893d117ed06SFabiano Rosas      * next_channel can remain from a previous migration that was
894d117ed06SFabiano Rosas      * using more channels, so ensure it doesn't overflow if the
895d117ed06SFabiano Rosas      * limit is lower now.
896d117ed06SFabiano Rosas      */
897d117ed06SFabiano Rosas     next_recv_channel %= migrate_multifd_channels();
898d117ed06SFabiano Rosas     for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
899d117ed06SFabiano Rosas         if (multifd_recv_should_exit()) {
900d117ed06SFabiano Rosas             return false;
901d117ed06SFabiano Rosas         }
902d117ed06SFabiano Rosas 
903d117ed06SFabiano Rosas         p = &multifd_recv_state->params[i];
904d117ed06SFabiano Rosas 
905d117ed06SFabiano Rosas         if (qatomic_read(&p->pending_job) == false) {
906d117ed06SFabiano Rosas             next_recv_channel = (i + 1) % migrate_multifd_channels();
907d117ed06SFabiano Rosas             break;
908d117ed06SFabiano Rosas         }
909d117ed06SFabiano Rosas     }
910d117ed06SFabiano Rosas 
911d117ed06SFabiano Rosas     /*
912d117ed06SFabiano Rosas      * Order pending_job read before manipulating p->data below. Pairs
913d117ed06SFabiano Rosas      * with qatomic_store_release() at multifd_recv_thread().
914d117ed06SFabiano Rosas      */
915d117ed06SFabiano Rosas     smp_mb_acquire();
916d117ed06SFabiano Rosas 
917d117ed06SFabiano Rosas     assert(!p->data->size);
918d117ed06SFabiano Rosas     multifd_recv_state->data = p->data;
919d117ed06SFabiano Rosas     p->data = data;
920d117ed06SFabiano Rosas 
921d117ed06SFabiano Rosas     /*
922d117ed06SFabiano Rosas      * Order p->data update before setting pending_job. Pairs with
923d117ed06SFabiano Rosas      * qatomic_load_acquire() at multifd_recv_thread().
924d117ed06SFabiano Rosas      */
925d117ed06SFabiano Rosas     qatomic_store_release(&p->pending_job, true);
926d117ed06SFabiano Rosas     qemu_sem_post(&p->sem);
927d117ed06SFabiano Rosas 
928d117ed06SFabiano Rosas     return true;
929d117ed06SFabiano Rosas }
930d117ed06SFabiano Rosas 
multifd_get_recv_data(void)931d117ed06SFabiano Rosas MultiFDRecvData *multifd_get_recv_data(void)
932d117ed06SFabiano Rosas {
933d117ed06SFabiano Rosas     return multifd_recv_state->data;
934d117ed06SFabiano Rosas }
935d117ed06SFabiano Rosas 
multifd_recv_terminate_threads(Error * err)936d32ca5adSJuan Quintela static void multifd_recv_terminate_threads(Error *err)
937d32ca5adSJuan Quintela {
938d32ca5adSJuan Quintela     int i;
939d32ca5adSJuan Quintela 
940d32ca5adSJuan Quintela     trace_multifd_recv_terminate_threads(err != NULL);
941d32ca5adSJuan Quintela 
94211dd7be5SFabiano Rosas     if (qatomic_xchg(&multifd_recv_state->exiting, 1)) {
94311dd7be5SFabiano Rosas         return;
94411dd7be5SFabiano Rosas     }
94511dd7be5SFabiano Rosas 
946d32ca5adSJuan Quintela     if (err) {
947d32ca5adSJuan Quintela         MigrationState *s = migrate_get_current();
948d32ca5adSJuan Quintela         migrate_set_error(s, err);
949d32ca5adSJuan Quintela         if (s->state == MIGRATION_STATUS_SETUP ||
950d32ca5adSJuan Quintela             s->state == MIGRATION_STATUS_ACTIVE) {
951d32ca5adSJuan Quintela             migrate_set_state(&s->state, s->state,
952d32ca5adSJuan Quintela                               MIGRATION_STATUS_FAILED);
953d32ca5adSJuan Quintela         }
954d32ca5adSJuan Quintela     }
955d32ca5adSJuan Quintela 
956d32ca5adSJuan Quintela     for (i = 0; i < migrate_multifd_channels(); i++) {
957d32ca5adSJuan Quintela         MultiFDRecvParams *p = &multifd_recv_state->params[i];
958d32ca5adSJuan Quintela 
959d32ca5adSJuan Quintela         /*
960d117ed06SFabiano Rosas          * The migration thread and channels interact differently
961d117ed06SFabiano Rosas          * depending on the presence of packets.
962d13f0026SFabiano Rosas          */
96306833d83SFabiano Rosas         if (multifd_use_packets()) {
964d117ed06SFabiano Rosas             /*
965d117ed06SFabiano Rosas              * The channel receives as long as there are packets. When
966d117ed06SFabiano Rosas              * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
967d117ed06SFabiano Rosas              * channel waits for the migration thread to sync. If the
968d117ed06SFabiano Rosas              * sync never happens, do it here.
969d117ed06SFabiano Rosas              */
970d13f0026SFabiano Rosas             qemu_sem_post(&p->sem_sync);
971d117ed06SFabiano Rosas         } else {
972d117ed06SFabiano Rosas             /*
973d117ed06SFabiano Rosas              * The channel waits for the migration thread to give it
974d117ed06SFabiano Rosas              * work. When the migration thread runs out of work, it
975d117ed06SFabiano Rosas              * releases the channel and waits for any pending work to
976d117ed06SFabiano Rosas              * finish. If we reach here (e.g. due to error) before the
977d117ed06SFabiano Rosas              * work runs out, release the channel.
978d117ed06SFabiano Rosas              */
979d117ed06SFabiano Rosas             qemu_sem_post(&p->sem);
98006833d83SFabiano Rosas         }
981d13f0026SFabiano Rosas 
982d13f0026SFabiano Rosas         /*
983d32ca5adSJuan Quintela          * We could arrive here for two reasons:
984d32ca5adSJuan Quintela          *  - normal quit, i.e. everything went fine, just finished
985d32ca5adSJuan Quintela          *  - error quit: We close the channels so the channel threads
986d32ca5adSJuan Quintela          *    finish the qio_channel_read_all_eof()
987d32ca5adSJuan Quintela          */
988d32ca5adSJuan Quintela         if (p->c) {
989d32ca5adSJuan Quintela             qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
990d32ca5adSJuan Quintela         }
991d32ca5adSJuan Quintela     }
992d32ca5adSJuan Quintela }
993d32ca5adSJuan Quintela 
multifd_recv_shutdown(void)994cde85c37SPeter Xu void multifd_recv_shutdown(void)
995cfc3bcf3SLeonardo Bras {
99651b07548SJuan Quintela     if (migrate_multifd()) {
997cfc3bcf3SLeonardo Bras         multifd_recv_terminate_threads(NULL);
998cfc3bcf3SLeonardo Bras     }
999cfc3bcf3SLeonardo Bras }
1000cfc3bcf3SLeonardo Bras 
/*
 * Release everything owned by one recv channel.
 *
 * @p: the channel to clean up
 *
 * The IO channel is unregistered from yank and unreferenced, the mutex
 * and semaphores are destroyed, and the channel's buffers are freed
 * with their pointers reset to NULL.  The recv_cleanup() hook of the
 * multifd ops runs last.
 */
static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
{
    /* IO channel */
    migration_ioc_unregister_yank(p->c);
    object_unref(OBJECT(p->c));
    p->c = NULL;

    /* Synchronization primitives */
    qemu_mutex_destroy(&p->mutex);
    qemu_sem_destroy(&p->sem_sync);
    qemu_sem_destroy(&p->sem);

    /* Heap buffers; each pointer is reset to NULL as it is released */
    g_clear_pointer(&p->data, g_free);
    g_clear_pointer(&p->name, g_free);
    p->packet_len = 0;
    g_clear_pointer(&p->packet, g_free);
    g_clear_pointer(&p->normal, g_free);
    g_clear_pointer(&p->zero, g_free);

    multifd_recv_state->ops->recv_cleanup(p);
}
10225e6ea8a1SPeter Xu 
multifd_recv_cleanup_state(void)10235e6ea8a1SPeter Xu static void multifd_recv_cleanup_state(void)
10245e6ea8a1SPeter Xu {
10255e6ea8a1SPeter Xu     qemu_sem_destroy(&multifd_recv_state->sem_sync);
10265e6ea8a1SPeter Xu     g_free(multifd_recv_state->params);
10275e6ea8a1SPeter Xu     multifd_recv_state->params = NULL;
1028d117ed06SFabiano Rosas     g_free(multifd_recv_state->data);
1029d117ed06SFabiano Rosas     multifd_recv_state->data = NULL;
10305e6ea8a1SPeter Xu     g_free(multifd_recv_state);
10315e6ea8a1SPeter Xu     multifd_recv_state = NULL;
10325e6ea8a1SPeter Xu }
10335e6ea8a1SPeter Xu 
multifd_recv_cleanup(void)1034cde85c37SPeter Xu void multifd_recv_cleanup(void)
1035d32ca5adSJuan Quintela {
1036d32ca5adSJuan Quintela     int i;
1037d32ca5adSJuan Quintela 
103851b07548SJuan Quintela     if (!migrate_multifd()) {
1039e5bac1f5SLeonardo Bras         return;
1040d32ca5adSJuan Quintela     }
1041d32ca5adSJuan Quintela     multifd_recv_terminate_threads(NULL);
1042d32ca5adSJuan Quintela     for (i = 0; i < migrate_multifd_channels(); i++) {
1043d32ca5adSJuan Quintela         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1044d32ca5adSJuan Quintela 
1045a2a63c4aSFabiano Rosas         if (p->thread_created) {
104610351fbaSLeonardo Bras             qemu_thread_join(&p->thread);
1047d32ca5adSJuan Quintela         }
1048a2a63c4aSFabiano Rosas     }
1049d32ca5adSJuan Quintela     for (i = 0; i < migrate_multifd_channels(); i++) {
10505e6ea8a1SPeter Xu         multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
1051d32ca5adSJuan Quintela     }
10525e6ea8a1SPeter Xu     multifd_recv_cleanup_state();
1053d32ca5adSJuan Quintela }
1054d32ca5adSJuan Quintela 
multifd_recv_sync_main(void)1055d32ca5adSJuan Quintela void multifd_recv_sync_main(void)
1056d32ca5adSJuan Quintela {
10574aac6b1eSFabiano Rosas     int thread_count = migrate_multifd_channels();
1058a49d15a3SFabiano Rosas     bool file_based = !multifd_use_packets();
1059d32ca5adSJuan Quintela     int i;
1060d32ca5adSJuan Quintela 
1061a49d15a3SFabiano Rosas     if (!migrate_multifd()) {
1062d32ca5adSJuan Quintela         return;
1063d32ca5adSJuan Quintela     }
1064d32ca5adSJuan Quintela 
10654aac6b1eSFabiano Rosas     /*
1066a49d15a3SFabiano Rosas      * File-based channels don't use packets and therefore need to
1067a49d15a3SFabiano Rosas      * wait for more work. Release them to start the sync.
1068a49d15a3SFabiano Rosas      */
1069a49d15a3SFabiano Rosas     if (file_based) {
1070a49d15a3SFabiano Rosas         for (i = 0; i < thread_count; i++) {
1071a49d15a3SFabiano Rosas             MultiFDRecvParams *p = &multifd_recv_state->params[i];
1072a49d15a3SFabiano Rosas 
1073a49d15a3SFabiano Rosas             trace_multifd_recv_sync_main_signal(p->id);
1074a49d15a3SFabiano Rosas             qemu_sem_post(&p->sem);
1075a49d15a3SFabiano Rosas         }
1076a49d15a3SFabiano Rosas     }
1077a49d15a3SFabiano Rosas 
1078a49d15a3SFabiano Rosas     /*
10794aac6b1eSFabiano Rosas      * Initiate the synchronization by waiting for all channels.
1080a49d15a3SFabiano Rosas      *
10814aac6b1eSFabiano Rosas      * For socket-based migration this means each channel has received
10824aac6b1eSFabiano Rosas      * the SYNC packet on the stream.
1083a49d15a3SFabiano Rosas      *
1084a49d15a3SFabiano Rosas      * For file-based migration this means each channel is done with
1085a49d15a3SFabiano Rosas      * the work (pending_job=false).
10864aac6b1eSFabiano Rosas      */
10874aac6b1eSFabiano Rosas     for (i = 0; i < thread_count; i++) {
10884aac6b1eSFabiano Rosas         trace_multifd_recv_sync_main_wait(i);
1089d32ca5adSJuan Quintela         qemu_sem_wait(&multifd_recv_state->sem_sync);
1090d32ca5adSJuan Quintela     }
10914aac6b1eSFabiano Rosas 
1092a49d15a3SFabiano Rosas     if (file_based) {
1093a49d15a3SFabiano Rosas         /*
1094a49d15a3SFabiano Rosas          * For file-based loading is done in one iteration. We're
1095a49d15a3SFabiano Rosas          * done.
1096a49d15a3SFabiano Rosas          */
1097a49d15a3SFabiano Rosas         return;
1098a49d15a3SFabiano Rosas     }
1099a49d15a3SFabiano Rosas 
11004aac6b1eSFabiano Rosas     /*
11014aac6b1eSFabiano Rosas      * Sync done. Release the channels for the next iteration.
11024aac6b1eSFabiano Rosas      */
11034aac6b1eSFabiano Rosas     for (i = 0; i < thread_count; i++) {
1104d32ca5adSJuan Quintela         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1105d32ca5adSJuan Quintela 
11066e8a355dSDaniel Brodsky         WITH_QEMU_LOCK_GUARD(&p->mutex) {
1107d32ca5adSJuan Quintela             if (multifd_recv_state->packet_num < p->packet_num) {
1108d32ca5adSJuan Quintela                 multifd_recv_state->packet_num = p->packet_num;
1109d32ca5adSJuan Quintela             }
11106e8a355dSDaniel Brodsky         }
1111d32ca5adSJuan Quintela         trace_multifd_recv_sync_main_signal(p->id);
1112d32ca5adSJuan Quintela         qemu_sem_post(&p->sem_sync);
1113d32ca5adSJuan Quintela     }
1114d32ca5adSJuan Quintela     trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
1115d32ca5adSJuan Quintela }
1116d32ca5adSJuan Quintela 
multifd_recv_thread(void * opaque)1117d32ca5adSJuan Quintela static void *multifd_recv_thread(void *opaque)
1118d32ca5adSJuan Quintela {
1119d32ca5adSJuan Quintela     MultiFDRecvParams *p = opaque;
1120d32ca5adSJuan Quintela     Error *local_err = NULL;
112106833d83SFabiano Rosas     bool use_packets = multifd_use_packets();
1122d32ca5adSJuan Quintela     int ret;
1123d32ca5adSJuan Quintela 
1124d32ca5adSJuan Quintela     trace_multifd_recv_thread_start(p->id);
1125d32ca5adSJuan Quintela     rcu_register_thread();
1126d32ca5adSJuan Quintela 
1127d32ca5adSJuan Quintela     while (true) {
112806833d83SFabiano Rosas         uint32_t flags = 0;
11299db19125SFabiano Rosas         bool has_data = false;
11309db19125SFabiano Rosas         p->normal_num = 0;
1131d32ca5adSJuan Quintela 
1132d117ed06SFabiano Rosas         if (use_packets) {
113311dd7be5SFabiano Rosas             if (multifd_recv_should_exit()) {
1134d32ca5adSJuan Quintela                 break;
1135d32ca5adSJuan Quintela             }
1136d32ca5adSJuan Quintela 
1137d32ca5adSJuan Quintela             ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
1138d32ca5adSJuan Quintela                                            p->packet_len, &local_err);
1139bca762c2SLi Zhang             if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
1140d32ca5adSJuan Quintela                 break;
1141d32ca5adSJuan Quintela             }
1142d32ca5adSJuan Quintela 
1143d32ca5adSJuan Quintela             qemu_mutex_lock(&p->mutex);
1144d32ca5adSJuan Quintela             ret = multifd_recv_unfill_packet(p, &local_err);
1145d32ca5adSJuan Quintela             if (ret) {
1146d32ca5adSJuan Quintela                 qemu_mutex_unlock(&p->mutex);
1147d32ca5adSJuan Quintela                 break;
1148d32ca5adSJuan Quintela             }
1149d32ca5adSJuan Quintela 
1150d32ca5adSJuan Quintela             flags = p->flags;
1151ab7cbb0bSJuan Quintela             /* recv methods don't know how to handle the SYNC flag */
1152ab7cbb0bSJuan Quintela             p->flags &= ~MULTIFD_FLAG_SYNC;
1153*7e4480ddSFabiano Rosas 
1154*7e4480ddSFabiano Rosas             /*
1155*7e4480ddSFabiano Rosas              * Even if it's a SYNC packet, this needs to be set
1156*7e4480ddSFabiano Rosas              * because older QEMUs (<9.0) still send data along with
1157*7e4480ddSFabiano Rosas              * the SYNC packet.
1158*7e4480ddSFabiano Rosas              */
1159303e6f54SHao Xiang             has_data = p->normal_num || p->zero_num;
1160d32ca5adSJuan Quintela             qemu_mutex_unlock(&p->mutex);
1161d117ed06SFabiano Rosas         } else {
1162d117ed06SFabiano Rosas             /*
1163d117ed06SFabiano Rosas              * No packets, so we need to wait for the vmstate code to
1164d117ed06SFabiano Rosas              * give us work.
1165d117ed06SFabiano Rosas              */
1166d117ed06SFabiano Rosas             qemu_sem_wait(&p->sem);
1167d117ed06SFabiano Rosas 
1168d117ed06SFabiano Rosas             if (multifd_recv_should_exit()) {
1169d117ed06SFabiano Rosas                 break;
1170d117ed06SFabiano Rosas             }
1171d117ed06SFabiano Rosas 
1172d117ed06SFabiano Rosas             /* pairs with qatomic_store_release() at multifd_recv() */
1173d117ed06SFabiano Rosas             if (!qatomic_load_acquire(&p->pending_job)) {
1174d117ed06SFabiano Rosas                 /*
1175d117ed06SFabiano Rosas                  * Migration thread did not send work, this is
1176d117ed06SFabiano Rosas                  * equivalent to pending_sync on the sending
1177d117ed06SFabiano Rosas                  * side. Post sem_sync to notify we reached this
1178d117ed06SFabiano Rosas                  * point.
1179d117ed06SFabiano Rosas                  */
1180d117ed06SFabiano Rosas                 qemu_sem_post(&multifd_recv_state->sem_sync);
1181d117ed06SFabiano Rosas                 continue;
1182d117ed06SFabiano Rosas             }
1183d117ed06SFabiano Rosas 
1184d117ed06SFabiano Rosas             has_data = !!p->data->size;
118506833d83SFabiano Rosas         }
1186d32ca5adSJuan Quintela 
11879db19125SFabiano Rosas         if (has_data) {
11889db19125SFabiano Rosas             ret = multifd_recv_state->ops->recv(p, &local_err);
1189d32ca5adSJuan Quintela             if (ret != 0) {
1190d32ca5adSJuan Quintela                 break;
1191d32ca5adSJuan Quintela             }
1192d32ca5adSJuan Quintela         }
1193d32ca5adSJuan Quintela 
119406833d83SFabiano Rosas         if (use_packets) {
1195d32ca5adSJuan Quintela             if (flags & MULTIFD_FLAG_SYNC) {
1196d32ca5adSJuan Quintela                 qemu_sem_post(&multifd_recv_state->sem_sync);
1197d32ca5adSJuan Quintela                 qemu_sem_wait(&p->sem_sync);
1198d32ca5adSJuan Quintela             }
1199d117ed06SFabiano Rosas         } else {
1200d117ed06SFabiano Rosas             p->data->size = 0;
1201d117ed06SFabiano Rosas             /*
1202d117ed06SFabiano Rosas              * Order data->size update before clearing
1203d117ed06SFabiano Rosas              * pending_job. Pairs with smp_mb_acquire() at
1204d117ed06SFabiano Rosas              * multifd_recv().
1205d117ed06SFabiano Rosas              */
1206d117ed06SFabiano Rosas             qatomic_store_release(&p->pending_job, false);
1207d32ca5adSJuan Quintela         }
120806833d83SFabiano Rosas     }
1209d32ca5adSJuan Quintela 
1210d32ca5adSJuan Quintela     if (local_err) {
1211d32ca5adSJuan Quintela         multifd_recv_terminate_threads(local_err);
121213f2cb21SPan Nengyuan         error_free(local_err);
1213d32ca5adSJuan Quintela     }
1214d32ca5adSJuan Quintela 
1215d32ca5adSJuan Quintela     rcu_unregister_thread();
121696d396bfSFabiano Rosas     trace_multifd_recv_thread_end(p->id, p->packets_recved);
1217d32ca5adSJuan Quintela 
1218d32ca5adSJuan Quintela     return NULL;
1219d32ca5adSJuan Quintela }
1220d32ca5adSJuan Quintela 
multifd_recv_setup(Error ** errp)1221cde85c37SPeter Xu int multifd_recv_setup(Error **errp)
1222d32ca5adSJuan Quintela {
1223d32ca5adSJuan Quintela     int thread_count;
122490fa121cSFabiano Rosas     uint32_t page_count = multifd_ram_page_count();
122506833d83SFabiano Rosas     bool use_packets = multifd_use_packets();
1226d32ca5adSJuan Quintela     uint8_t i;
1227d32ca5adSJuan Quintela 
12286720c2b3Smanish.mishra     /*
12296720c2b3Smanish.mishra      * Return successfully if multiFD recv state is already initialised
12306720c2b3Smanish.mishra      * or multiFD is not enabled.
12316720c2b3Smanish.mishra      */
123251b07548SJuan Quintela     if (multifd_recv_state || !migrate_multifd()) {
1233d32ca5adSJuan Quintela         return 0;
1234d32ca5adSJuan Quintela     }
12356720c2b3Smanish.mishra 
1236d32ca5adSJuan Quintela     thread_count = migrate_multifd_channels();
1237d32ca5adSJuan Quintela     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
1238d32ca5adSJuan Quintela     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
1239d117ed06SFabiano Rosas 
1240d117ed06SFabiano Rosas     multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
1241d117ed06SFabiano Rosas     multifd_recv_state->data->size = 0;
1242d117ed06SFabiano Rosas 
1243d73415a3SStefan Hajnoczi     qatomic_set(&multifd_recv_state->count, 0);
124411dd7be5SFabiano Rosas     qatomic_set(&multifd_recv_state->exiting, 0);
1245d32ca5adSJuan Quintela     qemu_sem_init(&multifd_recv_state->sem_sync, 0);
1246ab7cbb0bSJuan Quintela     multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
1247d32ca5adSJuan Quintela 
1248d32ca5adSJuan Quintela     for (i = 0; i < thread_count; i++) {
1249d32ca5adSJuan Quintela         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1250d32ca5adSJuan Quintela 
1251d32ca5adSJuan Quintela         qemu_mutex_init(&p->mutex);
1252d32ca5adSJuan Quintela         qemu_sem_init(&p->sem_sync, 0);
1253d117ed06SFabiano Rosas         qemu_sem_init(&p->sem, 0);
1254d117ed06SFabiano Rosas         p->pending_job = false;
1255d32ca5adSJuan Quintela         p->id = i;
125606833d83SFabiano Rosas 
1257d117ed06SFabiano Rosas         p->data = g_new0(MultiFDRecvData, 1);
1258d117ed06SFabiano Rosas         p->data->size = 0;
1259d117ed06SFabiano Rosas 
126006833d83SFabiano Rosas         if (use_packets) {
1261d32ca5adSJuan Quintela             p->packet_len = sizeof(MultiFDPacket_t)
1262d32ca5adSJuan Quintela                 + sizeof(uint64_t) * page_count;
1263d32ca5adSJuan Quintela             p->packet = g_malloc0(p->packet_len);
126406833d83SFabiano Rosas         }
1265e620b1e4SPeter Xu         p->name = g_strdup_printf(MIGRATION_THREAD_DST_MULTIFD, i);
1266cf2d4aa8SJuan Quintela         p->normal = g_new0(ram_addr_t, page_count);
1267303e6f54SHao Xiang         p->zero = g_new0(ram_addr_t, page_count);
1268d32ca5adSJuan Quintela     }
1269ab7cbb0bSJuan Quintela 
1270ab7cbb0bSJuan Quintela     for (i = 0; i < thread_count; i++) {
1271ab7cbb0bSJuan Quintela         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1272ab7cbb0bSJuan Quintela         int ret;
1273ab7cbb0bSJuan Quintela 
12743fc58efaSAvihai Horon         ret = multifd_recv_state->ops->recv_setup(p, errp);
1275ab7cbb0bSJuan Quintela         if (ret) {
1276ab7cbb0bSJuan Quintela             return ret;
1277ab7cbb0bSJuan Quintela         }
1278ab7cbb0bSJuan Quintela     }
1279d32ca5adSJuan Quintela     return 0;
1280d32ca5adSJuan Quintela }
1281d32ca5adSJuan Quintela 
multifd_recv_all_channels_created(void)1282d32ca5adSJuan Quintela bool multifd_recv_all_channels_created(void)
1283d32ca5adSJuan Quintela {
1284d32ca5adSJuan Quintela     int thread_count = migrate_multifd_channels();
1285d32ca5adSJuan Quintela 
128651b07548SJuan Quintela     if (!migrate_multifd()) {
1287d32ca5adSJuan Quintela         return true;
1288d32ca5adSJuan Quintela     }
1289d32ca5adSJuan Quintela 
1290a59136f3SDr. David Alan Gilbert     if (!multifd_recv_state) {
1291a59136f3SDr. David Alan Gilbert         /* Called before any connections created */
1292a59136f3SDr. David Alan Gilbert         return false;
1293a59136f3SDr. David Alan Gilbert     }
1294a59136f3SDr. David Alan Gilbert 
1295d73415a3SStefan Hajnoczi     return thread_count == qatomic_read(&multifd_recv_state->count);
1296d32ca5adSJuan Quintela }
1297d32ca5adSJuan Quintela 
1298d32ca5adSJuan Quintela /*
1299d32ca5adSJuan Quintela  * Try to receive all multifd channels to get ready for the migration.
13006720c2b3Smanish.mishra  * Sets @errp when failing to receive the current channel.
1301d32ca5adSJuan Quintela  */
multifd_recv_new_channel(QIOChannel * ioc,Error ** errp)13026720c2b3Smanish.mishra void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
1303d32ca5adSJuan Quintela {
1304d32ca5adSJuan Quintela     MultiFDRecvParams *p;
1305d32ca5adSJuan Quintela     Error *local_err = NULL;
130606833d83SFabiano Rosas     bool use_packets = multifd_use_packets();
1307d32ca5adSJuan Quintela     int id;
1308d32ca5adSJuan Quintela 
130906833d83SFabiano Rosas     if (use_packets) {
1310d32ca5adSJuan Quintela         id = multifd_recv_initial_packet(ioc, &local_err);
1311d32ca5adSJuan Quintela         if (id < 0) {
1312d32ca5adSJuan Quintela             multifd_recv_terminate_threads(local_err);
1313d32ca5adSJuan Quintela             error_propagate_prepend(errp, local_err,
1314d32ca5adSJuan Quintela                                     "failed to receive packet"
1315d32ca5adSJuan Quintela                                     " via multifd channel %d: ",
1316d73415a3SStefan Hajnoczi                                     qatomic_read(&multifd_recv_state->count));
13176720c2b3Smanish.mishra             return;
1318d32ca5adSJuan Quintela         }
1319d32ca5adSJuan Quintela         trace_multifd_recv_new_channel(id);
132006833d83SFabiano Rosas     } else {
13212dd7ee7aSFabiano Rosas         id = qatomic_read(&multifd_recv_state->count);
132206833d83SFabiano Rosas     }
1323d32ca5adSJuan Quintela 
1324d32ca5adSJuan Quintela     p = &multifd_recv_state->params[id];
1325d32ca5adSJuan Quintela     if (p->c != NULL) {
1326d32ca5adSJuan Quintela         error_setg(&local_err, "multifd: received id '%d' already setup'",
1327d32ca5adSJuan Quintela                    id);
1328d32ca5adSJuan Quintela         multifd_recv_terminate_threads(local_err);
1329d32ca5adSJuan Quintela         error_propagate(errp, local_err);
13306720c2b3Smanish.mishra         return;
1331d32ca5adSJuan Quintela     }
1332d32ca5adSJuan Quintela     p->c = ioc;
1333d32ca5adSJuan Quintela     object_ref(OBJECT(ioc));
1334d32ca5adSJuan Quintela 
1335a2a63c4aSFabiano Rosas     p->thread_created = true;
1336d32ca5adSJuan Quintela     qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
1337d32ca5adSJuan Quintela                        QEMU_THREAD_JOINABLE);
1338d73415a3SStefan Hajnoczi     qatomic_inc(&multifd_recv_state->count);
1339d32ca5adSJuan Quintela }
1340