xref: /openbmc/qemu/block/backup.c (revision 7609ffb9191e3fc473203f4bd58b934161eab358)
1 /*
2  * QEMU backup
3  *
4  * Copyright (C) 2013 Proxmox Server Solutions
5  *
6  * Authors:
7  *  Dietmar Maurer (dietmar@proxmox.com)
8  *
9  * This work is licensed under the terms of the GNU GPL, version 2 or later.
10  * See the COPYING file in the top-level directory.
11  *
12  */
13 
14 #include "qemu/osdep.h"
15 
16 #include "trace.h"
17 #include "block/block.h"
18 #include "block/block_int.h"
19 #include "block/blockjob_int.h"
20 #include "block/block_backup.h"
21 #include "qapi/error.h"
22 #include "qapi/qmp/qerror.h"
23 #include "qemu/ratelimit.h"
24 #include "qemu/cutils.h"
25 #include "sysemu/block-backend.h"
26 #include "qemu/bitmap.h"
27 #include "qemu/error-report.h"
28 
29 #define BACKUP_CLUSTER_SIZE_DEFAULT (1 << 16)
30 #define SLICE_TIME 100000000ULL /* ns */
31 
32 typedef struct BackupBlockJob {
33     BlockJob common;
34     BlockBackend *target;
35     /* bitmap for sync=incremental */
36     BdrvDirtyBitmap *sync_bitmap;
37     MirrorSyncMode sync_mode;
38     RateLimit limit;
39     BlockdevOnError on_source_error;
40     BlockdevOnError on_target_error;
41     CoRwlock flush_rwlock;
42     uint64_t sectors_read;
43     unsigned long *done_bitmap;
44     int64_t cluster_size;
45     bool compress;
46     NotifierWithReturn before_write;
47     QLIST_HEAD(, CowRequest) inflight_reqs;
48 } BackupBlockJob;
49 
50 /* Size of a cluster in sectors, instead of bytes. */
51 static inline int64_t cluster_size_sectors(BackupBlockJob *job)
52 {
53   return job->cluster_size / BDRV_SECTOR_SIZE;
54 }
55 
56 /* See if in-flight requests overlap and wait for them to complete */
57 static void coroutine_fn wait_for_overlapping_requests(BackupBlockJob *job,
58                                                        int64_t start,
59                                                        int64_t end)
60 {
61     CowRequest *req;
62     bool retry;
63 
64     do {
65         retry = false;
66         QLIST_FOREACH(req, &job->inflight_reqs, list) {
67             if (end > req->start && start < req->end) {
68                 qemu_co_queue_wait(&req->wait_queue, NULL);
69                 retry = true;
70                 break;
71             }
72         }
73     } while (retry);
74 }
75 
76 /* Keep track of an in-flight request */
77 static void cow_request_begin(CowRequest *req, BackupBlockJob *job,
78                                      int64_t start, int64_t end)
79 {
80     req->start = start;
81     req->end = end;
82     qemu_co_queue_init(&req->wait_queue);
83     QLIST_INSERT_HEAD(&job->inflight_reqs, req, list);
84 }
85 
86 /* Forget about a completed request */
87 static void cow_request_end(CowRequest *req)
88 {
89     QLIST_REMOVE(req, list);
90     qemu_co_queue_restart_all(&req->wait_queue);
91 }
92 
93 static int coroutine_fn backup_do_cow(BackupBlockJob *job,
94                                       int64_t sector_num, int nb_sectors,
95                                       bool *error_is_read,
96                                       bool is_write_notifier)
97 {
98     BlockBackend *blk = job->common.blk;
99     CowRequest cow_request;
100     struct iovec iov;
101     QEMUIOVector bounce_qiov;
102     void *bounce_buffer = NULL;
103     int ret = 0;
104     int64_t sectors_per_cluster = cluster_size_sectors(job);
105     int64_t start, end;
106     int n;
107 
108     qemu_co_rwlock_rdlock(&job->flush_rwlock);
109 
110     start = sector_num / sectors_per_cluster;
111     end = DIV_ROUND_UP(sector_num + nb_sectors, sectors_per_cluster);
112 
113     trace_backup_do_cow_enter(job, start, sector_num, nb_sectors);
114 
115     wait_for_overlapping_requests(job, start, end);
116     cow_request_begin(&cow_request, job, start, end);
117 
118     for (; start < end; start++) {
119         if (test_bit(start, job->done_bitmap)) {
120             trace_backup_do_cow_skip(job, start);
121             continue; /* already copied */
122         }
123 
124         trace_backup_do_cow_process(job, start);
125 
126         n = MIN(sectors_per_cluster,
127                 job->common.len / BDRV_SECTOR_SIZE -
128                 start * sectors_per_cluster);
129 
130         if (!bounce_buffer) {
131             bounce_buffer = blk_blockalign(blk, job->cluster_size);
132         }
133         iov.iov_base = bounce_buffer;
134         iov.iov_len = n * BDRV_SECTOR_SIZE;
135         qemu_iovec_init_external(&bounce_qiov, &iov, 1);
136 
137         ret = blk_co_preadv(blk, start * job->cluster_size,
138                             bounce_qiov.size, &bounce_qiov,
139                             is_write_notifier ? BDRV_REQ_NO_SERIALISING : 0);
140         if (ret < 0) {
141             trace_backup_do_cow_read_fail(job, start, ret);
142             if (error_is_read) {
143                 *error_is_read = true;
144             }
145             goto out;
146         }
147 
148         if (buffer_is_zero(iov.iov_base, iov.iov_len)) {
149             ret = blk_co_pwrite_zeroes(job->target, start * job->cluster_size,
150                                        bounce_qiov.size, BDRV_REQ_MAY_UNMAP);
151         } else {
152             ret = blk_co_pwritev(job->target, start * job->cluster_size,
153                                  bounce_qiov.size, &bounce_qiov,
154                                  job->compress ? BDRV_REQ_WRITE_COMPRESSED : 0);
155         }
156         if (ret < 0) {
157             trace_backup_do_cow_write_fail(job, start, ret);
158             if (error_is_read) {
159                 *error_is_read = false;
160             }
161             goto out;
162         }
163 
164         set_bit(start, job->done_bitmap);
165 
166         /* Publish progress, guest I/O counts as progress too.  Note that the
167          * offset field is an opaque progress value, it is not a disk offset.
168          */
169         job->sectors_read += n;
170         job->common.offset += n * BDRV_SECTOR_SIZE;
171     }
172 
173 out:
174     if (bounce_buffer) {
175         qemu_vfree(bounce_buffer);
176     }
177 
178     cow_request_end(&cow_request);
179 
180     trace_backup_do_cow_return(job, sector_num, nb_sectors, ret);
181 
182     qemu_co_rwlock_unlock(&job->flush_rwlock);
183 
184     return ret;
185 }
186 
187 static int coroutine_fn backup_before_write_notify(
188         NotifierWithReturn *notifier,
189         void *opaque)
190 {
191     BackupBlockJob *job = container_of(notifier, BackupBlockJob, before_write);
192     BdrvTrackedRequest *req = opaque;
193     int64_t sector_num = req->offset >> BDRV_SECTOR_BITS;
194     int nb_sectors = req->bytes >> BDRV_SECTOR_BITS;
195 
196     assert(req->bs == blk_bs(job->common.blk));
197     assert((req->offset & (BDRV_SECTOR_SIZE - 1)) == 0);
198     assert((req->bytes & (BDRV_SECTOR_SIZE - 1)) == 0);
199 
200     return backup_do_cow(job, sector_num, nb_sectors, NULL, true);
201 }
202 
203 static void backup_set_speed(BlockJob *job, int64_t speed, Error **errp)
204 {
205     BackupBlockJob *s = container_of(job, BackupBlockJob, common);
206 
207     if (speed < 0) {
208         error_setg(errp, QERR_INVALID_PARAMETER, "speed");
209         return;
210     }
211     ratelimit_set_speed(&s->limit, speed / BDRV_SECTOR_SIZE, SLICE_TIME);
212 }
213 
214 static void backup_cleanup_sync_bitmap(BackupBlockJob *job, int ret)
215 {
216     BdrvDirtyBitmap *bm;
217     BlockDriverState *bs = blk_bs(job->common.blk);
218 
219     if (ret < 0 || block_job_is_cancelled(&job->common)) {
220         /* Merge the successor back into the parent, delete nothing. */
221         bm = bdrv_reclaim_dirty_bitmap(bs, job->sync_bitmap, NULL);
222         assert(bm);
223     } else {
224         /* Everything is fine, delete this bitmap and install the backup. */
225         bm = bdrv_dirty_bitmap_abdicate(bs, job->sync_bitmap, NULL);
226         assert(bm);
227     }
228 }
229 
230 static void backup_commit(BlockJob *job)
231 {
232     BackupBlockJob *s = container_of(job, BackupBlockJob, common);
233     if (s->sync_bitmap) {
234         backup_cleanup_sync_bitmap(s, 0);
235     }
236 }
237 
238 static void backup_abort(BlockJob *job)
239 {
240     BackupBlockJob *s = container_of(job, BackupBlockJob, common);
241     if (s->sync_bitmap) {
242         backup_cleanup_sync_bitmap(s, -1);
243     }
244 }
245 
246 static void backup_clean(BlockJob *job)
247 {
248     BackupBlockJob *s = container_of(job, BackupBlockJob, common);
249     assert(s->target);
250     blk_unref(s->target);
251     s->target = NULL;
252 }
253 
254 static void backup_attached_aio_context(BlockJob *job, AioContext *aio_context)
255 {
256     BackupBlockJob *s = container_of(job, BackupBlockJob, common);
257 
258     blk_set_aio_context(s->target, aio_context);
259 }
260 
261 void backup_do_checkpoint(BlockJob *job, Error **errp)
262 {
263     BackupBlockJob *backup_job = container_of(job, BackupBlockJob, common);
264     int64_t len;
265 
266     assert(job->driver->job_type == BLOCK_JOB_TYPE_BACKUP);
267 
268     if (backup_job->sync_mode != MIRROR_SYNC_MODE_NONE) {
269         error_setg(errp, "The backup job only supports block checkpoint in"
270                    " sync=none mode");
271         return;
272     }
273 
274     len = DIV_ROUND_UP(backup_job->common.len, backup_job->cluster_size);
275     bitmap_zero(backup_job->done_bitmap, len);
276 }
277 
278 void backup_wait_for_overlapping_requests(BlockJob *job, int64_t sector_num,
279                                           int nb_sectors)
280 {
281     BackupBlockJob *backup_job = container_of(job, BackupBlockJob, common);
282     int64_t sectors_per_cluster = cluster_size_sectors(backup_job);
283     int64_t start, end;
284 
285     assert(job->driver->job_type == BLOCK_JOB_TYPE_BACKUP);
286 
287     start = sector_num / sectors_per_cluster;
288     end = DIV_ROUND_UP(sector_num + nb_sectors, sectors_per_cluster);
289     wait_for_overlapping_requests(backup_job, start, end);
290 }
291 
292 void backup_cow_request_begin(CowRequest *req, BlockJob *job,
293                               int64_t sector_num,
294                               int nb_sectors)
295 {
296     BackupBlockJob *backup_job = container_of(job, BackupBlockJob, common);
297     int64_t sectors_per_cluster = cluster_size_sectors(backup_job);
298     int64_t start, end;
299 
300     assert(job->driver->job_type == BLOCK_JOB_TYPE_BACKUP);
301 
302     start = sector_num / sectors_per_cluster;
303     end = DIV_ROUND_UP(sector_num + nb_sectors, sectors_per_cluster);
304     cow_request_begin(req, backup_job, start, end);
305 }
306 
307 void backup_cow_request_end(CowRequest *req)
308 {
309     cow_request_end(req);
310 }
311 
312 static void backup_drain(BlockJob *job)
313 {
314     BackupBlockJob *s = container_of(job, BackupBlockJob, common);
315 
316     /* Need to keep a reference in case blk_drain triggers execution
317      * of backup_complete...
318      */
319     if (s->target) {
320         BlockBackend *target = s->target;
321         blk_ref(target);
322         blk_drain(target);
323         blk_unref(target);
324     }
325 }
326 
327 static BlockErrorAction backup_error_action(BackupBlockJob *job,
328                                             bool read, int error)
329 {
330     if (read) {
331         return block_job_error_action(&job->common, job->on_source_error,
332                                       true, error);
333     } else {
334         return block_job_error_action(&job->common, job->on_target_error,
335                                       false, error);
336     }
337 }
338 
339 typedef struct {
340     int ret;
341 } BackupCompleteData;
342 
343 static void backup_complete(BlockJob *job, void *opaque)
344 {
345     BackupCompleteData *data = opaque;
346 
347     block_job_completed(job, data->ret);
348     g_free(data);
349 }
350 
351 static bool coroutine_fn yield_and_check(BackupBlockJob *job)
352 {
353     if (block_job_is_cancelled(&job->common)) {
354         return true;
355     }
356 
357     /* we need to yield so that bdrv_drain_all() returns.
358      * (without, VM does not reboot)
359      */
360     if (job->common.speed) {
361         uint64_t delay_ns = ratelimit_calculate_delay(&job->limit,
362                                                       job->sectors_read);
363         job->sectors_read = 0;
364         block_job_sleep_ns(&job->common, QEMU_CLOCK_REALTIME, delay_ns);
365     } else {
366         block_job_sleep_ns(&job->common, QEMU_CLOCK_REALTIME, 0);
367     }
368 
369     if (block_job_is_cancelled(&job->common)) {
370         return true;
371     }
372 
373     return false;
374 }
375 
376 static int coroutine_fn backup_run_incremental(BackupBlockJob *job)
377 {
378     bool error_is_read;
379     int ret = 0;
380     int clusters_per_iter;
381     uint32_t granularity;
382     int64_t sector;
383     int64_t cluster;
384     int64_t end;
385     int64_t last_cluster = -1;
386     int64_t sectors_per_cluster = cluster_size_sectors(job);
387     BdrvDirtyBitmapIter *dbi;
388 
389     granularity = bdrv_dirty_bitmap_granularity(job->sync_bitmap);
390     clusters_per_iter = MAX((granularity / job->cluster_size), 1);
391     dbi = bdrv_dirty_iter_new(job->sync_bitmap, 0);
392 
393     /* Find the next dirty sector(s) */
394     while ((sector = bdrv_dirty_iter_next(dbi)) != -1) {
395         cluster = sector / sectors_per_cluster;
396 
397         /* Fake progress updates for any clusters we skipped */
398         if (cluster != last_cluster + 1) {
399             job->common.offset += ((cluster - last_cluster - 1) *
400                                    job->cluster_size);
401         }
402 
403         for (end = cluster + clusters_per_iter; cluster < end; cluster++) {
404             do {
405                 if (yield_and_check(job)) {
406                     goto out;
407                 }
408                 ret = backup_do_cow(job, cluster * sectors_per_cluster,
409                                     sectors_per_cluster, &error_is_read,
410                                     false);
411                 if ((ret < 0) &&
412                     backup_error_action(job, error_is_read, -ret) ==
413                     BLOCK_ERROR_ACTION_REPORT) {
414                     goto out;
415                 }
416             } while (ret < 0);
417         }
418 
419         /* If the bitmap granularity is smaller than the backup granularity,
420          * we need to advance the iterator pointer to the next cluster. */
421         if (granularity < job->cluster_size) {
422             bdrv_set_dirty_iter(dbi, cluster * sectors_per_cluster);
423         }
424 
425         last_cluster = cluster - 1;
426     }
427 
428     /* Play some final catchup with the progress meter */
429     end = DIV_ROUND_UP(job->common.len, job->cluster_size);
430     if (last_cluster + 1 < end) {
431         job->common.offset += ((end - last_cluster - 1) * job->cluster_size);
432     }
433 
434 out:
435     bdrv_dirty_iter_free(dbi);
436     return ret;
437 }
438 
439 static void coroutine_fn backup_run(void *opaque)
440 {
441     BackupBlockJob *job = opaque;
442     BackupCompleteData *data;
443     BlockDriverState *bs = blk_bs(job->common.blk);
444     int64_t start, end;
445     int64_t sectors_per_cluster = cluster_size_sectors(job);
446     int ret = 0;
447 
448     QLIST_INIT(&job->inflight_reqs);
449     qemu_co_rwlock_init(&job->flush_rwlock);
450 
451     start = 0;
452     end = DIV_ROUND_UP(job->common.len, job->cluster_size);
453 
454     job->done_bitmap = bitmap_new(end);
455 
456     job->before_write.notify = backup_before_write_notify;
457     bdrv_add_before_write_notifier(bs, &job->before_write);
458 
459     if (job->sync_mode == MIRROR_SYNC_MODE_NONE) {
460         while (!block_job_is_cancelled(&job->common)) {
461             /* Yield until the job is cancelled.  We just let our before_write
462              * notify callback service CoW requests. */
463             block_job_yield(&job->common);
464         }
465     } else if (job->sync_mode == MIRROR_SYNC_MODE_INCREMENTAL) {
466         ret = backup_run_incremental(job);
467     } else {
468         /* Both FULL and TOP SYNC_MODE's require copying.. */
469         for (; start < end; start++) {
470             bool error_is_read;
471             int alloced = 0;
472 
473             if (yield_and_check(job)) {
474                 break;
475             }
476 
477             if (job->sync_mode == MIRROR_SYNC_MODE_TOP) {
478                 int i, n;
479 
480                 /* Check to see if these blocks are already in the
481                  * backing file. */
482 
483                 for (i = 0; i < sectors_per_cluster;) {
484                     /* bdrv_is_allocated() only returns true/false based
485                      * on the first set of sectors it comes across that
486                      * are are all in the same state.
487                      * For that reason we must verify each sector in the
488                      * backup cluster length.  We end up copying more than
489                      * needed but at some point that is always the case. */
490                     alloced =
491                         bdrv_is_allocated(bs,
492                                 start * sectors_per_cluster + i,
493                                 sectors_per_cluster - i, &n);
494                     i += n;
495 
496                     if (alloced || n == 0) {
497                         break;
498                     }
499                 }
500 
501                 /* If the above loop never found any sectors that are in
502                  * the topmost image, skip this backup. */
503                 if (alloced == 0) {
504                     continue;
505                 }
506             }
507             /* FULL sync mode we copy the whole drive. */
508             if (alloced < 0) {
509                 ret = alloced;
510             } else {
511                 ret = backup_do_cow(job, start * sectors_per_cluster,
512                                     sectors_per_cluster, &error_is_read,
513                                     false);
514             }
515             if (ret < 0) {
516                 /* Depending on error action, fail now or retry cluster */
517                 BlockErrorAction action =
518                     backup_error_action(job, error_is_read, -ret);
519                 if (action == BLOCK_ERROR_ACTION_REPORT) {
520                     break;
521                 } else {
522                     start--;
523                     continue;
524                 }
525             }
526         }
527     }
528 
529     notifier_with_return_remove(&job->before_write);
530 
531     /* wait until pending backup_do_cow() calls have completed */
532     qemu_co_rwlock_wrlock(&job->flush_rwlock);
533     qemu_co_rwlock_unlock(&job->flush_rwlock);
534     g_free(job->done_bitmap);
535 
536     data = g_malloc(sizeof(*data));
537     data->ret = ret;
538     block_job_defer_to_main_loop(&job->common, backup_complete, data);
539 }
540 
541 static const BlockJobDriver backup_job_driver = {
542     .instance_size          = sizeof(BackupBlockJob),
543     .job_type               = BLOCK_JOB_TYPE_BACKUP,
544     .start                  = backup_run,
545     .set_speed              = backup_set_speed,
546     .commit                 = backup_commit,
547     .abort                  = backup_abort,
548     .clean                  = backup_clean,
549     .attached_aio_context   = backup_attached_aio_context,
550     .drain                  = backup_drain,
551 };
552 
553 BlockJob *backup_job_create(const char *job_id, BlockDriverState *bs,
554                   BlockDriverState *target, int64_t speed,
555                   MirrorSyncMode sync_mode, BdrvDirtyBitmap *sync_bitmap,
556                   bool compress,
557                   BlockdevOnError on_source_error,
558                   BlockdevOnError on_target_error,
559                   int creation_flags,
560                   BlockCompletionFunc *cb, void *opaque,
561                   BlockJobTxn *txn, Error **errp)
562 {
563     int64_t len;
564     BlockDriverInfo bdi;
565     BackupBlockJob *job = NULL;
566     int ret;
567 
568     assert(bs);
569     assert(target);
570 
571     if (bs == target) {
572         error_setg(errp, "Source and target cannot be the same");
573         return NULL;
574     }
575 
576     if (!bdrv_is_inserted(bs)) {
577         error_setg(errp, "Device is not inserted: %s",
578                    bdrv_get_device_name(bs));
579         return NULL;
580     }
581 
582     if (!bdrv_is_inserted(target)) {
583         error_setg(errp, "Device is not inserted: %s",
584                    bdrv_get_device_name(target));
585         return NULL;
586     }
587 
588     if (compress && target->drv->bdrv_co_pwritev_compressed == NULL) {
589         error_setg(errp, "Compression is not supported for this drive %s",
590                    bdrv_get_device_name(target));
591         return NULL;
592     }
593 
594     if (bdrv_op_is_blocked(bs, BLOCK_OP_TYPE_BACKUP_SOURCE, errp)) {
595         return NULL;
596     }
597 
598     if (bdrv_op_is_blocked(target, BLOCK_OP_TYPE_BACKUP_TARGET, errp)) {
599         return NULL;
600     }
601 
602     if (sync_mode == MIRROR_SYNC_MODE_INCREMENTAL) {
603         if (!sync_bitmap) {
604             error_setg(errp, "must provide a valid bitmap name for "
605                              "\"incremental\" sync mode");
606             return NULL;
607         }
608 
609         /* Create a new bitmap, and freeze/disable this one. */
610         if (bdrv_dirty_bitmap_create_successor(bs, sync_bitmap, errp) < 0) {
611             return NULL;
612         }
613     } else if (sync_bitmap) {
614         error_setg(errp,
615                    "a sync_bitmap was provided to backup_run, "
616                    "but received an incompatible sync_mode (%s)",
617                    MirrorSyncMode_lookup[sync_mode]);
618         return NULL;
619     }
620 
621     len = bdrv_getlength(bs);
622     if (len < 0) {
623         error_setg_errno(errp, -len, "unable to get length for '%s'",
624                          bdrv_get_device_name(bs));
625         goto error;
626     }
627 
628     /* job->common.len is fixed, so we can't allow resize */
629     job = block_job_create(job_id, &backup_job_driver, bs,
630                            BLK_PERM_CONSISTENT_READ,
631                            BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE |
632                            BLK_PERM_WRITE_UNCHANGED | BLK_PERM_GRAPH_MOD,
633                            speed, creation_flags, cb, opaque, errp);
634     if (!job) {
635         goto error;
636     }
637 
638     /* The target must match the source in size, so no resize here either */
639     job->target = blk_new(BLK_PERM_WRITE,
640                           BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE |
641                           BLK_PERM_WRITE_UNCHANGED | BLK_PERM_GRAPH_MOD);
642     ret = blk_insert_bs(job->target, target, errp);
643     if (ret < 0) {
644         goto error;
645     }
646 
647     job->on_source_error = on_source_error;
648     job->on_target_error = on_target_error;
649     job->sync_mode = sync_mode;
650     job->sync_bitmap = sync_mode == MIRROR_SYNC_MODE_INCREMENTAL ?
651                        sync_bitmap : NULL;
652     job->compress = compress;
653 
654     /* If there is no backing file on the target, we cannot rely on COW if our
655      * backup cluster size is smaller than the target cluster size. Even for
656      * targets with a backing file, try to avoid COW if possible. */
657     ret = bdrv_get_info(target, &bdi);
658     if (ret == -ENOTSUP && !target->backing) {
659         /* Cluster size is not defined */
660         error_report("WARNING: The target block device doesn't provide "
661                      "information about the block size and it doesn't have a "
662                      "backing file. The default block size of %u bytes is "
663                      "used. If the actual block size of the target exceeds "
664                      "this default, the backup may be unusable",
665                      BACKUP_CLUSTER_SIZE_DEFAULT);
666         job->cluster_size = BACKUP_CLUSTER_SIZE_DEFAULT;
667     } else if (ret < 0 && !target->backing) {
668         error_setg_errno(errp, -ret,
669             "Couldn't determine the cluster size of the target image, "
670             "which has no backing file");
671         error_append_hint(errp,
672             "Aborting, since this may create an unusable destination image\n");
673         goto error;
674     } else if (ret < 0 && target->backing) {
675         /* Not fatal; just trudge on ahead. */
676         job->cluster_size = BACKUP_CLUSTER_SIZE_DEFAULT;
677     } else {
678         job->cluster_size = MAX(BACKUP_CLUSTER_SIZE_DEFAULT, bdi.cluster_size);
679     }
680 
681     /* Required permissions are already taken with target's blk_new() */
682     block_job_add_bdrv(&job->common, "target", target, 0, BLK_PERM_ALL,
683                        &error_abort);
684     job->common.len = len;
685     block_job_txn_add_job(txn, &job->common);
686 
687     return &job->common;
688 
689  error:
690     if (sync_bitmap) {
691         bdrv_reclaim_dirty_bitmap(bs, sync_bitmap, NULL);
692     }
693     if (job) {
694         backup_clean(&job->common);
695         block_job_unref(&job->common);
696     }
697 
698     return NULL;
699 }
700