xref: /openbmc/qemu/migration/multifd.c (revision 48663349)
1 /*
2  * Multifd common code
3  *
4  * Copyright (c) 2019-2020 Red Hat Inc
5  *
6  * Authors:
7  *  Juan Quintela <quintela@redhat.com>
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2 or later.
10  * See the COPYING file in the top-level directory.
11  */
12 
13 #include "qemu/osdep.h"
14 #include "qemu/cutils.h"
15 #include "qemu/rcu.h"
16 #include "exec/target_page.h"
17 #include "sysemu/sysemu.h"
18 #include "exec/ramblock.h"
19 #include "qemu/error-report.h"
20 #include "qapi/error.h"
21 #include "file.h"
22 #include "migration.h"
23 #include "migration-stats.h"
24 #include "socket.h"
25 #include "tls.h"
26 #include "qemu-file.h"
27 #include "trace.h"
28 #include "multifd.h"
29 #include "threadinfo.h"
30 #include "options.h"
31 #include "qemu/yank.h"
32 #include "io/channel-file.h"
33 #include "io/channel-socket.h"
34 #include "yank_functions.h"
35 
36 /* Multiple fd's */
37 
38 #define MULTIFD_MAGIC 0x11223344U
39 #define MULTIFD_VERSION 1
40 
/*
 * Handshake message exchanged once per multifd channel: written by
 * multifd_send_initial_packet() and validated by
 * multifd_recv_initial_packet().  The struct is packed and its bytes are
 * written to / read from the channel as-is; magic and version are
 * converted to big-endian before being written.
 */
typedef struct {
    uint32_t magic;         /* MULTIFD_MAGIC */
    uint32_t version;       /* MULTIFD_VERSION */
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;             /* id of the sending channel */
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;
49 
/* Global state of the multifd send side */
struct {
    /* per-channel send parameters, indexed by channel number */
    MultiFDSendParams *params;
    /* array of pages to send */
    MultiFDPages_t *pages;
    /*
     * Global number of generated multifd packets.
     *
     * Note that we used 'uintptr_t' because it'll naturally support atomic
     * operations on both 32bit / 64 bits hosts.  It means on 32bit systems
     * multifd will overflow the packet_num easier, but that should be
     * fine.
     *
     * Another option is to use QEMU's Stat64 then it'll be 64 bits on all
     * hosts, however so far it does not support atomic fetch_add() yet.
     * Make it easy for now.
     */
    uintptr_t packet_num;
    /*
     * Synchronization point past which no more channels will be
     * created.
     */
    QemuSemaphore channels_created;
    /*
     * Send channels ready: posted by a send thread each time it is about
     * to wait for work, waited on by the migration thread.
     */
    QemuSemaphore channels_ready;
    /*
     * Have we already run terminate threads.  There is a race when it
     * happens that we got one error while we are exiting.
     * We will use atomic operations.  Only valid values are 0 and 1.
     */
    int exiting;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_send_state;
83 
/* Global state of the multifd receive side */
struct {
    /* per-channel receive parameters */
    MultiFDRecvParams *params;
    MultiFDRecvData *data;
    /* number of created threads */
    int count;
    /*
     * This is always posted by the recv threads, the migration thread
     * uses it to wait for recv threads to finish assigned tasks.
     */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    /* read with qatomic_read() by multifd_recv_should_exit() */
    int exiting;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_recv_state;
100 
/*
 * Tell whether multifd data is framed in packets.  Packets are used
 * whenever mapped-ram is not enabled.
 */
static bool multifd_use_packets(void)
{
    bool mapped_ram = migrate_mapped_ram();

    return !mapped_ram;
}
105 
106 void multifd_send_channel_created(void)
107 {
108     qemu_sem_post(&multifd_send_state->channels_created);
109 }
110 
111 static void multifd_set_file_bitmap(MultiFDSendParams *p)
112 {
113     MultiFDPages_t *pages = p->pages;
114 
115     assert(pages->block);
116 
117     for (int i = 0; i < p->pages->normal_num; i++) {
118         ramblock_set_file_bmap_atomic(pages->block, pages->offset[i], true);
119     }
120 
121     for (int i = p->pages->normal_num; i < p->pages->num; i++) {
122         ramblock_set_file_bmap_atomic(pages->block, pages->offset[i], false);
123     }
124 }
125 
126 /* Multifd without compression */
127 
128 /**
129  * nocomp_send_setup: setup send side
130  *
131  * @p: Params for the channel that we are using
132  * @errp: pointer to an error
133  */
134 static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
135 {
136     if (migrate_zero_copy_send()) {
137         p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
138     }
139 
140     return 0;
141 }
142 
143 /**
144  * nocomp_send_cleanup: cleanup send side
145  *
146  * For no compression this function does nothing.
147  *
148  * @p: Params for the channel that we are using
149  * @errp: pointer to an error
150  */
151 static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
152 {
153     return;
154 }
155 
156 static void multifd_send_prepare_iovs(MultiFDSendParams *p)
157 {
158     MultiFDPages_t *pages = p->pages;
159 
160     for (int i = 0; i < pages->normal_num; i++) {
161         p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
162         p->iov[p->iovs_num].iov_len = p->page_size;
163         p->iovs_num++;
164     }
165 
166     p->next_packet_size = pages->normal_num * p->page_size;
167 }
168 
169 /**
170  * nocomp_send_prepare: prepare date to be able to send
171  *
172  * For no compression we just have to calculate the size of the
173  * packet.
174  *
175  * Returns 0 for success or -1 for error
176  *
177  * @p: Params for the channel that we are using
178  * @errp: pointer to an error
179  */
180 static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
181 {
182     bool use_zero_copy_send = migrate_zero_copy_send();
183     int ret;
184 
185     multifd_send_zero_page_detect(p);
186 
187     if (!multifd_use_packets()) {
188         multifd_send_prepare_iovs(p);
189         multifd_set_file_bitmap(p);
190 
191         return 0;
192     }
193 
194     if (!use_zero_copy_send) {
195         /*
196          * Only !zerocopy needs the header in IOV; zerocopy will
197          * send it separately.
198          */
199         multifd_send_prepare_header(p);
200     }
201 
202     multifd_send_prepare_iovs(p);
203     p->flags |= MULTIFD_FLAG_NOCOMP;
204 
205     multifd_send_fill_packet(p);
206 
207     if (use_zero_copy_send) {
208         /* Send header first, without zerocopy */
209         ret = qio_channel_write_all(p->c, (void *)p->packet,
210                                     p->packet_len, errp);
211         if (ret != 0) {
212             return -1;
213         }
214     }
215 
216     return 0;
217 }
218 
219 /**
220  * nocomp_recv_setup: setup receive side
221  *
222  * For no compression this function does nothing.
223  *
224  * Returns 0 for success or -1 for error
225  *
226  * @p: Params for the channel that we are using
227  * @errp: pointer to an error
228  */
229 static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
230 {
231     return 0;
232 }
233 
234 /**
235  * nocomp_recv_cleanup: setup receive side
236  *
237  * For no compression this function does nothing.
238  *
239  * @p: Params for the channel that we are using
240  */
241 static void nocomp_recv_cleanup(MultiFDRecvParams *p)
242 {
243 }
244 
245 /**
246  * nocomp_recv: read the data from the channel
247  *
248  * For no compression we just need to read things into the correct place.
249  *
250  * Returns 0 for success or -1 for error
251  *
252  * @p: Params for the channel that we are using
253  * @errp: pointer to an error
254  */
255 static int nocomp_recv(MultiFDRecvParams *p, Error **errp)
256 {
257     uint32_t flags;
258 
259     if (!multifd_use_packets()) {
260         return multifd_file_recv_data(p, errp);
261     }
262 
263     flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
264 
265     if (flags != MULTIFD_FLAG_NOCOMP) {
266         error_setg(errp, "multifd %u: flags received %x flags expected %x",
267                    p->id, flags, MULTIFD_FLAG_NOCOMP);
268         return -1;
269     }
270 
271     multifd_recv_zero_page_process(p);
272 
273     if (!p->normal_num) {
274         return 0;
275     }
276 
277     for (int i = 0; i < p->normal_num; i++) {
278         p->iov[i].iov_base = p->host + p->normal[i];
279         p->iov[i].iov_len = p->page_size;
280         ramblock_recv_bitmap_set_offset(p->block, p->normal[i]);
281     }
282     return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
283 }
284 
285 static MultiFDMethods multifd_nocomp_ops = {
286     .send_setup = nocomp_send_setup,
287     .send_cleanup = nocomp_send_cleanup,
288     .send_prepare = nocomp_send_prepare,
289     .recv_setup = nocomp_recv_setup,
290     .recv_cleanup = nocomp_recv_cleanup,
291     .recv = nocomp_recv
292 };
293 
/*
 * Method tables indexed by compression method.  Only the "none" entry is
 * filled in here; other entries are installed with multifd_register_ops().
 */
static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
    [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
};
297 
298 void multifd_register_ops(int method, MultiFDMethods *ops)
299 {
300     assert(0 < method && method < MULTIFD_COMPRESSION__MAX);
301     multifd_ops[method] = ops;
302 }
303 
304 /* Reset a MultiFDPages_t* object for the next use */
305 static void multifd_pages_reset(MultiFDPages_t *pages)
306 {
307     /*
308      * We don't need to touch offset[] array, because it will be
309      * overwritten later when reused.
310      */
311     pages->num = 0;
312     pages->normal_num = 0;
313     pages->block = NULL;
314 }
315 
316 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
317 {
318     MultiFDInit_t msg = {};
319     size_t size = sizeof(msg);
320     int ret;
321 
322     msg.magic = cpu_to_be32(MULTIFD_MAGIC);
323     msg.version = cpu_to_be32(MULTIFD_VERSION);
324     msg.id = p->id;
325     memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
326 
327     ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
328     if (ret != 0) {
329         return -1;
330     }
331     stat64_add(&mig_stats.multifd_bytes, size);
332     return 0;
333 }
334 
335 static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
336 {
337     MultiFDInit_t msg;
338     int ret;
339 
340     ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
341     if (ret != 0) {
342         return -1;
343     }
344 
345     msg.magic = be32_to_cpu(msg.magic);
346     msg.version = be32_to_cpu(msg.version);
347 
348     if (msg.magic != MULTIFD_MAGIC) {
349         error_setg(errp, "multifd: received packet magic %x "
350                    "expected %x", msg.magic, MULTIFD_MAGIC);
351         return -1;
352     }
353 
354     if (msg.version != MULTIFD_VERSION) {
355         error_setg(errp, "multifd: received packet version %u "
356                    "expected %u", msg.version, MULTIFD_VERSION);
357         return -1;
358     }
359 
360     if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
361         char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
362         char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
363 
364         error_setg(errp, "multifd: received uuid '%s' and expected "
365                    "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
366         g_free(uuid);
367         g_free(msg_uuid);
368         return -1;
369     }
370 
371     if (msg.id > migrate_multifd_channels()) {
372         error_setg(errp, "multifd: received channel id %u is greater than "
373                    "number of channels %u", msg.id, migrate_multifd_channels());
374         return -1;
375     }
376 
377     return msg.id;
378 }
379 
380 static MultiFDPages_t *multifd_pages_init(uint32_t n)
381 {
382     MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
383 
384     pages->allocated = n;
385     pages->offset = g_new0(ram_addr_t, n);
386 
387     return pages;
388 }
389 
390 static void multifd_pages_clear(MultiFDPages_t *pages)
391 {
392     multifd_pages_reset(pages);
393     pages->allocated = 0;
394     g_free(pages->offset);
395     pages->offset = NULL;
396     g_free(pages);
397 }
398 
399 void multifd_send_fill_packet(MultiFDSendParams *p)
400 {
401     MultiFDPacket_t *packet = p->packet;
402     MultiFDPages_t *pages = p->pages;
403     uint64_t packet_num;
404     uint32_t zero_num = pages->num - pages->normal_num;
405     int i;
406 
407     packet->flags = cpu_to_be32(p->flags);
408     packet->pages_alloc = cpu_to_be32(p->pages->allocated);
409     packet->normal_pages = cpu_to_be32(pages->normal_num);
410     packet->zero_pages = cpu_to_be32(zero_num);
411     packet->next_packet_size = cpu_to_be32(p->next_packet_size);
412 
413     packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
414     packet->packet_num = cpu_to_be64(packet_num);
415 
416     if (pages->block) {
417         strncpy(packet->ramblock, pages->block->idstr, 256);
418     }
419 
420     for (i = 0; i < pages->num; i++) {
421         /* there are architectures where ram_addr_t is 32 bit */
422         uint64_t temp = pages->offset[i];
423 
424         packet->offset[i] = cpu_to_be64(temp);
425     }
426 
427     p->packets_sent++;
428     p->total_normal_pages += pages->normal_num;
429     p->total_zero_pages += zero_num;
430 
431     trace_multifd_send(p->id, packet_num, pages->normal_num, zero_num,
432                        p->flags, p->next_packet_size);
433 }
434 
435 static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
436 {
437     MultiFDPacket_t *packet = p->packet;
438     int i;
439 
440     packet->magic = be32_to_cpu(packet->magic);
441     if (packet->magic != MULTIFD_MAGIC) {
442         error_setg(errp, "multifd: received packet "
443                    "magic %x and expected magic %x",
444                    packet->magic, MULTIFD_MAGIC);
445         return -1;
446     }
447 
448     packet->version = be32_to_cpu(packet->version);
449     if (packet->version != MULTIFD_VERSION) {
450         error_setg(errp, "multifd: received packet "
451                    "version %u and expected version %u",
452                    packet->version, MULTIFD_VERSION);
453         return -1;
454     }
455 
456     p->flags = be32_to_cpu(packet->flags);
457 
458     packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
459     /*
460      * If we received a packet that is 100 times bigger than expected
461      * just stop migration.  It is a magic number.
462      */
463     if (packet->pages_alloc > p->page_count) {
464         error_setg(errp, "multifd: received packet "
465                    "with size %u and expected a size of %u",
466                    packet->pages_alloc, p->page_count) ;
467         return -1;
468     }
469 
470     p->normal_num = be32_to_cpu(packet->normal_pages);
471     if (p->normal_num > packet->pages_alloc) {
472         error_setg(errp, "multifd: received packet "
473                    "with %u normal pages and expected maximum pages are %u",
474                    p->normal_num, packet->pages_alloc) ;
475         return -1;
476     }
477 
478     p->zero_num = be32_to_cpu(packet->zero_pages);
479     if (p->zero_num > packet->pages_alloc - p->normal_num) {
480         error_setg(errp, "multifd: received packet "
481                    "with %u zero pages and expected maximum zero pages are %u",
482                    p->zero_num, packet->pages_alloc - p->normal_num) ;
483         return -1;
484     }
485 
486     p->next_packet_size = be32_to_cpu(packet->next_packet_size);
487     p->packet_num = be64_to_cpu(packet->packet_num);
488     p->packets_recved++;
489     p->total_normal_pages += p->normal_num;
490     p->total_zero_pages += p->zero_num;
491 
492     trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->zero_num,
493                        p->flags, p->next_packet_size);
494 
495     if (p->normal_num == 0 && p->zero_num == 0) {
496         return 0;
497     }
498 
499     /* make sure that ramblock is 0 terminated */
500     packet->ramblock[255] = 0;
501     p->block = qemu_ram_block_by_name(packet->ramblock);
502     if (!p->block) {
503         error_setg(errp, "multifd: unknown ram block %s",
504                    packet->ramblock);
505         return -1;
506     }
507 
508     p->host = p->block->host;
509     for (i = 0; i < p->normal_num; i++) {
510         uint64_t offset = be64_to_cpu(packet->offset[i]);
511 
512         if (offset > (p->block->used_length - p->page_size)) {
513             error_setg(errp, "multifd: offset too long %" PRIu64
514                        " (max " RAM_ADDR_FMT ")",
515                        offset, p->block->used_length);
516             return -1;
517         }
518         p->normal[i] = offset;
519     }
520 
521     for (i = 0; i < p->zero_num; i++) {
522         uint64_t offset = be64_to_cpu(packet->offset[p->normal_num + i]);
523 
524         if (offset > (p->block->used_length - p->page_size)) {
525             error_setg(errp, "multifd: offset too long %" PRIu64
526                        " (max " RAM_ADDR_FMT ")",
527                        offset, p->block->used_length);
528             return -1;
529         }
530         p->zero[i] = offset;
531     }
532 
533     return 0;
534 }
535 
536 static bool multifd_send_should_exit(void)
537 {
538     return qatomic_read(&multifd_send_state->exiting);
539 }
540 
541 static bool multifd_recv_should_exit(void)
542 {
543     return qatomic_read(&multifd_recv_state->exiting);
544 }
545 
546 /*
547  * The migration thread can wait on either of the two semaphores.  This
548  * function can be used to kick the main thread out of waiting on either of
549  * them.  Should mostly only be called when something wrong happened with
550  * the current multifd send thread.
551  */
552 static void multifd_send_kick_main(MultiFDSendParams *p)
553 {
554     qemu_sem_post(&p->sem_sync);
555     qemu_sem_post(&multifd_send_state->channels_ready);
556 }
557 
558 /*
559  * How we use multifd_send_state->pages and channel->pages?
560  *
561  * We create a pages for each channel, and a main one.  Each time that
562  * we need to send a batch of pages we interchange the ones between
563  * multifd_send_state and the channel that is sending it.  There are
564  * two reasons for that:
565  *    - to not have to do so many mallocs during migration
566  *    - to make easier to know what to free at the end of migration
567  *
568  * This way we always know who is the owner of each "pages" struct,
569  * and we don't need any locking.  It belongs to the migration thread
570  * or to the channel thread.  Switching is safe because the migration
571  * thread is using the channel mutex when changing it, and the channel
572  * have to had finish with its own, otherwise pending_job can't be
573  * false.
574  *
575  * Returns true if succeed, false otherwise.
576  */
577 static bool multifd_send_pages(void)
578 {
579     int i;
580     static int next_channel;
581     MultiFDSendParams *p = NULL; /* make happy gcc */
582     MultiFDPages_t *pages = multifd_send_state->pages;
583 
584     if (multifd_send_should_exit()) {
585         return false;
586     }
587 
588     /* We wait here, until at least one channel is ready */
589     qemu_sem_wait(&multifd_send_state->channels_ready);
590 
591     /*
592      * next_channel can remain from a previous migration that was
593      * using more channels, so ensure it doesn't overflow if the
594      * limit is lower now.
595      */
596     next_channel %= migrate_multifd_channels();
597     for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
598         if (multifd_send_should_exit()) {
599             return false;
600         }
601         p = &multifd_send_state->params[i];
602         /*
603          * Lockless read to p->pending_job is safe, because only multifd
604          * sender thread can clear it.
605          */
606         if (qatomic_read(&p->pending_job) == false) {
607             next_channel = (i + 1) % migrate_multifd_channels();
608             break;
609         }
610     }
611 
612     /*
613      * Make sure we read p->pending_job before all the rest.  Pairs with
614      * qatomic_store_release() in multifd_send_thread().
615      */
616     smp_mb_acquire();
617     assert(!p->pages->num);
618     multifd_send_state->pages = p->pages;
619     p->pages = pages;
620     /*
621      * Making sure p->pages is setup before marking pending_job=true. Pairs
622      * with the qatomic_load_acquire() in multifd_send_thread().
623      */
624     qatomic_store_release(&p->pending_job, true);
625     qemu_sem_post(&p->sem);
626 
627     return true;
628 }
629 
630 static inline bool multifd_queue_empty(MultiFDPages_t *pages)
631 {
632     return pages->num == 0;
633 }
634 
635 static inline bool multifd_queue_full(MultiFDPages_t *pages)
636 {
637     return pages->num == pages->allocated;
638 }
639 
640 static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset)
641 {
642     pages->offset[pages->num++] = offset;
643 }
644 
645 /* Returns true if enqueue successful, false otherwise */
646 bool multifd_queue_page(RAMBlock *block, ram_addr_t offset)
647 {
648     MultiFDPages_t *pages;
649 
650 retry:
651     pages = multifd_send_state->pages;
652 
653     /* If the queue is empty, we can already enqueue now */
654     if (multifd_queue_empty(pages)) {
655         pages->block = block;
656         multifd_enqueue(pages, offset);
657         return true;
658     }
659 
660     /*
661      * Not empty, meanwhile we need a flush.  It can because of either:
662      *
663      * (1) The page is not on the same ramblock of previous ones, or,
664      * (2) The queue is full.
665      *
666      * After flush, always retry.
667      */
668     if (pages->block != block || multifd_queue_full(pages)) {
669         if (!multifd_send_pages()) {
670             return false;
671         }
672         goto retry;
673     }
674 
675     /* Not empty, and we still have space, do it! */
676     multifd_enqueue(pages, offset);
677     return true;
678 }
679 
680 /* Multifd send side hit an error; remember it and prepare to quit */
681 static void multifd_send_set_error(Error *err)
682 {
683     /*
684      * We don't want to exit each threads twice.  Depending on where
685      * we get the error, or if there are two independent errors in two
686      * threads at the same time, we can end calling this function
687      * twice.
688      */
689     if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
690         return;
691     }
692 
693     if (err) {
694         MigrationState *s = migrate_get_current();
695         migrate_set_error(s, err);
696         if (s->state == MIGRATION_STATUS_SETUP ||
697             s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
698             s->state == MIGRATION_STATUS_DEVICE ||
699             s->state == MIGRATION_STATUS_ACTIVE) {
700             migrate_set_state(&s->state, s->state,
701                               MIGRATION_STATUS_FAILED);
702         }
703     }
704 }
705 
706 static void multifd_send_terminate_threads(void)
707 {
708     int i;
709 
710     trace_multifd_send_terminate_threads();
711 
712     /*
713      * Tell everyone we're quitting.  No xchg() needed here; we simply
714      * always set it.
715      */
716     qatomic_set(&multifd_send_state->exiting, 1);
717 
718     /*
719      * Firstly, kick all threads out; no matter whether they are just idle,
720      * or blocked in an IO system call.
721      */
722     for (i = 0; i < migrate_multifd_channels(); i++) {
723         MultiFDSendParams *p = &multifd_send_state->params[i];
724 
725         qemu_sem_post(&p->sem);
726         if (p->c) {
727             qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
728         }
729     }
730 
731     /*
732      * Finally recycle all the threads.
733      */
734     for (i = 0; i < migrate_multifd_channels(); i++) {
735         MultiFDSendParams *p = &multifd_send_state->params[i];
736 
737         if (p->tls_thread_created) {
738             qemu_thread_join(&p->tls_thread);
739         }
740 
741         if (p->thread_created) {
742             qemu_thread_join(&p->thread);
743         }
744     }
745 }
746 
747 static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
748 {
749     if (p->c) {
750         migration_ioc_unregister_yank(p->c);
751         /*
752          * The object_unref() cannot guarantee the fd will always be
753          * released because finalize() of the iochannel is only
754          * triggered on the last reference and it's not guaranteed
755          * that we always hold the last refcount when reaching here.
756          *
757          * Closing the fd explicitly has the benefit that if there is any
758          * registered I/O handler callbacks on such fd, that will get a
759          * POLLNVAL event and will further trigger the cleanup to finally
760          * release the IOC.
761          *
762          * FIXME: It should logically be guaranteed that all multifd
763          * channels have no I/O handler callback registered when reaching
764          * here, because migration thread will wait for all multifd channel
765          * establishments to complete during setup.  Since
766          * migrate_fd_cleanup() will be scheduled in main thread too, all
767          * previous callbacks should guarantee to be completed when
768          * reaching here.  See multifd_send_state.channels_created and its
769          * usage.  In the future, we could replace this with an assert
770          * making sure we're the last reference, or simply drop it if above
771          * is more clear to be justified.
772          */
773         qio_channel_close(p->c, &error_abort);
774         object_unref(OBJECT(p->c));
775         p->c = NULL;
776     }
777     qemu_sem_destroy(&p->sem);
778     qemu_sem_destroy(&p->sem_sync);
779     g_free(p->name);
780     p->name = NULL;
781     multifd_pages_clear(p->pages);
782     p->pages = NULL;
783     p->packet_len = 0;
784     g_free(p->packet);
785     p->packet = NULL;
786     g_free(p->iov);
787     p->iov = NULL;
788     multifd_send_state->ops->send_cleanup(p, errp);
789 
790     return *errp == NULL;
791 }
792 
793 static void multifd_send_cleanup_state(void)
794 {
795     file_cleanup_outgoing_migration();
796     socket_cleanup_outgoing_migration();
797     qemu_sem_destroy(&multifd_send_state->channels_created);
798     qemu_sem_destroy(&multifd_send_state->channels_ready);
799     g_free(multifd_send_state->params);
800     multifd_send_state->params = NULL;
801     multifd_pages_clear(multifd_send_state->pages);
802     multifd_send_state->pages = NULL;
803     g_free(multifd_send_state);
804     multifd_send_state = NULL;
805 }
806 
807 void multifd_send_shutdown(void)
808 {
809     int i;
810 
811     if (!migrate_multifd()) {
812         return;
813     }
814 
815     multifd_send_terminate_threads();
816 
817     for (i = 0; i < migrate_multifd_channels(); i++) {
818         MultiFDSendParams *p = &multifd_send_state->params[i];
819         Error *local_err = NULL;
820 
821         if (!multifd_send_cleanup_channel(p, &local_err)) {
822             migrate_set_error(migrate_get_current(), local_err);
823             error_free(local_err);
824         }
825     }
826 
827     multifd_send_cleanup_state();
828 }
829 
830 static int multifd_zero_copy_flush(QIOChannel *c)
831 {
832     int ret;
833     Error *err = NULL;
834 
835     ret = qio_channel_flush(c, &err);
836     if (ret < 0) {
837         error_report_err(err);
838         return -1;
839     }
840     if (ret == 1) {
841         stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
842     }
843 
844     return ret;
845 }
846 
847 int multifd_send_sync_main(void)
848 {
849     int i;
850     bool flush_zero_copy;
851 
852     if (!migrate_multifd()) {
853         return 0;
854     }
855     if (multifd_send_state->pages->num) {
856         if (!multifd_send_pages()) {
857             error_report("%s: multifd_send_pages fail", __func__);
858             return -1;
859         }
860     }
861 
862     flush_zero_copy = migrate_zero_copy_send();
863 
864     for (i = 0; i < migrate_multifd_channels(); i++) {
865         MultiFDSendParams *p = &multifd_send_state->params[i];
866 
867         if (multifd_send_should_exit()) {
868             return -1;
869         }
870 
871         trace_multifd_send_sync_main_signal(p->id);
872 
873         /*
874          * We should be the only user so far, so not possible to be set by
875          * others concurrently.
876          */
877         assert(qatomic_read(&p->pending_sync) == false);
878         qatomic_set(&p->pending_sync, true);
879         qemu_sem_post(&p->sem);
880     }
881     for (i = 0; i < migrate_multifd_channels(); i++) {
882         MultiFDSendParams *p = &multifd_send_state->params[i];
883 
884         if (multifd_send_should_exit()) {
885             return -1;
886         }
887 
888         qemu_sem_wait(&multifd_send_state->channels_ready);
889         trace_multifd_send_sync_main_wait(p->id);
890         qemu_sem_wait(&p->sem_sync);
891 
892         if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
893             return -1;
894         }
895     }
896     trace_multifd_send_sync_main(multifd_send_state->packet_num);
897 
898     return 0;
899 }
900 
/*
 * Body of a multifd send channel thread.
 *
 * @opaque is the MultiFDSendParams of the channel.  When packets are in
 * use the initial handshake packet is sent first.  The thread then loops:
 * it posts channels_ready, waits on p->sem, and handles either a page
 * job (pending_job) or a sync request (pending_sync), until the send
 * side is exiting or an error occurs.
 *
 * On error the error is recorded with multifd_send_set_error() and the
 * migration thread is kicked out of any semaphore it may be waiting on.
 *
 * Always returns NULL.
 */
static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    MigrationThread *thread = NULL;
    Error *local_err = NULL;
    int ret = 0;
    bool use_packets = multifd_use_packets();

    thread = migration_threads_add(p->name, qemu_get_thread_id());

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (use_packets) {
        if (multifd_send_initial_packet(p, &local_err) < 0) {
            ret = -1;
            goto out;
        }
    }

    while (true) {
        /* Announce that this channel is idle, then wait for work */
        qemu_sem_post(&multifd_send_state->channels_ready);
        qemu_sem_wait(&p->sem);

        if (multifd_send_should_exit()) {
            break;
        }

        /*
         * Read pending_job flag before p->pages.  Pairs with the
         * qatomic_store_release() in multifd_send_pages().
         */
        if (qatomic_load_acquire(&p->pending_job)) {
            MultiFDPages_t *pages = p->pages;

            p->iovs_num = 0;
            assert(pages->num);

            ret = multifd_send_state->ops->send_prepare(p, &local_err);
            if (ret != 0) {
                break;
            }

            if (migrate_mapped_ram()) {
                ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
                                              p->pages->block, &local_err);
            } else {
                ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
                                                  NULL, 0, p->write_flags,
                                                  &local_err);
            }

            if (ret != 0) {
                break;
            }

            stat64_add(&mig_stats.multifd_bytes,
                       p->next_packet_size + p->packet_len);
            stat64_add(&mig_stats.normal_pages, pages->normal_num);
            stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num);

            /* Empty the batch so that it can be swapped back and reused */
            multifd_pages_reset(p->pages);
            p->next_packet_size = 0;

            /*
             * Making sure p->pages is published before saying "we're
             * free".  Pairs with the smp_mb_acquire() in
             * multifd_send_pages().
             */
            qatomic_store_release(&p->pending_job, false);
        } else {
            /*
             * If not a normal job, must be a sync request.  Note that
             * pending_sync is a standalone flag (unlike pending_job), so
             * it doesn't require explicit memory barriers.
             */
            assert(qatomic_read(&p->pending_sync));

            if (use_packets) {
                p->flags = MULTIFD_FLAG_SYNC;
                multifd_send_fill_packet(p);
                ret = qio_channel_write_all(p->c, (void *)p->packet,
                                            p->packet_len, &local_err);
                if (ret != 0) {
                    break;
                }
                /* p->next_packet_size will always be zero for a SYNC packet */
                stat64_add(&mig_stats.multifd_bytes, p->packet_len);
                p->flags = 0;
            }

            /* Tell the migration thread that the sync was handled */
            qatomic_set(&p->pending_sync, false);
            qemu_sem_post(&p->sem_sync);
        }
    }

out:
    /* ret is only non-zero when one of the calls above failed */
    if (ret) {
        assert(local_err);
        trace_multifd_send_error(p->id);
        multifd_send_set_error(local_err);
        multifd_send_kick_main(p);
        error_free(local_err);
    }

    rcu_unregister_thread();
    migration_threads_remove(thread);
    trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages,
                                  p->total_zero_pages);

    return NULL;
}
1013 
1014 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);
1015 
1016 typedef struct {
1017     MultiFDSendParams *p;
1018     QIOChannelTLS *tioc;
1019 } MultiFDTLSThreadArgs;
1020 
1021 static void *multifd_tls_handshake_thread(void *opaque)
1022 {
1023     MultiFDTLSThreadArgs *args = opaque;
1024 
1025     qio_channel_tls_handshake(args->tioc,
1026                               multifd_new_send_channel_async,
1027                               args->p,
1028                               NULL,
1029                               NULL);
1030     g_free(args);
1031 
1032     return NULL;
1033 }
1034 
1035 static bool multifd_tls_channel_connect(MultiFDSendParams *p,
1036                                         QIOChannel *ioc,
1037                                         Error **errp)
1038 {
1039     MigrationState *s = migrate_get_current();
1040     const char *hostname = s->hostname;
1041     MultiFDTLSThreadArgs *args;
1042     QIOChannelTLS *tioc;
1043 
1044     tioc = migration_tls_client_create(ioc, hostname, errp);
1045     if (!tioc) {
1046         return false;
1047     }
1048 
1049     /*
1050      * Ownership of the socket channel now transfers to the newly
1051      * created TLS channel, which has already taken a reference.
1052      */
1053     object_unref(OBJECT(ioc));
1054     trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
1055     qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");
1056 
1057     args = g_new0(MultiFDTLSThreadArgs, 1);
1058     args->tioc = tioc;
1059     args->p = p;
1060 
1061     p->tls_thread_created = true;
1062     qemu_thread_create(&p->tls_thread, "multifd-tls-handshake-worker",
1063                        multifd_tls_handshake_thread, args,
1064                        QEMU_THREAD_JOINABLE);
1065     return true;
1066 }
1067 
1068 void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
1069 {
1070     qio_channel_set_delay(ioc, false);
1071 
1072     migration_ioc_register_yank(ioc);
1073     /* Setup p->c only if the channel is completely setup */
1074     p->c = ioc;
1075 
1076     p->thread_created = true;
1077     qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
1078                        QEMU_THREAD_JOINABLE);
1079 }
1080 
1081 /*
1082  * When TLS is enabled this function is called once to establish the
1083  * TLS connection and a second time after the TLS handshake to create
1084  * the multifd channel. Without TLS it goes straight into the channel
1085  * creation.
1086  */
1087 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
1088 {
1089     MultiFDSendParams *p = opaque;
1090     QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
1091     Error *local_err = NULL;
1092     bool ret;
1093 
1094     trace_multifd_new_send_channel_async(p->id);
1095 
1096     if (qio_task_propagate_error(task, &local_err)) {
1097         ret = false;
1098         goto out;
1099     }
1100 
1101     trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
1102                                        migrate_get_current()->hostname);
1103 
1104     if (migrate_channel_requires_tls_upgrade(ioc)) {
1105         ret = multifd_tls_channel_connect(p, ioc, &local_err);
1106         if (ret) {
1107             return;
1108         }
1109     } else {
1110         multifd_channel_connect(p, ioc);
1111         ret = true;
1112     }
1113 
1114 out:
1115     /*
1116      * Here we're not interested whether creation succeeded, only that
1117      * it happened at all.
1118      */
1119     multifd_send_channel_created();
1120 
1121     if (ret) {
1122         return;
1123     }
1124 
1125     trace_multifd_new_send_channel_async_error(p->id, local_err);
1126     multifd_send_set_error(local_err);
1127     /*
1128      * For error cases (TLS or non-TLS), IO channel is always freed here
1129      * rather than when cleanup multifd: since p->c is not set, multifd
1130      * cleanup code doesn't even know its existence.
1131      */
1132     object_unref(OBJECT(ioc));
1133     error_free(local_err);
1134 }
1135 
1136 static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
1137 {
1138     if (!multifd_use_packets()) {
1139         return file_send_channel_create(opaque, errp);
1140     }
1141 
1142     socket_send_channel_create(multifd_new_send_channel_async, opaque);
1143     return true;
1144 }
1145 
1146 bool multifd_send_setup(void)
1147 {
1148     MigrationState *s = migrate_get_current();
1149     Error *local_err = NULL;
1150     int thread_count, ret = 0;
1151     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
1152     bool use_packets = multifd_use_packets();
1153     uint8_t i;
1154 
1155     if (!migrate_multifd()) {
1156         return true;
1157     }
1158 
1159     thread_count = migrate_multifd_channels();
1160     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
1161     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
1162     multifd_send_state->pages = multifd_pages_init(page_count);
1163     qemu_sem_init(&multifd_send_state->channels_created, 0);
1164     qemu_sem_init(&multifd_send_state->channels_ready, 0);
1165     qatomic_set(&multifd_send_state->exiting, 0);
1166     multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
1167 
1168     for (i = 0; i < thread_count; i++) {
1169         MultiFDSendParams *p = &multifd_send_state->params[i];
1170 
1171         qemu_sem_init(&p->sem, 0);
1172         qemu_sem_init(&p->sem_sync, 0);
1173         p->id = i;
1174         p->pages = multifd_pages_init(page_count);
1175 
1176         if (use_packets) {
1177             p->packet_len = sizeof(MultiFDPacket_t)
1178                           + sizeof(uint64_t) * page_count;
1179             p->packet = g_malloc0(p->packet_len);
1180             p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
1181             p->packet->version = cpu_to_be32(MULTIFD_VERSION);
1182 
1183             /* We need one extra place for the packet header */
1184             p->iov = g_new0(struct iovec, page_count + 1);
1185         } else {
1186             p->iov = g_new0(struct iovec, page_count);
1187         }
1188         p->name = g_strdup_printf("multifdsend_%d", i);
1189         p->page_size = qemu_target_page_size();
1190         p->page_count = page_count;
1191         p->write_flags = 0;
1192 
1193         if (!multifd_new_send_channel_create(p, &local_err)) {
1194             return false;
1195         }
1196     }
1197 
1198     /*
1199      * Wait until channel creation has started for all channels. The
1200      * creation can still fail, but no more channels will be created
1201      * past this point.
1202      */
1203     for (i = 0; i < thread_count; i++) {
1204         qemu_sem_wait(&multifd_send_state->channels_created);
1205     }
1206 
1207     for (i = 0; i < thread_count; i++) {
1208         MultiFDSendParams *p = &multifd_send_state->params[i];
1209 
1210         ret = multifd_send_state->ops->send_setup(p, &local_err);
1211         if (ret) {
1212             break;
1213         }
1214     }
1215 
1216     if (ret) {
1217         migrate_set_error(s, local_err);
1218         error_report_err(local_err);
1219         migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
1220                           MIGRATION_STATUS_FAILED);
1221         return false;
1222     }
1223 
1224     return true;
1225 }
1226 
1227 bool multifd_recv(void)
1228 {
1229     int i;
1230     static int next_recv_channel;
1231     MultiFDRecvParams *p = NULL;
1232     MultiFDRecvData *data = multifd_recv_state->data;
1233 
1234     /*
1235      * next_channel can remain from a previous migration that was
1236      * using more channels, so ensure it doesn't overflow if the
1237      * limit is lower now.
1238      */
1239     next_recv_channel %= migrate_multifd_channels();
1240     for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
1241         if (multifd_recv_should_exit()) {
1242             return false;
1243         }
1244 
1245         p = &multifd_recv_state->params[i];
1246 
1247         if (qatomic_read(&p->pending_job) == false) {
1248             next_recv_channel = (i + 1) % migrate_multifd_channels();
1249             break;
1250         }
1251     }
1252 
1253     /*
1254      * Order pending_job read before manipulating p->data below. Pairs
1255      * with qatomic_store_release() at multifd_recv_thread().
1256      */
1257     smp_mb_acquire();
1258 
1259     assert(!p->data->size);
1260     multifd_recv_state->data = p->data;
1261     p->data = data;
1262 
1263     /*
1264      * Order p->data update before setting pending_job. Pairs with
1265      * qatomic_load_acquire() at multifd_recv_thread().
1266      */
1267     qatomic_store_release(&p->pending_job, true);
1268     qemu_sem_post(&p->sem);
1269 
1270     return true;
1271 }
1272 
1273 MultiFDRecvData *multifd_get_recv_data(void)
1274 {
1275     return multifd_recv_state->data;
1276 }
1277 
1278 static void multifd_recv_terminate_threads(Error *err)
1279 {
1280     int i;
1281 
1282     trace_multifd_recv_terminate_threads(err != NULL);
1283 
1284     if (qatomic_xchg(&multifd_recv_state->exiting, 1)) {
1285         return;
1286     }
1287 
1288     if (err) {
1289         MigrationState *s = migrate_get_current();
1290         migrate_set_error(s, err);
1291         if (s->state == MIGRATION_STATUS_SETUP ||
1292             s->state == MIGRATION_STATUS_ACTIVE) {
1293             migrate_set_state(&s->state, s->state,
1294                               MIGRATION_STATUS_FAILED);
1295         }
1296     }
1297 
1298     for (i = 0; i < migrate_multifd_channels(); i++) {
1299         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1300 
1301         /*
1302          * The migration thread and channels interact differently
1303          * depending on the presence of packets.
1304          */
1305         if (multifd_use_packets()) {
1306             /*
1307              * The channel receives as long as there are packets. When
1308              * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
1309              * channel waits for the migration thread to sync. If the
1310              * sync never happens, do it here.
1311              */
1312             qemu_sem_post(&p->sem_sync);
1313         } else {
1314             /*
1315              * The channel waits for the migration thread to give it
1316              * work. When the migration thread runs out of work, it
1317              * releases the channel and waits for any pending work to
1318              * finish. If we reach here (e.g. due to error) before the
1319              * work runs out, release the channel.
1320              */
1321             qemu_sem_post(&p->sem);
1322         }
1323 
1324         /*
1325          * We could arrive here for two reasons:
1326          *  - normal quit, i.e. everything went fine, just finished
1327          *  - error quit: We close the channels so the channel threads
1328          *    finish the qio_channel_read_all_eof()
1329          */
1330         if (p->c) {
1331             qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
1332         }
1333     }
1334 }
1335 
1336 void multifd_recv_shutdown(void)
1337 {
1338     if (migrate_multifd()) {
1339         multifd_recv_terminate_threads(NULL);
1340     }
1341 }
1342 
1343 static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
1344 {
1345     migration_ioc_unregister_yank(p->c);
1346     object_unref(OBJECT(p->c));
1347     p->c = NULL;
1348     qemu_mutex_destroy(&p->mutex);
1349     qemu_sem_destroy(&p->sem_sync);
1350     qemu_sem_destroy(&p->sem);
1351     g_free(p->name);
1352     p->name = NULL;
1353     p->packet_len = 0;
1354     g_free(p->packet);
1355     p->packet = NULL;
1356     g_free(p->iov);
1357     p->iov = NULL;
1358     g_free(p->normal);
1359     p->normal = NULL;
1360     g_free(p->zero);
1361     p->zero = NULL;
1362     multifd_recv_state->ops->recv_cleanup(p);
1363 }
1364 
1365 static void multifd_recv_cleanup_state(void)
1366 {
1367     qemu_sem_destroy(&multifd_recv_state->sem_sync);
1368     g_free(multifd_recv_state->params);
1369     multifd_recv_state->params = NULL;
1370     g_free(multifd_recv_state->data);
1371     multifd_recv_state->data = NULL;
1372     g_free(multifd_recv_state);
1373     multifd_recv_state = NULL;
1374 }
1375 
1376 void multifd_recv_cleanup(void)
1377 {
1378     int i;
1379 
1380     if (!migrate_multifd()) {
1381         return;
1382     }
1383     multifd_recv_terminate_threads(NULL);
1384     for (i = 0; i < migrate_multifd_channels(); i++) {
1385         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1386 
1387         if (p->thread_created) {
1388             qemu_thread_join(&p->thread);
1389         }
1390     }
1391     for (i = 0; i < migrate_multifd_channels(); i++) {
1392         multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
1393     }
1394     multifd_recv_cleanup_state();
1395 }
1396 
1397 void multifd_recv_sync_main(void)
1398 {
1399     int thread_count = migrate_multifd_channels();
1400     bool file_based = !multifd_use_packets();
1401     int i;
1402 
1403     if (!migrate_multifd()) {
1404         return;
1405     }
1406 
1407     /*
1408      * File-based channels don't use packets and therefore need to
1409      * wait for more work. Release them to start the sync.
1410      */
1411     if (file_based) {
1412         for (i = 0; i < thread_count; i++) {
1413             MultiFDRecvParams *p = &multifd_recv_state->params[i];
1414 
1415             trace_multifd_recv_sync_main_signal(p->id);
1416             qemu_sem_post(&p->sem);
1417         }
1418     }
1419 
1420     /*
1421      * Initiate the synchronization by waiting for all channels.
1422      *
1423      * For socket-based migration this means each channel has received
1424      * the SYNC packet on the stream.
1425      *
1426      * For file-based migration this means each channel is done with
1427      * the work (pending_job=false).
1428      */
1429     for (i = 0; i < thread_count; i++) {
1430         trace_multifd_recv_sync_main_wait(i);
1431         qemu_sem_wait(&multifd_recv_state->sem_sync);
1432     }
1433 
1434     if (file_based) {
1435         /*
1436          * For file-based loading is done in one iteration. We're
1437          * done.
1438          */
1439         return;
1440     }
1441 
1442     /*
1443      * Sync done. Release the channels for the next iteration.
1444      */
1445     for (i = 0; i < thread_count; i++) {
1446         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1447 
1448         WITH_QEMU_LOCK_GUARD(&p->mutex) {
1449             if (multifd_recv_state->packet_num < p->packet_num) {
1450                 multifd_recv_state->packet_num = p->packet_num;
1451             }
1452         }
1453         trace_multifd_recv_sync_main_signal(p->id);
1454         qemu_sem_post(&p->sem_sync);
1455     }
1456     trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
1457 }
1458 
1459 static void *multifd_recv_thread(void *opaque)
1460 {
1461     MultiFDRecvParams *p = opaque;
1462     Error *local_err = NULL;
1463     bool use_packets = multifd_use_packets();
1464     int ret;
1465 
1466     trace_multifd_recv_thread_start(p->id);
1467     rcu_register_thread();
1468 
1469     while (true) {
1470         uint32_t flags = 0;
1471         bool has_data = false;
1472         p->normal_num = 0;
1473 
1474         if (use_packets) {
1475             if (multifd_recv_should_exit()) {
1476                 break;
1477             }
1478 
1479             ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
1480                                            p->packet_len, &local_err);
1481             if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
1482                 break;
1483             }
1484 
1485             qemu_mutex_lock(&p->mutex);
1486             ret = multifd_recv_unfill_packet(p, &local_err);
1487             if (ret) {
1488                 qemu_mutex_unlock(&p->mutex);
1489                 break;
1490             }
1491 
1492             flags = p->flags;
1493             /* recv methods don't know how to handle the SYNC flag */
1494             p->flags &= ~MULTIFD_FLAG_SYNC;
1495             has_data = p->normal_num || p->zero_num;
1496             qemu_mutex_unlock(&p->mutex);
1497         } else {
1498             /*
1499              * No packets, so we need to wait for the vmstate code to
1500              * give us work.
1501              */
1502             qemu_sem_wait(&p->sem);
1503 
1504             if (multifd_recv_should_exit()) {
1505                 break;
1506             }
1507 
1508             /* pairs with qatomic_store_release() at multifd_recv() */
1509             if (!qatomic_load_acquire(&p->pending_job)) {
1510                 /*
1511                  * Migration thread did not send work, this is
1512                  * equivalent to pending_sync on the sending
1513                  * side. Post sem_sync to notify we reached this
1514                  * point.
1515                  */
1516                 qemu_sem_post(&multifd_recv_state->sem_sync);
1517                 continue;
1518             }
1519 
1520             has_data = !!p->data->size;
1521         }
1522 
1523         if (has_data) {
1524             ret = multifd_recv_state->ops->recv(p, &local_err);
1525             if (ret != 0) {
1526                 break;
1527             }
1528         }
1529 
1530         if (use_packets) {
1531             if (flags & MULTIFD_FLAG_SYNC) {
1532                 qemu_sem_post(&multifd_recv_state->sem_sync);
1533                 qemu_sem_wait(&p->sem_sync);
1534             }
1535         } else {
1536             p->total_normal_pages += p->data->size / qemu_target_page_size();
1537             p->data->size = 0;
1538             /*
1539              * Order data->size update before clearing
1540              * pending_job. Pairs with smp_mb_acquire() at
1541              * multifd_recv().
1542              */
1543             qatomic_store_release(&p->pending_job, false);
1544         }
1545     }
1546 
1547     if (local_err) {
1548         multifd_recv_terminate_threads(local_err);
1549         error_free(local_err);
1550     }
1551 
1552     rcu_unregister_thread();
1553     trace_multifd_recv_thread_end(p->id, p->packets_recved,
1554                                   p->total_normal_pages,
1555                                   p->total_zero_pages);
1556 
1557     return NULL;
1558 }
1559 
1560 int multifd_recv_setup(Error **errp)
1561 {
1562     int thread_count;
1563     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
1564     bool use_packets = multifd_use_packets();
1565     uint8_t i;
1566 
1567     /*
1568      * Return successfully if multiFD recv state is already initialised
1569      * or multiFD is not enabled.
1570      */
1571     if (multifd_recv_state || !migrate_multifd()) {
1572         return 0;
1573     }
1574 
1575     thread_count = migrate_multifd_channels();
1576     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
1577     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
1578 
1579     multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
1580     multifd_recv_state->data->size = 0;
1581 
1582     qatomic_set(&multifd_recv_state->count, 0);
1583     qatomic_set(&multifd_recv_state->exiting, 0);
1584     qemu_sem_init(&multifd_recv_state->sem_sync, 0);
1585     multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
1586 
1587     for (i = 0; i < thread_count; i++) {
1588         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1589 
1590         qemu_mutex_init(&p->mutex);
1591         qemu_sem_init(&p->sem_sync, 0);
1592         qemu_sem_init(&p->sem, 0);
1593         p->pending_job = false;
1594         p->id = i;
1595 
1596         p->data = g_new0(MultiFDRecvData, 1);
1597         p->data->size = 0;
1598 
1599         if (use_packets) {
1600             p->packet_len = sizeof(MultiFDPacket_t)
1601                 + sizeof(uint64_t) * page_count;
1602             p->packet = g_malloc0(p->packet_len);
1603         }
1604         p->name = g_strdup_printf("multifdrecv_%d", i);
1605         p->iov = g_new0(struct iovec, page_count);
1606         p->normal = g_new0(ram_addr_t, page_count);
1607         p->zero = g_new0(ram_addr_t, page_count);
1608         p->page_count = page_count;
1609         p->page_size = qemu_target_page_size();
1610     }
1611 
1612     for (i = 0; i < thread_count; i++) {
1613         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1614         int ret;
1615 
1616         ret = multifd_recv_state->ops->recv_setup(p, errp);
1617         if (ret) {
1618             return ret;
1619         }
1620     }
1621     return 0;
1622 }
1623 
1624 bool multifd_recv_all_channels_created(void)
1625 {
1626     int thread_count = migrate_multifd_channels();
1627 
1628     if (!migrate_multifd()) {
1629         return true;
1630     }
1631 
1632     if (!multifd_recv_state) {
1633         /* Called before any connections created */
1634         return false;
1635     }
1636 
1637     return thread_count == qatomic_read(&multifd_recv_state->count);
1638 }
1639 
1640 /*
1641  * Try to receive all multifd channels to get ready for the migration.
1642  * Sets @errp when failing to receive the current channel.
1643  */
1644 void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
1645 {
1646     MultiFDRecvParams *p;
1647     Error *local_err = NULL;
1648     bool use_packets = multifd_use_packets();
1649     int id;
1650 
1651     if (use_packets) {
1652         id = multifd_recv_initial_packet(ioc, &local_err);
1653         if (id < 0) {
1654             multifd_recv_terminate_threads(local_err);
1655             error_propagate_prepend(errp, local_err,
1656                                     "failed to receive packet"
1657                                     " via multifd channel %d: ",
1658                                     qatomic_read(&multifd_recv_state->count));
1659             return;
1660         }
1661         trace_multifd_recv_new_channel(id);
1662     } else {
1663         id = qatomic_read(&multifd_recv_state->count);
1664     }
1665 
1666     p = &multifd_recv_state->params[id];
1667     if (p->c != NULL) {
1668         error_setg(&local_err, "multifd: received id '%d' already setup'",
1669                    id);
1670         multifd_recv_terminate_threads(local_err);
1671         error_propagate(errp, local_err);
1672         return;
1673     }
1674     p->c = ioc;
1675     object_ref(OBJECT(ioc));
1676 
1677     p->thread_created = true;
1678     qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
1679                        QEMU_THREAD_JOINABLE);
1680     qatomic_inc(&multifd_recv_state->count);
1681 }
1682 
1683 bool multifd_send_prepare_common(MultiFDSendParams *p)
1684 {
1685     multifd_send_zero_page_detect(p);
1686 
1687     if (!p->pages->normal_num) {
1688         p->next_packet_size = 0;
1689         return false;
1690     }
1691 
1692     multifd_send_prepare_header(p);
1693 
1694     return true;
1695 }
1696