1 /* 2 * linux/kernel/workqueue.c 3 * 4 * Generic mechanism for defining kernel helper threads for running 5 * arbitrary tasks in process context. 6 * 7 * Started by Ingo Molnar, Copyright (C) 2002 8 * 9 * Derived from the taskqueue/keventd code by: 10 * 11 * David Woodhouse <dwmw2@infradead.org> 12 * Andrew Morton <andrewm@uow.edu.au> 13 * Kai Petzke <wpp@marie.physik.tu-berlin.de> 14 * Theodore Ts'o <tytso@mit.edu> 15 * 16 * Made to use alloc_percpu by Christoph Lameter <clameter@sgi.com>. 17 */ 18 19 #include <linux/module.h> 20 #include <linux/kernel.h> 21 #include <linux/sched.h> 22 #include <linux/init.h> 23 #include <linux/signal.h> 24 #include <linux/completion.h> 25 #include <linux/workqueue.h> 26 #include <linux/slab.h> 27 #include <linux/cpu.h> 28 #include <linux/notifier.h> 29 #include <linux/kthread.h> 30 31 /* 32 * The per-CPU workqueue (if single thread, we always use cpu 0's). 33 * 34 * The sequence counters are for flush_scheduled_work(). It wants to wait 35 * until until all currently-scheduled works are completed, but it doesn't 36 * want to be livelocked by new, incoming ones. So it waits until 37 * remove_sequence is >= the insert_sequence which pertained when 38 * flush_scheduled_work() was called. 39 */ 40 struct cpu_workqueue_struct { 41 42 spinlock_t lock; 43 44 long remove_sequence; /* Least-recently added (next to run) */ 45 long insert_sequence; /* Next to add */ 46 47 struct list_head worklist; 48 wait_queue_head_t more_work; 49 wait_queue_head_t work_done; 50 51 struct workqueue_struct *wq; 52 task_t *thread; 53 54 int run_depth; /* Detect run_workqueue() recursion depth */ 55 } ____cacheline_aligned; 56 57 /* 58 * The externally visible workqueue abstraction is an array of 59 * per-CPU workqueues: 60 */ 61 struct workqueue_struct { 62 struct cpu_workqueue_struct *cpu_wq; 63 const char *name; 64 struct list_head list; /* Empty if single thread */ 65 }; 66 67 /* All the per-cpu workqueues on the system, for hotplug cpu to add/remove 68 threads to each one as cpus come/go. */ 69 static DEFINE_SPINLOCK(workqueue_lock); 70 static LIST_HEAD(workqueues); 71 72 /* If it's single threaded, it isn't in the list of workqueues. */ 73 static inline int is_single_threaded(struct workqueue_struct *wq) 74 { 75 return list_empty(&wq->list); 76 } 77 78 /* Preempt must be disabled. */ 79 static void __queue_work(struct cpu_workqueue_struct *cwq, 80 struct work_struct *work) 81 { 82 unsigned long flags; 83 84 spin_lock_irqsave(&cwq->lock, flags); 85 work->wq_data = cwq; 86 list_add_tail(&work->entry, &cwq->worklist); 87 cwq->insert_sequence++; 88 wake_up(&cwq->more_work); 89 spin_unlock_irqrestore(&cwq->lock, flags); 90 } 91 92 /* 93 * Queue work on a workqueue. Return non-zero if it was successfully 94 * added. 95 * 96 * We queue the work to the CPU it was submitted, but there is no 97 * guarantee that it will be processed by that CPU. 98 */ 99 int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work) 100 { 101 int ret = 0, cpu = get_cpu(); 102 103 if (!test_and_set_bit(0, &work->pending)) { 104 if (unlikely(is_single_threaded(wq))) 105 cpu = 0; 106 BUG_ON(!list_empty(&work->entry)); 107 __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work); 108 ret = 1; 109 } 110 put_cpu(); 111 return ret; 112 } 113 114 static void delayed_work_timer_fn(unsigned long __data) 115 { 116 struct work_struct *work = (struct work_struct *)__data; 117 struct workqueue_struct *wq = work->wq_data; 118 int cpu = smp_processor_id(); 119 120 if (unlikely(is_single_threaded(wq))) 121 cpu = 0; 122 123 __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work); 124 } 125 126 int fastcall queue_delayed_work(struct workqueue_struct *wq, 127 struct work_struct *work, unsigned long delay) 128 { 129 int ret = 0; 130 struct timer_list *timer = &work->timer; 131 132 if (!test_and_set_bit(0, &work->pending)) { 133 BUG_ON(timer_pending(timer)); 134 BUG_ON(!list_empty(&work->entry)); 135 136 /* This stores wq for the moment, for the timer_fn */ 137 work->wq_data = wq; 138 timer->expires = jiffies + delay; 139 timer->data = (unsigned long)work; 140 timer->function = delayed_work_timer_fn; 141 add_timer(timer); 142 ret = 1; 143 } 144 return ret; 145 } 146 147 static inline void run_workqueue(struct cpu_workqueue_struct *cwq) 148 { 149 unsigned long flags; 150 151 /* 152 * Keep taking off work from the queue until 153 * done. 154 */ 155 spin_lock_irqsave(&cwq->lock, flags); 156 cwq->run_depth++; 157 if (cwq->run_depth > 3) { 158 /* morton gets to eat his hat */ 159 printk("%s: recursion depth exceeded: %d\n", 160 __FUNCTION__, cwq->run_depth); 161 dump_stack(); 162 } 163 while (!list_empty(&cwq->worklist)) { 164 struct work_struct *work = list_entry(cwq->worklist.next, 165 struct work_struct, entry); 166 void (*f) (void *) = work->func; 167 void *data = work->data; 168 169 list_del_init(cwq->worklist.next); 170 spin_unlock_irqrestore(&cwq->lock, flags); 171 172 BUG_ON(work->wq_data != cwq); 173 clear_bit(0, &work->pending); 174 f(data); 175 176 spin_lock_irqsave(&cwq->lock, flags); 177 cwq->remove_sequence++; 178 wake_up(&cwq->work_done); 179 } 180 cwq->run_depth--; 181 spin_unlock_irqrestore(&cwq->lock, flags); 182 } 183 184 static int worker_thread(void *__cwq) 185 { 186 struct cpu_workqueue_struct *cwq = __cwq; 187 DECLARE_WAITQUEUE(wait, current); 188 struct k_sigaction sa; 189 sigset_t blocked; 190 191 current->flags |= PF_NOFREEZE; 192 193 set_user_nice(current, -5); 194 195 /* Block and flush all signals */ 196 sigfillset(&blocked); 197 sigprocmask(SIG_BLOCK, &blocked, NULL); 198 flush_signals(current); 199 200 /* SIG_IGN makes children autoreap: see do_notify_parent(). */ 201 sa.sa.sa_handler = SIG_IGN; 202 sa.sa.sa_flags = 0; 203 siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD)); 204 do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0); 205 206 set_current_state(TASK_INTERRUPTIBLE); 207 while (!kthread_should_stop()) { 208 add_wait_queue(&cwq->more_work, &wait); 209 if (list_empty(&cwq->worklist)) 210 schedule(); 211 else 212 __set_current_state(TASK_RUNNING); 213 remove_wait_queue(&cwq->more_work, &wait); 214 215 if (!list_empty(&cwq->worklist)) 216 run_workqueue(cwq); 217 set_current_state(TASK_INTERRUPTIBLE); 218 } 219 __set_current_state(TASK_RUNNING); 220 return 0; 221 } 222 223 static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq) 224 { 225 if (cwq->thread == current) { 226 /* 227 * Probably keventd trying to flush its own queue. So simply run 228 * it by hand rather than deadlocking. 229 */ 230 run_workqueue(cwq); 231 } else { 232 DEFINE_WAIT(wait); 233 long sequence_needed; 234 235 spin_lock_irq(&cwq->lock); 236 sequence_needed = cwq->insert_sequence; 237 238 while (sequence_needed - cwq->remove_sequence > 0) { 239 prepare_to_wait(&cwq->work_done, &wait, 240 TASK_UNINTERRUPTIBLE); 241 spin_unlock_irq(&cwq->lock); 242 schedule(); 243 spin_lock_irq(&cwq->lock); 244 } 245 finish_wait(&cwq->work_done, &wait); 246 spin_unlock_irq(&cwq->lock); 247 } 248 } 249 250 /* 251 * flush_workqueue - ensure that any scheduled work has run to completion. 252 * 253 * Forces execution of the workqueue and blocks until its completion. 254 * This is typically used in driver shutdown handlers. 255 * 256 * This function will sample each workqueue's current insert_sequence number and 257 * will sleep until the head sequence is greater than or equal to that. This 258 * means that we sleep until all works which were queued on entry have been 259 * handled, but we are not livelocked by new incoming ones. 260 * 261 * This function used to run the workqueues itself. Now we just wait for the 262 * helper threads to do it. 263 */ 264 void fastcall flush_workqueue(struct workqueue_struct *wq) 265 { 266 might_sleep(); 267 268 if (is_single_threaded(wq)) { 269 /* Always use cpu 0's area. */ 270 flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, 0)); 271 } else { 272 int cpu; 273 274 lock_cpu_hotplug(); 275 for_each_online_cpu(cpu) 276 flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu)); 277 unlock_cpu_hotplug(); 278 } 279 } 280 281 static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq, 282 int cpu) 283 { 284 struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu); 285 struct task_struct *p; 286 287 spin_lock_init(&cwq->lock); 288 cwq->wq = wq; 289 cwq->thread = NULL; 290 cwq->insert_sequence = 0; 291 cwq->remove_sequence = 0; 292 INIT_LIST_HEAD(&cwq->worklist); 293 init_waitqueue_head(&cwq->more_work); 294 init_waitqueue_head(&cwq->work_done); 295 296 if (is_single_threaded(wq)) 297 p = kthread_create(worker_thread, cwq, "%s", wq->name); 298 else 299 p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu); 300 if (IS_ERR(p)) 301 return NULL; 302 cwq->thread = p; 303 return p; 304 } 305 306 struct workqueue_struct *__create_workqueue(const char *name, 307 int singlethread) 308 { 309 int cpu, destroy = 0; 310 struct workqueue_struct *wq; 311 struct task_struct *p; 312 313 wq = kzalloc(sizeof(*wq), GFP_KERNEL); 314 if (!wq) 315 return NULL; 316 317 wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct); 318 wq->name = name; 319 /* We don't need the distraction of CPUs appearing and vanishing. */ 320 lock_cpu_hotplug(); 321 if (singlethread) { 322 INIT_LIST_HEAD(&wq->list); 323 p = create_workqueue_thread(wq, 0); 324 if (!p) 325 destroy = 1; 326 else 327 wake_up_process(p); 328 } else { 329 spin_lock(&workqueue_lock); 330 list_add(&wq->list, &workqueues); 331 spin_unlock(&workqueue_lock); 332 for_each_online_cpu(cpu) { 333 p = create_workqueue_thread(wq, cpu); 334 if (p) { 335 kthread_bind(p, cpu); 336 wake_up_process(p); 337 } else 338 destroy = 1; 339 } 340 } 341 unlock_cpu_hotplug(); 342 343 /* 344 * Was there any error during startup? If yes then clean up: 345 */ 346 if (destroy) { 347 destroy_workqueue(wq); 348 wq = NULL; 349 } 350 return wq; 351 } 352 353 static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu) 354 { 355 struct cpu_workqueue_struct *cwq; 356 unsigned long flags; 357 struct task_struct *p; 358 359 cwq = per_cpu_ptr(wq->cpu_wq, cpu); 360 spin_lock_irqsave(&cwq->lock, flags); 361 p = cwq->thread; 362 cwq->thread = NULL; 363 spin_unlock_irqrestore(&cwq->lock, flags); 364 if (p) 365 kthread_stop(p); 366 } 367 368 void destroy_workqueue(struct workqueue_struct *wq) 369 { 370 int cpu; 371 372 flush_workqueue(wq); 373 374 /* We don't need the distraction of CPUs appearing and vanishing. */ 375 lock_cpu_hotplug(); 376 if (is_single_threaded(wq)) 377 cleanup_workqueue_thread(wq, 0); 378 else { 379 for_each_online_cpu(cpu) 380 cleanup_workqueue_thread(wq, cpu); 381 spin_lock(&workqueue_lock); 382 list_del(&wq->list); 383 spin_unlock(&workqueue_lock); 384 } 385 unlock_cpu_hotplug(); 386 free_percpu(wq->cpu_wq); 387 kfree(wq); 388 } 389 390 static struct workqueue_struct *keventd_wq; 391 392 int fastcall schedule_work(struct work_struct *work) 393 { 394 return queue_work(keventd_wq, work); 395 } 396 397 int fastcall schedule_delayed_work(struct work_struct *work, unsigned long delay) 398 { 399 return queue_delayed_work(keventd_wq, work, delay); 400 } 401 402 int schedule_delayed_work_on(int cpu, 403 struct work_struct *work, unsigned long delay) 404 { 405 int ret = 0; 406 struct timer_list *timer = &work->timer; 407 408 if (!test_and_set_bit(0, &work->pending)) { 409 BUG_ON(timer_pending(timer)); 410 BUG_ON(!list_empty(&work->entry)); 411 /* This stores keventd_wq for the moment, for the timer_fn */ 412 work->wq_data = keventd_wq; 413 timer->expires = jiffies + delay; 414 timer->data = (unsigned long)work; 415 timer->function = delayed_work_timer_fn; 416 add_timer_on(timer, cpu); 417 ret = 1; 418 } 419 return ret; 420 } 421 422 void flush_scheduled_work(void) 423 { 424 flush_workqueue(keventd_wq); 425 } 426 427 /** 428 * cancel_rearming_delayed_workqueue - reliably kill off a delayed 429 * work whose handler rearms the delayed work. 430 * @wq: the controlling workqueue structure 431 * @work: the delayed work struct 432 */ 433 void cancel_rearming_delayed_workqueue(struct workqueue_struct *wq, 434 struct work_struct *work) 435 { 436 while (!cancel_delayed_work(work)) 437 flush_workqueue(wq); 438 } 439 EXPORT_SYMBOL(cancel_rearming_delayed_workqueue); 440 441 /** 442 * cancel_rearming_delayed_work - reliably kill off a delayed keventd 443 * work whose handler rearms the delayed work. 444 * @work: the delayed work struct 445 */ 446 void cancel_rearming_delayed_work(struct work_struct *work) 447 { 448 cancel_rearming_delayed_workqueue(keventd_wq, work); 449 } 450 EXPORT_SYMBOL(cancel_rearming_delayed_work); 451 452 int keventd_up(void) 453 { 454 return keventd_wq != NULL; 455 } 456 457 int current_is_keventd(void) 458 { 459 struct cpu_workqueue_struct *cwq; 460 int cpu = smp_processor_id(); /* preempt-safe: keventd is per-cpu */ 461 int ret = 0; 462 463 BUG_ON(!keventd_wq); 464 465 cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu); 466 if (current == cwq->thread) 467 ret = 1; 468 469 return ret; 470 471 } 472 473 #ifdef CONFIG_HOTPLUG_CPU 474 /* Take the work from this (downed) CPU. */ 475 static void take_over_work(struct workqueue_struct *wq, unsigned int cpu) 476 { 477 struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu); 478 LIST_HEAD(list); 479 struct work_struct *work; 480 481 spin_lock_irq(&cwq->lock); 482 list_splice_init(&cwq->worklist, &list); 483 484 while (!list_empty(&list)) { 485 printk("Taking work for %s\n", wq->name); 486 work = list_entry(list.next,struct work_struct,entry); 487 list_del(&work->entry); 488 __queue_work(per_cpu_ptr(wq->cpu_wq, smp_processor_id()), work); 489 } 490 spin_unlock_irq(&cwq->lock); 491 } 492 493 /* We're holding the cpucontrol mutex here */ 494 static int __devinit workqueue_cpu_callback(struct notifier_block *nfb, 495 unsigned long action, 496 void *hcpu) 497 { 498 unsigned int hotcpu = (unsigned long)hcpu; 499 struct workqueue_struct *wq; 500 501 switch (action) { 502 case CPU_UP_PREPARE: 503 /* Create a new workqueue thread for it. */ 504 list_for_each_entry(wq, &workqueues, list) { 505 if (!create_workqueue_thread(wq, hotcpu)) { 506 printk("workqueue for %i failed\n", hotcpu); 507 return NOTIFY_BAD; 508 } 509 } 510 break; 511 512 case CPU_ONLINE: 513 /* Kick off worker threads. */ 514 list_for_each_entry(wq, &workqueues, list) { 515 struct cpu_workqueue_struct *cwq; 516 517 cwq = per_cpu_ptr(wq->cpu_wq, hotcpu); 518 kthread_bind(cwq->thread, hotcpu); 519 wake_up_process(cwq->thread); 520 } 521 break; 522 523 case CPU_UP_CANCELED: 524 list_for_each_entry(wq, &workqueues, list) { 525 /* Unbind so it can run. */ 526 kthread_bind(per_cpu_ptr(wq->cpu_wq, hotcpu)->thread, 527 smp_processor_id()); 528 cleanup_workqueue_thread(wq, hotcpu); 529 } 530 break; 531 532 case CPU_DEAD: 533 list_for_each_entry(wq, &workqueues, list) 534 cleanup_workqueue_thread(wq, hotcpu); 535 list_for_each_entry(wq, &workqueues, list) 536 take_over_work(wq, hotcpu); 537 break; 538 } 539 540 return NOTIFY_OK; 541 } 542 #endif 543 544 void init_workqueues(void) 545 { 546 hotcpu_notifier(workqueue_cpu_callback, 0); 547 keventd_wq = create_workqueue("events"); 548 BUG_ON(!keventd_wq); 549 } 550 551 EXPORT_SYMBOL_GPL(__create_workqueue); 552 EXPORT_SYMBOL_GPL(queue_work); 553 EXPORT_SYMBOL_GPL(queue_delayed_work); 554 EXPORT_SYMBOL_GPL(flush_workqueue); 555 EXPORT_SYMBOL_GPL(destroy_workqueue); 556 557 EXPORT_SYMBOL(schedule_work); 558 EXPORT_SYMBOL(schedule_delayed_work); 559 EXPORT_SYMBOL(schedule_delayed_work_on); 560 EXPORT_SYMBOL(flush_scheduled_work); 561