xref: /openbmc/linux/net/sunrpc/sched.c (revision 643d1f7f)
1 /*
2  * linux/net/sunrpc/sched.c
3  *
4  * Scheduling for synchronous and asynchronous RPC requests.
5  *
6  * Copyright (C) 1996 Olaf Kirch, <okir@monad.swb.de>
7  *
8  * TCP NFS related read + write fixes
9  * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
10  */
11 
12 #include <linux/module.h>
13 
14 #include <linux/sched.h>
15 #include <linux/interrupt.h>
16 #include <linux/slab.h>
17 #include <linux/mempool.h>
18 #include <linux/smp.h>
19 #include <linux/smp_lock.h>
20 #include <linux/spinlock.h>
21 #include <linux/mutex.h>
22 
23 #include <linux/sunrpc/clnt.h>
24 
25 #ifdef RPC_DEBUG
26 #define RPCDBG_FACILITY		RPCDBG_SCHED
27 #define RPC_TASK_MAGIC_ID	0xf00baa
28 #endif
29 
30 /*
31  * RPC slabs and memory pools
32  */
33 #define RPC_BUFFER_MAXSIZE	(2048)
34 #define RPC_BUFFER_POOLSIZE	(8)
35 #define RPC_TASK_POOLSIZE	(8)
36 static struct kmem_cache	*rpc_task_slabp __read_mostly;
37 static struct kmem_cache	*rpc_buffer_slabp __read_mostly;
38 static mempool_t	*rpc_task_mempool __read_mostly;
39 static mempool_t	*rpc_buffer_mempool __read_mostly;
40 
41 static void			__rpc_default_timer(struct rpc_task *task);
42 static void			rpc_async_schedule(struct work_struct *);
43 static void			 rpc_release_task(struct rpc_task *task);
44 
45 /*
46  * RPC tasks sit here while waiting for conditions to improve.
47  */
48 static struct rpc_wait_queue delay_queue;
49 
50 /*
51  * rpciod-related stuff
52  */
53 struct workqueue_struct *rpciod_workqueue;
54 
55 /*
56  * Disable the timer for a given RPC task. Should be called while
57  * holding queue->lock and with bottom halves disabled, in order to
58  * avoid races with rpc_run_timer().
59  */
60 static inline void
61 __rpc_disable_timer(struct rpc_task *task)
62 {
63 	dprintk("RPC: %5u disabling timer\n", task->tk_pid);
64 	task->tk_timeout_fn = NULL;
65 	task->tk_timeout = 0;
66 }
67 
68 /*
69  * Run a timeout function.
70  * We use the callback in order to allow __rpc_wake_up_task()
71  * and friends to disable the timer synchronously on SMP systems
72  * without calling del_timer_sync(). The latter could cause a
73  * deadlock if called while we're holding spinlocks...
74  */
75 static void rpc_run_timer(struct rpc_task *task)
76 {
77 	void (*callback)(struct rpc_task *);
78 
79 	callback = task->tk_timeout_fn;
80 	task->tk_timeout_fn = NULL;
81 	if (callback && RPC_IS_QUEUED(task)) {
82 		dprintk("RPC: %5u running timer\n", task->tk_pid);
83 		callback(task);
84 	}
85 	smp_mb__before_clear_bit();
86 	clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate);
87 	smp_mb__after_clear_bit();
88 }
89 
90 /*
91  * Set up a timer for the current task.
92  */
93 static inline void
94 __rpc_add_timer(struct rpc_task *task, rpc_action timer)
95 {
96 	if (!task->tk_timeout)
97 		return;
98 
99 	dprintk("RPC: %5u setting alarm for %lu ms\n",
100 			task->tk_pid, task->tk_timeout * 1000 / HZ);
101 
102 	if (timer)
103 		task->tk_timeout_fn = timer;
104 	else
105 		task->tk_timeout_fn = __rpc_default_timer;
106 	set_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate);
107 	mod_timer(&task->tk_timer, jiffies + task->tk_timeout);
108 }
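
/*
 * Sketch of how a caller arms this timer: tk_timeout is expressed in
 * jiffies and must be set before the task goes to sleep, and the timer
 * callback is handed to rpc_sleep_on() below.  The queue, the action
 * and the five second timeout are hypothetical; a NULL timer falls back
 * to __rpc_default_timer().
 *
 *	task->tk_timeout = 5 * HZ;
 *	rpc_sleep_on(&example_queue, task, example_action, example_timer);
 *
 * The timer callback then typically sets tk_status to -ETIMEDOUT and
 * wakes the task, just as __rpc_default_timer() does.
 */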
109 
110 /*
111  * Delete any timer for the current task. Because we use del_timer_sync(),
112  * this function should never be called while holding queue->lock.
113  */
114 static void
115 rpc_delete_timer(struct rpc_task *task)
116 {
117 	if (RPC_IS_QUEUED(task))
118 		return;
119 	if (test_and_clear_bit(RPC_TASK_HAS_TIMER, &task->tk_runstate)) {
120 		del_singleshot_timer_sync(&task->tk_timer);
121 		dprintk("RPC: %5u deleting timer\n", task->tk_pid);
122 	}
123 }
124 
125 /*
126  * Add new request to a priority queue.
127  */
128 static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue, struct rpc_task *task)
129 {
130 	struct list_head *q;
131 	struct rpc_task *t;
132 
133 	INIT_LIST_HEAD(&task->u.tk_wait.links);
134 	q = &queue->tasks[task->tk_priority];
135 	if (unlikely(task->tk_priority > queue->maxpriority))
136 		q = &queue->tasks[queue->maxpriority];
137 	list_for_each_entry(t, q, u.tk_wait.list) {
138 		if (t->tk_owner == task->tk_owner) {
139 			list_add_tail(&task->u.tk_wait.list, &t->u.tk_wait.links);
140 			return;
141 		}
142 	}
143 	list_add_tail(&task->u.tk_wait.list, q);
144 }
145 
146 /*
147  * Add new request to wait queue.
148  *
149  * Swapper tasks always get inserted at the head of the queue.
150  * This should avoid many nasty memory deadlocks and hopefully
151  * improve overall performance.
152  * Everyone else gets appended to the queue to ensure proper FIFO behavior.
153  */
154 static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
155 {
156 	BUG_ON(RPC_IS_QUEUED(task));
157 
158 	if (RPC_IS_PRIORITY(queue))
159 		__rpc_add_wait_queue_priority(queue, task);
160 	else if (RPC_IS_SWAPPER(task))
161 		list_add(&task->u.tk_wait.list, &queue->tasks[0]);
162 	else
163 		list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
164 	task->u.tk_wait.rpc_waitq = queue;
165 	queue->qlen++;
166 	rpc_set_queued(task);
167 
168 	dprintk("RPC: %5u added to queue %p \"%s\"\n",
169 			task->tk_pid, queue, rpc_qname(queue));
170 }
171 
172 /*
173  * Remove request from a priority queue.
174  */
175 static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
176 {
177 	struct rpc_task *t;
178 
179 	if (!list_empty(&task->u.tk_wait.links)) {
180 		t = list_entry(task->u.tk_wait.links.next, struct rpc_task, u.tk_wait.list);
181 		list_move(&t->u.tk_wait.list, &task->u.tk_wait.list);
182 		list_splice_init(&task->u.tk_wait.links, &t->u.tk_wait.links);
183 	}
184 	list_del(&task->u.tk_wait.list);
185 }
186 
187 /*
188  * Remove request from queue.
189  * Note: must be called with spin lock held.
190  */
191 static void __rpc_remove_wait_queue(struct rpc_task *task)
192 {
193 	struct rpc_wait_queue *queue;
194 	queue = task->u.tk_wait.rpc_waitq;
195 
196 	if (RPC_IS_PRIORITY(queue))
197 		__rpc_remove_wait_queue_priority(task);
198 	else
199 		list_del(&task->u.tk_wait.list);
200 	queue->qlen--;
201 	dprintk("RPC: %5u removed from queue %p \"%s\"\n",
202 			task->tk_pid, queue, rpc_qname(queue));
203 }
204 
205 static inline void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority)
206 {
207 	queue->priority = priority;
208 	queue->count = 1 << (priority * 2);
209 }
210 
211 static inline void rpc_set_waitqueue_owner(struct rpc_wait_queue *queue, pid_t pid)
212 {
213 	queue->owner = pid;
214 	queue->nr = RPC_BATCH_COUNT;
215 }
216 
217 static inline void rpc_reset_waitqueue_priority(struct rpc_wait_queue *queue)
218 {
219 	rpc_set_waitqueue_priority(queue, queue->maxpriority);
220 	rpc_set_waitqueue_owner(queue, 0);
221 }
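
/*
 * The shift in rpc_set_waitqueue_priority() weights the levels
 * geometrically: each time the scheduler settles on priority level p it
 * hands out up to 1 << (2 * p) wakeups from that list before
 * __rpc_wake_up_next_priority() moves on to another level, i.e. 16 for
 * the highest of the usual three levels, 4 for the middle one and 1 for
 * the lowest.  Within a level, at most RPC_BATCH_COUNT consecutive
 * wakeups go to tasks from a single owner before other owners get a
 * turn.
 */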
222 
223 static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, unsigned char nr_queues)
224 {
225 	int i;
226 
227 	spin_lock_init(&queue->lock);
228 	for (i = 0; i < ARRAY_SIZE(queue->tasks); i++)
229 		INIT_LIST_HEAD(&queue->tasks[i]);
230 	queue->maxpriority = nr_queues - 1;
231 	rpc_reset_waitqueue_priority(queue);
232 #ifdef RPC_DEBUG
233 	queue->name = qname;
234 #endif
235 }
236 
237 void rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname)
238 {
239 	__rpc_init_priority_wait_queue(queue, qname, RPC_NR_PRIORITY);
240 }
241 
242 void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
243 {
244 	__rpc_init_priority_wait_queue(queue, qname, 1);
245 }
246 EXPORT_SYMBOL_GPL(rpc_init_wait_queue);
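
/*
 * Minimal usage sketch (the queue and function names are hypothetical):
 * a wait queue is a plain static object that only needs initialising
 * once before tasks can sleep on it.  rpc_init_priority_wait_queue() is
 * the variant to pick when waiters should be serviced by priority level
 * and batched per owner.
 *
 *	static struct rpc_wait_queue example_queue;
 *
 *	static void example_setup(void)
 *	{
 *		rpc_init_wait_queue(&example_queue, "exampleq");
 *	}
 */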
247 
248 static int rpc_wait_bit_killable(void *word)
249 {
250 	if (fatal_signal_pending(current))
251 		return -ERESTARTSYS;
252 	schedule();
253 	return 0;
254 }
255 
256 #ifdef RPC_DEBUG
257 static void rpc_task_set_debuginfo(struct rpc_task *task)
258 {
259 	static atomic_t rpc_pid;
260 
261 	task->tk_magic = RPC_TASK_MAGIC_ID;
262 	task->tk_pid = atomic_inc_return(&rpc_pid);
263 }
264 #else
265 static inline void rpc_task_set_debuginfo(struct rpc_task *task)
266 {
267 }
268 #endif
269 
270 static void rpc_set_active(struct rpc_task *task)
271 {
272 	struct rpc_clnt *clnt;
273 	if (test_and_set_bit(RPC_TASK_ACTIVE, &task->tk_runstate) != 0)
274 		return;
275 	rpc_task_set_debuginfo(task);
276 	/* Add to global list of all tasks */
277 	clnt = task->tk_client;
278 	if (clnt != NULL) {
279 		spin_lock(&clnt->cl_lock);
280 		list_add_tail(&task->tk_task, &clnt->cl_tasks);
281 		spin_unlock(&clnt->cl_lock);
282 	}
283 }
284 
285 /*
286  * Mark an RPC call as having completed by clearing the 'active' bit
287  */
288 static void rpc_mark_complete_task(struct rpc_task *task)
289 {
290 	smp_mb__before_clear_bit();
291 	clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
292 	smp_mb__after_clear_bit();
293 	wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE);
294 }
295 
296 /*
297  * Allow callers to wait for completion of an RPC call
298  */
299 int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
300 {
301 	if (action == NULL)
302 		action = rpc_wait_bit_killable;
303 	return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
304 			action, TASK_KILLABLE);
305 }
306 EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);
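
/*
 * Sketch of a caller blocking until an asynchronous task has run to
 * completion.  Passing a NULL action selects rpc_wait_bit_killable()
 * above, so only a fatal signal interrupts the wait; the wrapper
 * function itself is hypothetical.
 *
 *	static int example_wait(struct rpc_task *task)
 *	{
 *		int ret;
 *
 *		ret = __rpc_wait_for_completion_task(task, NULL);
 *		if (ret < 0)
 *			return ret;
 *		return task->tk_status;
 *	}
 */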
307 
308 /*
309  * Make an RPC task runnable.
310  *
311  * Note: If the task is ASYNC, this must be called with
312  * the spinlock held to protect the wait queue operation.
313  */
314 static void rpc_make_runnable(struct rpc_task *task)
315 {
316 	BUG_ON(task->tk_timeout_fn);
317 	rpc_clear_queued(task);
318 	if (rpc_test_and_set_running(task))
319 		return;
320 	/* We might have raced */
321 	if (RPC_IS_QUEUED(task)) {
322 		rpc_clear_running(task);
323 		return;
324 	}
325 	if (RPC_IS_ASYNC(task)) {
326 		int status;
327 
328 		INIT_WORK(&task->u.tk_work, rpc_async_schedule);
329 		status = queue_work(task->tk_workqueue, &task->u.tk_work);
330 		if (status < 0) {
331 			printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
332 			task->tk_status = status;
333 			return;
334 		}
335 	} else
336 		wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
337 }
338 
339 /*
340  * Prepare for sleeping on a wait queue.
341  * By always appending tasks to the list we ensure FIFO behavior.
342  * NB: An RPC task will only receive interrupt-driven events as long
343  * as it's on a wait queue.
344  */
345 static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
346 			rpc_action action, rpc_action timer)
347 {
348 	dprintk("RPC: %5u sleep_on(queue \"%s\" time %lu)\n",
349 			task->tk_pid, rpc_qname(q), jiffies);
350 
351 	if (!RPC_IS_ASYNC(task) && !RPC_IS_ACTIVATED(task)) {
352 		printk(KERN_ERR "RPC: Inactive synchronous task put to sleep!\n");
353 		return;
354 	}
355 
356 	__rpc_add_wait_queue(q, task);
357 
358 	BUG_ON(task->tk_callback != NULL);
359 	task->tk_callback = action;
360 	__rpc_add_timer(task, timer);
361 }
362 
363 void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
364 				rpc_action action, rpc_action timer)
365 {
366 	/* Mark the task as being activated if so needed */
367 	rpc_set_active(task);
368 
369 	/*
370 	 * Protect the queue operations.
371 	 */
372 	spin_lock_bh(&q->lock);
373 	__rpc_sleep_on(q, task, action, timer);
374 	spin_unlock_bh(&q->lock);
375 }
376 EXPORT_SYMBOL_GPL(rpc_sleep_on);
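
/*
 * Typical usage sketch: a state machine step that cannot make progress
 * records the step to resume at, parks the task on a queue and returns;
 * whoever clears the condition later wakes the queue.  The queue and
 * function names are hypothetical; a zero tk_timeout means no timer is
 * armed at all.
 *
 *	static void example_wait_for_slot(struct rpc_task *task)
 *	{
 *		task->tk_action = example_next_step;
 *		task->tk_timeout = 0;
 *		rpc_sleep_on(&example_queue, task, NULL, NULL);
 *	}
 *
 * and, on the side that frees up a slot:
 *
 *	rpc_wake_up_next(&example_queue);
 */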
377 
378 /**
379  * __rpc_do_wake_up_task - wake up a single rpc_task
380  * @task: task to be woken up
381  *
382  * Caller must hold queue->lock, and have cleared the task queued flag.
383  */
384 static void __rpc_do_wake_up_task(struct rpc_task *task)
385 {
386 	dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n",
387 			task->tk_pid, jiffies);
388 
389 #ifdef RPC_DEBUG
390 	BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
391 #endif
392 	/* Has the task been executed yet? If not, we cannot wake it up! */
393 	if (!RPC_IS_ACTIVATED(task)) {
394 		printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
395 		return;
396 	}
397 
398 	__rpc_disable_timer(task);
399 	__rpc_remove_wait_queue(task);
400 
401 	rpc_make_runnable(task);
402 
403 	dprintk("RPC:       __rpc_wake_up_task done\n");
404 }
405 
406 /*
407  * Wake up the specified task
408  */
409 static void __rpc_wake_up_task(struct rpc_task *task)
410 {
411 	if (rpc_start_wakeup(task)) {
412 		if (RPC_IS_QUEUED(task))
413 			__rpc_do_wake_up_task(task);
414 		rpc_finish_wakeup(task);
415 	}
416 }
417 
418 /*
419  * Default timeout handler if none specified by user
420  */
421 static void
422 __rpc_default_timer(struct rpc_task *task)
423 {
424 	dprintk("RPC: %5u timeout (default timer)\n", task->tk_pid);
425 	task->tk_status = -ETIMEDOUT;
426 	rpc_wake_up_task(task);
427 }
428 
429 /*
430  * Wake up the specified task
431  */
432 void rpc_wake_up_task(struct rpc_task *task)
433 {
434 	rcu_read_lock_bh();
435 	if (rpc_start_wakeup(task)) {
436 		if (RPC_IS_QUEUED(task)) {
437 			struct rpc_wait_queue *queue = task->u.tk_wait.rpc_waitq;
438 
439 			/* Note: we're already in a bh-safe context */
440 			spin_lock(&queue->lock);
441 			__rpc_do_wake_up_task(task);
442 			spin_unlock(&queue->lock);
443 		}
444 		rpc_finish_wakeup(task);
445 	}
446 	rcu_read_unlock_bh();
447 }
448 EXPORT_SYMBOL_GPL(rpc_wake_up_task);
449 
450 /*
451  * Wake up the next task on a priority queue.
452  */
453 static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queue)
454 {
455 	struct list_head *q;
456 	struct rpc_task *task;
457 
458 	/*
459 	 * Service a batch of tasks from a single owner.
460 	 */
461 	q = &queue->tasks[queue->priority];
462 	if (!list_empty(q)) {
463 		task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
464 		if (queue->owner == task->tk_owner) {
465 			if (--queue->nr)
466 				goto out;
467 			list_move_tail(&task->u.tk_wait.list, q);
468 		}
469 		/*
470 		 * Check if we need to switch queues.
471 		 */
472 		if (--queue->count)
473 			goto new_owner;
474 	}
475 
476 	/*
477 	 * Service the next queue.
478 	 */
479 	do {
480 		if (q == &queue->tasks[0])
481 			q = &queue->tasks[queue->maxpriority];
482 		else
483 			q = q - 1;
484 		if (!list_empty(q)) {
485 			task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
486 			goto new_queue;
487 		}
488 	} while (q != &queue->tasks[queue->priority]);
489 
490 	rpc_reset_waitqueue_priority(queue);
491 	return NULL;
492 
493 new_queue:
494 	rpc_set_waitqueue_priority(queue, (unsigned int)(q - &queue->tasks[0]));
495 new_owner:
496 	rpc_set_waitqueue_owner(queue, task->tk_owner);
497 out:
498 	__rpc_wake_up_task(task);
499 	return task;
500 }
501 
502 /*
503  * Wake up the next task on the wait queue.
504  */
505 struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue)
506 {
507 	struct rpc_task	*task = NULL;
508 
509 	dprintk("RPC:       wake_up_next(%p \"%s\")\n",
510 			queue, rpc_qname(queue));
511 	rcu_read_lock_bh();
512 	spin_lock(&queue->lock);
513 	if (RPC_IS_PRIORITY(queue))
514 		task = __rpc_wake_up_next_priority(queue);
515 	else {
516 		task_for_first(task, &queue->tasks[0])
517 			__rpc_wake_up_task(task);
518 	}
519 	spin_unlock(&queue->lock);
520 	rcu_read_unlock_bh();
521 
522 	return task;
523 }
524 EXPORT_SYMBOL_GPL(rpc_wake_up_next);
525 
526 /**
527  * rpc_wake_up - wake up all rpc_tasks
528  * @queue: rpc_wait_queue on which the tasks are sleeping
529  *
530  * Grabs queue->lock
531  */
532 void rpc_wake_up(struct rpc_wait_queue *queue)
533 {
534 	struct rpc_task *task, *next;
535 	struct list_head *head;
536 
537 	rcu_read_lock_bh();
538 	spin_lock(&queue->lock);
539 	head = &queue->tasks[queue->maxpriority];
540 	for (;;) {
541 		list_for_each_entry_safe(task, next, head, u.tk_wait.list)
542 			__rpc_wake_up_task(task);
543 		if (head == &queue->tasks[0])
544 			break;
545 		head--;
546 	}
547 	spin_unlock(&queue->lock);
548 	rcu_read_unlock_bh();
549 }
550 EXPORT_SYMBOL_GPL(rpc_wake_up);
551 
552 /**
553  * rpc_wake_up_status - wake up all rpc_tasks and set their status value.
554  * @queue: rpc_wait_queue on which the tasks are sleeping
555  * @status: status value to set
556  *
557  * Grabs queue->lock
558  */
559 void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
560 {
561 	struct rpc_task *task, *next;
562 	struct list_head *head;
563 
564 	rcu_read_lock_bh();
565 	spin_lock(&queue->lock);
566 	head = &queue->tasks[queue->maxpriority];
567 	for (;;) {
568 		list_for_each_entry_safe(task, next, head, u.tk_wait.list) {
569 			task->tk_status = status;
570 			__rpc_wake_up_task(task);
571 		}
572 		if (head == &queue->tasks[0])
573 			break;
574 		head--;
575 	}
576 	spin_unlock(&queue->lock);
577 	rcu_read_unlock_bh();
578 }
579 EXPORT_SYMBOL_GPL(rpc_wake_up_status);
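
/*
 * Sketch: when a transport-level error invalidates everything that is
 * waiting, the whole queue can be failed in one call.  The queue name
 * and -ENOTCONN are only examples; each task is woken with tk_status
 * already filled in, so its next tk_action sees the error immediately.
 *
 *	rpc_wake_up_status(&example_pending_queue, -ENOTCONN);
 */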
580 
581 static void __rpc_atrun(struct rpc_task *task)
582 {
583 	rpc_wake_up_task(task);
584 }
585 
586 /*
587  * Run a task at a later time
588  */
589 void rpc_delay(struct rpc_task *task, unsigned long delay)
590 {
591 	task->tk_timeout = delay;
592 	rpc_sleep_on(&delay_queue, task, NULL, __rpc_atrun);
593 }
594 EXPORT_SYMBOL_GPL(rpc_delay);
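
/*
 * Sketch of the usual backoff pattern (handler names hypothetical):
 * re-queue the state to retry and sleep for a quarter of a second.  The
 * delay is given in jiffies, and rpciod resumes the task in tk_action
 * once __rpc_atrun() has woken it.
 *
 *	static void example_retry_later(struct rpc_task *task)
 *	{
 *		task->tk_action = example_retry;
 *		rpc_delay(task, HZ >> 2);
 *	}
 */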
595 
596 /*
597  * Helper to call task->tk_ops->rpc_call_prepare
598  */
599 static void rpc_prepare_task(struct rpc_task *task)
600 {
601 	lock_kernel();
602 	task->tk_ops->rpc_call_prepare(task, task->tk_calldata);
603 	unlock_kernel();
604 }
605 
606 /*
607  * Helper that calls task->tk_ops->rpc_call_done if it exists
608  */
609 void rpc_exit_task(struct rpc_task *task)
610 {
611 	task->tk_action = NULL;
612 	if (task->tk_ops->rpc_call_done != NULL) {
613 		lock_kernel();
614 		task->tk_ops->rpc_call_done(task, task->tk_calldata);
615 		unlock_kernel();
616 		if (task->tk_action != NULL) {
617 			WARN_ON(RPC_ASSASSINATED(task));
618 			/* Always release the RPC slot and buffer memory */
619 			xprt_release(task);
620 		}
621 	}
622 }
623 EXPORT_SYMBOL_GPL(rpc_exit_task);
624 
625 void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata)
626 {
627 	if (ops->rpc_release != NULL) {
628 		lock_kernel();
629 		ops->rpc_release(calldata);
630 		unlock_kernel();
631 	}
632 }
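
/*
 * The three hooks invoked above come from the rpc_call_ops that the
 * task was created with.  A minimal asynchronous user might supply
 * something like the sketch below (names hypothetical); rpc_call_prepare
 * and rpc_release may simply be left NULL when they are not needed.
 *
 *	static void example_done(struct rpc_task *task, void *calldata)
 *	{
 *		struct example_req *req = calldata;
 *
 *		req->status = task->tk_status;
 *	}
 *
 *	static void example_release(void *calldata)
 *	{
 *		kfree(calldata);
 *	}
 *
 *	static const struct rpc_call_ops example_call_ops = {
 *		.rpc_call_done	= example_done,
 *		.rpc_release	= example_release,
 *	};
 */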
633 
634 /*
635  * This is the RPC `scheduler' (or rather, the finite state machine).
636  */
637 static void __rpc_execute(struct rpc_task *task)
638 {
639 	int		status = 0;
640 
641 	dprintk("RPC: %5u __rpc_execute flags=0x%x\n",
642 			task->tk_pid, task->tk_flags);
643 
644 	BUG_ON(RPC_IS_QUEUED(task));
645 
646 	for (;;) {
647 		/*
648 		 * Garbage collection of pending timers...
649 		 */
650 		rpc_delete_timer(task);
651 
652 		/*
653 		 * Execute any pending callback.
654 		 */
655 		if (RPC_DO_CALLBACK(task)) {
656 			/* Define a callback save pointer */
657 			void (*save_callback)(struct rpc_task *);
658 
659 			/*
660 			 * If a callback exists, save it, reset it and
661 			 * call it.  Resetting tk_callback before the
662 			 * call ensures that we do not wipe out any new
663 			 * callback the handler itself may set.
664 			 * - Dave
665 			 */
666 			save_callback = task->tk_callback;
667 			task->tk_callback = NULL;
668 			save_callback(task);
669 		}
670 
671 		/*
672 		 * Perform the next FSM step.
673 		 * tk_action may be NULL when the task has been killed
674 		 * by someone else.
675 		 */
676 		if (!RPC_IS_QUEUED(task)) {
677 			if (task->tk_action == NULL)
678 				break;
679 			task->tk_action(task);
680 		}
681 
682 		/*
683 		 * Lockless check for whether task is sleeping or not.
684 		 */
685 		if (!RPC_IS_QUEUED(task))
686 			continue;
687 		rpc_clear_running(task);
688 		if (RPC_IS_ASYNC(task)) {
689 			/* Careful! we may have raced... */
690 			if (RPC_IS_QUEUED(task))
691 				return;
692 			if (rpc_test_and_set_running(task))
693 				return;
694 			continue;
695 		}
696 
697 		/* sync task: sleep here */
698 		dprintk("RPC: %5u sync task going to sleep\n", task->tk_pid);
699 		status = out_of_line_wait_on_bit(&task->tk_runstate,
700 				RPC_TASK_QUEUED, rpc_wait_bit_killable,
701 				TASK_KILLABLE);
702 		if (status == -ERESTARTSYS) {
703 			/*
704 			 * When a sync task receives a signal, it exits with
705 			 * -ERESTARTSYS. In order to catch any callbacks that
706 			 * clean up after sleeping on some queue, we don't
707 			 * break the loop here, but go around once more.
708 			 */
709 			dprintk("RPC: %5u got signal\n", task->tk_pid);
710 			task->tk_flags |= RPC_TASK_KILLED;
711 			rpc_exit(task, -ERESTARTSYS);
712 			rpc_wake_up_task(task);
713 		}
714 		rpc_set_running(task);
715 		dprintk("RPC: %5u sync task resuming\n", task->tk_pid);
716 	}
717 
718 	dprintk("RPC: %5u return %d, status %d\n", task->tk_pid, status,
719 			task->tk_status);
720 	/* Release all resources associated with the task */
721 	rpc_release_task(task);
722 }
723 
724 /*
725  * User-visible entry point to the scheduler.
726  *
727  * This may be called recursively if e.g. an async NFS task updates
728  * the attributes and finds that dirty pages must be flushed.
729  * NOTE: Upon exit of this function the task is guaranteed to be
730  *	 released. In particular note that rpc_release_task() will have
731  *	 been called, so your task memory may have been freed.
732  */
733 void rpc_execute(struct rpc_task *task)
734 {
735 	rpc_set_active(task);
736 	rpc_set_running(task);
737 	__rpc_execute(task);
738 }
739 
740 static void rpc_async_schedule(struct work_struct *work)
741 {
742 	__rpc_execute(container_of(work, struct rpc_task, u.tk_work));
743 }
744 
745 struct rpc_buffer {
746 	size_t	len;
747 	char	data[];
748 };
749 
750 /**
751  * rpc_malloc - allocate an RPC buffer
752  * @task: RPC task that will use this buffer
753  * @size: requested byte size
754  *
755  * To prevent rpciod from hanging, this allocator never sleeps,
756  * returning NULL if the request cannot be serviced immediately.
757  * The caller can arrange to sleep in a way that is safe for rpciod.
758  *
759  * Most requests are 'small' (under 2KiB) and can be serviced from a
760  * mempool, ensuring that NFS reads and writes can always proceed,
761  * and that there is good locality of reference for these buffers.
762  *
763  * In order to avoid memory starvation triggering more writebacks of
764  * NFS requests, we avoid using GFP_KERNEL.
765  */
766 void *rpc_malloc(struct rpc_task *task, size_t size)
767 {
768 	struct rpc_buffer *buf;
769 	gfp_t gfp = RPC_IS_SWAPPER(task) ? GFP_ATOMIC : GFP_NOWAIT;
770 
771 	size += sizeof(struct rpc_buffer);
772 	if (size <= RPC_BUFFER_MAXSIZE)
773 		buf = mempool_alloc(rpc_buffer_mempool, gfp);
774 	else
775 		buf = kmalloc(size, gfp);
776 
777 	if (!buf)
778 		return NULL;
779 
780 	buf->len = size;
781 	dprintk("RPC: %5u allocated buffer of size %zu at %p\n",
782 			task->tk_pid, size, buf);
783 	return &buf->data;
784 }
785 EXPORT_SYMBOL_GPL(rpc_malloc);
786 
787 /**
788  * rpc_free - free buffer allocated via rpc_malloc
789  * @buffer: buffer to free
790  *
791  */
792 void rpc_free(void *buffer)
793 {
794 	size_t size;
795 	struct rpc_buffer *buf;
796 
797 	if (!buffer)
798 		return;
799 
800 	buf = container_of(buffer, struct rpc_buffer, data);
801 	size = buf->len;
802 
803 	dprintk("RPC:       freeing buffer of size %zu at %p\n",
804 			size, buf);
805 
806 	if (size <= RPC_BUFFER_MAXSIZE)
807 		mempool_free(buf, rpc_buffer_mempool);
808 	else
809 		kfree(buf);
810 }
811 EXPORT_SYMBOL_GPL(rpc_free);
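
/*
 * How the pair above is normally consumed (a sketch, with the field
 * list abridged): a transport installs them as the buffer allocator in
 * its rpc_xprt_ops, and the generic client code then sizes each request
 * buffer through those methods.  A NULL return from rpc_malloc() means
 * the caller should back off in an rpciod-safe way, for instance with
 * rpc_delay() above, rather than block.
 *
 *	.buf_alloc	= rpc_malloc,
 *	.buf_free	= rpc_free,
 */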
812 
813 /*
814  * Creation and deletion of RPC task structures
815  */
816 static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *task_setup_data)
817 {
818 	memset(task, 0, sizeof(*task));
819 	setup_timer(&task->tk_timer, (void (*)(unsigned long))rpc_run_timer,
820 			(unsigned long)task);
821 	atomic_set(&task->tk_count, 1);
822 	task->tk_flags  = task_setup_data->flags;
823 	task->tk_ops = task_setup_data->callback_ops;
824 	task->tk_calldata = task_setup_data->callback_data;
825 	INIT_LIST_HEAD(&task->tk_task);
826 
827 	/* Initialize retry counters */
828 	task->tk_garb_retry = 2;
829 	task->tk_cred_retry = 2;
830 
831 	task->tk_priority = task_setup_data->priority - RPC_PRIORITY_LOW;
832 	task->tk_owner = current->tgid;
833 
834 	/* Initialize workqueue for async tasks */
835 	task->tk_workqueue = rpciod_workqueue;
836 
837 	task->tk_client = task_setup_data->rpc_client;
838 	if (task->tk_client != NULL) {
839 		kref_get(&task->tk_client->cl_kref);
840 		if (task->tk_client->cl_softrtry)
841 			task->tk_flags |= RPC_TASK_SOFT;
842 	}
843 
844 	if (task->tk_ops->rpc_call_prepare != NULL)
845 		task->tk_action = rpc_prepare_task;
846 
847 	if (task_setup_data->rpc_message != NULL) {
848 		memcpy(&task->tk_msg, task_setup_data->rpc_message, sizeof(task->tk_msg));
849 		/* Bind the user cred */
850 		if (task->tk_msg.rpc_cred != NULL)
851 			rpcauth_holdcred(task);
852 		else
853 			rpcauth_bindcred(task);
854 		if (task->tk_action == NULL)
855 			rpc_call_start(task);
856 	}
857 
858 	/* starting timestamp */
859 	task->tk_start = jiffies;
860 
861 	dprintk("RPC:       new task initialized, procpid %u\n",
862 				task_pid_nr(current));
863 }
864 
865 static struct rpc_task *
866 rpc_alloc_task(void)
867 {
868 	return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS);
869 }
870 
871 static void rpc_free_task(struct rcu_head *rcu)
872 {
873 	struct rpc_task *task = container_of(rcu, struct rpc_task, u.tk_rcu);
874 	dprintk("RPC: %5u freeing task\n", task->tk_pid);
875 	mempool_free(task, rpc_task_mempool);
876 }
877 
878 /*
879  * Create a new task for the specified client.
880  */
881 struct rpc_task *rpc_new_task(const struct rpc_task_setup *setup_data)
882 {
883 	struct rpc_task	*task = setup_data->task;
884 	unsigned short flags = 0;
885 
886 	if (task == NULL) {
887 		task = rpc_alloc_task();
888 		if (task == NULL)
889 			goto out;
890 		flags = RPC_TASK_DYNAMIC;
891 	}
892 
893 	rpc_init_task(task, setup_data);
894 
895 	task->tk_flags |= flags;
896 	dprintk("RPC:       allocated task %p\n", task);
897 out:
898 	return task;
899 }
900 
901 
902 void rpc_put_task(struct rpc_task *task)
903 {
904 	const struct rpc_call_ops *tk_ops = task->tk_ops;
905 	void *calldata = task->tk_calldata;
906 
907 	if (!atomic_dec_and_test(&task->tk_count))
908 		return;
909 	/* Release resources */
910 	if (task->tk_rqstp)
911 		xprt_release(task);
912 	if (task->tk_msg.rpc_cred)
913 		rpcauth_unbindcred(task);
914 	if (task->tk_client) {
915 		rpc_release_client(task->tk_client);
916 		task->tk_client = NULL;
917 	}
918 	if (task->tk_flags & RPC_TASK_DYNAMIC)
919 		call_rcu_bh(&task->u.tk_rcu, rpc_free_task);
920 	rpc_release_calldata(tk_ops, calldata);
921 }
922 EXPORT_SYMBOL_GPL(rpc_put_task);
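
/*
 * Putting the pieces together, a sketch of the life cycle of a
 * dynamically allocated task (the client, message, ops and calldata
 * below are hypothetical).  rpc_new_task() hands back a task holding a
 * single reference; once rpc_execute() has run the task to completion,
 * rpc_release_task() drops that reference, so a caller that wants to
 * inspect the result afterwards must take an extra reference first and
 * drop it with rpc_put_task() when done, which is what rpc_run_task()
 * in clnt.c does.
 *
 *	struct rpc_task_setup setup = {
 *		.rpc_client	= clnt,
 *		.rpc_message	= &msg,
 *		.callback_ops	= &example_call_ops,
 *		.callback_data	= req,
 *		.flags		= RPC_TASK_ASYNC,
 *	};
 *	struct rpc_task *task;
 *
 *	task = rpc_new_task(&setup);
 *	if (task == NULL)
 *		return -ENOMEM;
 *	rpc_execute(task);
 */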
923 
924 static void rpc_release_task(struct rpc_task *task)
925 {
926 #ifdef RPC_DEBUG
927 	BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
928 #endif
929 	dprintk("RPC: %5u release task\n", task->tk_pid);
930 
931 	if (!list_empty(&task->tk_task)) {
932 		struct rpc_clnt *clnt = task->tk_client;
933 		/* Remove from client task list */
934 		spin_lock(&clnt->cl_lock);
935 		list_del(&task->tk_task);
936 		spin_unlock(&clnt->cl_lock);
937 	}
938 	BUG_ON(RPC_IS_QUEUED(task));
939 
940 	/* Synchronously delete any running timer */
941 	rpc_delete_timer(task);
942 
943 #ifdef RPC_DEBUG
944 	task->tk_magic = 0;
945 #endif
946 	/* Wake up anyone who is waiting for task completion */
947 	rpc_mark_complete_task(task);
948 
949 	rpc_put_task(task);
950 }
951 
952 /*
953  * Kill all tasks for the given client.
954  * XXX: kill their descendants as well?
955  */
956 void rpc_killall_tasks(struct rpc_clnt *clnt)
957 {
958 	struct rpc_task	*rovr;
959 
960 
961 	if (list_empty(&clnt->cl_tasks))
962 		return;
963 	dprintk("RPC:       killing all tasks for client %p\n", clnt);
964 	/*
965 	 * Spin lock all_tasks to prevent changes...
966 	 */
967 	spin_lock(&clnt->cl_lock);
968 	list_for_each_entry(rovr, &clnt->cl_tasks, tk_task) {
969 		if (!RPC_IS_ACTIVATED(rovr))
970 			continue;
971 		if (!(rovr->tk_flags & RPC_TASK_KILLED)) {
972 			rovr->tk_flags |= RPC_TASK_KILLED;
973 			rpc_exit(rovr, -EIO);
974 			rpc_wake_up_task(rovr);
975 		}
976 	}
977 	spin_unlock(&clnt->cl_lock);
978 }
979 EXPORT_SYMBOL_GPL(rpc_killall_tasks);
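
/*
 * Sketch: the usual caller is client shutdown, which repeatedly kills
 * and then waits for clnt->cl_tasks to drain before releasing the
 * client (see rpc_shutdown_client() in clnt.c).  Each victim is flagged
 * RPC_TASK_KILLED, forced to exit with -EIO and then woken so that its
 * state machine can unwind and release its resources.
 *
 *	rpc_killall_tasks(clnt);
 */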
980 
981 int rpciod_up(void)
982 {
983 	return try_module_get(THIS_MODULE) ? 0 : -EINVAL;
984 }
985 
986 void rpciod_down(void)
987 {
988 	module_put(THIS_MODULE);
989 }
990 
991 /*
992  * Start up the rpciod workqueue.
993  */
994 static int rpciod_start(void)
995 {
996 	struct workqueue_struct *wq;
997 
998 	/*
999 	 * Create the rpciod workqueue and wait for it to start.
1000 	 */
1001 	dprintk("RPC:       creating workqueue rpciod\n");
1002 	wq = create_workqueue("rpciod");
1003 	rpciod_workqueue = wq;
1004 	return rpciod_workqueue != NULL;
1005 }
1006 
1007 static void rpciod_stop(void)
1008 {
1009 	struct workqueue_struct *wq = NULL;
1010 
1011 	if (rpciod_workqueue == NULL)
1012 		return;
1013 	dprintk("RPC:       destroying workqueue rpciod\n");
1014 
1015 	wq = rpciod_workqueue;
1016 	rpciod_workqueue = NULL;
1017 	destroy_workqueue(wq);
1018 }
1019 
1020 void
1021 rpc_destroy_mempool(void)
1022 {
1023 	rpciod_stop();
1024 	if (rpc_buffer_mempool)
1025 		mempool_destroy(rpc_buffer_mempool);
1026 	if (rpc_task_mempool)
1027 		mempool_destroy(rpc_task_mempool);
1028 	if (rpc_task_slabp)
1029 		kmem_cache_destroy(rpc_task_slabp);
1030 	if (rpc_buffer_slabp)
1031 		kmem_cache_destroy(rpc_buffer_slabp);
1032 }
1033 
1034 int
1035 rpc_init_mempool(void)
1036 {
1037 	rpc_task_slabp = kmem_cache_create("rpc_tasks",
1038 					     sizeof(struct rpc_task),
1039 					     0, SLAB_HWCACHE_ALIGN,
1040 					     NULL);
1041 	if (!rpc_task_slabp)
1042 		goto err_nomem;
1043 	rpc_buffer_slabp = kmem_cache_create("rpc_buffers",
1044 					     RPC_BUFFER_MAXSIZE,
1045 					     0, SLAB_HWCACHE_ALIGN,
1046 					     NULL);
1047 	if (!rpc_buffer_slabp)
1048 		goto err_nomem;
1049 	rpc_task_mempool = mempool_create_slab_pool(RPC_TASK_POOLSIZE,
1050 						    rpc_task_slabp);
1051 	if (!rpc_task_mempool)
1052 		goto err_nomem;
1053 	rpc_buffer_mempool = mempool_create_slab_pool(RPC_BUFFER_POOLSIZE,
1054 						      rpc_buffer_slabp);
1055 	if (!rpc_buffer_mempool)
1056 		goto err_nomem;
1057 	if (!rpciod_start())
1058 		goto err_nomem;
1059 	/*
1060 	 * The following is not strictly a mempool initialisation,
1061 	 * but there is no harm in doing it here
1062 	 */
1063 	rpc_init_wait_queue(&delay_queue, "delayq");
1064 	return 0;
1065 err_nomem:
1066 	rpc_destroy_mempool();
1067 	return -ENOMEM;
1068 }
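
/*
 * These two routines are paired with the sunrpc module's init and exit
 * paths (init_sunrpc()/cleanup_sunrpc() in sunrpc_syms.c), roughly:
 *
 *	err = rpc_init_mempool();
 *	if (err)
 *		goto out;
 *
 * with rpc_destroy_mempool() called from the corresponding exit path,
 * so that rpciod and the delay queue exist before any RPC client is
 * created and are torn down only after the last one is gone.
 */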
1069