xref: /openbmc/linux/net/sunrpc/xprtsock.c (revision f4a2e418bfd03a1f25f515e8a92ecd584d96cfc1)
1 /*
2  * linux/net/sunrpc/xprtsock.c
3  *
4  * Client-side transport implementation for sockets.
5  *
6  * TCP callback races fixes (C) 1998 Red Hat
7  * TCP send fixes (C) 1998 Red Hat
8  * TCP NFS related read + write fixes
9  *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
10  *
11  * Rewrite of large parts of the code in order to stabilize TCP stuff.
12  * Fix behaviour when socket buffer is full.
13  *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
14  *
15  * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
16  *
17  * IPv6 support contributed by Gilles Quillard, Bull Open Source, 2005.
18  *   <gilles.quillard@bull.net>
19  */
20 
21 #include <linux/types.h>
22 #include <linux/slab.h>
23 #include <linux/module.h>
24 #include <linux/capability.h>
25 #include <linux/pagemap.h>
26 #include <linux/errno.h>
27 #include <linux/socket.h>
28 #include <linux/in.h>
29 #include <linux/net.h>
30 #include <linux/mm.h>
31 #include <linux/udp.h>
32 #include <linux/tcp.h>
33 #include <linux/sunrpc/clnt.h>
34 #include <linux/sunrpc/sched.h>
35 #include <linux/sunrpc/xprtsock.h>
36 #include <linux/file.h>
37 
38 #include <net/sock.h>
39 #include <net/checksum.h>
40 #include <net/udp.h>
41 #include <net/tcp.h>
42 
43 /*
44  * xprtsock tunables
45  */
46 unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
47 unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE;
48 
49 unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
50 unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;
51 
52 #define XS_TCP_LINGER_TO	(15U * HZ)
53 static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO;
54 
/*
 * The /proc/sys/sunrpc directory is shared.  Calling
 * register_sysctl_table() again adds our files to it, and the
 * directory then shows the union of every table registered there.
 *
 * The only thing to watch out for is a clash with a file name
 * that somebody else has already registered!
 */

#ifdef RPC_DEBUG

/* Lower and upper bounds passed as extra1/extra2 in the table below */
static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT;
static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT;

static struct ctl_table_header *sunrpc_table_header;

/*
 * FIXME: a change to the UDP slot table size ought to resize the
 *        socket buffers of the UDP transports that already exist
 */
static ctl_table xs_tunables_table[] = {
	{
		.ctl_name	= CTL_SLOTTABLE_UDP,
		.procname	= "udp_slot_table_entries",
		.data		= &xprt_udp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.strategy	= sysctl_intvec,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size,
	},
	{
		.ctl_name	= CTL_SLOTTABLE_TCP,
		.procname	= "tcp_slot_table_entries",
		.data		= &xprt_tcp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.strategy	= sysctl_intvec,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size,
	},
	{
		.ctl_name	= CTL_MIN_RESVPORT,
		.procname	= "min_resvport",
		.data		= &xprt_min_resvport,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.strategy	= sysctl_intvec,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit,
	},
	{
		.ctl_name	= CTL_MAX_RESVPORT,
		.procname	= "max_resvport",
		.data		= &xprt_max_resvport,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.strategy	= sysctl_intvec,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit,
	},
	{
		/* No .ctl_name here: this entry is identified by procname only */
		.procname	= "tcp_fin_timeout",
		.data		= &xs_tcp_fin_timeout,
		.maxlen		= sizeof(xs_tcp_fin_timeout),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_jiffies,
		.strategy	= sysctl_jiffies,
	},
	{ .ctl_name = 0 },
};

/* Parent "sunrpc" directory entry holding the tunables above */
static ctl_table sunrpc_table[] = {
	{
		.ctl_name	= CTL_SUNRPC,
		.procname	= "sunrpc",
		.mode		= 0555,
		.child		= xs_tunables_table,
	},
	{ .ctl_name = 0 },
};

#endif
148 
/*
 * How long to wait for an RPC UDP socket connect.  Connecting a UDP
 * socket is synchronous; the timeout is only a safety net against
 * resource exhaustion on the local host.
 */
#define XS_UDP_CONN_TO		(5U * HZ)

/*
 * How long to wait for an RPC TCP connection to be established.
 * Solaris NFS over TCP, for example, uses 60 seconds, which is in
 * line with the time a server takes to reboot.
 */
#define XS_TCP_CONN_TO		(60U * HZ)

/*
 * How long to wait for the RPC portmapper to reply.
 */
#define XS_BIND_TO		(60U * HZ)

/*
 * Delay applied after a UDP socket connect error, which most likely
 * points at some kind of resource problem on the local host.
 */
#define XS_UDP_REEST_TO		(2U * HZ)

/*
 * The reestablish timeout makes a client pause briefly before it tries
 * to reconnect to a server that has just dropped the connection.
 *
 * Reconnecting a TCP transport uses exponential backoff.  Some servers
 * drop TCP connections when they are overworked, so the first timeout
 * is short and later ones grow while the server stays down or does not
 * respond.
 */
#define XS_TCP_INIT_REEST_TO	(3U * HZ)
#define XS_TCP_MAX_REEST_TO	(5U * 60 * HZ)

/*
 * TCP idle timeout: the client drops the transport socket once it has
 * been idle for this long.  UDP sockets are timed out as well, so that
 * port numbers are not held while there is no RPC traffic.
 */
#define XS_IDLE_DISC_TO		(5U * 60 * HZ)
192 
193 #ifdef RPC_DEBUG
194 # undef  RPC_DEBUG_DATA
195 # define RPCDBG_FACILITY	RPCDBG_TRANS
196 #endif
197 
198 #ifdef RPC_DEBUG_DATA
199 static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
200 {
201 	u8 *buf = (u8 *) packet;
202 	int j;
203 
204 	dprintk("RPC:       %s\n", msg);
205 	for (j = 0; j < count && j < 128; j += 4) {
206 		if (!(j & 31)) {
207 			if (j)
208 				dprintk("\n");
209 			dprintk("0x%04x ", j);
210 		}
211 		dprintk("%02x%02x%02x%02x ",
212 			buf[j], buf[j+1], buf[j+2], buf[j+3]);
213 	}
214 	dprintk("\n");
215 }
216 #else
217 static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
218 {
219 	/* NOP */
220 }
221 #endif
222 
223 struct sock_xprt {
224 	struct rpc_xprt		xprt;
225 
226 	/*
227 	 * Network layer
228 	 */
229 	struct socket *		sock;
230 	struct sock *		inet;
231 
232 	/*
233 	 * State of TCP reply receive
234 	 */
235 	__be32			tcp_fraghdr,
236 				tcp_xid;
237 
238 	u32			tcp_offset,
239 				tcp_reclen;
240 
241 	unsigned long		tcp_copied,
242 				tcp_flags;
243 
244 	/*
245 	 * Connection of transports
246 	 */
247 	struct delayed_work	connect_worker;
248 	struct sockaddr_storage	addr;
249 	unsigned short		port;
250 
251 	/*
252 	 * UDP socket buffer size parameters
253 	 */
254 	size_t			rcvsize,
255 				sndsize;
256 
257 	/*
258 	 * Saved socket callback addresses
259 	 */
260 	void			(*old_data_ready)(struct sock *, int);
261 	void			(*old_state_change)(struct sock *);
262 	void			(*old_write_space)(struct sock *);
263 	void			(*old_error_report)(struct sock *);
264 };
265 
/*
 * TCP receive state flags (bits in sock_xprt.tcp_flags)
 */
#define TCP_RCV_LAST_FRAG	(1UL << 0)	/* record marker had the last-fragment bit */
#define TCP_RCV_COPY_FRAGHDR	(1UL << 1)	/* a record marker is being read */
#define TCP_RCV_COPY_XID	(1UL << 2)	/* the XID is being read */
#define TCP_RCV_COPY_DATA	(1UL << 3)	/* payload is copied to the request */
#define TCP_RCV_READ_CALLDIR	(1UL << 4)	/* the call direction is being read */
#define TCP_RCV_COPY_CALLDIR	(1UL << 5)	/* direction not yet stored in the buffer */

/*
 * TCP RPC flags
 */
#define TCP_RPC_REPLY		(1UL << 6)	/* call direction word was RPC_REPLY */
280 
281 static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt)
282 {
283 	return (struct sockaddr *) &xprt->addr;
284 }
285 
286 static inline struct sockaddr_in *xs_addr_in(struct rpc_xprt *xprt)
287 {
288 	return (struct sockaddr_in *) &xprt->addr;
289 }
290 
291 static inline struct sockaddr_in6 *xs_addr_in6(struct rpc_xprt *xprt)
292 {
293 	return (struct sockaddr_in6 *) &xprt->addr;
294 }
295 
296 static void xs_format_ipv4_peer_addresses(struct rpc_xprt *xprt,
297 					  const char *protocol,
298 					  const char *netid)
299 {
300 	struct sockaddr_in *addr = xs_addr_in(xprt);
301 	char *buf;
302 
303 	buf = kzalloc(20, GFP_KERNEL);
304 	if (buf) {
305 		snprintf(buf, 20, "%pI4", &addr->sin_addr.s_addr);
306 	}
307 	xprt->address_strings[RPC_DISPLAY_ADDR] = buf;
308 
309 	buf = kzalloc(8, GFP_KERNEL);
310 	if (buf) {
311 		snprintf(buf, 8, "%u",
312 				ntohs(addr->sin_port));
313 	}
314 	xprt->address_strings[RPC_DISPLAY_PORT] = buf;
315 
316 	xprt->address_strings[RPC_DISPLAY_PROTO] = protocol;
317 
318 	buf = kzalloc(48, GFP_KERNEL);
319 	if (buf) {
320 		snprintf(buf, 48, "addr=%pI4 port=%u proto=%s",
321 			&addr->sin_addr.s_addr,
322 			ntohs(addr->sin_port),
323 			protocol);
324 	}
325 	xprt->address_strings[RPC_DISPLAY_ALL] = buf;
326 
327 	buf = kzalloc(10, GFP_KERNEL);
328 	if (buf) {
329 		snprintf(buf, 10, "%02x%02x%02x%02x",
330 				NIPQUAD(addr->sin_addr.s_addr));
331 	}
332 	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = buf;
333 
334 	buf = kzalloc(8, GFP_KERNEL);
335 	if (buf) {
336 		snprintf(buf, 8, "%4hx",
337 				ntohs(addr->sin_port));
338 	}
339 	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = buf;
340 
341 	buf = kzalloc(30, GFP_KERNEL);
342 	if (buf) {
343 		snprintf(buf, 30, "%pI4.%u.%u",
344 				&addr->sin_addr.s_addr,
345 				ntohs(addr->sin_port) >> 8,
346 				ntohs(addr->sin_port) & 0xff);
347 	}
348 	xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf;
349 
350 	xprt->address_strings[RPC_DISPLAY_NETID] = netid;
351 }
352 
353 static void xs_format_ipv6_peer_addresses(struct rpc_xprt *xprt,
354 					  const char *protocol,
355 					  const char *netid)
356 {
357 	struct sockaddr_in6 *addr = xs_addr_in6(xprt);
358 	char *buf;
359 
360 	buf = kzalloc(40, GFP_KERNEL);
361 	if (buf) {
362 		snprintf(buf, 40, "%pI6",&addr->sin6_addr);
363 	}
364 	xprt->address_strings[RPC_DISPLAY_ADDR] = buf;
365 
366 	buf = kzalloc(8, GFP_KERNEL);
367 	if (buf) {
368 		snprintf(buf, 8, "%u",
369 				ntohs(addr->sin6_port));
370 	}
371 	xprt->address_strings[RPC_DISPLAY_PORT] = buf;
372 
373 	xprt->address_strings[RPC_DISPLAY_PROTO] = protocol;
374 
375 	buf = kzalloc(64, GFP_KERNEL);
376 	if (buf) {
377 		snprintf(buf, 64, "addr=%pI6 port=%u proto=%s",
378 				&addr->sin6_addr,
379 				ntohs(addr->sin6_port),
380 				protocol);
381 	}
382 	xprt->address_strings[RPC_DISPLAY_ALL] = buf;
383 
384 	buf = kzalloc(36, GFP_KERNEL);
385 	if (buf)
386 		snprintf(buf, 36, "%pi6", &addr->sin6_addr);
387 
388 	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = buf;
389 
390 	buf = kzalloc(8, GFP_KERNEL);
391 	if (buf) {
392 		snprintf(buf, 8, "%4hx",
393 				ntohs(addr->sin6_port));
394 	}
395 	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = buf;
396 
397 	buf = kzalloc(50, GFP_KERNEL);
398 	if (buf) {
399 		snprintf(buf, 50, "%pI6.%u.%u",
400 			 &addr->sin6_addr,
401 			 ntohs(addr->sin6_port) >> 8,
402 			 ntohs(addr->sin6_port) & 0xff);
403 	}
404 	xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf;
405 
406 	xprt->address_strings[RPC_DISPLAY_NETID] = netid;
407 }
408 
409 static void xs_free_peer_addresses(struct rpc_xprt *xprt)
410 {
411 	unsigned int i;
412 
413 	for (i = 0; i < RPC_DISPLAY_MAX; i++)
414 		switch (i) {
415 		case RPC_DISPLAY_PROTO:
416 		case RPC_DISPLAY_NETID:
417 			continue;
418 		default:
419 			kfree(xprt->address_strings[i]);
420 		}
421 }
422 
423 #define XS_SENDMSG_FLAGS	(MSG_DONTWAIT | MSG_NOSIGNAL)
424 
425 static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
426 {
427 	struct msghdr msg = {
428 		.msg_name	= addr,
429 		.msg_namelen	= addrlen,
430 		.msg_flags	= XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0),
431 	};
432 	struct kvec iov = {
433 		.iov_base	= vec->iov_base + base,
434 		.iov_len	= vec->iov_len - base,
435 	};
436 
437 	if (iov.iov_len != 0)
438 		return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
439 	return kernel_sendmsg(sock, &msg, NULL, 0, 0);
440 }
441 
442 static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more)
443 {
444 	struct page **ppage;
445 	unsigned int remainder;
446 	int err, sent = 0;
447 
448 	remainder = xdr->page_len - base;
449 	base += xdr->page_base;
450 	ppage = xdr->pages + (base >> PAGE_SHIFT);
451 	base &= ~PAGE_MASK;
452 	for(;;) {
453 		unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);
454 		int flags = XS_SENDMSG_FLAGS;
455 
456 		remainder -= len;
457 		if (remainder != 0 || more)
458 			flags |= MSG_MORE;
459 		err = sock->ops->sendpage(sock, *ppage, base, len, flags);
460 		if (remainder == 0 || err != len)
461 			break;
462 		sent += err;
463 		ppage++;
464 		base = 0;
465 	}
466 	if (sent == 0)
467 		return err;
468 	if (err > 0)
469 		sent += err;
470 	return sent;
471 }
472 
473 /**
474  * xs_sendpages - write pages directly to a socket
475  * @sock: socket to send on
476  * @addr: UDP only -- address of destination
477  * @addrlen: UDP only -- length of destination address
478  * @xdr: buffer containing this request
479  * @base: starting position in the buffer
480  *
481  */
482 static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base)
483 {
484 	unsigned int remainder = xdr->len - base;
485 	int err, sent = 0;
486 
487 	if (unlikely(!sock))
488 		return -ENOTSOCK;
489 
490 	clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
491 	if (base != 0) {
492 		addr = NULL;
493 		addrlen = 0;
494 	}
495 
496 	if (base < xdr->head[0].iov_len || addr != NULL) {
497 		unsigned int len = xdr->head[0].iov_len - base;
498 		remainder -= len;
499 		err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0);
500 		if (remainder == 0 || err != len)
501 			goto out;
502 		sent += err;
503 		base = 0;
504 	} else
505 		base -= xdr->head[0].iov_len;
506 
507 	if (base < xdr->page_len) {
508 		unsigned int len = xdr->page_len - base;
509 		remainder -= len;
510 		err = xs_send_pagedata(sock, xdr, base, remainder != 0);
511 		if (remainder == 0 || err != len)
512 			goto out;
513 		sent += err;
514 		base = 0;
515 	} else
516 		base -= xdr->page_len;
517 
518 	if (base >= xdr->tail[0].iov_len)
519 		return sent;
520 	err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0);
521 out:
522 	if (sent == 0)
523 		return err;
524 	if (err > 0)
525 		sent += err;
526 	return sent;
527 }
528 
529 static void xs_nospace_callback(struct rpc_task *task)
530 {
531 	struct sock_xprt *transport = container_of(task->tk_rqstp->rq_xprt, struct sock_xprt, xprt);
532 
533 	transport->inet->sk_write_pending--;
534 	clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
535 }
536 
537 /**
538  * xs_nospace - place task on wait queue if transmit was incomplete
539  * @task: task to put to sleep
540  *
541  */
542 static int xs_nospace(struct rpc_task *task)
543 {
544 	struct rpc_rqst *req = task->tk_rqstp;
545 	struct rpc_xprt *xprt = req->rq_xprt;
546 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
547 	int ret = 0;
548 
549 	dprintk("RPC: %5u xmit incomplete (%u left of %u)\n",
550 			task->tk_pid, req->rq_slen - req->rq_bytes_sent,
551 			req->rq_slen);
552 
553 	/* Protect against races with write_space */
554 	spin_lock_bh(&xprt->transport_lock);
555 
556 	/* Don't race with disconnect */
557 	if (xprt_connected(xprt)) {
558 		if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
559 			ret = -EAGAIN;
560 			/*
561 			 * Notify TCP that we're limited by the application
562 			 * window size
563 			 */
564 			set_bit(SOCK_NOSPACE, &transport->sock->flags);
565 			transport->inet->sk_write_pending++;
566 			/* ...and wait for more buffer space */
567 			xprt_wait_for_buffer_space(task, xs_nospace_callback);
568 		}
569 	} else {
570 		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
571 		ret = -ENOTCONN;
572 	}
573 
574 	spin_unlock_bh(&xprt->transport_lock);
575 	return ret;
576 }
577 
/**
 * xs_udp_send_request - write an RPC request to a UDP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occurred, the request was not sent
 */
static int xs_udp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	if (!xprt_bound(xprt))
		return -ENOTCONN;

	status = xs_sendpages(transport->sock, xs_addr(xprt), xprt->addrlen,
			      xdr, req->rq_bytes_sent);

	dprintk("RPC:       xs_udp_send_request(%u) = %d\n",
			xdr->len - req->rq_bytes_sent, status);

	if (status >= 0) {
		task->tk_bytes_sent += status;
		if (status >= req->rq_slen)
			return 0;
		/* Part of the request is still unsent; arrange for a retry. */
		status = -EAGAIN;
	}

	/* Without a socket there are no socket flags to update */
	if (transport->sock == NULL)
		return status;

	switch (status) {
	case -EAGAIN:
		status = xs_nospace(task);
		break;
	case -ENOTSOCK:
		status = -ENOTCONN;
		/* Should we call xs_close() here? */
		break;
	case -ENETUNREACH:
	case -EPIPE:
	case -ECONNREFUSED:
		/* When the server has died, an ICMP port unreachable message
		 * prompts ECONNREFUSED. */
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
	}

	return status;
}
642 
643 /**
644  * xs_tcp_shutdown - gracefully shut down a TCP socket
645  * @xprt: transport
646  *
647  * Initiates a graceful shutdown of the TCP socket by calling the
648  * equivalent of shutdown(SHUT_WR);
649  */
650 static void xs_tcp_shutdown(struct rpc_xprt *xprt)
651 {
652 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
653 	struct socket *sock = transport->sock;
654 
655 	if (sock != NULL)
656 		kernel_sock_shutdown(sock, SHUT_WR);
657 }
658 
659 static inline void xs_encode_tcp_record_marker(struct xdr_buf *buf)
660 {
661 	u32 reclen = buf->len - sizeof(rpc_fraghdr);
662 	rpc_fraghdr *base = buf->head[0].iov_base;
663 	*base = htonl(RPC_LAST_STREAM_FRAGMENT | reclen);
664 }
665 
/**
 * xs_tcp_send_request - write an RPC request to a TCP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occurred, the request was not sent
 *
 * XXX: In the case of soft timeouts, should we eventually give up
 *	if sendmsg is not able to make progress?
 */
static int xs_tcp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_encode_tcp_record_marker(&req->rq_snd_buf);

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	/* Keep pushing the packet/record out.  Be careful: a writespace
	 * callback may arrive _after_ sendmsg() has been called. */
	for (;;) {
		status = xs_sendpages(transport->sock,
					NULL, 0, xdr, req->rq_bytes_sent);

		dprintk("RPC:       xs_tcp_send_request(%u) = %d\n",
				xdr->len - req->rq_bytes_sent, status);

		if (unlikely(status < 0))
			break;

		/* Once the whole packet is out, reset the count of
		 * bytes sent right away. */
		req->rq_bytes_sent += status;
		task->tk_bytes_sent += status;
		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
			req->rq_bytes_sent = 0;
			return 0;
		}

		/* No progress at all: treat the socket as blocked */
		if (status == 0) {
			status = -EAGAIN;
			break;
		}
	}

	/* Without a socket there are no socket flags to update */
	if (transport->sock == NULL)
		return status;

	switch (status) {
	case -EAGAIN:
		status = xs_nospace(task);
		break;
	case -ENOTSOCK:
		status = -ENOTCONN;
		/* Should we call xs_close() here? */
		break;
	case -ECONNREFUSED:
	case -ENOTCONN:
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
		break;
	case -ECONNRESET:
	case -EPIPE:
		xs_tcp_shutdown(xprt);
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
		xs_tcp_shutdown(xprt);
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
	}

	return status;
}
745 
746 /**
747  * xs_tcp_release_xprt - clean up after a tcp transmission
748  * @xprt: transport
749  * @task: rpc task
750  *
751  * This cleans up if an error causes us to abort the transmission of a request.
752  * In this case, the socket may need to be reset in order to avoid confusing
753  * the server.
754  */
755 static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
756 {
757 	struct rpc_rqst *req;
758 
759 	if (task != xprt->snd_task)
760 		return;
761 	if (task == NULL)
762 		goto out_release;
763 	req = task->tk_rqstp;
764 	if (req->rq_bytes_sent == 0)
765 		goto out_release;
766 	if (req->rq_bytes_sent == req->rq_snd_buf.len)
767 		goto out_release;
768 	set_bit(XPRT_CLOSE_WAIT, &task->tk_xprt->state);
769 out_release:
770 	xprt_release_xprt(xprt, task);
771 }
772 
773 static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
774 {
775 	transport->old_data_ready = sk->sk_data_ready;
776 	transport->old_state_change = sk->sk_state_change;
777 	transport->old_write_space = sk->sk_write_space;
778 	transport->old_error_report = sk->sk_error_report;
779 }
780 
781 static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk)
782 {
783 	sk->sk_data_ready = transport->old_data_ready;
784 	sk->sk_state_change = transport->old_state_change;
785 	sk->sk_write_space = transport->old_write_space;
786 	sk->sk_error_report = transport->old_error_report;
787 }
788 
789 static void xs_reset_transport(struct sock_xprt *transport)
790 {
791 	struct socket *sock = transport->sock;
792 	struct sock *sk = transport->inet;
793 
794 	if (sk == NULL)
795 		return;
796 
797 	write_lock_bh(&sk->sk_callback_lock);
798 	transport->inet = NULL;
799 	transport->sock = NULL;
800 
801 	sk->sk_user_data = NULL;
802 
803 	xs_restore_old_callbacks(transport, sk);
804 	write_unlock_bh(&sk->sk_callback_lock);
805 
806 	sk->sk_no_check = 0;
807 
808 	sock_release(sock);
809 }
810 
811 /**
812  * xs_close - close a socket
813  * @xprt: transport
814  *
815  * This is used when all requests are complete; ie, no DRC state remains
816  * on the server we want to save.
817  *
818  * The caller _must_ be holding XPRT_LOCKED in order to avoid issues with
819  * xs_reset_transport() zeroing the socket from underneath a writer.
820  */
821 static void xs_close(struct rpc_xprt *xprt)
822 {
823 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
824 
825 	dprintk("RPC:       xs_close xprt %p\n", xprt);
826 
827 	xs_reset_transport(transport);
828 
829 	smp_mb__before_clear_bit();
830 	clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
831 	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
832 	clear_bit(XPRT_CLOSING, &xprt->state);
833 	smp_mb__after_clear_bit();
834 	xprt_disconnect_done(xprt);
835 }
836 
837 static void xs_tcp_close(struct rpc_xprt *xprt)
838 {
839 	if (test_and_clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state))
840 		xs_close(xprt);
841 	else
842 		xs_tcp_shutdown(xprt);
843 }
844 
845 /**
846  * xs_destroy - prepare to shutdown a transport
847  * @xprt: doomed transport
848  *
849  */
850 static void xs_destroy(struct rpc_xprt *xprt)
851 {
852 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
853 
854 	dprintk("RPC:       xs_destroy xprt %p\n", xprt);
855 
856 	cancel_rearming_delayed_work(&transport->connect_worker);
857 
858 	xs_close(xprt);
859 	xs_free_peer_addresses(xprt);
860 	kfree(xprt->slot);
861 	kfree(xprt);
862 	module_put(THIS_MODULE);
863 }
864 
865 static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
866 {
867 	return (struct rpc_xprt *) sk->sk_user_data;
868 }
869 
870 /**
871  * xs_udp_data_ready - "data ready" callback for UDP sockets
872  * @sk: socket with data to read
873  * @len: how much data to read
874  *
875  */
876 static void xs_udp_data_ready(struct sock *sk, int len)
877 {
878 	struct rpc_task *task;
879 	struct rpc_xprt *xprt;
880 	struct rpc_rqst *rovr;
881 	struct sk_buff *skb;
882 	int err, repsize, copied;
883 	u32 _xid;
884 	__be32 *xp;
885 
886 	read_lock(&sk->sk_callback_lock);
887 	dprintk("RPC:       xs_udp_data_ready...\n");
888 	if (!(xprt = xprt_from_sock(sk)))
889 		goto out;
890 
891 	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
892 		goto out;
893 
894 	if (xprt->shutdown)
895 		goto dropit;
896 
897 	repsize = skb->len - sizeof(struct udphdr);
898 	if (repsize < 4) {
899 		dprintk("RPC:       impossible RPC reply size %d!\n", repsize);
900 		goto dropit;
901 	}
902 
903 	/* Copy the XID from the skb... */
904 	xp = skb_header_pointer(skb, sizeof(struct udphdr),
905 				sizeof(_xid), &_xid);
906 	if (xp == NULL)
907 		goto dropit;
908 
909 	/* Look up and lock the request corresponding to the given XID */
910 	spin_lock(&xprt->transport_lock);
911 	rovr = xprt_lookup_rqst(xprt, *xp);
912 	if (!rovr)
913 		goto out_unlock;
914 	task = rovr->rq_task;
915 
916 	if ((copied = rovr->rq_private_buf.buflen) > repsize)
917 		copied = repsize;
918 
919 	/* Suck it into the iovec, verify checksum if not done by hw. */
920 	if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
921 		UDPX_INC_STATS_BH(sk, UDP_MIB_INERRORS);
922 		goto out_unlock;
923 	}
924 
925 	UDPX_INC_STATS_BH(sk, UDP_MIB_INDATAGRAMS);
926 
927 	/* Something worked... */
928 	dst_confirm(skb->dst);
929 
930 	xprt_adjust_cwnd(task, copied);
931 	xprt_update_rtt(task);
932 	xprt_complete_rqst(task, copied);
933 
934  out_unlock:
935 	spin_unlock(&xprt->transport_lock);
936  dropit:
937 	skb_free_datagram(sk, skb);
938  out:
939 	read_unlock(&sk->sk_callback_lock);
940 }
941 
942 static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
943 {
944 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
945 	size_t len, used;
946 	char *p;
947 
948 	p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset;
949 	len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset;
950 	used = xdr_skb_read_bits(desc, p, len);
951 	transport->tcp_offset += used;
952 	if (used != len)
953 		return;
954 
955 	transport->tcp_reclen = ntohl(transport->tcp_fraghdr);
956 	if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
957 		transport->tcp_flags |= TCP_RCV_LAST_FRAG;
958 	else
959 		transport->tcp_flags &= ~TCP_RCV_LAST_FRAG;
960 	transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;
961 
962 	transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR;
963 	transport->tcp_offset = 0;
964 
965 	/* Sanity check of the record length */
966 	if (unlikely(transport->tcp_reclen < 8)) {
967 		dprintk("RPC:       invalid TCP record fragment length\n");
968 		xprt_force_disconnect(xprt);
969 		return;
970 	}
971 	dprintk("RPC:       reading TCP record fragment of length %d\n",
972 			transport->tcp_reclen);
973 }
974 
975 static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
976 {
977 	if (transport->tcp_offset == transport->tcp_reclen) {
978 		transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR;
979 		transport->tcp_offset = 0;
980 		if (transport->tcp_flags & TCP_RCV_LAST_FRAG) {
981 			transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
982 			transport->tcp_flags |= TCP_RCV_COPY_XID;
983 			transport->tcp_copied = 0;
984 		}
985 	}
986 }
987 
988 static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc)
989 {
990 	size_t len, used;
991 	char *p;
992 
993 	len = sizeof(transport->tcp_xid) - transport->tcp_offset;
994 	dprintk("RPC:       reading XID (%Zu bytes)\n", len);
995 	p = ((char *) &transport->tcp_xid) + transport->tcp_offset;
996 	used = xdr_skb_read_bits(desc, p, len);
997 	transport->tcp_offset += used;
998 	if (used != len)
999 		return;
1000 	transport->tcp_flags &= ~TCP_RCV_COPY_XID;
1001 	transport->tcp_flags |= TCP_RCV_READ_CALLDIR;
1002 	transport->tcp_copied = 4;
1003 	dprintk("RPC:       reading %s XID %08x\n",
1004 			(transport->tcp_flags & TCP_RPC_REPLY) ? "reply for"
1005 							      : "request with",
1006 			ntohl(transport->tcp_xid));
1007 	xs_tcp_check_fraghdr(transport);
1008 }
1009 
1010 static inline void xs_tcp_read_calldir(struct sock_xprt *transport,
1011 				       struct xdr_skb_reader *desc)
1012 {
1013 	size_t len, used;
1014 	u32 offset;
1015 	__be32	calldir;
1016 
1017 	/*
1018 	 * We want transport->tcp_offset to be 8 at the end of this routine
1019 	 * (4 bytes for the xid and 4 bytes for the call/reply flag).
1020 	 * When this function is called for the first time,
1021 	 * transport->tcp_offset is 4 (after having already read the xid).
1022 	 */
1023 	offset = transport->tcp_offset - sizeof(transport->tcp_xid);
1024 	len = sizeof(calldir) - offset;
1025 	dprintk("RPC:       reading CALL/REPLY flag (%Zu bytes)\n", len);
1026 	used = xdr_skb_read_bits(desc, &calldir, len);
1027 	transport->tcp_offset += used;
1028 	if (used != len)
1029 		return;
1030 	transport->tcp_flags &= ~TCP_RCV_READ_CALLDIR;
1031 	transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
1032 	transport->tcp_flags |= TCP_RCV_COPY_DATA;
1033 	/*
1034 	 * We don't yet have the XDR buffer, so we will write the calldir
1035 	 * out after we get the buffer from the 'struct rpc_rqst'
1036 	 */
1037 	if (ntohl(calldir) == RPC_REPLY)
1038 		transport->tcp_flags |= TCP_RPC_REPLY;
1039 	else
1040 		transport->tcp_flags &= ~TCP_RPC_REPLY;
1041 	dprintk("RPC:       reading %s CALL/REPLY flag %08x\n",
1042 			(transport->tcp_flags & TCP_RPC_REPLY) ?
1043 				"reply for" : "request with", calldir);
1044 	xs_tcp_check_fraghdr(transport);
1045 }
1046 
1047 static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
1048 {
1049 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1050 	struct rpc_rqst *req;
1051 	struct xdr_buf *rcvbuf;
1052 	size_t len;
1053 	ssize_t r;
1054 
1055 	/* Find and lock the request corresponding to this xid */
1056 	spin_lock(&xprt->transport_lock);
1057 	req = xprt_lookup_rqst(xprt, transport->tcp_xid);
1058 	if (!req) {
1059 		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1060 		dprintk("RPC:       XID %08x request not found!\n",
1061 				ntohl(transport->tcp_xid));
1062 		spin_unlock(&xprt->transport_lock);
1063 		return;
1064 	}
1065 
1066 	rcvbuf = &req->rq_private_buf;
1067 
1068 	if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) {
1069 		/*
1070 		 * Save the RPC direction in the XDR buffer
1071 		 */
1072 		__be32	calldir = transport->tcp_flags & TCP_RPC_REPLY ?
1073 					htonl(RPC_REPLY) : 0;
1074 
1075 		memcpy(rcvbuf->head[0].iov_base + transport->tcp_copied,
1076 			&calldir, sizeof(calldir));
1077 		transport->tcp_copied += sizeof(calldir);
1078 		transport->tcp_flags &= ~TCP_RCV_COPY_CALLDIR;
1079 	}
1080 
1081 	len = desc->count;
1082 	if (len > transport->tcp_reclen - transport->tcp_offset) {
1083 		struct xdr_skb_reader my_desc;
1084 
1085 		len = transport->tcp_reclen - transport->tcp_offset;
1086 		memcpy(&my_desc, desc, sizeof(my_desc));
1087 		my_desc.count = len;
1088 		r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
1089 					  &my_desc, xdr_skb_read_bits);
1090 		desc->count -= r;
1091 		desc->offset += r;
1092 	} else
1093 		r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
1094 					  desc, xdr_skb_read_bits);
1095 
1096 	if (r > 0) {
1097 		transport->tcp_copied += r;
1098 		transport->tcp_offset += r;
1099 	}
1100 	if (r != len) {
1101 		/* Error when copying to the receive buffer,
1102 		 * usually because we weren't able to allocate
1103 		 * additional buffer pages. All we can do now
1104 		 * is turn off TCP_RCV_COPY_DATA, so the request
1105 		 * will not receive any additional updates,
1106 		 * and time out.
1107 		 * Any remaining data from this record will
1108 		 * be discarded.
1109 		 */
1110 		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1111 		dprintk("RPC:       XID %08x truncated request\n",
1112 				ntohl(transport->tcp_xid));
1113 		dprintk("RPC:       xprt = %p, tcp_copied = %lu, "
1114 				"tcp_offset = %u, tcp_reclen = %u\n",
1115 				xprt, transport->tcp_copied,
1116 				transport->tcp_offset, transport->tcp_reclen);
1117 		goto out;
1118 	}
1119 
1120 	dprintk("RPC:       XID %08x read %Zd bytes\n",
1121 			ntohl(transport->tcp_xid), r);
1122 	dprintk("RPC:       xprt = %p, tcp_copied = %lu, tcp_offset = %u, "
1123 			"tcp_reclen = %u\n", xprt, transport->tcp_copied,
1124 			transport->tcp_offset, transport->tcp_reclen);
1125 
1126 	if (transport->tcp_copied == req->rq_private_buf.buflen)
1127 		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1128 	else if (transport->tcp_offset == transport->tcp_reclen) {
1129 		if (transport->tcp_flags & TCP_RCV_LAST_FRAG)
1130 			transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1131 	}
1132 
1133 out:
1134 	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
1135 		xprt_complete_rqst(req->rq_task, transport->tcp_copied);
1136 	spin_unlock(&xprt->transport_lock);
1137 	xs_tcp_check_fraghdr(transport);
1138 }
1139 
1140 static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc)
1141 {
1142 	size_t len;
1143 
1144 	len = transport->tcp_reclen - transport->tcp_offset;
1145 	if (len > desc->count)
1146 		len = desc->count;
1147 	desc->count -= len;
1148 	desc->offset += len;
1149 	transport->tcp_offset += len;
1150 	dprintk("RPC:       discarded %Zu bytes\n", len);
1151 	xs_tcp_check_fraghdr(transport);
1152 }
1153 
1154 static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
1155 {
1156 	struct rpc_xprt *xprt = rd_desc->arg.data;
1157 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1158 	struct xdr_skb_reader desc = {
1159 		.skb	= skb,
1160 		.offset	= offset,
1161 		.count	= len,
1162 	};
1163 
1164 	dprintk("RPC:       xs_tcp_data_recv started\n");
1165 	do {
1166 		/* Read in a new fragment marker if necessary */
1167 		/* Can we ever really expect to get completely empty fragments? */
1168 		if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) {
1169 			xs_tcp_read_fraghdr(xprt, &desc);
1170 			continue;
1171 		}
1172 		/* Read in the xid if necessary */
1173 		if (transport->tcp_flags & TCP_RCV_COPY_XID) {
1174 			xs_tcp_read_xid(transport, &desc);
1175 			continue;
1176 		}
1177 		/* Read in the call/reply flag */
1178 		if (transport->tcp_flags & TCP_RCV_READ_CALLDIR) {
1179 			xs_tcp_read_calldir(transport, &desc);
1180 			continue;
1181 		}
1182 		/* Read in the request data */
1183 		if (transport->tcp_flags & TCP_RCV_COPY_DATA) {
1184 			xs_tcp_read_request(xprt, &desc);
1185 			continue;
1186 		}
1187 		/* Skip over any trailing bytes on short reads */
1188 		xs_tcp_read_discard(transport, &desc);
1189 	} while (desc.count);
1190 	dprintk("RPC:       xs_tcp_data_recv done\n");
1191 	return len - desc.count;
1192 }
1193 
1194 /**
1195  * xs_tcp_data_ready - "data ready" callback for TCP sockets
1196  * @sk: socket with data to read
1197  * @bytes: how much data to read
1198  *
1199  */
1200 static void xs_tcp_data_ready(struct sock *sk, int bytes)
1201 {
1202 	struct rpc_xprt *xprt;
1203 	read_descriptor_t rd_desc;
1204 	int read;
1205 
1206 	dprintk("RPC:       xs_tcp_data_ready...\n");
1207 
1208 	read_lock(&sk->sk_callback_lock);
1209 	if (!(xprt = xprt_from_sock(sk)))
1210 		goto out;
1211 	if (xprt->shutdown)
1212 		goto out;
1213 
1214 	/* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
1215 	rd_desc.arg.data = xprt;
1216 	do {
1217 		rd_desc.count = 65536;
1218 		read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
1219 	} while (read > 0);
1220 out:
1221 	read_unlock(&sk->sk_callback_lock);
1222 }
1223 
1224 /*
1225  * Do the equivalent of linger/linger2 handling for dealing with
1226  * broken servers that don't close the socket in a timely
1227  * fashion
1228  */
1229 static void xs_tcp_schedule_linger_timeout(struct rpc_xprt *xprt,
1230 		unsigned long timeout)
1231 {
1232 	struct sock_xprt *transport;
1233 
1234 	if (xprt_test_and_set_connecting(xprt))
1235 		return;
1236 	set_bit(XPRT_CONNECTION_ABORT, &xprt->state);
1237 	transport = container_of(xprt, struct sock_xprt, xprt);
1238 	queue_delayed_work(rpciod_workqueue, &transport->connect_worker,
1239 			   timeout);
1240 }
1241 
1242 static void xs_tcp_cancel_linger_timeout(struct rpc_xprt *xprt)
1243 {
1244 	struct sock_xprt *transport;
1245 
1246 	transport = container_of(xprt, struct sock_xprt, xprt);
1247 
1248 	if (!test_bit(XPRT_CONNECTION_ABORT, &xprt->state) ||
1249 	    !cancel_delayed_work(&transport->connect_worker))
1250 		return;
1251 	clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
1252 	xprt_clear_connecting(xprt);
1253 }
1254 
1255 static void xs_sock_mark_closed(struct rpc_xprt *xprt)
1256 {
1257 	smp_mb__before_clear_bit();
1258 	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
1259 	clear_bit(XPRT_CLOSING, &xprt->state);
1260 	smp_mb__after_clear_bit();
1261 	/* Mark transport as closed and wake up all pending tasks */
1262 	xprt_disconnect_done(xprt);
1263 }
1264 
1265 /**
1266  * xs_tcp_state_change - callback to handle TCP socket state changes
1267  * @sk: socket whose state has changed
1268  *
1269  */
1270 static void xs_tcp_state_change(struct sock *sk)
1271 {
1272 	struct rpc_xprt *xprt;
1273 
1274 	read_lock(&sk->sk_callback_lock);
1275 	if (!(xprt = xprt_from_sock(sk)))
1276 		goto out;
1277 	dprintk("RPC:       xs_tcp_state_change client %p...\n", xprt);
1278 	dprintk("RPC:       state %x conn %d dead %d zapped %d\n",
1279 			sk->sk_state, xprt_connected(xprt),
1280 			sock_flag(sk, SOCK_DEAD),
1281 			sock_flag(sk, SOCK_ZAPPED));
1282 
1283 	switch (sk->sk_state) {
1284 	case TCP_ESTABLISHED:
1285 		spin_lock_bh(&xprt->transport_lock);
1286 		if (!xprt_test_and_set_connected(xprt)) {
1287 			struct sock_xprt *transport = container_of(xprt,
1288 					struct sock_xprt, xprt);
1289 
1290 			/* Reset TCP record info */
1291 			transport->tcp_offset = 0;
1292 			transport->tcp_reclen = 0;
1293 			transport->tcp_copied = 0;
1294 			transport->tcp_flags =
1295 				TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
1296 
1297 			xprt_wake_pending_tasks(xprt, -EAGAIN);
1298 		}
1299 		spin_unlock_bh(&xprt->transport_lock);
1300 		break;
1301 	case TCP_FIN_WAIT1:
1302 		/* The client initiated a shutdown of the socket */
1303 		xprt->connect_cookie++;
1304 		xprt->reestablish_timeout = 0;
1305 		set_bit(XPRT_CLOSING, &xprt->state);
1306 		smp_mb__before_clear_bit();
1307 		clear_bit(XPRT_CONNECTED, &xprt->state);
1308 		clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
1309 		smp_mb__after_clear_bit();
1310 		xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
1311 		break;
1312 	case TCP_CLOSE_WAIT:
1313 		/* The server initiated a shutdown of the socket */
1314 		xprt_force_disconnect(xprt);
1315 	case TCP_SYN_SENT:
1316 		xprt->connect_cookie++;
1317 	case TCP_CLOSING:
1318 		/*
1319 		 * If the server closed down the connection, make sure that
1320 		 * we back off before reconnecting
1321 		 */
1322 		if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
1323 			xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
1324 		break;
1325 	case TCP_LAST_ACK:
1326 		set_bit(XPRT_CLOSING, &xprt->state);
1327 		xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
1328 		smp_mb__before_clear_bit();
1329 		clear_bit(XPRT_CONNECTED, &xprt->state);
1330 		smp_mb__after_clear_bit();
1331 		break;
1332 	case TCP_CLOSE:
1333 		xs_tcp_cancel_linger_timeout(xprt);
1334 		xs_sock_mark_closed(xprt);
1335 	}
1336  out:
1337 	read_unlock(&sk->sk_callback_lock);
1338 }
1339 
1340 /**
1341  * xs_error_report - callback mainly for catching socket errors
1342  * @sk: socket
1343  */
1344 static void xs_error_report(struct sock *sk)
1345 {
1346 	struct rpc_xprt *xprt;
1347 
1348 	read_lock(&sk->sk_callback_lock);
1349 	if (!(xprt = xprt_from_sock(sk)))
1350 		goto out;
1351 	dprintk("RPC:       %s client %p...\n"
1352 			"RPC:       error %d\n",
1353 			__func__, xprt, sk->sk_err);
1354 	xprt_wake_pending_tasks(xprt, -EAGAIN);
1355 out:
1356 	read_unlock(&sk->sk_callback_lock);
1357 }
1358 
1359 static void xs_write_space(struct sock *sk)
1360 {
1361 	struct socket *sock;
1362 	struct rpc_xprt *xprt;
1363 
1364 	if (unlikely(!(sock = sk->sk_socket)))
1365 		return;
1366 	clear_bit(SOCK_NOSPACE, &sock->flags);
1367 
1368 	if (unlikely(!(xprt = xprt_from_sock(sk))))
1369 		return;
1370 	if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0)
1371 		return;
1372 
1373 	xprt_write_space(xprt);
1374 }
1375 
1376 /**
1377  * xs_udp_write_space - callback invoked when socket buffer space
1378  *                             becomes available
1379  * @sk: socket whose state has changed
1380  *
1381  * Called when more output buffer space is available for this socket.
1382  * We try not to wake our writers until they can make "significant"
1383  * progress, otherwise we'll waste resources thrashing kernel_sendmsg
1384  * with a bunch of small requests.
1385  */
1386 static void xs_udp_write_space(struct sock *sk)
1387 {
1388 	read_lock(&sk->sk_callback_lock);
1389 
1390 	/* from net/core/sock.c:sock_def_write_space */
1391 	if (sock_writeable(sk))
1392 		xs_write_space(sk);
1393 
1394 	read_unlock(&sk->sk_callback_lock);
1395 }
1396 
1397 /**
1398  * xs_tcp_write_space - callback invoked when socket buffer space
1399  *                             becomes available
1400  * @sk: socket whose state has changed
1401  *
1402  * Called when more output buffer space is available for this socket.
1403  * We try not to wake our writers until they can make "significant"
1404  * progress, otherwise we'll waste resources thrashing kernel_sendmsg
1405  * with a bunch of small requests.
1406  */
1407 static void xs_tcp_write_space(struct sock *sk)
1408 {
1409 	read_lock(&sk->sk_callback_lock);
1410 
1411 	/* from net/core/stream.c:sk_stream_write_space */
1412 	if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk))
1413 		xs_write_space(sk);
1414 
1415 	read_unlock(&sk->sk_callback_lock);
1416 }
1417 
1418 static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt)
1419 {
1420 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1421 	struct sock *sk = transport->inet;
1422 
1423 	if (transport->rcvsize) {
1424 		sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
1425 		sk->sk_rcvbuf = transport->rcvsize * xprt->max_reqs * 2;
1426 	}
1427 	if (transport->sndsize) {
1428 		sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
1429 		sk->sk_sndbuf = transport->sndsize * xprt->max_reqs * 2;
1430 		sk->sk_write_space(sk);
1431 	}
1432 }
1433 
1434 /**
1435  * xs_udp_set_buffer_size - set send and receive limits
1436  * @xprt: generic transport
1437  * @sndsize: requested size of send buffer, in bytes
1438  * @rcvsize: requested size of receive buffer, in bytes
1439  *
1440  * Set socket send and receive buffer size limits.
1441  */
1442 static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize)
1443 {
1444 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1445 
1446 	transport->sndsize = 0;
1447 	if (sndsize)
1448 		transport->sndsize = sndsize + 1024;
1449 	transport->rcvsize = 0;
1450 	if (rcvsize)
1451 		transport->rcvsize = rcvsize + 1024;
1452 
1453 	xs_udp_do_set_buffer_size(xprt);
1454 }
1455 
1456 /**
1457  * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
1458  * @task: task that timed out
1459  *
1460  * Adjust the congestion window after a retransmit timeout has occurred.
1461  */
1462 static void xs_udp_timer(struct rpc_task *task)
1463 {
1464 	xprt_adjust_cwnd(task, -ETIMEDOUT);
1465 }
1466 
1467 static unsigned short xs_get_random_port(void)
1468 {
1469 	unsigned short range = xprt_max_resvport - xprt_min_resvport;
1470 	unsigned short rand = (unsigned short) net_random() % range;
1471 	return rand + xprt_min_resvport;
1472 }
1473 
1474 /**
1475  * xs_set_port - reset the port number in the remote endpoint address
1476  * @xprt: generic transport
1477  * @port: new port number
1478  *
1479  */
1480 static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
1481 {
1482 	struct sockaddr *addr = xs_addr(xprt);
1483 
1484 	dprintk("RPC:       setting port for xprt %p to %u\n", xprt, port);
1485 
1486 	switch (addr->sa_family) {
1487 	case AF_INET:
1488 		((struct sockaddr_in *)addr)->sin_port = htons(port);
1489 		break;
1490 	case AF_INET6:
1491 		((struct sockaddr_in6 *)addr)->sin6_port = htons(port);
1492 		break;
1493 	default:
1494 		BUG();
1495 	}
1496 }
1497 
1498 static unsigned short xs_get_srcport(struct sock_xprt *transport, struct socket *sock)
1499 {
1500 	unsigned short port = transport->port;
1501 
1502 	if (port == 0 && transport->xprt.resvport)
1503 		port = xs_get_random_port();
1504 	return port;
1505 }
1506 
1507 static unsigned short xs_next_srcport(struct sock_xprt *transport, struct socket *sock, unsigned short port)
1508 {
1509 	if (transport->port != 0)
1510 		transport->port = 0;
1511 	if (!transport->xprt.resvport)
1512 		return 0;
1513 	if (port <= xprt_min_resvport || port > xprt_max_resvport)
1514 		return xprt_max_resvport;
1515 	return --port;
1516 }
1517 
1518 static int xs_bind4(struct sock_xprt *transport, struct socket *sock)
1519 {
1520 	struct sockaddr_in myaddr = {
1521 		.sin_family = AF_INET,
1522 	};
1523 	struct sockaddr_in *sa;
1524 	int err, nloop = 0;
1525 	unsigned short port = xs_get_srcport(transport, sock);
1526 	unsigned short last;
1527 
1528 	sa = (struct sockaddr_in *)&transport->addr;
1529 	myaddr.sin_addr = sa->sin_addr;
1530 	do {
1531 		myaddr.sin_port = htons(port);
1532 		err = kernel_bind(sock, (struct sockaddr *) &myaddr,
1533 						sizeof(myaddr));
1534 		if (port == 0)
1535 			break;
1536 		if (err == 0) {
1537 			transport->port = port;
1538 			break;
1539 		}
1540 		last = port;
1541 		port = xs_next_srcport(transport, sock, port);
1542 		if (port > last)
1543 			nloop++;
1544 	} while (err == -EADDRINUSE && nloop != 2);
1545 	dprintk("RPC:       %s %pI4:%u: %s (%d)\n",
1546 			__func__, &myaddr.sin_addr,
1547 			port, err ? "failed" : "ok", err);
1548 	return err;
1549 }
1550 
1551 static int xs_bind6(struct sock_xprt *transport, struct socket *sock)
1552 {
1553 	struct sockaddr_in6 myaddr = {
1554 		.sin6_family = AF_INET6,
1555 	};
1556 	struct sockaddr_in6 *sa;
1557 	int err, nloop = 0;
1558 	unsigned short port = xs_get_srcport(transport, sock);
1559 	unsigned short last;
1560 
1561 	sa = (struct sockaddr_in6 *)&transport->addr;
1562 	myaddr.sin6_addr = sa->sin6_addr;
1563 	do {
1564 		myaddr.sin6_port = htons(port);
1565 		err = kernel_bind(sock, (struct sockaddr *) &myaddr,
1566 						sizeof(myaddr));
1567 		if (port == 0)
1568 			break;
1569 		if (err == 0) {
1570 			transport->port = port;
1571 			break;
1572 		}
1573 		last = port;
1574 		port = xs_next_srcport(transport, sock, port);
1575 		if (port > last)
1576 			nloop++;
1577 	} while (err == -EADDRINUSE && nloop != 2);
1578 	dprintk("RPC:       xs_bind6 %pI6:%u: %s (%d)\n",
1579 		&myaddr.sin6_addr, port, err ? "failed" : "ok", err);
1580 	return err;
1581 }
1582 
#ifdef CONFIG_DEBUG_LOCK_ALLOC
/*
 * Lock classes for RPC sockets: index 0 is used for AF_INET sockets,
 * index 1 for AF_INET6 sockets.
 */
static struct lock_class_key xs_key[2];
static struct lock_class_key xs_slock_key[2];

/* Re-initialise the locks of an IPv4 socket with the RPC lock class */
static inline void xs_reclassify_socket4(struct socket *sock)
{
	struct sock *inet = sock->sk;

	BUG_ON(sock_owned_by_user(inet));
	sock_lock_init_class_and_name(inet, "slock-AF_INET-RPC",
		&xs_slock_key[0], "sk_lock-AF_INET-RPC", &xs_key[0]);
}

/* Re-initialise the locks of an IPv6 socket with the RPC lock class */
static inline void xs_reclassify_socket6(struct socket *sock)
{
	struct sock *inet = sock->sk;

	BUG_ON(sock_owned_by_user(inet));
	sock_lock_init_class_and_name(inet, "slock-AF_INET6-RPC",
		&xs_slock_key[1], "sk_lock-AF_INET6-RPC", &xs_key[1]);
}
#else
/* Without CONFIG_DEBUG_LOCK_ALLOC reclassification is a no-op */
static inline void xs_reclassify_socket4(struct socket *sock)
{
}

static inline void xs_reclassify_socket6(struct socket *sock)
{
}
#endif
1613 
1614 static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
1615 {
1616 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1617 
1618 	if (!transport->inet) {
1619 		struct sock *sk = sock->sk;
1620 
1621 		write_lock_bh(&sk->sk_callback_lock);
1622 
1623 		xs_save_old_callbacks(transport, sk);
1624 
1625 		sk->sk_user_data = xprt;
1626 		sk->sk_data_ready = xs_udp_data_ready;
1627 		sk->sk_write_space = xs_udp_write_space;
1628 		sk->sk_error_report = xs_error_report;
1629 		sk->sk_no_check = UDP_CSUM_NORCV;
1630 		sk->sk_allocation = GFP_ATOMIC;
1631 
1632 		xprt_set_connected(xprt);
1633 
1634 		/* Reset to new socket */
1635 		transport->sock = sock;
1636 		transport->inet = sk;
1637 
1638 		write_unlock_bh(&sk->sk_callback_lock);
1639 	}
1640 	xs_udp_do_set_buffer_size(xprt);
1641 }
1642 
1643 /**
1644  * xs_udp_connect_worker4 - set up a UDP socket
1645  * @work: RPC transport to connect
1646  *
1647  * Invoked by a work queue tasklet.
1648  */
1649 static void xs_udp_connect_worker4(struct work_struct *work)
1650 {
1651 	struct sock_xprt *transport =
1652 		container_of(work, struct sock_xprt, connect_worker.work);
1653 	struct rpc_xprt *xprt = &transport->xprt;
1654 	struct socket *sock = transport->sock;
1655 	int err, status = -EIO;
1656 
1657 	if (xprt->shutdown)
1658 		goto out;
1659 
1660 	/* Start by resetting any existing state */
1661 	xs_reset_transport(transport);
1662 
1663 	err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock);
1664 	if (err < 0) {
1665 		dprintk("RPC:       can't create UDP transport socket (%d).\n", -err);
1666 		goto out;
1667 	}
1668 	xs_reclassify_socket4(sock);
1669 
1670 	if (xs_bind4(transport, sock)) {
1671 		sock_release(sock);
1672 		goto out;
1673 	}
1674 
1675 	dprintk("RPC:       worker connecting xprt %p to address: %s\n",
1676 			xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
1677 
1678 	xs_udp_finish_connecting(xprt, sock);
1679 	status = 0;
1680 out:
1681 	xprt_clear_connecting(xprt);
1682 	xprt_wake_pending_tasks(xprt, status);
1683 }
1684 
1685 /**
1686  * xs_udp_connect_worker6 - set up a UDP socket
1687  * @work: RPC transport to connect
1688  *
1689  * Invoked by a work queue tasklet.
1690  */
1691 static void xs_udp_connect_worker6(struct work_struct *work)
1692 {
1693 	struct sock_xprt *transport =
1694 		container_of(work, struct sock_xprt, connect_worker.work);
1695 	struct rpc_xprt *xprt = &transport->xprt;
1696 	struct socket *sock = transport->sock;
1697 	int err, status = -EIO;
1698 
1699 	if (xprt->shutdown)
1700 		goto out;
1701 
1702 	/* Start by resetting any existing state */
1703 	xs_reset_transport(transport);
1704 
1705 	err = sock_create_kern(PF_INET6, SOCK_DGRAM, IPPROTO_UDP, &sock);
1706 	if (err < 0) {
1707 		dprintk("RPC:       can't create UDP transport socket (%d).\n", -err);
1708 		goto out;
1709 	}
1710 	xs_reclassify_socket6(sock);
1711 
1712 	if (xs_bind6(transport, sock) < 0) {
1713 		sock_release(sock);
1714 		goto out;
1715 	}
1716 
1717 	dprintk("RPC:       worker connecting xprt %p to address: %s\n",
1718 			xprt, xprt->address_strings[RPC_DISPLAY_ALL]);
1719 
1720 	xs_udp_finish_connecting(xprt, sock);
1721 	status = 0;
1722 out:
1723 	xprt_clear_connecting(xprt);
1724 	xprt_wake_pending_tasks(xprt, status);
1725 }
1726 
1727 /*
1728  * We need to preserve the port number so the reply cache on the server can
1729  * find our cached RPC replies when we get around to reconnecting.
1730  */
1731 static void xs_abort_connection(struct rpc_xprt *xprt, struct sock_xprt *transport)
1732 {
1733 	int result;
1734 	struct sockaddr any;
1735 
1736 	dprintk("RPC:       disconnecting xprt %p to reuse port\n", xprt);
1737 
1738 	/*
1739 	 * Disconnect the transport socket by doing a connect operation
1740 	 * with AF_UNSPEC.  This should return immediately...
1741 	 */
1742 	memset(&any, 0, sizeof(any));
1743 	any.sa_family = AF_UNSPEC;
1744 	result = kernel_connect(transport->sock, &any, sizeof(any), 0);
1745 	if (!result)
1746 		xs_sock_mark_closed(xprt);
1747 	else
1748 		dprintk("RPC:       AF_UNSPEC connect return code %d\n",
1749 				result);
1750 }
1751 
1752 static void xs_tcp_reuse_connection(struct rpc_xprt *xprt, struct sock_xprt *transport)
1753 {
1754 	unsigned int state = transport->inet->sk_state;
1755 
1756 	if (state == TCP_CLOSE && transport->sock->state == SS_UNCONNECTED)
1757 		return;
1758 	if ((1 << state) & (TCPF_ESTABLISHED|TCPF_SYN_SENT))
1759 		return;
1760 	xs_abort_connection(xprt, transport);
1761 }
1762 
1763 static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
1764 {
1765 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1766 
1767 	if (!transport->inet) {
1768 		struct sock *sk = sock->sk;
1769 
1770 		write_lock_bh(&sk->sk_callback_lock);
1771 
1772 		xs_save_old_callbacks(transport, sk);
1773 
1774 		sk->sk_user_data = xprt;
1775 		sk->sk_data_ready = xs_tcp_data_ready;
1776 		sk->sk_state_change = xs_tcp_state_change;
1777 		sk->sk_write_space = xs_tcp_write_space;
1778 		sk->sk_error_report = xs_error_report;
1779 		sk->sk_allocation = GFP_ATOMIC;
1780 
1781 		/* socket options */
1782 		sk->sk_userlocks |= SOCK_BINDPORT_LOCK;
1783 		sock_reset_flag(sk, SOCK_LINGER);
1784 		tcp_sk(sk)->linger2 = 0;
1785 		tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;
1786 
1787 		xprt_clear_connected(xprt);
1788 
1789 		/* Reset to new socket */
1790 		transport->sock = sock;
1791 		transport->inet = sk;
1792 
1793 		write_unlock_bh(&sk->sk_callback_lock);
1794 	}
1795 
1796 	if (!xprt_bound(xprt))
1797 		return -ENOTCONN;
1798 
1799 	/* Tell the socket layer to start connecting... */
1800 	xprt->stat.connect_count++;
1801 	xprt->stat.connect_start = jiffies;
1802 	return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
1803 }
1804 
/**
 * xs_tcp_setup_socket - create a TCP socket and connect to a remote endpoint
 * @xprt: RPC transport to connect
 * @transport: socket transport to connect
 * @create_sock: function to create a socket of the correct type
 *
 * Called from the connect workers.  Once a connect attempt has been
 * started, only the connecting state is cleared; pending tasks are
 * woken here only when no attempt was made (shutdown, socket creation
 * failure, or a pending connection abort).
 */
static void xs_tcp_setup_socket(struct rpc_xprt *xprt,
		struct sock_xprt *transport,
		struct socket *(*create_sock)(struct rpc_xprt *,
			struct sock_xprt *))
{
	struct socket *sock = transport->sock;
	int status = -EIO;

	if (xprt->shutdown)
		goto out;

	if (sock == NULL) {
		clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
		sock = create_sock(xprt, transport);
		if (IS_ERR(sock)) {
			status = PTR_ERR(sock);
			goto out;
		}
	} else {
		int abort_and_exit;

		abort_and_exit = test_and_clear_bit(XPRT_CONNECTION_ABORT,
				&xprt->state);
		/* "close" the socket, preserving the local port */
		xs_tcp_reuse_connection(xprt, transport);

		if (abort_and_exit) {
			status = -EAGAIN;
			goto out;
		}
	}

	dprintk("RPC:       worker connecting xprt %p to address: %s\n",
			xprt, xprt->address_strings[RPC_DISPLAY_ALL]);

	status = xs_tcp_finish_connecting(xprt, sock);
	dprintk("RPC:       %p connect status %d connected %d sock state %d\n",
			xprt, -status, xprt_connected(xprt),
			sock->sk->sk_state);
	switch (status) {
	case 0:
	case -EINPROGRESS:
	case -EALREADY:
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ENETUNREACH:
		/* retry with existing socket, after a delay */
		break;
	default:
		printk("%s: connect returned unhandled error %d\n",
			__func__, status);
		/* fall through: treat like -EADDRNOTAVAIL */
	case -EADDRNOTAVAIL:
		/* We're probably in TIME_WAIT. Get rid of existing socket,
		 * and retry
		 */
		set_bit(XPRT_CONNECTION_CLOSE, &xprt->state);
		xprt_force_disconnect(xprt);
		break;
	}
	/* A connect attempt was made: do not wake pending tasks here */
	xprt_clear_connecting(xprt);
	return;

out:
	xprt_clear_connecting(xprt);
	xprt_wake_pending_tasks(xprt, status);
}
1876 
1877 static struct socket *xs_create_tcp_sock4(struct rpc_xprt *xprt,
1878 		struct sock_xprt *transport)
1879 {
1880 	struct socket *sock;
1881 	int err;
1882 
1883 	/* start from scratch */
1884 	err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
1885 	if (err < 0) {
1886 		dprintk("RPC:       can't create TCP transport socket (%d).\n",
1887 				-err);
1888 		goto out_err;
1889 	}
1890 	xs_reclassify_socket4(sock);
1891 
1892 	if (xs_bind4(transport, sock) < 0) {
1893 		sock_release(sock);
1894 		goto out_err;
1895 	}
1896 	return sock;
1897 out_err:
1898 	return ERR_PTR(-EIO);
1899 }
1900 
1901 /**
1902  * xs_tcp_connect_worker4 - connect a TCP socket to a remote endpoint
1903  * @work: RPC transport to connect
1904  *
1905  * Invoked by a work queue tasklet.
1906  */
1907 static void xs_tcp_connect_worker4(struct work_struct *work)
1908 {
1909 	struct sock_xprt *transport =
1910 		container_of(work, struct sock_xprt, connect_worker.work);
1911 	struct rpc_xprt *xprt = &transport->xprt;
1912 
1913 	xs_tcp_setup_socket(xprt, transport, xs_create_tcp_sock4);
1914 }
1915 
1916 static struct socket *xs_create_tcp_sock6(struct rpc_xprt *xprt,
1917 		struct sock_xprt *transport)
1918 {
1919 	struct socket *sock;
1920 	int err;
1921 
1922 	/* start from scratch */
1923 	err = sock_create_kern(PF_INET6, SOCK_STREAM, IPPROTO_TCP, &sock);
1924 	if (err < 0) {
1925 		dprintk("RPC:       can't create TCP transport socket (%d).\n",
1926 				-err);
1927 		goto out_err;
1928 	}
1929 	xs_reclassify_socket6(sock);
1930 
1931 	if (xs_bind6(transport, sock) < 0) {
1932 		sock_release(sock);
1933 		goto out_err;
1934 	}
1935 	return sock;
1936 out_err:
1937 	return ERR_PTR(-EIO);
1938 }
1939 
1940 /**
1941  * xs_tcp_connect_worker6 - connect a TCP socket to a remote endpoint
1942  * @work: RPC transport to connect
1943  *
1944  * Invoked by a work queue tasklet.
1945  */
1946 static void xs_tcp_connect_worker6(struct work_struct *work)
1947 {
1948 	struct sock_xprt *transport =
1949 		container_of(work, struct sock_xprt, connect_worker.work);
1950 	struct rpc_xprt *xprt = &transport->xprt;
1951 
1952 	xs_tcp_setup_socket(xprt, transport, xs_create_tcp_sock6);
1953 }
1954 
1955 /**
1956  * xs_connect - connect a socket to a remote endpoint
1957  * @task: address of RPC task that manages state of connect request
1958  *
1959  * TCP: If the remote end dropped the connection, delay reconnecting.
1960  *
1961  * UDP socket connects are synchronous, but we use a work queue anyway
1962  * to guarantee that even unprivileged user processes can set up a
1963  * socket on a privileged port.
1964  *
1965  * If a UDP socket connect fails, the delay behavior here prevents
1966  * retry floods (hard mounts).
1967  */
1968 static void xs_connect(struct rpc_task *task)
1969 {
1970 	struct rpc_xprt *xprt = task->tk_xprt;
1971 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1972 
1973 	if (xprt_test_and_set_connecting(xprt))
1974 		return;
1975 
1976 	if (transport->sock != NULL) {
1977 		dprintk("RPC:       xs_connect delayed xprt %p for %lu "
1978 				"seconds\n",
1979 				xprt, xprt->reestablish_timeout / HZ);
1980 		queue_delayed_work(rpciod_workqueue,
1981 				   &transport->connect_worker,
1982 				   xprt->reestablish_timeout);
1983 		xprt->reestablish_timeout <<= 1;
1984 		if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO)
1985 			xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
1986 	} else {
1987 		dprintk("RPC:       xs_connect scheduled xprt %p\n", xprt);
1988 		queue_delayed_work(rpciod_workqueue,
1989 				   &transport->connect_worker, 0);
1990 	}
1991 }
1992 
1993 static void xs_tcp_connect(struct rpc_task *task)
1994 {
1995 	struct rpc_xprt *xprt = task->tk_xprt;
1996 
1997 	/* Exit if we need to wait for socket shutdown to complete */
1998 	if (test_bit(XPRT_CLOSING, &xprt->state))
1999 		return;
2000 	xs_connect(task);
2001 }
2002 
2003 /**
2004  * xs_udp_print_stats - display UDP socket-specifc stats
2005  * @xprt: rpc_xprt struct containing statistics
2006  * @seq: output file
2007  *
2008  */
2009 static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
2010 {
2011 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2012 
2013 	seq_printf(seq, "\txprt:\tudp %u %lu %lu %lu %lu %Lu %Lu\n",
2014 			transport->port,
2015 			xprt->stat.bind_count,
2016 			xprt->stat.sends,
2017 			xprt->stat.recvs,
2018 			xprt->stat.bad_xids,
2019 			xprt->stat.req_u,
2020 			xprt->stat.bklog_u);
2021 }
2022 
2023 /**
2024  * xs_tcp_print_stats - display TCP socket-specifc stats
2025  * @xprt: rpc_xprt struct containing statistics
2026  * @seq: output file
2027  *
2028  */
2029 static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
2030 {
2031 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
2032 	long idle_time = 0;
2033 
2034 	if (xprt_connected(xprt))
2035 		idle_time = (long)(jiffies - xprt->last_used) / HZ;
2036 
2037 	seq_printf(seq, "\txprt:\ttcp %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu\n",
2038 			transport->port,
2039 			xprt->stat.bind_count,
2040 			xprt->stat.connect_count,
2041 			xprt->stat.connect_time,
2042 			idle_time,
2043 			xprt->stat.sends,
2044 			xprt->stat.recvs,
2045 			xprt->stat.bad_xids,
2046 			xprt->stat.req_u,
2047 			xprt->stat.bklog_u);
2048 }
2049 
2050 static struct rpc_xprt_ops xs_udp_ops = {
2051 	.set_buffer_size	= xs_udp_set_buffer_size,
2052 	.reserve_xprt		= xprt_reserve_xprt_cong,
2053 	.release_xprt		= xprt_release_xprt_cong,
2054 	.rpcbind		= rpcb_getport_async,
2055 	.set_port		= xs_set_port,
2056 	.connect		= xs_connect,
2057 	.buf_alloc		= rpc_malloc,
2058 	.buf_free		= rpc_free,
2059 	.send_request		= xs_udp_send_request,
2060 	.set_retrans_timeout	= xprt_set_retrans_timeout_rtt,
2061 	.timer			= xs_udp_timer,
2062 	.release_request	= xprt_release_rqst_cong,
2063 	.close			= xs_close,
2064 	.destroy		= xs_destroy,
2065 	.print_stats		= xs_udp_print_stats,
2066 };
2067 
2068 static struct rpc_xprt_ops xs_tcp_ops = {
2069 	.reserve_xprt		= xprt_reserve_xprt,
2070 	.release_xprt		= xs_tcp_release_xprt,
2071 	.rpcbind		= rpcb_getport_async,
2072 	.set_port		= xs_set_port,
2073 	.connect		= xs_tcp_connect,
2074 	.buf_alloc		= rpc_malloc,
2075 	.buf_free		= rpc_free,
2076 	.send_request		= xs_tcp_send_request,
2077 	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
2078 	.close			= xs_tcp_close,
2079 	.destroy		= xs_destroy,
2080 	.print_stats		= xs_tcp_print_stats,
2081 };
2082 
2083 static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
2084 				      unsigned int slot_table_size)
2085 {
2086 	struct rpc_xprt *xprt;
2087 	struct sock_xprt *new;
2088 
2089 	if (args->addrlen > sizeof(xprt->addr)) {
2090 		dprintk("RPC:       xs_setup_xprt: address too large\n");
2091 		return ERR_PTR(-EBADF);
2092 	}
2093 
2094 	new = kzalloc(sizeof(*new), GFP_KERNEL);
2095 	if (new == NULL) {
2096 		dprintk("RPC:       xs_setup_xprt: couldn't allocate "
2097 				"rpc_xprt\n");
2098 		return ERR_PTR(-ENOMEM);
2099 	}
2100 	xprt = &new->xprt;
2101 
2102 	xprt->max_reqs = slot_table_size;
2103 	xprt->slot = kcalloc(xprt->max_reqs, sizeof(struct rpc_rqst), GFP_KERNEL);
2104 	if (xprt->slot == NULL) {
2105 		kfree(xprt);
2106 		dprintk("RPC:       xs_setup_xprt: couldn't allocate slot "
2107 				"table\n");
2108 		return ERR_PTR(-ENOMEM);
2109 	}
2110 
2111 	memcpy(&xprt->addr, args->dstaddr, args->addrlen);
2112 	xprt->addrlen = args->addrlen;
2113 	if (args->srcaddr)
2114 		memcpy(&new->addr, args->srcaddr, args->addrlen);
2115 
2116 	return xprt;
2117 }
2118 
2119 static const struct rpc_timeout xs_udp_default_timeout = {
2120 	.to_initval = 5 * HZ,
2121 	.to_maxval = 30 * HZ,
2122 	.to_increment = 5 * HZ,
2123 	.to_retries = 5,
2124 };
2125 
2126 /**
2127  * xs_setup_udp - Set up transport to use a UDP socket
2128  * @args: rpc transport creation arguments
2129  *
2130  */
2131 static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
2132 {
2133 	struct sockaddr *addr = args->dstaddr;
2134 	struct rpc_xprt *xprt;
2135 	struct sock_xprt *transport;
2136 
2137 	xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries);
2138 	if (IS_ERR(xprt))
2139 		return xprt;
2140 	transport = container_of(xprt, struct sock_xprt, xprt);
2141 
2142 	xprt->prot = IPPROTO_UDP;
2143 	xprt->tsh_size = 0;
2144 	/* XXX: header size can vary due to auth type, IPv6, etc. */
2145 	xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
2146 
2147 	xprt->bind_timeout = XS_BIND_TO;
2148 	xprt->connect_timeout = XS_UDP_CONN_TO;
2149 	xprt->reestablish_timeout = XS_UDP_REEST_TO;
2150 	xprt->idle_timeout = XS_IDLE_DISC_TO;
2151 
2152 	xprt->ops = &xs_udp_ops;
2153 
2154 	xprt->timeout = &xs_udp_default_timeout;
2155 
2156 	switch (addr->sa_family) {
2157 	case AF_INET:
2158 		if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2159 			xprt_set_bound(xprt);
2160 
2161 		INIT_DELAYED_WORK(&transport->connect_worker,
2162 					xs_udp_connect_worker4);
2163 		xs_format_ipv4_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP);
2164 		break;
2165 	case AF_INET6:
2166 		if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2167 			xprt_set_bound(xprt);
2168 
2169 		INIT_DELAYED_WORK(&transport->connect_worker,
2170 					xs_udp_connect_worker6);
2171 		xs_format_ipv6_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6);
2172 		break;
2173 	default:
2174 		kfree(xprt);
2175 		return ERR_PTR(-EAFNOSUPPORT);
2176 	}
2177 
2178 	dprintk("RPC:       set up transport to address %s\n",
2179 			xprt->address_strings[RPC_DISPLAY_ALL]);
2180 
2181 	if (try_module_get(THIS_MODULE))
2182 		return xprt;
2183 
2184 	kfree(xprt->slot);
2185 	kfree(xprt);
2186 	return ERR_PTR(-EINVAL);
2187 }
2188 
2189 static const struct rpc_timeout xs_tcp_default_timeout = {
2190 	.to_initval = 60 * HZ,
2191 	.to_maxval = 60 * HZ,
2192 	.to_retries = 2,
2193 };
2194 
2195 /**
2196  * xs_setup_tcp - Set up transport to use a TCP socket
2197  * @args: rpc transport creation arguments
2198  *
2199  */
2200 static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
2201 {
2202 	struct sockaddr *addr = args->dstaddr;
2203 	struct rpc_xprt *xprt;
2204 	struct sock_xprt *transport;
2205 
2206 	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
2207 	if (IS_ERR(xprt))
2208 		return xprt;
2209 	transport = container_of(xprt, struct sock_xprt, xprt);
2210 
2211 	xprt->prot = IPPROTO_TCP;
2212 	xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
2213 	xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
2214 
2215 	xprt->bind_timeout = XS_BIND_TO;
2216 	xprt->connect_timeout = XS_TCP_CONN_TO;
2217 	xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2218 	xprt->idle_timeout = XS_IDLE_DISC_TO;
2219 
2220 	xprt->ops = &xs_tcp_ops;
2221 	xprt->timeout = &xs_tcp_default_timeout;
2222 
2223 	switch (addr->sa_family) {
2224 	case AF_INET:
2225 		if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2226 			xprt_set_bound(xprt);
2227 
2228 		INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_connect_worker4);
2229 		xs_format_ipv4_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP);
2230 		break;
2231 	case AF_INET6:
2232 		if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2233 			xprt_set_bound(xprt);
2234 
2235 		INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_connect_worker6);
2236 		xs_format_ipv6_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6);
2237 		break;
2238 	default:
2239 		kfree(xprt);
2240 		return ERR_PTR(-EAFNOSUPPORT);
2241 	}
2242 
2243 	dprintk("RPC:       set up transport to address %s\n",
2244 			xprt->address_strings[RPC_DISPLAY_ALL]);
2245 
2246 	if (try_module_get(THIS_MODULE))
2247 		return xprt;
2248 
2249 	kfree(xprt->slot);
2250 	kfree(xprt);
2251 	return ERR_PTR(-EINVAL);
2252 }
2253 
2254 static struct xprt_class	xs_udp_transport = {
2255 	.list		= LIST_HEAD_INIT(xs_udp_transport.list),
2256 	.name		= "udp",
2257 	.owner		= THIS_MODULE,
2258 	.ident		= IPPROTO_UDP,
2259 	.setup		= xs_setup_udp,
2260 };
2261 
2262 static struct xprt_class	xs_tcp_transport = {
2263 	.list		= LIST_HEAD_INIT(xs_tcp_transport.list),
2264 	.name		= "tcp",
2265 	.owner		= THIS_MODULE,
2266 	.ident		= IPPROTO_TCP,
2267 	.setup		= xs_setup_tcp,
2268 };
2269 
2270 /**
2271  * init_socket_xprt - set up xprtsock's sysctls, register with RPC client
2272  *
2273  */
2274 int init_socket_xprt(void)
2275 {
2276 #ifdef RPC_DEBUG
2277 	if (!sunrpc_table_header)
2278 		sunrpc_table_header = register_sysctl_table(sunrpc_table);
2279 #endif
2280 
2281 	xprt_register_transport(&xs_udp_transport);
2282 	xprt_register_transport(&xs_tcp_transport);
2283 
2284 	return 0;
2285 }
2286 
2287 /**
2288  * cleanup_socket_xprt - remove xprtsock's sysctls, unregister
2289  *
2290  */
2291 void cleanup_socket_xprt(void)
2292 {
2293 #ifdef RPC_DEBUG
2294 	if (sunrpc_table_header) {
2295 		unregister_sysctl_table(sunrpc_table_header);
2296 		sunrpc_table_header = NULL;
2297 	}
2298 #endif
2299 
2300 	xprt_unregister_transport(&xs_udp_transport);
2301 	xprt_unregister_transport(&xs_tcp_transport);
2302 }
2303