xref: /openbmc/linux/net/sunrpc/clnt.c (revision 7e24a55b2122746c2eef192296fc84624354f895)
1  // SPDX-License-Identifier: GPL-2.0-only
2  /*
3   *  linux/net/sunrpc/clnt.c
4   *
5   *  This file contains the high-level RPC interface.
6   *  It is modeled as a finite state machine to support both synchronous
7   *  and asynchronous requests.
8   *
9   *  -	RPC header generation and argument serialization.
10   *  -	Credential refresh.
11   *  -	TCP connect handling.
12   *  -	Retry of operation when it is suspected the operation failed because
13   *	of uid squashing on the server, or when the credentials were stale
14   *	and need to be refreshed, or when a packet was damaged in transit.
 *	This may have to be moved to the VFS layer.
16   *
17   *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
18   *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
19   */
20  
21  
22  #include <linux/module.h>
23  #include <linux/types.h>
24  #include <linux/kallsyms.h>
25  #include <linux/mm.h>
26  #include <linux/namei.h>
27  #include <linux/mount.h>
28  #include <linux/slab.h>
29  #include <linux/rcupdate.h>
30  #include <linux/utsname.h>
31  #include <linux/workqueue.h>
32  #include <linux/in.h>
33  #include <linux/in6.h>
34  #include <linux/un.h>
35  
36  #include <linux/sunrpc/clnt.h>
37  #include <linux/sunrpc/addr.h>
38  #include <linux/sunrpc/rpc_pipe_fs.h>
39  #include <linux/sunrpc/metrics.h>
40  #include <linux/sunrpc/bc_xprt.h>
41  #include <trace/events/sunrpc.h>
42  
43  #include "sunrpc.h"
44  #include "sysfs.h"
45  #include "netns.h"
46  
47  #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
48  # define RPCDBG_FACILITY	RPCDBG_CALL
49  #endif
50  
/*
 * Wait queue used during client shutdown: rpc_shutdown_client() sleeps
 * on it until the client's task list is empty, and rpc_release_client()
 * wakes it whenever it observes an empty task list.
 */

static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
56  
57  
/*
 * Forward declarations for the call_* handlers; each one operates on the
 * rpc_task passed to it.
 * NOTE(review): presumably these are the states of the finite state
 * machine mentioned in the file header - their definitions are further
 * down the file.
 */
static void	call_start(struct rpc_task *task);
static void	call_reserve(struct rpc_task *task);
static void	call_reserveresult(struct rpc_task *task);
static void	call_allocate(struct rpc_task *task);
static void	call_encode(struct rpc_task *task);
static void	call_decode(struct rpc_task *task);
static void	call_bind(struct rpc_task *task);
static void	call_bind_status(struct rpc_task *task);
static void	call_transmit(struct rpc_task *task);
static void	call_status(struct rpc_task *task);
static void	call_transmit_status(struct rpc_task *task);
static void	call_refresh(struct rpc_task *task);
static void	call_refreshresult(struct rpc_task *task);
static void	call_connect(struct rpc_task *task);
static void	call_connect_status(struct rpc_task *task);

/* Helpers that are used before they are defined. */
static int	rpc_encode_header(struct rpc_task *task,
				  struct xdr_stream *xdr);
static int	rpc_decode_header(struct rpc_task *task,
				  struct xdr_stream *xdr);
static int	rpc_ping(struct rpc_clnt *clnt);
static int	rpc_ping_noreply(struct rpc_clnt *clnt);
static void	rpc_check_timeout(struct rpc_task *task);
81  
rpc_register_client(struct rpc_clnt * clnt)82  static void rpc_register_client(struct rpc_clnt *clnt)
83  {
84  	struct net *net = rpc_net_ns(clnt);
85  	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
86  
87  	spin_lock(&sn->rpc_client_lock);
88  	list_add(&clnt->cl_clients, &sn->all_clients);
89  	spin_unlock(&sn->rpc_client_lock);
90  }
91  
rpc_unregister_client(struct rpc_clnt * clnt)92  static void rpc_unregister_client(struct rpc_clnt *clnt)
93  {
94  	struct net *net = rpc_net_ns(clnt);
95  	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
96  
97  	spin_lock(&sn->rpc_client_lock);
98  	list_del(&clnt->cl_clients);
99  	spin_unlock(&sn->rpc_client_lock);
100  }
101  
/*
 * Remove the pipefs directory of @clnt.  Thin wrapper around
 * rpc_remove_client_dir(); it does not look up or pin the pipefs
 * superblock itself.
 */
static void __rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
{
	rpc_remove_client_dir(clnt);
}
106  
rpc_clnt_remove_pipedir(struct rpc_clnt * clnt)107  static void rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
108  {
109  	struct net *net = rpc_net_ns(clnt);
110  	struct super_block *pipefs_sb;
111  
112  	pipefs_sb = rpc_get_sb_net(net);
113  	if (pipefs_sb) {
114  		if (pipefs_sb == clnt->pipefs_sb)
115  			__rpc_clnt_remove_pipedir(clnt);
116  		rpc_put_sb_net(net);
117  	}
118  }
119  
rpc_setup_pipedir_sb(struct super_block * sb,struct rpc_clnt * clnt)120  static struct dentry *rpc_setup_pipedir_sb(struct super_block *sb,
121  				    struct rpc_clnt *clnt)
122  {
123  	static uint32_t clntid;
124  	const char *dir_name = clnt->cl_program->pipe_dir_name;
125  	char name[15];
126  	struct dentry *dir, *dentry;
127  
128  	dir = rpc_d_lookup_sb(sb, dir_name);
129  	if (dir == NULL) {
130  		pr_info("RPC: pipefs directory doesn't exist: %s\n", dir_name);
131  		return dir;
132  	}
133  	for (;;) {
134  		snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
135  		name[sizeof(name) - 1] = '\0';
136  		dentry = rpc_create_client_dir(dir, name, clnt);
137  		if (!IS_ERR(dentry))
138  			break;
139  		if (dentry == ERR_PTR(-EEXIST))
140  			continue;
141  		printk(KERN_INFO "RPC: Couldn't create pipefs entry"
142  				" %s/%s, error %ld\n",
143  				dir_name, name, PTR_ERR(dentry));
144  		break;
145  	}
146  	dput(dir);
147  	return dentry;
148  }
149  
150  static int
rpc_setup_pipedir(struct super_block * pipefs_sb,struct rpc_clnt * clnt)151  rpc_setup_pipedir(struct super_block *pipefs_sb, struct rpc_clnt *clnt)
152  {
153  	struct dentry *dentry;
154  
155  	clnt->pipefs_sb = pipefs_sb;
156  
157  	if (clnt->cl_program->pipe_dir_name != NULL) {
158  		dentry = rpc_setup_pipedir_sb(pipefs_sb, clnt);
159  		if (IS_ERR(dentry))
160  			return PTR_ERR(dentry);
161  	}
162  	return 0;
163  }
164  
rpc_clnt_skip_event(struct rpc_clnt * clnt,unsigned long event)165  static int rpc_clnt_skip_event(struct rpc_clnt *clnt, unsigned long event)
166  {
167  	if (clnt->cl_program->pipe_dir_name == NULL)
168  		return 1;
169  
170  	switch (event) {
171  	case RPC_PIPEFS_MOUNT:
172  		if (clnt->cl_pipedir_objects.pdh_dentry != NULL)
173  			return 1;
174  		if (refcount_read(&clnt->cl_count) == 0)
175  			return 1;
176  		break;
177  	case RPC_PIPEFS_UMOUNT:
178  		if (clnt->cl_pipedir_objects.pdh_dentry == NULL)
179  			return 1;
180  		break;
181  	}
182  	return 0;
183  }
184  
__rpc_clnt_handle_event(struct rpc_clnt * clnt,unsigned long event,struct super_block * sb)185  static int __rpc_clnt_handle_event(struct rpc_clnt *clnt, unsigned long event,
186  				   struct super_block *sb)
187  {
188  	struct dentry *dentry;
189  
190  	switch (event) {
191  	case RPC_PIPEFS_MOUNT:
192  		dentry = rpc_setup_pipedir_sb(sb, clnt);
193  		if (!dentry)
194  			return -ENOENT;
195  		if (IS_ERR(dentry))
196  			return PTR_ERR(dentry);
197  		break;
198  	case RPC_PIPEFS_UMOUNT:
199  		__rpc_clnt_remove_pipedir(clnt);
200  		break;
201  	default:
202  		printk(KERN_ERR "%s: unknown event: %ld\n", __func__, event);
203  		return -ENOTSUPP;
204  	}
205  	return 0;
206  }
207  
__rpc_pipefs_event(struct rpc_clnt * clnt,unsigned long event,struct super_block * sb)208  static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event,
209  				struct super_block *sb)
210  {
211  	int error = 0;
212  
213  	for (;; clnt = clnt->cl_parent) {
214  		if (!rpc_clnt_skip_event(clnt, event))
215  			error = __rpc_clnt_handle_event(clnt, event, sb);
216  		if (error || clnt == clnt->cl_parent)
217  			break;
218  	}
219  	return error;
220  }
221  
rpc_get_client_for_event(struct net * net,int event)222  static struct rpc_clnt *rpc_get_client_for_event(struct net *net, int event)
223  {
224  	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
225  	struct rpc_clnt *clnt;
226  
227  	spin_lock(&sn->rpc_client_lock);
228  	list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
229  		if (rpc_clnt_skip_event(clnt, event))
230  			continue;
231  		spin_unlock(&sn->rpc_client_lock);
232  		return clnt;
233  	}
234  	spin_unlock(&sn->rpc_client_lock);
235  	return NULL;
236  }
237  
rpc_pipefs_event(struct notifier_block * nb,unsigned long event,void * ptr)238  static int rpc_pipefs_event(struct notifier_block *nb, unsigned long event,
239  			    void *ptr)
240  {
241  	struct super_block *sb = ptr;
242  	struct rpc_clnt *clnt;
243  	int error = 0;
244  
245  	while ((clnt = rpc_get_client_for_event(sb->s_fs_info, event))) {
246  		error = __rpc_pipefs_event(clnt, event, sb);
247  		if (error)
248  			break;
249  	}
250  	return error;
251  }
252  
/*
 * Notifier block that routes pipefs events to rpc_pipefs_event().
 * It is (un)registered through rpc_clients_notifier_register() and
 * rpc_clients_notifier_unregister().
 */
static struct notifier_block rpc_clients_block = {
	.notifier_call	= rpc_pipefs_event,
	.priority	= SUNRPC_PIPEFS_RPC_PRIO,
};
257  
rpc_clients_notifier_register(void)258  int rpc_clients_notifier_register(void)
259  {
260  	return rpc_pipefs_notifier_register(&rpc_clients_block);
261  }
262  
rpc_clients_notifier_unregister(void)263  void rpc_clients_notifier_unregister(void)
264  {
265  	return rpc_pipefs_notifier_unregister(&rpc_clients_block);
266  }
267  
rpc_clnt_set_transport(struct rpc_clnt * clnt,struct rpc_xprt * xprt,const struct rpc_timeout * timeout)268  static struct rpc_xprt *rpc_clnt_set_transport(struct rpc_clnt *clnt,
269  		struct rpc_xprt *xprt,
270  		const struct rpc_timeout *timeout)
271  {
272  	struct rpc_xprt *old;
273  
274  	spin_lock(&clnt->cl_lock);
275  	old = rcu_dereference_protected(clnt->cl_xprt,
276  			lockdep_is_held(&clnt->cl_lock));
277  
278  	if (!xprt_bound(xprt))
279  		clnt->cl_autobind = 1;
280  
281  	clnt->cl_timeout = timeout;
282  	rcu_assign_pointer(clnt->cl_xprt, xprt);
283  	spin_unlock(&clnt->cl_lock);
284  
285  	return old;
286  }
287  
rpc_clnt_set_nodename(struct rpc_clnt * clnt,const char * nodename)288  static void rpc_clnt_set_nodename(struct rpc_clnt *clnt, const char *nodename)
289  {
290  	clnt->cl_nodelen = strlcpy(clnt->cl_nodename,
291  			nodename, sizeof(clnt->cl_nodename));
292  }
293  
/*
 * Make @clnt visible: register it with debugfs, create its pipefs
 * directory (if a pipefs superblock exists for the namespace), add it
 * to the per-namespace client list and create its auth handle for
 * @pseudoflavor / @client_name.
 *
 * On failure everything set up here is undone again and the client's
 * sysfs entry is destroyed as well.
 *
 * Returns 0 on success or a negative errno.
 */
static int rpc_client_register(struct rpc_clnt *clnt,
			       rpc_authflavor_t pseudoflavor,
			       const char *client_name)
{
	struct rpc_auth_create_args auth_args = {
		.pseudoflavor = pseudoflavor,
		.target_name = client_name,
	};
	struct net *net = rpc_net_ns(clnt);
	struct super_block *pipefs_sb;
	struct rpc_auth *auth;
	int err;

	rpc_clnt_debugfs_register(clnt);

	pipefs_sb = rpc_get_sb_net(net);
	if (pipefs_sb != NULL) {
		err = rpc_setup_pipedir(pipefs_sb, clnt);
		if (err != 0)
			goto out;
	}

	rpc_register_client(clnt);
	if (pipefs_sb != NULL)
		rpc_put_sb_net(net);

	auth = rpcauth_create(&auth_args, clnt);
	if (!IS_ERR(auth))
		return 0;

	dprintk("RPC:       Couldn't create auth handle (flavor %u)\n",
			pseudoflavor);
	err = PTR_ERR(auth);

	/* Look the superblock up again before removing the pipedir. */
	pipefs_sb = rpc_get_sb_net(net);
	rpc_unregister_client(clnt);
	__rpc_clnt_remove_pipedir(clnt);
out:
	if (pipefs_sb != NULL)
		rpc_put_sb_net(net);
	rpc_sysfs_client_destroy(clnt);
	rpc_clnt_debugfs_unregister(clnt);
	return err;
}
339  
340  static DEFINE_IDA(rpc_clids);
341  
rpc_cleanup_clids(void)342  void rpc_cleanup_clids(void)
343  {
344  	ida_destroy(&rpc_clids);
345  }
346  
rpc_alloc_clid(struct rpc_clnt * clnt)347  static int rpc_alloc_clid(struct rpc_clnt *clnt)
348  {
349  	int clid;
350  
351  	clid = ida_alloc(&rpc_clids, GFP_KERNEL);
352  	if (clid < 0)
353  		return clid;
354  	clnt->cl_clid = clid;
355  	return 0;
356  }
357  
rpc_free_clid(struct rpc_clnt * clnt)358  static void rpc_free_clid(struct rpc_clnt *clnt)
359  {
360  	ida_free(&rpc_clids, clnt->cl_clid);
361  }
362  
rpc_new_client(const struct rpc_create_args * args,struct rpc_xprt_switch * xps,struct rpc_xprt * xprt,struct rpc_clnt * parent)363  static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,
364  		struct rpc_xprt_switch *xps,
365  		struct rpc_xprt *xprt,
366  		struct rpc_clnt *parent)
367  {
368  	const struct rpc_program *program = args->program;
369  	const struct rpc_version *version;
370  	struct rpc_clnt *clnt = NULL;
371  	const struct rpc_timeout *timeout;
372  	const char *nodename = args->nodename;
373  	int err;
374  
375  	err = rpciod_up();
376  	if (err)
377  		goto out_no_rpciod;
378  
379  	err = -EINVAL;
380  	if (args->version >= program->nrvers)
381  		goto out_err;
382  	version = program->version[args->version];
383  	if (version == NULL)
384  		goto out_err;
385  
386  	err = -ENOMEM;
387  	clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
388  	if (!clnt)
389  		goto out_err;
390  	clnt->cl_parent = parent ? : clnt;
391  	clnt->cl_xprtsec = args->xprtsec;
392  
393  	err = rpc_alloc_clid(clnt);
394  	if (err)
395  		goto out_no_clid;
396  
397  	clnt->cl_cred	  = get_cred(args->cred);
398  	clnt->cl_procinfo = version->procs;
399  	clnt->cl_maxproc  = version->nrprocs;
400  	clnt->cl_prog     = args->prognumber ? : program->number;
401  	clnt->cl_vers     = version->number;
402  	clnt->cl_stats    = args->stats ? : program->stats;
403  	clnt->cl_metrics  = rpc_alloc_iostats(clnt);
404  	rpc_init_pipe_dir_head(&clnt->cl_pipedir_objects);
405  	err = -ENOMEM;
406  	if (clnt->cl_metrics == NULL)
407  		goto out_no_stats;
408  	clnt->cl_program  = program;
409  	INIT_LIST_HEAD(&clnt->cl_tasks);
410  	spin_lock_init(&clnt->cl_lock);
411  
412  	timeout = xprt->timeout;
413  	if (args->timeout != NULL) {
414  		memcpy(&clnt->cl_timeout_default, args->timeout,
415  				sizeof(clnt->cl_timeout_default));
416  		timeout = &clnt->cl_timeout_default;
417  	}
418  
419  	rpc_clnt_set_transport(clnt, xprt, timeout);
420  	xprt->main = true;
421  	xprt_iter_init(&clnt->cl_xpi, xps);
422  	xprt_switch_put(xps);
423  
424  	clnt->cl_rtt = &clnt->cl_rtt_default;
425  	rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
426  
427  	refcount_set(&clnt->cl_count, 1);
428  
429  	if (nodename == NULL)
430  		nodename = utsname()->nodename;
431  	/* save the nodename */
432  	rpc_clnt_set_nodename(clnt, nodename);
433  
434  	rpc_sysfs_client_setup(clnt, xps, rpc_net_ns(clnt));
435  	err = rpc_client_register(clnt, args->authflavor, args->client_name);
436  	if (err)
437  		goto out_no_path;
438  	if (parent)
439  		refcount_inc(&parent->cl_count);
440  
441  	trace_rpc_clnt_new(clnt, xprt, args);
442  	return clnt;
443  
444  out_no_path:
445  	rpc_free_iostats(clnt->cl_metrics);
446  out_no_stats:
447  	put_cred(clnt->cl_cred);
448  	rpc_free_clid(clnt);
449  out_no_clid:
450  	kfree(clnt);
451  out_err:
452  	rpciod_down();
453  out_no_rpciod:
454  	xprt_switch_put(xps);
455  	xprt_put(xprt);
456  	trace_rpc_clnt_new_err(program->name, args->servername, err);
457  	return ERR_PTR(err);
458  }
459  
rpc_create_xprt(struct rpc_create_args * args,struct rpc_xprt * xprt)460  static struct rpc_clnt *rpc_create_xprt(struct rpc_create_args *args,
461  					struct rpc_xprt *xprt)
462  {
463  	struct rpc_clnt *clnt = NULL;
464  	struct rpc_xprt_switch *xps;
465  
466  	if (args->bc_xprt && args->bc_xprt->xpt_bc_xps) {
467  		WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
468  		xps = args->bc_xprt->xpt_bc_xps;
469  		xprt_switch_get(xps);
470  	} else {
471  		xps = xprt_switch_alloc(xprt, GFP_KERNEL);
472  		if (xps == NULL) {
473  			xprt_put(xprt);
474  			return ERR_PTR(-ENOMEM);
475  		}
476  		if (xprt->bc_xprt) {
477  			xprt_switch_get(xps);
478  			xprt->bc_xprt->xpt_bc_xps = xps;
479  		}
480  	}
481  	clnt = rpc_new_client(args, xps, xprt, NULL);
482  	if (IS_ERR(clnt))
483  		return clnt;
484  
485  	if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
486  		int err = rpc_ping(clnt);
487  		if (err != 0) {
488  			rpc_shutdown_client(clnt);
489  			return ERR_PTR(err);
490  		}
491  	} else if (args->flags & RPC_CLNT_CREATE_CONNECTED) {
492  		int err = rpc_ping_noreply(clnt);
493  		if (err != 0) {
494  			rpc_shutdown_client(clnt);
495  			return ERR_PTR(err);
496  		}
497  	}
498  
499  	clnt->cl_softrtry = 1;
500  	if (args->flags & (RPC_CLNT_CREATE_HARDRTRY|RPC_CLNT_CREATE_SOFTERR)) {
501  		clnt->cl_softrtry = 0;
502  		if (args->flags & RPC_CLNT_CREATE_SOFTERR)
503  			clnt->cl_softerr = 1;
504  	}
505  
506  	if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
507  		clnt->cl_autobind = 1;
508  	if (args->flags & RPC_CLNT_CREATE_NO_RETRANS_TIMEOUT)
509  		clnt->cl_noretranstimeo = 1;
510  	if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
511  		clnt->cl_discrtry = 1;
512  	if (!(args->flags & RPC_CLNT_CREATE_QUIET))
513  		clnt->cl_chatty = 1;
514  
515  	return clnt;
516  }
517  
518  /**
519   * rpc_create - create an RPC client and transport with one call
520   * @args: rpc_clnt create argument structure
521   *
522   * Creates and initializes an RPC transport and an RPC client.
523   *
524   * It can ping the server in order to determine if it is up, and to see if
525   * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
526   * this behavior so asynchronous tasks can also use rpc_create.
527   */
rpc_create(struct rpc_create_args * args)528  struct rpc_clnt *rpc_create(struct rpc_create_args *args)
529  {
530  	struct rpc_xprt *xprt;
531  	struct xprt_create xprtargs = {
532  		.net = args->net,
533  		.ident = args->protocol,
534  		.srcaddr = args->saddress,
535  		.dstaddr = args->address,
536  		.addrlen = args->addrsize,
537  		.servername = args->servername,
538  		.bc_xprt = args->bc_xprt,
539  		.xprtsec = args->xprtsec,
540  		.connect_timeout = args->connect_timeout,
541  		.reconnect_timeout = args->reconnect_timeout,
542  	};
543  	char servername[48];
544  	struct rpc_clnt *clnt;
545  	int i;
546  
547  	if (args->bc_xprt) {
548  		WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
549  		xprt = args->bc_xprt->xpt_bc_xprt;
550  		if (xprt) {
551  			xprt_get(xprt);
552  			return rpc_create_xprt(args, xprt);
553  		}
554  	}
555  
556  	if (args->flags & RPC_CLNT_CREATE_INFINITE_SLOTS)
557  		xprtargs.flags |= XPRT_CREATE_INFINITE_SLOTS;
558  	if (args->flags & RPC_CLNT_CREATE_NO_IDLE_TIMEOUT)
559  		xprtargs.flags |= XPRT_CREATE_NO_IDLE_TIMEOUT;
560  	/*
561  	 * If the caller chooses not to specify a hostname, whip
562  	 * up a string representation of the passed-in address.
563  	 */
564  	if (xprtargs.servername == NULL) {
565  		struct sockaddr_un *sun =
566  				(struct sockaddr_un *)args->address;
567  		struct sockaddr_in *sin =
568  				(struct sockaddr_in *)args->address;
569  		struct sockaddr_in6 *sin6 =
570  				(struct sockaddr_in6 *)args->address;
571  
572  		servername[0] = '\0';
573  		switch (args->address->sa_family) {
574  		case AF_LOCAL:
575  			if (sun->sun_path[0])
576  				snprintf(servername, sizeof(servername), "%s",
577  					 sun->sun_path);
578  			else
579  				snprintf(servername, sizeof(servername), "@%s",
580  					 sun->sun_path+1);
581  			break;
582  		case AF_INET:
583  			snprintf(servername, sizeof(servername), "%pI4",
584  				 &sin->sin_addr.s_addr);
585  			break;
586  		case AF_INET6:
587  			snprintf(servername, sizeof(servername), "%pI6",
588  				 &sin6->sin6_addr);
589  			break;
590  		default:
591  			/* caller wants default server name, but
592  			 * address family isn't recognized. */
593  			return ERR_PTR(-EINVAL);
594  		}
595  		xprtargs.servername = servername;
596  	}
597  
598  	xprt = xprt_create_transport(&xprtargs);
599  	if (IS_ERR(xprt))
600  		return (struct rpc_clnt *)xprt;
601  
602  	/*
603  	 * By default, kernel RPC client connects from a reserved port.
604  	 * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
605  	 * but it is always enabled for rpciod, which handles the connect
606  	 * operation.
607  	 */
608  	xprt->resvport = 1;
609  	if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
610  		xprt->resvport = 0;
611  	xprt->reuseport = 0;
612  	if (args->flags & RPC_CLNT_CREATE_REUSEPORT)
613  		xprt->reuseport = 1;
614  
615  	clnt = rpc_create_xprt(args, xprt);
616  	if (IS_ERR(clnt) || args->nconnect <= 1)
617  		return clnt;
618  
619  	for (i = 0; i < args->nconnect - 1; i++) {
620  		if (rpc_clnt_add_xprt(clnt, &xprtargs, NULL, NULL) < 0)
621  			break;
622  	}
623  	return clnt;
624  }
625  EXPORT_SYMBOL_GPL(rpc_create);
626  
627  /*
628   * This function clones the RPC client structure. It allows us to share the
629   * same transport while varying parameters such as the authentication
630   * flavour.
631   */
__rpc_clone_client(struct rpc_create_args * args,struct rpc_clnt * clnt)632  static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
633  					   struct rpc_clnt *clnt)
634  {
635  	struct rpc_xprt_switch *xps;
636  	struct rpc_xprt *xprt;
637  	struct rpc_clnt *new;
638  	int err;
639  
640  	err = -ENOMEM;
641  	rcu_read_lock();
642  	xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
643  	xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
644  	rcu_read_unlock();
645  	if (xprt == NULL || xps == NULL) {
646  		xprt_put(xprt);
647  		xprt_switch_put(xps);
648  		goto out_err;
649  	}
650  	args->servername = xprt->servername;
651  	args->nodename = clnt->cl_nodename;
652  
653  	new = rpc_new_client(args, xps, xprt, clnt);
654  	if (IS_ERR(new))
655  		return new;
656  
657  	/* Turn off autobind on clones */
658  	new->cl_autobind = 0;
659  	new->cl_softrtry = clnt->cl_softrtry;
660  	new->cl_softerr = clnt->cl_softerr;
661  	new->cl_noretranstimeo = clnt->cl_noretranstimeo;
662  	new->cl_discrtry = clnt->cl_discrtry;
663  	new->cl_chatty = clnt->cl_chatty;
664  	new->cl_principal = clnt->cl_principal;
665  	new->cl_max_connect = clnt->cl_max_connect;
666  	return new;
667  
668  out_err:
669  	trace_rpc_clnt_clone_err(clnt, err);
670  	return ERR_PTR(err);
671  }
672  
673  /**
674   * rpc_clone_client - Clone an RPC client structure
675   *
676   * @clnt: RPC client whose parameters are copied
677   *
678   * Returns a fresh RPC client or an ERR_PTR.
679   */
rpc_clone_client(struct rpc_clnt * clnt)680  struct rpc_clnt *rpc_clone_client(struct rpc_clnt *clnt)
681  {
682  	struct rpc_create_args args = {
683  		.program	= clnt->cl_program,
684  		.prognumber	= clnt->cl_prog,
685  		.version	= clnt->cl_vers,
686  		.authflavor	= clnt->cl_auth->au_flavor,
687  		.cred		= clnt->cl_cred,
688  		.stats		= clnt->cl_stats,
689  	};
690  	return __rpc_clone_client(&args, clnt);
691  }
692  EXPORT_SYMBOL_GPL(rpc_clone_client);
693  
694  /**
695   * rpc_clone_client_set_auth - Clone an RPC client structure and set its auth
696   *
697   * @clnt: RPC client whose parameters are copied
698   * @flavor: security flavor for new client
699   *
700   * Returns a fresh RPC client or an ERR_PTR.
701   */
702  struct rpc_clnt *
rpc_clone_client_set_auth(struct rpc_clnt * clnt,rpc_authflavor_t flavor)703  rpc_clone_client_set_auth(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
704  {
705  	struct rpc_create_args args = {
706  		.program	= clnt->cl_program,
707  		.prognumber	= clnt->cl_prog,
708  		.version	= clnt->cl_vers,
709  		.authflavor	= flavor,
710  		.cred		= clnt->cl_cred,
711  		.stats		= clnt->cl_stats,
712  	};
713  	return __rpc_clone_client(&args, clnt);
714  }
715  EXPORT_SYMBOL_GPL(rpc_clone_client_set_auth);
716  
/**
 * rpc_switch_client_transport: switch the RPC transport on the fly
 * @clnt: pointer to a struct rpc_clnt
 * @args: pointer to the new transport arguments
 * @timeout: pointer to the new timeout parameters
 *
 * This function allows the caller to switch the RPC transport for the
 * rpc_clnt structure 'clnt' to allow it to connect to a mirrored NFS
 * server, for instance.  It assumes that the caller has ensured that
 * there are no active RPC tasks by using some form of locking.
 *
 * @args->xprtsec is overwritten with the client's cl_xprtsec before the
 * new transport is created.
 *
 * Returns zero if "clnt" is now using the new xprt.  Otherwise a
 * negative errno is returned, and "clnt" continues to use the old
 * xprt.
 */
int rpc_switch_client_transport(struct rpc_clnt *clnt,
		struct xprt_create *args,
		const struct rpc_timeout *timeout)
{
	const struct rpc_timeout *old_timeo;
	rpc_authflavor_t pseudoflavor;
	struct rpc_xprt_switch *xps, *oldxps;
	struct rpc_xprt *xprt, *old;
	struct rpc_clnt *parent;
	int err;

	/* Build the replacement transport and a switch to hold it. */
	args->xprtsec = clnt->cl_xprtsec;
	xprt = xprt_create_transport(args);
	if (IS_ERR(xprt))
		return PTR_ERR(xprt);

	xps = xprt_switch_alloc(xprt, GFP_KERNEL);
	if (xps == NULL) {
		xprt_put(xprt);
		return -ENOMEM;
	}

	/* Remember the flavor so the auth handle can be recreated below. */
	pseudoflavor = clnt->cl_auth->au_flavor;

	/*
	 * Install the new transport, timeout and switch, keeping the old
	 * ones so that the change can be reverted on failure.
	 */
	old_timeo = clnt->cl_timeout;
	old = rpc_clnt_set_transport(clnt, xprt, timeout);
	oldxps = xprt_iter_xchg_switch(&clnt->cl_xpi, xps);

	/* Undo the registrations; rpc_client_register() redoes them below. */
	rpc_unregister_client(clnt);
	__rpc_clnt_remove_pipedir(clnt);
	rpc_sysfs_client_destroy(clnt);
	rpc_clnt_debugfs_unregister(clnt);

	/*
	 * A new transport was created.  "clnt" therefore
	 * becomes the root of a new cl_parent tree.  clnt's
	 * children, if it has any, still point to the old xprt.
	 */
	parent = clnt->cl_parent;
	clnt->cl_parent = clnt;

	/*
	 * The old rpc_auth cache cannot be re-used.  GSS
	 * contexts in particular are between a single
	 * client and server.
	 */
	err = rpc_client_register(clnt, pseudoflavor, NULL);
	if (err)
		goto out_revert;

	/*
	 * NOTE(review): presumably this waits for RCU readers that may
	 * still be using the old cl_xprt before the old references are
	 * dropped - confirm.
	 */
	synchronize_rcu();
	/* Drop the reference on the former parent, if there was one. */
	if (parent != clnt)
		rpc_release_client(parent);
	xprt_switch_put(oldxps);
	xprt_put(old);
	trace_rpc_clnt_replace_xprt(clnt);
	return 0;

out_revert:
	/*
	 * Put the old switch, transport, timeout and parent back and
	 * register the client again (the result of this second
	 * registration is not checked), then release the new switch and
	 * transport.
	 */
	xps = xprt_iter_xchg_switch(&clnt->cl_xpi, oldxps);
	rpc_clnt_set_transport(clnt, old, old_timeo);
	clnt->cl_parent = parent;
	rpc_client_register(clnt, pseudoflavor, NULL);
	xprt_switch_put(xps);
	xprt_put(xprt);
	trace_rpc_clnt_replace_xprt_err(clnt);
	return err;
}
EXPORT_SYMBOL_GPL(rpc_switch_client_transport);
801  
802  static
_rpc_clnt_xprt_iter_init(struct rpc_clnt * clnt,struct rpc_xprt_iter * xpi,void func (struct rpc_xprt_iter * xpi,struct rpc_xprt_switch * xps))803  int _rpc_clnt_xprt_iter_init(struct rpc_clnt *clnt, struct rpc_xprt_iter *xpi,
804  			     void func(struct rpc_xprt_iter *xpi, struct rpc_xprt_switch *xps))
805  {
806  	struct rpc_xprt_switch *xps;
807  
808  	rcu_read_lock();
809  	xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
810  	rcu_read_unlock();
811  	if (xps == NULL)
812  		return -EAGAIN;
813  	func(xpi, xps);
814  	xprt_switch_put(xps);
815  	return 0;
816  }
817  
818  static
rpc_clnt_xprt_iter_init(struct rpc_clnt * clnt,struct rpc_xprt_iter * xpi)819  int rpc_clnt_xprt_iter_init(struct rpc_clnt *clnt, struct rpc_xprt_iter *xpi)
820  {
821  	return _rpc_clnt_xprt_iter_init(clnt, xpi, xprt_iter_init_listall);
822  }
823  
824  static
rpc_clnt_xprt_iter_offline_init(struct rpc_clnt * clnt,struct rpc_xprt_iter * xpi)825  int rpc_clnt_xprt_iter_offline_init(struct rpc_clnt *clnt,
826  				    struct rpc_xprt_iter *xpi)
827  {
828  	return _rpc_clnt_xprt_iter_init(clnt, xpi, xprt_iter_init_listoffline);
829  }
830  
831  /**
832   * rpc_clnt_iterate_for_each_xprt - Apply a function to all transports
833   * @clnt: pointer to client
834   * @fn: function to apply
835   * @data: void pointer to function data
836   *
837   * Iterates through the list of RPC transports currently attached to the
838   * client and applies the function fn(clnt, xprt, data).
839   *
840   * On error, the iteration stops, and the function returns the error value.
841   */
rpc_clnt_iterate_for_each_xprt(struct rpc_clnt * clnt,int (* fn)(struct rpc_clnt *,struct rpc_xprt *,void *),void * data)842  int rpc_clnt_iterate_for_each_xprt(struct rpc_clnt *clnt,
843  		int (*fn)(struct rpc_clnt *, struct rpc_xprt *, void *),
844  		void *data)
845  {
846  	struct rpc_xprt_iter xpi;
847  	int ret;
848  
849  	ret = rpc_clnt_xprt_iter_init(clnt, &xpi);
850  	if (ret)
851  		return ret;
852  	for (;;) {
853  		struct rpc_xprt *xprt = xprt_iter_get_next(&xpi);
854  
855  		if (!xprt)
856  			break;
857  		ret = fn(clnt, xprt, data);
858  		xprt_put(xprt);
859  		if (ret < 0)
860  			break;
861  	}
862  	xprt_iter_destroy(&xpi);
863  	return ret;
864  }
865  EXPORT_SYMBOL_GPL(rpc_clnt_iterate_for_each_xprt);
866  
867  /*
868   * Kill all tasks for the given client.
869   * XXX: kill their descendants as well?
870   */
rpc_killall_tasks(struct rpc_clnt * clnt)871  void rpc_killall_tasks(struct rpc_clnt *clnt)
872  {
873  	struct rpc_task	*rovr;
874  
875  
876  	if (list_empty(&clnt->cl_tasks))
877  		return;
878  
879  	/*
880  	 * Spin lock all_tasks to prevent changes...
881  	 */
882  	trace_rpc_clnt_killall(clnt);
883  	spin_lock(&clnt->cl_lock);
884  	list_for_each_entry(rovr, &clnt->cl_tasks, tk_task)
885  		rpc_signal_task(rovr);
886  	spin_unlock(&clnt->cl_lock);
887  }
888  EXPORT_SYMBOL_GPL(rpc_killall_tasks);
889  
890  /**
891   * rpc_cancel_tasks - try to cancel a set of RPC tasks
892   * @clnt: Pointer to RPC client
893   * @error: RPC task error value to set
894   * @fnmatch: Pointer to selector function
895   * @data: User data
896   *
897   * Uses @fnmatch to define a set of RPC tasks that are to be cancelled.
898   * The argument @error must be a negative error value.
899   */
rpc_cancel_tasks(struct rpc_clnt * clnt,int error,bool (* fnmatch)(const struct rpc_task *,const void *),const void * data)900  unsigned long rpc_cancel_tasks(struct rpc_clnt *clnt, int error,
901  			       bool (*fnmatch)(const struct rpc_task *,
902  					       const void *),
903  			       const void *data)
904  {
905  	struct rpc_task *task;
906  	unsigned long count = 0;
907  
908  	if (list_empty(&clnt->cl_tasks))
909  		return 0;
910  	/*
911  	 * Spin lock all_tasks to prevent changes...
912  	 */
913  	spin_lock(&clnt->cl_lock);
914  	list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
915  		if (!RPC_IS_ACTIVATED(task))
916  			continue;
917  		if (!fnmatch(task, data))
918  			continue;
919  		rpc_task_try_cancel(task, error);
920  		count++;
921  	}
922  	spin_unlock(&clnt->cl_lock);
923  	return count;
924  }
925  EXPORT_SYMBOL_GPL(rpc_cancel_tasks);
926  
/*
 * Per-transport callback for rpc_clnt_disconnect(): force a disconnect
 * of @xprt if it is currently connected.  @clnt and @dummy are unused.
 * Always returns 0.
 */
static int rpc_clnt_disconnect_xprt(struct rpc_clnt *clnt,
				    struct rpc_xprt *xprt, void *dummy)
{
	if (!xprt_connected(xprt))
		return 0;

	xprt_force_disconnect(xprt);
	return 0;
}
934  
rpc_clnt_disconnect(struct rpc_clnt * clnt)935  void rpc_clnt_disconnect(struct rpc_clnt *clnt)
936  {
937  	rpc_clnt_iterate_for_each_xprt(clnt, rpc_clnt_disconnect_xprt, NULL);
938  }
939  EXPORT_SYMBOL_GPL(rpc_clnt_disconnect);
940  
941  /*
942   * Properly shut down an RPC client, terminating all outstanding
943   * requests.
944   */
rpc_shutdown_client(struct rpc_clnt * clnt)945  void rpc_shutdown_client(struct rpc_clnt *clnt)
946  {
947  	might_sleep();
948  
949  	trace_rpc_clnt_shutdown(clnt);
950  
951  	while (!list_empty(&clnt->cl_tasks)) {
952  		rpc_killall_tasks(clnt);
953  		wait_event_timeout(destroy_wait,
954  			list_empty(&clnt->cl_tasks), 1*HZ);
955  	}
956  
957  	rpc_release_client(clnt);
958  }
959  EXPORT_SYMBOL_GPL(rpc_shutdown_client);
960  
961  /*
962   * Free an RPC client
963   */
rpc_free_client_work(struct work_struct * work)964  static void rpc_free_client_work(struct work_struct *work)
965  {
966  	struct rpc_clnt *clnt = container_of(work, struct rpc_clnt, cl_work);
967  
968  	trace_rpc_clnt_free(clnt);
969  
970  	/* These might block on processes that might allocate memory,
971  	 * so they cannot be called in rpciod, so they are handled separately
972  	 * here.
973  	 */
974  	rpc_sysfs_client_destroy(clnt);
975  	rpc_clnt_debugfs_unregister(clnt);
976  	rpc_free_clid(clnt);
977  	rpc_clnt_remove_pipedir(clnt);
978  	xprt_put(rcu_dereference_raw(clnt->cl_xprt));
979  
980  	kfree(clnt);
981  	rpciod_down();
982  }
983  static struct rpc_clnt *
rpc_free_client(struct rpc_clnt * clnt)984  rpc_free_client(struct rpc_clnt *clnt)
985  {
986  	struct rpc_clnt *parent = NULL;
987  
988  	trace_rpc_clnt_release(clnt);
989  	if (clnt->cl_parent != clnt)
990  		parent = clnt->cl_parent;
991  	rpc_unregister_client(clnt);
992  	rpc_free_iostats(clnt->cl_metrics);
993  	clnt->cl_metrics = NULL;
994  	xprt_iter_destroy(&clnt->cl_xpi);
995  	put_cred(clnt->cl_cred);
996  
997  	INIT_WORK(&clnt->cl_work, rpc_free_client_work);
998  	schedule_work(&clnt->cl_work);
999  	return parent;
1000  }
1001  
1002  /*
1003   * Free an RPC client
1004   */
1005  static struct rpc_clnt *
rpc_free_auth(struct rpc_clnt * clnt)1006  rpc_free_auth(struct rpc_clnt *clnt)
1007  {
1008  	/*
1009  	 * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
1010  	 *       release remaining GSS contexts. This mechanism ensures
1011  	 *       that it can do so safely.
1012  	 */
1013  	if (clnt->cl_auth != NULL) {
1014  		rpcauth_release(clnt->cl_auth);
1015  		clnt->cl_auth = NULL;
1016  	}
1017  	if (refcount_dec_and_test(&clnt->cl_count))
1018  		return rpc_free_client(clnt);
1019  	return NULL;
1020  }
1021  
1022  /*
1023   * Release reference to the RPC client
1024   */
1025  void
rpc_release_client(struct rpc_clnt * clnt)1026  rpc_release_client(struct rpc_clnt *clnt)
1027  {
1028  	do {
1029  		if (list_empty(&clnt->cl_tasks))
1030  			wake_up(&destroy_wait);
1031  		if (refcount_dec_not_one(&clnt->cl_count))
1032  			break;
1033  		clnt = rpc_free_auth(clnt);
1034  	} while (clnt != NULL);
1035  }
1036  EXPORT_SYMBOL_GPL(rpc_release_client);
1037  
1038  /**
1039   * rpc_bind_new_program - bind a new RPC program to an existing client
1040   * @old: old rpc_client
1041   * @program: rpc program to set
1042   * @vers: rpc program version
1043   *
1044   * Clones the rpc client and sets up a new RPC program. This is mainly
1045   * of use for enabling different RPC programs to share the same transport.
1046   * The Sun NFSv2/v3 ACL protocol can do this.
1047   */
rpc_bind_new_program(struct rpc_clnt * old,const struct rpc_program * program,u32 vers)1048  struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
1049  				      const struct rpc_program *program,
1050  				      u32 vers)
1051  {
1052  	struct rpc_create_args args = {
1053  		.program	= program,
1054  		.prognumber	= program->number,
1055  		.version	= vers,
1056  		.authflavor	= old->cl_auth->au_flavor,
1057  		.cred		= old->cl_cred,
1058  		.stats		= old->cl_stats,
1059  		.timeout	= old->cl_timeout,
1060  	};
1061  	struct rpc_clnt *clnt;
1062  	int err;
1063  
1064  	clnt = __rpc_clone_client(&args, old);
1065  	if (IS_ERR(clnt))
1066  		goto out;
1067  	err = rpc_ping(clnt);
1068  	if (err != 0) {
1069  		rpc_shutdown_client(clnt);
1070  		clnt = ERR_PTR(err);
1071  	}
1072  out:
1073  	return clnt;
1074  }
1075  EXPORT_SYMBOL_GPL(rpc_bind_new_program);
1076  
1077  struct rpc_xprt *
rpc_task_get_xprt(struct rpc_clnt * clnt,struct rpc_xprt * xprt)1078  rpc_task_get_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
1079  {
1080  	struct rpc_xprt_switch *xps;
1081  
1082  	if (!xprt)
1083  		return NULL;
1084  	rcu_read_lock();
1085  	xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
1086  	atomic_long_inc(&xps->xps_queuelen);
1087  	rcu_read_unlock();
1088  	atomic_long_inc(&xprt->queuelen);
1089  
1090  	return xprt;
1091  }
1092  
1093  static void
rpc_task_release_xprt(struct rpc_clnt * clnt,struct rpc_xprt * xprt)1094  rpc_task_release_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
1095  {
1096  	struct rpc_xprt_switch *xps;
1097  
1098  	atomic_long_dec(&xprt->queuelen);
1099  	rcu_read_lock();
1100  	xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
1101  	atomic_long_dec(&xps->xps_queuelen);
1102  	rcu_read_unlock();
1103  
1104  	xprt_put(xprt);
1105  }
1106  
rpc_task_release_transport(struct rpc_task * task)1107  void rpc_task_release_transport(struct rpc_task *task)
1108  {
1109  	struct rpc_xprt *xprt = task->tk_xprt;
1110  
1111  	if (xprt) {
1112  		task->tk_xprt = NULL;
1113  		if (task->tk_client)
1114  			rpc_task_release_xprt(task->tk_client, xprt);
1115  		else
1116  			xprt_put(xprt);
1117  	}
1118  }
1119  EXPORT_SYMBOL_GPL(rpc_task_release_transport);
1120  
rpc_task_release_client(struct rpc_task * task)1121  void rpc_task_release_client(struct rpc_task *task)
1122  {
1123  	struct rpc_clnt *clnt = task->tk_client;
1124  
1125  	rpc_task_release_transport(task);
1126  	if (clnt != NULL) {
1127  		/* Remove from client task list */
1128  		spin_lock(&clnt->cl_lock);
1129  		list_del(&task->tk_task);
1130  		spin_unlock(&clnt->cl_lock);
1131  		task->tk_client = NULL;
1132  
1133  		rpc_release_client(clnt);
1134  	}
1135  }
1136  
1137  static struct rpc_xprt *
rpc_task_get_first_xprt(struct rpc_clnt * clnt)1138  rpc_task_get_first_xprt(struct rpc_clnt *clnt)
1139  {
1140  	struct rpc_xprt *xprt;
1141  
1142  	rcu_read_lock();
1143  	xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
1144  	rcu_read_unlock();
1145  	return rpc_task_get_xprt(clnt, xprt);
1146  }
1147  
1148  static struct rpc_xprt *
rpc_task_get_next_xprt(struct rpc_clnt * clnt)1149  rpc_task_get_next_xprt(struct rpc_clnt *clnt)
1150  {
1151  	return rpc_task_get_xprt(clnt, xprt_iter_get_next(&clnt->cl_xpi));
1152  }
1153  
1154  static
rpc_task_set_transport(struct rpc_task * task,struct rpc_clnt * clnt)1155  void rpc_task_set_transport(struct rpc_task *task, struct rpc_clnt *clnt)
1156  {
1157  	if (task->tk_xprt) {
1158  		if (!(test_bit(XPRT_OFFLINE, &task->tk_xprt->state) &&
1159  		      (task->tk_flags & RPC_TASK_MOVEABLE)))
1160  			return;
1161  		xprt_release(task);
1162  		xprt_put(task->tk_xprt);
1163  	}
1164  	if (task->tk_flags & RPC_TASK_NO_ROUND_ROBIN)
1165  		task->tk_xprt = rpc_task_get_first_xprt(clnt);
1166  	else
1167  		task->tk_xprt = rpc_task_get_next_xprt(clnt);
1168  }
1169  
1170  static
rpc_task_set_client(struct rpc_task * task,struct rpc_clnt * clnt)1171  void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
1172  {
1173  	rpc_task_set_transport(task, clnt);
1174  	task->tk_client = clnt;
1175  	refcount_inc(&clnt->cl_count);
1176  	if (clnt->cl_softrtry)
1177  		task->tk_flags |= RPC_TASK_SOFT;
1178  	if (clnt->cl_softerr)
1179  		task->tk_flags |= RPC_TASK_TIMEOUT;
1180  	if (clnt->cl_noretranstimeo)
1181  		task->tk_flags |= RPC_TASK_NO_RETRANS_TIMEOUT;
1182  	/* Add to the client's list of all tasks */
1183  	spin_lock(&clnt->cl_lock);
1184  	list_add_tail(&task->tk_task, &clnt->cl_tasks);
1185  	spin_unlock(&clnt->cl_lock);
1186  }
1187  
1188  static void
rpc_task_set_rpc_message(struct rpc_task * task,const struct rpc_message * msg)1189  rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
1190  {
1191  	if (msg != NULL) {
1192  		task->tk_msg.rpc_proc = msg->rpc_proc;
1193  		task->tk_msg.rpc_argp = msg->rpc_argp;
1194  		task->tk_msg.rpc_resp = msg->rpc_resp;
1195  		task->tk_msg.rpc_cred = msg->rpc_cred;
1196  		if (!(task->tk_flags & RPC_TASK_CRED_NOREF))
1197  			get_cred(task->tk_msg.rpc_cred);
1198  	}
1199  }
1200  
1201  /*
1202   * Default callback for async RPC calls
1203   */
1204  static void
rpc_default_callback(struct rpc_task * task,void * data)1205  rpc_default_callback(struct rpc_task *task, void *data)
1206  {
1207  }
1208  
1209  static const struct rpc_call_ops rpc_default_ops = {
1210  	.rpc_call_done = rpc_default_callback,
1211  };
1212  
1213  /**
1214   * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
1215   * @task_setup_data: pointer to task initialisation data
1216   */
rpc_run_task(const struct rpc_task_setup * task_setup_data)1217  struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
1218  {
1219  	struct rpc_task *task;
1220  
1221  	task = rpc_new_task(task_setup_data);
1222  	if (IS_ERR(task))
1223  		return task;
1224  
1225  	if (!RPC_IS_ASYNC(task))
1226  		task->tk_flags |= RPC_TASK_CRED_NOREF;
1227  
1228  	rpc_task_set_client(task, task_setup_data->rpc_client);
1229  	rpc_task_set_rpc_message(task, task_setup_data->rpc_message);
1230  
1231  	if (task->tk_action == NULL)
1232  		rpc_call_start(task);
1233  
1234  	atomic_inc(&task->tk_count);
1235  	rpc_execute(task);
1236  	return task;
1237  }
1238  EXPORT_SYMBOL_GPL(rpc_run_task);
1239  
1240  /**
1241   * rpc_call_sync - Perform a synchronous RPC call
1242   * @clnt: pointer to RPC client
1243   * @msg: RPC call parameters
1244   * @flags: RPC call flags
1245   */
rpc_call_sync(struct rpc_clnt * clnt,const struct rpc_message * msg,int flags)1246  int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
1247  {
1248  	struct rpc_task	*task;
1249  	struct rpc_task_setup task_setup_data = {
1250  		.rpc_client = clnt,
1251  		.rpc_message = msg,
1252  		.callback_ops = &rpc_default_ops,
1253  		.flags = flags,
1254  	};
1255  	int status;
1256  
1257  	WARN_ON_ONCE(flags & RPC_TASK_ASYNC);
1258  	if (flags & RPC_TASK_ASYNC) {
1259  		rpc_release_calldata(task_setup_data.callback_ops,
1260  			task_setup_data.callback_data);
1261  		return -EINVAL;
1262  	}
1263  
1264  	task = rpc_run_task(&task_setup_data);
1265  	if (IS_ERR(task))
1266  		return PTR_ERR(task);
1267  	status = task->tk_status;
1268  	rpc_put_task(task);
1269  	return status;
1270  }
1271  EXPORT_SYMBOL_GPL(rpc_call_sync);
1272  
1273  /**
1274   * rpc_call_async - Perform an asynchronous RPC call
1275   * @clnt: pointer to RPC client
1276   * @msg: RPC call parameters
1277   * @flags: RPC call flags
1278   * @tk_ops: RPC call ops
1279   * @data: user call data
1280   */
1281  int
rpc_call_async(struct rpc_clnt * clnt,const struct rpc_message * msg,int flags,const struct rpc_call_ops * tk_ops,void * data)1282  rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
1283  	       const struct rpc_call_ops *tk_ops, void *data)
1284  {
1285  	struct rpc_task	*task;
1286  	struct rpc_task_setup task_setup_data = {
1287  		.rpc_client = clnt,
1288  		.rpc_message = msg,
1289  		.callback_ops = tk_ops,
1290  		.callback_data = data,
1291  		.flags = flags|RPC_TASK_ASYNC,
1292  	};
1293  
1294  	task = rpc_run_task(&task_setup_data);
1295  	if (IS_ERR(task))
1296  		return PTR_ERR(task);
1297  	rpc_put_task(task);
1298  	return 0;
1299  }
1300  EXPORT_SYMBOL_GPL(rpc_call_async);
1301  
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
static void call_bc_encode(struct rpc_task *task);

/**
 * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
 * rpc_execute against it
 * @req: RPC request
 *
 * If the task cannot be allocated, @req is freed with
 * xprt_free_bc_request() and the ERR_PTR from rpc_new_task() is
 * returned.  Otherwise the task is started at call_bc_encode and
 * returned with its tk_count incremented.
 */
struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req)
{
	struct rpc_task_setup bc_setup = {
		.callback_ops = &rpc_default_ops,
		.flags = RPC_TASK_SOFTCONN |
			RPC_TASK_NO_RETRANS_TIMEOUT,
	};
	struct rpc_task *task;

	dprintk("RPC: rpc_run_bc_task req= %p\n", req);

	/* Create an rpc_task to send the data */
	task = rpc_new_task(&bc_setup);
	if (IS_ERR(task)) {
		xprt_free_bc_request(req);
		return task;
	}

	xprt_init_bc_request(req, task);

	task->tk_action = call_bc_encode;
	atomic_inc(&task->tk_count);
	/* A freshly created task plus our extra reference */
	WARN_ON_ONCE(atomic_read(&task->tk_count) != 2);
	rpc_execute(task);

	dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
	return task;
}
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
1340  
1341  /**
1342   * rpc_prepare_reply_pages - Prepare to receive a reply data payload into pages
1343   * @req: RPC request to prepare
1344   * @pages: vector of struct page pointers
1345   * @base: offset in first page where receive should start, in bytes
1346   * @len: expected size of the upper layer data payload, in bytes
1347   * @hdrsize: expected size of upper layer reply header, in XDR words
1348   *
1349   */
rpc_prepare_reply_pages(struct rpc_rqst * req,struct page ** pages,unsigned int base,unsigned int len,unsigned int hdrsize)1350  void rpc_prepare_reply_pages(struct rpc_rqst *req, struct page **pages,
1351  			     unsigned int base, unsigned int len,
1352  			     unsigned int hdrsize)
1353  {
1354  	hdrsize += RPC_REPHDRSIZE + req->rq_cred->cr_auth->au_ralign;
1355  
1356  	xdr_inline_pages(&req->rq_rcv_buf, hdrsize << 2, pages, base, len);
1357  	trace_rpc_xdr_reply_pages(req->rq_task, &req->rq_rcv_buf);
1358  }
1359  EXPORT_SYMBOL_GPL(rpc_prepare_reply_pages);
1360  
1361  void
rpc_call_start(struct rpc_task * task)1362  rpc_call_start(struct rpc_task *task)
1363  {
1364  	task->tk_action = call_start;
1365  }
1366  EXPORT_SYMBOL_GPL(rpc_call_start);
1367  
1368  /**
1369   * rpc_peeraddr - extract remote peer address from clnt's xprt
1370   * @clnt: RPC client structure
1371   * @buf: target buffer
1372   * @bufsize: length of target buffer
1373   *
1374   * Returns the number of bytes that are actually in the stored address.
1375   */
rpc_peeraddr(struct rpc_clnt * clnt,struct sockaddr * buf,size_t bufsize)1376  size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
1377  {
1378  	size_t bytes;
1379  	struct rpc_xprt *xprt;
1380  
1381  	rcu_read_lock();
1382  	xprt = rcu_dereference(clnt->cl_xprt);
1383  
1384  	bytes = xprt->addrlen;
1385  	if (bytes > bufsize)
1386  		bytes = bufsize;
1387  	memcpy(buf, &xprt->addr, bytes);
1388  	rcu_read_unlock();
1389  
1390  	return bytes;
1391  }
1392  EXPORT_SYMBOL_GPL(rpc_peeraddr);
1393  
1394  /**
1395   * rpc_peeraddr2str - return remote peer address in printable format
1396   * @clnt: RPC client structure
1397   * @format: address format
1398   *
1399   * NB: the lifetime of the memory referenced by the returned pointer is
1400   * the same as the rpc_xprt itself.  As long as the caller uses this
1401   * pointer, it must hold the RCU read lock.
1402   */
rpc_peeraddr2str(struct rpc_clnt * clnt,enum rpc_display_format_t format)1403  const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
1404  			     enum rpc_display_format_t format)
1405  {
1406  	struct rpc_xprt *xprt;
1407  
1408  	xprt = rcu_dereference(clnt->cl_xprt);
1409  
1410  	if (xprt->address_strings[format] != NULL)
1411  		return xprt->address_strings[format];
1412  	else
1413  		return "unprintable";
1414  }
1415  EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
1416  
1417  static const struct sockaddr_in rpc_inaddr_loopback = {
1418  	.sin_family		= AF_INET,
1419  	.sin_addr.s_addr	= htonl(INADDR_ANY),
1420  };
1421  
1422  static const struct sockaddr_in6 rpc_in6addr_loopback = {
1423  	.sin6_family		= AF_INET6,
1424  	.sin6_addr		= IN6ADDR_ANY_INIT,
1425  };
1426  
1427  /*
1428   * Try a getsockname() on a connected datagram socket.  Using a
1429   * connected datagram socket prevents leaving a socket in TIME_WAIT.
1430   * This conserves the ephemeral port number space.
1431   *
1432   * Returns zero and fills in "buf" if successful; otherwise, a
1433   * negative errno is returned.
1434   */
rpc_sockname(struct net * net,struct sockaddr * sap,size_t salen,struct sockaddr * buf)1435  static int rpc_sockname(struct net *net, struct sockaddr *sap, size_t salen,
1436  			struct sockaddr *buf)
1437  {
1438  	struct socket *sock;
1439  	int err;
1440  
1441  	err = __sock_create(net, sap->sa_family,
1442  				SOCK_DGRAM, IPPROTO_UDP, &sock, 1);
1443  	if (err < 0) {
1444  		dprintk("RPC:       can't create UDP socket (%d)\n", err);
1445  		goto out;
1446  	}
1447  
1448  	switch (sap->sa_family) {
1449  	case AF_INET:
1450  		err = kernel_bind(sock,
1451  				(struct sockaddr *)&rpc_inaddr_loopback,
1452  				sizeof(rpc_inaddr_loopback));
1453  		break;
1454  	case AF_INET6:
1455  		err = kernel_bind(sock,
1456  				(struct sockaddr *)&rpc_in6addr_loopback,
1457  				sizeof(rpc_in6addr_loopback));
1458  		break;
1459  	default:
1460  		err = -EAFNOSUPPORT;
1461  		goto out_release;
1462  	}
1463  	if (err < 0) {
1464  		dprintk("RPC:       can't bind UDP socket (%d)\n", err);
1465  		goto out_release;
1466  	}
1467  
1468  	err = kernel_connect(sock, sap, salen, 0);
1469  	if (err < 0) {
1470  		dprintk("RPC:       can't connect UDP socket (%d)\n", err);
1471  		goto out_release;
1472  	}
1473  
1474  	err = kernel_getsockname(sock, buf);
1475  	if (err < 0) {
1476  		dprintk("RPC:       getsockname failed (%d)\n", err);
1477  		goto out_release;
1478  	}
1479  
1480  	err = 0;
1481  	if (buf->sa_family == AF_INET6) {
1482  		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)buf;
1483  		sin6->sin6_scope_id = 0;
1484  	}
1485  	dprintk("RPC:       %s succeeded\n", __func__);
1486  
1487  out_release:
1488  	sock_release(sock);
1489  out:
1490  	return err;
1491  }
1492  
1493  /*
1494   * Scraping a connected socket failed, so we don't have a useable
1495   * local address.  Fallback: generate an address that will prevent
1496   * the server from calling us back.
1497   *
1498   * Returns zero and fills in "buf" if successful; otherwise, a
1499   * negative errno is returned.
1500   */
rpc_anyaddr(int family,struct sockaddr * buf,size_t buflen)1501  static int rpc_anyaddr(int family, struct sockaddr *buf, size_t buflen)
1502  {
1503  	switch (family) {
1504  	case AF_INET:
1505  		if (buflen < sizeof(rpc_inaddr_loopback))
1506  			return -EINVAL;
1507  		memcpy(buf, &rpc_inaddr_loopback,
1508  				sizeof(rpc_inaddr_loopback));
1509  		break;
1510  	case AF_INET6:
1511  		if (buflen < sizeof(rpc_in6addr_loopback))
1512  			return -EINVAL;
1513  		memcpy(buf, &rpc_in6addr_loopback,
1514  				sizeof(rpc_in6addr_loopback));
1515  		break;
1516  	default:
1517  		dprintk("RPC:       %s: address family not supported\n",
1518  			__func__);
1519  		return -EAFNOSUPPORT;
1520  	}
1521  	dprintk("RPC:       %s: succeeded\n", __func__);
1522  	return 0;
1523  }
1524  
1525  /**
1526   * rpc_localaddr - discover local endpoint address for an RPC client
1527   * @clnt: RPC client structure
1528   * @buf: target buffer
1529   * @buflen: size of target buffer, in bytes
1530   *
1531   * Returns zero and fills in "buf" and "buflen" if successful;
1532   * otherwise, a negative errno is returned.
1533   *
1534   * This works even if the underlying transport is not currently connected,
1535   * or if the upper layer never previously provided a source address.
1536   *
1537   * The result of this function call is transient: multiple calls in
1538   * succession may give different results, depending on how local
1539   * networking configuration changes over time.
1540   */
rpc_localaddr(struct rpc_clnt * clnt,struct sockaddr * buf,size_t buflen)1541  int rpc_localaddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t buflen)
1542  {
1543  	struct sockaddr_storage address;
1544  	struct sockaddr *sap = (struct sockaddr *)&address;
1545  	struct rpc_xprt *xprt;
1546  	struct net *net;
1547  	size_t salen;
1548  	int err;
1549  
1550  	rcu_read_lock();
1551  	xprt = rcu_dereference(clnt->cl_xprt);
1552  	salen = xprt->addrlen;
1553  	memcpy(sap, &xprt->addr, salen);
1554  	net = get_net(xprt->xprt_net);
1555  	rcu_read_unlock();
1556  
1557  	rpc_set_port(sap, 0);
1558  	err = rpc_sockname(net, sap, salen, buf);
1559  	put_net(net);
1560  	if (err != 0)
1561  		/* Couldn't discover local address, return ANYADDR */
1562  		return rpc_anyaddr(sap->sa_family, buf, buflen);
1563  	return 0;
1564  }
1565  EXPORT_SYMBOL_GPL(rpc_localaddr);
1566  
1567  void
rpc_setbufsize(struct rpc_clnt * clnt,unsigned int sndsize,unsigned int rcvsize)1568  rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
1569  {
1570  	struct rpc_xprt *xprt;
1571  
1572  	rcu_read_lock();
1573  	xprt = rcu_dereference(clnt->cl_xprt);
1574  	if (xprt->ops->set_buffer_size)
1575  		xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
1576  	rcu_read_unlock();
1577  }
1578  EXPORT_SYMBOL_GPL(rpc_setbufsize);
1579  
1580  /**
1581   * rpc_net_ns - Get the network namespace for this RPC client
1582   * @clnt: RPC client to query
1583   *
1584   */
rpc_net_ns(struct rpc_clnt * clnt)1585  struct net *rpc_net_ns(struct rpc_clnt *clnt)
1586  {
1587  	struct net *ret;
1588  
1589  	rcu_read_lock();
1590  	ret = rcu_dereference(clnt->cl_xprt)->xprt_net;
1591  	rcu_read_unlock();
1592  	return ret;
1593  }
1594  EXPORT_SYMBOL_GPL(rpc_net_ns);
1595  
1596  /**
1597   * rpc_max_payload - Get maximum payload size for a transport, in bytes
1598   * @clnt: RPC client to query
1599   *
1600   * For stream transports, this is one RPC record fragment (see RFC
1601   * 1831), as we don't support multi-record requests yet.  For datagram
1602   * transports, this is the size of an IP packet minus the IP, UDP, and
1603   * RPC header sizes.
1604   */
rpc_max_payload(struct rpc_clnt * clnt)1605  size_t rpc_max_payload(struct rpc_clnt *clnt)
1606  {
1607  	size_t ret;
1608  
1609  	rcu_read_lock();
1610  	ret = rcu_dereference(clnt->cl_xprt)->max_payload;
1611  	rcu_read_unlock();
1612  	return ret;
1613  }
1614  EXPORT_SYMBOL_GPL(rpc_max_payload);
1615  
1616  /**
1617   * rpc_max_bc_payload - Get maximum backchannel payload size, in bytes
1618   * @clnt: RPC client to query
1619   */
rpc_max_bc_payload(struct rpc_clnt * clnt)1620  size_t rpc_max_bc_payload(struct rpc_clnt *clnt)
1621  {
1622  	struct rpc_xprt *xprt;
1623  	size_t ret;
1624  
1625  	rcu_read_lock();
1626  	xprt = rcu_dereference(clnt->cl_xprt);
1627  	ret = xprt->ops->bc_maxpayload(xprt);
1628  	rcu_read_unlock();
1629  	return ret;
1630  }
1631  EXPORT_SYMBOL_GPL(rpc_max_bc_payload);
1632  
rpc_num_bc_slots(struct rpc_clnt * clnt)1633  unsigned int rpc_num_bc_slots(struct rpc_clnt *clnt)
1634  {
1635  	struct rpc_xprt *xprt;
1636  	unsigned int ret;
1637  
1638  	rcu_read_lock();
1639  	xprt = rcu_dereference(clnt->cl_xprt);
1640  	ret = xprt->ops->bc_num_slots(xprt);
1641  	rcu_read_unlock();
1642  	return ret;
1643  }
1644  EXPORT_SYMBOL_GPL(rpc_num_bc_slots);
1645  
1646  /**
1647   * rpc_force_rebind - force transport to check that remote port is unchanged
1648   * @clnt: client to rebind
1649   *
1650   */
rpc_force_rebind(struct rpc_clnt * clnt)1651  void rpc_force_rebind(struct rpc_clnt *clnt)
1652  {
1653  	if (clnt->cl_autobind) {
1654  		rcu_read_lock();
1655  		xprt_clear_bound(rcu_dereference(clnt->cl_xprt));
1656  		rcu_read_unlock();
1657  	}
1658  }
1659  EXPORT_SYMBOL_GPL(rpc_force_rebind);
1660  
1661  static int
__rpc_restart_call(struct rpc_task * task,void (* action)(struct rpc_task *))1662  __rpc_restart_call(struct rpc_task *task, void (*action)(struct rpc_task *))
1663  {
1664  	task->tk_status = 0;
1665  	task->tk_rpc_status = 0;
1666  	task->tk_action = action;
1667  	return 1;
1668  }
1669  
1670  /*
1671   * Restart an (async) RPC call. Usually called from within the
1672   * exit handler.
1673   */
1674  int
rpc_restart_call(struct rpc_task * task)1675  rpc_restart_call(struct rpc_task *task)
1676  {
1677  	return __rpc_restart_call(task, call_start);
1678  }
1679  EXPORT_SYMBOL_GPL(rpc_restart_call);
1680  
1681  /*
1682   * Restart an (async) RPC call from the call_prepare state.
1683   * Usually called from within the exit handler.
1684   */
1685  int
rpc_restart_call_prepare(struct rpc_task * task)1686  rpc_restart_call_prepare(struct rpc_task *task)
1687  {
1688  	if (task->tk_ops->rpc_call_prepare != NULL)
1689  		return __rpc_restart_call(task, rpc_prepare_task);
1690  	return rpc_restart_call(task);
1691  }
1692  EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
1693  
1694  const char
rpc_proc_name(const struct rpc_task * task)1695  *rpc_proc_name(const struct rpc_task *task)
1696  {
1697  	const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1698  
1699  	if (proc) {
1700  		if (proc->p_name)
1701  			return proc->p_name;
1702  		else
1703  			return "NULL";
1704  	} else
1705  		return "no proc";
1706  }
1707  
/*
 * Terminate @task with an error: trace it, record @rpc_status as the
 * task's RPC status and exit the task with @tk_status.
 */
static void
__rpc_call_rpcerror(struct rpc_task *task, int tk_status, int rpc_status)
{
	trace_rpc_call_rpcerror(task, tk_status, rpc_status);
	rpc_task_set_rpc_status(task, rpc_status);
	rpc_exit(task, tk_status);
}
1715  
/*
 * Terminate @task using @status as both the task status and the
 * RPC status.
 */
static void
rpc_call_rpcerror(struct rpc_task *task, int status)
{
	__rpc_call_rpcerror(task, status, status);
}
1721  
1722  /*
1723   * 0.  Initial state
1724   *
1725   *     Other FSM states can be visited zero or more times, but
1726   *     this state is visited exactly once for each RPC.
1727   */
1728  static void
call_start(struct rpc_task * task)1729  call_start(struct rpc_task *task)
1730  {
1731  	struct rpc_clnt	*clnt = task->tk_client;
1732  	int idx = task->tk_msg.rpc_proc->p_statidx;
1733  
1734  	trace_rpc_request(task);
1735  
1736  	if (task->tk_client->cl_shutdown) {
1737  		rpc_call_rpcerror(task, -EIO);
1738  		return;
1739  	}
1740  
1741  	/* Increment call count (version might not be valid for ping) */
1742  	if (clnt->cl_program->version[clnt->cl_vers])
1743  		clnt->cl_program->version[clnt->cl_vers]->counts[idx]++;
1744  	clnt->cl_stats->rpccnt++;
1745  	task->tk_action = call_reserve;
1746  	rpc_task_set_transport(task, clnt);
1747  }
1748  
1749  /*
1750   * 1.	Reserve an RPC call slot
1751   */
1752  static void
call_reserve(struct rpc_task * task)1753  call_reserve(struct rpc_task *task)
1754  {
1755  	task->tk_status  = 0;
1756  	task->tk_action  = call_reserveresult;
1757  	xprt_reserve(task);
1758  }
1759  
1760  static void call_retry_reserve(struct rpc_task *task);
1761  
1762  /*
1763   * 1b.	Grok the result of xprt_reserve()
1764   */
1765  static void
call_reserveresult(struct rpc_task * task)1766  call_reserveresult(struct rpc_task *task)
1767  {
1768  	int status = task->tk_status;
1769  
1770  	/*
1771  	 * After a call to xprt_reserve(), we must have either
1772  	 * a request slot or else an error status.
1773  	 */
1774  	task->tk_status = 0;
1775  	if (status >= 0) {
1776  		if (task->tk_rqstp) {
1777  			task->tk_action = call_refresh;
1778  			return;
1779  		}
1780  
1781  		rpc_call_rpcerror(task, -EIO);
1782  		return;
1783  	}
1784  
1785  	switch (status) {
1786  	case -ENOMEM:
1787  		rpc_delay(task, HZ >> 2);
1788  		fallthrough;
1789  	case -EAGAIN:	/* woken up; retry */
1790  		task->tk_action = call_retry_reserve;
1791  		return;
1792  	default:
1793  		rpc_call_rpcerror(task, status);
1794  	}
1795  }
1796  
1797  /*
1798   * 1c.	Retry reserving an RPC call slot
1799   */
1800  static void
call_retry_reserve(struct rpc_task * task)1801  call_retry_reserve(struct rpc_task *task)
1802  {
1803  	task->tk_status  = 0;
1804  	task->tk_action  = call_reserveresult;
1805  	xprt_retry_reserve(task);
1806  }
1807  
1808  /*
1809   * 2.	Bind and/or refresh the credentials
1810   */
1811  static void
call_refresh(struct rpc_task * task)1812  call_refresh(struct rpc_task *task)
1813  {
1814  	task->tk_action = call_refreshresult;
1815  	task->tk_status = 0;
1816  	task->tk_client->cl_stats->rpcauthrefresh++;
1817  	rpcauth_refreshcred(task);
1818  }
1819  
1820  /*
1821   * 2a.	Process the results of a credential refresh
1822   */
1823  static void
call_refreshresult(struct rpc_task * task)1824  call_refreshresult(struct rpc_task *task)
1825  {
1826  	int status = task->tk_status;
1827  
1828  	task->tk_status = 0;
1829  	task->tk_action = call_refresh;
1830  	switch (status) {
1831  	case 0:
1832  		if (rpcauth_uptodatecred(task)) {
1833  			task->tk_action = call_allocate;
1834  			return;
1835  		}
1836  		/* Use rate-limiting and a max number of retries if refresh
1837  		 * had status 0 but failed to update the cred.
1838  		 */
1839  		fallthrough;
1840  	case -ETIMEDOUT:
1841  		rpc_delay(task, 3*HZ);
1842  		fallthrough;
1843  	case -EAGAIN:
1844  		status = -EACCES;
1845  		fallthrough;
1846  	case -EKEYEXPIRED:
1847  		if (!task->tk_cred_retry)
1848  			break;
1849  		task->tk_cred_retry--;
1850  		trace_rpc_retry_refresh_status(task);
1851  		return;
1852  	case -ENOMEM:
1853  		rpc_delay(task, HZ >> 4);
1854  		return;
1855  	}
1856  	trace_rpc_refresh_status(task);
1857  	rpc_call_rpcerror(task, status);
1858  }
1859  
1860  /*
1861   * 2b.	Allocate the buffer. For details, see sched.c:rpc_malloc.
1862   *	(Note: buffer memory is freed in xprt_release).
1863   */
1864  static void
call_allocate(struct rpc_task * task)1865  call_allocate(struct rpc_task *task)
1866  {
1867  	const struct rpc_auth *auth = task->tk_rqstp->rq_cred->cr_auth;
1868  	struct rpc_rqst *req = task->tk_rqstp;
1869  	struct rpc_xprt *xprt = req->rq_xprt;
1870  	const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1871  	int status;
1872  
1873  	task->tk_status = 0;
1874  	task->tk_action = call_encode;
1875  
1876  	if (req->rq_buffer)
1877  		return;
1878  
1879  	if (proc->p_proc != 0) {
1880  		BUG_ON(proc->p_arglen == 0);
1881  		if (proc->p_decode != NULL)
1882  			BUG_ON(proc->p_replen == 0);
1883  	}
1884  
1885  	/*
1886  	 * Calculate the size (in quads) of the RPC call
1887  	 * and reply headers, and convert both values
1888  	 * to byte sizes.
1889  	 */
1890  	req->rq_callsize = RPC_CALLHDRSIZE + (auth->au_cslack << 1) +
1891  			   proc->p_arglen;
1892  	req->rq_callsize <<= 2;
1893  	/*
1894  	 * Note: the reply buffer must at minimum allocate enough space
1895  	 * for the 'struct accepted_reply' from RFC5531.
1896  	 */
1897  	req->rq_rcvsize = RPC_REPHDRSIZE + auth->au_rslack + \
1898  			max_t(size_t, proc->p_replen, 2);
1899  	req->rq_rcvsize <<= 2;
1900  
1901  	status = xprt->ops->buf_alloc(task);
1902  	trace_rpc_buf_alloc(task, status);
1903  	if (status == 0)
1904  		return;
1905  	if (status != -ENOMEM) {
1906  		rpc_call_rpcerror(task, status);
1907  		return;
1908  	}
1909  
1910  	if (RPC_IS_ASYNC(task) || !fatal_signal_pending(current)) {
1911  		task->tk_action = call_allocate;
1912  		rpc_delay(task, HZ>>4);
1913  		return;
1914  	}
1915  
1916  	rpc_call_rpcerror(task, -ERESTARTSYS);
1917  }
1918  
1919  static int
rpc_task_need_encode(struct rpc_task * task)1920  rpc_task_need_encode(struct rpc_task *task)
1921  {
1922  	return test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) == 0 &&
1923  		(!(task->tk_flags & RPC_TASK_SENT) ||
1924  		 !(task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) ||
1925  		 xprt_request_need_retransmit(task));
1926  }
1927  
1928  static void
rpc_xdr_encode(struct rpc_task * task)1929  rpc_xdr_encode(struct rpc_task *task)
1930  {
1931  	struct rpc_rqst	*req = task->tk_rqstp;
1932  	struct xdr_stream xdr;
1933  
1934  	xdr_buf_init(&req->rq_snd_buf,
1935  		     req->rq_buffer,
1936  		     req->rq_callsize);
1937  	xdr_buf_init(&req->rq_rcv_buf,
1938  		     req->rq_rbuffer,
1939  		     req->rq_rcvsize);
1940  
1941  	req->rq_reply_bytes_recvd = 0;
1942  	req->rq_snd_buf.head[0].iov_len = 0;
1943  	xdr_init_encode(&xdr, &req->rq_snd_buf,
1944  			req->rq_snd_buf.head[0].iov_base, req);
1945  	if (rpc_encode_header(task, &xdr))
1946  		return;
1947  
1948  	task->tk_status = rpcauth_wrap_req(task, &xdr);
1949  }
1950  
1951  /*
1952   * 3.	Encode arguments of an RPC call
1953   */
1954  static void
call_encode(struct rpc_task * task)1955  call_encode(struct rpc_task *task)
1956  {
1957  	if (!rpc_task_need_encode(task))
1958  		goto out;
1959  
1960  	/* Dequeue task from the receive queue while we're encoding */
1961  	xprt_request_dequeue_xprt(task);
1962  	/* Encode here so that rpcsec_gss can use correct sequence number. */
1963  	rpc_xdr_encode(task);
1964  	/* Add task to reply queue before transmission to avoid races */
1965  	if (task->tk_status == 0 && rpc_reply_expected(task))
1966  		task->tk_status = xprt_request_enqueue_receive(task);
1967  	/* Did the encode result in an error condition? */
1968  	if (task->tk_status != 0) {
1969  		/* Was the error nonfatal? */
1970  		switch (task->tk_status) {
1971  		case -EAGAIN:
1972  		case -ENOMEM:
1973  			rpc_delay(task, HZ >> 4);
1974  			break;
1975  		case -EKEYEXPIRED:
1976  			if (!task->tk_cred_retry) {
1977  				rpc_call_rpcerror(task, task->tk_status);
1978  			} else {
1979  				task->tk_action = call_refresh;
1980  				task->tk_cred_retry--;
1981  				trace_rpc_retry_refresh_status(task);
1982  			}
1983  			break;
1984  		default:
1985  			rpc_call_rpcerror(task, task->tk_status);
1986  		}
1987  		return;
1988  	}
1989  
1990  	xprt_request_enqueue_transmit(task);
1991  out:
1992  	task->tk_action = call_transmit;
1993  	/* Check that the connection is OK */
1994  	if (!xprt_bound(task->tk_xprt))
1995  		task->tk_action = call_bind;
1996  	else if (!xprt_connected(task->tk_xprt))
1997  		task->tk_action = call_connect;
1998  }
1999  
2000  /*
2001   * Helpers to check if the task was already transmitted, and
2002   * to take action when that is the case.
2003   */
2004  static bool
rpc_task_transmitted(struct rpc_task * task)2005  rpc_task_transmitted(struct rpc_task *task)
2006  {
2007  	return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
2008  }
2009  
2010  static void
rpc_task_handle_transmitted(struct rpc_task * task)2011  rpc_task_handle_transmitted(struct rpc_task *task)
2012  {
2013  	xprt_end_transmit(task);
2014  	task->tk_action = call_transmit_status;
2015  }
2016  
2017  /*
2018   * 4.	Get the server port number if not yet set
2019   */
2020  static void
call_bind(struct rpc_task * task)2021  call_bind(struct rpc_task *task)
2022  {
2023  	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
2024  
2025  	if (rpc_task_transmitted(task)) {
2026  		rpc_task_handle_transmitted(task);
2027  		return;
2028  	}
2029  
2030  	if (xprt_bound(xprt)) {
2031  		task->tk_action = call_connect;
2032  		return;
2033  	}
2034  
2035  	task->tk_action = call_bind_status;
2036  	if (!xprt_prepare_transmit(task))
2037  		return;
2038  
2039  	xprt->ops->rpcbind(task);
2040  }
2041  
/*
 * 4a.	Sort out bind result
 *
 * A non-negative tk_status, or a transport that turns out to be bound
 * anyway, moves the task on to call_connect.  Otherwise the error in
 * tk_status selects one of three outcomes:
 *  - retry: clear tk_status, go back to call_bind (optionally after a
 *    delay) and let rpc_check_timeout() account for the attempt;
 *  - fail with the original error (connection errors on SOFTCONN
 *    tasks), with -EOPNOTSUPP (-EACCES on procedure 0, i.e. a ping);
 *  - fail with -EIO (the initial value of @status) for
 *    -EPFNOSUPPORT and unrecognized errors.
 */
static void
call_bind_status(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
	int status = -EIO;

	if (rpc_task_transmitted(task)) {
		rpc_task_handle_transmitted(task);
		return;
	}

	if (task->tk_status >= 0)
		goto out_next;
	if (xprt_bound(xprt)) {
		task->tk_status = 0;
		goto out_next;
	}

	switch (task->tk_status) {
	case -ENOMEM:
		rpc_delay(task, HZ >> 2);
		goto retry_timeout;
	case -EACCES:
		trace_rpcb_prog_unavail_err(task);
		/* fail immediately if this is an RPC ping */
		if (task->tk_msg.rpc_proc->p_proc == 0) {
			status = -EOPNOTSUPP;
			break;
		}
		rpc_delay(task, 3*HZ);
		goto retry_timeout;
	case -ENOBUFS:
		rpc_delay(task, HZ >> 2);
		goto retry_timeout;
	case -EAGAIN:
		goto retry_timeout;
	case -ETIMEDOUT:
		trace_rpcb_timeout_err(task);
		goto retry_timeout;
	case -EPFNOSUPPORT:
		/* server doesn't support any rpcbind version we know of */
		trace_rpcb_bind_version_err(task);
		break;
	case -EPROTONOSUPPORT:
		trace_rpcb_bind_version_err(task);
		goto retry_timeout;
	case -ECONNREFUSED:		/* connection problems */
	case -ECONNRESET:
	case -ECONNABORTED:
	case -ENOTCONN:
	case -EHOSTDOWN:
	case -ENETDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -EPIPE:
		trace_rpcb_unreachable_err(task);
		if (!RPC_IS_SOFTCONN(task)) {
			rpc_delay(task, 5*HZ);
			goto retry_timeout;
		}
		status = task->tk_status;
		break;
	default:
		trace_rpcb_unrecognized_err(task);
	}

	rpc_call_rpcerror(task, status);
	return;
out_next:
	task->tk_action = call_connect;
	return;
retry_timeout:
	task->tk_status = 0;
	task->tk_action = call_bind;
	rpc_check_timeout(task);
}
2121  
2122  /*
2123   * 4b.	Connect to the RPC server
2124   */
2125  static void
call_connect(struct rpc_task * task)2126  call_connect(struct rpc_task *task)
2127  {
2128  	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
2129  
2130  	if (rpc_task_transmitted(task)) {
2131  		rpc_task_handle_transmitted(task);
2132  		return;
2133  	}
2134  
2135  	if (xprt_connected(xprt)) {
2136  		task->tk_action = call_transmit;
2137  		return;
2138  	}
2139  
2140  	task->tk_action = call_connect_status;
2141  	if (task->tk_status < 0)
2142  		return;
2143  	if (task->tk_flags & RPC_TASK_NOCONNECT) {
2144  		rpc_call_rpcerror(task, -ENOTCONN);
2145  		return;
2146  	}
2147  	if (!xprt_prepare_transmit(task))
2148  		return;
2149  	xprt_connect(task);
2150  }
2151  
/*
 * 4c.	Sort out connect result
 *
 * tk_status == 0, or a transport that is connected after all, moves the
 * task on to call_transmit.  Otherwise tk_status is cleared and the
 * saved error decides what happens:
 *  - SOFTCONN tasks fail with the error for the "connection refused /
 *    unreachable" family;
 *  - other tasks retry through call_bind (out_retry), possibly after a
 *    forced rebind, a conditional disconnect and/or a delay;
 *  - a MOVEABLE, round-robin capable task whose transport is marked
 *    XPRT_REMOVE is detached from that transport and restarted at
 *    call_start when the switch holds more than one transport;
 *  - any error not listed fails the call.
 */
static void
call_connect_status(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
	struct rpc_clnt *clnt = task->tk_client;
	int status = task->tk_status;

	if (rpc_task_transmitted(task)) {
		rpc_task_handle_transmitted(task);
		return;
	}

	trace_rpc_connect_status(task);

	if (task->tk_status == 0) {
		clnt->cl_stats->netreconn++;
		goto out_next;
	}
	if (xprt_connected(xprt)) {
		task->tk_status = 0;
		goto out_next;
	}

	task->tk_status = 0;
	switch (status) {
	case -ECONNREFUSED:
	case -ECONNRESET:
		/* A positive refusal suggests a rebind is needed. */
		if (RPC_IS_SOFTCONN(task))
			break;
		if (clnt->cl_autobind) {
			rpc_force_rebind(clnt);
			goto out_retry;
		}
		fallthrough;
	case -ECONNABORTED:
	case -ENETDOWN:
	case -ENETUNREACH:
	case -EHOSTUNREACH:
	case -EPIPE:
	case -EPROTO:
		xprt_conditional_disconnect(task->tk_rqstp->rq_xprt,
					    task->tk_rqstp->rq_connect_cookie);
		if (RPC_IS_SOFTCONN(task))
			break;
		/* retry with existing socket, after a delay */
		rpc_delay(task, 3*HZ);
		fallthrough;
	case -EADDRINUSE:
	case -ENOTCONN:
	case -EAGAIN:
	case -ETIMEDOUT:
		if (!(task->tk_flags & RPC_TASK_NO_ROUND_ROBIN) &&
		    (task->tk_flags & RPC_TASK_MOVEABLE) &&
		    test_bit(XPRT_REMOVE, &xprt->state)) {
			struct rpc_xprt *saved = task->tk_xprt;
			struct rpc_xprt_switch *xps;

			rcu_read_lock();
			xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
			rcu_read_unlock();
			/*
			 * rpc_clnt_add_xprt() treats the result of this same
			 * lookup as possibly NULL, so do not dereference it
			 * blindly here; without a switch we just keep the
			 * current transport and retry.
			 */
			if (xps && xps->xps_nxprts > 1) {
				long value;

				/* Detach the task from the doomed transport */
				xprt_release(task);
				value = atomic_long_dec_return(&xprt->queuelen);
				if (value == 0)
					rpc_xprt_switch_remove_xprt(xps, saved,
								    true);
				xprt_put(saved);
				task->tk_xprt = NULL;
				task->tk_action = call_start;
			}
			xprt_switch_put(xps);
			if (!task->tk_xprt)
				return;
		}
		goto out_retry;
	case -ENOBUFS:
		rpc_delay(task, HZ >> 2);
		goto out_retry;
	}
	rpc_call_rpcerror(task, status);
	return;
out_next:
	task->tk_action = call_transmit;
	return;
out_retry:
	/* Check for timeouts before looping back to call_bind */
	task->tk_action = call_bind;
	rpc_check_timeout(task);
}
2247  
2248  /*
2249   * 5.	Transmit the RPC request, and wait for reply
2250   */
2251  static void
call_transmit(struct rpc_task * task)2252  call_transmit(struct rpc_task *task)
2253  {
2254  	if (rpc_task_transmitted(task)) {
2255  		rpc_task_handle_transmitted(task);
2256  		return;
2257  	}
2258  
2259  	task->tk_action = call_transmit_status;
2260  	if (!xprt_prepare_transmit(task))
2261  		return;
2262  	task->tk_status = 0;
2263  	if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
2264  		if (!xprt_connected(task->tk_xprt)) {
2265  			task->tk_status = -ENOTCONN;
2266  			return;
2267  		}
2268  		xprt_transmit(task);
2269  	}
2270  	xprt_end_transmit(task);
2271  }
2272  
/*
 * 5a.	Handle cleanup after a transmission
 *
 * The default next action is call_status.  A transmitted request
 * clears tk_status and waits for the reply.  Otherwise tk_status
 * selects the next step:
 *  -EBADMSG:			re-encode (call_encode)
 *  -ENOMEM, -ENOBUFS:		delay, then retransmit (call_transmit)
 *  -EBADSLT, -EAGAIN:		retransmit (call_transmit)
 *  host/net down/unreachable,
 *  -EPERM, unlisted errors:	leave tk_status for call_status
 *  -ECONNREFUSED (SOFTCONN):	fail the call immediately
 *  other connection errors:	start over at call_bind
 * Every path except the immediate failure ends in rpc_check_timeout().
 */
static void
call_transmit_status(struct rpc_task *task)
{
	task->tk_action = call_status;

	/*
	 * Common case: success.  Force the compiler to put this
	 * test first.
	 */
	if (rpc_task_transmitted(task)) {
		task->tk_status = 0;
		xprt_request_wait_receive(task);
		return;
	}

	switch (task->tk_status) {
	default:
		break;
	case -EBADMSG:
		task->tk_status = 0;
		task->tk_action = call_encode;
		break;
		/*
		 * Special cases: if we've been waiting on the
		 * socket's write_space() callback, or if the
		 * socket just returned a connection error,
		 * then hold onto the transport lock.
		 */
	case -ENOMEM:
	case -ENOBUFS:
		rpc_delay(task, HZ>>2);
		fallthrough;
	case -EBADSLT:
	case -EAGAIN:
		task->tk_action = call_transmit;
		task->tk_status = 0;
		break;
	case -EHOSTDOWN:
	case -ENETDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -EPERM:
		break;
	case -ECONNREFUSED:
		if (RPC_IS_SOFTCONN(task)) {
			/* Procedure 0 is the NULL call, i.e. a ping */
			if (!task->tk_msg.rpc_proc->p_proc)
				trace_xprt_ping(task->tk_xprt,
						task->tk_status);
			rpc_call_rpcerror(task, task->tk_status);
			return;
		}
		fallthrough;
	case -ECONNRESET:
	case -ECONNABORTED:
	case -EADDRINUSE:
	case -ENOTCONN:
	case -EPIPE:
		task->tk_action = call_bind;
		task->tk_status = 0;
		break;
	}
	rpc_check_timeout(task);
}
2339  
2340  #if defined(CONFIG_SUNRPC_BACKCHANNEL)
2341  static void call_bc_transmit(struct rpc_task *task);
2342  static void call_bc_transmit_status(struct rpc_task *task);
2343  
2344  static void
call_bc_encode(struct rpc_task * task)2345  call_bc_encode(struct rpc_task *task)
2346  {
2347  	xprt_request_enqueue_transmit(task);
2348  	task->tk_action = call_bc_transmit;
2349  }
2350  
2351  /*
2352   * 5b.	Send the backchannel RPC reply.  On error, drop the reply.  In
2353   * addition, disconnect on connectivity errors.
2354   */
2355  static void
call_bc_transmit(struct rpc_task * task)2356  call_bc_transmit(struct rpc_task *task)
2357  {
2358  	task->tk_action = call_bc_transmit_status;
2359  	if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
2360  		if (!xprt_prepare_transmit(task))
2361  			return;
2362  		task->tk_status = 0;
2363  		xprt_transmit(task);
2364  	}
2365  	xprt_end_transmit(task);
2366  }
2367  
/*
 * Sort out the result of sending a backchannel reply.
 *
 * A transmitted reply counts as success.  -ENOMEM/-ENOBUFS (after a
 * delay), -EBADSLT and -EAGAIN loop back to call_bc_transmit.  Every
 * other outcome ends the task via rpc_exit_task; -ETIMEDOUT
 * additionally disconnects the transport and, like unlisted errors,
 * logs a notice.
 */
static void
call_bc_transmit_status(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (rpc_task_transmitted(task))
		task->tk_status = 0;

	switch (task->tk_status) {
	case 0:
		/* Success */
	case -ENETDOWN:
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -ECONNRESET:
	case -ECONNREFUSED:
	case -EADDRINUSE:
	case -ENOTCONN:
	case -EPIPE:
		break;
	case -ENOMEM:
	case -ENOBUFS:
		rpc_delay(task, HZ>>2);
		fallthrough;
	case -EBADSLT:
	case -EAGAIN:
		task->tk_status = 0;
		task->tk_action = call_bc_transmit;
		return;
	case -ETIMEDOUT:
		/*
		 * Problem reaching the server.  Disconnect and let the
		 * forechannel reestablish the connection.  The server will
		 * have to retransmit the backchannel request and we'll
		 * reprocess it.  Since these ops are idempotent, there's no
		 * need to cache our reply at this time.
		 */
		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
			"error: %d\n", task->tk_status);
		xprt_conditional_disconnect(req->rq_xprt,
			req->rq_connect_cookie);
		break;
	default:
		/*
		 * We were unable to reply and will have to drop the
		 * request.  The server should reconnect and retransmit.
		 */
		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
			"error: %d\n", task->tk_status);
		break;
	}
	task->tk_action = rpc_exit_task;
}
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
2423  
/*
 * 6.	Sort out the RPC call status
 *
 * A non-negative tk_status moves the task on to call_decode.  For
 * errors, tk_status is cleared and the saved value decides between
 *  - retrying from call_encode (after an optional delay and/or forced
 *    rebind), with rpc_check_timeout() accounting for the attempt, and
 *  - failing the call with the saved status (out_exit).
 */
static void
call_status(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;
	int		status;

	/* Procedure 0 is the NULL call, i.e. a ping */
	if (!task->tk_msg.rpc_proc->p_proc)
		trace_xprt_ping(task->tk_xprt, task->tk_status);

	status = task->tk_status;
	if (status >= 0) {
		task->tk_action = call_decode;
		return;
	}

	trace_rpc_call_status(task);
	task->tk_status = 0;
	switch(status) {
	case -EHOSTDOWN:
	case -ENETDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -EPERM:
		if (RPC_IS_SOFTCONN(task))
			goto out_exit;
		/*
		 * Delay any retries for 3 seconds, then handle as if it
		 * were a timeout.
		 */
		rpc_delay(task, 3*HZ);
		fallthrough;
	case -ETIMEDOUT:
		break;
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ECONNABORTED:
	case -ENOTCONN:
		rpc_force_rebind(clnt);
		break;
	case -EADDRINUSE:
		rpc_delay(task, 3*HZ);
		fallthrough;
	case -EPIPE:
	case -EAGAIN:
		break;
	case -ENFILE:
	case -ENOBUFS:
	case -ENOMEM:
		rpc_delay(task, HZ>>2);
		break;
	case -EIO:
		/* shutdown or soft timeout */
		goto out_exit;
	default:
		if (clnt->cl_chatty)
			printk("%s: RPC call returned error %d\n",
			       clnt->cl_program->name, -status);
		goto out_exit;
	}
	task->tk_action = call_encode;
	rpc_check_timeout(task);
	return;
out_exit:
	rpc_call_rpcerror(task, status);
}
2492  
2493  static bool
rpc_check_connected(const struct rpc_rqst * req)2494  rpc_check_connected(const struct rpc_rqst *req)
2495  {
2496  	/* No allocated request or transport? return true */
2497  	if (!req || !req->rq_xprt)
2498  		return true;
2499  	return xprt_connected(req->rq_xprt);
2500  }
2501  
2502  static void
rpc_check_timeout(struct rpc_task * task)2503  rpc_check_timeout(struct rpc_task *task)
2504  {
2505  	struct rpc_clnt	*clnt = task->tk_client;
2506  
2507  	if (RPC_SIGNALLED(task))
2508  		return;
2509  
2510  	if (xprt_adjust_timeout(task->tk_rqstp) == 0)
2511  		return;
2512  
2513  	trace_rpc_timeout_status(task);
2514  	task->tk_timeouts++;
2515  
2516  	if (RPC_IS_SOFTCONN(task) && !rpc_check_connected(task->tk_rqstp)) {
2517  		rpc_call_rpcerror(task, -ETIMEDOUT);
2518  		return;
2519  	}
2520  
2521  	if (RPC_IS_SOFT(task)) {
2522  		/*
2523  		 * Once a "no retrans timeout" soft tasks (a.k.a NFSv4) has
2524  		 * been sent, it should time out only if the transport
2525  		 * connection gets terminally broken.
2526  		 */
2527  		if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) &&
2528  		    rpc_check_connected(task->tk_rqstp))
2529  			return;
2530  
2531  		if (clnt->cl_chatty) {
2532  			pr_notice_ratelimited(
2533  				"%s: server %s not responding, timed out\n",
2534  				clnt->cl_program->name,
2535  				task->tk_xprt->servername);
2536  		}
2537  		if (task->tk_flags & RPC_TASK_TIMEOUT)
2538  			rpc_call_rpcerror(task, -ETIMEDOUT);
2539  		else
2540  			__rpc_call_rpcerror(task, -EIO, -ETIMEDOUT);
2541  		return;
2542  	}
2543  
2544  	if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
2545  		task->tk_flags |= RPC_CALL_MAJORSEEN;
2546  		if (clnt->cl_chatty) {
2547  			pr_notice_ratelimited(
2548  				"%s: server %s not responding, still trying\n",
2549  				clnt->cl_program->name,
2550  				task->tk_xprt->servername);
2551  		}
2552  	}
2553  	rpc_force_rebind(clnt);
2554  	/*
2555  	 * Did our request time out due to an RPCSEC_GSS out-of-sequence
2556  	 * event? RFC2203 requires the server to drop all such requests.
2557  	 */
2558  	rpcauth_invalcred(task);
2559  }
2560  
2561  /*
2562   * 7.	Decode the RPC reply
2563   */
2564  static void
call_decode(struct rpc_task * task)2565  call_decode(struct rpc_task *task)
2566  {
2567  	struct rpc_clnt	*clnt = task->tk_client;
2568  	struct rpc_rqst	*req = task->tk_rqstp;
2569  	struct xdr_stream xdr;
2570  	int err;
2571  
2572  	if (!task->tk_msg.rpc_proc->p_decode) {
2573  		task->tk_action = rpc_exit_task;
2574  		return;
2575  	}
2576  
2577  	if (task->tk_flags & RPC_CALL_MAJORSEEN) {
2578  		if (clnt->cl_chatty) {
2579  			pr_notice_ratelimited("%s: server %s OK\n",
2580  				clnt->cl_program->name,
2581  				task->tk_xprt->servername);
2582  		}
2583  		task->tk_flags &= ~RPC_CALL_MAJORSEEN;
2584  	}
2585  
2586  	/*
2587  	 * Did we ever call xprt_complete_rqst()? If not, we should assume
2588  	 * the message is incomplete.
2589  	 */
2590  	err = -EAGAIN;
2591  	if (!req->rq_reply_bytes_recvd)
2592  		goto out;
2593  
2594  	/* Ensure that we see all writes made by xprt_complete_rqst()
2595  	 * before it changed req->rq_reply_bytes_recvd.
2596  	 */
2597  	smp_rmb();
2598  
2599  	req->rq_rcv_buf.len = req->rq_private_buf.len;
2600  	trace_rpc_xdr_recvfrom(task, &req->rq_rcv_buf);
2601  
2602  	/* Check that the softirq receive buffer is valid */
2603  	WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
2604  				sizeof(req->rq_rcv_buf)) != 0);
2605  
2606  	xdr_init_decode(&xdr, &req->rq_rcv_buf,
2607  			req->rq_rcv_buf.head[0].iov_base, req);
2608  	err = rpc_decode_header(task, &xdr);
2609  out:
2610  	switch (err) {
2611  	case 0:
2612  		task->tk_action = rpc_exit_task;
2613  		task->tk_status = rpcauth_unwrap_resp(task, &xdr);
2614  		xdr_finish_decode(&xdr);
2615  		return;
2616  	case -EAGAIN:
2617  		task->tk_status = 0;
2618  		if (task->tk_client->cl_discrtry)
2619  			xprt_conditional_disconnect(req->rq_xprt,
2620  						    req->rq_connect_cookie);
2621  		task->tk_action = call_encode;
2622  		rpc_check_timeout(task);
2623  		break;
2624  	case -EKEYREJECTED:
2625  		task->tk_action = call_reserve;
2626  		rpc_check_timeout(task);
2627  		rpcauth_invalcred(task);
2628  		/* Ensure we obtain a new XID if we retry! */
2629  		xprt_release(task);
2630  	}
2631  }
2632  
2633  static int
rpc_encode_header(struct rpc_task * task,struct xdr_stream * xdr)2634  rpc_encode_header(struct rpc_task *task, struct xdr_stream *xdr)
2635  {
2636  	struct rpc_clnt *clnt = task->tk_client;
2637  	struct rpc_rqst	*req = task->tk_rqstp;
2638  	__be32 *p;
2639  	int error;
2640  
2641  	error = -EMSGSIZE;
2642  	p = xdr_reserve_space(xdr, RPC_CALLHDRSIZE << 2);
2643  	if (!p)
2644  		goto out_fail;
2645  	*p++ = req->rq_xid;
2646  	*p++ = rpc_call;
2647  	*p++ = cpu_to_be32(RPC_VERSION);
2648  	*p++ = cpu_to_be32(clnt->cl_prog);
2649  	*p++ = cpu_to_be32(clnt->cl_vers);
2650  	*p   = cpu_to_be32(task->tk_msg.rpc_proc->p_proc);
2651  
2652  	error = rpcauth_marshcred(task, xdr);
2653  	if (error < 0)
2654  		goto out_fail;
2655  	return 0;
2656  out_fail:
2657  	trace_rpc_bad_callhdr(task);
2658  	rpc_call_rpcerror(task, error);
2659  	return error;
2660  }
2661  
2662  static noinline int
rpc_decode_header(struct rpc_task * task,struct xdr_stream * xdr)2663  rpc_decode_header(struct rpc_task *task, struct xdr_stream *xdr)
2664  {
2665  	struct rpc_clnt *clnt = task->tk_client;
2666  	int error;
2667  	__be32 *p;
2668  
2669  	/* RFC-1014 says that the representation of XDR data must be a
2670  	 * multiple of four bytes
2671  	 * - if it isn't pointer subtraction in the NFS client may give
2672  	 *   undefined results
2673  	 */
2674  	if (task->tk_rqstp->rq_rcv_buf.len & 3)
2675  		goto out_unparsable;
2676  
2677  	p = xdr_inline_decode(xdr, 3 * sizeof(*p));
2678  	if (!p)
2679  		goto out_unparsable;
2680  	p++;	/* skip XID */
2681  	if (*p++ != rpc_reply)
2682  		goto out_unparsable;
2683  	if (*p++ != rpc_msg_accepted)
2684  		goto out_msg_denied;
2685  
2686  	error = rpcauth_checkverf(task, xdr);
2687  	if (error)
2688  		goto out_verifier;
2689  
2690  	p = xdr_inline_decode(xdr, sizeof(*p));
2691  	if (!p)
2692  		goto out_unparsable;
2693  	switch (*p) {
2694  	case rpc_success:
2695  		return 0;
2696  	case rpc_prog_unavail:
2697  		trace_rpc__prog_unavail(task);
2698  		error = -EPFNOSUPPORT;
2699  		goto out_err;
2700  	case rpc_prog_mismatch:
2701  		trace_rpc__prog_mismatch(task);
2702  		error = -EPROTONOSUPPORT;
2703  		goto out_err;
2704  	case rpc_proc_unavail:
2705  		trace_rpc__proc_unavail(task);
2706  		error = -EOPNOTSUPP;
2707  		goto out_err;
2708  	case rpc_garbage_args:
2709  	case rpc_system_err:
2710  		trace_rpc__garbage_args(task);
2711  		error = -EIO;
2712  		break;
2713  	default:
2714  		goto out_unparsable;
2715  	}
2716  
2717  out_garbage:
2718  	clnt->cl_stats->rpcgarbage++;
2719  	if (task->tk_garb_retry) {
2720  		task->tk_garb_retry--;
2721  		task->tk_action = call_encode;
2722  		return -EAGAIN;
2723  	}
2724  out_err:
2725  	rpc_call_rpcerror(task, error);
2726  	return error;
2727  
2728  out_unparsable:
2729  	trace_rpc__unparsable(task);
2730  	error = -EIO;
2731  	goto out_garbage;
2732  
2733  out_verifier:
2734  	trace_rpc_bad_verifier(task);
2735  	switch (error) {
2736  	case -EPROTONOSUPPORT:
2737  		goto out_err;
2738  	case -EACCES:
2739  		/* Re-encode with a fresh cred */
2740  		fallthrough;
2741  	default:
2742  		goto out_garbage;
2743  	}
2744  
2745  out_msg_denied:
2746  	error = -EACCES;
2747  	p = xdr_inline_decode(xdr, sizeof(*p));
2748  	if (!p)
2749  		goto out_unparsable;
2750  	switch (*p++) {
2751  	case rpc_auth_error:
2752  		break;
2753  	case rpc_mismatch:
2754  		trace_rpc__mismatch(task);
2755  		error = -EPROTONOSUPPORT;
2756  		goto out_err;
2757  	default:
2758  		goto out_unparsable;
2759  	}
2760  
2761  	p = xdr_inline_decode(xdr, sizeof(*p));
2762  	if (!p)
2763  		goto out_unparsable;
2764  	switch (*p++) {
2765  	case rpc_autherr_rejectedcred:
2766  	case rpc_autherr_rejectedverf:
2767  	case rpcsec_gsserr_credproblem:
2768  	case rpcsec_gsserr_ctxproblem:
2769  		rpcauth_invalcred(task);
2770  		if (!task->tk_cred_retry)
2771  			break;
2772  		task->tk_cred_retry--;
2773  		trace_rpc__stale_creds(task);
2774  		return -EKEYREJECTED;
2775  	case rpc_autherr_badcred:
2776  	case rpc_autherr_badverf:
2777  		/* possibly garbled cred/verf? */
2778  		if (!task->tk_garb_retry)
2779  			break;
2780  		task->tk_garb_retry--;
2781  		trace_rpc__bad_creds(task);
2782  		task->tk_action = call_encode;
2783  		return -EAGAIN;
2784  	case rpc_autherr_tooweak:
2785  		trace_rpc__auth_tooweak(task);
2786  		pr_warn("RPC: server %s requires stronger authentication.\n",
2787  			task->tk_xprt->servername);
2788  		break;
2789  	default:
2790  		goto out_unparsable;
2791  	}
2792  	goto out_err;
2793  }
2794  
/* Argument encoder for the NULL procedure: there is nothing to encode. */
static void rpcproc_encode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
		const void *obj)
{
}
2799  
/*
 * Result decoder for the NULL procedure: there is nothing to decode,
 * so it always reports success (0).
 */
static int rpcproc_decode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
		void *obj)
{
	return 0;
}
2805  
2806  static const struct rpc_procinfo rpcproc_null = {
2807  	.p_encode = rpcproc_encode_null,
2808  	.p_decode = rpcproc_decode_null,
2809  };
2810  
2811  static const struct rpc_procinfo rpcproc_null_noreply = {
2812  	.p_encode = rpcproc_encode_null,
2813  };
2814  
2815  static void
rpc_null_call_prepare(struct rpc_task * task,void * data)2816  rpc_null_call_prepare(struct rpc_task *task, void *data)
2817  {
2818  	task->tk_flags &= ~RPC_TASK_NO_RETRANS_TIMEOUT;
2819  	rpc_call_start(task);
2820  }
2821  
2822  static const struct rpc_call_ops rpc_null_ops = {
2823  	.rpc_call_prepare = rpc_null_call_prepare,
2824  	.rpc_call_done = rpc_default_callback,
2825  };
2826  
2827  static
rpc_call_null_helper(struct rpc_clnt * clnt,struct rpc_xprt * xprt,struct rpc_cred * cred,int flags,const struct rpc_call_ops * ops,void * data)2828  struct rpc_task *rpc_call_null_helper(struct rpc_clnt *clnt,
2829  		struct rpc_xprt *xprt, struct rpc_cred *cred, int flags,
2830  		const struct rpc_call_ops *ops, void *data)
2831  {
2832  	struct rpc_message msg = {
2833  		.rpc_proc = &rpcproc_null,
2834  	};
2835  	struct rpc_task_setup task_setup_data = {
2836  		.rpc_client = clnt,
2837  		.rpc_xprt = xprt,
2838  		.rpc_message = &msg,
2839  		.rpc_op_cred = cred,
2840  		.callback_ops = ops ?: &rpc_null_ops,
2841  		.callback_data = data,
2842  		.flags = flags | RPC_TASK_SOFT | RPC_TASK_SOFTCONN |
2843  			 RPC_TASK_NULLCREDS,
2844  	};
2845  
2846  	return rpc_run_task(&task_setup_data);
2847  }
2848  
rpc_call_null(struct rpc_clnt * clnt,struct rpc_cred * cred,int flags)2849  struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
2850  {
2851  	return rpc_call_null_helper(clnt, NULL, cred, flags, NULL, NULL);
2852  }
2853  EXPORT_SYMBOL_GPL(rpc_call_null);
2854  
rpc_ping(struct rpc_clnt * clnt)2855  static int rpc_ping(struct rpc_clnt *clnt)
2856  {
2857  	struct rpc_task	*task;
2858  	int status;
2859  
2860  	if (clnt->cl_auth->au_ops->ping)
2861  		return clnt->cl_auth->au_ops->ping(clnt);
2862  
2863  	task = rpc_call_null_helper(clnt, NULL, NULL, 0, NULL, NULL);
2864  	if (IS_ERR(task))
2865  		return PTR_ERR(task);
2866  	status = task->tk_status;
2867  	rpc_put_task(task);
2868  	return status;
2869  }
2870  
rpc_ping_noreply(struct rpc_clnt * clnt)2871  static int rpc_ping_noreply(struct rpc_clnt *clnt)
2872  {
2873  	struct rpc_message msg = {
2874  		.rpc_proc = &rpcproc_null_noreply,
2875  	};
2876  	struct rpc_task_setup task_setup_data = {
2877  		.rpc_client = clnt,
2878  		.rpc_message = &msg,
2879  		.callback_ops = &rpc_null_ops,
2880  		.flags = RPC_TASK_SOFT | RPC_TASK_SOFTCONN | RPC_TASK_NULLCREDS,
2881  	};
2882  	struct rpc_task	*task;
2883  	int status;
2884  
2885  	task = rpc_run_task(&task_setup_data);
2886  	if (IS_ERR(task))
2887  		return PTR_ERR(task);
2888  	status = task->tk_status;
2889  	rpc_put_task(task);
2890  	return status;
2891  }
2892  
/*
 * Callback data for the asynchronous "test, then add" NULL call started
 * by rpc_clnt_test_and_add_xprt().  Both pointers carry a reference
 * that rpc_cb_add_xprt_release() drops.
 */
struct rpc_cb_add_xprt_calldata {
	struct rpc_xprt_switch *xps;	/* switch that receives the transport */
	struct rpc_xprt *xprt;		/* transport under test */
};
2897  
rpc_cb_add_xprt_done(struct rpc_task * task,void * calldata)2898  static void rpc_cb_add_xprt_done(struct rpc_task *task, void *calldata)
2899  {
2900  	struct rpc_cb_add_xprt_calldata *data = calldata;
2901  
2902  	if (task->tk_status == 0)
2903  		rpc_xprt_switch_add_xprt(data->xps, data->xprt);
2904  }
2905  
rpc_cb_add_xprt_release(void * calldata)2906  static void rpc_cb_add_xprt_release(void *calldata)
2907  {
2908  	struct rpc_cb_add_xprt_calldata *data = calldata;
2909  
2910  	xprt_put(data->xprt);
2911  	xprt_switch_put(data->xps);
2912  	kfree(data);
2913  }
2914  
2915  static const struct rpc_call_ops rpc_cb_add_xprt_call_ops = {
2916  	.rpc_call_prepare = rpc_null_call_prepare,
2917  	.rpc_call_done = rpc_cb_add_xprt_done,
2918  	.rpc_release = rpc_cb_add_xprt_release,
2919  };
2920  
2921  /**
2922   * rpc_clnt_test_and_add_xprt - Test and add a new transport to a rpc_clnt
2923   * @clnt: pointer to struct rpc_clnt
2924   * @xps: pointer to struct rpc_xprt_switch,
2925   * @xprt: pointer struct rpc_xprt
2926   * @in_max_connect: pointer to the max_connect value for the passed in xprt transport
2927   */
rpc_clnt_test_and_add_xprt(struct rpc_clnt * clnt,struct rpc_xprt_switch * xps,struct rpc_xprt * xprt,void * in_max_connect)2928  int rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt,
2929  		struct rpc_xprt_switch *xps, struct rpc_xprt *xprt,
2930  		void *in_max_connect)
2931  {
2932  	struct rpc_cb_add_xprt_calldata *data;
2933  	struct rpc_task *task;
2934  	int max_connect = clnt->cl_max_connect;
2935  
2936  	if (in_max_connect)
2937  		max_connect = *(int *)in_max_connect;
2938  	if (xps->xps_nunique_destaddr_xprts + 1 > max_connect) {
2939  		rcu_read_lock();
2940  		pr_warn("SUNRPC: reached max allowed number (%d) did not add "
2941  			"transport to server: %s\n", max_connect,
2942  			rpc_peeraddr2str(clnt, RPC_DISPLAY_ADDR));
2943  		rcu_read_unlock();
2944  		return -EINVAL;
2945  	}
2946  
2947  	data = kmalloc(sizeof(*data), GFP_KERNEL);
2948  	if (!data)
2949  		return -ENOMEM;
2950  	data->xps = xprt_switch_get(xps);
2951  	data->xprt = xprt_get(xprt);
2952  	if (rpc_xprt_switch_has_addr(data->xps, (struct sockaddr *)&xprt->addr)) {
2953  		rpc_cb_add_xprt_release(data);
2954  		goto success;
2955  	}
2956  
2957  	task = rpc_call_null_helper(clnt, xprt, NULL, RPC_TASK_ASYNC,
2958  			&rpc_cb_add_xprt_call_ops, data);
2959  	if (IS_ERR(task))
2960  		return PTR_ERR(task);
2961  
2962  	data->xps->xps_nunique_destaddr_xprts++;
2963  	rpc_put_task(task);
2964  success:
2965  	return 1;
2966  }
2967  EXPORT_SYMBOL_GPL(rpc_clnt_test_and_add_xprt);
2968  
rpc_clnt_add_xprt_helper(struct rpc_clnt * clnt,struct rpc_xprt * xprt,struct rpc_add_xprt_test * data)2969  static int rpc_clnt_add_xprt_helper(struct rpc_clnt *clnt,
2970  				    struct rpc_xprt *xprt,
2971  				    struct rpc_add_xprt_test *data)
2972  {
2973  	struct rpc_task *task;
2974  	int status = -EADDRINUSE;
2975  
2976  	/* Test the connection */
2977  	task = rpc_call_null_helper(clnt, xprt, NULL, 0, NULL, NULL);
2978  	if (IS_ERR(task))
2979  		return PTR_ERR(task);
2980  
2981  	status = task->tk_status;
2982  	rpc_put_task(task);
2983  
2984  	if (status < 0)
2985  		return status;
2986  
2987  	/* rpc_xprt_switch and rpc_xprt are deferrenced by add_xprt_test() */
2988  	data->add_xprt_test(clnt, xprt, data->data);
2989  
2990  	return 0;
2991  }
2992  
2993  /**
2994   * rpc_clnt_setup_test_and_add_xprt()
2995   *
2996   * This is an rpc_clnt_add_xprt setup() function which returns 1 so:
2997   *   1) caller of the test function must dereference the rpc_xprt_switch
2998   *   and the rpc_xprt.
2999   *   2) test function must call rpc_xprt_switch_add_xprt, usually in
3000   *   the rpc_call_done routine.
3001   *
3002   * Upon success (return of 1), the test function adds the new
3003   * transport to the rpc_clnt xprt switch
3004   *
3005   * @clnt: struct rpc_clnt to get the new transport
3006   * @xps:  the rpc_xprt_switch to hold the new transport
3007   * @xprt: the rpc_xprt to test
3008   * @data: a struct rpc_add_xprt_test pointer that holds the test function
3009   *        and test function call data
3010   */
rpc_clnt_setup_test_and_add_xprt(struct rpc_clnt * clnt,struct rpc_xprt_switch * xps,struct rpc_xprt * xprt,void * data)3011  int rpc_clnt_setup_test_and_add_xprt(struct rpc_clnt *clnt,
3012  				     struct rpc_xprt_switch *xps,
3013  				     struct rpc_xprt *xprt,
3014  				     void *data)
3015  {
3016  	int status = -EADDRINUSE;
3017  
3018  	xprt = xprt_get(xprt);
3019  	xprt_switch_get(xps);
3020  
3021  	if (rpc_xprt_switch_has_addr(xps, (struct sockaddr *)&xprt->addr))
3022  		goto out_err;
3023  
3024  	status = rpc_clnt_add_xprt_helper(clnt, xprt, data);
3025  	if (status < 0)
3026  		goto out_err;
3027  
3028  	status = 1;
3029  out_err:
3030  	xprt_put(xprt);
3031  	xprt_switch_put(xps);
3032  	if (status < 0)
3033  		pr_info("RPC:   rpc_clnt_test_xprt failed: %d addr %s not "
3034  			"added\n", status,
3035  			xprt->address_strings[RPC_DISPLAY_ADDR]);
3036  	/* so that rpc_clnt_add_xprt does not call rpc_xprt_switch_add_xprt */
3037  	return status;
3038  }
3039  EXPORT_SYMBOL_GPL(rpc_clnt_setup_test_and_add_xprt);
3040  
3041  /**
3042   * rpc_clnt_add_xprt - Add a new transport to a rpc_clnt
3043   * @clnt: pointer to struct rpc_clnt
3044   * @xprtargs: pointer to struct xprt_create
3045   * @setup: callback to test and/or set up the connection
3046   * @data: pointer to setup function data
3047   *
3048   * Creates a new transport using the parameters set in args and
3049   * adds it to clnt.
3050   * If ping is set, then test that connectivity succeeds before
3051   * adding the new transport.
3052   *
3053   */
rpc_clnt_add_xprt(struct rpc_clnt * clnt,struct xprt_create * xprtargs,int (* setup)(struct rpc_clnt *,struct rpc_xprt_switch *,struct rpc_xprt *,void *),void * data)3054  int rpc_clnt_add_xprt(struct rpc_clnt *clnt,
3055  		struct xprt_create *xprtargs,
3056  		int (*setup)(struct rpc_clnt *,
3057  			struct rpc_xprt_switch *,
3058  			struct rpc_xprt *,
3059  			void *),
3060  		void *data)
3061  {
3062  	struct rpc_xprt_switch *xps;
3063  	struct rpc_xprt *xprt;
3064  	unsigned long connect_timeout;
3065  	unsigned long reconnect_timeout;
3066  	unsigned char resvport, reuseport;
3067  	int ret = 0, ident;
3068  
3069  	rcu_read_lock();
3070  	xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
3071  	xprt = xprt_iter_xprt(&clnt->cl_xpi);
3072  	if (xps == NULL || xprt == NULL) {
3073  		rcu_read_unlock();
3074  		xprt_switch_put(xps);
3075  		return -EAGAIN;
3076  	}
3077  	resvport = xprt->resvport;
3078  	reuseport = xprt->reuseport;
3079  	connect_timeout = xprt->connect_timeout;
3080  	reconnect_timeout = xprt->max_reconnect_timeout;
3081  	ident = xprt->xprt_class->ident;
3082  	rcu_read_unlock();
3083  
3084  	if (!xprtargs->ident)
3085  		xprtargs->ident = ident;
3086  	xprtargs->xprtsec = clnt->cl_xprtsec;
3087  	xprt = xprt_create_transport(xprtargs);
3088  	if (IS_ERR(xprt)) {
3089  		ret = PTR_ERR(xprt);
3090  		goto out_put_switch;
3091  	}
3092  	xprt->resvport = resvport;
3093  	xprt->reuseport = reuseport;
3094  
3095  	if (xprtargs->connect_timeout)
3096  		connect_timeout = xprtargs->connect_timeout;
3097  	if (xprtargs->reconnect_timeout)
3098  		reconnect_timeout = xprtargs->reconnect_timeout;
3099  	if (xprt->ops->set_connect_timeout != NULL)
3100  		xprt->ops->set_connect_timeout(xprt,
3101  				connect_timeout,
3102  				reconnect_timeout);
3103  
3104  	rpc_xprt_switch_set_roundrobin(xps);
3105  	if (setup) {
3106  		ret = setup(clnt, xps, xprt, data);
3107  		if (ret != 0)
3108  			goto out_put_xprt;
3109  	}
3110  	rpc_xprt_switch_add_xprt(xps, xprt);
3111  out_put_xprt:
3112  	xprt_put(xprt);
3113  out_put_switch:
3114  	xprt_switch_put(xps);
3115  	return ret;
3116  }
3117  EXPORT_SYMBOL_GPL(rpc_clnt_add_xprt);
3118  
rpc_xprt_probe_trunked(struct rpc_clnt * clnt,struct rpc_xprt * xprt,struct rpc_add_xprt_test * data)3119  static int rpc_xprt_probe_trunked(struct rpc_clnt *clnt,
3120  				  struct rpc_xprt *xprt,
3121  				  struct rpc_add_xprt_test *data)
3122  {
3123  	struct rpc_xprt_switch *xps;
3124  	struct rpc_xprt *main_xprt;
3125  	int status = 0;
3126  
3127  	xprt_get(xprt);
3128  
3129  	rcu_read_lock();
3130  	main_xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
3131  	xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
3132  	status = rpc_cmp_addr_port((struct sockaddr *)&xprt->addr,
3133  				   (struct sockaddr *)&main_xprt->addr);
3134  	rcu_read_unlock();
3135  	xprt_put(main_xprt);
3136  	if (status || !test_bit(XPRT_OFFLINE, &xprt->state))
3137  		goto out;
3138  
3139  	status = rpc_clnt_add_xprt_helper(clnt, xprt, data);
3140  out:
3141  	xprt_put(xprt);
3142  	xprt_switch_put(xps);
3143  	return status;
3144  }
3145  
3146  /* rpc_clnt_probe_trunked_xprt -- probe offlined transport for session trunking
3147   * @clnt rpc_clnt structure
3148   *
3149   * For each offlined transport found in the rpc_clnt structure call
3150   * the function rpc_xprt_probe_trunked() which will determine if this
3151   * transport still belongs to the trunking group.
3152   */
rpc_clnt_probe_trunked_xprts(struct rpc_clnt * clnt,struct rpc_add_xprt_test * data)3153  void rpc_clnt_probe_trunked_xprts(struct rpc_clnt *clnt,
3154  				  struct rpc_add_xprt_test *data)
3155  {
3156  	struct rpc_xprt_iter xpi;
3157  	int ret;
3158  
3159  	ret = rpc_clnt_xprt_iter_offline_init(clnt, &xpi);
3160  	if (ret)
3161  		return;
3162  	for (;;) {
3163  		struct rpc_xprt *xprt = xprt_iter_get_next(&xpi);
3164  
3165  		if (!xprt)
3166  			break;
3167  		ret = rpc_xprt_probe_trunked(clnt, xprt, data);
3168  		xprt_put(xprt);
3169  		if (ret < 0)
3170  			break;
3171  		xprt_iter_rewind(&xpi);
3172  	}
3173  	xprt_iter_destroy(&xpi);
3174  }
3175  EXPORT_SYMBOL_GPL(rpc_clnt_probe_trunked_xprts);
3176  
rpc_xprt_offline(struct rpc_clnt * clnt,struct rpc_xprt * xprt,void * data)3177  static int rpc_xprt_offline(struct rpc_clnt *clnt,
3178  			    struct rpc_xprt *xprt,
3179  			    void *data)
3180  {
3181  	struct rpc_xprt *main_xprt;
3182  	struct rpc_xprt_switch *xps;
3183  	int err = 0;
3184  
3185  	xprt_get(xprt);
3186  
3187  	rcu_read_lock();
3188  	main_xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
3189  	xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
3190  	err = rpc_cmp_addr_port((struct sockaddr *)&xprt->addr,
3191  				(struct sockaddr *)&main_xprt->addr);
3192  	rcu_read_unlock();
3193  	xprt_put(main_xprt);
3194  	if (err)
3195  		goto out;
3196  
3197  	if (wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_KILLABLE)) {
3198  		err = -EINTR;
3199  		goto out;
3200  	}
3201  	xprt_set_offline_locked(xprt, xps);
3202  
3203  	xprt_release_write(xprt, NULL);
3204  out:
3205  	xprt_put(xprt);
3206  	xprt_switch_put(xps);
3207  	return err;
3208  }
3209  
3210  /* rpc_clnt_manage_trunked_xprts -- offline trunked transports
3211   * @clnt rpc_clnt structure
3212   *
3213   * For each active transport found in the rpc_clnt structure call
3214   * the function rpc_xprt_offline() which will identify trunked transports
3215   * and will mark them offline.
3216   */
rpc_clnt_manage_trunked_xprts(struct rpc_clnt * clnt)3217  void rpc_clnt_manage_trunked_xprts(struct rpc_clnt *clnt)
3218  {
3219  	rpc_clnt_iterate_for_each_xprt(clnt, rpc_xprt_offline, NULL);
3220  }
3221  EXPORT_SYMBOL_GPL(rpc_clnt_manage_trunked_xprts);
3222  
/*
 * Argument bundle carried through rpc_clnt_iterate_for_each_xprt() to
 * rpc_xprt_set_connect_timeout().
 */
struct connect_timeout_data {
	/* second argument of xprt->ops->set_connect_timeout() */
	unsigned long connect_timeout;
	/* third argument of xprt->ops->set_connect_timeout() */
	unsigned long reconnect_timeout;
};
3227  
3228  static int
rpc_xprt_set_connect_timeout(struct rpc_clnt * clnt,struct rpc_xprt * xprt,void * data)3229  rpc_xprt_set_connect_timeout(struct rpc_clnt *clnt,
3230  		struct rpc_xprt *xprt,
3231  		void *data)
3232  {
3233  	struct connect_timeout_data *timeo = data;
3234  
3235  	if (xprt->ops->set_connect_timeout)
3236  		xprt->ops->set_connect_timeout(xprt,
3237  				timeo->connect_timeout,
3238  				timeo->reconnect_timeout);
3239  	return 0;
3240  }
3241  
3242  void
rpc_set_connect_timeout(struct rpc_clnt * clnt,unsigned long connect_timeout,unsigned long reconnect_timeout)3243  rpc_set_connect_timeout(struct rpc_clnt *clnt,
3244  		unsigned long connect_timeout,
3245  		unsigned long reconnect_timeout)
3246  {
3247  	struct connect_timeout_data timeout = {
3248  		.connect_timeout = connect_timeout,
3249  		.reconnect_timeout = reconnect_timeout,
3250  	};
3251  	rpc_clnt_iterate_for_each_xprt(clnt,
3252  			rpc_xprt_set_connect_timeout,
3253  			&timeout);
3254  }
3255  EXPORT_SYMBOL_GPL(rpc_set_connect_timeout);
3256  
rpc_clnt_xprt_switch_put(struct rpc_clnt * clnt)3257  void rpc_clnt_xprt_switch_put(struct rpc_clnt *clnt)
3258  {
3259  	rcu_read_lock();
3260  	xprt_switch_put(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
3261  	rcu_read_unlock();
3262  }
3263  EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_put);
3264  
rpc_clnt_xprt_set_online(struct rpc_clnt * clnt,struct rpc_xprt * xprt)3265  void rpc_clnt_xprt_set_online(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
3266  {
3267  	struct rpc_xprt_switch *xps;
3268  
3269  	rcu_read_lock();
3270  	xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
3271  	rcu_read_unlock();
3272  	xprt_set_online_locked(xprt, xps);
3273  }
3274  
rpc_clnt_xprt_switch_add_xprt(struct rpc_clnt * clnt,struct rpc_xprt * xprt)3275  void rpc_clnt_xprt_switch_add_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
3276  {
3277  	if (rpc_clnt_xprt_switch_has_addr(clnt,
3278  		(const struct sockaddr *)&xprt->addr)) {
3279  		return rpc_clnt_xprt_set_online(clnt, xprt);
3280  	}
3281  	rcu_read_lock();
3282  	rpc_xprt_switch_add_xprt(rcu_dereference(clnt->cl_xpi.xpi_xpswitch),
3283  				 xprt);
3284  	rcu_read_unlock();
3285  }
3286  EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_add_xprt);
3287  
rpc_clnt_xprt_switch_remove_xprt(struct rpc_clnt * clnt,struct rpc_xprt * xprt)3288  void rpc_clnt_xprt_switch_remove_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
3289  {
3290  	struct rpc_xprt_switch *xps;
3291  
3292  	rcu_read_lock();
3293  	xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
3294  	rpc_xprt_switch_remove_xprt(rcu_dereference(clnt->cl_xpi.xpi_xpswitch),
3295  				    xprt, 0);
3296  	xps->xps_nunique_destaddr_xprts--;
3297  	rcu_read_unlock();
3298  }
3299  EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_remove_xprt);
3300  
rpc_clnt_xprt_switch_has_addr(struct rpc_clnt * clnt,const struct sockaddr * sap)3301  bool rpc_clnt_xprt_switch_has_addr(struct rpc_clnt *clnt,
3302  				   const struct sockaddr *sap)
3303  {
3304  	struct rpc_xprt_switch *xps;
3305  	bool ret;
3306  
3307  	rcu_read_lock();
3308  	xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
3309  	ret = rpc_xprt_switch_has_addr(xps, sap);
3310  	rcu_read_unlock();
3311  	return ret;
3312  }
3313  EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_has_addr);
3314  
3315  #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
rpc_show_header(void)3316  static void rpc_show_header(void)
3317  {
3318  	printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
3319  		"-timeout ---ops--\n");
3320  }
3321  
rpc_show_task(const struct rpc_clnt * clnt,const struct rpc_task * task)3322  static void rpc_show_task(const struct rpc_clnt *clnt,
3323  			  const struct rpc_task *task)
3324  {
3325  	const char *rpc_waitq = "none";
3326  
3327  	if (RPC_IS_QUEUED(task))
3328  		rpc_waitq = rpc_qname(task->tk_waitqueue);
3329  
3330  	printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%ps q:%s\n",
3331  		task->tk_pid, task->tk_flags, task->tk_status,
3332  		clnt, task->tk_rqstp, rpc_task_timeout(task), task->tk_ops,
3333  		clnt->cl_program->name, clnt->cl_vers, rpc_proc_name(task),
3334  		task->tk_action, rpc_waitq);
3335  }
3336  
rpc_show_tasks(struct net * net)3337  void rpc_show_tasks(struct net *net)
3338  {
3339  	struct rpc_clnt *clnt;
3340  	struct rpc_task *task;
3341  	int header = 0;
3342  	struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
3343  
3344  	spin_lock(&sn->rpc_client_lock);
3345  	list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
3346  		spin_lock(&clnt->cl_lock);
3347  		list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
3348  			if (!header) {
3349  				rpc_show_header();
3350  				header++;
3351  			}
3352  			rpc_show_task(clnt, task);
3353  		}
3354  		spin_unlock(&clnt->cl_lock);
3355  	}
3356  	spin_unlock(&sn->rpc_client_lock);
3357  }
3358  #endif
3359  
3360  #if IS_ENABLED(CONFIG_SUNRPC_SWAP)
/*
 * Per-transport callback for rpc_clnt_swap_activate(): returns the
 * result of xprt_enable_swap() on @xprt. @clnt and @dummy are unused.
 */
static int
rpc_clnt_swap_activate_callback(struct rpc_clnt *clnt,
		struct rpc_xprt *xprt,
		void *dummy)
{
	return xprt_enable_swap(xprt);
}
3368  
3369  int
rpc_clnt_swap_activate(struct rpc_clnt * clnt)3370  rpc_clnt_swap_activate(struct rpc_clnt *clnt)
3371  {
3372  	while (clnt != clnt->cl_parent)
3373  		clnt = clnt->cl_parent;
3374  	if (atomic_inc_return(&clnt->cl_swapper) == 1)
3375  		return rpc_clnt_iterate_for_each_xprt(clnt,
3376  				rpc_clnt_swap_activate_callback, NULL);
3377  	return 0;
3378  }
3379  EXPORT_SYMBOL_GPL(rpc_clnt_swap_activate);
3380  
/*
 * Per-transport callback for rpc_clnt_swap_deactivate(): calls
 * xprt_disable_swap() on @xprt and always returns 0. @clnt and @dummy
 * are unused.
 */
static int
rpc_clnt_swap_deactivate_callback(struct rpc_clnt *clnt,
		struct rpc_xprt *xprt,
		void *dummy)
{
	xprt_disable_swap(xprt);

	return 0;
}
3389  
3390  void
rpc_clnt_swap_deactivate(struct rpc_clnt * clnt)3391  rpc_clnt_swap_deactivate(struct rpc_clnt *clnt)
3392  {
3393  	while (clnt != clnt->cl_parent)
3394  		clnt = clnt->cl_parent;
3395  	if (atomic_dec_if_positive(&clnt->cl_swapper) == 0)
3396  		rpc_clnt_iterate_for_each_xprt(clnt,
3397  				rpc_clnt_swap_deactivate_callback, NULL);
3398  }
3399  EXPORT_SYMBOL_GPL(rpc_clnt_swap_deactivate);
3400  #endif /* CONFIG_SUNRPC_SWAP */
3401