/*
 * Multifd common code
 *
 * Copyright (c) 2019-2020 Red Hat Inc
 *
 * Authors:
 *  Juan Quintela <quintela@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */

#include "qemu/osdep.h"
#include "qemu/cutils.h"
#include "qemu/rcu.h"
#include "exec/target_page.h"
#include "sysemu/sysemu.h"
#include "exec/ramblock.h"
#include "qemu/error-report.h"
#include "qapi/error.h"
#include "file.h"
#include "migration.h"
#include "migration-stats.h"
#include "socket.h"
#include "tls.h"
#include "qemu-file.h"
#include "trace.h"
#include "multifd.h"
#include "threadinfo.h"
#include "options.h"
#include "qemu/yank.h"
#include "io/channel-file.h"
#include "io/channel-socket.h"
#include "yank_functions.h"

/* Multiple fd's */

#define MULTIFD_MAGIC 0x11223344U
#define MULTIFD_VERSION 1

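/*
 * Handshake message sent once per channel right after connecting. All
 * multi-byte fields go on the wire in big-endian (see
 * multifd_send_initial_packet()); the receiver compares the uuid against
 * its own to verify the channel belongs to the expected source VM.
 */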
typedef struct {
    uint32_t magic;
    uint32_t version;
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;

struct {
    MultiFDSendParams *params;
    /* array of pages to be sent */
    MultiFDPages_t *pages;
    /*
     * Global number of generated multifd packets.
     *
     * Note that we use 'uintptr_t' because it naturally supports atomic
     * operations on both 32-bit and 64-bit hosts. It means that on 32-bit
     * systems multifd will overflow packet_num sooner, but that should
     * be fine.
     *
     * Another option would be QEMU's Stat64, which is 64 bits on all
     * hosts, but so far it does not support atomic fetch_add(). Keep it
     * simple for now.
     */
    uintptr_t packet_num;
    /*
     * Synchronization point past which no more channels will be
     * created.
     */
    QemuSemaphore channels_created;
    /* send channels ready */
    QemuSemaphore channels_ready;
    /*
     * Have we already run terminate threads? There is a race when it
     * happens that we get one error while we are exiting.
     * We will use atomic operations. Only valid values are 0 and 1.
     */
    int exiting;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_send_state;

struct {
    MultiFDRecvParams *params;
    MultiFDRecvData *data;
    /* number of created threads */
    int count;
    /*
     * This is always posted by the recv threads, the migration thread
     * uses it to wait for recv threads to finish assigned tasks.
     */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    int exiting;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_recv_state;

static bool multifd_use_packets(void)
{
    return !migrate_mapped_ram();
}

void multifd_send_channel_created(void)
{
    qemu_sem_post(&multifd_send_state->channels_created);
}

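/*
 * For mapped-ram (file) migration, pages are written at fixed file
 * offsets rather than as a stream, so instead of sending zero pages we
 * track them in a per-RAMBlock file bitmap: set the bit for pages that
 * carry data, clear it for pages detected as zero so the destination
 * can skip reading them.
 */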
static void multifd_set_file_bitmap(MultiFDSendParams *p)
{
    MultiFDPages_t *pages = p->pages;

    assert(pages->block);

    for (int i = 0; i < p->pages->normal_num; i++) {
        ramblock_set_file_bmap_atomic(pages->block, pages->offset[i], true);
    }

    for (int i = p->pages->normal_num; i < p->pages->num; i++) {
        ramblock_set_file_bmap_atomic(pages->block, pages->offset[i], false);
    }
}

/* Multifd without compression */

/**
 * nocomp_send_setup: setup send side
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
{
    if (migrate_zero_copy_send()) {
        p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
    }

    if (multifd_use_packets()) {
        /* We need one extra place for the packet header */
        p->iov = g_new0(struct iovec, p->page_count + 1);
    } else {
        p->iov = g_new0(struct iovec, p->page_count);
    }

    return 0;
}

/**
 * nocomp_send_cleanup: cleanup send side
 *
 * For no compression this just frees the iovec.
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
{
    g_free(p->iov);
    p->iov = NULL;
}

static void multifd_send_prepare_iovs(MultiFDSendParams *p)
{
    MultiFDPages_t *pages = p->pages;

    for (int i = 0; i < pages->normal_num; i++) {
        p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
        p->iov[p->iovs_num].iov_len = p->page_size;
        p->iovs_num++;
    }

    p->next_packet_size = pages->normal_num * p->page_size;
}

/**
 * nocomp_send_prepare: prepare data to be able to send
 *
 * For no compression we just have to calculate the size of the
 * packet.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
{
    bool use_zero_copy_send = migrate_zero_copy_send();
    int ret;

    multifd_send_zero_page_detect(p);

    if (!multifd_use_packets()) {
        multifd_send_prepare_iovs(p);
        multifd_set_file_bitmap(p);

        return 0;
    }

    if (!use_zero_copy_send) {
        /*
         * Only !zerocopy needs the header in IOV; zerocopy will
         * send it separately.
         */
        multifd_send_prepare_header(p);
    }

    multifd_send_prepare_iovs(p);
    p->flags |= MULTIFD_FLAG_NOCOMP;

    multifd_send_fill_packet(p);

    if (use_zero_copy_send) {
        /* Send header first, without zerocopy */
        ret = qio_channel_write_all(p->c, (void *)p->packet,
                                    p->packet_len, errp);
        if (ret != 0) {
            return -1;
        }
    }

    return 0;
}

/**
 * nocomp_recv_setup: setup receive side
 *
 * For no compression this just allocates the iovec.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
{
    p->iov = g_new0(struct iovec, p->page_count);
    return 0;
}

/**
 * nocomp_recv_cleanup: cleanup receive side
 *
 * For no compression this just frees the iovec.
 *
 * @p: Params for the channel that we are using
 */
static void nocomp_recv_cleanup(MultiFDRecvParams *p)
{
    g_free(p->iov);
    p->iov = NULL;
}

/**
 * nocomp_recv: read the data from the channel
 *
 * For no compression we just need to read things into the correct place.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_recv(MultiFDRecvParams *p, Error **errp)
{
    uint32_t flags;

    if (!multifd_use_packets()) {
        return multifd_file_recv_data(p, errp);
    }

    flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;

    if (flags != MULTIFD_FLAG_NOCOMP) {
        error_setg(errp, "multifd %u: flags received %x flags expected %x",
                   p->id, flags, MULTIFD_FLAG_NOCOMP);
        return -1;
    }

    multifd_recv_zero_page_process(p);

    if (!p->normal_num) {
        return 0;
    }

    for (int i = 0; i < p->normal_num; i++) {
        p->iov[i].iov_base = p->host + p->normal[i];
        p->iov[i].iov_len = p->page_size;
        ramblock_recv_bitmap_set_offset(p->block, p->normal[i]);
    }
    return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
}

static MultiFDMethods multifd_nocomp_ops = {
    .send_setup = nocomp_send_setup,
    .send_cleanup = nocomp_send_cleanup,
    .send_prepare = nocomp_send_prepare,
    .recv_setup = nocomp_recv_setup,
    .recv_cleanup = nocomp_recv_cleanup,
    .recv = nocomp_recv
};

static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
    [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
};

void multifd_register_ops(int method, MultiFDMethods *ops)
{
    assert(0 < method && method < MULTIFD_COMPRESSION__MAX);
    multifd_ops[method] = ops;
}
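
/*
 * Example (sketch, not part of this file): a compression backend fills a
 * MultiFDMethods struct with its own hooks and registers it at startup,
 * roughly the way multifd-zlib.c does:
 *
 *   static void multifd_zlib_register(void)
 *   {
 *       multifd_register_ops(MULTIFD_COMPRESSION_ZLIB, &multifd_zlib_ops);
 *   }
 *   migration_init(multifd_zlib_register);
 */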

/* Reset a MultiFDPages_t* object for the next use */
static void multifd_pages_reset(MultiFDPages_t *pages)
{
    /*
     * We don't need to touch offset[] array, because it will be
     * overwritten later when reused.
     */
    pages->num = 0;
    pages->normal_num = 0;
    pages->block = NULL;
}

static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
{
    MultiFDInit_t msg = {};
    size_t size = sizeof(msg);
    int ret;

    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
    msg.version = cpu_to_be32(MULTIFD_VERSION);
    msg.id = p->id;
    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));

    ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
    if (ret != 0) {
        return -1;
    }
    stat64_add(&mig_stats.multifd_bytes, size);
    return 0;
}

static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
{
    MultiFDInit_t msg;
    int ret;

    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
    if (ret != 0) {
        return -1;
    }

    msg.magic = be32_to_cpu(msg.magic);
    msg.version = be32_to_cpu(msg.version);

    if (msg.magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet magic %x "
                   "expected %x", msg.magic, MULTIFD_MAGIC);
        return -1;
    }

    if (msg.version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet version %u "
                   "expected %u", msg.version, MULTIFD_VERSION);
        return -1;
    }

    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
        char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);

        error_setg(errp, "multifd: received uuid '%s' and expected "
                   "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
        g_free(uuid);
        g_free(msg_uuid);
        return -1;
    }

    if (msg.id > migrate_multifd_channels()) {
        error_setg(errp, "multifd: received channel id %u is greater than "
                   "number of channels %u", msg.id, migrate_multifd_channels());
        return -1;
    }

    return msg.id;
}

static MultiFDPages_t *multifd_pages_init(uint32_t n)
{
    MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);

    pages->allocated = n;
    pages->offset = g_new0(ram_addr_t, n);

    return pages;
}

static void multifd_pages_clear(MultiFDPages_t *pages)
{
    multifd_pages_reset(pages);
    pages->allocated = 0;
    g_free(pages->offset);
    pages->offset = NULL;
    g_free(pages);
}

void multifd_send_fill_packet(MultiFDSendParams *p)
{
    MultiFDPacket_t *packet = p->packet;
    MultiFDPages_t *pages = p->pages;
    uint64_t packet_num;
    uint32_t zero_num = pages->num - pages->normal_num;
    int i;

    packet->flags = cpu_to_be32(p->flags);
    packet->pages_alloc = cpu_to_be32(p->pages->allocated);
    packet->normal_pages = cpu_to_be32(pages->normal_num);
    packet->zero_pages = cpu_to_be32(zero_num);
    packet->next_packet_size = cpu_to_be32(p->next_packet_size);

    packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
    packet->packet_num = cpu_to_be64(packet_num);

    if (pages->block) {
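        /*
         * A name of exactly 256 bytes would lose its NUL terminator
         * here, but the receiver re-terminates it before use (see
         * multifd_recv_unfill_packet()), so the truncation is harmless.
         */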
        strncpy(packet->ramblock, pages->block->idstr, 256);
    }

    for (i = 0; i < pages->num; i++) {
        /* there are architectures where ram_addr_t is 32 bit */
        uint64_t temp = pages->offset[i];

        packet->offset[i] = cpu_to_be64(temp);
    }

    p->packets_sent++;
    p->total_normal_pages += pages->normal_num;
    p->total_zero_pages += zero_num;

    trace_multifd_send(p->id, packet_num, pages->normal_num, zero_num,
                       p->flags, p->next_packet_size);
}

static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
{
    MultiFDPacket_t *packet = p->packet;
    int i;

    packet->magic = be32_to_cpu(packet->magic);
    if (packet->magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet "
                   "magic %x and expected magic %x",
                   packet->magic, MULTIFD_MAGIC);
        return -1;
    }

    packet->version = be32_to_cpu(packet->version);
    if (packet->version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet "
                   "version %u and expected version %u",
                   packet->version, MULTIFD_VERSION);
        return -1;
    }

    p->flags = be32_to_cpu(packet->flags);

    packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
    /*
     * If we received a packet claiming more pages than this channel
     * was set up to hold, just stop migration.
     */
    if (packet->pages_alloc > p->page_count) {
        error_setg(errp, "multifd: received packet "
                   "with size %u and expected a size of %u",
                   packet->pages_alloc, p->page_count);
        return -1;
    }

    p->normal_num = be32_to_cpu(packet->normal_pages);
    if (p->normal_num > packet->pages_alloc) {
        error_setg(errp, "multifd: received packet "
                   "with %u normal pages and expected maximum pages are %u",
                   p->normal_num, packet->pages_alloc);
        return -1;
    }

    p->zero_num = be32_to_cpu(packet->zero_pages);
    if (p->zero_num > packet->pages_alloc - p->normal_num) {
        error_setg(errp, "multifd: received packet "
                   "with %u zero pages and expected maximum zero pages are %u",
                   p->zero_num, packet->pages_alloc - p->normal_num);
        return -1;
    }

    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
    p->packet_num = be64_to_cpu(packet->packet_num);
    p->packets_recved++;
    p->total_normal_pages += p->normal_num;
    p->total_zero_pages += p->zero_num;

    trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->zero_num,
                       p->flags, p->next_packet_size);

    if (p->normal_num == 0 && p->zero_num == 0) {
        return 0;
    }

    /* make sure that the ramblock name is NUL-terminated */
    packet->ramblock[255] = 0;
    p->block = qemu_ram_block_by_name(packet->ramblock);
    if (!p->block) {
        error_setg(errp, "multifd: unknown ram block %s",
                   packet->ramblock);
        return -1;
    }

    p->host = p->block->host;
    for (i = 0; i < p->normal_num; i++) {
        uint64_t offset = be64_to_cpu(packet->offset[i]);

        if (offset > (p->block->used_length - p->page_size)) {
            error_setg(errp, "multifd: offset too long %" PRIu64
                       " (max " RAM_ADDR_FMT ")",
                       offset, p->block->used_length);
            return -1;
        }
        p->normal[i] = offset;
    }

    for (i = 0; i < p->zero_num; i++) {
        uint64_t offset = be64_to_cpu(packet->offset[p->normal_num + i]);

        if (offset > (p->block->used_length - p->page_size)) {
            error_setg(errp, "multifd: offset too long %" PRIu64
                       " (max " RAM_ADDR_FMT ")",
                       offset, p->block->used_length);
            return -1;
        }
        p->zero[i] = offset;
    }

    return 0;
}

static bool multifd_send_should_exit(void)
{
    return qatomic_read(&multifd_send_state->exiting);
}

static bool multifd_recv_should_exit(void)
{
    return qatomic_read(&multifd_recv_state->exiting);
}

/*
 * The migration thread can wait on either of the two semaphores. This
 * function can be used to kick the main thread out of waiting on either of
 * them. Should mostly only be called when something went wrong with the
 * current multifd send thread.
 */
static void multifd_send_kick_main(MultiFDSendParams *p)
{
    qemu_sem_post(&p->sem_sync);
    qemu_sem_post(&multifd_send_state->channels_ready);
}

/*
 * How do we use multifd_send_state->pages and channel->pages?
 *
 * We create a pages array for each channel, plus a main one. Each time
 * we need to send a batch of pages we swap the one in multifd_send_state
 * with the one in the channel that is sending it. There are two reasons
 * for that:
 *    - to not have to do so many mallocs during migration
 *    - to make it easier to know what to free at the end of migration
 *
 * This way we always know who is the owner of each "pages" struct,
 * and we don't need any locking. It belongs to the migration thread
 * or to the channel thread. Switching is safe because the migration
 * thread is using the channel mutex when changing it, and the channel
 * thread must have finished with its own, otherwise pending_job can't
 * be false.
 *
 * Returns true on success, false otherwise.
 */
static bool multifd_send_pages(void)
{
    int i;
    static int next_channel;
    MultiFDSendParams *p = NULL; /* make gcc happy */
    MultiFDPages_t *pages = multifd_send_state->pages;

    if (multifd_send_should_exit()) {
        return false;
    }

    /* We wait here, until at least one channel is ready */
    qemu_sem_wait(&multifd_send_state->channels_ready);

    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_channel %= migrate_multifd_channels();
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_send_should_exit()) {
            return false;
        }
        p = &multifd_send_state->params[i];
        /*
         * Lockless read to p->pending_job is safe, because only multifd
         * sender thread can clear it.
         */
        if (qatomic_read(&p->pending_job) == false) {
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Make sure we read p->pending_job before all the rest. Pairs with
     * qatomic_store_release() in multifd_send_thread().
     */
    smp_mb_acquire();
    assert(!p->pages->num);
    multifd_send_state->pages = p->pages;
    p->pages = pages;
    /*
     * Making sure p->pages is setup before marking pending_job=true. Pairs
     * with the qatomic_load_acquire() in multifd_send_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}

static inline bool multifd_queue_empty(MultiFDPages_t *pages)
{
    return pages->num == 0;
}

static inline bool multifd_queue_full(MultiFDPages_t *pages)
{
    return pages->num == pages->allocated;
}

static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset)
{
    pages->offset[pages->num++] = offset;
}

/* Returns true if enqueue successful, false otherwise */
bool multifd_queue_page(RAMBlock *block, ram_addr_t offset)
{
    MultiFDPages_t *pages;

retry:
    pages = multifd_send_state->pages;

    /* If the queue is empty, we can already enqueue now */
    if (multifd_queue_empty(pages)) {
        pages->block = block;
        multifd_enqueue(pages, offset);
        return true;
    }

    /*
     * Not empty, so we may need a flush first. That is needed when
     * either:
     *
     * (1) The page is not on the same ramblock as previous ones, or,
     * (2) The queue is full.
     *
     * After flush, always retry.
     */
    if (pages->block != block || multifd_queue_full(pages)) {
        if (!multifd_send_pages()) {
            return false;
        }
        goto retry;
    }

    /* Not empty, and we still have space, do it! */
    multifd_enqueue(pages, offset);
    return true;
}
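
/*
 * Typical usage (sketch): the RAM migration code feeds dirty pages in one
 * at a time and treats a false return as a fatal migration error, along
 * the lines of:
 *
 *   if (!multifd_queue_page(block, offset)) {
 *       return -1;
 *   }
 */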

/* Multifd send side hit an error; remember it and prepare to quit */
static void multifd_send_set_error(Error *err)
{
    /*
     * We don't want to terminate the threads twice. Depending on where
     * we get the error, or if there are two independent errors in two
     * threads at the same time, we can end up calling this function
     * twice.
     */
    if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
            s->state == MIGRATION_STATUS_DEVICE ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }
}

static void multifd_send_terminate_threads(void)
{
    int i;

    trace_multifd_send_terminate_threads();

    /*
     * Tell everyone we're quitting. No xchg() needed here; we simply
     * always set it.
     */
    qatomic_set(&multifd_send_state->exiting, 1);

    /*
     * Firstly, kick all threads out; no matter whether they are just idle,
     * or blocked in an IO system call.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_sem_post(&p->sem);
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }

    /*
     * Finally recycle all the threads.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (p->tls_thread_created) {
            qemu_thread_join(&p->tls_thread);
        }

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
}

static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
{
    if (p->c) {
        migration_ioc_unregister_yank(p->c);
        /*
         * The object_unref() cannot guarantee the fd will always be
         * released because finalize() of the iochannel is only
         * triggered on the last reference and it's not guaranteed
         * that we always hold the last refcount when reaching here.
         *
         * Closing the fd explicitly has the benefit that if there is any
         * registered I/O handler callbacks on such fd, that will get a
         * POLLNVAL event and will further trigger the cleanup to finally
         * release the IOC.
         *
         * FIXME: It should logically be guaranteed that all multifd
         * channels have no I/O handler callback registered when reaching
         * here, because migration thread will wait for all multifd channel
         * establishments to complete during setup. Since
         * migrate_fd_cleanup() will be scheduled in main thread too, all
         * previous callbacks should be guaranteed to have completed by the
         * time we reach here. See multifd_send_state.channels_created and
         * its usage. In the future, we could replace this with an assert
         * making sure we're the last reference, or simply drop it if above
         * is more clear to be justified.
         */
        qio_channel_close(p->c, &error_abort);
        object_unref(OBJECT(p->c));
        p->c = NULL;
    }
    qemu_sem_destroy(&p->sem);
    qemu_sem_destroy(&p->sem_sync);
    g_free(p->name);
    p->name = NULL;
    multifd_pages_clear(p->pages);
    p->pages = NULL;
    p->packet_len = 0;
    g_free(p->packet);
    p->packet = NULL;
    multifd_send_state->ops->send_cleanup(p, errp);

    return *errp == NULL;
}

static void multifd_send_cleanup_state(void)
{
    file_cleanup_outgoing_migration();
    socket_cleanup_outgoing_migration();
    qemu_sem_destroy(&multifd_send_state->channels_created);
    qemu_sem_destroy(&multifd_send_state->channels_ready);
    g_free(multifd_send_state->params);
    multifd_send_state->params = NULL;
    multifd_pages_clear(multifd_send_state->pages);
    multifd_send_state->pages = NULL;
    g_free(multifd_send_state);
    multifd_send_state = NULL;
}

void multifd_send_shutdown(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }

    multifd_send_terminate_threads();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        if (!multifd_send_cleanup_channel(p, &local_err)) {
            migrate_set_error(migrate_get_current(), local_err);
            error_free(local_err);
        }
    }

    multifd_send_cleanup_state();
}

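/*
 * Flush outstanding zero-copy writes on @c. A return value of 1 from
 * qio_channel_flush() indicates that zero-copy was not used for some of
 * the queued writes (they fell back to copying); record that in the
 * dirty_sync_missed_zero_copy counter.
 */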
static int multifd_zero_copy_flush(QIOChannel *c)
{
    int ret;
    Error *err = NULL;

    ret = qio_channel_flush(c, &err);
    if (ret < 0) {
        error_report_err(err);
        return -1;
    }
    if (ret == 1) {
        stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
    }

    return ret;
}

int multifd_send_sync_main(void)
{
    int i;
    bool flush_zero_copy;

    if (!migrate_multifd()) {
        return 0;
    }
    if (multifd_send_state->pages->num) {
        if (!multifd_send_pages()) {
            error_report("%s: multifd_send_pages failed", __func__);
            return -1;
        }
    }

    flush_zero_copy = migrate_zero_copy_send();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        trace_multifd_send_sync_main_signal(p->id);

        /*
         * We should be the only user so far, so it's not possible for
         * anyone else to set it concurrently.
         */
        assert(qatomic_read(&p->pending_sync) == false);
        qatomic_set(&p->pending_sync, true);
        qemu_sem_post(&p->sem);
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        qemu_sem_wait(&multifd_send_state->channels_ready);
        trace_multifd_send_sync_main_wait(p->id);
        qemu_sem_wait(&p->sem_sync);

        if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
            return -1;
        }
    }
    trace_multifd_send_sync_main(multifd_send_state->packet_num);

    return 0;
}

static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    MigrationThread *thread = NULL;
    Error *local_err = NULL;
    int ret = 0;
    bool use_packets = multifd_use_packets();

    thread = migration_threads_add(p->name, qemu_get_thread_id());

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (use_packets) {
        if (multifd_send_initial_packet(p, &local_err) < 0) {
            ret = -1;
            goto out;
        }
    }

    while (true) {
        qemu_sem_post(&multifd_send_state->channels_ready);
        qemu_sem_wait(&p->sem);

        if (multifd_send_should_exit()) {
            break;
        }

        /*
         * Read pending_job flag before p->pages. Pairs with the
         * qatomic_store_release() in multifd_send_pages().
         */
        if (qatomic_load_acquire(&p->pending_job)) {
            MultiFDPages_t *pages = p->pages;

            p->iovs_num = 0;
            assert(pages->num);

            ret = multifd_send_state->ops->send_prepare(p, &local_err);
            if (ret != 0) {
                break;
            }

            if (migrate_mapped_ram()) {
                ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
                                              p->pages->block, &local_err);
            } else {
                ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
                                                  NULL, 0, p->write_flags,
                                                  &local_err);
            }

            if (ret != 0) {
                break;
            }

            stat64_add(&mig_stats.multifd_bytes,
                       p->next_packet_size + p->packet_len);
            stat64_add(&mig_stats.normal_pages, pages->normal_num);
            stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num);

            multifd_pages_reset(p->pages);
            p->next_packet_size = 0;

            /*
             * Making sure p->pages is published before saying "we're
             * free". Pairs with the smp_mb_acquire() in
             * multifd_send_pages().
             */
            qatomic_store_release(&p->pending_job, false);
        } else {
            /*
             * If not a normal job, must be a sync request. Note that
             * pending_sync is a standalone flag (unlike pending_job), so
             * it doesn't require explicit memory barriers.
             */
            assert(qatomic_read(&p->pending_sync));

            if (use_packets) {
                p->flags = MULTIFD_FLAG_SYNC;
                multifd_send_fill_packet(p);
                ret = qio_channel_write_all(p->c, (void *)p->packet,
                                            p->packet_len, &local_err);
                if (ret != 0) {
                    break;
                }
                /* p->next_packet_size will always be zero for a SYNC packet */
                stat64_add(&mig_stats.multifd_bytes, p->packet_len);
                p->flags = 0;
            }

            qatomic_set(&p->pending_sync, false);
            qemu_sem_post(&p->sem_sync);
        }
    }

out:
    if (ret) {
        assert(local_err);
        trace_multifd_send_error(p->id);
        multifd_send_set_error(local_err);
        multifd_send_kick_main(p);
        error_free(local_err);
    }

    rcu_unregister_thread();
    migration_threads_remove(thread);
    trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages,
                                  p->total_zero_pages);

    return NULL;
}

static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);

typedef struct {
    MultiFDSendParams *p;
    QIOChannelTLS *tioc;
} MultiFDTLSThreadArgs;

static void *multifd_tls_handshake_thread(void *opaque)
{
    MultiFDTLSThreadArgs *args = opaque;

    qio_channel_tls_handshake(args->tioc,
                              multifd_new_send_channel_async,
                              args->p,
                              NULL,
                              NULL);
    g_free(args);

    return NULL;
}

static bool multifd_tls_channel_connect(MultiFDSendParams *p,
                                        QIOChannel *ioc,
                                        Error **errp)
{
    MigrationState *s = migrate_get_current();
    const char *hostname = s->hostname;
    MultiFDTLSThreadArgs *args;
    QIOChannelTLS *tioc;

    tioc = migration_tls_client_create(ioc, hostname, errp);
    if (!tioc) {
        return false;
    }

    /*
     * Ownership of the socket channel now transfers to the newly
     * created TLS channel, which has already taken a reference.
     */
    object_unref(OBJECT(ioc));
    trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
    qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");

    args = g_new0(MultiFDTLSThreadArgs, 1);
    args->tioc = tioc;
    args->p = p;

    p->tls_thread_created = true;
    qemu_thread_create(&p->tls_thread, "mig/src/tls",
                       multifd_tls_handshake_thread, args,
                       QEMU_THREAD_JOINABLE);
    return true;
}

void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
{
    qio_channel_set_delay(ioc, false);

    migration_ioc_register_yank(ioc);
    /* Setup p->c only if the channel is completely setup */
    p->c = ioc;

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
                       QEMU_THREAD_JOINABLE);
}

/*
 * When TLS is enabled this function is called once to establish the
 * TLS connection and a second time after the TLS handshake to create
 * the multifd channel. Without TLS it goes straight into the channel
 * creation.
 */
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
    Error *local_err = NULL;
    bool ret;

    trace_multifd_new_send_channel_async(p->id);

    if (qio_task_propagate_error(task, &local_err)) {
        ret = false;
        goto out;
    }

    trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
                                       migrate_get_current()->hostname);

    if (migrate_channel_requires_tls_upgrade(ioc)) {
        ret = multifd_tls_channel_connect(p, ioc, &local_err);
        if (ret) {
            return;
        }
    } else {
        multifd_channel_connect(p, ioc);
        ret = true;
    }

out:
    /*
     * Here we're not interested in whether creation succeeded, only that
     * it happened at all.
     */
    multifd_send_channel_created();

    if (ret) {
        return;
    }

    trace_multifd_new_send_channel_async_error(p->id, local_err);
    multifd_send_set_error(local_err);
    /*
     * For error cases (TLS or non-TLS), the IO channel is always freed
     * here rather than during multifd cleanup: since p->c is not set, the
     * multifd cleanup code doesn't even know of its existence.
     */
    object_unref(OBJECT(ioc));
    error_free(local_err);
}

static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
{
    if (!multifd_use_packets()) {
        return file_send_channel_create(opaque, errp);
    }

    socket_send_channel_create(multifd_new_send_channel_async, opaque);
    return true;
}

bool multifd_send_setup(void)
{
    MigrationState *s = migrate_get_current();
    int thread_count, ret = 0;
    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    if (!migrate_multifd()) {
        return true;
    }

    thread_count = migrate_multifd_channels();
    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
    multifd_send_state->pages = multifd_pages_init(page_count);
    qemu_sem_init(&multifd_send_state->channels_created, 0);
    qemu_sem_init(&multifd_send_state->channels_ready, 0);
    qatomic_set(&multifd_send_state->exiting, 0);
    multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        qemu_sem_init(&p->sem, 0);
        qemu_sem_init(&p->sem_sync, 0);
        p->id = i;
        p->pages = multifd_pages_init(page_count);

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                          + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
            p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
            p->packet->version = cpu_to_be32(MULTIFD_VERSION);
        }
        p->name = g_strdup_printf("mig/src/send_%d", i);
        p->page_size = qemu_target_page_size();
        p->page_count = page_count;
        p->write_flags = 0;

        if (!multifd_new_send_channel_create(p, &local_err)) {
            migrate_set_error(s, local_err);
            ret = -1;
        }
    }

    /*
     * Wait until channel creation has started for all channels. The
     * creation can still fail, but no more channels will be created
     * past this point.
     */
    for (i = 0; i < thread_count; i++) {
        qemu_sem_wait(&multifd_send_state->channels_created);
    }

    if (ret) {
        goto err;
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        ret = multifd_send_state->ops->send_setup(p, &local_err);
        if (ret) {
            migrate_set_error(s, local_err);
            goto err;
        }
    }

    return true;

err:
    migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
                      MIGRATION_STATUS_FAILED);
    return false;
}

bool multifd_recv(void)
{
    int i;
    static int next_recv_channel;
    MultiFDRecvParams *p = NULL;
    MultiFDRecvData *data = multifd_recv_state->data;

    /*
     * next_recv_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_recv_channel %= migrate_multifd_channels();
    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_recv_should_exit()) {
            return false;
        }

        p = &multifd_recv_state->params[i];

        if (qatomic_read(&p->pending_job) == false) {
            next_recv_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Order pending_job read before manipulating p->data below. Pairs
     * with qatomic_store_release() at multifd_recv_thread().
     */
    smp_mb_acquire();

    assert(!p->data->size);
    multifd_recv_state->data = p->data;
    p->data = data;

    /*
     * Order p->data update before setting pending_job. Pairs with
     * qatomic_load_acquire() at multifd_recv_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}

MultiFDRecvData *multifd_get_recv_data(void)
{
    return multifd_recv_state->data;
}

static void multifd_recv_terminate_threads(Error *err)
{
    int i;

    trace_multifd_recv_terminate_threads(err != NULL);

    if (qatomic_xchg(&multifd_recv_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        /*
         * The migration thread and channels interact differently
         * depending on the presence of packets.
         */
        if (multifd_use_packets()) {
            /*
             * The channel receives as long as there are packets. When
             * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
             * channel waits for the migration thread to sync. If the
             * sync never happens, do it here.
             */
            qemu_sem_post(&p->sem_sync);
        } else {
            /*
             * The channel waits for the migration thread to give it
             * work. When the migration thread runs out of work, it
             * releases the channel and waits for any pending work to
             * finish. If we reach here (e.g. due to error) before the
             * work runs out, release the channel.
             */
            qemu_sem_post(&p->sem);
        }

        /*
         * We could arrive here for two reasons:
         *  - normal quit, i.e. everything went fine, just finished
         *  - error quit: We close the channels so the channel threads
         *    finish the qio_channel_read_all_eof()
         */
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }
}

void multifd_recv_shutdown(void)
{
    if (migrate_multifd()) {
        multifd_recv_terminate_threads(NULL);
    }
}

static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
{
    migration_ioc_unregister_yank(p->c);
    object_unref(OBJECT(p->c));
    p->c = NULL;
    qemu_mutex_destroy(&p->mutex);
    qemu_sem_destroy(&p->sem_sync);
    qemu_sem_destroy(&p->sem);
    g_free(p->data);
    p->data = NULL;
    g_free(p->name);
    p->name = NULL;
    p->packet_len = 0;
    g_free(p->packet);
    p->packet = NULL;
    g_free(p->normal);
    p->normal = NULL;
    g_free(p->zero);
    p->zero = NULL;
    multifd_recv_state->ops->recv_cleanup(p);
}

static void multifd_recv_cleanup_state(void)
{
    qemu_sem_destroy(&multifd_recv_state->sem_sync);
    g_free(multifd_recv_state->params);
    multifd_recv_state->params = NULL;
    g_free(multifd_recv_state->data);
    multifd_recv_state->data = NULL;
    g_free(multifd_recv_state);
    multifd_recv_state = NULL;
}

void multifd_recv_cleanup(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }
    multifd_recv_terminate_threads(NULL);
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
    }
    multifd_recv_cleanup_state();
}

void multifd_recv_sync_main(void)
{
    int thread_count = migrate_multifd_channels();
    bool file_based = !multifd_use_packets();
    int i;

    if (!migrate_multifd()) {
        return;
    }

    /*
     * File-based channels don't use packets and therefore need to
     * wait for more work. Release them to start the sync.
     */
    if (file_based) {
        for (i = 0; i < thread_count; i++) {
            MultiFDRecvParams *p = &multifd_recv_state->params[i];

            trace_multifd_recv_sync_main_signal(p->id);
            qemu_sem_post(&p->sem);
        }
    }

    /*
     * Initiate the synchronization by waiting for all channels.
     *
     * For socket-based migration this means each channel has received
     * the SYNC packet on the stream.
     *
     * For file-based migration this means each channel is done with
     * the work (pending_job=false).
     */
    for (i = 0; i < thread_count; i++) {
        trace_multifd_recv_sync_main_wait(i);
        qemu_sem_wait(&multifd_recv_state->sem_sync);
    }

    if (file_based) {
        /*
         * For file-based migration, loading is done in one iteration.
         * We're done.
         */
        return;
    }

    /*
     * Sync done. Release the channels for the next iteration.
     */
    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        WITH_QEMU_LOCK_GUARD(&p->mutex) {
            if (multifd_recv_state->packet_num < p->packet_num) {
                multifd_recv_state->packet_num = p->packet_num;
            }
        }
        trace_multifd_recv_sync_main_signal(p->id);
        qemu_sem_post(&p->sem_sync);
    }
    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
}

static void *multifd_recv_thread(void *opaque)
{
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int ret;

    trace_multifd_recv_thread_start(p->id);
    rcu_register_thread();

    while (true) {
        uint32_t flags = 0;
        bool has_data = false;
        p->normal_num = 0;

        if (use_packets) {
            if (multifd_recv_should_exit()) {
                break;
            }

            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
                                           p->packet_len, &local_err);
            if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
                break;
            }

            qemu_mutex_lock(&p->mutex);
            ret = multifd_recv_unfill_packet(p, &local_err);
            if (ret) {
                qemu_mutex_unlock(&p->mutex);
                break;
            }

            flags = p->flags;
            /* recv methods don't know how to handle the SYNC flag */
            p->flags &= ~MULTIFD_FLAG_SYNC;
            has_data = p->normal_num || p->zero_num;
            qemu_mutex_unlock(&p->mutex);
        } else {
            /*
             * No packets, so we need to wait for the vmstate code to
             * give us work.
             */
            qemu_sem_wait(&p->sem);

            if (multifd_recv_should_exit()) {
                break;
            }

            /* pairs with qatomic_store_release() at multifd_recv() */
            if (!qatomic_load_acquire(&p->pending_job)) {
                /*
                 * Migration thread did not send work, this is
                 * equivalent to pending_sync on the sending
                 * side. Post sem_sync to notify we reached this
                 * point.
                 */
                qemu_sem_post(&multifd_recv_state->sem_sync);
                continue;
            }

            has_data = !!p->data->size;
        }

        if (has_data) {
            ret = multifd_recv_state->ops->recv(p, &local_err);
            if (ret != 0) {
                break;
            }
        }

        if (use_packets) {
            if (flags & MULTIFD_FLAG_SYNC) {
                qemu_sem_post(&multifd_recv_state->sem_sync);
                qemu_sem_wait(&p->sem_sync);
            }
        } else {
            p->total_normal_pages += p->data->size / qemu_target_page_size();
            p->data->size = 0;
            /*
             * Order data->size update before clearing
             * pending_job. Pairs with smp_mb_acquire() at
             * multifd_recv().
             */
            qatomic_store_release(&p->pending_job, false);
        }
    }

    if (local_err) {
        multifd_recv_terminate_threads(local_err);
        error_free(local_err);
    }

    rcu_unregister_thread();
    trace_multifd_recv_thread_end(p->id, p->packets_recved,
                                  p->total_normal_pages,
                                  p->total_zero_pages);

    return NULL;
}

int multifd_recv_setup(Error **errp)
{
    int thread_count;
    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    /*
     * Return successfully if multiFD recv state is already initialised
     * or multiFD is not enabled.
     */
    if (multifd_recv_state || !migrate_multifd()) {
        return 0;
    }

    thread_count = migrate_multifd_channels();
    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);

    multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
    multifd_recv_state->data->size = 0;

    qatomic_set(&multifd_recv_state->count, 0);
    qatomic_set(&multifd_recv_state->exiting, 0);
    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
    multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        qemu_mutex_init(&p->mutex);
        qemu_sem_init(&p->sem_sync, 0);
        qemu_sem_init(&p->sem, 0);
        p->pending_job = false;
        p->id = i;

        p->data = g_new0(MultiFDRecvData, 1);
        p->data->size = 0;

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                          + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
        }
        p->name = g_strdup_printf("mig/dst/recv_%d", i);
        p->normal = g_new0(ram_addr_t, page_count);
        p->zero = g_new0(ram_addr_t, page_count);
        p->page_count = page_count;
        p->page_size = qemu_target_page_size();
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];
        int ret;

        ret = multifd_recv_state->ops->recv_setup(p, errp);
        if (ret) {
            return ret;
        }
    }
    return 0;
}

bool multifd_recv_all_channels_created(void)
{
    int thread_count = migrate_multifd_channels();

    if (!migrate_multifd()) {
        return true;
    }

    if (!multifd_recv_state) {
        /* Called before any connections created */
        return false;
    }

    return thread_count == qatomic_read(&multifd_recv_state->count);
}

/*
 * Try to receive all multifd channels to get ready for the migration.
 * Sets @errp when failing to receive the current channel.
 */
void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
{
    MultiFDRecvParams *p;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int id;

    if (use_packets) {
        id = multifd_recv_initial_packet(ioc, &local_err);
        if (id < 0) {
            multifd_recv_terminate_threads(local_err);
            error_propagate_prepend(errp, local_err,
                                    "failed to receive packet"
                                    " via multifd channel %d: ",
                                    qatomic_read(&multifd_recv_state->count));
            return;
        }
        trace_multifd_recv_new_channel(id);
    } else {
        id = qatomic_read(&multifd_recv_state->count);
    }

    p = &multifd_recv_state->params[id];
    if (p->c != NULL) {
        error_setg(&local_err, "multifd: received id '%d' already setup",
                   id);
        multifd_recv_terminate_threads(local_err);
        error_propagate(errp, local_err);
        return;
    }
    p->c = ioc;
    object_ref(OBJECT(ioc));

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                       QEMU_THREAD_JOINABLE);
    qatomic_inc(&multifd_recv_state->count);
}

bool multifd_send_prepare_common(MultiFDSendParams *p)
{
    multifd_send_zero_page_detect(p);

    if (!p->pages->normal_num) {
        p->next_packet_size = 0;
        return false;
    }

    multifd_send_prepare_header(p);

    return true;
}