// SPDX-License-Identifier: GPL-2.0-only
/*
 * linux/net/sunrpc/svc.c
 *
 * High-level RPC service routines
 *
 * Copyright (C) 1995, 1996 Olaf Kirch <okir@monad.swb.de>
 *
 * Multiple thread pools and NUMAisation
 * Copyright (c) 2006 Silicon Graphics, Inc.
 * by Greg Banks <gnb@melbourne.sgi.com>
 */

#include <linux/linkage.h>
#include <linux/sched/signal.h>
#include <linux/errno.h>
#include <linux/net.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/interrupt.h>
#include <linux/module.h>
#include <linux/kthread.h>
#include <linux/slab.h>

#include <linux/sunrpc/types.h>
#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/stats.h>
#include <linux/sunrpc/svcsock.h>
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/bc_xprt.h>

#include <trace/events/sunrpc.h>

#define RPCDBG_FACILITY	RPCDBG_SVCDSP

static void svc_unregister(const struct svc_serv *serv, struct net *net);

#define svc_serv_is_pooled(serv)    ((serv)->sv_ops->svo_function)

#define SVC_POOL_DEFAULT	SVC_POOL_GLOBAL

/*
 * Structure for mapping cpus to pools and vice versa.
 * Set up once during sunrpc initialisation.
 */
struct svc_pool_map svc_pool_map = {
	.mode = SVC_POOL_DEFAULT
};
EXPORT_SYMBOL_GPL(svc_pool_map);

static DEFINE_MUTEX(svc_pool_map_mutex);	/* protects svc_pool_map.count only */

static int
param_set_pool_mode(const char *val, const struct kernel_param *kp)
{
	int *ip = (int *)kp->arg;
	struct svc_pool_map *m = &svc_pool_map;
	int err;

	mutex_lock(&svc_pool_map_mutex);

	err = -EBUSY;
	if (m->count)
		goto out;

	err = 0;
	if (!strncmp(val, "auto", 4))
		*ip = SVC_POOL_AUTO;
	else if (!strncmp(val, "global", 6))
		*ip = SVC_POOL_GLOBAL;
	else if (!strncmp(val, "percpu", 6))
		*ip = SVC_POOL_PERCPU;
	else if (!strncmp(val, "pernode", 7))
		*ip = SVC_POOL_PERNODE;
	else
		err = -EINVAL;

out:
	mutex_unlock(&svc_pool_map_mutex);
	return err;
}

static int
param_get_pool_mode(char *buf, const struct kernel_param *kp)
{
	int *ip = (int *)kp->arg;

	switch (*ip) {
	case SVC_POOL_AUTO:
		return strlcpy(buf, "auto", 20);
	case SVC_POOL_GLOBAL:
		return strlcpy(buf, "global", 20);
	case SVC_POOL_PERCPU:
		return strlcpy(buf, "percpu", 20);
	case SVC_POOL_PERNODE:
		return strlcpy(buf, "pernode", 20);
	default:
		return sprintf(buf, "%d", *ip);
	}
}

module_param_call(pool_mode, param_set_pool_mode, param_get_pool_mode,
		 &svc_pool_map.mode, 0644);

/*
 * Detect best pool mapping mode heuristically,
 * according to the machine's topology.
 */
static int
svc_pool_map_choose_mode(void)
{
	unsigned int node;

	if (nr_online_nodes > 1) {
		/*
		 * Actually have multiple NUMA nodes,
		 * so split pools on NUMA node boundaries
		 */
		return SVC_POOL_PERNODE;
	}

	node = first_online_node;
	if (nr_cpus_node(node) > 2) {
		/*
		 * Non-trivial SMP, or CONFIG_NUMA on
		 * non-NUMA hardware, e.g. with a generic
		 * x86_64 kernel on Xeons.  In this case we
		 * want to divide the pools on cpu boundaries.
		 */
		return SVC_POOL_PERCPU;
	}

	/* default: one global pool */
	return SVC_POOL_GLOBAL;
}

/*
 * Allocate the to_pool[] and pool_to[] arrays.
 * Returns 0 on success or an errno.
 */
static int
svc_pool_map_alloc_arrays(struct svc_pool_map *m, unsigned int maxpools)
{
	m->to_pool = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
	if (!m->to_pool)
		goto fail;
	m->pool_to = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
	if (!m->pool_to)
		goto fail_free;

	return 0;

fail_free:
	kfree(m->to_pool);
	m->to_pool = NULL;
fail:
	return -ENOMEM;
}

/*
 * Initialise the pool map for SVC_POOL_PERCPU mode.
 * Returns number of pools or <0 on error.
 */
static int
svc_pool_map_init_percpu(struct svc_pool_map *m)
{
	unsigned int maxpools = nr_cpu_ids;
	unsigned int pidx = 0;
	unsigned int cpu;
	int err;

	err = svc_pool_map_alloc_arrays(m, maxpools);
	if (err)
		return err;

	for_each_online_cpu(cpu) {
		BUG_ON(pidx >= maxpools);
		m->to_pool[cpu] = pidx;
		m->pool_to[pidx] = cpu;
		pidx++;
	}
	/* cpus brought online later all get mapped to pool0, sorry */

	return pidx;
}

/*
 * Initialise the pool map for SVC_POOL_PERNODE mode.
 * Returns number of pools or <0 on error.
 */
static int
svc_pool_map_init_pernode(struct svc_pool_map *m)
{
	unsigned int maxpools = nr_node_ids;
	unsigned int pidx = 0;
	unsigned int node;
	int err;

	err = svc_pool_map_alloc_arrays(m, maxpools);
	if (err)
		return err;

	for_each_node_with_cpus(node) {
		/* some architectures (e.g. SN2) have cpuless nodes */
		BUG_ON(pidx >= maxpools);
		m->to_pool[node] = pidx;
		m->pool_to[pidx] = node;
		pidx++;
	}
	/* nodes brought online later all get mapped to pool0, sorry */

	return pidx;
}

/*
 * Add a reference to the global map of cpus to pools (and
 * vice versa).  Initialise the map if we're the first user.
 * Returns the number of pools.
 */
unsigned int
svc_pool_map_get(void)
{
	struct svc_pool_map *m = &svc_pool_map;
	int npools = -1;

	mutex_lock(&svc_pool_map_mutex);

	if (m->count++) {
		mutex_unlock(&svc_pool_map_mutex);
		return m->npools;
	}

	if (m->mode == SVC_POOL_AUTO)
		m->mode = svc_pool_map_choose_mode();

	switch (m->mode) {
	case SVC_POOL_PERCPU:
		npools = svc_pool_map_init_percpu(m);
		break;
	case SVC_POOL_PERNODE:
		npools = svc_pool_map_init_pernode(m);
		break;
	}

	if (npools < 0) {
		/* default, or memory allocation failure */
		npools = 1;
		m->mode = SVC_POOL_GLOBAL;
	}
	m->npools = npools;

	mutex_unlock(&svc_pool_map_mutex);
	return m->npools;
}
EXPORT_SYMBOL_GPL(svc_pool_map_get);

/*
 * Drop a reference to the global map of cpus to pools.
 * When the last reference is dropped, the map data is
 * freed; this allows the sysadmin to change the pool
 * mode using the pool_mode module option without
 * rebooting or re-loading sunrpc.ko.
 */
void
svc_pool_map_put(void)
{
	struct svc_pool_map *m = &svc_pool_map;

	mutex_lock(&svc_pool_map_mutex);

	if (!--m->count) {
		kfree(m->to_pool);
		m->to_pool = NULL;
		kfree(m->pool_to);
		m->pool_to = NULL;
		m->npools = 0;
	}

	mutex_unlock(&svc_pool_map_mutex);
}
EXPORT_SYMBOL_GPL(svc_pool_map_put);

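/*
 * Return the NUMA node that backs the given pool index, so that
 * per-thread memory can be allocated node-locally.  Falls back to
 * NUMA_NO_NODE when the map is uninitialised or global.
 */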
static int svc_pool_map_get_node(unsigned int pidx)
{
	const struct svc_pool_map *m = &svc_pool_map;

	if (m->count) {
		if (m->mode == SVC_POOL_PERCPU)
			return cpu_to_node(m->pool_to[pidx]);
		if (m->mode == SVC_POOL_PERNODE)
			return m->pool_to[pidx];
	}
	return NUMA_NO_NODE;
}

/*
 * Set the given thread's cpus_allowed mask so that it
 * will only run on cpus in the given pool.
 */
static inline void
svc_pool_map_set_cpumask(struct task_struct *task, unsigned int pidx)
{
	struct svc_pool_map *m = &svc_pool_map;
	unsigned int node = m->pool_to[pidx];	/* cpu in PERCPU mode, node in PERNODE mode */

	/*
	 * The caller checks for sv_nrpools > 1, which
	 * implies that we've been initialized.
	 */
	WARN_ON_ONCE(m->count == 0);
	if (m->count == 0)
		return;

	switch (m->mode) {
	case SVC_POOL_PERCPU:
	{
		set_cpus_allowed_ptr(task, cpumask_of(node));
		break;
	}
	case SVC_POOL_PERNODE:
	{
		set_cpus_allowed_ptr(task, cpumask_of_node(node));
		break;
	}
	}
}

/*
 * Use the mapping mode to choose a pool for a given CPU.
 * Used when enqueueing an incoming RPC.  Always returns
 * a non-NULL pool pointer.
 */
struct svc_pool *
svc_pool_for_cpu(struct svc_serv *serv, int cpu)
{
	struct svc_pool_map *m = &svc_pool_map;
	unsigned int pidx = 0;

	/*
	 * An uninitialised map happens in a pure client when
	 * lockd is brought up, so silently treat it the
	 * same as SVC_POOL_GLOBAL.
	 */
	if (svc_serv_is_pooled(serv)) {
		switch (m->mode) {
		case SVC_POOL_PERCPU:
			pidx = m->to_pool[cpu];
			break;
		case SVC_POOL_PERNODE:
			pidx = m->to_pool[cpu_to_node(cpu)];
			break;
		}
	}
	return &serv->sv_pools[pidx % serv->sv_nrpools];
}

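/**
 * svc_rpcb_setup - create a local rpcbind client and clear stale entries
 * @serv: RPC service to set up
 * @net: net namespace to operate in
 *
 * Returns zero on success; a negative errno value otherwise.
 */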
int svc_rpcb_setup(struct svc_serv *serv, struct net *net)
{
	int err;

	err = rpcb_create_local(net);
	if (err)
		return err;

	/* Remove any stale portmap registrations */
	svc_unregister(serv, net);
	return 0;
}
EXPORT_SYMBOL_GPL(svc_rpcb_setup);

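/**
 * svc_rpcb_cleanup - unregister a service and release the local rpcbind client
 * @serv: RPC service to unregister
 * @net: net namespace the service was registered in
 */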
void svc_rpcb_cleanup(struct svc_serv *serv, struct net *net)
{
	svc_unregister(serv, net);
	rpcb_put_local(net);
}
EXPORT_SYMBOL_GPL(svc_rpcb_cleanup);

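/*
 * Returns non-zero if at least one version of at least one program in
 * @serv is visible to rpcbind, i.e. not marked vs_hidden.
 */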
static int svc_uses_rpcbind(struct svc_serv *serv)
{
	struct svc_program	*progp;
	unsigned int		i;

	for (progp = serv->sv_program; progp; progp = progp->pg_next) {
		for (i = 0; i < progp->pg_nvers; i++) {
			if (progp->pg_vers[i] == NULL)
				continue;
			if (!progp->pg_vers[i]->vs_hidden)
				return 1;
		}
	}

	return 0;
}

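/**
 * svc_bind - prepare rpcbind registration for an RPC service
 * @serv: RPC service to bind
 * @net: net namespace to register in
 *
 * Does nothing if no version of the service is visible to rpcbind.
 * Returns zero on success; a negative errno value otherwise.
 */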
int svc_bind(struct svc_serv *serv, struct net *net)
{
	if (!svc_uses_rpcbind(serv))
		return 0;
	return svc_rpcb_setup(serv, net);
}
EXPORT_SYMBOL_GPL(svc_bind);

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
static void
__svc_init_bc(struct svc_serv *serv)
{
	INIT_LIST_HEAD(&serv->sv_cb_list);
	spin_lock_init(&serv->sv_cb_lock);
	init_waitqueue_head(&serv->sv_cb_waitq);
}
#else
static void
__svc_init_bc(struct svc_serv *serv)
{
}
#endif

/*
 * Create an RPC service
 */
static struct svc_serv *
__svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
	     const struct svc_serv_ops *ops)
{
	struct svc_serv	*serv;
	unsigned int vers;
	unsigned int xdrsize;
	unsigned int i;

	serv = kzalloc(sizeof(*serv), GFP_KERNEL);
	if (!serv)
		return NULL;
	serv->sv_name      = prog->pg_name;
	serv->sv_program   = prog;
	serv->sv_nrthreads = 1;
	serv->sv_stats     = prog->pg_stats;
	if (bufsize > RPCSVC_MAXPAYLOAD)
		bufsize = RPCSVC_MAXPAYLOAD;
	serv->sv_max_payload = bufsize ? bufsize : 4096;
	serv->sv_max_mesg  = roundup(serv->sv_max_payload + PAGE_SIZE, PAGE_SIZE);
	serv->sv_ops = ops;
	xdrsize = 0;
	while (prog) {
		prog->pg_lovers = prog->pg_nvers - 1;
		for (vers = 0; vers < prog->pg_nvers; vers++)
			if (prog->pg_vers[vers]) {
				prog->pg_hivers = vers;
				if (prog->pg_lovers > vers)
					prog->pg_lovers = vers;
				if (prog->pg_vers[vers]->vs_xdrsize > xdrsize)
					xdrsize = prog->pg_vers[vers]->vs_xdrsize;
			}
		prog = prog->pg_next;
	}
	serv->sv_xdrsize   = xdrsize;
	INIT_LIST_HEAD(&serv->sv_tempsocks);
	INIT_LIST_HEAD(&serv->sv_permsocks);
	timer_setup(&serv->sv_temptimer, NULL, 0);
	spin_lock_init(&serv->sv_lock);

	__svc_init_bc(serv);

	serv->sv_nrpools = npools;
	serv->sv_pools =
		kcalloc(serv->sv_nrpools, sizeof(struct svc_pool),
			GFP_KERNEL);
	if (!serv->sv_pools) {
		kfree(serv);
		return NULL;
	}

	for (i = 0; i < serv->sv_nrpools; i++) {
		struct svc_pool *pool = &serv->sv_pools[i];

		dprintk("svc: initialising pool %u for %s\n",
				i, serv->sv_name);

		pool->sp_id = i;
		INIT_LIST_HEAD(&pool->sp_sockets);
		INIT_LIST_HEAD(&pool->sp_all_threads);
		spin_lock_init(&pool->sp_lock);
	}

	return serv;
}

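/**
 * svc_create - create an RPC service with a single global pool
 * @prog: RPC program to handle
 * @bufsize: maximum RPC payload size, in bytes
 * @ops: server operations vector
 *
 * Returns the new service, or NULL on allocation failure.
 */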
struct svc_serv *
svc_create(struct svc_program *prog, unsigned int bufsize,
	   const struct svc_serv_ops *ops)
{
	return __svc_create(prog, bufsize, /*npools*/1, ops);
}
EXPORT_SYMBOL_GPL(svc_create);

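/**
 * svc_create_pooled - create an RPC service with one pool per cpu or node
 * @prog: RPC program to handle
 * @bufsize: maximum RPC payload size, in bytes
 * @ops: server operations vector
 *
 * Takes a reference on the global pool map; the reference is dropped
 * again if the service cannot be created.
 */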
struct svc_serv *
svc_create_pooled(struct svc_program *prog, unsigned int bufsize,
		  const struct svc_serv_ops *ops)
{
	struct svc_serv *serv;
	unsigned int npools = svc_pool_map_get();

	serv = __svc_create(prog, bufsize, npools, ops);
	if (!serv)
		goto out_err;
	return serv;
out_err:
	svc_pool_map_put();
	return NULL;
}
EXPORT_SYMBOL_GPL(svc_create_pooled);

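/**
 * svc_shutdown_net - close a service's transports in one net namespace
 * @serv: RPC service to shut down
 * @net: net namespace whose transports should be closed
 *
 * Also gives the service a chance to run its own per-net shutdown hook.
 */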
void svc_shutdown_net(struct svc_serv *serv, struct net *net)
{
	svc_close_net(serv, net);

	if (serv->sv_ops->svo_shutdown)
		serv->sv_ops->svo_shutdown(serv, net);
}
EXPORT_SYMBOL_GPL(svc_shutdown_net);

/*
 * Destroy an RPC service. Should be called with appropriate locking to
 * protect sv_nrthreads, sv_permsocks and sv_tempsocks.
 */
void
svc_destroy(struct svc_serv *serv)
{
	dprintk("svc: svc_destroy(%s, %d)\n",
				serv->sv_program->pg_name,
				serv->sv_nrthreads);

	if (serv->sv_nrthreads) {
		if (--(serv->sv_nrthreads) != 0) {
			svc_sock_update_bufs(serv);
			return;
		}
	} else
		printk(KERN_ERR "svc_destroy: no threads for serv=%p!\n", serv);

	del_timer_sync(&serv->sv_temptimer);

	/*
	 * The last user is gone and thus all sockets must have been
	 * closed by now.  Check this.
	 */
	BUG_ON(!list_empty(&serv->sv_permsocks));
	BUG_ON(!list_empty(&serv->sv_tempsocks));

	cache_clean_deferred(serv);

	if (svc_serv_is_pooled(serv))
		svc_pool_map_put();

	kfree(serv->sv_pools);
	kfree(serv);
}
EXPORT_SYMBOL_GPL(svc_destroy);

/*
 * Allocate an RPC server's buffer space.
 * We allocate pages and place them in rq_argpages.
 */
static int
svc_init_buffer(struct svc_rqst *rqstp, unsigned int size, int node)
{
	unsigned int pages, arghi;

	/* bc_xprt uses fore channel allocated buffers */
	if (svc_is_backchannel(rqstp))
		return 1;

	pages = size / PAGE_SIZE + 1; /* extra page, since we hold both the
				       * request and the reply, each assumed
				       * to be at most one page
				       */
	arghi = 0;
	WARN_ON_ONCE(pages > RPCSVC_MAXPAGES);
	if (pages > RPCSVC_MAXPAGES)
		pages = RPCSVC_MAXPAGES;
	while (pages) {
		struct page *p = alloc_pages_node(node, GFP_KERNEL, 0);

		if (!p)
			break;
		rqstp->rq_pages[arghi++] = p;
		pages--;
	}
	return pages == 0;
}

/*
 * Release an RPC server buffer
 */
static void
svc_release_buffer(struct svc_rqst *rqstp)
{
	unsigned int i;

	for (i = 0; i < ARRAY_SIZE(rqstp->rq_pages); i++)
		if (rqstp->rq_pages[i])
			put_page(rqstp->rq_pages[i]);
}

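/**
 * svc_rqst_alloc - allocate and initialise one svc_rqst
 * @serv: RPC service this request slot belongs to
 * @pool: pool the new thread will serve
 * @node: NUMA node to allocate memory from
 *
 * Returns the new svc_rqst, or NULL on allocation failure.
 */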
struct svc_rqst *
svc_rqst_alloc(struct svc_serv *serv, struct svc_pool *pool, int node)
{
	struct svc_rqst	*rqstp;

	rqstp = kzalloc_node(sizeof(*rqstp), GFP_KERNEL, node);
	if (!rqstp)
		return rqstp;

	__set_bit(RQ_BUSY, &rqstp->rq_flags);
	spin_lock_init(&rqstp->rq_lock);
	rqstp->rq_server = serv;
	rqstp->rq_pool = pool;

	rqstp->rq_argp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
	if (!rqstp->rq_argp)
		goto out_enomem;

	rqstp->rq_resp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
	if (!rqstp->rq_resp)
		goto out_enomem;

	if (!svc_init_buffer(rqstp, serv->sv_max_mesg, node))
		goto out_enomem;

	return rqstp;
out_enomem:
	svc_rqst_free(rqstp);
	return NULL;
}
EXPORT_SYMBOL_GPL(svc_rqst_alloc);

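/**
 * svc_prepare_thread - allocate an svc_rqst and link it into its pool
 * @serv: RPC service the new thread will belong to
 * @pool: pool the new thread will serve
 * @node: NUMA node to allocate memory from
 *
 * Returns the new svc_rqst, or an ERR_PTR on allocation failure.
 */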
struct svc_rqst *
svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool, int node)
{
	struct svc_rqst	*rqstp;

	rqstp = svc_rqst_alloc(serv, pool, node);
	if (!rqstp)
		return ERR_PTR(-ENOMEM);

	serv->sv_nrthreads++;
	spin_lock_bh(&pool->sp_lock);
	pool->sp_nrthreads++;
	list_add_rcu(&rqstp->rq_all, &pool->sp_all_threads);
	spin_unlock_bh(&pool->sp_lock);
	return rqstp;
}
EXPORT_SYMBOL_GPL(svc_prepare_thread);

/*
 * Choose a pool in which to create a new thread, for svc_set_num_threads
 */
static inline struct svc_pool *
choose_pool(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
{
	if (pool != NULL)
		return pool;

	return &serv->sv_pools[(*state)++ % serv->sv_nrpools];
}

/*
 * Choose a thread to kill, for svc_set_num_threads
 */
static inline struct task_struct *
choose_victim(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
{
	unsigned int i;
	struct task_struct *task = NULL;

	if (pool != NULL) {
		spin_lock_bh(&pool->sp_lock);
	} else {
		/* choose a pool in round-robin fashion */
		for (i = 0; i < serv->sv_nrpools; i++) {
			pool = &serv->sv_pools[--(*state) % serv->sv_nrpools];
			spin_lock_bh(&pool->sp_lock);
			if (!list_empty(&pool->sp_all_threads))
				goto found_pool;
			spin_unlock_bh(&pool->sp_lock);
		}
		return NULL;
	}

found_pool:
	if (!list_empty(&pool->sp_all_threads)) {
		struct svc_rqst *rqstp;

		/*
		 * Remove from the pool->sp_all_threads list
		 * so we don't try to kill it again.
		 */
		rqstp = list_entry(pool->sp_all_threads.next, struct svc_rqst, rq_all);
		set_bit(RQ_VICTIM, &rqstp->rq_flags);
		list_del_rcu(&rqstp->rq_all);
		task = rqstp->rq_task;
	}
	spin_unlock_bh(&pool->sp_lock);

	return task;
}

/* create new threads */
static int
svc_start_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
	struct svc_rqst	*rqstp;
	struct task_struct *task;
	struct svc_pool *chosen_pool;
	unsigned int state = serv->sv_nrthreads - 1;
	int node;

	do {
		nrservs--;
		chosen_pool = choose_pool(serv, pool, &state);

		node = svc_pool_map_get_node(chosen_pool->sp_id);
		rqstp = svc_prepare_thread(serv, chosen_pool, node);
		if (IS_ERR(rqstp))
			return PTR_ERR(rqstp);

		__module_get(serv->sv_ops->svo_module);
		task = kthread_create_on_node(serv->sv_ops->svo_function, rqstp,
					      node, "%s", serv->sv_name);
		if (IS_ERR(task)) {
			module_put(serv->sv_ops->svo_module);
			svc_exit_thread(rqstp);
			return PTR_ERR(task);
		}

		rqstp->rq_task = task;
		if (serv->sv_nrpools > 1)
			svc_pool_map_set_cpumask(task, chosen_pool->sp_id);

		svc_sock_update_bufs(serv);
		wake_up_process(task);
	} while (nrservs > 0);

	return 0;
}

/* destroy old threads */
static int
svc_signal_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
	struct task_struct *task;
	unsigned int state = serv->sv_nrthreads - 1;

	/* destroy old threads */
	do {
		task = choose_victim(serv, pool, &state);
		if (task == NULL)
			break;
		send_sig(SIGINT, task, 1);
		nrservs++;
	} while (nrservs < 0);

	return 0;
}

/*
 * Create or destroy enough threads to bring the number of threads
 * up or down to the given number.  If `pool' is non-NULL, applies
 * only to threads in that pool, otherwise round-robins between
 * all pools.  Caller must ensure mutual exclusion between this and
 * server startup or shutdown.
 *
 * Destroying threads relies on the service threads filling in
 * rqstp->rq_task, which only the nfs ones do.  Assumes the serv
 * has been created using svc_create_pooled().
 *
 * Based on code that used to be in nfsd_svc() but tweaked
 * to be pool-aware.
 */
int
svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
	if (pool == NULL) {
		/* The -1 assumes caller has done a svc_get() */
		nrservs -= (serv->sv_nrthreads - 1);
	} else {
		spin_lock_bh(&pool->sp_lock);
		nrservs -= pool->sp_nrthreads;
		spin_unlock_bh(&pool->sp_lock);
	}

	if (nrservs > 0)
		return svc_start_kthreads(serv, pool, nrservs);
	if (nrservs < 0)
		return svc_signal_kthreads(serv, pool, nrservs);
	return 0;
}
EXPORT_SYMBOL_GPL(svc_set_num_threads);

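/*
 * Illustrative usage of the thread-count API, assuming a hypothetical
 * "my_program" service built with svc_create_pooled() (a sketch only,
 * not code from this file):
 *
 *	serv = svc_create_pooled(&my_program, bufsize, &my_serv_ops);
 *	if (!serv)
 *		return -ENOMEM;
 *	error = svc_set_num_threads(serv, NULL, nrservs);
 *
 * A negative delta (fewer threads requested than currently running)
 * makes svc_set_num_threads() signal surplus threads with SIGINT
 * instead of starting new kthreads.
 */
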
/* destroy old threads */
static int
svc_stop_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
	struct task_struct *task;
	unsigned int state = serv->sv_nrthreads - 1;

	/* destroy old threads */
	do {
		task = choose_victim(serv, pool, &state);
		if (task == NULL)
			break;
		kthread_stop(task);
		nrservs++;
	} while (nrservs < 0);
	return 0;
}

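/*
 * Like svc_set_num_threads(), but surplus threads are stopped
 * synchronously with kthread_stop() rather than signalled with SIGINT.
 */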
int
svc_set_num_threads_sync(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
	if (pool == NULL) {
		/* The -1 assumes caller has done a svc_get() */
		nrservs -= (serv->sv_nrthreads - 1);
	} else {
		spin_lock_bh(&pool->sp_lock);
		nrservs -= pool->sp_nrthreads;
		spin_unlock_bh(&pool->sp_lock);
	}

	if (nrservs > 0)
		return svc_start_kthreads(serv, pool, nrservs);
	if (nrservs < 0)
		return svc_stop_kthreads(serv, pool, nrservs);
	return 0;
}
EXPORT_SYMBOL_GPL(svc_set_num_threads_sync);

/*
 * Called from a server thread as it's exiting. Caller must hold the "service
 * mutex" for the service.
 */
void
svc_rqst_free(struct svc_rqst *rqstp)
{
	svc_release_buffer(rqstp);
	kfree(rqstp->rq_resp);
	kfree(rqstp->rq_argp);
	kfree(rqstp->rq_auth_data);
	kfree_rcu(rqstp, rq_rcu_head);
}
EXPORT_SYMBOL_GPL(svc_rqst_free);

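/*
 * Remove an exiting thread from its pool and free its svc_rqst.
 * Drops the thread's reference on the service, destroying the service
 * when the last thread exits.
 */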
void
svc_exit_thread(struct svc_rqst *rqstp)
{
	struct svc_serv	*serv = rqstp->rq_server;
	struct svc_pool	*pool = rqstp->rq_pool;

	spin_lock_bh(&pool->sp_lock);
	pool->sp_nrthreads--;
	if (!test_and_set_bit(RQ_VICTIM, &rqstp->rq_flags))
		list_del_rcu(&rqstp->rq_all);
	spin_unlock_bh(&pool->sp_lock);

	svc_rqst_free(rqstp);

	/* Release the server */
	if (serv)
		svc_destroy(serv);
}
EXPORT_SYMBOL_GPL(svc_exit_thread);

/*
 * Register an "inet" protocol family netid with the local
 * rpcbind daemon via an rpcbind v4 SET request.
 *
 * No netconfig infrastructure is available in the kernel, so
 * we map IPPROTO_ numbers to netids by hand.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_rpcb_register4(struct net *net, const u32 program,
				const u32 version,
				const unsigned short protocol,
				const unsigned short port)
{
	const struct sockaddr_in sin = {
		.sin_family		= AF_INET,
		.sin_addr.s_addr	= htonl(INADDR_ANY),
		.sin_port		= htons(port),
	};
	const char *netid;
	int error;

	switch (protocol) {
	case IPPROTO_UDP:
		netid = RPCBIND_NETID_UDP;
		break;
	case IPPROTO_TCP:
		netid = RPCBIND_NETID_TCP;
		break;
	default:
		return -ENOPROTOOPT;
	}

	error = rpcb_v4_register(net, program, version,
					(const struct sockaddr *)&sin, netid);

	/*
	 * User space didn't support rpcbind v4, so retry this
	 * registration request with the legacy rpcbind v2 protocol.
	 */
	if (error == -EPROTONOSUPPORT)
		error = rpcb_register(net, program, version, protocol, port);

	return error;
}

#if IS_ENABLED(CONFIG_IPV6)
/*
 * Register an "inet6" protocol family netid with the local
 * rpcbind daemon via an rpcbind v4 SET request.
 *
 * No netconfig infrastructure is available in the kernel, so
 * we map IPPROTO_ numbers to netids by hand.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_rpcb_register6(struct net *net, const u32 program,
				const u32 version,
				const unsigned short protocol,
				const unsigned short port)
{
	const struct sockaddr_in6 sin6 = {
		.sin6_family		= AF_INET6,
		.sin6_addr		= IN6ADDR_ANY_INIT,
		.sin6_port		= htons(port),
	};
	const char *netid;
	int error;

	switch (protocol) {
	case IPPROTO_UDP:
		netid = RPCBIND_NETID_UDP6;
		break;
	case IPPROTO_TCP:
		netid = RPCBIND_NETID_TCP6;
		break;
	default:
		return -ENOPROTOOPT;
	}

	error = rpcb_v4_register(net, program, version,
					(const struct sockaddr *)&sin6, netid);

	/*
	 * User space didn't support rpcbind version 4, so we won't
	 * use a PF_INET6 listener.
	 */
	if (error == -EPROTONOSUPPORT)
		error = -EAFNOSUPPORT;

	return error;
}
#endif	/* IS_ENABLED(CONFIG_IPV6) */

/*
 * Register a kernel RPC service via rpcbind version 4.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_register(struct net *net, const char *progname,
			  const u32 program, const u32 version,
			  const int family,
			  const unsigned short protocol,
			  const unsigned short port)
{
	int error = -EAFNOSUPPORT;

	switch (family) {
	case PF_INET:
		error = __svc_rpcb_register4(net, program, version,
						protocol, port);
		break;
#if IS_ENABLED(CONFIG_IPV6)
	case PF_INET6:
		error = __svc_rpcb_register6(net, program, version,
						protocol, port);
		break;
#endif
	}

	return error;
}

int svc_rpcbind_set_version(struct net *net,
			    const struct svc_program *progp,
			    u32 version, int family,
			    unsigned short proto,
			    unsigned short port)
{
	dprintk("svc: svc_register(%sv%d, %s, %u, %u)\n",
		progp->pg_name, version,
		proto == IPPROTO_UDP ? "udp" : "tcp",
		port, family);

	return __svc_register(net, progp->pg_name, progp->pg_prog,
				version, family, proto, port);
}
EXPORT_SYMBOL_GPL(svc_rpcbind_set_version);

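/*
 * Default rpcbind_set callback: register one version of a program with
 * rpcbind, honouring vs_hidden, the UDP congestion-control restriction,
 * and the vs_rpcb_optnl "registration is optional" flag.
 */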
int svc_generic_rpcbind_set(struct net *net,
			    const struct svc_program *progp,
			    u32 version, int family,
			    unsigned short proto,
			    unsigned short port)
{
	const struct svc_version *vers = progp->pg_vers[version];
	int error;

	if (vers == NULL)
		return 0;

	if (vers->vs_hidden) {
		dprintk("svc: svc_register(%sv%d, %s, %u, %u)"
			" (but not telling portmap)\n",
			progp->pg_name, version,
			proto == IPPROTO_UDP ? "udp" : "tcp",
			port, family);
		return 0;
	}

	/*
	 * Don't register a UDP port if we need congestion
	 * control.
	 */
	if (vers->vs_need_cong_ctrl && proto == IPPROTO_UDP)
		return 0;

	error = svc_rpcbind_set_version(net, progp, version,
					family, proto, port);

	return (vers->vs_rpcb_optnl) ? 0 : error;
}
EXPORT_SYMBOL_GPL(svc_generic_rpcbind_set);

/**
 * svc_register - register an RPC service with the local portmapper
 * @serv: svc_serv struct for the service to register
 * @net: net namespace for the service to register
 * @family: protocol family of service's listener socket
 * @proto: transport protocol number to advertise
 * @port: port to advertise
 *
 * Service is registered for any address in the passed-in protocol family
 */
int svc_register(const struct svc_serv *serv, struct net *net,
		 const int family, const unsigned short proto,
		 const unsigned short port)
{
	struct svc_program	*progp;
	unsigned int		i;
	int			error = 0;

	WARN_ON_ONCE(proto == 0 && port == 0);
	if (proto == 0 && port == 0)
		return -EINVAL;

	for (progp = serv->sv_program; progp; progp = progp->pg_next) {
		for (i = 0; i < progp->pg_nvers; i++) {
			error = progp->pg_rpcbind_set(net, progp, i,
					family, proto, port);
			if (error < 0) {
				printk(KERN_WARNING "svc: failed to register "
					"%sv%u RPC service (errno %d).\n",
					progp->pg_name, i, -error);
				break;
			}
		}
	}

	return error;
}

/*
 * If user space is running rpcbind, it should take the v4 UNSET
 * and clear everything for this [program, version].  If user space
 * is running portmap, it will reject the v4 UNSET, but won't have
 * any "inet6" entries anyway.  So a PMAP_UNSET should be sufficient
 * in this case to clear all existing entries for [program, version].
 */
static void __svc_unregister(struct net *net, const u32 program, const u32 version,
			     const char *progname)
{
	int error;

	error = rpcb_v4_register(net, program, version, NULL, "");

	/*
	 * User space didn't support rpcbind v4, so retry this
	 * request with the legacy rpcbind v2 protocol.
	 */
	if (error == -EPROTONOSUPPORT)
		error = rpcb_register(net, program, version, 0, 0);

	dprintk("svc: %s(%sv%u), error %d\n",
			__func__, progname, version, error);
}

/*
 * All netids, bind addresses and ports registered for [program, version]
 * are removed from the local rpcbind database (if the service is not
 * hidden) to make way for a new instance of the service.
 *
 * The result of unregistration is reported via dprintk for those who want
 * verification of the result, but is otherwise not important.
 */
static void svc_unregister(const struct svc_serv *serv, struct net *net)
{
	struct svc_program *progp;
	unsigned long flags;
	unsigned int i;

	clear_thread_flag(TIF_SIGPENDING);

	for (progp = serv->sv_program; progp; progp = progp->pg_next) {
		for (i = 0; i < progp->pg_nvers; i++) {
			if (progp->pg_vers[i] == NULL)
				continue;
			if (progp->pg_vers[i]->vs_hidden)
				continue;

			dprintk("svc: attempting to unregister %sv%u\n",
				progp->pg_name, i);
			__svc_unregister(net, progp->pg_prog, i, progp->pg_name);
		}
	}

	spin_lock_irqsave(&current->sighand->siglock, flags);
	recalc_sigpending();
	spin_unlock_irqrestore(&current->sighand->siglock, flags);
}

/*
 * dprintk the given error with the address of the client that caused it.
 */
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
static __printf(2, 3)
void svc_printk(struct svc_rqst *rqstp, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;
	char	buf[RPC_MAX_ADDRBUFLEN];

	va_start(args, fmt);

	vaf.fmt = fmt;
	vaf.va = &args;

	dprintk("svc: %s: %pV", svc_print_addr(rqstp, buf, sizeof(buf)), &vaf);

	va_end(args);
}
#else
static __printf(2,3) void svc_printk(struct svc_rqst *rqstp, const char *fmt, ...) {}
#endif

__be32
svc_return_autherr(struct svc_rqst *rqstp, __be32 auth_err)
{
	set_bit(RQ_AUTHERR, &rqstp->rq_flags);
	return auth_err;
}
EXPORT_SYMBOL_GPL(svc_return_autherr);

static __be32
svc_get_autherr(struct svc_rqst *rqstp, __be32 *statp)
{
	if (test_and_clear_bit(RQ_AUTHERR, &rqstp->rq_flags))
		return *statp;
	return rpc_auth_ok;
}

static int
svc_generic_dispatch(struct svc_rqst *rqstp, __be32 *statp)
{
	struct kvec *argv = &rqstp->rq_arg.head[0];
	struct kvec *resv = &rqstp->rq_res.head[0];
	const struct svc_procedure *procp = rqstp->rq_procinfo;

	/*
	 * Decode arguments
	 * XXX: why do we ignore the return value?
	 */
	if (procp->pc_decode &&
	    !procp->pc_decode(rqstp, argv->iov_base)) {
		*statp = rpc_garbage_args;
		return 1;
	}

	*statp = procp->pc_func(rqstp);

	if (*statp == rpc_drop_reply ||
	    test_bit(RQ_DROPME, &rqstp->rq_flags))
		return 0;

	if (test_bit(RQ_AUTHERR, &rqstp->rq_flags))
		return 1;

	if (*statp != rpc_success)
		return 1;

	/* Encode reply */
	if (procp->pc_encode &&
	    !procp->pc_encode(rqstp, resv->iov_base + resv->iov_len)) {
		dprintk("svc: failed to encode reply\n");
		/* serv->sv_stats->rpcsystemerr++; */
		*statp = rpc_system_err;
	}
	return 1;
}

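/*
 * Default init_request callback: look up the svc_version and
 * svc_procedure for an incoming call, vet them, and fill in the
 * dispatch information in @ret.
 */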
__be32
svc_generic_init_request(struct svc_rqst *rqstp,
		const struct svc_program *progp,
		struct svc_process_info *ret)
{
	const struct svc_version *versp = NULL;	/* compiler food */
	const struct svc_procedure *procp = NULL;

	if (rqstp->rq_vers >= progp->pg_nvers)
		goto err_bad_vers;
	versp = progp->pg_vers[rqstp->rq_vers];
	if (!versp)
		goto err_bad_vers;

	/*
	 * Some protocol versions (namely NFSv4) require some form of
	 * congestion control.  (See RFC 7530 section 3.1 paragraph 2)
	 * In other words, UDP is not allowed. We mark those when setting
	 * up the svc_xprt, and verify that here.
	 *
	 * The spec is not very clear about what error should be returned
	 * when someone tries to access a server that is listening on UDP
	 * for lower versions. RPC_PROG_MISMATCH seems to be the closest
	 * fit.
	 */
	if (versp->vs_need_cong_ctrl && rqstp->rq_xprt &&
	    !test_bit(XPT_CONG_CTRL, &rqstp->rq_xprt->xpt_flags))
		goto err_bad_vers;

	if (rqstp->rq_proc >= versp->vs_nproc)
		goto err_bad_proc;
	rqstp->rq_procinfo = procp = &versp->vs_proc[rqstp->rq_proc];
	if (!procp)
		goto err_bad_proc;

	/* Initialize storage for argp and resp */
	memset(rqstp->rq_argp, 0, procp->pc_argsize);
	memset(rqstp->rq_resp, 0, procp->pc_ressize);

	/* Bump per-procedure stats counter */
	versp->vs_count[rqstp->rq_proc]++;

	ret->dispatch = versp->vs_dispatch;
	return rpc_success;
err_bad_vers:
	ret->mismatch.lovers = progp->pg_lovers;
	ret->mismatch.hivers = progp->pg_hivers;
	return rpc_prog_mismatch;
err_bad_proc:
	return rpc_proc_unavail;
}
EXPORT_SYMBOL_GPL(svc_generic_init_request);

/*
 * Common routine for processing the RPC request.
 */
static int
svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv)
{
	struct svc_program	*progp;
	const struct svc_procedure *procp = NULL;
	struct svc_serv		*serv = rqstp->rq_server;
	struct svc_process_info process;
	__be32			*statp;
	u32			prog, vers;
	__be32			auth_stat, rpc_stat;
	int			auth_res;
	__be32			*reply_statp;

	rpc_stat = rpc_success;

	if (argv->iov_len < 6*4)
		goto err_short_len;

	/* Will be turned off by GSS integrity and privacy services */
	set_bit(RQ_SPLICE_OK, &rqstp->rq_flags);
	/* Will be turned off only when NFSv4 Sessions are used */
	set_bit(RQ_USEDEFERRAL, &rqstp->rq_flags);
	clear_bit(RQ_DROPME, &rqstp->rq_flags);

	svc_putu32(resv, rqstp->rq_xid);

	vers = svc_getnl(argv);

	/* First words of reply: */
	svc_putnl(resv, 1);		/* REPLY */

	if (vers != 2)		/* RPC version number */
		goto err_bad_rpc;

	/* Save position in case we later decide to reject: */
	reply_statp = resv->iov_base + resv->iov_len;

	svc_putnl(resv, 0);		/* ACCEPT */

	rqstp->rq_prog = prog = svc_getnl(argv);	/* program number */
	rqstp->rq_vers = svc_getnl(argv);	/* version number */
	rqstp->rq_proc = svc_getnl(argv);	/* procedure number */

	for (progp = serv->sv_program; progp; progp = progp->pg_next)
		if (prog == progp->pg_prog)
			break;

	/*
	 * Decode auth data, and add verifier to reply buffer.
	 * We do this before anything else in order to get a decent
	 * auth verifier.
	 */
	auth_res = svc_authenticate(rqstp, &auth_stat);
	/* Also give the program a chance to reject this call: */
	if (auth_res == SVC_OK && progp) {
		auth_stat = rpc_autherr_badcred;
		auth_res = progp->pg_authenticate(rqstp);
	}
	switch (auth_res) {
	case SVC_OK:
		break;
	case SVC_GARBAGE:
		goto err_garbage;
	case SVC_SYSERR:
		rpc_stat = rpc_system_err;
		goto err_bad;
	case SVC_DENIED:
		goto err_bad_auth;
	case SVC_CLOSE:
		goto close;
	case SVC_DROP:
		goto dropit;
	case SVC_COMPLETE:
		goto sendit;
	}

	if (progp == NULL)
		goto err_bad_prog;

	rpc_stat = progp->pg_init_request(rqstp, progp, &process);
	switch (rpc_stat) {
	case rpc_success:
		break;
	case rpc_prog_unavail:
		goto err_bad_prog;
	case rpc_prog_mismatch:
		goto err_bad_vers;
	case rpc_proc_unavail:
		goto err_bad_proc;
	}

	procp = rqstp->rq_procinfo;
	/* Should this check go into the dispatcher? */
	if (!procp || !procp->pc_func)
		goto err_bad_proc;

	/* Syntactic check complete */
	serv->sv_stats->rpccnt++;
	trace_svc_process(rqstp, progp->pg_name);

	/* Build the reply header. */
	statp = resv->iov_base + resv->iov_len;
	svc_putnl(resv, RPC_SUCCESS);

	/* un-reserve some of the out-queue now that we have a
	 * better idea of reply size
	 */
	if (procp->pc_xdrressize)
		svc_reserve_auth(rqstp, procp->pc_xdrressize<<2);

	/* Call the function that processes the request. */
	if (!process.dispatch) {
		if (!svc_generic_dispatch(rqstp, statp))
			goto release_dropit;
		if (*statp == rpc_garbage_args)
			goto err_garbage;
		auth_stat = svc_get_autherr(rqstp, statp);
		if (auth_stat != rpc_auth_ok)
			goto err_release_bad_auth;
	} else {
		dprintk("svc: calling dispatcher\n");
		if (!process.dispatch(rqstp, statp))
			goto release_dropit; /* Release reply info */
	}

	/* Check RPC status result */
	if (*statp != rpc_success)
		resv->iov_len = ((void *)statp) - resv->iov_base + 4;

	/* Release reply info */
	if (procp->pc_release)
		procp->pc_release(rqstp);

	if (procp->pc_encode == NULL)
		goto dropit;

 sendit:
	if (svc_authorise(rqstp))
		goto close;
	return 1;		/* Caller can now send it */

release_dropit:
	if (procp->pc_release)
		procp->pc_release(rqstp);
 dropit:
	svc_authorise(rqstp);	/* doesn't hurt to call this twice */
	dprintk("svc: svc_process dropit\n");
	return 0;

 close:
	if (rqstp->rq_xprt && test_bit(XPT_TEMP, &rqstp->rq_xprt->xpt_flags))
		svc_close_xprt(rqstp->rq_xprt);
	dprintk("svc: svc_process close\n");
	return 0;

err_short_len:
	svc_printk(rqstp, "short len %zd, dropping request\n",
			argv->iov_len);
	goto close;

err_bad_rpc:
	serv->sv_stats->rpcbadfmt++;
	svc_putnl(resv, 1);	/* REJECT */
	svc_putnl(resv, 0);	/* RPC_MISMATCH */
	svc_putnl(resv, 2);	/* Only RPCv2 supported */
	svc_putnl(resv, 2);
	goto sendit;

err_release_bad_auth:
	if (procp->pc_release)
		procp->pc_release(rqstp);
err_bad_auth:
	dprintk("svc: authentication failed (%d)\n", ntohl(auth_stat));
	serv->sv_stats->rpcbadauth++;
	/* Restore write pointer to location of accept status: */
	xdr_ressize_check(rqstp, reply_statp);
	svc_putnl(resv, 1);	/* REJECT */
	svc_putnl(resv, 1);	/* AUTH_ERROR */
	svc_putnl(resv, ntohl(auth_stat));	/* status */
	goto sendit;

err_bad_prog:
	dprintk("svc: unknown program %d\n", prog);
	serv->sv_stats->rpcbadfmt++;
	svc_putnl(resv, RPC_PROG_UNAVAIL);
	goto sendit;

err_bad_vers:
	svc_printk(rqstp, "unknown version (%d for prog %d, %s)\n",
		       rqstp->rq_vers, rqstp->rq_prog, progp->pg_name);

	serv->sv_stats->rpcbadfmt++;
	svc_putnl(resv, RPC_PROG_MISMATCH);
	svc_putnl(resv, process.mismatch.lovers);
	svc_putnl(resv, process.mismatch.hivers);
	goto sendit;

err_bad_proc:
	svc_printk(rqstp, "unknown procedure (%d)\n", rqstp->rq_proc);

	serv->sv_stats->rpcbadfmt++;
	svc_putnl(resv, RPC_PROC_UNAVAIL);
	goto sendit;

err_garbage:
	svc_printk(rqstp, "failed to decode args\n");

	rpc_stat = rpc_garbage_args;
err_bad:
	serv->sv_stats->rpcbadfmt++;
	svc_putnl(resv, ntohl(rpc_stat));
	goto sendit;
}

/*
 * Process the RPC request.
 */
int
svc_process(struct svc_rqst *rqstp)
{
	struct kvec		*argv = &rqstp->rq_arg.head[0];
	struct kvec		*resv = &rqstp->rq_res.head[0];
	struct svc_serv		*serv = rqstp->rq_server;
	u32			dir;

	/*
	 * Set up the response xdr_buf.
	 * Initially it has just one page
	 */
	rqstp->rq_next_page = &rqstp->rq_respages[1];
	resv->iov_base = page_address(rqstp->rq_respages[0]);
	resv->iov_len = 0;
	rqstp->rq_res.pages = rqstp->rq_respages + 1;
	rqstp->rq_res.len = 0;
	rqstp->rq_res.page_base = 0;
	rqstp->rq_res.page_len = 0;
	rqstp->rq_res.buflen = PAGE_SIZE;
	rqstp->rq_res.tail[0].iov_base = NULL;
	rqstp->rq_res.tail[0].iov_len = 0;

	dir = svc_getnl(argv);
	if (dir != 0) {
		/* direction != CALL */
		svc_printk(rqstp, "bad direction %d, dropping request\n", dir);
		serv->sv_stats->rpcbadfmt++;
		goto out_drop;
	}

	/* Reserve space for the record marker */
	if (rqstp->rq_prot == IPPROTO_TCP)
		svc_putnl(resv, 0);

	/* Returns 1 for send, 0 for drop */
	if (likely(svc_process_common(rqstp, argv, resv)))
		return svc_send(rqstp);

out_drop:
	svc_drop(rqstp);
	return 0;
}
EXPORT_SYMBOL_GPL(svc_process);

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
/*
 * Process a backchannel RPC request that arrived over an existing
 * outbound connection
 */
int
bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req,
	       struct svc_rqst *rqstp)
{
	struct kvec	*argv = &rqstp->rq_arg.head[0];
	struct kvec	*resv = &rqstp->rq_res.head[0];
	struct rpc_task *task;
	int proc_error;
	int error;

	dprintk("svc: %s(%p)\n", __func__, req);

	/* Build the svc_rqst used by the common processing routine */
	rqstp->rq_xid = req->rq_xid;
	rqstp->rq_prot = req->rq_xprt->prot;
	rqstp->rq_server = serv;
	rqstp->rq_bc_net = req->rq_xprt->xprt_net;

	rqstp->rq_addrlen = sizeof(req->rq_xprt->addr);
	memcpy(&rqstp->rq_addr, &req->rq_xprt->addr, rqstp->rq_addrlen);
	memcpy(&rqstp->rq_arg, &req->rq_rcv_buf, sizeof(rqstp->rq_arg));
	memcpy(&rqstp->rq_res, &req->rq_snd_buf, sizeof(rqstp->rq_res));

	/* Adjust the argument buffer length */
	rqstp->rq_arg.len = req->rq_private_buf.len;
	if (rqstp->rq_arg.len <= rqstp->rq_arg.head[0].iov_len) {
		rqstp->rq_arg.head[0].iov_len = rqstp->rq_arg.len;
		rqstp->rq_arg.page_len = 0;
	} else if (rqstp->rq_arg.len <= rqstp->rq_arg.head[0].iov_len +
			rqstp->rq_arg.page_len)
		rqstp->rq_arg.page_len = rqstp->rq_arg.len -
			rqstp->rq_arg.head[0].iov_len;
	else
		rqstp->rq_arg.len = rqstp->rq_arg.head[0].iov_len +
			rqstp->rq_arg.page_len;

	/* reset result send buffer "put" position */
	resv->iov_len = 0;

	/*
	 * Skip the next two words because they've already been
	 * processed in the transport
	 */
	svc_getu32(argv);	/* XID */
	svc_getnl(argv);	/* CALLDIR */

	/* Parse and execute the bc call */
	proc_error = svc_process_common(rqstp, argv, resv);

	atomic_dec(&req->rq_xprt->bc_slot_count);
	if (!proc_error) {
		/* Processing error: drop the request */
		xprt_free_bc_request(req);
		error = -EINVAL;
		goto out;
	}
	/* Finally, send the reply synchronously */
	memcpy(&req->rq_snd_buf, &rqstp->rq_res, sizeof(req->rq_snd_buf));
	task = rpc_run_bc_task(req);
	if (IS_ERR(task)) {
		error = PTR_ERR(task);
		goto out;
	}

	WARN_ON_ONCE(atomic_read(&task->tk_count) != 1);
	error = task->tk_status;
	rpc_put_task(task);

out:
	dprintk("svc: %s(), error=%d\n", __func__, error);
	return error;
}
EXPORT_SYMBOL_GPL(bc_svc_process);
#endif /* CONFIG_SUNRPC_BACKCHANNEL */

/*
 * Return the (transport-specific) limit on the RPC payload.
 */
u32 svc_max_payload(const struct svc_rqst *rqstp)
{
	u32 max = rqstp->rq_xprt->xpt_class->xcl_max_payload;

	if (rqstp->rq_server->sv_max_payload < max)
		max = rqstp->rq_server->sv_max_payload;
	return max;
}
EXPORT_SYMBOL_GPL(svc_max_payload);

/**
 * svc_fill_write_vector - Construct data argument for VFS write call
 * @rqstp: svc_rqst to operate on
 * @pages: list of pages containing data payload
 * @first: buffer containing first section of write payload
 * @total: total number of bytes of write payload
 *
 * Fills in rqstp::rq_vec, and returns the number of elements.
 */
unsigned int svc_fill_write_vector(struct svc_rqst *rqstp, struct page **pages,
				   struct kvec *first, size_t total)
{
	struct kvec *vec = rqstp->rq_vec;
	unsigned int i;

	/* Some types of transport can present the write payload
	 * entirely in rq_arg.pages. In this case, @first is empty.
	 */
	i = 0;
	if (first->iov_len) {
		vec[i].iov_base = first->iov_base;
		vec[i].iov_len = min_t(size_t, total, first->iov_len);
		total -= vec[i].iov_len;
		++i;
	}

	while (total) {
		vec[i].iov_base = page_address(*pages);
		vec[i].iov_len = min_t(size_t, total, PAGE_SIZE);
		total -= vec[i].iov_len;
		++i;
		++pages;
	}

	WARN_ON_ONCE(i > ARRAY_SIZE(rqstp->rq_vec));
	return i;
}
EXPORT_SYMBOL_GPL(svc_fill_write_vector);

/**
 * svc_fill_symlink_pathname - Construct pathname argument for VFS symlink call
 * @rqstp: svc_rqst to operate on
 * @first: buffer containing first section of pathname
 * @p: buffer containing remaining section of pathname
 * @total: total length of the pathname argument
 *
 * The VFS symlink API demands a NUL-terminated pathname in mapped memory.
 * Returns pointer to a NUL-terminated string, or an ERR_PTR. Caller must free
 * the returned string.
 */
char *svc_fill_symlink_pathname(struct svc_rqst *rqstp, struct kvec *first,
				void *p, size_t total)
{
	size_t len, remaining;
	char *result, *dst;

	result = kmalloc(total + 1, GFP_KERNEL);
	if (!result)
		return ERR_PTR(-ESERVERFAULT);

	dst = result;
	remaining = total;

	len = min_t(size_t, total, first->iov_len);
	if (len) {
		memcpy(dst, first->iov_base, len);
		dst += len;
		remaining -= len;
	}

	if (remaining) {
		len = min_t(size_t, remaining, PAGE_SIZE);
		memcpy(dst, p, len);
		dst += len;
	}

	*dst = '\0';

	/* Sanity check: Linux doesn't allow the pathname argument to
	 * contain a NUL byte.
	 */
	if (strlen(result) != total) {
		kfree(result);
		return ERR_PTR(-EINVAL);
	}
	return result;
}
EXPORT_SYMBOL_GPL(svc_fill_symlink_pathname);
1725