1 /* 2 * Copyright (C) 2002 Sistina Software (UK) Limited. 3 * Copyright (C) 2006 Red Hat GmbH 4 * 5 * This file is released under the GPL. 6 * 7 * Kcopyd provides a simple interface for copying an area of one 8 * block-device to one or more other block-devices, with an asynchronous 9 * completion notification. 10 */ 11 12 #include <linux/types.h> 13 #include <linux/atomic.h> 14 #include <linux/blkdev.h> 15 #include <linux/fs.h> 16 #include <linux/init.h> 17 #include <linux/list.h> 18 #include <linux/mempool.h> 19 #include <linux/module.h> 20 #include <linux/pagemap.h> 21 #include <linux/slab.h> 22 #include <linux/vmalloc.h> 23 #include <linux/workqueue.h> 24 #include <linux/mutex.h> 25 #include <linux/device-mapper.h> 26 #include <linux/dm-kcopyd.h> 27 28 #include "dm.h" 29 30 #define SUB_JOB_SIZE 128 31 #define SPLIT_COUNT 8 32 #define MIN_JOBS 8 33 #define RESERVE_PAGES (DIV_ROUND_UP(SUB_JOB_SIZE << SECTOR_SHIFT, PAGE_SIZE)) 34 35 /*----------------------------------------------------------------- 36 * Each kcopyd client has its own little pool of preallocated 37 * pages for kcopyd io. 38 *---------------------------------------------------------------*/ 39 struct dm_kcopyd_client { 40 struct page_list *pages; 41 unsigned nr_reserved_pages; 42 unsigned nr_free_pages; 43 44 struct dm_io_client *io_client; 45 46 wait_queue_head_t destroyq; 47 atomic_t nr_jobs; 48 49 mempool_t *job_pool; 50 51 struct workqueue_struct *kcopyd_wq; 52 struct work_struct kcopyd_work; 53 54 /* 55 * We maintain three lists of jobs: 56 * 57 * i) jobs waiting for pages 58 * ii) jobs that have pages, and are waiting for the io to be issued. 59 * iii) jobs that have completed. 60 * 61 * All three of these are protected by job_lock. 62 */ 63 spinlock_t job_lock; 64 struct list_head complete_jobs; 65 struct list_head io_jobs; 66 struct list_head pages_jobs; 67 }; 68 69 static struct page_list zero_page_list; 70 71 static void wake(struct dm_kcopyd_client *kc) 72 { 73 queue_work(kc->kcopyd_wq, &kc->kcopyd_work); 74 } 75 76 /* 77 * Obtain one page for the use of kcopyd. 78 */ 79 static struct page_list *alloc_pl(gfp_t gfp) 80 { 81 struct page_list *pl; 82 83 pl = kmalloc(sizeof(*pl), gfp); 84 if (!pl) 85 return NULL; 86 87 pl->page = alloc_page(gfp); 88 if (!pl->page) { 89 kfree(pl); 90 return NULL; 91 } 92 93 return pl; 94 } 95 96 static void free_pl(struct page_list *pl) 97 { 98 __free_page(pl->page); 99 kfree(pl); 100 } 101 102 /* 103 * Add the provided pages to a client's free page list, releasing 104 * back to the system any beyond the reserved_pages limit. 105 */ 106 static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl) 107 { 108 struct page_list *next; 109 110 do { 111 next = pl->next; 112 113 if (kc->nr_free_pages >= kc->nr_reserved_pages) 114 free_pl(pl); 115 else { 116 pl->next = kc->pages; 117 kc->pages = pl; 118 kc->nr_free_pages++; 119 } 120 121 pl = next; 122 } while (pl); 123 } 124 125 static int kcopyd_get_pages(struct dm_kcopyd_client *kc, 126 unsigned int nr, struct page_list **pages) 127 { 128 struct page_list *pl; 129 130 *pages = NULL; 131 132 do { 133 pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY); 134 if (unlikely(!pl)) { 135 /* Use reserved pages */ 136 pl = kc->pages; 137 if (unlikely(!pl)) 138 goto out_of_memory; 139 kc->pages = pl->next; 140 kc->nr_free_pages--; 141 } 142 pl->next = *pages; 143 *pages = pl; 144 } while (--nr); 145 146 return 0; 147 148 out_of_memory: 149 if (*pages) 150 kcopyd_put_pages(kc, *pages); 151 return -ENOMEM; 152 } 153 154 /* 155 * These three functions resize the page pool. 156 */ 157 static void drop_pages(struct page_list *pl) 158 { 159 struct page_list *next; 160 161 while (pl) { 162 next = pl->next; 163 free_pl(pl); 164 pl = next; 165 } 166 } 167 168 /* 169 * Allocate and reserve nr_pages for the use of a specific client. 170 */ 171 static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages) 172 { 173 unsigned i; 174 struct page_list *pl = NULL, *next; 175 176 for (i = 0; i < nr_pages; i++) { 177 next = alloc_pl(GFP_KERNEL); 178 if (!next) { 179 if (pl) 180 drop_pages(pl); 181 return -ENOMEM; 182 } 183 next->next = pl; 184 pl = next; 185 } 186 187 kc->nr_reserved_pages += nr_pages; 188 kcopyd_put_pages(kc, pl); 189 190 return 0; 191 } 192 193 static void client_free_pages(struct dm_kcopyd_client *kc) 194 { 195 BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages); 196 drop_pages(kc->pages); 197 kc->pages = NULL; 198 kc->nr_free_pages = kc->nr_reserved_pages = 0; 199 } 200 201 /*----------------------------------------------------------------- 202 * kcopyd_jobs need to be allocated by the *clients* of kcopyd, 203 * for this reason we use a mempool to prevent the client from 204 * ever having to do io (which could cause a deadlock). 205 *---------------------------------------------------------------*/ 206 struct kcopyd_job { 207 struct dm_kcopyd_client *kc; 208 struct list_head list; 209 unsigned long flags; 210 211 /* 212 * Error state of the job. 213 */ 214 int read_err; 215 unsigned long write_err; 216 217 /* 218 * Either READ or WRITE 219 */ 220 int rw; 221 struct dm_io_region source; 222 223 /* 224 * The destinations for the transfer. 225 */ 226 unsigned int num_dests; 227 struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS]; 228 229 struct page_list *pages; 230 231 /* 232 * Set this to ensure you are notified when the job has 233 * completed. 'context' is for callback to use. 234 */ 235 dm_kcopyd_notify_fn fn; 236 void *context; 237 238 /* 239 * These fields are only used if the job has been split 240 * into more manageable parts. 241 */ 242 struct mutex lock; 243 atomic_t sub_jobs; 244 sector_t progress; 245 246 struct kcopyd_job *master_job; 247 }; 248 249 static struct kmem_cache *_job_cache; 250 251 int __init dm_kcopyd_init(void) 252 { 253 _job_cache = kmem_cache_create("kcopyd_job", 254 sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1), 255 __alignof__(struct kcopyd_job), 0, NULL); 256 if (!_job_cache) 257 return -ENOMEM; 258 259 zero_page_list.next = &zero_page_list; 260 zero_page_list.page = ZERO_PAGE(0); 261 262 return 0; 263 } 264 265 void dm_kcopyd_exit(void) 266 { 267 kmem_cache_destroy(_job_cache); 268 _job_cache = NULL; 269 } 270 271 /* 272 * Functions to push and pop a job onto the head of a given job 273 * list. 274 */ 275 static struct kcopyd_job *pop(struct list_head *jobs, 276 struct dm_kcopyd_client *kc) 277 { 278 struct kcopyd_job *job = NULL; 279 unsigned long flags; 280 281 spin_lock_irqsave(&kc->job_lock, flags); 282 283 if (!list_empty(jobs)) { 284 job = list_entry(jobs->next, struct kcopyd_job, list); 285 list_del(&job->list); 286 } 287 spin_unlock_irqrestore(&kc->job_lock, flags); 288 289 return job; 290 } 291 292 static void push(struct list_head *jobs, struct kcopyd_job *job) 293 { 294 unsigned long flags; 295 struct dm_kcopyd_client *kc = job->kc; 296 297 spin_lock_irqsave(&kc->job_lock, flags); 298 list_add_tail(&job->list, jobs); 299 spin_unlock_irqrestore(&kc->job_lock, flags); 300 } 301 302 303 static void push_head(struct list_head *jobs, struct kcopyd_job *job) 304 { 305 unsigned long flags; 306 struct dm_kcopyd_client *kc = job->kc; 307 308 spin_lock_irqsave(&kc->job_lock, flags); 309 list_add(&job->list, jobs); 310 spin_unlock_irqrestore(&kc->job_lock, flags); 311 } 312 313 /* 314 * These three functions process 1 item from the corresponding 315 * job list. 316 * 317 * They return: 318 * < 0: error 319 * 0: success 320 * > 0: can't process yet. 321 */ 322 static int run_complete_job(struct kcopyd_job *job) 323 { 324 void *context = job->context; 325 int read_err = job->read_err; 326 unsigned long write_err = job->write_err; 327 dm_kcopyd_notify_fn fn = job->fn; 328 struct dm_kcopyd_client *kc = job->kc; 329 330 if (job->pages && job->pages != &zero_page_list) 331 kcopyd_put_pages(kc, job->pages); 332 /* 333 * If this is the master job, the sub jobs have already 334 * completed so we can free everything. 335 */ 336 if (job->master_job == job) 337 mempool_free(job, kc->job_pool); 338 fn(read_err, write_err, context); 339 340 if (atomic_dec_and_test(&kc->nr_jobs)) 341 wake_up(&kc->destroyq); 342 343 return 0; 344 } 345 346 static void complete_io(unsigned long error, void *context) 347 { 348 struct kcopyd_job *job = (struct kcopyd_job *) context; 349 struct dm_kcopyd_client *kc = job->kc; 350 351 if (error) { 352 if (job->rw == WRITE) 353 job->write_err |= error; 354 else 355 job->read_err = 1; 356 357 if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) { 358 push(&kc->complete_jobs, job); 359 wake(kc); 360 return; 361 } 362 } 363 364 if (job->rw == WRITE) 365 push(&kc->complete_jobs, job); 366 367 else { 368 job->rw = WRITE; 369 push(&kc->io_jobs, job); 370 } 371 372 wake(kc); 373 } 374 375 /* 376 * Request io on as many buffer heads as we can currently get for 377 * a particular job. 378 */ 379 static int run_io_job(struct kcopyd_job *job) 380 { 381 int r; 382 struct dm_io_request io_req = { 383 .bi_rw = job->rw, 384 .mem.type = DM_IO_PAGE_LIST, 385 .mem.ptr.pl = job->pages, 386 .mem.offset = 0, 387 .notify.fn = complete_io, 388 .notify.context = job, 389 .client = job->kc->io_client, 390 }; 391 392 if (job->rw == READ) 393 r = dm_io(&io_req, 1, &job->source, NULL); 394 else 395 r = dm_io(&io_req, job->num_dests, job->dests, NULL); 396 397 return r; 398 } 399 400 static int run_pages_job(struct kcopyd_job *job) 401 { 402 int r; 403 unsigned nr_pages = dm_div_up(job->dests[0].count, PAGE_SIZE >> 9); 404 405 r = kcopyd_get_pages(job->kc, nr_pages, &job->pages); 406 if (!r) { 407 /* this job is ready for io */ 408 push(&job->kc->io_jobs, job); 409 return 0; 410 } 411 412 if (r == -ENOMEM) 413 /* can't complete now */ 414 return 1; 415 416 return r; 417 } 418 419 /* 420 * Run through a list for as long as possible. Returns the count 421 * of successful jobs. 422 */ 423 static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc, 424 int (*fn) (struct kcopyd_job *)) 425 { 426 struct kcopyd_job *job; 427 int r, count = 0; 428 429 while ((job = pop(jobs, kc))) { 430 431 r = fn(job); 432 433 if (r < 0) { 434 /* error this rogue job */ 435 if (job->rw == WRITE) 436 job->write_err = (unsigned long) -1L; 437 else 438 job->read_err = 1; 439 push(&kc->complete_jobs, job); 440 break; 441 } 442 443 if (r > 0) { 444 /* 445 * We couldn't service this job ATM, so 446 * push this job back onto the list. 447 */ 448 push_head(jobs, job); 449 break; 450 } 451 452 count++; 453 } 454 455 return count; 456 } 457 458 /* 459 * kcopyd does this every time it's woken up. 460 */ 461 static void do_work(struct work_struct *work) 462 { 463 struct dm_kcopyd_client *kc = container_of(work, 464 struct dm_kcopyd_client, kcopyd_work); 465 struct blk_plug plug; 466 467 /* 468 * The order that these are called is *very* important. 469 * complete jobs can free some pages for pages jobs. 470 * Pages jobs when successful will jump onto the io jobs 471 * list. io jobs call wake when they complete and it all 472 * starts again. 473 */ 474 blk_start_plug(&plug); 475 process_jobs(&kc->complete_jobs, kc, run_complete_job); 476 process_jobs(&kc->pages_jobs, kc, run_pages_job); 477 process_jobs(&kc->io_jobs, kc, run_io_job); 478 blk_finish_plug(&plug); 479 } 480 481 /* 482 * If we are copying a small region we just dispatch a single job 483 * to do the copy, otherwise the io has to be split up into many 484 * jobs. 485 */ 486 static void dispatch_job(struct kcopyd_job *job) 487 { 488 struct dm_kcopyd_client *kc = job->kc; 489 atomic_inc(&kc->nr_jobs); 490 if (unlikely(!job->source.count)) 491 push(&kc->complete_jobs, job); 492 else if (job->pages == &zero_page_list) 493 push(&kc->io_jobs, job); 494 else 495 push(&kc->pages_jobs, job); 496 wake(kc); 497 } 498 499 static void segment_complete(int read_err, unsigned long write_err, 500 void *context) 501 { 502 /* FIXME: tidy this function */ 503 sector_t progress = 0; 504 sector_t count = 0; 505 struct kcopyd_job *sub_job = (struct kcopyd_job *) context; 506 struct kcopyd_job *job = sub_job->master_job; 507 struct dm_kcopyd_client *kc = job->kc; 508 509 mutex_lock(&job->lock); 510 511 /* update the error */ 512 if (read_err) 513 job->read_err = 1; 514 515 if (write_err) 516 job->write_err |= write_err; 517 518 /* 519 * Only dispatch more work if there hasn't been an error. 520 */ 521 if ((!job->read_err && !job->write_err) || 522 test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) { 523 /* get the next chunk of work */ 524 progress = job->progress; 525 count = job->source.count - progress; 526 if (count) { 527 if (count > SUB_JOB_SIZE) 528 count = SUB_JOB_SIZE; 529 530 job->progress += count; 531 } 532 } 533 mutex_unlock(&job->lock); 534 535 if (count) { 536 int i; 537 538 *sub_job = *job; 539 sub_job->source.sector += progress; 540 sub_job->source.count = count; 541 542 for (i = 0; i < job->num_dests; i++) { 543 sub_job->dests[i].sector += progress; 544 sub_job->dests[i].count = count; 545 } 546 547 sub_job->fn = segment_complete; 548 sub_job->context = sub_job; 549 dispatch_job(sub_job); 550 551 } else if (atomic_dec_and_test(&job->sub_jobs)) { 552 553 /* 554 * Queue the completion callback to the kcopyd thread. 555 * 556 * Some callers assume that all the completions are called 557 * from a single thread and don't race with each other. 558 * 559 * We must not call the callback directly here because this 560 * code may not be executing in the thread. 561 */ 562 push(&kc->complete_jobs, job); 563 wake(kc); 564 } 565 } 566 567 /* 568 * Create some sub jobs to share the work between them. 569 */ 570 static void split_job(struct kcopyd_job *master_job) 571 { 572 int i; 573 574 atomic_inc(&master_job->kc->nr_jobs); 575 576 atomic_set(&master_job->sub_jobs, SPLIT_COUNT); 577 for (i = 0; i < SPLIT_COUNT; i++) { 578 master_job[i + 1].master_job = master_job; 579 segment_complete(0, 0u, &master_job[i + 1]); 580 } 581 } 582 583 int dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from, 584 unsigned int num_dests, struct dm_io_region *dests, 585 unsigned int flags, dm_kcopyd_notify_fn fn, void *context) 586 { 587 struct kcopyd_job *job; 588 589 /* 590 * Allocate an array of jobs consisting of one master job 591 * followed by SPLIT_COUNT sub jobs. 592 */ 593 job = mempool_alloc(kc->job_pool, GFP_NOIO); 594 595 /* 596 * set up for the read. 597 */ 598 job->kc = kc; 599 job->flags = flags; 600 job->read_err = 0; 601 job->write_err = 0; 602 603 job->num_dests = num_dests; 604 memcpy(&job->dests, dests, sizeof(*dests) * num_dests); 605 606 if (from) { 607 job->source = *from; 608 job->pages = NULL; 609 job->rw = READ; 610 } else { 611 memset(&job->source, 0, sizeof job->source); 612 job->source.count = job->dests[0].count; 613 job->pages = &zero_page_list; 614 job->rw = WRITE; 615 } 616 617 job->fn = fn; 618 job->context = context; 619 job->master_job = job; 620 621 if (job->source.count <= SUB_JOB_SIZE) 622 dispatch_job(job); 623 else { 624 mutex_init(&job->lock); 625 job->progress = 0; 626 split_job(job); 627 } 628 629 return 0; 630 } 631 EXPORT_SYMBOL(dm_kcopyd_copy); 632 633 int dm_kcopyd_zero(struct dm_kcopyd_client *kc, 634 unsigned num_dests, struct dm_io_region *dests, 635 unsigned flags, dm_kcopyd_notify_fn fn, void *context) 636 { 637 return dm_kcopyd_copy(kc, NULL, num_dests, dests, flags, fn, context); 638 } 639 EXPORT_SYMBOL(dm_kcopyd_zero); 640 641 void *dm_kcopyd_prepare_callback(struct dm_kcopyd_client *kc, 642 dm_kcopyd_notify_fn fn, void *context) 643 { 644 struct kcopyd_job *job; 645 646 job = mempool_alloc(kc->job_pool, GFP_NOIO); 647 648 memset(job, 0, sizeof(struct kcopyd_job)); 649 job->kc = kc; 650 job->fn = fn; 651 job->context = context; 652 job->master_job = job; 653 654 atomic_inc(&kc->nr_jobs); 655 656 return job; 657 } 658 EXPORT_SYMBOL(dm_kcopyd_prepare_callback); 659 660 void dm_kcopyd_do_callback(void *j, int read_err, unsigned long write_err) 661 { 662 struct kcopyd_job *job = j; 663 struct dm_kcopyd_client *kc = job->kc; 664 665 job->read_err = read_err; 666 job->write_err = write_err; 667 668 push(&kc->complete_jobs, job); 669 wake(kc); 670 } 671 EXPORT_SYMBOL(dm_kcopyd_do_callback); 672 673 /* 674 * Cancels a kcopyd job, eg. someone might be deactivating a 675 * mirror. 676 */ 677 #if 0 678 int kcopyd_cancel(struct kcopyd_job *job, int block) 679 { 680 /* FIXME: finish */ 681 return -1; 682 } 683 #endif /* 0 */ 684 685 /*----------------------------------------------------------------- 686 * Client setup 687 *---------------------------------------------------------------*/ 688 struct dm_kcopyd_client *dm_kcopyd_client_create(void) 689 { 690 int r = -ENOMEM; 691 struct dm_kcopyd_client *kc; 692 693 kc = kmalloc(sizeof(*kc), GFP_KERNEL); 694 if (!kc) 695 return ERR_PTR(-ENOMEM); 696 697 spin_lock_init(&kc->job_lock); 698 INIT_LIST_HEAD(&kc->complete_jobs); 699 INIT_LIST_HEAD(&kc->io_jobs); 700 INIT_LIST_HEAD(&kc->pages_jobs); 701 702 kc->job_pool = mempool_create_slab_pool(MIN_JOBS, _job_cache); 703 if (!kc->job_pool) 704 goto bad_slab; 705 706 INIT_WORK(&kc->kcopyd_work, do_work); 707 kc->kcopyd_wq = alloc_workqueue("kcopyd", 708 WQ_NON_REENTRANT | WQ_MEM_RECLAIM, 0); 709 if (!kc->kcopyd_wq) 710 goto bad_workqueue; 711 712 kc->pages = NULL; 713 kc->nr_reserved_pages = kc->nr_free_pages = 0; 714 r = client_reserve_pages(kc, RESERVE_PAGES); 715 if (r) 716 goto bad_client_pages; 717 718 kc->io_client = dm_io_client_create(); 719 if (IS_ERR(kc->io_client)) { 720 r = PTR_ERR(kc->io_client); 721 goto bad_io_client; 722 } 723 724 init_waitqueue_head(&kc->destroyq); 725 atomic_set(&kc->nr_jobs, 0); 726 727 return kc; 728 729 bad_io_client: 730 client_free_pages(kc); 731 bad_client_pages: 732 destroy_workqueue(kc->kcopyd_wq); 733 bad_workqueue: 734 mempool_destroy(kc->job_pool); 735 bad_slab: 736 kfree(kc); 737 738 return ERR_PTR(r); 739 } 740 EXPORT_SYMBOL(dm_kcopyd_client_create); 741 742 void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc) 743 { 744 /* Wait for completion of all jobs submitted by this client. */ 745 wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs)); 746 747 BUG_ON(!list_empty(&kc->complete_jobs)); 748 BUG_ON(!list_empty(&kc->io_jobs)); 749 BUG_ON(!list_empty(&kc->pages_jobs)); 750 destroy_workqueue(kc->kcopyd_wq); 751 dm_io_client_destroy(kc->io_client); 752 client_free_pages(kc); 753 mempool_destroy(kc->job_pool); 754 kfree(kc); 755 } 756 EXPORT_SYMBOL(dm_kcopyd_client_destroy); 757