/*
 * Copyright (C) 2002 Sistina Software (UK) Limited.
 * Copyright (C) 2006 Red Hat GmbH
 *
 * This file is released under the GPL.
 *
 * Kcopyd provides a simple interface for copying an area of one
 * block-device to one or more other block-devices, with an asynchronous
 * completion notification.
 */

#include <linux/types.h>
#include <linux/atomic.h>
#include <linux/blkdev.h>
#include <linux/fs.h>
#include <linux/init.h>
#include <linux/list.h>
#include <linux/mempool.h>
#include <linux/module.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/workqueue.h>
#include <linux/mutex.h>
#include <linux/delay.h>
#include <linux/device-mapper.h>
#include <linux/dm-kcopyd.h>

#include "dm-core.h"

#define SUB_JOB_SIZE	128
#define SPLIT_COUNT	8
#define MIN_JOBS	8
#define RESERVE_PAGES	(DIV_ROUND_UP(SUB_JOB_SIZE << SECTOR_SHIFT, PAGE_SIZE))

/*-----------------------------------------------------------------
 * Each kcopyd client has its own little pool of preallocated
 * pages for kcopyd io.
 *---------------------------------------------------------------*/
struct dm_kcopyd_client {
	struct page_list *pages;
	unsigned nr_reserved_pages;
	unsigned nr_free_pages;

	struct dm_io_client *io_client;

	wait_queue_head_t destroyq;
	atomic_t nr_jobs;

	mempool_t *job_pool;

	struct workqueue_struct *kcopyd_wq;
	struct work_struct kcopyd_work;

	struct dm_kcopyd_throttle *throttle;

	/*
	 * We maintain three lists of jobs:
	 *
	 * i)   jobs waiting for pages
	 * ii)  jobs that have pages, and are waiting for the io to be issued.
	 * iii) jobs that have completed.
	 *
	 * All three of these are protected by job_lock.
	 */
	spinlock_t job_lock;
	struct list_head complete_jobs;
	struct list_head io_jobs;
	struct list_head pages_jobs;
};

static struct page_list zero_page_list;

static DEFINE_SPINLOCK(throttle_spinlock);

/*
 * IO/IDLE accounting slowly decays after (1 << ACCOUNT_INTERVAL_SHIFT) period.
 * When total_period >= (1 << ACCOUNT_INTERVAL_SHIFT) the counters are divided
 * by 2.
 */
#define ACCOUNT_INTERVAL_SHIFT	SHIFT_HZ

/*
 * Sleep this number of milliseconds.
 *
 * The value was decided experimentally.
 * Smaller values seem to cause an increased copy rate above the limit.
 * The reason for this is unknown but possibly due to jiffies rounding errors
 * or read/write cache inside the disk.
 */
#define SLEEP_MSEC	100

/*
 * Maximum number of sleep events. There is a theoretical livelock if more
 * kcopyd clients do work simultaneously which this limit avoids.
 */
#define MAX_SLEEPS	10

static void io_job_start(struct dm_kcopyd_throttle *t)
{
	unsigned throttle, now, difference;
	int slept = 0, skew;

	if (unlikely(!t))
		return;

try_again:
	spin_lock_irq(&throttle_spinlock);

	throttle = READ_ONCE(t->throttle);

	if (likely(throttle >= 100))
		goto skip_limit;

	now = jiffies;
	difference = now - t->last_jiffies;
	t->last_jiffies = now;
	if (t->num_io_jobs)
		t->io_period += difference;
	t->total_period += difference;

	/*
	 * Maintain sane values if we got a temporary overflow.
	 */
	if (unlikely(t->io_period > t->total_period))
		t->io_period = t->total_period;

	if (unlikely(t->total_period >= (1 << ACCOUNT_INTERVAL_SHIFT))) {
		int shift = fls(t->total_period >> ACCOUNT_INTERVAL_SHIFT);
		t->total_period >>= shift;
		t->io_period >>= shift;
	}

	skew = t->io_period - throttle * t->total_period / 100;

	if (unlikely(skew > 0) && slept < MAX_SLEEPS) {
		slept++;
		spin_unlock_irq(&throttle_spinlock);
		msleep(SLEEP_MSEC);
		goto try_again;
	}

skip_limit:
	t->num_io_jobs++;

	spin_unlock_irq(&throttle_spinlock);
}

static void io_job_finish(struct dm_kcopyd_throttle *t)
{
	unsigned long flags;

	if (unlikely(!t))
		return;

	spin_lock_irqsave(&throttle_spinlock, flags);

	t->num_io_jobs--;

	if (likely(READ_ONCE(t->throttle) >= 100))
		goto skip_limit;

	if (!t->num_io_jobs) {
		unsigned now, difference;

		now = jiffies;
		difference = now - t->last_jiffies;
		t->last_jiffies = now;

		t->io_period += difference;
		t->total_period += difference;

		/*
		 * Maintain sane values if we got a temporary overflow.
		 */
		if (unlikely(t->io_period > t->total_period))
			t->io_period = t->total_period;
	}

skip_limit:
	spin_unlock_irqrestore(&throttle_spinlock, flags);
}
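
/*
 * Example of how a client might hook up a throttle (illustrative sketch
 * only; "my_throttle" and "copy_throttle" are made-up names).  The throttle
 * value is a percentage of time kcopyd I/O is allowed to be active; values
 * of 100 or more disable limiting, as the fast path above shows.  A caller
 * could do something like:
 *
 *	static struct dm_kcopyd_throttle my_throttle = { .throttle = 100 };
 *	module_param_named(copy_throttle, my_throttle.throttle, uint, 0644);
 *
 *	kc = dm_kcopyd_client_create(&my_throttle);
 *
 * <linux/dm-kcopyd.h> also provides a helper macro,
 * DECLARE_DM_KCOPYD_THROTTLE_WITH_MODULE_PARM(), wrapping this pattern.
 */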

static void wake(struct dm_kcopyd_client *kc)
{
	queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
}

/*
 * Obtain one page for the use of kcopyd.
 */
static struct page_list *alloc_pl(gfp_t gfp)
{
	struct page_list *pl;

	pl = kmalloc(sizeof(*pl), gfp);
	if (!pl)
		return NULL;

	pl->page = alloc_page(gfp);
	if (!pl->page) {
		kfree(pl);
		return NULL;
	}

	return pl;
}

static void free_pl(struct page_list *pl)
{
	__free_page(pl->page);
	kfree(pl);
}

/*
 * Add the provided pages to a client's free page list, releasing
 * back to the system any beyond the reserved_pages limit.
 */
static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
{
	struct page_list *next;

	do {
		next = pl->next;

		if (kc->nr_free_pages >= kc->nr_reserved_pages)
			free_pl(pl);
		else {
			pl->next = kc->pages;
			kc->pages = pl;
			kc->nr_free_pages++;
		}

		pl = next;
	} while (pl);
}

static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
			    unsigned int nr, struct page_list **pages)
{
	struct page_list *pl;

	*pages = NULL;

	do {
		pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY | __GFP_KSWAPD_RECLAIM);
		if (unlikely(!pl)) {
			/* Use reserved pages */
			pl = kc->pages;
			if (unlikely(!pl))
				goto out_of_memory;
			kc->pages = pl->next;
			kc->nr_free_pages--;
		}
		pl->next = *pages;
		*pages = pl;
	} while (--nr);

	return 0;

out_of_memory:
	if (*pages)
		kcopyd_put_pages(kc, *pages);
	return -ENOMEM;
}

/*
 * These three functions resize the page pool.
 */
static void drop_pages(struct page_list *pl)
{
	struct page_list *next;

	while (pl) {
		next = pl->next;
		free_pl(pl);
		pl = next;
	}
}

/*
 * Allocate and reserve nr_pages for the use of a specific client.
 */
static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages)
{
	unsigned i;
	struct page_list *pl = NULL, *next;

	for (i = 0; i < nr_pages; i++) {
		next = alloc_pl(GFP_KERNEL);
		if (!next) {
			if (pl)
				drop_pages(pl);
			return -ENOMEM;
		}
		next->next = pl;
		pl = next;
	}

	kc->nr_reserved_pages += nr_pages;
	kcopyd_put_pages(kc, pl);

	return 0;
}

static void client_free_pages(struct dm_kcopyd_client *kc)
{
	BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages);
	drop_pages(kc->pages);
	kc->pages = NULL;
	kc->nr_free_pages = kc->nr_reserved_pages = 0;
}
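
/*
 * A worked example of the reserve size (assuming a 4 KiB PAGE_SIZE):
 * dm_kcopyd_client_create() calls client_reserve_pages(kc, RESERVE_PAGES),
 * i.e. DIV_ROUND_UP(128 << 9, 4096) = 16 pages, enough to back one
 * SUB_JOB_SIZE (64 KiB) sub job.  kcopyd_get_pages() only falls back to
 * this reserve when normal page allocation fails, so it is a guaranteed
 * minimum rather than a cap.
 */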

/*-----------------------------------------------------------------
 * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
 * for this reason we use a mempool to prevent the client from
 * ever having to do io (which could cause a deadlock).
 *---------------------------------------------------------------*/
struct kcopyd_job {
	struct dm_kcopyd_client *kc;
	struct list_head list;
	unsigned long flags;

	/*
	 * Error state of the job.
	 */
	int read_err;
	unsigned long write_err;

	/*
	 * Either READ or WRITE
	 */
	int rw;
	struct dm_io_region source;

	/*
	 * The destinations for the transfer.
	 */
	unsigned int num_dests;
	struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];

	struct page_list *pages;

	/*
	 * Set this to ensure you are notified when the job has
	 * completed.  'context' is for callback to use.
	 */
	dm_kcopyd_notify_fn fn;
	void *context;

	/*
	 * These fields are only used if the job has been split
	 * into more manageable parts.
	 */
	struct mutex lock;
	atomic_t sub_jobs;
	sector_t progress;
	sector_t write_offset;

	struct kcopyd_job *master_job;
};

static struct kmem_cache *_job_cache;

int __init dm_kcopyd_init(void)
{
	_job_cache = kmem_cache_create("kcopyd_job",
				sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1),
				__alignof__(struct kcopyd_job), 0, NULL);
	if (!_job_cache)
		return -ENOMEM;

	zero_page_list.next = &zero_page_list;
	zero_page_list.page = ZERO_PAGE(0);

	return 0;
}

void dm_kcopyd_exit(void)
{
	kmem_cache_destroy(_job_cache);
	_job_cache = NULL;
}

/*
 * Functions to push and pop a job onto the head of a given job
 * list.
 */
static struct kcopyd_job *pop_io_job(struct list_head *jobs,
				     struct dm_kcopyd_client *kc)
{
	struct kcopyd_job *job;

	/*
	 * For I/O jobs, pop any read, any write without sequential write
	 * constraint and sequential writes that are at the right position.
	 */
	list_for_each_entry(job, jobs, list) {
		if (job->rw == READ || !test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags)) {
			list_del(&job->list);
			return job;
		}

		if (job->write_offset == job->master_job->write_offset) {
			job->master_job->write_offset += job->source.count;
			list_del(&job->list);
			return job;
		}
	}

	return NULL;
}

static struct kcopyd_job *pop(struct list_head *jobs,
			      struct dm_kcopyd_client *kc)
{
	struct kcopyd_job *job = NULL;
	unsigned long flags;

	spin_lock_irqsave(&kc->job_lock, flags);

	if (!list_empty(jobs)) {
		if (jobs == &kc->io_jobs)
			job = pop_io_job(jobs, kc);
		else {
			job = list_entry(jobs->next, struct kcopyd_job, list);
			list_del(&job->list);
		}
	}
	spin_unlock_irqrestore(&kc->job_lock, flags);

	return job;
}

static void push(struct list_head *jobs, struct kcopyd_job *job)
{
	unsigned long flags;
	struct dm_kcopyd_client *kc = job->kc;

	spin_lock_irqsave(&kc->job_lock, flags);
	list_add_tail(&job->list, jobs);
	spin_unlock_irqrestore(&kc->job_lock, flags);
}


static void push_head(struct list_head *jobs, struct kcopyd_job *job)
{
	unsigned long flags;
	struct dm_kcopyd_client *kc = job->kc;

	spin_lock_irqsave(&kc->job_lock, flags);
	list_add(&job->list, jobs);
	spin_unlock_irqrestore(&kc->job_lock, flags);
}

/*
 * These three functions process 1 item from the corresponding
 * job list.
 *
 * They return:
 * < 0: error
 *   0: success
 * > 0: can't process yet.
 */
static int run_complete_job(struct kcopyd_job *job)
{
	void *context = job->context;
	int read_err = job->read_err;
	unsigned long write_err = job->write_err;
	dm_kcopyd_notify_fn fn = job->fn;
	struct dm_kcopyd_client *kc = job->kc;

	if (job->pages && job->pages != &zero_page_list)
		kcopyd_put_pages(kc, job->pages);
	/*
	 * If this is the master job, the sub jobs have already
	 * completed so we can free everything.
	 */
	if (job->master_job == job) {
		mutex_destroy(&job->lock);
		mempool_free(job, kc->job_pool);
	}
	fn(read_err, write_err, context);

	if (atomic_dec_and_test(&kc->nr_jobs))
		wake_up(&kc->destroyq);

	return 0;
}

static void complete_io(unsigned long error, void *context)
{
	struct kcopyd_job *job = (struct kcopyd_job *) context;
	struct dm_kcopyd_client *kc = job->kc;

	io_job_finish(kc->throttle);

	if (error) {
		if (op_is_write(job->rw))
			job->write_err |= error;
		else
			job->read_err = 1;

		if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
			push(&kc->complete_jobs, job);
			wake(kc);
			return;
		}
	}

	if (op_is_write(job->rw))
		push(&kc->complete_jobs, job);

	else {
		job->rw = WRITE;
		push(&kc->io_jobs, job);
	}

	wake(kc);
}

/*
 * Request io on as many buffer heads as we can currently get for
 * a particular job.
 */
static int run_io_job(struct kcopyd_job *job)
{
	int r;
	struct dm_io_request io_req = {
		.bi_op = job->rw,
		.bi_op_flags = 0,
		.mem.type = DM_IO_PAGE_LIST,
		.mem.ptr.pl = job->pages,
		.mem.offset = 0,
		.notify.fn = complete_io,
		.notify.context = job,
		.client = job->kc->io_client,
	};

	/*
	 * If we need to write sequentially and some reads or writes failed,
	 * no point in continuing.
	 */
	if (test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags) &&
	    job->master_job->write_err)
		return -EIO;

	io_job_start(job->kc->throttle);

	if (job->rw == READ)
		r = dm_io(&io_req, 1, &job->source, NULL);
	else
		r = dm_io(&io_req, job->num_dests, job->dests, NULL);

	return r;
}

static int run_pages_job(struct kcopyd_job *job)
{
	int r;
	unsigned nr_pages = dm_div_up(job->dests[0].count, PAGE_SIZE >> 9);

	r = kcopyd_get_pages(job->kc, nr_pages, &job->pages);
	if (!r) {
		/* this job is ready for io */
		push(&job->kc->io_jobs, job);
		return 0;
	}

	if (r == -ENOMEM)
		/* can't complete now */
		return 1;

	return r;
}

/*
 * Run through a list for as long as possible.  Returns the count
 * of successful jobs.
 */
static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
			int (*fn) (struct kcopyd_job *))
{
	struct kcopyd_job *job;
	int r, count = 0;

	while ((job = pop(jobs, kc))) {

		r = fn(job);

		if (r < 0) {
			/* error this rogue job */
			if (op_is_write(job->rw))
				job->write_err = (unsigned long) -1L;
			else
				job->read_err = 1;
			push(&kc->complete_jobs, job);
			break;
		}

		if (r > 0) {
			/*
			 * We couldn't service this job ATM, so
			 * push this job back onto the list.
			 */
			push_head(jobs, job);
			break;
		}

		count++;
	}

	return count;
}

/*
 * kcopyd does this every time it's woken up.
 */
static void do_work(struct work_struct *work)
{
	struct dm_kcopyd_client *kc = container_of(work,
					struct dm_kcopyd_client, kcopyd_work);
	struct blk_plug plug;

	/*
	 * The order that these are called is *very* important.
	 * complete jobs can free some pages for pages jobs.
	 * Pages jobs when successful will jump onto the io jobs
	 * list.  io jobs call wake when they complete and it all
	 * starts again.
	 */
	blk_start_plug(&plug);
	process_jobs(&kc->complete_jobs, kc, run_complete_job);
	process_jobs(&kc->pages_jobs, kc, run_pages_job);
	process_jobs(&kc->io_jobs, kc, run_io_job);
	blk_finish_plug(&plug);
}

/*
 * If we are copying a small region we just dispatch a single job
 * to do the copy, otherwise the io has to be split up into many
 * jobs.
 */
static void dispatch_job(struct kcopyd_job *job)
{
	struct dm_kcopyd_client *kc = job->kc;
	atomic_inc(&kc->nr_jobs);
	if (unlikely(!job->source.count))
		push(&kc->complete_jobs, job);
	else if (job->pages == &zero_page_list)
		push(&kc->io_jobs, job);
	else
		push(&kc->pages_jobs, job);
	wake(kc);
}

static void segment_complete(int read_err, unsigned long write_err,
			     void *context)
{
	/* FIXME: tidy this function */
	sector_t progress = 0;
	sector_t count = 0;
	struct kcopyd_job *sub_job = (struct kcopyd_job *) context;
	struct kcopyd_job *job = sub_job->master_job;
	struct dm_kcopyd_client *kc = job->kc;

	mutex_lock(&job->lock);

	/* update the error */
	if (read_err)
		job->read_err = 1;

	if (write_err)
		job->write_err |= write_err;

	/*
	 * Only dispatch more work if there hasn't been an error.
	 */
	if ((!job->read_err && !job->write_err) ||
	    test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
		/* get the next chunk of work */
		progress = job->progress;
		count = job->source.count - progress;
		if (count) {
			if (count > SUB_JOB_SIZE)
				count = SUB_JOB_SIZE;

			job->progress += count;
		}
	}
	mutex_unlock(&job->lock);

	if (count) {
		int i;

		*sub_job = *job;
		sub_job->write_offset = progress;
		sub_job->source.sector += progress;
		sub_job->source.count = count;

		for (i = 0; i < job->num_dests; i++) {
			sub_job->dests[i].sector += progress;
			sub_job->dests[i].count = count;
		}

		sub_job->fn = segment_complete;
		sub_job->context = sub_job;
		dispatch_job(sub_job);

	} else if (atomic_dec_and_test(&job->sub_jobs)) {

		/*
		 * Queue the completion callback to the kcopyd thread.
		 *
		 * Some callers assume that all the completions are called
		 * from a single thread and don't race with each other.
		 *
		 * We must not call the callback directly here because this
		 * code may not be executing in the thread.
		 */
		push(&kc->complete_jobs, job);
		wake(kc);
	}
}

/*
 * Create some sub jobs to share the work between them.
 */
static void split_job(struct kcopyd_job *master_job)
{
	int i;

	atomic_inc(&master_job->kc->nr_jobs);

	atomic_set(&master_job->sub_jobs, SPLIT_COUNT);
	for (i = 0; i < SPLIT_COUNT; i++) {
		master_job[i + 1].master_job = master_job;
		segment_complete(0, 0u, &master_job[i + 1]);
	}
}
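
/*
 * Illustration of the split: a copy larger than SUB_JOB_SIZE (128 sectors,
 * i.e. 64 KiB) is driven by the master job plus SPLIT_COUNT (8) sub jobs
 * allocated in the same mempool slot (the slab object is sized for
 * SPLIT_COUNT + 1 jobs).  Each sub job copies one 64 KiB chunk and, on
 * completion, segment_complete() re-dispatches it for the next chunk, so at
 * most 8 chunks are in flight at once.  For example, a 1 MiB region
 * (2048 sectors) is processed as 16 chunks of 128 sectors, never more than
 * 8 of them outstanding at any time.
 */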

int dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
		   unsigned int num_dests, struct dm_io_region *dests,
		   unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
{
	struct kcopyd_job *job;
	int i;

	/*
	 * Allocate an array of jobs consisting of one master job
	 * followed by SPLIT_COUNT sub jobs.
	 */
	job = mempool_alloc(kc->job_pool, GFP_NOIO);
	mutex_init(&job->lock);

	/*
	 * set up for the read.
	 */
	job->kc = kc;
	job->flags = flags;
	job->read_err = 0;
	job->write_err = 0;

	job->num_dests = num_dests;
	memcpy(&job->dests, dests, sizeof(*dests) * num_dests);

	/*
	 * If one of the destinations is a host-managed zoned block device,
	 * we need to write sequentially. If one of the destinations is a
	 * host-aware device, then leave it to the caller to choose what to do.
	 */
	if (!test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags)) {
		for (i = 0; i < job->num_dests; i++) {
			if (bdev_zoned_model(dests[i].bdev) == BLK_ZONED_HM) {
				set_bit(DM_KCOPYD_WRITE_SEQ, &job->flags);
				break;
			}
		}
	}

	/*
	 * If we need to write sequentially, errors cannot be ignored.
	 */
	if (test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags) &&
	    test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags))
		clear_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags);

	if (from) {
		job->source = *from;
		job->pages = NULL;
		job->rw = READ;
	} else {
		memset(&job->source, 0, sizeof job->source);
		job->source.count = job->dests[0].count;
		job->pages = &zero_page_list;

		/*
		 * Use WRITE ZEROES to optimize zeroing if all dests support it.
		 */
		job->rw = REQ_OP_WRITE_ZEROES;
		for (i = 0; i < job->num_dests; i++)
			if (!bdev_write_zeroes_sectors(job->dests[i].bdev)) {
				job->rw = WRITE;
				break;
			}
	}

	job->fn = fn;
	job->context = context;
	job->master_job = job;
	job->write_offset = 0;

	if (job->source.count <= SUB_JOB_SIZE)
		dispatch_job(job);
	else {
		job->progress = 0;
		split_job(job);
	}

	return 0;
}
EXPORT_SYMBOL(dm_kcopyd_copy);

int dm_kcopyd_zero(struct dm_kcopyd_client *kc,
		   unsigned num_dests, struct dm_io_region *dests,
		   unsigned flags, dm_kcopyd_notify_fn fn, void *context)
{
	return dm_kcopyd_copy(kc, NULL, num_dests, dests, flags, fn, context);
}
EXPORT_SYMBOL(dm_kcopyd_zero);
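
/*
 * Example usage (illustrative sketch only; the callback and variable names
 * are arbitrary).  A caller builds dm_io_region descriptors for the source
 * and destination(s) and is notified of completion through its
 * dm_kcopyd_notify_fn:
 *
 *	static void copy_done(int read_err, unsigned long write_err,
 *			      void *context)
 *	{
 *		// read_err is a boolean, write_err is a bitmask of failed
 *		// destinations; both zero means the copy succeeded.
 *	}
 *
 *	struct dm_io_region from = {
 *		.bdev = src_bdev, .sector = 0, .count = 1024,
 *	};
 *	struct dm_io_region to = {
 *		.bdev = dst_bdev, .sector = 0, .count = 1024,
 *	};
 *
 *	dm_kcopyd_copy(kc, &from, 1, &to, 0, copy_done, my_context);
 *
 * Passing a NULL source (or calling dm_kcopyd_zero()) zeroes the
 * destinations instead of copying.
 */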

void *dm_kcopyd_prepare_callback(struct dm_kcopyd_client *kc,
				 dm_kcopyd_notify_fn fn, void *context)
{
	struct kcopyd_job *job;

	job = mempool_alloc(kc->job_pool, GFP_NOIO);

	memset(job, 0, sizeof(struct kcopyd_job));
	job->kc = kc;
	job->fn = fn;
	job->context = context;
	job->master_job = job;

	atomic_inc(&kc->nr_jobs);

	return job;
}
EXPORT_SYMBOL(dm_kcopyd_prepare_callback);

void dm_kcopyd_do_callback(void *j, int read_err, unsigned long write_err)
{
	struct kcopyd_job *job = j;
	struct dm_kcopyd_client *kc = job->kc;

	job->read_err = read_err;
	job->write_err = write_err;

	push(&kc->complete_jobs, job);
	wake(kc);
}
EXPORT_SYMBOL(dm_kcopyd_do_callback);

/*
 * Cancels a kcopyd job, e.g. someone might be deactivating a
 * mirror.
 */
#if 0
int kcopyd_cancel(struct kcopyd_job *job, int block)
{
	/* FIXME: finish */
	return -1;
}
#endif /* 0 */

/*-----------------------------------------------------------------
 * Client setup
 *---------------------------------------------------------------*/
struct dm_kcopyd_client *dm_kcopyd_client_create(struct dm_kcopyd_throttle *throttle)
{
	int r = -ENOMEM;
	struct dm_kcopyd_client *kc;

	kc = kmalloc(sizeof(*kc), GFP_KERNEL);
	if (!kc)
		return ERR_PTR(-ENOMEM);

	spin_lock_init(&kc->job_lock);
	INIT_LIST_HEAD(&kc->complete_jobs);
	INIT_LIST_HEAD(&kc->io_jobs);
	INIT_LIST_HEAD(&kc->pages_jobs);
	kc->throttle = throttle;

	kc->job_pool = mempool_create_slab_pool(MIN_JOBS, _job_cache);
	if (!kc->job_pool)
		goto bad_slab;

	INIT_WORK(&kc->kcopyd_work, do_work);
	kc->kcopyd_wq = alloc_workqueue("kcopyd", WQ_MEM_RECLAIM, 0);
	if (!kc->kcopyd_wq)
		goto bad_workqueue;

	kc->pages = NULL;
	kc->nr_reserved_pages = kc->nr_free_pages = 0;
	r = client_reserve_pages(kc, RESERVE_PAGES);
	if (r)
		goto bad_client_pages;

	kc->io_client = dm_io_client_create();
	if (IS_ERR(kc->io_client)) {
		r = PTR_ERR(kc->io_client);
		goto bad_io_client;
	}

	init_waitqueue_head(&kc->destroyq);
	atomic_set(&kc->nr_jobs, 0);

	return kc;

bad_io_client:
	client_free_pages(kc);
bad_client_pages:
	destroy_workqueue(kc->kcopyd_wq);
bad_workqueue:
	mempool_destroy(kc->job_pool);
bad_slab:
	kfree(kc);

	return ERR_PTR(r);
}
EXPORT_SYMBOL(dm_kcopyd_client_create);

void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
{
	/* Wait for completion of all jobs submitted by this client. */
	wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));

	BUG_ON(!list_empty(&kc->complete_jobs));
	BUG_ON(!list_empty(&kc->io_jobs));
	BUG_ON(!list_empty(&kc->pages_jobs));
	destroy_workqueue(kc->kcopyd_wq);
	dm_io_client_destroy(kc->io_client);
	client_free_pages(kc);
	mempool_destroy(kc->job_pool);
	kfree(kc);
}
EXPORT_SYMBOL(dm_kcopyd_client_destroy);
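
/*
 * Example of the deferred-callback helpers above (illustrative sketch only;
 * names are arbitrary): a caller that wants its notify function to run from
 * the kcopyd thread, like regular job completions, can allocate the callback
 * up front and fire it later:
 *
 *	void *cb = dm_kcopyd_prepare_callback(kc, copy_done, my_context);
 *	...
 *	dm_kcopyd_do_callback(cb, 0, 0);	// queues copy_done() on the
 *						// client's workqueue
 *
 * dm_kcopyd_client_destroy() waits for such callbacks, and all other
 * outstanding jobs, before tearing the client down.
 */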