xref: /openbmc/qemu/migration/multifd.c (revision 7e4480dde246982f18b51d8f316a95a7dba2b825)
/*
 * Multifd common code
 *
 * Copyright (c) 2019-2020 Red Hat Inc
 *
 * Authors:
 *  Juan Quintela <quintela@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */

#include "qemu/osdep.h"
#include "qemu/cutils.h"
#include "qemu/rcu.h"
#include "exec/target_page.h"
#include "sysemu/sysemu.h"
#include "exec/ramblock.h"
#include "qemu/error-report.h"
#include "qapi/error.h"
#include "file.h"
#include "migration.h"
#include "migration-stats.h"
#include "socket.h"
#include "tls.h"
#include "qemu-file.h"
#include "trace.h"
#include "multifd.h"
#include "threadinfo.h"
#include "options.h"
#include "qemu/yank.h"
#include "io/channel-file.h"
#include "io/channel-socket.h"
#include "yank_functions.h"

/* Multiple fd's */

#define MULTIFD_MAGIC 0x11223344U
#define MULTIFD_VERSION 1

typedef struct {
    uint32_t magic;
    uint32_t version;
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;

struct {
    MultiFDSendParams *params;
    /*
     * Global number of generated multifd packets.
     *
     * Note that we use 'uintptr_t' because it naturally supports atomic
     * operations on both 32-bit and 64-bit hosts.  It means that on
     * 32-bit systems multifd will overflow packet_num sooner, but that
     * should be fine.
     *
     * Another option would be QEMU's Stat64, which is 64 bits on all
     * hosts; however, it does not support atomic fetch_add() yet.
     * Keep it simple for now.
     */
    uintptr_t packet_num;
    /*
     * Synchronization point past which no more channels will be
     * created.
     */
    QemuSemaphore channels_created;
    /* send channels ready */
    QemuSemaphore channels_ready;
    /*
     * Have we already terminated the threads?  There is a race when an
     * error arrives while we are exiting.
     * Accessed with atomic operations.  The only valid values are 0 and 1.
     */
    int exiting;
    /* multifd ops */
    const MultiFDMethods *ops;
} *multifd_send_state;

struct {
    MultiFDRecvParams *params;
    MultiFDRecvData *data;
    /* number of created threads */
    int count;
    /*
     * This is always posted by the recv threads; the migration thread
     * uses it to wait for recv threads to finish assigned tasks.
     */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    int exiting;
    /* multifd ops */
    const MultiFDMethods *ops;
} *multifd_recv_state;

MultiFDSendData *multifd_send_data_alloc(void)
{
    size_t max_payload_size, size_minus_payload;

    /*
     * MultiFDPages_t has a flexible array at the end, account for it
     * when allocating MultiFDSendData. Use max() in case other types
     * added to the union in the future are larger than
     * (MultiFDPages_t + flex array).
     */
    max_payload_size = MAX(multifd_ram_payload_size(), sizeof(MultiFDPayload));

    /*
     * Account for any holes the compiler might insert. We can't pack
     * the structure because that misaligns the members and triggers
     * Waddress-of-packed-member.
     */
    size_minus_payload = sizeof(MultiFDSendData) - sizeof(MultiFDPayload);

    return g_malloc0(size_minus_payload + max_payload_size);
}

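/*
 * Packet headers are only needed for stream-like channels (sockets),
 * where framing tells the destination what each chunk contains.
 * Mapped-ram migration writes pages at fixed offsets in a file, so it
 * does not use packets.
 */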
static bool multifd_use_packets(void)
{
    return !migrate_mapped_ram();
}

void multifd_send_channel_created(void)
{
    qemu_sem_post(&multifd_send_state->channels_created);
}

static const MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {};

void multifd_register_ops(int method, const MultiFDMethods *ops)
{
    assert(0 <= method && method < MULTIFD_COMPRESSION__MAX);
    assert(!multifd_ops[method]);
    multifd_ops[method] = ops;
}

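/*
 * Send the per-channel handshake message: magic, version, the source
 * VM's UUID and this channel's id.  The destination uses it to match
 * the connection to a channel of the right migration.
 */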
static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
{
    MultiFDInit_t msg = {};
    size_t size = sizeof(msg);
    int ret;

    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
    msg.version = cpu_to_be32(MULTIFD_VERSION);
    msg.id = p->id;
    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));

    ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
    if (ret != 0) {
        return -1;
    }
    stat64_add(&mig_stats.multifd_bytes, size);
    return 0;
}

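/*
 * Receive and validate the handshake message on a new incoming
 * connection.  Returns the channel id on success, -1 on any mismatch
 * (magic, version, UUID) or read error.
 */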
static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
{
    MultiFDInit_t msg;
    int ret;

    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
    if (ret != 0) {
        return -1;
    }

    msg.magic = be32_to_cpu(msg.magic);
    msg.version = be32_to_cpu(msg.version);

    if (msg.magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet magic %x "
                   "expected %x", msg.magic, MULTIFD_MAGIC);
        return -1;
    }

    if (msg.version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet version %u "
                   "expected %u", msg.version, MULTIFD_VERSION);
        return -1;
    }

    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
        char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);

        error_setg(errp, "multifd: received uuid '%s' and expected "
                   "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
        g_free(uuid);
        g_free(msg_uuid);
        return -1;
    }

    if (msg.id > migrate_multifd_channels()) {
        error_setg(errp, "multifd: received channel id %u is greater than "
                   "number of channels %u", msg.id, migrate_multifd_channels());
        return -1;
    }

    return msg.id;
}

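/*
 * Build the wire header for the next packet in p->packet: magic,
 * version, flags, payload size and a globally unique, monotonically
 * increasing packet number taken with an atomic fetch-and-increment.
 */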
void multifd_send_fill_packet(MultiFDSendParams *p)
{
    MultiFDPacket_t *packet = p->packet;
    uint64_t packet_num;
    bool sync_packet = p->flags & MULTIFD_FLAG_SYNC;

    memset(packet, 0, p->packet_len);

    packet->magic = cpu_to_be32(MULTIFD_MAGIC);
    packet->version = cpu_to_be32(MULTIFD_VERSION);

    packet->flags = cpu_to_be32(p->flags);
    packet->next_packet_size = cpu_to_be32(p->next_packet_size);

    packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
    packet->packet_num = cpu_to_be64(packet_num);

    p->packets_sent++;

    if (!sync_packet) {
        multifd_ram_fill_packet(p);
    }

    trace_multifd_send_fill(p->id, packet_num,
                            p->flags, p->next_packet_size);
}

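/*
 * Parse a received packet header into p, validating magic and version
 * before trusting any other field.
 */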
static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
{
    const MultiFDPacket_t *packet = p->packet;
    uint32_t magic = be32_to_cpu(packet->magic);
    uint32_t version = be32_to_cpu(packet->version);
    int ret = 0;

    if (magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet magic %x, expected %x",
                   magic, MULTIFD_MAGIC);
        return -1;
    }

    if (version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet version %u, expected %u",
                   version, MULTIFD_VERSION);
        return -1;
    }

    p->flags = be32_to_cpu(packet->flags);
    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
    p->packet_num = be64_to_cpu(packet->packet_num);
    p->packets_recved++;

    /* Always unfill, old QEMUs (<9.0) send data along with SYNC */
    ret = multifd_ram_unfill_packet(p, errp);

    trace_multifd_recv_unfill(p->id, p->packet_num, p->flags,
                              p->next_packet_size);

    return ret;
}

static bool multifd_send_should_exit(void)
{
    return qatomic_read(&multifd_send_state->exiting);
}

static bool multifd_recv_should_exit(void)
{
    return qatomic_read(&multifd_recv_state->exiting);
}

/*
 * The migration thread can wait on either of the two semaphores.  This
 * function can be used to kick the main thread out of waiting on either of
 * them.  It should mostly only be called when something went wrong with
 * the current multifd send thread.
 */
static void multifd_send_kick_main(MultiFDSendParams *p)
{
    qemu_sem_post(&p->sem_sync);
    qemu_sem_post(&multifd_send_state->channels_ready);
}

/*
 * multifd_send() works by exchanging the MultiFDSendData object
 * provided by the caller with an unused MultiFDSendData object from
 * the next channel that is found to be idle.
 *
 * The channel owns the data until it finishes transmitting and the
 * caller owns the empty object until it fills it with data and calls
 * this function again. No locking necessary.
 *
 * Switching is safe because both the migration thread and the channel
 * thread have barriers in place to serialize access.
 *
 * Returns true on success, false otherwise.
 */
bool multifd_send(MultiFDSendData **send_data)
{
    int i;
    static int next_channel;
    MultiFDSendParams *p = NULL; /* make gcc happy */
    MultiFDSendData *tmp;

    if (multifd_send_should_exit()) {
        return false;
    }

    /* We wait here, until at least one channel is ready */
    qemu_sem_wait(&multifd_send_state->channels_ready);

    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_channel %= migrate_multifd_channels();
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_send_should_exit()) {
            return false;
        }
        p = &multifd_send_state->params[i];
        /*
         * Lockless read of p->pending_job is safe, because only the
         * multifd sender thread can clear it.
         */
        if (qatomic_read(&p->pending_job) == false) {
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Make sure we read p->pending_job before all the rest.  Pairs with
     * qatomic_store_release() in multifd_send_thread().
     */
    smp_mb_acquire();

    assert(multifd_payload_empty(p->data));

    /*
     * Swap the pointers. The channel gets the client data for
     * transferring and the client gets back an unused data slot.
     */
    tmp = *send_data;
    *send_data = p->data;
    p->data = tmp;

    /*
     * Make sure p->data is set up before marking pending_job=true. Pairs
     * with the qatomic_load_acquire() in multifd_send_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}

/* Multifd send side hit an error; remember it and prepare to quit */
static void multifd_send_set_error(Error *err)
{
    /*
     * We don't want to exit the threads twice.  Depending on where
     * we get the error, or if there are two independent errors in two
     * threads at the same time, we can end up calling this function
     * twice.
     */
    if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
            s->state == MIGRATION_STATUS_DEVICE ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }
}

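/*
 * Tear down all send threads: set the exiting flag, wake every thread
 * that may be sleeping on its semaphore or blocked in channel IO, then
 * join the worker threads (and TLS handshake threads, if any).
 */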
static void multifd_send_terminate_threads(void)
{
    int i;

    trace_multifd_send_terminate_threads();

    /*
     * Tell everyone we're quitting.  No xchg() needed here; we simply
     * always set it.
     */
    qatomic_set(&multifd_send_state->exiting, 1);

    /*
     * First, kick all threads out, no matter whether they are just idle
     * or blocked in an IO system call.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_sem_post(&p->sem);
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }

    /*
     * Finally, recycle all the threads.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (p->tls_thread_created) {
            qemu_thread_join(&p->tls_thread);
        }

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
}

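/*
 * Release every per-channel resource: the IO channel, semaphores, the
 * packet buffer and the data slot, then let the compression-specific
 * ops run their own cleanup.  Returns false if the ops reported an
 * error through @errp.
 */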
static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
{
    if (p->c) {
        migration_ioc_unregister_yank(p->c);
        /*
         * The object_unref() cannot guarantee the fd will always be
         * released because finalize() of the iochannel is only
         * triggered on the last reference and it's not guaranteed
         * that we always hold the last refcount when reaching here.
         *
         * Closing the fd explicitly has the benefit that if there are any
         * registered I/O handler callbacks on such an fd, they will get a
         * POLLNVAL event and that will further trigger the cleanup to
         * finally release the IOC.
         *
         * FIXME: It should logically be guaranteed that all multifd
         * channels have no I/O handler callback registered when reaching
         * here, because the migration thread will wait for all multifd
         * channel establishments to complete during setup.  Since
         * migrate_fd_cleanup() will be scheduled in the main thread too,
         * all previous callbacks should be guaranteed to have completed
         * by the time we reach here.  See multifd_send_state.channels_created
         * and its usage.  In the future, we could replace this with an
         * assert making sure we hold the last reference, or simply drop
         * the explicit close if the above reasoning can be justified.
         */
        qio_channel_close(p->c, &error_abort);
        object_unref(OBJECT(p->c));
        p->c = NULL;
    }
    qemu_sem_destroy(&p->sem);
    qemu_sem_destroy(&p->sem_sync);
    g_free(p->name);
    p->name = NULL;
    g_free(p->data);
    p->data = NULL;
    p->packet_len = 0;
    g_free(p->packet);
    p->packet = NULL;
    multifd_send_state->ops->send_cleanup(p, errp);
    assert(!p->iov);

    return *errp == NULL;
}

static void multifd_send_cleanup_state(void)
{
    file_cleanup_outgoing_migration();
    socket_cleanup_outgoing_migration();
    qemu_sem_destroy(&multifd_send_state->channels_created);
    qemu_sem_destroy(&multifd_send_state->channels_ready);
    g_free(multifd_send_state->params);
    multifd_send_state->params = NULL;
    g_free(multifd_send_state);
    multifd_send_state = NULL;
}

void multifd_send_shutdown(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }

    multifd_send_terminate_threads();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        if (!multifd_send_cleanup_channel(p, &local_err)) {
            migrate_set_error(migrate_get_current(), local_err);
            error_free(local_err);
        }
    }

    multifd_send_cleanup_state();
}

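/*
 * Flush outstanding zero-copy writes on @c.  qio_channel_flush()
 * returns 1 when the kernel could not do zero-copy and fell back to
 * copying, which we account in dirty_sync_missed_zero_copy.
 */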
static int multifd_zero_copy_flush(QIOChannel *c)
{
    int ret;
    Error *err = NULL;

    ret = qio_channel_flush(c, &err);
    if (ret < 0) {
        error_report_err(err);
        return -1;
    }
    if (ret == 1) {
        stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
    }

    return ret;
}

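/*
 * Synchronize with all send channels: ask each one to emit a SYNC
 * packet and wait until every channel has acknowledged it.  With
 * zero-copy enabled, also flush each channel so all pages are on the
 * wire before the sync completes.
 */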
int multifd_send_sync_main(void)
{
    int i;
    bool flush_zero_copy;

    flush_zero_copy = migrate_zero_copy_send();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        trace_multifd_send_sync_main_signal(p->id);

        /*
         * We should be the only user so far, so it's not possible for
         * it to be set by others concurrently.
         */
        assert(qatomic_read(&p->pending_sync) == false);
        qatomic_set(&p->pending_sync, true);
        qemu_sem_post(&p->sem);
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        qemu_sem_wait(&multifd_send_state->channels_ready);
        trace_multifd_send_sync_main_wait(p->id);
        qemu_sem_wait(&p->sem_sync);

        if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
            return -1;
        }
    }
    trace_multifd_send_sync_main(multifd_send_state->packet_num);

    return 0;
}

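/*
 * Main loop of a send channel thread.  It sleeps on p->sem until the
 * migration thread posts either a regular job (pending_job) or a sync
 * request (pending_sync), transmits the corresponding data or SYNC
 * packet, and then marks itself ready again via channels_ready.
 */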
static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    MigrationThread *thread = NULL;
    Error *local_err = NULL;
    int ret = 0;
    bool use_packets = multifd_use_packets();

    thread = migration_threads_add(p->name, qemu_get_thread_id());

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (use_packets) {
        if (multifd_send_initial_packet(p, &local_err) < 0) {
            ret = -1;
            goto out;
        }
    }

    while (true) {
        qemu_sem_post(&multifd_send_state->channels_ready);
        qemu_sem_wait(&p->sem);

        if (multifd_send_should_exit()) {
            break;
        }

        /*
         * Read the pending_job flag before p->data.  Pairs with the
         * qatomic_store_release() in multifd_send().
         */
        if (qatomic_load_acquire(&p->pending_job)) {
            p->flags = 0;
            p->iovs_num = 0;
            assert(!multifd_payload_empty(p->data));

            ret = multifd_send_state->ops->send_prepare(p, &local_err);
            if (ret != 0) {
                break;
            }

            if (migrate_mapped_ram()) {
                ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
                                              &p->data->u.ram, &local_err);
            } else {
                ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
                                                  NULL, 0, p->write_flags,
                                                  &local_err);
            }

            if (ret != 0) {
                break;
            }

            stat64_add(&mig_stats.multifd_bytes,
                       (uint64_t)p->next_packet_size + p->packet_len);

            p->next_packet_size = 0;
            multifd_set_payload_type(p->data, MULTIFD_PAYLOAD_NONE);

            /*
             * Make sure p->data is published before saying "we're
             * free".  Pairs with the smp_mb_acquire() in
             * multifd_send().
             */
            qatomic_store_release(&p->pending_job, false);
        } else {
            /*
             * If it's not a normal job, it must be a sync request.  Note
             * that pending_sync is a standalone flag (unlike pending_job),
             * so it doesn't require explicit memory barriers.
             */
            assert(qatomic_read(&p->pending_sync));

            if (use_packets) {
                p->flags = MULTIFD_FLAG_SYNC;
                multifd_send_fill_packet(p);
                ret = qio_channel_write_all(p->c, (void *)p->packet,
                                            p->packet_len, &local_err);
                if (ret != 0) {
                    break;
                }
                /* p->next_packet_size will always be zero for a SYNC packet */
                stat64_add(&mig_stats.multifd_bytes, p->packet_len);
            }

            qatomic_set(&p->pending_sync, false);
            qemu_sem_post(&p->sem_sync);
        }
    }

out:
    if (ret) {
        assert(local_err);
        trace_multifd_send_error(p->id);
        multifd_send_set_error(local_err);
        multifd_send_kick_main(p);
        error_free(local_err);
    }

    rcu_unregister_thread();
    migration_threads_remove(thread);
    trace_multifd_send_thread_end(p->id, p->packets_sent);

    return NULL;
}

static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);

typedef struct {
    MultiFDSendParams *p;
    QIOChannelTLS *tioc;
} MultiFDTLSThreadArgs;

static void *multifd_tls_handshake_thread(void *opaque)
{
    MultiFDTLSThreadArgs *args = opaque;

    qio_channel_tls_handshake(args->tioc,
                              multifd_new_send_channel_async,
                              args->p,
                              NULL,
                              NULL);
    g_free(args);

    return NULL;
}

static bool multifd_tls_channel_connect(MultiFDSendParams *p,
                                        QIOChannel *ioc,
                                        Error **errp)
{
    MigrationState *s = migrate_get_current();
    const char *hostname = s->hostname;
    MultiFDTLSThreadArgs *args;
    QIOChannelTLS *tioc;

    tioc = migration_tls_client_create(ioc, hostname, errp);
    if (!tioc) {
        return false;
    }

    /*
     * Ownership of the socket channel now transfers to the newly
     * created TLS channel, which has already taken a reference.
     */
    object_unref(OBJECT(ioc));
    trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
    qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");

    args = g_new0(MultiFDTLSThreadArgs, 1);
    args->tioc = tioc;
    args->p = p;

    p->tls_thread_created = true;
    qemu_thread_create(&p->tls_thread, MIGRATION_THREAD_SRC_TLS,
                       multifd_tls_handshake_thread, args,
                       QEMU_THREAD_JOINABLE);
    return true;
}

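/*
 * Final step of channel creation: attach the fully established
 * (possibly TLS-wrapped) IO channel to the channel state and spawn
 * its send thread.
 */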
void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
{
    qio_channel_set_delay(ioc, false);

    migration_ioc_register_yank(ioc);
    /* Set up p->c only once the channel is completely established */
    p->c = ioc;

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
                       QEMU_THREAD_JOINABLE);
}

/*
 * When TLS is enabled this function is called once to establish the
 * TLS connection and a second time after the TLS handshake to create
 * the multifd channel. Without TLS it goes straight into the channel
 * creation.
 */
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
    Error *local_err = NULL;
    bool ret;

    trace_multifd_new_send_channel_async(p->id);

    if (qio_task_propagate_error(task, &local_err)) {
        ret = false;
        goto out;
    }

    trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
                                       migrate_get_current()->hostname);

    if (migrate_channel_requires_tls_upgrade(ioc)) {
        ret = multifd_tls_channel_connect(p, ioc, &local_err);
        if (ret) {
            return;
        }
    } else {
        multifd_channel_connect(p, ioc);
        ret = true;
    }

out:
    /*
     * Here we're not interested in whether creation succeeded, only that
     * it was attempted at all.
     */
    multifd_send_channel_created();

    if (ret) {
        return;
    }

    trace_multifd_new_send_channel_async_error(p->id, local_err);
    multifd_send_set_error(local_err);
    /*
     * For error cases (TLS or non-TLS), the IO channel is always freed
     * here rather than during multifd cleanup: since p->c is not set, the
     * multifd cleanup code doesn't even know of its existence.
     */
    object_unref(OBJECT(ioc));
    error_free(local_err);
}

static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
{
    if (!multifd_use_packets()) {
        return file_send_channel_create(opaque, errp);
    }

    socket_send_channel_create(multifd_new_send_channel_async, opaque);
    return true;
}

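/*
 * Allocate the global send state and kick off creation of every
 * channel, then wait on channels_created until creation has at least
 * started for all of them.  Per-channel send_setup() runs only after
 * that, so a failure can still roll everything back.
 */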
bool multifd_send_setup(void)
{
    MigrationState *s = migrate_get_current();
    int thread_count, ret = 0;
    uint32_t page_count = multifd_ram_page_count();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    if (!migrate_multifd()) {
        return true;
    }

    thread_count = migrate_multifd_channels();
    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
    qemu_sem_init(&multifd_send_state->channels_created, 0);
    qemu_sem_init(&multifd_send_state->channels_ready, 0);
    qatomic_set(&multifd_send_state->exiting, 0);
    multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        qemu_sem_init(&p->sem, 0);
        qemu_sem_init(&p->sem_sync, 0);
        p->id = i;
        p->data = multifd_send_data_alloc();

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                          + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
        }
        p->name = g_strdup_printf(MIGRATION_THREAD_SRC_MULTIFD, i);
        p->write_flags = 0;

        if (!multifd_new_send_channel_create(p, &local_err)) {
            migrate_set_error(s, local_err);
            ret = -1;
        }
    }

    /*
     * Wait until channel creation has started for all channels. The
     * creation can still fail, but no more channels will be created
     * past this point.
     */
    for (i = 0; i < thread_count; i++) {
        qemu_sem_wait(&multifd_send_state->channels_created);
    }

    if (ret) {
        goto err;
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        ret = multifd_send_state->ops->send_setup(p, &local_err);
        if (ret) {
            migrate_set_error(s, local_err);
            goto err;
        }
        assert(p->iov);
    }

    return true;

err:
    migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
                      MIGRATION_STATUS_FAILED);
    return false;
}

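/*
 * Receive-side counterpart of multifd_send() for file-based migration:
 * hand the current MultiFDRecvData to the first idle recv channel and
 * take that channel's unused slot in exchange.
 */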
bool multifd_recv(void)
{
    int i;
    static int next_recv_channel;
    MultiFDRecvParams *p = NULL;
    MultiFDRecvData *data = multifd_recv_state->data;

    /*
     * next_recv_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_recv_channel %= migrate_multifd_channels();
    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_recv_should_exit()) {
            return false;
        }

        p = &multifd_recv_state->params[i];

        if (qatomic_read(&p->pending_job) == false) {
            next_recv_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Order the pending_job read before manipulating p->data below. Pairs
     * with qatomic_store_release() at multifd_recv_thread().
     */
    smp_mb_acquire();

    assert(!p->data->size);
    multifd_recv_state->data = p->data;
    p->data = data;

    /*
     * Order the p->data update before setting pending_job. Pairs with
     * qatomic_load_acquire() at multifd_recv_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}

MultiFDRecvData *multifd_get_recv_data(void)
{
    return multifd_recv_state->data;
}

static void multifd_recv_terminate_threads(Error *err)
{
    int i;

    trace_multifd_recv_terminate_threads(err != NULL);

    if (qatomic_xchg(&multifd_recv_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        /*
         * The migration thread and channels interact differently
         * depending on the presence of packets.
         */
        if (multifd_use_packets()) {
            /*
             * The channel receives as long as there are packets. When
             * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
             * channel waits for the migration thread to sync. If the
             * sync never happens, do it here.
             */
            qemu_sem_post(&p->sem_sync);
        } else {
            /*
             * The channel waits for the migration thread to give it
             * work. When the migration thread runs out of work, it
             * releases the channel and waits for any pending work to
             * finish. If we reach here (e.g. due to error) before the
             * work runs out, release the channel.
             */
            qemu_sem_post(&p->sem);
        }

        /*
         * We could arrive here for two reasons:
         *  - normal quit, i.e. everything went fine, just finished
         *  - error quit: We close the channels so the channel threads
         *    finish the qio_channel_read_all_eof()
         */
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }
}

void multifd_recv_shutdown(void)
{
    if (migrate_multifd()) {
        multifd_recv_terminate_threads(NULL);
    }
}

static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
{
    migration_ioc_unregister_yank(p->c);
    object_unref(OBJECT(p->c));
    p->c = NULL;
    qemu_mutex_destroy(&p->mutex);
    qemu_sem_destroy(&p->sem_sync);
    qemu_sem_destroy(&p->sem);
    g_free(p->data);
    p->data = NULL;
    g_free(p->name);
    p->name = NULL;
    p->packet_len = 0;
    g_free(p->packet);
    p->packet = NULL;
    g_free(p->normal);
    p->normal = NULL;
    g_free(p->zero);
    p->zero = NULL;
    multifd_recv_state->ops->recv_cleanup(p);
}

static void multifd_recv_cleanup_state(void)
{
    qemu_sem_destroy(&multifd_recv_state->sem_sync);
    g_free(multifd_recv_state->params);
    multifd_recv_state->params = NULL;
    g_free(multifd_recv_state->data);
    multifd_recv_state->data = NULL;
    g_free(multifd_recv_state);
    multifd_recv_state = NULL;
}

void multifd_recv_cleanup(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }
    multifd_recv_terminate_threads(NULL);
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
    }
    multifd_recv_cleanup_state();
}

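/*
 * Synchronize the migration thread with every recv channel, either at
 * a SYNC packet boundary (socket) or once all outstanding work is done
 * (file).  The comments below describe the two flavours.
 */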
void multifd_recv_sync_main(void)
{
    int thread_count = migrate_multifd_channels();
    bool file_based = !multifd_use_packets();
    int i;

    if (!migrate_multifd()) {
        return;
    }

    /*
     * File-based channels don't use packets and therefore need to
     * wait for more work. Release them to start the sync.
     */
    if (file_based) {
        for (i = 0; i < thread_count; i++) {
            MultiFDRecvParams *p = &multifd_recv_state->params[i];

            trace_multifd_recv_sync_main_signal(p->id);
            qemu_sem_post(&p->sem);
        }
    }

    /*
     * Initiate the synchronization by waiting for all channels.
     *
     * For socket-based migration this means each channel has received
     * the SYNC packet on the stream.
     *
     * For file-based migration this means each channel is done with
     * the work (pending_job=false).
     */
    for (i = 0; i < thread_count; i++) {
        trace_multifd_recv_sync_main_wait(i);
        qemu_sem_wait(&multifd_recv_state->sem_sync);
    }

    if (file_based) {
        /*
         * For file-based migration, loading is done in one iteration.
         * We're done.
         */
        return;
    }

    /*
     * Sync done. Release the channels for the next iteration.
     */
    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        WITH_QEMU_LOCK_GUARD(&p->mutex) {
            if (multifd_recv_state->packet_num < p->packet_num) {
                multifd_recv_state->packet_num = p->packet_num;
            }
        }
        trace_multifd_recv_sync_main_signal(p->id);
        qemu_sem_post(&p->sem_sync);
    }
    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
}

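/*
 * Main loop of a recv channel thread.  With packets it reads and
 * processes one packet per iteration, pausing on SYNC; without packets
 * it waits for the migration thread to hand it work via p->data.
 */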
static void *multifd_recv_thread(void *opaque)
{
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int ret;

    trace_multifd_recv_thread_start(p->id);
    rcu_register_thread();

    while (true) {
        uint32_t flags = 0;
        bool has_data = false;
        p->normal_num = 0;

        if (use_packets) {
            if (multifd_recv_should_exit()) {
                break;
            }

            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
                                           p->packet_len, &local_err);
            if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
                break;
            }

            qemu_mutex_lock(&p->mutex);
            ret = multifd_recv_unfill_packet(p, &local_err);
            if (ret) {
                qemu_mutex_unlock(&p->mutex);
                break;
            }

            flags = p->flags;
            /* recv methods don't know how to handle the SYNC flag */
            p->flags &= ~MULTIFD_FLAG_SYNC;

            /*
             * Even if it's a SYNC packet, this needs to be set
             * because older QEMUs (<9.0) still send data along with
             * the SYNC packet.
             */
            has_data = p->normal_num || p->zero_num;
            qemu_mutex_unlock(&p->mutex);
        } else {
            /*
             * No packets, so we need to wait for the vmstate code to
             * give us work.
             */
            qemu_sem_wait(&p->sem);

            if (multifd_recv_should_exit()) {
                break;
            }

            /* pairs with qatomic_store_release() at multifd_recv() */
            if (!qatomic_load_acquire(&p->pending_job)) {
                /*
                 * Migration thread did not send work, this is
                 * equivalent to pending_sync on the sending
                 * side. Post sem_sync to notify we reached this
                 * point.
                 */
                qemu_sem_post(&multifd_recv_state->sem_sync);
                continue;
            }

            has_data = !!p->data->size;
        }

        if (has_data) {
            ret = multifd_recv_state->ops->recv(p, &local_err);
            if (ret != 0) {
                break;
            }
        }

        if (use_packets) {
            if (flags & MULTIFD_FLAG_SYNC) {
                qemu_sem_post(&multifd_recv_state->sem_sync);
                qemu_sem_wait(&p->sem_sync);
            }
        } else {
            p->data->size = 0;
            /*
             * Order the data->size update before clearing
             * pending_job. Pairs with smp_mb_acquire() at
             * multifd_recv().
             */
            qatomic_store_release(&p->pending_job, false);
        }
    }

    if (local_err) {
        multifd_recv_terminate_threads(local_err);
        error_free(local_err);
    }

    rcu_unregister_thread();
    trace_multifd_recv_thread_end(p->id, p->packets_recved);

    return NULL;
}

int multifd_recv_setup(Error **errp)
{
    int thread_count;
    uint32_t page_count = multifd_ram_page_count();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    /*
     * Return successfully if multiFD recv state is already initialised
     * or multiFD is not enabled.
     */
    if (multifd_recv_state || !migrate_multifd()) {
        return 0;
    }

    thread_count = migrate_multifd_channels();
    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);

    multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
    multifd_recv_state->data->size = 0;

    qatomic_set(&multifd_recv_state->count, 0);
    qatomic_set(&multifd_recv_state->exiting, 0);
    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
    multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        qemu_mutex_init(&p->mutex);
        qemu_sem_init(&p->sem_sync, 0);
        qemu_sem_init(&p->sem, 0);
        p->pending_job = false;
        p->id = i;

        p->data = g_new0(MultiFDRecvData, 1);
        p->data->size = 0;

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
        }
        p->name = g_strdup_printf(MIGRATION_THREAD_DST_MULTIFD, i);
        p->normal = g_new0(ram_addr_t, page_count);
        p->zero = g_new0(ram_addr_t, page_count);
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];
        int ret;

        ret = multifd_recv_state->ops->recv_setup(p, errp);
        if (ret) {
            return ret;
        }
    }
    return 0;
}

bool multifd_recv_all_channels_created(void)
{
    int thread_count = migrate_multifd_channels();

    if (!migrate_multifd()) {
        return true;
    }

    if (!multifd_recv_state) {
        /* Called before any connections created */
        return false;
    }

    return thread_count == qatomic_read(&multifd_recv_state->count);
}

/*
 * Try to receive all multifd channels to get ready for the migration.
 * Sets @errp when failing to receive the current channel.
 */
void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
{
    MultiFDRecvParams *p;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int id;

    if (use_packets) {
        id = multifd_recv_initial_packet(ioc, &local_err);
        if (id < 0) {
            multifd_recv_terminate_threads(local_err);
            error_propagate_prepend(errp, local_err,
                                    "failed to receive packet"
                                    " via multifd channel %d: ",
                                    qatomic_read(&multifd_recv_state->count));
            return;
        }
        trace_multifd_recv_new_channel(id);
    } else {
        id = qatomic_read(&multifd_recv_state->count);
    }

    p = &multifd_recv_state->params[id];
    if (p->c != NULL) {
        error_setg(&local_err, "multifd: received id '%d' already setup",
                   id);
        multifd_recv_terminate_threads(local_err);
        error_propagate(errp, local_err);
        return;
    }
    p->c = ioc;
    object_ref(OBJECT(ioc));

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                       QEMU_THREAD_JOINABLE);
    qatomic_inc(&multifd_recv_state->count);
}