xref: /openbmc/qemu/block/block-copy.c (revision 64ed6f92)
1 /*
2  * block_copy API
3  *
4  * Copyright (C) 2013 Proxmox Server Solutions
5  * Copyright (c) 2019 Virtuozzo International GmbH.
6  *
7  * Authors:
8  *  Dietmar Maurer (dietmar@proxmox.com)
9  *  Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
10  *
11  * This work is licensed under the terms of the GNU GPL, version 2 or later.
12  * See the COPYING file in the top-level directory.
13  */
14 
15 #include "qemu/osdep.h"
16 
17 #include "trace.h"
18 #include "qapi/error.h"
19 #include "block/block-copy.h"
20 #include "sysemu/block-backend.h"
21 #include "qemu/units.h"
22 #include "qemu/coroutine.h"
23 #include "block/aio_task.h"
24 
/*
 * Chunk size (raised to the cluster size if that is larger, and further
 * limited by max_transfer) used once a copy_range request has succeeded.
 */
#define BLOCK_COPY_MAX_COPY_RANGE (16 * MiB)
/*
 * Chunk size (raised to the cluster size if that is larger) used while
 * copy_range is enabled but has not succeeded yet, and after a copy_range
 * failure switched the state to buffered copying.
 */
#define BLOCK_COPY_MAX_BUFFER (1 * MiB)
/*
 * Size of the shared resource created in block_copy_state_new(); every task
 * takes task->bytes from it before it is run and returns them when done.
 */
#define BLOCK_COPY_MAX_MEM (128 * MiB)
/* Argument passed to aio_task_pool_new() in block_copy_dirty_clusters(). */
#define BLOCK_COPY_MAX_WORKERS 64

/* Forward declaration: block_copy_task_create() stores it in task.func. */
static coroutine_fn int block_copy_task_entry(AioTask *task);
31 
/*
 * Error state shared by all tasks created by one block_copy_dirty_clusters()
 * invocation. It is filled in by block_copy_task_entry().
 */
typedef struct BlockCopyCallState {
    /* Set once a task of this call has failed. */
    bool failed;
    /* For the recorded failure: true if the read side failed. */
    bool error_is_read;
} BlockCopyCallState;
36 
/*
 * One in-flight copy request covering [offset, offset + bytes).
 */
typedef struct BlockCopyTask {
    /*
     * Embedded AioTask; block_copy_task_entry() recovers the BlockCopyTask
     * from it with container_of().
     */
    AioTask task;

    BlockCopyState *s;              /* state this task belongs to */
    BlockCopyCallState *call_state; /* error state of the creating call */
    int64_t offset;                 /* start of the region, cluster aligned */
    int64_t bytes;                  /* length of the region */
    /* Set from BDRV_BLOCK_ZERO: write zeroes instead of copying data. */
    bool zeroes;
    QLIST_ENTRY(BlockCopyTask) list; /* link in BlockCopyState.tasks */
    CoQueue wait_queue; /* coroutines blocked on this task */
} BlockCopyTask;
48 
49 static int64_t task_end(BlockCopyTask *task)
50 {
51     return task->offset + task->bytes;
52 }
53 
/*
 * State of one block-copy instance, created by block_copy_state_new() and
 * released by block_copy_state_free().
 */
typedef struct BlockCopyState {
    /*
     * BdrvChild objects are not owned or managed by block-copy. They are
     * provided by block-copy user and user is responsible for appropriate
     * permissions on these children.
     */
    BdrvChild *source;
    BdrvChild *target;
    /*
     * Dirty bitmap created on the source node with cluster_size granularity.
     * Bits are cleared when a task is created for a region and set again when
     * a task fails or is shrunk.
     */
    BdrvDirtyBitmap *copy_bitmap;
    /* Sum of the bytes of all currently existing tasks. */
    int64_t in_flight_bytes;
    int64_t cluster_size;
    /* Try bdrv_co_copy_range() first; cleared after it fails once. */
    bool use_copy_range;
    /* Maximum size of the dirty area taken for one task. */
    int64_t copy_size;
    /* Size of copy_bitmap as reported by bdrv_dirty_bitmap_size(). */
    uint64_t len;
    /* All existing tasks, see block_copy_task_create(). */
    QLIST_HEAD(, BlockCopyTask) tasks;

    /* Flags passed to the write and copy_range requests on the target. */
    BdrvRequestFlags write_flags;

    /*
     * skip_unallocated:
     *
     * Used by sync=top jobs, which first scan the source node for unallocated
     * areas and clear them in the copy_bitmap.  During this process, the bitmap
     * is thus not fully initialized: It may still have bits set for areas that
     * are unallocated and should actually not be copied.
     *
     * This is indicated by skip_unallocated.
     *
     * In this case, block_copy() will query the source's allocation status,
     * skip unallocated regions, clear them in the copy_bitmap, and invoke
     * block_copy_reset_unallocated() every time it does.
     */
    bool skip_unallocated;

    /* Progress meter updated when work is done or the remaining work changes. */
    ProgressMeter *progress;
    /* progress_bytes_callback: called when some copying progress is done. */
    ProgressBytesCallbackFunc progress_bytes_callback;
    /* Opaque pointer handed back to progress_bytes_callback. */
    void *progress_opaque;

    /* Shared resource of BLOCK_COPY_MAX_MEM bytes that tasks draw from. */
    SharedResource *mem;
} BlockCopyState;
95 
96 static BlockCopyTask *find_conflicting_task(BlockCopyState *s,
97                                             int64_t offset, int64_t bytes)
98 {
99     BlockCopyTask *t;
100 
101     QLIST_FOREACH(t, &s->tasks, list) {
102         if (offset + bytes > t->offset && offset < t->offset + t->bytes) {
103             return t;
104         }
105     }
106 
107     return NULL;
108 }
109 
110 /*
111  * If there are no intersecting tasks return false. Otherwise, wait for the
112  * first found intersecting tasks to finish and return true.
113  */
114 static bool coroutine_fn block_copy_wait_one(BlockCopyState *s, int64_t offset,
115                                              int64_t bytes)
116 {
117     BlockCopyTask *task = find_conflicting_task(s, offset, bytes);
118 
119     if (!task) {
120         return false;
121     }
122 
123     qemu_co_queue_wait(&task->wait_queue, NULL);
124 
125     return true;
126 }
127 
128 /*
129  * Search for the first dirty area in offset/bytes range and create task at
130  * the beginning of it.
131  */
132 static BlockCopyTask *block_copy_task_create(BlockCopyState *s,
133                                              BlockCopyCallState *call_state,
134                                              int64_t offset, int64_t bytes)
135 {
136     BlockCopyTask *task;
137 
138     if (!bdrv_dirty_bitmap_next_dirty_area(s->copy_bitmap,
139                                            offset, offset + bytes,
140                                            s->copy_size, &offset, &bytes))
141     {
142         return NULL;
143     }
144 
145     assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
146     bytes = QEMU_ALIGN_UP(bytes, s->cluster_size);
147 
148     /* region is dirty, so no existent tasks possible in it */
149     assert(!find_conflicting_task(s, offset, bytes));
150 
151     bdrv_reset_dirty_bitmap(s->copy_bitmap, offset, bytes);
152     s->in_flight_bytes += bytes;
153 
154     task = g_new(BlockCopyTask, 1);
155     *task = (BlockCopyTask) {
156         .task.func = block_copy_task_entry,
157         .s = s,
158         .call_state = call_state,
159         .offset = offset,
160         .bytes = bytes,
161     };
162     qemu_co_queue_init(&task->wait_queue);
163     QLIST_INSERT_HEAD(&s->tasks, task, list);
164 
165     return task;
166 }
167 
168 /*
169  * block_copy_task_shrink
170  *
171  * Drop the tail of the task to be handled later. Set dirty bits back and
172  * wake up all tasks waiting for us (may be some of them are not intersecting
173  * with shrunk task)
174  */
175 static void coroutine_fn block_copy_task_shrink(BlockCopyTask *task,
176                                                 int64_t new_bytes)
177 {
178     if (new_bytes == task->bytes) {
179         return;
180     }
181 
182     assert(new_bytes > 0 && new_bytes < task->bytes);
183 
184     task->s->in_flight_bytes -= task->bytes - new_bytes;
185     bdrv_set_dirty_bitmap(task->s->copy_bitmap,
186                           task->offset + new_bytes, task->bytes - new_bytes);
187 
188     task->bytes = new_bytes;
189     qemu_co_queue_restart_all(&task->wait_queue);
190 }
191 
192 static void coroutine_fn block_copy_task_end(BlockCopyTask *task, int ret)
193 {
194     task->s->in_flight_bytes -= task->bytes;
195     if (ret < 0) {
196         bdrv_set_dirty_bitmap(task->s->copy_bitmap, task->offset, task->bytes);
197     }
198     QLIST_REMOVE(task, list);
199     qemu_co_queue_restart_all(&task->wait_queue);
200 }
201 
202 void block_copy_state_free(BlockCopyState *s)
203 {
204     if (!s) {
205         return;
206     }
207 
208     bdrv_release_dirty_bitmap(s->copy_bitmap);
209     shres_destroy(s->mem);
210     g_free(s);
211 }
212 
/*
 * Combine the max_transfer limits of @source and @target with INT_MAX using
 * MIN_NON_ZERO, i.e. a zero limit is ignored rather than treated as the
 * minimum.
 */
static uint32_t block_copy_max_transfer(BdrvChild *source, BdrvChild *target)
{
    return MIN_NON_ZERO(INT_MAX,
                        MIN_NON_ZERO(source->bs->bl.max_transfer,
                                     target->bs->bl.max_transfer));
}
219 
220 BlockCopyState *block_copy_state_new(BdrvChild *source, BdrvChild *target,
221                                      int64_t cluster_size,
222                                      BdrvRequestFlags write_flags, Error **errp)
223 {
224     BlockCopyState *s;
225     BdrvDirtyBitmap *copy_bitmap;
226 
227     copy_bitmap = bdrv_create_dirty_bitmap(source->bs, cluster_size, NULL,
228                                            errp);
229     if (!copy_bitmap) {
230         return NULL;
231     }
232     bdrv_disable_dirty_bitmap(copy_bitmap);
233 
234     s = g_new(BlockCopyState, 1);
235     *s = (BlockCopyState) {
236         .source = source,
237         .target = target,
238         .copy_bitmap = copy_bitmap,
239         .cluster_size = cluster_size,
240         .len = bdrv_dirty_bitmap_size(copy_bitmap),
241         .write_flags = write_flags,
242         .mem = shres_create(BLOCK_COPY_MAX_MEM),
243     };
244 
245     if (block_copy_max_transfer(source, target) < cluster_size) {
246         /*
247          * copy_range does not respect max_transfer. We don't want to bother
248          * with requests smaller than block-copy cluster size, so fallback to
249          * buffered copying (read and write respect max_transfer on their
250          * behalf).
251          */
252         s->use_copy_range = false;
253         s->copy_size = cluster_size;
254     } else if (write_flags & BDRV_REQ_WRITE_COMPRESSED) {
255         /* Compression supports only cluster-size writes and no copy-range. */
256         s->use_copy_range = false;
257         s->copy_size = cluster_size;
258     } else {
259         /*
260          * We enable copy-range, but keep small copy_size, until first
261          * successful copy_range (look at block_copy_do_copy).
262          */
263         s->use_copy_range = true;
264         s->copy_size = MAX(s->cluster_size, BLOCK_COPY_MAX_BUFFER);
265     }
266 
267     QLIST_INIT(&s->tasks);
268 
269     return s;
270 }
271 
272 void block_copy_set_progress_callback(
273         BlockCopyState *s,
274         ProgressBytesCallbackFunc progress_bytes_callback,
275         void *progress_opaque)
276 {
277     s->progress_bytes_callback = progress_bytes_callback;
278     s->progress_opaque = progress_opaque;
279 }
280 
/*
 * Set the progress meter that block-copy updates: work done is reported per
 * finished task, and the remaining amount is recomputed when unallocated
 * regions are skipped.
 */
void block_copy_set_progress_meter(BlockCopyState *s, ProgressMeter *pm)
{
    s->progress = pm;
}
285 
286 /*
287  * Takes ownership of @task
288  *
289  * If pool is NULL directly run the task, otherwise schedule it into the pool.
290  *
291  * Returns: task.func return code if pool is NULL
292  *          otherwise -ECANCELED if pool status is bad
293  *          otherwise 0 (successfully scheduled)
294  */
295 static coroutine_fn int block_copy_task_run(AioTaskPool *pool,
296                                             BlockCopyTask *task)
297 {
298     if (!pool) {
299         int ret = task->task.func(&task->task);
300 
301         g_free(task);
302         return ret;
303     }
304 
305     aio_task_pool_wait_slot(pool);
306     if (aio_task_pool_status(pool) < 0) {
307         co_put_to_shres(task->s->mem, task->bytes);
308         block_copy_task_end(task, -ECANCELED);
309         g_free(task);
310         return -ECANCELED;
311     }
312 
313     aio_task_pool_start_task(pool, &task->task);
314 
315     return 0;
316 }
317 
/*
 * block_copy_do_copy
 *
 * Do copy of cluster-aligned chunk. Requested region is allowed to exceed
 * s->len only to cover last cluster when s->len is not aligned to clusters.
 *
 * No synchronization here: neither the bitmap nor intersecting requests are
 * handled, only the copy itself.
 *
 * @zeroes: write zeroes to the target instead of copying data.
 * @error_is_read: set to true when reading the source failed and to false
 *                 when a write (or write-zeroes) to the target failed. It is
 *                 left untouched on success and when only copy_range failed.
 *
 * Returns 0 on success, the negative value returned by the failing request
 * otherwise.
 */
static int coroutine_fn block_copy_do_copy(BlockCopyState *s,
                                           int64_t offset, int64_t bytes,
                                           bool zeroes, bool *error_is_read)
{
    int ret;
    /* Actual amount to transfer: clamp the request at the end of the image */
    int64_t nbytes = MIN(offset + bytes, s->len) - offset;
    void *bounce_buffer = NULL;

    assert(offset >= 0 && bytes > 0 && INT64_MAX - offset >= bytes);
    assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
    assert(QEMU_IS_ALIGNED(bytes, s->cluster_size));
    assert(offset < s->len);
    assert(offset + bytes <= s->len ||
           offset + bytes == QEMU_ALIGN_UP(s->len, s->cluster_size));
    assert(nbytes < INT_MAX);

    if (zeroes) {
        /* The compression flag is masked out for write-zeroes requests */
        ret = bdrv_co_pwrite_zeroes(s->target, offset, nbytes, s->write_flags &
                                    ~BDRV_REQ_WRITE_COMPRESSED);
        if (ret < 0) {
            trace_block_copy_write_zeroes_fail(s, offset, ret);
            *error_is_read = false;
        }
        return ret;
    }

    if (s->use_copy_range) {
        ret = bdrv_co_copy_range(s->source, offset, s->target, offset, nbytes,
                                 0, s->write_flags);
        if (ret < 0) {
            /* Disable copy_range for all further requests of this state */
            trace_block_copy_copy_range_fail(s, offset, ret);
            s->use_copy_range = false;
            s->copy_size = MAX(s->cluster_size, BLOCK_COPY_MAX_BUFFER);
            /* Fallback to read+write with allocated buffer */
        } else {
            if (s->use_copy_range) {
                /*
                 * Successful copy-range. Now increase copy_size.  copy_range
                 * does not respect max_transfer (it's a TODO), so we factor
                 * that in here.
                 *
                 * Note: we double-check s->use_copy_range for the case when
                 * parallel block-copy request unsets it during previous
                 * bdrv_co_copy_range call.
                 */
                s->copy_size =
                        MIN(MAX(s->cluster_size, BLOCK_COPY_MAX_COPY_RANGE),
                            QEMU_ALIGN_DOWN(block_copy_max_transfer(s->source,
                                                                    s->target),
                                            s->cluster_size));
            }
            /* bounce_buffer is still NULL here, so "out" only returns ret */
            goto out;
        }
    }

    /*
     * In case of failed copy_range request above, we may proceed with buffered
     * request larger than BLOCK_COPY_MAX_BUFFER. Still, further requests will
     * be properly limited, so don't care too much. Moreover the most likely
     * case (copy_range is unsupported for the configuration, so the very first
     * copy_range request fails) is handled by setting large copy_size only
     * after first successful copy_range.
     */

    bounce_buffer = qemu_blockalign(s->source->bs, nbytes);

    ret = bdrv_co_pread(s->source, offset, nbytes, bounce_buffer, 0);
    if (ret < 0) {
        trace_block_copy_read_fail(s, offset, ret);
        *error_is_read = true;
        goto out;
    }

    ret = bdrv_co_pwrite(s->target, offset, nbytes, bounce_buffer,
                         s->write_flags);
    if (ret < 0) {
        trace_block_copy_write_fail(s, offset, ret);
        *error_is_read = false;
        goto out;
    }

out:
    /* Common exit: release the bounce buffer (NULL if none was allocated) */
    qemu_vfree(bounce_buffer);

    return ret;
}
414 
415 static coroutine_fn int block_copy_task_entry(AioTask *task)
416 {
417     BlockCopyTask *t = container_of(task, BlockCopyTask, task);
418     bool error_is_read = false;
419     int ret;
420 
421     ret = block_copy_do_copy(t->s, t->offset, t->bytes, t->zeroes,
422                              &error_is_read);
423     if (ret < 0 && !t->call_state->failed) {
424         t->call_state->failed = true;
425         t->call_state->error_is_read = error_is_read;
426     } else {
427         progress_work_done(t->s->progress, t->bytes);
428         t->s->progress_bytes_callback(t->bytes, t->s->progress_opaque);
429     }
430     co_put_to_shres(t->s->mem, t->bytes);
431     block_copy_task_end(t, ret);
432 
433     return ret;
434 }
435 
436 static int block_copy_block_status(BlockCopyState *s, int64_t offset,
437                                    int64_t bytes, int64_t *pnum)
438 {
439     int64_t num;
440     BlockDriverState *base;
441     int ret;
442 
443     if (s->skip_unallocated && s->source->bs->backing) {
444         base = s->source->bs->backing->bs;
445     } else {
446         base = NULL;
447     }
448 
449     ret = bdrv_block_status_above(s->source->bs, base, offset, bytes, &num,
450                                   NULL, NULL);
451     if (ret < 0 || num < s->cluster_size) {
452         /*
453          * On error or if failed to obtain large enough chunk just fallback to
454          * copy one cluster.
455          */
456         num = s->cluster_size;
457         ret = BDRV_BLOCK_ALLOCATED | BDRV_BLOCK_DATA;
458     } else if (offset + num == s->len) {
459         num = QEMU_ALIGN_UP(num, s->cluster_size);
460     } else {
461         num = QEMU_ALIGN_DOWN(num, s->cluster_size);
462     }
463 
464     *pnum = num;
465     return ret;
466 }
467 
468 /*
469  * Check if the cluster starting at offset is allocated or not.
470  * return via pnum the number of contiguous clusters sharing this allocation.
471  */
472 static int block_copy_is_cluster_allocated(BlockCopyState *s, int64_t offset,
473                                            int64_t *pnum)
474 {
475     BlockDriverState *bs = s->source->bs;
476     int64_t count, total_count = 0;
477     int64_t bytes = s->len - offset;
478     int ret;
479 
480     assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
481 
482     while (true) {
483         ret = bdrv_is_allocated(bs, offset, bytes, &count);
484         if (ret < 0) {
485             return ret;
486         }
487 
488         total_count += count;
489 
490         if (ret || count == 0) {
491             /*
492              * ret: partial segment(s) are considered allocated.
493              * otherwise: unallocated tail is treated as an entire segment.
494              */
495             *pnum = DIV_ROUND_UP(total_count, s->cluster_size);
496             return ret;
497         }
498 
499         /* Unallocated segment(s) with uncertain following segment(s) */
500         if (total_count >= s->cluster_size) {
501             *pnum = total_count / s->cluster_size;
502             return 0;
503         }
504 
505         offset += count;
506         bytes -= count;
507     }
508 }
509 
510 /*
511  * Reset bits in copy_bitmap starting at offset if they represent unallocated
512  * data in the image. May reset subsequent contiguous bits.
513  * @return 0 when the cluster at @offset was unallocated,
514  *         1 otherwise, and -ret on error.
515  */
516 int64_t block_copy_reset_unallocated(BlockCopyState *s,
517                                      int64_t offset, int64_t *count)
518 {
519     int ret;
520     int64_t clusters, bytes;
521 
522     ret = block_copy_is_cluster_allocated(s, offset, &clusters);
523     if (ret < 0) {
524         return ret;
525     }
526 
527     bytes = clusters * s->cluster_size;
528 
529     if (!ret) {
530         bdrv_reset_dirty_bitmap(s->copy_bitmap, offset, bytes);
531         progress_set_remaining(s->progress,
532                                bdrv_get_dirty_count(s->copy_bitmap) +
533                                s->in_flight_bytes);
534     }
535 
536     *count = bytes;
537     return ret;
538 }
539 
/*
 * block_copy_dirty_clusters
 *
 * Copy dirty clusters in @offset/@bytes range. Both must be cluster aligned.
 *
 * A task is created for each dirty area found. All tasks but a final one
 * that reaches the end of the range are scheduled into an AioTaskPool of
 * BLOCK_COPY_MAX_WORKERS workers; a task that ends the range while no pool
 * exists yet is run directly.
 *
 * @error_is_read: optional; on failure it receives the error_is_read value
 *                 recorded for the first failed task of this call.
 *
 * Returns 1 if dirty clusters found and successfully copied, 0 if no dirty
 * clusters found and -errno on failure.
 */
static int coroutine_fn block_copy_dirty_clusters(BlockCopyState *s,
                                                  int64_t offset, int64_t bytes,
                                                  bool *error_is_read)
{
    int ret = 0;
    bool found_dirty = false;
    int64_t end = offset + bytes;
    AioTaskPool *aio = NULL;
    /* Shared by all tasks created below: { failed, error_is_read } */
    BlockCopyCallState call_state = {false, false};

    /*
     * block_copy() user is responsible for keeping source and target in same
     * aio context
     */
    assert(bdrv_get_aio_context(s->source->bs) ==
           bdrv_get_aio_context(s->target->bs));

    assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
    assert(QEMU_IS_ALIGNED(bytes, s->cluster_size));

    /* Stop creating tasks as soon as the pool reports a failure */
    while (bytes && aio_task_pool_status(aio) == 0) {
        BlockCopyTask *task;
        int64_t status_bytes;

        task = block_copy_task_create(s, &call_state, offset, bytes);
        if (!task) {
            /* No more dirty bits in the bitmap */
            trace_block_copy_skip_range(s, offset, bytes);
            break;
        }
        if (task->offset > offset) {
            /* Clean clusters in front of the dirty area are skipped */
            trace_block_copy_skip_range(s, offset, task->offset - offset);
        }

        found_dirty = true;

        ret = block_copy_block_status(s, task->offset, task->bytes,
                                      &status_bytes);
        assert(ret >= 0); /* never fail */
        if (status_bytes < task->bytes) {
            /* Restrict the task to the part sharing one block status */
            block_copy_task_shrink(task, status_bytes);
        }
        if (s->skip_unallocated && !(ret & BDRV_BLOCK_ALLOCATED)) {
            /*
             * Unallocated region: nothing to copy. End the task as a success
             * so that its bits stay cleared, and continue after it. The task
             * is still valid until g_free() below.
             */
            block_copy_task_end(task, 0);
            progress_set_remaining(s->progress,
                                   bdrv_get_dirty_count(s->copy_bitmap) +
                                   s->in_flight_bytes);
            trace_block_copy_skip_range(s, task->offset, task->bytes);
            offset = task_end(task);
            bytes = end - offset;
            g_free(task);
            continue;
        }
        task->zeroes = ret & BDRV_BLOCK_ZERO;

        trace_block_copy_process(s, task->offset);

        /* Reserve memory for the task; returned when the task finishes */
        co_get_from_shres(s->mem, task->bytes);

        /*
         * Advance before running the task: block_copy_task_run() takes
         * ownership of it, so it must not be touched afterwards.
         */
        offset = task_end(task);
        bytes = end - offset;

        /* A pool is only needed if more of the range remains to be handled */
        if (!aio && bytes) {
            aio = aio_task_pool_new(BLOCK_COPY_MAX_WORKERS);
        }

        ret = block_copy_task_run(aio, task);
        if (ret < 0) {
            goto out;
        }
    }

out:
    if (aio) {
        aio_task_pool_wait_all(aio);

        /*
         * We are not really interested in -ECANCELED returned from
         * block_copy_task_run. If it fails, it means some task already failed
         * for real reason, let's return first failure.
         * Still, assert that we don't rewrite failure by success.
         *
         * Note: ret may be positive here because of block-status result.
         */
        assert(ret >= 0 || aio_task_pool_status(aio) < 0);
        ret = aio_task_pool_status(aio);

        aio_task_pool_free(aio);
    }
    if (error_is_read && ret < 0) {
        *error_is_read = call_state.error_is_read;
    }

    /* Non-negative ret values are replaced by the found_dirty flag */
    return ret < 0 ? ret : found_dirty;
}
642 
643 /*
644  * block_copy
645  *
646  * Copy requested region, accordingly to dirty bitmap.
647  * Collaborate with parallel block_copy requests: if they succeed it will help
648  * us. If they fail, we will retry not-copied regions. So, if we return error,
649  * it means that some I/O operation failed in context of _this_ block_copy call,
650  * not some parallel operation.
651  */
652 int coroutine_fn block_copy(BlockCopyState *s, int64_t offset, int64_t bytes,
653                             bool *error_is_read)
654 {
655     int ret;
656 
657     do {
658         ret = block_copy_dirty_clusters(s, offset, bytes, error_is_read);
659 
660         if (ret == 0) {
661             ret = block_copy_wait_one(s, offset, bytes);
662         }
663 
664         /*
665          * We retry in two cases:
666          * 1. Some progress done
667          *    Something was copied, which means that there were yield points
668          *    and some new dirty bits may have appeared (due to failed parallel
669          *    block-copy requests).
670          * 2. We have waited for some intersecting block-copy request
671          *    It may have failed and produced new dirty bits.
672          */
673     } while (ret > 0);
674 
675     return ret;
676 }
677 
/*
 * Return the copy bitmap of @s. The bitmap stays owned by @s and is released
 * by block_copy_state_free().
 */
BdrvDirtyBitmap *block_copy_dirty_bitmap(BlockCopyState *s)
{
    return s->copy_bitmap;
}
682 
/*
 * Enable or disable skip_unallocated mode: while it is set, block_copy()
 * checks the allocation status of the source and skips unallocated regions
 * instead of copying them.
 */
void block_copy_set_skip_unallocated(BlockCopyState *s, bool skip)
{
    s->skip_unallocated = skip;
}
687