/*
 * block_copy API
 *
 * Copyright (C) 2013 Proxmox Server Solutions
 * Copyright (c) 2019 Virtuozzo International GmbH.
 *
 * Authors:
 *  Dietmar Maurer (dietmar@proxmox.com)
 *  Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */

#include "qemu/osdep.h"

#include "trace.h"
#include "qapi/error.h"
#include "block/block-copy.h"
#include "block/reqlist.h"
#include "sysemu/block-backend.h"
#include "qemu/units.h"
#include "qemu/coroutine.h"
#include "block/aio_task.h"
#include "qemu/error-report.h"

#define BLOCK_COPY_MAX_COPY_RANGE (16 * MiB)
#define BLOCK_COPY_MAX_BUFFER (1 * MiB)
#define BLOCK_COPY_MAX_MEM (128 * MiB)
#define BLOCK_COPY_MAX_WORKERS 64
#define BLOCK_COPY_SLICE_TIME 100000000ULL /* ns */
#define BLOCK_COPY_CLUSTER_SIZE_DEFAULT (1 << 16)

typedef enum {
    COPY_READ_WRITE_CLUSTER,
    COPY_READ_WRITE,
    COPY_WRITE_ZEROES,
    COPY_RANGE_SMALL,
    COPY_RANGE_FULL
} BlockCopyMethod;

static coroutine_fn int block_copy_task_entry(AioTask *task);

typedef struct BlockCopyCallState {
    /* Fields initialized in block_copy_async() and never changed. */
    BlockCopyState *s;
    int64_t offset;
    int64_t bytes;
    int max_workers;
    int64_t max_chunk;
    bool ignore_ratelimit;
    BlockCopyAsyncCallbackFunc cb;
    void *cb_opaque;
    /* Coroutine where async block-copy is running */
    Coroutine *co;

    /* Fields whose state changes throughout the execution */
    bool finished; /* atomic */
    QemuCoSleep sleep; /* TODO: protect API with a lock */
    bool cancelled; /* atomic */
    /* To reference all call states from BlockCopyState */
    QLIST_ENTRY(BlockCopyCallState) list;

    /*
     * Fields that report information about return values and errors.
     * Protected by lock in BlockCopyState.
     */
    bool error_is_read;
    /*
     * @ret is set concurrently by tasks under mutex. Only set once by the
     * first failed task (and untouched if no task failed).
     * After finishing (call_state->finished is true), it is not modified
     * anymore and may be safely read without the mutex.
     */
    int ret;
} BlockCopyCallState;
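/*
 * Example (illustrative sketch): the intended pattern for a block-copy user
 * reading the result fields above. @ret and @error_is_read are only stable
 * once the call has finished, so check that first, using the accessors
 * defined later in this file:
 *
 *     if (block_copy_call_finished(call_state)) {
 *         bool error_is_read;
 *         int ret = block_copy_call_status(call_state, &error_is_read);
 *         // ret < 0 is the error reported by the first failed task
 *         block_copy_call_free(call_state);
 *     }
 */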
typedef struct BlockCopyTask {
    AioTask task;

    /*
     * Fields initialized in block_copy_task_create()
     * and never changed.
     */
    BlockCopyState *s;
    BlockCopyCallState *call_state;
    /*
     * @method can also be set again in the while loop of
     * block_copy_dirty_clusters(), but it is never accessed concurrently
     * because the only other function that reads it is
     * block_copy_task_entry() and it is invoked afterwards in the same
     * iteration.
     */
    BlockCopyMethod method;

    /*
     * Generally, req is protected by the lock in BlockCopyState. Still,
     * req.offset is only set on task creation, so it may be read concurrently
     * after creation. req.bytes is changed at most once, and only a parallel
     * read while @bytes is being updated in block_copy_task_shrink() needs
     * protection.
     */
    BlockReq req;
} BlockCopyTask;

static int64_t task_end(BlockCopyTask *task)
{
    return task->req.offset + task->req.bytes;
}

typedef struct BlockCopyState {
    /*
     * BdrvChild objects are not owned or managed by block-copy. They are
     * provided by the block-copy user, and the user is responsible for
     * appropriate permissions on these children.
     */
    BdrvChild *source;
    BdrvChild *target;

    /*
     * Fields initialized in block_copy_state_new()
     * and never changed.
     */
    int64_t cluster_size;
    int64_t max_transfer;
    uint64_t len;
    BdrvRequestFlags write_flags;

    /*
     * Fields whose state changes throughout the execution.
     * Protected by lock.
     */
    CoMutex lock;
    int64_t in_flight_bytes;
    BlockCopyMethod method;
    BlockReqList reqs;
    QLIST_HEAD(, BlockCopyCallState) calls;
    /*
     * skip_unallocated:
     *
     * Used by sync=top jobs, which first scan the source node for unallocated
     * areas and clear them in the copy_bitmap. During this process, the bitmap
     * is thus not fully initialized: it may still have bits set for areas that
     * are unallocated and should actually not be copied.
     *
     * This is indicated by skip_unallocated.
     *
     * In this case, block_copy() will query the source's allocation status,
     * skip unallocated regions, clear them in the copy_bitmap, and invoke
     * block_copy_reset_unallocated() every time it does.
     */
    bool skip_unallocated; /* atomic */
    /* State fields that use a thread-safe API */
    BdrvDirtyBitmap *copy_bitmap;
    ProgressMeter *progress;
    SharedResource *mem;
    RateLimit rate_limit;
} BlockCopyState;

/* Called with lock held */
static int64_t block_copy_chunk_size(BlockCopyState *s)
{
    switch (s->method) {
    case COPY_READ_WRITE_CLUSTER:
        return s->cluster_size;
    case COPY_READ_WRITE:
    case COPY_RANGE_SMALL:
        return MIN(MAX(s->cluster_size, BLOCK_COPY_MAX_BUFFER),
                   s->max_transfer);
    case COPY_RANGE_FULL:
        return MIN(MAX(s->cluster_size, BLOCK_COPY_MAX_COPY_RANGE),
                   s->max_transfer);
    default:
        /* Cannot have COPY_WRITE_ZEROES here. */
        abort();
    }
}
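/*
 * Worked example for the sizes above, assuming the default 64 KiB cluster
 * size and a hypothetical max_transfer of 4 MiB:
 *   - COPY_READ_WRITE_CLUSTER:            64 KiB (one cluster)
 *   - COPY_READ_WRITE / COPY_RANGE_SMALL: MIN(MAX(64 KiB, 1 MiB), 4 MiB) = 1 MiB
 *   - COPY_RANGE_FULL:                    MIN(MAX(64 KiB, 16 MiB), 4 MiB) = 4 MiB
 */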
/*
 * Search for the first dirty area in the offset/bytes range and create a task
 * at the beginning of it.
 */
static coroutine_fn BlockCopyTask *
block_copy_task_create(BlockCopyState *s, BlockCopyCallState *call_state,
                       int64_t offset, int64_t bytes)
{
    BlockCopyTask *task;
    int64_t max_chunk;

    QEMU_LOCK_GUARD(&s->lock);
    max_chunk = MIN_NON_ZERO(block_copy_chunk_size(s), call_state->max_chunk);
    if (!bdrv_dirty_bitmap_next_dirty_area(s->copy_bitmap,
                                           offset, offset + bytes,
                                           max_chunk, &offset, &bytes))
    {
        return NULL;
    }

    assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
    bytes = QEMU_ALIGN_UP(bytes, s->cluster_size);

    /* region is dirty, so no existent tasks possible in it */
    assert(!reqlist_find_conflict(&s->reqs, offset, bytes));

    bdrv_reset_dirty_bitmap(s->copy_bitmap, offset, bytes);
    s->in_flight_bytes += bytes;

    task = g_new(BlockCopyTask, 1);
    *task = (BlockCopyTask) {
        .task.func = block_copy_task_entry,
        .s = s,
        .call_state = call_state,
        .method = s->method,
    };
    reqlist_init_req(&s->reqs, &task->req, offset, bytes);

    return task;
}

/*
 * block_copy_task_shrink
 *
 * Drop the tail of the task to be handled later. Set the dirty bits back and
 * wake up all tasks waiting for us (maybe some of them do not intersect with
 * the shrunk task).
 */
static void coroutine_fn block_copy_task_shrink(BlockCopyTask *task,
                                                int64_t new_bytes)
{
    QEMU_LOCK_GUARD(&task->s->lock);
    if (new_bytes == task->req.bytes) {
        return;
    }

    assert(new_bytes > 0 && new_bytes < task->req.bytes);

    task->s->in_flight_bytes -= task->req.bytes - new_bytes;
    bdrv_set_dirty_bitmap(task->s->copy_bitmap,
                          task->req.offset + new_bytes,
                          task->req.bytes - new_bytes);

    reqlist_shrink_req(&task->req, new_bytes);
}

static void coroutine_fn block_copy_task_end(BlockCopyTask *task, int ret)
{
    QEMU_LOCK_GUARD(&task->s->lock);
    task->s->in_flight_bytes -= task->req.bytes;
    if (ret < 0) {
        bdrv_set_dirty_bitmap(task->s->copy_bitmap, task->req.offset,
                              task->req.bytes);
    }
    if (task->s->progress) {
        progress_set_remaining(task->s->progress,
                               bdrv_get_dirty_count(task->s->copy_bitmap) +
                               task->s->in_flight_bytes);
    }
    reqlist_remove_req(&task->req);
}

void block_copy_state_free(BlockCopyState *s)
{
    if (!s) {
        return;
    }

    ratelimit_destroy(&s->rate_limit);
    bdrv_release_dirty_bitmap(s->copy_bitmap);
    shres_destroy(s->mem);
    g_free(s);
}

static uint32_t block_copy_max_transfer(BdrvChild *source, BdrvChild *target)
{
    return MIN_NON_ZERO(INT_MAX,
                        MIN_NON_ZERO(source->bs->bl.max_transfer,
                                     target->bs->bl.max_transfer));
}

void block_copy_set_copy_opts(BlockCopyState *s, bool use_copy_range,
                              bool compress)
{
    /* Keep BDRV_REQ_SERIALISING set (or not set) in block_copy_state_new() */
    s->write_flags = (s->write_flags & BDRV_REQ_SERIALISING) |
                     (compress ? BDRV_REQ_WRITE_COMPRESSED : 0);

    if (s->max_transfer < s->cluster_size) {
        /*
         * copy_range does not respect max_transfer. We don't want to bother
         * with requests smaller than the block-copy cluster size, so fall
         * back to buffered copying (read and write respect max_transfer on
         * their own).
         */
        s->method = COPY_READ_WRITE_CLUSTER;
    } else if (compress) {
        /* Compression supports only cluster-size writes and no copy-range. */
        s->method = COPY_READ_WRITE_CLUSTER;
    } else {
        /*
         * If copy range is enabled, start with COPY_RANGE_SMALL, until the
         * first successful copy_range (look at block_copy_do_copy).
         */
        s->method = use_copy_range ? COPY_RANGE_SMALL : COPY_READ_WRITE;
    }
}
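/*
 * Example (illustrative sketch): a block-copy user, such as a backup job,
 * would typically call this right after block_copy_state_new() to apply its
 * options. The variable names are hypothetical:
 *
 *     BlockCopyState *bcs = block_copy_state_new(source, target, NULL, errp);
 *     if (bcs) {
 *         block_copy_set_copy_opts(bcs, use_copy_offloading, use_compression);
 *     }
 */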
static int64_t block_copy_calculate_cluster_size(BlockDriverState *target,
                                                 Error **errp)
{
    int ret;
    BlockDriverInfo bdi;
    bool target_does_cow = bdrv_backing_chain_next(target);

    /*
     * If there is no backing file on the target, we cannot rely on COW if our
     * backup cluster size is smaller than the target cluster size. Even for
     * targets with a backing file, try to avoid COW if possible.
     */
    ret = bdrv_get_info(target, &bdi);
    if (ret == -ENOTSUP && !target_does_cow) {
        /* Cluster size is not defined */
        warn_report("The target block device doesn't provide "
                    "information about the block size and it doesn't have a "
                    "backing file. The default block size of %u bytes is "
                    "used. If the actual block size of the target exceeds "
                    "this default, the backup may be unusable",
                    BLOCK_COPY_CLUSTER_SIZE_DEFAULT);
        return BLOCK_COPY_CLUSTER_SIZE_DEFAULT;
    } else if (ret < 0 && !target_does_cow) {
        error_setg_errno(errp, -ret,
            "Couldn't determine the cluster size of the target image, "
            "which has no backing file");
        error_append_hint(errp,
            "Aborting, since this may create an unusable destination image\n");
        return ret;
    } else if (ret < 0 && target_does_cow) {
        /* Not fatal; just trudge on ahead. */
        return BLOCK_COPY_CLUSTER_SIZE_DEFAULT;
    }

    return MAX(BLOCK_COPY_CLUSTER_SIZE_DEFAULT, bdi.cluster_size);
}

BlockCopyState *block_copy_state_new(BdrvChild *source, BdrvChild *target,
                                     const BdrvDirtyBitmap *bitmap,
                                     Error **errp)
{
    ERRP_GUARD();
    BlockCopyState *s;
    int64_t cluster_size;
    BdrvDirtyBitmap *copy_bitmap;
    bool is_fleecing;

    cluster_size = block_copy_calculate_cluster_size(target->bs, errp);
    if (cluster_size < 0) {
        return NULL;
    }

    copy_bitmap = bdrv_create_dirty_bitmap(source->bs, cluster_size, NULL,
                                           errp);
    if (!copy_bitmap) {
        return NULL;
    }
    bdrv_disable_dirty_bitmap(copy_bitmap);
    if (bitmap) {
        if (!bdrv_merge_dirty_bitmap(copy_bitmap, bitmap, NULL, errp)) {
            error_prepend(errp, "Failed to merge bitmap '%s' to internal "
                          "copy-bitmap: ", bdrv_dirty_bitmap_name(bitmap));
            bdrv_release_dirty_bitmap(copy_bitmap);
            return NULL;
        }
    } else {
        bdrv_set_dirty_bitmap(copy_bitmap, 0,
                              bdrv_dirty_bitmap_size(copy_bitmap));
    }

    /*
     * If the source is in the backing chain of the target, assume that the
     * target is going to be used for "image fleecing", i.e. it should
     * represent a kind of snapshot of the source at backup-start point in
     * time, and the target is going to be read by somebody (for example, used
     * as an NBD export) during the backup job.
     *
     * In this case, we need to add the BDRV_REQ_SERIALISING write flag to
     * avoid intersection of backup writes and third-party reads from the
     * target; otherwise, when reading from the target, we may occasionally
     * read data that has already been updated by the guest.
     *
     * For more information see commit f8d59dfb40bb and test
     * tests/qemu-iotests/222
     */
    is_fleecing = bdrv_chain_contains(target->bs, source->bs);

    s = g_new(BlockCopyState, 1);
    *s = (BlockCopyState) {
        .source = source,
        .target = target,
        .copy_bitmap = copy_bitmap,
        .cluster_size = cluster_size,
        .len = bdrv_dirty_bitmap_size(copy_bitmap),
        .write_flags = (is_fleecing ? BDRV_REQ_SERIALISING : 0),
        .mem = shres_create(BLOCK_COPY_MAX_MEM),
        .max_transfer = QEMU_ALIGN_DOWN(
                                    block_copy_max_transfer(source, target),
                                    cluster_size),
    };

    block_copy_set_copy_opts(s, false, false);

    ratelimit_init(&s->rate_limit);
    qemu_co_mutex_init(&s->lock);
    QLIST_INIT(&s->reqs);
    QLIST_INIT(&s->calls);

    return s;
}

/* Only set before running the job, no need for locking. */
void block_copy_set_progress_meter(BlockCopyState *s, ProgressMeter *pm)
{
    s->progress = pm;
}
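/*
 * Example (illustrative sketch): typical wiring done by a block-copy user when
 * setting up a job. The identifiers user_bitmap, job_progress and
 * speed_bytes_per_sec are hypothetical placeholders:
 *
 *     BlockCopyState *bcs = block_copy_state_new(source, target,
 *                                                user_bitmap, errp);
 *     if (!bcs) {
 *         return;   // *errp is set
 *     }
 *     block_copy_set_progress_meter(bcs, job_progress);
 *     block_copy_set_speed(bcs, speed_bytes_per_sec);
 */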
/*
 * Takes ownership of @task
 *
 * If pool is NULL, directly run the task, otherwise schedule it into the pool.
 *
 * Returns: task.func return code if pool is NULL
 *          otherwise -ECANCELED if pool status is bad
 *          otherwise 0 (successfully scheduled)
 */
static coroutine_fn int block_copy_task_run(AioTaskPool *pool,
                                            BlockCopyTask *task)
{
    if (!pool) {
        int ret = task->task.func(&task->task);

        g_free(task);
        return ret;
    }

    aio_task_pool_wait_slot(pool);
    if (aio_task_pool_status(pool) < 0) {
        co_put_to_shres(task->s->mem, task->req.bytes);
        block_copy_task_end(task, -ECANCELED);
        g_free(task);
        return -ECANCELED;
    }

    aio_task_pool_start_task(pool, &task->task);

    return 0;
}

/*
 * block_copy_do_copy
 *
 * Copy a cluster-aligned chunk. The requested region is allowed to exceed
 * s->len only to cover the last cluster when s->len is not aligned to
 * clusters.
 *
 * No synchronization here: neither bitmap nor intersecting-request handling,
 * only the copy itself.
 *
 * @method is an in-out argument, so that copy_range can be either extended to
 * a full-size buffer or disabled if the copy_range attempt fails. The output
 * value of @method should be used for subsequent tasks.
 * Returns 0 on success.
 */
static int coroutine_fn block_copy_do_copy(BlockCopyState *s,
                                           int64_t offset, int64_t bytes,
                                           BlockCopyMethod *method,
                                           bool *error_is_read)
{
    int ret;
    int64_t nbytes = MIN(offset + bytes, s->len) - offset;
    void *bounce_buffer = NULL;

    assert(offset >= 0 && bytes > 0 && INT64_MAX - offset >= bytes);
    assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
    assert(QEMU_IS_ALIGNED(bytes, s->cluster_size));
    assert(offset < s->len);
    assert(offset + bytes <= s->len ||
           offset + bytes == QEMU_ALIGN_UP(s->len, s->cluster_size));
    assert(nbytes < INT_MAX);

    switch (*method) {
    case COPY_WRITE_ZEROES:
        ret = bdrv_co_pwrite_zeroes(s->target, offset, nbytes, s->write_flags &
                                    ~BDRV_REQ_WRITE_COMPRESSED);
        if (ret < 0) {
            trace_block_copy_write_zeroes_fail(s, offset, ret);
            *error_is_read = false;
        }
        return ret;

    case COPY_RANGE_SMALL:
    case COPY_RANGE_FULL:
        ret = bdrv_co_copy_range(s->source, offset, s->target, offset, nbytes,
                                 0, s->write_flags);
        if (ret >= 0) {
            /* Successful copy-range, increase chunk size. */
            *method = COPY_RANGE_FULL;
            return 0;
        }

        trace_block_copy_copy_range_fail(s, offset, ret);
        *method = COPY_READ_WRITE;
        /* Fall through to read+write with allocated buffer */

    case COPY_READ_WRITE_CLUSTER:
    case COPY_READ_WRITE:
        /*
         * In case of a failed copy_range request above, we may proceed with a
         * buffered request larger than BLOCK_COPY_MAX_BUFFER.
         * Still, further requests will be properly limited, so don't care too
         * much. Moreover, the most likely case (copy_range is unsupported for
         * the configuration, so the very first copy_range request fails)
         * is handled by setting the large copy_size only after the first
         * successful copy_range.
         */

        bounce_buffer = qemu_blockalign(s->source->bs, nbytes);

        ret = bdrv_co_pread(s->source, offset, nbytes, bounce_buffer, 0);
        if (ret < 0) {
            trace_block_copy_read_fail(s, offset, ret);
            *error_is_read = true;
            goto out;
        }

        ret = bdrv_co_pwrite(s->target, offset, nbytes, bounce_buffer,
                             s->write_flags);
        if (ret < 0) {
            trace_block_copy_write_fail(s, offset, ret);
            *error_is_read = false;
            goto out;
        }

    out:
        qemu_vfree(bounce_buffer);
        break;

    default:
        abort();
    }

    return ret;
}
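/*
 * Illustration (a sketch of the transitions implemented above, not a separate
 * code path): how the in-out @method evolves across block_copy_do_copy()
 * calls when copy offloading is enabled:
 *
 *     BlockCopyMethod method = COPY_RANGE_SMALL;  // block_copy_set_copy_opts()
 *     ret = block_copy_do_copy(s, offset, bytes, &method, &error_is_read);
 *     // copy_range succeeded: method == COPY_RANGE_FULL, larger chunks later
 *     // copy_range failed:    method == COPY_READ_WRITE, buffered copy used
 *     // block_copy_task_entry() then publishes the new method into s->method
 *     // under s->lock, so subsequent tasks start from it.
 */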
static coroutine_fn int block_copy_task_entry(AioTask *task)
{
    BlockCopyTask *t = container_of(task, BlockCopyTask, task);
    BlockCopyState *s = t->s;
    bool error_is_read = false;
    BlockCopyMethod method = t->method;
    int ret;

    ret = block_copy_do_copy(s, t->req.offset, t->req.bytes, &method,
                             &error_is_read);

    WITH_QEMU_LOCK_GUARD(&s->lock) {
        if (s->method == t->method) {
            s->method = method;
        }

        if (ret < 0) {
            if (!t->call_state->ret) {
                t->call_state->ret = ret;
                t->call_state->error_is_read = error_is_read;
            }
        } else if (s->progress) {
            progress_work_done(s->progress, t->req.bytes);
        }
    }
    co_put_to_shres(s->mem, t->req.bytes);
    block_copy_task_end(t, ret);

    return ret;
}

static int block_copy_block_status(BlockCopyState *s, int64_t offset,
                                   int64_t bytes, int64_t *pnum)
{
    int64_t num;
    BlockDriverState *base;
    int ret;

    if (qatomic_read(&s->skip_unallocated)) {
        base = bdrv_backing_chain_next(s->source->bs);
    } else {
        base = NULL;
    }

    ret = bdrv_block_status_above(s->source->bs, base, offset, bytes, &num,
                                  NULL, NULL);
    if (ret < 0 || num < s->cluster_size) {
        /*
         * On error, or if we failed to obtain a large enough chunk, just fall
         * back to copying one cluster.
         */
        num = s->cluster_size;
        ret = BDRV_BLOCK_ALLOCATED | BDRV_BLOCK_DATA;
    } else if (offset + num == s->len) {
        num = QEMU_ALIGN_UP(num, s->cluster_size);
    } else {
        num = QEMU_ALIGN_DOWN(num, s->cluster_size);
    }

    *pnum = num;
    return ret;
}
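/*
 * Worked example for the rounding above, assuming a 64 KiB cluster size:
 * if bdrv_block_status_above() reports 100 KiB of contiguous status in the
 * middle of the image, *pnum is rounded down to 64 KiB; if those 100 KiB end
 * exactly at s->len, *pnum is rounded up to 128 KiB so that the final,
 * partial cluster is still covered.
 */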
/*
 * Check if the cluster starting at offset is allocated or not.
 * Return via pnum the number of contiguous clusters sharing this allocation.
 */
static int block_copy_is_cluster_allocated(BlockCopyState *s, int64_t offset,
                                           int64_t *pnum)
{
    BlockDriverState *bs = s->source->bs;
    int64_t count, total_count = 0;
    int64_t bytes = s->len - offset;
    int ret;

    assert(QEMU_IS_ALIGNED(offset, s->cluster_size));

    while (true) {
        ret = bdrv_is_allocated(bs, offset, bytes, &count);
        if (ret < 0) {
            return ret;
        }

        total_count += count;

        if (ret || count == 0) {
            /*
             * ret: partial segment(s) are considered allocated.
             * otherwise: unallocated tail is treated as an entire segment.
             */
            *pnum = DIV_ROUND_UP(total_count, s->cluster_size);
            return ret;
        }

        /* Unallocated segment(s) with uncertain following segment(s) */
        if (total_count >= s->cluster_size) {
            *pnum = total_count / s->cluster_size;
            return 0;
        }

        offset += count;
        bytes -= count;
    }
}

void block_copy_reset(BlockCopyState *s, int64_t offset, int64_t bytes)
{
    QEMU_LOCK_GUARD(&s->lock);

    bdrv_reset_dirty_bitmap(s->copy_bitmap, offset, bytes);
    if (s->progress) {
        progress_set_remaining(s->progress,
                               bdrv_get_dirty_count(s->copy_bitmap) +
                               s->in_flight_bytes);
    }
}

/*
 * Reset bits in copy_bitmap starting at offset if they represent unallocated
 * data in the image. May reset subsequent contiguous bits.
 * @return 0 when the cluster at @offset was unallocated,
 *         1 otherwise, and -ret on error.
 */
int64_t block_copy_reset_unallocated(BlockCopyState *s,
                                     int64_t offset, int64_t *count)
{
    int ret;
    int64_t clusters, bytes;

    ret = block_copy_is_cluster_allocated(s, offset, &clusters);
    if (ret < 0) {
        return ret;
    }

    bytes = clusters * s->cluster_size;

    if (!ret) {
        block_copy_reset(s, offset, bytes);
    }

    *count = bytes;
    return ret;
}
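/*
 * Example (illustrative sketch): how a sync=top user is expected to combine
 * block_copy_set_skip_unallocated() with a scan over the source before
 * copying, clearing unallocated areas from the copy bitmap. source_len is a
 * hypothetical placeholder for the size of the source:
 *
 *     int64_t offset = 0, count;
 *
 *     block_copy_set_skip_unallocated(bcs, true);
 *     while (offset < source_len) {
 *         int64_t ret = block_copy_reset_unallocated(bcs, offset, &count);
 *         if (ret < 0) {
 *             break;   // or handle the error
 *         }
 *         offset += count;
 *     }
 */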
/*
 * block_copy_dirty_clusters
 *
 * Copy dirty clusters in the @offset/@bytes range.
 * Returns 1 if dirty clusters are found and successfully copied, 0 if no
 * dirty clusters are found, and -errno on failure.
 */
static int coroutine_fn
block_copy_dirty_clusters(BlockCopyCallState *call_state)
{
    BlockCopyState *s = call_state->s;
    int64_t offset = call_state->offset;
    int64_t bytes = call_state->bytes;

    int ret = 0;
    bool found_dirty = false;
    int64_t end = offset + bytes;
    AioTaskPool *aio = NULL;

    /*
     * The block_copy() user is responsible for keeping source and target in
     * the same aio context.
     */
    assert(bdrv_get_aio_context(s->source->bs) ==
           bdrv_get_aio_context(s->target->bs));

    assert(QEMU_IS_ALIGNED(offset, s->cluster_size));
    assert(QEMU_IS_ALIGNED(bytes, s->cluster_size));

    while (bytes && aio_task_pool_status(aio) == 0 &&
           !qatomic_read(&call_state->cancelled)) {
        BlockCopyTask *task;
        int64_t status_bytes;

        task = block_copy_task_create(s, call_state, offset, bytes);
        if (!task) {
            /* No more dirty bits in the bitmap */
            trace_block_copy_skip_range(s, offset, bytes);
            break;
        }
        if (task->req.offset > offset) {
            trace_block_copy_skip_range(s, offset, task->req.offset - offset);
        }

        found_dirty = true;

        ret = block_copy_block_status(s, task->req.offset, task->req.bytes,
                                      &status_bytes);
        assert(ret >= 0); /* never fail */
        if (status_bytes < task->req.bytes) {
            block_copy_task_shrink(task, status_bytes);
        }
        if (qatomic_read(&s->skip_unallocated) &&
            !(ret & BDRV_BLOCK_ALLOCATED)) {
            block_copy_task_end(task, 0);
            trace_block_copy_skip_range(s, task->req.offset, task->req.bytes);
            offset = task_end(task);
            bytes = end - offset;
            g_free(task);
            continue;
        }
        if (ret & BDRV_BLOCK_ZERO) {
            task->method = COPY_WRITE_ZEROES;
        }

        if (!call_state->ignore_ratelimit) {
            uint64_t ns = ratelimit_calculate_delay(&s->rate_limit, 0);
            if (ns > 0) {
                block_copy_task_end(task, -EAGAIN);
                g_free(task);
                qemu_co_sleep_ns_wakeable(&call_state->sleep,
                                          QEMU_CLOCK_REALTIME, ns);
                continue;
            }
        }

        ratelimit_calculate_delay(&s->rate_limit, task->req.bytes);

        trace_block_copy_process(s, task->req.offset);

        co_get_from_shres(s->mem, task->req.bytes);

        offset = task_end(task);
        bytes = end - offset;

        if (!aio && bytes) {
            aio = aio_task_pool_new(call_state->max_workers);
        }

        ret = block_copy_task_run(aio, task);
        if (ret < 0) {
            goto out;
        }
    }

out:
    if (aio) {
        aio_task_pool_wait_all(aio);

        /*
         * We are not really interested in -ECANCELED returned from
         * block_copy_task_run. If it failed, it means some task already
         * failed for a real reason; let's return the first failure.
         * Still, assert that we don't rewrite failure by success.
         *
         * Note: ret may be positive here because of the block-status result.
         */
        assert(ret >= 0 || aio_task_pool_status(aio) < 0);
        ret = aio_task_pool_status(aio);

        aio_task_pool_free(aio);
    }

    return ret < 0 ? ret : found_dirty;
}

void block_copy_kick(BlockCopyCallState *call_state)
{
    qemu_co_sleep_wake(&call_state->sleep);
}

/*
 * block_copy_common
 *
 * Copy the requested region, according to the dirty bitmap.
 * Collaborate with parallel block_copy requests: if they succeed, it will
 * help us. If they fail, we will retry not-yet-copied regions. So, if we
 * return an error, it means that some I/O operation failed in the context of
 * _this_ block_copy call, not some parallel operation.
 */
static int coroutine_fn block_copy_common(BlockCopyCallState *call_state)
{
    int ret;
    BlockCopyState *s = call_state->s;

    qemu_co_mutex_lock(&s->lock);
    QLIST_INSERT_HEAD(&s->calls, call_state, list);
    qemu_co_mutex_unlock(&s->lock);

    do {
        ret = block_copy_dirty_clusters(call_state);

        if (ret == 0 && !qatomic_read(&call_state->cancelled)) {
            WITH_QEMU_LOCK_GUARD(&s->lock) {
                /*
                 * Check that there is no task we still need to
                 * wait for to complete
                 */
                ret = reqlist_wait_one(&s->reqs, call_state->offset,
                                       call_state->bytes, &s->lock);
                if (ret == 0) {
                    /*
                     * No pending tasks, but check the bitmap again in this
                     * same critical section, since a task might have failed
                     * between this and the critical section in
                     * block_copy_dirty_clusters().
                     *
                     * A reqlist_wait_one() return value of 0 also means that
                     * it didn't release the lock. So, we are still in the
                     * same critical section, not interrupted by any
                     * concurrent access to state.
                     */
                    ret = bdrv_dirty_bitmap_next_dirty(s->copy_bitmap,
                                                       call_state->offset,
                                                       call_state->bytes) >= 0;
                }
            }
        }

        /*
         * We retry in two cases:
         * 1. Some progress was made
         *    Something was copied, which means that there were yield points
         *    and some new dirty bits may have appeared (due to failed
         *    parallel block-copy requests).
         * 2. We have waited for some intersecting block-copy request
         *    It may have failed and produced new dirty bits.
         */
    } while (ret > 0 && !qatomic_read(&call_state->cancelled));

    qatomic_store_release(&call_state->finished, true);

    if (call_state->cb) {
        call_state->cb(call_state->cb_opaque);
    }

    qemu_co_mutex_lock(&s->lock);
    QLIST_REMOVE(call_state, list);
    qemu_co_mutex_unlock(&s->lock);

    return ret;
}

int coroutine_fn block_copy(BlockCopyState *s, int64_t start, int64_t bytes,
                            bool ignore_ratelimit)
{
    BlockCopyCallState call_state = {
        .s = s,
        .offset = start,
        .bytes = bytes,
        .ignore_ratelimit = ignore_ratelimit,
        .max_workers = BLOCK_COPY_MAX_WORKERS,
    };

    return block_copy_common(&call_state);
}
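/*
 * Example (illustrative sketch): a synchronous block-copy of one cluster from
 * inside a coroutine, as a copy-before-write style user might do it. The
 * offset variable is a hypothetical placeholder:
 *
 *     int64_t cluster = block_copy_cluster_size(bcs);
 *     int ret = block_copy(bcs, QEMU_ALIGN_DOWN(offset, cluster), cluster,
 *                          true);
 *     if (ret < 0) {
 *         // an I/O error happened in the context of this call
 *     }
 */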
static void coroutine_fn block_copy_async_co_entry(void *opaque)
{
    block_copy_common(opaque);
}

BlockCopyCallState *block_copy_async(BlockCopyState *s,
                                     int64_t offset, int64_t bytes,
                                     int max_workers, int64_t max_chunk,
                                     BlockCopyAsyncCallbackFunc cb,
                                     void *cb_opaque)
{
    BlockCopyCallState *call_state = g_new(BlockCopyCallState, 1);

    *call_state = (BlockCopyCallState) {
        .s = s,
        .offset = offset,
        .bytes = bytes,
        .max_workers = max_workers,
        .max_chunk = max_chunk,
        .cb = cb,
        .cb_opaque = cb_opaque,

        .co = qemu_coroutine_create(block_copy_async_co_entry, call_state),
    };

    qemu_coroutine_enter(call_state->co);

    return call_state;
}

void block_copy_call_free(BlockCopyCallState *call_state)
{
    if (!call_state) {
        return;
    }

    assert(qatomic_read(&call_state->finished));
    g_free(call_state);
}

bool block_copy_call_finished(BlockCopyCallState *call_state)
{
    return qatomic_read(&call_state->finished);
}

bool block_copy_call_succeeded(BlockCopyCallState *call_state)
{
    return qatomic_load_acquire(&call_state->finished) &&
           !qatomic_read(&call_state->cancelled) &&
           call_state->ret == 0;
}

bool block_copy_call_failed(BlockCopyCallState *call_state)
{
    return qatomic_load_acquire(&call_state->finished) &&
           !qatomic_read(&call_state->cancelled) &&
           call_state->ret < 0;
}

bool block_copy_call_cancelled(BlockCopyCallState *call_state)
{
    return qatomic_read(&call_state->cancelled);
}

int block_copy_call_status(BlockCopyCallState *call_state, bool *error_is_read)
{
    assert(qatomic_load_acquire(&call_state->finished));
    if (error_is_read) {
        *error_is_read = call_state->error_is_read;
    }
    return call_state->ret;
}

/*
 * Note that cancelling and finishing are racy.
 * A user can cancel a block-copy that is already finished.
 */
void block_copy_call_cancel(BlockCopyCallState *call_state)
{
    qatomic_set(&call_state->cancelled, true);
    block_copy_kick(call_state);
}
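/*
 * Example (illustrative sketch): the async lifecycle as seen by a block-copy
 * user. user_copy_callback and job are hypothetical; a max_chunk of 0 means
 * no additional chunk limit beyond block_copy_chunk_size():
 *
 *     BlockCopyCallState *cs;
 *
 *     cs = block_copy_async(bcs, offset, bytes, BLOCK_COPY_MAX_WORKERS, 0,
 *                           user_copy_callback, job);
 *     // ... later, once the callback has fired ...
 *     if (block_copy_call_cancelled(cs)) {
 *         // cancelled via block_copy_call_cancel()
 *     } else if (block_copy_call_failed(cs)) {
 *         bool error_is_read;
 *         int ret = block_copy_call_status(cs, &error_is_read);
 *         // handle the error
 *     }
 *     block_copy_call_free(cs);
 */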
BdrvDirtyBitmap *block_copy_dirty_bitmap(BlockCopyState *s)
{
    return s->copy_bitmap;
}

int64_t block_copy_cluster_size(BlockCopyState *s)
{
    return s->cluster_size;
}

void block_copy_set_skip_unallocated(BlockCopyState *s, bool skip)
{
    qatomic_set(&s->skip_unallocated, skip);
}

void block_copy_set_speed(BlockCopyState *s, uint64_t speed)
{
    ratelimit_set_speed(&s->rate_limit, speed, BLOCK_COPY_SLICE_TIME);

    /*
     * Note: it would be good to kick all call states from here, but it should
     * be done only from a coroutine, to not crash if the s->calls list changes
     * while entering one call. So for now, the only user of this function
     * kicks its only call_state by hand.
     */
}
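/*
 * Example (illustrative sketch): per the note above, a user that changes the
 * speed kicks its own call state so that a throttling sleep is re-evaluated
 * with the new limit:
 *
 *     block_copy_set_speed(bcs, new_speed);
 *     if (call_state) {
 *         block_copy_kick(call_state);
 *     }
 */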