/*
 * linux/kernel/workqueue.c
 *
 * Generic mechanism for defining kernel helper threads for running
 * arbitrary tasks in process context.
 *
 * Started by Ingo Molnar, Copyright (C) 2002
 *
 * Derived from the taskqueue/keventd code by:
 *
 *   David Woodhouse <dwmw2@infradead.org>
 *   Andrew Morton
 *   Kai Petzke <wpp@marie.physik.tu-berlin.de>
 *   Theodore Ts'o <tytso@mit.edu>
 *
 * Made to use alloc_percpu by Christoph Lameter.
 */

#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/init.h>
#include <linux/signal.h>
#include <linux/completion.h>
#include <linux/workqueue.h>
#include <linux/slab.h>
#include <linux/cpu.h>
#include <linux/notifier.h>
#include <linux/kthread.h>
#include <linux/hardirq.h>
#include <linux/mempolicy.h>
#include <linux/freezer.h>
#include <linux/kallsyms.h>
#include <linux/debug_locks.h>
#include <linux/lockdep.h>
#define CREATE_TRACE_POINTS
#include <trace/events/workqueue.h>

/*
 * The per-CPU workqueue (if single thread, we always use the first
 * possible cpu).
 */
struct cpu_workqueue_struct {

	spinlock_t lock;

	struct list_head worklist;
	wait_queue_head_t more_work;
	struct work_struct *current_work;

	struct workqueue_struct *wq;
	struct task_struct *thread;
} ____cacheline_aligned;

/*
 * The externally visible workqueue abstraction is an array of
 * per-CPU workqueues:
 */
struct workqueue_struct {
	struct cpu_workqueue_struct *cpu_wq;
	struct list_head list;
	const char *name;
	int singlethread;
	int freezeable;		/* Freeze threads during suspend */
	int rt;
#ifdef CONFIG_LOCKDEP
	struct lockdep_map lockdep_map;
#endif
};

#ifdef CONFIG_DEBUG_OBJECTS_WORK

static struct debug_obj_descr work_debug_descr;

/*
 * fixup_init is called when:
 * - an active object is initialized
 */
static int work_fixup_init(void *addr, enum debug_obj_state state)
{
	struct work_struct *work = addr;

	switch (state) {
	case ODEBUG_STATE_ACTIVE:
		cancel_work_sync(work);
		debug_object_init(work, &work_debug_descr);
		return 1;
	default:
		return 0;
	}
}

/*
 * fixup_activate is called when:
 * - an active object is activated
 * - an unknown object is activated (might be a statically initialized object)
 */
static int work_fixup_activate(void *addr, enum debug_obj_state state)
{
	struct work_struct *work = addr;

	switch (state) {

	case ODEBUG_STATE_NOTAVAILABLE:
		/*
		 * This is not really a fixup. The work struct was
		 * statically initialized. We just make sure that it
		 * is tracked in the object tracker.
		 */
		if (test_bit(WORK_STRUCT_STATIC, work_data_bits(work))) {
			debug_object_init(work, &work_debug_descr);
			debug_object_activate(work, &work_debug_descr);
			return 0;
		}
		WARN_ON_ONCE(1);
		return 0;

	case ODEBUG_STATE_ACTIVE:
		WARN_ON(1);

	default:
		return 0;
	}
}

/*
 * fixup_free is called when:
 * - an active object is freed
 */
static int work_fixup_free(void *addr, enum debug_obj_state state)
{
	struct work_struct *work = addr;

	switch (state) {
	case ODEBUG_STATE_ACTIVE:
		cancel_work_sync(work);
		debug_object_free(work, &work_debug_descr);
		return 1;
	default:
		return 0;
	}
}

static struct debug_obj_descr work_debug_descr = {
	.name		= "work_struct",
	.fixup_init	= work_fixup_init,
	.fixup_activate	= work_fixup_activate,
	.fixup_free	= work_fixup_free,
};

static inline void debug_work_activate(struct work_struct *work)
{
	debug_object_activate(work, &work_debug_descr);
}

static inline void debug_work_deactivate(struct work_struct *work)
{
	debug_object_deactivate(work, &work_debug_descr);
}

void __init_work(struct work_struct *work, int onstack)
{
	if (onstack)
		debug_object_init_on_stack(work, &work_debug_descr);
	else
		debug_object_init(work, &work_debug_descr);
}
EXPORT_SYMBOL_GPL(__init_work);

void destroy_work_on_stack(struct work_struct *work)
{
	debug_object_free(work, &work_debug_descr);
}
EXPORT_SYMBOL_GPL(destroy_work_on_stack);

#else
static inline void debug_work_activate(struct work_struct *work) { }
static inline void debug_work_deactivate(struct work_struct *work) { }
#endif

/* Serializes the accesses to the list of workqueues. */
static DEFINE_SPINLOCK(workqueue_lock);
static LIST_HEAD(workqueues);

static int singlethread_cpu __read_mostly;
static const struct cpumask *cpu_singlethread_map __read_mostly;
/*
 * _cpu_down() first removes CPU from cpu_online_map, then CPU_DEAD
 * flushes cwq->worklist. This means that flush_workqueue/wait_on_work
 * which comes in between can't use for_each_online_cpu(). We could
 * use cpu_possible_map, the cpumask below is more a documentation
 * than optimization.
 */
static cpumask_var_t cpu_populated_map __read_mostly;

/* If it's single threaded, it isn't in the list of workqueues. */
static inline int is_wq_single_threaded(struct workqueue_struct *wq)
{
	return wq->singlethread;
}

static const struct cpumask *wq_cpu_map(struct workqueue_struct *wq)
{
	return is_wq_single_threaded(wq)
		? cpu_singlethread_map : cpu_populated_map;
}

static
struct cpu_workqueue_struct *wq_per_cpu(struct workqueue_struct *wq, int cpu)
{
	if (unlikely(is_wq_single_threaded(wq)))
		cpu = singlethread_cpu;
	return per_cpu_ptr(wq->cpu_wq, cpu);
}

/*
 * Set the workqueue on which a work item is to be run
 * - Must *only* be called if the pending flag is set
 */
static inline void set_wq_data(struct work_struct *work,
				struct cpu_workqueue_struct *cwq)
{
	unsigned long new;

	BUG_ON(!work_pending(work));

	new = (unsigned long) cwq | (1UL << WORK_STRUCT_PENDING);
	new |= WORK_STRUCT_FLAG_MASK & *work_data_bits(work);
	atomic_long_set(&work->data, new);
}

static inline
struct cpu_workqueue_struct *get_wq_data(struct work_struct *work)
{
	return (void *) (atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK);
}

static void insert_work(struct cpu_workqueue_struct *cwq,
			struct work_struct *work, struct list_head *head)
{
	trace_workqueue_insertion(cwq->thread, work);

	set_wq_data(work, cwq);
	/*
	 * Ensure that we get the right work->data if we see the
	 * result of list_add() below, see try_to_grab_pending().
	 */
	smp_wmb();
	list_add_tail(&work->entry, head);
	wake_up(&cwq->more_work);
}

static void __queue_work(struct cpu_workqueue_struct *cwq,
			 struct work_struct *work)
{
	unsigned long flags;

	debug_work_activate(work);
	spin_lock_irqsave(&cwq->lock, flags);
	insert_work(cwq, work, &cwq->worklist);
	spin_unlock_irqrestore(&cwq->lock, flags);
}

/**
 * queue_work - queue work on a workqueue
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 *
 * We queue the work to the CPU on which it was submitted, but if the CPU dies
 * it can be processed by another CPU.
 */
int queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
	int ret;

	ret = queue_work_on(get_cpu(), wq, work);
	put_cpu();

	return ret;
}
EXPORT_SYMBOL_GPL(queue_work);

/**
 * queue_work_on - queue work on specific cpu
 * @cpu: CPU number to execute work on
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 *
 * We queue the work to a specific CPU, the caller must ensure it
 * can't go away.
 */
int
queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
{
	int ret = 0;

	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
		BUG_ON(!list_empty(&work->entry));
		__queue_work(wq_per_cpu(wq, cpu), work);
		ret = 1;
	}
	return ret;
}
EXPORT_SYMBOL_GPL(queue_work_on);

static void delayed_work_timer_fn(unsigned long __data)
{
	struct delayed_work *dwork = (struct delayed_work *)__data;
	struct cpu_workqueue_struct *cwq = get_wq_data(&dwork->work);
	struct workqueue_struct *wq = cwq->wq;

	__queue_work(wq_per_cpu(wq, smp_processor_id()), &dwork->work);
}

/**
 * queue_delayed_work - queue work on a workqueue after delay
 * @wq: workqueue to use
 * @dwork: delayable work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @dwork was already on a queue, non-zero otherwise.
 */
int queue_delayed_work(struct workqueue_struct *wq,
			struct delayed_work *dwork, unsigned long delay)
{
	if (delay == 0)
		return queue_work(wq, &dwork->work);

	return queue_delayed_work_on(-1, wq, dwork, delay);
}
EXPORT_SYMBOL_GPL(queue_delayed_work);
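
/*
 * Illustrative usage sketch (not part of this file; my_dev, my_wq,
 * my_work_fn and my_dwork_fn are hypothetical caller-side names):
 *
 *	static void my_work_fn(struct work_struct *work)
 *	{
 *		struct my_dev *dev = container_of(work, struct my_dev, work);
 *		// runs later in process context, in a workqueue thread
 *	}
 *
 *	INIT_WORK(&dev->work, my_work_fn);
 *	queue_work(my_wq, &dev->work);		// returns 0 if already pending
 *
 *	INIT_DELAYED_WORK(&dev->dwork, my_dwork_fn);
 *	queue_delayed_work(my_wq, &dev->dwork, HZ);	// run roughly 1s from now
 */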

/**
 * queue_delayed_work_on - queue work on specific CPU after delay
 * @cpu: CPU number to execute work on
 * @wq: workqueue to use
 * @dwork: work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @dwork was already on a queue, non-zero otherwise.
 */
int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
			struct delayed_work *dwork, unsigned long delay)
{
	int ret = 0;
	struct timer_list *timer = &dwork->timer;
	struct work_struct *work = &dwork->work;

	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
		BUG_ON(timer_pending(timer));
		BUG_ON(!list_empty(&work->entry));

		timer_stats_timer_set_start_info(&dwork->timer);

		/* This stores cwq for the moment, for the timer_fn */
		set_wq_data(work, wq_per_cpu(wq, raw_smp_processor_id()));
		timer->expires = jiffies + delay;
		timer->data = (unsigned long)dwork;
		timer->function = delayed_work_timer_fn;

		if (unlikely(cpu >= 0))
			add_timer_on(timer, cpu);
		else
			add_timer(timer);
		ret = 1;
	}
	return ret;
}
EXPORT_SYMBOL_GPL(queue_delayed_work_on);
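
/*
 * Illustrative sketch (hypothetical names): pinning delayed work to a CPU.
 * The timer is armed with add_timer_on(), so delayed_work_timer_fn() and the
 * eventual work->func() run on @cpu; the caller must keep that CPU usable:
 *
 *	queue_delayed_work_on(cpu, my_wq, &dev->dwork, msecs_to_jiffies(100));
 */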

static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
	spin_lock_irq(&cwq->lock);
	while (!list_empty(&cwq->worklist)) {
		struct work_struct *work = list_entry(cwq->worklist.next,
						struct work_struct, entry);
		work_func_t f = work->func;
#ifdef CONFIG_LOCKDEP
		/*
		 * It is permissible to free the struct work_struct
		 * from inside the function that is called from it,
		 * this we need to take into account for lockdep too.
		 * To avoid bogus "held lock freed" warnings as well
		 * as problems when looking into work->lockdep_map,
		 * make a copy and use that here.
		 */
		struct lockdep_map lockdep_map = work->lockdep_map;
#endif
		trace_workqueue_execution(cwq->thread, work);
		debug_work_deactivate(work);
		cwq->current_work = work;
		list_del_init(cwq->worklist.next);
		spin_unlock_irq(&cwq->lock);

		BUG_ON(get_wq_data(work) != cwq);
		work_clear_pending(work);
		lock_map_acquire(&cwq->wq->lockdep_map);
		lock_map_acquire(&lockdep_map);
		f(work);
		lock_map_release(&lockdep_map);
		lock_map_release(&cwq->wq->lockdep_map);

		if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
			printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
					"%s/0x%08x/%d\n",
					current->comm, preempt_count(),
					task_pid_nr(current));
			printk(KERN_ERR "    last function: ");
			print_symbol("%s\n", (unsigned long)f);
			debug_show_held_locks(current);
			dump_stack();
		}

		spin_lock_irq(&cwq->lock);
		cwq->current_work = NULL;
	}
	spin_unlock_irq(&cwq->lock);
}

static int worker_thread(void *__cwq)
{
	struct cpu_workqueue_struct *cwq = __cwq;
	DEFINE_WAIT(wait);

	if (cwq->wq->freezeable)
		set_freezable();

	for (;;) {
		prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
		if (!freezing(current) &&
		    !kthread_should_stop() &&
		    list_empty(&cwq->worklist))
			schedule();
		finish_wait(&cwq->more_work, &wait);

		try_to_freeze();

		if (kthread_should_stop())
			break;

		run_workqueue(cwq);
	}

	return 0;
}

struct wq_barrier {
	struct work_struct	work;
	struct completion	done;
};

static void wq_barrier_func(struct work_struct *work)
{
	struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
	complete(&barr->done);
}

static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
			struct wq_barrier *barr, struct list_head *head)
{
	/*
	 * debugobject calls are safe here even with cwq->lock locked
	 * as we know for sure that this will not trigger any of the
	 * checks and call back into the fixup functions where we
	 * might deadlock.
	 */
	INIT_WORK_ON_STACK(&barr->work, wq_barrier_func);
	__set_bit(WORK_STRUCT_PENDING, work_data_bits(&barr->work));

	init_completion(&barr->done);

	debug_work_activate(&barr->work);
	insert_work(cwq, &barr->work, head);
}

static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
{
	int active = 0;
	struct wq_barrier barr;

	WARN_ON(cwq->thread == current);

	spin_lock_irq(&cwq->lock);
	if (!list_empty(&cwq->worklist) || cwq->current_work != NULL) {
		insert_wq_barrier(cwq, &barr, &cwq->worklist);
		active = 1;
	}
	spin_unlock_irq(&cwq->lock);

	if (active) {
		wait_for_completion(&barr.done);
		destroy_work_on_stack(&barr.work);
	}

	return active;
}

/**
 * flush_workqueue - ensure that any scheduled work has run to completion.
 * @wq: workqueue to flush
 *
 * Forces execution of the workqueue and blocks until its completion.
 * This is typically used in driver shutdown handlers.
 *
 * We sleep until all works which were queued on entry have been handled,
 * but we are not livelocked by new incoming ones.
 *
 * This function used to run the workqueues itself. Now we just wait for the
 * helper threads to do it.
 */
void flush_workqueue(struct workqueue_struct *wq)
{
	const struct cpumask *cpu_map = wq_cpu_map(wq);
	int cpu;

	might_sleep();
	lock_map_acquire(&wq->lockdep_map);
	lock_map_release(&wq->lockdep_map);
	for_each_cpu(cpu, cpu_map)
		flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
}
EXPORT_SYMBOL_GPL(flush_workqueue);
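
/*
 * Illustrative sketch (hypothetical names): a driver's remove/shutdown path
 * typically stops new submissions first, then flushes, so that nothing can
 * requeue behind the flush:
 *
 *	dev->stopping = true;		// my_work_fn checks this and bails out
 *	flush_workqueue(my_wq);		// all work queued before this has run
 */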

/**
 * flush_work - block until a work_struct's callback has terminated
 * @work: the work which is to be flushed
 *
 * Returns false if @work has already terminated.
 *
 * It is expected that, prior to calling flush_work(), the caller has
 * arranged for the work to not be requeued, otherwise it doesn't make
 * sense to use this function.
 */
int flush_work(struct work_struct *work)
{
	struct cpu_workqueue_struct *cwq;
	struct list_head *prev;
	struct wq_barrier barr;

	might_sleep();
	cwq = get_wq_data(work);
	if (!cwq)
		return 0;

	lock_map_acquire(&cwq->wq->lockdep_map);
	lock_map_release(&cwq->wq->lockdep_map);

	prev = NULL;
	spin_lock_irq(&cwq->lock);
	if (!list_empty(&work->entry)) {
		/*
		 * See the comment near try_to_grab_pending()->smp_rmb().
		 * If it was re-queued under us we are not going to wait.
		 */
		smp_rmb();
		if (unlikely(cwq != get_wq_data(work)))
			goto out;
		prev = &work->entry;
	} else {
		if (cwq->current_work != work)
			goto out;
		prev = &cwq->worklist;
	}
	insert_wq_barrier(cwq, &barr, prev->next);
out:
	spin_unlock_irq(&cwq->lock);
	if (!prev)
		return 0;

	wait_for_completion(&barr.done);
	destroy_work_on_stack(&barr.work);
	return 1;
}
EXPORT_SYMBOL_GPL(flush_work);

/*
 * Upon a successful return (>= 0), the caller "owns" WORK_STRUCT_PENDING bit,
 * so this work can't be re-armed in any way.
 */
static int try_to_grab_pending(struct work_struct *work)
{
	struct cpu_workqueue_struct *cwq;
	int ret = -1;

	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work)))
		return 0;

	/*
	 * The queueing is in progress, or it is already queued. Try to
	 * steal it from ->worklist without clearing WORK_STRUCT_PENDING.
	 */

	cwq = get_wq_data(work);
	if (!cwq)
		return ret;

	spin_lock_irq(&cwq->lock);
	if (!list_empty(&work->entry)) {
		/*
		 * This work is queued, but perhaps we locked the wrong cwq.
		 * In that case we must see the new value after rmb(), see
		 * insert_work()->wmb().
		 */
		smp_rmb();
		if (cwq == get_wq_data(work)) {
			debug_work_deactivate(work);
			list_del_init(&work->entry);
			ret = 1;
		}
	}
	spin_unlock_irq(&cwq->lock);

	return ret;
}

static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
				struct work_struct *work)
{
	struct wq_barrier barr;
	int running = 0;

	spin_lock_irq(&cwq->lock);
	if (unlikely(cwq->current_work == work)) {
		insert_wq_barrier(cwq, &barr, cwq->worklist.next);
		running = 1;
	}
	spin_unlock_irq(&cwq->lock);

	if (unlikely(running)) {
		wait_for_completion(&barr.done);
		destroy_work_on_stack(&barr.work);
	}
}

static void wait_on_work(struct work_struct *work)
{
	struct cpu_workqueue_struct *cwq;
	struct workqueue_struct *wq;
	const struct cpumask *cpu_map;
	int cpu;

	might_sleep();

	lock_map_acquire(&work->lockdep_map);
	lock_map_release(&work->lockdep_map);

	cwq = get_wq_data(work);
	if (!cwq)
		return;

	wq = cwq->wq;
	cpu_map = wq_cpu_map(wq);

	for_each_cpu(cpu, cpu_map)
		wait_on_cpu_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
}

static int __cancel_work_timer(struct work_struct *work,
				struct timer_list *timer)
{
	int ret;

	do {
		ret = (timer && likely(del_timer(timer)));
		if (!ret)
			ret = try_to_grab_pending(work);
		wait_on_work(work);
	} while (unlikely(ret < 0));

	work_clear_pending(work);
	return ret;
}

/**
 * cancel_work_sync - block until a work_struct's callback has terminated
 * @work: the work which is to be flushed
 *
 * Returns true if @work was pending.
 *
 * cancel_work_sync() will cancel the work if it is queued. If the work's
 * callback appears to be running, cancel_work_sync() will block until it
 * has completed.
 *
 * It is possible to use this function if the work re-queues itself. It can
 * cancel the work even if it migrates to another workqueue, however in that
 * case it only guarantees that work->func() has completed on the last queued
 * workqueue.
 *
 * cancel_work_sync(&delayed_work->work) should be used only if ->timer is not
 * pending, otherwise it goes into a busy-wait loop until the timer expires.
 *
 * The caller must ensure that the workqueue_struct on which this work was
 * last queued can't be destroyed before this function returns.
 */
int cancel_work_sync(struct work_struct *work)
{
	return __cancel_work_timer(work, NULL);
}
EXPORT_SYMBOL_GPL(cancel_work_sync);

/**
 * cancel_delayed_work_sync - reliably kill off a delayed work.
 * @dwork: the delayed work struct
 *
 * Returns true if @dwork was pending.
 *
 * It is possible to use this function if @dwork rearms itself via queue_work()
 * or queue_delayed_work(). See also the comment for cancel_work_sync().
 */
int cancel_delayed_work_sync(struct delayed_work *dwork)
{
	return __cancel_work_timer(&dwork->work, &dwork->timer);
}
EXPORT_SYMBOL(cancel_delayed_work_sync);
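
/*
 * Illustrative sketch (hypothetical names): tearing down a self-rearming
 * delayed work.  The handler may requeue itself; per the kerneldoc above,
 * cancel_delayed_work_sync() still guarantees it is idle on return:
 *
 *	static void my_poll_fn(struct work_struct *work)
 *	{
 *		struct my_dev *dev = container_of(work, struct my_dev, dwork.work);
 *		// poll the hardware ...
 *		queue_delayed_work(my_wq, &dev->dwork, HZ);	// re-arm
 *	}
 *
 *	// in the teardown path:
 *	cancel_delayed_work_sync(&dev->dwork);
 */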

static struct workqueue_struct *keventd_wq __read_mostly;

/**
 * schedule_work - put work task in global workqueue
 * @work: job to be done
 *
 * Returns zero if @work was already on the kernel-global workqueue and
 * non-zero otherwise.
 *
 * This puts a job in the kernel-global workqueue if it was not already
 * queued and leaves it in the same position on the kernel-global
 * workqueue otherwise.
 */
int schedule_work(struct work_struct *work)
{
	return queue_work(keventd_wq, work);
}
EXPORT_SYMBOL(schedule_work);

/**
 * schedule_work_on - put work task on a specific cpu
 * @cpu: cpu to put the work task on
 * @work: job to be done
 *
 * This puts a job on a specific cpu.
 */
int schedule_work_on(int cpu, struct work_struct *work)
{
	return queue_work_on(cpu, keventd_wq, work);
}
EXPORT_SYMBOL(schedule_work_on);

/**
 * schedule_delayed_work - put work task in global workqueue after delay
 * @dwork: job to be done
 * @delay: number of jiffies to wait or 0 for immediate execution
 *
 * After waiting for a given time this puts a job in the kernel-global
 * workqueue.
 */
int schedule_delayed_work(struct delayed_work *dwork,
					unsigned long delay)
{
	return queue_delayed_work(keventd_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work);

/**
 * flush_delayed_work - block until a delayed_work's callback has terminated
 * @dwork: the delayed work which is to be flushed
 *
 * Any timeout is cancelled, and any pending work is run immediately.
 */
void flush_delayed_work(struct delayed_work *dwork)
{
	if (del_timer_sync(&dwork->timer)) {
		struct cpu_workqueue_struct *cwq;
		cwq = wq_per_cpu(keventd_wq, get_cpu());
		__queue_work(cwq, &dwork->work);
		put_cpu();
	}
	flush_work(&dwork->work);
}
EXPORT_SYMBOL(flush_delayed_work);

/**
 * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
 * @cpu: cpu to use
 * @dwork: job to be done
 * @delay: number of jiffies to wait
 *
 * After waiting for a given time this puts a job in the kernel-global
 * workqueue on the specified CPU.
 */
int schedule_delayed_work_on(int cpu,
			struct delayed_work *dwork, unsigned long delay)
{
	return queue_delayed_work_on(cpu, keventd_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work_on);

/**
 * schedule_on_each_cpu - call a function on each online CPU from keventd
 * @func: the function to call
 *
 * Returns zero on success.
 * Returns -ve errno on failure.
 *
 * schedule_on_each_cpu() is very slow.
 */
int schedule_on_each_cpu(work_func_t func)
{
	int cpu;
	int orig = -1;
	struct work_struct *works;

	works = alloc_percpu(struct work_struct);
	if (!works)
		return -ENOMEM;

	get_online_cpus();

	/*
	 * When running in keventd don't schedule a work item on
	 * itself.  Can just call directly because the work queue is
	 * already bound.  This also is faster.
	 */
	if (current_is_keventd())
		orig = raw_smp_processor_id();

	for_each_online_cpu(cpu) {
		struct work_struct *work = per_cpu_ptr(works, cpu);

		INIT_WORK(work, func);
		if (cpu != orig)
			schedule_work_on(cpu, work);
	}
	if (orig >= 0)
		func(per_cpu_ptr(works, orig));

	for_each_online_cpu(cpu)
		flush_work(per_cpu_ptr(works, cpu));

	put_online_cpus();
	free_percpu(works);
	return 0;
}
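
/*
 * Illustrative sketch (hypothetical names): the schedule_*() helpers above
 * are the same primitives bound to the kernel-global keventd_wq ("events"):
 *
 *	static void my_event_fn(struct work_struct *work)
 *	{
 *		// runs on a keventd ("events/N") thread in process context
 *	}
 *	static DECLARE_WORK(my_global_work, my_event_fn);
 *
 *	schedule_work(&my_global_work);		// queue on this CPU's events thread
 *	schedule_on_each_cpu(my_event_fn);	// run once on every online CPU, wait
 */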

void flush_scheduled_work(void)
{
	flush_workqueue(keventd_wq);
}
EXPORT_SYMBOL(flush_scheduled_work);

/**
 * execute_in_process_context - reliably execute the routine with user context
 * @fn:		the function to execute
 * @ew:		guaranteed storage for the execute work structure (must
 *		be available when the work executes)
 *
 * Executes the function immediately if process context is available,
 * otherwise schedules the function for delayed execution.
 *
 * Returns:	0 - function was executed
 *		1 - function was scheduled for execution
 */
int execute_in_process_context(work_func_t fn, struct execute_work *ew)
{
	if (!in_interrupt()) {
		fn(&ew->work);
		return 0;
	}

	INIT_WORK(&ew->work, fn);
	schedule_work(&ew->work);

	return 1;
}
EXPORT_SYMBOL_GPL(execute_in_process_context);

int keventd_up(void)
{
	return keventd_wq != NULL;
}

int current_is_keventd(void)
{
	struct cpu_workqueue_struct *cwq;
	int cpu = raw_smp_processor_id(); /* preempt-safe: keventd is per-cpu */
	int ret = 0;

	BUG_ON(!keventd_wq);

	cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu);
	if (current == cwq->thread)
		ret = 1;

	return ret;
}

static struct cpu_workqueue_struct *
init_cpu_workqueue(struct workqueue_struct *wq, int cpu)
{
	struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);

	cwq->wq = wq;
	spin_lock_init(&cwq->lock);
	INIT_LIST_HEAD(&cwq->worklist);
	init_waitqueue_head(&cwq->more_work);

	return cwq;
}

static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
	struct sched_param param = { .sched_priority = MAX_RT_PRIO-1 };
	struct workqueue_struct *wq = cwq->wq;
	const char *fmt = is_wq_single_threaded(wq) ? "%s" : "%s/%d";
	struct task_struct *p;

	p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu);
	/*
	 * Nobody can add the work_struct to this cwq,
	 *	if (caller is __create_workqueue)
	 *		nobody should see this wq
	 *	else // caller is CPU_UP_PREPARE
	 *		cpu is not on cpu_online_map
	 * so we can abort safely.
	 */
	if (IS_ERR(p))
		return PTR_ERR(p);
	if (cwq->wq->rt)
		sched_setscheduler_nocheck(p, SCHED_FIFO, &param);
	cwq->thread = p;

	trace_workqueue_creation(cwq->thread, cpu);

	return 0;
}

static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
	struct task_struct *p = cwq->thread;

	if (p != NULL) {
		if (cpu >= 0)
			kthread_bind(p, cpu);
		wake_up_process(p);
	}
}

struct workqueue_struct *__create_workqueue_key(const char *name,
						int singlethread,
						int freezeable,
						int rt,
						struct lock_class_key *key,
						const char *lock_name)
{
	struct workqueue_struct *wq;
	struct cpu_workqueue_struct *cwq;
	int err = 0, cpu;

	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
	if (!wq)
		return NULL;

	wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
	if (!wq->cpu_wq) {
		kfree(wq);
		return NULL;
	}

	wq->name = name;
	lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
	wq->singlethread = singlethread;
	wq->freezeable = freezeable;
	wq->rt = rt;
	INIT_LIST_HEAD(&wq->list);

	if (singlethread) {
		cwq = init_cpu_workqueue(wq, singlethread_cpu);
		err = create_workqueue_thread(cwq, singlethread_cpu);
		start_workqueue_thread(cwq, -1);
	} else {
		cpu_maps_update_begin();
		/*
		 * We must place this wq on list even if the code below fails.
		 * cpu_down(cpu) can remove cpu from cpu_populated_map before
		 * destroy_workqueue() takes the lock, in that case we leak
		 * cwq[cpu]->thread.
		 */
		spin_lock(&workqueue_lock);
		list_add(&wq->list, &workqueues);
		spin_unlock(&workqueue_lock);
		/*
		 * We must initialize cwqs for each possible cpu even if we
		 * are going to call destroy_workqueue() finally. Otherwise
		 * cpu_up() can hit the uninitialized cwq once we drop the
		 * lock.
		 */
		for_each_possible_cpu(cpu) {
			cwq = init_cpu_workqueue(wq, cpu);
			if (err || !cpu_online(cpu))
				continue;
			err = create_workqueue_thread(cwq, cpu);
			start_workqueue_thread(cwq, cpu);
		}
		cpu_maps_update_done();
	}

	if (err) {
		destroy_workqueue(wq);
		wq = NULL;
	}
	return wq;
}
EXPORT_SYMBOL_GPL(__create_workqueue_key);

static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq)
{
	/*
	 * Our caller is either destroy_workqueue() or CPU_POST_DEAD,
	 * cpu_add_remove_lock protects cwq->thread.
	 */
	if (cwq->thread == NULL)
		return;

	lock_map_acquire(&cwq->wq->lockdep_map);
	lock_map_release(&cwq->wq->lockdep_map);

	flush_cpu_workqueue(cwq);
	/*
	 * If the caller is CPU_POST_DEAD and cwq->worklist was not empty,
	 * a concurrent flush_workqueue() can insert a barrier after us.
	 * However, in that case run_workqueue() won't return and check
	 * kthread_should_stop() until it flushes all work_struct's.
	 * When ->worklist becomes empty it is safe to exit because no
	 * more work_structs can be queued on this cwq: flush_workqueue
	 * checks list_empty(), and a "normal" queue_work() can't use
	 * a dead CPU.
	 */
	trace_workqueue_destruction(cwq->thread);
	kthread_stop(cwq->thread);
	cwq->thread = NULL;
}

/**
 * destroy_workqueue - safely terminate a workqueue
 * @wq: target workqueue
 *
 * Safely destroy a workqueue. All work currently pending will be done first.
 */
void destroy_workqueue(struct workqueue_struct *wq)
{
	const struct cpumask *cpu_map = wq_cpu_map(wq);
	int cpu;

	cpu_maps_update_begin();
	spin_lock(&workqueue_lock);
	list_del(&wq->list);
	spin_unlock(&workqueue_lock);

	for_each_cpu(cpu, cpu_map)
		cleanup_workqueue_thread(per_cpu_ptr(wq->cpu_wq, cpu));
	cpu_maps_update_done();

	free_percpu(wq->cpu_wq);
	kfree(wq);
}
EXPORT_SYMBOL_GPL(destroy_workqueue);
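
/*
 * Illustrative sketch (hypothetical names): typical workqueue lifecycle in a
 * module.  create_workqueue()/create_singlethread_workqueue() are wrappers
 * around __create_workqueue_key(), provided by <linux/workqueue.h>:
 *
 *	static struct workqueue_struct *my_wq;
 *
 *	static int __init my_init(void)
 *	{
 *		my_wq = create_singlethread_workqueue("my_wq");
 *		if (!my_wq)
 *			return -ENOMEM;
 *		return 0;
 *	}
 *
 *	static void __exit my_exit(void)
 *	{
 *		// stop queueing new work first, then:
 *		destroy_workqueue(my_wq);	// flushes pending work, frees wq
 *	}
 */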

static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
						unsigned long action,
						void *hcpu)
{
	unsigned int cpu = (unsigned long)hcpu;
	struct cpu_workqueue_struct *cwq;
	struct workqueue_struct *wq;
	int ret = NOTIFY_OK;

	action &= ~CPU_TASKS_FROZEN;

	switch (action) {
	case CPU_UP_PREPARE:
		cpumask_set_cpu(cpu, cpu_populated_map);
	}
undo:
	list_for_each_entry(wq, &workqueues, list) {
		cwq = per_cpu_ptr(wq->cpu_wq, cpu);

		switch (action) {
		case CPU_UP_PREPARE:
			if (!create_workqueue_thread(cwq, cpu))
				break;
			printk(KERN_ERR "workqueue [%s] for %i failed\n",
				wq->name, cpu);
			action = CPU_UP_CANCELED;
			ret = NOTIFY_BAD;
			goto undo;

		case CPU_ONLINE:
			start_workqueue_thread(cwq, cpu);
			break;

		case CPU_UP_CANCELED:
			start_workqueue_thread(cwq, -1);
		case CPU_POST_DEAD:
			cleanup_workqueue_thread(cwq);
			break;
		}
	}

	switch (action) {
	case CPU_UP_CANCELED:
	case CPU_POST_DEAD:
		cpumask_clear_cpu(cpu, cpu_populated_map);
	}

	return ret;
}

#ifdef CONFIG_SMP

struct work_for_cpu {
	struct completion completion;
	long (*fn)(void *);
	void *arg;
	long ret;
};

static int do_work_for_cpu(void *_wfc)
{
	struct work_for_cpu *wfc = _wfc;
	wfc->ret = wfc->fn(wfc->arg);
	complete(&wfc->completion);
	return 0;
}

/**
 * work_on_cpu - run a function in user context on a particular cpu
 * @cpu: the cpu to run on
 * @fn: the function to run
 * @arg: the function arg
 *
 * This will return the value @fn returns.
 * It is up to the caller to ensure that the cpu doesn't go offline.
 * The caller must not hold any locks which would prevent @fn from completing.
 */
long work_on_cpu(unsigned int cpu, long (*fn)(void *), void *arg)
{
	struct task_struct *sub_thread;
	struct work_for_cpu wfc = {
		.completion = COMPLETION_INITIALIZER_ONSTACK(wfc.completion),
		.fn = fn,
		.arg = arg,
	};

	sub_thread = kthread_create(do_work_for_cpu, &wfc, "work_for_cpu");
	if (IS_ERR(sub_thread))
		return PTR_ERR(sub_thread);
	kthread_bind(sub_thread, cpu);
	wake_up_process(sub_thread);
	wait_for_completion(&wfc.completion);
	return wfc.ret;
}
EXPORT_SYMBOL_GPL(work_on_cpu);
#endif /* CONFIG_SMP */

void __init init_workqueues(void)
{
	alloc_cpumask_var(&cpu_populated_map, GFP_KERNEL);

	cpumask_copy(cpu_populated_map, cpu_online_mask);
	singlethread_cpu = cpumask_first(cpu_possible_mask);
	cpu_singlethread_map = cpumask_of(singlethread_cpu);
	hotcpu_notifier(workqueue_cpu_callback, 0);
	keventd_wq = create_workqueue("events");
	BUG_ON(!keventd_wq);
}
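
/*
 * Illustrative sketch (hypothetical names): running a function on a specific
 * CPU and collecting its return value.  The caller keeps the CPU online,
 * e.g. with get_online_cpus():
 *
 *	static long my_read_fn(void *arg)
 *	{
 *		// runs in a kthread bound to the requested CPU
 *		return 0;
 *	}
 *
 *	get_online_cpus();
 *	ret = work_on_cpu(cpu, my_read_fn, NULL);
 *	put_online_cpus();
 */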