/*
 * Copyright (C) 2007 Oracle. All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License v2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 021110-1307, USA.
 */

#include <linux/kthread.h>
#include <linux/slab.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/freezer.h>
#include "async-thread.h"

#define WORK_QUEUED_BIT 0
#define WORK_DONE_BIT 1
#define WORK_ORDER_DONE_BIT 2
#define WORK_HIGH_PRIO_BIT 3

/*
 * container for the kthread task pointer and the list of pending work.
 * One of these is allocated per thread.
 */
struct btrfs_worker_thread {
	/* pool we belong to */
	struct btrfs_workers *workers;

	/* list of struct btrfs_work that are waiting for service */
	struct list_head pending;
	struct list_head prio_pending;

	/* list of worker threads from struct btrfs_workers */
	struct list_head worker_list;

	/* kthread */
	struct task_struct *task;

	/* number of things on the pending list */
	atomic_t num_pending;

	/* reference counter for this struct */
	atomic_t refs;

	unsigned long sequence;

	/* protects the pending list. */
	spinlock_t lock;

	/* set to non-zero when this thread is already awake and kicking */
	int working;

	/* are we currently idle */
	int idle;
};

static int __btrfs_start_workers(struct btrfs_workers *workers);

/*
 * btrfs_start_workers uses kthread_run, which can block waiting for memory
 * for a very long time. It will actually throttle on page writeback,
 * and so it may not make progress until after our btrfs worker threads
 * process all of the pending work structs in their queue.
 *
 * This means we can't use btrfs_start_workers from inside a btrfs worker
 * thread that is used as part of cleaning dirty memory, which pretty much
 * involves all of the worker threads.
 *
 * Instead we have a helper queue that never has more than one thread,
 * where we schedule thread start operations. This worker_start struct
 * is used to contain the work and hold a pointer to the queue that needs
 * another worker.
 */
struct worker_start {
	struct btrfs_work work;
	struct btrfs_workers *queue;
};

static void start_new_worker_func(struct btrfs_work *work)
{
	struct worker_start *start;
	start = container_of(work, struct worker_start, work);
	__btrfs_start_workers(start->queue);
	kfree(start);
}

/*
 * helper function to move a thread onto the idle list after it
 * has finished some requests.
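 * A worker counts as idle once its pending count drops below half of
 * the pool's idle_thresh.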
 */
static void check_idle_worker(struct btrfs_worker_thread *worker)
{
	if (!worker->idle && atomic_read(&worker->num_pending) <
	    worker->workers->idle_thresh / 2) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 1;

		/* the list may be empty if the worker is just starting */
		if (!list_empty(&worker->worker_list)) {
			list_move(&worker->worker_list,
				  &worker->workers->idle_list);
		}
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}

/*
 * helper function to move a thread off the idle list after new
 * pending work is added.
 */
static void check_busy_worker(struct btrfs_worker_thread *worker)
{
	if (worker->idle && atomic_read(&worker->num_pending) >=
	    worker->workers->idle_thresh) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 0;

		if (!list_empty(&worker->worker_list)) {
			list_move_tail(&worker->worker_list,
				       &worker->workers->worker_list);
		}
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}

static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
{
	struct btrfs_workers *workers = worker->workers;
	struct worker_start *start;
	unsigned long flags;

	rmb();
	if (!workers->atomic_start_pending)
		return;

	start = kzalloc(sizeof(*start), GFP_NOFS);
	if (!start)
		return;

	start->work.func = start_new_worker_func;
	start->queue = workers;

	spin_lock_irqsave(&workers->lock, flags);
	if (!workers->atomic_start_pending)
		goto out;

	workers->atomic_start_pending = 0;
	if (workers->num_workers + workers->num_workers_starting >=
	    workers->max_workers)
		goto out;

	workers->num_workers_starting += 1;
	spin_unlock_irqrestore(&workers->lock, flags);
	btrfs_queue_worker(workers->atomic_worker_start, &start->work);
	return;

out:
	kfree(start);
	spin_unlock_irqrestore(&workers->lock, flags);
}

static noinline void run_ordered_completions(struct btrfs_workers *workers,
					     struct btrfs_work *work)
{
	if (!workers->ordered)
		return;

	set_bit(WORK_DONE_BIT, &work->flags);

	spin_lock(&workers->order_lock);

	while (1) {
		if (!list_empty(&workers->prio_order_list)) {
			work = list_entry(workers->prio_order_list.next,
					  struct btrfs_work, order_list);
		} else if (!list_empty(&workers->order_list)) {
			work = list_entry(workers->order_list.next,
					  struct btrfs_work, order_list);
		} else {
			break;
		}
		if (!test_bit(WORK_DONE_BIT, &work->flags))
			break;

		/* we are going to call the ordered done function, but
		 * we leave the work item on the list as a barrier so
		 * that later work items that are done don't have their
		 * functions called before this one returns
		 */
		if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
			break;

		spin_unlock(&workers->order_lock);

		work->ordered_func(work);

		/* now take the lock again and call the freeing code */
		spin_lock(&workers->order_lock);
		list_del(&work->order_list);
		work->ordered_free(work);
	}

	spin_unlock(&workers->order_lock);
}

static void put_worker(struct btrfs_worker_thread *worker)
{
	if (atomic_dec_and_test(&worker->refs))
		kfree(worker);
}

static int try_worker_shutdown(struct btrfs_worker_thread *worker)
{
	int freeit = 0;

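	/*
	 * take both the per-worker lock and the pool lock before checking,
	 * so nobody can queue new work or walk the worker lists while we
	 * decide whether this thread can be torn down.
	 */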
	spin_lock_irq(&worker->lock);
	spin_lock(&worker->workers->lock);
	if (worker->workers->num_workers > 1 &&
	    worker->idle &&
	    !worker->working &&
	    !list_empty(&worker->worker_list) &&
	    list_empty(&worker->prio_pending) &&
	    list_empty(&worker->pending) &&
	    atomic_read(&worker->num_pending) == 0) {
		freeit = 1;
		list_del_init(&worker->worker_list);
		worker->workers->num_workers--;
	}
	spin_unlock(&worker->workers->lock);
	spin_unlock_irq(&worker->lock);

	if (freeit)
		put_worker(worker);
	return freeit;
}

static struct btrfs_work *get_next_work(struct btrfs_worker_thread *worker,
					struct list_head *prio_head,
					struct list_head *head)
{
	struct btrfs_work *work = NULL;
	struct list_head *cur = NULL;

	if (!list_empty(prio_head))
		cur = prio_head->next;

	smp_mb();
	if (!list_empty(&worker->prio_pending))
		goto refill;

	if (!list_empty(head))
		cur = head->next;

	if (cur)
		goto out;

refill:
	spin_lock_irq(&worker->lock);
	list_splice_tail_init(&worker->prio_pending, prio_head);
	list_splice_tail_init(&worker->pending, head);

	if (!list_empty(prio_head))
		cur = prio_head->next;
	else if (!list_empty(head))
		cur = head->next;
	spin_unlock_irq(&worker->lock);

	if (!cur)
		goto out_fail;

out:
	work = list_entry(cur, struct btrfs_work, list);

out_fail:
	return work;
}

/*
 * main loop for servicing work items
 */
static int worker_loop(void *arg)
{
	struct btrfs_worker_thread *worker = arg;
	struct list_head head;
	struct list_head prio_head;
	struct btrfs_work *work;

	INIT_LIST_HEAD(&head);
	INIT_LIST_HEAD(&prio_head);

	do {
again:
		while (1) {
			work = get_next_work(worker, &prio_head, &head);
			if (!work)
				break;

			list_del(&work->list);
			clear_bit(WORK_QUEUED_BIT, &work->flags);

			work->worker = worker;

			work->func(work);

			atomic_dec(&worker->num_pending);
			/*
			 * unless this is an ordered work queue,
			 * 'work' was probably freed by func above.
			 */
			run_ordered_completions(worker->workers, work);

			check_pending_worker_creates(worker);
			cond_resched();
		}

		spin_lock_irq(&worker->lock);
		check_idle_worker(worker);

		if (freezing(current)) {
			worker->working = 0;
			spin_unlock_irq(&worker->lock);
			try_to_freeze();
		} else {
			spin_unlock_irq(&worker->lock);
			if (!kthread_should_stop()) {
				cpu_relax();
				/*
				 * we've dropped the lock, did someone else
				 * jump in?
				 */
				smp_mb();
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
					continue;

				/*
				 * this short schedule allows more work to
				 * come in without the queue functions
				 * needing to go through wake_up_process()
				 *
				 * worker->working is still 1, so nobody
				 * is going to try and wake us up
				 */
				schedule_timeout(1);
				smp_mb();
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
					continue;

				if (kthread_should_stop())
					break;

				/* still no more work? sleep for real */
				spin_lock_irq(&worker->lock);
				set_current_state(TASK_INTERRUPTIBLE);
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending)) {
					spin_unlock_irq(&worker->lock);
					set_current_state(TASK_RUNNING);
					goto again;
				}

				/*
				 * this makes sure we get a wakeup when someone
				 * adds something new to the queue
				 */
				worker->working = 0;
				spin_unlock_irq(&worker->lock);

				if (!kthread_should_stop()) {
					schedule_timeout(HZ * 120);
					if (!worker->working &&
					    try_worker_shutdown(worker)) {
						return 0;
					}
				}
			}
			__set_current_state(TASK_RUNNING);
		}
	} while (!kthread_should_stop());
	return 0;
}

/*
 * this will wait for all the worker threads to shut down
 */
void btrfs_stop_workers(struct btrfs_workers *workers)
{
	struct list_head *cur;
	struct btrfs_worker_thread *worker;
	int can_stop;

	spin_lock_irq(&workers->lock);
	list_splice_init(&workers->idle_list, &workers->worker_list);
	while (!list_empty(&workers->worker_list)) {
		cur = workers->worker_list.next;
		worker = list_entry(cur, struct btrfs_worker_thread,
				    worker_list);

		atomic_inc(&worker->refs);
		workers->num_workers -= 1;
		if (!list_empty(&worker->worker_list)) {
			list_del_init(&worker->worker_list);
			put_worker(worker);
			can_stop = 1;
		} else
			can_stop = 0;
		spin_unlock_irq(&workers->lock);
		if (can_stop)
			kthread_stop(worker->task);
		spin_lock_irq(&workers->lock);
		put_worker(worker);
	}
	spin_unlock_irq(&workers->lock);
}

/*
 * simple init on struct btrfs_workers
 */
void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max,
			struct btrfs_workers *async_helper)
{
	workers->num_workers = 0;
	workers->num_workers_starting = 0;
	INIT_LIST_HEAD(&workers->worker_list);
	INIT_LIST_HEAD(&workers->idle_list);
	INIT_LIST_HEAD(&workers->order_list);
	INIT_LIST_HEAD(&workers->prio_order_list);
	spin_lock_init(&workers->lock);
	spin_lock_init(&workers->order_lock);
	workers->max_workers = max;
	workers->idle_thresh = 32;
	workers->name = name;
	workers->ordered = 0;
	workers->atomic_start_pending = 0;
	workers->atomic_worker_start = async_helper;
}

/*
 * starts new worker threads. This does not enforce the max worker
 * count in case you need to temporarily go past it.
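 *
 * Callers are expected to have already incremented num_workers_starting
 * under workers->lock; the fail path here drops that count again.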
 */
static int __btrfs_start_workers(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	int ret = 0;

	worker = kzalloc(sizeof(*worker), GFP_NOFS);
	if (!worker) {
		ret = -ENOMEM;
		goto fail;
	}

	INIT_LIST_HEAD(&worker->pending);
	INIT_LIST_HEAD(&worker->prio_pending);
	INIT_LIST_HEAD(&worker->worker_list);
	spin_lock_init(&worker->lock);

	atomic_set(&worker->num_pending, 0);
	atomic_set(&worker->refs, 1);
	worker->workers = workers;
	worker->task = kthread_run(worker_loop, worker,
				   "btrfs-%s-%d", workers->name,
				   workers->num_workers + 1);
	if (IS_ERR(worker->task)) {
		ret = PTR_ERR(worker->task);
		kfree(worker);
		goto fail;
	}
	spin_lock_irq(&workers->lock);
	list_add_tail(&worker->worker_list, &workers->idle_list);
	worker->idle = 1;
	workers->num_workers++;
	workers->num_workers_starting--;
	WARN_ON(workers->num_workers_starting < 0);
	spin_unlock_irq(&workers->lock);

	return 0;
fail:
	spin_lock_irq(&workers->lock);
	workers->num_workers_starting--;
	spin_unlock_irq(&workers->lock);
	return ret;
}

int btrfs_start_workers(struct btrfs_workers *workers)
{
	spin_lock_irq(&workers->lock);
	workers->num_workers_starting++;
	spin_unlock_irq(&workers->lock);
	return __btrfs_start_workers(workers);
}

/*
 * run through the list and find a worker thread that doesn't have a lot
 * to do right now. This can return NULL if we aren't yet at the thread
 * count limit and all of the threads are busy.
 */
static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	struct list_head *next;
	int enforce_min;

	enforce_min = (workers->num_workers + workers->num_workers_starting) <
		workers->max_workers;

	/*
	 * if we find an idle thread, don't move it to the end of the
	 * idle list. This improves the chance that the next submission
	 * will reuse the same thread, and maybe catch it while it is still
	 * working
	 */
	if (!list_empty(&workers->idle_list)) {
		next = workers->idle_list.next;
		worker = list_entry(next, struct btrfs_worker_thread,
				    worker_list);
		return worker;
	}
	if (enforce_min || list_empty(&workers->worker_list))
		return NULL;

	/*
	 * if we pick a busy task, move the task to the end of the list.
	 * hopefully this will keep things somewhat evenly balanced.
	 * Do the move in batches based on the sequence number. This groups
	 * requests submitted at roughly the same time onto the same worker.
	 */
	next = workers->worker_list.next;
	worker = list_entry(next, struct btrfs_worker_thread, worker_list);
	worker->sequence++;

	if (worker->sequence % workers->idle_thresh == 0)
		list_move_tail(next, &workers->worker_list);
	return worker;
}

/*
 * selects a worker thread to take the next job. This will either find
 * an idle worker, start a new worker up to the max count, or just return
 * one of the existing busy workers.
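 *
 * When a new thread can't be started inline (the pool has an
 * atomic_worker_start helper), atomic_start_pending is set and a
 * busy worker is used until the helper spawns another thread.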
 */
static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
	struct list_head *fallback;
	int ret;

	spin_lock_irqsave(&workers->lock, flags);
again:
	worker = next_worker(workers);

	if (!worker) {
		if (workers->num_workers + workers->num_workers_starting >=
		    workers->max_workers) {
			goto fallback;
		} else if (workers->atomic_worker_start) {
			workers->atomic_start_pending = 1;
			goto fallback;
		} else {
			workers->num_workers_starting++;
			spin_unlock_irqrestore(&workers->lock, flags);
			/* we're below the limit, start another worker */
			ret = __btrfs_start_workers(workers);
			spin_lock_irqsave(&workers->lock, flags);
			if (ret)
				goto fallback;
			goto again;
		}
	}
	goto found;

fallback:
	fallback = NULL;
	/*
	 * we have failed to find any workers, just
	 * return the first one we can find.
	 */
	if (!list_empty(&workers->worker_list))
		fallback = workers->worker_list.next;
	if (!list_empty(&workers->idle_list))
		fallback = workers->idle_list.next;
	BUG_ON(!fallback);
	worker = list_entry(fallback,
			    struct btrfs_worker_thread, worker_list);
found:
	/*
	 * this makes sure the worker doesn't exit before it is placed
	 * onto a busy/idle list
	 */
	atomic_inc(&worker->num_pending);
	spin_unlock_irqrestore(&workers->lock, flags);
	return worker;
}

/*
 * btrfs_requeue_work just puts the work item back on the tail of the list
 * it was taken from. It is intended for use with long running work functions
 * that make some progress and want to give the cpu up for others.
 */
void btrfs_requeue_work(struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker = work->worker;
	unsigned long flags;
	int wake = 0;

	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
		return;

	spin_lock_irqsave(&worker->lock, flags);
	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
	atomic_inc(&worker->num_pending);

	/* by definition we're busy, take ourselves off the idle
	 * list
	 */
	if (worker->idle) {
		spin_lock(&worker->workers->lock);
		worker->idle = 0;
		list_move_tail(&worker->worker_list,
			       &worker->workers->worker_list);
		spin_unlock(&worker->workers->lock);
	}
	if (!worker->working) {
		wake = 1;
		worker->working = 1;
	}

	if (wake)
		wake_up_process(worker->task);
	spin_unlock_irqrestore(&worker->lock, flags);
}

void btrfs_set_work_high_prio(struct btrfs_work *work)
{
	set_bit(WORK_HIGH_PRIO_BIT, &work->flags);
}

/*
 * places a struct btrfs_work into the pending queue of one of the kthreads
 */
void btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
	int wake = 0;

	/* don't requeue something already on a list */
	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
		return;

	worker = find_worker(workers);
	if (workers->ordered) {
		/*
		 * you're not allowed to do ordered queues from an
		 * interrupt handler
		 */
		spin_lock(&workers->order_lock);
		if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) {
			list_add_tail(&work->order_list,
				      &workers->prio_order_list);
		} else {
			list_add_tail(&work->order_list, &workers->order_list);
		}
		spin_unlock(&workers->order_lock);
	} else {
		INIT_LIST_HEAD(&work->order_list);
	}

	spin_lock_irqsave(&worker->lock, flags);

	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
	check_busy_worker(worker);

	/*
	 * avoid calling into wake_up_process if this thread has already
	 * been kicked
	 */
	if (!worker->working)
		wake = 1;
	worker->working = 1;

	if (wake)
		wake_up_process(worker->task);
	spin_unlock_irqrestore(&worker->lock, flags);
}