/*
 * linux/kernel/workqueue.c
 *
 * Generic mechanism for defining kernel helper threads for running
 * arbitrary tasks in process context.
 *
 * Started by Ingo Molnar, Copyright (C) 2002
 *
 * Derived from the taskqueue/keventd code by:
 *
 *   David Woodhouse <dwmw2@infradead.org>
 *   Andrew Morton <andrewm@uow.edu.au>
 *   Kai Petzke <wpp@marie.physik.tu-berlin.de>
 *   Theodore Ts'o <tytso@mit.edu>
 *
 * Made to use alloc_percpu by Christoph Lameter <clameter@sgi.com>.
 */

#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/init.h>
#include <linux/signal.h>
#include <linux/completion.h>
#include <linux/workqueue.h>
#include <linux/slab.h>
#include <linux/cpu.h>
#include <linux/notifier.h>
#include <linux/kthread.h>

/*
 * The per-CPU workqueue (if single thread, we always use the first
 * possible cpu).
 *
 * The sequence counters are for flush_scheduled_work().  It wants to wait
 * until all currently-scheduled works are completed, but it doesn't
 * want to be livelocked by new, incoming ones.  So it waits until
 * remove_sequence is >= the insert_sequence which pertained when
 * flush_scheduled_work() was called.
 */
struct cpu_workqueue_struct {

	spinlock_t lock;

	long remove_sequence;	/* Least-recently added (next to run) */
	long insert_sequence;	/* Next to add */

	struct list_head worklist;
	wait_queue_head_t more_work;
	wait_queue_head_t work_done;

	struct workqueue_struct *wq;
	task_t *thread;

	int run_depth;		/* Detect run_workqueue() recursion depth */
} ____cacheline_aligned;

/*
 * The externally visible workqueue abstraction is an array of
 * per-CPU workqueues:
 */
struct workqueue_struct {
	struct cpu_workqueue_struct *cpu_wq;
	const char *name;
	struct list_head list;	/* Empty if single thread */
};

/* All the per-cpu workqueues on the system, so that the CPU hotplug code
   can add/remove threads to each one as CPUs come and go. */
static DEFINE_SPINLOCK(workqueue_lock);
static LIST_HEAD(workqueues);

static int singlethread_cpu;

/* If it's single threaded, it isn't in the list of workqueues. */
static inline int is_single_threaded(struct workqueue_struct *wq)
{
	return list_empty(&wq->list);
}

/* Preempt must be disabled. */
static void __queue_work(struct cpu_workqueue_struct *cwq,
			 struct work_struct *work)
{
	unsigned long flags;

	spin_lock_irqsave(&cwq->lock, flags);
	work->wq_data = cwq;
	list_add_tail(&work->entry, &cwq->worklist);
	cwq->insert_sequence++;
	wake_up(&cwq->more_work);
	spin_unlock_irqrestore(&cwq->lock, flags);
}

/*
 * Queue work on a workqueue. Return non-zero if it was successfully
 * added.
 *
 * We queue the work to the CPU on which it was submitted, but there is
 * no guarantee that it will be processed by that CPU.
 */
int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
	int ret = 0, cpu = get_cpu();

	if (!test_and_set_bit(0, &work->pending)) {
		if (unlikely(is_single_threaded(wq)))
			cpu = singlethread_cpu;
		BUG_ON(!list_empty(&work->entry));
		__queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
		ret = 1;
	}
	put_cpu();
	return ret;
}

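/*
 * Illustrative sketch, not part of this file: with the three-argument
 * DECLARE_WORK()/INIT_WORK() of this era, a typical caller looks roughly
 * like the code below.  The names my_func, my_work and my_wq are
 * hypothetical.
 *
 *	static void my_func(void *data)
 *	{
 *		printk("my_work ran, data=%p\n", data);
 *	}
 *	static DECLARE_WORK(my_work, my_func, NULL);
 *
 *	ret = queue_work(my_wq, &my_work);
 *
 * The return value is 0 if the work was already pending (its pending bit
 * was set) and non-zero if it was newly queued.
 */
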
static void delayed_work_timer_fn(unsigned long __data)
{
	struct work_struct *work = (struct work_struct *)__data;
	struct workqueue_struct *wq = work->wq_data;
	int cpu = smp_processor_id();

	if (unlikely(is_single_threaded(wq)))
		cpu = singlethread_cpu;

	__queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
}

int fastcall queue_delayed_work(struct workqueue_struct *wq,
			struct work_struct *work, unsigned long delay)
{
	int ret = 0;
	struct timer_list *timer = &work->timer;

	if (!test_and_set_bit(0, &work->pending)) {
		BUG_ON(timer_pending(timer));
		BUG_ON(!list_empty(&work->entry));

		/* This stores wq for the moment, for the timer_fn */
		work->wq_data = wq;
		timer->expires = jiffies + delay;
		timer->data = (unsigned long)work;
		timer->function = delayed_work_timer_fn;
		add_timer(timer);
		ret = 1;
	}
	return ret;
}

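/*
 * Illustrative sketch, not part of this file: queueing the hypothetical
 * my_work from above with a delay.  The delay is given in jiffies, so HZ
 * or msecs_to_jiffies() is the usual way to express it.
 *
 *	queue_delayed_work(my_wq, &my_work, HZ);
 *	queue_delayed_work(my_wq, &my_work, msecs_to_jiffies(250));
 */
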
static inline void run_workqueue(struct cpu_workqueue_struct *cwq)
{
	unsigned long flags;

	/*
	 * Keep taking work off the queue until it is empty.
	 */
	spin_lock_irqsave(&cwq->lock, flags);
	cwq->run_depth++;
	if (cwq->run_depth > 3) {
		/* morton gets to eat his hat */
		printk("%s: recursion depth exceeded: %d\n",
			__FUNCTION__, cwq->run_depth);
		dump_stack();
	}
	while (!list_empty(&cwq->worklist)) {
		struct work_struct *work = list_entry(cwq->worklist.next,
						struct work_struct, entry);
		void (*f) (void *) = work->func;
		void *data = work->data;

		list_del_init(cwq->worklist.next);
		spin_unlock_irqrestore(&cwq->lock, flags);

		BUG_ON(work->wq_data != cwq);
		clear_bit(0, &work->pending);
		f(data);

		spin_lock_irqsave(&cwq->lock, flags);
		cwq->remove_sequence++;
		wake_up(&cwq->work_done);
	}
	cwq->run_depth--;
	spin_unlock_irqrestore(&cwq->lock, flags);
}

static int worker_thread(void *__cwq)
{
	struct cpu_workqueue_struct *cwq = __cwq;
	DECLARE_WAITQUEUE(wait, current);
	struct k_sigaction sa;
	sigset_t blocked;

	current->flags |= PF_NOFREEZE;

	set_user_nice(current, -5);

	/* Block and flush all signals */
	sigfillset(&blocked);
	sigprocmask(SIG_BLOCK, &blocked, NULL);
	flush_signals(current);

	/* SIG_IGN makes children autoreap: see do_notify_parent(). */
	sa.sa.sa_handler = SIG_IGN;
	sa.sa.sa_flags = 0;
	siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
	do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);

	set_current_state(TASK_INTERRUPTIBLE);
	while (!kthread_should_stop()) {
		add_wait_queue(&cwq->more_work, &wait);
		if (list_empty(&cwq->worklist))
			schedule();
		else
			__set_current_state(TASK_RUNNING);
		remove_wait_queue(&cwq->more_work, &wait);

		if (!list_empty(&cwq->worklist))
			run_workqueue(cwq);
		set_current_state(TASK_INTERRUPTIBLE);
	}
	__set_current_state(TASK_RUNNING);
	return 0;
}

static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
{
	if (cwq->thread == current) {
		/*
		 * Probably keventd trying to flush its own queue. So simply run
		 * it by hand rather than deadlocking.
		 */
		run_workqueue(cwq);
	} else {
		DEFINE_WAIT(wait);
		long sequence_needed;

		spin_lock_irq(&cwq->lock);
		sequence_needed = cwq->insert_sequence;

		while (sequence_needed - cwq->remove_sequence > 0) {
			prepare_to_wait(&cwq->work_done, &wait,
					TASK_UNINTERRUPTIBLE);
			spin_unlock_irq(&cwq->lock);
			schedule();
			spin_lock_irq(&cwq->lock);
		}
		finish_wait(&cwq->work_done, &wait);
		spin_unlock_irq(&cwq->lock);
	}
}

/**
 * flush_workqueue - ensure that any scheduled work has run to completion.
 * @wq: workqueue to flush
 *
 * Forces execution of the workqueue and blocks until its completion.
 * This is typically used in driver shutdown handlers.
 *
 * This function will sample each CPU workqueue's current insert_sequence
 * number and will sleep until the corresponding remove_sequence has caught
 * up with it.  This means that we sleep until all works which were queued
 * on entry have been handled, but we are not livelocked by new incoming
 * ones.
 *
 * This function used to run the workqueues itself.  Now we just wait for the
 * helper threads to do it.
 */
void fastcall flush_workqueue(struct workqueue_struct *wq)
{
	might_sleep();

	if (is_single_threaded(wq)) {
		/* Always use first cpu's area. */
		flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));
	} else {
		int cpu;

		lock_cpu_hotplug();
		for_each_online_cpu(cpu)
			flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
		unlock_cpu_hotplug();
	}
}

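/*
 * Illustrative sketch, not part of this file: a driver's remove/shutdown
 * path usually stops queueing, then flushes before freeing anything the
 * handlers might touch.  my_work and my_wq are hypothetical.
 *
 *	cancel_delayed_work(&my_work);
 *	flush_workqueue(my_wq);
 *	destroy_workqueue(my_wq);
 */
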
static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,
						   int cpu)
{
	struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
	struct task_struct *p;

	spin_lock_init(&cwq->lock);
	cwq->wq = wq;
	cwq->thread = NULL;
	cwq->insert_sequence = 0;
	cwq->remove_sequence = 0;
	INIT_LIST_HEAD(&cwq->worklist);
	init_waitqueue_head(&cwq->more_work);
	init_waitqueue_head(&cwq->work_done);

	if (is_single_threaded(wq))
		p = kthread_create(worker_thread, cwq, "%s", wq->name);
	else
		p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
	if (IS_ERR(p))
		return NULL;
	cwq->thread = p;
	return p;
}

struct workqueue_struct *__create_workqueue(const char *name,
					    int singlethread)
{
	int cpu, destroy = 0;
	struct workqueue_struct *wq;
	struct task_struct *p;

	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
	if (!wq)
		return NULL;

	wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
	if (!wq->cpu_wq) {
		kfree(wq);
		return NULL;
	}

	wq->name = name;
	/* We don't need the distraction of CPUs appearing and vanishing. */
	lock_cpu_hotplug();
	if (singlethread) {
		INIT_LIST_HEAD(&wq->list);
		p = create_workqueue_thread(wq, singlethread_cpu);
		if (!p)
			destroy = 1;
		else
			wake_up_process(p);
	} else {
		spin_lock(&workqueue_lock);
		list_add(&wq->list, &workqueues);
		spin_unlock(&workqueue_lock);
		for_each_online_cpu(cpu) {
			p = create_workqueue_thread(wq, cpu);
			if (p) {
				kthread_bind(p, cpu);
				wake_up_process(p);
			} else
				destroy = 1;
		}
	}
	unlock_cpu_hotplug();

	/*
	 * Was there any error during startup? If yes then clean up:
	 */
	if (destroy) {
		destroy_workqueue(wq);
		wq = NULL;
	}
	return wq;
}

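/*
 * Illustrative sketch, not part of this file: callers normally reach
 * __create_workqueue() through the create_workqueue() and
 * create_singlethread_workqueue() wrappers in <linux/workqueue.h>.
 * my_wq is hypothetical.
 *
 *	struct workqueue_struct *my_wq;
 *
 *	my_wq = create_singlethread_workqueue("mydrv");
 *	if (!my_wq)
 *		return -ENOMEM;
 */
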
static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu)
{
	struct cpu_workqueue_struct *cwq;
	unsigned long flags;
	struct task_struct *p;

	cwq = per_cpu_ptr(wq->cpu_wq, cpu);
	spin_lock_irqsave(&cwq->lock, flags);
	p = cwq->thread;
	cwq->thread = NULL;
	spin_unlock_irqrestore(&cwq->lock, flags);
	if (p)
		kthread_stop(p);
}

void destroy_workqueue(struct workqueue_struct *wq)
{
	int cpu;

	flush_workqueue(wq);

	/* We don't need the distraction of CPUs appearing and vanishing. */
	lock_cpu_hotplug();
	if (is_single_threaded(wq))
		cleanup_workqueue_thread(wq, singlethread_cpu);
	else {
		for_each_online_cpu(cpu)
			cleanup_workqueue_thread(wq, cpu);
		spin_lock(&workqueue_lock);
		list_del(&wq->list);
		spin_unlock(&workqueue_lock);
	}
	unlock_cpu_hotplug();
	free_percpu(wq->cpu_wq);
	kfree(wq);
}

static struct workqueue_struct *keventd_wq;

int fastcall schedule_work(struct work_struct *work)
{
	return queue_work(keventd_wq, work);
}

int fastcall schedule_delayed_work(struct work_struct *work, unsigned long delay)
{
	return queue_delayed_work(keventd_wq, work, delay);
}

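/*
 * Illustrative sketch, not part of this file: code that does not need a
 * private workqueue can push its work onto the shared keventd queue.
 * my_work is hypothetical.
 *
 *	schedule_work(&my_work);
 *	schedule_delayed_work(&my_work, 5 * HZ);
 */
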
int schedule_delayed_work_on(int cpu,
			struct work_struct *work, unsigned long delay)
{
	int ret = 0;
	struct timer_list *timer = &work->timer;

	if (!test_and_set_bit(0, &work->pending)) {
		BUG_ON(timer_pending(timer));
		BUG_ON(!list_empty(&work->entry));
		/* This stores keventd_wq for the moment, for the timer_fn */
		work->wq_data = keventd_wq;
		timer->expires = jiffies + delay;
		timer->data = (unsigned long)work;
		timer->function = delayed_work_timer_fn;
		add_timer_on(timer, cpu);
		ret = 1;
	}
	return ret;
}

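/*
 * Illustrative sketch, not part of this file: arming the delayed timer on
 * a particular CPU, here CPU 0, so the work is queued to that CPU's
 * keventd thread when the timer expires.  my_work is hypothetical.
 *
 *	schedule_delayed_work_on(0, &my_work, HZ);
 */
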
int schedule_on_each_cpu(void (*func) (void *info), void *info)
{
	int cpu;
	struct work_struct *work;

	work = kmalloc(NR_CPUS * sizeof(struct work_struct), GFP_KERNEL);

	if (!work)
		return -ENOMEM;
	for_each_online_cpu(cpu) {
		INIT_WORK(work + cpu, func, info);
		__queue_work(per_cpu_ptr(keventd_wq->cpu_wq, cpu),
				work + cpu);
	}
	flush_workqueue(keventd_wq);
	kfree(work);
	return 0;
}

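/*
 * Illustrative sketch, not part of this file: running a function once on
 * every online CPU and waiting for all of them to finish.  print_cpu is
 * hypothetical.
 *
 *	static void print_cpu(void *unused)
 *	{
 *		printk("ran on cpu %d\n", smp_processor_id());
 *	}
 *
 *	schedule_on_each_cpu(print_cpu, NULL);
 */
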
void flush_scheduled_work(void)
{
	flush_workqueue(keventd_wq);
}

/**
 * cancel_rearming_delayed_workqueue - reliably kill off a delayed
 *			work whose handler rearms the delayed work.
 * @wq:   the controlling workqueue structure
 * @work: the delayed work struct
 */
void cancel_rearming_delayed_workqueue(struct workqueue_struct *wq,
				       struct work_struct *work)
{
	while (!cancel_delayed_work(work))
		flush_workqueue(wq);
}
EXPORT_SYMBOL(cancel_rearming_delayed_workqueue);

/**
 * cancel_rearming_delayed_work - reliably kill off a delayed keventd
 *			work whose handler rearms the delayed work.
 * @work: the delayed work struct
 */
void cancel_rearming_delayed_work(struct work_struct *work)
{
	cancel_rearming_delayed_workqueue(keventd_wq, work);
}
EXPORT_SYMBOL(cancel_rearming_delayed_work);

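/*
 * Illustrative sketch, not part of this file: a self-rearming delayed
 * work item and its teardown.  my_poll and my_poll_work are hypothetical.
 *
 *	static void my_poll(void *data);
 *	static DECLARE_WORK(my_poll_work, my_poll, NULL);
 *
 *	static void my_poll(void *data)
 *	{
 *		check_the_hardware(data);	(hypothetical helper)
 *		schedule_delayed_work(&my_poll_work, HZ);
 *	}
 *
 * On teardown, a bare cancel_delayed_work() can race with a handler that
 * is already running and about to rearm itself, so use:
 *
 *	cancel_rearming_delayed_work(&my_poll_work);
 */
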
int keventd_up(void)
{
	return keventd_wq != NULL;
}

int current_is_keventd(void)
{
	struct cpu_workqueue_struct *cwq;
	int cpu = smp_processor_id();	/* preempt-safe: keventd is per-cpu */
	int ret = 0;

	BUG_ON(!keventd_wq);

	cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu);
	if (current == cwq->thread)
		ret = 1;

	return ret;
}

#ifdef CONFIG_HOTPLUG_CPU
/* Take the work from this (downed) CPU. */
static void take_over_work(struct workqueue_struct *wq, unsigned int cpu)
{
	struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
	LIST_HEAD(list);
	struct work_struct *work;

	spin_lock_irq(&cwq->lock);
	list_splice_init(&cwq->worklist, &list);

	while (!list_empty(&list)) {
		printk("Taking work for %s\n", wq->name);
		work = list_entry(list.next, struct work_struct, entry);
		list_del(&work->entry);
		__queue_work(per_cpu_ptr(wq->cpu_wq, smp_processor_id()), work);
	}
	spin_unlock_irq(&cwq->lock);
}

/* We're holding the cpucontrol mutex here */
static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
				  unsigned long action,
				  void *hcpu)
{
	unsigned int hotcpu = (unsigned long)hcpu;
	struct workqueue_struct *wq;

	switch (action) {
	case CPU_UP_PREPARE:
		/* Create a new workqueue thread for it. */
		list_for_each_entry(wq, &workqueues, list) {
			if (!create_workqueue_thread(wq, hotcpu)) {
				printk("workqueue for %i failed\n", hotcpu);
				return NOTIFY_BAD;
			}
		}
		break;

	case CPU_ONLINE:
		/* Kick off worker threads. */
		list_for_each_entry(wq, &workqueues, list) {
			struct cpu_workqueue_struct *cwq;

			cwq = per_cpu_ptr(wq->cpu_wq, hotcpu);
			kthread_bind(cwq->thread, hotcpu);
			wake_up_process(cwq->thread);
		}
		break;

	case CPU_UP_CANCELED:
		list_for_each_entry(wq, &workqueues, list) {
			/* Unbind so it can run. */
			kthread_bind(per_cpu_ptr(wq->cpu_wq, hotcpu)->thread,
				     any_online_cpu(cpu_online_map));
			cleanup_workqueue_thread(wq, hotcpu);
		}
		break;

	case CPU_DEAD:
		list_for_each_entry(wq, &workqueues, list)
			cleanup_workqueue_thread(wq, hotcpu);
		list_for_each_entry(wq, &workqueues, list)
			take_over_work(wq, hotcpu);
		break;
	}

	return NOTIFY_OK;
}
#endif

void init_workqueues(void)
{
	singlethread_cpu = first_cpu(cpu_possible_map);
	hotcpu_notifier(workqueue_cpu_callback, 0);
	keventd_wq = create_workqueue("events");
	BUG_ON(!keventd_wq);
}

EXPORT_SYMBOL_GPL(__create_workqueue);
EXPORT_SYMBOL_GPL(queue_work);
EXPORT_SYMBOL_GPL(queue_delayed_work);
EXPORT_SYMBOL_GPL(flush_workqueue);
EXPORT_SYMBOL_GPL(destroy_workqueue);

EXPORT_SYMBOL(schedule_work);
EXPORT_SYMBOL(schedule_delayed_work);
EXPORT_SYMBOL(schedule_delayed_work_on);
EXPORT_SYMBOL(flush_scheduled_work);
589