xref: /openbmc/qemu/migration/ram.c (revision d73abd6d)
1 /*
2  * QEMU System Emulator
3  *
4  * Copyright (c) 2003-2008 Fabrice Bellard
5  * Copyright (c) 2011-2015 Red Hat Inc
6  *
7  * Authors:
8  *  Juan Quintela <quintela@redhat.com>
9  *
10  * Permission is hereby granted, free of charge, to any person obtaining a copy
11  * of this software and associated documentation files (the "Software"), to deal
12  * in the Software without restriction, including without limitation the rights
13  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14  * copies of the Software, and to permit persons to whom the Software is
15  * furnished to do so, subject to the following conditions:
16  *
17  * The above copyright notice and this permission notice shall be included in
18  * all copies or substantial portions of the Software.
19  *
20  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
23  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26  * THE SOFTWARE.
27  */
28 #include <stdint.h>
29 #include <zlib.h>
30 #include "qemu/bitops.h"
31 #include "qemu/bitmap.h"
32 #include "qemu/timer.h"
33 #include "qemu/main-loop.h"
34 #include "migration/migration.h"
35 #include "exec/address-spaces.h"
36 #include "migration/page_cache.h"
37 #include "qemu/error-report.h"
38 #include "trace.h"
39 #include "exec/ram_addr.h"
40 #include "qemu/rcu_queue.h"
41 
42 #ifdef DEBUG_MIGRATION_RAM
43 #define DPRINTF(fmt, ...) \
44     do { fprintf(stdout, "migration_ram: " fmt, ## __VA_ARGS__); } while (0)
45 #else
46 #define DPRINTF(fmt, ...) \
47     do { } while (0)
48 #endif
49 
50 static int dirty_rate_high_cnt;
51 
52 static uint64_t bitmap_sync_count;
53 
54 /***********************************************************/
55 /* ram save/restore */
56 
57 #define RAM_SAVE_FLAG_FULL     0x01 /* Obsolete, not used anymore */
58 #define RAM_SAVE_FLAG_COMPRESS 0x02
59 #define RAM_SAVE_FLAG_MEM_SIZE 0x04
60 #define RAM_SAVE_FLAG_PAGE     0x08
61 #define RAM_SAVE_FLAG_EOS      0x10
62 #define RAM_SAVE_FLAG_CONTINUE 0x20
63 #define RAM_SAVE_FLAG_XBZRLE   0x40
64 /* 0x80 is reserved in migration.h; start with 0x100 for the next flag */
65 #define RAM_SAVE_FLAG_COMPRESS_PAGE    0x100
66 
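/*
 * A rough sketch of the per-page wire format, as implemented by
 * save_page_header() and ram_load() below: each page record starts with a
 * be64 word holding the page offset with RAM_SAVE_FLAG_* bits OR-ed into
 * its low bits.  Unless RAM_SAVE_FLAG_CONTINUE is set, a one-byte idstr
 * length and the RAMBlock idstr follow.  The flag-specific payload comes
 * last: a single fill byte for COMPRESS, a raw page for PAGE, a small
 * header plus the encoded buffer for XBZRLE, and a be32 length plus zlib
 * data for COMPRESS_PAGE.
 */
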
67 static const uint8_t ZERO_TARGET_PAGE[TARGET_PAGE_SIZE];
68 
69 static inline bool is_zero_range(uint8_t *p, uint64_t size)
70 {
71     return buffer_find_nonzero_offset(p, size) == size;
72 }
73 
74 /* This struct contains the XBZRLE cache and a static page
75    used by the compression */
76 static struct {
77     /* buffer used for XBZRLE encoding */
78     uint8_t *encoded_buf;
79     /* buffer for storing page content */
80     uint8_t *current_buf;
81     /* Cache for XBZRLE, Protected by lock. */
82     PageCache *cache;
83     QemuMutex lock;
84 } XBZRLE;
85 
86 /* buffer used for XBZRLE decoding */
87 static uint8_t *xbzrle_decoded_buf;
88 
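/* Take and release XBZRLE.lock; both helpers are no-ops when XBZRLE is not
 * in use, so callers may wrap cache accesses unconditionally. */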
89 static void XBZRLE_cache_lock(void)
90 {
91     if (migrate_use_xbzrle()) {
92         qemu_mutex_lock(&XBZRLE.lock);
    }
93 }
94 
95 static void XBZRLE_cache_unlock(void)
96 {
97     if (migrate_use_xbzrle()) {
98         qemu_mutex_unlock(&XBZRLE.lock);
    }
99 }
100 
101 /*
102  * Called from qmp_migrate_set_cache_size in the main thread, possibly
103  * while a migration is in progress.  A running migration may be using
104  * the cache and might finish during this call, hence changes to the
105  * cache are protected by XBZRLE.lock.
106  */
107 int64_t xbzrle_cache_resize(int64_t new_size)
108 {
109     PageCache *new_cache;
110     int64_t ret;
111 
112     if (new_size < TARGET_PAGE_SIZE) {
113         return -1;
114     }
115 
116     XBZRLE_cache_lock();
117 
118     if (XBZRLE.cache != NULL) {
119         if (pow2floor(new_size) == migrate_xbzrle_cache_size()) {
120             goto out_new_size;
121         }
122         new_cache = cache_init(new_size / TARGET_PAGE_SIZE,
123                                         TARGET_PAGE_SIZE);
124         if (!new_cache) {
125             error_report("Error creating cache");
126             ret = -1;
127             goto out;
128         }
129 
130         cache_fini(XBZRLE.cache);
131         XBZRLE.cache = new_cache;
132     }
133 
134 out_new_size:
135     ret = pow2floor(new_size);
136 out:
137     XBZRLE_cache_unlock();
138     return ret;
139 }
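
/* Note: the value returned above (and compared against
 * migrate_xbzrle_cache_size()) is pow2floor(new_size), i.e. the requested
 * size rounded down to a power of two; a request of 5 MiB, for example,
 * reports back 4 MiB. */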
140 
141 /* accounting for migration statistics */
142 typedef struct AccountingInfo {
143     uint64_t dup_pages;
144     uint64_t skipped_pages;
145     uint64_t norm_pages;
146     uint64_t iterations;
147     uint64_t xbzrle_bytes;
148     uint64_t xbzrle_pages;
149     uint64_t xbzrle_cache_miss;
150     double xbzrle_cache_miss_rate;
151     uint64_t xbzrle_overflows;
152 } AccountingInfo;
153 
154 static AccountingInfo acct_info;
155 
156 static void acct_clear(void)
157 {
158     memset(&acct_info, 0, sizeof(acct_info));
159 }
160 
161 uint64_t dup_mig_bytes_transferred(void)
162 {
163     return acct_info.dup_pages * TARGET_PAGE_SIZE;
164 }
165 
166 uint64_t dup_mig_pages_transferred(void)
167 {
168     return acct_info.dup_pages;
169 }
170 
171 uint64_t skipped_mig_bytes_transferred(void)
172 {
173     return acct_info.skipped_pages * TARGET_PAGE_SIZE;
174 }
175 
176 uint64_t skipped_mig_pages_transferred(void)
177 {
178     return acct_info.skipped_pages;
179 }
180 
181 uint64_t norm_mig_bytes_transferred(void)
182 {
183     return acct_info.norm_pages * TARGET_PAGE_SIZE;
184 }
185 
186 uint64_t norm_mig_pages_transferred(void)
187 {
188     return acct_info.norm_pages;
189 }
190 
191 uint64_t xbzrle_mig_bytes_transferred(void)
192 {
193     return acct_info.xbzrle_bytes;
194 }
195 
196 uint64_t xbzrle_mig_pages_transferred(void)
197 {
198     return acct_info.xbzrle_pages;
199 }
200 
201 uint64_t xbzrle_mig_pages_cache_miss(void)
202 {
203     return acct_info.xbzrle_cache_miss;
204 }
205 
206 double xbzrle_mig_cache_miss_rate(void)
207 {
208     return acct_info.xbzrle_cache_miss_rate;
209 }
210 
211 uint64_t xbzrle_mig_pages_overflow(void)
212 {
213     return acct_info.xbzrle_overflows;
214 }
215 
216 /* This is the last block that we have visited searching for dirty pages
217  */
218 static RAMBlock *last_seen_block;
219 /* This is the last block from where we have sent data */
220 static RAMBlock *last_sent_block;
221 static ram_addr_t last_offset;
222 static QemuMutex migration_bitmap_mutex;
223 static uint64_t migration_dirty_pages;
224 static uint32_t last_version;
225 static bool ram_bulk_stage;
226 
227 /* used by the search for pages to send */
228 struct PageSearchStatus {
229     /* Current block being searched */
230     RAMBlock    *block;
231     /* Current offset to search from */
232     ram_addr_t   offset;
233     /* Set once we wrap around */
234     bool         complete_round;
235 };
236 typedef struct PageSearchStatus PageSearchStatus;
237 
238 static struct BitmapRcu {
239     struct rcu_head rcu;
240     unsigned long *bmap;
241 } *migration_bitmap_rcu;
242 
243 struct CompressParam {
244     bool start;
245     bool done;
246     QEMUFile *file;
247     QemuMutex mutex;
248     QemuCond cond;
249     RAMBlock *block;
250     ram_addr_t offset;
251 };
252 typedef struct CompressParam CompressParam;
253 
254 struct DecompressParam {
255     bool start;
256     QemuMutex mutex;
257     QemuCond cond;
258     void *des;
259     uint8_t *compbuf;
260     int len;
261 };
262 typedef struct DecompressParam DecompressParam;
263 
264 static CompressParam *comp_param;
265 static QemuThread *compress_threads;
266 /* comp_done_cond is used to wake up the migration thread when
267  * one of the compression threads has finished the compression.
268  * comp_done_lock is used to co-work with comp_done_cond.
269  */
270 static QemuMutex *comp_done_lock;
271 static QemuCond *comp_done_cond;
272 /* The empty QEMUFileOps will be used by file in CompressParam */
273 static const QEMUFileOps empty_ops = { };
274 
275 static bool compression_switch;
276 static bool quit_comp_thread;
277 static bool quit_decomp_thread;
278 static DecompressParam *decomp_param;
279 static QemuThread *decompress_threads;
280 static uint8_t *compressed_data_buf;
281 
282 static int do_compress_ram_page(CompressParam *param);
283 
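/*
 * do_data_compress: body of a compression worker thread.  The thread sleeps
 * on param->cond until start_compression() hands it a (block, offset) pair,
 * compresses that page into its private QEMUFile buffer, then marks itself
 * done and signals comp_done_cond so the migration thread can collect the
 * result.
 */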
284 static void *do_data_compress(void *opaque)
285 {
286     CompressParam *param = opaque;
287 
288     while (!quit_comp_thread) {
289         qemu_mutex_lock(&param->mutex);
290         /* Re-check quit_comp_thread in case
291          * terminate_compression_threads() was called just before
292          * qemu_mutex_lock(&param->mutex) and after the
293          * while (!quit_comp_thread) test; re-checking it here makes
294          * sure the compression thread terminates as expected.
295          */
296         while (!param->start && !quit_comp_thread) {
297             qemu_cond_wait(&param->cond, &param->mutex);
298         }
299         if (!quit_comp_thread) {
300             do_compress_ram_page(param);
301         }
302         param->start = false;
303         qemu_mutex_unlock(&param->mutex);
304 
305         qemu_mutex_lock(comp_done_lock);
306         param->done = true;
307         qemu_cond_signal(comp_done_cond);
308         qemu_mutex_unlock(comp_done_lock);
309     }
310 
311     return NULL;
312 }
313 
314 static inline void terminate_compression_threads(void)
315 {
316     int idx, thread_count;
317 
318     thread_count = migrate_compress_threads();
319     quit_comp_thread = true;
320     for (idx = 0; idx < thread_count; idx++) {
321         qemu_mutex_lock(&comp_param[idx].mutex);
322         qemu_cond_signal(&comp_param[idx].cond);
323         qemu_mutex_unlock(&comp_param[idx].mutex);
324     }
325 }
326 
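/* Tear down the compression thread pool set up by
 * migrate_compress_threads_create(): wake every worker with quit_comp_thread
 * set, join the threads and free the per-thread state. */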
327 void migrate_compress_threads_join(void)
328 {
329     int i, thread_count;
330 
331     if (!migrate_use_compression()) {
332         return;
333     }
334     terminate_compression_threads();
335     thread_count = migrate_compress_threads();
336     for (i = 0; i < thread_count; i++) {
337         qemu_thread_join(compress_threads + i);
338         qemu_fclose(comp_param[i].file);
339         qemu_mutex_destroy(&comp_param[i].mutex);
340         qemu_cond_destroy(&comp_param[i].cond);
341     }
342     qemu_mutex_destroy(comp_done_lock);
343     qemu_cond_destroy(comp_done_cond);
344     g_free(compress_threads);
345     g_free(comp_param);
346     g_free(comp_done_cond);
347     g_free(comp_done_lock);
348     compress_threads = NULL;
349     comp_param = NULL;
350     comp_done_cond = NULL;
351     comp_done_lock = NULL;
352 }
353 
354 void migrate_compress_threads_create(void)
355 {
356     int i, thread_count;
357 
358     if (!migrate_use_compression()) {
359         return;
360     }
361     quit_comp_thread = false;
362     compression_switch = true;
363     thread_count = migrate_compress_threads();
364     compress_threads = g_new0(QemuThread, thread_count);
365     comp_param = g_new0(CompressParam, thread_count);
366     comp_done_cond = g_new0(QemuCond, 1);
367     comp_done_lock = g_new0(QemuMutex, 1);
368     qemu_cond_init(comp_done_cond);
369     qemu_mutex_init(comp_done_lock);
370     for (i = 0; i < thread_count; i++) {
371         /* comp_param[i].file is just used as a dummy buffer to save data;
372          * set its ops to empty.
373          */
374         comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
375         comp_param[i].done = true;
376         qemu_mutex_init(&comp_param[i].mutex);
377         qemu_cond_init(&comp_param[i].cond);
378         qemu_thread_create(compress_threads + i, "compress",
379                            do_data_compress, comp_param + i,
380                            QEMU_THREAD_JOINABLE);
381     }
382 }
383 
384 /**
385  * save_page_header: Write page header to wire
386  *
387  * If this is the 1st block, it also writes the block identification
388  *
389  * Returns: Number of bytes written
390  *
391  * @f: QEMUFile where to send the data
392  * @block: block that contains the page we want to send
393  * @offset: offset inside the block for the page
394  *          in the lower bits, it contains flags
395  */
396 static size_t save_page_header(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
397 {
398     size_t size, len;
399 
400     qemu_put_be64(f, offset);
401     size = 8;
402 
403     if (!(offset & RAM_SAVE_FLAG_CONTINUE)) {
404         len = strlen(block->idstr);
405         qemu_put_byte(f, len);
406         qemu_put_buffer(f, (uint8_t *)block->idstr, len);
407         size += 1 + len;
408     }
409     return size;
410 }
411 
412 /* Reduce amount of guest cpu execution to hopefully slow down memory writes.
413  * If guest dirty memory rate is reduced below the rate at which we can
414  * transfer pages to the destination then we should be able to complete
415  * migration. Some workloads dirty memory way too fast and will not effectively
416  * converge, even with auto-converge.
417  */
418 static void mig_throttle_guest_down(void)
419 {
420     MigrationState *s = migrate_get_current();
421     uint64_t pct_initial =
422             s->parameters[MIGRATION_PARAMETER_X_CPU_THROTTLE_INITIAL];
423     uint64_t pct_increment =
424             s->parameters[MIGRATION_PARAMETER_X_CPU_THROTTLE_INCREMENT];
425 
426     /* We have not started throttling yet. Let's start it. */
427     if (!cpu_throttle_active()) {
428         cpu_throttle_set(pct_initial);
429     } else {
430         /* Throttling already on, just increase the rate */
431         cpu_throttle_set(cpu_throttle_get_percentage() + pct_increment);
432     }
433 }
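
/* For example (illustrative values, not necessarily the configured defaults):
 * with an initial percentage of 20 and an increment of 10, successive calls
 * throttle the guest at 20%, 30%, 40%, ... of its CPU time. */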
434 
435 /* Update the xbzrle cache to reflect a page that's been sent as all 0.
436  * The important thing is that a stale (not-yet-0'd) page be replaced
437  * by the new data.
438  * As a bonus, if the page wasn't in the cache it gets added so that
439  * when a small write is made into the 0'd page it gets XBZRLE sent
440  */
441 static void xbzrle_cache_zero_page(ram_addr_t current_addr)
442 {
443     if (ram_bulk_stage || !migrate_use_xbzrle()) {
444         return;
445     }
446 
447     /* We don't care if this fails to allocate a new cache page
448      * as long as it updated an old one */
449     cache_insert(XBZRLE.cache, current_addr, ZERO_TARGET_PAGE,
450                  bitmap_sync_count);
451 }
452 
453 #define ENCODING_FLAG_XBZRLE 0x1
454 
455 /**
456  * save_xbzrle_page: compress and send current page
457  *
458  * Returns: 1 means that we wrote the page
459  *          0 means that page is identical to the one already sent
460  *          -1 means that xbzrle would be longer than normal
461  *
462  * @f: QEMUFile where to send the data
463  * @current_data: in/out pointer to the page data; may point into the cache on return
464  * @current_addr: address of the page, used as the XBZRLE cache key
465  * @block: block that contains the page we want to send
466  * @offset: offset inside the block for the page
467  * @last_stage: if we are at the completion stage
468  * @bytes_transferred: increase it with the number of transferred bytes
469  */
470 static int save_xbzrle_page(QEMUFile *f, uint8_t **current_data,
471                             ram_addr_t current_addr, RAMBlock *block,
472                             ram_addr_t offset, bool last_stage,
473                             uint64_t *bytes_transferred)
474 {
475     int encoded_len = 0, bytes_xbzrle;
476     uint8_t *prev_cached_page;
477 
478     if (!cache_is_cached(XBZRLE.cache, current_addr, bitmap_sync_count)) {
479         acct_info.xbzrle_cache_miss++;
480         if (!last_stage) {
481             if (cache_insert(XBZRLE.cache, current_addr, *current_data,
482                              bitmap_sync_count) == -1) {
483                 return -1;
484             } else {
485                 /* update *current_data when the page has been
486                    inserted into cache */
487                 *current_data = get_cached_data(XBZRLE.cache, current_addr);
488             }
489         }
490         return -1;
491     }
492 
493     prev_cached_page = get_cached_data(XBZRLE.cache, current_addr);
494 
495     /* save current buffer into memory */
496     memcpy(XBZRLE.current_buf, *current_data, TARGET_PAGE_SIZE);
497 
498     /* XBZRLE encoding (if there is no overflow) */
499     encoded_len = xbzrle_encode_buffer(prev_cached_page, XBZRLE.current_buf,
500                                        TARGET_PAGE_SIZE, XBZRLE.encoded_buf,
501                                        TARGET_PAGE_SIZE);
502     if (encoded_len == 0) {
503         DPRINTF("Skipping unmodified page\n");
504         return 0;
505     } else if (encoded_len == -1) {
506         DPRINTF("Overflow\n");
507         acct_info.xbzrle_overflows++;
508         /* update data in the cache */
509         if (!last_stage) {
510             memcpy(prev_cached_page, *current_data, TARGET_PAGE_SIZE);
511             *current_data = prev_cached_page;
512         }
513         return -1;
514     }
515 
516     /* Update the cache so it holds the same data the destination now has */
517     if (!last_stage) {
518         memcpy(prev_cached_page, XBZRLE.current_buf, TARGET_PAGE_SIZE);
519     }
520 
521     /* Send XBZRLE based compressed page */
522     bytes_xbzrle = save_page_header(f, block, offset | RAM_SAVE_FLAG_XBZRLE);
523     qemu_put_byte(f, ENCODING_FLAG_XBZRLE);
524     qemu_put_be16(f, encoded_len);
525     qemu_put_buffer(f, XBZRLE.encoded_buf, encoded_len);
526     bytes_xbzrle += encoded_len + 1 + 2;
527     acct_info.xbzrle_pages++;
528     acct_info.xbzrle_bytes += bytes_xbzrle;
529     *bytes_transferred += bytes_xbzrle;
530 
531     return 1;
532 }
533 
534 /* Called with rcu_read_lock() to protect migration_bitmap */
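/* Returns the offset within @rb of the next dirty page at or after @start,
 * clearing its dirty bit; in the bulk stage the scan simply advances one page
 * at a time, since everything is assumed dirty.  A result >= rb->used_length
 * means no dirty page was found. */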
535 static inline
536 ram_addr_t migration_bitmap_find_and_reset_dirty(RAMBlock *rb,
537                                                  ram_addr_t start)
538 {
539     unsigned long base = rb->offset >> TARGET_PAGE_BITS;
540     unsigned long nr = base + (start >> TARGET_PAGE_BITS);
541     uint64_t rb_size = rb->used_length;
542     unsigned long size = base + (rb_size >> TARGET_PAGE_BITS);
543     unsigned long *bitmap;
544 
545     unsigned long next;
546 
547     bitmap = atomic_rcu_read(&migration_bitmap_rcu)->bmap;
548     if (ram_bulk_stage && nr > base) {
549         next = nr + 1;
550     } else {
551         next = find_next_bit(bitmap, size, nr);
552     }
553 
554     if (next < size) {
555         clear_bit(next, bitmap);
556         migration_dirty_pages--;
557     }
558     return (next - base) << TARGET_PAGE_BITS;
559 }
560 
561 /* Called with rcu_read_lock() to protect migration_bitmap */
562 static void migration_bitmap_sync_range(ram_addr_t start, ram_addr_t length)
563 {
564     unsigned long *bitmap;
565     bitmap = atomic_rcu_read(&migration_bitmap_rcu)->bmap;
566     migration_dirty_pages +=
567         cpu_physical_memory_sync_dirty_bitmap(bitmap, start, length);
568 }
569 
570 /* Fix me: there are too many global variables used in migration process. */
571 static int64_t start_time;
572 static int64_t bytes_xfer_prev;
573 static int64_t num_dirty_pages_period;
574 static uint64_t xbzrle_cache_miss_prev;
575 static uint64_t iterations_prev;
576 
577 static void migration_bitmap_sync_init(void)
578 {
579     start_time = 0;
580     bytes_xfer_prev = 0;
581     num_dirty_pages_period = 0;
582     xbzrle_cache_miss_prev = 0;
583     iterations_prev = 0;
584 }
585 
586 /* Called with iothread lock held, to protect ram_list.dirty_memory[] */
587 static void migration_bitmap_sync(void)
588 {
589     RAMBlock *block;
590     uint64_t num_dirty_pages_init = migration_dirty_pages;
591     MigrationState *s = migrate_get_current();
592     int64_t end_time;
593     int64_t bytes_xfer_now;
594 
595     bitmap_sync_count++;
596 
597     if (!bytes_xfer_prev) {
598         bytes_xfer_prev = ram_bytes_transferred();
599     }
600 
601     if (!start_time) {
602         start_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
603     }
604 
605     trace_migration_bitmap_sync_start();
606     address_space_sync_dirty_bitmap(&address_space_memory);
607 
608     qemu_mutex_lock(&migration_bitmap_mutex);
609     rcu_read_lock();
610     QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
611         migration_bitmap_sync_range(block->offset, block->used_length);
612     }
613     rcu_read_unlock();
614     qemu_mutex_unlock(&migration_bitmap_mutex);
615 
616     trace_migration_bitmap_sync_end(migration_dirty_pages
617                                     - num_dirty_pages_init);
618     num_dirty_pages_period += migration_dirty_pages - num_dirty_pages_init;
619     end_time = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
620 
621     /* more than 1 second = 1000 milliseconds */
622     if (end_time > start_time + 1000) {
623         if (migrate_auto_converge()) {
624             /* The following detection logic can be refined later. For now:
625                Check to see if the dirtied bytes exceed 50% of the approx.
626                amount of bytes that just got transferred since the last time
627                we were in this routine. If that happens twice, start or
628                increase throttling */
629             bytes_xfer_now = ram_bytes_transferred();
630 
631             if (s->dirty_pages_rate &&
632                (num_dirty_pages_period * TARGET_PAGE_SIZE >
633                    (bytes_xfer_now - bytes_xfer_prev)/2) &&
634                (dirty_rate_high_cnt++ >= 2)) {
635                     trace_migration_throttle();
636                     dirty_rate_high_cnt = 0;
637                     mig_throttle_guest_down();
638              }
639              bytes_xfer_prev = bytes_xfer_now;
640         }
641 
642         if (migrate_use_xbzrle()) {
643             if (iterations_prev != acct_info.iterations) {
644                 acct_info.xbzrle_cache_miss_rate =
645                    (double)(acct_info.xbzrle_cache_miss -
646                             xbzrle_cache_miss_prev) /
647                    (acct_info.iterations - iterations_prev);
648             }
649             iterations_prev = acct_info.iterations;
650             xbzrle_cache_miss_prev = acct_info.xbzrle_cache_miss;
651         }
652         s->dirty_pages_rate = num_dirty_pages_period * 1000
653             / (end_time - start_time);
654         s->dirty_bytes_rate = s->dirty_pages_rate * TARGET_PAGE_SIZE;
655         start_time = end_time;
656         num_dirty_pages_period = 0;
657     }
658     s->dirty_sync_count = bitmap_sync_count;
659 }
660 
661 /**
662  * save_zero_page: Send the zero page to the stream
663  *
664  * Returns: Number of pages written.
665  *
666  * @f: QEMUFile where to send the data
667  * @block: block that contains the page we want to send
668  * @offset: offset inside the block for the page
669  * @p: pointer to the page
670  * @bytes_transferred: increase it with the number of transferred bytes
671  */
672 static int save_zero_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset,
673                           uint8_t *p, uint64_t *bytes_transferred)
674 {
675     int pages = -1;
676 
677     if (is_zero_range(p, TARGET_PAGE_SIZE)) {
678         acct_info.dup_pages++;
679         *bytes_transferred += save_page_header(f, block,
680                                                offset | RAM_SAVE_FLAG_COMPRESS);
681         qemu_put_byte(f, 0);
682         *bytes_transferred += 1;
683         pages = 1;
684     }
685 
686     return pages;
687 }
688 
689 /**
690  * ram_save_page: Send the given page to the stream
691  *
692  * Returns: Number of pages written.
693  *
694  * @f: QEMUFile where to send the data
695  * @block: block that contains the page we want to send
696  * @offset: offset inside the block for the page
697  * @last_stage: if we are at the completion stage
698  * @bytes_transferred: increase it with the number of transferred bytes
699  */
700 static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
701                          bool last_stage, uint64_t *bytes_transferred)
702 {
703     int pages = -1;
704     uint64_t bytes_xmit;
705     ram_addr_t current_addr;
706     uint8_t *p;
707     int ret;
708     bool send_async = true;
709 
710     p = block->host + offset;
711 
712     /* When in doubt, send the page as a normal page */
713     bytes_xmit = 0;
714     ret = ram_control_save_page(f, block->offset,
715                            offset, TARGET_PAGE_SIZE, &bytes_xmit);
716     if (bytes_xmit) {
717         *bytes_transferred += bytes_xmit;
718         pages = 1;
719     }
720 
721     XBZRLE_cache_lock();
722 
723     current_addr = block->offset + offset;
724 
725     if (block == last_sent_block) {
726         offset |= RAM_SAVE_FLAG_CONTINUE;
727     }
728     if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
729         if (ret != RAM_SAVE_CONTROL_DELAYED) {
730             if (bytes_xmit > 0) {
731                 acct_info.norm_pages++;
732             } else if (bytes_xmit == 0) {
733                 acct_info.dup_pages++;
734             }
735         }
736     } else {
737         pages = save_zero_page(f, block, offset, p, bytes_transferred);
738         if (pages > 0) {
739             /* Must let xbzrle know, otherwise a previous (now 0'd) cached
740              * page would be stale
741              */
742             xbzrle_cache_zero_page(current_addr);
743         } else if (!ram_bulk_stage && migrate_use_xbzrle()) {
744             pages = save_xbzrle_page(f, &p, current_addr, block,
745                                      offset, last_stage, bytes_transferred);
746             if (!last_stage) {
747                 /* Can't send this cached data async, since the cache page
748                  * might get updated before it gets to the wire
749                  */
750                 send_async = false;
751             }
752         }
753     }
754 
755     /* XBZRLE overflow or normal page */
756     if (pages == -1) {
757         *bytes_transferred += save_page_header(f, block,
758                                                offset | RAM_SAVE_FLAG_PAGE);
759         if (send_async) {
760             qemu_put_buffer_async(f, p, TARGET_PAGE_SIZE);
761         } else {
762             qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
763         }
764         *bytes_transferred += TARGET_PAGE_SIZE;
765         pages = 1;
766         acct_info.norm_pages++;
767     }
768 
769     XBZRLE_cache_unlock();
770 
771     return pages;
772 }
773 
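/*
 * do_compress_ram_page: write the page header and the zlib-compressed page
 * described by @param into param->file.
 *
 * Returns: number of bytes queued into param->file (header plus compressed
 *          data).
 */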
774 static int do_compress_ram_page(CompressParam *param)
775 {
776     int bytes_sent, blen;
777     uint8_t *p;
778     RAMBlock *block = param->block;
779     ram_addr_t offset = param->offset;
780 
781     p = block->host + (offset & TARGET_PAGE_MASK);
782 
783     bytes_sent = save_page_header(param->file, block, offset |
784                                   RAM_SAVE_FLAG_COMPRESS_PAGE);
785     blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE,
786                                      migrate_compress_level());
787     bytes_sent += blen;
788 
789     return bytes_sent;
790 }
791 
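/* Kick one compression thread: mark it busy, set its start flag under its
 * mutex and wake it.  start_decompression() below does the same for a
 * decompression thread (minus the done flag, which only compression uses). */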
792 static inline void start_compression(CompressParam *param)
793 {
794     param->done = false;
795     qemu_mutex_lock(&param->mutex);
796     param->start = true;
797     qemu_cond_signal(&param->cond);
798     qemu_mutex_unlock(&param->mutex);
799 }
800 
801 static inline void start_decompression(DecompressParam *param)
802 {
803     qemu_mutex_lock(&param->mutex);
804     param->start = true;
805     qemu_cond_signal(&param->cond);
806     qemu_mutex_unlock(&param->mutex);
807 }
808 
809 static uint64_t bytes_transferred;
810 
811 static void flush_compressed_data(QEMUFile *f)
812 {
813     int idx, len, thread_count;
814 
815     if (!migrate_use_compression()) {
816         return;
817     }
818     thread_count = migrate_compress_threads();
819     for (idx = 0; idx < thread_count; idx++) {
820         if (!comp_param[idx].done) {
821             qemu_mutex_lock(comp_done_lock);
822             while (!comp_param[idx].done && !quit_comp_thread) {
823                 qemu_cond_wait(comp_done_cond, comp_done_lock);
824             }
825             qemu_mutex_unlock(comp_done_lock);
826         }
827         if (!quit_comp_thread) {
828             len = qemu_put_qemu_file(f, comp_param[idx].file);
829             bytes_transferred += len;
830         }
831     }
832 }
833 
834 static inline void set_compress_params(CompressParam *param, RAMBlock *block,
835                                        ram_addr_t offset)
836 {
837     param->block = block;
838     param->offset = offset;
839 }
840 
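/*
 * compress_page_with_multi_thread: hand the page at (block, offset) to the
 * first idle compression thread, draining that thread's previous output into
 * @f first.  Blocks on comp_done_cond until some thread becomes idle.
 *
 * Returns: Number of pages written (1 once a thread has accepted the page).
 */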
841 static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
842                                            ram_addr_t offset,
843                                            uint64_t *bytes_transferred)
844 {
845     int idx, thread_count, bytes_xmit = -1, pages = -1;
846 
847     thread_count = migrate_compress_threads();
848     qemu_mutex_lock(comp_done_lock);
849     while (true) {
850         for (idx = 0; idx < thread_count; idx++) {
851             if (comp_param[idx].done) {
852                 bytes_xmit = qemu_put_qemu_file(f, comp_param[idx].file);
853                 set_compress_params(&comp_param[idx], block, offset);
854                 start_compression(&comp_param[idx]);
855                 pages = 1;
856                 acct_info.norm_pages++;
857                 *bytes_transferred += bytes_xmit;
858                 break;
859             }
860         }
861         if (pages > 0) {
862             break;
863         } else {
864             qemu_cond_wait(comp_done_cond, comp_done_lock);
865         }
866     }
867     qemu_mutex_unlock(comp_done_lock);
868 
869     return pages;
870 }
871 
872 /**
873  * ram_save_compressed_page: compress the given page and send it to the stream
874  *
875  * Returns: Number of pages written.
876  *
877  * @f: QEMUFile where to send the data
878  * @block: block that contains the page we want to send
879  * @offset: offset inside the block for the page
880  * @last_stage: if we are at the completion stage
881  * @bytes_transferred: increase it with the number of transferred bytes
882  */
883 static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block,
884                                     ram_addr_t offset, bool last_stage,
885                                     uint64_t *bytes_transferred)
886 {
887     int pages = -1;
888     uint64_t bytes_xmit;
889     uint8_t *p;
890     int ret;
891 
892     p = block->host + offset;
893 
894     bytes_xmit = 0;
895     ret = ram_control_save_page(f, block->offset,
896                                 offset, TARGET_PAGE_SIZE, &bytes_xmit);
897     if (bytes_xmit) {
898         *bytes_transferred += bytes_xmit;
899         pages = 1;
900     }
901     if (block == last_sent_block) {
902         offset |= RAM_SAVE_FLAG_CONTINUE;
903     }
904     if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
905         if (ret != RAM_SAVE_CONTROL_DELAYED) {
906             if (bytes_xmit > 0) {
907                 acct_info.norm_pages++;
908             } else if (bytes_xmit == 0) {
909                 acct_info.dup_pages++;
910             }
911         }
912     } else {
913         /* When starting a new block, the first page of the block should
914          * be sent out before the other pages in the same block, and all
915          * the pages in the last block should have been sent out already.
916          * Keeping this order is important, because the 'cont' flag is
917          * used to avoid resending the block name.
918          */
919         if (block != last_sent_block) {
920             flush_compressed_data(f);
921             pages = save_zero_page(f, block, offset, p, bytes_transferred);
922             if (pages == -1) {
923                 set_compress_params(&comp_param[0], block, offset);
924                 /* Compress synchronously in the migration thread so the
925                  * first page of the block is sent out before other pages.
926                  */
927                 bytes_xmit = do_compress_ram_page(&comp_param[0]);
928                 acct_info.norm_pages++;
929                 qemu_put_qemu_file(f, comp_param[0].file);
930                 *bytes_transferred += bytes_xmit;
931                 pages = 1;
932             }
933         } else {
934             pages = save_zero_page(f, block, offset, p, bytes_transferred);
935             if (pages == -1) {
936                 pages = compress_page_with_multi_thread(f, block, offset,
937                                                         bytes_transferred);
938             }
939         }
940     }
941 
942     return pages;
943 }
944 
945 /*
946  * Find the next dirty page and update any state associated with
947  * the search process.
948  *
949  * Returns: True if a page is found
950  *
951  * @f: Current migration stream.
952  * @pss: Data about the state of the current dirty page scan.
953  * @again: Set to false if the search has scanned the whole of RAM
954  */
955 static bool find_dirty_block(QEMUFile *f, PageSearchStatus *pss,
956                              bool *again)
957 {
958     pss->offset = migration_bitmap_find_and_reset_dirty(pss->block,
959                                                        pss->offset);
960     if (pss->complete_round && pss->block == last_seen_block &&
961         pss->offset >= last_offset) {
962         /*
963          * We've been once around the RAM and haven't found anything.
964          * Give up.
965          */
966         *again = false;
967         return false;
968     }
969     if (pss->offset >= pss->block->used_length) {
970         /* Didn't find anything in this RAM Block */
971         pss->offset = 0;
972         pss->block = QLIST_NEXT_RCU(pss->block, next);
973         if (!pss->block) {
974             /* Hit the end of the list */
975             pss->block = QLIST_FIRST_RCU(&ram_list.blocks);
976             /* Flag that we've looped */
977             pss->complete_round = true;
978             ram_bulk_stage = false;
979             if (migrate_use_xbzrle()) {
980                 /* If xbzrle is on, stop using the data compression at this
981                  * point. In theory, xbzrle can do better than compression.
982                  */
983                 flush_compressed_data(f);
984                 compression_switch = false;
985             }
986         }
987         /* Didn't find anything this time, but try again on the new block */
988         *again = true;
989         return false;
990     } else {
991         /* Can go around again, but... */
992         *again = true;
993         /* We've found something so probably don't need to */
994         return true;
995     }
996 }
997 
998 /**
999  * ram_find_and_save_block: Finds a dirty page and sends it to f
1000  *
1001  * Called within an RCU critical section.
1002  *
1003  * Returns:  The number of pages written
1004  *           0 means no dirty pages
1005  *
1006  * @f: QEMUFile where to send the data
1007  * @last_stage: if we are at the completion stage
1008  * @bytes_transferred: increase it with the number of transferred bytes
1009  */
1010 
1011 static int ram_find_and_save_block(QEMUFile *f, bool last_stage,
1012                                    uint64_t *bytes_transferred)
1013 {
1014     PageSearchStatus pss;
1015     int pages = 0;
1016     bool again, found;
1017 
1018     pss.block = last_seen_block;
1019     pss.offset = last_offset;
1020     pss.complete_round = false;
1021 
1022     if (!pss.block) {
1023         pss.block = QLIST_FIRST_RCU(&ram_list.blocks);
1024     }
1025 
1026     do {
1027         found = find_dirty_block(f, &pss, &again);
1028 
1029         if (found) {
1030             if (compression_switch && migrate_use_compression()) {
1031                 pages = ram_save_compressed_page(f, pss.block, pss.offset,
1032                                                  last_stage,
1033                                                  bytes_transferred);
1034             } else {
1035                 pages = ram_save_page(f, pss.block, pss.offset, last_stage,
1036                                       bytes_transferred);
1037             }
1038 
1039             /* if page is unmodified, continue to the next */
1040             if (pages > 0) {
1041                 last_sent_block = pss.block;
1042             }
1043         }
1044     } while (!pages && again);
1045 
1046     last_seen_block = pss.block;
1047     last_offset = pss.offset;
1048 
1049     return pages;
1050 }
1051 
1052 void acct_update_position(QEMUFile *f, size_t size, bool zero)
1053 {
1054     uint64_t pages = size / TARGET_PAGE_SIZE;
1055     if (zero) {
1056         acct_info.dup_pages += pages;
1057     } else {
1058         acct_info.norm_pages += pages;
1059         bytes_transferred += size;
1060         qemu_update_position(f, size);
1061     }
1062 }
1063 
1064 static ram_addr_t ram_save_remaining(void)
1065 {
1066     return migration_dirty_pages;
1067 }
1068 
1069 uint64_t ram_bytes_remaining(void)
1070 {
1071     return ram_save_remaining() * TARGET_PAGE_SIZE;
1072 }
1073 
1074 uint64_t ram_bytes_transferred(void)
1075 {
1076     return bytes_transferred;
1077 }
1078 
1079 uint64_t ram_bytes_total(void)
1080 {
1081     RAMBlock *block;
1082     uint64_t total = 0;
1083 
1084     rcu_read_lock();
1085     QLIST_FOREACH_RCU(block, &ram_list.blocks, next)
1086         total += block->used_length;
1087     rcu_read_unlock();
1088     return total;
1089 }
1090 
1091 void free_xbzrle_decoded_buf(void)
1092 {
1093     g_free(xbzrle_decoded_buf);
1094     xbzrle_decoded_buf = NULL;
1095 }
1096 
1097 static void migration_bitmap_free(struct BitmapRcu *bmap)
1098 {
1099     g_free(bmap->bmap);
1100     g_free(bmap);
1101 }
1102 
1103 static void ram_migration_cleanup(void *opaque)
1104 {
1105     /* The caller holds the iothread lock or is in a bottom half, so there
1106      * is no writing race against this migration_bitmap.
1107      */
1108     struct BitmapRcu *bitmap = migration_bitmap_rcu;
1109     atomic_rcu_set(&migration_bitmap_rcu, NULL);
1110     if (bitmap) {
1111         memory_global_dirty_log_stop();
1112         call_rcu(bitmap, migration_bitmap_free, rcu);
1113     }
1114 
1115     XBZRLE_cache_lock();
1116     if (XBZRLE.cache) {
1117         cache_fini(XBZRLE.cache);
1118         g_free(XBZRLE.encoded_buf);
1119         g_free(XBZRLE.current_buf);
1120         XBZRLE.cache = NULL;
1121         XBZRLE.encoded_buf = NULL;
1122         XBZRLE.current_buf = NULL;
1123     }
1124     XBZRLE_cache_unlock();
1125 }
1126 
1127 static void reset_ram_globals(void)
1128 {
1129     last_seen_block = NULL;
1130     last_sent_block = NULL;
1131     last_offset = 0;
1132     last_version = ram_list.version;
1133     ram_bulk_stage = true;
1134 }
1135 
1136 #define MAX_WAIT 50 /* ms, half buffered_file limit */
1137 
1138 void migration_bitmap_extend(ram_addr_t old, ram_addr_t new)
1139 {
1140     /* Called in the QEMU main thread, so there is
1141      * no writing race against this migration_bitmap.
1142      */
1143     if (migration_bitmap_rcu) {
1144         struct BitmapRcu *old_bitmap = migration_bitmap_rcu, *bitmap;
1145         bitmap = g_new(struct BitmapRcu, 1);
1146         bitmap->bmap = bitmap_new(new);
1147 
1148         /* Prevent bits in migration_bitmap from being set concurrently
1149          * by migration_bitmap_sync_range() while the bitmap is copied.
1150          * It is safe for migration if bits are cleared concurrently;
1151          * only concurrent setting is a problem.
1152          */
1153         qemu_mutex_lock(&migration_bitmap_mutex);
1154         bitmap_copy(bitmap->bmap, old_bitmap->bmap, old);
1155         bitmap_set(bitmap->bmap, old, new - old);
1156         atomic_rcu_set(&migration_bitmap_rcu, bitmap);
1157         qemu_mutex_unlock(&migration_bitmap_mutex);
1158         migration_dirty_pages += new - old;
1159         call_rcu(old_bitmap, migration_bitmap_free, rcu);
1160     }
1161 }
1162 
1163 /* Each of ram_save_setup, ram_save_iterate and ram_save_complete has a
1164  * long-running RCU critical section.  When RCU reclaims in the code
1165  * start to become numerous it will be necessary to reduce the
1166  * granularity of these critical sections.
1167  */
1168 
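/*
 * ram_save_setup: set up RAM migration state (the XBZRLE cache if enabled,
 * the dirty bitmap sized to cover all of RAM and initially fully set, and
 * dirty logging), then emit the list of RAM blocks with their sizes,
 * terminated by RAM_SAVE_FLAG_EOS.
 */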
1169 static int ram_save_setup(QEMUFile *f, void *opaque)
1170 {
1171     RAMBlock *block;
1172     int64_t ram_bitmap_pages; /* Size of bitmap in pages, including gaps */
1173 
1174     dirty_rate_high_cnt = 0;
1175     bitmap_sync_count = 0;
1176     migration_bitmap_sync_init();
1177     qemu_mutex_init(&migration_bitmap_mutex);
1178 
1179     if (migrate_use_xbzrle()) {
1180         XBZRLE_cache_lock();
1181         XBZRLE.cache = cache_init(migrate_xbzrle_cache_size() /
1182                                   TARGET_PAGE_SIZE,
1183                                   TARGET_PAGE_SIZE);
1184         if (!XBZRLE.cache) {
1185             XBZRLE_cache_unlock();
1186             error_report("Error creating cache");
1187             return -1;
1188         }
1189         XBZRLE_cache_unlock();
1190 
1191         /* We prefer not to abort if there is no memory */
1192         XBZRLE.encoded_buf = g_try_malloc0(TARGET_PAGE_SIZE);
1193         if (!XBZRLE.encoded_buf) {
1194             error_report("Error allocating encoded_buf");
1195             return -1;
1196         }
1197 
1198         XBZRLE.current_buf = g_try_malloc(TARGET_PAGE_SIZE);
1199         if (!XBZRLE.current_buf) {
1200             error_report("Error allocating current_buf");
1201             g_free(XBZRLE.encoded_buf);
1202             XBZRLE.encoded_buf = NULL;
1203             return -1;
1204         }
1205 
1206         acct_clear();
1207     }
1208 
1209     /* iothread lock needed for ram_list.dirty_memory[] */
1210     qemu_mutex_lock_iothread();
1211     qemu_mutex_lock_ramlist();
1212     rcu_read_lock();
1213     bytes_transferred = 0;
1214     reset_ram_globals();
1215 
1216     ram_bitmap_pages = last_ram_offset() >> TARGET_PAGE_BITS;
1217     migration_bitmap_rcu = g_new(struct BitmapRcu, 1);
1218     migration_bitmap_rcu->bmap = bitmap_new(ram_bitmap_pages);
1219     bitmap_set(migration_bitmap_rcu->bmap, 0, ram_bitmap_pages);
1220 
1221     /*
1222      * Count the total number of pages used by ram blocks not including any
1223      * gaps due to alignment or unplugs.
1224      */
1225     migration_dirty_pages = ram_bytes_total() >> TARGET_PAGE_BITS;
1226 
1227     memory_global_dirty_log_start();
1228     migration_bitmap_sync();
1229     qemu_mutex_unlock_ramlist();
1230     qemu_mutex_unlock_iothread();
1231 
1232     qemu_put_be64(f, ram_bytes_total() | RAM_SAVE_FLAG_MEM_SIZE);
1233 
1234     QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
1235         qemu_put_byte(f, strlen(block->idstr));
1236         qemu_put_buffer(f, (uint8_t *)block->idstr, strlen(block->idstr));
1237         qemu_put_be64(f, block->used_length);
1238     }
1239 
1240     rcu_read_unlock();
1241 
1242     ram_control_before_iterate(f, RAM_CONTROL_SETUP);
1243     ram_control_after_iterate(f, RAM_CONTROL_SETUP);
1244 
1245     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
1246 
1247     return 0;
1248 }
1249 
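/*
 * ram_save_iterate: send dirty pages until the bandwidth limit kicks in,
 * no dirty pages are left, or roughly MAX_WAIT ms have elapsed, then flush
 * any pending compressed data and emit RAM_SAVE_FLAG_EOS.
 *
 * Returns: number of pages sent, or a negative QEMUFile error code.
 */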
1250 static int ram_save_iterate(QEMUFile *f, void *opaque)
1251 {
1252     int ret;
1253     int i;
1254     int64_t t0;
1255     int pages_sent = 0;
1256 
1257     rcu_read_lock();
1258     if (ram_list.version != last_version) {
1259         reset_ram_globals();
1260     }
1261 
1262     /* Read version before ram_list.blocks */
1263     smp_rmb();
1264 
1265     ram_control_before_iterate(f, RAM_CONTROL_ROUND);
1266 
1267     t0 = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
1268     i = 0;
1269     while ((ret = qemu_file_rate_limit(f)) == 0) {
1270         int pages;
1271 
1272         pages = ram_find_and_save_block(f, false, &bytes_transferred);
1273         /* no more pages to send */
1274         if (pages == 0) {
1275             break;
1276         }
1277         pages_sent += pages;
1278         acct_info.iterations++;
1279 
1280         /* we want to check in the 1st loop, just in case it was the 1st
1281            time and we had to sync the dirty bitmap.
1282            qemu_clock_get_ns() is a bit expensive, so we only check once
1283            every few iterations
1284         */
1285         if ((i & 63) == 0) {
1286             uint64_t t1 = (qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - t0) / 1000000;
1287             if (t1 > MAX_WAIT) {
1288                 DPRINTF("big wait: %" PRIu64 " milliseconds, %d iterations\n",
1289                         t1, i);
1290                 break;
1291             }
1292         }
1293         i++;
1294     }
1295     flush_compressed_data(f);
1296     rcu_read_unlock();
1297 
1298     /*
1299      * Must occur before EOS (or any QEMUFile operation)
1300      * because of RDMA protocol.
1301      */
1302     ram_control_after_iterate(f, RAM_CONTROL_ROUND);
1303 
1304     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
1305     bytes_transferred += 8;
1306 
1307     ret = qemu_file_get_error(f);
1308     if (ret < 0) {
1309         return ret;
1310     }
1311 
1312     return pages_sent;
1313 }
1314 
1315 /* Called with iothread lock */
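/* Final pass: sync the dirty bitmap one last time and flush every remaining
 * dirty page regardless of rate limiting, then emit RAM_SAVE_FLAG_EOS. */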
1316 static int ram_save_complete(QEMUFile *f, void *opaque)
1317 {
1318     rcu_read_lock();
1319 
1320     migration_bitmap_sync();
1321 
1322     ram_control_before_iterate(f, RAM_CONTROL_FINISH);
1323 
1324     /* try transferring iterative blocks of memory */
1325 
1326     /* flush all remaining blocks regardless of rate limiting */
1327     while (true) {
1328         int pages;
1329 
1330         pages = ram_find_and_save_block(f, true, &bytes_transferred);
1331         /* no more blocks to send */
1332         if (pages == 0) {
1333             break;
1334         }
1335     }
1336 
1337     flush_compressed_data(f);
1338     ram_control_after_iterate(f, RAM_CONTROL_FINISH);
1339 
1340     rcu_read_unlock();
1341 
1342     qemu_put_be64(f, RAM_SAVE_FLAG_EOS);
1343 
1344     return 0;
1345 }
1346 
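/* ram_save_pending: estimate the bytes still to be sent; when the estimate
 * drops below @max_size the dirty bitmap is re-synced (under the iothread
 * lock) to refresh the estimate before deciding whether to complete. */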
1347 static uint64_t ram_save_pending(QEMUFile *f, void *opaque, uint64_t max_size)
1348 {
1349     uint64_t remaining_size;
1350 
1351     remaining_size = ram_save_remaining() * TARGET_PAGE_SIZE;
1352 
1353     if (remaining_size < max_size) {
1354         qemu_mutex_lock_iothread();
1355         rcu_read_lock();
1356         migration_bitmap_sync();
1357         rcu_read_unlock();
1358         qemu_mutex_unlock_iothread();
1359         remaining_size = ram_save_remaining() * TARGET_PAGE_SIZE;
1360     }
1361     return remaining_size;
1362 }
1363 
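/* load_xbzrle: read one XBZRLE-encoded page from @f and apply it on top of
 * the existing contents of @host.  Returns 0 on success, -1 on a malformed
 * or oversized record. */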
1364 static int load_xbzrle(QEMUFile *f, ram_addr_t addr, void *host)
1365 {
1366     unsigned int xh_len;
1367     int xh_flags;
1368 
1369     if (!xbzrle_decoded_buf) {
1370         xbzrle_decoded_buf = g_malloc(TARGET_PAGE_SIZE);
1371     }
1372 
1373     /* extract RLE header */
1374     xh_flags = qemu_get_byte(f);
1375     xh_len = qemu_get_be16(f);
1376 
1377     if (xh_flags != ENCODING_FLAG_XBZRLE) {
1378         error_report("Failed to load XBZRLE page - wrong compression!");
1379         return -1;
1380     }
1381 
1382     if (xh_len > TARGET_PAGE_SIZE) {
1383         error_report("Failed to load XBZRLE page - len overflow!");
1384         return -1;
1385     }
1386     /* load data and decode */
1387     qemu_get_buffer(f, xbzrle_decoded_buf, xh_len);
1388 
1389     /* decode RLE */
1390     if (xbzrle_decode_buffer(xbzrle_decoded_buf, xh_len, host,
1391                              TARGET_PAGE_SIZE) == -1) {
1392         error_report("Failed to load XBZRLE page - decode error!");
1393         return -1;
1394     }
1395 
1396     return 0;
1397 }
1398 
1399 /* Must be called from within a rcu critical section.
1400  * Returns a pointer from within the RCU-protected ram_list.
1401  */
1402 static inline void *host_from_stream_offset(QEMUFile *f,
1403                                             ram_addr_t offset,
1404                                             int flags)
1405 {
1406     static RAMBlock *block = NULL;
1407     char id[256];
1408     uint8_t len;
1409 
1410     if (flags & RAM_SAVE_FLAG_CONTINUE) {
1411         if (!block || block->max_length <= offset) {
1412             error_report("Ack, bad migration stream!");
1413             return NULL;
1414         }
1415 
1416         return block->host + offset;
1417     }
1418 
1419     len = qemu_get_byte(f);
1420     qemu_get_buffer(f, (uint8_t *)id, len);
1421     id[len] = 0;
1422 
1423     QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
1424         if (!strncmp(id, block->idstr, sizeof(id)) &&
1425             block->max_length > offset) {
1426             return block->host + offset;
1427         }
1428     }
1429 
1430     error_report("Can't find block %s!", id);
1431     return NULL;
1432 }
1433 
1434 /*
1435  * If a page (or a whole RDMA chunk) has been
1436  * determined to be zero, then zap it.
1437  */
1438 void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
1439 {
1440     if (ch != 0 || !is_zero_range(host, size)) {
1441         memset(host, ch, size);
1442     }
1443 }
1444 
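/*
 * do_data_decompress: body of a decompression worker thread; waits for
 * start_decompression() to provide a compressed buffer and inflates it
 * directly into guest memory at param->des.
 */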
1445 static void *do_data_decompress(void *opaque)
1446 {
1447     DecompressParam *param = opaque;
1448     unsigned long pagesize;
1449 
1450     while (!quit_decomp_thread) {
1451         qemu_mutex_lock(&param->mutex);
1452         while (!param->start && !quit_decomp_thread) {
1453             qemu_cond_wait(&param->cond, &param->mutex);
1454             pagesize = TARGET_PAGE_SIZE;
1455             if (!quit_decomp_thread) {
1456                 /* uncompress() can fail in some cases, especially when the
1457                  * page was dirtied while it was being compressed.  That is
1458                  * not a problem, because the dirty page will be retransferred
1459                  * and uncompress() won't corrupt the data in other pages.
1460                  */
1461                 uncompress((Bytef *)param->des, &pagesize,
1462                            (const Bytef *)param->compbuf, param->len);
1463             }
1464             param->start = false;
1465         }
1466         qemu_mutex_unlock(&param->mutex);
1467     }
1468 
1469     return NULL;
1470 }
1471 
1472 void migrate_decompress_threads_create(void)
1473 {
1474     int i, thread_count;
1475 
1476     thread_count = migrate_decompress_threads();
1477     decompress_threads = g_new0(QemuThread, thread_count);
1478     decomp_param = g_new0(DecompressParam, thread_count);
1479     compressed_data_buf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
1480     quit_decomp_thread = false;
1481     for (i = 0; i < thread_count; i++) {
1482         qemu_mutex_init(&decomp_param[i].mutex);
1483         qemu_cond_init(&decomp_param[i].cond);
1484         decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
1485         qemu_thread_create(decompress_threads + i, "decompress",
1486                            do_data_decompress, decomp_param + i,
1487                            QEMU_THREAD_JOINABLE);
1488     }
1489 }
1490 
1491 void migrate_decompress_threads_join(void)
1492 {
1493     int i, thread_count;
1494 
1495     quit_decomp_thread = true;
1496     thread_count = migrate_decompress_threads();
1497     for (i = 0; i < thread_count; i++) {
1498         qemu_mutex_lock(&decomp_param[i].mutex);
1499         qemu_cond_signal(&decomp_param[i].cond);
1500         qemu_mutex_unlock(&decomp_param[i].mutex);
1501     }
1502     for (i = 0; i < thread_count; i++) {
1503         qemu_thread_join(decompress_threads + i);
1504         qemu_mutex_destroy(&decomp_param[i].mutex);
1505         qemu_cond_destroy(&decomp_param[i].cond);
1506         g_free(decomp_param[i].compbuf);
1507     }
1508     g_free(decompress_threads);
1509     g_free(decomp_param);
1510     g_free(compressed_data_buf);
1511     decompress_threads = NULL;
1512     decomp_param = NULL;
1513     compressed_data_buf = NULL;
1514 }
1515 
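/* decompress_data_with_multi_threads: busy-wait for an idle decompression
 * thread, copy the compressed data into its private buffer and kick it to
 * decompress into @host. */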
1516 static void decompress_data_with_multi_threads(uint8_t *compbuf,
1517                                                void *host, int len)
1518 {
1519     int idx, thread_count;
1520 
1521     thread_count = migrate_decompress_threads();
1522     while (true) {
1523         for (idx = 0; idx < thread_count; idx++) {
1524             if (!decomp_param[idx].start) {
1525                 memcpy(decomp_param[idx].compbuf, compbuf, len);
1526                 decomp_param[idx].des = host;
1527                 decomp_param[idx].len = len;
1528                 start_decompression(&decomp_param[idx]);
1529                 break;
1530             }
1531         }
1532         if (idx < thread_count) {
1533             break;
1534         }
1535     }
1536 }
1537 
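/*
 * ram_load: incoming side of RAM migration.  Reads records (see the
 * RAM_SAVE_FLAG_* definitions near the top of this file) until
 * RAM_SAVE_FLAG_EOS or an error, dispatching on the flag bits of each
 * record.
 */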
1538 static int ram_load(QEMUFile *f, void *opaque, int version_id)
1539 {
1540     int flags = 0, ret = 0;
1541     static uint64_t seq_iter;
1542     int len = 0;
1543 
1544     seq_iter++;
1545 
1546     if (version_id != 4) {
1547         ret = -EINVAL;
1548     }
1549 
1550     /* This RCU critical section can be very long running.
1551      * When RCU reclaims in the code start to become numerous,
1552      * it will be necessary to reduce the granularity of this
1553      * critical section.
1554      */
1555     rcu_read_lock();
1556     while (!ret && !(flags & RAM_SAVE_FLAG_EOS)) {
1557         ram_addr_t addr, total_ram_bytes;
1558         void *host;
1559         uint8_t ch;
1560 
1561         addr = qemu_get_be64(f);
1562         flags = addr & ~TARGET_PAGE_MASK;
1563         addr &= TARGET_PAGE_MASK;
1564 
1565         switch (flags & ~RAM_SAVE_FLAG_CONTINUE) {
1566         case RAM_SAVE_FLAG_MEM_SIZE:
1567             /* Synchronize RAM block list */
1568             total_ram_bytes = addr;
1569             while (!ret && total_ram_bytes) {
1570                 RAMBlock *block;
1571                 char id[256];
1572                 ram_addr_t length;
1573 
1574                 len = qemu_get_byte(f);
1575                 qemu_get_buffer(f, (uint8_t *)id, len);
1576                 id[len] = 0;
1577                 length = qemu_get_be64(f);
1578 
1579                 QLIST_FOREACH_RCU(block, &ram_list.blocks, next) {
1580                     if (!strncmp(id, block->idstr, sizeof(id))) {
1581                         if (length != block->used_length) {
1582                             Error *local_err = NULL;
1583 
1584                             ret = qemu_ram_resize(block->offset, length, &local_err);
1585                             if (local_err) {
1586                                 error_report_err(local_err);
1587                             }
1588                         }
1589                         ram_control_load_hook(f, RAM_CONTROL_BLOCK_REG,
1590                                               block->idstr);
1591                         break;
1592                     }
1593                 }
1594 
1595                 if (!block) {
1596                     error_report("Unknown ramblock \"%s\", cannot "
1597                                  "accept migration", id);
1598                     ret = -EINVAL;
1599                 }
1600 
1601                 total_ram_bytes -= length;
1602             }
1603             break;
1604         case RAM_SAVE_FLAG_COMPRESS:
1605             host = host_from_stream_offset(f, addr, flags);
1606             if (!host) {
1607                 error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
1608                 ret = -EINVAL;
1609                 break;
1610             }
1611             ch = qemu_get_byte(f);
1612             ram_handle_compressed(host, ch, TARGET_PAGE_SIZE);
1613             break;
1614         case RAM_SAVE_FLAG_PAGE:
1615             host = host_from_stream_offset(f, addr, flags);
1616             if (!host) {
1617                 error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
1618                 ret = -EINVAL;
1619                 break;
1620             }
1621             qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
1622             break;
1623         case RAM_SAVE_FLAG_COMPRESS_PAGE:
1624             host = host_from_stream_offset(f, addr, flags);
1625             if (!host) {
1626                 error_report("Invalid RAM offset " RAM_ADDR_FMT, addr);
1627                 ret = -EINVAL;
1628                 break;
1629             }
1630 
1631             len = qemu_get_be32(f);
1632             if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) {
1633                 error_report("Invalid compressed data length: %d", len);
1634                 ret = -EINVAL;
1635                 break;
1636             }
1637             qemu_get_buffer(f, compressed_data_buf, len);
1638             decompress_data_with_multi_threads(compressed_data_buf, host, len);
1639             break;
1640         case RAM_SAVE_FLAG_XBZRLE:
1641             host = host_from_stream_offset(f, addr, flags);
1642             if (!host) {
1643                 error_report("Illegal RAM offset " RAM_ADDR_FMT, addr);
1644                 ret = -EINVAL;
1645                 break;
1646             }
1647             if (load_xbzrle(f, addr, host) < 0) {
1648                 error_report("Failed to decompress XBZRLE page at "
1649                              RAM_ADDR_FMT, addr);
1650                 ret = -EINVAL;
1651                 break;
1652             }
1653             break;
1654         case RAM_SAVE_FLAG_EOS:
1655             /* normal exit */
1656             break;
1657         default:
1658             if (flags & RAM_SAVE_FLAG_HOOK) {
1659                 ram_control_load_hook(f, RAM_CONTROL_HOOK, NULL);
1660             } else {
1661                 error_report("Unknown combination of migration flags: %#x",
1662                              flags);
1663                 ret = -EINVAL;
1664             }
1665         }
1666         if (!ret) {
1667             ret = qemu_file_get_error(f);
1668         }
1669     }
1670 
1671     rcu_read_unlock();
1672     DPRINTF("Completed load of VM with exit code %d seq iteration "
1673             "%" PRIu64 "\n", ret, seq_iter);
1674     return ret;
1675 }
1676 
1677 static SaveVMHandlers savevm_ram_handlers = {
1678     .save_live_setup = ram_save_setup,
1679     .save_live_iterate = ram_save_iterate,
1680     .save_live_complete = ram_save_complete,
1681     .save_live_pending = ram_save_pending,
1682     .load_state = ram_load,
1683     .cleanup = ram_migration_cleanup,
1684 };
1685 
1686 void ram_mig_init(void)
1687 {
1688     qemu_mutex_init(&XBZRLE.lock);
1689     register_savevm_live(NULL, "ram", 0, 4, &savevm_ram_handlers, NULL);
1690 }
1691