1 /* 2 * block_copy API 3 * 4 * Copyright (C) 2013 Proxmox Server Solutions 5 * Copyright (c) 2019 Virtuozzo International GmbH. 6 * 7 * Authors: 8 * Dietmar Maurer (dietmar@proxmox.com) 9 * Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com> 10 * 11 * This work is licensed under the terms of the GNU GPL, version 2 or later. 12 * See the COPYING file in the top-level directory. 13 */ 14 15 #include "qemu/osdep.h" 16 17 #include "trace.h" 18 #include "qapi/error.h" 19 #include "block/block-copy.h" 20 #include "block/reqlist.h" 21 #include "sysemu/block-backend.h" 22 #include "qemu/units.h" 23 #include "qemu/coroutine.h" 24 #include "block/aio_task.h" 25 #include "qemu/error-report.h" 26 #include "qemu/memalign.h" 27 28 #define BLOCK_COPY_MAX_COPY_RANGE (16 * MiB) 29 #define BLOCK_COPY_MAX_BUFFER (1 * MiB) 30 #define BLOCK_COPY_MAX_MEM (128 * MiB) 31 #define BLOCK_COPY_MAX_WORKERS 64 32 #define BLOCK_COPY_SLICE_TIME 100000000ULL /* ns */ 33 #define BLOCK_COPY_CLUSTER_SIZE_DEFAULT (1 << 16) 34 35 typedef enum { 36 COPY_READ_WRITE_CLUSTER, 37 COPY_READ_WRITE, 38 COPY_WRITE_ZEROES, 39 COPY_RANGE_SMALL, 40 COPY_RANGE_FULL 41 } BlockCopyMethod; 42 43 static coroutine_fn int block_copy_task_entry(AioTask *task); 44 45 typedef struct BlockCopyCallState { 46 /* Fields initialized in block_copy_async() and never changed. */ 47 BlockCopyState *s; 48 int64_t offset; 49 int64_t bytes; 50 int max_workers; 51 int64_t max_chunk; 52 bool ignore_ratelimit; 53 BlockCopyAsyncCallbackFunc cb; 54 void *cb_opaque; 55 /* Coroutine where async block-copy is running */ 56 Coroutine *co; 57 58 /* Fields whose state changes throughout the execution */ 59 bool finished; /* atomic */ 60 QemuCoSleep sleep; /* TODO: protect API with a lock */ 61 bool cancelled; /* atomic */ 62 /* To reference all call states from BlockCopyState */ 63 QLIST_ENTRY(BlockCopyCallState) list; 64 65 /* 66 * Fields that report information about return values and erros. 67 * Protected by lock in BlockCopyState. 68 */ 69 bool error_is_read; 70 /* 71 * @ret is set concurrently by tasks under mutex. Only set once by first 72 * failed task (and untouched if no task failed). 73 * After finishing (call_state->finished is true), it is not modified 74 * anymore and may be safely read without mutex. 75 */ 76 int ret; 77 } BlockCopyCallState; 78 79 typedef struct BlockCopyTask { 80 AioTask task; 81 82 /* 83 * Fields initialized in block_copy_task_create() 84 * and never changed. 85 */ 86 BlockCopyState *s; 87 BlockCopyCallState *call_state; 88 /* 89 * @method can also be set again in the while loop of 90 * block_copy_dirty_clusters(), but it is never accessed concurrently 91 * because the only other function that reads it is 92 * block_copy_task_entry() and it is invoked afterwards in the same 93 * iteration. 94 */ 95 BlockCopyMethod method; 96 97 /* 98 * Generally, req is protected by lock in BlockCopyState, Still req.offset 99 * is only set on task creation, so may be read concurrently after creation. 100 * req.bytes is changed at most once, and need only protecting the case of 101 * parallel read while updating @bytes value in block_copy_task_shrink(). 102 */ 103 BlockReq req; 104 } BlockCopyTask; 105 106 static int64_t task_end(BlockCopyTask *task) 107 { 108 return task->req.offset + task->req.bytes; 109 } 110 111 typedef struct BlockCopyState { 112 /* 113 * BdrvChild objects are not owned or managed by block-copy. They are 114 * provided by block-copy user and user is responsible for appropriate 115 * permissions on these children. 116 */ 117 BdrvChild *source; 118 BdrvChild *target; 119 120 /* 121 * Fields initialized in block_copy_state_new() 122 * and never changed. 123 */ 124 int64_t cluster_size; 125 int64_t max_transfer; 126 uint64_t len; 127 BdrvRequestFlags write_flags; 128 129 /* 130 * Fields whose state changes throughout the execution 131 * Protected by lock. 132 */ 133 CoMutex lock; 134 int64_t in_flight_bytes; 135 BlockCopyMethod method; 136 BlockReqList reqs; 137 QLIST_HEAD(, BlockCopyCallState) calls; 138 /* 139 * skip_unallocated: 140 * 141 * Used by sync=top jobs, which first scan the source node for unallocated 142 * areas and clear them in the copy_bitmap. During this process, the bitmap 143 * is thus not fully initialized: It may still have bits set for areas that 144 * are unallocated and should actually not be copied. 145 * 146 * This is indicated by skip_unallocated. 147 * 148 * In this case, block_copy() will query the source’s allocation status, 149 * skip unallocated regions, clear them in the copy_bitmap, and invoke 150 * block_copy_reset_unallocated() every time it does. 151 */ 152 bool skip_unallocated; /* atomic */ 153 /* State fields that use a thread-safe API */ 154 BdrvDirtyBitmap *copy_bitmap; 155 ProgressMeter *progress; 156 SharedResource *mem; 157 RateLimit rate_limit; 158 } BlockCopyState; 159 160 /* Called with lock held */ 161 static int64_t block_copy_chunk_size(BlockCopyState *s) 162 { 163 switch (s->method) { 164 case COPY_READ_WRITE_CLUSTER: 165 return s->cluster_size; 166 case COPY_READ_WRITE: 167 case COPY_RANGE_SMALL: 168 return MIN(MAX(s->cluster_size, BLOCK_COPY_MAX_BUFFER), 169 s->max_transfer); 170 case COPY_RANGE_FULL: 171 return MIN(MAX(s->cluster_size, BLOCK_COPY_MAX_COPY_RANGE), 172 s->max_transfer); 173 default: 174 /* Cannot have COPY_WRITE_ZEROES here. */ 175 abort(); 176 } 177 } 178 179 /* 180 * Search for the first dirty area in offset/bytes range and create task at 181 * the beginning of it. 182 */ 183 static coroutine_fn BlockCopyTask * 184 block_copy_task_create(BlockCopyState *s, BlockCopyCallState *call_state, 185 int64_t offset, int64_t bytes) 186 { 187 BlockCopyTask *task; 188 int64_t max_chunk; 189 190 QEMU_LOCK_GUARD(&s->lock); 191 max_chunk = MIN_NON_ZERO(block_copy_chunk_size(s), call_state->max_chunk); 192 if (!bdrv_dirty_bitmap_next_dirty_area(s->copy_bitmap, 193 offset, offset + bytes, 194 max_chunk, &offset, &bytes)) 195 { 196 return NULL; 197 } 198 199 assert(QEMU_IS_ALIGNED(offset, s->cluster_size)); 200 bytes = QEMU_ALIGN_UP(bytes, s->cluster_size); 201 202 /* region is dirty, so no existent tasks possible in it */ 203 assert(!reqlist_find_conflict(&s->reqs, offset, bytes)); 204 205 bdrv_reset_dirty_bitmap(s->copy_bitmap, offset, bytes); 206 s->in_flight_bytes += bytes; 207 208 task = g_new(BlockCopyTask, 1); 209 *task = (BlockCopyTask) { 210 .task.func = block_copy_task_entry, 211 .s = s, 212 .call_state = call_state, 213 .method = s->method, 214 }; 215 reqlist_init_req(&s->reqs, &task->req, offset, bytes); 216 217 return task; 218 } 219 220 /* 221 * block_copy_task_shrink 222 * 223 * Drop the tail of the task to be handled later. Set dirty bits back and 224 * wake up all tasks waiting for us (may be some of them are not intersecting 225 * with shrunk task) 226 */ 227 static void coroutine_fn block_copy_task_shrink(BlockCopyTask *task, 228 int64_t new_bytes) 229 { 230 QEMU_LOCK_GUARD(&task->s->lock); 231 if (new_bytes == task->req.bytes) { 232 return; 233 } 234 235 assert(new_bytes > 0 && new_bytes < task->req.bytes); 236 237 task->s->in_flight_bytes -= task->req.bytes - new_bytes; 238 bdrv_set_dirty_bitmap(task->s->copy_bitmap, 239 task->req.offset + new_bytes, 240 task->req.bytes - new_bytes); 241 242 reqlist_shrink_req(&task->req, new_bytes); 243 } 244 245 static void coroutine_fn block_copy_task_end(BlockCopyTask *task, int ret) 246 { 247 QEMU_LOCK_GUARD(&task->s->lock); 248 task->s->in_flight_bytes -= task->req.bytes; 249 if (ret < 0) { 250 bdrv_set_dirty_bitmap(task->s->copy_bitmap, task->req.offset, 251 task->req.bytes); 252 } 253 if (task->s->progress) { 254 progress_set_remaining(task->s->progress, 255 bdrv_get_dirty_count(task->s->copy_bitmap) + 256 task->s->in_flight_bytes); 257 } 258 reqlist_remove_req(&task->req); 259 } 260 261 void block_copy_state_free(BlockCopyState *s) 262 { 263 if (!s) { 264 return; 265 } 266 267 ratelimit_destroy(&s->rate_limit); 268 bdrv_release_dirty_bitmap(s->copy_bitmap); 269 shres_destroy(s->mem); 270 g_free(s); 271 } 272 273 static uint32_t block_copy_max_transfer(BdrvChild *source, BdrvChild *target) 274 { 275 return MIN_NON_ZERO(INT_MAX, 276 MIN_NON_ZERO(source->bs->bl.max_transfer, 277 target->bs->bl.max_transfer)); 278 } 279 280 void block_copy_set_copy_opts(BlockCopyState *s, bool use_copy_range, 281 bool compress) 282 { 283 /* Keep BDRV_REQ_SERIALISING set (or not set) in block_copy_state_new() */ 284 s->write_flags = (s->write_flags & BDRV_REQ_SERIALISING) | 285 (compress ? BDRV_REQ_WRITE_COMPRESSED : 0); 286 287 if (s->max_transfer < s->cluster_size) { 288 /* 289 * copy_range does not respect max_transfer. We don't want to bother 290 * with requests smaller than block-copy cluster size, so fallback to 291 * buffered copying (read and write respect max_transfer on their 292 * behalf). 293 */ 294 s->method = COPY_READ_WRITE_CLUSTER; 295 } else if (compress) { 296 /* Compression supports only cluster-size writes and no copy-range. */ 297 s->method = COPY_READ_WRITE_CLUSTER; 298 } else { 299 /* 300 * If copy range enabled, start with COPY_RANGE_SMALL, until first 301 * successful copy_range (look at block_copy_do_copy). 302 */ 303 s->method = use_copy_range ? COPY_RANGE_SMALL : COPY_READ_WRITE; 304 } 305 } 306 307 static int64_t block_copy_calculate_cluster_size(BlockDriverState *target, 308 Error **errp) 309 { 310 int ret; 311 BlockDriverInfo bdi; 312 bool target_does_cow = bdrv_backing_chain_next(target); 313 314 /* 315 * If there is no backing file on the target, we cannot rely on COW if our 316 * backup cluster size is smaller than the target cluster size. Even for 317 * targets with a backing file, try to avoid COW if possible. 318 */ 319 ret = bdrv_get_info(target, &bdi); 320 if (ret == -ENOTSUP && !target_does_cow) { 321 /* Cluster size is not defined */ 322 warn_report("The target block device doesn't provide " 323 "information about the block size and it doesn't have a " 324 "backing file. The default block size of %u bytes is " 325 "used. If the actual block size of the target exceeds " 326 "this default, the backup may be unusable", 327 BLOCK_COPY_CLUSTER_SIZE_DEFAULT); 328 return BLOCK_COPY_CLUSTER_SIZE_DEFAULT; 329 } else if (ret < 0 && !target_does_cow) { 330 error_setg_errno(errp, -ret, 331 "Couldn't determine the cluster size of the target image, " 332 "which has no backing file"); 333 error_append_hint(errp, 334 "Aborting, since this may create an unusable destination image\n"); 335 return ret; 336 } else if (ret < 0 && target_does_cow) { 337 /* Not fatal; just trudge on ahead. */ 338 return BLOCK_COPY_CLUSTER_SIZE_DEFAULT; 339 } 340 341 return MAX(BLOCK_COPY_CLUSTER_SIZE_DEFAULT, bdi.cluster_size); 342 } 343 344 BlockCopyState *block_copy_state_new(BdrvChild *source, BdrvChild *target, 345 const BdrvDirtyBitmap *bitmap, 346 Error **errp) 347 { 348 ERRP_GUARD(); 349 BlockCopyState *s; 350 int64_t cluster_size; 351 BdrvDirtyBitmap *copy_bitmap; 352 bool is_fleecing; 353 354 cluster_size = block_copy_calculate_cluster_size(target->bs, errp); 355 if (cluster_size < 0) { 356 return NULL; 357 } 358 359 copy_bitmap = bdrv_create_dirty_bitmap(source->bs, cluster_size, NULL, 360 errp); 361 if (!copy_bitmap) { 362 return NULL; 363 } 364 bdrv_disable_dirty_bitmap(copy_bitmap); 365 if (bitmap) { 366 if (!bdrv_merge_dirty_bitmap(copy_bitmap, bitmap, NULL, errp)) { 367 error_prepend(errp, "Failed to merge bitmap '%s' to internal " 368 "copy-bitmap: ", bdrv_dirty_bitmap_name(bitmap)); 369 bdrv_release_dirty_bitmap(copy_bitmap); 370 return NULL; 371 } 372 } else { 373 bdrv_set_dirty_bitmap(copy_bitmap, 0, 374 bdrv_dirty_bitmap_size(copy_bitmap)); 375 } 376 377 /* 378 * If source is in backing chain of target assume that target is going to be 379 * used for "image fleecing", i.e. it should represent a kind of snapshot of 380 * source at backup-start point in time. And target is going to be read by 381 * somebody (for example, used as NBD export) during backup job. 382 * 383 * In this case, we need to add BDRV_REQ_SERIALISING write flag to avoid 384 * intersection of backup writes and third party reads from target, 385 * otherwise reading from target we may occasionally read already updated by 386 * guest data. 387 * 388 * For more information see commit f8d59dfb40bb and test 389 * tests/qemu-iotests/222 390 */ 391 is_fleecing = bdrv_chain_contains(target->bs, source->bs); 392 393 s = g_new(BlockCopyState, 1); 394 *s = (BlockCopyState) { 395 .source = source, 396 .target = target, 397 .copy_bitmap = copy_bitmap, 398 .cluster_size = cluster_size, 399 .len = bdrv_dirty_bitmap_size(copy_bitmap), 400 .write_flags = (is_fleecing ? BDRV_REQ_SERIALISING : 0), 401 .mem = shres_create(BLOCK_COPY_MAX_MEM), 402 .max_transfer = QEMU_ALIGN_DOWN( 403 block_copy_max_transfer(source, target), 404 cluster_size), 405 }; 406 407 block_copy_set_copy_opts(s, false, false); 408 409 ratelimit_init(&s->rate_limit); 410 qemu_co_mutex_init(&s->lock); 411 QLIST_INIT(&s->reqs); 412 QLIST_INIT(&s->calls); 413 414 return s; 415 } 416 417 /* Only set before running the job, no need for locking. */ 418 void block_copy_set_progress_meter(BlockCopyState *s, ProgressMeter *pm) 419 { 420 s->progress = pm; 421 } 422 423 /* 424 * Takes ownership of @task 425 * 426 * If pool is NULL directly run the task, otherwise schedule it into the pool. 427 * 428 * Returns: task.func return code if pool is NULL 429 * otherwise -ECANCELED if pool status is bad 430 * otherwise 0 (successfully scheduled) 431 */ 432 static coroutine_fn int block_copy_task_run(AioTaskPool *pool, 433 BlockCopyTask *task) 434 { 435 if (!pool) { 436 int ret = task->task.func(&task->task); 437 438 g_free(task); 439 return ret; 440 } 441 442 aio_task_pool_wait_slot(pool); 443 if (aio_task_pool_status(pool) < 0) { 444 co_put_to_shres(task->s->mem, task->req.bytes); 445 block_copy_task_end(task, -ECANCELED); 446 g_free(task); 447 return -ECANCELED; 448 } 449 450 aio_task_pool_start_task(pool, &task->task); 451 452 return 0; 453 } 454 455 /* 456 * block_copy_do_copy 457 * 458 * Do copy of cluster-aligned chunk. Requested region is allowed to exceed 459 * s->len only to cover last cluster when s->len is not aligned to clusters. 460 * 461 * No sync here: nor bitmap neighter intersecting requests handling, only copy. 462 * 463 * @method is an in-out argument, so that copy_range can be either extended to 464 * a full-size buffer or disabled if the copy_range attempt fails. The output 465 * value of @method should be used for subsequent tasks. 466 * Returns 0 on success. 467 */ 468 static int coroutine_fn block_copy_do_copy(BlockCopyState *s, 469 int64_t offset, int64_t bytes, 470 BlockCopyMethod *method, 471 bool *error_is_read) 472 { 473 int ret; 474 int64_t nbytes = MIN(offset + bytes, s->len) - offset; 475 void *bounce_buffer = NULL; 476 477 assert(offset >= 0 && bytes > 0 && INT64_MAX - offset >= bytes); 478 assert(QEMU_IS_ALIGNED(offset, s->cluster_size)); 479 assert(QEMU_IS_ALIGNED(bytes, s->cluster_size)); 480 assert(offset < s->len); 481 assert(offset + bytes <= s->len || 482 offset + bytes == QEMU_ALIGN_UP(s->len, s->cluster_size)); 483 assert(nbytes < INT_MAX); 484 485 switch (*method) { 486 case COPY_WRITE_ZEROES: 487 ret = bdrv_co_pwrite_zeroes(s->target, offset, nbytes, s->write_flags & 488 ~BDRV_REQ_WRITE_COMPRESSED); 489 if (ret < 0) { 490 trace_block_copy_write_zeroes_fail(s, offset, ret); 491 *error_is_read = false; 492 } 493 return ret; 494 495 case COPY_RANGE_SMALL: 496 case COPY_RANGE_FULL: 497 ret = bdrv_co_copy_range(s->source, offset, s->target, offset, nbytes, 498 0, s->write_flags); 499 if (ret >= 0) { 500 /* Successful copy-range, increase chunk size. */ 501 *method = COPY_RANGE_FULL; 502 return 0; 503 } 504 505 trace_block_copy_copy_range_fail(s, offset, ret); 506 *method = COPY_READ_WRITE; 507 /* Fall through to read+write with allocated buffer */ 508 509 case COPY_READ_WRITE_CLUSTER: 510 case COPY_READ_WRITE: 511 /* 512 * In case of failed copy_range request above, we may proceed with 513 * buffered request larger than BLOCK_COPY_MAX_BUFFER. 514 * Still, further requests will be properly limited, so don't care too 515 * much. Moreover the most likely case (copy_range is unsupported for 516 * the configuration, so the very first copy_range request fails) 517 * is handled by setting large copy_size only after first successful 518 * copy_range. 519 */ 520 521 bounce_buffer = qemu_blockalign(s->source->bs, nbytes); 522 523 ret = bdrv_co_pread(s->source, offset, nbytes, bounce_buffer, 0); 524 if (ret < 0) { 525 trace_block_copy_read_fail(s, offset, ret); 526 *error_is_read = true; 527 goto out; 528 } 529 530 ret = bdrv_co_pwrite(s->target, offset, nbytes, bounce_buffer, 531 s->write_flags); 532 if (ret < 0) { 533 trace_block_copy_write_fail(s, offset, ret); 534 *error_is_read = false; 535 goto out; 536 } 537 538 out: 539 qemu_vfree(bounce_buffer); 540 break; 541 542 default: 543 abort(); 544 } 545 546 return ret; 547 } 548 549 static coroutine_fn int block_copy_task_entry(AioTask *task) 550 { 551 BlockCopyTask *t = container_of(task, BlockCopyTask, task); 552 BlockCopyState *s = t->s; 553 bool error_is_read = false; 554 BlockCopyMethod method = t->method; 555 int ret; 556 557 ret = block_copy_do_copy(s, t->req.offset, t->req.bytes, &method, 558 &error_is_read); 559 560 WITH_QEMU_LOCK_GUARD(&s->lock) { 561 if (s->method == t->method) { 562 s->method = method; 563 } 564 565 if (ret < 0) { 566 if (!t->call_state->ret) { 567 t->call_state->ret = ret; 568 t->call_state->error_is_read = error_is_read; 569 } 570 } else if (s->progress) { 571 progress_work_done(s->progress, t->req.bytes); 572 } 573 } 574 co_put_to_shres(s->mem, t->req.bytes); 575 block_copy_task_end(t, ret); 576 577 return ret; 578 } 579 580 static int block_copy_block_status(BlockCopyState *s, int64_t offset, 581 int64_t bytes, int64_t *pnum) 582 { 583 int64_t num; 584 BlockDriverState *base; 585 int ret; 586 587 if (qatomic_read(&s->skip_unallocated)) { 588 base = bdrv_backing_chain_next(s->source->bs); 589 } else { 590 base = NULL; 591 } 592 593 ret = bdrv_block_status_above(s->source->bs, base, offset, bytes, &num, 594 NULL, NULL); 595 if (ret < 0 || num < s->cluster_size) { 596 /* 597 * On error or if failed to obtain large enough chunk just fallback to 598 * copy one cluster. 599 */ 600 num = s->cluster_size; 601 ret = BDRV_BLOCK_ALLOCATED | BDRV_BLOCK_DATA; 602 } else if (offset + num == s->len) { 603 num = QEMU_ALIGN_UP(num, s->cluster_size); 604 } else { 605 num = QEMU_ALIGN_DOWN(num, s->cluster_size); 606 } 607 608 *pnum = num; 609 return ret; 610 } 611 612 /* 613 * Check if the cluster starting at offset is allocated or not. 614 * return via pnum the number of contiguous clusters sharing this allocation. 615 */ 616 static int block_copy_is_cluster_allocated(BlockCopyState *s, int64_t offset, 617 int64_t *pnum) 618 { 619 BlockDriverState *bs = s->source->bs; 620 int64_t count, total_count = 0; 621 int64_t bytes = s->len - offset; 622 int ret; 623 624 assert(QEMU_IS_ALIGNED(offset, s->cluster_size)); 625 626 while (true) { 627 ret = bdrv_is_allocated(bs, offset, bytes, &count); 628 if (ret < 0) { 629 return ret; 630 } 631 632 total_count += count; 633 634 if (ret || count == 0) { 635 /* 636 * ret: partial segment(s) are considered allocated. 637 * otherwise: unallocated tail is treated as an entire segment. 638 */ 639 *pnum = DIV_ROUND_UP(total_count, s->cluster_size); 640 return ret; 641 } 642 643 /* Unallocated segment(s) with uncertain following segment(s) */ 644 if (total_count >= s->cluster_size) { 645 *pnum = total_count / s->cluster_size; 646 return 0; 647 } 648 649 offset += count; 650 bytes -= count; 651 } 652 } 653 654 void block_copy_reset(BlockCopyState *s, int64_t offset, int64_t bytes) 655 { 656 QEMU_LOCK_GUARD(&s->lock); 657 658 bdrv_reset_dirty_bitmap(s->copy_bitmap, offset, bytes); 659 if (s->progress) { 660 progress_set_remaining(s->progress, 661 bdrv_get_dirty_count(s->copy_bitmap) + 662 s->in_flight_bytes); 663 } 664 } 665 666 /* 667 * Reset bits in copy_bitmap starting at offset if they represent unallocated 668 * data in the image. May reset subsequent contiguous bits. 669 * @return 0 when the cluster at @offset was unallocated, 670 * 1 otherwise, and -ret on error. 671 */ 672 int64_t block_copy_reset_unallocated(BlockCopyState *s, 673 int64_t offset, int64_t *count) 674 { 675 int ret; 676 int64_t clusters, bytes; 677 678 ret = block_copy_is_cluster_allocated(s, offset, &clusters); 679 if (ret < 0) { 680 return ret; 681 } 682 683 bytes = clusters * s->cluster_size; 684 685 if (!ret) { 686 block_copy_reset(s, offset, bytes); 687 } 688 689 *count = bytes; 690 return ret; 691 } 692 693 /* 694 * block_copy_dirty_clusters 695 * 696 * Copy dirty clusters in @offset/@bytes range. 697 * Returns 1 if dirty clusters found and successfully copied, 0 if no dirty 698 * clusters found and -errno on failure. 699 */ 700 static int coroutine_fn 701 block_copy_dirty_clusters(BlockCopyCallState *call_state) 702 { 703 BlockCopyState *s = call_state->s; 704 int64_t offset = call_state->offset; 705 int64_t bytes = call_state->bytes; 706 707 int ret = 0; 708 bool found_dirty = false; 709 int64_t end = offset + bytes; 710 AioTaskPool *aio = NULL; 711 712 /* 713 * block_copy() user is responsible for keeping source and target in same 714 * aio context 715 */ 716 assert(bdrv_get_aio_context(s->source->bs) == 717 bdrv_get_aio_context(s->target->bs)); 718 719 assert(QEMU_IS_ALIGNED(offset, s->cluster_size)); 720 assert(QEMU_IS_ALIGNED(bytes, s->cluster_size)); 721 722 while (bytes && aio_task_pool_status(aio) == 0 && 723 !qatomic_read(&call_state->cancelled)) { 724 BlockCopyTask *task; 725 int64_t status_bytes; 726 727 task = block_copy_task_create(s, call_state, offset, bytes); 728 if (!task) { 729 /* No more dirty bits in the bitmap */ 730 trace_block_copy_skip_range(s, offset, bytes); 731 break; 732 } 733 if (task->req.offset > offset) { 734 trace_block_copy_skip_range(s, offset, task->req.offset - offset); 735 } 736 737 found_dirty = true; 738 739 ret = block_copy_block_status(s, task->req.offset, task->req.bytes, 740 &status_bytes); 741 assert(ret >= 0); /* never fail */ 742 if (status_bytes < task->req.bytes) { 743 block_copy_task_shrink(task, status_bytes); 744 } 745 if (qatomic_read(&s->skip_unallocated) && 746 !(ret & BDRV_BLOCK_ALLOCATED)) { 747 block_copy_task_end(task, 0); 748 trace_block_copy_skip_range(s, task->req.offset, task->req.bytes); 749 offset = task_end(task); 750 bytes = end - offset; 751 g_free(task); 752 continue; 753 } 754 if (ret & BDRV_BLOCK_ZERO) { 755 task->method = COPY_WRITE_ZEROES; 756 } 757 758 if (!call_state->ignore_ratelimit) { 759 uint64_t ns = ratelimit_calculate_delay(&s->rate_limit, 0); 760 if (ns > 0) { 761 block_copy_task_end(task, -EAGAIN); 762 g_free(task); 763 qemu_co_sleep_ns_wakeable(&call_state->sleep, 764 QEMU_CLOCK_REALTIME, ns); 765 continue; 766 } 767 } 768 769 ratelimit_calculate_delay(&s->rate_limit, task->req.bytes); 770 771 trace_block_copy_process(s, task->req.offset); 772 773 co_get_from_shres(s->mem, task->req.bytes); 774 775 offset = task_end(task); 776 bytes = end - offset; 777 778 if (!aio && bytes) { 779 aio = aio_task_pool_new(call_state->max_workers); 780 } 781 782 ret = block_copy_task_run(aio, task); 783 if (ret < 0) { 784 goto out; 785 } 786 } 787 788 out: 789 if (aio) { 790 aio_task_pool_wait_all(aio); 791 792 /* 793 * We are not really interested in -ECANCELED returned from 794 * block_copy_task_run. If it fails, it means some task already failed 795 * for real reason, let's return first failure. 796 * Still, assert that we don't rewrite failure by success. 797 * 798 * Note: ret may be positive here because of block-status result. 799 */ 800 assert(ret >= 0 || aio_task_pool_status(aio) < 0); 801 ret = aio_task_pool_status(aio); 802 803 aio_task_pool_free(aio); 804 } 805 806 return ret < 0 ? ret : found_dirty; 807 } 808 809 void block_copy_kick(BlockCopyCallState *call_state) 810 { 811 qemu_co_sleep_wake(&call_state->sleep); 812 } 813 814 /* 815 * block_copy_common 816 * 817 * Copy requested region, accordingly to dirty bitmap. 818 * Collaborate with parallel block_copy requests: if they succeed it will help 819 * us. If they fail, we will retry not-copied regions. So, if we return error, 820 * it means that some I/O operation failed in context of _this_ block_copy call, 821 * not some parallel operation. 822 */ 823 static int coroutine_fn block_copy_common(BlockCopyCallState *call_state) 824 { 825 int ret; 826 BlockCopyState *s = call_state->s; 827 828 qemu_co_mutex_lock(&s->lock); 829 QLIST_INSERT_HEAD(&s->calls, call_state, list); 830 qemu_co_mutex_unlock(&s->lock); 831 832 do { 833 ret = block_copy_dirty_clusters(call_state); 834 835 if (ret == 0 && !qatomic_read(&call_state->cancelled)) { 836 WITH_QEMU_LOCK_GUARD(&s->lock) { 837 /* 838 * Check that there is no task we still need to 839 * wait to complete 840 */ 841 ret = reqlist_wait_one(&s->reqs, call_state->offset, 842 call_state->bytes, &s->lock); 843 if (ret == 0) { 844 /* 845 * No pending tasks, but check again the bitmap in this 846 * same critical section, since a task might have failed 847 * between this and the critical section in 848 * block_copy_dirty_clusters(). 849 * 850 * reqlist_wait_one return value 0 also means that it 851 * didn't release the lock. So, we are still in the same 852 * critical section, not interrupted by any concurrent 853 * access to state. 854 */ 855 ret = bdrv_dirty_bitmap_next_dirty(s->copy_bitmap, 856 call_state->offset, 857 call_state->bytes) >= 0; 858 } 859 } 860 } 861 862 /* 863 * We retry in two cases: 864 * 1. Some progress done 865 * Something was copied, which means that there were yield points 866 * and some new dirty bits may have appeared (due to failed parallel 867 * block-copy requests). 868 * 2. We have waited for some intersecting block-copy request 869 * It may have failed and produced new dirty bits. 870 */ 871 } while (ret > 0 && !qatomic_read(&call_state->cancelled)); 872 873 qatomic_store_release(&call_state->finished, true); 874 875 if (call_state->cb) { 876 call_state->cb(call_state->cb_opaque); 877 } 878 879 qemu_co_mutex_lock(&s->lock); 880 QLIST_REMOVE(call_state, list); 881 qemu_co_mutex_unlock(&s->lock); 882 883 return ret; 884 } 885 886 static void coroutine_fn block_copy_async_co_entry(void *opaque) 887 { 888 block_copy_common(opaque); 889 } 890 891 int coroutine_fn block_copy(BlockCopyState *s, int64_t start, int64_t bytes, 892 bool ignore_ratelimit, uint64_t timeout_ns, 893 BlockCopyAsyncCallbackFunc cb, 894 void *cb_opaque) 895 { 896 int ret; 897 BlockCopyCallState *call_state = g_new(BlockCopyCallState, 1); 898 899 *call_state = (BlockCopyCallState) { 900 .s = s, 901 .offset = start, 902 .bytes = bytes, 903 .ignore_ratelimit = ignore_ratelimit, 904 .max_workers = BLOCK_COPY_MAX_WORKERS, 905 .cb = cb, 906 .cb_opaque = cb_opaque, 907 }; 908 909 ret = qemu_co_timeout(block_copy_async_co_entry, call_state, timeout_ns, 910 g_free); 911 if (ret < 0) { 912 assert(ret == -ETIMEDOUT); 913 block_copy_call_cancel(call_state); 914 /* call_state will be freed by running coroutine. */ 915 return ret; 916 } 917 918 ret = call_state->ret; 919 g_free(call_state); 920 921 return ret; 922 } 923 924 BlockCopyCallState *block_copy_async(BlockCopyState *s, 925 int64_t offset, int64_t bytes, 926 int max_workers, int64_t max_chunk, 927 BlockCopyAsyncCallbackFunc cb, 928 void *cb_opaque) 929 { 930 BlockCopyCallState *call_state = g_new(BlockCopyCallState, 1); 931 932 *call_state = (BlockCopyCallState) { 933 .s = s, 934 .offset = offset, 935 .bytes = bytes, 936 .max_workers = max_workers, 937 .max_chunk = max_chunk, 938 .cb = cb, 939 .cb_opaque = cb_opaque, 940 941 .co = qemu_coroutine_create(block_copy_async_co_entry, call_state), 942 }; 943 944 qemu_coroutine_enter(call_state->co); 945 946 return call_state; 947 } 948 949 void block_copy_call_free(BlockCopyCallState *call_state) 950 { 951 if (!call_state) { 952 return; 953 } 954 955 assert(qatomic_read(&call_state->finished)); 956 g_free(call_state); 957 } 958 959 bool block_copy_call_finished(BlockCopyCallState *call_state) 960 { 961 return qatomic_read(&call_state->finished); 962 } 963 964 bool block_copy_call_succeeded(BlockCopyCallState *call_state) 965 { 966 return qatomic_load_acquire(&call_state->finished) && 967 !qatomic_read(&call_state->cancelled) && 968 call_state->ret == 0; 969 } 970 971 bool block_copy_call_failed(BlockCopyCallState *call_state) 972 { 973 return qatomic_load_acquire(&call_state->finished) && 974 !qatomic_read(&call_state->cancelled) && 975 call_state->ret < 0; 976 } 977 978 bool block_copy_call_cancelled(BlockCopyCallState *call_state) 979 { 980 return qatomic_read(&call_state->cancelled); 981 } 982 983 int block_copy_call_status(BlockCopyCallState *call_state, bool *error_is_read) 984 { 985 assert(qatomic_load_acquire(&call_state->finished)); 986 if (error_is_read) { 987 *error_is_read = call_state->error_is_read; 988 } 989 return call_state->ret; 990 } 991 992 /* 993 * Note that cancelling and finishing are racy. 994 * User can cancel a block-copy that is already finished. 995 */ 996 void block_copy_call_cancel(BlockCopyCallState *call_state) 997 { 998 qatomic_set(&call_state->cancelled, true); 999 block_copy_kick(call_state); 1000 } 1001 1002 BdrvDirtyBitmap *block_copy_dirty_bitmap(BlockCopyState *s) 1003 { 1004 return s->copy_bitmap; 1005 } 1006 1007 int64_t block_copy_cluster_size(BlockCopyState *s) 1008 { 1009 return s->cluster_size; 1010 } 1011 1012 void block_copy_set_skip_unallocated(BlockCopyState *s, bool skip) 1013 { 1014 qatomic_set(&s->skip_unallocated, skip); 1015 } 1016 1017 void block_copy_set_speed(BlockCopyState *s, uint64_t speed) 1018 { 1019 ratelimit_set_speed(&s->rate_limit, speed, BLOCK_COPY_SLICE_TIME); 1020 1021 /* 1022 * Note: it's good to kick all call states from here, but it should be done 1023 * only from a coroutine, to not crash if s->calls list changed while 1024 * entering one call. So for now, the only user of this function kicks its 1025 * only one call_state by hand. 1026 */ 1027 } 1028