1 /*
2  * linux/net/sunrpc/sched.c
3  *
4  * Scheduling for synchronous and asynchronous RPC requests.
5  *
6  * Copyright (C) 1996 Olaf Kirch, <okir@monad.swb.de>
7  *
8  * TCP NFS related read + write fixes
9  * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
10  */
11 
12 #include <linux/module.h>
13 
14 #include <linux/sched.h>
15 #include <linux/interrupt.h>
16 #include <linux/slab.h>
17 #include <linux/mempool.h>
18 #include <linux/smp.h>
19 #include <linux/spinlock.h>
20 #include <linux/mutex.h>
21 
22 #include <linux/sunrpc/clnt.h>
23 
24 #include "sunrpc.h"
25 
26 #ifdef RPC_DEBUG
27 #define RPCDBG_FACILITY		RPCDBG_SCHED
28 #endif
29 
30 /*
31  * RPC slabs and memory pools
32  */
33 #define RPC_BUFFER_MAXSIZE	(2048)
34 #define RPC_BUFFER_POOLSIZE	(8)
35 #define RPC_TASK_POOLSIZE	(8)
36 static struct kmem_cache	*rpc_task_slabp __read_mostly;
37 static struct kmem_cache	*rpc_buffer_slabp __read_mostly;
38 static mempool_t	*rpc_task_mempool __read_mostly;
39 static mempool_t	*rpc_buffer_mempool __read_mostly;
40 
41 static void			rpc_async_schedule(struct work_struct *);
42 static void			rpc_release_task(struct rpc_task *task);
43 static void __rpc_queue_timer_fn(unsigned long ptr);
44 
45 /*
46  * RPC tasks sit here while waiting for conditions to improve.
47  */
48 static struct rpc_wait_queue delay_queue;
49 
50 /*
51  * rpciod-related stuff
52  */
53 struct workqueue_struct *rpciod_workqueue;
54 
55 /*
56  * Disable the timer for a given RPC task. Must be called with
57  * queue->lock held and bottom halves disabled in order to avoid races
58  * with __rpc_queue_timer_fn().
59  */
60 static void
61 __rpc_disable_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
62 {
63 	if (task->tk_timeout == 0)
64 		return;
65 	dprintk("RPC: %5u disabling timer\n", task->tk_pid);
66 	task->tk_timeout = 0;
67 	list_del(&task->u.tk_wait.timer_list);
68 	if (list_empty(&queue->timer_list.list))
69 		del_timer(&queue->timer_list.timer);
70 }
71 
72 static void
73 rpc_set_queue_timer(struct rpc_wait_queue *queue, unsigned long expires)
74 {
75 	queue->timer_list.expires = expires;
76 	mod_timer(&queue->timer_list.timer, expires);
77 }
78 
79 /*
80  * Set up a timer for the current task.
81  */
82 static void
83 __rpc_add_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
84 {
85 	if (!task->tk_timeout)
86 		return;
87 
88 	dprintk("RPC: %5u setting alarm for %lu ms\n",
89 			task->tk_pid, task->tk_timeout * 1000 / HZ);
90 
91 	task->u.tk_wait.expires = jiffies + task->tk_timeout;
92 	if (list_empty(&queue->timer_list.list) || time_before(task->u.tk_wait.expires, queue->timer_list.expires))
93 		rpc_set_queue_timer(queue, task->u.tk_wait.expires);
94 	list_add(&task->u.tk_wait.timer_list, &queue->timer_list.list);
95 }
96 
97 /*
98  * Add a new request to a priority queue, grouping tasks that share a tk_owner.
99  */
100 static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue, struct rpc_task *task)
101 {
102 	struct list_head *q;
103 	struct rpc_task *t;
104 
105 	INIT_LIST_HEAD(&task->u.tk_wait.links);
106 	q = &queue->tasks[task->tk_priority];
107 	if (unlikely(task->tk_priority > queue->maxpriority))
108 		q = &queue->tasks[queue->maxpriority];
109 	list_for_each_entry(t, q, u.tk_wait.list) {
110 		if (t->tk_owner == task->tk_owner) {
111 			list_add_tail(&task->u.tk_wait.list, &t->u.tk_wait.links);
112 			return;
113 		}
114 	}
115 	list_add_tail(&task->u.tk_wait.list, q);
116 }
117 
118 /*
119  * Add new request to wait queue.
120  *
121  * Swapper tasks always get inserted at the head of the queue.
122  * This should avoid many nasty memory deadlocks and hopefully
123  * improve overall performance.
124  * Everyone else gets appended to the queue to ensure proper FIFO behavior.
125  */
126 static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
127 {
128 	BUG_ON(RPC_IS_QUEUED(task));
129 
130 	if (RPC_IS_PRIORITY(queue))
131 		__rpc_add_wait_queue_priority(queue, task);
132 	else if (RPC_IS_SWAPPER(task))
133 		list_add(&task->u.tk_wait.list, &queue->tasks[0]);
134 	else
135 		list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
136 	task->tk_waitqueue = queue;
137 	queue->qlen++;
138 	rpc_set_queued(task);
139 
140 	dprintk("RPC: %5u added to queue %p \"%s\"\n",
141 			task->tk_pid, queue, rpc_qname(queue));
142 }
143 
144 /*
145  * Remove request from a priority queue.
146  */
147 static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
148 {
149 	struct rpc_task *t;
150 
151 	if (!list_empty(&task->u.tk_wait.links)) {
152 		t = list_entry(task->u.tk_wait.links.next, struct rpc_task, u.tk_wait.list);
153 		list_move(&t->u.tk_wait.list, &task->u.tk_wait.list);
154 		list_splice_init(&task->u.tk_wait.links, &t->u.tk_wait.links);
155 	}
156 }
157 
158 /*
159  * Remove request from queue.
160  * Note: must be called with queue->lock held.
161  */
162 static void __rpc_remove_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
163 {
164 	__rpc_disable_timer(queue, task);
165 	if (RPC_IS_PRIORITY(queue))
166 		__rpc_remove_wait_queue_priority(task);
167 	list_del(&task->u.tk_wait.list);
168 	queue->qlen--;
169 	dprintk("RPC: %5u removed from queue %p \"%s\"\n",
170 			task->tk_pid, queue, rpc_qname(queue));
171 }
172 
173 static inline void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority)
174 {
175 	queue->priority = priority;
176 	queue->count = 1 << (priority * 2);
177 }
178 
179 static inline void rpc_set_waitqueue_owner(struct rpc_wait_queue *queue, pid_t pid)
180 {
181 	queue->owner = pid;
182 	queue->nr = RPC_BATCH_COUNT;
183 }
184 
185 static inline void rpc_reset_waitqueue_priority(struct rpc_wait_queue *queue)
186 {
187 	rpc_set_waitqueue_priority(queue, queue->maxpriority);
188 	rpc_set_waitqueue_owner(queue, 0);
189 }
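/*
 * In effect, the helpers above give priority level p a budget of roughly
 * 1 << (2 * p) owner batches (1, 4, 16, ... for levels 0, 1, 2, ...)
 * before __rpc_wake_up_next_priority() rotates to another level, and
 * queue->nr caps each batch at RPC_BATCH_COUNT consecutive tasks from
 * the same tk_owner.
 */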
190 
191 static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, unsigned char nr_queues)
192 {
193 	int i;
194 
195 	spin_lock_init(&queue->lock);
196 	for (i = 0; i < ARRAY_SIZE(queue->tasks); i++)
197 		INIT_LIST_HEAD(&queue->tasks[i]);
198 	queue->maxpriority = nr_queues - 1;
199 	rpc_reset_waitqueue_priority(queue);
200 	queue->qlen = 0;
201 	setup_timer(&queue->timer_list.timer, __rpc_queue_timer_fn, (unsigned long)queue);
202 	INIT_LIST_HEAD(&queue->timer_list.list);
203 #ifdef RPC_DEBUG
204 	queue->name = qname;
205 #endif
206 }
207 
208 void rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname)
209 {
210 	__rpc_init_priority_wait_queue(queue, qname, RPC_NR_PRIORITY);
211 }
212 EXPORT_SYMBOL_GPL(rpc_init_priority_wait_queue);
213 
214 void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
215 {
216 	__rpc_init_priority_wait_queue(queue, qname, 1);
217 }
218 EXPORT_SYMBOL_GPL(rpc_init_wait_queue);
219 
220 void rpc_destroy_wait_queue(struct rpc_wait_queue *queue)
221 {
222 	del_timer_sync(&queue->timer_list.timer);
223 }
224 EXPORT_SYMBOL_GPL(rpc_destroy_wait_queue);
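/*
 * Minimal usage sketch (hypothetical queue name): a subsystem-private
 * wait queue is initialised once and destroyed, which synchronously
 * stops its queue timer, before its memory goes away:
 *
 *	static struct rpc_wait_queue foo_pending;
 *
 *	rpc_init_wait_queue(&foo_pending, "foo_pending");
 *	...
 *	rpc_destroy_wait_queue(&foo_pending);
 */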
225 
226 static int rpc_wait_bit_killable(void *word)
227 {
228 	if (fatal_signal_pending(current))
229 		return -ERESTARTSYS;
230 	schedule();
231 	return 0;
232 }
233 
234 #ifdef RPC_DEBUG
235 static void rpc_task_set_debuginfo(struct rpc_task *task)
236 {
237 	static atomic_t rpc_pid;
238 
239 	task->tk_pid = atomic_inc_return(&rpc_pid);
240 }
241 #else
242 static inline void rpc_task_set_debuginfo(struct rpc_task *task)
243 {
244 }
245 #endif
246 
247 static void rpc_set_active(struct rpc_task *task)
248 {
249 	rpc_task_set_debuginfo(task);
250 	set_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
251 }
252 
253 /*
254  * Mark an RPC call as having completed by clearing the 'active' bit
255  */
256 static void rpc_mark_complete_task(struct rpc_task *task)
257 {
258 	smp_mb__before_clear_bit();
259 	clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
260 	smp_mb__after_clear_bit();
261 	wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE);
262 }
263 
264 /*
265  * Allow callers to wait for completion of an RPC call
266  */
267 int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
268 {
269 	if (action == NULL)
270 		action = rpc_wait_bit_killable;
271 	return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
272 			action, TASK_KILLABLE);
273 }
274 EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);
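/*
 * Usage sketch: a caller that holds its own reference to the task can
 * block until it completes; passing a NULL action selects the default
 * rpc_wait_bit_killable() behaviour above.
 *
 *	if (__rpc_wait_for_completion_task(task, NULL) < 0)
 *		return -ERESTARTSYS;	/* fatal signal received */
 */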
275 
276 /*
277  * Make an RPC task runnable.
278  *
279  * Note: If the task is ASYNC, this must be called with
280  * the spinlock held to protect the wait queue operation.
281  */
282 static void rpc_make_runnable(struct rpc_task *task)
283 {
284 	rpc_clear_queued(task);
285 	if (rpc_test_and_set_running(task))
286 		return;
287 	if (RPC_IS_ASYNC(task)) {
288 		int status;
289 
290 		INIT_WORK(&task->u.tk_work, rpc_async_schedule);
291 		status = queue_work(rpciod_workqueue, &task->u.tk_work);
292 		if (status < 0) {
293 			printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
294 			task->tk_status = status;
295 			return;
296 		}
297 	} else
298 		wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
299 }
300 
301 /*
302  * Prepare for sleeping on a wait queue.
303  * By always appending tasks to the list we ensure FIFO behavior.
304  * NB: An RPC task will only receive interrupt-driven events as long
305  * as it's on a wait queue.
306  */
307 static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
308 			rpc_action action)
309 {
310 	dprintk("RPC: %5u sleep_on(queue \"%s\" time %lu)\n",
311 			task->tk_pid, rpc_qname(q), jiffies);
312 
313 	__rpc_add_wait_queue(q, task);
314 
315 	BUG_ON(task->tk_callback != NULL);
316 	task->tk_callback = action;
317 	__rpc_add_timer(q, task);
318 }
319 
320 void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
321 				rpc_action action)
322 {
323 	/* We shouldn't ever put an inactive task to sleep */
324 	BUG_ON(!RPC_IS_ACTIVATED(task));
325 
326 	/*
327 	 * Protect the queue operations.
328 	 */
329 	spin_lock_bh(&q->lock);
330 	__rpc_sleep_on(q, task, action);
331 	spin_unlock_bh(&q->lock);
332 }
333 EXPORT_SYMBOL_GPL(rpc_sleep_on);
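/*
 * Usage sketch (hypothetical queue and timeout): a tk_action state
 * usually queues the task and returns; __rpc_execute() then stops
 * running tk_action steps until the task is woken up again.
 *
 *	task->tk_timeout = 5 * HZ;		/* arms __rpc_add_timer() */
 *	rpc_sleep_on(&foo_pending, task, NULL);	/* NULL: no tk_callback */
 *	return;
 */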
334 
335 /**
336  * __rpc_do_wake_up_task - wake up a single rpc_task
337  * @queue: wait queue
338  * @task: task to be woken up
339  *
340  * Caller must hold queue->lock, and have cleared the task queued flag.
341  */
342 static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task *task)
343 {
344 	dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n",
345 			task->tk_pid, jiffies);
346 
347 	/* Has the task been executed yet? If not, we cannot wake it up! */
348 	if (!RPC_IS_ACTIVATED(task)) {
349 		printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
350 		return;
351 	}
352 
353 	__rpc_remove_wait_queue(queue, task);
354 
355 	rpc_make_runnable(task);
356 
357 	dprintk("RPC:       __rpc_wake_up_task done\n");
358 }
359 
360 /*
361  * Wake up a queued task while the queue lock is being held
362  */
363 static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task)
364 {
365 	if (RPC_IS_QUEUED(task) && task->tk_waitqueue == queue)
366 		__rpc_do_wake_up_task(queue, task);
367 }
368 
369 /*
370  * Test whether an RPC wait queue is empty.
371  */
372 int rpc_queue_empty(struct rpc_wait_queue *queue)
373 {
374 	int res;
375 
376 	spin_lock_bh(&queue->lock);
377 	res = queue->qlen;
378 	spin_unlock_bh(&queue->lock);
379 	return res == 0;
380 }
381 EXPORT_SYMBOL_GPL(rpc_queue_empty);
382 
383 /*
384  * Wake up a task on a specific queue
385  */
386 void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task)
387 {
388 	spin_lock_bh(&queue->lock);
389 	rpc_wake_up_task_queue_locked(queue, task);
390 	spin_unlock_bh(&queue->lock);
391 }
392 EXPORT_SYMBOL_GPL(rpc_wake_up_queued_task);
393 
394 /*
395  * Wake up the next task on a priority queue.
396  */
397 static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queue)
398 {
399 	struct list_head *q;
400 	struct rpc_task *task;
401 
402 	/*
403 	 * Service a batch of tasks from a single owner.
404 	 */
405 	q = &queue->tasks[queue->priority];
406 	if (!list_empty(q)) {
407 		task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
408 		if (queue->owner == task->tk_owner) {
409 			if (--queue->nr)
410 				goto out;
411 			list_move_tail(&task->u.tk_wait.list, q);
412 		}
413 		/*
414 		 * Check if we need to switch queues.
415 		 */
416 		if (--queue->count)
417 			goto new_owner;
418 	}
419 
420 	/*
421 	 * Service the next queue.
422 	 */
423 	do {
424 		if (q == &queue->tasks[0])
425 			q = &queue->tasks[queue->maxpriority];
426 		else
427 			q = q - 1;
428 		if (!list_empty(q)) {
429 			task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
430 			goto new_queue;
431 		}
432 	} while (q != &queue->tasks[queue->priority]);
433 
434 	rpc_reset_waitqueue_priority(queue);
435 	return NULL;
436 
437 new_queue:
438 	rpc_set_waitqueue_priority(queue, (unsigned int)(q - &queue->tasks[0]));
439 new_owner:
440 	rpc_set_waitqueue_owner(queue, task->tk_owner);
441 out:
442 	rpc_wake_up_task_queue_locked(queue, task);
443 	return task;
444 }
445 
446 /*
447  * Wake up the next task on the wait queue.
448  */
449 struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue)
450 {
451 	struct rpc_task	*task = NULL;
452 
453 	dprintk("RPC:       wake_up_next(%p \"%s\")\n",
454 			queue, rpc_qname(queue));
455 	spin_lock_bh(&queue->lock);
456 	if (RPC_IS_PRIORITY(queue))
457 		task = __rpc_wake_up_next_priority(queue);
458 	else {
459 		task_for_first(task, &queue->tasks[0])
460 			rpc_wake_up_task_queue_locked(queue, task);
461 	}
462 	spin_unlock_bh(&queue->lock);
463 
464 	return task;
465 }
466 EXPORT_SYMBOL_GPL(rpc_wake_up_next);
467 
468 /**
469  * rpc_wake_up - wake up all rpc_tasks
470  * @queue: rpc_wait_queue on which the tasks are sleeping
471  *
472  * Grabs queue->lock
473  */
474 void rpc_wake_up(struct rpc_wait_queue *queue)
475 {
476 	struct rpc_task *task;
477 	struct list_head *head;
478 
479 	spin_lock_bh(&queue->lock);
480 	head = &queue->tasks[queue->maxpriority];
481 	for (;;) {
		/*
		 * Waking a task on a priority queue may splice tasks from its
		 * tk_wait.links back into this list, so always take the
		 * current first entry instead of a cached next pointer.
		 */
		while (!list_empty(head)) {
			task = list_first_entry(head,
					struct rpc_task, u.tk_wait.list);
			rpc_wake_up_task_queue_locked(queue, task);
		}
484 		if (head == &queue->tasks[0])
485 			break;
486 		head--;
487 	}
488 	spin_unlock_bh(&queue->lock);
489 }
490 EXPORT_SYMBOL_GPL(rpc_wake_up);
491 
492 /**
493  * rpc_wake_up_status - wake up all rpc_tasks and set their status value.
494  * @queue: rpc_wait_queue on which the tasks are sleeping
495  * @status: status value to set
496  *
497  * Grabs queue->lock
498  */
499 void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
500 {
501 	struct rpc_task *task;
502 	struct list_head *head;
503 
504 	spin_lock_bh(&queue->lock);
505 	head = &queue->tasks[queue->maxpriority];
506 	for (;;) {
		while (!list_empty(head)) {
			task = list_first_entry(head,
					struct rpc_task, u.tk_wait.list);
			task->tk_status = status;
			rpc_wake_up_task_queue_locked(queue, task);
		}
511 		if (head == &queue->tasks[0])
512 			break;
513 		head--;
514 	}
515 	spin_unlock_bh(&queue->lock);
516 }
517 EXPORT_SYMBOL_GPL(rpc_wake_up_status);
518 
519 static void __rpc_queue_timer_fn(unsigned long ptr)
520 {
521 	struct rpc_wait_queue *queue = (struct rpc_wait_queue *)ptr;
522 	struct rpc_task *task, *n;
523 	unsigned long expires, now, timeo;
524 
525 	spin_lock(&queue->lock);
526 	expires = now = jiffies;
527 	list_for_each_entry_safe(task, n, &queue->timer_list.list, u.tk_wait.timer_list) {
528 		timeo = task->u.tk_wait.expires;
529 		if (time_after_eq(now, timeo)) {
530 			dprintk("RPC: %5u timeout\n", task->tk_pid);
531 			task->tk_status = -ETIMEDOUT;
532 			rpc_wake_up_task_queue_locked(queue, task);
533 			continue;
534 		}
535 		if (expires == now || time_after(expires, timeo))
536 			expires = timeo;
537 	}
538 	if (!list_empty(&queue->timer_list.list))
539 		rpc_set_queue_timer(queue, expires);
540 	spin_unlock(&queue->lock);
541 }
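/*
 * Each run of the timer function wakes every expired task with
 * -ETIMEDOUT and re-arms the queue timer for the earliest remaining
 * expiry, so a queue never needs more than one pending timer.
 */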
542 
543 static void __rpc_atrun(struct rpc_task *task)
544 {
545 	task->tk_status = 0;
546 }
547 
548 /*
549  * Run a task at a later time
550  */
551 void rpc_delay(struct rpc_task *task, unsigned long delay)
552 {
553 	task->tk_timeout = delay;
554 	rpc_sleep_on(&delay_queue, task, __rpc_atrun);
555 }
556 EXPORT_SYMBOL_GPL(rpc_delay);
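/*
 * Usage sketch (hypothetical next state): a tk_action callback that
 * wants to retry after a back-off typically picks its next state and
 * then puts the task to sleep on the delay queue:
 *
 *	task->tk_action = foo_retry;	/* hypothetical */
 *	rpc_delay(task, 3 * HZ);	/* wake up after roughly 3 seconds */
 */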
557 
558 /*
559  * Helper to call task->tk_ops->rpc_call_prepare
560  */
561 void rpc_prepare_task(struct rpc_task *task)
562 {
563 	task->tk_ops->rpc_call_prepare(task, task->tk_calldata);
564 }
565 
566 /*
567  * Helper that calls task->tk_ops->rpc_call_done if it exists
568  */
569 void rpc_exit_task(struct rpc_task *task)
570 {
571 	task->tk_action = NULL;
572 	if (task->tk_ops->rpc_call_done != NULL) {
573 		task->tk_ops->rpc_call_done(task, task->tk_calldata);
574 		if (task->tk_action != NULL) {
575 			WARN_ON(RPC_ASSASSINATED(task));
576 			/* Always release the RPC slot and buffer memory */
577 			xprt_release(task);
578 		}
579 	}
580 }
581 
582 void rpc_exit(struct rpc_task *task, int status)
583 {
584 	task->tk_status = status;
585 	task->tk_action = rpc_exit_task;
586 	if (RPC_IS_QUEUED(task))
587 		rpc_wake_up_queued_task(task->tk_waitqueue, task);
588 }
589 EXPORT_SYMBOL_GPL(rpc_exit);
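/*
 * Usage sketch: rpc_exit() forces a task onto its exit path; the next
 * pass through __rpc_execute() runs rpc_exit_task() and then releases
 * the task.
 *
 *	rpc_exit(task, -EIO);
 */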
590 
591 void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata)
592 {
593 	if (ops->rpc_release != NULL)
594 		ops->rpc_release(calldata);
595 }
596 
597 /*
598  * This is the RPC `scheduler' (or rather, the finite state machine).
599  */
600 static void __rpc_execute(struct rpc_task *task)
601 {
602 	struct rpc_wait_queue *queue;
603 	int task_is_async = RPC_IS_ASYNC(task);
604 	int status = 0;
605 
606 	dprintk("RPC: %5u __rpc_execute flags=0x%x\n",
607 			task->tk_pid, task->tk_flags);
608 
609 	BUG_ON(RPC_IS_QUEUED(task));
610 
611 	for (;;) {
612 
613 		/*
614 		 * Execute any pending callback.
615 		 */
616 		if (task->tk_callback) {
617 			void (*save_callback)(struct rpc_task *);
618 
619 			/*
620 			 * We set tk_callback to NULL before calling it,
621 			 * in case it sets the tk_callback field itself:
622 			 */
623 			save_callback = task->tk_callback;
624 			task->tk_callback = NULL;
625 			save_callback(task);
626 		}
627 
628 		/*
629 		 * Perform the next FSM step.
630 		 * tk_action may be NULL when the task has been killed
631 		 * by someone else.
632 		 */
633 		if (!RPC_IS_QUEUED(task)) {
634 			if (task->tk_action == NULL)
635 				break;
636 			task->tk_action(task);
637 		}
638 
639 		/*
640 		 * Lockless check for whether task is sleeping or not.
641 		 */
642 		if (!RPC_IS_QUEUED(task))
643 			continue;
644 		/*
645 		 * The queue->lock protects against races with
646 		 * rpc_make_runnable().
647 		 *
648 		 * Note that once we clear RPC_TASK_RUNNING on an asynchronous
649 		 * rpc_task, rpc_make_runnable() can assign it to a
650 		 * different workqueue. We therefore cannot assume that the
651 		 * rpc_task pointer remains safe to dereference.
652 		 */
653 		queue = task->tk_waitqueue;
654 		spin_lock_bh(&queue->lock);
655 		if (!RPC_IS_QUEUED(task)) {
656 			spin_unlock_bh(&queue->lock);
657 			continue;
658 		}
659 		rpc_clear_running(task);
660 		spin_unlock_bh(&queue->lock);
661 		if (task_is_async)
662 			return;
663 
664 		/* sync task: sleep here */
665 		dprintk("RPC: %5u sync task going to sleep\n", task->tk_pid);
666 		status = out_of_line_wait_on_bit(&task->tk_runstate,
667 				RPC_TASK_QUEUED, rpc_wait_bit_killable,
668 				TASK_KILLABLE);
669 		if (status == -ERESTARTSYS) {
670 			/*
671 			 * When a sync task receives a signal, it exits with
672 			 * -ERESTARTSYS. In order to catch any callbacks that
673 			 * clean up after sleeping on some queue, we don't
674 			 * break the loop here, but go around once more.
675 			 */
676 			dprintk("RPC: %5u got signal\n", task->tk_pid);
677 			task->tk_flags |= RPC_TASK_KILLED;
678 			rpc_exit(task, -ERESTARTSYS);
679 		}
680 		rpc_set_running(task);
681 		dprintk("RPC: %5u sync task resuming\n", task->tk_pid);
682 	}
683 
684 	dprintk("RPC: %5u return %d, status %d\n", task->tk_pid, status,
685 			task->tk_status);
686 	/* Release all resources associated with the task */
687 	rpc_release_task(task);
688 }
689 
690 /*
691  * User-visible entry point to the scheduler.
692  *
693  * This may be called recursively if e.g. an async NFS task updates
694  * the attributes and finds that dirty pages must be flushed.
695  * NOTE: Upon return from this function the task may already have been
696  *	 released; rpc_release_task() drops the scheduler's reference,
697  *	 so the task memory may have been freed.
698  */
699 void rpc_execute(struct rpc_task *task)
700 {
701 	rpc_set_active(task);
702 	rpc_make_runnable(task);
703 	if (!RPC_IS_ASYNC(task))
704 		__rpc_execute(task);
705 }
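/*
 * Note that for RPC_TASK_ASYNC tasks rpc_execute() returns as soon as
 * the work item is queued; __rpc_execute() then runs, and eventually
 * releases the task, from rpc_async_schedule() on rpciod_workqueue.
 * Synchronous callers stay inside __rpc_execute() until the task is done.
 */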
706 
707 static void rpc_async_schedule(struct work_struct *work)
708 {
709 	__rpc_execute(container_of(work, struct rpc_task, u.tk_work));
710 }
711 
712 /**
713  * rpc_malloc - allocate an RPC buffer
714  * @task: RPC task that will use this buffer
715  * @size: requested byte size
716  *
717  * To prevent rpciod from hanging, this allocator never sleeps,
718  * returning NULL if the request cannot be serviced immediately.
719  * The caller can arrange to sleep in a way that is safe for rpciod.
720  *
721  * Most requests are 'small' (under 2KiB) and can be serviced from a
722  * mempool, ensuring that NFS reads and writes can always proceed,
723  * and that there is good locality of reference for these buffers.
724  *
725  * In order to avoid memory starvation triggering more writebacks of
726  * NFS requests, we avoid using GFP_KERNEL.
727  */
728 void *rpc_malloc(struct rpc_task *task, size_t size)
729 {
730 	struct rpc_buffer *buf;
731 	gfp_t gfp = RPC_IS_SWAPPER(task) ? GFP_ATOMIC : GFP_NOWAIT;
732 
733 	size += sizeof(struct rpc_buffer);
734 	if (size <= RPC_BUFFER_MAXSIZE)
735 		buf = mempool_alloc(rpc_buffer_mempool, gfp);
736 	else
737 		buf = kmalloc(size, gfp);
738 
739 	if (!buf)
740 		return NULL;
741 
742 	buf->len = size;
743 	dprintk("RPC: %5u allocated buffer of size %zu at %p\n",
744 			task->tk_pid, size, buf);
745 	return &buf->data;
746 }
747 EXPORT_SYMBOL_GPL(rpc_malloc);
748 
749 /**
750  * rpc_free - free buffer allocated via rpc_malloc
751  * @buffer: buffer to free
752  *
753  */
754 void rpc_free(void *buffer)
755 {
756 	size_t size;
757 	struct rpc_buffer *buf;
758 
759 	if (!buffer)
760 		return;
761 
762 	buf = container_of(buffer, struct rpc_buffer, data);
763 	size = buf->len;
764 
765 	dprintk("RPC:       freeing buffer of size %zu at %p\n",
766 			size, buf);
767 
768 	if (size <= RPC_BUFFER_MAXSIZE)
769 		mempool_free(buf, rpc_buffer_mempool);
770 	else
771 		kfree(buf);
772 }
773 EXPORT_SYMBOL_GPL(rpc_free);
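/*
 * Sketch of the expected pairing (callers are typically a transport's
 * buf_alloc/buf_free methods; the error handling here is illustrative):
 *
 *	void *p = rpc_malloc(task, len);
 *	if (p == NULL)
 *		return -ENOMEM;		/* retry later in a safe context */
 *	...
 *	rpc_free(p);
 */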
774 
775 /*
776  * Creation and deletion of RPC task structures
777  */
778 static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *task_setup_data)
779 {
780 	memset(task, 0, sizeof(*task));
781 	atomic_set(&task->tk_count, 1);
782 	task->tk_flags  = task_setup_data->flags;
783 	task->tk_ops = task_setup_data->callback_ops;
784 	task->tk_calldata = task_setup_data->callback_data;
785 	INIT_LIST_HEAD(&task->tk_task);
786 
787 	/* Initialize retry counters */
788 	task->tk_garb_retry = 2;
789 	task->tk_cred_retry = 2;
790 
791 	task->tk_priority = task_setup_data->priority - RPC_PRIORITY_LOW;
792 	task->tk_owner = current->tgid;
793 
794 	/* Initialize workqueue for async tasks */
795 	task->tk_workqueue = task_setup_data->workqueue;
796 
797 	if (task->tk_ops->rpc_call_prepare != NULL)
798 		task->tk_action = rpc_prepare_task;
799 
800 	/* starting timestamp */
801 	task->tk_start = ktime_get();
802 
803 	dprintk("RPC:       new task initialized, procpid %u\n",
804 				task_pid_nr(current));
805 }
806 
807 static struct rpc_task *
808 rpc_alloc_task(void)
809 {
810 	return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS);
811 }
812 
813 /*
814  * Create a new task for the specified client.
815  */
816 struct rpc_task *rpc_new_task(const struct rpc_task_setup *setup_data)
817 {
818 	struct rpc_task	*task = setup_data->task;
819 	unsigned short flags = 0;
820 
821 	if (task == NULL) {
822 		task = rpc_alloc_task();
823 		if (task == NULL) {
824 			rpc_release_calldata(setup_data->callback_ops,
825 					setup_data->callback_data);
826 			return ERR_PTR(-ENOMEM);
827 		}
828 		flags = RPC_TASK_DYNAMIC;
829 	}
830 
831 	rpc_init_task(task, setup_data);
832 	if (task->tk_status < 0) {
833 		int err = task->tk_status;
834 		rpc_put_task(task);
835 		return ERR_PTR(err);
836 	}
837 
838 	task->tk_flags |= flags;
839 	dprintk("RPC:       allocated task %p\n", task);
840 	return task;
841 }
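/*
 * Usage sketch (hypothetical callback ops): rpc_new_task() is normally
 * reached through higher-level helpers, but the setup structure it
 * consumes looks roughly like this:
 *
 *	struct rpc_task_setup setup = {
 *		.callback_ops	= &foo_call_ops,
 *		.callback_data	= foo_data,
 *		.flags		= RPC_TASK_ASYNC,
 *	};
 *	struct rpc_task *task = rpc_new_task(&setup);
 *
 *	if (IS_ERR(task))
 *		return PTR_ERR(task);
 */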
842 
843 static void rpc_free_task(struct rpc_task *task)
844 {
845 	const struct rpc_call_ops *tk_ops = task->tk_ops;
846 	void *calldata = task->tk_calldata;
847 
848 	if (task->tk_flags & RPC_TASK_DYNAMIC) {
849 		dprintk("RPC: %5u freeing task\n", task->tk_pid);
850 		mempool_free(task, rpc_task_mempool);
851 	}
852 	rpc_release_calldata(tk_ops, calldata);
853 }
854 
855 static void rpc_async_release(struct work_struct *work)
856 {
857 	rpc_free_task(container_of(work, struct rpc_task, u.tk_work));
858 }
859 
860 void rpc_put_task(struct rpc_task *task)
861 {
862 	if (!atomic_dec_and_test(&task->tk_count))
863 		return;
864 	/* Release resources */
865 	if (task->tk_rqstp)
866 		xprt_release(task);
867 	if (task->tk_msg.rpc_cred)
868 		put_rpccred(task->tk_msg.rpc_cred);
869 	rpc_task_release_client(task);
870 	if (task->tk_workqueue != NULL) {
871 		INIT_WORK(&task->u.tk_work, rpc_async_release);
872 		queue_work(task->tk_workqueue, &task->u.tk_work);
873 	} else
874 		rpc_free_task(task);
875 }
876 EXPORT_SYMBOL_GPL(rpc_put_task);
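/*
 * Reference counting note: rpc_init_task() starts tk_count at 1, and
 * that reference belongs to the scheduler (dropped via rpc_release_task()
 * below).  Callers that need to touch the task after handing it to
 * rpc_execute() must take an extra reference on tk_count first.
 */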
877 
878 static void rpc_release_task(struct rpc_task *task)
879 {
880 	dprintk("RPC: %5u release task\n", task->tk_pid);
881 
882 	BUG_ON(RPC_IS_QUEUED(task));
883 
884 	/* Wake up anyone who is waiting for task completion */
885 	rpc_mark_complete_task(task);
886 
887 	rpc_put_task(task);
888 }
889 
890 int rpciod_up(void)
891 {
892 	return try_module_get(THIS_MODULE) ? 0 : -EINVAL;
893 }
894 
895 void rpciod_down(void)
896 {
897 	module_put(THIS_MODULE);
898 }
899 
900 /*
901  * Start up the rpciod workqueue.
902  */
903 static int rpciod_start(void)
904 {
905 	struct workqueue_struct *wq;
906 
907 	/*
908 	 * Create the rpciod workqueue.
909 	 */
910 	dprintk("RPC:       creating workqueue rpciod\n");
911 	wq = alloc_workqueue("rpciod", WQ_RESCUER, 0);
912 	rpciod_workqueue = wq;
913 	return rpciod_workqueue != NULL;
914 }
915 
916 static void rpciod_stop(void)
917 {
918 	struct workqueue_struct *wq = NULL;
919 
920 	if (rpciod_workqueue == NULL)
921 		return;
922 	dprintk("RPC:       destroying workqueue rpciod\n");
923 
924 	wq = rpciod_workqueue;
925 	rpciod_workqueue = NULL;
926 	destroy_workqueue(wq);
927 }
928 
929 void
930 rpc_destroy_mempool(void)
931 {
932 	rpciod_stop();
933 	if (rpc_buffer_mempool)
934 		mempool_destroy(rpc_buffer_mempool);
935 	if (rpc_task_mempool)
936 		mempool_destroy(rpc_task_mempool);
937 	if (rpc_task_slabp)
938 		kmem_cache_destroy(rpc_task_slabp);
939 	if (rpc_buffer_slabp)
940 		kmem_cache_destroy(rpc_buffer_slabp);
941 	rpc_destroy_wait_queue(&delay_queue);
942 }
943 
944 int
945 rpc_init_mempool(void)
946 {
947 	/*
948 	 * The following is not strictly a mempool initialisation,
949 	 * but there is no harm in doing it here
950 	 */
951 	rpc_init_wait_queue(&delay_queue, "delayq");
952 	if (!rpciod_start())
953 		goto err_nomem;
954 
955 	rpc_task_slabp = kmem_cache_create("rpc_tasks",
956 					     sizeof(struct rpc_task),
957 					     0, SLAB_HWCACHE_ALIGN,
958 					     NULL);
959 	if (!rpc_task_slabp)
960 		goto err_nomem;
961 	rpc_buffer_slabp = kmem_cache_create("rpc_buffers",
962 					     RPC_BUFFER_MAXSIZE,
963 					     0, SLAB_HWCACHE_ALIGN,
964 					     NULL);
965 	if (!rpc_buffer_slabp)
966 		goto err_nomem;
967 	rpc_task_mempool = mempool_create_slab_pool(RPC_TASK_POOLSIZE,
968 						    rpc_task_slabp);
969 	if (!rpc_task_mempool)
970 		goto err_nomem;
971 	rpc_buffer_mempool = mempool_create_slab_pool(RPC_BUFFER_POOLSIZE,
972 						      rpc_buffer_slabp);
973 	if (!rpc_buffer_mempool)
974 		goto err_nomem;
975 	return 0;
976 err_nomem:
977 	rpc_destroy_mempool();
978 	return -ENOMEM;
979 }
980