1 /* 2 * Copyright (C) 2007 Oracle. All rights reserved. 3 * 4 * This program is free software; you can redistribute it and/or 5 * modify it under the terms of the GNU General Public 6 * License v2 as published by the Free Software Foundation. 7 * 8 * This program is distributed in the hope that it will be useful, 9 * but WITHOUT ANY WARRANTY; without even the implied warranty of 10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 11 * General Public License for more details. 12 * 13 * You should have received a copy of the GNU General Public 14 * License along with this program; if not, write to the 15 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, 16 * Boston, MA 021110-1307, USA. 17 */ 18 19 #include <linux/kthread.h> 20 #include <linux/slab.h> 21 #include <linux/list.h> 22 #include <linux/spinlock.h> 23 #include <linux/freezer.h> 24 #include "async-thread.h" 25 26 #define WORK_QUEUED_BIT 0 27 #define WORK_DONE_BIT 1 28 #define WORK_ORDER_DONE_BIT 2 29 #define WORK_HIGH_PRIO_BIT 3 30 31 /* 32 * container for the kthread task pointer and the list of pending work 33 * One of these is allocated per thread. 34 */ 35 struct btrfs_worker_thread { 36 /* pool we belong to */ 37 struct btrfs_workers *workers; 38 39 /* list of struct btrfs_work that are waiting for service */ 40 struct list_head pending; 41 struct list_head prio_pending; 42 43 /* list of worker threads from struct btrfs_workers */ 44 struct list_head worker_list; 45 46 /* kthread */ 47 struct task_struct *task; 48 49 /* number of things on the pending list */ 50 atomic_t num_pending; 51 52 /* reference counter for this struct */ 53 atomic_t refs; 54 55 unsigned long sequence; 56 57 /* protects the pending list. */ 58 spinlock_t lock; 59 60 /* set to non-zero when this thread is already awake and kicking */ 61 int working; 62 63 /* are we currently idle */ 64 int idle; 65 }; 66 67 /* 68 * btrfs_start_workers uses kthread_run, which can block waiting for memory 69 * for a very long time. It will actually throttle on page writeback, 70 * and so it may not make progress until after our btrfs worker threads 71 * process all of the pending work structs in their queue 72 * 73 * This means we can't use btrfs_start_workers from inside a btrfs worker 74 * thread that is used as part of cleaning dirty memory, which pretty much 75 * involves all of the worker threads. 76 * 77 * Instead we have a helper queue who never has more than one thread 78 * where we scheduler thread start operations. This worker_start struct 79 * is used to contain the work and hold a pointer to the queue that needs 80 * another worker. 81 */ 82 struct worker_start { 83 struct btrfs_work work; 84 struct btrfs_workers *queue; 85 }; 86 87 static void start_new_worker_func(struct btrfs_work *work) 88 { 89 struct worker_start *start; 90 start = container_of(work, struct worker_start, work); 91 btrfs_start_workers(start->queue, 1); 92 kfree(start); 93 } 94 95 static int start_new_worker(struct btrfs_workers *queue) 96 { 97 struct worker_start *start; 98 int ret; 99 100 start = kzalloc(sizeof(*start), GFP_NOFS); 101 if (!start) 102 return -ENOMEM; 103 104 start->work.func = start_new_worker_func; 105 start->queue = queue; 106 ret = btrfs_queue_worker(queue->atomic_worker_start, &start->work); 107 if (ret) 108 kfree(start); 109 return ret; 110 } 111 112 /* 113 * helper function to move a thread onto the idle list after it 114 * has finished some requests. 115 */ 116 static void check_idle_worker(struct btrfs_worker_thread *worker) 117 { 118 if (!worker->idle && atomic_read(&worker->num_pending) < 119 worker->workers->idle_thresh / 2) { 120 unsigned long flags; 121 spin_lock_irqsave(&worker->workers->lock, flags); 122 worker->idle = 1; 123 124 /* the list may be empty if the worker is just starting */ 125 if (!list_empty(&worker->worker_list)) { 126 list_move(&worker->worker_list, 127 &worker->workers->idle_list); 128 } 129 spin_unlock_irqrestore(&worker->workers->lock, flags); 130 } 131 } 132 133 /* 134 * helper function to move a thread off the idle list after new 135 * pending work is added. 136 */ 137 static void check_busy_worker(struct btrfs_worker_thread *worker) 138 { 139 if (worker->idle && atomic_read(&worker->num_pending) >= 140 worker->workers->idle_thresh) { 141 unsigned long flags; 142 spin_lock_irqsave(&worker->workers->lock, flags); 143 worker->idle = 0; 144 145 if (!list_empty(&worker->worker_list)) { 146 list_move_tail(&worker->worker_list, 147 &worker->workers->worker_list); 148 } 149 spin_unlock_irqrestore(&worker->workers->lock, flags); 150 } 151 } 152 153 static void check_pending_worker_creates(struct btrfs_worker_thread *worker) 154 { 155 struct btrfs_workers *workers = worker->workers; 156 unsigned long flags; 157 158 rmb(); 159 if (!workers->atomic_start_pending) 160 return; 161 162 spin_lock_irqsave(&workers->lock, flags); 163 if (!workers->atomic_start_pending) 164 goto out; 165 166 workers->atomic_start_pending = 0; 167 if (workers->num_workers + workers->num_workers_starting >= 168 workers->max_workers) 169 goto out; 170 171 workers->num_workers_starting += 1; 172 spin_unlock_irqrestore(&workers->lock, flags); 173 start_new_worker(workers); 174 return; 175 176 out: 177 spin_unlock_irqrestore(&workers->lock, flags); 178 } 179 180 static noinline int run_ordered_completions(struct btrfs_workers *workers, 181 struct btrfs_work *work) 182 { 183 if (!workers->ordered) 184 return 0; 185 186 set_bit(WORK_DONE_BIT, &work->flags); 187 188 spin_lock(&workers->order_lock); 189 190 while (1) { 191 if (!list_empty(&workers->prio_order_list)) { 192 work = list_entry(workers->prio_order_list.next, 193 struct btrfs_work, order_list); 194 } else if (!list_empty(&workers->order_list)) { 195 work = list_entry(workers->order_list.next, 196 struct btrfs_work, order_list); 197 } else { 198 break; 199 } 200 if (!test_bit(WORK_DONE_BIT, &work->flags)) 201 break; 202 203 /* we are going to call the ordered done function, but 204 * we leave the work item on the list as a barrier so 205 * that later work items that are done don't have their 206 * functions called before this one returns 207 */ 208 if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags)) 209 break; 210 211 spin_unlock(&workers->order_lock); 212 213 work->ordered_func(work); 214 215 /* now take the lock again and call the freeing code */ 216 spin_lock(&workers->order_lock); 217 list_del(&work->order_list); 218 work->ordered_free(work); 219 } 220 221 spin_unlock(&workers->order_lock); 222 return 0; 223 } 224 225 static void put_worker(struct btrfs_worker_thread *worker) 226 { 227 if (atomic_dec_and_test(&worker->refs)) 228 kfree(worker); 229 } 230 231 static int try_worker_shutdown(struct btrfs_worker_thread *worker) 232 { 233 int freeit = 0; 234 235 spin_lock_irq(&worker->lock); 236 spin_lock(&worker->workers->lock); 237 if (worker->workers->num_workers > 1 && 238 worker->idle && 239 !worker->working && 240 !list_empty(&worker->worker_list) && 241 list_empty(&worker->prio_pending) && 242 list_empty(&worker->pending) && 243 atomic_read(&worker->num_pending) == 0) { 244 freeit = 1; 245 list_del_init(&worker->worker_list); 246 worker->workers->num_workers--; 247 } 248 spin_unlock(&worker->workers->lock); 249 spin_unlock_irq(&worker->lock); 250 251 if (freeit) 252 put_worker(worker); 253 return freeit; 254 } 255 256 static struct btrfs_work *get_next_work(struct btrfs_worker_thread *worker, 257 struct list_head *prio_head, 258 struct list_head *head) 259 { 260 struct btrfs_work *work = NULL; 261 struct list_head *cur = NULL; 262 263 if(!list_empty(prio_head)) 264 cur = prio_head->next; 265 266 smp_mb(); 267 if (!list_empty(&worker->prio_pending)) 268 goto refill; 269 270 if (!list_empty(head)) 271 cur = head->next; 272 273 if (cur) 274 goto out; 275 276 refill: 277 spin_lock_irq(&worker->lock); 278 list_splice_tail_init(&worker->prio_pending, prio_head); 279 list_splice_tail_init(&worker->pending, head); 280 281 if (!list_empty(prio_head)) 282 cur = prio_head->next; 283 else if (!list_empty(head)) 284 cur = head->next; 285 spin_unlock_irq(&worker->lock); 286 287 if (!cur) 288 goto out_fail; 289 290 out: 291 work = list_entry(cur, struct btrfs_work, list); 292 293 out_fail: 294 return work; 295 } 296 297 /* 298 * main loop for servicing work items 299 */ 300 static int worker_loop(void *arg) 301 { 302 struct btrfs_worker_thread *worker = arg; 303 struct list_head head; 304 struct list_head prio_head; 305 struct btrfs_work *work; 306 307 INIT_LIST_HEAD(&head); 308 INIT_LIST_HEAD(&prio_head); 309 310 do { 311 again: 312 while (1) { 313 314 315 work = get_next_work(worker, &prio_head, &head); 316 if (!work) 317 break; 318 319 list_del(&work->list); 320 clear_bit(WORK_QUEUED_BIT, &work->flags); 321 322 work->worker = worker; 323 324 work->func(work); 325 326 atomic_dec(&worker->num_pending); 327 /* 328 * unless this is an ordered work queue, 329 * 'work' was probably freed by func above. 330 */ 331 run_ordered_completions(worker->workers, work); 332 333 check_pending_worker_creates(worker); 334 335 } 336 337 spin_lock_irq(&worker->lock); 338 check_idle_worker(worker); 339 340 if (freezing(current)) { 341 worker->working = 0; 342 spin_unlock_irq(&worker->lock); 343 refrigerator(); 344 } else { 345 spin_unlock_irq(&worker->lock); 346 if (!kthread_should_stop()) { 347 cpu_relax(); 348 /* 349 * we've dropped the lock, did someone else 350 * jump_in? 351 */ 352 smp_mb(); 353 if (!list_empty(&worker->pending) || 354 !list_empty(&worker->prio_pending)) 355 continue; 356 357 /* 358 * this short schedule allows more work to 359 * come in without the queue functions 360 * needing to go through wake_up_process() 361 * 362 * worker->working is still 1, so nobody 363 * is going to try and wake us up 364 */ 365 schedule_timeout(1); 366 smp_mb(); 367 if (!list_empty(&worker->pending) || 368 !list_empty(&worker->prio_pending)) 369 continue; 370 371 if (kthread_should_stop()) 372 break; 373 374 /* still no more work?, sleep for real */ 375 spin_lock_irq(&worker->lock); 376 set_current_state(TASK_INTERRUPTIBLE); 377 if (!list_empty(&worker->pending) || 378 !list_empty(&worker->prio_pending)) { 379 spin_unlock_irq(&worker->lock); 380 set_current_state(TASK_RUNNING); 381 goto again; 382 } 383 384 /* 385 * this makes sure we get a wakeup when someone 386 * adds something new to the queue 387 */ 388 worker->working = 0; 389 spin_unlock_irq(&worker->lock); 390 391 if (!kthread_should_stop()) { 392 schedule_timeout(HZ * 120); 393 if (!worker->working && 394 try_worker_shutdown(worker)) { 395 return 0; 396 } 397 } 398 } 399 __set_current_state(TASK_RUNNING); 400 } 401 } while (!kthread_should_stop()); 402 return 0; 403 } 404 405 /* 406 * this will wait for all the worker threads to shutdown 407 */ 408 int btrfs_stop_workers(struct btrfs_workers *workers) 409 { 410 struct list_head *cur; 411 struct btrfs_worker_thread *worker; 412 int can_stop; 413 414 spin_lock_irq(&workers->lock); 415 list_splice_init(&workers->idle_list, &workers->worker_list); 416 while (!list_empty(&workers->worker_list)) { 417 cur = workers->worker_list.next; 418 worker = list_entry(cur, struct btrfs_worker_thread, 419 worker_list); 420 421 atomic_inc(&worker->refs); 422 workers->num_workers -= 1; 423 if (!list_empty(&worker->worker_list)) { 424 list_del_init(&worker->worker_list); 425 put_worker(worker); 426 can_stop = 1; 427 } else 428 can_stop = 0; 429 spin_unlock_irq(&workers->lock); 430 if (can_stop) 431 kthread_stop(worker->task); 432 spin_lock_irq(&workers->lock); 433 put_worker(worker); 434 } 435 spin_unlock_irq(&workers->lock); 436 return 0; 437 } 438 439 /* 440 * simple init on struct btrfs_workers 441 */ 442 void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max, 443 struct btrfs_workers *async_helper) 444 { 445 workers->num_workers = 0; 446 workers->num_workers_starting = 0; 447 INIT_LIST_HEAD(&workers->worker_list); 448 INIT_LIST_HEAD(&workers->idle_list); 449 INIT_LIST_HEAD(&workers->order_list); 450 INIT_LIST_HEAD(&workers->prio_order_list); 451 spin_lock_init(&workers->lock); 452 spin_lock_init(&workers->order_lock); 453 workers->max_workers = max; 454 workers->idle_thresh = 32; 455 workers->name = name; 456 workers->ordered = 0; 457 workers->atomic_start_pending = 0; 458 workers->atomic_worker_start = async_helper; 459 } 460 461 /* 462 * starts new worker threads. This does not enforce the max worker 463 * count in case you need to temporarily go past it. 464 */ 465 static int __btrfs_start_workers(struct btrfs_workers *workers, 466 int num_workers) 467 { 468 struct btrfs_worker_thread *worker; 469 int ret = 0; 470 int i; 471 472 for (i = 0; i < num_workers; i++) { 473 worker = kzalloc(sizeof(*worker), GFP_NOFS); 474 if (!worker) { 475 ret = -ENOMEM; 476 goto fail; 477 } 478 479 INIT_LIST_HEAD(&worker->pending); 480 INIT_LIST_HEAD(&worker->prio_pending); 481 INIT_LIST_HEAD(&worker->worker_list); 482 spin_lock_init(&worker->lock); 483 484 atomic_set(&worker->num_pending, 0); 485 atomic_set(&worker->refs, 1); 486 worker->workers = workers; 487 worker->task = kthread_run(worker_loop, worker, 488 "btrfs-%s-%d", workers->name, 489 workers->num_workers + i); 490 if (IS_ERR(worker->task)) { 491 ret = PTR_ERR(worker->task); 492 kfree(worker); 493 goto fail; 494 } 495 spin_lock_irq(&workers->lock); 496 list_add_tail(&worker->worker_list, &workers->idle_list); 497 worker->idle = 1; 498 workers->num_workers++; 499 workers->num_workers_starting--; 500 WARN_ON(workers->num_workers_starting < 0); 501 spin_unlock_irq(&workers->lock); 502 } 503 return 0; 504 fail: 505 btrfs_stop_workers(workers); 506 return ret; 507 } 508 509 int btrfs_start_workers(struct btrfs_workers *workers, int num_workers) 510 { 511 spin_lock_irq(&workers->lock); 512 workers->num_workers_starting += num_workers; 513 spin_unlock_irq(&workers->lock); 514 return __btrfs_start_workers(workers, num_workers); 515 } 516 517 /* 518 * run through the list and find a worker thread that doesn't have a lot 519 * to do right now. This can return null if we aren't yet at the thread 520 * count limit and all of the threads are busy. 521 */ 522 static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers) 523 { 524 struct btrfs_worker_thread *worker; 525 struct list_head *next; 526 int enforce_min; 527 528 enforce_min = (workers->num_workers + workers->num_workers_starting) < 529 workers->max_workers; 530 531 /* 532 * if we find an idle thread, don't move it to the end of the 533 * idle list. This improves the chance that the next submission 534 * will reuse the same thread, and maybe catch it while it is still 535 * working 536 */ 537 if (!list_empty(&workers->idle_list)) { 538 next = workers->idle_list.next; 539 worker = list_entry(next, struct btrfs_worker_thread, 540 worker_list); 541 return worker; 542 } 543 if (enforce_min || list_empty(&workers->worker_list)) 544 return NULL; 545 546 /* 547 * if we pick a busy task, move the task to the end of the list. 548 * hopefully this will keep things somewhat evenly balanced. 549 * Do the move in batches based on the sequence number. This groups 550 * requests submitted at roughly the same time onto the same worker. 551 */ 552 next = workers->worker_list.next; 553 worker = list_entry(next, struct btrfs_worker_thread, worker_list); 554 worker->sequence++; 555 556 if (worker->sequence % workers->idle_thresh == 0) 557 list_move_tail(next, &workers->worker_list); 558 return worker; 559 } 560 561 /* 562 * selects a worker thread to take the next job. This will either find 563 * an idle worker, start a new worker up to the max count, or just return 564 * one of the existing busy workers. 565 */ 566 static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers) 567 { 568 struct btrfs_worker_thread *worker; 569 unsigned long flags; 570 struct list_head *fallback; 571 572 again: 573 spin_lock_irqsave(&workers->lock, flags); 574 worker = next_worker(workers); 575 576 if (!worker) { 577 if (workers->num_workers + workers->num_workers_starting >= 578 workers->max_workers) { 579 goto fallback; 580 } else if (workers->atomic_worker_start) { 581 workers->atomic_start_pending = 1; 582 goto fallback; 583 } else { 584 workers->num_workers_starting++; 585 spin_unlock_irqrestore(&workers->lock, flags); 586 /* we're below the limit, start another worker */ 587 __btrfs_start_workers(workers, 1); 588 goto again; 589 } 590 } 591 goto found; 592 593 fallback: 594 fallback = NULL; 595 /* 596 * we have failed to find any workers, just 597 * return the first one we can find. 598 */ 599 if (!list_empty(&workers->worker_list)) 600 fallback = workers->worker_list.next; 601 if (!list_empty(&workers->idle_list)) 602 fallback = workers->idle_list.next; 603 BUG_ON(!fallback); 604 worker = list_entry(fallback, 605 struct btrfs_worker_thread, worker_list); 606 found: 607 /* 608 * this makes sure the worker doesn't exit before it is placed 609 * onto a busy/idle list 610 */ 611 atomic_inc(&worker->num_pending); 612 spin_unlock_irqrestore(&workers->lock, flags); 613 return worker; 614 } 615 616 /* 617 * btrfs_requeue_work just puts the work item back on the tail of the list 618 * it was taken from. It is intended for use with long running work functions 619 * that make some progress and want to give the cpu up for others. 620 */ 621 int btrfs_requeue_work(struct btrfs_work *work) 622 { 623 struct btrfs_worker_thread *worker = work->worker; 624 unsigned long flags; 625 int wake = 0; 626 627 if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags)) 628 goto out; 629 630 spin_lock_irqsave(&worker->lock, flags); 631 if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) 632 list_add_tail(&work->list, &worker->prio_pending); 633 else 634 list_add_tail(&work->list, &worker->pending); 635 atomic_inc(&worker->num_pending); 636 637 /* by definition we're busy, take ourselves off the idle 638 * list 639 */ 640 if (worker->idle) { 641 spin_lock(&worker->workers->lock); 642 worker->idle = 0; 643 list_move_tail(&worker->worker_list, 644 &worker->workers->worker_list); 645 spin_unlock(&worker->workers->lock); 646 } 647 if (!worker->working) { 648 wake = 1; 649 worker->working = 1; 650 } 651 652 if (wake) 653 wake_up_process(worker->task); 654 spin_unlock_irqrestore(&worker->lock, flags); 655 out: 656 657 return 0; 658 } 659 660 void btrfs_set_work_high_prio(struct btrfs_work *work) 661 { 662 set_bit(WORK_HIGH_PRIO_BIT, &work->flags); 663 } 664 665 /* 666 * places a struct btrfs_work into the pending queue of one of the kthreads 667 */ 668 int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work) 669 { 670 struct btrfs_worker_thread *worker; 671 unsigned long flags; 672 int wake = 0; 673 674 /* don't requeue something already on a list */ 675 if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags)) 676 goto out; 677 678 worker = find_worker(workers); 679 if (workers->ordered) { 680 /* 681 * you're not allowed to do ordered queues from an 682 * interrupt handler 683 */ 684 spin_lock(&workers->order_lock); 685 if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) { 686 list_add_tail(&work->order_list, 687 &workers->prio_order_list); 688 } else { 689 list_add_tail(&work->order_list, &workers->order_list); 690 } 691 spin_unlock(&workers->order_lock); 692 } else { 693 INIT_LIST_HEAD(&work->order_list); 694 } 695 696 spin_lock_irqsave(&worker->lock, flags); 697 698 if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) 699 list_add_tail(&work->list, &worker->prio_pending); 700 else 701 list_add_tail(&work->list, &worker->pending); 702 check_busy_worker(worker); 703 704 /* 705 * avoid calling into wake_up_process if this thread has already 706 * been kicked 707 */ 708 if (!worker->working) 709 wake = 1; 710 worker->working = 1; 711 712 if (wake) 713 wake_up_process(worker->task); 714 spin_unlock_irqrestore(&worker->lock, flags); 715 716 out: 717 return 0; 718 } 719