/*
 * Copyright (C) 2007 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License v2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/kthread.h>
#include <linux/slab.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/freezer.h>
#include "async-thread.h"

#define WORK_QUEUED_BIT 0
#define WORK_DONE_BIT 1
#define WORK_ORDER_DONE_BIT 2
#define WORK_HIGH_PRIO_BIT 3

/*
 * container for the kthread task pointer and the list of pending work.
 * One of these is allocated per thread.
 */
struct btrfs_worker_thread {
	/* pool we belong to */
	struct btrfs_workers *workers;

	/* list of struct btrfs_work that are waiting for service */
	struct list_head pending;
	struct list_head prio_pending;

	/* list of worker threads from struct btrfs_workers */
	struct list_head worker_list;

	/* kthread */
	struct task_struct *task;

	/* number of things on the pending list */
	atomic_t num_pending;

	/* reference counter for this struct */
	atomic_t refs;

	unsigned long sequence;

	/* protects the pending list */
	spinlock_t lock;

	/* set to non-zero when this thread is already awake and kicking */
	int working;

	/* are we currently idle */
	int idle;
};

static int __btrfs_start_workers(struct btrfs_workers *workers);

/*
 * btrfs_start_workers uses kthread_run, which can block waiting for memory
 * for a very long time.  It will actually throttle on page writeback,
 * and so it may not make progress until after our btrfs worker threads
 * process all of the pending work structs in their queue.
 *
 * This means we can't use btrfs_start_workers from inside a btrfs worker
 * thread that is used as part of cleaning dirty memory, which pretty much
 * involves all of the worker threads.
 *
 * Instead we have a helper queue that never has more than one thread,
 * where we schedule thread start operations.  This worker_start struct
 * is used to contain the work and hold a pointer to the queue that needs
 * another worker.
 */
struct worker_start {
	struct btrfs_work work;
	struct btrfs_workers *queue;
};

static void start_new_worker_func(struct btrfs_work *work)
{
	struct worker_start *start;
	start = container_of(work, struct worker_start, work);
	__btrfs_start_workers(start->queue);
	kfree(start);
}

/*
 * helper function to move a thread onto the idle list after it
 * has finished some requests.
 */
static void check_idle_worker(struct btrfs_worker_thread *worker)
{
	if (!worker->idle && atomic_read(&worker->num_pending) <
	    worker->workers->idle_thresh / 2) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 1;

		/* the list may be empty if the worker is just starting */
		if (!list_empty(&worker->worker_list)) {
			list_move(&worker->worker_list,
				  &worker->workers->idle_list);
		}
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}

/*
 * helper function to move a thread off the idle list after new
 * pending work is added.
 */
static void check_busy_worker(struct btrfs_worker_thread *worker)
{
	if (worker->idle && atomic_read(&worker->num_pending) >=
	    worker->workers->idle_thresh) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 0;

		if (!list_empty(&worker->worker_list)) {
			list_move_tail(&worker->worker_list,
				       &worker->workers->worker_list);
		}
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}

static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
{
	struct btrfs_workers *workers = worker->workers;
	struct worker_start *start;
	unsigned long flags;

	rmb();
	if (!workers->atomic_start_pending)
		return;

	start = kzalloc(sizeof(*start), GFP_NOFS);
	if (!start)
		return;

	start->work.func = start_new_worker_func;
	start->queue = workers;

	spin_lock_irqsave(&workers->lock, flags);
	if (!workers->atomic_start_pending)
		goto out;

	workers->atomic_start_pending = 0;
	if (workers->num_workers + workers->num_workers_starting >=
	    workers->max_workers)
		goto out;

	workers->num_workers_starting += 1;
	spin_unlock_irqrestore(&workers->lock, flags);
	btrfs_queue_worker(workers->atomic_worker_start, &start->work);
	return;

out:
	kfree(start);
	spin_unlock_irqrestore(&workers->lock, flags);
}

static noinline void run_ordered_completions(struct btrfs_workers *workers,
					     struct btrfs_work *work)
{
	if (!workers->ordered)
		return;

	set_bit(WORK_DONE_BIT, &work->flags);

	spin_lock(&workers->order_lock);

	while (1) {
		if (!list_empty(&workers->prio_order_list)) {
			work = list_entry(workers->prio_order_list.next,
					  struct btrfs_work, order_list);
		} else if (!list_empty(&workers->order_list)) {
			work = list_entry(workers->order_list.next,
					  struct btrfs_work, order_list);
		} else {
			break;
		}
		if (!test_bit(WORK_DONE_BIT, &work->flags))
			break;

		/* we are going to call the ordered done function, but
		 * we leave the work item on the list as a barrier so
		 * that later work items that are done don't have their
		 * functions called before this one returns
		 */
		if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
			break;

		spin_unlock(&workers->order_lock);

		work->ordered_func(work);

		/* now take the lock again and drop our item from the list */
		spin_lock(&workers->order_lock);
		list_del(&work->order_list);
		spin_unlock(&workers->order_lock);

		/*
		 * we don't want to call the ordered free functions
		 * with the lock held though
		 */
		work->ordered_free(work);
		spin_lock(&workers->order_lock);
	}

	spin_unlock(&workers->order_lock);
}
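/*
 * Illustrative sketch only: one way a caller might package a work item
 * for an ordered pool, so run_ordered_completions() above has both an
 * ordered_func and an ordered_free to call.  It mirrors the
 * embed-and-container_of pattern used by struct worker_start.  The
 * example_* names are hypothetical, and the pool's ->ordered flag is
 * assumed to have been set by the caller (this file provides no helper
 * for that).
 */
struct example_ordered_job {
	struct btrfs_work work;
	/* per-job payload fields would live here */
};

static void example_ordered_func(struct btrfs_work *work)
{
	/* the heavy lifting; may run out of submission order */
}

static void example_ordered_done(struct btrfs_work *work)
{
	/* called strictly in submission order by run_ordered_completions */
}

static void example_ordered_free(struct btrfs_work *work)
{
	struct example_ordered_job *job;

	job = container_of(work, struct example_ordered_job, work);
	kfree(job);
}

static void __maybe_unused example_queue_ordered(struct btrfs_workers *workers)
{
	struct example_ordered_job *job;

	job = kzalloc(sizeof(*job), GFP_NOFS);
	if (!job)
		return;

	job->work.func = example_ordered_func;
	job->work.ordered_func = example_ordered_done;
	job->work.ordered_free = example_ordered_free;
	btrfs_queue_worker(workers, &job->work);
}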
static void put_worker(struct btrfs_worker_thread *worker)
{
	if (atomic_dec_and_test(&worker->refs))
		kfree(worker);
}

static int try_worker_shutdown(struct btrfs_worker_thread *worker)
{
	int freeit = 0;

	spin_lock_irq(&worker->lock);
	spin_lock(&worker->workers->lock);
	if (worker->workers->num_workers > 1 &&
	    worker->idle &&
	    !worker->working &&
	    !list_empty(&worker->worker_list) &&
	    list_empty(&worker->prio_pending) &&
	    list_empty(&worker->pending) &&
	    atomic_read(&worker->num_pending) == 0) {
		freeit = 1;
		list_del_init(&worker->worker_list);
		worker->workers->num_workers--;
	}
	spin_unlock(&worker->workers->lock);
	spin_unlock_irq(&worker->lock);

	if (freeit)
		put_worker(worker);
	return freeit;
}

static struct btrfs_work *get_next_work(struct btrfs_worker_thread *worker,
					struct list_head *prio_head,
					struct list_head *head)
{
	struct btrfs_work *work = NULL;
	struct list_head *cur = NULL;

	if (!list_empty(prio_head))
		cur = prio_head->next;

	smp_mb();
	if (!list_empty(&worker->prio_pending))
		goto refill;

	if (!list_empty(head))
		cur = head->next;

	if (cur)
		goto out;

refill:
	spin_lock_irq(&worker->lock);
	list_splice_tail_init(&worker->prio_pending, prio_head);
	list_splice_tail_init(&worker->pending, head);

	if (!list_empty(prio_head))
		cur = prio_head->next;
	else if (!list_empty(head))
		cur = head->next;
	spin_unlock_irq(&worker->lock);

	if (!cur)
		goto out_fail;

out:
	work = list_entry(cur, struct btrfs_work, list);

out_fail:
	return work;
}

/*
 * main loop for servicing work items
 */
static int worker_loop(void *arg)
{
	struct btrfs_worker_thread *worker = arg;
	struct list_head head;
	struct list_head prio_head;
	struct btrfs_work *work;

	INIT_LIST_HEAD(&head);
	INIT_LIST_HEAD(&prio_head);

	do {
again:
		while (1) {

			work = get_next_work(worker, &prio_head, &head);
			if (!work)
				break;

			list_del(&work->list);
			clear_bit(WORK_QUEUED_BIT, &work->flags);

			work->worker = worker;

			work->func(work);

			atomic_dec(&worker->num_pending);
			/*
			 * unless this is an ordered work queue,
			 * 'work' was probably freed by func above.
			 */
			run_ordered_completions(worker->workers, work);

			check_pending_worker_creates(worker);
			cond_resched();
		}

		spin_lock_irq(&worker->lock);
		check_idle_worker(worker);

		if (freezing(current)) {
			worker->working = 0;
			spin_unlock_irq(&worker->lock);
			try_to_freeze();
		} else {
			spin_unlock_irq(&worker->lock);
			if (!kthread_should_stop()) {
				cpu_relax();
				/*
				 * we've dropped the lock, did someone else
				 * jump in?
				 */
				smp_mb();
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
					continue;

				/*
				 * this short schedule allows more work to
				 * come in without the queue functions
				 * needing to go through wake_up_process()
				 *
				 * worker->working is still 1, so nobody
				 * is going to try and wake us up
				 */
				schedule_timeout(1);
				smp_mb();
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
					continue;

				if (kthread_should_stop())
					break;

				/* still no more work? sleep for real */
				spin_lock_irq(&worker->lock);
				set_current_state(TASK_INTERRUPTIBLE);
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending)) {
					spin_unlock_irq(&worker->lock);
					set_current_state(TASK_RUNNING);
					goto again;
				}

				/*
				 * this makes sure we get a wakeup when someone
				 * adds something new to the queue
				 */
				worker->working = 0;
				spin_unlock_irq(&worker->lock);

				if (!kthread_should_stop()) {
					schedule_timeout(HZ * 120);
					if (!worker->working &&
					    try_worker_shutdown(worker)) {
						return 0;
					}
				}
			}
			__set_current_state(TASK_RUNNING);
		}
	} while (!kthread_should_stop());
	return 0;
}

/*
 * this will wait for all the worker threads to shut down
 */
void btrfs_stop_workers(struct btrfs_workers *workers)
{
	struct list_head *cur;
	struct btrfs_worker_thread *worker;
	int can_stop;

	spin_lock_irq(&workers->lock);
	list_splice_init(&workers->idle_list, &workers->worker_list);
	while (!list_empty(&workers->worker_list)) {
		cur = workers->worker_list.next;
		worker = list_entry(cur, struct btrfs_worker_thread,
				    worker_list);

		atomic_inc(&worker->refs);
		workers->num_workers -= 1;
		if (!list_empty(&worker->worker_list)) {
			list_del_init(&worker->worker_list);
			put_worker(worker);
			can_stop = 1;
		} else
			can_stop = 0;
		spin_unlock_irq(&workers->lock);
		if (can_stop)
			kthread_stop(worker->task);
		spin_lock_irq(&workers->lock);
		put_worker(worker);
	}
	spin_unlock_irq(&workers->lock);
}

/*
 * simple init on struct btrfs_workers
 */
void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max,
			struct btrfs_workers *async_helper)
{
	workers->num_workers = 0;
	workers->num_workers_starting = 0;
	INIT_LIST_HEAD(&workers->worker_list);
	INIT_LIST_HEAD(&workers->idle_list);
	INIT_LIST_HEAD(&workers->order_list);
	INIT_LIST_HEAD(&workers->prio_order_list);
	spin_lock_init(&workers->lock);
	spin_lock_init(&workers->order_lock);
	workers->max_workers = max;
	workers->idle_thresh = 32;
	workers->name = name;
	workers->ordered = 0;
	workers->atomic_start_pending = 0;
	workers->atomic_worker_start = async_helper;
}

/*
 * starts new worker threads.  This does not enforce the max worker
 * count in case you need to temporarily go past it.
 */
static int __btrfs_start_workers(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	int ret = 0;

	worker = kzalloc(sizeof(*worker), GFP_NOFS);
	if (!worker) {
		ret = -ENOMEM;
		goto fail;
	}

	INIT_LIST_HEAD(&worker->pending);
	INIT_LIST_HEAD(&worker->prio_pending);
	INIT_LIST_HEAD(&worker->worker_list);
	spin_lock_init(&worker->lock);

	atomic_set(&worker->num_pending, 0);
	atomic_set(&worker->refs, 1);
	worker->workers = workers;
	worker->task = kthread_run(worker_loop, worker,
				   "btrfs-%s-%d", workers->name,
				   workers->num_workers + 1);
	if (IS_ERR(worker->task)) {
		ret = PTR_ERR(worker->task);
		kfree(worker);
		goto fail;
	}
	spin_lock_irq(&workers->lock);
	list_add_tail(&worker->worker_list, &workers->idle_list);
	worker->idle = 1;
	workers->num_workers++;
	workers->num_workers_starting--;
	WARN_ON(workers->num_workers_starting < 0);
	spin_unlock_irq(&workers->lock);

	return 0;
fail:
	spin_lock_irq(&workers->lock);
	workers->num_workers_starting--;
	spin_unlock_irq(&workers->lock);
	return ret;
}

int btrfs_start_workers(struct btrfs_workers *workers)
{
	spin_lock_irq(&workers->lock);
	workers->num_workers_starting++;
	spin_unlock_irq(&workers->lock);
	return __btrfs_start_workers(workers);
}

/*
 * run through the list and find a worker thread that doesn't have a lot
 * to do right now.  This can return NULL if we aren't yet at the thread
 * count limit and all of the threads are busy.
 */
static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	struct list_head *next;
	int enforce_min;

	enforce_min = (workers->num_workers + workers->num_workers_starting) <
		workers->max_workers;

	/*
	 * if we find an idle thread, don't move it to the end of the
	 * idle list.  This improves the chance that the next submission
	 * will reuse the same thread, and maybe catch it while it is still
	 * working
	 */
	if (!list_empty(&workers->idle_list)) {
		next = workers->idle_list.next;
		worker = list_entry(next, struct btrfs_worker_thread,
				    worker_list);
		return worker;
	}
	if (enforce_min || list_empty(&workers->worker_list))
		return NULL;

	/*
	 * if we pick a busy task, move the task to the end of the list.
	 * hopefully this will keep things somewhat evenly balanced.
	 * Do the move in batches based on the sequence number.  This groups
	 * requests submitted at roughly the same time onto the same worker.
	 */
	next = workers->worker_list.next;
	worker = list_entry(next, struct btrfs_worker_thread, worker_list);
	worker->sequence++;

	if (worker->sequence % workers->idle_thresh == 0)
		list_move_tail(next, &workers->worker_list);
	return worker;
}

/*
 * selects a worker thread to take the next job.  This will either find
 * an idle worker, start a new worker up to the max count, or just return
 * one of the existing busy workers.
 */
static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
	struct list_head *fallback;
	int ret;

	spin_lock_irqsave(&workers->lock, flags);
again:
	worker = next_worker(workers);

	if (!worker) {
		if (workers->num_workers + workers->num_workers_starting >=
		    workers->max_workers) {
			goto fallback;
		} else if (workers->atomic_worker_start) {
			workers->atomic_start_pending = 1;
			goto fallback;
		} else {
			workers->num_workers_starting++;
			spin_unlock_irqrestore(&workers->lock, flags);
			/* we're below the limit, start another worker */
			ret = __btrfs_start_workers(workers);
			spin_lock_irqsave(&workers->lock, flags);
			if (ret)
				goto fallback;
			goto again;
		}
	}
	goto found;

fallback:
	fallback = NULL;
	/*
	 * we have failed to find any workers, just
	 * return the first one we can find.
	 */
	if (!list_empty(&workers->worker_list))
		fallback = workers->worker_list.next;
	if (!list_empty(&workers->idle_list))
		fallback = workers->idle_list.next;
	BUG_ON(!fallback);
	worker = list_entry(fallback,
			    struct btrfs_worker_thread, worker_list);
found:
	/*
	 * this makes sure the worker doesn't exit before it is placed
	 * onto a busy/idle list
	 */
	atomic_inc(&worker->num_pending);
	spin_unlock_irqrestore(&workers->lock, flags);
	return worker;
}

/*
 * btrfs_requeue_work just puts the work item back on the tail of the list
 * it was taken from.  It is intended for use with long running work functions
 * that make some progress and want to give the cpu up for others.
 */
void btrfs_requeue_work(struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker = work->worker;
	unsigned long flags;
	int wake = 0;

	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
		return;

	spin_lock_irqsave(&worker->lock, flags);
	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
	atomic_inc(&worker->num_pending);

	/* by definition we're busy, take ourselves off the idle
	 * list
	 */
	if (worker->idle) {
		spin_lock(&worker->workers->lock);
		worker->idle = 0;
		list_move_tail(&worker->worker_list,
			       &worker->workers->worker_list);
		spin_unlock(&worker->workers->lock);
	}
	if (!worker->working) {
		wake = 1;
		worker->working = 1;
	}

	if (wake)
		wake_up_process(worker->task);
	spin_unlock_irqrestore(&worker->lock, flags);
}

void btrfs_set_work_high_prio(struct btrfs_work *work)
{
	set_bit(WORK_HIGH_PRIO_BIT, &work->flags);
}
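/*
 * Illustrative sketch only: a long-running work function that processes
 * a bounded chunk per invocation and uses btrfs_requeue_work() above to
 * give the worker thread back between chunks, as described in the
 * comment above that function.  struct example_long_job, its fields and
 * the chunk size of 256 are hypothetical, not part of the btrfs code.
 */
struct example_long_job {
	struct btrfs_work work;
	unsigned long cursor;
	unsigned long end;
};

static void __maybe_unused example_long_func(struct btrfs_work *work)
{
	struct example_long_job *job;

	job = container_of(work, struct example_long_job, work);

	/* make a bounded amount of progress per call */
	job->cursor += 256;
	if (job->cursor < job->end) {
		/*
		 * not finished yet.  WORK_QUEUED_BIT was cleared before
		 * func ran, so this puts us back on the same worker.
		 */
		btrfs_requeue_work(work);
		return;
	}

	/* finished: the work struct is ours to free */
	kfree(job);
}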
/*
 * places a struct btrfs_work into the pending queue of one of the kthreads
 */
void btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
	int wake = 0;

	/* don't requeue something already on a list */
	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
		return;

	worker = find_worker(workers);
	if (workers->ordered) {
		/*
		 * you're not allowed to do ordered queues from an
		 * interrupt handler
		 */
		spin_lock(&workers->order_lock);
		if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) {
			list_add_tail(&work->order_list,
				      &workers->prio_order_list);
		} else {
			list_add_tail(&work->order_list, &workers->order_list);
		}
		spin_unlock(&workers->order_lock);
	} else {
		INIT_LIST_HEAD(&work->order_list);
	}

	spin_lock_irqsave(&worker->lock, flags);

	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
	check_busy_worker(worker);

	/*
	 * avoid calling into wake_up_process if this thread has already
	 * been kicked
	 */
	if (!worker->working)
		wake = 1;
	worker->working = 1;

	if (wake)
		wake_up_process(worker->task);
	spin_unlock_irqrestore(&worker->lock, flags);
}
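/*
 * Illustrative sketch only: the expected lifecycle of a pool built from
 * the functions above.  example_pool_setup, example_func, the "example"
 * thread name, the thread count of 8 and the idle_thresh of 4 are all
 * hypothetical values chosen for the sketch; the helper pool is assumed
 * to be initialized and started already, and 'work' is assumed to be
 * freshly zeroed by the caller.
 */
static void example_func(struct btrfs_work *work)
{
	/* the actual work runs here, usually via container_of() on 'work' */
}

static int __maybe_unused example_pool_setup(struct btrfs_workers *pool,
					     struct btrfs_workers *helper,
					     struct btrfs_work *work)
{
	int ret;

	/* "example" names the kthreads, 8 caps the number of threads */
	btrfs_init_workers(pool, "example", 8, helper);
	pool->idle_thresh = 4;

	/* spawn the first thread; more are started on demand */
	ret = btrfs_start_workers(pool);
	if (ret)
		return ret;

	/* hand one item to the pool */
	work->func = example_func;
	btrfs_queue_worker(pool, work);

	/* on teardown, wait for every worker thread to exit */
	btrfs_stop_workers(pool);
	return 0;
}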