1 /* 2 * Image mirroring 3 * 4 * Copyright Red Hat, Inc. 2012 5 * 6 * Authors: 7 * Paolo Bonzini <pbonzini@redhat.com> 8 * 9 * This work is licensed under the terms of the GNU LGPL, version 2 or later. 10 * See the COPYING.LIB file in the top-level directory. 11 * 12 */ 13 14 #include "qemu/osdep.h" 15 #include "qemu/cutils.h" 16 #include "qemu/coroutine.h" 17 #include "qemu/range.h" 18 #include "trace.h" 19 #include "block/blockjob_int.h" 20 #include "block/block_int.h" 21 #include "sysemu/block-backend.h" 22 #include "qapi/error.h" 23 #include "qapi/qmp/qerror.h" 24 #include "qemu/ratelimit.h" 25 #include "qemu/bitmap.h" 26 27 #define MAX_IN_FLIGHT 16 28 #define MAX_IO_BYTES (1 << 20) /* 1 Mb */ 29 #define DEFAULT_MIRROR_BUF_SIZE (MAX_IN_FLIGHT * MAX_IO_BYTES) 30 31 /* The mirroring buffer is a list of granularity-sized chunks. 32 * Free chunks are organized in a list. 33 */ 34 typedef struct MirrorBuffer { 35 QSIMPLEQ_ENTRY(MirrorBuffer) next; 36 } MirrorBuffer; 37 38 typedef struct MirrorOp MirrorOp; 39 40 typedef struct MirrorBlockJob { 41 BlockJob common; 42 BlockBackend *target; 43 BlockDriverState *mirror_top_bs; 44 BlockDriverState *base; 45 46 /* The name of the graph node to replace */ 47 char *replaces; 48 /* The BDS to replace */ 49 BlockDriverState *to_replace; 50 /* Used to block operations on the drive-mirror-replace target */ 51 Error *replace_blocker; 52 bool is_none_mode; 53 BlockMirrorBackingMode backing_mode; 54 /* Whether the target image requires explicit zero-initialization */ 55 bool zero_target; 56 MirrorCopyMode copy_mode; 57 BlockdevOnError on_source_error, on_target_error; 58 bool synced; 59 /* Set when the target is synced (dirty bitmap is clean, nothing 60 * in flight) and the job is running in active mode */ 61 bool actively_synced; 62 bool should_complete; 63 int64_t granularity; 64 size_t buf_size; 65 int64_t bdev_length; 66 unsigned long *cow_bitmap; 67 BdrvDirtyBitmap *dirty_bitmap; 68 BdrvDirtyBitmapIter *dbi; 69 uint8_t *buf; 
70 QSIMPLEQ_HEAD(, MirrorBuffer) buf_free; 71 int buf_free_count; 72 73 uint64_t last_pause_ns; 74 unsigned long *in_flight_bitmap; 75 int in_flight; 76 int64_t bytes_in_flight; 77 QTAILQ_HEAD(, MirrorOp) ops_in_flight; 78 int ret; 79 bool unmap; 80 int target_cluster_size; 81 int max_iov; 82 bool initial_zeroing_ongoing; 83 int in_active_write_counter; 84 bool prepared; 85 bool in_drain; 86 } MirrorBlockJob; 87 88 typedef struct MirrorBDSOpaque { 89 MirrorBlockJob *job; 90 bool stop; 91 } MirrorBDSOpaque; 92 93 struct MirrorOp { 94 MirrorBlockJob *s; 95 QEMUIOVector qiov; 96 int64_t offset; 97 uint64_t bytes; 98 99 /* The pointee is set by mirror_co_read(), mirror_co_zero(), and 100 * mirror_co_discard() before yielding for the first time */ 101 int64_t *bytes_handled; 102 103 bool is_pseudo_op; 104 bool is_active_write; 105 bool is_in_flight; 106 CoQueue waiting_requests; 107 Coroutine *co; 108 109 QTAILQ_ENTRY(MirrorOp) next; 110 }; 111 112 typedef enum MirrorMethod { 113 MIRROR_METHOD_COPY, 114 MIRROR_METHOD_ZERO, 115 MIRROR_METHOD_DISCARD, 116 } MirrorMethod; 117 118 static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read, 119 int error) 120 { 121 s->synced = false; 122 s->actively_synced = false; 123 if (read) { 124 return block_job_error_action(&s->common, s->on_source_error, 125 true, error); 126 } else { 127 return block_job_error_action(&s->common, s->on_target_error, 128 false, error); 129 } 130 } 131 132 static void coroutine_fn mirror_wait_on_conflicts(MirrorOp *self, 133 MirrorBlockJob *s, 134 uint64_t offset, 135 uint64_t bytes) 136 { 137 uint64_t self_start_chunk = offset / s->granularity; 138 uint64_t self_end_chunk = DIV_ROUND_UP(offset + bytes, s->granularity); 139 uint64_t self_nb_chunks = self_end_chunk - self_start_chunk; 140 141 while (find_next_bit(s->in_flight_bitmap, self_end_chunk, 142 self_start_chunk) < self_end_chunk && 143 s->ret >= 0) 144 { 145 MirrorOp *op; 146 147 QTAILQ_FOREACH(op, &s->ops_in_flight, next) { 148 
uint64_t op_start_chunk = op->offset / s->granularity; 149 uint64_t op_nb_chunks = DIV_ROUND_UP(op->offset + op->bytes, 150 s->granularity) - 151 op_start_chunk; 152 153 if (op == self) { 154 continue; 155 } 156 157 if (ranges_overlap(self_start_chunk, self_nb_chunks, 158 op_start_chunk, op_nb_chunks)) 159 { 160 qemu_co_queue_wait(&op->waiting_requests, NULL); 161 break; 162 } 163 } 164 } 165 } 166 167 static void coroutine_fn mirror_iteration_done(MirrorOp *op, int ret) 168 { 169 MirrorBlockJob *s = op->s; 170 struct iovec *iov; 171 int64_t chunk_num; 172 int i, nb_chunks; 173 174 trace_mirror_iteration_done(s, op->offset, op->bytes, ret); 175 176 s->in_flight--; 177 s->bytes_in_flight -= op->bytes; 178 iov = op->qiov.iov; 179 for (i = 0; i < op->qiov.niov; i++) { 180 MirrorBuffer *buf = (MirrorBuffer *) iov[i].iov_base; 181 QSIMPLEQ_INSERT_TAIL(&s->buf_free, buf, next); 182 s->buf_free_count++; 183 } 184 185 chunk_num = op->offset / s->granularity; 186 nb_chunks = DIV_ROUND_UP(op->bytes, s->granularity); 187 188 bitmap_clear(s->in_flight_bitmap, chunk_num, nb_chunks); 189 QTAILQ_REMOVE(&s->ops_in_flight, op, next); 190 if (ret >= 0) { 191 if (s->cow_bitmap) { 192 bitmap_set(s->cow_bitmap, chunk_num, nb_chunks); 193 } 194 if (!s->initial_zeroing_ongoing) { 195 job_progress_update(&s->common.job, op->bytes); 196 } 197 } 198 qemu_iovec_destroy(&op->qiov); 199 200 qemu_co_queue_restart_all(&op->waiting_requests); 201 g_free(op); 202 } 203 204 static void coroutine_fn mirror_write_complete(MirrorOp *op, int ret) 205 { 206 MirrorBlockJob *s = op->s; 207 208 if (ret < 0) { 209 BlockErrorAction action; 210 211 bdrv_set_dirty_bitmap(s->dirty_bitmap, op->offset, op->bytes); 212 action = mirror_error_action(s, false, -ret); 213 if (action == BLOCK_ERROR_ACTION_REPORT && s->ret >= 0) { 214 s->ret = ret; 215 } 216 } 217 218 mirror_iteration_done(op, ret); 219 } 220 221 static void coroutine_fn mirror_read_complete(MirrorOp *op, int ret) 222 { 223 MirrorBlockJob *s = op->s; 
224 225 if (ret < 0) { 226 BlockErrorAction action; 227 228 bdrv_set_dirty_bitmap(s->dirty_bitmap, op->offset, op->bytes); 229 action = mirror_error_action(s, true, -ret); 230 if (action == BLOCK_ERROR_ACTION_REPORT && s->ret >= 0) { 231 s->ret = ret; 232 } 233 234 mirror_iteration_done(op, ret); 235 return; 236 } 237 238 ret = blk_co_pwritev(s->target, op->offset, op->qiov.size, &op->qiov, 0); 239 mirror_write_complete(op, ret); 240 } 241 242 /* Clip bytes relative to offset to not exceed end-of-file */ 243 static inline int64_t mirror_clip_bytes(MirrorBlockJob *s, 244 int64_t offset, 245 int64_t bytes) 246 { 247 return MIN(bytes, s->bdev_length - offset); 248 } 249 250 /* Round offset and/or bytes to target cluster if COW is needed, and 251 * return the offset of the adjusted tail against original. */ 252 static int mirror_cow_align(MirrorBlockJob *s, int64_t *offset, 253 uint64_t *bytes) 254 { 255 bool need_cow; 256 int ret = 0; 257 int64_t align_offset = *offset; 258 int64_t align_bytes = *bytes; 259 int max_bytes = s->granularity * s->max_iov; 260 261 need_cow = !test_bit(*offset / s->granularity, s->cow_bitmap); 262 need_cow |= !test_bit((*offset + *bytes - 1) / s->granularity, 263 s->cow_bitmap); 264 if (need_cow) { 265 bdrv_round_to_clusters(blk_bs(s->target), *offset, *bytes, 266 &align_offset, &align_bytes); 267 } 268 269 if (align_bytes > max_bytes) { 270 align_bytes = max_bytes; 271 if (need_cow) { 272 align_bytes = QEMU_ALIGN_DOWN(align_bytes, s->target_cluster_size); 273 } 274 } 275 /* Clipping may result in align_bytes unaligned to chunk boundary, but 276 * that doesn't matter because it's already the end of source image. 
*/ 277 align_bytes = mirror_clip_bytes(s, align_offset, align_bytes); 278 279 ret = align_offset + align_bytes - (*offset + *bytes); 280 *offset = align_offset; 281 *bytes = align_bytes; 282 assert(ret >= 0); 283 return ret; 284 } 285 286 static inline void coroutine_fn 287 mirror_wait_for_any_operation(MirrorBlockJob *s, bool active) 288 { 289 MirrorOp *op; 290 291 QTAILQ_FOREACH(op, &s->ops_in_flight, next) { 292 /* Do not wait on pseudo ops, because it may in turn wait on 293 * some other operation to start, which may in fact be the 294 * caller of this function. Since there is only one pseudo op 295 * at any given time, we will always find some real operation 296 * to wait on. */ 297 if (!op->is_pseudo_op && op->is_in_flight && 298 op->is_active_write == active) 299 { 300 qemu_co_queue_wait(&op->waiting_requests, NULL); 301 return; 302 } 303 } 304 abort(); 305 } 306 307 static inline void coroutine_fn 308 mirror_wait_for_free_in_flight_slot(MirrorBlockJob *s) 309 { 310 /* Only non-active operations use up in-flight slots */ 311 mirror_wait_for_any_operation(s, false); 312 } 313 314 /* Perform a mirror copy operation. 315 * 316 * *op->bytes_handled is set to the number of bytes copied after and 317 * including offset, excluding any bytes copied prior to offset due 318 * to alignment. This will be op->bytes if no alignment is necessary, 319 * or (new_end - op->offset) if the tail is rounded up or down due to 320 * alignment or buffer limit. 321 */ 322 static void coroutine_fn mirror_co_read(void *opaque) 323 { 324 MirrorOp *op = opaque; 325 MirrorBlockJob *s = op->s; 326 int nb_chunks; 327 uint64_t ret; 328 uint64_t max_bytes; 329 330 max_bytes = s->granularity * s->max_iov; 331 332 /* We can only handle as much as buf_size at a time. 
*/ 333 op->bytes = MIN(s->buf_size, MIN(max_bytes, op->bytes)); 334 assert(op->bytes); 335 assert(op->bytes < BDRV_REQUEST_MAX_BYTES); 336 *op->bytes_handled = op->bytes; 337 338 if (s->cow_bitmap) { 339 *op->bytes_handled += mirror_cow_align(s, &op->offset, &op->bytes); 340 } 341 /* Cannot exceed BDRV_REQUEST_MAX_BYTES + INT_MAX */ 342 assert(*op->bytes_handled <= UINT_MAX); 343 assert(op->bytes <= s->buf_size); 344 /* The offset is granularity-aligned because: 345 * 1) Caller passes in aligned values; 346 * 2) mirror_cow_align is used only when target cluster is larger. */ 347 assert(QEMU_IS_ALIGNED(op->offset, s->granularity)); 348 /* The range is sector-aligned, since bdrv_getlength() rounds up. */ 349 assert(QEMU_IS_ALIGNED(op->bytes, BDRV_SECTOR_SIZE)); 350 nb_chunks = DIV_ROUND_UP(op->bytes, s->granularity); 351 352 while (s->buf_free_count < nb_chunks) { 353 trace_mirror_yield_in_flight(s, op->offset, s->in_flight); 354 mirror_wait_for_free_in_flight_slot(s); 355 } 356 357 /* Now make a QEMUIOVector taking enough granularity-sized chunks 358 * from s->buf_free. 359 */ 360 qemu_iovec_init(&op->qiov, nb_chunks); 361 while (nb_chunks-- > 0) { 362 MirrorBuffer *buf = QSIMPLEQ_FIRST(&s->buf_free); 363 size_t remaining = op->bytes - op->qiov.size; 364 365 QSIMPLEQ_REMOVE_HEAD(&s->buf_free, next); 366 s->buf_free_count--; 367 qemu_iovec_add(&op->qiov, buf, MIN(s->granularity, remaining)); 368 } 369 370 /* Copy the dirty cluster. 
*/ 371 s->in_flight++; 372 s->bytes_in_flight += op->bytes; 373 op->is_in_flight = true; 374 trace_mirror_one_iteration(s, op->offset, op->bytes); 375 376 ret = bdrv_co_preadv(s->mirror_top_bs->backing, op->offset, op->bytes, 377 &op->qiov, 0); 378 mirror_read_complete(op, ret); 379 } 380 381 static void coroutine_fn mirror_co_zero(void *opaque) 382 { 383 MirrorOp *op = opaque; 384 int ret; 385 386 op->s->in_flight++; 387 op->s->bytes_in_flight += op->bytes; 388 *op->bytes_handled = op->bytes; 389 op->is_in_flight = true; 390 391 ret = blk_co_pwrite_zeroes(op->s->target, op->offset, op->bytes, 392 op->s->unmap ? BDRV_REQ_MAY_UNMAP : 0); 393 mirror_write_complete(op, ret); 394 } 395 396 static void coroutine_fn mirror_co_discard(void *opaque) 397 { 398 MirrorOp *op = opaque; 399 int ret; 400 401 op->s->in_flight++; 402 op->s->bytes_in_flight += op->bytes; 403 *op->bytes_handled = op->bytes; 404 op->is_in_flight = true; 405 406 ret = blk_co_pdiscard(op->s->target, op->offset, op->bytes); 407 mirror_write_complete(op, ret); 408 } 409 410 static unsigned mirror_perform(MirrorBlockJob *s, int64_t offset, 411 unsigned bytes, MirrorMethod mirror_method) 412 { 413 MirrorOp *op; 414 Coroutine *co; 415 int64_t bytes_handled = -1; 416 417 op = g_new(MirrorOp, 1); 418 *op = (MirrorOp){ 419 .s = s, 420 .offset = offset, 421 .bytes = bytes, 422 .bytes_handled = &bytes_handled, 423 }; 424 qemu_co_queue_init(&op->waiting_requests); 425 426 switch (mirror_method) { 427 case MIRROR_METHOD_COPY: 428 co = qemu_coroutine_create(mirror_co_read, op); 429 break; 430 case MIRROR_METHOD_ZERO: 431 co = qemu_coroutine_create(mirror_co_zero, op); 432 break; 433 case MIRROR_METHOD_DISCARD: 434 co = qemu_coroutine_create(mirror_co_discard, op); 435 break; 436 default: 437 abort(); 438 } 439 op->co = co; 440 441 QTAILQ_INSERT_TAIL(&s->ops_in_flight, op, next); 442 qemu_coroutine_enter(co); 443 /* At this point, ownership of op has been moved to the coroutine 444 * and the object may already be 
freed */ 445 446 /* Assert that this value has been set */ 447 assert(bytes_handled >= 0); 448 449 /* Same assertion as in mirror_co_read() (and for mirror_co_read() 450 * and mirror_co_discard(), bytes_handled == op->bytes, which 451 * is the @bytes parameter given to this function) */ 452 assert(bytes_handled <= UINT_MAX); 453 return bytes_handled; 454 } 455 456 static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s) 457 { 458 BlockDriverState *source = s->mirror_top_bs->backing->bs; 459 MirrorOp *pseudo_op; 460 int64_t offset; 461 uint64_t delay_ns = 0, ret = 0; 462 /* At least the first dirty chunk is mirrored in one iteration. */ 463 int nb_chunks = 1; 464 bool write_zeroes_ok = bdrv_can_write_zeroes_with_unmap(blk_bs(s->target)); 465 int max_io_bytes = MAX(s->buf_size / MAX_IN_FLIGHT, MAX_IO_BYTES); 466 467 bdrv_dirty_bitmap_lock(s->dirty_bitmap); 468 offset = bdrv_dirty_iter_next(s->dbi); 469 if (offset < 0) { 470 bdrv_set_dirty_iter(s->dbi, 0); 471 offset = bdrv_dirty_iter_next(s->dbi); 472 trace_mirror_restart_iter(s, bdrv_get_dirty_count(s->dirty_bitmap)); 473 assert(offset >= 0); 474 } 475 bdrv_dirty_bitmap_unlock(s->dirty_bitmap); 476 477 mirror_wait_on_conflicts(NULL, s, offset, 1); 478 479 job_pause_point(&s->common.job); 480 481 /* Find the number of consective dirty chunks following the first dirty 482 * one, and wait for in flight requests in them. 
*/ 483 bdrv_dirty_bitmap_lock(s->dirty_bitmap); 484 while (nb_chunks * s->granularity < s->buf_size) { 485 int64_t next_dirty; 486 int64_t next_offset = offset + nb_chunks * s->granularity; 487 int64_t next_chunk = next_offset / s->granularity; 488 if (next_offset >= s->bdev_length || 489 !bdrv_dirty_bitmap_get_locked(s->dirty_bitmap, next_offset)) { 490 break; 491 } 492 if (test_bit(next_chunk, s->in_flight_bitmap)) { 493 break; 494 } 495 496 next_dirty = bdrv_dirty_iter_next(s->dbi); 497 if (next_dirty > next_offset || next_dirty < 0) { 498 /* The bitmap iterator's cache is stale, refresh it */ 499 bdrv_set_dirty_iter(s->dbi, next_offset); 500 next_dirty = bdrv_dirty_iter_next(s->dbi); 501 } 502 assert(next_dirty == next_offset); 503 nb_chunks++; 504 } 505 506 /* Clear dirty bits before querying the block status, because 507 * calling bdrv_block_status_above could yield - if some blocks are 508 * marked dirty in this window, we need to know. 509 */ 510 bdrv_reset_dirty_bitmap_locked(s->dirty_bitmap, offset, 511 nb_chunks * s->granularity); 512 bdrv_dirty_bitmap_unlock(s->dirty_bitmap); 513 514 /* Before claiming an area in the in-flight bitmap, we have to 515 * create a MirrorOp for it so that conflicting requests can wait 516 * for it. mirror_perform() will create the real MirrorOps later, 517 * for now we just create a pseudo operation that will wake up all 518 * conflicting requests once all real operations have been 519 * launched. 
*/ 520 pseudo_op = g_new(MirrorOp, 1); 521 *pseudo_op = (MirrorOp){ 522 .offset = offset, 523 .bytes = nb_chunks * s->granularity, 524 .is_pseudo_op = true, 525 }; 526 qemu_co_queue_init(&pseudo_op->waiting_requests); 527 QTAILQ_INSERT_TAIL(&s->ops_in_flight, pseudo_op, next); 528 529 bitmap_set(s->in_flight_bitmap, offset / s->granularity, nb_chunks); 530 while (nb_chunks > 0 && offset < s->bdev_length) { 531 int ret; 532 int64_t io_bytes; 533 int64_t io_bytes_acct; 534 MirrorMethod mirror_method = MIRROR_METHOD_COPY; 535 536 assert(!(offset % s->granularity)); 537 ret = bdrv_block_status_above(source, NULL, offset, 538 nb_chunks * s->granularity, 539 &io_bytes, NULL, NULL); 540 if (ret < 0) { 541 io_bytes = MIN(nb_chunks * s->granularity, max_io_bytes); 542 } else if (ret & BDRV_BLOCK_DATA) { 543 io_bytes = MIN(io_bytes, max_io_bytes); 544 } 545 546 io_bytes -= io_bytes % s->granularity; 547 if (io_bytes < s->granularity) { 548 io_bytes = s->granularity; 549 } else if (ret >= 0 && !(ret & BDRV_BLOCK_DATA)) { 550 int64_t target_offset; 551 int64_t target_bytes; 552 bdrv_round_to_clusters(blk_bs(s->target), offset, io_bytes, 553 &target_offset, &target_bytes); 554 if (target_offset == offset && 555 target_bytes == io_bytes) { 556 mirror_method = ret & BDRV_BLOCK_ZERO ? 
557 MIRROR_METHOD_ZERO : 558 MIRROR_METHOD_DISCARD; 559 } 560 } 561 562 while (s->in_flight >= MAX_IN_FLIGHT) { 563 trace_mirror_yield_in_flight(s, offset, s->in_flight); 564 mirror_wait_for_free_in_flight_slot(s); 565 } 566 567 if (s->ret < 0) { 568 ret = 0; 569 goto fail; 570 } 571 572 io_bytes = mirror_clip_bytes(s, offset, io_bytes); 573 io_bytes = mirror_perform(s, offset, io_bytes, mirror_method); 574 if (mirror_method != MIRROR_METHOD_COPY && write_zeroes_ok) { 575 io_bytes_acct = 0; 576 } else { 577 io_bytes_acct = io_bytes; 578 } 579 assert(io_bytes); 580 offset += io_bytes; 581 nb_chunks -= DIV_ROUND_UP(io_bytes, s->granularity); 582 delay_ns = block_job_ratelimit_get_delay(&s->common, io_bytes_acct); 583 } 584 585 ret = delay_ns; 586 fail: 587 QTAILQ_REMOVE(&s->ops_in_flight, pseudo_op, next); 588 qemu_co_queue_restart_all(&pseudo_op->waiting_requests); 589 g_free(pseudo_op); 590 591 return ret; 592 } 593 594 static void mirror_free_init(MirrorBlockJob *s) 595 { 596 int granularity = s->granularity; 597 size_t buf_size = s->buf_size; 598 uint8_t *buf = s->buf; 599 600 assert(s->buf_free_count == 0); 601 QSIMPLEQ_INIT(&s->buf_free); 602 while (buf_size != 0) { 603 MirrorBuffer *cur = (MirrorBuffer *)buf; 604 QSIMPLEQ_INSERT_TAIL(&s->buf_free, cur, next); 605 s->buf_free_count++; 606 buf_size -= granularity; 607 buf += granularity; 608 } 609 } 610 611 /* This is also used for the .pause callback. There is no matching 612 * mirror_resume() because mirror_run() will begin iterating again 613 * when the job is resumed. 614 */ 615 static void coroutine_fn mirror_wait_for_all_io(MirrorBlockJob *s) 616 { 617 while (s->in_flight > 0) { 618 mirror_wait_for_free_in_flight_slot(s); 619 } 620 } 621 622 /** 623 * mirror_exit_common: handle both abort() and prepare() cases. 624 * for .prepare, returns 0 on success and -errno on failure. 625 * for .abort cases, denoted by abort = true, MUST return 0. 
 */
/* Runs its body at most once per job: s->prepared short-circuits any later
 * call.  "abort" below is derived from job->ret < 0. */
static int mirror_exit_common(Job *job)
{
    MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
    BlockJob *bjob = &s->common;
    MirrorBDSOpaque *bs_opaque;
    AioContext *replace_aio_context = NULL;
    BlockDriverState *src;
    BlockDriverState *target_bs;
    BlockDriverState *mirror_top_bs;
    Error *local_err = NULL;
    bool abort = job->ret < 0;
    int ret = 0;

    if (s->prepared) {
        return 0;
    }
    s->prepared = true;

    mirror_top_bs = s->mirror_top_bs;
    bs_opaque = mirror_top_bs->opaque;
    src = mirror_top_bs->backing->bs;
    target_bs = blk_bs(s->target);

    if (bdrv_chain_contains(src, target_bs)) {
        bdrv_unfreeze_backing_chain(mirror_top_bs, target_bs);
    }

    bdrv_release_dirty_bitmap(s->dirty_bitmap);

    /* Make sure that the source BDS doesn't go away during bdrv_replace_node,
     * before we can call bdrv_drained_end */
    bdrv_ref(src);
    bdrv_ref(mirror_top_bs);
    bdrv_ref(target_bs);

    /*
     * Remove target parent that still uses BLK_PERM_WRITE/RESIZE before
     * inserting target_bs at s->to_replace, where we might not be able to get
     * these permissions.
     */
    blk_unref(s->target);
    s->target = NULL;

    /* We don't access the source any more. Dropping any WRITE/RESIZE is
     * required before it could become a backing file of target_bs. Not having
     * these permissions any more means that we can't allow any new requests on
     * mirror_top_bs from now on, so keep it drained. */
    bdrv_drained_begin(mirror_top_bs);
    bs_opaque->stop = true;
    bdrv_child_refresh_perms(mirror_top_bs, mirror_top_bs->backing,
                             &error_abort);
    if (!abort && s->backing_mode == MIRROR_SOURCE_BACKING_CHAIN) {
        BlockDriverState *backing = s->is_none_mode ? src : s->base;
        if (backing_bs(target_bs) != backing) {
            bdrv_set_backing_hd(target_bs, backing, &local_err);
            if (local_err) {
                /* Reported and turned into -EPERM; cleanup continues */
                error_report_err(local_err);
                local_err = NULL;
                ret = -EPERM;
            }
        }
    }

    if (s->to_replace) {
        replace_aio_context = bdrv_get_aio_context(s->to_replace);
        aio_context_acquire(replace_aio_context);
    }

    if (s->should_complete && !abort) {
        BlockDriverState *to_replace = s->to_replace ?: src;
        bool ro = bdrv_is_read_only(to_replace);

        /* Give the target the same read-only state as the node it replaces;
         * a failure here is deliberately not reported (errp is NULL) */
        if (ro != bdrv_is_read_only(target_bs)) {
            bdrv_reopen_set_read_only(target_bs, ro, NULL);
        }

        /* The mirror job has no requests in flight any more, but we need to
         * drain potential other users of the BDS before changing the graph. */
        assert(s->in_drain);
        bdrv_drained_begin(target_bs);
        /*
         * Cannot use check_to_replace_node() here, because that would
         * check for an op blocker on @to_replace, and we have our own
         * there.
         */
        if (bdrv_recurse_can_replace(src, to_replace)) {
            bdrv_replace_node(to_replace, target_bs, &local_err);
        } else {
            error_setg(&local_err, "Can no longer replace '%s' by '%s', "
                       "because it can no longer be guaranteed that doing so "
                       "would not lead to an abrupt change of visible data",
                       to_replace->node_name, target_bs->node_name);
        }
        bdrv_drained_end(target_bs);
        if (local_err) {
            error_report_err(local_err);
            ret = -EPERM;
        }
    }
    if (s->to_replace) {
        /* Undo the op blocker and the reference taken on to_replace */
        bdrv_op_unblock_all(s->to_replace, s->replace_blocker);
        error_free(s->replace_blocker);
        bdrv_unref(s->to_replace);
    }
    if (replace_aio_context) {
        aio_context_release(replace_aio_context);
    }
    g_free(s->replaces);
    bdrv_unref(target_bs);

    /*
     * Remove the mirror filter driver from the graph. Before this, get rid of
     * the blockers on the intermediate nodes so that the resulting state is
     * valid.
     */
    block_job_remove_all_bdrv(bjob);
    bdrv_replace_node(mirror_top_bs, backing_bs(mirror_top_bs), &error_abort);

    /* We just changed the BDS the job BB refers to (with either or both of the
     * bdrv_replace_node() calls), so switch the BB back so the cleanup does
     * the right thing. We don't need any permissions any more now. */
    blk_remove_bs(bjob->blk);
    blk_set_perm(bjob->blk, 0, BLK_PERM_ALL, &error_abort);
    blk_insert_bs(bjob->blk, mirror_top_bs, &error_abort);

    bs_opaque->job = NULL;

    /* End the drained sections on src and mirror_top_bs, then drop the
     * references taken at the top of this function */
    bdrv_drained_end(src);
    bdrv_drained_end(mirror_top_bs);
    s->in_drain = false;
    bdrv_unref(mirror_top_bs);
    bdrv_unref(src);

    return ret;
}

/* Job .prepare callback: forwards the result of mirror_exit_common(). */
static int mirror_prepare(Job *job)
{
    return mirror_exit_common(job);
}

/* Job .abort callback: mirror_exit_common() must return 0 here. */
static void mirror_abort(Job *job)
{
    int ret = mirror_exit_common(job);
    assert(ret == 0);
}

/*
 * If more than BLOCK_JOB_SLICE_TIME has passed since s->last_pause_ns,
 * record the current time and call job_sleep_ns() with a zero delay;
 * otherwise only go through a job pause point.
 */
static void coroutine_fn mirror_throttle(MirrorBlockJob *s)
{
    int64_t now = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);

    if (now - s->last_pause_ns > BLOCK_JOB_SLICE_TIME) {
        s->last_pause_ns = now;
        job_sleep_ns(&s->common.job, 0);
    } else {
        job_pause_point(&s->common.job);
    }
}

/*
 * Initialize the dirty bitmap before the main mirror loop.
 *
 * If s->zero_target is set, the target is first zeroed with
 * MIRROR_METHOD_ZERO operations; if the target cannot write zeroes with
 * unmap, the whole device is marked dirty instead and the function returns.
 * Afterwards every range that bdrv_is_allocated_above() reports as allocated
 * (return value 1) is marked dirty.
 *
 * Returns 0 on success or when the job was cancelled, and the negative
 * value returned by bdrv_is_allocated_above() on failure.
 */
static int coroutine_fn mirror_dirty_init(MirrorBlockJob *s)
{
    int64_t offset;
    BlockDriverState *base = s->base;
    BlockDriverState *bs = s->mirror_top_bs->backing->bs;
    BlockDriverState *target_bs = blk_bs(s->target);
    int ret;
    int64_t count;

    if (s->zero_target) {
        if (!bdrv_can_write_zeroes_with_unmap(target_bs)) {
            bdrv_set_dirty_bitmap(s->dirty_bitmap, 0, s->bdev_length);
            return 0;
        }

        s->initial_zeroing_ongoing = true;
        for (offset = 0; offset < s->bdev_length; ) {
            int bytes = MIN(s->bdev_length - offset,
                            QEMU_ALIGN_DOWN(INT_MAX, s->granularity));

            mirror_throttle(s);

            if (job_is_cancelled(&s->common.job)) {
                s->initial_zeroing_ongoing = false;
                return 0;
            }

            if (s->in_flight >= MAX_IN_FLIGHT) {
                trace_mirror_yield(s, UINT64_MAX, s->buf_free_count,
                                   s->in_flight);
                mirror_wait_for_free_in_flight_slot(s);
                /* Retry the same offset once a slot has been freed */
                continue;
            }

            mirror_perform(s, offset, bytes, MIRROR_METHOD_ZERO);
            offset += bytes;
        }

        mirror_wait_for_all_io(s);
        s->initial_zeroing_ongoing = false;
    }

    /* First part, loop on the sectors and initialize the dirty bitmap.  */
    for (offset = 0; offset < s->bdev_length; ) {
        /* Just to make sure we are not exceeding int limit. */
        int bytes = MIN(s->bdev_length - offset,
                        QEMU_ALIGN_DOWN(INT_MAX, s->granularity));

        mirror_throttle(s);

        if (job_is_cancelled(&s->common.job)) {
            return 0;
        }

        ret = bdrv_is_allocated_above(bs, base, false, offset, bytes, &count);
        if (ret < 0) {
            return ret;
        }

        /* A zero count would make this loop spin forever */
        assert(count);
        if (ret == 1) {
            bdrv_set_dirty_bitmap(s->dirty_bitmap, offset, count);
        }
        offset += count;
    }
    return 0;
}

/* Called when going out of the streaming phase to flush the bulk of the
 * data to the medium, or just before completing.
 */
/* Returns the result of blk_flush(); on failure s->ret is set when the
 * target error policy says to report the error. */
static int mirror_flush(MirrorBlockJob *s)
{
    int ret = blk_flush(s->target);
    if (ret < 0) {
        if (mirror_error_action(s, false, -ret) == BLOCK_ERROR_ACTION_REPORT) {
            s->ret = ret;
        }
    }
    return ret;
}

/*
 * Job .run callback: main coroutine of the mirror job.
 *
 * Sets up the in-flight/COW bitmaps and the copy buffer, initializes the
 * dirty bitmap (unless is_none_mode), then loops calling mirror_iteration()
 * until the job is synced and asked to complete, fails, or is cancelled.
 * Unless the loop exited through the "in sync" path (need_drain == false),
 * a drained section on the source is begun before returning; either way
 * s->in_drain is true on exit.
 *
 * Returns 0 on success or a negative errno value.
 */
static int coroutine_fn mirror_run(Job *job, Error **errp)
{
    MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
    BlockDriverState *bs = s->mirror_top_bs->backing->bs;
    BlockDriverState *target_bs = blk_bs(s->target);
    bool need_drain = true;
    int64_t length;
    BlockDriverInfo bdi;
    char backing_filename[2]; /* we only need 2 characters because we are only
                                 checking for a NULL string */
    int ret = 0;

    if (job_is_cancelled(&s->common.job)) {
        goto immediate_exit;
    }

    s->bdev_length = bdrv_getlength(bs);
    if (s->bdev_length < 0) {
        ret = s->bdev_length;
        goto immediate_exit;
    }

    /* Active commit must resize the base image if its size differs from the
     * active layer. */
    if (s->base == blk_bs(s->target)) {
        int64_t base_length;

        base_length = blk_getlength(s->target);
        if (base_length < 0) {
            ret = base_length;
            goto immediate_exit;
        }

        if (s->bdev_length > base_length) {
            ret = blk_truncate(s->target, s->bdev_length, false,
                               PREALLOC_MODE_OFF, NULL);
            if (ret < 0) {
                goto immediate_exit;
            }
        }
    }

    if (s->bdev_length == 0) {
        /* Transition to the READY state and wait for complete. */
        job_transition_to_ready(&s->common.job);
        s->synced = true;
        s->actively_synced = true;
        while (!job_is_cancelled(&s->common.job) && !s->should_complete) {
            job_yield(&s->common.job);
        }
        s->common.job.cancelled = false;
        goto immediate_exit;
    }

    /* Number of granularity-sized chunks, used to size both bitmaps */
    length = DIV_ROUND_UP(s->bdev_length, s->granularity);
    s->in_flight_bitmap = bitmap_new(length);

    /* If we have no backing file yet in the destination, we cannot let
     * the destination do COW.  Instead, we copy sectors around the
     * dirty data if needed.  We need a bitmap to do that.
     */
    bdrv_get_backing_filename(target_bs, backing_filename,
                              sizeof(backing_filename));
    if (!bdrv_get_info(target_bs, &bdi) && bdi.cluster_size) {
        s->target_cluster_size = bdi.cluster_size;
    } else {
        s->target_cluster_size = BDRV_SECTOR_SIZE;
    }
    if (backing_filename[0] && !target_bs->backing &&
        s->granularity < s->target_cluster_size) {
        s->buf_size = MAX(s->buf_size, s->target_cluster_size);
        s->cow_bitmap = bitmap_new(length);
    }
    s->max_iov = MIN(bs->bl.max_iov, target_bs->bl.max_iov);

    s->buf = qemu_try_blockalign(bs, s->buf_size);
    if (s->buf == NULL) {
        ret = -ENOMEM;
        goto immediate_exit;
    }

    mirror_free_init(s);

    s->last_pause_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    if (!s->is_none_mode) {
        ret = mirror_dirty_init(s);
        if (ret < 0 || job_is_cancelled(&s->common.job)) {
            goto immediate_exit;
        }
    }

    assert(!s->dbi);
    s->dbi = bdrv_dirty_iter_new(s->dirty_bitmap);
    for (;;) {
        uint64_t delay_ns = 0;
        int64_t cnt, delta;
        bool should_complete;

        /* Do not start passive operations while there are active
         * writes in progress */
        while (s->in_active_write_counter) {
            mirror_wait_for_any_operation(s, true);
        }

        if (s->ret < 0) {
            ret = s->ret;
            goto immediate_exit;
        }

        job_pause_point(&s->common.job);

        cnt = bdrv_get_dirty_count(s->dirty_bitmap);
        /* cnt is the number of dirty bytes remaining and s->bytes_in_flight is
         * the number of bytes currently being processed; together those are
         * the current remaining operation length */
        job_progress_set_remaining(&s->common.job, s->bytes_in_flight + cnt);

        /* Note that even when no rate limit is applied we need to yield
         * periodically with no pending I/O so that bdrv_drain_all() returns.
         * We do so every BLOCK_JOB_SLICE_TIME nanoseconds, or when there is
         * an error, or when the source is clean, whichever comes first. */
        delta = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - s->last_pause_ns;
        if (delta < BLOCK_JOB_SLICE_TIME &&
            s->common.iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
            if (s->in_flight >= MAX_IN_FLIGHT || s->buf_free_count == 0 ||
                (cnt == 0 && s->in_flight > 0)) {
                trace_mirror_yield(s, cnt, s->buf_free_count, s->in_flight);
                mirror_wait_for_free_in_flight_slot(s);
                continue;
            } else if (cnt != 0) {
                delay_ns = mirror_iteration(s);
            }
        }

        should_complete = false;
        if (s->in_flight == 0 && cnt == 0) {
            trace_mirror_before_flush(s);
            if (!s->synced) {
                if (mirror_flush(s) < 0) {
                    /* Go check s->ret.  */
                    continue;
                }
                /* We're out of the streaming phase.  From now on, if the job
                 * is cancelled we will actually complete all pending I/O and
                 * report completion.  This way, block-job-cancel will leave
                 * the target in a consistent state.
                 */
                job_transition_to_ready(&s->common.job);
                s->synced = true;
                if (s->copy_mode != MIRROR_COPY_MODE_BACKGROUND) {
                    s->actively_synced = true;
                }
            }

            should_complete = s->should_complete ||
                job_is_cancelled(&s->common.job);
            cnt = bdrv_get_dirty_count(s->dirty_bitmap);
        }

        if (cnt == 0 && should_complete) {
            /* The dirty bitmap is not updated while operations are pending.
             * If we're about to exit, wait for pending operations before
             * calling bdrv_get_dirty_count(bs), or we may exit while the
             * source has dirty data to copy!
             *
             * Note that I/O can be submitted by the guest while
             * mirror_populate runs, so pause it now.  Before deciding
             * whether to switch to target check one last time if I/O has
             * come in the meanwhile, and if not flush the data to disk.
             */
            trace_mirror_before_drain(s, cnt);

            s->in_drain = true;
            bdrv_drained_begin(bs);
            cnt = bdrv_get_dirty_count(s->dirty_bitmap);
            if (cnt > 0 || mirror_flush(s) < 0) {
                /* New dirty data or flush failure: leave the drained
                 * section and go around the loop again */
                bdrv_drained_end(bs);
                s->in_drain = false;
                continue;
            }

            /* The two disks are in sync.  Exit and report successful
             * completion.
             */
            assert(QLIST_EMPTY(&bs->tracked_requests));
            s->common.job.cancelled = false;
            /* The drained section begun above stays open on this path */
            need_drain = false;
            break;
        }

        ret = 0;

        if (s->synced && !should_complete) {
            delay_ns = (s->in_flight == 0 &&
                        cnt == 0 ? BLOCK_JOB_SLICE_TIME : 0);
        }
        trace_mirror_before_sleep(s, cnt, s->synced, delay_ns);
        job_sleep_ns(&s->common.job, delay_ns);
        if (job_is_cancelled(&s->common.job) &&
            (!s->synced || s->common.job.force_cancel))
        {
            break;
        }
        s->last_pause_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    }

immediate_exit:
    if (s->in_flight > 0) {
        /* We get here only if something went wrong.  Either the job failed,
         * or it was cancelled prematurely so that we do not guarantee that
         * the target is a copy of the source.
         */
        assert(ret < 0 || ((s->common.job.force_cancel || !s->synced) &&
               job_is_cancelled(&s->common.job)));
        assert(need_drain);
        mirror_wait_for_all_io(s);
    }

    assert(s->in_flight == 0);
    qemu_vfree(s->buf);
    g_free(s->cow_bitmap);
    g_free(s->in_flight_bitmap);
    bdrv_dirty_iter_free(s->dbi);

    if (need_drain) {
        s->in_drain = true;
        bdrv_drained_begin(bs);
    }

    return ret;
}

/*
 * Job .complete callback.
 *
 * Fails (setting @errp) if the job is not synced yet, if the backing file
 * of the target cannot be opened in MIRROR_OPEN_BACKING_CHAIN mode, or if
 * the node named by s->replaces cannot be found.  Otherwise sets
 * s->should_complete and enters the job.
 */
static void mirror_complete(Job *job, Error **errp)
{
    MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
    BlockDriverState *target;

    target = blk_bs(s->target);

    if (!s->synced) {
        error_setg(errp, "The active block job '%s' cannot be completed",
                   job->id);
        return;
    }

    if (s->backing_mode == MIRROR_OPEN_BACKING_CHAIN) {
        int ret;

        assert(!target->backing);
        ret = bdrv_open_backing_file(target, NULL, "backing", errp);
        if (ret < 0) {
            return;
        }
    }

    /* block all operations on to_replace bs */
    if (s->replaces) {
        AioContext *replace_aio_context;

        s->to_replace = bdrv_find_node(s->replaces);
        if (!s->to_replace) {
            error_setg(errp, "Node name '%s' not found", s->replaces);
            return;
        }

        replace_aio_context = bdrv_get_aio_context(s->to_replace);
        aio_context_acquire(replace_aio_context);

        /* TODO Translate this into permission system. Current definition of
         * GRAPH_MOD would require to request it for the parents; they might
         * not even be BlockDriverStates, however, so a BdrvChild can't address
         * them. May need redefinition of GRAPH_MOD. */
        error_setg(&s->replace_blocker,
                   "block device is in use by block-job-complete");
        bdrv_op_block_all(s->to_replace, s->replace_blocker);
        /* Reference and blocker are released in mirror_exit_common() */
        bdrv_ref(s->to_replace);

        aio_context_release(replace_aio_context);
    }

    s->should_complete = true;
    job_enter(job);
}

/* Job .pause callback: wait until no background operation is in flight. */
static void coroutine_fn mirror_pause(Job *job)
{
    MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);

    mirror_wait_for_all_io(s);
}

/* BlockJobDriver .drained_poll callback: returns true while the job may
 * still have, or issue, requests. */
static bool mirror_drained_poll(BlockJob *job)
{
    MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);

    /* If the job isn't paused nor cancelled, we can't be sure that it won't
     * issue more requests. We make an exception if we've reached this point
     * from one of our own drain sections, to avoid a deadlock waiting for
     * ourselves.
     */
    if (!s->common.job.paused && !s->common.job.cancelled && !s->in_drain) {
        return true;
    }

    return !!s->in_flight;
}

/* Driver table for JOB_TYPE_MIRROR jobs */
static const BlockJobDriver mirror_job_driver = {
    .job_driver = {
        .instance_size          = sizeof(MirrorBlockJob),
        .job_type               = JOB_TYPE_MIRROR,
        .free                   = block_job_free,
        .user_resume            = block_job_user_resume,
        .run                    = mirror_run,
        .prepare                = mirror_prepare,
        .abort                  = mirror_abort,
        .pause                  = mirror_pause,
        .complete               = mirror_complete,
    },
    .drained_poll           = mirror_drained_poll,
};

/* Driver table for JOB_TYPE_COMMIT jobs; same callbacks as
 * mirror_job_driver, only the job type differs */
static const BlockJobDriver commit_active_job_driver = {
    .job_driver = {
        .instance_size          = sizeof(MirrorBlockJob),
        .job_type               = JOB_TYPE_COMMIT,
        .free                   = block_job_free,
        .user_resume            = block_job_user_resume,
        .run                    = mirror_run,
        .prepare                = mirror_prepare,
        .abort                  = mirror_abort,
        .pause                  = mirror_pause,
        .complete               = mirror_complete,
    },
    .drained_poll           = mirror_drained_poll,
};

static void coroutine_fn
do_sync_target_write(MirrorBlockJob *job, MirrorMethod
method, 1207 uint64_t offset, uint64_t bytes, 1208 QEMUIOVector *qiov, int flags) 1209 { 1210 int ret; 1211 size_t qiov_offset = 0; 1212 int64_t bitmap_offset, bitmap_end; 1213 1214 if (!QEMU_IS_ALIGNED(offset, job->granularity) && 1215 bdrv_dirty_bitmap_get(job->dirty_bitmap, offset)) 1216 { 1217 /* 1218 * Dirty unaligned padding: ignore it. 1219 * 1220 * Reasoning: 1221 * 1. If we copy it, we can't reset corresponding bit in 1222 * dirty_bitmap as there may be some "dirty" bytes still not 1223 * copied. 1224 * 2. It's already dirty, so skipping it we don't diverge mirror 1225 * progress. 1226 * 1227 * Note, that because of this, guest write may have no contribution 1228 * into mirror converge, but that's not bad, as we have background 1229 * process of mirroring. If under some bad circumstances (high guest 1230 * IO load) background process starve, we will not converge anyway, 1231 * even if each write will contribute, as guest is not guaranteed to 1232 * rewrite the whole disk. 1233 */ 1234 qiov_offset = QEMU_ALIGN_UP(offset, job->granularity) - offset; 1235 if (bytes <= qiov_offset) { 1236 /* nothing to do after shrink */ 1237 return; 1238 } 1239 offset += qiov_offset; 1240 bytes -= qiov_offset; 1241 } 1242 1243 if (!QEMU_IS_ALIGNED(offset + bytes, job->granularity) && 1244 bdrv_dirty_bitmap_get(job->dirty_bitmap, offset + bytes - 1)) 1245 { 1246 uint64_t tail = (offset + bytes) % job->granularity; 1247 1248 if (bytes <= tail) { 1249 /* nothing to do after shrink */ 1250 return; 1251 } 1252 bytes -= tail; 1253 } 1254 1255 /* 1256 * Tails are either clean or shrunk, so for bitmap resetting 1257 * we safely align the range down. 
1258 */ 1259 bitmap_offset = QEMU_ALIGN_UP(offset, job->granularity); 1260 bitmap_end = QEMU_ALIGN_DOWN(offset + bytes, job->granularity); 1261 if (bitmap_offset < bitmap_end) { 1262 bdrv_reset_dirty_bitmap(job->dirty_bitmap, bitmap_offset, 1263 bitmap_end - bitmap_offset); 1264 } 1265 1266 job_progress_increase_remaining(&job->common.job, bytes); 1267 1268 switch (method) { 1269 case MIRROR_METHOD_COPY: 1270 ret = blk_co_pwritev_part(job->target, offset, bytes, 1271 qiov, qiov_offset, flags); 1272 break; 1273 1274 case MIRROR_METHOD_ZERO: 1275 assert(!qiov); 1276 ret = blk_co_pwrite_zeroes(job->target, offset, bytes, flags); 1277 break; 1278 1279 case MIRROR_METHOD_DISCARD: 1280 assert(!qiov); 1281 ret = blk_co_pdiscard(job->target, offset, bytes); 1282 break; 1283 1284 default: 1285 abort(); 1286 } 1287 1288 if (ret >= 0) { 1289 job_progress_update(&job->common.job, bytes); 1290 } else { 1291 BlockErrorAction action; 1292 1293 /* 1294 * We failed, so we should mark dirty the whole area, aligned up. 1295 * Note that we don't care about shrunk tails if any: they were dirty 1296 * at function start, and they must be still dirty, as we've locked 1297 * the region for in-flight op. 
1298 */ 1299 bitmap_offset = QEMU_ALIGN_DOWN(offset, job->granularity); 1300 bitmap_end = QEMU_ALIGN_UP(offset + bytes, job->granularity); 1301 bdrv_set_dirty_bitmap(job->dirty_bitmap, bitmap_offset, 1302 bitmap_end - bitmap_offset); 1303 job->actively_synced = false; 1304 1305 action = mirror_error_action(job, false, -ret); 1306 if (action == BLOCK_ERROR_ACTION_REPORT) { 1307 if (!job->ret) { 1308 job->ret = ret; 1309 } 1310 } 1311 } 1312 } 1313 1314 static MirrorOp *coroutine_fn active_write_prepare(MirrorBlockJob *s, 1315 uint64_t offset, 1316 uint64_t bytes) 1317 { 1318 MirrorOp *op; 1319 uint64_t start_chunk = offset / s->granularity; 1320 uint64_t end_chunk = DIV_ROUND_UP(offset + bytes, s->granularity); 1321 1322 op = g_new(MirrorOp, 1); 1323 *op = (MirrorOp){ 1324 .s = s, 1325 .offset = offset, 1326 .bytes = bytes, 1327 .is_active_write = true, 1328 .is_in_flight = true, 1329 }; 1330 qemu_co_queue_init(&op->waiting_requests); 1331 QTAILQ_INSERT_TAIL(&s->ops_in_flight, op, next); 1332 1333 s->in_active_write_counter++; 1334 1335 mirror_wait_on_conflicts(op, s, offset, bytes); 1336 1337 bitmap_set(s->in_flight_bitmap, start_chunk, end_chunk - start_chunk); 1338 1339 return op; 1340 } 1341 1342 static void coroutine_fn active_write_settle(MirrorOp *op) 1343 { 1344 uint64_t start_chunk = op->offset / op->s->granularity; 1345 uint64_t end_chunk = DIV_ROUND_UP(op->offset + op->bytes, 1346 op->s->granularity); 1347 1348 if (!--op->s->in_active_write_counter && op->s->actively_synced) { 1349 BdrvChild *source = op->s->mirror_top_bs->backing; 1350 1351 if (QLIST_FIRST(&source->bs->parents) == source && 1352 QLIST_NEXT(source, next_parent) == NULL) 1353 { 1354 /* Assert that we are back in sync once all active write 1355 * operations are settled. 1356 * Note that we can only assert this if the mirror node 1357 * is the source node's only parent. 
*/ 1358 assert(!bdrv_get_dirty_count(op->s->dirty_bitmap)); 1359 } 1360 } 1361 bitmap_clear(op->s->in_flight_bitmap, start_chunk, end_chunk - start_chunk); 1362 QTAILQ_REMOVE(&op->s->ops_in_flight, op, next); 1363 qemu_co_queue_restart_all(&op->waiting_requests); 1364 g_free(op); 1365 } 1366 1367 static int coroutine_fn bdrv_mirror_top_preadv(BlockDriverState *bs, 1368 uint64_t offset, uint64_t bytes, QEMUIOVector *qiov, int flags) 1369 { 1370 return bdrv_co_preadv(bs->backing, offset, bytes, qiov, flags); 1371 } 1372 1373 static int coroutine_fn bdrv_mirror_top_do_write(BlockDriverState *bs, 1374 MirrorMethod method, uint64_t offset, uint64_t bytes, QEMUIOVector *qiov, 1375 int flags) 1376 { 1377 MirrorOp *op = NULL; 1378 MirrorBDSOpaque *s = bs->opaque; 1379 int ret = 0; 1380 bool copy_to_target; 1381 1382 copy_to_target = s->job->ret >= 0 && 1383 s->job->copy_mode == MIRROR_COPY_MODE_WRITE_BLOCKING; 1384 1385 if (copy_to_target) { 1386 op = active_write_prepare(s->job, offset, bytes); 1387 } 1388 1389 switch (method) { 1390 case MIRROR_METHOD_COPY: 1391 ret = bdrv_co_pwritev(bs->backing, offset, bytes, qiov, flags); 1392 break; 1393 1394 case MIRROR_METHOD_ZERO: 1395 ret = bdrv_co_pwrite_zeroes(bs->backing, offset, bytes, flags); 1396 break; 1397 1398 case MIRROR_METHOD_DISCARD: 1399 ret = bdrv_co_pdiscard(bs->backing, offset, bytes); 1400 break; 1401 1402 default: 1403 abort(); 1404 } 1405 1406 if (ret < 0) { 1407 goto out; 1408 } 1409 1410 if (copy_to_target) { 1411 do_sync_target_write(s->job, method, offset, bytes, qiov, flags); 1412 } 1413 1414 out: 1415 if (copy_to_target) { 1416 active_write_settle(op); 1417 } 1418 return ret; 1419 } 1420 1421 static int coroutine_fn bdrv_mirror_top_pwritev(BlockDriverState *bs, 1422 uint64_t offset, uint64_t bytes, QEMUIOVector *qiov, int flags) 1423 { 1424 MirrorBDSOpaque *s = bs->opaque; 1425 QEMUIOVector bounce_qiov; 1426 void *bounce_buf; 1427 int ret = 0; 1428 bool copy_to_target; 1429 1430 copy_to_target = 
s->job->ret >= 0 && 1431 s->job->copy_mode == MIRROR_COPY_MODE_WRITE_BLOCKING; 1432 1433 if (copy_to_target) { 1434 /* The guest might concurrently modify the data to write; but 1435 * the data on source and destination must match, so we have 1436 * to use a bounce buffer if we are going to write to the 1437 * target now. */ 1438 bounce_buf = qemu_blockalign(bs, bytes); 1439 iov_to_buf_full(qiov->iov, qiov->niov, 0, bounce_buf, bytes); 1440 1441 qemu_iovec_init(&bounce_qiov, 1); 1442 qemu_iovec_add(&bounce_qiov, bounce_buf, bytes); 1443 qiov = &bounce_qiov; 1444 } 1445 1446 ret = bdrv_mirror_top_do_write(bs, MIRROR_METHOD_COPY, offset, bytes, qiov, 1447 flags); 1448 1449 if (copy_to_target) { 1450 qemu_iovec_destroy(&bounce_qiov); 1451 qemu_vfree(bounce_buf); 1452 } 1453 1454 return ret; 1455 } 1456 1457 static int coroutine_fn bdrv_mirror_top_flush(BlockDriverState *bs) 1458 { 1459 if (bs->backing == NULL) { 1460 /* we can be here after failed bdrv_append in mirror_start_job */ 1461 return 0; 1462 } 1463 return bdrv_co_flush(bs->backing->bs); 1464 } 1465 1466 static int coroutine_fn bdrv_mirror_top_pwrite_zeroes(BlockDriverState *bs, 1467 int64_t offset, int bytes, BdrvRequestFlags flags) 1468 { 1469 return bdrv_mirror_top_do_write(bs, MIRROR_METHOD_ZERO, offset, bytes, NULL, 1470 flags); 1471 } 1472 1473 static int coroutine_fn bdrv_mirror_top_pdiscard(BlockDriverState *bs, 1474 int64_t offset, int bytes) 1475 { 1476 return bdrv_mirror_top_do_write(bs, MIRROR_METHOD_DISCARD, offset, bytes, 1477 NULL, 0); 1478 } 1479 1480 static void bdrv_mirror_top_refresh_filename(BlockDriverState *bs) 1481 { 1482 if (bs->backing == NULL) { 1483 /* we can be here after failed bdrv_attach_child in 1484 * bdrv_set_backing_hd */ 1485 return; 1486 } 1487 pstrcpy(bs->exact_filename, sizeof(bs->exact_filename), 1488 bs->backing->bs->filename); 1489 } 1490 1491 static void bdrv_mirror_top_child_perm(BlockDriverState *bs, BdrvChild *c, 1492 const BdrvChildRole *role, 1493 
BlockReopenQueue *reopen_queue, 1494 uint64_t perm, uint64_t shared, 1495 uint64_t *nperm, uint64_t *nshared) 1496 { 1497 MirrorBDSOpaque *s = bs->opaque; 1498 1499 if (s->stop) { 1500 /* 1501 * If the job is to be stopped, we do not need to forward 1502 * anything to the real image. 1503 */ 1504 *nperm = 0; 1505 *nshared = BLK_PERM_ALL; 1506 return; 1507 } 1508 1509 /* Must be able to forward guest writes to the real image */ 1510 *nperm = 0; 1511 if (perm & BLK_PERM_WRITE) { 1512 *nperm |= BLK_PERM_WRITE; 1513 } 1514 1515 *nshared = BLK_PERM_ALL; 1516 } 1517 1518 /* Dummy node that provides consistent read to its users without requiring it 1519 * from its backing file and that allows writes on the backing file chain. */ 1520 static BlockDriver bdrv_mirror_top = { 1521 .format_name = "mirror_top", 1522 .bdrv_co_preadv = bdrv_mirror_top_preadv, 1523 .bdrv_co_pwritev = bdrv_mirror_top_pwritev, 1524 .bdrv_co_pwrite_zeroes = bdrv_mirror_top_pwrite_zeroes, 1525 .bdrv_co_pdiscard = bdrv_mirror_top_pdiscard, 1526 .bdrv_co_flush = bdrv_mirror_top_flush, 1527 .bdrv_co_block_status = bdrv_co_block_status_from_backing, 1528 .bdrv_refresh_filename = bdrv_mirror_top_refresh_filename, 1529 .bdrv_child_perm = bdrv_mirror_top_child_perm, 1530 }; 1531 1532 static BlockJob *mirror_start_job( 1533 const char *job_id, BlockDriverState *bs, 1534 int creation_flags, BlockDriverState *target, 1535 const char *replaces, int64_t speed, 1536 uint32_t granularity, int64_t buf_size, 1537 BlockMirrorBackingMode backing_mode, 1538 bool zero_target, 1539 BlockdevOnError on_source_error, 1540 BlockdevOnError on_target_error, 1541 bool unmap, 1542 BlockCompletionFunc *cb, 1543 void *opaque, 1544 const BlockJobDriver *driver, 1545 bool is_none_mode, BlockDriverState *base, 1546 bool auto_complete, const char *filter_node_name, 1547 bool is_mirror, MirrorCopyMode copy_mode, 1548 Error **errp) 1549 { 1550 MirrorBlockJob *s; 1551 MirrorBDSOpaque *bs_opaque; 1552 BlockDriverState *mirror_top_bs; 1553 
bool target_graph_mod; 1554 bool target_is_backing; 1555 Error *local_err = NULL; 1556 int ret; 1557 1558 if (granularity == 0) { 1559 granularity = bdrv_get_default_bitmap_granularity(target); 1560 } 1561 1562 assert(is_power_of_2(granularity)); 1563 1564 if (buf_size < 0) { 1565 error_setg(errp, "Invalid parameter 'buf-size'"); 1566 return NULL; 1567 } 1568 1569 if (buf_size == 0) { 1570 buf_size = DEFAULT_MIRROR_BUF_SIZE; 1571 } 1572 1573 if (bs == target) { 1574 error_setg(errp, "Can't mirror node into itself"); 1575 return NULL; 1576 } 1577 1578 /* In the case of active commit, add dummy driver to provide consistent 1579 * reads on the top, while disabling it in the intermediate nodes, and make 1580 * the backing chain writable. */ 1581 mirror_top_bs = bdrv_new_open_driver(&bdrv_mirror_top, filter_node_name, 1582 BDRV_O_RDWR, errp); 1583 if (mirror_top_bs == NULL) { 1584 return NULL; 1585 } 1586 if (!filter_node_name) { 1587 mirror_top_bs->implicit = true; 1588 } 1589 1590 /* So that we can always drop this node */ 1591 mirror_top_bs->never_freeze = true; 1592 1593 mirror_top_bs->total_sectors = bs->total_sectors; 1594 mirror_top_bs->supported_write_flags = BDRV_REQ_WRITE_UNCHANGED; 1595 mirror_top_bs->supported_zero_flags = BDRV_REQ_WRITE_UNCHANGED | 1596 BDRV_REQ_NO_FALLBACK; 1597 bs_opaque = g_new0(MirrorBDSOpaque, 1); 1598 mirror_top_bs->opaque = bs_opaque; 1599 1600 /* bdrv_append takes ownership of the mirror_top_bs reference, need to keep 1601 * it alive until block_job_create() succeeds even if bs has no parent. 
*/ 1602 bdrv_ref(mirror_top_bs); 1603 bdrv_drained_begin(bs); 1604 bdrv_append(mirror_top_bs, bs, &local_err); 1605 bdrv_drained_end(bs); 1606 1607 if (local_err) { 1608 bdrv_unref(mirror_top_bs); 1609 error_propagate(errp, local_err); 1610 return NULL; 1611 } 1612 1613 /* Make sure that the source is not resized while the job is running */ 1614 s = block_job_create(job_id, driver, NULL, mirror_top_bs, 1615 BLK_PERM_CONSISTENT_READ, 1616 BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE_UNCHANGED | 1617 BLK_PERM_WRITE | BLK_PERM_GRAPH_MOD, speed, 1618 creation_flags, cb, opaque, errp); 1619 if (!s) { 1620 goto fail; 1621 } 1622 bs_opaque->job = s; 1623 1624 /* The block job now has a reference to this node */ 1625 bdrv_unref(mirror_top_bs); 1626 1627 s->mirror_top_bs = mirror_top_bs; 1628 1629 /* No resize for the target either; while the mirror is still running, a 1630 * consistent read isn't necessarily possible. We could possibly allow 1631 * writes and graph modifications, though it would likely defeat the 1632 * purpose of a mirror, so leave them blocked for now. 1633 * 1634 * In the case of active commit, things look a bit different, though, 1635 * because the target is an already populated backing file in active use. 1636 * We can allow anything except resize there.*/ 1637 target_is_backing = bdrv_chain_contains(bs, target); 1638 target_graph_mod = (backing_mode != MIRROR_LEAVE_BACKING_CHAIN); 1639 s->target = blk_new(s->common.job.aio_context, 1640 BLK_PERM_WRITE | BLK_PERM_RESIZE | 1641 (target_graph_mod ? BLK_PERM_GRAPH_MOD : 0), 1642 BLK_PERM_WRITE_UNCHANGED | 1643 (target_is_backing ? BLK_PERM_CONSISTENT_READ | 1644 BLK_PERM_WRITE | 1645 BLK_PERM_GRAPH_MOD : 0)); 1646 ret = blk_insert_bs(s->target, target, errp); 1647 if (ret < 0) { 1648 goto fail; 1649 } 1650 if (is_mirror) { 1651 /* XXX: Mirror target could be a NBD server of target QEMU in the case 1652 * of non-shared block migration. 
To allow migration completion, we 1653 * have to allow "inactivate" of the target BB. When that happens, we 1654 * know the job is drained, and the vcpus are stopped, so no write 1655 * operation will be performed. Block layer already has assertions to 1656 * ensure that. */ 1657 blk_set_force_allow_inactivate(s->target); 1658 } 1659 blk_set_allow_aio_context_change(s->target, true); 1660 blk_set_disable_request_queuing(s->target, true); 1661 1662 s->replaces = g_strdup(replaces); 1663 s->on_source_error = on_source_error; 1664 s->on_target_error = on_target_error; 1665 s->is_none_mode = is_none_mode; 1666 s->backing_mode = backing_mode; 1667 s->zero_target = zero_target; 1668 s->copy_mode = copy_mode; 1669 s->base = base; 1670 s->granularity = granularity; 1671 s->buf_size = ROUND_UP(buf_size, granularity); 1672 s->unmap = unmap; 1673 if (auto_complete) { 1674 s->should_complete = true; 1675 } 1676 1677 s->dirty_bitmap = bdrv_create_dirty_bitmap(bs, granularity, NULL, errp); 1678 if (!s->dirty_bitmap) { 1679 goto fail; 1680 } 1681 if (s->copy_mode == MIRROR_COPY_MODE_WRITE_BLOCKING) { 1682 bdrv_disable_dirty_bitmap(s->dirty_bitmap); 1683 } 1684 1685 ret = block_job_add_bdrv(&s->common, "source", bs, 0, 1686 BLK_PERM_WRITE_UNCHANGED | BLK_PERM_WRITE | 1687 BLK_PERM_CONSISTENT_READ, 1688 errp); 1689 if (ret < 0) { 1690 goto fail; 1691 } 1692 1693 /* Required permissions are already taken with blk_new() */ 1694 block_job_add_bdrv(&s->common, "target", target, 0, BLK_PERM_ALL, 1695 &error_abort); 1696 1697 /* In commit_active_start() all intermediate nodes disappear, so 1698 * any jobs in them must be blocked */ 1699 if (target_is_backing) { 1700 BlockDriverState *iter; 1701 for (iter = backing_bs(bs); iter != target; iter = backing_bs(iter)) { 1702 /* XXX BLK_PERM_WRITE needs to be allowed so we don't block 1703 * ourselves at s->base (if writes are blocked for a node, they are 1704 * also blocked for its backing file). 
The other options would be a 1705 * second filter driver above s->base (== target). */ 1706 ret = block_job_add_bdrv(&s->common, "intermediate node", iter, 0, 1707 BLK_PERM_WRITE_UNCHANGED | BLK_PERM_WRITE, 1708 errp); 1709 if (ret < 0) { 1710 goto fail; 1711 } 1712 } 1713 1714 if (bdrv_freeze_backing_chain(mirror_top_bs, target, errp) < 0) { 1715 goto fail; 1716 } 1717 } 1718 1719 QTAILQ_INIT(&s->ops_in_flight); 1720 1721 trace_mirror_start(bs, s, opaque); 1722 job_start(&s->common.job); 1723 1724 return &s->common; 1725 1726 fail: 1727 if (s) { 1728 /* Make sure this BDS does not go away until we have completed the graph 1729 * changes below */ 1730 bdrv_ref(mirror_top_bs); 1731 1732 g_free(s->replaces); 1733 blk_unref(s->target); 1734 bs_opaque->job = NULL; 1735 if (s->dirty_bitmap) { 1736 bdrv_release_dirty_bitmap(s->dirty_bitmap); 1737 } 1738 job_early_fail(&s->common.job); 1739 } 1740 1741 bs_opaque->stop = true; 1742 bdrv_child_refresh_perms(mirror_top_bs, mirror_top_bs->backing, 1743 &error_abort); 1744 bdrv_replace_node(mirror_top_bs, backing_bs(mirror_top_bs), &error_abort); 1745 1746 bdrv_unref(mirror_top_bs); 1747 1748 return NULL; 1749 } 1750 1751 void mirror_start(const char *job_id, BlockDriverState *bs, 1752 BlockDriverState *target, const char *replaces, 1753 int creation_flags, int64_t speed, 1754 uint32_t granularity, int64_t buf_size, 1755 MirrorSyncMode mode, BlockMirrorBackingMode backing_mode, 1756 bool zero_target, 1757 BlockdevOnError on_source_error, 1758 BlockdevOnError on_target_error, 1759 bool unmap, const char *filter_node_name, 1760 MirrorCopyMode copy_mode, Error **errp) 1761 { 1762 bool is_none_mode; 1763 BlockDriverState *base; 1764 1765 if ((mode == MIRROR_SYNC_MODE_INCREMENTAL) || 1766 (mode == MIRROR_SYNC_MODE_BITMAP)) { 1767 error_setg(errp, "Sync mode '%s' not supported", 1768 MirrorSyncMode_str(mode)); 1769 return; 1770 } 1771 is_none_mode = mode == MIRROR_SYNC_MODE_NONE; 1772 base = mode == MIRROR_SYNC_MODE_TOP ? 
backing_bs(bs) : NULL; 1773 mirror_start_job(job_id, bs, creation_flags, target, replaces, 1774 speed, granularity, buf_size, backing_mode, zero_target, 1775 on_source_error, on_target_error, unmap, NULL, NULL, 1776 &mirror_job_driver, is_none_mode, base, false, 1777 filter_node_name, true, copy_mode, errp); 1778 } 1779 1780 BlockJob *commit_active_start(const char *job_id, BlockDriverState *bs, 1781 BlockDriverState *base, int creation_flags, 1782 int64_t speed, BlockdevOnError on_error, 1783 const char *filter_node_name, 1784 BlockCompletionFunc *cb, void *opaque, 1785 bool auto_complete, Error **errp) 1786 { 1787 bool base_read_only; 1788 Error *local_err = NULL; 1789 BlockJob *ret; 1790 1791 base_read_only = bdrv_is_read_only(base); 1792 1793 if (base_read_only) { 1794 if (bdrv_reopen_set_read_only(base, false, errp) < 0) { 1795 return NULL; 1796 } 1797 } 1798 1799 ret = mirror_start_job( 1800 job_id, bs, creation_flags, base, NULL, speed, 0, 0, 1801 MIRROR_LEAVE_BACKING_CHAIN, false, 1802 on_error, on_error, true, cb, opaque, 1803 &commit_active_job_driver, false, base, auto_complete, 1804 filter_node_name, false, MIRROR_COPY_MODE_BACKGROUND, 1805 &local_err); 1806 if (local_err) { 1807 error_propagate(errp, local_err); 1808 goto error_restore_flags; 1809 } 1810 1811 return ret; 1812 1813 error_restore_flags: 1814 /* ignore error and errp for bdrv_reopen, because we want to propagate 1815 * the original error */ 1816 if (base_read_only) { 1817 bdrv_reopen_set_read_only(base, true, NULL); 1818 } 1819 return NULL; 1820 } 1821