xref: /openbmc/linux/net/sunrpc/sched.c (revision 63dc02bd)
1 /*
2  * linux/net/sunrpc/sched.c
3  *
4  * Scheduling for synchronous and asynchronous RPC requests.
5  *
6  * Copyright (C) 1996 Olaf Kirch, <okir@monad.swb.de>
7  *
8  * TCP NFS related read + write fixes
9  * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
10  */
11 
12 #include <linux/module.h>
13 
14 #include <linux/sched.h>
15 #include <linux/interrupt.h>
16 #include <linux/slab.h>
17 #include <linux/mempool.h>
18 #include <linux/smp.h>
19 #include <linux/spinlock.h>
20 #include <linux/mutex.h>
21 #include <linux/freezer.h>
22 
23 #include <linux/sunrpc/clnt.h>
24 
25 #include "sunrpc.h"
26 
27 #ifdef RPC_DEBUG
28 #define RPCDBG_FACILITY		RPCDBG_SCHED
29 #endif
30 
31 #define CREATE_TRACE_POINTS
32 #include <trace/events/sunrpc.h>
33 
34 /*
35  * RPC slabs and memory pools
36  */
37 #define RPC_BUFFER_MAXSIZE	(2048)
38 #define RPC_BUFFER_POOLSIZE	(8)
39 #define RPC_TASK_POOLSIZE	(8)
40 static struct kmem_cache	*rpc_task_slabp __read_mostly;
41 static struct kmem_cache	*rpc_buffer_slabp __read_mostly;
42 static mempool_t	*rpc_task_mempool __read_mostly;
43 static mempool_t	*rpc_buffer_mempool __read_mostly;
44 
45 static void			rpc_async_schedule(struct work_struct *);
46 static void			 rpc_release_task(struct rpc_task *task);
47 static void __rpc_queue_timer_fn(unsigned long ptr);
48 
49 /*
50  * RPC tasks sit here while waiting for conditions to improve.
51  */
52 static struct rpc_wait_queue delay_queue;
53 
54 /*
55  * rpciod-related stuff
56  */
57 struct workqueue_struct *rpciod_workqueue;
58 
59 /*
60  * Disable the timer for a given RPC task. Should be called with
61  * queue->lock and bh_disabled in order to avoid races within
62  * rpc_run_timer().
63  */
64 static void
65 __rpc_disable_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
66 {
67 	if (task->tk_timeout == 0)
68 		return;
69 	dprintk("RPC: %5u disabling timer\n", task->tk_pid);
70 	task->tk_timeout = 0;
71 	list_del(&task->u.tk_wait.timer_list);
72 	if (list_empty(&queue->timer_list.list))
73 		del_timer(&queue->timer_list.timer);
74 }
75 
76 static void
77 rpc_set_queue_timer(struct rpc_wait_queue *queue, unsigned long expires)
78 {
79 	queue->timer_list.expires = expires;
80 	mod_timer(&queue->timer_list.timer, expires);
81 }
82 
83 /*
84  * Set up a timer for the current task.
85  */
86 static void
87 __rpc_add_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
88 {
89 	if (!task->tk_timeout)
90 		return;
91 
92 	dprintk("RPC: %5u setting alarm for %lu ms\n",
93 			task->tk_pid, task->tk_timeout * 1000 / HZ);
94 
95 	task->u.tk_wait.expires = jiffies + task->tk_timeout;
96 	if (list_empty(&queue->timer_list.list) || time_before(task->u.tk_wait.expires, queue->timer_list.expires))
97 		rpc_set_queue_timer(queue, task->u.tk_wait.expires);
98 	list_add(&task->u.tk_wait.timer_list, &queue->timer_list.list);
99 }
100 
101 /*
102  * Add new request to a priority queue.
103  */
104 static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue,
105 		struct rpc_task *task,
106 		unsigned char queue_priority)
107 {
108 	struct list_head *q;
109 	struct rpc_task *t;
110 
111 	INIT_LIST_HEAD(&task->u.tk_wait.links);
112 	q = &queue->tasks[queue_priority];
113 	if (unlikely(queue_priority > queue->maxpriority))
114 		q = &queue->tasks[queue->maxpriority];
115 	list_for_each_entry(t, q, u.tk_wait.list) {
116 		if (t->tk_owner == task->tk_owner) {
117 			list_add_tail(&task->u.tk_wait.list, &t->u.tk_wait.links);
118 			return;
119 		}
120 	}
121 	list_add_tail(&task->u.tk_wait.list, q);
122 }
123 
124 /*
125  * Add new request to wait queue.
126  *
127  * Swapper tasks always get inserted at the head of the queue.
128  * This should avoid many nasty memory deadlocks and hopefully
129  * improve overall performance.
130  * Everyone else gets appended to the queue to ensure proper FIFO behavior.
131  */
132 static void __rpc_add_wait_queue(struct rpc_wait_queue *queue,
133 		struct rpc_task *task,
134 		unsigned char queue_priority)
135 {
136 	BUG_ON (RPC_IS_QUEUED(task));
137 
138 	if (RPC_IS_PRIORITY(queue))
139 		__rpc_add_wait_queue_priority(queue, task, queue_priority);
140 	else if (RPC_IS_SWAPPER(task))
141 		list_add(&task->u.tk_wait.list, &queue->tasks[0]);
142 	else
143 		list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
144 	task->tk_waitqueue = queue;
145 	queue->qlen++;
146 	rpc_set_queued(task);
147 
148 	dprintk("RPC: %5u added to queue %p \"%s\"\n",
149 			task->tk_pid, queue, rpc_qname(queue));
150 }
151 
152 /*
153  * Remove request from a priority queue.
154  */
155 static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
156 {
157 	struct rpc_task *t;
158 
159 	if (!list_empty(&task->u.tk_wait.links)) {
160 		t = list_entry(task->u.tk_wait.links.next, struct rpc_task, u.tk_wait.list);
161 		list_move(&t->u.tk_wait.list, &task->u.tk_wait.list);
162 		list_splice_init(&task->u.tk_wait.links, &t->u.tk_wait.links);
163 	}
164 }
165 
166 /*
167  * Remove request from queue.
168  * Note: must be called with spin lock held.
169  */
170 static void __rpc_remove_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
171 {
172 	__rpc_disable_timer(queue, task);
173 	if (RPC_IS_PRIORITY(queue))
174 		__rpc_remove_wait_queue_priority(task);
175 	list_del(&task->u.tk_wait.list);
176 	queue->qlen--;
177 	dprintk("RPC: %5u removed from queue %p \"%s\"\n",
178 			task->tk_pid, queue, rpc_qname(queue));
179 }
180 
181 static inline void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority)
182 {
183 	queue->priority = priority;
184 	queue->count = 1 << (priority * 2);
185 }
186 
187 static inline void rpc_set_waitqueue_owner(struct rpc_wait_queue *queue, pid_t pid)
188 {
189 	queue->owner = pid;
190 	queue->nr = RPC_BATCH_COUNT;
191 }
192 
193 static inline void rpc_reset_waitqueue_priority(struct rpc_wait_queue *queue)
194 {
195 	rpc_set_waitqueue_priority(queue, queue->maxpriority);
196 	rpc_set_waitqueue_owner(queue, 0);
197 }
198 
199 static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, unsigned char nr_queues)
200 {
201 	int i;
202 
203 	spin_lock_init(&queue->lock);
204 	for (i = 0; i < ARRAY_SIZE(queue->tasks); i++)
205 		INIT_LIST_HEAD(&queue->tasks[i]);
206 	queue->maxpriority = nr_queues - 1;
207 	rpc_reset_waitqueue_priority(queue);
208 	queue->qlen = 0;
209 	setup_timer(&queue->timer_list.timer, __rpc_queue_timer_fn, (unsigned long)queue);
210 	INIT_LIST_HEAD(&queue->timer_list.list);
211 	rpc_assign_waitqueue_name(queue, qname);
212 }
213 
214 void rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname)
215 {
216 	__rpc_init_priority_wait_queue(queue, qname, RPC_NR_PRIORITY);
217 }
218 EXPORT_SYMBOL_GPL(rpc_init_priority_wait_queue);
219 
220 void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
221 {
222 	__rpc_init_priority_wait_queue(queue, qname, 1);
223 }
224 EXPORT_SYMBOL_GPL(rpc_init_wait_queue);
225 
226 void rpc_destroy_wait_queue(struct rpc_wait_queue *queue)
227 {
228 	del_timer_sync(&queue->timer_list.timer);
229 }
230 EXPORT_SYMBOL_GPL(rpc_destroy_wait_queue);
231 
232 static int rpc_wait_bit_killable(void *word)
233 {
234 	if (fatal_signal_pending(current))
235 		return -ERESTARTSYS;
236 	freezable_schedule();
237 	return 0;
238 }
239 
240 #ifdef RPC_DEBUG
241 static void rpc_task_set_debuginfo(struct rpc_task *task)
242 {
243 	static atomic_t rpc_pid;
244 
245 	task->tk_pid = atomic_inc_return(&rpc_pid);
246 }
247 #else
248 static inline void rpc_task_set_debuginfo(struct rpc_task *task)
249 {
250 }
251 #endif
252 
253 static void rpc_set_active(struct rpc_task *task)
254 {
255 	trace_rpc_task_begin(task->tk_client, task, NULL);
256 
257 	rpc_task_set_debuginfo(task);
258 	set_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
259 }
260 
261 /*
262  * Mark an RPC call as having completed by clearing the 'active' bit
263  * and then waking up all tasks that were sleeping.
264  */
265 static int rpc_complete_task(struct rpc_task *task)
266 {
267 	void *m = &task->tk_runstate;
268 	wait_queue_head_t *wq = bit_waitqueue(m, RPC_TASK_ACTIVE);
269 	struct wait_bit_key k = __WAIT_BIT_KEY_INITIALIZER(m, RPC_TASK_ACTIVE);
270 	unsigned long flags;
271 	int ret;
272 
273 	trace_rpc_task_complete(task->tk_client, task, NULL);
274 
275 	spin_lock_irqsave(&wq->lock, flags);
276 	clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
277 	ret = atomic_dec_and_test(&task->tk_count);
278 	if (waitqueue_active(wq))
279 		__wake_up_locked_key(wq, TASK_NORMAL, &k);
280 	spin_unlock_irqrestore(&wq->lock, flags);
281 	return ret;
282 }
283 
284 /*
285  * Allow callers to wait for completion of an RPC call
286  *
287  * Note the use of out_of_line_wait_on_bit() rather than wait_on_bit()
288  * to enforce taking of the wq->lock and hence avoid races with
289  * rpc_complete_task().
290  */
291 int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
292 {
293 	if (action == NULL)
294 		action = rpc_wait_bit_killable;
295 	return out_of_line_wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
296 			action, TASK_KILLABLE);
297 }
298 EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);
299 
300 /*
301  * Make an RPC task runnable.
302  *
303  * Note: If the task is ASYNC, this must be called with
304  * the spinlock held to protect the wait queue operation.
305  */
306 static void rpc_make_runnable(struct rpc_task *task)
307 {
308 	rpc_clear_queued(task);
309 	if (rpc_test_and_set_running(task))
310 		return;
311 	if (RPC_IS_ASYNC(task)) {
312 		INIT_WORK(&task->u.tk_work, rpc_async_schedule);
313 		queue_work(rpciod_workqueue, &task->u.tk_work);
314 	} else
315 		wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
316 }
317 
318 /*
319  * Prepare for sleeping on a wait queue.
320  * By always appending tasks to the list we ensure FIFO behavior.
321  * NB: An RPC task will only receive interrupt-driven events as long
322  * as it's on a wait queue.
323  */
324 static void __rpc_sleep_on_priority(struct rpc_wait_queue *q,
325 		struct rpc_task *task,
326 		rpc_action action,
327 		unsigned char queue_priority)
328 {
329 	dprintk("RPC: %5u sleep_on(queue \"%s\" time %lu)\n",
330 			task->tk_pid, rpc_qname(q), jiffies);
331 
332 	trace_rpc_task_sleep(task->tk_client, task, q);
333 
334 	__rpc_add_wait_queue(q, task, queue_priority);
335 
336 	BUG_ON(task->tk_callback != NULL);
337 	task->tk_callback = action;
338 	__rpc_add_timer(q, task);
339 }
340 
341 void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
342 				rpc_action action)
343 {
344 	/* We shouldn't ever put an inactive task to sleep */
345 	BUG_ON(!RPC_IS_ACTIVATED(task));
346 
347 	/*
348 	 * Protect the queue operations.
349 	 */
350 	spin_lock_bh(&q->lock);
351 	__rpc_sleep_on_priority(q, task, action, task->tk_priority);
352 	spin_unlock_bh(&q->lock);
353 }
354 EXPORT_SYMBOL_GPL(rpc_sleep_on);
355 
356 void rpc_sleep_on_priority(struct rpc_wait_queue *q, struct rpc_task *task,
357 		rpc_action action, int priority)
358 {
359 	/* We shouldn't ever put an inactive task to sleep */
360 	BUG_ON(!RPC_IS_ACTIVATED(task));
361 
362 	/*
363 	 * Protect the queue operations.
364 	 */
365 	spin_lock_bh(&q->lock);
366 	__rpc_sleep_on_priority(q, task, action, priority - RPC_PRIORITY_LOW);
367 	spin_unlock_bh(&q->lock);
368 }
369 
370 /**
371  * __rpc_do_wake_up_task - wake up a single rpc_task
372  * @queue: wait queue
373  * @task: task to be woken up
374  *
375  * Caller must hold queue->lock, and have cleared the task queued flag.
376  */
377 static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task *task)
378 {
379 	dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n",
380 			task->tk_pid, jiffies);
381 
382 	/* Has the task been executed yet? If not, we cannot wake it up! */
383 	if (!RPC_IS_ACTIVATED(task)) {
384 		printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
385 		return;
386 	}
387 
388 	trace_rpc_task_wakeup(task->tk_client, task, queue);
389 
390 	__rpc_remove_wait_queue(queue, task);
391 
392 	rpc_make_runnable(task);
393 
394 	dprintk("RPC:       __rpc_wake_up_task done\n");
395 }
396 
397 /*
398  * Wake up a queued task while the queue lock is being held
399  */
400 static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task)
401 {
402 	if (RPC_IS_QUEUED(task) && task->tk_waitqueue == queue)
403 		__rpc_do_wake_up_task(queue, task);
404 }
405 
406 /*
407  * Tests whether rpc queue is empty
408  */
409 int rpc_queue_empty(struct rpc_wait_queue *queue)
410 {
411 	int res;
412 
413 	spin_lock_bh(&queue->lock);
414 	res = queue->qlen;
415 	spin_unlock_bh(&queue->lock);
416 	return res == 0;
417 }
418 EXPORT_SYMBOL_GPL(rpc_queue_empty);
419 
420 /*
421  * Wake up a task on a specific queue
422  */
423 void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task)
424 {
425 	spin_lock_bh(&queue->lock);
426 	rpc_wake_up_task_queue_locked(queue, task);
427 	spin_unlock_bh(&queue->lock);
428 }
429 EXPORT_SYMBOL_GPL(rpc_wake_up_queued_task);
430 
431 /*
432  * Wake up the next task on a priority queue.
433  */
434 static struct rpc_task *__rpc_find_next_queued_priority(struct rpc_wait_queue *queue)
435 {
436 	struct list_head *q;
437 	struct rpc_task *task;
438 
439 	/*
440 	 * Service a batch of tasks from a single owner.
441 	 */
442 	q = &queue->tasks[queue->priority];
443 	if (!list_empty(q)) {
444 		task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
445 		if (queue->owner == task->tk_owner) {
446 			if (--queue->nr)
447 				goto out;
448 			list_move_tail(&task->u.tk_wait.list, q);
449 		}
450 		/*
451 		 * Check if we need to switch queues.
452 		 */
453 		if (--queue->count)
454 			goto new_owner;
455 	}
456 
457 	/*
458 	 * Service the next queue.
459 	 */
460 	do {
461 		if (q == &queue->tasks[0])
462 			q = &queue->tasks[queue->maxpriority];
463 		else
464 			q = q - 1;
465 		if (!list_empty(q)) {
466 			task = list_entry(q->next, struct rpc_task, u.tk_wait.list);
467 			goto new_queue;
468 		}
469 	} while (q != &queue->tasks[queue->priority]);
470 
471 	rpc_reset_waitqueue_priority(queue);
472 	return NULL;
473 
474 new_queue:
475 	rpc_set_waitqueue_priority(queue, (unsigned int)(q - &queue->tasks[0]));
476 new_owner:
477 	rpc_set_waitqueue_owner(queue, task->tk_owner);
478 out:
479 	return task;
480 }
481 
482 static struct rpc_task *__rpc_find_next_queued(struct rpc_wait_queue *queue)
483 {
484 	if (RPC_IS_PRIORITY(queue))
485 		return __rpc_find_next_queued_priority(queue);
486 	if (!list_empty(&queue->tasks[0]))
487 		return list_first_entry(&queue->tasks[0], struct rpc_task, u.tk_wait.list);
488 	return NULL;
489 }
490 
491 /*
492  * Wake up the first task on the wait queue.
493  */
494 struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
495 		bool (*func)(struct rpc_task *, void *), void *data)
496 {
497 	struct rpc_task	*task = NULL;
498 
499 	dprintk("RPC:       wake_up_first(%p \"%s\")\n",
500 			queue, rpc_qname(queue));
501 	spin_lock_bh(&queue->lock);
502 	task = __rpc_find_next_queued(queue);
503 	if (task != NULL) {
504 		if (func(task, data))
505 			rpc_wake_up_task_queue_locked(queue, task);
506 		else
507 			task = NULL;
508 	}
509 	spin_unlock_bh(&queue->lock);
510 
511 	return task;
512 }
513 EXPORT_SYMBOL_GPL(rpc_wake_up_first);
514 
515 static bool rpc_wake_up_next_func(struct rpc_task *task, void *data)
516 {
517 	return true;
518 }
519 
520 /*
521  * Wake up the next task on the wait queue.
522 */
523 struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *queue)
524 {
525 	return rpc_wake_up_first(queue, rpc_wake_up_next_func, NULL);
526 }
527 EXPORT_SYMBOL_GPL(rpc_wake_up_next);
528 
529 /**
530  * rpc_wake_up - wake up all rpc_tasks
531  * @queue: rpc_wait_queue on which the tasks are sleeping
532  *
533  * Grabs queue->lock
534  */
535 void rpc_wake_up(struct rpc_wait_queue *queue)
536 {
537 	struct list_head *head;
538 
539 	spin_lock_bh(&queue->lock);
540 	head = &queue->tasks[queue->maxpriority];
541 	for (;;) {
542 		while (!list_empty(head)) {
543 			struct rpc_task *task;
544 			task = list_first_entry(head,
545 					struct rpc_task,
546 					u.tk_wait.list);
547 			rpc_wake_up_task_queue_locked(queue, task);
548 		}
549 		if (head == &queue->tasks[0])
550 			break;
551 		head--;
552 	}
553 	spin_unlock_bh(&queue->lock);
554 }
555 EXPORT_SYMBOL_GPL(rpc_wake_up);
556 
557 /**
558  * rpc_wake_up_status - wake up all rpc_tasks and set their status value.
559  * @queue: rpc_wait_queue on which the tasks are sleeping
560  * @status: status value to set
561  *
562  * Grabs queue->lock
563  */
564 void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
565 {
566 	struct list_head *head;
567 
568 	spin_lock_bh(&queue->lock);
569 	head = &queue->tasks[queue->maxpriority];
570 	for (;;) {
571 		while (!list_empty(head)) {
572 			struct rpc_task *task;
573 			task = list_first_entry(head,
574 					struct rpc_task,
575 					u.tk_wait.list);
576 			task->tk_status = status;
577 			rpc_wake_up_task_queue_locked(queue, task);
578 		}
579 		if (head == &queue->tasks[0])
580 			break;
581 		head--;
582 	}
583 	spin_unlock_bh(&queue->lock);
584 }
585 EXPORT_SYMBOL_GPL(rpc_wake_up_status);
586 
587 static void __rpc_queue_timer_fn(unsigned long ptr)
588 {
589 	struct rpc_wait_queue *queue = (struct rpc_wait_queue *)ptr;
590 	struct rpc_task *task, *n;
591 	unsigned long expires, now, timeo;
592 
593 	spin_lock(&queue->lock);
594 	expires = now = jiffies;
595 	list_for_each_entry_safe(task, n, &queue->timer_list.list, u.tk_wait.timer_list) {
596 		timeo = task->u.tk_wait.expires;
597 		if (time_after_eq(now, timeo)) {
598 			dprintk("RPC: %5u timeout\n", task->tk_pid);
599 			task->tk_status = -ETIMEDOUT;
600 			rpc_wake_up_task_queue_locked(queue, task);
601 			continue;
602 		}
603 		if (expires == now || time_after(expires, timeo))
604 			expires = timeo;
605 	}
606 	if (!list_empty(&queue->timer_list.list))
607 		rpc_set_queue_timer(queue, expires);
608 	spin_unlock(&queue->lock);
609 }
610 
611 static void __rpc_atrun(struct rpc_task *task)
612 {
613 	task->tk_status = 0;
614 }
615 
616 /*
617  * Run a task at a later time
618  */
619 void rpc_delay(struct rpc_task *task, unsigned long delay)
620 {
621 	task->tk_timeout = delay;
622 	rpc_sleep_on(&delay_queue, task, __rpc_atrun);
623 }
624 EXPORT_SYMBOL_GPL(rpc_delay);
625 
626 /*
627  * Helper to call task->tk_ops->rpc_call_prepare
628  */
629 void rpc_prepare_task(struct rpc_task *task)
630 {
631 	task->tk_ops->rpc_call_prepare(task, task->tk_calldata);
632 }
633 
634 static void
635 rpc_init_task_statistics(struct rpc_task *task)
636 {
637 	/* Initialize retry counters */
638 	task->tk_garb_retry = 2;
639 	task->tk_cred_retry = 2;
640 	task->tk_rebind_retry = 2;
641 
642 	/* starting timestamp */
643 	task->tk_start = ktime_get();
644 }
645 
646 static void
647 rpc_reset_task_statistics(struct rpc_task *task)
648 {
649 	task->tk_timeouts = 0;
650 	task->tk_flags &= ~(RPC_CALL_MAJORSEEN|RPC_TASK_KILLED|RPC_TASK_SENT);
651 
652 	rpc_init_task_statistics(task);
653 }
654 
655 /*
656  * Helper that calls task->tk_ops->rpc_call_done if it exists
657  */
658 void rpc_exit_task(struct rpc_task *task)
659 {
660 	task->tk_action = NULL;
661 	if (task->tk_ops->rpc_call_done != NULL) {
662 		task->tk_ops->rpc_call_done(task, task->tk_calldata);
663 		if (task->tk_action != NULL) {
664 			WARN_ON(RPC_ASSASSINATED(task));
665 			/* Always release the RPC slot and buffer memory */
666 			xprt_release(task);
667 			rpc_reset_task_statistics(task);
668 		}
669 	}
670 }
671 
672 void rpc_exit(struct rpc_task *task, int status)
673 {
674 	task->tk_status = status;
675 	task->tk_action = rpc_exit_task;
676 	if (RPC_IS_QUEUED(task))
677 		rpc_wake_up_queued_task(task->tk_waitqueue, task);
678 }
679 EXPORT_SYMBOL_GPL(rpc_exit);
680 
681 void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata)
682 {
683 	if (ops->rpc_release != NULL)
684 		ops->rpc_release(calldata);
685 }
686 
687 /*
688  * This is the RPC `scheduler' (or rather, the finite state machine).
689  */
690 static void __rpc_execute(struct rpc_task *task)
691 {
692 	struct rpc_wait_queue *queue;
693 	int task_is_async = RPC_IS_ASYNC(task);
694 	int status = 0;
695 
696 	dprintk("RPC: %5u __rpc_execute flags=0x%x\n",
697 			task->tk_pid, task->tk_flags);
698 
699 	BUG_ON(RPC_IS_QUEUED(task));
700 
701 	for (;;) {
702 		void (*do_action)(struct rpc_task *);
703 
704 		/*
705 		 * Execute any pending callback first.
706 		 */
707 		do_action = task->tk_callback;
708 		task->tk_callback = NULL;
709 		if (do_action == NULL) {
710 			/*
711 			 * Perform the next FSM step.
712 			 * tk_action may be NULL if the task has been killed.
713 			 * In particular, note that rpc_killall_tasks may
714 			 * do this at any time, so beware when dereferencing.
715 			 */
716 			do_action = task->tk_action;
717 			if (do_action == NULL)
718 				break;
719 		}
720 		trace_rpc_task_run_action(task->tk_client, task, task->tk_action);
721 		do_action(task);
722 
723 		/*
724 		 * Lockless check for whether task is sleeping or not.
725 		 */
726 		if (!RPC_IS_QUEUED(task))
727 			continue;
728 		/*
729 		 * The queue->lock protects against races with
730 		 * rpc_make_runnable().
731 		 *
732 		 * Note that once we clear RPC_TASK_RUNNING on an asynchronous
733 		 * rpc_task, rpc_make_runnable() can assign it to a
734 		 * different workqueue. We therefore cannot assume that the
735 		 * rpc_task pointer may still be dereferenced.
736 		 */
737 		queue = task->tk_waitqueue;
738 		spin_lock_bh(&queue->lock);
739 		if (!RPC_IS_QUEUED(task)) {
740 			spin_unlock_bh(&queue->lock);
741 			continue;
742 		}
743 		rpc_clear_running(task);
744 		spin_unlock_bh(&queue->lock);
745 		if (task_is_async)
746 			return;
747 
748 		/* sync task: sleep here */
749 		dprintk("RPC: %5u sync task going to sleep\n", task->tk_pid);
750 		status = out_of_line_wait_on_bit(&task->tk_runstate,
751 				RPC_TASK_QUEUED, rpc_wait_bit_killable,
752 				TASK_KILLABLE);
753 		if (status == -ERESTARTSYS) {
754 			/*
755 			 * When a sync task receives a signal, it exits with
756 			 * -ERESTARTSYS. In order to catch any callbacks that
757 			 * clean up after sleeping on some queue, we don't
758 			 * break the loop here, but go around once more.
759 			 */
760 			dprintk("RPC: %5u got signal\n", task->tk_pid);
761 			task->tk_flags |= RPC_TASK_KILLED;
762 			rpc_exit(task, -ERESTARTSYS);
763 		}
764 		rpc_set_running(task);
765 		dprintk("RPC: %5u sync task resuming\n", task->tk_pid);
766 	}
767 
768 	dprintk("RPC: %5u return %d, status %d\n", task->tk_pid, status,
769 			task->tk_status);
770 	/* Release all resources associated with the task */
771 	rpc_release_task(task);
772 }
773 
774 /*
775  * User-visible entry point to the scheduler.
776  *
777  * This may be called recursively if e.g. an async NFS task updates
778  * the attributes and finds that dirty pages must be flushed.
779  * NOTE: Upon exit of this function the task is guaranteed to be
780  *	 released. In particular note that tk_release() will have
781  *	 been called, so your task memory may have been freed.
782  */
783 void rpc_execute(struct rpc_task *task)
784 {
785 	rpc_set_active(task);
786 	rpc_make_runnable(task);
787 	if (!RPC_IS_ASYNC(task))
788 		__rpc_execute(task);
789 }
790 
791 static void rpc_async_schedule(struct work_struct *work)
792 {
793 	__rpc_execute(container_of(work, struct rpc_task, u.tk_work));
794 }
795 
796 /**
797  * rpc_malloc - allocate an RPC buffer
798  * @task: RPC task that will use this buffer
799  * @size: requested byte size
800  *
801  * To prevent rpciod from hanging, this allocator never sleeps,
802  * returning NULL if the request cannot be serviced immediately.
803  * The caller can arrange to sleep in a way that is safe for rpciod.
804  *
805  * Most requests are 'small' (under 2KiB) and can be serviced from a
806  * mempool, ensuring that NFS reads and writes can always proceed,
807  * and that there is good locality of reference for these buffers.
808  *
809  * In order to avoid memory starvation triggering more writebacks of
810  * NFS requests, we avoid using GFP_KERNEL.
811  */
812 void *rpc_malloc(struct rpc_task *task, size_t size)
813 {
814 	struct rpc_buffer *buf;
815 	gfp_t gfp = RPC_IS_SWAPPER(task) ? GFP_ATOMIC : GFP_NOWAIT;
816 
817 	size += sizeof(struct rpc_buffer);
818 	if (size <= RPC_BUFFER_MAXSIZE)
819 		buf = mempool_alloc(rpc_buffer_mempool, gfp);
820 	else
821 		buf = kmalloc(size, gfp);
822 
823 	if (!buf)
824 		return NULL;
825 
826 	buf->len = size;
827 	dprintk("RPC: %5u allocated buffer of size %zu at %p\n",
828 			task->tk_pid, size, buf);
829 	return &buf->data;
830 }
831 EXPORT_SYMBOL_GPL(rpc_malloc);
832 
833 /**
834  * rpc_free - free buffer allocated via rpc_malloc
835  * @buffer: buffer to free
836  *
837  */
838 void rpc_free(void *buffer)
839 {
840 	size_t size;
841 	struct rpc_buffer *buf;
842 
843 	if (!buffer)
844 		return;
845 
846 	buf = container_of(buffer, struct rpc_buffer, data);
847 	size = buf->len;
848 
849 	dprintk("RPC:       freeing buffer of size %zu at %p\n",
850 			size, buf);
851 
852 	if (size <= RPC_BUFFER_MAXSIZE)
853 		mempool_free(buf, rpc_buffer_mempool);
854 	else
855 		kfree(buf);
856 }
857 EXPORT_SYMBOL_GPL(rpc_free);
858 
859 /*
860  * Creation and deletion of RPC task structures
861  */
862 static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *task_setup_data)
863 {
864 	memset(task, 0, sizeof(*task));
865 	atomic_set(&task->tk_count, 1);
866 	task->tk_flags  = task_setup_data->flags;
867 	task->tk_ops = task_setup_data->callback_ops;
868 	task->tk_calldata = task_setup_data->callback_data;
869 	INIT_LIST_HEAD(&task->tk_task);
870 
871 	task->tk_priority = task_setup_data->priority - RPC_PRIORITY_LOW;
872 	task->tk_owner = current->tgid;
873 
874 	/* Initialize workqueue for async tasks */
875 	task->tk_workqueue = task_setup_data->workqueue;
876 
877 	if (task->tk_ops->rpc_call_prepare != NULL)
878 		task->tk_action = rpc_prepare_task;
879 
880 	rpc_init_task_statistics(task);
881 
882 	dprintk("RPC:       new task initialized, procpid %u\n",
883 				task_pid_nr(current));
884 }
885 
886 static struct rpc_task *
887 rpc_alloc_task(void)
888 {
889 	return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS);
890 }
891 
892 /*
893  * Create a new task for the specified client.
894  */
895 struct rpc_task *rpc_new_task(const struct rpc_task_setup *setup_data)
896 {
897 	struct rpc_task	*task = setup_data->task;
898 	unsigned short flags = 0;
899 
900 	if (task == NULL) {
901 		task = rpc_alloc_task();
902 		if (task == NULL) {
903 			rpc_release_calldata(setup_data->callback_ops,
904 					setup_data->callback_data);
905 			return ERR_PTR(-ENOMEM);
906 		}
907 		flags = RPC_TASK_DYNAMIC;
908 	}
909 
910 	rpc_init_task(task, setup_data);
911 	task->tk_flags |= flags;
912 	dprintk("RPC:       allocated task %p\n", task);
913 	return task;
914 }
915 
916 static void rpc_free_task(struct rpc_task *task)
917 {
918 	const struct rpc_call_ops *tk_ops = task->tk_ops;
919 	void *calldata = task->tk_calldata;
920 
921 	if (task->tk_flags & RPC_TASK_DYNAMIC) {
922 		dprintk("RPC: %5u freeing task\n", task->tk_pid);
923 		mempool_free(task, rpc_task_mempool);
924 	}
925 	rpc_release_calldata(tk_ops, calldata);
926 }
927 
928 static void rpc_async_release(struct work_struct *work)
929 {
930 	rpc_free_task(container_of(work, struct rpc_task, u.tk_work));
931 }
932 
933 static void rpc_release_resources_task(struct rpc_task *task)
934 {
935 	if (task->tk_rqstp)
936 		xprt_release(task);
937 	if (task->tk_msg.rpc_cred) {
938 		put_rpccred(task->tk_msg.rpc_cred);
939 		task->tk_msg.rpc_cred = NULL;
940 	}
941 	rpc_task_release_client(task);
942 }
943 
944 static void rpc_final_put_task(struct rpc_task *task,
945 		struct workqueue_struct *q)
946 {
947 	if (q != NULL) {
948 		INIT_WORK(&task->u.tk_work, rpc_async_release);
949 		queue_work(q, &task->u.tk_work);
950 	} else
951 		rpc_free_task(task);
952 }
953 
954 static void rpc_do_put_task(struct rpc_task *task, struct workqueue_struct *q)
955 {
956 	if (atomic_dec_and_test(&task->tk_count)) {
957 		rpc_release_resources_task(task);
958 		rpc_final_put_task(task, q);
959 	}
960 }
961 
962 void rpc_put_task(struct rpc_task *task)
963 {
964 	rpc_do_put_task(task, NULL);
965 }
966 EXPORT_SYMBOL_GPL(rpc_put_task);
967 
968 void rpc_put_task_async(struct rpc_task *task)
969 {
970 	rpc_do_put_task(task, task->tk_workqueue);
971 }
972 EXPORT_SYMBOL_GPL(rpc_put_task_async);
973 
974 static void rpc_release_task(struct rpc_task *task)
975 {
976 	dprintk("RPC: %5u release task\n", task->tk_pid);
977 
978 	BUG_ON (RPC_IS_QUEUED(task));
979 
980 	rpc_release_resources_task(task);
981 
982 	/*
983 	 * Note: at this point we have been removed from rpc_clnt->cl_tasks,
984 	 * so it should be safe to use task->tk_count as a test for whether
985 	 * or not any other processes still hold references to our rpc_task.
986 	 */
987 	if (atomic_read(&task->tk_count) != 1 + !RPC_IS_ASYNC(task)) {
988 		/* Wake up anyone who may be waiting for task completion */
989 		if (!rpc_complete_task(task))
990 			return;
991 	} else {
992 		if (!atomic_dec_and_test(&task->tk_count))
993 			return;
994 	}
995 	rpc_final_put_task(task, task->tk_workqueue);
996 }
997 
998 int rpciod_up(void)
999 {
1000 	return try_module_get(THIS_MODULE) ? 0 : -EINVAL;
1001 }
1002 
1003 void rpciod_down(void)
1004 {
1005 	module_put(THIS_MODULE);
1006 }
1007 
1008 /*
1009  * Start up the rpciod workqueue.
1010  */
1011 static int rpciod_start(void)
1012 {
1013 	struct workqueue_struct *wq;
1014 
1015 	/*
1016 	 * Create the rpciod thread and wait for it to start.
1017 	 */
1018 	dprintk("RPC:       creating workqueue rpciod\n");
1019 	wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM, 0);
1020 	rpciod_workqueue = wq;
1021 	return rpciod_workqueue != NULL;
1022 }
1023 
1024 static void rpciod_stop(void)
1025 {
1026 	struct workqueue_struct *wq = NULL;
1027 
1028 	if (rpciod_workqueue == NULL)
1029 		return;
1030 	dprintk("RPC:       destroying workqueue rpciod\n");
1031 
1032 	wq = rpciod_workqueue;
1033 	rpciod_workqueue = NULL;
1034 	destroy_workqueue(wq);
1035 }
1036 
1037 void
1038 rpc_destroy_mempool(void)
1039 {
1040 	rpciod_stop();
1041 	if (rpc_buffer_mempool)
1042 		mempool_destroy(rpc_buffer_mempool);
1043 	if (rpc_task_mempool)
1044 		mempool_destroy(rpc_task_mempool);
1045 	if (rpc_task_slabp)
1046 		kmem_cache_destroy(rpc_task_slabp);
1047 	if (rpc_buffer_slabp)
1048 		kmem_cache_destroy(rpc_buffer_slabp);
1049 	rpc_destroy_wait_queue(&delay_queue);
1050 }
1051 
1052 int
1053 rpc_init_mempool(void)
1054 {
1055 	/*
1056 	 * The following is not strictly a mempool initialisation,
1057 	 * but there is no harm in doing it here
1058 	 */
1059 	rpc_init_wait_queue(&delay_queue, "delayq");
1060 	if (!rpciod_start())
1061 		goto err_nomem;
1062 
1063 	rpc_task_slabp = kmem_cache_create("rpc_tasks",
1064 					     sizeof(struct rpc_task),
1065 					     0, SLAB_HWCACHE_ALIGN,
1066 					     NULL);
1067 	if (!rpc_task_slabp)
1068 		goto err_nomem;
1069 	rpc_buffer_slabp = kmem_cache_create("rpc_buffers",
1070 					     RPC_BUFFER_MAXSIZE,
1071 					     0, SLAB_HWCACHE_ALIGN,
1072 					     NULL);
1073 	if (!rpc_buffer_slabp)
1074 		goto err_nomem;
1075 	rpc_task_mempool = mempool_create_slab_pool(RPC_TASK_POOLSIZE,
1076 						    rpc_task_slabp);
1077 	if (!rpc_task_mempool)
1078 		goto err_nomem;
1079 	rpc_buffer_mempool = mempool_create_slab_pool(RPC_BUFFER_POOLSIZE,
1080 						      rpc_buffer_slabp);
1081 	if (!rpc_buffer_mempool)
1082 		goto err_nomem;
1083 	return 0;
1084 err_nomem:
1085 	rpc_destroy_mempool();
1086 	return -ENOMEM;
1087 }
1088