1 /* 2 * linux/kernel/workqueue.c 3 * 4 * Generic mechanism for defining kernel helper threads for running 5 * arbitrary tasks in process context. 6 * 7 * Started by Ingo Molnar, Copyright (C) 2002 8 * 9 * Derived from the taskqueue/keventd code by: 10 * 11 * David Woodhouse <dwmw2@infradead.org> 12 * Andrew Morton <andrewm@uow.edu.au> 13 * Kai Petzke <wpp@marie.physik.tu-berlin.de> 14 * Theodore Ts'o <tytso@mit.edu> 15 * 16 * Made to use alloc_percpu by Christoph Lameter <clameter@sgi.com>. 17 */ 18 19 #include <linux/module.h> 20 #include <linux/kernel.h> 21 #include <linux/sched.h> 22 #include <linux/init.h> 23 #include <linux/signal.h> 24 #include <linux/completion.h> 25 #include <linux/workqueue.h> 26 #include <linux/slab.h> 27 #include <linux/cpu.h> 28 #include <linux/notifier.h> 29 #include <linux/kthread.h> 30 #include <linux/hardirq.h> 31 32 /* 33 * The per-CPU workqueue (if single thread, we always use the first 34 * possible cpu). 35 * 36 * The sequence counters are for flush_scheduled_work(). It wants to wait 37 * until all currently-scheduled works are completed, but it doesn't 38 * want to be livelocked by new, incoming ones. So it waits until 39 * remove_sequence is >= the insert_sequence which pertained when 40 * flush_scheduled_work() was called. 
41 */ 42 struct cpu_workqueue_struct { 43 44 spinlock_t lock; 45 46 long remove_sequence; /* Least-recently added (next to run) */ 47 long insert_sequence; /* Next to add */ 48 49 struct list_head worklist; 50 wait_queue_head_t more_work; 51 wait_queue_head_t work_done; 52 53 struct workqueue_struct *wq; 54 struct task_struct *thread; 55 56 int run_depth; /* Detect run_workqueue() recursion depth */ 57 } ____cacheline_aligned; 58 59 /* 60 * The externally visible workqueue abstraction is an array of 61 * per-CPU workqueues: 62 */ 63 struct workqueue_struct { 64 struct cpu_workqueue_struct *cpu_wq; 65 const char *name; 66 struct list_head list; /* Empty if single thread */ 67 }; 68 69 /* All the per-cpu workqueues on the system, for hotplug cpu to add/remove 70 threads to each one as cpus come/go. */ 71 static DEFINE_MUTEX(workqueue_mutex); 72 static LIST_HEAD(workqueues); 73 74 static int singlethread_cpu; 75 76 /* If it's single threaded, it isn't in the list of workqueues. */ 77 static inline int is_single_threaded(struct workqueue_struct *wq) 78 { 79 return list_empty(&wq->list); 80 } 81 82 /* Preempt must be disabled. */ 83 static void __queue_work(struct cpu_workqueue_struct *cwq, 84 struct work_struct *work) 85 { 86 unsigned long flags; 87 88 spin_lock_irqsave(&cwq->lock, flags); 89 work->wq_data = cwq; 90 list_add_tail(&work->entry, &cwq->worklist); 91 cwq->insert_sequence++; 92 wake_up(&cwq->more_work); 93 spin_unlock_irqrestore(&cwq->lock, flags); 94 } 95 96 /** 97 * queue_work - queue work on a workqueue 98 * @wq: workqueue to use 99 * @work: work to queue 100 * 101 * Returns non-zero if it was successfully added. 102 * 103 * We queue the work to the CPU it was submitted, but there is no 104 * guarantee that it will be processed by that CPU. 
105 */ 106 int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work) 107 { 108 int ret = 0, cpu = get_cpu(); 109 110 if (!test_and_set_bit(0, &work->pending)) { 111 if (unlikely(is_single_threaded(wq))) 112 cpu = singlethread_cpu; 113 BUG_ON(!list_empty(&work->entry)); 114 __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work); 115 ret = 1; 116 } 117 put_cpu(); 118 return ret; 119 } 120 EXPORT_SYMBOL_GPL(queue_work); 121 122 static void delayed_work_timer_fn(unsigned long __data) 123 { 124 struct work_struct *work = (struct work_struct *)__data; 125 struct workqueue_struct *wq = work->wq_data; 126 int cpu = smp_processor_id(); 127 128 if (unlikely(is_single_threaded(wq))) 129 cpu = singlethread_cpu; 130 131 __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work); 132 } 133 134 /** 135 * queue_delayed_work - queue work on a workqueue after delay 136 * @wq: workqueue to use 137 * @work: work to queue 138 * @delay: number of jiffies to wait before queueing 139 * 140 * Returns non-zero if it was successfully added. 141 */ 142 int fastcall queue_delayed_work(struct workqueue_struct *wq, 143 struct work_struct *work, unsigned long delay) 144 { 145 int ret = 0; 146 struct timer_list *timer = &work->timer; 147 148 if (!test_and_set_bit(0, &work->pending)) { 149 BUG_ON(timer_pending(timer)); 150 BUG_ON(!list_empty(&work->entry)); 151 152 /* This stores wq for the moment, for the timer_fn */ 153 work->wq_data = wq; 154 timer->expires = jiffies + delay; 155 timer->data = (unsigned long)work; 156 timer->function = delayed_work_timer_fn; 157 add_timer(timer); 158 ret = 1; 159 } 160 return ret; 161 } 162 EXPORT_SYMBOL_GPL(queue_delayed_work); 163 164 /** 165 * queue_delayed_work_on - queue work on specific CPU after delay 166 * @cpu: CPU number to execute work on 167 * @wq: workqueue to use 168 * @work: work to queue 169 * @delay: number of jiffies to wait before queueing 170 * 171 * Returns non-zero if it was successfully added. 
172 */ 173 int queue_delayed_work_on(int cpu, struct workqueue_struct *wq, 174 struct work_struct *work, unsigned long delay) 175 { 176 int ret = 0; 177 struct timer_list *timer = &work->timer; 178 179 if (!test_and_set_bit(0, &work->pending)) { 180 BUG_ON(timer_pending(timer)); 181 BUG_ON(!list_empty(&work->entry)); 182 183 /* This stores wq for the moment, for the timer_fn */ 184 work->wq_data = wq; 185 timer->expires = jiffies + delay; 186 timer->data = (unsigned long)work; 187 timer->function = delayed_work_timer_fn; 188 add_timer_on(timer, cpu); 189 ret = 1; 190 } 191 return ret; 192 } 193 EXPORT_SYMBOL_GPL(queue_delayed_work_on); 194 195 static void run_workqueue(struct cpu_workqueue_struct *cwq) 196 { 197 unsigned long flags; 198 199 /* 200 * Keep taking off work from the queue until 201 * done. 202 */ 203 spin_lock_irqsave(&cwq->lock, flags); 204 cwq->run_depth++; 205 if (cwq->run_depth > 3) { 206 /* morton gets to eat his hat */ 207 printk("%s: recursion depth exceeded: %d\n", 208 __FUNCTION__, cwq->run_depth); 209 dump_stack(); 210 } 211 while (!list_empty(&cwq->worklist)) { 212 struct work_struct *work = list_entry(cwq->worklist.next, 213 struct work_struct, entry); 214 void (*f) (void *) = work->func; 215 void *data = work->data; 216 217 list_del_init(cwq->worklist.next); 218 spin_unlock_irqrestore(&cwq->lock, flags); 219 220 BUG_ON(work->wq_data != cwq); 221 clear_bit(0, &work->pending); 222 f(data); 223 224 spin_lock_irqsave(&cwq->lock, flags); 225 cwq->remove_sequence++; 226 wake_up(&cwq->work_done); 227 } 228 cwq->run_depth--; 229 spin_unlock_irqrestore(&cwq->lock, flags); 230 } 231 232 static int worker_thread(void *__cwq) 233 { 234 struct cpu_workqueue_struct *cwq = __cwq; 235 DECLARE_WAITQUEUE(wait, current); 236 struct k_sigaction sa; 237 sigset_t blocked; 238 239 current->flags |= PF_NOFREEZE; 240 241 set_user_nice(current, -5); 242 243 /* Block and flush all signals */ 244 sigfillset(&blocked); 245 sigprocmask(SIG_BLOCK, &blocked, NULL); 246 
flush_signals(current); 247 248 /* SIG_IGN makes children autoreap: see do_notify_parent(). */ 249 sa.sa.sa_handler = SIG_IGN; 250 sa.sa.sa_flags = 0; 251 siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD)); 252 do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0); 253 254 set_current_state(TASK_INTERRUPTIBLE); 255 while (!kthread_should_stop()) { 256 add_wait_queue(&cwq->more_work, &wait); 257 if (list_empty(&cwq->worklist)) 258 schedule(); 259 else 260 __set_current_state(TASK_RUNNING); 261 remove_wait_queue(&cwq->more_work, &wait); 262 263 if (!list_empty(&cwq->worklist)) 264 run_workqueue(cwq); 265 set_current_state(TASK_INTERRUPTIBLE); 266 } 267 __set_current_state(TASK_RUNNING); 268 return 0; 269 } 270 271 static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq) 272 { 273 if (cwq->thread == current) { 274 /* 275 * Probably keventd trying to flush its own queue. So simply run 276 * it by hand rather than deadlocking. 277 */ 278 run_workqueue(cwq); 279 } else { 280 DEFINE_WAIT(wait); 281 long sequence_needed; 282 283 spin_lock_irq(&cwq->lock); 284 sequence_needed = cwq->insert_sequence; 285 286 while (sequence_needed - cwq->remove_sequence > 0) { 287 prepare_to_wait(&cwq->work_done, &wait, 288 TASK_UNINTERRUPTIBLE); 289 spin_unlock_irq(&cwq->lock); 290 schedule(); 291 spin_lock_irq(&cwq->lock); 292 } 293 finish_wait(&cwq->work_done, &wait); 294 spin_unlock_irq(&cwq->lock); 295 } 296 } 297 298 /** 299 * flush_workqueue - ensure that any scheduled work has run to completion. 300 * @wq: workqueue to flush 301 * 302 * Forces execution of the workqueue and blocks until its completion. 303 * This is typically used in driver shutdown handlers. 304 * 305 * This function will sample each workqueue's current insert_sequence number and 306 * will sleep until the head sequence is greater than or equal to that. This 307 * means that we sleep until all works which were queued on entry have been 308 * handled, but we are not livelocked by new incoming ones. 
309 * 310 * This function used to run the workqueues itself. Now we just wait for the 311 * helper threads to do it. 312 */ 313 void fastcall flush_workqueue(struct workqueue_struct *wq) 314 { 315 might_sleep(); 316 317 if (is_single_threaded(wq)) { 318 /* Always use first cpu's area. */ 319 flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu)); 320 } else { 321 int cpu; 322 323 mutex_lock(&workqueue_mutex); 324 for_each_online_cpu(cpu) 325 flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu)); 326 mutex_unlock(&workqueue_mutex); 327 } 328 } 329 EXPORT_SYMBOL_GPL(flush_workqueue); 330 331 static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq, 332 int cpu) 333 { 334 struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu); 335 struct task_struct *p; 336 337 spin_lock_init(&cwq->lock); 338 cwq->wq = wq; 339 cwq->thread = NULL; 340 cwq->insert_sequence = 0; 341 cwq->remove_sequence = 0; 342 INIT_LIST_HEAD(&cwq->worklist); 343 init_waitqueue_head(&cwq->more_work); 344 init_waitqueue_head(&cwq->work_done); 345 346 if (is_single_threaded(wq)) 347 p = kthread_create(worker_thread, cwq, "%s", wq->name); 348 else 349 p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu); 350 if (IS_ERR(p)) 351 return NULL; 352 cwq->thread = p; 353 return p; 354 } 355 356 struct workqueue_struct *__create_workqueue(const char *name, 357 int singlethread) 358 { 359 int cpu, destroy = 0; 360 struct workqueue_struct *wq; 361 struct task_struct *p; 362 363 wq = kzalloc(sizeof(*wq), GFP_KERNEL); 364 if (!wq) 365 return NULL; 366 367 wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct); 368 if (!wq->cpu_wq) { 369 kfree(wq); 370 return NULL; 371 } 372 373 wq->name = name; 374 mutex_lock(&workqueue_mutex); 375 if (singlethread) { 376 INIT_LIST_HEAD(&wq->list); 377 p = create_workqueue_thread(wq, singlethread_cpu); 378 if (!p) 379 destroy = 1; 380 else 381 wake_up_process(p); 382 } else { 383 list_add(&wq->list, &workqueues); 384 
for_each_online_cpu(cpu) { 385 p = create_workqueue_thread(wq, cpu); 386 if (p) { 387 kthread_bind(p, cpu); 388 wake_up_process(p); 389 } else 390 destroy = 1; 391 } 392 } 393 mutex_unlock(&workqueue_mutex); 394 395 /* 396 * Was there any error during startup? If yes then clean up: 397 */ 398 if (destroy) { 399 destroy_workqueue(wq); 400 wq = NULL; 401 } 402 return wq; 403 } 404 EXPORT_SYMBOL_GPL(__create_workqueue); 405 406 static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu) 407 { 408 struct cpu_workqueue_struct *cwq; 409 unsigned long flags; 410 struct task_struct *p; 411 412 cwq = per_cpu_ptr(wq->cpu_wq, cpu); 413 spin_lock_irqsave(&cwq->lock, flags); 414 p = cwq->thread; 415 cwq->thread = NULL; 416 spin_unlock_irqrestore(&cwq->lock, flags); 417 if (p) 418 kthread_stop(p); 419 } 420 421 /** 422 * destroy_workqueue - safely terminate a workqueue 423 * @wq: target workqueue 424 * 425 * Safely destroy a workqueue. All work currently pending will be done first. 426 */ 427 void destroy_workqueue(struct workqueue_struct *wq) 428 { 429 int cpu; 430 431 flush_workqueue(wq); 432 433 /* We don't need the distraction of CPUs appearing and vanishing. */ 434 mutex_lock(&workqueue_mutex); 435 if (is_single_threaded(wq)) 436 cleanup_workqueue_thread(wq, singlethread_cpu); 437 else { 438 for_each_online_cpu(cpu) 439 cleanup_workqueue_thread(wq, cpu); 440 list_del(&wq->list); 441 } 442 mutex_unlock(&workqueue_mutex); 443 free_percpu(wq->cpu_wq); 444 kfree(wq); 445 } 446 EXPORT_SYMBOL_GPL(destroy_workqueue); 447 448 static struct workqueue_struct *keventd_wq; 449 450 /** 451 * schedule_work - put work task in global workqueue 452 * @work: job to be done 453 * 454 * This puts a job in the kernel-global workqueue. 
455 */ 456 int fastcall schedule_work(struct work_struct *work) 457 { 458 return queue_work(keventd_wq, work); 459 } 460 EXPORT_SYMBOL(schedule_work); 461 462 /** 463 * schedule_delayed_work - put work task in global workqueue after delay 464 * @work: job to be done 465 * @delay: number of jiffies to wait 466 * 467 * After waiting for a given time this puts a job in the kernel-global 468 * workqueue. 469 */ 470 int fastcall schedule_delayed_work(struct work_struct *work, unsigned long delay) 471 { 472 return queue_delayed_work(keventd_wq, work, delay); 473 } 474 EXPORT_SYMBOL(schedule_delayed_work); 475 476 /** 477 * schedule_delayed_work_on - queue work in global workqueue on CPU after delay 478 * @cpu: cpu to use 479 * @work: job to be done 480 * @delay: number of jiffies to wait 481 * 482 * After waiting for a given time this puts a job in the kernel-global 483 * workqueue on the specified CPU. 484 */ 485 int schedule_delayed_work_on(int cpu, 486 struct work_struct *work, unsigned long delay) 487 { 488 return queue_delayed_work_on(cpu, keventd_wq, work, delay); 489 } 490 EXPORT_SYMBOL(schedule_delayed_work_on); 491 492 /** 493 * schedule_on_each_cpu - call a function on each online CPU from keventd 494 * @func: the function to call 495 * @info: a pointer to pass to func() 496 * 497 * Returns zero on success. 498 * Returns -ve errno on failure. 499 * 500 * Appears to be racy against CPU hotplug. 501 * 502 * schedule_on_each_cpu() is very slow. 
503 */ 504 int schedule_on_each_cpu(void (*func)(void *info), void *info) 505 { 506 int cpu; 507 struct work_struct *works; 508 509 works = alloc_percpu(struct work_struct); 510 if (!works) 511 return -ENOMEM; 512 513 mutex_lock(&workqueue_mutex); 514 for_each_online_cpu(cpu) { 515 INIT_WORK(per_cpu_ptr(works, cpu), func, info); 516 __queue_work(per_cpu_ptr(keventd_wq->cpu_wq, cpu), 517 per_cpu_ptr(works, cpu)); 518 } 519 mutex_unlock(&workqueue_mutex); 520 flush_workqueue(keventd_wq); 521 free_percpu(works); 522 return 0; 523 } 524 525 void flush_scheduled_work(void) 526 { 527 flush_workqueue(keventd_wq); 528 } 529 EXPORT_SYMBOL(flush_scheduled_work); 530 531 /** 532 * cancel_rearming_delayed_workqueue - reliably kill off a delayed 533 * work whose handler rearms the delayed work. 534 * @wq: the controlling workqueue structure 535 * @work: the delayed work struct 536 */ 537 void cancel_rearming_delayed_workqueue(struct workqueue_struct *wq, 538 struct work_struct *work) 539 { 540 while (!cancel_delayed_work(work)) 541 flush_workqueue(wq); 542 } 543 EXPORT_SYMBOL(cancel_rearming_delayed_workqueue); 544 545 /** 546 * cancel_rearming_delayed_work - reliably kill off a delayed keventd 547 * work whose handler rearms the delayed work. 548 * @work: the delayed work struct 549 */ 550 void cancel_rearming_delayed_work(struct work_struct *work) 551 { 552 cancel_rearming_delayed_workqueue(keventd_wq, work); 553 } 554 EXPORT_SYMBOL(cancel_rearming_delayed_work); 555 556 /** 557 * execute_in_process_context - reliably execute the routine with user context 558 * @fn: the function to execute 559 * @data: data to pass to the function 560 * @ew: guaranteed storage for the execute work structure (must 561 * be available when the work executes) 562 * 563 * Executes the function immediately if process context is available, 564 * otherwise schedules the function for delayed execution. 
565 * 566 * Returns: 0 - function was executed 567 * 1 - function was scheduled for execution 568 */ 569 int execute_in_process_context(void (*fn)(void *data), void *data, 570 struct execute_work *ew) 571 { 572 if (!in_interrupt()) { 573 fn(data); 574 return 0; 575 } 576 577 INIT_WORK(&ew->work, fn, data); 578 schedule_work(&ew->work); 579 580 return 1; 581 } 582 EXPORT_SYMBOL_GPL(execute_in_process_context); 583 584 int keventd_up(void) 585 { 586 return keventd_wq != NULL; 587 } 588 589 int current_is_keventd(void) 590 { 591 struct cpu_workqueue_struct *cwq; 592 int cpu = smp_processor_id(); /* preempt-safe: keventd is per-cpu */ 593 int ret = 0; 594 595 BUG_ON(!keventd_wq); 596 597 cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu); 598 if (current == cwq->thread) 599 ret = 1; 600 601 return ret; 602 603 } 604 605 #ifdef CONFIG_HOTPLUG_CPU 606 /* Take the work from this (downed) CPU. */ 607 static void take_over_work(struct workqueue_struct *wq, unsigned int cpu) 608 { 609 struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu); 610 struct list_head list; 611 struct work_struct *work; 612 613 spin_lock_irq(&cwq->lock); 614 list_replace_init(&cwq->worklist, &list); 615 616 while (!list_empty(&list)) { 617 printk("Taking work for %s\n", wq->name); 618 work = list_entry(list.next,struct work_struct,entry); 619 list_del(&work->entry); 620 __queue_work(per_cpu_ptr(wq->cpu_wq, smp_processor_id()), work); 621 } 622 spin_unlock_irq(&cwq->lock); 623 } 624 625 /* We're holding the cpucontrol mutex here */ 626 static int __devinit workqueue_cpu_callback(struct notifier_block *nfb, 627 unsigned long action, 628 void *hcpu) 629 { 630 unsigned int hotcpu = (unsigned long)hcpu; 631 struct workqueue_struct *wq; 632 633 switch (action) { 634 case CPU_UP_PREPARE: 635 mutex_lock(&workqueue_mutex); 636 /* Create a new workqueue thread for it. 
*/ 637 list_for_each_entry(wq, &workqueues, list) { 638 if (!create_workqueue_thread(wq, hotcpu)) { 639 printk("workqueue for %i failed\n", hotcpu); 640 return NOTIFY_BAD; 641 } 642 } 643 break; 644 645 case CPU_ONLINE: 646 /* Kick off worker threads. */ 647 list_for_each_entry(wq, &workqueues, list) { 648 struct cpu_workqueue_struct *cwq; 649 650 cwq = per_cpu_ptr(wq->cpu_wq, hotcpu); 651 kthread_bind(cwq->thread, hotcpu); 652 wake_up_process(cwq->thread); 653 } 654 mutex_unlock(&workqueue_mutex); 655 break; 656 657 case CPU_UP_CANCELED: 658 list_for_each_entry(wq, &workqueues, list) { 659 if (!per_cpu_ptr(wq->cpu_wq, hotcpu)->thread) 660 continue; 661 /* Unbind so it can run. */ 662 kthread_bind(per_cpu_ptr(wq->cpu_wq, hotcpu)->thread, 663 any_online_cpu(cpu_online_map)); 664 cleanup_workqueue_thread(wq, hotcpu); 665 } 666 mutex_unlock(&workqueue_mutex); 667 break; 668 669 case CPU_DOWN_PREPARE: 670 mutex_lock(&workqueue_mutex); 671 break; 672 673 case CPU_DOWN_FAILED: 674 mutex_unlock(&workqueue_mutex); 675 break; 676 677 case CPU_DEAD: 678 list_for_each_entry(wq, &workqueues, list) 679 cleanup_workqueue_thread(wq, hotcpu); 680 list_for_each_entry(wq, &workqueues, list) 681 take_over_work(wq, hotcpu); 682 mutex_unlock(&workqueue_mutex); 683 break; 684 } 685 686 return NOTIFY_OK; 687 } 688 #endif 689 690 void init_workqueues(void) 691 { 692 singlethread_cpu = first_cpu(cpu_possible_map); 693 hotcpu_notifier(workqueue_cpu_callback, 0); 694 keventd_wq = create_workqueue("events"); 695 BUG_ON(!keventd_wq); 696 } 697 698