xref: /openbmc/qemu/migration/multifd.c (revision 7484d61bdbc88923969d8d0e2e26e8ba0935f53a)
/*
 * Multifd common code
 *
 * Copyright (c) 2019-2020 Red Hat Inc
 *
 * Authors:
 *  Juan Quintela <quintela@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */

#include "qemu/osdep.h"
#include "qemu/cutils.h"
#include "qemu/iov.h"
#include "qemu/rcu.h"
#include "exec/target_page.h"
#include "system/system.h"
#include "exec/ramblock.h"
#include "qemu/error-report.h"
#include "qapi/error.h"
#include "file.h"
#include "migration/misc.h"
#include "migration.h"
#include "migration-stats.h"
#include "savevm.h"
#include "socket.h"
#include "tls.h"
#include "qemu-file.h"
#include "trace.h"
#include "multifd.h"
#include "threadinfo.h"
#include "options.h"
#include "qemu/yank.h"
#include "io/channel-file.h"
#include "io/channel-socket.h"
#include "yank_functions.h"

/* Multiple fd's */

#define MULTIFD_MAGIC 0x11223344U
#define MULTIFD_VERSION 1

typedef struct {
    uint32_t magic;
    uint32_t version;
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;
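
/*
 * On-the-wire view of MultiFDInit_t, as implied by the packed struct
 * above and the byte-swapping in multifd_send_initial_packet(): a fixed
 * 64-byte blob, with the multi-byte integer fields stored big-endian.
 *
 *   offset  size  field
 *        0     4  magic   (big-endian)
 *        4     4  version (big-endian)
 *        8    16  uuid    (raw QemuUUID bytes)
 *       24     1  id      (channel number)
 *       25     7  unused1
 *       32    32  unused2
 */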

struct {
    MultiFDSendParams *params;

    /* multifd_send() body is not thread safe, needs serialization */
    QemuMutex multifd_send_mutex;

    /*
     * Global number of generated multifd packets.
     *
     * Note that we use 'uintptr_t' because it naturally supports atomic
     * operations on both 32-bit and 64-bit hosts.  It means that on
     * 32-bit systems multifd will wrap packet_num around sooner, but
     * that should be fine.
     *
     * Another option would be QEMU's Stat64, which is 64 bits on all
     * hosts, but so far it does not support atomic fetch_add().  Keep it
     * simple for now.
     */
    uintptr_t packet_num;
    /*
     * Synchronization point past which no more channels will be
     * created.
     */
    QemuSemaphore channels_created;
    /* send channels ready */
    QemuSemaphore channels_ready;
    /*
     * Whether we have already terminated the threads.  There is a race
     * window where an error can arrive while we are exiting.  Accessed
     * with atomic operations; the only valid values are 0 and 1.
     */
    int exiting;
    /* multifd ops */
    const MultiFDMethods *ops;
} *multifd_send_state;

struct {
    MultiFDRecvParams *params;
    MultiFDRecvData *data;
    /* number of created threads */
    int count;
    /*
     * This is always posted by the recv threads, the migration thread
     * uses it to wait for recv threads to finish assigned tasks.
     */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    int exiting;
    /* multifd ops */
    const MultiFDMethods *ops;
} *multifd_recv_state;

MultiFDSendData *multifd_send_data_alloc(void)
{
    MultiFDSendData *new = g_new0(MultiFDSendData, 1);

    multifd_ram_payload_alloc(&new->u.ram);
    /* Device state allocates its payload on-demand */

    return new;
}

void multifd_send_data_clear(MultiFDSendData *data)
{
    if (multifd_payload_empty(data)) {
        return;
    }

    switch (data->type) {
    case MULTIFD_PAYLOAD_DEVICE_STATE:
        multifd_send_data_clear_device_state(&data->u.device_state);
        break;
    default:
        /* Nothing to do */
        break;
    }

    data->type = MULTIFD_PAYLOAD_NONE;
}

void multifd_send_data_free(MultiFDSendData *data)
{
    if (!data) {
        return;
    }

    /* This also frees the device state payload */
    multifd_send_data_clear(data);

    multifd_ram_payload_free(&data->u.ram);

    g_free(data);
}

static bool multifd_use_packets(void)
{
    return !migrate_mapped_ram();
}

void multifd_send_channel_created(void)
{
    qemu_sem_post(&multifd_send_state->channels_created);
}

static const MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {};

void multifd_register_ops(int method, const MultiFDMethods *ops)
{
    assert(0 <= method && method < MULTIFD_COMPRESSION__MAX);
    assert(!multifd_ops[method]);
    multifd_ops[method] = ops;
}
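
/*
 * How a compression backend plugs into the table above: each method
 * registers its MultiFDMethods exactly once at startup.  A minimal
 * sketch, modelled on the existing compression backends; the "mycomp"
 * names below are illustrative, not part of this file:
 *
 *     static const MultiFDMethods multifd_mycomp_ops = {
 *         .send_setup = mycomp_send_setup,
 *         .send_cleanup = mycomp_send_cleanup,
 *         .send_prepare = mycomp_send_prepare,
 *         .recv_setup = mycomp_recv_setup,
 *         .recv_cleanup = mycomp_recv_cleanup,
 *         .recv = mycomp_recv,
 *     };
 *
 *     static void multifd_mycomp_register(void)
 *     {
 *         multifd_register_ops(MULTIFD_COMPRESSION_MYCOMP,
 *                              &multifd_mycomp_ops);
 *     }
 *     migration_init(multifd_mycomp_register);
 */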

static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
{
    MultiFDInit_t msg = {};
    size_t size = sizeof(msg);
    int ret;

    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
    msg.version = cpu_to_be32(MULTIFD_VERSION);
    msg.id = p->id;
    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));

    ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
    if (ret != 0) {
        return -1;
    }
    stat64_add(&mig_stats.multifd_bytes, size);
    return 0;
}

static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
{
    MultiFDInit_t msg;
    int ret;

    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
    if (ret != 0) {
        return -1;
    }

    msg.magic = be32_to_cpu(msg.magic);
    msg.version = be32_to_cpu(msg.version);

    if (msg.magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet magic %x "
                   "expected %x", msg.magic, MULTIFD_MAGIC);
        return -1;
    }

    if (msg.version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet version %u "
                   "expected %u", msg.version, MULTIFD_VERSION);
        return -1;
    }

    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
        char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);

        error_setg(errp, "multifd: received uuid '%s' and expected "
                   "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
        g_free(uuid);
        g_free(msg_uuid);
        return -1;
    }

    /* Valid channel ids are 0..channels-1, so reject id == channels too */
    if (msg.id >= migrate_multifd_channels()) {
        error_setg(errp, "multifd: received channel id %u is out of range "
                   "(number of channels %u)", msg.id,
                   migrate_multifd_channels());
        return -1;
    }

    return msg.id;
}
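
/*
 * Taken together, the two functions above implement the per-channel
 * handshake: the source writes one MultiFDInit_t as the very first bytes
 * on a new channel, and the destination validates magic, version and
 * UUID before using the returned id to bind the connection to a channel
 * slot in multifd_recv_new_channel().
 */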

/* Fills a RAM multifd packet */
void multifd_send_fill_packet(MultiFDSendParams *p)
{
    MultiFDPacket_t *packet = p->packet;
    uint64_t packet_num;
    bool sync_packet = p->flags & MULTIFD_FLAG_SYNC;

    memset(packet, 0, p->packet_len);

    packet->hdr.magic = cpu_to_be32(MULTIFD_MAGIC);
    packet->hdr.version = cpu_to_be32(MULTIFD_VERSION);

    packet->hdr.flags = cpu_to_be32(p->flags);
    packet->next_packet_size = cpu_to_be32(p->next_packet_size);

    packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
    packet->packet_num = cpu_to_be64(packet_num);

    p->packets_sent++;

    if (!sync_packet) {
        multifd_ram_fill_packet(p);
    }

    trace_multifd_send_fill(p->id, packet_num,
                            p->flags, p->next_packet_size);
}

static int multifd_recv_unfill_packet_header(MultiFDRecvParams *p,
                                             const MultiFDPacketHdr_t *hdr,
                                             Error **errp)
{
    uint32_t magic = be32_to_cpu(hdr->magic);
    uint32_t version = be32_to_cpu(hdr->version);

    if (magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet magic %x, expected %x",
                   magic, MULTIFD_MAGIC);
        return -1;
    }

    if (version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet version %u, expected %u",
                   version, MULTIFD_VERSION);
        return -1;
    }

    p->flags = be32_to_cpu(hdr->flags);

    return 0;
}

static int multifd_recv_unfill_packet_device_state(MultiFDRecvParams *p,
                                                   Error **errp)
{
    MultiFDPacketDeviceState_t *packet = p->packet_dev_state;

    packet->instance_id = be32_to_cpu(packet->instance_id);
    p->next_packet_size = be32_to_cpu(packet->next_packet_size);

    return 0;
}

static int multifd_recv_unfill_packet_ram(MultiFDRecvParams *p, Error **errp)
{
    const MultiFDPacket_t *packet = p->packet;
    int ret = 0;

    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
    p->packet_num = be64_to_cpu(packet->packet_num);

    /* Always unfill, old QEMUs (<9.0) send data along with SYNC */
    ret = multifd_ram_unfill_packet(p, errp);

    trace_multifd_recv_unfill(p->id, p->packet_num, p->flags,
                              p->next_packet_size);

    return ret;
}

static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
{
    p->packets_recved++;

    if (p->flags & MULTIFD_FLAG_DEVICE_STATE) {
        return multifd_recv_unfill_packet_device_state(p, errp);
    }

    return multifd_recv_unfill_packet_ram(p, errp);
}

static bool multifd_send_should_exit(void)
{
    return qatomic_read(&multifd_send_state->exiting);
}

static bool multifd_recv_should_exit(void)
{
    return qatomic_read(&multifd_recv_state->exiting);
}

/*
 * The migration thread can wait on either of the two semaphores.  This
 * function can be used to kick the main thread out of waiting on either of
 * them.  Should mostly only be called when something went wrong with
 * the current multifd send thread.
 */
static void multifd_send_kick_main(MultiFDSendParams *p)
{
    qemu_sem_post(&p->sem_sync);
    qemu_sem_post(&multifd_send_state->channels_ready);
}

/*
 * multifd_send() works by exchanging the MultiFDSendData object
 * provided by the caller with an unused MultiFDSendData object from
 * the next channel that is found to be idle.
 *
 * The channel owns the data until it finishes transmitting and the
 * caller owns the empty object until it fills it with data and calls
 * this function again. No locking necessary.
 *
 * Switching is safe because both the migration thread and the channel
 * thread have barriers in place to serialize access.
 *
 * Returns true on success, false otherwise.
 */
bool multifd_send(MultiFDSendData **send_data)
{
    int i;
    static int next_channel;
    MultiFDSendParams *p = NULL; /* make gcc happy */
    MultiFDSendData *tmp;

    if (multifd_send_should_exit()) {
        return false;
    }

    QEMU_LOCK_GUARD(&multifd_send_state->multifd_send_mutex);

    /* Wait here until at least one channel is ready */
    qemu_sem_wait(&multifd_send_state->channels_ready);

    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_channel %= migrate_multifd_channels();
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_send_should_exit()) {
            return false;
        }
        p = &multifd_send_state->params[i];
        /*
         * A lockless read of p->pending_job is safe, because only the
         * multifd sender thread can clear it.
         */
        if (qatomic_read(&p->pending_job) == false) {
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Make sure we read p->pending_job before all the rest.  Pairs with
     * qatomic_store_release() in multifd_send_thread().
     */
    smp_mb_acquire();

    assert(multifd_payload_empty(p->data));

    /*
     * Swap the pointers. The channel gets the client data for
     * transferring and the client gets back an unused data slot.
     */
    tmp = *send_data;
    *send_data = p->data;
    p->data = tmp;

    /*
     * Make sure p->data is set up before marking pending_job=true. Pairs
     * with the qatomic_load_acquire() in multifd_send_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}
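
/*
 * A minimal caller sketch for the exchange above (illustrative only;
 * fill_payload() stands in for whatever marks data->type and fills the
 * payload, e.g. the RAM or device-state paths):
 *
 *     MultiFDSendData *data = multifd_send_data_alloc();
 *
 *     while (have_work()) {
 *         fill_payload(data);
 *         if (!multifd_send(&data)) {
 *             break;   // multifd is exiting, payload was not handed off
 *         }
 *         // 'data' now points to a different, empty slot that the
 *         // caller owns and can refill for the next iteration.
 *     }
 *     multifd_send_data_free(data);
 */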

/* Multifd send side hit an error; remember it and prepare to quit */
static void multifd_send_set_error(Error *err)
{
    /*
     * We don't want to terminate the threads twice.  Depending on where
     * we get the error, or if two independent errors happen in two
     * threads at the same time, we can end up calling this function
     * twice.
     */
    if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
            s->state == MIGRATION_STATUS_DEVICE ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }
}

static void multifd_send_terminate_threads(void)
{
    int i;

    trace_multifd_send_terminate_threads();

    /*
     * Tell everyone we're quitting.  No xchg() needed here; we simply
     * always set it.
     */
    qatomic_set(&multifd_send_state->exiting, 1);

    /*
     * First, kick all threads out, no matter whether they are just idle
     * or blocked in an I/O system call.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_sem_post(&p->sem);
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }

    /*
     * Finally, join all the threads.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (p->tls_thread_created) {
            qemu_thread_join(&p->tls_thread);
        }

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
}

static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
{
    if (p->c) {
        migration_ioc_unregister_yank(p->c);
        /*
         * The object_unref() cannot guarantee that the fd will always be
         * released, because finalize() of the iochannel is only
         * triggered on the last reference and it's not guaranteed
         * that we always hold the last refcount when reaching here.
         *
         * Closing the fd explicitly has the benefit that if there are any
         * registered I/O handler callbacks on the fd, they will get a
         * POLLNVAL event and will further trigger the cleanup to finally
         * release the IOC.
         *
         * FIXME: It should logically be guaranteed that all multifd
         * channels have no I/O handler callback registered when reaching
         * here, because the migration thread will wait for all multifd
         * channel establishments to complete during setup.  Since
         * migration_cleanup() will be scheduled in the main thread too,
         * all previous callbacks should be guaranteed to have completed
         * by the time we reach here.  See multifd_send_state.channels_created
         * and its usage.  In the future, we could replace this with an
         * assert making sure we hold the last reference, or simply drop
         * it if the above reasoning is clearly justified.
         */
        qio_channel_close(p->c, &error_abort);
        object_unref(OBJECT(p->c));
        p->c = NULL;
    }
    qemu_sem_destroy(&p->sem);
    qemu_sem_destroy(&p->sem_sync);
    g_free(p->name);
    p->name = NULL;
    g_clear_pointer(&p->data, multifd_send_data_free);
    p->packet_len = 0;
    g_clear_pointer(&p->packet_device_state, g_free);
    g_free(p->packet);
    p->packet = NULL;
    multifd_send_state->ops->send_cleanup(p, errp);
    assert(!p->iov);

    return *errp == NULL;
}

static void multifd_send_cleanup_state(void)
{
    file_cleanup_outgoing_migration();
    socket_cleanup_outgoing_migration();
    multifd_device_state_send_cleanup();
    qemu_sem_destroy(&multifd_send_state->channels_created);
    qemu_sem_destroy(&multifd_send_state->channels_ready);
    qemu_mutex_destroy(&multifd_send_state->multifd_send_mutex);
    g_free(multifd_send_state->params);
    multifd_send_state->params = NULL;
    g_free(multifd_send_state);
    multifd_send_state = NULL;
}

void multifd_send_shutdown(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        /* thread_created implies the TLS handshake has succeeded */
        if (p->tls_thread_created && p->thread_created) {
            Error *local_err = NULL;
            /*
             * The destination expects the TLS session to always be
             * properly terminated. This helps to detect a premature
             * termination in the middle of the stream.  Note that
             * older QEMUs always break the connection on the source
             * and the destination always sees
             * GNUTLS_E_PREMATURE_TERMINATION.
             */
            migration_tls_channel_end(p->c, &local_err);

            /*
             * The above can return an error in case the migration has
             * already failed. If the migration succeeded, errors are
             * not expected but there's no need to kill the source.
             */
            if (local_err && !migration_has_failed(migrate_get_current())) {
                warn_report(
                    "multifd_send_%d: Failed to terminate TLS connection: %s",
                    p->id, error_get_pretty(local_err));
                break;
            }
        }
    }

    multifd_send_terminate_threads();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        if (!multifd_send_cleanup_channel(p, &local_err)) {
            migrate_set_error(migrate_get_current(), local_err);
            error_free(local_err);
        }
    }

    multifd_send_cleanup_state();
}

static int multifd_zero_copy_flush(QIOChannel *c)
{
    int ret;
    Error *err = NULL;

    ret = qio_channel_flush(c, &err);
    if (ret < 0) {
        error_report_err(err);
        return -1;
    }
    if (ret == 1) {
        stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
    }

    return ret;
}

int multifd_send_sync_main(MultiFDSyncReq req)
{
    int i;
    bool flush_zero_copy;

    assert(req != MULTIFD_SYNC_NONE);

    flush_zero_copy = migrate_zero_copy_send();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        trace_multifd_send_sync_main_signal(p->id);

        /*
         * We should be the only user so far, so it is not possible for
         * it to be set by others concurrently.
         */
        assert(qatomic_read(&p->pending_sync) == MULTIFD_SYNC_NONE);
        qatomic_set(&p->pending_sync, req);
        qemu_sem_post(&p->sem);
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        qemu_sem_wait(&multifd_send_state->channels_ready);
        trace_multifd_send_sync_main_wait(p->id);
        qemu_sem_wait(&p->sem_sync);

        if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
            return -1;
        }
    }
    trace_multifd_send_sync_main(multifd_send_state->packet_num);

    return 0;
}
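
/*
 * The sync above is a two-phase handshake with every sender thread; a
 * rough timeline for one channel (condensed from the code, not a new
 * protocol):
 *
 *     migration thread                  sender thread
 *     ----------------                  -------------
 *     pending_sync = req
 *     sem_post(&p->sem)
 *                                       sem_wait(&p->sem)
 *                                       send SYNC packet (MULTIFD_SYNC_ALL)
 *                                       pending_sync = MULTIFD_SYNC_NONE
 *                                       sem_post(&p->sem_sync)
 *                                       sem_post(channels_ready) // next loop
 *     sem_wait(channels_ready)
 *     sem_wait(&p->sem_sync)
 */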

static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    MigrationThread *thread = NULL;
    Error *local_err = NULL;
    int ret = 0;
    bool use_packets = multifd_use_packets();

    thread = migration_threads_add(p->name, qemu_get_thread_id());

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (use_packets) {
        if (multifd_send_initial_packet(p, &local_err) < 0) {
            ret = -1;
            goto out;
        }
    }

    while (true) {
        qemu_sem_post(&multifd_send_state->channels_ready);
        qemu_sem_wait(&p->sem);

        if (multifd_send_should_exit()) {
            break;
        }

        /*
         * Read the pending_job flag before p->data.  Pairs with the
         * qatomic_store_release() in multifd_send().
         */
        if (qatomic_load_acquire(&p->pending_job)) {
            bool is_device_state = multifd_payload_device_state(p->data);
            size_t total_size;
            int write_flags_masked = 0;

            p->flags = 0;
            p->iovs_num = 0;
            assert(!multifd_payload_empty(p->data));

            if (is_device_state) {
                multifd_device_state_send_prepare(p);

                /* Device state packets cannot be sent via zerocopy */
                write_flags_masked |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
            } else {
                ret = multifd_send_state->ops->send_prepare(p, &local_err);
                if (ret != 0) {
                    break;
                }
            }

            /*
             * The packet header in the zerocopy RAM case is accounted for
             * in multifd_nocomp_send_prepare(), where it is actually
             * being sent.
             */
            total_size = iov_size(p->iov, p->iovs_num);

            if (migrate_mapped_ram()) {
                assert(!is_device_state);

                ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
                                              &p->data->u.ram, &local_err);
            } else {
                ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
                                                  NULL, 0,
                                                  p->write_flags & ~write_flags_masked,
                                                  &local_err);
            }

            if (ret != 0) {
                break;
            }

            stat64_add(&mig_stats.multifd_bytes, total_size);

            p->next_packet_size = 0;
            multifd_send_data_clear(p->data);

            /*
             * Make sure p->data is published before declaring the
             * channel free.  Pairs with the smp_mb_acquire() in
             * multifd_send().
             */
            qatomic_store_release(&p->pending_job, false);
        } else {
            MultiFDSyncReq req = qatomic_read(&p->pending_sync);

            /*
             * If not a normal job, it must be a sync request.  Note that
             * pending_sync is a standalone flag (unlike pending_job), so
             * it doesn't require explicit memory barriers.
             */
            assert(req != MULTIFD_SYNC_NONE);

            /* Only push the SYNC message if it involves a remote sync */
            if (req == MULTIFD_SYNC_ALL) {
                p->flags = MULTIFD_FLAG_SYNC;
                multifd_send_fill_packet(p);
                ret = qio_channel_write_all(p->c, (void *)p->packet,
                                            p->packet_len, &local_err);
                if (ret != 0) {
                    break;
                }
                /* p->next_packet_size will always be zero for a SYNC packet */
                stat64_add(&mig_stats.multifd_bytes, p->packet_len);
            }

            qatomic_set(&p->pending_sync, MULTIFD_SYNC_NONE);
            qemu_sem_post(&p->sem_sync);
        }
    }

out:
    if (ret) {
        assert(local_err);
        trace_multifd_send_error(p->id);
        multifd_send_set_error(local_err);
        multifd_send_kick_main(p);
        error_free(local_err);
    }

    rcu_unregister_thread();
    migration_threads_remove(thread);
    trace_multifd_send_thread_end(p->id, p->packets_sent);

    return NULL;
}
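
/*
 * The lock-free pending_job handoff between multifd_send() and the
 * thread above, condensed (store_release/load_acquire refer to the
 * qatomic_* calls in the two functions):
 *
 *     multifd_send()                    multifd_send_thread()
 *     --------------                    ---------------------
 *     swap payload into p->data
 *     store_release(pending_job, true)
 *     sem_post(&p->sem)
 *                                       sem_wait(&p->sem)
 *                                       load_acquire(pending_job) -> true
 *                                       transmit p->data
 *                                       multifd_send_data_clear(p->data)
 *                                       store_release(pending_job, false)
 *                                       sem_post(channels_ready)
 *     sem_wait(channels_ready)
 *     read(pending_job) -> false
 *     smp_mb_acquire()
 *     p->data is free to be swapped again
 */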

static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);

typedef struct {
    MultiFDSendParams *p;
    QIOChannelTLS *tioc;
} MultiFDTLSThreadArgs;

static void *multifd_tls_handshake_thread(void *opaque)
{
    MultiFDTLSThreadArgs *args = opaque;

    qio_channel_tls_handshake(args->tioc,
                              multifd_new_send_channel_async,
                              args->p,
                              NULL,
                              NULL);
    g_free(args);

    return NULL;
}

static bool multifd_tls_channel_connect(MultiFDSendParams *p,
                                        QIOChannel *ioc,
                                        Error **errp)
{
    MigrationState *s = migrate_get_current();
    const char *hostname = s->hostname;
    MultiFDTLSThreadArgs *args;
    QIOChannelTLS *tioc;

    tioc = migration_tls_client_create(ioc, hostname, errp);
    if (!tioc) {
        return false;
    }

    /*
     * Ownership of the socket channel now transfers to the newly
     * created TLS channel, which has already taken a reference.
     */
    object_unref(OBJECT(ioc));
    trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
    qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");

    args = g_new0(MultiFDTLSThreadArgs, 1);
    args->tioc = tioc;
    args->p = p;

    p->tls_thread_created = true;
    qemu_thread_create(&p->tls_thread, MIGRATION_THREAD_SRC_TLS,
                       multifd_tls_handshake_thread, args,
                       QEMU_THREAD_JOINABLE);
    return true;
}

void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
{
    qio_channel_set_delay(ioc, false);

    migration_ioc_register_yank(ioc);
    /* Set p->c only once the channel is completely set up */
    p->c = ioc;

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
                       QEMU_THREAD_JOINABLE);
}

/*
 * When TLS is enabled this function is called once to establish the
 * TLS connection and a second time after the TLS handshake to create
 * the multifd channel. Without TLS it goes straight into the channel
 * creation.
 */
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
    Error *local_err = NULL;
    bool ret;

    trace_multifd_new_send_channel_async(p->id);

    if (qio_task_propagate_error(task, &local_err)) {
        ret = false;
        goto out;
    }

    trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
                                       migrate_get_current()->hostname);

    if (migrate_channel_requires_tls_upgrade(ioc)) {
        ret = multifd_tls_channel_connect(p, ioc, &local_err);
        if (ret) {
            return;
        }
    } else {
        multifd_channel_connect(p, ioc);
        ret = true;
    }

out:
    /*
     * Here we're not interested in whether creation succeeded, only
     * that it was attempted at all.
     */
    multifd_send_channel_created();

    if (ret) {
        return;
    }

    trace_multifd_new_send_channel_async_error(p->id, local_err);
    multifd_send_set_error(local_err);
    /*
     * In error cases (TLS or non-TLS), the I/O channel is always freed
     * here rather than during multifd cleanup: since p->c is not set,
     * the multifd cleanup code doesn't even know of its existence.
     */
    object_unref(OBJECT(ioc));
    error_free(local_err);
}

static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
{
    if (!multifd_use_packets()) {
        return file_send_channel_create(opaque, errp);
    }

    socket_send_channel_create(multifd_new_send_channel_async, opaque);
    return true;
}

bool multifd_send_setup(void)
{
    MigrationState *s = migrate_get_current();
    int thread_count, ret = 0;
    uint32_t page_count = multifd_ram_page_count();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    if (!migrate_multifd()) {
        return true;
    }

    thread_count = migrate_multifd_channels();
    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
    qemu_mutex_init(&multifd_send_state->multifd_send_mutex);
    qemu_sem_init(&multifd_send_state->channels_created, 0);
    qemu_sem_init(&multifd_send_state->channels_ready, 0);
    qatomic_set(&multifd_send_state->exiting, 0);
    multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        qemu_sem_init(&p->sem, 0);
        qemu_sem_init(&p->sem_sync, 0);
        p->id = i;
        p->data = multifd_send_data_alloc();

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                          + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
            p->packet_device_state = g_malloc0(sizeof(*p->packet_device_state));
            p->packet_device_state->hdr.magic = cpu_to_be32(MULTIFD_MAGIC);
            p->packet_device_state->hdr.version = cpu_to_be32(MULTIFD_VERSION);
        }
        p->name = g_strdup_printf(MIGRATION_THREAD_SRC_MULTIFD, i);
        p->write_flags = 0;

        if (!multifd_new_send_channel_create(p, &local_err)) {
            migrate_set_error(s, local_err);
            ret = -1;
        }
    }

    /*
     * Wait until channel creation has started for all channels. The
     * creation can still fail, but no more channels will be created
     * past this point.
     */
    for (i = 0; i < thread_count; i++) {
        qemu_sem_wait(&multifd_send_state->channels_created);
    }

    if (ret) {
        goto err;
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        ret = multifd_send_state->ops->send_setup(p, &local_err);
        if (ret) {
            migrate_set_error(s, local_err);
            goto err;
        }
        assert(p->iov);
    }

    multifd_device_state_send_setup();

    return true;

err:
    migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
                      MIGRATION_STATUS_FAILED);
    return false;
}

bool multifd_recv(void)
{
    int i;
    static int next_recv_channel;
    MultiFDRecvParams *p = NULL;
    MultiFDRecvData *data = multifd_recv_state->data;

    /*
     * next_recv_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_recv_channel %= migrate_multifd_channels();
    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_recv_should_exit()) {
            return false;
        }

        p = &multifd_recv_state->params[i];

        if (qatomic_read(&p->pending_job) == false) {
            next_recv_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Order the pending_job read before manipulating p->data below.
     * Pairs with qatomic_store_release() at multifd_recv_thread().
     */
    smp_mb_acquire();

    assert(!p->data->size);
    multifd_recv_state->data = p->data;
    p->data = data;

    /*
     * Order the p->data update before setting pending_job. Pairs with
     * qatomic_load_acquire() at multifd_recv_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}
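
/*
 * multifd_recv() mirrors multifd_send(): it hands the global
 * MultiFDRecvData slot to an idle channel and takes the channel's empty
 * slot in exchange.  It is only used on paths without packets (i.e.
 * mapped-ram), where the migration thread assigns work to the channels
 * instead of the channels reading it off the wire.  A rough caller
 * sketch (fill_recv_work() is hypothetical, standing in for whatever
 * fills the work description such as data->size):
 *
 *     MultiFDRecvData *data = multifd_get_recv_data();
 *
 *     fill_recv_work(data);       // hypothetical: sets data->size etc.
 *     if (!multifd_recv()) {
 *         // recv side is exiting, work was not handed off
 *     }
 */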

MultiFDRecvData *multifd_get_recv_data(void)
{
    return multifd_recv_state->data;
}

static void multifd_recv_terminate_threads(Error *err)
{
    int i;

    trace_multifd_recv_terminate_threads(err != NULL);

    if (qatomic_xchg(&multifd_recv_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        /*
         * The migration thread and channels interact differently
         * depending on the presence of packets.
         */
        if (multifd_use_packets()) {
            /*
             * The channel receives as long as there are packets. When
             * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
             * channel waits for the migration thread to sync. If the
             * sync never happens, do it here.
             */
            qemu_sem_post(&p->sem_sync);
        } else {
            /*
             * The channel waits for the migration thread to give it
             * work. When the migration thread runs out of work, it
             * releases the channel and waits for any pending work to
             * finish. If we reach here (e.g. due to error) before the
             * work runs out, release the channel.
             */
            qemu_sem_post(&p->sem);
        }

        /*
         * We could arrive here for two reasons:
         *  - normal quit, i.e. everything went fine, just finished
         *  - error quit: We close the channels so the channel threads
         *    finish the qio_channel_read_all_eof()
         */
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }
}

void multifd_recv_shutdown(void)
{
    if (migrate_multifd()) {
        multifd_recv_terminate_threads(NULL);
    }
}

static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
{
    migration_ioc_unregister_yank(p->c);
    object_unref(OBJECT(p->c));
    p->c = NULL;
    qemu_mutex_destroy(&p->mutex);
    qemu_sem_destroy(&p->sem_sync);
    qemu_sem_destroy(&p->sem);
    g_free(p->data);
    p->data = NULL;
    g_free(p->name);
    p->name = NULL;
    p->packet_len = 0;
    g_free(p->packet);
    p->packet = NULL;
    g_clear_pointer(&p->packet_dev_state, g_free);
    g_free(p->normal);
    p->normal = NULL;
    g_free(p->zero);
    p->zero = NULL;
    multifd_recv_state->ops->recv_cleanup(p);
}

static void multifd_recv_cleanup_state(void)
{
    qemu_sem_destroy(&multifd_recv_state->sem_sync);
    g_free(multifd_recv_state->params);
    multifd_recv_state->params = NULL;
    g_free(multifd_recv_state->data);
    multifd_recv_state->data = NULL;
    g_free(multifd_recv_state);
    multifd_recv_state = NULL;
}

void multifd_recv_cleanup(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }
    multifd_recv_terminate_threads(NULL);
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
    }
    multifd_recv_cleanup_state();
}

void multifd_recv_sync_main(void)
{
    int thread_count = migrate_multifd_channels();
    bool file_based = !multifd_use_packets();
    int i;

    if (!migrate_multifd()) {
        return;
    }

    /*
     * File-based channels don't use packets and therefore need to
     * wait for more work. Release them to start the sync.
     */
    if (file_based) {
        for (i = 0; i < thread_count; i++) {
            MultiFDRecvParams *p = &multifd_recv_state->params[i];

            trace_multifd_recv_sync_main_signal(p->id);
            qemu_sem_post(&p->sem);
        }
    }

    /*
     * Initiate the synchronization by waiting for all channels.
     *
     * For socket-based migration this means each channel has received
     * the SYNC packet on the stream.
     *
     * For file-based migration this means each channel is done with
     * the work (pending_job=false).
     */
    for (i = 0; i < thread_count; i++) {
        trace_multifd_recv_sync_main_wait(i);
        qemu_sem_wait(&multifd_recv_state->sem_sync);
    }

    if (file_based) {
        /*
         * For file-based migration, loading is done in one iteration.
         * We're done.
         */
        return;
    }

    /*
     * Sync done. Release the channels for the next iteration.
     */
    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        WITH_QEMU_LOCK_GUARD(&p->mutex) {
            if (multifd_recv_state->packet_num < p->packet_num) {
                multifd_recv_state->packet_num = p->packet_num;
            }
        }
        trace_multifd_recv_sync_main_signal(p->id);
        qemu_sem_post(&p->sem_sync);
    }
    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
}
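
/*
 * Receive-side counterpart of multifd_send_sync_main(); for the
 * socket/packet case the rendezvous looks roughly like this (one
 * channel shown, condensed from the code above and from
 * multifd_recv_thread()):
 *
 *     recv thread                       migration thread
 *     -----------                       ----------------
 *     sees MULTIFD_FLAG_SYNC
 *     sem_post(state->sem_sync)
 *                                       sem_wait(state->sem_sync)
 *                                       update state->packet_num
 *                                       sem_post(&p->sem_sync)
 *     sem_wait(&p->sem_sync)
 *     continue receiving
 */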

static int multifd_device_state_recv(MultiFDRecvParams *p, Error **errp)
{
    g_autofree char *dev_state_buf = NULL;
    int ret;

    dev_state_buf = g_malloc(p->next_packet_size);

    ret = qio_channel_read_all(p->c, dev_state_buf, p->next_packet_size, errp);
    if (ret != 0) {
        return ret;
    }

    if (p->packet_dev_state->idstr[sizeof(p->packet_dev_state->idstr) - 1]
        != 0) {
        error_setg(errp, "unterminated multifd device state idstr");
        return -1;
    }

    if (!qemu_loadvm_load_state_buffer(p->packet_dev_state->idstr,
                                       p->packet_dev_state->instance_id,
                                       dev_state_buf, p->next_packet_size,
                                       errp)) {
        ret = -1;
    }

    return ret;
}

static void *multifd_recv_thread(void *opaque)
{
    MigrationState *s = migrate_get_current();
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int ret;

    trace_multifd_recv_thread_start(p->id);
    rcu_register_thread();

    if (!s->multifd_clean_tls_termination) {
        p->read_flags = QIO_CHANNEL_READ_FLAG_RELAXED_EOF;
    }

    while (true) {
        MultiFDPacketHdr_t hdr;
        uint32_t flags = 0;
        bool is_device_state = false;
        bool has_data = false;
        uint8_t *pkt_buf;
        size_t pkt_len;

        p->normal_num = 0;

        if (use_packets) {
            struct iovec iov = {
                .iov_base = (void *)&hdr,
                .iov_len = sizeof(hdr)
            };

            if (multifd_recv_should_exit()) {
                break;
            }

            ret = qio_channel_readv_full_all_eof(p->c, &iov, 1, NULL, NULL,
                                                 p->read_flags, &local_err);
            if (!ret) {
                /* EOF */
                assert(!local_err);
                break;
            }

            if (ret == -1) {
                break;
            }

            ret = multifd_recv_unfill_packet_header(p, &hdr, &local_err);
            if (ret) {
                break;
            }

            is_device_state = p->flags & MULTIFD_FLAG_DEVICE_STATE;
            if (is_device_state) {
                pkt_buf = (uint8_t *)p->packet_dev_state + sizeof(hdr);
                pkt_len = sizeof(*p->packet_dev_state) - sizeof(hdr);
            } else {
                pkt_buf = (uint8_t *)p->packet + sizeof(hdr);
                pkt_len = p->packet_len - sizeof(hdr);
            }

            ret = qio_channel_read_all_eof(p->c, (char *)pkt_buf, pkt_len,
                                           &local_err);
            if (!ret) {
                /* EOF */
                error_setg(&local_err, "multifd: unexpected EOF after packet header");
                break;
            }

            if (ret == -1) {
                break;
            }

            qemu_mutex_lock(&p->mutex);
            ret = multifd_recv_unfill_packet(p, &local_err);
            if (ret) {
                qemu_mutex_unlock(&p->mutex);
                break;
            }

            flags = p->flags;
            /* recv methods don't know how to handle the SYNC flag */
            p->flags &= ~MULTIFD_FLAG_SYNC;

            if (is_device_state) {
                has_data = p->next_packet_size > 0;
            } else {
                /*
                 * Even if it's a SYNC packet, this needs to be set
                 * because older QEMUs (<9.0) still send data along with
                 * the SYNC packet.
                 */
                has_data = p->normal_num || p->zero_num;
            }

            qemu_mutex_unlock(&p->mutex);
        } else {
            /*
             * No packets, so we need to wait for the vmstate code to
             * give us work.
             */
            qemu_sem_wait(&p->sem);

            if (multifd_recv_should_exit()) {
                break;
            }

            /* pairs with qatomic_store_release() at multifd_recv() */
            if (!qatomic_load_acquire(&p->pending_job)) {
                /*
                 * The migration thread did not send work; this is
                 * equivalent to pending_sync on the sending
                 * side. Post sem_sync to notify we reached this
                 * point.
                 */
                qemu_sem_post(&multifd_recv_state->sem_sync);
                continue;
            }

            has_data = !!p->data->size;
        }

        if (has_data) {
            if (is_device_state) {
                assert(use_packets);
                ret = multifd_device_state_recv(p, &local_err);
            } else {
                ret = multifd_recv_state->ops->recv(p, &local_err);
            }
            if (ret != 0) {
                break;
            }
        } else if (is_device_state) {
            error_setg(&local_err,
                       "multifd: received empty device state packet");
            break;
        }

        if (use_packets) {
            if (flags & MULTIFD_FLAG_SYNC) {
                if (is_device_state) {
                    error_setg(&local_err,
                               "multifd: received SYNC device state packet");
                    break;
                }

                qemu_sem_post(&multifd_recv_state->sem_sync);
                qemu_sem_wait(&p->sem_sync);
            }
        } else {
            p->data->size = 0;
            /*
             * Order the data->size update before clearing
             * pending_job. Pairs with smp_mb_acquire() at
             * multifd_recv().
             */
            qatomic_store_release(&p->pending_job, false);
        }
    }

    if (local_err) {
        multifd_recv_terminate_threads(local_err);
        error_free(local_err);
    }

    rcu_unregister_thread();
    trace_multifd_recv_thread_end(p->id, p->packets_recved);

    return NULL;
}

int multifd_recv_setup(Error **errp)
{
    int thread_count;
    uint32_t page_count = multifd_ram_page_count();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    /*
     * Return successfully if multiFD recv state is already initialised
     * or multiFD is not enabled.
     */
    if (multifd_recv_state || !migrate_multifd()) {
        return 0;
    }

    thread_count = migrate_multifd_channels();
    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);

    multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
    multifd_recv_state->data->size = 0;

    qatomic_set(&multifd_recv_state->count, 0);
    qatomic_set(&multifd_recv_state->exiting, 0);
    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
    multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        qemu_mutex_init(&p->mutex);
        qemu_sem_init(&p->sem_sync, 0);
        qemu_sem_init(&p->sem, 0);
        p->pending_job = false;
        p->id = i;

        p->data = g_new0(MultiFDRecvData, 1);
        p->data->size = 0;

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
            p->packet_dev_state = g_malloc0(sizeof(*p->packet_dev_state));
        }
        p->name = g_strdup_printf(MIGRATION_THREAD_DST_MULTIFD, i);
        p->normal = g_new0(ram_addr_t, page_count);
        p->zero = g_new0(ram_addr_t, page_count);
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];
        int ret;

        ret = multifd_recv_state->ops->recv_setup(p, errp);
        if (ret) {
            return ret;
        }
    }
    return 0;
}

bool multifd_recv_all_channels_created(void)
{
    int thread_count = migrate_multifd_channels();

    if (!migrate_multifd()) {
        return true;
    }

    if (!multifd_recv_state) {
        /* Called before any connections created */
        return false;
    }

    return thread_count == qatomic_read(&multifd_recv_state->count);
}

/*
 * Try to receive all multifd channels to get ready for the migration.
 * Sets @errp when failing to receive the current channel.
 */
void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
{
    MultiFDRecvParams *p;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int id;

    if (use_packets) {
        id = multifd_recv_initial_packet(ioc, &local_err);
        if (id < 0) {
            multifd_recv_terminate_threads(local_err);
            error_propagate_prepend(errp, local_err,
                                    "failed to receive packet"
                                    " via multifd channel %d: ",
                                    qatomic_read(&multifd_recv_state->count));
            return;
        }
        trace_multifd_recv_new_channel(id);
    } else {
        id = qatomic_read(&multifd_recv_state->count);
    }

    p = &multifd_recv_state->params[id];
    if (p->c != NULL) {
        error_setg(&local_err, "multifd: received id '%d' already setup",
                   id);
        multifd_recv_terminate_threads(local_err);
        error_propagate(errp, local_err);
        return;
    }
    p->c = ioc;
    object_ref(OBJECT(ioc));

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                       QEMU_THREAD_JOINABLE);
    qatomic_inc(&multifd_recv_state->count);
}
1560