xref: /openbmc/linux/net/sunrpc/clnt.c (revision ee8a99bd)
1 /*
2  *  linux/net/sunrpc/clnt.c
3  *
4  *  This file contains the high-level RPC interface.
5  *  It is modeled as a finite state machine to support both synchronous
6  *  and asynchronous requests.
7  *
8  *  -	RPC header generation and argument serialization.
9  *  -	Credential refresh.
10  *  -	TCP connect handling.
11  *  -	Retry of operation when it is suspected the operation failed because
12  *	of uid squashing on the server, or when the credentials were stale
13  *	and need to be refreshed, or when a packet was damaged in transit.
 *	This may have to be moved to the VFS layer.
15  *
16  *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
17  *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
18  */
19 
20 
21 #include <linux/module.h>
22 #include <linux/types.h>
23 #include <linux/kallsyms.h>
24 #include <linux/mm.h>
25 #include <linux/namei.h>
26 #include <linux/mount.h>
27 #include <linux/slab.h>
28 #include <linux/utsname.h>
29 #include <linux/workqueue.h>
30 #include <linux/in.h>
31 #include <linux/in6.h>
32 #include <linux/un.h>
33 #include <linux/rcupdate.h>
34 
35 #include <linux/sunrpc/clnt.h>
36 #include <linux/sunrpc/addr.h>
37 #include <linux/sunrpc/rpc_pipe_fs.h>
38 #include <linux/sunrpc/metrics.h>
39 #include <linux/sunrpc/bc_xprt.h>
40 #include <trace/events/sunrpc.h>
41 
42 #include "sunrpc.h"
43 #include "netns.h"
44 
#ifdef RPC_DEBUG
# define RPCDBG_FACILITY	RPCDBG_CALL
#endif

/* Debug helper: log the current state-machine step and task status. */
#define dprint_status(t)					\
	dprintk("RPC: %5u %s (status %d)\n", t->tk_pid,		\
			__func__, t->tk_status)
52 
/*
 * Waitqueue used by rpc_shutdown_client() to wait for a client's task
 * list to drain; woken from rpc_release_client().  The clients
 * themselves are linked into per-net lists (struct sunrpc_net).
 */
56 
57 static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
58 
59 
60 static void	call_start(struct rpc_task *task);
61 static void	call_reserve(struct rpc_task *task);
62 static void	call_reserveresult(struct rpc_task *task);
63 static void	call_allocate(struct rpc_task *task);
64 static void	call_decode(struct rpc_task *task);
65 static void	call_bind(struct rpc_task *task);
66 static void	call_bind_status(struct rpc_task *task);
67 static void	call_transmit(struct rpc_task *task);
68 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
69 static void	call_bc_transmit(struct rpc_task *task);
70 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
71 static void	call_status(struct rpc_task *task);
72 static void	call_transmit_status(struct rpc_task *task);
73 static void	call_refresh(struct rpc_task *task);
74 static void	call_refreshresult(struct rpc_task *task);
75 static void	call_timeout(struct rpc_task *task);
76 static void	call_connect(struct rpc_task *task);
77 static void	call_connect_status(struct rpc_task *task);
78 
79 static __be32	*rpc_encode_header(struct rpc_task *task);
80 static __be32	*rpc_verify_header(struct rpc_task *task);
81 static int	rpc_ping(struct rpc_clnt *clnt);
82 
83 static void rpc_register_client(struct rpc_clnt *clnt)
84 {
85 	struct net *net = rpc_net_ns(clnt);
86 	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
87 
88 	spin_lock(&sn->rpc_client_lock);
89 	list_add(&clnt->cl_clients, &sn->all_clients);
90 	spin_unlock(&sn->rpc_client_lock);
91 }
92 
93 static void rpc_unregister_client(struct rpc_clnt *clnt)
94 {
95 	struct net *net = rpc_net_ns(clnt);
96 	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
97 
98 	spin_lock(&sn->rpc_client_lock);
99 	list_del(&clnt->cl_clients);
100 	spin_unlock(&sn->rpc_client_lock);
101 }
102 
103 static void __rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
104 {
105 	if (clnt->cl_dentry) {
106 		if (clnt->cl_auth && clnt->cl_auth->au_ops->pipes_destroy)
107 			clnt->cl_auth->au_ops->pipes_destroy(clnt->cl_auth);
108 		rpc_remove_client_dir(clnt->cl_dentry);
109 	}
110 	clnt->cl_dentry = NULL;
111 }
112 
113 static void rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
114 {
115 	struct net *net = rpc_net_ns(clnt);
116 	struct super_block *pipefs_sb;
117 
118 	pipefs_sb = rpc_get_sb_net(net);
119 	if (pipefs_sb) {
120 		__rpc_clnt_remove_pipedir(clnt);
121 		rpc_put_sb_net(net);
122 	}
123 }
124 
125 static struct dentry *rpc_setup_pipedir_sb(struct super_block *sb,
126 				    struct rpc_clnt *clnt,
127 				    const char *dir_name)
128 {
129 	static uint32_t clntid;
130 	char name[15];
131 	struct dentry *dir, *dentry;
132 
133 	dir = rpc_d_lookup_sb(sb, dir_name);
134 	if (dir == NULL) {
135 		pr_info("RPC: pipefs directory doesn't exist: %s\n", dir_name);
136 		return dir;
137 	}
138 	for (;;) {
139 		snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
140 		name[sizeof(name) - 1] = '\0';
141 		dentry = rpc_create_client_dir(dir, name, clnt);
142 		if (!IS_ERR(dentry))
143 			break;
144 		if (dentry == ERR_PTR(-EEXIST))
145 			continue;
146 		printk(KERN_INFO "RPC: Couldn't create pipefs entry"
147 				" %s/%s, error %ld\n",
148 				dir_name, name, PTR_ERR(dentry));
149 		break;
150 	}
151 	dput(dir);
152 	return dentry;
153 }
154 
155 static int
156 rpc_setup_pipedir(struct rpc_clnt *clnt, const char *dir_name,
157 		  struct super_block *pipefs_sb)
158 {
159 	struct dentry *dentry;
160 
161 	clnt->cl_dentry = NULL;
162 	if (dir_name == NULL)
163 		return 0;
164 	dentry = rpc_setup_pipedir_sb(pipefs_sb, clnt, dir_name);
165 	if (IS_ERR(dentry))
166 		return PTR_ERR(dentry);
167 	clnt->cl_dentry = dentry;
168 	return 0;
169 }
170 
171 static inline int rpc_clnt_skip_event(struct rpc_clnt *clnt, unsigned long event)
172 {
173 	if (((event == RPC_PIPEFS_MOUNT) && clnt->cl_dentry) ||
174 	    ((event == RPC_PIPEFS_UMOUNT) && !clnt->cl_dentry))
175 		return 1;
176 	if ((event == RPC_PIPEFS_MOUNT) && atomic_read(&clnt->cl_count) == 0)
177 		return 1;
178 	return 0;
179 }
180 
/*
 * Apply a single pipefs mount/umount event to @clnt: (re)create or
 * tear down its pipefs directory and auth pipes.  Returns 0 on
 * success or a negative errno.
 */
static int __rpc_clnt_handle_event(struct rpc_clnt *clnt, unsigned long event,
				   struct super_block *sb)
{
	struct dentry *dentry;
	int err = 0;

	switch (event) {
	case RPC_PIPEFS_MOUNT:
		/* rpc_setup_pipedir_sb() returns NULL when the parent
		 * directory is missing, or an ERR_PTR on failure. */
		dentry = rpc_setup_pipedir_sb(sb, clnt,
					      clnt->cl_program->pipe_dir_name);
		if (!dentry)
			return -ENOENT;
		if (IS_ERR(dentry))
			return PTR_ERR(dentry);
		clnt->cl_dentry = dentry;
		if (clnt->cl_auth->au_ops->pipes_create) {
			err = clnt->cl_auth->au_ops->pipes_create(clnt->cl_auth);
			/* Auth pipe creation failed: undo the directory. */
			if (err)
				__rpc_clnt_remove_pipedir(clnt);
		}
		break;
	case RPC_PIPEFS_UMOUNT:
		__rpc_clnt_remove_pipedir(clnt);
		break;
	default:
		printk(KERN_ERR "%s: unknown event: %ld\n", __func__, event);
		return -ENOTSUPP;
	}
	return err;
}
211 
212 static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event,
213 				struct super_block *sb)
214 {
215 	int error = 0;
216 
217 	for (;; clnt = clnt->cl_parent) {
218 		if (!rpc_clnt_skip_event(clnt, event))
219 			error = __rpc_clnt_handle_event(clnt, event, sb);
220 		if (error || clnt == clnt->cl_parent)
221 			break;
222 	}
223 	return error;
224 }
225 
/*
 * Return the first registered client in @net that still needs @event
 * applied, or NULL when none remain.
 *
 * NOTE(review): the pointer is returned after rpc_client_lock is
 * dropped without taking a client reference — presumably serialised by
 * the pipefs notifier machinery; confirm against the caller.
 */
static struct rpc_clnt *rpc_get_client_for_event(struct net *net, int event)
{
	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
	struct rpc_clnt *clnt;

	spin_lock(&sn->rpc_client_lock);
	list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
		/* Programs without a pipefs directory never need events. */
		if (clnt->cl_program->pipe_dir_name == NULL)
			continue;
		if (rpc_clnt_skip_event(clnt, event))
			continue;
		spin_unlock(&sn->rpc_client_lock);
		return clnt;
	}
	spin_unlock(&sn->rpc_client_lock);
	return NULL;
}
243 
244 static int rpc_pipefs_event(struct notifier_block *nb, unsigned long event,
245 			    void *ptr)
246 {
247 	struct super_block *sb = ptr;
248 	struct rpc_clnt *clnt;
249 	int error = 0;
250 
251 	while ((clnt = rpc_get_client_for_event(sb->s_fs_info, event))) {
252 		error = __rpc_pipefs_event(clnt, event, sb);
253 		if (error)
254 			break;
255 	}
256 	return error;
257 }
258 
/* Notifier hook applying pipefs (un)mount events to registered clients. */
static struct notifier_block rpc_clients_block = {
	.notifier_call	= rpc_pipefs_event,
	.priority	= SUNRPC_PIPEFS_RPC_PRIO,
};
263 
/* Subscribe the client list to pipefs mount/umount notifications. */
int rpc_clients_notifier_register(void)
{
	return rpc_pipefs_notifier_register(&rpc_clients_block);
}
268 
/* Unsubscribe the client list from pipefs mount/umount notifications. */
void rpc_clients_notifier_unregister(void)
{
	return rpc_pipefs_notifier_unregister(&rpc_clients_block);
}
273 
/*
 * Record @nodename in clnt->cl_nodename, capping the copy at
 * UNX_MAXNODENAME bytes; cl_nodelen records how many bytes were copied.
 * NOTE(review): no NUL terminator is written, so consumers must use
 * cl_nodelen rather than treating cl_nodename as a C string — confirm.
 */
static void rpc_clnt_set_nodename(struct rpc_clnt *clnt, const char *nodename)
{
	clnt->cl_nodelen = strlen(nodename);
	if (clnt->cl_nodelen > UNX_MAXNODENAME)
		clnt->cl_nodelen = UNX_MAXNODENAME;
	memcpy(clnt->cl_nodename, nodename, clnt->cl_nodelen);
}
281 
/*
 * Register a freshly created client: create its pipefs directory (if
 * pipefs is currently mounted), add it to the per-net client list, and
 * create its auth handle.  Returns 0 or a negative errno.
 */
static int rpc_client_register(const struct rpc_create_args *args,
			       struct rpc_clnt *clnt)
{
	const struct rpc_program *program = args->program;
	struct rpc_auth *auth;
	struct net *net = rpc_net_ns(clnt);
	struct super_block *pipefs_sb;
	int err;

	pipefs_sb = rpc_get_sb_net(net);
	if (pipefs_sb) {
		err = rpc_setup_pipedir(clnt, program->pipe_dir_name, pipefs_sb);
		if (err)
			goto out;
	}

	rpc_register_client(clnt);
	if (pipefs_sb)
		rpc_put_sb_net(net);

	auth = rpcauth_create(args->authflavor, clnt);
	if (IS_ERR(auth)) {
		dprintk("RPC:       Couldn't create auth handle (flavor %u)\n",
				args->authflavor);
		err = PTR_ERR(auth);
		goto err_auth;
	}
	return 0;
err_auth:
	/* Re-take the pipefs sb reference dropped above before undoing
	 * the pipedir.  NOTE(review): __rpc_clnt_remove_pipedir() runs
	 * even when pipefs_sb is NULL here (pipefs unmounted meanwhile) —
	 * verify that is safe. */
	pipefs_sb = rpc_get_sb_net(net);
	rpc_unregister_client(clnt);
	__rpc_clnt_remove_pipedir(clnt);
out:
	if (pipefs_sb)
		rpc_put_sb_net(net);
	return err;
}
319 
/*
 * Allocate and initialise a struct rpc_clnt for @args over transport
 * @xprt.  On success the new client owns the passed-in xprt reference;
 * on failure that reference is dropped here (xprt_put() at the end of
 * the unwind chain) and an ERR_PTR is returned.
 */
static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args, struct rpc_xprt *xprt)
{
	const struct rpc_program *program = args->program;
	const struct rpc_version *version;
	struct rpc_clnt		*clnt = NULL;
	int err;

	/* sanity check the name before trying to print it */
	dprintk("RPC:       creating %s client for %s (xprt %p)\n",
			program->name, args->servername, xprt);

	/* rpciod must be running before any task can execute. */
	err = rpciod_up();
	if (err)
		goto out_no_rpciod;

	err = -EINVAL;
	if (args->version >= program->nrvers)
		goto out_err;
	version = program->version[args->version];
	if (version == NULL)
		goto out_err;

	err = -ENOMEM;
	clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
	if (!clnt)
		goto out_err;
	/* A client with no parent points at itself. */
	clnt->cl_parent = clnt;

	rcu_assign_pointer(clnt->cl_xprt, xprt);
	clnt->cl_procinfo = version->procs;
	clnt->cl_maxproc  = version->nrprocs;
	clnt->cl_protname = program->name;
	clnt->cl_prog     = args->prognumber ? : program->number;
	clnt->cl_vers     = version->number;
	clnt->cl_stats    = program->stats;
	clnt->cl_metrics  = rpc_alloc_iostats(clnt);
	err = -ENOMEM;
	if (clnt->cl_metrics == NULL)
		goto out_no_stats;
	clnt->cl_program  = program;
	INIT_LIST_HEAD(&clnt->cl_tasks);
	spin_lock_init(&clnt->cl_lock);

	/* Unbound transports need an rpcbind query before first use. */
	if (!xprt_bound(xprt))
		clnt->cl_autobind = 1;

	/* Caller-supplied timeouts override the transport default. */
	clnt->cl_timeout = xprt->timeout;
	if (args->timeout != NULL) {
		memcpy(&clnt->cl_timeout_default, args->timeout,
				sizeof(clnt->cl_timeout_default));
		clnt->cl_timeout = &clnt->cl_timeout_default;
	}

	clnt->cl_rtt = &clnt->cl_rtt_default;
	rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
	clnt->cl_principal = NULL;
	if (args->client_name) {
		clnt->cl_principal = kstrdup(args->client_name, GFP_KERNEL);
		if (!clnt->cl_principal)
			goto out_no_principal;	/* err is still -ENOMEM */
	}

	atomic_set(&clnt->cl_count, 1);

	/* save the nodename */
	rpc_clnt_set_nodename(clnt, utsname()->nodename);

	err = rpc_client_register(args, clnt);
	if (err)
		goto out_no_path;
	return clnt;

out_no_path:
	kfree(clnt->cl_principal);
out_no_principal:
	rpc_free_iostats(clnt->cl_metrics);
out_no_stats:
	kfree(clnt);
out_err:
	rpciod_down();
out_no_rpciod:
	xprt_put(xprt);
	return ERR_PTR(err);
}
404 
405 /**
406  * rpc_create - create an RPC client and transport with one call
407  * @args: rpc_clnt create argument structure
408  *
409  * Creates and initializes an RPC transport and an RPC client.
410  *
411  * It can ping the server in order to determine if it is up, and to see if
412  * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
413  * this behavior so asynchronous tasks can also use rpc_create.
414  */
struct rpc_clnt *rpc_create(struct rpc_create_args *args)
{
	struct rpc_xprt *xprt;
	struct rpc_clnt *clnt;
	struct xprt_create xprtargs = {
		.net = args->net,
		.ident = args->protocol,
		.srcaddr = args->saddress,
		.dstaddr = args->address,
		.addrlen = args->addrsize,
		.servername = args->servername,
		.bc_xprt = args->bc_xprt,
	};
	char servername[48];

	if (args->flags & RPC_CLNT_CREATE_INFINITE_SLOTS)
		xprtargs.flags |= XPRT_CREATE_INFINITE_SLOTS;
	if (args->flags & RPC_CLNT_CREATE_NO_IDLE_TIMEOUT)
		xprtargs.flags |= XPRT_CREATE_NO_IDLE_TIMEOUT;
	/*
	 * If the caller chooses not to specify a hostname, whip
	 * up a string representation of the passed-in address.
	 */
	if (xprtargs.servername == NULL) {
		struct sockaddr_un *sun =
				(struct sockaddr_un *)args->address;
		struct sockaddr_in *sin =
				(struct sockaddr_in *)args->address;
		struct sockaddr_in6 *sin6 =
				(struct sockaddr_in6 *)args->address;

		servername[0] = '\0';
		switch (args->address->sa_family) {
		case AF_LOCAL:
			/* NOTE(review): sun_path may exceed the 48-byte
			 * buffer and be silently truncated — confirm. */
			snprintf(servername, sizeof(servername), "%s",
				 sun->sun_path);
			break;
		case AF_INET:
			snprintf(servername, sizeof(servername), "%pI4",
				 &sin->sin_addr.s_addr);
			break;
		case AF_INET6:
			snprintf(servername, sizeof(servername), "%pI6",
				 &sin6->sin6_addr);
			break;
		default:
			/* caller wants default server name, but
			 * address family isn't recognized. */
			return ERR_PTR(-EINVAL);
		}
		xprtargs.servername = servername;
	}

	xprt = xprt_create_transport(&xprtargs);
	if (IS_ERR(xprt))
		return (struct rpc_clnt *)xprt;

	/*
	 * By default, kernel RPC client connects from a reserved port.
	 * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
	 * but it is always enabled for rpciod, which handles the connect
	 * operation.
	 */
	xprt->resvport = 1;
	if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
		xprt->resvport = 0;

	/* rpc_new_client() consumes the xprt reference on failure. */
	clnt = rpc_new_client(args, xprt);
	if (IS_ERR(clnt))
		return clnt;

	if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
		/* NOTE(review): the ping runs before cl_softrtry and the
		 * other flag bits below are set, so it uses the client's
		 * default retry semantics — confirm that is intended. */
		int err = rpc_ping(clnt);
		if (err != 0) {
			rpc_shutdown_client(clnt);
			return ERR_PTR(err);
		}
	}

	clnt->cl_softrtry = 1;
	if (args->flags & RPC_CLNT_CREATE_HARDRTRY)
		clnt->cl_softrtry = 0;

	if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
		clnt->cl_autobind = 1;
	if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
		clnt->cl_discrtry = 1;
	if (!(args->flags & RPC_CLNT_CREATE_QUIET))
		clnt->cl_chatty = 1;

	return clnt;
}
EXPORT_SYMBOL_GPL(rpc_create);
508 
509 /*
510  * This function clones the RPC client structure. It allows us to share the
511  * same transport while varying parameters such as the authentication
512  * flavour.
513  */
static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
					   struct rpc_clnt *clnt)
{
	struct rpc_xprt *xprt;
	struct rpc_clnt *new;
	int err;

	err = -ENOMEM;
	/* Take our own reference on the shared transport. */
	rcu_read_lock();
	xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
	rcu_read_unlock();
	if (xprt == NULL)
		goto out_err;
	args->servername = xprt->servername;

	/* On failure rpc_new_client() has already dropped the xprt
	 * reference taken above, so we must not put it again here. */
	new = rpc_new_client(args, xprt);
	if (IS_ERR(new)) {
		err = PTR_ERR(new);
		goto out_err;
	}

	/* The clone pins its parent; released in rpc_free_client(). */
	atomic_inc(&clnt->cl_count);
	new->cl_parent = clnt;

	/* Turn off autobind on clones */
	new->cl_autobind = 0;
	new->cl_softrtry = clnt->cl_softrtry;
	new->cl_discrtry = clnt->cl_discrtry;
	new->cl_chatty = clnt->cl_chatty;
	return new;

out_err:
	dprintk("RPC:       %s: returned error %d\n", __func__, err);
	return ERR_PTR(err);
}
549 
550 /**
551  * rpc_clone_client - Clone an RPC client structure
552  *
553  * @clnt: RPC client whose parameters are copied
554  *
555  * Returns a fresh RPC client or an ERR_PTR.
556  */
557 struct rpc_clnt *rpc_clone_client(struct rpc_clnt *clnt)
558 {
559 	struct rpc_create_args args = {
560 		.program	= clnt->cl_program,
561 		.prognumber	= clnt->cl_prog,
562 		.version	= clnt->cl_vers,
563 		.authflavor	= clnt->cl_auth->au_flavor,
564 		.client_name	= clnt->cl_principal,
565 	};
566 	return __rpc_clone_client(&args, clnt);
567 }
568 EXPORT_SYMBOL_GPL(rpc_clone_client);
569 
570 /**
571  * rpc_clone_client_set_auth - Clone an RPC client structure and set its auth
572  *
573  * @clnt: RPC client whose parameters are copied
574  * @flavor: security flavor for new client
575  *
576  * Returns a fresh RPC client or an ERR_PTR.
577  */
578 struct rpc_clnt *
579 rpc_clone_client_set_auth(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
580 {
581 	struct rpc_create_args args = {
582 		.program	= clnt->cl_program,
583 		.prognumber	= clnt->cl_prog,
584 		.version	= clnt->cl_vers,
585 		.authflavor	= flavor,
586 		.client_name	= clnt->cl_principal,
587 	};
588 	return __rpc_clone_client(&args, clnt);
589 }
590 EXPORT_SYMBOL_GPL(rpc_clone_client_set_auth);
591 
592 /*
593  * Kill all tasks for the given client.
594  * XXX: kill their descendants as well?
595  */
void rpc_killall_tasks(struct rpc_clnt *clnt)
{
	struct rpc_task	*rovr;


	/* Unlocked peek: cheap early exit when there is nothing to kill. */
	if (list_empty(&clnt->cl_tasks))
		return;
	dprintk("RPC:       killing all tasks for client %p\n", clnt);
	/*
	 * Spin lock all_tasks to prevent changes...
	 */
	spin_lock(&clnt->cl_lock);
	list_for_each_entry(rovr, &clnt->cl_tasks, tk_task) {
		if (!RPC_IS_ACTIVATED(rovr))
			continue;
		/* Mark each active task killed exactly once and complete
		 * it with -EIO, waking it if it is asleep on a queue. */
		if (!(rovr->tk_flags & RPC_TASK_KILLED)) {
			rovr->tk_flags |= RPC_TASK_KILLED;
			rpc_exit(rovr, -EIO);
			if (RPC_IS_QUEUED(rovr))
				rpc_wake_up_queued_task(rovr->tk_waitqueue,
							rovr);
		}
	}
	spin_unlock(&clnt->cl_lock);
}
EXPORT_SYMBOL_GPL(rpc_killall_tasks);
622 
623 /*
624  * Properly shut down an RPC client, terminating all outstanding
625  * requests.
626  */
void rpc_shutdown_client(struct rpc_clnt *clnt)
{
	might_sleep();

	dprintk_rcu("RPC:       shutting down %s client for %s\n",
			clnt->cl_protname,
			rcu_dereference(clnt->cl_xprt)->servername);

	/* Keep killing tasks until the list drains; rpc_release_client()
	 * wakes destroy_wait when the last task goes away, and the 1s
	 * timeout re-checks in case a wakeup is missed. */
	while (!list_empty(&clnt->cl_tasks)) {
		rpc_killall_tasks(clnt);
		wait_event_timeout(destroy_wait,
			list_empty(&clnt->cl_tasks), 1*HZ);
	}

	rpc_release_client(clnt);
}
EXPORT_SYMBOL_GPL(rpc_shutdown_client);
644 
645 /*
646  * Free an RPC client
647  */
static void
rpc_free_client(struct rpc_clnt *clnt)
{
	dprintk_rcu("RPC:       destroying %s client for %s\n",
			clnt->cl_protname,
			rcu_dereference(clnt->cl_xprt)->servername);
	/* Drop the reference a clone holds on its parent
	 * (taken in __rpc_clone_client()). */
	if (clnt->cl_parent != clnt)
		rpc_release_client(clnt->cl_parent);
	rpc_clnt_remove_pipedir(clnt);
	rpc_unregister_client(clnt);
	rpc_free_iostats(clnt->cl_metrics);
	kfree(clnt->cl_principal);
	clnt->cl_metrics = NULL;
	/* _raw: no concurrent readers remain at this point. */
	xprt_put(rcu_dereference_raw(clnt->cl_xprt));
	rpciod_down();
	kfree(clnt);
}
665 
/*
 * Release the client's auth handle, then free the client itself once
 * the last reference is gone.
 */
static void
rpc_free_auth(struct rpc_clnt *clnt)
{
	/* No auth handle: the client can be freed immediately. */
	if (clnt->cl_auth == NULL) {
		rpc_free_client(clnt);
		return;
	}

	/*
	 * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
	 *       release remaining GSS contexts. This mechanism ensures
	 *       that it can do so safely.
	 */
	atomic_inc(&clnt->cl_count);
	rpcauth_release(clnt->cl_auth);
	clnt->cl_auth = NULL;
	/* Drop the temporary reference; free the client unless
	 * rpcauth_release() pinned it again. */
	if (atomic_dec_and_test(&clnt->cl_count))
		rpc_free_client(clnt);
}
688 
689 /*
690  * Release reference to the RPC client
691  */
void
rpc_release_client(struct rpc_clnt *clnt)
{
	dprintk("RPC:       rpc_release_client(%p)\n", clnt);

	/* Let rpc_shutdown_client() know the task list has drained. */
	if (list_empty(&clnt->cl_tasks))
		wake_up(&destroy_wait);
	if (atomic_dec_and_test(&clnt->cl_count))
		rpc_free_auth(clnt);
}
EXPORT_SYMBOL_GPL(rpc_release_client);
703 
704 /**
705  * rpc_bind_new_program - bind a new RPC program to an existing client
706  * @old: old rpc_client
707  * @program: rpc program to set
708  * @vers: rpc program version
709  *
710  * Clones the rpc client and sets up a new RPC program. This is mainly
711  * of use for enabling different RPC programs to share the same transport.
712  * The Sun NFSv2/v3 ACL protocol can do this.
713  */
714 struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
715 				      const struct rpc_program *program,
716 				      u32 vers)
717 {
718 	struct rpc_create_args args = {
719 		.program	= program,
720 		.prognumber	= program->number,
721 		.version	= vers,
722 		.authflavor	= old->cl_auth->au_flavor,
723 		.client_name	= old->cl_principal,
724 	};
725 	struct rpc_clnt *clnt;
726 	int err;
727 
728 	clnt = __rpc_clone_client(&args, old);
729 	if (IS_ERR(clnt))
730 		goto out;
731 	err = rpc_ping(clnt);
732 	if (err != 0) {
733 		rpc_shutdown_client(clnt);
734 		clnt = ERR_PTR(err);
735 	}
736 out:
737 	return clnt;
738 }
739 EXPORT_SYMBOL_GPL(rpc_bind_new_program);
740 
741 void rpc_task_release_client(struct rpc_task *task)
742 {
743 	struct rpc_clnt *clnt = task->tk_client;
744 
745 	if (clnt != NULL) {
746 		/* Remove from client task list */
747 		spin_lock(&clnt->cl_lock);
748 		list_del(&task->tk_task);
749 		spin_unlock(&clnt->cl_lock);
750 		task->tk_client = NULL;
751 
752 		rpc_release_client(clnt);
753 	}
754 }
755 
/*
 * Attach @task to @clnt (no-op for a NULL client): take a client
 * reference, inherit the soft-retry flag, and link the task into the
 * client's task list.
 */
static
void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
{
	if (clnt != NULL) {
		/* Drop any client the task previously referenced. */
		rpc_task_release_client(task);
		task->tk_client = clnt;
		atomic_inc(&clnt->cl_count);
		if (clnt->cl_softrtry)
			task->tk_flags |= RPC_TASK_SOFT;
		/* NOTE(review): presumably marks tasks on swap-backed
		 * transports so they may dip into memalloc reserves —
		 * confirm against the xprt swapper users. */
		if (sk_memalloc_socks()) {
			struct rpc_xprt *xprt;

			rcu_read_lock();
			xprt = rcu_dereference(clnt->cl_xprt);
			if (xprt->swapper)
				task->tk_flags |= RPC_TASK_SWAPPER;
			rcu_read_unlock();
		}
		/* Add to the client's list of all tasks */
		spin_lock(&clnt->cl_lock);
		list_add_tail(&task->tk_task, &clnt->cl_tasks);
		spin_unlock(&clnt->cl_lock);
	}
}
780 
/*
 * Re-point @task at @clnt, releasing its previous client first.  The
 * explicit release matters when @clnt is NULL, since
 * rpc_task_set_client() only releases for a non-NULL client.
 */
void rpc_task_reset_client(struct rpc_task *task, struct rpc_clnt *clnt)
{
	rpc_task_release_client(task);
	rpc_task_set_client(task, clnt);
}
EXPORT_SYMBOL_GPL(rpc_task_reset_client);
787 
788 
789 static void
790 rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
791 {
792 	if (msg != NULL) {
793 		task->tk_msg.rpc_proc = msg->rpc_proc;
794 		task->tk_msg.rpc_argp = msg->rpc_argp;
795 		task->tk_msg.rpc_resp = msg->rpc_resp;
796 		if (msg->rpc_cred != NULL)
797 			task->tk_msg.rpc_cred = get_rpccred(msg->rpc_cred);
798 	}
799 }
800 
801 /*
802  * Default callback for async RPC calls
803  */
static void
rpc_default_callback(struct rpc_task *task, void *data)
{
	/* Intentionally empty: for callers needing no completion hook. */
}
808 
/* Minimal call ops used when the caller supplies none of its own. */
static const struct rpc_call_ops rpc_default_ops = {
	.rpc_call_done = rpc_default_callback,
};
812 
813 /**
814  * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
815  * @task_setup_data: pointer to task initialisation data
816  */
struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
{
	struct rpc_task *task;

	task = rpc_new_task(task_setup_data);
	if (IS_ERR(task))
		goto out;

	rpc_task_set_client(task, task_setup_data->rpc_client);
	rpc_task_set_rpc_message(task, task_setup_data->rpc_message);

	/* Callers that did not pre-set an action start at call_start. */
	if (task->tk_action == NULL)
		rpc_call_start(task);

	/* Extra reference for the caller; dropped via rpc_put_task(). */
	atomic_inc(&task->tk_count);
	rpc_execute(task);
out:
	return task;
}
EXPORT_SYMBOL_GPL(rpc_run_task);
837 
838 /**
839  * rpc_call_sync - Perform a synchronous RPC call
840  * @clnt: pointer to RPC client
841  * @msg: RPC call parameters
842  * @flags: RPC call flags
843  */
844 int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
845 {
846 	struct rpc_task	*task;
847 	struct rpc_task_setup task_setup_data = {
848 		.rpc_client = clnt,
849 		.rpc_message = msg,
850 		.callback_ops = &rpc_default_ops,
851 		.flags = flags,
852 	};
853 	int status;
854 
855 	WARN_ON_ONCE(flags & RPC_TASK_ASYNC);
856 	if (flags & RPC_TASK_ASYNC) {
857 		rpc_release_calldata(task_setup_data.callback_ops,
858 			task_setup_data.callback_data);
859 		return -EINVAL;
860 	}
861 
862 	task = rpc_run_task(&task_setup_data);
863 	if (IS_ERR(task))
864 		return PTR_ERR(task);
865 	status = task->tk_status;
866 	rpc_put_task(task);
867 	return status;
868 }
869 EXPORT_SYMBOL_GPL(rpc_call_sync);
870 
871 /**
872  * rpc_call_async - Perform an asynchronous RPC call
873  * @clnt: pointer to RPC client
874  * @msg: RPC call parameters
875  * @flags: RPC call flags
876  * @tk_ops: RPC call ops
877  * @data: user call data
878  */
879 int
880 rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
881 	       const struct rpc_call_ops *tk_ops, void *data)
882 {
883 	struct rpc_task	*task;
884 	struct rpc_task_setup task_setup_data = {
885 		.rpc_client = clnt,
886 		.rpc_message = msg,
887 		.callback_ops = tk_ops,
888 		.callback_data = data,
889 		.flags = flags|RPC_TASK_ASYNC,
890 	};
891 
892 	task = rpc_run_task(&task_setup_data);
893 	if (IS_ERR(task))
894 		return PTR_ERR(task);
895 	rpc_put_task(task);
896 	return 0;
897 }
898 EXPORT_SYMBOL_GPL(rpc_call_async);
899 
900 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
901 /**
902  * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
903  * rpc_execute against it
904  * @req: RPC request
905  * @tk_ops: RPC call ops
906  */
struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req,
				const struct rpc_call_ops *tk_ops)
{
	struct rpc_task *task;
	struct xdr_buf *xbufp = &req->rq_snd_buf;
	struct rpc_task_setup task_setup_data = {
		.callback_ops = tk_ops,
	};

	dprintk("RPC: rpc_run_bc_task req= %p\n", req);
	/*
	 * Create an rpc_task to send the data
	 */
	task = rpc_new_task(&task_setup_data);
	if (IS_ERR(task)) {
		/* The preallocated backchannel request is not reusable. */
		xprt_free_bc_request(req);
		goto out;
	}
	task->tk_rqstp = req;

	/*
	 * Set up the xdr_buf length.
	 * This also indicates that the buffer is XDR encoded already.
	 */
	xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
			xbufp->tail[0].iov_len;

	/* Skip encode/bind/connect: go straight to backchannel transmit. */
	task->tk_action = call_bc_transmit;
	/* Extra reference for the caller, as in rpc_run_task(). */
	atomic_inc(&task->tk_count);
	WARN_ON_ONCE(atomic_read(&task->tk_count) != 2);
	rpc_execute(task);

out:
	dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
	return task;
}
943 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
944 
/* Prime @task so that rpc_execute() enters the FSM at call_start. */
void
rpc_call_start(struct rpc_task *task)
{
	task->tk_action = call_start;
}
EXPORT_SYMBOL_GPL(rpc_call_start);
951 
952 /**
953  * rpc_peeraddr - extract remote peer address from clnt's xprt
954  * @clnt: RPC client structure
955  * @buf: target buffer
956  * @bufsize: length of target buffer
957  *
958  * Returns the number of bytes that are actually in the stored address.
959  */
960 size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
961 {
962 	size_t bytes;
963 	struct rpc_xprt *xprt;
964 
965 	rcu_read_lock();
966 	xprt = rcu_dereference(clnt->cl_xprt);
967 
968 	bytes = xprt->addrlen;
969 	if (bytes > bufsize)
970 		bytes = bufsize;
971 	memcpy(buf, &xprt->addr, bytes);
972 	rcu_read_unlock();
973 
974 	return bytes;
975 }
976 EXPORT_SYMBOL_GPL(rpc_peeraddr);
977 
978 /**
979  * rpc_peeraddr2str - return remote peer address in printable format
980  * @clnt: RPC client structure
981  * @format: address format
982  *
983  * NB: the lifetime of the memory referenced by the returned pointer is
984  * the same as the rpc_xprt itself.  As long as the caller uses this
985  * pointer, it must hold the RCU read lock.
986  */
987 const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
988 			     enum rpc_display_format_t format)
989 {
990 	struct rpc_xprt *xprt;
991 
992 	xprt = rcu_dereference(clnt->cl_xprt);
993 
994 	if (xprt->address_strings[format] != NULL)
995 		return xprt->address_strings[format];
996 	else
997 		return "unprintable";
998 }
999 EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
1000 
/*
 * NOTE(review): despite the "loopback" names, both constants hold the
 * wildcard (ANY) address; rpc_sockname() binds to them and
 * rpc_anyaddr() copies them out — confirm whether loopback addresses
 * were actually intended here.
 */
static const struct sockaddr_in rpc_inaddr_loopback = {
	.sin_family		= AF_INET,
	.sin_addr.s_addr	= htonl(INADDR_ANY),
};

static const struct sockaddr_in6 rpc_in6addr_loopback = {
	.sin6_family		= AF_INET6,
	.sin6_addr		= IN6ADDR_ANY_INIT,
};
1010 
1011 /*
1012  * Try a getsockname() on a connected datagram socket.  Using a
1013  * connected datagram socket prevents leaving a socket in TIME_WAIT.
1014  * This conserves the ephemeral port number space.
1015  *
1016  * Returns zero and fills in "buf" if successful; otherwise, a
1017  * negative errno is returned.
1018  */
1019 static int rpc_sockname(struct net *net, struct sockaddr *sap, size_t salen,
1020 			struct sockaddr *buf, int buflen)
1021 {
1022 	struct socket *sock;
1023 	int err;
1024 
1025 	err = __sock_create(net, sap->sa_family,
1026 				SOCK_DGRAM, IPPROTO_UDP, &sock, 1);
1027 	if (err < 0) {
1028 		dprintk("RPC:       can't create UDP socket (%d)\n", err);
1029 		goto out;
1030 	}
1031 
1032 	switch (sap->sa_family) {
1033 	case AF_INET:
1034 		err = kernel_bind(sock,
1035 				(struct sockaddr *)&rpc_inaddr_loopback,
1036 				sizeof(rpc_inaddr_loopback));
1037 		break;
1038 	case AF_INET6:
1039 		err = kernel_bind(sock,
1040 				(struct sockaddr *)&rpc_in6addr_loopback,
1041 				sizeof(rpc_in6addr_loopback));
1042 		break;
1043 	default:
1044 		err = -EAFNOSUPPORT;
1045 		goto out;
1046 	}
1047 	if (err < 0) {
1048 		dprintk("RPC:       can't bind UDP socket (%d)\n", err);
1049 		goto out_release;
1050 	}
1051 
1052 	err = kernel_connect(sock, sap, salen, 0);
1053 	if (err < 0) {
1054 		dprintk("RPC:       can't connect UDP socket (%d)\n", err);
1055 		goto out_release;
1056 	}
1057 
1058 	err = kernel_getsockname(sock, buf, &buflen);
1059 	if (err < 0) {
1060 		dprintk("RPC:       getsockname failed (%d)\n", err);
1061 		goto out_release;
1062 	}
1063 
1064 	err = 0;
1065 	if (buf->sa_family == AF_INET6) {
1066 		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)buf;
1067 		sin6->sin6_scope_id = 0;
1068 	}
1069 	dprintk("RPC:       %s succeeded\n", __func__);
1070 
1071 out_release:
1072 	sock_release(sock);
1073 out:
1074 	return err;
1075 }
1076 
1077 /*
1078  * Scraping a connected socket failed, so we don't have a useable
1079  * local address.  Fallback: generate an address that will prevent
1080  * the server from calling us back.
1081  *
1082  * Returns zero and fills in "buf" if successful; otherwise, a
1083  * negative errno is returned.
1084  */
1085 static int rpc_anyaddr(int family, struct sockaddr *buf, size_t buflen)
1086 {
1087 	switch (family) {
1088 	case AF_INET:
1089 		if (buflen < sizeof(rpc_inaddr_loopback))
1090 			return -EINVAL;
1091 		memcpy(buf, &rpc_inaddr_loopback,
1092 				sizeof(rpc_inaddr_loopback));
1093 		break;
1094 	case AF_INET6:
1095 		if (buflen < sizeof(rpc_in6addr_loopback))
1096 			return -EINVAL;
1097 		memcpy(buf, &rpc_in6addr_loopback,
1098 				sizeof(rpc_in6addr_loopback));
1099 	default:
1100 		dprintk("RPC:       %s: address family not supported\n",
1101 			__func__);
1102 		return -EAFNOSUPPORT;
1103 	}
1104 	dprintk("RPC:       %s: succeeded\n", __func__);
1105 	return 0;
1106 }
1107 
1108 /**
1109  * rpc_localaddr - discover local endpoint address for an RPC client
1110  * @clnt: RPC client structure
1111  * @buf: target buffer
1112  * @buflen: size of target buffer, in bytes
1113  *
1114  * Returns zero and fills in "buf" and "buflen" if successful;
1115  * otherwise, a negative errno is returned.
1116  *
1117  * This works even if the underlying transport is not currently connected,
1118  * or if the upper layer never previously provided a source address.
1119  *
1120  * The result of this function call is transient: multiple calls in
1121  * succession may give different results, depending on how local
1122  * networking configuration changes over time.
1123  */
int rpc_localaddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t buflen)
{
	struct sockaddr_storage address;
	struct sockaddr *sap = (struct sockaddr *)&address;
	struct rpc_xprt *xprt;
	struct net *net;
	size_t salen;
	int err;

	/* Snapshot the server address and pin the netns while under RCU;
	 * the xprt pointer itself must not be used after rcu_read_unlock(). */
	rcu_read_lock();
	xprt = rcu_dereference(clnt->cl_xprt);
	salen = xprt->addrlen;
	memcpy(sap, &xprt->addr, salen);
	net = get_net(xprt->xprt_net);
	rcu_read_unlock();

	/* Zero the port: we only want the host part for the probe connect. */
	rpc_set_port(sap, 0);
	err = rpc_sockname(net, sap, salen, buf, buflen);
	put_net(net);	/* balances get_net() above */
	if (err != 0)
		/* Couldn't discover local address, return ANYADDR */
		return rpc_anyaddr(sap->sa_family, buf, buflen);
	return 0;
}
EXPORT_SYMBOL_GPL(rpc_localaddr);
1149 
1150 void
1151 rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
1152 {
1153 	struct rpc_xprt *xprt;
1154 
1155 	rcu_read_lock();
1156 	xprt = rcu_dereference(clnt->cl_xprt);
1157 	if (xprt->ops->set_buffer_size)
1158 		xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
1159 	rcu_read_unlock();
1160 }
1161 EXPORT_SYMBOL_GPL(rpc_setbufsize);
1162 
1163 /**
1164  * rpc_protocol - Get transport protocol number for an RPC client
1165  * @clnt: RPC client to query
1166  *
1167  */
1168 int rpc_protocol(struct rpc_clnt *clnt)
1169 {
1170 	int protocol;
1171 
1172 	rcu_read_lock();
1173 	protocol = rcu_dereference(clnt->cl_xprt)->prot;
1174 	rcu_read_unlock();
1175 	return protocol;
1176 }
1177 EXPORT_SYMBOL_GPL(rpc_protocol);
1178 
1179 /**
1180  * rpc_net_ns - Get the network namespace for this RPC client
1181  * @clnt: RPC client to query
1182  *
1183  */
1184 struct net *rpc_net_ns(struct rpc_clnt *clnt)
1185 {
1186 	struct net *ret;
1187 
1188 	rcu_read_lock();
1189 	ret = rcu_dereference(clnt->cl_xprt)->xprt_net;
1190 	rcu_read_unlock();
1191 	return ret;
1192 }
1193 EXPORT_SYMBOL_GPL(rpc_net_ns);
1194 
1195 /**
1196  * rpc_max_payload - Get maximum payload size for a transport, in bytes
1197  * @clnt: RPC client to query
1198  *
1199  * For stream transports, this is one RPC record fragment (see RFC
1200  * 1831), as we don't support multi-record requests yet.  For datagram
1201  * transports, this is the size of an IP packet minus the IP, UDP, and
1202  * RPC header sizes.
1203  */
1204 size_t rpc_max_payload(struct rpc_clnt *clnt)
1205 {
1206 	size_t ret;
1207 
1208 	rcu_read_lock();
1209 	ret = rcu_dereference(clnt->cl_xprt)->max_payload;
1210 	rcu_read_unlock();
1211 	return ret;
1212 }
1213 EXPORT_SYMBOL_GPL(rpc_max_payload);
1214 
1215 /**
1216  * rpc_get_timeout - Get timeout for transport in units of HZ
1217  * @clnt: RPC client to query
1218  */
1219 unsigned long rpc_get_timeout(struct rpc_clnt *clnt)
1220 {
1221 	unsigned long ret;
1222 
1223 	rcu_read_lock();
1224 	ret = rcu_dereference(clnt->cl_xprt)->timeout->to_initval;
1225 	rcu_read_unlock();
1226 	return ret;
1227 }
1228 EXPORT_SYMBOL_GPL(rpc_get_timeout);
1229 
1230 /**
1231  * rpc_force_rebind - force transport to check that remote port is unchanged
1232  * @clnt: client to rebind
1233  *
1234  */
1235 void rpc_force_rebind(struct rpc_clnt *clnt)
1236 {
1237 	if (clnt->cl_autobind) {
1238 		rcu_read_lock();
1239 		xprt_clear_bound(rcu_dereference(clnt->cl_xprt));
1240 		rcu_read_unlock();
1241 	}
1242 }
1243 EXPORT_SYMBOL_GPL(rpc_force_rebind);
1244 
1245 /*
1246  * Restart an (async) RPC call from the call_prepare state.
1247  * Usually called from within the exit handler.
1248  */
1249 int
1250 rpc_restart_call_prepare(struct rpc_task *task)
1251 {
1252 	if (RPC_ASSASSINATED(task))
1253 		return 0;
1254 	task->tk_action = call_start;
1255 	if (task->tk_ops->rpc_call_prepare != NULL)
1256 		task->tk_action = rpc_prepare_task;
1257 	return 1;
1258 }
1259 EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
1260 
1261 /*
1262  * Restart an (async) RPC call. Usually called from within the
1263  * exit handler.
1264  */
int
rpc_restart_call(struct rpc_task *task)
{
	/* An assassinated task must not be restarted. */
	if (RPC_ASSASSINATED(task))
		return 0;
	/* Re-enter the FSM at its initial state. */
	task->tk_action = call_start;
	return 1;
}
EXPORT_SYMBOL_GPL(rpc_restart_call);
1274 
1275 #ifdef RPC_DEBUG
1276 static const char *rpc_proc_name(const struct rpc_task *task)
1277 {
1278 	const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1279 
1280 	if (proc) {
1281 		if (proc->p_name)
1282 			return proc->p_name;
1283 		else
1284 			return "NULL";
1285 	} else
1286 		return "no proc";
1287 }
1288 #endif
1289 
1290 /*
1291  * 0.  Initial state
1292  *
1293  *     Other FSM states can be visited zero or more times, but
1294  *     this state is visited exactly once for each RPC.
1295  */
static void
call_start(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;

	dprintk("RPC: %5u call_start %s%d proc %s (%s)\n", task->tk_pid,
			clnt->cl_protname, clnt->cl_vers,
			rpc_proc_name(task),
			(RPC_IS_ASYNC(task) ? "async" : "sync"));

	/* Increment call count */
	task->tk_msg.rpc_proc->p_count++;
	clnt->cl_stats->rpccnt++;
	/* Next FSM state: reserve a transport slot. */
	task->tk_action = call_reserve;
}
1311 
1312 /*
1313  * 1.	Reserve an RPC call slot
1314  */
static void
call_reserve(struct rpc_task *task)
{
	dprint_status(task);

	/* Clear stale status; xprt_reserve() reports via tk_status,
	 * which call_reserveresult() then interprets. */
	task->tk_status  = 0;
	task->tk_action  = call_reserveresult;
	xprt_reserve(task);
}
1324 
1325 static void call_retry_reserve(struct rpc_task *task);
1326 
1327 /*
1328  * 1b.	Grok the result of xprt_reserve()
1329  */
static void
call_reserveresult(struct rpc_task *task)
{
	int status = task->tk_status;

	dprint_status(task);

	/*
	 * After a call to xprt_reserve(), we must have either
	 * a request slot or else an error status.
	 */
	task->tk_status = 0;
	if (status >= 0) {
		if (task->tk_rqstp) {
			/* Got a slot: proceed to credential refresh. */
			task->tk_action = call_refresh;
			return;
		}

		printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
				__func__, status);
		rpc_exit(task, -EIO);
		return;
	}

	/*
	 * Even though there was an error, we may have acquired
	 * a request slot somehow.  Make sure not to leak it.
	 */
	if (task->tk_rqstp) {
		printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
				__func__, status);
		xprt_release(task);
	}

	switch (status) {
	case -ENOMEM:
		rpc_delay(task, HZ >> 2);
		/* fall through: retry after the delay */
	case -EAGAIN:	/* woken up; retry */
		task->tk_action = call_retry_reserve;
		return;
	case -EIO:	/* probably a shutdown */
		break;
	default:
		printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
				__func__, status);
		break;
	}
	rpc_exit(task, status);
}
1379 
1380 /*
1381  * 1c.	Retry reserving an RPC call slot
1382  */
static void
call_retry_reserve(struct rpc_task *task)
{
	dprint_status(task);

	/* Like call_reserve(), but uses the retry path so a congested
	 * transport does not hand out new slots ahead of waiters. */
	task->tk_status  = 0;
	task->tk_action  = call_reserveresult;
	xprt_retry_reserve(task);
}
1392 
1393 /*
1394  * 2.	Bind and/or refresh the credentials
1395  */
static void
call_refresh(struct rpc_task *task)
{
	dprint_status(task);

	/* Kick off (possibly asynchronous) credential refresh;
	 * the outcome is handled in call_refreshresult(). */
	task->tk_action = call_refreshresult;
	task->tk_status = 0;
	task->tk_client->cl_stats->rpcauthrefresh++;
	rpcauth_refreshcred(task);
}
1406 
1407 /*
1408  * 2a.	Process the results of a credential refresh
1409  */
static void
call_refreshresult(struct rpc_task *task)
{
	int status = task->tk_status;

	dprint_status(task);

	task->tk_status = 0;
	/* Default: loop back to call_refresh to retry. */
	task->tk_action = call_refresh;
	switch (status) {
	case 0:
		if (rpcauth_uptodatecred(task))
			task->tk_action = call_allocate;
		return;
	case -ETIMEDOUT:
		rpc_delay(task, 3*HZ);
		/* fall through: treated like the retryable errors below */
	case -EKEYEXPIRED:
	case -EAGAIN:
		status = -EACCES;
		if (!task->tk_cred_retry)
			break;
		task->tk_cred_retry--;
		dprintk("RPC: %5u %s: retry refresh creds\n",
				task->tk_pid, __func__);
		return;
	}
	dprintk("RPC: %5u %s: refresh creds failed with error %d\n",
				task->tk_pid, __func__, status);
	rpc_exit(task, status);
}
1440 
1441 /*
1442  * 2b.	Allocate the buffer. For details, see sched.c:rpc_malloc.
1443  *	(Note: buffer memory is freed in xprt_release).
1444  */
static void
call_allocate(struct rpc_task *task)
{
	/* au_cslack: per-auth-flavor slack for credential/verifier space */
	unsigned int slack = task->tk_rqstp->rq_cred->cr_auth->au_cslack;
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct rpc_procinfo *proc = task->tk_msg.rpc_proc;

	dprint_status(task);

	task->tk_status = 0;
	task->tk_action = call_bind;

	/* Buffer already allocated (e.g. on a retransmit): nothing to do. */
	if (req->rq_buffer)
		return;

	if (proc->p_proc != 0) {
		/* Non-NULL procedures must declare their XDR sizes. */
		BUG_ON(proc->p_arglen == 0);
		if (proc->p_decode != NULL)
			BUG_ON(proc->p_replen == 0);
	}

	/*
	 * Calculate the size (in quads) of the RPC call
	 * and reply headers, and convert both values
	 * to byte sizes.
	 */
	req->rq_callsize = RPC_CALLHDRSIZE + (slack << 1) + proc->p_arglen;
	req->rq_callsize <<= 2;
	req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
	req->rq_rcvsize <<= 2;

	/* One buffer holds both send and receive areas, back to back. */
	req->rq_buffer = xprt->ops->buf_alloc(task,
					req->rq_callsize + req->rq_rcvsize);
	if (req->rq_buffer != NULL)
		return;

	dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);

	/* Allocation failed: back off and retry, unless a synchronous
	 * caller has a fatal signal pending. */
	if (RPC_IS_ASYNC(task) || !fatal_signal_pending(current)) {
		task->tk_action = call_allocate;
		rpc_delay(task, HZ>>4);
		return;
	}

	rpc_exit(task, -ERESTARTSYS);
}
1492 
1493 static inline int
1494 rpc_task_need_encode(struct rpc_task *task)
1495 {
1496 	return task->tk_rqstp->rq_snd_buf.len == 0;
1497 }
1498 
1499 static inline void
1500 rpc_task_force_reencode(struct rpc_task *task)
1501 {
1502 	task->tk_rqstp->rq_snd_buf.len = 0;
1503 	task->tk_rqstp->rq_bytes_sent = 0;
1504 }
1505 
1506 static inline void
1507 rpc_xdr_buf_init(struct xdr_buf *buf, void *start, size_t len)
1508 {
1509 	buf->head[0].iov_base = start;
1510 	buf->head[0].iov_len = len;
1511 	buf->tail[0].iov_len = 0;
1512 	buf->page_len = 0;
1513 	buf->flags = 0;
1514 	buf->len = 0;
1515 	buf->buflen = len;
1516 }
1517 
1518 /*
1519  * 3.	Encode arguments of an RPC call
1520  */
static void
rpc_xdr_encode(struct rpc_task *task)
{
	struct rpc_rqst	*req = task->tk_rqstp;
	kxdreproc_t	encode;
	__be32		*p;

	dprint_status(task);

	/* Send and receive areas share req->rq_buffer back to back
	 * (see call_allocate). */
	rpc_xdr_buf_init(&req->rq_snd_buf,
			 req->rq_buffer,
			 req->rq_callsize);
	rpc_xdr_buf_init(&req->rq_rcv_buf,
			 (char *)req->rq_buffer + req->rq_callsize,
			 req->rq_rcvsize);

	p = rpc_encode_header(task);
	if (p == NULL) {
		printk(KERN_INFO "RPC: couldn't encode RPC header, exit EIO\n");
		rpc_exit(task, -EIO);
		return;
	}

	/* Procedures with no arguments (e.g. NULL) have no encoder. */
	encode = task->tk_msg.rpc_proc->p_encode;
	if (encode == NULL)
		return;

	/* Let the auth flavor wrap (and possibly encrypt) the arguments. */
	task->tk_status = rpcauth_wrap_req(task, encode, req, p,
			task->tk_msg.rpc_argp);
}
1551 
1552 /*
1553  * 4.	Get the server port number if not yet set
1554  */
1555 static void
1556 call_bind(struct rpc_task *task)
1557 {
1558 	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1559 
1560 	dprint_status(task);
1561 
1562 	task->tk_action = call_connect;
1563 	if (!xprt_bound(xprt)) {
1564 		task->tk_action = call_bind_status;
1565 		task->tk_timeout = xprt->bind_timeout;
1566 		xprt->ops->rpcbind(task);
1567 	}
1568 }
1569 
1570 /*
1571  * 4a.	Sort out bind result
1572  */
static void
call_bind_status(struct rpc_task *task)
{
	int status = -EIO;

	/* Success: port discovered, move on to connecting. */
	if (task->tk_status >= 0) {
		dprint_status(task);
		task->tk_status = 0;
		task->tk_action = call_connect;
		return;
	}

	trace_rpc_bind_status(task);
	switch (task->tk_status) {
	case -ENOMEM:
		dprintk("RPC: %5u rpcbind out of memory\n", task->tk_pid);
		rpc_delay(task, HZ >> 2);
		goto retry_timeout;
	case -EACCES:
		dprintk("RPC: %5u remote rpcbind: RPC program/version "
				"unavailable\n", task->tk_pid);
		/* fail immediately if this is an RPC ping */
		if (task->tk_msg.rpc_proc->p_proc == 0) {
			status = -EOPNOTSUPP;
			break;
		}
		if (task->tk_rebind_retry == 0)
			break;
		task->tk_rebind_retry--;
		rpc_delay(task, 3*HZ);
		goto retry_timeout;
	case -ETIMEDOUT:
		dprintk("RPC: %5u rpcbind request timed out\n",
				task->tk_pid);
		goto retry_timeout;
	case -EPFNOSUPPORT:
		/* server doesn't support any rpcbind version we know of */
		dprintk("RPC: %5u unrecognized remote rpcbind service\n",
				task->tk_pid);
		break;
	case -EPROTONOSUPPORT:
		dprintk("RPC: %5u remote rpcbind version unavailable, retrying\n",
				task->tk_pid);
		task->tk_status = 0;
		task->tk_action = call_bind;
		return;
	case -ECONNREFUSED:		/* connection problems */
	case -ECONNRESET:
	case -ENOTCONN:
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -EPIPE:
		dprintk("RPC: %5u remote rpcbind unreachable: %d\n",
				task->tk_pid, task->tk_status);
		/* Soft-connect tasks fail fast; others back off and retry. */
		if (!RPC_IS_SOFTCONN(task)) {
			rpc_delay(task, 5*HZ);
			goto retry_timeout;
		}
		status = task->tk_status;
		break;
	default:
		dprintk("RPC: %5u unrecognized rpcbind error (%d)\n",
				task->tk_pid, -task->tk_status);
	}

	rpc_exit(task, status);
	return;

retry_timeout:
	/* call_timeout decides whether a major timeout has been reached. */
	task->tk_action = call_timeout;
}
1645 
1646 /*
1647  * 4b.	Connect to the RPC server
1648  */
static void
call_connect(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;

	dprintk("RPC: %5u call_connect xprt %p %s connected\n",
			task->tk_pid, xprt,
			(xprt_connected(xprt) ? "is" : "is not"));

	/* Already connected: skip straight to transmit. */
	task->tk_action = call_transmit;
	if (!xprt_connected(xprt)) {
		task->tk_action = call_connect_status;
		/* A prior error (set by rpcbind) short-circuits to the
		 * status handler without attempting a connect. */
		if (task->tk_status < 0)
			return;
		if (task->tk_flags & RPC_TASK_NOCONNECT) {
			rpc_exit(task, -ENOTCONN);
			return;
		}
		xprt_connect(task);
	}
}
1670 
1671 /*
1672  * 4c.	Sort out connect result
1673  */
static void
call_connect_status(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	int status = task->tk_status;

	dprint_status(task);

	trace_rpc_connect_status(task, status);
	switch (status) {
		/* if soft mounted, test if we've timed out */
	case -ETIMEDOUT:
		task->tk_action = call_timeout;
		return;
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ENETUNREACH:
		/* Soft-connect tasks give up immediately (rpc_exit below). */
		if (RPC_IS_SOFTCONN(task))
			break;
		/* retry with existing socket, after a delay */
		/* fall through */
	case 0:
	case -EAGAIN:
		task->tk_status = 0;
		clnt->cl_stats->netreconn++;
		task->tk_action = call_transmit;
		return;
	}
	rpc_exit(task, status);
}
1703 
1704 /*
1705  * 5.	Transmit the RPC request, and wait for reply
1706  */
static void
call_transmit(struct rpc_task *task)
{
	dprint_status(task);

	task->tk_action = call_status;
	/* Carry forward any error from the connect phase. */
	if (task->tk_status < 0)
		return;
	/* Acquire the transport write lock; 0 means we hold it. */
	task->tk_status = xprt_prepare_transmit(task);
	if (task->tk_status != 0)
		return;
	task->tk_action = call_transmit_status;
	/* Encode here so that rpcsec_gss can use correct sequence number. */
	if (rpc_task_need_encode(task)) {
		rpc_xdr_encode(task);
		/* Did the encode result in an error condition? */
		if (task->tk_status != 0) {
			/* Was the error nonfatal? */
			if (task->tk_status == -EAGAIN)
				rpc_delay(task, HZ >> 4);
			else
				rpc_exit(task, task->tk_status);
			return;
		}
	}
	xprt_transmit(task);
	if (task->tk_status < 0)
		return;
	/*
	 * On success, ensure that we call xprt_end_transmit() before sleeping
	 * in order to allow access to the socket to other RPC requests.
	 */
	call_transmit_status(task);
	if (rpc_reply_expected(task))
		return;
	/* No reply expected (e.g. one-way call): finish up now. */
	task->tk_action = rpc_exit_task;
	rpc_wake_up_queued_task(&task->tk_rqstp->rq_xprt->pending, task);
}
1745 
1746 /*
1747  * 5a.	Handle cleanup after a transmission
1748  */
static void
call_transmit_status(struct rpc_task *task)
{
	task->tk_action = call_status;

	/*
	 * Common case: success.  Force the compiler to put this
	 * test first.
	 */
	if (task->tk_status == 0) {
		xprt_end_transmit(task);
		rpc_task_force_reencode(task);
		return;
	}

	/* NOTE: the case order below is deliberate; -EAGAIN and the
	 * connection-error cases keep the transport lock (no
	 * xprt_end_transmit), the default case releases it. */
	switch (task->tk_status) {
	case -EAGAIN:
		break;
	default:
		dprint_status(task);
		xprt_end_transmit(task);
		rpc_task_force_reencode(task);
		break;
		/*
		 * Special cases: if we've been waiting on the
		 * socket's write_space() callback, or if the
		 * socket just returned a connection error,
		 * then hold onto the transport lock.
		 */
	case -ECONNREFUSED:
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
		if (RPC_IS_SOFTCONN(task)) {
			xprt_end_transmit(task);
			rpc_exit(task, task->tk_status);
			break;
		}
		/* fall through for hard tasks */
	case -ECONNRESET:
	case -ENOTCONN:
	case -EPIPE:
		rpc_task_force_reencode(task);
	}
}
1793 
1794 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
1795 /*
1796  * 5b.	Send the backchannel RPC reply.  On error, drop the reply.  In
1797  * addition, disconnect on connectivity errors.
1798  */
static void
call_bc_transmit(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	task->tk_status = xprt_prepare_transmit(task);
	if (task->tk_status == -EAGAIN) {
		/*
		 * Could not reserve the transport. Try again after the
		 * transport is released.
		 */
		task->tk_status = 0;
		task->tk_action = call_bc_transmit;
		return;
	}

	/* Backchannel replies are fire-and-forget: no reply decode state. */
	task->tk_action = rpc_exit_task;
	if (task->tk_status < 0) {
		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
			"error: %d\n", task->tk_status);
		return;
	}

	xprt_transmit(task);
	xprt_end_transmit(task);
	dprint_status(task);
	switch (task->tk_status) {
	case 0:
		/* Success */
		break;
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -ETIMEDOUT:
		/*
		 * Problem reaching the server.  Disconnect and let the
		 * forechannel reestablish the connection.  The server will
		 * have to retransmit the backchannel request and we'll
		 * reprocess it.  Since these ops are idempotent, there's no
		 * need to cache our reply at this time.
		 */
		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
			"error: %d\n", task->tk_status);
		xprt_conditional_disconnect(req->rq_xprt,
			req->rq_connect_cookie);
		break;
	default:
		/*
		 * We were unable to reply and will have to drop the
		 * request.  The server should reconnect and retransmit.
		 */
		WARN_ON_ONCE(task->tk_status == -EAGAIN);
		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
			"error: %d\n", task->tk_status);
		break;
	}
	rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
}
1857 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
1858 
1859 /*
1860  * 6.	Sort out the RPC call status
1861  */
static void
call_status(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;
	struct rpc_rqst	*req = task->tk_rqstp;
	int		status;

	/* If a full reply arrived, let its byte count override tk_status. */
	if (req->rq_reply_bytes_recvd > 0 && !req->rq_bytes_sent)
		task->tk_status = req->rq_reply_bytes_recvd;

	dprint_status(task);

	status = task->tk_status;
	if (status >= 0) {
		task->tk_action = call_decode;
		return;
	}

	trace_rpc_call_status(task);
	task->tk_status = 0;
	switch(status) {
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
		/*
		 * Delay any retries for 3 seconds, then handle as if it
		 * were a timeout.
		 */
		rpc_delay(task, 3*HZ);
		/* fall through */
	case -ETIMEDOUT:
		task->tk_action = call_timeout;
		if (task->tk_client->cl_discrtry)
			xprt_conditional_disconnect(req->rq_xprt,
					req->rq_connect_cookie);
		break;
	case -ECONNRESET:
	case -ECONNREFUSED:
		rpc_force_rebind(clnt);
		rpc_delay(task, 3*HZ);
		/* fall through: re-bind and retry */
	case -EPIPE:
	case -ENOTCONN:
		task->tk_action = call_bind;
		break;
	case -EAGAIN:
		task->tk_action = call_transmit;
		break;
	case -EIO:
		/* shutdown or soft timeout */
		rpc_exit(task, status);
		break;
	default:
		if (clnt->cl_chatty)
			printk("%s: RPC call returned error %d\n",
			       clnt->cl_protname, -status);
		rpc_exit(task, status);
	}
}
1919 
1920 /*
1921  * 6a.	Handle RPC timeout
1922  * 	We do not release the request slot, so we keep using the
1923  *	same XID for all retransmits.
1924  */
static void
call_timeout(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;

	/* Minor timeout: the per-request timeout was adjusted; retry. */
	if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
		dprintk("RPC: %5u call_timeout (minor)\n", task->tk_pid);
		goto retry;
	}

	dprintk("RPC: %5u call_timeout (major)\n", task->tk_pid);
	task->tk_timeouts++;

	if (RPC_IS_SOFTCONN(task)) {
		rpc_exit(task, -ETIMEDOUT);
		return;
	}
	if (RPC_IS_SOFT(task)) {
		if (clnt->cl_chatty) {
			rcu_read_lock();
			printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
				clnt->cl_protname,
				rcu_dereference(clnt->cl_xprt)->servername);
			rcu_read_unlock();
		}
		if (task->tk_flags & RPC_TASK_TIMEOUT)
			rpc_exit(task, -ETIMEDOUT);
		else
			rpc_exit(task, -EIO);
		return;
	}

	/* Hard task: log the first major timeout, then keep retrying. */
	if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
		task->tk_flags |= RPC_CALL_MAJORSEEN;
		if (clnt->cl_chatty) {
			rcu_read_lock();
			printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
			clnt->cl_protname,
			rcu_dereference(clnt->cl_xprt)->servername);
			rcu_read_unlock();
		}
	}
	rpc_force_rebind(clnt);
	/*
	 * Did our request time out due to an RPCSEC_GSS out-of-sequence
	 * event? RFC2203 requires the server to drop all such requests.
	 */
	rpcauth_invalcred(task);

retry:
	clnt->cl_stats->rpcretrans++;
	task->tk_action = call_bind;
	task->tk_status = 0;
}
1979 
1980 /*
1981  * 7.	Decode the RPC reply
1982  */
static void
call_decode(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;
	struct rpc_rqst	*req = task->tk_rqstp;
	kxdrdproc_t	decode = task->tk_msg.rpc_proc->p_decode;
	__be32		*p;

	dprint_status(task);

	/* If we previously logged "not responding", log recovery. */
	if (task->tk_flags & RPC_CALL_MAJORSEEN) {
		if (clnt->cl_chatty) {
			rcu_read_lock();
			printk(KERN_NOTICE "%s: server %s OK\n",
				clnt->cl_protname,
				rcu_dereference(clnt->cl_xprt)->servername);
			rcu_read_unlock();
		}
		task->tk_flags &= ~RPC_CALL_MAJORSEEN;
	}

	/*
	 * Ensure that we see all writes made by xprt_complete_rqst()
	 * before it changed req->rq_reply_bytes_recvd.
	 */
	smp_rmb();
	req->rq_rcv_buf.len = req->rq_private_buf.len;

	/* Check that the softirq receive buffer is valid */
	WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
				sizeof(req->rq_rcv_buf)) != 0);

	/* 12 bytes = minimal RPC reply header (xid, mtype, reply_stat). */
	if (req->rq_rcv_buf.len < 12) {
		if (!RPC_IS_SOFT(task)) {
			task->tk_action = call_bind;
			clnt->cl_stats->rpcretrans++;
			goto out_retry;
		}
		dprintk("RPC:       %s: too small RPC reply size (%d bytes)\n",
				clnt->cl_protname, task->tk_status);
		task->tk_action = call_timeout;
		goto out_retry;
	}

	p = rpc_verify_header(task);
	if (IS_ERR(p)) {
		if (p == ERR_PTR(-EAGAIN))
			goto out_retry;
		return;
	}

	task->tk_action = rpc_exit_task;

	if (decode) {
		task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
						      task->tk_msg.rpc_resp);
	}
	dprintk("RPC: %5u call_decode result %d\n", task->tk_pid,
			task->tk_status);
	return;
out_retry:
	task->tk_status = 0;
	/* Note: rpc_verify_header() may have freed the RPC slot */
	if (task->tk_rqstp == req) {
		req->rq_reply_bytes_recvd = req->rq_rcv_buf.len = 0;
		if (task->tk_client->cl_discrtry)
			xprt_conditional_disconnect(req->rq_xprt,
					req->rq_connect_cookie);
	}
}
2053 
/*
 * Marshal the fixed RPC call header (RFC 1831 call body) into the send
 * buffer.  Returns a pointer just past the credential, where the
 * procedure arguments are to be encoded.
 */
static __be32 *
rpc_encode_header(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	struct rpc_rqst	*req = task->tk_rqstp;
	__be32		*p = req->rq_svec[0].iov_base;

	/* FIXME: check buffer size? */

	p = xprt_skip_transport_header(req->rq_xprt, p);
	*p++ = req->rq_xid;		/* XID */
	*p++ = htonl(RPC_CALL);		/* CALL */
	*p++ = htonl(RPC_VERSION);	/* RPC version */
	*p++ = htonl(clnt->cl_prog);	/* program number */
	*p++ = htonl(clnt->cl_vers);	/* program version */
	*p++ = htonl(task->tk_msg.rpc_proc->p_proc);	/* procedure */
	p = rpcauth_marshcred(task, p);
	req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p);
	return p;
}
2074 
/*
 * Sanity-check and parse the RPC reply header (RFC 1831 reply body).
 * On success returns a pointer to the start of the result data.
 * On failure returns ERR_PTR(-EAGAIN) to request a retry, or exits the
 * task via rpc_exit() and returns the corresponding ERR_PTR.
 */
static __be32 *
rpc_verify_header(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
	int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
	__be32	*p = iov->iov_base;
	u32 n;
	int error = -EACCES;

	if ((task->tk_rqstp->rq_rcv_buf.len & 3) != 0) {
		/* RFC-1014 says that the representation of XDR data must be a
		 * multiple of four bytes
		 * - if it isn't pointer subtraction in the NFS client may give
		 *   undefined results
		 */
		dprintk("RPC: %5u %s: XDR representation not a multiple of"
		       " 4 bytes: 0x%x\n", task->tk_pid, __func__,
		       task->tk_rqstp->rq_rcv_buf.len);
		goto out_eio;
	}
	/* Need at least 3 words: XID, message type, reply status. */
	if ((len -= 3) < 0)
		goto out_overflow;

	p += 1; /* skip XID */
	if ((n = ntohl(*p++)) != RPC_REPLY) {
		dprintk("RPC: %5u %s: not an RPC reply: %x\n",
			task->tk_pid, __func__, n);
		goto out_garbage;
	}

	/* MSG_DENIED: parse the rejection reason. */
	if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
		if (--len < 0)
			goto out_overflow;
		switch ((n = ntohl(*p++))) {
		case RPC_AUTH_ERROR:
			break;
		case RPC_MISMATCH:
			dprintk("RPC: %5u %s: RPC call version mismatch!\n",
				task->tk_pid, __func__);
			error = -EPROTONOSUPPORT;
			goto out_err;
		default:
			dprintk("RPC: %5u %s: RPC call rejected, "
				"unknown error: %x\n",
				task->tk_pid, __func__, n);
			goto out_eio;
		}
		if (--len < 0)
			goto out_overflow;
		/* Auth error sub-code: some are retryable with fresh creds. */
		switch ((n = ntohl(*p++))) {
		case RPC_AUTH_REJECTEDCRED:
		case RPC_AUTH_REJECTEDVERF:
		case RPCSEC_GSS_CREDPROBLEM:
		case RPCSEC_GSS_CTXPROBLEM:
			if (!task->tk_cred_retry)
				break;
			task->tk_cred_retry--;
			dprintk("RPC: %5u %s: retry stale creds\n",
					task->tk_pid, __func__);
			rpcauth_invalcred(task);
			/* Ensure we obtain a new XID! */
			xprt_release(task);
			task->tk_action = call_reserve;
			goto out_retry;
		case RPC_AUTH_BADCRED:
		case RPC_AUTH_BADVERF:
			/* possibly garbled cred/verf? */
			if (!task->tk_garb_retry)
				break;
			task->tk_garb_retry--;
			dprintk("RPC: %5u %s: retry garbled creds\n",
					task->tk_pid, __func__);
			task->tk_action = call_bind;
			goto out_retry;
		case RPC_AUTH_TOOWEAK:
			rcu_read_lock();
			printk(KERN_NOTICE "RPC: server %s requires stronger "
			       "authentication.\n",
			       rcu_dereference(clnt->cl_xprt)->servername);
			rcu_read_unlock();
			break;
		default:
			dprintk("RPC: %5u %s: unknown auth error: %x\n",
					task->tk_pid, __func__, n);
			error = -EIO;
		}
		dprintk("RPC: %5u %s: call rejected %d\n",
				task->tk_pid, __func__, n);
		goto out_err;
	}
	if (!(p = rpcauth_checkverf(task, p))) {
		dprintk("RPC: %5u %s: auth check failed\n",
				task->tk_pid, __func__);
		goto out_garbage;		/* bad verifier, retry */
	}
	len = p - (__be32 *)iov->iov_base - 1;
	if (len < 0)
		goto out_overflow;
	/* Accept status of the accepted reply. */
	switch ((n = ntohl(*p++))) {
	case RPC_SUCCESS:
		return p;
	case RPC_PROG_UNAVAIL:
		dprintk_rcu("RPC: %5u %s: program %u is unsupported "
				"by server %s\n", task->tk_pid, __func__,
				(unsigned int)clnt->cl_prog,
				rcu_dereference(clnt->cl_xprt)->servername);
		error = -EPFNOSUPPORT;
		goto out_err;
	case RPC_PROG_MISMATCH:
		dprintk_rcu("RPC: %5u %s: program %u, version %u unsupported "
				"by server %s\n", task->tk_pid, __func__,
				(unsigned int)clnt->cl_prog,
				(unsigned int)clnt->cl_vers,
				rcu_dereference(clnt->cl_xprt)->servername);
		error = -EPROTONOSUPPORT;
		goto out_err;
	case RPC_PROC_UNAVAIL:
		dprintk_rcu("RPC: %5u %s: proc %s unsupported by program %u, "
				"version %u on server %s\n",
				task->tk_pid, __func__,
				rpc_proc_name(task),
				clnt->cl_prog, clnt->cl_vers,
				rcu_dereference(clnt->cl_xprt)->servername);
		error = -EOPNOTSUPP;
		goto out_err;
	case RPC_GARBAGE_ARGS:
		dprintk("RPC: %5u %s: server saw garbage\n",
				task->tk_pid, __func__);
		break;			/* retry */
	default:
		dprintk("RPC: %5u %s: server accept status: %x\n",
				task->tk_pid, __func__, n);
		/* Also retry */
	}

out_garbage:
	clnt->cl_stats->rpcgarbage++;
	if (task->tk_garb_retry) {
		task->tk_garb_retry--;
		dprintk("RPC: %5u %s: retrying\n",
				task->tk_pid, __func__);
		task->tk_action = call_bind;
out_retry:
		return ERR_PTR(-EAGAIN);
	}
out_eio:
	error = -EIO;
out_err:
	rpc_exit(task, error);
	dprintk("RPC: %5u %s: call failed with error %d\n", task->tk_pid,
			__func__, error);
	return ERR_PTR(error);
out_overflow:
	dprintk("RPC: %5u %s: server reply was truncated.\n", task->tk_pid,
			__func__);
	goto out_garbage;
}
2233 
/*
 * Argument encoder for the NULL procedure: a NULL RPC carries no
 * arguments, so nothing is written to the XDR stream.
 */
static void rpcproc_encode_null(void *rqstp, struct xdr_stream *xdr, void *obj)
{
}
2237 
/*
 * Result decoder for the NULL procedure: the reply carries no result
 * payload, so there is nothing to read from the XDR stream.  Always
 * succeeds.
 */
static int rpcproc_decode_null(void *rqstp, struct xdr_stream *xdr, void *obj)
{
	return 0;
}
2242 
/*
 * Procedure descriptor for the NULL RPC, used below by rpc_ping() and
 * rpc_call_null() as a lightweight server probe.  Since NULL has no
 * argument or result payload, the encode/decode hooks are no-ops.
 */
static struct rpc_procinfo rpcproc_null = {
	.p_encode = rpcproc_encode_null,
	.p_decode = rpcproc_decode_null,
};
2247 
2248 static int rpc_ping(struct rpc_clnt *clnt)
2249 {
2250 	struct rpc_message msg = {
2251 		.rpc_proc = &rpcproc_null,
2252 	};
2253 	int err;
2254 	msg.rpc_cred = authnull_ops.lookup_cred(NULL, NULL, 0);
2255 	err = rpc_call_sync(clnt, &msg, RPC_TASK_SOFT | RPC_TASK_SOFTCONN);
2256 	put_rpccred(msg.rpc_cred);
2257 	return err;
2258 }
2259 
2260 struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
2261 {
2262 	struct rpc_message msg = {
2263 		.rpc_proc = &rpcproc_null,
2264 		.rpc_cred = cred,
2265 	};
2266 	struct rpc_task_setup task_setup_data = {
2267 		.rpc_client = clnt,
2268 		.rpc_message = &msg,
2269 		.callback_ops = &rpc_default_ops,
2270 		.flags = flags,
2271 	};
2272 	return rpc_run_task(&task_setup_data);
2273 }
2274 EXPORT_SYMBOL_GPL(rpc_call_null);
2275 
2276 #ifdef RPC_DEBUG
2277 static void rpc_show_header(void)
2278 {
2279 	printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
2280 		"-timeout ---ops--\n");
2281 }
2282 
2283 static void rpc_show_task(const struct rpc_clnt *clnt,
2284 			  const struct rpc_task *task)
2285 {
2286 	const char *rpc_waitq = "none";
2287 
2288 	if (RPC_IS_QUEUED(task))
2289 		rpc_waitq = rpc_qname(task->tk_waitqueue);
2290 
2291 	printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%ps q:%s\n",
2292 		task->tk_pid, task->tk_flags, task->tk_status,
2293 		clnt, task->tk_rqstp, task->tk_timeout, task->tk_ops,
2294 		clnt->cl_protname, clnt->cl_vers, rpc_proc_name(task),
2295 		task->tk_action, rpc_waitq);
2296 }
2297 
2298 void rpc_show_tasks(struct net *net)
2299 {
2300 	struct rpc_clnt *clnt;
2301 	struct rpc_task *task;
2302 	int header = 0;
2303 	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
2304 
2305 	spin_lock(&sn->rpc_client_lock);
2306 	list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
2307 		spin_lock(&clnt->cl_lock);
2308 		list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
2309 			if (!header) {
2310 				rpc_show_header();
2311 				header++;
2312 			}
2313 			rpc_show_task(clnt, task);
2314 		}
2315 		spin_unlock(&clnt->cl_lock);
2316 	}
2317 	spin_unlock(&sn->rpc_client_lock);
2318 }
2319 #endif
2320