/*
 * linux/kernel/workqueue.c
 *
 * Generic mechanism for defining kernel helper threads for running
 * arbitrary tasks in process context.
 *
 * Started by Ingo Molnar, Copyright (C) 2002
 *
 * Derived from the taskqueue/keventd code by:
 *
 *   David Woodhouse <dwmw2@infradead.org>
 *   Andrew Morton <andrewm@uow.edu.au>
 *   Kai Petzke <wpp@marie.physik.tu-berlin.de>
 *   Theodore Ts'o <tytso@mit.edu>
 *
 * Made to use alloc_percpu by Christoph Lameter <clameter@sgi.com>.
 */

#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/init.h>
#include <linux/signal.h>
#include <linux/completion.h>
#include <linux/workqueue.h>
#include <linux/slab.h>
#include <linux/cpu.h>
#include <linux/notifier.h>
#include <linux/kthread.h>

/*
 * The per-CPU workqueue (if single thread, we always use the first
 * possible cpu).
 *
 * The sequence counters are for flush_scheduled_work().  It wants to wait
 * until all currently-scheduled works are completed, but it doesn't
 * want to be livelocked by new, incoming ones.  So it waits until
 * remove_sequence is >= the insert_sequence which pertained when
 * flush_scheduled_work() was called.
 */
struct cpu_workqueue_struct {

        spinlock_t lock;

        long remove_sequence;   /* Least-recently added (next to run) */
        long insert_sequence;   /* Next to add */

        struct list_head worklist;
        wait_queue_head_t more_work;
        wait_queue_head_t work_done;

        struct workqueue_struct *wq;
        task_t *thread;

        int run_depth;          /* Detect run_workqueue() recursion depth */
} ____cacheline_aligned;

/*
 * The externally visible workqueue abstraction is an array of
 * per-CPU workqueues:
 */
struct workqueue_struct {
        struct cpu_workqueue_struct *cpu_wq;
        const char *name;
        struct list_head list;  /* Empty if single thread */
};

/* All the per-cpu workqueues on the system, for hotplug cpu to add/remove
   threads to each one as cpus come/go. */
static DEFINE_SPINLOCK(workqueue_lock);
static LIST_HEAD(workqueues);

static int singlethread_cpu;

/* If it's single threaded, it isn't in the list of workqueues. */
static inline int is_single_threaded(struct workqueue_struct *wq)
{
        return list_empty(&wq->list);
}

/* Preempt must be disabled. */
static void __queue_work(struct cpu_workqueue_struct *cwq,
                         struct work_struct *work)
{
        unsigned long flags;

        spin_lock_irqsave(&cwq->lock, flags);
        work->wq_data = cwq;
        list_add_tail(&work->entry, &cwq->worklist);
        cwq->insert_sequence++;
        wake_up(&cwq->more_work);
        spin_unlock_irqrestore(&cwq->lock, flags);
}

/*
 * Queue work on a workqueue. Return non-zero if it was successfully
 * added.
 *
 * We queue the work to the CPU on which it was submitted, but there is no
 * guarantee that it will be processed by that CPU.
 */
int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
        int ret = 0, cpu = get_cpu();

        if (!test_and_set_bit(0, &work->pending)) {
                if (unlikely(is_single_threaded(wq)))
                        cpu = singlethread_cpu;
                BUG_ON(!list_empty(&work->entry));
                __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
                ret = 1;
        }
        put_cpu();
        return ret;
}
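/*
 * Illustrative usage sketch (not part of this file): how a caller would
 * typically submit work through queue_work() above.  The names
 * example_wq, example_work and example_handler are hypothetical;
 * example_wq is assumed to have been created elsewhere with
 * create_workqueue().  Kept under #if 0 so it is never compiled.
 */
#if 0
static struct workqueue_struct *example_wq;     /* created elsewhere */
static struct work_struct example_work;

/* Runs later in the workqueue thread, in process context. */
static void example_handler(void *data)
{
        /* ... do the deferred work ... */
}

static void example_setup(void)
{
        /* Bind the handler and its argument to the work_struct once. */
        INIT_WORK(&example_work, example_handler, NULL);
}

static int example_submit(void)
{
        /*
         * queue_work() returns non-zero if it queued the work, 0 if the
         * work was already pending (its pending bit was already set).
         */
        return queue_work(example_wq, &example_work);
}
#endif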
static void delayed_work_timer_fn(unsigned long __data)
{
        struct work_struct *work = (struct work_struct *)__data;
        struct workqueue_struct *wq = work->wq_data;
        int cpu = smp_processor_id();

        if (unlikely(is_single_threaded(wq)))
                cpu = singlethread_cpu;

        __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
}

int fastcall queue_delayed_work(struct workqueue_struct *wq,
                        struct work_struct *work, unsigned long delay)
{
        int ret = 0;
        struct timer_list *timer = &work->timer;

        if (!test_and_set_bit(0, &work->pending)) {
                BUG_ON(timer_pending(timer));
                BUG_ON(!list_empty(&work->entry));

                /* This stores wq for the moment, for the timer_fn */
                work->wq_data = wq;
                timer->expires = jiffies + delay;
                timer->data = (unsigned long)work;
                timer->function = delayed_work_timer_fn;
                add_timer(timer);
                ret = 1;
        }
        return ret;
}

static inline void run_workqueue(struct cpu_workqueue_struct *cwq)
{
        unsigned long flags;

        /*
         * Keep taking off work from the queue until
         * done.
         */
        spin_lock_irqsave(&cwq->lock, flags);
        cwq->run_depth++;
        if (cwq->run_depth > 3) {
                /* morton gets to eat his hat */
                printk("%s: recursion depth exceeded: %d\n",
                        __FUNCTION__, cwq->run_depth);
                dump_stack();
        }
        while (!list_empty(&cwq->worklist)) {
                struct work_struct *work = list_entry(cwq->worklist.next,
                                                struct work_struct, entry);
                void (*f) (void *) = work->func;
                void *data = work->data;

                list_del_init(cwq->worklist.next);
                spin_unlock_irqrestore(&cwq->lock, flags);

                BUG_ON(work->wq_data != cwq);
                clear_bit(0, &work->pending);
                f(data);

                spin_lock_irqsave(&cwq->lock, flags);
                cwq->remove_sequence++;
                wake_up(&cwq->work_done);
        }
        cwq->run_depth--;
        spin_unlock_irqrestore(&cwq->lock, flags);
}
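/*
 * Illustrative sketch (not part of this file): what a work handler, as
 * invoked by run_workqueue() above, is allowed to do.  It runs in the
 * workqueue thread, i.e. in process context, so it may sleep and use
 * GFP_KERNEL; and because the pending bit is cleared before f(data) is
 * called, it may requeue its own work_struct.  example_poll is a
 * hypothetical name.
 */
#if 0
static void example_poll(void *data)
{
        void *buf;

        /* Process context: sleeping is fine here ... */
        msleep(10);                     /* needs <linux/delay.h> */

        /* ... and so are GFP_KERNEL allocations. */
        buf = kmalloc(128, GFP_KERNEL);
        kfree(buf);

        /*
         * The pending bit was cleared before this handler ran, so the
         * handler may legally requeue its own work_struct from here.
         */
}
#endif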
static int worker_thread(void *__cwq)
{
        struct cpu_workqueue_struct *cwq = __cwq;
        DECLARE_WAITQUEUE(wait, current);
        struct k_sigaction sa;
        sigset_t blocked;

        current->flags |= PF_NOFREEZE;

        set_user_nice(current, -5);

        /* Block and flush all signals */
        sigfillset(&blocked);
        sigprocmask(SIG_BLOCK, &blocked, NULL);
        flush_signals(current);

        /* SIG_IGN makes children autoreap: see do_notify_parent(). */
        sa.sa.sa_handler = SIG_IGN;
        sa.sa.sa_flags = 0;
        siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
        do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);

        set_current_state(TASK_INTERRUPTIBLE);
        while (!kthread_should_stop()) {
                add_wait_queue(&cwq->more_work, &wait);
                if (list_empty(&cwq->worklist))
                        schedule();
                else
                        __set_current_state(TASK_RUNNING);
                remove_wait_queue(&cwq->more_work, &wait);

                if (!list_empty(&cwq->worklist))
                        run_workqueue(cwq);
                set_current_state(TASK_INTERRUPTIBLE);
        }
        __set_current_state(TASK_RUNNING);
        return 0;
}

static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
{
        if (cwq->thread == current) {
                /*
                 * Probably keventd trying to flush its own queue. So simply run
                 * it by hand rather than deadlocking.
                 */
                run_workqueue(cwq);
        } else {
                DEFINE_WAIT(wait);
                long sequence_needed;

                spin_lock_irq(&cwq->lock);
                sequence_needed = cwq->insert_sequence;

                while (sequence_needed - cwq->remove_sequence > 0) {
                        prepare_to_wait(&cwq->work_done, &wait,
                                        TASK_UNINTERRUPTIBLE);
                        spin_unlock_irq(&cwq->lock);
                        schedule();
                        spin_lock_irq(&cwq->lock);
                }
                finish_wait(&cwq->work_done, &wait);
                spin_unlock_irq(&cwq->lock);
        }
}

/*
 * flush_workqueue - ensure that any scheduled work has run to completion.
 *
 * Forces execution of the workqueue and blocks until its completion.
 * This is typically used in driver shutdown handlers.
 *
 * This function will sample each cpu workqueue's current insert_sequence
 * number and will sleep until its remove_sequence is greater than or equal
 * to that.  This means that we sleep until all works which were queued on
 * entry have been handled, but we are not livelocked by new incoming ones.
 *
 * This function used to run the workqueues itself.  Now we just wait for the
 * helper threads to do it.
 */
void fastcall flush_workqueue(struct workqueue_struct *wq)
{
        might_sleep();

        if (is_single_threaded(wq)) {
                /* Always use first cpu's area. */
                flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));
        } else {
                int cpu;

                lock_cpu_hotplug();
                for_each_online_cpu(cpu)
                        flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
                unlock_cpu_hotplug();
        }
}
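/*
 * Illustrative sketch (not part of this file): the driver-shutdown use of
 * flush_workqueue() described above.  Flushing only waits for work that is
 * already queued; pending delayed work must be cancelled first or its timer
 * can requeue it afterwards.  flush_wq and flush_dwork are hypothetical.
 */
#if 0
static struct workqueue_struct *flush_wq;       /* created elsewhere */
static struct work_struct flush_dwork;          /* queued via queue_delayed_work() */

static void example_shutdown(void)
{
        /* Stop the timer-driven submission first, if one is pending. */
        cancel_delayed_work(&flush_dwork);

        /* Now wait until every work queued so far has been executed. */
        flush_workqueue(flush_wq);
}
#endif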
static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,
                                                   int cpu)
{
        struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
        struct task_struct *p;

        spin_lock_init(&cwq->lock);
        cwq->wq = wq;
        cwq->thread = NULL;
        cwq->insert_sequence = 0;
        cwq->remove_sequence = 0;
        INIT_LIST_HEAD(&cwq->worklist);
        init_waitqueue_head(&cwq->more_work);
        init_waitqueue_head(&cwq->work_done);

        if (is_single_threaded(wq))
                p = kthread_create(worker_thread, cwq, "%s", wq->name);
        else
                p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
        if (IS_ERR(p))
                return NULL;
        cwq->thread = p;
        return p;
}

struct workqueue_struct *__create_workqueue(const char *name,
                                            int singlethread)
{
        int cpu, destroy = 0;
        struct workqueue_struct *wq;
        struct task_struct *p;

        wq = kzalloc(sizeof(*wq), GFP_KERNEL);
        if (!wq)
                return NULL;

        wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
        if (!wq->cpu_wq) {
                kfree(wq);
                return NULL;
        }

        wq->name = name;
        /* We don't need the distraction of CPUs appearing and vanishing. */
        lock_cpu_hotplug();
        if (singlethread) {
                INIT_LIST_HEAD(&wq->list);
                p = create_workqueue_thread(wq, singlethread_cpu);
                if (!p)
                        destroy = 1;
                else
                        wake_up_process(p);
        } else {
                spin_lock(&workqueue_lock);
                list_add(&wq->list, &workqueues);
                spin_unlock(&workqueue_lock);
                for_each_online_cpu(cpu) {
                        p = create_workqueue_thread(wq, cpu);
                        if (p) {
                                kthread_bind(p, cpu);
                                wake_up_process(p);
                        } else
                                destroy = 1;
                }
        }
        unlock_cpu_hotplug();

        /*
         * Was there any error during startup? If yes then clean up:
         */
        if (destroy) {
                destroy_workqueue(wq);
                wq = NULL;
        }
        return wq;
}

static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu)
{
        struct cpu_workqueue_struct *cwq;
        unsigned long flags;
        struct task_struct *p;

        cwq = per_cpu_ptr(wq->cpu_wq, cpu);
        spin_lock_irqsave(&cwq->lock, flags);
        p = cwq->thread;
        cwq->thread = NULL;
        spin_unlock_irqrestore(&cwq->lock, flags);
        if (p)
                kthread_stop(p);
}

void destroy_workqueue(struct workqueue_struct *wq)
{
        int cpu;

        flush_workqueue(wq);

        /* We don't need the distraction of CPUs appearing and vanishing. */
        lock_cpu_hotplug();
        if (is_single_threaded(wq))
                cleanup_workqueue_thread(wq, singlethread_cpu);
        else {
                for_each_online_cpu(cpu)
                        cleanup_workqueue_thread(wq, cpu);
                spin_lock(&workqueue_lock);
                list_del(&wq->list);
                spin_unlock(&workqueue_lock);
        }
        unlock_cpu_hotplug();
        free_percpu(wq->cpu_wq);
        kfree(wq);
}
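/*
 * Illustrative sketch (not part of this file): the usual lifecycle around
 * __create_workqueue()/destroy_workqueue(), via the create_workqueue() and
 * create_singlethread_workqueue() wrappers declared in <linux/workqueue.h>.
 * lifecycle_wq and the init/exit names are hypothetical.
 */
#if 0
static struct workqueue_struct *lifecycle_wq;

static int __init lifecycle_init(void)
{
        /*
         * One worker thread per online CPU; use
         * create_singlethread_workqueue("lifecycle") for a single thread.
         */
        lifecycle_wq = create_workqueue("lifecycle");
        if (!lifecycle_wq)
                return -ENOMEM;
        return 0;
}

static void __exit lifecycle_exit(void)
{
        /* destroy_workqueue() flushes the queue, then stops its threads. */
        destroy_workqueue(lifecycle_wq);
}
#endif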
static struct workqueue_struct *keventd_wq;

int fastcall schedule_work(struct work_struct *work)
{
        return queue_work(keventd_wq, work);
}

int fastcall schedule_delayed_work(struct work_struct *work, unsigned long delay)
{
        return queue_delayed_work(keventd_wq, work, delay);
}

int schedule_delayed_work_on(int cpu,
                        struct work_struct *work, unsigned long delay)
{
        int ret = 0;
        struct timer_list *timer = &work->timer;

        if (!test_and_set_bit(0, &work->pending)) {
                BUG_ON(timer_pending(timer));
                BUG_ON(!list_empty(&work->entry));
                /* This stores keventd_wq for the moment, for the timer_fn */
                work->wq_data = keventd_wq;
                timer->expires = jiffies + delay;
                timer->data = (unsigned long)work;
                timer->function = delayed_work_timer_fn;
                add_timer_on(timer, cpu);
                ret = 1;
        }
        return ret;
}

int schedule_on_each_cpu(void (*func)(void *info), void *info)
{
        int cpu;
        struct work_struct *work;

        work = kmalloc(NR_CPUS * sizeof(struct work_struct), GFP_KERNEL);

        if (!work)
                return -ENOMEM;
        for_each_online_cpu(cpu) {
                INIT_WORK(work + cpu, func, info);
                __queue_work(per_cpu_ptr(keventd_wq->cpu_wq, cpu),
                                work + cpu);
        }
        flush_workqueue(keventd_wq);
        kfree(work);
        return 0;
}

void flush_scheduled_work(void)
{
        flush_workqueue(keventd_wq);
}

/**
 * cancel_rearming_delayed_workqueue - reliably kill off a delayed
 *                      work whose handler rearms the delayed work.
 * @wq:   the controlling workqueue structure
 * @work: the delayed work struct
 */
void cancel_rearming_delayed_workqueue(struct workqueue_struct *wq,
                                       struct work_struct *work)
{
        while (!cancel_delayed_work(work))
                flush_workqueue(wq);
}
EXPORT_SYMBOL(cancel_rearming_delayed_workqueue);

/**
 * cancel_rearming_delayed_work - reliably kill off a delayed keventd
 *                      work whose handler rearms the delayed work.
 * @work: the delayed work struct
 */
void cancel_rearming_delayed_work(struct work_struct *work)
{
        cancel_rearming_delayed_workqueue(keventd_wq, work);
}
EXPORT_SYMBOL(cancel_rearming_delayed_work);

int keventd_up(void)
{
        return keventd_wq != NULL;
}

int current_is_keventd(void)
{
        struct cpu_workqueue_struct *cwq;
        int cpu = smp_processor_id();   /* preempt-safe: keventd is per-cpu */
        int ret = 0;

        BUG_ON(!keventd_wq);

        cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu);
        if (current == cwq->thread)
                ret = 1;

        return ret;
}
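/*
 * Illustrative sketch (not part of this file): the self-rearming pattern
 * that cancel_rearming_delayed_work() above is meant to tear down safely.
 * rearm_work, rearm_fn and rearm_stop are hypothetical.
 */
#if 0
static struct work_struct rearm_work;   /* INIT_WORK'ed with rearm_fn elsewhere */

static void rearm_fn(void *data)
{
        /* ... periodic work ... */

        /* Rearm ourselves on keventd, one second from now. */
        schedule_delayed_work(&rearm_work, HZ);
}

static void rearm_stop(void)
{
        /*
         * A plain cancel_delayed_work() can race with the handler rearming
         * itself; this helper loops cancel+flush until the work is gone.
         */
        cancel_rearming_delayed_work(&rearm_work);
}
#endif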
#ifdef CONFIG_HOTPLUG_CPU
/* Take the work from this (downed) CPU. */
static void take_over_work(struct workqueue_struct *wq, unsigned int cpu)
{
        struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
        LIST_HEAD(list);
        struct work_struct *work;

        spin_lock_irq(&cwq->lock);
        list_splice_init(&cwq->worklist, &list);

        while (!list_empty(&list)) {
                printk("Taking work for %s\n", wq->name);
                work = list_entry(list.next, struct work_struct, entry);
                list_del(&work->entry);
                __queue_work(per_cpu_ptr(wq->cpu_wq, smp_processor_id()), work);
        }
        spin_unlock_irq(&cwq->lock);
}

/* We're holding the cpucontrol mutex here */
static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
                                            unsigned long action,
                                            void *hcpu)
{
        unsigned int hotcpu = (unsigned long)hcpu;
        struct workqueue_struct *wq;

        switch (action) {
        case CPU_UP_PREPARE:
                /* Create a new workqueue thread for it. */
                list_for_each_entry(wq, &workqueues, list) {
                        if (!create_workqueue_thread(wq, hotcpu)) {
                                printk("workqueue for %i failed\n", hotcpu);
                                return NOTIFY_BAD;
                        }
                }
                break;

        case CPU_ONLINE:
                /* Kick off worker threads. */
                list_for_each_entry(wq, &workqueues, list) {
                        struct cpu_workqueue_struct *cwq;

                        cwq = per_cpu_ptr(wq->cpu_wq, hotcpu);
                        kthread_bind(cwq->thread, hotcpu);
                        wake_up_process(cwq->thread);
                }
                break;

        case CPU_UP_CANCELED:
                list_for_each_entry(wq, &workqueues, list) {
                        /* Unbind so it can run. */
                        kthread_bind(per_cpu_ptr(wq->cpu_wq, hotcpu)->thread,
                                     any_online_cpu(cpu_online_map));
                        cleanup_workqueue_thread(wq, hotcpu);
                }
                break;

        case CPU_DEAD:
                list_for_each_entry(wq, &workqueues, list)
                        cleanup_workqueue_thread(wq, hotcpu);
                list_for_each_entry(wq, &workqueues, list)
                        take_over_work(wq, hotcpu);
                break;
        }

        return NOTIFY_OK;
}
#endif

void init_workqueues(void)
{
        singlethread_cpu = first_cpu(cpu_possible_map);
        hotcpu_notifier(workqueue_cpu_callback, 0);
        keventd_wq = create_workqueue("events");
        BUG_ON(!keventd_wq);
}

EXPORT_SYMBOL_GPL(__create_workqueue);
EXPORT_SYMBOL_GPL(queue_work);
EXPORT_SYMBOL_GPL(queue_delayed_work);
EXPORT_SYMBOL_GPL(flush_workqueue);
EXPORT_SYMBOL_GPL(destroy_workqueue);

EXPORT_SYMBOL(schedule_work);
EXPORT_SYMBOL(schedule_delayed_work);
EXPORT_SYMBOL(schedule_delayed_work_on);
EXPORT_SYMBOL(flush_scheduled_work);
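/*
 * Illustrative sketch (not part of this file): using the exported keventd
 * interface, here schedule_on_each_cpu() defined above, which runs a
 * function on every online CPU via the "events" threads and returns only
 * once all of them have completed.  count_something, percpu_counter and
 * run_on_all_cpus are hypothetical.
 */
#if 0
static atomic_t percpu_counter = ATOMIC_INIT(0);

/* Runs once on each online CPU, in that CPU's events/N thread. */
static void count_something(void *info)
{
        atomic_inc(&percpu_counter);
}

static int run_on_all_cpus(void)
{
        /* Returns 0 on success, -ENOMEM if the work array allocation failed. */
        return schedule_on_each_cpu(count_something, NULL);
}
#endif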