xref: /openbmc/qemu/block/block-copy.c (revision 623d7e3551a6fc5693c06ea938c60fe281b52e27)
1  /*
2   * block_copy API
3   *
4   * Copyright (C) 2013 Proxmox Server Solutions
5   * Copyright (c) 2019 Virtuozzo International GmbH.
6   *
7   * Authors:
8   *  Dietmar Maurer (dietmar@proxmox.com)
9   *  Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
10   *
11   * This work is licensed under the terms of the GNU GPL, version 2 or later.
12   * See the COPYING file in the top-level directory.
13   */
14  
15  #include "qemu/osdep.h"
16  
17  #include "trace.h"
18  #include "qapi/error.h"
19  #include "block/block-copy.h"
20  #include "block/block_int-io.h"
21  #include "block/dirty-bitmap.h"
22  #include "block/reqlist.h"
23  #include "sysemu/block-backend.h"
24  #include "qemu/units.h"
25  #include "qemu/co-shared-resource.h"
26  #include "qemu/coroutine.h"
27  #include "qemu/ratelimit.h"
28  #include "block/aio_task.h"
29  #include "qemu/error-report.h"
30  #include "qemu/memalign.h"
31  
32  #define BLOCK_COPY_MAX_COPY_RANGE (16 * MiB)
33  #define BLOCK_COPY_MAX_BUFFER (1 * MiB)
34  #define BLOCK_COPY_MAX_MEM (128 * MiB)
35  #define BLOCK_COPY_MAX_WORKERS 64
36  #define BLOCK_COPY_SLICE_TIME 100000000ULL /* ns */
37  #define BLOCK_COPY_CLUSTER_SIZE_DEFAULT (1 << 16)
38  
39  typedef enum {
40      COPY_READ_WRITE_CLUSTER,
41      COPY_READ_WRITE,
42      COPY_WRITE_ZEROES,
43      COPY_RANGE_SMALL,
44      COPY_RANGE_FULL
45  } BlockCopyMethod;
46  
47  static coroutine_fn int block_copy_task_entry(AioTask *task);
48  
49  typedef struct BlockCopyCallState {
50      /* Fields initialized in block_copy_async() and never changed. */
51      BlockCopyState *s;
52      int64_t offset;
53      int64_t bytes;
54      int max_workers;
55      int64_t max_chunk;
56      bool ignore_ratelimit;
57      BlockCopyAsyncCallbackFunc cb;
58      void *cb_opaque;
59      /* Coroutine where async block-copy is running */
60      Coroutine *co;
61  
62      /* Fields whose state changes throughout the execution */
63      bool finished; /* atomic */
64      QemuCoSleep sleep; /* TODO: protect API with a lock */
65      bool cancelled; /* atomic */
66      /* To reference all call states from BlockCopyState */
67      QLIST_ENTRY(BlockCopyCallState) list;
68  
69      /*
70       * Fields that report information about return values and erros.
71       * Protected by lock in BlockCopyState.
72       */
73      bool error_is_read;
74      /*
75       * @ret is set concurrently by tasks under mutex. Only set once by first
76       * failed task (and untouched if no task failed).
77       * After finishing (call_state->finished is true), it is not modified
78       * anymore and may be safely read without mutex.
79       */
80      int ret;
81  } BlockCopyCallState;
82  
83  typedef struct BlockCopyTask {
84      AioTask task;
85  
86      /*
87       * Fields initialized in block_copy_task_create()
88       * and never changed.
89       */
90      BlockCopyState *s;
91      BlockCopyCallState *call_state;
92      /*
93       * @method can also be set again in the while loop of
94       * block_copy_dirty_clusters(), but it is never accessed concurrently
95       * because the only other function that reads it is
96       * block_copy_task_entry() and it is invoked afterwards in the same
97       * iteration.
98       */
99      BlockCopyMethod method;
100  
101      /*
102       * Generally, req is protected by lock in BlockCopyState, Still req.offset
103       * is only set on task creation, so may be read concurrently after creation.
104       * req.bytes is changed at most once, and need only protecting the case of
105       * parallel read while updating @bytes value in block_copy_task_shrink().
106       */
107      BlockReq req;
108  } BlockCopyTask;
109  
110  static int64_t task_end(BlockCopyTask *task)
111  {
112      return task->req.offset + task->req.bytes;
113  }
114  
115  typedef struct BlockCopyState {
116      /*
117       * BdrvChild objects are not owned or managed by block-copy. They are
118       * provided by block-copy user and user is responsible for appropriate
119       * permissions on these children.
120       */
121      BdrvChild *source;
122      BdrvChild *target;
123  
124      /*
125       * Fields initialized in block_copy_state_new()
126       * and never changed.
127       */
128      int64_t cluster_size;
129      int64_t max_transfer;
130      uint64_t len;
131      BdrvRequestFlags write_flags;
132  
133      /*
134       * Fields whose state changes throughout the execution
135       * Protected by lock.
136       */
137      CoMutex lock;
138      int64_t in_flight_bytes;
139      BlockCopyMethod method;
140      BlockReqList reqs;
141      QLIST_HEAD(, BlockCopyCallState) calls;
142      /*
143       * skip_unallocated:
144       *
145       * Used by sync=top jobs, which first scan the source node for unallocated
146       * areas and clear them in the copy_bitmap.  During this process, the bitmap
147       * is thus not fully initialized: It may still have bits set for areas that
148       * are unallocated and should actually not be copied.
149       *
150       * This is indicated by skip_unallocated.
151       *
152       * In this case, block_copy() will query the source’s allocation status,
153       * skip unallocated regions, clear them in the copy_bitmap, and invoke
154       * block_copy_reset_unallocated() every time it does.
155       */
156      bool skip_unallocated; /* atomic */
157      /* State fields that use a thread-safe API */
158      BdrvDirtyBitmap *copy_bitmap;
159      ProgressMeter *progress;
160      SharedResource *mem;
161      RateLimit rate_limit;
162  } BlockCopyState;
163  
164  /* Called with lock held */
165  static int64_t block_copy_chunk_size(BlockCopyState *s)
166  {
167      switch (s->method) {
168      case COPY_READ_WRITE_CLUSTER:
169          return s->cluster_size;
170      case COPY_READ_WRITE:
171      case COPY_RANGE_SMALL:
172          return MIN(MAX(s->cluster_size, BLOCK_COPY_MAX_BUFFER),
173                     s->max_transfer);
174      case COPY_RANGE_FULL:
175          return MIN(MAX(s->cluster_size, BLOCK_COPY_MAX_COPY_RANGE),
176                     s->max_transfer);
177      default:
178          /* Cannot have COPY_WRITE_ZEROES here.  */
179          abort();
180      }
181  }
182  
183  /*
184   * Search for the first dirty area in offset/bytes range and create task at
185   * the beginning of it.
186   */
187  static coroutine_fn BlockCopyTask *
188  block_copy_task_create(BlockCopyState *s, BlockCopyCallState *call_state,
189                         int64_t offset, int64_t bytes)
190  {
191      BlockCopyTask *task;
192      int64_t max_chunk;
193  
194      QEMU_LOCK_GUARD(&s->lock);
195      max_chunk = MIN_NON_ZERO(block_copy_chunk_size(s), call_state->max_chunk);
196      if (!bdrv_dirty_bitmap_next_dirty_area(s->copy_bitmap,
197                                             offset, offset + bytes,
198                                             max_chunk, &offset, &bytes))
199      {
200          return NULL;
201      }
202  
203      assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
204      bytes = QEMU_ALIGN_UP(bytes, s->cluster_size);
205  
206      /* region is dirty, so no existent tasks possible in it */
207      assert(!reqlist_find_conflict(&s->reqs, offset, bytes));
208  
209      bdrv_reset_dirty_bitmap(s->copy_bitmap, offset, bytes);
210      s->in_flight_bytes += bytes;
211  
212      task = g_new(BlockCopyTask, 1);
213      *task = (BlockCopyTask) {
214          .task.func = block_copy_task_entry,
215          .s = s,
216          .call_state = call_state,
217          .method = s->method,
218      };
219      reqlist_init_req(&s->reqs, &task->req, offset, bytes);
220  
221      return task;
222  }
223  
224  /*
225   * block_copy_task_shrink
226   *
227   * Drop the tail of the task to be handled later. Set dirty bits back and
228   * wake up all tasks waiting for us (may be some of them are not intersecting
229   * with shrunk task)
230   */
231  static void coroutine_fn block_copy_task_shrink(BlockCopyTask *task,
232                                                  int64_t new_bytes)
233  {
234      QEMU_LOCK_GUARD(&task->s->lock);
235      if (new_bytes == task->req.bytes) {
236          return;
237      }
238  
239      assert(new_bytes > 0 && new_bytes < task->req.bytes);
240  
241      task->s->in_flight_bytes -= task->req.bytes - new_bytes;
242      bdrv_set_dirty_bitmap(task->s->copy_bitmap,
243                            task->req.offset + new_bytes,
244                            task->req.bytes - new_bytes);
245  
246      reqlist_shrink_req(&task->req, new_bytes);
247  }
248  
249  static void coroutine_fn block_copy_task_end(BlockCopyTask *task, int ret)
250  {
251      QEMU_LOCK_GUARD(&task->s->lock);
252      task->s->in_flight_bytes -= task->req.bytes;
253      if (ret < 0) {
254          bdrv_set_dirty_bitmap(task->s->copy_bitmap, task->req.offset,
255                                task->req.bytes);
256      }
257      if (task->s->progress) {
258          progress_set_remaining(task->s->progress,
259                                 bdrv_get_dirty_count(task->s->copy_bitmap) +
260                                 task->s->in_flight_bytes);
261      }
262      reqlist_remove_req(&task->req);
263  }
264  
265  void block_copy_state_free(BlockCopyState *s)
266  {
267      if (!s) {
268          return;
269      }
270  
271      ratelimit_destroy(&s->rate_limit);
272      bdrv_release_dirty_bitmap(s->copy_bitmap);
273      shres_destroy(s->mem);
274      g_free(s);
275  }
276  
277  static uint32_t block_copy_max_transfer(BdrvChild *source, BdrvChild *target)
278  {
279      return MIN_NON_ZERO(INT_MAX,
280                          MIN_NON_ZERO(source->bs->bl.max_transfer,
281                                       target->bs->bl.max_transfer));
282  }
283  
284  void block_copy_set_copy_opts(BlockCopyState *s, bool use_copy_range,
285                                bool compress)
286  {
287      /* Keep BDRV_REQ_SERIALISING set (or not set) in block_copy_state_new() */
288      s->write_flags = (s->write_flags & BDRV_REQ_SERIALISING) |
289          (compress ? BDRV_REQ_WRITE_COMPRESSED : 0);
290  
291      if (s->max_transfer < s->cluster_size) {
292          /*
293           * copy_range does not respect max_transfer. We don't want to bother
294           * with requests smaller than block-copy cluster size, so fallback to
295           * buffered copying (read and write respect max_transfer on their
296           * behalf).
297           */
298          s->method = COPY_READ_WRITE_CLUSTER;
299      } else if (compress) {
300          /* Compression supports only cluster-size writes and no copy-range. */
301          s->method = COPY_READ_WRITE_CLUSTER;
302      } else {
303          /*
304           * If copy range enabled, start with COPY_RANGE_SMALL, until first
305           * successful copy_range (look at block_copy_do_copy).
306           */
307          s->method = use_copy_range ? COPY_RANGE_SMALL : COPY_READ_WRITE;
308      }
309  }
310  
311  static int64_t block_copy_calculate_cluster_size(BlockDriverState *target,
312                                                   Error **errp)
313  {
314      int ret;
315      BlockDriverInfo bdi;
316      bool target_does_cow = bdrv_backing_chain_next(target);
317  
318      /*
319       * If there is no backing file on the target, we cannot rely on COW if our
320       * backup cluster size is smaller than the target cluster size. Even for
321       * targets with a backing file, try to avoid COW if possible.
322       */
323      ret = bdrv_get_info(target, &bdi);
324      if (ret == -ENOTSUP && !target_does_cow) {
325          /* Cluster size is not defined */
326          warn_report("The target block device doesn't provide "
327                      "information about the block size and it doesn't have a "
328                      "backing file. The default block size of %u bytes is "
329                      "used. If the actual block size of the target exceeds "
330                      "this default, the backup may be unusable",
331                      BLOCK_COPY_CLUSTER_SIZE_DEFAULT);
332          return BLOCK_COPY_CLUSTER_SIZE_DEFAULT;
333      } else if (ret < 0 && !target_does_cow) {
334          error_setg_errno(errp, -ret,
335              "Couldn't determine the cluster size of the target image, "
336              "which has no backing file");
337          error_append_hint(errp,
338              "Aborting, since this may create an unusable destination image\n");
339          return ret;
340      } else if (ret < 0 && target_does_cow) {
341          /* Not fatal; just trudge on ahead. */
342          return BLOCK_COPY_CLUSTER_SIZE_DEFAULT;
343      }
344  
345      return MAX(BLOCK_COPY_CLUSTER_SIZE_DEFAULT, bdi.cluster_size);
346  }
347  
348  BlockCopyState *block_copy_state_new(BdrvChild *source, BdrvChild *target,
349                                       const BdrvDirtyBitmap *bitmap,
350                                       Error **errp)
351  {
352      ERRP_GUARD();
353      BlockCopyState *s;
354      int64_t cluster_size;
355      BdrvDirtyBitmap *copy_bitmap;
356      bool is_fleecing;
357  
358      cluster_size = block_copy_calculate_cluster_size(target->bs, errp);
359      if (cluster_size < 0) {
360          return NULL;
361      }
362  
363      copy_bitmap = bdrv_create_dirty_bitmap(source->bs, cluster_size, NULL,
364                                             errp);
365      if (!copy_bitmap) {
366          return NULL;
367      }
368      bdrv_disable_dirty_bitmap(copy_bitmap);
369      if (bitmap) {
370          if (!bdrv_merge_dirty_bitmap(copy_bitmap, bitmap, NULL, errp)) {
371              error_prepend(errp, "Failed to merge bitmap '%s' to internal "
372                            "copy-bitmap: ", bdrv_dirty_bitmap_name(bitmap));
373              bdrv_release_dirty_bitmap(copy_bitmap);
374              return NULL;
375          }
376      } else {
377          bdrv_set_dirty_bitmap(copy_bitmap, 0,
378                                bdrv_dirty_bitmap_size(copy_bitmap));
379      }
380  
381      /*
382       * If source is in backing chain of target assume that target is going to be
383       * used for "image fleecing", i.e. it should represent a kind of snapshot of
384       * source at backup-start point in time. And target is going to be read by
385       * somebody (for example, used as NBD export) during backup job.
386       *
387       * In this case, we need to add BDRV_REQ_SERIALISING write flag to avoid
388       * intersection of backup writes and third party reads from target,
389       * otherwise reading from target we may occasionally read already updated by
390       * guest data.
391       *
392       * For more information see commit f8d59dfb40bb and test
393       * tests/qemu-iotests/222
394       */
395      is_fleecing = bdrv_chain_contains(target->bs, source->bs);
396  
397      s = g_new(BlockCopyState, 1);
398      *s = (BlockCopyState) {
399          .source = source,
400          .target = target,
401          .copy_bitmap = copy_bitmap,
402          .cluster_size = cluster_size,
403          .len = bdrv_dirty_bitmap_size(copy_bitmap),
404          .write_flags = (is_fleecing ? BDRV_REQ_SERIALISING : 0),
405          .mem = shres_create(BLOCK_COPY_MAX_MEM),
406          .max_transfer = QEMU_ALIGN_DOWN(
407                                      block_copy_max_transfer(source, target),
408                                      cluster_size),
409      };
410  
411      block_copy_set_copy_opts(s, false, false);
412  
413      ratelimit_init(&s->rate_limit);
414      qemu_co_mutex_init(&s->lock);
415      QLIST_INIT(&s->reqs);
416      QLIST_INIT(&s->calls);
417  
418      return s;
419  }
420  
421  /* Only set before running the job, no need for locking. */
422  void block_copy_set_progress_meter(BlockCopyState *s, ProgressMeter *pm)
423  {
424      s->progress = pm;
425  }
426  
427  /*
428   * Takes ownership of @task
429   *
430   * If pool is NULL directly run the task, otherwise schedule it into the pool.
431   *
432   * Returns: task.func return code if pool is NULL
433   *          otherwise -ECANCELED if pool status is bad
434   *          otherwise 0 (successfully scheduled)
435   */
436  static coroutine_fn int block_copy_task_run(AioTaskPool *pool,
437                                              BlockCopyTask *task)
438  {
439      if (!pool) {
440          int ret = task->task.func(&task->task);
441  
442          g_free(task);
443          return ret;
444      }
445  
446      aio_task_pool_wait_slot(pool);
447      if (aio_task_pool_status(pool) < 0) {
448          co_put_to_shres(task->s->mem, task->req.bytes);
449          block_copy_task_end(task, -ECANCELED);
450          g_free(task);
451          return -ECANCELED;
452      }
453  
454      aio_task_pool_start_task(pool, &task->task);
455  
456      return 0;
457  }
458  
459  /*
460   * block_copy_do_copy
461   *
462   * Do copy of cluster-aligned chunk. Requested region is allowed to exceed
463   * s->len only to cover last cluster when s->len is not aligned to clusters.
464   *
465   * No sync here: nor bitmap neighter intersecting requests handling, only copy.
466   *
467   * @method is an in-out argument, so that copy_range can be either extended to
468   * a full-size buffer or disabled if the copy_range attempt fails.  The output
469   * value of @method should be used for subsequent tasks.
470   * Returns 0 on success.
471   */
472  static int coroutine_fn GRAPH_RDLOCK
473  block_copy_do_copy(BlockCopyState *s, int64_t offset, int64_t bytes,
474                     BlockCopyMethod *method, bool *error_is_read)
475  {
476      int ret;
477      int64_t nbytes = MIN(offset + bytes, s->len) - offset;
478      void *bounce_buffer = NULL;
479  
480      assert(offset >= 0 && bytes > 0 && INT64_MAX - offset >= bytes);
481      assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
482      assert(QEMU_IS_ALIGNED(bytes, s->cluster_size));
483      assert(offset < s->len);
484      assert(offset + bytes <= s->len ||
485             offset + bytes == QEMU_ALIGN_UP(s->len, s->cluster_size));
486      assert(nbytes < INT_MAX);
487  
488      switch (*method) {
489      case COPY_WRITE_ZEROES:
490          ret = bdrv_co_pwrite_zeroes(s->target, offset, nbytes, s->write_flags &
491                                      ~BDRV_REQ_WRITE_COMPRESSED);
492          if (ret < 0) {
493              trace_block_copy_write_zeroes_fail(s, offset, ret);
494              *error_is_read = false;
495          }
496          return ret;
497  
498      case COPY_RANGE_SMALL:
499      case COPY_RANGE_FULL:
500          ret = bdrv_co_copy_range(s->source, offset, s->target, offset, nbytes,
501                                   0, s->write_flags);
502          if (ret >= 0) {
503              /* Successful copy-range, increase chunk size.  */
504              *method = COPY_RANGE_FULL;
505              return 0;
506          }
507  
508          trace_block_copy_copy_range_fail(s, offset, ret);
509          *method = COPY_READ_WRITE;
510          /* Fall through to read+write with allocated buffer */
511  
512      case COPY_READ_WRITE_CLUSTER:
513      case COPY_READ_WRITE:
514          /*
515           * In case of failed copy_range request above, we may proceed with
516           * buffered request larger than BLOCK_COPY_MAX_BUFFER.
517           * Still, further requests will be properly limited, so don't care too
518           * much. Moreover the most likely case (copy_range is unsupported for
519           * the configuration, so the very first copy_range request fails)
520           * is handled by setting large copy_size only after first successful
521           * copy_range.
522           */
523  
524          bounce_buffer = qemu_blockalign(s->source->bs, nbytes);
525  
526          ret = bdrv_co_pread(s->source, offset, nbytes, bounce_buffer, 0);
527          if (ret < 0) {
528              trace_block_copy_read_fail(s, offset, ret);
529              *error_is_read = true;
530              goto out;
531          }
532  
533          ret = bdrv_co_pwrite(s->target, offset, nbytes, bounce_buffer,
534                               s->write_flags);
535          if (ret < 0) {
536              trace_block_copy_write_fail(s, offset, ret);
537              *error_is_read = false;
538              goto out;
539          }
540  
541      out:
542          qemu_vfree(bounce_buffer);
543          break;
544  
545      default:
546          abort();
547      }
548  
549      return ret;
550  }
551  
552  static coroutine_fn int block_copy_task_entry(AioTask *task)
553  {
554      BlockCopyTask *t = container_of(task, BlockCopyTask, task);
555      BlockCopyState *s = t->s;
556      bool error_is_read = false;
557      BlockCopyMethod method = t->method;
558      int ret;
559  
560      WITH_GRAPH_RDLOCK_GUARD() {
561          ret = block_copy_do_copy(s, t->req.offset, t->req.bytes, &method,
562                                   &error_is_read);
563      }
564  
565      WITH_QEMU_LOCK_GUARD(&s->lock) {
566          if (s->method == t->method) {
567              s->method = method;
568          }
569  
570          if (ret < 0) {
571              if (!t->call_state->ret) {
572                  t->call_state->ret = ret;
573                  t->call_state->error_is_read = error_is_read;
574              }
575          } else if (s->progress) {
576              progress_work_done(s->progress, t->req.bytes);
577          }
578      }
579      co_put_to_shres(s->mem, t->req.bytes);
580      block_copy_task_end(t, ret);
581  
582      return ret;
583  }
584  
585  static coroutine_fn GRAPH_RDLOCK
586  int block_copy_block_status(BlockCopyState *s, int64_t offset, int64_t bytes,
587                              int64_t *pnum)
588  {
589      int64_t num;
590      BlockDriverState *base;
591      int ret;
592  
593      if (qatomic_read(&s->skip_unallocated)) {
594          base = bdrv_backing_chain_next(s->source->bs);
595      } else {
596          base = NULL;
597      }
598  
599      ret = bdrv_co_block_status_above(s->source->bs, base, offset, bytes, &num,
600                                       NULL, NULL);
601      if (ret < 0 || num < s->cluster_size) {
602          /*
603           * On error or if failed to obtain large enough chunk just fallback to
604           * copy one cluster.
605           */
606          num = s->cluster_size;
607          ret = BDRV_BLOCK_ALLOCATED | BDRV_BLOCK_DATA;
608      } else if (offset + num == s->len) {
609          num = QEMU_ALIGN_UP(num, s->cluster_size);
610      } else {
611          num = QEMU_ALIGN_DOWN(num, s->cluster_size);
612      }
613  
614      *pnum = num;
615      return ret;
616  }
617  
618  /*
619   * Check if the cluster starting at offset is allocated or not.
620   * return via pnum the number of contiguous clusters sharing this allocation.
621   */
622  static int coroutine_fn GRAPH_RDLOCK
623  block_copy_is_cluster_allocated(BlockCopyState *s, int64_t offset,
624                                  int64_t *pnum)
625  {
626      BlockDriverState *bs = s->source->bs;
627      int64_t count, total_count = 0;
628      int64_t bytes = s->len - offset;
629      int ret;
630  
631      assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
632  
633      while (true) {
634          /* protected in backup_run() */
635          ret = bdrv_co_is_allocated(bs, offset, bytes, &count);
636          if (ret < 0) {
637              return ret;
638          }
639  
640          total_count += count;
641  
642          if (ret || count == 0) {
643              /*
644               * ret: partial segment(s) are considered allocated.
645               * otherwise: unallocated tail is treated as an entire segment.
646               */
647              *pnum = DIV_ROUND_UP(total_count, s->cluster_size);
648              return ret;
649          }
650  
651          /* Unallocated segment(s) with uncertain following segment(s) */
652          if (total_count >= s->cluster_size) {
653              *pnum = total_count / s->cluster_size;
654              return 0;
655          }
656  
657          offset += count;
658          bytes -= count;
659      }
660  }
661  
662  void block_copy_reset(BlockCopyState *s, int64_t offset, int64_t bytes)
663  {
664      QEMU_LOCK_GUARD(&s->lock);
665  
666      bdrv_reset_dirty_bitmap(s->copy_bitmap, offset, bytes);
667      if (s->progress) {
668          progress_set_remaining(s->progress,
669                                 bdrv_get_dirty_count(s->copy_bitmap) +
670                                 s->in_flight_bytes);
671      }
672  }
673  
674  /*
675   * Reset bits in copy_bitmap starting at offset if they represent unallocated
676   * data in the image. May reset subsequent contiguous bits.
677   * @return 0 when the cluster at @offset was unallocated,
678   *         1 otherwise, and -ret on error.
679   */
680  int64_t coroutine_fn block_copy_reset_unallocated(BlockCopyState *s,
681                                                    int64_t offset,
682                                                    int64_t *count)
683  {
684      int ret;
685      int64_t clusters, bytes;
686  
687      ret = block_copy_is_cluster_allocated(s, offset, &clusters);
688      if (ret < 0) {
689          return ret;
690      }
691  
692      bytes = clusters * s->cluster_size;
693  
694      if (!ret) {
695          block_copy_reset(s, offset, bytes);
696      }
697  
698      *count = bytes;
699      return ret;
700  }
701  
702  /*
703   * block_copy_dirty_clusters
704   *
705   * Copy dirty clusters in @offset/@bytes range.
706   * Returns 1 if dirty clusters found and successfully copied, 0 if no dirty
707   * clusters found and -errno on failure.
708   */
709  static int coroutine_fn GRAPH_RDLOCK
710  block_copy_dirty_clusters(BlockCopyCallState *call_state)
711  {
712      BlockCopyState *s = call_state->s;
713      int64_t offset = call_state->offset;
714      int64_t bytes = call_state->bytes;
715  
716      int ret = 0;
717      bool found_dirty = false;
718      int64_t end = offset + bytes;
719      AioTaskPool *aio = NULL;
720  
721      /*
722       * block_copy() user is responsible for keeping source and target in same
723       * aio context
724       */
725      assert(bdrv_get_aio_context(s->source->bs) ==
726             bdrv_get_aio_context(s->target->bs));
727  
728      assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
729      assert(QEMU_IS_ALIGNED(bytes, s->cluster_size));
730  
731      while (bytes && aio_task_pool_status(aio) == 0 &&
732             !qatomic_read(&call_state->cancelled)) {
733          BlockCopyTask *task;
734          int64_t status_bytes;
735  
736          task = block_copy_task_create(s, call_state, offset, bytes);
737          if (!task) {
738              /* No more dirty bits in the bitmap */
739              trace_block_copy_skip_range(s, offset, bytes);
740              break;
741          }
742          if (task->req.offset > offset) {
743              trace_block_copy_skip_range(s, offset, task->req.offset - offset);
744          }
745  
746          found_dirty = true;
747  
748          ret = block_copy_block_status(s, task->req.offset, task->req.bytes,
749                                        &status_bytes);
750          assert(ret >= 0); /* never fail */
751          if (status_bytes < task->req.bytes) {
752              block_copy_task_shrink(task, status_bytes);
753          }
754          if (qatomic_read(&s->skip_unallocated) &&
755              !(ret & BDRV_BLOCK_ALLOCATED)) {
756              block_copy_task_end(task, 0);
757              trace_block_copy_skip_range(s, task->req.offset, task->req.bytes);
758              offset = task_end(task);
759              bytes = end - offset;
760              g_free(task);
761              continue;
762          }
763          if (ret & BDRV_BLOCK_ZERO) {
764              task->method = COPY_WRITE_ZEROES;
765          }
766  
767          if (!call_state->ignore_ratelimit) {
768              uint64_t ns = ratelimit_calculate_delay(&s->rate_limit, 0);
769              if (ns > 0) {
770                  block_copy_task_end(task, -EAGAIN);
771                  g_free(task);
772                  qemu_co_sleep_ns_wakeable(&call_state->sleep,
773                                            QEMU_CLOCK_REALTIME, ns);
774                  continue;
775              }
776          }
777  
778          ratelimit_calculate_delay(&s->rate_limit, task->req.bytes);
779  
780          trace_block_copy_process(s, task->req.offset);
781  
782          co_get_from_shres(s->mem, task->req.bytes);
783  
784          offset = task_end(task);
785          bytes = end - offset;
786  
787          if (!aio && bytes) {
788              aio = aio_task_pool_new(call_state->max_workers);
789          }
790  
791          ret = block_copy_task_run(aio, task);
792          if (ret < 0) {
793              goto out;
794          }
795      }
796  
797  out:
798      if (aio) {
799          aio_task_pool_wait_all(aio);
800  
801          /*
802           * We are not really interested in -ECANCELED returned from
803           * block_copy_task_run. If it fails, it means some task already failed
804           * for real reason, let's return first failure.
805           * Still, assert that we don't rewrite failure by success.
806           *
807           * Note: ret may be positive here because of block-status result.
808           */
809          assert(ret >= 0 || aio_task_pool_status(aio) < 0);
810          ret = aio_task_pool_status(aio);
811  
812          aio_task_pool_free(aio);
813      }
814  
815      return ret < 0 ? ret : found_dirty;
816  }
817  
818  void block_copy_kick(BlockCopyCallState *call_state)
819  {
820      qemu_co_sleep_wake(&call_state->sleep);
821  }
822  
823  /*
824   * block_copy_common
825   *
826   * Copy requested region, accordingly to dirty bitmap.
827   * Collaborate with parallel block_copy requests: if they succeed it will help
828   * us. If they fail, we will retry not-copied regions. So, if we return error,
829   * it means that some I/O operation failed in context of _this_ block_copy call,
830   * not some parallel operation.
831   */
832  static int coroutine_fn GRAPH_RDLOCK
833  block_copy_common(BlockCopyCallState *call_state)
834  {
835      int ret;
836      BlockCopyState *s = call_state->s;
837  
838      qemu_co_mutex_lock(&s->lock);
839      QLIST_INSERT_HEAD(&s->calls, call_state, list);
840      qemu_co_mutex_unlock(&s->lock);
841  
842      do {
843          ret = block_copy_dirty_clusters(call_state);
844  
845          if (ret == 0 && !qatomic_read(&call_state->cancelled)) {
846              WITH_QEMU_LOCK_GUARD(&s->lock) {
847                  /*
848                   * Check that there is no task we still need to
849                   * wait to complete
850                   */
851                  ret = reqlist_wait_one(&s->reqs, call_state->offset,
852                                         call_state->bytes, &s->lock);
853                  if (ret == 0) {
854                      /*
855                       * No pending tasks, but check again the bitmap in this
856                       * same critical section, since a task might have failed
857                       * between this and the critical section in
858                       * block_copy_dirty_clusters().
859                       *
860                       * reqlist_wait_one return value 0 also means that it
861                       * didn't release the lock. So, we are still in the same
862                       * critical section, not interrupted by any concurrent
863                       * access to state.
864                       */
865                      ret = bdrv_dirty_bitmap_next_dirty(s->copy_bitmap,
866                                                         call_state->offset,
867                                                         call_state->bytes) >= 0;
868                  }
869              }
870          }
871  
872          /*
873           * We retry in two cases:
874           * 1. Some progress done
875           *    Something was copied, which means that there were yield points
876           *    and some new dirty bits may have appeared (due to failed parallel
877           *    block-copy requests).
878           * 2. We have waited for some intersecting block-copy request
879           *    It may have failed and produced new dirty bits.
880           */
881      } while (ret > 0 && !qatomic_read(&call_state->cancelled));
882  
883      qatomic_store_release(&call_state->finished, true);
884  
885      if (call_state->cb) {
886          call_state->cb(call_state->cb_opaque);
887      }
888  
889      qemu_co_mutex_lock(&s->lock);
890      QLIST_REMOVE(call_state, list);
891      qemu_co_mutex_unlock(&s->lock);
892  
893      return ret;
894  }
895  
896  static void coroutine_fn block_copy_async_co_entry(void *opaque)
897  {
898      GRAPH_RDLOCK_GUARD();
899      block_copy_common(opaque);
900  }
901  
902  int coroutine_fn block_copy(BlockCopyState *s, int64_t start, int64_t bytes,
903                              bool ignore_ratelimit, uint64_t timeout_ns,
904                              BlockCopyAsyncCallbackFunc cb,
905                              void *cb_opaque)
906  {
907      int ret;
908      BlockCopyCallState *call_state = g_new(BlockCopyCallState, 1);
909  
910      *call_state = (BlockCopyCallState) {
911          .s = s,
912          .offset = start,
913          .bytes = bytes,
914          .ignore_ratelimit = ignore_ratelimit,
915          .max_workers = BLOCK_COPY_MAX_WORKERS,
916          .cb = cb,
917          .cb_opaque = cb_opaque,
918      };
919  
920      ret = qemu_co_timeout(block_copy_async_co_entry, call_state, timeout_ns,
921                            g_free);
922      if (ret < 0) {
923          assert(ret == -ETIMEDOUT);
924          block_copy_call_cancel(call_state);
925          /* call_state will be freed by running coroutine. */
926          return ret;
927      }
928  
929      ret = call_state->ret;
930      g_free(call_state);
931  
932      return ret;
933  }
934  
935  BlockCopyCallState *block_copy_async(BlockCopyState *s,
936                                       int64_t offset, int64_t bytes,
937                                       int max_workers, int64_t max_chunk,
938                                       BlockCopyAsyncCallbackFunc cb,
939                                       void *cb_opaque)
940  {
941      BlockCopyCallState *call_state = g_new(BlockCopyCallState, 1);
942  
943      *call_state = (BlockCopyCallState) {
944          .s = s,
945          .offset = offset,
946          .bytes = bytes,
947          .max_workers = max_workers,
948          .max_chunk = max_chunk,
949          .cb = cb,
950          .cb_opaque = cb_opaque,
951  
952          .co = qemu_coroutine_create(block_copy_async_co_entry, call_state),
953      };
954  
955      qemu_coroutine_enter(call_state->co);
956  
957      return call_state;
958  }
959  
960  void block_copy_call_free(BlockCopyCallState *call_state)
961  {
962      if (!call_state) {
963          return;
964      }
965  
966      assert(qatomic_read(&call_state->finished));
967      g_free(call_state);
968  }
969  
970  bool block_copy_call_finished(BlockCopyCallState *call_state)
971  {
972      return qatomic_read(&call_state->finished);
973  }
974  
975  bool block_copy_call_succeeded(BlockCopyCallState *call_state)
976  {
977      return qatomic_load_acquire(&call_state->finished) &&
978             !qatomic_read(&call_state->cancelled) &&
979             call_state->ret == 0;
980  }
981  
982  bool block_copy_call_failed(BlockCopyCallState *call_state)
983  {
984      return qatomic_load_acquire(&call_state->finished) &&
985             !qatomic_read(&call_state->cancelled) &&
986             call_state->ret < 0;
987  }
988  
989  bool block_copy_call_cancelled(BlockCopyCallState *call_state)
990  {
991      return qatomic_read(&call_state->cancelled);
992  }
993  
994  int block_copy_call_status(BlockCopyCallState *call_state, bool *error_is_read)
995  {
996      assert(qatomic_load_acquire(&call_state->finished));
997      if (error_is_read) {
998          *error_is_read = call_state->error_is_read;
999      }
1000      return call_state->ret;
1001  }
1002  
1003  /*
1004   * Note that cancelling and finishing are racy.
1005   * User can cancel a block-copy that is already finished.
1006   */
1007  void block_copy_call_cancel(BlockCopyCallState *call_state)
1008  {
1009      qatomic_set(&call_state->cancelled, true);
1010      block_copy_kick(call_state);
1011  }
1012  
1013  BdrvDirtyBitmap *block_copy_dirty_bitmap(BlockCopyState *s)
1014  {
1015      return s->copy_bitmap;
1016  }
1017  
1018  int64_t block_copy_cluster_size(BlockCopyState *s)
1019  {
1020      return s->cluster_size;
1021  }
1022  
1023  void block_copy_set_skip_unallocated(BlockCopyState *s, bool skip)
1024  {
1025      qatomic_set(&s->skip_unallocated, skip);
1026  }
1027  
1028  void block_copy_set_speed(BlockCopyState *s, uint64_t speed)
1029  {
1030      ratelimit_set_speed(&s->rate_limit, speed, BLOCK_COPY_SLICE_TIME);
1031  
1032      /*
1033       * Note: it's good to kick all call states from here, but it should be done
1034       * only from a coroutine, to not crash if s->calls list changed while
1035       * entering one call. So for now, the only user of this function kicks its
1036       * only one call_state by hand.
1037       */
1038  }
1039