/*
 *  linux/net/sunrpc/xprtsock.c
 *
 * Client-side transport implementation for sockets.
 *
 * TCP callback races fixes (C) 1998 Red Hat
 * TCP send fixes (C) 1998 Red Hat
 * TCP NFS related read + write fixes
 *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 *
 * Rewrite of large parts of the code in order to stabilize TCP stuff.
 * Fix behaviour when socket buffer is full.
 *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
 *
 * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
 *
 * IPv6 support contributed by Gilles Quillard, Bull Open Source, 2005.
 *   <gilles.quillard@bull.net>
 */

#include <linux/types.h>
#include <linux/string.h>
#include <linux/slab.h>
#include <linux/module.h>
#include <linux/capability.h>
#include <linux/pagemap.h>
#include <linux/errno.h>
#include <linux/socket.h>
#include <linux/in.h>
#include <linux/net.h>
#include <linux/mm.h>
#include <linux/un.h>
#include <linux/udp.h>
#include <linux/tcp.h>
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/sched.h>
#include <linux/sunrpc/svcsock.h>
#include <linux/sunrpc/xprtsock.h>
#include <linux/file.h>
#ifdef CONFIG_NFS_V4_1
#include <linux/sunrpc/bc_xprt.h>
#endif

#include <net/sock.h>
#include <net/checksum.h>
#include <net/udp.h>
#include <net/tcp.h>

#include "sunrpc.h"

static void xs_close(struct rpc_xprt *xprt);

/*
 * xprtsock tunables
 */
unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE;

unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;

#define XS_TCP_LINGER_TO	(15U * HZ)
static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO;

/*
 * We can register our own files under /proc/sys/sunrpc by
 * calling register_sysctl_table() again.  The files in that
 * directory become the union of all files registered there.
 *
 * We simply need to make sure that we don't collide with
 * someone else's file names!
 */
#ifdef RPC_DEBUG

static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT;
static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT;

static struct ctl_table_header *sunrpc_table_header;

/*
 * FIXME: changing the UDP slot table size should also resize the UDP
 *        socket buffers for existing UDP transports
 */
static ctl_table xs_tunables_table[] = {
	{
		.procname	= "udp_slot_table_entries",
		.data		= &xprt_udp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		.procname	= "tcp_slot_table_entries",
		.data		= &xprt_tcp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		.procname	= "min_resvport",
		.data		= &xprt_min_resvport,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit
	},
	{
		.procname	= "max_resvport",
		.data		= &xprt_max_resvport,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit
	},
	{
		.procname	= "tcp_fin_timeout",
		.data		= &xs_tcp_fin_timeout,
		.maxlen		= sizeof(xs_tcp_fin_timeout),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_jiffies,
	},
	{ },
};

static ctl_table sunrpc_table[] = {
	{
		.procname	= "sunrpc",
		.mode		= 0555,
		.child		= xs_tunables_table
	},
	{ },
};

#endif

/*
 * Wait duration for a reply from the RPC portmapper.
 */
#define XS_BIND_TO		(60U * HZ)

/*
 * Delay if a UDP socket connect error occurs.  This is most likely some
 * kind of resource problem on the local host.
 */
#define XS_UDP_REEST_TO		(2U * HZ)

/*
 * The reestablish timeout allows clients to delay for a bit before attempting
 * to reconnect to a server that just dropped our connection.
 *
 * We implement an exponential backoff when trying to reestablish a TCP
 * transport connection with the server.  Some servers like to drop a TCP
 * connection when they are overworked, so we start with a short timeout and
 * increase over time if the server is down or not responding.
 */
#define XS_TCP_INIT_REEST_TO	(3U * HZ)
#define XS_TCP_MAX_REEST_TO	(5U * 60 * HZ)

/*
 * TCP idle timeout; client drops the transport socket if it is idle
 * for this long.  Note that we also timeout UDP sockets to prevent
 * holding port numbers when there is no RPC traffic.
 */
#define XS_IDLE_DISC_TO		(5U * 60 * HZ)
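/*
 * Illustrative usage (a sketch, not part of this file): with
 * CONFIG_SYSCTL, the ctl tables above surface these knobs under
 * /proc/sys/sunrpc, so an administrator can do, for example,
 *
 *	echo 64 > /proc/sys/sunrpc/tcp_slot_table_entries
 *
 * with the value clamped to the limits wired into .extra1/.extra2
 * by proc_dointvec_minmax().
 */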
#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

#ifdef RPC_DEBUG_DATA
static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
{
	u8 *buf = (u8 *) packet;
	int j;

	dprintk("RPC: %s\n", msg);
	for (j = 0; j < count && j < 128; j += 4) {
		if (!(j & 31)) {
			if (j)
				dprintk("\n");
			dprintk("0x%04x ", j);
		}
		dprintk("%02x%02x%02x%02x ",
			buf[j], buf[j+1], buf[j+2], buf[j+3]);
	}
	dprintk("\n");
}
#else
static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
{
	/* NOP */
}
#endif

struct sock_xprt {
	struct rpc_xprt		xprt;

	/*
	 * Network layer
	 */
	struct socket *		sock;
	struct sock *		inet;

	/*
	 * State of TCP reply receive
	 */
	__be32			tcp_fraghdr,
				tcp_xid,
				tcp_calldir;

	u32			tcp_offset,
				tcp_reclen;

	unsigned long		tcp_copied,
				tcp_flags;

	/*
	 * Connection of transports
	 */
	struct delayed_work	connect_worker;
	struct sockaddr_storage	srcaddr;
	unsigned short		srcport;

	/*
	 * UDP socket buffer size parameters
	 */
	size_t			rcvsize,
				sndsize;

	/*
	 * Saved socket callback addresses
	 */
	void			(*old_data_ready)(struct sock *, int);
	void			(*old_state_change)(struct sock *);
	void			(*old_write_space)(struct sock *);
	void			(*old_error_report)(struct sock *);
};

/*
 * TCP receive state flags
 */
#define TCP_RCV_LAST_FRAG	(1UL << 0)
#define TCP_RCV_COPY_FRAGHDR	(1UL << 1)
#define TCP_RCV_COPY_XID	(1UL << 2)
#define TCP_RCV_COPY_DATA	(1UL << 3)
#define TCP_RCV_READ_CALLDIR	(1UL << 4)
#define TCP_RCV_COPY_CALLDIR	(1UL << 5)

/*
 * TCP RPC flags
 */
#define TCP_RPC_REPLY		(1UL << 6)

static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt)
{
	return (struct sockaddr *) &xprt->addr;
}

static inline struct sockaddr_un *xs_addr_un(struct rpc_xprt *xprt)
{
	return (struct sockaddr_un *) &xprt->addr;
}

static inline struct sockaddr_in *xs_addr_in(struct rpc_xprt *xprt)
{
	return (struct sockaddr_in *) &xprt->addr;
}

static inline struct sockaddr_in6 *xs_addr_in6(struct rpc_xprt *xprt)
{
	return (struct sockaddr_in6 *) &xprt->addr;
}

static void xs_format_common_peer_addresses(struct rpc_xprt *xprt)
{
	struct sockaddr *sap = xs_addr(xprt);
	struct sockaddr_in6 *sin6;
	struct sockaddr_in *sin;
	struct sockaddr_un *sun;
	char buf[128];

	switch (sap->sa_family) {
	case AF_LOCAL:
		sun = xs_addr_un(xprt);
		strlcpy(buf, sun->sun_path, sizeof(buf));
		xprt->address_strings[RPC_DISPLAY_ADDR] =
						kstrdup(buf, GFP_KERNEL);
		break;
	case AF_INET:
		(void)rpc_ntop(sap, buf, sizeof(buf));
		xprt->address_strings[RPC_DISPLAY_ADDR] =
						kstrdup(buf, GFP_KERNEL);
		sin = xs_addr_in(xprt);
		snprintf(buf, sizeof(buf), "%08x", ntohl(sin->sin_addr.s_addr));
		break;
	case AF_INET6:
		(void)rpc_ntop(sap, buf, sizeof(buf));
		xprt->address_strings[RPC_DISPLAY_ADDR] =
						kstrdup(buf, GFP_KERNEL);
		sin6 = xs_addr_in6(xprt);
		snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr);
		break;
	default:
		BUG();
	}

	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);
}
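/*
 * Example of the strings built above: for the IPv4 peer 192.168.0.1,
 * RPC_DISPLAY_ADDR is "192.168.0.1" and RPC_DISPLAY_HEX_ADDR is
 * "c0a80001" (the "%08x" rendering of the host-order address).
 */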
static void xs_format_common_peer_ports(struct rpc_xprt *xprt)
{
	struct sockaddr *sap = xs_addr(xprt);
	char buf[128];

	snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap));
	xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);

	snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap));
	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);
}

static void xs_format_peer_addresses(struct rpc_xprt *xprt,
				     const char *protocol,
				     const char *netid)
{
	xprt->address_strings[RPC_DISPLAY_PROTO] = protocol;
	xprt->address_strings[RPC_DISPLAY_NETID] = netid;
	xs_format_common_peer_addresses(xprt);
	xs_format_common_peer_ports(xprt);
}

static void xs_update_peer_port(struct rpc_xprt *xprt)
{
	kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
	kfree(xprt->address_strings[RPC_DISPLAY_PORT]);

	xs_format_common_peer_ports(xprt);
}

static void xs_free_peer_addresses(struct rpc_xprt *xprt)
{
	unsigned int i;

	for (i = 0; i < RPC_DISPLAY_MAX; i++)
		switch (i) {
		case RPC_DISPLAY_PROTO:
		case RPC_DISPLAY_NETID:
			continue;
		default:
			kfree(xprt->address_strings[i]);
		}
}

#define XS_SENDMSG_FLAGS	(MSG_DONTWAIT | MSG_NOSIGNAL)

static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
{
	struct msghdr msg = {
		.msg_name	= addr,
		.msg_namelen	= addrlen,
		.msg_flags	= XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0),
	};
	struct kvec iov = {
		.iov_base	= vec->iov_base + base,
		.iov_len	= vec->iov_len - base,
	};

	if (iov.iov_len != 0)
		return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
	return kernel_sendmsg(sock, &msg, NULL, 0, 0);
}

static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more)
{
	struct page **ppage;
	unsigned int remainder;
	int err, sent = 0;

	remainder = xdr->page_len - base;
	base += xdr->page_base;
	ppage = xdr->pages + (base >> PAGE_SHIFT);
	base &= ~PAGE_MASK;
	for(;;) {
		unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);
		int flags = XS_SENDMSG_FLAGS;

		remainder -= len;
		if (remainder != 0 || more)
			flags |= MSG_MORE;
		err = sock->ops->sendpage(sock, *ppage, base, len, flags);
		if (remainder == 0 || err != len)
			break;
		sent += err;
		ppage++;
		base = 0;
	}
	if (sent == 0)
		return err;
	if (err > 0)
		sent += err;
	return sent;
}
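/*
 * Worked example for xs_sendpages() below: given an xdr_buf with an
 * 80-byte head, 4096 bytes of page data and a 16-byte tail, a resumed
 * send with base = 100 skips the head entirely (base becomes 20),
 * pushes page data starting at offset 20, and finishes with the tail.
 */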
/**
 * xs_sendpages - write pages directly to a socket
 * @sock: socket to send on
 * @addr: UDP only -- address of destination
 * @addrlen: UDP only -- length of destination address
 * @xdr: buffer containing this request
 * @base: starting position in the buffer
 *
 */
static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base)
{
	unsigned int remainder = xdr->len - base;
	int err, sent = 0;

	if (unlikely(!sock))
		return -ENOTSOCK;

	clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
	if (base != 0) {
		addr = NULL;
		addrlen = 0;
	}

	if (base < xdr->head[0].iov_len || addr != NULL) {
		unsigned int len = xdr->head[0].iov_len - base;
		remainder -= len;
		err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0);
		if (remainder == 0 || err != len)
			goto out;
		sent += err;
		base = 0;
	} else
		base -= xdr->head[0].iov_len;

	if (base < xdr->page_len) {
		unsigned int len = xdr->page_len - base;
		remainder -= len;
		err = xs_send_pagedata(sock, xdr, base, remainder != 0);
		if (remainder == 0 || err != len)
			goto out;
		sent += err;
		base = 0;
	} else
		base -= xdr->page_len;

	if (base >= xdr->tail[0].iov_len)
		return sent;
	err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0);
out:
	if (sent == 0)
		return err;
	if (err > 0)
		sent += err;
	return sent;
}

static void xs_nospace_callback(struct rpc_task *task)
{
	struct sock_xprt *transport = container_of(task->tk_rqstp->rq_xprt, struct sock_xprt, xprt);

	transport->inet->sk_write_pending--;
	clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
}

/**
 * xs_nospace - place task on wait queue if transmit was incomplete
 * @task: task to put to sleep
 *
 */
static int xs_nospace(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	int ret = 0;

	dprintk("RPC: %5u xmit incomplete (%u left of %u)\n",
			task->tk_pid, req->rq_slen - req->rq_bytes_sent,
			req->rq_slen);

	/* Protect against races with write_space */
	spin_lock_bh(&xprt->transport_lock);

	/* Don't race with disconnect */
	if (xprt_connected(xprt)) {
		if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
			ret = -EAGAIN;
			/*
			 * Notify TCP that we're limited by the application
			 * window size
			 */
			set_bit(SOCK_NOSPACE, &transport->sock->flags);
			transport->inet->sk_write_pending++;
			/* ...and wait for more buffer space */
			xprt_wait_for_buffer_space(task, xs_nospace_callback);
		}
	} else {
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
		ret = -ENOTCONN;
	}

	spin_unlock_bh(&xprt->transport_lock);
	return ret;
}
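/*
 * Worked example for xs_encode_stream_record_marker() below (RFC 1831
 * record marking): a 100-byte RPC message (not counting the 4-byte
 * marker itself) gets the marker 0x80000064 -- the top bit flags the
 * last fragment and the low 31 bits carry the fragment length
 * (100 == 0x64).
 */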
/*
 * Construct a stream transport record marker in @buf.
 */
static inline void xs_encode_stream_record_marker(struct xdr_buf *buf)
{
	u32 reclen = buf->len - sizeof(rpc_fraghdr);
	rpc_fraghdr *base = buf->head[0].iov_base;
	*base = cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | reclen);
}

/**
 * xs_local_send_request - write an RPC request to an AF_LOCAL socket
 * @task: RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occurred, the request was not sent
 */
static int xs_local_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport =
				container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_encode_stream_record_marker(&req->rq_snd_buf);

	xs_pktdump("packet data:",
			req->rq_svec->iov_base, req->rq_svec->iov_len);

	status = xs_sendpages(transport->sock, NULL, 0,
						xdr, req->rq_bytes_sent);
	dprintk("RPC: %s(%u) = %d\n",
			__func__, xdr->len - req->rq_bytes_sent, status);
	if (likely(status >= 0)) {
		req->rq_bytes_sent += status;
		req->rq_xmit_bytes_sent += status;
		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
			req->rq_bytes_sent = 0;
			return 0;
		}
		status = -EAGAIN;
	}

	switch (status) {
	case -EAGAIN:
		status = xs_nospace(task);
		break;
	default:
		dprintk("RPC: sendmsg returned unrecognized error %d\n",
			-status);
	case -EPIPE:
		xs_close(xprt);
		status = -ENOTCONN;
	}

	return status;
}
/**
 * xs_udp_send_request - write an RPC request to a UDP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occurred, the request was not sent
 */
static int xs_udp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	if (!xprt_bound(xprt))
		return -ENOTCONN;
	status = xs_sendpages(transport->sock,
			      xs_addr(xprt),
			      xprt->addrlen, xdr,
			      req->rq_bytes_sent);

	dprintk("RPC: xs_udp_send_request(%u) = %d\n",
			xdr->len - req->rq_bytes_sent, status);

	if (status >= 0) {
		req->rq_xmit_bytes_sent += status;
		if (status >= req->rq_slen)
			return 0;
		/* Still some bytes left; set up for a retry later. */
		status = -EAGAIN;
	}

	switch (status) {
	case -ENOTSOCK:
		status = -ENOTCONN;
		/* Should we call xs_close() here? */
		break;
	case -EAGAIN:
		status = xs_nospace(task);
		break;
	default:
		dprintk("RPC: sendmsg returned unrecognized error %d\n",
			-status);
	case -ENETUNREACH:
	case -EPIPE:
	case -ECONNREFUSED:
		/* When the server has died, an ICMP port unreachable message
		 * prompts ECONNREFUSED. */
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
	}

	return status;
}

/**
 * xs_tcp_shutdown - gracefully shut down a TCP socket
 * @xprt: transport
 *
 * Initiates a graceful shutdown of the TCP socket by calling the
 * equivalent of shutdown(SHUT_WR);
 */
static void xs_tcp_shutdown(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct socket *sock = transport->sock;

	if (sock != NULL)
		kernel_sock_shutdown(sock, SHUT_WR);
}

/**
 * xs_tcp_send_request - write an RPC request to a TCP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occurred, the request was not sent
 *
 * XXX: In the case of soft timeouts, should we eventually give up
 *	if sendmsg is not able to make progress?
 */
static int xs_tcp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_encode_stream_record_marker(&req->rq_snd_buf);

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	/* Continue transmitting the packet/record. We must be careful
	 * to cope with writespace callbacks arriving _after_ we have
	 * called sendmsg(). */
	while (1) {
		status = xs_sendpages(transport->sock,
					NULL, 0, xdr, req->rq_bytes_sent);

		dprintk("RPC: xs_tcp_send_request(%u) = %d\n",
				xdr->len - req->rq_bytes_sent, status);

		if (unlikely(status < 0))
			break;

		/* If we've sent the entire packet, immediately
		 * reset the count of bytes sent. */
		req->rq_bytes_sent += status;
		req->rq_xmit_bytes_sent += status;
		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
			req->rq_bytes_sent = 0;
			return 0;
		}

		if (status != 0)
			continue;
		status = -EAGAIN;
		break;
	}

	switch (status) {
	case -ENOTSOCK:
		status = -ENOTCONN;
		/* Should we call xs_close() here? */
		break;
	case -EAGAIN:
		status = xs_nospace(task);
		break;
	default:
		dprintk("RPC: sendmsg returned unrecognized error %d\n",
			-status);
	case -ECONNRESET:
	case -EPIPE:
		xs_tcp_shutdown(xprt);
	case -ECONNREFUSED:
	case -ENOTCONN:
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
	}

	return status;
}
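/*
 * Note on the loop above: req->rq_bytes_sent persists across calls, so
 * when the socket accepts only part of the record (-EAGAIN), the next
 * call to xs_tcp_send_request() resumes mid-record instead of
 * retransmitting bytes the peer has already consumed.
 */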
/**
 * xs_tcp_release_xprt - clean up after a tcp transmission
 * @xprt: transport
 * @task: rpc task
 *
 * This cleans up if an error causes us to abort the transmission of a request.
 * In this case, the socket may need to be reset in order to avoid confusing
 * the server.
 */
static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req;

	if (task != xprt->snd_task)
		return;
	if (task == NULL)
		goto out_release;
	req = task->tk_rqstp;
	if (req->rq_bytes_sent == 0)
		goto out_release;
	if (req->rq_bytes_sent == req->rq_snd_buf.len)
		goto out_release;
	set_bit(XPRT_CLOSE_WAIT, &task->tk_xprt->state);
out_release:
	xprt_release_xprt(xprt, task);
}

static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
{
	transport->old_data_ready = sk->sk_data_ready;
	transport->old_state_change = sk->sk_state_change;
	transport->old_write_space = sk->sk_write_space;
	transport->old_error_report = sk->sk_error_report;
}

static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk)
{
	sk->sk_data_ready = transport->old_data_ready;
	sk->sk_state_change = transport->old_state_change;
	sk->sk_write_space = transport->old_write_space;
	sk->sk_error_report = transport->old_error_report;
}

static void xs_reset_transport(struct sock_xprt *transport)
{
	struct socket *sock = transport->sock;
	struct sock *sk = transport->inet;

	if (sk == NULL)
		return;

	transport->srcport = 0;

	write_lock_bh(&sk->sk_callback_lock);
	transport->inet = NULL;
	transport->sock = NULL;

	sk->sk_user_data = NULL;

	xs_restore_old_callbacks(transport, sk);
	write_unlock_bh(&sk->sk_callback_lock);

	sk->sk_no_check = 0;

	sock_release(sock);
}

/**
 * xs_close - close a socket
 * @xprt: transport
 *
 * This is used when all requests are complete; ie, no DRC state remains
 * on the server we want to save.
 *
 * The caller _must_ be holding XPRT_LOCKED in order to avoid issues with
 * xs_reset_transport() zeroing the socket from underneath a writer.
 */
static void xs_close(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	dprintk("RPC: xs_close xprt %p\n", xprt);

	xs_reset_transport(transport);
	xprt->reestablish_timeout = 0;

	smp_mb__before_clear_bit();
	clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
	clear_bit(XPRT_CLOSING, &xprt->state);
	smp_mb__after_clear_bit();
	xprt_disconnect_done(xprt);
}

static void xs_tcp_close(struct rpc_xprt *xprt)
{
	if (test_and_clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state))
		xs_close(xprt);
	else
		xs_tcp_shutdown(xprt);
}

/**
 * xs_destroy - prepare to shutdown a transport
 * @xprt: doomed transport
 *
 */
static void xs_destroy(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	dprintk("RPC: xs_destroy xprt %p\n", xprt);

	cancel_delayed_work_sync(&transport->connect_worker);

	xs_close(xprt);
	xs_free_peer_addresses(xprt);
	xprt_free(xprt);
	module_put(THIS_MODULE);
}

static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
{
	return (struct rpc_xprt *) sk->sk_user_data;
}

static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
{
	struct xdr_skb_reader desc = {
		.skb		= skb,
		.offset		= sizeof(rpc_fraghdr),
		.count		= skb->len - sizeof(rpc_fraghdr),
	};

	if (xdr_partial_copy_from_skb(xdr, 0, &desc, xdr_skb_read_bits) < 0)
		return -1;
	if (desc.count)
		return -1;
	return 0;
}

/**
 * xs_local_data_ready - "data ready" callback for AF_LOCAL sockets
 * @sk: socket with data to read
 * @len: how much data to read
 *
 * Currently this assumes we can read the whole reply in a single gulp.
 */
static void xs_local_data_ready(struct sock *sk, int len)
{
	struct rpc_task *task;
	struct rpc_xprt *xprt;
	struct rpc_rqst *rovr;
	struct sk_buff *skb;
	int err, repsize, copied;
	u32 _xid;
	__be32 *xp;

	read_lock_bh(&sk->sk_callback_lock);
	dprintk("RPC: %s...\n", __func__);
	xprt = xprt_from_sock(sk);
	if (xprt == NULL)
		goto out;

	skb = skb_recv_datagram(sk, 0, 1, &err);
	if (skb == NULL)
		goto out;

	if (xprt->shutdown)
		goto dropit;

	repsize = skb->len - sizeof(rpc_fraghdr);
	if (repsize < 4) {
		dprintk("RPC: impossible RPC reply size %d\n", repsize);
		goto dropit;
	}

	/* Copy the XID from the skb... */
	xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid);
	if (xp == NULL)
		goto dropit;

	/* Look up and lock the request corresponding to the given XID */
	spin_lock(&xprt->transport_lock);
	rovr = xprt_lookup_rqst(xprt, *xp);
	if (!rovr)
		goto out_unlock;
	task = rovr->rq_task;

	copied = rovr->rq_private_buf.buflen;
	if (copied > repsize)
		copied = repsize;

	if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) {
		dprintk("RPC: sk_buff copy failed\n");
		goto out_unlock;
	}

	xprt_complete_rqst(task, copied);

out_unlock:
	spin_unlock(&xprt->transport_lock);
dropit:
	skb_free_datagram(sk, skb);
out:
	read_unlock_bh(&sk->sk_callback_lock);
}

/**
 * xs_udp_data_ready - "data ready" callback for UDP sockets
 * @sk: socket with data to read
 * @len: how much data to read
 *
 */
static void xs_udp_data_ready(struct sock *sk, int len)
{
	struct rpc_task *task;
	struct rpc_xprt *xprt;
	struct rpc_rqst *rovr;
	struct sk_buff *skb;
	int err, repsize, copied;
	u32 _xid;
	__be32 *xp;

	read_lock_bh(&sk->sk_callback_lock);
	dprintk("RPC: xs_udp_data_ready...\n");
	if (!(xprt = xprt_from_sock(sk)))
		goto out;

	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
		goto out;

	if (xprt->shutdown)
		goto dropit;

	repsize = skb->len - sizeof(struct udphdr);
	if (repsize < 4) {
		dprintk("RPC: impossible RPC reply size %d!\n", repsize);
		goto dropit;
	}

	/* Copy the XID from the skb... */
	xp = skb_header_pointer(skb, sizeof(struct udphdr),
				sizeof(_xid), &_xid);
	if (xp == NULL)
		goto dropit;

	/* Look up and lock the request corresponding to the given XID */
	spin_lock(&xprt->transport_lock);
	rovr = xprt_lookup_rqst(xprt, *xp);
	if (!rovr)
		goto out_unlock;
	task = rovr->rq_task;

	if ((copied = rovr->rq_private_buf.buflen) > repsize)
		copied = repsize;

	/* Suck it into the iovec, verify checksum if not done by hw. */
	if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
		UDPX_INC_STATS_BH(sk, UDP_MIB_INERRORS);
		goto out_unlock;
	}

	UDPX_INC_STATS_BH(sk, UDP_MIB_INDATAGRAMS);

	/* Something worked... */
	dst_confirm(skb_dst(skb));

	xprt_adjust_cwnd(task, copied);
	xprt_complete_rqst(task, copied);

out_unlock:
	spin_unlock(&xprt->transport_lock);
dropit:
	skb_free_datagram(sk, skb);
out:
	read_unlock_bh(&sk->sk_callback_lock);
}
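/*
 * Rough sketch of the TCP receive state machine driven by the TCP_RCV_*
 * flags (see xs_tcp_data_recv() further down). Each record is consumed
 * in order:
 *
 *	fragment header -> XID -> [call direction] -> data -> discard
 *
 * tcp_offset tracks progress within the current fragment, while
 * tcp_copied counts how much of the RPC message has been copied into
 * the receive buffer.
 */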
static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	size_t len, used;
	char *p;

	p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset;
	len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset;
	used = xdr_skb_read_bits(desc, p, len);
	transport->tcp_offset += used;
	if (used != len)
		return;

	transport->tcp_reclen = ntohl(transport->tcp_fraghdr);
	if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
		transport->tcp_flags |= TCP_RCV_LAST_FRAG;
	else
		transport->tcp_flags &= ~TCP_RCV_LAST_FRAG;
	transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;

	transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR;
	transport->tcp_offset = 0;

	/* Sanity check of the record length */
	if (unlikely(transport->tcp_reclen < 8)) {
		dprintk("RPC: invalid TCP record fragment length\n");
		xprt_force_disconnect(xprt);
		return;
	}
	dprintk("RPC: reading TCP record fragment of length %d\n",
			transport->tcp_reclen);
}

static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
{
	if (transport->tcp_offset == transport->tcp_reclen) {
		transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR;
		transport->tcp_offset = 0;
		if (transport->tcp_flags & TCP_RCV_LAST_FRAG) {
			transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
			transport->tcp_flags |= TCP_RCV_COPY_XID;
			transport->tcp_copied = 0;
		}
	}
}

static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc)
{
	size_t len, used;
	char *p;

	len = sizeof(transport->tcp_xid) - transport->tcp_offset;
	dprintk("RPC: reading XID (%Zu bytes)\n", len);
	p = ((char *) &transport->tcp_xid) + transport->tcp_offset;
	used = xdr_skb_read_bits(desc, p, len);
	transport->tcp_offset += used;
	if (used != len)
		return;
	transport->tcp_flags &= ~TCP_RCV_COPY_XID;
	transport->tcp_flags |= TCP_RCV_READ_CALLDIR;
	transport->tcp_copied = 4;
	dprintk("RPC: reading %s XID %08x\n",
			(transport->tcp_flags & TCP_RPC_REPLY) ? "reply for"
							      : "request with",
			ntohl(transport->tcp_xid));
	xs_tcp_check_fraghdr(transport);
}

static inline void xs_tcp_read_calldir(struct sock_xprt *transport,
				       struct xdr_skb_reader *desc)
{
	size_t len, used;
	u32 offset;
	char *p;

	/*
	 * We want transport->tcp_offset to be 8 at the end of this routine
	 * (4 bytes for the xid and 4 bytes for the call/reply flag).
	 * When this function is called for the first time,
	 * transport->tcp_offset is 4 (after having already read the xid).
	 */
	offset = transport->tcp_offset - sizeof(transport->tcp_xid);
	len = sizeof(transport->tcp_calldir) - offset;
	dprintk("RPC: reading CALL/REPLY flag (%Zu bytes)\n", len);
	p = ((char *) &transport->tcp_calldir) + offset;
	used = xdr_skb_read_bits(desc, p, len);
	transport->tcp_offset += used;
	if (used != len)
		return;
	transport->tcp_flags &= ~TCP_RCV_READ_CALLDIR;
	/*
	 * We don't yet have the XDR buffer, so we will write the calldir
	 * out after we get the buffer from the 'struct rpc_rqst'
	 */
	switch (ntohl(transport->tcp_calldir)) {
	case RPC_REPLY:
		transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
		transport->tcp_flags |= TCP_RCV_COPY_DATA;
		transport->tcp_flags |= TCP_RPC_REPLY;
		break;
	case RPC_CALL:
		transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
		transport->tcp_flags |= TCP_RCV_COPY_DATA;
		transport->tcp_flags &= ~TCP_RPC_REPLY;
		break;
	default:
		dprintk("RPC: invalid request message type\n");
		xprt_force_disconnect(&transport->xprt);
	}
	xs_tcp_check_fraghdr(transport);
}
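/*
 * Example of the offsets above: a reply record arrives on the wire as
 *
 *	<4-byte marker> <4-byte XID> <4-byte direction == 1 (RPC_REPLY)>
 *
 * so once the direction word is read, tcp_offset is 8 (XID plus
 * direction, not counting the marker), and xs_tcp_read_common() below
 * bumps tcp_copied from 4 to 8 when it writes the saved calldir out.
 */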
static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
				      struct xdr_skb_reader *desc,
				      struct rpc_rqst *req)
{
	struct sock_xprt *transport =
				container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *rcvbuf;
	size_t len;
	ssize_t r;

	rcvbuf = &req->rq_private_buf;

	if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) {
		/*
		 * Save the RPC direction in the XDR buffer
		 */
		memcpy(rcvbuf->head[0].iov_base + transport->tcp_copied,
			&transport->tcp_calldir,
			sizeof(transport->tcp_calldir));
		transport->tcp_copied += sizeof(transport->tcp_calldir);
		transport->tcp_flags &= ~TCP_RCV_COPY_CALLDIR;
	}

	len = desc->count;
	if (len > transport->tcp_reclen - transport->tcp_offset) {
		struct xdr_skb_reader my_desc;

		len = transport->tcp_reclen - transport->tcp_offset;
		memcpy(&my_desc, desc, sizeof(my_desc));
		my_desc.count = len;
		r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
					  &my_desc, xdr_skb_read_bits);
		desc->count -= r;
		desc->offset += r;
	} else
		r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
					  desc, xdr_skb_read_bits);

	if (r > 0) {
		transport->tcp_copied += r;
		transport->tcp_offset += r;
	}
	if (r != len) {
		/* Error when copying to the receive buffer,
		 * usually because we weren't able to allocate
		 * additional buffer pages. All we can do now
		 * is turn off TCP_RCV_COPY_DATA, so the request
		 * will not receive any additional updates,
		 * and time out.
		 * Any remaining data from this record will
		 * be discarded.
		 */
		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
		dprintk("RPC: XID %08x truncated request\n",
				ntohl(transport->tcp_xid));
		dprintk("RPC: xprt = %p, tcp_copied = %lu, "
				"tcp_offset = %u, tcp_reclen = %u\n",
				xprt, transport->tcp_copied,
				transport->tcp_offset, transport->tcp_reclen);
		return;
	}

	dprintk("RPC: XID %08x read %Zd bytes\n",
			ntohl(transport->tcp_xid), r);
	dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, "
			"tcp_reclen = %u\n", xprt, transport->tcp_copied,
			transport->tcp_offset, transport->tcp_reclen);

	if (transport->tcp_copied == req->rq_private_buf.buflen)
		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
	else if (transport->tcp_offset == transport->tcp_reclen) {
		if (transport->tcp_flags & TCP_RCV_LAST_FRAG)
			transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
	}
}

/*
 * Finds the request corresponding to the RPC xid and invokes the common
 * tcp read code to read the data.
 */
static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
				    struct xdr_skb_reader *desc)
{
	struct sock_xprt *transport =
				container_of(xprt, struct sock_xprt, xprt);
	struct rpc_rqst *req;

	dprintk("RPC: read reply XID %08x\n", ntohl(transport->tcp_xid));

	/* Find and lock the request corresponding to this xid */
	spin_lock(&xprt->transport_lock);
	req = xprt_lookup_rqst(xprt, transport->tcp_xid);
	if (!req) {
		dprintk("RPC: XID %08x request not found!\n",
				ntohl(transport->tcp_xid));
		spin_unlock(&xprt->transport_lock);
		return -1;
	}

	xs_tcp_read_common(xprt, desc, req);

	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
		xprt_complete_rqst(req->rq_task, transport->tcp_copied);

	spin_unlock(&xprt->transport_lock);
	return 0;
}

#if defined(CONFIG_NFS_V4_1)
/*
 * Obtains an rpc_rqst previously allocated and invokes the common
 * tcp read code to read the data.  The result is placed in the callback
 * queue.
 * If we're unable to obtain the rpc_rqst we schedule the closing of the
 * connection and return -1.
 */
static inline int xs_tcp_read_callback(struct rpc_xprt *xprt,
				       struct xdr_skb_reader *desc)
{
	struct sock_xprt *transport =
				container_of(xprt, struct sock_xprt, xprt);
	struct rpc_rqst *req;

	req = xprt_alloc_bc_request(xprt);
	if (req == NULL) {
		printk(KERN_WARNING "Callback slot table overflowed\n");
		xprt_force_disconnect(xprt);
		return -1;
	}

	req->rq_xid = transport->tcp_xid;
	dprintk("RPC: read callback XID %08x\n", ntohl(req->rq_xid));
	xs_tcp_read_common(xprt, desc, req);

	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) {
		struct svc_serv *bc_serv = xprt->bc_serv;

		/*
		 * Add callback request to callback list. The callback
		 * service sleeps on the sv_cb_waitq waiting for new
		 * requests. Wake it up after enqueuing the request.
		 */
1273 */ 1274 dprintk("RPC: add callback request to list\n"); 1275 spin_lock(&bc_serv->sv_cb_lock); 1276 list_add(&req->rq_bc_list, &bc_serv->sv_cb_list); 1277 spin_unlock(&bc_serv->sv_cb_lock); 1278 wake_up(&bc_serv->sv_cb_waitq); 1279 } 1280 1281 req->rq_private_buf.len = transport->tcp_copied; 1282 1283 return 0; 1284 } 1285 1286 static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, 1287 struct xdr_skb_reader *desc) 1288 { 1289 struct sock_xprt *transport = 1290 container_of(xprt, struct sock_xprt, xprt); 1291 1292 return (transport->tcp_flags & TCP_RPC_REPLY) ? 1293 xs_tcp_read_reply(xprt, desc) : 1294 xs_tcp_read_callback(xprt, desc); 1295 } 1296 #else 1297 static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, 1298 struct xdr_skb_reader *desc) 1299 { 1300 return xs_tcp_read_reply(xprt, desc); 1301 } 1302 #endif /* CONFIG_NFS_V4_1 */ 1303 1304 /* 1305 * Read data off the transport. This can be either an RPC_CALL or an 1306 * RPC_REPLY. Relay the processing to helper functions. 1307 */ 1308 static void xs_tcp_read_data(struct rpc_xprt *xprt, 1309 struct xdr_skb_reader *desc) 1310 { 1311 struct sock_xprt *transport = 1312 container_of(xprt, struct sock_xprt, xprt); 1313 1314 if (_xs_tcp_read_data(xprt, desc) == 0) 1315 xs_tcp_check_fraghdr(transport); 1316 else { 1317 /* 1318 * The transport_lock protects the request handling. 1319 * There's no need to hold it to update the tcp_flags. 1320 */ 1321 transport->tcp_flags &= ~TCP_RCV_COPY_DATA; 1322 } 1323 } 1324 1325 static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc) 1326 { 1327 size_t len; 1328 1329 len = transport->tcp_reclen - transport->tcp_offset; 1330 if (len > desc->count) 1331 len = desc->count; 1332 desc->count -= len; 1333 desc->offset += len; 1334 transport->tcp_offset += len; 1335 dprintk("RPC: discarded %Zu bytes\n", len); 1336 xs_tcp_check_fraghdr(transport); 1337 } 1338 1339 static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len) 1340 { 1341 struct rpc_xprt *xprt = rd_desc->arg.data; 1342 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 1343 struct xdr_skb_reader desc = { 1344 .skb = skb, 1345 .offset = offset, 1346 .count = len, 1347 }; 1348 1349 dprintk("RPC: xs_tcp_data_recv started\n"); 1350 do { 1351 /* Read in a new fragment marker if necessary */ 1352 /* Can we ever really expect to get completely empty fragments? 
		if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) {
			xs_tcp_read_fraghdr(xprt, &desc);
			continue;
		}
		/* Read in the xid if necessary */
		if (transport->tcp_flags & TCP_RCV_COPY_XID) {
			xs_tcp_read_xid(transport, &desc);
			continue;
		}
		/* Read in the call/reply flag */
		if (transport->tcp_flags & TCP_RCV_READ_CALLDIR) {
			xs_tcp_read_calldir(transport, &desc);
			continue;
		}
		/* Read in the request data */
		if (transport->tcp_flags & TCP_RCV_COPY_DATA) {
			xs_tcp_read_data(xprt, &desc);
			continue;
		}
		/* Skip over any trailing bytes on short reads */
		xs_tcp_read_discard(transport, &desc);
	} while (desc.count);
	dprintk("RPC: xs_tcp_data_recv done\n");
	return len - desc.count;
}

/**
 * xs_tcp_data_ready - "data ready" callback for TCP sockets
 * @sk: socket with data to read
 * @bytes: how much data to read
 *
 */
static void xs_tcp_data_ready(struct sock *sk, int bytes)
{
	struct rpc_xprt *xprt;
	read_descriptor_t rd_desc;
	int read;

	dprintk("RPC: xs_tcp_data_ready...\n");

	read_lock_bh(&sk->sk_callback_lock);
	if (!(xprt = xprt_from_sock(sk)))
		goto out;
	if (xprt->shutdown)
		goto out;

	/* Any data means we had a useful conversation, so
	 * we don't need to delay the next reconnect
	 */
	if (xprt->reestablish_timeout)
		xprt->reestablish_timeout = 0;

	/* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
	rd_desc.arg.data = xprt;
	do {
		rd_desc.count = 65536;
		read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
	} while (read > 0);
out:
	read_unlock_bh(&sk->sk_callback_lock);
}

/*
 * Do the equivalent of linger/linger2 handling for dealing with
 * broken servers that don't close the socket in a timely
 * fashion
 */
static void xs_tcp_schedule_linger_timeout(struct rpc_xprt *xprt,
		unsigned long timeout)
{
	struct sock_xprt *transport;

	if (xprt_test_and_set_connecting(xprt))
		return;
	set_bit(XPRT_CONNECTION_ABORT, &xprt->state);
	transport = container_of(xprt, struct sock_xprt, xprt);
	queue_delayed_work(rpciod_workqueue, &transport->connect_worker,
			   timeout);
}

static void xs_tcp_cancel_linger_timeout(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport;

	transport = container_of(xprt, struct sock_xprt, xprt);

	if (!test_bit(XPRT_CONNECTION_ABORT, &xprt->state) ||
	    !cancel_delayed_work(&transport->connect_worker))
		return;
	clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
	xprt_clear_connecting(xprt);
}

static void xs_sock_mark_closed(struct rpc_xprt *xprt)
{
	smp_mb__before_clear_bit();
	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
	clear_bit(XPRT_CLOSING, &xprt->state);
	smp_mb__after_clear_bit();
	/* Mark transport as closed and wake up all pending tasks */
	xprt_disconnect_done(xprt);
}
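/*
 * Sketch of how xs_tcp_state_change() below maps socket states onto
 * transport state bits:
 *
 *	TCP_ESTABLISHED	-> mark xprt connected, wake pending tasks
 *	TCP_FIN_WAIT1	-> client-initiated close: set XPRT_CLOSING and
 *			   arm the linger timeout
 *	TCP_CLOSE_WAIT	-> server-initiated close: force a disconnect
 *	TCP_LAST_ACK	-> set XPRT_CLOSING, clear XPRT_CONNECTED
 *	TCP_CLOSE	-> cancel the linger timeout, mark socket closed
 */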
%d\n", 1470 sk->sk_state, xprt_connected(xprt), 1471 sock_flag(sk, SOCK_DEAD), 1472 sock_flag(sk, SOCK_ZAPPED), 1473 sk->sk_shutdown); 1474 1475 switch (sk->sk_state) { 1476 case TCP_ESTABLISHED: 1477 spin_lock(&xprt->transport_lock); 1478 if (!xprt_test_and_set_connected(xprt)) { 1479 struct sock_xprt *transport = container_of(xprt, 1480 struct sock_xprt, xprt); 1481 1482 /* Reset TCP record info */ 1483 transport->tcp_offset = 0; 1484 transport->tcp_reclen = 0; 1485 transport->tcp_copied = 0; 1486 transport->tcp_flags = 1487 TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID; 1488 1489 xprt_wake_pending_tasks(xprt, -EAGAIN); 1490 } 1491 spin_unlock(&xprt->transport_lock); 1492 break; 1493 case TCP_FIN_WAIT1: 1494 /* The client initiated a shutdown of the socket */ 1495 xprt->connect_cookie++; 1496 xprt->reestablish_timeout = 0; 1497 set_bit(XPRT_CLOSING, &xprt->state); 1498 smp_mb__before_clear_bit(); 1499 clear_bit(XPRT_CONNECTED, &xprt->state); 1500 clear_bit(XPRT_CLOSE_WAIT, &xprt->state); 1501 smp_mb__after_clear_bit(); 1502 xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout); 1503 break; 1504 case TCP_CLOSE_WAIT: 1505 /* The server initiated a shutdown of the socket */ 1506 xprt_force_disconnect(xprt); 1507 xprt->connect_cookie++; 1508 case TCP_CLOSING: 1509 /* 1510 * If the server closed down the connection, make sure that 1511 * we back off before reconnecting 1512 */ 1513 if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO) 1514 xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO; 1515 break; 1516 case TCP_LAST_ACK: 1517 set_bit(XPRT_CLOSING, &xprt->state); 1518 xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout); 1519 smp_mb__before_clear_bit(); 1520 clear_bit(XPRT_CONNECTED, &xprt->state); 1521 smp_mb__after_clear_bit(); 1522 break; 1523 case TCP_CLOSE: 1524 xs_tcp_cancel_linger_timeout(xprt); 1525 xs_sock_mark_closed(xprt); 1526 } 1527 out: 1528 read_unlock_bh(&sk->sk_callback_lock); 1529 } 1530 1531 /** 1532 * xs_error_report - callback mainly for catching socket errors 1533 * @sk: socket 1534 */ 1535 static void xs_error_report(struct sock *sk) 1536 { 1537 struct rpc_xprt *xprt; 1538 1539 read_lock_bh(&sk->sk_callback_lock); 1540 if (!(xprt = xprt_from_sock(sk))) 1541 goto out; 1542 dprintk("RPC: %s client %p...\n" 1543 "RPC: error %d\n", 1544 __func__, xprt, sk->sk_err); 1545 xprt_wake_pending_tasks(xprt, -EAGAIN); 1546 out: 1547 read_unlock_bh(&sk->sk_callback_lock); 1548 } 1549 1550 static void xs_write_space(struct sock *sk) 1551 { 1552 struct socket *sock; 1553 struct rpc_xprt *xprt; 1554 1555 if (unlikely(!(sock = sk->sk_socket))) 1556 return; 1557 clear_bit(SOCK_NOSPACE, &sock->flags); 1558 1559 if (unlikely(!(xprt = xprt_from_sock(sk)))) 1560 return; 1561 if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0) 1562 return; 1563 1564 xprt_write_space(xprt); 1565 } 1566 1567 /** 1568 * xs_udp_write_space - callback invoked when socket buffer space 1569 * becomes available 1570 * @sk: socket whose state has changed 1571 * 1572 * Called when more output buffer space is available for this socket. 1573 * We try not to wake our writers until they can make "significant" 1574 * progress, otherwise we'll waste resources thrashing kernel_sendmsg 1575 * with a bunch of small requests. 
/**
 * xs_udp_write_space - callback invoked when socket buffer space
 *                      becomes available
 * @sk: socket whose state has changed
 *
 * Called when more output buffer space is available for this socket.
 * We try not to wake our writers until they can make "significant"
 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
 * with a bunch of small requests.
 */
static void xs_udp_write_space(struct sock *sk)
{
	read_lock_bh(&sk->sk_callback_lock);

	/* from net/core/sock.c:sock_def_write_space */
	if (sock_writeable(sk))
		xs_write_space(sk);

	read_unlock_bh(&sk->sk_callback_lock);
}

/**
 * xs_tcp_write_space - callback invoked when socket buffer space
 *                      becomes available
 * @sk: socket whose state has changed
 *
 * Called when more output buffer space is available for this socket.
 * We try not to wake our writers until they can make "significant"
 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
 * with a bunch of small requests.
 */
static void xs_tcp_write_space(struct sock *sk)
{
	read_lock_bh(&sk->sk_callback_lock);

	/* from net/core/stream.c:sk_stream_write_space */
	if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk))
		xs_write_space(sk);

	read_unlock_bh(&sk->sk_callback_lock);
}

static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct sock *sk = transport->inet;

	if (transport->rcvsize) {
		sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
		sk->sk_rcvbuf = transport->rcvsize * xprt->max_reqs * 2;
	}
	if (transport->sndsize) {
		sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
		sk->sk_sndbuf = transport->sndsize * xprt->max_reqs * 2;
		sk->sk_write_space(sk);
	}
}

/**
 * xs_udp_set_buffer_size - set send and receive limits
 * @xprt: generic transport
 * @sndsize: requested size of send buffer, in bytes
 * @rcvsize: requested size of receive buffer, in bytes
 *
 * Set socket send and receive buffer size limits.
 */
static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	transport->sndsize = 0;
	if (sndsize)
		transport->sndsize = sndsize + 1024;
	transport->rcvsize = 0;
	if (rcvsize)
		transport->rcvsize = rcvsize + 1024;

	xs_udp_do_set_buffer_size(xprt);
}
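/*
 * Worked example (assuming the default 16-entry slot table): a
 * requested rcvsize of 4096 sizes the socket receive buffer above to
 * (4096 + 1024) * 16 * 2 = 160 KiB -- room for two full rounds of
 * concurrent replies.
 */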
/**
 * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
 * @task: task that timed out
 *
 * Adjust the congestion window after a retransmit timeout has occurred.
 */
static void xs_udp_timer(struct rpc_task *task)
{
	xprt_adjust_cwnd(task, -ETIMEDOUT);
}

static unsigned short xs_get_random_port(void)
{
	unsigned short range = xprt_max_resvport - xprt_min_resvport;
	unsigned short rand = (unsigned short) net_random() % range;
	return rand + xprt_min_resvport;
}

/**
 * xs_set_port - reset the port number in the remote endpoint address
 * @xprt: generic transport
 * @port: new port number
 *
 */
static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
{
	dprintk("RPC: setting port for xprt %p to %u\n", xprt, port);

	rpc_set_port(xs_addr(xprt), port);
	xs_update_peer_port(xprt);
}

static unsigned short xs_get_srcport(struct sock_xprt *transport)
{
	unsigned short port = transport->srcport;

	if (port == 0 && transport->xprt.resvport)
		port = xs_get_random_port();
	return port;
}

static unsigned short xs_next_srcport(struct sock_xprt *transport, unsigned short port)
{
	if (transport->srcport != 0)
		transport->srcport = 0;
	if (!transport->xprt.resvport)
		return 0;
	if (port <= xprt_min_resvport || port > xprt_max_resvport)
		return xprt_max_resvport;
	return --port;
}
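/*
 * Example of the source-port search driven by the helpers above: start
 * from a random port in [xprt_min_resvport, xprt_max_resvport], then
 * walk downwards on -EADDRINUSE (e.g. 665 -> 664 -> ...), wrapping to
 * xprt_max_resvport once the minimum is passed; xs_bind() below gives
 * up after the second wrap (nloop == 2).
 */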
"failed" : "ok", err); 1730 return err; 1731 } 1732 1733 /* 1734 * We don't support autobind on AF_LOCAL sockets 1735 */ 1736 static void xs_local_rpcbind(struct rpc_task *task) 1737 { 1738 xprt_set_bound(task->tk_xprt); 1739 } 1740 1741 static void xs_local_set_port(struct rpc_xprt *xprt, unsigned short port) 1742 { 1743 } 1744 1745 #ifdef CONFIG_DEBUG_LOCK_ALLOC 1746 static struct lock_class_key xs_key[2]; 1747 static struct lock_class_key xs_slock_key[2]; 1748 1749 static inline void xs_reclassify_socketu(struct socket *sock) 1750 { 1751 struct sock *sk = sock->sk; 1752 1753 BUG_ON(sock_owned_by_user(sk)); 1754 sock_lock_init_class_and_name(sk, "slock-AF_LOCAL-RPC", 1755 &xs_slock_key[1], "sk_lock-AF_LOCAL-RPC", &xs_key[1]); 1756 } 1757 1758 static inline void xs_reclassify_socket4(struct socket *sock) 1759 { 1760 struct sock *sk = sock->sk; 1761 1762 BUG_ON(sock_owned_by_user(sk)); 1763 sock_lock_init_class_and_name(sk, "slock-AF_INET-RPC", 1764 &xs_slock_key[0], "sk_lock-AF_INET-RPC", &xs_key[0]); 1765 } 1766 1767 static inline void xs_reclassify_socket6(struct socket *sock) 1768 { 1769 struct sock *sk = sock->sk; 1770 1771 BUG_ON(sock_owned_by_user(sk)); 1772 sock_lock_init_class_and_name(sk, "slock-AF_INET6-RPC", 1773 &xs_slock_key[1], "sk_lock-AF_INET6-RPC", &xs_key[1]); 1774 } 1775 1776 static inline void xs_reclassify_socket(int family, struct socket *sock) 1777 { 1778 switch (family) { 1779 case AF_LOCAL: 1780 xs_reclassify_socketu(sock); 1781 break; 1782 case AF_INET: 1783 xs_reclassify_socket4(sock); 1784 break; 1785 case AF_INET6: 1786 xs_reclassify_socket6(sock); 1787 break; 1788 } 1789 } 1790 #else 1791 static inline void xs_reclassify_socketu(struct socket *sock) 1792 { 1793 } 1794 1795 static inline void xs_reclassify_socket4(struct socket *sock) 1796 { 1797 } 1798 1799 static inline void xs_reclassify_socket6(struct socket *sock) 1800 { 1801 } 1802 1803 static inline void xs_reclassify_socket(int family, struct socket *sock) 1804 { 1805 } 1806 #endif 1807 1808 static struct socket *xs_create_sock(struct rpc_xprt *xprt, 1809 struct sock_xprt *transport, int family, int type, int protocol) 1810 { 1811 struct socket *sock; 1812 int err; 1813 1814 err = __sock_create(xprt->xprt_net, family, type, protocol, &sock, 1); 1815 if (err < 0) { 1816 dprintk("RPC: can't create %d transport socket (%d).\n", 1817 protocol, -err); 1818 goto out; 1819 } 1820 xs_reclassify_socket(family, sock); 1821 1822 err = xs_bind(transport, sock); 1823 if (err) { 1824 sock_release(sock); 1825 goto out; 1826 } 1827 1828 return sock; 1829 out: 1830 return ERR_PTR(err); 1831 } 1832 1833 static int xs_local_finish_connecting(struct rpc_xprt *xprt, 1834 struct socket *sock) 1835 { 1836 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, 1837 xprt); 1838 1839 if (!transport->inet) { 1840 struct sock *sk = sock->sk; 1841 1842 write_lock_bh(&sk->sk_callback_lock); 1843 1844 xs_save_old_callbacks(transport, sk); 1845 1846 sk->sk_user_data = xprt; 1847 sk->sk_data_ready = xs_local_data_ready; 1848 sk->sk_write_space = xs_udp_write_space; 1849 sk->sk_error_report = xs_error_report; 1850 sk->sk_allocation = GFP_ATOMIC; 1851 1852 xprt_clear_connected(xprt); 1853 1854 /* Reset to new socket */ 1855 transport->sock = sock; 1856 transport->inet = sk; 1857 1858 write_unlock_bh(&sk->sk_callback_lock); 1859 } 1860 1861 /* Tell the socket layer to start connecting... 
	xprt->stat.connect_count++;
	xprt->stat.connect_start = jiffies;
	return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, 0);
}

/**
 * xs_local_setup_socket - create AF_LOCAL socket, connect to a local endpoint
 * @work: queued work item embedded in the sock_xprt to connect
 *
 * Invoked by a work queue tasklet.
 */
static void xs_local_setup_socket(struct work_struct *work)
{
	struct sock_xprt *transport =
		container_of(work, struct sock_xprt, connect_worker.work);
	struct rpc_xprt *xprt = &transport->xprt;
	struct socket *sock;
	int status = -EIO;

	if (xprt->shutdown)
		goto out;

	clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
	status = __sock_create(xprt->xprt_net, AF_LOCAL,
					SOCK_STREAM, 0, &sock, 1);
	if (status < 0) {
		dprintk("RPC: can't create AF_LOCAL "
			"transport socket (%d).\n", -status);
		goto out;
	}
	xs_reclassify_socketu(sock);

	dprintk("RPC: worker connecting xprt %p via AF_LOCAL to %s\n",
			xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);

	status = xs_local_finish_connecting(xprt, sock);
	switch (status) {
	case 0:
		dprintk("RPC: xprt %p connected to %s\n",
				xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
		xprt_set_connected(xprt);
		break;
	case -ENOENT:
		dprintk("RPC: xprt %p: socket %s does not exist\n",
				xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
		break;
	default:
		printk(KERN_ERR "%s: unhandled error (%d) connecting to %s\n",
				__func__, -status,
				xprt->address_strings[RPC_DISPLAY_ADDR]);
	}

out:
	xprt_clear_connecting(xprt);
	xprt_wake_pending_tasks(xprt, status);
}

static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	if (!transport->inet) {
		struct sock *sk = sock->sk;

		write_lock_bh(&sk->sk_callback_lock);

		xs_save_old_callbacks(transport, sk);

		sk->sk_user_data = xprt;
		sk->sk_data_ready = xs_udp_data_ready;
		sk->sk_write_space = xs_udp_write_space;
		sk->sk_error_report = xs_error_report;
		sk->sk_no_check = UDP_CSUM_NORCV;
		sk->sk_allocation = GFP_ATOMIC;

		xprt_set_connected(xprt);

		/* Reset to new socket */
		transport->sock = sock;
		transport->inet = sk;

		write_unlock_bh(&sk->sk_callback_lock);
	}
	xs_udp_do_set_buffer_size(xprt);
}

static void xs_udp_setup_socket(struct work_struct *work)
{
	struct sock_xprt *transport =
		container_of(work, struct sock_xprt, connect_worker.work);
	struct rpc_xprt *xprt = &transport->xprt;
	struct socket *sock = transport->sock;
	int status = -EIO;

	if (xprt->shutdown)
		goto out;

	/* Start by resetting any existing state */
	xs_reset_transport(transport);
	sock = xs_create_sock(xprt, transport,
			xs_addr(xprt)->sa_family, SOCK_DGRAM, IPPROTO_UDP);
	if (IS_ERR(sock))
		goto out;

	dprintk("RPC: worker connecting xprt %p via %s to "
				"%s (port %s)\n", xprt,
			xprt->address_strings[RPC_DISPLAY_PROTO],
			xprt->address_strings[RPC_DISPLAY_ADDR],
			xprt->address_strings[RPC_DISPLAY_PORT]);

	xs_udp_finish_connecting(xprt, sock);
	status = 0;
out:
static void xs_udp_setup_socket(struct work_struct *work)
{
	struct sock_xprt *transport =
		container_of(work, struct sock_xprt, connect_worker.work);
	struct rpc_xprt *xprt = &transport->xprt;
	struct socket *sock = transport->sock;
	int status = -EIO;

	if (xprt->shutdown)
		goto out;

	/* Start by resetting any existing state */
	xs_reset_transport(transport);
	sock = xs_create_sock(xprt, transport,
			xs_addr(xprt)->sa_family, SOCK_DGRAM, IPPROTO_UDP);
	if (IS_ERR(sock))
		goto out;

	dprintk("RPC: worker connecting xprt %p via %s to "
			"%s (port %s)\n", xprt,
			xprt->address_strings[RPC_DISPLAY_PROTO],
			xprt->address_strings[RPC_DISPLAY_ADDR],
			xprt->address_strings[RPC_DISPLAY_PORT]);

	xs_udp_finish_connecting(xprt, sock);
	status = 0;
out:
	xprt_clear_connecting(xprt);
	xprt_wake_pending_tasks(xprt, status);
}

/*
 * We need to preserve the port number so the reply cache on the server can
 * find our cached RPC replies when we get around to reconnecting.
 */
static void xs_abort_connection(struct sock_xprt *transport)
{
	int result;
	struct sockaddr any;

	dprintk("RPC: disconnecting xprt %p to reuse port\n", transport);

	/*
	 * Disconnect the transport socket by doing a connect operation
	 * with AF_UNSPEC. This should return immediately...
	 */
	memset(&any, 0, sizeof(any));
	any.sa_family = AF_UNSPEC;
	result = kernel_connect(transport->sock, &any, sizeof(any), 0);
	if (!result)
		xs_sock_mark_closed(&transport->xprt);
	else
		dprintk("RPC: AF_UNSPEC connect return code %d\n",
				result);
}
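/*
 * The same trick is available from userspace; a minimal sketch, for
 * illustration only:
 *
 *	struct sockaddr any = { .sa_family = AF_UNSPEC };
 *	connect(fd, &any, sizeof(any));
 *
 * Only the address family matters; the rest of the sockaddr is
 * ignored. On success the socket is detached from its peer while the
 * bound local port is preserved, which is exactly what the reply
 * cache comment above relies on.
 */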
static void xs_tcp_reuse_connection(struct sock_xprt *transport)
{
	unsigned int state = transport->inet->sk_state;

	if (state == TCP_CLOSE && transport->sock->state == SS_UNCONNECTED) {
		/* we don't need to abort the connection if the socket
		 * hasn't undergone a shutdown
		 */
		if (transport->inet->sk_shutdown == 0)
			return;
		dprintk("RPC: %s: TCP_CLOSEd and sk_shutdown set to %d\n",
				__func__, transport->inet->sk_shutdown);
	}
	if ((1 << state) & (TCPF_ESTABLISHED|TCPF_SYN_SENT)) {
		/* we don't need to abort the connection if the socket
		 * hasn't undergone a shutdown
		 */
		if (transport->inet->sk_shutdown == 0)
			return;
		dprintk("RPC: %s: ESTABLISHED/SYN_SENT "
				"sk_shutdown set to %d\n",
				__func__, transport->inet->sk_shutdown);
	}
	xs_abort_connection(transport);
}

static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	int ret = -ENOTCONN;

	if (!transport->inet) {
		struct sock *sk = sock->sk;

		write_lock_bh(&sk->sk_callback_lock);

		xs_save_old_callbacks(transport, sk);

		sk->sk_user_data = xprt;
		sk->sk_data_ready = xs_tcp_data_ready;
		sk->sk_state_change = xs_tcp_state_change;
		sk->sk_write_space = xs_tcp_write_space;
		sk->sk_error_report = xs_error_report;
		sk->sk_allocation = GFP_ATOMIC;

		/* socket options */
		sk->sk_userlocks |= SOCK_BINDPORT_LOCK;
		sock_reset_flag(sk, SOCK_LINGER);
		tcp_sk(sk)->linger2 = 0;
		tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;

		xprt_clear_connected(xprt);

		/* Reset to new socket */
		transport->sock = sock;
		transport->inet = sk;

		write_unlock_bh(&sk->sk_callback_lock);
	}

	if (!xprt_bound(xprt))
		goto out;

	/* Tell the socket layer to start connecting... */
	xprt->stat.connect_count++;
	xprt->stat.connect_start = jiffies;
	ret = kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
	switch (ret) {
	case 0:
	case -EINPROGRESS:
		/* SYN_SENT! */
		xprt->connect_cookie++;
		if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
			xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
	}
out:
	return ret;
}

/**
 * xs_tcp_setup_socket - create a TCP socket and connect to a remote endpoint
 * @work: queued work item embedding the socket transport to connect
 *
 * Invoked by a work queue tasklet.
 */
static void xs_tcp_setup_socket(struct work_struct *work)
{
	struct sock_xprt *transport =
		container_of(work, struct sock_xprt, connect_worker.work);
	struct socket *sock = transport->sock;
	struct rpc_xprt *xprt = &transport->xprt;
	int status = -EIO;

	if (xprt->shutdown)
		goto out;

	if (!sock) {
		clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
		sock = xs_create_sock(xprt, transport,
				xs_addr(xprt)->sa_family, SOCK_STREAM,
				IPPROTO_TCP);
		if (IS_ERR(sock)) {
			status = PTR_ERR(sock);
			goto out;
		}
	} else {
		int abort_and_exit;

		abort_and_exit = test_and_clear_bit(XPRT_CONNECTION_ABORT,
				&xprt->state);
		/* "close" the socket, preserving the local port */
		xs_tcp_reuse_connection(transport);

		if (abort_and_exit)
			goto out_eagain;
	}

	dprintk("RPC: worker connecting xprt %p via %s to "
			"%s (port %s)\n", xprt,
			xprt->address_strings[RPC_DISPLAY_PROTO],
			xprt->address_strings[RPC_DISPLAY_ADDR],
			xprt->address_strings[RPC_DISPLAY_PORT]);

	status = xs_tcp_finish_connecting(xprt, sock);
	dprintk("RPC: %p connect status %d connected %d sock state %d\n",
			xprt, -status, xprt_connected(xprt),
			sock->sk->sk_state);
	switch (status) {
	default:
		printk("%s: connect returned unhandled error %d\n",
			__func__, status);
		/* fall through: treat like -EADDRNOTAVAIL */
	case -EADDRNOTAVAIL:
		/* We're probably in TIME_WAIT. Get rid of existing socket,
		 * and retry
		 */
		set_bit(XPRT_CONNECTION_CLOSE, &xprt->state);
		xprt_force_disconnect(xprt);
		break;
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ENETUNREACH:
		/* retry with existing socket, after a delay */
	case 0:
	case -EINPROGRESS:
	case -EALREADY:
		xprt_clear_connecting(xprt);
		return;
	case -EINVAL:
		/* Happens, for instance, if the user specified a link
		 * local IPv6 address without a scope-id.
		 */
		goto out;
	}
out_eagain:
	status = -EAGAIN;
out:
	xprt_clear_connecting(xprt);
	xprt_wake_pending_tasks(xprt, status);
}
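/*
 * Note the structure of the status switch above: unhandled errors and
 * -EADDRNOTAVAIL discard the socket and force a disconnect,
 * connection-level failures (-ECONNREFUSED, -ECONNRESET, -ENETUNREACH)
 * keep the socket so a later attempt can retry after a delay, and
 * -EINVAL is treated as a permanent error and reported straight back
 * to the waiting tasks.
 */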
/**
 * xs_connect - connect a socket to a remote endpoint
 * @task: address of RPC task that manages state of connect request
 *
 * TCP: If the remote end dropped the connection, delay reconnecting.
 *
 * UDP socket connects are synchronous, but we use a work queue anyway
 * to guarantee that even unprivileged user processes can set up a
 * socket on a privileged port.
 *
 * If a UDP socket connect fails, the delay behavior here prevents
 * retry floods (hard mounts).
 */
static void xs_connect(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	if (transport->sock != NULL && !RPC_IS_SOFTCONN(task)) {
		dprintk("RPC: xs_connect delayed xprt %p for %lu "
				"seconds\n",
				xprt, xprt->reestablish_timeout / HZ);
		queue_delayed_work(rpciod_workqueue,
				&transport->connect_worker,
				xprt->reestablish_timeout);
		xprt->reestablish_timeout <<= 1;
		if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
			xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
		if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO)
			xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
	} else {
		dprintk("RPC: xs_connect scheduled xprt %p\n", xprt);
		queue_delayed_work(rpciod_workqueue,
				&transport->connect_worker, 0);
	}
}

/**
 * xs_local_print_stats - display AF_LOCAL socket-specific stats
 * @xprt: rpc_xprt struct containing statistics
 * @seq: output file
 *
 */
static void xs_local_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
	long idle_time = 0;

	if (xprt_connected(xprt))
		idle_time = (long)(jiffies - xprt->last_used) / HZ;

	seq_printf(seq, "\txprt:\tlocal %lu %lu %lu %ld %lu %lu %lu "
			"%llu %llu\n",
			xprt->stat.bind_count,
			xprt->stat.connect_count,
			xprt->stat.connect_time,
			idle_time,
			xprt->stat.sends,
			xprt->stat.recvs,
			xprt->stat.bad_xids,
			xprt->stat.req_u,
			xprt->stat.bklog_u);
}

/**
 * xs_udp_print_stats - display UDP socket-specific stats
 * @xprt: rpc_xprt struct containing statistics
 * @seq: output file
 *
 */
static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	seq_printf(seq, "\txprt:\tudp %u %lu %lu %lu %lu %Lu %Lu\n",
			transport->srcport,
			xprt->stat.bind_count,
			xprt->stat.sends,
			xprt->stat.recvs,
			xprt->stat.bad_xids,
			xprt->stat.req_u,
			xprt->stat.bklog_u);
}

/**
 * xs_tcp_print_stats - display TCP socket-specific stats
 * @xprt: rpc_xprt struct containing statistics
 * @seq: output file
 *
 */
static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	long idle_time = 0;

	if (xprt_connected(xprt))
		idle_time = (long)(jiffies - xprt->last_used) / HZ;

	seq_printf(seq, "\txprt:\ttcp %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu\n",
			transport->srcport,
			xprt->stat.bind_count,
			xprt->stat.connect_count,
			xprt->stat.connect_time,
			idle_time,
			xprt->stat.sends,
			xprt->stat.recvs,
			xprt->stat.bad_xids,
			xprt->stat.req_u,
			xprt->stat.bklog_u);
}
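/*
 * These per-transport "xprt:" lines are emitted as part of the RPC
 * iostats and are parsed by userspace tools. For the tcp variant the
 * fields are, in order: source port, bind_count, connect_count,
 * connect_time, idle time (seconds), sends, recvs, bad_xids, req_u,
 * bklog_u.
 */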
2281 */ 2282 static void *bc_malloc(struct rpc_task *task, size_t size) 2283 { 2284 struct page *page; 2285 struct rpc_buffer *buf; 2286 2287 BUG_ON(size > PAGE_SIZE - sizeof(struct rpc_buffer)); 2288 page = alloc_page(GFP_KERNEL); 2289 2290 if (!page) 2291 return NULL; 2292 2293 buf = page_address(page); 2294 buf->len = PAGE_SIZE; 2295 2296 return buf->data; 2297 } 2298 2299 /* 2300 * Free the space allocated in the bc_alloc routine 2301 */ 2302 static void bc_free(void *buffer) 2303 { 2304 struct rpc_buffer *buf; 2305 2306 if (!buffer) 2307 return; 2308 2309 buf = container_of(buffer, struct rpc_buffer, data); 2310 free_page((unsigned long)buf); 2311 } 2312 2313 /* 2314 * Use the svc_sock to send the callback. Must be called with svsk->sk_mutex 2315 * held. Borrows heavily from svc_tcp_sendto and xs_tcp_send_request. 2316 */ 2317 static int bc_sendto(struct rpc_rqst *req) 2318 { 2319 int len; 2320 struct xdr_buf *xbufp = &req->rq_snd_buf; 2321 struct rpc_xprt *xprt = req->rq_xprt; 2322 struct sock_xprt *transport = 2323 container_of(xprt, struct sock_xprt, xprt); 2324 struct socket *sock = transport->sock; 2325 unsigned long headoff; 2326 unsigned long tailoff; 2327 2328 xs_encode_stream_record_marker(xbufp); 2329 2330 tailoff = (unsigned long)xbufp->tail[0].iov_base & ~PAGE_MASK; 2331 headoff = (unsigned long)xbufp->head[0].iov_base & ~PAGE_MASK; 2332 len = svc_send_common(sock, xbufp, 2333 virt_to_page(xbufp->head[0].iov_base), headoff, 2334 xbufp->tail[0].iov_base, tailoff); 2335 2336 if (len != xbufp->len) { 2337 printk(KERN_NOTICE "Error sending entire callback!\n"); 2338 len = -EAGAIN; 2339 } 2340 2341 return len; 2342 } 2343 2344 /* 2345 * The send routine. Borrows from svc_send 2346 */ 2347 static int bc_send_request(struct rpc_task *task) 2348 { 2349 struct rpc_rqst *req = task->tk_rqstp; 2350 struct svc_xprt *xprt; 2351 struct svc_sock *svsk; 2352 u32 len; 2353 2354 dprintk("sending request with xid: %08x\n", ntohl(req->rq_xid)); 2355 /* 2356 * Get the server socket associated with this callback xprt 2357 */ 2358 xprt = req->rq_xprt->bc_xprt; 2359 svsk = container_of(xprt, struct svc_sock, sk_xprt); 2360 2361 /* 2362 * Grab the mutex to serialize data as the connection is shared 2363 * with the fore channel 2364 */ 2365 if (!mutex_trylock(&xprt->xpt_mutex)) { 2366 rpc_sleep_on(&xprt->xpt_bc_pending, task, NULL); 2367 if (!mutex_trylock(&xprt->xpt_mutex)) 2368 return -EAGAIN; 2369 rpc_wake_up_queued_task(&xprt->xpt_bc_pending, task); 2370 } 2371 if (test_bit(XPT_DEAD, &xprt->xpt_flags)) 2372 len = -ENOTCONN; 2373 else 2374 len = bc_sendto(req); 2375 mutex_unlock(&xprt->xpt_mutex); 2376 2377 if (len > 0) 2378 len = 0; 2379 2380 return len; 2381 } 2382 2383 /* 2384 * The close routine. Since this is client initiated, we do nothing 2385 */ 2386 2387 static void bc_close(struct rpc_xprt *xprt) 2388 { 2389 } 2390 2391 /* 2392 * The xprt destroy routine. 
/*
 * The send routine. Borrows from svc_send
 */
static int bc_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct svc_xprt *xprt;
	struct svc_sock *svsk;
	int len;	/* must be signed: may carry -ENOTCONN or -EAGAIN */

	dprintk("sending request with xid: %08x\n", ntohl(req->rq_xid));
	/*
	 * Get the server socket associated with this callback xprt
	 */
	xprt = req->rq_xprt->bc_xprt;
	svsk = container_of(xprt, struct svc_sock, sk_xprt);

	/*
	 * Grab the mutex to serialize data as the connection is shared
	 * with the fore channel
	 */
	if (!mutex_trylock(&xprt->xpt_mutex)) {
		rpc_sleep_on(&xprt->xpt_bc_pending, task, NULL);
		if (!mutex_trylock(&xprt->xpt_mutex))
			return -EAGAIN;
		rpc_wake_up_queued_task(&xprt->xpt_bc_pending, task);
	}
	if (test_bit(XPT_DEAD, &xprt->xpt_flags))
		len = -ENOTCONN;
	else
		len = bc_sendto(req);
	mutex_unlock(&xprt->xpt_mutex);

	if (len > 0)
		len = 0;

	return len;
}

/*
 * The close routine. Since this is client initiated, we do nothing
 */
static void bc_close(struct rpc_xprt *xprt)
{
}

/*
 * The xprt destroy routine. Again, because this connection is client
 * initiated, we do nothing
 */
static void bc_destroy(struct rpc_xprt *xprt)
{
}

static struct rpc_xprt_ops xs_local_ops = {
	.reserve_xprt		= xprt_reserve_xprt,
	.release_xprt		= xs_tcp_release_xprt,
	.rpcbind		= xs_local_rpcbind,
	.set_port		= xs_local_set_port,
	.connect		= xs_connect,
	.buf_alloc		= rpc_malloc,
	.buf_free		= rpc_free,
	.send_request		= xs_local_send_request,
	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
	.close			= xs_close,
	.destroy		= xs_destroy,
	.print_stats		= xs_local_print_stats,
};

static struct rpc_xprt_ops xs_udp_ops = {
	.set_buffer_size	= xs_udp_set_buffer_size,
	.reserve_xprt		= xprt_reserve_xprt_cong,
	.release_xprt		= xprt_release_xprt_cong,
	.rpcbind		= rpcb_getport_async,
	.set_port		= xs_set_port,
	.connect		= xs_connect,
	.buf_alloc		= rpc_malloc,
	.buf_free		= rpc_free,
	.send_request		= xs_udp_send_request,
	.set_retrans_timeout	= xprt_set_retrans_timeout_rtt,
	.timer			= xs_udp_timer,
	.release_request	= xprt_release_rqst_cong,
	.close			= xs_close,
	.destroy		= xs_destroy,
	.print_stats		= xs_udp_print_stats,
};

static struct rpc_xprt_ops xs_tcp_ops = {
	.reserve_xprt		= xprt_reserve_xprt,
	.release_xprt		= xs_tcp_release_xprt,
	.rpcbind		= rpcb_getport_async,
	.set_port		= xs_set_port,
	.connect		= xs_connect,
	.buf_alloc		= rpc_malloc,
	.buf_free		= rpc_free,
	.send_request		= xs_tcp_send_request,
	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
	.close			= xs_tcp_close,
	.destroy		= xs_destroy,
	.print_stats		= xs_tcp_print_stats,
};

/*
 * The rpc_xprt_ops for the server backchannel
 */
static struct rpc_xprt_ops bc_tcp_ops = {
	.reserve_xprt		= xprt_reserve_xprt,
	.release_xprt		= xprt_release_xprt,
	.buf_alloc		= bc_malloc,
	.buf_free		= bc_free,
	.send_request		= bc_send_request,
	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
	.close			= bc_close,
	.destroy		= bc_destroy,
	.print_stats		= xs_tcp_print_stats,
};

static int xs_init_anyaddr(const int family, struct sockaddr *sap)
{
	static const struct sockaddr_in sin = {
		.sin_family		= AF_INET,
		.sin_addr.s_addr	= htonl(INADDR_ANY),
	};
	static const struct sockaddr_in6 sin6 = {
		.sin6_family		= AF_INET6,
		.sin6_addr		= IN6ADDR_ANY_INIT,
	};

	switch (family) {
	case AF_LOCAL:
		break;
	case AF_INET:
		memcpy(sap, &sin, sizeof(sin));
		break;
	case AF_INET6:
		memcpy(sap, &sin6, sizeof(sin6));
		break;
	default:
		dprintk("RPC: %s: Bad address family\n", __func__);
		return -EAFNOSUPPORT;
	}
	return 0;
}
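/*
 * Common constructor shared by the AF_LOCAL, UDP, TCP and backchannel
 * setup routines below: it allocates the rpc_xprt together with its
 * enclosing sock_xprt, copies in the destination address, and fills in
 * a wildcard source address (via xs_init_anyaddr above) when the
 * caller does not supply one.
 */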
static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
				      unsigned int slot_table_size)
{
	struct rpc_xprt *xprt;
	struct sock_xprt *new;

	if (args->addrlen > sizeof(xprt->addr)) {
		dprintk("RPC: xs_setup_xprt: address too large\n");
		return ERR_PTR(-EBADF);
	}

	xprt = xprt_alloc(args->net, sizeof(*new), slot_table_size);
	if (xprt == NULL) {
		dprintk("RPC: xs_setup_xprt: couldn't allocate "
				"rpc_xprt\n");
		return ERR_PTR(-ENOMEM);
	}

	new = container_of(xprt, struct sock_xprt, xprt);
	memcpy(&xprt->addr, args->dstaddr, args->addrlen);
	xprt->addrlen = args->addrlen;
	if (args->srcaddr)
		memcpy(&new->srcaddr, args->srcaddr, args->addrlen);
	else {
		int err;
		err = xs_init_anyaddr(args->dstaddr->sa_family,
					(struct sockaddr *)&new->srcaddr);
		if (err != 0) {
			/* don't leak the freshly allocated xprt on failure */
			xprt_free(xprt);
			return ERR_PTR(err);
		}
	}

	return xprt;
}

static const struct rpc_timeout xs_local_default_timeout = {
	.to_initval = 10 * HZ,
	.to_maxval = 10 * HZ,
	.to_retries = 2,
};

/**
 * xs_setup_local - Set up transport to use an AF_LOCAL socket
 * @args: rpc transport creation arguments
 *
 * AF_LOCAL is a "tpi_cots_ord" transport, just like TCP
 */
static struct rpc_xprt *xs_setup_local(struct xprt_create *args)
{
	struct sockaddr_un *sun = (struct sockaddr_un *)args->dstaddr;
	struct sock_xprt *transport;
	struct rpc_xprt *xprt;
	struct rpc_xprt *ret;

	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
	if (IS_ERR(xprt))
		return xprt;
	transport = container_of(xprt, struct sock_xprt, xprt);

	xprt->prot = 0;
	xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
	xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;

	xprt->bind_timeout = XS_BIND_TO;
	xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
	xprt->idle_timeout = XS_IDLE_DISC_TO;

	xprt->ops = &xs_local_ops;
	xprt->timeout = &xs_local_default_timeout;

	switch (sun->sun_family) {
	case AF_LOCAL:
		if (sun->sun_path[0] != '/') {
			dprintk("RPC: bad AF_LOCAL address: %s\n",
					sun->sun_path);
			ret = ERR_PTR(-EINVAL);
			goto out_err;
		}
		xprt_set_bound(xprt);
		INIT_DELAYED_WORK(&transport->connect_worker,
					xs_local_setup_socket);
		xs_format_peer_addresses(xprt, "local", RPCBIND_NETID_LOCAL);
		break;
	default:
		ret = ERR_PTR(-EAFNOSUPPORT);
		goto out_err;
	}

	dprintk("RPC: set up xprt to %s via AF_LOCAL\n",
			xprt->address_strings[RPC_DISPLAY_ADDR]);

	if (try_module_get(THIS_MODULE))
		return xprt;
	ret = ERR_PTR(-EINVAL);
out_err:
	xprt_free(xprt);
	return ret;
}
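/*
 * A minimal sketch of creating one of these transports; field names
 * follow struct xprt_create as used elsewhere in this file, and the
 * socket path is hypothetical:
 *
 *	struct sockaddr_un sun = {
 *		.sun_family	= AF_LOCAL,
 *		.sun_path	= "/var/run/example.sock",
 *	};
 *	struct xprt_create args = {
 *		.ident		= XPRT_TRANSPORT_LOCAL,
 *		.net		= &init_net,
 *		.dstaddr	= (struct sockaddr *)&sun,
 *		.addrlen	= sizeof(sun),
 *	};
 *	struct rpc_xprt *xprt = xprt_create_transport(&args);
 *
 * Note that the path must be absolute, or xs_setup_local() above
 * rejects it with -EINVAL.
 */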
static const struct rpc_timeout xs_udp_default_timeout = {
	.to_initval = 5 * HZ,
	.to_maxval = 30 * HZ,
	.to_increment = 5 * HZ,
	.to_retries = 5,
};

/**
 * xs_setup_udp - Set up transport to use a UDP socket
 * @args: rpc transport creation arguments
 *
 */
static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
{
	struct sockaddr *addr = args->dstaddr;
	struct rpc_xprt *xprt;
	struct sock_xprt *transport;
	struct rpc_xprt *ret;

	xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries);
	if (IS_ERR(xprt))
		return xprt;
	transport = container_of(xprt, struct sock_xprt, xprt);

	xprt->prot = IPPROTO_UDP;
	xprt->tsh_size = 0;
	/* XXX: header size can vary due to auth type, IPv6, etc. */
	xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);

	xprt->bind_timeout = XS_BIND_TO;
	xprt->reestablish_timeout = XS_UDP_REEST_TO;
	xprt->idle_timeout = XS_IDLE_DISC_TO;

	xprt->ops = &xs_udp_ops;

	xprt->timeout = &xs_udp_default_timeout;

	switch (addr->sa_family) {
	case AF_INET:
		if (((struct sockaddr_in *)addr)->sin_port != htons(0))
			xprt_set_bound(xprt);

		INIT_DELAYED_WORK(&transport->connect_worker,
					xs_udp_setup_socket);
		xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP);
		break;
	case AF_INET6:
		if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
			xprt_set_bound(xprt);

		INIT_DELAYED_WORK(&transport->connect_worker,
					xs_udp_setup_socket);
		xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6);
		break;
	default:
		ret = ERR_PTR(-EAFNOSUPPORT);
		goto out_err;
	}

	if (xprt_bound(xprt))
		dprintk("RPC: set up xprt to %s (port %s) via %s\n",
				xprt->address_strings[RPC_DISPLAY_ADDR],
				xprt->address_strings[RPC_DISPLAY_PORT],
				xprt->address_strings[RPC_DISPLAY_PROTO]);
	else
		dprintk("RPC: set up xprt to %s (autobind) via %s\n",
				xprt->address_strings[RPC_DISPLAY_ADDR],
				xprt->address_strings[RPC_DISPLAY_PROTO]);

	if (try_module_get(THIS_MODULE))
		return xprt;
	ret = ERR_PTR(-EINVAL);
out_err:
	xprt_free(xprt);
	return ret;
}

static const struct rpc_timeout xs_tcp_default_timeout = {
	.to_initval = 60 * HZ,
	.to_maxval = 60 * HZ,
	.to_retries = 2,
};
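/*
 * Contrast with the UDP defaults above: UDP retransmits every few
 * seconds under an RTT-based estimator (xs_udp_ops uses
 * xprt_set_retrans_timeout_rtt), while TCP leaves retransmission to
 * the transport itself and so uses a long, fixed major timeout via
 * xprt_set_retrans_timeout_def.
 */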
/**
 * xs_setup_tcp - Set up transport to use a TCP socket
 * @args: rpc transport creation arguments
 *
 */
static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
{
	struct sockaddr *addr = args->dstaddr;
	struct rpc_xprt *xprt;
	struct sock_xprt *transport;
	struct rpc_xprt *ret;

	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
	if (IS_ERR(xprt))
		return xprt;
	transport = container_of(xprt, struct sock_xprt, xprt);

	xprt->prot = IPPROTO_TCP;
	xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
	xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;

	xprt->bind_timeout = XS_BIND_TO;
	xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
	xprt->idle_timeout = XS_IDLE_DISC_TO;

	xprt->ops = &xs_tcp_ops;
	xprt->timeout = &xs_tcp_default_timeout;

	switch (addr->sa_family) {
	case AF_INET:
		if (((struct sockaddr_in *)addr)->sin_port != htons(0))
			xprt_set_bound(xprt);

		INIT_DELAYED_WORK(&transport->connect_worker,
					xs_tcp_setup_socket);
		xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP);
		break;
	case AF_INET6:
		if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
			xprt_set_bound(xprt);

		INIT_DELAYED_WORK(&transport->connect_worker,
					xs_tcp_setup_socket);
		xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6);
		break;
	default:
		ret = ERR_PTR(-EAFNOSUPPORT);
		goto out_err;
	}

	if (xprt_bound(xprt))
		dprintk("RPC: set up xprt to %s (port %s) via %s\n",
				xprt->address_strings[RPC_DISPLAY_ADDR],
				xprt->address_strings[RPC_DISPLAY_PORT],
				xprt->address_strings[RPC_DISPLAY_PROTO]);
	else
		dprintk("RPC: set up xprt to %s (autobind) via %s\n",
				xprt->address_strings[RPC_DISPLAY_ADDR],
				xprt->address_strings[RPC_DISPLAY_PROTO]);

	if (try_module_get(THIS_MODULE))
		return xprt;
	ret = ERR_PTR(-EINVAL);
out_err:
	xprt_free(xprt);
	return ret;
}
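/*
 * The NFSv4.1 backchannel carries server-to-client calls over the same
 * TCP connection as the forechannel. The setup routine below therefore
 * never opens a socket of its own: it adopts the svc_sock belonging to
 * the server-side connection handed in through args->bc_xprt.
 */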
/**
 * xs_setup_bc_tcp - Set up transport to use a TCP backchannel socket
 * @args: rpc transport creation arguments
 *
 */
static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args)
{
	struct sockaddr *addr = args->dstaddr;
	struct rpc_xprt *xprt;
	struct sock_xprt *transport;
	struct svc_sock *bc_sock;
	struct rpc_xprt *ret;

	if (args->bc_xprt->xpt_bc_xprt) {
		/*
		 * This server connection already has a backchannel
		 * export; we can't create a new one, as we wouldn't be
		 * able to match replies based on xid any more. So,
		 * reuse the already-existing one:
		 */
		return args->bc_xprt->xpt_bc_xprt;
	}
	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
	if (IS_ERR(xprt))
		return xprt;
	transport = container_of(xprt, struct sock_xprt, xprt);

	xprt->prot = IPPROTO_TCP;
	xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
	xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
	xprt->timeout = &xs_tcp_default_timeout;

	/* backchannel */
	xprt_set_bound(xprt);
	xprt->bind_timeout = 0;
	xprt->reestablish_timeout = 0;
	xprt->idle_timeout = 0;

	xprt->ops = &bc_tcp_ops;

	switch (addr->sa_family) {
	case AF_INET:
		xs_format_peer_addresses(xprt, "tcp",
					 RPCBIND_NETID_TCP);
		break;
	case AF_INET6:
		xs_format_peer_addresses(xprt, "tcp",
					 RPCBIND_NETID_TCP6);
		break;
	default:
		ret = ERR_PTR(-EAFNOSUPPORT);
		goto out_err;
	}

	dprintk("RPC: set up xprt to %s (port %s) via %s\n",
			xprt->address_strings[RPC_DISPLAY_ADDR],
			xprt->address_strings[RPC_DISPLAY_PORT],
			xprt->address_strings[RPC_DISPLAY_PROTO]);

	/*
	 * Once we've associated a backchannel xprt with a connection,
	 * we want to keep it around as long as the connection lasts,
	 * in case we need to start using it for a backchannel again;
	 * this reference won't be dropped until bc_xprt is destroyed.
	 */
	xprt_get(xprt);
	args->bc_xprt->xpt_bc_xprt = xprt;
	xprt->bc_xprt = args->bc_xprt;
	bc_sock = container_of(args->bc_xprt, struct svc_sock, sk_xprt);
	transport->sock = bc_sock->sk_sock;
	transport->inet = bc_sock->sk_sk;

	/*
	 * Since we don't want connections for the backchannel, we set
	 * the xprt status to connected
	 */
	xprt_set_connected(xprt);

	if (try_module_get(THIS_MODULE))
		return xprt;
	xprt_put(xprt);
	ret = ERR_PTR(-EINVAL);
out_err:
	xprt_free(xprt);
	return ret;
}

static struct xprt_class xs_local_transport = {
	.list		= LIST_HEAD_INIT(xs_local_transport.list),
	.name		= "named UNIX socket",
	.owner		= THIS_MODULE,
	.ident		= XPRT_TRANSPORT_LOCAL,
	.setup		= xs_setup_local,
};

static struct xprt_class xs_udp_transport = {
	.list		= LIST_HEAD_INIT(xs_udp_transport.list),
	.name		= "udp",
	.owner		= THIS_MODULE,
	.ident		= XPRT_TRANSPORT_UDP,
	.setup		= xs_setup_udp,
};

static struct xprt_class xs_tcp_transport = {
	.list		= LIST_HEAD_INIT(xs_tcp_transport.list),
	.name		= "tcp",
	.owner		= THIS_MODULE,
	.ident		= XPRT_TRANSPORT_TCP,
	.setup		= xs_setup_tcp,
};

static struct xprt_class xs_bc_tcp_transport = {
	.list		= LIST_HEAD_INIT(xs_bc_tcp_transport.list),
	.name		= "tcp NFSv4.1 backchannel",
	.owner		= THIS_MODULE,
	.ident		= XPRT_TRANSPORT_BC_TCP,
	.setup		= xs_setup_bc_tcp,
};

/**
 * init_socket_xprt - set up xprtsock's sysctls, register with RPC client
 *
 */
int init_socket_xprt(void)
{
#ifdef RPC_DEBUG
	if (!sunrpc_table_header)
		sunrpc_table_header = register_sysctl_table(sunrpc_table);
#endif

	xprt_register_transport(&xs_local_transport);
	xprt_register_transport(&xs_udp_transport);
	xprt_register_transport(&xs_tcp_transport);
	xprt_register_transport(&xs_bc_tcp_transport);

	return 0;
}

/**
 * cleanup_socket_xprt - remove xprtsock's sysctls, unregister
 *
 */
void cleanup_socket_xprt(void)
{
#ifdef RPC_DEBUG
	if (sunrpc_table_header) {
		unregister_sysctl_table(sunrpc_table_header);
		sunrpc_table_header = NULL;
	}
#endif

	xprt_unregister_transport(&xs_local_transport);
	xprt_unregister_transport(&xs_udp_transport);
	xprt_unregister_transport(&xs_tcp_transport);
	xprt_unregister_transport(&xs_bc_tcp_transport);
}

static int param_set_uint_minmax(const char *val,
		const struct kernel_param *kp,
		unsigned int min, unsigned int max)
{
	unsigned long num;
	int ret;

	if (!val)
		return -EINVAL;
	ret = strict_strtoul(val, 0, &num);
	if (ret == -EINVAL || num < min || num > max)
		return -EINVAL;
	*((unsigned int *)kp->arg) = num;
	return 0;
}

static int param_set_portnr(const char *val, const struct kernel_param *kp)
{
	return param_set_uint_minmax(val, kp,
			RPC_MIN_RESVPORT,
			RPC_MAX_RESVPORT);
}

static struct kernel_param_ops param_ops_portnr = {
	.set = param_set_portnr,
	.get = param_get_uint,
};

#define param_check_portnr(name, p) \
	__param_check(name, p, unsigned int);

module_param_named(min_resvport, xprt_min_resvport, portnr, 0644);
module_param_named(max_resvport, xprt_max_resvport, portnr, 0644);
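/*
 * These become module parameters of sunrpc.ko; for example (values
 * are only an example):
 *
 *	modprobe sunrpc min_resvport=665 max_resvport=1023
 *
 * They are also writable at runtime under
 * /sys/module/sunrpc/parameters/, and when RPC_DEBUG is set the same
 * variables are additionally exposed as sysctls registered by
 * init_socket_xprt() above.
 */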
static int param_set_slot_table_size(const char *val,
		const struct kernel_param *kp)
{
	return param_set_uint_minmax(val, kp,
			RPC_MIN_SLOT_TABLE,
			RPC_MAX_SLOT_TABLE);
}

static struct kernel_param_ops param_ops_slot_table_size = {
	.set = param_set_slot_table_size,
	.get = param_get_uint,
};

#define param_check_slot_table_size(name, p) \
	__param_check(name, p, unsigned int);

module_param_named(tcp_slot_table_entries, xprt_tcp_slot_table_entries,
		   slot_table_size, 0644);
module_param_named(udp_slot_table_entries, xprt_udp_slot_table_entries,
		   slot_table_size, 0644);
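/*
 * The slot table sizes take the same route; for example (the value is
 * hypothetical):
 *
 *	modprobe sunrpc tcp_slot_table_entries=32
 *
 * param_set_slot_table_size() accepts the value only if it falls
 * within [RPC_MIN_SLOT_TABLE, RPC_MAX_SLOT_TABLE].
 */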