xref: /openbmc/qemu/migration/multifd.c (revision 646746a1)
1 /*
2  * Multifd common code
3  *
4  * Copyright (c) 2019-2020 Red Hat Inc
5  *
6  * Authors:
7  *  Juan Quintela <quintela@redhat.com>
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2 or later.
10  * See the COPYING file in the top-level directory.
11  */
12 
13 #include "qemu/osdep.h"
14 #include "qemu/cutils.h"
15 #include "qemu/rcu.h"
16 #include "exec/target_page.h"
17 #include "sysemu/sysemu.h"
18 #include "exec/ramblock.h"
19 #include "qemu/error-report.h"
20 #include "qapi/error.h"
21 #include "file.h"
22 #include "migration.h"
23 #include "migration-stats.h"
24 #include "socket.h"
25 #include "tls.h"
26 #include "qemu-file.h"
27 #include "trace.h"
28 #include "multifd.h"
29 #include "threadinfo.h"
30 #include "options.h"
31 #include "qemu/yank.h"
32 #include "io/channel-file.h"
33 #include "io/channel-socket.h"
34 #include "yank_functions.h"
35 
36 /* Multiple fd's */
37 
38 #define MULTIFD_MAGIC 0x11223344U
39 #define MULTIFD_VERSION 1
40 
41 typedef struct {
42     uint32_t magic;
43     uint32_t version;
44     unsigned char uuid[16]; /* QemuUUID */
45     uint8_t id;
46     uint8_t unused1[7];     /* Reserved for future use */
47     uint64_t unused2[4];    /* Reserved for future use */
48 } __attribute__((packed)) MultiFDInit_t;
49 
50 struct {
51     MultiFDSendParams *params;
52     /* array of pages to sent */
53     MultiFDPages_t *pages;
54     /*
55      * Global number of generated multifd packets.
56      *
57      * Note that we used 'uintptr_t' because it'll naturally support atomic
58      * operations on both 32bit / 64 bits hosts.  It means on 32bit systems
59      * multifd will overflow the packet_num easier, but that should be
60      * fine.
61      *
62      * Another option is to use QEMU's Stat64 then it'll be 64 bits on all
63      * hosts, however so far it does not support atomic fetch_add() yet.
64      * Make it easy for now.
65      */
66     uintptr_t packet_num;
67     /*
68      * Synchronization point past which no more channels will be
69      * created.
70      */
71     QemuSemaphore channels_created;
72     /* send channels ready */
73     QemuSemaphore channels_ready;
74     /*
75      * Have we already run terminate threads.  There is a race when it
76      * happens that we got one error while we are exiting.
77      * We will use atomic operations.  Only valid values are 0 and 1.
78      */
79     int exiting;
80     /* multifd ops */
81     MultiFDMethods *ops;
82 } *multifd_send_state;
83 
84 struct {
85     MultiFDRecvParams *params;
86     MultiFDRecvData *data;
87     /* number of created threads */
88     int count;
89     /*
90      * This is always posted by the recv threads, the migration thread
91      * uses it to wait for recv threads to finish assigned tasks.
92      */
93     QemuSemaphore sem_sync;
94     /* global number of generated multifd packets */
95     uint64_t packet_num;
96     int exiting;
97     /* multifd ops */
98     MultiFDMethods *ops;
99 } *multifd_recv_state;
100 
101 static bool multifd_use_packets(void)
102 {
103     return !migrate_mapped_ram();
104 }
105 
106 void multifd_send_channel_created(void)
107 {
108     qemu_sem_post(&multifd_send_state->channels_created);
109 }
110 
111 static void multifd_set_file_bitmap(MultiFDSendParams *p)
112 {
113     MultiFDPages_t *pages = p->pages;
114 
115     assert(pages->block);
116 
117     for (int i = 0; i < p->pages->normal_num; i++) {
118         ramblock_set_file_bmap_atomic(pages->block, pages->offset[i], true);
119     }
120 
121     for (int i = p->pages->normal_num; i < p->pages->num; i++) {
122         ramblock_set_file_bmap_atomic(pages->block, pages->offset[i], false);
123     }
124 }
125 
126 /* Multifd without compression */
127 
128 /**
129  * nocomp_send_setup: setup send side
130  *
131  * @p: Params for the channel that we are using
132  * @errp: pointer to an error
133  */
134 static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
135 {
136     if (migrate_zero_copy_send()) {
137         p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
138     }
139 
140     if (multifd_use_packets()) {
141         /* We need one extra place for the packet header */
142         p->iov = g_new0(struct iovec, p->page_count + 1);
143     } else {
144         p->iov = g_new0(struct iovec, p->page_count);
145     }
146 
147     return 0;
148 }
149 
150 /**
151  * nocomp_send_cleanup: cleanup send side
152  *
153  * For no compression this function does nothing.
154  *
155  * @p: Params for the channel that we are using
156  * @errp: pointer to an error
157  */
158 static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
159 {
160     g_free(p->iov);
161     p->iov = NULL;
162     return;
163 }
164 
165 static void multifd_send_prepare_iovs(MultiFDSendParams *p)
166 {
167     MultiFDPages_t *pages = p->pages;
168 
169     for (int i = 0; i < pages->normal_num; i++) {
170         p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
171         p->iov[p->iovs_num].iov_len = p->page_size;
172         p->iovs_num++;
173     }
174 
175     p->next_packet_size = pages->normal_num * p->page_size;
176 }
177 
178 /**
179  * nocomp_send_prepare: prepare date to be able to send
180  *
181  * For no compression we just have to calculate the size of the
182  * packet.
183  *
184  * Returns 0 for success or -1 for error
185  *
186  * @p: Params for the channel that we are using
187  * @errp: pointer to an error
188  */
189 static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
190 {
191     bool use_zero_copy_send = migrate_zero_copy_send();
192     int ret;
193 
194     multifd_send_zero_page_detect(p);
195 
196     if (!multifd_use_packets()) {
197         multifd_send_prepare_iovs(p);
198         multifd_set_file_bitmap(p);
199 
200         return 0;
201     }
202 
203     if (!use_zero_copy_send) {
204         /*
205          * Only !zerocopy needs the header in IOV; zerocopy will
206          * send it separately.
207          */
208         multifd_send_prepare_header(p);
209     }
210 
211     multifd_send_prepare_iovs(p);
212     p->flags |= MULTIFD_FLAG_NOCOMP;
213 
214     multifd_send_fill_packet(p);
215 
216     if (use_zero_copy_send) {
217         /* Send header first, without zerocopy */
218         ret = qio_channel_write_all(p->c, (void *)p->packet,
219                                     p->packet_len, errp);
220         if (ret != 0) {
221             return -1;
222         }
223     }
224 
225     return 0;
226 }
227 
228 /**
229  * nocomp_recv_setup: setup receive side
230  *
231  * For no compression this function does nothing.
232  *
233  * Returns 0 for success or -1 for error
234  *
235  * @p: Params for the channel that we are using
236  * @errp: pointer to an error
237  */
238 static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
239 {
240     p->iov = g_new0(struct iovec, p->page_count);
241     return 0;
242 }
243 
244 /**
245  * nocomp_recv_cleanup: setup receive side
246  *
247  * For no compression this function does nothing.
248  *
249  * @p: Params for the channel that we are using
250  */
251 static void nocomp_recv_cleanup(MultiFDRecvParams *p)
252 {
253     g_free(p->iov);
254     p->iov = NULL;
255 }
256 
257 /**
258  * nocomp_recv: read the data from the channel
259  *
260  * For no compression we just need to read things into the correct place.
261  *
262  * Returns 0 for success or -1 for error
263  *
264  * @p: Params for the channel that we are using
265  * @errp: pointer to an error
266  */
267 static int nocomp_recv(MultiFDRecvParams *p, Error **errp)
268 {
269     uint32_t flags;
270 
271     if (!multifd_use_packets()) {
272         return multifd_file_recv_data(p, errp);
273     }
274 
275     flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
276 
277     if (flags != MULTIFD_FLAG_NOCOMP) {
278         error_setg(errp, "multifd %u: flags received %x flags expected %x",
279                    p->id, flags, MULTIFD_FLAG_NOCOMP);
280         return -1;
281     }
282 
283     multifd_recv_zero_page_process(p);
284 
285     if (!p->normal_num) {
286         return 0;
287     }
288 
289     for (int i = 0; i < p->normal_num; i++) {
290         p->iov[i].iov_base = p->host + p->normal[i];
291         p->iov[i].iov_len = p->page_size;
292         ramblock_recv_bitmap_set_offset(p->block, p->normal[i]);
293     }
294     return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
295 }
296 
297 static MultiFDMethods multifd_nocomp_ops = {
298     .send_setup = nocomp_send_setup,
299     .send_cleanup = nocomp_send_cleanup,
300     .send_prepare = nocomp_send_prepare,
301     .recv_setup = nocomp_recv_setup,
302     .recv_cleanup = nocomp_recv_cleanup,
303     .recv = nocomp_recv
304 };
305 
306 static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
307     [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
308 };
309 
310 void multifd_register_ops(int method, MultiFDMethods *ops)
311 {
312     assert(0 < method && method < MULTIFD_COMPRESSION__MAX);
313     multifd_ops[method] = ops;
314 }
315 
316 /* Reset a MultiFDPages_t* object for the next use */
317 static void multifd_pages_reset(MultiFDPages_t *pages)
318 {
319     /*
320      * We don't need to touch offset[] array, because it will be
321      * overwritten later when reused.
322      */
323     pages->num = 0;
324     pages->normal_num = 0;
325     pages->block = NULL;
326 }
327 
328 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
329 {
330     MultiFDInit_t msg = {};
331     size_t size = sizeof(msg);
332     int ret;
333 
334     msg.magic = cpu_to_be32(MULTIFD_MAGIC);
335     msg.version = cpu_to_be32(MULTIFD_VERSION);
336     msg.id = p->id;
337     memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
338 
339     ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
340     if (ret != 0) {
341         return -1;
342     }
343     stat64_add(&mig_stats.multifd_bytes, size);
344     return 0;
345 }
346 
347 static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
348 {
349     MultiFDInit_t msg;
350     int ret;
351 
352     ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
353     if (ret != 0) {
354         return -1;
355     }
356 
357     msg.magic = be32_to_cpu(msg.magic);
358     msg.version = be32_to_cpu(msg.version);
359 
360     if (msg.magic != MULTIFD_MAGIC) {
361         error_setg(errp, "multifd: received packet magic %x "
362                    "expected %x", msg.magic, MULTIFD_MAGIC);
363         return -1;
364     }
365 
366     if (msg.version != MULTIFD_VERSION) {
367         error_setg(errp, "multifd: received packet version %u "
368                    "expected %u", msg.version, MULTIFD_VERSION);
369         return -1;
370     }
371 
372     if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
373         char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
374         char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
375 
376         error_setg(errp, "multifd: received uuid '%s' and expected "
377                    "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
378         g_free(uuid);
379         g_free(msg_uuid);
380         return -1;
381     }
382 
383     if (msg.id > migrate_multifd_channels()) {
384         error_setg(errp, "multifd: received channel id %u is greater than "
385                    "number of channels %u", msg.id, migrate_multifd_channels());
386         return -1;
387     }
388 
389     return msg.id;
390 }
391 
392 static MultiFDPages_t *multifd_pages_init(uint32_t n)
393 {
394     MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
395 
396     pages->allocated = n;
397     pages->offset = g_new0(ram_addr_t, n);
398 
399     return pages;
400 }
401 
402 static void multifd_pages_clear(MultiFDPages_t *pages)
403 {
404     multifd_pages_reset(pages);
405     pages->allocated = 0;
406     g_free(pages->offset);
407     pages->offset = NULL;
408     g_free(pages);
409 }
410 
411 void multifd_send_fill_packet(MultiFDSendParams *p)
412 {
413     MultiFDPacket_t *packet = p->packet;
414     MultiFDPages_t *pages = p->pages;
415     uint64_t packet_num;
416     uint32_t zero_num = pages->num - pages->normal_num;
417     int i;
418 
419     packet->flags = cpu_to_be32(p->flags);
420     packet->pages_alloc = cpu_to_be32(p->pages->allocated);
421     packet->normal_pages = cpu_to_be32(pages->normal_num);
422     packet->zero_pages = cpu_to_be32(zero_num);
423     packet->next_packet_size = cpu_to_be32(p->next_packet_size);
424 
425     packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
426     packet->packet_num = cpu_to_be64(packet_num);
427 
428     if (pages->block) {
429         strncpy(packet->ramblock, pages->block->idstr, 256);
430     }
431 
432     for (i = 0; i < pages->num; i++) {
433         /* there are architectures where ram_addr_t is 32 bit */
434         uint64_t temp = pages->offset[i];
435 
436         packet->offset[i] = cpu_to_be64(temp);
437     }
438 
439     p->packets_sent++;
440     p->total_normal_pages += pages->normal_num;
441     p->total_zero_pages += zero_num;
442 
443     trace_multifd_send(p->id, packet_num, pages->normal_num, zero_num,
444                        p->flags, p->next_packet_size);
445 }
446 
447 static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
448 {
449     MultiFDPacket_t *packet = p->packet;
450     int i;
451 
452     packet->magic = be32_to_cpu(packet->magic);
453     if (packet->magic != MULTIFD_MAGIC) {
454         error_setg(errp, "multifd: received packet "
455                    "magic %x and expected magic %x",
456                    packet->magic, MULTIFD_MAGIC);
457         return -1;
458     }
459 
460     packet->version = be32_to_cpu(packet->version);
461     if (packet->version != MULTIFD_VERSION) {
462         error_setg(errp, "multifd: received packet "
463                    "version %u and expected version %u",
464                    packet->version, MULTIFD_VERSION);
465         return -1;
466     }
467 
468     p->flags = be32_to_cpu(packet->flags);
469 
470     packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
471     /*
472      * If we received a packet that is 100 times bigger than expected
473      * just stop migration.  It is a magic number.
474      */
475     if (packet->pages_alloc > p->page_count) {
476         error_setg(errp, "multifd: received packet "
477                    "with size %u and expected a size of %u",
478                    packet->pages_alloc, p->page_count) ;
479         return -1;
480     }
481 
482     p->normal_num = be32_to_cpu(packet->normal_pages);
483     if (p->normal_num > packet->pages_alloc) {
484         error_setg(errp, "multifd: received packet "
485                    "with %u normal pages and expected maximum pages are %u",
486                    p->normal_num, packet->pages_alloc) ;
487         return -1;
488     }
489 
490     p->zero_num = be32_to_cpu(packet->zero_pages);
491     if (p->zero_num > packet->pages_alloc - p->normal_num) {
492         error_setg(errp, "multifd: received packet "
493                    "with %u zero pages and expected maximum zero pages are %u",
494                    p->zero_num, packet->pages_alloc - p->normal_num) ;
495         return -1;
496     }
497 
498     p->next_packet_size = be32_to_cpu(packet->next_packet_size);
499     p->packet_num = be64_to_cpu(packet->packet_num);
500     p->packets_recved++;
501     p->total_normal_pages += p->normal_num;
502     p->total_zero_pages += p->zero_num;
503 
504     trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->zero_num,
505                        p->flags, p->next_packet_size);
506 
507     if (p->normal_num == 0 && p->zero_num == 0) {
508         return 0;
509     }
510 
511     /* make sure that ramblock is 0 terminated */
512     packet->ramblock[255] = 0;
513     p->block = qemu_ram_block_by_name(packet->ramblock);
514     if (!p->block) {
515         error_setg(errp, "multifd: unknown ram block %s",
516                    packet->ramblock);
517         return -1;
518     }
519 
520     p->host = p->block->host;
521     for (i = 0; i < p->normal_num; i++) {
522         uint64_t offset = be64_to_cpu(packet->offset[i]);
523 
524         if (offset > (p->block->used_length - p->page_size)) {
525             error_setg(errp, "multifd: offset too long %" PRIu64
526                        " (max " RAM_ADDR_FMT ")",
527                        offset, p->block->used_length);
528             return -1;
529         }
530         p->normal[i] = offset;
531     }
532 
533     for (i = 0; i < p->zero_num; i++) {
534         uint64_t offset = be64_to_cpu(packet->offset[p->normal_num + i]);
535 
536         if (offset > (p->block->used_length - p->page_size)) {
537             error_setg(errp, "multifd: offset too long %" PRIu64
538                        " (max " RAM_ADDR_FMT ")",
539                        offset, p->block->used_length);
540             return -1;
541         }
542         p->zero[i] = offset;
543     }
544 
545     return 0;
546 }
547 
548 static bool multifd_send_should_exit(void)
549 {
550     return qatomic_read(&multifd_send_state->exiting);
551 }
552 
553 static bool multifd_recv_should_exit(void)
554 {
555     return qatomic_read(&multifd_recv_state->exiting);
556 }
557 
558 /*
559  * The migration thread can wait on either of the two semaphores.  This
560  * function can be used to kick the main thread out of waiting on either of
561  * them.  Should mostly only be called when something wrong happened with
562  * the current multifd send thread.
563  */
564 static void multifd_send_kick_main(MultiFDSendParams *p)
565 {
566     qemu_sem_post(&p->sem_sync);
567     qemu_sem_post(&multifd_send_state->channels_ready);
568 }
569 
570 /*
571  * How we use multifd_send_state->pages and channel->pages?
572  *
573  * We create a pages for each channel, and a main one.  Each time that
574  * we need to send a batch of pages we interchange the ones between
575  * multifd_send_state and the channel that is sending it.  There are
576  * two reasons for that:
577  *    - to not have to do so many mallocs during migration
578  *    - to make easier to know what to free at the end of migration
579  *
580  * This way we always know who is the owner of each "pages" struct,
581  * and we don't need any locking.  It belongs to the migration thread
582  * or to the channel thread.  Switching is safe because the migration
583  * thread is using the channel mutex when changing it, and the channel
584  * have to had finish with its own, otherwise pending_job can't be
585  * false.
586  *
587  * Returns true if succeed, false otherwise.
588  */
589 static bool multifd_send_pages(void)
590 {
591     int i;
592     static int next_channel;
593     MultiFDSendParams *p = NULL; /* make happy gcc */
594     MultiFDPages_t *pages = multifd_send_state->pages;
595 
596     if (multifd_send_should_exit()) {
597         return false;
598     }
599 
600     /* We wait here, until at least one channel is ready */
601     qemu_sem_wait(&multifd_send_state->channels_ready);
602 
603     /*
604      * next_channel can remain from a previous migration that was
605      * using more channels, so ensure it doesn't overflow if the
606      * limit is lower now.
607      */
608     next_channel %= migrate_multifd_channels();
609     for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
610         if (multifd_send_should_exit()) {
611             return false;
612         }
613         p = &multifd_send_state->params[i];
614         /*
615          * Lockless read to p->pending_job is safe, because only multifd
616          * sender thread can clear it.
617          */
618         if (qatomic_read(&p->pending_job) == false) {
619             next_channel = (i + 1) % migrate_multifd_channels();
620             break;
621         }
622     }
623 
624     /*
625      * Make sure we read p->pending_job before all the rest.  Pairs with
626      * qatomic_store_release() in multifd_send_thread().
627      */
628     smp_mb_acquire();
629     assert(!p->pages->num);
630     multifd_send_state->pages = p->pages;
631     p->pages = pages;
632     /*
633      * Making sure p->pages is setup before marking pending_job=true. Pairs
634      * with the qatomic_load_acquire() in multifd_send_thread().
635      */
636     qatomic_store_release(&p->pending_job, true);
637     qemu_sem_post(&p->sem);
638 
639     return true;
640 }
641 
642 static inline bool multifd_queue_empty(MultiFDPages_t *pages)
643 {
644     return pages->num == 0;
645 }
646 
647 static inline bool multifd_queue_full(MultiFDPages_t *pages)
648 {
649     return pages->num == pages->allocated;
650 }
651 
652 static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset)
653 {
654     pages->offset[pages->num++] = offset;
655 }
656 
657 /* Returns true if enqueue successful, false otherwise */
658 bool multifd_queue_page(RAMBlock *block, ram_addr_t offset)
659 {
660     MultiFDPages_t *pages;
661 
662 retry:
663     pages = multifd_send_state->pages;
664 
665     /* If the queue is empty, we can already enqueue now */
666     if (multifd_queue_empty(pages)) {
667         pages->block = block;
668         multifd_enqueue(pages, offset);
669         return true;
670     }
671 
672     /*
673      * Not empty, meanwhile we need a flush.  It can because of either:
674      *
675      * (1) The page is not on the same ramblock of previous ones, or,
676      * (2) The queue is full.
677      *
678      * After flush, always retry.
679      */
680     if (pages->block != block || multifd_queue_full(pages)) {
681         if (!multifd_send_pages()) {
682             return false;
683         }
684         goto retry;
685     }
686 
687     /* Not empty, and we still have space, do it! */
688     multifd_enqueue(pages, offset);
689     return true;
690 }
691 
692 /* Multifd send side hit an error; remember it and prepare to quit */
693 static void multifd_send_set_error(Error *err)
694 {
695     /*
696      * We don't want to exit each threads twice.  Depending on where
697      * we get the error, or if there are two independent errors in two
698      * threads at the same time, we can end calling this function
699      * twice.
700      */
701     if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
702         return;
703     }
704 
705     if (err) {
706         MigrationState *s = migrate_get_current();
707         migrate_set_error(s, err);
708         if (s->state == MIGRATION_STATUS_SETUP ||
709             s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
710             s->state == MIGRATION_STATUS_DEVICE ||
711             s->state == MIGRATION_STATUS_ACTIVE) {
712             migrate_set_state(&s->state, s->state,
713                               MIGRATION_STATUS_FAILED);
714         }
715     }
716 }
717 
718 static void multifd_send_terminate_threads(void)
719 {
720     int i;
721 
722     trace_multifd_send_terminate_threads();
723 
724     /*
725      * Tell everyone we're quitting.  No xchg() needed here; we simply
726      * always set it.
727      */
728     qatomic_set(&multifd_send_state->exiting, 1);
729 
730     /*
731      * Firstly, kick all threads out; no matter whether they are just idle,
732      * or blocked in an IO system call.
733      */
734     for (i = 0; i < migrate_multifd_channels(); i++) {
735         MultiFDSendParams *p = &multifd_send_state->params[i];
736 
737         qemu_sem_post(&p->sem);
738         if (p->c) {
739             qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
740         }
741     }
742 
743     /*
744      * Finally recycle all the threads.
745      */
746     for (i = 0; i < migrate_multifd_channels(); i++) {
747         MultiFDSendParams *p = &multifd_send_state->params[i];
748 
749         if (p->tls_thread_created) {
750             qemu_thread_join(&p->tls_thread);
751         }
752 
753         if (p->thread_created) {
754             qemu_thread_join(&p->thread);
755         }
756     }
757 }
758 
759 static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
760 {
761     if (p->c) {
762         migration_ioc_unregister_yank(p->c);
763         /*
764          * The object_unref() cannot guarantee the fd will always be
765          * released because finalize() of the iochannel is only
766          * triggered on the last reference and it's not guaranteed
767          * that we always hold the last refcount when reaching here.
768          *
769          * Closing the fd explicitly has the benefit that if there is any
770          * registered I/O handler callbacks on such fd, that will get a
771          * POLLNVAL event and will further trigger the cleanup to finally
772          * release the IOC.
773          *
774          * FIXME: It should logically be guaranteed that all multifd
775          * channels have no I/O handler callback registered when reaching
776          * here, because migration thread will wait for all multifd channel
777          * establishments to complete during setup.  Since
778          * migrate_fd_cleanup() will be scheduled in main thread too, all
779          * previous callbacks should guarantee to be completed when
780          * reaching here.  See multifd_send_state.channels_created and its
781          * usage.  In the future, we could replace this with an assert
782          * making sure we're the last reference, or simply drop it if above
783          * is more clear to be justified.
784          */
785         qio_channel_close(p->c, &error_abort);
786         object_unref(OBJECT(p->c));
787         p->c = NULL;
788     }
789     qemu_sem_destroy(&p->sem);
790     qemu_sem_destroy(&p->sem_sync);
791     g_free(p->name);
792     p->name = NULL;
793     multifd_pages_clear(p->pages);
794     p->pages = NULL;
795     p->packet_len = 0;
796     g_free(p->packet);
797     p->packet = NULL;
798     multifd_send_state->ops->send_cleanup(p, errp);
799 
800     return *errp == NULL;
801 }
802 
803 static void multifd_send_cleanup_state(void)
804 {
805     file_cleanup_outgoing_migration();
806     socket_cleanup_outgoing_migration();
807     qemu_sem_destroy(&multifd_send_state->channels_created);
808     qemu_sem_destroy(&multifd_send_state->channels_ready);
809     g_free(multifd_send_state->params);
810     multifd_send_state->params = NULL;
811     multifd_pages_clear(multifd_send_state->pages);
812     multifd_send_state->pages = NULL;
813     g_free(multifd_send_state);
814     multifd_send_state = NULL;
815 }
816 
817 void multifd_send_shutdown(void)
818 {
819     int i;
820 
821     if (!migrate_multifd()) {
822         return;
823     }
824 
825     multifd_send_terminate_threads();
826 
827     for (i = 0; i < migrate_multifd_channels(); i++) {
828         MultiFDSendParams *p = &multifd_send_state->params[i];
829         Error *local_err = NULL;
830 
831         if (!multifd_send_cleanup_channel(p, &local_err)) {
832             migrate_set_error(migrate_get_current(), local_err);
833             error_free(local_err);
834         }
835     }
836 
837     multifd_send_cleanup_state();
838 }
839 
840 static int multifd_zero_copy_flush(QIOChannel *c)
841 {
842     int ret;
843     Error *err = NULL;
844 
845     ret = qio_channel_flush(c, &err);
846     if (ret < 0) {
847         error_report_err(err);
848         return -1;
849     }
850     if (ret == 1) {
851         stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
852     }
853 
854     return ret;
855 }
856 
857 int multifd_send_sync_main(void)
858 {
859     int i;
860     bool flush_zero_copy;
861 
862     if (!migrate_multifd()) {
863         return 0;
864     }
865     if (multifd_send_state->pages->num) {
866         if (!multifd_send_pages()) {
867             error_report("%s: multifd_send_pages fail", __func__);
868             return -1;
869         }
870     }
871 
872     flush_zero_copy = migrate_zero_copy_send();
873 
874     for (i = 0; i < migrate_multifd_channels(); i++) {
875         MultiFDSendParams *p = &multifd_send_state->params[i];
876 
877         if (multifd_send_should_exit()) {
878             return -1;
879         }
880 
881         trace_multifd_send_sync_main_signal(p->id);
882 
883         /*
884          * We should be the only user so far, so not possible to be set by
885          * others concurrently.
886          */
887         assert(qatomic_read(&p->pending_sync) == false);
888         qatomic_set(&p->pending_sync, true);
889         qemu_sem_post(&p->sem);
890     }
891     for (i = 0; i < migrate_multifd_channels(); i++) {
892         MultiFDSendParams *p = &multifd_send_state->params[i];
893 
894         if (multifd_send_should_exit()) {
895             return -1;
896         }
897 
898         qemu_sem_wait(&multifd_send_state->channels_ready);
899         trace_multifd_send_sync_main_wait(p->id);
900         qemu_sem_wait(&p->sem_sync);
901 
902         if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
903             return -1;
904         }
905     }
906     trace_multifd_send_sync_main(multifd_send_state->packet_num);
907 
908     return 0;
909 }
910 
911 static void *multifd_send_thread(void *opaque)
912 {
913     MultiFDSendParams *p = opaque;
914     MigrationThread *thread = NULL;
915     Error *local_err = NULL;
916     int ret = 0;
917     bool use_packets = multifd_use_packets();
918 
919     thread = migration_threads_add(p->name, qemu_get_thread_id());
920 
921     trace_multifd_send_thread_start(p->id);
922     rcu_register_thread();
923 
924     if (use_packets) {
925         if (multifd_send_initial_packet(p, &local_err) < 0) {
926             ret = -1;
927             goto out;
928         }
929     }
930 
931     while (true) {
932         qemu_sem_post(&multifd_send_state->channels_ready);
933         qemu_sem_wait(&p->sem);
934 
935         if (multifd_send_should_exit()) {
936             break;
937         }
938 
939         /*
940          * Read pending_job flag before p->pages.  Pairs with the
941          * qatomic_store_release() in multifd_send_pages().
942          */
943         if (qatomic_load_acquire(&p->pending_job)) {
944             MultiFDPages_t *pages = p->pages;
945 
946             p->iovs_num = 0;
947             assert(pages->num);
948 
949             ret = multifd_send_state->ops->send_prepare(p, &local_err);
950             if (ret != 0) {
951                 break;
952             }
953 
954             if (migrate_mapped_ram()) {
955                 ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
956                                               p->pages->block, &local_err);
957             } else {
958                 ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
959                                                   NULL, 0, p->write_flags,
960                                                   &local_err);
961             }
962 
963             if (ret != 0) {
964                 break;
965             }
966 
967             stat64_add(&mig_stats.multifd_bytes,
968                        p->next_packet_size + p->packet_len);
969             stat64_add(&mig_stats.normal_pages, pages->normal_num);
970             stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num);
971 
972             multifd_pages_reset(p->pages);
973             p->next_packet_size = 0;
974 
975             /*
976              * Making sure p->pages is published before saying "we're
977              * free".  Pairs with the smp_mb_acquire() in
978              * multifd_send_pages().
979              */
980             qatomic_store_release(&p->pending_job, false);
981         } else {
982             /*
983              * If not a normal job, must be a sync request.  Note that
984              * pending_sync is a standalone flag (unlike pending_job), so
985              * it doesn't require explicit memory barriers.
986              */
987             assert(qatomic_read(&p->pending_sync));
988 
989             if (use_packets) {
990                 p->flags = MULTIFD_FLAG_SYNC;
991                 multifd_send_fill_packet(p);
992                 ret = qio_channel_write_all(p->c, (void *)p->packet,
993                                             p->packet_len, &local_err);
994                 if (ret != 0) {
995                     break;
996                 }
997                 /* p->next_packet_size will always be zero for a SYNC packet */
998                 stat64_add(&mig_stats.multifd_bytes, p->packet_len);
999                 p->flags = 0;
1000             }
1001 
1002             qatomic_set(&p->pending_sync, false);
1003             qemu_sem_post(&p->sem_sync);
1004         }
1005     }
1006 
1007 out:
1008     if (ret) {
1009         assert(local_err);
1010         trace_multifd_send_error(p->id);
1011         multifd_send_set_error(local_err);
1012         multifd_send_kick_main(p);
1013         error_free(local_err);
1014     }
1015 
1016     rcu_unregister_thread();
1017     migration_threads_remove(thread);
1018     trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages,
1019                                   p->total_zero_pages);
1020 
1021     return NULL;
1022 }
1023 
1024 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);
1025 
1026 typedef struct {
1027     MultiFDSendParams *p;
1028     QIOChannelTLS *tioc;
1029 } MultiFDTLSThreadArgs;
1030 
1031 static void *multifd_tls_handshake_thread(void *opaque)
1032 {
1033     MultiFDTLSThreadArgs *args = opaque;
1034 
1035     qio_channel_tls_handshake(args->tioc,
1036                               multifd_new_send_channel_async,
1037                               args->p,
1038                               NULL,
1039                               NULL);
1040     g_free(args);
1041 
1042     return NULL;
1043 }
1044 
1045 static bool multifd_tls_channel_connect(MultiFDSendParams *p,
1046                                         QIOChannel *ioc,
1047                                         Error **errp)
1048 {
1049     MigrationState *s = migrate_get_current();
1050     const char *hostname = s->hostname;
1051     MultiFDTLSThreadArgs *args;
1052     QIOChannelTLS *tioc;
1053 
1054     tioc = migration_tls_client_create(ioc, hostname, errp);
1055     if (!tioc) {
1056         return false;
1057     }
1058 
1059     /*
1060      * Ownership of the socket channel now transfers to the newly
1061      * created TLS channel, which has already taken a reference.
1062      */
1063     object_unref(OBJECT(ioc));
1064     trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
1065     qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");
1066 
1067     args = g_new0(MultiFDTLSThreadArgs, 1);
1068     args->tioc = tioc;
1069     args->p = p;
1070 
1071     p->tls_thread_created = true;
1072     qemu_thread_create(&p->tls_thread, "mig/src/tls",
1073                        multifd_tls_handshake_thread, args,
1074                        QEMU_THREAD_JOINABLE);
1075     return true;
1076 }
1077 
1078 void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
1079 {
1080     qio_channel_set_delay(ioc, false);
1081 
1082     migration_ioc_register_yank(ioc);
1083     /* Setup p->c only if the channel is completely setup */
1084     p->c = ioc;
1085 
1086     p->thread_created = true;
1087     qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
1088                        QEMU_THREAD_JOINABLE);
1089 }
1090 
1091 /*
1092  * When TLS is enabled this function is called once to establish the
1093  * TLS connection and a second time after the TLS handshake to create
1094  * the multifd channel. Without TLS it goes straight into the channel
1095  * creation.
1096  */
1097 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
1098 {
1099     MultiFDSendParams *p = opaque;
1100     QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
1101     Error *local_err = NULL;
1102     bool ret;
1103 
1104     trace_multifd_new_send_channel_async(p->id);
1105 
1106     if (qio_task_propagate_error(task, &local_err)) {
1107         ret = false;
1108         goto out;
1109     }
1110 
1111     trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
1112                                        migrate_get_current()->hostname);
1113 
1114     if (migrate_channel_requires_tls_upgrade(ioc)) {
1115         ret = multifd_tls_channel_connect(p, ioc, &local_err);
1116         if (ret) {
1117             return;
1118         }
1119     } else {
1120         multifd_channel_connect(p, ioc);
1121         ret = true;
1122     }
1123 
1124 out:
1125     /*
1126      * Here we're not interested whether creation succeeded, only that
1127      * it happened at all.
1128      */
1129     multifd_send_channel_created();
1130 
1131     if (ret) {
1132         return;
1133     }
1134 
1135     trace_multifd_new_send_channel_async_error(p->id, local_err);
1136     multifd_send_set_error(local_err);
1137     /*
1138      * For error cases (TLS or non-TLS), IO channel is always freed here
1139      * rather than when cleanup multifd: since p->c is not set, multifd
1140      * cleanup code doesn't even know its existence.
1141      */
1142     object_unref(OBJECT(ioc));
1143     error_free(local_err);
1144 }
1145 
1146 static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
1147 {
1148     if (!multifd_use_packets()) {
1149         return file_send_channel_create(opaque, errp);
1150     }
1151 
1152     socket_send_channel_create(multifd_new_send_channel_async, opaque);
1153     return true;
1154 }
1155 
1156 bool multifd_send_setup(void)
1157 {
1158     MigrationState *s = migrate_get_current();
1159     int thread_count, ret = 0;
1160     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
1161     bool use_packets = multifd_use_packets();
1162     uint8_t i;
1163 
1164     if (!migrate_multifd()) {
1165         return true;
1166     }
1167 
1168     thread_count = migrate_multifd_channels();
1169     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
1170     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
1171     multifd_send_state->pages = multifd_pages_init(page_count);
1172     qemu_sem_init(&multifd_send_state->channels_created, 0);
1173     qemu_sem_init(&multifd_send_state->channels_ready, 0);
1174     qatomic_set(&multifd_send_state->exiting, 0);
1175     multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
1176 
1177     for (i = 0; i < thread_count; i++) {
1178         MultiFDSendParams *p = &multifd_send_state->params[i];
1179         Error *local_err = NULL;
1180 
1181         qemu_sem_init(&p->sem, 0);
1182         qemu_sem_init(&p->sem_sync, 0);
1183         p->id = i;
1184         p->pages = multifd_pages_init(page_count);
1185 
1186         if (use_packets) {
1187             p->packet_len = sizeof(MultiFDPacket_t)
1188                           + sizeof(uint64_t) * page_count;
1189             p->packet = g_malloc0(p->packet_len);
1190             p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
1191             p->packet->version = cpu_to_be32(MULTIFD_VERSION);
1192         }
1193         p->name = g_strdup_printf("mig/src/send_%d", i);
1194         p->page_size = qemu_target_page_size();
1195         p->page_count = page_count;
1196         p->write_flags = 0;
1197 
1198         if (!multifd_new_send_channel_create(p, &local_err)) {
1199             migrate_set_error(s, local_err);
1200             ret = -1;
1201         }
1202     }
1203 
1204     /*
1205      * Wait until channel creation has started for all channels. The
1206      * creation can still fail, but no more channels will be created
1207      * past this point.
1208      */
1209     for (i = 0; i < thread_count; i++) {
1210         qemu_sem_wait(&multifd_send_state->channels_created);
1211     }
1212 
1213     if (ret) {
1214         goto err;
1215     }
1216 
1217     for (i = 0; i < thread_count; i++) {
1218         MultiFDSendParams *p = &multifd_send_state->params[i];
1219         Error *local_err = NULL;
1220 
1221         ret = multifd_send_state->ops->send_setup(p, &local_err);
1222         if (ret) {
1223             migrate_set_error(s, local_err);
1224             goto err;
1225         }
1226     }
1227 
1228     return true;
1229 
1230 err:
1231     migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
1232                       MIGRATION_STATUS_FAILED);
1233     return false;
1234 }
1235 
1236 bool multifd_recv(void)
1237 {
1238     int i;
1239     static int next_recv_channel;
1240     MultiFDRecvParams *p = NULL;
1241     MultiFDRecvData *data = multifd_recv_state->data;
1242 
1243     /*
1244      * next_channel can remain from a previous migration that was
1245      * using more channels, so ensure it doesn't overflow if the
1246      * limit is lower now.
1247      */
1248     next_recv_channel %= migrate_multifd_channels();
1249     for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
1250         if (multifd_recv_should_exit()) {
1251             return false;
1252         }
1253 
1254         p = &multifd_recv_state->params[i];
1255 
1256         if (qatomic_read(&p->pending_job) == false) {
1257             next_recv_channel = (i + 1) % migrate_multifd_channels();
1258             break;
1259         }
1260     }
1261 
1262     /*
1263      * Order pending_job read before manipulating p->data below. Pairs
1264      * with qatomic_store_release() at multifd_recv_thread().
1265      */
1266     smp_mb_acquire();
1267 
1268     assert(!p->data->size);
1269     multifd_recv_state->data = p->data;
1270     p->data = data;
1271 
1272     /*
1273      * Order p->data update before setting pending_job. Pairs with
1274      * qatomic_load_acquire() at multifd_recv_thread().
1275      */
1276     qatomic_store_release(&p->pending_job, true);
1277     qemu_sem_post(&p->sem);
1278 
1279     return true;
1280 }
1281 
1282 MultiFDRecvData *multifd_get_recv_data(void)
1283 {
1284     return multifd_recv_state->data;
1285 }
1286 
1287 static void multifd_recv_terminate_threads(Error *err)
1288 {
1289     int i;
1290 
1291     trace_multifd_recv_terminate_threads(err != NULL);
1292 
1293     if (qatomic_xchg(&multifd_recv_state->exiting, 1)) {
1294         return;
1295     }
1296 
1297     if (err) {
1298         MigrationState *s = migrate_get_current();
1299         migrate_set_error(s, err);
1300         if (s->state == MIGRATION_STATUS_SETUP ||
1301             s->state == MIGRATION_STATUS_ACTIVE) {
1302             migrate_set_state(&s->state, s->state,
1303                               MIGRATION_STATUS_FAILED);
1304         }
1305     }
1306 
1307     for (i = 0; i < migrate_multifd_channels(); i++) {
1308         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1309 
1310         /*
1311          * The migration thread and channels interact differently
1312          * depending on the presence of packets.
1313          */
1314         if (multifd_use_packets()) {
1315             /*
1316              * The channel receives as long as there are packets. When
1317              * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
1318              * channel waits for the migration thread to sync. If the
1319              * sync never happens, do it here.
1320              */
1321             qemu_sem_post(&p->sem_sync);
1322         } else {
1323             /*
1324              * The channel waits for the migration thread to give it
1325              * work. When the migration thread runs out of work, it
1326              * releases the channel and waits for any pending work to
1327              * finish. If we reach here (e.g. due to error) before the
1328              * work runs out, release the channel.
1329              */
1330             qemu_sem_post(&p->sem);
1331         }
1332 
1333         /*
1334          * We could arrive here for two reasons:
1335          *  - normal quit, i.e. everything went fine, just finished
1336          *  - error quit: We close the channels so the channel threads
1337          *    finish the qio_channel_read_all_eof()
1338          */
1339         if (p->c) {
1340             qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
1341         }
1342     }
1343 }
1344 
1345 void multifd_recv_shutdown(void)
1346 {
1347     if (migrate_multifd()) {
1348         multifd_recv_terminate_threads(NULL);
1349     }
1350 }
1351 
1352 static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
1353 {
1354     migration_ioc_unregister_yank(p->c);
1355     object_unref(OBJECT(p->c));
1356     p->c = NULL;
1357     qemu_mutex_destroy(&p->mutex);
1358     qemu_sem_destroy(&p->sem_sync);
1359     qemu_sem_destroy(&p->sem);
1360     g_free(p->data);
1361     p->data = NULL;
1362     g_free(p->name);
1363     p->name = NULL;
1364     p->packet_len = 0;
1365     g_free(p->packet);
1366     p->packet = NULL;
1367     g_free(p->normal);
1368     p->normal = NULL;
1369     g_free(p->zero);
1370     p->zero = NULL;
1371     multifd_recv_state->ops->recv_cleanup(p);
1372 }
1373 
1374 static void multifd_recv_cleanup_state(void)
1375 {
1376     qemu_sem_destroy(&multifd_recv_state->sem_sync);
1377     g_free(multifd_recv_state->params);
1378     multifd_recv_state->params = NULL;
1379     g_free(multifd_recv_state->data);
1380     multifd_recv_state->data = NULL;
1381     g_free(multifd_recv_state);
1382     multifd_recv_state = NULL;
1383 }
1384 
1385 void multifd_recv_cleanup(void)
1386 {
1387     int i;
1388 
1389     if (!migrate_multifd()) {
1390         return;
1391     }
1392     multifd_recv_terminate_threads(NULL);
1393     for (i = 0; i < migrate_multifd_channels(); i++) {
1394         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1395 
1396         if (p->thread_created) {
1397             qemu_thread_join(&p->thread);
1398         }
1399     }
1400     for (i = 0; i < migrate_multifd_channels(); i++) {
1401         multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
1402     }
1403     multifd_recv_cleanup_state();
1404 }
1405 
1406 void multifd_recv_sync_main(void)
1407 {
1408     int thread_count = migrate_multifd_channels();
1409     bool file_based = !multifd_use_packets();
1410     int i;
1411 
1412     if (!migrate_multifd()) {
1413         return;
1414     }
1415 
1416     /*
1417      * File-based channels don't use packets and therefore need to
1418      * wait for more work. Release them to start the sync.
1419      */
1420     if (file_based) {
1421         for (i = 0; i < thread_count; i++) {
1422             MultiFDRecvParams *p = &multifd_recv_state->params[i];
1423 
1424             trace_multifd_recv_sync_main_signal(p->id);
1425             qemu_sem_post(&p->sem);
1426         }
1427     }
1428 
1429     /*
1430      * Initiate the synchronization by waiting for all channels.
1431      *
1432      * For socket-based migration this means each channel has received
1433      * the SYNC packet on the stream.
1434      *
1435      * For file-based migration this means each channel is done with
1436      * the work (pending_job=false).
1437      */
1438     for (i = 0; i < thread_count; i++) {
1439         trace_multifd_recv_sync_main_wait(i);
1440         qemu_sem_wait(&multifd_recv_state->sem_sync);
1441     }
1442 
1443     if (file_based) {
1444         /*
1445          * For file-based loading is done in one iteration. We're
1446          * done.
1447          */
1448         return;
1449     }
1450 
1451     /*
1452      * Sync done. Release the channels for the next iteration.
1453      */
1454     for (i = 0; i < thread_count; i++) {
1455         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1456 
1457         WITH_QEMU_LOCK_GUARD(&p->mutex) {
1458             if (multifd_recv_state->packet_num < p->packet_num) {
1459                 multifd_recv_state->packet_num = p->packet_num;
1460             }
1461         }
1462         trace_multifd_recv_sync_main_signal(p->id);
1463         qemu_sem_post(&p->sem_sync);
1464     }
1465     trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
1466 }
1467 
1468 static void *multifd_recv_thread(void *opaque)
1469 {
1470     MultiFDRecvParams *p = opaque;
1471     Error *local_err = NULL;
1472     bool use_packets = multifd_use_packets();
1473     int ret;
1474 
1475     trace_multifd_recv_thread_start(p->id);
1476     rcu_register_thread();
1477 
1478     while (true) {
1479         uint32_t flags = 0;
1480         bool has_data = false;
1481         p->normal_num = 0;
1482 
1483         if (use_packets) {
1484             if (multifd_recv_should_exit()) {
1485                 break;
1486             }
1487 
1488             ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
1489                                            p->packet_len, &local_err);
1490             if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
1491                 break;
1492             }
1493 
1494             qemu_mutex_lock(&p->mutex);
1495             ret = multifd_recv_unfill_packet(p, &local_err);
1496             if (ret) {
1497                 qemu_mutex_unlock(&p->mutex);
1498                 break;
1499             }
1500 
1501             flags = p->flags;
1502             /* recv methods don't know how to handle the SYNC flag */
1503             p->flags &= ~MULTIFD_FLAG_SYNC;
1504             has_data = p->normal_num || p->zero_num;
1505             qemu_mutex_unlock(&p->mutex);
1506         } else {
1507             /*
1508              * No packets, so we need to wait for the vmstate code to
1509              * give us work.
1510              */
1511             qemu_sem_wait(&p->sem);
1512 
1513             if (multifd_recv_should_exit()) {
1514                 break;
1515             }
1516 
1517             /* pairs with qatomic_store_release() at multifd_recv() */
1518             if (!qatomic_load_acquire(&p->pending_job)) {
1519                 /*
1520                  * Migration thread did not send work, this is
1521                  * equivalent to pending_sync on the sending
1522                  * side. Post sem_sync to notify we reached this
1523                  * point.
1524                  */
1525                 qemu_sem_post(&multifd_recv_state->sem_sync);
1526                 continue;
1527             }
1528 
1529             has_data = !!p->data->size;
1530         }
1531 
1532         if (has_data) {
1533             ret = multifd_recv_state->ops->recv(p, &local_err);
1534             if (ret != 0) {
1535                 break;
1536             }
1537         }
1538 
1539         if (use_packets) {
1540             if (flags & MULTIFD_FLAG_SYNC) {
1541                 qemu_sem_post(&multifd_recv_state->sem_sync);
1542                 qemu_sem_wait(&p->sem_sync);
1543             }
1544         } else {
1545             p->total_normal_pages += p->data->size / qemu_target_page_size();
1546             p->data->size = 0;
1547             /*
1548              * Order data->size update before clearing
1549              * pending_job. Pairs with smp_mb_acquire() at
1550              * multifd_recv().
1551              */
1552             qatomic_store_release(&p->pending_job, false);
1553         }
1554     }
1555 
1556     if (local_err) {
1557         multifd_recv_terminate_threads(local_err);
1558         error_free(local_err);
1559     }
1560 
1561     rcu_unregister_thread();
1562     trace_multifd_recv_thread_end(p->id, p->packets_recved,
1563                                   p->total_normal_pages,
1564                                   p->total_zero_pages);
1565 
1566     return NULL;
1567 }
1568 
1569 int multifd_recv_setup(Error **errp)
1570 {
1571     int thread_count;
1572     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
1573     bool use_packets = multifd_use_packets();
1574     uint8_t i;
1575 
1576     /*
1577      * Return successfully if multiFD recv state is already initialised
1578      * or multiFD is not enabled.
1579      */
1580     if (multifd_recv_state || !migrate_multifd()) {
1581         return 0;
1582     }
1583 
1584     thread_count = migrate_multifd_channels();
1585     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
1586     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
1587 
1588     multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
1589     multifd_recv_state->data->size = 0;
1590 
1591     qatomic_set(&multifd_recv_state->count, 0);
1592     qatomic_set(&multifd_recv_state->exiting, 0);
1593     qemu_sem_init(&multifd_recv_state->sem_sync, 0);
1594     multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
1595 
1596     for (i = 0; i < thread_count; i++) {
1597         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1598 
1599         qemu_mutex_init(&p->mutex);
1600         qemu_sem_init(&p->sem_sync, 0);
1601         qemu_sem_init(&p->sem, 0);
1602         p->pending_job = false;
1603         p->id = i;
1604 
1605         p->data = g_new0(MultiFDRecvData, 1);
1606         p->data->size = 0;
1607 
1608         if (use_packets) {
1609             p->packet_len = sizeof(MultiFDPacket_t)
1610                 + sizeof(uint64_t) * page_count;
1611             p->packet = g_malloc0(p->packet_len);
1612         }
1613         p->name = g_strdup_printf("mig/dst/recv_%d", i);
1614         p->normal = g_new0(ram_addr_t, page_count);
1615         p->zero = g_new0(ram_addr_t, page_count);
1616         p->page_count = page_count;
1617         p->page_size = qemu_target_page_size();
1618     }
1619 
1620     for (i = 0; i < thread_count; i++) {
1621         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1622         int ret;
1623 
1624         ret = multifd_recv_state->ops->recv_setup(p, errp);
1625         if (ret) {
1626             return ret;
1627         }
1628     }
1629     return 0;
1630 }
1631 
1632 bool multifd_recv_all_channels_created(void)
1633 {
1634     int thread_count = migrate_multifd_channels();
1635 
1636     if (!migrate_multifd()) {
1637         return true;
1638     }
1639 
1640     if (!multifd_recv_state) {
1641         /* Called before any connections created */
1642         return false;
1643     }
1644 
1645     return thread_count == qatomic_read(&multifd_recv_state->count);
1646 }
1647 
1648 /*
1649  * Try to receive all multifd channels to get ready for the migration.
1650  * Sets @errp when failing to receive the current channel.
1651  */
1652 void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
1653 {
1654     MultiFDRecvParams *p;
1655     Error *local_err = NULL;
1656     bool use_packets = multifd_use_packets();
1657     int id;
1658 
1659     if (use_packets) {
1660         id = multifd_recv_initial_packet(ioc, &local_err);
1661         if (id < 0) {
1662             multifd_recv_terminate_threads(local_err);
1663             error_propagate_prepend(errp, local_err,
1664                                     "failed to receive packet"
1665                                     " via multifd channel %d: ",
1666                                     qatomic_read(&multifd_recv_state->count));
1667             return;
1668         }
1669         trace_multifd_recv_new_channel(id);
1670     } else {
1671         id = qatomic_read(&multifd_recv_state->count);
1672     }
1673 
1674     p = &multifd_recv_state->params[id];
1675     if (p->c != NULL) {
1676         error_setg(&local_err, "multifd: received id '%d' already setup'",
1677                    id);
1678         multifd_recv_terminate_threads(local_err);
1679         error_propagate(errp, local_err);
1680         return;
1681     }
1682     p->c = ioc;
1683     object_ref(OBJECT(ioc));
1684 
1685     p->thread_created = true;
1686     qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
1687                        QEMU_THREAD_JOINABLE);
1688     qatomic_inc(&multifd_recv_state->count);
1689 }
1690 
1691 bool multifd_send_prepare_common(MultiFDSendParams *p)
1692 {
1693     multifd_send_zero_page_detect(p);
1694 
1695     if (!p->pages->normal_num) {
1696         p->next_packet_size = 0;
1697         return false;
1698     }
1699 
1700     multifd_send_prepare_header(p);
1701 
1702     return true;
1703 }
1704