xref: /openbmc/linux/net/sunrpc/clnt.c (revision cd354f1a)
1 /*
2  *  linux/net/sunrpc/clnt.c
3  *
4  *  This file contains the high-level RPC interface.
5  *  It is modeled as a finite state machine to support both synchronous
6  *  and asynchronous requests.
7  *
8  *  -	RPC header generation and argument serialization.
9  *  -	Credential refresh.
10  *  -	TCP connect handling.
11  *  -	Retry of operation when it is suspected the operation failed because
12  *	of uid squashing on the server, or when the credentials were stale
13  *	and need to be refreshed, or when a packet was damaged in transit.
14  *	This may be have to be moved to the VFS layer.
15  *
16  *  NB: BSD uses a more intelligent approach to guessing when a request
17  *  or reply has been lost by keeping the RTO estimate for each procedure.
18  *  We currently make do with a constant timeout value.
19  *
20  *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
21  *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
22  */
23 
24 #include <asm/system.h>
25 
26 #include <linux/module.h>
27 #include <linux/types.h>
28 #include <linux/mm.h>
29 #include <linux/slab.h>
30 #include <linux/smp_lock.h>
31 #include <linux/utsname.h>
32 #include <linux/workqueue.h>
33 
34 #include <linux/sunrpc/clnt.h>
35 #include <linux/sunrpc/rpc_pipe_fs.h>
36 #include <linux/sunrpc/metrics.h>
37 
38 
39 #define RPC_SLACK_SPACE		(1024)	/* total overkill */
40 
41 #ifdef RPC_DEBUG
42 # define RPCDBG_FACILITY	RPCDBG_CALL
43 #endif
44 
45 #define dprint_status(t)					\
46 	dprintk("RPC: %5u %s (status %d)\n", t->tk_pid,		\
47 			__FUNCTION__, t->tk_status)
48 
49 static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
50 
51 
52 static void	call_start(struct rpc_task *task);
53 static void	call_reserve(struct rpc_task *task);
54 static void	call_reserveresult(struct rpc_task *task);
55 static void	call_allocate(struct rpc_task *task);
56 static void	call_encode(struct rpc_task *task);
57 static void	call_decode(struct rpc_task *task);
58 static void	call_bind(struct rpc_task *task);
59 static void	call_bind_status(struct rpc_task *task);
60 static void	call_transmit(struct rpc_task *task);
61 static void	call_status(struct rpc_task *task);
62 static void	call_transmit_status(struct rpc_task *task);
63 static void	call_refresh(struct rpc_task *task);
64 static void	call_refreshresult(struct rpc_task *task);
65 static void	call_timeout(struct rpc_task *task);
66 static void	call_connect(struct rpc_task *task);
67 static void	call_connect_status(struct rpc_task *task);
68 static __be32 *	call_header(struct rpc_task *task);
69 static __be32 *	call_verify(struct rpc_task *task);
70 
71 
/*
 * Create this client's directory in the rpc_pipefs filesystem (used,
 * for example, for GSS auth upcall pipes).  A NULL @dir_name means the
 * program uses no pipefs directory; that is not an error.
 *
 * Returns 0 on success or a negative errno.  On failure the mount
 * reference taken here is dropped again before returning.
 */
static int
rpc_setup_pipedir(struct rpc_clnt *clnt, char *dir_name)
{
	/* Monotonically increasing id used to generate unique names. */
	static uint32_t clntid;
	int error;

	clnt->cl_vfsmnt = ERR_PTR(-ENOENT);
	clnt->cl_dentry = ERR_PTR(-ENOENT);
	if (dir_name == NULL)
		return 0;

	clnt->cl_vfsmnt = rpc_get_mount();
	if (IS_ERR(clnt->cl_vfsmnt))
		return PTR_ERR(clnt->cl_vfsmnt);

	/* Keep picking fresh ids until the directory name doesn't collide. */
	for (;;) {
		snprintf(clnt->cl_pathname, sizeof(clnt->cl_pathname),
				"%s/clnt%x", dir_name,
				(unsigned int)clntid++);
		clnt->cl_pathname[sizeof(clnt->cl_pathname) - 1] = '\0';
		clnt->cl_dentry = rpc_mkdir(clnt->cl_pathname, clnt);
		if (!IS_ERR(clnt->cl_dentry))
			return 0;
		error = PTR_ERR(clnt->cl_dentry);
		if (error != -EEXIST) {
			printk(KERN_INFO "RPC: Couldn't create pipefs entry %s, error %d\n",
					clnt->cl_pathname, error);
			rpc_put_mount();
			return error;
		}
	}
}
104 
105 static struct rpc_clnt * rpc_new_client(struct rpc_xprt *xprt, char *servname, struct rpc_program *program, u32 vers, rpc_authflavor_t flavor)
106 {
107 	struct rpc_version	*version;
108 	struct rpc_clnt		*clnt = NULL;
109 	struct rpc_auth		*auth;
110 	int err;
111 	int len;
112 
113 	dprintk("RPC:       creating %s client for %s (xprt %p)\n",
114 			program->name, servname, xprt);
115 
116 	err = -EINVAL;
117 	if (!xprt)
118 		goto out_no_xprt;
119 	if (vers >= program->nrvers || !(version = program->version[vers]))
120 		goto out_err;
121 
122 	err = -ENOMEM;
123 	clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
124 	if (!clnt)
125 		goto out_err;
126 	atomic_set(&clnt->cl_users, 0);
127 	atomic_set(&clnt->cl_count, 1);
128 	clnt->cl_parent = clnt;
129 
130 	clnt->cl_server = clnt->cl_inline_name;
131 	len = strlen(servname) + 1;
132 	if (len > sizeof(clnt->cl_inline_name)) {
133 		char *buf = kmalloc(len, GFP_KERNEL);
134 		if (buf != 0)
135 			clnt->cl_server = buf;
136 		else
137 			len = sizeof(clnt->cl_inline_name);
138 	}
139 	strlcpy(clnt->cl_server, servname, len);
140 
141 	clnt->cl_xprt     = xprt;
142 	clnt->cl_procinfo = version->procs;
143 	clnt->cl_maxproc  = version->nrprocs;
144 	clnt->cl_protname = program->name;
145 	clnt->cl_prog     = program->number;
146 	clnt->cl_vers     = version->number;
147 	clnt->cl_stats    = program->stats;
148 	clnt->cl_metrics  = rpc_alloc_iostats(clnt);
149 	err = -ENOMEM;
150 	if (clnt->cl_metrics == NULL)
151 		goto out_no_stats;
152 	clnt->cl_program  = program;
153 
154 	if (!xprt_bound(clnt->cl_xprt))
155 		clnt->cl_autobind = 1;
156 
157 	clnt->cl_rtt = &clnt->cl_rtt_default;
158 	rpc_init_rtt(&clnt->cl_rtt_default, xprt->timeout.to_initval);
159 
160 	err = rpc_setup_pipedir(clnt, program->pipe_dir_name);
161 	if (err < 0)
162 		goto out_no_path;
163 
164 	auth = rpcauth_create(flavor, clnt);
165 	if (IS_ERR(auth)) {
166 		printk(KERN_INFO "RPC: Couldn't create auth handle (flavor %u)\n",
167 				flavor);
168 		err = PTR_ERR(auth);
169 		goto out_no_auth;
170 	}
171 
172 	/* save the nodename */
173 	clnt->cl_nodelen = strlen(utsname()->nodename);
174 	if (clnt->cl_nodelen > UNX_MAXNODENAME)
175 		clnt->cl_nodelen = UNX_MAXNODENAME;
176 	memcpy(clnt->cl_nodename, utsname()->nodename, clnt->cl_nodelen);
177 	return clnt;
178 
179 out_no_auth:
180 	if (!IS_ERR(clnt->cl_dentry)) {
181 		rpc_rmdir(clnt->cl_dentry);
182 		rpc_put_mount();
183 	}
184 out_no_path:
185 	rpc_free_iostats(clnt->cl_metrics);
186 out_no_stats:
187 	if (clnt->cl_server != clnt->cl_inline_name)
188 		kfree(clnt->cl_server);
189 	kfree(clnt);
190 out_err:
191 	xprt_put(xprt);
192 out_no_xprt:
193 	return ERR_PTR(err);
194 }
195 
 196 /**
 197  * rpc_create - create an RPC client and transport with one call
 198  * @args: rpc_clnt create argument structure
199  *
200  * Creates and initializes an RPC transport and an RPC client.
201  *
202  * It can ping the server in order to determine if it is up, and to see if
203  * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
204  * this behavior so asynchronous tasks can also use rpc_create.
205  */
206 struct rpc_clnt *rpc_create(struct rpc_create_args *args)
207 {
208 	struct rpc_xprt *xprt;
209 	struct rpc_clnt *clnt;
210 
211 	xprt = xprt_create_transport(args->protocol, args->address,
212 					args->addrsize, args->timeout);
213 	if (IS_ERR(xprt))
214 		return (struct rpc_clnt *)xprt;
215 
216 	/*
217 	 * By default, kernel RPC client connects from a reserved port.
218 	 * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
219 	 * but it is always enabled for rpciod, which handles the connect
220 	 * operation.
221 	 */
222 	xprt->resvport = 1;
223 	if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
224 		xprt->resvport = 0;
225 
226 	dprintk("RPC:       creating %s client for %s (xprt %p)\n",
227 			args->program->name, args->servername, xprt);
228 
229 	clnt = rpc_new_client(xprt, args->servername, args->program,
230 				args->version, args->authflavor);
231 	if (IS_ERR(clnt))
232 		return clnt;
233 
234 	if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
235 		int err = rpc_ping(clnt, RPC_TASK_SOFT|RPC_TASK_NOINTR);
236 		if (err != 0) {
237 			rpc_shutdown_client(clnt);
238 			return ERR_PTR(err);
239 		}
240 	}
241 
242 	clnt->cl_softrtry = 1;
243 	if (args->flags & RPC_CLNT_CREATE_HARDRTRY)
244 		clnt->cl_softrtry = 0;
245 
246 	if (args->flags & RPC_CLNT_CREATE_INTR)
247 		clnt->cl_intr = 1;
248 	if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
249 		clnt->cl_autobind = 1;
250 	if (args->flags & RPC_CLNT_CREATE_ONESHOT)
251 		clnt->cl_oneshot = 1;
252 	if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
253 		clnt->cl_discrtry = 1;
254 
255 	return clnt;
256 }
257 EXPORT_SYMBOL_GPL(rpc_create);
258 
259 /*
260  * This function clones the RPC client structure. It allows us to share the
261  * same transport while varying parameters such as the authentication
262  * flavour.
263  */
struct rpc_clnt *
rpc_clone_client(struct rpc_clnt *clnt)
{
	struct rpc_clnt *new;
	int err = -ENOMEM;

	/* Start from a byte copy of the parent, then fix up ownership. */
	new = kmemdup(clnt, sizeof(*new), GFP_KERNEL);
	if (!new)
		goto out_no_clnt;
	/* The clone gets its own refcounts... */
	atomic_set(&new->cl_count, 1);
	atomic_set(&new->cl_users, 0);
	/* ...its own iostats and pipefs directory... */
	new->cl_metrics = rpc_alloc_iostats(clnt);
	if (new->cl_metrics == NULL)
		goto out_no_stats;
	err = rpc_setup_pipedir(new, clnt->cl_program->pipe_dir_name);
	if (err != 0)
		goto out_no_path;
	/* ...and pins the parent and the shared transport. */
	new->cl_parent = clnt;
	atomic_inc(&clnt->cl_count);
	new->cl_xprt = xprt_get(clnt->cl_xprt);
	/* Turn off autobind on clones */
	new->cl_autobind = 0;
	new->cl_oneshot = 0;
	new->cl_dead = 0;
	rpc_init_rtt(&new->cl_rtt_default, clnt->cl_xprt->timeout.to_initval);
	/* The auth handle was copied by kmemdup; take a reference on it. */
	if (new->cl_auth)
		atomic_inc(&new->cl_auth->au_count);
	return new;
out_no_path:
	rpc_free_iostats(new->cl_metrics);
out_no_stats:
	kfree(new);
out_no_clnt:
	dprintk("RPC:       %s: returned error %d\n", __FUNCTION__, err);
	return ERR_PTR(err);
}
300 
301 /*
302  * Properly shut down an RPC client, terminating all outstanding
303  * requests. Note that we must be certain that cl_oneshot and
304  * cl_dead are cleared, or else the client would be destroyed
305  * when the last task releases it.
306  */
int
rpc_shutdown_client(struct rpc_clnt *clnt)
{
	dprintk("RPC:       shutting down %s client for %s, tasks=%d\n",
			clnt->cl_protname, clnt->cl_server,
			atomic_read(&clnt->cl_users));

	/* Kill outstanding tasks and wait (poll once a second) for the
	 * user count to drop to zero. */
	while (atomic_read(&clnt->cl_users) > 0) {
		/* Don't let rpc_release_client destroy us */
		clnt->cl_oneshot = 0;
		clnt->cl_dead = 0;
		rpc_killall_tasks(clnt);
		wait_event_timeout(destroy_wait,
			!atomic_read(&clnt->cl_users), 1*HZ);
	}

	/* A negative user count indicates a refcounting bug somewhere. */
	if (atomic_read(&clnt->cl_users) < 0) {
		printk(KERN_ERR "RPC: rpc_shutdown_client clnt %p tasks=%d\n",
				clnt, atomic_read(&clnt->cl_users));
#ifdef RPC_DEBUG
		rpc_show_tasks();
#endif
		BUG();
	}

	return rpc_destroy_client(clnt);
}
334 
335 /*
336  * Delete an RPC client
337  */
int
rpc_destroy_client(struct rpc_clnt *clnt)
{
	/* Drop one reference; only the last one actually frees. */
	if (!atomic_dec_and_test(&clnt->cl_count))
		return 1;
	BUG_ON(atomic_read(&clnt->cl_users) != 0);

	dprintk("RPC:       destroying %s client for %s\n",
			clnt->cl_protname, clnt->cl_server);
	if (clnt->cl_auth) {
		rpcauth_destroy(clnt->cl_auth);
		clnt->cl_auth = NULL;
	}
	if (!IS_ERR(clnt->cl_dentry)) {
		rpc_rmdir(clnt->cl_dentry);
		rpc_put_mount();
	}
	/* A clone drops its parent's reference instead of freeing the
	 * server name, which is shared with (owned by) the parent. */
	if (clnt->cl_parent != clnt) {
		rpc_destroy_client(clnt->cl_parent);
		goto out_free;
	}
	if (clnt->cl_server != clnt->cl_inline_name)
		kfree(clnt->cl_server);
out_free:
	rpc_free_iostats(clnt->cl_metrics);
	clnt->cl_metrics = NULL;
	xprt_put(clnt->cl_xprt);
	kfree(clnt);
	return 0;
}
368 
369 /*
370  * Release an RPC client
371  */
372 void
373 rpc_release_client(struct rpc_clnt *clnt)
374 {
375 	dprintk("RPC:       rpc_release_client(%p, %d)\n",
376 			clnt, atomic_read(&clnt->cl_users));
377 
378 	if (!atomic_dec_and_test(&clnt->cl_users))
379 		return;
380 	wake_up(&destroy_wait);
381 	if (clnt->cl_oneshot || clnt->cl_dead)
382 		rpc_destroy_client(clnt);
383 }
384 
385 /**
386  * rpc_bind_new_program - bind a new RPC program to an existing client
 387  * @old: old rpc_client
 388  * @program: rpc program to set
 389  * @vers: rpc program version
390  *
391  * Clones the rpc client and sets up a new RPC program. This is mainly
392  * of use for enabling different RPC programs to share the same transport.
393  * The Sun NFSv2/v3 ACL protocol can do this.
394  */
395 struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
396 				      struct rpc_program *program,
397 				      int vers)
398 {
399 	struct rpc_clnt *clnt;
400 	struct rpc_version *version;
401 	int err;
402 
403 	BUG_ON(vers >= program->nrvers || !program->version[vers]);
404 	version = program->version[vers];
405 	clnt = rpc_clone_client(old);
406 	if (IS_ERR(clnt))
407 		goto out;
408 	clnt->cl_procinfo = version->procs;
409 	clnt->cl_maxproc  = version->nrprocs;
410 	clnt->cl_protname = program->name;
411 	clnt->cl_prog     = program->number;
412 	clnt->cl_vers     = version->number;
413 	clnt->cl_stats    = program->stats;
414 	err = rpc_ping(clnt, RPC_TASK_SOFT|RPC_TASK_NOINTR);
415 	if (err != 0) {
416 		rpc_shutdown_client(clnt);
417 		clnt = ERR_PTR(err);
418 	}
419 out:
420 	return clnt;
421 }
422 
423 /*
424  * Default callback for async RPC calls
425  */
static void
rpc_default_callback(struct rpc_task *task, void *data)
{
	/* Intentionally empty: used when callers supply no callbacks. */
}

static const struct rpc_call_ops rpc_default_ops = {
	.rpc_call_done = rpc_default_callback,
};
434 
435 /*
436  *	Export the signal mask handling for synchronous code that
437  *	sleeps on RPC calls
438  */
439 #define RPC_INTR_SIGNALS (sigmask(SIGHUP) | sigmask(SIGINT) | sigmask(SIGQUIT) | sigmask(SIGTERM))
440 
441 static void rpc_save_sigmask(sigset_t *oldset, int intr)
442 {
443 	unsigned long	sigallow = sigmask(SIGKILL);
444 	sigset_t sigmask;
445 
446 	/* Block all signals except those listed in sigallow */
447 	if (intr)
448 		sigallow |= RPC_INTR_SIGNALS;
449 	siginitsetinv(&sigmask, sigallow);
450 	sigprocmask(SIG_BLOCK, &sigmask, oldset);
451 }
452 
/* Apply the standard RPC signal mask for @task, saving the old one. */
static inline void rpc_task_sigmask(struct rpc_task *task, sigset_t *oldset)
{
	rpc_save_sigmask(oldset, !RPC_TASK_UNINTERRUPTIBLE(task));
}
457 
/* Restore a signal mask previously saved by rpc_save_sigmask(). */
static inline void rpc_restore_sigmask(sigset_t *oldset)
{
	sigprocmask(SIG_SETMASK, oldset, NULL);
}
462 
/* Exported helper: block signals per @clnt's interruptibility setting. */
void rpc_clnt_sigmask(struct rpc_clnt *clnt, sigset_t *oldset)
{
	rpc_save_sigmask(oldset, clnt->cl_intr);
}
467 
/* Exported counterpart to rpc_clnt_sigmask(): restore the saved mask. */
void rpc_clnt_sigunmask(struct rpc_clnt *clnt, sigset_t *oldset)
{
	rpc_restore_sigmask(oldset);
}
472 
473 /*
474  * New rpc_call implementation
475  */
/*
 * Perform a synchronous RPC call described by @msg on @clnt.
 * Returns the task status (0 on success, negative errno on failure).
 */
int rpc_call_sync(struct rpc_clnt *clnt, struct rpc_message *msg, int flags)
{
	struct rpc_task	*task;
	sigset_t	oldset;
	int		status;

	/* If this client is slain all further I/O fails */
	if (clnt->cl_dead)
		return -EIO;

	BUG_ON(flags & RPC_TASK_ASYNC);

	task = rpc_new_task(clnt, flags, &rpc_default_ops, NULL);
	if (task == NULL)
		return -ENOMEM;

	/* Mask signals on RPC calls _and_ GSS_AUTH upcalls */
	rpc_task_sigmask(task, &oldset);

	/* Set up the call info struct and execute the task */
	rpc_call_setup(task, msg, 0);
	if (task->tk_status == 0) {
		/* Extra reference so the task outlives rpc_execute()'s
		 * own release and tk_status can still be read below. */
		atomic_inc(&task->tk_count);
		rpc_execute(task);
	}
	status = task->tk_status;
	rpc_put_task(task);
	rpc_restore_sigmask(&oldset);
	return status;
}
506 
507 /*
508  * New rpc_call implementation
509  */
/*
 * Launch an asynchronous RPC call described by @msg on @clnt; completion
 * is reported through @tk_ops with @data as the callback argument.
 * Returns 0 if the task was queued, or a negative errno.  On failure
 * the calldata is released via @tk_ops.
 */
int
rpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg, int flags,
	       const struct rpc_call_ops *tk_ops, void *data)
{
	struct rpc_task	*task;
	sigset_t	oldset;
	int		status;

	/* If this client is slain all further I/O fails */
	status = -EIO;
	if (clnt->cl_dead)
		goto out_release;

	flags |= RPC_TASK_ASYNC;

	/* Create/initialize a new RPC task */
	status = -ENOMEM;
	if (!(task = rpc_new_task(clnt, flags, tk_ops, data)))
		goto out_release;

	/* Mask signals on GSS_AUTH upcalls */
	rpc_task_sigmask(task, &oldset);

	rpc_call_setup(task, msg, 0);

	/* Set up the call info struct and execute the task */
	status = task->tk_status;
	if (status == 0)
		rpc_execute(task);
	else
		rpc_put_task(task);

	rpc_restore_sigmask(&oldset);
	return status;
out_release:
	/* Hand the calldata back to the caller's release callback. */
	rpc_release_calldata(tk_ops, data);
	return status;
}
548 
549 
550 void
551 rpc_call_setup(struct rpc_task *task, struct rpc_message *msg, int flags)
552 {
553 	task->tk_msg   = *msg;
554 	task->tk_flags |= flags;
555 	/* Bind the user cred */
556 	if (task->tk_msg.rpc_cred != NULL)
557 		rpcauth_holdcred(task);
558 	else
559 		rpcauth_bindcred(task);
560 
561 	if (task->tk_status == 0)
562 		task->tk_action = call_start;
563 	else
564 		task->tk_action = rpc_exit_task;
565 }
566 
567 /**
568  * rpc_peeraddr - extract remote peer address from clnt's xprt
569  * @clnt: RPC client structure
570  * @buf: target buffer
 571  * @bufsize: length of target buffer
572  *
573  * Returns the number of bytes that are actually in the stored address.
574  */
575 size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
576 {
577 	size_t bytes;
578 	struct rpc_xprt *xprt = clnt->cl_xprt;
579 
580 	bytes = sizeof(xprt->addr);
581 	if (bytes > bufsize)
582 		bytes = bufsize;
583 	memcpy(buf, &clnt->cl_xprt->addr, bytes);
584 	return xprt->addrlen;
585 }
586 EXPORT_SYMBOL_GPL(rpc_peeraddr);
587 
588 /**
589  * rpc_peeraddr2str - return remote peer address in printable format
590  * @clnt: RPC client structure
591  * @format: address format
592  *
593  */
594 char *rpc_peeraddr2str(struct rpc_clnt *clnt, enum rpc_display_format_t format)
595 {
596 	struct rpc_xprt *xprt = clnt->cl_xprt;
597 
598 	if (xprt->address_strings[format] != NULL)
599 		return xprt->address_strings[format];
600 	else
601 		return "unprintable";
602 }
603 EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
604 
605 void
606 rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
607 {
608 	struct rpc_xprt *xprt = clnt->cl_xprt;
609 	if (xprt->ops->set_buffer_size)
610 		xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
611 }
612 
613 /*
614  * Return size of largest payload RPC client can support, in bytes
615  *
616  * For stream transports, this is one RPC record fragment (see RFC
617  * 1831), as we don't support multi-record requests yet.  For datagram
618  * transports, this is the size of an IP packet minus the IP, UDP, and
619  * RPC header sizes.
620  */
size_t rpc_max_payload(struct rpc_clnt *clnt)
{
	/* The limit is a property of the underlying transport. */
	return clnt->cl_xprt->max_payload;
}
625 EXPORT_SYMBOL_GPL(rpc_max_payload);
626 
627 /**
628  * rpc_force_rebind - force transport to check that remote port is unchanged
629  * @clnt: client to rebind
630  *
631  */
void rpc_force_rebind(struct rpc_clnt *clnt)
{
	/* Only meaningful when this client autobinds its port. */
	if (clnt->cl_autobind)
		xprt_clear_bound(clnt->cl_xprt);
}
637 EXPORT_SYMBOL_GPL(rpc_force_rebind);
638 
639 /*
640  * Restart an (async) RPC call. Usually called from within the
641  * exit handler.
642  */
643 void
644 rpc_restart_call(struct rpc_task *task)
645 {
646 	if (RPC_ASSASSINATED(task))
647 		return;
648 
649 	task->tk_action = call_start;
650 }
651 
652 /*
653  * 0.  Initial state
654  *
655  *     Other FSM states can be visited zero or more times, but
656  *     this state is visited exactly once for each RPC.
657  */
static void
call_start(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;

	dprintk("RPC: %5u call_start %s%d proc %d (%s)\n", task->tk_pid,
			clnt->cl_protname, clnt->cl_vers,
			task->tk_msg.rpc_proc->p_proc,
			(RPC_IS_ASYNC(task) ? "async" : "sync"));

	/* Increment call count */
	task->tk_msg.rpc_proc->p_count++;
	clnt->cl_stats->rpccnt++;
	/* Next step: reserve a transport slot. */
	task->tk_action = call_reserve;
}
673 
674 /*
675  * 1.	Reserve an RPC call slot
676  */
static void
call_reserve(struct rpc_task *task)
{
	dprint_status(task);

	/* Stale credentials must be refreshed before we take a slot. */
	if (!rpcauth_uptodatecred(task)) {
		task->tk_action = call_refresh;
		return;
	}

	task->tk_status  = 0;
	task->tk_action  = call_reserveresult;
	xprt_reserve(task);
}
691 
692 /*
693  * 1b.	Grok the result of xprt_reserve()
694  */
static void
call_reserveresult(struct rpc_task *task)
{
	int status = task->tk_status;

	dprint_status(task);

	/*
	 * After a call to xprt_reserve(), we must have either
	 * a request slot or else an error status.
	 */
	task->tk_status = 0;
	if (status >= 0) {
		if (task->tk_rqstp) {
			task->tk_action = call_allocate;
			return;
		}

		/* Success status without a slot is a transport bug. */
		printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
				__FUNCTION__, status);
		rpc_exit(task, -EIO);
		return;
	}

	/*
	 * Even though there was an error, we may have acquired
	 * a request slot somehow.  Make sure not to leak it.
	 */
	if (task->tk_rqstp) {
		printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
				__FUNCTION__, status);
		xprt_release(task);
	}

	switch (status) {
	case -EAGAIN:	/* woken up; retry */
		task->tk_action = call_reserve;
		return;
	case -EIO:	/* probably a shutdown */
		break;
	default:
		printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
				__FUNCTION__, status);
		break;
	}
	rpc_exit(task, status);
}
742 
743 /*
744  * 2.	Allocate the buffer. For details, see sched.c:rpc_malloc.
745  *	(Note: buffer memory is freed in xprt_release).
746  */
static void
call_allocate(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = task->tk_xprt;
	unsigned int	bufsiz;

	dprint_status(task);

	task->tk_action = call_bind;
	/* A retransmitting request already owns a buffer. */
	if (req->rq_buffer)
		return;

	/* FIXME: compute buffer requirements more exactly using
	 * auth->au_wslack */
	bufsiz = task->tk_msg.rpc_proc->p_bufsiz + RPC_SLACK_SPACE;

	/* Double the size: one half for send, one half for receive
	 * (see the buffer split in call_encode). */
	if (xprt->ops->buf_alloc(task, bufsiz << 1) != NULL)
		return;

	dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);

	/* Out of memory: back off briefly and restart from the reserve
	 * step, unless a synchronous caller has been signalled. */
	if (RPC_IS_ASYNC(task) || !signalled()) {
		xprt_release(task);
		task->tk_action = call_reserve;
		rpc_delay(task, HZ>>4);
		return;
	}

	rpc_exit(task, -ERESTARTSYS);
}
778 
/* True while the request has not yet been XDR-encoded into rq_snd_buf. */
static inline int
rpc_task_need_encode(struct rpc_task *task)
{
	return task->tk_rqstp->rq_snd_buf.len == 0;
}
784 
/* Discard the encoded form so the next transmit re-encodes the request. */
static inline void
rpc_task_force_reencode(struct rpc_task *task)
{
	task->tk_rqstp->rq_snd_buf.len = 0;
}
790 
791 /*
792  * 3.	Encode arguments of an RPC call
793  */
static void
call_encode(struct rpc_task *task)
{
	struct rpc_rqst	*req = task->tk_rqstp;
	struct xdr_buf *sndbuf = &req->rq_snd_buf;
	struct xdr_buf *rcvbuf = &req->rq_rcv_buf;
	unsigned int	bufsiz;
	kxdrproc_t	encode;
	__be32		*p;

	dprint_status(task);

	/* Default buffer setup: split rq_buffer in half, first half for
	 * the send buffer and second half for the receive buffer. */
	bufsiz = req->rq_bufsize >> 1;
	sndbuf->head[0].iov_base = (void *)req->rq_buffer;
	sndbuf->head[0].iov_len  = bufsiz;
	sndbuf->tail[0].iov_len  = 0;
	sndbuf->page_len	 = 0;
	sndbuf->len		 = 0;
	sndbuf->buflen		 = bufsiz;
	rcvbuf->head[0].iov_base = (void *)((char *)req->rq_buffer + bufsiz);
	rcvbuf->head[0].iov_len  = bufsiz;
	rcvbuf->tail[0].iov_len  = 0;
	rcvbuf->page_len	 = 0;
	rcvbuf->len		 = 0;
	rcvbuf->buflen		 = bufsiz;

	/* Encode header and provided arguments */
	encode = task->tk_msg.rpc_proc->p_encode;
	if (!(p = call_header(task))) {
		printk(KERN_INFO "RPC: call_header failed, exit EIO\n");
		rpc_exit(task, -EIO);
		return;
	}
	/* Procedures without arguments have no encode routine. */
	if (encode == NULL)
		return;

	lock_kernel();
	task->tk_status = rpcauth_wrap_req(task, encode, req, p,
			task->tk_msg.rpc_argp);
	unlock_kernel();
	if (task->tk_status == -ENOMEM) {
		/* XXX: Is this sane? */
		rpc_delay(task, 3*HZ);
		task->tk_status = -EAGAIN;
	}
}
841 
842 /*
843  * 4.	Get the server port number if not yet set
844  */
845 static void
846 call_bind(struct rpc_task *task)
847 {
848 	struct rpc_xprt *xprt = task->tk_xprt;
849 
850 	dprint_status(task);
851 
852 	task->tk_action = call_connect;
853 	if (!xprt_bound(xprt)) {
854 		task->tk_action = call_bind_status;
855 		task->tk_timeout = xprt->bind_timeout;
856 		xprt->ops->rpcbind(task);
857 	}
858 }
859 
860 /*
861  * 4a.	Sort out bind result
862  */
static void
call_bind_status(struct rpc_task *task)
{
	int status = -EACCES;

	if (task->tk_status >= 0) {
		dprint_status(task);
		task->tk_status = 0;
		task->tk_action = call_connect;
		return;
	}

	switch (task->tk_status) {
	case -EACCES:
		/* Program/version not registered: back off and retry,
		 * since the service may just not be up yet. */
		dprintk("RPC: %5u remote rpcbind: RPC program/version "
				"unavailable\n", task->tk_pid);
		rpc_delay(task, 3*HZ);
		goto retry_timeout;
	case -ETIMEDOUT:
		dprintk("RPC: %5u rpcbind request timed out\n",
				task->tk_pid);
		goto retry_timeout;
	case -EPFNOSUPPORT:
		/* Fatal: exit with -EACCES (the initial value of status). */
		dprintk("RPC: %5u remote rpcbind service unavailable\n",
				task->tk_pid);
		break;
	case -EPROTONOSUPPORT:
		dprintk("RPC: %5u remote rpcbind version 2 unavailable\n",
				task->tk_pid);
		break;
	default:
		dprintk("RPC: %5u unrecognized rpcbind error (%d)\n",
				task->tk_pid, -task->tk_status);
		status = -EIO;
	}

	rpc_exit(task, status);
	return;

retry_timeout:
	/* Let the timeout logic decide whether to keep retrying. */
	task->tk_action = call_timeout;
}
905 
906 /*
907  * 4b.	Connect to the RPC server
908  */
static void
call_connect(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	dprintk("RPC: %5u call_connect xprt %p %s connected\n",
			task->tk_pid, xprt,
			(xprt_connected(xprt) ? "is" : "is not"));

	task->tk_action = call_transmit;
	if (!xprt_connected(xprt)) {
		task->tk_action = call_connect_status;
		/* A pre-existing error is left for call_connect_status. */
		if (task->tk_status < 0)
			return;
		xprt_connect(task);
	}
}
926 
927 /*
928  * 4c.	Sort out connect result
929  */
static void
call_connect_status(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	int status = task->tk_status;

	dprint_status(task);

	task->tk_status = 0;
	if (status >= 0) {
		clnt->cl_stats->netreconn++;
		task->tk_action = call_transmit;
		return;
	}

	/* Something failed: remote service port may have changed */
	rpc_force_rebind(clnt);

	switch (status) {
	case -ENOTCONN:
	case -EAGAIN:
		/* Hard-mounted tasks retry the bind+connect forever. */
		task->tk_action = call_bind;
		if (!RPC_IS_SOFT(task))
			return;
		/* if soft mounted, test if we've timed out */
		/* fall through */
	case -ETIMEDOUT:
		task->tk_action = call_timeout;
		return;
	}
	/* Anything else is fatal. */
	rpc_exit(task, -EIO);
}
961 
962 /*
963  * 5.	Transmit the RPC request, and wait for reply
964  */
static void
call_transmit(struct rpc_task *task)
{
	dprint_status(task);

	task->tk_action = call_status;
	if (task->tk_status < 0)
		return;
	task->tk_status = xprt_prepare_transmit(task);
	if (task->tk_status != 0)
		return;
	task->tk_action = call_transmit_status;
	/* Encode here so that rpcsec_gss can use correct sequence number. */
	if (rpc_task_need_encode(task)) {
		BUG_ON(task->tk_rqstp->rq_bytes_sent != 0);
		call_encode(task);
		/* Did the encode result in an error condition? */
		if (task->tk_status != 0)
			return;
	}
	xprt_transmit(task);
	if (task->tk_status < 0)
		return;
	/*
	 * On success, ensure that we call xprt_end_transmit() before sleeping
	 * in order to allow access to the socket to other RPC requests.
	 */
	call_transmit_status(task);
	if (task->tk_msg.rpc_proc->p_decode != NULL)
		return;
	/* No reply expected for this procedure: finish immediately. */
	task->tk_action = rpc_exit_task;
	rpc_wake_up_task(task);
}
998 
999 /*
1000  * 5a.	Handle cleanup after a transmission
1001  */
1002 static void
1003 call_transmit_status(struct rpc_task *task)
1004 {
1005 	task->tk_action = call_status;
1006 	/*
1007 	 * Special case: if we've been waiting on the socket's write_space()
1008 	 * callback, then don't call xprt_end_transmit().
1009 	 */
1010 	if (task->tk_status == -EAGAIN)
1011 		return;
1012 	xprt_end_transmit(task);
1013 	rpc_task_force_reencode(task);
1014 }
1015 
1016 /*
1017  * 6.	Sort out the RPC call status
1018  */
static void
call_status(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;
	struct rpc_rqst	*req = task->tk_rqstp;
	int		status;

	/* A reply that arrived before the send finished supersedes any
	 * transmit-side status. */
	if (req->rq_received > 0 && !req->rq_bytes_sent)
		task->tk_status = req->rq_received;

	dprint_status(task);

	status = task->tk_status;
	if (status >= 0) {
		task->tk_action = call_decode;
		return;
	}

	task->tk_status = 0;
	switch(status) {
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
		/*
		 * Delay any retries for 3 seconds, then handle as if it
		 * were a timeout.
		 */
		rpc_delay(task, 3*HZ);
		/* fall through */
	case -ETIMEDOUT:
		task->tk_action = call_timeout;
		break;
	case -ECONNREFUSED:
	case -ENOTCONN:
		/* The server's port may have changed: rebind and retry. */
		rpc_force_rebind(clnt);
		task->tk_action = call_bind;
		break;
	case -EAGAIN:
		task->tk_action = call_transmit;
		break;
	case -EIO:
		/* shutdown or soft timeout */
		rpc_exit(task, status);
		break;
	default:
		printk("%s: RPC call returned error %d\n",
			       clnt->cl_protname, -status);
		rpc_exit(task, status);
	}
}
1068 
1069 /*
1070  * 6a.	Handle RPC timeout
1071  * 	We do not release the request slot, so we keep using the
1072  *	same XID for all retransmits.
1073  */
static void
call_timeout(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;

	/* A minor timeout just bumps the timeout value and retries. */
	if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
		dprintk("RPC: %5u call_timeout (minor)\n", task->tk_pid);
		goto retry;
	}

	dprintk("RPC: %5u call_timeout (major)\n", task->tk_pid);
	task->tk_timeouts++;

	/* Soft tasks give up on a major timeout. */
	if (RPC_IS_SOFT(task)) {
		printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
				clnt->cl_protname, clnt->cl_server);
		rpc_exit(task, -EIO);
		return;
	}

	/* Warn only once per unresponsive episode. */
	if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
		task->tk_flags |= RPC_CALL_MAJORSEEN;
		printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
			clnt->cl_protname, clnt->cl_server);
	}
	rpc_force_rebind(clnt);

retry:
	clnt->cl_stats->rpcretrans++;
	task->tk_action = call_bind;
	task->tk_status = 0;
}
1106 
1107 /*
1108  * 7.	Decode the RPC reply
1109  */
static void
call_decode(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;
	struct rpc_rqst	*req = task->tk_rqstp;
	kxdrproc_t	decode = task->tk_msg.rpc_proc->p_decode;
	__be32		*p;

	dprintk("RPC: %5u call_decode (status %d)\n",
			task->tk_pid, task->tk_status);

	/* Cancel the earlier "not responding" warning. */
	if (task->tk_flags & RPC_CALL_MAJORSEEN) {
		printk(KERN_NOTICE "%s: server %s OK\n",
			clnt->cl_protname, clnt->cl_server);
		task->tk_flags &= ~RPC_CALL_MAJORSEEN;
	}

	/* A valid RPC reply header is at least 12 bytes; anything
	 * shorter is treated as a lost reply and retransmitted. */
	if (task->tk_status < 12) {
		if (!RPC_IS_SOFT(task)) {
			task->tk_action = call_bind;
			clnt->cl_stats->rpcretrans++;
			goto out_retry;
		}
		dprintk("RPC:       %s: too small RPC reply size (%d bytes)\n",
				clnt->cl_protname, task->tk_status);
		task->tk_action = call_timeout;
		goto out_retry;
	}

	/*
	 * Ensure that we see all writes made by xprt_complete_rqst()
	 * before it changed req->rq_received.
	 */
	smp_rmb();
	req->rq_rcv_buf.len = req->rq_private_buf.len;

	/* Check that the softirq receive buffer is valid */
	WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
				sizeof(req->rq_rcv_buf)) != 0);

	/* Verify the RPC header */
	p = call_verify(task);
	if (IS_ERR(p)) {
		/* -EAGAIN means call_verify already reset tk_action. */
		if (p == ERR_PTR(-EAGAIN))
			goto out_retry;
		return;
	}

	task->tk_action = rpc_exit_task;

	if (decode) {
		lock_kernel();
		task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
						      task->tk_msg.rpc_resp);
		unlock_kernel();
	}
	dprintk("RPC: %5u call_decode result %d\n", task->tk_pid,
			task->tk_status);
	return;
out_retry:
	/* Discard the reply so the retransmit waits for a fresh one. */
	req->rq_received = req->rq_private_buf.len = 0;
	task->tk_status = 0;
}
1173 
1174 /*
1175  * 8.	Refresh the credentials if rejected by the server
1176  */
1177 static void
1178 call_refresh(struct rpc_task *task)
1179 {
1180 	dprint_status(task);
1181 
1182 	xprt_release(task);	/* Must do to obtain new XID */
1183 	task->tk_action = call_refreshresult;
1184 	task->tk_status = 0;
1185 	task->tk_client->cl_stats->rpcauthrefresh++;
1186 	rpcauth_refreshcred(task);
1187 }
1188 
1189 /*
1190  * 8a.	Process the results of a credential refresh
1191  */
1192 static void
1193 call_refreshresult(struct rpc_task *task)
1194 {
1195 	int status = task->tk_status;
1196 
1197 	dprint_status(task);
1198 
1199 	task->tk_status = 0;
1200 	task->tk_action = call_reserve;
1201 	if (status >= 0 && rpcauth_uptodatecred(task))
1202 		return;
1203 	if (status == -EACCES) {
1204 		rpc_exit(task, -EACCES);
1205 		return;
1206 	}
1207 	task->tk_action = call_refresh;
1208 	if (status != -ETIMEDOUT)
1209 		rpc_delay(task, 3*HZ);
1210 	return;
1211 }
1212 
1213 /*
1214  * Call header serialization
1215  */
1216 static __be32 *
1217 call_header(struct rpc_task *task)
1218 {
1219 	struct rpc_clnt *clnt = task->tk_client;
1220 	struct rpc_rqst	*req = task->tk_rqstp;
1221 	__be32		*p = req->rq_svec[0].iov_base;
1222 
1223 	/* FIXME: check buffer size? */
1224 
1225 	p = xprt_skip_transport_header(task->tk_xprt, p);
1226 	*p++ = req->rq_xid;		/* XID */
1227 	*p++ = htonl(RPC_CALL);		/* CALL */
1228 	*p++ = htonl(RPC_VERSION);	/* RPC version */
1229 	*p++ = htonl(clnt->cl_prog);	/* program number */
1230 	*p++ = htonl(clnt->cl_vers);	/* program version */
1231 	*p++ = htonl(task->tk_msg.rpc_proc->p_proc);	/* procedure */
1232 	p = rpcauth_marshcred(task, p);
1233 	req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p);
1234 	return p;
1235 }
1236 
1237 /*
1238  * Reply header verification
1239  */
static __be32 *
call_verify(struct rpc_task *task)
{
	struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
	int len = task->tk_rqstp->rq_rcv_buf.len >> 2;	/* reply size in 32-bit words */
	__be32	*p = iov->iov_base;
	u32 n;
	int error = -EACCES;

	if ((task->tk_rqstp->rq_rcv_buf.len & 3) != 0) {
		/* RFC-1014 says that the representation of XDR data must be a
		 * multiple of four bytes
		 * - if it isn't pointer subtraction in the NFS client may give
		 *   undefined results
		 */
		printk(KERN_WARNING
		       "call_verify: XDR representation not a multiple of"
		       " 4 bytes: 0x%x\n", task->tk_rqstp->rq_rcv_buf.len);
		goto out_eio;
	}
	/* Need at least 3 words: XID, message direction, reply status. */
	if ((len -= 3) < 0)
		goto out_overflow;
	p += 1;	/* skip XID */

	if ((n = ntohl(*p++)) != RPC_REPLY) {
		printk(KERN_WARNING "call_verify: not an RPC reply: %x\n", n);
		goto out_garbage;
	}
	if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
		/* Rejected reply: determine why before deciding how to react. */
		if (--len < 0)
			goto out_overflow;
		switch ((n = ntohl(*p++))) {
			case RPC_AUTH_ERROR:
				break;
			case RPC_MISMATCH:
				dprintk("RPC: %5u %s: RPC call version "
						"mismatch!\n",
						task->tk_pid, __FUNCTION__);
				error = -EPROTONOSUPPORT;
				goto out_err;
			default:
				dprintk("RPC: %5u %s: RPC call rejected, "
						"unknown error: %x\n",
						task->tk_pid, __FUNCTION__, n);
				goto out_eio;
		}
		/* RPC_AUTH_ERROR: inspect the specific authentication failure. */
		if (--len < 0)
			goto out_overflow;
		switch ((n = ntohl(*p++))) {
		case RPC_AUTH_REJECTEDCRED:
		case RPC_AUTH_REJECTEDVERF:
		case RPCSEC_GSS_CREDPROBLEM:
		case RPCSEC_GSS_CTXPROBLEM:
			/* Stale credentials: refresh and retry, bounded by
			 * tk_cred_retry. */
			if (!task->tk_cred_retry)
				break;
			task->tk_cred_retry--;
			dprintk("RPC: %5u %s: retry stale creds\n",
					task->tk_pid, __FUNCTION__);
			rpcauth_invalcred(task);
			task->tk_action = call_refresh;
			goto out_retry;
		case RPC_AUTH_BADCRED:
		case RPC_AUTH_BADVERF:
			/* possibly garbled cred/verf? */
			if (!task->tk_garb_retry)
				break;
			task->tk_garb_retry--;
			dprintk("RPC: %5u %s: retry garbled creds\n",
					task->tk_pid, __FUNCTION__);
			task->tk_action = call_bind;
			goto out_retry;
		case RPC_AUTH_TOOWEAK:
			printk(KERN_NOTICE "call_verify: server %s requires stronger "
			       "authentication.\n", task->tk_client->cl_server);
			break;
		default:
			printk(KERN_WARNING "call_verify: unknown auth error: %x\n", n);
			error = -EIO;
		}
		dprintk("RPC: %5u %s: call rejected %d\n",
				task->tk_pid, __FUNCTION__, n);
		goto out_err;
	}
	/* Accepted reply: validate the server's verifier, which also
	 * advances p past it. */
	if (!(p = rpcauth_checkverf(task, p))) {
		printk(KERN_WARNING "call_verify: auth check failed\n");
		goto out_garbage;		/* bad verifier, retry */
	}
	/* NOTE(review): len becomes the word offset of p past the XID here;
	 * only its sign is checked and it is not used again below. */
	len = p - (__be32 *)iov->iov_base - 1;
	if (len < 0)
		goto out_overflow;
	/* Map the accept status onto success or an errno for the caller. */
	switch ((n = ntohl(*p++))) {
	case RPC_SUCCESS:
		return p;	/* points at the procedure-specific results */
	case RPC_PROG_UNAVAIL:
		dprintk("RPC: %5u %s: program %u is unsupported by server %s\n",
				task->tk_pid, __FUNCTION__,
				(unsigned int)task->tk_client->cl_prog,
				task->tk_client->cl_server);
		error = -EPFNOSUPPORT;
		goto out_err;
	case RPC_PROG_MISMATCH:
		dprintk("RPC: %5u %s: program %u, version %u unsupported by "
				"server %s\n", task->tk_pid, __FUNCTION__,
				(unsigned int)task->tk_client->cl_prog,
				(unsigned int)task->tk_client->cl_vers,
				task->tk_client->cl_server);
		error = -EPROTONOSUPPORT;
		goto out_err;
	case RPC_PROC_UNAVAIL:
		dprintk("RPC: %5u %s: proc %p unsupported by program %u, "
				"version %u on server %s\n",
				task->tk_pid, __FUNCTION__,
				task->tk_msg.rpc_proc,
				task->tk_client->cl_prog,
				task->tk_client->cl_vers,
				task->tk_client->cl_server);
		error = -EOPNOTSUPP;
		goto out_err;
	case RPC_GARBAGE_ARGS:
		dprintk("RPC: %5u %s: server saw garbage\n",
				task->tk_pid, __FUNCTION__);
		break;			/* retry */
	default:
		printk(KERN_WARNING "call_verify: server accept status: %x\n", n);
		/* Also retry */
	}

	/* Garbled reply: retry a bounded number of times, then fail hard. */
out_garbage:
	task->tk_client->cl_stats->rpcgarbage++;
	if (task->tk_garb_retry) {
		task->tk_garb_retry--;
		dprintk("RPC: %5u %s: retrying\n",
				task->tk_pid, __FUNCTION__);
		task->tk_action = call_bind;
out_retry:
		return ERR_PTR(-EAGAIN);
	}
	printk(KERN_WARNING "RPC %s: retry failed, exit EIO\n", __FUNCTION__);
out_eio:
	error = -EIO;
out_err:
	rpc_exit(task, error);
	return ERR_PTR(error);
out_overflow:
	printk(KERN_WARNING "RPC %s: server reply was truncated.\n", __FUNCTION__);
	goto out_garbage;
}
1387 
/* NULLPROC carries no arguments: encoding is a no-op that always succeeds. */
static int rpcproc_encode_null(void *rqstp, __be32 *data, void *obj)
{
	return 0;
}
1392 
/* NULLPROC returns no results: decoding is a no-op that always succeeds. */
static int rpcproc_decode_null(void *rqstp, __be32 *data, void *obj)
{
	return 0;
}
1397 
/* Procedure descriptor for the empty NULL procedure, used by rpc_ping(). */
static struct rpc_procinfo rpcproc_null = {
	.p_encode = rpcproc_encode_null,
	.p_decode = rpcproc_decode_null,
};
1402 
1403 int rpc_ping(struct rpc_clnt *clnt, int flags)
1404 {
1405 	struct rpc_message msg = {
1406 		.rpc_proc = &rpcproc_null,
1407 	};
1408 	int err;
1409 	msg.rpc_cred = authnull_ops.lookup_cred(NULL, NULL, 0);
1410 	err = rpc_call_sync(clnt, &msg, flags);
1411 	put_rpccred(msg.rpc_cred);
1412 	return err;
1413 }
1414