xref: /openbmc/qemu/block/mirror.c (revision b8f218ef)
1 /*
2  * Image mirroring
3  *
4  * Copyright Red Hat, Inc. 2012
5  *
6  * Authors:
7  *  Paolo Bonzini  <pbonzini@redhat.com>
8  *
9  * This work is licensed under the terms of the GNU LGPL, version 2 or later.
10  * See the COPYING.LIB file in the top-level directory.
11  *
12  */
13 
14 #include "qemu/osdep.h"
15 #include "qemu/cutils.h"
16 #include "qemu/coroutine.h"
17 #include "qemu/range.h"
18 #include "trace.h"
19 #include "block/blockjob_int.h"
20 #include "block/block_int.h"
21 #include "block/dirty-bitmap.h"
22 #include "sysemu/block-backend.h"
23 #include "qapi/error.h"
24 #include "qapi/qmp/qerror.h"
25 #include "qemu/ratelimit.h"
26 #include "qemu/bitmap.h"
27 #include "qemu/memalign.h"
28 
29 #define MAX_IN_FLIGHT 16
30 #define MAX_IO_BYTES (1 << 20) /* 1 MiB */
31 #define DEFAULT_MIRROR_BUF_SIZE (MAX_IN_FLIGHT * MAX_IO_BYTES)
32 
33 /* The mirroring buffer is carved into granularity-sized chunks.
34  * Free chunks are organized in a free list.
35  */
36 typedef struct MirrorBuffer {
37     QSIMPLEQ_ENTRY(MirrorBuffer) next;
38 } MirrorBuffer;
39 
40 typedef struct MirrorOp MirrorOp;
41 
42 typedef struct MirrorBlockJob {
43     BlockJob common;
44     BlockBackend *target;
45     BlockDriverState *mirror_top_bs;
46     BlockDriverState *base;
47     BlockDriverState *base_overlay;
48 
49     /* The name of the graph node to replace */
50     char *replaces;
51     /* The BDS to replace */
52     BlockDriverState *to_replace;
53     /* Used to block operations on the drive-mirror-replace target */
54     Error *replace_blocker;
55     bool is_none_mode;
56     BlockMirrorBackingMode backing_mode;
57     /* Whether the target image requires explicit zero-initialization */
58     bool zero_target;
59     MirrorCopyMode copy_mode;
60     BlockdevOnError on_source_error, on_target_error;
61     /* Set when the target is synced (dirty bitmap is clean, nothing
62      * in flight) and the job is running in active mode */
63     bool actively_synced;
64     bool should_complete;
65     int64_t granularity;
66     size_t buf_size;
67     int64_t bdev_length;
68     unsigned long *cow_bitmap;
69     BdrvDirtyBitmap *dirty_bitmap;
70     BdrvDirtyBitmapIter *dbi;
71     uint8_t *buf;
72     QSIMPLEQ_HEAD(, MirrorBuffer) buf_free;
73     int buf_free_count;
74 
75     uint64_t last_pause_ns;
76     unsigned long *in_flight_bitmap;
77     unsigned in_flight;
78     int64_t bytes_in_flight;
79     QTAILQ_HEAD(, MirrorOp) ops_in_flight;
80     int ret;
81     bool unmap;
82     int target_cluster_size;
83     int max_iov;
84     bool initial_zeroing_ongoing;
85     int in_active_write_counter;
86     int64_t active_write_bytes_in_flight;
87     bool prepared;
88     bool in_drain;
89 } MirrorBlockJob;
90 
91 typedef struct MirrorBDSOpaque {
92     MirrorBlockJob *job;
93     bool stop;
94     bool is_commit;
95 } MirrorBDSOpaque;
96 
97 struct MirrorOp {
98     MirrorBlockJob *s;
99     QEMUIOVector qiov;
100     int64_t offset;
101     uint64_t bytes;
102 
103     /* The pointee is set by mirror_co_read(), mirror_co_zero(), and
104      * mirror_co_discard() before yielding for the first time */
105     int64_t *bytes_handled;
106 
107     bool is_pseudo_op;
108     bool is_active_write;
109     bool is_in_flight;
110     CoQueue waiting_requests;
111     Coroutine *co;
112     MirrorOp *waiting_for_op;
113 
114     QTAILQ_ENTRY(MirrorOp) next;
115 };
116 
117 typedef enum MirrorMethod {
118     MIRROR_METHOD_COPY,
119     MIRROR_METHOD_ZERO,
120     MIRROR_METHOD_DISCARD,
121 } MirrorMethod;
122 
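/* Map a failed source read or target write to the action configured in
 * s->on_source_error or s->on_target_error; any error means the target is
 * no longer actively synced. */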
123 static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
124                                             int error)
125 {
126     s->actively_synced = false;
127     if (read) {
128         return block_job_error_action(&s->common, s->on_source_error,
129                                       true, error);
130     } else {
131         return block_job_error_action(&s->common, s->on_target_error,
132                                       false, error);
133     }
134 }
135 
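/*
 * Wait until no in-flight operation overlaps the chunk range covering
 * [offset, offset + bytes), or the job has failed.  If @self is non-NULL,
 * record which operation we are waiting for so that two operations waiting
 * on each other do not deadlock.
 */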
136 static void coroutine_fn mirror_wait_on_conflicts(MirrorOp *self,
137                                                   MirrorBlockJob *s,
138                                                   uint64_t offset,
139                                                   uint64_t bytes)
140 {
141     uint64_t self_start_chunk = offset / s->granularity;
142     uint64_t self_end_chunk = DIV_ROUND_UP(offset + bytes, s->granularity);
143     uint64_t self_nb_chunks = self_end_chunk - self_start_chunk;
144 
145     while (find_next_bit(s->in_flight_bitmap, self_end_chunk,
146                          self_start_chunk) < self_end_chunk &&
147            s->ret >= 0)
148     {
149         MirrorOp *op;
150 
151         QTAILQ_FOREACH(op, &s->ops_in_flight, next) {
152             uint64_t op_start_chunk = op->offset / s->granularity;
153             uint64_t op_nb_chunks = DIV_ROUND_UP(op->offset + op->bytes,
154                                                  s->granularity) -
155                                     op_start_chunk;
156 
157             if (op == self) {
158                 continue;
159             }
160 
161             if (ranges_overlap(self_start_chunk, self_nb_chunks,
162                                op_start_chunk, op_nb_chunks))
163             {
164                 if (self) {
165                     /*
166                      * If the operation is already (indirectly) waiting for us,
167                      * or will wait for us as soon as it wakes up, then just go
168                      * on (instead of producing a deadlock in the former case).
169                      */
170                     if (op->waiting_for_op) {
171                         continue;
172                     }
173 
174                     self->waiting_for_op = op;
175                 }
176 
177                 qemu_co_queue_wait(&op->waiting_requests, NULL);
178 
179                 if (self) {
180                     self->waiting_for_op = NULL;
181                 }
182 
183                 break;
184             }
185         }
186     }
187 }
188 
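/*
 * Finish one background operation: return its buffer chunks to the free
 * list, clear its bits in the in-flight bitmap, update the job progress on
 * success, wake up waiting requests and free the MirrorOp.
 */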
189 static void coroutine_fn mirror_iteration_done(MirrorOp *op, int ret)
190 {
191     MirrorBlockJob *s = op->s;
192     struct iovec *iov;
193     int64_t chunk_num;
194     int i, nb_chunks;
195 
196     trace_mirror_iteration_done(s, op->offset, op->bytes, ret);
197 
198     s->in_flight--;
199     s->bytes_in_flight -= op->bytes;
200     iov = op->qiov.iov;
201     for (i = 0; i < op->qiov.niov; i++) {
202         MirrorBuffer *buf = (MirrorBuffer *) iov[i].iov_base;
203         QSIMPLEQ_INSERT_TAIL(&s->buf_free, buf, next);
204         s->buf_free_count++;
205     }
206 
207     chunk_num = op->offset / s->granularity;
208     nb_chunks = DIV_ROUND_UP(op->bytes, s->granularity);
209 
210     bitmap_clear(s->in_flight_bitmap, chunk_num, nb_chunks);
211     QTAILQ_REMOVE(&s->ops_in_flight, op, next);
212     if (ret >= 0) {
213         if (s->cow_bitmap) {
214             bitmap_set(s->cow_bitmap, chunk_num, nb_chunks);
215         }
216         if (!s->initial_zeroing_ongoing) {
217             job_progress_update(&s->common.job, op->bytes);
218         }
219     }
220     qemu_iovec_destroy(&op->qiov);
221 
222     qemu_co_queue_restart_all(&op->waiting_requests);
223     g_free(op);
224 }
225 
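/* Completion of a write to the target: on error, re-mark the area dirty and
 * apply the configured error policy before finishing the operation. */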
226 static void coroutine_fn mirror_write_complete(MirrorOp *op, int ret)
227 {
228     MirrorBlockJob *s = op->s;
229 
230     if (ret < 0) {
231         BlockErrorAction action;
232 
233         bdrv_set_dirty_bitmap(s->dirty_bitmap, op->offset, op->bytes);
234         action = mirror_error_action(s, false, -ret);
235         if (action == BLOCK_ERROR_ACTION_REPORT && s->ret >= 0) {
236             s->ret = ret;
237         }
238     }
239 
240     mirror_iteration_done(op, ret);
241 }
242 
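/* Completion of a read from the source: on success, write the data out to
 * the target; on error, re-mark the area dirty and apply the error policy. */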
243 static void coroutine_fn mirror_read_complete(MirrorOp *op, int ret)
244 {
245     MirrorBlockJob *s = op->s;
246 
247     if (ret < 0) {
248         BlockErrorAction action;
249 
250         bdrv_set_dirty_bitmap(s->dirty_bitmap, op->offset, op->bytes);
251         action = mirror_error_action(s, true, -ret);
252         if (action == BLOCK_ERROR_ACTION_REPORT && s->ret >= 0) {
253             s->ret = ret;
254         }
255 
256         mirror_iteration_done(op, ret);
257         return;
258     }
259 
260     ret = blk_co_pwritev(s->target, op->offset, op->qiov.size, &op->qiov, 0);
261     mirror_write_complete(op, ret);
262 }
263 
264 /* Clip bytes relative to offset to not exceed end-of-file */
265 static inline int64_t mirror_clip_bytes(MirrorBlockJob *s,
266                                         int64_t offset,
267                                         int64_t bytes)
268 {
269     return MIN(bytes, s->bdev_length - offset);
270 }
271 
272 /* Round offset and/or bytes to the target cluster size if COW is needed, and
273  * return the offset of the adjusted tail relative to the original end. */
274 static int mirror_cow_align(MirrorBlockJob *s, int64_t *offset,
275                             uint64_t *bytes)
276 {
277     bool need_cow;
278     int ret = 0;
279     int64_t align_offset = *offset;
280     int64_t align_bytes = *bytes;
281     int max_bytes = s->granularity * s->max_iov;
282 
283     need_cow = !test_bit(*offset / s->granularity, s->cow_bitmap);
284     need_cow |= !test_bit((*offset + *bytes - 1) / s->granularity,
285                           s->cow_bitmap);
286     if (need_cow) {
287         bdrv_round_to_clusters(blk_bs(s->target), *offset, *bytes,
288                                &align_offset, &align_bytes);
289     }
290 
291     if (align_bytes > max_bytes) {
292         align_bytes = max_bytes;
293         if (need_cow) {
294             align_bytes = QEMU_ALIGN_DOWN(align_bytes, s->target_cluster_size);
295         }
296     }
297     /* Clipping may leave align_bytes unaligned to the chunk boundary, but
298      * that doesn't matter because it is already at the end of the source image. */
299     align_bytes = mirror_clip_bytes(s, align_offset, align_bytes);
300 
301     ret = align_offset + align_bytes - (*offset + *bytes);
302     *offset = align_offset;
303     *bytes = align_bytes;
304     assert(ret >= 0);
305     return ret;
306 }
307 
308 static inline void coroutine_fn
309 mirror_wait_for_free_in_flight_slot(MirrorBlockJob *s)
310 {
311     MirrorOp *op;
312 
313     QTAILQ_FOREACH(op, &s->ops_in_flight, next) {
314         /*
315          * Do not wait on pseudo ops, because it may in turn wait on
316          * some other operation to start, which may in fact be the
317          * caller of this function.  Since there is only one pseudo op
318          * at any given time, we will always find some real operation
319          * to wait on.
320          * Also, do not wait on active operations, because they do not
321          * use up in-flight slots.
322          */
323         if (!op->is_pseudo_op && op->is_in_flight && !op->is_active_write) {
324             qemu_co_queue_wait(&op->waiting_requests, NULL);
325             return;
326         }
327     }
328     abort();
329 }
330 
331 /* Perform a mirror copy operation.
332  *
333  * *op->bytes_handled is set to the number of bytes copied after and
334  * including offset, excluding any bytes copied prior to offset due
335  * to alignment.  This will be op->bytes if no alignment is necessary,
336  * or (new_end - op->offset) if the tail is rounded up or down due to
337  * alignment or buffer limit.
338  */
339 static void coroutine_fn mirror_co_read(void *opaque)
340 {
341     MirrorOp *op = opaque;
342     MirrorBlockJob *s = op->s;
343     int nb_chunks;
344     uint64_t ret;
345     uint64_t max_bytes;
346 
347     max_bytes = s->granularity * s->max_iov;
348 
349     /* We can only handle as much as buf_size at a time. */
350     op->bytes = MIN(s->buf_size, MIN(max_bytes, op->bytes));
351     assert(op->bytes);
352     assert(op->bytes < BDRV_REQUEST_MAX_BYTES);
353     *op->bytes_handled = op->bytes;
354 
355     if (s->cow_bitmap) {
356         *op->bytes_handled += mirror_cow_align(s, &op->offset, &op->bytes);
357     }
358     /* Cannot exceed BDRV_REQUEST_MAX_BYTES + INT_MAX */
359     assert(*op->bytes_handled <= UINT_MAX);
360     assert(op->bytes <= s->buf_size);
361     /* The offset is granularity-aligned because:
362      * 1) Caller passes in aligned values;
363      * 2) mirror_cow_align is used only when target cluster is larger. */
364     assert(QEMU_IS_ALIGNED(op->offset, s->granularity));
365     /* The range is sector-aligned, since bdrv_getlength() rounds up. */
366     assert(QEMU_IS_ALIGNED(op->bytes, BDRV_SECTOR_SIZE));
367     nb_chunks = DIV_ROUND_UP(op->bytes, s->granularity);
368 
369     while (s->buf_free_count < nb_chunks) {
370         trace_mirror_yield_in_flight(s, op->offset, s->in_flight);
371         mirror_wait_for_free_in_flight_slot(s);
372     }
373 
374     /* Now make a QEMUIOVector taking enough granularity-sized chunks
375      * from s->buf_free.
376      */
377     qemu_iovec_init(&op->qiov, nb_chunks);
378     while (nb_chunks-- > 0) {
379         MirrorBuffer *buf = QSIMPLEQ_FIRST(&s->buf_free);
380         size_t remaining = op->bytes - op->qiov.size;
381 
382         QSIMPLEQ_REMOVE_HEAD(&s->buf_free, next);
383         s->buf_free_count--;
384         qemu_iovec_add(&op->qiov, buf, MIN(s->granularity, remaining));
385     }
386 
387     /* Copy the dirty cluster.  */
388     s->in_flight++;
389     s->bytes_in_flight += op->bytes;
390     op->is_in_flight = true;
391     trace_mirror_one_iteration(s, op->offset, op->bytes);
392 
393     WITH_GRAPH_RDLOCK_GUARD() {
394         ret = bdrv_co_preadv(s->mirror_top_bs->backing, op->offset, op->bytes,
395                              &op->qiov, 0);
396     }
397     mirror_read_complete(op, ret);
398 }
399 
400 static void coroutine_fn mirror_co_zero(void *opaque)
401 {
402     MirrorOp *op = opaque;
403     int ret;
404 
405     op->s->in_flight++;
406     op->s->bytes_in_flight += op->bytes;
407     *op->bytes_handled = op->bytes;
408     op->is_in_flight = true;
409 
410     ret = blk_co_pwrite_zeroes(op->s->target, op->offset, op->bytes,
411                                op->s->unmap ? BDRV_REQ_MAY_UNMAP : 0);
412     mirror_write_complete(op, ret);
413 }
414 
415 static void coroutine_fn mirror_co_discard(void *opaque)
416 {
417     MirrorOp *op = opaque;
418     int ret;
419 
420     op->s->in_flight++;
421     op->s->bytes_in_flight += op->bytes;
422     *op->bytes_handled = op->bytes;
423     op->is_in_flight = true;
424 
425     ret = blk_co_pdiscard(op->s->target, op->offset, op->bytes);
426     mirror_write_complete(op, ret);
427 }
428 
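/*
 * Launch one copy, zero or discard operation in its own coroutine.  Returns
 * the number of bytes the operation actually covers, which may differ from
 * @bytes because of alignment and buffer-size limits.
 */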
429 static unsigned mirror_perform(MirrorBlockJob *s, int64_t offset,
430                                unsigned bytes, MirrorMethod mirror_method)
431 {
432     MirrorOp *op;
433     Coroutine *co;
434     int64_t bytes_handled = -1;
435 
436     op = g_new(MirrorOp, 1);
437     *op = (MirrorOp){
438         .s              = s,
439         .offset         = offset,
440         .bytes          = bytes,
441         .bytes_handled  = &bytes_handled,
442     };
443     qemu_co_queue_init(&op->waiting_requests);
444 
445     switch (mirror_method) {
446     case MIRROR_METHOD_COPY:
447         co = qemu_coroutine_create(mirror_co_read, op);
448         break;
449     case MIRROR_METHOD_ZERO:
450         co = qemu_coroutine_create(mirror_co_zero, op);
451         break;
452     case MIRROR_METHOD_DISCARD:
453         co = qemu_coroutine_create(mirror_co_discard, op);
454         break;
455     default:
456         abort();
457     }
458     op->co = co;
459 
460     QTAILQ_INSERT_TAIL(&s->ops_in_flight, op, next);
461     qemu_coroutine_enter(co);
462     /* At this point, ownership of op has been moved to the coroutine
463      * and the object may already be freed */
464 
465     /* Assert that this value has been set */
466     assert(bytes_handled >= 0);
467 
468     /* Same assertion as in mirror_co_read() (and for mirror_co_read()
469      * and mirror_co_discard(), bytes_handled == op->bytes, which
470      * is the @bytes parameter given to this function) */
471     assert(bytes_handled <= UINT_MAX);
472     return bytes_handled;
473 }
474 
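/*
 * One pass of the background copy loop: find the next run of dirty chunks,
 * clear their dirty bits, claim the area in the in-flight bitmap through a
 * pseudo operation, and launch copy/zero/discard operations for it.
 * Returns the rate-limit delay in nanoseconds for the caller to sleep.
 */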
475 static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s)
476 {
477     BlockDriverState *source = s->mirror_top_bs->backing->bs;
478     MirrorOp *pseudo_op;
479     int64_t offset;
480     uint64_t delay_ns = 0, ret = 0;
481     /* At least the first dirty chunk is mirrored in one iteration. */
482     int nb_chunks = 1;
483     bool write_zeroes_ok = bdrv_can_write_zeroes_with_unmap(blk_bs(s->target));
484     int max_io_bytes = MAX(s->buf_size / MAX_IN_FLIGHT, MAX_IO_BYTES);
485 
486     bdrv_dirty_bitmap_lock(s->dirty_bitmap);
487     offset = bdrv_dirty_iter_next(s->dbi);
488     if (offset < 0) {
489         bdrv_set_dirty_iter(s->dbi, 0);
490         offset = bdrv_dirty_iter_next(s->dbi);
491         trace_mirror_restart_iter(s, bdrv_get_dirty_count(s->dirty_bitmap));
492         assert(offset >= 0);
493     }
494     bdrv_dirty_bitmap_unlock(s->dirty_bitmap);
495 
496     /*
497      * Wait for concurrent requests to @offset.  The next loop will limit the
498      * copied area based on in_flight_bitmap so we only copy an area that does
499      * not overlap with concurrent in-flight requests.  Still, we would like to
500      * copy something, so wait at least until there are no more requests to the
501      * very beginning of the area.
502      */
503     mirror_wait_on_conflicts(NULL, s, offset, 1);
504 
505     job_pause_point(&s->common.job);
506 
507     /* Find the number of consecutive dirty chunks following the first dirty
508      * one, and wait for in-flight requests in them. */
509     bdrv_dirty_bitmap_lock(s->dirty_bitmap);
510     while (nb_chunks * s->granularity < s->buf_size) {
511         int64_t next_dirty;
512         int64_t next_offset = offset + nb_chunks * s->granularity;
513         int64_t next_chunk = next_offset / s->granularity;
514         if (next_offset >= s->bdev_length ||
515             !bdrv_dirty_bitmap_get_locked(s->dirty_bitmap, next_offset)) {
516             break;
517         }
518         if (test_bit(next_chunk, s->in_flight_bitmap)) {
519             break;
520         }
521 
522         next_dirty = bdrv_dirty_iter_next(s->dbi);
523         if (next_dirty > next_offset || next_dirty < 0) {
524             /* The bitmap iterator's cache is stale, refresh it */
525             bdrv_set_dirty_iter(s->dbi, next_offset);
526             next_dirty = bdrv_dirty_iter_next(s->dbi);
527         }
528         assert(next_dirty == next_offset);
529         nb_chunks++;
530     }
531 
532     /* Clear dirty bits before querying the block status, because
533      * calling bdrv_block_status_above could yield - if some blocks are
534      * marked dirty in this window, we need to know.
535      */
536     bdrv_reset_dirty_bitmap_locked(s->dirty_bitmap, offset,
537                                    nb_chunks * s->granularity);
538     bdrv_dirty_bitmap_unlock(s->dirty_bitmap);
539 
540     /* Before claiming an area in the in-flight bitmap, we have to
541      * create a MirrorOp for it so that conflicting requests can wait
542      * for it.  mirror_perform() will create the real MirrorOps later,
543      * for now we just create a pseudo operation that will wake up all
544      * conflicting requests once all real operations have been
545      * launched. */
546     pseudo_op = g_new(MirrorOp, 1);
547     *pseudo_op = (MirrorOp){
548         .offset         = offset,
549         .bytes          = nb_chunks * s->granularity,
550         .is_pseudo_op   = true,
551     };
552     qemu_co_queue_init(&pseudo_op->waiting_requests);
553     QTAILQ_INSERT_TAIL(&s->ops_in_flight, pseudo_op, next);
554 
555     bitmap_set(s->in_flight_bitmap, offset / s->granularity, nb_chunks);
556     while (nb_chunks > 0 && offset < s->bdev_length) {
557         int ret;
558         int64_t io_bytes;
559         int64_t io_bytes_acct;
560         MirrorMethod mirror_method = MIRROR_METHOD_COPY;
561 
562         assert(!(offset % s->granularity));
563         WITH_GRAPH_RDLOCK_GUARD() {
564             ret = bdrv_block_status_above(source, NULL, offset,
565                                         nb_chunks * s->granularity,
566                                         &io_bytes, NULL, NULL);
567         }
568         if (ret < 0) {
569             io_bytes = MIN(nb_chunks * s->granularity, max_io_bytes);
570         } else if (ret & BDRV_BLOCK_DATA) {
571             io_bytes = MIN(io_bytes, max_io_bytes);
572         }
573 
574         io_bytes -= io_bytes % s->granularity;
575         if (io_bytes < s->granularity) {
576             io_bytes = s->granularity;
577         } else if (ret >= 0 && !(ret & BDRV_BLOCK_DATA)) {
578             int64_t target_offset;
579             int64_t target_bytes;
580             bdrv_round_to_clusters(blk_bs(s->target), offset, io_bytes,
581                                    &target_offset, &target_bytes);
582             if (target_offset == offset &&
583                 target_bytes == io_bytes) {
584                 mirror_method = ret & BDRV_BLOCK_ZERO ?
585                                     MIRROR_METHOD_ZERO :
586                                     MIRROR_METHOD_DISCARD;
587             }
588         }
589 
590         while (s->in_flight >= MAX_IN_FLIGHT) {
591             trace_mirror_yield_in_flight(s, offset, s->in_flight);
592             mirror_wait_for_free_in_flight_slot(s);
593         }
594 
595         if (s->ret < 0) {
596             ret = 0;
597             goto fail;
598         }
599 
600         io_bytes = mirror_clip_bytes(s, offset, io_bytes);
601         io_bytes = mirror_perform(s, offset, io_bytes, mirror_method);
602         if (mirror_method != MIRROR_METHOD_COPY && write_zeroes_ok) {
603             io_bytes_acct = 0;
604         } else {
605             io_bytes_acct = io_bytes;
606         }
607         assert(io_bytes);
608         offset += io_bytes;
609         nb_chunks -= DIV_ROUND_UP(io_bytes, s->granularity);
610         delay_ns = block_job_ratelimit_get_delay(&s->common, io_bytes_acct);
611     }
612 
613     ret = delay_ns;
614 fail:
615     QTAILQ_REMOVE(&s->ops_in_flight, pseudo_op, next);
616     qemu_co_queue_restart_all(&pseudo_op->waiting_requests);
617     g_free(pseudo_op);
618 
619     return ret;
620 }
621 
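/* Carve s->buf into granularity-sized chunks and put them all on the
 * s->buf_free list. */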
622 static void mirror_free_init(MirrorBlockJob *s)
623 {
624     int granularity = s->granularity;
625     size_t buf_size = s->buf_size;
626     uint8_t *buf = s->buf;
627 
628     assert(s->buf_free_count == 0);
629     QSIMPLEQ_INIT(&s->buf_free);
630     while (buf_size != 0) {
631         MirrorBuffer *cur = (MirrorBuffer *)buf;
632         QSIMPLEQ_INSERT_TAIL(&s->buf_free, cur, next);
633         s->buf_free_count++;
634         buf_size -= granularity;
635         buf += granularity;
636     }
637 }
638 
639 /* This is also used for the .pause callback. There is no matching
640  * mirror_resume() because mirror_run() will begin iterating again
641  * when the job is resumed.
642  */
643 static void coroutine_fn mirror_wait_for_all_io(MirrorBlockJob *s)
644 {
645     while (s->in_flight > 0) {
646         mirror_wait_for_free_in_flight_slot(s);
647     }
648 }
649 
650 /**
651  * mirror_exit_common: handle both abort() and prepare() cases.
652  * For .prepare, returns 0 on success and -errno on failure.
653  * For .abort cases, denoted by abort = true, MUST return 0.
654  */
655 static int mirror_exit_common(Job *job)
656 {
657     MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
658     BlockJob *bjob = &s->common;
659     MirrorBDSOpaque *bs_opaque;
660     AioContext *replace_aio_context = NULL;
661     BlockDriverState *src;
662     BlockDriverState *target_bs;
663     BlockDriverState *mirror_top_bs;
664     Error *local_err = NULL;
665     bool abort = job->ret < 0;
666     int ret = 0;
667 
668     if (s->prepared) {
669         return 0;
670     }
671     s->prepared = true;
672 
673     mirror_top_bs = s->mirror_top_bs;
674     bs_opaque = mirror_top_bs->opaque;
675     src = mirror_top_bs->backing->bs;
676     target_bs = blk_bs(s->target);
677 
678     if (bdrv_chain_contains(src, target_bs)) {
679         bdrv_unfreeze_backing_chain(mirror_top_bs, target_bs);
680     }
681 
682     bdrv_release_dirty_bitmap(s->dirty_bitmap);
683 
684     /* Make sure that the source BDS doesn't go away during bdrv_replace_node,
685      * before we can call bdrv_drained_end */
686     bdrv_ref(src);
687     bdrv_ref(mirror_top_bs);
688     bdrv_ref(target_bs);
689 
690     /*
691      * Remove target parent that still uses BLK_PERM_WRITE/RESIZE before
692      * inserting target_bs at s->to_replace, where we might not be able to get
693      * these permissions.
694      */
695     blk_unref(s->target);
696     s->target = NULL;
697 
698     /* We don't access the source any more. Dropping any WRITE/RESIZE is
699      * required before it could become a backing file of target_bs. Not having
700      * these permissions any more means that we can't allow any new requests on
701      * mirror_top_bs from now on, so keep it drained. */
702     bdrv_drained_begin(mirror_top_bs);
703     bs_opaque->stop = true;
704     bdrv_child_refresh_perms(mirror_top_bs, mirror_top_bs->backing,
705                              &error_abort);
706     if (!abort && s->backing_mode == MIRROR_SOURCE_BACKING_CHAIN) {
707         BlockDriverState *backing = s->is_none_mode ? src : s->base;
708         BlockDriverState *unfiltered_target = bdrv_skip_filters(target_bs);
709 
710         if (bdrv_cow_bs(unfiltered_target) != backing) {
711             bdrv_set_backing_hd(unfiltered_target, backing, &local_err);
712             if (local_err) {
713                 error_report_err(local_err);
714                 local_err = NULL;
715                 ret = -EPERM;
716             }
717         }
718     } else if (!abort && s->backing_mode == MIRROR_OPEN_BACKING_CHAIN) {
719         assert(!bdrv_backing_chain_next(target_bs));
720         ret = bdrv_open_backing_file(bdrv_skip_filters(target_bs), NULL,
721                                      "backing", &local_err);
722         if (ret < 0) {
723             error_report_err(local_err);
724             local_err = NULL;
725         }
726     }
727 
728     if (s->to_replace) {
729         replace_aio_context = bdrv_get_aio_context(s->to_replace);
730         aio_context_acquire(replace_aio_context);
731     }
732 
733     if (s->should_complete && !abort) {
734         BlockDriverState *to_replace = s->to_replace ?: src;
735         bool ro = bdrv_is_read_only(to_replace);
736 
737         if (ro != bdrv_is_read_only(target_bs)) {
738             bdrv_reopen_set_read_only(target_bs, ro, NULL);
739         }
740 
741         /* The mirror job has no requests in flight any more, but we need to
742          * drain potential other users of the BDS before changing the graph. */
743         assert(s->in_drain);
744         bdrv_drained_begin(target_bs);
745         /*
746          * Cannot use check_to_replace_node() here, because that would
747          * check for an op blocker on @to_replace, and we have our own
748          * there.
749          */
750         if (bdrv_recurse_can_replace(src, to_replace)) {
751             bdrv_replace_node(to_replace, target_bs, &local_err);
752         } else {
753             error_setg(&local_err, "Can no longer replace '%s' by '%s', "
754                        "because it can no longer be guaranteed that doing so "
755                        "would not lead to an abrupt change of visible data",
756                        to_replace->node_name, target_bs->node_name);
757         }
758         bdrv_drained_end(target_bs);
759         if (local_err) {
760             error_report_err(local_err);
761             ret = -EPERM;
762         }
763     }
764     if (s->to_replace) {
765         bdrv_op_unblock_all(s->to_replace, s->replace_blocker);
766         error_free(s->replace_blocker);
767         bdrv_unref(s->to_replace);
768     }
769     if (replace_aio_context) {
770         aio_context_release(replace_aio_context);
771     }
772     g_free(s->replaces);
773     bdrv_unref(target_bs);
774 
775     /*
776      * Remove the mirror filter driver from the graph. Before this, get rid of
777      * the blockers on the intermediate nodes so that the resulting state is
778      * valid.
779      */
780     block_job_remove_all_bdrv(bjob);
781     bdrv_replace_node(mirror_top_bs, mirror_top_bs->backing->bs, &error_abort);
782 
783     bs_opaque->job = NULL;
784 
785     bdrv_drained_end(src);
786     bdrv_drained_end(mirror_top_bs);
787     s->in_drain = false;
788     bdrv_unref(mirror_top_bs);
789     bdrv_unref(src);
790 
791     return ret;
792 }
793 
794 static int mirror_prepare(Job *job)
795 {
796     return mirror_exit_common(job);
797 }
798 
799 static void mirror_abort(Job *job)
800 {
801     int ret = mirror_exit_common(job);
802     assert(ret == 0);
803 }
804 
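/* Yield periodically (at least once per BLOCK_JOB_SLICE_TIME) so that pause
 * and cancellation requests are honoured during long-running loops. */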
805 static void coroutine_fn mirror_throttle(MirrorBlockJob *s)
806 {
807     int64_t now = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
808 
809     if (now - s->last_pause_ns > BLOCK_JOB_SLICE_TIME) {
810         s->last_pause_ns = now;
811         job_sleep_ns(&s->common.job, 0);
812     } else {
813         job_pause_point(&s->common.job);
814     }
815 }
816 
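/*
 * Prepare the first copy pass.  If the target needs explicit zero
 * initialization but cannot efficiently write zeroes, mark the whole device
 * dirty and return; otherwise zero the target up front if requested, then
 * mark dirty only the areas that are allocated above s->base_overlay in the
 * source chain.
 */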
817 static int coroutine_fn mirror_dirty_init(MirrorBlockJob *s)
818 {
819     int64_t offset;
820     BlockDriverState *bs = s->mirror_top_bs->backing->bs;
821     BlockDriverState *target_bs = blk_bs(s->target);
822     int ret;
823     int64_t count;
824 
825     if (s->zero_target) {
826         if (!bdrv_can_write_zeroes_with_unmap(target_bs)) {
827             bdrv_set_dirty_bitmap(s->dirty_bitmap, 0, s->bdev_length);
828             return 0;
829         }
830 
831         s->initial_zeroing_ongoing = true;
832         for (offset = 0; offset < s->bdev_length; ) {
833             int bytes = MIN(s->bdev_length - offset,
834                             QEMU_ALIGN_DOWN(INT_MAX, s->granularity));
835 
836             mirror_throttle(s);
837 
838             if (job_is_cancelled(&s->common.job)) {
839                 s->initial_zeroing_ongoing = false;
840                 return 0;
841             }
842 
843             if (s->in_flight >= MAX_IN_FLIGHT) {
844                 trace_mirror_yield(s, UINT64_MAX, s->buf_free_count,
845                                    s->in_flight);
846                 mirror_wait_for_free_in_flight_slot(s);
847                 continue;
848             }
849 
850             mirror_perform(s, offset, bytes, MIRROR_METHOD_ZERO);
851             offset += bytes;
852         }
853 
854         mirror_wait_for_all_io(s);
855         s->initial_zeroing_ongoing = false;
856     }
857 
858     /* Now loop over the image and initialize the dirty bitmap.  */
859     for (offset = 0; offset < s->bdev_length; ) {
860         /* Just to make sure we are not exceeding int limit. */
861         int bytes = MIN(s->bdev_length - offset,
862                         QEMU_ALIGN_DOWN(INT_MAX, s->granularity));
863 
864         mirror_throttle(s);
865 
866         if (job_is_cancelled(&s->common.job)) {
867             return 0;
868         }
869 
870         WITH_GRAPH_RDLOCK_GUARD() {
871             ret = bdrv_is_allocated_above(bs, s->base_overlay, true, offset,
872                                           bytes, &count);
873         }
874         if (ret < 0) {
875             return ret;
876         }
877 
878         assert(count);
879         if (ret > 0) {
880             bdrv_set_dirty_bitmap(s->dirty_bitmap, offset, count);
881         }
882         offset += count;
883     }
884     return 0;
885 }
886 
887 /* Called when going out of the streaming phase to flush the bulk of the
888  * data to the medium, or just before completing.
889  */
890 static int mirror_flush(MirrorBlockJob *s)
891 {
892     int ret = blk_flush(s->target);
893     if (ret < 0) {
894         if (mirror_error_action(s, false, -ret) == BLOCK_ERROR_ACTION_REPORT) {
895             s->ret = ret;
896         }
897     }
898     return ret;
899 }
900 
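/*
 * Main coroutine of the mirror and active-commit jobs: validate the image
 * sizes, set up buffers and bitmaps, populate the initial dirty bitmap
 * unless running in "none" sync mode, then iterate until the target has
 * converged and completion (or cancellation) is requested.
 */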
901 static int coroutine_fn mirror_run(Job *job, Error **errp)
902 {
903     MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
904     BlockDriverState *bs = s->mirror_top_bs->backing->bs;
905     MirrorBDSOpaque *mirror_top_opaque = s->mirror_top_bs->opaque;
906     BlockDriverState *target_bs = blk_bs(s->target);
907     bool need_drain = true;
908     BlockDeviceIoStatus iostatus;
909     int64_t length;
910     int64_t target_length;
911     BlockDriverInfo bdi;
912     char backing_filename[2]; /* we only need 2 characters because we are only
913                                  checking for an empty string */
914     int ret = 0;
915 
916     if (job_is_cancelled(&s->common.job)) {
917         goto immediate_exit;
918     }
919 
920     bdrv_graph_co_rdlock();
921     s->bdev_length = bdrv_co_getlength(bs);
922     bdrv_graph_co_rdunlock();
923 
924     if (s->bdev_length < 0) {
925         ret = s->bdev_length;
926         goto immediate_exit;
927     }
928 
929     target_length = blk_co_getlength(s->target);
930     if (target_length < 0) {
931         ret = target_length;
932         goto immediate_exit;
933     }
934 
935     /* Active commit must resize the base image if its size differs from the
936      * active layer. */
937     if (s->base == blk_bs(s->target)) {
938         if (s->bdev_length > target_length) {
939             ret = blk_co_truncate(s->target, s->bdev_length, false,
940                                   PREALLOC_MODE_OFF, 0, NULL);
941             if (ret < 0) {
942                 goto immediate_exit;
943             }
944         }
945     } else if (s->bdev_length != target_length) {
946         error_setg(errp, "Source and target image have different sizes");
947         ret = -EINVAL;
948         goto immediate_exit;
949     }
950 
951     if (s->bdev_length == 0) {
952         /* Transition to the READY state and wait for complete. */
953         job_transition_to_ready(&s->common.job);
954         s->actively_synced = true;
955         while (!job_cancel_requested(&s->common.job) && !s->should_complete) {
956             job_yield(&s->common.job);
957         }
958         goto immediate_exit;
959     }
960 
961     length = DIV_ROUND_UP(s->bdev_length, s->granularity);
962     s->in_flight_bitmap = bitmap_new(length);
963 
964     /* If we have no backing file yet in the destination, we cannot let
965      * the destination do COW.  Instead, we copy sectors around the
966      * dirty data if needed.  We need a bitmap to do that.
967      */
968     bdrv_get_backing_filename(target_bs, backing_filename,
969                               sizeof(backing_filename));
970     if (!bdrv_co_get_info(target_bs, &bdi) && bdi.cluster_size) {
971         s->target_cluster_size = bdi.cluster_size;
972     } else {
973         s->target_cluster_size = BDRV_SECTOR_SIZE;
974     }
975     if (backing_filename[0] && !bdrv_backing_chain_next(target_bs) &&
976         s->granularity < s->target_cluster_size) {
977         s->buf_size = MAX(s->buf_size, s->target_cluster_size);
978         s->cow_bitmap = bitmap_new(length);
979     }
980     s->max_iov = MIN(bs->bl.max_iov, target_bs->bl.max_iov);
981 
982     s->buf = qemu_try_blockalign(bs, s->buf_size);
983     if (s->buf == NULL) {
984         ret = -ENOMEM;
985         goto immediate_exit;
986     }
987 
988     mirror_free_init(s);
989 
990     s->last_pause_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
991     if (!s->is_none_mode) {
992         ret = mirror_dirty_init(s);
993         if (ret < 0 || job_is_cancelled(&s->common.job)) {
994             goto immediate_exit;
995         }
996     }
997 
998     /*
999      * Only now the job is fully initialised and mirror_top_bs should start
1000      * accessing it.
1001      */
1002     mirror_top_opaque->job = s;
1003 
1004     assert(!s->dbi);
1005     s->dbi = bdrv_dirty_iter_new(s->dirty_bitmap);
1006     for (;;) {
1007         uint64_t delay_ns = 0;
1008         int64_t cnt, delta;
1009         bool should_complete;
1010 
1011         if (s->ret < 0) {
1012             ret = s->ret;
1013             goto immediate_exit;
1014         }
1015 
1016         job_pause_point(&s->common.job);
1017 
1018         if (job_is_cancelled(&s->common.job)) {
1019             ret = 0;
1020             goto immediate_exit;
1021         }
1022 
1023         cnt = bdrv_get_dirty_count(s->dirty_bitmap);
1024         /* cnt is the number of dirty bytes remaining and s->bytes_in_flight is
1025          * the number of bytes currently being processed; together those are
1026          * the current remaining operation length */
1027         job_progress_set_remaining(&s->common.job,
1028                                    s->bytes_in_flight + cnt +
1029                                    s->active_write_bytes_in_flight);
1030 
1031         /* Note that even when no rate limit is applied we need to yield
1032          * periodically with no pending I/O so that bdrv_drain_all() returns.
1033          * We do so every BLOCK_JOB_SLICE_TIME nanoseconds, or when there is
1034          * an error, or when the source is clean, whichever comes first. */
1035         delta = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - s->last_pause_ns;
1036         WITH_JOB_LOCK_GUARD() {
1037             iostatus = s->common.iostatus;
1038         }
1039         if (delta < BLOCK_JOB_SLICE_TIME &&
1040             iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
1041             if (s->in_flight >= MAX_IN_FLIGHT || s->buf_free_count == 0 ||
1042                 (cnt == 0 && s->in_flight > 0)) {
1043                 trace_mirror_yield(s, cnt, s->buf_free_count, s->in_flight);
1044                 mirror_wait_for_free_in_flight_slot(s);
1045                 continue;
1046             } else if (cnt != 0) {
1047                 delay_ns = mirror_iteration(s);
1048             }
1049         }
1050 
1051         should_complete = false;
1052         if (s->in_flight == 0 && cnt == 0) {
1053             trace_mirror_before_flush(s);
1054             if (!job_is_ready(&s->common.job)) {
1055                 if (mirror_flush(s) < 0) {
1056                     /* Go check s->ret.  */
1057                     continue;
1058                 }
1059                 /* We're out of the streaming phase.  From now on, if the job
1060                  * is cancelled we will actually complete all pending I/O and
1061                  * report completion.  This way, block-job-cancel will leave
1062                  * the target in a consistent state.
1063                  */
1064                 job_transition_to_ready(&s->common.job);
1065                 if (s->copy_mode != MIRROR_COPY_MODE_BACKGROUND) {
1066                     s->actively_synced = true;
1067                 }
1068             }
1069 
1070             should_complete = s->should_complete ||
1071                 job_cancel_requested(&s->common.job);
1072             cnt = bdrv_get_dirty_count(s->dirty_bitmap);
1073         }
1074 
1075         if (cnt == 0 && should_complete) {
1076             /* The dirty bitmap is not updated while operations are pending.
1077              * If we're about to exit, wait for pending operations before
1078              * calling bdrv_get_dirty_count(), or we may exit while the
1079              * source has dirty data to copy!
1080              *
1081              * Note that I/O can be submitted by the guest while the
1082              * mirror job runs, so pause it now.  Before deciding whether
1083              * to switch to the target, check one last time whether I/O has
1084              * arrived in the meanwhile, and if not, flush the data to disk.
1085              */
1086             trace_mirror_before_drain(s, cnt);
1087 
1088             s->in_drain = true;
1089             bdrv_drained_begin(bs);
1090 
1091             /* Must be zero because we are drained */
1092             assert(s->in_active_write_counter == 0);
1093 
1094             cnt = bdrv_get_dirty_count(s->dirty_bitmap);
1095             if (cnt > 0 || mirror_flush(s) < 0) {
1096                 bdrv_drained_end(bs);
1097                 s->in_drain = false;
1098                 continue;
1099             }
1100 
1101             /* The two disks are in sync.  Exit and report successful
1102              * completion.
1103              */
1104             assert(QLIST_EMPTY(&bs->tracked_requests));
1105             need_drain = false;
1106             break;
1107         }
1108 
1109         if (job_is_ready(&s->common.job) && !should_complete) {
1110             delay_ns = (s->in_flight == 0 &&
1111                         cnt == 0 ? BLOCK_JOB_SLICE_TIME : 0);
1112         }
1113         trace_mirror_before_sleep(s, cnt, job_is_ready(&s->common.job),
1114                                   delay_ns);
1115         job_sleep_ns(&s->common.job, delay_ns);
1116         s->last_pause_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
1117     }
1118 
1119 immediate_exit:
1120     if (s->in_flight > 0) {
1121         /* We get here only if something went wrong.  Either the job failed,
1122          * or it was cancelled prematurely so that we do not guarantee that
1123          * the target is a copy of the source.
1124          */
1125         assert(ret < 0 || job_is_cancelled(&s->common.job));
1126         assert(need_drain);
1127         mirror_wait_for_all_io(s);
1128     }
1129 
1130     assert(s->in_flight == 0);
1131     qemu_vfree(s->buf);
1132     g_free(s->cow_bitmap);
1133     g_free(s->in_flight_bitmap);
1134     bdrv_dirty_iter_free(s->dbi);
1135 
1136     if (need_drain) {
1137         s->in_drain = true;
1138         bdrv_drained_begin(bs);
1139     }
1140 
1141     return ret;
1142 }
1143 
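/*
 * .complete callback: requires the job to be READY; resolves the @replaces
 * node if one was given and blocks operations on it, then asks the job to
 * finish the next time source and target are in sync.
 */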
1144 static void mirror_complete(Job *job, Error **errp)
1145 {
1146     MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
1147 
1148     if (!job_is_ready(job)) {
1149         error_setg(errp, "The active block job '%s' cannot be completed",
1150                    job->id);
1151         return;
1152     }
1153 
1154     /* block all operations on to_replace bs */
1155     if (s->replaces) {
1156         AioContext *replace_aio_context;
1157 
1158         s->to_replace = bdrv_find_node(s->replaces);
1159         if (!s->to_replace) {
1160             error_setg(errp, "Node name '%s' not found", s->replaces);
1161             return;
1162         }
1163 
1164         replace_aio_context = bdrv_get_aio_context(s->to_replace);
1165         aio_context_acquire(replace_aio_context);
1166 
1167         /* TODO Translate this into child freeze system. */
1168         error_setg(&s->replace_blocker,
1169                    "block device is in use by block-job-complete");
1170         bdrv_op_block_all(s->to_replace, s->replace_blocker);
1171         bdrv_ref(s->to_replace);
1172 
1173         aio_context_release(replace_aio_context);
1174     }
1175 
1176     s->should_complete = true;
1177 
1178     /* If the job is paused, it will be re-entered when it is resumed */
1179     WITH_JOB_LOCK_GUARD() {
1180         if (!job->paused) {
1181             job_enter_cond_locked(job, NULL);
1182         }
1183     }
1184 }
1185 
1186 static void coroutine_fn mirror_pause(Job *job)
1187 {
1188     MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
1189 
1190     mirror_wait_for_all_io(s);
1191 }
1192 
1193 static bool mirror_drained_poll(BlockJob *job)
1194 {
1195     MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);
1196 
1197     /* If the job is neither paused nor cancelled, we can't be sure that it won't
1198      * issue more requests. We make an exception if we've reached this point
1199      * from one of our own drain sections, to avoid a deadlock waiting for
1200      * ourselves.
1201      */
1202     WITH_JOB_LOCK_GUARD() {
1203         if (!s->common.job.paused && !job_is_cancelled_locked(&job->job)
1204             && !s->in_drain) {
1205             return true;
1206         }
1207     }
1208 
1209     return !!s->in_flight;
1210 }
1211 
1212 static bool mirror_cancel(Job *job, bool force)
1213 {
1214     MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
1215     BlockDriverState *target = blk_bs(s->target);
1216 
1217     /*
1218      * Before the job is READY, we treat any cancellation like a
1219      * force-cancellation.
1220      */
1221     force = force || !job_is_ready(job);
1222 
1223     if (force) {
1224         bdrv_cancel_in_flight(target);
1225     }
1226     return force;
1227 }
1228 
1229 static bool commit_active_cancel(Job *job, bool force)
1230 {
1231     /* Same as above in mirror_cancel() */
1232     return force || !job_is_ready(job);
1233 }
1234 
1235 static const BlockJobDriver mirror_job_driver = {
1236     .job_driver = {
1237         .instance_size          = sizeof(MirrorBlockJob),
1238         .job_type               = JOB_TYPE_MIRROR,
1239         .free                   = block_job_free,
1240         .user_resume            = block_job_user_resume,
1241         .run                    = mirror_run,
1242         .prepare                = mirror_prepare,
1243         .abort                  = mirror_abort,
1244         .pause                  = mirror_pause,
1245         .complete               = mirror_complete,
1246         .cancel                 = mirror_cancel,
1247     },
1248     .drained_poll           = mirror_drained_poll,
1249 };
1250 
1251 static const BlockJobDriver commit_active_job_driver = {
1252     .job_driver = {
1253         .instance_size          = sizeof(MirrorBlockJob),
1254         .job_type               = JOB_TYPE_COMMIT,
1255         .free                   = block_job_free,
1256         .user_resume            = block_job_user_resume,
1257         .run                    = mirror_run,
1258         .prepare                = mirror_prepare,
1259         .abort                  = mirror_abort,
1260         .pause                  = mirror_pause,
1261         .complete               = mirror_complete,
1262         .cancel                 = commit_active_cancel,
1263     },
1264     .drained_poll           = mirror_drained_poll,
1265 };
1266 
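/*
 * Mirror a guest write, zero or discard straight to the target (used in
 * write-blocking copy mode).  Dirty, unaligned head and tail bytes are left
 * to the background copy; the remaining granularity-aligned range is reset
 * in the dirty bitmap before being written out.
 */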
1267 static void coroutine_fn
1268 do_sync_target_write(MirrorBlockJob *job, MirrorMethod method,
1269                      uint64_t offset, uint64_t bytes,
1270                      QEMUIOVector *qiov, int flags)
1271 {
1272     int ret;
1273     size_t qiov_offset = 0;
1274     int64_t bitmap_offset, bitmap_end;
1275 
1276     if (!QEMU_IS_ALIGNED(offset, job->granularity) &&
1277         bdrv_dirty_bitmap_get(job->dirty_bitmap, offset))
1278     {
1279             /*
1280              * Dirty unaligned padding: ignore it.
1281              *
1282              * Reasoning:
1283              * 1. If we copy it, we can't reset corresponding bit in
1284              *    dirty_bitmap as there may be some "dirty" bytes still not
1285              *    copied.
1286              * 2. It's already dirty, so by skipping it we don't make the
1287              *    mirror's progress diverge.
1288              *
1289              * Note that because of this, a guest write may not contribute to
1290              * mirror convergence, but that's not a problem, as we have the
1291              * background mirroring process. If, under some bad circumstances
1292              * (high guest I/O load), the background process starves, we will
1293              * not converge anyway, even if each write contributed, as the
1294              * guest is not guaranteed to rewrite the whole disk.
1295              */
1296             qiov_offset = QEMU_ALIGN_UP(offset, job->granularity) - offset;
1297             if (bytes <= qiov_offset) {
1298                 /* nothing to do after shrink */
1299                 return;
1300             }
1301             offset += qiov_offset;
1302             bytes -= qiov_offset;
1303     }
1304 
1305     if (!QEMU_IS_ALIGNED(offset + bytes, job->granularity) &&
1306         bdrv_dirty_bitmap_get(job->dirty_bitmap, offset + bytes - 1))
1307     {
1308         uint64_t tail = (offset + bytes) % job->granularity;
1309 
1310         if (bytes <= tail) {
1311             /* nothing to do after shrink */
1312             return;
1313         }
1314         bytes -= tail;
1315     }
1316 
1317     /*
1318      * Tails are either clean or have been shrunk off, so for bitmap resetting
1319      * we can safely round the range inwards to granularity boundaries.
1320      */
1321     bitmap_offset = QEMU_ALIGN_UP(offset, job->granularity);
1322     bitmap_end = QEMU_ALIGN_DOWN(offset + bytes, job->granularity);
1323     if (bitmap_offset < bitmap_end) {
1324         bdrv_reset_dirty_bitmap(job->dirty_bitmap, bitmap_offset,
1325                                 bitmap_end - bitmap_offset);
1326     }
1327 
1328     job_progress_increase_remaining(&job->common.job, bytes);
1329     job->active_write_bytes_in_flight += bytes;
1330 
1331     switch (method) {
1332     case MIRROR_METHOD_COPY:
1333         ret = blk_co_pwritev_part(job->target, offset, bytes,
1334                                   qiov, qiov_offset, flags);
1335         break;
1336 
1337     case MIRROR_METHOD_ZERO:
1338         assert(!qiov);
1339         ret = blk_co_pwrite_zeroes(job->target, offset, bytes, flags);
1340         break;
1341 
1342     case MIRROR_METHOD_DISCARD:
1343         assert(!qiov);
1344         ret = blk_co_pdiscard(job->target, offset, bytes);
1345         break;
1346 
1347     default:
1348         abort();
1349     }
1350 
1351     job->active_write_bytes_in_flight -= bytes;
1352     if (ret >= 0) {
1353         job_progress_update(&job->common.job, bytes);
1354     } else {
1355         BlockErrorAction action;
1356 
1357         /*
1358          * We failed, so we should mark the whole area dirty, aligned up.
1359          * Note that we don't care about any shrunk tails: they were dirty
1360          * when the function started, and they must still be dirty, as we've
1361          * locked the region for the in-flight op.
1362          */
1363         bitmap_offset = QEMU_ALIGN_DOWN(offset, job->granularity);
1364         bitmap_end = QEMU_ALIGN_UP(offset + bytes, job->granularity);
1365         bdrv_set_dirty_bitmap(job->dirty_bitmap, bitmap_offset,
1366                               bitmap_end - bitmap_offset);
1367         job->actively_synced = false;
1368 
1369         action = mirror_error_action(job, false, -ret);
1370         if (action == BLOCK_ERROR_ACTION_REPORT) {
1371             if (!job->ret) {
1372                 job->ret = ret;
1373             }
1374         }
1375     }
1376 }
1377 
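/*
 * Create a MirrorOp for an active (write-blocking) request and wait until
 * no other in-flight operation overlaps its area before claiming it in the
 * in-flight bitmap.
 */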
1378 static MirrorOp *coroutine_fn active_write_prepare(MirrorBlockJob *s,
1379                                                    uint64_t offset,
1380                                                    uint64_t bytes)
1381 {
1382     MirrorOp *op;
1383     uint64_t start_chunk = offset / s->granularity;
1384     uint64_t end_chunk = DIV_ROUND_UP(offset + bytes, s->granularity);
1385 
1386     op = g_new(MirrorOp, 1);
1387     *op = (MirrorOp){
1388         .s                  = s,
1389         .offset             = offset,
1390         .bytes              = bytes,
1391         .is_active_write    = true,
1392         .is_in_flight       = true,
1393         .co                 = qemu_coroutine_self(),
1394     };
1395     qemu_co_queue_init(&op->waiting_requests);
1396     QTAILQ_INSERT_TAIL(&s->ops_in_flight, op, next);
1397 
1398     s->in_active_write_counter++;
1399 
1400     /*
1401      * Wait for concurrent requests affecting the area.  If there are already
1402      * running requests that are copying off now-to-be stale data in the area,
1403      * we must wait for them to finish before we begin writing fresh data to the
1404      * target so that the write operations appear in the correct order.
1405      * Note that background requests (see mirror_iteration()) in contrast only
1406      * wait for conflicting requests at the start of the dirty area, and then
1407      * (based on the in_flight_bitmap) truncate the area to copy so it will not
1408      * conflict with any requests beyond that.  For active writes, however, we
1409      * cannot truncate that area.  The request from our parent must be blocked
1410      * until the area is copied in full.  Therefore, we must wait for the whole
1411      * area to become free of concurrent requests.
1412      */
1413     mirror_wait_on_conflicts(op, s, offset, bytes);
1414 
1415     bitmap_set(s->in_flight_bitmap, start_chunk, end_chunk - start_chunk);
1416 
1417     return op;
1418 }
1419 
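/* Finish an active-write operation: clear its bits in the in-flight bitmap,
 * wake up any waiters and free the MirrorOp. */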
1420 static void coroutine_fn active_write_settle(MirrorOp *op)
1421 {
1422     uint64_t start_chunk = op->offset / op->s->granularity;
1423     uint64_t end_chunk = DIV_ROUND_UP(op->offset + op->bytes,
1424                                       op->s->granularity);
1425 
1426     if (!--op->s->in_active_write_counter && op->s->actively_synced) {
1427         BdrvChild *source = op->s->mirror_top_bs->backing;
1428 
1429         if (QLIST_FIRST(&source->bs->parents) == source &&
1430             QLIST_NEXT(source, next_parent) == NULL)
1431         {
1432             /* Assert that we are back in sync once all active write
1433              * operations are settled.
1434              * Note that we can only assert this if the mirror node
1435              * is the source node's only parent. */
1436             assert(!bdrv_get_dirty_count(op->s->dirty_bitmap));
1437         }
1438     }
1439     bitmap_clear(op->s->in_flight_bitmap, start_chunk, end_chunk - start_chunk);
1440     QTAILQ_REMOVE(&op->s->ops_in_flight, op, next);
1441     qemu_co_queue_restart_all(&op->waiting_requests);
1442     g_free(op);
1443 }
1444 
1445 static int coroutine_fn GRAPH_RDLOCK
1446 bdrv_mirror_top_preadv(BlockDriverState *bs, int64_t offset, int64_t bytes,
1447                        QEMUIOVector *qiov, BdrvRequestFlags flags)
1448 {
1449     return bdrv_co_preadv(bs->backing, offset, bytes, qiov, flags);
1450 }
1451 
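/*
 * Forward a guest write, zero or discard to the source (the filter's backing
 * child) and, in write-blocking copy mode, replicate it to the target as
 * well.
 */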
1452 static int coroutine_fn GRAPH_RDLOCK
1453 bdrv_mirror_top_do_write(BlockDriverState *bs, MirrorMethod method,
1454                          uint64_t offset, uint64_t bytes, QEMUIOVector *qiov,
1455                          int flags)
1456 {
1457     MirrorOp *op = NULL;
1458     MirrorBDSOpaque *s = bs->opaque;
1459     int ret = 0;
1460     bool copy_to_target = false;
1461 
1462     if (s->job) {
1463         copy_to_target = s->job->ret >= 0 &&
1464                          !job_is_cancelled(&s->job->common.job) &&
1465                          s->job->copy_mode == MIRROR_COPY_MODE_WRITE_BLOCKING;
1466     }
1467 
1468     if (copy_to_target) {
1469         op = active_write_prepare(s->job, offset, bytes);
1470     }
1471 
1472     switch (method) {
1473     case MIRROR_METHOD_COPY:
1474         ret = bdrv_co_pwritev(bs->backing, offset, bytes, qiov, flags);
1475         break;
1476 
1477     case MIRROR_METHOD_ZERO:
1478         ret = bdrv_co_pwrite_zeroes(bs->backing, offset, bytes, flags);
1479         break;
1480 
1481     case MIRROR_METHOD_DISCARD:
1482         ret = bdrv_co_pdiscard(bs->backing, offset, bytes);
1483         break;
1484 
1485     default:
1486         abort();
1487     }
1488 
1489     if (ret < 0) {
1490         goto out;
1491     }
1492 
1493     if (copy_to_target) {
1494         do_sync_target_write(s->job, method, offset, bytes, qiov, flags);
1495     }
1496 
1497 out:
1498     if (copy_to_target) {
1499         active_write_settle(op);
1500     }
1501     return ret;
1502 }
1503 
1504 static int coroutine_fn GRAPH_RDLOCK
1505 bdrv_mirror_top_pwritev(BlockDriverState *bs, int64_t offset, int64_t bytes,
1506                         QEMUIOVector *qiov, BdrvRequestFlags flags)
1507 {
1508     MirrorBDSOpaque *s = bs->opaque;
1509     QEMUIOVector bounce_qiov;
1510     void *bounce_buf;
1511     int ret = 0;
1512     bool copy_to_target = false;
1513 
1514     if (s->job) {
1515         copy_to_target = s->job->ret >= 0 &&
1516                          !job_is_cancelled(&s->job->common.job) &&
1517                          s->job->copy_mode == MIRROR_COPY_MODE_WRITE_BLOCKING;
1518     }
1519 
1520     if (copy_to_target) {
1521         /* The guest might concurrently modify the data to write; but
1522          * the data on source and destination must match, so we have
1523          * to use a bounce buffer if we are going to write to the
1524          * target now. */
1525         bounce_buf = qemu_blockalign(bs, bytes);
1526         iov_to_buf_full(qiov->iov, qiov->niov, 0, bounce_buf, bytes);
1527 
1528         qemu_iovec_init(&bounce_qiov, 1);
1529         qemu_iovec_add(&bounce_qiov, bounce_buf, bytes);
1530         qiov = &bounce_qiov;
1531 
1532         flags &= ~BDRV_REQ_REGISTERED_BUF;
1533     }
1534 
1535     ret = bdrv_mirror_top_do_write(bs, MIRROR_METHOD_COPY, offset, bytes, qiov,
1536                                    flags);
1537 
1538     if (copy_to_target) {
1539         qemu_iovec_destroy(&bounce_qiov);
1540         qemu_vfree(bounce_buf);
1541     }
1542 
1543     return ret;
1544 }
1545 
1546 static int coroutine_fn GRAPH_RDLOCK bdrv_mirror_top_flush(BlockDriverState *bs)
1547 {
1548     if (bs->backing == NULL) {
1549         /* we can be here after a failed bdrv_append in mirror_start_job */
1550         return 0;
1551     }
1552     return bdrv_co_flush(bs->backing->bs);
1553 }
1554 
1555 static int coroutine_fn GRAPH_RDLOCK
1556 bdrv_mirror_top_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
1557                               int64_t bytes, BdrvRequestFlags flags)
1558 {
1559     return bdrv_mirror_top_do_write(bs, MIRROR_METHOD_ZERO, offset, bytes, NULL,
1560                                     flags);
1561 }
1562 
1563 static int coroutine_fn GRAPH_RDLOCK
1564 bdrv_mirror_top_pdiscard(BlockDriverState *bs, int64_t offset, int64_t bytes)
1565 {
1566     return bdrv_mirror_top_do_write(bs, MIRROR_METHOD_DISCARD, offset, bytes,
1567                                     NULL, 0);
1568 }
1569 
1570 static void bdrv_mirror_top_refresh_filename(BlockDriverState *bs)
1571 {
1572     if (bs->backing == NULL) {
1573         /* we can be here after a failed bdrv_attach_child in
1574          * bdrv_set_backing_hd */
1575         return;
1576     }
1577     pstrcpy(bs->exact_filename, sizeof(bs->exact_filename),
1578             bs->backing->bs->filename);
1579 }
1580 
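/*
 * Permission callback of the filter node.  Once the job is being stopped
 * (s->stop), the filter no longer needs any permissions on the source;
 * otherwise the defaults apply, relaxed for commit jobs as explained below.
 */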
1581 static void bdrv_mirror_top_child_perm(BlockDriverState *bs, BdrvChild *c,
1582                                        BdrvChildRole role,
1583                                        BlockReopenQueue *reopen_queue,
1584                                        uint64_t perm, uint64_t shared,
1585                                        uint64_t *nperm, uint64_t *nshared)
1586 {
1587     MirrorBDSOpaque *s = bs->opaque;
1588 
1589     if (s->stop) {
1590         /*
1591          * If the job is to be stopped, we do not need to forward
1592          * anything to the real image.
1593          */
1594         *nperm = 0;
1595         *nshared = BLK_PERM_ALL;
1596         return;
1597     }
1598 
1599     bdrv_default_perms(bs, c, role, reopen_queue,
1600                        perm, shared, nperm, nshared);
1601 
1602     if (s->is_commit) {
1603         /*
1604          * For commit jobs, we cannot take CONSISTENT_READ, because
1605          * that permission is unshared for everything above the base
1606          * node (except for filters on the base node).
1607          * We also have to force-share the WRITE permission, or
1608          * otherwise we would block ourselves at the base node (if
1609          * writes are blocked for a node, they are also blocked for
1610          * its backing file).
1611          * (We could also share RESIZE, because it may be needed for
1612          * the target if its size is less than the top node's; but
1613          * bdrv_default_perms_for_cow() automatically shares RESIZE
1614          * for backing nodes if WRITE is shared, so there is no need
1615          * to do it here.)
1616          */
1617         *nperm &= ~BLK_PERM_CONSISTENT_READ;
1618         *nshared |= BLK_PERM_WRITE;
1619     }
1620 }
1621 
1622 /* Dummy filter node that provides consistent reads to its users without
1623  * requiring them from its backing file, and allows writes on the backing chain. */
1624 static BlockDriver bdrv_mirror_top = {
1625     .format_name                = "mirror_top",
1626     .bdrv_co_preadv             = bdrv_mirror_top_preadv,
1627     .bdrv_co_pwritev            = bdrv_mirror_top_pwritev,
1628     .bdrv_co_pwrite_zeroes      = bdrv_mirror_top_pwrite_zeroes,
1629     .bdrv_co_pdiscard           = bdrv_mirror_top_pdiscard,
1630     .bdrv_co_flush              = bdrv_mirror_top_flush,
1631     .bdrv_refresh_filename      = bdrv_mirror_top_refresh_filename,
1632     .bdrv_child_perm            = bdrv_mirror_top_child_perm,
1633 
1634     .is_filter                  = true,
1635     .filtered_child_is_backing  = true,
1636 };
1637 
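/*
 * Common setup for drive-mirror/blockdev-mirror and active commit (which
 * passes the backing file as @target).  Roughly:
 *  1. insert the mirror_top filter node above @bs,
 *  2. create the block job on that filter node,
 *  3. attach a BlockBackend to @target with the required permissions,
 *  4. create the dirty bitmap that tracks what still has to be copied,
 *  5. block the intermediate nodes (commit case) and start the job.
 * Returns NULL and sets @errp on failure.
 */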
1638 static BlockJob *mirror_start_job(
1639                              const char *job_id, BlockDriverState *bs,
1640                              int creation_flags, BlockDriverState *target,
1641                              const char *replaces, int64_t speed,
1642                              uint32_t granularity, int64_t buf_size,
1643                              BlockMirrorBackingMode backing_mode,
1644                              bool zero_target,
1645                              BlockdevOnError on_source_error,
1646                              BlockdevOnError on_target_error,
1647                              bool unmap,
1648                              BlockCompletionFunc *cb,
1649                              void *opaque,
1650                              const BlockJobDriver *driver,
1651                              bool is_none_mode, BlockDriverState *base,
1652                              bool auto_complete, const char *filter_node_name,
1653                              bool is_mirror, MirrorCopyMode copy_mode,
1654                              Error **errp)
1655 {
1656     MirrorBlockJob *s;
1657     MirrorBDSOpaque *bs_opaque;
1658     BlockDriverState *mirror_top_bs;
1659     bool target_is_backing;
1660     uint64_t target_perms, target_shared_perms;
1661     int ret;
1662 
1663     if (granularity == 0) {
1664         granularity = bdrv_get_default_bitmap_granularity(target);
1665     }
1666 
1667     assert(is_power_of_2(granularity));
1668 
1669     if (buf_size < 0) {
1670         error_setg(errp, "Invalid parameter 'buf-size'");
1671         return NULL;
1672     }
1673 
1674     if (buf_size == 0) {
1675         buf_size = DEFAULT_MIRROR_BUF_SIZE;
1676     }
1677 
1678     if (bdrv_skip_filters(bs) == bdrv_skip_filters(target)) {
1679         error_setg(errp, "Can't mirror node into itself");
1680         return NULL;
1681     }
1682 
1683     target_is_backing = bdrv_chain_contains(bs, target);
1684 
1685     /* In the case of active commit, add a dummy driver that provides consistent
1686      * reads on the top node without requiring them from the intermediate nodes,
1687      * and make the backing chain writable. */
1688     mirror_top_bs = bdrv_new_open_driver(&bdrv_mirror_top, filter_node_name,
1689                                          BDRV_O_RDWR, errp);
1690     if (mirror_top_bs == NULL) {
1691         return NULL;
1692     }
1693     if (!filter_node_name) {
1694         mirror_top_bs->implicit = true;
1695     }
1696 
1697     /* So that we can always drop this node */
1698     mirror_top_bs->never_freeze = true;
1699 
1700     mirror_top_bs->total_sectors = bs->total_sectors;
1701     mirror_top_bs->supported_write_flags = BDRV_REQ_WRITE_UNCHANGED;
1702     mirror_top_bs->supported_zero_flags = BDRV_REQ_WRITE_UNCHANGED |
1703                                           BDRV_REQ_NO_FALLBACK;
1704     bs_opaque = g_new0(MirrorBDSOpaque, 1);
1705     mirror_top_bs->opaque = bs_opaque;
1706 
1707     bs_opaque->is_commit = target_is_backing;
1708 
1709     bdrv_drained_begin(bs);
1710     ret = bdrv_append(mirror_top_bs, bs, errp);
1711     bdrv_drained_end(bs);
1712 
1713     if (ret < 0) {
1714         bdrv_unref(mirror_top_bs);
1715         return NULL;
1716     }
1717 
1718     /* Make sure that the source is not resized while the job is running */
1719     s = block_job_create(job_id, driver, NULL, mirror_top_bs,
1720                          BLK_PERM_CONSISTENT_READ,
1721                          BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE_UNCHANGED |
1722                          BLK_PERM_WRITE, speed,
1723                          creation_flags, cb, opaque, errp);
1724     if (!s) {
1725         goto fail;
1726     }
1727 
1728     /* The block job now has a reference to this node */
1729     bdrv_unref(mirror_top_bs);
1730 
1731     s->mirror_top_bs = mirror_top_bs;
1732 
1733     /* No resize for the target either; while the mirror is still running, a
1734      * consistent read isn't necessarily possible. We could possibly allow
1735      * writes and graph modifications, though it would likely defeat the
1736      * purpose of a mirror, so leave them blocked for now.
1737      *
1738      * In the case of active commit, things look a bit different, though,
1739      * because the target is an already populated backing file in active use.
1740      * We can allow anything except resize there. */
1741 
1742     target_perms = BLK_PERM_WRITE;
1743     target_shared_perms = BLK_PERM_WRITE_UNCHANGED;
1744 
1745     if (target_is_backing) {
1746         int64_t bs_size, target_size;
1747         bs_size = bdrv_getlength(bs);
1748         if (bs_size < 0) {
1749             error_setg_errno(errp, -bs_size,
1750                              "Could not inquire top image size");
1751             goto fail;
1752         }
1753 
1754         target_size = bdrv_getlength(target);
1755         if (target_size < 0) {
1756             error_setg_errno(errp, -target_size,
1757                              "Could not inquire base image size");
1758             goto fail;
1759         }
1760 
1761         if (target_size < bs_size) {
1762             target_perms |= BLK_PERM_RESIZE;
1763         }
1764 
1765         target_shared_perms |= BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE;
1766     } else if (bdrv_chain_contains(bs, bdrv_skip_filters(target))) {
1767         /*
1768          * We may want to allow this in the future, but it would
1769          * require taking some extra care.
1770          */
1771         error_setg(errp, "Cannot mirror to a filter on top of a node in the "
1772                    "source's backing chain");
1773         goto fail;
1774     }
1775 
1776     s->target = blk_new(s->common.job.aio_context,
1777                         target_perms, target_shared_perms);
1778     ret = blk_insert_bs(s->target, target, errp);
1779     if (ret < 0) {
1780         goto fail;
1781     }
1782     if (is_mirror) {
1783         /* XXX: The mirror target could be an NBD server of the destination
1784          * QEMU in the case of non-shared block migration. To allow migration
1785          * to complete, we have to allow "inactivate" of the target BB.  When
1786          * that happens, we know the job is drained and the vCPUs are stopped,
1787          * so no write operation will be performed. The block layer already
1788          * has assertions to ensure that. */
1789         blk_set_force_allow_inactivate(s->target);
1790     }
1791     blk_set_allow_aio_context_change(s->target, true);
1792     blk_set_disable_request_queuing(s->target, true);
1793 
1794     s->replaces = g_strdup(replaces);
1795     s->on_source_error = on_source_error;
1796     s->on_target_error = on_target_error;
1797     s->is_none_mode = is_none_mode;
1798     s->backing_mode = backing_mode;
1799     s->zero_target = zero_target;
1800     s->copy_mode = copy_mode;
1801     s->base = base;
1802     s->base_overlay = bdrv_find_overlay(bs, base);
1803     s->granularity = granularity;
1804     s->buf_size = ROUND_UP(buf_size, granularity);
1805     s->unmap = unmap;
1806     if (auto_complete) {
1807         s->should_complete = true;
1808     }
1809 
1810     s->dirty_bitmap = bdrv_create_dirty_bitmap(bs, granularity, NULL, errp);
1811     if (!s->dirty_bitmap) {
1812         goto fail;
1813     }
1814     if (s->copy_mode == MIRROR_COPY_MODE_WRITE_BLOCKING) {
1815         bdrv_disable_dirty_bitmap(s->dirty_bitmap);
1816     }
1817 
1818     ret = block_job_add_bdrv(&s->common, "source", bs, 0,
1819                              BLK_PERM_WRITE_UNCHANGED | BLK_PERM_WRITE |
1820                              BLK_PERM_CONSISTENT_READ,
1821                              errp);
1822     if (ret < 0) {
1823         goto fail;
1824     }
1825 
1826     /* Required permissions are already taken with blk_new() */
1827     block_job_add_bdrv(&s->common, "target", target, 0, BLK_PERM_ALL,
1828                        &error_abort);
1829 
1830     /* In commit_active_start() all intermediate nodes disappear, so
1831      * any jobs in them must be blocked */
1832     if (target_is_backing) {
1833         BlockDriverState *iter, *filtered_target;
1834         uint64_t iter_shared_perms;
1835 
1836         /*
1837          * The topmost node with
1838          * bdrv_skip_filters(filtered_target) == bdrv_skip_filters(target)
1839          */
1840         filtered_target = bdrv_cow_bs(bdrv_find_overlay(bs, target));
1841 
1842         assert(bdrv_skip_filters(filtered_target) ==
1843                bdrv_skip_filters(target));
1844 
1845         /*
1846          * XXX BLK_PERM_WRITE needs to be allowed so we don't block
1847          * ourselves at s->base (if writes are blocked for a node, they are
1848          * also blocked for its backing file). The other option would be a
1849          * second filter driver above s->base (== target).
1850          */
1851         iter_shared_perms = BLK_PERM_WRITE_UNCHANGED | BLK_PERM_WRITE;
1852 
1853         for (iter = bdrv_filter_or_cow_bs(bs); iter != target;
1854              iter = bdrv_filter_or_cow_bs(iter))
1855         {
1856             if (iter == filtered_target) {
1857                 /*
1858                  * From here on, all nodes are filters on the base.
1859                  * This allows us to share BLK_PERM_CONSISTENT_READ.
1860                  */
1861                 iter_shared_perms |= BLK_PERM_CONSISTENT_READ;
1862             }
1863 
1864             ret = block_job_add_bdrv(&s->common, "intermediate node", iter, 0,
1865                                      iter_shared_perms, errp);
1866             if (ret < 0) {
1867                 goto fail;
1868             }
1869         }
1870 
1871         if (bdrv_freeze_backing_chain(mirror_top_bs, target, errp) < 0) {
1872             goto fail;
1873         }
1874     }
1875 
1876     QTAILQ_INIT(&s->ops_in_flight);
1877 
1878     trace_mirror_start(bs, s, opaque);
1879     job_start(&s->common.job);
1880 
1881     return &s->common;
1882 
1883 fail:
1884     if (s) {
1885         /* Make sure this BDS does not go away until we have completed the graph
1886          * changes below */
1887         bdrv_ref(mirror_top_bs);
1888 
1889         g_free(s->replaces);
1890         blk_unref(s->target);
1891         bs_opaque->job = NULL;
1892         if (s->dirty_bitmap) {
1893             bdrv_release_dirty_bitmap(s->dirty_bitmap);
1894         }
1895         job_early_fail(&s->common.job);
1896     }
1897 
1898     bs_opaque->stop = true;
1899     bdrv_child_refresh_perms(mirror_top_bs, mirror_top_bs->backing,
1900                              &error_abort);
1901     bdrv_replace_node(mirror_top_bs, mirror_top_bs->backing->bs, &error_abort);
1902 
1903     bdrv_unref(mirror_top_bs);
1904 
1905     return NULL;
1906 }
1907 
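/*
 * Entry point for the drive-mirror/blockdev-mirror commands: reject the sync
 * modes that mirroring does not support and translate the remaining ones into
 * the is_none_mode/base arguments of mirror_start_job().
 *
 * For illustration only (device and file names are made up), a caller would
 * typically reach this function through QMP, e.g.:
 *
 *   { "execute": "drive-mirror",
 *     "arguments": { "device": "drive0",
 *                    "target": "/tmp/mirror.qcow2",
 *                    "sync": "full" } }
 */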
1908 void mirror_start(const char *job_id, BlockDriverState *bs,
1909                   BlockDriverState *target, const char *replaces,
1910                   int creation_flags, int64_t speed,
1911                   uint32_t granularity, int64_t buf_size,
1912                   MirrorSyncMode mode, BlockMirrorBackingMode backing_mode,
1913                   bool zero_target,
1914                   BlockdevOnError on_source_error,
1915                   BlockdevOnError on_target_error,
1916                   bool unmap, const char *filter_node_name,
1917                   MirrorCopyMode copy_mode, Error **errp)
1918 {
1919     bool is_none_mode;
1920     BlockDriverState *base;
1921 
1922     GLOBAL_STATE_CODE();
1923 
1924     if ((mode == MIRROR_SYNC_MODE_INCREMENTAL) ||
1925         (mode == MIRROR_SYNC_MODE_BITMAP)) {
1926         error_setg(errp, "Sync mode '%s' not supported",
1927                    MirrorSyncMode_str(mode));
1928         return;
1929     }
1930     is_none_mode = mode == MIRROR_SYNC_MODE_NONE;
1931     base = mode == MIRROR_SYNC_MODE_TOP ? bdrv_backing_chain_next(bs) : NULL;
1932     mirror_start_job(job_id, bs, creation_flags, target, replaces,
1933                      speed, granularity, buf_size, backing_mode, zero_target,
1934                      on_source_error, on_target_error, unmap, NULL, NULL,
1935                      &mirror_job_driver, is_none_mode, base, false,
1936                      filter_node_name, true, copy_mode, errp);
1937 }
1938 
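/*
 * Entry point for active commit (committing the active layer): the base image
 * is temporarily reopened read-write if necessary (and restored on failure),
 * and the mirror machinery is reused with @base as the mirroring target.
 */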
1939 BlockJob *commit_active_start(const char *job_id, BlockDriverState *bs,
1940                               BlockDriverState *base, int creation_flags,
1941                               int64_t speed, BlockdevOnError on_error,
1942                               const char *filter_node_name,
1943                               BlockCompletionFunc *cb, void *opaque,
1944                               bool auto_complete, Error **errp)
1945 {
1946     bool base_read_only;
1947     BlockJob *job;
1948 
1949     GLOBAL_STATE_CODE();
1950 
1951     base_read_only = bdrv_is_read_only(base);
1952 
1953     if (base_read_only) {
1954         if (bdrv_reopen_set_read_only(base, false, errp) < 0) {
1955             return NULL;
1956         }
1957     }
1958 
1959     job = mirror_start_job(
1960                      job_id, bs, creation_flags, base, NULL, speed, 0, 0,
1961                      MIRROR_LEAVE_BACKING_CHAIN, false,
1962                      on_error, on_error, true, cb, opaque,
1963                      &commit_active_job_driver, false, base, auto_complete,
1964                      filter_node_name, false, MIRROR_COPY_MODE_BACKGROUND,
1965                      errp);
1966     if (!job) {
1967         goto error_restore_flags;
1968     }
1969 
1970     return job;
1971 
1972 error_restore_flags:
1973     /* Ignore the error and errp from bdrv_reopen_set_read_only(), because we
1974      * want to propagate the original error */
1975     if (base_read_only) {
1976         bdrv_reopen_set_read_only(base, true, NULL);
1977     }
1978     return NULL;
1979 }
1980