xref: /openbmc/qemu/block/block-copy.c (revision 8e0ef068942e4152f0d23e76ca1f5e35dc4456f7)
1 /*
2  * block_copy API
3  *
4  * Copyright (C) 2013 Proxmox Server Solutions
5  * Copyright (c) 2019 Virtuozzo International GmbH.
6  *
7  * Authors:
8  *  Dietmar Maurer (dietmar@proxmox.com)
9  *  Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
10  *
11  * This work is licensed under the terms of the GNU GPL, version 2 or later.
12  * See the COPYING file in the top-level directory.
13  */
14 
15 #include "qemu/osdep.h"
16 
17 #include "trace.h"
18 #include "qapi/error.h"
19 #include "block/block-copy.h"
20 #include "sysemu/block-backend.h"
21 #include "qemu/units.h"
22 #include "qemu/coroutine.h"
23 #include "block/aio_task.h"
24 
25 #define BLOCK_COPY_MAX_COPY_RANGE (16 * MiB)
26 #define BLOCK_COPY_MAX_BUFFER (1 * MiB)
27 #define BLOCK_COPY_MAX_MEM (128 * MiB)
28 #define BLOCK_COPY_MAX_WORKERS 64
29 
30 static coroutine_fn int block_copy_task_entry(AioTask *task);
31 
32 typedef struct BlockCopyCallState {
33     bool failed;
34     bool error_is_read;
35 } BlockCopyCallState;
36 
37 typedef struct BlockCopyTask {
38     AioTask task;
39 
40     BlockCopyState *s;
41     BlockCopyCallState *call_state;
42     int64_t offset;
43     int64_t bytes;
44     bool zeroes;
45     QLIST_ENTRY(BlockCopyTask) list;
46     CoQueue wait_queue; /* coroutines blocked on this task */
47 } BlockCopyTask;
48 
49 static int64_t task_end(BlockCopyTask *task)
50 {
51     return task->offset + task->bytes;
52 }
53 
54 typedef struct BlockCopyState {
55     /*
56      * BdrvChild objects are not owned or managed by block-copy. They are
57      * provided by block-copy user and user is responsible for appropriate
58      * permissions on these children.
59      */
60     BdrvChild *source;
61     BdrvChild *target;
62     BdrvDirtyBitmap *copy_bitmap;
63     int64_t in_flight_bytes;
64     int64_t cluster_size;
65     bool use_copy_range;
66     int64_t copy_size;
67     uint64_t len;
68     QLIST_HEAD(, BlockCopyTask) tasks;
69 
70     BdrvRequestFlags write_flags;
71 
72     /*
73      * skip_unallocated:
74      *
75      * Used by sync=top jobs, which first scan the source node for unallocated
76      * areas and clear them in the copy_bitmap.  During this process, the bitmap
77      * is thus not fully initialized: It may still have bits set for areas that
78      * are unallocated and should actually not be copied.
79      *
80      * This is indicated by skip_unallocated.
81      *
82      * In this case, block_copy() will query the source’s allocation status,
83      * skip unallocated regions, clear them in the copy_bitmap, and invoke
84      * block_copy_reset_unallocated() every time it does.
85      */
86     bool skip_unallocated;
87 
88     ProgressMeter *progress;
89     /* progress_bytes_callback: called when some copying progress is done. */
90     ProgressBytesCallbackFunc progress_bytes_callback;
91     void *progress_opaque;
92 
93     SharedResource *mem;
94 } BlockCopyState;
95 
96 static BlockCopyTask *find_conflicting_task(BlockCopyState *s,
97                                             int64_t offset, int64_t bytes)
98 {
99     BlockCopyTask *t;
100 
101     QLIST_FOREACH(t, &s->tasks, list) {
102         if (offset + bytes > t->offset && offset < t->offset + t->bytes) {
103             return t;
104         }
105     }
106 
107     return NULL;
108 }
109 
110 /*
111  * If there are no intersecting tasks return false. Otherwise, wait for the
112  * first found intersecting tasks to finish and return true.
113  */
114 static bool coroutine_fn block_copy_wait_one(BlockCopyState *s, int64_t offset,
115                                              int64_t bytes)
116 {
117     BlockCopyTask *task = find_conflicting_task(s, offset, bytes);
118 
119     if (!task) {
120         return false;
121     }
122 
123     qemu_co_queue_wait(&task->wait_queue, NULL);
124 
125     return true;
126 }
127 
128 /*
129  * Search for the first dirty area in offset/bytes range and create task at
130  * the beginning of it.
131  */
132 static BlockCopyTask *block_copy_task_create(BlockCopyState *s,
133                                              BlockCopyCallState *call_state,
134                                              int64_t offset, int64_t bytes)
135 {
136     BlockCopyTask *task;
137 
138     if (!bdrv_dirty_bitmap_next_dirty_area(s->copy_bitmap,
139                                            offset, offset + bytes,
140                                            s->copy_size, &offset, &bytes))
141     {
142         return NULL;
143     }
144 
145     /* region is dirty, so no existent tasks possible in it */
146     assert(!find_conflicting_task(s, offset, bytes));
147 
148     bdrv_reset_dirty_bitmap(s->copy_bitmap, offset, bytes);
149     s->in_flight_bytes += bytes;
150 
151     task = g_new(BlockCopyTask, 1);
152     *task = (BlockCopyTask) {
153         .task.func = block_copy_task_entry,
154         .s = s,
155         .call_state = call_state,
156         .offset = offset,
157         .bytes = bytes,
158     };
159     qemu_co_queue_init(&task->wait_queue);
160     QLIST_INSERT_HEAD(&s->tasks, task, list);
161 
162     return task;
163 }
164 
165 /*
166  * block_copy_task_shrink
167  *
168  * Drop the tail of the task to be handled later. Set dirty bits back and
169  * wake up all tasks waiting for us (may be some of them are not intersecting
170  * with shrunk task)
171  */
172 static void coroutine_fn block_copy_task_shrink(BlockCopyTask *task,
173                                                 int64_t new_bytes)
174 {
175     if (new_bytes == task->bytes) {
176         return;
177     }
178 
179     assert(new_bytes > 0 && new_bytes < task->bytes);
180 
181     task->s->in_flight_bytes -= task->bytes - new_bytes;
182     bdrv_set_dirty_bitmap(task->s->copy_bitmap,
183                           task->offset + new_bytes, task->bytes - new_bytes);
184 
185     task->bytes = new_bytes;
186     qemu_co_queue_restart_all(&task->wait_queue);
187 }
188 
189 static void coroutine_fn block_copy_task_end(BlockCopyTask *task, int ret)
190 {
191     task->s->in_flight_bytes -= task->bytes;
192     if (ret < 0) {
193         bdrv_set_dirty_bitmap(task->s->copy_bitmap, task->offset, task->bytes);
194     }
195     QLIST_REMOVE(task, list);
196     qemu_co_queue_restart_all(&task->wait_queue);
197 }
198 
199 void block_copy_state_free(BlockCopyState *s)
200 {
201     if (!s) {
202         return;
203     }
204 
205     bdrv_release_dirty_bitmap(s->copy_bitmap);
206     shres_destroy(s->mem);
207     g_free(s);
208 }
209 
210 static uint32_t block_copy_max_transfer(BdrvChild *source, BdrvChild *target)
211 {
212     return MIN_NON_ZERO(INT_MAX,
213                         MIN_NON_ZERO(source->bs->bl.max_transfer,
214                                      target->bs->bl.max_transfer));
215 }
216 
217 BlockCopyState *block_copy_state_new(BdrvChild *source, BdrvChild *target,
218                                      int64_t cluster_size,
219                                      BdrvRequestFlags write_flags, Error **errp)
220 {
221     BlockCopyState *s;
222     BdrvDirtyBitmap *copy_bitmap;
223 
224     copy_bitmap = bdrv_create_dirty_bitmap(source->bs, cluster_size, NULL,
225                                            errp);
226     if (!copy_bitmap) {
227         return NULL;
228     }
229     bdrv_disable_dirty_bitmap(copy_bitmap);
230 
231     s = g_new(BlockCopyState, 1);
232     *s = (BlockCopyState) {
233         .source = source,
234         .target = target,
235         .copy_bitmap = copy_bitmap,
236         .cluster_size = cluster_size,
237         .len = bdrv_dirty_bitmap_size(copy_bitmap),
238         .write_flags = write_flags,
239         .mem = shres_create(BLOCK_COPY_MAX_MEM),
240     };
241 
242     if (block_copy_max_transfer(source, target) < cluster_size) {
243         /*
244          * copy_range does not respect max_transfer. We don't want to bother
245          * with requests smaller than block-copy cluster size, so fallback to
246          * buffered copying (read and write respect max_transfer on their
247          * behalf).
248          */
249         s->use_copy_range = false;
250         s->copy_size = cluster_size;
251     } else if (write_flags & BDRV_REQ_WRITE_COMPRESSED) {
252         /* Compression supports only cluster-size writes and no copy-range. */
253         s->use_copy_range = false;
254         s->copy_size = cluster_size;
255     } else {
256         /*
257          * We enable copy-range, but keep small copy_size, until first
258          * successful copy_range (look at block_copy_do_copy).
259          */
260         s->use_copy_range = true;
261         s->copy_size = MAX(s->cluster_size, BLOCK_COPY_MAX_BUFFER);
262     }
263 
264     QLIST_INIT(&s->tasks);
265 
266     return s;
267 }
268 
269 void block_copy_set_progress_callback(
270         BlockCopyState *s,
271         ProgressBytesCallbackFunc progress_bytes_callback,
272         void *progress_opaque)
273 {
274     s->progress_bytes_callback = progress_bytes_callback;
275     s->progress_opaque = progress_opaque;
276 }
277 
278 void block_copy_set_progress_meter(BlockCopyState *s, ProgressMeter *pm)
279 {
280     s->progress = pm;
281 }
282 
283 /*
284  * Takes ownership of @task
285  *
286  * If pool is NULL directly run the task, otherwise schedule it into the pool.
287  *
288  * Returns: task.func return code if pool is NULL
289  *          otherwise -ECANCELED if pool status is bad
290  *          otherwise 0 (successfully scheduled)
291  */
292 static coroutine_fn int block_copy_task_run(AioTaskPool *pool,
293                                             BlockCopyTask *task)
294 {
295     if (!pool) {
296         int ret = task->task.func(&task->task);
297 
298         g_free(task);
299         return ret;
300     }
301 
302     aio_task_pool_wait_slot(pool);
303     if (aio_task_pool_status(pool) < 0) {
304         co_put_to_shres(task->s->mem, task->bytes);
305         block_copy_task_end(task, -ECANCELED);
306         g_free(task);
307         return -ECANCELED;
308     }
309 
310     aio_task_pool_start_task(pool, &task->task);
311 
312     return 0;
313 }
314 
315 /*
316  * block_copy_do_copy
317  *
318  * Do copy of cluster-aligned chunk. Requested region is allowed to exceed
319  * s->len only to cover last cluster when s->len is not aligned to clusters.
320  *
321  * No sync here: nor bitmap neighter intersecting requests handling, only copy.
322  *
323  * Returns 0 on success.
324  */
325 static int coroutine_fn block_copy_do_copy(BlockCopyState *s,
326                                            int64_t offset, int64_t bytes,
327                                            bool zeroes, bool *error_is_read)
328 {
329     int ret;
330     int64_t nbytes = MIN(offset + bytes, s->len) - offset;
331     void *bounce_buffer = NULL;
332 
333     assert(offset >= 0 && bytes > 0 && INT64_MAX - offset >= bytes);
334     assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
335     assert(QEMU_IS_ALIGNED(bytes, s->cluster_size));
336     assert(offset < s->len);
337     assert(offset + bytes <= s->len ||
338            offset + bytes == QEMU_ALIGN_UP(s->len, s->cluster_size));
339     assert(nbytes < INT_MAX);
340 
341     if (zeroes) {
342         ret = bdrv_co_pwrite_zeroes(s->target, offset, nbytes, s->write_flags &
343                                     ~BDRV_REQ_WRITE_COMPRESSED);
344         if (ret < 0) {
345             trace_block_copy_write_zeroes_fail(s, offset, ret);
346             *error_is_read = false;
347         }
348         return ret;
349     }
350 
351     if (s->use_copy_range) {
352         ret = bdrv_co_copy_range(s->source, offset, s->target, offset, nbytes,
353                                  0, s->write_flags);
354         if (ret < 0) {
355             trace_block_copy_copy_range_fail(s, offset, ret);
356             s->use_copy_range = false;
357             s->copy_size = MAX(s->cluster_size, BLOCK_COPY_MAX_BUFFER);
358             /* Fallback to read+write with allocated buffer */
359         } else {
360             if (s->use_copy_range) {
361                 /*
362                  * Successful copy-range. Now increase copy_size.  copy_range
363                  * does not respect max_transfer (it's a TODO), so we factor
364                  * that in here.
365                  *
366                  * Note: we double-check s->use_copy_range for the case when
367                  * parallel block-copy request unsets it during previous
368                  * bdrv_co_copy_range call.
369                  */
370                 s->copy_size =
371                         MIN(MAX(s->cluster_size, BLOCK_COPY_MAX_COPY_RANGE),
372                             QEMU_ALIGN_DOWN(block_copy_max_transfer(s->source,
373                                                                     s->target),
374                                             s->cluster_size));
375             }
376             goto out;
377         }
378     }
379 
380     /*
381      * In case of failed copy_range request above, we may proceed with buffered
382      * request larger than BLOCK_COPY_MAX_BUFFER. Still, further requests will
383      * be properly limited, so don't care too much. Moreover the most likely
384      * case (copy_range is unsupported for the configuration, so the very first
385      * copy_range request fails) is handled by setting large copy_size only
386      * after first successful copy_range.
387      */
388 
389     bounce_buffer = qemu_blockalign(s->source->bs, nbytes);
390 
391     ret = bdrv_co_pread(s->source, offset, nbytes, bounce_buffer, 0);
392     if (ret < 0) {
393         trace_block_copy_read_fail(s, offset, ret);
394         *error_is_read = true;
395         goto out;
396     }
397 
398     ret = bdrv_co_pwrite(s->target, offset, nbytes, bounce_buffer,
399                          s->write_flags);
400     if (ret < 0) {
401         trace_block_copy_write_fail(s, offset, ret);
402         *error_is_read = false;
403         goto out;
404     }
405 
406 out:
407     qemu_vfree(bounce_buffer);
408 
409     return ret;
410 }
411 
412 static coroutine_fn int block_copy_task_entry(AioTask *task)
413 {
414     BlockCopyTask *t = container_of(task, BlockCopyTask, task);
415     bool error_is_read = false;
416     int ret;
417 
418     ret = block_copy_do_copy(t->s, t->offset, t->bytes, t->zeroes,
419                              &error_is_read);
420     if (ret < 0 && !t->call_state->failed) {
421         t->call_state->failed = true;
422         t->call_state->error_is_read = error_is_read;
423     } else {
424         progress_work_done(t->s->progress, t->bytes);
425         t->s->progress_bytes_callback(t->bytes, t->s->progress_opaque);
426     }
427     co_put_to_shres(t->s->mem, t->bytes);
428     block_copy_task_end(t, ret);
429 
430     return ret;
431 }
432 
433 static int block_copy_block_status(BlockCopyState *s, int64_t offset,
434                                    int64_t bytes, int64_t *pnum)
435 {
436     int64_t num;
437     BlockDriverState *base;
438     int ret;
439 
440     if (s->skip_unallocated && s->source->bs->backing) {
441         base = s->source->bs->backing->bs;
442     } else {
443         base = NULL;
444     }
445 
446     ret = bdrv_block_status_above(s->source->bs, base, offset, bytes, &num,
447                                   NULL, NULL);
448     if (ret < 0 || num < s->cluster_size) {
449         /*
450          * On error or if failed to obtain large enough chunk just fallback to
451          * copy one cluster.
452          */
453         num = s->cluster_size;
454         ret = BDRV_BLOCK_ALLOCATED | BDRV_BLOCK_DATA;
455     } else if (offset + num == s->len) {
456         num = QEMU_ALIGN_UP(num, s->cluster_size);
457     } else {
458         num = QEMU_ALIGN_DOWN(num, s->cluster_size);
459     }
460 
461     *pnum = num;
462     return ret;
463 }
464 
465 /*
466  * Check if the cluster starting at offset is allocated or not.
467  * return via pnum the number of contiguous clusters sharing this allocation.
468  */
469 static int block_copy_is_cluster_allocated(BlockCopyState *s, int64_t offset,
470                                            int64_t *pnum)
471 {
472     BlockDriverState *bs = s->source->bs;
473     int64_t count, total_count = 0;
474     int64_t bytes = s->len - offset;
475     int ret;
476 
477     assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
478 
479     while (true) {
480         ret = bdrv_is_allocated(bs, offset, bytes, &count);
481         if (ret < 0) {
482             return ret;
483         }
484 
485         total_count += count;
486 
487         if (ret || count == 0) {
488             /*
489              * ret: partial segment(s) are considered allocated.
490              * otherwise: unallocated tail is treated as an entire segment.
491              */
492             *pnum = DIV_ROUND_UP(total_count, s->cluster_size);
493             return ret;
494         }
495 
496         /* Unallocated segment(s) with uncertain following segment(s) */
497         if (total_count >= s->cluster_size) {
498             *pnum = total_count / s->cluster_size;
499             return 0;
500         }
501 
502         offset += count;
503         bytes -= count;
504     }
505 }
506 
507 /*
508  * Reset bits in copy_bitmap starting at offset if they represent unallocated
509  * data in the image. May reset subsequent contiguous bits.
510  * @return 0 when the cluster at @offset was unallocated,
511  *         1 otherwise, and -ret on error.
512  */
513 int64_t block_copy_reset_unallocated(BlockCopyState *s,
514                                      int64_t offset, int64_t *count)
515 {
516     int ret;
517     int64_t clusters, bytes;
518 
519     ret = block_copy_is_cluster_allocated(s, offset, &clusters);
520     if (ret < 0) {
521         return ret;
522     }
523 
524     bytes = clusters * s->cluster_size;
525 
526     if (!ret) {
527         bdrv_reset_dirty_bitmap(s->copy_bitmap, offset, bytes);
528         progress_set_remaining(s->progress,
529                                bdrv_get_dirty_count(s->copy_bitmap) +
530                                s->in_flight_bytes);
531     }
532 
533     *count = bytes;
534     return ret;
535 }
536 
537 /*
538  * block_copy_dirty_clusters
539  *
540  * Copy dirty clusters in @offset/@bytes range.
541  * Returns 1 if dirty clusters found and successfully copied, 0 if no dirty
542  * clusters found and -errno on failure.
543  */
544 static int coroutine_fn block_copy_dirty_clusters(BlockCopyState *s,
545                                                   int64_t offset, int64_t bytes,
546                                                   bool *error_is_read)
547 {
548     int ret = 0;
549     bool found_dirty = false;
550     int64_t end = offset + bytes;
551     AioTaskPool *aio = NULL;
552     BlockCopyCallState call_state = {false, false};
553 
554     /*
555      * block_copy() user is responsible for keeping source and target in same
556      * aio context
557      */
558     assert(bdrv_get_aio_context(s->source->bs) ==
559            bdrv_get_aio_context(s->target->bs));
560 
561     assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
562     assert(QEMU_IS_ALIGNED(bytes, s->cluster_size));
563 
564     while (bytes && aio_task_pool_status(aio) == 0) {
565         BlockCopyTask *task;
566         int64_t status_bytes;
567 
568         task = block_copy_task_create(s, &call_state, offset, bytes);
569         if (!task) {
570             /* No more dirty bits in the bitmap */
571             trace_block_copy_skip_range(s, offset, bytes);
572             break;
573         }
574         if (task->offset > offset) {
575             trace_block_copy_skip_range(s, offset, task->offset - offset);
576         }
577 
578         found_dirty = true;
579 
580         ret = block_copy_block_status(s, task->offset, task->bytes,
581                                       &status_bytes);
582         assert(ret >= 0); /* never fail */
583         if (status_bytes < task->bytes) {
584             block_copy_task_shrink(task, status_bytes);
585         }
586         if (s->skip_unallocated && !(ret & BDRV_BLOCK_ALLOCATED)) {
587             block_copy_task_end(task, 0);
588             progress_set_remaining(s->progress,
589                                    bdrv_get_dirty_count(s->copy_bitmap) +
590                                    s->in_flight_bytes);
591             trace_block_copy_skip_range(s, task->offset, task->bytes);
592             offset = task_end(task);
593             bytes = end - offset;
594             g_free(task);
595             continue;
596         }
597         task->zeroes = ret & BDRV_BLOCK_ZERO;
598 
599         trace_block_copy_process(s, task->offset);
600 
601         co_get_from_shres(s->mem, task->bytes);
602 
603         offset = task_end(task);
604         bytes = end - offset;
605 
606         if (!aio && bytes) {
607             aio = aio_task_pool_new(BLOCK_COPY_MAX_WORKERS);
608         }
609 
610         ret = block_copy_task_run(aio, task);
611         if (ret < 0) {
612             goto out;
613         }
614     }
615 
616 out:
617     if (aio) {
618         aio_task_pool_wait_all(aio);
619 
620         /*
621          * We are not really interested in -ECANCELED returned from
622          * block_copy_task_run. If it fails, it means some task already failed
623          * for real reason, let's return first failure.
624          * Still, assert that we don't rewrite failure by success.
625          *
626          * Note: ret may be positive here because of block-status result.
627          */
628         assert(ret >= 0 || aio_task_pool_status(aio) < 0);
629         ret = aio_task_pool_status(aio);
630 
631         aio_task_pool_free(aio);
632     }
633     if (error_is_read && ret < 0) {
634         *error_is_read = call_state.error_is_read;
635     }
636 
637     return ret < 0 ? ret : found_dirty;
638 }
639 
640 /*
641  * block_copy
642  *
643  * Copy requested region, accordingly to dirty bitmap.
644  * Collaborate with parallel block_copy requests: if they succeed it will help
645  * us. If they fail, we will retry not-copied regions. So, if we return error,
646  * it means that some I/O operation failed in context of _this_ block_copy call,
647  * not some parallel operation.
648  */
649 int coroutine_fn block_copy(BlockCopyState *s, int64_t offset, int64_t bytes,
650                             bool *error_is_read)
651 {
652     int ret;
653 
654     do {
655         ret = block_copy_dirty_clusters(s, offset, bytes, error_is_read);
656 
657         if (ret == 0) {
658             ret = block_copy_wait_one(s, offset, bytes);
659         }
660 
661         /*
662          * We retry in two cases:
663          * 1. Some progress done
664          *    Something was copied, which means that there were yield points
665          *    and some new dirty bits may have appeared (due to failed parallel
666          *    block-copy requests).
667          * 2. We have waited for some intersecting block-copy request
668          *    It may have failed and produced new dirty bits.
669          */
670     } while (ret > 0);
671 
672     return ret;
673 }
674 
675 BdrvDirtyBitmap *block_copy_dirty_bitmap(BlockCopyState *s)
676 {
677     return s->copy_bitmap;
678 }
679 
680 void block_copy_set_skip_unallocated(BlockCopyState *s, bool skip)
681 {
682     s->skip_unallocated = skip;
683 }
684