xref: /openbmc/qemu/migration/multifd.c (revision dbd9e084)
1 /*
2  * Multifd common code
3  *
4  * Copyright (c) 2019-2020 Red Hat Inc
5  *
6  * Authors:
7  *  Juan Quintela <quintela@redhat.com>
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2 or later.
10  * See the COPYING file in the top-level directory.
11  */
12 
13 #include "qemu/osdep.h"
14 #include "qemu/rcu.h"
15 #include "exec/target_page.h"
16 #include "sysemu/sysemu.h"
17 #include "exec/ramblock.h"
18 #include "qemu/error-report.h"
19 #include "qapi/error.h"
20 #include "ram.h"
21 #include "migration.h"
22 #include "socket.h"
23 #include "tls.h"
24 #include "qemu-file.h"
25 #include "trace.h"
26 #include "multifd.h"
27 
28 #include "qemu/yank.h"
29 #include "io/channel-socket.h"
30 #include "yank_functions.h"
31 
32 /* Multiple fd's */
33 
34 #define MULTIFD_MAGIC 0x11223344U
35 #define MULTIFD_VERSION 1
36 
typedef struct {
    uint32_t magic;         /* MULTIFD_MAGIC, big-endian on the wire */
    uint32_t version;       /* MULTIFD_VERSION, big-endian on the wire */
    unsigned char uuid[16]; /* QemuUUID of the source VM */
    uint8_t id;             /* channel id on the sending side */
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;
45 
46 /* Multifd without compression */
47 
48 /**
49  * nocomp_send_setup: setup send side
50  *
51  * For no compression this function does nothing.
52  *
53  * Returns 0 for success or -1 for error
54  *
55  * @p: Params for the channel that we are using
56  * @errp: pointer to an error
57  */
58 static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
59 {
60     return 0;
61 }
62 
63 /**
64  * nocomp_send_cleanup: cleanup send side
65  *
66  * For no compression this function does nothing.
67  *
68  * @p: Params for the channel that we are using
69  */
70 static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
71 {
72     return;
73 }
74 
/**
 * nocomp_send_prepare: prepare data to be able to send
 *
 * For no compression we just have to calculate the size of the
 * packet.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @used: number of pages used
 * @errp: pointer to an error
 */
static int nocomp_send_prepare(MultiFDSendParams *p, uint32_t used,
                               Error **errp)
{
    /* Payload is the raw pages: one target page per used entry. */
    p->next_packet_size = used * qemu_target_page_size();
    /* Tag the packet so the receiver can verify the compression method. */
    p->flags |= MULTIFD_FLAG_NOCOMP;
    return 0;
}
94 
/**
 * nocomp_send_write: do the actual write of the data
 *
 * For no compression we just have to write the data.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @used: number of pages used
 * @errp: pointer to an error
 */
static int nocomp_send_write(MultiFDSendParams *p, uint32_t used, Error **errp)
{
    /* Write the page iovec straight to the channel, no transformation. */
    return qio_channel_writev_all(p->c, p->pages->iov, used, errp);
}
110 
111 /**
112  * nocomp_recv_setup: setup receive side
113  *
114  * For no compression this function does nothing.
115  *
116  * Returns 0 for success or -1 for error
117  *
118  * @p: Params for the channel that we are using
119  * @errp: pointer to an error
120  */
121 static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
122 {
123     return 0;
124 }
125 
/**
 * nocomp_recv_cleanup: cleanup receive side
 *
 * For no compression this function does nothing.
 * (The original header comment said "setup" — copy/paste typo.)
 *
 * @p: Params for the channel that we are using
 */
static void nocomp_recv_cleanup(MultiFDRecvParams *p)
{
}
136 
137 /**
138  * nocomp_recv_pages: read the data from the channel into actual pages
139  *
140  * For no compression we just need to read things into the correct place.
141  *
142  * Returns 0 for success or -1 for error
143  *
144  * @p: Params for the channel that we are using
145  * @used: number of pages used
146  * @errp: pointer to an error
147  */
148 static int nocomp_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **errp)
149 {
150     uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
151 
152     if (flags != MULTIFD_FLAG_NOCOMP) {
153         error_setg(errp, "multifd %d: flags received %x flags expected %x",
154                    p->id, flags, MULTIFD_FLAG_NOCOMP);
155         return -1;
156     }
157     return qio_channel_readv_all(p->c, p->pages->iov, used, errp);
158 }
159 
/* Dispatch table for the no-compression transport. */
static MultiFDMethods multifd_nocomp_ops = {
    .send_setup = nocomp_send_setup,
    .send_cleanup = nocomp_send_cleanup,
    .send_prepare = nocomp_send_prepare,
    .send_write = nocomp_send_write,
    .recv_setup = nocomp_recv_setup,
    .recv_cleanup = nocomp_recv_cleanup,
    .recv_pages = nocomp_recv_pages
};

/*
 * Per-method ops table, indexed by MultiFDCompression.  Compression
 * backends register their slot via multifd_register_ops().
 */
static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
    [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
};
173 
/*
 * Register the ops table for a compression method.  Slot 0
 * (MULTIFD_COMPRESSION_NONE) is statically owned by the no-compression
 * implementation, hence the strict lower bound in the assert.
 */
void multifd_register_ops(int method, MultiFDMethods *ops)
{
    assert(0 < method && method < MULTIFD_COMPRESSION__MAX);
    multifd_ops[method] = ops;
}
179 
180 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
181 {
182     MultiFDInit_t msg = {};
183     int ret;
184 
185     msg.magic = cpu_to_be32(MULTIFD_MAGIC);
186     msg.version = cpu_to_be32(MULTIFD_VERSION);
187     msg.id = p->id;
188     memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
189 
190     ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp);
191     if (ret != 0) {
192         return -1;
193     }
194     return 0;
195 }
196 
197 static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
198 {
199     MultiFDInit_t msg;
200     int ret;
201 
202     ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
203     if (ret != 0) {
204         return -1;
205     }
206 
207     msg.magic = be32_to_cpu(msg.magic);
208     msg.version = be32_to_cpu(msg.version);
209 
210     if (msg.magic != MULTIFD_MAGIC) {
211         error_setg(errp, "multifd: received packet magic %x "
212                    "expected %x", msg.magic, MULTIFD_MAGIC);
213         return -1;
214     }
215 
216     if (msg.version != MULTIFD_VERSION) {
217         error_setg(errp, "multifd: received packet version %d "
218                    "expected %d", msg.version, MULTIFD_VERSION);
219         return -1;
220     }
221 
222     if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
223         char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
224         char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
225 
226         error_setg(errp, "multifd: received uuid '%s' and expected "
227                    "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
228         g_free(uuid);
229         g_free(msg_uuid);
230         return -1;
231     }
232 
233     if (msg.id > migrate_multifd_channels()) {
234         error_setg(errp, "multifd: received channel version %d "
235                    "expected %d", msg.version, MULTIFD_VERSION);
236         return -1;
237     }
238 
239     return msg.id;
240 }
241 
242 static MultiFDPages_t *multifd_pages_init(size_t size)
243 {
244     MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
245 
246     pages->allocated = size;
247     pages->iov = g_new0(struct iovec, size);
248     pages->offset = g_new0(ram_addr_t, size);
249 
250     return pages;
251 }
252 
253 static void multifd_pages_clear(MultiFDPages_t *pages)
254 {
255     pages->used = 0;
256     pages->allocated = 0;
257     pages->packet_num = 0;
258     pages->block = NULL;
259     g_free(pages->iov);
260     pages->iov = NULL;
261     g_free(pages->offset);
262     pages->offset = NULL;
263     g_free(pages);
264 }
265 
266 static void multifd_send_fill_packet(MultiFDSendParams *p)
267 {
268     MultiFDPacket_t *packet = p->packet;
269     int i;
270 
271     packet->flags = cpu_to_be32(p->flags);
272     packet->pages_alloc = cpu_to_be32(p->pages->allocated);
273     packet->pages_used = cpu_to_be32(p->pages->used);
274     packet->next_packet_size = cpu_to_be32(p->next_packet_size);
275     packet->packet_num = cpu_to_be64(p->packet_num);
276 
277     if (p->pages->block) {
278         strncpy(packet->ramblock, p->pages->block->idstr, 256);
279     }
280 
281     for (i = 0; i < p->pages->used; i++) {
282         /* there are architectures where ram_addr_t is 32 bit */
283         uint64_t temp = p->pages->offset[i];
284 
285         packet->offset[i] = cpu_to_be64(temp);
286     }
287 }
288 
/*
 * Decode and validate the packet header already read into p->packet,
 * filling in p->flags, p->next_packet_size, p->packet_num and the
 * p->pages iovec that points into the destination RAMBlock.
 *
 * Returns 0 on success, -1 (with *errp set) on any validation failure.
 */
static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
{
    MultiFDPacket_t *packet = p->packet;
    uint32_t pages_max = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    RAMBlock *block;
    int i;

    packet->magic = be32_to_cpu(packet->magic);
    if (packet->magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet "
                   "magic %x and expected magic %x",
                   packet->magic, MULTIFD_MAGIC);
        return -1;
    }

    packet->version = be32_to_cpu(packet->version);
    if (packet->version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet "
                   "version %d and expected version %d",
                   packet->version, MULTIFD_VERSION);
        return -1;
    }

    p->flags = be32_to_cpu(packet->flags);

    packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
    /*
     * If we received a packet that is 100 times bigger than expected
     * just stop migration.  It is a magic number.
     */
    if (packet->pages_alloc > pages_max * 100) {
        error_setg(errp, "multifd: received packet "
                   "with size %d and expected a maximum size of %d",
                   packet->pages_alloc, pages_max * 100) ;
        return -1;
    }
    /*
     * We received a packet that is bigger than expected but inside
     * reasonable limits (see previous comment).  Just reallocate.
     */
    if (packet->pages_alloc > p->pages->allocated) {
        multifd_pages_clear(p->pages);
        p->pages = multifd_pages_init(packet->pages_alloc);
    }

    p->pages->used = be32_to_cpu(packet->pages_used);
    if (p->pages->used > packet->pages_alloc) {
        error_setg(errp, "multifd: received packet "
                   "with %d pages and expected maximum pages are %d",
                   p->pages->used, packet->pages_alloc) ;
        return -1;
    }

    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
    p->packet_num = be64_to_cpu(packet->packet_num);

    /* A packet with no pages (e.g. SYNC only) needs no further decoding. */
    if (p->pages->used == 0) {
        return 0;
    }

    /* make sure that ramblock is 0 terminated */
    packet->ramblock[255] = 0;
    block = qemu_ram_block_by_name(packet->ramblock);
    if (!block) {
        error_setg(errp, "multifd: unknown ram block %s",
                   packet->ramblock);
        return -1;
    }

    /* Point each iovec entry at the page's final host destination. */
    for (i = 0; i < p->pages->used; i++) {
        uint64_t offset = be64_to_cpu(packet->offset[i]);

        if (offset > (block->used_length - qemu_target_page_size())) {
            error_setg(errp, "multifd: offset too long %" PRIu64
                       " (max " RAM_ADDR_FMT ")",
                       offset, block->used_length);
            return -1;
        }
        p->pages->iov[i].iov_base = block->host + offset;
        p->pages->iov[i].iov_len = qemu_target_page_size();
    }

    return 0;
}
373 
struct {
    /* send params array, one entry per channel thread */
    MultiFDSendParams *params;
    /* array of pages to be sent */
    MultiFDPages_t *pages;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    /* send channels ready */
    QemuSemaphore channels_ready;
    /*
     * Have we already run terminate threads.  There is a race when it
     * happens that we got one error while we are exiting.
     * We will use atomic operations.  Only valid values are 0 and 1.
     */
    int exiting;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_send_state;
391 
392 /*
393  * How we use multifd_send_state->pages and channel->pages?
394  *
395  * We create a pages for each channel, and a main one.  Each time that
396  * we need to send a batch of pages we interchange the ones between
397  * multifd_send_state and the channel that is sending it.  There are
398  * two reasons for that:
399  *    - to not have to do so many mallocs during migration
400  *    - to make easier to know what to free at the end of migration
401  *
402  * This way we always know who is the owner of each "pages" struct,
403  * and we don't need any locking.  It belongs to the migration thread
404  * or to the channel thread.  Switching is safe because the migration
 * thread is using the channel mutex when changing it, and the channel
 * thread has to have finished with its own, otherwise pending_job can't
 * be false.
408  */
409 
/*
 * Hand the currently accumulated multifd_send_state->pages to an idle
 * channel: swap the filled global pages with the channel's empty ones
 * and wake the channel thread.  Returns 1 on success, -1 when shutting
 * down or after a channel error.
 */
static int multifd_send_pages(QEMUFile *f)
{
    int i;
    static int next_channel;
    MultiFDSendParams *p = NULL; /* make happy gcc */
    MultiFDPages_t *pages = multifd_send_state->pages;
    uint64_t transferred;

    if (qatomic_read(&multifd_send_state->exiting)) {
        return -1;
    }

    qemu_sem_wait(&multifd_send_state->channels_ready);
    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_channel %= migrate_multifd_channels();
    /* Round-robin search for a channel without a pending job. */
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        p = &multifd_send_state->params[i];

        qemu_mutex_lock(&p->mutex);
        if (p->quit) {
            error_report("%s: channel %d has already quit!", __func__, i);
            qemu_mutex_unlock(&p->mutex);
            return -1;
        }
        if (!p->pending_job) {
            /* Claim this channel; we leave the loop with p->mutex held. */
            p->pending_job++;
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
        qemu_mutex_unlock(&p->mutex);
    }
    /* The channel finished its previous job, so its pages must be empty. */
    assert(!p->pages->used);
    assert(!p->pages->block);

    p->packet_num = multifd_send_state->packet_num++;
    /* Swap: the channel sends the filled batch, we refill its empty one. */
    multifd_send_state->pages = p->pages;
    p->pages = pages;
    transferred = ((uint64_t) pages->used) * qemu_target_page_size()
                + p->packet_len;
    qemu_file_update_transfer(f, transferred);
    ram_counters.multifd_bytes += transferred;
    ram_counters.transferred += transferred;
    qemu_mutex_unlock(&p->mutex);
    qemu_sem_post(&p->sem);

    return 1;
}
461 
462 int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
463 {
464     MultiFDPages_t *pages = multifd_send_state->pages;
465 
466     if (!pages->block) {
467         pages->block = block;
468     }
469 
470     if (pages->block == block) {
471         pages->offset[pages->used] = offset;
472         pages->iov[pages->used].iov_base = block->host + offset;
473         pages->iov[pages->used].iov_len = qemu_target_page_size();
474         pages->used++;
475 
476         if (pages->used < pages->allocated) {
477             return 1;
478         }
479     }
480 
481     if (multifd_send_pages(f) < 0) {
482         return -1;
483     }
484 
485     if (pages->block != block) {
486         return  multifd_queue_page(f, block, offset);
487     }
488 
489     return 1;
490 }
491 
/*
 * Ask every send channel thread to quit.  On error (@err non-NULL) the
 * migration state is moved to FAILED first.  Guarded against running
 * twice by the atomic 'exiting' flag.
 */
static void multifd_send_terminate_threads(Error *err)
{
    int i;

    trace_multifd_send_terminate_threads(err != NULL);

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
            s->state == MIGRATION_STATUS_DEVICE ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }

    /*
     * We don't want to exit each threads twice.  Depending on where
     * we get the error, or if there are two independent errors in two
     * threads at the same time, we can end calling this function
     * twice.
     */
    if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
        return;
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_mutex_lock(&p->mutex);
        p->quit = true;
        /* Wake the thread so it notices 'exiting' and leaves its loop. */
        qemu_sem_post(&p->sem);
        qemu_mutex_unlock(&p->mutex);
    }
}
529 
/*
 * Tear down the multifd send side: stop every channel thread, join
 * them, then release per-channel resources and finally the global
 * send state.  Safe to call when multifd was never enabled.
 */
void multifd_save_cleanup(void)
{
    int i;

    if (!migrate_use_multifd() || !migrate_multifd_is_allowed()) {
        return;
    }
    multifd_send_terminate_threads(NULL);
    /* Join all threads first so nothing touches the state freed below. */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (p->running) {
            qemu_thread_join(&p->thread);
        }
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        if (p->registered_yank) {
            migration_ioc_unregister_yank(p->c);
        }
        socket_send_channel_destroy(p->c);
        p->c = NULL;
        qemu_mutex_destroy(&p->mutex);
        qemu_sem_destroy(&p->sem);
        qemu_sem_destroy(&p->sem_sync);
        g_free(p->name);
        p->name = NULL;
        g_free(p->tls_hostname);
        p->tls_hostname = NULL;
        multifd_pages_clear(p->pages);
        p->pages = NULL;
        p->packet_len = 0;
        g_free(p->packet);
        p->packet = NULL;
        /* Let the compression backend free its per-channel state. */
        multifd_send_state->ops->send_cleanup(p, &local_err);
        if (local_err) {
            migrate_set_error(migrate_get_current(), local_err);
            error_free(local_err);
        }
    }
    qemu_sem_destroy(&multifd_send_state->channels_ready);
    g_free(multifd_send_state->params);
    multifd_send_state->params = NULL;
    multifd_pages_clear(multifd_send_state->pages);
    multifd_send_state->pages = NULL;
    g_free(multifd_send_state);
    multifd_send_state = NULL;
}
580 
/*
 * Flush any pending pages, make every channel send a packet flagged
 * MULTIFD_FLAG_SYNC, and wait until all channels have done so.  Used
 * to order the multifd streams against the main migration stream.
 */
void multifd_send_sync_main(QEMUFile *f)
{
    int i;

    if (!migrate_use_multifd()) {
        return;
    }
    if (multifd_send_state->pages->used) {
        /* Push out the partially filled batch first. */
        if (multifd_send_pages(f) < 0) {
            error_report("%s: multifd_send_pages fail", __func__);
            return;
        }
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        trace_multifd_send_sync_main_signal(p->id);

        qemu_mutex_lock(&p->mutex);

        if (p->quit) {
            error_report("%s: channel %d has already quit", __func__, i);
            qemu_mutex_unlock(&p->mutex);
            return;
        }

        p->packet_num = multifd_send_state->packet_num++;
        p->flags |= MULTIFD_FLAG_SYNC;
        p->pending_job++;
        /* The sync packet carries only the header, no pages. */
        qemu_file_update_transfer(f, p->packet_len);
        ram_counters.multifd_bytes += p->packet_len;
        ram_counters.transferred += p->packet_len;
        qemu_mutex_unlock(&p->mutex);
        qemu_sem_post(&p->sem);
    }
    /* Wait for each channel to acknowledge its SYNC packet. */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        trace_multifd_send_sync_main_wait(p->id);
        qemu_sem_wait(&p->sem_sync);
    }
    trace_multifd_send_sync_main(multifd_send_state->packet_num);
}
624 
/*
 * Per-channel send thread.  Sends the initial handshake packet, then
 * loops waiting on p->sem for jobs, writing the packet header and the
 * page payload to the channel.  Exits on error, on p->quit, or when
 * the global 'exiting' flag is set.
 */
static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    Error *local_err = NULL;
    int ret = 0;
    uint32_t flags = 0;

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (multifd_send_initial_packet(p, &local_err) < 0) {
        ret = -1;
        goto out;
    }
    /* initial packet */
    p->num_packets = 1;

    while (true) {
        qemu_sem_wait(&p->sem);

        if (qatomic_read(&multifd_send_state->exiting)) {
            break;
        }
        qemu_mutex_lock(&p->mutex);

        if (p->pending_job) {
            uint32_t used = p->pages->used;
            uint64_t packet_num = p->packet_num;
            flags = p->flags;

            if (used) {
                /* Let the compression backend build the payload. */
                ret = multifd_send_state->ops->send_prepare(p, used,
                                                            &local_err);
                if (ret != 0) {
                    qemu_mutex_unlock(&p->mutex);
                    break;
                }
            }
            multifd_send_fill_packet(p);
            p->flags = 0;
            p->num_packets++;
            p->num_pages += used;
            /* Mark our pages empty so multifd_send_pages() can reuse them. */
            p->pages->used = 0;
            p->pages->block = NULL;
            qemu_mutex_unlock(&p->mutex);

            trace_multifd_send(p->id, packet_num, used, flags,
                               p->next_packet_size);

            /* Header first, then the (possibly compressed) page data. */
            ret = qio_channel_write_all(p->c, (void *)p->packet,
                                        p->packet_len, &local_err);
            if (ret != 0) {
                break;
            }

            if (used) {
                ret = multifd_send_state->ops->send_write(p, used, &local_err);
                if (ret != 0) {
                    break;
                }
            }

            qemu_mutex_lock(&p->mutex);
            p->pending_job--;
            qemu_mutex_unlock(&p->mutex);

            if (flags & MULTIFD_FLAG_SYNC) {
                qemu_sem_post(&p->sem_sync);
            }
            qemu_sem_post(&multifd_send_state->channels_ready);
        } else if (p->quit) {
            qemu_mutex_unlock(&p->mutex);
            break;
        } else {
            qemu_mutex_unlock(&p->mutex);
            /* sometimes there are spurious wakeups */
        }
    }

out:
    if (local_err) {
        trace_multifd_send_error(p->id);
        multifd_send_terminate_threads(local_err);
        error_free(local_err);
    }

    /*
     * On error we cannot just leave: wake up anybody waiting on this
     * channel's semaphores so they can observe the failure.
     */
    if (ret != 0) {
        qemu_sem_post(&p->sem_sync);
        qemu_sem_post(&multifd_send_state->channels_ready);
    }

    qemu_mutex_lock(&p->mutex);
    p->running = false;
    qemu_mutex_unlock(&p->mutex);

    rcu_unregister_thread();
    trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages);

    return NULL;
}
729 
730 static bool multifd_channel_connect(MultiFDSendParams *p,
731                                     QIOChannel *ioc,
732                                     Error *error);
733 
/*
 * Completion callback for the TLS handshake of one send channel.  On
 * both success and failure control continues in
 * multifd_channel_connect(); if that rejects the channel, mark it as
 * quit and wake any waiters so migration can fail cleanly.
 */
static void multifd_tls_outgoing_handshake(QIOTask *task,
                                           gpointer opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
    Error *err = NULL;

    if (qio_task_propagate_error(task, &err)) {
        trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err));
    } else {
        trace_multifd_tls_outgoing_handshake_complete(ioc);
    }

    if (!multifd_channel_connect(p, ioc, err)) {
        /*
         * The send thread was never created; mark the channel as quit
         * anyway, and wake up anybody waiting on it.
         */
        p->quit = true;
        qemu_sem_post(&multifd_send_state->channels_ready);
        qemu_sem_post(&p->sem_sync);
    }
}
757 
758 static void *multifd_tls_handshake_thread(void *opaque)
759 {
760     MultiFDSendParams *p = opaque;
761     QIOChannelTLS *tioc = QIO_CHANNEL_TLS(p->c);
762 
763     qio_channel_tls_handshake(tioc,
764                               multifd_tls_outgoing_handshake,
765                               p,
766                               NULL,
767                               NULL);
768     return NULL;
769 }
770 
771 static void multifd_tls_channel_connect(MultiFDSendParams *p,
772                                         QIOChannel *ioc,
773                                         Error **errp)
774 {
775     MigrationState *s = migrate_get_current();
776     const char *hostname = p->tls_hostname;
777     QIOChannelTLS *tioc;
778 
779     tioc = migration_tls_client_create(s, ioc, hostname, errp);
780     if (!tioc) {
781         return;
782     }
783 
784     object_unref(OBJECT(ioc));
785     trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
786     qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");
787     p->c = QIO_CHANNEL(tioc);
788     qemu_thread_create(&p->thread, "multifd-tls-handshake-worker",
789                        multifd_tls_handshake_thread, p,
790                        QEMU_THREAD_JOINABLE);
791 }
792 
793 static bool multifd_channel_connect(MultiFDSendParams *p,
794                                     QIOChannel *ioc,
795                                     Error *error)
796 {
797     MigrationState *s = migrate_get_current();
798 
799     trace_multifd_set_outgoing_channel(
800         ioc, object_get_typename(OBJECT(ioc)), p->tls_hostname, error);
801 
802     if (!error) {
803         if (s->parameters.tls_creds &&
804             *s->parameters.tls_creds &&
805             !object_dynamic_cast(OBJECT(ioc),
806                                  TYPE_QIO_CHANNEL_TLS)) {
807             multifd_tls_channel_connect(p, ioc, &error);
808             if (!error) {
809                 /*
810                  * tls_channel_connect will call back to this
811                  * function after the TLS handshake,
812                  * so we mustn't call multifd_send_thread until then
813                  */
814                 return true;
815             } else {
816                 return false;
817             }
818         } else {
819             migration_ioc_register_yank(ioc);
820             p->registered_yank = true;
821             p->c = ioc;
822             qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
823                                    QEMU_THREAD_JOINABLE);
824        }
825        return true;
826     }
827 
828     return false;
829 }
830 
831 static void multifd_new_send_channel_cleanup(MultiFDSendParams *p,
832                                              QIOChannel *ioc, Error *err)
833 {
834      migrate_set_error(migrate_get_current(), err);
835      /* Error happen, we need to tell who pay attention to me */
836      qemu_sem_post(&multifd_send_state->channels_ready);
837      qemu_sem_post(&p->sem_sync);
838      /*
839       * Although multifd_send_thread is not created, but main migration
840       * thread neet to judge whether it is running, so we need to mark
841       * its status.
842       */
843      p->quit = true;
844      object_unref(OBJECT(ioc));
845      error_free(err);
846 }
847 
848 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
849 {
850     MultiFDSendParams *p = opaque;
851     QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
852     Error *local_err = NULL;
853 
854     trace_multifd_new_send_channel_async(p->id);
855     if (qio_task_propagate_error(task, &local_err)) {
856         goto cleanup;
857     } else {
858         p->c = QIO_CHANNEL(sioc);
859         qio_channel_set_delay(p->c, false);
860         p->running = true;
861         if (!multifd_channel_connect(p, sioc, local_err)) {
862             goto cleanup;
863         }
864         return;
865     }
866 
867 cleanup:
868     multifd_new_send_channel_cleanup(p, sioc, local_err);
869 }
870 
/* Whether the migration protocol in use permits multifd (default on). */
static bool migrate_allow_multifd = true;

/* Record whether the current protocol supports multifd. */
void migrate_protocol_allow_multifd(bool allow)
{
    migrate_allow_multifd = allow;
}

/* Query whether multifd may be used with the current protocol. */
bool migrate_multifd_is_allowed(void)
{
    return migrate_allow_multifd;
}
881 
/*
 * Allocate the global multifd send state and kick off an asynchronous
 * socket connect for every channel; per-channel send threads are
 * started later from the connect callback.  Returns 0 on success,
 * non-zero (with *errp set) on error.
 */
int multifd_save_setup(Error **errp)
{
    int thread_count;
    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    uint8_t i;
    MigrationState *s;

    if (!migrate_use_multifd()) {
        return 0;
    }
    if (!migrate_multifd_is_allowed()) {
        error_setg(errp, "multifd is not supported by current protocol");
        return -1;
    }

    s = migrate_get_current();
    thread_count = migrate_multifd_channels();
    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
    multifd_send_state->pages = multifd_pages_init(page_count);
    qemu_sem_init(&multifd_send_state->channels_ready, 0);
    qatomic_set(&multifd_send_state->exiting, 0);
    multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_mutex_init(&p->mutex);
        qemu_sem_init(&p->sem, 0);
        qemu_sem_init(&p->sem_sync, 0);
        p->quit = false;
        p->pending_job = 0;
        p->id = i;
        p->pages = multifd_pages_init(page_count);
        /* Header plus one 64-bit offset per page in a full batch. */
        p->packet_len = sizeof(MultiFDPacket_t)
                      + sizeof(uint64_t) * page_count;
        p->packet = g_malloc0(p->packet_len);
        p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
        p->packet->version = cpu_to_be32(MULTIFD_VERSION);
        p->name = g_strdup_printf("multifdsend_%d", i);
        p->tls_hostname = g_strdup(s->hostname);
        /* Asynchronous; continues in multifd_new_send_channel_async(). */
        socket_send_channel_create(multifd_new_send_channel_async, p);
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;
        int ret;

        ret = multifd_send_state->ops->send_setup(p, &local_err);
        if (ret) {
            error_propagate(errp, local_err);
            return ret;
        }
    }
    return 0;
}
939 
struct {
    /* recv params array, one entry per channel thread */
    MultiFDRecvParams *params;
    /* number of created threads */
    int count;
    /* syncs main thread and channels */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_recv_state;
951 
/*
 * Ask every receive channel thread to quit.  On error (@err non-NULL)
 * the migration state is moved to FAILED first.  Threads blocked in a
 * read are kicked by shutting down their connection.
 */
static void multifd_recv_terminate_threads(Error *err)
{
    int i;

    trace_multifd_recv_terminate_threads(err != NULL);

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        qemu_mutex_lock(&p->mutex);
        p->quit = true;
        /*
         * We could arrive here for two reasons:
         *  - normal quit, i.e. everything went fine, just finished
         *  - error quit: We close the channels so the channel threads
         *    finish the qio_channel_read_all_eof()
         */
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
        qemu_mutex_unlock(&p->mutex);
    }
}
985 
/*
 * multifd_load_cleanup: tear down the receive side of multifd
 *
 * Quiesces all channel threads, joins them, then frees every per-channel
 * resource and the global multifd_recv_state.
 *
 * Returns 0 in all cases (@errp is currently never set); a no-op when
 * multifd is unused or not allowed by the current protocol.
 *
 * @errp: pointer to an error
 */
int multifd_load_cleanup(Error **errp)
{
    int i;

    if (!migrate_use_multifd() || !migrate_multifd_is_allowed()) {
        return 0;
    }
    multifd_recv_terminate_threads(NULL);
    /* First pass: wake up and join every running channel thread. */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        if (p->running) {
            p->quit = true;
            /*
             * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code,
             * however try to wakeup it without harm in cleanup phase.
             */
            qemu_sem_post(&p->sem_sync);
            qemu_thread_join(&p->thread);
        }
    }
    /* Second pass: all threads have exited; safe to free their state. */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        migration_ioc_unregister_yank(p->c);
        object_unref(OBJECT(p->c));
        p->c = NULL;
        qemu_mutex_destroy(&p->mutex);
        qemu_sem_destroy(&p->sem_sync);
        g_free(p->name);
        p->name = NULL;
        multifd_pages_clear(p->pages);
        p->pages = NULL;
        p->packet_len = 0;
        g_free(p->packet);
        p->packet = NULL;
        multifd_recv_state->ops->recv_cleanup(p);
    }
    qemu_sem_destroy(&multifd_recv_state->sem_sync);
    g_free(multifd_recv_state->params);
    multifd_recv_state->params = NULL;
    g_free(multifd_recv_state);
    multifd_recv_state = NULL;

    return 0;
}
1032 
1033 void multifd_recv_sync_main(void)
1034 {
1035     int i;
1036 
1037     if (!migrate_use_multifd()) {
1038         return;
1039     }
1040     for (i = 0; i < migrate_multifd_channels(); i++) {
1041         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1042 
1043         trace_multifd_recv_sync_main_wait(p->id);
1044         qemu_sem_wait(&multifd_recv_state->sem_sync);
1045     }
1046     for (i = 0; i < migrate_multifd_channels(); i++) {
1047         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1048 
1049         WITH_QEMU_LOCK_GUARD(&p->mutex) {
1050             if (multifd_recv_state->packet_num < p->packet_num) {
1051                 multifd_recv_state->packet_num = p->packet_num;
1052             }
1053         }
1054         trace_multifd_recv_sync_main_signal(p->id);
1055         qemu_sem_post(&p->sem_sync);
1056     }
1057     trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
1058 }
1059 
/*
 * multifd_recv_thread: thread function for one multifd receive channel
 *
 * Loop: read p->packet_len bytes of packet header from the channel,
 * unpack it under p->mutex, then hand the data pages to the compression
 * method's recv_pages() hook.  A packet carrying MULTIFD_FLAG_SYNC makes
 * this thread rendezvous with multifd_recv_sync_main() via the global
 * and per-channel sem_sync semaphores.  Exits on EOF, on a read/unpack
 * error, or when p->quit is set.
 *
 * @opaque: the channel's MultiFDRecvParams
 */
static void *multifd_recv_thread(void *opaque)
{
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    int ret;

    trace_multifd_recv_thread_start(p->id);
    rcu_register_thread();

    while (true) {
        uint32_t used;
        uint32_t flags;

        /*
         * NOTE(review): p->quit is read here without p->mutex (it is set
         * under the mutex in multifd_recv_terminate_threads) -- presumably
         * a stale read only delays the exit; confirm.
         */
        if (p->quit) {
            break;
        }

        ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
                                       p->packet_len, &local_err);
        if (ret == 0) {   /* EOF */
            break;
        }
        if (ret == -1) {   /* Error */
            break;
        }

        qemu_mutex_lock(&p->mutex);
        ret = multifd_recv_unfill_packet(p, &local_err);
        if (ret) {
            qemu_mutex_unlock(&p->mutex);
            break;
        }

        /* Snapshot what we need, then drop the lock before the I/O below */
        used = p->pages->used;
        flags = p->flags;
        /* recv methods don't know how to handle the SYNC flag */
        p->flags &= ~MULTIFD_FLAG_SYNC;
        trace_multifd_recv(p->id, p->packet_num, used, flags,
                           p->next_packet_size);
        p->num_packets++;
        p->num_pages += used;
        qemu_mutex_unlock(&p->mutex);

        if (used) {
            ret = multifd_recv_state->ops->recv_pages(p, used, &local_err);
            if (ret != 0) {
                break;
            }
        }

        if (flags & MULTIFD_FLAG_SYNC) {
            /* Tell the main thread we reached the sync point, then wait */
            qemu_sem_post(&multifd_recv_state->sem_sync);
            qemu_sem_wait(&p->sem_sync);
        }
    }

    if (local_err) {
        multifd_recv_terminate_threads(local_err);
        error_free(local_err);
    }
    qemu_mutex_lock(&p->mutex);
    p->running = false;
    qemu_mutex_unlock(&p->mutex);

    rcu_unregister_thread();
    trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages);

    return NULL;
}
1129 
1130 int multifd_load_setup(Error **errp)
1131 {
1132     int thread_count;
1133     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
1134     uint8_t i;
1135 
1136     if (!migrate_use_multifd()) {
1137         return 0;
1138     }
1139     if (!migrate_multifd_is_allowed()) {
1140         error_setg(errp, "multifd is not supported by current protocol");
1141         return -1;
1142     }
1143     thread_count = migrate_multifd_channels();
1144     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
1145     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
1146     qatomic_set(&multifd_recv_state->count, 0);
1147     qemu_sem_init(&multifd_recv_state->sem_sync, 0);
1148     multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
1149 
1150     for (i = 0; i < thread_count; i++) {
1151         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1152 
1153         qemu_mutex_init(&p->mutex);
1154         qemu_sem_init(&p->sem_sync, 0);
1155         p->quit = false;
1156         p->id = i;
1157         p->pages = multifd_pages_init(page_count);
1158         p->packet_len = sizeof(MultiFDPacket_t)
1159                       + sizeof(uint64_t) * page_count;
1160         p->packet = g_malloc0(p->packet_len);
1161         p->name = g_strdup_printf("multifdrecv_%d", i);
1162     }
1163 
1164     for (i = 0; i < thread_count; i++) {
1165         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1166         Error *local_err = NULL;
1167         int ret;
1168 
1169         ret = multifd_recv_state->ops->recv_setup(p, &local_err);
1170         if (ret) {
1171             error_propagate(errp, local_err);
1172             return ret;
1173         }
1174     }
1175     return 0;
1176 }
1177 
1178 bool multifd_recv_all_channels_created(void)
1179 {
1180     int thread_count = migrate_multifd_channels();
1181 
1182     if (!migrate_use_multifd()) {
1183         return true;
1184     }
1185 
1186     if (!multifd_recv_state) {
1187         /* Called before any connections created */
1188         return false;
1189     }
1190 
1191     return thread_count == qatomic_read(&multifd_recv_state->count);
1192 }
1193 
1194 /*
1195  * Try to receive all multifd channels to get ready for the migration.
1196  * - Return true and do not set @errp when correctly receiving all channels;
1197  * - Return false and do not set @errp when correctly receiving the current one;
1198  * - Return false and set @errp when failing to receive the current channel.
1199  */
1200 bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
1201 {
1202     MultiFDRecvParams *p;
1203     Error *local_err = NULL;
1204     int id;
1205 
1206     id = multifd_recv_initial_packet(ioc, &local_err);
1207     if (id < 0) {
1208         multifd_recv_terminate_threads(local_err);
1209         error_propagate_prepend(errp, local_err,
1210                                 "failed to receive packet"
1211                                 " via multifd channel %d: ",
1212                                 qatomic_read(&multifd_recv_state->count));
1213         return false;
1214     }
1215     trace_multifd_recv_new_channel(id);
1216 
1217     p = &multifd_recv_state->params[id];
1218     if (p->c != NULL) {
1219         error_setg(&local_err, "multifd: received id '%d' already setup'",
1220                    id);
1221         multifd_recv_terminate_threads(local_err);
1222         error_propagate(errp, local_err);
1223         return false;
1224     }
1225     p->c = ioc;
1226     object_ref(OBJECT(ioc));
1227     /* initial packet */
1228     p->num_packets = 1;
1229 
1230     p->running = true;
1231     qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
1232                        QEMU_THREAD_JOINABLE);
1233     qatomic_inc(&multifd_recv_state->count);
1234     return qatomic_read(&multifd_recv_state->count) ==
1235            migrate_multifd_channels();
1236 }
1237