/*
 * Multifd common code
 *
 * Copyright (c) 2019-2020 Red Hat Inc
 *
 * Authors:
 *  Juan Quintela <quintela@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */

#include "qemu/osdep.h"
#include "qemu/cutils.h"
#include "qemu/iov.h"
#include "qemu/rcu.h"
#include "exec/target_page.h"
#include "system/system.h"
#include "exec/ramblock.h"
#include "qemu/error-report.h"
#include "qapi/error.h"
#include "file.h"
#include "migration/misc.h"
#include "migration.h"
#include "migration-stats.h"
#include "savevm.h"
#include "socket.h"
#include "tls.h"
#include "qemu-file.h"
#include "trace.h"
#include "multifd.h"
#include "threadinfo.h"
#include "options.h"
#include "qemu/yank.h"
#include "io/channel-file.h"
#include "io/channel-socket.h"
#include "yank_functions.h"

/* Multiple fd's */

#define MULTIFD_MAGIC 0x11223344U
#define MULTIFD_VERSION 1

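/*
 * Per-channel handshake message. Each send thread transmits one of
 * these right after its channel connects (see
 * multifd_send_initial_packet()); with the packed attribute it is
 * 64 bytes on the wire.
 */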
typedef struct {
    uint32_t magic;
    uint32_t version;
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;

struct {
    MultiFDSendParams *params;

    /* multifd_send() body is not thread safe, needs serialization */
    QemuMutex multifd_send_mutex;

    /*
     * Global number of generated multifd packets.
     *
     * Note that we use 'uintptr_t' because it naturally supports
     * atomic operations on both 32-bit and 64-bit hosts. It means
     * packet_num wraps around sooner on 32-bit systems, but that
     * should be fine.
     *
     * Another option would be QEMU's Stat64, which is 64 bits on all
     * hosts, but so far it does not support an atomic fetch_add().
     * Keep it simple for now.
     */
    uintptr_t packet_num;
    /*
     * Synchronization point past which no more channels will be
     * created.
     */
    QemuSemaphore channels_created;
    /* send channels ready */
    QemuSemaphore channels_ready;
    /*
     * Whether we have already terminated the threads. There is a race
     * window where an error can arrive while we are already exiting,
     * so access this with atomic operations. Only valid values are
     * 0 and 1.
     */
    int exiting;
    /* multifd ops */
    const MultiFDMethods *ops;
} *multifd_send_state;

struct {
    MultiFDRecvParams *params;
    MultiFDRecvData *data;
    /* number of created threads */
    int count;
    /*
     * This is always posted by the recv threads; the migration thread
     * uses it to wait for recv threads to finish assigned tasks.
     */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    int exiting;
    /* multifd ops */
    const MultiFDMethods *ops;
} *multifd_recv_state;

MultiFDSendData *multifd_send_data_alloc(void)
{
    MultiFDSendData *new = g_new0(MultiFDSendData, 1);

    multifd_ram_payload_alloc(&new->u.ram);
    /* Device state allocates its payload on-demand */

    return new;
}

void multifd_send_data_clear(MultiFDSendData *data)
{
    if (multifd_payload_empty(data)) {
        return;
    }

    switch (data->type) {
    case MULTIFD_PAYLOAD_DEVICE_STATE:
        multifd_send_data_clear_device_state(&data->u.device_state);
        break;
    default:
        /* Nothing to do */
        break;
    }

    data->type = MULTIFD_PAYLOAD_NONE;
}

void multifd_send_data_free(MultiFDSendData *data)
{
    if (!data) {
        return;
    }

    /* This also frees the device state payload */
    multifd_send_data_clear(data);

    multifd_ram_payload_free(&data->u.ram);

    g_free(data);
}

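/*
 * Packets (header plus metadata) are only needed on streaming
 * transports. With mapped-ram the pages land at fixed offsets in the
 * migration file, so no stream framing is used.
 */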
static bool multifd_use_packets(void)
{
    return !migrate_mapped_ram();
}

void multifd_send_channel_created(void)
{
    qemu_sem_post(&multifd_send_state->channels_created);
}

static const MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {};

void multifd_register_ops(int method, const MultiFDMethods *ops)
{
    assert(0 <= method && method < MULTIFD_COMPRESSION__MAX);
    assert(!multifd_ops[method]);
    multifd_ops[method] = ops;
}
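
/*
 * A compression backend typically registers itself at module-init
 * time. A minimal sketch, with illustrative names (FOO stands in for
 * a real backend such as the no-compression one):
 *
 *    static const MultiFDMethods multifd_foo_ops = {
 *        .send_setup = ..., .send_prepare = ..., .send_cleanup = ...,
 *        .recv_setup = ..., .recv = ..., .recv_cleanup = ...,
 *    };
 *
 *    static void multifd_foo_register(void)
 *    {
 *        multifd_register_ops(MULTIFD_COMPRESSION_FOO, &multifd_foo_ops);
 *    }
 *    migration_init(multifd_foo_register);
 */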

static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
{
    MultiFDInit_t msg = {};
    size_t size = sizeof(msg);
    int ret;

    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
    msg.version = cpu_to_be32(MULTIFD_VERSION);
    msg.id = p->id;
    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));

    ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
    if (ret != 0) {
        return -1;
    }
    stat64_add(&mig_stats.multifd_bytes, size);
    return 0;
}

static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
{
    MultiFDInit_t msg;
    int ret;

    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
    if (ret != 0) {
        return -1;
    }

    msg.magic = be32_to_cpu(msg.magic);
    msg.version = be32_to_cpu(msg.version);

    if (msg.magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet magic %x "
                   "expected %x", msg.magic, MULTIFD_MAGIC);
        return -1;
    }

    if (msg.version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet version %u "
                   "expected %u", msg.version, MULTIFD_VERSION);
        return -1;
    }

    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
        char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);

        error_setg(errp, "multifd: received uuid '%s' and expected "
                   "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
        g_free(uuid);
        g_free(msg_uuid);
        return -1;
    }

    /* Valid channel ids are 0..n-1, so reject id == n as well */
    if (msg.id >= migrate_multifd_channels()) {
        error_setg(errp, "multifd: received channel id %u is out of range "
                   "(%u channels)", msg.id, migrate_multifd_channels());
        return -1;
    }

    return msg.id;
}

/* Fills a RAM multifd packet */
void multifd_send_fill_packet(MultiFDSendParams *p)
{
    MultiFDPacket_t *packet = p->packet;
    uint64_t packet_num;
    bool sync_packet = p->flags & MULTIFD_FLAG_SYNC;

    memset(packet, 0, p->packet_len);

    packet->hdr.magic = cpu_to_be32(MULTIFD_MAGIC);
    packet->hdr.version = cpu_to_be32(MULTIFD_VERSION);

    packet->hdr.flags = cpu_to_be32(p->flags);
    packet->next_packet_size = cpu_to_be32(p->next_packet_size);

    packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
    packet->packet_num = cpu_to_be64(packet_num);

    p->packets_sent++;

    if (!sync_packet) {
        multifd_ram_fill_packet(p);
    }

    trace_multifd_send_fill(p->id, packet_num,
                            p->flags, p->next_packet_size);
}

static int multifd_recv_unfill_packet_header(MultiFDRecvParams *p,
                                             const MultiFDPacketHdr_t *hdr,
                                             Error **errp)
{
    uint32_t magic = be32_to_cpu(hdr->magic);
    uint32_t version = be32_to_cpu(hdr->version);

    if (magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet magic %x, expected %x",
                   magic, MULTIFD_MAGIC);
        return -1;
    }

    if (version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet version %u, expected %u",
                   version, MULTIFD_VERSION);
        return -1;
    }

    p->flags = be32_to_cpu(hdr->flags);

    return 0;
}

static int multifd_recv_unfill_packet_device_state(MultiFDRecvParams *p,
                                                   Error **errp)
{
    MultiFDPacketDeviceState_t *packet = p->packet_dev_state;

    packet->instance_id = be32_to_cpu(packet->instance_id);
    p->next_packet_size = be32_to_cpu(packet->next_packet_size);

    return 0;
}

static int multifd_recv_unfill_packet_ram(MultiFDRecvParams *p, Error **errp)
{
    const MultiFDPacket_t *packet = p->packet;
    int ret = 0;

    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
    p->packet_num = be64_to_cpu(packet->packet_num);

    /* Always unfill, old QEMUs (<9.0) send data along with SYNC */
    ret = multifd_ram_unfill_packet(p, errp);

    trace_multifd_recv_unfill(p->id, p->packet_num, p->flags,
                              p->next_packet_size);

    return ret;
}

static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
{
    p->packets_recved++;

    if (p->flags & MULTIFD_FLAG_DEVICE_STATE) {
        return multifd_recv_unfill_packet_device_state(p, errp);
    }

    return multifd_recv_unfill_packet_ram(p, errp);
}

static bool multifd_send_should_exit(void)
{
    return qatomic_read(&multifd_send_state->exiting);
}

static bool multifd_recv_should_exit(void)
{
    return qatomic_read(&multifd_recv_state->exiting);
}

/*
 * The migration thread can wait on either of the two semaphores. This
 * function can be used to kick the main thread out of waiting on either of
 * them. Should mostly only be called when something goes wrong with the
 * current multifd send thread.
 */
static void multifd_send_kick_main(MultiFDSendParams *p)
{
    qemu_sem_post(&p->sem_sync);
    qemu_sem_post(&multifd_send_state->channels_ready);
}

/*
 * multifd_send() works by exchanging the MultiFDSendData object
 * provided by the caller with an unused MultiFDSendData object from
 * the next channel that is found to be idle.
 *
 * The channel owns the data until it finishes transmitting and the
 * caller owns the empty object until it fills it with data and calls
 * this function again. No locking necessary.
 *
 * Switching is safe because both the migration thread and the channel
 * thread have barriers in place to serialize access.
 *
 * Returns true on success, false otherwise.
 */
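/*
 * Typical caller pattern (a sketch of the handoff described above):
 *
 *    MultiFDSendData *data = multifd_send_data_alloc();
 *    ...
 *    fill 'data' with a payload, then:
 *    if (!multifd_send(&data)) {
 *        handle error;
 *    }
 *    'data' now points at a different, empty slot the caller owns.
 */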
bool multifd_send(MultiFDSendData **send_data)
{
    int i;
    static int next_channel;
    MultiFDSendParams *p = NULL; /* make gcc happy */
    MultiFDSendData *tmp;

    if (multifd_send_should_exit()) {
        return false;
    }

    QEMU_LOCK_GUARD(&multifd_send_state->multifd_send_mutex);

    /* We wait here, until at least one channel is ready */
    qemu_sem_wait(&multifd_send_state->channels_ready);

    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_channel %= migrate_multifd_channels();
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_send_should_exit()) {
            return false;
        }
        p = &multifd_send_state->params[i];
        /*
         * Lockless read to p->pending_job is safe, because only multifd
         * sender thread can clear it.
         */
        if (qatomic_read(&p->pending_job) == false) {
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Make sure we read p->pending_job before all the rest. Pairs with
     * qatomic_store_release() in multifd_send_thread().
     */
    smp_mb_acquire();

    assert(multifd_payload_empty(p->data));

    /*
     * Swap the pointers. The channel gets the client data for
     * transferring and the client gets back an unused data slot.
     */
    tmp = *send_data;
    *send_data = p->data;
    p->data = tmp;

    /*
     * Making sure p->data is setup before marking pending_job=true. Pairs
     * with the qatomic_load_acquire() in multifd_send_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}

/* Multifd send side hit an error; remember it and prepare to quit */
static void multifd_send_set_error(Error *err)
{
    /*
     * We don't want to terminate the threads twice. Depending on where
     * we get the error, or if there are two independent errors in two
     * threads at the same time, we can end up calling this function
     * twice.
     */
    if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
            s->state == MIGRATION_STATUS_DEVICE ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }
}

static void multifd_send_terminate_threads(void)
{
    int i;

    trace_multifd_send_terminate_threads();

    /*
     * Tell everyone we're quitting. No xchg() needed here; we simply
     * always set it.
     */
    qatomic_set(&multifd_send_state->exiting, 1);

    /*
     * Firstly, kick all threads out; no matter whether they are just idle,
     * or blocked in an IO system call.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_sem_post(&p->sem);
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }

    /*
     * Finally recycle all the threads.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (p->tls_thread_created) {
            qemu_thread_join(&p->tls_thread);
        }

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
}

static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
{
    if (p->c) {
        migration_ioc_unregister_yank(p->c);
        /*
         * The object_unref() cannot guarantee the fd will always be
         * released because finalize() of the iochannel is only
         * triggered on the last reference and it's not guaranteed
         * that we always hold the last refcount when reaching here.
         *
         * Closing the fd explicitly has the benefit that if there is any
         * registered I/O handler callbacks on such fd, that will get a
         * POLLNVAL event and will further trigger the cleanup to finally
         * release the IOC.
         *
         * FIXME: It should logically be guaranteed that all multifd
         * channels have no I/O handler callback registered when reaching
         * here, because migration thread will wait for all multifd channel
         * establishments to complete during setup. Since
         * migration_cleanup() will be scheduled in main thread too, all
         * previous callbacks should guarantee to be completed when
         * reaching here. See multifd_send_state.channels_created and its
         * usage. In the future, we could replace this with an assert
         * making sure we're the last reference, or simply drop it if above
         * is more clear to be justified.
         */
        qio_channel_close(p->c, &error_abort);
        object_unref(OBJECT(p->c));
        p->c = NULL;
    }
    qemu_sem_destroy(&p->sem);
    qemu_sem_destroy(&p->sem_sync);
    g_free(p->name);
    p->name = NULL;
    g_clear_pointer(&p->data, multifd_send_data_free);
    p->packet_len = 0;
    g_clear_pointer(&p->packet_device_state, g_free);
    g_free(p->packet);
    p->packet = NULL;
    multifd_send_state->ops->send_cleanup(p, errp);
    assert(!p->iov);

    return *errp == NULL;
}

static void multifd_send_cleanup_state(void)
{
    file_cleanup_outgoing_migration();
    socket_cleanup_outgoing_migration();
    multifd_device_state_send_cleanup();
    qemu_sem_destroy(&multifd_send_state->channels_created);
    qemu_sem_destroy(&multifd_send_state->channels_ready);
    qemu_mutex_destroy(&multifd_send_state->multifd_send_mutex);
    g_free(multifd_send_state->params);
    multifd_send_state->params = NULL;
    g_free(multifd_send_state);
    multifd_send_state = NULL;
}

void multifd_send_shutdown(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        /* thread_created implies the TLS handshake has succeeded */
        if (p->tls_thread_created && p->thread_created) {
            Error *local_err = NULL;
            /*
             * The destination expects the TLS session to always be
             * properly terminated. This helps to detect a premature
             * termination in the middle of the stream. Note that
             * older QEMUs always break the connection on the source
             * and the destination always sees
             * GNUTLS_E_PREMATURE_TERMINATION.
             */
            migration_tls_channel_end(p->c, &local_err);

            /*
             * The above can return an error in case the migration has
             * already failed. If the migration succeeded, errors are
             * not expected but there's no need to kill the source.
             */
            if (local_err && !migration_has_failed(migrate_get_current())) {
                warn_report(
                    "multifd_send_%d: Failed to terminate TLS connection: %s",
                    p->id, error_get_pretty(local_err));
                break;
            }
        }
    }

    multifd_send_terminate_threads();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        if (!multifd_send_cleanup_channel(p, &local_err)) {
            migrate_set_error(migrate_get_current(), local_err);
            error_free(local_err);
        }
    }

    multifd_send_cleanup_state();
}

static int multifd_zero_copy_flush(QIOChannel *c)
{
    int ret;
    Error *err = NULL;

    ret = qio_channel_flush(c, &err);
    if (ret < 0) {
        error_report_err(err);
        return -1;
    }
    if (ret == 1) {
        stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
    }

    return ret;
}

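/*
 * Sync flow (sketch): the migration thread sets p->pending_sync and
 * posts p->sem for every channel, then waits once per channel on
 * channels_ready followed by p->sem_sync. Each sender thread, on
 * seeing pending_sync, emits a MULTIFD_FLAG_SYNC packet if the
 * request is MULTIFD_SYNC_ALL and then posts p->sem_sync.
 */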
int multifd_send_sync_main(MultiFDSyncReq req)
{
    int i;
    bool flush_zero_copy;

    assert(req != MULTIFD_SYNC_NONE);

    flush_zero_copy = migrate_zero_copy_send();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        trace_multifd_send_sync_main_signal(p->id);

        /*
         * We should be the only user so far, so not possible to be set by
         * others concurrently.
         */
        assert(qatomic_read(&p->pending_sync) == MULTIFD_SYNC_NONE);
        qatomic_set(&p->pending_sync, req);
        qemu_sem_post(&p->sem);
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        qemu_sem_wait(&multifd_send_state->channels_ready);
        trace_multifd_send_sync_main_wait(p->id);
        qemu_sem_wait(&p->sem_sync);

        if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
            return -1;
        }
    }
    trace_multifd_send_sync_main(multifd_send_state->packet_num);

    return 0;
}

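/*
 * Per-channel send thread. Each iteration advertises the channel as
 * ready (channels_ready), then sleeps on p->sem until the migration
 * thread hands it a payload (pending_job), requests a sync
 * (pending_sync), or asks it to exit.
 */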
static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    MigrationThread *thread = NULL;
    Error *local_err = NULL;
    int ret = 0;
    bool use_packets = multifd_use_packets();

    thread = migration_threads_add(p->name, qemu_get_thread_id());

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (use_packets) {
        if (multifd_send_initial_packet(p, &local_err) < 0) {
            ret = -1;
            goto out;
        }
    }

    while (true) {
        qemu_sem_post(&multifd_send_state->channels_ready);
        qemu_sem_wait(&p->sem);

        if (multifd_send_should_exit()) {
            break;
        }

        /*
         * Read pending_job flag before p->data. Pairs with the
         * qatomic_store_release() in multifd_send().
         */
        if (qatomic_load_acquire(&p->pending_job)) {
            bool is_device_state = multifd_payload_device_state(p->data);
            size_t total_size;
            int write_flags_masked = 0;

            p->flags = 0;
            p->iovs_num = 0;
            assert(!multifd_payload_empty(p->data));

            if (is_device_state) {
                multifd_device_state_send_prepare(p);

                /* Device state packets cannot be sent via zerocopy */
                write_flags_masked |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
            } else {
                ret = multifd_send_state->ops->send_prepare(p, &local_err);
                if (ret != 0) {
                    break;
                }
            }

            /*
             * The packet header in the zerocopy RAM case is accounted for
             * in multifd_nocomp_send_prepare() - where it is actually
             * being sent.
             */
            total_size = iov_size(p->iov, p->iovs_num);

            if (migrate_mapped_ram()) {
                assert(!is_device_state);

                ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
                                              &p->data->u.ram, &local_err);
            } else {
                ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
                                                  NULL, 0,
                                                  p->write_flags &
                                                  ~write_flags_masked,
                                                  &local_err);
            }

            if (ret != 0) {
                break;
            }

            stat64_add(&mig_stats.multifd_bytes, total_size);

            p->next_packet_size = 0;
            multifd_send_data_clear(p->data);

            /*
             * Making sure p->data is published before saying "we're
             * free". Pairs with the smp_mb_acquire() in
             * multifd_send().
             */
            qatomic_store_release(&p->pending_job, false);
        } else {
            MultiFDSyncReq req = qatomic_read(&p->pending_sync);

            /*
             * If not a normal job, must be a sync request. Note that
             * pending_sync is a standalone flag (unlike pending_job), so
             * it doesn't require explicit memory barriers.
             */
            assert(req != MULTIFD_SYNC_NONE);

            /* Only push the SYNC message if it involves a remote sync */
            if (req == MULTIFD_SYNC_ALL) {
                p->flags = MULTIFD_FLAG_SYNC;
                multifd_send_fill_packet(p);
                ret = qio_channel_write_all(p->c, (void *)p->packet,
                                            p->packet_len, &local_err);
                if (ret != 0) {
                    break;
                }
                /* p->next_packet_size will always be zero for a SYNC packet */
                stat64_add(&mig_stats.multifd_bytes, p->packet_len);
            }

            qatomic_set(&p->pending_sync, MULTIFD_SYNC_NONE);
            qemu_sem_post(&p->sem_sync);
        }
    }

out:
    if (ret) {
        assert(local_err);
        trace_multifd_send_error(p->id);
        multifd_send_set_error(local_err);
        multifd_send_kick_main(p);
        error_free(local_err);
    }

    rcu_unregister_thread();
    migration_threads_remove(thread);
    trace_multifd_send_thread_end(p->id, p->packets_sent);

    return NULL;
}

static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);

typedef struct {
    MultiFDSendParams *p;
    QIOChannelTLS *tioc;
} MultiFDTLSThreadArgs;

static void *multifd_tls_handshake_thread(void *opaque)
{
    MultiFDTLSThreadArgs *args = opaque;

    qio_channel_tls_handshake(args->tioc,
                              multifd_new_send_channel_async,
                              args->p,
                              NULL,
                              NULL);
    g_free(args);

    return NULL;
}

static bool multifd_tls_channel_connect(MultiFDSendParams *p,
                                        QIOChannel *ioc,
                                        Error **errp)
{
    MigrationState *s = migrate_get_current();
    const char *hostname = s->hostname;
    MultiFDTLSThreadArgs *args;
    QIOChannelTLS *tioc;

    tioc = migration_tls_client_create(ioc, hostname, errp);
    if (!tioc) {
        return false;
    }

    /*
     * Ownership of the socket channel now transfers to the newly
     * created TLS channel, which has already taken a reference.
     */
    object_unref(OBJECT(ioc));
    trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
    qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");

    args = g_new0(MultiFDTLSThreadArgs, 1);
    args->tioc = tioc;
    args->p = p;

    p->tls_thread_created = true;
    qemu_thread_create(&p->tls_thread, MIGRATION_THREAD_SRC_TLS,
                       multifd_tls_handshake_thread, args,
                       QEMU_THREAD_JOINABLE);
    return true;
}

void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
{
    qio_channel_set_delay(ioc, false);

    migration_ioc_register_yank(ioc);
    /* Set up p->c only once the channel is completely set up */
    p->c = ioc;

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
                       QEMU_THREAD_JOINABLE);
}

/*
 * When TLS is enabled this function is called once to establish the
 * TLS connection and a second time after the TLS handshake to create
 * the multifd channel. Without TLS it goes straight into the channel
 * creation.
 */
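/*
 * Connection flow with TLS (sketch):
 *
 *    socket connects -> multifd_new_send_channel_async()
 *      -> multifd_tls_channel_connect()   (spawns handshake thread)
 *        -> qio_channel_tls_handshake() completes
 *          -> multifd_new_send_channel_async()   (second call)
 *            -> multifd_channel_connect()
 */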
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
    Error *local_err = NULL;
    bool ret;

    trace_multifd_new_send_channel_async(p->id);

    if (qio_task_propagate_error(task, &local_err)) {
        ret = false;
        goto out;
    }

    trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
                                       migrate_get_current()->hostname);

    if (migrate_channel_requires_tls_upgrade(ioc)) {
        ret = multifd_tls_channel_connect(p, ioc, &local_err);
        if (ret) {
            return;
        }
    } else {
        multifd_channel_connect(p, ioc);
        ret = true;
    }

out:
    /*
     * Here we're not interested in whether creation succeeded, only that
     * it was attempted at all.
     */
    multifd_send_channel_created();

    if (ret) {
        return;
    }

    trace_multifd_new_send_channel_async_error(p->id, local_err);
    multifd_send_set_error(local_err);
    /*
     * For error cases (TLS or non-TLS), the IO channel is always freed
     * here rather than during multifd cleanup: since p->c is never set,
     * the multifd cleanup code doesn't even know of its existence.
     */
    object_unref(OBJECT(ioc));
    error_free(local_err);
}

static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
{
    if (!multifd_use_packets()) {
        return file_send_channel_create(opaque, errp);
    }

    socket_send_channel_create(multifd_new_send_channel_async, opaque);
    return true;
}

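/*
 * Set up the send side: allocate per-channel state, kick off creation
 * of every channel, wait until each creation attempt has at least
 * started, then run the per-channel send_setup() hook of the selected
 * compression backend.
 */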
bool multifd_send_setup(void)
{
    MigrationState *s = migrate_get_current();
    int thread_count, ret = 0;
    uint32_t page_count = multifd_ram_page_count();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    if (!migrate_multifd()) {
        return true;
    }

    thread_count = migrate_multifd_channels();
    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
    qemu_mutex_init(&multifd_send_state->multifd_send_mutex);
    qemu_sem_init(&multifd_send_state->channels_created, 0);
    qemu_sem_init(&multifd_send_state->channels_ready, 0);
    qatomic_set(&multifd_send_state->exiting, 0);
    multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        qemu_sem_init(&p->sem, 0);
        qemu_sem_init(&p->sem_sync, 0);
        p->id = i;
        p->data = multifd_send_data_alloc();

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                          + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
            p->packet_device_state = g_malloc0(sizeof(*p->packet_device_state));
            p->packet_device_state->hdr.magic = cpu_to_be32(MULTIFD_MAGIC);
            p->packet_device_state->hdr.version = cpu_to_be32(MULTIFD_VERSION);
        }
        p->name = g_strdup_printf(MIGRATION_THREAD_SRC_MULTIFD, i);
        p->write_flags = 0;

        if (!multifd_new_send_channel_create(p, &local_err)) {
            migrate_set_error(s, local_err);
            ret = -1;
        }
    }

    /*
     * Wait until channel creation has started for all channels. The
     * creation can still fail, but no more channels will be created
     * past this point.
     */
    for (i = 0; i < thread_count; i++) {
        qemu_sem_wait(&multifd_send_state->channels_created);
    }

    if (ret) {
        goto err;
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        ret = multifd_send_state->ops->send_setup(p, &local_err);
        if (ret) {
            migrate_set_error(s, local_err);
            goto err;
        }
        assert(p->iov);
    }

    multifd_device_state_send_setup();

    return true;

err:
    migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
                      MIGRATION_STATUS_FAILED);
    return false;
}

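/*
 * multifd_recv() mirrors multifd_send(): it hands the global
 * MultiFDRecvData slot to the first idle recv channel and takes that
 * channel's empty slot in return. Only used when packets are not in
 * use (mapped-ram), where the migration thread distributes the work.
 *
 * Typical caller pattern (sketch):
 *
 *    MultiFDRecvData *data = multifd_get_recv_data();
 *    ... describe what to load into 'data' ...
 *    if (!multifd_recv()) {
 *        handle error;
 *    }
 */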
bool multifd_recv(void)
{
    int i;
    static int next_recv_channel;
    MultiFDRecvParams *p = NULL;
    MultiFDRecvData *data = multifd_recv_state->data;

    /*
     * next_recv_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_recv_channel %= migrate_multifd_channels();
    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_recv_should_exit()) {
            return false;
        }

        p = &multifd_recv_state->params[i];

        if (qatomic_read(&p->pending_job) == false) {
            next_recv_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Order pending_job read before manipulating p->data below. Pairs
     * with qatomic_store_release() at multifd_recv_thread().
     */
    smp_mb_acquire();

    assert(!p->data->size);
    multifd_recv_state->data = p->data;
    p->data = data;

    /*
     * Order p->data update before setting pending_job. Pairs with
     * qatomic_load_acquire() at multifd_recv_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}

MultiFDRecvData *multifd_get_recv_data(void)
{
    return multifd_recv_state->data;
}

static void multifd_recv_terminate_threads(Error *err)
{
    int i;

    trace_multifd_recv_terminate_threads(err != NULL);

    if (qatomic_xchg(&multifd_recv_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        /*
         * The migration thread and channels interact differently
         * depending on the presence of packets.
         */
        if (multifd_use_packets()) {
            /*
             * The channel receives as long as there are packets. When
             * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
             * channel waits for the migration thread to sync. If the
             * sync never happens, do it here.
             */
            qemu_sem_post(&p->sem_sync);
        } else {
            /*
             * The channel waits for the migration thread to give it
             * work. When the migration thread runs out of work, it
             * releases the channel and waits for any pending work to
             * finish. If we reach here (e.g. due to error) before the
             * work runs out, release the channel.
             */
            qemu_sem_post(&p->sem);
        }

        /*
         * We could arrive here for two reasons:
         *  - normal quit, i.e. everything went fine, just finished
         *  - error quit: We close the channels so the channel threads
         *    finish the qio_channel_read_all_eof()
         */
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }
}

void multifd_recv_shutdown(void)
{
    if (migrate_multifd()) {
        multifd_recv_terminate_threads(NULL);
    }
}

static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
{
    migration_ioc_unregister_yank(p->c);
    object_unref(OBJECT(p->c));
    p->c = NULL;
    qemu_mutex_destroy(&p->mutex);
    qemu_sem_destroy(&p->sem_sync);
    qemu_sem_destroy(&p->sem);
    g_free(p->data);
    p->data = NULL;
    g_free(p->name);
    p->name = NULL;
    p->packet_len = 0;
    g_free(p->packet);
    p->packet = NULL;
    g_clear_pointer(&p->packet_dev_state, g_free);
    g_free(p->normal);
    p->normal = NULL;
    g_free(p->zero);
    p->zero = NULL;
    multifd_recv_state->ops->recv_cleanup(p);
}

static void multifd_recv_cleanup_state(void)
{
    qemu_sem_destroy(&multifd_recv_state->sem_sync);
    g_free(multifd_recv_state->params);
    multifd_recv_state->params = NULL;
    g_free(multifd_recv_state->data);
    multifd_recv_state->data = NULL;
    g_free(multifd_recv_state);
    multifd_recv_state = NULL;
}

void multifd_recv_cleanup(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }
    multifd_recv_terminate_threads(NULL);
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
    }
    multifd_recv_cleanup_state();
}

void multifd_recv_sync_main(void)
{
    int thread_count = migrate_multifd_channels();
    bool file_based = !multifd_use_packets();
    int i;

    if (!migrate_multifd()) {
        return;
    }

    /*
     * File-based channels don't use packets and therefore need to
     * wait for more work. Release them to start the sync.
     */
    if (file_based) {
        for (i = 0; i < thread_count; i++) {
            MultiFDRecvParams *p = &multifd_recv_state->params[i];

            trace_multifd_recv_sync_main_signal(p->id);
            qemu_sem_post(&p->sem);
        }
    }

    /*
     * Initiate the synchronization by waiting for all channels.
     *
     * For socket-based migration this means each channel has received
     * the SYNC packet on the stream.
     *
     * For file-based migration this means each channel is done with
     * the work (pending_job=false).
     */
    for (i = 0; i < thread_count; i++) {
        trace_multifd_recv_sync_main_wait(i);
        qemu_sem_wait(&multifd_recv_state->sem_sync);
    }

    if (file_based) {
        /*
         * For file-based migration, loading is done in one iteration.
         * We're done.
         */
        return;
    }

    /*
     * Sync done. Release the channels for the next iteration.
     */
    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        WITH_QEMU_LOCK_GUARD(&p->mutex) {
            if (multifd_recv_state->packet_num < p->packet_num) {
                multifd_recv_state->packet_num = p->packet_num;
            }
        }
        trace_multifd_recv_sync_main_signal(p->id);
        qemu_sem_post(&p->sem_sync);
    }
    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
}

static int multifd_device_state_recv(MultiFDRecvParams *p, Error **errp)
{
    g_autofree char *dev_state_buf = NULL;
    int ret;

    dev_state_buf = g_malloc(p->next_packet_size);

    ret = qio_channel_read_all(p->c, dev_state_buf, p->next_packet_size, errp);
    if (ret != 0) {
        return ret;
    }

    if (p->packet_dev_state->idstr[sizeof(p->packet_dev_state->idstr) - 1]
        != 0) {
        error_setg(errp, "unterminated multifd device state idstr");
        return -1;
    }

    if (!qemu_loadvm_load_state_buffer(p->packet_dev_state->idstr,
                                       p->packet_dev_state->instance_id,
                                       dev_state_buf, p->next_packet_size,
                                       errp)) {
        ret = -1;
    }

    return ret;
}

static void *multifd_recv_thread(void *opaque)
{
    MigrationState *s = migrate_get_current();
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int ret;

    trace_multifd_recv_thread_start(p->id);
    rcu_register_thread();

    if (!s->multifd_clean_tls_termination) {
        p->read_flags = QIO_CHANNEL_READ_FLAG_RELAXED_EOF;
    }

    while (true) {
        MultiFDPacketHdr_t hdr;
        uint32_t flags = 0;
        bool is_device_state = false;
        bool has_data = false;
        uint8_t *pkt_buf;
        size_t pkt_len;

        p->normal_num = 0;

        if (use_packets) {
            struct iovec iov = {
                .iov_base = (void *)&hdr,
                .iov_len = sizeof(hdr)
            };

            if (multifd_recv_should_exit()) {
                break;
            }

            ret = qio_channel_readv_full_all_eof(p->c, &iov, 1, NULL, NULL,
                                                 p->read_flags, &local_err);
            if (!ret) {
                /* EOF */
                assert(!local_err);
                break;
            }

            if (ret == -1) {
                break;
            }

            ret = multifd_recv_unfill_packet_header(p, &hdr, &local_err);
            if (ret) {
                break;
            }

            is_device_state = p->flags & MULTIFD_FLAG_DEVICE_STATE;
            if (is_device_state) {
                pkt_buf = (uint8_t *)p->packet_dev_state + sizeof(hdr);
                pkt_len = sizeof(*p->packet_dev_state) - sizeof(hdr);
            } else {
                pkt_buf = (uint8_t *)p->packet + sizeof(hdr);
                pkt_len = p->packet_len - sizeof(hdr);
            }

            ret = qio_channel_read_all_eof(p->c, (char *)pkt_buf, pkt_len,
                                           &local_err);
            if (!ret) {
                /* EOF */
                error_setg(&local_err, "multifd: unexpected EOF after "
                           "packet header");
                break;
            }

            if (ret == -1) {
                break;
            }

            qemu_mutex_lock(&p->mutex);
            ret = multifd_recv_unfill_packet(p, &local_err);
            if (ret) {
                qemu_mutex_unlock(&p->mutex);
                break;
            }

            flags = p->flags;
            /* recv methods don't know how to handle the SYNC flag */
            p->flags &= ~MULTIFD_FLAG_SYNC;

            if (is_device_state) {
                has_data = p->next_packet_size > 0;
            } else {
                /*
                 * Even if it's a SYNC packet, this needs to be set
                 * because older QEMUs (<9.0) still send data along with
                 * the SYNC packet.
                 */
                has_data = p->normal_num || p->zero_num;
            }

            qemu_mutex_unlock(&p->mutex);
        } else {
            /*
             * No packets, so we need to wait for the vmstate code to
             * give us work.
             */
            qemu_sem_wait(&p->sem);

            if (multifd_recv_should_exit()) {
                break;
            }

            /* pairs with qatomic_store_release() at multifd_recv() */
            if (!qatomic_load_acquire(&p->pending_job)) {
                /*
                 * Migration thread did not send work, this is
                 * equivalent to pending_sync on the sending
                 * side. Post sem_sync to notify we reached this
                 * point.
                 */
                qemu_sem_post(&multifd_recv_state->sem_sync);
                continue;
            }

            has_data = !!p->data->size;
        }

        if (has_data) {
            if (is_device_state) {
                assert(use_packets);
                ret = multifd_device_state_recv(p, &local_err);
            } else {
                ret = multifd_recv_state->ops->recv(p, &local_err);
            }
            if (ret != 0) {
                break;
            }
        } else if (is_device_state) {
            error_setg(&local_err,
                       "multifd: received empty device state packet");
            break;
        }

        if (use_packets) {
            if (flags & MULTIFD_FLAG_SYNC) {
                if (is_device_state) {
                    error_setg(&local_err,
                               "multifd: received SYNC device state packet");
                    break;
                }

                qemu_sem_post(&multifd_recv_state->sem_sync);
                qemu_sem_wait(&p->sem_sync);
            }
        } else {
            p->data->size = 0;
            /*
             * Order data->size update before clearing
             * pending_job. Pairs with smp_mb_acquire() at
             * multifd_recv().
             */
            qatomic_store_release(&p->pending_job, false);
        }
    }

    if (local_err) {
        multifd_recv_terminate_threads(local_err);
        error_free(local_err);
    }

    rcu_unregister_thread();
    trace_multifd_recv_thread_end(p->id, p->packets_recved);

    return NULL;
}

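/*
 * Set up the recv side. Unlike the send side, channels are created
 * lazily as connections arrive (see multifd_recv_new_channel()); this
 * only allocates per-channel state and runs the backend's recv_setup()
 * hook.
 */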
int multifd_recv_setup(Error **errp)
{
    int thread_count;
    uint32_t page_count = multifd_ram_page_count();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    /*
     * Return successfully if multiFD recv state is already initialised
     * or multiFD is not enabled.
     */
    if (multifd_recv_state || !migrate_multifd()) {
        return 0;
    }

    thread_count = migrate_multifd_channels();
    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);

    multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
    multifd_recv_state->data->size = 0;

    qatomic_set(&multifd_recv_state->count, 0);
    qatomic_set(&multifd_recv_state->exiting, 0);
    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
    multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        qemu_mutex_init(&p->mutex);
        qemu_sem_init(&p->sem_sync, 0);
        qemu_sem_init(&p->sem, 0);
        p->pending_job = false;
        p->id = i;

        p->data = g_new0(MultiFDRecvData, 1);
        p->data->size = 0;

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                          + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
            p->packet_dev_state = g_malloc0(sizeof(*p->packet_dev_state));
        }
        p->name = g_strdup_printf(MIGRATION_THREAD_DST_MULTIFD, i);
        p->normal = g_new0(ram_addr_t, page_count);
        p->zero = g_new0(ram_addr_t, page_count);
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];
        int ret;

        ret = multifd_recv_state->ops->recv_setup(p, errp);
        if (ret) {
            return ret;
        }
    }
    return 0;
}

bool multifd_recv_all_channels_created(void)
{
    int thread_count = migrate_multifd_channels();

    if (!migrate_multifd()) {
        return true;
    }

    if (!multifd_recv_state) {
        /* Called before any connections created */
        return false;
    }

    return thread_count == qatomic_read(&multifd_recv_state->count);
}

/*
 * Try to receive all multifd channels to get ready for the migration.
 * Sets @errp when failing to receive the current channel.
 */
void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
{
    MultiFDRecvParams *p;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int id;

    if (use_packets) {
        id = multifd_recv_initial_packet(ioc, &local_err);
        if (id < 0) {
            multifd_recv_terminate_threads(local_err);
            error_propagate_prepend(errp, local_err,
                                    "failed to receive packet"
                                    " via multifd channel %d: ",
                                    qatomic_read(&multifd_recv_state->count));
            return;
        }
        trace_multifd_recv_new_channel(id);
    } else {
        id = qatomic_read(&multifd_recv_state->count);
    }

    p = &multifd_recv_state->params[id];
    if (p->c != NULL) {
        error_setg(&local_err, "multifd: received id '%d' already setup",
                   id);
        multifd_recv_terminate_threads(local_err);
        error_propagate(errp, local_err);
        return;
    }
    p->c = ioc;
    object_ref(OBJECT(ioc));

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                       QEMU_THREAD_JOINABLE);
    qatomic_inc(&multifd_recv_state->count);
}