xref: /openbmc/qemu/migration/ram.c (revision c124c152)
1 /*
2  * QEMU System Emulator
3  *
4  * Copyright (c) 2003-2008 Fabrice Bellard
5  * Copyright (c) 2011-2015 Red Hat Inc
6  *
7  * Authors:
8  *  Juan Quintela <quintela@redhat.com>
9  *
10  * Permission is hereby granted, free of charge, to any person obtaining a copy
11  * of this software and associated documentation files (the "Software"), to deal
12  * in the Software without restriction, including without limitation the rights
13  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14  * copies of the Software, and to permit persons to whom the Software is
15  * furnished to do so, subject to the following conditions:
16  *
17  * The above copyright notice and this permission notice shall be included in
18  * all copies or substantial portions of the Software.
19  *
20  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
23  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26  * THE SOFTWARE.
27  */
28 
29 #include "qemu/osdep.h"
30 #include "cpu.h"
31 #include <zlib.h>
32 #include "qemu/cutils.h"
33 #include "qemu/bitops.h"
34 #include "qemu/bitmap.h"
35 #include "qemu/main-loop.h"
36 #include "qemu/pmem.h"
37 #include "xbzrle.h"
38 #include "ram.h"
39 #include "migration.h"
40 #include "socket.h"
41 #include "migration/register.h"
42 #include "migration/misc.h"
43 #include "qemu-file.h"
44 #include "postcopy-ram.h"
45 #include "page_cache.h"
46 #include "qemu/error-report.h"
47 #include "qapi/error.h"
48 #include "qapi/qapi-events-migration.h"
49 #include "qapi/qmp/qerror.h"
50 #include "trace.h"
51 #include "exec/ram_addr.h"
52 #include "exec/target_page.h"
53 #include "qemu/rcu_queue.h"
54 #include "migration/colo.h"
55 #include "block.h"
56 #include "sysemu/sysemu.h"
57 #include "qemu/uuid.h"
58 #include "savevm.h"
59 #include "qemu/iov.h"
60 
61 /***********************************************************/
62 /* ram save/restore */
63 
64 /* RAM_SAVE_FLAG_ZERO used to be named RAM_SAVE_FLAG_COMPRESS, it
65  * worked for pages that were filled with the same char.  We switched
66  * it to only search for the zero value.  And to avoid confusion with
67  * RAM_SAVE_FLAG_COMPRESS_PAGE it was renamed.
68  */
69 
70 #define RAM_SAVE_FLAG_FULL     0x01 /* Obsolete, not used anymore */
71 #define RAM_SAVE_FLAG_ZERO     0x02
72 #define RAM_SAVE_FLAG_MEM_SIZE 0x04
73 #define RAM_SAVE_FLAG_PAGE     0x08
74 #define RAM_SAVE_FLAG_EOS      0x10
75 #define RAM_SAVE_FLAG_CONTINUE 0x20
76 #define RAM_SAVE_FLAG_XBZRLE   0x40
77 /* 0x80 is reserved in migration.h; start with 0x100 for the next flag */
78 #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
79 
80 static inline bool is_zero_range(uint8_t *p, uint64_t size)
81 {
82     return buffer_is_zero(p, size);
83 }
84 
85 XBZRLECacheStats xbzrle_counters;
86 
87 /* This struct contains the XBZRLE cache and a static page
88    used by the compression */
89 static struct {
90     /* buffer used for XBZRLE encoding */
91     uint8_t *encoded_buf;
92     /* buffer for storing page content */
93     uint8_t *current_buf;
94     /* Cache for XBZRLE, Protected by lock. */
95     PageCache *cache;
96     QemuMutex lock;
97     /* it will store a page full of zeros */
98     uint8_t *zero_target_page;
99     /* buffer used for XBZRLE decoding */
100     uint8_t *decoded_buf;
101 } XBZRLE;
102 
103 static void XBZRLE_cache_lock(void)
104 {
105     if (migrate_use_xbzrle())
106         qemu_mutex_lock(&XBZRLE.lock);
107 }
108 
109 static void XBZRLE_cache_unlock(void)
110 {
111     if (migrate_use_xbzrle())
112         qemu_mutex_unlock(&XBZRLE.lock);
113 }
114 
115 /**
116  * xbzrle_cache_resize: resize the xbzrle cache
117  *
118  * This function is called from qmp_migrate_set_cache_size in the main
119  * thread, possibly while a migration is in progress.  A running
120  * migration may be using the cache and might finish during this call,
121  * hence changes to the cache are protected by XBZRLE.lock.
122  *
123  * Returns 0 for success or -1 for error
124  *
125  * @new_size: new cache size
126  * @errp: set to the failure reason if the check fails
127  */
128 int xbzrle_cache_resize(int64_t new_size, Error **errp)
129 {
130     PageCache *new_cache;
131     int64_t ret = 0;
132 
133     /* Check for truncation */
134     if (new_size != (size_t)new_size) {
135         error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "cache size",
136                    "exceeding address space");
137         return -1;
138     }
139 
140     if (new_size == migrate_xbzrle_cache_size()) {
141         /* nothing to do */
142         return 0;
143     }
144 
145     XBZRLE_cache_lock();
146 
147     if (XBZRLE.cache != NULL) {
148         new_cache = cache_init(new_size, TARGET_PAGE_SIZE, errp);
149         if (!new_cache) {
150             ret = -1;
151             goto out;
152         }
153 
154         cache_fini(XBZRLE.cache);
155         XBZRLE.cache = new_cache;
156     }
157 out:
158     XBZRLE_cache_unlock();
159     return ret;
160 }
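/*
 * A minimal usage sketch for xbzrle_cache_resize(), assuming a caller in
 * the main thread (as qmp_migrate_set_cache_size is).  The helper name
 * below is purely illustrative.
 */
static void example_resize_xbzrle_cache(int64_t new_size)
{
    Error *err = NULL;

    if (xbzrle_cache_resize(new_size, &err) < 0) {
        /* the old cache is kept on failure; just report the reason */
        error_report_err(err);
    }
}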
161 
162 static bool ramblock_is_ignored(RAMBlock *block)
163 {
164     return !qemu_ram_is_migratable(block) ||
165            (migrate_ignore_shared() && qemu_ram_is_shared(block));
166 }
167 
168 /* Should be holding either ram_list.mutex, or the RCU lock. */
169 #define RAMBLOCK_FOREACH_NOT_IGNORED(block)            \
170     INTERNAL_RAMBLOCK_FOREACH(block)                   \
171         if (ramblock_is_ignored(block)) {} else
172 
173 #define RAMBLOCK_FOREACH_MIGRATABLE(block)             \
174     INTERNAL_RAMBLOCK_FOREACH(block)                   \
175         if (!qemu_ram_is_migratable(block)) {} else
176 
177 #undef RAMBLOCK_FOREACH
178 
179 int foreach_not_ignored_block(RAMBlockIterFunc func, void *opaque)
180 {
181     RAMBlock *block;
182     int ret = 0;
183 
184     rcu_read_lock();
185     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
186         ret = func(block, opaque);
187         if (ret) {
188             break;
189         }
190     }
191     rcu_read_unlock();
192     return ret;
193 }
194 
195 static void ramblock_recv_map_init(void)
196 {
197     RAMBlock *rb;
198 
199     RAMBLOCK_FOREACH_NOT_IGNORED(rb) {
200         assert(!rb->receivedmap);
201         rb->receivedmap = bitmap_new(rb->max_length >> qemu_target_page_bits());
202     }
203 }
204 
205 int ramblock_recv_bitmap_test(RAMBlock *rb, void *host_addr)
206 {
207     return test_bit(ramblock_recv_bitmap_offset(host_addr, rb),
208                     rb->receivedmap);
209 }
210 
211 bool ramblock_recv_bitmap_test_byte_offset(RAMBlock *rb, uint64_t byte_offset)
212 {
213     return test_bit(byte_offset >> TARGET_PAGE_BITS, rb->receivedmap);
214 }
215 
216 void ramblock_recv_bitmap_set(RAMBlock *rb, void *host_addr)
217 {
218     set_bit_atomic(ramblock_recv_bitmap_offset(host_addr, rb), rb->receivedmap);
219 }
220 
221 void ramblock_recv_bitmap_set_range(RAMBlock *rb, void *host_addr,
222                                     size_t nr)
223 {
224     bitmap_set_atomic(rb->receivedmap,
225                       ramblock_recv_bitmap_offset(host_addr, rb),
226                       nr);
227 }
228 
229 #define  RAMBLOCK_RECV_BITMAP_ENDING  (0x0123456789abcdefULL)
230 
231 /*
232  * Format: bitmap_size (8 bytes) + whole_bitmap (N bytes).
233  *
234  * Returns the number of bytes sent (>0) on success, or <0 on error.
235  */
236 int64_t ramblock_recv_bitmap_send(QEMUFile *file,
237                                   const char *block_name)
238 {
239     RAMBlock *block = qemu_ram_block_by_name(block_name);
240     unsigned long *le_bitmap, nbits;
241     uint64_t size;
242 
243     if (!block) {
244         error_report("%s: invalid block name: %s", __func__, block_name);
245         return -1;
246     }
247 
248     nbits = block->used_length >> TARGET_PAGE_BITS;
249 
250     /*
251      * Make sure the tmp bitmap buffer is big enough, e.g., on 32bit
252      * machines we may need 4 more bytes for padding (see the comment
253      * below).  So extend it a bit beforehand.
254      */
255     le_bitmap = bitmap_new(nbits + BITS_PER_LONG);
256 
257     /*
258      * Always use little endian when sending the bitmap.  This is
259      * required so that it works even when source and destination VMs
260      * do not use the same endianness.  (Note: big endian won't work.)
261      */
262     bitmap_to_le(le_bitmap, block->receivedmap, nbits);
263 
264     /* Size of the bitmap, in bytes */
265     size = DIV_ROUND_UP(nbits, 8);
266 
267     /*
268      * size is always aligned to 8 bytes on 64bit machines, but that
269      * may not be true on 32bit machines.  We need this padding to
270      * make sure the migration can survive even between 32bit and
271      * 64bit machines.
272      */
273     size = ROUND_UP(size, 8);
274 
275     qemu_put_be64(file, size);
276     qemu_put_buffer(file, (const uint8_t *)le_bitmap, size);
277     /*
278      * Mark the end, in case the middle part is screwed up due to
279      * some "mysterious" reason.
280      */
281     qemu_put_be64(file, RAMBLOCK_RECV_BITMAP_ENDING);
282     qemu_fflush(file);
283 
284     g_free(le_bitmap);
285 
286     if (qemu_file_get_error(file)) {
287         return qemu_file_get_error(file);
288     }
289 
290     return size + sizeof(size);
291 }
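/*
 * For illustration only: a sketch of how the destination side could
 * consume the stream produced above -- read the 8-byte size, the padded
 * little-endian bitmap, then the ending marker.  The function name is an
 * assumption; the real reload logic lives elsewhere in the migration code.
 */
static int example_recv_bitmap_load(QEMUFile *file, RAMBlock *block)
{
    unsigned long *le_bitmap;
    uint64_t size, end_mark;
    unsigned long nbits = block->used_length >> TARGET_PAGE_BITS;

    size = qemu_get_be64(file);
    /* a real implementation would sanity check size against nbits here */
    le_bitmap = g_malloc0(size);
    qemu_get_buffer(file, (uint8_t *)le_bitmap, size);
    end_mark = qemu_get_be64(file);

    if (qemu_file_get_error(file) ||
        end_mark != RAMBLOCK_RECV_BITMAP_ENDING) {
        g_free(le_bitmap);
        return -1;
    }

    /* convert back from little endian into the block's dirty bitmap */
    bitmap_from_le(block->bmap, le_bitmap, nbits);
    g_free(le_bitmap);
    return 0;
}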
292 
293 /*
294  * An outstanding page request, on the source, having been received
295  * and queued
296  */
297 struct RAMSrcPageRequest {
298     RAMBlock *rb;
299     hwaddr    offset;
300     hwaddr    len;
301 
302     QSIMPLEQ_ENTRY(RAMSrcPageRequest) next_req;
303 };
304 
305 /* State of RAM for migration */
306 struct RAMState {
307     /* QEMUFile used for this migration */
308     QEMUFile *f;
309     /* Last block that we have visited searching for dirty pages */
310     RAMBlock *last_seen_block;
311     /* Last block from where we have sent data */
312     RAMBlock *last_sent_block;
313     /* Last dirty target page we have sent */
314     ram_addr_t last_page;
315     /* last ram version we have seen */
316     uint32_t last_version;
317     /* We are in the first round */
318     bool ram_bulk_stage;
319     /* The free page optimization is enabled */
320     bool fpo_enabled;
321     /* How many times we have dirtied too many pages */
322     int dirty_rate_high_cnt;
323     /* these variables are used for bitmap sync */
324     /* last time we did a full bitmap_sync */
325     int64_t time_last_bitmap_sync;
326     /* bytes transferred at the start of the period */
327     uint64_t bytes_xfer_prev;
328     /* number of dirty pages since the start of the period */
329     uint64_t num_dirty_pages_period;
330     /* xbzrle misses since the beginning of the period */
331     uint64_t xbzrle_cache_miss_prev;
332 
333     /* compression statistics since the beginning of the period */
334     /* number of times there was no free thread to compress data */
335     uint64_t compress_thread_busy_prev;
336     /* number of bytes after compression */
337     uint64_t compressed_size_prev;
338     /* number of compressed pages */
339     uint64_t compress_pages_prev;
340 
341     /* total handled target pages at the beginning of period */
342     uint64_t target_page_count_prev;
343     /* total handled target pages since start */
344     uint64_t target_page_count;
345     /* number of dirty bits in the bitmap */
346     uint64_t migration_dirty_pages;
347     /* Protects modification of the bitmap and migration dirty pages */
348     QemuMutex bitmap_mutex;
349     /* The RAMBlock used in the last src_page_requests */
350     RAMBlock *last_req_rb;
351     /* Queue of outstanding page requests from the destination */
352     QemuMutex src_page_req_mutex;
353     QSIMPLEQ_HEAD(, RAMSrcPageRequest) src_page_requests;
354 };
355 typedef struct RAMState RAMState;
356 
357 static RAMState *ram_state;
358 
359 static NotifierWithReturnList precopy_notifier_list;
360 
361 void precopy_infrastructure_init(void)
362 {
363     notifier_with_return_list_init(&precopy_notifier_list);
364 }
365 
366 void precopy_add_notifier(NotifierWithReturn *n)
367 {
368     notifier_with_return_list_add(&precopy_notifier_list, n);
369 }
370 
371 void precopy_remove_notifier(NotifierWithReturn *n)
372 {
373     notifier_with_return_remove(n);
374 }
375 
376 int precopy_notify(PrecopyNotifyReason reason, Error **errp)
377 {
378     PrecopyNotifyData pnd;
379     pnd.reason = reason;
380     pnd.errp = errp;
381 
382     return notifier_with_return_list_notify(&precopy_notifier_list, &pnd);
383 }
384 
385 void precopy_enable_free_page_optimization(void)
386 {
387     if (!ram_state) {
388         return;
389     }
390 
391     ram_state->fpo_enabled = true;
392 }
393 
394 uint64_t ram_bytes_remaining(void)
395 {
396     return ram_state ? (ram_state->migration_dirty_pages * TARGET_PAGE_SIZE) :
397                        0;
398 }
399 
400 MigrationStats ram_counters;
401 
402 /* used by the search for pages to send */
403 struct PageSearchStatus {
404     /* Current block being searched */
405     RAMBlock    *block;
406     /* Current page to search from */
407     unsigned long page;
408     /* Set once we wrap around */
409     bool         complete_round;
410 };
411 typedef struct PageSearchStatus PageSearchStatus;
412 
413 CompressionStats compression_counters;
414 
415 struct CompressParam {
416     bool done;
417     bool quit;
418     bool zero_page;
419     QEMUFile *file;
420     QemuMutex mutex;
421     QemuCond cond;
422     RAMBlock *block;
423     ram_addr_t offset;
424 
425     /* internally used fields */
426     z_stream stream;
427     uint8_t *originbuf;
428 };
429 typedef struct CompressParam CompressParam;
430 
431 struct DecompressParam {
432     bool done;
433     bool quit;
434     QemuMutex mutex;
435     QemuCond cond;
436     void *des;
437     uint8_t *compbuf;
438     int len;
439     z_stream stream;
440 };
441 typedef struct DecompressParam DecompressParam;
442 
443 static CompressParam *comp_param;
444 static QemuThread *compress_threads;
445 /* comp_done_cond is used to wake up the migration thread when
446  * one of the compression threads has finished the compression.
447  * comp_done_lock is used together with comp_done_cond.
448  */
449 static QemuMutex comp_done_lock;
450 static QemuCond comp_done_cond;
451 /* The empty QEMUFileOps will be used by file in CompressParam */
452 static const QEMUFileOps empty_ops = { };
453 
454 static QEMUFile *decomp_file;
455 static DecompressParam *decomp_param;
456 static QemuThread *decompress_threads;
457 static QemuMutex decomp_done_lock;
458 static QemuCond decomp_done_cond;
459 
460 static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
461                                  ram_addr_t offset, uint8_t *source_buf);
462 
463 static void *do_data_compress(void *opaque)
464 {
465     CompressParam *param = opaque;
466     RAMBlock *block;
467     ram_addr_t offset;
468     bool zero_page;
469 
470     qemu_mutex_lock(&param->mutex);
471     while (!param->quit) {
472         if (param->block) {
473             block = param->block;
474             offset = param->offset;
475             param->block = NULL;
476             qemu_mutex_unlock(&param->mutex);
477 
478             zero_page = do_compress_ram_page(param->file, &param->stream,
479                                              block, offset, param->originbuf);
480 
481             qemu_mutex_lock(&comp_done_lock);
482             param->done = true;
483             param->zero_page = zero_page;
484             qemu_cond_signal(&comp_done_cond);
485             qemu_mutex_unlock(&comp_done_lock);
486 
487             qemu_mutex_lock(&param->mutex);
488         } else {
489             qemu_cond_wait(&param->cond, &param->mutex);
490         }
491     }
492     qemu_mutex_unlock(&param->mutex);
493 
494     return NULL;
495 }
496 
497 static void compress_threads_save_cleanup(void)
498 {
499     int i, thread_count;
500 
501     if (!migrate_use_compression() || !comp_param) {
502         return;
503     }
504 
505     thread_count = migrate_compress_threads();
506     for (i = 0; i < thread_count; i++) {
507         /*
508          * we use it as an indicator of whether the thread was
509          * properly initialized or not
510          */
511         if (!comp_param[i].file) {
512             break;
513         }
514 
515         qemu_mutex_lock(&comp_param[i].mutex);
516         comp_param[i].quit = true;
517         qemu_cond_signal(&comp_param[i].cond);
518         qemu_mutex_unlock(&comp_param[i].mutex);
519 
520         qemu_thread_join(compress_threads + i);
521         qemu_mutex_destroy(&comp_param[i].mutex);
522         qemu_cond_destroy(&comp_param[i].cond);
523         deflateEnd(&comp_param[i].stream);
524         g_free(comp_param[i].originbuf);
525         qemu_fclose(comp_param[i].file);
526         comp_param[i].file = NULL;
527     }
528     qemu_mutex_destroy(&comp_done_lock);
529     qemu_cond_destroy(&comp_done_cond);
530     g_free(compress_threads);
531     g_free(comp_param);
532     compress_threads = NULL;
533     comp_param = NULL;
534 }
535 
536 static int compress_threads_save_setup(void)
537 {
538     int i, thread_count;
539 
540     if (!migrate_use_compression()) {
541         return 0;
542     }
543     thread_count = migrate_compress_threads();
544     compress_threads = g_new0(QemuThread, thread_count);
545     comp_param = g_new0(CompressParam, thread_count);
546     qemu_cond_init(&comp_done_cond);
547     qemu_mutex_init(&comp_done_lock);
548     for (i = 0; i < thread_count; i++) {
549         comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE);
550         if (!comp_param[i].originbuf) {
551             goto exit;
552         }
553 
554         if (deflateInit(&comp_param[i].stream,
555                         migrate_compress_level()) != Z_OK) {
556             g_free(comp_param[i].originbuf);
557             goto exit;
558         }
559 
560         /* comp_param[i].file is just used as a dummy buffer to save data,
561          * set its ops to empty.
562          */
563         comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
564         comp_param[i].done = true;
565         comp_param[i].quit = false;
566         qemu_mutex_init(&comp_param[i].mutex);
567         qemu_cond_init(&comp_param[i].cond);
568         qemu_thread_create(compress_threads + i, "compress",
569                            do_data_compress, comp_param + i,
570                            QEMU_THREAD_JOINABLE);
571     }
572     return 0;
573 
574 exit:
575     compress_threads_save_cleanup();
576     return -1;
577 }
578 
579 /* Multiple fd's */
580 
581 #define MULTIFD_MAGIC 0x11223344U
582 #define MULTIFD_VERSION 1
583 
584 #define MULTIFD_FLAG_SYNC (1 << 0)
585 
586 typedef struct {
587     uint32_t magic;
588     uint32_t version;
589     unsigned char uuid[16]; /* QemuUUID */
590     uint8_t id;
591 } __attribute__((packed)) MultiFDInit_t;
592 
593 typedef struct {
594     uint32_t magic;
595     uint32_t version;
596     uint32_t flags;
597     uint32_t size;
598     uint32_t used;
599     uint64_t packet_num;
600     char ramblock[256];
601     uint64_t offset[];
602 } __attribute__((packed)) MultiFDPacket_t;
603 
604 typedef struct {
605     /* number of used pages */
606     uint32_t used;
607     /* number of allocated pages */
608     uint32_t allocated;
609     /* global number of generated multifd packets */
610     uint64_t packet_num;
611     /* offset of each page */
612     ram_addr_t *offset;
613     /* pointer to each page */
614     struct iovec *iov;
615     RAMBlock *block;
616 } MultiFDPages_t;
617 
618 typedef struct {
619     /* these fields are not changed once the thread is created */
620     /* channel number */
621     uint8_t id;
622     /* channel thread name */
623     char *name;
624     /* channel thread id */
625     QemuThread thread;
626     /* communication channel */
627     QIOChannel *c;
628     /* sem where to wait for more work */
629     QemuSemaphore sem;
630     /* this mutex protects the following parameters */
631     QemuMutex mutex;
632     /* is this channel thread running */
633     bool running;
634     /* should this thread finish */
635     bool quit;
636     /* thread has work to do */
637     int pending_job;
638     /* array of pages to send */
639     MultiFDPages_t *pages;
640     /* packet allocated len */
641     uint32_t packet_len;
642     /* pointer to the packet */
643     MultiFDPacket_t *packet;
644     /* multifd flags for each packet */
645     uint32_t flags;
646     /* global number of generated multifd packets */
647     uint64_t packet_num;
648     /* thread local variables */
649     /* packets sent through this channel */
650     uint64_t num_packets;
651     /* pages sent through this channel */
652     uint64_t num_pages;
653     /* syncs main thread and channels */
654     QemuSemaphore sem_sync;
655 }  MultiFDSendParams;
656 
657 typedef struct {
658     /* these fields are not changed once the thread is created */
659     /* channel number */
660     uint8_t id;
661     /* channel thread name */
662     char *name;
663     /* channel thread id */
664     QemuThread thread;
665     /* communication channel */
666     QIOChannel *c;
667     /* this mutex protects the following parameters */
668     QemuMutex mutex;
669     /* is this channel thread running */
670     bool running;
671     /* array of pages to receive */
672     MultiFDPages_t *pages;
673     /* packet allocated len */
674     uint32_t packet_len;
675     /* pointer to the packet */
676     MultiFDPacket_t *packet;
677     /* multifd flags for each packet */
678     uint32_t flags;
679     /* global number of generated multifd packets */
680     uint64_t packet_num;
681     /* thread local variables */
682     /* packets received through this channel */
683     uint64_t num_packets;
684     /* pages received through this channel */
685     uint64_t num_pages;
686     /* syncs main thread and channels */
687     QemuSemaphore sem_sync;
688 } MultiFDRecvParams;
689 
690 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
691 {
692     MultiFDInit_t msg;
693     int ret;
694 
695     msg.magic = cpu_to_be32(MULTIFD_MAGIC);
696     msg.version = cpu_to_be32(MULTIFD_VERSION);
697     msg.id = p->id;
698     memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
699 
700     ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp);
701     if (ret != 0) {
702         return -1;
703     }
704     return 0;
705 }
706 
707 static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
708 {
709     MultiFDInit_t msg;
710     int ret;
711 
712     ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
713     if (ret != 0) {
714         return -1;
715     }
716 
717     msg.magic = be32_to_cpu(msg.magic);
718     msg.version = be32_to_cpu(msg.version);
719 
720     if (msg.magic != MULTIFD_MAGIC) {
721         error_setg(errp, "multifd: received packet magic %x "
722                    "expected %x", msg.magic, MULTIFD_MAGIC);
723         return -1;
724     }
725 
726     if (msg.version != MULTIFD_VERSION) {
727         error_setg(errp, "multifd: received packet version %d "
728                    "expected %d", msg.version, MULTIFD_VERSION);
729         return -1;
730     }
731 
732     if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
733         char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
734         char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
735 
736         error_setg(errp, "multifd: received uuid '%s' and expected "
737                    "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
738         g_free(uuid);
739         g_free(msg_uuid);
740         return -1;
741     }
742 
743     if (msg.id >= migrate_multifd_channels()) {
744         error_setg(errp, "multifd: received channel id %d, "
745                    "expected a value below %d", msg.id, migrate_multifd_channels());
746         return -1;
747     }
748 
749     return msg.id;
750 }
751 
752 static MultiFDPages_t *multifd_pages_init(size_t size)
753 {
754     MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
755 
756     pages->allocated = size;
757     pages->iov = g_new0(struct iovec, size);
758     pages->offset = g_new0(ram_addr_t, size);
759 
760     return pages;
761 }
762 
763 static void multifd_pages_clear(MultiFDPages_t *pages)
764 {
765     pages->used = 0;
766     pages->allocated = 0;
767     pages->packet_num = 0;
768     pages->block = NULL;
769     g_free(pages->iov);
770     pages->iov = NULL;
771     g_free(pages->offset);
772     pages->offset = NULL;
773     g_free(pages);
774 }
775 
776 static void multifd_send_fill_packet(MultiFDSendParams *p)
777 {
778     MultiFDPacket_t *packet = p->packet;
779     int i;
780 
781     packet->magic = cpu_to_be32(MULTIFD_MAGIC);
782     packet->version = cpu_to_be32(MULTIFD_VERSION);
783     packet->flags = cpu_to_be32(p->flags);
784     packet->size = cpu_to_be32(migrate_multifd_page_count());
785     packet->used = cpu_to_be32(p->pages->used);
786     packet->packet_num = cpu_to_be64(p->packet_num);
787 
788     if (p->pages->block) {
789         strncpy(packet->ramblock, p->pages->block->idstr, 256);
790     }
791 
792     for (i = 0; i < p->pages->used; i++) {
793         packet->offset[i] = cpu_to_be64(p->pages->offset[i]);
794     }
795 }
796 
797 static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
798 {
799     MultiFDPacket_t *packet = p->packet;
800     RAMBlock *block;
801     int i;
802 
803     packet->magic = be32_to_cpu(packet->magic);
804     if (packet->magic != MULTIFD_MAGIC) {
805         error_setg(errp, "multifd: received packet "
806                    "magic %x and expected magic %x",
807                    packet->magic, MULTIFD_MAGIC);
808         return -1;
809     }
810 
811     packet->version = be32_to_cpu(packet->version);
812     if (packet->version != MULTIFD_VERSION) {
813         error_setg(errp, "multifd: received packet "
814                    "version %d and expected version %d",
815                    packet->version, MULTIFD_VERSION);
816         return -1;
817     }
818 
819     p->flags = be32_to_cpu(packet->flags);
820 
821     packet->size = be32_to_cpu(packet->size);
822     if (packet->size > migrate_multifd_page_count()) {
823         error_setg(errp, "multifd: received packet "
824                    "with size %d and expected maximum size %d",
825                    packet->size, migrate_multifd_page_count());
826         return -1;
827     }
828 
829     p->pages->used = be32_to_cpu(packet->used);
830     if (p->pages->used > packet->size) {
831         error_setg(errp, "multifd: received packet "
832                    "with %d used pages and expected maximum of %d",
833                    p->pages->used, packet->size);
834         return -1;
835     }
836 
837     p->packet_num = be64_to_cpu(packet->packet_num);
838 
839     if (p->pages->used) {
840         /* make sure that ramblock is 0 terminated */
841         packet->ramblock[255] = 0;
842         block = qemu_ram_block_by_name(packet->ramblock);
843         if (!block) {
844             error_setg(errp, "multifd: unknown ram block %s",
845                        packet->ramblock);
846             return -1;
847         }
848     }
849 
850     for (i = 0; i < p->pages->used; i++) {
851         ram_addr_t offset = be64_to_cpu(packet->offset[i]);
852 
853         if (offset > (block->used_length - TARGET_PAGE_SIZE)) {
854             error_setg(errp, "multifd: offset too long " RAM_ADDR_FMT
855                        " (max " RAM_ADDR_FMT ")",
856                        offset, block->used_length);
857             return -1;
858         }
859         p->pages->iov[i].iov_base = block->host + offset;
860         p->pages->iov[i].iov_len = TARGET_PAGE_SIZE;
861     }
862 
863     return 0;
864 }
865 
866 struct {
867     MultiFDSendParams *params;
868     /* number of created threads */
869     int count;
870     /* array of pages to send */
871     MultiFDPages_t *pages;
872     /* syncs main thread and channels */
873     QemuSemaphore sem_sync;
874     /* global number of generated multifd packets */
875     uint64_t packet_num;
876     /* send channels ready */
877     QemuSemaphore channels_ready;
878 } *multifd_send_state;
879 
880 /*
881  * How do we use multifd_send_state->pages and channel->pages?
882  *
883  * We create a pages structure for each channel, plus a main one.  Each
884  * time we need to send a batch of pages we exchange the one in
885  * multifd_send_state with the one owned by the channel that is going
886  * to send it.  There are two reasons for that:
887  *    - to avoid doing so many mallocs during migration
888  *    - to make it easier to know what to free at the end of migration
889  *
890  * This way we always know who is the owner of each "pages" struct,
891  * and we don't need any locking.  It belongs either to the migration
892  * thread or to the channel thread.  Switching is safe because the
893  * migration thread holds the channel mutex when changing it, and the
894  * channel must have finished with its own, otherwise pending_job
895  * couldn't be false.
896  */
897 
898 static void multifd_send_pages(void)
899 {
900     int i;
901     static int next_channel;
902     MultiFDSendParams *p = NULL; /* make gcc happy */
903     MultiFDPages_t *pages = multifd_send_state->pages;
904     uint64_t transferred;
905 
906     qemu_sem_wait(&multifd_send_state->channels_ready);
907     for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
908         p = &multifd_send_state->params[i];
909 
910         qemu_mutex_lock(&p->mutex);
911         if (!p->pending_job) {
912             p->pending_job++;
913             next_channel = (i + 1) % migrate_multifd_channels();
914             break;
915         }
916         qemu_mutex_unlock(&p->mutex);
917     }
918     p->pages->used = 0;
919 
920     p->packet_num = multifd_send_state->packet_num++;
921     p->pages->block = NULL;
922     multifd_send_state->pages = p->pages;
923     p->pages = pages;
924     transferred = ((uint64_t) pages->used) * TARGET_PAGE_SIZE + p->packet_len;
925     ram_counters.multifd_bytes += transferred;
926     ram_counters.transferred += transferred;
927     qemu_mutex_unlock(&p->mutex);
928     qemu_sem_post(&p->sem);
929 }
930 
931 static void multifd_queue_page(RAMBlock *block, ram_addr_t offset)
932 {
933     MultiFDPages_t *pages = multifd_send_state->pages;
934 
935     if (!pages->block) {
936         pages->block = block;
937     }
938 
939     if (pages->block == block) {
940         pages->offset[pages->used] = offset;
941         pages->iov[pages->used].iov_base = block->host + offset;
942         pages->iov[pages->used].iov_len = TARGET_PAGE_SIZE;
943         pages->used++;
944 
945         if (pages->used < pages->allocated) {
946             return;
947         }
948     }
949 
950     multifd_send_pages();
951 
952     if (pages->block != block) {
953         multifd_queue_page(block, offset);
954     }
955 }
956 
957 static void multifd_send_terminate_threads(Error *err)
958 {
959     int i;
960 
961     if (err) {
962         MigrationState *s = migrate_get_current();
963         migrate_set_error(s, err);
964         if (s->state == MIGRATION_STATUS_SETUP ||
965             s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
966             s->state == MIGRATION_STATUS_DEVICE ||
967             s->state == MIGRATION_STATUS_ACTIVE) {
968             migrate_set_state(&s->state, s->state,
969                               MIGRATION_STATUS_FAILED);
970         }
971     }
972 
973     for (i = 0; i < migrate_multifd_channels(); i++) {
974         MultiFDSendParams *p = &multifd_send_state->params[i];
975 
976         qemu_mutex_lock(&p->mutex);
977         p->quit = true;
978         qemu_sem_post(&p->sem);
979         qemu_mutex_unlock(&p->mutex);
980     }
981 }
982 
983 void multifd_save_cleanup(void)
984 {
985     int i;
986 
987     if (!migrate_use_multifd()) {
988         return;
989     }
990     multifd_send_terminate_threads(NULL);
991     for (i = 0; i < migrate_multifd_channels(); i++) {
992         MultiFDSendParams *p = &multifd_send_state->params[i];
993 
994         if (p->running) {
995             qemu_thread_join(&p->thread);
996         }
997         socket_send_channel_destroy(p->c);
998         p->c = NULL;
999         qemu_mutex_destroy(&p->mutex);
1000         qemu_sem_destroy(&p->sem);
1001         qemu_sem_destroy(&p->sem_sync);
1002         g_free(p->name);
1003         p->name = NULL;
1004         multifd_pages_clear(p->pages);
1005         p->pages = NULL;
1006         p->packet_len = 0;
1007         g_free(p->packet);
1008         p->packet = NULL;
1009     }
1010     qemu_sem_destroy(&multifd_send_state->channels_ready);
1011     qemu_sem_destroy(&multifd_send_state->sem_sync);
1012     g_free(multifd_send_state->params);
1013     multifd_send_state->params = NULL;
1014     multifd_pages_clear(multifd_send_state->pages);
1015     multifd_send_state->pages = NULL;
1016     g_free(multifd_send_state);
1017     multifd_send_state = NULL;
1018 }
1019 
1020 static void multifd_send_sync_main(void)
1021 {
1022     int i;
1023 
1024     if (!migrate_use_multifd()) {
1025         return;
1026     }
1027     if (multifd_send_state->pages->used) {
1028         multifd_send_pages();
1029     }
1030     for (i = 0; i < migrate_multifd_channels(); i++) {
1031         MultiFDSendParams *p = &multifd_send_state->params[i];
1032 
1033         trace_multifd_send_sync_main_signal(p->id);
1034 
1035         qemu_mutex_lock(&p->mutex);
1036 
1037         p->packet_num = multifd_send_state->packet_num++;
1038         p->flags |= MULTIFD_FLAG_SYNC;
1039         p->pending_job++;
1040         qemu_mutex_unlock(&p->mutex);
1041         qemu_sem_post(&p->sem);
1042     }
1043     for (i = 0; i < migrate_multifd_channels(); i++) {
1044         MultiFDSendParams *p = &multifd_send_state->params[i];
1045 
1046         trace_multifd_send_sync_main_wait(p->id);
1047         qemu_sem_wait(&multifd_send_state->sem_sync);
1048     }
1049     trace_multifd_send_sync_main(multifd_send_state->packet_num);
1050 }
1051 
1052 static void *multifd_send_thread(void *opaque)
1053 {
1054     MultiFDSendParams *p = opaque;
1055     Error *local_err = NULL;
1056     int ret;
1057 
1058     trace_multifd_send_thread_start(p->id);
1059     rcu_register_thread();
1060 
1061     if (multifd_send_initial_packet(p, &local_err) < 0) {
1062         goto out;
1063     }
1064     /* initial packet */
1065     p->num_packets = 1;
1066 
1067     while (true) {
1068         qemu_sem_wait(&p->sem);
1069         qemu_mutex_lock(&p->mutex);
1070 
1071         if (p->pending_job) {
1072             uint32_t used = p->pages->used;
1073             uint64_t packet_num = p->packet_num;
1074             uint32_t flags = p->flags;
1075 
1076             multifd_send_fill_packet(p);
1077             p->flags = 0;
1078             p->num_packets++;
1079             p->num_pages += used;
1080             p->pages->used = 0;
1081             qemu_mutex_unlock(&p->mutex);
1082 
1083             trace_multifd_send(p->id, packet_num, used, flags);
1084 
1085             ret = qio_channel_write_all(p->c, (void *)p->packet,
1086                                         p->packet_len, &local_err);
1087             if (ret != 0) {
1088                 break;
1089             }
1090 
1091             ret = qio_channel_writev_all(p->c, p->pages->iov, used, &local_err);
1092             if (ret != 0) {
1093                 break;
1094             }
1095 
1096             qemu_mutex_lock(&p->mutex);
1097             p->pending_job--;
1098             qemu_mutex_unlock(&p->mutex);
1099 
1100             if (flags & MULTIFD_FLAG_SYNC) {
1101                 qemu_sem_post(&multifd_send_state->sem_sync);
1102             }
1103             qemu_sem_post(&multifd_send_state->channels_ready);
1104         } else if (p->quit) {
1105             qemu_mutex_unlock(&p->mutex);
1106             break;
1107         } else {
1108             qemu_mutex_unlock(&p->mutex);
1109             /* sometimes there are spurious wakeups */
1110         }
1111     }
1112 
1113 out:
1114     if (local_err) {
1115         multifd_send_terminate_threads(local_err);
1116     }
1117 
1118     qemu_mutex_lock(&p->mutex);
1119     p->running = false;
1120     qemu_mutex_unlock(&p->mutex);
1121 
1122     rcu_unregister_thread();
1123     trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages);
1124 
1125     return NULL;
1126 }
1127 
1128 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
1129 {
1130     MultiFDSendParams *p = opaque;
1131     QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
1132     Error *local_err = NULL;
1133 
1134     if (qio_task_propagate_error(task, &local_err)) {
1135         migrate_set_error(migrate_get_current(), local_err);
1136         multifd_save_cleanup();
1137     } else {
1138         p->c = QIO_CHANNEL(sioc);
1139         qio_channel_set_delay(p->c, false);
1140         p->running = true;
1141         qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
1142                            QEMU_THREAD_JOINABLE);
1143 
1144         atomic_inc(&multifd_send_state->count);
1145     }
1146 }
1147 
1148 int multifd_save_setup(void)
1149 {
1150     int thread_count;
1151     uint32_t page_count = migrate_multifd_page_count();
1152     uint8_t i;
1153 
1154     if (!migrate_use_multifd()) {
1155         return 0;
1156     }
1157     thread_count = migrate_multifd_channels();
1158     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
1159     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
1160     atomic_set(&multifd_send_state->count, 0);
1161     multifd_send_state->pages = multifd_pages_init(page_count);
1162     qemu_sem_init(&multifd_send_state->sem_sync, 0);
1163     qemu_sem_init(&multifd_send_state->channels_ready, 0);
1164 
1165     for (i = 0; i < thread_count; i++) {
1166         MultiFDSendParams *p = &multifd_send_state->params[i];
1167 
1168         qemu_mutex_init(&p->mutex);
1169         qemu_sem_init(&p->sem, 0);
1170         qemu_sem_init(&p->sem_sync, 0);
1171         p->quit = false;
1172         p->pending_job = 0;
1173         p->id = i;
1174         p->pages = multifd_pages_init(page_count);
1175         p->packet_len = sizeof(MultiFDPacket_t)
1176                       + sizeof(ram_addr_t) * page_count;
1177         p->packet = g_malloc0(p->packet_len);
1178         p->name = g_strdup_printf("multifdsend_%d", i);
1179         socket_send_channel_create(multifd_new_send_channel_async, p);
1180     }
1181     return 0;
1182 }
1183 
1184 struct {
1185     MultiFDRecvParams *params;
1186     /* number of created threads */
1187     int count;
1188     /* syncs main thread and channels */
1189     QemuSemaphore sem_sync;
1190     /* global number of generated multifd packets */
1191     uint64_t packet_num;
1192 } *multifd_recv_state;
1193 
1194 static void multifd_recv_terminate_threads(Error *err)
1195 {
1196     int i;
1197 
1198     if (err) {
1199         MigrationState *s = migrate_get_current();
1200         migrate_set_error(s, err);
1201         if (s->state == MIGRATION_STATUS_SETUP ||
1202             s->state == MIGRATION_STATUS_ACTIVE) {
1203             migrate_set_state(&s->state, s->state,
1204                               MIGRATION_STATUS_FAILED);
1205         }
1206     }
1207 
1208     for (i = 0; i < migrate_multifd_channels(); i++) {
1209         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1210 
1211         qemu_mutex_lock(&p->mutex);
1212         /* We could arrive here for two reasons:
1213            - normal quit, i.e. everything went fine, just finished
1214            - error quit: We close the channels so the channel threads
1215              finish the qio_channel_read_all_eof() */
1216         qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
1217         qemu_mutex_unlock(&p->mutex);
1218     }
1219 }
1220 
1221 int multifd_load_cleanup(Error **errp)
1222 {
1223     int i;
1224     int ret = 0;
1225 
1226     if (!migrate_use_multifd()) {
1227         return 0;
1228     }
1229     multifd_recv_terminate_threads(NULL);
1230     for (i = 0; i < migrate_multifd_channels(); i++) {
1231         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1232 
1233         if (p->running) {
1234             qemu_thread_join(&p->thread);
1235         }
1236         object_unref(OBJECT(p->c));
1237         p->c = NULL;
1238         qemu_mutex_destroy(&p->mutex);
1239         qemu_sem_destroy(&p->sem_sync);
1240         g_free(p->name);
1241         p->name = NULL;
1242         multifd_pages_clear(p->pages);
1243         p->pages = NULL;
1244         p->packet_len = 0;
1245         g_free(p->packet);
1246         p->packet = NULL;
1247     }
1248     qemu_sem_destroy(&multifd_recv_state->sem_sync);
1249     g_free(multifd_recv_state->params);
1250     multifd_recv_state->params = NULL;
1251     g_free(multifd_recv_state);
1252     multifd_recv_state = NULL;
1253 
1254     return ret;
1255 }
1256 
1257 static void multifd_recv_sync_main(void)
1258 {
1259     int i;
1260 
1261     if (!migrate_use_multifd()) {
1262         return;
1263     }
1264     for (i = 0; i < migrate_multifd_channels(); i++) {
1265         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1266 
1267         trace_multifd_recv_sync_main_wait(p->id);
1268         qemu_sem_wait(&multifd_recv_state->sem_sync);
1269         qemu_mutex_lock(&p->mutex);
1270         if (multifd_recv_state->packet_num < p->packet_num) {
1271             multifd_recv_state->packet_num = p->packet_num;
1272         }
1273         qemu_mutex_unlock(&p->mutex);
1274     }
1275     for (i = 0; i < migrate_multifd_channels(); i++) {
1276         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1277 
1278         trace_multifd_recv_sync_main_signal(p->id);
1279         qemu_sem_post(&p->sem_sync);
1280     }
1281     trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
1282 }
1283 
1284 static void *multifd_recv_thread(void *opaque)
1285 {
1286     MultiFDRecvParams *p = opaque;
1287     Error *local_err = NULL;
1288     int ret;
1289 
1290     trace_multifd_recv_thread_start(p->id);
1291     rcu_register_thread();
1292 
1293     while (true) {
1294         uint32_t used;
1295         uint32_t flags;
1296 
1297         ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
1298                                        p->packet_len, &local_err);
1299         if (ret == 0) {   /* EOF */
1300             break;
1301         }
1302         if (ret == -1) {   /* Error */
1303             break;
1304         }
1305 
1306         qemu_mutex_lock(&p->mutex);
1307         ret = multifd_recv_unfill_packet(p, &local_err);
1308         if (ret) {
1309             qemu_mutex_unlock(&p->mutex);
1310             break;
1311         }
1312 
1313         used = p->pages->used;
1314         flags = p->flags;
1315         trace_multifd_recv(p->id, p->packet_num, used, flags);
1316         p->num_packets++;
1317         p->num_pages += used;
1318         qemu_mutex_unlock(&p->mutex);
1319 
1320         ret = qio_channel_readv_all(p->c, p->pages->iov, used, &local_err);
1321         if (ret != 0) {
1322             break;
1323         }
1324 
1325         if (flags & MULTIFD_FLAG_SYNC) {
1326             qemu_sem_post(&multifd_recv_state->sem_sync);
1327             qemu_sem_wait(&p->sem_sync);
1328         }
1329     }
1330 
1331     if (local_err) {
1332         multifd_recv_terminate_threads(local_err);
1333     }
1334     qemu_mutex_lock(&p->mutex);
1335     p->running = false;
1336     qemu_mutex_unlock(&p->mutex);
1337 
1338     rcu_unregister_thread();
1339     trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages);
1340 
1341     return NULL;
1342 }
1343 
1344 int multifd_load_setup(void)
1345 {
1346     int thread_count;
1347     uint32_t page_count = migrate_multifd_page_count();
1348     uint8_t i;
1349 
1350     if (!migrate_use_multifd()) {
1351         return 0;
1352     }
1353     thread_count = migrate_multifd_channels();
1354     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
1355     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
1356     atomic_set(&multifd_recv_state->count, 0);
1357     qemu_sem_init(&multifd_recv_state->sem_sync, 0);
1358 
1359     for (i = 0; i < thread_count; i++) {
1360         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1361 
1362         qemu_mutex_init(&p->mutex);
1363         qemu_sem_init(&p->sem_sync, 0);
1364         p->id = i;
1365         p->pages = multifd_pages_init(page_count);
1366         p->packet_len = sizeof(MultiFDPacket_t)
1367                       + sizeof(ram_addr_t) * page_count;
1368         p->packet = g_malloc0(p->packet_len);
1369         p->name = g_strdup_printf("multifdrecv_%d", i);
1370     }
1371     return 0;
1372 }
1373 
1374 bool multifd_recv_all_channels_created(void)
1375 {
1376     int thread_count = migrate_multifd_channels();
1377 
1378     if (!migrate_use_multifd()) {
1379         return true;
1380     }
1381 
1382     return thread_count == atomic_read(&multifd_recv_state->count);
1383 }
1384 
1385 /*
1386  * Try to receive all multifd channels to get ready for the migration.
1387  * - Return true and do not set @errp when correctly receiving all channels;
1388  * - Return false and do not set @errp when correctly receiving the current one;
1389  * - Return false and set @errp when failing to receive the current channel.
1390  */
1391 bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
1392 {
1393     MultiFDRecvParams *p;
1394     Error *local_err = NULL;
1395     int id;
1396 
1397     id = multifd_recv_initial_packet(ioc, &local_err);
1398     if (id < 0) {
1399         multifd_recv_terminate_threads(local_err);
1400         error_propagate_prepend(errp, local_err,
1401                                 "failed to receive packet"
1402                                 " via multifd channel %d: ",
1403                                 atomic_read(&multifd_recv_state->count));
1404         return false;
1405     }
1406 
1407     p = &multifd_recv_state->params[id];
1408     if (p->c != NULL) {
1409         error_setg(&local_err, "multifd: received id '%d' already setup",
1410                    id);
1411         multifd_recv_terminate_threads(local_err);
1412         error_propagate(errp, local_err);
1413         return false;
1414     }
1415     p->c = ioc;
1416     object_ref(OBJECT(ioc));
1417     /* initial packet */
1418     p->num_packets = 1;
1419 
1420     p->running = true;
1421     qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
1422                        QEMU_THREAD_JOINABLE);
1423     atomic_inc(&multifd_recv_state->count);
1424     return atomic_read(&multifd_recv_state->count) ==
1425            migrate_multifd_channels();
1426 }
1427 
1428 /**
1429  * save_page_header: write page header to wire
1430  *
1431  * If this block is not a continuation of the last sent block, it also
1431  * writes the block identification
1432  *
1433  * Returns the number of bytes written
1434  *
1435  * @f: QEMUFile where to send the data
1436  * @block: block that contains the page we want to send
1437  * @offset: offset inside the block for the page
1438  *          (the lower bits contain flags)
1439  */
1440 static size_t save_page_header(RAMState *rs, QEMUFile *f,  RAMBlock *block,
1441                                ram_addr_t offset)
1442 {
1443     size_t size, len;
1444 
1445     if (block == rs->last_sent_block) {
1446         offset |= RAM_SAVE_FLAG_CONTINUE;
1447     }
1448     qemu_put_be64(f, offset);
1449     size = 8;
1450 
1451     if (!(offset & RAM_SAVE_FLAG_CONTINUE)) {
1452         len = strlen(block->idstr);
1453         qemu_put_byte(f, len);
1454         qemu_put_buffer(f, (uint8_t *)block->idstr, len);
1455         size += 1 + len;
1456         rs->last_sent_block = block;
1457     }
1458     return size;
1459 }
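/*
 * A rough sketch of the matching decode step on the destination,
 * assuming the wire layout save_page_header() produces: an 8-byte offset
 * whose low bits carry RAM_SAVE_FLAG_* bits, optionally followed by a
 * length byte and the block id string when RAM_SAVE_FLAG_CONTINUE is not
 * set.  The function name is illustrative.
 */
static RAMBlock *example_load_page_header(QEMUFile *f, ram_addr_t *offset,
                                          int *flags, RAMBlock *last_block)
{
    uint64_t addr = qemu_get_be64(f);
    char idstr[256];
    int len;

    *flags = addr & ~TARGET_PAGE_MASK;
    *offset = addr & TARGET_PAGE_MASK;

    if (*flags & RAM_SAVE_FLAG_CONTINUE) {
        /* same block as the previous page */
        return last_block;
    }

    /* one length byte followed by the (not NUL terminated) id string */
    len = qemu_get_byte(f);
    qemu_get_buffer(f, (uint8_t *)idstr, len);
    idstr[len] = 0;

    return qemu_ram_block_by_name(idstr);
}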
1460 
1461 /**
1462  * mig_throttle_guest_down: throttle down the guest
1463  *
1464  * Reduce amount of guest cpu execution to hopefully slow down memory
1465  * writes. If guest dirty memory rate is reduced below the rate at
1466  * which we can transfer pages to the destination then we should be
1467  * able to complete migration. Some workloads dirty memory way too
1468  * fast and will not effectively converge, even with auto-converge.
1469  */
1470 static void mig_throttle_guest_down(void)
1471 {
1472     MigrationState *s = migrate_get_current();
1473     uint64_t pct_initial = s->parameters.cpu_throttle_initial;
1474     uint64_t pct_increment = s->parameters.cpu_throttle_increment;
1475     int pct_max = s->parameters.max_cpu_throttle;
1476 
1477     /* We have not started throttling yet. Let's start it. */
1478     if (!cpu_throttle_active()) {
1479         cpu_throttle_set(pct_initial);
1480     } else {
1481         /* Throttling already on, just increase the rate */
1482         cpu_throttle_set(MIN(cpu_throttle_get_percentage() + pct_increment,
1483                          pct_max));
1484     }
1485 }
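/*
 * As a worked example of the logic above: with cpu_throttle_initial=20,
 * cpu_throttle_increment=10 and max_cpu_throttle=99, successive calls
 * throttle the guest at 20%, then 30%, 40%, ... and finally cap at 99%.
 */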
1486 
1487 /**
1488  * xbzrle_cache_zero_page: insert a zero page in the XBZRLE cache
1489  *
1490  * @rs: current RAM state
1491  * @current_addr: address for the zero page
1492  *
1493  * Update the xbzrle cache to reflect a page that's been sent as all 0.
1494  * The important thing is that a stale (not-yet-0'd) page be replaced
1495  * by the new data.
1496  * As a bonus, if the page wasn't in the cache it gets added so that
1497  * when a small write is made into the 0'd page it gets XBZRLE sent.
1498  */
1499 static void xbzrle_cache_zero_page(RAMState *rs, ram_addr_t current_addr)
1500 {
1501     if (rs->ram_bulk_stage || !migrate_use_xbzrle()) {
1502         return;
1503     }
1504 
1505     /* We don't care if this fails to allocate a new cache page
1506      * as long as it updated an old one */
1507     cache_insert(XBZRLE.cache, current_addr, XBZRLE.zero_target_page,
1508                  ram_counters.dirty_sync_count);
1509 }
1510 
1511 #define ENCODING_FLAG_XBZRLE 0x1
1512 
1513 /**
1514  * save_xbzrle_page: compress and send current page
1515  *
1516  * Returns: 1 means that we wrote the page
1517  *          0 means that page is identical to the one already sent
1518  *          -1 means that xbzrle would be longer than normal
1519  *
1520  * @rs: current RAM state
1521  * @current_data: pointer to the address of the page contents
1522  * @current_addr: addr of the page
1523  * @block: block that contains the page we want to send
1524  * @offset: offset inside the block for the page
1525  * @last_stage: if we are at the completion stage
1526  */
1527 static int save_xbzrle_page(RAMState *rs, uint8_t **current_data,
1528                             ram_addr_t current_addr, RAMBlock *block,
1529                             ram_addr_t offset, bool last_stage)
1530 {
1531     int encoded_len = 0, bytes_xbzrle;
1532     uint8_t *prev_cached_page;
1533 
1534     if (!cache_is_cached(XBZRLE.cache, current_addr,
1535                          ram_counters.dirty_sync_count)) {
1536         xbzrle_counters.cache_miss++;
1537         if (!last_stage) {
1538             if (cache_insert(XBZRLE.cache, current_addr, *current_data,
1539                              ram_counters.dirty_sync_count) == -1) {
1540                 return -1;
1541             } else {
1542                 /* update *current_data when the page has been
1543                    inserted into cache */
1544                 *current_data = get_cached_data(XBZRLE.cache, current_addr);
1545             }
1546         }
1547         return -1;
1548     }
1549 
1550     prev_cached_page = get_cached_data(XBZRLE.cache, current_addr);
1551 
1552     /* save current buffer into memory */
1553     memcpy(XBZRLE.current_buf, *current_data, TARGET_PAGE_SIZE);
1554 
1555     /* XBZRLE encoding (if there is no overflow) */
1556     encoded_len = xbzrle_encode_buffer(prev_cached_page, XBZRLE.current_buf,
1557                                        TARGET_PAGE_SIZE, XBZRLE.encoded_buf,
1558                                        TARGET_PAGE_SIZE);
1559     if (encoded_len == 0) {
1560         trace_save_xbzrle_page_skipping();
1561         return 0;
1562     } else if (encoded_len == -1) {
1563         trace_save_xbzrle_page_overflow();
1564         xbzrle_counters.overflow++;
1565         /* update data in the cache */
1566         if (!last_stage) {
1567             memcpy(prev_cached_page, *current_data, TARGET_PAGE_SIZE);
1568             *current_data = prev_cached_page;
1569         }
1570         return -1;
1571     }
1572 
1573     /* we need to update the data in the cache, in order to get the same data */
1574     if (!last_stage) {
1575         memcpy(prev_cached_page, XBZRLE.current_buf, TARGET_PAGE_SIZE);
1576     }
1577 
1578     /* Send XBZRLE based compressed page */
1579     bytes_xbzrle = save_page_header(rs, rs->f, block,
1580                                     offset | RAM_SAVE_FLAG_XBZRLE);
1581     qemu_put_byte(rs->f, ENCODING_FLAG_XBZRLE);
1582     qemu_put_be16(rs->f, encoded_len);
1583     qemu_put_buffer(rs->f, XBZRLE.encoded_buf, encoded_len);
1584     bytes_xbzrle += encoded_len + 1 + 2;
1585     xbzrle_counters.pages++;
1586     xbzrle_counters.bytes += bytes_xbzrle;
1587     ram_counters.transferred += bytes_xbzrle;
1588 
1589     return 1;
1590 }
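/*
 * Sketch of the matching decode on the destination, assuming the wire
 * layout written above: ENCODING_FLAG_XBZRLE, a big-endian 16-bit encoded
 * length, then the encoded bytes, applied as a delta on top of the stale
 * copy of the page at @host.  The function name is illustrative and
 * XBZRLE.decoded_buf is assumed to have been allocated by the load setup.
 */
static int example_load_xbzrle_page(QEMUFile *f, void *host)
{
    unsigned int xh_len;
    int xh_flags;
    uint8_t *loaded_data = XBZRLE.decoded_buf;

    xh_flags = qemu_get_byte(f);
    xh_len = qemu_get_be16(f);

    if (xh_flags != ENCODING_FLAG_XBZRLE || xh_len > TARGET_PAGE_SIZE) {
        return -1;
    }

    qemu_get_buffer(f, loaded_data, xh_len);

    /* decode the delta in place on top of the old page contents */
    if (xbzrle_decode_buffer(loaded_data, xh_len, host,
                             TARGET_PAGE_SIZE) == -1) {
        return -1;
    }
    return 0;
}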
1591 
1592 /**
1593  * migration_bitmap_find_dirty: find the next dirty page from start
1594  *
1595  * Called with rcu_read_lock() to protect migration_bitmap
1596  *
1597  * Returns the byte offset within memory region of the start of a dirty page
1598  *
1599  * @rs: current RAM state
1600  * @rb: RAMBlock where to search for dirty pages
1601  * @start: page where we start the search
1602  */
1603 static inline
1604 unsigned long migration_bitmap_find_dirty(RAMState *rs, RAMBlock *rb,
1605                                           unsigned long start)
1606 {
1607     unsigned long size = rb->used_length >> TARGET_PAGE_BITS;
1608     unsigned long *bitmap = rb->bmap;
1609     unsigned long next;
1610 
1611     if (ramblock_is_ignored(rb)) {
1612         return size;
1613     }
1614 
1615     /*
1616      * When the free page optimization is enabled, we need to check the bitmap
1617      * to send the non-free pages rather than all the pages in the bulk stage.
1618      */
1619     if (!rs->fpo_enabled && rs->ram_bulk_stage && start > 0) {
1620         next = start + 1;
1621     } else {
1622         next = find_next_bit(bitmap, size, start);
1623     }
1624 
1625     return next;
1626 }
1627 
1628 static inline bool migration_bitmap_clear_dirty(RAMState *rs,
1629                                                 RAMBlock *rb,
1630                                                 unsigned long page)
1631 {
1632     bool ret;
1633 
1634     qemu_mutex_lock(&rs->bitmap_mutex);
1635     ret = test_and_clear_bit(page, rb->bmap);
1636 
1637     if (ret) {
1638         rs->migration_dirty_pages--;
1639     }
1640     qemu_mutex_unlock(&rs->bitmap_mutex);
1641 
1642     return ret;
1643 }
1644 
1645 static void migration_bitmap_sync_range(RAMState *rs, RAMBlock *rb,
1646                                         ram_addr_t start, ram_addr_t length)
1647 {
1648     rs->migration_dirty_pages +=
1649         cpu_physical_memory_sync_dirty_bitmap(rb, start, length,
1650                                               &rs->num_dirty_pages_period);
1651 }
1652 
1653 /**
1654  * ram_pagesize_summary: calculate all the pagesizes of a VM
1655  *
1656  * Returns a summary bitmap of the page sizes of all RAMBlocks
1657  *
1658  * For VMs with just normal pages this is equivalent to the host page
1659  * size. If it's got some huge pages then it's the OR of all the
1660  * different page sizes.
1661  */
1662 uint64_t ram_pagesize_summary(void)
1663 {
1664     RAMBlock *block;
1665     uint64_t summary = 0;
1666 
1667     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
1668         summary |= block->page_size;
1669     }
1670 
1671     return summary;
1672 }
1673 
1674 uint64_t ram_get_total_transferred_pages(void)
1675 {
1676     return  ram_counters.normal + ram_counters.duplicate +
1677                 compression_counters.pages + xbzrle_counters.pages;
1678 }
1679 
1680 static void migration_update_rates(RAMState *rs, int64_t end_time)
1681 {
1682     uint64_t page_count = rs->target_page_count - rs->target_page_count_prev;
1683     double compressed_size;
1684 
1685     /* calculate period counters */
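         /*
          * dirty_pages_rate is pages dirtied per second: the pages dirtied in
          * this period scaled by 1000 ms over the period length in ms.
          */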
1686     ram_counters.dirty_pages_rate = rs->num_dirty_pages_period * 1000
1687                 / (end_time - rs->time_last_bitmap_sync);
1688 
1689     if (!page_count) {
1690         return;
1691     }
1692 
1693     if (migrate_use_xbzrle()) {
1694         xbzrle_counters.cache_miss_rate = (double)(xbzrle_counters.cache_miss -
1695             rs->xbzrle_cache_miss_prev) / page_count;
1696         rs->xbzrle_cache_miss_prev = xbzrle_counters.cache_miss;
1697     }
1698 
1699     if (migrate_use_compression()) {
1700         compression_counters.busy_rate = (double)(compression_counters.busy -
1701             rs->compress_thread_busy_prev) / page_count;
1702         rs->compress_thread_busy_prev = compression_counters.busy;
1703 
1704         compressed_size = compression_counters.compressed_size -
1705                           rs->compressed_size_prev;
1706         if (compressed_size) {
1707             double uncompressed_size = (compression_counters.pages -
1708                                     rs->compress_pages_prev) * TARGET_PAGE_SIZE;
1709 
1710             /* Compression-Ratio = Uncompressed-size / Compressed-size */
1711             compression_counters.compression_rate =
1712                                         uncompressed_size / compressed_size;
1713 
1714             rs->compress_pages_prev = compression_counters.pages;
1715             rs->compressed_size_prev = compression_counters.compressed_size;
1716         }
1717     }
1718 }
1719 
1720 static void migration_bitmap_sync(RAMState *rs)
1721 {
1722     RAMBlock *block;
1723     int64_t end_time;
1724     uint64_t bytes_xfer_now;
1725 
1726     ram_counters.dirty_sync_count++;
1727 
1728     if (!rs->time_last_bitmap_sync) {
1729         rs->time_last_bitmap_sync = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
1730     }
1731 
1732     trace_migration_bitmap_sync_start();
1733     memory_global_dirty_log_sync();
1734 
1735     qemu_mutex_lock(&rs->bitmap_mutex);
1736     rcu_read_lock();
1737     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
1738         migration_bitmap_sync_range(rs, block, 0, block->used_length);
1739     }
1740     ram_counters.remaining = ram_bytes_remaining();
1741     rcu_read_unlock();
1742     qemu_mutex_unlock(&rs->bitmap_mutex);
1743 
1744     trace_migration_bitmap_sync_end(rs->num_dirty_pages_period);
1745 
1746     end_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
1747 
1748     /* more than 1 second = 1000 milliseconds */
1749     if (end_time > rs->time_last_bitmap_sync + 1000) {
1750         bytes_xfer_now = ram_counters.transferred;
1751 
1752         /* During block migration the auto-converge logic incorrectly detects
1753          * that ram migration makes no progress. Avoid this by disabling the
1754          * throttling logic during the bulk phase of block migration. */
1755         if (migrate_auto_converge() && !blk_mig_bulk_active()) {
1756             /* The following detection logic can be refined later. For now:
1757                Check whether the bytes dirtied in this period exceed 50% of the
1758                approximate amount of bytes that just got transferred since the last
1759                time we were in this routine. If that happens twice, start or increase
1760                throttling */
1761 
1762             if ((rs->num_dirty_pages_period * TARGET_PAGE_SIZE >
1763                    (bytes_xfer_now - rs->bytes_xfer_prev) / 2) &&
1764                 (++rs->dirty_rate_high_cnt >= 2)) {
1765                     trace_migration_throttle();
1766                     rs->dirty_rate_high_cnt = 0;
1767                     mig_throttle_guest_down();
1768             }
1769         }
1770 
1771         migration_update_rates(rs, end_time);
1772 
1773         rs->target_page_count_prev = rs->target_page_count;
1774 
1775         /* reset period counters */
1776         rs->time_last_bitmap_sync = end_time;
1777         rs->num_dirty_pages_period = 0;
1778         rs->bytes_xfer_prev = bytes_xfer_now;
1779     }
1780     if (migrate_use_events()) {
1781         qapi_event_send_migration_pass(ram_counters.dirty_sync_count);
1782     }
1783 }
1784 
1785 static void migration_bitmap_sync_precopy(RAMState *rs)
1786 {
1787     Error *local_err = NULL;
1788 
1789     /*
1790      * The current notifier usage is just an optimization to migration, so we
1791      * don't stop the normal migration process in the error case.
1792      */
1793     if (precopy_notify(PRECOPY_NOTIFY_BEFORE_BITMAP_SYNC, &local_err)) {
1794         error_report_err(local_err);
1795     }
1796 
1797     migration_bitmap_sync(rs);
1798 
1799     if (precopy_notify(PRECOPY_NOTIFY_AFTER_BITMAP_SYNC, &local_err)) {
1800         error_report_err(local_err);
1801     }
1802 }
1803 
1804 /**
1805  * save_zero_page_to_file: send the zero page to the file
1806  *
1807  * Returns the size of data written to the file, 0 means the page is not
1808  * a zero page
1809  *
1810  * @rs: current RAM state
1811  * @file: the file where the data is saved
1812  * @block: block that contains the page we want to send
1813  * @offset: offset inside the block for the page
1814  */
1815 static int save_zero_page_to_file(RAMState *rs, QEMUFile *file,
1816                                   RAMBlock *block, ram_addr_t offset)
1817 {
1818     uint8_t *p = block->host + offset;
1819     int len = 0;
1820 
1821     if (is_zero_range(p, TARGET_PAGE_SIZE)) {
1822         len += save_page_header(rs, file, block, offset | RAM_SAVE_FLAG_ZERO);
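             /*
              * A zero page on the wire is just the page header plus one byte:
              * the fill value (always zero now), kept from the older
              * fill-character format.
              */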
1823         qemu_put_byte(file, 0);
1824         len += 1;
1825     }
1826     return len;
1827 }
1828 
1829 /**
1830  * save_zero_page: send the zero page to the stream
1831  *
1832  * Returns the number of pages written.
1833  *
1834  * @rs: current RAM state
1835  * @block: block that contains the page we want to send
1836  * @offset: offset inside the block for the page
1837  */
1838 static int save_zero_page(RAMState *rs, RAMBlock *block, ram_addr_t offset)
1839 {
1840     int len = save_zero_page_to_file(rs, rs->f, block, offset);
1841 
1842     if (len) {
1843         ram_counters.duplicate++;
1844         ram_counters.transferred += len;
1845         return 1;
1846     }
1847     return -1;
1848 }
1849 
1850 static void ram_release_pages(const char *rbname, uint64_t offset, int pages)
1851 {
1852     if (!migrate_release_ram() || !migration_in_postcopy()) {
1853         return;
1854     }
1855 
1856     ram_discard_range(rbname, offset, pages << TARGET_PAGE_BITS);
1857 }
1858 
1859 /*
1860  * @pages: the number of pages written by the control path,
1861  *        < 0 - error
1862  *        > 0 - number of pages written
1863  *
1864  * Return true if the page has been saved, otherwise false is returned.
1865  */
1866 static bool control_save_page(RAMState *rs, RAMBlock *block, ram_addr_t offset,
1867                               int *pages)
1868 {
1869     uint64_t bytes_xmit = 0;
1870     int ret;
1871 
1872     *pages = -1;
1873     ret = ram_control_save_page(rs->f, block->offset, offset, TARGET_PAGE_SIZE,
1874                                 &bytes_xmit);
1875     if (ret == RAM_SAVE_CONTROL_NOT_SUPP) {
1876         return false;
1877     }
1878 
1879     if (bytes_xmit) {
1880         ram_counters.transferred += bytes_xmit;
1881         *pages = 1;
1882     }
1883 
1884     if (ret == RAM_SAVE_CONTROL_DELAYED) {
1885         return true;
1886     }
1887 
1888     if (bytes_xmit > 0) {
1889         ram_counters.normal++;
1890     } else if (bytes_xmit == 0) {
1891         ram_counters.duplicate++;
1892     }
1893 
1894     return true;
1895 }
1896 
1897 /*
1898  * directly send the page to the stream
1899  *
1900  * Returns the number of pages written.
1901  *
1902  * @rs: current RAM state
1903  * @block: block that contains the page we want to send
1904  * @offset: offset inside the block for the page
1905  * @buf: the page to be sent
1906  * @async: send the page asynchronously
1907  */
1908 static int save_normal_page(RAMState *rs, RAMBlock *block, ram_addr_t offset,
1909                             uint8_t *buf, bool async)
1910 {
1911     ram_counters.transferred += save_page_header(rs, rs->f, block,
1912                                                  offset | RAM_SAVE_FLAG_PAGE);
1913     if (async) {
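             /*
              * The last argument tells QEMUFile that the buffer may be freed
              * once it has been written out; that is only wanted when
              * release-ram is active during postcopy.
              */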
1914         qemu_put_buffer_async(rs->f, buf, TARGET_PAGE_SIZE,
1915                               migrate_release_ram() &
1916                               migration_in_postcopy());
1917     } else {
1918         qemu_put_buffer(rs->f, buf, TARGET_PAGE_SIZE);
1919     }
1920     ram_counters.transferred += TARGET_PAGE_SIZE;
1921     ram_counters.normal++;
1922     return 1;
1923 }
1924 
1925 /**
1926  * ram_save_page: send the given page to the stream
1927  *
1928  * Returns the number of pages written.
1929  *          < 0 - error
1930  *          >=0 - Number of pages written - this might legally be 0
1931  *                if xbzrle noticed the page was the same.
1932  *
1933  * @rs: current RAM state
1934  * @pss: data about the state of the current dirty page scan,
1935  *       including the block and page we want to send
1936  * @last_stage: if we are at the completion stage
1937  */
1938 static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
1939 {
1940     int pages = -1;
1941     uint8_t *p;
1942     bool send_async = true;
1943     RAMBlock *block = pss->block;
1944     ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
1945     ram_addr_t current_addr = block->offset + offset;
1946 
1947     p = block->host + offset;
1948     trace_ram_save_page(block->idstr, (uint64_t)offset, p);
1949 
1950     XBZRLE_cache_lock();
1951     if (!rs->ram_bulk_stage && !migration_in_postcopy() &&
1952         migrate_use_xbzrle()) {
1953         pages = save_xbzrle_page(rs, &p, current_addr, block,
1954                                  offset, last_stage);
1955         if (!last_stage) {
1956             /* Can't send this cached data async, since the cache page
1957              * might get updated before it gets to the wire
1958              */
1959             send_async = false;
1960         }
1961     }
1962 
1963     /* XBZRLE overflow or normal page */
1964     if (pages == -1) {
1965         pages = save_normal_page(rs, block, offset, p, send_async);
1966     }
1967 
1968     XBZRLE_cache_unlock();
1969 
1970     return pages;
1971 }
1972 
1973 static int ram_save_multifd_page(RAMState *rs, RAMBlock *block,
1974                                  ram_addr_t offset)
1975 {
1976     multifd_queue_page(block, offset);
1977     ram_counters.normal++;
1978 
1979     return 1;
1980 }
1981 
1982 static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
1983                                  ram_addr_t offset, uint8_t *source_buf)
1984 {
1985     RAMState *rs = ram_state;
1986     uint8_t *p = block->host + (offset & TARGET_PAGE_MASK);
1987     bool zero_page = false;
1988     int ret;
1989 
1990     if (save_zero_page_to_file(rs, f, block, offset)) {
1991         zero_page = true;
1992         goto exit;
1993     }
1994 
1995     save_page_header(rs, f, block, offset | RAM_SAVE_FLAG_COMPRESS_PAGE);
1996 
1997     /*
1998      * copy it to an internal buffer so it cannot be modified by the VM
1999      * underneath us, and so that any error during compression or
2000      * decompression can be caught
2001      */
2002     memcpy(source_buf, p, TARGET_PAGE_SIZE);
2003     ret = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE);
2004     if (ret < 0) {
2005         qemu_file_set_error(migrate_get_current()->to_dst_file, ret);
2006         error_report("compressed data failed!");
2007         return false;
2008     }
2009 
2010 exit:
2011     ram_release_pages(block->idstr, offset & TARGET_PAGE_MASK, 1);
2012     return zero_page;
2013 }
2014 
2015 static void
2016 update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
2017 {
2018     ram_counters.transferred += bytes_xmit;
2019 
2020     if (param->zero_page) {
2021         ram_counters.duplicate++;
2022         return;
2023     }
2024 
2025     /* Subtract the 8-byte page header (offset | RAM_SAVE_FLAG_CONTINUE, no block name). */
2026     compression_counters.compressed_size += bytes_xmit - 8;
2027     compression_counters.pages++;
2028 }
2029 
2030 static bool save_page_use_compression(RAMState *rs);
2031 
2032 static void flush_compressed_data(RAMState *rs)
2033 {
2034     int idx, len, thread_count;
2035 
2036     if (!save_page_use_compression(rs)) {
2037         return;
2038     }
2039     thread_count = migrate_compress_threads();
2040 
2041     qemu_mutex_lock(&comp_done_lock);
2042     for (idx = 0; idx < thread_count; idx++) {
2043         while (!comp_param[idx].done) {
2044             qemu_cond_wait(&comp_done_cond, &comp_done_lock);
2045         }
2046     }
2047     qemu_mutex_unlock(&comp_done_lock);
2048 
2049     for (idx = 0; idx < thread_count; idx++) {
2050         qemu_mutex_lock(&comp_param[idx].mutex);
2051         if (!comp_param[idx].quit) {
2052             len = qemu_put_qemu_file(rs->f, comp_param[idx].file);
2053             /*
2054              * it's safe to fetch zero_page without holding comp_done_lock
2055              * as there is no further request submitted to the thread,
2056              * i.e., the thread should be waiting for a request at this point.
2057              */
2058             update_compress_thread_counts(&comp_param[idx], len);
2059         }
2060         qemu_mutex_unlock(&comp_param[idx].mutex);
2061     }
2062 }
2063 
2064 static inline void set_compress_params(CompressParam *param, RAMBlock *block,
2065                                        ram_addr_t offset)
2066 {
2067     param->block = block;
2068     param->offset = offset;
2069 }
2070 
2071 static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block,
2072                                            ram_addr_t offset)
2073 {
2074     int idx, thread_count, bytes_xmit = -1, pages = -1;
2075     bool wait = migrate_compress_wait_thread();
2076 
2077     thread_count = migrate_compress_threads();
2078     qemu_mutex_lock(&comp_done_lock);
2079 retry:
2080     for (idx = 0; idx < thread_count; idx++) {
2081         if (comp_param[idx].done) {
2082             comp_param[idx].done = false;
2083             bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file);
2084             qemu_mutex_lock(&comp_param[idx].mutex);
2085             set_compress_params(&comp_param[idx], block, offset);
2086             qemu_cond_signal(&comp_param[idx].cond);
2087             qemu_mutex_unlock(&comp_param[idx].mutex);
2088             pages = 1;
2089             update_compress_thread_counts(&comp_param[idx], bytes_xmit);
2090             break;
2091         }
2092     }
2093 
2094     /*
2095      * wait for a free thread if the user specifies 'compress-wait-thread',
2096      * otherwise we will post the page out in the main thread as a normal page.
2097      */
2098     if (pages < 0 && wait) {
2099         qemu_cond_wait(&comp_done_cond, &comp_done_lock);
2100         goto retry;
2101     }
2102     qemu_mutex_unlock(&comp_done_lock);
2103 
2104     return pages;
2105 }
2106 
2107 /**
2108  * find_dirty_block: find the next dirty page and update any state
2109  * associated with the search process.
2110  *
2111  * Returns whether a page is found
2112  *
2113  * @rs: current RAM state
2114  * @pss: data about the state of the current dirty page scan
2115  * @again: set to false if the search has scanned the whole of RAM
2116  */
2117 static bool find_dirty_block(RAMState *rs, PageSearchStatus *pss, bool *again)
2118 {
2119     pss->page = migration_bitmap_find_dirty(rs, pss->block, pss->page);
2120     if (pss->complete_round && pss->block == rs->last_seen_block &&
2121         pss->page >= rs->last_page) {
2122         /*
2123          * We've been once around the RAM and haven't found anything.
2124          * Give up.
2125          */
2126         *again = false;
2127         return false;
2128     }
2129     if ((pss->page << TARGET_PAGE_BITS) >= pss->block->used_length) {
2130         /* Didn't find anything in this RAM Block */
2131         pss->page = 0;
2132         pss->block = QLIST_NEXT_RCU(pss->block, next);
2133         if (!pss->block) {
2134             /*
2135              * If memory migration starts over, we will meet a dirtied page
2136              * which may still exist in the compression threads' ring, so we
2137              * should flush the compressed data to make sure the new page
2138              * is not overwritten by the old one in the destination.
2139              *
2140              * Also, if xbzrle is on, stop using the data compression at this
2141              * point. In theory, xbzrle can do better than compression.
2142              */
2143             flush_compressed_data(rs);
2144 
2145             /* Hit the end of the list */
2146             pss->block = QLIST_FIRST_RCU(&ram_list.blocks);
2147             /* Flag that we've looped */
2148             pss->complete_round = true;
2149             rs->ram_bulk_stage = false;
2150         }
2151         /* Didn't find anything this time, but try again on the new block */
2152         *again = true;
2153         return false;
2154     } else {
2155         /* Can go around again, but... */
2156         *again = true;
2157         /* We've found something so probably don't need to */
2158         return true;
2159     }
2160 }
2161 
2162 /**
2163  * unqueue_page: gets a page off the queue
2164  *
2165  * Helper for 'get_queued_page' - gets a page off the queue
2166  *
2167  * Returns the block of the page (or NULL if none available)
2168  *
2169  * @rs: current RAM state
2170  * @offset: used to return the offset within the RAMBlock
2171  */
2172 static RAMBlock *unqueue_page(RAMState *rs, ram_addr_t *offset)
2173 {
2174     RAMBlock *block = NULL;
2175 
2176     if (QSIMPLEQ_EMPTY_ATOMIC(&rs->src_page_requests)) {
2177         return NULL;
2178     }
2179 
2180     qemu_mutex_lock(&rs->src_page_req_mutex);
2181     if (!QSIMPLEQ_EMPTY(&rs->src_page_requests)) {
2182         struct RAMSrcPageRequest *entry =
2183                                 QSIMPLEQ_FIRST(&rs->src_page_requests);
2184         block = entry->rb;
2185         *offset = entry->offset;
2186 
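             /*
              * A request can cover more than one target page: hand back the
              * first page and keep the shrunk remainder queued; only when the
              * last page is consumed do we drop the entry and its memory
              * region reference.
              */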
2187         if (entry->len > TARGET_PAGE_SIZE) {
2188             entry->len -= TARGET_PAGE_SIZE;
2189             entry->offset += TARGET_PAGE_SIZE;
2190         } else {
2191             memory_region_unref(block->mr);
2192             QSIMPLEQ_REMOVE_HEAD(&rs->src_page_requests, next_req);
2193             g_free(entry);
2194             migration_consume_urgent_request();
2195         }
2196     }
2197     qemu_mutex_unlock(&rs->src_page_req_mutex);
2198 
2199     return block;
2200 }
2201 
2202 /**
2203  * get_queued_page: unqueue a page from the postcopy requests
2204  *
2205  * Skips pages that are already sent (!dirty)
2206  *
2207  * Returns whether a queued page is found
2208  *
2209  * @rs: current RAM state
2210  * @pss: data about the state of the current dirty page scan
2211  */
2212 static bool get_queued_page(RAMState *rs, PageSearchStatus *pss)
2213 {
2214     RAMBlock  *block;
2215     ram_addr_t offset;
2216     bool dirty;
2217 
2218     do {
2219         block = unqueue_page(rs, &offset);
2220         /*
2221          * We're sending this page, and since it's postcopy nothing else
2222          * will dirty it, and we must make sure it doesn't get sent again
2223          * even if this queue request was received after the background
2224          * search already sent it.
2225          */
2226         if (block) {
2227             unsigned long page;
2228 
2229             page = offset >> TARGET_PAGE_BITS;
2230             dirty = test_bit(page, block->bmap);
2231             if (!dirty) {
2232                 trace_get_queued_page_not_dirty(block->idstr, (uint64_t)offset,
2233                        page, test_bit(page, block->unsentmap));
2234             } else {
2235                 trace_get_queued_page(block->idstr, (uint64_t)offset, page);
2236             }
2237         }
2238 
2239     } while (block && !dirty);
2240 
2241     if (block) {
2242         /*
2243          * As soon as we start servicing pages out of order, we have
2244          * to kill the bulk stage, since the bulk stage assumes (in
2245          * migration_bitmap_find_dirty) that every page is dirty, and
2246          * that is no longer true.
2247          */
2248         rs->ram_bulk_stage = false;
2249 
2250         /*
2251          * We want the background search to continue from the queued page
2252          * since the guest is likely to want other pages near to the page
2253          * it just requested.
2254          */
2255         pss->block = block;
2256         pss->page = offset >> TARGET_PAGE_BITS;
2257     }
2258 
2259     return !!block;
2260 }
2261 
2262 /**
2263  * migration_page_queue_free: drop any remaining pages in the ram
2264  * request queue
2265  *
2266  * It should be empty at the end anyway, but in error cases there may
2267  * be some left.  In case any pages are left, we drop them.
2268  *
2269  */
2270 static void migration_page_queue_free(RAMState *rs)
2271 {
2272     struct RAMSrcPageRequest *mspr, *next_mspr;
2273     /* This queue generally should be empty - but in the case of a failed
2274      * migration it might have some leftover entries.
2275      */
2276     rcu_read_lock();
2277     QSIMPLEQ_FOREACH_SAFE(mspr, &rs->src_page_requests, next_req, next_mspr) {
2278         memory_region_unref(mspr->rb->mr);
2279         QSIMPLEQ_REMOVE_HEAD(&rs->src_page_requests, next_req);
2280         g_free(mspr);
2281     }
2282     rcu_read_unlock();
2283 }
2284 
2285 /**
2286  * ram_save_queue_pages: queue the page for transmission
2287  *
2288  * A request from the postcopy destination, for example.
2289  *
2290  * Returns zero on success or negative on error
2291  *
2292  * @rbname: Name of the RAMBlock of the request. NULL means the
2293  *          same as the last one.
2294  * @start: starting address from the start of the RAMBlock
2295  * @len: length (in bytes) to send
2296  */
2297 int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len)
2298 {
2299     RAMBlock *ramblock;
2300     RAMState *rs = ram_state;
2301 
2302     ram_counters.postcopy_requests++;
2303     rcu_read_lock();
2304     if (!rbname) {
2305         /* Reuse last RAMBlock */
2306         ramblock = rs->last_req_rb;
2307 
2308         if (!ramblock) {
2309             /*
2310              * Shouldn't happen, we can't reuse the last RAMBlock if
2311              * it's the 1st request.
2312              */
2313             error_report("ram_save_queue_pages no previous block");
2314             goto err;
2315         }
2316     } else {
2317         ramblock = qemu_ram_block_by_name(rbname);
2318 
2319         if (!ramblock) {
2320             /* We shouldn't be asked for a non-existent RAMBlock */
2321             error_report("ram_save_queue_pages no block '%s'", rbname);
2322             goto err;
2323         }
2324         rs->last_req_rb = ramblock;
2325     }
2326     trace_ram_save_queue_pages(ramblock->idstr, start, len);
2327     if (start + len > ramblock->used_length) {
2328         error_report("%s request overrun start=" RAM_ADDR_FMT " len="
2329                      RAM_ADDR_FMT " blocklen=" RAM_ADDR_FMT,
2330                      __func__, start, len, ramblock->used_length);
2331         goto err;
2332     }
2333 
2334     struct RAMSrcPageRequest *new_entry =
2335         g_malloc0(sizeof(struct RAMSrcPageRequest));
2336     new_entry->rb = ramblock;
2337     new_entry->offset = start;
2338     new_entry->len = len;
2339 
2340     memory_region_ref(ramblock->mr);
2341     qemu_mutex_lock(&rs->src_page_req_mutex);
2342     QSIMPLEQ_INSERT_TAIL(&rs->src_page_requests, new_entry, next_req);
2343     migration_make_urgent_request();
2344     qemu_mutex_unlock(&rs->src_page_req_mutex);
2345     rcu_read_unlock();
2346 
2347     return 0;
2348 
2349 err:
2350     rcu_read_unlock();
2351     return -1;
2352 }
2353 
2354 static bool save_page_use_compression(RAMState *rs)
2355 {
2356     if (!migrate_use_compression()) {
2357         return false;
2358     }
2359 
2360     /*
2361      * If xbzrle is on, stop using the data compression after first
2362      * round of migration even if compression is enabled. In theory,
2363      * xbzrle can do better than compression.
2364      */
2365     if (rs->ram_bulk_stage || !migrate_use_xbzrle()) {
2366         return true;
2367     }
2368 
2369     return false;
2370 }
2371 
2372 /*
2373  * try to compress the page before posting it out; return true if the page
2374  * has been properly handled by compression, otherwise it needs other
2375  * paths to handle it
2376  */
2377 static bool save_compress_page(RAMState *rs, RAMBlock *block, ram_addr_t offset)
2378 {
2379     if (!save_page_use_compression(rs)) {
2380         return false;
2381     }
2382 
2383     /*
2384      * When starting the process of a new block, the first page of
2385      * the block should be sent out before other pages in the same
2386      * block, and all the pages in the last block should have been sent
2387      * out; keeping this order is important because the 'cont' flag
2388      * is used to avoid resending the block name.
2389      *
2390      * We post the first page as a normal page since compression would
2391      * take much CPU resource.
2392      */
2393     if (block != rs->last_sent_block) {
2394         flush_compressed_data(rs);
2395         return false;
2396     }
2397 
2398     if (compress_page_with_multi_thread(rs, block, offset) > 0) {
2399         return true;
2400     }
2401 
2402     compression_counters.busy++;
2403     return false;
2404 }
2405 
2406 /**
2407  * ram_save_target_page: save one target page
2408  *
2409  * Returns the number of pages written
2410  *
2411  * @rs: current RAM state
2412  * @pss: data about the page we want to send
2413  * @last_stage: if we are at the completion stage
2414  */
2415 static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss,
2416                                 bool last_stage)
2417 {
2418     RAMBlock *block = pss->block;
2419     ram_addr_t offset = pss->page << TARGET_PAGE_BITS;
2420     int res;
2421 
2422     if (control_save_page(rs, block, offset, &res)) {
2423         return res;
2424     }
2425 
2426     if (save_compress_page(rs, block, offset)) {
2427         return 1;
2428     }
2429 
2430     res = save_zero_page(rs, block, offset);
2431     if (res > 0) {
2432         /* Must let xbzrle know, otherwise a previous (now 0'd) cached
2433          * page would be stale
2434          */
2435         if (!save_page_use_compression(rs)) {
2436             XBZRLE_cache_lock();
2437             xbzrle_cache_zero_page(rs, block->offset + offset);
2438             XBZRLE_cache_unlock();
2439         }
2440         ram_release_pages(block->idstr, offset, res);
2441         return res;
2442     }
2443 
2444     /*
2445      * do not use multifd for compression as the first page in the new
2446      * block should be posted out before sending the compressed page
2447      */
2448     if (!save_page_use_compression(rs) && migrate_use_multifd()) {
2449         return ram_save_multifd_page(rs, block, offset);
2450     }
2451 
2452     return ram_save_page(rs, pss, last_stage);
2453 }
2454 
2455 /**
2456  * ram_save_host_page: save a whole host page
2457  *
2458  * Starting at pss->page, send pages up to the end of the current host
2459  * page. It's valid for the initial page to point into the middle of
2460  * a host page, in which case the remainder of the host page is sent.
2461  * Only dirty target pages are sent. Note that the host page size may
2462  * be a huge page for this block.
2463  * The saving stops at the boundary of the used_length of the block
2464  * if the RAMBlock isn't a multiple of the host page size.
2465  *
2466  * Returns the number of pages written or negative on error
2467  *
2468  * @rs: current RAM state
2470  * @pss: data about the page we want to send
2471  * @last_stage: if we are at the completion stage
2472  */
2473 static int ram_save_host_page(RAMState *rs, PageSearchStatus *pss,
2474                               bool last_stage)
2475 {
2476     int tmppages, pages = 0;
2477     size_t pagesize_bits =
2478         qemu_ram_pagesize(pss->block) >> TARGET_PAGE_BITS;
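         /*
          * pagesize_bits is the host page size of this RAMBlock expressed in
          * target pages, so it is greater than 1 for hugepage-backed blocks.
          */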
2479 
2480     if (ramblock_is_ignored(pss->block)) {
2481         error_report("block %s should not be migrated !", pss->block->idstr);
2482         return 0;
2483     }
2484 
2485     do {
2486         /* Check if the page is dirty and if it is, send it */
2487         if (!migration_bitmap_clear_dirty(rs, pss->block, pss->page)) {
2488             pss->page++;
2489             continue;
2490         }
2491 
2492         tmppages = ram_save_target_page(rs, pss, last_stage);
2493         if (tmppages < 0) {
2494             return tmppages;
2495         }
2496 
2497         pages += tmppages;
2498         if (pss->block->unsentmap) {
2499             clear_bit(pss->page, pss->block->unsentmap);
2500         }
2501 
2502         pss->page++;
2503     } while ((pss->page & (pagesize_bits - 1)) &&
2504              offset_in_ramblock(pss->block, pss->page << TARGET_PAGE_BITS));
2505 
2506     /* The offset we leave with is the last one we looked at */
2507     pss->page--;
2508     return pages;
2509 }
2510 
2511 /**
2512  * ram_find_and_save_block: finds a dirty page and sends it to f
2513  *
2514  * Called within an RCU critical section.
2515  *
2516  * Returns the number of pages written where zero means no dirty pages,
2517  * or negative on error
2518  *
2519  * @rs: current RAM state
2520  * @last_stage: if we are at the completion stage
2521  *
2522  * On systems where host-page-size > target-page-size it will send all the
2523  * pages in a host page that are dirty.
2524  */
2525 
2526 static int ram_find_and_save_block(RAMState *rs, bool last_stage)
2527 {
2528     PageSearchStatus pss;
2529     int pages = 0;
2530     bool again, found;
2531 
2532     /* No dirty page as there is zero RAM */
2533     if (!ram_bytes_total()) {
2534         return pages;
2535     }
2536 
2537     pss.block = rs->last_seen_block;
2538     pss.page = rs->last_page;
2539     pss.complete_round = false;
2540 
2541     if (!pss.block) {
2542         pss.block = QLIST_FIRST_RCU(&ram_list.blocks);
2543     }
2544 
2545     do {
2546         again = true;
2547         found = get_queued_page(rs, &pss);
2548 
2549         if (!found) {
2550             /* priority queue empty, so just search for something dirty */
2551             found = find_dirty_block(rs, &pss, &again);
2552         }
2553 
2554         if (found) {
2555             pages = ram_save_host_page(rs, &pss, last_stage);
2556         }
2557     } while (!pages && again);
2558 
2559     rs->last_seen_block = pss.block;
2560     rs->last_page = pss.page;
2561 
2562     return pages;
2563 }
2564 
2565 void acct_update_position(QEMUFile *f, size_t size, bool zero)
2566 {
2567     uint64_t pages = size / TARGET_PAGE_SIZE;
2568 
2569     if (zero) {
2570         ram_counters.duplicate += pages;
2571     } else {
2572         ram_counters.normal += pages;
2573         ram_counters.transferred += size;
2574         qemu_update_position(f, size);
2575     }
2576 }
2577 
2578 static uint64_t ram_bytes_total_common(bool count_ignored)
2579 {
2580     RAMBlock *block;
2581     uint64_t total = 0;
2582 
2583     rcu_read_lock();
2584     if (count_ignored) {
2585         RAMBLOCK_FOREACH_MIGRATABLE(block) {
2586             total += block->used_length;
2587         }
2588     } else {
2589         RAMBLOCK_FOREACH_NOT_IGNORED(block) {
2590             total += block->used_length;
2591         }
2592     }
2593     rcu_read_unlock();
2594     return total;
2595 }
2596 
2597 uint64_t ram_bytes_total(void)
2598 {
2599     return ram_bytes_total_common(false);
2600 }
2601 
2602 static void xbzrle_load_setup(void)
2603 {
2604     XBZRLE.decoded_buf = g_malloc(TARGET_PAGE_SIZE);
2605 }
2606 
2607 static void xbzrle_load_cleanup(void)
2608 {
2609     g_free(XBZRLE.decoded_buf);
2610     XBZRLE.decoded_buf = NULL;
2611 }
2612 
2613 static void ram_state_cleanup(RAMState **rsp)
2614 {
2615     if (*rsp) {
2616         migration_page_queue_free(*rsp);
2617         qemu_mutex_destroy(&(*rsp)->bitmap_mutex);
2618         qemu_mutex_destroy(&(*rsp)->src_page_req_mutex);
2619         g_free(*rsp);
2620         *rsp = NULL;
2621     }
2622 }
2623 
2624 static void xbzrle_cleanup(void)
2625 {
2626     XBZRLE_cache_lock();
2627     if (XBZRLE.cache) {
2628         cache_fini(XBZRLE.cache);
2629         g_free(XBZRLE.encoded_buf);
2630         g_free(XBZRLE.current_buf);
2631         g_free(XBZRLE.zero_target_page);
2632         XBZRLE.cache = NULL;
2633         XBZRLE.encoded_buf = NULL;
2634         XBZRLE.current_buf = NULL;
2635         XBZRLE.zero_target_page = NULL;
2636     }
2637     XBZRLE_cache_unlock();
2638 }
2639 
2640 static void ram_save_cleanup(void *opaque)
2641 {
2642     RAMState **rsp = opaque;
2643     RAMBlock *block;
2644 
2645     /* the caller holds the iothread lock or is in a bh, so there is
2646      * no writing race against this migration_bitmap
2647      */
2648     memory_global_dirty_log_stop();
2649 
2650     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
2651         g_free(block->bmap);
2652         block->bmap = NULL;
2653         g_free(block->unsentmap);
2654         block->unsentmap = NULL;
2655     }
2656 
2657     xbzrle_cleanup();
2658     compress_threads_save_cleanup();
2659     ram_state_cleanup(rsp);
2660 }
2661 
2662 static void ram_state_reset(RAMState *rs)
2663 {
2664     rs->last_seen_block = NULL;
2665     rs->last_sent_block = NULL;
2666     rs->last_page = 0;
2667     rs->last_version = ram_list.version;
2668     rs->ram_bulk_stage = true;
2669     rs->fpo_enabled = false;
2670 }
2671 
2672 #define MAX_WAIT 50 /* ms, half buffered_file limit */
2673 
2674 /*
2675  * 'expected' is the value you expect the bitmap mostly to be full
2676  * of; it won't bother printing lines that are all this value.
2677  * 'todump' is the bitmap to dump and 'pages' is its length in bits.
2678  */
2679 void ram_debug_dump_bitmap(unsigned long *todump, bool expected,
2680                            unsigned long pages)
2681 {
2682     int64_t cur;
2683     int64_t linelen = 128;
2684     char linebuf[129];
2685 
2686     for (cur = 0; cur < pages; cur += linelen) {
2687         int64_t curb;
2688         bool found = false;
2689         /*
2690          * Last line; catch the case where the line length
2691          * is longer than remaining ram
2692          */
2693         if (cur + linelen > pages) {
2694             linelen = pages - cur;
2695         }
2696         for (curb = 0; curb < linelen; curb++) {
2697             bool thisbit = test_bit(cur + curb, todump);
2698             linebuf[curb] = thisbit ? '1' : '.';
2699             found = found || (thisbit != expected);
2700         }
2701         if (found) {
2702             linebuf[curb] = '\0';
2703             fprintf(stderr,  "0x%08" PRIx64 " : %s\n", cur, linebuf);
2704         }
2705     }
2706 }
2707 
2708 /* **** functions for postcopy ***** */
2709 
2710 void ram_postcopy_migrated_memory_release(MigrationState *ms)
2711 {
2712     struct RAMBlock *block;
2713 
2714     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
2715         unsigned long *bitmap = block->bmap;
2716         unsigned long range = block->used_length >> TARGET_PAGE_BITS;
2717         unsigned long run_start = find_next_zero_bit(bitmap, range, 0);
2718 
2719         while (run_start < range) {
2720             unsigned long run_end = find_next_bit(bitmap, range, run_start + 1);
2721             ram_discard_range(block->idstr, run_start << TARGET_PAGE_BITS,
2722                               (run_end - run_start) << TARGET_PAGE_BITS);
2723             run_start = find_next_zero_bit(bitmap, range, run_end + 1);
2724         }
2725     }
2726 }
2727 
2728 /**
2729  * postcopy_send_discard_bm_ram: discard a RAMBlock
2730  *
2731  * Returns zero on success
2732  *
2733  * Callback from postcopy_each_ram_send_discard for each RAMBlock
2734  * Note: At this point the 'unsentmap' is the processed bitmap combined
2735  *       with the dirtymap; so a '1' means it's either dirty or unsent.
2736  *
2737  * @ms: current migration state
2738  * @pds: state for postcopy
2739  * @block: RAMBlock to discard
2741  */
2742 static int postcopy_send_discard_bm_ram(MigrationState *ms,
2743                                         PostcopyDiscardState *pds,
2744                                         RAMBlock *block)
2745 {
2746     unsigned long end = block->used_length >> TARGET_PAGE_BITS;
2747     unsigned long current;
2748     unsigned long *unsentmap = block->unsentmap;
2749 
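         /*
          * Walk the unsentmap and send each run of set bits as a single
          * discard range (start page plus length in pages).
          */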
2750     for (current = 0; current < end; ) {
2751         unsigned long one = find_next_bit(unsentmap, end, current);
2752 
2753         if (one <= end) {
2754             unsigned long zero = find_next_zero_bit(unsentmap, end, one + 1);
2755             unsigned long discard_length;
2756 
2757             if (zero >= end) {
2758                 discard_length = end - one;
2759             } else {
2760                 discard_length = zero - one;
2761             }
2762             if (discard_length) {
2763                 postcopy_discard_send_range(ms, pds, one, discard_length);
2764             }
2765             current = one + discard_length;
2766         } else {
2767             current = one;
2768         }
2769     }
2770 
2771     return 0;
2772 }
2773 
2774 /**
2775  * postcopy_each_ram_send_discard: discard all RAMBlocks
2776  *
2777  * Returns 0 for success or negative for error
2778  *
2779  * Utility for the outgoing postcopy code.
2780  *   Calls postcopy_send_discard_bm_ram for each RAMBlock
2781  *   passing it bitmap indexes and name.
2782  * (qemu_ram_foreach_block ends up passing unscaled lengths
2783  *  which would mean postcopy code would have to deal with target page)
2784  *
2785  * @ms: current migration state
2786  */
2787 static int postcopy_each_ram_send_discard(MigrationState *ms)
2788 {
2789     struct RAMBlock *block;
2790     int ret;
2791 
2792     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
2793         PostcopyDiscardState *pds =
2794             postcopy_discard_send_init(ms, block->idstr);
2795 
2796         /*
2797          * Postcopy sends chunks of bitmap over the wire, but it
2798          * just needs indexes at this point, avoids it having
2799          * target page specific code.
2800          */
2801         ret = postcopy_send_discard_bm_ram(ms, pds, block);
2802         postcopy_discard_send_finish(ms, pds);
2803         if (ret) {
2804             return ret;
2805         }
2806     }
2807 
2808     return 0;
2809 }
2810 
2811 /**
2812  * postcopy_chunk_hostpages_pass: canonicalize bitmap in hostpages
2813  *
2814  * Helper for postcopy_chunk_hostpages; it's called twice to
2815  * canonicalize the two bitmaps, which are similar, but one is
2816  * inverted.
2817  *
2818  * Postcopy requires that all target pages in a hostpage are dirty or
2819  * clean, not a mix.  This function canonicalizes the bitmaps.
2820  *
2821  * @ms: current migration state
2822  * @unsent_pass: if true we need to canonicalize partially unsent host pages
2823  *               otherwise we need to canonicalize partially dirty host pages
2824  * @block: block that contains the page we want to canonicalize
2825  * @pds: state for postcopy
2826  */
2827 static void postcopy_chunk_hostpages_pass(MigrationState *ms, bool unsent_pass,
2828                                           RAMBlock *block,
2829                                           PostcopyDiscardState *pds)
2830 {
2831     RAMState *rs = ram_state;
2832     unsigned long *bitmap = block->bmap;
2833     unsigned long *unsentmap = block->unsentmap;
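         /* Number of target pages that make up one host page in this block */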
2834     unsigned int host_ratio = block->page_size / TARGET_PAGE_SIZE;
2835     unsigned long pages = block->used_length >> TARGET_PAGE_BITS;
2836     unsigned long run_start;
2837 
2838     if (block->page_size == TARGET_PAGE_SIZE) {
2839         /* Easy case - TPS==HPS for a non-huge page RAMBlock */
2840         return;
2841     }
2842 
2843     if (unsent_pass) {
2844         /* Find a sent page */
2845         run_start = find_next_zero_bit(unsentmap, pages, 0);
2846     } else {
2847         /* Find a dirty page */
2848         run_start = find_next_bit(bitmap, pages, 0);
2849     }
2850 
2851     while (run_start < pages) {
2852         bool do_fixup = false;
2853         unsigned long fixup_start_addr;
2854         unsigned long host_offset;
2855 
2856         /*
2857          * If the start of this run of pages is in the middle of a host
2858          * page, then we need to fixup this host page.
2859          */
2860         host_offset = run_start % host_ratio;
2861         if (host_offset) {
2862             do_fixup = true;
2863             run_start -= host_offset;
2864             fixup_start_addr = run_start;
2865             /* For the next pass */
2866             run_start = run_start + host_ratio;
2867         } else {
2868             /* Find the end of this run */
2869             unsigned long run_end;
2870             if (unsent_pass) {
2871                 run_end = find_next_bit(unsentmap, pages, run_start + 1);
2872             } else {
2873                 run_end = find_next_zero_bit(bitmap, pages, run_start + 1);
2874             }
2875             /*
2876              * If the end isn't at the start of a host page, then the
2877              * run doesn't finish at the end of a host page
2878              * and we need to discard.
2879              */
2880             host_offset = run_end % host_ratio;
2881             if (host_offset) {
2882                 do_fixup = true;
2883                 fixup_start_addr = run_end - host_offset;
2884                 /*
2885                  * This host page has gone, the next loop iteration starts
2886                  * from after the fixup
2887                  */
2888                 run_start = fixup_start_addr + host_ratio;
2889             } else {
2890                 /*
2891                  * No discards on this iteration, next loop starts from
2892                  * next sent/dirty page
2893                  */
2894                 run_start = run_end + 1;
2895             }
2896         }
2897 
2898         if (do_fixup) {
2899             unsigned long page;
2900 
2901             /* Tell the destination to discard this page */
2902             if (unsent_pass || !test_bit(fixup_start_addr, unsentmap)) {
2903                 /* For the unsent_pass we:
2904                  *     discard partially sent pages
2905                  * For the !unsent_pass (dirty) we:
2906                  *     discard partially dirty pages that were sent
2907                  *     (any partially sent pages were already discarded
2908                  *     by the previous unsent_pass)
2909                  */
2910                 postcopy_discard_send_range(ms, pds, fixup_start_addr,
2911                                             host_ratio);
2912             }
2913 
2914             /* Clean up the bitmap */
2915             for (page = fixup_start_addr;
2916                  page < fixup_start_addr + host_ratio; page++) {
2917                 /* All pages in this host page are now not sent */
2918                 set_bit(page, unsentmap);
2919 
2920                 /*
2921                  * Remark them as dirty, updating the count for any pages
2922                  * that weren't previously dirty.
2923                  */
2924                 rs->migration_dirty_pages += !test_and_set_bit(page, bitmap);
2925             }
2926         }
2927 
2928         if (unsent_pass) {
2929             /* Find the next sent page for the next iteration */
2930             run_start = find_next_zero_bit(unsentmap, pages, run_start);
2931         } else {
2932             /* Find the next dirty page for the next iteration */
2933             run_start = find_next_bit(bitmap, pages, run_start);
2934         }
2935     }
2936 }
2937 
2938 /**
2939  * postcopy_chunk_hostpages: discard any partially sent host page
2940  *
2941  * Utility for the outgoing postcopy code.
2942  *
2943  * Discard any partially sent host-page size chunks, mark any partially
2944  * dirty host-page size chunks as all dirty.  In this case the host-page
2945  * is the host-page for the particular RAMBlock, i.e. it might be a huge page
2946  *
2947  * Returns zero on success
2948  *
2949  * @ms: current migration state
2950  * @block: block we want to work with
2951  */
2952 static int postcopy_chunk_hostpages(MigrationState *ms, RAMBlock *block)
2953 {
2954     PostcopyDiscardState *pds =
2955         postcopy_discard_send_init(ms, block->idstr);
2956 
2957     /* First pass: Discard all partially sent host pages */
2958     postcopy_chunk_hostpages_pass(ms, true, block, pds);
2959     /*
2960      * Second pass: Ensure that all partially dirty host pages are made
2961      * fully dirty.
2962      */
2963     postcopy_chunk_hostpages_pass(ms, false, block, pds);
2964 
2965     postcopy_discard_send_finish(ms, pds);
2966     return 0;
2967 }
2968 
2969 /**
2970  * ram_postcopy_send_discard_bitmap: transmit the discard bitmap
2971  *
2972  * Returns zero on success
2973  *
2974  * Transmit the set of pages to be discarded after precopy to the target;
2975  * these are pages that:
2976  *     a) Have been previously transmitted but are now dirty again
2977  *     b) Have never been transmitted; this ensures that
2978  *        any pages on the destination that have been mapped by background
2979  *        tasks get discarded (transparent huge pages are the specific concern)
2980  * Hopefully this is pretty sparse
2981  *
2982  * @ms: current migration state
2983  */
2984 int ram_postcopy_send_discard_bitmap(MigrationState *ms)
2985 {
2986     RAMState *rs = ram_state;
2987     RAMBlock *block;
2988     int ret;
2989 
2990     rcu_read_lock();
2991 
2992     /* This should be our last sync, the src is now paused */
2993     migration_bitmap_sync(rs);
2994 
2995     /* Easiest way to make sure we don't resume in the middle of a host-page */
2996     rs->last_seen_block = NULL;
2997     rs->last_sent_block = NULL;
2998     rs->last_page = 0;
2999 
3000     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3001         unsigned long pages = block->used_length >> TARGET_PAGE_BITS;
3002         unsigned long *bitmap = block->bmap;
3003         unsigned long *unsentmap = block->unsentmap;
3004 
3005         if (!unsentmap) {
3006             /* We don't have a safe way to resize the unsentmap, so
3007              * if the bitmap was resized it will be NULL at this
3008              * point.
3009              */
3010             error_report("migration ram resized during precopy phase");
3011             rcu_read_unlock();
3012             return -EINVAL;
3013         }
3014         /* Deal with TPS != HPS and huge pages */
3015         ret = postcopy_chunk_hostpages(ms, block);
3016         if (ret) {
3017             rcu_read_unlock();
3018             return ret;
3019         }
3020 
3021         /*
3022          * Update the unsentmap to be unsentmap = unsentmap | dirty
3023          */
3024         bitmap_or(unsentmap, unsentmap, bitmap, pages);
3025 #ifdef DEBUG_POSTCOPY
3026         ram_debug_dump_bitmap(unsentmap, true, pages);
3027 #endif
3028     }
3029     trace_ram_postcopy_send_discard_bitmap();
3030 
3031     ret = postcopy_each_ram_send_discard(ms);
3032     rcu_read_unlock();
3033 
3034     return ret;
3035 }
3036 
3037 /**
3038  * ram_discard_range: discard dirtied pages at the beginning of postcopy
3039  *
3040  * Returns zero on success
3041  *
3042  * @rbname: name of the RAMBlock of the request. NULL means the
3043  *          same as the last one.
3044  * @start: byte offset within the RAMBlock
3045  * @length: length (in bytes) to discard
3046  */
3047 int ram_discard_range(const char *rbname, uint64_t start, size_t length)
3048 {
3049     int ret = -1;
3050 
3051     trace_ram_discard_range(rbname, start, length);
3052 
3053     rcu_read_lock();
3054     RAMBlock *rb = qemu_ram_block_by_name(rbname);
3055 
3056     if (!rb) {
3057         error_report("ram_discard_range: Failed to find block '%s'", rbname);
3058         goto err;
3059     }
3060 
3061     /*
3062      * On source VM, we don't need to update the received bitmap since
3063      * we don't even have one.
3064      */
3065     if (rb->receivedmap) {
3066         bitmap_clear(rb->receivedmap, start >> qemu_target_page_bits(),
3067                      length >> qemu_target_page_bits());
3068     }
3069 
3070     ret = ram_block_discard_range(rb, start, length);
3071 
3072 err:
3073     rcu_read_unlock();
3074 
3075     return ret;
3076 }
3077 
3078 /*
3079  * For every allocation, we will try not to crash the VM if the
3080  * allocation fails.
3081  */
3082 static int xbzrle_init(void)
3083 {
3084     Error *local_err = NULL;
3085 
3086     if (!migrate_use_xbzrle()) {
3087         return 0;
3088     }
3089 
3090     XBZRLE_cache_lock();
3091 
3092     XBZRLE.zero_target_page = g_try_malloc0(TARGET_PAGE_SIZE);
3093     if (!XBZRLE.zero_target_page) {
3094         error_report("%s: Error allocating zero page", __func__);
3095         goto err_out;
3096     }
3097 
3098     XBZRLE.cache = cache_init(migrate_xbzrle_cache_size(),
3099                               TARGET_PAGE_SIZE, &local_err);
3100     if (!XBZRLE.cache) {
3101         error_report_err(local_err);
3102         goto free_zero_page;
3103     }
3104 
3105     XBZRLE.encoded_buf = g_try_malloc0(TARGET_PAGE_SIZE);
3106     if (!XBZRLE.encoded_buf) {
3107         error_report("%s: Error allocating encoded_buf", __func__);
3108         goto free_cache;
3109     }
3110 
3111     XBZRLE.current_buf = g_try_malloc(TARGET_PAGE_SIZE);
3112     if (!XBZRLE.current_buf) {
3113         error_report("%s: Error allocating current_buf", __func__);
3114         goto free_encoded_buf;
3115     }
3116 
3117     /* We are all good */
3118     XBZRLE_cache_unlock();
3119     return 0;
3120 
3121 free_encoded_buf:
3122     g_free(XBZRLE.encoded_buf);
3123     XBZRLE.encoded_buf = NULL;
3124 free_cache:
3125     cache_fini(XBZRLE.cache);
3126     XBZRLE.cache = NULL;
3127 free_zero_page:
3128     g_free(XBZRLE.zero_target_page);
3129     XBZRLE.zero_target_page = NULL;
3130 err_out:
3131     XBZRLE_cache_unlock();
3132     return -ENOMEM;
3133 }
3134 
3135 static int ram_state_init(RAMState **rsp)
3136 {
3137     *rsp = g_try_new0(RAMState, 1);
3138 
3139     if (!*rsp) {
3140         error_report("%s: Init ramstate fail", __func__);
3141         return -1;
3142     }
3143 
3144     qemu_mutex_init(&(*rsp)->bitmap_mutex);
3145     qemu_mutex_init(&(*rsp)->src_page_req_mutex);
3146     QSIMPLEQ_INIT(&(*rsp)->src_page_requests);
3147 
3148     /*
3149      * Count the total number of pages used by ram blocks not including any
3150      * gaps due to alignment or unplugs.
3151      */
3152     (*rsp)->migration_dirty_pages = ram_bytes_total() >> TARGET_PAGE_BITS;
3153 
3154     ram_state_reset(*rsp);
3155 
3156     return 0;
3157 }
3158 
3159 static void ram_list_init_bitmaps(void)
3160 {
3161     RAMBlock *block;
3162     unsigned long pages;
3163 
3164     /* Skip setting bitmap if there is no RAM */
3165     if (ram_bytes_total()) {
3166         RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3167             pages = block->max_length >> TARGET_PAGE_BITS;
3168             block->bmap = bitmap_new(pages);
3169             bitmap_set(block->bmap, 0, pages);
3170             if (migrate_postcopy_ram()) {
3171                 block->unsentmap = bitmap_new(pages);
3172                 bitmap_set(block->unsentmap, 0, pages);
3173             }
3174         }
3175     }
3176 }
3177 
3178 static void ram_init_bitmaps(RAMState *rs)
3179 {
3180     /* For memory_global_dirty_log_start below.  */
3181     qemu_mutex_lock_iothread();
3182     qemu_mutex_lock_ramlist();
3183     rcu_read_lock();
3184 
3185     ram_list_init_bitmaps();
3186     memory_global_dirty_log_start();
3187     migration_bitmap_sync_precopy(rs);
3188 
3189     rcu_read_unlock();
3190     qemu_mutex_unlock_ramlist();
3191     qemu_mutex_unlock_iothread();
3192 }
3193 
3194 static int ram_init_all(RAMState **rsp)
3195 {
3196     if (ram_state_init(rsp)) {
3197         return -1;
3198     }
3199 
3200     if (xbzrle_init()) {
3201         ram_state_cleanup(rsp);
3202         return -1;
3203     }
3204 
3205     ram_init_bitmaps(*rsp);
3206 
3207     return 0;
3208 }
3209 
3210 static void ram_state_resume_prepare(RAMState *rs, QEMUFile *out)
3211 {
3212     RAMBlock *block;
3213     uint64_t pages = 0;
3214 
3215     /*
3216      * Postcopy is not using xbzrle/compression, so no need for that.
3217      * Also, since the source is already halted, we don't need to care
3218      * about dirty page logging either.
3219      */
3220 
3221     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3222         pages += bitmap_count_one(block->bmap,
3223                                   block->used_length >> TARGET_PAGE_BITS);
3224     }
3225 
3226     /* This may not be aligned with current bitmaps. Recalculate. */
3227     rs->migration_dirty_pages = pages;
3228 
3229     rs->last_seen_block = NULL;
3230     rs->last_sent_block = NULL;
3231     rs->last_page = 0;
3232     rs->last_version = ram_list.version;
3233     /*
3234      * Disable the bulk stage, otherwise we'll resend the whole RAM no
3235      * matter what we have sent.
3236      */
3237     rs->ram_bulk_stage = false;
3238 
3239     /* Update RAMState cache of output QEMUFile */
3240     rs->f = out;
3241 
3242     trace_ram_state_resume_prepare(pages);
3243 }
3244 
3245 /*
3246  * This function clears bits of the free pages reported by the caller from the
3247  * migration dirty bitmap. @addr is the host address corresponding to the
3248  * start of the contiguous guest free pages, and @len is the total bytes of
3249  * those pages.
3250  */
3251 void qemu_guest_free_page_hint(void *addr, size_t len)
3252 {
3253     RAMBlock *block;
3254     ram_addr_t offset;
3255     size_t used_len, start, npages;
3256     MigrationState *s = migrate_get_current();
3257 
3258     /* This function is currently expected to be used during live migration */
3259     if (!migration_is_setup_or_active(s->state)) {
3260         return;
3261     }
3262 
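         /*
          * The hinted range may span more than one RAMBlock; clamp each
          * iteration to the block that contains addr.
          */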
3263     for (; len > 0; len -= used_len, addr += used_len) {
3264         block = qemu_ram_block_from_host(addr, false, &offset);
3265         if (unlikely(!block || offset >= block->used_length)) {
3266             /*
3267              * The implementation might not support RAMBlock resize during
3268              * live migration, but it could happen in theory with future
3269              * updates. So we add a check here to capture that case.
3270              */
3271             error_report_once("%s unexpected error", __func__);
3272             return;
3273         }
3274 
3275         if (len <= block->used_length - offset) {
3276             used_len = len;
3277         } else {
3278             used_len = block->used_length - offset;
3279         }
3280 
3281         start = offset >> TARGET_PAGE_BITS;
3282         npages = used_len >> TARGET_PAGE_BITS;
3283 
3284         qemu_mutex_lock(&ram_state->bitmap_mutex);
3285         ram_state->migration_dirty_pages -=
3286                       bitmap_count_one_with_offset(block->bmap, start, npages);
3287         bitmap_clear(block->bmap, start, npages);
3288         qemu_mutex_unlock(&ram_state->bitmap_mutex);
3289     }
3290 }
3291 
3292 /*
3293  * Each of ram_save_setup, ram_save_iterate and ram_save_complete has
3294  * a long-running RCU critical section.  When RCU reclaims in the code
3295  * start to become numerous it will be necessary to reduce the
3296  * granularity of these critical sections.
3297  */
3298 
3299 /**
3300  * ram_save_setup: Setup RAM for migration
3301  *
3302  * Returns zero to indicate success and negative for error
3303  *
3304  * @f: QEMUFile where to send the data
3305  * @opaque: RAMState pointer
3306  */
3307 static int ram_save_setup(QEMUFile *f, void *opaque)
3308 {
3309     RAMState **rsp = opaque;
3310     RAMBlock *block;
3311 
3312     if (compress_threads_save_setup()) {
3313         return -1;
3314     }
3315 
3316     /* migration has already set up the bitmap, reuse it. */
3317     if (!migration_in_colo_state()) {
3318         if (ram_init_all(rsp) != 0) {
3319             compress_threads_save_cleanup();
3320             return -1;
3321         }
3322     }
3323     (*rsp)->f = f;
3324 
3325     rcu_read_lock();
3326 
3327     qemu_put_be64(f, ram_bytes_total_common(true) | RAM_SAVE_FLAG_MEM_SIZE);
3328 
3329     RAMBLOCK_FOREACH_MIGRATABLE(block) {
3330         qemu_put_byte(f, strlen(block->idstr));
3331         qemu_put_buffer(f, (uint8_t *)block->idstr, strlen(block->idstr));
3332         qemu_put_be64(f, block->used_length);
3333         if (migrate_postcopy_ram() && block->page_size != qemu_host_page_size) {
3334             qemu_put_be64(f, block->page_size);
3335         }
3336         if (migrate_ignore_shared()) {
3337             qemu_put_be64(f, block->mr->addr);
3338             qemu_put_byte(f, ramblock_is_ignored(block) ? 1 : 0);
3339         }
3340     }
3341 
3342     rcu_read_unlock();
3343 
3344     ram_control_before_iterate(f, RAM_CONTROL_SETUP);
3345     ram_control_after_iterate(f, RAM_CONTROL_SETUP);
3346 
3347     multifd_send_sync_main();
3348     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
3349     qemu_fflush(f);
3350 
3351     return 0;
3352 }
3353 
3354 /**
3355  * ram_save_iterate: iterative stage for migration
3356  *
3357  * Returns zero to indicate success and negative for error
3358  *
3359  * @f: QEMUFile where to send the data
3360  * @opaque: RAMState pointer
3361  */
3362 static int ram_save_iterate(QEMUFile *f, void *opaque)
3363 {
3364     RAMState **temp = opaque;
3365     RAMState *rs = *temp;
3366     int ret;
3367     int i;
3368     int64_t t0;
3369     int done = 0;
3370 
3371     if (blk_mig_bulk_active()) {
3372         /* Avoid transferring RAM during the bulk phase of block migration,
3373          * as the bulk phase will usually take a long time and transferring
3374          * RAM updates during that time is pointless. */
3375         goto out;
3376     }
3377 
3378     rcu_read_lock();
3379     if (ram_list.version != rs->last_version) {
3380         ram_state_reset(rs);
3381     }
3382 
3383     /* Read version before ram_list.blocks */
3384     smp_rmb();
3385 
3386     ram_control_before_iterate(f, RAM_CONTROL_ROUND);
3387 
3388     t0 = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
3389     i = 0;
3390     while ((ret = qemu_file_rate_limit(f)) == 0 ||
3391             !QSIMPLEQ_EMPTY(&rs->src_page_requests)) {
3392         int pages;
3393 
3394         if (qemu_file_get_error(f)) {
3395             break;
3396         }
3397 
3398         pages = ram_find_and_save_block(rs, false);
3399         /* no more pages to send */
3400         if (pages == 0) {
3401             done = 1;
3402             break;
3403         }
3404 
3405         if (pages < 0) {
3406             qemu_file_set_error(f, pages);
3407             break;
3408         }
3409 
3410         rs->target_page_count += pages;
3411 
3412         /* We want to check on the first iteration, just in case it was the
3413            first time and we had to sync the dirty bitmap.
3414            qemu_get_clock_ns() is a bit expensive, so we only check every few
3415            iterations.
3416         */
3417         if ((i & 63) == 0) {
3418             uint64_t t1 = (qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - t0) / 1000000;
3419             if (t1 > MAX_WAIT) {
3420                 trace_ram_save_iterate_big_wait(t1, i);
3421                 break;
3422             }
3423         }
3424         i++;
3425     }
3426     rcu_read_unlock();
3427 
3428     /*
3429      * Must occur before EOS (or any QEMUFile operation)
3430      * because of the RDMA protocol.
3431      */
3432     ram_control_after_iterate(f, RAM_CONTROL_ROUND);
3433 
3434     multifd_send_sync_main();
3435 out:
3436     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
3437     qemu_fflush(f);
3438     ram_counters.transferred += 8;
3439 
3440     ret = qemu_file_get_error(f);
3441     if (ret < 0) {
3442         return ret;
3443     }
3444 
3445     return done;
3446 }
3447 
3448 /**
3449  * ram_save_complete: function called to send the remaining amount of ram
3450  *
3451  * Returns zero to indicate success or negative on error
3452  *
3453  * Called with iothread lock
3454  *
3455  * @f: QEMUFile where to send the data
3456  * @opaque: RAMState pointer
3457  */
3458 static int ram_save_complete(QEMUFile *f, void *opaque)
3459 {
3460     RAMState **temp = opaque;
3461     RAMState *rs = *temp;
3462     int ret = 0;
3463 
3464     rcu_read_lock();
3465 
3466     if (!migration_in_postcopy()) {
3467         migration_bitmap_sync_precopy(rs);
3468     }
3469 
3470     ram_control_before_iterate(f, RAM_CONTROL_FINISH);
3471 
3472     /* try transferring iterative blocks of memory */
3473 
3474     /* flush all remaining blocks regardless of rate limiting */
3475     while (true) {
3476         int pages;
3477 
3478         pages = ram_find_and_save_block(rs, !migration_in_colo_state());
3479         /* no more blocks to send */
3480         if (pages == 0) {
3481             break;
3482         }
3483         if (pages < 0) {
3484             ret = pages;
3485             break;
3486         }
3487     }
3488 
3489     flush_compressed_data(rs);
3490     ram_control_after_iterate(f, RAM_CONTROL_FINISH);
3491 
3492     rcu_read_unlock();
3493 
3494     multifd_send_sync_main();
3495     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
3496     qemu_fflush(f);
3497 
3498     return ret;
3499 }
3500 
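/**
 * ram_save_pending: report how much data is still left to send
 *
 * Outside of postcopy, when the estimate drops below @max_size the dirty
 * bitmap is re-synced (under the iothread lock) before recomputing.  The
 * remaining bytes are reported as postcopy-compatible when postcopy RAM
 * is enabled, otherwise as precopy-only.
 *
 * @f: QEMUFile where the data would be sent
 * @opaque: RAMState pointer
 * @max_size: threshold below which we re-sync the dirty bitmap
 * @res_precopy_only: output, data that must be sent before postcopy starts
 * @res_compatible: output, data that can be sent at any stage
 * @res_postcopy_only: output, data that can only be sent during postcopy
 */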
3501 static void ram_save_pending(QEMUFile *f, void *opaque, uint64_t max_size,
3502                              uint64_t *res_precopy_only,
3503                              uint64_t *res_compatible,
3504                              uint64_t *res_postcopy_only)
3505 {
3506     RAMState **temp = opaque;
3507     RAMState *rs = *temp;
3508     uint64_t remaining_size;
3509 
3510     remaining_size = rs->migration_dirty_pages * TARGET_PAGE_SIZE;
3511 
3512     if (!migration_in_postcopy() &&
3513         remaining_size < max_size) {
3514         qemu_mutex_lock_iothread();
3515         rcu_read_lock();
3516         migration_bitmap_sync_precopy(rs);
3517         rcu_read_unlock();
3518         qemu_mutex_unlock_iothread();
3519         remaining_size = rs->migration_dirty_pages * TARGET_PAGE_SIZE;
3520     }
3521 
3522     if (migrate_postcopy_ram()) {
3523         /* We can do postcopy, and all the data is postcopiable */
3524         *res_compatible += remaining_size;
3525     } else {
3526         *res_precopy_only += remaining_size;
3527     }
3528 }
3529 
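/**
 * load_xbzrle: decode one XBZRLE-encoded page from the stream
 *
 * Reads the XBZRLE header (flag byte and encoded length), fetches the
 * encoded data and decodes it on top of the existing contents at @host.
 *
 * Returns 0 on success and -1 on a malformed or oversized encoding.
 *
 * @f: QEMUFile where to read the data from
 * @addr: RAM address of the page (unused here)
 * @host: host address of the page to update
 */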
3530 static int load_xbzrle(QEMUFile *f, ram_addr_t addr, void *host)
3531 {
3532     unsigned int xh_len;
3533     int xh_flags;
3534     uint8_t *loaded_data;
3535 
3536     /* extract RLE header */
3537     xh_flags = qemu_get_byte(f);
3538     xh_len = qemu_get_be16(f);
3539 
3540     if (xh_flags != ENCODING_FLAG_XBZRLE) {
3541         error_report("Failed to load XBZRLE page - wrong compression!");
3542         return -1;
3543     }
3544 
3545     if (xh_len > TARGET_PAGE_SIZE) {
3546         error_report("Failed to load XBZRLE page - len overflow!");
3547         return -1;
3548     }
3549     loaded_data = XBZRLE.decoded_buf;
3550     /* load data and decode */
3551     /* it can change loaded_data to point to an internal buffer */
3552     qemu_get_buffer_in_place(f, &loaded_data, xh_len);
3553 
3554     /* decode RLE */
3555     if (xbzrle_decode_buffer(loaded_data, xh_len, host,
3556                              TARGET_PAGE_SIZE) == -1) {
3557         error_report("Failed to load XBZRLE page - decode error!");
3558         return -1;
3559     }
3560 
3561     return 0;
3562 }
3563 
3564 /**
3565  * ram_block_from_stream: read a RAMBlock id from the migration stream
3566  *
3567  * Must be called from within an RCU critical section.
3568  *
3569  * Returns a pointer from within the RCU-protected ram_list.
3570  *
3571  * @f: QEMUFile where to read the data from
3572  * @flags: Page flags (mostly to see if it's a continuation of previous block)
3573  */
3574 static inline RAMBlock *ram_block_from_stream(QEMUFile *f, int flags)
3575 {
3576     static RAMBlock *block = NULL;
3577     char id[256];
3578     uint8_t len;
3579 
3580     if (flags & RAM_SAVE_FLAG_CONTINUE) {
3581         if (!block) {
3582             error_report("Ack, bad migration stream!");
3583             return NULL;
3584         }
3585         return block;
3586     }
3587 
3588     len = qemu_get_byte(f);
3589     qemu_get_buffer(f, (uint8_t *)id, len);
3590     id[len] = 0;
3591 
3592     block = qemu_ram_block_by_name(id);
3593     if (!block) {
3594         error_report("Can't find block %s", id);
3595         return NULL;
3596     }
3597 
3598     if (ramblock_is_ignored(block)) {
3599         error_report("block %s should not be migrated!", id);
3600         return NULL;
3601     }
3602 
3603     return block;
3604 }
3605 
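/*
 * Return the host address corresponding to @offset within @block, or NULL
 * if the offset lies outside the block's used length.
 */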
3606 static inline void *host_from_ram_block_offset(RAMBlock *block,
3607                                                ram_addr_t offset)
3608 {
3609     if (!offset_in_ramblock(block, offset)) {
3610         return NULL;
3611     }
3612 
3613     return block->host + offset;
3614 }
3615 
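/*
 * Like host_from_ram_block_offset(), but return the address inside the
 * block's COLO cache rather than guest RAM, and mark the page dirty in the
 * block's bitmap so the next checkpoint knows it must be flushed.
 */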
3616 static inline void *colo_cache_from_block_offset(RAMBlock *block,
3617                                                  ram_addr_t offset)
3618 {
3619     if (!offset_in_ramblock(block, offset)) {
3620         return NULL;
3621     }
3622     if (!block->colo_cache) {
3623         error_report("%s: colo_cache is NULL in block :%s",
3624                      __func__, block->idstr);
3625         return NULL;
3626     }
3627 
3628     /*
3629     * During a COLO checkpoint, we need a bitmap of these migrated pages.
3630     * It helps us decide which pages in the RAM cache should be flushed
3631     * into the VM's RAM later.
3632     */
3633     if (!test_and_set_bit(offset >> TARGET_PAGE_BITS, block->bmap)) {
3634         ram_state->migration_dirty_pages++;
3635     }
3636     return block->colo_cache + offset;
3637 }
3638 
3639 /**
3640  * ram_handle_compressed: handle the zero page case
3641  *
3642  * If a page (or a whole RDMA chunk) has been
3643  * determined to be zero, then zap it.
3644  *
3645  * @host: host address for the zero page
3646  * @ch: the byte the page is filled with.  We only support zero
3647  * @size: size of the zero page
3648  */
3649 void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
3650 {
3651     if (ch != 0 || !is_zero_range(host, size)) {
3652         memset(host, ch, size);
3653     }
3654 }
3655 
3656 /* return the size after decompression, or a negative value on error */
3657 static int
3658 qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
3659                      const uint8_t *source, size_t source_len)
3660 {
3661     int err;
3662 
3663     err = inflateReset(stream);
3664     if (err != Z_OK) {
3665         return -1;
3666     }
3667 
3668     stream->avail_in = source_len;
3669     stream->next_in = (uint8_t *)source;
3670     stream->avail_out = dest_len;
3671     stream->next_out = dest;
3672 
3673     err = inflate(stream, Z_NO_FLUSH);
3674     if (err != Z_STREAM_END) {
3675         return -1;
3676     }
3677 
3678     return stream->total_out;
3679 }
3680 
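/*
 * Decompression worker thread: wait for a request on @opaque (a
 * DecompressParam), inflate the compressed buffer into the destination
 * page, then signal decomp_done_cond so the slot can be reused.
 */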
3681 static void *do_data_decompress(void *opaque)
3682 {
3683     DecompressParam *param = opaque;
3684     unsigned long pagesize;
3685     uint8_t *des;
3686     int len, ret;
3687 
3688     qemu_mutex_lock(&param->mutex);
3689     while (!param->quit) {
3690         if (param->des) {
3691             des = param->des;
3692             len = param->len;
3693             param->des = 0;
3694             qemu_mutex_unlock(&param->mutex);
3695 
3696             pagesize = TARGET_PAGE_SIZE;
3697 
3698             ret = qemu_uncompress_data(&param->stream, des, pagesize,
3699                                        param->compbuf, len);
3700             if (ret < 0 && migrate_get_current()->decompress_error_check) {
3701                 error_report("decompress data failed");
3702                 qemu_file_set_error(decomp_file, ret);
3703             }
3704 
3705             qemu_mutex_lock(&decomp_done_lock);
3706             param->done = true;
3707             qemu_cond_signal(&decomp_done_cond);
3708             qemu_mutex_unlock(&decomp_done_lock);
3709 
3710             qemu_mutex_lock(&param->mutex);
3711         } else {
3712             qemu_cond_wait(&param->cond, &param->mutex);
3713         }
3714     }
3715     qemu_mutex_unlock(&param->mutex);
3716 
3717     return NULL;
3718 }
3719 
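/*
 * Wait until every decompression worker has finished its outstanding
 * request, then return any error recorded on the decompression QEMUFile.
 */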
3720 static int wait_for_decompress_done(void)
3721 {
3722     int idx, thread_count;
3723 
3724     if (!migrate_use_compression()) {
3725         return 0;
3726     }
3727 
3728     thread_count = migrate_decompress_threads();
3729     qemu_mutex_lock(&decomp_done_lock);
3730     for (idx = 0; idx < thread_count; idx++) {
3731         while (!decomp_param[idx].done) {
3732             qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
3733         }
3734     }
3735     qemu_mutex_unlock(&decomp_done_lock);
3736     return qemu_file_get_error(decomp_file);
3737 }
3738 
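/*
 * Ask all decompression workers to quit, join them, and free their
 * per-thread buffers, mutexes, condition variables and zlib streams.
 */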
3739 static void compress_threads_load_cleanup(void)
3740 {
3741     int i, thread_count;
3742 
3743     if (!migrate_use_compression()) {
3744         return;
3745     }
3746     thread_count = migrate_decompress_threads();
3747     for (i = 0; i < thread_count; i++) {
3748         /*
3749          * we use it as an indicator of whether the thread is
3750          * properly initialized or not
3751          */
3752         if (!decomp_param[i].compbuf) {
3753             break;
3754         }
3755 
3756         qemu_mutex_lock(&decomp_param[i].mutex);
3757         decomp_param[i].quit = true;
3758         qemu_cond_signal(&decomp_param[i].cond);
3759         qemu_mutex_unlock(&decomp_param[i].mutex);
3760     }
3761     for (i = 0; i < thread_count; i++) {
3762         if (!decomp_param[i].compbuf) {
3763             break;
3764         }
3765 
3766         qemu_thread_join(decompress_threads + i);
3767         qemu_mutex_destroy(&decomp_param[i].mutex);
3768         qemu_cond_destroy(&decomp_param[i].cond);
3769         inflateEnd(&decomp_param[i].stream);
3770         g_free(decomp_param[i].compbuf);
3771         decomp_param[i].compbuf = NULL;
3772     }
3773     g_free(decompress_threads);
3774     g_free(decomp_param);
3775     decompress_threads = NULL;
3776     decomp_param = NULL;
3777     decomp_file = NULL;
3778 }
3779 
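/*
 * Create one decompression worker per configured decompress thread, each
 * with its own zlib stream and bounce buffer.  Returns 0 on success or -1
 * if a zlib stream fails to initialize (everything is cleaned up again).
 */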
3780 static int compress_threads_load_setup(QEMUFile *f)
3781 {
3782     int i, thread_count;
3783 
3784     if (!migrate_use_compression()) {
3785         return 0;
3786     }
3787 
3788     thread_count = migrate_decompress_threads();
3789     decompress_threads = g_new0(QemuThread, thread_count);
3790     decomp_param = g_new0(DecompressParam, thread_count);
3791     qemu_mutex_init(&decomp_done_lock);
3792     qemu_cond_init(&decomp_done_cond);
3793     decomp_file = f;
3794     for (i = 0; i < thread_count; i++) {
3795         if (inflateInit(&decomp_param[i].stream) != Z_OK) {
3796             goto exit;
3797         }
3798 
3799         decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
3800         qemu_mutex_init(&decomp_param[i].mutex);
3801         qemu_cond_init(&decomp_param[i].cond);
3802         decomp_param[i].done = true;
3803         decomp_param[i].quit = false;
3804         qemu_thread_create(decompress_threads + i, "decompress",
3805                            do_data_decompress, decomp_param + i,
3806                            QEMU_THREAD_JOINABLE);
3807     }
3808     return 0;
3809 exit:
3810     compress_threads_load_cleanup();
3811     return -1;
3812 }
3813 
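/*
 * Hand one compressed page to an idle decompression worker, waiting on
 * decomp_done_cond if all workers are currently busy.
 */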
3814 static void decompress_data_with_multi_threads(QEMUFile *f,
3815                                                void *host, int len)
3816 {
3817     int idx, thread_count;
3818 
3819     thread_count = migrate_decompress_threads();
3820     qemu_mutex_lock(&decomp_done_lock);
3821     while (true) {
3822         for (idx = 0; idx < thread_count; idx++) {
3823             if (decomp_param[idx].done) {
3824                 decomp_param[idx].done = false;
3825                 qemu_mutex_lock(&decomp_param[idx].mutex);
3826                 qemu_get_buffer(f, decomp_param[idx].compbuf, len);
3827                 decomp_param[idx].des = host;
3828                 decomp_param[idx].len = len;
3829                 qemu_cond_signal(&decomp_param[idx].cond);
3830                 qemu_mutex_unlock(&decomp_param[idx].mutex);
3831                 break;
3832             }
3833         }
3834         if (idx < thread_count) {
3835             break;
3836         } else {
3837             qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
3838         }
3839     }
3840     qemu_mutex_unlock(&decomp_done_lock);
3841 }
3842 
3843 /*
3844  * colo cache: this is for the secondary VM.  We cache the whole
3845  * memory of the secondary VM.  The caller must hold the global lock
3846  * to call this helper.
3847  */
3848 int colo_init_ram_cache(void)
3849 {
3850     RAMBlock *block;
3851 
3852     rcu_read_lock();
3853     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3854         block->colo_cache = qemu_anon_ram_alloc(block->used_length,
3855                                                 NULL,
3856                                                 false);
3857         if (!block->colo_cache) {
3858             error_report("%s: Can't alloc memory for COLO cache of block %s,"
3859                          "size 0x" RAM_ADDR_FMT, __func__, block->idstr,
3860                          block->used_length);
3861             goto out_locked;
3862         }
3863         memcpy(block->colo_cache, block->host, block->used_length);
3864     }
3865     rcu_read_unlock();
3866     /*
3867     * Record the dirty pages that are sent by the PVM; we use this dirty
3868     * bitmap to decide which pages in the cache should be flushed into the
3869     * SVM's RAM.  Here we use the same name 'ram_bitmap' as for migration.
3870     */
3871     if (ram_bytes_total()) {
3872         RAMBlock *block;
3873 
3874         RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3875             unsigned long pages = block->max_length >> TARGET_PAGE_BITS;
3876 
3877             block->bmap = bitmap_new(pages);
3878             bitmap_set(block->bmap, 0, pages);
3879         }
3880     }
3881     ram_state = g_new0(RAMState, 1);
3882     ram_state->migration_dirty_pages = 0;
3883     memory_global_dirty_log_start();
3884 
3885     return 0;
3886 
3887 out_locked:
3888 
3889     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3890         if (block->colo_cache) {
3891             qemu_anon_ram_free(block->colo_cache, block->used_length);
3892             block->colo_cache = NULL;
3893         }
3894     }
3895 
3896     rcu_read_unlock();
3897     return -errno;
3898 }
3899 
3900 /* The caller must hold the global lock to call this helper */
3901 void colo_release_ram_cache(void)
3902 {
3903     RAMBlock *block;
3904 
3905     memory_global_dirty_log_stop();
3906     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3907         g_free(block->bmap);
3908         block->bmap = NULL;
3909     }
3910 
3911     rcu_read_lock();
3912 
3913     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3914         if (block->colo_cache) {
3915             qemu_anon_ram_free(block->colo_cache, block->used_length);
3916             block->colo_cache = NULL;
3917         }
3918     }
3919 
3920     rcu_read_unlock();
3921     g_free(ram_state);
3922     ram_state = NULL;
3923 }
3924 
3925 /**
3926  * ram_load_setup: Setup RAM for migration incoming side
3927  *
3928  * Returns zero to indicate success and negative for error
3929  *
3930  * @f: QEMUFile where to receive the data
3931  * @opaque: RAMState pointer
3932  */
3933 static int ram_load_setup(QEMUFile *f, void *opaque)
3934 {
3935     if (compress_threads_load_setup(f)) {
3936         return -1;
3937     }
3938 
3939     xbzrle_load_setup();
3940     ramblock_recv_map_init();
3941 
3942     return 0;
3943 }
3944 
3945 static int ram_load_cleanup(void *opaque)
3946 {
3947     RAMBlock *rb;
3948 
3949     RAMBLOCK_FOREACH_NOT_IGNORED(rb) {
3950         if (ramblock_is_pmem(rb)) {
3951             pmem_persist(rb->host, rb->used_length);
3952         }
3953     }
3954 
3955     xbzrle_load_cleanup();
3956     compress_threads_load_cleanup();
3957 
3958     RAMBLOCK_FOREACH_NOT_IGNORED(rb) {
3959         g_free(rb->receivedmap);
3960         rb->receivedmap = NULL;
3961     }
3962 
3963     return 0;
3964 }
3965 
3966 /**
3967  * ram_postcopy_incoming_init: allocate postcopy data structures
3968  *
3969  * Returns 0 for success and negative if there was an error
3970  *
3971  * @mis: current migration incoming state
3972  *
3973  * Allocate data structures etc. needed by incoming migration with
3974  * postcopy-ram.  postcopy-ram's similarly named
3975  * postcopy_ram_incoming_init does the work.
3976  */
3977 int ram_postcopy_incoming_init(MigrationIncomingState *mis)
3978 {
3979     return postcopy_ram_incoming_init(mis);
3980 }
3981 
3982 /**
3983  * ram_load_postcopy: load a page in postcopy case
3984  *
3985  * Returns 0 for success or -errno in case of error
3986  *
3987  * Called in postcopy mode by ram_load().
3988  * rcu_read_lock is taken prior to this being called.
3989  *
3990  * @f: QEMUFile where to read the data from
3991  */
3992 static int ram_load_postcopy(QEMUFile *f)
3993 {
3994     int flags = 0, ret = 0;
3995     bool place_needed = false;
3996     bool matches_target_page_size = false;
3997     MigrationIncomingState *mis = migration_incoming_get_current();
3998     /* Temporary page that is later 'placed' */
3999     void *postcopy_host_page = postcopy_get_tmp_page(mis);
4000     void *last_host = NULL;
4001     bool all_zero = false;
4002 
4003     while (!ret && !(flags & RAM_SAVE_FLAG_EOS)) {
4004         ram_addr_t addr;
4005         void *host = NULL;
4006         void *page_buffer = NULL;
4007         void *place_source = NULL;
4008         RAMBlock *block = NULL;
4009         uint8_t ch;
4010 
4011         addr = qemu_get_be64(f);
4012 
4013         /*
4014          * If there is a QEMUFile error, we should stop here; "addr"
4015          * may be invalid.
4016          */
4017         ret = qemu_file_get_error(f);
4018         if (ret) {
4019             break;
4020         }
4021 
4022         flags = addr & ~TARGET_PAGE_MASK;
4023         addr &= TARGET_PAGE_MASK;
4024 
4025         trace_ram_load_postcopy_loop((uint64_t)addr, flags);
4026         place_needed = false;
4027         if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE)) {
4028             block = ram_block_from_stream(f, flags);
4029 
4030             host = host_from_ram_block_offset(block, addr);
4031             if (!host) {
4032                 error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
4033                 ret = -EINVAL;
4034                 break;
4035             }
4036             matches_target_page_size = block->page_size == TARGET_PAGE_SIZE;
4037             /*
4038              * Postcopy requires that we place whole host pages atomically;
4039              * these may be huge pages for RAMBlocks that are backed by
4040              * hugetlbfs.
4041              * To make it atomic, the data is read into a temporary page
4042              * that's moved into place later.
4043              * The migration protocol uses possibly smaller target pages;
4044              * however, the source ensures it always sends all the components
4045              * of a host page in order.
4046              */
4047             page_buffer = postcopy_host_page +
4048                           ((uintptr_t)host & (block->page_size - 1));
4049             /* If all target pages are zero then we can optimise the placement */
4050             if (!((uintptr_t)host & (block->page_size - 1))) {
4051                 all_zero = true;
4052             } else {
4053                 /* not the first target page within the host page */
4054                 if (host != (last_host + TARGET_PAGE_SIZE)) {
4055                     error_report("Non-sequential target page %p/%p",
4056                                   host, last_host);
4057                     ret = -EINVAL;
4058                     break;
4059                 }
4060             }
4061 
4062 
4063             /*
4064              * If it's the last part of a host page then we place the host
4065              * page
4066              */
4067             place_needed = (((uintptr_t)host + TARGET_PAGE_SIZE) &
4068                                      (block->page_size - 1)) == 0;
4069             place_source = postcopy_host_page;
4070         }
4071         last_host = host;
4072 
4073         switch (flags & ~RAM_SAVE_FLAG_CONTINUE) {
4074         case RAM_SAVE_FLAG_ZERO:
4075             ch = qemu_get_byte(f);
4076             memset(page_buffer, ch, TARGET_PAGE_SIZE);
4077             if (ch) {
4078                 all_zero = false;
4079             }
4080             break;
4081 
4082         case RAM_SAVE_FLAG_PAGE:
4083             all_zero = false;
4084             if (!matches_target_page_size) {
4085                 /* For huge pages, we always use a temporary buffer */
4086                 qemu_get_buffer(f, page_buffer, TARGET_PAGE_SIZE);
4087             } else {
4088                 /*
4089                  * For small pages that match the target page size, we
4090                  * avoid the qemu_file copy.  Instead we directly use
4091                  * the buffer of QEMUFile to place the page.  Note: we
4092                  * cannot do any QEMUFile operation before using that
4093                  * buffer to make sure the buffer is valid when
4094                  * placing the page.
4095                  */
4096                 qemu_get_buffer_in_place(f, (uint8_t **)&place_source,
4097                                          TARGET_PAGE_SIZE);
4098             }
4099             break;
4100         case RAM_SAVE_FLAG_EOS:
4101             /* normal exit */
4102             multifd_recv_sync_main();
4103             break;
4104         default:
4105             error_report("Unknown combination of migration flags: %#x"
4106                          " (postcopy mode)", flags);
4107             ret = -EINVAL;
4108             break;
4109         }
4110 
4111         /* Detect any possible file errors */
4112         if (!ret && qemu_file_get_error(f)) {
4113             ret = qemu_file_get_error(f);
4114         }
4115 
4116         if (!ret && place_needed) {
4117             /* This gets called at the last target page in the host page */
4118             void *place_dest = host + TARGET_PAGE_SIZE - block->page_size;
4119 
4120             if (all_zero) {
4121                 ret = postcopy_place_page_zero(mis, place_dest,
4122                                                block);
4123             } else {
4124                 ret = postcopy_place_page(mis, place_dest,
4125                                           place_source, block);
4126             }
4127         }
4128     }
4129 
4130     return ret;
4131 }
4132 
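/*
 * True once the source has advised that postcopy may be used, i.e. the
 * incoming postcopy state has reached ADVISE but not yet END.
 */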
4133 static bool postcopy_is_advised(void)
4134 {
4135     PostcopyState ps = postcopy_state_get();
4136     return ps >= POSTCOPY_INCOMING_ADVISE && ps < POSTCOPY_INCOMING_END;
4137 }
4138 
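/*
 * True while postcopy is actually in progress, i.e. the incoming postcopy
 * state has reached LISTENING but not yet END.
 */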
4139 static bool postcopy_is_running(void)
4140 {
4141     PostcopyState ps = postcopy_state_get();
4142     return ps >= POSTCOPY_INCOMING_LISTENING && ps < POSTCOPY_INCOMING_END;
4143 }
4144 
4145 /*
4146  * Flush content of RAM cache into SVM's memory.
4147  * Only flush the pages that were dirtied by the PVM, the SVM, or both.
4148  */
4149 static void colo_flush_ram_cache(void)
4150 {
4151     RAMBlock *block = NULL;
4152     void *dst_host;
4153     void *src_host;
4154     unsigned long offset = 0;
4155 
4156     memory_global_dirty_log_sync();
4157     rcu_read_lock();
4158     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
4159         migration_bitmap_sync_range(ram_state, block, 0, block->used_length);
4160     }
4161     rcu_read_unlock();
4162 
4163     trace_colo_flush_ram_cache_begin(ram_state->migration_dirty_pages);
4164     rcu_read_lock();
4165     block = QLIST_FIRST_RCU(&ram_list.blocks);
4166 
4167     while (block) {
4168         offset = migration_bitmap_find_dirty(ram_state, block, offset);
4169 
4170         if (offset << TARGET_PAGE_BITS >= block->used_length) {
4171             offset = 0;
4172             block = QLIST_NEXT_RCU(block, next);
4173         } else {
4174             migration_bitmap_clear_dirty(ram_state, block, offset);
4175             dst_host = block->host + (offset << TARGET_PAGE_BITS);
4176             src_host = block->colo_cache + (offset << TARGET_PAGE_BITS);
4177             memcpy(dst_host, src_host, TARGET_PAGE_SIZE);
4178         }
4179     }
4180 
4181     rcu_read_unlock();
4182     trace_colo_flush_ram_cache_end();
4183 }
4184 
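/**
 * ram_load: load RAM state from the migration stream
 *
 * Dispatches to ram_load_postcopy() when postcopy is running; otherwise
 * walks the stream handling block registration (MEM_SIZE), zero pages,
 * plain pages, compressed pages and XBZRLE pages until EOS is seen.
 *
 * Returns 0 on success or a negative error code.
 *
 * @f: QEMUFile where to receive the data
 * @opaque: RAMState pointer
 * @version_id: stream version; only version 4 is accepted
 */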
4185 static int ram_load(QEMUFile *f, void *opaque, int version_id)
4186 {
4187     int flags = 0, ret = 0, invalid_flags = 0;
4188     static uint64_t seq_iter;
4189     int len = 0;
4190     /*
4191      * If the system is running in postcopy mode, page inserts to host
4192      * memory must be atomic.
4193      */
4194     bool postcopy_running = postcopy_is_running();
4195     /* ADVISE comes earlier; it shows the source has the postcopy capability enabled */
4196     bool postcopy_advised = postcopy_is_advised();
4197 
4198     seq_iter++;
4199 
4200     if (version_id != 4) {
4201         ret = -EINVAL;
4202     }
4203 
4204     if (!migrate_use_compression()) {
4205         invalid_flags |= RAM_SAVE_FLAG_COMPRESS_PAGE;
4206     }
4207     /* This RCU critical section can be very long running.
4208      * When RCU reclaims in the code start to become numerous,
4209      * it will be necessary to reduce the granularity of this
4210      * critical section.
4211      */
4212     rcu_read_lock();
4213 
4214     if (postcopy_running) {
4215         ret = ram_load_postcopy(f);
4216     }
4217 
4218     while (!postcopy_running && !ret && !(flags & RAM_SAVE_FLAG_EOS)) {
4219         ram_addr_t addr, total_ram_bytes;
4220         void *host = NULL;
4221         uint8_t ch;
4222 
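        /*
         * Each chunk starts with a 64-bit value: the low bits carry the
         * RAM_SAVE_FLAG_* bits; the page-aligned remainder is the page
         * offset within its RAMBlock (or the total RAM size for MEM_SIZE).
         */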
4223         addr = qemu_get_be64(f);
4224         flags = addr & ~TARGET_PAGE_MASK;
4225         addr &= TARGET_PAGE_MASK;
4226 
4227         if (flags & invalid_flags) {
4228             if (flags & invalid_flags & RAM_SAVE_FLAG_COMPRESS_PAGE) {
4229                 error_report("Received an unexpected compressed page");
4230             }
4231 
4232             ret = -EINVAL;
4233             break;
4234         }
4235 
4236         if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE |
4237                      RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE)) {
4238             RAMBlock *block = ram_block_from_stream(f, flags);
4239 
4240             /*
4241              * After going into COLO, we should load the page into colo_cache.
4242              */
4243             if (migration_incoming_in_colo_state()) {
4244                 host = colo_cache_from_block_offset(block, addr);
4245             } else {
4246                 host = host_from_ram_block_offset(block, addr);
4247             }
4248             if (!host) {
4249                 error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
4250                 ret = -EINVAL;
4251                 break;
4252             }
4253 
4254             if (!migration_incoming_in_colo_state()) {
4255                 ramblock_recv_bitmap_set(block, host);
4256             }
4257 
4258             trace_ram_load_loop(block->idstr, (uint64_t)addr, flags, host);
4259         }
4260 
4261         switch (flags & ~RAM_SAVE_FLAG_CONTINUE) {
4262         case RAM_SAVE_FLAG_MEM_SIZE:
4263             /* Synchronize RAM block list */
4264             total_ram_bytes = addr;
4265             while (!ret && total_ram_bytes) {
4266                 RAMBlock *block;
4267                 char id[256];
4268                 ram_addr_t length;
4269 
4270                 len = qemu_get_byte(f);
4271                 qemu_get_buffer(f, (uint8_t *)id, len);
4272                 id[len] = 0;
4273                 length = qemu_get_be64(f);
4274 
4275                 block = qemu_ram_block_by_name(id);
4276                 if (block && !qemu_ram_is_migratable(block)) {
4277                     error_report("block %s should not be migrated!", id);
4278                     ret = -EINVAL;
4279                 } else if (block) {
4280                     if (length != block->used_length) {
4281                         Error *local_err = NULL;
4282 
4283                         ret = qemu_ram_resize(block, length,
4284                                               &local_err);
4285                         if (local_err) {
4286                             error_report_err(local_err);
4287                         }
4288                     }
4289                     /* For postcopy we need to check hugepage sizes match */
4290                     /* For postcopy we need to check that hugepage sizes match */
4291                         block->page_size != qemu_host_page_size) {
4292                         uint64_t remote_page_size = qemu_get_be64(f);
4293                         if (remote_page_size != block->page_size) {
4294                             error_report("Mismatched RAM page size %s "
4295                                          "(local) %zd != %" PRId64,
4296                                          id, block->page_size,
4297                                          remote_page_size);
4298                             ret = -EINVAL;
4299                         }
4300                     }
4301                     if (migrate_ignore_shared()) {
4302                         hwaddr addr = qemu_get_be64(f);
4303                         bool ignored = qemu_get_byte(f);
4304                         if (ignored != ramblock_is_ignored(block)) {
4305                             error_report("RAM block %s should %s be migrated",
4306                                          id, ignored ? "" : "not");
4307                             ret = -EINVAL;
4308                         }
4309                         if (ramblock_is_ignored(block) &&
4310                             block->mr->addr != addr) {
4311                             error_report("Mismatched GPAs for block %s "
4312                                          "%" PRId64 " != %" PRId64,
4313                                          id, (uint64_t)addr,
4314                                          (uint64_t)block->mr->addr);
4315                             ret = -EINVAL;
4316                         }
4317                     }
4318                     ram_control_load_hook(f, RAM_CONTROL_BLOCK_REG,
4319                                           block->idstr);
4320                 } else {
4321                     error_report("Unknown ramblock \"%s\", cannot "
4322                                  "accept migration", id);
4323                     ret = -EINVAL;
4324                 }
4325 
4326                 total_ram_bytes -= length;
4327             }
4328             break;
4329 
4330         case RAM_SAVE_FLAG_ZERO:
4331             ch = qemu_get_byte(f);
4332             ram_handle_compressed(host, ch, TARGET_PAGE_SIZE);
4333             break;
4334 
4335         case RAM_SAVE_FLAG_PAGE:
4336             qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
4337             break;
4338 
4339         case RAM_SAVE_FLAG_COMPRESS_PAGE:
4340             len = qemu_get_be32(f);
4341             if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) {
4342                 error_report("Invalid compressed data length: %d", len);
4343                 ret = -EINVAL;
4344                 break;
4345             }
4346             decompress_data_with_multi_threads(f, host, len);
4347             break;
4348 
4349         case RAM_SAVE_FLAG_XBZRLE:
4350             if (load_xbzrle(f, addr, host) < 0) {
4351                 error_report("Failed to decompress XBZRLE page at "
4352                              RAM_ADDR_FMT, addr);
4353                 ret = -EINVAL;
4354                 break;
4355             }
4356             break;
4357         case RAM_SAVE_FLAG_EOS:
4358             /* normal exit */
4359             multifd_recv_sync_main();
4360             break;
4361         default:
4362             if (flags & RAM_SAVE_FLAG_HOOK) {
4363                 ram_control_load_hook(f, RAM_CONTROL_HOOK, NULL);
4364             } else {
4365                 error_report("Unknown combination of migration flags: %#x",
4366                              flags);
4367                 ret = -EINVAL;
4368             }
4369         }
4370         if (!ret) {
4371             ret = qemu_file_get_error(f);
4372         }
4373     }
4374 
4375     ret |= wait_for_decompress_done();
4376     rcu_read_unlock();
4377     trace_ram_load_complete(ret, seq_iter);
4378 
4379     if (!ret  && migration_incoming_in_colo_state()) {
4380         colo_flush_ram_cache();
4381     }
4382     return ret;
4383 }
4384 
4385 static bool ram_has_postcopy(void *opaque)
4386 {
4387     RAMBlock *rb;
4388     RAMBLOCK_FOREACH_NOT_IGNORED(rb) {
4389         if (ramblock_is_pmem(rb)) {
4390             info_report("Block: %s, host: %p is an nvdimm memory, postcopy "
4391                          "is not supported now!", rb->idstr, rb->host);
4392             return false;
4393         }
4394     }
4395 
4396     return migrate_postcopy_ram();
4397 }
4398 
4399 /* Sync all the dirty bitmaps with the destination VM.  */
4400 static int ram_dirty_bitmap_sync_all(MigrationState *s, RAMState *rs)
4401 {
4402     RAMBlock *block;
4403     QEMUFile *file = s->to_dst_file;
4404     int ramblock_count = 0;
4405 
4406     trace_ram_dirty_bitmap_sync_start();
4407 
4408     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
4409         qemu_savevm_send_recv_bitmap(file, block->idstr);
4410         trace_ram_dirty_bitmap_request(block->idstr);
4411         ramblock_count++;
4412     }
4413 
4414     trace_ram_dirty_bitmap_sync_wait();
4415 
4416     /* Wait until all the ramblocks' dirty bitmaps are synced */
4417     while (ramblock_count--) {
4418         qemu_sem_wait(&s->rp_state.rp_sem);
4419     }
4420 
4421     trace_ram_dirty_bitmap_sync_complete();
4422 
4423     return 0;
4424 }
4425 
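/* Wake up ram_dirty_bitmap_sync_all(), which waits on rp_sem once per ramblock */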
4426 static void ram_dirty_bitmap_reload_notify(MigrationState *s)
4427 {
4428     qemu_sem_post(&s->rp_state.rp_sem);
4429 }
4430 
4431 /*
4432  * Read the received bitmap and invert it to form the initial dirty bitmap.
4433  * This is only used when a paused postcopy migration wants to resume
4434  * from a middle point.
4435  */
4436 int ram_dirty_bitmap_reload(MigrationState *s, RAMBlock *block)
4437 {
4438     int ret = -EINVAL;
4439     QEMUFile *file = s->rp_state.from_dst_file;
4440     unsigned long *le_bitmap, nbits = block->used_length >> TARGET_PAGE_BITS;
4441     uint64_t local_size = DIV_ROUND_UP(nbits, 8);
4442     uint64_t size, end_mark;
4443 
4444     trace_ram_dirty_bitmap_reload_begin(block->idstr);
4445 
4446     if (s->state != MIGRATION_STATUS_POSTCOPY_RECOVER) {
4447         error_report("%s: incorrect state %s", __func__,
4448                      MigrationStatus_str(s->state));
4449         return -EINVAL;
4450     }
4451 
4452     /*
4453      * Note: see comments in ramblock_recv_bitmap_send() on why we
4454      * need the endianness conversion, and the padding.
4455      */
4456     local_size = ROUND_UP(local_size, 8);
4457 
4458     /* Add padding */
4459     le_bitmap = bitmap_new(nbits + BITS_PER_LONG);
4460 
4461     size = qemu_get_be64(file);
4462 
4463     /* The size of the bitmap should match our ramblock */
4464     if (size != local_size) {
4465         error_report("%s: ramblock '%s' bitmap size mismatch "
4466                      "(0x%"PRIx64" != 0x%"PRIx64")", __func__,
4467                      block->idstr, size, local_size);
4468         ret = -EINVAL;
4469         goto out;
4470     }
4471 
4472     size = qemu_get_buffer(file, (uint8_t *)le_bitmap, local_size);
4473     end_mark = qemu_get_be64(file);
4474 
4475     ret = qemu_file_get_error(file);
4476     if (ret || size != local_size) {
4477         error_report("%s: read bitmap failed for ramblock '%s': %d"
4478                      " (size 0x%"PRIx64", got: 0x%"PRIx64")",
4479                      __func__, block->idstr, ret, local_size, size);
4480         ret = -EIO;
4481         goto out;
4482     }
4483 
4484     if (end_mark != RAMBLOCK_RECV_BITMAP_ENDING) {
4485         error_report("%s: ramblock '%s' end mark incorrect: 0x%"PRIx64,
4486                      __func__, block->idstr, end_mark);
4487         ret = -EINVAL;
4488         goto out;
4489     }
4490 
4491     /*
4492      * Endianness conversion.  We are in postcopy (though paused).
4493      * The dirty bitmap won't change. We can directly modify it.
4494      */
4495     bitmap_from_le(block->bmap, le_bitmap, nbits);
4496 
4497     /*
4498      * What we received is the "received bitmap".  Invert it to form the
4499      * initial dirty bitmap for this ramblock.
4500      */
4501     bitmap_complement(block->bmap, block->bmap, nbits);
4502 
4503     trace_ram_dirty_bitmap_reload_complete(block->idstr);
4504 
4505     /*
4506      * We succeeded in syncing the bitmap for the current ramblock.  If
4507      * this is the last one to sync, we need to notify the main send thread.
4508      */
4509     ram_dirty_bitmap_reload_notify(s);
4510 
4511     ret = 0;
4512 out:
4513     g_free(le_bitmap);
4514     return ret;
4515 }
4516 
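/*
 * Prepare RAM for a postcopy resume: pull the received bitmaps back from
 * the destination, then reset the RAMState bookkeeping so sending restarts
 * from a consistent view of the dirty pages.
 */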
4517 static int ram_resume_prepare(MigrationState *s, void *opaque)
4518 {
4519     RAMState *rs = *(RAMState **)opaque;
4520     int ret;
4521 
4522     ret = ram_dirty_bitmap_sync_all(s, rs);
4523     if (ret) {
4524         return ret;
4525     }
4526 
4527     ram_state_resume_prepare(rs, s->to_dst_file);
4528 
4529     return 0;
4530 }
4531 
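/* SaveVMHandlers that hook the RAM code into the generic savevm machinery */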
4532 static SaveVMHandlers savevm_ram_handlers = {
4533     .save_setup = ram_save_setup,
4534     .save_live_iterate = ram_save_iterate,
4535     .save_live_complete_postcopy = ram_save_complete,
4536     .save_live_complete_precopy = ram_save_complete,
4537     .has_postcopy = ram_has_postcopy,
4538     .save_live_pending = ram_save_pending,
4539     .load_state = ram_load,
4540     .save_cleanup = ram_save_cleanup,
4541     .load_setup = ram_load_setup,
4542     .load_cleanup = ram_load_cleanup,
4543     .resume_prepare = ram_resume_prepare,
4544 };
4545 
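/* Register the RAM handlers above as the live-migratable "ram" section, version 4 */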
4546 void ram_mig_init(void)
4547 {
4548     qemu_mutex_init(&XBZRLE.lock);
4549     register_savevm_live(NULL, "ram", 0, 4, &savevm_ram_handlers, &ram_state);
4550 }
4551