xref: /openbmc/linux/net/sunrpc/svc.c (revision a09d2831)
/*
 * linux/net/sunrpc/svc.c
 *
 * High-level RPC service routines
 *
 * Copyright (C) 1995, 1996 Olaf Kirch <okir@monad.swb.de>
 *
 * Multiple thread pools and NUMAisation
 * Copyright (c) 2006 Silicon Graphics, Inc.
 * by Greg Banks <gnb@melbourne.sgi.com>
 */

#include <linux/linkage.h>
#include <linux/sched.h>
#include <linux/errno.h>
#include <linux/net.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/interrupt.h>
#include <linux/module.h>
#include <linux/kthread.h>

#include <linux/sunrpc/types.h>
#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/stats.h>
#include <linux/sunrpc/svcsock.h>
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/bc_xprt.h>

#define RPCDBG_FACILITY	RPCDBG_SVCDSP

static void svc_unregister(const struct svc_serv *serv);

#define svc_serv_is_pooled(serv)    ((serv)->sv_function)

/*
 * Mode for mapping cpus to pools.
 */
enum {
	SVC_POOL_AUTO = -1,	/* choose one of the others */
	SVC_POOL_GLOBAL,	/* no mapping, just a single global pool
				 * (legacy & UP mode) */
	SVC_POOL_PERCPU,	/* one pool per cpu */
	SVC_POOL_PERNODE	/* one pool per numa node */
};
#define SVC_POOL_DEFAULT	SVC_POOL_GLOBAL

/*
 * Structure for mapping cpus to pools and vice versa.
 * Set up once during sunrpc initialisation.
 */
static struct svc_pool_map {
	int count;			/* How many svc_servs use us */
	int mode;			/* Note: int not enum to avoid
					 * warnings about "enumeration value
					 * not handled in switch" */
	unsigned int npools;
	unsigned int *pool_to;		/* maps pool id to cpu or node */
	unsigned int *to_pool;		/* maps cpu or node to pool id */
} svc_pool_map = {
	.count = 0,
	.mode = SVC_POOL_DEFAULT
};
static DEFINE_MUTEX(svc_pool_map_mutex); /* protects svc_pool_map.count only */

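/*
 * Worked example (illustrative): on a two-node machine where node 0
 * holds cpus {0,1} and node 1 holds cpus {2,3}, SVC_POOL_PERNODE
 * builds npools == 2 with
 *
 *	to_pool[0] = 0, to_pool[1] = 1		(node -> pool id)
 *	pool_to[0] = 0, pool_to[1] = 1		(pool id -> node)
 *
 * while SVC_POOL_PERCPU would build npools == 4, mapping each cpu to
 * a pool of its own.
 */
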
static int
param_set_pool_mode(const char *val, struct kernel_param *kp)
{
	int *ip = (int *)kp->arg;
	struct svc_pool_map *m = &svc_pool_map;
	int err;

	mutex_lock(&svc_pool_map_mutex);

	err = -EBUSY;
	if (m->count)
		goto out;

	err = 0;
	if (!strncmp(val, "auto", 4))
		*ip = SVC_POOL_AUTO;
	else if (!strncmp(val, "global", 6))
		*ip = SVC_POOL_GLOBAL;
	else if (!strncmp(val, "percpu", 6))
		*ip = SVC_POOL_PERCPU;
	else if (!strncmp(val, "pernode", 7))
		*ip = SVC_POOL_PERNODE;
	else
		err = -EINVAL;

out:
	mutex_unlock(&svc_pool_map_mutex);
	return err;
}

static int
param_get_pool_mode(char *buf, struct kernel_param *kp)
{
	int *ip = (int *)kp->arg;

	switch (*ip) {
	case SVC_POOL_AUTO:
		return strlcpy(buf, "auto", 20);
	case SVC_POOL_GLOBAL:
		return strlcpy(buf, "global", 20);
	case SVC_POOL_PERCPU:
		return strlcpy(buf, "percpu", 20);
	case SVC_POOL_PERNODE:
		return strlcpy(buf, "pernode", 20);
	default:
		return sprintf(buf, "%d", *ip);
	}
}

module_param_call(pool_mode, param_set_pool_mode, param_get_pool_mode,
		 &svc_pool_map.mode, 0644);

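/*
 * Example (illustrative): because of the module_param_call() above,
 * the mapping mode is selectable as the sunrpc module parameter
 * "pool_mode", e.g. at module load time
 *
 *	modprobe sunrpc pool_mode=pernode
 *
 * or later, while no pooled service holds a map reference, via
 * /sys/module/sunrpc/parameters/pool_mode; once a service is using
 * the map, param_set_pool_mode() above returns -EBUSY.
 */
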
/*
 * Detect best pool mapping mode heuristically,
 * according to the machine's topology.
 */
static int
svc_pool_map_choose_mode(void)
{
	unsigned int node;

	if (nr_online_nodes > 1) {
		/*
		 * Actually have multiple NUMA nodes,
		 * so split pools on NUMA node boundaries
		 */
		return SVC_POOL_PERNODE;
	}

	node = any_online_node(node_online_map);
	if (nr_cpus_node(node) > 2) {
		/*
		 * Non-trivial SMP, or CONFIG_NUMA on
		 * non-NUMA hardware, e.g. with a generic
		 * x86_64 kernel on Xeons.  In this case we
		 * want to divide the pools on cpu boundaries.
		 */
		return SVC_POOL_PERCPU;
	}

	/* default: one global pool */
	return SVC_POOL_GLOBAL;
}

/*
 * Allocate the to_pool[] and pool_to[] arrays.
 * Returns 0 on success or an errno.
 */
static int
svc_pool_map_alloc_arrays(struct svc_pool_map *m, unsigned int maxpools)
{
	m->to_pool = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
	if (!m->to_pool)
		goto fail;
	m->pool_to = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
	if (!m->pool_to)
		goto fail_free;

	return 0;

fail_free:
	kfree(m->to_pool);
fail:
	return -ENOMEM;
}

/*
 * Initialise the pool map for SVC_POOL_PERCPU mode.
 * Returns number of pools or <0 on error.
 */
static int
svc_pool_map_init_percpu(struct svc_pool_map *m)
{
	unsigned int maxpools = nr_cpu_ids;
	unsigned int pidx = 0;
	unsigned int cpu;
	int err;

	err = svc_pool_map_alloc_arrays(m, maxpools);
	if (err)
		return err;

	for_each_online_cpu(cpu) {
		BUG_ON(pidx > maxpools);
		m->to_pool[cpu] = pidx;
		m->pool_to[pidx] = cpu;
		pidx++;
	}
	/* cpus brought online later all get mapped to pool0, sorry */

	return pidx;
}


/*
 * Initialise the pool map for SVC_POOL_PERNODE mode.
 * Returns number of pools or <0 on error.
 */
static int
svc_pool_map_init_pernode(struct svc_pool_map *m)
{
	unsigned int maxpools = nr_node_ids;
	unsigned int pidx = 0;
	unsigned int node;
	int err;

	err = svc_pool_map_alloc_arrays(m, maxpools);
	if (err)
		return err;

	for_each_node_with_cpus(node) {
		/* some architectures (e.g. SN2) have cpuless nodes */
		BUG_ON(pidx > maxpools);
		m->to_pool[node] = pidx;
		m->pool_to[pidx] = node;
		pidx++;
	}
	/* nodes brought online later all get mapped to pool0, sorry */

	return pidx;
}


/*
 * Add a reference to the global map of cpus to pools (and
 * vice versa).  Initialise the map if we're the first user.
 * Returns the number of pools.
 */
static unsigned int
svc_pool_map_get(void)
{
	struct svc_pool_map *m = &svc_pool_map;
	int npools = -1;

	mutex_lock(&svc_pool_map_mutex);

	if (m->count++) {
		mutex_unlock(&svc_pool_map_mutex);
		return m->npools;
	}

	if (m->mode == SVC_POOL_AUTO)
		m->mode = svc_pool_map_choose_mode();

	switch (m->mode) {
	case SVC_POOL_PERCPU:
		npools = svc_pool_map_init_percpu(m);
		break;
	case SVC_POOL_PERNODE:
		npools = svc_pool_map_init_pernode(m);
		break;
	}

	if (npools < 0) {
		/* default, or memory allocation failure */
		npools = 1;
		m->mode = SVC_POOL_GLOBAL;
	}
	m->npools = npools;

	mutex_unlock(&svc_pool_map_mutex);
	return m->npools;
}


/*
 * Drop a reference to the global map of cpus to pools.
 * When the last reference is dropped, the map data is
 * freed; this allows the sysadmin to change the pool
 * mode using the pool_mode module option without
 * rebooting or re-loading sunrpc.ko.
 */
static void
svc_pool_map_put(void)
{
	struct svc_pool_map *m = &svc_pool_map;

	mutex_lock(&svc_pool_map_mutex);

	if (!--m->count) {
		m->mode = SVC_POOL_DEFAULT;
		kfree(m->to_pool);
		kfree(m->pool_to);
		m->npools = 0;
	}

	mutex_unlock(&svc_pool_map_mutex);
}

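/*
 * Example (illustrative; assumes an nfsd-style consumer): this
 * refcounting is what lets an administrator switch modes without
 * rebooting, e.g.
 *
 *	# rpc.nfsd 0		(the final svc_destroy drops the map)
 *	# echo pernode > /sys/module/sunrpc/parameters/pool_mode
 *	# rpc.nfsd 8		(pools are rebuilt in the new mode)
 */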

/*
 * Set the given thread's cpus_allowed mask so that it
 * will only run on cpus in the given pool.
 */
static inline void
svc_pool_map_set_cpumask(struct task_struct *task, unsigned int pidx)
{
	struct svc_pool_map *m = &svc_pool_map;
	unsigned int node = m->pool_to[pidx];

	/*
	 * The caller checks for sv_nrpools > 1, which
	 * implies that we've been initialized.
	 */
	BUG_ON(m->count == 0);

	switch (m->mode) {
	case SVC_POOL_PERCPU:
		set_cpus_allowed_ptr(task, cpumask_of(node));
		break;
	case SVC_POOL_PERNODE:
		set_cpus_allowed_ptr(task, cpumask_of_node(node));
		break;
	}
}

/*
 * Use the mapping mode to choose a pool for a given CPU.
 * Used when enqueueing an incoming RPC.  Always returns
 * a non-NULL pool pointer.
 */
struct svc_pool *
svc_pool_for_cpu(struct svc_serv *serv, int cpu)
{
	struct svc_pool_map *m = &svc_pool_map;
	unsigned int pidx = 0;

	/*
	 * An uninitialised map happens in a pure client when
	 * lockd is brought up, so silently treat it the
	 * same as SVC_POOL_GLOBAL.
	 */
	if (svc_serv_is_pooled(serv)) {
		switch (m->mode) {
		case SVC_POOL_PERCPU:
			pidx = m->to_pool[cpu];
			break;
		case SVC_POOL_PERNODE:
			pidx = m->to_pool[cpu_to_node(cpu)];
			break;
		}
	}
	return &serv->sv_pools[pidx % serv->sv_nrpools];
}

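/*
 * Sketch of a typical caller (illustrative; modelled on the transport
 * enqueue path): the pool is chosen for whichever cpu is handling the
 * incoming data, e.g.
 *
 *	cpu = get_cpu();
 *	pool = svc_pool_for_cpu(xprt->xpt_server, cpu);
 *	put_cpu();
 *
 * so requests tend to be served by threads bound near the cpu that
 * received the traffic.
 */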

/*
 * Create an RPC service
 */
static struct svc_serv *
__svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
	     void (*shutdown)(struct svc_serv *serv))
{
	struct svc_serv	*serv;
	unsigned int vers;
	unsigned int xdrsize;
	unsigned int i;

	if (!(serv = kzalloc(sizeof(*serv), GFP_KERNEL)))
		return NULL;
	serv->sv_name      = prog->pg_name;
	serv->sv_program   = prog;
	serv->sv_nrthreads = 1;
	serv->sv_stats     = prog->pg_stats;
	if (bufsize > RPCSVC_MAXPAYLOAD)
		bufsize = RPCSVC_MAXPAYLOAD;
	serv->sv_max_payload = bufsize ? bufsize : 4096;
	serv->sv_max_mesg  = roundup(serv->sv_max_payload + PAGE_SIZE, PAGE_SIZE);
	serv->sv_shutdown  = shutdown;
	xdrsize = 0;
	while (prog) {
		prog->pg_lovers = prog->pg_nvers-1;
		for (vers = 0; vers < prog->pg_nvers; vers++)
			if (prog->pg_vers[vers]) {
				prog->pg_hivers = vers;
				if (prog->pg_lovers > vers)
					prog->pg_lovers = vers;
				if (prog->pg_vers[vers]->vs_xdrsize > xdrsize)
					xdrsize = prog->pg_vers[vers]->vs_xdrsize;
			}
		prog = prog->pg_next;
	}
	serv->sv_xdrsize   = xdrsize;
	INIT_LIST_HEAD(&serv->sv_tempsocks);
	INIT_LIST_HEAD(&serv->sv_permsocks);
	init_timer(&serv->sv_temptimer);
	spin_lock_init(&serv->sv_lock);

	serv->sv_nrpools = npools;
	serv->sv_pools =
		kcalloc(serv->sv_nrpools, sizeof(struct svc_pool),
			GFP_KERNEL);
	if (!serv->sv_pools) {
		kfree(serv);
		return NULL;
	}

	for (i = 0; i < serv->sv_nrpools; i++) {
		struct svc_pool *pool = &serv->sv_pools[i];

		dprintk("svc: initialising pool %u for %s\n",
				i, serv->sv_name);

		pool->sp_id = i;
		INIT_LIST_HEAD(&pool->sp_threads);
		INIT_LIST_HEAD(&pool->sp_sockets);
		INIT_LIST_HEAD(&pool->sp_all_threads);
		spin_lock_init(&pool->sp_lock);
	}

	/* Remove any stale portmap registrations */
	svc_unregister(serv);

	return serv;
}

struct svc_serv *
svc_create(struct svc_program *prog, unsigned int bufsize,
	   void (*shutdown)(struct svc_serv *serv))
{
	return __svc_create(prog, bufsize, /*npools*/1, shutdown);
}
EXPORT_SYMBOL_GPL(svc_create);

struct svc_serv *
svc_create_pooled(struct svc_program *prog, unsigned int bufsize,
		  void (*shutdown)(struct svc_serv *serv),
		  svc_thread_fn func, struct module *mod)
{
	struct svc_serv *serv;
	unsigned int npools = svc_pool_map_get();

	serv = __svc_create(prog, bufsize, npools, shutdown);

	if (serv != NULL) {
		serv->sv_function = func;
		serv->sv_module = mod;
	}

	return serv;
}
EXPORT_SYMBOL_GPL(svc_create_pooled);

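/*
 * Usage sketch (hypothetical caller, for illustration): a pooled
 * service is typically created and then sized, e.g.
 *
 *	serv = svc_create_pooled(&my_program, my_bufsize, my_shutdown,
 *				 my_thread_fn, THIS_MODULE);
 *	if (serv == NULL)
 *		return -ENOMEM;
 *	error = svc_set_num_threads(serv, NULL, nrservs);
 *
 * where my_thread_fn is the svc_thread_fn each kthread will run and
 * my_shutdown is invoked from svc_destroy().
 */
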
/*
 * Destroy an RPC service. Should be called with appropriate locking to
 * protect the sv_nrthreads, sv_permsocks and sv_tempsocks.
 */
void
svc_destroy(struct svc_serv *serv)
{
	dprintk("svc: svc_destroy(%s, %d)\n",
				serv->sv_program->pg_name,
				serv->sv_nrthreads);

	if (serv->sv_nrthreads) {
		if (--(serv->sv_nrthreads) != 0) {
			svc_sock_update_bufs(serv);
			return;
		}
	} else
		printk(KERN_ERR "svc_destroy: no threads for serv=%p!\n", serv);

	del_timer_sync(&serv->sv_temptimer);

	svc_close_all(&serv->sv_tempsocks);

	if (serv->sv_shutdown)
		serv->sv_shutdown(serv);

	svc_close_all(&serv->sv_permsocks);

	BUG_ON(!list_empty(&serv->sv_permsocks));
	BUG_ON(!list_empty(&serv->sv_tempsocks));

	cache_clean_deferred(serv);

	if (svc_serv_is_pooled(serv))
		svc_pool_map_put();

#if defined(CONFIG_NFS_V4_1)
	svc_sock_destroy(serv->bc_xprt);
#endif /* CONFIG_NFS_V4_1 */

	svc_unregister(serv);
	kfree(serv->sv_pools);
	kfree(serv);
}
EXPORT_SYMBOL_GPL(svc_destroy);

/*
 * Allocate an RPC server's buffer space.
 * We allocate pages and place them in rq_argpages.
 */
static int
svc_init_buffer(struct svc_rqst *rqstp, unsigned int size)
{
	unsigned int pages, arghi;

	pages = size / PAGE_SIZE + 1; /* extra page as we hold both request
				       * and reply; we assume the smaller of
				       * the two fits in one page (e.g. with
				       * 4K pages, a 36K max message needs
				       * 10 pages)
				       */
	arghi = 0;
	BUG_ON(pages > RPCSVC_MAXPAGES);
	while (pages) {
		struct page *p = alloc_page(GFP_KERNEL);
		if (!p)
			break;
		rqstp->rq_pages[arghi++] = p;
		pages--;
	}
	return pages == 0;
}

/*
 * Release an RPC server buffer
 */
static void
svc_release_buffer(struct svc_rqst *rqstp)
{
	unsigned int i;

	for (i = 0; i < ARRAY_SIZE(rqstp->rq_pages); i++)
		if (rqstp->rq_pages[i])
			put_page(rqstp->rq_pages[i]);
}

struct svc_rqst *
svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool)
{
	struct svc_rqst	*rqstp;

	rqstp = kzalloc(sizeof(*rqstp), GFP_KERNEL);
	if (!rqstp)
		goto out_enomem;

	init_waitqueue_head(&rqstp->rq_wait);

	serv->sv_nrthreads++;
	spin_lock_bh(&pool->sp_lock);
	pool->sp_nrthreads++;
	list_add(&rqstp->rq_all, &pool->sp_all_threads);
	spin_unlock_bh(&pool->sp_lock);
	rqstp->rq_server = serv;
	rqstp->rq_pool = pool;

	rqstp->rq_argp = kmalloc(serv->sv_xdrsize, GFP_KERNEL);
	if (!rqstp->rq_argp)
		goto out_thread;

	rqstp->rq_resp = kmalloc(serv->sv_xdrsize, GFP_KERNEL);
	if (!rqstp->rq_resp)
		goto out_thread;

	if (!svc_init_buffer(rqstp, serv->sv_max_mesg))
		goto out_thread;

	return rqstp;
out_thread:
	svc_exit_thread(rqstp);
out_enomem:
	return ERR_PTR(-ENOMEM);
}
EXPORT_SYMBOL_GPL(svc_prepare_thread);

/*
 * Choose a pool in which to create a new thread, for svc_set_num_threads
 */
static inline struct svc_pool *
choose_pool(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
{
	if (pool != NULL)
		return pool;

	return &serv->sv_pools[(*state)++ % serv->sv_nrpools];
}

/*
 * Choose a thread to kill, for svc_set_num_threads
 */
static inline struct task_struct *
choose_victim(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
{
	unsigned int i;
	struct task_struct *task = NULL;

	if (pool != NULL) {
		spin_lock_bh(&pool->sp_lock);
	} else {
		/* choose a pool in round-robin fashion */
		for (i = 0; i < serv->sv_nrpools; i++) {
			pool = &serv->sv_pools[--(*state) % serv->sv_nrpools];
			spin_lock_bh(&pool->sp_lock);
			if (!list_empty(&pool->sp_all_threads))
				goto found_pool;
			spin_unlock_bh(&pool->sp_lock);
		}
		return NULL;
	}

found_pool:
	if (!list_empty(&pool->sp_all_threads)) {
		struct svc_rqst *rqstp;

		/*
		 * Remove from the pool->sp_all_threads list
		 * so we don't try to kill it again.
		 */
		rqstp = list_entry(pool->sp_all_threads.next, struct svc_rqst, rq_all);
		list_del_init(&rqstp->rq_all);
		task = rqstp->rq_task;
	}
	spin_unlock_bh(&pool->sp_lock);

	return task;
}

/*
 * Create or destroy threads as necessary to bring the number of
 * threads to the given value.  If `pool' is non-NULL, applies
 * only to threads in that pool, otherwise round-robins between
 * all pools.  Must be called with a svc_get() reference and
 * the BKL or another lock to protect access to svc_serv fields.
 *
 * Destroying threads relies on the service threads filling in
 * rqstp->rq_task, which only the nfs ones do.  Assumes the serv
 * has been created using svc_create_pooled().
 *
 * Based on code that used to be in nfsd_svc() but tweaked
 * to be pool-aware.
 */
int
svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
	struct svc_rqst	*rqstp;
	struct task_struct *task;
	struct svc_pool *chosen_pool;
	int error = 0;
	unsigned int state = serv->sv_nrthreads-1;

	if (pool == NULL) {
		/* The -1 assumes caller has done a svc_get() */
		nrservs -= (serv->sv_nrthreads-1);
	} else {
		spin_lock_bh(&pool->sp_lock);
		nrservs -= pool->sp_nrthreads;
		spin_unlock_bh(&pool->sp_lock);
	}

	/* create new threads */
	while (nrservs > 0) {
		nrservs--;
		chosen_pool = choose_pool(serv, pool, &state);

		rqstp = svc_prepare_thread(serv, chosen_pool);
		if (IS_ERR(rqstp)) {
			error = PTR_ERR(rqstp);
			break;
		}

		__module_get(serv->sv_module);
		task = kthread_create(serv->sv_function, rqstp, serv->sv_name);
		if (IS_ERR(task)) {
			error = PTR_ERR(task);
			module_put(serv->sv_module);
			svc_exit_thread(rqstp);
			break;
		}

		rqstp->rq_task = task;
		if (serv->sv_nrpools > 1)
			svc_pool_map_set_cpumask(task, chosen_pool->sp_id);

		svc_sock_update_bufs(serv);
		wake_up_process(task);
	}
	/* destroy old threads */
	while (nrservs < 0 &&
	       (task = choose_victim(serv, pool, &state)) != NULL) {
		send_sig(SIGINT, task, 1);
		nrservs++;
	}

	return error;
}
EXPORT_SYMBOL_GPL(svc_set_num_threads);

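/*
 * Example (hypothetical caller): to run exactly eight threads spread
 * round-robin across all pools, a service holding a svc_get()
 * reference and the appropriate lock would call
 *
 *	error = svc_set_num_threads(serv, NULL, 8);
 *
 * and to shrink just the first pool to two threads:
 *
 *	error = svc_set_num_threads(serv, &serv->sv_pools[0], 2);
 */
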
/*
 * Called from a server thread as it's exiting. Caller must hold the BKL or
 * the "service mutex", whichever is appropriate for the service.
 */
void
svc_exit_thread(struct svc_rqst *rqstp)
{
	struct svc_serv	*serv = rqstp->rq_server;
	struct svc_pool	*pool = rqstp->rq_pool;

	svc_release_buffer(rqstp);
	kfree(rqstp->rq_resp);
	kfree(rqstp->rq_argp);
	kfree(rqstp->rq_auth_data);

	spin_lock_bh(&pool->sp_lock);
	pool->sp_nrthreads--;
	list_del(&rqstp->rq_all);
	spin_unlock_bh(&pool->sp_lock);

	kfree(rqstp);

	/* Release the server */
	if (serv)
		svc_destroy(serv);
}
EXPORT_SYMBOL_GPL(svc_exit_thread);

/*
 * Register an "inet" protocol family netid with the local
 * rpcbind daemon via an rpcbind v4 SET request.
 *
 * No netconfig infrastructure is available in the kernel, so
 * we map IP_ protocol numbers to netids by hand.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_rpcb_register4(const u32 program, const u32 version,
				const unsigned short protocol,
				const unsigned short port)
{
	const struct sockaddr_in sin = {
		.sin_family		= AF_INET,
		.sin_addr.s_addr	= htonl(INADDR_ANY),
		.sin_port		= htons(port),
	};
	const char *netid;
	int error;

	switch (protocol) {
	case IPPROTO_UDP:
		netid = RPCBIND_NETID_UDP;
		break;
	case IPPROTO_TCP:
		netid = RPCBIND_NETID_TCP;
		break;
	default:
		return -ENOPROTOOPT;
	}

	error = rpcb_v4_register(program, version,
					(const struct sockaddr *)&sin, netid);

	/*
	 * User space didn't support rpcbind v4, so retry this
	 * registration request with the legacy rpcbind v2 protocol.
	 */
	if (error == -EPROTONOSUPPORT)
		error = rpcb_register(program, version, protocol, port);

	return error;
}

#if defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE)
/*
 * Register an "inet6" protocol family netid with the local
 * rpcbind daemon via an rpcbind v4 SET request.
 *
 * No netconfig infrastructure is available in the kernel, so
 * we map IP_ protocol numbers to netids by hand.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_rpcb_register6(const u32 program, const u32 version,
				const unsigned short protocol,
				const unsigned short port)
{
	const struct sockaddr_in6 sin6 = {
		.sin6_family		= AF_INET6,
		.sin6_addr		= IN6ADDR_ANY_INIT,
		.sin6_port		= htons(port),
	};
	const char *netid;
	int error;

	switch (protocol) {
	case IPPROTO_UDP:
		netid = RPCBIND_NETID_UDP6;
		break;
	case IPPROTO_TCP:
		netid = RPCBIND_NETID_TCP6;
		break;
	default:
		return -ENOPROTOOPT;
	}

	error = rpcb_v4_register(program, version,
					(const struct sockaddr *)&sin6, netid);

	/*
	 * User space didn't support rpcbind version 4, so we won't
	 * use a PF_INET6 listener.
	 */
	if (error == -EPROTONOSUPPORT)
		error = -EAFNOSUPPORT;

	return error;
}
#endif	/* defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE) */

/*
 * Register a kernel RPC service via rpcbind version 4.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_register(const char *progname,
			  const u32 program, const u32 version,
			  const int family,
			  const unsigned short protocol,
			  const unsigned short port)
{
	int error = -EAFNOSUPPORT;

	switch (family) {
	case PF_INET:
		error = __svc_rpcb_register4(program, version,
						protocol, port);
		break;
#if defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE)
	case PF_INET6:
		error = __svc_rpcb_register6(program, version,
						protocol, port);
		break;
#endif	/* defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE) */
	}

	if (error < 0)
		printk(KERN_WARNING "svc: failed to register %sv%u RPC "
			"service (errno %d).\n", progname, version, -error);
	return error;
}

/**
 * svc_register - register an RPC service with the local portmapper
 * @serv: svc_serv struct for the service to register
 * @family: protocol family of service's listener socket
 * @proto: transport protocol number to advertise
 * @port: port to advertise
 *
 * Service is registered for any address in the passed-in protocol family
 */
int svc_register(const struct svc_serv *serv, const int family,
		 const unsigned short proto, const unsigned short port)
{
	struct svc_program	*progp;
	unsigned int		i;
	int			error = 0;

	BUG_ON(proto == 0 && port == 0);

	for (progp = serv->sv_program; progp; progp = progp->pg_next) {
		for (i = 0; i < progp->pg_nvers; i++) {
			if (progp->pg_vers[i] == NULL)
				continue;

			dprintk("svc: svc_register(%sv%d, %s, %u, %u)%s\n",
					progp->pg_name,
					i,
					proto == IPPROTO_UDP ? "udp" : "tcp",
					port,
					family,
					progp->pg_vers[i]->vs_hidden ?
						" (but not telling portmap)" : "");

			if (progp->pg_vers[i]->vs_hidden)
				continue;

			error = __svc_register(progp->pg_name, progp->pg_prog,
						i, family, proto, port);
			if (error < 0)
				break;
		}
	}

	return error;
}

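/*
 * Example (hypothetical values, for illustration): an NFS-style
 * service listening on TCP port 2049 would advertise all of its
 * non-hidden program versions with
 *
 *	error = svc_register(serv, PF_INET, IPPROTO_TCP, 2049);
 */
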
/*
 * If user space is running rpcbind, it should take the v4 UNSET
 * and clear everything for this [program, version].  If user space
 * is running portmap, it will reject the v4 UNSET, but won't have
 * any "inet6" entries anyway.  So a PMAP_UNSET should be sufficient
 * in this case to clear all existing entries for [program, version].
 */
static void __svc_unregister(const u32 program, const u32 version,
			     const char *progname)
{
	int error;

	error = rpcb_v4_register(program, version, NULL, "");

	/*
	 * User space didn't support rpcbind v4, so retry this
	 * request with the legacy rpcbind v2 protocol.
	 */
	if (error == -EPROTONOSUPPORT)
		error = rpcb_register(program, version, 0, 0);

	dprintk("svc: %s(%sv%u), error %d\n",
			__func__, progname, version, error);
}

/*
 * All netids, bind addresses and ports registered for [program, version]
 * are removed from the local rpcbind database (if the service is not
 * hidden) to make way for a new instance of the service.
 *
 * The result of unregistration is reported via dprintk for those who want
 * verification of the result, but is otherwise not important.
 */
static void svc_unregister(const struct svc_serv *serv)
{
	struct svc_program *progp;
	unsigned long flags;
	unsigned int i;

	clear_thread_flag(TIF_SIGPENDING);

	for (progp = serv->sv_program; progp; progp = progp->pg_next) {
		for (i = 0; i < progp->pg_nvers; i++) {
			if (progp->pg_vers[i] == NULL)
				continue;
			if (progp->pg_vers[i]->vs_hidden)
				continue;

			__svc_unregister(progp->pg_prog, i, progp->pg_name);
		}
	}

	spin_lock_irqsave(&current->sighand->siglock, flags);
	recalc_sigpending();
	spin_unlock_irqrestore(&current->sighand->siglock, flags);
}

/*
 * Printk the given error with the address of the client that caused it.
 */
static int
__attribute__ ((format (printf, 2, 3)))
svc_printk(struct svc_rqst *rqstp, const char *fmt, ...)
{
	va_list args;
	int	r;
	char	buf[RPC_MAX_ADDRBUFLEN];

	if (!net_ratelimit())
		return 0;

	printk(KERN_WARNING "svc: %s: ",
		svc_print_addr(rqstp, buf, sizeof(buf)));

	va_start(args, fmt);
	r = vprintk(fmt, args);
	va_end(args);

	return r;
}

/*
 * Common routine for processing the RPC request.
 */
static int
svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv)
{
	struct svc_program	*progp;
	struct svc_version	*versp = NULL;	/* compiler food */
	struct svc_procedure	*procp = NULL;
	struct svc_serv		*serv = rqstp->rq_server;
	kxdrproc_t		xdr;
	__be32			*statp;
	u32			prog, vers, proc;
	__be32			auth_stat, rpc_stat;
	int			auth_res;
	__be32			*reply_statp;

	rpc_stat = rpc_success;

	if (argv->iov_len < 6*4)
		goto err_short_len;

	/* Will be turned off only in gss privacy case: */
	rqstp->rq_splice_ok = 1;
	/* Will be turned off only when NFSv4 Sessions are used */
	rqstp->rq_usedeferral = 1;

	/* Setup reply header */
	rqstp->rq_xprt->xpt_ops->xpo_prep_reply_hdr(rqstp);

	svc_putu32(resv, rqstp->rq_xid);

	vers = svc_getnl(argv);

	/* First words of reply: */
	svc_putnl(resv, 1);		/* REPLY */

	if (vers != 2)		/* RPC version number */
		goto err_bad_rpc;

	/* Save position in case we later decide to reject: */
	reply_statp = resv->iov_base + resv->iov_len;

	svc_putnl(resv, 0);		/* ACCEPT */

	rqstp->rq_prog = prog = svc_getnl(argv);	/* program number */
	rqstp->rq_vers = vers = svc_getnl(argv);	/* version number */
	rqstp->rq_proc = proc = svc_getnl(argv);	/* procedure number */

	for (progp = serv->sv_program; progp; progp = progp->pg_next)
		if (prog == progp->pg_prog)
			break;

	/*
	 * Decode auth data, and add verifier to reply buffer.
	 * We do this before anything else in order to get a decent
	 * auth verifier.
	 */
	auth_res = svc_authenticate(rqstp, &auth_stat);
	/* Also give the program a chance to reject this call: */
	if (auth_res == SVC_OK && progp) {
		auth_stat = rpc_autherr_badcred;
		auth_res = progp->pg_authenticate(rqstp);
	}
	switch (auth_res) {
	case SVC_OK:
		break;
	case SVC_GARBAGE:
		goto err_garbage;
	case SVC_SYSERR:
		rpc_stat = rpc_system_err;
		goto err_bad;
	case SVC_DENIED:
		goto err_bad_auth;
	case SVC_DROP:
		goto dropit;
	case SVC_COMPLETE:
		goto sendit;
	}

	if (progp == NULL)
		goto err_bad_prog;

	if (vers >= progp->pg_nvers ||
	    !(versp = progp->pg_vers[vers]))
		goto err_bad_vers;

	procp = versp->vs_proc + proc;
	if (proc >= versp->vs_nproc || !procp->pc_func)
		goto err_bad_proc;
	rqstp->rq_procinfo = procp;

	/* Syntactic check complete */
	serv->sv_stats->rpccnt++;

	/* Build the reply header. */
	statp = resv->iov_base + resv->iov_len;
	svc_putnl(resv, RPC_SUCCESS);

	/* Bump per-procedure stats counter */
	procp->pc_count++;

	/* Initialize storage for argp and resp */
	memset(rqstp->rq_argp, 0, procp->pc_argsize);
	memset(rqstp->rq_resp, 0, procp->pc_ressize);

	/* un-reserve some of the out-queue now that we have a
	 * better idea of reply size
	 */
	if (procp->pc_xdrressize)
		svc_reserve_auth(rqstp, procp->pc_xdrressize<<2);

	/* Call the function that processes the request. */
	if (!versp->vs_dispatch) {
		/* Decode arguments */
		xdr = procp->pc_decode;
		if (xdr && !xdr(rqstp, argv->iov_base, rqstp->rq_argp))
			goto err_garbage;

		*statp = procp->pc_func(rqstp, rqstp->rq_argp, rqstp->rq_resp);

		/* Encode reply */
		if (*statp == rpc_drop_reply) {
			if (procp->pc_release)
				procp->pc_release(rqstp, NULL, rqstp->rq_resp);
			goto dropit;
		}
		if (*statp == rpc_success &&
		    (xdr = procp->pc_encode) &&
		    !xdr(rqstp, resv->iov_base+resv->iov_len, rqstp->rq_resp)) {
			dprintk("svc: failed to encode reply\n");
			/* serv->sv_stats->rpcsystemerr++; */
			*statp = rpc_system_err;
		}
	} else {
		dprintk("svc: calling dispatcher\n");
		if (!versp->vs_dispatch(rqstp, statp)) {
			/* Release reply info */
			if (procp->pc_release)
				procp->pc_release(rqstp, NULL, rqstp->rq_resp);
			goto dropit;
		}
	}

	/* Check RPC status result */
	if (*statp != rpc_success)
		resv->iov_len = ((void*)statp) - resv->iov_base + 4;

	/* Release reply info */
	if (procp->pc_release)
		procp->pc_release(rqstp, NULL, rqstp->rq_resp);

	if (procp->pc_encode == NULL)
		goto dropit;

 sendit:
	if (svc_authorise(rqstp))
		goto dropit;
	return 1;		/* Caller can now send it */

 dropit:
	svc_authorise(rqstp);	/* doesn't hurt to call this twice */
	dprintk("svc: svc_process dropit\n");
	svc_drop(rqstp);
	return 0;

err_short_len:
	svc_printk(rqstp, "short len %Zd, dropping request\n",
			argv->iov_len);

	goto dropit;			/* drop request */

err_bad_rpc:
	serv->sv_stats->rpcbadfmt++;
	svc_putnl(resv, 1);	/* REJECT */
	svc_putnl(resv, 0);	/* RPC_MISMATCH */
	svc_putnl(resv, 2);	/* Only RPCv2 supported */
	svc_putnl(resv, 2);
	goto sendit;

err_bad_auth:
	dprintk("svc: authentication failed (%d)\n", ntohl(auth_stat));
	serv->sv_stats->rpcbadauth++;
	/* Restore write pointer to location of accept status: */
	xdr_ressize_check(rqstp, reply_statp);
	svc_putnl(resv, 1);	/* REJECT */
	svc_putnl(resv, 1);	/* AUTH_ERROR */
	svc_putnl(resv, ntohl(auth_stat));	/* status */
	goto sendit;

err_bad_prog:
	dprintk("svc: unknown program %d\n", prog);
	serv->sv_stats->rpcbadfmt++;
	svc_putnl(resv, RPC_PROG_UNAVAIL);
	goto sendit;

err_bad_vers:
	svc_printk(rqstp, "unknown version (%d for prog %d, %s)\n",
		       vers, prog, progp->pg_name);

	serv->sv_stats->rpcbadfmt++;
	svc_putnl(resv, RPC_PROG_MISMATCH);
	svc_putnl(resv, progp->pg_lovers);
	svc_putnl(resv, progp->pg_hivers);
	goto sendit;

err_bad_proc:
	svc_printk(rqstp, "unknown procedure (%d)\n", proc);

	serv->sv_stats->rpcbadfmt++;
	svc_putnl(resv, RPC_PROC_UNAVAIL);
	goto sendit;

err_garbage:
	svc_printk(rqstp, "failed to decode args\n");

	rpc_stat = rpc_garbage_args;
err_bad:
	serv->sv_stats->rpcbadfmt++;
	svc_putnl(resv, ntohl(rpc_stat));
	goto sendit;
}

/*
 * Process the RPC request.
 */
int
svc_process(struct svc_rqst *rqstp)
{
	struct kvec		*argv = &rqstp->rq_arg.head[0];
	struct kvec		*resv = &rqstp->rq_res.head[0];
	struct svc_serv		*serv = rqstp->rq_server;
	u32			dir;
	int			error;

	/*
	 * Setup response xdr_buf.
	 * Initially it has just one page
	 */
	rqstp->rq_resused = 1;
	resv->iov_base = page_address(rqstp->rq_respages[0]);
	resv->iov_len = 0;
	rqstp->rq_res.pages = rqstp->rq_respages + 1;
	rqstp->rq_res.len = 0;
	rqstp->rq_res.page_base = 0;
	rqstp->rq_res.page_len = 0;
	rqstp->rq_res.buflen = PAGE_SIZE;
	rqstp->rq_res.tail[0].iov_base = NULL;
	rqstp->rq_res.tail[0].iov_len = 0;

	rqstp->rq_xid = svc_getu32(argv);

	dir = svc_getnl(argv);
	if (dir != 0) {
		/* direction != CALL */
		svc_printk(rqstp, "bad direction %d, dropping request\n", dir);
		serv->sv_stats->rpcbadfmt++;
		svc_drop(rqstp);
		return 0;
	}

	error = svc_process_common(rqstp, argv, resv);
	if (error <= 0)
		return error;

	return svc_send(rqstp);
}
EXPORT_SYMBOL_GPL(svc_process);

#if defined(CONFIG_NFS_V4_1)
/*
 * Process a backchannel RPC request that arrived over an existing
 * outbound connection
 */
int
bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req,
	       struct svc_rqst *rqstp)
{
	struct kvec	*argv = &rqstp->rq_arg.head[0];
	struct kvec	*resv = &rqstp->rq_res.head[0];
	int		error;

	/* Build the svc_rqst used by the common processing routine */
	rqstp->rq_xprt = serv->bc_xprt;
	rqstp->rq_xid = req->rq_xid;
	rqstp->rq_prot = req->rq_xprt->prot;
	rqstp->rq_server = serv;

	rqstp->rq_addrlen = sizeof(req->rq_xprt->addr);
	memcpy(&rqstp->rq_addr, &req->rq_xprt->addr, rqstp->rq_addrlen);
	memcpy(&rqstp->rq_arg, &req->rq_rcv_buf, sizeof(rqstp->rq_arg));
	memcpy(&rqstp->rq_res, &req->rq_snd_buf, sizeof(rqstp->rq_res));

	/* reset result send buffer "put" position */
	resv->iov_len = 0;

	if (rqstp->rq_prot != IPPROTO_TCP) {
		printk(KERN_ERR "No support for non-TCP transports!\n");
		BUG();
	}

	/*
	 * Skip the next two words because they've already been
	 * processed in the transport
	 */
	svc_getu32(argv);	/* XID */
	svc_getnl(argv);	/* CALLDIR */

	error = svc_process_common(rqstp, argv, resv);
	if (error <= 0)
		return error;

	memcpy(&req->rq_snd_buf, &rqstp->rq_res, sizeof(req->rq_snd_buf));
	return bc_send(req);
}
EXPORT_SYMBOL(bc_svc_process);
#endif /* CONFIG_NFS_V4_1 */

/*
 * Return (transport-specific) limit on the rpc payload.
 */
u32 svc_max_payload(const struct svc_rqst *rqstp)
{
	u32 max = rqstp->rq_xprt->xpt_class->xcl_max_payload;

	if (rqstp->rq_server->sv_max_payload < max)
		max = rqstp->rq_server->sv_max_payload;
	return max;
}
EXPORT_SYMBOL_GPL(svc_max_payload);
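
/*
 * Example (illustrative): a service would typically use this to clamp
 * per-request transfer sizes, e.g.
 *
 *	u32 maxsize = svc_max_payload(rqstp);
 *
 * which yields the smaller of the transport class limit and the
 * serv-wide sv_max_payload set up in __svc_create().
 */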
1309