/*
 * Copyright (C) 2002 Sistina Software (UK) Limited.
 * Copyright (C) 2006 Red Hat GmbH
 *
 * This file is released under the GPL.
 *
 * Kcopyd provides a simple interface for copying an area of one
 * block-device to one or more other block-devices, with an asynchronous
 * completion notification.
 */

#include <linux/types.h>
#include <asm/atomic.h>
#include <linux/blkdev.h>
#include <linux/fs.h>
#include <linux/init.h>
#include <linux/list.h>
#include <linux/mempool.h>
#include <linux/module.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/workqueue.h>
#include <linux/mutex.h>
#include <linux/device-mapper.h>
#include <linux/dm-kcopyd.h>

#include "dm.h"

#define SUB_JOB_SIZE    128
#define SPLIT_COUNT     8
#define MIN_JOBS        8
#define RESERVE_PAGES   (DIV_ROUND_UP(SUB_JOB_SIZE << SECTOR_SHIFT, PAGE_SIZE))

/*-----------------------------------------------------------------
 * Each kcopyd client has its own little pool of preallocated
 * pages for kcopyd io.
 *---------------------------------------------------------------*/
struct dm_kcopyd_client {
        struct page_list *pages;
        unsigned nr_reserved_pages;
        unsigned nr_free_pages;

        struct dm_io_client *io_client;

        wait_queue_head_t destroyq;
        atomic_t nr_jobs;

        mempool_t *job_pool;

        struct workqueue_struct *kcopyd_wq;
        struct work_struct kcopyd_work;

/*
 * We maintain three lists of jobs:
 *
 * i)   jobs waiting for pages
 * ii)  jobs that have pages, and are waiting for the io to be issued.
 * iii) jobs that have completed.
 *
 * All three of these are protected by job_lock.
 */
        spinlock_t job_lock;
        struct list_head complete_jobs;
        struct list_head io_jobs;
        struct list_head pages_jobs;
};

static void wake(struct dm_kcopyd_client *kc)
{
        queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
}

/*
 * Obtain one page for the use of kcopyd.
 */
static struct page_list *alloc_pl(gfp_t gfp)
{
        struct page_list *pl;

        pl = kmalloc(sizeof(*pl), gfp);
        if (!pl)
                return NULL;

        pl->page = alloc_page(gfp);
        if (!pl->page) {
                kfree(pl);
                return NULL;
        }

        return pl;
}

static void free_pl(struct page_list *pl)
{
        __free_page(pl->page);
        kfree(pl);
}

/*
 * Add the provided pages to a client's free page list, releasing
 * back to the system any beyond the reserved_pages limit.
 */
static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
{
        struct page_list *next;

        do {
                next = pl->next;

                if (kc->nr_free_pages >= kc->nr_reserved_pages)
                        free_pl(pl);
                else {
                        pl->next = kc->pages;
                        kc->pages = pl;
                        kc->nr_free_pages++;
                }

                pl = next;
        } while (pl);
}

static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
                            unsigned int nr, struct page_list **pages)
{
        struct page_list *pl;

        *pages = NULL;

        do {
                pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY);
                if (unlikely(!pl)) {
                        /* Use reserved pages */
                        pl = kc->pages;
                        if (unlikely(!pl))
                                goto out_of_memory;
                        kc->pages = pl->next;
                        kc->nr_free_pages--;
                }
                pl->next = *pages;
                *pages = pl;
        } while (--nr);

        return 0;

out_of_memory:
        if (*pages)
                kcopyd_put_pages(kc, *pages);
        return -ENOMEM;
}

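/*
 * Worked example of the reserve sizing, assuming 4 KiB pages (PAGE_SIZE
 * is architecture dependent): one full sub job spans SUB_JOB_SIZE = 128
 * sectors = 128 << SECTOR_SHIFT = 65536 bytes, so RESERVE_PAGES comes to
 * DIV_ROUND_UP(65536, 4096) = 16 pages per client.  kcopyd_get_pages()
 * only dips into this reserve when the opportunistic
 * __GFP_NOWARN | __GFP_NORETRY allocation above fails, so copies can
 * still make progress under memory pressure.
 */
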
/*
 * These three functions resize the page pool.
 */
static void drop_pages(struct page_list *pl)
{
        struct page_list *next;

        while (pl) {
                next = pl->next;
                free_pl(pl);
                pl = next;
        }
}

/*
 * Allocate and reserve nr_pages for the use of a specific client.
 */
static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages)
{
        unsigned i;
        struct page_list *pl = NULL, *next;

        for (i = 0; i < nr_pages; i++) {
                next = alloc_pl(GFP_KERNEL);
                if (!next) {
                        if (pl)
                                drop_pages(pl);
                        return -ENOMEM;
                }
                next->next = pl;
                pl = next;
        }

        kc->nr_reserved_pages += nr_pages;
        kcopyd_put_pages(kc, pl);

        return 0;
}

static void client_free_pages(struct dm_kcopyd_client *kc)
{
        BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages);
        drop_pages(kc->pages);
        kc->pages = NULL;
        kc->nr_free_pages = kc->nr_reserved_pages = 0;
}

/*-----------------------------------------------------------------
 * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
 * for this reason we use a mempool to prevent the client from
 * ever having to do io (which could cause a deadlock).
 *---------------------------------------------------------------*/
struct kcopyd_job {
        struct dm_kcopyd_client *kc;
        struct list_head list;
        unsigned long flags;

        /*
         * Error state of the job.
         */
        int read_err;
        unsigned long write_err;

        /*
         * Either READ or WRITE
         */
        int rw;
        struct dm_io_region source;

        /*
         * The destinations for the transfer.
         */
        unsigned int num_dests;
        struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];

        sector_t offset;
        unsigned int nr_pages;
        struct page_list *pages;

        /*
         * Set this to ensure you are notified when the job has
         * completed.  'context' is for callback to use.
         */
        dm_kcopyd_notify_fn fn;
        void *context;

        /*
         * These fields are only used if the job has been split
         * into more manageable parts.
         */
        struct mutex lock;
        atomic_t sub_jobs;
        sector_t progress;

        struct kcopyd_job *master_job;
};

static struct kmem_cache *_job_cache;

int __init dm_kcopyd_init(void)
{
        _job_cache = kmem_cache_create("kcopyd_job",
                                sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1),
                                __alignof__(struct kcopyd_job), 0, NULL);
        if (!_job_cache)
                return -ENOMEM;

        return 0;
}

void dm_kcopyd_exit(void)
{
        kmem_cache_destroy(_job_cache);
        _job_cache = NULL;
}

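/*
 * Layout of one _job_cache object (one mempool_alloc() from job_pool),
 * as used by dm_kcopyd_copy() and split_job() below:
 *
 *      struct kcopyd_job jobs[SPLIT_COUNT + 1];
 *
 *      jobs[0]               the master job, the pointer handed out by
 *                            mempool_alloc() and the only one freed back
 *      jobs[1..SPLIT_COUNT]  sub jobs, addressed as master_job[i + 1]
 *
 * The sub jobs therefore live and die with their master job.
 */
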
/*
 * Functions to push and pop a job onto the head of a given job
 * list.
 */
static struct kcopyd_job *pop(struct list_head *jobs,
                              struct dm_kcopyd_client *kc)
{
        struct kcopyd_job *job = NULL;
        unsigned long flags;

        spin_lock_irqsave(&kc->job_lock, flags);

        if (!list_empty(jobs)) {
                job = list_entry(jobs->next, struct kcopyd_job, list);
                list_del(&job->list);
        }
        spin_unlock_irqrestore(&kc->job_lock, flags);

        return job;
}

static void push(struct list_head *jobs, struct kcopyd_job *job)
{
        unsigned long flags;
        struct dm_kcopyd_client *kc = job->kc;

        spin_lock_irqsave(&kc->job_lock, flags);
        list_add_tail(&job->list, jobs);
        spin_unlock_irqrestore(&kc->job_lock, flags);
}

static void push_head(struct list_head *jobs, struct kcopyd_job *job)
{
        unsigned long flags;
        struct dm_kcopyd_client *kc = job->kc;

        spin_lock_irqsave(&kc->job_lock, flags);
        list_add(&job->list, jobs);
        spin_unlock_irqrestore(&kc->job_lock, flags);
}

/*
 * These three functions process 1 item from the corresponding
 * job list.
 *
 * They return:
 * < 0: error
 *   0: success
 * > 0: can't process yet.
 */
static int run_complete_job(struct kcopyd_job *job)
{
        void *context = job->context;
        int read_err = job->read_err;
        unsigned long write_err = job->write_err;
        dm_kcopyd_notify_fn fn = job->fn;
        struct dm_kcopyd_client *kc = job->kc;

        if (job->pages)
                kcopyd_put_pages(kc, job->pages);
        /*
         * If this is the master job, the sub jobs have already
         * completed so we can free everything.
         */
        if (job->master_job == job)
                mempool_free(job, kc->job_pool);
        fn(read_err, write_err, context);

        if (atomic_dec_and_test(&kc->nr_jobs))
                wake_up(&kc->destroyq);

        return 0;
}

static void complete_io(unsigned long error, void *context)
{
        struct kcopyd_job *job = (struct kcopyd_job *) context;
        struct dm_kcopyd_client *kc = job->kc;

        if (error) {
                if (job->rw == WRITE)
                        job->write_err |= error;
                else
                        job->read_err = 1;

                if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
                        push(&kc->complete_jobs, job);
                        wake(kc);
                        return;
                }
        }

        if (job->rw == WRITE)
                push(&kc->complete_jobs, job);

        else {
                job->rw = WRITE;
                push(&kc->io_jobs, job);
        }

        wake(kc);
}

/*
 * Request io for a particular job, using its page list as the
 * data buffer.
 */
static int run_io_job(struct kcopyd_job *job)
{
        int r;
        struct dm_io_request io_req = {
                .bi_rw = job->rw,
                .mem.type = DM_IO_PAGE_LIST,
                .mem.ptr.pl = job->pages,
                .mem.offset = job->offset,
                .notify.fn = complete_io,
                .notify.context = job,
                .client = job->kc->io_client,
        };

        if (job->rw == READ)
                r = dm_io(&io_req, 1, &job->source, NULL);
        else
                r = dm_io(&io_req, job->num_dests, job->dests, NULL);

        return r;
}

static int run_pages_job(struct kcopyd_job *job)
{
        int r;

        job->nr_pages = dm_div_up(job->dests[0].count + job->offset,
                                  PAGE_SIZE >> 9);
        r = kcopyd_get_pages(job->kc, job->nr_pages, &job->pages);
        if (!r) {
                /* this job is ready for io */
                push(&job->kc->io_jobs, job);
                return 0;
        }

        if (r == -ENOMEM)
                /* can't complete now */
                return 1;

        return r;
}

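/*
 * Lifecycle of a job through the three lists, as driven by the helpers
 * above and do_work() below: a dispatched job starts on pages_jobs;
 * run_pages_job() attaches a page list and moves it to io_jobs;
 * run_io_job() issues the READ of the source and, in complete_io(), the
 * finished read is flipped to WRITE and requeued on io_jobs so the same
 * pages are written to every destination; a completed WRITE (or any
 * failed io, unless DM_KCOPYD_IGNORE_ERROR is set) lands on
 * complete_jobs, where run_complete_job() returns the pages and calls
 * the notify fn.
 */
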
/*
 * Run through a list for as long as possible. Returns the count
 * of successful jobs.
 */
static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
                        int (*fn) (struct kcopyd_job *))
{
        struct kcopyd_job *job;
        int r, count = 0;

        while ((job = pop(jobs, kc))) {

                r = fn(job);

                if (r < 0) {
                        /* error this rogue job */
                        if (job->rw == WRITE)
                                job->write_err = (unsigned long) -1L;
                        else
                                job->read_err = 1;
                        push(&kc->complete_jobs, job);
                        break;
                }

                if (r > 0) {
                        /*
                         * We couldn't service this job ATM, so
                         * push this job back onto the list.
                         */
                        push_head(jobs, job);
                        break;
                }

                count++;
        }

        return count;
}

/*
 * kcopyd does this every time it's woken up.
 */
static void do_work(struct work_struct *work)
{
        struct dm_kcopyd_client *kc = container_of(work,
                                        struct dm_kcopyd_client, kcopyd_work);
        struct blk_plug plug;

        /*
         * The order that these are called is *very* important.
         * complete jobs can free some pages for pages jobs.
         * Pages jobs when successful will jump onto the io jobs
         * list.  io jobs call wake when they complete and it all
         * starts again.
         */
        blk_start_plug(&plug);
        process_jobs(&kc->complete_jobs, kc, run_complete_job);
        process_jobs(&kc->pages_jobs, kc, run_pages_job);
        process_jobs(&kc->io_jobs, kc, run_io_job);
        blk_finish_plug(&plug);
}

/*
 * If we are copying a small region we just dispatch a single job
 * to do the copy, otherwise the io has to be split up into many
 * jobs.
 */
static void dispatch_job(struct kcopyd_job *job)
{
        struct dm_kcopyd_client *kc = job->kc;
        atomic_inc(&kc->nr_jobs);
        if (unlikely(!job->source.count))
                push(&kc->complete_jobs, job);
        else
                push(&kc->pages_jobs, job);
        wake(kc);
}

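/*
 * segment_complete() is both the notify fn for a sub job and the place
 * where the next slice of the master job is carved off: at most
 * SPLIT_COUNT sub jobs are in flight, and each one that finishes picks
 * up the next SUB_JOB_SIZE-sector chunk (tracked by job->progress under
 * job->lock) until the whole source region has been covered.
 */
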
static void segment_complete(int read_err, unsigned long write_err,
                             void *context)
{
        /* FIXME: tidy this function */
        sector_t progress = 0;
        sector_t count = 0;
        struct kcopyd_job *sub_job = (struct kcopyd_job *) context;
        struct kcopyd_job *job = sub_job->master_job;
        struct dm_kcopyd_client *kc = job->kc;

        mutex_lock(&job->lock);

        /* update the error */
        if (read_err)
                job->read_err = 1;

        if (write_err)
                job->write_err |= write_err;

        /*
         * Only dispatch more work if there hasn't been an error.
         */
        if ((!job->read_err && !job->write_err) ||
            test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
                /* get the next chunk of work */
                progress = job->progress;
                count = job->source.count - progress;
                if (count) {
                        if (count > SUB_JOB_SIZE)
                                count = SUB_JOB_SIZE;

                        job->progress += count;
                }
        }
        mutex_unlock(&job->lock);

        if (count) {
                int i;

                *sub_job = *job;
                sub_job->source.sector += progress;
                sub_job->source.count = count;

                for (i = 0; i < job->num_dests; i++) {
                        sub_job->dests[i].sector += progress;
                        sub_job->dests[i].count = count;
                }

                sub_job->fn = segment_complete;
                sub_job->context = sub_job;
                dispatch_job(sub_job);

        } else if (atomic_dec_and_test(&job->sub_jobs)) {

                /*
                 * Queue the completion callback to the kcopyd thread.
                 *
                 * Some callers assume that all the completions are called
                 * from a single thread and don't race with each other.
                 *
                 * We must not call the callback directly here because this
                 * code may not be executing in the thread.
                 */
                push(&kc->complete_jobs, job);
                wake(kc);
        }
}

/*
 * Create some sub jobs to share the work between them.
 */
static void split_job(struct kcopyd_job *master_job)
{
        int i;

        atomic_inc(&master_job->kc->nr_jobs);

        atomic_set(&master_job->sub_jobs, SPLIT_COUNT);
        for (i = 0; i < SPLIT_COUNT; i++) {
                master_job[i + 1].master_job = master_job;
                segment_complete(0, 0u, &master_job[i + 1]);
        }
}

int dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
                   unsigned int num_dests, struct dm_io_region *dests,
                   unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
{
        struct kcopyd_job *job;

        /*
         * Allocate an array of jobs consisting of one master job
         * followed by SPLIT_COUNT sub jobs.
         */
        job = mempool_alloc(kc->job_pool, GFP_NOIO);

        /*
         * set up for the read.
         */
        job->kc = kc;
        job->flags = flags;
        job->read_err = 0;
        job->write_err = 0;
        job->rw = READ;

        job->source = *from;

        job->num_dests = num_dests;
        memcpy(&job->dests, dests, sizeof(*dests) * num_dests);

        job->offset = 0;
        job->nr_pages = 0;
        job->pages = NULL;

        job->fn = fn;
        job->context = context;
        job->master_job = job;

        if (job->source.count <= SUB_JOB_SIZE)
                dispatch_job(job);
        else {
                mutex_init(&job->lock);
                job->progress = 0;
                split_job(job);
        }

        return 0;
}
EXPORT_SYMBOL(dm_kcopyd_copy);

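/*
 * Typical use, sketched from a caller's point of view (my_copy_done,
 * my_ctx, ctx, src_bdev and dst_bdev are placeholders, not part of this
 * API):
 *
 *      static void my_copy_done(int read_err, unsigned long write_err,
 *                               void *context)
 *      {
 *              struct my_ctx *ctx = context;
 *              // read_err is non-zero on a read failure; write_err is a
 *              // bitmask with one bit per failed destination region.
 *      }
 *
 *      struct dm_kcopyd_client *kc = dm_kcopyd_client_create();
 *      struct dm_io_region from = { .bdev = src_bdev,
 *                                   .sector = 0, .count = 1024 };
 *      struct dm_io_region to = { .bdev = dst_bdev,
 *                                 .sector = 0, .count = 1024 };
 *
 *      if (!IS_ERR(kc))
 *              dm_kcopyd_copy(kc, &from, 1, &to, 0, my_copy_done, ctx);
 *
 * The callback runs from the client's kcopyd workqueue once every
 * destination write has completed (or failed).
 */
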
/*
 * Cancels a kcopyd job, e.g. someone might be deactivating a
 * mirror.
 */
#if 0
int kcopyd_cancel(struct kcopyd_job *job, int block)
{
        /* FIXME: finish */
        return -1;
}
#endif  /*  0  */

/*-----------------------------------------------------------------
 * Client setup
 *---------------------------------------------------------------*/
struct dm_kcopyd_client *dm_kcopyd_client_create(void)
{
        int r = -ENOMEM;
        struct dm_kcopyd_client *kc;

        kc = kmalloc(sizeof(*kc), GFP_KERNEL);
        if (!kc)
                return ERR_PTR(-ENOMEM);

        spin_lock_init(&kc->job_lock);
        INIT_LIST_HEAD(&kc->complete_jobs);
        INIT_LIST_HEAD(&kc->io_jobs);
        INIT_LIST_HEAD(&kc->pages_jobs);

        kc->job_pool = mempool_create_slab_pool(MIN_JOBS, _job_cache);
        if (!kc->job_pool)
                goto bad_slab;

        INIT_WORK(&kc->kcopyd_work, do_work);
        kc->kcopyd_wq = alloc_workqueue("kcopyd",
                                        WQ_NON_REENTRANT | WQ_MEM_RECLAIM, 0);
        if (!kc->kcopyd_wq)
                goto bad_workqueue;

        kc->pages = NULL;
        kc->nr_reserved_pages = kc->nr_free_pages = 0;
        r = client_reserve_pages(kc, RESERVE_PAGES);
        if (r)
                goto bad_client_pages;

        kc->io_client = dm_io_client_create();
        if (IS_ERR(kc->io_client)) {
                r = PTR_ERR(kc->io_client);
                goto bad_io_client;
        }

        init_waitqueue_head(&kc->destroyq);
        atomic_set(&kc->nr_jobs, 0);

        return kc;

bad_io_client:
        client_free_pages(kc);
bad_client_pages:
        destroy_workqueue(kc->kcopyd_wq);
bad_workqueue:
        mempool_destroy(kc->job_pool);
bad_slab:
        kfree(kc);

        return ERR_PTR(r);
}
EXPORT_SYMBOL(dm_kcopyd_client_create);

void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
{
        /* Wait for completion of all jobs submitted by this client. */
        wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));

        BUG_ON(!list_empty(&kc->complete_jobs));
        BUG_ON(!list_empty(&kc->io_jobs));
        BUG_ON(!list_empty(&kc->pages_jobs));
        destroy_workqueue(kc->kcopyd_wq);
        dm_io_client_destroy(kc->io_client);
        client_free_pages(kc);
        mempool_destroy(kc->job_pool);
        kfree(kc);
}
EXPORT_SYMBOL(dm_kcopyd_client_destroy);