xref: /openbmc/linux/fs/btrfs/async-thread.c (revision 82ced6fd)
/*
 * Copyright (C) 2007 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License v2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/kthread.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/freezer.h>
#include "async-thread.h"

#define WORK_QUEUED_BIT 0
#define WORK_DONE_BIT 1
#define WORK_ORDER_DONE_BIT 2
#define WORK_HIGH_PRIO_BIT 3

/*
 * container for the kthread task pointer and the list of pending work.
 * One of these is allocated per thread.
 */
struct btrfs_worker_thread {
	/* pool we belong to */
	struct btrfs_workers *workers;

	/* list of struct btrfs_work that are waiting for service */
	struct list_head pending;
	struct list_head prio_pending;

	/* list of worker threads from struct btrfs_workers */
	struct list_head worker_list;

	/* kthread */
	struct task_struct *task;

	/* number of things on the pending list */
	atomic_t num_pending;

	unsigned long sequence;

	/* protects the pending list. */
	spinlock_t lock;

	/* set to non-zero when this thread is already awake and kicking */
	int working;

	/* are we currently idle */
	int idle;
};

/*
 * helper function to move a thread onto the idle list after it
 * has finished some requests.
 */
static void check_idle_worker(struct btrfs_worker_thread *worker)
{
	if (!worker->idle && atomic_read(&worker->num_pending) <
	    worker->workers->idle_thresh / 2) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 1;
		list_move(&worker->worker_list, &worker->workers->idle_list);
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}

/*
 * helper function to move a thread off the idle list after new
 * pending work is added.
 */
static void check_busy_worker(struct btrfs_worker_thread *worker)
{
	if (worker->idle && atomic_read(&worker->num_pending) >=
	    worker->workers->idle_thresh) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 0;
		list_move_tail(&worker->worker_list,
			       &worker->workers->worker_list);
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}
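
/*
 * Note that the two helpers above deliberately use different thresholds:
 * a worker only moves back to the idle list once its backlog drops below
 * idle_thresh / 2, but it is moved to the busy list as soon as the
 * backlog reaches idle_thresh.  With the default idle_thresh of 32 set
 * in btrfs_init_workers(), a worker turns busy at 32 pending items and
 * is not counted as idle again until it has drained below 16, which
 * keeps it from ping-ponging between the two lists.
 */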

static noinline int run_ordered_completions(struct btrfs_workers *workers,
					    struct btrfs_work *work)
{
	unsigned long flags;

	if (!workers->ordered)
		return 0;

	set_bit(WORK_DONE_BIT, &work->flags);

	spin_lock_irqsave(&workers->lock, flags);

	while (1) {
		if (!list_empty(&workers->prio_order_list)) {
			work = list_entry(workers->prio_order_list.next,
					  struct btrfs_work, order_list);
		} else if (!list_empty(&workers->order_list)) {
			work = list_entry(workers->order_list.next,
					  struct btrfs_work, order_list);
		} else {
			break;
		}
		if (!test_bit(WORK_DONE_BIT, &work->flags))
			break;

		/* we are going to call the ordered done function, but
		 * we leave the work item on the list as a barrier so
		 * that later work items that are done don't have their
		 * functions called before this one returns
		 */
		if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
			break;

		spin_unlock_irqrestore(&workers->lock, flags);

		work->ordered_func(work);

		/* now take the lock again and call the freeing code */
		spin_lock_irqsave(&workers->lock, flags);
		list_del(&work->order_list);
		work->ordered_free(work);
	}

	spin_unlock_irqrestore(&workers->lock, flags);
	return 0;
}
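
/*
 * To summarize the ordered path: for an ordered pool the per-item func()
 * calls may finish in any order, but ordered_func() is always invoked in
 * the order the items were queued (with prio_order_list drained ahead of
 * order_list), and ordered_free() is the last callback an item ever
 * sees.  As an illustration, if items A, B and C are queued in that
 * order and B's func() finishes first, B's ordered_func() still waits
 * until A has completed and run its own ordered_func().
 */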

/*
 * main loop for servicing work items
 */
static int worker_loop(void *arg)
{
	struct btrfs_worker_thread *worker = arg;
	struct list_head *cur;
	struct btrfs_work *work;
	do {
		spin_lock_irq(&worker->lock);
again_locked:
		while (1) {
			if (!list_empty(&worker->prio_pending))
				cur = worker->prio_pending.next;
			else if (!list_empty(&worker->pending))
				cur = worker->pending.next;
			else
				break;

			work = list_entry(cur, struct btrfs_work, list);
			list_del(&work->list);
			clear_bit(WORK_QUEUED_BIT, &work->flags);

			work->worker = worker;
			spin_unlock_irq(&worker->lock);

			work->func(work);

			atomic_dec(&worker->num_pending);
			/*
			 * unless this is an ordered work queue,
			 * 'work' was probably freed by func above.
			 */
			run_ordered_completions(worker->workers, work);

			spin_lock_irq(&worker->lock);
			check_idle_worker(worker);
		}
		if (freezing(current)) {
			worker->working = 0;
			spin_unlock_irq(&worker->lock);
			refrigerator();
		} else {
			spin_unlock_irq(&worker->lock);
			if (!kthread_should_stop()) {
				cpu_relax();
				/*
				 * we've dropped the lock, did someone else
				 * jump in?
				 */
				smp_mb();
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
					continue;

				/*
				 * this short schedule allows more work to
				 * come in without the queue functions
				 * needing to go through wake_up_process()
				 *
				 * worker->working is still 1, so nobody
				 * is going to try and wake us up
				 */
				schedule_timeout(1);
				smp_mb();
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
					continue;

				if (kthread_should_stop())
					break;

				/* still no more work? sleep for real */
				spin_lock_irq(&worker->lock);
				set_current_state(TASK_INTERRUPTIBLE);
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
					goto again_locked;

				/*
				 * this makes sure we get a wakeup when someone
				 * adds something new to the queue
				 */
				worker->working = 0;
				spin_unlock_irq(&worker->lock);

				if (!kthread_should_stop())
					schedule();
			}
			__set_current_state(TASK_RUNNING);
		}
	} while (!kthread_should_stop());
	return 0;
}
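
/*
 * Note on the sleep/wakeup handshake above: btrfs_queue_worker() and
 * btrfs_requeue_work() only call wake_up_process() when they flip
 * worker->working from 0 to 1, and the loop above only clears
 * worker->working once it has decided to sleep for real.  The
 * schedule_timeout(1) in between is a short grace period that lets new
 * work trickle in while worker->working is still 1, i.e. without the
 * submitters paying for a wakeup.
 */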

/*
 * this will wait for all the worker threads to shut down
 */
int btrfs_stop_workers(struct btrfs_workers *workers)
{
	struct list_head *cur;
	struct btrfs_worker_thread *worker;

	list_splice_init(&workers->idle_list, &workers->worker_list);
	while (!list_empty(&workers->worker_list)) {
		cur = workers->worker_list.next;
		worker = list_entry(cur, struct btrfs_worker_thread,
				    worker_list);
		kthread_stop(worker->task);
		list_del(&worker->worker_list);
		kfree(worker);
	}
	return 0;
}

/*
 * simple init on struct btrfs_workers
 */
void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max)
{
	workers->num_workers = 0;
	INIT_LIST_HEAD(&workers->worker_list);
	INIT_LIST_HEAD(&workers->idle_list);
	INIT_LIST_HEAD(&workers->order_list);
	INIT_LIST_HEAD(&workers->prio_order_list);
	spin_lock_init(&workers->lock);
	workers->max_workers = max;
	workers->idle_thresh = 32;
	workers->name = name;
	workers->ordered = 0;
}

/*
 * starts new worker threads.  This does not enforce the max worker
 * count in case you need to temporarily go past it.
 */
int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
{
	struct btrfs_worker_thread *worker;
	int ret = 0;
	int i;

	for (i = 0; i < num_workers; i++) {
		worker = kzalloc(sizeof(*worker), GFP_NOFS);
		if (!worker) {
			ret = -ENOMEM;
			goto fail;
		}

		INIT_LIST_HEAD(&worker->pending);
		INIT_LIST_HEAD(&worker->prio_pending);
		INIT_LIST_HEAD(&worker->worker_list);
		spin_lock_init(&worker->lock);
		atomic_set(&worker->num_pending, 0);
		worker->task = kthread_run(worker_loop, worker,
					   "btrfs-%s-%d", workers->name,
					   workers->num_workers + i);
		worker->workers = workers;
		if (IS_ERR(worker->task)) {
			/* grab the error before freeing the worker */
			ret = PTR_ERR(worker->task);
			kfree(worker);
			goto fail;
		}

		spin_lock_irq(&workers->lock);
		list_add_tail(&worker->worker_list, &workers->idle_list);
		worker->idle = 1;
		workers->num_workers++;
		spin_unlock_irq(&workers->lock);
	}
	return 0;
fail:
	btrfs_stop_workers(workers);
	return ret;
}
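
/*
 * Typical pool setup, as a rough sketch only (the real callers, e.g.
 * open_ctree() in disk-io.c, use the pools embedded in struct
 * btrfs_fs_info; the numbers below are made up for illustration):
 *
 *	struct btrfs_workers workers;
 *	int ret;
 *
 *	btrfs_init_workers(&workers, "worker", 8);
 *	workers.idle_thresh = 4;	(optional tuning after init)
 *	ret = btrfs_start_workers(&workers, 1);
 *	if (ret)
 *		return ret;
 *	...queue work, see btrfs_queue_worker() below...
 *	btrfs_stop_workers(&workers);
 */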

/*
 * run through the list and find a worker thread that doesn't have a lot
 * to do right now.  This can return null if we aren't yet at the thread
 * count limit and all of the threads are busy.
 */
static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	struct list_head *next;
	int enforce_min = workers->num_workers < workers->max_workers;

	/*
	 * if we find an idle thread, don't move it to the end of the
	 * idle list.  This improves the chance that the next submission
	 * will reuse the same thread, and maybe catch it while it is still
	 * working
	 */
	if (!list_empty(&workers->idle_list)) {
		next = workers->idle_list.next;
		worker = list_entry(next, struct btrfs_worker_thread,
				    worker_list);
		return worker;
	}
	if (enforce_min || list_empty(&workers->worker_list))
		return NULL;

	/*
	 * if we pick a busy task, move the task to the end of the list.
	 * hopefully this will keep things somewhat evenly balanced.
	 * Do the move in batches based on the sequence number.  This groups
	 * requests submitted at roughly the same time onto the same worker.
	 */
	next = workers->worker_list.next;
	worker = list_entry(next, struct btrfs_worker_thread, worker_list);
	atomic_inc(&worker->num_pending);
	worker->sequence++;

	if (worker->sequence % workers->idle_thresh == 0)
		list_move_tail(next, &workers->worker_list);
	return worker;
}
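
/*
 * The sequence-based batching above means that, with the default
 * idle_thresh of 32, the busy worker at the head of worker_list soaks
 * up to 32 consecutive submissions from this path before it is rotated
 * to the tail and the next busy worker starts receiving work.
 */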

/*
 * selects a worker thread to take the next job.  This will either find
 * an idle worker, start a new worker up to the max count, or just return
 * one of the existing busy workers.
 */
static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;

again:
	spin_lock_irqsave(&workers->lock, flags);
	worker = next_worker(workers);
	spin_unlock_irqrestore(&workers->lock, flags);

	if (!worker) {
		spin_lock_irqsave(&workers->lock, flags);
		if (workers->num_workers >= workers->max_workers) {
			struct list_head *fallback = NULL;
			/*
			 * we have failed to find any free workers,
			 * just fall back to one that already exists
			 */
			if (!list_empty(&workers->worker_list))
				fallback = workers->worker_list.next;
			if (!list_empty(&workers->idle_list))
				fallback = workers->idle_list.next;
			BUG_ON(!fallback);
			worker = list_entry(fallback,
				  struct btrfs_worker_thread, worker_list);
			spin_unlock_irqrestore(&workers->lock, flags);
		} else {
			spin_unlock_irqrestore(&workers->lock, flags);
			/* we're below the limit, start another worker */
			btrfs_start_workers(workers, 1);
			goto again;
		}
	}
	return worker;
}

/*
 * btrfs_requeue_work just puts the work item back on the tail of the list
 * it was taken from.  It is intended for use with long-running work functions
 * that make some progress and want to give the CPU up for others.
 */
int btrfs_requeue_work(struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker = work->worker;
	unsigned long flags;
	int wake = 0;

	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
		goto out;

	spin_lock_irqsave(&worker->lock, flags);
	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
	atomic_inc(&worker->num_pending);

	/* by definition we're busy, take ourselves off the idle
	 * list
	 */
	if (worker->idle) {
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 0;
		list_move_tail(&worker->worker_list,
			       &worker->workers->worker_list);
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
	if (!worker->working) {
		wake = 1;
		worker->working = 1;
	}

	spin_unlock_irqrestore(&worker->lock, flags);
	if (wake)
		wake_up_process(worker->task);
out:

	return 0;
}
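
/*
 * Illustrative pattern (a sketch, not an actual caller in this file): a
 * long-running work->func can make partial progress, record where it
 * left off in its own context, call btrfs_requeue_work(work) and
 * return.  The item goes back on the tail of the same worker's list
 * (prio_pending if WORK_HIGH_PRIO_BIT is set), so anything that was
 * queued ahead of it gets serviced before the next slice runs.
 */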

void btrfs_set_work_high_prio(struct btrfs_work *work)
{
	set_bit(WORK_HIGH_PRIO_BIT, &work->flags);
}

/*
 * places a struct btrfs_work into the pending queue of one of the kthreads
 */
int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
	int wake = 0;

	/* don't requeue something already on a list */
	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
		goto out;

	worker = find_worker(workers);
	if (workers->ordered) {
		spin_lock_irqsave(&workers->lock, flags);
		if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) {
			list_add_tail(&work->order_list,
				      &workers->prio_order_list);
		} else {
			list_add_tail(&work->order_list, &workers->order_list);
		}
		spin_unlock_irqrestore(&workers->lock, flags);
	} else {
		INIT_LIST_HEAD(&work->order_list);
	}

	spin_lock_irqsave(&worker->lock, flags);

	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
	atomic_inc(&worker->num_pending);
	check_busy_worker(worker);

	/*
	 * avoid calling into wake_up_process if this thread has already
	 * been kicked
	 */
	if (!worker->working)
		wake = 1;
	worker->working = 1;

	spin_unlock_irqrestore(&worker->lock, flags);

	if (wake)
		wake_up_process(worker->task);
out:
	return 0;
}
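
/*
 * Illustrative usage sketch (my_async_ctx and my_func are made-up names;
 * real users such as the bio submission and endio paths embed a struct
 * btrfs_work in their own context and recover it with container_of()
 * inside func):
 *
 *	struct my_async_ctx {
 *		struct btrfs_work work;
 *		...caller-specific state...
 *	};
 *
 *	static void my_func(struct btrfs_work *work)
 *	{
 *		struct my_async_ctx *ctx =
 *			container_of(work, struct my_async_ctx, work);
 *		...do the deferred processing, then free ctx unless the
 *		   pool is ordered and ordered_free handles it...
 *	}
 *
 *	ctx->work.func = my_func;
 *	ctx->work.flags = 0;
 *	ctx->work.ordered_func and ctx->work.ordered_free only need to be
 *	set when the pool was initialized with workers->ordered = 1;
 *	btrfs_queue_worker(&fs_info->workers, &ctx->work);
 */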