xref: /openbmc/qemu/migration/multifd.c (revision 32cfefb9)
1 /*
2  * Multifd common code
3  *
4  * Copyright (c) 2019-2020 Red Hat Inc
5  *
6  * Authors:
7  *  Juan Quintela <quintela@redhat.com>
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2 or later.
10  * See the COPYING file in the top-level directory.
11  */
12 
13 #include "qemu/osdep.h"
14 #include "qemu/cutils.h"
15 #include "qemu/rcu.h"
16 #include "exec/target_page.h"
17 #include "sysemu/sysemu.h"
18 #include "exec/ramblock.h"
19 #include "qemu/error-report.h"
20 #include "qapi/error.h"
21 #include "file.h"
22 #include "migration.h"
23 #include "migration-stats.h"
24 #include "socket.h"
25 #include "tls.h"
26 #include "qemu-file.h"
27 #include "trace.h"
28 #include "multifd.h"
29 #include "threadinfo.h"
30 #include "options.h"
31 #include "qemu/yank.h"
32 #include "io/channel-file.h"
33 #include "io/channel-socket.h"
34 #include "yank_functions.h"
35 
36 /* Multiple fd's */
37 
38 #define MULTIFD_MAGIC 0x11223344U
39 #define MULTIFD_VERSION 1
40 
41 typedef struct {
42     uint32_t magic;
43     uint32_t version;
44     unsigned char uuid[16]; /* QemuUUID */
45     uint8_t id;
46     uint8_t unused1[7];     /* Reserved for future use */
47     uint64_t unused2[4];    /* Reserved for future use */
48 } __attribute__((packed)) MultiFDInit_t;
49 
50 struct {
51     MultiFDSendParams *params;
52     /*
53      * Global number of generated multifd packets.
54      *
55      * Note that we used 'uintptr_t' because it'll naturally support atomic
56      * operations on both 32bit / 64 bits hosts.  It means on 32bit systems
57      * multifd will overflow the packet_num easier, but that should be
58      * fine.
59      *
60      * Another option is to use QEMU's Stat64 then it'll be 64 bits on all
61      * hosts, however so far it does not support atomic fetch_add() yet.
62      * Make it easy for now.
63      */
64     uintptr_t packet_num;
65     /*
66      * Synchronization point past which no more channels will be
67      * created.
68      */
69     QemuSemaphore channels_created;
70     /* send channels ready */
71     QemuSemaphore channels_ready;
72     /*
73      * Have we already run terminate threads.  There is a race when it
74      * happens that we got one error while we are exiting.
75      * We will use atomic operations.  Only valid values are 0 and 1.
76      */
77     int exiting;
78     /* multifd ops */
79     const MultiFDMethods *ops;
80 } *multifd_send_state;
81 
82 struct {
83     MultiFDRecvParams *params;
84     MultiFDRecvData *data;
85     /* number of created threads */
86     int count;
87     /*
88      * This is always posted by the recv threads, the migration thread
89      * uses it to wait for recv threads to finish assigned tasks.
90      */
91     QemuSemaphore sem_sync;
92     /* global number of generated multifd packets */
93     uint64_t packet_num;
94     int exiting;
95     /* multifd ops */
96     const MultiFDMethods *ops;
97 } *multifd_recv_state;
98 
99 MultiFDSendData *multifd_send_data_alloc(void)
100 {
101     size_t max_payload_size, size_minus_payload;
102 
103     /*
104      * MultiFDPages_t has a flexible array at the end, account for it
105      * when allocating MultiFDSendData. Use max() in case other types
106      * added to the union in the future are larger than
107      * (MultiFDPages_t + flex array).
108      */
109     max_payload_size = MAX(multifd_ram_payload_size(), sizeof(MultiFDPayload));
110 
111     /*
112      * Account for any holes the compiler might insert. We can't pack
113      * the structure because that misaligns the members and triggers
114      * Waddress-of-packed-member.
115      */
116     size_minus_payload = sizeof(MultiFDSendData) - sizeof(MultiFDPayload);
117 
118     return g_malloc0(size_minus_payload + max_payload_size);
119 }
120 
121 static bool multifd_use_packets(void)
122 {
123     return !migrate_mapped_ram();
124 }
125 
126 void multifd_send_channel_created(void)
127 {
128     qemu_sem_post(&multifd_send_state->channels_created);
129 }
130 
131 static const MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {};
132 
133 void multifd_register_ops(int method, const MultiFDMethods *ops)
134 {
135     assert(0 <= method && method < MULTIFD_COMPRESSION__MAX);
136     assert(!multifd_ops[method]);
137     multifd_ops[method] = ops;
138 }
139 
140 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
141 {
142     MultiFDInit_t msg = {};
143     size_t size = sizeof(msg);
144     int ret;
145 
146     msg.magic = cpu_to_be32(MULTIFD_MAGIC);
147     msg.version = cpu_to_be32(MULTIFD_VERSION);
148     msg.id = p->id;
149     memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
150 
151     ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
152     if (ret != 0) {
153         return -1;
154     }
155     stat64_add(&mig_stats.multifd_bytes, size);
156     return 0;
157 }
158 
159 static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
160 {
161     MultiFDInit_t msg;
162     int ret;
163 
164     ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
165     if (ret != 0) {
166         return -1;
167     }
168 
169     msg.magic = be32_to_cpu(msg.magic);
170     msg.version = be32_to_cpu(msg.version);
171 
172     if (msg.magic != MULTIFD_MAGIC) {
173         error_setg(errp, "multifd: received packet magic %x "
174                    "expected %x", msg.magic, MULTIFD_MAGIC);
175         return -1;
176     }
177 
178     if (msg.version != MULTIFD_VERSION) {
179         error_setg(errp, "multifd: received packet version %u "
180                    "expected %u", msg.version, MULTIFD_VERSION);
181         return -1;
182     }
183 
184     if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
185         char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
186         char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
187 
188         error_setg(errp, "multifd: received uuid '%s' and expected "
189                    "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
190         g_free(uuid);
191         g_free(msg_uuid);
192         return -1;
193     }
194 
195     if (msg.id > migrate_multifd_channels()) {
196         error_setg(errp, "multifd: received channel id %u is greater than "
197                    "number of channels %u", msg.id, migrate_multifd_channels());
198         return -1;
199     }
200 
201     return msg.id;
202 }
203 
204 void multifd_send_fill_packet(MultiFDSendParams *p)
205 {
206     MultiFDPacket_t *packet = p->packet;
207     uint64_t packet_num;
208     bool sync_packet = p->flags & MULTIFD_FLAG_SYNC;
209 
210     memset(packet, 0, p->packet_len);
211 
212     packet->magic = cpu_to_be32(MULTIFD_MAGIC);
213     packet->version = cpu_to_be32(MULTIFD_VERSION);
214 
215     packet->flags = cpu_to_be32(p->flags);
216     packet->next_packet_size = cpu_to_be32(p->next_packet_size);
217 
218     packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
219     packet->packet_num = cpu_to_be64(packet_num);
220 
221     p->packets_sent++;
222 
223     if (!sync_packet) {
224         multifd_ram_fill_packet(p);
225     }
226 
227     trace_multifd_send_fill(p->id, packet_num,
228                             p->flags, p->next_packet_size);
229 }
230 
231 static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
232 {
233     const MultiFDPacket_t *packet = p->packet;
234     uint32_t magic = be32_to_cpu(packet->magic);
235     uint32_t version = be32_to_cpu(packet->version);
236     int ret = 0;
237 
238     if (magic != MULTIFD_MAGIC) {
239         error_setg(errp, "multifd: received packet magic %x, expected %x",
240                    magic, MULTIFD_MAGIC);
241         return -1;
242     }
243 
244     if (version != MULTIFD_VERSION) {
245         error_setg(errp, "multifd: received packet version %u, expected %u",
246                    version, MULTIFD_VERSION);
247         return -1;
248     }
249 
250     p->flags = be32_to_cpu(packet->flags);
251     p->next_packet_size = be32_to_cpu(packet->next_packet_size);
252     p->packet_num = be64_to_cpu(packet->packet_num);
253     p->packets_recved++;
254 
255     if (!(p->flags & MULTIFD_FLAG_SYNC)) {
256         ret = multifd_ram_unfill_packet(p, errp);
257     }
258 
259     trace_multifd_recv_unfill(p->id, p->packet_num, p->flags,
260                               p->next_packet_size);
261 
262     return ret;
263 }
264 
265 static bool multifd_send_should_exit(void)
266 {
267     return qatomic_read(&multifd_send_state->exiting);
268 }
269 
270 static bool multifd_recv_should_exit(void)
271 {
272     return qatomic_read(&multifd_recv_state->exiting);
273 }
274 
275 /*
276  * The migration thread can wait on either of the two semaphores.  This
277  * function can be used to kick the main thread out of waiting on either of
278  * them.  Should mostly only be called when something wrong happened with
279  * the current multifd send thread.
280  */
281 static void multifd_send_kick_main(MultiFDSendParams *p)
282 {
283     qemu_sem_post(&p->sem_sync);
284     qemu_sem_post(&multifd_send_state->channels_ready);
285 }
286 
287 /*
288  * multifd_send() works by exchanging the MultiFDSendData object
289  * provided by the caller with an unused MultiFDSendData object from
290  * the next channel that is found to be idle.
291  *
292  * The channel owns the data until it finishes transmitting and the
293  * caller owns the empty object until it fills it with data and calls
294  * this function again. No locking necessary.
295  *
296  * Switching is safe because both the migration thread and the channel
297  * thread have barriers in place to serialize access.
298  *
299  * Returns true if succeed, false otherwise.
300  */
301 bool multifd_send(MultiFDSendData **send_data)
302 {
303     int i;
304     static int next_channel;
305     MultiFDSendParams *p = NULL; /* make happy gcc */
306     MultiFDSendData *tmp;
307 
308     if (multifd_send_should_exit()) {
309         return false;
310     }
311 
312     /* We wait here, until at least one channel is ready */
313     qemu_sem_wait(&multifd_send_state->channels_ready);
314 
315     /*
316      * next_channel can remain from a previous migration that was
317      * using more channels, so ensure it doesn't overflow if the
318      * limit is lower now.
319      */
320     next_channel %= migrate_multifd_channels();
321     for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
322         if (multifd_send_should_exit()) {
323             return false;
324         }
325         p = &multifd_send_state->params[i];
326         /*
327          * Lockless read to p->pending_job is safe, because only multifd
328          * sender thread can clear it.
329          */
330         if (qatomic_read(&p->pending_job) == false) {
331             next_channel = (i + 1) % migrate_multifd_channels();
332             break;
333         }
334     }
335 
336     /*
337      * Make sure we read p->pending_job before all the rest.  Pairs with
338      * qatomic_store_release() in multifd_send_thread().
339      */
340     smp_mb_acquire();
341 
342     assert(multifd_payload_empty(p->data));
343 
344     /*
345      * Swap the pointers. The channel gets the client data for
346      * transferring and the client gets back an unused data slot.
347      */
348     tmp = *send_data;
349     *send_data = p->data;
350     p->data = tmp;
351 
352     /*
353      * Making sure p->data is setup before marking pending_job=true. Pairs
354      * with the qatomic_load_acquire() in multifd_send_thread().
355      */
356     qatomic_store_release(&p->pending_job, true);
357     qemu_sem_post(&p->sem);
358 
359     return true;
360 }
361 
362 /* Multifd send side hit an error; remember it and prepare to quit */
363 static void multifd_send_set_error(Error *err)
364 {
365     /*
366      * We don't want to exit each threads twice.  Depending on where
367      * we get the error, or if there are two independent errors in two
368      * threads at the same time, we can end calling this function
369      * twice.
370      */
371     if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
372         return;
373     }
374 
375     if (err) {
376         MigrationState *s = migrate_get_current();
377         migrate_set_error(s, err);
378         if (s->state == MIGRATION_STATUS_SETUP ||
379             s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
380             s->state == MIGRATION_STATUS_DEVICE ||
381             s->state == MIGRATION_STATUS_ACTIVE) {
382             migrate_set_state(&s->state, s->state,
383                               MIGRATION_STATUS_FAILED);
384         }
385     }
386 }
387 
388 static void multifd_send_terminate_threads(void)
389 {
390     int i;
391 
392     trace_multifd_send_terminate_threads();
393 
394     /*
395      * Tell everyone we're quitting.  No xchg() needed here; we simply
396      * always set it.
397      */
398     qatomic_set(&multifd_send_state->exiting, 1);
399 
400     /*
401      * Firstly, kick all threads out; no matter whether they are just idle,
402      * or blocked in an IO system call.
403      */
404     for (i = 0; i < migrate_multifd_channels(); i++) {
405         MultiFDSendParams *p = &multifd_send_state->params[i];
406 
407         qemu_sem_post(&p->sem);
408         if (p->c) {
409             qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
410         }
411     }
412 
413     /*
414      * Finally recycle all the threads.
415      */
416     for (i = 0; i < migrate_multifd_channels(); i++) {
417         MultiFDSendParams *p = &multifd_send_state->params[i];
418 
419         if (p->tls_thread_created) {
420             qemu_thread_join(&p->tls_thread);
421         }
422 
423         if (p->thread_created) {
424             qemu_thread_join(&p->thread);
425         }
426     }
427 }
428 
429 static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
430 {
431     if (p->c) {
432         migration_ioc_unregister_yank(p->c);
433         /*
434          * The object_unref() cannot guarantee the fd will always be
435          * released because finalize() of the iochannel is only
436          * triggered on the last reference and it's not guaranteed
437          * that we always hold the last refcount when reaching here.
438          *
439          * Closing the fd explicitly has the benefit that if there is any
440          * registered I/O handler callbacks on such fd, that will get a
441          * POLLNVAL event and will further trigger the cleanup to finally
442          * release the IOC.
443          *
444          * FIXME: It should logically be guaranteed that all multifd
445          * channels have no I/O handler callback registered when reaching
446          * here, because migration thread will wait for all multifd channel
447          * establishments to complete during setup.  Since
448          * migrate_fd_cleanup() will be scheduled in main thread too, all
449          * previous callbacks should guarantee to be completed when
450          * reaching here.  See multifd_send_state.channels_created and its
451          * usage.  In the future, we could replace this with an assert
452          * making sure we're the last reference, or simply drop it if above
453          * is more clear to be justified.
454          */
455         qio_channel_close(p->c, &error_abort);
456         object_unref(OBJECT(p->c));
457         p->c = NULL;
458     }
459     qemu_sem_destroy(&p->sem);
460     qemu_sem_destroy(&p->sem_sync);
461     g_free(p->name);
462     p->name = NULL;
463     g_free(p->data);
464     p->data = NULL;
465     p->packet_len = 0;
466     g_free(p->packet);
467     p->packet = NULL;
468     multifd_send_state->ops->send_cleanup(p, errp);
469     assert(!p->iov);
470 
471     return *errp == NULL;
472 }
473 
474 static void multifd_send_cleanup_state(void)
475 {
476     file_cleanup_outgoing_migration();
477     socket_cleanup_outgoing_migration();
478     qemu_sem_destroy(&multifd_send_state->channels_created);
479     qemu_sem_destroy(&multifd_send_state->channels_ready);
480     g_free(multifd_send_state->params);
481     multifd_send_state->params = NULL;
482     g_free(multifd_send_state);
483     multifd_send_state = NULL;
484 }
485 
486 void multifd_send_shutdown(void)
487 {
488     int i;
489 
490     if (!migrate_multifd()) {
491         return;
492     }
493 
494     multifd_send_terminate_threads();
495 
496     for (i = 0; i < migrate_multifd_channels(); i++) {
497         MultiFDSendParams *p = &multifd_send_state->params[i];
498         Error *local_err = NULL;
499 
500         if (!multifd_send_cleanup_channel(p, &local_err)) {
501             migrate_set_error(migrate_get_current(), local_err);
502             error_free(local_err);
503         }
504     }
505 
506     multifd_send_cleanup_state();
507 }
508 
509 static int multifd_zero_copy_flush(QIOChannel *c)
510 {
511     int ret;
512     Error *err = NULL;
513 
514     ret = qio_channel_flush(c, &err);
515     if (ret < 0) {
516         error_report_err(err);
517         return -1;
518     }
519     if (ret == 1) {
520         stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
521     }
522 
523     return ret;
524 }
525 
526 int multifd_send_sync_main(void)
527 {
528     int i;
529     bool flush_zero_copy;
530 
531     flush_zero_copy = migrate_zero_copy_send();
532 
533     for (i = 0; i < migrate_multifd_channels(); i++) {
534         MultiFDSendParams *p = &multifd_send_state->params[i];
535 
536         if (multifd_send_should_exit()) {
537             return -1;
538         }
539 
540         trace_multifd_send_sync_main_signal(p->id);
541 
542         /*
543          * We should be the only user so far, so not possible to be set by
544          * others concurrently.
545          */
546         assert(qatomic_read(&p->pending_sync) == false);
547         qatomic_set(&p->pending_sync, true);
548         qemu_sem_post(&p->sem);
549     }
550     for (i = 0; i < migrate_multifd_channels(); i++) {
551         MultiFDSendParams *p = &multifd_send_state->params[i];
552 
553         if (multifd_send_should_exit()) {
554             return -1;
555         }
556 
557         qemu_sem_wait(&multifd_send_state->channels_ready);
558         trace_multifd_send_sync_main_wait(p->id);
559         qemu_sem_wait(&p->sem_sync);
560 
561         if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
562             return -1;
563         }
564     }
565     trace_multifd_send_sync_main(multifd_send_state->packet_num);
566 
567     return 0;
568 }
569 
570 static void *multifd_send_thread(void *opaque)
571 {
572     MultiFDSendParams *p = opaque;
573     MigrationThread *thread = NULL;
574     Error *local_err = NULL;
575     int ret = 0;
576     bool use_packets = multifd_use_packets();
577 
578     thread = migration_threads_add(p->name, qemu_get_thread_id());
579 
580     trace_multifd_send_thread_start(p->id);
581     rcu_register_thread();
582 
583     if (use_packets) {
584         if (multifd_send_initial_packet(p, &local_err) < 0) {
585             ret = -1;
586             goto out;
587         }
588     }
589 
590     while (true) {
591         qemu_sem_post(&multifd_send_state->channels_ready);
592         qemu_sem_wait(&p->sem);
593 
594         if (multifd_send_should_exit()) {
595             break;
596         }
597 
598         /*
599          * Read pending_job flag before p->data.  Pairs with the
600          * qatomic_store_release() in multifd_send().
601          */
602         if (qatomic_load_acquire(&p->pending_job)) {
603             p->iovs_num = 0;
604             assert(!multifd_payload_empty(p->data));
605 
606             ret = multifd_send_state->ops->send_prepare(p, &local_err);
607             if (ret != 0) {
608                 break;
609             }
610 
611             if (migrate_mapped_ram()) {
612                 ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
613                                               &p->data->u.ram, &local_err);
614             } else {
615                 ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
616                                                   NULL, 0, p->write_flags,
617                                                   &local_err);
618             }
619 
620             if (ret != 0) {
621                 break;
622             }
623 
624             stat64_add(&mig_stats.multifd_bytes,
625                        p->next_packet_size + p->packet_len);
626 
627             p->next_packet_size = 0;
628             multifd_set_payload_type(p->data, MULTIFD_PAYLOAD_NONE);
629 
630             /*
631              * Making sure p->data is published before saying "we're
632              * free".  Pairs with the smp_mb_acquire() in
633              * multifd_send().
634              */
635             qatomic_store_release(&p->pending_job, false);
636         } else {
637             /*
638              * If not a normal job, must be a sync request.  Note that
639              * pending_sync is a standalone flag (unlike pending_job), so
640              * it doesn't require explicit memory barriers.
641              */
642             assert(qatomic_read(&p->pending_sync));
643 
644             if (use_packets) {
645                 p->flags = MULTIFD_FLAG_SYNC;
646                 multifd_send_fill_packet(p);
647                 ret = qio_channel_write_all(p->c, (void *)p->packet,
648                                             p->packet_len, &local_err);
649                 if (ret != 0) {
650                     break;
651                 }
652                 /* p->next_packet_size will always be zero for a SYNC packet */
653                 stat64_add(&mig_stats.multifd_bytes, p->packet_len);
654                 p->flags = 0;
655             }
656 
657             qatomic_set(&p->pending_sync, false);
658             qemu_sem_post(&p->sem_sync);
659         }
660     }
661 
662 out:
663     if (ret) {
664         assert(local_err);
665         trace_multifd_send_error(p->id);
666         multifd_send_set_error(local_err);
667         multifd_send_kick_main(p);
668         error_free(local_err);
669     }
670 
671     rcu_unregister_thread();
672     migration_threads_remove(thread);
673     trace_multifd_send_thread_end(p->id, p->packets_sent);
674 
675     return NULL;
676 }
677 
678 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);
679 
680 typedef struct {
681     MultiFDSendParams *p;
682     QIOChannelTLS *tioc;
683 } MultiFDTLSThreadArgs;
684 
685 static void *multifd_tls_handshake_thread(void *opaque)
686 {
687     MultiFDTLSThreadArgs *args = opaque;
688 
689     qio_channel_tls_handshake(args->tioc,
690                               multifd_new_send_channel_async,
691                               args->p,
692                               NULL,
693                               NULL);
694     g_free(args);
695 
696     return NULL;
697 }
698 
699 static bool multifd_tls_channel_connect(MultiFDSendParams *p,
700                                         QIOChannel *ioc,
701                                         Error **errp)
702 {
703     MigrationState *s = migrate_get_current();
704     const char *hostname = s->hostname;
705     MultiFDTLSThreadArgs *args;
706     QIOChannelTLS *tioc;
707 
708     tioc = migration_tls_client_create(ioc, hostname, errp);
709     if (!tioc) {
710         return false;
711     }
712 
713     /*
714      * Ownership of the socket channel now transfers to the newly
715      * created TLS channel, which has already taken a reference.
716      */
717     object_unref(OBJECT(ioc));
718     trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
719     qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");
720 
721     args = g_new0(MultiFDTLSThreadArgs, 1);
722     args->tioc = tioc;
723     args->p = p;
724 
725     p->tls_thread_created = true;
726     qemu_thread_create(&p->tls_thread, "mig/src/tls",
727                        multifd_tls_handshake_thread, args,
728                        QEMU_THREAD_JOINABLE);
729     return true;
730 }
731 
732 void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
733 {
734     qio_channel_set_delay(ioc, false);
735 
736     migration_ioc_register_yank(ioc);
737     /* Setup p->c only if the channel is completely setup */
738     p->c = ioc;
739 
740     p->thread_created = true;
741     qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
742                        QEMU_THREAD_JOINABLE);
743 }
744 
745 /*
746  * When TLS is enabled this function is called once to establish the
747  * TLS connection and a second time after the TLS handshake to create
748  * the multifd channel. Without TLS it goes straight into the channel
749  * creation.
750  */
751 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
752 {
753     MultiFDSendParams *p = opaque;
754     QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
755     Error *local_err = NULL;
756     bool ret;
757 
758     trace_multifd_new_send_channel_async(p->id);
759 
760     if (qio_task_propagate_error(task, &local_err)) {
761         ret = false;
762         goto out;
763     }
764 
765     trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
766                                        migrate_get_current()->hostname);
767 
768     if (migrate_channel_requires_tls_upgrade(ioc)) {
769         ret = multifd_tls_channel_connect(p, ioc, &local_err);
770         if (ret) {
771             return;
772         }
773     } else {
774         multifd_channel_connect(p, ioc);
775         ret = true;
776     }
777 
778 out:
779     /*
780      * Here we're not interested whether creation succeeded, only that
781      * it happened at all.
782      */
783     multifd_send_channel_created();
784 
785     if (ret) {
786         return;
787     }
788 
789     trace_multifd_new_send_channel_async_error(p->id, local_err);
790     multifd_send_set_error(local_err);
791     /*
792      * For error cases (TLS or non-TLS), IO channel is always freed here
793      * rather than when cleanup multifd: since p->c is not set, multifd
794      * cleanup code doesn't even know its existence.
795      */
796     object_unref(OBJECT(ioc));
797     error_free(local_err);
798 }
799 
800 static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
801 {
802     if (!multifd_use_packets()) {
803         return file_send_channel_create(opaque, errp);
804     }
805 
806     socket_send_channel_create(multifd_new_send_channel_async, opaque);
807     return true;
808 }
809 
810 bool multifd_send_setup(void)
811 {
812     MigrationState *s = migrate_get_current();
813     int thread_count, ret = 0;
814     uint32_t page_count = multifd_ram_page_count();
815     bool use_packets = multifd_use_packets();
816     uint8_t i;
817 
818     if (!migrate_multifd()) {
819         return true;
820     }
821 
822     thread_count = migrate_multifd_channels();
823     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
824     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
825     qemu_sem_init(&multifd_send_state->channels_created, 0);
826     qemu_sem_init(&multifd_send_state->channels_ready, 0);
827     qatomic_set(&multifd_send_state->exiting, 0);
828     multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
829 
830     for (i = 0; i < thread_count; i++) {
831         MultiFDSendParams *p = &multifd_send_state->params[i];
832         Error *local_err = NULL;
833 
834         qemu_sem_init(&p->sem, 0);
835         qemu_sem_init(&p->sem_sync, 0);
836         p->id = i;
837         p->data = multifd_send_data_alloc();
838 
839         if (use_packets) {
840             p->packet_len = sizeof(MultiFDPacket_t)
841                           + sizeof(uint64_t) * page_count;
842             p->packet = g_malloc0(p->packet_len);
843         }
844         p->name = g_strdup_printf("mig/src/send_%d", i);
845         p->write_flags = 0;
846 
847         if (!multifd_new_send_channel_create(p, &local_err)) {
848             migrate_set_error(s, local_err);
849             ret = -1;
850         }
851     }
852 
853     /*
854      * Wait until channel creation has started for all channels. The
855      * creation can still fail, but no more channels will be created
856      * past this point.
857      */
858     for (i = 0; i < thread_count; i++) {
859         qemu_sem_wait(&multifd_send_state->channels_created);
860     }
861 
862     if (ret) {
863         goto err;
864     }
865 
866     for (i = 0; i < thread_count; i++) {
867         MultiFDSendParams *p = &multifd_send_state->params[i];
868         Error *local_err = NULL;
869 
870         ret = multifd_send_state->ops->send_setup(p, &local_err);
871         if (ret) {
872             migrate_set_error(s, local_err);
873             goto err;
874         }
875         assert(p->iov);
876     }
877 
878     return true;
879 
880 err:
881     migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
882                       MIGRATION_STATUS_FAILED);
883     return false;
884 }
885 
886 bool multifd_recv(void)
887 {
888     int i;
889     static int next_recv_channel;
890     MultiFDRecvParams *p = NULL;
891     MultiFDRecvData *data = multifd_recv_state->data;
892 
893     /*
894      * next_channel can remain from a previous migration that was
895      * using more channels, so ensure it doesn't overflow if the
896      * limit is lower now.
897      */
898     next_recv_channel %= migrate_multifd_channels();
899     for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
900         if (multifd_recv_should_exit()) {
901             return false;
902         }
903 
904         p = &multifd_recv_state->params[i];
905 
906         if (qatomic_read(&p->pending_job) == false) {
907             next_recv_channel = (i + 1) % migrate_multifd_channels();
908             break;
909         }
910     }
911 
912     /*
913      * Order pending_job read before manipulating p->data below. Pairs
914      * with qatomic_store_release() at multifd_recv_thread().
915      */
916     smp_mb_acquire();
917 
918     assert(!p->data->size);
919     multifd_recv_state->data = p->data;
920     p->data = data;
921 
922     /*
923      * Order p->data update before setting pending_job. Pairs with
924      * qatomic_load_acquire() at multifd_recv_thread().
925      */
926     qatomic_store_release(&p->pending_job, true);
927     qemu_sem_post(&p->sem);
928 
929     return true;
930 }
931 
932 MultiFDRecvData *multifd_get_recv_data(void)
933 {
934     return multifd_recv_state->data;
935 }
936 
937 static void multifd_recv_terminate_threads(Error *err)
938 {
939     int i;
940 
941     trace_multifd_recv_terminate_threads(err != NULL);
942 
943     if (qatomic_xchg(&multifd_recv_state->exiting, 1)) {
944         return;
945     }
946 
947     if (err) {
948         MigrationState *s = migrate_get_current();
949         migrate_set_error(s, err);
950         if (s->state == MIGRATION_STATUS_SETUP ||
951             s->state == MIGRATION_STATUS_ACTIVE) {
952             migrate_set_state(&s->state, s->state,
953                               MIGRATION_STATUS_FAILED);
954         }
955     }
956 
957     for (i = 0; i < migrate_multifd_channels(); i++) {
958         MultiFDRecvParams *p = &multifd_recv_state->params[i];
959 
960         /*
961          * The migration thread and channels interact differently
962          * depending on the presence of packets.
963          */
964         if (multifd_use_packets()) {
965             /*
966              * The channel receives as long as there are packets. When
967              * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
968              * channel waits for the migration thread to sync. If the
969              * sync never happens, do it here.
970              */
971             qemu_sem_post(&p->sem_sync);
972         } else {
973             /*
974              * The channel waits for the migration thread to give it
975              * work. When the migration thread runs out of work, it
976              * releases the channel and waits for any pending work to
977              * finish. If we reach here (e.g. due to error) before the
978              * work runs out, release the channel.
979              */
980             qemu_sem_post(&p->sem);
981         }
982 
983         /*
984          * We could arrive here for two reasons:
985          *  - normal quit, i.e. everything went fine, just finished
986          *  - error quit: We close the channels so the channel threads
987          *    finish the qio_channel_read_all_eof()
988          */
989         if (p->c) {
990             qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
991         }
992     }
993 }
994 
995 void multifd_recv_shutdown(void)
996 {
997     if (migrate_multifd()) {
998         multifd_recv_terminate_threads(NULL);
999     }
1000 }
1001 
1002 static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
1003 {
1004     migration_ioc_unregister_yank(p->c);
1005     object_unref(OBJECT(p->c));
1006     p->c = NULL;
1007     qemu_mutex_destroy(&p->mutex);
1008     qemu_sem_destroy(&p->sem_sync);
1009     qemu_sem_destroy(&p->sem);
1010     g_free(p->data);
1011     p->data = NULL;
1012     g_free(p->name);
1013     p->name = NULL;
1014     p->packet_len = 0;
1015     g_free(p->packet);
1016     p->packet = NULL;
1017     g_free(p->normal);
1018     p->normal = NULL;
1019     g_free(p->zero);
1020     p->zero = NULL;
1021     multifd_recv_state->ops->recv_cleanup(p);
1022 }
1023 
1024 static void multifd_recv_cleanup_state(void)
1025 {
1026     qemu_sem_destroy(&multifd_recv_state->sem_sync);
1027     g_free(multifd_recv_state->params);
1028     multifd_recv_state->params = NULL;
1029     g_free(multifd_recv_state->data);
1030     multifd_recv_state->data = NULL;
1031     g_free(multifd_recv_state);
1032     multifd_recv_state = NULL;
1033 }
1034 
1035 void multifd_recv_cleanup(void)
1036 {
1037     int i;
1038 
1039     if (!migrate_multifd()) {
1040         return;
1041     }
1042     multifd_recv_terminate_threads(NULL);
1043     for (i = 0; i < migrate_multifd_channels(); i++) {
1044         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1045 
1046         if (p->thread_created) {
1047             qemu_thread_join(&p->thread);
1048         }
1049     }
1050     for (i = 0; i < migrate_multifd_channels(); i++) {
1051         multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
1052     }
1053     multifd_recv_cleanup_state();
1054 }
1055 
1056 void multifd_recv_sync_main(void)
1057 {
1058     int thread_count = migrate_multifd_channels();
1059     bool file_based = !multifd_use_packets();
1060     int i;
1061 
1062     if (!migrate_multifd()) {
1063         return;
1064     }
1065 
1066     /*
1067      * File-based channels don't use packets and therefore need to
1068      * wait for more work. Release them to start the sync.
1069      */
1070     if (file_based) {
1071         for (i = 0; i < thread_count; i++) {
1072             MultiFDRecvParams *p = &multifd_recv_state->params[i];
1073 
1074             trace_multifd_recv_sync_main_signal(p->id);
1075             qemu_sem_post(&p->sem);
1076         }
1077     }
1078 
1079     /*
1080      * Initiate the synchronization by waiting for all channels.
1081      *
1082      * For socket-based migration this means each channel has received
1083      * the SYNC packet on the stream.
1084      *
1085      * For file-based migration this means each channel is done with
1086      * the work (pending_job=false).
1087      */
1088     for (i = 0; i < thread_count; i++) {
1089         trace_multifd_recv_sync_main_wait(i);
1090         qemu_sem_wait(&multifd_recv_state->sem_sync);
1091     }
1092 
1093     if (file_based) {
1094         /*
1095          * For file-based loading is done in one iteration. We're
1096          * done.
1097          */
1098         return;
1099     }
1100 
1101     /*
1102      * Sync done. Release the channels for the next iteration.
1103      */
1104     for (i = 0; i < thread_count; i++) {
1105         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1106 
1107         WITH_QEMU_LOCK_GUARD(&p->mutex) {
1108             if (multifd_recv_state->packet_num < p->packet_num) {
1109                 multifd_recv_state->packet_num = p->packet_num;
1110             }
1111         }
1112         trace_multifd_recv_sync_main_signal(p->id);
1113         qemu_sem_post(&p->sem_sync);
1114     }
1115     trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
1116 }
1117 
1118 static void *multifd_recv_thread(void *opaque)
1119 {
1120     MultiFDRecvParams *p = opaque;
1121     Error *local_err = NULL;
1122     bool use_packets = multifd_use_packets();
1123     int ret;
1124 
1125     trace_multifd_recv_thread_start(p->id);
1126     rcu_register_thread();
1127 
1128     while (true) {
1129         uint32_t flags = 0;
1130         bool has_data = false;
1131         p->normal_num = 0;
1132 
1133         if (use_packets) {
1134             if (multifd_recv_should_exit()) {
1135                 break;
1136             }
1137 
1138             ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
1139                                            p->packet_len, &local_err);
1140             if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
1141                 break;
1142             }
1143 
1144             qemu_mutex_lock(&p->mutex);
1145             ret = multifd_recv_unfill_packet(p, &local_err);
1146             if (ret) {
1147                 qemu_mutex_unlock(&p->mutex);
1148                 break;
1149             }
1150 
1151             flags = p->flags;
1152             /* recv methods don't know how to handle the SYNC flag */
1153             p->flags &= ~MULTIFD_FLAG_SYNC;
1154             if (!(flags & MULTIFD_FLAG_SYNC)) {
1155                 has_data = p->normal_num || p->zero_num;
1156             }
1157             qemu_mutex_unlock(&p->mutex);
1158         } else {
1159             /*
1160              * No packets, so we need to wait for the vmstate code to
1161              * give us work.
1162              */
1163             qemu_sem_wait(&p->sem);
1164 
1165             if (multifd_recv_should_exit()) {
1166                 break;
1167             }
1168 
1169             /* pairs with qatomic_store_release() at multifd_recv() */
1170             if (!qatomic_load_acquire(&p->pending_job)) {
1171                 /*
1172                  * Migration thread did not send work, this is
1173                  * equivalent to pending_sync on the sending
1174                  * side. Post sem_sync to notify we reached this
1175                  * point.
1176                  */
1177                 qemu_sem_post(&multifd_recv_state->sem_sync);
1178                 continue;
1179             }
1180 
1181             has_data = !!p->data->size;
1182         }
1183 
1184         if (has_data) {
1185             ret = multifd_recv_state->ops->recv(p, &local_err);
1186             if (ret != 0) {
1187                 break;
1188             }
1189         }
1190 
1191         if (use_packets) {
1192             if (flags & MULTIFD_FLAG_SYNC) {
1193                 qemu_sem_post(&multifd_recv_state->sem_sync);
1194                 qemu_sem_wait(&p->sem_sync);
1195             }
1196         } else {
1197             p->data->size = 0;
1198             /*
1199              * Order data->size update before clearing
1200              * pending_job. Pairs with smp_mb_acquire() at
1201              * multifd_recv().
1202              */
1203             qatomic_store_release(&p->pending_job, false);
1204         }
1205     }
1206 
1207     if (local_err) {
1208         multifd_recv_terminate_threads(local_err);
1209         error_free(local_err);
1210     }
1211 
1212     rcu_unregister_thread();
1213     trace_multifd_recv_thread_end(p->id, p->packets_recved);
1214 
1215     return NULL;
1216 }
1217 
1218 int multifd_recv_setup(Error **errp)
1219 {
1220     int thread_count;
1221     uint32_t page_count = multifd_ram_page_count();
1222     bool use_packets = multifd_use_packets();
1223     uint8_t i;
1224 
1225     /*
1226      * Return successfully if multiFD recv state is already initialised
1227      * or multiFD is not enabled.
1228      */
1229     if (multifd_recv_state || !migrate_multifd()) {
1230         return 0;
1231     }
1232 
1233     thread_count = migrate_multifd_channels();
1234     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
1235     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
1236 
1237     multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
1238     multifd_recv_state->data->size = 0;
1239 
1240     qatomic_set(&multifd_recv_state->count, 0);
1241     qatomic_set(&multifd_recv_state->exiting, 0);
1242     qemu_sem_init(&multifd_recv_state->sem_sync, 0);
1243     multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
1244 
1245     for (i = 0; i < thread_count; i++) {
1246         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1247 
1248         qemu_mutex_init(&p->mutex);
1249         qemu_sem_init(&p->sem_sync, 0);
1250         qemu_sem_init(&p->sem, 0);
1251         p->pending_job = false;
1252         p->id = i;
1253 
1254         p->data = g_new0(MultiFDRecvData, 1);
1255         p->data->size = 0;
1256 
1257         if (use_packets) {
1258             p->packet_len = sizeof(MultiFDPacket_t)
1259                 + sizeof(uint64_t) * page_count;
1260             p->packet = g_malloc0(p->packet_len);
1261         }
1262         p->name = g_strdup_printf("mig/dst/recv_%d", i);
1263         p->normal = g_new0(ram_addr_t, page_count);
1264         p->zero = g_new0(ram_addr_t, page_count);
1265     }
1266 
1267     for (i = 0; i < thread_count; i++) {
1268         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1269         int ret;
1270 
1271         ret = multifd_recv_state->ops->recv_setup(p, errp);
1272         if (ret) {
1273             return ret;
1274         }
1275     }
1276     return 0;
1277 }
1278 
1279 bool multifd_recv_all_channels_created(void)
1280 {
1281     int thread_count = migrate_multifd_channels();
1282 
1283     if (!migrate_multifd()) {
1284         return true;
1285     }
1286 
1287     if (!multifd_recv_state) {
1288         /* Called before any connections created */
1289         return false;
1290     }
1291 
1292     return thread_count == qatomic_read(&multifd_recv_state->count);
1293 }
1294 
1295 /*
1296  * Try to receive all multifd channels to get ready for the migration.
1297  * Sets @errp when failing to receive the current channel.
1298  */
1299 void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
1300 {
1301     MultiFDRecvParams *p;
1302     Error *local_err = NULL;
1303     bool use_packets = multifd_use_packets();
1304     int id;
1305 
1306     if (use_packets) {
1307         id = multifd_recv_initial_packet(ioc, &local_err);
1308         if (id < 0) {
1309             multifd_recv_terminate_threads(local_err);
1310             error_propagate_prepend(errp, local_err,
1311                                     "failed to receive packet"
1312                                     " via multifd channel %d: ",
1313                                     qatomic_read(&multifd_recv_state->count));
1314             return;
1315         }
1316         trace_multifd_recv_new_channel(id);
1317     } else {
1318         id = qatomic_read(&multifd_recv_state->count);
1319     }
1320 
1321     p = &multifd_recv_state->params[id];
1322     if (p->c != NULL) {
1323         error_setg(&local_err, "multifd: received id '%d' already setup'",
1324                    id);
1325         multifd_recv_terminate_threads(local_err);
1326         error_propagate(errp, local_err);
1327         return;
1328     }
1329     p->c = ioc;
1330     object_ref(OBJECT(ioc));
1331 
1332     p->thread_created = true;
1333     qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
1334                        QEMU_THREAD_JOINABLE);
1335     qatomic_inc(&multifd_recv_state->count);
1336 }
1337