/*
 * linux/kernel/workqueue.c
 *
 * Generic mechanism for defining kernel helper threads for running
 * arbitrary tasks in process context.
 *
 * Started by Ingo Molnar, Copyright (C) 2002
 *
 * Derived from the taskqueue/keventd code by:
 *
 *	David Woodhouse <dwmw2@infradead.org>
 *	Andrew Morton
 *	Kai Petzke <wpp@marie.physik.tu-berlin.de>
 *	Theodore Ts'o <tytso@mit.edu>
 *
 * Made to use alloc_percpu by Christoph Lameter.
 */

#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/init.h>
#include <linux/signal.h>
#include <linux/completion.h>
#include <linux/workqueue.h>
#include <linux/slab.h>
#include <linux/cpu.h>
#include <linux/notifier.h>
#include <linux/kthread.h>
#include <linux/hardirq.h>
#include <linux/mempolicy.h>
#include <linux/freezer.h>
#include <linux/kallsyms.h>
#include <linux/debug_locks.h>
#include <linux/lockdep.h>

/*
 * The per-CPU workqueue (if single thread, we always use the first
 * possible cpu).
 */
struct cpu_workqueue_struct {

	spinlock_t lock;

	struct list_head worklist;
	wait_queue_head_t more_work;
	struct work_struct *current_work;

	struct workqueue_struct *wq;
	struct task_struct *thread;

	int run_depth;		/* Detect run_workqueue() recursion depth */
} ____cacheline_aligned;

/*
 * The externally visible workqueue abstraction is an array of
 * per-CPU workqueues:
 */
struct workqueue_struct {
	struct cpu_workqueue_struct *cpu_wq;
	struct list_head list;
	const char *name;
	int singlethread;
	int freezeable;		/* Freeze threads during suspend */
	int rt;
#ifdef CONFIG_LOCKDEP
	struct lockdep_map lockdep_map;
#endif
};

/* Serializes the accesses to the list of workqueues. */
static DEFINE_SPINLOCK(workqueue_lock);
static LIST_HEAD(workqueues);

static int singlethread_cpu __read_mostly;
static const struct cpumask *cpu_singlethread_map __read_mostly;
/*
 * _cpu_down() first removes CPU from cpu_online_map, then CPU_DEAD
 * flushes cwq->worklist. This means that flush_workqueue/wait_on_work
 * which comes in between can't use for_each_online_cpu(). We could
 * use cpu_possible_map, the cpumask below is more a documentation
 * than optimization.
 */
static cpumask_var_t cpu_populated_map __read_mostly;

/* If it's single threaded, it isn't in the list of workqueues. */
static inline int is_wq_single_threaded(struct workqueue_struct *wq)
{
	return wq->singlethread;
}

static const struct cpumask *wq_cpu_map(struct workqueue_struct *wq)
{
	return is_wq_single_threaded(wq)
		? cpu_singlethread_map : cpu_populated_map;
}
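/*
 * Note on work->data: the atomic_long ->data field of a work_struct does
 * double duty.  Its low bits carry the WORK_STRUCT_* flag bits (see
 * WORK_STRUCT_FLAG_MASK in <linux/workqueue.h>), while the remaining bits
 * hold the pointer to the cpu_workqueue_struct the work was last queued on.
 * set_wq_data() and get_wq_data() below pack and unpack that value.
 */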

static
struct cpu_workqueue_struct *wq_per_cpu(struct workqueue_struct *wq, int cpu)
{
	if (unlikely(is_wq_single_threaded(wq)))
		cpu = singlethread_cpu;
	return per_cpu_ptr(wq->cpu_wq, cpu);
}

/*
 * Set the workqueue on which a work item is to be run
 * - Must *only* be called if the pending flag is set
 */
static inline void set_wq_data(struct work_struct *work,
				struct cpu_workqueue_struct *cwq)
{
	unsigned long new;

	BUG_ON(!work_pending(work));

	new = (unsigned long) cwq | (1UL << WORK_STRUCT_PENDING);
	new |= WORK_STRUCT_FLAG_MASK & *work_data_bits(work);
	atomic_long_set(&work->data, new);
}

static inline
struct cpu_workqueue_struct *get_wq_data(struct work_struct *work)
{
	return (void *) (atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK);
}

static void insert_work(struct cpu_workqueue_struct *cwq,
			struct work_struct *work, struct list_head *head)
{
	set_wq_data(work, cwq);
	/*
	 * Ensure that we get the right work->data if we see the
	 * result of list_add() below, see try_to_grab_pending().
	 */
	smp_wmb();
	list_add_tail(&work->entry, head);
	wake_up(&cwq->more_work);
}

static void __queue_work(struct cpu_workqueue_struct *cwq,
			 struct work_struct *work)
{
	unsigned long flags;

	spin_lock_irqsave(&cwq->lock, flags);
	insert_work(cwq, work, &cwq->worklist);
	spin_unlock_irqrestore(&cwq->lock, flags);
}

/**
 * queue_work - queue work on a workqueue
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 *
 * We queue the work to the CPU on which it was submitted, but if the CPU dies
 * it can be processed by another CPU.
 */
int queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
	int ret;

	ret = queue_work_on(get_cpu(), wq, work);
	put_cpu();

	return ret;
}
EXPORT_SYMBOL_GPL(queue_work);

/**
 * queue_work_on - queue work on specific cpu
 * @cpu: CPU number to execute work on
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 *
 * We queue the work to a specific CPU, the caller must ensure it
 * can't go away.
 */
int
queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
{
	int ret = 0;

	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
		BUG_ON(!list_empty(&work->entry));
		__queue_work(wq_per_cpu(wq, cpu), work);
		ret = 1;
	}
	return ret;
}
EXPORT_SYMBOL_GPL(queue_work_on);

static void delayed_work_timer_fn(unsigned long __data)
{
	struct delayed_work *dwork = (struct delayed_work *)__data;
	struct cpu_workqueue_struct *cwq = get_wq_data(&dwork->work);
	struct workqueue_struct *wq = cwq->wq;

	__queue_work(wq_per_cpu(wq, smp_processor_id()), &dwork->work);
}

/**
 * queue_delayed_work - queue work on a workqueue after delay
 * @wq: workqueue to use
 * @dwork: delayable work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 */
int queue_delayed_work(struct workqueue_struct *wq,
			struct delayed_work *dwork, unsigned long delay)
{
	if (delay == 0)
		return queue_work(wq, &dwork->work);

	return queue_delayed_work_on(-1, wq, dwork, delay);
}
EXPORT_SYMBOL_GPL(queue_delayed_work);

/**
 * queue_delayed_work_on - queue work on specific CPU after delay
 * @cpu: CPU number to execute work on
 * @wq: workqueue to use
 * @dwork: work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 */
int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
			struct delayed_work *dwork, unsigned long delay)
{
	int ret = 0;
	struct timer_list *timer = &dwork->timer;
	struct work_struct *work = &dwork->work;

	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
		BUG_ON(timer_pending(timer));
		BUG_ON(!list_empty(&work->entry));

		timer_stats_timer_set_start_info(&dwork->timer);

		/* This stores cwq for the moment, for the timer_fn */
		set_wq_data(work, wq_per_cpu(wq, raw_smp_processor_id()));
		timer->expires = jiffies + delay;
		timer->data = (unsigned long)dwork;
		timer->function = delayed_work_timer_fn;

		if (unlikely(cpu >= 0))
			add_timer_on(timer, cpu);
		else
			add_timer(timer);
		ret = 1;
	}
	return ret;
}
EXPORT_SYMBOL_GPL(queue_delayed_work_on);
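
/*
 * Illustrative usage sketch (hypothetical driver code, not part of this
 * file).  A work item is declared once with its handler and then queued;
 * the handler later runs in process context on the workqueue's thread:
 *
 *	static void my_work_fn(struct work_struct *work)
 *	{
 *		... may sleep, runs with no locks held ...
 *	}
 *	static DECLARE_WORK(my_work, my_work_fn);
 *	static DECLARE_DELAYED_WORK(my_dwork, my_dwork_fn);
 *
 *	queue_work(my_wq, &my_work);
 *	queue_delayed_work(my_wq, &my_dwork, msecs_to_jiffies(100));
 *
 * Queueing an item that is already pending is a no-op and returns 0, which
 * is what the WORK_STRUCT_PENDING test above implements.
 */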

static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
	spin_lock_irq(&cwq->lock);
	cwq->run_depth++;
	if (cwq->run_depth > 3) {
		/* morton gets to eat his hat */
		printk("%s: recursion depth exceeded: %d\n",
			__func__, cwq->run_depth);
		dump_stack();
	}
	while (!list_empty(&cwq->worklist)) {
		struct work_struct *work = list_entry(cwq->worklist.next,
						struct work_struct, entry);
		work_func_t f = work->func;
#ifdef CONFIG_LOCKDEP
		/*
		 * It is permissible to free the struct work_struct
		 * from inside the function that is called from it,
		 * this we need to take into account for lockdep too.
		 * To avoid bogus "held lock freed" warnings as well
		 * as problems when looking into work->lockdep_map,
		 * make a copy and use that here.
		 */
		struct lockdep_map lockdep_map = work->lockdep_map;
#endif

		cwq->current_work = work;
		list_del_init(cwq->worklist.next);
		spin_unlock_irq(&cwq->lock);

		BUG_ON(get_wq_data(work) != cwq);
		work_clear_pending(work);
		lock_map_acquire(&cwq->wq->lockdep_map);
		lock_map_acquire(&lockdep_map);
		f(work);
		lock_map_release(&lockdep_map);
		lock_map_release(&cwq->wq->lockdep_map);

		if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
			printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
					"%s/0x%08x/%d\n",
					current->comm, preempt_count(),
					task_pid_nr(current));
			printk(KERN_ERR "    last function: ");
			print_symbol("%s\n", (unsigned long)f);
			debug_show_held_locks(current);
			dump_stack();
		}

		spin_lock_irq(&cwq->lock);
		cwq->current_work = NULL;
	}
	cwq->run_depth--;
	spin_unlock_irq(&cwq->lock);
}

static int worker_thread(void *__cwq)
{
	struct cpu_workqueue_struct *cwq = __cwq;
	DEFINE_WAIT(wait);

	if (cwq->wq->freezeable)
		set_freezable();

	set_user_nice(current, -5);

	for (;;) {
		prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
		if (!freezing(current) &&
		    !kthread_should_stop() &&
		    list_empty(&cwq->worklist))
			schedule();
		finish_wait(&cwq->more_work, &wait);

		try_to_freeze();

		if (kthread_should_stop())
			break;

		run_workqueue(cwq);
	}

	return 0;
}

struct wq_barrier {
	struct work_struct	work;
	struct completion	done;
};

static void wq_barrier_func(struct work_struct *work)
{
	struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
	complete(&barr->done);
}

static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
			struct wq_barrier *barr, struct list_head *head)
{
	INIT_WORK(&barr->work, wq_barrier_func);
	__set_bit(WORK_STRUCT_PENDING, work_data_bits(&barr->work));

	init_completion(&barr->done);

	insert_work(cwq, &barr->work, head);
}

static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
{
	int active;

	if (cwq->thread == current) {
		/*
		 * Probably keventd trying to flush its own queue. So simply run
		 * it by hand rather than deadlocking.
		 */
		run_workqueue(cwq);
		active = 1;
	} else {
		struct wq_barrier barr;

		active = 0;
		spin_lock_irq(&cwq->lock);
		if (!list_empty(&cwq->worklist) || cwq->current_work != NULL) {
			insert_wq_barrier(cwq, &barr, &cwq->worklist);
			active = 1;
		}
		spin_unlock_irq(&cwq->lock);

		if (active)
			wait_for_completion(&barr.done);
	}

	return active;
}

/**
 * flush_workqueue - ensure that any scheduled work has run to completion.
 * @wq: workqueue to flush
 *
 * Forces execution of the workqueue and blocks until its completion.
 * This is typically used in driver shutdown handlers.
 *
 * We sleep until all works which were queued on entry have been handled,
 * but we are not livelocked by new incoming ones.
 *
 * This function used to run the workqueues itself. Now we just wait for the
 * helper threads to do it.
 */
void flush_workqueue(struct workqueue_struct *wq)
{
	const struct cpumask *cpu_map = wq_cpu_map(wq);
	int cpu;

	might_sleep();
	lock_map_acquire(&wq->lockdep_map);
	lock_map_release(&wq->lockdep_map);
	for_each_cpu(cpu, cpu_map)
		flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
}
EXPORT_SYMBOL_GPL(flush_workqueue);

/**
 * flush_work - block until a work_struct's callback has terminated
 * @work: the work which is to be flushed
 *
 * Returns false if @work has already terminated.
 *
 * It is expected that, prior to calling flush_work(), the caller has
 * arranged for the work to not be requeued, otherwise it doesn't make
 * sense to use this function.
 */
int flush_work(struct work_struct *work)
{
	struct cpu_workqueue_struct *cwq;
	struct list_head *prev;
	struct wq_barrier barr;

	might_sleep();
	cwq = get_wq_data(work);
	if (!cwq)
		return 0;

	lock_map_acquire(&cwq->wq->lockdep_map);
	lock_map_release(&cwq->wq->lockdep_map);

	prev = NULL;
	spin_lock_irq(&cwq->lock);
	if (!list_empty(&work->entry)) {
		/*
		 * See the comment near try_to_grab_pending()->smp_rmb().
		 * If it was re-queued under us we are not going to wait.
		 */
		smp_rmb();
		if (unlikely(cwq != get_wq_data(work)))
			goto out;
		prev = &work->entry;
	} else {
		if (cwq->current_work != work)
			goto out;
		prev = &cwq->worklist;
	}
	insert_wq_barrier(cwq, &barr, prev->next);
out:
	spin_unlock_irq(&cwq->lock);
	if (!prev)
		return 0;

	wait_for_completion(&barr.done);
	return 1;
}
EXPORT_SYMBOL_GPL(flush_work);
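
/*
 * Illustrative shutdown ordering (hypothetical names, not part of this
 * file).  Callers normally stop new submissions first and then flush, so
 * that nothing can re-queue behind the flush:
 *
 *	dev->shutting_down = 1;
 *	cancel_delayed_work_sync(&dev->poll_work);
 *	flush_workqueue(dev->wq);
 *	destroy_workqueue(dev->wq);
 */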

/*
 * Upon a successful return (>= 0), the caller "owns" WORK_STRUCT_PENDING bit,
 * so this work can't be re-armed in any way.
 */
static int try_to_grab_pending(struct work_struct *work)
{
	struct cpu_workqueue_struct *cwq;
	int ret = -1;

	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work)))
		return 0;

	/*
	 * The queueing is in progress, or it is already queued. Try to
	 * steal it from ->worklist without clearing WORK_STRUCT_PENDING.
	 */

	cwq = get_wq_data(work);
	if (!cwq)
		return ret;

	spin_lock_irq(&cwq->lock);
	if (!list_empty(&work->entry)) {
		/*
		 * This work is queued, but perhaps we locked the wrong cwq.
		 * In that case we must see the new value after rmb(), see
		 * insert_work()->wmb().
		 */
		smp_rmb();
		if (cwq == get_wq_data(work)) {
			list_del_init(&work->entry);
			ret = 1;
		}
	}
	spin_unlock_irq(&cwq->lock);

	return ret;
}

static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
				struct work_struct *work)
{
	struct wq_barrier barr;
	int running = 0;

	spin_lock_irq(&cwq->lock);
	if (unlikely(cwq->current_work == work)) {
		insert_wq_barrier(cwq, &barr, cwq->worklist.next);
		running = 1;
	}
	spin_unlock_irq(&cwq->lock);

	if (unlikely(running))
		wait_for_completion(&barr.done);
}

static void wait_on_work(struct work_struct *work)
{
	struct cpu_workqueue_struct *cwq;
	struct workqueue_struct *wq;
	const struct cpumask *cpu_map;
	int cpu;

	might_sleep();

	lock_map_acquire(&work->lockdep_map);
	lock_map_release(&work->lockdep_map);

	cwq = get_wq_data(work);
	if (!cwq)
		return;

	wq = cwq->wq;
	cpu_map = wq_cpu_map(wq);

	for_each_cpu(cpu, cpu_map)
		wait_on_cpu_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
}

static int __cancel_work_timer(struct work_struct *work,
				struct timer_list* timer)
{
	int ret;

	do {
		ret = (timer && likely(del_timer(timer)));
		if (!ret)
			ret = try_to_grab_pending(work);
		wait_on_work(work);
	} while (unlikely(ret < 0));

	work_clear_pending(work);
	return ret;
}

/**
 * cancel_work_sync - block until a work_struct's callback has terminated
 * @work: the work which is to be flushed
 *
 * Returns true if @work was pending.
 *
 * cancel_work_sync() will cancel the work if it is queued. If the work's
 * callback appears to be running, cancel_work_sync() will block until it
 * has completed.
 *
 * It is possible to use this function if the work re-queues itself. It can
 * cancel the work even if it migrates to another workqueue, however in that
 * case it only guarantees that work->func() has completed on the last queued
 * workqueue.
 *
 * cancel_work_sync(&delayed_work->work) should be used only if ->timer is not
 * pending, otherwise it goes into a busy-wait loop until the timer expires.
 *
 * The caller must ensure that workqueue_struct on which this work was last
 * queued can't be destroyed before this function returns.
 */
int cancel_work_sync(struct work_struct *work)
{
	return __cancel_work_timer(work, NULL);
}
EXPORT_SYMBOL_GPL(cancel_work_sync);

/**
 * cancel_delayed_work_sync - reliably kill off a delayed work.
 * @dwork: the delayed work struct
 *
 * Returns true if @dwork was pending.
 *
 * It is possible to use this function if @dwork rearms itself via queue_work()
 * or queue_delayed_work(). See also the comment for cancel_work_sync().
 */
int cancel_delayed_work_sync(struct delayed_work *dwork)
{
	return __cancel_work_timer(&dwork->work, &dwork->timer);
}
EXPORT_SYMBOL(cancel_delayed_work_sync);
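
/*
 * Example of a self-rearming delayed work item (hypothetical names, not
 * part of this file) and how it is stopped.  The handler re-queues itself;
 * cancel_delayed_work_sync() deletes a pending timer, steals a queued but
 * not yet running item, and waits for a running handler, looping until the
 * work can no longer re-arm:
 *
 *	static void poll_fn(struct work_struct *work)
 *	{
 *		struct my_dev *dev = container_of(work, struct my_dev,
 *						  poll_work.work);
 *
 *		my_dev_poll(dev);
 *		if (!dev->closing)
 *			queue_delayed_work(dev->wq, &dev->poll_work, HZ);
 *	}
 *
 *	...
 *	dev->closing = 1;
 *	cancel_delayed_work_sync(&dev->poll_work);
 */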

static struct workqueue_struct *keventd_wq __read_mostly;

/**
 * schedule_work - put work task in global workqueue
 * @work: job to be done
 *
 * This puts a job in the kernel-global workqueue.
 */
int schedule_work(struct work_struct *work)
{
	return queue_work(keventd_wq, work);
}
EXPORT_SYMBOL(schedule_work);

/*
 * schedule_work_on - put work task on a specific cpu
 * @cpu: cpu to put the work task on
 * @work: job to be done
 *
 * This puts a job on a specific cpu
 */
int schedule_work_on(int cpu, struct work_struct *work)
{
	return queue_work_on(cpu, keventd_wq, work);
}
EXPORT_SYMBOL(schedule_work_on);

/**
 * schedule_delayed_work - put work task in global workqueue after delay
 * @dwork: job to be done
 * @delay: number of jiffies to wait or 0 for immediate execution
 *
 * After waiting for a given time this puts a job in the kernel-global
 * workqueue.
 */
int schedule_delayed_work(struct delayed_work *dwork,
				unsigned long delay)
{
	return queue_delayed_work(keventd_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work);

/**
 * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
 * @cpu: cpu to use
 * @dwork: job to be done
 * @delay: number of jiffies to wait
 *
 * After waiting for a given time this puts a job in the kernel-global
 * workqueue on the specified CPU.
 */
int schedule_delayed_work_on(int cpu,
			struct delayed_work *dwork, unsigned long delay)
{
	return queue_delayed_work_on(cpu, keventd_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work_on);

/**
 * schedule_on_each_cpu - call a function on each online CPU from keventd
 * @func: the function to call
 *
 * Returns zero on success.
 * Returns -ve errno on failure.
 *
 * schedule_on_each_cpu() is very slow.
 */
int schedule_on_each_cpu(work_func_t func)
{
	int cpu;
	struct work_struct *works;

	works = alloc_percpu(struct work_struct);
	if (!works)
		return -ENOMEM;

	get_online_cpus();
	for_each_online_cpu(cpu) {
		struct work_struct *work = per_cpu_ptr(works, cpu);

		INIT_WORK(work, func);
		schedule_work_on(cpu, work);
	}
	for_each_online_cpu(cpu)
		flush_work(per_cpu_ptr(works, cpu));
	put_online_cpus();
	free_percpu(works);
	return 0;
}

void flush_scheduled_work(void)
{
	flush_workqueue(keventd_wq);
}
EXPORT_SYMBOL(flush_scheduled_work);

/**
 * execute_in_process_context - reliably execute the routine with user context
 * @fn:		the function to execute
 * @ew:		guaranteed storage for the execute work structure (must
 *		be available when the work executes)
 *
 * Executes the function immediately if process context is available,
 * otherwise schedules the function for delayed execution.
 *
 * Returns:	0 - function was executed
 *		1 - function was scheduled for execution
 */
int execute_in_process_context(work_func_t fn, struct execute_work *ew)
{
	if (!in_interrupt()) {
		fn(&ew->work);
		return 0;
	}

	INIT_WORK(&ew->work, fn);
	schedule_work(&ew->work);

	return 1;
}
EXPORT_SYMBOL_GPL(execute_in_process_context);
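
/*
 * Illustrative call (hypothetical names): release paths that can be entered
 * from either process or interrupt context may defer only when necessary:
 *
 *	execute_in_process_context(my_dev_release, &dev->ew);
 *
 * The execute_work storage must remain valid until the callback has run,
 * as noted in the kernel-doc above.
 */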

int keventd_up(void)
{
	return keventd_wq != NULL;
}

int current_is_keventd(void)
{
	struct cpu_workqueue_struct *cwq;
	int cpu = raw_smp_processor_id(); /* preempt-safe: keventd is per-cpu */
	int ret = 0;

	BUG_ON(!keventd_wq);

	cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu);
	if (current == cwq->thread)
		ret = 1;

	return ret;

}

static struct cpu_workqueue_struct *
init_cpu_workqueue(struct workqueue_struct *wq, int cpu)
{
	struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);

	cwq->wq = wq;
	spin_lock_init(&cwq->lock);
	INIT_LIST_HEAD(&cwq->worklist);
	init_waitqueue_head(&cwq->more_work);

	return cwq;
}

static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
	struct sched_param param = { .sched_priority = MAX_RT_PRIO-1 };
	struct workqueue_struct *wq = cwq->wq;
	const char *fmt = is_wq_single_threaded(wq) ? "%s" : "%s/%d";
	struct task_struct *p;

	p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu);
	/*
	 * Nobody can add the work_struct to this cwq,
	 *	if (caller is __create_workqueue)
	 *		nobody should see this wq
	 *	else // caller is CPU_UP_PREPARE
	 *		cpu is not on cpu_online_map
	 * so we can abort safely.
	 */
	if (IS_ERR(p))
		return PTR_ERR(p);
	if (cwq->wq->rt)
		sched_setscheduler_nocheck(p, SCHED_FIFO, &param);
	cwq->thread = p;

	return 0;
}

static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
{
	struct task_struct *p = cwq->thread;

	if (p != NULL) {
		if (cpu >= 0)
			kthread_bind(p, cpu);
		wake_up_process(p);
	}
}

struct workqueue_struct *__create_workqueue_key(const char *name,
						int singlethread,
						int freezeable,
						int rt,
						struct lock_class_key *key,
						const char *lock_name)
{
	struct workqueue_struct *wq;
	struct cpu_workqueue_struct *cwq;
	int err = 0, cpu;

	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
	if (!wq)
		return NULL;

	wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
	if (!wq->cpu_wq) {
		kfree(wq);
		return NULL;
	}

	wq->name = name;
	lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
	wq->singlethread = singlethread;
	wq->freezeable = freezeable;
	wq->rt = rt;
	INIT_LIST_HEAD(&wq->list);

	if (singlethread) {
		cwq = init_cpu_workqueue(wq, singlethread_cpu);
		err = create_workqueue_thread(cwq, singlethread_cpu);
		start_workqueue_thread(cwq, -1);
	} else {
		cpu_maps_update_begin();
		/*
		 * We must place this wq on list even if the code below fails.
		 * cpu_down(cpu) can remove cpu from cpu_populated_map before
		 * destroy_workqueue() takes the lock, in that case we leak
		 * cwq[cpu]->thread.
		 */
		spin_lock(&workqueue_lock);
		list_add(&wq->list, &workqueues);
		spin_unlock(&workqueue_lock);
		/*
		 * We must initialize cwqs for each possible cpu even if we
		 * are going to call destroy_workqueue() finally. Otherwise
		 * cpu_up() can hit the uninitialized cwq once we drop the
		 * lock.
		 */
		for_each_possible_cpu(cpu) {
			cwq = init_cpu_workqueue(wq, cpu);
			if (err || !cpu_online(cpu))
				continue;
			err = create_workqueue_thread(cwq, cpu);
			start_workqueue_thread(cwq, cpu);
		}
		cpu_maps_update_done();
	}

	if (err) {
		destroy_workqueue(wq);
		wq = NULL;
	}
	return wq;
}
EXPORT_SYMBOL_GPL(__create_workqueue_key);
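
/*
 * Illustrative creation and teardown (hypothetical names).  The
 * create_workqueue() and create_singlethread_workqueue() macros in
 * <linux/workqueue.h> funnel into __create_workqueue_key() above:
 *
 *	struct workqueue_struct *wq;
 *
 *	wq = create_singlethread_workqueue("mydrv");
 *	if (!wq)
 *		return -ENOMEM;
 *	queue_work(wq, &my_work);
 *	...
 *	destroy_workqueue(wq);
 */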

static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq)
{
	/*
	 * Our caller is either destroy_workqueue() or CPU_POST_DEAD,
	 * cpu_add_remove_lock protects cwq->thread.
	 */
	if (cwq->thread == NULL)
		return;

	lock_map_acquire(&cwq->wq->lockdep_map);
	lock_map_release(&cwq->wq->lockdep_map);

	flush_cpu_workqueue(cwq);
	/*
	 * If the caller is CPU_POST_DEAD and cwq->worklist was not empty,
	 * a concurrent flush_workqueue() can insert a barrier after us.
	 * However, in that case run_workqueue() won't return and check
	 * kthread_should_stop() until it flushes all work_struct's.
	 * When ->worklist becomes empty it is safe to exit because no
	 * more work_structs can be queued on this cwq: flush_workqueue
	 * checks list_empty(), and a "normal" queue_work() can't use
	 * a dead CPU.
	 */
	kthread_stop(cwq->thread);
	cwq->thread = NULL;
}

/**
 * destroy_workqueue - safely terminate a workqueue
 * @wq: target workqueue
 *
 * Safely destroy a workqueue. All work currently pending will be done first.
 */
void destroy_workqueue(struct workqueue_struct *wq)
{
	const struct cpumask *cpu_map = wq_cpu_map(wq);
	int cpu;

	cpu_maps_update_begin();
	spin_lock(&workqueue_lock);
	list_del(&wq->list);
	spin_unlock(&workqueue_lock);

	for_each_cpu(cpu, cpu_map)
		cleanup_workqueue_thread(per_cpu_ptr(wq->cpu_wq, cpu));
	cpu_maps_update_done();

	free_percpu(wq->cpu_wq);
	kfree(wq);
}
EXPORT_SYMBOL_GPL(destroy_workqueue);

static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
						unsigned long action,
						void *hcpu)
{
	unsigned int cpu = (unsigned long)hcpu;
	struct cpu_workqueue_struct *cwq;
	struct workqueue_struct *wq;
	int ret = NOTIFY_OK;

	action &= ~CPU_TASKS_FROZEN;

	switch (action) {
	case CPU_UP_PREPARE:
		cpumask_set_cpu(cpu, cpu_populated_map);
	}
undo:
	list_for_each_entry(wq, &workqueues, list) {
		cwq = per_cpu_ptr(wq->cpu_wq, cpu);

		switch (action) {
		case CPU_UP_PREPARE:
			if (!create_workqueue_thread(cwq, cpu))
				break;
			printk(KERN_ERR "workqueue [%s] for %i failed\n",
				wq->name, cpu);
			action = CPU_UP_CANCELED;
			ret = NOTIFY_BAD;
			goto undo;

		case CPU_ONLINE:
			start_workqueue_thread(cwq, cpu);
			break;

		case CPU_UP_CANCELED:
			start_workqueue_thread(cwq, -1);
		case CPU_POST_DEAD:
			cleanup_workqueue_thread(cwq);
			break;
		}
	}

	switch (action) {
	case CPU_UP_CANCELED:
	case CPU_POST_DEAD:
		cpumask_clear_cpu(cpu, cpu_populated_map);
	}

	return ret;
}

#ifdef CONFIG_SMP
static struct workqueue_struct *work_on_cpu_wq __read_mostly;

struct work_for_cpu {
	struct work_struct work;
	long (*fn)(void *);
	void *arg;
	long ret;
};

static void do_work_for_cpu(struct work_struct *w)
{
	struct work_for_cpu *wfc = container_of(w, struct work_for_cpu, work);

	wfc->ret = wfc->fn(wfc->arg);
}

/**
 * work_on_cpu - run a function in user context on a particular cpu
 * @cpu: the cpu to run on
 * @fn: the function to run
 * @arg: the function arg
 *
 * This will return the value @fn returns.
 * It is up to the caller to ensure that the cpu doesn't go offline.
 */
long work_on_cpu(unsigned int cpu, long (*fn)(void *), void *arg)
{
	struct work_for_cpu wfc;

	INIT_WORK(&wfc.work, do_work_for_cpu);
	wfc.fn = fn;
	wfc.arg = arg;
	queue_work_on(cpu, work_on_cpu_wq, &wfc.work);
	flush_work(&wfc.work);

	return wfc.ret;
}
EXPORT_SYMBOL_GPL(work_on_cpu);
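
/*
 * Illustrative call (hypothetical names): run a function that must execute
 * on a specific CPU, with that CPU pinned online around the call:
 *
 *	get_online_cpus();
 *	ret = work_on_cpu(cpu, my_probe_fn, my_dev);
 *	put_online_cpus();
 */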
#endif /* CONFIG_SMP */

void __init init_workqueues(void)
{
	alloc_cpumask_var(&cpu_populated_map, GFP_KERNEL);

	cpumask_copy(cpu_populated_map, cpu_online_mask);
	singlethread_cpu = cpumask_first(cpu_possible_mask);
	cpu_singlethread_map = cpumask_of(singlethread_cpu);
	hotcpu_notifier(workqueue_cpu_callback, 0);
	keventd_wq = create_workqueue("events");
	BUG_ON(!keventd_wq);
#ifdef CONFIG_SMP
	work_on_cpu_wq = create_workqueue("work_on_cpu");
	BUG_ON(!work_on_cpu_wq);
#endif
}