/*
 * Copyright (C) 2002 Sistina Software (UK) Limited.
 * Copyright (C) 2006 Red Hat GmbH
 *
 * This file is released under the GPL.
 *
 * Kcopyd provides a simple interface for copying an area of one
 * block-device to one or more other block-devices, with an asynchronous
 * completion notification.
 */

#include <linux/types.h>
#include <asm/atomic.h>
#include <linux/blkdev.h>
#include <linux/fs.h>
#include <linux/init.h>
#include <linux/list.h>
#include <linux/mempool.h>
#include <linux/module.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/workqueue.h>
#include <linux/mutex.h>
#include <linux/device-mapper.h>
#include <linux/dm-kcopyd.h>

#include "dm.h"

/*-----------------------------------------------------------------
 * Each kcopyd client has its own little pool of preallocated
 * pages for kcopyd io.
 *---------------------------------------------------------------*/
struct dm_kcopyd_client {
	spinlock_t lock;
	struct page_list *pages;
	unsigned int nr_pages;
	unsigned int nr_free_pages;

	struct dm_io_client *io_client;

	wait_queue_head_t destroyq;
	atomic_t nr_jobs;

	mempool_t *job_pool;

	struct workqueue_struct *kcopyd_wq;
	struct work_struct kcopyd_work;

	/*
	 * We maintain three lists of jobs:
	 *
	 * i)   jobs waiting for pages
	 * ii)  jobs that have pages, and are waiting for the io to be issued.
	 * iii) jobs that have completed.
	 *
	 * All three of these are protected by job_lock.
	 */
	spinlock_t job_lock;
	struct list_head complete_jobs;
	struct list_head io_jobs;
	struct list_head pages_jobs;
};

static void wake(struct dm_kcopyd_client *kc)
{
	queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
}

static struct page_list *alloc_pl(void)
{
	struct page_list *pl;

	pl = kmalloc(sizeof(*pl), GFP_KERNEL);
	if (!pl)
		return NULL;

	pl->page = alloc_page(GFP_KERNEL);
	if (!pl->page) {
		kfree(pl);
		return NULL;
	}

	return pl;
}

static void free_pl(struct page_list *pl)
{
	__free_page(pl->page);
	kfree(pl);
}

static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
			    unsigned int nr, struct page_list **pages)
{
	struct page_list *pl;

	spin_lock(&kc->lock);
	if (kc->nr_free_pages < nr) {
		spin_unlock(&kc->lock);
		return -ENOMEM;
	}

	kc->nr_free_pages -= nr;
	for (*pages = pl = kc->pages; --nr; pl = pl->next)
		;

	kc->pages = pl->next;
	pl->next = NULL;

	spin_unlock(&kc->lock);

	return 0;
}

static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
{
	struct page_list *cursor;

	spin_lock(&kc->lock);
	for (cursor = pl; cursor->next; cursor = cursor->next)
		kc->nr_free_pages++;

	kc->nr_free_pages++;
	cursor->next = kc->pages;
	kc->pages = pl;
	spin_unlock(&kc->lock);
}
/*
 * These three functions resize the page pool.
 */
static void drop_pages(struct page_list *pl)
{
	struct page_list *next;

	while (pl) {
		next = pl->next;
		free_pl(pl);
		pl = next;
	}
}

static int client_alloc_pages(struct dm_kcopyd_client *kc, unsigned int nr)
{
	unsigned int i;
	struct page_list *pl = NULL, *next;

	for (i = 0; i < nr; i++) {
		next = alloc_pl();
		if (!next) {
			if (pl)
				drop_pages(pl);
			return -ENOMEM;
		}
		next->next = pl;
		pl = next;
	}

	kcopyd_put_pages(kc, pl);
	kc->nr_pages += nr;
	return 0;
}

static void client_free_pages(struct dm_kcopyd_client *kc)
{
	BUG_ON(kc->nr_free_pages != kc->nr_pages);
	drop_pages(kc->pages);
	kc->pages = NULL;
	kc->nr_free_pages = kc->nr_pages = 0;
}

/*-----------------------------------------------------------------
 * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
 * so we use a mempool to prevent the client from ever having
 * to do io (which could cause a deadlock).
 *---------------------------------------------------------------*/
struct kcopyd_job {
	struct dm_kcopyd_client *kc;
	struct list_head list;
	unsigned long flags;

	/*
	 * Error state of the job.
	 */
	int read_err;
	unsigned long write_err;

	/*
	 * Either READ or WRITE
	 */
	int rw;
	struct dm_io_region source;

	/*
	 * The destinations for the transfer.
	 */
	unsigned int num_dests;
	struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];

	sector_t offset;
	unsigned int nr_pages;
	struct page_list *pages;

	/*
	 * Set this to ensure you are notified when the job has
	 * completed.  'context' is for the callback to use.
	 */
	dm_kcopyd_notify_fn fn;
	void *context;

	/*
	 * These fields are only used if the job has been split
	 * into more manageable parts.
	 */
	struct mutex lock;
	atomic_t sub_jobs;
	sector_t progress;
};

/* FIXME: this should scale with the number of pages */
#define MIN_JOBS 512

static struct kmem_cache *_job_cache;

int __init dm_kcopyd_init(void)
{
	_job_cache = KMEM_CACHE(kcopyd_job, 0);
	if (!_job_cache)
		return -ENOMEM;

	return 0;
}

void dm_kcopyd_exit(void)
{
	kmem_cache_destroy(_job_cache);
	_job_cache = NULL;
}
/*
 * Functions to push a job onto, and pop a job off, a given job list.
 */
static struct kcopyd_job *pop(struct list_head *jobs,
			      struct dm_kcopyd_client *kc)
{
	struct kcopyd_job *job = NULL;
	unsigned long flags;

	spin_lock_irqsave(&kc->job_lock, flags);

	if (!list_empty(jobs)) {
		job = list_entry(jobs->next, struct kcopyd_job, list);
		list_del(&job->list);
	}
	spin_unlock_irqrestore(&kc->job_lock, flags);

	return job;
}

static void push(struct list_head *jobs, struct kcopyd_job *job)
{
	unsigned long flags;
	struct dm_kcopyd_client *kc = job->kc;

	spin_lock_irqsave(&kc->job_lock, flags);
	list_add_tail(&job->list, jobs);
	spin_unlock_irqrestore(&kc->job_lock, flags);
}

static void push_head(struct list_head *jobs, struct kcopyd_job *job)
{
	unsigned long flags;
	struct dm_kcopyd_client *kc = job->kc;

	spin_lock_irqsave(&kc->job_lock, flags);
	list_add(&job->list, jobs);
	spin_unlock_irqrestore(&kc->job_lock, flags);
}

/*
 * These three functions process 1 item from the corresponding
 * job list.
 *
 * They return:
 * < 0: error
 *   0: success
 * > 0: can't process yet.
 */
static int run_complete_job(struct kcopyd_job *job)
{
	void *context = job->context;
	int read_err = job->read_err;
	unsigned long write_err = job->write_err;
	dm_kcopyd_notify_fn fn = job->fn;
	struct dm_kcopyd_client *kc = job->kc;

	if (job->pages)
		kcopyd_put_pages(kc, job->pages);
	mempool_free(job, kc->job_pool);
	fn(read_err, write_err, context);

	if (atomic_dec_and_test(&kc->nr_jobs))
		wake_up(&kc->destroyq);

	return 0;
}

static void complete_io(unsigned long error, void *context)
{
	struct kcopyd_job *job = (struct kcopyd_job *) context;
	struct dm_kcopyd_client *kc = job->kc;

	if (error) {
		if (job->rw == WRITE)
			job->write_err |= error;
		else
			job->read_err = 1;

		if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
			push(&kc->complete_jobs, job);
			wake(kc);
			return;
		}
	}

	if (job->rw == WRITE)
		push(&kc->complete_jobs, job);
	else {
		job->rw = WRITE;
		push(&kc->io_jobs, job);
	}

	wake(kc);
}

/*
 * Issue the io for a job: either the read from the single source or
 * the writes to all the destinations.
 */
static int run_io_job(struct kcopyd_job *job)
{
	int r;
	struct dm_io_request io_req = {
		.bi_rw = job->rw,
		.mem.type = DM_IO_PAGE_LIST,
		.mem.ptr.pl = job->pages,
		.mem.offset = job->offset,
		.notify.fn = complete_io,
		.notify.context = job,
		.client = job->kc->io_client,
	};

	if (job->rw == READ)
		r = dm_io(&io_req, 1, &job->source, NULL);
	else
		r = dm_io(&io_req, job->num_dests, job->dests, NULL);

	return r;
}

static int run_pages_job(struct kcopyd_job *job)
{
	int r;

	job->nr_pages = dm_div_up(job->dests[0].count + job->offset,
				  PAGE_SIZE >> 9);
	r = kcopyd_get_pages(job->kc, job->nr_pages, &job->pages);
	if (!r) {
		/* this job is ready for io */
		push(&job->kc->io_jobs, job);
		return 0;
	}

	if (r == -ENOMEM)
		/* can't complete now */
		return 1;

	return r;
}
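
/*
 * For a sense of the page arithmetic above (illustrative only, assuming
 * 4 KiB pages and 512-byte sectors): PAGE_SIZE >> 9 is 8 sectors per
 * page, so run_pages_job() asks for roughly count / 8 pages, rounded
 * up.  A 128-sector sub-job (SUB_JOB_SIZE, defined below) therefore
 * needs 16 pages from the client's preallocated pool, and a job waits
 * on the pages_jobs list until that many pages are free.
 */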
/*
 * Run through a list for as long as possible.  Returns the count
 * of successful jobs.
 */
static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
			int (*fn) (struct kcopyd_job *))
{
	struct kcopyd_job *job;
	int r, count = 0;

	while ((job = pop(jobs, kc))) {

		r = fn(job);

		if (r < 0) {
			/* error this rogue job */
			if (job->rw == WRITE)
				job->write_err = (unsigned long) -1L;
			else
				job->read_err = 1;
			push(&kc->complete_jobs, job);
			break;
		}

		if (r > 0) {
			/*
			 * We couldn't service this job ATM, so
			 * push this job back onto the list.
			 */
			push_head(jobs, job);
			break;
		}

		count++;
	}

	return count;
}

/*
 * kcopyd does this every time it's woken up.
 */
static void do_work(struct work_struct *work)
{
	struct dm_kcopyd_client *kc = container_of(work,
					struct dm_kcopyd_client, kcopyd_work);
	struct blk_plug plug;

	/*
	 * The order that these are called is *very* important.
	 * complete jobs can free some pages for pages jobs.
	 * Pages jobs when successful will jump onto the io jobs
	 * list.  io jobs call wake when they complete and it all
	 * starts again.
	 */
	blk_start_plug(&plug);
	process_jobs(&kc->complete_jobs, kc, run_complete_job);
	process_jobs(&kc->pages_jobs, kc, run_pages_job);
	process_jobs(&kc->io_jobs, kc, run_io_job);
	blk_finish_plug(&plug);
}

/*
 * If we are copying a small region we just dispatch a single job
 * to do the copy, otherwise the io has to be split up into many
 * jobs.
 */
static void dispatch_job(struct kcopyd_job *job)
{
	struct dm_kcopyd_client *kc = job->kc;

	atomic_inc(&kc->nr_jobs);
	if (unlikely(!job->source.count))
		push(&kc->complete_jobs, job);
	else
		push(&kc->pages_jobs, job);
	wake(kc);
}
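
/*
 * For a sense of scale (illustrative arithmetic, assuming 512-byte
 * sectors and 4 KiB pages): SUB_JOB_SIZE below is 128 sectors, i.e.
 * 64 KiB per sub-job, and split_job() keeps SPLIT_COUNT (8) sub-jobs
 * in flight.  A large copy (say 1 MiB, 2048 sectors) is thus carved
 * into 16 consecutive chunks, with at most 8 of them being read or
 * written at any one time and at most 8 * 16 = 128 pages of the
 * client's pool in use at once.
 */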
#define SUB_JOB_SIZE 128
static void segment_complete(int read_err, unsigned long write_err,
			     void *context)
{
	/* FIXME: tidy this function */
	sector_t progress = 0;
	sector_t count = 0;
	struct kcopyd_job *job = (struct kcopyd_job *) context;
	struct dm_kcopyd_client *kc = job->kc;

	mutex_lock(&job->lock);

	/* update the error */
	if (read_err)
		job->read_err = 1;

	if (write_err)
		job->write_err |= write_err;

	/*
	 * Only dispatch more work if there hasn't been an error.
	 */
	if ((!job->read_err && !job->write_err) ||
	    test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
		/* get the next chunk of work */
		progress = job->progress;
		count = job->source.count - progress;
		if (count) {
			if (count > SUB_JOB_SIZE)
				count = SUB_JOB_SIZE;

			job->progress += count;
		}
	}
	mutex_unlock(&job->lock);

	if (count) {
		int i;
		struct kcopyd_job *sub_job = mempool_alloc(kc->job_pool,
							   GFP_NOIO);

		*sub_job = *job;
		sub_job->source.sector += progress;
		sub_job->source.count = count;

		for (i = 0; i < job->num_dests; i++) {
			sub_job->dests[i].sector += progress;
			sub_job->dests[i].count = count;
		}

		sub_job->fn = segment_complete;
		sub_job->context = job;
		dispatch_job(sub_job);

	} else if (atomic_dec_and_test(&job->sub_jobs)) {

		/*
		 * Queue the completion callback to the kcopyd thread.
		 *
		 * Some callers assume that all the completions are called
		 * from a single thread and don't race with each other.
		 *
		 * We must not call the callback directly here because this
		 * code may not be executing in the thread.
		 */
		push(&kc->complete_jobs, job);
		wake(kc);
	}
}

/*
 * Create some little jobs that between them will do the move.
 */
#define SPLIT_COUNT 8
static void split_job(struct kcopyd_job *job)
{
	int i;

	atomic_inc(&job->kc->nr_jobs);

	atomic_set(&job->sub_jobs, SPLIT_COUNT);
	for (i = 0; i < SPLIT_COUNT; i++)
		segment_complete(0, 0u, job);
}

int dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
		   unsigned int num_dests, struct dm_io_region *dests,
		   unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
{
	struct kcopyd_job *job;

	/*
	 * Allocate a new job.
	 */
	job = mempool_alloc(kc->job_pool, GFP_NOIO);

	/*
	 * Set up for the read.
	 */
	job->kc = kc;
	job->flags = flags;
	job->read_err = 0;
	job->write_err = 0;
	job->rw = READ;

	job->source = *from;

	job->num_dests = num_dests;
	memcpy(&job->dests, dests, sizeof(*dests) * num_dests);

	job->offset = 0;
	job->nr_pages = 0;
	job->pages = NULL;

	job->fn = fn;
	job->context = context;

	if (job->source.count < SUB_JOB_SIZE)
		dispatch_job(job);
	else {
		mutex_init(&job->lock);
		job->progress = 0;
		split_job(job);
	}

	return 0;
}
EXPORT_SYMBOL(dm_kcopyd_copy);

/*
 * Cancels a kcopyd job, e.g. someone might be deactivating a
 * mirror.
 */
#if 0
int kcopyd_cancel(struct kcopyd_job *job, int block)
{
	/* FIXME: finish */
	return -1;
}
#endif /* 0 */
/*-----------------------------------------------------------------
 * Client setup
 *---------------------------------------------------------------*/
int dm_kcopyd_client_create(unsigned int nr_pages,
			    struct dm_kcopyd_client **result)
{
	int r = -ENOMEM;
	struct dm_kcopyd_client *kc;

	kc = kmalloc(sizeof(*kc), GFP_KERNEL);
	if (!kc)
		return -ENOMEM;

	spin_lock_init(&kc->lock);
	spin_lock_init(&kc->job_lock);
	INIT_LIST_HEAD(&kc->complete_jobs);
	INIT_LIST_HEAD(&kc->io_jobs);
	INIT_LIST_HEAD(&kc->pages_jobs);

	kc->job_pool = mempool_create_slab_pool(MIN_JOBS, _job_cache);
	if (!kc->job_pool)
		goto bad_slab;

	INIT_WORK(&kc->kcopyd_work, do_work);
	kc->kcopyd_wq = alloc_workqueue("kcopyd",
					WQ_NON_REENTRANT | WQ_MEM_RECLAIM, 0);
	if (!kc->kcopyd_wq)
		goto bad_workqueue;

	kc->pages = NULL;
	kc->nr_pages = kc->nr_free_pages = 0;
	r = client_alloc_pages(kc, nr_pages);
	if (r)
		goto bad_client_pages;

	kc->io_client = dm_io_client_create(nr_pages);
	if (IS_ERR(kc->io_client)) {
		r = PTR_ERR(kc->io_client);
		goto bad_io_client;
	}

	init_waitqueue_head(&kc->destroyq);
	atomic_set(&kc->nr_jobs, 0);

	*result = kc;
	return 0;

bad_io_client:
	client_free_pages(kc);
bad_client_pages:
	destroy_workqueue(kc->kcopyd_wq);
bad_workqueue:
	mempool_destroy(kc->job_pool);
bad_slab:
	kfree(kc);

	return r;
}
EXPORT_SYMBOL(dm_kcopyd_client_create);

void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
{
	/* Wait for completion of all jobs submitted by this client. */
	wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));

	BUG_ON(!list_empty(&kc->complete_jobs));
	BUG_ON(!list_empty(&kc->io_jobs));
	BUG_ON(!list_empty(&kc->pages_jobs));
	destroy_workqueue(kc->kcopyd_wq);
	dm_io_client_destroy(kc->io_client);
	client_free_pages(kc);
	mempool_destroy(kc->job_pool);
	kfree(kc);
}
EXPORT_SYMBOL(dm_kcopyd_client_destroy);
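
/*
 * A minimal usage sketch of the client interface above, kept under
 * "#if 0" like kcopyd_cancel() so it is never built.  The function
 * names (example_copy, example_copy_done), the 32-page pool size and
 * the zero start sectors are illustrative assumptions, not part of
 * this file; real callers such as dm-mirror typically keep one
 * long-lived client rather than creating a client per copy.
 */
#if 0
static void example_copy_done(int read_err, unsigned long write_err,
			      void *context)
{
	/* Runs on the kcopyd workqueue once every destination is written. */
	if (read_err || write_err)
		DMERR("example copy failed: read_err=%d write_err=%lu",
		      read_err, write_err);
	complete((struct completion *) context);
}

static int example_copy(struct block_device *src, struct block_device *dest,
			sector_t nr_sectors)
{
	struct dm_kcopyd_client *kc;
	struct dm_io_region from, to;
	struct completion done;
	int r;

	/* Reserve a small pool of pages for this client's io. */
	r = dm_kcopyd_client_create(32, &kc);
	if (r)
		return r;

	from.bdev = src;
	from.sector = 0;
	from.count = nr_sectors;

	to.bdev = dest;
	to.sector = 0;
	to.count = nr_sectors;

	/* Fire the asynchronous copy and wait for the completion callback. */
	init_completion(&done);
	r = dm_kcopyd_copy(kc, &from, 1, &to, 0, example_copy_done, &done);
	if (!r)
		wait_for_completion(&done);

	dm_kcopyd_client_destroy(kc);
	return r;
}
#endif /* 0 */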