1 /* 2 * Copyright (C) 2002 Sistina Software (UK) Limited. 3 * Copyright (C) 2006 Red Hat GmbH 4 * 5 * This file is released under the GPL. 6 * 7 * Kcopyd provides a simple interface for copying an area of one 8 * block-device to one or more other block-devices, with an asynchronous 9 * completion notification. 10 */ 11 12 #include <linux/types.h> 13 #include <linux/atomic.h> 14 #include <linux/blkdev.h> 15 #include <linux/fs.h> 16 #include <linux/init.h> 17 #include <linux/list.h> 18 #include <linux/mempool.h> 19 #include <linux/module.h> 20 #include <linux/pagemap.h> 21 #include <linux/slab.h> 22 #include <linux/vmalloc.h> 23 #include <linux/workqueue.h> 24 #include <linux/mutex.h> 25 #include <linux/delay.h> 26 #include <linux/device-mapper.h> 27 #include <linux/dm-kcopyd.h> 28 29 #include "dm.h" 30 31 #define SUB_JOB_SIZE 128 32 #define SPLIT_COUNT 8 33 #define MIN_JOBS 8 34 #define RESERVE_PAGES (DIV_ROUND_UP(SUB_JOB_SIZE << SECTOR_SHIFT, PAGE_SIZE)) 35 36 /*----------------------------------------------------------------- 37 * Each kcopyd client has its own little pool of preallocated 38 * pages for kcopyd io. 39 *---------------------------------------------------------------*/ 40 struct dm_kcopyd_client { 41 struct page_list *pages; 42 unsigned nr_reserved_pages; 43 unsigned nr_free_pages; 44 45 struct dm_io_client *io_client; 46 47 wait_queue_head_t destroyq; 48 atomic_t nr_jobs; 49 50 mempool_t *job_pool; 51 52 struct workqueue_struct *kcopyd_wq; 53 struct work_struct kcopyd_work; 54 55 struct dm_kcopyd_throttle *throttle; 56 57 /* 58 * We maintain three lists of jobs: 59 * 60 * i) jobs waiting for pages 61 * ii) jobs that have pages, and are waiting for the io to be issued. 62 * iii) jobs that have completed. 63 * 64 * All three of these are protected by job_lock. 65 */ 66 spinlock_t job_lock; 67 struct list_head complete_jobs; 68 struct list_head io_jobs; 69 struct list_head pages_jobs; 70 }; 71 72 static struct page_list zero_page_list; 73 74 static DEFINE_SPINLOCK(throttle_spinlock); 75 76 /* 77 * IO/IDLE accounting slowly decays after (1 << ACCOUNT_INTERVAL_SHIFT) period. 78 * When total_period >= (1 << ACCOUNT_INTERVAL_SHIFT) the counters are divided 79 * by 2. 80 */ 81 #define ACCOUNT_INTERVAL_SHIFT SHIFT_HZ 82 83 /* 84 * Sleep this number of milliseconds. 85 * 86 * The value was decided experimentally. 87 * Smaller values seem to cause an increased copy rate above the limit. 88 * The reason for this is unknown but possibly due to jiffies rounding errors 89 * or read/write cache inside the disk. 90 */ 91 #define SLEEP_MSEC 100 92 93 /* 94 * Maximum number of sleep events. There is a theoretical livelock if more 95 * kcopyd clients do work simultaneously which this limit avoids. 96 */ 97 #define MAX_SLEEPS 10 98 99 static void io_job_start(struct dm_kcopyd_throttle *t) 100 { 101 unsigned throttle, now, difference; 102 int slept = 0, skew; 103 104 if (unlikely(!t)) 105 return; 106 107 try_again: 108 spin_lock_irq(&throttle_spinlock); 109 110 throttle = ACCESS_ONCE(t->throttle); 111 112 if (likely(throttle >= 100)) 113 goto skip_limit; 114 115 now = jiffies; 116 difference = now - t->last_jiffies; 117 t->last_jiffies = now; 118 if (t->num_io_jobs) 119 t->io_period += difference; 120 t->total_period += difference; 121 122 /* 123 * Maintain sane values if we got a temporary overflow. 124 */ 125 if (unlikely(t->io_period > t->total_period)) 126 t->io_period = t->total_period; 127 128 if (unlikely(t->total_period >= (1 << ACCOUNT_INTERVAL_SHIFT))) { 129 int shift = fls(t->total_period >> ACCOUNT_INTERVAL_SHIFT); 130 t->total_period >>= shift; 131 t->io_period >>= shift; 132 } 133 134 skew = t->io_period - throttle * t->total_period / 100; 135 136 if (unlikely(skew > 0) && slept < MAX_SLEEPS) { 137 slept++; 138 spin_unlock_irq(&throttle_spinlock); 139 msleep(SLEEP_MSEC); 140 goto try_again; 141 } 142 143 skip_limit: 144 t->num_io_jobs++; 145 146 spin_unlock_irq(&throttle_spinlock); 147 } 148 149 static void io_job_finish(struct dm_kcopyd_throttle *t) 150 { 151 unsigned long flags; 152 153 if (unlikely(!t)) 154 return; 155 156 spin_lock_irqsave(&throttle_spinlock, flags); 157 158 t->num_io_jobs--; 159 160 if (likely(ACCESS_ONCE(t->throttle) >= 100)) 161 goto skip_limit; 162 163 if (!t->num_io_jobs) { 164 unsigned now, difference; 165 166 now = jiffies; 167 difference = now - t->last_jiffies; 168 t->last_jiffies = now; 169 170 t->io_period += difference; 171 t->total_period += difference; 172 173 /* 174 * Maintain sane values if we got a temporary overflow. 175 */ 176 if (unlikely(t->io_period > t->total_period)) 177 t->io_period = t->total_period; 178 } 179 180 skip_limit: 181 spin_unlock_irqrestore(&throttle_spinlock, flags); 182 } 183 184 185 static void wake(struct dm_kcopyd_client *kc) 186 { 187 queue_work(kc->kcopyd_wq, &kc->kcopyd_work); 188 } 189 190 /* 191 * Obtain one page for the use of kcopyd. 192 */ 193 static struct page_list *alloc_pl(gfp_t gfp) 194 { 195 struct page_list *pl; 196 197 pl = kmalloc(sizeof(*pl), gfp); 198 if (!pl) 199 return NULL; 200 201 pl->page = alloc_page(gfp); 202 if (!pl->page) { 203 kfree(pl); 204 return NULL; 205 } 206 207 return pl; 208 } 209 210 static void free_pl(struct page_list *pl) 211 { 212 __free_page(pl->page); 213 kfree(pl); 214 } 215 216 /* 217 * Add the provided pages to a client's free page list, releasing 218 * back to the system any beyond the reserved_pages limit. 219 */ 220 static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl) 221 { 222 struct page_list *next; 223 224 do { 225 next = pl->next; 226 227 if (kc->nr_free_pages >= kc->nr_reserved_pages) 228 free_pl(pl); 229 else { 230 pl->next = kc->pages; 231 kc->pages = pl; 232 kc->nr_free_pages++; 233 } 234 235 pl = next; 236 } while (pl); 237 } 238 239 static int kcopyd_get_pages(struct dm_kcopyd_client *kc, 240 unsigned int nr, struct page_list **pages) 241 { 242 struct page_list *pl; 243 244 *pages = NULL; 245 246 do { 247 pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY); 248 if (unlikely(!pl)) { 249 /* Use reserved pages */ 250 pl = kc->pages; 251 if (unlikely(!pl)) 252 goto out_of_memory; 253 kc->pages = pl->next; 254 kc->nr_free_pages--; 255 } 256 pl->next = *pages; 257 *pages = pl; 258 } while (--nr); 259 260 return 0; 261 262 out_of_memory: 263 if (*pages) 264 kcopyd_put_pages(kc, *pages); 265 return -ENOMEM; 266 } 267 268 /* 269 * These three functions resize the page pool. 270 */ 271 static void drop_pages(struct page_list *pl) 272 { 273 struct page_list *next; 274 275 while (pl) { 276 next = pl->next; 277 free_pl(pl); 278 pl = next; 279 } 280 } 281 282 /* 283 * Allocate and reserve nr_pages for the use of a specific client. 284 */ 285 static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages) 286 { 287 unsigned i; 288 struct page_list *pl = NULL, *next; 289 290 for (i = 0; i < nr_pages; i++) { 291 next = alloc_pl(GFP_KERNEL); 292 if (!next) { 293 if (pl) 294 drop_pages(pl); 295 return -ENOMEM; 296 } 297 next->next = pl; 298 pl = next; 299 } 300 301 kc->nr_reserved_pages += nr_pages; 302 kcopyd_put_pages(kc, pl); 303 304 return 0; 305 } 306 307 static void client_free_pages(struct dm_kcopyd_client *kc) 308 { 309 BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages); 310 drop_pages(kc->pages); 311 kc->pages = NULL; 312 kc->nr_free_pages = kc->nr_reserved_pages = 0; 313 } 314 315 /*----------------------------------------------------------------- 316 * kcopyd_jobs need to be allocated by the *clients* of kcopyd, 317 * for this reason we use a mempool to prevent the client from 318 * ever having to do io (which could cause a deadlock). 319 *---------------------------------------------------------------*/ 320 struct kcopyd_job { 321 struct dm_kcopyd_client *kc; 322 struct list_head list; 323 unsigned long flags; 324 325 /* 326 * Error state of the job. 327 */ 328 int read_err; 329 unsigned long write_err; 330 331 /* 332 * Either READ or WRITE 333 */ 334 int rw; 335 struct dm_io_region source; 336 337 /* 338 * The destinations for the transfer. 339 */ 340 unsigned int num_dests; 341 struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS]; 342 343 struct page_list *pages; 344 345 /* 346 * Set this to ensure you are notified when the job has 347 * completed. 'context' is for callback to use. 348 */ 349 dm_kcopyd_notify_fn fn; 350 void *context; 351 352 /* 353 * These fields are only used if the job has been split 354 * into more manageable parts. 355 */ 356 struct mutex lock; 357 atomic_t sub_jobs; 358 sector_t progress; 359 360 struct kcopyd_job *master_job; 361 }; 362 363 static struct kmem_cache *_job_cache; 364 365 int __init dm_kcopyd_init(void) 366 { 367 _job_cache = kmem_cache_create("kcopyd_job", 368 sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1), 369 __alignof__(struct kcopyd_job), 0, NULL); 370 if (!_job_cache) 371 return -ENOMEM; 372 373 zero_page_list.next = &zero_page_list; 374 zero_page_list.page = ZERO_PAGE(0); 375 376 return 0; 377 } 378 379 void dm_kcopyd_exit(void) 380 { 381 kmem_cache_destroy(_job_cache); 382 _job_cache = NULL; 383 } 384 385 /* 386 * Functions to push and pop a job onto the head of a given job 387 * list. 388 */ 389 static struct kcopyd_job *pop(struct list_head *jobs, 390 struct dm_kcopyd_client *kc) 391 { 392 struct kcopyd_job *job = NULL; 393 unsigned long flags; 394 395 spin_lock_irqsave(&kc->job_lock, flags); 396 397 if (!list_empty(jobs)) { 398 job = list_entry(jobs->next, struct kcopyd_job, list); 399 list_del(&job->list); 400 } 401 spin_unlock_irqrestore(&kc->job_lock, flags); 402 403 return job; 404 } 405 406 static void push(struct list_head *jobs, struct kcopyd_job *job) 407 { 408 unsigned long flags; 409 struct dm_kcopyd_client *kc = job->kc; 410 411 spin_lock_irqsave(&kc->job_lock, flags); 412 list_add_tail(&job->list, jobs); 413 spin_unlock_irqrestore(&kc->job_lock, flags); 414 } 415 416 417 static void push_head(struct list_head *jobs, struct kcopyd_job *job) 418 { 419 unsigned long flags; 420 struct dm_kcopyd_client *kc = job->kc; 421 422 spin_lock_irqsave(&kc->job_lock, flags); 423 list_add(&job->list, jobs); 424 spin_unlock_irqrestore(&kc->job_lock, flags); 425 } 426 427 /* 428 * These three functions process 1 item from the corresponding 429 * job list. 430 * 431 * They return: 432 * < 0: error 433 * 0: success 434 * > 0: can't process yet. 435 */ 436 static int run_complete_job(struct kcopyd_job *job) 437 { 438 void *context = job->context; 439 int read_err = job->read_err; 440 unsigned long write_err = job->write_err; 441 dm_kcopyd_notify_fn fn = job->fn; 442 struct dm_kcopyd_client *kc = job->kc; 443 444 if (job->pages && job->pages != &zero_page_list) 445 kcopyd_put_pages(kc, job->pages); 446 /* 447 * If this is the master job, the sub jobs have already 448 * completed so we can free everything. 449 */ 450 if (job->master_job == job) 451 mempool_free(job, kc->job_pool); 452 fn(read_err, write_err, context); 453 454 if (atomic_dec_and_test(&kc->nr_jobs)) 455 wake_up(&kc->destroyq); 456 457 return 0; 458 } 459 460 static void complete_io(unsigned long error, void *context) 461 { 462 struct kcopyd_job *job = (struct kcopyd_job *) context; 463 struct dm_kcopyd_client *kc = job->kc; 464 465 io_job_finish(kc->throttle); 466 467 if (error) { 468 if (job->rw & WRITE) 469 job->write_err |= error; 470 else 471 job->read_err = 1; 472 473 if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) { 474 push(&kc->complete_jobs, job); 475 wake(kc); 476 return; 477 } 478 } 479 480 if (job->rw & WRITE) 481 push(&kc->complete_jobs, job); 482 483 else { 484 job->rw = WRITE; 485 push(&kc->io_jobs, job); 486 } 487 488 wake(kc); 489 } 490 491 /* 492 * Request io on as many buffer heads as we can currently get for 493 * a particular job. 494 */ 495 static int run_io_job(struct kcopyd_job *job) 496 { 497 int r; 498 struct dm_io_request io_req = { 499 .bi_rw = job->rw, 500 .mem.type = DM_IO_PAGE_LIST, 501 .mem.ptr.pl = job->pages, 502 .mem.offset = 0, 503 .notify.fn = complete_io, 504 .notify.context = job, 505 .client = job->kc->io_client, 506 }; 507 508 io_job_start(job->kc->throttle); 509 510 if (job->rw == READ) 511 r = dm_io(&io_req, 1, &job->source, NULL); 512 else 513 r = dm_io(&io_req, job->num_dests, job->dests, NULL); 514 515 return r; 516 } 517 518 static int run_pages_job(struct kcopyd_job *job) 519 { 520 int r; 521 unsigned nr_pages = dm_div_up(job->dests[0].count, PAGE_SIZE >> 9); 522 523 r = kcopyd_get_pages(job->kc, nr_pages, &job->pages); 524 if (!r) { 525 /* this job is ready for io */ 526 push(&job->kc->io_jobs, job); 527 return 0; 528 } 529 530 if (r == -ENOMEM) 531 /* can't complete now */ 532 return 1; 533 534 return r; 535 } 536 537 /* 538 * Run through a list for as long as possible. Returns the count 539 * of successful jobs. 540 */ 541 static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc, 542 int (*fn) (struct kcopyd_job *)) 543 { 544 struct kcopyd_job *job; 545 int r, count = 0; 546 547 while ((job = pop(jobs, kc))) { 548 549 r = fn(job); 550 551 if (r < 0) { 552 /* error this rogue job */ 553 if (job->rw & WRITE) 554 job->write_err = (unsigned long) -1L; 555 else 556 job->read_err = 1; 557 push(&kc->complete_jobs, job); 558 break; 559 } 560 561 if (r > 0) { 562 /* 563 * We couldn't service this job ATM, so 564 * push this job back onto the list. 565 */ 566 push_head(jobs, job); 567 break; 568 } 569 570 count++; 571 } 572 573 return count; 574 } 575 576 /* 577 * kcopyd does this every time it's woken up. 578 */ 579 static void do_work(struct work_struct *work) 580 { 581 struct dm_kcopyd_client *kc = container_of(work, 582 struct dm_kcopyd_client, kcopyd_work); 583 struct blk_plug plug; 584 585 /* 586 * The order that these are called is *very* important. 587 * complete jobs can free some pages for pages jobs. 588 * Pages jobs when successful will jump onto the io jobs 589 * list. io jobs call wake when they complete and it all 590 * starts again. 591 */ 592 blk_start_plug(&plug); 593 process_jobs(&kc->complete_jobs, kc, run_complete_job); 594 process_jobs(&kc->pages_jobs, kc, run_pages_job); 595 process_jobs(&kc->io_jobs, kc, run_io_job); 596 blk_finish_plug(&plug); 597 } 598 599 /* 600 * If we are copying a small region we just dispatch a single job 601 * to do the copy, otherwise the io has to be split up into many 602 * jobs. 603 */ 604 static void dispatch_job(struct kcopyd_job *job) 605 { 606 struct dm_kcopyd_client *kc = job->kc; 607 atomic_inc(&kc->nr_jobs); 608 if (unlikely(!job->source.count)) 609 push(&kc->complete_jobs, job); 610 else if (job->pages == &zero_page_list) 611 push(&kc->io_jobs, job); 612 else 613 push(&kc->pages_jobs, job); 614 wake(kc); 615 } 616 617 static void segment_complete(int read_err, unsigned long write_err, 618 void *context) 619 { 620 /* FIXME: tidy this function */ 621 sector_t progress = 0; 622 sector_t count = 0; 623 struct kcopyd_job *sub_job = (struct kcopyd_job *) context; 624 struct kcopyd_job *job = sub_job->master_job; 625 struct dm_kcopyd_client *kc = job->kc; 626 627 mutex_lock(&job->lock); 628 629 /* update the error */ 630 if (read_err) 631 job->read_err = 1; 632 633 if (write_err) 634 job->write_err |= write_err; 635 636 /* 637 * Only dispatch more work if there hasn't been an error. 638 */ 639 if ((!job->read_err && !job->write_err) || 640 test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) { 641 /* get the next chunk of work */ 642 progress = job->progress; 643 count = job->source.count - progress; 644 if (count) { 645 if (count > SUB_JOB_SIZE) 646 count = SUB_JOB_SIZE; 647 648 job->progress += count; 649 } 650 } 651 mutex_unlock(&job->lock); 652 653 if (count) { 654 int i; 655 656 *sub_job = *job; 657 sub_job->source.sector += progress; 658 sub_job->source.count = count; 659 660 for (i = 0; i < job->num_dests; i++) { 661 sub_job->dests[i].sector += progress; 662 sub_job->dests[i].count = count; 663 } 664 665 sub_job->fn = segment_complete; 666 sub_job->context = sub_job; 667 dispatch_job(sub_job); 668 669 } else if (atomic_dec_and_test(&job->sub_jobs)) { 670 671 /* 672 * Queue the completion callback to the kcopyd thread. 673 * 674 * Some callers assume that all the completions are called 675 * from a single thread and don't race with each other. 676 * 677 * We must not call the callback directly here because this 678 * code may not be executing in the thread. 679 */ 680 push(&kc->complete_jobs, job); 681 wake(kc); 682 } 683 } 684 685 /* 686 * Create some sub jobs to share the work between them. 687 */ 688 static void split_job(struct kcopyd_job *master_job) 689 { 690 int i; 691 692 atomic_inc(&master_job->kc->nr_jobs); 693 694 atomic_set(&master_job->sub_jobs, SPLIT_COUNT); 695 for (i = 0; i < SPLIT_COUNT; i++) { 696 master_job[i + 1].master_job = master_job; 697 segment_complete(0, 0u, &master_job[i + 1]); 698 } 699 } 700 701 int dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from, 702 unsigned int num_dests, struct dm_io_region *dests, 703 unsigned int flags, dm_kcopyd_notify_fn fn, void *context) 704 { 705 struct kcopyd_job *job; 706 int i; 707 708 /* 709 * Allocate an array of jobs consisting of one master job 710 * followed by SPLIT_COUNT sub jobs. 711 */ 712 job = mempool_alloc(kc->job_pool, GFP_NOIO); 713 714 /* 715 * set up for the read. 716 */ 717 job->kc = kc; 718 job->flags = flags; 719 job->read_err = 0; 720 job->write_err = 0; 721 722 job->num_dests = num_dests; 723 memcpy(&job->dests, dests, sizeof(*dests) * num_dests); 724 725 if (from) { 726 job->source = *from; 727 job->pages = NULL; 728 job->rw = READ; 729 } else { 730 memset(&job->source, 0, sizeof job->source); 731 job->source.count = job->dests[0].count; 732 job->pages = &zero_page_list; 733 734 /* 735 * Use WRITE SAME to optimize zeroing if all dests support it. 736 */ 737 job->rw = WRITE | REQ_WRITE_SAME; 738 for (i = 0; i < job->num_dests; i++) 739 if (!bdev_write_same(job->dests[i].bdev)) { 740 job->rw = WRITE; 741 break; 742 } 743 } 744 745 job->fn = fn; 746 job->context = context; 747 job->master_job = job; 748 749 if (job->source.count <= SUB_JOB_SIZE) 750 dispatch_job(job); 751 else { 752 mutex_init(&job->lock); 753 job->progress = 0; 754 split_job(job); 755 } 756 757 return 0; 758 } 759 EXPORT_SYMBOL(dm_kcopyd_copy); 760 761 int dm_kcopyd_zero(struct dm_kcopyd_client *kc, 762 unsigned num_dests, struct dm_io_region *dests, 763 unsigned flags, dm_kcopyd_notify_fn fn, void *context) 764 { 765 return dm_kcopyd_copy(kc, NULL, num_dests, dests, flags, fn, context); 766 } 767 EXPORT_SYMBOL(dm_kcopyd_zero); 768 769 void *dm_kcopyd_prepare_callback(struct dm_kcopyd_client *kc, 770 dm_kcopyd_notify_fn fn, void *context) 771 { 772 struct kcopyd_job *job; 773 774 job = mempool_alloc(kc->job_pool, GFP_NOIO); 775 776 memset(job, 0, sizeof(struct kcopyd_job)); 777 job->kc = kc; 778 job->fn = fn; 779 job->context = context; 780 job->master_job = job; 781 782 atomic_inc(&kc->nr_jobs); 783 784 return job; 785 } 786 EXPORT_SYMBOL(dm_kcopyd_prepare_callback); 787 788 void dm_kcopyd_do_callback(void *j, int read_err, unsigned long write_err) 789 { 790 struct kcopyd_job *job = j; 791 struct dm_kcopyd_client *kc = job->kc; 792 793 job->read_err = read_err; 794 job->write_err = write_err; 795 796 push(&kc->complete_jobs, job); 797 wake(kc); 798 } 799 EXPORT_SYMBOL(dm_kcopyd_do_callback); 800 801 /* 802 * Cancels a kcopyd job, eg. someone might be deactivating a 803 * mirror. 804 */ 805 #if 0 806 int kcopyd_cancel(struct kcopyd_job *job, int block) 807 { 808 /* FIXME: finish */ 809 return -1; 810 } 811 #endif /* 0 */ 812 813 /*----------------------------------------------------------------- 814 * Client setup 815 *---------------------------------------------------------------*/ 816 struct dm_kcopyd_client *dm_kcopyd_client_create(struct dm_kcopyd_throttle *throttle) 817 { 818 int r = -ENOMEM; 819 struct dm_kcopyd_client *kc; 820 821 kc = kmalloc(sizeof(*kc), GFP_KERNEL); 822 if (!kc) 823 return ERR_PTR(-ENOMEM); 824 825 spin_lock_init(&kc->job_lock); 826 INIT_LIST_HEAD(&kc->complete_jobs); 827 INIT_LIST_HEAD(&kc->io_jobs); 828 INIT_LIST_HEAD(&kc->pages_jobs); 829 kc->throttle = throttle; 830 831 kc->job_pool = mempool_create_slab_pool(MIN_JOBS, _job_cache); 832 if (!kc->job_pool) 833 goto bad_slab; 834 835 INIT_WORK(&kc->kcopyd_work, do_work); 836 kc->kcopyd_wq = alloc_workqueue("kcopyd", WQ_MEM_RECLAIM, 0); 837 if (!kc->kcopyd_wq) 838 goto bad_workqueue; 839 840 kc->pages = NULL; 841 kc->nr_reserved_pages = kc->nr_free_pages = 0; 842 r = client_reserve_pages(kc, RESERVE_PAGES); 843 if (r) 844 goto bad_client_pages; 845 846 kc->io_client = dm_io_client_create(); 847 if (IS_ERR(kc->io_client)) { 848 r = PTR_ERR(kc->io_client); 849 goto bad_io_client; 850 } 851 852 init_waitqueue_head(&kc->destroyq); 853 atomic_set(&kc->nr_jobs, 0); 854 855 return kc; 856 857 bad_io_client: 858 client_free_pages(kc); 859 bad_client_pages: 860 destroy_workqueue(kc->kcopyd_wq); 861 bad_workqueue: 862 mempool_destroy(kc->job_pool); 863 bad_slab: 864 kfree(kc); 865 866 return ERR_PTR(r); 867 } 868 EXPORT_SYMBOL(dm_kcopyd_client_create); 869 870 void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc) 871 { 872 /* Wait for completion of all jobs submitted by this client. */ 873 wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs)); 874 875 BUG_ON(!list_empty(&kc->complete_jobs)); 876 BUG_ON(!list_empty(&kc->io_jobs)); 877 BUG_ON(!list_empty(&kc->pages_jobs)); 878 destroy_workqueue(kc->kcopyd_wq); 879 dm_io_client_destroy(kc->io_client); 880 client_free_pages(kc); 881 mempool_destroy(kc->job_pool); 882 kfree(kc); 883 } 884 EXPORT_SYMBOL(dm_kcopyd_client_destroy); 885