1 /* 2 * linux/kernel/workqueue.c 3 * 4 * Generic mechanism for defining kernel helper threads for running 5 * arbitrary tasks in process context. 6 * 7 * Started by Ingo Molnar, Copyright (C) 2002 8 * 9 * Derived from the taskqueue/keventd code by: 10 * 11 * David Woodhouse <dwmw2@infradead.org> 12 * Andrew Morton <andrewm@uow.edu.au> 13 * Kai Petzke <wpp@marie.physik.tu-berlin.de> 14 * Theodore Ts'o <tytso@mit.edu> 15 * 16 * Made to use alloc_percpu by Christoph Lameter <clameter@sgi.com>. 17 */ 18 19 #include <linux/module.h> 20 #include <linux/kernel.h> 21 #include <linux/sched.h> 22 #include <linux/init.h> 23 #include <linux/signal.h> 24 #include <linux/completion.h> 25 #include <linux/workqueue.h> 26 #include <linux/slab.h> 27 #include <linux/cpu.h> 28 #include <linux/notifier.h> 29 #include <linux/kthread.h> 30 #include <linux/hardirq.h> 31 #include <linux/mempolicy.h> 32 #include <linux/freezer.h> 33 #include <linux/kallsyms.h> 34 #include <linux/debug_locks.h> 35 36 /* 37 * The per-CPU workqueue (if single thread, we always use the first 38 * possible cpu). 39 */ 40 struct cpu_workqueue_struct { 41 42 spinlock_t lock; 43 44 struct list_head worklist; 45 wait_queue_head_t more_work; 46 struct work_struct *current_work; 47 48 struct workqueue_struct *wq; 49 struct task_struct *thread; 50 int should_stop; 51 52 int run_depth; /* Detect run_workqueue() recursion depth */ 53 } ____cacheline_aligned; 54 55 /* 56 * The externally visible workqueue abstraction is an array of 57 * per-CPU workqueues: 58 */ 59 struct workqueue_struct { 60 struct cpu_workqueue_struct *cpu_wq; 61 struct list_head list; 62 const char *name; 63 int singlethread; 64 int freezeable; /* Freeze threads during suspend */ 65 }; 66 67 /* All the per-cpu workqueues on the system, for hotplug cpu to add/remove 68 threads to each one as cpus come/go. */ 69 static DEFINE_MUTEX(workqueue_mutex); 70 static LIST_HEAD(workqueues); 71 72 static int singlethread_cpu __read_mostly; 73 static cpumask_t cpu_singlethread_map __read_mostly; 74 /* optimization, we could use cpu_possible_map */ 75 static cpumask_t cpu_populated_map __read_mostly; 76 77 /* If it's single threaded, it isn't in the list of workqueues. */ 78 static inline int is_single_threaded(struct workqueue_struct *wq) 79 { 80 return wq->singlethread; 81 } 82 83 static const cpumask_t *wq_cpu_map(struct workqueue_struct *wq) 84 { 85 return is_single_threaded(wq) 86 ? &cpu_singlethread_map : &cpu_populated_map; 87 } 88 89 static 90 struct cpu_workqueue_struct *wq_per_cpu(struct workqueue_struct *wq, int cpu) 91 { 92 if (unlikely(is_single_threaded(wq))) 93 cpu = singlethread_cpu; 94 return per_cpu_ptr(wq->cpu_wq, cpu); 95 } 96 97 /* 98 * Set the workqueue on which a work item is to be run 99 * - Must *only* be called if the pending flag is set 100 */ 101 static inline void set_wq_data(struct work_struct *work, 102 struct cpu_workqueue_struct *cwq) 103 { 104 unsigned long new; 105 106 BUG_ON(!work_pending(work)); 107 108 new = (unsigned long) cwq | (1UL << WORK_STRUCT_PENDING); 109 new |= WORK_STRUCT_FLAG_MASK & *work_data_bits(work); 110 atomic_long_set(&work->data, new); 111 } 112 113 static inline 114 struct cpu_workqueue_struct *get_wq_data(struct work_struct *work) 115 { 116 return (void *) (atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK); 117 } 118 119 static void insert_work(struct cpu_workqueue_struct *cwq, 120 struct work_struct *work, int tail) 121 { 122 set_wq_data(work, cwq); 123 /* 124 * Ensure that we get the right work->data if we see the 125 * result of list_add() below, see try_to_grab_pending(). 126 */ 127 smp_wmb(); 128 if (tail) 129 list_add_tail(&work->entry, &cwq->worklist); 130 else 131 list_add(&work->entry, &cwq->worklist); 132 wake_up(&cwq->more_work); 133 } 134 135 /* Preempt must be disabled. */ 136 static void __queue_work(struct cpu_workqueue_struct *cwq, 137 struct work_struct *work) 138 { 139 unsigned long flags; 140 141 spin_lock_irqsave(&cwq->lock, flags); 142 insert_work(cwq, work, 1); 143 spin_unlock_irqrestore(&cwq->lock, flags); 144 } 145 146 /** 147 * queue_work - queue work on a workqueue 148 * @wq: workqueue to use 149 * @work: work to queue 150 * 151 * Returns 0 if @work was already on a queue, non-zero otherwise. 152 * 153 * We queue the work to the CPU it was submitted, but there is no 154 * guarantee that it will be processed by that CPU. 155 */ 156 int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work) 157 { 158 int ret = 0; 159 160 if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) { 161 BUG_ON(!list_empty(&work->entry)); 162 __queue_work(wq_per_cpu(wq, get_cpu()), work); 163 put_cpu(); 164 ret = 1; 165 } 166 return ret; 167 } 168 EXPORT_SYMBOL_GPL(queue_work); 169 170 void delayed_work_timer_fn(unsigned long __data) 171 { 172 struct delayed_work *dwork = (struct delayed_work *)__data; 173 struct cpu_workqueue_struct *cwq = get_wq_data(&dwork->work); 174 struct workqueue_struct *wq = cwq->wq; 175 176 __queue_work(wq_per_cpu(wq, smp_processor_id()), &dwork->work); 177 } 178 179 /** 180 * queue_delayed_work - queue work on a workqueue after delay 181 * @wq: workqueue to use 182 * @dwork: delayable work to queue 183 * @delay: number of jiffies to wait before queueing 184 * 185 * Returns 0 if @work was already on a queue, non-zero otherwise. 186 */ 187 int fastcall queue_delayed_work(struct workqueue_struct *wq, 188 struct delayed_work *dwork, unsigned long delay) 189 { 190 timer_stats_timer_set_start_info(&dwork->timer); 191 if (delay == 0) 192 return queue_work(wq, &dwork->work); 193 194 return queue_delayed_work_on(-1, wq, dwork, delay); 195 } 196 EXPORT_SYMBOL_GPL(queue_delayed_work); 197 198 /** 199 * queue_delayed_work_on - queue work on specific CPU after delay 200 * @cpu: CPU number to execute work on 201 * @wq: workqueue to use 202 * @dwork: work to queue 203 * @delay: number of jiffies to wait before queueing 204 * 205 * Returns 0 if @work was already on a queue, non-zero otherwise. 206 */ 207 int queue_delayed_work_on(int cpu, struct workqueue_struct *wq, 208 struct delayed_work *dwork, unsigned long delay) 209 { 210 int ret = 0; 211 struct timer_list *timer = &dwork->timer; 212 struct work_struct *work = &dwork->work; 213 214 if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) { 215 BUG_ON(timer_pending(timer)); 216 BUG_ON(!list_empty(&work->entry)); 217 218 /* This stores cwq for the moment, for the timer_fn */ 219 set_wq_data(work, wq_per_cpu(wq, raw_smp_processor_id())); 220 timer->expires = jiffies + delay; 221 timer->data = (unsigned long)dwork; 222 timer->function = delayed_work_timer_fn; 223 224 if (unlikely(cpu >= 0)) 225 add_timer_on(timer, cpu); 226 else 227 add_timer(timer); 228 ret = 1; 229 } 230 return ret; 231 } 232 EXPORT_SYMBOL_GPL(queue_delayed_work_on); 233 234 static void run_workqueue(struct cpu_workqueue_struct *cwq) 235 { 236 spin_lock_irq(&cwq->lock); 237 cwq->run_depth++; 238 if (cwq->run_depth > 3) { 239 /* morton gets to eat his hat */ 240 printk("%s: recursion depth exceeded: %d\n", 241 __FUNCTION__, cwq->run_depth); 242 dump_stack(); 243 } 244 while (!list_empty(&cwq->worklist)) { 245 struct work_struct *work = list_entry(cwq->worklist.next, 246 struct work_struct, entry); 247 work_func_t f = work->func; 248 249 cwq->current_work = work; 250 list_del_init(cwq->worklist.next); 251 spin_unlock_irq(&cwq->lock); 252 253 BUG_ON(get_wq_data(work) != cwq); 254 work_clear_pending(work); 255 f(work); 256 257 if (unlikely(in_atomic() || lockdep_depth(current) > 0)) { 258 printk(KERN_ERR "BUG: workqueue leaked lock or atomic: " 259 "%s/0x%08x/%d\n", 260 current->comm, preempt_count(), 261 current->pid); 262 printk(KERN_ERR " last function: "); 263 print_symbol("%s\n", (unsigned long)f); 264 debug_show_held_locks(current); 265 dump_stack(); 266 } 267 268 spin_lock_irq(&cwq->lock); 269 cwq->current_work = NULL; 270 } 271 cwq->run_depth--; 272 spin_unlock_irq(&cwq->lock); 273 } 274 275 /* 276 * NOTE: the caller must not touch *cwq if this func returns true 277 */ 278 static int cwq_should_stop(struct cpu_workqueue_struct *cwq) 279 { 280 int should_stop = cwq->should_stop; 281 282 if (unlikely(should_stop)) { 283 spin_lock_irq(&cwq->lock); 284 should_stop = cwq->should_stop && list_empty(&cwq->worklist); 285 if (should_stop) 286 cwq->thread = NULL; 287 spin_unlock_irq(&cwq->lock); 288 } 289 290 return should_stop; 291 } 292 293 static int worker_thread(void *__cwq) 294 { 295 struct cpu_workqueue_struct *cwq = __cwq; 296 DEFINE_WAIT(wait); 297 298 if (!cwq->wq->freezeable) 299 current->flags |= PF_NOFREEZE; 300 301 set_user_nice(current, -5); 302 303 for (;;) { 304 prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE); 305 if (!freezing(current) && !cwq->should_stop 306 && list_empty(&cwq->worklist)) 307 schedule(); 308 finish_wait(&cwq->more_work, &wait); 309 310 try_to_freeze(); 311 312 if (cwq_should_stop(cwq)) 313 break; 314 315 run_workqueue(cwq); 316 } 317 318 return 0; 319 } 320 321 struct wq_barrier { 322 struct work_struct work; 323 struct completion done; 324 }; 325 326 static void wq_barrier_func(struct work_struct *work) 327 { 328 struct wq_barrier *barr = container_of(work, struct wq_barrier, work); 329 complete(&barr->done); 330 } 331 332 static void insert_wq_barrier(struct cpu_workqueue_struct *cwq, 333 struct wq_barrier *barr, int tail) 334 { 335 INIT_WORK(&barr->work, wq_barrier_func); 336 __set_bit(WORK_STRUCT_PENDING, work_data_bits(&barr->work)); 337 338 init_completion(&barr->done); 339 340 insert_work(cwq, &barr->work, tail); 341 } 342 343 static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq) 344 { 345 if (cwq->thread == current) { 346 /* 347 * Probably keventd trying to flush its own queue. So simply run 348 * it by hand rather than deadlocking. 349 */ 350 run_workqueue(cwq); 351 } else { 352 struct wq_barrier barr; 353 int active = 0; 354 355 spin_lock_irq(&cwq->lock); 356 if (!list_empty(&cwq->worklist) || cwq->current_work != NULL) { 357 insert_wq_barrier(cwq, &barr, 1); 358 active = 1; 359 } 360 spin_unlock_irq(&cwq->lock); 361 362 if (active) 363 wait_for_completion(&barr.done); 364 } 365 } 366 367 /** 368 * flush_workqueue - ensure that any scheduled work has run to completion. 369 * @wq: workqueue to flush 370 * 371 * Forces execution of the workqueue and blocks until its completion. 372 * This is typically used in driver shutdown handlers. 373 * 374 * We sleep until all works which were queued on entry have been handled, 375 * but we are not livelocked by new incoming ones. 376 * 377 * This function used to run the workqueues itself. Now we just wait for the 378 * helper threads to do it. 379 */ 380 void fastcall flush_workqueue(struct workqueue_struct *wq) 381 { 382 const cpumask_t *cpu_map = wq_cpu_map(wq); 383 int cpu; 384 385 might_sleep(); 386 for_each_cpu_mask(cpu, *cpu_map) 387 flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu)); 388 } 389 EXPORT_SYMBOL_GPL(flush_workqueue); 390 391 /* 392 * Upon a successful return, the caller "owns" WORK_STRUCT_PENDING bit, 393 * so this work can't be re-armed in any way. 394 */ 395 static int try_to_grab_pending(struct work_struct *work) 396 { 397 struct cpu_workqueue_struct *cwq; 398 int ret = 0; 399 400 if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) 401 return 1; 402 403 /* 404 * The queueing is in progress, or it is already queued. Try to 405 * steal it from ->worklist without clearing WORK_STRUCT_PENDING. 406 */ 407 408 cwq = get_wq_data(work); 409 if (!cwq) 410 return ret; 411 412 spin_lock_irq(&cwq->lock); 413 if (!list_empty(&work->entry)) { 414 /* 415 * This work is queued, but perhaps we locked the wrong cwq. 416 * In that case we must see the new value after rmb(), see 417 * insert_work()->wmb(). 418 */ 419 smp_rmb(); 420 if (cwq == get_wq_data(work)) { 421 list_del_init(&work->entry); 422 ret = 1; 423 } 424 } 425 spin_unlock_irq(&cwq->lock); 426 427 return ret; 428 } 429 430 static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq, 431 struct work_struct *work) 432 { 433 struct wq_barrier barr; 434 int running = 0; 435 436 spin_lock_irq(&cwq->lock); 437 if (unlikely(cwq->current_work == work)) { 438 insert_wq_barrier(cwq, &barr, 0); 439 running = 1; 440 } 441 spin_unlock_irq(&cwq->lock); 442 443 if (unlikely(running)) 444 wait_for_completion(&barr.done); 445 } 446 447 static void wait_on_work(struct work_struct *work) 448 { 449 struct cpu_workqueue_struct *cwq; 450 struct workqueue_struct *wq; 451 const cpumask_t *cpu_map; 452 int cpu; 453 454 might_sleep(); 455 456 cwq = get_wq_data(work); 457 if (!cwq) 458 return; 459 460 wq = cwq->wq; 461 cpu_map = wq_cpu_map(wq); 462 463 for_each_cpu_mask(cpu, *cpu_map) 464 wait_on_cpu_work(per_cpu_ptr(wq->cpu_wq, cpu), work); 465 } 466 467 /** 468 * cancel_work_sync - block until a work_struct's callback has terminated 469 * @work: the work which is to be flushed 470 * 471 * cancel_work_sync() will cancel the work if it is queued. If the work's 472 * callback appears to be running, cancel_work_sync() will block until it 473 * has completed. 474 * 475 * It is possible to use this function if the work re-queues itself. It can 476 * cancel the work even if it migrates to another workqueue, however in that 477 * case it only guarantees that work->func() has completed on the last queued 478 * workqueue. 479 * 480 * cancel_work_sync(&delayed_work->work) should be used only if ->timer is not 481 * pending, otherwise it goes into a busy-wait loop until the timer expires. 482 * 483 * The caller must ensure that workqueue_struct on which this work was last 484 * queued can't be destroyed before this function returns. 485 */ 486 void cancel_work_sync(struct work_struct *work) 487 { 488 while (!try_to_grab_pending(work)) 489 cpu_relax(); 490 wait_on_work(work); 491 work_clear_pending(work); 492 } 493 EXPORT_SYMBOL_GPL(cancel_work_sync); 494 495 /** 496 * cancel_rearming_delayed_work - reliably kill off a delayed work. 497 * @dwork: the delayed work struct 498 * 499 * It is possible to use this function if @dwork rearms itself via queue_work() 500 * or queue_delayed_work(). See also the comment for cancel_work_sync(). 501 */ 502 void cancel_rearming_delayed_work(struct delayed_work *dwork) 503 { 504 while (!del_timer(&dwork->timer) && 505 !try_to_grab_pending(&dwork->work)) 506 cpu_relax(); 507 wait_on_work(&dwork->work); 508 work_clear_pending(&dwork->work); 509 } 510 EXPORT_SYMBOL(cancel_rearming_delayed_work); 511 512 static struct workqueue_struct *keventd_wq __read_mostly; 513 514 /** 515 * schedule_work - put work task in global workqueue 516 * @work: job to be done 517 * 518 * This puts a job in the kernel-global workqueue. 519 */ 520 int fastcall schedule_work(struct work_struct *work) 521 { 522 return queue_work(keventd_wq, work); 523 } 524 EXPORT_SYMBOL(schedule_work); 525 526 /** 527 * schedule_delayed_work - put work task in global workqueue after delay 528 * @dwork: job to be done 529 * @delay: number of jiffies to wait or 0 for immediate execution 530 * 531 * After waiting for a given time this puts a job in the kernel-global 532 * workqueue. 533 */ 534 int fastcall schedule_delayed_work(struct delayed_work *dwork, 535 unsigned long delay) 536 { 537 timer_stats_timer_set_start_info(&dwork->timer); 538 return queue_delayed_work(keventd_wq, dwork, delay); 539 } 540 EXPORT_SYMBOL(schedule_delayed_work); 541 542 /** 543 * schedule_delayed_work_on - queue work in global workqueue on CPU after delay 544 * @cpu: cpu to use 545 * @dwork: job to be done 546 * @delay: number of jiffies to wait 547 * 548 * After waiting for a given time this puts a job in the kernel-global 549 * workqueue on the specified CPU. 550 */ 551 int schedule_delayed_work_on(int cpu, 552 struct delayed_work *dwork, unsigned long delay) 553 { 554 return queue_delayed_work_on(cpu, keventd_wq, dwork, delay); 555 } 556 EXPORT_SYMBOL(schedule_delayed_work_on); 557 558 /** 559 * schedule_on_each_cpu - call a function on each online CPU from keventd 560 * @func: the function to call 561 * 562 * Returns zero on success. 563 * Returns -ve errno on failure. 564 * 565 * Appears to be racy against CPU hotplug. 566 * 567 * schedule_on_each_cpu() is very slow. 568 */ 569 int schedule_on_each_cpu(work_func_t func) 570 { 571 int cpu; 572 struct work_struct *works; 573 574 works = alloc_percpu(struct work_struct); 575 if (!works) 576 return -ENOMEM; 577 578 preempt_disable(); /* CPU hotplug */ 579 for_each_online_cpu(cpu) { 580 struct work_struct *work = per_cpu_ptr(works, cpu); 581 582 INIT_WORK(work, func); 583 set_bit(WORK_STRUCT_PENDING, work_data_bits(work)); 584 __queue_work(per_cpu_ptr(keventd_wq->cpu_wq, cpu), work); 585 } 586 preempt_enable(); 587 flush_workqueue(keventd_wq); 588 free_percpu(works); 589 return 0; 590 } 591 592 void flush_scheduled_work(void) 593 { 594 flush_workqueue(keventd_wq); 595 } 596 EXPORT_SYMBOL(flush_scheduled_work); 597 598 /** 599 * execute_in_process_context - reliably execute the routine with user context 600 * @fn: the function to execute 601 * @ew: guaranteed storage for the execute work structure (must 602 * be available when the work executes) 603 * 604 * Executes the function immediately if process context is available, 605 * otherwise schedules the function for delayed execution. 606 * 607 * Returns: 0 - function was executed 608 * 1 - function was scheduled for execution 609 */ 610 int execute_in_process_context(work_func_t fn, struct execute_work *ew) 611 { 612 if (!in_interrupt()) { 613 fn(&ew->work); 614 return 0; 615 } 616 617 INIT_WORK(&ew->work, fn); 618 schedule_work(&ew->work); 619 620 return 1; 621 } 622 EXPORT_SYMBOL_GPL(execute_in_process_context); 623 624 int keventd_up(void) 625 { 626 return keventd_wq != NULL; 627 } 628 629 int current_is_keventd(void) 630 { 631 struct cpu_workqueue_struct *cwq; 632 int cpu = smp_processor_id(); /* preempt-safe: keventd is per-cpu */ 633 int ret = 0; 634 635 BUG_ON(!keventd_wq); 636 637 cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu); 638 if (current == cwq->thread) 639 ret = 1; 640 641 return ret; 642 643 } 644 645 static struct cpu_workqueue_struct * 646 init_cpu_workqueue(struct workqueue_struct *wq, int cpu) 647 { 648 struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu); 649 650 cwq->wq = wq; 651 spin_lock_init(&cwq->lock); 652 INIT_LIST_HEAD(&cwq->worklist); 653 init_waitqueue_head(&cwq->more_work); 654 655 return cwq; 656 } 657 658 static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu) 659 { 660 struct workqueue_struct *wq = cwq->wq; 661 const char *fmt = is_single_threaded(wq) ? "%s" : "%s/%d"; 662 struct task_struct *p; 663 664 p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu); 665 /* 666 * Nobody can add the work_struct to this cwq, 667 * if (caller is __create_workqueue) 668 * nobody should see this wq 669 * else // caller is CPU_UP_PREPARE 670 * cpu is not on cpu_online_map 671 * so we can abort safely. 672 */ 673 if (IS_ERR(p)) 674 return PTR_ERR(p); 675 676 cwq->thread = p; 677 cwq->should_stop = 0; 678 679 return 0; 680 } 681 682 static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu) 683 { 684 struct task_struct *p = cwq->thread; 685 686 if (p != NULL) { 687 if (cpu >= 0) 688 kthread_bind(p, cpu); 689 wake_up_process(p); 690 } 691 } 692 693 struct workqueue_struct *__create_workqueue(const char *name, 694 int singlethread, int freezeable) 695 { 696 struct workqueue_struct *wq; 697 struct cpu_workqueue_struct *cwq; 698 int err = 0, cpu; 699 700 wq = kzalloc(sizeof(*wq), GFP_KERNEL); 701 if (!wq) 702 return NULL; 703 704 wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct); 705 if (!wq->cpu_wq) { 706 kfree(wq); 707 return NULL; 708 } 709 710 wq->name = name; 711 wq->singlethread = singlethread; 712 wq->freezeable = freezeable; 713 INIT_LIST_HEAD(&wq->list); 714 715 if (singlethread) { 716 cwq = init_cpu_workqueue(wq, singlethread_cpu); 717 err = create_workqueue_thread(cwq, singlethread_cpu); 718 start_workqueue_thread(cwq, -1); 719 } else { 720 mutex_lock(&workqueue_mutex); 721 list_add(&wq->list, &workqueues); 722 723 for_each_possible_cpu(cpu) { 724 cwq = init_cpu_workqueue(wq, cpu); 725 if (err || !cpu_online(cpu)) 726 continue; 727 err = create_workqueue_thread(cwq, cpu); 728 start_workqueue_thread(cwq, cpu); 729 } 730 mutex_unlock(&workqueue_mutex); 731 } 732 733 if (err) { 734 destroy_workqueue(wq); 735 wq = NULL; 736 } 737 return wq; 738 } 739 EXPORT_SYMBOL_GPL(__create_workqueue); 740 741 static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu) 742 { 743 struct wq_barrier barr; 744 int alive = 0; 745 746 spin_lock_irq(&cwq->lock); 747 if (cwq->thread != NULL) { 748 insert_wq_barrier(cwq, &barr, 1); 749 cwq->should_stop = 1; 750 alive = 1; 751 } 752 spin_unlock_irq(&cwq->lock); 753 754 if (alive) { 755 wait_for_completion(&barr.done); 756 757 while (unlikely(cwq->thread != NULL)) 758 cpu_relax(); 759 /* 760 * Wait until cwq->thread unlocks cwq->lock, 761 * it won't touch *cwq after that. 762 */ 763 smp_rmb(); 764 spin_unlock_wait(&cwq->lock); 765 } 766 } 767 768 /** 769 * destroy_workqueue - safely terminate a workqueue 770 * @wq: target workqueue 771 * 772 * Safely destroy a workqueue. All work currently pending will be done first. 773 */ 774 void destroy_workqueue(struct workqueue_struct *wq) 775 { 776 const cpumask_t *cpu_map = wq_cpu_map(wq); 777 struct cpu_workqueue_struct *cwq; 778 int cpu; 779 780 mutex_lock(&workqueue_mutex); 781 list_del(&wq->list); 782 mutex_unlock(&workqueue_mutex); 783 784 for_each_cpu_mask(cpu, *cpu_map) { 785 cwq = per_cpu_ptr(wq->cpu_wq, cpu); 786 cleanup_workqueue_thread(cwq, cpu); 787 } 788 789 free_percpu(wq->cpu_wq); 790 kfree(wq); 791 } 792 EXPORT_SYMBOL_GPL(destroy_workqueue); 793 794 static int __devinit workqueue_cpu_callback(struct notifier_block *nfb, 795 unsigned long action, 796 void *hcpu) 797 { 798 unsigned int cpu = (unsigned long)hcpu; 799 struct cpu_workqueue_struct *cwq; 800 struct workqueue_struct *wq; 801 802 action &= ~CPU_TASKS_FROZEN; 803 804 switch (action) { 805 case CPU_LOCK_ACQUIRE: 806 mutex_lock(&workqueue_mutex); 807 return NOTIFY_OK; 808 809 case CPU_LOCK_RELEASE: 810 mutex_unlock(&workqueue_mutex); 811 return NOTIFY_OK; 812 813 case CPU_UP_PREPARE: 814 cpu_set(cpu, cpu_populated_map); 815 } 816 817 list_for_each_entry(wq, &workqueues, list) { 818 cwq = per_cpu_ptr(wq->cpu_wq, cpu); 819 820 switch (action) { 821 case CPU_UP_PREPARE: 822 if (!create_workqueue_thread(cwq, cpu)) 823 break; 824 printk(KERN_ERR "workqueue for %i failed\n", cpu); 825 return NOTIFY_BAD; 826 827 case CPU_ONLINE: 828 start_workqueue_thread(cwq, cpu); 829 break; 830 831 case CPU_UP_CANCELED: 832 start_workqueue_thread(cwq, -1); 833 case CPU_DEAD: 834 cleanup_workqueue_thread(cwq, cpu); 835 break; 836 } 837 } 838 839 return NOTIFY_OK; 840 } 841 842 void __init init_workqueues(void) 843 { 844 cpu_populated_map = cpu_online_map; 845 singlethread_cpu = first_cpu(cpu_possible_map); 846 cpu_singlethread_map = cpumask_of_cpu(singlethread_cpu); 847 hotcpu_notifier(workqueue_cpu_callback, 0); 848 keventd_wq = create_workqueue("events"); 849 BUG_ON(!keventd_wq); 850 } 851