xref: /openbmc/qemu/migration/multifd.c (revision 8cbb4fc12e1d10182cbab93f234510bc616594ca)
1 /*
2  * Multifd common code
3  *
4  * Copyright (c) 2019-2020 Red Hat Inc
5  *
6  * Authors:
7  *  Juan Quintela <quintela@redhat.com>
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2 or later.
10  * See the COPYING file in the top-level directory.
11  */
12 
13 #include "qemu/osdep.h"
14 #include "qemu/cutils.h"
15 #include "qemu/rcu.h"
16 #include "exec/target_page.h"
17 #include "sysemu/sysemu.h"
18 #include "exec/ramblock.h"
19 #include "qemu/error-report.h"
20 #include "qapi/error.h"
21 #include "file.h"
22 #include "migration.h"
23 #include "migration-stats.h"
24 #include "socket.h"
25 #include "tls.h"
26 #include "qemu-file.h"
27 #include "trace.h"
28 #include "multifd.h"
29 #include "threadinfo.h"
30 #include "options.h"
31 #include "qemu/yank.h"
32 #include "io/channel-file.h"
33 #include "io/channel-socket.h"
34 #include "yank_functions.h"
35 
36 /* Multiple fd's */
37 
38 #define MULTIFD_MAGIC 0x11223344U
39 #define MULTIFD_VERSION 1
40 
41 typedef struct {
42     uint32_t magic;
43     uint32_t version;
44     unsigned char uuid[16]; /* QemuUUID */
45     uint8_t id;
46     uint8_t unused1[7];     /* Reserved for future use */
47     uint64_t unused2[4];    /* Reserved for future use */
48 } __attribute__((packed)) MultiFDInit_t;
49 
50 struct {
51     MultiFDSendParams *params;
52     /* array of pages to sent */
53     MultiFDPages_t *pages;
54     /*
55      * Global number of generated multifd packets.
56      *
57      * Note that we used 'uintptr_t' because it'll naturally support atomic
58      * operations on both 32bit / 64 bits hosts.  It means on 32bit systems
59      * multifd will overflow the packet_num easier, but that should be
60      * fine.
61      *
62      * Another option is to use QEMU's Stat64 then it'll be 64 bits on all
63      * hosts, however so far it does not support atomic fetch_add() yet.
64      * Make it easy for now.
65      */
66     uintptr_t packet_num;
67     /*
68      * Synchronization point past which no more channels will be
69      * created.
70      */
71     QemuSemaphore channels_created;
72     /* send channels ready */
73     QemuSemaphore channels_ready;
74     /*
75      * Have we already run terminate threads.  There is a race when it
76      * happens that we got one error while we are exiting.
77      * We will use atomic operations.  Only valid values are 0 and 1.
78      */
79     int exiting;
80     /* multifd ops */
81     MultiFDMethods *ops;
82 } *multifd_send_state;
83 
84 struct {
85     MultiFDRecvParams *params;
86     MultiFDRecvData *data;
87     /* number of created threads */
88     int count;
89     /*
90      * This is always posted by the recv threads, the migration thread
91      * uses it to wait for recv threads to finish assigned tasks.
92      */
93     QemuSemaphore sem_sync;
94     /* global number of generated multifd packets */
95     uint64_t packet_num;
96     int exiting;
97     /* multifd ops */
98     MultiFDMethods *ops;
99 } *multifd_recv_state;
100 
101 static bool multifd_use_packets(void)
102 {
103     return !migrate_mapped_ram();
104 }
105 
106 void multifd_send_channel_created(void)
107 {
108     qemu_sem_post(&multifd_send_state->channels_created);
109 }
110 
111 static void multifd_set_file_bitmap(MultiFDSendParams *p)
112 {
113     MultiFDPages_t *pages = p->pages;
114 
115     assert(pages->block);
116 
117     for (int i = 0; i < p->pages->normal_num; i++) {
118         ramblock_set_file_bmap_atomic(pages->block, pages->offset[i], true);
119     }
120 
121     for (int i = p->pages->normal_num; i < p->pages->num; i++) {
122         ramblock_set_file_bmap_atomic(pages->block, pages->offset[i], false);
123     }
124 }
125 
126 /* Multifd without compression */
127 
128 /**
129  * nocomp_send_setup: setup send side
130  *
131  * @p: Params for the channel that we are using
132  * @errp: pointer to an error
133  */
134 static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
135 {
136     if (migrate_zero_copy_send()) {
137         p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
138     }
139 
140     if (multifd_use_packets()) {
141         /* We need one extra place for the packet header */
142         p->iov = g_new0(struct iovec, p->page_count + 1);
143     } else {
144         p->iov = g_new0(struct iovec, p->page_count);
145     }
146 
147     return 0;
148 }
149 
150 /**
151  * nocomp_send_cleanup: cleanup send side
152  *
153  * For no compression this function does nothing.
154  *
155  * @p: Params for the channel that we are using
156  * @errp: pointer to an error
157  */
158 static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
159 {
160     g_free(p->iov);
161     p->iov = NULL;
162     return;
163 }
164 
165 static void multifd_send_prepare_iovs(MultiFDSendParams *p)
166 {
167     MultiFDPages_t *pages = p->pages;
168 
169     for (int i = 0; i < pages->normal_num; i++) {
170         p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
171         p->iov[p->iovs_num].iov_len = p->page_size;
172         p->iovs_num++;
173     }
174 
175     p->next_packet_size = pages->normal_num * p->page_size;
176 }
177 
178 /**
179  * nocomp_send_prepare: prepare date to be able to send
180  *
181  * For no compression we just have to calculate the size of the
182  * packet.
183  *
184  * Returns 0 for success or -1 for error
185  *
186  * @p: Params for the channel that we are using
187  * @errp: pointer to an error
188  */
189 static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
190 {
191     bool use_zero_copy_send = migrate_zero_copy_send();
192     int ret;
193 
194     multifd_send_zero_page_detect(p);
195 
196     if (!multifd_use_packets()) {
197         multifd_send_prepare_iovs(p);
198         multifd_set_file_bitmap(p);
199 
200         return 0;
201     }
202 
203     if (!use_zero_copy_send) {
204         /*
205          * Only !zerocopy needs the header in IOV; zerocopy will
206          * send it separately.
207          */
208         multifd_send_prepare_header(p);
209     }
210 
211     multifd_send_prepare_iovs(p);
212     p->flags |= MULTIFD_FLAG_NOCOMP;
213 
214     multifd_send_fill_packet(p);
215 
216     if (use_zero_copy_send) {
217         /* Send header first, without zerocopy */
218         ret = qio_channel_write_all(p->c, (void *)p->packet,
219                                     p->packet_len, errp);
220         if (ret != 0) {
221             return -1;
222         }
223     }
224 
225     return 0;
226 }
227 
228 /**
229  * nocomp_recv_setup: setup receive side
230  *
231  * For no compression this function does nothing.
232  *
233  * Returns 0 for success or -1 for error
234  *
235  * @p: Params for the channel that we are using
236  * @errp: pointer to an error
237  */
238 static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
239 {
240     p->iov = g_new0(struct iovec, p->page_count);
241     return 0;
242 }
243 
244 /**
245  * nocomp_recv_cleanup: setup receive side
246  *
247  * For no compression this function does nothing.
248  *
249  * @p: Params for the channel that we are using
250  */
251 static void nocomp_recv_cleanup(MultiFDRecvParams *p)
252 {
253     g_free(p->iov);
254     p->iov = NULL;
255 }
256 
257 /**
258  * nocomp_recv: read the data from the channel
259  *
260  * For no compression we just need to read things into the correct place.
261  *
262  * Returns 0 for success or -1 for error
263  *
264  * @p: Params for the channel that we are using
265  * @errp: pointer to an error
266  */
267 static int nocomp_recv(MultiFDRecvParams *p, Error **errp)
268 {
269     uint32_t flags;
270 
271     if (!multifd_use_packets()) {
272         return multifd_file_recv_data(p, errp);
273     }
274 
275     flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
276 
277     if (flags != MULTIFD_FLAG_NOCOMP) {
278         error_setg(errp, "multifd %u: flags received %x flags expected %x",
279                    p->id, flags, MULTIFD_FLAG_NOCOMP);
280         return -1;
281     }
282 
283     multifd_recv_zero_page_process(p);
284 
285     if (!p->normal_num) {
286         return 0;
287     }
288 
289     for (int i = 0; i < p->normal_num; i++) {
290         p->iov[i].iov_base = p->host + p->normal[i];
291         p->iov[i].iov_len = p->page_size;
292         ramblock_recv_bitmap_set_offset(p->block, p->normal[i]);
293     }
294     return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
295 }
296 
297 static MultiFDMethods multifd_nocomp_ops = {
298     .send_setup = nocomp_send_setup,
299     .send_cleanup = nocomp_send_cleanup,
300     .send_prepare = nocomp_send_prepare,
301     .recv_setup = nocomp_recv_setup,
302     .recv_cleanup = nocomp_recv_cleanup,
303     .recv = nocomp_recv
304 };
305 
306 static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
307     [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
308 };
309 
310 void multifd_register_ops(int method, MultiFDMethods *ops)
311 {
312     assert(0 < method && method < MULTIFD_COMPRESSION__MAX);
313     multifd_ops[method] = ops;
314 }
315 
316 /* Reset a MultiFDPages_t* object for the next use */
317 static void multifd_pages_reset(MultiFDPages_t *pages)
318 {
319     /*
320      * We don't need to touch offset[] array, because it will be
321      * overwritten later when reused.
322      */
323     pages->num = 0;
324     pages->normal_num = 0;
325     pages->block = NULL;
326 }
327 
328 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
329 {
330     MultiFDInit_t msg = {};
331     size_t size = sizeof(msg);
332     int ret;
333 
334     msg.magic = cpu_to_be32(MULTIFD_MAGIC);
335     msg.version = cpu_to_be32(MULTIFD_VERSION);
336     msg.id = p->id;
337     memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
338 
339     ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
340     if (ret != 0) {
341         return -1;
342     }
343     stat64_add(&mig_stats.multifd_bytes, size);
344     return 0;
345 }
346 
347 static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
348 {
349     MultiFDInit_t msg;
350     int ret;
351 
352     ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
353     if (ret != 0) {
354         return -1;
355     }
356 
357     msg.magic = be32_to_cpu(msg.magic);
358     msg.version = be32_to_cpu(msg.version);
359 
360     if (msg.magic != MULTIFD_MAGIC) {
361         error_setg(errp, "multifd: received packet magic %x "
362                    "expected %x", msg.magic, MULTIFD_MAGIC);
363         return -1;
364     }
365 
366     if (msg.version != MULTIFD_VERSION) {
367         error_setg(errp, "multifd: received packet version %u "
368                    "expected %u", msg.version, MULTIFD_VERSION);
369         return -1;
370     }
371 
372     if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
373         char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
374         char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
375 
376         error_setg(errp, "multifd: received uuid '%s' and expected "
377                    "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
378         g_free(uuid);
379         g_free(msg_uuid);
380         return -1;
381     }
382 
383     if (msg.id > migrate_multifd_channels()) {
384         error_setg(errp, "multifd: received channel id %u is greater than "
385                    "number of channels %u", msg.id, migrate_multifd_channels());
386         return -1;
387     }
388 
389     return msg.id;
390 }
391 
392 static MultiFDPages_t *multifd_pages_init(uint32_t n)
393 {
394     MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
395 
396     pages->allocated = n;
397     pages->offset = g_new0(ram_addr_t, n);
398 
399     return pages;
400 }
401 
402 static void multifd_pages_clear(MultiFDPages_t *pages)
403 {
404     multifd_pages_reset(pages);
405     pages->allocated = 0;
406     g_free(pages->offset);
407     pages->offset = NULL;
408     g_free(pages);
409 }
410 
411 void multifd_send_fill_packet(MultiFDSendParams *p)
412 {
413     MultiFDPacket_t *packet = p->packet;
414     MultiFDPages_t *pages = p->pages;
415     uint64_t packet_num;
416     uint32_t zero_num = pages->num - pages->normal_num;
417     int i;
418 
419     packet->flags = cpu_to_be32(p->flags);
420     packet->pages_alloc = cpu_to_be32(p->pages->allocated);
421     packet->normal_pages = cpu_to_be32(pages->normal_num);
422     packet->zero_pages = cpu_to_be32(zero_num);
423     packet->next_packet_size = cpu_to_be32(p->next_packet_size);
424 
425     packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
426     packet->packet_num = cpu_to_be64(packet_num);
427 
428     if (pages->block) {
429         strncpy(packet->ramblock, pages->block->idstr, 256);
430     }
431 
432     for (i = 0; i < pages->num; i++) {
433         /* there are architectures where ram_addr_t is 32 bit */
434         uint64_t temp = pages->offset[i];
435 
436         packet->offset[i] = cpu_to_be64(temp);
437     }
438 
439     p->packets_sent++;
440     p->total_normal_pages += pages->normal_num;
441     p->total_zero_pages += zero_num;
442 
443     trace_multifd_send(p->id, packet_num, pages->normal_num, zero_num,
444                        p->flags, p->next_packet_size);
445 }
446 
447 static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
448 {
449     MultiFDPacket_t *packet = p->packet;
450     int i;
451 
452     packet->magic = be32_to_cpu(packet->magic);
453     if (packet->magic != MULTIFD_MAGIC) {
454         error_setg(errp, "multifd: received packet "
455                    "magic %x and expected magic %x",
456                    packet->magic, MULTIFD_MAGIC);
457         return -1;
458     }
459 
460     packet->version = be32_to_cpu(packet->version);
461     if (packet->version != MULTIFD_VERSION) {
462         error_setg(errp, "multifd: received packet "
463                    "version %u and expected version %u",
464                    packet->version, MULTIFD_VERSION);
465         return -1;
466     }
467 
468     p->flags = be32_to_cpu(packet->flags);
469 
470     packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
471     /*
472      * If we received a packet that is 100 times bigger than expected
473      * just stop migration.  It is a magic number.
474      */
475     if (packet->pages_alloc > p->page_count) {
476         error_setg(errp, "multifd: received packet "
477                    "with size %u and expected a size of %u",
478                    packet->pages_alloc, p->page_count) ;
479         return -1;
480     }
481 
482     p->normal_num = be32_to_cpu(packet->normal_pages);
483     if (p->normal_num > packet->pages_alloc) {
484         error_setg(errp, "multifd: received packet "
485                    "with %u normal pages and expected maximum pages are %u",
486                    p->normal_num, packet->pages_alloc) ;
487         return -1;
488     }
489 
490     p->zero_num = be32_to_cpu(packet->zero_pages);
491     if (p->zero_num > packet->pages_alloc - p->normal_num) {
492         error_setg(errp, "multifd: received packet "
493                    "with %u zero pages and expected maximum zero pages are %u",
494                    p->zero_num, packet->pages_alloc - p->normal_num) ;
495         return -1;
496     }
497 
498     p->next_packet_size = be32_to_cpu(packet->next_packet_size);
499     p->packet_num = be64_to_cpu(packet->packet_num);
500     p->packets_recved++;
501     p->total_normal_pages += p->normal_num;
502     p->total_zero_pages += p->zero_num;
503 
504     trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->zero_num,
505                        p->flags, p->next_packet_size);
506 
507     if (p->normal_num == 0 && p->zero_num == 0) {
508         return 0;
509     }
510 
511     /* make sure that ramblock is 0 terminated */
512     packet->ramblock[255] = 0;
513     p->block = qemu_ram_block_by_name(packet->ramblock);
514     if (!p->block) {
515         error_setg(errp, "multifd: unknown ram block %s",
516                    packet->ramblock);
517         return -1;
518     }
519 
520     p->host = p->block->host;
521     for (i = 0; i < p->normal_num; i++) {
522         uint64_t offset = be64_to_cpu(packet->offset[i]);
523 
524         if (offset > (p->block->used_length - p->page_size)) {
525             error_setg(errp, "multifd: offset too long %" PRIu64
526                        " (max " RAM_ADDR_FMT ")",
527                        offset, p->block->used_length);
528             return -1;
529         }
530         p->normal[i] = offset;
531     }
532 
533     for (i = 0; i < p->zero_num; i++) {
534         uint64_t offset = be64_to_cpu(packet->offset[p->normal_num + i]);
535 
536         if (offset > (p->block->used_length - p->page_size)) {
537             error_setg(errp, "multifd: offset too long %" PRIu64
538                        " (max " RAM_ADDR_FMT ")",
539                        offset, p->block->used_length);
540             return -1;
541         }
542         p->zero[i] = offset;
543     }
544 
545     return 0;
546 }
547 
548 static bool multifd_send_should_exit(void)
549 {
550     return qatomic_read(&multifd_send_state->exiting);
551 }
552 
553 static bool multifd_recv_should_exit(void)
554 {
555     return qatomic_read(&multifd_recv_state->exiting);
556 }
557 
558 /*
559  * The migration thread can wait on either of the two semaphores.  This
560  * function can be used to kick the main thread out of waiting on either of
561  * them.  Should mostly only be called when something wrong happened with
562  * the current multifd send thread.
563  */
564 static void multifd_send_kick_main(MultiFDSendParams *p)
565 {
566     qemu_sem_post(&p->sem_sync);
567     qemu_sem_post(&multifd_send_state->channels_ready);
568 }
569 
570 /*
571  * How we use multifd_send_state->pages and channel->pages?
572  *
573  * We create a pages for each channel, and a main one.  Each time that
574  * we need to send a batch of pages we interchange the ones between
575  * multifd_send_state and the channel that is sending it.  There are
576  * two reasons for that:
577  *    - to not have to do so many mallocs during migration
578  *    - to make easier to know what to free at the end of migration
579  *
580  * This way we always know who is the owner of each "pages" struct,
581  * and we don't need any locking.  It belongs to the migration thread
582  * or to the channel thread.  Switching is safe because the migration
583  * thread is using the channel mutex when changing it, and the channel
584  * have to had finish with its own, otherwise pending_job can't be
585  * false.
586  *
587  * Returns true if succeed, false otherwise.
588  */
589 static bool multifd_send_pages(void)
590 {
591     int i;
592     static int next_channel;
593     MultiFDSendParams *p = NULL; /* make happy gcc */
594     MultiFDPages_t *pages = multifd_send_state->pages;
595 
596     if (multifd_send_should_exit()) {
597         return false;
598     }
599 
600     /* We wait here, until at least one channel is ready */
601     qemu_sem_wait(&multifd_send_state->channels_ready);
602 
603     /*
604      * next_channel can remain from a previous migration that was
605      * using more channels, so ensure it doesn't overflow if the
606      * limit is lower now.
607      */
608     next_channel %= migrate_multifd_channels();
609     for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
610         if (multifd_send_should_exit()) {
611             return false;
612         }
613         p = &multifd_send_state->params[i];
614         /*
615          * Lockless read to p->pending_job is safe, because only multifd
616          * sender thread can clear it.
617          */
618         if (qatomic_read(&p->pending_job) == false) {
619             next_channel = (i + 1) % migrate_multifd_channels();
620             break;
621         }
622     }
623 
624     /*
625      * Make sure we read p->pending_job before all the rest.  Pairs with
626      * qatomic_store_release() in multifd_send_thread().
627      */
628     smp_mb_acquire();
629     assert(!p->pages->num);
630     multifd_send_state->pages = p->pages;
631     p->pages = pages;
632     /*
633      * Making sure p->pages is setup before marking pending_job=true. Pairs
634      * with the qatomic_load_acquire() in multifd_send_thread().
635      */
636     qatomic_store_release(&p->pending_job, true);
637     qemu_sem_post(&p->sem);
638 
639     return true;
640 }
641 
642 static inline bool multifd_queue_empty(MultiFDPages_t *pages)
643 {
644     return pages->num == 0;
645 }
646 
647 static inline bool multifd_queue_full(MultiFDPages_t *pages)
648 {
649     return pages->num == pages->allocated;
650 }
651 
652 static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset)
653 {
654     pages->offset[pages->num++] = offset;
655 }
656 
657 /* Returns true if enqueue successful, false otherwise */
658 bool multifd_queue_page(RAMBlock *block, ram_addr_t offset)
659 {
660     MultiFDPages_t *pages;
661 
662 retry:
663     pages = multifd_send_state->pages;
664 
665     /* If the queue is empty, we can already enqueue now */
666     if (multifd_queue_empty(pages)) {
667         pages->block = block;
668         multifd_enqueue(pages, offset);
669         return true;
670     }
671 
672     /*
673      * Not empty, meanwhile we need a flush.  It can because of either:
674      *
675      * (1) The page is not on the same ramblock of previous ones, or,
676      * (2) The queue is full.
677      *
678      * After flush, always retry.
679      */
680     if (pages->block != block || multifd_queue_full(pages)) {
681         if (!multifd_send_pages()) {
682             return false;
683         }
684         goto retry;
685     }
686 
687     /* Not empty, and we still have space, do it! */
688     multifd_enqueue(pages, offset);
689     return true;
690 }
691 
692 /* Multifd send side hit an error; remember it and prepare to quit */
693 static void multifd_send_set_error(Error *err)
694 {
695     /*
696      * We don't want to exit each threads twice.  Depending on where
697      * we get the error, or if there are two independent errors in two
698      * threads at the same time, we can end calling this function
699      * twice.
700      */
701     if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
702         return;
703     }
704 
705     if (err) {
706         MigrationState *s = migrate_get_current();
707         migrate_set_error(s, err);
708         if (s->state == MIGRATION_STATUS_SETUP ||
709             s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
710             s->state == MIGRATION_STATUS_DEVICE ||
711             s->state == MIGRATION_STATUS_ACTIVE) {
712             migrate_set_state(&s->state, s->state,
713                               MIGRATION_STATUS_FAILED);
714         }
715     }
716 }
717 
718 static void multifd_send_terminate_threads(void)
719 {
720     int i;
721 
722     trace_multifd_send_terminate_threads();
723 
724     /*
725      * Tell everyone we're quitting.  No xchg() needed here; we simply
726      * always set it.
727      */
728     qatomic_set(&multifd_send_state->exiting, 1);
729 
730     /*
731      * Firstly, kick all threads out; no matter whether they are just idle,
732      * or blocked in an IO system call.
733      */
734     for (i = 0; i < migrate_multifd_channels(); i++) {
735         MultiFDSendParams *p = &multifd_send_state->params[i];
736 
737         qemu_sem_post(&p->sem);
738         if (p->c) {
739             qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
740         }
741     }
742 
743     /*
744      * Finally recycle all the threads.
745      */
746     for (i = 0; i < migrate_multifd_channels(); i++) {
747         MultiFDSendParams *p = &multifd_send_state->params[i];
748 
749         if (p->tls_thread_created) {
750             qemu_thread_join(&p->tls_thread);
751         }
752 
753         if (p->thread_created) {
754             qemu_thread_join(&p->thread);
755         }
756     }
757 }
758 
759 static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
760 {
761     if (p->c) {
762         migration_ioc_unregister_yank(p->c);
763         /*
764          * The object_unref() cannot guarantee the fd will always be
765          * released because finalize() of the iochannel is only
766          * triggered on the last reference and it's not guaranteed
767          * that we always hold the last refcount when reaching here.
768          *
769          * Closing the fd explicitly has the benefit that if there is any
770          * registered I/O handler callbacks on such fd, that will get a
771          * POLLNVAL event and will further trigger the cleanup to finally
772          * release the IOC.
773          *
774          * FIXME: It should logically be guaranteed that all multifd
775          * channels have no I/O handler callback registered when reaching
776          * here, because migration thread will wait for all multifd channel
777          * establishments to complete during setup.  Since
778          * migrate_fd_cleanup() will be scheduled in main thread too, all
779          * previous callbacks should guarantee to be completed when
780          * reaching here.  See multifd_send_state.channels_created and its
781          * usage.  In the future, we could replace this with an assert
782          * making sure we're the last reference, or simply drop it if above
783          * is more clear to be justified.
784          */
785         qio_channel_close(p->c, &error_abort);
786         object_unref(OBJECT(p->c));
787         p->c = NULL;
788     }
789     qemu_sem_destroy(&p->sem);
790     qemu_sem_destroy(&p->sem_sync);
791     g_free(p->name);
792     p->name = NULL;
793     multifd_pages_clear(p->pages);
794     p->pages = NULL;
795     p->packet_len = 0;
796     g_free(p->packet);
797     p->packet = NULL;
798     multifd_send_state->ops->send_cleanup(p, errp);
799 
800     return *errp == NULL;
801 }
802 
803 static void multifd_send_cleanup_state(void)
804 {
805     file_cleanup_outgoing_migration();
806     socket_cleanup_outgoing_migration();
807     qemu_sem_destroy(&multifd_send_state->channels_created);
808     qemu_sem_destroy(&multifd_send_state->channels_ready);
809     g_free(multifd_send_state->params);
810     multifd_send_state->params = NULL;
811     multifd_pages_clear(multifd_send_state->pages);
812     multifd_send_state->pages = NULL;
813     g_free(multifd_send_state);
814     multifd_send_state = NULL;
815 }
816 
817 void multifd_send_shutdown(void)
818 {
819     int i;
820 
821     if (!migrate_multifd()) {
822         return;
823     }
824 
825     multifd_send_terminate_threads();
826 
827     for (i = 0; i < migrate_multifd_channels(); i++) {
828         MultiFDSendParams *p = &multifd_send_state->params[i];
829         Error *local_err = NULL;
830 
831         if (!multifd_send_cleanup_channel(p, &local_err)) {
832             migrate_set_error(migrate_get_current(), local_err);
833             error_free(local_err);
834         }
835     }
836 
837     multifd_send_cleanup_state();
838 }
839 
840 static int multifd_zero_copy_flush(QIOChannel *c)
841 {
842     int ret;
843     Error *err = NULL;
844 
845     ret = qio_channel_flush(c, &err);
846     if (ret < 0) {
847         error_report_err(err);
848         return -1;
849     }
850     if (ret == 1) {
851         stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
852     }
853 
854     return ret;
855 }
856 
857 int multifd_send_sync_main(void)
858 {
859     int i;
860     bool flush_zero_copy;
861 
862     if (!migrate_multifd()) {
863         return 0;
864     }
865     if (multifd_send_state->pages->num) {
866         if (!multifd_send_pages()) {
867             error_report("%s: multifd_send_pages fail", __func__);
868             return -1;
869         }
870     }
871 
872     flush_zero_copy = migrate_zero_copy_send();
873 
874     for (i = 0; i < migrate_multifd_channels(); i++) {
875         MultiFDSendParams *p = &multifd_send_state->params[i];
876 
877         if (multifd_send_should_exit()) {
878             return -1;
879         }
880 
881         trace_multifd_send_sync_main_signal(p->id);
882 
883         /*
884          * We should be the only user so far, so not possible to be set by
885          * others concurrently.
886          */
887         assert(qatomic_read(&p->pending_sync) == false);
888         qatomic_set(&p->pending_sync, true);
889         qemu_sem_post(&p->sem);
890     }
891     for (i = 0; i < migrate_multifd_channels(); i++) {
892         MultiFDSendParams *p = &multifd_send_state->params[i];
893 
894         if (multifd_send_should_exit()) {
895             return -1;
896         }
897 
898         qemu_sem_wait(&multifd_send_state->channels_ready);
899         trace_multifd_send_sync_main_wait(p->id);
900         qemu_sem_wait(&p->sem_sync);
901 
902         if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
903             return -1;
904         }
905     }
906     trace_multifd_send_sync_main(multifd_send_state->packet_num);
907 
908     return 0;
909 }
910 
911 static void *multifd_send_thread(void *opaque)
912 {
913     MultiFDSendParams *p = opaque;
914     MigrationThread *thread = NULL;
915     Error *local_err = NULL;
916     int ret = 0;
917     bool use_packets = multifd_use_packets();
918 
919     thread = migration_threads_add(p->name, qemu_get_thread_id());
920 
921     trace_multifd_send_thread_start(p->id);
922     rcu_register_thread();
923 
924     if (use_packets) {
925         if (multifd_send_initial_packet(p, &local_err) < 0) {
926             ret = -1;
927             goto out;
928         }
929     }
930 
931     while (true) {
932         qemu_sem_post(&multifd_send_state->channels_ready);
933         qemu_sem_wait(&p->sem);
934 
935         if (multifd_send_should_exit()) {
936             break;
937         }
938 
939         /*
940          * Read pending_job flag before p->pages.  Pairs with the
941          * qatomic_store_release() in multifd_send_pages().
942          */
943         if (qatomic_load_acquire(&p->pending_job)) {
944             MultiFDPages_t *pages = p->pages;
945 
946             p->iovs_num = 0;
947             assert(pages->num);
948 
949             ret = multifd_send_state->ops->send_prepare(p, &local_err);
950             if (ret != 0) {
951                 break;
952             }
953 
954             if (migrate_mapped_ram()) {
955                 ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
956                                               p->pages->block, &local_err);
957             } else {
958                 ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
959                                                   NULL, 0, p->write_flags,
960                                                   &local_err);
961             }
962 
963             if (ret != 0) {
964                 break;
965             }
966 
967             stat64_add(&mig_stats.multifd_bytes,
968                        p->next_packet_size + p->packet_len);
969             stat64_add(&mig_stats.normal_pages, pages->normal_num);
970             stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num);
971 
972             multifd_pages_reset(p->pages);
973             p->next_packet_size = 0;
974 
975             /*
976              * Making sure p->pages is published before saying "we're
977              * free".  Pairs with the smp_mb_acquire() in
978              * multifd_send_pages().
979              */
980             qatomic_store_release(&p->pending_job, false);
981         } else {
982             /*
983              * If not a normal job, must be a sync request.  Note that
984              * pending_sync is a standalone flag (unlike pending_job), so
985              * it doesn't require explicit memory barriers.
986              */
987             assert(qatomic_read(&p->pending_sync));
988 
989             if (use_packets) {
990                 p->flags = MULTIFD_FLAG_SYNC;
991                 multifd_send_fill_packet(p);
992                 ret = qio_channel_write_all(p->c, (void *)p->packet,
993                                             p->packet_len, &local_err);
994                 if (ret != 0) {
995                     break;
996                 }
997                 /* p->next_packet_size will always be zero for a SYNC packet */
998                 stat64_add(&mig_stats.multifd_bytes, p->packet_len);
999                 p->flags = 0;
1000             }
1001 
1002             qatomic_set(&p->pending_sync, false);
1003             qemu_sem_post(&p->sem_sync);
1004         }
1005     }
1006 
1007 out:
1008     if (ret) {
1009         assert(local_err);
1010         trace_multifd_send_error(p->id);
1011         multifd_send_set_error(local_err);
1012         multifd_send_kick_main(p);
1013         error_free(local_err);
1014     }
1015 
1016     rcu_unregister_thread();
1017     migration_threads_remove(thread);
1018     trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages,
1019                                   p->total_zero_pages);
1020 
1021     return NULL;
1022 }
1023 
1024 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);
1025 
1026 typedef struct {
1027     MultiFDSendParams *p;
1028     QIOChannelTLS *tioc;
1029 } MultiFDTLSThreadArgs;
1030 
1031 static void *multifd_tls_handshake_thread(void *opaque)
1032 {
1033     MultiFDTLSThreadArgs *args = opaque;
1034 
1035     qio_channel_tls_handshake(args->tioc,
1036                               multifd_new_send_channel_async,
1037                               args->p,
1038                               NULL,
1039                               NULL);
1040     g_free(args);
1041 
1042     return NULL;
1043 }
1044 
1045 static bool multifd_tls_channel_connect(MultiFDSendParams *p,
1046                                         QIOChannel *ioc,
1047                                         Error **errp)
1048 {
1049     MigrationState *s = migrate_get_current();
1050     const char *hostname = s->hostname;
1051     MultiFDTLSThreadArgs *args;
1052     QIOChannelTLS *tioc;
1053 
1054     tioc = migration_tls_client_create(ioc, hostname, errp);
1055     if (!tioc) {
1056         return false;
1057     }
1058 
1059     /*
1060      * Ownership of the socket channel now transfers to the newly
1061      * created TLS channel, which has already taken a reference.
1062      */
1063     object_unref(OBJECT(ioc));
1064     trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
1065     qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");
1066 
1067     args = g_new0(MultiFDTLSThreadArgs, 1);
1068     args->tioc = tioc;
1069     args->p = p;
1070 
1071     p->tls_thread_created = true;
1072     qemu_thread_create(&p->tls_thread, "mig/src/tls",
1073                        multifd_tls_handshake_thread, args,
1074                        QEMU_THREAD_JOINABLE);
1075     return true;
1076 }
1077 
1078 void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
1079 {
1080     qio_channel_set_delay(ioc, false);
1081 
1082     migration_ioc_register_yank(ioc);
1083     /* Setup p->c only if the channel is completely setup */
1084     p->c = ioc;
1085 
1086     p->thread_created = true;
1087     qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
1088                        QEMU_THREAD_JOINABLE);
1089 }
1090 
1091 /*
1092  * When TLS is enabled this function is called once to establish the
1093  * TLS connection and a second time after the TLS handshake to create
1094  * the multifd channel. Without TLS it goes straight into the channel
1095  * creation.
1096  */
1097 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
1098 {
1099     MultiFDSendParams *p = opaque;
1100     QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
1101     Error *local_err = NULL;
1102     bool ret;
1103 
1104     trace_multifd_new_send_channel_async(p->id);
1105 
1106     if (qio_task_propagate_error(task, &local_err)) {
1107         ret = false;
1108         goto out;
1109     }
1110 
1111     trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
1112                                        migrate_get_current()->hostname);
1113 
1114     if (migrate_channel_requires_tls_upgrade(ioc)) {
1115         ret = multifd_tls_channel_connect(p, ioc, &local_err);
1116         if (ret) {
1117             return;
1118         }
1119     } else {
1120         multifd_channel_connect(p, ioc);
1121         ret = true;
1122     }
1123 
1124 out:
1125     /*
1126      * Here we're not interested whether creation succeeded, only that
1127      * it happened at all.
1128      */
1129     multifd_send_channel_created();
1130 
1131     if (ret) {
1132         return;
1133     }
1134 
1135     trace_multifd_new_send_channel_async_error(p->id, local_err);
1136     multifd_send_set_error(local_err);
1137     /*
1138      * For error cases (TLS or non-TLS), IO channel is always freed here
1139      * rather than when cleanup multifd: since p->c is not set, multifd
1140      * cleanup code doesn't even know its existence.
1141      */
1142     object_unref(OBJECT(ioc));
1143     error_free(local_err);
1144 }
1145 
1146 static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
1147 {
1148     if (!multifd_use_packets()) {
1149         return file_send_channel_create(opaque, errp);
1150     }
1151 
1152     socket_send_channel_create(multifd_new_send_channel_async, opaque);
1153     return true;
1154 }
1155 
1156 bool multifd_send_setup(void)
1157 {
1158     MigrationState *s = migrate_get_current();
1159     Error *local_err = NULL;
1160     int thread_count, ret = 0;
1161     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
1162     bool use_packets = multifd_use_packets();
1163     uint8_t i;
1164 
1165     if (!migrate_multifd()) {
1166         return true;
1167     }
1168 
1169     thread_count = migrate_multifd_channels();
1170     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
1171     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
1172     multifd_send_state->pages = multifd_pages_init(page_count);
1173     qemu_sem_init(&multifd_send_state->channels_created, 0);
1174     qemu_sem_init(&multifd_send_state->channels_ready, 0);
1175     qatomic_set(&multifd_send_state->exiting, 0);
1176     multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
1177 
1178     for (i = 0; i < thread_count; i++) {
1179         MultiFDSendParams *p = &multifd_send_state->params[i];
1180 
1181         qemu_sem_init(&p->sem, 0);
1182         qemu_sem_init(&p->sem_sync, 0);
1183         p->id = i;
1184         p->pages = multifd_pages_init(page_count);
1185 
1186         if (use_packets) {
1187             p->packet_len = sizeof(MultiFDPacket_t)
1188                           + sizeof(uint64_t) * page_count;
1189             p->packet = g_malloc0(p->packet_len);
1190             p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
1191             p->packet->version = cpu_to_be32(MULTIFD_VERSION);
1192         }
1193         p->name = g_strdup_printf("mig/src/send_%d", i);
1194         p->page_size = qemu_target_page_size();
1195         p->page_count = page_count;
1196         p->write_flags = 0;
1197 
1198         if (!multifd_new_send_channel_create(p, &local_err)) {
1199             return false;
1200         }
1201     }
1202 
1203     /*
1204      * Wait until channel creation has started for all channels. The
1205      * creation can still fail, but no more channels will be created
1206      * past this point.
1207      */
1208     for (i = 0; i < thread_count; i++) {
1209         qemu_sem_wait(&multifd_send_state->channels_created);
1210     }
1211 
1212     for (i = 0; i < thread_count; i++) {
1213         MultiFDSendParams *p = &multifd_send_state->params[i];
1214 
1215         ret = multifd_send_state->ops->send_setup(p, &local_err);
1216         if (ret) {
1217             break;
1218         }
1219     }
1220 
1221     if (ret) {
1222         migrate_set_error(s, local_err);
1223         error_report_err(local_err);
1224         migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
1225                           MIGRATION_STATUS_FAILED);
1226         return false;
1227     }
1228 
1229     return true;
1230 }
1231 
1232 bool multifd_recv(void)
1233 {
1234     int i;
1235     static int next_recv_channel;
1236     MultiFDRecvParams *p = NULL;
1237     MultiFDRecvData *data = multifd_recv_state->data;
1238 
1239     /*
1240      * next_channel can remain from a previous migration that was
1241      * using more channels, so ensure it doesn't overflow if the
1242      * limit is lower now.
1243      */
1244     next_recv_channel %= migrate_multifd_channels();
1245     for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
1246         if (multifd_recv_should_exit()) {
1247             return false;
1248         }
1249 
1250         p = &multifd_recv_state->params[i];
1251 
1252         if (qatomic_read(&p->pending_job) == false) {
1253             next_recv_channel = (i + 1) % migrate_multifd_channels();
1254             break;
1255         }
1256     }
1257 
1258     /*
1259      * Order pending_job read before manipulating p->data below. Pairs
1260      * with qatomic_store_release() at multifd_recv_thread().
1261      */
1262     smp_mb_acquire();
1263 
1264     assert(!p->data->size);
1265     multifd_recv_state->data = p->data;
1266     p->data = data;
1267 
1268     /*
1269      * Order p->data update before setting pending_job. Pairs with
1270      * qatomic_load_acquire() at multifd_recv_thread().
1271      */
1272     qatomic_store_release(&p->pending_job, true);
1273     qemu_sem_post(&p->sem);
1274 
1275     return true;
1276 }
1277 
1278 MultiFDRecvData *multifd_get_recv_data(void)
1279 {
1280     return multifd_recv_state->data;
1281 }
1282 
1283 static void multifd_recv_terminate_threads(Error *err)
1284 {
1285     int i;
1286 
1287     trace_multifd_recv_terminate_threads(err != NULL);
1288 
1289     if (qatomic_xchg(&multifd_recv_state->exiting, 1)) {
1290         return;
1291     }
1292 
1293     if (err) {
1294         MigrationState *s = migrate_get_current();
1295         migrate_set_error(s, err);
1296         if (s->state == MIGRATION_STATUS_SETUP ||
1297             s->state == MIGRATION_STATUS_ACTIVE) {
1298             migrate_set_state(&s->state, s->state,
1299                               MIGRATION_STATUS_FAILED);
1300         }
1301     }
1302 
1303     for (i = 0; i < migrate_multifd_channels(); i++) {
1304         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1305 
1306         /*
1307          * The migration thread and channels interact differently
1308          * depending on the presence of packets.
1309          */
1310         if (multifd_use_packets()) {
1311             /*
1312              * The channel receives as long as there are packets. When
1313              * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
1314              * channel waits for the migration thread to sync. If the
1315              * sync never happens, do it here.
1316              */
1317             qemu_sem_post(&p->sem_sync);
1318         } else {
1319             /*
1320              * The channel waits for the migration thread to give it
1321              * work. When the migration thread runs out of work, it
1322              * releases the channel and waits for any pending work to
1323              * finish. If we reach here (e.g. due to error) before the
1324              * work runs out, release the channel.
1325              */
1326             qemu_sem_post(&p->sem);
1327         }
1328 
1329         /*
1330          * We could arrive here for two reasons:
1331          *  - normal quit, i.e. everything went fine, just finished
1332          *  - error quit: We close the channels so the channel threads
1333          *    finish the qio_channel_read_all_eof()
1334          */
1335         if (p->c) {
1336             qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
1337         }
1338     }
1339 }
1340 
1341 void multifd_recv_shutdown(void)
1342 {
1343     if (migrate_multifd()) {
1344         multifd_recv_terminate_threads(NULL);
1345     }
1346 }
1347 
1348 static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
1349 {
1350     migration_ioc_unregister_yank(p->c);
1351     object_unref(OBJECT(p->c));
1352     p->c = NULL;
1353     qemu_mutex_destroy(&p->mutex);
1354     qemu_sem_destroy(&p->sem_sync);
1355     qemu_sem_destroy(&p->sem);
1356     g_free(p->name);
1357     p->name = NULL;
1358     p->packet_len = 0;
1359     g_free(p->packet);
1360     p->packet = NULL;
1361     g_free(p->normal);
1362     p->normal = NULL;
1363     g_free(p->zero);
1364     p->zero = NULL;
1365     multifd_recv_state->ops->recv_cleanup(p);
1366 }
1367 
1368 static void multifd_recv_cleanup_state(void)
1369 {
1370     qemu_sem_destroy(&multifd_recv_state->sem_sync);
1371     g_free(multifd_recv_state->params);
1372     multifd_recv_state->params = NULL;
1373     g_free(multifd_recv_state->data);
1374     multifd_recv_state->data = NULL;
1375     g_free(multifd_recv_state);
1376     multifd_recv_state = NULL;
1377 }
1378 
1379 void multifd_recv_cleanup(void)
1380 {
1381     int i;
1382 
1383     if (!migrate_multifd()) {
1384         return;
1385     }
1386     multifd_recv_terminate_threads(NULL);
1387     for (i = 0; i < migrate_multifd_channels(); i++) {
1388         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1389 
1390         if (p->thread_created) {
1391             qemu_thread_join(&p->thread);
1392         }
1393     }
1394     for (i = 0; i < migrate_multifd_channels(); i++) {
1395         multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
1396     }
1397     multifd_recv_cleanup_state();
1398 }
1399 
1400 void multifd_recv_sync_main(void)
1401 {
1402     int thread_count = migrate_multifd_channels();
1403     bool file_based = !multifd_use_packets();
1404     int i;
1405 
1406     if (!migrate_multifd()) {
1407         return;
1408     }
1409 
1410     /*
1411      * File-based channels don't use packets and therefore need to
1412      * wait for more work. Release them to start the sync.
1413      */
1414     if (file_based) {
1415         for (i = 0; i < thread_count; i++) {
1416             MultiFDRecvParams *p = &multifd_recv_state->params[i];
1417 
1418             trace_multifd_recv_sync_main_signal(p->id);
1419             qemu_sem_post(&p->sem);
1420         }
1421     }
1422 
1423     /*
1424      * Initiate the synchronization by waiting for all channels.
1425      *
1426      * For socket-based migration this means each channel has received
1427      * the SYNC packet on the stream.
1428      *
1429      * For file-based migration this means each channel is done with
1430      * the work (pending_job=false).
1431      */
1432     for (i = 0; i < thread_count; i++) {
1433         trace_multifd_recv_sync_main_wait(i);
1434         qemu_sem_wait(&multifd_recv_state->sem_sync);
1435     }
1436 
1437     if (file_based) {
1438         /*
1439          * For file-based loading is done in one iteration. We're
1440          * done.
1441          */
1442         return;
1443     }
1444 
1445     /*
1446      * Sync done. Release the channels for the next iteration.
1447      */
1448     for (i = 0; i < thread_count; i++) {
1449         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1450 
1451         WITH_QEMU_LOCK_GUARD(&p->mutex) {
1452             if (multifd_recv_state->packet_num < p->packet_num) {
1453                 multifd_recv_state->packet_num = p->packet_num;
1454             }
1455         }
1456         trace_multifd_recv_sync_main_signal(p->id);
1457         qemu_sem_post(&p->sem_sync);
1458     }
1459     trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
1460 }
1461 
1462 static void *multifd_recv_thread(void *opaque)
1463 {
1464     MultiFDRecvParams *p = opaque;
1465     Error *local_err = NULL;
1466     bool use_packets = multifd_use_packets();
1467     int ret;
1468 
1469     trace_multifd_recv_thread_start(p->id);
1470     rcu_register_thread();
1471 
1472     while (true) {
1473         uint32_t flags = 0;
1474         bool has_data = false;
1475         p->normal_num = 0;
1476 
1477         if (use_packets) {
1478             if (multifd_recv_should_exit()) {
1479                 break;
1480             }
1481 
1482             ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
1483                                            p->packet_len, &local_err);
1484             if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
1485                 break;
1486             }
1487 
1488             qemu_mutex_lock(&p->mutex);
1489             ret = multifd_recv_unfill_packet(p, &local_err);
1490             if (ret) {
1491                 qemu_mutex_unlock(&p->mutex);
1492                 break;
1493             }
1494 
1495             flags = p->flags;
1496             /* recv methods don't know how to handle the SYNC flag */
1497             p->flags &= ~MULTIFD_FLAG_SYNC;
1498             has_data = p->normal_num || p->zero_num;
1499             qemu_mutex_unlock(&p->mutex);
1500         } else {
1501             /*
1502              * No packets, so we need to wait for the vmstate code to
1503              * give us work.
1504              */
1505             qemu_sem_wait(&p->sem);
1506 
1507             if (multifd_recv_should_exit()) {
1508                 break;
1509             }
1510 
1511             /* pairs with qatomic_store_release() at multifd_recv() */
1512             if (!qatomic_load_acquire(&p->pending_job)) {
1513                 /*
1514                  * Migration thread did not send work, this is
1515                  * equivalent to pending_sync on the sending
1516                  * side. Post sem_sync to notify we reached this
1517                  * point.
1518                  */
1519                 qemu_sem_post(&multifd_recv_state->sem_sync);
1520                 continue;
1521             }
1522 
1523             has_data = !!p->data->size;
1524         }
1525 
1526         if (has_data) {
1527             ret = multifd_recv_state->ops->recv(p, &local_err);
1528             if (ret != 0) {
1529                 break;
1530             }
1531         }
1532 
1533         if (use_packets) {
1534             if (flags & MULTIFD_FLAG_SYNC) {
1535                 qemu_sem_post(&multifd_recv_state->sem_sync);
1536                 qemu_sem_wait(&p->sem_sync);
1537             }
1538         } else {
1539             p->total_normal_pages += p->data->size / qemu_target_page_size();
1540             p->data->size = 0;
1541             /*
1542              * Order data->size update before clearing
1543              * pending_job. Pairs with smp_mb_acquire() at
1544              * multifd_recv().
1545              */
1546             qatomic_store_release(&p->pending_job, false);
1547         }
1548     }
1549 
1550     if (local_err) {
1551         multifd_recv_terminate_threads(local_err);
1552         error_free(local_err);
1553     }
1554 
1555     rcu_unregister_thread();
1556     trace_multifd_recv_thread_end(p->id, p->packets_recved,
1557                                   p->total_normal_pages,
1558                                   p->total_zero_pages);
1559 
1560     return NULL;
1561 }
1562 
1563 int multifd_recv_setup(Error **errp)
1564 {
1565     int thread_count;
1566     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
1567     bool use_packets = multifd_use_packets();
1568     uint8_t i;
1569 
1570     /*
1571      * Return successfully if multiFD recv state is already initialised
1572      * or multiFD is not enabled.
1573      */
1574     if (multifd_recv_state || !migrate_multifd()) {
1575         return 0;
1576     }
1577 
1578     thread_count = migrate_multifd_channels();
1579     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
1580     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
1581 
1582     multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
1583     multifd_recv_state->data->size = 0;
1584 
1585     qatomic_set(&multifd_recv_state->count, 0);
1586     qatomic_set(&multifd_recv_state->exiting, 0);
1587     qemu_sem_init(&multifd_recv_state->sem_sync, 0);
1588     multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
1589 
1590     for (i = 0; i < thread_count; i++) {
1591         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1592 
1593         qemu_mutex_init(&p->mutex);
1594         qemu_sem_init(&p->sem_sync, 0);
1595         qemu_sem_init(&p->sem, 0);
1596         p->pending_job = false;
1597         p->id = i;
1598 
1599         p->data = g_new0(MultiFDRecvData, 1);
1600         p->data->size = 0;
1601 
1602         if (use_packets) {
1603             p->packet_len = sizeof(MultiFDPacket_t)
1604                 + sizeof(uint64_t) * page_count;
1605             p->packet = g_malloc0(p->packet_len);
1606         }
1607         p->name = g_strdup_printf("mig/dst/recv_%d", i);
1608         p->normal = g_new0(ram_addr_t, page_count);
1609         p->zero = g_new0(ram_addr_t, page_count);
1610         p->page_count = page_count;
1611         p->page_size = qemu_target_page_size();
1612     }
1613 
1614     for (i = 0; i < thread_count; i++) {
1615         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1616         int ret;
1617 
1618         ret = multifd_recv_state->ops->recv_setup(p, errp);
1619         if (ret) {
1620             return ret;
1621         }
1622     }
1623     return 0;
1624 }
1625 
1626 bool multifd_recv_all_channels_created(void)
1627 {
1628     int thread_count = migrate_multifd_channels();
1629 
1630     if (!migrate_multifd()) {
1631         return true;
1632     }
1633 
1634     if (!multifd_recv_state) {
1635         /* Called before any connections created */
1636         return false;
1637     }
1638 
1639     return thread_count == qatomic_read(&multifd_recv_state->count);
1640 }
1641 
1642 /*
1643  * Try to receive all multifd channels to get ready for the migration.
1644  * Sets @errp when failing to receive the current channel.
1645  */
1646 void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
1647 {
1648     MultiFDRecvParams *p;
1649     Error *local_err = NULL;
1650     bool use_packets = multifd_use_packets();
1651     int id;
1652 
1653     if (use_packets) {
1654         id = multifd_recv_initial_packet(ioc, &local_err);
1655         if (id < 0) {
1656             multifd_recv_terminate_threads(local_err);
1657             error_propagate_prepend(errp, local_err,
1658                                     "failed to receive packet"
1659                                     " via multifd channel %d: ",
1660                                     qatomic_read(&multifd_recv_state->count));
1661             return;
1662         }
1663         trace_multifd_recv_new_channel(id);
1664     } else {
1665         id = qatomic_read(&multifd_recv_state->count);
1666     }
1667 
1668     p = &multifd_recv_state->params[id];
1669     if (p->c != NULL) {
1670         error_setg(&local_err, "multifd: received id '%d' already setup'",
1671                    id);
1672         multifd_recv_terminate_threads(local_err);
1673         error_propagate(errp, local_err);
1674         return;
1675     }
1676     p->c = ioc;
1677     object_ref(OBJECT(ioc));
1678 
1679     p->thread_created = true;
1680     qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
1681                        QEMU_THREAD_JOINABLE);
1682     qatomic_inc(&multifd_recv_state->count);
1683 }
1684 
1685 bool multifd_send_prepare_common(MultiFDSendParams *p)
1686 {
1687     multifd_send_zero_page_detect(p);
1688 
1689     if (!p->pages->normal_num) {
1690         p->next_packet_size = 0;
1691         return false;
1692     }
1693 
1694     multifd_send_prepare_header(p);
1695 
1696     return true;
1697 }
1698