/*
 * Copyright (C) 2002 Sistina Software (UK) Limited.
 * Copyright (C) 2006 Red Hat GmbH
 *
 * This file is released under the GPL.
 *
 * Kcopyd provides a simple interface for copying an area of one
 * block-device to one or more other block-devices, with an asynchronous
 * completion notification.
 */

#include <linux/types.h>
#include <asm/atomic.h>
#include <linux/blkdev.h>
#include <linux/fs.h>
#include <linux/init.h>
#include <linux/list.h>
#include <linux/mempool.h>
#include <linux/module.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/workqueue.h>
#include <linux/mutex.h>
#include <linux/device-mapper.h>
#include <linux/dm-kcopyd.h>

#include "dm.h"

/*-----------------------------------------------------------------
 * Each kcopyd client has its own little pool of preallocated
 * pages for kcopyd io.
 *---------------------------------------------------------------*/
struct dm_kcopyd_client {
	spinlock_t lock;
	struct page_list *pages;
	unsigned int nr_pages;
	unsigned int nr_free_pages;

	/*
	 * Block devices to unplug.
	 * Non-NULL pointer means that a block device has some pending requests
	 * and needs to be unplugged.
	 */
	struct block_device *unplug[2];

	struct dm_io_client *io_client;

	wait_queue_head_t destroyq;
	atomic_t nr_jobs;

	mempool_t *job_pool;

	struct workqueue_struct *kcopyd_wq;
	struct work_struct kcopyd_work;

	/*
	 * We maintain three lists of jobs:
	 *
	 * i)   jobs waiting for pages
	 * ii)  jobs that have pages, and are waiting for the io to be issued.
	 * iii) jobs that have completed.
	 *
	 * All three of these are protected by job_lock.
	 */
	spinlock_t job_lock;
	struct list_head complete_jobs;
	struct list_head io_jobs;
	struct list_head pages_jobs;
};

static void wake(struct dm_kcopyd_client *kc)
{
	queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
}

static struct page_list *alloc_pl(void)
{
	struct page_list *pl;

	pl = kmalloc(sizeof(*pl), GFP_KERNEL);
	if (!pl)
		return NULL;

	pl->page = alloc_page(GFP_KERNEL);
	if (!pl->page) {
		kfree(pl);
		return NULL;
	}

	return pl;
}

static void free_pl(struct page_list *pl)
{
	__free_page(pl->page);
	kfree(pl);
}

static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
			    unsigned int nr, struct page_list **pages)
{
	struct page_list *pl;

	spin_lock(&kc->lock);
	if (kc->nr_free_pages < nr) {
		spin_unlock(&kc->lock);
		return -ENOMEM;
	}

	kc->nr_free_pages -= nr;
	for (*pages = pl = kc->pages; --nr; pl = pl->next)
		;

	kc->pages = pl->next;
	pl->next = NULL;

	spin_unlock(&kc->lock);

	return 0;
}

static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
{
	struct page_list *cursor;

	spin_lock(&kc->lock);
	for (cursor = pl; cursor->next; cursor = cursor->next)
		kc->nr_free_pages++;

	kc->nr_free_pages++;
	cursor->next = kc->pages;
	kc->pages = pl;
	spin_unlock(&kc->lock);
}
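
/*
 * Illustrative sketch (not part of this driver): how the per-client page
 * pool above is consumed.  A job borrows a chain of page_list entries with
 * kcopyd_get_pages() and returns the whole chain with kcopyd_put_pages()
 * once its io has completed.  example_borrow_pages() is a made-up name
 * used only for this sketch.
 */
#if 0
static int example_borrow_pages(struct dm_kcopyd_client *kc)
{
	struct page_list *pl;

	/* Detach a chain of four pages from the head of the free list. */
	if (kcopyd_get_pages(kc, 4, &pl))
		return -ENOMEM;	/* fewer than four pages currently free */

	/* ... hand pl to dm-io as DM_IO_PAGE_LIST memory ... */

	/* Return the whole chain; the list is walked to recount the pages. */
	kcopyd_put_pages(kc, pl);
	return 0;
}
#endif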

/*
 * These three functions resize the page pool.
 */
static void drop_pages(struct page_list *pl)
{
	struct page_list *next;

	while (pl) {
		next = pl->next;
		free_pl(pl);
		pl = next;
	}
}

static int client_alloc_pages(struct dm_kcopyd_client *kc, unsigned int nr)
{
	unsigned int i;
	struct page_list *pl = NULL, *next;

	for (i = 0; i < nr; i++) {
		next = alloc_pl();
		if (!next) {
			if (pl)
				drop_pages(pl);
			return -ENOMEM;
		}
		next->next = pl;
		pl = next;
	}

	kcopyd_put_pages(kc, pl);
	kc->nr_pages += nr;
	return 0;
}

static void client_free_pages(struct dm_kcopyd_client *kc)
{
	BUG_ON(kc->nr_free_pages != kc->nr_pages);
	drop_pages(kc->pages);
	kc->pages = NULL;
	kc->nr_free_pages = kc->nr_pages = 0;
}

/*-----------------------------------------------------------------
 * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
 * for this reason we use a mempool to prevent the client from
 * ever having to do io (which could cause a deadlock).
 *---------------------------------------------------------------*/
struct kcopyd_job {
	struct dm_kcopyd_client *kc;
	struct list_head list;
	unsigned long flags;

	/*
	 * Error state of the job.
	 */
	int read_err;
	unsigned long write_err;

	/*
	 * Either READ or WRITE
	 */
	int rw;
	struct dm_io_region source;

	/*
	 * The destinations for the transfer.
	 */
	unsigned int num_dests;
	struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];

	sector_t offset;
	unsigned int nr_pages;
	struct page_list *pages;

	/*
	 * Set this to ensure you are notified when the job has
	 * completed.  'context' is for callback to use.
	 */
	dm_kcopyd_notify_fn fn;
	void *context;

	/*
	 * These fields are only used if the job has been split
	 * into more manageable parts.
	 */
	struct mutex lock;
	atomic_t sub_jobs;
	sector_t progress;
};

/* FIXME: this should scale with the number of pages */
#define MIN_JOBS 512

static struct kmem_cache *_job_cache;

int __init dm_kcopyd_init(void)
{
	_job_cache = KMEM_CACHE(kcopyd_job, 0);
	if (!_job_cache)
		return -ENOMEM;

	return 0;
}

void dm_kcopyd_exit(void)
{
	kmem_cache_destroy(_job_cache);
	_job_cache = NULL;
}
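
/*
 * Illustrative sketch (not part of this driver): the allocation pattern the
 * job mempool above exists for.  example_alloc_job() is a made-up name; the
 * real allocation of this kind is done in dm_kcopyd_copy() below.
 */
#if 0
static struct kcopyd_job *example_alloc_job(struct dm_kcopyd_client *kc)
{
	/*
	 * GFP_NOIO never recurses into the block layer, and because the
	 * pool is backed by MIN_JOBS preallocated objects, mempool_alloc()
	 * sleeps until run_complete_job() frees a job back into the pool
	 * rather than returning NULL.  A client in the writeback path can
	 * therefore always obtain a job without issuing io of its own.
	 */
	struct kcopyd_job *job = mempool_alloc(kc->job_pool, GFP_NOIO);

	job->kc = kc;
	return job;
}
#endif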

/*
 * Functions to push a job onto the head or tail of a given job list,
 * and to pop a job off the head.
 */
static struct kcopyd_job *pop(struct list_head *jobs,
			      struct dm_kcopyd_client *kc)
{
	struct kcopyd_job *job = NULL;
	unsigned long flags;

	spin_lock_irqsave(&kc->job_lock, flags);

	if (!list_empty(jobs)) {
		job = list_entry(jobs->next, struct kcopyd_job, list);
		list_del(&job->list);
	}
	spin_unlock_irqrestore(&kc->job_lock, flags);

	return job;
}

static void push(struct list_head *jobs, struct kcopyd_job *job)
{
	unsigned long flags;
	struct dm_kcopyd_client *kc = job->kc;

	spin_lock_irqsave(&kc->job_lock, flags);
	list_add_tail(&job->list, jobs);
	spin_unlock_irqrestore(&kc->job_lock, flags);
}

static void push_head(struct list_head *jobs, struct kcopyd_job *job)
{
	unsigned long flags;
	struct dm_kcopyd_client *kc = job->kc;

	spin_lock_irqsave(&kc->job_lock, flags);
	list_add(&job->list, jobs);
	spin_unlock_irqrestore(&kc->job_lock, flags);
}

/*
 * These three functions process 1 item from the corresponding
 * job list.
 *
 * They return:
 * < 0: error
 *   0: success
 * > 0: can't process yet.
 */
static int run_complete_job(struct kcopyd_job *job)
{
	void *context = job->context;
	int read_err = job->read_err;
	unsigned long write_err = job->write_err;
	dm_kcopyd_notify_fn fn = job->fn;
	struct dm_kcopyd_client *kc = job->kc;

	if (job->pages)
		kcopyd_put_pages(kc, job->pages);
	mempool_free(job, kc->job_pool);
	fn(read_err, write_err, context);

	if (atomic_dec_and_test(&kc->nr_jobs))
		wake_up(&kc->destroyq);

	return 0;
}

/*
 * Unplug the block device at the specified index.
 */
static void unplug(struct dm_kcopyd_client *kc, int rw)
{
	if (kc->unplug[rw] != NULL) {
		blk_unplug(bdev_get_queue(kc->unplug[rw]));
		kc->unplug[rw] = NULL;
	}
}

/*
 * Prepare block device unplug. If there's another device
 * to be unplugged at the same array index, we unplug that
 * device first.
 */
static void prepare_unplug(struct dm_kcopyd_client *kc, int rw,
			   struct block_device *bdev)
{
	if (likely(kc->unplug[rw] == bdev))
		return;
	unplug(kc, rw);
	kc->unplug[rw] = bdev;
}

static void complete_io(unsigned long error, void *context)
{
	struct kcopyd_job *job = (struct kcopyd_job *) context;
	struct dm_kcopyd_client *kc = job->kc;

	if (error) {
		if (job->rw == WRITE)
			job->write_err |= error;
		else
			job->read_err = 1;

		if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
			push(&kc->complete_jobs, job);
			wake(kc);
			return;
		}
	}

	if (job->rw == WRITE)
		push(&kc->complete_jobs, job);
	else {
		job->rw = WRITE;
		push(&kc->io_jobs, job);
	}

	wake(kc);
}
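
/*
 * Illustrative sketch (not part of this driver): how a completion callback
 * might interpret its error arguments.  read_err is a simple boolean, while
 * write_err is a bitset with one bit per destination region, as filled in
 * by dm-io and accumulated in complete_io() above.
 * example_interpret_errors() is a made-up name used only for this sketch.
 */
#if 0
static void example_interpret_errors(int read_err, unsigned long write_err,
				     unsigned int num_dests)
{
	unsigned int i;

	if (read_err)
		printk(KERN_ERR "kcopyd example: source read failed\n");

	for (i = 0; i < num_dests; i++)
		if (test_bit(i, &write_err))
			printk(KERN_ERR "kcopyd example: write to "
			       "destination %u failed\n", i);
}
#endif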

/*
 * Request io on as many pages as we can currently get for a
 * particular job.
 */
static int run_io_job(struct kcopyd_job *job)
{
	int r;
	struct dm_io_request io_req = {
		.bi_rw = job->rw,
		.mem.type = DM_IO_PAGE_LIST,
		.mem.ptr.pl = job->pages,
		.mem.offset = job->offset,
		.notify.fn = complete_io,
		.notify.context = job,
		.client = job->kc->io_client,
	};

	if (job->rw == READ) {
		r = dm_io(&io_req, 1, &job->source, NULL);
		prepare_unplug(job->kc, READ, job->source.bdev);
	} else {
		if (job->num_dests > 1)
			io_req.bi_rw |= REQ_UNPLUG;
		r = dm_io(&io_req, job->num_dests, job->dests, NULL);
		if (!(io_req.bi_rw & REQ_UNPLUG))
			prepare_unplug(job->kc, WRITE, job->dests[0].bdev);
	}

	return r;
}

static int run_pages_job(struct kcopyd_job *job)
{
	int r;

	job->nr_pages = dm_div_up(job->dests[0].count + job->offset,
				  PAGE_SIZE >> 9);
	r = kcopyd_get_pages(job->kc, job->nr_pages, &job->pages);
	if (!r) {
		/* this job is ready for io */
		push(&job->kc->io_jobs, job);
		return 0;
	}

	if (r == -ENOMEM)
		/* can't complete now */
		return 1;

	return r;
}

/*
 * Run through a list for as long as possible.  Returns the count
 * of successful jobs.
 */
static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
			int (*fn) (struct kcopyd_job *))
{
	struct kcopyd_job *job;
	int r, count = 0;

	while ((job = pop(jobs, kc))) {

		r = fn(job);

		if (r < 0) {
			/* error this rogue job */
			if (job->rw == WRITE)
				job->write_err = (unsigned long) -1L;
			else
				job->read_err = 1;
			push(&kc->complete_jobs, job);
			break;
		}

		if (r > 0) {
			/*
			 * We couldn't service this job ATM, so
			 * push this job back onto the list.
			 */
			push_head(jobs, job);
			break;
		}

		count++;
	}

	return count;
}

/*
 * kcopyd does this every time it's woken up.
 */
static void do_work(struct work_struct *work)
{
	struct dm_kcopyd_client *kc = container_of(work,
					struct dm_kcopyd_client, kcopyd_work);

	/*
	 * The order that these are called is *very* important.
	 * complete jobs can free some pages for pages jobs.
	 * Pages jobs when successful will jump onto the io jobs
	 * list.  io jobs call wake when they complete and it all
	 * starts again.
	 *
	 * Note that io_jobs add block devices to the unplug array and
	 * this array is cleared with "unplug" calls.  It is thus
	 * forbidden to run complete_jobs after io_jobs and before
	 * unplug because the block device could be destroyed in a
	 * job completion callback.
	 */
	process_jobs(&kc->complete_jobs, kc, run_complete_job);
	process_jobs(&kc->pages_jobs, kc, run_pages_job);
	process_jobs(&kc->io_jobs, kc, run_io_job);
	unplug(kc, READ);
	unplug(kc, WRITE);
}
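
/*
 * Illustrative sketch (not part of this driver): the page budget computed
 * by run_pages_job() above.  With 4K pages, PAGE_SIZE >> 9 is 8 sectors per
 * page, so a full 128-sector sub-job with offset 0 needs
 * dm_div_up(128, 8) = 16 pages from the client pool.
 * example_pages_needed() is a made-up name used only for this sketch.
 */
#if 0
static unsigned int example_pages_needed(sector_t count, sector_t offset)
{
	/* Round up: a partial trailing page still needs a whole page. */
	return dm_div_up(count + offset, PAGE_SIZE >> 9);
}
#endif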

/*
 * If we are copying a small region we just dispatch a single job
 * to do the copy, otherwise the io has to be split up into many
 * jobs.
 */
static void dispatch_job(struct kcopyd_job *job)
{
	struct dm_kcopyd_client *kc = job->kc;
	atomic_inc(&kc->nr_jobs);
	if (unlikely(!job->source.count))
		push(&kc->complete_jobs, job);
	else
		push(&kc->pages_jobs, job);
	wake(kc);
}

#define SUB_JOB_SIZE 128
static void segment_complete(int read_err, unsigned long write_err,
			     void *context)
{
	/* FIXME: tidy this function */
	sector_t progress = 0;
	sector_t count = 0;
	struct kcopyd_job *job = (struct kcopyd_job *) context;
	struct dm_kcopyd_client *kc = job->kc;

	mutex_lock(&job->lock);

	/* update the error */
	if (read_err)
		job->read_err = 1;

	if (write_err)
		job->write_err |= write_err;

	/*
	 * Only dispatch more work if there hasn't been an error.
	 */
	if ((!job->read_err && !job->write_err) ||
	    test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
		/* get the next chunk of work */
		progress = job->progress;
		count = job->source.count - progress;
		if (count) {
			if (count > SUB_JOB_SIZE)
				count = SUB_JOB_SIZE;

			job->progress += count;
		}
	}
	mutex_unlock(&job->lock);

	if (count) {
		int i;
		struct kcopyd_job *sub_job = mempool_alloc(kc->job_pool,
							   GFP_NOIO);

		*sub_job = *job;
		sub_job->source.sector += progress;
		sub_job->source.count = count;

		for (i = 0; i < job->num_dests; i++) {
			sub_job->dests[i].sector += progress;
			sub_job->dests[i].count = count;
		}

		sub_job->fn = segment_complete;
		sub_job->context = job;
		dispatch_job(sub_job);

	} else if (atomic_dec_and_test(&job->sub_jobs)) {

		/*
		 * Queue the completion callback to the kcopyd thread.
		 *
		 * Some callers assume that all the completions are called
		 * from a single thread and don't race with each other.
		 *
		 * We must not call the callback directly here because this
		 * code may not be executing in the thread.
		 */
		push(&kc->complete_jobs, job);
		wake(kc);
	}
}

/*
 * Create some little jobs that, between them, will do the move.
 */
#define SPLIT_COUNT 8
static void split_job(struct kcopyd_job *job)
{
	int i;

	atomic_inc(&job->kc->nr_jobs);

	atomic_set(&job->sub_jobs, SPLIT_COUNT);
	for (i = 0; i < SPLIT_COUNT; i++)
		segment_complete(0, 0u, job);
}

int dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
		   unsigned int num_dests, struct dm_io_region *dests,
		   unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
{
	struct kcopyd_job *job;

	/*
	 * Allocate a new job.
	 */
	job = mempool_alloc(kc->job_pool, GFP_NOIO);

	/*
	 * Set up for the read.
	 */
	job->kc = kc;
	job->flags = flags;
	job->read_err = 0;
	job->write_err = 0;
	job->rw = READ;

	job->source = *from;

	job->num_dests = num_dests;
	memcpy(&job->dests, dests, sizeof(*dests) * num_dests);

	job->offset = 0;
	job->nr_pages = 0;
	job->pages = NULL;

	job->fn = fn;
	job->context = context;

	if (job->source.count < SUB_JOB_SIZE)
		dispatch_job(job);
	else {
		mutex_init(&job->lock);
		job->progress = 0;
		split_job(job);
	}

	return 0;
}
EXPORT_SYMBOL(dm_kcopyd_copy);
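
/*
 * Illustrative sketch (not part of this driver): how a caller might submit
 * a single-destination copy and wait for the asynchronous completion.
 * example_copy_done() and example_copy() are made-up names and the sketch
 * assumes <linux/completion.h> is available; only dm_kcopyd_copy(),
 * dm_kcopyd_notify_fn and struct dm_io_region are the real interface.
 */
#if 0
static void example_copy_done(int read_err, unsigned long write_err,
			      void *context)
{
	/* Runs in the kcopyd workqueue once every destination is written. */
	struct completion *done = context;

	if (read_err || write_err)
		printk(KERN_ERR "kcopyd example: copy failed: read_err=%d "
		       "write_err=0x%lx\n", read_err, write_err);
	complete(done);
}

static int example_copy(struct dm_kcopyd_client *kc,
			struct block_device *src_bdev,
			struct block_device *dst_bdev,
			sector_t sector, sector_t count)
{
	struct dm_io_region from, to;
	struct completion done;

	from.bdev = src_bdev;
	from.sector = sector;
	from.count = count;

	to.bdev = dst_bdev;
	to.sector = sector;
	to.count = count;

	init_completion(&done);
	dm_kcopyd_copy(kc, &from, 1, &to, 0, example_copy_done, &done);
	wait_for_completion(&done);
	return 0;
}
#endif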

/*
 * Cancels a kcopyd job, e.g. someone might be deactivating a
 * mirror.
 */
#if 0
int kcopyd_cancel(struct kcopyd_job *job, int block)
{
	/* FIXME: finish */
	return -1;
}
#endif  /*  0  */

/*-----------------------------------------------------------------
 * Client setup
 *---------------------------------------------------------------*/
int dm_kcopyd_client_create(unsigned int nr_pages,
			    struct dm_kcopyd_client **result)
{
	int r = -ENOMEM;
	struct dm_kcopyd_client *kc;

	kc = kmalloc(sizeof(*kc), GFP_KERNEL);
	if (!kc)
		return -ENOMEM;

	spin_lock_init(&kc->lock);
	spin_lock_init(&kc->job_lock);
	INIT_LIST_HEAD(&kc->complete_jobs);
	INIT_LIST_HEAD(&kc->io_jobs);
	INIT_LIST_HEAD(&kc->pages_jobs);

	memset(kc->unplug, 0, sizeof(kc->unplug));

	kc->job_pool = mempool_create_slab_pool(MIN_JOBS, _job_cache);
	if (!kc->job_pool)
		goto bad_slab;

	INIT_WORK(&kc->kcopyd_work, do_work);
	kc->kcopyd_wq = alloc_workqueue("kcopyd",
					WQ_NON_REENTRANT | WQ_MEM_RECLAIM, 0);
	if (!kc->kcopyd_wq)
		goto bad_workqueue;

	kc->pages = NULL;
	kc->nr_pages = kc->nr_free_pages = 0;
	r = client_alloc_pages(kc, nr_pages);
	if (r)
		goto bad_client_pages;

	kc->io_client = dm_io_client_create(nr_pages);
	if (IS_ERR(kc->io_client)) {
		r = PTR_ERR(kc->io_client);
		goto bad_io_client;
	}

	init_waitqueue_head(&kc->destroyq);
	atomic_set(&kc->nr_jobs, 0);

	*result = kc;
	return 0;

bad_io_client:
	client_free_pages(kc);
bad_client_pages:
	destroy_workqueue(kc->kcopyd_wq);
bad_workqueue:
	mempool_destroy(kc->job_pool);
bad_slab:
	kfree(kc);

	return r;
}
EXPORT_SYMBOL(dm_kcopyd_client_create);

void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
{
	/* Wait for completion of all jobs submitted by this client. */
	wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));

	BUG_ON(!list_empty(&kc->complete_jobs));
	BUG_ON(!list_empty(&kc->io_jobs));
	BUG_ON(!list_empty(&kc->pages_jobs));
	destroy_workqueue(kc->kcopyd_wq);
	dm_io_client_destroy(kc->io_client);
	client_free_pages(kc);
	mempool_destroy(kc->job_pool);
	kfree(kc);
}
EXPORT_SYMBOL(dm_kcopyd_client_destroy);
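
/*
 * Illustrative sketch (not part of this driver): a typical client lifecycle.
 * A caller reserves a page pool at construction time and releases it when it
 * is destroyed; example_client_lifecycle() is a made-up name and the page
 * count is an arbitrary example value.
 */
#if 0
static int example_client_lifecycle(void)
{
	struct dm_kcopyd_client *kc;
	int r;

	/*
	 * Reserve 64 pages; real callers size this for the largest copy
	 * they expect to have in flight.
	 */
	r = dm_kcopyd_client_create(64, &kc);
	if (r)
		return r;

	/* ... submit work with dm_kcopyd_copy() ... */

	/* Blocks until every job submitted by this client has completed. */
	dm_kcopyd_client_destroy(kc);
	return 0;
}
#endif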