/*
 * Copyright (C) 2007 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License v2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/kthread.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/freezer.h>
#include "async-thread.h"

#define WORK_QUEUED_BIT 0
#define WORK_DONE_BIT 1
#define WORK_ORDER_DONE_BIT 2
#define WORK_HIGH_PRIO_BIT 3

/*
 * container for the kthread task pointer and the list of pending work
 * One of these is allocated per thread.
 */
struct btrfs_worker_thread {
	/* pool we belong to */
	struct btrfs_workers *workers;

	/* list of struct btrfs_work that are waiting for service */
	struct list_head pending;
	struct list_head prio_pending;

	/* list of worker threads from struct btrfs_workers */
	struct list_head worker_list;

	/* kthread */
	struct task_struct *task;

	/* number of things on the pending list */
	atomic_t num_pending;

	unsigned long sequence;

	/* protects the pending list. */
	spinlock_t lock;

	/* set to non-zero when this thread is already awake and kicking */
	int working;

	/* are we currently idle */
	int idle;
};

/*
 * helper function to move a thread onto the idle list after it
 * has finished some requests.
 */
static void check_idle_worker(struct btrfs_worker_thread *worker)
{
	if (!worker->idle && atomic_read(&worker->num_pending) <
	    worker->workers->idle_thresh / 2) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 1;
		list_move(&worker->worker_list, &worker->workers->idle_list);
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}
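
/*
 * Note: check_idle_worker() and check_busy_worker() below form a simple
 * hysteresis band.  A worker is only marked idle once its pending count
 * drops below idle_thresh / 2, but it is not marked busy again until the
 * count climbs back up to idle_thresh; the gap keeps threads from bouncing
 * between the idle and busy lists on every queued item.
 */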

/*
 * helper function to move a thread off the idle list after new
 * pending work is added.
 */
static void check_busy_worker(struct btrfs_worker_thread *worker)
{
	if (worker->idle && atomic_read(&worker->num_pending) >=
	    worker->workers->idle_thresh) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 0;
		list_move_tail(&worker->worker_list,
			       &worker->workers->worker_list);
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}

static noinline int run_ordered_completions(struct btrfs_workers *workers,
					    struct btrfs_work *work)
{
	unsigned long flags;

	if (!workers->ordered)
		return 0;

	set_bit(WORK_DONE_BIT, &work->flags);

	spin_lock_irqsave(&workers->lock, flags);

	while (1) {
		if (!list_empty(&workers->prio_order_list)) {
			work = list_entry(workers->prio_order_list.next,
					  struct btrfs_work, order_list);
		} else if (!list_empty(&workers->order_list)) {
			work = list_entry(workers->order_list.next,
					  struct btrfs_work, order_list);
		} else {
			break;
		}
		if (!test_bit(WORK_DONE_BIT, &work->flags))
			break;

		/* we are going to call the ordered done function, but
		 * we leave the work item on the list as a barrier so
		 * that later work items that are done don't have their
		 * functions called before this one returns
		 */
		if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
			break;

		spin_unlock_irqrestore(&workers->lock, flags);

		work->ordered_func(work);

		/* now take the lock again and call the freeing code */
		spin_lock_irqsave(&workers->lock, flags);
		list_del(&work->order_list);
		work->ordered_free(work);
	}

	spin_unlock_irqrestore(&workers->lock, flags);
	return 0;
}

/*
 * main loop for servicing work items
 */
static int worker_loop(void *arg)
{
	struct btrfs_worker_thread *worker = arg;
	struct list_head *cur;
	struct btrfs_work *work;
	do {
		spin_lock_irq(&worker->lock);
again_locked:
		while (1) {
			if (!list_empty(&worker->prio_pending))
				cur = worker->prio_pending.next;
			else if (!list_empty(&worker->pending))
				cur = worker->pending.next;
			else
				break;

			work = list_entry(cur, struct btrfs_work, list);
			list_del(&work->list);
			clear_bit(WORK_QUEUED_BIT, &work->flags);

			work->worker = worker;
			spin_unlock_irq(&worker->lock);

			work->func(work);

			atomic_dec(&worker->num_pending);
			/*
			 * unless this is an ordered work queue,
			 * 'work' was probably freed by func above.
			 */
			run_ordered_completions(worker->workers, work);

			spin_lock_irq(&worker->lock);
			check_idle_worker(worker);
		}
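		/*
		 * Nothing left to do.  The queue lock is still held here;
		 * before sleeping for real the code below drops it, spins
		 * briefly, and re-checks the pending lists (with memory
		 * barriers) so that work queued in the meantime is not
		 * missed.  worker->working is only cleared once the thread
		 * is committed to sleeping, which is what tells the queue
		 * functions that a wake_up_process() is required.
		 */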
		if (freezing(current)) {
			worker->working = 0;
			spin_unlock_irq(&worker->lock);
			refrigerator();
		} else {
			spin_unlock_irq(&worker->lock);
			if (!kthread_should_stop()) {
				cpu_relax();
				/*
				 * we've dropped the lock, did someone else
				 * jump in?
				 */
				smp_mb();
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
					continue;

				/*
				 * this short schedule allows more work to
				 * come in without the queue functions
				 * needing to go through wake_up_process()
				 *
				 * worker->working is still 1, so nobody
				 * is going to try and wake us up
				 */
				schedule_timeout(1);
				smp_mb();
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
					continue;

				if (kthread_should_stop())
					break;

				/* still no more work? sleep for real */
				spin_lock_irq(&worker->lock);
				set_current_state(TASK_INTERRUPTIBLE);
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
					goto again_locked;

				/*
				 * this makes sure we get a wakeup when someone
				 * adds something new to the queue
				 */
				worker->working = 0;
				spin_unlock_irq(&worker->lock);

				if (!kthread_should_stop())
					schedule();
			}
			__set_current_state(TASK_RUNNING);
		}
	} while (!kthread_should_stop());
	return 0;
}

/*
 * this will wait for all the worker threads to shut down
 */
int btrfs_stop_workers(struct btrfs_workers *workers)
{
	struct list_head *cur;
	struct btrfs_worker_thread *worker;

	list_splice_init(&workers->idle_list, &workers->worker_list);
	while (!list_empty(&workers->worker_list)) {
		cur = workers->worker_list.next;
		worker = list_entry(cur, struct btrfs_worker_thread,
				    worker_list);
		kthread_stop(worker->task);
		list_del(&worker->worker_list);
		kfree(worker);
	}
	return 0;
}

/*
 * simple init on struct btrfs_workers
 */
void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max)
{
	workers->num_workers = 0;
	INIT_LIST_HEAD(&workers->worker_list);
	INIT_LIST_HEAD(&workers->idle_list);
	INIT_LIST_HEAD(&workers->order_list);
	INIT_LIST_HEAD(&workers->prio_order_list);
	spin_lock_init(&workers->lock);
	workers->max_workers = max;
	workers->idle_thresh = 32;
	workers->name = name;
	workers->ordered = 0;
}

/*
 * starts new worker threads.  This does not enforce the max worker
 * count in case you need to temporarily go past it.
 */
int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
{
	struct btrfs_worker_thread *worker;
	int ret = 0;
	int i;

	for (i = 0; i < num_workers; i++) {
		worker = kzalloc(sizeof(*worker), GFP_NOFS);
		if (!worker) {
			ret = -ENOMEM;
			goto fail;
		}

		INIT_LIST_HEAD(&worker->pending);
		INIT_LIST_HEAD(&worker->prio_pending);
		INIT_LIST_HEAD(&worker->worker_list);
		spin_lock_init(&worker->lock);
		atomic_set(&worker->num_pending, 0);
		worker->task = kthread_run(worker_loop, worker,
					   "btrfs-%s-%d", workers->name,
					   workers->num_workers + i);
		worker->workers = workers;
		if (IS_ERR(worker->task)) {
			kfree(worker);
			ret = PTR_ERR(worker->task);
			goto fail;
		}

		spin_lock_irq(&workers->lock);
		list_add_tail(&worker->worker_list, &workers->idle_list);
		worker->idle = 1;
		workers->num_workers++;
		spin_unlock_irq(&workers->lock);
	}
	return 0;
fail:
	btrfs_stop_workers(workers);
	return ret;
}
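
/*
 * A minimal pool-setup sketch (illustrative only, not part of this file;
 * the pool name "demo" and the local variables are made up).  A caller
 * typically initializes a pool once and starts at least one thread before
 * queueing work to it, then stops the pool on teardown:
 *
 *	struct btrfs_workers pool;
 *	int ret;
 *
 *	btrfs_init_workers(&pool, "demo", 4);
 *	ret = btrfs_start_workers(&pool, 1);
 *	if (ret)
 *		return ret;
 *	...
 *	btrfs_stop_workers(&pool);
 */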

/*
 * run through the list and find a worker thread that doesn't have a lot
 * to do right now.  This can return null if we aren't yet at the thread
 * count limit and all of the threads are busy.
 */
static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	struct list_head *next;
	int enforce_min = workers->num_workers < workers->max_workers;

	/*
	 * if we find an idle thread, don't move it to the end of the
	 * idle list.  This improves the chance that the next submission
	 * will reuse the same thread, and maybe catch it while it is still
	 * working
	 */
	if (!list_empty(&workers->idle_list)) {
		next = workers->idle_list.next;
		worker = list_entry(next, struct btrfs_worker_thread,
				    worker_list);
		return worker;
	}
	if (enforce_min || list_empty(&workers->worker_list))
		return NULL;

	/*
	 * if we pick a busy task, move the task to the end of the list.
	 * hopefully this will keep things somewhat evenly balanced.
	 * Do the move in batches based on the sequence number.  This groups
	 * requests submitted at roughly the same time onto the same worker.
	 */
	next = workers->worker_list.next;
	worker = list_entry(next, struct btrfs_worker_thread, worker_list);
	atomic_inc(&worker->num_pending);
	worker->sequence++;

	if (worker->sequence % workers->idle_thresh == 0)
		list_move_tail(next, &workers->worker_list);
	return worker;
}

/*
 * selects a worker thread to take the next job.  This will either find
 * an idle worker, start a new worker up to the max count, or just return
 * one of the existing busy workers.
 */
static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;

again:
	spin_lock_irqsave(&workers->lock, flags);
	worker = next_worker(workers);
	spin_unlock_irqrestore(&workers->lock, flags);

	if (!worker) {
		spin_lock_irqsave(&workers->lock, flags);
		if (workers->num_workers >= workers->max_workers) {
			struct list_head *fallback = NULL;
			/*
			 * we have failed to find an idle worker, just
			 * fall back to any thread we can find
			 */
			if (!list_empty(&workers->worker_list))
				fallback = workers->worker_list.next;
			if (!list_empty(&workers->idle_list))
				fallback = workers->idle_list.next;
			BUG_ON(!fallback);
			worker = list_entry(fallback,
					    struct btrfs_worker_thread,
					    worker_list);
			spin_unlock_irqrestore(&workers->lock, flags);
		} else {
			spin_unlock_irqrestore(&workers->lock, flags);
			/* we're below the limit, start another worker */
			btrfs_start_workers(workers, 1);
			goto again;
		}
	}
	return worker;
}
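
/*
 * Note: once the pool is already at max_workers, find_worker() does not
 * block or fail; it simply falls back to an existing (possibly busy)
 * thread, so queueing work never has to wait for a free worker here.
 */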

/*
 * btrfs_requeue_work just puts the work item back on the tail of the list
 * it was taken from.  It is intended for use with long running work functions
 * that make some progress and want to give the cpu up for others.
 */
int btrfs_requeue_work(struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker = work->worker;
	unsigned long flags;
	int wake = 0;

	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
		goto out;

	spin_lock_irqsave(&worker->lock, flags);
	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
	atomic_inc(&worker->num_pending);

	/* by definition we're busy, take ourselves off the idle
	 * list
	 */
	if (worker->idle) {
		/*
		 * irqs are already off from the irqsave above; don't
		 * clobber 'flags' by saving them a second time
		 */
		spin_lock(&worker->workers->lock);
		worker->idle = 0;
		list_move_tail(&worker->worker_list,
			       &worker->workers->worker_list);
		spin_unlock(&worker->workers->lock);
	}
	if (!worker->working) {
		wake = 1;
		worker->working = 1;
	}

	spin_unlock_irqrestore(&worker->lock, flags);
	if (wake)
		wake_up_process(worker->task);
out:
	return 0;
}

void btrfs_set_work_high_prio(struct btrfs_work *work)
{
	set_bit(WORK_HIGH_PRIO_BIT, &work->flags);
}

/*
 * places a struct btrfs_work into the pending queue of one of the kthreads
 */
int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
	int wake = 0;

	/* don't requeue something already on a list */
	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
		goto out;

	worker = find_worker(workers);
	if (workers->ordered) {
		spin_lock_irqsave(&workers->lock, flags);
		if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) {
			list_add_tail(&work->order_list,
				      &workers->prio_order_list);
		} else {
			list_add_tail(&work->order_list, &workers->order_list);
		}
		spin_unlock_irqrestore(&workers->lock, flags);
	} else {
		INIT_LIST_HEAD(&work->order_list);
	}

	spin_lock_irqsave(&worker->lock, flags);

	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
	atomic_inc(&worker->num_pending);
	check_busy_worker(worker);

	/*
	 * avoid calling into wake_up_process if this thread has already
	 * been kicked
	 */
	if (!worker->working)
		wake = 1;
	worker->working = 1;

	spin_unlock_irqrestore(&worker->lock, flags);

	if (wake)
		wake_up_process(worker->task);
out:
	return 0;
}
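
/*
 * A minimal submission sketch (illustrative only; "my_job" and
 * "my_job_func" are hypothetical caller-side names, and "pool" is assumed
 * to be a pool set up as in the earlier sketch).  Callers embed a struct
 * btrfs_work in their own structure, point ->func at their handler, and
 * hand it to btrfs_queue_worker(); ->flags must start out clear so the
 * WORK_QUEUED_BIT logic above works.  For an ordered pool, ->ordered_func
 * and ->ordered_free must be set as well:
 *
 *	struct my_job {
 *		struct btrfs_work work;
 *		...
 *	};
 *
 *	static void my_job_func(struct btrfs_work *work)
 *	{
 *		struct my_job *job = container_of(work, struct my_job, work);
 *		...
 *	}
 *
 *	job->work.func = my_job_func;
 *	job->work.flags = 0;
 *	btrfs_set_work_high_prio(&job->work);	(optional)
 *	btrfs_queue_worker(&pool, &job->work);
 */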