/*
 * Copyright (C) 2002 Sistina Software (UK) Limited.
 * Copyright (C) 2006 Red Hat GmbH
 *
 * This file is released under the GPL.
 *
 * Kcopyd provides a simple interface for copying an area of one
 * block-device to one or more other block-devices, with an asynchronous
 * completion notification.
 */

#include <linux/types.h>
#include <linux/atomic.h>
#include <linux/blkdev.h>
#include <linux/fs.h>
#include <linux/init.h>
#include <linux/list.h>
#include <linux/mempool.h>
#include <linux/module.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/workqueue.h>
#include <linux/mutex.h>
#include <linux/delay.h>
#include <linux/device-mapper.h>
#include <linux/dm-kcopyd.h>

#include "dm-core.h"

#define SUB_JOB_SIZE	128
#define SPLIT_COUNT	8
#define MIN_JOBS	8
#define RESERVE_PAGES	(DIV_ROUND_UP(SUB_JOB_SIZE << SECTOR_SHIFT, PAGE_SIZE))

/*-----------------------------------------------------------------
 * Each kcopyd client has its own little pool of preallocated
 * pages for kcopyd io.
 *---------------------------------------------------------------*/
struct dm_kcopyd_client {
	struct page_list *pages;
	unsigned nr_reserved_pages;
	unsigned nr_free_pages;

	struct dm_io_client *io_client;

	wait_queue_head_t destroyq;

	mempool_t job_pool;

	struct workqueue_struct *kcopyd_wq;
	struct work_struct kcopyd_work;

	struct dm_kcopyd_throttle *throttle;

	atomic_t nr_jobs;

/*
 * We maintain three lists of jobs:
 *
 * i)   jobs waiting for pages
 * ii)  jobs that have pages, and are waiting for the io to be issued.
 * iii) jobs that have completed.
 *
 * All three of these are protected by job_lock.
 */
	spinlock_t job_lock;
	struct list_head complete_jobs;
	struct list_head io_jobs;
	struct list_head pages_jobs;
};

static struct page_list zero_page_list;

static DEFINE_SPINLOCK(throttle_spinlock);

/*
 * IO/IDLE accounting slowly decays after (1 << ACCOUNT_INTERVAL_SHIFT) period.
 * When total_period >= (1 << ACCOUNT_INTERVAL_SHIFT) the counters are divided
 * by 2.
 */
#define ACCOUNT_INTERVAL_SHIFT	SHIFT_HZ

/*
 * Sleep this number of milliseconds.
 *
 * The value was decided experimentally.
 * Smaller values seem to cause an increased copy rate above the limit.
 * The reason for this is unknown but possibly due to jiffies rounding errors
 * or read/write cache inside the disk.
 */
#define SLEEP_MSEC	100

/*
 * Maximum number of sleep events. There is a theoretical livelock if more
 * kcopyd clients do work simultaneously; this limit avoids it.
 */
#define MAX_SLEEPS	10
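
/*
 * How the throttle works: io_job_start() compares the fraction of the
 * accounting period spent doing kcopyd I/O (io_period / total_period)
 * with the configured percentage.  With throttle = 25, for example,
 * skew = io_period - 25 * total_period / 100 goes positive once more
 * than a quarter of the period was spent on I/O, and the caller then
 * sleeps in SLEEP_MSEC steps (at most MAX_SLEEPS times) before issuing
 * its I/O.
 */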
static void io_job_start(struct dm_kcopyd_throttle *t)
{
	unsigned throttle, now, difference;
	int slept = 0, skew;

	if (unlikely(!t))
		return;

try_again:
	spin_lock_irq(&throttle_spinlock);

	throttle = READ_ONCE(t->throttle);

	if (likely(throttle >= 100))
		goto skip_limit;

	now = jiffies;
	difference = now - t->last_jiffies;
	t->last_jiffies = now;
	if (t->num_io_jobs)
		t->io_period += difference;
	t->total_period += difference;

	/*
	 * Maintain sane values if we got a temporary overflow.
	 */
	if (unlikely(t->io_period > t->total_period))
		t->io_period = t->total_period;

	if (unlikely(t->total_period >= (1 << ACCOUNT_INTERVAL_SHIFT))) {
		int shift = fls(t->total_period >> ACCOUNT_INTERVAL_SHIFT);
		t->total_period >>= shift;
		t->io_period >>= shift;
	}

	skew = t->io_period - throttle * t->total_period / 100;

	if (unlikely(skew > 0) && slept < MAX_SLEEPS) {
		slept++;
		spin_unlock_irq(&throttle_spinlock);
		msleep(SLEEP_MSEC);
		goto try_again;
	}

skip_limit:
	t->num_io_jobs++;

	spin_unlock_irq(&throttle_spinlock);
}

static void io_job_finish(struct dm_kcopyd_throttle *t)
{
	unsigned long flags;

	if (unlikely(!t))
		return;

	spin_lock_irqsave(&throttle_spinlock, flags);

	t->num_io_jobs--;

	if (likely(READ_ONCE(t->throttle) >= 100))
		goto skip_limit;

	if (!t->num_io_jobs) {
		unsigned now, difference;

		now = jiffies;
		difference = now - t->last_jiffies;
		t->last_jiffies = now;

		t->io_period += difference;
		t->total_period += difference;

		/*
		 * Maintain sane values if we got a temporary overflow.
		 */
		if (unlikely(t->io_period > t->total_period))
			t->io_period = t->total_period;
	}

skip_limit:
	spin_unlock_irqrestore(&throttle_spinlock, flags);
}


static void wake(struct dm_kcopyd_client *kc)
{
	queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
}

/*
 * Obtain one page for the use of kcopyd.
 */
static struct page_list *alloc_pl(gfp_t gfp)
{
	struct page_list *pl;

	pl = kmalloc(sizeof(*pl), gfp);
	if (!pl)
		return NULL;

	pl->page = alloc_page(gfp);
	if (!pl->page) {
		kfree(pl);
		return NULL;
	}

	return pl;
}

static void free_pl(struct page_list *pl)
{
	__free_page(pl->page);
	kfree(pl);
}

/*
 * Add the provided pages to a client's free page list, releasing
 * back to the system any beyond the reserved_pages limit.
 */
static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
{
	struct page_list *next;

	do {
		next = pl->next;

		if (kc->nr_free_pages >= kc->nr_reserved_pages)
			free_pl(pl);
		else {
			pl->next = kc->pages;
			kc->pages = pl;
			kc->nr_free_pages++;
		}

		pl = next;
	} while (pl);
}
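
/*
 * Page allocation for a job is opportunistic: ask the page allocator with
 * flags that fail fast (no warning, no retry, kswapd wakeup only, so the
 * allocation never enters direct reclaim), and fall back to the client's
 * reserved pool.  The reserve, sized by RESERVE_PAGES, is what guarantees
 * forward progress when memory is tight.
 */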
static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
			    unsigned int nr, struct page_list **pages)
{
	struct page_list *pl;

	*pages = NULL;

	do {
		pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY | __GFP_KSWAPD_RECLAIM);
		if (unlikely(!pl)) {
			/* Use reserved pages */
			pl = kc->pages;
			if (unlikely(!pl))
				goto out_of_memory;
			kc->pages = pl->next;
			kc->nr_free_pages--;
		}
		pl->next = *pages;
		*pages = pl;
	} while (--nr);

	return 0;

out_of_memory:
	if (*pages)
		kcopyd_put_pages(kc, *pages);
	return -ENOMEM;
}

/*
 * These three functions resize the page pool.
 */
static void drop_pages(struct page_list *pl)
{
	struct page_list *next;

	while (pl) {
		next = pl->next;
		free_pl(pl);
		pl = next;
	}
}

/*
 * Allocate and reserve nr_pages for the use of a specific client.
 */
static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages)
{
	unsigned i;
	struct page_list *pl = NULL, *next;

	for (i = 0; i < nr_pages; i++) {
		next = alloc_pl(GFP_KERNEL);
		if (!next) {
			if (pl)
				drop_pages(pl);
			return -ENOMEM;
		}
		next->next = pl;
		pl = next;
	}

	kc->nr_reserved_pages += nr_pages;
	kcopyd_put_pages(kc, pl);

	return 0;
}

static void client_free_pages(struct dm_kcopyd_client *kc)
{
	BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages);
	drop_pages(kc->pages);
	kc->pages = NULL;
	kc->nr_free_pages = kc->nr_reserved_pages = 0;
}

/*-----------------------------------------------------------------
 * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
 * for this reason we use a mempool to prevent the client from
 * ever having to do io (which could cause a deadlock).
 *---------------------------------------------------------------*/
struct kcopyd_job {
	struct dm_kcopyd_client *kc;
	struct list_head list;
	unsigned long flags;

	/*
	 * Error state of the job.
	 */
	int read_err;
	unsigned long write_err;

	/*
	 * Either READ or WRITE
	 */
	int rw;
	struct dm_io_region source;

	/*
	 * The destinations for the transfer.
	 */
	unsigned int num_dests;
	struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];

	struct page_list *pages;

	/*
	 * Set this to ensure you are notified when the job has
	 * completed. 'context' is for callback to use.
	 */
	dm_kcopyd_notify_fn fn;
	void *context;

	/*
	 * These fields are only used if the job has been split
	 * into more manageable parts.
	 */
	struct mutex lock;
	atomic_t sub_jobs;
	sector_t progress;
	sector_t write_offset;

	struct kcopyd_job *master_job;
};

static struct kmem_cache *_job_cache;

int __init dm_kcopyd_init(void)
{
	/*
	 * Each cache object is an array of SPLIT_COUNT + 1 jobs: slot 0 is
	 * the master job, slots 1..SPLIT_COUNT are its sub jobs (see
	 * split_job()).
	 */
	_job_cache = kmem_cache_create("kcopyd_job",
				sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1),
				__alignof__(struct kcopyd_job), 0, NULL);
	if (!_job_cache)
		return -ENOMEM;

	zero_page_list.next = &zero_page_list;
	zero_page_list.page = ZERO_PAGE(0);

	return 0;
}

void dm_kcopyd_exit(void)
{
	kmem_cache_destroy(_job_cache);
	_job_cache = NULL;
}

/*
 * Functions to push and pop a job onto the head of a given job
 * list.
 */
static struct kcopyd_job *pop_io_job(struct list_head *jobs,
				     struct dm_kcopyd_client *kc)
{
	struct kcopyd_job *job;

	/*
	 * For I/O jobs, pop any read, any write without sequential write
	 * constraint and sequential writes that are at the right position.
	 */
	list_for_each_entry(job, jobs, list) {
		if (job->rw == READ || !test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags)) {
			list_del(&job->list);
			return job;
		}

		if (job->write_offset == job->master_job->write_offset) {
			job->master_job->write_offset += job->source.count;
			list_del(&job->list);
			return job;
		}
	}

	return NULL;
}

static struct kcopyd_job *pop(struct list_head *jobs,
			      struct dm_kcopyd_client *kc)
{
	struct kcopyd_job *job = NULL;
	unsigned long flags;

	spin_lock_irqsave(&kc->job_lock, flags);

	if (!list_empty(jobs)) {
		if (jobs == &kc->io_jobs)
			job = pop_io_job(jobs, kc);
		else {
			job = list_entry(jobs->next, struct kcopyd_job, list);
			list_del(&job->list);
		}
	}
	spin_unlock_irqrestore(&kc->job_lock, flags);

	return job;
}

static void push(struct list_head *jobs, struct kcopyd_job *job)
{
	unsigned long flags;
	struct dm_kcopyd_client *kc = job->kc;

	spin_lock_irqsave(&kc->job_lock, flags);
	list_add_tail(&job->list, jobs);
	spin_unlock_irqrestore(&kc->job_lock, flags);
}


static void push_head(struct list_head *jobs, struct kcopyd_job *job)
{
	unsigned long flags;
	struct dm_kcopyd_client *kc = job->kc;

	spin_lock_irqsave(&kc->job_lock, flags);
	list_add(&job->list, jobs);
	spin_unlock_irqrestore(&kc->job_lock, flags);
}

/*
 * These three functions process 1 item from the corresponding
 * job list.
 *
 * They return:
 * < 0: error
 *   0: success
 * > 0: can't process yet.
 */
static int run_complete_job(struct kcopyd_job *job)
{
	void *context = job->context;
	int read_err = job->read_err;
	unsigned long write_err = job->write_err;
	dm_kcopyd_notify_fn fn = job->fn;
	struct dm_kcopyd_client *kc = job->kc;

	if (job->pages && job->pages != &zero_page_list)
		kcopyd_put_pages(kc, job->pages);
	/*
	 * If this is the master job, the sub jobs have already
	 * completed so we can free everything.
	 */
	if (job->master_job == job) {
		mutex_destroy(&job->lock);
		mempool_free(job, &kc->job_pool);
	}
	fn(read_err, write_err, context);

	if (atomic_dec_and_test(&kc->nr_jobs))
		wake_up(&kc->destroyq);

	cond_resched();

	return 0;
}

static void complete_io(unsigned long error, void *context)
{
	struct kcopyd_job *job = (struct kcopyd_job *) context;
	struct dm_kcopyd_client *kc = job->kc;

	io_job_finish(kc->throttle);

	if (error) {
		if (op_is_write(job->rw))
			job->write_err |= error;
		else
			job->read_err = 1;

		if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
			push(&kc->complete_jobs, job);
			wake(kc);
			return;
		}
	}

	if (op_is_write(job->rw))
		push(&kc->complete_jobs, job);
	else {
		job->rw = WRITE;
		push(&kc->io_jobs, job);
	}

	wake(kc);
}
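
/*
 * A copy job first goes through run_io_job() as a READ of the source into
 * job->pages; complete_io() then flips job->rw to WRITE and requeues the
 * job so that the same path issues the writes to all destinations.
 */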
/*
 * Request io on as many buffer heads as we can currently get for
 * a particular job.
 */
static int run_io_job(struct kcopyd_job *job)
{
	int r;
	struct dm_io_request io_req = {
		.bi_op = job->rw,
		.bi_op_flags = 0,
		.mem.type = DM_IO_PAGE_LIST,
		.mem.ptr.pl = job->pages,
		.mem.offset = 0,
		.notify.fn = complete_io,
		.notify.context = job,
		.client = job->kc->io_client,
	};

	/*
	 * If we need to write sequentially and some reads or writes failed,
	 * no point in continuing.
	 */
	if (test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags) &&
	    job->master_job->write_err)
		return -EIO;

	io_job_start(job->kc->throttle);

	if (job->rw == READ)
		r = dm_io(&io_req, 1, &job->source, NULL);
	else
		r = dm_io(&io_req, job->num_dests, job->dests, NULL);

	return r;
}

static int run_pages_job(struct kcopyd_job *job)
{
	int r;
	unsigned nr_pages = dm_div_up(job->dests[0].count, PAGE_SIZE >> 9);

	r = kcopyd_get_pages(job->kc, nr_pages, &job->pages);
	if (!r) {
		/* this job is ready for io */
		push(&job->kc->io_jobs, job);
		return 0;
	}

	if (r == -ENOMEM)
		/* can't complete now */
		return 1;

	return r;
}

/*
 * Run through a list for as long as possible. Returns the count
 * of successful jobs.
 */
static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
			int (*fn) (struct kcopyd_job *))
{
	struct kcopyd_job *job;
	int r, count = 0;

	while ((job = pop(jobs, kc))) {

		r = fn(job);

		if (r < 0) {
			/* error this rogue job */
			if (op_is_write(job->rw))
				job->write_err = (unsigned long) -1L;
			else
				job->read_err = 1;
			push(&kc->complete_jobs, job);
			break;
		}

		if (r > 0) {
			/*
			 * We couldn't service this job ATM, so
			 * push this job back onto the list.
			 */
			push_head(jobs, job);
			break;
		}

		count++;
	}

	return count;
}

/*
 * kcopyd does this every time it's woken up.
 */
static void do_work(struct work_struct *work)
{
	struct dm_kcopyd_client *kc = container_of(work,
					struct dm_kcopyd_client, kcopyd_work);
	struct blk_plug plug;

	/*
	 * The order that these are called is *very* important.
	 * complete jobs can free some pages for pages jobs.
	 * Pages jobs when successful will jump onto the io jobs
	 * list.  io jobs call wake when they complete and it all
	 * starts again.
	 */
	blk_start_plug(&plug);
	process_jobs(&kc->complete_jobs, kc, run_complete_job);
	process_jobs(&kc->pages_jobs, kc, run_pages_job);
	process_jobs(&kc->io_jobs, kc, run_io_job);
	blk_finish_plug(&plug);
}

/*
 * If we are copying a small region we just dispatch a single job
 * to do the copy, otherwise the io has to be split up into many
 * jobs.
 */
static void dispatch_job(struct kcopyd_job *job)
{
	struct dm_kcopyd_client *kc = job->kc;

	atomic_inc(&kc->nr_jobs);
	if (unlikely(!job->source.count))
		push(&kc->complete_jobs, job);
	else if (job->pages == &zero_page_list)
		push(&kc->io_jobs, job);
	else
		push(&kc->pages_jobs, job);
	wake(kc);
}
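
/*
 * Large copies are split into SUB_JOB_SIZE-sector pieces.  split_job()
 * kicks off SPLIT_COUNT sub jobs (allocated together with the master job,
 * see dm_kcopyd_init()); each time a sub job finishes, segment_complete()
 * hands it the next unclaimed chunk (tracked by job->progress) until the
 * whole source range is covered, and the master job completes when the
 * last sub job drops job->sub_jobs to zero.
 */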
static void segment_complete(int read_err, unsigned long write_err,
			     void *context)
{
	/* FIXME: tidy this function */
	sector_t progress = 0;
	sector_t count = 0;
	struct kcopyd_job *sub_job = (struct kcopyd_job *) context;
	struct kcopyd_job *job = sub_job->master_job;
	struct dm_kcopyd_client *kc = job->kc;

	mutex_lock(&job->lock);

	/* update the error */
	if (read_err)
		job->read_err = 1;

	if (write_err)
		job->write_err |= write_err;

	/*
	 * Only dispatch more work if there hasn't been an error.
	 */
	if ((!job->read_err && !job->write_err) ||
	    test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
		/* get the next chunk of work */
		progress = job->progress;
		count = job->source.count - progress;
		if (count) {
			if (count > SUB_JOB_SIZE)
				count = SUB_JOB_SIZE;

			job->progress += count;
		}
	}
	mutex_unlock(&job->lock);

	if (count) {
		int i;

		*sub_job = *job;
		sub_job->write_offset = progress;
		sub_job->source.sector += progress;
		sub_job->source.count = count;

		for (i = 0; i < job->num_dests; i++) {
			sub_job->dests[i].sector += progress;
			sub_job->dests[i].count = count;
		}

		sub_job->fn = segment_complete;
		sub_job->context = sub_job;
		dispatch_job(sub_job);

	} else if (atomic_dec_and_test(&job->sub_jobs)) {

		/*
		 * Queue the completion callback to the kcopyd thread.
		 *
		 * Some callers assume that all the completions are called
		 * from a single thread and don't race with each other.
		 *
		 * We must not call the callback directly here because this
		 * code may not be executing in the thread.
		 */
		push(&kc->complete_jobs, job);
		wake(kc);
	}
}

/*
 * Create some sub jobs to share the work between them.
 */
static void split_job(struct kcopyd_job *master_job)
{
	int i;

	atomic_inc(&master_job->kc->nr_jobs);

	atomic_set(&master_job->sub_jobs, SPLIT_COUNT);
	for (i = 0; i < SPLIT_COUNT; i++) {
		master_job[i + 1].master_job = master_job;
		segment_complete(0, 0u, &master_job[i + 1]);
	}
}
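
/*
 * Copy the region described by 'from' to each of the 'num_dests'
 * destination regions and call 'fn' when the whole copy has completed.
 * A NULL 'from' means the destinations are zeroed instead (this is how
 * dm_kcopyd_zero() is implemented).
 */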
void dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
		    unsigned int num_dests, struct dm_io_region *dests,
		    unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
{
	struct kcopyd_job *job;
	int i;

	/*
	 * Allocate an array of jobs consisting of one master job
	 * followed by SPLIT_COUNT sub jobs.
	 */
	job = mempool_alloc(&kc->job_pool, GFP_NOIO);
	mutex_init(&job->lock);

	/*
	 * set up for the read.
	 */
	job->kc = kc;
	job->flags = flags;
	job->read_err = 0;
	job->write_err = 0;

	job->num_dests = num_dests;
	memcpy(&job->dests, dests, sizeof(*dests) * num_dests);

	/*
	 * If one of the destinations is a host-managed zoned block device,
	 * we need to write sequentially. If one of the destinations is a
	 * host-aware device, then leave it to the caller to choose what to do.
	 */
	if (!test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags)) {
		for (i = 0; i < job->num_dests; i++) {
			if (bdev_zoned_model(dests[i].bdev) == BLK_ZONED_HM) {
				set_bit(DM_KCOPYD_WRITE_SEQ, &job->flags);
				break;
			}
		}
	}

	/*
	 * If we need to write sequentially, errors cannot be ignored.
	 */
	if (test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags) &&
	    test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags))
		clear_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags);

	if (from) {
		job->source = *from;
		job->pages = NULL;
		job->rw = READ;
	} else {
		memset(&job->source, 0, sizeof(job->source));
		job->source.count = job->dests[0].count;
		job->pages = &zero_page_list;

		/*
		 * Use WRITE ZEROES to optimize zeroing if all dests support it.
		 */
		job->rw = REQ_OP_WRITE_ZEROES;
		for (i = 0; i < job->num_dests; i++)
			if (!bdev_write_zeroes_sectors(job->dests[i].bdev)) {
				job->rw = WRITE;
				break;
			}
	}

	job->fn = fn;
	job->context = context;
	job->master_job = job;
	job->write_offset = 0;

	if (job->source.count <= SUB_JOB_SIZE)
		dispatch_job(job);
	else {
		job->progress = 0;
		split_job(job);
	}
}
EXPORT_SYMBOL(dm_kcopyd_copy);

void dm_kcopyd_zero(struct dm_kcopyd_client *kc,
		    unsigned num_dests, struct dm_io_region *dests,
		    unsigned flags, dm_kcopyd_notify_fn fn, void *context)
{
	dm_kcopyd_copy(kc, NULL, num_dests, dests, flags, fn, context);
}
EXPORT_SYMBOL(dm_kcopyd_zero);

void *dm_kcopyd_prepare_callback(struct dm_kcopyd_client *kc,
				 dm_kcopyd_notify_fn fn, void *context)
{
	struct kcopyd_job *job;

	job = mempool_alloc(&kc->job_pool, GFP_NOIO);

	memset(job, 0, sizeof(struct kcopyd_job));
	job->kc = kc;
	job->fn = fn;
	job->context = context;
	job->master_job = job;

	atomic_inc(&kc->nr_jobs);

	return job;
}
EXPORT_SYMBOL(dm_kcopyd_prepare_callback);

void dm_kcopyd_do_callback(void *j, int read_err, unsigned long write_err)
{
	struct kcopyd_job *job = j;
	struct dm_kcopyd_client *kc = job->kc;

	job->read_err = read_err;
	job->write_err = write_err;

	push(&kc->complete_jobs, job);
	wake(kc);
}
EXPORT_SYMBOL(dm_kcopyd_do_callback);

/*
 * Cancels a kcopyd job, e.g. someone might be deactivating a
 * mirror.
 */
#if 0
int kcopyd_cancel(struct kcopyd_job *job, int block)
{
	/* FIXME: finish */
	return -1;
}
#endif  /*  0  */

/*-----------------------------------------------------------------
 * Client setup
 *---------------------------------------------------------------*/
struct dm_kcopyd_client *dm_kcopyd_client_create(struct dm_kcopyd_throttle *throttle)
{
	int r;
	struct dm_kcopyd_client *kc;

	kc = kzalloc(sizeof(*kc), GFP_KERNEL);
	if (!kc)
		return ERR_PTR(-ENOMEM);

	spin_lock_init(&kc->job_lock);
	INIT_LIST_HEAD(&kc->complete_jobs);
	INIT_LIST_HEAD(&kc->io_jobs);
	INIT_LIST_HEAD(&kc->pages_jobs);
	kc->throttle = throttle;

	r = mempool_init_slab_pool(&kc->job_pool, MIN_JOBS, _job_cache);
	if (r)
		goto bad_slab;

	INIT_WORK(&kc->kcopyd_work, do_work);
	kc->kcopyd_wq = alloc_workqueue("kcopyd", WQ_MEM_RECLAIM, 0);
	if (!kc->kcopyd_wq) {
		r = -ENOMEM;
		goto bad_workqueue;
	}

	kc->pages = NULL;
	kc->nr_reserved_pages = kc->nr_free_pages = 0;
	r = client_reserve_pages(kc, RESERVE_PAGES);
	if (r)
		goto bad_client_pages;

	kc->io_client = dm_io_client_create();
	if (IS_ERR(kc->io_client)) {
		r = PTR_ERR(kc->io_client);
		goto bad_io_client;
	}

	init_waitqueue_head(&kc->destroyq);
	atomic_set(&kc->nr_jobs, 0);

	return kc;

bad_io_client:
	client_free_pages(kc);
bad_client_pages:
	destroy_workqueue(kc->kcopyd_wq);
bad_workqueue:
	mempool_exit(&kc->job_pool);
bad_slab:
	kfree(kc);

	return ERR_PTR(r);
}
EXPORT_SYMBOL(dm_kcopyd_client_create);

void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
{
	/* Wait for completion of all jobs submitted by this client. */
	wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));

	BUG_ON(!list_empty(&kc->complete_jobs));
	BUG_ON(!list_empty(&kc->io_jobs));
	BUG_ON(!list_empty(&kc->pages_jobs));
	destroy_workqueue(kc->kcopyd_wq);
	dm_io_client_destroy(kc->io_client);
	client_free_pages(kc);
	mempool_exit(&kc->job_pool);
	kfree(kc);
}
EXPORT_SYMBOL(dm_kcopyd_client_destroy);
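
/*
 * Illustrative sketch of the client API exported above (not part of the
 * driver, kept under "#if 0" like kcopyd_cancel() above).  The helpers
 * example_copy() and example_copy_done() are hypothetical, and the block
 * devices are assumed to have already been opened by the caller.  A client
 * is created, one asynchronous copy is issued and waited for synchronously,
 * and the client is torn down again.
 */
#if 0
#include <linux/completion.h>

static void example_copy_done(int read_err, unsigned long write_err,
			      void *context)
{
	/* read_err is a boolean, write_err is a bitset of failed dests */
	complete(context);
}

static int example_copy(struct block_device *src, struct block_device *dst,
			sector_t nr_sectors)
{
	struct dm_kcopyd_client *kc;
	struct dm_io_region from, to;
	DECLARE_COMPLETION_ONSTACK(done);

	kc = dm_kcopyd_client_create(NULL);	/* NULL: no throttling */
	if (IS_ERR(kc))
		return PTR_ERR(kc);

	from.bdev = src;
	from.sector = 0;
	from.count = nr_sectors;

	to.bdev = dst;
	to.sector = 0;
	to.count = nr_sectors;

	dm_kcopyd_copy(kc, &from, 1, &to, 0, example_copy_done, &done);
	wait_for_completion(&done);

	dm_kcopyd_client_destroy(kc);
	return 0;
}
#endif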