/*
 * block_copy API
 *
 * Copyright (C) 2013 Proxmox Server Solutions
 * Copyright (c) 2019 Virtuozzo International GmbH.
 *
 * Authors:
 *  Dietmar Maurer (dietmar@proxmox.com)
 *  Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */

#include "qemu/osdep.h"

#include "trace.h"
#include "qapi/error.h"
#include "block/block-copy.h"
#include "block/reqlist.h"
#include "sysemu/block-backend.h"
#include "qemu/units.h"
#include "qemu/coroutine.h"
#include "block/aio_task.h"
#include "qemu/error-report.h"
#include "qemu/memalign.h"

#define BLOCK_COPY_MAX_COPY_RANGE (16 * MiB)
#define BLOCK_COPY_MAX_BUFFER (1 * MiB)
#define BLOCK_COPY_MAX_MEM (128 * MiB)
#define BLOCK_COPY_MAX_WORKERS 64
#define BLOCK_COPY_SLICE_TIME 100000000ULL /* ns */
#define BLOCK_COPY_CLUSTER_SIZE_DEFAULT (1 << 16)

typedef enum {
    COPY_READ_WRITE_CLUSTER,
    COPY_READ_WRITE,
    COPY_WRITE_ZEROES,
    COPY_RANGE_SMALL,
    COPY_RANGE_FULL
} BlockCopyMethod;

static coroutine_fn int block_copy_task_entry(AioTask *task);

typedef struct BlockCopyCallState {
    /* Fields initialized in block_copy_async() and never changed. */
    BlockCopyState *s;
    int64_t offset;
    int64_t bytes;
    int max_workers;
    int64_t max_chunk;
    bool ignore_ratelimit;
    BlockCopyAsyncCallbackFunc cb;
    void *cb_opaque;
    /* Coroutine where async block-copy is running */
    Coroutine *co;

    /* Fields whose state changes throughout the execution */
    bool finished; /* atomic */
    QemuCoSleep sleep; /* TODO: protect API with a lock */
    bool cancelled; /* atomic */
    /* To reference all call states from BlockCopyState */
    QLIST_ENTRY(BlockCopyCallState) list;

    /*
     * Fields that report information about return values and errors.
     * Protected by lock in BlockCopyState.
     */
    bool error_is_read;
    /*
     * @ret is set concurrently by tasks under mutex. Only set once by the
     * first failed task (and untouched if no task failed).
     * After finishing (call_state->finished is true), it is not modified
     * anymore and may be safely read without mutex.
     */
    int ret;
} BlockCopyCallState;

typedef struct BlockCopyTask {
    AioTask task;

    /*
     * Fields initialized in block_copy_task_create()
     * and never changed.
     */
    BlockCopyState *s;
    BlockCopyCallState *call_state;
    /*
     * @method can also be set again in the while loop of
     * block_copy_dirty_clusters(), but it is never accessed concurrently
     * because the only other function that reads it is
     * block_copy_task_entry() and it is invoked afterwards in the same
     * iteration.
     */
    BlockCopyMethod method;

    /*
     * Generally, req is protected by lock in BlockCopyState. Still, req.offset
     * is only set on task creation, so it may be read concurrently after
     * creation. req.bytes is changed at most once, and only the case of a
     * parallel read while updating @bytes in block_copy_task_shrink() needs
     * protection.
     */
    BlockReq req;
} BlockCopyTask;

static int64_t task_end(BlockCopyTask *task)
{
    return task->req.offset + task->req.bytes;
}

typedef struct BlockCopyState {
    /*
     * BdrvChild objects are not owned or managed by block-copy. They are
     * provided by the block-copy user, and the user is responsible for
     * appropriate permissions on these children.
     */
    BdrvChild *source;
    BdrvChild *target;

    /*
     * Fields initialized in block_copy_state_new()
     * and never changed.
     */
    int64_t cluster_size;
    int64_t max_transfer;
    uint64_t len;
    BdrvRequestFlags write_flags;

    /*
     * Fields whose state changes throughout the execution
     * Protected by lock.
     */
    CoMutex lock;
    int64_t in_flight_bytes;
    BlockCopyMethod method;
    BlockReqList reqs;
    QLIST_HEAD(, BlockCopyCallState) calls;
    /*
     * skip_unallocated:
     *
     * Used by sync=top jobs, which first scan the source node for unallocated
     * areas and clear them in the copy_bitmap. During this process, the bitmap
     * is thus not fully initialized: It may still have bits set for areas that
     * are unallocated and should actually not be copied.
     *
     * This is indicated by skip_unallocated.
     *
     * In this case, block_copy() will query the source's allocation status,
     * skip unallocated regions, clear them in the copy_bitmap, and invoke
     * block_copy_reset_unallocated() every time it does.
     */
    bool skip_unallocated; /* atomic */
    /* State fields that use a thread-safe API */
    BdrvDirtyBitmap *copy_bitmap;
    ProgressMeter *progress;
    SharedResource *mem;
    RateLimit rate_limit;
} BlockCopyState;

/* Called with lock held */
static int64_t block_copy_chunk_size(BlockCopyState *s)
{
    switch (s->method) {
    case COPY_READ_WRITE_CLUSTER:
        return s->cluster_size;
    case COPY_READ_WRITE:
    case COPY_RANGE_SMALL:
        return MIN(MAX(s->cluster_size, BLOCK_COPY_MAX_BUFFER),
                   s->max_transfer);
    case COPY_RANGE_FULL:
        return MIN(MAX(s->cluster_size, BLOCK_COPY_MAX_COPY_RANGE),
                   s->max_transfer);
    default:
        /* Cannot have COPY_WRITE_ZEROES here. */
        abort();
    }
}
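
/*
 * For illustration only (assuming the default 64 KiB cluster size and a
 * hypothetical max_transfer of 2 MiB), the chunk sizes above work out to:
 *
 *   COPY_READ_WRITE / COPY_RANGE_SMALL: MIN(MAX(64 KiB, 1 MiB), 2 MiB)  = 1 MiB
 *   COPY_RANGE_FULL:                    MIN(MAX(64 KiB, 16 MiB), 2 MiB) = 2 MiB
 */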

/*
 * Search for the first dirty area in offset/bytes range and create task at
 * the beginning of it.
 */
static coroutine_fn BlockCopyTask *
block_copy_task_create(BlockCopyState *s, BlockCopyCallState *call_state,
                       int64_t offset, int64_t bytes)
{
    BlockCopyTask *task;
    int64_t max_chunk;

    QEMU_LOCK_GUARD(&s->lock);
    max_chunk = MIN_NON_ZERO(block_copy_chunk_size(s), call_state->max_chunk);
    if (!bdrv_dirty_bitmap_next_dirty_area(s->copy_bitmap,
                                           offset, offset + bytes,
                                           max_chunk, &offset, &bytes))
    {
        return NULL;
    }

    assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
    bytes = QEMU_ALIGN_UP(bytes, s->cluster_size);

    /* region is dirty, so no existent tasks possible in it */
    assert(!reqlist_find_conflict(&s->reqs, offset, bytes));

    bdrv_reset_dirty_bitmap(s->copy_bitmap, offset, bytes);
    s->in_flight_bytes += bytes;

    task = g_new(BlockCopyTask, 1);
    *task = (BlockCopyTask) {
        .task.func = block_copy_task_entry,
        .s = s,
        .call_state = call_state,
        .method = s->method,
    };
    reqlist_init_req(&s->reqs, &task->req, offset, bytes);

    return task;
}

/*
 * block_copy_task_shrink
 *
 * Drop the tail of the task to be handled later. Set dirty bits back and
 * wake up all tasks waiting for us (some of them may not intersect with the
 * shrunk task).
 */
static void coroutine_fn block_copy_task_shrink(BlockCopyTask *task,
                                                int64_t new_bytes)
{
    QEMU_LOCK_GUARD(&task->s->lock);
    if (new_bytes == task->req.bytes) {
        return;
    }

    assert(new_bytes > 0 && new_bytes < task->req.bytes);

    task->s->in_flight_bytes -= task->req.bytes - new_bytes;
    bdrv_set_dirty_bitmap(task->s->copy_bitmap,
                          task->req.offset + new_bytes,
                          task->req.bytes - new_bytes);

    reqlist_shrink_req(&task->req, new_bytes);
}

static void coroutine_fn block_copy_task_end(BlockCopyTask *task, int ret)
{
    QEMU_LOCK_GUARD(&task->s->lock);
    task->s->in_flight_bytes -= task->req.bytes;
    if (ret < 0) {
        bdrv_set_dirty_bitmap(task->s->copy_bitmap, task->req.offset,
                              task->req.bytes);
    }
    if (task->s->progress) {
        progress_set_remaining(task->s->progress,
                               bdrv_get_dirty_count(task->s->copy_bitmap) +
                               task->s->in_flight_bytes);
    }
    reqlist_remove_req(&task->req);
}

void block_copy_state_free(BlockCopyState *s)
{
    if (!s) {
        return;
    }

    ratelimit_destroy(&s->rate_limit);
    bdrv_release_dirty_bitmap(s->copy_bitmap);
    shres_destroy(s->mem);
    g_free(s);
}

static uint32_t block_copy_max_transfer(BdrvChild *source, BdrvChild *target)
{
    return MIN_NON_ZERO(INT_MAX,
                        MIN_NON_ZERO(source->bs->bl.max_transfer,
                                     target->bs->bl.max_transfer));
}

void block_copy_set_copy_opts(BlockCopyState *s, bool use_copy_range,
                              bool compress)
{
    /* Keep BDRV_REQ_SERIALISING set (or not set) in block_copy_state_new() */
    s->write_flags = (s->write_flags & BDRV_REQ_SERIALISING) |
        (compress ? BDRV_REQ_WRITE_COMPRESSED : 0);

    if (s->max_transfer < s->cluster_size) {
        /*
         * copy_range does not respect max_transfer. We don't want to bother
         * with requests smaller than the block-copy cluster size, so fall
         * back to buffered copying (read and write respect max_transfer on
         * their own).
         */
        s->method = COPY_READ_WRITE_CLUSTER;
    } else if (compress) {
        /* Compression supports only cluster-size writes and no copy-range. */
        s->method = COPY_READ_WRITE_CLUSTER;
    } else {
        /*
         * If copy range is enabled, start with COPY_RANGE_SMALL, until the
         * first successful copy_range (look at block_copy_do_copy).
         */
        s->method = use_copy_range ? COPY_RANGE_SMALL : COPY_READ_WRITE;
    }
}

static int64_t block_copy_calculate_cluster_size(BlockDriverState *target,
                                                 Error **errp)
{
    int ret;
    BlockDriverInfo bdi;
    bool target_does_cow = bdrv_backing_chain_next(target);

    /*
     * If there is no backing file on the target, we cannot rely on COW if our
     * backup cluster size is smaller than the target cluster size. Even for
     * targets with a backing file, try to avoid COW if possible.
     */
    ret = bdrv_get_info(target, &bdi);
    if (ret == -ENOTSUP && !target_does_cow) {
        /* Cluster size is not defined */
        warn_report("The target block device doesn't provide "
                    "information about the block size and it doesn't have a "
                    "backing file. The default block size of %u bytes is "
                    "used. If the actual block size of the target exceeds "
                    "this default, the backup may be unusable",
                    BLOCK_COPY_CLUSTER_SIZE_DEFAULT);
        return BLOCK_COPY_CLUSTER_SIZE_DEFAULT;
    } else if (ret < 0 && !target_does_cow) {
        error_setg_errno(errp, -ret,
            "Couldn't determine the cluster size of the target image, "
            "which has no backing file");
        error_append_hint(errp,
            "Aborting, since this may create an unusable destination image\n");
        return ret;
    } else if (ret < 0 && target_does_cow) {
        /* Not fatal; just trudge on ahead. */
        return BLOCK_COPY_CLUSTER_SIZE_DEFAULT;
    }

    return MAX(BLOCK_COPY_CLUSTER_SIZE_DEFAULT, bdi.cluster_size);
}

BlockCopyState *block_copy_state_new(BdrvChild *source, BdrvChild *target,
                                     const BdrvDirtyBitmap *bitmap,
                                     Error **errp)
{
    ERRP_GUARD();
    BlockCopyState *s;
    int64_t cluster_size;
    BdrvDirtyBitmap *copy_bitmap;
    bool is_fleecing;

    cluster_size = block_copy_calculate_cluster_size(target->bs, errp);
    if (cluster_size < 0) {
        return NULL;
    }

    copy_bitmap = bdrv_create_dirty_bitmap(source->bs, cluster_size, NULL,
                                           errp);
    if (!copy_bitmap) {
        return NULL;
    }
    bdrv_disable_dirty_bitmap(copy_bitmap);
    if (bitmap) {
        if (!bdrv_merge_dirty_bitmap(copy_bitmap, bitmap, NULL, errp)) {
            error_prepend(errp, "Failed to merge bitmap '%s' to internal "
                          "copy-bitmap: ", bdrv_dirty_bitmap_name(bitmap));
            bdrv_release_dirty_bitmap(copy_bitmap);
            return NULL;
        }
    } else {
        bdrv_set_dirty_bitmap(copy_bitmap, 0,
                              bdrv_dirty_bitmap_size(copy_bitmap));
    }

    /*
     * If the source is in the backing chain of the target, assume that the
     * target is going to be used for "image fleecing", i.e. it should
     * represent a kind of snapshot of the source at backup-start point in
     * time, and the target is going to be read by somebody (for example,
     * used as an NBD export) during the backup job.
     *
     * In this case, we need to add the BDRV_REQ_SERIALISING write flag to
     * avoid intersection of backup writes and third-party reads from the
     * target; otherwise, when reading from the target, we may occasionally
     * read data that has already been updated by the guest.
     *
     * For more information see commit f8d59dfb40bb and test
     * tests/qemu-iotests/222
     */
    is_fleecing = bdrv_chain_contains(target->bs, source->bs);

    s = g_new(BlockCopyState, 1);
    *s = (BlockCopyState) {
        .source = source,
        .target = target,
        .copy_bitmap = copy_bitmap,
        .cluster_size = cluster_size,
        .len = bdrv_dirty_bitmap_size(copy_bitmap),
        .write_flags = (is_fleecing ? BDRV_REQ_SERIALISING : 0),
        .mem = shres_create(BLOCK_COPY_MAX_MEM),
        .max_transfer = QEMU_ALIGN_DOWN(
                                    block_copy_max_transfer(source, target),
                                    cluster_size),
    };

    block_copy_set_copy_opts(s, false, false);

    ratelimit_init(&s->rate_limit);
    qemu_co_mutex_init(&s->lock);
    QLIST_INIT(&s->reqs);
    QLIST_INIT(&s->calls);

    return s;
}

/* Only set before running the job, no need for locking. */
void block_copy_set_progress_meter(BlockCopyState *s, ProgressMeter *pm)
{
    s->progress = pm;
}
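
/*
 * Illustrative sketch only (not part of this file's code): one way a user
 * might set up and run a synchronous copy. @source, @target, @job, @speed,
 * @length and @timeout_ns are hypothetical caller-owned values; block_copy()
 * is a coroutine_fn and expects a cluster-aligned @offset/@bytes range (see
 * the asserts in block_copy_dirty_clusters()).
 *
 *     BlockCopyState *s = block_copy_state_new(source, target, NULL, errp);
 *     if (!s) {
 *         return -EINVAL;
 *     }
 *     block_copy_set_progress_meter(s, &job->progress);
 *     block_copy_set_speed(s, speed);
 *
 *     ret = block_copy(s, 0, length, true, timeout_ns, NULL, NULL);
 *
 *     block_copy_state_free(s);
 */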

/*
 * Takes ownership of @task
 *
 * If pool is NULL directly run the task, otherwise schedule it into the pool.
 *
 * Returns: task.func return code if pool is NULL
 *          otherwise -ECANCELED if pool status is bad
 *          otherwise 0 (successfully scheduled)
 */
static coroutine_fn int block_copy_task_run(AioTaskPool *pool,
                                            BlockCopyTask *task)
{
    if (!pool) {
        int ret = task->task.func(&task->task);

        g_free(task);
        return ret;
    }

    aio_task_pool_wait_slot(pool);
    if (aio_task_pool_status(pool) < 0) {
        co_put_to_shres(task->s->mem, task->req.bytes);
        block_copy_task_end(task, -ECANCELED);
        g_free(task);
        return -ECANCELED;
    }

    aio_task_pool_start_task(pool, &task->task);

    return 0;
}

/*
 * block_copy_do_copy
 *
 * Do copy of cluster-aligned chunk. Requested region is allowed to exceed
 * s->len only to cover last cluster when s->len is not aligned to clusters.
 *
 * No synchronization here: neither the bitmap nor intersecting requests are
 * handled, only the copy itself.
 *
 * @method is an in-out argument, so that copy_range can be either extended to
 * a full-size buffer or disabled if the copy_range attempt fails. The output
 * value of @method should be used for subsequent tasks.
 * Returns 0 on success.
 */
static int coroutine_fn block_copy_do_copy(BlockCopyState *s,
                                           int64_t offset, int64_t bytes,
                                           BlockCopyMethod *method,
                                           bool *error_is_read)
{
    int ret;
    int64_t nbytes = MIN(offset + bytes, s->len) - offset;
    void *bounce_buffer = NULL;

    assert(offset >= 0 && bytes > 0 && INT64_MAX - offset >= bytes);
    assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
    assert(QEMU_IS_ALIGNED(bytes, s->cluster_size));
    assert(offset < s->len);
    assert(offset + bytes <= s->len ||
           offset + bytes == QEMU_ALIGN_UP(s->len, s->cluster_size));
    assert(nbytes < INT_MAX);

    switch (*method) {
    case COPY_WRITE_ZEROES:
        ret = bdrv_co_pwrite_zeroes(s->target, offset, nbytes, s->write_flags &
                                    ~BDRV_REQ_WRITE_COMPRESSED);
        if (ret < 0) {
            trace_block_copy_write_zeroes_fail(s, offset, ret);
            *error_is_read = false;
        }
        return ret;

    case COPY_RANGE_SMALL:
    case COPY_RANGE_FULL:
        ret = bdrv_co_copy_range(s->source, offset, s->target, offset, nbytes,
                                 0, s->write_flags);
        if (ret >= 0) {
            /* Successful copy-range, increase chunk size. */
            *method = COPY_RANGE_FULL;
            return 0;
        }

        trace_block_copy_copy_range_fail(s, offset, ret);
        *method = COPY_READ_WRITE;
        /* Fall through to read+write with allocated buffer */

    case COPY_READ_WRITE_CLUSTER:
    case COPY_READ_WRITE:
        /*
         * In case of a failed copy_range request above, we may proceed with a
         * buffered request larger than BLOCK_COPY_MAX_BUFFER.
         * Still, further requests will be properly limited, so don't care too
         * much. Moreover, the most likely case (copy_range is unsupported for
         * the configuration, so the very first copy_range request fails)
         * is handled by setting the large copy_size only after the first
         * successful copy_range.
         */

        bounce_buffer = qemu_blockalign(s->source->bs, nbytes);

        ret = bdrv_co_pread(s->source, offset, nbytes, bounce_buffer, 0);
        if (ret < 0) {
            trace_block_copy_read_fail(s, offset, ret);
            *error_is_read = true;
            goto out;
        }

        ret = bdrv_co_pwrite(s->target, offset, nbytes, bounce_buffer,
                             s->write_flags);
        if (ret < 0) {
            trace_block_copy_write_fail(s, offset, ret);
            *error_is_read = false;
            goto out;
        }

out:
        qemu_vfree(bounce_buffer);
        break;

    default:
        abort();
    }

    return ret;
}

static coroutine_fn int block_copy_task_entry(AioTask *task)
{
    BlockCopyTask *t = container_of(task, BlockCopyTask, task);
    BlockCopyState *s = t->s;
    bool error_is_read = false;
    BlockCopyMethod method = t->method;
    int ret;

    ret = block_copy_do_copy(s, t->req.offset, t->req.bytes, &method,
                             &error_is_read);

    WITH_QEMU_LOCK_GUARD(&s->lock) {
        if (s->method == t->method) {
            s->method = method;
        }

        if (ret < 0) {
            if (!t->call_state->ret) {
                t->call_state->ret = ret;
                t->call_state->error_is_read = error_is_read;
            }
        } else if (s->progress) {
            progress_work_done(s->progress, t->req.bytes);
        }
    }
    co_put_to_shres(s->mem, t->req.bytes);
    block_copy_task_end(t, ret);

    return ret;
}

static coroutine_fn int block_copy_block_status(BlockCopyState *s,
                                                int64_t offset,
                                                int64_t bytes, int64_t *pnum)
{
    int64_t num;
    BlockDriverState *base;
    int ret;

    if (qatomic_read(&s->skip_unallocated)) {
        base = bdrv_backing_chain_next(s->source->bs);
    } else {
        base = NULL;
    }

    ret = bdrv_co_block_status_above(s->source->bs, base, offset, bytes, &num,
                                     NULL, NULL);
    if (ret < 0 || num < s->cluster_size) {
        /*
         * On error, or if we failed to obtain a large enough chunk, just fall
         * back to copying one cluster.
         */
        num = s->cluster_size;
        ret = BDRV_BLOCK_ALLOCATED | BDRV_BLOCK_DATA;
    } else if (offset + num == s->len) {
        num = QEMU_ALIGN_UP(num, s->cluster_size);
    } else {
        num = QEMU_ALIGN_DOWN(num, s->cluster_size);
    }

    *pnum = num;
    return ret;
}
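
/*
 * For illustration only (hypothetical numbers): with a 64 KiB cluster size
 * and s->len = 1 MiB + 4 KiB, a status result of 516 KiB at offset 512 KiB
 * reaches the end of the image and is rounded up to 576 KiB, so the final
 * partial cluster is still covered; the same 516 KiB anywhere else in the
 * image would be rounded down to 512 KiB.
 */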

/*
 * Check if the cluster starting at offset is allocated or not.
 * Return via pnum the number of contiguous clusters sharing this allocation.
 */
static int coroutine_fn block_copy_is_cluster_allocated(BlockCopyState *s,
                                                        int64_t offset,
                                                        int64_t *pnum)
{
    BlockDriverState *bs = s->source->bs;
    int64_t count, total_count = 0;
    int64_t bytes = s->len - offset;
    int ret;

    assert(QEMU_IS_ALIGNED(offset, s->cluster_size));

    while (true) {
        ret = bdrv_co_is_allocated(bs, offset, bytes, &count);
        if (ret < 0) {
            return ret;
        }

        total_count += count;

        if (ret || count == 0) {
            /*
             * ret: partial segment(s) are considered allocated.
             * otherwise: unallocated tail is treated as an entire segment.
             */
            *pnum = DIV_ROUND_UP(total_count, s->cluster_size);
            return ret;
        }

        /* Unallocated segment(s) with uncertain following segment(s) */
        if (total_count >= s->cluster_size) {
            *pnum = total_count / s->cluster_size;
            return 0;
        }

        offset += count;
        bytes -= count;
    }
}

void block_copy_reset(BlockCopyState *s, int64_t offset, int64_t bytes)
{
    QEMU_LOCK_GUARD(&s->lock);

    bdrv_reset_dirty_bitmap(s->copy_bitmap, offset, bytes);
    if (s->progress) {
        progress_set_remaining(s->progress,
                               bdrv_get_dirty_count(s->copy_bitmap) +
                               s->in_flight_bytes);
    }
}

/*
 * Reset bits in copy_bitmap starting at offset if they represent unallocated
 * data in the image. May reset subsequent contiguous bits.
 * @return 0 when the cluster at @offset was unallocated,
 *         1 otherwise, and -ret on error.
 */
int64_t coroutine_fn block_copy_reset_unallocated(BlockCopyState *s,
                                                  int64_t offset,
                                                  int64_t *count)
{
    int ret;
    int64_t clusters, bytes;

    ret = block_copy_is_cluster_allocated(s, offset, &clusters);
    if (ret < 0) {
        return ret;
    }

    bytes = clusters * s->cluster_size;

    if (!ret) {
        block_copy_reset(s, offset, bytes);
    }

    *count = bytes;
    return ret;
}

/*
 * block_copy_dirty_clusters
 *
 * Copy dirty clusters in @offset/@bytes range.
 * Returns 1 if dirty clusters were found and successfully copied, 0 if no
 * dirty clusters were found, and -errno on failure.
 */
static int coroutine_fn
block_copy_dirty_clusters(BlockCopyCallState *call_state)
{
    BlockCopyState *s = call_state->s;
    int64_t offset = call_state->offset;
    int64_t bytes = call_state->bytes;

    int ret = 0;
    bool found_dirty = false;
    int64_t end = offset + bytes;
    AioTaskPool *aio = NULL;

    /*
     * The block_copy() user is responsible for keeping source and target in
     * the same aio context.
     */
    assert(bdrv_get_aio_context(s->source->bs) ==
           bdrv_get_aio_context(s->target->bs));

    assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
    assert(QEMU_IS_ALIGNED(bytes, s->cluster_size));

    while (bytes && aio_task_pool_status(aio) == 0 &&
           !qatomic_read(&call_state->cancelled)) {
        BlockCopyTask *task;
        int64_t status_bytes;

        task = block_copy_task_create(s, call_state, offset, bytes);
        if (!task) {
            /* No more dirty bits in the bitmap */
            trace_block_copy_skip_range(s, offset, bytes);
            break;
        }
        if (task->req.offset > offset) {
            trace_block_copy_skip_range(s, offset, task->req.offset - offset);
        }

        found_dirty = true;

        ret = block_copy_block_status(s, task->req.offset, task->req.bytes,
                                      &status_bytes);
        assert(ret >= 0); /* never fail */
        if (status_bytes < task->req.bytes) {
            block_copy_task_shrink(task, status_bytes);
        }
        if (qatomic_read(&s->skip_unallocated) &&
            !(ret & BDRV_BLOCK_ALLOCATED)) {
            block_copy_task_end(task, 0);
            trace_block_copy_skip_range(s, task->req.offset, task->req.bytes);
            offset = task_end(task);
            bytes = end - offset;
            g_free(task);
            continue;
        }
        if (ret & BDRV_BLOCK_ZERO) {
            task->method = COPY_WRITE_ZEROES;
        }

        if (!call_state->ignore_ratelimit) {
            uint64_t ns = ratelimit_calculate_delay(&s->rate_limit, 0);
            if (ns > 0) {
                block_copy_task_end(task, -EAGAIN);
                g_free(task);
                qemu_co_sleep_ns_wakeable(&call_state->sleep,
                                          QEMU_CLOCK_REALTIME, ns);
                continue;
            }
        }

        ratelimit_calculate_delay(&s->rate_limit, task->req.bytes);

        trace_block_copy_process(s, task->req.offset);

        co_get_from_shres(s->mem, task->req.bytes);

        offset = task_end(task);
        bytes = end - offset;

        if (!aio && bytes) {
            aio = aio_task_pool_new(call_state->max_workers);
        }

        ret = block_copy_task_run(aio, task);
        if (ret < 0) {
            goto out;
        }
    }

out:
    if (aio) {
        aio_task_pool_wait_all(aio);

        /*
         * We are not really interested in -ECANCELED returned from
         * block_copy_task_run. If it fails, it means some task already failed
         * for a real reason, so let's return the first failure.
         * Still, assert that we don't rewrite failure by success.
         *
         * Note: ret may be positive here because of block-status result.
         */
        assert(ret >= 0 || aio_task_pool_status(aio) < 0);
        ret = aio_task_pool_status(aio);

        aio_task_pool_free(aio);
    }

    return ret < 0 ? ret : found_dirty;
}

void block_copy_kick(BlockCopyCallState *call_state)
{
    qemu_co_sleep_wake(&call_state->sleep);
}

/*
 * block_copy_common
 *
 * Copy the requested region according to the dirty bitmap.
 * Collaborate with parallel block_copy requests: if they succeed, it will
 * help us. If they fail, we will retry not-copied regions. So, if we return
 * an error, it means that some I/O operation failed in the context of _this_
 * block_copy call, not of some parallel operation.
 */
static int coroutine_fn block_copy_common(BlockCopyCallState *call_state)
{
    int ret;
    BlockCopyState *s = call_state->s;

    qemu_co_mutex_lock(&s->lock);
    QLIST_INSERT_HEAD(&s->calls, call_state, list);
    qemu_co_mutex_unlock(&s->lock);

    do {
        ret = block_copy_dirty_clusters(call_state);

        if (ret == 0 && !qatomic_read(&call_state->cancelled)) {
            WITH_QEMU_LOCK_GUARD(&s->lock) {
                /*
                 * Check that there is no task we still need to
                 * wait to complete
                 */
                ret = reqlist_wait_one(&s->reqs, call_state->offset,
                                       call_state->bytes, &s->lock);
                if (ret == 0) {
                    /*
                     * No pending tasks, but check again the bitmap in this
                     * same critical section, since a task might have failed
                     * between this and the critical section in
                     * block_copy_dirty_clusters().
                     *
                     * A reqlist_wait_one return value of 0 also means that it
                     * didn't release the lock. So, we are still in the same
                     * critical section, not interrupted by any concurrent
                     * access to state.
                     */
                    ret = bdrv_dirty_bitmap_next_dirty(s->copy_bitmap,
                                                       call_state->offset,
                                                       call_state->bytes) >= 0;
                }
            }
        }

        /*
         * We retry in two cases:
         * 1. Some progress was done
         *    Something was copied, which means that there were yield points
         *    and some new dirty bits may have appeared (due to failed parallel
         *    block-copy requests).
         * 2. We have waited for some intersecting block-copy request
         *    It may have failed and produced new dirty bits.
         */
    } while (ret > 0 && !qatomic_read(&call_state->cancelled));

    qatomic_store_release(&call_state->finished, true);

    if (call_state->cb) {
        call_state->cb(call_state->cb_opaque);
    }

    qemu_co_mutex_lock(&s->lock);
    QLIST_REMOVE(call_state, list);
    qemu_co_mutex_unlock(&s->lock);

    return ret;
}

static void coroutine_fn block_copy_async_co_entry(void *opaque)
{
    block_copy_common(opaque);
}

int coroutine_fn block_copy(BlockCopyState *s, int64_t start, int64_t bytes,
                            bool ignore_ratelimit, uint64_t timeout_ns,
                            BlockCopyAsyncCallbackFunc cb,
                            void *cb_opaque)
{
    int ret;
    BlockCopyCallState *call_state = g_new(BlockCopyCallState, 1);

    *call_state = (BlockCopyCallState) {
        .s = s,
        .offset = start,
        .bytes = bytes,
        .ignore_ratelimit = ignore_ratelimit,
        .max_workers = BLOCK_COPY_MAX_WORKERS,
        .cb = cb,
        .cb_opaque = cb_opaque,
    };

    ret = qemu_co_timeout(block_copy_async_co_entry, call_state, timeout_ns,
                          g_free);
    if (ret < 0) {
        assert(ret == -ETIMEDOUT);
        block_copy_call_cancel(call_state);
        /* call_state will be freed by running coroutine. */
        return ret;
    }

    ret = call_state->ret;
    g_free(call_state);

    return ret;
}

BlockCopyCallState *block_copy_async(BlockCopyState *s,
                                     int64_t offset, int64_t bytes,
                                     int max_workers, int64_t max_chunk,
                                     BlockCopyAsyncCallbackFunc cb,
                                     void *cb_opaque)
{
    BlockCopyCallState *call_state = g_new(BlockCopyCallState, 1);

    *call_state = (BlockCopyCallState) {
        .s = s,
        .offset = offset,
        .bytes = bytes,
        .max_workers = max_workers,
        .max_chunk = max_chunk,
        .cb = cb,
        .cb_opaque = cb_opaque,

        .co = qemu_coroutine_create(block_copy_async_co_entry, call_state),
    };

    qemu_coroutine_enter(call_state->co);

    return call_state;
}

void block_copy_call_free(BlockCopyCallState *call_state)
{
    if (!call_state) {
        return;
    }

    assert(qatomic_read(&call_state->finished));
    g_free(call_state);
}

bool block_copy_call_finished(BlockCopyCallState *call_state)
{
    return qatomic_read(&call_state->finished);
}

bool block_copy_call_succeeded(BlockCopyCallState *call_state)
{
    return qatomic_load_acquire(&call_state->finished) &&
           !qatomic_read(&call_state->cancelled) &&
           call_state->ret == 0;
}

bool block_copy_call_failed(BlockCopyCallState *call_state)
{
    return qatomic_load_acquire(&call_state->finished) &&
           !qatomic_read(&call_state->cancelled) &&
           call_state->ret < 0;
}

bool block_copy_call_cancelled(BlockCopyCallState *call_state)
{
    return qatomic_read(&call_state->cancelled);
}

int block_copy_call_status(BlockCopyCallState *call_state, bool *error_is_read)
{
    assert(qatomic_load_acquire(&call_state->finished));
    if (error_is_read) {
        *error_is_read = call_state->error_is_read;
    }
    return call_state->ret;
}

/*
 * Note that cancelling and finishing are racy.
 * User can cancel a block-copy that is already finished.
 */
void block_copy_call_cancel(BlockCopyCallState *call_state)
{
    qatomic_set(&call_state->cancelled, true);
    block_copy_kick(call_state);
}

BdrvDirtyBitmap *block_copy_dirty_bitmap(BlockCopyState *s)
{
    return s->copy_bitmap;
}

int64_t block_copy_cluster_size(BlockCopyState *s)
{
    return s->cluster_size;
}

void block_copy_set_skip_unallocated(BlockCopyState *s, bool skip)
{
    qatomic_set(&s->skip_unallocated, skip);
}

void block_copy_set_speed(BlockCopyState *s, uint64_t speed)
{
    ratelimit_set_speed(&s->rate_limit, speed, BLOCK_COPY_SLICE_TIME);

    /*
     * Note: it's good to kick all call states from here, but it should be
     * done only from a coroutine, to not crash if the s->calls list changed
     * while entering one call. So for now, the only user of this function
     * kicks its only one call_state by hand.
     */
}
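
/*
 * Illustrative sketch only (not part of this file's code): one way a caller
 * might drive the asynchronous API. @s, @offset, @bytes, @done_cb and
 * @opaque are hypothetical caller-owned values; passing 0 for @max_chunk
 * leaves it unlimited, since MIN_NON_ZERO() in block_copy_task_create()
 * ignores zero. @done_cb runs once the call state is marked finished.
 *
 *     BlockCopyCallState *cs;
 *     bool error_is_read;
 *     int ret;
 *
 *     cs = block_copy_async(s, offset, bytes, BLOCK_COPY_MAX_WORKERS, 0,
 *                           done_cb, opaque);
 *
 *     (later, e.g. from @done_cb or after block_copy_call_finished(cs))
 *
 *     if (block_copy_call_cancelled(cs)) {
 *         (handle cancellation)
 *     } else {
 *         ret = block_copy_call_status(cs, &error_is_read);
 *     }
 *     block_copy_call_free(cs);
 */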