/*
 * Copyright (C) 2002 Sistina Software (UK) Limited.
 * Copyright (C) 2006 Red Hat GmbH
 *
 * This file is released under the GPL.
 *
 * Kcopyd provides a simple interface for copying an area of one
 * block-device to one or more other block-devices, with an asynchronous
 * completion notification.
 */

#include <linux/types.h>
#include <asm/atomic.h>
#include <linux/blkdev.h>
#include <linux/fs.h>
#include <linux/init.h>
#include <linux/list.h>
#include <linux/mempool.h>
#include <linux/module.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/workqueue.h>
#include <linux/mutex.h>
#include <linux/dm-kcopyd.h>

#include "dm.h"

/*-----------------------------------------------------------------
 * Each kcopyd client has its own little pool of preallocated
 * pages for kcopyd io.
 *---------------------------------------------------------------*/
struct dm_kcopyd_client {
	spinlock_t lock;
	struct page_list *pages;
	unsigned int nr_pages;
	unsigned int nr_free_pages;

	struct dm_io_client *io_client;

	wait_queue_head_t destroyq;
	atomic_t nr_jobs;

	mempool_t *job_pool;

	struct workqueue_struct *kcopyd_wq;
	struct work_struct kcopyd_work;

	/*
	 * We maintain three lists of jobs:
	 *
	 * i)   jobs waiting for pages
	 * ii)  jobs that have pages, and are waiting for the io to be issued.
	 * iii) jobs that have completed.
	 *
	 * All three of these are protected by job_lock.
	 */
	spinlock_t job_lock;
	struct list_head complete_jobs;
	struct list_head io_jobs;
	struct list_head pages_jobs;
};

static void wake(struct dm_kcopyd_client *kc)
{
	queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
}

static struct page_list *alloc_pl(void)
{
	struct page_list *pl;

	pl = kmalloc(sizeof(*pl), GFP_KERNEL);
	if (!pl)
		return NULL;

	pl->page = alloc_page(GFP_KERNEL);
	if (!pl->page) {
		kfree(pl);
		return NULL;
	}

	return pl;
}

static void free_pl(struct page_list *pl)
{
	__free_page(pl->page);
	kfree(pl);
}

static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
			    unsigned int nr, struct page_list **pages)
{
	struct page_list *pl;

	spin_lock(&kc->lock);
	if (kc->nr_free_pages < nr) {
		spin_unlock(&kc->lock);
		return -ENOMEM;
	}

	kc->nr_free_pages -= nr;
	for (*pages = pl = kc->pages; --nr; pl = pl->next)
		;

	kc->pages = pl->next;
	pl->next = NULL;

	spin_unlock(&kc->lock);

	return 0;
}

static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
{
	struct page_list *cursor;

	spin_lock(&kc->lock);
	for (cursor = pl; cursor->next; cursor = cursor->next)
		kc->nr_free_pages++;

	kc->nr_free_pages++;
	cursor->next = kc->pages;
	kc->pages = pl;
	spin_unlock(&kc->lock);
}
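
/*
 * Worked example (for illustration only, not from the original code):
 * with a free list A -> B -> C -> D and a request for nr == 3,
 * kcopyd_get_pages() hands the caller A -> B -> C (the loop stops on C
 * and clears C->next), leaves kc->pages pointing at D and drops
 * nr_free_pages by 3; if fewer than nr pages are free it fails
 * outright with -ENOMEM.  kcopyd_put_pages() is the inverse: it walks
 * the returned chain to count it, then splices it back onto the head
 * of the free list.
 */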

/*
 * These three functions resize the page pool.
 */
static void drop_pages(struct page_list *pl)
{
	struct page_list *next;

	while (pl) {
		next = pl->next;
		free_pl(pl);
		pl = next;
	}
}

static int client_alloc_pages(struct dm_kcopyd_client *kc, unsigned int nr)
{
	unsigned int i;
	struct page_list *pl = NULL, *next;

	for (i = 0; i < nr; i++) {
		next = alloc_pl();
		if (!next) {
			if (pl)
				drop_pages(pl);
			return -ENOMEM;
		}
		next->next = pl;
		pl = next;
	}

	kcopyd_put_pages(kc, pl);
	kc->nr_pages += nr;
	return 0;
}

static void client_free_pages(struct dm_kcopyd_client *kc)
{
	BUG_ON(kc->nr_free_pages != kc->nr_pages);
	drop_pages(kc->pages);
	kc->pages = NULL;
	kc->nr_free_pages = kc->nr_pages = 0;
}

/*-----------------------------------------------------------------
 * kcopyd_jobs need to be allocated by the *clients* of kcopyd;
 * for this reason we use a mempool to prevent the client from
 * ever having to do io (which could cause a deadlock).
 *---------------------------------------------------------------*/
struct kcopyd_job {
	struct dm_kcopyd_client *kc;
	struct list_head list;
	unsigned long flags;

	/*
	 * Error state of the job.
	 */
	int read_err;
	unsigned long write_err;

	/*
	 * Either READ or WRITE
	 */
	int rw;
	struct dm_io_region source;

	/*
	 * The destinations for the transfer.
	 */
	unsigned int num_dests;
	struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];

	sector_t offset;
	unsigned int nr_pages;
	struct page_list *pages;

	/*
	 * Set this to ensure you are notified when the job has
	 * completed.  'context' is for the callback to use.
	 */
	dm_kcopyd_notify_fn fn;
	void *context;

	/*
	 * These fields are only used if the job has been split
	 * into more manageable parts.
	 */
	struct mutex lock;
	atomic_t sub_jobs;
	sector_t progress;
};

/* FIXME: this should scale with the number of pages */
#define MIN_JOBS 512

static struct kmem_cache *_job_cache;

int __init dm_kcopyd_init(void)
{
	_job_cache = KMEM_CACHE(kcopyd_job, 0);
	if (!_job_cache)
		return -ENOMEM;

	return 0;
}

void dm_kcopyd_exit(void)
{
	kmem_cache_destroy(_job_cache);
	_job_cache = NULL;
}

/*
 * Functions to push a job onto the tail of a given job list, and to
 * pop one off its head.
 */
static struct kcopyd_job *pop(struct list_head *jobs,
			      struct dm_kcopyd_client *kc)
{
	struct kcopyd_job *job = NULL;
	unsigned long flags;

	spin_lock_irqsave(&kc->job_lock, flags);

	if (!list_empty(jobs)) {
		job = list_entry(jobs->next, struct kcopyd_job, list);
		list_del(&job->list);
	}
	spin_unlock_irqrestore(&kc->job_lock, flags);

	return job;
}

static void push(struct list_head *jobs, struct kcopyd_job *job)
{
	unsigned long flags;
	struct dm_kcopyd_client *kc = job->kc;

	spin_lock_irqsave(&kc->job_lock, flags);
	list_add_tail(&job->list, jobs);
	spin_unlock_irqrestore(&kc->job_lock, flags);
}

/*
 * These three functions process 1 item from the corresponding
 * job list.
 *
 * They return:
 * < 0: error
 *   0: success
 * > 0: can't process yet.
 */
static int run_complete_job(struct kcopyd_job *job)
{
	void *context = job->context;
	int read_err = job->read_err;
	unsigned long write_err = job->write_err;
	dm_kcopyd_notify_fn fn = job->fn;
	struct dm_kcopyd_client *kc = job->kc;

	kcopyd_put_pages(kc, job->pages);
	mempool_free(job, kc->job_pool);
	fn(read_err, write_err, context);

	if (atomic_dec_and_test(&kc->nr_jobs))
		wake_up(&kc->destroyq);

	return 0;
}

static void complete_io(unsigned long error, void *context)
{
	struct kcopyd_job *job = (struct kcopyd_job *) context;
	struct dm_kcopyd_client *kc = job->kc;

	if (error) {
		if (job->rw == WRITE)
			job->write_err |= error;
		else
			job->read_err = 1;

		if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
			push(&kc->complete_jobs, job);
			wake(kc);
			return;
		}
	}

	if (job->rw == WRITE)
		push(&kc->complete_jobs, job);

	else {
		job->rw = WRITE;
		push(&kc->io_jobs, job);
	}

	wake(kc);
}

/*
 * Issue the io for a particular job: a read of the source, or
 * writes to all of its destinations.
 */
static int run_io_job(struct kcopyd_job *job)
{
	int r;
	struct dm_io_request io_req = {
		.bi_rw = job->rw | (1 << BIO_RW_SYNC),
		.mem.type = DM_IO_PAGE_LIST,
		.mem.ptr.pl = job->pages,
		.mem.offset = job->offset,
		.notify.fn = complete_io,
		.notify.context = job,
		.client = job->kc->io_client,
	};

	if (job->rw == READ)
		r = dm_io(&io_req, 1, &job->source, NULL);
	else
		r = dm_io(&io_req, job->num_dests, job->dests, NULL);

	return r;
}

static int run_pages_job(struct kcopyd_job *job)
{
	int r;

	job->nr_pages = dm_div_up(job->dests[0].count + job->offset,
				  PAGE_SIZE >> 9);
	r = kcopyd_get_pages(job->kc, job->nr_pages, &job->pages);
	if (!r) {
		/* this job is ready for io */
		push(&job->kc->io_jobs, job);
		return 0;
	}

	if (r == -ENOMEM)
		/* can't complete now */
		return 1;

	return r;
}

/*
 * Run through a list for as long as possible.  Returns the count
 * of successful jobs.
 */
static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
			int (*fn) (struct kcopyd_job *))
{
	struct kcopyd_job *job;
	int r, count = 0;

	while ((job = pop(jobs, kc))) {

		r = fn(job);

		if (r < 0) {
			/* error this rogue job */
			if (job->rw == WRITE)
				job->write_err = (unsigned long) -1L;
			else
				job->read_err = 1;
			push(&kc->complete_jobs, job);
			break;
		}

		if (r > 0) {
			/*
			 * We couldn't service this job ATM, so
			 * push this job back onto the list.
			 */
			push(jobs, job);
			break;
		}

		count++;
	}

	return count;
}

/*
 * kcopyd does this every time it's woken up.
 */
static void do_work(struct work_struct *work)
{
	struct dm_kcopyd_client *kc = container_of(work,
					struct dm_kcopyd_client, kcopyd_work);

	/*
	 * The order that these are called is *very* important.
	 * Complete jobs can free some pages for pages jobs.
	 * Pages jobs, when successful, will jump onto the io jobs
	 * list.  Io jobs call wake when they complete and it all
	 * starts again.
	 */
	process_jobs(&kc->complete_jobs, kc, run_complete_job);
	process_jobs(&kc->pages_jobs, kc, run_pages_job);
	process_jobs(&kc->io_jobs, kc, run_io_job);
}
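
/*
 * Worked example of the pipeline above (illustrative only; assumes
 * 4 KiB pages, i.e. PAGE_SIZE >> 9 == 8 sectors per page): a job whose
 * first destination covers 1024 sectors at offset 0 asks
 * run_pages_job() for dm_div_up(1024 + 0, 8) == 128 pages.  If the
 * client pool cannot supply that many, run_pages_job() returns 1 and
 * the job stays on pages_jobs until a completing job hands its pages
 * back through run_complete_job() and do_work() runs again.
 */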

/*
 * If we are copying a small region we just dispatch a single job
 * to do the copy; otherwise the io has to be split up into many
 * jobs.
 */
static void dispatch_job(struct kcopyd_job *job)
{
	struct dm_kcopyd_client *kc = job->kc;
	atomic_inc(&kc->nr_jobs);
	push(&kc->pages_jobs, job);
	wake(kc);
}

#define SUB_JOB_SIZE 128
static void segment_complete(int read_err, unsigned long write_err,
			     void *context)
{
	/* FIXME: tidy this function */
	sector_t progress = 0;
	sector_t count = 0;
	struct kcopyd_job *job = (struct kcopyd_job *) context;

	mutex_lock(&job->lock);

	/* update the error */
	if (read_err)
		job->read_err = 1;

	if (write_err)
		job->write_err |= write_err;

	/*
	 * Only dispatch more work if there hasn't been an error.
	 */
	if ((!job->read_err && !job->write_err) ||
	    test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
		/* get the next chunk of work */
		progress = job->progress;
		count = job->source.count - progress;
		if (count) {
			if (count > SUB_JOB_SIZE)
				count = SUB_JOB_SIZE;

			job->progress += count;
		}
	}
	mutex_unlock(&job->lock);

	if (count) {
		int i;
		struct kcopyd_job *sub_job = mempool_alloc(job->kc->job_pool,
							   GFP_NOIO);

		*sub_job = *job;
		sub_job->source.sector += progress;
		sub_job->source.count = count;

		for (i = 0; i < job->num_dests; i++) {
			sub_job->dests[i].sector += progress;
			sub_job->dests[i].count = count;
		}

		sub_job->fn = segment_complete;
		sub_job->context = job;
		dispatch_job(sub_job);

	} else if (atomic_dec_and_test(&job->sub_jobs)) {

		/*
		 * To avoid a race we must keep the job around
		 * until after the notify function has completed.
		 * Otherwise the client may try and stop the job
		 * after we've completed.
		 */
		job->fn(read_err, write_err, job->context);
		mempool_free(job, job->kc->job_pool);
	}
}

/*
 * Create some little jobs that between them will do the move.
 */
#define SPLIT_COUNT 8
static void split_job(struct kcopyd_job *job)
{
	int i;

	atomic_set(&job->sub_jobs, SPLIT_COUNT);
	for (i = 0; i < SPLIT_COUNT; i++)
		segment_complete(0, 0u, job);
}

int dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
		   unsigned int num_dests, struct dm_io_region *dests,
		   unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
{
	struct kcopyd_job *job;

	/*
	 * Allocate a new job.
	 */
	job = mempool_alloc(kc->job_pool, GFP_NOIO);

	/*
	 * set up for the read.
	 */
	job->kc = kc;
	job->flags = flags;
	job->read_err = 0;
	job->write_err = 0;
	job->rw = READ;

	job->source = *from;

	job->num_dests = num_dests;
	memcpy(&job->dests, dests, sizeof(*dests) * num_dests);

	job->offset = 0;
	job->nr_pages = 0;
	job->pages = NULL;

	job->fn = fn;
	job->context = context;

	if (job->source.count < SUB_JOB_SIZE)
		dispatch_job(job);

	else {
		mutex_init(&job->lock);
		job->progress = 0;
		split_job(job);
	}

	return 0;
}
EXPORT_SYMBOL(dm_kcopyd_copy);
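
/*
 * Example of how a caller might issue a copy (an illustrative sketch
 * only, not part of the original file; the "example_*" names are
 * hypothetical).  The callback runs asynchronously once the read and
 * all writes have finished; write_err carries one bit per failed
 * destination.
 *
 *	static void example_copy_done(int read_err, unsigned long write_err,
 *				      void *context)
 *	{
 *		if (read_err || write_err)
 *			DMERR("copy failed");
 *	}
 *
 *	static int example_start_copy(struct dm_kcopyd_client *kc,
 *				      struct block_device *src_bdev,
 *				      struct block_device *dst_bdev,
 *				      sector_t sector, sector_t count)
 *	{
 *		struct dm_io_region from, to;
 *
 *		from.bdev = src_bdev;
 *		from.sector = sector;
 *		from.count = count;
 *
 *		to.bdev = dst_bdev;
 *		to.sector = sector;
 *		to.count = count;
 *
 *		return dm_kcopyd_copy(kc, &from, 1, &to, 0,
 *				      example_copy_done, NULL);
 *	}
 */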

/*
 * Cancels a kcopyd job, e.g. someone might be deactivating a
 * mirror.
 */
#if 0
int kcopyd_cancel(struct kcopyd_job *job, int block)
{
	/* FIXME: finish */
	return -1;
}
#endif  /*  0  */

/*-----------------------------------------------------------------
 * Client setup
 *---------------------------------------------------------------*/
int dm_kcopyd_client_create(unsigned int nr_pages,
			    struct dm_kcopyd_client **result)
{
	int r = -ENOMEM;
	struct dm_kcopyd_client *kc;

	kc = kmalloc(sizeof(*kc), GFP_KERNEL);
	if (!kc)
		return -ENOMEM;

	spin_lock_init(&kc->lock);
	spin_lock_init(&kc->job_lock);
	INIT_LIST_HEAD(&kc->complete_jobs);
	INIT_LIST_HEAD(&kc->io_jobs);
	INIT_LIST_HEAD(&kc->pages_jobs);

	kc->job_pool = mempool_create_slab_pool(MIN_JOBS, _job_cache);
	if (!kc->job_pool)
		goto bad_slab;

	INIT_WORK(&kc->kcopyd_work, do_work);
	kc->kcopyd_wq = create_singlethread_workqueue("kcopyd");
	if (!kc->kcopyd_wq)
		goto bad_workqueue;

	kc->pages = NULL;
	kc->nr_pages = kc->nr_free_pages = 0;
	r = client_alloc_pages(kc, nr_pages);
	if (r)
		goto bad_client_pages;

	kc->io_client = dm_io_client_create(nr_pages);
	if (IS_ERR(kc->io_client)) {
		r = PTR_ERR(kc->io_client);
		goto bad_io_client;
	}

	init_waitqueue_head(&kc->destroyq);
	atomic_set(&kc->nr_jobs, 0);

	*result = kc;
	return 0;

bad_io_client:
	client_free_pages(kc);
bad_client_pages:
	destroy_workqueue(kc->kcopyd_wq);
bad_workqueue:
	mempool_destroy(kc->job_pool);
bad_slab:
	kfree(kc);

	return r;
}
EXPORT_SYMBOL(dm_kcopyd_client_create);

void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
{
	/* Wait for completion of all jobs submitted by this client. */
	wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));

	BUG_ON(!list_empty(&kc->complete_jobs));
	BUG_ON(!list_empty(&kc->io_jobs));
	BUG_ON(!list_empty(&kc->pages_jobs));
	destroy_workqueue(kc->kcopyd_wq);
	dm_io_client_destroy(kc->io_client);
	client_free_pages(kc);
	mempool_destroy(kc->job_pool);
	kfree(kc);
}
EXPORT_SYMBOL(dm_kcopyd_client_destroy);
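
/*
 * Example client lifecycle (an illustrative sketch only; the
 * "example_*" names and the 32-page budget are hypothetical).  The
 * page count passed to dm_kcopyd_client_create() bounds how much of a
 * copy can be in flight at once: jobs that cannot get pages wait on
 * the pages_jobs list until completing jobs return some.
 * dm_kcopyd_client_destroy() blocks on destroyq until every job
 * submitted by the client has completed.
 *
 *	struct dm_kcopyd_client *example_kc;
 *	int r = dm_kcopyd_client_create(32, &example_kc);
 *
 *	if (r)
 *		return r;
 *
 *	r = example_start_copy(example_kc, src_bdev, dst_bdev, 0, 1024);
 *	...
 *	dm_kcopyd_client_destroy(example_kc);
 */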