/*
 * Copyright (C) 2002 Sistina Software (UK) Limited.
 * Copyright (C) 2006 Red Hat GmbH
 *
 * This file is released under the GPL.
 *
 * Kcopyd provides a simple interface for copying an area of one
 * block-device to one or more other block-devices, with an asynchronous
 * completion notification.
 */

#include <linux/types.h>
#include <linux/atomic.h>
#include <linux/blkdev.h>
#include <linux/fs.h>
#include <linux/init.h>
#include <linux/list.h>
#include <linux/mempool.h>
#include <linux/module.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/workqueue.h>
#include <linux/mutex.h>
#include <linux/delay.h>
#include <linux/device-mapper.h>
#include <linux/dm-kcopyd.h>

#include "dm-core.h"

#define SUB_JOB_SIZE	128
#define SPLIT_COUNT	8
#define MIN_JOBS	8
#define RESERVE_PAGES	(DIV_ROUND_UP(SUB_JOB_SIZE << SECTOR_SHIFT, PAGE_SIZE))

/*-----------------------------------------------------------------
 * Each kcopyd client has its own little pool of preallocated
 * pages for kcopyd io.
 *---------------------------------------------------------------*/
struct dm_kcopyd_client {
	struct page_list *pages;
	unsigned nr_reserved_pages;
	unsigned nr_free_pages;

	struct dm_io_client *io_client;

	wait_queue_head_t destroyq;

	mempool_t job_pool;

	struct workqueue_struct *kcopyd_wq;
	struct work_struct kcopyd_work;

	struct dm_kcopyd_throttle *throttle;

	atomic_t nr_jobs;

	/*
	 * We maintain four lists of jobs:
	 *
	 * i)   jobs waiting for pages
	 * ii)  jobs that have pages, and are waiting for the io to be issued.
	 * iii) jobs that don't need to do any IO and just run a callback
	 * iv)  jobs that have completed.
	 *
	 * All four of these are protected by job_lock.
	 */
	spinlock_t job_lock;
	struct list_head callback_jobs;
	struct list_head complete_jobs;
	struct list_head io_jobs;
	struct list_head pages_jobs;
};

static struct page_list zero_page_list;

static DEFINE_SPINLOCK(throttle_spinlock);

/*
 * IO/IDLE accounting slowly decays after (1 << ACCOUNT_INTERVAL_SHIFT) period.
 * When total_period >= (1 << ACCOUNT_INTERVAL_SHIFT) the counters are divided
 * by 2.
 */
#define ACCOUNT_INTERVAL_SHIFT	SHIFT_HZ

/*
 * Sleep this number of milliseconds.
 *
 * The value was decided experimentally.
 * Smaller values seem to cause an increased copy rate above the limit.
 * The reason for this is unknown but possibly due to jiffies rounding errors
 * or read/write cache inside the disk.
 */
#define SLEEP_MSEC		100

/*
 * Maximum number of sleep events. There is a theoretical livelock if
 * multiple kcopyd clients do work simultaneously; this limit avoids it.
 */
#define MAX_SLEEPS		10
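/*
 * Worked example of the throttling arithmetic (illustrative, with made-up
 * numbers): with t->throttle set to 25 (a 25% I/O duty cycle),
 * io_period = 40 and total_period = 100 jiffies, io_job_start() computes
 * skew = 40 - 25 * 100 / 100 = 15 > 0, so the caller sleeps in SLEEP_MSEC
 * steps (at most MAX_SLEEPS times) until enough idle time has accumulated
 * for io_period to drop back under 25% of total_period.
 */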
static void io_job_start(struct dm_kcopyd_throttle *t)
{
	unsigned throttle, now, difference;
	int slept = 0, skew;

	if (unlikely(!t))
		return;

try_again:
	spin_lock_irq(&throttle_spinlock);

	throttle = READ_ONCE(t->throttle);

	if (likely(throttle >= 100))
		goto skip_limit;

	now = jiffies;
	difference = now - t->last_jiffies;
	t->last_jiffies = now;
	if (t->num_io_jobs)
		t->io_period += difference;
	t->total_period += difference;

	/*
	 * Maintain sane values if we got a temporary overflow.
	 */
	if (unlikely(t->io_period > t->total_period))
		t->io_period = t->total_period;

	if (unlikely(t->total_period >= (1 << ACCOUNT_INTERVAL_SHIFT))) {
		int shift = fls(t->total_period >> ACCOUNT_INTERVAL_SHIFT);
		t->total_period >>= shift;
		t->io_period >>= shift;
	}

	skew = t->io_period - throttle * t->total_period / 100;

	if (unlikely(skew > 0) && slept < MAX_SLEEPS) {
		slept++;
		spin_unlock_irq(&throttle_spinlock);
		msleep(SLEEP_MSEC);
		goto try_again;
	}

skip_limit:
	t->num_io_jobs++;

	spin_unlock_irq(&throttle_spinlock);
}

static void io_job_finish(struct dm_kcopyd_throttle *t)
{
	unsigned long flags;

	if (unlikely(!t))
		return;

	spin_lock_irqsave(&throttle_spinlock, flags);

	t->num_io_jobs--;

	if (likely(READ_ONCE(t->throttle) >= 100))
		goto skip_limit;

	if (!t->num_io_jobs) {
		unsigned now, difference;

		now = jiffies;
		difference = now - t->last_jiffies;
		t->last_jiffies = now;

		t->io_period += difference;
		t->total_period += difference;

		/*
		 * Maintain sane values if we got a temporary overflow.
		 */
		if (unlikely(t->io_period > t->total_period))
			t->io_period = t->total_period;
	}

skip_limit:
	spin_unlock_irqrestore(&throttle_spinlock, flags);
}


static void wake(struct dm_kcopyd_client *kc)
{
	queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
}

/*
 * Obtain one page for the use of kcopyd.
 */
static struct page_list *alloc_pl(gfp_t gfp)
{
	struct page_list *pl;

	pl = kmalloc(sizeof(*pl), gfp);
	if (!pl)
		return NULL;

	pl->page = alloc_page(gfp);
	if (!pl->page) {
		kfree(pl);
		return NULL;
	}

	return pl;
}

static void free_pl(struct page_list *pl)
{
	__free_page(pl->page);
	kfree(pl);
}

/*
 * Add the provided pages to a client's free page list, releasing
 * back to the system any beyond the reserved_pages limit.
 */
static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
{
	struct page_list *next;

	do {
		next = pl->next;

		if (kc->nr_free_pages >= kc->nr_reserved_pages)
			free_pl(pl);
		else {
			pl->next = kc->pages;
			kc->pages = pl;
			kc->nr_free_pages++;
		}

		pl = next;
	} while (pl);
}

static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
			    unsigned int nr, struct page_list **pages)
{
	struct page_list *pl;

	*pages = NULL;

	do {
		pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY | __GFP_KSWAPD_RECLAIM);
		if (unlikely(!pl)) {
			/* Use reserved pages */
			pl = kc->pages;
			if (unlikely(!pl))
				goto out_of_memory;
			kc->pages = pl->next;
			kc->nr_free_pages--;
		}
		pl->next = *pages;
		*pages = pl;
	} while (--nr);

	return 0;

out_of_memory:
	if (*pages)
		kcopyd_put_pages(kc, *pages);
	return -ENOMEM;
}

/*
 * These three functions resize the page pool.
 */
static void drop_pages(struct page_list *pl)
{
	struct page_list *next;

	while (pl) {
		next = pl->next;
		free_pl(pl);
		pl = next;
	}
}
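/*
 * Sizing example (illustrative, assuming 4 KiB pages): RESERVE_PAGES is
 * DIV_ROUND_UP(SUB_JOB_SIZE << SECTOR_SHIFT, PAGE_SIZE) =
 * DIV_ROUND_UP(128 * 512, 4096) = 16 pages per client, i.e. enough reserved
 * pages to back one full sub job even when alloc_pl() fails under memory
 * pressure and kcopyd_get_pages() has to fall back to the reserve.
 */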
/*
 * Allocate and reserve nr_pages for the use of a specific client.
 */
static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages)
{
	unsigned i;
	struct page_list *pl = NULL, *next;

	for (i = 0; i < nr_pages; i++) {
		next = alloc_pl(GFP_KERNEL);
		if (!next) {
			if (pl)
				drop_pages(pl);
			return -ENOMEM;
		}
		next->next = pl;
		pl = next;
	}

	kc->nr_reserved_pages += nr_pages;
	kcopyd_put_pages(kc, pl);

	return 0;
}

static void client_free_pages(struct dm_kcopyd_client *kc)
{
	BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages);
	drop_pages(kc->pages);
	kc->pages = NULL;
	kc->nr_free_pages = kc->nr_reserved_pages = 0;
}

/*-----------------------------------------------------------------
 * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
 * for this reason we use a mempool to prevent the client from
 * ever having to do io (which could cause a deadlock).
 *---------------------------------------------------------------*/
struct kcopyd_job {
	struct dm_kcopyd_client *kc;
	struct list_head list;
	unsigned long flags;

	/*
	 * Error state of the job.
	 */
	int read_err;
	unsigned long write_err;

	/*
	 * Either READ or WRITE
	 */
	int rw;
	struct dm_io_region source;

	/*
	 * The destinations for the transfer.
	 */
	unsigned int num_dests;
	struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];

	struct page_list *pages;

	/*
	 * Set this to ensure you are notified when the job has
	 * completed.  'context' is for callback to use.
	 */
	dm_kcopyd_notify_fn fn;
	void *context;

	/*
	 * These fields are only used if the job has been split
	 * into more manageable parts.
	 */
	struct mutex lock;
	atomic_t sub_jobs;
	sector_t progress;
	sector_t write_offset;

	struct kcopyd_job *master_job;
};

static struct kmem_cache *_job_cache;

int __init dm_kcopyd_init(void)
{
	_job_cache = kmem_cache_create("kcopyd_job",
				sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1),
				__alignof__(struct kcopyd_job), 0, NULL);
	if (!_job_cache)
		return -ENOMEM;

	zero_page_list.next = &zero_page_list;
	zero_page_list.page = ZERO_PAGE(0);

	return 0;
}

void dm_kcopyd_exit(void)
{
	kmem_cache_destroy(_job_cache);
	_job_cache = NULL;
}
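/*
 * Ordering example (illustrative): when DM_KCOPYD_WRITE_SEQ is set,
 * pop_io_job() below only issues the write sub job whose write_offset
 * matches the master job's current write_offset. With 128-sector sub jobs,
 * for instance, the sub job at offset 0 is issued first; popping it advances
 * the master's write_offset to 128, which releases the sub job at offset 128,
 * and so on, keeping writes to zoned destinations strictly sequential.
 */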
/*
 * Functions to push and pop a job onto the head of a given job
 * list.
 */
static struct kcopyd_job *pop_io_job(struct list_head *jobs,
				     struct dm_kcopyd_client *kc)
{
	struct kcopyd_job *job;

	/*
	 * For I/O jobs, pop any read, any write without sequential write
	 * constraint and sequential writes that are at the right position.
	 */
	list_for_each_entry(job, jobs, list) {
		if (job->rw == READ || !test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags)) {
			list_del(&job->list);
			return job;
		}

		if (job->write_offset == job->master_job->write_offset) {
			job->master_job->write_offset += job->source.count;
			list_del(&job->list);
			return job;
		}
	}

	return NULL;
}

static struct kcopyd_job *pop(struct list_head *jobs,
			      struct dm_kcopyd_client *kc)
{
	struct kcopyd_job *job = NULL;
	unsigned long flags;

	spin_lock_irqsave(&kc->job_lock, flags);

	if (!list_empty(jobs)) {
		if (jobs == &kc->io_jobs)
			job = pop_io_job(jobs, kc);
		else {
			job = list_entry(jobs->next, struct kcopyd_job, list);
			list_del(&job->list);
		}
	}
	spin_unlock_irqrestore(&kc->job_lock, flags);

	return job;
}

static void push(struct list_head *jobs, struct kcopyd_job *job)
{
	unsigned long flags;
	struct dm_kcopyd_client *kc = job->kc;

	spin_lock_irqsave(&kc->job_lock, flags);
	list_add_tail(&job->list, jobs);
	spin_unlock_irqrestore(&kc->job_lock, flags);
}


static void push_head(struct list_head *jobs, struct kcopyd_job *job)
{
	unsigned long flags;
	struct dm_kcopyd_client *kc = job->kc;

	spin_lock_irqsave(&kc->job_lock, flags);
	list_add(&job->list, jobs);
	spin_unlock_irqrestore(&kc->job_lock, flags);
}

/*
 * These three functions process 1 item from the corresponding
 * job list.
 *
 * They return:
 * < 0: error
 *   0: success
 * > 0: can't process yet.
 */
static int run_complete_job(struct kcopyd_job *job)
{
	void *context = job->context;
	int read_err = job->read_err;
	unsigned long write_err = job->write_err;
	dm_kcopyd_notify_fn fn = job->fn;
	struct dm_kcopyd_client *kc = job->kc;

	if (job->pages && job->pages != &zero_page_list)
		kcopyd_put_pages(kc, job->pages);
	/*
	 * If this is the master job, the sub jobs have already
	 * completed so we can free everything.
	 */
	if (job->master_job == job) {
		mutex_destroy(&job->lock);
		mempool_free(job, &kc->job_pool);
	}
	fn(read_err, write_err, context);

	if (atomic_dec_and_test(&kc->nr_jobs))
		wake_up(&kc->destroyq);

	cond_resched();

	return 0;
}

static void complete_io(unsigned long error, void *context)
{
	struct kcopyd_job *job = (struct kcopyd_job *) context;
	struct dm_kcopyd_client *kc = job->kc;

	io_job_finish(kc->throttle);

	if (error) {
		if (op_is_write(job->rw))
			job->write_err |= error;
		else
			job->read_err = 1;

		if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
			push(&kc->complete_jobs, job);
			wake(kc);
			return;
		}
	}

	if (op_is_write(job->rw))
		push(&kc->complete_jobs, job);

	else {
		job->rw = WRITE;
		push(&kc->io_jobs, job);
	}

	wake(kc);
}

/*
 * Request io on as many buffer heads as we can currently get for
 * a particular job.
 */
static int run_io_job(struct kcopyd_job *job)
{
	int r;
	struct dm_io_request io_req = {
		.bi_op = job->rw,
		.bi_op_flags = 0,
		.mem.type = DM_IO_PAGE_LIST,
		.mem.ptr.pl = job->pages,
		.mem.offset = 0,
		.notify.fn = complete_io,
		.notify.context = job,
		.client = job->kc->io_client,
	};

	/*
	 * If we need to write sequentially and some reads or writes failed,
	 * no point in continuing.
	 */
	if (test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags) &&
	    job->master_job->write_err)
		return -EIO;

	io_job_start(job->kc->throttle);

	if (job->rw == READ)
		r = dm_io(&io_req, 1, &job->source, NULL);
	else
		r = dm_io(&io_req, job->num_dests, job->dests, NULL);

	return r;
}

static int run_pages_job(struct kcopyd_job *job)
{
	int r;
	unsigned nr_pages = dm_div_up(job->dests[0].count, PAGE_SIZE >> 9);

	r = kcopyd_get_pages(job->kc, nr_pages, &job->pages);
	if (!r) {
		/* this job is ready for io */
		push(&job->kc->io_jobs, job);
		return 0;
	}

	if (r == -ENOMEM)
		/* can't complete now */
		return 1;

	return r;
}

/*
 * Run through a list for as long as possible.  Returns the count
 * of successful jobs.
 */
static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
			int (*fn) (struct kcopyd_job *))
{
	struct kcopyd_job *job;
	int r, count = 0;

	while ((job = pop(jobs, kc))) {

		r = fn(job);

		if (r < 0) {
			/* error this rogue job */
			if (op_is_write(job->rw))
				job->write_err = (unsigned long) -1L;
			else
				job->read_err = 1;
			push(&kc->complete_jobs, job);
			break;
		}

		if (r > 0) {
			/*
			 * We couldn't service this job ATM, so
			 * push this job back onto the list.
			 */
			push_head(jobs, job);
			break;
		}

		count++;
	}

	return count;
}

/*
 * kcopyd does this every time it's woken up.
 */
static void do_work(struct work_struct *work)
{
	struct dm_kcopyd_client *kc = container_of(work,
					struct dm_kcopyd_client, kcopyd_work);
	struct blk_plug plug;
	unsigned long flags;

	/*
	 * The order that these are called is *very* important.
	 * complete jobs can free some pages for pages jobs.
	 * Pages jobs when successful will jump onto the io jobs
	 * list.  io jobs call wake when they complete and it all
	 * starts again.
	 */
	spin_lock_irqsave(&kc->job_lock, flags);
	list_splice_tail_init(&kc->callback_jobs, &kc->complete_jobs);
	spin_unlock_irqrestore(&kc->job_lock, flags);

	blk_start_plug(&plug);
	process_jobs(&kc->complete_jobs, kc, run_complete_job);
	process_jobs(&kc->pages_jobs, kc, run_pages_job);
	process_jobs(&kc->io_jobs, kc, run_io_job);
	blk_finish_plug(&plug);
}

/*
 * If we are copying a small region we just dispatch a single job
 * to do the copy, otherwise the io has to be split up into many
 * jobs.
 */
static void dispatch_job(struct kcopyd_job *job)
{
	struct dm_kcopyd_client *kc = job->kc;
	atomic_inc(&kc->nr_jobs);
	if (unlikely(!job->source.count))
		push(&kc->callback_jobs, job);
	else if (job->pages == &zero_page_list)
		push(&kc->io_jobs, job);
	else
		push(&kc->pages_jobs, job);
	wake(kc);
}
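/*
 * Splitting example (illustrative, with made-up sizes): a copy whose source
 * is 2048 sectors (1 MiB) exceeds SUB_JOB_SIZE (128 sectors, i.e. 64 KiB),
 * so split_job() starts SPLIT_COUNT (8) sub jobs. Each sub job that finishes
 * grabs the next 128-sector chunk in segment_complete() until progress
 * reaches source.count, so the copy proceeds as 16 chunks with at most
 * 8 in flight at any time.
 */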
static void segment_complete(int read_err, unsigned long write_err,
			     void *context)
{
	/* FIXME: tidy this function */
	sector_t progress = 0;
	sector_t count = 0;
	struct kcopyd_job *sub_job = (struct kcopyd_job *) context;
	struct kcopyd_job *job = sub_job->master_job;
	struct dm_kcopyd_client *kc = job->kc;

	mutex_lock(&job->lock);

	/* update the error */
	if (read_err)
		job->read_err = 1;

	if (write_err)
		job->write_err |= write_err;

	/*
	 * Only dispatch more work if there hasn't been an error.
	 */
	if ((!job->read_err && !job->write_err) ||
	    test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
		/* get the next chunk of work */
		progress = job->progress;
		count = job->source.count - progress;
		if (count) {
			if (count > SUB_JOB_SIZE)
				count = SUB_JOB_SIZE;

			job->progress += count;
		}
	}
	mutex_unlock(&job->lock);

	if (count) {
		int i;

		*sub_job = *job;
		sub_job->write_offset = progress;
		sub_job->source.sector += progress;
		sub_job->source.count = count;

		for (i = 0; i < job->num_dests; i++) {
			sub_job->dests[i].sector += progress;
			sub_job->dests[i].count = count;
		}

		sub_job->fn = segment_complete;
		sub_job->context = sub_job;
		dispatch_job(sub_job);

	} else if (atomic_dec_and_test(&job->sub_jobs)) {

		/*
		 * Queue the completion callback to the kcopyd thread.
		 *
		 * Some callers assume that all the completions are called
		 * from a single thread and don't race with each other.
		 *
		 * We must not call the callback directly here because this
		 * code may not be executing in the thread.
		 */
		push(&kc->complete_jobs, job);
		wake(kc);
	}
}

/*
 * Create some sub jobs to share the work between them.
 */
static void split_job(struct kcopyd_job *master_job)
{
	int i;

	atomic_inc(&master_job->kc->nr_jobs);

	atomic_set(&master_job->sub_jobs, SPLIT_COUNT);
	for (i = 0; i < SPLIT_COUNT; i++) {
		master_job[i + 1].master_job = master_job;
		segment_complete(0, 0u, &master_job[i + 1]);
	}
}

void dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
		    unsigned int num_dests, struct dm_io_region *dests,
		    unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
{
	struct kcopyd_job *job;
	int i;

	/*
	 * Allocate an array of jobs consisting of one master job
	 * followed by SPLIT_COUNT sub jobs.
	 */
	job = mempool_alloc(&kc->job_pool, GFP_NOIO);
	mutex_init(&job->lock);

	/*
	 * set up for the read.
	 */
	job->kc = kc;
	job->flags = flags;
	job->read_err = 0;
	job->write_err = 0;

	job->num_dests = num_dests;
	memcpy(&job->dests, dests, sizeof(*dests) * num_dests);

	/*
	 * If one of the destinations is a host-managed zoned block device,
	 * we need to write sequentially. If one of the destinations is a
	 * host-aware device, then leave it to the caller to choose what to do.
	 */
	if (!test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags)) {
		for (i = 0; i < job->num_dests; i++) {
			if (bdev_zoned_model(dests[i].bdev) == BLK_ZONED_HM) {
				set_bit(DM_KCOPYD_WRITE_SEQ, &job->flags);
				break;
			}
		}
	}

	/*
	 * If we need to write sequentially, errors cannot be ignored.
	 */
	if (test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags) &&
	    test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags))
		clear_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags);

	if (from) {
		job->source = *from;
		job->pages = NULL;
		job->rw = READ;
	} else {
		memset(&job->source, 0, sizeof job->source);
		job->source.count = job->dests[0].count;
		job->pages = &zero_page_list;

		/*
		 * Use WRITE ZEROES to optimize zeroing if all dests support it.
		 */
		job->rw = REQ_OP_WRITE_ZEROES;
		for (i = 0; i < job->num_dests; i++)
			if (!bdev_write_zeroes_sectors(job->dests[i].bdev)) {
				job->rw = WRITE;
				break;
			}
	}

	job->fn = fn;
	job->context = context;
	job->master_job = job;
	job->write_offset = 0;

	if (job->source.count <= SUB_JOB_SIZE)
		dispatch_job(job);
	else {
		job->progress = 0;
		split_job(job);
	}
}
EXPORT_SYMBOL(dm_kcopyd_copy);

void dm_kcopyd_zero(struct dm_kcopyd_client *kc,
		    unsigned num_dests, struct dm_io_region *dests,
		    unsigned flags, dm_kcopyd_notify_fn fn, void *context)
{
	dm_kcopyd_copy(kc, NULL, num_dests, dests, flags, fn, context);
}
EXPORT_SYMBOL(dm_kcopyd_zero);

void *dm_kcopyd_prepare_callback(struct dm_kcopyd_client *kc,
				 dm_kcopyd_notify_fn fn, void *context)
{
	struct kcopyd_job *job;

	job = mempool_alloc(&kc->job_pool, GFP_NOIO);

	memset(job, 0, sizeof(struct kcopyd_job));
	job->kc = kc;
	job->fn = fn;
	job->context = context;
	job->master_job = job;

	atomic_inc(&kc->nr_jobs);

	return job;
}
EXPORT_SYMBOL(dm_kcopyd_prepare_callback);

void dm_kcopyd_do_callback(void *j, int read_err, unsigned long write_err)
{
	struct kcopyd_job *job = j;
	struct dm_kcopyd_client *kc = job->kc;

	job->read_err = read_err;
	job->write_err = write_err;

	push(&kc->callback_jobs, job);
	wake(kc);
}
EXPORT_SYMBOL(dm_kcopyd_do_callback);

/*
 * Cancels a kcopyd job, e.g. someone might be deactivating a
 * mirror.
 */
#if 0
int kcopyd_cancel(struct kcopyd_job *job, int block)
{
	/* FIXME: finish */
	return -1;
}
#endif /* 0 */

/*-----------------------------------------------------------------
 * Client setup
 *---------------------------------------------------------------*/
struct dm_kcopyd_client *dm_kcopyd_client_create(struct dm_kcopyd_throttle *throttle)
{
	int r;
	struct dm_kcopyd_client *kc;

	kc = kzalloc(sizeof(*kc), GFP_KERNEL);
	if (!kc)
		return ERR_PTR(-ENOMEM);

	spin_lock_init(&kc->job_lock);
	INIT_LIST_HEAD(&kc->callback_jobs);
	INIT_LIST_HEAD(&kc->complete_jobs);
	INIT_LIST_HEAD(&kc->io_jobs);
	INIT_LIST_HEAD(&kc->pages_jobs);
	kc->throttle = throttle;

	r = mempool_init_slab_pool(&kc->job_pool, MIN_JOBS, _job_cache);
	if (r)
		goto bad_slab;

	INIT_WORK(&kc->kcopyd_work, do_work);
	kc->kcopyd_wq = alloc_workqueue("kcopyd", WQ_MEM_RECLAIM, 0);
	if (!kc->kcopyd_wq) {
		r = -ENOMEM;
		goto bad_workqueue;
	}

	kc->pages = NULL;
	kc->nr_reserved_pages = kc->nr_free_pages = 0;
	r = client_reserve_pages(kc, RESERVE_PAGES);
	if (r)
		goto bad_client_pages;

	kc->io_client = dm_io_client_create();
	if (IS_ERR(kc->io_client)) {
		r = PTR_ERR(kc->io_client);
		goto bad_io_client;
	}

	init_waitqueue_head(&kc->destroyq);
	atomic_set(&kc->nr_jobs, 0);

	return kc;

bad_io_client:
	client_free_pages(kc);
bad_client_pages:
	destroy_workqueue(kc->kcopyd_wq);
bad_workqueue:
	mempool_exit(&kc->job_pool);
bad_slab:
	kfree(kc);

	return ERR_PTR(r);
}
EXPORT_SYMBOL(dm_kcopyd_client_create);

void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
{
	/* Wait for completion of all jobs submitted by this client. */
	wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));

	BUG_ON(!list_empty(&kc->callback_jobs));
	BUG_ON(!list_empty(&kc->complete_jobs));
	BUG_ON(!list_empty(&kc->io_jobs));
	BUG_ON(!list_empty(&kc->pages_jobs));
	destroy_workqueue(kc->kcopyd_wq);
	dm_io_client_destroy(kc->io_client);
	client_free_pages(kc);
	mempool_exit(&kc->job_pool);
	kfree(kc);
}
EXPORT_SYMBOL(dm_kcopyd_client_destroy);
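
/*
 * Usage sketch (illustrative only, kept out of the build like the cancel
 * stub above): how a hypothetical caller might copy one region to two
 * destinations with an asynchronous completion callback. The functions
 * my_copy_done() and my_copy_example(), the block devices and the sector
 * numbers are invented for the example and would also need
 * <linux/completion.h>; the dm_kcopyd_* calls are the API exported above.
 */
#if 0
static void my_copy_done(int read_err, unsigned long write_err, void *context)
{
	/* Runs in the kcopyd thread once all destinations are written. */
	if (read_err || write_err)
		pr_err("kcopyd example: copy failed\n");
	complete((struct completion *)context);
}

static int my_copy_example(struct block_device *src,
			   struct block_device *dst0,
			   struct block_device *dst1)
{
	struct dm_kcopyd_client *kc;
	struct dm_io_region from, to[2];
	DECLARE_COMPLETION_ONSTACK(done);

	kc = dm_kcopyd_client_create(NULL);	/* no throttling */
	if (IS_ERR(kc))
		return PTR_ERR(kc);

	from.bdev = src;
	from.sector = 0;
	from.count = 1024;	/* 512 KiB: larger than SUB_JOB_SIZE, so it is split */

	to[0].bdev = dst0;
	to[0].sector = 0;
	to[0].count = from.count;

	to[1].bdev = dst1;
	to[1].sector = 2048;
	to[1].count = from.count;

	dm_kcopyd_copy(kc, &from, 2, to, 0, my_copy_done, &done);
	wait_for_completion(&done);

	dm_kcopyd_client_destroy(kc);
	return 0;
}
#endif /* 0 */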