xref: /openbmc/qemu/migration/ram.c (revision 93062e23)
1 /*
2  * QEMU System Emulator
3  *
4  * Copyright (c) 2003-2008 Fabrice Bellard
5  * Copyright (c) 2011-2015 Red Hat Inc
6  *
7  * Authors:
8  *  Juan Quintela <quintela@redhat.com>
9  *
10  * Permission is hereby granted, free of charge, to any person obtaining a copy
11  * of this software and associated documentation files (the "Software"), to deal
12  * in the Software without restriction, including without limitation the rights
13  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14  * copies of the Software, and to permit persons to whom the Software is
15  * furnished to do so, subject to the following conditions:
16  *
17  * The above copyright notice and this permission notice shall be included in
18  * all copies or substantial portions of the Software.
19  *
20  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
23  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26  * THE SOFTWARE.
27  */
28 
29 #include "qemu/osdep.h"
30 #include "cpu.h"
31 #include <zlib.h>
32 #include "qemu/cutils.h"
33 #include "qemu/bitops.h"
34 #include "qemu/bitmap.h"
35 #include "qemu/main-loop.h"
36 #include "xbzrle.h"
37 #include "ram.h"
38 #include "migration.h"
39 #include "socket.h"
40 #include "migration/register.h"
41 #include "migration/misc.h"
42 #include "qemu-file.h"
43 #include "postcopy-ram.h"
44 #include "page_cache.h"
45 #include "qemu/error-report.h"
46 #include "qapi/error.h"
47 #include "qapi/qapi-events-migration.h"
48 #include "qapi/qmp/qerror.h"
49 #include "trace.h"
50 #include "exec/ram_addr.h"
51 #include "exec/target_page.h"
52 #include "qemu/rcu_queue.h"
53 #include "migration/colo.h"
54 #include "block.h"
55 #include "sysemu/sysemu.h"
56 #include "qemu/uuid.h"
57 #include "savevm.h"
58 #include "qemu/iov.h"
59 
60 /***********************************************************/
61 /* ram save/restore */
62 
63 /* RAM_SAVE_FLAG_ZERO used to be named RAM_SAVE_FLAG_COMPRESS; it
64  * worked for pages that were filled with the same char.  We switched
65  * it to only search for the zero value.  And to avoid confusion with
66  * RAM_SAVE_FLAG_COMPRESS_PAGE it was renamed.
67  */
68 
69 #define RAM_SAVE_FLAG_FULL     0x01 /* Obsolete, not used anymore */
70 #define RAM_SAVE_FLAG_ZERO     0x02
71 #define RAM_SAVE_FLAG_MEM_SIZE 0x04
72 #define RAM_SAVE_FLAG_PAGE     0x08
73 #define RAM_SAVE_FLAG_EOS      0x10
74 #define RAM_SAVE_FLAG_CONTINUE 0x20
75 #define RAM_SAVE_FLAG_XBZRLE   0x40
76 /* 0x80 is reserved in migration.h start with 0x100 next */
77 #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
78 
79 static inline bool is_zero_range(uint8_t *p, uint64_t size)
80 {
81     return buffer_is_zero(p, size);
82 }
83 
84 XBZRLECacheStats xbzrle_counters;
85 
86 /* This struct contains the XBZRLE cache and a static page
87    used by the compression */
88 static struct {
89     /* buffer used for XBZRLE encoding */
90     uint8_t *encoded_buf;
91     /* buffer for storing page content */
92     uint8_t *current_buf;
93     /* Cache for XBZRLE, Protected by lock. */
94     PageCache *cache;
95     QemuMutex lock;
96     /* it will store a page full of zeros */
97     uint8_t *zero_target_page;
98     /* buffer used for XBZRLE decoding */
99     uint8_t *decoded_buf;
100 } XBZRLE;
101 
102 static void XBZRLE_cache_lock(void)
103 {
104     if (migrate_use_xbzrle())
105         qemu_mutex_lock(&XBZRLE.lock);
106 }
107 
108 static void XBZRLE_cache_unlock(void)
109 {
110     if (migrate_use_xbzrle())
111         qemu_mutex_unlock(&XBZRLE.lock);
112 }
113 
114 /**
115  * xbzrle_cache_resize: resize the xbzrle cache
116  *
117  * This function is called from qmp_migrate_set_cache_size in the main
118  * thread, possibly while a migration is in progress.  A running
119  * migration may be using the cache and might finish during this call,
120  * hence changes to the cache are protected by XBZRLE.lock.
121  *
122  * Returns 0 for success or -1 for error
123  *
124  * @new_size: new cache size
125  * @errp: set to the failure reason when the resize fails
126  */
127 int xbzrle_cache_resize(int64_t new_size, Error **errp)
128 {
129     PageCache *new_cache;
130     int64_t ret = 0;
131 
132     /* Check for truncation */
133     if (new_size != (size_t)new_size) {
134         error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "cache size",
135                    "exceeding address space");
136         return -1;
137     }
138 
139     if (new_size == migrate_xbzrle_cache_size()) {
140         /* nothing to do */
141         return 0;
142     }
143 
144     XBZRLE_cache_lock();
145 
146     if (XBZRLE.cache != NULL) {
147         new_cache = cache_init(new_size, TARGET_PAGE_SIZE, errp);
148         if (!new_cache) {
149             ret = -1;
150             goto out;
151         }
152 
153         cache_fini(XBZRLE.cache);
154         XBZRLE.cache = new_cache;
155     }
156 out:
157     XBZRLE_cache_unlock();
158     return ret;
159 }
160 
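/*
 * A RAMBlock is ignored for migration if it is not migratable at all, or
 * if it is a shared block and ignoring of shared RAM was requested
 * (migrate_ignore_shared()).
 */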
161 static bool ramblock_is_ignored(RAMBlock *block)
162 {
163     return !qemu_ram_is_migratable(block) ||
164            (migrate_ignore_shared() && qemu_ram_is_shared(block));
165 }
166 
167 /* Should be holding either ram_list.mutex, or the RCU lock. */
168 #define RAMBLOCK_FOREACH_NOT_IGNORED(block)            \
169     INTERNAL_RAMBLOCK_FOREACH(block)                   \
170         if (ramblock_is_ignored(block)) {} else
171 
172 #define RAMBLOCK_FOREACH_MIGRATABLE(block)             \
173     INTERNAL_RAMBLOCK_FOREACH(block)                   \
174         if (!qemu_ram_is_migratable(block)) {} else
175 
176 #undef RAMBLOCK_FOREACH
177 
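/*
 * Call func(block, opaque) for every RAMBlock that is not ignored for
 * migration, under the RCU read lock.  Iteration stops at the first
 * non-zero return value, which is then returned to the caller.
 */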
178 int foreach_not_ignored_block(RAMBlockIterFunc func, void *opaque)
179 {
180     RAMBlock *block;
181     int ret = 0;
182 
183     RCU_READ_LOCK_GUARD();
184 
185     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
186         ret = func(block, opaque);
187         if (ret) {
188             break;
189         }
190     }
191     return ret;
192 }
193 
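/*
 * Allocate the receivedmap bitmap (one bit per target page) for every
 * RAMBlock that takes part in the migration.  The bitmap tracks which
 * pages have already been received on the destination.
 */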
194 static void ramblock_recv_map_init(void)
195 {
196     RAMBlock *rb;
197 
198     RAMBLOCK_FOREACH_NOT_IGNORED(rb) {
199         assert(!rb->receivedmap);
200         rb->receivedmap = bitmap_new(rb->max_length >> qemu_target_page_bits());
201     }
202 }
203 
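/*
 * Helpers to query and update a RAMBlock's receivedmap; the bit index is
 * the target-page index of the given host address (or byte offset) within
 * the block.
 */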
204 int ramblock_recv_bitmap_test(RAMBlock *rb, void *host_addr)
205 {
206     return test_bit(ramblock_recv_bitmap_offset(host_addr, rb),
207                     rb->receivedmap);
208 }
209 
210 bool ramblock_recv_bitmap_test_byte_offset(RAMBlock *rb, uint64_t byte_offset)
211 {
212     return test_bit(byte_offset >> TARGET_PAGE_BITS, rb->receivedmap);
213 }
214 
215 void ramblock_recv_bitmap_set(RAMBlock *rb, void *host_addr)
216 {
217     set_bit_atomic(ramblock_recv_bitmap_offset(host_addr, rb), rb->receivedmap);
218 }
219 
220 void ramblock_recv_bitmap_set_range(RAMBlock *rb, void *host_addr,
221                                     size_t nr)
222 {
223     bitmap_set_atomic(rb->receivedmap,
224                       ramblock_recv_bitmap_offset(host_addr, rb),
225                       nr);
226 }
227 
228 #define  RAMBLOCK_RECV_BITMAP_ENDING  (0x0123456789abcdefULL)
229 
230 /*
231  * Format: bitmap_size (8 bytes) + whole_bitmap (N bytes).
232  *
233  * Returns the number of bytes sent (> 0) on success, or < 0 on error.
234  */
235 int64_t ramblock_recv_bitmap_send(QEMUFile *file,
236                                   const char *block_name)
237 {
238     RAMBlock *block = qemu_ram_block_by_name(block_name);
239     unsigned long *le_bitmap, nbits;
240     uint64_t size;
241 
242     if (!block) {
243         error_report("%s: invalid block name: %s", __func__, block_name);
244         return -1;
245     }
246 
247     nbits = block->used_length >> TARGET_PAGE_BITS;
248 
249     /*
250      * Make sure the tmp bitmap buffer is big enough, e.g., on 32bit
251      * machines we may need 4 more bytes for padding (see below
252      * comment). So extend it a bit beforehand.
253      */
254     le_bitmap = bitmap_new(nbits + BITS_PER_LONG);
255 
256     /*
257      * Always use little endian when sending the bitmap. This is
258      * required when the source and destination VMs do not use the
259      * same endianness. (Note: big endian won't work.)
260      */
261     bitmap_to_le(le_bitmap, block->receivedmap, nbits);
262 
263     /* Size of the bitmap, in bytes */
264     size = DIV_ROUND_UP(nbits, 8);
265 
266     /*
267      * size is always aligned to 8 bytes for 64bit machines, but that
268      * may not be true for 32bit machines. We need this padding to
269      * make sure the migration can survive even between 32bit and
270      * 64bit machines.
271      */
272     size = ROUND_UP(size, 8);
273 
274     qemu_put_be64(file, size);
275     qemu_put_buffer(file, (const uint8_t *)le_bitmap, size);
276     /*
277      * Mark the end, in case the middle part gets corrupted for
278      * some mysterious reason.
279      */
280     qemu_put_be64(file, RAMBLOCK_RECV_BITMAP_ENDING);
281     qemu_fflush(file);
282 
283     g_free(le_bitmap);
284 
285     if (qemu_file_get_error(file)) {
286         return qemu_file_get_error(file);
287     }
288 
289     return size + sizeof(size);
290 }
291 
292 /*
293  * An outstanding page request, on the source, having been received
294  * and queued
295  */
296 struct RAMSrcPageRequest {
297     RAMBlock *rb;
298     hwaddr    offset;
299     hwaddr    len;
300 
301     QSIMPLEQ_ENTRY(RAMSrcPageRequest) next_req;
302 };
303 
304 /* State of RAM for migration */
305 struct RAMState {
306     /* QEMUFile used for this migration */
307     QEMUFile *f;
308     /* Last block that we have visited searching for dirty pages */
309     RAMBlock *last_seen_block;
310     /* Last block from where we have sent data */
311     RAMBlock *last_sent_block;
312     /* Last dirty target page we have sent */
313     ram_addr_t last_page;
314     /* last ram version we have seen */
315     uint32_t last_version;
316     /* We are in the first round */
317     bool ram_bulk_stage;
318     /* The free page optimization is enabled */
319     bool fpo_enabled;
320     /* How many times we have dirty too many pages */
321     int dirty_rate_high_cnt;
322     /* these variables are used for bitmap sync */
323     /* last time we did a full bitmap_sync */
324     int64_t time_last_bitmap_sync;
325     /* bytes transferred at start_time */
326     uint64_t bytes_xfer_prev;
327     /* number of dirty pages since start_time */
328     uint64_t num_dirty_pages_period;
329     /* xbzrle misses since the beginning of the period */
330     uint64_t xbzrle_cache_miss_prev;
331 
332     /* compression statistics since the beginning of the period */
333     /* number of times no free thread was available to compress data */
334     uint64_t compress_thread_busy_prev;
335     /* total bytes produced by compression */
336     uint64_t compressed_size_prev;
337     /* number of compressed pages */
338     uint64_t compress_pages_prev;
339 
340     /* total handled target pages at the beginning of period */
341     uint64_t target_page_count_prev;
342     /* total handled target pages since start */
343     uint64_t target_page_count;
344     /* number of dirty bits in the bitmap */
345     uint64_t migration_dirty_pages;
346     /* Protects modification of the bitmap and migration dirty pages */
347     QemuMutex bitmap_mutex;
348     /* The RAMBlock used in the last src_page_requests */
349     RAMBlock *last_req_rb;
350     /* Queue of outstanding page requests from the destination */
351     QemuMutex src_page_req_mutex;
352     QSIMPLEQ_HEAD(, RAMSrcPageRequest) src_page_requests;
353 };
354 typedef struct RAMState RAMState;
355 
356 static RAMState *ram_state;
357 
358 static NotifierWithReturnList precopy_notifier_list;
359 
360 void precopy_infrastructure_init(void)
361 {
362     notifier_with_return_list_init(&precopy_notifier_list);
363 }
364 
365 void precopy_add_notifier(NotifierWithReturn *n)
366 {
367     notifier_with_return_list_add(&precopy_notifier_list, n);
368 }
369 
370 void precopy_remove_notifier(NotifierWithReturn *n)
371 {
372     notifier_with_return_remove(n);
373 }
374 
375 int precopy_notify(PrecopyNotifyReason reason, Error **errp)
376 {
377     PrecopyNotifyData pnd;
378     pnd.reason = reason;
379     pnd.errp = errp;
380 
381     return notifier_with_return_list_notify(&precopy_notifier_list, &pnd);
382 }
383 
384 void precopy_enable_free_page_optimization(void)
385 {
386     if (!ram_state) {
387         return;
388     }
389 
390     ram_state->fpo_enabled = true;
391 }
392 
393 uint64_t ram_bytes_remaining(void)
394 {
395     return ram_state ? (ram_state->migration_dirty_pages * TARGET_PAGE_SIZE) :
396                        0;
397 }
398 
399 MigrationStats ram_counters;
400 
401 /* used by the search for pages to send */
402 struct PageSearchStatus {
403     /* Current block being searched */
404     RAMBlock    *block;
405     /* Current page to search from */
406     unsigned long page;
407     /* Set once we wrap around */
408     bool         complete_round;
409 };
410 typedef struct PageSearchStatus PageSearchStatus;
411 
412 CompressionStats compression_counters;
413 
414 struct CompressParam {
415     bool done;
416     bool quit;
417     bool zero_page;
418     QEMUFile *file;
419     QemuMutex mutex;
420     QemuCond cond;
421     RAMBlock *block;
422     ram_addr_t offset;
423 
424     /* internally used fields */
425     z_stream stream;
426     uint8_t *originbuf;
427 };
428 typedef struct CompressParam CompressParam;
429 
430 struct DecompressParam {
431     bool done;
432     bool quit;
433     QemuMutex mutex;
434     QemuCond cond;
435     void *des;
436     uint8_t *compbuf;
437     int len;
438     z_stream stream;
439 };
440 typedef struct DecompressParam DecompressParam;
441 
442 static CompressParam *comp_param;
443 static QemuThread *compress_threads;
444 /* comp_done_cond is used to wake up the migration thread when
445  * one of the compression threads has finished the compression.
446  * comp_done_lock is used together with comp_done_cond.
447  */
448 static QemuMutex comp_done_lock;
449 static QemuCond comp_done_cond;
450 /* The empty QEMUFileOps will be used by file in CompressParam */
451 static const QEMUFileOps empty_ops = { };
452 
453 static QEMUFile *decomp_file;
454 static DecompressParam *decomp_param;
455 static QemuThread *decompress_threads;
456 static QemuMutex decomp_done_lock;
457 static QemuCond decomp_done_cond;
458 
459 static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
460                                  ram_addr_t offset, uint8_t *source_buf);
461 
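/*
 * Compression worker thread.  It waits until a (block, offset) pair is
 * assigned via its CompressParam, compresses that page into param->file,
 * then marks itself done and signals comp_done_cond.  The loop ends when
 * param->quit is set.
 */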
462 static void *do_data_compress(void *opaque)
463 {
464     CompressParam *param = opaque;
465     RAMBlock *block;
466     ram_addr_t offset;
467     bool zero_page;
468 
469     qemu_mutex_lock(&param->mutex);
470     while (!param->quit) {
471         if (param->block) {
472             block = param->block;
473             offset = param->offset;
474             param->block = NULL;
475             qemu_mutex_unlock(&param->mutex);
476 
477             zero_page = do_compress_ram_page(param->file, &param->stream,
478                                              block, offset, param->originbuf);
479 
480             qemu_mutex_lock(&comp_done_lock);
481             param->done = true;
482             param->zero_page = zero_page;
483             qemu_cond_signal(&comp_done_cond);
484             qemu_mutex_unlock(&comp_done_lock);
485 
486             qemu_mutex_lock(&param->mutex);
487         } else {
488             qemu_cond_wait(&param->cond, &param->mutex);
489         }
490     }
491     qemu_mutex_unlock(&param->mutex);
492 
493     return NULL;
494 }
495 
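/* Ask every compression thread to quit, join it and free its resources. */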
496 static void compress_threads_save_cleanup(void)
497 {
498     int i, thread_count;
499 
500     if (!migrate_use_compression() || !comp_param) {
501         return;
502     }
503 
504     thread_count = migrate_compress_threads();
505     for (i = 0; i < thread_count; i++) {
506         /*
507          * we use it as an indicator of whether the thread is
508          * properly initialized or not
509          */
510         if (!comp_param[i].file) {
511             break;
512         }
513 
514         qemu_mutex_lock(&comp_param[i].mutex);
515         comp_param[i].quit = true;
516         qemu_cond_signal(&comp_param[i].cond);
517         qemu_mutex_unlock(&comp_param[i].mutex);
518 
519         qemu_thread_join(compress_threads + i);
520         qemu_mutex_destroy(&comp_param[i].mutex);
521         qemu_cond_destroy(&comp_param[i].cond);
522         deflateEnd(&comp_param[i].stream);
523         g_free(comp_param[i].originbuf);
524         qemu_fclose(comp_param[i].file);
525         comp_param[i].file = NULL;
526     }
527     qemu_mutex_destroy(&comp_done_lock);
528     qemu_cond_destroy(&comp_done_cond);
529     g_free(compress_threads);
530     g_free(comp_param);
531     compress_threads = NULL;
532     comp_param = NULL;
533 }
534 
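/*
 * Create the compression threads and their per-thread state (zlib stream,
 * scratch buffer and dummy QEMUFile).  Returns 0 on success, or -1 on
 * error after cleaning up whatever was already set up.
 */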
535 static int compress_threads_save_setup(void)
536 {
537     int i, thread_count;
538 
539     if (!migrate_use_compression()) {
540         return 0;
541     }
542     thread_count = migrate_compress_threads();
543     compress_threads = g_new0(QemuThread, thread_count);
544     comp_param = g_new0(CompressParam, thread_count);
545     qemu_cond_init(&comp_done_cond);
546     qemu_mutex_init(&comp_done_lock);
547     for (i = 0; i < thread_count; i++) {
548         comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE);
549         if (!comp_param[i].originbuf) {
550             goto exit;
551         }
552 
553         if (deflateInit(&comp_param[i].stream,
554                         migrate_compress_level()) != Z_OK) {
555             g_free(comp_param[i].originbuf);
556             goto exit;
557         }
558 
559         /* comp_param[i].file is just used as a dummy buffer to save data,
560          * set its ops to empty.
561          */
562         comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
563         comp_param[i].done = true;
564         comp_param[i].quit = false;
565         qemu_mutex_init(&comp_param[i].mutex);
566         qemu_cond_init(&comp_param[i].cond);
567         qemu_thread_create(compress_threads + i, "compress",
568                            do_data_compress, comp_param + i,
569                            QEMU_THREAD_JOINABLE);
570     }
571     return 0;
572 
573 exit:
574     compress_threads_save_cleanup();
575     return -1;
576 }
577 
578 /* Multiple fd's */
579 
580 #define MULTIFD_MAGIC 0x11223344U
581 #define MULTIFD_VERSION 1
582 
583 #define MULTIFD_FLAG_SYNC (1 << 0)
584 
585 /* This value needs to be a multiple of qemu_target_page_size() */
586 #define MULTIFD_PACKET_SIZE (512 * 1024)
587 
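/*
 * Handshake packet sent once on every multifd channel right after it is
 * created, so that the destination can match the connection to the right
 * migration (uuid) and channel (id).
 */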
588 typedef struct {
589     uint32_t magic;
590     uint32_t version;
591     unsigned char uuid[16]; /* QemuUUID */
592     uint8_t id;
593     uint8_t unused1[7];     /* Reserved for future use */
594     uint64_t unused2[4];    /* Reserved for future use */
595 } __attribute__((packed)) MultiFDInit_t;
596 
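/*
 * Wire header that precedes each batch of pages sent over a multifd
 * channel.  All multi-byte fields are stored in big-endian byte order.
 */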
597 typedef struct {
598     uint32_t magic;
599     uint32_t version;
600     uint32_t flags;
601     /* maximum number of allocated pages */
602     uint32_t pages_alloc;
603     uint32_t pages_used;
604     /* size of the next packet that contains pages */
605     uint32_t next_packet_size;
606     uint64_t packet_num;
607     uint64_t unused[4];    /* Reserved for future use */
608     char ramblock[256];
609     uint64_t offset[];
610 } __attribute__((packed)) MultiFDPacket_t;
611 
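/*
 * A batch of pages queued for one multifd packet; the iov entries point
 * directly into guest RAM of the RAMBlock "block".
 */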
612 typedef struct {
613     /* number of used pages */
614     uint32_t used;
615     /* number of allocated pages */
616     uint32_t allocated;
617     /* global number of generated multifd packets */
618     uint64_t packet_num;
619     /* offset of each page */
620     ram_addr_t *offset;
621     /* pointer to each page */
622     struct iovec *iov;
623     RAMBlock *block;
624 } MultiFDPages_t;
625 
626 typedef struct {
627     /* these fields are not changed once the thread is created */
628     /* channel number */
629     uint8_t id;
630     /* channel thread name */
631     char *name;
632     /* channel thread id */
633     QemuThread thread;
634     /* communication channel */
635     QIOChannel *c;
636     /* semaphore on which to wait for more work */
637     QemuSemaphore sem;
638     /* this mutex protects the following parameters */
639     QemuMutex mutex;
640     /* is this channel thread running */
641     bool running;
642     /* should this thread finish */
643     bool quit;
644     /* thread has work to do */
645     int pending_job;
646     /* array of pages to send */
647     MultiFDPages_t *pages;
648     /* packet allocated len */
649     uint32_t packet_len;
650     /* pointer to the packet */
651     MultiFDPacket_t *packet;
652     /* multifd flags for each packet */
653     uint32_t flags;
654     /* size of the next packet that contains pages */
655     uint32_t next_packet_size;
656     /* global number of generated multifd packets */
657     uint64_t packet_num;
658     /* thread local variables */
659     /* packets sent through this channel */
660     uint64_t num_packets;
661     /* pages sent through this channel */
662     uint64_t num_pages;
663     /* syncs main thread and channels */
664     QemuSemaphore sem_sync;
665 }  MultiFDSendParams;
666 
667 typedef struct {
668     /* these fields are not changed once the thread is created */
669     /* channel number */
670     uint8_t id;
671     /* channel thread name */
672     char *name;
673     /* channel thread id */
674     QemuThread thread;
675     /* communication channel */
676     QIOChannel *c;
677     /* this mutex protects the following parameters */
678     QemuMutex mutex;
679     /* is this channel thread running */
680     bool running;
681     /* should this thread finish */
682     bool quit;
683     /* array of pages to receive */
684     MultiFDPages_t *pages;
685     /* packet allocated len */
686     uint32_t packet_len;
687     /* pointer to the packet */
688     MultiFDPacket_t *packet;
689     /* multifd flags for each packet */
690     uint32_t flags;
691     /* global number of generated multifd packets */
692     uint64_t packet_num;
693     /* thread local variables */
694     /* size of the next packet that contains pages */
695     uint32_t next_packet_size;
696     /* packets received through this channel */
697     uint64_t num_packets;
698     /* pages received through this channel */
699     uint64_t num_pages;
700     /* syncs main thread and channels */
701     QemuSemaphore sem_sync;
702 } MultiFDRecvParams;
703 
704 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
705 {
706     MultiFDInit_t msg = {};
707     int ret;
708 
709     msg.magic = cpu_to_be32(MULTIFD_MAGIC);
710     msg.version = cpu_to_be32(MULTIFD_VERSION);
711     msg.id = p->id;
712     memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
713 
714     ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp);
715     if (ret != 0) {
716         return -1;
717     }
718     return 0;
719 }
720 
721 static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
722 {
723     MultiFDInit_t msg;
724     int ret;
725 
726     ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
727     if (ret != 0) {
728         return -1;
729     }
730 
731     msg.magic = be32_to_cpu(msg.magic);
732     msg.version = be32_to_cpu(msg.version);
733 
734     if (msg.magic != MULTIFD_MAGIC) {
735         error_setg(errp, "multifd: received packet magic %x "
736                    "expected %x", msg.magic, MULTIFD_MAGIC);
737         return -1;
738     }
739 
740     if (msg.version != MULTIFD_VERSION) {
741         error_setg(errp, "multifd: received packet version %d "
742                    "expected %d", msg.version, MULTIFD_VERSION);
743         return -1;
744     }
745 
746     if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
747         char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
748         char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
749 
750         error_setg(errp, "multifd: received uuid '%s' and expected "
751                    "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
752         g_free(uuid);
753         g_free(msg_uuid);
754         return -1;
755     }
756 
757     if (msg.id >= migrate_multifd_channels()) {
758         error_setg(errp, "multifd: received channel id %d, expected "
759                    "a value below %d", msg.id, migrate_multifd_channels());
760         return -1;
761     }
762 
763     return msg.id;
764 }
765 
766 static MultiFDPages_t *multifd_pages_init(size_t size)
767 {
768     MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
769 
770     pages->allocated = size;
771     pages->iov = g_new0(struct iovec, size);
772     pages->offset = g_new0(ram_addr_t, size);
773 
774     return pages;
775 }
776 
777 static void multifd_pages_clear(MultiFDPages_t *pages)
778 {
779     pages->used = 0;
780     pages->allocated = 0;
781     pages->packet_num = 0;
782     pages->block = NULL;
783     g_free(pages->iov);
784     pages->iov = NULL;
785     g_free(pages->offset);
786     pages->offset = NULL;
787     g_free(pages);
788 }
789 
790 static void multifd_send_fill_packet(MultiFDSendParams *p)
791 {
792     MultiFDPacket_t *packet = p->packet;
793     int i;
794 
795     packet->flags = cpu_to_be32(p->flags);
796     packet->pages_alloc = cpu_to_be32(p->pages->allocated);
797     packet->pages_used = cpu_to_be32(p->pages->used);
798     packet->next_packet_size = cpu_to_be32(p->next_packet_size);
799     packet->packet_num = cpu_to_be64(p->packet_num);
800 
801     if (p->pages->block) {
802         strncpy(packet->ramblock, p->pages->block->idstr, 256);
803     }
804 
805     for (i = 0; i < p->pages->used; i++) {
806         packet->offset[i] = cpu_to_be64(p->pages->offset[i]);
807     }
808 }
809 
810 static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
811 {
812     MultiFDPacket_t *packet = p->packet;
813     uint32_t pages_max = MULTIFD_PACKET_SIZE / qemu_target_page_size();
814     RAMBlock *block;
815     int i;
816 
817     packet->magic = be32_to_cpu(packet->magic);
818     if (packet->magic != MULTIFD_MAGIC) {
819         error_setg(errp, "multifd: received packet "
820                    "magic %x and expected magic %x",
821                    packet->magic, MULTIFD_MAGIC);
822         return -1;
823     }
824 
825     packet->version = be32_to_cpu(packet->version);
826     if (packet->version != MULTIFD_VERSION) {
827         error_setg(errp, "multifd: received packet "
828                    "version %d and expected version %d",
829                    packet->version, MULTIFD_VERSION);
830         return -1;
831     }
832 
833     p->flags = be32_to_cpu(packet->flags);
834 
835     packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
836     /*
837      * If we received a packet that is 100 times bigger than expected
838      * just stop migration; 100 is an arbitrary safety limit.
839      */
840     if (packet->pages_alloc > pages_max * 100) {
841         error_setg(errp, "multifd: received packet "
842                    "with size %d and expected a maximum size of %d",
843                    packet->pages_alloc, pages_max * 100) ;
844         return -1;
845     }
846     /*
847      * We received a packet that is bigger than expected but inside
848      * reasonable limits (see previous comment).  Just reallocate.
849      */
850     if (packet->pages_alloc > p->pages->allocated) {
851         multifd_pages_clear(p->pages);
852         p->pages = multifd_pages_init(packet->pages_alloc);
853     }
854 
855     p->pages->used = be32_to_cpu(packet->pages_used);
856     if (p->pages->used > packet->pages_alloc) {
857         error_setg(errp, "multifd: received packet "
858                    "with %d pages and expected maximum pages are %d",
859                    p->pages->used, packet->pages_alloc) ;
860         return -1;
861     }
862 
863     p->next_packet_size = be32_to_cpu(packet->next_packet_size);
864     p->packet_num = be64_to_cpu(packet->packet_num);
865 
866     if (p->pages->used == 0) {
867         return 0;
868     }
869 
870     /* make sure that the ramblock name is NUL terminated */
871     packet->ramblock[255] = 0;
872     block = qemu_ram_block_by_name(packet->ramblock);
873     if (!block) {
874         error_setg(errp, "multifd: unknown ram block %s",
875                    packet->ramblock);
876         return -1;
877     }
878 
879     for (i = 0; i < p->pages->used; i++) {
880         ram_addr_t offset = be64_to_cpu(packet->offset[i]);
881 
882         if (offset > (block->used_length - TARGET_PAGE_SIZE)) {
883             error_setg(errp, "multifd: offset too long " RAM_ADDR_FMT
884                        " (max " RAM_ADDR_FMT ")",
885                        offset, block->used_length);
886             return -1;
887         }
888         p->pages->iov[i].iov_base = block->host + offset;
889         p->pages->iov[i].iov_len = TARGET_PAGE_SIZE;
890     }
891 
892     return 0;
893 }
894 
895 struct {
896     MultiFDSendParams *params;
897     /* array of pages to send */
898     MultiFDPages_t *pages;
899     /* global number of generated multifd packets */
900     uint64_t packet_num;
901     /* send channels ready */
902     QemuSemaphore channels_ready;
903     /*
904      * Have we already run the terminate-threads code?  There is a race
905      * when an error arrives while we are already exiting, so this is
906      * accessed with atomic operations.  Only valid values are 0 and 1.
907      */
908     int exiting;
909 } *multifd_send_state;
910 
911 /*
912  * How do we use multifd_send_state->pages and channel->pages?
913  *
914  * We create one "pages" struct for each channel, plus a main one.  Each
915  * time we need to send a batch of pages we swap the main one with the
916  * one of the channel that is sending it.  There are two reasons for
917  * that:
918  *    - to avoid doing so many mallocs during migration
919  *    - to make it easier to know what to free at the end of migration
920  *
921  * This way we always know who owns each "pages" struct, and we don't
922  * need any locking.  It belongs either to the migration thread or to
923  * the channel thread.  Switching is safe because the migration thread
924  * holds the channel mutex while changing it, and the channel thread
925  * must have finished with its own, otherwise pending_job couldn't be
926  * zero.
927  */
928 
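/*
 * Hand the currently queued batch of pages to an idle channel: swap
 * multifd_send_state->pages with that channel's pages, account for the
 * transferred bytes and kick the channel thread.  Returns 1 on success,
 * or -1 if we are exiting or the channel has already quit.
 */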
929 static int multifd_send_pages(RAMState *rs)
930 {
931     int i;
932     static int next_channel;
933     MultiFDSendParams *p = NULL; /* make gcc happy */
934     MultiFDPages_t *pages = multifd_send_state->pages;
935     uint64_t transferred;
936 
937     if (atomic_read(&multifd_send_state->exiting)) {
938         return -1;
939     }
940 
941     qemu_sem_wait(&multifd_send_state->channels_ready);
942     for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
943         p = &multifd_send_state->params[i];
944 
945         qemu_mutex_lock(&p->mutex);
946         if (p->quit) {
947             error_report("%s: channel %d has already quit!", __func__, i);
948             qemu_mutex_unlock(&p->mutex);
949             return -1;
950         }
951         if (!p->pending_job) {
952             p->pending_job++;
953             next_channel = (i + 1) % migrate_multifd_channels();
954             break;
955         }
956         qemu_mutex_unlock(&p->mutex);
957     }
958     assert(!p->pages->used);
959     assert(!p->pages->block);
960 
961     p->packet_num = multifd_send_state->packet_num++;
962     multifd_send_state->pages = p->pages;
963     p->pages = pages;
964     transferred = ((uint64_t) pages->used) * TARGET_PAGE_SIZE + p->packet_len;
965     qemu_file_update_transfer(rs->f, transferred);
966     ram_counters.multifd_bytes += transferred;
967     ram_counters.transferred += transferred;
968     qemu_mutex_unlock(&p->mutex);
969     qemu_sem_post(&p->sem);
970 
971     return 1;
972 }
973 
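/*
 * Queue one page for multifd sending.  The batch is flushed with
 * multifd_send_pages() when it becomes full or when the RAMBlock changes.
 * Returns 1 on success, -1 on error.
 */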
974 static int multifd_queue_page(RAMState *rs, RAMBlock *block, ram_addr_t offset)
975 {
976     MultiFDPages_t *pages = multifd_send_state->pages;
977 
978     if (!pages->block) {
979         pages->block = block;
980     }
981 
982     if (pages->block == block) {
983         pages->offset[pages->used] = offset;
984         pages->iov[pages->used].iov_base = block->host + offset;
985         pages->iov[pages->used].iov_len = TARGET_PAGE_SIZE;
986         pages->used++;
987 
988         if (pages->used < pages->allocated) {
989             return 1;
990         }
991     }
992 
993     if (multifd_send_pages(rs) < 0) {
994         return -1;
995     }
996 
997     if (pages->block != block) {
998         return  multifd_queue_page(rs, block, offset);
999     }
1000 
1001     return 1;
1002 }
1003 
1004 static void multifd_send_terminate_threads(Error *err)
1005 {
1006     int i;
1007 
1008     trace_multifd_send_terminate_threads(err != NULL);
1009 
1010     if (err) {
1011         MigrationState *s = migrate_get_current();
1012         migrate_set_error(s, err);
1013         if (s->state == MIGRATION_STATUS_SETUP ||
1014             s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
1015             s->state == MIGRATION_STATUS_DEVICE ||
1016             s->state == MIGRATION_STATUS_ACTIVE) {
1017             migrate_set_state(&s->state, s->state,
1018                               MIGRATION_STATUS_FAILED);
1019         }
1020     }
1021 
1022     /*
1023      * We don't want to exit each thread twice.  Depending on where
1024      * we get the error, or if there are two independent errors in two
1025      * threads at the same time, we can end up calling this function
1026      * twice.
1027      */
1028     if (atomic_xchg(&multifd_send_state->exiting, 1)) {
1029         return;
1030     }
1031 
1032     for (i = 0; i < migrate_multifd_channels(); i++) {
1033         MultiFDSendParams *p = &multifd_send_state->params[i];
1034 
1035         qemu_mutex_lock(&p->mutex);
1036         p->quit = true;
1037         qemu_sem_post(&p->sem);
1038         qemu_mutex_unlock(&p->mutex);
1039     }
1040 }
1041 
1042 void multifd_save_cleanup(void)
1043 {
1044     int i;
1045 
1046     if (!migrate_use_multifd()) {
1047         return;
1048     }
1049     multifd_send_terminate_threads(NULL);
1050     for (i = 0; i < migrate_multifd_channels(); i++) {
1051         MultiFDSendParams *p = &multifd_send_state->params[i];
1052 
1053         if (p->running) {
1054             qemu_thread_join(&p->thread);
1055         }
1056     }
1057     for (i = 0; i < migrate_multifd_channels(); i++) {
1058         MultiFDSendParams *p = &multifd_send_state->params[i];
1059 
1060         socket_send_channel_destroy(p->c);
1061         p->c = NULL;
1062         qemu_mutex_destroy(&p->mutex);
1063         qemu_sem_destroy(&p->sem);
1064         qemu_sem_destroy(&p->sem_sync);
1065         g_free(p->name);
1066         p->name = NULL;
1067         multifd_pages_clear(p->pages);
1068         p->pages = NULL;
1069         p->packet_len = 0;
1070         g_free(p->packet);
1071         p->packet = NULL;
1072     }
1073     qemu_sem_destroy(&multifd_send_state->channels_ready);
1074     g_free(multifd_send_state->params);
1075     multifd_send_state->params = NULL;
1076     multifd_pages_clear(multifd_send_state->pages);
1077     multifd_send_state->pages = NULL;
1078     g_free(multifd_send_state);
1079     multifd_send_state = NULL;
1080 }
1081 
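/*
 * Flush any pending pages, then make every send channel emit a packet
 * flagged with MULTIFD_FLAG_SYNC and wait until all of them have done so.
 * This keeps the main migration stream and the multifd channels in sync.
 */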
1082 static void multifd_send_sync_main(RAMState *rs)
1083 {
1084     int i;
1085 
1086     if (!migrate_use_multifd()) {
1087         return;
1088     }
1089     if (multifd_send_state->pages->used) {
1090         if (multifd_send_pages(rs) < 0) {
1091             error_report("%s: multifd_send_pages fail", __func__);
1092             return;
1093         }
1094     }
1095     for (i = 0; i < migrate_multifd_channels(); i++) {
1096         MultiFDSendParams *p = &multifd_send_state->params[i];
1097 
1098         trace_multifd_send_sync_main_signal(p->id);
1099 
1100         qemu_mutex_lock(&p->mutex);
1101 
1102         if (p->quit) {
1103             error_report("%s: channel %d has already quit", __func__, i);
1104             qemu_mutex_unlock(&p->mutex);
1105             return;
1106         }
1107 
1108         p->packet_num = multifd_send_state->packet_num++;
1109         p->flags |= MULTIFD_FLAG_SYNC;
1110         p->pending_job++;
1111         qemu_file_update_transfer(rs->f, p->packet_len);
1112         ram_counters.multifd_bytes += p->packet_len;
1113         ram_counters.transferred += p->packet_len;
1114         qemu_mutex_unlock(&p->mutex);
1115         qemu_sem_post(&p->sem);
1116     }
1117     for (i = 0; i < migrate_multifd_channels(); i++) {
1118         MultiFDSendParams *p = &multifd_send_state->params[i];
1119 
1120         trace_multifd_send_sync_main_wait(p->id);
1121         qemu_sem_wait(&p->sem_sync);
1122     }
1123     trace_multifd_send_sync_main(multifd_send_state->packet_num);
1124 }
1125 
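/*
 * Per-channel send thread: after the initial handshake packet, it loops
 * sending one header packet plus the corresponding pages for every
 * pending job, until it is told to quit or an error occurs.
 */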
1126 static void *multifd_send_thread(void *opaque)
1127 {
1128     MultiFDSendParams *p = opaque;
1129     Error *local_err = NULL;
1130     int ret = 0;
1131     uint32_t flags = 0;
1132 
1133     trace_multifd_send_thread_start(p->id);
1134     rcu_register_thread();
1135 
1136     if (multifd_send_initial_packet(p, &local_err) < 0) {
1137         ret = -1;
1138         goto out;
1139     }
1140     /* initial packet */
1141     p->num_packets = 1;
1142 
1143     while (true) {
1144         qemu_sem_wait(&p->sem);
1145 
1146         if (atomic_read(&multifd_send_state->exiting)) {
1147             break;
1148         }
1149         qemu_mutex_lock(&p->mutex);
1150 
1151         if (p->pending_job) {
1152             uint32_t used = p->pages->used;
1153             uint64_t packet_num = p->packet_num;
1154             flags = p->flags;
1155 
1156             p->next_packet_size = used * qemu_target_page_size();
1157             multifd_send_fill_packet(p);
1158             p->flags = 0;
1159             p->num_packets++;
1160             p->num_pages += used;
1161             p->pages->used = 0;
1162             p->pages->block = NULL;
1163             qemu_mutex_unlock(&p->mutex);
1164 
1165             trace_multifd_send(p->id, packet_num, used, flags,
1166                                p->next_packet_size);
1167 
1168             ret = qio_channel_write_all(p->c, (void *)p->packet,
1169                                         p->packet_len, &local_err);
1170             if (ret != 0) {
1171                 break;
1172             }
1173 
1174             if (used) {
1175                 ret = qio_channel_writev_all(p->c, p->pages->iov,
1176                                              used, &local_err);
1177                 if (ret != 0) {
1178                     break;
1179                 }
1180             }
1181 
1182             qemu_mutex_lock(&p->mutex);
1183             p->pending_job--;
1184             qemu_mutex_unlock(&p->mutex);
1185 
1186             if (flags & MULTIFD_FLAG_SYNC) {
1187                 qemu_sem_post(&p->sem_sync);
1188             }
1189             qemu_sem_post(&multifd_send_state->channels_ready);
1190         } else if (p->quit) {
1191             qemu_mutex_unlock(&p->mutex);
1192             break;
1193         } else {
1194             qemu_mutex_unlock(&p->mutex);
1195             /* sometimes there are spurious wakeups */
1196         }
1197     }
1198 
1199 out:
1200     if (local_err) {
1201         trace_multifd_send_error(p->id);
1202         multifd_send_terminate_threads(local_err);
1203     }
1204 
1205     /*
1206      * An error happened and this thread is about to exit; wake up
1207      * anyone who may be waiting on it before leaving.
1208      */
1209     if (ret != 0) {
1210         qemu_sem_post(&p->sem_sync);
1211         qemu_sem_post(&multifd_send_state->channels_ready);
1212     }
1213 
1214     qemu_mutex_lock(&p->mutex);
1215     p->running = false;
1216     qemu_mutex_unlock(&p->mutex);
1217 
1218     rcu_unregister_thread();
1219     trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages);
1220 
1221     return NULL;
1222 }
1223 
1224 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
1225 {
1226     MultiFDSendParams *p = opaque;
1227     QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
1228     Error *local_err = NULL;
1229 
1230     trace_multifd_new_send_channel_async(p->id);
1231     if (qio_task_propagate_error(task, &local_err)) {
1232         migrate_set_error(migrate_get_current(), local_err);
1233         multifd_save_cleanup();
1234     } else {
1235         p->c = QIO_CHANNEL(sioc);
1236         qio_channel_set_delay(p->c, false);
1237         p->running = true;
1238         qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
1239                            QEMU_THREAD_JOINABLE);
1240     }
1241 }
1242 
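/*
 * Allocate the global multifd send state and request one send channel per
 * configured multifd channel; each channel's thread is started from
 * multifd_new_send_channel_async() once its connection is established.
 */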
1243 int multifd_save_setup(void)
1244 {
1245     int thread_count;
1246     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
1247     uint8_t i;
1248 
1249     if (!migrate_use_multifd()) {
1250         return 0;
1251     }
1252     thread_count = migrate_multifd_channels();
1253     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
1254     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
1255     multifd_send_state->pages = multifd_pages_init(page_count);
1256     qemu_sem_init(&multifd_send_state->channels_ready, 0);
1257     atomic_set(&multifd_send_state->exiting, 0);
1258 
1259     for (i = 0; i < thread_count; i++) {
1260         MultiFDSendParams *p = &multifd_send_state->params[i];
1261 
1262         qemu_mutex_init(&p->mutex);
1263         qemu_sem_init(&p->sem, 0);
1264         qemu_sem_init(&p->sem_sync, 0);
1265         p->quit = false;
1266         p->pending_job = 0;
1267         p->id = i;
1268         p->pages = multifd_pages_init(page_count);
1269         p->packet_len = sizeof(MultiFDPacket_t)
1270                       + sizeof(ram_addr_t) * page_count;
1271         p->packet = g_malloc0(p->packet_len);
1272         p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
1273         p->packet->version = cpu_to_be32(MULTIFD_VERSION);
1274         p->name = g_strdup_printf("multifdsend_%d", i);
1275         socket_send_channel_create(multifd_new_send_channel_async, p);
1276     }
1277     return 0;
1278 }
1279 
1280 struct {
1281     MultiFDRecvParams *params;
1282     /* number of created threads */
1283     int count;
1284     /* syncs main thread and channels */
1285     QemuSemaphore sem_sync;
1286     /* global number of generated multifd packets */
1287     uint64_t packet_num;
1288 } *multifd_recv_state;
1289 
1290 static void multifd_recv_terminate_threads(Error *err)
1291 {
1292     int i;
1293 
1294     trace_multifd_recv_terminate_threads(err != NULL);
1295 
1296     if (err) {
1297         MigrationState *s = migrate_get_current();
1298         migrate_set_error(s, err);
1299         if (s->state == MIGRATION_STATUS_SETUP ||
1300             s->state == MIGRATION_STATUS_ACTIVE) {
1301             migrate_set_state(&s->state, s->state,
1302                               MIGRATION_STATUS_FAILED);
1303         }
1304     }
1305 
1306     for (i = 0; i < migrate_multifd_channels(); i++) {
1307         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1308 
1309         qemu_mutex_lock(&p->mutex);
1310         p->quit = true;
1311         /* We could arrive here for two reasons:
1312            - normal quit, i.e. everything went fine, just finished
1313            - error quit: We close the channels so the channel threads
1314              finish the qio_channel_read_all_eof() */
1315         if (p->c) {
1316             qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
1317         }
1318         qemu_mutex_unlock(&p->mutex);
1319     }
1320 }
1321 
1322 int multifd_load_cleanup(Error **errp)
1323 {
1324     int i;
1325     int ret = 0;
1326 
1327     if (!migrate_use_multifd()) {
1328         return 0;
1329     }
1330     multifd_recv_terminate_threads(NULL);
1331     for (i = 0; i < migrate_multifd_channels(); i++) {
1332         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1333 
1334         if (p->running) {
1335             p->quit = true;
1336             /*
1337              * multifd_recv_thread may be blocked in the MULTIFD_FLAG_SYNC
1338              * handling code; waking it up here is harmless during cleanup.
1339              */
1340             qemu_sem_post(&p->sem_sync);
1341             qemu_thread_join(&p->thread);
1342         }
1343     }
1344     for (i = 0; i < migrate_multifd_channels(); i++) {
1345         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1346 
1347         object_unref(OBJECT(p->c));
1348         p->c = NULL;
1349         qemu_mutex_destroy(&p->mutex);
1350         qemu_sem_destroy(&p->sem_sync);
1351         g_free(p->name);
1352         p->name = NULL;
1353         multifd_pages_clear(p->pages);
1354         p->pages = NULL;
1355         p->packet_len = 0;
1356         g_free(p->packet);
1357         p->packet = NULL;
1358     }
1359     qemu_sem_destroy(&multifd_recv_state->sem_sync);
1360     g_free(multifd_recv_state->params);
1361     multifd_recv_state->params = NULL;
1362     g_free(multifd_recv_state);
1363     multifd_recv_state = NULL;
1364 
1365     return ret;
1366 }
1367 
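/*
 * Wait until every receive channel has seen a MULTIFD_FLAG_SYNC packet,
 * record the highest packet number seen, then release all the channels.
 */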
1368 static void multifd_recv_sync_main(void)
1369 {
1370     int i;
1371 
1372     if (!migrate_use_multifd()) {
1373         return;
1374     }
1375     for (i = 0; i < migrate_multifd_channels(); i++) {
1376         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1377 
1378         trace_multifd_recv_sync_main_wait(p->id);
1379         qemu_sem_wait(&multifd_recv_state->sem_sync);
1380     }
1381     for (i = 0; i < migrate_multifd_channels(); i++) {
1382         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1383 
1384         qemu_mutex_lock(&p->mutex);
1385         if (multifd_recv_state->packet_num < p->packet_num) {
1386             multifd_recv_state->packet_num = p->packet_num;
1387         }
1388         qemu_mutex_unlock(&p->mutex);
1389         trace_multifd_recv_sync_main_signal(p->id);
1390         qemu_sem_post(&p->sem_sync);
1391     }
1392     trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
1393 }
1394 
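/*
 * Per-channel receive thread: reads one header packet at a time, then the
 * pages it describes directly into guest RAM, and synchronizes with the
 * main thread when a MULTIFD_FLAG_SYNC packet arrives.
 */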
1395 static void *multifd_recv_thread(void *opaque)
1396 {
1397     MultiFDRecvParams *p = opaque;
1398     Error *local_err = NULL;
1399     int ret;
1400 
1401     trace_multifd_recv_thread_start(p->id);
1402     rcu_register_thread();
1403 
1404     while (true) {
1405         uint32_t used;
1406         uint32_t flags;
1407 
1408         if (p->quit) {
1409             break;
1410         }
1411 
1412         ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
1413                                        p->packet_len, &local_err);
1414         if (ret == 0) {   /* EOF */
1415             break;
1416         }
1417         if (ret == -1) {   /* Error */
1418             break;
1419         }
1420 
1421         qemu_mutex_lock(&p->mutex);
1422         ret = multifd_recv_unfill_packet(p, &local_err);
1423         if (ret) {
1424             qemu_mutex_unlock(&p->mutex);
1425             break;
1426         }
1427 
1428         used = p->pages->used;
1429         flags = p->flags;
1430         trace_multifd_recv(p->id, p->packet_num, used, flags,
1431                            p->next_packet_size);
1432         p->num_packets++;
1433         p->num_pages += used;
1434         qemu_mutex_unlock(&p->mutex);
1435 
1436         if (used) {
1437             ret = qio_channel_readv_all(p->c, p->pages->iov,
1438                                         used, &local_err);
1439             if (ret != 0) {
1440                 break;
1441             }
1442         }
1443 
1444         if (flags & MULTIFD_FLAG_SYNC) {
1445             qemu_sem_post(&multifd_recv_state->sem_sync);
1446             qemu_sem_wait(&p->sem_sync);
1447         }
1448     }
1449 
1450     if (local_err) {
1451         multifd_recv_terminate_threads(local_err);
1452     }
1453     qemu_mutex_lock(&p->mutex);
1454     p->running = false;
1455     qemu_mutex_unlock(&p->mutex);
1456 
1457     rcu_unregister_thread();
1458     trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages);
1459 
1460     return NULL;
1461 }
1462 
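/*
 * Allocate the global multifd receive state and the per-channel
 * parameters; the receive threads are started later, from
 * multifd_recv_new_channel(), as the incoming connections arrive.
 */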
1463 int multifd_load_setup(void)
1464 {
1465     int thread_count;
1466     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
1467     uint8_t i;
1468 
1469     if (!migrate_use_multifd()) {
1470         return 0;
1471     }
1472     thread_count = migrate_multifd_channels();
1473     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
1474     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
1475     atomic_set(&multifd_recv_state->count, 0);
1476     qemu_sem_init(&multifd_recv_state->sem_sync, 0);
1477 
1478     for (i = 0; i < thread_count; i++) {
1479         MultiFDRecvParams *p = &multifd_recv_state->params[i];
1480 
1481         qemu_mutex_init(&p->mutex);
1482         qemu_sem_init(&p->sem_sync, 0);
1483         p->quit = false;
1484         p->id = i;
1485         p->pages = multifd_pages_init(page_count);
1486         p->packet_len = sizeof(MultiFDPacket_t)
1487                       + sizeof(ram_addr_t) * page_count;
1488         p->packet = g_malloc0(p->packet_len);
1489         p->name = g_strdup_printf("multifdrecv_%d", i);
1490     }
1491     return 0;
1492 }
1493 
1494 bool multifd_recv_all_channels_created(void)
1495 {
1496     int thread_count = migrate_multifd_channels();
1497 
1498     if (!migrate_use_multifd()) {
1499         return true;
1500     }
1501 
1502     return thread_count == atomic_read(&multifd_recv_state->count);
1503 }
1504 
1505 /*
1506  * Try to receive all multifd channels to get ready for the migration.
1507  * - Return true and do not set @errp when correctly receiving all channels;
1508  * - Return false and do not set @errp when correctly receiving the current one;
1509  * - Return false and set @errp when failing to receive the current channel.
1510  */
1511 bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
1512 {
1513     MultiFDRecvParams *p;
1514     Error *local_err = NULL;
1515     int id;
1516 
1517     id = multifd_recv_initial_packet(ioc, &local_err);
1518     if (id < 0) {
1519         multifd_recv_terminate_threads(local_err);
1520         error_propagate_prepend(errp, local_err,
1521                                 "failed to receive packet"
1522                                 " via multifd channel %d: ",
1523                                 atomic_read(&multifd_recv_state->count));
1524         return false;
1525     }
1526     trace_multifd_recv_new_channel(id);
1527 
1528     p = &multifd_recv_state->params[id];
1529     if (p->c != NULL) {
1530         error_setg(&local_err, "multifd: received id '%d' already setup",
1531                    id);
1532         multifd_recv_terminate_threads(local_err);
1533         error_propagate(errp, local_err);
1534         return false;
1535     }
1536     p->c = ioc;
1537     object_ref(OBJECT(ioc));
1538     /* initial packet */
1539     p->num_packets = 1;
1540 
1541     p->running = true;
1542     qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
1543                        QEMU_THREAD_JOINABLE);
1544     atomic_inc(&multifd_recv_state->count);
1545     return atomic_read(&multifd_recv_state->count) ==
1546            migrate_multifd_channels();
1547 }
1548 
1549 /**
1550  * save_page_header: write page header to wire
1551  *
1552  * If this is the 1st block, it also writes the block identification
1553  *
1554  * Returns the number of bytes written
1555  *
1556  * @f: QEMUFile where to send the data
1557  * @block: block that contains the page we want to send
1558  * @offset: offset inside the block for the page
1559  *          in the lower bits, it contains flags
1560  */
1561 static size_t save_page_header(RAMState *rs, QEMUFile *f,  RAMBlock *block,
1562                                ram_addr_t offset)
1563 {
1564     size_t size, len;
1565 
1566     if (block == rs->last_sent_block) {
1567         offset |= RAM_SAVE_FLAG_CONTINUE;
1568     }
1569     qemu_put_be64(f, offset);
1570     size = 8;
1571 
1572     if (!(offset & RAM_SAVE_FLAG_CONTINUE)) {
1573         len = strlen(block->idstr);
1574         qemu_put_byte(f, len);
1575         qemu_put_buffer(f, (uint8_t *)block->idstr, len);
1576         size += 1 + len;
1577         rs->last_sent_block = block;
1578     }
1579     return size;
1580 }
1581 
1582 /**
1583  * mig_throttle_guest_down: throttle down the guest
1584  *
1585  * Reduce amount of guest cpu execution to hopefully slow down memory
1586  * writes. If guest dirty memory rate is reduced below the rate at
1587  * which we can transfer pages to the destination then we should be
1588  * able to complete migration. Some workloads dirty memory way too
1589  * fast and will not effectively converge, even with auto-converge.
1590  */
1591 static void mig_throttle_guest_down(void)
1592 {
1593     MigrationState *s = migrate_get_current();
1594     uint64_t pct_initial = s->parameters.cpu_throttle_initial;
1595     uint64_t pct_increment = s->parameters.cpu_throttle_increment;
1596     int pct_max = s->parameters.max_cpu_throttle;
1597 
1598     /* We have not started throttling yet. Let's start it. */
1599     if (!cpu_throttle_active()) {
1600         cpu_throttle_set(pct_initial);
1601     } else {
1602         /* Throttling already on, just increase the rate */
1603         cpu_throttle_set(MIN(cpu_throttle_get_percentage() + pct_increment,
1604                          pct_max));
1605     }
1606 }
1607 
1608 /**
1609  * xbzrle_cache_zero_page: insert a zero page in the XBZRLE cache
1610  *
1611  * @rs: current RAM state
1612  * @current_addr: address for the zero page
1613  *
1614  * Update the xbzrle cache to reflect a page that's been sent as all 0.
1615  * The important thing is that a stale (not-yet-0'd) page be replaced
1616  * by the new data.
1617  * As a bonus, if the page wasn't in the cache it gets added so that
1618  * when a small write is made into the 0'd page it gets XBZRLE sent.
1619  */
1620 static void xbzrle_cache_zero_page(RAMState *rs, ram_addr_t current_addr)
1621 {
1622     if (rs->ram_bulk_stage || !migrate_use_xbzrle()) {
1623         return;
1624     }
1625 
1626     /* We don't care if this fails to allocate a new cache page
1627      * as long as it updates an old one */
1628     cache_insert(XBZRLE.cache, current_addr, XBZRLE.zero_target_page,
1629                  ram_counters.dirty_sync_count);
1630 }
1631 
1632 #define ENCODING_FLAG_XBZRLE 0x1
1633 
1634 /**
1635  * save_xbzrle_page: compress and send current page
1636  *
1637  * Returns: 1 means that we wrote the page
1638  *          0 means that page is identical to the one already sent
1639  *          -1 means that xbzrle would be longer than normal
1640  *
1641  * @rs: current RAM state
1642  * @current_data: pointer to the address of the page contents
1643  * @current_addr: addr of the page
1644  * @block: block that contains the page we want to send
1645  * @offset: offset inside the block for the page
1646  * @last_stage: if we are at the completion stage
1647  */
1648 static int save_xbzrle_page(RAMState *rs, uint8_t **current_data,
1649                             ram_addr_t current_addr, RAMBlock *block,
1650                             ram_addr_t offset, bool last_stage)
1651 {
1652     int encoded_len = 0, bytes_xbzrle;
1653     uint8_t *prev_cached_page;
1654 
1655     if (!cache_is_cached(XBZRLE.cache, current_addr,
1656                          ram_counters.dirty_sync_count)) {
1657         xbzrle_counters.cache_miss++;
1658         if (!last_stage) {
1659             if (cache_insert(XBZRLE.cache, current_addr, *current_data,
1660                              ram_counters.dirty_sync_count) == -1) {
1661                 return -1;
1662             } else {
1663                 /* update *current_data when the page has been
1664                    inserted into the cache */
1665                 *current_data = get_cached_data(XBZRLE.cache, current_addr);
1666             }
1667         }
1668         return -1;
1669     }
1670 
1671     prev_cached_page = get_cached_data(XBZRLE.cache, current_addr);
1672 
1673     /* save current buffer into memory */
1674     memcpy(XBZRLE.current_buf, *current_data, TARGET_PAGE_SIZE);
1675 
1676     /* XBZRLE encoding (if there is no overflow) */
1677     encoded_len = xbzrle_encode_buffer(prev_cached_page, XBZRLE.current_buf,
1678                                        TARGET_PAGE_SIZE, XBZRLE.encoded_buf,
1679                                        TARGET_PAGE_SIZE);
1680 
1681     /*
1682      * Update the cache contents, so that it corresponds to the data
1683      * sent, in all cases except where we skip the page.
1684      */
1685     if (!last_stage && encoded_len != 0) {
1686         memcpy(prev_cached_page, XBZRLE.current_buf, TARGET_PAGE_SIZE);
1687         /*
1688          * In the case where we couldn't compress, ensure that the caller
1689          * sends the data from the cache, since the guest might have
1690          * changed the RAM since we copied it.
1691          */
1692         *current_data = prev_cached_page;
1693     }
1694 
1695     if (encoded_len == 0) {
1696         trace_save_xbzrle_page_skipping();
1697         return 0;
1698     } else if (encoded_len == -1) {
1699         trace_save_xbzrle_page_overflow();
1700         xbzrle_counters.overflow++;
1701         return -1;
1702     }
1703 
1704     /* Send XBZRLE based compressed page */
1705     bytes_xbzrle = save_page_header(rs, rs->f, block,
1706                                     offset | RAM_SAVE_FLAG_XBZRLE);
1707     qemu_put_byte(rs->f, ENCODING_FLAG_XBZRLE);
1708     qemu_put_be16(rs->f, encoded_len);
1709     qemu_put_buffer(rs->f, XBZRLE.encoded_buf, encoded_len);
1710     bytes_xbzrle += encoded_len + 1 + 2;
1711     xbzrle_counters.pages++;
1712     xbzrle_counters.bytes += bytes_xbzrle;
1713     ram_counters.transferred += bytes_xbzrle;
1714 
1715     return 1;
1716 }
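/*
 * Informational sketch of the stream layout produced for one XBZRLE-encoded
 * page (derived from the calls in save_xbzrle_page() above):
 *
 *     save_page_header(rs, f, block, offset | RAM_SAVE_FLAG_XBZRLE);
 *     qemu_put_byte(f, ENCODING_FLAG_XBZRLE);      // 1 byte
 *     qemu_put_be16(f, encoded_len);               // 2 bytes
 *     qemu_put_buffer(f, encoded_buf, encoded_len);
 *
 * which is why the accounting adds "encoded_len + 1 + 2" on top of the
 * header size returned by save_page_header().
 */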
1717 
1718 /**
1719  * migration_bitmap_find_dirty: find the next dirty page from start
1720  *
1721  * Returns the page offset within memory region of the start of a dirty page
1722  *
1723  * @rs: current RAM state
1724  * @rb: RAMBlock where to search for dirty pages
1725  * @start: page where we start the search
1726  */
1727 static inline
1728 unsigned long migration_bitmap_find_dirty(RAMState *rs, RAMBlock *rb,
1729                                           unsigned long start)
1730 {
1731     unsigned long size = rb->used_length >> TARGET_PAGE_BITS;
1732     unsigned long *bitmap = rb->bmap;
1733     unsigned long next;
1734 
1735     if (ramblock_is_ignored(rb)) {
1736         return size;
1737     }
1738 
1739     /*
1740      * When the free page optimization is enabled, we need to check the bitmap
1741      * to send the non-free pages rather than all the pages in the bulk stage.
1742      */
1743     if (!rs->fpo_enabled && rs->ram_bulk_stage && start > 0) {
1744         next = start + 1;
1745     } else {
1746         next = find_next_bit(bitmap, size, start);
1747     }
1748 
1749     return next;
1750 }
1751 
1752 static inline bool migration_bitmap_clear_dirty(RAMState *rs,
1753                                                 RAMBlock *rb,
1754                                                 unsigned long page)
1755 {
1756     bool ret;
1757 
1758     qemu_mutex_lock(&rs->bitmap_mutex);
1759 
1760     /*
1761      * Clear dirty bitmap if needed.  This _must_ be called before we
1762      * send any of the pages in the chunk because we need to make sure
1763      * we can capture further page content changes when we sync the
1764      * dirty log next time.  So as long as we are going to send any of
1765      * the pages in the chunk we clear the remote dirty bitmap for all.
1766      * Clearing it earlier won't be a problem, but too late will.
1767      */
1768     if (rb->clear_bmap && clear_bmap_test_and_clear(rb, page)) {
1769         uint8_t shift = rb->clear_bmap_shift;
1770         hwaddr size = 1ULL << (TARGET_PAGE_BITS + shift);
1771         hwaddr start = (((ram_addr_t)page) << TARGET_PAGE_BITS) & (-size);
1772 
1773         /*
1774          * CLEAR_BITMAP_SHIFT_MIN should always guarantee this... this
1775          * can make things easier sometimes since then the start address
1776          * of the small chunk will always be aligned to 64 pages, so the
1777          * bitmap will always be aligned to unsigned long.  We should
1778          * even be able to remove this restriction but I'm simply
1779          * keeping it.
1780          */
1781         assert(shift >= 6);
1782         trace_migration_bitmap_clear_dirty(rb->idstr, start, size, page);
1783         memory_region_clear_dirty_bitmap(rb->mr, start, size);
1784     }
1785 
1786     ret = test_and_clear_bit(page, rb->bmap);
1787 
1788     if (ret) {
1789         rs->migration_dirty_pages--;
1790     }
1791     qemu_mutex_unlock(&rs->bitmap_mutex);
1792 
1793     return ret;
1794 }
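/*
 * Worked example for the clear_bmap chunk arithmetic above (illustrative
 * only): with 4KiB target pages (TARGET_PAGE_BITS == 12) and
 * clear_bmap_shift == 18,
 *
 *     size  = 1ULL << (12 + 18);        // 1GiB covered per chunk
 *     start = (page << 12) & -size;     // chunk-aligned start address
 *
 * so a single memory_region_clear_dirty_bitmap() call clears the remote
 * dirty bitmap for the whole 1GiB chunk containing @page.
 */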
1795 
1796 /* Called with RCU critical section */
1797 static void ramblock_sync_dirty_bitmap(RAMState *rs, RAMBlock *rb)
1798 {
1799     rs->migration_dirty_pages +=
1800         cpu_physical_memory_sync_dirty_bitmap(rb, 0, rb->used_length,
1801                                               &rs->num_dirty_pages_period);
1802 }
1803 
1804 /**
1805  * ram_pagesize_summary: calculate all the pagesizes of a VM
1806  *
1807  * Returns a summary bitmap of the page sizes of all RAMBlocks
1808  *
1809  * For VMs with just normal pages this is equivalent to the host page
1810  * size. If it's got some huge pages then it's the OR of all the
1811  * different page sizes.
1812  */
1813 uint64_t ram_pagesize_summary(void)
1814 {
1815     RAMBlock *block;
1816     uint64_t summary = 0;
1817 
1818     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
1819         summary |= block->page_size;
1820     }
1821 
1822     return summary;
1823 }
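/*
 * Example (illustrative): a guest whose RAM is a mix of normal 4KiB blocks
 * and one 2MiB hugepage-backed block would report
 *
 *     summary = 0x1000 | 0x200000 = 0x201000
 *
 * letting callers see both which page sizes are present and whether any
 * huge pages exist at all.
 */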
1824 
1825 uint64_t ram_get_total_transferred_pages(void)
1826 {
1827     return  ram_counters.normal + ram_counters.duplicate +
1828                 compression_counters.pages + xbzrle_counters.pages;
1829 }
1830 
1831 static void migration_update_rates(RAMState *rs, int64_t end_time)
1832 {
1833     uint64_t page_count = rs->target_page_count - rs->target_page_count_prev;
1834     double compressed_size;
1835 
1836     /* calculate period counters */
1837     ram_counters.dirty_pages_rate = rs->num_dirty_pages_period * 1000
1838                 / (end_time - rs->time_last_bitmap_sync);
1839 
1840     if (!page_count) {
1841         return;
1842     }
1843 
1844     if (migrate_use_xbzrle()) {
1845         xbzrle_counters.cache_miss_rate = (double)(xbzrle_counters.cache_miss -
1846             rs->xbzrle_cache_miss_prev) / page_count;
1847         rs->xbzrle_cache_miss_prev = xbzrle_counters.cache_miss;
1848     }
1849 
1850     if (migrate_use_compression()) {
1851         compression_counters.busy_rate = (double)(compression_counters.busy -
1852             rs->compress_thread_busy_prev) / page_count;
1853         rs->compress_thread_busy_prev = compression_counters.busy;
1854 
1855         compressed_size = compression_counters.compressed_size -
1856                           rs->compressed_size_prev;
1857         if (compressed_size) {
1858             double uncompressed_size = (compression_counters.pages -
1859                                     rs->compress_pages_prev) * TARGET_PAGE_SIZE;
1860 
1861             /* Compression-Ratio = Uncompressed-size / Compressed-size */
1862             compression_counters.compression_rate =
1863                                         uncompressed_size / compressed_size;
1864 
1865             rs->compress_pages_prev = compression_counters.pages;
1866             rs->compressed_size_prev = compression_counters.compressed_size;
1867         }
1868     }
1869 }
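/*
 * Numerical sketch of the period counters above (illustrative figures): if
 * 10000 target pages were processed in the period, 2500 of them missed the
 * XBZRLE cache, and the compression threads shrank 40MiB of page data down
 * to 10MiB, then
 *
 *     xbzrle_counters.cache_miss_rate       = 2500.0 / 10000 = 0.25
 *     compression_counters.compression_rate = 40MiB / 10MiB  = 4.0
 */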
1870 
1871 static void migration_bitmap_sync(RAMState *rs)
1872 {
1873     RAMBlock *block;
1874     int64_t end_time;
1875     uint64_t bytes_xfer_now;
1876 
1877     ram_counters.dirty_sync_count++;
1878 
1879     if (!rs->time_last_bitmap_sync) {
1880         rs->time_last_bitmap_sync = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
1881     }
1882 
1883     trace_migration_bitmap_sync_start();
1884     memory_global_dirty_log_sync();
1885 
1886     qemu_mutex_lock(&rs->bitmap_mutex);
1887     WITH_RCU_READ_LOCK_GUARD() {
1888         RAMBLOCK_FOREACH_NOT_IGNORED(block) {
1889             ramblock_sync_dirty_bitmap(rs, block);
1890         }
1891         ram_counters.remaining = ram_bytes_remaining();
1892     }
1893     qemu_mutex_unlock(&rs->bitmap_mutex);
1894 
1895     memory_global_after_dirty_log_sync();
1896     trace_migration_bitmap_sync_end(rs->num_dirty_pages_period);
1897 
1898     end_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
1899 
1900     /* more than 1 second = 1000 milliseconds */
1901     if (end_time > rs->time_last_bitmap_sync + 1000) {
1902         bytes_xfer_now = ram_counters.transferred;
1903 
1904         /* During block migration the auto-converge logic incorrectly detects
1905          * that ram migration makes no progress. Avoid this by disabling the
1906          * throttling logic during the bulk phase of block migration. */
1907         if (migrate_auto_converge() && !blk_mig_bulk_active()) {
1908             /* The following detection logic can be refined later. For now:
1909                Check to see if the dirtied bytes exceed 50% of the approx.
1910                amount of bytes that just got transferred since the last time we
1911                were in this routine. If that happens twice, start or increase
1912                throttling */
1913 
1914             if ((rs->num_dirty_pages_period * TARGET_PAGE_SIZE >
1915                    (bytes_xfer_now - rs->bytes_xfer_prev) / 2) &&
1916                 (++rs->dirty_rate_high_cnt >= 2)) {
1917                     trace_migration_throttle();
1918                     rs->dirty_rate_high_cnt = 0;
1919                     mig_throttle_guest_down();
1920             }
1921         }
1922 
1923         migration_update_rates(rs, end_time);
1924 
1925         rs->target_page_count_prev = rs->target_page_count;
1926 
1927         /* reset period counters */
1928         rs->time_last_bitmap_sync = end_time;
1929         rs->num_dirty_pages_period = 0;
1930         rs->bytes_xfer_prev = bytes_xfer_now;
1931     }
1932     if (migrate_use_events()) {
1933         qapi_event_send_migration_pass(ram_counters.dirty_sync_count);
1934     }
1935 }
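/*
 * The auto-converge trigger above, spelled out (informational only):
 *
 *     dirtied     = num_dirty_pages_period * TARGET_PAGE_SIZE;
 *     transferred = bytes_xfer_now - bytes_xfer_prev;
 *     if (dirtied > transferred / 2) {     // seen in two sync periods
 *         mig_throttle_guest_down();
 *     }
 *
 * i.e. the guest gets throttled once it has dirtied memory faster than half
 * of what we managed to transfer, in two of these ~1 second periods.
 */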
1936 
1937 static void migration_bitmap_sync_precopy(RAMState *rs)
1938 {
1939     Error *local_err = NULL;
1940 
1941     /*
1942      * The current notifier usage is just an optimization to migration, so we
1943      * don't stop the normal migration process in the error case.
1944      */
1945     if (precopy_notify(PRECOPY_NOTIFY_BEFORE_BITMAP_SYNC, &local_err)) {
1946         error_report_err(local_err);
1947     }
1948 
1949     migration_bitmap_sync(rs);
1950 
1951     if (precopy_notify(PRECOPY_NOTIFY_AFTER_BITMAP_SYNC, &local_err)) {
1952         error_report_err(local_err);
1953     }
1954 }
1955 
1956 /**
1957  * save_zero_page_to_file: send the zero page to the file
1958  *
1959  * Returns the size of data written to the file, 0 means the page is not
1960  * a zero page
1961  *
1962  * @rs: current RAM state
1963  * @file: the file where the data is saved
1964  * @block: block that contains the page we want to send
1965  * @offset: offset inside the block for the page
1966  */
1967 static int save_zero_page_to_file(RAMState *rs, QEMUFile *file,
1968                                   RAMBlock *block, ram_addr_t offset)
1969 {
1970     uint8_t *p = block->host + offset;
1971     int len = 0;
1972 
1973     if (is_zero_range(p, TARGET_PAGE_SIZE)) {
1974         len += save_page_header(rs, file, block, offset | RAM_SAVE_FLAG_ZERO);
1975         qemu_put_byte(file, 0);
1976         len += 1;
1977     }
1978     return len;
1979 }
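/*
 * Wire format sketch (informational): a page of zeroes costs only the page
 * header with RAM_SAVE_FLAG_ZERO set plus a single zero byte, i.e.
 *
 *     len = save_page_header(...)    // offset | RAM_SAVE_FLAG_ZERO
 *         + 1;                       // the qemu_put_byte(file, 0) above
 *
 * instead of TARGET_PAGE_SIZE bytes of payload.
 */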
1980 
1981 /**
1982  * save_zero_page: send the zero page to the stream
1983  *
1984  * Returns the number of pages written.
1985  *
1986  * @rs: current RAM state
1987  * @block: block that contains the page we want to send
1988  * @offset: offset inside the block for the page
1989  */
1990 static int save_zero_page(RAMState *rs, RAMBlock *block, ram_addr_t offset)
1991 {
1992     int len = save_zero_page_to_file(rs, rs->f, block, offset);
1993 
1994     if (len) {
1995         ram_counters.duplicate++;
1996         ram_counters.transferred += len;
1997         return 1;
1998     }
1999     return -1;
2000 }
2001 
2002 static void ram_release_pages(const char *rbname, uint64_t offset, int pages)
2003 {
2004     if (!migrate_release_ram() || !migration_in_postcopy()) {
2005         return;
2006     }
2007 
2008     ram_discard_range(rbname, offset, ((ram_addr_t)pages) << TARGET_PAGE_BITS);
2009 }
2010 
2011 /*
2012  * @pages: the number of pages written by the control path,
2013  *        < 0 - error
2014  *        > 0 - number of pages written
2015  *
2016  * Return true if the page has been saved, otherwise false is returned.
2017  */
2018 static bool control_save_page(RAMState *rs, RAMBlock *block, ram_addr_t offset,
2019                               int *pages)
2020 {
2021     uint64_t bytes_xmit = 0;
2022     int ret;
2023 
2024     *pages = -1;
2025     ret = ram_control_save_page(rs->f, block->offset, offset, TARGET_PAGE_SIZE,
2026                                 &bytes_xmit);
2027     if (ret == RAM_SAVE_CONTROL_NOT_SUPP) {
2028         return false;
2029     }
2030 
2031     if (bytes_xmit) {
2032         ram_counters.transferred += bytes_xmit;
2033         *pages = 1;
2034     }
2035 
2036     if (ret == RAM_SAVE_CONTROL_DELAYED) {
2037         return true;
2038     }
2039 
2040     if (bytes_xmit > 0) {
2041         ram_counters.normal++;
2042     } else if (bytes_xmit == 0) {
2043         ram_counters.duplicate++;
2044     }
2045 
2046     return true;
2047 }
2048 
2049 /*
2050  * directly send the page to the stream
2051  *
2052  * Returns the number of pages written.
2053  *
2054  * @rs: current RAM state
2055  * @block: block that contains the page we want to send
2056  * @offset: offset inside the block for the page
2057  * @buf: the page to be sent
2058  * @async: send the page asynchronously
2059  */
2060 static int save_normal_page(RAMState *rs, RAMBlock *block, ram_addr_t offset,
2061                             uint8_t *buf, bool async)
2062 {
2063     ram_counters.transferred += save_page_header(rs, rs->f, block,
2064                                                  offset | RAM_SAVE_FLAG_PAGE);
2065     if (async) {
2066         qemu_put_buffer_async(rs->f, buf, TARGET_PAGE_SIZE,
2067                               migrate_release_ram() &&
2068                               migration_in_postcopy());
2069     } else {
2070         qemu_put_buffer(rs->f, buf, TARGET_PAGE_SIZE);
2071     }
2072     ram_counters.transferred += TARGET_PAGE_SIZE;
2073     ram_counters.normal++;
2074     return 1;
2075 }
2076 
2077 /**
2078  * ram_save_page: send the given page to the stream
2079  *
2080  * Returns the number of pages written.
2081  *          < 0 - error
2082  *          >=0 - Number of pages written - this might legally be 0
2083  *                if xbzrle noticed the page was the same.
2084  *
2085  * @rs: current RAM state
2086  * @block: block that contains the page we want to send
2087  * @offset: offset inside the block for the page
2088  * @last_stage: if we are at the completion stage
2089  */
2090 static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
2091 {
2092     int pages = -1;
2093     uint8_t *p;
2094     bool send_async = true;
2095     RAMBlock *block = pss->block;
2096     ram_addr_t offset = ((ram_addr_t)pss->page) << TARGET_PAGE_BITS;
2097     ram_addr_t current_addr = block->offset + offset;
2098 
2099     p = block->host + offset;
2100     trace_ram_save_page(block->idstr, (uint64_t)offset, p);
2101 
2102     XBZRLE_cache_lock();
2103     if (!rs->ram_bulk_stage && !migration_in_postcopy() &&
2104         migrate_use_xbzrle()) {
2105         pages = save_xbzrle_page(rs, &p, current_addr, block,
2106                                  offset, last_stage);
2107         if (!last_stage) {
2108             /* Can't send this cached data async, since the cache page
2109              * might get updated before it gets to the wire
2110              */
2111             send_async = false;
2112         }
2113     }
2114 
2115     /* XBZRLE overflow or normal page */
2116     if (pages == -1) {
2117         pages = save_normal_page(rs, block, offset, p, send_async);
2118     }
2119 
2120     XBZRLE_cache_unlock();
2121 
2122     return pages;
2123 }
2124 
2125 static int ram_save_multifd_page(RAMState *rs, RAMBlock *block,
2126                                  ram_addr_t offset)
2127 {
2128     if (multifd_queue_page(rs, block, offset) < 0) {
2129         return -1;
2130     }
2131     ram_counters.normal++;
2132 
2133     return 1;
2134 }
2135 
2136 static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
2137                                  ram_addr_t offset, uint8_t *source_buf)
2138 {
2139     RAMState *rs = ram_state;
2140     uint8_t *p = block->host + (offset & TARGET_PAGE_MASK);
2141     bool zero_page = false;
2142     int ret;
2143 
2144     if (save_zero_page_to_file(rs, f, block, offset)) {
2145         zero_page = true;
2146         goto exit;
2147     }
2148 
2149     save_page_header(rs, f, block, offset | RAM_SAVE_FLAG_COMPRESS_PAGE);
2150 
2151     /*
2152      * copy it to an internal buffer to avoid it being modified by the VM
2153      * so that we can catch any error during compression and
2154      * decompression
2155      */
2156     memcpy(source_buf, p, TARGET_PAGE_SIZE);
2157     ret = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE);
2158     if (ret < 0) {
2159         qemu_file_set_error(migrate_get_current()->to_dst_file, ret);
2160         error_report("compressed data failed!");
2161         return false;
2162     }
2163 
2164 exit:
2165     ram_release_pages(block->idstr, offset & TARGET_PAGE_MASK, 1);
2166     return zero_page;
2167 }
2168 
2169 static void
2170 update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
2171 {
2172     ram_counters.transferred += bytes_xmit;
2173 
2174     if (param->zero_page) {
2175         ram_counters.duplicate++;
2176         return;
2177     }
2178 
2179     /* 8 means a header with RAM_SAVE_FLAG_CONTINUE. */
2180     compression_counters.compressed_size += bytes_xmit - 8;
2181     compression_counters.pages++;
2182 }
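/*
 * Note on the "- 8" above (informational): pages handed to the compression
 * threads always belong to the block that was sent last, so their header is
 * written with RAM_SAVE_FLAG_CONTINUE and consists of just the 8-byte
 * offset/flags word, with no block name.  Subtracting those 8 bytes leaves
 * only the compressed payload for the compression-ratio statistics:
 *
 *     bytes_xmit = 8 + compressed_payload_len;   // illustrative
 *     compression_counters.compressed_size += bytes_xmit - 8;
 */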
2183 
2184 static bool save_page_use_compression(RAMState *rs);
2185 
2186 static void flush_compressed_data(RAMState *rs)
2187 {
2188     int idx, len, thread_count;
2189 
2190     if (!save_page_use_compression(rs)) {
2191         return;
2192     }
2193     thread_count = migrate_compress_threads();
2194 
2195     qemu_mutex_lock(&comp_done_lock);
2196     for (idx = 0; idx < thread_count; idx++) {
2197         while (!comp_param[idx].done) {
2198             qemu_cond_wait(&comp_done_cond, &comp_done_lock);
2199         }
2200     }
2201     qemu_mutex_unlock(&comp_done_lock);
2202 
2203     for (idx = 0; idx < thread_count; idx++) {
2204         qemu_mutex_lock(&comp_param[idx].mutex);
2205         if (!comp_param[idx].quit) {
2206             len = qemu_put_qemu_file(rs->f, comp_param[idx].file);
2207             /*
2208              * it's safe to fetch zero_page without holding comp_done_lock
2209              * as there is no further request submitted to the thread,
2210              * i.e., the thread should be waiting for a request at this point.
2211              */
2212             update_compress_thread_counts(&comp_param[idx], len);
2213         }
2214         qemu_mutex_unlock(&comp_param[idx].mutex);
2215     }
2216 }
2217 
2218 static inline void set_compress_params(CompressParam *param, RAMBlock *block,
2219                                        ram_addr_t offset)
2220 {
2221     param->block = block;
2222     param->offset = offset;
2223 }
2224 
2225 static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block,
2226                                            ram_addr_t offset)
2227 {
2228     int idx, thread_count, bytes_xmit = -1, pages = -1;
2229     bool wait = migrate_compress_wait_thread();
2230 
2231     thread_count = migrate_compress_threads();
2232     qemu_mutex_lock(&comp_done_lock);
2233 retry:
2234     for (idx = 0; idx < thread_count; idx++) {
2235         if (comp_param[idx].done) {
2236             comp_param[idx].done = false;
2237             bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file);
2238             qemu_mutex_lock(&comp_param[idx].mutex);
2239             set_compress_params(&comp_param[idx], block, offset);
2240             qemu_cond_signal(&comp_param[idx].cond);
2241             qemu_mutex_unlock(&comp_param[idx].mutex);
2242             pages = 1;
2243             update_compress_thread_counts(&comp_param[idx], bytes_xmit);
2244             break;
2245         }
2246     }
2247 
2248     /*
2249      * wait for the free thread if the user specifies 'compress-wait-thread',
2250      * otherwise we will post the page out in the main thread as a normal page.
2251      */
2252     if (pages < 0 && wait) {
2253         qemu_cond_wait(&comp_done_cond, &comp_done_lock);
2254         goto retry;
2255     }
2256     qemu_mutex_unlock(&comp_done_lock);
2257 
2258     return pages;
2259 }
2260 
2261 /**
2262  * find_dirty_block: find the next dirty page and update any state
2263  * associated with the search process.
2264  *
2265  * Returns true if a page is found
2266  *
2267  * @rs: current RAM state
2268  * @pss: data about the state of the current dirty page scan
2269  * @again: set to false if the search has scanned the whole of RAM
2270  */
2271 static bool find_dirty_block(RAMState *rs, PageSearchStatus *pss, bool *again)
2272 {
2273     pss->page = migration_bitmap_find_dirty(rs, pss->block, pss->page);
2274     if (pss->complete_round && pss->block == rs->last_seen_block &&
2275         pss->page >= rs->last_page) {
2276         /*
2277          * We've been once around the RAM and haven't found anything.
2278          * Give up.
2279          */
2280         *again = false;
2281         return false;
2282     }
2283     if ((((ram_addr_t)pss->page) << TARGET_PAGE_BITS)
2284         >= pss->block->used_length) {
2285         /* Didn't find anything in this RAM Block */
2286         pss->page = 0;
2287         pss->block = QLIST_NEXT_RCU(pss->block, next);
2288         if (!pss->block) {
2289             /*
2290              * If memory migration starts over, we will meet a dirtied page
2291              * which may still exist in the compression threads' ring, so we
2292              * should flush the compressed data to make sure the new page
2293              * is not overwritten by the old one in the destination.
2294              *
2295              * Also, if xbzrle is on, stop using the data compression at this
2296              * point. In theory, xbzrle can do better than compression.
2297              */
2298             flush_compressed_data(rs);
2299 
2300             /* Hit the end of the list */
2301             pss->block = QLIST_FIRST_RCU(&ram_list.blocks);
2302             /* Flag that we've looped */
2303             pss->complete_round = true;
2304             rs->ram_bulk_stage = false;
2305         }
2306         /* Didn't find anything this time, but try again on the new block */
2307         *again = true;
2308         return false;
2309     } else {
2310         /* Can go around again, but... */
2311         *again = true;
2312         /* We've found something so probably don't need to */
2313         return true;
2314     }
2315 }
2316 
2317 /**
2318  * unqueue_page: gets a page of the queue
2319  *
2320  * Helper for 'get_queued_page' - gets a page off the queue
2321  *
2322  * Returns the block of the page (or NULL if none available)
2323  *
2324  * @rs: current RAM state
2325  * @offset: used to return the offset within the RAMBlock
2326  */
2327 static RAMBlock *unqueue_page(RAMState *rs, ram_addr_t *offset)
2328 {
2329     RAMBlock *block = NULL;
2330 
2331     if (QSIMPLEQ_EMPTY_ATOMIC(&rs->src_page_requests)) {
2332         return NULL;
2333     }
2334 
2335     qemu_mutex_lock(&rs->src_page_req_mutex);
2336     if (!QSIMPLEQ_EMPTY(&rs->src_page_requests)) {
2337         struct RAMSrcPageRequest *entry =
2338                                 QSIMPLEQ_FIRST(&rs->src_page_requests);
2339         block = entry->rb;
2340         *offset = entry->offset;
2341 
2342         if (entry->len > TARGET_PAGE_SIZE) {
2343             entry->len -= TARGET_PAGE_SIZE;
2344             entry->offset += TARGET_PAGE_SIZE;
2345         } else {
2346             memory_region_unref(block->mr);
2347             QSIMPLEQ_REMOVE_HEAD(&rs->src_page_requests, next_req);
2348             g_free(entry);
2349             migration_consume_urgent_request();
2350         }
2351     }
2352     qemu_mutex_unlock(&rs->src_page_req_mutex);
2353 
2354     return block;
2355 }
2356 
2357 /**
2358  * get_queued_page: unqueue a page from the postcopy requests
2359  *
2360  * Skips pages that are already sent (!dirty)
2361  *
2362  * Returns true if a queued page is found
2363  *
2364  * @rs: current RAM state
2365  * @pss: data about the state of the current dirty page scan
2366  */
2367 static bool get_queued_page(RAMState *rs, PageSearchStatus *pss)
2368 {
2369     RAMBlock  *block;
2370     ram_addr_t offset;
2371     bool dirty;
2372 
2373     do {
2374         block = unqueue_page(rs, &offset);
2375         /*
2376          * We're sending this page, and since it's postcopy nothing else
2377          * will dirty it, and we must make sure it doesn't get sent again
2378          * even if this queue request was received after the background
2379          * search already sent it.
2380          */
2381         if (block) {
2382             unsigned long page;
2383 
2384             page = offset >> TARGET_PAGE_BITS;
2385             dirty = test_bit(page, block->bmap);
2386             if (!dirty) {
2387                 trace_get_queued_page_not_dirty(block->idstr, (uint64_t)offset,
2388                                                 page);
2389             } else {
2390                 trace_get_queued_page(block->idstr, (uint64_t)offset, page);
2391             }
2392         }
2393 
2394     } while (block && !dirty);
2395 
2396     if (block) {
2397         /*
2398          * As soon as we start servicing pages out of order, then we have
2399          * to kill the bulk stage, since the bulk stage assumes
2400          * (in migration_bitmap_find_dirty) that every page is
2401          * dirty; that's no longer true.
2402          */
2403         rs->ram_bulk_stage = false;
2404 
2405         /*
2406          * We want the background search to continue from the queued page
2407          * since the guest is likely to want other pages near to the page
2408          * it just requested.
2409          */
2410         pss->block = block;
2411         pss->page = offset >> TARGET_PAGE_BITS;
2412 
2413         /*
2414          * This unqueued page would break the "one round" check, even if it is
2415          * really rare.
2416          */
2417         pss->complete_round = false;
2418     }
2419 
2420     return !!block;
2421 }
2422 
2423 /**
2424  * migration_page_queue_free: drop any remaining pages in the ram
2425  * request queue
2426  *
2427  * It should be empty at the end anyway, but in error cases there may
2428  * be some left.  In case any page is left, we drop it.
2429  *
2430  */
2431 static void migration_page_queue_free(RAMState *rs)
2432 {
2433     struct RAMSrcPageRequest *mspr, *next_mspr;
2434     /* This queue generally should be empty - but in the case of a failed
2435      * migration it might have some leftovers.
2436      */
2437     RCU_READ_LOCK_GUARD();
2438     QSIMPLEQ_FOREACH_SAFE(mspr, &rs->src_page_requests, next_req, next_mspr) {
2439         memory_region_unref(mspr->rb->mr);
2440         QSIMPLEQ_REMOVE_HEAD(&rs->src_page_requests, next_req);
2441         g_free(mspr);
2442     }
2443 }
2444 
2445 /**
2446  * ram_save_queue_pages: queue the page for transmission
2447  *
2448  * A request from postcopy destination for example.
2449  *
2450  * Returns zero on success or negative on error
2451  *
2452  * @rbname: Name of the RAMBlock of the request. NULL means the
2453  *          same as the last one.
2454  * @start: starting address from the start of the RAMBlock
2455  * @len: length (in bytes) to send
2456  */
2457 int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len)
2458 {
2459     RAMBlock *ramblock;
2460     RAMState *rs = ram_state;
2461 
2462     ram_counters.postcopy_requests++;
2463     RCU_READ_LOCK_GUARD();
2464 
2465     if (!rbname) {
2466         /* Reuse last RAMBlock */
2467         ramblock = rs->last_req_rb;
2468 
2469         if (!ramblock) {
2470             /*
2471              * Shouldn't happen, we can't reuse the last RAMBlock if
2472              * it's the 1st request.
2473              */
2474             error_report("ram_save_queue_pages no previous block");
2475             return -1;
2476         }
2477     } else {
2478         ramblock = qemu_ram_block_by_name(rbname);
2479 
2480         if (!ramblock) {
2481             /* We shouldn't be asked for a non-existent RAMBlock */
2482             error_report("ram_save_queue_pages no block '%s'", rbname);
2483             return -1;
2484         }
2485         rs->last_req_rb = ramblock;
2486     }
2487     trace_ram_save_queue_pages(ramblock->idstr, start, len);
2488     if (start+len > ramblock->used_length) {
2489         error_report("%s request overrun start=" RAM_ADDR_FMT " len="
2490                      RAM_ADDR_FMT " blocklen=" RAM_ADDR_FMT,
2491                      __func__, start, len, ramblock->used_length);
2492         return -1;
2493     }
2494 
2495     struct RAMSrcPageRequest *new_entry =
2496         g_malloc0(sizeof(struct RAMSrcPageRequest));
2497     new_entry->rb = ramblock;
2498     new_entry->offset = start;
2499     new_entry->len = len;
2500 
2501     memory_region_ref(ramblock->mr);
2502     qemu_mutex_lock(&rs->src_page_req_mutex);
2503     QSIMPLEQ_INSERT_TAIL(&rs->src_page_requests, new_entry, next_req);
2504     migration_make_urgent_request();
2505     qemu_mutex_unlock(&rs->src_page_req_mutex);
2506 
2507     return 0;
2508 }
2509 
2510 static bool save_page_use_compression(RAMState *rs)
2511 {
2512     if (!migrate_use_compression()) {
2513         return false;
2514     }
2515 
2516     /*
2517      * If xbzrle is on, stop using the data compression after first
2518      * round of migration even if compression is enabled. In theory,
2519      * xbzrle can do better than compression.
2520      */
2521     if (rs->ram_bulk_stage || !migrate_use_xbzrle()) {
2522         return true;
2523     }
2524 
2525     return false;
2526 }
2527 
2528 /*
2529  * try to compress the page before posting it out, return true if the page
2530  * has been properly handled by compression, otherwise needs other
2531  * paths to handle it
2532  */
2533 static bool save_compress_page(RAMState *rs, RAMBlock *block, ram_addr_t offset)
2534 {
2535     if (!save_page_use_compression(rs)) {
2536         return false;
2537     }
2538 
2539     /*
2540      * When starting the process of a new block, the first page of
2541      * the block should be sent out before other pages in the same
2542      * block, and all the pages in the last block should have been sent
2543      * out; keeping this order is important, because the 'cont' flag
2544      * is used to avoid resending the block name.
2545      *
2546      * We post the first page as a normal page because compression will take
2547      * much CPU resource.
2548      */
2549     if (block != rs->last_sent_block) {
2550         flush_compressed_data(rs);
2551         return false;
2552     }
2553 
2554     if (compress_page_with_multi_thread(rs, block, offset) > 0) {
2555         return true;
2556     }
2557 
2558     compression_counters.busy++;
2559     return false;
2560 }
2561 
2562 /**
2563  * ram_save_target_page: save one target page
2564  *
2565  * Returns the number of pages written
2566  *
2567  * @rs: current RAM state
2568  * @pss: data about the page we want to send
2569  * @last_stage: if we are at the completion stage
2570  */
2571 static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss,
2572                                 bool last_stage)
2573 {
2574     RAMBlock *block = pss->block;
2575     ram_addr_t offset = ((ram_addr_t)pss->page) << TARGET_PAGE_BITS;
2576     int res;
2577 
2578     if (control_save_page(rs, block, offset, &res)) {
2579         return res;
2580     }
2581 
2582     if (save_compress_page(rs, block, offset)) {
2583         return 1;
2584     }
2585 
2586     res = save_zero_page(rs, block, offset);
2587     if (res > 0) {
2588         /* Must let xbzrle know, otherwise a previous (now 0'd) cached
2589          * page would be stale
2590          */
2591         if (!save_page_use_compression(rs)) {
2592             XBZRLE_cache_lock();
2593             xbzrle_cache_zero_page(rs, block->offset + offset);
2594             XBZRLE_cache_unlock();
2595         }
2596         ram_release_pages(block->idstr, offset, res);
2597         return res;
2598     }
2599 
2600     /*
2601      * Do not use multifd for:
2602      * 1. Compression as the first page in the new block should be posted out
2603      *    before sending the compressed page
2604      * 2. In postcopy as one whole host page should be placed
2605      */
2606     if (!save_page_use_compression(rs) && migrate_use_multifd()
2607         && !migration_in_postcopy()) {
2608         return ram_save_multifd_page(rs, block, offset);
2609     }
2610 
2611     return ram_save_page(rs, pss, last_stage);
2612 }
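/*
 * Summary of the decision ladder above (informational only):
 *
 *     1. control_save_page()     - a control path (e.g. RDMA) takes the page
 *     2. save_compress_page()    - multi-threaded compression, same block only
 *     3. save_zero_page()        - all-zero page: header plus one byte
 *     4. ram_save_multifd_page() - multifd, when compression/postcopy are off
 *     5. ram_save_page()         - plain or XBZRLE-encoded page
 *
 * The first path that accepts the page wins.
 */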
2613 
2614 /**
2615  * ram_save_host_page: save a whole host page
2616  *
2617  * Starting at the page indicated by @pss, send pages up to the end of the
2618  * current host page. It's valid for the start to point into the middle of
2619  * a host page, in which case the remainder of the host page is sent.
2620  * Only dirty target pages are sent. Note that the host page size may
2621  * be a huge page for this block.
2622  * The saving stops at the boundary of the used_length of the block
2623  * if the RAMBlock isn't a multiple of the host page size.
2624  *
2625  * Returns the number of pages written or negative on error
2626  *
2627  * @rs: current RAM state
2629  * @pss: data about the page we want to send
2630  * @last_stage: if we are at the completion stage
2631  */
2632 static int ram_save_host_page(RAMState *rs, PageSearchStatus *pss,
2633                               bool last_stage)
2634 {
2635     int tmppages, pages = 0;
2636     size_t pagesize_bits =
2637         qemu_ram_pagesize(pss->block) >> TARGET_PAGE_BITS;
2638 
2639     if (ramblock_is_ignored(pss->block)) {
2640         error_report("block %s should not be migrated !", pss->block->idstr);
2641         return 0;
2642     }
2643 
2644     do {
2645         /* Check if the page is dirty and, if it is, send it */
2646         if (!migration_bitmap_clear_dirty(rs, pss->block, pss->page)) {
2647             pss->page++;
2648             continue;
2649         }
2650 
2651         tmppages = ram_save_target_page(rs, pss, last_stage);
2652         if (tmppages < 0) {
2653             return tmppages;
2654         }
2655 
2656         pages += tmppages;
2657         pss->page++;
2658         /* Allow rate limiting to happen in the middle of huge pages */
2659         migration_rate_limit();
2660     } while ((pss->page & (pagesize_bits - 1)) &&
2661              offset_in_ramblock(pss->block,
2662                                 ((ram_addr_t)pss->page) << TARGET_PAGE_BITS));
2663 
2664     /* The offset we leave with is the last one we looked at */
2665     pss->page--;
2666     return pages;
2667 }
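/*
 * Example of the loop bound above (illustrative): for a 2MiB hugepage block
 * with 4KiB target pages,
 *
 *     pagesize_bits = 2MiB >> 12 = 512
 *
 * so the loop keeps sending dirty target pages until pss->page reaches a
 * 512-page boundary (or runs past used_length), i.e. one whole host page per
 * call, with migration_rate_limit() applied between target pages.
 */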
2668 
2669 /**
2670  * ram_find_and_save_block: finds a dirty page and sends it to f
2671  *
2672  * Called within an RCU critical section.
2673  *
2674  * Returns the number of pages written where zero means no dirty pages,
2675  * or negative on error
2676  *
2677  * @rs: current RAM state
2678  * @last_stage: if we are at the completion stage
2679  *
2680  * On systems where host-page-size > target-page-size it will send all the
2681  * pages in a host page that are dirty.
2682  */
2683 
2684 static int ram_find_and_save_block(RAMState *rs, bool last_stage)
2685 {
2686     PageSearchStatus pss;
2687     int pages = 0;
2688     bool again, found;
2689 
2690     /* No dirty page as there is zero RAM */
2691     if (!ram_bytes_total()) {
2692         return pages;
2693     }
2694 
2695     pss.block = rs->last_seen_block;
2696     pss.page = rs->last_page;
2697     pss.complete_round = false;
2698 
2699     if (!pss.block) {
2700         pss.block = QLIST_FIRST_RCU(&ram_list.blocks);
2701     }
2702 
2703     do {
2704         again = true;
2705         found = get_queued_page(rs, &pss);
2706 
2707         if (!found) {
2708             /* priority queue empty, so just search for something dirty */
2709             found = find_dirty_block(rs, &pss, &again);
2710         }
2711 
2712         if (found) {
2713             pages = ram_save_host_page(rs, &pss, last_stage);
2714         }
2715     } while (!pages && again);
2716 
2717     rs->last_seen_block = pss.block;
2718     rs->last_page = pss.page;
2719 
2720     return pages;
2721 }
2722 
2723 void acct_update_position(QEMUFile *f, size_t size, bool zero)
2724 {
2725     uint64_t pages = size / TARGET_PAGE_SIZE;
2726 
2727     if (zero) {
2728         ram_counters.duplicate += pages;
2729     } else {
2730         ram_counters.normal += pages;
2731         ram_counters.transferred += size;
2732         qemu_update_position(f, size);
2733     }
2734 }
2735 
2736 static uint64_t ram_bytes_total_common(bool count_ignored)
2737 {
2738     RAMBlock *block;
2739     uint64_t total = 0;
2740 
2741     RCU_READ_LOCK_GUARD();
2742 
2743     if (count_ignored) {
2744         RAMBLOCK_FOREACH_MIGRATABLE(block) {
2745             total += block->used_length;
2746         }
2747     } else {
2748         RAMBLOCK_FOREACH_NOT_IGNORED(block) {
2749             total += block->used_length;
2750         }
2751     }
2752     return total;
2753 }
2754 
2755 uint64_t ram_bytes_total(void)
2756 {
2757     return ram_bytes_total_common(false);
2758 }
2759 
2760 static void xbzrle_load_setup(void)
2761 {
2762     XBZRLE.decoded_buf = g_malloc(TARGET_PAGE_SIZE);
2763 }
2764 
2765 static void xbzrle_load_cleanup(void)
2766 {
2767     g_free(XBZRLE.decoded_buf);
2768     XBZRLE.decoded_buf = NULL;
2769 }
2770 
2771 static void ram_state_cleanup(RAMState **rsp)
2772 {
2773     if (*rsp) {
2774         migration_page_queue_free(*rsp);
2775         qemu_mutex_destroy(&(*rsp)->bitmap_mutex);
2776         qemu_mutex_destroy(&(*rsp)->src_page_req_mutex);
2777         g_free(*rsp);
2778         *rsp = NULL;
2779     }
2780 }
2781 
2782 static void xbzrle_cleanup(void)
2783 {
2784     XBZRLE_cache_lock();
2785     if (XBZRLE.cache) {
2786         cache_fini(XBZRLE.cache);
2787         g_free(XBZRLE.encoded_buf);
2788         g_free(XBZRLE.current_buf);
2789         g_free(XBZRLE.zero_target_page);
2790         XBZRLE.cache = NULL;
2791         XBZRLE.encoded_buf = NULL;
2792         XBZRLE.current_buf = NULL;
2793         XBZRLE.zero_target_page = NULL;
2794     }
2795     XBZRLE_cache_unlock();
2796 }
2797 
2798 static void ram_save_cleanup(void *opaque)
2799 {
2800     RAMState **rsp = opaque;
2801     RAMBlock *block;
2802 
2803     /* the caller must hold the iothread lock or be in a BH, so there is
2804      * no writing race against the migration bitmap
2805      */
2806     memory_global_dirty_log_stop();
2807 
2808     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
2809         g_free(block->clear_bmap);
2810         block->clear_bmap = NULL;
2811         g_free(block->bmap);
2812         block->bmap = NULL;
2813     }
2814 
2815     xbzrle_cleanup();
2816     compress_threads_save_cleanup();
2817     ram_state_cleanup(rsp);
2818 }
2819 
2820 static void ram_state_reset(RAMState *rs)
2821 {
2822     rs->last_seen_block = NULL;
2823     rs->last_sent_block = NULL;
2824     rs->last_page = 0;
2825     rs->last_version = ram_list.version;
2826     rs->ram_bulk_stage = true;
2827     rs->fpo_enabled = false;
2828 }
2829 
2830 #define MAX_WAIT 50 /* ms, half buffered_file limit */
2831 
2832 /*
2833  * 'expected' is the value you expect the bitmap mostly to be full
2834  * of; it won't bother printing lines that are all this value.
2835  * 'todump' is the bitmap to dump and 'pages' is its length in pages.
2836  */
2837 void ram_debug_dump_bitmap(unsigned long *todump, bool expected,
2838                            unsigned long pages)
2839 {
2840     int64_t cur;
2841     int64_t linelen = 128;
2842     char linebuf[129];
2843 
2844     for (cur = 0; cur < pages; cur += linelen) {
2845         int64_t curb;
2846         bool found = false;
2847         /*
2848          * Last line; catch the case where the line length
2849          * is longer than remaining ram
2850          */
2851         if (cur + linelen > pages) {
2852             linelen = pages - cur;
2853         }
2854         for (curb = 0; curb < linelen; curb++) {
2855             bool thisbit = test_bit(cur + curb, todump);
2856             linebuf[curb] = thisbit ? '1' : '.';
2857             found = found || (thisbit != expected);
2858         }
2859         if (found) {
2860             linebuf[curb] = '\0';
2861             fprintf(stderr,  "0x%08" PRIx64 " : %s\n", cur, linebuf);
2862         }
2863     }
2864 }
2865 
2866 /* **** functions for postcopy ***** */
2867 
2868 void ram_postcopy_migrated_memory_release(MigrationState *ms)
2869 {
2870     struct RAMBlock *block;
2871 
2872     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
2873         unsigned long *bitmap = block->bmap;
2874         unsigned long range = block->used_length >> TARGET_PAGE_BITS;
2875         unsigned long run_start = find_next_zero_bit(bitmap, range, 0);
2876 
2877         while (run_start < range) {
2878             unsigned long run_end = find_next_bit(bitmap, range, run_start + 1);
2879             ram_discard_range(block->idstr,
2880                               ((ram_addr_t)run_start) << TARGET_PAGE_BITS,
2881                               ((ram_addr_t)(run_end - run_start))
2882                                 << TARGET_PAGE_BITS);
2883             run_start = find_next_zero_bit(bitmap, range, run_end + 1);
2884         }
2885     }
2886 }
2887 
2888 /**
2889  * postcopy_send_discard_bm_ram: discard a RAMBlock
2890  *
2891  * Returns zero on success
2892  *
2893  * Callback from postcopy_each_ram_send_discard for each RAMBlock
2894  *
2895  * @ms: current migration state
2896  * @block: RAMBlock to discard
2897  */
2898 static int postcopy_send_discard_bm_ram(MigrationState *ms, RAMBlock *block)
2899 {
2900     unsigned long end = block->used_length >> TARGET_PAGE_BITS;
2901     unsigned long current;
2902     unsigned long *bitmap = block->bmap;
2903 
2904     for (current = 0; current < end; ) {
2905         unsigned long one = find_next_bit(bitmap, end, current);
2906         unsigned long zero, discard_length;
2907 
2908         if (one >= end) {
2909             break;
2910         }
2911 
2912         zero = find_next_zero_bit(bitmap, end, one + 1);
2913 
2914         if (zero >= end) {
2915             discard_length = end - one;
2916         } else {
2917             discard_length = zero - one;
2918         }
2919         postcopy_discard_send_range(ms, one, discard_length);
2920         current = one + discard_length;
2921     }
2922 
2923     return 0;
2924 }
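/*
 * Illustrative run of the loop above, on a (tiny) dirty bitmap:
 *
 *     page index: 0 1 2 3 4 5 6 7
 *     dirty bit : 0 0 1 1 1 0 1 0
 *
 * This emits postcopy_discard_send_range(ms, 2, 3) followed by
 * postcopy_discard_send_range(ms, 6, 1) - one range per run of set bits.
 */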
2925 
2926 /**
2927  * postcopy_each_ram_send_discard: discard all RAMBlocks
2928  *
2929  * Returns 0 for success or negative for error
2930  *
2931  * Utility for the outgoing postcopy code.
2932  *   Calls postcopy_send_discard_bm_ram for each RAMBlock
2933  *   passing it bitmap indexes and name.
2934  * (qemu_ram_foreach_block ends up passing unscaled lengths
2935  *  which would mean postcopy code would have to deal with target page)
2936  *
2937  * @ms: current migration state
2938  */
2939 static int postcopy_each_ram_send_discard(MigrationState *ms)
2940 {
2941     struct RAMBlock *block;
2942     int ret;
2943 
2944     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
2945         postcopy_discard_send_init(ms, block->idstr);
2946 
2947         /*
2948          * Postcopy sends chunks of bitmap over the wire, but it
2949          * just needs indexes at this point, avoids it having
2950          * target page specific code.
2951          */
2952         ret = postcopy_send_discard_bm_ram(ms, block);
2953         postcopy_discard_send_finish(ms);
2954         if (ret) {
2955             return ret;
2956         }
2957     }
2958 
2959     return 0;
2960 }
2961 
2962 /**
2963  * postcopy_chunk_hostpages_pass: canonicalize bitmap in hostpages
2964  *
2965  * Helper for postcopy_chunk_hostpages; it's called twice to
2966  * canonicalize the two bitmaps, that are similar, but one is
2967  * inverted.
2968  *
2969  * Postcopy requires that all target pages in a hostpage are dirty or
2970  * clean, not a mix.  This function canonicalizes the bitmaps.
2971  *
2972  * @ms: current migration state
2973  * @block: block that contains the page we want to canonicalize
2974  */
2975 static void postcopy_chunk_hostpages_pass(MigrationState *ms, RAMBlock *block)
2976 {
2977     RAMState *rs = ram_state;
2978     unsigned long *bitmap = block->bmap;
2979     unsigned int host_ratio = block->page_size / TARGET_PAGE_SIZE;
2980     unsigned long pages = block->used_length >> TARGET_PAGE_BITS;
2981     unsigned long run_start;
2982 
2983     if (block->page_size == TARGET_PAGE_SIZE) {
2984         /* Easy case - TPS==HPS for a non-huge page RAMBlock */
2985         return;
2986     }
2987 
2988     /* Find a dirty page */
2989     run_start = find_next_bit(bitmap, pages, 0);
2990 
2991     while (run_start < pages) {
2992 
2993         /*
2994          * If the start of this run of pages is in the middle of a host
2995          * page, then we need to fixup this host page.
2996          */
2997         if (QEMU_IS_ALIGNED(run_start, host_ratio)) {
2998             /* Find the end of this run */
2999             run_start = find_next_zero_bit(bitmap, pages, run_start + 1);
3000             /*
3001              * If the end isn't at the start of a host page, then the
3002              * run doesn't finish at the end of a host page
3003              * and we need to discard.
3004              */
3005         }
3006 
3007         if (!QEMU_IS_ALIGNED(run_start, host_ratio)) {
3008             unsigned long page;
3009             unsigned long fixup_start_addr = QEMU_ALIGN_DOWN(run_start,
3010                                                              host_ratio);
3011             run_start = QEMU_ALIGN_UP(run_start, host_ratio);
3012 
3013             /* Clean up the bitmap */
3014             for (page = fixup_start_addr;
3015                  page < fixup_start_addr + host_ratio; page++) {
3016                 /*
3017                  * Remark them as dirty, updating the count for any pages
3018                  * that weren't previously dirty.
3019                  */
3020                 rs->migration_dirty_pages += !test_and_set_bit(page, bitmap);
3021             }
3022         }
3023 
3024         /* Find the next dirty page for the next iteration */
3025         run_start = find_next_bit(bitmap, pages, run_start);
3026     }
3027 }
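/*
 * Example of the canonicalization above (illustrative): with 2MiB host pages
 * and 4KiB target pages (host_ratio == 512), a dirty run starting at target
 * page 700 begins in the middle of the host page covering pages [512, 1024).
 * The fixup re-marks target pages 512..1023 as dirty, so postcopy never sees
 * a host page that is only partially dirty.
 */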
3028 
3029 /**
3030  * postcopy_chunk_hostpages: discard any partially sent host page
3031  *
3032  * Utility for the outgoing postcopy code.
3033  *
3034  * Discard any partially sent host-page size chunks, mark any partially
3035  * dirty host-page size chunks as all dirty.  In this case the host-page
3036  * is the host-page for the particular RAMBlock, i.e. it might be a huge page
3037  *
3038  * Returns zero on success
3039  *
3040  * @ms: current migration state
3041  * @block: block we want to work with
3042  */
3043 static int postcopy_chunk_hostpages(MigrationState *ms, RAMBlock *block)
3044 {
3045     postcopy_discard_send_init(ms, block->idstr);
3046 
3047     /*
3048      * Ensure that all partially dirty host pages are made fully dirty.
3049      */
3050     postcopy_chunk_hostpages_pass(ms, block);
3051 
3052     postcopy_discard_send_finish(ms);
3053     return 0;
3054 }
3055 
3056 /**
3057  * ram_postcopy_send_discard_bitmap: transmit the discard bitmap
3058  *
3059  * Returns zero on success
3060  *
3061  * Transmit the set of pages to be discarded after precopy to the target
3062  * these are pages that:
3063  *     a) Have been previously transmitted but are now dirty again
3064  *     b) Pages that have never been transmitted, this ensures that
3065  *        any pages on the destination that have been mapped by background
3066  *        tasks get discarded (transparent huge pages is the specific concern)
3067  * Hopefully this is pretty sparse
3068  *
3069  * @ms: current migration state
3070  */
3071 int ram_postcopy_send_discard_bitmap(MigrationState *ms)
3072 {
3073     RAMState *rs = ram_state;
3074     RAMBlock *block;
3075     int ret;
3076 
3077     RCU_READ_LOCK_GUARD();
3078 
3079     /* This should be our last sync, the src is now paused */
3080     migration_bitmap_sync(rs);
3081 
3082     /* Easiest way to make sure we don't resume in the middle of a host-page */
3083     rs->last_seen_block = NULL;
3084     rs->last_sent_block = NULL;
3085     rs->last_page = 0;
3086 
3087     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3088         /* Deal with TPS != HPS and huge pages */
3089         ret = postcopy_chunk_hostpages(ms, block);
3090         if (ret) {
3091             return ret;
3092         }
3093 
3094 #ifdef DEBUG_POSTCOPY
3095         ram_debug_dump_bitmap(block->bmap, true,
3096                               block->used_length >> TARGET_PAGE_BITS);
3097 #endif
3098     }
3099     trace_ram_postcopy_send_discard_bitmap();
3100 
3101     ret = postcopy_each_ram_send_discard(ms);
3102 
3103     return ret;
3104 }
3105 
3106 /**
3107  * ram_discard_range: discard dirtied pages at the beginning of postcopy
3108  *
3109  * Returns zero on success
3110  *
3111  * @rbname: name of the RAMBlock of the request. NULL means the
3112  *          same as the last one.
3113  * @start: starting offset (in bytes) within the RAMBlock
3114  * @length: length (in bytes) to discard
3115  */
3116 int ram_discard_range(const char *rbname, uint64_t start, size_t length)
3117 {
3118     trace_ram_discard_range(rbname, start, length);
3119 
3120     RCU_READ_LOCK_GUARD();
3121     RAMBlock *rb = qemu_ram_block_by_name(rbname);
3122 
3123     if (!rb) {
3124         error_report("ram_discard_range: Failed to find block '%s'", rbname);
3125         return -1;
3126     }
3127 
3128     /*
3129      * On source VM, we don't need to update the received bitmap since
3130      * we don't even have one.
3131      */
3132     if (rb->receivedmap) {
3133         bitmap_clear(rb->receivedmap, start >> qemu_target_page_bits(),
3134                      length >> qemu_target_page_bits());
3135     }
3136 
3137     return ram_block_discard_range(rb, start, length);
3138 }
3139 
3140 /*
3141  * For every allocation, we will try not to crash the VM if the
3142  * allocation fails.
3143  */
3144 static int xbzrle_init(void)
3145 {
3146     Error *local_err = NULL;
3147 
3148     if (!migrate_use_xbzrle()) {
3149         return 0;
3150     }
3151 
3152     XBZRLE_cache_lock();
3153 
3154     XBZRLE.zero_target_page = g_try_malloc0(TARGET_PAGE_SIZE);
3155     if (!XBZRLE.zero_target_page) {
3156         error_report("%s: Error allocating zero page", __func__);
3157         goto err_out;
3158     }
3159 
3160     XBZRLE.cache = cache_init(migrate_xbzrle_cache_size(),
3161                               TARGET_PAGE_SIZE, &local_err);
3162     if (!XBZRLE.cache) {
3163         error_report_err(local_err);
3164         goto free_zero_page;
3165     }
3166 
3167     XBZRLE.encoded_buf = g_try_malloc0(TARGET_PAGE_SIZE);
3168     if (!XBZRLE.encoded_buf) {
3169         error_report("%s: Error allocating encoded_buf", __func__);
3170         goto free_cache;
3171     }
3172 
3173     XBZRLE.current_buf = g_try_malloc(TARGET_PAGE_SIZE);
3174     if (!XBZRLE.current_buf) {
3175         error_report("%s: Error allocating current_buf", __func__);
3176         goto free_encoded_buf;
3177     }
3178 
3179     /* We are all good */
3180     XBZRLE_cache_unlock();
3181     return 0;
3182 
3183 free_encoded_buf:
3184     g_free(XBZRLE.encoded_buf);
3185     XBZRLE.encoded_buf = NULL;
3186 free_cache:
3187     cache_fini(XBZRLE.cache);
3188     XBZRLE.cache = NULL;
3189 free_zero_page:
3190     g_free(XBZRLE.zero_target_page);
3191     XBZRLE.zero_target_page = NULL;
3192 err_out:
3193     XBZRLE_cache_unlock();
3194     return -ENOMEM;
3195 }
3196 
3197 static int ram_state_init(RAMState **rsp)
3198 {
3199     *rsp = g_try_new0(RAMState, 1);
3200 
3201     if (!*rsp) {
3202         error_report("%s: Init ramstate fail", __func__);
3203         return -1;
3204     }
3205 
3206     qemu_mutex_init(&(*rsp)->bitmap_mutex);
3207     qemu_mutex_init(&(*rsp)->src_page_req_mutex);
3208     QSIMPLEQ_INIT(&(*rsp)->src_page_requests);
3209 
3210     /*
3211      * Count the total number of pages used by ram blocks not including any
3212      * gaps due to alignment or unplugs.
3213      * This must match the initial values of the dirty bitmap.
3214      */
3215     (*rsp)->migration_dirty_pages = ram_bytes_total() >> TARGET_PAGE_BITS;
3216     ram_state_reset(*rsp);
3217 
3218     return 0;
3219 }
3220 
3221 static void ram_list_init_bitmaps(void)
3222 {
3223     MigrationState *ms = migrate_get_current();
3224     RAMBlock *block;
3225     unsigned long pages;
3226     uint8_t shift;
3227 
3228     /* Skip setting bitmap if there is no RAM */
3229     if (ram_bytes_total()) {
3230         shift = ms->clear_bitmap_shift;
3231         if (shift > CLEAR_BITMAP_SHIFT_MAX) {
3232             error_report("clear_bitmap_shift (%u) too big, using "
3233                          "max value (%u)", shift, CLEAR_BITMAP_SHIFT_MAX);
3234             shift = CLEAR_BITMAP_SHIFT_MAX;
3235         } else if (shift < CLEAR_BITMAP_SHIFT_MIN) {
3236             error_report("clear_bitmap_shift (%u) too small, using "
3237                          "min value (%u)", shift, CLEAR_BITMAP_SHIFT_MIN);
3238             shift = CLEAR_BITMAP_SHIFT_MIN;
3239         }
3240 
3241         RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3242             pages = block->max_length >> TARGET_PAGE_BITS;
3243             /*
3244              * The initial dirty bitmap for migration must be set with all
3245              * ones to make sure we'll migrate every guest RAM page to
3246              * the destination.
3247              * Here we set RAMBlock.bmap all to 1 because when we restart a
3248              * new migration after a failed one, ram_list.
3249              * dirty_memory[DIRTY_MEMORY_MIGRATION] doesn't include the whole
3250              * guest memory.
3251              */
3252             block->bmap = bitmap_new(pages);
3253             bitmap_set(block->bmap, 0, pages);
3254             block->clear_bmap_shift = shift;
3255             block->clear_bmap = bitmap_new(clear_bmap_size(pages, shift));
3256         }
3257     }
3258 }
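/*
 * Sizing sketch for the bitmaps above (illustrative, assuming 4KiB target
 * pages and clear_bmap_shift == 18): for a 4GiB RAMBlock,
 *
 *     pages           = 4GiB >> 12                    = 1048576 bits
 *     clear_bmap bits = DIV_ROUND_UP(1048576, 1 << 18) = 4
 *
 * i.e. bmap tracks dirtiness per target page while clear_bmap tracks the
 * much coarser 1GiB "lazily cleared" chunks.
 */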
3259 
3260 static void ram_init_bitmaps(RAMState *rs)
3261 {
3262     /* For memory_global_dirty_log_start below.  */
3263     qemu_mutex_lock_iothread();
3264     qemu_mutex_lock_ramlist();
3265 
3266     WITH_RCU_READ_LOCK_GUARD() {
3267         ram_list_init_bitmaps();
3268         memory_global_dirty_log_start();
3269         migration_bitmap_sync_precopy(rs);
3270     }
3271     qemu_mutex_unlock_ramlist();
3272     qemu_mutex_unlock_iothread();
3273 }
3274 
3275 static int ram_init_all(RAMState **rsp)
3276 {
3277     if (ram_state_init(rsp)) {
3278         return -1;
3279     }
3280 
3281     if (xbzrle_init()) {
3282         ram_state_cleanup(rsp);
3283         return -1;
3284     }
3285 
3286     ram_init_bitmaps(*rsp);
3287 
3288     return 0;
3289 }
3290 
3291 static void ram_state_resume_prepare(RAMState *rs, QEMUFile *out)
3292 {
3293     RAMBlock *block;
3294     uint64_t pages = 0;
3295 
3296     /*
3297      * Postcopy is not using xbzrle/compression, so no need for that.
3298      * Also, since the source is already halted, we don't need to care
3299      * about dirty page logging either.
3300      */
3301 
3302     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3303         pages += bitmap_count_one(block->bmap,
3304                                   block->used_length >> TARGET_PAGE_BITS);
3305     }
3306 
3307     /* This may not be aligned with current bitmaps. Recalculate. */
3308     rs->migration_dirty_pages = pages;
3309 
3310     rs->last_seen_block = NULL;
3311     rs->last_sent_block = NULL;
3312     rs->last_page = 0;
3313     rs->last_version = ram_list.version;
3314     /*
3315      * Disable the bulk stage, otherwise we'll resend the whole RAM no
3316      * matter what we have sent.
3317      */
3318     rs->ram_bulk_stage = false;
3319 
3320     /* Update RAMState cache of output QEMUFile */
3321     rs->f = out;
3322 
3323     trace_ram_state_resume_prepare(pages);
3324 }
3325 
3326 /*
3327  * This function clears bits of the free pages reported by the caller from the
3328  * migration dirty bitmap. @addr is the host address corresponding to the
3329  * start of the contiguous guest free pages, and @len is the total size in
3330  * bytes of those pages.
3331  */
3332 void qemu_guest_free_page_hint(void *addr, size_t len)
3333 {
3334     RAMBlock *block;
3335     ram_addr_t offset;
3336     size_t used_len, start, npages;
3337     MigrationState *s = migrate_get_current();
3338 
3339     /* This function is currently expected to be used during live migration */
3340     if (!migration_is_setup_or_active(s->state)) {
3341         return;
3342     }
3343 
3344     for (; len > 0; len -= used_len, addr += used_len) {
3345         block = qemu_ram_block_from_host(addr, false, &offset);
3346         if (unlikely(!block || offset >= block->used_length)) {
3347             /*
3348              * The implementation might not support RAMBlock resize during
3349              * live migration, but it could happen in theory with future
3350              * updates. So we add a check here to capture that case.
3351              */
3352             error_report_once("%s unexpected error", __func__);
3353             return;
3354         }
3355 
3356         if (len <= block->used_length - offset) {
3357             used_len = len;
3358         } else {
3359             used_len = block->used_length - offset;
3360         }
3361 
3362         start = offset >> TARGET_PAGE_BITS;
3363         npages = used_len >> TARGET_PAGE_BITS;
3364 
3365         qemu_mutex_lock(&ram_state->bitmap_mutex);
3366         ram_state->migration_dirty_pages -=
3367                       bitmap_count_one_with_offset(block->bmap, start, npages);
3368         bitmap_clear(block->bmap, start, npages);
3369         qemu_mutex_unlock(&ram_state->bitmap_mutex);
3370     }
3371 }
3372 
3373 /*
3374  * Each of ram_save_setup, ram_save_iterate and ram_save_complete has a
3375  * long-running RCU critical section.  When RCU reclaims in the code
3376  * start to become numerous, it will be necessary to reduce the
3377  * granularity of these critical sections.
3378  */
3379 
3380 /**
3381  * ram_save_setup: Setup RAM for migration
3382  *
3383  * Returns zero to indicate success and negative for error
3384  *
3385  * @f: QEMUFile where to send the data
3386  * @opaque: RAMState pointer
3387  */
3388 static int ram_save_setup(QEMUFile *f, void *opaque)
3389 {
3390     RAMState **rsp = opaque;
3391     RAMBlock *block;
3392 
3393     if (compress_threads_save_setup()) {
3394         return -1;
3395     }
3396 
3397     /* migration has already set up the bitmap, reuse it. */
3398     if (!migration_in_colo_state()) {
3399         if (ram_init_all(rsp) != 0) {
3400             compress_threads_save_cleanup();
3401             return -1;
3402         }
3403     }
3404     (*rsp)->f = f;
3405 
3406     WITH_RCU_READ_LOCK_GUARD() {
3407         qemu_put_be64(f, ram_bytes_total_common(true) | RAM_SAVE_FLAG_MEM_SIZE);
3408 
3409         RAMBLOCK_FOREACH_MIGRATABLE(block) {
3410             qemu_put_byte(f, strlen(block->idstr));
3411             qemu_put_buffer(f, (uint8_t *)block->idstr, strlen(block->idstr));
3412             qemu_put_be64(f, block->used_length);
3413             if (migrate_postcopy_ram() && block->page_size !=
3414                                           qemu_host_page_size) {
3415                 qemu_put_be64(f, block->page_size);
3416             }
3417             if (migrate_ignore_shared()) {
3418                 qemu_put_be64(f, block->mr->addr);
3419             }
3420         }
3421     }
3422 
3423     ram_control_before_iterate(f, RAM_CONTROL_SETUP);
3424     ram_control_after_iterate(f, RAM_CONTROL_SETUP);
3425 
3426     multifd_send_sync_main(*rsp);
3427     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
3428     qemu_fflush(f);
3429 
3430     return 0;
3431 }
3432 
3433 /**
3434  * ram_save_iterate: iterative stage for migration
3435  *
3436  * Returns zero to indicate success and negative for error
3437  *
3438  * @f: QEMUFile where to send the data
3439  * @opaque: RAMState pointer
3440  */
3441 static int ram_save_iterate(QEMUFile *f, void *opaque)
3442 {
3443     RAMState **temp = opaque;
3444     RAMState *rs = *temp;
3445     int ret;
3446     int i;
3447     int64_t t0;
3448     int done = 0;
3449 
3450     if (blk_mig_bulk_active()) {
3451         /* Avoid transferring ram during bulk phase of block migration as
3452          * the bulk phase will usually take a long time and transferring
3453          * ram updates during that time is pointless. */
3454         goto out;
3455     }
3456 
3457     WITH_RCU_READ_LOCK_GUARD() {
3458         if (ram_list.version != rs->last_version) {
3459             ram_state_reset(rs);
3460         }
3461 
3462         /* Read version before ram_list.blocks */
3463         smp_rmb();
3464 
3465         ram_control_before_iterate(f, RAM_CONTROL_ROUND);
3466 
3467         t0 = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
3468         i = 0;
3469         while ((ret = qemu_file_rate_limit(f)) == 0 ||
3470                 !QSIMPLEQ_EMPTY(&rs->src_page_requests)) {
3471             int pages;
3472 
3473             if (qemu_file_get_error(f)) {
3474                 break;
3475             }
3476 
3477             pages = ram_find_and_save_block(rs, false);
3478             /* no more pages to send */
3479             if (pages == 0) {
3480                 done = 1;
3481                 break;
3482             }
3483 
3484             if (pages < 0) {
3485                 qemu_file_set_error(f, pages);
3486                 break;
3487             }
3488 
3489             rs->target_page_count += pages;
3490 
3491             /*
3492              * During postcopy, it is necessary to make sure one whole host
3493              * page is sent in one chunk.
3494              */
3495             if (migrate_postcopy_ram()) {
3496                 flush_compressed_data(rs);
3497             }
3498 
3499             /*
3500              * we want to check in the 1st loop, just in case it was the 1st
3501              * time and we had to sync the dirty bitmap.
3502              * qemu_clock_get_ns() is a bit expensive, so we only check every
3503              * few iterations.
3504              */
3505             if ((i & 63) == 0) {
3506                 uint64_t t1 = (qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - t0) /
3507                               1000000;
3508                 if (t1 > MAX_WAIT) {
3509                     trace_ram_save_iterate_big_wait(t1, i);
3510                     break;
3511                 }
3512             }
3513             i++;
3514         }
3515     }
3516 
3517     /*
3518      * Must occur before EOS (or any QEMUFile operation)
3519      * because of RDMA protocol.
3520      */
3521     ram_control_after_iterate(f, RAM_CONTROL_ROUND);
3522 
3523 out:
3524     multifd_send_sync_main(rs);
3525     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
3526     qemu_fflush(f);
3527     ram_counters.transferred += 8;
3528 
3529     ret = qemu_file_get_error(f);
3530     if (ret < 0) {
3531         return ret;
3532     }
3533 
3534     return done;
3535 }
3536 
3537 /**
3538  * ram_save_complete: function called to send the remaining amount of ram
3539  *
3540  * Returns zero to indicate success or negative on error
3541  *
3542  * Called with iothread lock
3543  *
3544  * @f: QEMUFile where to send the data
3545  * @opaque: RAMState pointer
3546  */
3547 static int ram_save_complete(QEMUFile *f, void *opaque)
3548 {
3549     RAMState **temp = opaque;
3550     RAMState *rs = *temp;
3551     int ret = 0;
3552 
3553     WITH_RCU_READ_LOCK_GUARD() {
3554         if (!migration_in_postcopy()) {
3555             migration_bitmap_sync_precopy(rs);
3556         }
3557 
3558         ram_control_before_iterate(f, RAM_CONTROL_FINISH);
3559 
3560         /* try transferring iterative blocks of memory */
3561 
3562         /* flush all remaining blocks regardless of rate limiting */
3563         while (true) {
3564             int pages;
3565 
3566             pages = ram_find_and_save_block(rs, !migration_in_colo_state());
3567             /* no more blocks to send */
3568             if (pages == 0) {
3569                 break;
3570             }
3571             if (pages < 0) {
3572                 ret = pages;
3573                 break;
3574             }
3575         }
3576 
3577         flush_compressed_data(rs);
3578         ram_control_after_iterate(f, RAM_CONTROL_FINISH);
3579     }
3580 
3581     multifd_send_sync_main(rs);
3582     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
3583     qemu_fflush(f);
3584 
3585     return ret;
3586 }
3587 
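     /*
      * Report how much data is still pending.  If the estimate drops below
      * @max_size, re-sync the dirty bitmap first to refine it.  The result
      * is accounted as postcopiable when postcopy RAM is enabled, otherwise
      * as precopy-only.
      */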
3588 static void ram_save_pending(QEMUFile *f, void *opaque, uint64_t max_size,
3589                              uint64_t *res_precopy_only,
3590                              uint64_t *res_compatible,
3591                              uint64_t *res_postcopy_only)
3592 {
3593     RAMState **temp = opaque;
3594     RAMState *rs = *temp;
3595     uint64_t remaining_size;
3596 
3597     remaining_size = rs->migration_dirty_pages * TARGET_PAGE_SIZE;
3598 
3599     if (!migration_in_postcopy() &&
3600         remaining_size < max_size) {
3601         qemu_mutex_lock_iothread();
3602         WITH_RCU_READ_LOCK_GUARD() {
3603             migration_bitmap_sync_precopy(rs);
3604         }
3605         qemu_mutex_unlock_iothread();
3606         remaining_size = rs->migration_dirty_pages * TARGET_PAGE_SIZE;
3607     }
3608 
3609     if (migrate_postcopy_ram()) {
3610         /* We can do postcopy, and all the data is postcopiable */
3611         *res_compatible += remaining_size;
3612     } else {
3613         *res_precopy_only += remaining_size;
3614     }
3615 }
3616 
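     /*
      * Read one XBZRLE-encoded page from the stream and decode it into
      * @host.  Returns 0 on success, -1 on a bad header, an oversized
      * length or a decode failure.
      */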
3617 static int load_xbzrle(QEMUFile *f, ram_addr_t addr, void *host)
3618 {
3619     unsigned int xh_len;
3620     int xh_flags;
3621     uint8_t *loaded_data;
3622 
3623     /* extract RLE header */
3624     xh_flags = qemu_get_byte(f);
3625     xh_len = qemu_get_be16(f);
3626 
3627     if (xh_flags != ENCODING_FLAG_XBZRLE) {
3628         error_report("Failed to load XBZRLE page - wrong compression!");
3629         return -1;
3630     }
3631 
3632     if (xh_len > TARGET_PAGE_SIZE) {
3633         error_report("Failed to load XBZRLE page - len overflow!");
3634         return -1;
3635     }
3636     loaded_data = XBZRLE.decoded_buf;
3637     /* load data and decode */
3638     /* it can change loaded_data to point to an internal buffer */
3639     qemu_get_buffer_in_place(f, &loaded_data, xh_len);
3640 
3641     /* decode RLE */
3642     if (xbzrle_decode_buffer(loaded_data, xh_len, host,
3643                              TARGET_PAGE_SIZE) == -1) {
3644         error_report("Failed to load XBZRLE page - decode error!");
3645         return -1;
3646     }
3647 
3648     return 0;
3649 }
3650 
3651 /**
3652  * ram_block_from_stream: read a RAMBlock id from the migration stream
3653  *
3654  * Must be called from within a rcu critical section.
3655  *
3656  * Returns a pointer from within the RCU-protected ram_list.
3657  *
3658  * @f: QEMUFile where to read the data from
3659  * @flags: Page flags (mostly to see if it's a continuation of previous block)
3660  */
3661 static inline RAMBlock *ram_block_from_stream(QEMUFile *f, int flags)
3662 {
3663     static RAMBlock *block = NULL;
3664     char id[256];
3665     uint8_t len;
3666 
3667     if (flags & RAM_SAVE_FLAG_CONTINUE) {
3668         if (!block) {
3669             error_report("Ack, bad migration stream!");
3670             return NULL;
3671         }
3672         return block;
3673     }
3674 
3675     len = qemu_get_byte(f);
3676     qemu_get_buffer(f, (uint8_t *)id, len);
3677     id[len] = 0;
3678 
3679     block = qemu_ram_block_by_name(id);
3680     if (!block) {
3681         error_report("Can't find block %s", id);
3682         return NULL;
3683     }
3684 
3685     if (ramblock_is_ignored(block)) {
3686         error_report("block %s should not be migrated !", id);
3687         return NULL;
3688     }
3689 
3690     return block;
3691 }
3692 
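     /*
      * Translate a (block, offset) pair from the stream into a host pointer,
      * or NULL if the offset is outside the block's used length.
      */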
3693 static inline void *host_from_ram_block_offset(RAMBlock *block,
3694                                                ram_addr_t offset)
3695 {
3696     if (!offset_in_ramblock(block, offset)) {
3697         return NULL;
3698     }
3699 
3700     return block->host + offset;
3701 }
3702 
3703 static inline void *colo_cache_from_block_offset(RAMBlock *block,
3704                                                  ram_addr_t offset)
3705 {
3706     if (!offset_in_ramblock(block, offset)) {
3707         return NULL;
3708     }
3709     if (!block->colo_cache) {
3710         error_report("%s: colo_cache is NULL in block :%s",
3711                      __func__, block->idstr);
3712         return NULL;
3713     }
3714 
3715     /*
3716     * During a COLO checkpoint, we need a bitmap of these migrated pages.
3717     * It helps us decide which pages in the RAM cache should be flushed
3718     * into the VM's RAM later.
3719     */
3720     if (!test_and_set_bit(offset >> TARGET_PAGE_BITS, block->bmap)) {
3721         ram_state->migration_dirty_pages++;
3722     }
3723     return block->colo_cache + offset;
3724 }
3725 
3726 /**
3727  * ram_handle_compressed: handle the zero page case
3728  *
3729  * If a page (or a whole RDMA chunk) has been
3730  * determined to be zero, then zap it.
3731  *
3732  * @host: host address for the zero page
3733  * @ch: the byte the page is filled with.  We only support zero
3734  * @size: size of the zero page
3735  */
3736 void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
3737 {
3738     if (ch != 0 || !is_zero_range(host, size)) {
3739         memset(host, ch, size);
3740     }
3741 }
3742 
3743 /* return the size after decompression, or negative value on error */
3744 static int
3745 qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
3746                      const uint8_t *source, size_t source_len)
3747 {
3748     int err;
3749 
3750     err = inflateReset(stream);
3751     if (err != Z_OK) {
3752         return -1;
3753     }
3754 
3755     stream->avail_in = source_len;
3756     stream->next_in = (uint8_t *)source;
3757     stream->avail_out = dest_len;
3758     stream->next_out = dest;
3759 
3760     err = inflate(stream, Z_NO_FLUSH);
3761     if (err != Z_STREAM_END) {
3762         return -1;
3763     }
3764 
3765     return stream->total_out;
3766 }
3767 
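     /*
      * Decompression worker thread: wait for a page to be handed over via
      * its DecompressParam, inflate it into the destination buffer, then
      * mark the slot done and signal decomp_done_cond.
      */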
3768 static void *do_data_decompress(void *opaque)
3769 {
3770     DecompressParam *param = opaque;
3771     unsigned long pagesize;
3772     uint8_t *des;
3773     int len, ret;
3774 
3775     qemu_mutex_lock(&param->mutex);
3776     while (!param->quit) {
3777         if (param->des) {
3778             des = param->des;
3779             len = param->len;
3780             param->des = 0;
3781             qemu_mutex_unlock(&param->mutex);
3782 
3783             pagesize = TARGET_PAGE_SIZE;
3784 
3785             ret = qemu_uncompress_data(&param->stream, des, pagesize,
3786                                        param->compbuf, len);
3787             if (ret < 0 && migrate_get_current()->decompress_error_check) {
3788                 error_report("decompress data failed");
3789                 qemu_file_set_error(decomp_file, ret);
3790             }
3791 
3792             qemu_mutex_lock(&decomp_done_lock);
3793             param->done = true;
3794             qemu_cond_signal(&decomp_done_cond);
3795             qemu_mutex_unlock(&decomp_done_lock);
3796 
3797             qemu_mutex_lock(&param->mutex);
3798         } else {
3799             qemu_cond_wait(&param->cond, &param->mutex);
3800         }
3801     }
3802     qemu_mutex_unlock(&param->mutex);
3803 
3804     return NULL;
3805 }
3806 
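     /*
      * Block until every decompression thread has finished its current page,
      * then return any error recorded on the decompression QEMUFile.
      */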
3807 static int wait_for_decompress_done(void)
3808 {
3809     int idx, thread_count;
3810 
3811     if (!migrate_use_compression()) {
3812         return 0;
3813     }
3814 
3815     thread_count = migrate_decompress_threads();
3816     qemu_mutex_lock(&decomp_done_lock);
3817     for (idx = 0; idx < thread_count; idx++) {
3818         while (!decomp_param[idx].done) {
3819             qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
3820         }
3821     }
3822     qemu_mutex_unlock(&decomp_done_lock);
3823     return qemu_file_get_error(decomp_file);
3824 }
3825 
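     /*
      * Tear down the decompression threads: ask every properly initialized
      * thread to quit, join it and free its per-thread state.
      */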
3826 static void compress_threads_load_cleanup(void)
3827 {
3828     int i, thread_count;
3829 
3830     if (!migrate_use_compression()) {
3831         return;
3832     }
3833     thread_count = migrate_decompress_threads();
3834     for (i = 0; i < thread_count; i++) {
3835         /*
3836          * we use it as an indicator of whether the thread was
3837          * properly initialized or not
3838          */
3839         if (!decomp_param[i].compbuf) {
3840             break;
3841         }
3842 
3843         qemu_mutex_lock(&decomp_param[i].mutex);
3844         decomp_param[i].quit = true;
3845         qemu_cond_signal(&decomp_param[i].cond);
3846         qemu_mutex_unlock(&decomp_param[i].mutex);
3847     }
3848     for (i = 0; i < thread_count; i++) {
3849         if (!decomp_param[i].compbuf) {
3850             break;
3851         }
3852 
3853         qemu_thread_join(decompress_threads + i);
3854         qemu_mutex_destroy(&decomp_param[i].mutex);
3855         qemu_cond_destroy(&decomp_param[i].cond);
3856         inflateEnd(&decomp_param[i].stream);
3857         g_free(decomp_param[i].compbuf);
3858         decomp_param[i].compbuf = NULL;
3859     }
3860     g_free(decompress_threads);
3861     g_free(decomp_param);
3862     decompress_threads = NULL;
3863     decomp_param = NULL;
3864     decomp_file = NULL;
3865 }
3866 
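     /*
      * Set up the decompression threads for the incoming side; a no-op when
      * compression is not in use.  Returns 0 on success and -1 on failure,
      * after cleaning up whatever was already created.
      */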
3867 static int compress_threads_load_setup(QEMUFile *f)
3868 {
3869     int i, thread_count;
3870 
3871     if (!migrate_use_compression()) {
3872         return 0;
3873     }
3874 
3875     thread_count = migrate_decompress_threads();
3876     decompress_threads = g_new0(QemuThread, thread_count);
3877     decomp_param = g_new0(DecompressParam, thread_count);
3878     qemu_mutex_init(&decomp_done_lock);
3879     qemu_cond_init(&decomp_done_cond);
3880     decomp_file = f;
3881     for (i = 0; i < thread_count; i++) {
3882         if (inflateInit(&decomp_param[i].stream) != Z_OK) {
3883             goto exit;
3884         }
3885 
3886         decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
3887         qemu_mutex_init(&decomp_param[i].mutex);
3888         qemu_cond_init(&decomp_param[i].cond);
3889         decomp_param[i].done = true;
3890         decomp_param[i].quit = false;
3891         qemu_thread_create(decompress_threads + i, "decompress",
3892                            do_data_decompress, decomp_param + i,
3893                            QEMU_THREAD_JOINABLE);
3894     }
3895     return 0;
3896 exit:
3897     compress_threads_load_cleanup();
3898     return -1;
3899 }
3900 
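     /*
      * Hand one compressed page to an idle decompression thread.  If all
      * threads are busy, wait on decomp_done_cond until one becomes free.
      */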
3901 static void decompress_data_with_multi_threads(QEMUFile *f,
3902                                                void *host, int len)
3903 {
3904     int idx, thread_count;
3905 
3906     thread_count = migrate_decompress_threads();
3907     qemu_mutex_lock(&decomp_done_lock);
3908     while (true) {
3909         for (idx = 0; idx < thread_count; idx++) {
3910             if (decomp_param[idx].done) {
3911                 decomp_param[idx].done = false;
3912                 qemu_mutex_lock(&decomp_param[idx].mutex);
3913                 qemu_get_buffer(f, decomp_param[idx].compbuf, len);
3914                 decomp_param[idx].des = host;
3915                 decomp_param[idx].len = len;
3916                 qemu_cond_signal(&decomp_param[idx].cond);
3917                 qemu_mutex_unlock(&decomp_param[idx].mutex);
3918                 break;
3919             }
3920         }
3921         if (idx < thread_count) {
3922             break;
3923         } else {
3924             qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
3925         }
3926     }
3927     qemu_mutex_unlock(&decomp_done_lock);
3928 }
3929 
3930 /*
3931  * colo cache: this is for the secondary VM, where we cache the whole
3932  * memory of the secondary VM.  The global lock must be held when
3933  * calling this helper.
3934  */
3935 int colo_init_ram_cache(void)
3936 {
3937     RAMBlock *block;
3938 
3939     WITH_RCU_READ_LOCK_GUARD() {
3940         RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3941             block->colo_cache = qemu_anon_ram_alloc(block->used_length,
3942                                                     NULL,
3943                                                     false);
3944             if (!block->colo_cache) {
3945                 error_report("%s: Can't alloc memory for COLO cache of block %s, "
3946                              "size 0x" RAM_ADDR_FMT, __func__, block->idstr,
3947                              block->used_length);
3948                 RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3949                     if (block->colo_cache) {
3950                         qemu_anon_ram_free(block->colo_cache, block->used_length);
3951                         block->colo_cache = NULL;
3952                     }
3953                 }
3954                 return -errno;
3955             }
3956             memcpy(block->colo_cache, block->host, block->used_length);
3957         }
3958     }
3959 
3960     /*
3961     * Record the dirty pages sent by the PVM; we use this dirty bitmap to
3962     * decide which pages in the cache should be flushed into the SVM's RAM.
3963     * Here we use the same name 'ram_bitmap' as for migration.
3964     */
3965     if (ram_bytes_total()) {
3966         RAMBlock *block;
3967 
3968         RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3969             unsigned long pages = block->max_length >> TARGET_PAGE_BITS;
3970 
3971             block->bmap = bitmap_new(pages);
3972             bitmap_set(block->bmap, 0, pages);
3973         }
3974     }
3975     ram_state = g_new0(RAMState, 1);
3976     ram_state->migration_dirty_pages = 0;
3977     qemu_mutex_init(&ram_state->bitmap_mutex);
3978     memory_global_dirty_log_start();
3979 
3980     return 0;
3981 }
3982 
3983 /* The global lock must be held to call this helper */
3984 void colo_release_ram_cache(void)
3985 {
3986     RAMBlock *block;
3987 
3988     memory_global_dirty_log_stop();
3989     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3990         g_free(block->bmap);
3991         block->bmap = NULL;
3992     }
3993 
3994     WITH_RCU_READ_LOCK_GUARD() {
3995         RAMBLOCK_FOREACH_NOT_IGNORED(block) {
3996             if (block->colo_cache) {
3997                 qemu_anon_ram_free(block->colo_cache, block->used_length);
3998                 block->colo_cache = NULL;
3999             }
4000         }
4001     }
4002     qemu_mutex_destroy(&ram_state->bitmap_mutex);
4003     g_free(ram_state);
4004     ram_state = NULL;
4005 }
4006 
4007 /**
4008  * ram_load_setup: Setup RAM for migration incoming side
4009  *
4010  * Returns zero to indicate success and negative for error
4011  *
4012  * @f: QEMUFile where to receive the data
4013  * @opaque: RAMState pointer
4014  */
4015 static int ram_load_setup(QEMUFile *f, void *opaque)
4016 {
4017     if (compress_threads_load_setup(f)) {
4018         return -1;
4019     }
4020 
4021     xbzrle_load_setup();
4022     ramblock_recv_map_init();
4023 
4024     return 0;
4025 }
4026 
4027 static int ram_load_cleanup(void *opaque)
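     /*
      * Incoming-side cleanup: write back each RAM block to its backing
      * storage where applicable, tear down the XBZRLE and decompression
      * state, and free the per-block receivedmap.
      */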
4028 {
4029     RAMBlock *rb;
4030 
4031     RAMBLOCK_FOREACH_NOT_IGNORED(rb) {
4032         qemu_ram_block_writeback(rb);
4033     }
4034 
4035     xbzrle_load_cleanup();
4036     compress_threads_load_cleanup();
4037 
4038     RAMBLOCK_FOREACH_NOT_IGNORED(rb) {
4039         g_free(rb->receivedmap);
4040         rb->receivedmap = NULL;
4041     }
4042 
4043     return 0;
4044 }
4045 
4046 /**
4047  * ram_postcopy_incoming_init: allocate postcopy data structures
4048  *
4049  * Returns 0 for success and negative if there was one error
4050  *
4051  * @mis: current migration incoming state
4052  *
4053  * Allocate data structures etc needed by incoming migration with
4054  * postcopy-ram. postcopy-ram's similarly named
4055  * postcopy_ram_incoming_init does the work.
4056  */
4057 int ram_postcopy_incoming_init(MigrationIncomingState *mis)
4058 {
4059     return postcopy_ram_incoming_init(mis);
4060 }
4061 
4062 /**
4063  * ram_load_postcopy: load a page in postcopy case
4064  *
4065  * Returns 0 for success or -errno in case of error
4066  *
4067  * Called in postcopy mode by ram_load().
4068  * rcu_read_lock is taken prior to this being called.
4069  *
4070  * @f: QEMUFile where to send the data
4071  */
4072 static int ram_load_postcopy(QEMUFile *f)
4073 {
4074     int flags = 0, ret = 0;
4075     bool place_needed = false;
4076     bool matches_target_page_size = false;
4077     MigrationIncomingState *mis = migration_incoming_get_current();
4078     /* Temporary page that is later 'placed' */
4079     void *postcopy_host_page = mis->postcopy_tmp_page;
4080     void *this_host = NULL;
4081     bool all_zero = false;
4082     int target_pages = 0;
4083 
4084     while (!ret && !(flags & RAM_SAVE_FLAG_EOS)) {
4085         ram_addr_t addr;
4086         void *host = NULL;
4087         void *page_buffer = NULL;
4088         void *place_source = NULL;
4089         RAMBlock *block = NULL;
4090         uint8_t ch;
4091         int len;
4092 
4093         addr = qemu_get_be64(f);
4094 
4095         /*
4096          * If there is a QEMUFile error, we should stop here; "addr"
4097          * may be invalid.
4098          */
4099         ret = qemu_file_get_error(f);
4100         if (ret) {
4101             break;
4102         }
4103 
4104         flags = addr & ~TARGET_PAGE_MASK;
4105         addr &= TARGET_PAGE_MASK;
4106 
4107         trace_ram_load_postcopy_loop((uint64_t)addr, flags);
4108         place_needed = false;
4109         if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE |
4110                      RAM_SAVE_FLAG_COMPRESS_PAGE)) {
4111             block = ram_block_from_stream(f, flags);
4112 
4113             host = host_from_ram_block_offset(block, addr);
4114             if (!host) {
4115                 error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
4116                 ret = -EINVAL;
4117                 break;
4118             }
4119             target_pages++;
4120             matches_target_page_size = block->page_size == TARGET_PAGE_SIZE;
4121             /*
4122              * Postcopy requires that we place whole host pages atomically;
4123              * these may be huge pages for RAMBlocks that are backed by
4124              * hugetlbfs.
4125              * To make it atomic, the data is read into a temporary page
4126              * that's moved into place later.
4127              * The migration protocol uses (possibly smaller) target pages;
4128              * however, the source ensures it always sends all the components
4129              * of a host page in one chunk.
4130              */
4131             page_buffer = postcopy_host_page +
4132                           ((uintptr_t)host & (block->page_size - 1));
4133             /* If all TP are zero then we can optimise the place */
4134             if (target_pages == 1) {
4135                 all_zero = true;
4136                 this_host = (void *)QEMU_ALIGN_DOWN((uintptr_t)host,
4137                                                     block->page_size);
4138             } else {
4139                 /* not the 1st TP within the HP */
4140                 if (QEMU_ALIGN_DOWN((uintptr_t)host, block->page_size) !=
4141                     (uintptr_t)this_host) {
4142                     error_report("Non-same host page %p/%p",
4143                                   host, this_host);
4144                     ret = -EINVAL;
4145                     break;
4146                 }
4147             }
4148 
4149             /*
4150              * If it's the last part of a host page then we place the host
4151              * page
4152              */
4153             if (target_pages == (block->page_size / TARGET_PAGE_SIZE)) {
4154                 place_needed = true;
4155                 target_pages = 0;
4156             }
4157             place_source = postcopy_host_page;
4158         }
4159 
4160         switch (flags & ~RAM_SAVE_FLAG_CONTINUE) {
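             /* Handle the page according to the type encoded in the flags. */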
4161         case RAM_SAVE_FLAG_ZERO:
4162             ch = qemu_get_byte(f);
4163             /*
4164              * We can skip setting page_buffer when
4165              * this is a zero page and (block->page_size == TARGET_PAGE_SIZE).
4166              */
4167             if (ch || !matches_target_page_size) {
4168                 memset(page_buffer, ch, TARGET_PAGE_SIZE);
4169             }
4170             if (ch) {
4171                 all_zero = false;
4172             }
4173             break;
4174 
4175         case RAM_SAVE_FLAG_PAGE:
4176             all_zero = false;
4177             if (!matches_target_page_size) {
4178                 /* For huge pages, we always use temporary buffer */
4179                 qemu_get_buffer(f, page_buffer, TARGET_PAGE_SIZE);
4180             } else {
4181                 /*
4182                  * For small pages that match the target page size, we
4183                  * avoid the qemu_file copy.  Instead we directly use
4184                  * the buffer of QEMUFile to place the page.  Note: we
4185                  * cannot do any QEMUFile operation before using that
4186                  * buffer to make sure the buffer is valid when
4187                  * placing the page.
4188                  */
4189                 qemu_get_buffer_in_place(f, (uint8_t **)&place_source,
4190                                          TARGET_PAGE_SIZE);
4191             }
4192             break;
4193         case RAM_SAVE_FLAG_COMPRESS_PAGE:
4194             all_zero = false;
4195             len = qemu_get_be32(f);
4196             if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) {
4197                 error_report("Invalid compressed data length: %d", len);
4198                 ret = -EINVAL;
4199                 break;
4200             }
4201             decompress_data_with_multi_threads(f, page_buffer, len);
4202             break;
4203 
4204         case RAM_SAVE_FLAG_EOS:
4205             /* normal exit */
4206             multifd_recv_sync_main();
4207             break;
4208         default:
4209             error_report("Unknown combination of migration flags: %#x"
4210                          " (postcopy mode)", flags);
4211             ret = -EINVAL;
4212             break;
4213         }
4214 
4215         /* Got the whole host page, wait for decompress before placing. */
4216         if (place_needed) {
4217             ret |= wait_for_decompress_done();
4218         }
4219 
4220         /* Detect any possible file errors */
4221         if (!ret && qemu_file_get_error(f)) {
4222             ret = qemu_file_get_error(f);
4223         }
4224 
4225         if (!ret && place_needed) {
4226             /* This gets called at the last target page in the host page */
4227             void *place_dest = (void *)QEMU_ALIGN_DOWN((uintptr_t)host,
4228                                                        block->page_size);
4229 
4230             if (all_zero) {
4231                 ret = postcopy_place_page_zero(mis, place_dest,
4232                                                block);
4233             } else {
4234                 ret = postcopy_place_page(mis, place_dest,
4235                                           place_source, block);
4236             }
4237         }
4238     }
4239 
4240     return ret;
4241 }
4242 
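     /* True from the postcopy ADVISE until postcopy has ended. */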
4243 static bool postcopy_is_advised(void)
4244 {
4245     PostcopyState ps = postcopy_state_get();
4246     return ps >= POSTCOPY_INCOMING_ADVISE && ps < POSTCOPY_INCOMING_END;
4247 }
4248 
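     /* True while the incoming side is in the postcopy listen/run phase. */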
4249 static bool postcopy_is_running(void)
4250 {
4251     PostcopyState ps = postcopy_state_get();
4252     return ps >= POSTCOPY_INCOMING_LISTENING && ps < POSTCOPY_INCOMING_END;
4253 }
4254 
4255 /*
4256  * Flush the content of the RAM cache into the SVM's memory.
4257  * Only flush the pages that have been dirtied by the PVM, the SVM, or both.
4258  */
4259 static void colo_flush_ram_cache(void)
4260 {
4261     RAMBlock *block = NULL;
4262     void *dst_host;
4263     void *src_host;
4264     unsigned long offset = 0;
4265 
4266     memory_global_dirty_log_sync();
4267     WITH_RCU_READ_LOCK_GUARD() {
4268         RAMBLOCK_FOREACH_NOT_IGNORED(block) {
4269             ramblock_sync_dirty_bitmap(ram_state, block);
4270         }
4271     }
4272 
4273     trace_colo_flush_ram_cache_begin(ram_state->migration_dirty_pages);
4274     WITH_RCU_READ_LOCK_GUARD() {
4275         block = QLIST_FIRST_RCU(&ram_list.blocks);
4276 
4277         while (block) {
4278             offset = migration_bitmap_find_dirty(ram_state, block, offset);
4279 
4280             if (((ram_addr_t)offset) << TARGET_PAGE_BITS
4281                 >= block->used_length) {
4282                 offset = 0;
4283                 block = QLIST_NEXT_RCU(block, next);
4284             } else {
4285                 migration_bitmap_clear_dirty(ram_state, block, offset);
4286                 dst_host = block->host
4287                          + (((ram_addr_t)offset) << TARGET_PAGE_BITS);
4288                 src_host = block->colo_cache
4289                          + (((ram_addr_t)offset) << TARGET_PAGE_BITS);
4290                 memcpy(dst_host, src_host, TARGET_PAGE_SIZE);
4291             }
4292         }
4293     }
4294     trace_colo_flush_ram_cache_end();
4295 }
4296 
4297 /**
4298  * ram_load_precopy: load pages in precopy case
4299  *
4300  * Returns 0 for success or -errno in case of error
4301  *
4302  * Called in precopy mode by ram_load().
4303  * rcu_read_lock is taken prior to this being called.
4304  *
4305  * @f: QEMUFile where to send the data
4306  */
4307 static int ram_load_precopy(QEMUFile *f)
4308 {
4309     int flags = 0, ret = 0, invalid_flags = 0, len = 0, i = 0;
4310     /* ADVISE is earlier, it shows the source has the postcopy capability on */
4311     bool postcopy_advised = postcopy_is_advised();
4312     if (!migrate_use_compression()) {
4313         invalid_flags |= RAM_SAVE_FLAG_COMPRESS_PAGE;
4314     }
4315 
4316     while (!ret && !(flags & RAM_SAVE_FLAG_EOS)) {
4317         ram_addr_t addr, total_ram_bytes;
4318         void *host = NULL;
4319         uint8_t ch;
4320 
4321         /*
4322          * Yield periodically to let the main loop run, but an iteration of
4323          * the main loop is expensive, so only do it every few iterations.
4324          */
4325         if ((i & 32767) == 0 && qemu_in_coroutine()) {
4326             aio_co_schedule(qemu_get_current_aio_context(),
4327                             qemu_coroutine_self());
4328             qemu_coroutine_yield();
4329         }
4330         i++;
4331 
4332         addr = qemu_get_be64(f);
4333         flags = addr & ~TARGET_PAGE_MASK;
4334         addr &= TARGET_PAGE_MASK;
4335 
4336         if (flags & invalid_flags) {
4337             if (flags & invalid_flags & RAM_SAVE_FLAG_COMPRESS_PAGE) {
4338                 error_report("Received an unexpected compressed page");
4339             }
4340 
4341             ret = -EINVAL;
4342             break;
4343         }
4344 
4345         if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE |
4346                      RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE)) {
4347             RAMBlock *block = ram_block_from_stream(f, flags);
4348 
4349             /*
4350              * After going into COLO state, we should load the page into colo_cache.
4351              */
4352             if (migration_incoming_in_colo_state()) {
4353                 host = colo_cache_from_block_offset(block, addr);
4354             } else {
4355                 host = host_from_ram_block_offset(block, addr);
4356             }
4357             if (!host) {
4358                 error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
4359                 ret = -EINVAL;
4360                 break;
4361             }
4362 
4363             if (!migration_incoming_in_colo_state()) {
4364                 ramblock_recv_bitmap_set(block, host);
4365             }
4366 
4367             trace_ram_load_loop(block->idstr, (uint64_t)addr, flags, host);
4368         }
4369 
4370         switch (flags & ~RAM_SAVE_FLAG_CONTINUE) {
4371         case RAM_SAVE_FLAG_MEM_SIZE:
4372             /* Synchronize RAM block list */
4373             total_ram_bytes = addr;
4374             while (!ret && total_ram_bytes) {
4375                 RAMBlock *block;
4376                 char id[256];
4377                 ram_addr_t length;
4378 
4379                 len = qemu_get_byte(f);
4380                 qemu_get_buffer(f, (uint8_t *)id, len);
4381                 id[len] = 0;
4382                 length = qemu_get_be64(f);
4383 
4384                 block = qemu_ram_block_by_name(id);
4385                 if (block && !qemu_ram_is_migratable(block)) {
4386                     error_report("block %s should not be migrated !", id);
4387                     ret = -EINVAL;
4388                 } else if (block) {
4389                     if (length != block->used_length) {
4390                         Error *local_err = NULL;
4391 
4392                         ret = qemu_ram_resize(block, length,
4393                                               &local_err);
4394                         if (local_err) {
4395                             error_report_err(local_err);
4396                         }
4397                     }
4398                     /* For postcopy we need to check hugepage sizes match */
4399                     if (postcopy_advised &&
4400                         block->page_size != qemu_host_page_size) {
4401                         uint64_t remote_page_size = qemu_get_be64(f);
4402                         if (remote_page_size != block->page_size) {
4403                             error_report("Mismatched RAM page size %s "
4404                                          "(local) %zd != %" PRId64,
4405                                          id, block->page_size,
4406                                          remote_page_size);
4407                             ret = -EINVAL;
4408                         }
4409                     }
4410                     if (migrate_ignore_shared()) {
4411                         hwaddr addr = qemu_get_be64(f);
4412                         if (ramblock_is_ignored(block) &&
4413                             block->mr->addr != addr) {
4414                             error_report("Mismatched GPAs for block %s "
4415                                          "%" PRId64 " != %" PRId64,
4416                                          id, (uint64_t)addr,
4417                                          (uint64_t)block->mr->addr);
4418                             ret = -EINVAL;
4419                         }
4420                     }
4421                     ram_control_load_hook(f, RAM_CONTROL_BLOCK_REG,
4422                                           block->idstr);
4423                 } else {
4424                     error_report("Unknown ramblock \"%s\", cannot "
4425                                  "accept migration", id);
4426                     ret = -EINVAL;
4427                 }
4428 
4429                 total_ram_bytes -= length;
4430             }
4431             break;
4432 
4433         case RAM_SAVE_FLAG_ZERO:
4434             ch = qemu_get_byte(f);
4435             ram_handle_compressed(host, ch, TARGET_PAGE_SIZE);
4436             break;
4437 
4438         case RAM_SAVE_FLAG_PAGE:
4439             qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
4440             break;
4441 
4442         case RAM_SAVE_FLAG_COMPRESS_PAGE:
4443             len = qemu_get_be32(f);
4444             if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) {
4445                 error_report("Invalid compressed data length: %d", len);
4446                 ret = -EINVAL;
4447                 break;
4448             }
4449             decompress_data_with_multi_threads(f, host, len);
4450             break;
4451 
4452         case RAM_SAVE_FLAG_XBZRLE:
4453             if (load_xbzrle(f, addr, host) < 0) {
4454                 error_report("Failed to decompress XBZRLE page at "
4455                              RAM_ADDR_FMT, addr);
4456                 ret = -EINVAL;
4457                 break;
4458             }
4459             break;
4460         case RAM_SAVE_FLAG_EOS:
4461             /* normal exit */
4462             multifd_recv_sync_main();
4463             break;
4464         default:
4465             if (flags & RAM_SAVE_FLAG_HOOK) {
4466                 ram_control_load_hook(f, RAM_CONTROL_HOOK, NULL);
4467             } else {
4468                 error_report("Unknown combination of migration flags: %#x",
4469                              flags);
4470                 ret = -EINVAL;
4471             }
4472         }
4473         if (!ret) {
4474             ret = qemu_file_get_error(f);
4475         }
4476     }
4477 
4478     ret |= wait_for_decompress_done();
4479     return ret;
4480 }
4481 
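     /*
      * Top-level load handler for the "ram" section: dispatch to the
      * postcopy or precopy loader, and flush the COLO RAM cache when the
      * incoming side is in COLO state.
      */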
4482 static int ram_load(QEMUFile *f, void *opaque, int version_id)
4483 {
4484     int ret = 0;
4485     static uint64_t seq_iter;
4486     /*
4487      * If the system is running in postcopy mode, page inserts into host
4488      * memory must be atomic.
4489      */
4490     bool postcopy_running = postcopy_is_running();
4491 
4492     seq_iter++;
4493 
4494     if (version_id != 4) {
4495         return -EINVAL;
4496     }
4497 
4498     /*
4499      * This RCU critical section can be very long running.
4500      * When RCU reclaims in the code start to become numerous,
4501      * it will be necessary to reduce the granularity of this
4502      * critical section.
4503      */
4504     WITH_RCU_READ_LOCK_GUARD() {
4505         if (postcopy_running) {
4506             ret = ram_load_postcopy(f);
4507         } else {
4508             ret = ram_load_precopy(f);
4509         }
4510     }
4511     trace_ram_load_complete(ret, seq_iter);
4512 
4513     if (!ret  && migration_incoming_in_colo_state()) {
4514         colo_flush_ram_cache();
4515     }
4516     return ret;
4517 }
4518 
4519 static bool ram_has_postcopy(void *opaque)
4520 {
4521     RAMBlock *rb;
4522     RAMBLOCK_FOREACH_NOT_IGNORED(rb) {
4523         if (ramblock_is_pmem(rb)) {
4524             info_report("Block: %s, host: %p is NVDIMM memory, postcopy "
4525                          "is not supported now!", rb->idstr, rb->host);
4526             return false;
4527         }
4528     }
4529 
4530     return migrate_postcopy_ram();
4531 }
4532 
4533 /* Sync all the dirty bitmap with destination VM.  */
4534 static int ram_dirty_bitmap_sync_all(MigrationState *s, RAMState *rs)
4535 {
4536     RAMBlock *block;
4537     QEMUFile *file = s->to_dst_file;
4538     int ramblock_count = 0;
4539 
4540     trace_ram_dirty_bitmap_sync_start();
4541 
4542     RAMBLOCK_FOREACH_NOT_IGNORED(block) {
4543         qemu_savevm_send_recv_bitmap(file, block->idstr);
4544         trace_ram_dirty_bitmap_request(block->idstr);
4545         ramblock_count++;
4546     }
4547 
4548     trace_ram_dirty_bitmap_sync_wait();
4549 
4550     /* Wait until all the ramblocks' dirty bitmaps are synced */
4551     while (ramblock_count--) {
4552         qemu_sem_wait(&s->rp_state.rp_sem);
4553     }
4554 
4555     trace_ram_dirty_bitmap_sync_complete();
4556 
4557     return 0;
4558 }
4559 
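     /* Wake up ram_dirty_bitmap_sync_all(), which waits on rp_sem. */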
4560 static void ram_dirty_bitmap_reload_notify(MigrationState *s)
4561 {
4562     qemu_sem_post(&s->rp_state.rp_sem);
4563 }
4564 
4565 /*
4566  * Read the received bitmap and invert it to become the initial dirty bitmap.
4567  * This is only used when the postcopy migration is paused but wants
4568  * to resume from a middle point.
4569  */
4570 int ram_dirty_bitmap_reload(MigrationState *s, RAMBlock *block)
4571 {
4572     int ret = -EINVAL;
4573     QEMUFile *file = s->rp_state.from_dst_file;
4574     unsigned long *le_bitmap, nbits = block->used_length >> TARGET_PAGE_BITS;
4575     uint64_t local_size = DIV_ROUND_UP(nbits, 8);
4576     uint64_t size, end_mark;
4577 
4578     trace_ram_dirty_bitmap_reload_begin(block->idstr);
4579 
4580     if (s->state != MIGRATION_STATUS_POSTCOPY_RECOVER) {
4581         error_report("%s: incorrect state %s", __func__,
4582                      MigrationStatus_str(s->state));
4583         return -EINVAL;
4584     }
4585 
4586     /*
4587      * Note: see comments in ramblock_recv_bitmap_send() on why we
4588      * need the endianness conversion and the padding.
4589      */
4590     local_size = ROUND_UP(local_size, 8);
4591 
4592     /* Add paddings */
4593     le_bitmap = bitmap_new(nbits + BITS_PER_LONG);
4594 
4595     size = qemu_get_be64(file);
4596 
4597     /* The size of the bitmap should match our ramblock */
4598     if (size != local_size) {
4599         error_report("%s: ramblock '%s' bitmap size mismatch "
4600                      "(0x%"PRIx64" != 0x%"PRIx64")", __func__,
4601                      block->idstr, size, local_size);
4602         ret = -EINVAL;
4603         goto out;
4604     }
4605 
4606     size = qemu_get_buffer(file, (uint8_t *)le_bitmap, local_size);
4607     end_mark = qemu_get_be64(file);
4608 
4609     ret = qemu_file_get_error(file);
4610     if (ret || size != local_size) {
4611         error_report("%s: read bitmap failed for ramblock '%s': %d"
4612                      " (size 0x%"PRIx64", got: 0x%"PRIx64")",
4613                      __func__, block->idstr, ret, local_size, size);
4614         ret = -EIO;
4615         goto out;
4616     }
4617 
4618     if (end_mark != RAMBLOCK_RECV_BITMAP_ENDING) {
4619         error_report("%s: ramblock '%s' end mark incorrect: 0x%"PRIx64,
4620                      __func__, block->idstr, end_mark);
4621         ret = -EINVAL;
4622         goto out;
4623     }
4624 
4625     /*
4626      * Endianness conversion. We are in postcopy (though paused).
4627      * The dirty bitmap won't change. We can directly modify it.
4628      */
4629     bitmap_from_le(block->bmap, le_bitmap, nbits);
4630 
4631     /*
4632      * What we received is the "received bitmap". Invert it to get the initial
4633      * dirty bitmap for this ramblock.
4634      */
4635     bitmap_complement(block->bmap, block->bmap, nbits);
4636 
4637     trace_ram_dirty_bitmap_reload_complete(block->idstr);
4638 
4639     /*
4640      * We succeeded in syncing the bitmap for the current ramblock. If this is
4641      * the last one to sync, we need to notify the main send thread.
4642      */
4643     ram_dirty_bitmap_reload_notify(s);
4644 
4645     ret = 0;
4646 out:
4647     g_free(le_bitmap);
4648     return ret;
4649 }
4650 
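     /*
      * Resume hook for postcopy recovery: synchronize the dirty bitmaps
      * with the destination, then prepare the RAMState for resuming.
      */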
4651 static int ram_resume_prepare(MigrationState *s, void *opaque)
4652 {
4653     RAMState *rs = *(RAMState **)opaque;
4654     int ret;
4655 
4656     ret = ram_dirty_bitmap_sync_all(s, rs);
4657     if (ret) {
4658         return ret;
4659     }
4660 
4661     ram_state_resume_prepare(rs, s->to_dst_file);
4662 
4663     return 0;
4664 }
4665 
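     /* Hooks invoked by the migration core for the "ram" section. */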
4666 static SaveVMHandlers savevm_ram_handlers = {
4667     .save_setup = ram_save_setup,
4668     .save_live_iterate = ram_save_iterate,
4669     .save_live_complete_postcopy = ram_save_complete,
4670     .save_live_complete_precopy = ram_save_complete,
4671     .has_postcopy = ram_has_postcopy,
4672     .save_live_pending = ram_save_pending,
4673     .load_state = ram_load,
4674     .save_cleanup = ram_save_cleanup,
4675     .load_setup = ram_load_setup,
4676     .load_cleanup = ram_load_cleanup,
4677     .resume_prepare = ram_resume_prepare,
4678 };
4679 
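     /* Initialize the XBZRLE lock and register the "ram" savevm handlers. */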
4680 void ram_mig_init(void)
4681 {
4682     qemu_mutex_init(&XBZRLE.lock);
4683     register_savevm_live("ram", 0, 4, &savevm_ram_handlers, &ram_state);
4684 }
4685