xref: /openbmc/qemu/migration/multifd.c (revision 423ac96729202d8703f683ca9303fd317a434c67)
/*
 * Multifd common code
 *
 * Copyright (c) 2019-2020 Red Hat Inc
 *
 * Authors:
 *  Juan Quintela <quintela@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */

#include "qemu/osdep.h"
#include "qemu/cutils.h"
#include "qemu/iov.h"
#include "qemu/rcu.h"
#include "exec/target_page.h"
#include "system/system.h"
#include "system/ramblock.h"
#include "qemu/error-report.h"
#include "qapi/error.h"
#include "file.h"
#include "migration/misc.h"
#include "migration.h"
#include "migration-stats.h"
#include "savevm.h"
#include "socket.h"
#include "tls.h"
#include "qemu-file.h"
#include "trace.h"
#include "multifd.h"
#include "threadinfo.h"
#include "options.h"
#include "qemu/yank.h"
#include "io/channel-file.h"
#include "io/channel-socket.h"
#include "yank_functions.h"

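/*
 * Handshake message exchanged once per channel before any data
 * packets: the destination validates magic, version and UUID, and
 * learns which channel id this connection carries.
 */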
typedef struct {
    uint32_t magic;
    uint32_t version;
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;

struct {
    MultiFDSendParams *params;

    /* multifd_send() body is not thread safe, needs serialization */
    QemuMutex multifd_send_mutex;

    /*
     * Global number of generated multifd packets.
     *
     * Note that we use 'uintptr_t' because it naturally supports atomic
     * operations on both 32-bit and 64-bit hosts.  This means that on
     * 32-bit systems multifd will overflow packet_num more easily, but
     * that should be fine.
     *
     * Another option would be QEMU's Stat64, which is 64 bits on all
     * hosts; however, it does not support atomic fetch_add() yet.
     * Keep it simple for now.
     */
    uintptr_t packet_num;
    /*
     * Synchronization point past which no more channels will be
     * created.
     */
    QemuSemaphore channels_created;
    /* send channels ready */
    QemuSemaphore channels_ready;
    /*
     * Have we already run the terminate-threads path?  There is a race
     * where an error can arrive while we are exiting.
     * We use atomic operations.  The only valid values are 0 and 1.
     */
    int exiting;
    /* multifd ops */
    const MultiFDMethods *ops;
} *multifd_send_state;

struct {
    MultiFDRecvParams *params;
    MultiFDRecvData *data;
    /* number of created threads */
    int count;
    /*
     * This is always posted by the recv threads; the migration thread
     * uses it to wait for the recv threads to finish assigned tasks.
     */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    int exiting;
    /* multifd ops */
    const MultiFDMethods *ops;
} *multifd_recv_state;

MultiFDSendData *multifd_send_data_alloc(void)
{
    MultiFDSendData *new = g_new0(MultiFDSendData, 1);

    multifd_ram_payload_alloc(&new->u.ram);
    /* Device state allocates its payload on-demand */

    return new;
}

void multifd_send_data_clear(MultiFDSendData *data)
{
    if (multifd_payload_empty(data)) {
        return;
    }

    switch (data->type) {
    case MULTIFD_PAYLOAD_DEVICE_STATE:
        multifd_send_data_clear_device_state(&data->u.device_state);
        break;
    default:
        /* Nothing to do */
        break;
    }

    data->type = MULTIFD_PAYLOAD_NONE;
}

void multifd_send_data_free(MultiFDSendData *data)
{
    if (!data) {
        return;
    }

    /* This also frees the device state payload */
    multifd_send_data_clear(data);

    multifd_ram_payload_free(&data->u.ram);

    g_free(data);
}

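/*
 * Mapped-ram migration writes pages at fixed offsets in the migration
 * file rather than streaming them, so it does not use the multifd
 * packet framing (headers and SYNC messages).
 */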
static bool multifd_use_packets(void)
{
    return !migrate_mapped_ram();
}

void multifd_send_channel_created(void)
{
    qemu_sem_post(&multifd_send_state->channels_created);
}

static const MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {};

void multifd_register_ops(int method, const MultiFDMethods *ops)
{
    assert(0 <= method && method < MULTIFD_COMPRESSION__MAX);
    assert(!multifd_ops[method]);
    multifd_ops[method] = ops;
}

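/*
 * Send the per-channel handshake message.  This is the first thing
 * written on a new channel, before any data packets.
 */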
static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
{
    MultiFDInit_t msg = {};
    size_t size = sizeof(msg);
    int ret;

    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
    msg.version = cpu_to_be32(MULTIFD_VERSION);
    msg.id = p->id;
    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));

    ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
    if (ret != 0) {
        return -1;
    }
    stat64_add(&mig_stats.multifd_bytes, size);
    return 0;
}

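/*
 * Receive and validate the handshake message.  Returns the channel id
 * carried in the message on success, -1 on error.
 */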
static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
{
    MultiFDInit_t msg;
    int ret;

    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
    if (ret != 0) {
        return -1;
    }

    msg.magic = be32_to_cpu(msg.magic);
    msg.version = be32_to_cpu(msg.version);

    if (msg.magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet magic %x "
                   "expected %x", msg.magic, MULTIFD_MAGIC);
        return -1;
    }

    if (msg.version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet version %u "
                   "expected %u", msg.version, MULTIFD_VERSION);
        return -1;
    }

    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
        char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);

        error_setg(errp, "multifd: received uuid '%s' and expected "
                   "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
        g_free(uuid);
        g_free(msg_uuid);
        return -1;
    }

    if (msg.id > migrate_multifd_channels()) {
        error_setg(errp, "multifd: received channel id %u is greater than "
                   "number of channels %u", msg.id, migrate_multifd_channels());
        return -1;
    }

    return msg.id;
}

/* Fills a RAM multifd packet */
void multifd_send_fill_packet(MultiFDSendParams *p)
{
    MultiFDPacket_t *packet = p->packet;
    uint64_t packet_num;
    bool sync_packet = p->flags & MULTIFD_FLAG_SYNC;

    memset(packet, 0, p->packet_len);

    packet->hdr.magic = cpu_to_be32(MULTIFD_MAGIC);
    packet->hdr.version = cpu_to_be32(MULTIFD_VERSION);

    packet->hdr.flags = cpu_to_be32(p->flags);
    packet->next_packet_size = cpu_to_be32(p->next_packet_size);

    packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
    packet->packet_num = cpu_to_be64(packet_num);

    p->packets_sent++;

    if (!sync_packet) {
        multifd_ram_fill_packet(p);
    }

    trace_multifd_send_fill(p->id, packet_num,
                            p->flags, p->next_packet_size);
}

static int multifd_recv_unfill_packet_header(MultiFDRecvParams *p,
                                             const MultiFDPacketHdr_t *hdr,
                                             Error **errp)
{
    uint32_t magic = be32_to_cpu(hdr->magic);
    uint32_t version = be32_to_cpu(hdr->version);

    if (magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet magic %x, expected %x",
                   magic, MULTIFD_MAGIC);
        return -1;
    }

    if (version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet version %u, expected %u",
                   version, MULTIFD_VERSION);
        return -1;
    }

    p->flags = be32_to_cpu(hdr->flags);

    return 0;
}

static int multifd_recv_unfill_packet_device_state(MultiFDRecvParams *p,
                                                   Error **errp)
{
    MultiFDPacketDeviceState_t *packet = p->packet_dev_state;

    packet->instance_id = be32_to_cpu(packet->instance_id);
    p->next_packet_size = be32_to_cpu(packet->next_packet_size);

    return 0;
}

static int multifd_recv_unfill_packet_ram(MultiFDRecvParams *p, Error **errp)
{
    const MultiFDPacket_t *packet = p->packet;
    int ret = 0;

    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
    p->packet_num = be64_to_cpu(packet->packet_num);

    /* Always unfill, old QEMUs (<9.0) send data along with SYNC */
    ret = multifd_ram_unfill_packet(p, errp);

    trace_multifd_recv_unfill(p->id, p->packet_num, p->flags,
                              p->next_packet_size);

    return ret;
}

static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
{
    p->packets_recved++;

    if (p->flags & MULTIFD_FLAG_DEVICE_STATE) {
        return multifd_recv_unfill_packet_device_state(p, errp);
    }

    return multifd_recv_unfill_packet_ram(p, errp);
}

static bool multifd_send_should_exit(void)
{
    return qatomic_read(&multifd_send_state->exiting);
}

static bool multifd_recv_should_exit(void)
{
    return qatomic_read(&multifd_recv_state->exiting);
}

/*
 * The migration thread can wait on either of the two semaphores.  This
 * function can be used to kick it out of waiting on either of them.
 * It should mostly only be called when something has gone wrong with
 * the current multifd send thread.
 */
static void multifd_send_kick_main(MultiFDSendParams *p)
{
    qemu_sem_post(&p->sem_sync);
    qemu_sem_post(&multifd_send_state->channels_ready);
}

/*
 * multifd_send() works by exchanging the MultiFDSendData object
 * provided by the caller with an unused MultiFDSendData object from
 * the next channel that is found to be idle.
 *
 * The channel owns the data until it finishes transmitting and the
 * caller owns the empty object until it fills it with data and calls
 * this function again. No locking necessary.
 *
 * Switching is safe because both the migration thread and the channel
 * thread have barriers in place to serialize access.
 *
 * Returns true on success, false otherwise.
 */
bool multifd_send(MultiFDSendData **send_data)
{
    int i;
    static int next_channel;
    MultiFDSendParams *p = NULL; /* make gcc happy */
    MultiFDSendData *tmp;

    if (multifd_send_should_exit()) {
        return false;
    }

    QEMU_LOCK_GUARD(&multifd_send_state->multifd_send_mutex);

    /* We wait here until at least one channel is ready */
    qemu_sem_wait(&multifd_send_state->channels_ready);

    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_channel %= migrate_multifd_channels();
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_send_should_exit()) {
            return false;
        }
        p = &multifd_send_state->params[i];
        /*
         * A lockless read of p->pending_job is safe, because only the
         * multifd sender thread can clear it.
         */
        if (qatomic_read(&p->pending_job) == false) {
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Make sure we read p->pending_job before all the rest.  Pairs with
     * qatomic_store_release() in multifd_send_thread().
     */
    smp_mb_acquire();

    assert(multifd_payload_empty(p->data));

    /*
     * Swap the pointers. The channel gets the client data for
     * transferring and the client gets back an unused data slot.
     */
    tmp = *send_data;
    *send_data = p->data;
    p->data = tmp;

    /*
     * Make sure p->data is set up before marking pending_job=true. Pairs
     * with the qatomic_load_acquire() in multifd_send_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}

/* Multifd send side hit an error; remember it and prepare to quit */
static void multifd_send_set_error(Error *err)
{
    /*
     * We don't want to terminate the threads twice.  Depending on where
     * we get the error, or if there are two independent errors in two
     * threads at the same time, we can end up calling this function
     * twice.
     */
    if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
            s->state == MIGRATION_STATUS_DEVICE ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }
}

/*
 * Gracefully shut down IOChannels.  This is only needed for successful
 * migrations on top of TLS channels; otherwise it is the same as
 * qio_channel_shutdown().
 *
 * A successful migration also guarantees multifd sender threads are
 * properly flushed and halted.  It is only safe to send BYE in the
 * migration thread here when we know there's no other thread writing to
 * the channel, because GnuTLS doesn't support concurrent writers.
 */
static void migration_ioc_shutdown_gracefully(QIOChannel *ioc)
{
    Error *local_err = NULL;

    if (!migration_has_failed(migrate_get_current()) &&
        object_dynamic_cast((Object *)ioc, TYPE_QIO_CHANNEL_TLS)) {

        /*
         * The destination expects the TLS session to always be properly
         * terminated. This helps to detect a premature termination in the
         * middle of the stream.  Note that older QEMUs always break the
         * connection on the source and the destination always sees
         * GNUTLS_E_PREMATURE_TERMINATION.
         */
        migration_tls_channel_end(ioc, &local_err);
        if (local_err) {
            warn_reportf_err(local_err,
                        "Failed to gracefully terminate TLS connection: ");
        }
    }

    qio_channel_shutdown(ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
}

static void multifd_send_terminate_threads(void)
{
    int i;

    trace_multifd_send_terminate_threads();

    /*
     * Tell everyone we're quitting.  No xchg() needed here; we simply
     * always set it.
     */
    qatomic_set(&multifd_send_state->exiting, 1);

    /*
     * Firstly, kick all threads out; no matter whether they are just idle,
     * or blocked in an IO system call.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_sem_post(&p->sem);
        if (p->c) {
            migration_ioc_shutdown_gracefully(p->c);
        }
    }

    /*
     * Finally recycle all the threads.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (p->tls_thread_created) {
            qemu_thread_join(&p->tls_thread);
        }

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
}

515 
516 static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
517 {
518     if (p->c) {
519         migration_ioc_unregister_yank(p->c);
520         /*
521          * The object_unref() cannot guarantee the fd will always be
522          * released because finalize() of the iochannel is only
523          * triggered on the last reference and it's not guaranteed
524          * that we always hold the last refcount when reaching here.
525          *
526          * Closing the fd explicitly has the benefit that if there is any
527          * registered I/O handler callbacks on such fd, that will get a
528          * POLLNVAL event and will further trigger the cleanup to finally
529          * release the IOC.
530          *
531          * FIXME: It should logically be guaranteed that all multifd
532          * channels have no I/O handler callback registered when reaching
533          * here, because migration thread will wait for all multifd channel
534          * establishments to complete during setup.  Since
535          * migration_cleanup() will be scheduled in main thread too, all
536          * previous callbacks should guarantee to be completed when
537          * reaching here.  See multifd_send_state.channels_created and its
538          * usage.  In the future, we could replace this with an assert
539          * making sure we're the last reference, or simply drop it if above
540          * is more clear to be justified.
541          */
542         qio_channel_close(p->c, &error_abort);
543         object_unref(OBJECT(p->c));
544         p->c = NULL;
545     }
    qemu_sem_destroy(&p->sem);
    qemu_sem_destroy(&p->sem_sync);
    g_free(p->name);
    p->name = NULL;
    g_clear_pointer(&p->data, multifd_send_data_free);
    p->packet_len = 0;
    g_clear_pointer(&p->packet_device_state, g_free);
    g_free(p->packet);
    p->packet = NULL;
    multifd_send_state->ops->send_cleanup(p, errp);
    assert(!p->iov);

    return *errp == NULL;
}

static void multifd_send_cleanup_state(void)
{
    file_cleanup_outgoing_migration();
    socket_cleanup_outgoing_migration();
    multifd_device_state_send_cleanup();
    qemu_sem_destroy(&multifd_send_state->channels_created);
    qemu_sem_destroy(&multifd_send_state->channels_ready);
    qemu_mutex_destroy(&multifd_send_state->multifd_send_mutex);
    g_free(multifd_send_state->params);
    multifd_send_state->params = NULL;
    g_free(multifd_send_state);
    multifd_send_state = NULL;
}

void multifd_send_shutdown(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }

    multifd_send_terminate_threads();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        if (!multifd_send_cleanup_channel(p, &local_err)) {
            migrate_set_error(migrate_get_current(), local_err);
            error_free(local_err);
        }
    }

    multifd_send_cleanup_state();
}

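/*
 * Flush outstanding zero-copy writes on this channel.
 * qio_channel_flush() returns 1 when some writes had to fall back to
 * copying, which is accounted in dirty_sync_missed_zero_copy.
 */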
static int multifd_zero_copy_flush(QIOChannel *c)
{
    int ret;
    Error *err = NULL;

    ret = qio_channel_flush(c, &err);
    if (ret < 0) {
        error_report_err(err);
        return -1;
    }
    if (ret == 1) {
        stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
    }

    return ret;
}

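/*
 * Two-phase sync: first post a sync request to every channel, then
 * wait for each one to become ready and acknowledge through
 * p->sem_sync.  With zero-copy send enabled, each channel is also
 * flushed here.
 */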
int multifd_send_sync_main(MultiFDSyncReq req)
{
    int i;
    bool flush_zero_copy;

    assert(req != MULTIFD_SYNC_NONE);

    flush_zero_copy = migrate_zero_copy_send();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        trace_multifd_send_sync_main_signal(p->id);

634          * We should be the only user so far, so not possible to be set by
635          * others concurrently.
636          */
637         assert(qatomic_read(&p->pending_sync) == MULTIFD_SYNC_NONE);
        qatomic_set(&p->pending_sync, req);
        qemu_sem_post(&p->sem);
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        qemu_sem_wait(&multifd_send_state->channels_ready);
        trace_multifd_send_sync_main_wait(p->id);
        qemu_sem_wait(&p->sem_sync);

        if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
            return -1;
        }
    }
    trace_multifd_send_sync_main(multifd_send_state->packet_num);

    return 0;
}

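/*
 * Per-channel sender loop: wait on p->sem, then either transmit the
 * payload handed over by multifd_send() (RAM or device state), or
 * emit a SYNC packet when a sync was requested.
 */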
static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    MigrationThread *thread = NULL;
    Error *local_err = NULL;
    int ret = 0;
    bool use_packets = multifd_use_packets();

    thread = migration_threads_add(p->name, qemu_get_thread_id());

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (use_packets) {
        if (multifd_send_initial_packet(p, &local_err) < 0) {
            ret = -1;
            goto out;
        }
    }

    while (true) {
        qemu_sem_post(&multifd_send_state->channels_ready);
        qemu_sem_wait(&p->sem);

        if (multifd_send_should_exit()) {
            break;
        }

        /*
         * Read pending_job flag before p->data.  Pairs with the
         * qatomic_store_release() in multifd_send().
         */
        if (qatomic_load_acquire(&p->pending_job)) {
            bool is_device_state = multifd_payload_device_state(p->data);
            size_t total_size;
            int write_flags_masked = 0;

            p->flags = 0;
            p->iovs_num = 0;
            assert(!multifd_payload_empty(p->data));

            if (is_device_state) {
                multifd_device_state_send_prepare(p);

                /* Device state packets cannot be sent via zerocopy */
                write_flags_masked |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
            } else {
                ret = multifd_send_state->ops->send_prepare(p, &local_err);
                if (ret != 0) {
                    break;
                }
            }

            /*
             * The packet header in the zerocopy RAM case is accounted for
             * in multifd_nocomp_send_prepare() - where it is actually
             * being sent.
             */
            total_size = iov_size(p->iov, p->iovs_num);

            if (migrate_mapped_ram()) {
                assert(!is_device_state);

                ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
                                              &p->data->u.ram, &local_err);
            } else {
                ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
                                                  NULL, 0,
                                                  p->write_flags & ~write_flags_masked,
                                                  &local_err);
            }

            if (ret != 0) {
                break;
            }

            stat64_add(&mig_stats.multifd_bytes, total_size);

            p->next_packet_size = 0;
            multifd_send_data_clear(p->data);

            /*
             * Make sure p->data is published before saying "we're
             * free".  Pairs with the smp_mb_acquire() in
             * multifd_send().
             */
            qatomic_store_release(&p->pending_job, false);
        } else {
            MultiFDSyncReq req = qatomic_read(&p->pending_sync);

            /*
             * If not a normal job, must be a sync request.  Note that
             * pending_sync is a standalone flag (unlike pending_job), so
             * it doesn't require explicit memory barriers.
             */
            assert(req != MULTIFD_SYNC_NONE);

            /* Only push the SYNC message if it involves a remote sync */
            if (req == MULTIFD_SYNC_ALL) {
                p->flags = MULTIFD_FLAG_SYNC;
                multifd_send_fill_packet(p);
                ret = qio_channel_write_all(p->c, (void *)p->packet,
                                            p->packet_len, &local_err);
                if (ret != 0) {
                    break;
                }
                /* p->next_packet_size will always be zero for a SYNC packet */
                stat64_add(&mig_stats.multifd_bytes, p->packet_len);
            }

            qatomic_set(&p->pending_sync, MULTIFD_SYNC_NONE);
            qemu_sem_post(&p->sem_sync);
        }
    }

out:
    if (ret) {
        assert(local_err);
        trace_multifd_send_error(p->id);
        multifd_send_set_error(local_err);
        multifd_send_kick_main(p);
        error_free(local_err);
    }

    rcu_unregister_thread();
    migration_threads_remove(thread);
    trace_multifd_send_thread_end(p->id, p->packets_sent);

    return NULL;
}

static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);

typedef struct {
    MultiFDSendParams *p;
    QIOChannelTLS *tioc;
} MultiFDTLSThreadArgs;

static void *multifd_tls_handshake_thread(void *opaque)
{
    MultiFDTLSThreadArgs *args = opaque;

    qio_channel_tls_handshake(args->tioc,
                              multifd_new_send_channel_async,
                              args->p,
                              NULL,
                              NULL);
    g_free(args);

    return NULL;
}

static bool multifd_tls_channel_connect(MultiFDSendParams *p,
                                        QIOChannel *ioc,
                                        Error **errp)
{
    MigrationState *s = migrate_get_current();
    const char *hostname = s->hostname;
    MultiFDTLSThreadArgs *args;
    QIOChannelTLS *tioc;

    tioc = migration_tls_client_create(ioc, hostname, errp);
    if (!tioc) {
        return false;
    }

    /*
     * Ownership of the socket channel now transfers to the newly
     * created TLS channel, which has already taken a reference.
     */
    object_unref(OBJECT(ioc));
    trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
    qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");

    args = g_new0(MultiFDTLSThreadArgs, 1);
    args->tioc = tioc;
    args->p = p;

    p->tls_thread_created = true;
    qemu_thread_create(&p->tls_thread, MIGRATION_THREAD_SRC_TLS,
                       multifd_tls_handshake_thread, args,
                       QEMU_THREAD_JOINABLE);
    return true;
}

void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
{
    qio_channel_set_delay(ioc, false);

    migration_ioc_register_yank(ioc);
    /* Set p->c only once the channel is completely set up */
    p->c = ioc;

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
                       QEMU_THREAD_JOINABLE);
}

/*
 * When TLS is enabled this function is called once to establish the
 * TLS connection and a second time after the TLS handshake to create
 * the multifd channel. Without TLS it goes straight into the channel
 * creation.
 */
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
    Error *local_err = NULL;
    bool ret;

    trace_multifd_new_send_channel_async(p->id);

    if (qio_task_propagate_error(task, &local_err)) {
        ret = false;
        goto out;
    }

    trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
                                       migrate_get_current()->hostname);

    if (migrate_channel_requires_tls_upgrade(ioc)) {
        ret = multifd_tls_channel_connect(p, ioc, &local_err);
        if (ret) {
            return;
        }
    } else {
        multifd_channel_connect(p, ioc);
        ret = true;
    }

out:
    /*
     * Here we're not interested in whether creation succeeded, only that
     * it happened at all.
     */
    multifd_send_channel_created();

    if (ret) {
        return;
    }

    trace_multifd_new_send_channel_async_error(p->id, local_err);
    multifd_send_set_error(local_err);
    /*
     * For error cases (TLS or non-TLS), the IO channel is always freed
     * here rather than during multifd cleanup: since p->c is not set, the
     * multifd cleanup code doesn't even know of its existence.
     */
    object_unref(OBJECT(ioc));
    error_free(local_err);
}

static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
{
    if (!multifd_use_packets()) {
        return file_send_channel_create(opaque, errp);
    }

    socket_send_channel_create(multifd_new_send_channel_async, opaque);
    return true;
}

bool multifd_send_setup(void)
{
    MigrationState *s = migrate_get_current();
    int thread_count, ret = 0;
    uint32_t page_count = multifd_ram_page_count();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    if (!migrate_multifd()) {
        return true;
    }

    thread_count = migrate_multifd_channels();
    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
    qemu_mutex_init(&multifd_send_state->multifd_send_mutex);
    qemu_sem_init(&multifd_send_state->channels_created, 0);
    qemu_sem_init(&multifd_send_state->channels_ready, 0);
    qatomic_set(&multifd_send_state->exiting, 0);
    multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        qemu_sem_init(&p->sem, 0);
        qemu_sem_init(&p->sem_sync, 0);
        p->id = i;
        p->data = multifd_send_data_alloc();

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                          + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
            p->packet_device_state = g_malloc0(sizeof(*p->packet_device_state));
            p->packet_device_state->hdr.magic = cpu_to_be32(MULTIFD_MAGIC);
            p->packet_device_state->hdr.version = cpu_to_be32(MULTIFD_VERSION);
        }
        p->name = g_strdup_printf(MIGRATION_THREAD_SRC_MULTIFD, i);
        p->write_flags = 0;

        if (!multifd_new_send_channel_create(p, &local_err)) {
            migrate_set_error(s, local_err);
            error_free(local_err);
            ret = -1;
        }
    }

    /*
     * Wait until channel creation has started for all channels. The
     * creation can still fail, but no more channels will be created
     * past this point.
     */
    for (i = 0; i < thread_count; i++) {
        qemu_sem_wait(&multifd_send_state->channels_created);
    }

    if (ret) {
        goto err;
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        ret = multifd_send_state->ops->send_setup(p, &local_err);
        if (ret) {
            migrate_set_error(s, local_err);
            error_free(local_err);
            goto err;
        }
        assert(p->iov);
    }

    multifd_device_state_send_setup();

    return true;

err:
    migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
                      MIGRATION_STATUS_FAILED);
    return false;
}

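/*
 * Hand the filled MultiFDRecvData over to an idle recv channel by
 * swapping pointers, mirroring what multifd_send() does on the source
 * side.  Only used when multifd doesn't use packets (mapped-ram).
 */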
bool multifd_recv(void)
{
    int i;
    static int next_recv_channel;
    MultiFDRecvParams *p = NULL;
    MultiFDRecvData *data = multifd_recv_state->data;

    /*
     * next_recv_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_recv_channel %= migrate_multifd_channels();
    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_recv_should_exit()) {
            return false;
        }

        p = &multifd_recv_state->params[i];

        if (qatomic_read(&p->pending_job) == false) {
            next_recv_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Order pending_job read before manipulating p->data below. Pairs
     * with qatomic_store_release() at multifd_recv_thread().
     */
    smp_mb_acquire();

    assert(!p->data->size);
    multifd_recv_state->data = p->data;
    p->data = data;

    /*
     * Order p->data update before setting pending_job. Pairs with
     * qatomic_load_acquire() at multifd_recv_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}

MultiFDRecvData *multifd_get_recv_data(void)
{
    return multifd_recv_state->data;
}

static void multifd_recv_terminate_threads(Error *err)
{
    int i;

    trace_multifd_recv_terminate_threads(err != NULL);

    if (qatomic_xchg(&multifd_recv_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        /*
         * The migration thread and channels interact differently
         * depending on the presence of packets.
         */
        if (multifd_use_packets()) {
            /*
             * The channel receives as long as there are packets. When
             * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
             * channel waits for the migration thread to sync. If the
             * sync never happens, do it here.
             */
            qemu_sem_post(&p->sem_sync);
        } else {
            /*
             * The channel waits for the migration thread to give it
             * work. When the migration thread runs out of work, it
             * releases the channel and waits for any pending work to
             * finish. If we reach here (e.g. due to error) before the
             * work runs out, release the channel.
             */
            qemu_sem_post(&p->sem);
        }

        /*
         * We could arrive here for two reasons:
         *  - normal quit, i.e. everything went fine, just finished
         *  - error quit: We close the channels so the channel threads
         *    finish the qio_channel_read_all_eof()
         */
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }
}

void multifd_recv_shutdown(void)
{
    if (migrate_multifd()) {
        multifd_recv_terminate_threads(NULL);
    }
}

static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
{
    migration_ioc_unregister_yank(p->c);
    object_unref(OBJECT(p->c));
    p->c = NULL;
    qemu_mutex_destroy(&p->mutex);
    qemu_sem_destroy(&p->sem_sync);
    qemu_sem_destroy(&p->sem);
    g_free(p->data);
    p->data = NULL;
    g_free(p->name);
    p->name = NULL;
    p->packet_len = 0;
    g_free(p->packet);
    p->packet = NULL;
    g_clear_pointer(&p->packet_dev_state, g_free);
    g_free(p->normal);
    p->normal = NULL;
    g_free(p->zero);
    p->zero = NULL;
    multifd_recv_state->ops->recv_cleanup(p);
}

static void multifd_recv_cleanup_state(void)
{
    qemu_sem_destroy(&multifd_recv_state->sem_sync);
    g_free(multifd_recv_state->params);
    multifd_recv_state->params = NULL;
    g_free(multifd_recv_state->data);
    multifd_recv_state->data = NULL;
    g_free(multifd_recv_state);
    multifd_recv_state = NULL;
}

void multifd_recv_cleanup(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }
    multifd_recv_terminate_threads(NULL);
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
    }
    multifd_recv_cleanup_state();
}

void multifd_recv_sync_main(void)
{
    int thread_count = migrate_multifd_channels();
    bool file_based = !multifd_use_packets();
    int i;

    if (!migrate_multifd()) {
        return;
    }

    /*
     * File-based channels don't use packets and therefore need to
     * wait for more work. Release them to start the sync.
     */
    if (file_based) {
        for (i = 0; i < thread_count; i++) {
            MultiFDRecvParams *p = &multifd_recv_state->params[i];

            trace_multifd_recv_sync_main_signal(p->id);
            qemu_sem_post(&p->sem);
        }
    }

    /*
     * Initiate the synchronization by waiting for all channels.
     *
     * For socket-based migration this means each channel has received
     * the SYNC packet on the stream.
     *
     * For file-based migration this means each channel is done with
     * the work (pending_job=false).
     */
    for (i = 0; i < thread_count; i++) {
        trace_multifd_recv_sync_main_wait(i);
        qemu_sem_wait(&multifd_recv_state->sem_sync);
    }

    if (file_based) {
        /*
         * For file-based migration, loading is done in one iteration.
         * We're done.
         */
        return;
    }

    /*
     * Sync done. Release the channels for the next iteration.
     */
    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        WITH_QEMU_LOCK_GUARD(&p->mutex) {
            if (multifd_recv_state->packet_num < p->packet_num) {
                multifd_recv_state->packet_num = p->packet_num;
            }
        }
        trace_multifd_recv_sync_main_signal(p->id);
        qemu_sem_post(&p->sem_sync);
    }
    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
}

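/*
 * Read the opaque device state buffer that follows the device state
 * packet header and feed it to the device via
 * qemu_loadvm_load_state_buffer().
 */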
static int multifd_device_state_recv(MultiFDRecvParams *p, Error **errp)
{
    g_autofree char *dev_state_buf = NULL;
    int ret;

    dev_state_buf = g_malloc(p->next_packet_size);

    ret = qio_channel_read_all(p->c, dev_state_buf, p->next_packet_size, errp);
    if (ret != 0) {
        return ret;
    }

    if (p->packet_dev_state->idstr[sizeof(p->packet_dev_state->idstr) - 1]
        != 0) {
        error_setg(errp, "unterminated multifd device state idstr");
        return -1;
    }

    if (!qemu_loadvm_load_state_buffer(p->packet_dev_state->idstr,
                                       p->packet_dev_state->instance_id,
                                       dev_state_buf, p->next_packet_size,
                                       errp)) {
        ret = -1;
    }

    return ret;
}

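/*
 * Per-channel receiver loop.  With packets, read packet headers and
 * payloads directly off the stream; without packets (mapped-ram),
 * wait for the migration thread to hand over work via multifd_recv().
 */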
static void *multifd_recv_thread(void *opaque)
{
    MigrationState *s = migrate_get_current();
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int ret;

    trace_multifd_recv_thread_start(p->id);
    rcu_register_thread();

    if (!s->multifd_clean_tls_termination) {
        p->read_flags = QIO_CHANNEL_READ_FLAG_RELAXED_EOF;
    }

    while (true) {
        MultiFDPacketHdr_t hdr;
        uint32_t flags = 0;
        bool is_device_state = false;
        bool has_data = false;
        uint8_t *pkt_buf;
        size_t pkt_len;

        p->normal_num = 0;

        if (use_packets) {
            struct iovec iov = {
                .iov_base = (void *)&hdr,
                .iov_len = sizeof(hdr)
            };

            if (multifd_recv_should_exit()) {
                break;
            }

            ret = qio_channel_readv_full_all_eof(p->c, &iov, 1, NULL, NULL,
                                                 p->read_flags, &local_err);
            if (!ret) {
                /* EOF */
                assert(!local_err);
                break;
            }

            if (ret == -1) {
                break;
            }

            ret = multifd_recv_unfill_packet_header(p, &hdr, &local_err);
            if (ret) {
                break;
            }

            is_device_state = p->flags & MULTIFD_FLAG_DEVICE_STATE;
            if (is_device_state) {
                pkt_buf = (uint8_t *)p->packet_dev_state + sizeof(hdr);
                pkt_len = sizeof(*p->packet_dev_state) - sizeof(hdr);
            } else {
                pkt_buf = (uint8_t *)p->packet + sizeof(hdr);
                pkt_len = p->packet_len - sizeof(hdr);
            }

            ret = qio_channel_read_all_eof(p->c, (char *)pkt_buf, pkt_len,
                                           &local_err);
            if (!ret) {
                /* EOF */
                error_setg(&local_err, "multifd: unexpected EOF after packet header");
                break;
            }

            if (ret == -1) {
                break;
            }

            qemu_mutex_lock(&p->mutex);
            ret = multifd_recv_unfill_packet(p, &local_err);
            if (ret) {
                qemu_mutex_unlock(&p->mutex);
                break;
            }

            flags = p->flags;
            /* recv methods don't know how to handle the SYNC flag */
            p->flags &= ~MULTIFD_FLAG_SYNC;

            if (is_device_state) {
                has_data = p->next_packet_size > 0;
            } else {
                /*
                 * Even if it's a SYNC packet, this needs to be set
                 * because older QEMUs (<9.0) still send data along with
                 * the SYNC packet.
                 */
                has_data = p->normal_num || p->zero_num;
            }

            qemu_mutex_unlock(&p->mutex);
        } else {
            /*
             * No packets, so we need to wait for the vmstate code to
             * give us work.
             */
            qemu_sem_wait(&p->sem);

            if (multifd_recv_should_exit()) {
                break;
            }

            /* pairs with qatomic_store_release() at multifd_recv() */
            if (!qatomic_load_acquire(&p->pending_job)) {
                /*
                 * Migration thread did not send work, this is
                 * equivalent to pending_sync on the sending
                 * side. Post sem_sync to notify we reached this
                 * point.
                 */
                qemu_sem_post(&multifd_recv_state->sem_sync);
                continue;
            }

            has_data = !!p->data->size;
        }

        if (has_data) {
            /*
             * multifd thread should not be active and receive data
             * when migration is in the Postcopy phase. Two threads
             * writing the same memory area could easily corrupt
             * the guest state.
             */
            assert(!migration_in_postcopy());
            if (is_device_state) {
                assert(use_packets);
                ret = multifd_device_state_recv(p, &local_err);
            } else {
                ret = multifd_recv_state->ops->recv(p, &local_err);
            }
            if (ret != 0) {
                break;
            }
        } else if (is_device_state) {
            error_setg(&local_err,
                       "multifd: received empty device state packet");
            break;
        }

        if (use_packets) {
            if (flags & MULTIFD_FLAG_SYNC) {
                if (is_device_state) {
                    error_setg(&local_err,
                               "multifd: received SYNC device state packet");
                    break;
                }

                qemu_sem_post(&multifd_recv_state->sem_sync);
                qemu_sem_wait(&p->sem_sync);
            }
        } else {
            p->data->size = 0;
            /*
             * Order data->size update before clearing
             * pending_job. Pairs with smp_mb_acquire() at
             * multifd_recv().
             */
            qatomic_store_release(&p->pending_job, false);
        }
    }

    if (local_err) {
        multifd_recv_terminate_threads(local_err);
        error_free(local_err);
    }

    rcu_unregister_thread();
    trace_multifd_recv_thread_end(p->id, p->packets_recved);

    return NULL;
}

int multifd_recv_setup(Error **errp)
{
    int thread_count;
    uint32_t page_count = multifd_ram_page_count();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    /*
     * Return successfully if multiFD recv state is already initialised
     * or multiFD is not enabled.
     */
    if (multifd_recv_state || !migrate_multifd()) {
        return 0;
    }

    thread_count = migrate_multifd_channels();
    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);

    multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
    multifd_recv_state->data->size = 0;

    qatomic_set(&multifd_recv_state->count, 0);
    qatomic_set(&multifd_recv_state->exiting, 0);
    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
    multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        qemu_mutex_init(&p->mutex);
        qemu_sem_init(&p->sem_sync, 0);
        qemu_sem_init(&p->sem, 0);
        p->pending_job = false;
        p->id = i;

        p->data = g_new0(MultiFDRecvData, 1);
        p->data->size = 0;

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
            p->packet_dev_state = g_malloc0(sizeof(*p->packet_dev_state));
        }
        p->name = g_strdup_printf(MIGRATION_THREAD_DST_MULTIFD, i);
        p->normal = g_new0(ram_addr_t, page_count);
        p->zero = g_new0(ram_addr_t, page_count);
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];
        int ret;

        ret = multifd_recv_state->ops->recv_setup(p, errp);
        if (ret) {
            return ret;
        }
    }
    return 0;
}

bool multifd_recv_all_channels_created(void)
{
    int thread_count = migrate_multifd_channels();

    if (!migrate_multifd()) {
        return true;
    }

    if (!multifd_recv_state) {
        /* Called before any connections created */
        return false;
    }

    return thread_count == qatomic_read(&multifd_recv_state->count);
}

/*
 * Try to receive all multifd channels to get ready for the migration.
 * Sets @errp when failing to receive the current channel.
 */
void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
{
    MultiFDRecvParams *p;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int id;

    if (use_packets) {
        id = multifd_recv_initial_packet(ioc, &local_err);
        if (id < 0) {
            multifd_recv_terminate_threads(local_err);
            error_propagate_prepend(errp, local_err,
                                    "failed to receive packet"
                                    " via multifd channel %d: ",
                                    qatomic_read(&multifd_recv_state->count));
            return;
        }
        trace_multifd_recv_new_channel(id);
    } else {
        id = qatomic_read(&multifd_recv_state->count);
    }

    p = &multifd_recv_state->params[id];
    if (p->c != NULL) {
        error_setg(&local_err, "multifd: received id '%d' already setup",
                   id);
        multifd_recv_terminate_threads(local_err);
        error_propagate(errp, local_err);
        return;
    }
    p->c = ioc;
    object_ref(OBJECT(ioc));

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                       QEMU_THREAD_JOINABLE);
    qatomic_inc(&multifd_recv_state->count);
}
1567