xref: /openbmc/linux/net/sunrpc/xprtsock.c (revision 4417c8c41a51a2ae95b2a2fa2811640b368c4151)
1 /*
2  * linux/net/sunrpc/xprtsock.c
3  *
4  * Client-side transport implementation for sockets.
5  *
6  * TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
7  * TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
8  * TCP NFS related read + write fixes
9  *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
10  *
 * Rewrite of large parts of the code in order to stabilize TCP stuff.
12  * Fix behaviour when socket buffer is full.
13  *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
14  *
15  * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
16  *
17  * IPv6 support contributed by Gilles Quillard, Bull Open Source, 2005.
18  *   <gilles.quillard@bull.net>
19  */
20 
21 #include <linux/types.h>
22 #include <linux/slab.h>
23 #include <linux/capability.h>
24 #include <linux/pagemap.h>
25 #include <linux/errno.h>
26 #include <linux/socket.h>
27 #include <linux/in.h>
28 #include <linux/net.h>
29 #include <linux/mm.h>
30 #include <linux/udp.h>
31 #include <linux/tcp.h>
32 #include <linux/sunrpc/clnt.h>
33 #include <linux/sunrpc/sched.h>
34 #include <linux/file.h>
35 
36 #include <net/sock.h>
37 #include <net/checksum.h>
38 #include <net/udp.h>
39 #include <net/tcp.h>
40 
41 /*
42  * xprtsock tunables
43  */
44 unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
45 unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE;
46 
47 unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
48 unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;
49 
50 /*
51  * We can register our own files under /proc/sys/sunrpc by
52  * calling register_sysctl_table() again.  The files in that
53  * directory become the union of all files registered there.
54  *
55  * We simply need to make sure that we don't collide with
56  * someone else's file names!
57  */
58 
59 #ifdef RPC_DEBUG
60 
61 static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
62 static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
63 static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT;
64 static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT;
65 
66 static struct ctl_table_header *sunrpc_table_header;
67 
68 /*
69  * FIXME: changing the UDP slot table size should also resize the UDP
70  *        socket buffers for existing UDP transports
71  */
72 static ctl_table xs_tunables_table[] = {
73 	{
74 		.ctl_name	= CTL_SLOTTABLE_UDP,
75 		.procname	= "udp_slot_table_entries",
76 		.data		= &xprt_udp_slot_table_entries,
77 		.maxlen		= sizeof(unsigned int),
78 		.mode		= 0644,
79 		.proc_handler	= &proc_dointvec_minmax,
80 		.strategy	= &sysctl_intvec,
81 		.extra1		= &min_slot_table_size,
82 		.extra2		= &max_slot_table_size
83 	},
84 	{
85 		.ctl_name	= CTL_SLOTTABLE_TCP,
86 		.procname	= "tcp_slot_table_entries",
87 		.data		= &xprt_tcp_slot_table_entries,
88 		.maxlen		= sizeof(unsigned int),
89 		.mode		= 0644,
90 		.proc_handler	= &proc_dointvec_minmax,
91 		.strategy	= &sysctl_intvec,
92 		.extra1		= &min_slot_table_size,
93 		.extra2		= &max_slot_table_size
94 	},
95 	{
96 		.ctl_name	= CTL_MIN_RESVPORT,
97 		.procname	= "min_resvport",
98 		.data		= &xprt_min_resvport,
99 		.maxlen		= sizeof(unsigned int),
100 		.mode		= 0644,
101 		.proc_handler	= &proc_dointvec_minmax,
102 		.strategy	= &sysctl_intvec,
103 		.extra1		= &xprt_min_resvport_limit,
104 		.extra2		= &xprt_max_resvport_limit
105 	},
106 	{
107 		.ctl_name	= CTL_MAX_RESVPORT,
108 		.procname	= "max_resvport",
109 		.data		= &xprt_max_resvport,
110 		.maxlen		= sizeof(unsigned int),
111 		.mode		= 0644,
112 		.proc_handler	= &proc_dointvec_minmax,
113 		.strategy	= &sysctl_intvec,
114 		.extra1		= &xprt_min_resvport_limit,
115 		.extra2		= &xprt_max_resvport_limit
116 	},
117 	{
118 		.ctl_name = 0,
119 	},
120 };
121 
122 static ctl_table sunrpc_table[] = {
123 	{
124 		.ctl_name	= CTL_SUNRPC,
125 		.procname	= "sunrpc",
126 		.mode		= 0555,
127 		.child		= xs_tunables_table
128 	},
129 	{
130 		.ctl_name = 0,
131 	},
132 };
133 
134 #endif
135 
/*
 * Number of send attempts made on a socket before the sender stops
 * retrying and waits for the socket buffer to clear.
 */
#define XS_SENDMSG_RETRY	(10U)

/*
 * Timeout for an RPC UDP socket connect (5 seconds).  UDP connects
 * are synchronous; the timeout only guards against resource
 * exhaustion on the local host.
 */
#define XS_UDP_CONN_TO		(5U * HZ)

/*
 * How long to wait for an RPC TCP connection to be established
 * (60 seconds).  Solaris NFS over TCP, for example, uses the same
 * value, which is in line with how long a server takes to reboot.
 */
#define XS_TCP_CONN_TO		(60U * HZ)

/*
 * How long to wait for a reply from the RPC portmapper (60 seconds).
 */
#define XS_BIND_TO		(60U * HZ)

/*
 * Delay applied after a UDP socket connect error (2 seconds).  Such
 * an error most likely indicates a resource problem on the local host.
 */
#define XS_UDP_REEST_TO		(2U * HZ)

/*
 * TCP reestablish timeouts.
 *
 * Clients delay for a bit before reconnecting to a server that just
 * dropped the connection.  The delay backs off exponentially: some
 * servers drop TCP connections when they are overworked, so start
 * short (3 seconds) and grow towards the maximum (5 minutes) while the
 * server is down or not responding.
 */
#define XS_TCP_INIT_REEST_TO	(3U * HZ)
#define XS_TCP_MAX_REEST_TO	(5U * 60 * HZ)

/*
 * Idle timeout (5 minutes): the client drops the transport socket once
 * it has been idle this long.  This is applied to UDP sockets too, so
 * that port numbers are not held while there is no RPC traffic.
 */
#define XS_IDLE_DISC_TO		(5U * 60 * HZ)
185 
186 #ifdef RPC_DEBUG
187 # undef  RPC_DEBUG_DATA
188 # define RPCDBG_FACILITY	RPCDBG_TRANS
189 #endif
190 
191 #ifdef RPC_DEBUG_DATA
192 static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
193 {
194 	u8 *buf = (u8 *) packet;
195 	int j;
196 
197 	dprintk("RPC:       %s\n", msg);
198 	for (j = 0; j < count && j < 128; j += 4) {
199 		if (!(j & 31)) {
200 			if (j)
201 				dprintk("\n");
202 			dprintk("0x%04x ", j);
203 		}
204 		dprintk("%02x%02x%02x%02x ",
205 			buf[j], buf[j+1], buf[j+2], buf[j+3]);
206 	}
207 	dprintk("\n");
208 }
209 #else
210 static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
211 {
212 	/* NOP */
213 }
214 #endif
215 
216 struct sock_xprt {
217 	struct rpc_xprt		xprt;
218 
219 	/*
220 	 * Network layer
221 	 */
222 	struct socket *		sock;
223 	struct sock *		inet;
224 
225 	/*
226 	 * State of TCP reply receive
227 	 */
228 	__be32			tcp_fraghdr,
229 				tcp_xid;
230 
231 	u32			tcp_offset,
232 				tcp_reclen;
233 
234 	unsigned long		tcp_copied,
235 				tcp_flags;
236 
237 	/*
238 	 * Connection of transports
239 	 */
240 	struct delayed_work	connect_worker;
241 	struct sockaddr_storage	addr;
242 	unsigned short		port;
243 
244 	/*
245 	 * UDP socket buffer size parameters
246 	 */
247 	size_t			rcvsize,
248 				sndsize;
249 
250 	/*
251 	 * Saved socket callback addresses
252 	 */
253 	void			(*old_data_ready)(struct sock *, int);
254 	void			(*old_state_change)(struct sock *);
255 	void			(*old_write_space)(struct sock *);
256 };
257 
/*
 * Bits kept in sock_xprt.tcp_flags while receiving a TCP reply.
 *
 * LAST_FRAG:    the current fragment's marker had the "last fragment" bit
 * COPY_FRAGHDR: a fragment header (record marker) must be read next
 * COPY_XID:     the XID must be read next
 * COPY_DATA:    reply data is being copied into a request's buffer
 */
#define TCP_RCV_LAST_FRAG	(1UL << 0)
#define TCP_RCV_COPY_FRAGHDR	(1UL << 1)
#define TCP_RCV_COPY_XID	(1UL << 2)
#define TCP_RCV_COPY_DATA	(1UL << 3)
265 
266 static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt)
267 {
268 	return (struct sockaddr *) &xprt->addr;
269 }
270 
271 static inline struct sockaddr_in *xs_addr_in(struct rpc_xprt *xprt)
272 {
273 	return (struct sockaddr_in *) &xprt->addr;
274 }
275 
276 static inline struct sockaddr_in6 *xs_addr_in6(struct rpc_xprt *xprt)
277 {
278 	return (struct sockaddr_in6 *) &xprt->addr;
279 }
280 
281 static void xs_format_ipv4_peer_addresses(struct rpc_xprt *xprt)
282 {
283 	struct sockaddr_in *addr = xs_addr_in(xprt);
284 	char *buf;
285 
286 	buf = kzalloc(20, GFP_KERNEL);
287 	if (buf) {
288 		snprintf(buf, 20, NIPQUAD_FMT,
289 				NIPQUAD(addr->sin_addr.s_addr));
290 	}
291 	xprt->address_strings[RPC_DISPLAY_ADDR] = buf;
292 
293 	buf = kzalloc(8, GFP_KERNEL);
294 	if (buf) {
295 		snprintf(buf, 8, "%u",
296 				ntohs(addr->sin_port));
297 	}
298 	xprt->address_strings[RPC_DISPLAY_PORT] = buf;
299 
300 	buf = kzalloc(8, GFP_KERNEL);
301 	if (buf) {
302 		if (xprt->prot == IPPROTO_UDP)
303 			snprintf(buf, 8, "udp");
304 		else
305 			snprintf(buf, 8, "tcp");
306 	}
307 	xprt->address_strings[RPC_DISPLAY_PROTO] = buf;
308 
309 	buf = kzalloc(48, GFP_KERNEL);
310 	if (buf) {
311 		snprintf(buf, 48, "addr="NIPQUAD_FMT" port=%u proto=%s",
312 			NIPQUAD(addr->sin_addr.s_addr),
313 			ntohs(addr->sin_port),
314 			xprt->prot == IPPROTO_UDP ? "udp" : "tcp");
315 	}
316 	xprt->address_strings[RPC_DISPLAY_ALL] = buf;
317 
318 	buf = kzalloc(10, GFP_KERNEL);
319 	if (buf) {
320 		snprintf(buf, 10, "%02x%02x%02x%02x",
321 				NIPQUAD(addr->sin_addr.s_addr));
322 	}
323 	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = buf;
324 
325 	buf = kzalloc(8, GFP_KERNEL);
326 	if (buf) {
327 		snprintf(buf, 8, "%4hx",
328 				ntohs(addr->sin_port));
329 	}
330 	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = buf;
331 
332 	buf = kzalloc(30, GFP_KERNEL);
333 	if (buf) {
334 		snprintf(buf, 30, NIPQUAD_FMT".%u.%u",
335 				NIPQUAD(addr->sin_addr.s_addr),
336 				ntohs(addr->sin_port) >> 8,
337 				ntohs(addr->sin_port) & 0xff);
338 	}
339 	xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf;
340 
341 	xprt->address_strings[RPC_DISPLAY_NETID] =
342 		kstrdup(xprt->prot == IPPROTO_UDP ?
343 			RPCBIND_NETID_UDP : RPCBIND_NETID_TCP, GFP_KERNEL);
344 }
345 
346 static void xs_format_ipv6_peer_addresses(struct rpc_xprt *xprt)
347 {
348 	struct sockaddr_in6 *addr = xs_addr_in6(xprt);
349 	char *buf;
350 
351 	buf = kzalloc(40, GFP_KERNEL);
352 	if (buf) {
353 		snprintf(buf, 40, NIP6_FMT,
354 				NIP6(addr->sin6_addr));
355 	}
356 	xprt->address_strings[RPC_DISPLAY_ADDR] = buf;
357 
358 	buf = kzalloc(8, GFP_KERNEL);
359 	if (buf) {
360 		snprintf(buf, 8, "%u",
361 				ntohs(addr->sin6_port));
362 	}
363 	xprt->address_strings[RPC_DISPLAY_PORT] = buf;
364 
365 	buf = kzalloc(8, GFP_KERNEL);
366 	if (buf) {
367 		if (xprt->prot == IPPROTO_UDP)
368 			snprintf(buf, 8, "udp");
369 		else
370 			snprintf(buf, 8, "tcp");
371 	}
372 	xprt->address_strings[RPC_DISPLAY_PROTO] = buf;
373 
374 	buf = kzalloc(64, GFP_KERNEL);
375 	if (buf) {
376 		snprintf(buf, 64, "addr="NIP6_FMT" port=%u proto=%s",
377 				NIP6(addr->sin6_addr),
378 				ntohs(addr->sin6_port),
379 				xprt->prot == IPPROTO_UDP ? "udp" : "tcp");
380 	}
381 	xprt->address_strings[RPC_DISPLAY_ALL] = buf;
382 
383 	buf = kzalloc(36, GFP_KERNEL);
384 	if (buf) {
385 		snprintf(buf, 36, NIP6_SEQFMT,
386 				NIP6(addr->sin6_addr));
387 	}
388 	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = buf;
389 
390 	buf = kzalloc(8, GFP_KERNEL);
391 	if (buf) {
392 		snprintf(buf, 8, "%4hx",
393 				ntohs(addr->sin6_port));
394 	}
395 	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = buf;
396 
397 	buf = kzalloc(50, GFP_KERNEL);
398 	if (buf) {
399 		snprintf(buf, 50, NIP6_FMT".%u.%u",
400 				NIP6(addr->sin6_addr),
401 				ntohs(addr->sin6_port) >> 8,
402 				ntohs(addr->sin6_port) & 0xff);
403 	}
404 	xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf;
405 
406 	xprt->address_strings[RPC_DISPLAY_NETID] =
407 		kstrdup(xprt->prot == IPPROTO_UDP ?
408 			RPCBIND_NETID_UDP6 : RPCBIND_NETID_TCP6, GFP_KERNEL);
409 }
410 
411 static void xs_free_peer_addresses(struct rpc_xprt *xprt)
412 {
413 	int i;
414 
415 	for (i = 0; i < RPC_DISPLAY_MAX; i++)
416 		kfree(xprt->address_strings[i]);
417 }
418 
419 #define XS_SENDMSG_FLAGS	(MSG_DONTWAIT | MSG_NOSIGNAL)
420 
421 static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
422 {
423 	struct msghdr msg = {
424 		.msg_name	= addr,
425 		.msg_namelen	= addrlen,
426 		.msg_flags	= XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0),
427 	};
428 	struct kvec iov = {
429 		.iov_base	= vec->iov_base + base,
430 		.iov_len	= vec->iov_len - base,
431 	};
432 
433 	if (iov.iov_len != 0)
434 		return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
435 	return kernel_sendmsg(sock, &msg, NULL, 0, 0);
436 }
437 
438 static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more)
439 {
440 	struct page **ppage;
441 	unsigned int remainder;
442 	int err, sent = 0;
443 
444 	remainder = xdr->page_len - base;
445 	base += xdr->page_base;
446 	ppage = xdr->pages + (base >> PAGE_SHIFT);
447 	base &= ~PAGE_MASK;
448 	for(;;) {
449 		unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);
450 		int flags = XS_SENDMSG_FLAGS;
451 
452 		remainder -= len;
453 		if (remainder != 0 || more)
454 			flags |= MSG_MORE;
455 		err = sock->ops->sendpage(sock, *ppage, base, len, flags);
456 		if (remainder == 0 || err != len)
457 			break;
458 		sent += err;
459 		ppage++;
460 		base = 0;
461 	}
462 	if (sent == 0)
463 		return err;
464 	if (err > 0)
465 		sent += err;
466 	return sent;
467 }
468 
469 /**
470  * xs_sendpages - write pages directly to a socket
471  * @sock: socket to send on
472  * @addr: UDP only -- address of destination
473  * @addrlen: UDP only -- length of destination address
474  * @xdr: buffer containing this request
475  * @base: starting position in the buffer
476  *
477  */
478 static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base)
479 {
480 	unsigned int remainder = xdr->len - base;
481 	int err, sent = 0;
482 
483 	if (unlikely(!sock))
484 		return -ENOTCONN;
485 
486 	clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
487 	if (base != 0) {
488 		addr = NULL;
489 		addrlen = 0;
490 	}
491 
492 	if (base < xdr->head[0].iov_len || addr != NULL) {
493 		unsigned int len = xdr->head[0].iov_len - base;
494 		remainder -= len;
495 		err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0);
496 		if (remainder == 0 || err != len)
497 			goto out;
498 		sent += err;
499 		base = 0;
500 	} else
501 		base -= xdr->head[0].iov_len;
502 
503 	if (base < xdr->page_len) {
504 		unsigned int len = xdr->page_len - base;
505 		remainder -= len;
506 		err = xs_send_pagedata(sock, xdr, base, remainder != 0);
507 		if (remainder == 0 || err != len)
508 			goto out;
509 		sent += err;
510 		base = 0;
511 	} else
512 		base -= xdr->page_len;
513 
514 	if (base >= xdr->tail[0].iov_len)
515 		return sent;
516 	err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0);
517 out:
518 	if (sent == 0)
519 		return err;
520 	if (err > 0)
521 		sent += err;
522 	return sent;
523 }
524 
525 /**
526  * xs_nospace - place task on wait queue if transmit was incomplete
527  * @task: task to put to sleep
528  *
529  */
530 static void xs_nospace(struct rpc_task *task)
531 {
532 	struct rpc_rqst *req = task->tk_rqstp;
533 	struct rpc_xprt *xprt = req->rq_xprt;
534 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
535 
536 	dprintk("RPC: %5u xmit incomplete (%u left of %u)\n",
537 			task->tk_pid, req->rq_slen - req->rq_bytes_sent,
538 			req->rq_slen);
539 
540 	if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
541 		/* Protect against races with write_space */
542 		spin_lock_bh(&xprt->transport_lock);
543 
544 		/* Don't race with disconnect */
545 		if (!xprt_connected(xprt))
546 			task->tk_status = -ENOTCONN;
547 		else if (test_bit(SOCK_NOSPACE, &transport->sock->flags))
548 			xprt_wait_for_buffer_space(task);
549 
550 		spin_unlock_bh(&xprt->transport_lock);
551 	} else
552 		/* Keep holding the socket if it is blocked */
553 		rpc_delay(task, HZ>>4);
554 }
555 
/**
 * xs_udp_send_request - write an RPC request to a UDP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occurred, the request was not sent
 */
static int xs_udp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	req->rq_xtime = jiffies;
	status = xs_sendpages(transport->sock, xs_addr(xprt), xprt->addrlen,
			      xdr, req->rq_bytes_sent);

	dprintk("RPC:       xs_udp_send_request(%u) = %d\n",
			xdr->len - req->rq_bytes_sent, status);

	/* Whole request went out */
	if (likely(status >= (int) req->rq_slen))
		return 0;

	/* Still some bytes left; set up for a retry later. */
	if (status > 0)
		status = -EAGAIN;

	if (status == -EAGAIN) {
		xs_nospace(task);
	} else if (status == -ENETUNREACH ||
		   status == -EPIPE ||
		   status == -ECONNREFUSED) {
		/* When the server has died, an ICMP port unreachable message
		 * prompts ECONNREFUSED. */
	} else {
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
	}

	return status;
}
613 
614 static inline void xs_encode_tcp_record_marker(struct xdr_buf *buf)
615 {
616 	u32 reclen = buf->len - sizeof(rpc_fraghdr);
617 	rpc_fraghdr *base = buf->head[0].iov_base;
618 	*base = htonl(RPC_LAST_STREAM_FRAGMENT | reclen);
619 }
620 
/**
 * xs_tcp_send_request - write an RPC request to a TCP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occurred, the request was not sent
 *
 * XXX: In the case of soft timeouts, should we eventually give up
 *	if sendmsg is not able to make progress?
 */
static int xs_tcp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	unsigned int retry = 0;
	int status;

	xs_encode_tcp_record_marker(xdr);

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	/* Continue transmitting the packet/record. We must be careful
	 * to cope with writespace callbacks arriving _after_ we have
	 * called sendmsg(). */
	do {
		req->rq_xtime = jiffies;
		status = xs_sendpages(transport->sock,
					NULL, 0, xdr, req->rq_bytes_sent);

		dprintk("RPC:       xs_tcp_send_request(%u) = %d\n",
				xdr->len - req->rq_bytes_sent, status);

		if (unlikely(status < 0))
			break;

		/* If we've sent the entire packet, immediately
		 * reset the count of bytes sent. */
		req->rq_bytes_sent += status;
		task->tk_bytes_sent += status;
		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
			req->rq_bytes_sent = 0;
			return 0;
		}

		/* Partial send: try again unless the retry budget is spent */
		status = -EAGAIN;
	} while (retry++ <= XS_SENDMSG_RETRY);

	if (status == -EAGAIN) {
		xs_nospace(task);
	} else if (status == -ECONNREFUSED ||
		   status == -ECONNRESET ||
		   status == -ENOTCONN ||
		   status == -EPIPE) {
		/* All of these are reported to the caller as "not connected" */
		status = -ENOTCONN;
	} else {
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
		xprt_disconnect(xprt);
	}

	return status;
}
697 
698 /**
699  * xs_tcp_release_xprt - clean up after a tcp transmission
700  * @xprt: transport
701  * @task: rpc task
702  *
703  * This cleans up if an error causes us to abort the transmission of a request.
704  * In this case, the socket may need to be reset in order to avoid confusing
705  * the server.
706  */
707 static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
708 {
709 	struct rpc_rqst *req;
710 
711 	if (task != xprt->snd_task)
712 		return;
713 	if (task == NULL)
714 		goto out_release;
715 	req = task->tk_rqstp;
716 	if (req->rq_bytes_sent == 0)
717 		goto out_release;
718 	if (req->rq_bytes_sent == req->rq_snd_buf.len)
719 		goto out_release;
720 	set_bit(XPRT_CLOSE_WAIT, &task->tk_xprt->state);
721 out_release:
722 	xprt_release_xprt(xprt, task);
723 }
724 
725 /**
726  * xs_close - close a socket
727  * @xprt: transport
728  *
729  * This is used when all requests are complete; ie, no DRC state remains
730  * on the server we want to save.
731  */
732 static void xs_close(struct rpc_xprt *xprt)
733 {
734 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
735 	struct socket *sock = transport->sock;
736 	struct sock *sk = transport->inet;
737 
738 	if (!sk)
739 		goto clear_close_wait;
740 
741 	dprintk("RPC:       xs_close xprt %p\n", xprt);
742 
743 	write_lock_bh(&sk->sk_callback_lock);
744 	transport->inet = NULL;
745 	transport->sock = NULL;
746 
747 	sk->sk_user_data = NULL;
748 	sk->sk_data_ready = transport->old_data_ready;
749 	sk->sk_state_change = transport->old_state_change;
750 	sk->sk_write_space = transport->old_write_space;
751 	write_unlock_bh(&sk->sk_callback_lock);
752 
753 	sk->sk_no_check = 0;
754 
755 	sock_release(sock);
756 clear_close_wait:
757 	smp_mb__before_clear_bit();
758 	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
759 	smp_mb__after_clear_bit();
760 }
761 
762 /**
763  * xs_destroy - prepare to shutdown a transport
764  * @xprt: doomed transport
765  *
766  */
767 static void xs_destroy(struct rpc_xprt *xprt)
768 {
769 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
770 
771 	dprintk("RPC:       xs_destroy xprt %p\n", xprt);
772 
773 	cancel_rearming_delayed_work(&transport->connect_worker);
774 
775 	xprt_disconnect(xprt);
776 	xs_close(xprt);
777 	xs_free_peer_addresses(xprt);
778 	kfree(xprt->slot);
779 	kfree(xprt);
780 }
781 
782 static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
783 {
784 	return (struct rpc_xprt *) sk->sk_user_data;
785 }
786 
787 /**
788  * xs_udp_data_ready - "data ready" callback for UDP sockets
789  * @sk: socket with data to read
790  * @len: how much data to read
791  *
792  */
793 static void xs_udp_data_ready(struct sock *sk, int len)
794 {
795 	struct rpc_task *task;
796 	struct rpc_xprt *xprt;
797 	struct rpc_rqst *rovr;
798 	struct sk_buff *skb;
799 	int err, repsize, copied;
800 	u32 _xid;
801 	__be32 *xp;
802 
803 	read_lock(&sk->sk_callback_lock);
804 	dprintk("RPC:       xs_udp_data_ready...\n");
805 	if (!(xprt = xprt_from_sock(sk)))
806 		goto out;
807 
808 	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
809 		goto out;
810 
811 	if (xprt->shutdown)
812 		goto dropit;
813 
814 	repsize = skb->len - sizeof(struct udphdr);
815 	if (repsize < 4) {
816 		dprintk("RPC:       impossible RPC reply size %d!\n", repsize);
817 		goto dropit;
818 	}
819 
820 	/* Copy the XID from the skb... */
821 	xp = skb_header_pointer(skb, sizeof(struct udphdr),
822 				sizeof(_xid), &_xid);
823 	if (xp == NULL)
824 		goto dropit;
825 
826 	/* Look up and lock the request corresponding to the given XID */
827 	spin_lock(&xprt->transport_lock);
828 	rovr = xprt_lookup_rqst(xprt, *xp);
829 	if (!rovr)
830 		goto out_unlock;
831 	task = rovr->rq_task;
832 
833 	if ((copied = rovr->rq_private_buf.buflen) > repsize)
834 		copied = repsize;
835 
836 	/* Suck it into the iovec, verify checksum if not done by hw. */
837 	if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb))
838 		goto out_unlock;
839 
840 	/* Something worked... */
841 	dst_confirm(skb->dst);
842 
843 	xprt_adjust_cwnd(task, copied);
844 	xprt_update_rtt(task);
845 	xprt_complete_rqst(task, copied);
846 
847  out_unlock:
848 	spin_unlock(&xprt->transport_lock);
849  dropit:
850 	skb_free_datagram(sk, skb);
851  out:
852 	read_unlock(&sk->sk_callback_lock);
853 }
854 
855 static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
856 {
857 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
858 	size_t len, used;
859 	char *p;
860 
861 	p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset;
862 	len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset;
863 	used = xdr_skb_read_bits(desc, p, len);
864 	transport->tcp_offset += used;
865 	if (used != len)
866 		return;
867 
868 	transport->tcp_reclen = ntohl(transport->tcp_fraghdr);
869 	if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
870 		transport->tcp_flags |= TCP_RCV_LAST_FRAG;
871 	else
872 		transport->tcp_flags &= ~TCP_RCV_LAST_FRAG;
873 	transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;
874 
875 	transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR;
876 	transport->tcp_offset = 0;
877 
878 	/* Sanity check of the record length */
879 	if (unlikely(transport->tcp_reclen < 4)) {
880 		dprintk("RPC:       invalid TCP record fragment length\n");
881 		xprt_disconnect(xprt);
882 		return;
883 	}
884 	dprintk("RPC:       reading TCP record fragment of length %d\n",
885 			transport->tcp_reclen);
886 }
887 
888 static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
889 {
890 	if (transport->tcp_offset == transport->tcp_reclen) {
891 		transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR;
892 		transport->tcp_offset = 0;
893 		if (transport->tcp_flags & TCP_RCV_LAST_FRAG) {
894 			transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
895 			transport->tcp_flags |= TCP_RCV_COPY_XID;
896 			transport->tcp_copied = 0;
897 		}
898 	}
899 }
900 
901 static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc)
902 {
903 	size_t len, used;
904 	char *p;
905 
906 	len = sizeof(transport->tcp_xid) - transport->tcp_offset;
907 	dprintk("RPC:       reading XID (%Zu bytes)\n", len);
908 	p = ((char *) &transport->tcp_xid) + transport->tcp_offset;
909 	used = xdr_skb_read_bits(desc, p, len);
910 	transport->tcp_offset += used;
911 	if (used != len)
912 		return;
913 	transport->tcp_flags &= ~TCP_RCV_COPY_XID;
914 	transport->tcp_flags |= TCP_RCV_COPY_DATA;
915 	transport->tcp_copied = 4;
916 	dprintk("RPC:       reading reply for XID %08x\n",
917 			ntohl(transport->tcp_xid));
918 	xs_tcp_check_fraghdr(transport);
919 }
920 
921 static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
922 {
923 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
924 	struct rpc_rqst *req;
925 	struct xdr_buf *rcvbuf;
926 	size_t len;
927 	ssize_t r;
928 
929 	/* Find and lock the request corresponding to this xid */
930 	spin_lock(&xprt->transport_lock);
931 	req = xprt_lookup_rqst(xprt, transport->tcp_xid);
932 	if (!req) {
933 		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
934 		dprintk("RPC:       XID %08x request not found!\n",
935 				ntohl(transport->tcp_xid));
936 		spin_unlock(&xprt->transport_lock);
937 		return;
938 	}
939 
940 	rcvbuf = &req->rq_private_buf;
941 	len = desc->count;
942 	if (len > transport->tcp_reclen - transport->tcp_offset) {
943 		struct xdr_skb_reader my_desc;
944 
945 		len = transport->tcp_reclen - transport->tcp_offset;
946 		memcpy(&my_desc, desc, sizeof(my_desc));
947 		my_desc.count = len;
948 		r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
949 					  &my_desc, xdr_skb_read_bits);
950 		desc->count -= r;
951 		desc->offset += r;
952 	} else
953 		r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
954 					  desc, xdr_skb_read_bits);
955 
956 	if (r > 0) {
957 		transport->tcp_copied += r;
958 		transport->tcp_offset += r;
959 	}
960 	if (r != len) {
961 		/* Error when copying to the receive buffer,
962 		 * usually because we weren't able to allocate
963 		 * additional buffer pages. All we can do now
964 		 * is turn off TCP_RCV_COPY_DATA, so the request
965 		 * will not receive any additional updates,
966 		 * and time out.
967 		 * Any remaining data from this record will
968 		 * be discarded.
969 		 */
970 		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
971 		dprintk("RPC:       XID %08x truncated request\n",
972 				ntohl(transport->tcp_xid));
973 		dprintk("RPC:       xprt = %p, tcp_copied = %lu, "
974 				"tcp_offset = %u, tcp_reclen = %u\n",
975 				xprt, transport->tcp_copied,
976 				transport->tcp_offset, transport->tcp_reclen);
977 		goto out;
978 	}
979 
980 	dprintk("RPC:       XID %08x read %Zd bytes\n",
981 			ntohl(transport->tcp_xid), r);
982 	dprintk("RPC:       xprt = %p, tcp_copied = %lu, tcp_offset = %u, "
983 			"tcp_reclen = %u\n", xprt, transport->tcp_copied,
984 			transport->tcp_offset, transport->tcp_reclen);
985 
986 	if (transport->tcp_copied == req->rq_private_buf.buflen)
987 		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
988 	else if (transport->tcp_offset == transport->tcp_reclen) {
989 		if (transport->tcp_flags & TCP_RCV_LAST_FRAG)
990 			transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
991 	}
992 
993 out:
994 	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
995 		xprt_complete_rqst(req->rq_task, transport->tcp_copied);
996 	spin_unlock(&xprt->transport_lock);
997 	xs_tcp_check_fraghdr(transport);
998 }
999 
1000 static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc)
1001 {
1002 	size_t len;
1003 
1004 	len = transport->tcp_reclen - transport->tcp_offset;
1005 	if (len > desc->count)
1006 		len = desc->count;
1007 	desc->count -= len;
1008 	desc->offset += len;
1009 	transport->tcp_offset += len;
1010 	dprintk("RPC:       discarded %Zu bytes\n", len);
1011 	xs_tcp_check_fraghdr(transport);
1012 }
1013 
1014 static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
1015 {
1016 	struct rpc_xprt *xprt = rd_desc->arg.data;
1017 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1018 	struct xdr_skb_reader desc = {
1019 		.skb	= skb,
1020 		.offset	= offset,
1021 		.count	= len,
1022 	};
1023 
1024 	dprintk("RPC:       xs_tcp_data_recv started\n");
1025 	do {
1026 		/* Read in a new fragment marker if necessary */
1027 		/* Can we ever really expect to get completely empty fragments? */
1028 		if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) {
1029 			xs_tcp_read_fraghdr(xprt, &desc);
1030 			continue;
1031 		}
1032 		/* Read in the xid if necessary */
1033 		if (transport->tcp_flags & TCP_RCV_COPY_XID) {
1034 			xs_tcp_read_xid(transport, &desc);
1035 			continue;
1036 		}
1037 		/* Read in the request data */
1038 		if (transport->tcp_flags & TCP_RCV_COPY_DATA) {
1039 			xs_tcp_read_request(xprt, &desc);
1040 			continue;
1041 		}
1042 		/* Skip over any trailing bytes on short reads */
1043 		xs_tcp_read_discard(transport, &desc);
1044 	} while (desc.count);
1045 	dprintk("RPC:       xs_tcp_data_recv done\n");
1046 	return len - desc.count;
1047 }
1048 
1049 /**
1050  * xs_tcp_data_ready - "data ready" callback for TCP sockets
1051  * @sk: socket with data to read
1052  * @bytes: how much data to read
1053  *
1054  */
1055 static void xs_tcp_data_ready(struct sock *sk, int bytes)
1056 {
1057 	struct rpc_xprt *xprt;
1058 	read_descriptor_t rd_desc;
1059 
1060 	dprintk("RPC:       xs_tcp_data_ready...\n");
1061 
1062 	read_lock(&sk->sk_callback_lock);
1063 	if (!(xprt = xprt_from_sock(sk)))
1064 		goto out;
1065 	if (xprt->shutdown)
1066 		goto out;
1067 
1068 	/* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
1069 	rd_desc.arg.data = xprt;
1070 	rd_desc.count = 65536;
1071 	tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
1072 out:
1073 	read_unlock(&sk->sk_callback_lock);
1074 }
1075 
1076 /**
1077  * xs_tcp_state_change - callback to handle TCP socket state changes
1078  * @sk: socket whose state has changed
1079  *
1080  */
1081 static void xs_tcp_state_change(struct sock *sk)
1082 {
1083 	struct rpc_xprt *xprt;
1084 
1085 	read_lock(&sk->sk_callback_lock);
1086 	if (!(xprt = xprt_from_sock(sk)))
1087 		goto out;
1088 	dprintk("RPC:       xs_tcp_state_change client %p...\n", xprt);
1089 	dprintk("RPC:       state %x conn %d dead %d zapped %d\n",
1090 			sk->sk_state, xprt_connected(xprt),
1091 			sock_flag(sk, SOCK_DEAD),
1092 			sock_flag(sk, SOCK_ZAPPED));
1093 
1094 	switch (sk->sk_state) {
1095 	case TCP_ESTABLISHED:
1096 		spin_lock_bh(&xprt->transport_lock);
1097 		if (!xprt_test_and_set_connected(xprt)) {
1098 			struct sock_xprt *transport = container_of(xprt,
1099 					struct sock_xprt, xprt);
1100 
1101 			/* Reset TCP record info */
1102 			transport->tcp_offset = 0;
1103 			transport->tcp_reclen = 0;
1104 			transport->tcp_copied = 0;
1105 			transport->tcp_flags =
1106 				TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
1107 
1108 			xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
1109 			xprt_wake_pending_tasks(xprt, 0);
1110 		}
1111 		spin_unlock_bh(&xprt->transport_lock);
1112 		break;
1113 	case TCP_SYN_SENT:
1114 	case TCP_SYN_RECV:
1115 		break;
1116 	case TCP_CLOSE_WAIT:
1117 		/* Try to schedule an autoclose RPC calls */
1118 		set_bit(XPRT_CLOSE_WAIT, &xprt->state);
1119 		if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
1120 			queue_work(rpciod_workqueue, &xprt->task_cleanup);
1121 	default:
1122 		xprt_disconnect(xprt);
1123 	}
1124  out:
1125 	read_unlock(&sk->sk_callback_lock);
1126 }
1127 
1128 /**
1129  * xs_udp_write_space - callback invoked when socket buffer space
1130  *                             becomes available
1131  * @sk: socket whose state has changed
1132  *
1133  * Called when more output buffer space is available for this socket.
1134  * We try not to wake our writers until they can make "significant"
1135  * progress, otherwise we'll waste resources thrashing kernel_sendmsg
1136  * with a bunch of small requests.
1137  */
1138 static void xs_udp_write_space(struct sock *sk)
1139 {
1140 	read_lock(&sk->sk_callback_lock);
1141 
1142 	/* from net/core/sock.c:sock_def_write_space */
1143 	if (sock_writeable(sk)) {
1144 		struct socket *sock;
1145 		struct rpc_xprt *xprt;
1146 
1147 		if (unlikely(!(sock = sk->sk_socket)))
1148 			goto out;
1149 		if (unlikely(!(xprt = xprt_from_sock(sk))))
1150 			goto out;
1151 		if (unlikely(!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)))
1152 			goto out;
1153 
1154 		xprt_write_space(xprt);
1155 	}
1156 
1157  out:
1158 	read_unlock(&sk->sk_callback_lock);
1159 }
1160 
1161 /**
1162  * xs_tcp_write_space - callback invoked when socket buffer space
1163  *                             becomes available
1164  * @sk: socket whose state has changed
1165  *
1166  * Called when more output buffer space is available for this socket.
1167  * We try not to wake our writers until they can make "significant"
1168  * progress, otherwise we'll waste resources thrashing kernel_sendmsg
1169  * with a bunch of small requests.
1170  */
1171 static void xs_tcp_write_space(struct sock *sk)
1172 {
1173 	read_lock(&sk->sk_callback_lock);
1174 
1175 	/* from net/core/stream.c:sk_stream_write_space */
1176 	if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
1177 		struct socket *sock;
1178 		struct rpc_xprt *xprt;
1179 
1180 		if (unlikely(!(sock = sk->sk_socket)))
1181 			goto out;
1182 		if (unlikely(!(xprt = xprt_from_sock(sk))))
1183 			goto out;
1184 		if (unlikely(!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)))
1185 			goto out;
1186 
1187 		xprt_write_space(xprt);
1188 	}
1189 
1190  out:
1191 	read_unlock(&sk->sk_callback_lock);
1192 }
1193 
1194 static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt)
1195 {
1196 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1197 	struct sock *sk = transport->inet;
1198 
1199 	if (transport->rcvsize) {
1200 		sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
1201 		sk->sk_rcvbuf = transport->rcvsize * xprt->max_reqs * 2;
1202 	}
1203 	if (transport->sndsize) {
1204 		sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
1205 		sk->sk_sndbuf = transport->sndsize * xprt->max_reqs * 2;
1206 		sk->sk_write_space(sk);
1207 	}
1208 }
1209 
1210 /**
1211  * xs_udp_set_buffer_size - set send and receive limits
1212  * @xprt: generic transport
1213  * @sndsize: requested size of send buffer, in bytes
1214  * @rcvsize: requested size of receive buffer, in bytes
1215  *
1216  * Set socket send and receive buffer size limits.
1217  */
1218 static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize)
1219 {
1220 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1221 
1222 	transport->sndsize = 0;
1223 	if (sndsize)
1224 		transport->sndsize = sndsize + 1024;
1225 	transport->rcvsize = 0;
1226 	if (rcvsize)
1227 		transport->rcvsize = rcvsize + 1024;
1228 
1229 	xs_udp_do_set_buffer_size(xprt);
1230 }
1231 
1232 /**
1233  * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
1234  * @task: task that timed out
1235  *
1236  * Adjust the congestion window after a retransmit timeout has occurred.
1237  */
1238 static void xs_udp_timer(struct rpc_task *task)
1239 {
1240 	xprt_adjust_cwnd(task, -ETIMEDOUT);
1241 }
1242 
1243 static unsigned short xs_get_random_port(void)
1244 {
1245 	unsigned short range = xprt_max_resvport - xprt_min_resvport;
1246 	unsigned short rand = (unsigned short) net_random() % range;
1247 	return rand + xprt_min_resvport;
1248 }
1249 
1250 /**
1251  * xs_set_port - reset the port number in the remote endpoint address
1252  * @xprt: generic transport
1253  * @port: new port number
1254  *
1255  */
1256 static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
1257 {
1258 	struct sockaddr *addr = xs_addr(xprt);
1259 
1260 	dprintk("RPC:       setting port for xprt %p to %u\n", xprt, port);
1261 
1262 	switch (addr->sa_family) {
1263 	case AF_INET:
1264 		((struct sockaddr_in *)addr)->sin_port = htons(port);
1265 		break;
1266 	case AF_INET6:
1267 		((struct sockaddr_in6 *)addr)->sin6_port = htons(port);
1268 		break;
1269 	default:
1270 		BUG();
1271 	}
1272 }
1273 
1274 static int xs_bind4(struct sock_xprt *transport, struct socket *sock)
1275 {
1276 	struct sockaddr_in myaddr = {
1277 		.sin_family = AF_INET,
1278 	};
1279 	struct sockaddr_in *sa;
1280 	int err;
1281 	unsigned short port = transport->port;
1282 
1283 	if (!transport->xprt.resvport)
1284 		port = 0;
1285 	sa = (struct sockaddr_in *)&transport->addr;
1286 	myaddr.sin_addr = sa->sin_addr;
1287 	do {
1288 		myaddr.sin_port = htons(port);
1289 		err = kernel_bind(sock, (struct sockaddr *) &myaddr,
1290 						sizeof(myaddr));
1291 		if (!transport->xprt.resvport)
1292 			break;
1293 		if (err == 0) {
1294 			transport->port = port;
1295 			break;
1296 		}
1297 		if (port <= xprt_min_resvport)
1298 			port = xprt_max_resvport;
1299 		else
1300 			port--;
1301 	} while (err == -EADDRINUSE && port != transport->port);
1302 	dprintk("RPC:       %s "NIPQUAD_FMT":%u: %s (%d)\n",
1303 			__FUNCTION__, NIPQUAD(myaddr.sin_addr),
1304 			port, err ? "failed" : "ok", err);
1305 	return err;
1306 }
1307 
1308 static int xs_bind6(struct sock_xprt *transport, struct socket *sock)
1309 {
1310 	struct sockaddr_in6 myaddr = {
1311 		.sin6_family = AF_INET6,
1312 	};
1313 	struct sockaddr_in6 *sa;
1314 	int err;
1315 	unsigned short port = transport->port;
1316 
1317 	if (!transport->xprt.resvport)
1318 		port = 0;
1319 	sa = (struct sockaddr_in6 *)&transport->addr;
1320 	myaddr.sin6_addr = sa->sin6_addr;
1321 	do {
1322 		myaddr.sin6_port = htons(port);
1323 		err = kernel_bind(sock, (struct sockaddr *) &myaddr,
1324 						sizeof(myaddr));
1325 		if (!transport->xprt.resvport)
1326 			break;
1327 		if (err == 0) {
1328 			transport->port = port;
1329 			break;
1330 		}
1331 		if (port <= xprt_min_resvport)
1332 			port = xprt_max_resvport;
1333 		else
1334 			port--;
1335 	} while (err == -EADDRINUSE && port != transport->port);
1336 	dprintk("RPC:       xs_bind6 "NIP6_FMT":%u: %s (%d)\n",
1337 		NIP6(myaddr.sin6_addr), port, err ? "failed" : "ok", err);
1338 	return err;
1339 }
1340 
#ifdef CONFIG_DEBUG_LOCK_ALLOC
/*
 * Lock class keys for RPC sockets.  Index 0 is used by the AF_INET
 * helper below, index 1 by the AF_INET6 helper.
 */
static struct lock_class_key xs_key[2];
static struct lock_class_key xs_slock_key[2];

/*
 * Re-initialise the lock of an IPv4 RPC socket under RPC-specific lock
 * class names.  The socket lock must not be owned at this point.
 */
static inline void xs_reclassify_socket4(struct socket *sock)
{
	struct sock *inet = sock->sk;

	BUG_ON(inet->sk_lock.owner != NULL);
	sock_lock_init_class_and_name(inet,
		"slock-AF_INET-RPC", &xs_slock_key[0],
		"sk_lock-AF_INET-RPC", &xs_key[0]);
}

/*
 * Re-initialise the lock of an IPv6 RPC socket under RPC-specific lock
 * class names.  The socket lock must not be owned at this point.
 */
static inline void xs_reclassify_socket6(struct socket *sock)
{
	struct sock *inet = sock->sk;

	BUG_ON(inet->sk_lock.owner != NULL);
	sock_lock_init_class_and_name(inet,
		"slock-AF_INET6-RPC", &xs_slock_key[1],
		"sk_lock-AF_INET6-RPC", &xs_key[1]);
}
#else
/* Without CONFIG_DEBUG_LOCK_ALLOC the reclassify helpers do nothing. */
static inline void xs_reclassify_socket4(struct socket *sock)
{
}

static inline void xs_reclassify_socket6(struct socket *sock)
{
}
#endif
1371 
1372 static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
1373 {
1374 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1375 
1376 	if (!transport->inet) {
1377 		struct sock *sk = sock->sk;
1378 
1379 		write_lock_bh(&sk->sk_callback_lock);
1380 
1381 		sk->sk_user_data = xprt;
1382 		transport->old_data_ready = sk->sk_data_ready;
1383 		transport->old_state_change = sk->sk_state_change;
1384 		transport->old_write_space = sk->sk_write_space;
1385 		sk->sk_data_ready = xs_udp_data_ready;
1386 		sk->sk_write_space = xs_udp_write_space;
1387 		sk->sk_no_check = UDP_CSUM_NORCV;
1388 		sk->sk_allocation = GFP_ATOMIC;
1389 
1390 		xprt_set_connected(xprt);
1391 
1392 		/* Reset to new socket */
1393 		transport->sock = sock;
1394 		transport->inet = sk;
1395 
1396 		write_unlock_bh(&sk->sk_callback_lock);
1397 	}
1398 	xs_udp_do_set_buffer_size(xprt);
1399 }
1400 
1401 /**
1402  * xs_udp_connect_worker4 - set up a UDP socket
1403  * @work: RPC transport to connect
1404  *
1405  * Invoked by a work queue tasklet.
1406  */
1407 static void xs_udp_connect_worker4(struct work_struct *work)
1408 {
1409 	struct sock_xprt *transport =
1410 		container_of(work, struct sock_xprt, connect_worker.work);
1411 	struct rpc_xprt *xprt = &transport->xprt;
1412 	struct socket *sock = transport->sock;
1413 	int err, status = -EIO;
1414 
1415 	if (xprt->shutdown || !xprt_bound(xprt))
1416 		goto out;
1417 
1418 	/* Start by resetting any existing state */
1419 	xs_close(xprt);
1420 
1421 	if ((err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock)) < 0) {
1422 		dprintk("RPC:       can't create UDP transport socket (%d).\n", -err);
1423 		goto out;
1424 	}
1425 	xs_reclassify_socket4(sock);
1426 
1427 	if (xs_bind4(transport, sock)) {
1428 		sock_release(sock);
1429 		goto out;
1430 	}
1431 
1432 	dprintk("RPC:       worker connecting xprt %p to address: %s\n",
1433 			xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
1434 
1435 	xs_udp_finish_connecting(xprt, sock);
1436 	status = 0;
1437 out:
1438 	xprt_wake_pending_tasks(xprt, status);
1439 	xprt_clear_connecting(xprt);
1440 }
1441 
1442 /**
1443  * xs_udp_connect_worker6 - set up a UDP socket
1444  * @work: RPC transport to connect
1445  *
1446  * Invoked by a work queue tasklet.
1447  */
1448 static void xs_udp_connect_worker6(struct work_struct *work)
1449 {
1450 	struct sock_xprt *transport =
1451 		container_of(work, struct sock_xprt, connect_worker.work);
1452 	struct rpc_xprt *xprt = &transport->xprt;
1453 	struct socket *sock = transport->sock;
1454 	int err, status = -EIO;
1455 
1456 	if (xprt->shutdown || !xprt_bound(xprt))
1457 		goto out;
1458 
1459 	/* Start by resetting any existing state */
1460 	xs_close(xprt);
1461 
1462 	if ((err = sock_create_kern(PF_INET6, SOCK_DGRAM, IPPROTO_UDP, &sock)) < 0) {
1463 		dprintk("RPC:       can't create UDP transport socket (%d).\n", -err);
1464 		goto out;
1465 	}
1466 	xs_reclassify_socket6(sock);
1467 
1468 	if (xs_bind6(transport, sock) < 0) {
1469 		sock_release(sock);
1470 		goto out;
1471 	}
1472 
1473 	dprintk("RPC:       worker connecting xprt %p to address: %s\n",
1474 			xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
1475 
1476 	xs_udp_finish_connecting(xprt, sock);
1477 	status = 0;
1478 out:
1479 	xprt_wake_pending_tasks(xprt, status);
1480 	xprt_clear_connecting(xprt);
1481 }
1482 
1483 /*
1484  * We need to preserve the port number so the reply cache on the server can
1485  * find our cached RPC replies when we get around to reconnecting.
1486  */
1487 static void xs_tcp_reuse_connection(struct rpc_xprt *xprt)
1488 {
1489 	int result;
1490 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1491 	struct sockaddr any;
1492 
1493 	dprintk("RPC:       disconnecting xprt %p to reuse port\n", xprt);
1494 
1495 	/*
1496 	 * Disconnect the transport socket by doing a connect operation
1497 	 * with AF_UNSPEC.  This should return immediately...
1498 	 */
1499 	memset(&any, 0, sizeof(any));
1500 	any.sa_family = AF_UNSPEC;
1501 	result = kernel_connect(transport->sock, &any, sizeof(any), 0);
1502 	if (result)
1503 		dprintk("RPC:       AF_UNSPEC connect return code %d\n",
1504 				result);
1505 }
1506 
1507 static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
1508 {
1509 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1510 
1511 	if (!transport->inet) {
1512 		struct sock *sk = sock->sk;
1513 
1514 		write_lock_bh(&sk->sk_callback_lock);
1515 
1516 		sk->sk_user_data = xprt;
1517 		transport->old_data_ready = sk->sk_data_ready;
1518 		transport->old_state_change = sk->sk_state_change;
1519 		transport->old_write_space = sk->sk_write_space;
1520 		sk->sk_data_ready = xs_tcp_data_ready;
1521 		sk->sk_state_change = xs_tcp_state_change;
1522 		sk->sk_write_space = xs_tcp_write_space;
1523 		sk->sk_allocation = GFP_ATOMIC;
1524 
1525 		/* socket options */
1526 		sk->sk_userlocks |= SOCK_BINDPORT_LOCK;
1527 		sock_reset_flag(sk, SOCK_LINGER);
1528 		tcp_sk(sk)->linger2 = 0;
1529 		tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;
1530 
1531 		xprt_clear_connected(xprt);
1532 
1533 		/* Reset to new socket */
1534 		transport->sock = sock;
1535 		transport->inet = sk;
1536 
1537 		write_unlock_bh(&sk->sk_callback_lock);
1538 	}
1539 
1540 	/* Tell the socket layer to start connecting... */
1541 	xprt->stat.connect_count++;
1542 	xprt->stat.connect_start = jiffies;
1543 	return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
1544 }
1545 
/**
 * xs_tcp_connect_worker4 - connect a TCP socket to a remote endpoint
 * @work: RPC transport to connect
 *
 * Runs as the transport's delayed connect work item.  If the transport
 * has no socket, a new IPv4 TCP socket is created and bound; otherwise
 * the existing socket is disconnected so its local port can be reused.
 * The connect is then started.  While the connect is still in progress
 * (-EINPROGRESS / -EALREADY) pending tasks are left asleep; in every
 * other case they are woken with the resulting status.
 */
static void xs_tcp_connect_worker4(struct work_struct *work)
{
	struct sock_xprt *transport =
		container_of(work, struct sock_xprt, connect_worker.work);
	struct rpc_xprt *xprt = &transport->xprt;
	struct socket *sock = transport->sock;
	int err, status = -EIO;

	if (xprt->shutdown || !xprt_bound(xprt))
		goto out;

	if (sock != NULL) {
		/* "close" the socket, preserving the local port */
		xs_tcp_reuse_connection(xprt);
	} else {
		/* start from scratch */
		err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
		if (err < 0) {
			dprintk("RPC:       can't create TCP transport socket (%d).\n", -err);
			goto out;
		}
		xs_reclassify_socket4(sock);

		if (xs_bind4(transport, sock) < 0) {
			sock_release(sock);
			goto out;
		}
	}

	dprintk("RPC:       worker connecting xprt %p to address: %s\n",
			xprt, xprt->address_strings[RPC_DISPLAY_ALL]);

	status = xs_tcp_finish_connecting(xprt, sock);
	dprintk("RPC:       %p connect status %d connected %d sock state %d\n",
			xprt, -status, xprt_connected(xprt),
			sock->sk->sk_state);

	switch (status) {
	case -EINPROGRESS:
	case -EALREADY:
		/* connect still under way: do not wake anybody yet */
		goto out_clear;
	case -ECONNREFUSED:
	case -ECONNRESET:
		/* retry with existing socket, after a delay */
		break;
	default:
		/* on any other error, get rid of existing socket, and retry */
		if (status < 0)
			xs_close(xprt);
		break;
	}
out:
	xprt_wake_pending_tasks(xprt, status);
out_clear:
	xprt_clear_connecting(xprt);
}
1606 
/**
 * xs_tcp_connect_worker6 - connect a TCP socket to a remote endpoint
 * @work: RPC transport to connect
 *
 * Runs as the transport's delayed connect work item.  If the transport
 * has no socket, a new IPv6 TCP socket is created and bound; otherwise
 * the existing socket is disconnected so its local port can be reused.
 * The connect is then started.  While the connect is still in progress
 * (-EINPROGRESS / -EALREADY) pending tasks are left asleep; in every
 * other case they are woken with the resulting status.
 */
static void xs_tcp_connect_worker6(struct work_struct *work)
{
	struct sock_xprt *transport =
		container_of(work, struct sock_xprt, connect_worker.work);
	struct rpc_xprt *xprt = &transport->xprt;
	struct socket *sock = transport->sock;
	int err, status = -EIO;

	if (xprt->shutdown || !xprt_bound(xprt))
		goto out;

	if (sock != NULL) {
		/* "close" the socket, preserving the local port */
		xs_tcp_reuse_connection(xprt);
	} else {
		/* start from scratch */
		err = sock_create_kern(PF_INET6, SOCK_STREAM, IPPROTO_TCP, &sock);
		if (err < 0) {
			dprintk("RPC:       can't create TCP transport socket (%d).\n", -err);
			goto out;
		}
		xs_reclassify_socket6(sock);

		if (xs_bind6(transport, sock) < 0) {
			sock_release(sock);
			goto out;
		}
	}

	dprintk("RPC:       worker connecting xprt %p to address: %s\n",
			xprt, xprt->address_strings[RPC_DISPLAY_ALL]);

	status = xs_tcp_finish_connecting(xprt, sock);
	dprintk("RPC:       %p connect status %d connected %d sock state %d\n",
			xprt, -status, xprt_connected(xprt), sock->sk->sk_state);

	/* connect still under way: do not wake anybody yet */
	if (status == -EINPROGRESS || status == -EALREADY)
		goto out_clear;

	/*
	 * A refused or reset connection is retried with the existing
	 * socket, after a delay; any other error gets rid of the
	 * existing socket before the retry.
	 */
	if (status < 0 && status != -ECONNREFUSED && status != -ECONNRESET)
		xs_close(xprt);
out:
	xprt_wake_pending_tasks(xprt, status);
out_clear:
	xprt_clear_connecting(xprt);
}
1666 
1667 /**
1668  * xs_connect - connect a socket to a remote endpoint
1669  * @task: address of RPC task that manages state of connect request
1670  *
1671  * TCP: If the remote end dropped the connection, delay reconnecting.
1672  *
1673  * UDP socket connects are synchronous, but we use a work queue anyway
1674  * to guarantee that even unprivileged user processes can set up a
1675  * socket on a privileged port.
1676  *
1677  * If a UDP socket connect fails, the delay behavior here prevents
1678  * retry floods (hard mounts).
1679  */
1680 static void xs_connect(struct rpc_task *task)
1681 {
1682 	struct rpc_xprt *xprt = task->tk_xprt;
1683 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1684 
1685 	if (xprt_test_and_set_connecting(xprt))
1686 		return;
1687 
1688 	if (transport->sock != NULL) {
1689 		dprintk("RPC:       xs_connect delayed xprt %p for %lu "
1690 				"seconds\n",
1691 				xprt, xprt->reestablish_timeout / HZ);
1692 		queue_delayed_work(rpciod_workqueue,
1693 				   &transport->connect_worker,
1694 				   xprt->reestablish_timeout);
1695 		xprt->reestablish_timeout <<= 1;
1696 		if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO)
1697 			xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
1698 	} else {
1699 		dprintk("RPC:       xs_connect scheduled xprt %p\n", xprt);
1700 		queue_delayed_work(rpciod_workqueue,
1701 				   &transport->connect_worker, 0);
1702 	}
1703 }
1704 
1705 /**
1706  * xs_udp_print_stats - display UDP socket-specifc stats
1707  * @xprt: rpc_xprt struct containing statistics
1708  * @seq: output file
1709  *
1710  */
1711 static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
1712 {
1713 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1714 
1715 	seq_printf(seq, "\txprt:\tudp %u %lu %lu %lu %lu %Lu %Lu\n",
1716 			transport->port,
1717 			xprt->stat.bind_count,
1718 			xprt->stat.sends,
1719 			xprt->stat.recvs,
1720 			xprt->stat.bad_xids,
1721 			xprt->stat.req_u,
1722 			xprt->stat.bklog_u);
1723 }
1724 
1725 /**
1726  * xs_tcp_print_stats - display TCP socket-specifc stats
1727  * @xprt: rpc_xprt struct containing statistics
1728  * @seq: output file
1729  *
1730  */
1731 static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
1732 {
1733 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1734 	long idle_time = 0;
1735 
1736 	if (xprt_connected(xprt))
1737 		idle_time = (long)(jiffies - xprt->last_used) / HZ;
1738 
1739 	seq_printf(seq, "\txprt:\ttcp %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu\n",
1740 			transport->port,
1741 			xprt->stat.bind_count,
1742 			xprt->stat.connect_count,
1743 			xprt->stat.connect_time,
1744 			idle_time,
1745 			xprt->stat.sends,
1746 			xprt->stat.recvs,
1747 			xprt->stat.bad_xids,
1748 			xprt->stat.req_u,
1749 			xprt->stat.bklog_u);
1750 }
1751 
1752 static struct rpc_xprt_ops xs_udp_ops = {
1753 	.set_buffer_size	= xs_udp_set_buffer_size,
1754 	.reserve_xprt		= xprt_reserve_xprt_cong,
1755 	.release_xprt		= xprt_release_xprt_cong,
1756 	.rpcbind		= rpcb_getport_async,
1757 	.set_port		= xs_set_port,
1758 	.connect		= xs_connect,
1759 	.buf_alloc		= rpc_malloc,
1760 	.buf_free		= rpc_free,
1761 	.send_request		= xs_udp_send_request,
1762 	.set_retrans_timeout	= xprt_set_retrans_timeout_rtt,
1763 	.timer			= xs_udp_timer,
1764 	.release_request	= xprt_release_rqst_cong,
1765 	.close			= xs_close,
1766 	.destroy		= xs_destroy,
1767 	.print_stats		= xs_udp_print_stats,
1768 };
1769 
1770 static struct rpc_xprt_ops xs_tcp_ops = {
1771 	.reserve_xprt		= xprt_reserve_xprt,
1772 	.release_xprt		= xs_tcp_release_xprt,
1773 	.rpcbind		= rpcb_getport_async,
1774 	.set_port		= xs_set_port,
1775 	.connect		= xs_connect,
1776 	.buf_alloc		= rpc_malloc,
1777 	.buf_free		= rpc_free,
1778 	.send_request		= xs_tcp_send_request,
1779 	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
1780 	.close			= xs_close,
1781 	.destroy		= xs_destroy,
1782 	.print_stats		= xs_tcp_print_stats,
1783 };
1784 
1785 static struct rpc_xprt *xs_setup_xprt(struct rpc_xprtsock_create *args, unsigned int slot_table_size)
1786 {
1787 	struct rpc_xprt *xprt;
1788 	struct sock_xprt *new;
1789 
1790 	if (args->addrlen > sizeof(xprt->addr)) {
1791 		dprintk("RPC:       xs_setup_xprt: address too large\n");
1792 		return ERR_PTR(-EBADF);
1793 	}
1794 
1795 	new = kzalloc(sizeof(*new), GFP_KERNEL);
1796 	if (new == NULL) {
1797 		dprintk("RPC:       xs_setup_xprt: couldn't allocate "
1798 				"rpc_xprt\n");
1799 		return ERR_PTR(-ENOMEM);
1800 	}
1801 	xprt = &new->xprt;
1802 
1803 	xprt->max_reqs = slot_table_size;
1804 	xprt->slot = kcalloc(xprt->max_reqs, sizeof(struct rpc_rqst), GFP_KERNEL);
1805 	if (xprt->slot == NULL) {
1806 		kfree(xprt);
1807 		dprintk("RPC:       xs_setup_xprt: couldn't allocate slot "
1808 				"table\n");
1809 		return ERR_PTR(-ENOMEM);
1810 	}
1811 
1812 	memcpy(&xprt->addr, args->dstaddr, args->addrlen);
1813 	xprt->addrlen = args->addrlen;
1814 	if (args->srcaddr)
1815 		memcpy(&new->addr, args->srcaddr, args->addrlen);
1816 	new->port = xs_get_random_port();
1817 
1818 	return xprt;
1819 }
1820 
1821 /**
1822  * xs_setup_udp - Set up transport to use a UDP socket
1823  * @args: rpc transport creation arguments
1824  *
1825  */
1826 struct rpc_xprt *xs_setup_udp(struct rpc_xprtsock_create *args)
1827 {
1828 	struct sockaddr *addr = args->dstaddr;
1829 	struct rpc_xprt *xprt;
1830 	struct sock_xprt *transport;
1831 
1832 	xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries);
1833 	if (IS_ERR(xprt))
1834 		return xprt;
1835 	transport = container_of(xprt, struct sock_xprt, xprt);
1836 
1837 	xprt->prot = IPPROTO_UDP;
1838 	xprt->tsh_size = 0;
1839 	/* XXX: header size can vary due to auth type, IPv6, etc. */
1840 	xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
1841 
1842 	xprt->bind_timeout = XS_BIND_TO;
1843 	xprt->connect_timeout = XS_UDP_CONN_TO;
1844 	xprt->reestablish_timeout = XS_UDP_REEST_TO;
1845 	xprt->idle_timeout = XS_IDLE_DISC_TO;
1846 
1847 	xprt->ops = &xs_udp_ops;
1848 
1849 	if (args->timeout)
1850 		xprt->timeout = *args->timeout;
1851 	else
1852 		xprt_set_timeout(&xprt->timeout, 5, 5 * HZ);
1853 
1854 	switch (addr->sa_family) {
1855 	case AF_INET:
1856 		if (((struct sockaddr_in *)addr)->sin_port != htons(0))
1857 			xprt_set_bound(xprt);
1858 
1859 		INIT_DELAYED_WORK(&transport->connect_worker,
1860 					xs_udp_connect_worker4);
1861 		xs_format_ipv4_peer_addresses(xprt);
1862 		break;
1863 	case AF_INET6:
1864 		if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
1865 			xprt_set_bound(xprt);
1866 
1867 		INIT_DELAYED_WORK(&transport->connect_worker,
1868 					xs_udp_connect_worker6);
1869 		xs_format_ipv6_peer_addresses(xprt);
1870 		break;
1871 	default:
1872 		kfree(xprt);
1873 		return ERR_PTR(-EAFNOSUPPORT);
1874 	}
1875 
1876 	dprintk("RPC:       set up transport to address %s\n",
1877 			xprt->address_strings[RPC_DISPLAY_ALL]);
1878 
1879 	return xprt;
1880 }
1881 
1882 /**
1883  * xs_setup_tcp - Set up transport to use a TCP socket
1884  * @args: rpc transport creation arguments
1885  *
1886  */
1887 struct rpc_xprt *xs_setup_tcp(struct rpc_xprtsock_create *args)
1888 {
1889 	struct sockaddr *addr = args->dstaddr;
1890 	struct rpc_xprt *xprt;
1891 	struct sock_xprt *transport;
1892 
1893 	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
1894 	if (IS_ERR(xprt))
1895 		return xprt;
1896 	transport = container_of(xprt, struct sock_xprt, xprt);
1897 
1898 	xprt->prot = IPPROTO_TCP;
1899 	xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
1900 	xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
1901 
1902 	xprt->bind_timeout = XS_BIND_TO;
1903 	xprt->connect_timeout = XS_TCP_CONN_TO;
1904 	xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
1905 	xprt->idle_timeout = XS_IDLE_DISC_TO;
1906 
1907 	xprt->ops = &xs_tcp_ops;
1908 
1909 	if (args->timeout)
1910 		xprt->timeout = *args->timeout;
1911 	else
1912 		xprt_set_timeout(&xprt->timeout, 2, 60 * HZ);
1913 
1914 	switch (addr->sa_family) {
1915 	case AF_INET:
1916 		if (((struct sockaddr_in *)addr)->sin_port != htons(0))
1917 			xprt_set_bound(xprt);
1918 
1919 		INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_connect_worker4);
1920 		xs_format_ipv4_peer_addresses(xprt);
1921 		break;
1922 	case AF_INET6:
1923 		if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
1924 			xprt_set_bound(xprt);
1925 
1926 		INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_connect_worker6);
1927 		xs_format_ipv6_peer_addresses(xprt);
1928 		break;
1929 	default:
1930 		kfree(xprt);
1931 		return ERR_PTR(-EAFNOSUPPORT);
1932 	}
1933 
1934 	dprintk("RPC:       set up transport to address %s\n",
1935 			xprt->address_strings[RPC_DISPLAY_ALL]);
1936 
1937 	return xprt;
1938 }
1939 
/**
 * init_socket_xprt - set up xprtsock's sysctls
 *
 * With RPC_DEBUG enabled, registers sunrpc_table unless a table header
 * is already recorded.  Always returns 0.
 */
int init_socket_xprt(void)
{
#ifdef RPC_DEBUG
	if (sunrpc_table_header == NULL)
		sunrpc_table_header = register_sysctl_table(sunrpc_table);
#endif

	return 0;
}
1953 
/**
 * cleanup_socket_xprt - remove xprtsock's sysctls
 *
 * With RPC_DEBUG enabled, unregisters the sysctl table recorded by
 * init_socket_xprt() and clears the header so a later call is a no-op.
 */
void cleanup_socket_xprt(void)
{
#ifdef RPC_DEBUG
	if (sunrpc_table_header == NULL)
		return;

	unregister_sysctl_table(sunrpc_table_header);
	sunrpc_table_header = NULL;
#endif
}
1967