xref: /openbmc/linux/net/sunrpc/xprtsock.c (revision a743419f)
1 /*
2  * linux/net/sunrpc/xprtsock.c
3  *
4  * Client-side transport implementation for sockets.
5  *
6  * TCP callback races fixes (C) 1998 Red Hat
7  * TCP send fixes (C) 1998 Red Hat
8  * TCP NFS related read + write fixes
9  *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
10  *
11  * Rewrite of large parts of the code in order to stabilize the TCP code.
12  * Fix behaviour when socket buffer is full.
13  *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
14  *
15  * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
16  *
17  * IPv6 support contributed by Gilles Quillard, Bull Open Source, 2005.
18  *   <gilles.quillard@bull.net>
19  */
20 
21 #include <linux/types.h>
22 #include <linux/string.h>
23 #include <linux/slab.h>
24 #include <linux/module.h>
25 #include <linux/capability.h>
26 #include <linux/pagemap.h>
27 #include <linux/errno.h>
28 #include <linux/socket.h>
29 #include <linux/in.h>
30 #include <linux/net.h>
31 #include <linux/mm.h>
32 #include <linux/un.h>
33 #include <linux/udp.h>
34 #include <linux/tcp.h>
35 #include <linux/sunrpc/clnt.h>
36 #include <linux/sunrpc/addr.h>
37 #include <linux/sunrpc/sched.h>
38 #include <linux/sunrpc/svcsock.h>
39 #include <linux/sunrpc/xprtsock.h>
40 #include <linux/file.h>
41 #ifdef CONFIG_SUNRPC_BACKCHANNEL
42 #include <linux/sunrpc/bc_xprt.h>
43 #endif
44 
45 #include <net/sock.h>
46 #include <net/checksum.h>
47 #include <net/udp.h>
48 #include <net/tcp.h>
49 
50 #include <trace/events/sunrpc.h>
51 
52 #include "sunrpc.h"
53 
54 static void xs_close(struct rpc_xprt *xprt);
55 
56 /*
57  * xprtsock tunables
58  */
59 static unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
60 static unsigned int xprt_tcp_slot_table_entries = RPC_MIN_SLOT_TABLE;
61 static unsigned int xprt_max_tcp_slot_table_entries = RPC_MAX_SLOT_TABLE;
62 
63 static unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
64 static unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;
65 
66 #define XS_TCP_LINGER_TO	(15U * HZ)
67 static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO;
68 
69 /*
70  * We can register our own files under /proc/sys/sunrpc by
71  * calling register_sysctl_table() again.  The files in that
72  * directory become the union of all files registered there.
73  *
74  * We simply need to make sure that we don't collide with
75  * someone else's file names!
76  */
77 
78 #ifdef RPC_DEBUG
79 
80 static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
81 static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
82 static unsigned int max_tcp_slot_table_limit = RPC_MAX_SLOT_TABLE_LIMIT;
83 static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT;
84 static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT;
85 
86 static struct ctl_table_header *sunrpc_table_header;
87 
88 /*
89  * FIXME: changing the UDP slot table size should also resize the UDP
90  *        socket buffers for existing UDP transports
91  */
92 static struct ctl_table xs_tunables_table[] = {
93 	{
94 		.procname	= "udp_slot_table_entries",
95 		.data		= &xprt_udp_slot_table_entries,
96 		.maxlen		= sizeof(unsigned int),
97 		.mode		= 0644,
98 		.proc_handler	= proc_dointvec_minmax,
99 		.extra1		= &min_slot_table_size,
100 		.extra2		= &max_slot_table_size
101 	},
102 	{
103 		.procname	= "tcp_slot_table_entries",
104 		.data		= &xprt_tcp_slot_table_entries,
105 		.maxlen		= sizeof(unsigned int),
106 		.mode		= 0644,
107 		.proc_handler	= proc_dointvec_minmax,
108 		.extra1		= &min_slot_table_size,
109 		.extra2		= &max_slot_table_size
110 	},
111 	{
112 		.procname	= "tcp_max_slot_table_entries",
113 		.data		= &xprt_max_tcp_slot_table_entries,
114 		.maxlen		= sizeof(unsigned int),
115 		.mode		= 0644,
116 		.proc_handler	= proc_dointvec_minmax,
117 		.extra1		= &min_slot_table_size,
118 		.extra2		= &max_tcp_slot_table_limit
119 	},
120 	{
121 		.procname	= "min_resvport",
122 		.data		= &xprt_min_resvport,
123 		.maxlen		= sizeof(unsigned int),
124 		.mode		= 0644,
125 		.proc_handler	= proc_dointvec_minmax,
126 		.extra1		= &xprt_min_resvport_limit,
127 		.extra2		= &xprt_max_resvport_limit
128 	},
129 	{
130 		.procname	= "max_resvport",
131 		.data		= &xprt_max_resvport,
132 		.maxlen		= sizeof(unsigned int),
133 		.mode		= 0644,
134 		.proc_handler	= proc_dointvec_minmax,
135 		.extra1		= &xprt_min_resvport_limit,
136 		.extra2		= &xprt_max_resvport_limit
137 	},
138 	{
139 		.procname	= "tcp_fin_timeout",
140 		.data		= &xs_tcp_fin_timeout,
141 		.maxlen		= sizeof(xs_tcp_fin_timeout),
142 		.mode		= 0644,
143 		.proc_handler	= proc_dointvec_jiffies,
144 	},
145 	{ },
146 };
147 
148 static struct ctl_table sunrpc_table[] = {
149 	{
150 		.procname	= "sunrpc",
151 		.mode		= 0555,
152 		.child		= xs_tunables_table
153 	},
154 	{ },
155 };
156 
157 #endif
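
/*
 * When RPC_DEBUG is enabled, the tunables above appear as files under
 * /proc/sys/sunrpc.  Example usage (values are illustrative):
 *
 *	# cat /proc/sys/sunrpc/udp_slot_table_entries
 *	16
 *	# echo 128 > /proc/sys/sunrpc/tcp_max_slot_table_entries
 *
 * proc_dointvec_minmax() rejects writes outside the extra1/extra2
 * bounds given in each table entry.
 */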
158 
159 /*
160  * Wait duration for a reply from the RPC portmapper.
161  */
162 #define XS_BIND_TO		(60U * HZ)
163 
164 /*
165  * Delay if a UDP socket connect error occurs.  This is most likely some
166  * kind of resource problem on the local host.
167  */
168 #define XS_UDP_REEST_TO		(2U * HZ)
169 
170 /*
171  * The reestablish timeout allows clients to delay for a bit before attempting
172  * to reconnect to a server that just dropped our connection.
173  *
174  * We implement an exponential backoff when trying to reestablish a TCP
175  * transport connection with the server.  Some servers like to drop a TCP
176  * connection when they are overworked, so we start with a short timeout and
177  * increase over time if the server is down or not responding.
178  */
179 #define XS_TCP_INIT_REEST_TO	(3U * HZ)
180 #define XS_TCP_MAX_REEST_TO	(5U * 60 * HZ)
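
/*
 * A minimal sketch of the backoff step, assuming it mirrors how the
 * connect logic later in this file grows xprt->reestablish_timeout
 * between attempts (the helper name is illustrative, not part of the
 * original code):
 */
static inline unsigned long xs_reest_backoff_sketch(unsigned long timeo)
{
	timeo <<= 1;			/* double the previous delay */
	if (timeo > XS_TCP_MAX_REEST_TO)
		timeo = XS_TCP_MAX_REEST_TO;	/* cap at 5 minutes */
	return timeo;
}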
181 
182 /*
183  * TCP idle timeout; client drops the transport socket if it is idle
184  * for this long.  Note that we also timeout UDP sockets to prevent
185  * holding port numbers when there is no RPC traffic.
186  */
187 #define XS_IDLE_DISC_TO		(5U * 60 * HZ)
188 
189 #ifdef RPC_DEBUG
190 # undef  RPC_DEBUG_DATA
191 # define RPCDBG_FACILITY	RPCDBG_TRANS
192 #endif
193 
194 #ifdef RPC_DEBUG_DATA
195 static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
196 {
197 	u8 *buf = (u8 *) packet;
198 	int j;
199 
200 	dprintk("RPC:       %s\n", msg);
201 	for (j = 0; j < count && j < 128; j += 4) {
202 		if (!(j & 31)) {
203 			if (j)
204 				dprintk("\n");
205 			dprintk("0x%04x ", j);
206 		}
207 		dprintk("%02x%02x%02x%02x ",
208 			buf[j], buf[j+1], buf[j+2], buf[j+3]);
209 	}
210 	dprintk("\n");
211 }
212 #else
213 static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
214 {
215 	/* NOP */
216 }
217 #endif
218 
219 struct sock_xprt {
220 	struct rpc_xprt		xprt;
221 
222 	/*
223 	 * Network layer
224 	 */
225 	struct socket *		sock;
226 	struct sock *		inet;
227 
228 	/*
229 	 * State of TCP reply receive
230 	 */
231 	__be32			tcp_fraghdr,
232 				tcp_xid,
233 				tcp_calldir;
234 
235 	u32			tcp_offset,
236 				tcp_reclen;
237 
238 	unsigned long		tcp_copied,
239 				tcp_flags;
240 
241 	/*
242 	 * Transport connection management
243 	 */
244 	struct delayed_work	connect_worker;
245 	struct sockaddr_storage	srcaddr;
246 	unsigned short		srcport;
247 
248 	/*
249 	 * UDP socket buffer size parameters
250 	 */
251 	size_t			rcvsize,
252 				sndsize;
253 
254 	/*
255 	 * Saved socket callback addresses
256 	 */
257 	void			(*old_data_ready)(struct sock *);
258 	void			(*old_state_change)(struct sock *);
259 	void			(*old_write_space)(struct sock *);
260 	void			(*old_error_report)(struct sock *);
261 };
262 
263 /*
264  * TCP receive state flags
265  */
266 #define TCP_RCV_LAST_FRAG	(1UL << 0)
267 #define TCP_RCV_COPY_FRAGHDR	(1UL << 1)
268 #define TCP_RCV_COPY_XID	(1UL << 2)
269 #define TCP_RCV_COPY_DATA	(1UL << 3)
270 #define TCP_RCV_READ_CALLDIR	(1UL << 4)
271 #define TCP_RCV_COPY_CALLDIR	(1UL << 5)
272 
273 /*
274  * TCP RPC flags
275  */
276 #define TCP_RPC_REPLY		(1UL << 6)
277 
278 static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
279 {
280 	return (struct rpc_xprt *) sk->sk_user_data;
281 }
282 
283 static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt)
284 {
285 	return (struct sockaddr *) &xprt->addr;
286 }
287 
288 static inline struct sockaddr_un *xs_addr_un(struct rpc_xprt *xprt)
289 {
290 	return (struct sockaddr_un *) &xprt->addr;
291 }
292 
293 static inline struct sockaddr_in *xs_addr_in(struct rpc_xprt *xprt)
294 {
295 	return (struct sockaddr_in *) &xprt->addr;
296 }
297 
298 static inline struct sockaddr_in6 *xs_addr_in6(struct rpc_xprt *xprt)
299 {
300 	return (struct sockaddr_in6 *) &xprt->addr;
301 }
302 
303 static void xs_format_common_peer_addresses(struct rpc_xprt *xprt)
304 {
305 	struct sockaddr *sap = xs_addr(xprt);
306 	struct sockaddr_in6 *sin6;
307 	struct sockaddr_in *sin;
308 	struct sockaddr_un *sun;
309 	char buf[128];
310 
311 	switch (sap->sa_family) {
312 	case AF_LOCAL:
313 		sun = xs_addr_un(xprt);
314 		strlcpy(buf, sun->sun_path, sizeof(buf));
315 		xprt->address_strings[RPC_DISPLAY_ADDR] =
316 						kstrdup(buf, GFP_KERNEL);
317 		break;
318 	case AF_INET:
319 		(void)rpc_ntop(sap, buf, sizeof(buf));
320 		xprt->address_strings[RPC_DISPLAY_ADDR] =
321 						kstrdup(buf, GFP_KERNEL);
322 		sin = xs_addr_in(xprt);
323 		snprintf(buf, sizeof(buf), "%08x", ntohl(sin->sin_addr.s_addr));
324 		break;
325 	case AF_INET6:
326 		(void)rpc_ntop(sap, buf, sizeof(buf));
327 		xprt->address_strings[RPC_DISPLAY_ADDR] =
328 						kstrdup(buf, GFP_KERNEL);
329 		sin6 = xs_addr_in6(xprt);
330 		snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr);
331 		break;
332 	default:
333 		BUG();
334 	}
335 
336 	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);
337 }
338 
339 static void xs_format_common_peer_ports(struct rpc_xprt *xprt)
340 {
341 	struct sockaddr *sap = xs_addr(xprt);
342 	char buf[128];
343 
344 	snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap));
345 	xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);
346 
347 	snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap));
348 	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);
349 }
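
/*
 * Example of the strings generated above (illustrative values): for an
 * IPv4 peer at 192.168.0.10, port 2049, RPC_DISPLAY_ADDR is
 * "192.168.0.10", RPC_DISPLAY_HEX_ADDR is "c0a8000a", RPC_DISPLAY_PORT
 * is "2049", and RPC_DISPLAY_HEX_PORT is " 801" (note that "%4hx" pads
 * with spaces, not zeros).
 */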
350 
351 static void xs_format_peer_addresses(struct rpc_xprt *xprt,
352 				     const char *protocol,
353 				     const char *netid)
354 {
355 	xprt->address_strings[RPC_DISPLAY_PROTO] = protocol;
356 	xprt->address_strings[RPC_DISPLAY_NETID] = netid;
357 	xs_format_common_peer_addresses(xprt);
358 	xs_format_common_peer_ports(xprt);
359 }
360 
361 static void xs_update_peer_port(struct rpc_xprt *xprt)
362 {
363 	kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
364 	kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
365 
366 	xs_format_common_peer_ports(xprt);
367 }
368 
369 static void xs_free_peer_addresses(struct rpc_xprt *xprt)
370 {
371 	unsigned int i;
372 
373 	for (i = 0; i < RPC_DISPLAY_MAX; i++)
374 		switch (i) {
375 		case RPC_DISPLAY_PROTO:
376 		case RPC_DISPLAY_NETID:
377 			continue;
378 		default:
379 			kfree(xprt->address_strings[i]);
380 		}
381 }
382 
383 #define XS_SENDMSG_FLAGS	(MSG_DONTWAIT | MSG_NOSIGNAL)
384 
385 static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
386 {
387 	struct msghdr msg = {
388 		.msg_name	= addr,
389 		.msg_namelen	= addrlen,
390 		.msg_flags	= XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0),
391 	};
392 	struct kvec iov = {
393 		.iov_base	= vec->iov_base + base,
394 		.iov_len	= vec->iov_len - base,
395 	};
396 
397 	if (iov.iov_len != 0)
398 		return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
399 	return kernel_sendmsg(sock, &msg, NULL, 0, 0);
400 }
401 
402 static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more, bool zerocopy)
403 {
404 	ssize_t (*do_sendpage)(struct socket *sock, struct page *page,
405 			int offset, size_t size, int flags);
406 	struct page **ppage;
407 	unsigned int remainder;
408 	int err, sent = 0;
409 
410 	remainder = xdr->page_len - base;
411 	base += xdr->page_base;
412 	ppage = xdr->pages + (base >> PAGE_SHIFT);
413 	base &= ~PAGE_MASK;
414 	do_sendpage = sock->ops->sendpage;
415 	if (!zerocopy)
416 		do_sendpage = sock_no_sendpage;
417 	for(;;) {
418 		unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);
419 		int flags = XS_SENDMSG_FLAGS;
420 
421 		remainder -= len;
422 		if (remainder != 0 || more)
423 			flags |= MSG_MORE;
424 		err = do_sendpage(sock, *ppage, base, len, flags);
425 		if (remainder == 0 || err != len)
426 			break;
427 		sent += err;
428 		ppage++;
429 		base = 0;
430 	}
431 	if (sent == 0)
432 		return err;
433 	if (err > 0)
434 		sent += err;
435 	return sent;
436 }
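
/*
 * Note: when @zerocopy is false, the pages go through
 * sock_no_sendpage(), which copies their contents into socket buffers;
 * sock->ops->sendpage() may instead hold a reference to the pages
 * while the data is in flight.  That is why xs_tcp_send_request()
 * below disables zerocopy for resends.
 */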
437 
438 /**
439  * xs_sendpages - write pages directly to a socket
440  * @sock: socket to send on
441  * @addr: UDP only -- address of destination
442  * @addrlen: UDP only -- length of destination address
443  * @xdr: buffer containing this request
444  * @base: starting position in the buffer
445  * @zerocopy: true if it is safe to use sendpage()
446  *
447  */
448 static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base, bool zerocopy)
449 {
450 	unsigned int remainder = xdr->len - base;
451 	int err, sent = 0;
452 
453 	if (unlikely(!sock))
454 		return -ENOTSOCK;
455 
456 	clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
457 	if (base != 0) {
458 		addr = NULL;
459 		addrlen = 0;
460 	}
461 
462 	if (base < xdr->head[0].iov_len || addr != NULL) {
463 		unsigned int len = xdr->head[0].iov_len - base;
464 		remainder -= len;
465 		err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0);
466 		if (remainder == 0 || err != len)
467 			goto out;
468 		sent += err;
469 		base = 0;
470 	} else
471 		base -= xdr->head[0].iov_len;
472 
473 	if (base < xdr->page_len) {
474 		unsigned int len = xdr->page_len - base;
475 		remainder -= len;
476 		err = xs_send_pagedata(sock, xdr, base, remainder != 0, zerocopy);
477 		if (remainder == 0 || err != len)
478 			goto out;
479 		sent += err;
480 		base = 0;
481 	} else
482 		base -= xdr->page_len;
483 
484 	if (base >= xdr->tail[0].iov_len)
485 		return sent;
486 	err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0);
487 out:
488 	if (sent == 0)
489 		return err;
490 	if (err > 0)
491 		sent += err;
492 	return sent;
493 }
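
/*
 * Layout of the xdr_buf that xs_sendpages() walks (head and tail are
 * kvecs; the middle is an array of pages):
 *
 *	+-----------+---------------+-----------+
 *	|  head[0]  |    pages[]    |  tail[0]  |
 *	+-----------+---------------+-----------+
 *
 * @base counts bytes from the start of head[0]; each branch above
 * subtracts the length of the region it skips, so the next region sees
 * a base relative to its own start.
 */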
494 
495 static void xs_nospace_callback(struct rpc_task *task)
496 {
497 	struct sock_xprt *transport = container_of(task->tk_rqstp->rq_xprt, struct sock_xprt, xprt);
498 
499 	transport->inet->sk_write_pending--;
500 	clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
501 }
502 
503 /**
504  * xs_nospace - place task on wait queue if transmit was incomplete
505  * @task: task to put to sleep
506  *
507  */
508 static int xs_nospace(struct rpc_task *task)
509 {
510 	struct rpc_rqst *req = task->tk_rqstp;
511 	struct rpc_xprt *xprt = req->rq_xprt;
512 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
513 	struct sock *sk = transport->inet;
514 	int ret = -EAGAIN;
515 
516 	dprintk("RPC: %5u xmit incomplete (%u left of %u)\n",
517 			task->tk_pid, req->rq_slen - req->rq_bytes_sent,
518 			req->rq_slen);
519 
520 	/* Protect against races with write_space */
521 	spin_lock_bh(&xprt->transport_lock);
522 
523 	/* Don't race with disconnect */
524 	if (xprt_connected(xprt)) {
525 		if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
526 			/*
527 			 * Notify TCP that we're limited by the application
528 			 * window size
529 			 */
530 			set_bit(SOCK_NOSPACE, &transport->sock->flags);
531 			sk->sk_write_pending++;
532 			/* ...and wait for more buffer space */
533 			xprt_wait_for_buffer_space(task, xs_nospace_callback);
534 		}
535 	} else {
536 		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
537 		ret = -ENOTCONN;
538 	}
539 
540 	spin_unlock_bh(&xprt->transport_lock);
541 
542 	/* Race breaker in case memory is freed before above code is called */
543 	sk->sk_write_space(sk);
544 	return ret;
545 }
546 
547 /*
548  * Construct a stream transport record marker in @buf.
549  */
550 static inline void xs_encode_stream_record_marker(struct xdr_buf *buf)
551 {
552 	u32 reclen = buf->len - sizeof(rpc_fraghdr);
553 	rpc_fraghdr *base = buf->head[0].iov_base;
554 	*base = cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | reclen);
555 }
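
/*
 * The marker written above packs "last fragment" into the top bit and
 * the fragment length into the low 31 bits (RFC 1831 record marking).
 * A minimal decode-side sketch, for reference only (the helper name is
 * illustrative; the actual receive path does this work in
 * xs_tcp_read_fraghdr() below):
 */
static inline void xs_decode_stream_record_marker(rpc_fraghdr marker,
						  bool *last, u32 *reclen)
{
	u32 header = be32_to_cpu(marker);

	*last = (header & RPC_LAST_STREAM_FRAGMENT) != 0;
	*reclen = header & RPC_FRAGMENT_SIZE_MASK;
}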
556 
557 /**
558  * xs_local_send_request - write an RPC request to an AF_LOCAL socket
559  * @task: RPC task that manages the state of an RPC request
560  *
561  * Return values:
562  *        0:	The request has been sent
563  *   EAGAIN:	The socket was blocked, please call again later to
564  *		complete the request
565  * ENOTCONN:	Caller needs to invoke connect logic then call again
566  *    other:	Some other error occurred, the request was not sent
567  */
568 static int xs_local_send_request(struct rpc_task *task)
569 {
570 	struct rpc_rqst *req = task->tk_rqstp;
571 	struct rpc_xprt *xprt = req->rq_xprt;
572 	struct sock_xprt *transport =
573 				container_of(xprt, struct sock_xprt, xprt);
574 	struct xdr_buf *xdr = &req->rq_snd_buf;
575 	int status;
576 
577 	xs_encode_stream_record_marker(&req->rq_snd_buf);
578 
579 	xs_pktdump("packet data:",
580 			req->rq_svec->iov_base, req->rq_svec->iov_len);
581 
582 	status = xs_sendpages(transport->sock, NULL, 0,
583 						xdr, req->rq_bytes_sent, true);
584 	dprintk("RPC:       %s(%u) = %d\n",
585 			__func__, xdr->len - req->rq_bytes_sent, status);
586 	if (likely(status >= 0)) {
587 		req->rq_bytes_sent += status;
588 		req->rq_xmit_bytes_sent += status;
589 		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
590 			req->rq_bytes_sent = 0;
591 			return 0;
592 		}
593 		status = -EAGAIN;
594 	}
595 
596 	switch (status) {
597 	case -ENOBUFS:
598 	case -EAGAIN:
599 		status = xs_nospace(task);
600 		break;
601 	default:
602 		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
603 			-status);
604 	case -EPIPE:
605 		xs_close(xprt);
606 		status = -ENOTCONN;
607 	}
608 
609 	return status;
610 }
611 
612 /**
613  * xs_udp_send_request - write an RPC request to a UDP socket
614  * @task: address of RPC task that manages the state of an RPC request
615  *
616  * Return values:
617  *        0:	The request has been sent
618  *   EAGAIN:	The socket was blocked, please call again later to
619  *		complete the request
620  * ENOTCONN:	Caller needs to invoke connect logic then call again
621  *    other:	Some other error occurred, the request was not sent
622  */
623 static int xs_udp_send_request(struct rpc_task *task)
624 {
625 	struct rpc_rqst *req = task->tk_rqstp;
626 	struct rpc_xprt *xprt = req->rq_xprt;
627 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
628 	struct xdr_buf *xdr = &req->rq_snd_buf;
629 	int status;
630 
631 	xs_pktdump("packet data:",
632 				req->rq_svec->iov_base,
633 				req->rq_svec->iov_len);
634 
635 	if (!xprt_bound(xprt))
636 		return -ENOTCONN;
637 	status = xs_sendpages(transport->sock,
638 			      xs_addr(xprt),
639 			      xprt->addrlen, xdr,
640 			      req->rq_bytes_sent, true);
641 
642 	dprintk("RPC:       xs_udp_send_request(%u) = %d\n",
643 			xdr->len - req->rq_bytes_sent, status);
644 
645 	if (status >= 0) {
646 		req->rq_xmit_bytes_sent += status;
647 		if (status >= req->rq_slen)
648 			return 0;
649 		/* Still some bytes left; set up for a retry later. */
650 		status = -EAGAIN;
651 	}
652 
653 	switch (status) {
654 	case -ENOTSOCK:
655 		status = -ENOTCONN;
656 		/* Should we call xs_close() here? */
657 		break;
658 	case -EAGAIN:
659 		status = xs_nospace(task);
660 		break;
661 	default:
662 		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
663 			-status);
664 	case -ENETUNREACH:
665 	case -ENOBUFS:
666 	case -EPIPE:
667 	case -ECONNREFUSED:
668 		/* When the server has died, an ICMP port unreachable message
669 		 * prompts ECONNREFUSED. */
670 		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
671 	}
672 
673 	return status;
674 }
675 
676 /**
677  * xs_tcp_shutdown - gracefully shut down a TCP socket
678  * @xprt: transport
679  *
680  * Initiates a graceful shutdown of the TCP socket by calling the
681  * equivalent of shutdown(SHUT_WR);
682  */
683 static void xs_tcp_shutdown(struct rpc_xprt *xprt)
684 {
685 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
686 	struct socket *sock = transport->sock;
687 
688 	if (sock != NULL) {
689 		kernel_sock_shutdown(sock, SHUT_WR);
690 		trace_rpc_socket_shutdown(xprt, sock);
691 	}
692 }
693 
694 /**
695  * xs_tcp_send_request - write an RPC request to a TCP socket
696  * @task: address of RPC task that manages the state of an RPC request
697  *
698  * Return values:
699  *        0:	The request has been sent
700  *   EAGAIN:	The socket was blocked, please call again later to
701  *		complete the request
702  * ENOTCONN:	Caller needs to invoke connect logic then call again
703  *    other:	Some other error occurred, the request was not sent
704  *
705  * XXX: In the case of soft timeouts, should we eventually give up
706  *	if sendmsg is not able to make progress?
707  */
708 static int xs_tcp_send_request(struct rpc_task *task)
709 {
710 	struct rpc_rqst *req = task->tk_rqstp;
711 	struct rpc_xprt *xprt = req->rq_xprt;
712 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
713 	struct xdr_buf *xdr = &req->rq_snd_buf;
714 	bool zerocopy = true;
715 	int status;
716 
717 	xs_encode_stream_record_marker(&req->rq_snd_buf);
718 
719 	xs_pktdump("packet data:",
720 				req->rq_svec->iov_base,
721 				req->rq_svec->iov_len);
722 	/* Don't use zero copy if this is a resend. If the RPC call
723 	 * completes while the socket holds a reference to the pages,
724 	 * then we may end up resending corrupted data.
725 	 */
726 	if (task->tk_flags & RPC_TASK_SENT)
727 		zerocopy = false;
728 
729 	/* Continue transmitting the packet/record. We must be careful
730 	 * to cope with writespace callbacks arriving _after_ we have
731 	 * called sendmsg(). */
732 	while (1) {
733 		status = xs_sendpages(transport->sock,
734 					NULL, 0, xdr, req->rq_bytes_sent,
735 					zerocopy);
736 
737 		dprintk("RPC:       xs_tcp_send_request(%u) = %d\n",
738 				xdr->len - req->rq_bytes_sent, status);
739 
740 		if (unlikely(status < 0))
741 			break;
742 
743 		/* If we've sent the entire packet, immediately
744 		 * reset the count of bytes sent. */
745 		req->rq_bytes_sent += status;
746 		req->rq_xmit_bytes_sent += status;
747 		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
748 			req->rq_bytes_sent = 0;
749 			return 0;
750 		}
751 
752 		if (status != 0)
753 			continue;
754 		status = -EAGAIN;
755 		break;
756 	}
757 
758 	switch (status) {
759 	case -ENOTSOCK:
760 		status = -ENOTCONN;
761 		/* Should we call xs_close() here? */
762 		break;
763 	case -ENOBUFS:
764 	case -EAGAIN:
765 		status = xs_nospace(task);
766 		break;
767 	default:
768 		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
769 			-status);
770 	case -ECONNRESET:
771 		xs_tcp_shutdown(xprt);
772 	case -ECONNREFUSED:
773 	case -ENOTCONN:
774 	case -EPIPE:
775 		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
776 	}
777 
778 	return status;
779 }
780 
781 /**
782  * xs_tcp_release_xprt - clean up after a tcp transmission
783  * @xprt: transport
784  * @task: rpc task
785  *
786  * This cleans up if an error causes us to abort the transmission of a request.
787  * In this case, the socket may need to be reset in order to avoid confusing
788  * the server.
789  */
790 static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
791 {
792 	struct rpc_rqst *req;
793 
794 	if (task != xprt->snd_task)
795 		return;
796 	if (task == NULL)
797 		goto out_release;
798 	req = task->tk_rqstp;
799 	if (req == NULL)
800 		goto out_release;
801 	if (req->rq_bytes_sent == 0)
802 		goto out_release;
803 	if (req->rq_bytes_sent == req->rq_snd_buf.len)
804 		goto out_release;
805 	set_bit(XPRT_CLOSE_WAIT, &xprt->state);
806 out_release:
807 	xprt_release_xprt(xprt, task);
808 }
809 
810 static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
811 {
812 	transport->old_data_ready = sk->sk_data_ready;
813 	transport->old_state_change = sk->sk_state_change;
814 	transport->old_write_space = sk->sk_write_space;
815 	transport->old_error_report = sk->sk_error_report;
816 }
817 
818 static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk)
819 {
820 	sk->sk_data_ready = transport->old_data_ready;
821 	sk->sk_state_change = transport->old_state_change;
822 	sk->sk_write_space = transport->old_write_space;
823 	sk->sk_error_report = transport->old_error_report;
824 }
825 
826 /**
827  * xs_error_report - callback to handle TCP socket state errors
828  * @sk: socket
829  *
830  * Note: we don't call sock_error() since there may be an rpc_task
831  * using the socket, and so we don't want to clear sk->sk_err.
832  */
833 static void xs_error_report(struct sock *sk)
834 {
835 	struct rpc_xprt *xprt;
836 	int err;
837 
838 	read_lock_bh(&sk->sk_callback_lock);
839 	if (!(xprt = xprt_from_sock(sk)))
840 		goto out;
841 
842 	err = -sk->sk_err;
843 	if (err == 0)
844 		goto out;
845 	dprintk("RPC:       xs_error_report client %p, error=%d...\n",
846 			xprt, -err);
847 	trace_rpc_socket_error(xprt, sk->sk_socket, err);
848 	if (test_bit(XPRT_CONNECTION_REUSE, &xprt->state))
849 		goto out;
850 	xprt_wake_pending_tasks(xprt, err);
851  out:
852 	read_unlock_bh(&sk->sk_callback_lock);
853 }
854 
855 static void xs_reset_transport(struct sock_xprt *transport)
856 {
857 	struct socket *sock = transport->sock;
858 	struct sock *sk = transport->inet;
859 
860 	if (sk == NULL)
861 		return;
862 
863 	transport->srcport = 0;
864 
865 	write_lock_bh(&sk->sk_callback_lock);
866 	transport->inet = NULL;
867 	transport->sock = NULL;
868 
869 	sk->sk_user_data = NULL;
870 
871 	xs_restore_old_callbacks(transport, sk);
872 	write_unlock_bh(&sk->sk_callback_lock);
873 
874 	trace_rpc_socket_close(&transport->xprt, sock);
875 	sock_release(sock);
876 }
877 
878 /**
879  * xs_close - close a socket
880  * @xprt: transport
881  *
882  * This is used when all requests are complete; i.e., no DRC state that
883  * we want to preserve remains on the server.
884  *
885  * The caller _must_ be holding XPRT_LOCKED in order to avoid issues with
886  * xs_reset_transport() zeroing the socket from underneath a writer.
887  */
888 static void xs_close(struct rpc_xprt *xprt)
889 {
890 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
891 
892 	dprintk("RPC:       xs_close xprt %p\n", xprt);
893 
894 	cancel_delayed_work_sync(&transport->connect_worker);
895 
896 	xs_reset_transport(transport);
897 	xprt->reestablish_timeout = 0;
898 
899 	smp_mb__before_atomic();
900 	clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
901 	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
902 	clear_bit(XPRT_CLOSING, &xprt->state);
903 	smp_mb__after_atomic();
904 	xprt_disconnect_done(xprt);
905 }
906 
907 static void xs_tcp_close(struct rpc_xprt *xprt)
908 {
909 	if (test_and_clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state))
910 		xs_close(xprt);
911 	else
912 		xs_tcp_shutdown(xprt);
913 }
914 
915 static void xs_xprt_free(struct rpc_xprt *xprt)
916 {
917 	xs_free_peer_addresses(xprt);
918 	xprt_free(xprt);
919 }
920 
921 /**
922  * xs_destroy - prepare to shutdown a transport
923  * @xprt: doomed transport
924  *
925  */
926 static void xs_destroy(struct rpc_xprt *xprt)
927 {
928 	dprintk("RPC:       xs_destroy xprt %p\n", xprt);
929 
930 	xs_close(xprt);
931 	xs_xprt_free(xprt);
932 	module_put(THIS_MODULE);
933 }
934 
935 static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
936 {
937 	struct xdr_skb_reader desc = {
938 		.skb		= skb,
939 		.offset		= sizeof(rpc_fraghdr),
940 		.count		= skb->len - sizeof(rpc_fraghdr),
941 	};
942 
943 	if (xdr_partial_copy_from_skb(xdr, 0, &desc, xdr_skb_read_bits) < 0)
944 		return -1;
945 	if (desc.count)
946 		return -1;
947 	return 0;
948 }
949 
950 /**
951  * xs_local_data_ready - "data ready" callback for AF_LOCAL sockets
952  * @sk: socket with data to read
954  *
955  * Currently this assumes we can read the whole reply in a single gulp.
956  */
957 static void xs_local_data_ready(struct sock *sk)
958 {
959 	struct rpc_task *task;
960 	struct rpc_xprt *xprt;
961 	struct rpc_rqst *rovr;
962 	struct sk_buff *skb;
963 	int err, repsize, copied;
964 	u32 _xid;
965 	__be32 *xp;
966 
967 	read_lock_bh(&sk->sk_callback_lock);
968 	dprintk("RPC:       %s...\n", __func__);
969 	xprt = xprt_from_sock(sk);
970 	if (xprt == NULL)
971 		goto out;
972 
973 	skb = skb_recv_datagram(sk, 0, 1, &err);
974 	if (skb == NULL)
975 		goto out;
976 
977 	repsize = skb->len - sizeof(rpc_fraghdr);
978 	if (repsize < 4) {
979 		dprintk("RPC:       impossible RPC reply size %d\n", repsize);
980 		goto dropit;
981 	}
982 
983 	/* Copy the XID from the skb... */
984 	xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid);
985 	if (xp == NULL)
986 		goto dropit;
987 
988 	/* Look up and lock the request corresponding to the given XID */
989 	spin_lock(&xprt->transport_lock);
990 	rovr = xprt_lookup_rqst(xprt, *xp);
991 	if (!rovr)
992 		goto out_unlock;
993 	task = rovr->rq_task;
994 
995 	copied = rovr->rq_private_buf.buflen;
996 	if (copied > repsize)
997 		copied = repsize;
998 
999 	if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) {
1000 		dprintk("RPC:       sk_buff copy failed\n");
1001 		goto out_unlock;
1002 	}
1003 
1004 	xprt_complete_rqst(task, copied);
1005 
1006  out_unlock:
1007 	spin_unlock(&xprt->transport_lock);
1008  dropit:
1009 	skb_free_datagram(sk, skb);
1010  out:
1011 	read_unlock_bh(&sk->sk_callback_lock);
1012 }
1013 
1014 /**
1015  * xs_udp_data_ready - "data ready" callback for UDP sockets
1016  * @sk: socket with data to read
1018  *
1019  */
1020 static void xs_udp_data_ready(struct sock *sk)
1021 {
1022 	struct rpc_task *task;
1023 	struct rpc_xprt *xprt;
1024 	struct rpc_rqst *rovr;
1025 	struct sk_buff *skb;
1026 	int err, repsize, copied;
1027 	u32 _xid;
1028 	__be32 *xp;
1029 
1030 	read_lock_bh(&sk->sk_callback_lock);
1031 	dprintk("RPC:       xs_udp_data_ready...\n");
1032 	if (!(xprt = xprt_from_sock(sk)))
1033 		goto out;
1034 
1035 	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
1036 		goto out;
1037 
1038 	repsize = skb->len - sizeof(struct udphdr);
1039 	if (repsize < 4) {
1040 		dprintk("RPC:       impossible RPC reply size %d!\n", repsize);
1041 		goto dropit;
1042 	}
1043 
1044 	/* Copy the XID from the skb... */
1045 	xp = skb_header_pointer(skb, sizeof(struct udphdr),
1046 				sizeof(_xid), &_xid);
1047 	if (xp == NULL)
1048 		goto dropit;
1049 
1050 	/* Look up and lock the request corresponding to the given XID */
1051 	spin_lock(&xprt->transport_lock);
1052 	rovr = xprt_lookup_rqst(xprt, *xp);
1053 	if (!rovr)
1054 		goto out_unlock;
1055 	task = rovr->rq_task;
1056 
1057 	if ((copied = rovr->rq_private_buf.buflen) > repsize)
1058 		copied = repsize;
1059 
1060 	/* Suck it into the iovec, verify checksum if not done by hw. */
1061 	if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
1062 		UDPX_INC_STATS_BH(sk, UDP_MIB_INERRORS);
1063 		goto out_unlock;
1064 	}
1065 
1066 	UDPX_INC_STATS_BH(sk, UDP_MIB_INDATAGRAMS);
1067 
1068 	xprt_adjust_cwnd(xprt, task, copied);
1069 	xprt_complete_rqst(task, copied);
1070 
1071  out_unlock:
1072 	spin_unlock(&xprt->transport_lock);
1073  dropit:
1074 	skb_free_datagram(sk, skb);
1075  out:
1076 	read_unlock_bh(&sk->sk_callback_lock);
1077 }
1078 
1079 /*
1080  * Helper function to force a TCP close if the server is sending
1081  * junk and/or it has put us in CLOSE_WAIT
1082  */
1083 static void xs_tcp_force_close(struct rpc_xprt *xprt)
1084 {
1085 	set_bit(XPRT_CONNECTION_CLOSE, &xprt->state);
1086 	xprt_force_disconnect(xprt);
1087 }
1088 
1089 static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
1090 {
1091 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1092 	size_t len, used;
1093 	char *p;
1094 
1095 	p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset;
1096 	len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset;
1097 	used = xdr_skb_read_bits(desc, p, len);
1098 	transport->tcp_offset += used;
1099 	if (used != len)
1100 		return;
1101 
1102 	transport->tcp_reclen = ntohl(transport->tcp_fraghdr);
1103 	if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
1104 		transport->tcp_flags |= TCP_RCV_LAST_FRAG;
1105 	else
1106 		transport->tcp_flags &= ~TCP_RCV_LAST_FRAG;
1107 	transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;
1108 
1109 	transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR;
1110 	transport->tcp_offset = 0;
1111 
1112 	/* Sanity check of the record length */
1113 	if (unlikely(transport->tcp_reclen < 8)) {
1114 		dprintk("RPC:       invalid TCP record fragment length\n");
1115 		xs_tcp_force_close(xprt);
1116 		return;
1117 	}
1118 	dprintk("RPC:       reading TCP record fragment of length %d\n",
1119 			transport->tcp_reclen);
1120 }
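
/*
 * Example (illustrative value): a fragment header of 0x80000190 parses
 * as tcp_reclen = 0x190 (400 bytes) with TCP_RCV_LAST_FRAG set, i.e. a
 * 400-byte final fragment.
 */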
1121 
1122 static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
1123 {
1124 	if (transport->tcp_offset == transport->tcp_reclen) {
1125 		transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR;
1126 		transport->tcp_offset = 0;
1127 		if (transport->tcp_flags & TCP_RCV_LAST_FRAG) {
1128 			transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1129 			transport->tcp_flags |= TCP_RCV_COPY_XID;
1130 			transport->tcp_copied = 0;
1131 		}
1132 	}
1133 }
1134 
1135 static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc)
1136 {
1137 	size_t len, used;
1138 	char *p;
1139 
1140 	len = sizeof(transport->tcp_xid) - transport->tcp_offset;
1141 	dprintk("RPC:       reading XID (%Zu bytes)\n", len);
1142 	p = ((char *) &transport->tcp_xid) + transport->tcp_offset;
1143 	used = xdr_skb_read_bits(desc, p, len);
1144 	transport->tcp_offset += used;
1145 	if (used != len)
1146 		return;
1147 	transport->tcp_flags &= ~TCP_RCV_COPY_XID;
1148 	transport->tcp_flags |= TCP_RCV_READ_CALLDIR;
1149 	transport->tcp_copied = 4;
1150 	dprintk("RPC:       reading %s XID %08x\n",
1151 			(transport->tcp_flags & TCP_RPC_REPLY) ? "reply for"
1152 							      : "request with",
1153 			ntohl(transport->tcp_xid));
1154 	xs_tcp_check_fraghdr(transport);
1155 }
1156 
1157 static inline void xs_tcp_read_calldir(struct sock_xprt *transport,
1158 				       struct xdr_skb_reader *desc)
1159 {
1160 	size_t len, used;
1161 	u32 offset;
1162 	char *p;
1163 
1164 	/*
1165 	 * We want transport->tcp_offset to be 8 at the end of this routine
1166 	 * (4 bytes for the xid and 4 bytes for the call/reply flag).
1167 	 * When this function is called for the first time,
1168 	 * transport->tcp_offset is 4 (after having already read the xid).
1169 	 */
1170 	offset = transport->tcp_offset - sizeof(transport->tcp_xid);
1171 	len = sizeof(transport->tcp_calldir) - offset;
1172 	dprintk("RPC:       reading CALL/REPLY flag (%Zu bytes)\n", len);
1173 	p = ((char *) &transport->tcp_calldir) + offset;
1174 	used = xdr_skb_read_bits(desc, p, len);
1175 	transport->tcp_offset += used;
1176 	if (used != len)
1177 		return;
1178 	transport->tcp_flags &= ~TCP_RCV_READ_CALLDIR;
1179 	/*
1180 	 * We don't yet have the XDR buffer, so we will write the calldir
1181 	 * out after we get the buffer from the 'struct rpc_rqst'
1182 	 */
1183 	switch (ntohl(transport->tcp_calldir)) {
1184 	case RPC_REPLY:
1185 		transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
1186 		transport->tcp_flags |= TCP_RCV_COPY_DATA;
1187 		transport->tcp_flags |= TCP_RPC_REPLY;
1188 		break;
1189 	case RPC_CALL:
1190 		transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
1191 		transport->tcp_flags |= TCP_RCV_COPY_DATA;
1192 		transport->tcp_flags &= ~TCP_RPC_REPLY;
1193 		break;
1194 	default:
1195 		dprintk("RPC:       invalid request message type\n");
1196 		xs_tcp_force_close(&transport->xprt);
1197 	}
1198 	xs_tcp_check_fraghdr(transport);
1199 }
1200 
1201 static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
1202 				     struct xdr_skb_reader *desc,
1203 				     struct rpc_rqst *req)
1204 {
1205 	struct sock_xprt *transport =
1206 				container_of(xprt, struct sock_xprt, xprt);
1207 	struct xdr_buf *rcvbuf;
1208 	size_t len;
1209 	ssize_t r;
1210 
1211 	rcvbuf = &req->rq_private_buf;
1212 
1213 	if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) {
1214 		/*
1215 		 * Save the RPC direction in the XDR buffer
1216 		 */
1217 		memcpy(rcvbuf->head[0].iov_base + transport->tcp_copied,
1218 			&transport->tcp_calldir,
1219 			sizeof(transport->tcp_calldir));
1220 		transport->tcp_copied += sizeof(transport->tcp_calldir);
1221 		transport->tcp_flags &= ~TCP_RCV_COPY_CALLDIR;
1222 	}
1223 
1224 	len = desc->count;
1225 	if (len > transport->tcp_reclen - transport->tcp_offset) {
1226 		struct xdr_skb_reader my_desc;
1227 
1228 		len = transport->tcp_reclen - transport->tcp_offset;
1229 		memcpy(&my_desc, desc, sizeof(my_desc));
1230 		my_desc.count = len;
1231 		r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
1232 					  &my_desc, xdr_skb_read_bits);
1233 		desc->count -= r;
1234 		desc->offset += r;
1235 	} else
1236 		r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
1237 					  desc, xdr_skb_read_bits);
1238 
1239 	if (r > 0) {
1240 		transport->tcp_copied += r;
1241 		transport->tcp_offset += r;
1242 	}
1243 	if (r != len) {
1244 		/* Error when copying to the receive buffer,
1245 		 * usually because we weren't able to allocate
1246 		 * additional buffer pages. All we can do now
1247 		 * is turn off TCP_RCV_COPY_DATA, so the request
1248 		 * will not receive any additional updates,
1249 		 * and time out.
1250 		 * Any remaining data from this record will
1251 		 * be discarded.
1252 		 */
1253 		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1254 		dprintk("RPC:       XID %08x truncated request\n",
1255 				ntohl(transport->tcp_xid));
1256 		dprintk("RPC:       xprt = %p, tcp_copied = %lu, "
1257 				"tcp_offset = %u, tcp_reclen = %u\n",
1258 				xprt, transport->tcp_copied,
1259 				transport->tcp_offset, transport->tcp_reclen);
1260 		return;
1261 	}
1262 
1263 	dprintk("RPC:       XID %08x read %Zd bytes\n",
1264 			ntohl(transport->tcp_xid), r);
1265 	dprintk("RPC:       xprt = %p, tcp_copied = %lu, tcp_offset = %u, "
1266 			"tcp_reclen = %u\n", xprt, transport->tcp_copied,
1267 			transport->tcp_offset, transport->tcp_reclen);
1268 
1269 	if (transport->tcp_copied == req->rq_private_buf.buflen)
1270 		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1271 	else if (transport->tcp_offset == transport->tcp_reclen) {
1272 		if (transport->tcp_flags & TCP_RCV_LAST_FRAG)
1273 			transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1274 	}
1275 }
1276 
1277 /*
1278  * Finds the request corresponding to the RPC xid and invokes the common
1279  * tcp read code to read the data.
1280  */
1281 static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
1282 				    struct xdr_skb_reader *desc)
1283 {
1284 	struct sock_xprt *transport =
1285 				container_of(xprt, struct sock_xprt, xprt);
1286 	struct rpc_rqst *req;
1287 
1288 	dprintk("RPC:       read reply XID %08x\n", ntohl(transport->tcp_xid));
1289 
1290 	/* Find and lock the request corresponding to this xid */
1291 	spin_lock(&xprt->transport_lock);
1292 	req = xprt_lookup_rqst(xprt, transport->tcp_xid);
1293 	if (!req) {
1294 		dprintk("RPC:       XID %08x request not found!\n",
1295 				ntohl(transport->tcp_xid));
1296 		spin_unlock(&xprt->transport_lock);
1297 		return -1;
1298 	}
1299 
1300 	xs_tcp_read_common(xprt, desc, req);
1301 
1302 	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
1303 		xprt_complete_rqst(req->rq_task, transport->tcp_copied);
1304 
1305 	spin_unlock(&xprt->transport_lock);
1306 	return 0;
1307 }
1308 
1309 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
1310 /*
1311  * Obtains an rpc_rqst previously allocated and invokes the common
1312  * tcp read code to read the data.  The result is placed in the callback
1313  * queue.
1314  * If we're unable to obtain the rpc_rqst we schedule the closing of the
1315  * connection and return -1.
1316  */
1317 static int xs_tcp_read_callback(struct rpc_xprt *xprt,
1318 				       struct xdr_skb_reader *desc)
1319 {
1320 	struct sock_xprt *transport =
1321 				container_of(xprt, struct sock_xprt, xprt);
1322 	struct rpc_rqst *req;
1323 
1324 	/* Look up and lock the request corresponding to the given XID */
1325 	spin_lock(&xprt->transport_lock);
1326 	req = xprt_lookup_bc_request(xprt, transport->tcp_xid);
1327 	if (req == NULL) {
1328 		spin_unlock(&xprt->transport_lock);
1329 		printk(KERN_WARNING "Callback slot table overflowed\n");
1330 		xprt_force_disconnect(xprt);
1331 		return -1;
1332 	}
1333 
1334 	dprintk("RPC:       read callback  XID %08x\n", ntohl(req->rq_xid));
1335 	xs_tcp_read_common(xprt, desc, req);
1336 
1337 	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
1338 		xprt_complete_bc_request(req, transport->tcp_copied);
1339 	spin_unlock(&xprt->transport_lock);
1340 
1341 	return 0;
1342 }
1343 
1344 static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
1345 					struct xdr_skb_reader *desc)
1346 {
1347 	struct sock_xprt *transport =
1348 				container_of(xprt, struct sock_xprt, xprt);
1349 
1350 	return (transport->tcp_flags & TCP_RPC_REPLY) ?
1351 		xs_tcp_read_reply(xprt, desc) :
1352 		xs_tcp_read_callback(xprt, desc);
1353 }
1354 #else
1355 static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
1356 					struct xdr_skb_reader *desc)
1357 {
1358 	return xs_tcp_read_reply(xprt, desc);
1359 }
1360 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
1361 
1362 /*
1363  * Read data off the transport.  This can be either an RPC_CALL or an
1364  * RPC_REPLY.  Relay the processing to helper functions.
1365  */
1366 static void xs_tcp_read_data(struct rpc_xprt *xprt,
1367 				    struct xdr_skb_reader *desc)
1368 {
1369 	struct sock_xprt *transport =
1370 				container_of(xprt, struct sock_xprt, xprt);
1371 
1372 	if (_xs_tcp_read_data(xprt, desc) == 0)
1373 		xs_tcp_check_fraghdr(transport);
1374 	else {
1375 		/*
1376 		 * The transport_lock protects the request handling.
1377 		 * There's no need to hold it to update the tcp_flags.
1378 		 */
1379 		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1380 	}
1381 }
1382 
1383 static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc)
1384 {
1385 	size_t len;
1386 
1387 	len = transport->tcp_reclen - transport->tcp_offset;
1388 	if (len > desc->count)
1389 		len = desc->count;
1390 	desc->count -= len;
1391 	desc->offset += len;
1392 	transport->tcp_offset += len;
1393 	dprintk("RPC:       discarded %Zu bytes\n", len);
1394 	xs_tcp_check_fraghdr(transport);
1395 }
1396 
1397 static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
1398 {
1399 	struct rpc_xprt *xprt = rd_desc->arg.data;
1400 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1401 	struct xdr_skb_reader desc = {
1402 		.skb	= skb,
1403 		.offset	= offset,
1404 		.count	= len,
1405 	};
1406 
1407 	dprintk("RPC:       xs_tcp_data_recv started\n");
1408 	do {
1409 		/* Read in a new fragment marker if necessary */
1410 		/* Can we ever really expect to get completely empty fragments? */
1411 		if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) {
1412 			xs_tcp_read_fraghdr(xprt, &desc);
1413 			continue;
1414 		}
1415 		/* Read in the xid if necessary */
1416 		if (transport->tcp_flags & TCP_RCV_COPY_XID) {
1417 			xs_tcp_read_xid(transport, &desc);
1418 			continue;
1419 		}
1420 		/* Read in the call/reply flag */
1421 		if (transport->tcp_flags & TCP_RCV_READ_CALLDIR) {
1422 			xs_tcp_read_calldir(transport, &desc);
1423 			continue;
1424 		}
1425 		/* Read in the request data */
1426 		if (transport->tcp_flags & TCP_RCV_COPY_DATA) {
1427 			xs_tcp_read_data(xprt, &desc);
1428 			continue;
1429 		}
1430 		/* Skip over any trailing bytes on short reads */
1431 		xs_tcp_read_discard(transport, &desc);
1432 	} while (desc.count);
1433 	dprintk("RPC:       xs_tcp_data_recv done\n");
1434 	return len - desc.count;
1435 }
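
/*
 * To summarize the state machine driven above: each record is consumed
 * as fragment header (TCP_RCV_COPY_FRAGHDR) -> XID (TCP_RCV_COPY_XID)
 * -> call direction (TCP_RCV_READ_CALLDIR) -> payload
 * (TCP_RCV_COPY_DATA), with xs_tcp_read_discard() draining whatever is
 * left of a record, e.g. after a truncated copy.
 */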
1436 
1437 /**
1438  * xs_tcp_data_ready - "data ready" callback for TCP sockets
1439  * @sk: socket with data to read
1441  *
1442  */
1443 static void xs_tcp_data_ready(struct sock *sk)
1444 {
1445 	struct rpc_xprt *xprt;
1446 	read_descriptor_t rd_desc;
1447 	int read;
1448 
1449 	dprintk("RPC:       xs_tcp_data_ready...\n");
1450 
1451 	read_lock_bh(&sk->sk_callback_lock);
1452 	if (!(xprt = xprt_from_sock(sk)))
1453 		goto out;
1454 	/* Any data means we had a useful conversation, so
1455 	 * we don't need to delay the next reconnect
1456 	 */
1457 	if (xprt->reestablish_timeout)
1458 		xprt->reestablish_timeout = 0;
1459 
1460 	/* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
1461 	rd_desc.arg.data = xprt;
1462 	do {
1463 		rd_desc.count = 65536;
1464 		read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
1465 	} while (read > 0);
1466 out:
1467 	read_unlock_bh(&sk->sk_callback_lock);
1468 }
1469 
1470 /*
1471  * Do the equivalent of linger/linger2 handling for dealing with
1472  * broken servers that don't close the socket in a timely
1473  * fashion
1474  */
1475 static void xs_tcp_schedule_linger_timeout(struct rpc_xprt *xprt,
1476 		unsigned long timeout)
1477 {
1478 	struct sock_xprt *transport;
1479 
1480 	if (xprt_test_and_set_connecting(xprt))
1481 		return;
1482 	set_bit(XPRT_CONNECTION_ABORT, &xprt->state);
1483 	transport = container_of(xprt, struct sock_xprt, xprt);
1484 	queue_delayed_work(rpciod_workqueue, &transport->connect_worker,
1485 			   timeout);
1486 }
1487 
1488 static void xs_tcp_cancel_linger_timeout(struct rpc_xprt *xprt)
1489 {
1490 	struct sock_xprt *transport;
1491 
1492 	transport = container_of(xprt, struct sock_xprt, xprt);
1493 
1494 	if (!test_bit(XPRT_CONNECTION_ABORT, &xprt->state) ||
1495 	    !cancel_delayed_work(&transport->connect_worker))
1496 		return;
1497 	clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
1498 	xprt_clear_connecting(xprt);
1499 }
1500 
1501 static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt)
1502 {
1503 	smp_mb__before_atomic();
1504 	clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
1505 	clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state);
1506 	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
1507 	clear_bit(XPRT_CLOSING, &xprt->state);
1508 	smp_mb__after_atomic();
1509 }
1510 
1511 static void xs_sock_mark_closed(struct rpc_xprt *xprt)
1512 {
1513 	xs_sock_reset_connection_flags(xprt);
1514 	/* Mark transport as closed and wake up all pending tasks */
1515 	xprt_disconnect_done(xprt);
1516 }
1517 
1518 /**
1519  * xs_tcp_state_change - callback to handle TCP socket state changes
1520  * @sk: socket whose state has changed
1521  *
1522  */
1523 static void xs_tcp_state_change(struct sock *sk)
1524 {
1525 	struct rpc_xprt *xprt;
1526 
1527 	read_lock_bh(&sk->sk_callback_lock);
1528 	if (!(xprt = xprt_from_sock(sk)))
1529 		goto out;
1530 	dprintk("RPC:       xs_tcp_state_change client %p...\n", xprt);
1531 	dprintk("RPC:       state %x conn %d dead %d zapped %d sk_shutdown %d\n",
1532 			sk->sk_state, xprt_connected(xprt),
1533 			sock_flag(sk, SOCK_DEAD),
1534 			sock_flag(sk, SOCK_ZAPPED),
1535 			sk->sk_shutdown);
1536 
1537 	trace_rpc_socket_state_change(xprt, sk->sk_socket);
1538 	switch (sk->sk_state) {
1539 	case TCP_ESTABLISHED:
1540 		spin_lock(&xprt->transport_lock);
1541 		if (!xprt_test_and_set_connected(xprt)) {
1542 			struct sock_xprt *transport = container_of(xprt,
1543 					struct sock_xprt, xprt);
1544 
1545 			/* Reset TCP record info */
1546 			transport->tcp_offset = 0;
1547 			transport->tcp_reclen = 0;
1548 			transport->tcp_copied = 0;
1549 			transport->tcp_flags =
1550 				TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
1551 			xprt->connect_cookie++;
1552 
1553 			xprt_wake_pending_tasks(xprt, -EAGAIN);
1554 		}
1555 		spin_unlock(&xprt->transport_lock);
1556 		break;
1557 	case TCP_FIN_WAIT1:
1558 		/* The client initiated a shutdown of the socket */
1559 		xprt->connect_cookie++;
1560 		xprt->reestablish_timeout = 0;
1561 		set_bit(XPRT_CLOSING, &xprt->state);
1562 		smp_mb__before_atomic();
1563 		clear_bit(XPRT_CONNECTED, &xprt->state);
1564 		clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
1565 		smp_mb__after_atomic();
1566 		xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
1567 		break;
1568 	case TCP_CLOSE_WAIT:
1569 		/* The server initiated a shutdown of the socket */
1570 		xprt->connect_cookie++;
1571 		clear_bit(XPRT_CONNECTED, &xprt->state);
1572 		xs_tcp_force_close(xprt);
1573 	case TCP_CLOSING:
1574 		/*
1575 		 * If the server closed down the connection, make sure that
1576 		 * we back off before reconnecting
1577 		 */
1578 		if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
1579 			xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
1580 		break;
1581 	case TCP_LAST_ACK:
1582 		set_bit(XPRT_CLOSING, &xprt->state);
1583 		xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
1584 		smp_mb__before_atomic();
1585 		clear_bit(XPRT_CONNECTED, &xprt->state);
1586 		smp_mb__after_atomic();
1587 		break;
1588 	case TCP_CLOSE:
1589 		xs_tcp_cancel_linger_timeout(xprt);
1590 		xs_sock_mark_closed(xprt);
1591 	}
1592  out:
1593 	read_unlock_bh(&sk->sk_callback_lock);
1594 }
1595 
1596 static void xs_write_space(struct sock *sk)
1597 {
1598 	struct socket *sock;
1599 	struct rpc_xprt *xprt;
1600 
1601 	if (unlikely(!(sock = sk->sk_socket)))
1602 		return;
1603 	clear_bit(SOCK_NOSPACE, &sock->flags);
1604 
1605 	if (unlikely(!(xprt = xprt_from_sock(sk))))
1606 		return;
1607 	if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0)
1608 		return;
1609 
1610 	xprt_write_space(xprt);
1611 }
1612 
1613 /**
1614  * xs_udp_write_space - callback invoked when socket buffer space
1615  *                             becomes available
1616  * @sk: socket whose state has changed
1617  *
1618  * Called when more output buffer space is available for this socket.
1619  * We try not to wake our writers until they can make "significant"
1620  * progress, otherwise we'll waste resources thrashing kernel_sendmsg
1621  * with a bunch of small requests.
1622  */
1623 static void xs_udp_write_space(struct sock *sk)
1624 {
1625 	read_lock_bh(&sk->sk_callback_lock);
1626 
1627 	/* from net/core/sock.c:sock_def_write_space */
1628 	if (sock_writeable(sk))
1629 		xs_write_space(sk);
1630 
1631 	read_unlock_bh(&sk->sk_callback_lock);
1632 }
1633 
1634 /**
1635  * xs_tcp_write_space - callback invoked when socket buffer space
1636  *                             becomes available
1637  * @sk: socket whose state has changed
1638  *
1639  * Called when more output buffer space is available for this socket.
1640  * We try not to wake our writers until they can make "significant"
1641  * progress, otherwise we'll waste resources thrashing kernel_sendmsg
1642  * with a bunch of small requests.
1643  */
1644 static void xs_tcp_write_space(struct sock *sk)
1645 {
1646 	read_lock_bh(&sk->sk_callback_lock);
1647 
1648 	/* from net/core/stream.c:sk_stream_write_space */
1649 	if (sk_stream_is_writeable(sk))
1650 		xs_write_space(sk);
1651 
1652 	read_unlock_bh(&sk->sk_callback_lock);
1653 }
1654 
1655 static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt)
1656 {
1657 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1658 	struct sock *sk = transport->inet;
1659 
1660 	if (transport->rcvsize) {
1661 		sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
1662 		sk->sk_rcvbuf = transport->rcvsize * xprt->max_reqs * 2;
1663 	}
1664 	if (transport->sndsize) {
1665 		sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
1666 		sk->sk_sndbuf = transport->sndsize * xprt->max_reqs * 2;
1667 		sk->sk_write_space(sk);
1668 	}
1669 }
1670 
1671 /**
1672  * xs_udp_set_buffer_size - set send and receive limits
1673  * @xprt: generic transport
1674  * @sndsize: requested size of send buffer, in bytes
1675  * @rcvsize: requested size of receive buffer, in bytes
1676  *
1677  * Set socket send and receive buffer size limits.
1678  */
1679 static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize)
1680 {
1681 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1682 
1683 	transport->sndsize = 0;
1684 	if (sndsize)
1685 		transport->sndsize = sndsize + 1024;
1686 	transport->rcvsize = 0;
1687 	if (rcvsize)
1688 		transport->rcvsize = rcvsize + 1024;
1689 
1690 	xs_udp_do_set_buffer_size(xprt);
1691 }
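
/*
 * Example (illustrative numbers): with sndsize = 32768 and
 * xprt->max_reqs = 16, xs_udp_do_set_buffer_size() above sets
 * sk_sndbuf to (32768 + 1024) * 16 * 2 = 1081344 bytes.
 */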
1692 
1693 /**
1694  * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
 * @xprt: transport on which the timeout occurred
1695  * @task: task that timed out
1696  *
1697  * Adjust the congestion window after a retransmit timeout has occurred.
1698  */
1699 static void xs_udp_timer(struct rpc_xprt *xprt, struct rpc_task *task)
1700 {
1701 	xprt_adjust_cwnd(xprt, task, -ETIMEDOUT);
1702 }
1703 
1704 static unsigned short xs_get_random_port(void)
1705 {
1706 	unsigned short range = xprt_max_resvport - xprt_min_resvport;
1707 	unsigned short rand = (unsigned short) prandom_u32() % range;
1708 	return rand + xprt_min_resvport;
1709 }
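
/*
 * Note: this assumes xprt_min_resvport < xprt_max_resvport.  Both are
 * tunable via the sysctls above, and setting them equal would make
 * range zero and the modulo above a division by zero.
 */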
1710 
1711 /**
1712  * xs_set_port - reset the port number in the remote endpoint address
1713  * @xprt: generic transport
1714  * @port: new port number
1715  *
1716  */
1717 static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
1718 {
1719 	dprintk("RPC:       setting port for xprt %p to %u\n", xprt, port);
1720 
1721 	rpc_set_port(xs_addr(xprt), port);
1722 	xs_update_peer_port(xprt);
1723 }
1724 
1725 static unsigned short xs_get_srcport(struct sock_xprt *transport)
1726 {
1727 	unsigned short port = transport->srcport;
1728 
1729 	if (port == 0 && transport->xprt.resvport)
1730 		port = xs_get_random_port();
1731 	return port;
1732 }
1733 
1734 static unsigned short xs_next_srcport(struct sock_xprt *transport, unsigned short port)
1735 {
1736 	if (transport->srcport != 0)
1737 		transport->srcport = 0;
1738 	if (!transport->xprt.resvport)
1739 		return 0;
1740 	if (port <= xprt_min_resvport || port > xprt_max_resvport)
1741 		return xprt_max_resvport;
1742 	return --port;
1743 	return --port;
}
1744 static int xs_bind(struct sock_xprt *transport, struct socket *sock)
1745 {
1746 	struct sockaddr_storage myaddr;
1747 	int err, nloop = 0;
1748 	unsigned short port = xs_get_srcport(transport);
1749 	unsigned short last;
1750 
1751 	/*
1752 	 * If we are asking for any ephemeral port (i.e. port == 0 &&
1753 	 * transport->xprt.resvport == 0), don't bind.  Let the local
1754 	 * port selection happen implicitly when the socket is used
1755 	 * (for example at connect time).
1756 	 *
1757 	 * This ensures that we can continue to establish TCP
1758 	 * connections even when all local ephemeral ports are already
1759 	 * a part of some TCP connection.  This makes no difference
1760 	 * for UDP sockets, but also doesn't harm them.
1761 	 *
1762 	 * If we're asking for any reserved port (i.e. port == 0 &&
1763 	 * transport->xprt.resvport == 1) xs_get_srcport above will
1764 	 * ensure that port is non-zero and we will bind as needed.
1765 	 */
1766 	if (port == 0)
1767 		return 0;
1768 
1769 	memcpy(&myaddr, &transport->srcaddr, transport->xprt.addrlen);
1770 	do {
1771 		rpc_set_port((struct sockaddr *)&myaddr, port);
1772 		err = kernel_bind(sock, (struct sockaddr *)&myaddr,
1773 				transport->xprt.addrlen);
1774 		if (err == 0) {
1775 			transport->srcport = port;
1776 			break;
1777 		}
1778 		last = port;
1779 		port = xs_next_srcport(transport, port);
1780 		if (port > last)
1781 			nloop++;
1782 	} while (err == -EADDRINUSE && nloop != 2);
1783 
1784 	if (myaddr.ss_family == AF_INET)
1785 		dprintk("RPC:       %s %pI4:%u: %s (%d)\n", __func__,
1786 				&((struct sockaddr_in *)&myaddr)->sin_addr,
1787 				port, err ? "failed" : "ok", err);
1788 	else
1789 		dprintk("RPC:       %s %pI6:%u: %s (%d)\n", __func__,
1790 				&((struct sockaddr_in6 *)&myaddr)->sin6_addr,
1791 				port, err ? "failed" : "ok", err);
1792 	return err;
1793 }
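
/*
 * Example of the search order (illustrative): starting at port 800,
 * the loop above retries 799, 798, ... down to xprt_min_resvport, then
 * wraps to xprt_max_resvport and keeps descending.  nloop is bumped on
 * each wraparound, so an exhausted range is abandoned after the second
 * wrap (err == -EADDRINUSE && nloop == 2).
 */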
1794 
1795 /*
1796  * We don't support autobind on AF_LOCAL sockets
1797  */
1798 static void xs_local_rpcbind(struct rpc_task *task)
1799 {
1800 	rcu_read_lock();
1801 	xprt_set_bound(rcu_dereference(task->tk_client->cl_xprt));
1802 	rcu_read_unlock();
1803 }
1804 
1805 static void xs_local_set_port(struct rpc_xprt *xprt, unsigned short port)
1806 {
1807 }
1808 
1809 #ifdef CONFIG_DEBUG_LOCK_ALLOC
1810 static struct lock_class_key xs_key[2];
1811 static struct lock_class_key xs_slock_key[2];
1812 
1813 static inline void xs_reclassify_socketu(struct socket *sock)
1814 {
1815 	struct sock *sk = sock->sk;
1816 
1817 	sock_lock_init_class_and_name(sk, "slock-AF_LOCAL-RPC",
1818 		&xs_slock_key[1], "sk_lock-AF_LOCAL-RPC", &xs_key[1]);
1819 }
1820 
1821 static inline void xs_reclassify_socket4(struct socket *sock)
1822 {
1823 	struct sock *sk = sock->sk;
1824 
1825 	sock_lock_init_class_and_name(sk, "slock-AF_INET-RPC",
1826 		&xs_slock_key[0], "sk_lock-AF_INET-RPC", &xs_key[0]);
1827 }
1828 
1829 static inline void xs_reclassify_socket6(struct socket *sock)
1830 {
1831 	struct sock *sk = sock->sk;
1832 
1833 	sock_lock_init_class_and_name(sk, "slock-AF_INET6-RPC",
1834 		&xs_slock_key[1], "sk_lock-AF_INET6-RPC", &xs_key[1]);
1835 }
1836 
1837 static inline void xs_reclassify_socket(int family, struct socket *sock)
1838 {
1839 	WARN_ON_ONCE(sock_owned_by_user(sock->sk));
1840 	if (sock_owned_by_user(sock->sk))
1841 		return;
1842 
1843 	switch (family) {
1844 	case AF_LOCAL:
1845 		xs_reclassify_socketu(sock);
1846 		break;
1847 	case AF_INET:
1848 		xs_reclassify_socket4(sock);
1849 		break;
1850 	case AF_INET6:
1851 		xs_reclassify_socket6(sock);
1852 		break;
1853 	}
1854 }
1855 #else
1856 static inline void xs_reclassify_socketu(struct socket *sock)
1857 {
1858 }
1859 
1860 static inline void xs_reclassify_socket4(struct socket *sock)
1861 {
1862 }
1863 
1864 static inline void xs_reclassify_socket6(struct socket *sock)
1865 {
1866 }
1867 
1868 static inline void xs_reclassify_socket(int family, struct socket *sock)
1869 {
1870 }
1871 #endif
1872 
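/*
 * AF_LOCAL transports connect synchronously (see xs_local_connect), so
 * their connect_worker only ever needs this no-op callback.
 */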
1873 static void xs_dummy_setup_socket(struct work_struct *work)
1874 {
1875 }
1876 
1877 static struct socket *xs_create_sock(struct rpc_xprt *xprt,
1878 		struct sock_xprt *transport, int family, int type, int protocol)
1879 {
1880 	struct socket *sock;
1881 	int err;
1882 
1883 	err = __sock_create(xprt->xprt_net, family, type, protocol, &sock, 1);
1884 	if (err < 0) {
1885 		dprintk("RPC:       can't create %d transport socket (%d).\n",
1886 				protocol, -err);
1887 		goto out;
1888 	}
1889 	xs_reclassify_socket(family, sock);
1890 
1891 	err = xs_bind(transport, sock);
1892 	if (err) {
1893 		sock_release(sock);
1894 		goto out;
1895 	}
1896 
1897 	return sock;
1898 out:
1899 	return ERR_PTR(err);
1900 }
1901 
1902 static int xs_local_finish_connecting(struct rpc_xprt *xprt,
1903 				      struct socket *sock)
1904 {
1905 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt,
1906 									xprt);
1907 
1908 	if (!transport->inet) {
1909 		struct sock *sk = sock->sk;
1910 
1911 		write_lock_bh(&sk->sk_callback_lock);
1912 
1913 		xs_save_old_callbacks(transport, sk);
1914 
1915 		sk->sk_user_data = xprt;
1916 		sk->sk_data_ready = xs_local_data_ready;
1917 		sk->sk_write_space = xs_udp_write_space;
1918 		sk->sk_error_report = xs_error_report;
1919 		sk->sk_allocation = GFP_ATOMIC;
1920 
1921 		xprt_clear_connected(xprt);
1922 
1923 		/* Reset to new socket */
1924 		transport->sock = sock;
1925 		transport->inet = sk;
1926 
1927 		write_unlock_bh(&sk->sk_callback_lock);
1928 	}
1929 
1930 	/* Tell the socket layer to start connecting... */
1931 	xprt->stat.connect_count++;
1932 	xprt->stat.connect_start = jiffies;
1933 	return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, 0);
1934 }
1935 
1936 /**
1937  * xs_local_setup_socket - create AF_LOCAL socket, connect to a local endpoint
 * @transport: socket transport to connect
1941  */
1942 static int xs_local_setup_socket(struct sock_xprt *transport)
1943 {
1944 	struct rpc_xprt *xprt = &transport->xprt;
1945 	struct socket *sock;
1946 	int status = -EIO;
1947 
1948 	current->flags |= PF_FSTRANS;
1949 
1950 	clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
1951 	status = __sock_create(xprt->xprt_net, AF_LOCAL,
1952 					SOCK_STREAM, 0, &sock, 1);
1953 	if (status < 0) {
1954 		dprintk("RPC:       can't create AF_LOCAL "
1955 			"transport socket (%d).\n", -status);
1956 		goto out;
1957 	}
1958 	xs_reclassify_socketu(sock);
1959 
1960 	dprintk("RPC:       worker connecting xprt %p via AF_LOCAL to %s\n",
1961 			xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
1962 
1963 	status = xs_local_finish_connecting(xprt, sock);
1964 	trace_rpc_socket_connect(xprt, sock, status);
1965 	switch (status) {
1966 	case 0:
1967 		dprintk("RPC:       xprt %p connected to %s\n",
1968 				xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
1969 		xprt_set_connected(xprt);
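		/* fall through */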
1970 	case -ENOBUFS:
1971 		break;
1972 	case -ENOENT:
1973 		dprintk("RPC:       xprt %p: socket %s does not exist\n",
1974 				xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
1975 		break;
1976 	case -ECONNREFUSED:
1977 		dprintk("RPC:       xprt %p: connection refused for %s\n",
1978 				xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
1979 		break;
1980 	default:
1981 		printk(KERN_ERR "%s: unhandled error (%d) connecting to %s\n",
1982 				__func__, -status,
1983 				xprt->address_strings[RPC_DISPLAY_ADDR]);
1984 	}
1985 
1986 out:
1987 	xprt_clear_connecting(xprt);
1988 	xprt_wake_pending_tasks(xprt, status);
1989 	current->flags &= ~PF_FSTRANS;
1990 	return status;
1991 }
1992 
1993 static void xs_local_connect(struct rpc_xprt *xprt, struct rpc_task *task)
1994 {
1995 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1996 	int ret;
1997 
1998 	if (RPC_IS_ASYNC(task)) {
1999 		/*
2000 		 * We want the AF_LOCAL connect to be resolved in the
2001 		 * filesystem namespace of the process making the rpc
2002 		 * call.  Thus we connect synchronously.
2003 		 *
2004 		 * If we want to support asynchronous AF_LOCAL calls,
2005 		 * we'll need to figure out how to pass a namespace to
2006 		 * connect.
2007 		 */
2008 		rpc_exit(task, -ENOTCONN);
2009 		return;
2010 	}
2011 	ret = xs_local_setup_socket(transport);
2012 	if (ret && !RPC_IS_SOFTCONN(task))
2013 		msleep_interruptible(15000);
2014 }
2015 
2016 #ifdef CONFIG_SUNRPC_SWAP
2017 static void xs_set_memalloc(struct rpc_xprt *xprt)
2018 {
2019 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt,
2020 			xprt);
2021 
2022 	if (xprt->swapper)
2023 		sk_set_memalloc(transport->inet);
2024 }
2025 
2026 /**
2027  * xs_swapper - Tag this transport as being used for swap.
2028  * @xprt: transport to tag
2029  * @enable: nonzero to add a swap user to this transport, zero to drop one
2030  *
2031  */
2032 int xs_swapper(struct rpc_xprt *xprt, int enable)
2033 {
2034 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt,
2035 			xprt);
2036 	int err = 0;
2037 
2038 	if (enable) {
2039 		xprt->swapper++;
2040 		xs_set_memalloc(xprt);
2041 	} else if (xprt->swapper) {
2042 		xprt->swapper--;
2043 		sk_clear_memalloc(transport->inet);
2044 	}
2045 
2046 	return err;
2047 }
2048 EXPORT_SYMBOL_GPL(xs_swapper);
2049 #else
2050 static void xs_set_memalloc(struct rpc_xprt *xprt)
2051 {
2052 }
2053 #endif
2054 
2055 static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
2056 {
2057 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2058 
2059 	if (!transport->inet) {
2060 		struct sock *sk = sock->sk;
2061 
2062 		write_lock_bh(&sk->sk_callback_lock);
2063 
2064 		xs_save_old_callbacks(transport, sk);
2065 
2066 		sk->sk_user_data = xprt;
2067 		sk->sk_data_ready = xs_udp_data_ready;
2068 		sk->sk_write_space = xs_udp_write_space;
2069 		sk->sk_allocation = GFP_ATOMIC;
2070 
2071 		xprt_set_connected(xprt);
2072 
2073 		/* Reset to new socket */
2074 		transport->sock = sock;
2075 		transport->inet = sk;
2076 
2077 		xs_set_memalloc(xprt);
2078 
2079 		write_unlock_bh(&sk->sk_callback_lock);
2080 	}
2081 	xs_udp_do_set_buffer_size(xprt);
2082 }
2083 
2084 static void xs_udp_setup_socket(struct work_struct *work)
2085 {
2086 	struct sock_xprt *transport =
2087 		container_of(work, struct sock_xprt, connect_worker.work);
2088 	struct rpc_xprt *xprt = &transport->xprt;
2089 	struct socket *sock = transport->sock;
2090 	int status = -EIO;
2091 
2092 	current->flags |= PF_FSTRANS;
2093 
2094 	/* Start by resetting any existing state */
2095 	xs_reset_transport(transport);
2096 	sock = xs_create_sock(xprt, transport,
2097 			xs_addr(xprt)->sa_family, SOCK_DGRAM, IPPROTO_UDP);
	if (IS_ERR(sock)) {
		status = PTR_ERR(sock);
		goto out;
	}
2100 
2101 	dprintk("RPC:       worker connecting xprt %p via %s to "
2102 				"%s (port %s)\n", xprt,
2103 			xprt->address_strings[RPC_DISPLAY_PROTO],
2104 			xprt->address_strings[RPC_DISPLAY_ADDR],
2105 			xprt->address_strings[RPC_DISPLAY_PORT]);
2106 
2107 	xs_udp_finish_connecting(xprt, sock);
2108 	trace_rpc_socket_connect(xprt, sock, 0);
2109 	status = 0;
2110 out:
2111 	xprt_clear_connecting(xprt);
2112 	xprt_wake_pending_tasks(xprt, status);
2113 	current->flags &= ~PF_FSTRANS;
2114 }
2115 
2116 /*
2117  * We need to preserve the source port number so the server's duplicate
2118  * reply cache can still recognize our retransmitted requests after we reconnect.
2119  */
2120 static void xs_abort_connection(struct sock_xprt *transport)
2121 {
2122 	int result;
2123 	struct sockaddr any;
2124 
2125 	dprintk("RPC:       disconnecting xprt %p to reuse port\n", transport);
2126 
2127 	/*
2128 	 * Disconnect the transport socket by doing a connect operation
2129 	 * with AF_UNSPEC.  This should return immediately...
2130 	 */
2131 	memset(&any, 0, sizeof(any));
2132 	any.sa_family = AF_UNSPEC;
2133 	result = kernel_connect(transport->sock, &any, sizeof(any), 0);
2134 	trace_rpc_socket_reset_connection(&transport->xprt,
2135 			transport->sock, result);
2136 	if (!result)
2137 		xs_sock_reset_connection_flags(&transport->xprt);
2138 	dprintk("RPC:       AF_UNSPEC connect return code %d\n", result);
2139 }
2140 
2141 static void xs_tcp_reuse_connection(struct sock_xprt *transport)
2142 {
2143 	unsigned int state = transport->inet->sk_state;
2144 
2145 	if (state == TCP_CLOSE && transport->sock->state == SS_UNCONNECTED) {
2146 		/* we don't need to abort the connection if the socket
2147 		 * hasn't undergone a shutdown
2148 		 */
2149 		if (transport->inet->sk_shutdown == 0)
2150 			return;
2151 		dprintk("RPC:       %s: TCP_CLOSEd and sk_shutdown set to %d\n",
2152 				__func__, transport->inet->sk_shutdown);
2153 	}
2154 	if ((1 << state) & (TCPF_ESTABLISHED|TCPF_SYN_SENT)) {
2155 		/* we don't need to abort the connection if the socket
2156 		 * hasn't undergone a shutdown
2157 		 */
2158 		if (transport->inet->sk_shutdown == 0)
2159 			return;
2160 		dprintk("RPC:       %s: ESTABLISHED/SYN_SENT "
2161 				"sk_shutdown set to %d\n",
2162 				__func__, transport->inet->sk_shutdown);
2163 	}
2164 	xs_abort_connection(transport);
2165 }
2166 
2167 static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
2168 {
2169 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2170 	int ret = -ENOTCONN;
2171 
2172 	if (!transport->inet) {
2173 		struct sock *sk = sock->sk;
2174 		unsigned int keepidle = xprt->timeout->to_initval / HZ;
2175 		unsigned int keepcnt = xprt->timeout->to_retries + 1;
2176 		unsigned int opt_on = 1;
2177 
		/* TCP keepalive options: probe timing is derived from the RPC
		 * timeout; note that the probe interval reuses keepidle. */
2179 		kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
2180 				(char *)&opt_on, sizeof(opt_on));
2181 		kernel_setsockopt(sock, SOL_TCP, TCP_KEEPIDLE,
2182 				(char *)&keepidle, sizeof(keepidle));
2183 		kernel_setsockopt(sock, SOL_TCP, TCP_KEEPINTVL,
2184 				(char *)&keepidle, sizeof(keepidle));
2185 		kernel_setsockopt(sock, SOL_TCP, TCP_KEEPCNT,
2186 				(char *)&keepcnt, sizeof(keepcnt));
2187 
2188 		write_lock_bh(&sk->sk_callback_lock);
2189 
2190 		xs_save_old_callbacks(transport, sk);
2191 
2192 		sk->sk_user_data = xprt;
2193 		sk->sk_data_ready = xs_tcp_data_ready;
2194 		sk->sk_state_change = xs_tcp_state_change;
2195 		sk->sk_write_space = xs_tcp_write_space;
2196 		sk->sk_error_report = xs_error_report;
2197 		sk->sk_allocation = GFP_ATOMIC;
2198 
2199 		/* socket options */
2200 		sk->sk_userlocks |= SOCK_BINDPORT_LOCK;
2201 		sock_reset_flag(sk, SOCK_LINGER);
2202 		tcp_sk(sk)->linger2 = 0;
2203 		tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;
2204 
2205 		xprt_clear_connected(xprt);
2206 
2207 		/* Reset to new socket */
2208 		transport->sock = sock;
2209 		transport->inet = sk;
2210 
2211 		write_unlock_bh(&sk->sk_callback_lock);
2212 	}
2213 
2214 	if (!xprt_bound(xprt))
2215 		goto out;
2216 
2217 	xs_set_memalloc(xprt);
2218 
2219 	/* Tell the socket layer to start connecting... */
2220 	xprt->stat.connect_count++;
2221 	xprt->stat.connect_start = jiffies;
2222 	ret = kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
2223 	switch (ret) {
2224 	case 0:
2225 	case -EINPROGRESS:
2226 		/* SYN_SENT!  Completion is signalled via sk_state_change. */
2227 		if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
2228 			xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2229 	}
2230 out:
2231 	return ret;
2232 }
2233 
2234 /**
2235  * xs_tcp_setup_socket - create a TCP socket and connect to a remote endpoint
 * @work: work item embedded in the transport's connect_worker
2239  *
2240  * Invoked by a work queue tasklet.
2241  */
2242 static void xs_tcp_setup_socket(struct work_struct *work)
2243 {
2244 	struct sock_xprt *transport =
2245 		container_of(work, struct sock_xprt, connect_worker.work);
2246 	struct socket *sock = transport->sock;
2247 	struct rpc_xprt *xprt = &transport->xprt;
2248 	int status = -EIO;
2249 
2250 	current->flags |= PF_FSTRANS;
2251 
2252 	if (!sock) {
2253 		clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
2254 		sock = xs_create_sock(xprt, transport,
2255 				xs_addr(xprt)->sa_family, SOCK_STREAM, IPPROTO_TCP);
2256 		if (IS_ERR(sock)) {
2257 			status = PTR_ERR(sock);
2258 			goto out;
2259 		}
2260 	} else {
2261 		int abort_and_exit;
2262 
2263 		abort_and_exit = test_and_clear_bit(XPRT_CONNECTION_ABORT,
2264 				&xprt->state);
2265 		/* "close" the socket, preserving the local port */
2266 		set_bit(XPRT_CONNECTION_REUSE, &xprt->state);
2267 		xs_tcp_reuse_connection(transport);
2268 		clear_bit(XPRT_CONNECTION_REUSE, &xprt->state);
2269 
2270 		if (abort_and_exit)
2271 			goto out_eagain;
2272 	}
2273 
2274 	dprintk("RPC:       worker connecting xprt %p via %s to "
2275 				"%s (port %s)\n", xprt,
2276 			xprt->address_strings[RPC_DISPLAY_PROTO],
2277 			xprt->address_strings[RPC_DISPLAY_ADDR],
2278 			xprt->address_strings[RPC_DISPLAY_PORT]);
2279 
2280 	status = xs_tcp_finish_connecting(xprt, sock);
2281 	trace_rpc_socket_connect(xprt, sock, status);
2282 	dprintk("RPC:       %p connect status %d connected %d sock state %d\n",
2283 			xprt, -status, xprt_connected(xprt),
2284 			sock->sk->sk_state);
2285 	switch (status) {
2286 	default:
2287 		printk(KERN_ERR "%s: connect returned unhandled error %d\n",
2288 			__func__, status);
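		/* fall through: treat unhandled errors like -EADDRNOTAVAIL */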
2289 	case -EADDRNOTAVAIL:
2290 		/* We're probably in TIME_WAIT. Get rid of existing socket,
2291 		 * and retry
2292 		 */
2293 		xs_tcp_force_close(xprt);
2294 		break;
2295 	case 0:
2296 	case -EINPROGRESS:
2297 	case -EALREADY:
2298 		xprt_clear_connecting(xprt);
2299 		current->flags &= ~PF_FSTRANS;
2300 		return;
2301 	case -EINVAL:
2302 		/* Happens, for instance, if the user specified a link
2303 		 * local IPv6 address without a scope-id.
2304 		 */
2305 	case -ECONNREFUSED:
2306 	case -ECONNRESET:
2307 	case -ENETUNREACH:
2308 	case -ENOBUFS:
2309 		/* retry with existing socket, after a delay */
2310 		goto out;
2311 	}
2312 out_eagain:
2313 	status = -EAGAIN;
2314 out:
2315 	xprt_clear_connecting(xprt);
2316 	xprt_wake_pending_tasks(xprt, status);
2317 	current->flags &= ~PF_FSTRANS;
2318 }
2319 
2320 /**
2321  * xs_connect - connect a socket to a remote endpoint
2322  * @xprt: pointer to transport structure
2323  * @task: address of RPC task that manages state of connect request
2324  *
 * TCP: If the remote end dropped the connection, delay reconnecting; the
 * delay doubles on each attempt, bounded by XS_TCP_MAX_REEST_TO.
2326  *
2327  * UDP socket connects are synchronous, but we use a work queue anyway
2328  * to guarantee that even unprivileged user processes can set up a
2329  * socket on a privileged port.
2330  *
2331  * If a UDP socket connect fails, the delay behavior here prevents
2332  * retry floods (hard mounts).
2333  */
2334 static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task)
2335 {
2336 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2337 
2338 	if (transport->sock != NULL && !RPC_IS_SOFTCONN(task)) {
2339 		dprintk("RPC:       xs_connect delayed xprt %p for %lu "
2340 				"seconds\n",
2341 				xprt, xprt->reestablish_timeout / HZ);
2342 		queue_delayed_work(rpciod_workqueue,
2343 				   &transport->connect_worker,
2344 				   xprt->reestablish_timeout);
2345 		xprt->reestablish_timeout <<= 1;
2346 		if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
2347 			xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2348 		if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO)
2349 			xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
2350 	} else {
2351 		dprintk("RPC:       xs_connect scheduled xprt %p\n", xprt);
2352 		queue_delayed_work(rpciod_workqueue,
2353 				   &transport->connect_worker, 0);
2354 	}
2355 }
2356 
2357 /**
2358  * xs_local_print_stats - display AF_LOCAL socket-specific stats
2359  * @xprt: rpc_xprt struct containing statistics
2360  * @seq: output file
2361  *
2362  */
2363 static void xs_local_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
2364 {
2365 	long idle_time = 0;
2366 
2367 	if (xprt_connected(xprt))
2368 		idle_time = (long)(jiffies - xprt->last_used) / HZ;
2369 
2370 	seq_printf(seq, "\txprt:\tlocal %lu %lu %lu %ld %lu %lu %lu "
2371 			"%llu %llu %lu %llu %llu\n",
2372 			xprt->stat.bind_count,
2373 			xprt->stat.connect_count,
2374 			xprt->stat.connect_time,
2375 			idle_time,
2376 			xprt->stat.sends,
2377 			xprt->stat.recvs,
2378 			xprt->stat.bad_xids,
2379 			xprt->stat.req_u,
2380 			xprt->stat.bklog_u,
2381 			xprt->stat.max_slots,
2382 			xprt->stat.sending_u,
2383 			xprt->stat.pending_u);
2384 }
2385 
2386 /**
2387  * xs_udp_print_stats - display UDP socket-specific stats
2388  * @xprt: rpc_xprt struct containing statistics
2389  * @seq: output file
2390  *
2391  */
2392 static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
2393 {
2394 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2395 
2396 	seq_printf(seq, "\txprt:\tudp %u %lu %lu %lu %lu %llu %llu "
2397 			"%lu %llu %llu\n",
2398 			transport->srcport,
2399 			xprt->stat.bind_count,
2400 			xprt->stat.sends,
2401 			xprt->stat.recvs,
2402 			xprt->stat.bad_xids,
2403 			xprt->stat.req_u,
2404 			xprt->stat.bklog_u,
2405 			xprt->stat.max_slots,
2406 			xprt->stat.sending_u,
2407 			xprt->stat.pending_u);
2408 }
2409 
2410 /**
2411  * xs_tcp_print_stats - display TCP socket-specific stats
2412  * @xprt: rpc_xprt struct containing statistics
2413  * @seq: output file
2414  *
2415  */
2416 static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
2417 {
2418 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2419 	long idle_time = 0;
2420 
2421 	if (xprt_connected(xprt))
2422 		idle_time = (long)(jiffies - xprt->last_used) / HZ;
2423 
2424 	seq_printf(seq, "\txprt:\ttcp %u %lu %lu %lu %ld %lu %lu %lu "
2425 			"%llu %llu %lu %llu %llu\n",
2426 			transport->srcport,
2427 			xprt->stat.bind_count,
2428 			xprt->stat.connect_count,
2429 			xprt->stat.connect_time,
2430 			idle_time,
2431 			xprt->stat.sends,
2432 			xprt->stat.recvs,
2433 			xprt->stat.bad_xids,
2434 			xprt->stat.req_u,
2435 			xprt->stat.bklog_u,
2436 			xprt->stat.max_slots,
2437 			xprt->stat.sending_u,
2438 			xprt->stat.pending_u);
2439 }
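
/*
 * A sample of the line emitted above, as it might appear in an NFS
 * mount's /proc/self/mountstats (values are illustrative only):
 *
 *	xprt:	tcp 703 1 1 0 42 118 118 0 118 0 2 0 0
 */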
2440 
2441 /*
2442  * Allocate a page-sized scratch buffer for the RPC code.  The reason we
2443  * allocate pages instead of doing a kmalloc like rpc_malloc does is that we
2444  * want to use the server-side send routines, which operate on pages.
2445  */
2446 static void *bc_malloc(struct rpc_task *task, size_t size)
2447 {
2448 	struct page *page;
2449 	struct rpc_buffer *buf;
2450 
2451 	WARN_ON_ONCE(size > PAGE_SIZE - sizeof(struct rpc_buffer));
2452 	if (size > PAGE_SIZE - sizeof(struct rpc_buffer))
2453 		return NULL;
2454 
2455 	page = alloc_page(GFP_KERNEL);
2456 	if (!page)
2457 		return NULL;
2458 
2459 	buf = page_address(page);
2460 	buf->len = PAGE_SIZE;
2461 
2462 	return buf->data;
2463 }
2464 
2465 /*
 * Free the space allocated in bc_malloc().  The buffer sits at the start
 * of a page inside a struct rpc_buffer, so free the whole page.
2467  */
2468 static void bc_free(void *buffer)
2469 {
2470 	struct rpc_buffer *buf;
2471 
2472 	if (!buffer)
2473 		return;
2474 
2475 	buf = container_of(buffer, struct rpc_buffer, data);
2476 	free_page((unsigned long)buf);
2477 }
2478 
2479 /*
2480  * Use the svc_sock to send the callback. Must be called with the xprt's
2481  * xpt_mutex held. Borrows heavily from svc_tcp_sendto and xs_tcp_send_request.
2482  */
2483 static int bc_sendto(struct rpc_rqst *req)
2484 {
2485 	int len;
2486 	struct xdr_buf *xbufp = &req->rq_snd_buf;
2487 	struct rpc_xprt *xprt = req->rq_xprt;
2488 	struct sock_xprt *transport =
2489 				container_of(xprt, struct sock_xprt, xprt);
2490 	struct socket *sock = transport->sock;
2491 	unsigned long headoff;
2492 	unsigned long tailoff;
2493 
2494 	xs_encode_stream_record_marker(xbufp);
2495 
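	/* svc_send_common() takes in-page offsets, so strip the page-aligned
	 * part of the head and tail buffer addresses. */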
2496 	tailoff = (unsigned long)xbufp->tail[0].iov_base & ~PAGE_MASK;
2497 	headoff = (unsigned long)xbufp->head[0].iov_base & ~PAGE_MASK;
2498 	len = svc_send_common(sock, xbufp,
2499 			      virt_to_page(xbufp->head[0].iov_base), headoff,
2500 			      xbufp->tail[0].iov_base, tailoff);
2501 
2502 	if (len != xbufp->len) {
2503 		printk(KERN_NOTICE "Error sending entire callback!\n");
2504 		len = -EAGAIN;
2505 	}
2506 
2507 	return len;
2508 }
2509 
2510 /*
2511  * The send routine. Borrows from svc_send
2512  */
2513 static int bc_send_request(struct rpc_task *task)
2514 {
2515 	struct rpc_rqst *req = task->tk_rqstp;
2516 	struct svc_xprt	*xprt;
2517 	int len;	/* holds negative errnos, so must be signed */
2518 
2519 	dprintk("sending request with xid: %08x\n", ntohl(req->rq_xid));
2520 	/*
2521 	 * Get the server socket associated with this callback xprt
2522 	 */
2523 	xprt = req->rq_xprt->bc_xprt;
2524 
2525 	/*
	 * Grab the mutex to serialize data as the connection is shared
	 * with the fore channel.  If it is contended, sleep on the
	 * xpt_bc_pending queue and try once more; if that also fails,
	 * return -EAGAIN and stay queued until the holder wakes us.
	 */
2529 	if (!mutex_trylock(&xprt->xpt_mutex)) {
2530 		rpc_sleep_on(&xprt->xpt_bc_pending, task, NULL);
2531 		if (!mutex_trylock(&xprt->xpt_mutex))
2532 			return -EAGAIN;
2533 		rpc_wake_up_queued_task(&xprt->xpt_bc_pending, task);
2534 	}
2535 	if (test_bit(XPT_DEAD, &xprt->xpt_flags))
2536 		len = -ENOTCONN;
2537 	else
2538 		len = bc_sendto(req);
2539 	mutex_unlock(&xprt->xpt_mutex);
2540 
2541 	if (len > 0)
2542 		len = 0;
2543 
2544 	return len;
2545 }
2546 
2547 /*
2548  * The close routine. Since this is client initiated, we do nothing
2549  */
2550 
2551 static void bc_close(struct rpc_xprt *xprt)
2552 {
2553 }
2554 
2555 /*
2556  * The xprt destroy routine. The connection is client initiated, so all
2557  * that remains is to free the xprt and drop our module reference.
2558  */
2559 
2560 static void bc_destroy(struct rpc_xprt *xprt)
2561 {
2562 	dprintk("RPC:       bc_destroy xprt %p\n", xprt);
2563 
2564 	xs_xprt_free(xprt);
2565 	module_put(THIS_MODULE);
2566 }
2567 
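/*
 * RPC method tables.  AF_LOCAL is stream-oriented, so it shares the
 * TCP release and default retransmit-timeout helpers below.
 */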
2568 static struct rpc_xprt_ops xs_local_ops = {
2569 	.reserve_xprt		= xprt_reserve_xprt,
2570 	.release_xprt		= xs_tcp_release_xprt,
2571 	.alloc_slot		= xprt_alloc_slot,
2572 	.rpcbind		= xs_local_rpcbind,
2573 	.set_port		= xs_local_set_port,
2574 	.connect		= xs_local_connect,
2575 	.buf_alloc		= rpc_malloc,
2576 	.buf_free		= rpc_free,
2577 	.send_request		= xs_local_send_request,
2578 	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
2579 	.close			= xs_close,
2580 	.destroy		= xs_destroy,
2581 	.print_stats		= xs_local_print_stats,
2582 };
2583 
2584 static struct rpc_xprt_ops xs_udp_ops = {
2585 	.set_buffer_size	= xs_udp_set_buffer_size,
2586 	.reserve_xprt		= xprt_reserve_xprt_cong,
2587 	.release_xprt		= xprt_release_xprt_cong,
2588 	.alloc_slot		= xprt_alloc_slot,
2589 	.rpcbind		= rpcb_getport_async,
2590 	.set_port		= xs_set_port,
2591 	.connect		= xs_connect,
2592 	.buf_alloc		= rpc_malloc,
2593 	.buf_free		= rpc_free,
2594 	.send_request		= xs_udp_send_request,
2595 	.set_retrans_timeout	= xprt_set_retrans_timeout_rtt,
2596 	.timer			= xs_udp_timer,
2597 	.release_request	= xprt_release_rqst_cong,
2598 	.close			= xs_close,
2599 	.destroy		= xs_destroy,
2600 	.print_stats		= xs_udp_print_stats,
2601 };
2602 
2603 static struct rpc_xprt_ops xs_tcp_ops = {
2604 	.reserve_xprt		= xprt_reserve_xprt,
2605 	.release_xprt		= xs_tcp_release_xprt,
2606 	.alloc_slot		= xprt_lock_and_alloc_slot,
2607 	.rpcbind		= rpcb_getport_async,
2608 	.set_port		= xs_set_port,
2609 	.connect		= xs_connect,
2610 	.buf_alloc		= rpc_malloc,
2611 	.buf_free		= rpc_free,
2612 	.send_request		= xs_tcp_send_request,
2613 	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
2614 	.close			= xs_tcp_close,
2615 	.destroy		= xs_destroy,
2616 	.print_stats		= xs_tcp_print_stats,
2617 };
2618 
2619 /*
2620  * The rpc_xprt_ops for the server backchannel
2621  */
2622 
2623 static struct rpc_xprt_ops bc_tcp_ops = {
2624 	.reserve_xprt		= xprt_reserve_xprt,
2625 	.release_xprt		= xprt_release_xprt,
2626 	.alloc_slot		= xprt_alloc_slot,
2627 	.buf_alloc		= bc_malloc,
2628 	.buf_free		= bc_free,
2629 	.send_request		= bc_send_request,
2630 	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
2631 	.close			= bc_close,
2632 	.destroy		= bc_destroy,
2633 	.print_stats		= xs_tcp_print_stats,
2634 };
2635 
2636 static int xs_init_anyaddr(const int family, struct sockaddr *sap)
2637 {
2638 	static const struct sockaddr_in sin = {
2639 		.sin_family		= AF_INET,
2640 		.sin_addr.s_addr	= htonl(INADDR_ANY),
2641 	};
2642 	static const struct sockaddr_in6 sin6 = {
2643 		.sin6_family		= AF_INET6,
2644 		.sin6_addr		= IN6ADDR_ANY_INIT,
2645 	};
2646 
2647 	switch (family) {
2648 	case AF_LOCAL:
2649 		break;
2650 	case AF_INET:
2651 		memcpy(sap, &sin, sizeof(sin));
2652 		break;
2653 	case AF_INET6:
2654 		memcpy(sap, &sin6, sizeof(sin6));
2655 		break;
2656 	default:
2657 		dprintk("RPC:       %s: Bad address family\n", __func__);
2658 		return -EAFNOSUPPORT;
2659 	}
2660 	return 0;
2661 }
2662 
2663 static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
2664 				      unsigned int slot_table_size,
2665 				      unsigned int max_slot_table_size)
2666 {
2667 	struct rpc_xprt *xprt;
2668 	struct sock_xprt *new;
2669 
2670 	if (args->addrlen > sizeof(xprt->addr)) {
2671 		dprintk("RPC:       xs_setup_xprt: address too large\n");
2672 		return ERR_PTR(-EBADF);
2673 	}
2674 
2675 	xprt = xprt_alloc(args->net, sizeof(*new), slot_table_size,
2676 			max_slot_table_size);
2677 	if (xprt == NULL) {
2678 		dprintk("RPC:       xs_setup_xprt: couldn't allocate "
2679 				"rpc_xprt\n");
2680 		return ERR_PTR(-ENOMEM);
2681 	}
2682 
2683 	new = container_of(xprt, struct sock_xprt, xprt);
2684 	memcpy(&xprt->addr, args->dstaddr, args->addrlen);
2685 	xprt->addrlen = args->addrlen;
2686 	if (args->srcaddr)
2687 		memcpy(&new->srcaddr, args->srcaddr, args->addrlen);
2688 	else {
2689 		int err;
2690 		err = xs_init_anyaddr(args->dstaddr->sa_family,
2691 					(struct sockaddr *)&new->srcaddr);
2692 		if (err != 0) {
2693 			xprt_free(xprt);
2694 			return ERR_PTR(err);
2695 		}
2696 	}
2697 
2698 	return xprt;
2699 }
2700 
2701 static const struct rpc_timeout xs_local_default_timeout = {
2702 	.to_initval = 10 * HZ,
2703 	.to_maxval = 10 * HZ,
2704 	.to_retries = 2,
2705 };
2706 
2707 /**
2708  * xs_setup_local - Set up transport to use an AF_LOCAL socket
2709  * @args: rpc transport creation arguments
2710  *
2711  * AF_LOCAL is a "tpi_cots_ord" transport, just like TCP
2712  */
2713 static struct rpc_xprt *xs_setup_local(struct xprt_create *args)
2714 {
2715 	struct sockaddr_un *sun = (struct sockaddr_un *)args->dstaddr;
2716 	struct sock_xprt *transport;
2717 	struct rpc_xprt *xprt;
2718 	struct rpc_xprt *ret;
2719 
2720 	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
2721 			xprt_max_tcp_slot_table_entries);
2722 	if (IS_ERR(xprt))
2723 		return xprt;
2724 	transport = container_of(xprt, struct sock_xprt, xprt);
2725 
2726 	xprt->prot = 0;
2727 	xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
2728 	xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
2729 
2730 	xprt->bind_timeout = XS_BIND_TO;
2731 	xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2732 	xprt->idle_timeout = XS_IDLE_DISC_TO;
2733 
2734 	xprt->ops = &xs_local_ops;
2735 	xprt->timeout = &xs_local_default_timeout;
2736 
2737 	INIT_DELAYED_WORK(&transport->connect_worker,
2738 			xs_dummy_setup_socket);
2739 
2740 	switch (sun->sun_family) {
2741 	case AF_LOCAL:
2742 		if (sun->sun_path[0] != '/') {
2743 			dprintk("RPC:       bad AF_LOCAL address: %s\n",
2744 					sun->sun_path);
2745 			ret = ERR_PTR(-EINVAL);
2746 			goto out_err;
2747 		}
2748 		xprt_set_bound(xprt);
2749 		xs_format_peer_addresses(xprt, "local", RPCBIND_NETID_LOCAL);
2750 		ret = ERR_PTR(xs_local_setup_socket(transport));
2751 		if (ret)
2752 			goto out_err;
2753 		break;
2754 	default:
2755 		ret = ERR_PTR(-EAFNOSUPPORT);
2756 		goto out_err;
2757 	}
2758 
2759 	dprintk("RPC:       set up xprt to %s via AF_LOCAL\n",
2760 			xprt->address_strings[RPC_DISPLAY_ADDR]);
2761 
2762 	if (try_module_get(THIS_MODULE))
2763 		return xprt;
2764 	ret = ERR_PTR(-EINVAL);
2765 out_err:
2766 	xs_xprt_free(xprt);
2767 	return ret;
2768 }
2769 
2770 static const struct rpc_timeout xs_udp_default_timeout = {
2771 	.to_initval = 5 * HZ,
2772 	.to_maxval = 30 * HZ,
2773 	.to_increment = 5 * HZ,
2774 	.to_retries = 5,
2775 };
2776 
2777 /**
2778  * xs_setup_udp - Set up transport to use a UDP socket
2779  * @args: rpc transport creation arguments
2780  *
2781  */
2782 static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
2783 {
2784 	struct sockaddr *addr = args->dstaddr;
2785 	struct rpc_xprt *xprt;
2786 	struct sock_xprt *transport;
2787 	struct rpc_xprt *ret;
2788 
2789 	xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries,
2790 			xprt_udp_slot_table_entries);
2791 	if (IS_ERR(xprt))
2792 		return xprt;
2793 	transport = container_of(xprt, struct sock_xprt, xprt);
2794 
2795 	xprt->prot = IPPROTO_UDP;
2796 	xprt->tsh_size = 0;
2797 	/* XXX: header size can vary due to auth type, IPv6, etc. */
2798 	xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
2799 
2800 	xprt->bind_timeout = XS_BIND_TO;
2801 	xprt->reestablish_timeout = XS_UDP_REEST_TO;
2802 	xprt->idle_timeout = XS_IDLE_DISC_TO;
2803 
2804 	xprt->ops = &xs_udp_ops;
2805 
2806 	xprt->timeout = &xs_udp_default_timeout;
2807 
2808 	switch (addr->sa_family) {
2809 	case AF_INET:
2810 		if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2811 			xprt_set_bound(xprt);
2812 
2813 		INIT_DELAYED_WORK(&transport->connect_worker,
2814 					xs_udp_setup_socket);
2815 		xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP);
2816 		break;
2817 	case AF_INET6:
2818 		if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2819 			xprt_set_bound(xprt);
2820 
2821 		INIT_DELAYED_WORK(&transport->connect_worker,
2822 					xs_udp_setup_socket);
2823 		xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6);
2824 		break;
2825 	default:
2826 		ret = ERR_PTR(-EAFNOSUPPORT);
2827 		goto out_err;
2828 	}
2829 
2830 	if (xprt_bound(xprt))
2831 		dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
2832 				xprt->address_strings[RPC_DISPLAY_ADDR],
2833 				xprt->address_strings[RPC_DISPLAY_PORT],
2834 				xprt->address_strings[RPC_DISPLAY_PROTO]);
2835 	else
2836 		dprintk("RPC:       set up xprt to %s (autobind) via %s\n",
2837 				xprt->address_strings[RPC_DISPLAY_ADDR],
2838 				xprt->address_strings[RPC_DISPLAY_PROTO]);
2839 
2840 	if (try_module_get(THIS_MODULE))
2841 		return xprt;
2842 	ret = ERR_PTR(-EINVAL);
2843 out_err:
2844 	xs_xprt_free(xprt);
2845 	return ret;
2846 }
2847 
2848 static const struct rpc_timeout xs_tcp_default_timeout = {
2849 	.to_initval = 60 * HZ,
2850 	.to_maxval = 60 * HZ,
2851 	.to_retries = 2,
2852 };
2853 
2854 /**
2855  * xs_setup_tcp - Set up transport to use a TCP socket
2856  * @args: rpc transport creation arguments
2857  *
2858  */
2859 static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
2860 {
2861 	struct sockaddr *addr = args->dstaddr;
2862 	struct rpc_xprt *xprt;
2863 	struct sock_xprt *transport;
2864 	struct rpc_xprt *ret;
2865 	unsigned int max_slot_table_size = xprt_max_tcp_slot_table_entries;
2866 
2867 	if (args->flags & XPRT_CREATE_INFINITE_SLOTS)
2868 		max_slot_table_size = RPC_MAX_SLOT_TABLE_LIMIT;
2869 
2870 	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
2871 			max_slot_table_size);
2872 	if (IS_ERR(xprt))
2873 		return xprt;
2874 	transport = container_of(xprt, struct sock_xprt, xprt);
2875 
2876 	xprt->prot = IPPROTO_TCP;
2877 	xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
2878 	xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
2879 
2880 	xprt->bind_timeout = XS_BIND_TO;
2881 	xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2882 	xprt->idle_timeout = XS_IDLE_DISC_TO;
2883 
2884 	xprt->ops = &xs_tcp_ops;
2885 	xprt->timeout = &xs_tcp_default_timeout;
2886 
2887 	switch (addr->sa_family) {
2888 	case AF_INET:
2889 		if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2890 			xprt_set_bound(xprt);
2891 
2892 		INIT_DELAYED_WORK(&transport->connect_worker,
2893 					xs_tcp_setup_socket);
2894 		xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP);
2895 		break;
2896 	case AF_INET6:
2897 		if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2898 			xprt_set_bound(xprt);
2899 
2900 		INIT_DELAYED_WORK(&transport->connect_worker,
2901 					xs_tcp_setup_socket);
2902 		xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6);
2903 		break;
2904 	default:
2905 		ret = ERR_PTR(-EAFNOSUPPORT);
2906 		goto out_err;
2907 	}
2908 
2909 	if (xprt_bound(xprt))
2910 		dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
2911 				xprt->address_strings[RPC_DISPLAY_ADDR],
2912 				xprt->address_strings[RPC_DISPLAY_PORT],
2913 				xprt->address_strings[RPC_DISPLAY_PROTO]);
2914 	else
2915 		dprintk("RPC:       set up xprt to %s (autobind) via %s\n",
2916 				xprt->address_strings[RPC_DISPLAY_ADDR],
2917 				xprt->address_strings[RPC_DISPLAY_PROTO]);
2918 
2919 	if (try_module_get(THIS_MODULE))
2920 		return xprt;
2921 	ret = ERR_PTR(-EINVAL);
2922 out_err:
2923 	xs_xprt_free(xprt);
2924 	return ret;
2925 }
2926 
2927 /**
2928  * xs_setup_bc_tcp - Set up transport to use a TCP backchannel socket
2929  * @args: rpc transport creation arguments
2930  *
2931  */
2932 static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args)
2933 {
2934 	struct sockaddr *addr = args->dstaddr;
2935 	struct rpc_xprt *xprt;
2936 	struct sock_xprt *transport;
2937 	struct svc_sock *bc_sock;
2938 	struct rpc_xprt *ret;
2939 
2940 	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
2941 			xprt_tcp_slot_table_entries);
2942 	if (IS_ERR(xprt))
2943 		return xprt;
2944 	transport = container_of(xprt, struct sock_xprt, xprt);
2945 
2946 	xprt->prot = IPPROTO_TCP;
2947 	xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
2948 	xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
2949 	xprt->timeout = &xs_tcp_default_timeout;
2950 
2951 	/* backchannel */
2952 	xprt_set_bound(xprt);
2953 	xprt->bind_timeout = 0;
2954 	xprt->reestablish_timeout = 0;
2955 	xprt->idle_timeout = 0;
2956 
2957 	xprt->ops = &bc_tcp_ops;
2958 
2959 	switch (addr->sa_family) {
2960 	case AF_INET:
2961 		xs_format_peer_addresses(xprt, "tcp",
2962 					 RPCBIND_NETID_TCP);
2963 		break;
2964 	case AF_INET6:
2965 		xs_format_peer_addresses(xprt, "tcp",
2966 				   RPCBIND_NETID_TCP6);
2967 		break;
2968 	default:
2969 		ret = ERR_PTR(-EAFNOSUPPORT);
2970 		goto out_err;
2971 	}
2972 
2973 	dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
2974 			xprt->address_strings[RPC_DISPLAY_ADDR],
2975 			xprt->address_strings[RPC_DISPLAY_PORT],
2976 			xprt->address_strings[RPC_DISPLAY_PROTO]);
2977 
2978 	/*
2979 	 * Once we've associated a backchannel xprt with a connection,
2980 	 * we want to keep it around as long as the connection lasts,
2981 	 * in case we need to start using it for a backchannel again;
2982 	 * this reference won't be dropped until bc_xprt is destroyed.
2983 	 */
2984 	xprt_get(xprt);
2985 	args->bc_xprt->xpt_bc_xprt = xprt;
2986 	xprt->bc_xprt = args->bc_xprt;
2987 	bc_sock = container_of(args->bc_xprt, struct svc_sock, sk_xprt);
2988 	transport->sock = bc_sock->sk_sock;
2989 	transport->inet = bc_sock->sk_sk;
2990 
2991 	/*
2992 	 * Since we don't want connections for the backchannel, we set
2993 	 * the xprt status to connected
2994 	 */
2995 	xprt_set_connected(xprt);
2996 
2997 	if (try_module_get(THIS_MODULE))
2998 		return xprt;
2999 
3000 	args->bc_xprt->xpt_bc_xprt = NULL;
3001 	xprt_put(xprt);
3002 	ret = ERR_PTR(-EINVAL);
3003 out_err:
3004 	xs_xprt_free(xprt);
3005 	return ret;
3006 }
3007 
3008 static struct xprt_class	xs_local_transport = {
3009 	.list		= LIST_HEAD_INIT(xs_local_transport.list),
3010 	.name		= "named UNIX socket",
3011 	.owner		= THIS_MODULE,
3012 	.ident		= XPRT_TRANSPORT_LOCAL,
3013 	.setup		= xs_setup_local,
3014 };
3015 
3016 static struct xprt_class	xs_udp_transport = {
3017 	.list		= LIST_HEAD_INIT(xs_udp_transport.list),
3018 	.name		= "udp",
3019 	.owner		= THIS_MODULE,
3020 	.ident		= XPRT_TRANSPORT_UDP,
3021 	.setup		= xs_setup_udp,
3022 };
3023 
3024 static struct xprt_class	xs_tcp_transport = {
3025 	.list		= LIST_HEAD_INIT(xs_tcp_transport.list),
3026 	.name		= "tcp",
3027 	.owner		= THIS_MODULE,
3028 	.ident		= XPRT_TRANSPORT_TCP,
3029 	.setup		= xs_setup_tcp,
3030 };
3031 
3032 static struct xprt_class	xs_bc_tcp_transport = {
3033 	.list		= LIST_HEAD_INIT(xs_bc_tcp_transport.list),
3034 	.name		= "tcp NFSv4.1 backchannel",
3035 	.owner		= THIS_MODULE,
3036 	.ident		= XPRT_TRANSPORT_BC_TCP,
3037 	.setup		= xs_setup_bc_tcp,
3038 };
3039 
3040 /**
3041  * init_socket_xprt - set up xprtsock's sysctls, register with RPC client
3042  *
3043  */
3044 int init_socket_xprt(void)
3045 {
3046 #ifdef RPC_DEBUG
3047 	if (!sunrpc_table_header)
3048 		sunrpc_table_header = register_sysctl_table(sunrpc_table);
3049 #endif
3050 
3051 	xprt_register_transport(&xs_local_transport);
3052 	xprt_register_transport(&xs_udp_transport);
3053 	xprt_register_transport(&xs_tcp_transport);
3054 	xprt_register_transport(&xs_bc_tcp_transport);
3055 
3056 	return 0;
3057 }
3058 
3059 /**
3060  * cleanup_socket_xprt - remove xprtsock's sysctls, unregister
3061  *
3062  */
3063 void cleanup_socket_xprt(void)
3064 {
3065 #ifdef RPC_DEBUG
3066 	if (sunrpc_table_header) {
3067 		unregister_sysctl_table(sunrpc_table_header);
3068 		sunrpc_table_header = NULL;
3069 	}
3070 #endif
3071 
3072 	xprt_unregister_transport(&xs_local_transport);
3073 	xprt_unregister_transport(&xs_udp_transport);
3074 	xprt_unregister_transport(&xs_tcp_transport);
3075 	xprt_unregister_transport(&xs_bc_tcp_transport);
3076 }
3077 
3078 static int param_set_uint_minmax(const char *val,
3079 		const struct kernel_param *kp,
3080 		unsigned int min, unsigned int max)
3081 {
3082 	unsigned int num;
3083 	int ret;
3084 
3085 	if (!val)
3086 		return -EINVAL;
3087 	ret = kstrtouint(val, 0, &num);
3088 	if (ret < 0 || num < min || num > max)
3089 		return -EINVAL;
3090 	*((unsigned int *)kp->arg) = num;
3091 	return 0;
3092 }
3093 
3094 static int param_set_portnr(const char *val, const struct kernel_param *kp)
3095 {
3096 	return param_set_uint_minmax(val, kp,
3097 			RPC_MIN_RESVPORT,
3098 			RPC_MAX_RESVPORT);
3099 }
3100 
3101 static struct kernel_param_ops param_ops_portnr = {
3102 	.set = param_set_portnr,
3103 	.get = param_get_uint,
3104 };
3105 
3106 #define param_check_portnr(name, p) \
3107 	__param_check(name, p, unsigned int);
3108 
3109 module_param_named(min_resvport, xprt_min_resvport, portnr, 0644);
3110 module_param_named(max_resvport, xprt_max_resvport, portnr, 0644);
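/*
 * Assuming xprtsock is built into the sunrpc module, these show up as
 * /sys/module/sunrpc/parameters/{min,max}_resvport and can be tuned at
 * runtime, e.g.:
 *
 *	# echo 800 > /sys/module/sunrpc/parameters/min_resvport
 */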
3111 
3112 static int param_set_slot_table_size(const char *val,
3113 				     const struct kernel_param *kp)
3114 {
3115 	return param_set_uint_minmax(val, kp,
3116 			RPC_MIN_SLOT_TABLE,
3117 			RPC_MAX_SLOT_TABLE);
3118 }
3119 
3120 static struct kernel_param_ops param_ops_slot_table_size = {
3121 	.set = param_set_slot_table_size,
3122 	.get = param_get_uint,
3123 };
3124 
3125 #define param_check_slot_table_size(name, p) \
3126 	__param_check(name, p, unsigned int);
3127 
3128 static int param_set_max_slot_table_size(const char *val,
3129 				     const struct kernel_param *kp)
3130 {
3131 	return param_set_uint_minmax(val, kp,
3132 			RPC_MIN_SLOT_TABLE,
3133 			RPC_MAX_SLOT_TABLE_LIMIT);
3134 }
3135 
3136 static struct kernel_param_ops param_ops_max_slot_table_size = {
3137 	.set = param_set_max_slot_table_size,
3138 	.get = param_get_uint,
3139 };
3140 
3141 #define param_check_max_slot_table_size(name, p) \
3142 	__param_check(name, p, unsigned int);
3143 
3144 module_param_named(tcp_slot_table_entries, xprt_tcp_slot_table_entries,
3145 		   slot_table_size, 0644);
3146 module_param_named(tcp_max_slot_table_entries, xprt_max_tcp_slot_table_entries,
3147 		   max_slot_table_size, 0644);
3148 module_param_named(udp_slot_table_entries, xprt_udp_slot_table_entries,
3149 		   slot_table_size, 0644);
3150 
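/*
 * The slot table sizes can likewise be set at boot or module load time,
 * e.g. "sunrpc.tcp_slot_table_entries=16" on the kernel command line
 * (again assuming xprtsock is built into the sunrpc module).
 */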
3151