xref: /openbmc/linux/net/sunrpc/clnt.c (revision 0d456bad)
1 /*
2  *  linux/net/sunrpc/clnt.c
3  *
4  *  This file contains the high-level RPC interface.
5  *  It is modeled as a finite state machine to support both synchronous
6  *  and asynchronous requests.
7  *
8  *  -	RPC header generation and argument serialization.
9  *  -	Credential refresh.
10  *  -	TCP connect handling.
11  *  -	Retry of operation when it is suspected the operation failed because
12  *	of uid squashing on the server, or when the credentials were stale
13  *	and need to be refreshed, or when a packet was damaged in transit.
 *	This may have to be moved to the VFS layer.
15  *
16  *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
17  *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
18  */
19 
20 
21 #include <linux/module.h>
22 #include <linux/types.h>
23 #include <linux/kallsyms.h>
24 #include <linux/mm.h>
25 #include <linux/namei.h>
26 #include <linux/mount.h>
27 #include <linux/slab.h>
28 #include <linux/utsname.h>
29 #include <linux/workqueue.h>
30 #include <linux/in.h>
31 #include <linux/in6.h>
32 #include <linux/un.h>
33 #include <linux/rcupdate.h>
34 
35 #include <linux/sunrpc/clnt.h>
36 #include <linux/sunrpc/rpc_pipe_fs.h>
37 #include <linux/sunrpc/metrics.h>
38 #include <linux/sunrpc/bc_xprt.h>
39 #include <trace/events/sunrpc.h>
40 
41 #include "sunrpc.h"
42 #include "netns.h"
43 
44 #ifdef RPC_DEBUG
45 # define RPCDBG_FACILITY	RPCDBG_CALL
46 #endif
47 
48 #define dprint_status(t)					\
49 	dprintk("RPC: %5u %s (status %d)\n", t->tk_pid,		\
50 			__func__, t->tk_status)
51 
52 /*
53  * All RPC clients are linked into this list
54  */
55 
56 static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
57 
58 
59 static void	call_start(struct rpc_task *task);
60 static void	call_reserve(struct rpc_task *task);
61 static void	call_reserveresult(struct rpc_task *task);
62 static void	call_allocate(struct rpc_task *task);
63 static void	call_decode(struct rpc_task *task);
64 static void	call_bind(struct rpc_task *task);
65 static void	call_bind_status(struct rpc_task *task);
66 static void	call_transmit(struct rpc_task *task);
67 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
68 static void	call_bc_transmit(struct rpc_task *task);
69 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
70 static void	call_status(struct rpc_task *task);
71 static void	call_transmit_status(struct rpc_task *task);
72 static void	call_refresh(struct rpc_task *task);
73 static void	call_refreshresult(struct rpc_task *task);
74 static void	call_timeout(struct rpc_task *task);
75 static void	call_connect(struct rpc_task *task);
76 static void	call_connect_status(struct rpc_task *task);
77 
78 static __be32	*rpc_encode_header(struct rpc_task *task);
79 static __be32	*rpc_verify_header(struct rpc_task *task);
80 static int	rpc_ping(struct rpc_clnt *clnt);
81 
82 static void rpc_register_client(struct rpc_clnt *clnt)
83 {
84 	struct net *net = rpc_net_ns(clnt);
85 	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
86 
87 	spin_lock(&sn->rpc_client_lock);
88 	list_add(&clnt->cl_clients, &sn->all_clients);
89 	spin_unlock(&sn->rpc_client_lock);
90 }
91 
92 static void rpc_unregister_client(struct rpc_clnt *clnt)
93 {
94 	struct net *net = rpc_net_ns(clnt);
95 	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
96 
97 	spin_lock(&sn->rpc_client_lock);
98 	list_del(&clnt->cl_clients);
99 	spin_unlock(&sn->rpc_client_lock);
100 }
101 
102 static void __rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
103 {
104 	if (clnt->cl_dentry) {
105 		if (clnt->cl_auth && clnt->cl_auth->au_ops->pipes_destroy)
106 			clnt->cl_auth->au_ops->pipes_destroy(clnt->cl_auth);
107 		rpc_remove_client_dir(clnt->cl_dentry);
108 	}
109 	clnt->cl_dentry = NULL;
110 }
111 
112 static void rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
113 {
114 	struct net *net = rpc_net_ns(clnt);
115 	struct super_block *pipefs_sb;
116 
117 	pipefs_sb = rpc_get_sb_net(net);
118 	if (pipefs_sb) {
119 		__rpc_clnt_remove_pipedir(clnt);
120 		rpc_put_sb_net(net);
121 	}
122 }
123 
/*
 * Create a uniquely named "clnt%x" directory for @clnt under @dir_name
 * in the rpc_pipefs superblock @sb.
 *
 * Returns the new directory's dentry on success, NULL when @dir_name
 * does not exist in @sb, or an ERR_PTR on any other failure.
 */
static struct dentry *rpc_setup_pipedir_sb(struct super_block *sb,
				    struct rpc_clnt *clnt,
				    const char *dir_name)
{
	static uint32_t clntid;	/* next directory id; shared across calls */
	char name[15];
	struct qstr q = { .name = name };
	struct dentry *dir, *dentry;
	int error;

	dir = rpc_d_lookup_sb(sb, dir_name);
	if (dir == NULL) {
		pr_info("RPC: pipefs directory doesn't exist: %s\n", dir_name);
		return dir;
	}
	for (;;) {
		q.len = snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
		name[sizeof(name) - 1] = '\0';
		q.hash = full_name_hash(q.name, q.len);
		dentry = rpc_create_client_dir(dir, &q, clnt);
		if (!IS_ERR(dentry))
			break;
		error = PTR_ERR(dentry);
		/* -EEXIST means this id is taken: retry with the next one.
		 * Any other error is fatal. */
		if (error != -EEXIST) {
			printk(KERN_INFO "RPC: Couldn't create pipefs entry"
					" %s/%s, error %d\n",
					dir_name, name, error);
			break;
		}
	}
	dput(dir);
	return dentry;
}
157 
158 static int
159 rpc_setup_pipedir(struct rpc_clnt *clnt, const char *dir_name)
160 {
161 	struct net *net = rpc_net_ns(clnt);
162 	struct super_block *pipefs_sb;
163 	struct dentry *dentry;
164 
165 	clnt->cl_dentry = NULL;
166 	if (dir_name == NULL)
167 		return 0;
168 	pipefs_sb = rpc_get_sb_net(net);
169 	if (!pipefs_sb)
170 		return 0;
171 	dentry = rpc_setup_pipedir_sb(pipefs_sb, clnt, dir_name);
172 	rpc_put_sb_net(net);
173 	if (IS_ERR(dentry))
174 		return PTR_ERR(dentry);
175 	clnt->cl_dentry = dentry;
176 	return 0;
177 }
178 
179 static inline int rpc_clnt_skip_event(struct rpc_clnt *clnt, unsigned long event)
180 {
181 	if (((event == RPC_PIPEFS_MOUNT) && clnt->cl_dentry) ||
182 	    ((event == RPC_PIPEFS_UMOUNT) && !clnt->cl_dentry))
183 		return 1;
184 	return 0;
185 }
186 
/*
 * React to a pipefs mount or umount event for one client: create or
 * tear down the client's pipefs directory and its auth pipes.
 *
 * Returns 0 on success, a negative errno on failure, or -ENOTSUPP for
 * an unrecognized event.
 */
static int __rpc_clnt_handle_event(struct rpc_clnt *clnt, unsigned long event,
				   struct super_block *sb)
{
	struct dentry *dentry;
	int err = 0;

	switch (event) {
	case RPC_PIPEFS_MOUNT:
		dentry = rpc_setup_pipedir_sb(sb, clnt,
					      clnt->cl_program->pipe_dir_name);
		/* NULL means the parent directory is missing in this sb. */
		if (!dentry)
			return -ENOENT;
		if (IS_ERR(dentry))
			return PTR_ERR(dentry);
		clnt->cl_dentry = dentry;
		if (clnt->cl_auth->au_ops->pipes_create) {
			err = clnt->cl_auth->au_ops->pipes_create(clnt->cl_auth);
			/* Don't leave a half-populated directory behind. */
			if (err)
				__rpc_clnt_remove_pipedir(clnt);
		}
		break;
	case RPC_PIPEFS_UMOUNT:
		__rpc_clnt_remove_pipedir(clnt);
		break;
	default:
		printk(KERN_ERR "%s: unknown event: %ld\n", __func__, event);
		return -ENOTSUPP;
	}
	return err;
}
217 
/*
 * Apply a pipefs event to @clnt and then to each of its ancestors,
 * following cl_parent links.  The walk terminates at the root client,
 * which is its own parent (see rpc_new_client()), or on the first
 * error.
 */
static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event,
				struct super_block *sb)
{
	int error = 0;

	for (;; clnt = clnt->cl_parent) {
		/* Skip clients already in the state this event creates. */
		if (!rpc_clnt_skip_event(clnt, event))
			error = __rpc_clnt_handle_event(clnt, event, sb);
		if (error || clnt == clnt->cl_parent)
			break;
	}
	return error;
}
231 
/*
 * Find the first client in @net's client list that still needs to
 * process @event and return it with an extra reference held (caller
 * must rpc_release_client() it).  Returns NULL when no client remains.
 */
static struct rpc_clnt *rpc_get_client_for_event(struct net *net, int event)
{
	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
	struct rpc_clnt *clnt;

	spin_lock(&sn->rpc_client_lock);
	list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
		/* Clients without a pipe directory don't care about pipefs. */
		if (clnt->cl_program->pipe_dir_name == NULL)
			continue;
		if (rpc_clnt_skip_event(clnt, event))
			continue;
		/* Skip clients whose refcount already hit zero (dying). */
		if (atomic_inc_not_zero(&clnt->cl_count) == 0)
			continue;
		spin_unlock(&sn->rpc_client_lock);
		return clnt;
	}
	spin_unlock(&sn->rpc_client_lock);
	return NULL;
}
251 
252 static int rpc_pipefs_event(struct notifier_block *nb, unsigned long event,
253 			    void *ptr)
254 {
255 	struct super_block *sb = ptr;
256 	struct rpc_clnt *clnt;
257 	int error = 0;
258 
259 	while ((clnt = rpc_get_client_for_event(sb->s_fs_info, event))) {
260 		error = __rpc_pipefs_event(clnt, event, sb);
261 		rpc_release_client(clnt);
262 		if (error)
263 			break;
264 	}
265 	return error;
266 }
267 
/*
 * Notifier that keeps per-client pipefs directories in sync with
 * rpc_pipefs mount and umount events.
 */
static struct notifier_block rpc_clients_block = {
	.notifier_call	= rpc_pipefs_event,
	.priority	= SUNRPC_PIPEFS_RPC_PRIO,
};
272 
/*
 * Hook the RPC client machinery into rpc_pipefs mount/umount
 * notifications.  Returns the notifier registration status.
 */
int rpc_clients_notifier_register(void)
{
	return rpc_pipefs_notifier_register(&rpc_clients_block);
}
277 
278 void rpc_clients_notifier_unregister(void)
279 {
280 	return rpc_pipefs_notifier_unregister(&rpc_clients_block);
281 }
282 
283 static void rpc_clnt_set_nodename(struct rpc_clnt *clnt, const char *nodename)
284 {
285 	clnt->cl_nodelen = strlen(nodename);
286 	if (clnt->cl_nodelen > UNX_MAXNODENAME)
287 		clnt->cl_nodelen = UNX_MAXNODENAME;
288 	memcpy(clnt->cl_nodename, nodename, clnt->cl_nodelen);
289 }
290 
/*
 * Allocate and initialize a new rpc_clnt for transport @xprt using
 * @args.  On failure the caller's reference to @xprt is consumed
 * (released on the error paths below); on success the reference is
 * owned by the new client.  Returns the client or an ERR_PTR.
 */
static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args, struct rpc_xprt *xprt)
{
	const struct rpc_program *program = args->program;
	const struct rpc_version *version;
	struct rpc_clnt		*clnt = NULL;
	struct rpc_auth		*auth;
	int err;

	/* sanity check the name before trying to print it */
	dprintk("RPC:       creating %s client for %s (xprt %p)\n",
			program->name, args->servername, xprt);

	err = rpciod_up();
	if (err)
		goto out_no_rpciod;
	err = -EINVAL;
	if (!xprt)
		goto out_no_xprt;

	if (args->version >= program->nrvers)
		goto out_err;
	version = program->version[args->version];
	if (version == NULL)
		goto out_err;

	err = -ENOMEM;
	clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
	if (!clnt)
		goto out_err;
	/* A root client is its own parent (see __rpc_pipefs_event()). */
	clnt->cl_parent = clnt;

	rcu_assign_pointer(clnt->cl_xprt, xprt);
	clnt->cl_procinfo = version->procs;
	clnt->cl_maxproc  = version->nrprocs;
	clnt->cl_protname = program->name;
	clnt->cl_prog     = args->prognumber ? : program->number;
	clnt->cl_vers     = version->number;
	clnt->cl_stats    = program->stats;
	clnt->cl_metrics  = rpc_alloc_iostats(clnt);
	err = -ENOMEM;
	if (clnt->cl_metrics == NULL)
		goto out_no_stats;
	clnt->cl_program  = program;
	INIT_LIST_HEAD(&clnt->cl_tasks);
	spin_lock_init(&clnt->cl_lock);

	/* Autobind if the transport has no bound destination port yet. */
	if (!xprt_bound(xprt))
		clnt->cl_autobind = 1;

	/* Caller-supplied timeouts override the transport defaults. */
	clnt->cl_timeout = xprt->timeout;
	if (args->timeout != NULL) {
		memcpy(&clnt->cl_timeout_default, args->timeout,
				sizeof(clnt->cl_timeout_default));
		clnt->cl_timeout = &clnt->cl_timeout_default;
	}

	clnt->cl_rtt = &clnt->cl_rtt_default;
	rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
	clnt->cl_principal = NULL;
	if (args->client_name) {
		clnt->cl_principal = kstrdup(args->client_name, GFP_KERNEL);
		if (!clnt->cl_principal)
			goto out_no_principal;
	}

	atomic_set(&clnt->cl_count, 1);

	err = rpc_setup_pipedir(clnt, program->pipe_dir_name);
	if (err < 0)
		goto out_no_path;

	auth = rpcauth_create(args->authflavor, clnt);
	if (IS_ERR(auth)) {
		printk(KERN_INFO "RPC: Couldn't create auth handle (flavor %u)\n",
				args->authflavor);
		err = PTR_ERR(auth);
		goto out_no_auth;
	}

	/* save the nodename */
	rpc_clnt_set_nodename(clnt, utsname()->nodename);
	rpc_register_client(clnt);
	return clnt;

	/* Unwind in reverse order of construction. */
out_no_auth:
	rpc_clnt_remove_pipedir(clnt);
out_no_path:
	kfree(clnt->cl_principal);
out_no_principal:
	rpc_free_iostats(clnt->cl_metrics);
out_no_stats:
	kfree(clnt);
out_err:
	xprt_put(xprt);
out_no_xprt:
	rpciod_down();
out_no_rpciod:
	return ERR_PTR(err);
}
390 
391 /**
392  * rpc_create - create an RPC client and transport with one call
393  * @args: rpc_clnt create argument structure
394  *
395  * Creates and initializes an RPC transport and an RPC client.
396  *
397  * It can ping the server in order to determine if it is up, and to see if
398  * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
399  * this behavior so asynchronous tasks can also use rpc_create.
400  */
struct rpc_clnt *rpc_create(struct rpc_create_args *args)
{
	struct rpc_xprt *xprt;
	struct rpc_clnt *clnt;
	struct xprt_create xprtargs = {
		.net = args->net,
		.ident = args->protocol,
		.srcaddr = args->saddress,
		.dstaddr = args->address,
		.addrlen = args->addrsize,
		.servername = args->servername,
		.bc_xprt = args->bc_xprt,
	};
	char servername[48];

	/*
	 * If the caller chooses not to specify a hostname, whip
	 * up a string representation of the passed-in address.
	 */
	if (xprtargs.servername == NULL) {
		struct sockaddr_un *sun =
				(struct sockaddr_un *)args->address;
		struct sockaddr_in *sin =
				(struct sockaddr_in *)args->address;
		struct sockaddr_in6 *sin6 =
				(struct sockaddr_in6 *)args->address;

		servername[0] = '\0';
		switch (args->address->sa_family) {
		case AF_LOCAL:
			snprintf(servername, sizeof(servername), "%s",
				 sun->sun_path);
			break;
		case AF_INET:
			snprintf(servername, sizeof(servername), "%pI4",
				 &sin->sin_addr.s_addr);
			break;
		case AF_INET6:
			snprintf(servername, sizeof(servername), "%pI6",
				 &sin6->sin6_addr);
			break;
		default:
			/* caller wants default server name, but
			 * address family isn't recognized. */
			return ERR_PTR(-EINVAL);
		}
		xprtargs.servername = servername;
	}

	xprt = xprt_create_transport(&xprtargs);
	if (IS_ERR(xprt))
		return (struct rpc_clnt *)xprt;

	/*
	 * By default, kernel RPC client connects from a reserved port.
	 * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
	 * but it is always enabled for rpciod, which handles the connect
	 * operation.
	 */
	xprt->resvport = 1;
	if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
		xprt->resvport = 0;

	/* rpc_new_client() releases the transport on failure. */
	clnt = rpc_new_client(args, xprt);
	if (IS_ERR(clnt))
		return clnt;

	/* Optionally probe the server with a NULL call before returning. */
	if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
		int err = rpc_ping(clnt);
		if (err != 0) {
			rpc_shutdown_client(clnt);
			return ERR_PTR(err);
		}
	}

	/* cl_softrtry defaults on; RPC_CLNT_CREATE_HARDRTRY clears it. */
	clnt->cl_softrtry = 1;
	if (args->flags & RPC_CLNT_CREATE_HARDRTRY)
		clnt->cl_softrtry = 0;

	if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
		clnt->cl_autobind = 1;
	if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
		clnt->cl_discrtry = 1;
	if (!(args->flags & RPC_CLNT_CREATE_QUIET))
		clnt->cl_chatty = 1;

	return clnt;
}
EXPORT_SYMBOL_GPL(rpc_create);
490 
491 /*
492  * This function clones the RPC client structure. It allows us to share the
493  * same transport while varying parameters such as the authentication
494  * flavour.
495  */
496 static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
497 					   struct rpc_clnt *clnt)
498 {
499 	struct rpc_xprt *xprt;
500 	struct rpc_clnt *new;
501 	int err;
502 
503 	err = -ENOMEM;
504 	rcu_read_lock();
505 	xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
506 	rcu_read_unlock();
507 	if (xprt == NULL)
508 		goto out_err;
509 	args->servername = xprt->servername;
510 
511 	new = rpc_new_client(args, xprt);
512 	if (IS_ERR(new)) {
513 		err = PTR_ERR(new);
514 		goto out_put;
515 	}
516 
517 	atomic_inc(&clnt->cl_count);
518 	new->cl_parent = clnt;
519 
520 	/* Turn off autobind on clones */
521 	new->cl_autobind = 0;
522 	new->cl_softrtry = clnt->cl_softrtry;
523 	new->cl_discrtry = clnt->cl_discrtry;
524 	new->cl_chatty = clnt->cl_chatty;
525 	return new;
526 
527 out_put:
528 	xprt_put(xprt);
529 out_err:
530 	dprintk("RPC:       %s: returned error %d\n", __func__, err);
531 	return ERR_PTR(err);
532 }
533 
534 /**
535  * rpc_clone_client - Clone an RPC client structure
536  *
537  * @clnt: RPC client whose parameters are copied
538  *
539  * Returns a fresh RPC client or an ERR_PTR.
540  */
541 struct rpc_clnt *rpc_clone_client(struct rpc_clnt *clnt)
542 {
543 	struct rpc_create_args args = {
544 		.program	= clnt->cl_program,
545 		.prognumber	= clnt->cl_prog,
546 		.version	= clnt->cl_vers,
547 		.authflavor	= clnt->cl_auth->au_flavor,
548 		.client_name	= clnt->cl_principal,
549 	};
550 	return __rpc_clone_client(&args, clnt);
551 }
552 EXPORT_SYMBOL_GPL(rpc_clone_client);
553 
554 /**
555  * rpc_clone_client_set_auth - Clone an RPC client structure and set its auth
556  *
557  * @clnt: RPC client whose parameters are copied
558  * @auth: security flavor for new client
559  *
560  * Returns a fresh RPC client or an ERR_PTR.
561  */
562 struct rpc_clnt *
563 rpc_clone_client_set_auth(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
564 {
565 	struct rpc_create_args args = {
566 		.program	= clnt->cl_program,
567 		.prognumber	= clnt->cl_prog,
568 		.version	= clnt->cl_vers,
569 		.authflavor	= flavor,
570 		.client_name	= clnt->cl_principal,
571 	};
572 	return __rpc_clone_client(&args, clnt);
573 }
574 EXPORT_SYMBOL_GPL(rpc_clone_client_set_auth);
575 
576 /*
577  * Kill all tasks for the given client.
578  * XXX: kill their descendants as well?
579  */
void rpc_killall_tasks(struct rpc_clnt *clnt)
{
	struct rpc_task	*rovr;


	if (list_empty(&clnt->cl_tasks))
		return;
	dprintk("RPC:       killing all tasks for client %p\n", clnt);
	/*
	 * Spin lock all_tasks to prevent changes...
	 */
	spin_lock(&clnt->cl_lock);
	list_for_each_entry(rovr, &clnt->cl_tasks, tk_task) {
		if (!RPC_IS_ACTIVATED(rovr))
			continue;
		/* Mark each active task killed exactly once, terminate it
		 * with -EIO, and kick it off any wait queue it sleeps on. */
		if (!(rovr->tk_flags & RPC_TASK_KILLED)) {
			rovr->tk_flags |= RPC_TASK_KILLED;
			rpc_exit(rovr, -EIO);
			if (RPC_IS_QUEUED(rovr))
				rpc_wake_up_queued_task(rovr->tk_waitqueue,
							rovr);
		}
	}
	spin_unlock(&clnt->cl_lock);
}
EXPORT_SYMBOL_GPL(rpc_killall_tasks);
606 
607 /*
608  * Properly shut down an RPC client, terminating all outstanding
609  * requests.
610  */
void rpc_shutdown_client(struct rpc_clnt *clnt)
{
	/*
	 * To avoid deadlock, never call rpc_shutdown_client from a
	 * workqueue context!
	 */
	WARN_ON_ONCE(current->flags & PF_WQ_WORKER);
	might_sleep();

	dprintk_rcu("RPC:       shutting down %s client for %s\n",
			clnt->cl_protname,
			rcu_dereference(clnt->cl_xprt)->servername);

	/* Keep killing tasks until the list drains; rpc_release_client()
	 * wakes destroy_wait when the last task goes away. */
	while (!list_empty(&clnt->cl_tasks)) {
		rpc_killall_tasks(clnt);
		wait_event_timeout(destroy_wait,
			list_empty(&clnt->cl_tasks), 1*HZ);
	}

	rpc_release_client(clnt);
}
EXPORT_SYMBOL_GPL(rpc_shutdown_client);
633 
634 /*
635  * Free an RPC client
636  */
/*
 * Final teardown of an rpc_clnt: unregister it, remove its pipefs
 * directory, drop its parent and transport references, and free it.
 * Only reached once cl_count has dropped to zero.
 */
static void
rpc_free_client(struct rpc_clnt *clnt)
{
	dprintk_rcu("RPC:       destroying %s client for %s\n",
			clnt->cl_protname,
			rcu_dereference(clnt->cl_xprt)->servername);
	/* A clone holds a reference on its parent; return it now. */
	if (clnt->cl_parent != clnt)
		rpc_release_client(clnt->cl_parent);
	rpc_unregister_client(clnt);
	rpc_clnt_remove_pipedir(clnt);
	rpc_free_iostats(clnt->cl_metrics);
	kfree(clnt->cl_principal);
	clnt->cl_metrics = NULL;
	/* No other users remain, so the raw (unlocked) dereference is OK. */
	xprt_put(rcu_dereference_raw(clnt->cl_xprt));
	rpciod_down();
	kfree(clnt);
}
654 
/*
 * Release the client's authentication handle, then free the client
 * itself once the last reference has been dropped.
 */
static void
rpc_free_auth(struct rpc_clnt *clnt)
{
	/* Nothing to release: free the client immediately. */
	if (clnt->cl_auth == NULL) {
		rpc_free_client(clnt);
		return;
	}

	/*
	 * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
	 *       release remaining GSS contexts. This mechanism ensures
	 *       that it can do so safely.
	 */
	atomic_inc(&clnt->cl_count);
	rpcauth_release(clnt->cl_auth);
	clnt->cl_auth = NULL;
	/* Drop the temporary reference; free the client if it was last. */
	if (atomic_dec_and_test(&clnt->cl_count))
		rpc_free_client(clnt);
}
677 
678 /*
679  * Release reference to the RPC client
680  */
void
rpc_release_client(struct rpc_clnt *clnt)
{
	dprintk("RPC:       rpc_release_client(%p)\n", clnt);

	/* Let rpc_shutdown_client() know the task list has drained. */
	if (list_empty(&clnt->cl_tasks))
		wake_up(&destroy_wait);
	if (atomic_dec_and_test(&clnt->cl_count))
		rpc_free_auth(clnt);
}
691 
692 /**
693  * rpc_bind_new_program - bind a new RPC program to an existing client
694  * @old: old rpc_client
695  * @program: rpc program to set
696  * @vers: rpc program version
697  *
698  * Clones the rpc client and sets up a new RPC program. This is mainly
699  * of use for enabling different RPC programs to share the same transport.
700  * The Sun NFSv2/v3 ACL protocol can do this.
701  */
702 struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
703 				      const struct rpc_program *program,
704 				      u32 vers)
705 {
706 	struct rpc_create_args args = {
707 		.program	= program,
708 		.prognumber	= program->number,
709 		.version	= vers,
710 		.authflavor	= old->cl_auth->au_flavor,
711 		.client_name	= old->cl_principal,
712 	};
713 	struct rpc_clnt *clnt;
714 	int err;
715 
716 	clnt = __rpc_clone_client(&args, old);
717 	if (IS_ERR(clnt))
718 		goto out;
719 	err = rpc_ping(clnt);
720 	if (err != 0) {
721 		rpc_shutdown_client(clnt);
722 		clnt = ERR_PTR(err);
723 	}
724 out:
725 	return clnt;
726 }
727 EXPORT_SYMBOL_GPL(rpc_bind_new_program);
728 
729 void rpc_task_release_client(struct rpc_task *task)
730 {
731 	struct rpc_clnt *clnt = task->tk_client;
732 
733 	if (clnt != NULL) {
734 		/* Remove from client task list */
735 		spin_lock(&clnt->cl_lock);
736 		list_del(&task->tk_task);
737 		spin_unlock(&clnt->cl_lock);
738 		task->tk_client = NULL;
739 
740 		rpc_release_client(clnt);
741 	}
742 }
743 
/*
 * Bind @task to @clnt: take a client reference, inherit the client's
 * soft-retry and swapper flags, and add the task to the client's task
 * list.  Any previous binding is released first.
 */
static
void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
{
	if (clnt != NULL) {
		rpc_task_release_client(task);
		task->tk_client = clnt;
		atomic_inc(&clnt->cl_count);
		if (clnt->cl_softrtry)
			task->tk_flags |= RPC_TASK_SOFT;
		/* Propagate the transport's swapper flag so memory-reclaim
		 * tasks get special treatment by the scheduler. */
		if (sk_memalloc_socks()) {
			struct rpc_xprt *xprt;

			rcu_read_lock();
			xprt = rcu_dereference(clnt->cl_xprt);
			if (xprt->swapper)
				task->tk_flags |= RPC_TASK_SWAPPER;
			rcu_read_unlock();
		}
		/* Add to the client's list of all tasks */
		spin_lock(&clnt->cl_lock);
		list_add_tail(&task->tk_task, &clnt->cl_tasks);
		spin_unlock(&clnt->cl_lock);
	}
}
768 
/*
 * Rebind @task to @clnt.  The explicit release also covers the
 * clnt == NULL case, which rpc_task_set_client() ignores.
 */
void rpc_task_reset_client(struct rpc_task *task, struct rpc_clnt *clnt)
{
	rpc_task_release_client(task);
	rpc_task_set_client(task, clnt);
}
EXPORT_SYMBOL_GPL(rpc_task_reset_client);
775 
776 
777 static void
778 rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
779 {
780 	if (msg != NULL) {
781 		task->tk_msg.rpc_proc = msg->rpc_proc;
782 		task->tk_msg.rpc_argp = msg->rpc_argp;
783 		task->tk_msg.rpc_resp = msg->rpc_resp;
784 		if (msg->rpc_cred != NULL)
785 			task->tk_msg.rpc_cred = get_rpccred(msg->rpc_cred);
786 	}
787 }
788 
789 /*
790  * Default callback for async RPC calls
791  */
/* No-op completion handler used when the caller supplies no ops. */
static void
rpc_default_callback(struct rpc_task *task, void *data)
{
}

static const struct rpc_call_ops rpc_default_ops = {
	.rpc_call_done = rpc_default_callback,
};
800 
801 /**
802  * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
803  * @task_setup_data: pointer to task initialisation data
804  */
805 struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
806 {
807 	struct rpc_task *task;
808 
809 	task = rpc_new_task(task_setup_data);
810 	if (IS_ERR(task))
811 		goto out;
812 
813 	rpc_task_set_client(task, task_setup_data->rpc_client);
814 	rpc_task_set_rpc_message(task, task_setup_data->rpc_message);
815 
816 	if (task->tk_action == NULL)
817 		rpc_call_start(task);
818 
819 	atomic_inc(&task->tk_count);
820 	rpc_execute(task);
821 out:
822 	return task;
823 }
824 EXPORT_SYMBOL_GPL(rpc_run_task);
825 
826 /**
827  * rpc_call_sync - Perform a synchronous RPC call
828  * @clnt: pointer to RPC client
829  * @msg: RPC call parameters
830  * @flags: RPC call flags
831  */
int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
{
	struct rpc_task	*task;
	struct rpc_task_setup task_setup_data = {
		.rpc_client = clnt,
		.rpc_message = msg,
		.callback_ops = &rpc_default_ops,
		.flags = flags,
	};
	int status;

	/* Requesting an async call through the sync API is a caller bug. */
	WARN_ON_ONCE(flags & RPC_TASK_ASYNC);
	if (flags & RPC_TASK_ASYNC) {
		rpc_release_calldata(task_setup_data.callback_ops,
			task_setup_data.callback_data);
		return -EINVAL;
	}

	task = rpc_run_task(&task_setup_data);
	if (IS_ERR(task))
		return PTR_ERR(task);
	/* The task has completed; tk_status is the call's final result. */
	status = task->tk_status;
	rpc_put_task(task);
	return status;
}
EXPORT_SYMBOL_GPL(rpc_call_sync);
858 
859 /**
860  * rpc_call_async - Perform an asynchronous RPC call
861  * @clnt: pointer to RPC client
862  * @msg: RPC call parameters
863  * @flags: RPC call flags
864  * @tk_ops: RPC call ops
865  * @data: user call data
866  */
867 int
868 rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
869 	       const struct rpc_call_ops *tk_ops, void *data)
870 {
871 	struct rpc_task	*task;
872 	struct rpc_task_setup task_setup_data = {
873 		.rpc_client = clnt,
874 		.rpc_message = msg,
875 		.callback_ops = tk_ops,
876 		.callback_data = data,
877 		.flags = flags|RPC_TASK_ASYNC,
878 	};
879 
880 	task = rpc_run_task(&task_setup_data);
881 	if (IS_ERR(task))
882 		return PTR_ERR(task);
883 	rpc_put_task(task);
884 	return 0;
885 }
886 EXPORT_SYMBOL_GPL(rpc_call_async);
887 
888 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
889 /**
890  * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
891  * rpc_execute against it
892  * @req: RPC request
893  * @tk_ops: RPC call ops
894  */
struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req,
				const struct rpc_call_ops *tk_ops)
{
	struct rpc_task *task;
	struct xdr_buf *xbufp = &req->rq_snd_buf;
	struct rpc_task_setup task_setup_data = {
		.callback_ops = tk_ops,
	};

	dprintk("RPC: rpc_run_bc_task req= %p\n", req);
	/*
	 * Create an rpc_task to send the data
	 */
	task = rpc_new_task(&task_setup_data);
	if (IS_ERR(task)) {
		/* Task creation failed: hand the preallocated backchannel
		 * request back to the transport. */
		xprt_free_bc_request(req);
		goto out;
	}
	task->tk_rqstp = req;

	/*
	 * Set up the xdr_buf length.
	 * This also indicates that the buffer is XDR encoded already.
	 */
	xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
			xbufp->tail[0].iov_len;

	/* Backchannel replies skip the normal call_start state machine. */
	task->tk_action = call_bc_transmit;
	atomic_inc(&task->tk_count);
	/* One reference for the caller, one consumed by rpc_execute(). */
	WARN_ON_ONCE(atomic_read(&task->tk_count) != 2);
	rpc_execute(task);

out:
	dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
	return task;
}
931 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
932 
/* Point @task at the first state of the RPC call state machine. */
void
rpc_call_start(struct rpc_task *task)
{
	task->tk_action = call_start;
}
EXPORT_SYMBOL_GPL(rpc_call_start);
939 
940 /**
941  * rpc_peeraddr - extract remote peer address from clnt's xprt
942  * @clnt: RPC client structure
943  * @buf: target buffer
944  * @bufsize: length of target buffer
945  *
946  * Returns the number of bytes that are actually in the stored address.
947  */
948 size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
949 {
950 	size_t bytes;
951 	struct rpc_xprt *xprt;
952 
953 	rcu_read_lock();
954 	xprt = rcu_dereference(clnt->cl_xprt);
955 
956 	bytes = xprt->addrlen;
957 	if (bytes > bufsize)
958 		bytes = bufsize;
959 	memcpy(buf, &xprt->addr, bytes);
960 	rcu_read_unlock();
961 
962 	return bytes;
963 }
964 EXPORT_SYMBOL_GPL(rpc_peeraddr);
965 
966 /**
967  * rpc_peeraddr2str - return remote peer address in printable format
968  * @clnt: RPC client structure
969  * @format: address format
970  *
971  * NB: the lifetime of the memory referenced by the returned pointer is
972  * the same as the rpc_xprt itself.  As long as the caller uses this
973  * pointer, it must hold the RCU read lock.
974  */
975 const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
976 			     enum rpc_display_format_t format)
977 {
978 	struct rpc_xprt *xprt;
979 
980 	xprt = rcu_dereference(clnt->cl_xprt);
981 
982 	if (xprt->address_strings[format] != NULL)
983 		return xprt->address_strings[format];
984 	else
985 		return "unprintable";
986 }
987 EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
988 
/*
 * Template source addresses used by rpc_sockname() (bind) and
 * rpc_anyaddr() (fallback copy).
 * NOTE(review): despite the "loopback" names, these hold the
 * ANY/unspecified addresses (INADDR_ANY / IN6ADDR_ANY_INIT) — confirm
 * whether that is intentional before renaming or changing them.
 */
static const struct sockaddr_in rpc_inaddr_loopback = {
	.sin_family		= AF_INET,
	.sin_addr.s_addr	= htonl(INADDR_ANY),
};

static const struct sockaddr_in6 rpc_in6addr_loopback = {
	.sin6_family		= AF_INET6,
	.sin6_addr		= IN6ADDR_ANY_INIT,
};
998 
999 /*
1000  * Try a getsockname() on a connected datagram socket.  Using a
1001  * connected datagram socket prevents leaving a socket in TIME_WAIT.
1002  * This conserves the ephemeral port number space.
1003  *
1004  * Returns zero and fills in "buf" if successful; otherwise, a
1005  * negative errno is returned.
1006  */
1007 static int rpc_sockname(struct net *net, struct sockaddr *sap, size_t salen,
1008 			struct sockaddr *buf, int buflen)
1009 {
1010 	struct socket *sock;
1011 	int err;
1012 
1013 	err = __sock_create(net, sap->sa_family,
1014 				SOCK_DGRAM, IPPROTO_UDP, &sock, 1);
1015 	if (err < 0) {
1016 		dprintk("RPC:       can't create UDP socket (%d)\n", err);
1017 		goto out;
1018 	}
1019 
1020 	switch (sap->sa_family) {
1021 	case AF_INET:
1022 		err = kernel_bind(sock,
1023 				(struct sockaddr *)&rpc_inaddr_loopback,
1024 				sizeof(rpc_inaddr_loopback));
1025 		break;
1026 	case AF_INET6:
1027 		err = kernel_bind(sock,
1028 				(struct sockaddr *)&rpc_in6addr_loopback,
1029 				sizeof(rpc_in6addr_loopback));
1030 		break;
1031 	default:
1032 		err = -EAFNOSUPPORT;
1033 		goto out;
1034 	}
1035 	if (err < 0) {
1036 		dprintk("RPC:       can't bind UDP socket (%d)\n", err);
1037 		goto out_release;
1038 	}
1039 
1040 	err = kernel_connect(sock, sap, salen, 0);
1041 	if (err < 0) {
1042 		dprintk("RPC:       can't connect UDP socket (%d)\n", err);
1043 		goto out_release;
1044 	}
1045 
1046 	err = kernel_getsockname(sock, buf, &buflen);
1047 	if (err < 0) {
1048 		dprintk("RPC:       getsockname failed (%d)\n", err);
1049 		goto out_release;
1050 	}
1051 
1052 	err = 0;
1053 	if (buf->sa_family == AF_INET6) {
1054 		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)buf;
1055 		sin6->sin6_scope_id = 0;
1056 	}
1057 	dprintk("RPC:       %s succeeded\n", __func__);
1058 
1059 out_release:
1060 	sock_release(sock);
1061 out:
1062 	return err;
1063 }
1064 
1065 /*
 * Scraping a connected socket failed, so we don't have a usable
1067  * local address.  Fallback: generate an address that will prevent
1068  * the server from calling us back.
1069  *
1070  * Returns zero and fills in "buf" if successful; otherwise, a
1071  * negative errno is returned.
1072  */
1073 static int rpc_anyaddr(int family, struct sockaddr *buf, size_t buflen)
1074 {
1075 	switch (family) {
1076 	case AF_INET:
1077 		if (buflen < sizeof(rpc_inaddr_loopback))
1078 			return -EINVAL;
1079 		memcpy(buf, &rpc_inaddr_loopback,
1080 				sizeof(rpc_inaddr_loopback));
1081 		break;
1082 	case AF_INET6:
1083 		if (buflen < sizeof(rpc_in6addr_loopback))
1084 			return -EINVAL;
1085 		memcpy(buf, &rpc_in6addr_loopback,
1086 				sizeof(rpc_in6addr_loopback));
1087 	default:
1088 		dprintk("RPC:       %s: address family not supported\n",
1089 			__func__);
1090 		return -EAFNOSUPPORT;
1091 	}
1092 	dprintk("RPC:       %s: succeeded\n", __func__);
1093 	return 0;
1094 }
1095 
1096 /**
1097  * rpc_localaddr - discover local endpoint address for an RPC client
1098  * @clnt: RPC client structure
1099  * @buf: target buffer
1100  * @buflen: size of target buffer, in bytes
1101  *
1102  * Returns zero and fills in "buf" and "buflen" if successful;
1103  * otherwise, a negative errno is returned.
1104  *
1105  * This works even if the underlying transport is not currently connected,
1106  * or if the upper layer never previously provided a source address.
1107  *
1108  * The result of this function call is transient: multiple calls in
1109  * succession may give different results, depending on how local
1110  * networking configuration changes over time.
1111  */
1112 int rpc_localaddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t buflen)
1113 {
1114 	struct sockaddr_storage address;
1115 	struct sockaddr *sap = (struct sockaddr *)&address;
1116 	struct rpc_xprt *xprt;
1117 	struct net *net;
1118 	size_t salen;
1119 	int err;
1120 
1121 	rcu_read_lock();
1122 	xprt = rcu_dereference(clnt->cl_xprt);
1123 	salen = xprt->addrlen;
1124 	memcpy(sap, &xprt->addr, salen);
1125 	net = get_net(xprt->xprt_net);
1126 	rcu_read_unlock();
1127 
1128 	rpc_set_port(sap, 0);
1129 	err = rpc_sockname(net, sap, salen, buf, buflen);
1130 	put_net(net);
1131 	if (err != 0)
1132 		/* Couldn't discover local address, return ANYADDR */
1133 		return rpc_anyaddr(sap->sa_family, buf, buflen);
1134 	return 0;
1135 }
1136 EXPORT_SYMBOL_GPL(rpc_localaddr);
1137 
1138 void
1139 rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
1140 {
1141 	struct rpc_xprt *xprt;
1142 
1143 	rcu_read_lock();
1144 	xprt = rcu_dereference(clnt->cl_xprt);
1145 	if (xprt->ops->set_buffer_size)
1146 		xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
1147 	rcu_read_unlock();
1148 }
1149 EXPORT_SYMBOL_GPL(rpc_setbufsize);
1150 
1151 /**
1152  * rpc_protocol - Get transport protocol number for an RPC client
1153  * @clnt: RPC client to query
1154  *
1155  */
1156 int rpc_protocol(struct rpc_clnt *clnt)
1157 {
1158 	int protocol;
1159 
1160 	rcu_read_lock();
1161 	protocol = rcu_dereference(clnt->cl_xprt)->prot;
1162 	rcu_read_unlock();
1163 	return protocol;
1164 }
1165 EXPORT_SYMBOL_GPL(rpc_protocol);
1166 
1167 /**
1168  * rpc_net_ns - Get the network namespace for this RPC client
1169  * @clnt: RPC client to query
1170  *
1171  */
1172 struct net *rpc_net_ns(struct rpc_clnt *clnt)
1173 {
1174 	struct net *ret;
1175 
1176 	rcu_read_lock();
1177 	ret = rcu_dereference(clnt->cl_xprt)->xprt_net;
1178 	rcu_read_unlock();
1179 	return ret;
1180 }
1181 EXPORT_SYMBOL_GPL(rpc_net_ns);
1182 
1183 /**
1184  * rpc_max_payload - Get maximum payload size for a transport, in bytes
1185  * @clnt: RPC client to query
1186  *
1187  * For stream transports, this is one RPC record fragment (see RFC
1188  * 1831), as we don't support multi-record requests yet.  For datagram
1189  * transports, this is the size of an IP packet minus the IP, UDP, and
1190  * RPC header sizes.
1191  */
1192 size_t rpc_max_payload(struct rpc_clnt *clnt)
1193 {
1194 	size_t ret;
1195 
1196 	rcu_read_lock();
1197 	ret = rcu_dereference(clnt->cl_xprt)->max_payload;
1198 	rcu_read_unlock();
1199 	return ret;
1200 }
1201 EXPORT_SYMBOL_GPL(rpc_max_payload);
1202 
1203 /**
1204  * rpc_force_rebind - force transport to check that remote port is unchanged
1205  * @clnt: client to rebind
1206  *
1207  */
1208 void rpc_force_rebind(struct rpc_clnt *clnt)
1209 {
1210 	if (clnt->cl_autobind) {
1211 		rcu_read_lock();
1212 		xprt_clear_bound(rcu_dereference(clnt->cl_xprt));
1213 		rcu_read_unlock();
1214 	}
1215 }
1216 EXPORT_SYMBOL_GPL(rpc_force_rebind);
1217 
1218 /*
1219  * Restart an (async) RPC call from the call_prepare state.
1220  * Usually called from within the exit handler.
1221  */
1222 int
1223 rpc_restart_call_prepare(struct rpc_task *task)
1224 {
1225 	if (RPC_ASSASSINATED(task))
1226 		return 0;
1227 	task->tk_action = call_start;
1228 	if (task->tk_ops->rpc_call_prepare != NULL)
1229 		task->tk_action = rpc_prepare_task;
1230 	return 1;
1231 }
1232 EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
1233 
1234 /*
1235  * Restart an (async) RPC call. Usually called from within the
1236  * exit handler.
1237  */
1238 int
1239 rpc_restart_call(struct rpc_task *task)
1240 {
1241 	if (RPC_ASSASSINATED(task))
1242 		return 0;
1243 	task->tk_action = call_start;
1244 	return 1;
1245 }
1246 EXPORT_SYMBOL_GPL(rpc_restart_call);
1247 
#ifdef RPC_DEBUG
/* Return a printable name for the task's RPC procedure, for debug output. */
static const char *rpc_proc_name(const struct rpc_task *task)
{
	const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;

	if (proc == NULL)
		return "no proc";
	return proc->p_name ? proc->p_name : "NULL";
}
#endif
1262 
1263 /*
1264  * 0.  Initial state
1265  *
1266  *     Other FSM states can be visited zero or more times, but
1267  *     this state is visited exactly once for each RPC.
1268  */
1269 static void
1270 call_start(struct rpc_task *task)
1271 {
1272 	struct rpc_clnt	*clnt = task->tk_client;
1273 
1274 	dprintk("RPC: %5u call_start %s%d proc %s (%s)\n", task->tk_pid,
1275 			clnt->cl_protname, clnt->cl_vers,
1276 			rpc_proc_name(task),
1277 			(RPC_IS_ASYNC(task) ? "async" : "sync"));
1278 
1279 	/* Increment call count */
1280 	task->tk_msg.rpc_proc->p_count++;
1281 	clnt->cl_stats->rpccnt++;
1282 	task->tk_action = call_reserve;
1283 }
1284 
1285 /*
1286  * 1.	Reserve an RPC call slot
1287  */
1288 static void
1289 call_reserve(struct rpc_task *task)
1290 {
1291 	dprint_status(task);
1292 
1293 	task->tk_status  = 0;
1294 	task->tk_action  = call_reserveresult;
1295 	xprt_reserve(task);
1296 }
1297 
1298 /*
1299  * 1b.	Grok the result of xprt_reserve()
1300  */
1301 static void
1302 call_reserveresult(struct rpc_task *task)
1303 {
1304 	int status = task->tk_status;
1305 
1306 	dprint_status(task);
1307 
1308 	/*
1309 	 * After a call to xprt_reserve(), we must have either
1310 	 * a request slot or else an error status.
1311 	 */
1312 	task->tk_status = 0;
1313 	if (status >= 0) {
1314 		if (task->tk_rqstp) {
1315 			task->tk_action = call_refresh;
1316 			return;
1317 		}
1318 
1319 		printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
1320 				__func__, status);
1321 		rpc_exit(task, -EIO);
1322 		return;
1323 	}
1324 
1325 	/*
1326 	 * Even though there was an error, we may have acquired
1327 	 * a request slot somehow.  Make sure not to leak it.
1328 	 */
1329 	if (task->tk_rqstp) {
1330 		printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
1331 				__func__, status);
1332 		xprt_release(task);
1333 	}
1334 
1335 	switch (status) {
1336 	case -ENOMEM:
1337 		rpc_delay(task, HZ >> 2);
1338 	case -EAGAIN:	/* woken up; retry */
1339 		task->tk_action = call_reserve;
1340 		return;
1341 	case -EIO:	/* probably a shutdown */
1342 		break;
1343 	default:
1344 		printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
1345 				__func__, status);
1346 		break;
1347 	}
1348 	rpc_exit(task, status);
1349 }
1350 
1351 /*
1352  * 2.	Bind and/or refresh the credentials
1353  */
1354 static void
1355 call_refresh(struct rpc_task *task)
1356 {
1357 	dprint_status(task);
1358 
1359 	task->tk_action = call_refreshresult;
1360 	task->tk_status = 0;
1361 	task->tk_client->cl_stats->rpcauthrefresh++;
1362 	rpcauth_refreshcred(task);
1363 }
1364 
1365 /*
1366  * 2a.	Process the results of a credential refresh
1367  */
1368 static void
1369 call_refreshresult(struct rpc_task *task)
1370 {
1371 	int status = task->tk_status;
1372 
1373 	dprint_status(task);
1374 
1375 	task->tk_status = 0;
1376 	task->tk_action = call_refresh;
1377 	switch (status) {
1378 	case 0:
1379 		if (rpcauth_uptodatecred(task))
1380 			task->tk_action = call_allocate;
1381 		return;
1382 	case -ETIMEDOUT:
1383 		rpc_delay(task, 3*HZ);
1384 	case -EKEYEXPIRED:
1385 	case -EAGAIN:
1386 		status = -EACCES;
1387 		if (!task->tk_cred_retry)
1388 			break;
1389 		task->tk_cred_retry--;
1390 		dprintk("RPC: %5u %s: retry refresh creds\n",
1391 				task->tk_pid, __func__);
1392 		return;
1393 	}
1394 	dprintk("RPC: %5u %s: refresh creds failed with error %d\n",
1395 				task->tk_pid, __func__, status);
1396 	rpc_exit(task, status);
1397 }
1398 
1399 /*
1400  * 2b.	Allocate the buffer. For details, see sched.c:rpc_malloc.
1401  *	(Note: buffer memory is freed in xprt_release).
1402  */
1403 static void
1404 call_allocate(struct rpc_task *task)
1405 {
1406 	unsigned int slack = task->tk_rqstp->rq_cred->cr_auth->au_cslack;
1407 	struct rpc_rqst *req = task->tk_rqstp;
1408 	struct rpc_xprt *xprt = task->tk_xprt;
1409 	struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1410 
1411 	dprint_status(task);
1412 
1413 	task->tk_status = 0;
1414 	task->tk_action = call_bind;
1415 
1416 	if (req->rq_buffer)
1417 		return;
1418 
1419 	if (proc->p_proc != 0) {
1420 		BUG_ON(proc->p_arglen == 0);
1421 		if (proc->p_decode != NULL)
1422 			BUG_ON(proc->p_replen == 0);
1423 	}
1424 
1425 	/*
1426 	 * Calculate the size (in quads) of the RPC call
1427 	 * and reply headers, and convert both values
1428 	 * to byte sizes.
1429 	 */
1430 	req->rq_callsize = RPC_CALLHDRSIZE + (slack << 1) + proc->p_arglen;
1431 	req->rq_callsize <<= 2;
1432 	req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
1433 	req->rq_rcvsize <<= 2;
1434 
1435 	req->rq_buffer = xprt->ops->buf_alloc(task,
1436 					req->rq_callsize + req->rq_rcvsize);
1437 	if (req->rq_buffer != NULL)
1438 		return;
1439 
1440 	dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);
1441 
1442 	if (RPC_IS_ASYNC(task) || !fatal_signal_pending(current)) {
1443 		task->tk_action = call_allocate;
1444 		rpc_delay(task, HZ>>4);
1445 		return;
1446 	}
1447 
1448 	rpc_exit(task, -ERESTARTSYS);
1449 }
1450 
/* A zero-length send buffer means the request has not been encoded
 * yet (or was marked for re-encoding by rpc_task_force_reencode). */
static inline int
rpc_task_need_encode(struct rpc_task *task)
{
	return task->tk_rqstp->rq_snd_buf.len == 0;
}
1456 
/* Discard any previously-encoded request data so the next pass
 * through call_transmit re-encodes the arguments from scratch. */
static inline void
rpc_task_force_reencode(struct rpc_task *task)
{
	task->tk_rqstp->rq_snd_buf.len = 0;
	task->tk_rqstp->rq_bytes_sent = 0;
}
1463 
1464 static inline void
1465 rpc_xdr_buf_init(struct xdr_buf *buf, void *start, size_t len)
1466 {
1467 	buf->head[0].iov_base = start;
1468 	buf->head[0].iov_len = len;
1469 	buf->tail[0].iov_len = 0;
1470 	buf->page_len = 0;
1471 	buf->flags = 0;
1472 	buf->len = 0;
1473 	buf->buflen = len;
1474 }
1475 
1476 /*
1477  * 3.	Encode arguments of an RPC call
1478  */
1479 static void
1480 rpc_xdr_encode(struct rpc_task *task)
1481 {
1482 	struct rpc_rqst	*req = task->tk_rqstp;
1483 	kxdreproc_t	encode;
1484 	__be32		*p;
1485 
1486 	dprint_status(task);
1487 
1488 	rpc_xdr_buf_init(&req->rq_snd_buf,
1489 			 req->rq_buffer,
1490 			 req->rq_callsize);
1491 	rpc_xdr_buf_init(&req->rq_rcv_buf,
1492 			 (char *)req->rq_buffer + req->rq_callsize,
1493 			 req->rq_rcvsize);
1494 
1495 	p = rpc_encode_header(task);
1496 	if (p == NULL) {
1497 		printk(KERN_INFO "RPC: couldn't encode RPC header, exit EIO\n");
1498 		rpc_exit(task, -EIO);
1499 		return;
1500 	}
1501 
1502 	encode = task->tk_msg.rpc_proc->p_encode;
1503 	if (encode == NULL)
1504 		return;
1505 
1506 	task->tk_status = rpcauth_wrap_req(task, encode, req, p,
1507 			task->tk_msg.rpc_argp);
1508 }
1509 
1510 /*
1511  * 4.	Get the server port number if not yet set
1512  */
1513 static void
1514 call_bind(struct rpc_task *task)
1515 {
1516 	struct rpc_xprt *xprt = task->tk_xprt;
1517 
1518 	dprint_status(task);
1519 
1520 	task->tk_action = call_connect;
1521 	if (!xprt_bound(xprt)) {
1522 		task->tk_action = call_bind_status;
1523 		task->tk_timeout = xprt->bind_timeout;
1524 		xprt->ops->rpcbind(task);
1525 	}
1526 }
1527 
1528 /*
1529  * 4a.	Sort out bind result
1530  */
1531 static void
1532 call_bind_status(struct rpc_task *task)
1533 {
1534 	int status = -EIO;
1535 
1536 	if (task->tk_status >= 0) {
1537 		dprint_status(task);
1538 		task->tk_status = 0;
1539 		task->tk_action = call_connect;
1540 		return;
1541 	}
1542 
1543 	trace_rpc_bind_status(task);
1544 	switch (task->tk_status) {
1545 	case -ENOMEM:
1546 		dprintk("RPC: %5u rpcbind out of memory\n", task->tk_pid);
1547 		rpc_delay(task, HZ >> 2);
1548 		goto retry_timeout;
1549 	case -EACCES:
1550 		dprintk("RPC: %5u remote rpcbind: RPC program/version "
1551 				"unavailable\n", task->tk_pid);
1552 		/* fail immediately if this is an RPC ping */
1553 		if (task->tk_msg.rpc_proc->p_proc == 0) {
1554 			status = -EOPNOTSUPP;
1555 			break;
1556 		}
1557 		if (task->tk_rebind_retry == 0)
1558 			break;
1559 		task->tk_rebind_retry--;
1560 		rpc_delay(task, 3*HZ);
1561 		goto retry_timeout;
1562 	case -ETIMEDOUT:
1563 		dprintk("RPC: %5u rpcbind request timed out\n",
1564 				task->tk_pid);
1565 		goto retry_timeout;
1566 	case -EPFNOSUPPORT:
1567 		/* server doesn't support any rpcbind version we know of */
1568 		dprintk("RPC: %5u unrecognized remote rpcbind service\n",
1569 				task->tk_pid);
1570 		break;
1571 	case -EPROTONOSUPPORT:
1572 		dprintk("RPC: %5u remote rpcbind version unavailable, retrying\n",
1573 				task->tk_pid);
1574 		task->tk_status = 0;
1575 		task->tk_action = call_bind;
1576 		return;
1577 	case -ECONNREFUSED:		/* connection problems */
1578 	case -ECONNRESET:
1579 	case -ENOTCONN:
1580 	case -EHOSTDOWN:
1581 	case -EHOSTUNREACH:
1582 	case -ENETUNREACH:
1583 	case -EPIPE:
1584 		dprintk("RPC: %5u remote rpcbind unreachable: %d\n",
1585 				task->tk_pid, task->tk_status);
1586 		if (!RPC_IS_SOFTCONN(task)) {
1587 			rpc_delay(task, 5*HZ);
1588 			goto retry_timeout;
1589 		}
1590 		status = task->tk_status;
1591 		break;
1592 	default:
1593 		dprintk("RPC: %5u unrecognized rpcbind error (%d)\n",
1594 				task->tk_pid, -task->tk_status);
1595 	}
1596 
1597 	rpc_exit(task, status);
1598 	return;
1599 
1600 retry_timeout:
1601 	task->tk_action = call_timeout;
1602 }
1603 
1604 /*
1605  * 4b.	Connect to the RPC server
1606  */
1607 static void
1608 call_connect(struct rpc_task *task)
1609 {
1610 	struct rpc_xprt *xprt = task->tk_xprt;
1611 
1612 	dprintk("RPC: %5u call_connect xprt %p %s connected\n",
1613 			task->tk_pid, xprt,
1614 			(xprt_connected(xprt) ? "is" : "is not"));
1615 
1616 	task->tk_action = call_transmit;
1617 	if (!xprt_connected(xprt)) {
1618 		task->tk_action = call_connect_status;
1619 		if (task->tk_status < 0)
1620 			return;
1621 		xprt_connect(task);
1622 	}
1623 }
1624 
1625 /*
1626  * 4c.	Sort out connect result
1627  */
1628 static void
1629 call_connect_status(struct rpc_task *task)
1630 {
1631 	struct rpc_clnt *clnt = task->tk_client;
1632 	int status = task->tk_status;
1633 
1634 	dprint_status(task);
1635 
1636 	task->tk_status = 0;
1637 	if (status >= 0 || status == -EAGAIN) {
1638 		clnt->cl_stats->netreconn++;
1639 		task->tk_action = call_transmit;
1640 		return;
1641 	}
1642 
1643 	trace_rpc_connect_status(task, status);
1644 	switch (status) {
1645 		/* if soft mounted, test if we've timed out */
1646 	case -ETIMEDOUT:
1647 		task->tk_action = call_timeout;
1648 		break;
1649 	default:
1650 		rpc_exit(task, -EIO);
1651 	}
1652 }
1653 
1654 /*
1655  * 5.	Transmit the RPC request, and wait for reply
1656  */
1657 static void
1658 call_transmit(struct rpc_task *task)
1659 {
1660 	dprint_status(task);
1661 
1662 	task->tk_action = call_status;
1663 	if (task->tk_status < 0)
1664 		return;
1665 	task->tk_status = xprt_prepare_transmit(task);
1666 	if (task->tk_status != 0)
1667 		return;
1668 	task->tk_action = call_transmit_status;
1669 	/* Encode here so that rpcsec_gss can use correct sequence number. */
1670 	if (rpc_task_need_encode(task)) {
1671 		rpc_xdr_encode(task);
1672 		/* Did the encode result in an error condition? */
1673 		if (task->tk_status != 0) {
1674 			/* Was the error nonfatal? */
1675 			if (task->tk_status == -EAGAIN)
1676 				rpc_delay(task, HZ >> 4);
1677 			else
1678 				rpc_exit(task, task->tk_status);
1679 			return;
1680 		}
1681 	}
1682 	xprt_transmit(task);
1683 	if (task->tk_status < 0)
1684 		return;
1685 	/*
1686 	 * On success, ensure that we call xprt_end_transmit() before sleeping
1687 	 * in order to allow access to the socket to other RPC requests.
1688 	 */
1689 	call_transmit_status(task);
1690 	if (rpc_reply_expected(task))
1691 		return;
1692 	task->tk_action = rpc_exit_task;
1693 	rpc_wake_up_queued_task(&task->tk_xprt->pending, task);
1694 }
1695 
1696 /*
1697  * 5a.	Handle cleanup after a transmission
1698  */
1699 static void
1700 call_transmit_status(struct rpc_task *task)
1701 {
1702 	task->tk_action = call_status;
1703 
1704 	/*
1705 	 * Common case: success.  Force the compiler to put this
1706 	 * test first.
1707 	 */
1708 	if (task->tk_status == 0) {
1709 		xprt_end_transmit(task);
1710 		rpc_task_force_reencode(task);
1711 		return;
1712 	}
1713 
1714 	switch (task->tk_status) {
1715 	case -EAGAIN:
1716 		break;
1717 	default:
1718 		dprint_status(task);
1719 		xprt_end_transmit(task);
1720 		rpc_task_force_reencode(task);
1721 		break;
1722 		/*
1723 		 * Special cases: if we've been waiting on the
1724 		 * socket's write_space() callback, or if the
1725 		 * socket just returned a connection error,
1726 		 * then hold onto the transport lock.
1727 		 */
1728 	case -ECONNREFUSED:
1729 	case -EHOSTDOWN:
1730 	case -EHOSTUNREACH:
1731 	case -ENETUNREACH:
1732 		if (RPC_IS_SOFTCONN(task)) {
1733 			xprt_end_transmit(task);
1734 			rpc_exit(task, task->tk_status);
1735 			break;
1736 		}
1737 	case -ECONNRESET:
1738 	case -ENOTCONN:
1739 	case -EPIPE:
1740 		rpc_task_force_reencode(task);
1741 	}
1742 }
1743 
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
/*
 * 5b.	Send the backchannel RPC reply.  On error, drop the reply.  In
 * addition, disconnect on connectivity errors.
 */
static void
call_bc_transmit(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	task->tk_status = xprt_prepare_transmit(task);
	if (task->tk_status == -EAGAIN) {
		/*
		 * Could not reserve the transport. Try again after the
		 * transport is released.
		 */
		task->tk_status = 0;
		task->tk_action = call_bc_transmit;
		return;
	}

	task->tk_action = rpc_exit_task;
	if (task->tk_status < 0) {
		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
			"error: %d\n", task->tk_status);
		return;
	}

	xprt_transmit(task);
	xprt_end_transmit(task);
	dprint_status(task);
	switch (task->tk_status) {
	case 0:
		/* Success */
		break;
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -ETIMEDOUT:
		/*
		 * Problem reaching the server.  Disconnect and let the
		 * forechannel reestablish the connection.  The server will
		 * have to retransmit the backchannel request and we'll
		 * reprocess it.  Since these ops are idempotent, there's no
		 * need to cache our reply at this time.
		 */
		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
			"error: %d\n", task->tk_status);
		xprt_conditional_disconnect(task->tk_xprt,
			req->rq_connect_cookie);
		break;
	default:
		/*
		 * We were unable to reply and will have to drop the
		 * request.  The server should reconnect and retransmit.
		 */
		WARN_ON_ONCE(task->tk_status == -EAGAIN);
		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
			"error: %d\n", task->tk_status);
		break;
	}
	rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
}
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
1808 
1809 /*
1810  * 6.	Sort out the RPC call status
1811  */
1812 static void
1813 call_status(struct rpc_task *task)
1814 {
1815 	struct rpc_clnt	*clnt = task->tk_client;
1816 	struct rpc_rqst	*req = task->tk_rqstp;
1817 	int		status;
1818 
1819 	if (req->rq_reply_bytes_recvd > 0 && !req->rq_bytes_sent)
1820 		task->tk_status = req->rq_reply_bytes_recvd;
1821 
1822 	dprint_status(task);
1823 
1824 	status = task->tk_status;
1825 	if (status >= 0) {
1826 		task->tk_action = call_decode;
1827 		return;
1828 	}
1829 
1830 	trace_rpc_call_status(task);
1831 	task->tk_status = 0;
1832 	switch(status) {
1833 	case -EHOSTDOWN:
1834 	case -EHOSTUNREACH:
1835 	case -ENETUNREACH:
1836 		/*
1837 		 * Delay any retries for 3 seconds, then handle as if it
1838 		 * were a timeout.
1839 		 */
1840 		rpc_delay(task, 3*HZ);
1841 	case -ETIMEDOUT:
1842 		task->tk_action = call_timeout;
1843 		if (task->tk_client->cl_discrtry)
1844 			xprt_conditional_disconnect(task->tk_xprt,
1845 					req->rq_connect_cookie);
1846 		break;
1847 	case -ECONNRESET:
1848 	case -ECONNREFUSED:
1849 		rpc_force_rebind(clnt);
1850 		rpc_delay(task, 3*HZ);
1851 	case -EPIPE:
1852 	case -ENOTCONN:
1853 		task->tk_action = call_bind;
1854 		break;
1855 	case -EAGAIN:
1856 		task->tk_action = call_transmit;
1857 		break;
1858 	case -EIO:
1859 		/* shutdown or soft timeout */
1860 		rpc_exit(task, status);
1861 		break;
1862 	default:
1863 		if (clnt->cl_chatty)
1864 			printk("%s: RPC call returned error %d\n",
1865 			       clnt->cl_protname, -status);
1866 		rpc_exit(task, status);
1867 	}
1868 }
1869 
1870 /*
1871  * 6a.	Handle RPC timeout
1872  * 	We do not release the request slot, so we keep using the
1873  *	same XID for all retransmits.
1874  */
1875 static void
1876 call_timeout(struct rpc_task *task)
1877 {
1878 	struct rpc_clnt	*clnt = task->tk_client;
1879 
1880 	if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
1881 		dprintk("RPC: %5u call_timeout (minor)\n", task->tk_pid);
1882 		goto retry;
1883 	}
1884 
1885 	dprintk("RPC: %5u call_timeout (major)\n", task->tk_pid);
1886 	task->tk_timeouts++;
1887 
1888 	if (RPC_IS_SOFTCONN(task)) {
1889 		rpc_exit(task, -ETIMEDOUT);
1890 		return;
1891 	}
1892 	if (RPC_IS_SOFT(task)) {
1893 		if (clnt->cl_chatty) {
1894 			rcu_read_lock();
1895 			printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
1896 				clnt->cl_protname,
1897 				rcu_dereference(clnt->cl_xprt)->servername);
1898 			rcu_read_unlock();
1899 		}
1900 		if (task->tk_flags & RPC_TASK_TIMEOUT)
1901 			rpc_exit(task, -ETIMEDOUT);
1902 		else
1903 			rpc_exit(task, -EIO);
1904 		return;
1905 	}
1906 
1907 	if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
1908 		task->tk_flags |= RPC_CALL_MAJORSEEN;
1909 		if (clnt->cl_chatty) {
1910 			rcu_read_lock();
1911 			printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
1912 			clnt->cl_protname,
1913 			rcu_dereference(clnt->cl_xprt)->servername);
1914 			rcu_read_unlock();
1915 		}
1916 	}
1917 	rpc_force_rebind(clnt);
1918 	/*
1919 	 * Did our request time out due to an RPCSEC_GSS out-of-sequence
1920 	 * event? RFC2203 requires the server to drop all such requests.
1921 	 */
1922 	rpcauth_invalcred(task);
1923 
1924 retry:
1925 	clnt->cl_stats->rpcretrans++;
1926 	task->tk_action = call_bind;
1927 	task->tk_status = 0;
1928 }
1929 
1930 /*
1931  * 7.	Decode the RPC reply
1932  */
1933 static void
1934 call_decode(struct rpc_task *task)
1935 {
1936 	struct rpc_clnt	*clnt = task->tk_client;
1937 	struct rpc_rqst	*req = task->tk_rqstp;
1938 	kxdrdproc_t	decode = task->tk_msg.rpc_proc->p_decode;
1939 	__be32		*p;
1940 
1941 	dprint_status(task);
1942 
1943 	if (task->tk_flags & RPC_CALL_MAJORSEEN) {
1944 		if (clnt->cl_chatty) {
1945 			rcu_read_lock();
1946 			printk(KERN_NOTICE "%s: server %s OK\n",
1947 				clnt->cl_protname,
1948 				rcu_dereference(clnt->cl_xprt)->servername);
1949 			rcu_read_unlock();
1950 		}
1951 		task->tk_flags &= ~RPC_CALL_MAJORSEEN;
1952 	}
1953 
1954 	/*
1955 	 * Ensure that we see all writes made by xprt_complete_rqst()
1956 	 * before it changed req->rq_reply_bytes_recvd.
1957 	 */
1958 	smp_rmb();
1959 	req->rq_rcv_buf.len = req->rq_private_buf.len;
1960 
1961 	/* Check that the softirq receive buffer is valid */
1962 	WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
1963 				sizeof(req->rq_rcv_buf)) != 0);
1964 
1965 	if (req->rq_rcv_buf.len < 12) {
1966 		if (!RPC_IS_SOFT(task)) {
1967 			task->tk_action = call_bind;
1968 			clnt->cl_stats->rpcretrans++;
1969 			goto out_retry;
1970 		}
1971 		dprintk("RPC:       %s: too small RPC reply size (%d bytes)\n",
1972 				clnt->cl_protname, task->tk_status);
1973 		task->tk_action = call_timeout;
1974 		goto out_retry;
1975 	}
1976 
1977 	p = rpc_verify_header(task);
1978 	if (IS_ERR(p)) {
1979 		if (p == ERR_PTR(-EAGAIN))
1980 			goto out_retry;
1981 		return;
1982 	}
1983 
1984 	task->tk_action = rpc_exit_task;
1985 
1986 	if (decode) {
1987 		task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
1988 						      task->tk_msg.rpc_resp);
1989 	}
1990 	dprintk("RPC: %5u call_decode result %d\n", task->tk_pid,
1991 			task->tk_status);
1992 	return;
1993 out_retry:
1994 	task->tk_status = 0;
1995 	/* Note: rpc_verify_header() may have freed the RPC slot */
1996 	if (task->tk_rqstp == req) {
1997 		req->rq_reply_bytes_recvd = req->rq_rcv_buf.len = 0;
1998 		if (task->tk_client->cl_discrtry)
1999 			xprt_conditional_disconnect(task->tk_xprt,
2000 					req->rq_connect_cookie);
2001 	}
2002 }
2003 
/*
 * Marshal the fixed portion of the RPC call header (RFC 5531) plus the
 * credential into the request's first send iovec.  Returns a pointer
 * just past the marshalled header, or NULL on failure.
 */
static __be32 *
rpc_encode_header(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	struct rpc_rqst	*req = task->tk_rqstp;
	__be32		*p = req->rq_svec[0].iov_base;

	/* FIXME: check buffer size? */

	p = xprt_skip_transport_header(task->tk_xprt, p);
	*p++ = req->rq_xid;		/* XID */
	*p++ = htonl(RPC_CALL);		/* CALL */
	*p++ = htonl(RPC_VERSION);	/* RPC version */
	*p++ = htonl(clnt->cl_prog);	/* program number */
	*p++ = htonl(clnt->cl_vers);	/* program version */
	*p++ = htonl(task->tk_msg.rpc_proc->p_proc);	/* procedure */
	p = rpcauth_marshcred(task, p);
	req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p);
	return p;
}
2024 
/*
 * Parse and validate the RPC reply header (RFC 5531): reply direction,
 * accepted/rejected status, authentication verifier, and accept status.
 *
 * Returns a pointer to the start of the reply results on success.
 * On failure, returns ERR_PTR(-EAGAIN) to request a retransmit (and may
 * have reset tk_action or even released the request slot), or exits the
 * task with a fatal error and returns the corresponding ERR_PTR.
 */
static __be32 *
rpc_verify_header(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
	int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
	__be32	*p = iov->iov_base;
	u32 n;
	int error = -EACCES;

	if ((task->tk_rqstp->rq_rcv_buf.len & 3) != 0) {
		/* RFC-1014 says that the representation of XDR data must be a
		 * multiple of four bytes
		 * - if it isn't pointer subtraction in the NFS client may give
		 *   undefined results
		 */
		dprintk("RPC: %5u %s: XDR representation not a multiple of"
		       " 4 bytes: 0x%x\n", task->tk_pid, __func__,
		       task->tk_rqstp->rq_rcv_buf.len);
		goto out_eio;
	}
	/* Need at least XID + direction + reply_stat (3 words) */
	if ((len -= 3) < 0)
		goto out_overflow;

	p += 1; /* skip XID */
	if ((n = ntohl(*p++)) != RPC_REPLY) {
		dprintk("RPC: %5u %s: not an RPC reply: %x\n",
			task->tk_pid, __func__, n);
		goto out_garbage;
	}

	if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
		/* MSG_DENIED: examine the reject status */
		if (--len < 0)
			goto out_overflow;
		switch ((n = ntohl(*p++))) {
		case RPC_AUTH_ERROR:
			break;
		case RPC_MISMATCH:
			dprintk("RPC: %5u %s: RPC call version mismatch!\n",
				task->tk_pid, __func__);
			error = -EPROTONOSUPPORT;
			goto out_err;
		default:
			dprintk("RPC: %5u %s: RPC call rejected, "
				"unknown error: %x\n",
				task->tk_pid, __func__, n);
			goto out_eio;
		}
		/* RPC_AUTH_ERROR: examine the auth_stat word */
		if (--len < 0)
			goto out_overflow;
		switch ((n = ntohl(*p++))) {
		case RPC_AUTH_REJECTEDCRED:
		case RPC_AUTH_REJECTEDVERF:
		case RPCSEC_GSS_CREDPROBLEM:
		case RPCSEC_GSS_CTXPROBLEM:
			if (!task->tk_cred_retry)
				break;
			task->tk_cred_retry--;
			dprintk("RPC: %5u %s: retry stale creds\n",
					task->tk_pid, __func__);
			rpcauth_invalcred(task);
			/* Ensure we obtain a new XID! */
			xprt_release(task);
			task->tk_action = call_reserve;
			goto out_retry;
		case RPC_AUTH_BADCRED:
		case RPC_AUTH_BADVERF:
			/* possibly garbled cred/verf? */
			if (!task->tk_garb_retry)
				break;
			task->tk_garb_retry--;
			dprintk("RPC: %5u %s: retry garbled creds\n",
					task->tk_pid, __func__);
			task->tk_action = call_bind;
			goto out_retry;
		case RPC_AUTH_TOOWEAK:
			rcu_read_lock();
			printk(KERN_NOTICE "RPC: server %s requires stronger "
			       "authentication.\n",
			       rcu_dereference(clnt->cl_xprt)->servername);
			rcu_read_unlock();
			break;
		default:
			dprintk("RPC: %5u %s: unknown auth error: %x\n",
					task->tk_pid, __func__, n);
			error = -EIO;
		}
		dprintk("RPC: %5u %s: call rejected %d\n",
				task->tk_pid, __func__, n);
		goto out_err;
	}
	if (!(p = rpcauth_checkverf(task, p))) {
		dprintk("RPC: %5u %s: auth check failed\n",
				task->tk_pid, __func__);
		goto out_garbage;		/* bad verifier, retry */
	}
	len = p - (__be32 *)iov->iov_base - 1;
	if (len < 0)
		goto out_overflow;
	switch ((n = ntohl(*p++))) {
	case RPC_SUCCESS:
		return p;
	case RPC_PROG_UNAVAIL:
		dprintk_rcu("RPC: %5u %s: program %u is unsupported "
				"by server %s\n", task->tk_pid, __func__,
				(unsigned int)clnt->cl_prog,
				rcu_dereference(clnt->cl_xprt)->servername);
		error = -EPFNOSUPPORT;
		goto out_err;
	case RPC_PROG_MISMATCH:
		dprintk_rcu("RPC: %5u %s: program %u, version %u unsupported "
				"by server %s\n", task->tk_pid, __func__,
				(unsigned int)clnt->cl_prog,
				(unsigned int)clnt->cl_vers,
				rcu_dereference(clnt->cl_xprt)->servername);
		error = -EPROTONOSUPPORT;
		goto out_err;
	case RPC_PROC_UNAVAIL:
		dprintk_rcu("RPC: %5u %s: proc %s unsupported by program %u, "
				"version %u on server %s\n",
				task->tk_pid, __func__,
				rpc_proc_name(task),
				clnt->cl_prog, clnt->cl_vers,
				rcu_dereference(clnt->cl_xprt)->servername);
		error = -EOPNOTSUPP;
		goto out_err;
	case RPC_GARBAGE_ARGS:
		dprintk("RPC: %5u %s: server saw garbage\n",
				task->tk_pid, __func__);
		break;			/* retry */
	default:
		dprintk("RPC: %5u %s: server accept status: %x\n",
				task->tk_pid, __func__, n);
		/* Also retry */
	}

out_garbage:
	clnt->cl_stats->rpcgarbage++;
	if (task->tk_garb_retry) {
		task->tk_garb_retry--;
		dprintk("RPC: %5u %s: retrying\n",
				task->tk_pid, __func__);
		task->tk_action = call_bind;
out_retry:
		return ERR_PTR(-EAGAIN);
	}
out_eio:
	error = -EIO;
out_err:
	rpc_exit(task, error);
	dprintk("RPC: %5u %s: call failed with error %d\n", task->tk_pid,
			__func__, error);
	return ERR_PTR(error);
out_overflow:
	dprintk("RPC: %5u %s: server reply was truncated.\n", task->tk_pid,
			__func__);
	goto out_garbage;
}
2183 
/* NULL procedure takes no arguments: nothing to encode */
static void rpcproc_encode_null(void *rqstp, struct xdr_stream *xdr, void *obj)
{
}
2187 
/* NULL procedure returns no results: nothing to decode */
static int rpcproc_decode_null(void *rqstp, struct xdr_stream *xdr, void *obj)
{
	return 0;
}
2192 
/* Procedure 0 (NULL ping) descriptor used by rpc_ping/rpc_call_null */
static struct rpc_procinfo rpcproc_null = {
	.p_encode = rpcproc_encode_null,
	.p_decode = rpcproc_decode_null,
};
2197 
2198 static int rpc_ping(struct rpc_clnt *clnt)
2199 {
2200 	struct rpc_message msg = {
2201 		.rpc_proc = &rpcproc_null,
2202 	};
2203 	int err;
2204 	msg.rpc_cred = authnull_ops.lookup_cred(NULL, NULL, 0);
2205 	err = rpc_call_sync(clnt, &msg, RPC_TASK_SOFT | RPC_TASK_SOFTCONN);
2206 	put_rpccred(msg.rpc_cred);
2207 	return err;
2208 }
2209 
/*
 * Start an asynchronous NULL procedure call using the caller-supplied
 * credential.  Returns the running rpc_task (or an ERR_PTR from
 * rpc_run_task on failure); the caller owns the returned task reference.
 */
struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
{
	struct rpc_message msg = {
		.rpc_proc = &rpcproc_null,
		.rpc_cred = cred,
	};
	struct rpc_task_setup task_setup_data = {
		.rpc_client = clnt,
		.rpc_message = &msg,
		.callback_ops = &rpc_default_ops,
		.flags = flags,
	};
	return rpc_run_task(&task_setup_data);
}
EXPORT_SYMBOL_GPL(rpc_call_null);
2225 
#ifdef RPC_DEBUG
/* Print the column header for the rpc_show_tasks() task listing */
static void rpc_show_header(void)
{
	printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
		"-timeout ---ops--\n");
}
2232 
/* Print one line of per-task debug state for rpc_show_tasks() */
static void rpc_show_task(const struct rpc_clnt *clnt,
			  const struct rpc_task *task)
{
	const char *rpc_waitq = "none";

	if (RPC_IS_QUEUED(task))
		rpc_waitq = rpc_qname(task->tk_waitqueue);

	printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%ps q:%s\n",
		task->tk_pid, task->tk_flags, task->tk_status,
		clnt, task->tk_rqstp, task->tk_timeout, task->tk_ops,
		clnt->cl_protname, clnt->cl_vers, rpc_proc_name(task),
		task->tk_action, rpc_waitq);
}
2247 
/*
 * Dump every RPC task of every client in the given network namespace
 * to the kernel log.  Walks the per-namespace client list under
 * rpc_client_lock, and each client's task list under cl_lock.
 */
void rpc_show_tasks(struct net *net)
{
	struct rpc_clnt *clnt;
	struct rpc_task *task;
	int header = 0;
	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);

	spin_lock(&sn->rpc_client_lock);
	list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
		spin_lock(&clnt->cl_lock);
		list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
			/* Emit the header once, before the first task */
			if (!header) {
				rpc_show_header();
				header++;
			}
			rpc_show_task(clnt, task);
		}
		spin_unlock(&clnt->cl_lock);
	}
	spin_unlock(&sn->rpc_client_lock);
}
#endif
2270