xref: /openbmc/qemu/block/mirror.c (revision 423be09ab9492735924e73a2d36069784441ebc6)
1  /*
2   * Image mirroring
3   *
4   * Copyright Red Hat, Inc. 2012
5   *
6   * Authors:
7   *  Paolo Bonzini  <pbonzini@redhat.com>
8   *
9   * This work is licensed under the terms of the GNU LGPL, version 2 or later.
10   * See the COPYING.LIB file in the top-level directory.
11   *
12   */
13  
14  #include "qemu/osdep.h"
15  #include "qemu/cutils.h"
16  #include "qemu/coroutine.h"
17  #include "qemu/range.h"
18  #include "trace.h"
19  #include "block/blockjob_int.h"
20  #include "block/block_int.h"
21  #include "block/dirty-bitmap.h"
22  #include "sysemu/block-backend.h"
23  #include "qapi/error.h"
24  #include "qemu/ratelimit.h"
25  #include "qemu/bitmap.h"
26  #include "qemu/memalign.h"
27  
28  #define MAX_IN_FLIGHT 16
29  #define MAX_IO_BYTES (1 << 20) /* 1 MiB */
30  #define DEFAULT_MIRROR_BUF_SIZE (MAX_IN_FLIGHT * MAX_IO_BYTES)
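/* With the values above, the default buffer is 16 * 1 MiB = 16 MiB. */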
31  
32  /* The mirroring buffer is carved into granularity-sized chunks.
33   * Free chunks are kept on a free list.
34   */
35  typedef struct MirrorBuffer {
36      QSIMPLEQ_ENTRY(MirrorBuffer) next;
37  } MirrorBuffer;
38  
39  typedef struct MirrorOp MirrorOp;
40  
41  typedef struct MirrorBlockJob {
42      BlockJob common;
43      BlockBackend *target;
44      BlockDriverState *mirror_top_bs;
45      BlockDriverState *base;
46      BlockDriverState *base_overlay;
47  
48      /* The name of the graph node to replace */
49      char *replaces;
50      /* The BDS to replace */
51      BlockDriverState *to_replace;
52      /* Used to block operations on the drive-mirror-replace target */
53      Error *replace_blocker;
54      bool is_none_mode;
55      BlockMirrorBackingMode backing_mode;
56      /* Whether the target image requires explicit zero-initialization */
57      bool zero_target;
58      /*
59       * To be accessed with atomics. Written only under the BQL (required by the
60       * current implementation of mirror_change()).
61       */
62      MirrorCopyMode copy_mode;
63      BlockdevOnError on_source_error, on_target_error;
64      /*
65       * To be accessed with atomics.
66       *
67       * Set when the target is synced (dirty bitmap is clean, nothing in flight)
68       * and the job is running in active mode.
69       */
70      bool actively_synced;
71      bool should_complete;
72      int64_t granularity;
73      size_t buf_size;
74      int64_t bdev_length;
75      unsigned long *cow_bitmap;
76      BdrvDirtyBitmap *dirty_bitmap;
77      BdrvDirtyBitmapIter *dbi;
78      uint8_t *buf;
79      QSIMPLEQ_HEAD(, MirrorBuffer) buf_free;
80      int buf_free_count;
81  
82      uint64_t last_pause_ns;
83      unsigned long *in_flight_bitmap;
84      unsigned in_flight;
85      int64_t bytes_in_flight;
86      QTAILQ_HEAD(, MirrorOp) ops_in_flight;
87      int ret;
88      bool unmap;
89      int target_cluster_size;
90      int max_iov;
91      bool initial_zeroing_ongoing;
92      int in_active_write_counter;
93      int64_t active_write_bytes_in_flight;
94      bool prepared;
95      bool in_drain;
96      bool base_ro;
97  } MirrorBlockJob;
98  
99  typedef struct MirrorBDSOpaque {
100      MirrorBlockJob *job;
101      bool stop;
102      bool is_commit;
103  } MirrorBDSOpaque;
104  
105  struct MirrorOp {
106      MirrorBlockJob *s;
107      QEMUIOVector qiov;
108      int64_t offset;
109      uint64_t bytes;
110  
111      /* The pointee is set by mirror_co_read(), mirror_co_zero(), and
112       * mirror_co_discard() before yielding for the first time */
113      int64_t *bytes_handled;
114  
115      bool is_pseudo_op;
116      bool is_active_write;
117      bool is_in_flight;
118      CoQueue waiting_requests;
119      Coroutine *co;
120      MirrorOp *waiting_for_op;
121  
122      QTAILQ_ENTRY(MirrorOp) next;
123  };
124  
125  typedef enum MirrorMethod {
126      MIRROR_METHOD_COPY,
127      MIRROR_METHOD_ZERO,
128      MIRROR_METHOD_DISCARD,
129  } MirrorMethod;
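/*
 * MIRROR_METHOD_COPY reads from the source and writes the data to the target,
 * MIRROR_METHOD_ZERO issues write-zeroes on the target, and
 * MIRROR_METHOD_DISCARD discards the range on the target (see
 * mirror_co_read(), mirror_co_zero() and mirror_co_discard() below).
 */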
130  
131  static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
132                                              int error)
133  {
134      qatomic_set(&s->actively_synced, false);
135      if (read) {
136          return block_job_error_action(&s->common, s->on_source_error,
137                                        true, error);
138      } else {
139          return block_job_error_action(&s->common, s->on_target_error,
140                                        false, error);
141      }
142  }
143  
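/*
 * Wait until no in-flight operation overlaps the area [offset, offset + bytes)
 * or the job has failed.  @self may be NULL when the caller only wants to wait
 * for overlapping operations without registering one of its own (see
 * mirror_iteration()).
 */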
144  static void coroutine_fn mirror_wait_on_conflicts(MirrorOp *self,
145                                                    MirrorBlockJob *s,
146                                                    uint64_t offset,
147                                                    uint64_t bytes)
148  {
149      uint64_t self_start_chunk = offset / s->granularity;
150      uint64_t self_end_chunk = DIV_ROUND_UP(offset + bytes, s->granularity);
151      uint64_t self_nb_chunks = self_end_chunk - self_start_chunk;
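    /*
     * Example: with a 64 KiB granularity, offset = 96 KiB and bytes = 96 KiB
     * give self_start_chunk = 1 and self_end_chunk = 3, i.e. chunks 1 and 2.
     */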
152  
153      while (find_next_bit(s->in_flight_bitmap, self_end_chunk,
154                           self_start_chunk) < self_end_chunk &&
155             s->ret >= 0)
156      {
157          MirrorOp *op;
158  
159          QTAILQ_FOREACH(op, &s->ops_in_flight, next) {
160              uint64_t op_start_chunk = op->offset / s->granularity;
161              uint64_t op_nb_chunks = DIV_ROUND_UP(op->offset + op->bytes,
162                                                   s->granularity) -
163                                      op_start_chunk;
164  
165              if (op == self) {
166                  continue;
167              }
168  
169              if (ranges_overlap(self_start_chunk, self_nb_chunks,
170                                 op_start_chunk, op_nb_chunks))
171              {
172                  if (self) {
173                      /*
174                       * If the operation is already (indirectly) waiting for us,
175                       * or will wait for us as soon as it wakes up, then just go
176                       * on (instead of producing a deadlock in the former case).
177                       */
178                      if (op->waiting_for_op) {
179                          continue;
180                      }
181  
182                      self->waiting_for_op = op;
183                  }
184  
185                  qemu_co_queue_wait(&op->waiting_requests, NULL);
186  
187                  if (self) {
188                      self->waiting_for_op = NULL;
189                  }
190  
191                  break;
192              }
193          }
194      }
195  }
196  
197  static void coroutine_fn mirror_iteration_done(MirrorOp *op, int ret)
198  {
199      MirrorBlockJob *s = op->s;
200      struct iovec *iov;
201      int64_t chunk_num;
202      int i, nb_chunks;
203  
204      trace_mirror_iteration_done(s, op->offset, op->bytes, ret);
205  
206      s->in_flight--;
207      s->bytes_in_flight -= op->bytes;
208      iov = op->qiov.iov;
209      for (i = 0; i < op->qiov.niov; i++) {
210          MirrorBuffer *buf = (MirrorBuffer *) iov[i].iov_base;
211          QSIMPLEQ_INSERT_TAIL(&s->buf_free, buf, next);
212          s->buf_free_count++;
213      }
214  
215      chunk_num = op->offset / s->granularity;
216      nb_chunks = DIV_ROUND_UP(op->bytes, s->granularity);
217  
218      bitmap_clear(s->in_flight_bitmap, chunk_num, nb_chunks);
219      QTAILQ_REMOVE(&s->ops_in_flight, op, next);
220      if (ret >= 0) {
221          if (s->cow_bitmap) {
222              bitmap_set(s->cow_bitmap, chunk_num, nb_chunks);
223          }
224          if (!s->initial_zeroing_ongoing) {
225              job_progress_update(&s->common.job, op->bytes);
226          }
227      }
228      qemu_iovec_destroy(&op->qiov);
229  
230      qemu_co_queue_restart_all(&op->waiting_requests);
231      g_free(op);
232  }
233  
234  static void coroutine_fn mirror_write_complete(MirrorOp *op, int ret)
235  {
236      MirrorBlockJob *s = op->s;
237  
238      if (ret < 0) {
239          BlockErrorAction action;
240  
241          bdrv_set_dirty_bitmap(s->dirty_bitmap, op->offset, op->bytes);
242          action = mirror_error_action(s, false, -ret);
243          if (action == BLOCK_ERROR_ACTION_REPORT && s->ret >= 0) {
244              s->ret = ret;
245          }
246      }
247  
248      mirror_iteration_done(op, ret);
249  }
250  
251  static void coroutine_fn mirror_read_complete(MirrorOp *op, int ret)
252  {
253      MirrorBlockJob *s = op->s;
254  
255      if (ret < 0) {
256          BlockErrorAction action;
257  
258          bdrv_set_dirty_bitmap(s->dirty_bitmap, op->offset, op->bytes);
259          action = mirror_error_action(s, true, -ret);
260          if (action == BLOCK_ERROR_ACTION_REPORT && s->ret >= 0) {
261              s->ret = ret;
262          }
263  
264          mirror_iteration_done(op, ret);
265          return;
266      }
267  
268      ret = blk_co_pwritev(s->target, op->offset, op->qiov.size, &op->qiov, 0);
269      mirror_write_complete(op, ret);
270  }
271  
272  /* Clip bytes relative to offset to not exceed end-of-file */
273  static inline int64_t mirror_clip_bytes(MirrorBlockJob *s,
274                                          int64_t offset,
275                                          int64_t bytes)
276  {
277      return MIN(bytes, s->bdev_length - offset);
278  }
279  
280  /* Round offset and/or bytes to target cluster if COW is needed, and
281   * return by how many bytes the adjusted tail extends past the original end. */
282  static int coroutine_fn mirror_cow_align(MirrorBlockJob *s, int64_t *offset,
283                                           uint64_t *bytes)
284  {
285      bool need_cow;
286      int ret = 0;
287      int64_t align_offset = *offset;
288      int64_t align_bytes = *bytes;
289      int max_bytes = s->granularity * s->max_iov;
290  
291      need_cow = !test_bit(*offset / s->granularity, s->cow_bitmap);
292      need_cow |= !test_bit((*offset + *bytes - 1) / s->granularity,
293                            s->cow_bitmap);
294      if (need_cow) {
295          bdrv_round_to_subclusters(blk_bs(s->target), *offset, *bytes,
296                                    &align_offset, &align_bytes);
297      }
298  
299      if (align_bytes > max_bytes) {
300          align_bytes = max_bytes;
301          if (need_cow) {
302              align_bytes = QEMU_ALIGN_DOWN(align_bytes, s->target_cluster_size);
303          }
304      }
305      /* Clipping may result in align_bytes unaligned to chunk boundary, but
306       * that doesn't matter because it is already at the end of the source image. */
307      align_bytes = mirror_clip_bytes(s, align_offset, align_bytes);
308  
309      ret = align_offset + align_bytes - (*offset + *bytes);
310      *offset = align_offset;
311      *bytes = align_bytes;
312      assert(ret >= 0);
313      return ret;
314  }
315  
316  static inline void coroutine_fn
317  mirror_wait_for_free_in_flight_slot(MirrorBlockJob *s)
318  {
319      MirrorOp *op;
320  
321      QTAILQ_FOREACH(op, &s->ops_in_flight, next) {
322          /*
323           * Do not wait on pseudo ops, because it may in turn wait on
324           * some other operation to start, which may in fact be the
325           * caller of this function.  Since there is only one pseudo op
326           * at any given time, we will always find some real operation
327           * to wait on.
328           * Also, do not wait on active operations, because they do not
329           * use up in-flight slots.
330           */
331          if (!op->is_pseudo_op && op->is_in_flight && !op->is_active_write) {
332              qemu_co_queue_wait(&op->waiting_requests, NULL);
333              return;
334          }
335      }
336      abort();
337  }
338  
339  /* Perform a mirror copy operation.
340   *
341   * *op->bytes_handled is set to the number of bytes copied after and
342   * including offset, excluding any bytes copied prior to offset due
343   * to alignment.  This will be op->bytes if no alignment is necessary,
344   * or (new_end - op->offset) if the tail is rounded up or down due to
345   * alignment or buffer limit.
346   */
347  static void coroutine_fn mirror_co_read(void *opaque)
348  {
349      MirrorOp *op = opaque;
350      MirrorBlockJob *s = op->s;
351      int nb_chunks;
352      int ret = -1;
353      uint64_t max_bytes;
354  
355      max_bytes = s->granularity * s->max_iov;
356  
357      /* We can only handle as much as buf_size at a time. */
358      op->bytes = MIN(s->buf_size, MIN(max_bytes, op->bytes));
359      assert(op->bytes);
360      assert(op->bytes < BDRV_REQUEST_MAX_BYTES);
361      *op->bytes_handled = op->bytes;
362  
363      if (s->cow_bitmap) {
364          *op->bytes_handled += mirror_cow_align(s, &op->offset, &op->bytes);
365      }
366      /* Cannot exceed BDRV_REQUEST_MAX_BYTES + INT_MAX */
367      assert(*op->bytes_handled <= UINT_MAX);
368      assert(op->bytes <= s->buf_size);
369      /* The offset is granularity-aligned because:
370       * 1) Caller passes in aligned values;
371       * 2) mirror_cow_align is used only when target cluster is larger. */
372      assert(QEMU_IS_ALIGNED(op->offset, s->granularity));
373      /* The range is sector-aligned, since bdrv_getlength() rounds up. */
374      assert(QEMU_IS_ALIGNED(op->bytes, BDRV_SECTOR_SIZE));
375      nb_chunks = DIV_ROUND_UP(op->bytes, s->granularity);
376  
377      while (s->buf_free_count < nb_chunks) {
378          trace_mirror_yield_in_flight(s, op->offset, s->in_flight);
379          mirror_wait_for_free_in_flight_slot(s);
380      }
381  
382      /* Now make a QEMUIOVector taking enough granularity-sized chunks
383       * from s->buf_free.
384       */
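    /*
     * Example: op->bytes = 130 KiB with a 64 KiB granularity is carved into
     * three chunks of 64 KiB, 64 KiB and 2 KiB (the last chunk may be shorter
     * than the granularity).
     */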
385      qemu_iovec_init(&op->qiov, nb_chunks);
386      while (nb_chunks-- > 0) {
387          MirrorBuffer *buf = QSIMPLEQ_FIRST(&s->buf_free);
388          size_t remaining = op->bytes - op->qiov.size;
389  
390          QSIMPLEQ_REMOVE_HEAD(&s->buf_free, next);
391          s->buf_free_count--;
392          qemu_iovec_add(&op->qiov, buf, MIN(s->granularity, remaining));
393      }
394  
395      /* Copy the dirty cluster.  */
396      s->in_flight++;
397      s->bytes_in_flight += op->bytes;
398      op->is_in_flight = true;
399      trace_mirror_one_iteration(s, op->offset, op->bytes);
400  
401      WITH_GRAPH_RDLOCK_GUARD() {
402          ret = bdrv_co_preadv(s->mirror_top_bs->backing, op->offset, op->bytes,
403                               &op->qiov, 0);
404      }
405      mirror_read_complete(op, ret);
406  }
407  
408  static void coroutine_fn mirror_co_zero(void *opaque)
409  {
410      MirrorOp *op = opaque;
411      int ret;
412  
413      op->s->in_flight++;
414      op->s->bytes_in_flight += op->bytes;
415      *op->bytes_handled = op->bytes;
416      op->is_in_flight = true;
417  
418      ret = blk_co_pwrite_zeroes(op->s->target, op->offset, op->bytes,
419                                 op->s->unmap ? BDRV_REQ_MAY_UNMAP : 0);
420      mirror_write_complete(op, ret);
421  }
422  
423  static void coroutine_fn mirror_co_discard(void *opaque)
424  {
425      MirrorOp *op = opaque;
426      int ret;
427  
428      op->s->in_flight++;
429      op->s->bytes_in_flight += op->bytes;
430      *op->bytes_handled = op->bytes;
431      op->is_in_flight = true;
432  
433      ret = blk_co_pdiscard(op->s->target, op->offset, op->bytes);
434      mirror_write_complete(op, ret);
435  }
436  
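/*
 * Submit a single copy, write-zeroes or discard operation as a new coroutine
 * and return the number of bytes the operation accounts for (the value the
 * coroutine stores through op->bytes_handled before its first yield).
 */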
437  static unsigned mirror_perform(MirrorBlockJob *s, int64_t offset,
438                                 unsigned bytes, MirrorMethod mirror_method)
439  {
440      MirrorOp *op;
441      Coroutine *co;
442      int64_t bytes_handled = -1;
443  
444      op = g_new(MirrorOp, 1);
445      *op = (MirrorOp){
446          .s              = s,
447          .offset         = offset,
448          .bytes          = bytes,
449          .bytes_handled  = &bytes_handled,
450      };
451      qemu_co_queue_init(&op->waiting_requests);
452  
453      switch (mirror_method) {
454      case MIRROR_METHOD_COPY:
455          co = qemu_coroutine_create(mirror_co_read, op);
456          break;
457      case MIRROR_METHOD_ZERO:
458          co = qemu_coroutine_create(mirror_co_zero, op);
459          break;
460      case MIRROR_METHOD_DISCARD:
461          co = qemu_coroutine_create(mirror_co_discard, op);
462          break;
463      default:
464          abort();
465      }
466      op->co = co;
467  
468      QTAILQ_INSERT_TAIL(&s->ops_in_flight, op, next);
469      qemu_coroutine_enter(co);
470      /* At this point, ownership of op has been moved to the coroutine
471       * and the object may already be freed */
472  
473      /* Assert that this value has been set */
474      assert(bytes_handled >= 0);
475  
476      /* Same assertion as in mirror_co_read() (and for mirror_co_read()
477       * and mirror_co_discard(), bytes_handled == op->bytes, which
478       * is the @bytes parameter given to this function) */
479      assert(bytes_handled <= UINT_MAX);
480      return bytes_handled;
481  }
482  
483  static void coroutine_fn GRAPH_UNLOCKED mirror_iteration(MirrorBlockJob *s)
484  {
485      BlockDriverState *source;
486      MirrorOp *pseudo_op;
487      int64_t offset;
488      /* At least the first dirty chunk is mirrored in one iteration. */
489      int nb_chunks = 1;
490      bool write_zeroes_ok = bdrv_can_write_zeroes_with_unmap(blk_bs(s->target));
491      int max_io_bytes = MAX(s->buf_size / MAX_IN_FLIGHT, MAX_IO_BYTES);
492  
493      bdrv_graph_co_rdlock();
494      source = s->mirror_top_bs->backing->bs;
495      bdrv_graph_co_rdunlock();
496  
497      bdrv_dirty_bitmap_lock(s->dirty_bitmap);
498      offset = bdrv_dirty_iter_next(s->dbi);
499      if (offset < 0) {
500          bdrv_set_dirty_iter(s->dbi, 0);
501          offset = bdrv_dirty_iter_next(s->dbi);
502          trace_mirror_restart_iter(s, bdrv_get_dirty_count(s->dirty_bitmap));
503          assert(offset >= 0);
504      }
505      bdrv_dirty_bitmap_unlock(s->dirty_bitmap);
506  
507      /*
508       * Wait for concurrent requests to @offset.  The next loop will limit the
509       * copied area based on in_flight_bitmap so we only copy an area that does
510       * not overlap with concurrent in-flight requests.  Still, we would like to
511       * copy something, so wait until there are at least no more requests to the
512       * very beginning of the area.
513       */
514      mirror_wait_on_conflicts(NULL, s, offset, 1);
515  
516      job_pause_point(&s->common.job);
517  
518      /* Find the number of consecutive dirty chunks following the first dirty
519       * one, and wait for in flight requests in them. */
520      bdrv_dirty_bitmap_lock(s->dirty_bitmap);
521      while (nb_chunks * s->granularity < s->buf_size) {
522          int64_t next_dirty;
523          int64_t next_offset = offset + nb_chunks * s->granularity;
524          int64_t next_chunk = next_offset / s->granularity;
525          if (next_offset >= s->bdev_length ||
526              !bdrv_dirty_bitmap_get_locked(s->dirty_bitmap, next_offset)) {
527              break;
528          }
529          if (test_bit(next_chunk, s->in_flight_bitmap)) {
530              break;
531          }
532  
533          next_dirty = bdrv_dirty_iter_next(s->dbi);
534          if (next_dirty > next_offset || next_dirty < 0) {
535              /* The bitmap iterator's cache is stale, refresh it */
536              bdrv_set_dirty_iter(s->dbi, next_offset);
537              next_dirty = bdrv_dirty_iter_next(s->dbi);
538          }
539          assert(next_dirty == next_offset);
540          nb_chunks++;
541      }
542  
543      /* Clear dirty bits before querying the block status, because
544       * calling bdrv_block_status_above could yield - if some blocks are
545       * marked dirty in this window, we need to know.
546       */
547      bdrv_reset_dirty_bitmap_locked(s->dirty_bitmap, offset,
548                                     nb_chunks * s->granularity);
549      bdrv_dirty_bitmap_unlock(s->dirty_bitmap);
550  
551      /* Before claiming an area in the in-flight bitmap, we have to
552       * create a MirrorOp for it so that conflicting requests can wait
553       * for it.  mirror_perform() will create the real MirrorOps later,
554       * for now we just create a pseudo operation that will wake up all
555       * conflicting requests once all real operations have been
556       * launched. */
557      pseudo_op = g_new(MirrorOp, 1);
558      *pseudo_op = (MirrorOp){
559          .offset         = offset,
560          .bytes          = nb_chunks * s->granularity,
561          .is_pseudo_op   = true,
562      };
563      qemu_co_queue_init(&pseudo_op->waiting_requests);
564      QTAILQ_INSERT_TAIL(&s->ops_in_flight, pseudo_op, next);
565  
566      bitmap_set(s->in_flight_bitmap, offset / s->granularity, nb_chunks);
567      while (nb_chunks > 0 && offset < s->bdev_length) {
568          int ret = -1;
569          int64_t io_bytes;
570          int64_t io_bytes_acct;
571          MirrorMethod mirror_method = MIRROR_METHOD_COPY;
572  
573          assert(!(offset % s->granularity));
574          WITH_GRAPH_RDLOCK_GUARD() {
575              ret = bdrv_co_block_status_above(source, NULL, offset,
576                                               nb_chunks * s->granularity,
577                                               &io_bytes, NULL, NULL);
578          }
579          if (ret < 0) {
580              io_bytes = MIN(nb_chunks * s->granularity, max_io_bytes);
581          } else if (ret & BDRV_BLOCK_DATA) {
582              io_bytes = MIN(io_bytes, max_io_bytes);
583          }
584  
585          io_bytes -= io_bytes % s->granularity;
586          if (io_bytes < s->granularity) {
587              io_bytes = s->granularity;
588          } else if (ret >= 0 && !(ret & BDRV_BLOCK_DATA)) {
589              int64_t target_offset;
590              int64_t target_bytes;
591              WITH_GRAPH_RDLOCK_GUARD() {
592                  bdrv_round_to_subclusters(blk_bs(s->target), offset, io_bytes,
593                                            &target_offset, &target_bytes);
594              }
595              if (target_offset == offset &&
596                  target_bytes == io_bytes) {
597                  mirror_method = ret & BDRV_BLOCK_ZERO ?
598                                      MIRROR_METHOD_ZERO :
599                                      MIRROR_METHOD_DISCARD;
600              }
601          }
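        /*
         * At this point mirror_method is ZERO or DISCARD only if the source
         * range contains no data and maps exactly onto the target's
         * (sub)cluster boundaries; everything else falls back to a plain copy.
         */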
602  
603          while (s->in_flight >= MAX_IN_FLIGHT) {
604              trace_mirror_yield_in_flight(s, offset, s->in_flight);
605              mirror_wait_for_free_in_flight_slot(s);
606          }
607  
608          if (s->ret < 0) {
609              ret = 0;
610              goto fail;
611          }
612  
613          io_bytes = mirror_clip_bytes(s, offset, io_bytes);
614          io_bytes = mirror_perform(s, offset, io_bytes, mirror_method);
615          if (mirror_method != MIRROR_METHOD_COPY && write_zeroes_ok) {
616              io_bytes_acct = 0;
617          } else {
618              io_bytes_acct = io_bytes;
619          }
620          assert(io_bytes);
621          offset += io_bytes;
622          nb_chunks -= DIV_ROUND_UP(io_bytes, s->granularity);
623          block_job_ratelimit_processed_bytes(&s->common, io_bytes_acct);
624      }
625  
626  fail:
627      QTAILQ_REMOVE(&s->ops_in_flight, pseudo_op, next);
628      qemu_co_queue_restart_all(&pseudo_op->waiting_requests);
629      g_free(pseudo_op);
630  }
631  
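/*
 * Carve s->buf into granularity-sized chunks and put them all on the free
 * list.  The MirrorBuffer link is overlaid on the first bytes of each free
 * chunk, so no separate bookkeeping allocation is needed.
 */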
632  static void mirror_free_init(MirrorBlockJob *s)
633  {
634      int granularity = s->granularity;
635      size_t buf_size = s->buf_size;
636      uint8_t *buf = s->buf;
637  
638      assert(s->buf_free_count == 0);
639      QSIMPLEQ_INIT(&s->buf_free);
640      while (buf_size != 0) {
641          MirrorBuffer *cur = (MirrorBuffer *)buf;
642          QSIMPLEQ_INSERT_TAIL(&s->buf_free, cur, next);
643          s->buf_free_count++;
644          buf_size -= granularity;
645          buf += granularity;
646      }
647  }
648  
649  /* This is also used for the .pause callback. There is no matching
650   * mirror_resume() because mirror_run() will begin iterating again
651   * when the job is resumed.
652   */
653  static void coroutine_fn mirror_wait_for_all_io(MirrorBlockJob *s)
654  {
655      while (s->in_flight > 0) {
656          mirror_wait_for_free_in_flight_slot(s);
657      }
658  }
659  
660  /**
661   * mirror_exit_common: handle both abort() and prepare() cases.
662   * for .prepare, returns 0 on success and -errno on failure.
663   * for .abort cases, denoted by abort = true, MUST return 0.
664   */
665  static int mirror_exit_common(Job *job)
666  {
667      MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
668      BlockJob *bjob = &s->common;
669      MirrorBDSOpaque *bs_opaque;
670      BlockDriverState *src;
671      BlockDriverState *target_bs;
672      BlockDriverState *mirror_top_bs;
673      Error *local_err = NULL;
674      bool abort = job->ret < 0;
675      int ret = 0;
676  
677      GLOBAL_STATE_CODE();
678  
679      if (s->prepared) {
680          return 0;
681      }
682      s->prepared = true;
683  
684      bdrv_graph_rdlock_main_loop();
685  
686      mirror_top_bs = s->mirror_top_bs;
687      bs_opaque = mirror_top_bs->opaque;
688      src = mirror_top_bs->backing->bs;
689      target_bs = blk_bs(s->target);
690  
691      if (bdrv_chain_contains(src, target_bs)) {
692          bdrv_unfreeze_backing_chain(mirror_top_bs, target_bs);
693      }
694  
695      bdrv_release_dirty_bitmap(s->dirty_bitmap);
696  
697      /* Make sure that the source BDS doesn't go away during bdrv_replace_node,
698       * before we can call bdrv_drained_end */
699      bdrv_ref(src);
700      bdrv_ref(mirror_top_bs);
701      bdrv_ref(target_bs);
702  
703      bdrv_graph_rdunlock_main_loop();
704  
705      /*
706       * Remove target parent that still uses BLK_PERM_WRITE/RESIZE before
707       * inserting target_bs at s->to_replace, where we might not be able to get
708       * these permissions.
709       */
710      blk_unref(s->target);
711      s->target = NULL;
712  
713      /* We don't access the source any more. Dropping any WRITE/RESIZE is
714       * required before it could become a backing file of target_bs. Not having
715       * these permissions any more means that we can't allow any new requests on
716       * mirror_top_bs from now on, so keep it drained. */
717      bdrv_drained_begin(mirror_top_bs);
718      bdrv_drained_begin(target_bs);
719      bs_opaque->stop = true;
720  
721      bdrv_graph_rdlock_main_loop();
722      bdrv_child_refresh_perms(mirror_top_bs, mirror_top_bs->backing,
723                               &error_abort);
724  
725      if (!abort && s->backing_mode == MIRROR_SOURCE_BACKING_CHAIN) {
726          BlockDriverState *backing = s->is_none_mode ? src : s->base;
727          BlockDriverState *unfiltered_target = bdrv_skip_filters(target_bs);
728  
729          if (bdrv_cow_bs(unfiltered_target) != backing) {
730              bdrv_set_backing_hd(unfiltered_target, backing, &local_err);
731              if (local_err) {
732                  error_report_err(local_err);
733                  local_err = NULL;
734                  ret = -EPERM;
735              }
736          }
737      } else if (!abort && s->backing_mode == MIRROR_OPEN_BACKING_CHAIN) {
738          assert(!bdrv_backing_chain_next(target_bs));
739          ret = bdrv_open_backing_file(bdrv_skip_filters(target_bs), NULL,
740                                       "backing", &local_err);
741          if (ret < 0) {
742              error_report_err(local_err);
743              local_err = NULL;
744          }
745      }
746      bdrv_graph_rdunlock_main_loop();
747  
748      if (s->should_complete && !abort) {
749          BlockDriverState *to_replace = s->to_replace ?: src;
750          bool ro = bdrv_is_read_only(to_replace);
751  
752          if (ro != bdrv_is_read_only(target_bs)) {
753              bdrv_reopen_set_read_only(target_bs, ro, NULL);
754          }
755  
756          /* The mirror job has no requests in flight any more, but we need to
757           * drain potential other users of the BDS before changing the graph. */
758          assert(s->in_drain);
759          bdrv_drained_begin(to_replace);
760          /*
761           * Cannot use check_to_replace_node() here, because that would
762           * check for an op blocker on @to_replace, and we have our own
763           * there.
764           */
765          bdrv_graph_wrlock();
766          if (bdrv_recurse_can_replace(src, to_replace)) {
767              bdrv_replace_node(to_replace, target_bs, &local_err);
768          } else {
769              error_setg(&local_err, "Can no longer replace '%s' by '%s', "
770                         "because it can no longer be guaranteed that doing so "
771                         "would not lead to an abrupt change of visible data",
772                         to_replace->node_name, target_bs->node_name);
773          }
774          bdrv_graph_wrunlock();
775          bdrv_drained_end(to_replace);
776          if (local_err) {
777              error_report_err(local_err);
778              ret = -EPERM;
779          }
780      }
781      if (s->to_replace) {
782          bdrv_op_unblock_all(s->to_replace, s->replace_blocker);
783          error_free(s->replace_blocker);
784          bdrv_unref(s->to_replace);
785      }
786      g_free(s->replaces);
787  
788      /*
789       * Remove the mirror filter driver from the graph. Before this, get rid of
790       * the blockers on the intermediate nodes so that the resulting state is
791       * valid.
792       */
793      block_job_remove_all_bdrv(bjob);
794      bdrv_graph_wrlock();
795      bdrv_replace_node(mirror_top_bs, mirror_top_bs->backing->bs, &error_abort);
796      bdrv_graph_wrunlock();
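    /*
     * mirror_top_bs has now been replaced by its backing node in all of its
     * parents; the references taken earlier keep it and src alive until the
     * final bdrv_unref() calls below.
     */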
797  
798      if (abort && s->base_ro && !bdrv_is_read_only(target_bs)) {
799          bdrv_reopen_set_read_only(target_bs, true, NULL);
800      }
801  
802      bdrv_drained_end(target_bs);
803      bdrv_unref(target_bs);
804  
805      bs_opaque->job = NULL;
806  
807      bdrv_drained_end(src);
808      bdrv_drained_end(mirror_top_bs);
809      s->in_drain = false;
810      bdrv_unref(mirror_top_bs);
811      bdrv_unref(src);
812  
813      return ret;
814  }
815  
816  static int mirror_prepare(Job *job)
817  {
818      return mirror_exit_common(job);
819  }
820  
821  static void mirror_abort(Job *job)
822  {
823      int ret = mirror_exit_common(job);
824      assert(ret == 0);
825  }
826  
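/*
 * Yield back to the main loop at least once per BLOCK_JOB_SLICE_TIME so that
 * pause and cancellation requests are honoured during the long initialisation
 * loops in mirror_dirty_init().
 */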
827  static void coroutine_fn mirror_throttle(MirrorBlockJob *s)
828  {
829      int64_t now = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
830  
831      if (now - s->last_pause_ns > BLOCK_JOB_SLICE_TIME) {
832          s->last_pause_ns = now;
833          job_sleep_ns(&s->common.job, 0);
834      } else {
835          job_pause_point(&s->common.job);
836      }
837  }
838  
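/*
 * Prepare the dirty bitmap for a non-"none" sync mode: if the target must be
 * explicitly zero-initialised, either mark the whole device dirty (when the
 * target cannot cheaply write zeroes) or issue write-zeroes for the whole
 * device first; then mark as dirty every range that is allocated above
 * s->base_overlay.
 */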
839  static int coroutine_fn GRAPH_UNLOCKED mirror_dirty_init(MirrorBlockJob *s)
840  {
841      int64_t offset;
842      BlockDriverState *bs;
843      BlockDriverState *target_bs = blk_bs(s->target);
844      int ret = -1;
845      int64_t count;
846  
847      bdrv_graph_co_rdlock();
848      bs = s->mirror_top_bs->backing->bs;
849      bdrv_graph_co_rdunlock();
850  
851      if (s->zero_target) {
852          if (!bdrv_can_write_zeroes_with_unmap(target_bs)) {
853              bdrv_set_dirty_bitmap(s->dirty_bitmap, 0, s->bdev_length);
854              return 0;
855          }
856  
857          s->initial_zeroing_ongoing = true;
858          for (offset = 0; offset < s->bdev_length; ) {
859              int bytes = MIN(s->bdev_length - offset,
860                              QEMU_ALIGN_DOWN(INT_MAX, s->granularity));
861  
862              mirror_throttle(s);
863  
864              if (job_is_cancelled(&s->common.job)) {
865                  s->initial_zeroing_ongoing = false;
866                  return 0;
867              }
868  
869              if (s->in_flight >= MAX_IN_FLIGHT) {
870                  trace_mirror_yield(s, UINT64_MAX, s->buf_free_count,
871                                     s->in_flight);
872                  mirror_wait_for_free_in_flight_slot(s);
873                  continue;
874              }
875  
876              mirror_perform(s, offset, bytes, MIRROR_METHOD_ZERO);
877              offset += bytes;
878          }
879  
880          mirror_wait_for_all_io(s);
881          s->initial_zeroing_ongoing = false;
882      }
883  
884      /* Now loop over the image and initialize the dirty bitmap.  */
885      for (offset = 0; offset < s->bdev_length; ) {
886          /* Just to make sure we are not exceeding int limit. */
887          int bytes = MIN(s->bdev_length - offset,
888                          QEMU_ALIGN_DOWN(INT_MAX, s->granularity));
889  
890          mirror_throttle(s);
891  
892          if (job_is_cancelled(&s->common.job)) {
893              return 0;
894          }
895  
896          WITH_GRAPH_RDLOCK_GUARD() {
897              ret = bdrv_co_is_allocated_above(bs, s->base_overlay, true, offset,
898                                               bytes, &count);
899          }
900          if (ret < 0) {
901              return ret;
902          }
903  
904          assert(count);
905          if (ret > 0) {
906              bdrv_set_dirty_bitmap(s->dirty_bitmap, offset, count);
907          }
908          offset += count;
909      }
910      return 0;
911  }
912  
913  /* Called when going out of the streaming phase to flush the bulk of the
914   * data to the medium, or just before completing.
915   */
916  static int coroutine_fn mirror_flush(MirrorBlockJob *s)
917  {
918      int ret = blk_co_flush(s->target);
919      if (ret < 0) {
920          if (mirror_error_action(s, false, -ret) == BLOCK_ERROR_ACTION_REPORT) {
921              s->ret = ret;
922          }
923      }
924      return ret;
925  }
926  
927  static int coroutine_fn mirror_run(Job *job, Error **errp)
928  {
929      MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
930      BlockDriverState *bs;
931      MirrorBDSOpaque *mirror_top_opaque = s->mirror_top_bs->opaque;
932      BlockDriverState *target_bs = blk_bs(s->target);
933      bool need_drain = true;
934      BlockDeviceIoStatus iostatus = BLOCK_DEVICE_IO_STATUS__MAX;
935      int64_t length;
936      int64_t target_length;
937      BlockDriverInfo bdi;
938      char backing_filename[2]; /* we only need 2 characters because we are only
939                                   checking for an empty string */
940      int ret = 0;
941  
942      bdrv_graph_co_rdlock();
943      bs = bdrv_filter_bs(s->mirror_top_bs);
944      bdrv_graph_co_rdunlock();
945  
946      if (job_is_cancelled(&s->common.job)) {
947          goto immediate_exit;
948      }
949  
950      bdrv_graph_co_rdlock();
951      s->bdev_length = bdrv_co_getlength(bs);
952      bdrv_graph_co_rdunlock();
953  
954      if (s->bdev_length < 0) {
955          ret = s->bdev_length;
956          goto immediate_exit;
957      }
958  
959      target_length = blk_co_getlength(s->target);
960      if (target_length < 0) {
961          ret = target_length;
962          goto immediate_exit;
963      }
964  
965      /* Active commit must resize the base image if its size differs from the
966       * active layer. */
967      if (s->base == blk_bs(s->target)) {
968          if (s->bdev_length > target_length) {
969              ret = blk_co_truncate(s->target, s->bdev_length, false,
970                                    PREALLOC_MODE_OFF, 0, NULL);
971              if (ret < 0) {
972                  goto immediate_exit;
973              }
974          }
975      } else if (s->bdev_length != target_length) {
976          error_setg(errp, "Source and target image have different sizes");
977          ret = -EINVAL;
978          goto immediate_exit;
979      }
980  
981      if (s->bdev_length == 0) {
982          /* Transition to the READY state and wait for complete. */
983          job_transition_to_ready(&s->common.job);
984          qatomic_set(&s->actively_synced, true);
985          while (!job_cancel_requested(&s->common.job) && !s->should_complete) {
986              job_yield(&s->common.job);
987          }
988          goto immediate_exit;
989      }
990  
991      length = DIV_ROUND_UP(s->bdev_length, s->granularity);
992      s->in_flight_bitmap = bitmap_new(length);
993  
994      /* If we have no backing file yet in the destination, we cannot let
995       * the destination do COW.  Instead, we copy sectors around the
996       * dirty data if needed.  We need a bitmap to do that.
997       */
998      bdrv_get_backing_filename(target_bs, backing_filename,
999                                sizeof(backing_filename));
1000      bdrv_graph_co_rdlock();
1001      if (!bdrv_co_get_info(target_bs, &bdi) && bdi.cluster_size) {
1002          s->target_cluster_size = bdi.cluster_size;
1003      } else {
1004          s->target_cluster_size = BDRV_SECTOR_SIZE;
1005      }
1006      if (backing_filename[0] && !bdrv_backing_chain_next(target_bs) &&
1007          s->granularity < s->target_cluster_size) {
1008          s->buf_size = MAX(s->buf_size, s->target_cluster_size);
1009          s->cow_bitmap = bitmap_new(length);
1010      }
1011      s->max_iov = MIN(bs->bl.max_iov, target_bs->bl.max_iov);
1012      bdrv_graph_co_rdunlock();
1013  
1014      s->buf = qemu_try_blockalign(bs, s->buf_size);
1015      if (s->buf == NULL) {
1016          ret = -ENOMEM;
1017          goto immediate_exit;
1018      }
1019  
1020      mirror_free_init(s);
1021  
1022      s->last_pause_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
1023      if (!s->is_none_mode) {
1024          ret = mirror_dirty_init(s);
1025          if (ret < 0 || job_is_cancelled(&s->common.job)) {
1026              goto immediate_exit;
1027          }
1028      }
1029  
1030      /*
1031       * Only now the job is fully initialised and mirror_top_bs should start
1032       * accessing it.
1033       */
1034      mirror_top_opaque->job = s;
1035  
1036      assert(!s->dbi);
1037      s->dbi = bdrv_dirty_iter_new(s->dirty_bitmap);
1038      for (;;) {
1039          int64_t cnt, delta;
1040          bool should_complete;
1041  
1042          if (s->ret < 0) {
1043              ret = s->ret;
1044              goto immediate_exit;
1045          }
1046  
1047          job_pause_point(&s->common.job);
1048  
1049          if (job_is_cancelled(&s->common.job)) {
1050              ret = 0;
1051              goto immediate_exit;
1052          }
1053  
1054          cnt = bdrv_get_dirty_count(s->dirty_bitmap);
1055          /* cnt is the number of dirty bytes remaining and s->bytes_in_flight is
1056           * the number of bytes currently being processed; together those are
1057           * the current remaining operation length */
1058          job_progress_set_remaining(&s->common.job,
1059                                     s->bytes_in_flight + cnt +
1060                                     s->active_write_bytes_in_flight);
1061  
1062          /* Note that even when no rate limit is applied we need to yield
1063           * periodically with no pending I/O so that bdrv_drain_all() returns.
1064       * We do so every BLOCK_JOB_SLICE_TIME nanoseconds, or when there is
1065           * an error, or when the source is clean, whichever comes first. */
1066          delta = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - s->last_pause_ns;
1067          WITH_JOB_LOCK_GUARD() {
1068              iostatus = s->common.iostatus;
1069          }
1070          if (delta < BLOCK_JOB_SLICE_TIME &&
1071              iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
1072              if (s->in_flight >= MAX_IN_FLIGHT || s->buf_free_count == 0 ||
1073                  (cnt == 0 && s->in_flight > 0)) {
1074                  trace_mirror_yield(s, cnt, s->buf_free_count, s->in_flight);
1075                  mirror_wait_for_free_in_flight_slot(s);
1076                  continue;
1077              } else if (cnt != 0) {
1078                  mirror_iteration(s);
1079              }
1080          }
1081  
1082          should_complete = false;
1083          if (s->in_flight == 0 && cnt == 0) {
1084              trace_mirror_before_flush(s);
1085              if (!job_is_ready(&s->common.job)) {
1086                  if (mirror_flush(s) < 0) {
1087                      /* Go check s->ret.  */
1088                      continue;
1089                  }
1090                  /* We're out of the streaming phase.  From now on, if the job
1091                   * is cancelled we will actually complete all pending I/O and
1092                   * report completion.  This way, block-job-cancel will leave
1093                   * the target in a consistent state.
1094                   */
1095                  job_transition_to_ready(&s->common.job);
1096              }
1097              if (qatomic_read(&s->copy_mode) != MIRROR_COPY_MODE_BACKGROUND) {
1098                  qatomic_set(&s->actively_synced, true);
1099              }
1100  
1101              should_complete = s->should_complete ||
1102                  job_cancel_requested(&s->common.job);
1103              cnt = bdrv_get_dirty_count(s->dirty_bitmap);
1104          }
1105  
1106          if (cnt == 0 && should_complete) {
1107              /* The dirty bitmap is not updated while operations are pending.
1108               * If we're about to exit, wait for pending operations before
1109               * calling bdrv_get_dirty_count(bs), or we may exit while the
1110               * source has dirty data to copy!
1111               *
1112               * Note that I/O can be submitted by the guest while
1113               * the mirror job runs, so pause it now.  Before deciding
1114               * whether to switch to target check one last time if I/O has
1115               * come in the meanwhile, and if not flush the data to disk.
1116               */
1117              trace_mirror_before_drain(s, cnt);
1118  
1119              s->in_drain = true;
1120              bdrv_drained_begin(bs);
1121  
1122              /* Must be zero because we are drained */
1123              assert(s->in_active_write_counter == 0);
1124  
1125              cnt = bdrv_get_dirty_count(s->dirty_bitmap);
1126              if (cnt > 0 || mirror_flush(s) < 0) {
1127                  bdrv_drained_end(bs);
1128                  s->in_drain = false;
1129                  continue;
1130              }
1131  
1132              /* The two disks are in sync.  Exit and report successful
1133               * completion.
1134               */
1135              assert(QLIST_EMPTY(&bs->tracked_requests));
1136              need_drain = false;
1137              break;
1138          }
1139  
1140          if (job_is_ready(&s->common.job) && !should_complete) {
1141              if (s->in_flight == 0 && cnt == 0) {
1142                  trace_mirror_before_sleep(s, cnt, job_is_ready(&s->common.job),
1143                                            BLOCK_JOB_SLICE_TIME);
1144                  job_sleep_ns(&s->common.job, BLOCK_JOB_SLICE_TIME);
1145              }
1146          } else {
1147              block_job_ratelimit_sleep(&s->common);
1148          }
1149          s->last_pause_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
1150      }
1151  
1152  immediate_exit:
1153      if (s->in_flight > 0) {
1154          /* We get here only if something went wrong.  Either the job failed,
1155           * or it was cancelled prematurely so that we do not guarantee that
1156           * the target is a copy of the source.
1157           */
1158          assert(ret < 0 || job_is_cancelled(&s->common.job));
1159          assert(need_drain);
1160          mirror_wait_for_all_io(s);
1161      }
1162  
1163      assert(s->in_flight == 0);
1164      qemu_vfree(s->buf);
1165      g_free(s->cow_bitmap);
1166      g_free(s->in_flight_bitmap);
1167      bdrv_dirty_iter_free(s->dbi);
1168  
1169      if (need_drain) {
1170          s->in_drain = true;
1171          bdrv_drained_begin(bs);
1172      }
1173  
1174      return ret;
1175  }
1176  
1177  static void mirror_complete(Job *job, Error **errp)
1178  {
1179      MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
1180  
1181      if (!job_is_ready(job)) {
1182          error_setg(errp, "The active block job '%s' cannot be completed",
1183                     job->id);
1184          return;
1185      }
1186  
1187      /* block all operations on to_replace bs */
1188      if (s->replaces) {
1189          s->to_replace = bdrv_find_node(s->replaces);
1190          if (!s->to_replace) {
1191              error_setg(errp, "Node name '%s' not found", s->replaces);
1192              return;
1193          }
1194  
1195          /* TODO Translate this into child freeze system. */
1196          error_setg(&s->replace_blocker,
1197                     "block device is in use by block-job-complete");
1198          bdrv_op_block_all(s->to_replace, s->replace_blocker);
1199          bdrv_ref(s->to_replace);
1200      }
1201  
1202      s->should_complete = true;
1203  
1204      /* If the job is paused, it will be re-entered when it is resumed */
1205      WITH_JOB_LOCK_GUARD() {
1206          if (!job->paused) {
1207              job_enter_cond_locked(job, NULL);
1208          }
1209      }
1210  }
1211  
1212  static void coroutine_fn mirror_pause(Job *job)
1213  {
1214      MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
1215  
1216      mirror_wait_for_all_io(s);
1217  }
1218  
1219  static bool mirror_drained_poll(BlockJob *job)
1220  {
1221      MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);
1222  
1223      /* If the job is neither paused nor cancelled, we can't be sure that it won't
1224       * issue more requests. We make an exception if we've reached this point
1225       * from one of our own drain sections, to avoid a deadlock waiting for
1226       * ourselves.
1227       */
1228      WITH_JOB_LOCK_GUARD() {
1229          if (!s->common.job.paused && !job_is_cancelled_locked(&job->job)
1230              && !s->in_drain) {
1231              return true;
1232          }
1233      }
1234  
1235      return !!s->in_flight;
1236  }
1237  
1238  static bool mirror_cancel(Job *job, bool force)
1239  {
1240      MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
1241      BlockDriverState *target = blk_bs(s->target);
1242  
1243      /*
1244       * Before the job is READY, we treat any cancellation like a
1245       * force-cancellation.
1246       */
1247      force = force || !job_is_ready(job);
1248  
1249      if (force) {
1250          bdrv_cancel_in_flight(target);
1251      }
1252      return force;
1253  }
1254  
1255  static bool commit_active_cancel(Job *job, bool force)
1256  {
1257      /* Same as above in mirror_cancel() */
1258      return force || !job_is_ready(job);
1259  }
1260  
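/*
 * Handler for job-type-specific changes.  As a rough usage sketch (argument
 * names taken from the QAPI schema, assuming a QEMU new enough to provide
 * block-job-change), the copy mode of a running mirror job can be switched
 * to write-blocking with:
 *
 *   { "execute": "block-job-change",
 *     "arguments": { "id": "job0", "type": "mirror",
 *                    "copy-mode": "write-blocking" } }
 */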
1261  static void mirror_change(BlockJob *job, BlockJobChangeOptions *opts,
1262                            Error **errp)
1263  {
1264      MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);
1265      BlockJobChangeOptionsMirror *change_opts = &opts->u.mirror;
1266      MirrorCopyMode current;
1267  
1268      /*
1269       * The implementation relies on the fact that copy_mode is only written
1270       * under the BQL. Otherwise, further synchronization would be required.
1271       */
1272  
1273      GLOBAL_STATE_CODE();
1274  
1275      if (qatomic_read(&s->copy_mode) == change_opts->copy_mode) {
1276          return;
1277      }
1278  
1279      if (change_opts->copy_mode != MIRROR_COPY_MODE_WRITE_BLOCKING) {
1280          error_setg(errp, "Change to copy mode '%s' is not implemented",
1281                     MirrorCopyMode_str(change_opts->copy_mode));
1282          return;
1283      }
1284  
1285      current = qatomic_cmpxchg(&s->copy_mode, MIRROR_COPY_MODE_BACKGROUND,
1286                                change_opts->copy_mode);
1287      if (current != MIRROR_COPY_MODE_BACKGROUND) {
1288          error_setg(errp, "Expected current copy mode '%s', got '%s'",
1289                     MirrorCopyMode_str(MIRROR_COPY_MODE_BACKGROUND),
1290                     MirrorCopyMode_str(current));
1291      }
1292  }
1293  
1294  static void mirror_query(BlockJob *job, BlockJobInfo *info)
1295  {
1296      MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);
1297  
1298      info->u.mirror = (BlockJobInfoMirror) {
1299          .actively_synced = qatomic_read(&s->actively_synced),
1300      };
1301  }
1302  
1303  static const BlockJobDriver mirror_job_driver = {
1304      .job_driver = {
1305          .instance_size          = sizeof(MirrorBlockJob),
1306          .job_type               = JOB_TYPE_MIRROR,
1307          .free                   = block_job_free,
1308          .user_resume            = block_job_user_resume,
1309          .run                    = mirror_run,
1310          .prepare                = mirror_prepare,
1311          .abort                  = mirror_abort,
1312          .pause                  = mirror_pause,
1313          .complete               = mirror_complete,
1314          .cancel                 = mirror_cancel,
1315      },
1316      .drained_poll           = mirror_drained_poll,
1317      .change                 = mirror_change,
1318      .query                  = mirror_query,
1319  };
1320  
1321  static const BlockJobDriver commit_active_job_driver = {
1322      .job_driver = {
1323          .instance_size          = sizeof(MirrorBlockJob),
1324          .job_type               = JOB_TYPE_COMMIT,
1325          .free                   = block_job_free,
1326          .user_resume            = block_job_user_resume,
1327          .run                    = mirror_run,
1328          .prepare                = mirror_prepare,
1329          .abort                  = mirror_abort,
1330          .pause                  = mirror_pause,
1331          .complete               = mirror_complete,
1332          .cancel                 = commit_active_cancel,
1333      },
1334      .drained_poll           = mirror_drained_poll,
1335  };
1336  
1337  static void coroutine_fn
1338  do_sync_target_write(MirrorBlockJob *job, MirrorMethod method,
1339                       uint64_t offset, uint64_t bytes,
1340                       QEMUIOVector *qiov, int flags)
1341  {
1342      int ret;
1343      size_t qiov_offset = 0;
1344      int64_t bitmap_offset, bitmap_end;
1345  
1346      if (!QEMU_IS_ALIGNED(offset, job->granularity) &&
1347          bdrv_dirty_bitmap_get(job->dirty_bitmap, offset))
1348      {
1349              /*
1350               * Dirty unaligned padding: ignore it.
1351               *
1352               * Reasoning:
1353               * 1. If we copied it, we could not reset the corresponding bit in
1354               *    the dirty bitmap, as there may be some "dirty" bytes still not
1355               *    copied.
1356               * 2. It is already dirty, so skipping it does not make the mirror
1357               *    any less converged than it already is.
1358               *
1359               * Note that, because of this, a guest write may contribute nothing
1360               * to mirror convergence.  That is not a problem: we still have the
1361               * background mirroring process, and if, under bad circumstances
1362               * (high guest I/O load), that process starves, we would not converge
1363               * anyway, even if every write contributed, because the guest is not
1364               * guaranteed to rewrite the whole disk.
1365               */
1366              qiov_offset = QEMU_ALIGN_UP(offset, job->granularity) - offset;
1367              if (bytes <= qiov_offset) {
1368                  /* nothing to do after shrink */
1369                  return;
1370              }
1371              offset += qiov_offset;
1372              bytes -= qiov_offset;
1373      }
1374  
1375      if (!QEMU_IS_ALIGNED(offset + bytes, job->granularity) &&
1376          bdrv_dirty_bitmap_get(job->dirty_bitmap, offset + bytes - 1))
1377      {
1378          uint64_t tail = (offset + bytes) % job->granularity;
1379  
1380          if (bytes <= tail) {
1381              /* nothing to do after shrink */
1382              return;
1383          }
1384          bytes -= tail;
1385      }
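    /*
     * Example with a 64 KiB granularity: a guest write of 120 KiB at offset
     * 10 KiB whose first and last chunks are already dirty is shrunk to the
     * aligned middle, i.e. the 64 KiB starting at offset 64 KiB
     * (qiov_offset = 54 KiB, tail = 2 KiB).
     */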
1386  
1387      /*
1388       * Tails are either clean or shrunk, so for bitmap resetting
1389       * we safely align the range down.
1390       */
1391      bitmap_offset = QEMU_ALIGN_UP(offset, job->granularity);
1392      bitmap_end = QEMU_ALIGN_DOWN(offset + bytes, job->granularity);
1393      if (bitmap_offset < bitmap_end) {
1394          bdrv_reset_dirty_bitmap(job->dirty_bitmap, bitmap_offset,
1395                                  bitmap_end - bitmap_offset);
1396      }
1397  
1398      job_progress_increase_remaining(&job->common.job, bytes);
1399      job->active_write_bytes_in_flight += bytes;
1400  
1401      switch (method) {
1402      case MIRROR_METHOD_COPY:
1403          ret = blk_co_pwritev_part(job->target, offset, bytes,
1404                                    qiov, qiov_offset, flags);
1405          break;
1406  
1407      case MIRROR_METHOD_ZERO:
1408          assert(!qiov);
1409          ret = blk_co_pwrite_zeroes(job->target, offset, bytes, flags);
1410          break;
1411  
1412      case MIRROR_METHOD_DISCARD:
1413          assert(!qiov);
1414          ret = blk_co_pdiscard(job->target, offset, bytes);
1415          break;
1416  
1417      default:
1418          abort();
1419      }
1420  
1421      job->active_write_bytes_in_flight -= bytes;
1422      if (ret >= 0) {
1423          job_progress_update(&job->common.job, bytes);
1424      } else {
1425          BlockErrorAction action;
1426  
1427          /*
1428           * We failed, so mark the whole area dirty, aligned up.
1429           * Note that we do not care about any shrunk tails: they were dirty
1430           * at function start, and they must still be dirty, as we have locked
1431           * the region for the in-flight op.
1432           */
1433          bitmap_offset = QEMU_ALIGN_DOWN(offset, job->granularity);
1434          bitmap_end = QEMU_ALIGN_UP(offset + bytes, job->granularity);
1435          bdrv_set_dirty_bitmap(job->dirty_bitmap, bitmap_offset,
1436                                bitmap_end - bitmap_offset);
1437          qatomic_set(&job->actively_synced, false);
1438  
1439          action = mirror_error_action(job, false, -ret);
1440          if (action == BLOCK_ERROR_ACTION_REPORT) {
1441              if (!job->ret) {
1442                  job->ret = ret;
1443              }
1444          }
1445      }
1446  }
1447  
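/*
 * Register an active-write operation for the given area and wait until no
 * other in-flight operation overlaps it.  Callers are expected to perform the
 * write to the target via do_sync_target_write() and then call
 * active_write_settle().
 */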
1448  static MirrorOp *coroutine_fn active_write_prepare(MirrorBlockJob *s,
1449                                                     uint64_t offset,
1450                                                     uint64_t bytes)
1451  {
1452      MirrorOp *op;
1453      uint64_t start_chunk = offset / s->granularity;
1454      uint64_t end_chunk = DIV_ROUND_UP(offset + bytes, s->granularity);
1455  
1456      op = g_new(MirrorOp, 1);
1457      *op = (MirrorOp){
1458          .s                  = s,
1459          .offset             = offset,
1460          .bytes              = bytes,
1461          .is_active_write    = true,
1462          .is_in_flight       = true,
1463          .co                 = qemu_coroutine_self(),
1464      };
1465      qemu_co_queue_init(&op->waiting_requests);
1466      QTAILQ_INSERT_TAIL(&s->ops_in_flight, op, next);
1467  
1468      s->in_active_write_counter++;
1469  
1470      /*
1471       * Wait for concurrent requests affecting the area.  If there are already
1472       * running requests that are copying off now-to-be stale data in the area,
1473       * we must wait for them to finish before we begin writing fresh data to the
1474       * target so that the write operations appear in the correct order.
1475       * Note that background requests (see mirror_iteration()) in contrast only
1476       * wait for conflicting requests at the start of the dirty area, and then
1477       * (based on the in_flight_bitmap) truncate the area to copy so it will not
1478       * conflict with any requests beyond that.  For active writes, however, we
1479       * cannot truncate that area.  The request from our parent must be blocked
1480       * until the area is copied in full.  Therefore, we must wait for the whole
1481       * area to become free of concurrent requests.
1482       */
1483      mirror_wait_on_conflicts(op, s, offset, bytes);
1484  
1485      bitmap_set(s->in_flight_bitmap, start_chunk, end_chunk - start_chunk);
1486  
1487      return op;
1488  }
1489  
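/*
 * Rough sketch of how active_write_prepare() and active_write_settle() are
 * meant to be paired (see bdrv_mirror_top_do_write() below; illustrative
 * only, error handling trimmed):
 *
 *     op = active_write_prepare(s->job, offset, bytes);
 *     ret = <write the data to the source node>;
 *     if (ret >= 0) {
 *         do_sync_target_write(s->job, method, offset, bytes, qiov, flags);
 *     }
 *     active_write_settle(op);
 */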
1490  static void coroutine_fn GRAPH_RDLOCK active_write_settle(MirrorOp *op)
1491  {
1492      uint64_t start_chunk = op->offset / op->s->granularity;
1493      uint64_t end_chunk = DIV_ROUND_UP(op->offset + op->bytes,
1494                                        op->s->granularity);
1495  
1496      if (!--op->s->in_active_write_counter &&
1497          qatomic_read(&op->s->actively_synced)) {
1498          BdrvChild *source = op->s->mirror_top_bs->backing;
1499  
1500          if (QLIST_FIRST(&source->bs->parents) == source &&
1501              QLIST_NEXT(source, next_parent) == NULL)
1502          {
1503              /* Assert that we are back in sync once all active write
1504               * operations are settled.
1505               * Note that we can only assert this if the mirror node
1506               * is the source node's only parent. */
1507              assert(!bdrv_get_dirty_count(op->s->dirty_bitmap));
1508          }
1509      }
1510      bitmap_clear(op->s->in_flight_bitmap, start_chunk, end_chunk - start_chunk);
1511      QTAILQ_REMOVE(&op->s->ops_in_flight, op, next);
1512      qemu_co_queue_restart_all(&op->waiting_requests);
1513      g_free(op);
1514  }
1515  
1516  static int coroutine_fn GRAPH_RDLOCK
1517  bdrv_mirror_top_preadv(BlockDriverState *bs, int64_t offset, int64_t bytes,
1518                         QEMUIOVector *qiov, BdrvRequestFlags flags)
1519  {
1520      return bdrv_co_preadv(bs->backing, offset, bytes, qiov, flags);
1521  }
1522  
1523  static bool should_copy_to_target(MirrorBDSOpaque *s)
1524  {
1525      return s->job && s->job->ret >= 0 &&
1526          !job_is_cancelled(&s->job->common.job) &&
1527          qatomic_read(&s->job->copy_mode) == MIRROR_COPY_MODE_WRITE_BLOCKING;
1528  }
1529  
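/*
 * bdrv_mirror_top_do_write() below always writes to the source (bs->backing)
 * first.  When copy_to_target is false, the written range is only marked
 * dirty so that the background iteration picks it up later; when it is true
 * and the source write succeeds, the same range is immediately mirrored to
 * the target via do_sync_target_write().
 */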
1530  static int coroutine_fn GRAPH_RDLOCK
1531  bdrv_mirror_top_do_write(BlockDriverState *bs, MirrorMethod method,
1532                           bool copy_to_target, uint64_t offset, uint64_t bytes,
1533                           QEMUIOVector *qiov, int flags)
1534  {
1535      MirrorOp *op = NULL;
1536      MirrorBDSOpaque *s = bs->opaque;
1537      int ret = 0;
1538  
1539      if (copy_to_target) {
1540          op = active_write_prepare(s->job, offset, bytes);
1541      }
1542  
1543      switch (method) {
1544      case MIRROR_METHOD_COPY:
1545          ret = bdrv_co_pwritev(bs->backing, offset, bytes, qiov, flags);
1546          break;
1547  
1548      case MIRROR_METHOD_ZERO:
1549          ret = bdrv_co_pwrite_zeroes(bs->backing, offset, bytes, flags);
1550          break;
1551  
1552      case MIRROR_METHOD_DISCARD:
1553          ret = bdrv_co_pdiscard(bs->backing, offset, bytes);
1554          break;
1555  
1556      default:
1557          abort();
1558      }
1559  
1560      if (!copy_to_target && s->job && s->job->dirty_bitmap) {
1561          qatomic_set(&s->job->actively_synced, false);
1562          bdrv_set_dirty_bitmap(s->job->dirty_bitmap, offset, bytes);
1563      }
1564  
1565      if (ret < 0) {
1566          goto out;
1567      }
1568  
1569      if (copy_to_target) {
1570          do_sync_target_write(s->job, method, offset, bytes, qiov, flags);
1571      }
1572  
1573  out:
1574      if (copy_to_target) {
1575          active_write_settle(op);
1576      }
1577      return ret;
1578  }
1579  
1580  static int coroutine_fn GRAPH_RDLOCK
1581  bdrv_mirror_top_pwritev(BlockDriverState *bs, int64_t offset, int64_t bytes,
1582                          QEMUIOVector *qiov, BdrvRequestFlags flags)
1583  {
1584      QEMUIOVector bounce_qiov;
1585      void *bounce_buf;
1586      int ret = 0;
1587      bool copy_to_target = should_copy_to_target(bs->opaque);
1588  
1589      if (copy_to_target) {
1590          /* The guest might concurrently modify the data to write; but
1591           * the data on source and destination must match, so we have
1592           * to use a bounce buffer if we are going to write to the
1593           * target now. */
1594          bounce_buf = qemu_blockalign(bs, bytes);
1595          iov_to_buf_full(qiov->iov, qiov->niov, 0, bounce_buf, bytes);
1596  
1597          qemu_iovec_init(&bounce_qiov, 1);
1598          qemu_iovec_add(&bounce_qiov, bounce_buf, bytes);
1599          qiov = &bounce_qiov;
1600  
1601          flags &= ~BDRV_REQ_REGISTERED_BUF;
1602      }
1603  
1604      ret = bdrv_mirror_top_do_write(bs, MIRROR_METHOD_COPY, copy_to_target,
1605                                     offset, bytes, qiov, flags);
1606  
1607      if (copy_to_target) {
1608          qemu_iovec_destroy(&bounce_qiov);
1609          qemu_vfree(bounce_buf);
1610      }
1611  
1612      return ret;
1613  }
1614  
1615  static int coroutine_fn GRAPH_RDLOCK bdrv_mirror_top_flush(BlockDriverState *bs)
1616  {
1617      if (bs->backing == NULL) {
1618          /* we can be here after a failed bdrv_append in mirror_start_job */
1619          return 0;
1620      }
1621      return bdrv_co_flush(bs->backing->bs);
1622  }
1623  
1624  static int coroutine_fn GRAPH_RDLOCK
1625  bdrv_mirror_top_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
1626                                int64_t bytes, BdrvRequestFlags flags)
1627  {
1628      bool copy_to_target = should_copy_to_target(bs->opaque);
1629      return bdrv_mirror_top_do_write(bs, MIRROR_METHOD_ZERO, copy_to_target,
1630                                      offset, bytes, NULL, flags);
1631  }
1632  
1633  static int coroutine_fn GRAPH_RDLOCK
1634  bdrv_mirror_top_pdiscard(BlockDriverState *bs, int64_t offset, int64_t bytes)
1635  {
1636      bool copy_to_target = should_copy_to_target(bs->opaque);
1637      return bdrv_mirror_top_do_write(bs, MIRROR_METHOD_DISCARD, copy_to_target,
1638                                      offset, bytes, NULL, 0);
1639  }
1640  
1641  static void GRAPH_RDLOCK bdrv_mirror_top_refresh_filename(BlockDriverState *bs)
1642  {
1643      if (bs->backing == NULL) {
1644          /* we can be here after a failed bdrv_attach_child in
1645           * bdrv_set_backing_hd */
1646          return;
1647      }
1648      pstrcpy(bs->exact_filename, sizeof(bs->exact_filename),
1649              bs->backing->bs->filename);
1650  }
1651  
1652  static void bdrv_mirror_top_child_perm(BlockDriverState *bs, BdrvChild *c,
1653                                         BdrvChildRole role,
1654                                         BlockReopenQueue *reopen_queue,
1655                                         uint64_t perm, uint64_t shared,
1656                                         uint64_t *nperm, uint64_t *nshared)
1657  {
1658      MirrorBDSOpaque *s = bs->opaque;
1659  
1660      if (s->stop) {
1661          /*
1662           * If the job is to be stopped, we do not need to forward
1663           * anything to the real image.
1664           */
1665          *nperm = 0;
1666          *nshared = BLK_PERM_ALL;
1667          return;
1668      }
1669  
1670      bdrv_default_perms(bs, c, role, reopen_queue,
1671                         perm, shared, nperm, nshared);
1672  
1673      if (s->is_commit) {
1674          /*
1675           * For commit jobs, we cannot take CONSISTENT_READ, because
1676           * that permission is unshared for everything above the base
1677           * node (except for filters on the base node).
1678           * We also have to force-share the WRITE permission, or
1679           * otherwise we would block ourselves at the base node (if
1680           * writes are blocked for a node, they are also blocked for
1681           * its backing file).
1682           * (We could also share RESIZE, because it may be needed for
1683           * the target if its size is less than the top node's; but
1684           * bdrv_default_perms_for_cow() automatically shares RESIZE
1685           * for backing nodes if WRITE is shared, so there is no need
1686           * to do it here.)
1687           */
1688          *nperm &= ~BLK_PERM_CONSISTENT_READ;
1689          *nshared |= BLK_PERM_WRITE;
1690      }
1691  }
1692  
1693  /* Dummy node that provides consistent read to its users without requiring it
1694   * from its backing file and that allows writes on the backing file chain. */
1695  static BlockDriver bdrv_mirror_top = {
1696      .format_name                = "mirror_top",
1697      .bdrv_co_preadv             = bdrv_mirror_top_preadv,
1698      .bdrv_co_pwritev            = bdrv_mirror_top_pwritev,
1699      .bdrv_co_pwrite_zeroes      = bdrv_mirror_top_pwrite_zeroes,
1700      .bdrv_co_pdiscard           = bdrv_mirror_top_pdiscard,
1701      .bdrv_co_flush              = bdrv_mirror_top_flush,
1702      .bdrv_refresh_filename      = bdrv_mirror_top_refresh_filename,
1703      .bdrv_child_perm            = bdrv_mirror_top_child_perm,
1704  
1705      .is_filter                  = true,
1706      .filtered_child_is_backing  = true,
1707  };
1708  
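/*
 * Rough picture of the graph while a mirror job is running (illustrative
 * only, names simplified):
 *
 *       guest device and other parents
 *                    |
 *             mirror_top_bs    <- bdrv_mirror_top filter, inserted above the
 *                    |             source by mirror_start_job() via
 *                (backing)          bdrv_append()
 *                    |
 *                source bs  ==(job copies data)==>  target (s->target)
 */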
1709  static BlockJob *mirror_start_job(
1710                               const char *job_id, BlockDriverState *bs,
1711                               int creation_flags, BlockDriverState *target,
1712                               const char *replaces, int64_t speed,
1713                               uint32_t granularity, int64_t buf_size,
1714                               BlockMirrorBackingMode backing_mode,
1715                               bool zero_target,
1716                               BlockdevOnError on_source_error,
1717                               BlockdevOnError on_target_error,
1718                               bool unmap,
1719                               BlockCompletionFunc *cb,
1720                               void *opaque,
1721                               const BlockJobDriver *driver,
1722                               bool is_none_mode, BlockDriverState *base,
1723                               bool auto_complete, const char *filter_node_name,
1724                               bool is_mirror, MirrorCopyMode copy_mode,
1725                               bool base_ro,
1726                               Error **errp)
1727  {
1728      MirrorBlockJob *s;
1729      MirrorBDSOpaque *bs_opaque;
1730      BlockDriverState *mirror_top_bs;
1731      bool target_is_backing;
1732      uint64_t target_perms, target_shared_perms;
1733      int ret;
1734  
1735      GLOBAL_STATE_CODE();
1736  
1737      if (granularity == 0) {
1738          granularity = bdrv_get_default_bitmap_granularity(target);
1739      }
1740  
1741      assert(is_power_of_2(granularity));
1742  
1743      if (buf_size < 0) {
1744          error_setg(errp, "Invalid parameter 'buf-size'");
1745          return NULL;
1746      }
1747  
1748      if (buf_size == 0) {
1749          buf_size = DEFAULT_MIRROR_BUF_SIZE;
1750      }
1751  
1752      bdrv_graph_rdlock_main_loop();
1753      if (bdrv_skip_filters(bs) == bdrv_skip_filters(target)) {
1754          error_setg(errp, "Can't mirror node into itself");
1755          bdrv_graph_rdunlock_main_loop();
1756          return NULL;
1757      }
1758  
1759      target_is_backing = bdrv_chain_contains(bs, target);
1760      bdrv_graph_rdunlock_main_loop();
1761  
1762      /* In the case of active commit, add a dummy driver to provide consistent
1763       * reads on the top, while disabling consistent reads in the intermediate
1764       * nodes, and make the backing chain writable. */
1765      mirror_top_bs = bdrv_new_open_driver(&bdrv_mirror_top, filter_node_name,
1766                                           BDRV_O_RDWR, errp);
1767      if (mirror_top_bs == NULL) {
1768          return NULL;
1769      }
1770      if (!filter_node_name) {
1771          mirror_top_bs->implicit = true;
1772      }
1773  
1774      /* So that we can always drop this node */
1775      mirror_top_bs->never_freeze = true;
1776  
1777      mirror_top_bs->total_sectors = bs->total_sectors;
1778      mirror_top_bs->supported_write_flags = BDRV_REQ_WRITE_UNCHANGED;
1779      mirror_top_bs->supported_zero_flags = BDRV_REQ_WRITE_UNCHANGED |
1780                                            BDRV_REQ_NO_FALLBACK;
1781      bs_opaque = g_new0(MirrorBDSOpaque, 1);
1782      mirror_top_bs->opaque = bs_opaque;
1783  
1784      bs_opaque->is_commit = target_is_backing;
1785  
1786      bdrv_drained_begin(bs);
1787      ret = bdrv_append(mirror_top_bs, bs, errp);
1788      bdrv_drained_end(bs);
1789  
1790      if (ret < 0) {
1791          bdrv_unref(mirror_top_bs);
1792          return NULL;
1793      }
1794  
1795      /* Make sure that the source is not resized while the job is running */
1796      s = block_job_create(job_id, driver, NULL, mirror_top_bs,
1797                           BLK_PERM_CONSISTENT_READ,
1798                           BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE_UNCHANGED |
1799                           BLK_PERM_WRITE, speed,
1800                           creation_flags, cb, opaque, errp);
1801      if (!s) {
1802          goto fail;
1803      }
1804  
1805      /* The block job now has a reference to this node */
1806      bdrv_unref(mirror_top_bs);
1807  
1808      s->mirror_top_bs = mirror_top_bs;
1809      s->base_ro = base_ro;
1810  
1811      /* No resize for the target either; while the mirror is still running, a
1812       * consistent read isn't necessarily possible. We could possibly allow
1813       * writes and graph modifications, though it would likely defeat the
1814       * purpose of a mirror, so leave them blocked for now.
1815       *
1816       * In the case of active commit, things look a bit different, though,
1817       * because the target is an already populated backing file in active use.
1818       * We can allow anything except resize there. */
1819  
1820      target_perms = BLK_PERM_WRITE;
1821      target_shared_perms = BLK_PERM_WRITE_UNCHANGED;
1822  
1823      if (target_is_backing) {
1824          int64_t bs_size, target_size;
1825          bs_size = bdrv_getlength(bs);
1826          if (bs_size < 0) {
1827              error_setg_errno(errp, -bs_size,
1828                               "Could not inquire top image size");
1829              goto fail;
1830          }
1831  
1832          target_size = bdrv_getlength(target);
1833          if (target_size < 0) {
1834              error_setg_errno(errp, -target_size,
1835                               "Could not inquire base image size");
1836              goto fail;
1837          }
1838  
1839          if (target_size < bs_size) {
1840              target_perms |= BLK_PERM_RESIZE;
1841          }
1842  
1843          target_shared_perms |= BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE;
1844      } else {
1845          bdrv_graph_rdlock_main_loop();
1846          if (bdrv_chain_contains(bs, bdrv_skip_filters(target))) {
1847              /*
1848               * We may want to allow this in the future, but it would
1849               * require taking some extra care.
1850               */
1851              error_setg(errp, "Cannot mirror to a filter on top of a node in "
1852                         "the source's backing chain");
1853              bdrv_graph_rdunlock_main_loop();
1854              goto fail;
1855          }
1856          bdrv_graph_rdunlock_main_loop();
1857      }
1858  
1859      s->target = blk_new(s->common.job.aio_context,
1860                          target_perms, target_shared_perms);
1861      ret = blk_insert_bs(s->target, target, errp);
1862      if (ret < 0) {
1863          goto fail;
1864      }
1865      if (is_mirror) {
1866          /* XXX: Mirror target could be an NBD server of target QEMU in the case
1867           * of non-shared block migration. To allow migration completion, we
1868           * have to allow "inactivate" of the target BB.  When that happens, we
1869           * know the job is drained, and the vcpus are stopped, so no write
1870           * operation will be performed. The block layer already has assertions to
1871           * ensure that. */
1872          blk_set_force_allow_inactivate(s->target);
1873      }
1874      blk_set_allow_aio_context_change(s->target, true);
1875      blk_set_disable_request_queuing(s->target, true);
1876  
1877      bdrv_graph_rdlock_main_loop();
1878      s->replaces = g_strdup(replaces);
1879      s->on_source_error = on_source_error;
1880      s->on_target_error = on_target_error;
1881      s->is_none_mode = is_none_mode;
1882      s->backing_mode = backing_mode;
1883      s->zero_target = zero_target;
1884      qatomic_set(&s->copy_mode, copy_mode);
1885      s->base = base;
1886      s->base_overlay = bdrv_find_overlay(bs, base);
1887      s->granularity = granularity;
1888      s->buf_size = ROUND_UP(buf_size, granularity);
1889      s->unmap = unmap;
1890      if (auto_complete) {
1891          s->should_complete = true;
1892      }
1893      bdrv_graph_rdunlock_main_loop();
1894  
1895      s->dirty_bitmap = bdrv_create_dirty_bitmap(s->mirror_top_bs, granularity,
1896                                                 NULL, errp);
1897      if (!s->dirty_bitmap) {
1898          goto fail;
1899      }
1900  
1901      /*
1902       * The dirty bitmap is set by bdrv_mirror_top_do_write() when not in active
1903       * mode.
1904       */
1905      bdrv_disable_dirty_bitmap(s->dirty_bitmap);
1906  
1907      bdrv_graph_wrlock();
1908      ret = block_job_add_bdrv(&s->common, "source", bs, 0,
1909                               BLK_PERM_WRITE_UNCHANGED | BLK_PERM_WRITE |
1910                               BLK_PERM_CONSISTENT_READ,
1911                               errp);
1912      if (ret < 0) {
1913          bdrv_graph_wrunlock();
1914          goto fail;
1915      }
1916  
1917      /* Required permissions are already taken with blk_new() */
1918      block_job_add_bdrv(&s->common, "target", target, 0, BLK_PERM_ALL,
1919                         &error_abort);
1920  
1921      /* In commit_active_start() all intermediate nodes disappear, so
1922       * any jobs in them must be blocked */
1923      if (target_is_backing) {
1924          BlockDriverState *iter, *filtered_target;
1925          uint64_t iter_shared_perms;
1926  
1927          /*
1928           * The topmost node with
1929           * bdrv_skip_filters(filtered_target) == bdrv_skip_filters(target)
1930           */
1931          filtered_target = bdrv_cow_bs(bdrv_find_overlay(bs, target));
1932  
1933          assert(bdrv_skip_filters(filtered_target) ==
1934                 bdrv_skip_filters(target));
1935  
1936          /*
1937           * XXX BLK_PERM_WRITE needs to be allowed so we don't block
1938           * ourselves at s->base (if writes are blocked for a node, they are
1939           * also blocked for its backing file). The other option would be a
1940           * second filter driver above s->base (== target).
1941           */
1942          iter_shared_perms = BLK_PERM_WRITE_UNCHANGED | BLK_PERM_WRITE;
1943  
1944          for (iter = bdrv_filter_or_cow_bs(bs); iter != target;
1945               iter = bdrv_filter_or_cow_bs(iter))
1946          {
1947              if (iter == filtered_target) {
1948                  /*
1949                   * From here on, all nodes are filters on the base.
1950                   * This allows us to share BLK_PERM_CONSISTENT_READ.
1951                   */
1952                  iter_shared_perms |= BLK_PERM_CONSISTENT_READ;
1953              }
1954  
1955              ret = block_job_add_bdrv(&s->common, "intermediate node", iter, 0,
1956                                       iter_shared_perms, errp);
1957              if (ret < 0) {
1958                  bdrv_graph_wrunlock();
1959                  goto fail;
1960              }
1961          }
1962  
1963          if (bdrv_freeze_backing_chain(mirror_top_bs, target, errp) < 0) {
1964              bdrv_graph_wrunlock();
1965              goto fail;
1966          }
1967      }
1968      bdrv_graph_wrunlock();
1969  
1970      QTAILQ_INIT(&s->ops_in_flight);
1971  
1972      trace_mirror_start(bs, s, opaque);
1973      job_start(&s->common.job);
1974  
1975      return &s->common;
1976  
1977  fail:
1978      if (s) {
1979          /* Make sure this BDS does not go away until we have completed the graph
1980           * changes below */
1981          bdrv_ref(mirror_top_bs);
1982  
1983          g_free(s->replaces);
1984          blk_unref(s->target);
1985          bs_opaque->job = NULL;
1986          if (s->dirty_bitmap) {
1987              bdrv_release_dirty_bitmap(s->dirty_bitmap);
1988          }
1989          job_early_fail(&s->common.job);
1990      }
1991  
1992      bs_opaque->stop = true;
1993      bdrv_drained_begin(bs);
1994      bdrv_graph_wrlock();
1995      assert(mirror_top_bs->backing->bs == bs);
1996      bdrv_child_refresh_perms(mirror_top_bs, mirror_top_bs->backing,
1997                               &error_abort);
1998      bdrv_replace_node(mirror_top_bs, bs, &error_abort);
1999      bdrv_graph_wrunlock();
2000      bdrv_drained_end(bs);
2001  
2002      bdrv_unref(mirror_top_bs);
2003  
2004      return NULL;
2005  }
2006  
2007  void mirror_start(const char *job_id, BlockDriverState *bs,
2008                    BlockDriverState *target, const char *replaces,
2009                    int creation_flags, int64_t speed,
2010                    uint32_t granularity, int64_t buf_size,
2011                    MirrorSyncMode mode, BlockMirrorBackingMode backing_mode,
2012                    bool zero_target,
2013                    BlockdevOnError on_source_error,
2014                    BlockdevOnError on_target_error,
2015                    bool unmap, const char *filter_node_name,
2016                    MirrorCopyMode copy_mode, Error **errp)
2017  {
2018      bool is_none_mode;
2019      BlockDriverState *base;
2020  
2021      GLOBAL_STATE_CODE();
2022  
2023      if ((mode == MIRROR_SYNC_MODE_INCREMENTAL) ||
2024          (mode == MIRROR_SYNC_MODE_BITMAP)) {
2025          error_setg(errp, "Sync mode '%s' not supported",
2026                     MirrorSyncMode_str(mode));
2027          return;
2028      }
2029  
2030      bdrv_graph_rdlock_main_loop();
2031      is_none_mode = mode == MIRROR_SYNC_MODE_NONE;
2032      base = mode == MIRROR_SYNC_MODE_TOP ? bdrv_backing_chain_next(bs) : NULL;
2033      bdrv_graph_rdunlock_main_loop();
2034  
2035      mirror_start_job(job_id, bs, creation_flags, target, replaces,
2036                       speed, granularity, buf_size, backing_mode, zero_target,
2037                       on_source_error, on_target_error, unmap, NULL, NULL,
2038                       &mirror_job_driver, is_none_mode, base, false,
2039                       filter_node_name, true, copy_mode, false, errp);
2040  }
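/*
 * Illustrative invocation of mirror_start() (hypothetical values; the real
 * callers live in the blockdev layer):
 *
 *     mirror_start("mirror0", bs, target_bs, NULL, JOB_DEFAULT, 0, 0, 0,
 *                  MIRROR_SYNC_MODE_FULL, MIRROR_OPEN_BACKING_CHAIN, true,
 *                  BLOCKDEV_ON_ERROR_REPORT, BLOCKDEV_ON_ERROR_REPORT, true,
 *                  NULL, MIRROR_COPY_MODE_BACKGROUND, &local_err);
 *
 * i.e. a full background-mode mirror with default speed, granularity and
 * buffer size that zero-initializes and may unmap the target.
 */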
2041  
2042  BlockJob *commit_active_start(const char *job_id, BlockDriverState *bs,
2043                                BlockDriverState *base, int creation_flags,
2044                                int64_t speed, BlockdevOnError on_error,
2045                                const char *filter_node_name,
2046                                BlockCompletionFunc *cb, void *opaque,
2047                                bool auto_complete, Error **errp)
2048  {
2049      bool base_read_only;
2050      BlockJob *job;
2051  
2052      GLOBAL_STATE_CODE();
2053  
2054      base_read_only = bdrv_is_read_only(base);
2055  
2056      if (base_read_only) {
2057          if (bdrv_reopen_set_read_only(base, false, errp) < 0) {
2058              return NULL;
2059          }
2060      }
2061  
2062      job = mirror_start_job(
2063                       job_id, bs, creation_flags, base, NULL, speed, 0, 0,
2064                       MIRROR_LEAVE_BACKING_CHAIN, false,
2065                       on_error, on_error, true, cb, opaque,
2066                       &commit_active_job_driver, false, base, auto_complete,
2067                       filter_node_name, false, MIRROR_COPY_MODE_BACKGROUND,
2068                       base_read_only, errp);
2069      if (!job) {
2070          goto error_restore_flags;
2071      }
2072  
2073      return job;
2074  
2075  error_restore_flags:
2076      /* ignore error and errp for bdrv_reopen, because we want to propagate
2077       * the original error */
2078      if (base_read_only) {
2079          bdrv_reopen_set_read_only(base, true, NULL);
2080      }
2081      return NULL;
2082  }
2083