1 /* 2 * Copyright (C) 2002 Sistina Software (UK) Limited. 3 * Copyright (C) 2006 Red Hat GmbH 4 * 5 * This file is released under the GPL. 6 * 7 * Kcopyd provides a simple interface for copying an area of one 8 * block-device to one or more other block-devices, with an asynchronous 9 * completion notification. 10 */ 11 12 #include <linux/types.h> 13 #include <linux/atomic.h> 14 #include <linux/blkdev.h> 15 #include <linux/fs.h> 16 #include <linux/init.h> 17 #include <linux/list.h> 18 #include <linux/mempool.h> 19 #include <linux/module.h> 20 #include <linux/pagemap.h> 21 #include <linux/slab.h> 22 #include <linux/vmalloc.h> 23 #include <linux/workqueue.h> 24 #include <linux/mutex.h> 25 #include <linux/delay.h> 26 #include <linux/device-mapper.h> 27 #include <linux/dm-kcopyd.h> 28 29 #include "dm-core.h" 30 31 #define SUB_JOB_SIZE 128 32 #define SPLIT_COUNT 8 33 #define MIN_JOBS 8 34 #define RESERVE_PAGES (DIV_ROUND_UP(SUB_JOB_SIZE << SECTOR_SHIFT, PAGE_SIZE)) 35 36 /*----------------------------------------------------------------- 37 * Each kcopyd client has its own little pool of preallocated 38 * pages for kcopyd io. 39 *---------------------------------------------------------------*/ 40 struct dm_kcopyd_client { 41 struct page_list *pages; 42 unsigned nr_reserved_pages; 43 unsigned nr_free_pages; 44 45 struct dm_io_client *io_client; 46 47 wait_queue_head_t destroyq; 48 atomic_t nr_jobs; 49 50 mempool_t *job_pool; 51 52 struct workqueue_struct *kcopyd_wq; 53 struct work_struct kcopyd_work; 54 55 struct dm_kcopyd_throttle *throttle; 56 57 /* 58 * We maintain three lists of jobs: 59 * 60 * i) jobs waiting for pages 61 * ii) jobs that have pages, and are waiting for the io to be issued. 62 * iii) jobs that have completed. 63 * 64 * All three of these are protected by job_lock. 65 */ 66 spinlock_t job_lock; 67 struct list_head complete_jobs; 68 struct list_head io_jobs; 69 struct list_head pages_jobs; 70 }; 71 72 static struct page_list zero_page_list; 73 74 static DEFINE_SPINLOCK(throttle_spinlock); 75 76 /* 77 * IO/IDLE accounting slowly decays after (1 << ACCOUNT_INTERVAL_SHIFT) period. 78 * When total_period >= (1 << ACCOUNT_INTERVAL_SHIFT) the counters are divided 79 * by 2. 80 */ 81 #define ACCOUNT_INTERVAL_SHIFT SHIFT_HZ 82 83 /* 84 * Sleep this number of milliseconds. 85 * 86 * The value was decided experimentally. 87 * Smaller values seem to cause an increased copy rate above the limit. 88 * The reason for this is unknown but possibly due to jiffies rounding errors 89 * or read/write cache inside the disk. 90 */ 91 #define SLEEP_MSEC 100 92 93 /* 94 * Maximum number of sleep events. There is a theoretical livelock if more 95 * kcopyd clients do work simultaneously which this limit avoids. 96 */ 97 #define MAX_SLEEPS 10 98 99 static void io_job_start(struct dm_kcopyd_throttle *t) 100 { 101 unsigned throttle, now, difference; 102 int slept = 0, skew; 103 104 if (unlikely(!t)) 105 return; 106 107 try_again: 108 spin_lock_irq(&throttle_spinlock); 109 110 throttle = READ_ONCE(t->throttle); 111 112 if (likely(throttle >= 100)) 113 goto skip_limit; 114 115 now = jiffies; 116 difference = now - t->last_jiffies; 117 t->last_jiffies = now; 118 if (t->num_io_jobs) 119 t->io_period += difference; 120 t->total_period += difference; 121 122 /* 123 * Maintain sane values if we got a temporary overflow. 124 */ 125 if (unlikely(t->io_period > t->total_period)) 126 t->io_period = t->total_period; 127 128 if (unlikely(t->total_period >= (1 << ACCOUNT_INTERVAL_SHIFT))) { 129 int shift = fls(t->total_period >> ACCOUNT_INTERVAL_SHIFT); 130 t->total_period >>= shift; 131 t->io_period >>= shift; 132 } 133 134 skew = t->io_period - throttle * t->total_period / 100; 135 136 if (unlikely(skew > 0) && slept < MAX_SLEEPS) { 137 slept++; 138 spin_unlock_irq(&throttle_spinlock); 139 msleep(SLEEP_MSEC); 140 goto try_again; 141 } 142 143 skip_limit: 144 t->num_io_jobs++; 145 146 spin_unlock_irq(&throttle_spinlock); 147 } 148 149 static void io_job_finish(struct dm_kcopyd_throttle *t) 150 { 151 unsigned long flags; 152 153 if (unlikely(!t)) 154 return; 155 156 spin_lock_irqsave(&throttle_spinlock, flags); 157 158 t->num_io_jobs--; 159 160 if (likely(READ_ONCE(t->throttle) >= 100)) 161 goto skip_limit; 162 163 if (!t->num_io_jobs) { 164 unsigned now, difference; 165 166 now = jiffies; 167 difference = now - t->last_jiffies; 168 t->last_jiffies = now; 169 170 t->io_period += difference; 171 t->total_period += difference; 172 173 /* 174 * Maintain sane values if we got a temporary overflow. 175 */ 176 if (unlikely(t->io_period > t->total_period)) 177 t->io_period = t->total_period; 178 } 179 180 skip_limit: 181 spin_unlock_irqrestore(&throttle_spinlock, flags); 182 } 183 184 185 static void wake(struct dm_kcopyd_client *kc) 186 { 187 queue_work(kc->kcopyd_wq, &kc->kcopyd_work); 188 } 189 190 /* 191 * Obtain one page for the use of kcopyd. 192 */ 193 static struct page_list *alloc_pl(gfp_t gfp) 194 { 195 struct page_list *pl; 196 197 pl = kmalloc(sizeof(*pl), gfp); 198 if (!pl) 199 return NULL; 200 201 pl->page = alloc_page(gfp); 202 if (!pl->page) { 203 kfree(pl); 204 return NULL; 205 } 206 207 return pl; 208 } 209 210 static void free_pl(struct page_list *pl) 211 { 212 __free_page(pl->page); 213 kfree(pl); 214 } 215 216 /* 217 * Add the provided pages to a client's free page list, releasing 218 * back to the system any beyond the reserved_pages limit. 219 */ 220 static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl) 221 { 222 struct page_list *next; 223 224 do { 225 next = pl->next; 226 227 if (kc->nr_free_pages >= kc->nr_reserved_pages) 228 free_pl(pl); 229 else { 230 pl->next = kc->pages; 231 kc->pages = pl; 232 kc->nr_free_pages++; 233 } 234 235 pl = next; 236 } while (pl); 237 } 238 239 static int kcopyd_get_pages(struct dm_kcopyd_client *kc, 240 unsigned int nr, struct page_list **pages) 241 { 242 struct page_list *pl; 243 244 *pages = NULL; 245 246 do { 247 pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY | __GFP_KSWAPD_RECLAIM); 248 if (unlikely(!pl)) { 249 /* Use reserved pages */ 250 pl = kc->pages; 251 if (unlikely(!pl)) 252 goto out_of_memory; 253 kc->pages = pl->next; 254 kc->nr_free_pages--; 255 } 256 pl->next = *pages; 257 *pages = pl; 258 } while (--nr); 259 260 return 0; 261 262 out_of_memory: 263 if (*pages) 264 kcopyd_put_pages(kc, *pages); 265 return -ENOMEM; 266 } 267 268 /* 269 * These three functions resize the page pool. 270 */ 271 static void drop_pages(struct page_list *pl) 272 { 273 struct page_list *next; 274 275 while (pl) { 276 next = pl->next; 277 free_pl(pl); 278 pl = next; 279 } 280 } 281 282 /* 283 * Allocate and reserve nr_pages for the use of a specific client. 284 */ 285 static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages) 286 { 287 unsigned i; 288 struct page_list *pl = NULL, *next; 289 290 for (i = 0; i < nr_pages; i++) { 291 next = alloc_pl(GFP_KERNEL); 292 if (!next) { 293 if (pl) 294 drop_pages(pl); 295 return -ENOMEM; 296 } 297 next->next = pl; 298 pl = next; 299 } 300 301 kc->nr_reserved_pages += nr_pages; 302 kcopyd_put_pages(kc, pl); 303 304 return 0; 305 } 306 307 static void client_free_pages(struct dm_kcopyd_client *kc) 308 { 309 BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages); 310 drop_pages(kc->pages); 311 kc->pages = NULL; 312 kc->nr_free_pages = kc->nr_reserved_pages = 0; 313 } 314 315 /*----------------------------------------------------------------- 316 * kcopyd_jobs need to be allocated by the *clients* of kcopyd, 317 * for this reason we use a mempool to prevent the client from 318 * ever having to do io (which could cause a deadlock). 319 *---------------------------------------------------------------*/ 320 struct kcopyd_job { 321 struct dm_kcopyd_client *kc; 322 struct list_head list; 323 unsigned long flags; 324 325 /* 326 * Error state of the job. 327 */ 328 int read_err; 329 unsigned long write_err; 330 331 /* 332 * Either READ or WRITE 333 */ 334 int rw; 335 struct dm_io_region source; 336 337 /* 338 * The destinations for the transfer. 339 */ 340 unsigned int num_dests; 341 struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS]; 342 343 struct page_list *pages; 344 345 /* 346 * Set this to ensure you are notified when the job has 347 * completed. 'context' is for callback to use. 348 */ 349 dm_kcopyd_notify_fn fn; 350 void *context; 351 352 /* 353 * These fields are only used if the job has been split 354 * into more manageable parts. 355 */ 356 struct mutex lock; 357 atomic_t sub_jobs; 358 sector_t progress; 359 sector_t write_offset; 360 361 struct kcopyd_job *master_job; 362 }; 363 364 static struct kmem_cache *_job_cache; 365 366 int __init dm_kcopyd_init(void) 367 { 368 _job_cache = kmem_cache_create("kcopyd_job", 369 sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1), 370 __alignof__(struct kcopyd_job), 0, NULL); 371 if (!_job_cache) 372 return -ENOMEM; 373 374 zero_page_list.next = &zero_page_list; 375 zero_page_list.page = ZERO_PAGE(0); 376 377 return 0; 378 } 379 380 void dm_kcopyd_exit(void) 381 { 382 kmem_cache_destroy(_job_cache); 383 _job_cache = NULL; 384 } 385 386 /* 387 * Functions to push and pop a job onto the head of a given job 388 * list. 389 */ 390 static struct kcopyd_job *pop_io_job(struct list_head *jobs, 391 struct dm_kcopyd_client *kc) 392 { 393 struct kcopyd_job *job; 394 395 /* 396 * For I/O jobs, pop any read, any write without sequential write 397 * constraint and sequential writes that are at the right position. 398 */ 399 list_for_each_entry(job, jobs, list) { 400 if (job->rw == READ || !test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags)) { 401 list_del(&job->list); 402 return job; 403 } 404 405 if (job->write_offset == job->master_job->write_offset) { 406 job->master_job->write_offset += job->source.count; 407 list_del(&job->list); 408 return job; 409 } 410 } 411 412 return NULL; 413 } 414 415 static struct kcopyd_job *pop(struct list_head *jobs, 416 struct dm_kcopyd_client *kc) 417 { 418 struct kcopyd_job *job = NULL; 419 unsigned long flags; 420 421 spin_lock_irqsave(&kc->job_lock, flags); 422 423 if (!list_empty(jobs)) { 424 if (jobs == &kc->io_jobs) 425 job = pop_io_job(jobs, kc); 426 else { 427 job = list_entry(jobs->next, struct kcopyd_job, list); 428 list_del(&job->list); 429 } 430 } 431 spin_unlock_irqrestore(&kc->job_lock, flags); 432 433 return job; 434 } 435 436 static void push(struct list_head *jobs, struct kcopyd_job *job) 437 { 438 unsigned long flags; 439 struct dm_kcopyd_client *kc = job->kc; 440 441 spin_lock_irqsave(&kc->job_lock, flags); 442 list_add_tail(&job->list, jobs); 443 spin_unlock_irqrestore(&kc->job_lock, flags); 444 } 445 446 447 static void push_head(struct list_head *jobs, struct kcopyd_job *job) 448 { 449 unsigned long flags; 450 struct dm_kcopyd_client *kc = job->kc; 451 452 spin_lock_irqsave(&kc->job_lock, flags); 453 list_add(&job->list, jobs); 454 spin_unlock_irqrestore(&kc->job_lock, flags); 455 } 456 457 /* 458 * These three functions process 1 item from the corresponding 459 * job list. 460 * 461 * They return: 462 * < 0: error 463 * 0: success 464 * > 0: can't process yet. 465 */ 466 static int run_complete_job(struct kcopyd_job *job) 467 { 468 void *context = job->context; 469 int read_err = job->read_err; 470 unsigned long write_err = job->write_err; 471 dm_kcopyd_notify_fn fn = job->fn; 472 struct dm_kcopyd_client *kc = job->kc; 473 474 if (job->pages && job->pages != &zero_page_list) 475 kcopyd_put_pages(kc, job->pages); 476 /* 477 * If this is the master job, the sub jobs have already 478 * completed so we can free everything. 479 */ 480 if (job->master_job == job) 481 mempool_free(job, kc->job_pool); 482 fn(read_err, write_err, context); 483 484 if (atomic_dec_and_test(&kc->nr_jobs)) 485 wake_up(&kc->destroyq); 486 487 return 0; 488 } 489 490 static void complete_io(unsigned long error, void *context) 491 { 492 struct kcopyd_job *job = (struct kcopyd_job *) context; 493 struct dm_kcopyd_client *kc = job->kc; 494 495 io_job_finish(kc->throttle); 496 497 if (error) { 498 if (op_is_write(job->rw)) 499 job->write_err |= error; 500 else 501 job->read_err = 1; 502 503 if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) { 504 push(&kc->complete_jobs, job); 505 wake(kc); 506 return; 507 } 508 } 509 510 if (op_is_write(job->rw)) 511 push(&kc->complete_jobs, job); 512 513 else { 514 job->rw = WRITE; 515 push(&kc->io_jobs, job); 516 } 517 518 wake(kc); 519 } 520 521 /* 522 * Request io on as many buffer heads as we can currently get for 523 * a particular job. 524 */ 525 static int run_io_job(struct kcopyd_job *job) 526 { 527 int r; 528 struct dm_io_request io_req = { 529 .bi_op = job->rw, 530 .bi_op_flags = 0, 531 .mem.type = DM_IO_PAGE_LIST, 532 .mem.ptr.pl = job->pages, 533 .mem.offset = 0, 534 .notify.fn = complete_io, 535 .notify.context = job, 536 .client = job->kc->io_client, 537 }; 538 539 /* 540 * If we need to write sequentially and some reads or writes failed, 541 * no point in continuing. 542 */ 543 if (test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags) && 544 job->master_job->write_err) 545 return -EIO; 546 547 io_job_start(job->kc->throttle); 548 549 if (job->rw == READ) 550 r = dm_io(&io_req, 1, &job->source, NULL); 551 else 552 r = dm_io(&io_req, job->num_dests, job->dests, NULL); 553 554 return r; 555 } 556 557 static int run_pages_job(struct kcopyd_job *job) 558 { 559 int r; 560 unsigned nr_pages = dm_div_up(job->dests[0].count, PAGE_SIZE >> 9); 561 562 r = kcopyd_get_pages(job->kc, nr_pages, &job->pages); 563 if (!r) { 564 /* this job is ready for io */ 565 push(&job->kc->io_jobs, job); 566 return 0; 567 } 568 569 if (r == -ENOMEM) 570 /* can't complete now */ 571 return 1; 572 573 return r; 574 } 575 576 /* 577 * Run through a list for as long as possible. Returns the count 578 * of successful jobs. 579 */ 580 static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc, 581 int (*fn) (struct kcopyd_job *)) 582 { 583 struct kcopyd_job *job; 584 int r, count = 0; 585 586 while ((job = pop(jobs, kc))) { 587 588 r = fn(job); 589 590 if (r < 0) { 591 /* error this rogue job */ 592 if (op_is_write(job->rw)) 593 job->write_err = (unsigned long) -1L; 594 else 595 job->read_err = 1; 596 push(&kc->complete_jobs, job); 597 break; 598 } 599 600 if (r > 0) { 601 /* 602 * We couldn't service this job ATM, so 603 * push this job back onto the list. 604 */ 605 push_head(jobs, job); 606 break; 607 } 608 609 count++; 610 } 611 612 return count; 613 } 614 615 /* 616 * kcopyd does this every time it's woken up. 617 */ 618 static void do_work(struct work_struct *work) 619 { 620 struct dm_kcopyd_client *kc = container_of(work, 621 struct dm_kcopyd_client, kcopyd_work); 622 struct blk_plug plug; 623 624 /* 625 * The order that these are called is *very* important. 626 * complete jobs can free some pages for pages jobs. 627 * Pages jobs when successful will jump onto the io jobs 628 * list. io jobs call wake when they complete and it all 629 * starts again. 630 */ 631 blk_start_plug(&plug); 632 process_jobs(&kc->complete_jobs, kc, run_complete_job); 633 process_jobs(&kc->pages_jobs, kc, run_pages_job); 634 process_jobs(&kc->io_jobs, kc, run_io_job); 635 blk_finish_plug(&plug); 636 } 637 638 /* 639 * If we are copying a small region we just dispatch a single job 640 * to do the copy, otherwise the io has to be split up into many 641 * jobs. 642 */ 643 static void dispatch_job(struct kcopyd_job *job) 644 { 645 struct dm_kcopyd_client *kc = job->kc; 646 atomic_inc(&kc->nr_jobs); 647 if (unlikely(!job->source.count)) 648 push(&kc->complete_jobs, job); 649 else if (job->pages == &zero_page_list) 650 push(&kc->io_jobs, job); 651 else 652 push(&kc->pages_jobs, job); 653 wake(kc); 654 } 655 656 static void segment_complete(int read_err, unsigned long write_err, 657 void *context) 658 { 659 /* FIXME: tidy this function */ 660 sector_t progress = 0; 661 sector_t count = 0; 662 struct kcopyd_job *sub_job = (struct kcopyd_job *) context; 663 struct kcopyd_job *job = sub_job->master_job; 664 struct dm_kcopyd_client *kc = job->kc; 665 666 mutex_lock(&job->lock); 667 668 /* update the error */ 669 if (read_err) 670 job->read_err = 1; 671 672 if (write_err) 673 job->write_err |= write_err; 674 675 /* 676 * Only dispatch more work if there hasn't been an error. 677 */ 678 if ((!job->read_err && !job->write_err) || 679 test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) { 680 /* get the next chunk of work */ 681 progress = job->progress; 682 count = job->source.count - progress; 683 if (count) { 684 if (count > SUB_JOB_SIZE) 685 count = SUB_JOB_SIZE; 686 687 job->progress += count; 688 } 689 } 690 mutex_unlock(&job->lock); 691 692 if (count) { 693 int i; 694 695 *sub_job = *job; 696 sub_job->write_offset = progress; 697 sub_job->source.sector += progress; 698 sub_job->source.count = count; 699 700 for (i = 0; i < job->num_dests; i++) { 701 sub_job->dests[i].sector += progress; 702 sub_job->dests[i].count = count; 703 } 704 705 sub_job->fn = segment_complete; 706 sub_job->context = sub_job; 707 dispatch_job(sub_job); 708 709 } else if (atomic_dec_and_test(&job->sub_jobs)) { 710 711 /* 712 * Queue the completion callback to the kcopyd thread. 713 * 714 * Some callers assume that all the completions are called 715 * from a single thread and don't race with each other. 716 * 717 * We must not call the callback directly here because this 718 * code may not be executing in the thread. 719 */ 720 push(&kc->complete_jobs, job); 721 wake(kc); 722 } 723 } 724 725 /* 726 * Create some sub jobs to share the work between them. 727 */ 728 static void split_job(struct kcopyd_job *master_job) 729 { 730 int i; 731 732 atomic_inc(&master_job->kc->nr_jobs); 733 734 atomic_set(&master_job->sub_jobs, SPLIT_COUNT); 735 for (i = 0; i < SPLIT_COUNT; i++) { 736 master_job[i + 1].master_job = master_job; 737 segment_complete(0, 0u, &master_job[i + 1]); 738 } 739 } 740 741 int dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from, 742 unsigned int num_dests, struct dm_io_region *dests, 743 unsigned int flags, dm_kcopyd_notify_fn fn, void *context) 744 { 745 struct kcopyd_job *job; 746 int i; 747 748 /* 749 * Allocate an array of jobs consisting of one master job 750 * followed by SPLIT_COUNT sub jobs. 751 */ 752 job = mempool_alloc(kc->job_pool, GFP_NOIO); 753 754 /* 755 * set up for the read. 756 */ 757 job->kc = kc; 758 job->flags = flags; 759 job->read_err = 0; 760 job->write_err = 0; 761 762 job->num_dests = num_dests; 763 memcpy(&job->dests, dests, sizeof(*dests) * num_dests); 764 765 /* 766 * If one of the destination is a host-managed zoned block device, 767 * we need to write sequentially. If one of the destination is a 768 * host-aware device, then leave it to the caller to choose what to do. 769 */ 770 if (!test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags)) { 771 for (i = 0; i < job->num_dests; i++) { 772 if (bdev_zoned_model(dests[i].bdev) == BLK_ZONED_HM) { 773 set_bit(DM_KCOPYD_WRITE_SEQ, &job->flags); 774 break; 775 } 776 } 777 } 778 779 /* 780 * If we need to write sequentially, errors cannot be ignored. 781 */ 782 if (test_bit(DM_KCOPYD_WRITE_SEQ, &job->flags) && 783 test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) 784 clear_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags); 785 786 if (from) { 787 job->source = *from; 788 job->pages = NULL; 789 job->rw = READ; 790 } else { 791 memset(&job->source, 0, sizeof job->source); 792 job->source.count = job->dests[0].count; 793 job->pages = &zero_page_list; 794 795 /* 796 * Use WRITE ZEROES to optimize zeroing if all dests support it. 797 */ 798 job->rw = REQ_OP_WRITE_ZEROES; 799 for (i = 0; i < job->num_dests; i++) 800 if (!bdev_write_zeroes_sectors(job->dests[i].bdev)) { 801 job->rw = WRITE; 802 break; 803 } 804 } 805 806 job->fn = fn; 807 job->context = context; 808 job->master_job = job; 809 job->write_offset = 0; 810 811 if (job->source.count <= SUB_JOB_SIZE) 812 dispatch_job(job); 813 else { 814 mutex_init(&job->lock); 815 job->progress = 0; 816 split_job(job); 817 } 818 819 return 0; 820 } 821 EXPORT_SYMBOL(dm_kcopyd_copy); 822 823 int dm_kcopyd_zero(struct dm_kcopyd_client *kc, 824 unsigned num_dests, struct dm_io_region *dests, 825 unsigned flags, dm_kcopyd_notify_fn fn, void *context) 826 { 827 return dm_kcopyd_copy(kc, NULL, num_dests, dests, flags, fn, context); 828 } 829 EXPORT_SYMBOL(dm_kcopyd_zero); 830 831 void *dm_kcopyd_prepare_callback(struct dm_kcopyd_client *kc, 832 dm_kcopyd_notify_fn fn, void *context) 833 { 834 struct kcopyd_job *job; 835 836 job = mempool_alloc(kc->job_pool, GFP_NOIO); 837 838 memset(job, 0, sizeof(struct kcopyd_job)); 839 job->kc = kc; 840 job->fn = fn; 841 job->context = context; 842 job->master_job = job; 843 844 atomic_inc(&kc->nr_jobs); 845 846 return job; 847 } 848 EXPORT_SYMBOL(dm_kcopyd_prepare_callback); 849 850 void dm_kcopyd_do_callback(void *j, int read_err, unsigned long write_err) 851 { 852 struct kcopyd_job *job = j; 853 struct dm_kcopyd_client *kc = job->kc; 854 855 job->read_err = read_err; 856 job->write_err = write_err; 857 858 push(&kc->complete_jobs, job); 859 wake(kc); 860 } 861 EXPORT_SYMBOL(dm_kcopyd_do_callback); 862 863 /* 864 * Cancels a kcopyd job, eg. someone might be deactivating a 865 * mirror. 866 */ 867 #if 0 868 int kcopyd_cancel(struct kcopyd_job *job, int block) 869 { 870 /* FIXME: finish */ 871 return -1; 872 } 873 #endif /* 0 */ 874 875 /*----------------------------------------------------------------- 876 * Client setup 877 *---------------------------------------------------------------*/ 878 struct dm_kcopyd_client *dm_kcopyd_client_create(struct dm_kcopyd_throttle *throttle) 879 { 880 int r = -ENOMEM; 881 struct dm_kcopyd_client *kc; 882 883 kc = kmalloc(sizeof(*kc), GFP_KERNEL); 884 if (!kc) 885 return ERR_PTR(-ENOMEM); 886 887 spin_lock_init(&kc->job_lock); 888 INIT_LIST_HEAD(&kc->complete_jobs); 889 INIT_LIST_HEAD(&kc->io_jobs); 890 INIT_LIST_HEAD(&kc->pages_jobs); 891 kc->throttle = throttle; 892 893 kc->job_pool = mempool_create_slab_pool(MIN_JOBS, _job_cache); 894 if (!kc->job_pool) 895 goto bad_slab; 896 897 INIT_WORK(&kc->kcopyd_work, do_work); 898 kc->kcopyd_wq = alloc_workqueue("kcopyd", WQ_MEM_RECLAIM, 0); 899 if (!kc->kcopyd_wq) 900 goto bad_workqueue; 901 902 kc->pages = NULL; 903 kc->nr_reserved_pages = kc->nr_free_pages = 0; 904 r = client_reserve_pages(kc, RESERVE_PAGES); 905 if (r) 906 goto bad_client_pages; 907 908 kc->io_client = dm_io_client_create(); 909 if (IS_ERR(kc->io_client)) { 910 r = PTR_ERR(kc->io_client); 911 goto bad_io_client; 912 } 913 914 init_waitqueue_head(&kc->destroyq); 915 atomic_set(&kc->nr_jobs, 0); 916 917 return kc; 918 919 bad_io_client: 920 client_free_pages(kc); 921 bad_client_pages: 922 destroy_workqueue(kc->kcopyd_wq); 923 bad_workqueue: 924 mempool_destroy(kc->job_pool); 925 bad_slab: 926 kfree(kc); 927 928 return ERR_PTR(r); 929 } 930 EXPORT_SYMBOL(dm_kcopyd_client_create); 931 932 void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc) 933 { 934 /* Wait for completion of all jobs submitted by this client. */ 935 wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs)); 936 937 BUG_ON(!list_empty(&kc->complete_jobs)); 938 BUG_ON(!list_empty(&kc->io_jobs)); 939 BUG_ON(!list_empty(&kc->pages_jobs)); 940 destroy_workqueue(kc->kcopyd_wq); 941 dm_io_client_destroy(kc->io_client); 942 client_free_pages(kc); 943 mempool_destroy(kc->job_pool); 944 kfree(kc); 945 } 946 EXPORT_SYMBOL(dm_kcopyd_client_destroy); 947