1 /* 2 * linux/net/sunrpc/xprtsock.c 3 * 4 * Client-side transport implementation for sockets. 5 * 6 * TCP callback races fixes (C) 1998 Red Hat 7 * TCP send fixes (C) 1998 Red Hat 8 * TCP NFS related read + write fixes 9 * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie> 10 * 11 * Rewrite of larges part of the code in order to stabilize TCP stuff. 12 * Fix behaviour when socket buffer is full. 13 * (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no> 14 * 15 * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com> 16 * 17 * IPv6 support contributed by Gilles Quillard, Bull Open Source, 2005. 18 * <gilles.quillard@bull.net> 19 */ 20 21 #include <linux/types.h> 22 #include <linux/string.h> 23 #include <linux/slab.h> 24 #include <linux/module.h> 25 #include <linux/capability.h> 26 #include <linux/pagemap.h> 27 #include <linux/errno.h> 28 #include <linux/socket.h> 29 #include <linux/in.h> 30 #include <linux/net.h> 31 #include <linux/mm.h> 32 #include <linux/un.h> 33 #include <linux/udp.h> 34 #include <linux/tcp.h> 35 #include <linux/sunrpc/clnt.h> 36 #include <linux/sunrpc/addr.h> 37 #include <linux/sunrpc/sched.h> 38 #include <linux/sunrpc/svcsock.h> 39 #include <linux/sunrpc/xprtsock.h> 40 #include <linux/file.h> 41 #ifdef CONFIG_SUNRPC_BACKCHANNEL 42 #include <linux/sunrpc/bc_xprt.h> 43 #endif 44 45 #include <net/sock.h> 46 #include <net/checksum.h> 47 #include <net/udp.h> 48 #include <net/tcp.h> 49 50 #include <trace/events/sunrpc.h> 51 52 #include "sunrpc.h" 53 54 static void xs_close(struct rpc_xprt *xprt); 55 56 /* 57 * xprtsock tunables 58 */ 59 static unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE; 60 static unsigned int xprt_tcp_slot_table_entries = RPC_MIN_SLOT_TABLE; 61 static unsigned int xprt_max_tcp_slot_table_entries = RPC_MAX_SLOT_TABLE; 62 63 static unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT; 64 static unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT; 65 66 #define XS_TCP_LINGER_TO (15U * HZ) 67 static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO; 68 69 /* 70 * We can register our own files under /proc/sys/sunrpc by 71 * calling register_sysctl_table() again. The files in that 72 * directory become the union of all files registered there. 73 * 74 * We simply need to make sure that we don't collide with 75 * someone else's file names! 76 */ 77 78 #ifdef RPC_DEBUG 79 80 static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE; 81 static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE; 82 static unsigned int max_tcp_slot_table_limit = RPC_MAX_SLOT_TABLE_LIMIT; 83 static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT; 84 static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT; 85 86 static struct ctl_table_header *sunrpc_table_header; 87 88 /* 89 * FIXME: changing the UDP slot table size should also resize the UDP 90 * socket buffers for existing UDP transports 91 */ 92 static struct ctl_table xs_tunables_table[] = { 93 { 94 .procname = "udp_slot_table_entries", 95 .data = &xprt_udp_slot_table_entries, 96 .maxlen = sizeof(unsigned int), 97 .mode = 0644, 98 .proc_handler = proc_dointvec_minmax, 99 .extra1 = &min_slot_table_size, 100 .extra2 = &max_slot_table_size 101 }, 102 { 103 .procname = "tcp_slot_table_entries", 104 .data = &xprt_tcp_slot_table_entries, 105 .maxlen = sizeof(unsigned int), 106 .mode = 0644, 107 .proc_handler = proc_dointvec_minmax, 108 .extra1 = &min_slot_table_size, 109 .extra2 = &max_slot_table_size 110 }, 111 { 112 .procname = "tcp_max_slot_table_entries", 113 .data = &xprt_max_tcp_slot_table_entries, 114 .maxlen = sizeof(unsigned int), 115 .mode = 0644, 116 .proc_handler = proc_dointvec_minmax, 117 .extra1 = &min_slot_table_size, 118 .extra2 = &max_tcp_slot_table_limit 119 }, 120 { 121 .procname = "min_resvport", 122 .data = &xprt_min_resvport, 123 .maxlen = sizeof(unsigned int), 124 .mode = 0644, 125 .proc_handler = proc_dointvec_minmax, 126 .extra1 = &xprt_min_resvport_limit, 127 .extra2 = &xprt_max_resvport_limit 128 }, 129 { 130 .procname = "max_resvport", 131 .data = &xprt_max_resvport, 132 .maxlen = sizeof(unsigned int), 133 .mode = 0644, 134 .proc_handler = proc_dointvec_minmax, 135 .extra1 = &xprt_min_resvport_limit, 136 .extra2 = &xprt_max_resvport_limit 137 }, 138 { 139 .procname = "tcp_fin_timeout", 140 .data = &xs_tcp_fin_timeout, 141 .maxlen = sizeof(xs_tcp_fin_timeout), 142 .mode = 0644, 143 .proc_handler = proc_dointvec_jiffies, 144 }, 145 { }, 146 }; 147 148 static struct ctl_table sunrpc_table[] = { 149 { 150 .procname = "sunrpc", 151 .mode = 0555, 152 .child = xs_tunables_table 153 }, 154 { }, 155 }; 156 157 #endif 158 159 /* 160 * Wait duration for a reply from the RPC portmapper. 161 */ 162 #define XS_BIND_TO (60U * HZ) 163 164 /* 165 * Delay if a UDP socket connect error occurs. This is most likely some 166 * kind of resource problem on the local host. 167 */ 168 #define XS_UDP_REEST_TO (2U * HZ) 169 170 /* 171 * The reestablish timeout allows clients to delay for a bit before attempting 172 * to reconnect to a server that just dropped our connection. 173 * 174 * We implement an exponential backoff when trying to reestablish a TCP 175 * transport connection with the server. Some servers like to drop a TCP 176 * connection when they are overworked, so we start with a short timeout and 177 * increase over time if the server is down or not responding. 178 */ 179 #define XS_TCP_INIT_REEST_TO (3U * HZ) 180 #define XS_TCP_MAX_REEST_TO (5U * 60 * HZ) 181 182 /* 183 * TCP idle timeout; client drops the transport socket if it is idle 184 * for this long. Note that we also timeout UDP sockets to prevent 185 * holding port numbers when there is no RPC traffic. 186 */ 187 #define XS_IDLE_DISC_TO (5U * 60 * HZ) 188 189 #ifdef RPC_DEBUG 190 # undef RPC_DEBUG_DATA 191 # define RPCDBG_FACILITY RPCDBG_TRANS 192 #endif 193 194 #ifdef RPC_DEBUG_DATA 195 static void xs_pktdump(char *msg, u32 *packet, unsigned int count) 196 { 197 u8 *buf = (u8 *) packet; 198 int j; 199 200 dprintk("RPC: %s\n", msg); 201 for (j = 0; j < count && j < 128; j += 4) { 202 if (!(j & 31)) { 203 if (j) 204 dprintk("\n"); 205 dprintk("0x%04x ", j); 206 } 207 dprintk("%02x%02x%02x%02x ", 208 buf[j], buf[j+1], buf[j+2], buf[j+3]); 209 } 210 dprintk("\n"); 211 } 212 #else 213 static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count) 214 { 215 /* NOP */ 216 } 217 #endif 218 219 struct sock_xprt { 220 struct rpc_xprt xprt; 221 222 /* 223 * Network layer 224 */ 225 struct socket * sock; 226 struct sock * inet; 227 228 /* 229 * State of TCP reply receive 230 */ 231 __be32 tcp_fraghdr, 232 tcp_xid, 233 tcp_calldir; 234 235 u32 tcp_offset, 236 tcp_reclen; 237 238 unsigned long tcp_copied, 239 tcp_flags; 240 241 /* 242 * Connection of transports 243 */ 244 struct delayed_work connect_worker; 245 struct sockaddr_storage srcaddr; 246 unsigned short srcport; 247 248 /* 249 * UDP socket buffer size parameters 250 */ 251 size_t rcvsize, 252 sndsize; 253 254 /* 255 * Saved socket callback addresses 256 */ 257 void (*old_data_ready)(struct sock *, int); 258 void (*old_state_change)(struct sock *); 259 void (*old_write_space)(struct sock *); 260 }; 261 262 /* 263 * TCP receive state flags 264 */ 265 #define TCP_RCV_LAST_FRAG (1UL << 0) 266 #define TCP_RCV_COPY_FRAGHDR (1UL << 1) 267 #define TCP_RCV_COPY_XID (1UL << 2) 268 #define TCP_RCV_COPY_DATA (1UL << 3) 269 #define TCP_RCV_READ_CALLDIR (1UL << 4) 270 #define TCP_RCV_COPY_CALLDIR (1UL << 5) 271 272 /* 273 * TCP RPC flags 274 */ 275 #define TCP_RPC_REPLY (1UL << 6) 276 277 static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt) 278 { 279 return (struct sockaddr *) &xprt->addr; 280 } 281 282 static inline struct sockaddr_un *xs_addr_un(struct rpc_xprt *xprt) 283 { 284 return (struct sockaddr_un *) &xprt->addr; 285 } 286 287 static inline struct sockaddr_in *xs_addr_in(struct rpc_xprt *xprt) 288 { 289 return (struct sockaddr_in *) &xprt->addr; 290 } 291 292 static inline struct sockaddr_in6 *xs_addr_in6(struct rpc_xprt *xprt) 293 { 294 return (struct sockaddr_in6 *) &xprt->addr; 295 } 296 297 static void xs_format_common_peer_addresses(struct rpc_xprt *xprt) 298 { 299 struct sockaddr *sap = xs_addr(xprt); 300 struct sockaddr_in6 *sin6; 301 struct sockaddr_in *sin; 302 struct sockaddr_un *sun; 303 char buf[128]; 304 305 switch (sap->sa_family) { 306 case AF_LOCAL: 307 sun = xs_addr_un(xprt); 308 strlcpy(buf, sun->sun_path, sizeof(buf)); 309 xprt->address_strings[RPC_DISPLAY_ADDR] = 310 kstrdup(buf, GFP_KERNEL); 311 break; 312 case AF_INET: 313 (void)rpc_ntop(sap, buf, sizeof(buf)); 314 xprt->address_strings[RPC_DISPLAY_ADDR] = 315 kstrdup(buf, GFP_KERNEL); 316 sin = xs_addr_in(xprt); 317 snprintf(buf, sizeof(buf), "%08x", ntohl(sin->sin_addr.s_addr)); 318 break; 319 case AF_INET6: 320 (void)rpc_ntop(sap, buf, sizeof(buf)); 321 xprt->address_strings[RPC_DISPLAY_ADDR] = 322 kstrdup(buf, GFP_KERNEL); 323 sin6 = xs_addr_in6(xprt); 324 snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr); 325 break; 326 default: 327 BUG(); 328 } 329 330 xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL); 331 } 332 333 static void xs_format_common_peer_ports(struct rpc_xprt *xprt) 334 { 335 struct sockaddr *sap = xs_addr(xprt); 336 char buf[128]; 337 338 snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap)); 339 xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL); 340 341 snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap)); 342 xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL); 343 } 344 345 static void xs_format_peer_addresses(struct rpc_xprt *xprt, 346 const char *protocol, 347 const char *netid) 348 { 349 xprt->address_strings[RPC_DISPLAY_PROTO] = protocol; 350 xprt->address_strings[RPC_DISPLAY_NETID] = netid; 351 xs_format_common_peer_addresses(xprt); 352 xs_format_common_peer_ports(xprt); 353 } 354 355 static void xs_update_peer_port(struct rpc_xprt *xprt) 356 { 357 kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]); 358 kfree(xprt->address_strings[RPC_DISPLAY_PORT]); 359 360 xs_format_common_peer_ports(xprt); 361 } 362 363 static void xs_free_peer_addresses(struct rpc_xprt *xprt) 364 { 365 unsigned int i; 366 367 for (i = 0; i < RPC_DISPLAY_MAX; i++) 368 switch (i) { 369 case RPC_DISPLAY_PROTO: 370 case RPC_DISPLAY_NETID: 371 continue; 372 default: 373 kfree(xprt->address_strings[i]); 374 } 375 } 376 377 #define XS_SENDMSG_FLAGS (MSG_DONTWAIT | MSG_NOSIGNAL) 378 379 static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more) 380 { 381 struct msghdr msg = { 382 .msg_name = addr, 383 .msg_namelen = addrlen, 384 .msg_flags = XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0), 385 }; 386 struct kvec iov = { 387 .iov_base = vec->iov_base + base, 388 .iov_len = vec->iov_len - base, 389 }; 390 391 if (iov.iov_len != 0) 392 return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len); 393 return kernel_sendmsg(sock, &msg, NULL, 0, 0); 394 } 395 396 static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more) 397 { 398 struct page **ppage; 399 unsigned int remainder; 400 int err, sent = 0; 401 402 remainder = xdr->page_len - base; 403 base += xdr->page_base; 404 ppage = xdr->pages + (base >> PAGE_SHIFT); 405 base &= ~PAGE_MASK; 406 for(;;) { 407 unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder); 408 int flags = XS_SENDMSG_FLAGS; 409 410 remainder -= len; 411 if (remainder != 0 || more) 412 flags |= MSG_MORE; 413 err = sock->ops->sendpage(sock, *ppage, base, len, flags); 414 if (remainder == 0 || err != len) 415 break; 416 sent += err; 417 ppage++; 418 base = 0; 419 } 420 if (sent == 0) 421 return err; 422 if (err > 0) 423 sent += err; 424 return sent; 425 } 426 427 /** 428 * xs_sendpages - write pages directly to a socket 429 * @sock: socket to send on 430 * @addr: UDP only -- address of destination 431 * @addrlen: UDP only -- length of destination address 432 * @xdr: buffer containing this request 433 * @base: starting position in the buffer 434 * 435 */ 436 static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base) 437 { 438 unsigned int remainder = xdr->len - base; 439 int err, sent = 0; 440 441 if (unlikely(!sock)) 442 return -ENOTSOCK; 443 444 clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags); 445 if (base != 0) { 446 addr = NULL; 447 addrlen = 0; 448 } 449 450 if (base < xdr->head[0].iov_len || addr != NULL) { 451 unsigned int len = xdr->head[0].iov_len - base; 452 remainder -= len; 453 err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0); 454 if (remainder == 0 || err != len) 455 goto out; 456 sent += err; 457 base = 0; 458 } else 459 base -= xdr->head[0].iov_len; 460 461 if (base < xdr->page_len) { 462 unsigned int len = xdr->page_len - base; 463 remainder -= len; 464 err = xs_send_pagedata(sock, xdr, base, remainder != 0); 465 if (remainder == 0 || err != len) 466 goto out; 467 sent += err; 468 base = 0; 469 } else 470 base -= xdr->page_len; 471 472 if (base >= xdr->tail[0].iov_len) 473 return sent; 474 err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0); 475 out: 476 if (sent == 0) 477 return err; 478 if (err > 0) 479 sent += err; 480 return sent; 481 } 482 483 static void xs_nospace_callback(struct rpc_task *task) 484 { 485 struct sock_xprt *transport = container_of(task->tk_rqstp->rq_xprt, struct sock_xprt, xprt); 486 487 transport->inet->sk_write_pending--; 488 clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags); 489 } 490 491 /** 492 * xs_nospace - place task on wait queue if transmit was incomplete 493 * @task: task to put to sleep 494 * 495 */ 496 static int xs_nospace(struct rpc_task *task) 497 { 498 struct rpc_rqst *req = task->tk_rqstp; 499 struct rpc_xprt *xprt = req->rq_xprt; 500 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 501 int ret = -EAGAIN; 502 503 dprintk("RPC: %5u xmit incomplete (%u left of %u)\n", 504 task->tk_pid, req->rq_slen - req->rq_bytes_sent, 505 req->rq_slen); 506 507 /* Protect against races with write_space */ 508 spin_lock_bh(&xprt->transport_lock); 509 510 /* Don't race with disconnect */ 511 if (xprt_connected(xprt)) { 512 if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) { 513 /* 514 * Notify TCP that we're limited by the application 515 * window size 516 */ 517 set_bit(SOCK_NOSPACE, &transport->sock->flags); 518 transport->inet->sk_write_pending++; 519 /* ...and wait for more buffer space */ 520 xprt_wait_for_buffer_space(task, xs_nospace_callback); 521 } 522 } else { 523 clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags); 524 ret = -ENOTCONN; 525 } 526 527 spin_unlock_bh(&xprt->transport_lock); 528 return ret; 529 } 530 531 /* 532 * Construct a stream transport record marker in @buf. 533 */ 534 static inline void xs_encode_stream_record_marker(struct xdr_buf *buf) 535 { 536 u32 reclen = buf->len - sizeof(rpc_fraghdr); 537 rpc_fraghdr *base = buf->head[0].iov_base; 538 *base = cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | reclen); 539 } 540 541 /** 542 * xs_local_send_request - write an RPC request to an AF_LOCAL socket 543 * @task: RPC task that manages the state of an RPC request 544 * 545 * Return values: 546 * 0: The request has been sent 547 * EAGAIN: The socket was blocked, please call again later to 548 * complete the request 549 * ENOTCONN: Caller needs to invoke connect logic then call again 550 * other: Some other error occured, the request was not sent 551 */ 552 static int xs_local_send_request(struct rpc_task *task) 553 { 554 struct rpc_rqst *req = task->tk_rqstp; 555 struct rpc_xprt *xprt = req->rq_xprt; 556 struct sock_xprt *transport = 557 container_of(xprt, struct sock_xprt, xprt); 558 struct xdr_buf *xdr = &req->rq_snd_buf; 559 int status; 560 561 xs_encode_stream_record_marker(&req->rq_snd_buf); 562 563 xs_pktdump("packet data:", 564 req->rq_svec->iov_base, req->rq_svec->iov_len); 565 566 status = xs_sendpages(transport->sock, NULL, 0, 567 xdr, req->rq_bytes_sent); 568 dprintk("RPC: %s(%u) = %d\n", 569 __func__, xdr->len - req->rq_bytes_sent, status); 570 if (likely(status >= 0)) { 571 req->rq_bytes_sent += status; 572 req->rq_xmit_bytes_sent += status; 573 if (likely(req->rq_bytes_sent >= req->rq_slen)) { 574 req->rq_bytes_sent = 0; 575 return 0; 576 } 577 status = -EAGAIN; 578 } 579 580 switch (status) { 581 case -EAGAIN: 582 status = xs_nospace(task); 583 break; 584 default: 585 dprintk("RPC: sendmsg returned unrecognized error %d\n", 586 -status); 587 case -EPIPE: 588 xs_close(xprt); 589 status = -ENOTCONN; 590 } 591 592 return status; 593 } 594 595 /** 596 * xs_udp_send_request - write an RPC request to a UDP socket 597 * @task: address of RPC task that manages the state of an RPC request 598 * 599 * Return values: 600 * 0: The request has been sent 601 * EAGAIN: The socket was blocked, please call again later to 602 * complete the request 603 * ENOTCONN: Caller needs to invoke connect logic then call again 604 * other: Some other error occurred, the request was not sent 605 */ 606 static int xs_udp_send_request(struct rpc_task *task) 607 { 608 struct rpc_rqst *req = task->tk_rqstp; 609 struct rpc_xprt *xprt = req->rq_xprt; 610 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 611 struct xdr_buf *xdr = &req->rq_snd_buf; 612 int status; 613 614 xs_pktdump("packet data:", 615 req->rq_svec->iov_base, 616 req->rq_svec->iov_len); 617 618 if (!xprt_bound(xprt)) 619 return -ENOTCONN; 620 status = xs_sendpages(transport->sock, 621 xs_addr(xprt), 622 xprt->addrlen, xdr, 623 req->rq_bytes_sent); 624 625 dprintk("RPC: xs_udp_send_request(%u) = %d\n", 626 xdr->len - req->rq_bytes_sent, status); 627 628 if (status >= 0) { 629 req->rq_xmit_bytes_sent += status; 630 if (status >= req->rq_slen) 631 return 0; 632 /* Still some bytes left; set up for a retry later. */ 633 status = -EAGAIN; 634 } 635 636 switch (status) { 637 case -ENOTSOCK: 638 status = -ENOTCONN; 639 /* Should we call xs_close() here? */ 640 break; 641 case -EAGAIN: 642 status = xs_nospace(task); 643 break; 644 default: 645 dprintk("RPC: sendmsg returned unrecognized error %d\n", 646 -status); 647 case -ENETUNREACH: 648 case -EPIPE: 649 case -ECONNREFUSED: 650 /* When the server has died, an ICMP port unreachable message 651 * prompts ECONNREFUSED. */ 652 clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags); 653 } 654 655 return status; 656 } 657 658 /** 659 * xs_tcp_shutdown - gracefully shut down a TCP socket 660 * @xprt: transport 661 * 662 * Initiates a graceful shutdown of the TCP socket by calling the 663 * equivalent of shutdown(SHUT_WR); 664 */ 665 static void xs_tcp_shutdown(struct rpc_xprt *xprt) 666 { 667 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 668 struct socket *sock = transport->sock; 669 670 if (sock != NULL) { 671 kernel_sock_shutdown(sock, SHUT_WR); 672 trace_rpc_socket_shutdown(xprt, sock); 673 } 674 } 675 676 /** 677 * xs_tcp_send_request - write an RPC request to a TCP socket 678 * @task: address of RPC task that manages the state of an RPC request 679 * 680 * Return values: 681 * 0: The request has been sent 682 * EAGAIN: The socket was blocked, please call again later to 683 * complete the request 684 * ENOTCONN: Caller needs to invoke connect logic then call again 685 * other: Some other error occurred, the request was not sent 686 * 687 * XXX: In the case of soft timeouts, should we eventually give up 688 * if sendmsg is not able to make progress? 689 */ 690 static int xs_tcp_send_request(struct rpc_task *task) 691 { 692 struct rpc_rqst *req = task->tk_rqstp; 693 struct rpc_xprt *xprt = req->rq_xprt; 694 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 695 struct xdr_buf *xdr = &req->rq_snd_buf; 696 int status; 697 698 xs_encode_stream_record_marker(&req->rq_snd_buf); 699 700 xs_pktdump("packet data:", 701 req->rq_svec->iov_base, 702 req->rq_svec->iov_len); 703 704 /* Continue transmitting the packet/record. We must be careful 705 * to cope with writespace callbacks arriving _after_ we have 706 * called sendmsg(). */ 707 while (1) { 708 status = xs_sendpages(transport->sock, 709 NULL, 0, xdr, req->rq_bytes_sent); 710 711 dprintk("RPC: xs_tcp_send_request(%u) = %d\n", 712 xdr->len - req->rq_bytes_sent, status); 713 714 if (unlikely(status < 0)) 715 break; 716 717 /* If we've sent the entire packet, immediately 718 * reset the count of bytes sent. */ 719 req->rq_bytes_sent += status; 720 req->rq_xmit_bytes_sent += status; 721 if (likely(req->rq_bytes_sent >= req->rq_slen)) { 722 req->rq_bytes_sent = 0; 723 return 0; 724 } 725 726 if (status != 0) 727 continue; 728 status = -EAGAIN; 729 break; 730 } 731 732 switch (status) { 733 case -ENOTSOCK: 734 status = -ENOTCONN; 735 /* Should we call xs_close() here? */ 736 break; 737 case -EAGAIN: 738 status = xs_nospace(task); 739 break; 740 default: 741 dprintk("RPC: sendmsg returned unrecognized error %d\n", 742 -status); 743 case -ECONNRESET: 744 xs_tcp_shutdown(xprt); 745 case -ECONNREFUSED: 746 case -ENOTCONN: 747 case -EPIPE: 748 clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags); 749 } 750 751 return status; 752 } 753 754 /** 755 * xs_tcp_release_xprt - clean up after a tcp transmission 756 * @xprt: transport 757 * @task: rpc task 758 * 759 * This cleans up if an error causes us to abort the transmission of a request. 760 * In this case, the socket may need to be reset in order to avoid confusing 761 * the server. 762 */ 763 static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task) 764 { 765 struct rpc_rqst *req; 766 767 if (task != xprt->snd_task) 768 return; 769 if (task == NULL) 770 goto out_release; 771 req = task->tk_rqstp; 772 if (req == NULL) 773 goto out_release; 774 if (req->rq_bytes_sent == 0) 775 goto out_release; 776 if (req->rq_bytes_sent == req->rq_snd_buf.len) 777 goto out_release; 778 set_bit(XPRT_CLOSE_WAIT, &xprt->state); 779 out_release: 780 xprt_release_xprt(xprt, task); 781 } 782 783 static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk) 784 { 785 transport->old_data_ready = sk->sk_data_ready; 786 transport->old_state_change = sk->sk_state_change; 787 transport->old_write_space = sk->sk_write_space; 788 } 789 790 static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk) 791 { 792 sk->sk_data_ready = transport->old_data_ready; 793 sk->sk_state_change = transport->old_state_change; 794 sk->sk_write_space = transport->old_write_space; 795 } 796 797 static void xs_reset_transport(struct sock_xprt *transport) 798 { 799 struct socket *sock = transport->sock; 800 struct sock *sk = transport->inet; 801 802 if (sk == NULL) 803 return; 804 805 transport->srcport = 0; 806 807 write_lock_bh(&sk->sk_callback_lock); 808 transport->inet = NULL; 809 transport->sock = NULL; 810 811 sk->sk_user_data = NULL; 812 813 xs_restore_old_callbacks(transport, sk); 814 write_unlock_bh(&sk->sk_callback_lock); 815 816 sk->sk_no_check = 0; 817 818 trace_rpc_socket_close(&transport->xprt, sock); 819 sock_release(sock); 820 } 821 822 /** 823 * xs_close - close a socket 824 * @xprt: transport 825 * 826 * This is used when all requests are complete; ie, no DRC state remains 827 * on the server we want to save. 828 * 829 * The caller _must_ be holding XPRT_LOCKED in order to avoid issues with 830 * xs_reset_transport() zeroing the socket from underneath a writer. 831 */ 832 static void xs_close(struct rpc_xprt *xprt) 833 { 834 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 835 836 dprintk("RPC: xs_close xprt %p\n", xprt); 837 838 cancel_delayed_work_sync(&transport->connect_worker); 839 840 xs_reset_transport(transport); 841 xprt->reestablish_timeout = 0; 842 843 smp_mb__before_clear_bit(); 844 clear_bit(XPRT_CONNECTION_ABORT, &xprt->state); 845 clear_bit(XPRT_CLOSE_WAIT, &xprt->state); 846 clear_bit(XPRT_CLOSING, &xprt->state); 847 smp_mb__after_clear_bit(); 848 xprt_disconnect_done(xprt); 849 } 850 851 static void xs_tcp_close(struct rpc_xprt *xprt) 852 { 853 if (test_and_clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state)) 854 xs_close(xprt); 855 else 856 xs_tcp_shutdown(xprt); 857 } 858 859 /** 860 * xs_destroy - prepare to shutdown a transport 861 * @xprt: doomed transport 862 * 863 */ 864 static void xs_destroy(struct rpc_xprt *xprt) 865 { 866 dprintk("RPC: xs_destroy xprt %p\n", xprt); 867 868 xs_close(xprt); 869 xs_free_peer_addresses(xprt); 870 xprt_free(xprt); 871 module_put(THIS_MODULE); 872 } 873 874 static inline struct rpc_xprt *xprt_from_sock(struct sock *sk) 875 { 876 return (struct rpc_xprt *) sk->sk_user_data; 877 } 878 879 static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb) 880 { 881 struct xdr_skb_reader desc = { 882 .skb = skb, 883 .offset = sizeof(rpc_fraghdr), 884 .count = skb->len - sizeof(rpc_fraghdr), 885 }; 886 887 if (xdr_partial_copy_from_skb(xdr, 0, &desc, xdr_skb_read_bits) < 0) 888 return -1; 889 if (desc.count) 890 return -1; 891 return 0; 892 } 893 894 /** 895 * xs_local_data_ready - "data ready" callback for AF_LOCAL sockets 896 * @sk: socket with data to read 897 * @len: how much data to read 898 * 899 * Currently this assumes we can read the whole reply in a single gulp. 900 */ 901 static void xs_local_data_ready(struct sock *sk, int len) 902 { 903 struct rpc_task *task; 904 struct rpc_xprt *xprt; 905 struct rpc_rqst *rovr; 906 struct sk_buff *skb; 907 int err, repsize, copied; 908 u32 _xid; 909 __be32 *xp; 910 911 read_lock_bh(&sk->sk_callback_lock); 912 dprintk("RPC: %s...\n", __func__); 913 xprt = xprt_from_sock(sk); 914 if (xprt == NULL) 915 goto out; 916 917 skb = skb_recv_datagram(sk, 0, 1, &err); 918 if (skb == NULL) 919 goto out; 920 921 repsize = skb->len - sizeof(rpc_fraghdr); 922 if (repsize < 4) { 923 dprintk("RPC: impossible RPC reply size %d\n", repsize); 924 goto dropit; 925 } 926 927 /* Copy the XID from the skb... */ 928 xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid); 929 if (xp == NULL) 930 goto dropit; 931 932 /* Look up and lock the request corresponding to the given XID */ 933 spin_lock(&xprt->transport_lock); 934 rovr = xprt_lookup_rqst(xprt, *xp); 935 if (!rovr) 936 goto out_unlock; 937 task = rovr->rq_task; 938 939 copied = rovr->rq_private_buf.buflen; 940 if (copied > repsize) 941 copied = repsize; 942 943 if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) { 944 dprintk("RPC: sk_buff copy failed\n"); 945 goto out_unlock; 946 } 947 948 xprt_complete_rqst(task, copied); 949 950 out_unlock: 951 spin_unlock(&xprt->transport_lock); 952 dropit: 953 skb_free_datagram(sk, skb); 954 out: 955 read_unlock_bh(&sk->sk_callback_lock); 956 } 957 958 /** 959 * xs_udp_data_ready - "data ready" callback for UDP sockets 960 * @sk: socket with data to read 961 * @len: how much data to read 962 * 963 */ 964 static void xs_udp_data_ready(struct sock *sk, int len) 965 { 966 struct rpc_task *task; 967 struct rpc_xprt *xprt; 968 struct rpc_rqst *rovr; 969 struct sk_buff *skb; 970 int err, repsize, copied; 971 u32 _xid; 972 __be32 *xp; 973 974 read_lock_bh(&sk->sk_callback_lock); 975 dprintk("RPC: xs_udp_data_ready...\n"); 976 if (!(xprt = xprt_from_sock(sk))) 977 goto out; 978 979 if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL) 980 goto out; 981 982 repsize = skb->len - sizeof(struct udphdr); 983 if (repsize < 4) { 984 dprintk("RPC: impossible RPC reply size %d!\n", repsize); 985 goto dropit; 986 } 987 988 /* Copy the XID from the skb... */ 989 xp = skb_header_pointer(skb, sizeof(struct udphdr), 990 sizeof(_xid), &_xid); 991 if (xp == NULL) 992 goto dropit; 993 994 /* Look up and lock the request corresponding to the given XID */ 995 spin_lock(&xprt->transport_lock); 996 rovr = xprt_lookup_rqst(xprt, *xp); 997 if (!rovr) 998 goto out_unlock; 999 task = rovr->rq_task; 1000 1001 if ((copied = rovr->rq_private_buf.buflen) > repsize) 1002 copied = repsize; 1003 1004 /* Suck it into the iovec, verify checksum if not done by hw. */ 1005 if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) { 1006 UDPX_INC_STATS_BH(sk, UDP_MIB_INERRORS); 1007 goto out_unlock; 1008 } 1009 1010 UDPX_INC_STATS_BH(sk, UDP_MIB_INDATAGRAMS); 1011 1012 xprt_adjust_cwnd(xprt, task, copied); 1013 xprt_complete_rqst(task, copied); 1014 1015 out_unlock: 1016 spin_unlock(&xprt->transport_lock); 1017 dropit: 1018 skb_free_datagram(sk, skb); 1019 out: 1020 read_unlock_bh(&sk->sk_callback_lock); 1021 } 1022 1023 /* 1024 * Helper function to force a TCP close if the server is sending 1025 * junk and/or it has put us in CLOSE_WAIT 1026 */ 1027 static void xs_tcp_force_close(struct rpc_xprt *xprt) 1028 { 1029 set_bit(XPRT_CONNECTION_CLOSE, &xprt->state); 1030 xprt_force_disconnect(xprt); 1031 } 1032 1033 static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc) 1034 { 1035 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 1036 size_t len, used; 1037 char *p; 1038 1039 p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset; 1040 len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset; 1041 used = xdr_skb_read_bits(desc, p, len); 1042 transport->tcp_offset += used; 1043 if (used != len) 1044 return; 1045 1046 transport->tcp_reclen = ntohl(transport->tcp_fraghdr); 1047 if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT) 1048 transport->tcp_flags |= TCP_RCV_LAST_FRAG; 1049 else 1050 transport->tcp_flags &= ~TCP_RCV_LAST_FRAG; 1051 transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK; 1052 1053 transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR; 1054 transport->tcp_offset = 0; 1055 1056 /* Sanity check of the record length */ 1057 if (unlikely(transport->tcp_reclen < 8)) { 1058 dprintk("RPC: invalid TCP record fragment length\n"); 1059 xs_tcp_force_close(xprt); 1060 return; 1061 } 1062 dprintk("RPC: reading TCP record fragment of length %d\n", 1063 transport->tcp_reclen); 1064 } 1065 1066 static void xs_tcp_check_fraghdr(struct sock_xprt *transport) 1067 { 1068 if (transport->tcp_offset == transport->tcp_reclen) { 1069 transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR; 1070 transport->tcp_offset = 0; 1071 if (transport->tcp_flags & TCP_RCV_LAST_FRAG) { 1072 transport->tcp_flags &= ~TCP_RCV_COPY_DATA; 1073 transport->tcp_flags |= TCP_RCV_COPY_XID; 1074 transport->tcp_copied = 0; 1075 } 1076 } 1077 } 1078 1079 static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc) 1080 { 1081 size_t len, used; 1082 char *p; 1083 1084 len = sizeof(transport->tcp_xid) - transport->tcp_offset; 1085 dprintk("RPC: reading XID (%Zu bytes)\n", len); 1086 p = ((char *) &transport->tcp_xid) + transport->tcp_offset; 1087 used = xdr_skb_read_bits(desc, p, len); 1088 transport->tcp_offset += used; 1089 if (used != len) 1090 return; 1091 transport->tcp_flags &= ~TCP_RCV_COPY_XID; 1092 transport->tcp_flags |= TCP_RCV_READ_CALLDIR; 1093 transport->tcp_copied = 4; 1094 dprintk("RPC: reading %s XID %08x\n", 1095 (transport->tcp_flags & TCP_RPC_REPLY) ? "reply for" 1096 : "request with", 1097 ntohl(transport->tcp_xid)); 1098 xs_tcp_check_fraghdr(transport); 1099 } 1100 1101 static inline void xs_tcp_read_calldir(struct sock_xprt *transport, 1102 struct xdr_skb_reader *desc) 1103 { 1104 size_t len, used; 1105 u32 offset; 1106 char *p; 1107 1108 /* 1109 * We want transport->tcp_offset to be 8 at the end of this routine 1110 * (4 bytes for the xid and 4 bytes for the call/reply flag). 1111 * When this function is called for the first time, 1112 * transport->tcp_offset is 4 (after having already read the xid). 1113 */ 1114 offset = transport->tcp_offset - sizeof(transport->tcp_xid); 1115 len = sizeof(transport->tcp_calldir) - offset; 1116 dprintk("RPC: reading CALL/REPLY flag (%Zu bytes)\n", len); 1117 p = ((char *) &transport->tcp_calldir) + offset; 1118 used = xdr_skb_read_bits(desc, p, len); 1119 transport->tcp_offset += used; 1120 if (used != len) 1121 return; 1122 transport->tcp_flags &= ~TCP_RCV_READ_CALLDIR; 1123 /* 1124 * We don't yet have the XDR buffer, so we will write the calldir 1125 * out after we get the buffer from the 'struct rpc_rqst' 1126 */ 1127 switch (ntohl(transport->tcp_calldir)) { 1128 case RPC_REPLY: 1129 transport->tcp_flags |= TCP_RCV_COPY_CALLDIR; 1130 transport->tcp_flags |= TCP_RCV_COPY_DATA; 1131 transport->tcp_flags |= TCP_RPC_REPLY; 1132 break; 1133 case RPC_CALL: 1134 transport->tcp_flags |= TCP_RCV_COPY_CALLDIR; 1135 transport->tcp_flags |= TCP_RCV_COPY_DATA; 1136 transport->tcp_flags &= ~TCP_RPC_REPLY; 1137 break; 1138 default: 1139 dprintk("RPC: invalid request message type\n"); 1140 xs_tcp_force_close(&transport->xprt); 1141 } 1142 xs_tcp_check_fraghdr(transport); 1143 } 1144 1145 static inline void xs_tcp_read_common(struct rpc_xprt *xprt, 1146 struct xdr_skb_reader *desc, 1147 struct rpc_rqst *req) 1148 { 1149 struct sock_xprt *transport = 1150 container_of(xprt, struct sock_xprt, xprt); 1151 struct xdr_buf *rcvbuf; 1152 size_t len; 1153 ssize_t r; 1154 1155 rcvbuf = &req->rq_private_buf; 1156 1157 if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) { 1158 /* 1159 * Save the RPC direction in the XDR buffer 1160 */ 1161 memcpy(rcvbuf->head[0].iov_base + transport->tcp_copied, 1162 &transport->tcp_calldir, 1163 sizeof(transport->tcp_calldir)); 1164 transport->tcp_copied += sizeof(transport->tcp_calldir); 1165 transport->tcp_flags &= ~TCP_RCV_COPY_CALLDIR; 1166 } 1167 1168 len = desc->count; 1169 if (len > transport->tcp_reclen - transport->tcp_offset) { 1170 struct xdr_skb_reader my_desc; 1171 1172 len = transport->tcp_reclen - transport->tcp_offset; 1173 memcpy(&my_desc, desc, sizeof(my_desc)); 1174 my_desc.count = len; 1175 r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied, 1176 &my_desc, xdr_skb_read_bits); 1177 desc->count -= r; 1178 desc->offset += r; 1179 } else 1180 r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied, 1181 desc, xdr_skb_read_bits); 1182 1183 if (r > 0) { 1184 transport->tcp_copied += r; 1185 transport->tcp_offset += r; 1186 } 1187 if (r != len) { 1188 /* Error when copying to the receive buffer, 1189 * usually because we weren't able to allocate 1190 * additional buffer pages. All we can do now 1191 * is turn off TCP_RCV_COPY_DATA, so the request 1192 * will not receive any additional updates, 1193 * and time out. 1194 * Any remaining data from this record will 1195 * be discarded. 1196 */ 1197 transport->tcp_flags &= ~TCP_RCV_COPY_DATA; 1198 dprintk("RPC: XID %08x truncated request\n", 1199 ntohl(transport->tcp_xid)); 1200 dprintk("RPC: xprt = %p, tcp_copied = %lu, " 1201 "tcp_offset = %u, tcp_reclen = %u\n", 1202 xprt, transport->tcp_copied, 1203 transport->tcp_offset, transport->tcp_reclen); 1204 return; 1205 } 1206 1207 dprintk("RPC: XID %08x read %Zd bytes\n", 1208 ntohl(transport->tcp_xid), r); 1209 dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, " 1210 "tcp_reclen = %u\n", xprt, transport->tcp_copied, 1211 transport->tcp_offset, transport->tcp_reclen); 1212 1213 if (transport->tcp_copied == req->rq_private_buf.buflen) 1214 transport->tcp_flags &= ~TCP_RCV_COPY_DATA; 1215 else if (transport->tcp_offset == transport->tcp_reclen) { 1216 if (transport->tcp_flags & TCP_RCV_LAST_FRAG) 1217 transport->tcp_flags &= ~TCP_RCV_COPY_DATA; 1218 } 1219 } 1220 1221 /* 1222 * Finds the request corresponding to the RPC xid and invokes the common 1223 * tcp read code to read the data. 1224 */ 1225 static inline int xs_tcp_read_reply(struct rpc_xprt *xprt, 1226 struct xdr_skb_reader *desc) 1227 { 1228 struct sock_xprt *transport = 1229 container_of(xprt, struct sock_xprt, xprt); 1230 struct rpc_rqst *req; 1231 1232 dprintk("RPC: read reply XID %08x\n", ntohl(transport->tcp_xid)); 1233 1234 /* Find and lock the request corresponding to this xid */ 1235 spin_lock(&xprt->transport_lock); 1236 req = xprt_lookup_rqst(xprt, transport->tcp_xid); 1237 if (!req) { 1238 dprintk("RPC: XID %08x request not found!\n", 1239 ntohl(transport->tcp_xid)); 1240 spin_unlock(&xprt->transport_lock); 1241 return -1; 1242 } 1243 1244 xs_tcp_read_common(xprt, desc, req); 1245 1246 if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) 1247 xprt_complete_rqst(req->rq_task, transport->tcp_copied); 1248 1249 spin_unlock(&xprt->transport_lock); 1250 return 0; 1251 } 1252 1253 #if defined(CONFIG_SUNRPC_BACKCHANNEL) 1254 /* 1255 * Obtains an rpc_rqst previously allocated and invokes the common 1256 * tcp read code to read the data. The result is placed in the callback 1257 * queue. 1258 * If we're unable to obtain the rpc_rqst we schedule the closing of the 1259 * connection and return -1. 1260 */ 1261 static inline int xs_tcp_read_callback(struct rpc_xprt *xprt, 1262 struct xdr_skb_reader *desc) 1263 { 1264 struct sock_xprt *transport = 1265 container_of(xprt, struct sock_xprt, xprt); 1266 struct rpc_rqst *req; 1267 1268 req = xprt_alloc_bc_request(xprt); 1269 if (req == NULL) { 1270 printk(KERN_WARNING "Callback slot table overflowed\n"); 1271 xprt_force_disconnect(xprt); 1272 return -1; 1273 } 1274 1275 req->rq_xid = transport->tcp_xid; 1276 dprintk("RPC: read callback XID %08x\n", ntohl(req->rq_xid)); 1277 xs_tcp_read_common(xprt, desc, req); 1278 1279 if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) { 1280 struct svc_serv *bc_serv = xprt->bc_serv; 1281 1282 /* 1283 * Add callback request to callback list. The callback 1284 * service sleeps on the sv_cb_waitq waiting for new 1285 * requests. Wake it up after adding enqueing the 1286 * request. 1287 */ 1288 dprintk("RPC: add callback request to list\n"); 1289 spin_lock(&bc_serv->sv_cb_lock); 1290 list_add(&req->rq_bc_list, &bc_serv->sv_cb_list); 1291 spin_unlock(&bc_serv->sv_cb_lock); 1292 wake_up(&bc_serv->sv_cb_waitq); 1293 } 1294 1295 req->rq_private_buf.len = transport->tcp_copied; 1296 1297 return 0; 1298 } 1299 1300 static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, 1301 struct xdr_skb_reader *desc) 1302 { 1303 struct sock_xprt *transport = 1304 container_of(xprt, struct sock_xprt, xprt); 1305 1306 return (transport->tcp_flags & TCP_RPC_REPLY) ? 1307 xs_tcp_read_reply(xprt, desc) : 1308 xs_tcp_read_callback(xprt, desc); 1309 } 1310 #else 1311 static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, 1312 struct xdr_skb_reader *desc) 1313 { 1314 return xs_tcp_read_reply(xprt, desc); 1315 } 1316 #endif /* CONFIG_SUNRPC_BACKCHANNEL */ 1317 1318 /* 1319 * Read data off the transport. This can be either an RPC_CALL or an 1320 * RPC_REPLY. Relay the processing to helper functions. 1321 */ 1322 static void xs_tcp_read_data(struct rpc_xprt *xprt, 1323 struct xdr_skb_reader *desc) 1324 { 1325 struct sock_xprt *transport = 1326 container_of(xprt, struct sock_xprt, xprt); 1327 1328 if (_xs_tcp_read_data(xprt, desc) == 0) 1329 xs_tcp_check_fraghdr(transport); 1330 else { 1331 /* 1332 * The transport_lock protects the request handling. 1333 * There's no need to hold it to update the tcp_flags. 1334 */ 1335 transport->tcp_flags &= ~TCP_RCV_COPY_DATA; 1336 } 1337 } 1338 1339 static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc) 1340 { 1341 size_t len; 1342 1343 len = transport->tcp_reclen - transport->tcp_offset; 1344 if (len > desc->count) 1345 len = desc->count; 1346 desc->count -= len; 1347 desc->offset += len; 1348 transport->tcp_offset += len; 1349 dprintk("RPC: discarded %Zu bytes\n", len); 1350 xs_tcp_check_fraghdr(transport); 1351 } 1352 1353 static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len) 1354 { 1355 struct rpc_xprt *xprt = rd_desc->arg.data; 1356 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 1357 struct xdr_skb_reader desc = { 1358 .skb = skb, 1359 .offset = offset, 1360 .count = len, 1361 }; 1362 1363 dprintk("RPC: xs_tcp_data_recv started\n"); 1364 do { 1365 /* Read in a new fragment marker if necessary */ 1366 /* Can we ever really expect to get completely empty fragments? */ 1367 if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) { 1368 xs_tcp_read_fraghdr(xprt, &desc); 1369 continue; 1370 } 1371 /* Read in the xid if necessary */ 1372 if (transport->tcp_flags & TCP_RCV_COPY_XID) { 1373 xs_tcp_read_xid(transport, &desc); 1374 continue; 1375 } 1376 /* Read in the call/reply flag */ 1377 if (transport->tcp_flags & TCP_RCV_READ_CALLDIR) { 1378 xs_tcp_read_calldir(transport, &desc); 1379 continue; 1380 } 1381 /* Read in the request data */ 1382 if (transport->tcp_flags & TCP_RCV_COPY_DATA) { 1383 xs_tcp_read_data(xprt, &desc); 1384 continue; 1385 } 1386 /* Skip over any trailing bytes on short reads */ 1387 xs_tcp_read_discard(transport, &desc); 1388 } while (desc.count); 1389 dprintk("RPC: xs_tcp_data_recv done\n"); 1390 return len - desc.count; 1391 } 1392 1393 /** 1394 * xs_tcp_data_ready - "data ready" callback for TCP sockets 1395 * @sk: socket with data to read 1396 * @bytes: how much data to read 1397 * 1398 */ 1399 static void xs_tcp_data_ready(struct sock *sk, int bytes) 1400 { 1401 struct rpc_xprt *xprt; 1402 read_descriptor_t rd_desc; 1403 int read; 1404 1405 dprintk("RPC: xs_tcp_data_ready...\n"); 1406 1407 read_lock_bh(&sk->sk_callback_lock); 1408 if (!(xprt = xprt_from_sock(sk))) 1409 goto out; 1410 /* Any data means we had a useful conversation, so 1411 * the we don't need to delay the next reconnect 1412 */ 1413 if (xprt->reestablish_timeout) 1414 xprt->reestablish_timeout = 0; 1415 1416 /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */ 1417 rd_desc.arg.data = xprt; 1418 do { 1419 rd_desc.count = 65536; 1420 read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv); 1421 } while (read > 0); 1422 out: 1423 read_unlock_bh(&sk->sk_callback_lock); 1424 } 1425 1426 /* 1427 * Do the equivalent of linger/linger2 handling for dealing with 1428 * broken servers that don't close the socket in a timely 1429 * fashion 1430 */ 1431 static void xs_tcp_schedule_linger_timeout(struct rpc_xprt *xprt, 1432 unsigned long timeout) 1433 { 1434 struct sock_xprt *transport; 1435 1436 if (xprt_test_and_set_connecting(xprt)) 1437 return; 1438 set_bit(XPRT_CONNECTION_ABORT, &xprt->state); 1439 transport = container_of(xprt, struct sock_xprt, xprt); 1440 queue_delayed_work(rpciod_workqueue, &transport->connect_worker, 1441 timeout); 1442 } 1443 1444 static void xs_tcp_cancel_linger_timeout(struct rpc_xprt *xprt) 1445 { 1446 struct sock_xprt *transport; 1447 1448 transport = container_of(xprt, struct sock_xprt, xprt); 1449 1450 if (!test_bit(XPRT_CONNECTION_ABORT, &xprt->state) || 1451 !cancel_delayed_work(&transport->connect_worker)) 1452 return; 1453 clear_bit(XPRT_CONNECTION_ABORT, &xprt->state); 1454 xprt_clear_connecting(xprt); 1455 } 1456 1457 static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt) 1458 { 1459 smp_mb__before_clear_bit(); 1460 clear_bit(XPRT_CONNECTION_ABORT, &xprt->state); 1461 clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state); 1462 clear_bit(XPRT_CLOSE_WAIT, &xprt->state); 1463 clear_bit(XPRT_CLOSING, &xprt->state); 1464 smp_mb__after_clear_bit(); 1465 } 1466 1467 static void xs_sock_mark_closed(struct rpc_xprt *xprt) 1468 { 1469 xs_sock_reset_connection_flags(xprt); 1470 /* Mark transport as closed and wake up all pending tasks */ 1471 xprt_disconnect_done(xprt); 1472 } 1473 1474 /** 1475 * xs_tcp_state_change - callback to handle TCP socket state changes 1476 * @sk: socket whose state has changed 1477 * 1478 */ 1479 static void xs_tcp_state_change(struct sock *sk) 1480 { 1481 struct rpc_xprt *xprt; 1482 1483 read_lock_bh(&sk->sk_callback_lock); 1484 if (!(xprt = xprt_from_sock(sk))) 1485 goto out; 1486 dprintk("RPC: xs_tcp_state_change client %p...\n", xprt); 1487 dprintk("RPC: state %x conn %d dead %d zapped %d sk_shutdown %d\n", 1488 sk->sk_state, xprt_connected(xprt), 1489 sock_flag(sk, SOCK_DEAD), 1490 sock_flag(sk, SOCK_ZAPPED), 1491 sk->sk_shutdown); 1492 1493 trace_rpc_socket_state_change(xprt, sk->sk_socket); 1494 switch (sk->sk_state) { 1495 case TCP_ESTABLISHED: 1496 spin_lock(&xprt->transport_lock); 1497 if (!xprt_test_and_set_connected(xprt)) { 1498 struct sock_xprt *transport = container_of(xprt, 1499 struct sock_xprt, xprt); 1500 1501 /* Reset TCP record info */ 1502 transport->tcp_offset = 0; 1503 transport->tcp_reclen = 0; 1504 transport->tcp_copied = 0; 1505 transport->tcp_flags = 1506 TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID; 1507 xprt->connect_cookie++; 1508 1509 xprt_wake_pending_tasks(xprt, -EAGAIN); 1510 } 1511 spin_unlock(&xprt->transport_lock); 1512 break; 1513 case TCP_FIN_WAIT1: 1514 /* The client initiated a shutdown of the socket */ 1515 xprt->connect_cookie++; 1516 xprt->reestablish_timeout = 0; 1517 set_bit(XPRT_CLOSING, &xprt->state); 1518 smp_mb__before_clear_bit(); 1519 clear_bit(XPRT_CONNECTED, &xprt->state); 1520 clear_bit(XPRT_CLOSE_WAIT, &xprt->state); 1521 smp_mb__after_clear_bit(); 1522 xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout); 1523 break; 1524 case TCP_CLOSE_WAIT: 1525 /* The server initiated a shutdown of the socket */ 1526 xprt->connect_cookie++; 1527 clear_bit(XPRT_CONNECTED, &xprt->state); 1528 xs_tcp_force_close(xprt); 1529 case TCP_CLOSING: 1530 /* 1531 * If the server closed down the connection, make sure that 1532 * we back off before reconnecting 1533 */ 1534 if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO) 1535 xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO; 1536 break; 1537 case TCP_LAST_ACK: 1538 set_bit(XPRT_CLOSING, &xprt->state); 1539 xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout); 1540 smp_mb__before_clear_bit(); 1541 clear_bit(XPRT_CONNECTED, &xprt->state); 1542 smp_mb__after_clear_bit(); 1543 break; 1544 case TCP_CLOSE: 1545 xs_tcp_cancel_linger_timeout(xprt); 1546 xs_sock_mark_closed(xprt); 1547 } 1548 out: 1549 read_unlock_bh(&sk->sk_callback_lock); 1550 } 1551 1552 static void xs_write_space(struct sock *sk) 1553 { 1554 struct socket *sock; 1555 struct rpc_xprt *xprt; 1556 1557 if (unlikely(!(sock = sk->sk_socket))) 1558 return; 1559 clear_bit(SOCK_NOSPACE, &sock->flags); 1560 1561 if (unlikely(!(xprt = xprt_from_sock(sk)))) 1562 return; 1563 if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0) 1564 return; 1565 1566 xprt_write_space(xprt); 1567 } 1568 1569 /** 1570 * xs_udp_write_space - callback invoked when socket buffer space 1571 * becomes available 1572 * @sk: socket whose state has changed 1573 * 1574 * Called when more output buffer space is available for this socket. 1575 * We try not to wake our writers until they can make "significant" 1576 * progress, otherwise we'll waste resources thrashing kernel_sendmsg 1577 * with a bunch of small requests. 1578 */ 1579 static void xs_udp_write_space(struct sock *sk) 1580 { 1581 read_lock_bh(&sk->sk_callback_lock); 1582 1583 /* from net/core/sock.c:sock_def_write_space */ 1584 if (sock_writeable(sk)) 1585 xs_write_space(sk); 1586 1587 read_unlock_bh(&sk->sk_callback_lock); 1588 } 1589 1590 /** 1591 * xs_tcp_write_space - callback invoked when socket buffer space 1592 * becomes available 1593 * @sk: socket whose state has changed 1594 * 1595 * Called when more output buffer space is available for this socket. 1596 * We try not to wake our writers until they can make "significant" 1597 * progress, otherwise we'll waste resources thrashing kernel_sendmsg 1598 * with a bunch of small requests. 1599 */ 1600 static void xs_tcp_write_space(struct sock *sk) 1601 { 1602 read_lock_bh(&sk->sk_callback_lock); 1603 1604 /* from net/core/stream.c:sk_stream_write_space */ 1605 if (sk_stream_is_writeable(sk)) 1606 xs_write_space(sk); 1607 1608 read_unlock_bh(&sk->sk_callback_lock); 1609 } 1610 1611 static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt) 1612 { 1613 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 1614 struct sock *sk = transport->inet; 1615 1616 if (transport->rcvsize) { 1617 sk->sk_userlocks |= SOCK_RCVBUF_LOCK; 1618 sk->sk_rcvbuf = transport->rcvsize * xprt->max_reqs * 2; 1619 } 1620 if (transport->sndsize) { 1621 sk->sk_userlocks |= SOCK_SNDBUF_LOCK; 1622 sk->sk_sndbuf = transport->sndsize * xprt->max_reqs * 2; 1623 sk->sk_write_space(sk); 1624 } 1625 } 1626 1627 /** 1628 * xs_udp_set_buffer_size - set send and receive limits 1629 * @xprt: generic transport 1630 * @sndsize: requested size of send buffer, in bytes 1631 * @rcvsize: requested size of receive buffer, in bytes 1632 * 1633 * Set socket send and receive buffer size limits. 1634 */ 1635 static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize) 1636 { 1637 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 1638 1639 transport->sndsize = 0; 1640 if (sndsize) 1641 transport->sndsize = sndsize + 1024; 1642 transport->rcvsize = 0; 1643 if (rcvsize) 1644 transport->rcvsize = rcvsize + 1024; 1645 1646 xs_udp_do_set_buffer_size(xprt); 1647 } 1648 1649 /** 1650 * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport 1651 * @task: task that timed out 1652 * 1653 * Adjust the congestion window after a retransmit timeout has occurred. 1654 */ 1655 static void xs_udp_timer(struct rpc_xprt *xprt, struct rpc_task *task) 1656 { 1657 xprt_adjust_cwnd(xprt, task, -ETIMEDOUT); 1658 } 1659 1660 static unsigned short xs_get_random_port(void) 1661 { 1662 unsigned short range = xprt_max_resvport - xprt_min_resvport; 1663 unsigned short rand = (unsigned short) net_random() % range; 1664 return rand + xprt_min_resvport; 1665 } 1666 1667 /** 1668 * xs_set_port - reset the port number in the remote endpoint address 1669 * @xprt: generic transport 1670 * @port: new port number 1671 * 1672 */ 1673 static void xs_set_port(struct rpc_xprt *xprt, unsigned short port) 1674 { 1675 dprintk("RPC: setting port for xprt %p to %u\n", xprt, port); 1676 1677 rpc_set_port(xs_addr(xprt), port); 1678 xs_update_peer_port(xprt); 1679 } 1680 1681 static unsigned short xs_get_srcport(struct sock_xprt *transport) 1682 { 1683 unsigned short port = transport->srcport; 1684 1685 if (port == 0 && transport->xprt.resvport) 1686 port = xs_get_random_port(); 1687 return port; 1688 } 1689 1690 static unsigned short xs_next_srcport(struct sock_xprt *transport, unsigned short port) 1691 { 1692 if (transport->srcport != 0) 1693 transport->srcport = 0; 1694 if (!transport->xprt.resvport) 1695 return 0; 1696 if (port <= xprt_min_resvport || port > xprt_max_resvport) 1697 return xprt_max_resvport; 1698 return --port; 1699 } 1700 static int xs_bind(struct sock_xprt *transport, struct socket *sock) 1701 { 1702 struct sockaddr_storage myaddr; 1703 int err, nloop = 0; 1704 unsigned short port = xs_get_srcport(transport); 1705 unsigned short last; 1706 1707 memcpy(&myaddr, &transport->srcaddr, transport->xprt.addrlen); 1708 do { 1709 rpc_set_port((struct sockaddr *)&myaddr, port); 1710 err = kernel_bind(sock, (struct sockaddr *)&myaddr, 1711 transport->xprt.addrlen); 1712 if (port == 0) 1713 break; 1714 if (err == 0) { 1715 transport->srcport = port; 1716 break; 1717 } 1718 last = port; 1719 port = xs_next_srcport(transport, port); 1720 if (port > last) 1721 nloop++; 1722 } while (err == -EADDRINUSE && nloop != 2); 1723 1724 if (myaddr.ss_family == AF_INET) 1725 dprintk("RPC: %s %pI4:%u: %s (%d)\n", __func__, 1726 &((struct sockaddr_in *)&myaddr)->sin_addr, 1727 port, err ? "failed" : "ok", err); 1728 else 1729 dprintk("RPC: %s %pI6:%u: %s (%d)\n", __func__, 1730 &((struct sockaddr_in6 *)&myaddr)->sin6_addr, 1731 port, err ? "failed" : "ok", err); 1732 return err; 1733 } 1734 1735 /* 1736 * We don't support autobind on AF_LOCAL sockets 1737 */ 1738 static void xs_local_rpcbind(struct rpc_task *task) 1739 { 1740 rcu_read_lock(); 1741 xprt_set_bound(rcu_dereference(task->tk_client->cl_xprt)); 1742 rcu_read_unlock(); 1743 } 1744 1745 static void xs_local_set_port(struct rpc_xprt *xprt, unsigned short port) 1746 { 1747 } 1748 1749 #ifdef CONFIG_DEBUG_LOCK_ALLOC 1750 static struct lock_class_key xs_key[2]; 1751 static struct lock_class_key xs_slock_key[2]; 1752 1753 static inline void xs_reclassify_socketu(struct socket *sock) 1754 { 1755 struct sock *sk = sock->sk; 1756 1757 sock_lock_init_class_and_name(sk, "slock-AF_LOCAL-RPC", 1758 &xs_slock_key[1], "sk_lock-AF_LOCAL-RPC", &xs_key[1]); 1759 } 1760 1761 static inline void xs_reclassify_socket4(struct socket *sock) 1762 { 1763 struct sock *sk = sock->sk; 1764 1765 sock_lock_init_class_and_name(sk, "slock-AF_INET-RPC", 1766 &xs_slock_key[0], "sk_lock-AF_INET-RPC", &xs_key[0]); 1767 } 1768 1769 static inline void xs_reclassify_socket6(struct socket *sock) 1770 { 1771 struct sock *sk = sock->sk; 1772 1773 sock_lock_init_class_and_name(sk, "slock-AF_INET6-RPC", 1774 &xs_slock_key[1], "sk_lock-AF_INET6-RPC", &xs_key[1]); 1775 } 1776 1777 static inline void xs_reclassify_socket(int family, struct socket *sock) 1778 { 1779 WARN_ON_ONCE(sock_owned_by_user(sock->sk)); 1780 if (sock_owned_by_user(sock->sk)) 1781 return; 1782 1783 switch (family) { 1784 case AF_LOCAL: 1785 xs_reclassify_socketu(sock); 1786 break; 1787 case AF_INET: 1788 xs_reclassify_socket4(sock); 1789 break; 1790 case AF_INET6: 1791 xs_reclassify_socket6(sock); 1792 break; 1793 } 1794 } 1795 #else 1796 static inline void xs_reclassify_socketu(struct socket *sock) 1797 { 1798 } 1799 1800 static inline void xs_reclassify_socket4(struct socket *sock) 1801 { 1802 } 1803 1804 static inline void xs_reclassify_socket6(struct socket *sock) 1805 { 1806 } 1807 1808 static inline void xs_reclassify_socket(int family, struct socket *sock) 1809 { 1810 } 1811 #endif 1812 1813 static void xs_dummy_setup_socket(struct work_struct *work) 1814 { 1815 } 1816 1817 static struct socket *xs_create_sock(struct rpc_xprt *xprt, 1818 struct sock_xprt *transport, int family, int type, int protocol) 1819 { 1820 struct socket *sock; 1821 int err; 1822 1823 err = __sock_create(xprt->xprt_net, family, type, protocol, &sock, 1); 1824 if (err < 0) { 1825 dprintk("RPC: can't create %d transport socket (%d).\n", 1826 protocol, -err); 1827 goto out; 1828 } 1829 xs_reclassify_socket(family, sock); 1830 1831 err = xs_bind(transport, sock); 1832 if (err) { 1833 sock_release(sock); 1834 goto out; 1835 } 1836 1837 return sock; 1838 out: 1839 return ERR_PTR(err); 1840 } 1841 1842 static int xs_local_finish_connecting(struct rpc_xprt *xprt, 1843 struct socket *sock) 1844 { 1845 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, 1846 xprt); 1847 1848 if (!transport->inet) { 1849 struct sock *sk = sock->sk; 1850 1851 write_lock_bh(&sk->sk_callback_lock); 1852 1853 xs_save_old_callbacks(transport, sk); 1854 1855 sk->sk_user_data = xprt; 1856 sk->sk_data_ready = xs_local_data_ready; 1857 sk->sk_write_space = xs_udp_write_space; 1858 sk->sk_allocation = GFP_ATOMIC; 1859 1860 xprt_clear_connected(xprt); 1861 1862 /* Reset to new socket */ 1863 transport->sock = sock; 1864 transport->inet = sk; 1865 1866 write_unlock_bh(&sk->sk_callback_lock); 1867 } 1868 1869 /* Tell the socket layer to start connecting... */ 1870 xprt->stat.connect_count++; 1871 xprt->stat.connect_start = jiffies; 1872 return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, 0); 1873 } 1874 1875 /** 1876 * xs_local_setup_socket - create AF_LOCAL socket, connect to a local endpoint 1877 * @xprt: RPC transport to connect 1878 * @transport: socket transport to connect 1879 * @create_sock: function to create a socket of the correct type 1880 */ 1881 static int xs_local_setup_socket(struct sock_xprt *transport) 1882 { 1883 struct rpc_xprt *xprt = &transport->xprt; 1884 struct socket *sock; 1885 int status = -EIO; 1886 1887 current->flags |= PF_FSTRANS; 1888 1889 clear_bit(XPRT_CONNECTION_ABORT, &xprt->state); 1890 status = __sock_create(xprt->xprt_net, AF_LOCAL, 1891 SOCK_STREAM, 0, &sock, 1); 1892 if (status < 0) { 1893 dprintk("RPC: can't create AF_LOCAL " 1894 "transport socket (%d).\n", -status); 1895 goto out; 1896 } 1897 xs_reclassify_socketu(sock); 1898 1899 dprintk("RPC: worker connecting xprt %p via AF_LOCAL to %s\n", 1900 xprt, xprt->address_strings[RPC_DISPLAY_ADDR]); 1901 1902 status = xs_local_finish_connecting(xprt, sock); 1903 trace_rpc_socket_connect(xprt, sock, status); 1904 switch (status) { 1905 case 0: 1906 dprintk("RPC: xprt %p connected to %s\n", 1907 xprt, xprt->address_strings[RPC_DISPLAY_ADDR]); 1908 xprt_set_connected(xprt); 1909 break; 1910 case -ENOENT: 1911 dprintk("RPC: xprt %p: socket %s does not exist\n", 1912 xprt, xprt->address_strings[RPC_DISPLAY_ADDR]); 1913 break; 1914 case -ECONNREFUSED: 1915 dprintk("RPC: xprt %p: connection refused for %s\n", 1916 xprt, xprt->address_strings[RPC_DISPLAY_ADDR]); 1917 break; 1918 default: 1919 printk(KERN_ERR "%s: unhandled error (%d) connecting to %s\n", 1920 __func__, -status, 1921 xprt->address_strings[RPC_DISPLAY_ADDR]); 1922 } 1923 1924 out: 1925 xprt_clear_connecting(xprt); 1926 xprt_wake_pending_tasks(xprt, status); 1927 current->flags &= ~PF_FSTRANS; 1928 return status; 1929 } 1930 1931 static void xs_local_connect(struct rpc_xprt *xprt, struct rpc_task *task) 1932 { 1933 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 1934 int ret; 1935 1936 if (RPC_IS_ASYNC(task)) { 1937 /* 1938 * We want the AF_LOCAL connect to be resolved in the 1939 * filesystem namespace of the process making the rpc 1940 * call. Thus we connect synchronously. 1941 * 1942 * If we want to support asynchronous AF_LOCAL calls, 1943 * we'll need to figure out how to pass a namespace to 1944 * connect. 1945 */ 1946 rpc_exit(task, -ENOTCONN); 1947 return; 1948 } 1949 ret = xs_local_setup_socket(transport); 1950 if (ret && !RPC_IS_SOFTCONN(task)) 1951 msleep_interruptible(15000); 1952 } 1953 1954 #ifdef CONFIG_SUNRPC_SWAP 1955 static void xs_set_memalloc(struct rpc_xprt *xprt) 1956 { 1957 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, 1958 xprt); 1959 1960 if (xprt->swapper) 1961 sk_set_memalloc(transport->inet); 1962 } 1963 1964 /** 1965 * xs_swapper - Tag this transport as being used for swap. 1966 * @xprt: transport to tag 1967 * @enable: enable/disable 1968 * 1969 */ 1970 int xs_swapper(struct rpc_xprt *xprt, int enable) 1971 { 1972 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, 1973 xprt); 1974 int err = 0; 1975 1976 if (enable) { 1977 xprt->swapper++; 1978 xs_set_memalloc(xprt); 1979 } else if (xprt->swapper) { 1980 xprt->swapper--; 1981 sk_clear_memalloc(transport->inet); 1982 } 1983 1984 return err; 1985 } 1986 EXPORT_SYMBOL_GPL(xs_swapper); 1987 #else 1988 static void xs_set_memalloc(struct rpc_xprt *xprt) 1989 { 1990 } 1991 #endif 1992 1993 static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) 1994 { 1995 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 1996 1997 if (!transport->inet) { 1998 struct sock *sk = sock->sk; 1999 2000 write_lock_bh(&sk->sk_callback_lock); 2001 2002 xs_save_old_callbacks(transport, sk); 2003 2004 sk->sk_user_data = xprt; 2005 sk->sk_data_ready = xs_udp_data_ready; 2006 sk->sk_write_space = xs_udp_write_space; 2007 sk->sk_no_check = UDP_CSUM_NORCV; 2008 sk->sk_allocation = GFP_ATOMIC; 2009 2010 xprt_set_connected(xprt); 2011 2012 /* Reset to new socket */ 2013 transport->sock = sock; 2014 transport->inet = sk; 2015 2016 xs_set_memalloc(xprt); 2017 2018 write_unlock_bh(&sk->sk_callback_lock); 2019 } 2020 xs_udp_do_set_buffer_size(xprt); 2021 } 2022 2023 static void xs_udp_setup_socket(struct work_struct *work) 2024 { 2025 struct sock_xprt *transport = 2026 container_of(work, struct sock_xprt, connect_worker.work); 2027 struct rpc_xprt *xprt = &transport->xprt; 2028 struct socket *sock = transport->sock; 2029 int status = -EIO; 2030 2031 current->flags |= PF_FSTRANS; 2032 2033 /* Start by resetting any existing state */ 2034 xs_reset_transport(transport); 2035 sock = xs_create_sock(xprt, transport, 2036 xs_addr(xprt)->sa_family, SOCK_DGRAM, IPPROTO_UDP); 2037 if (IS_ERR(sock)) 2038 goto out; 2039 2040 dprintk("RPC: worker connecting xprt %p via %s to " 2041 "%s (port %s)\n", xprt, 2042 xprt->address_strings[RPC_DISPLAY_PROTO], 2043 xprt->address_strings[RPC_DISPLAY_ADDR], 2044 xprt->address_strings[RPC_DISPLAY_PORT]); 2045 2046 xs_udp_finish_connecting(xprt, sock); 2047 trace_rpc_socket_connect(xprt, sock, 0); 2048 status = 0; 2049 out: 2050 xprt_clear_connecting(xprt); 2051 xprt_wake_pending_tasks(xprt, status); 2052 current->flags &= ~PF_FSTRANS; 2053 } 2054 2055 /* 2056 * We need to preserve the port number so the reply cache on the server can 2057 * find our cached RPC replies when we get around to reconnecting. 2058 */ 2059 static void xs_abort_connection(struct sock_xprt *transport) 2060 { 2061 int result; 2062 struct sockaddr any; 2063 2064 dprintk("RPC: disconnecting xprt %p to reuse port\n", transport); 2065 2066 /* 2067 * Disconnect the transport socket by doing a connect operation 2068 * with AF_UNSPEC. This should return immediately... 2069 */ 2070 memset(&any, 0, sizeof(any)); 2071 any.sa_family = AF_UNSPEC; 2072 result = kernel_connect(transport->sock, &any, sizeof(any), 0); 2073 trace_rpc_socket_reset_connection(&transport->xprt, 2074 transport->sock, result); 2075 if (!result) 2076 xs_sock_reset_connection_flags(&transport->xprt); 2077 dprintk("RPC: AF_UNSPEC connect return code %d\n", result); 2078 } 2079 2080 static void xs_tcp_reuse_connection(struct sock_xprt *transport) 2081 { 2082 unsigned int state = transport->inet->sk_state; 2083 2084 if (state == TCP_CLOSE && transport->sock->state == SS_UNCONNECTED) { 2085 /* we don't need to abort the connection if the socket 2086 * hasn't undergone a shutdown 2087 */ 2088 if (transport->inet->sk_shutdown == 0) 2089 return; 2090 dprintk("RPC: %s: TCP_CLOSEd and sk_shutdown set to %d\n", 2091 __func__, transport->inet->sk_shutdown); 2092 } 2093 if ((1 << state) & (TCPF_ESTABLISHED|TCPF_SYN_SENT)) { 2094 /* we don't need to abort the connection if the socket 2095 * hasn't undergone a shutdown 2096 */ 2097 if (transport->inet->sk_shutdown == 0) 2098 return; 2099 dprintk("RPC: %s: ESTABLISHED/SYN_SENT " 2100 "sk_shutdown set to %d\n", 2101 __func__, transport->inet->sk_shutdown); 2102 } 2103 xs_abort_connection(transport); 2104 } 2105 2106 static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) 2107 { 2108 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 2109 int ret = -ENOTCONN; 2110 2111 if (!transport->inet) { 2112 struct sock *sk = sock->sk; 2113 unsigned int keepidle = xprt->timeout->to_initval / HZ; 2114 unsigned int keepcnt = xprt->timeout->to_retries + 1; 2115 unsigned int opt_on = 1; 2116 2117 /* TCP Keepalive options */ 2118 kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, 2119 (char *)&opt_on, sizeof(opt_on)); 2120 kernel_setsockopt(sock, SOL_TCP, TCP_KEEPIDLE, 2121 (char *)&keepidle, sizeof(keepidle)); 2122 kernel_setsockopt(sock, SOL_TCP, TCP_KEEPINTVL, 2123 (char *)&keepidle, sizeof(keepidle)); 2124 kernel_setsockopt(sock, SOL_TCP, TCP_KEEPCNT, 2125 (char *)&keepcnt, sizeof(keepcnt)); 2126 2127 write_lock_bh(&sk->sk_callback_lock); 2128 2129 xs_save_old_callbacks(transport, sk); 2130 2131 sk->sk_user_data = xprt; 2132 sk->sk_data_ready = xs_tcp_data_ready; 2133 sk->sk_state_change = xs_tcp_state_change; 2134 sk->sk_write_space = xs_tcp_write_space; 2135 sk->sk_allocation = GFP_ATOMIC; 2136 2137 /* socket options */ 2138 sk->sk_userlocks |= SOCK_BINDPORT_LOCK; 2139 sock_reset_flag(sk, SOCK_LINGER); 2140 tcp_sk(sk)->linger2 = 0; 2141 tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF; 2142 2143 xprt_clear_connected(xprt); 2144 2145 /* Reset to new socket */ 2146 transport->sock = sock; 2147 transport->inet = sk; 2148 2149 write_unlock_bh(&sk->sk_callback_lock); 2150 } 2151 2152 if (!xprt_bound(xprt)) 2153 goto out; 2154 2155 xs_set_memalloc(xprt); 2156 2157 /* Tell the socket layer to start connecting... */ 2158 xprt->stat.connect_count++; 2159 xprt->stat.connect_start = jiffies; 2160 ret = kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK); 2161 switch (ret) { 2162 case 0: 2163 case -EINPROGRESS: 2164 /* SYN_SENT! */ 2165 if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO) 2166 xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO; 2167 } 2168 out: 2169 return ret; 2170 } 2171 2172 /** 2173 * xs_tcp_setup_socket - create a TCP socket and connect to a remote endpoint 2174 * @xprt: RPC transport to connect 2175 * @transport: socket transport to connect 2176 * @create_sock: function to create a socket of the correct type 2177 * 2178 * Invoked by a work queue tasklet. 2179 */ 2180 static void xs_tcp_setup_socket(struct work_struct *work) 2181 { 2182 struct sock_xprt *transport = 2183 container_of(work, struct sock_xprt, connect_worker.work); 2184 struct socket *sock = transport->sock; 2185 struct rpc_xprt *xprt = &transport->xprt; 2186 int status = -EIO; 2187 2188 current->flags |= PF_FSTRANS; 2189 2190 if (!sock) { 2191 clear_bit(XPRT_CONNECTION_ABORT, &xprt->state); 2192 sock = xs_create_sock(xprt, transport, 2193 xs_addr(xprt)->sa_family, SOCK_STREAM, IPPROTO_TCP); 2194 if (IS_ERR(sock)) { 2195 status = PTR_ERR(sock); 2196 goto out; 2197 } 2198 } else { 2199 int abort_and_exit; 2200 2201 abort_and_exit = test_and_clear_bit(XPRT_CONNECTION_ABORT, 2202 &xprt->state); 2203 /* "close" the socket, preserving the local port */ 2204 xs_tcp_reuse_connection(transport); 2205 2206 if (abort_and_exit) 2207 goto out_eagain; 2208 } 2209 2210 dprintk("RPC: worker connecting xprt %p via %s to " 2211 "%s (port %s)\n", xprt, 2212 xprt->address_strings[RPC_DISPLAY_PROTO], 2213 xprt->address_strings[RPC_DISPLAY_ADDR], 2214 xprt->address_strings[RPC_DISPLAY_PORT]); 2215 2216 status = xs_tcp_finish_connecting(xprt, sock); 2217 trace_rpc_socket_connect(xprt, sock, status); 2218 dprintk("RPC: %p connect status %d connected %d sock state %d\n", 2219 xprt, -status, xprt_connected(xprt), 2220 sock->sk->sk_state); 2221 switch (status) { 2222 default: 2223 printk("%s: connect returned unhandled error %d\n", 2224 __func__, status); 2225 case -EADDRNOTAVAIL: 2226 /* We're probably in TIME_WAIT. Get rid of existing socket, 2227 * and retry 2228 */ 2229 xs_tcp_force_close(xprt); 2230 break; 2231 case 0: 2232 case -EINPROGRESS: 2233 case -EALREADY: 2234 xprt_clear_connecting(xprt); 2235 current->flags &= ~PF_FSTRANS; 2236 return; 2237 case -EINVAL: 2238 /* Happens, for instance, if the user specified a link 2239 * local IPv6 address without a scope-id. 2240 */ 2241 case -ECONNREFUSED: 2242 case -ECONNRESET: 2243 case -ENETUNREACH: 2244 /* retry with existing socket, after a delay */ 2245 goto out; 2246 } 2247 out_eagain: 2248 status = -EAGAIN; 2249 out: 2250 xprt_clear_connecting(xprt); 2251 xprt_wake_pending_tasks(xprt, status); 2252 current->flags &= ~PF_FSTRANS; 2253 } 2254 2255 /** 2256 * xs_connect - connect a socket to a remote endpoint 2257 * @xprt: pointer to transport structure 2258 * @task: address of RPC task that manages state of connect request 2259 * 2260 * TCP: If the remote end dropped the connection, delay reconnecting. 2261 * 2262 * UDP socket connects are synchronous, but we use a work queue anyway 2263 * to guarantee that even unprivileged user processes can set up a 2264 * socket on a privileged port. 2265 * 2266 * If a UDP socket connect fails, the delay behavior here prevents 2267 * retry floods (hard mounts). 2268 */ 2269 static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task) 2270 { 2271 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 2272 2273 if (transport->sock != NULL && !RPC_IS_SOFTCONN(task)) { 2274 dprintk("RPC: xs_connect delayed xprt %p for %lu " 2275 "seconds\n", 2276 xprt, xprt->reestablish_timeout / HZ); 2277 queue_delayed_work(rpciod_workqueue, 2278 &transport->connect_worker, 2279 xprt->reestablish_timeout); 2280 xprt->reestablish_timeout <<= 1; 2281 if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO) 2282 xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO; 2283 if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO) 2284 xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO; 2285 } else { 2286 dprintk("RPC: xs_connect scheduled xprt %p\n", xprt); 2287 queue_delayed_work(rpciod_workqueue, 2288 &transport->connect_worker, 0); 2289 } 2290 } 2291 2292 /** 2293 * xs_local_print_stats - display AF_LOCAL socket-specifc stats 2294 * @xprt: rpc_xprt struct containing statistics 2295 * @seq: output file 2296 * 2297 */ 2298 static void xs_local_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) 2299 { 2300 long idle_time = 0; 2301 2302 if (xprt_connected(xprt)) 2303 idle_time = (long)(jiffies - xprt->last_used) / HZ; 2304 2305 seq_printf(seq, "\txprt:\tlocal %lu %lu %lu %ld %lu %lu %lu " 2306 "%llu %llu %lu %llu %llu\n", 2307 xprt->stat.bind_count, 2308 xprt->stat.connect_count, 2309 xprt->stat.connect_time, 2310 idle_time, 2311 xprt->stat.sends, 2312 xprt->stat.recvs, 2313 xprt->stat.bad_xids, 2314 xprt->stat.req_u, 2315 xprt->stat.bklog_u, 2316 xprt->stat.max_slots, 2317 xprt->stat.sending_u, 2318 xprt->stat.pending_u); 2319 } 2320 2321 /** 2322 * xs_udp_print_stats - display UDP socket-specifc stats 2323 * @xprt: rpc_xprt struct containing statistics 2324 * @seq: output file 2325 * 2326 */ 2327 static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) 2328 { 2329 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 2330 2331 seq_printf(seq, "\txprt:\tudp %u %lu %lu %lu %lu %llu %llu " 2332 "%lu %llu %llu\n", 2333 transport->srcport, 2334 xprt->stat.bind_count, 2335 xprt->stat.sends, 2336 xprt->stat.recvs, 2337 xprt->stat.bad_xids, 2338 xprt->stat.req_u, 2339 xprt->stat.bklog_u, 2340 xprt->stat.max_slots, 2341 xprt->stat.sending_u, 2342 xprt->stat.pending_u); 2343 } 2344 2345 /** 2346 * xs_tcp_print_stats - display TCP socket-specifc stats 2347 * @xprt: rpc_xprt struct containing statistics 2348 * @seq: output file 2349 * 2350 */ 2351 static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) 2352 { 2353 struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); 2354 long idle_time = 0; 2355 2356 if (xprt_connected(xprt)) 2357 idle_time = (long)(jiffies - xprt->last_used) / HZ; 2358 2359 seq_printf(seq, "\txprt:\ttcp %u %lu %lu %lu %ld %lu %lu %lu " 2360 "%llu %llu %lu %llu %llu\n", 2361 transport->srcport, 2362 xprt->stat.bind_count, 2363 xprt->stat.connect_count, 2364 xprt->stat.connect_time, 2365 idle_time, 2366 xprt->stat.sends, 2367 xprt->stat.recvs, 2368 xprt->stat.bad_xids, 2369 xprt->stat.req_u, 2370 xprt->stat.bklog_u, 2371 xprt->stat.max_slots, 2372 xprt->stat.sending_u, 2373 xprt->stat.pending_u); 2374 } 2375 2376 /* 2377 * Allocate a bunch of pages for a scratch buffer for the rpc code. The reason 2378 * we allocate pages instead doing a kmalloc like rpc_malloc is because we want 2379 * to use the server side send routines. 2380 */ 2381 static void *bc_malloc(struct rpc_task *task, size_t size) 2382 { 2383 struct page *page; 2384 struct rpc_buffer *buf; 2385 2386 WARN_ON_ONCE(size > PAGE_SIZE - sizeof(struct rpc_buffer)); 2387 if (size > PAGE_SIZE - sizeof(struct rpc_buffer)) 2388 return NULL; 2389 2390 page = alloc_page(GFP_KERNEL); 2391 if (!page) 2392 return NULL; 2393 2394 buf = page_address(page); 2395 buf->len = PAGE_SIZE; 2396 2397 return buf->data; 2398 } 2399 2400 /* 2401 * Free the space allocated in the bc_alloc routine 2402 */ 2403 static void bc_free(void *buffer) 2404 { 2405 struct rpc_buffer *buf; 2406 2407 if (!buffer) 2408 return; 2409 2410 buf = container_of(buffer, struct rpc_buffer, data); 2411 free_page((unsigned long)buf); 2412 } 2413 2414 /* 2415 * Use the svc_sock to send the callback. Must be called with svsk->sk_mutex 2416 * held. Borrows heavily from svc_tcp_sendto and xs_tcp_send_request. 2417 */ 2418 static int bc_sendto(struct rpc_rqst *req) 2419 { 2420 int len; 2421 struct xdr_buf *xbufp = &req->rq_snd_buf; 2422 struct rpc_xprt *xprt = req->rq_xprt; 2423 struct sock_xprt *transport = 2424 container_of(xprt, struct sock_xprt, xprt); 2425 struct socket *sock = transport->sock; 2426 unsigned long headoff; 2427 unsigned long tailoff; 2428 2429 xs_encode_stream_record_marker(xbufp); 2430 2431 tailoff = (unsigned long)xbufp->tail[0].iov_base & ~PAGE_MASK; 2432 headoff = (unsigned long)xbufp->head[0].iov_base & ~PAGE_MASK; 2433 len = svc_send_common(sock, xbufp, 2434 virt_to_page(xbufp->head[0].iov_base), headoff, 2435 xbufp->tail[0].iov_base, tailoff); 2436 2437 if (len != xbufp->len) { 2438 printk(KERN_NOTICE "Error sending entire callback!\n"); 2439 len = -EAGAIN; 2440 } 2441 2442 return len; 2443 } 2444 2445 /* 2446 * The send routine. Borrows from svc_send 2447 */ 2448 static int bc_send_request(struct rpc_task *task) 2449 { 2450 struct rpc_rqst *req = task->tk_rqstp; 2451 struct svc_xprt *xprt; 2452 u32 len; 2453 2454 dprintk("sending request with xid: %08x\n", ntohl(req->rq_xid)); 2455 /* 2456 * Get the server socket associated with this callback xprt 2457 */ 2458 xprt = req->rq_xprt->bc_xprt; 2459 2460 /* 2461 * Grab the mutex to serialize data as the connection is shared 2462 * with the fore channel 2463 */ 2464 if (!mutex_trylock(&xprt->xpt_mutex)) { 2465 rpc_sleep_on(&xprt->xpt_bc_pending, task, NULL); 2466 if (!mutex_trylock(&xprt->xpt_mutex)) 2467 return -EAGAIN; 2468 rpc_wake_up_queued_task(&xprt->xpt_bc_pending, task); 2469 } 2470 if (test_bit(XPT_DEAD, &xprt->xpt_flags)) 2471 len = -ENOTCONN; 2472 else 2473 len = bc_sendto(req); 2474 mutex_unlock(&xprt->xpt_mutex); 2475 2476 if (len > 0) 2477 len = 0; 2478 2479 return len; 2480 } 2481 2482 /* 2483 * The close routine. Since this is client initiated, we do nothing 2484 */ 2485 2486 static void bc_close(struct rpc_xprt *xprt) 2487 { 2488 } 2489 2490 /* 2491 * The xprt destroy routine. Again, because this connection is client 2492 * initiated, we do nothing 2493 */ 2494 2495 static void bc_destroy(struct rpc_xprt *xprt) 2496 { 2497 } 2498 2499 static struct rpc_xprt_ops xs_local_ops = { 2500 .reserve_xprt = xprt_reserve_xprt, 2501 .release_xprt = xs_tcp_release_xprt, 2502 .alloc_slot = xprt_alloc_slot, 2503 .rpcbind = xs_local_rpcbind, 2504 .set_port = xs_local_set_port, 2505 .connect = xs_local_connect, 2506 .buf_alloc = rpc_malloc, 2507 .buf_free = rpc_free, 2508 .send_request = xs_local_send_request, 2509 .set_retrans_timeout = xprt_set_retrans_timeout_def, 2510 .close = xs_close, 2511 .destroy = xs_destroy, 2512 .print_stats = xs_local_print_stats, 2513 }; 2514 2515 static struct rpc_xprt_ops xs_udp_ops = { 2516 .set_buffer_size = xs_udp_set_buffer_size, 2517 .reserve_xprt = xprt_reserve_xprt_cong, 2518 .release_xprt = xprt_release_xprt_cong, 2519 .alloc_slot = xprt_alloc_slot, 2520 .rpcbind = rpcb_getport_async, 2521 .set_port = xs_set_port, 2522 .connect = xs_connect, 2523 .buf_alloc = rpc_malloc, 2524 .buf_free = rpc_free, 2525 .send_request = xs_udp_send_request, 2526 .set_retrans_timeout = xprt_set_retrans_timeout_rtt, 2527 .timer = xs_udp_timer, 2528 .release_request = xprt_release_rqst_cong, 2529 .close = xs_close, 2530 .destroy = xs_destroy, 2531 .print_stats = xs_udp_print_stats, 2532 }; 2533 2534 static struct rpc_xprt_ops xs_tcp_ops = { 2535 .reserve_xprt = xprt_reserve_xprt, 2536 .release_xprt = xs_tcp_release_xprt, 2537 .alloc_slot = xprt_lock_and_alloc_slot, 2538 .rpcbind = rpcb_getport_async, 2539 .set_port = xs_set_port, 2540 .connect = xs_connect, 2541 .buf_alloc = rpc_malloc, 2542 .buf_free = rpc_free, 2543 .send_request = xs_tcp_send_request, 2544 .set_retrans_timeout = xprt_set_retrans_timeout_def, 2545 .close = xs_tcp_close, 2546 .destroy = xs_destroy, 2547 .print_stats = xs_tcp_print_stats, 2548 }; 2549 2550 /* 2551 * The rpc_xprt_ops for the server backchannel 2552 */ 2553 2554 static struct rpc_xprt_ops bc_tcp_ops = { 2555 .reserve_xprt = xprt_reserve_xprt, 2556 .release_xprt = xprt_release_xprt, 2557 .alloc_slot = xprt_alloc_slot, 2558 .buf_alloc = bc_malloc, 2559 .buf_free = bc_free, 2560 .send_request = bc_send_request, 2561 .set_retrans_timeout = xprt_set_retrans_timeout_def, 2562 .close = bc_close, 2563 .destroy = bc_destroy, 2564 .print_stats = xs_tcp_print_stats, 2565 }; 2566 2567 static int xs_init_anyaddr(const int family, struct sockaddr *sap) 2568 { 2569 static const struct sockaddr_in sin = { 2570 .sin_family = AF_INET, 2571 .sin_addr.s_addr = htonl(INADDR_ANY), 2572 }; 2573 static const struct sockaddr_in6 sin6 = { 2574 .sin6_family = AF_INET6, 2575 .sin6_addr = IN6ADDR_ANY_INIT, 2576 }; 2577 2578 switch (family) { 2579 case AF_LOCAL: 2580 break; 2581 case AF_INET: 2582 memcpy(sap, &sin, sizeof(sin)); 2583 break; 2584 case AF_INET6: 2585 memcpy(sap, &sin6, sizeof(sin6)); 2586 break; 2587 default: 2588 dprintk("RPC: %s: Bad address family\n", __func__); 2589 return -EAFNOSUPPORT; 2590 } 2591 return 0; 2592 } 2593 2594 static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args, 2595 unsigned int slot_table_size, 2596 unsigned int max_slot_table_size) 2597 { 2598 struct rpc_xprt *xprt; 2599 struct sock_xprt *new; 2600 2601 if (args->addrlen > sizeof(xprt->addr)) { 2602 dprintk("RPC: xs_setup_xprt: address too large\n"); 2603 return ERR_PTR(-EBADF); 2604 } 2605 2606 xprt = xprt_alloc(args->net, sizeof(*new), slot_table_size, 2607 max_slot_table_size); 2608 if (xprt == NULL) { 2609 dprintk("RPC: xs_setup_xprt: couldn't allocate " 2610 "rpc_xprt\n"); 2611 return ERR_PTR(-ENOMEM); 2612 } 2613 2614 new = container_of(xprt, struct sock_xprt, xprt); 2615 memcpy(&xprt->addr, args->dstaddr, args->addrlen); 2616 xprt->addrlen = args->addrlen; 2617 if (args->srcaddr) 2618 memcpy(&new->srcaddr, args->srcaddr, args->addrlen); 2619 else { 2620 int err; 2621 err = xs_init_anyaddr(args->dstaddr->sa_family, 2622 (struct sockaddr *)&new->srcaddr); 2623 if (err != 0) { 2624 xprt_free(xprt); 2625 return ERR_PTR(err); 2626 } 2627 } 2628 2629 return xprt; 2630 } 2631 2632 static const struct rpc_timeout xs_local_default_timeout = { 2633 .to_initval = 10 * HZ, 2634 .to_maxval = 10 * HZ, 2635 .to_retries = 2, 2636 }; 2637 2638 /** 2639 * xs_setup_local - Set up transport to use an AF_LOCAL socket 2640 * @args: rpc transport creation arguments 2641 * 2642 * AF_LOCAL is a "tpi_cots_ord" transport, just like TCP 2643 */ 2644 static struct rpc_xprt *xs_setup_local(struct xprt_create *args) 2645 { 2646 struct sockaddr_un *sun = (struct sockaddr_un *)args->dstaddr; 2647 struct sock_xprt *transport; 2648 struct rpc_xprt *xprt; 2649 struct rpc_xprt *ret; 2650 2651 xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries, 2652 xprt_max_tcp_slot_table_entries); 2653 if (IS_ERR(xprt)) 2654 return xprt; 2655 transport = container_of(xprt, struct sock_xprt, xprt); 2656 2657 xprt->prot = 0; 2658 xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32); 2659 xprt->max_payload = RPC_MAX_FRAGMENT_SIZE; 2660 2661 xprt->bind_timeout = XS_BIND_TO; 2662 xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO; 2663 xprt->idle_timeout = XS_IDLE_DISC_TO; 2664 2665 xprt->ops = &xs_local_ops; 2666 xprt->timeout = &xs_local_default_timeout; 2667 2668 INIT_DELAYED_WORK(&transport->connect_worker, 2669 xs_dummy_setup_socket); 2670 2671 switch (sun->sun_family) { 2672 case AF_LOCAL: 2673 if (sun->sun_path[0] != '/') { 2674 dprintk("RPC: bad AF_LOCAL address: %s\n", 2675 sun->sun_path); 2676 ret = ERR_PTR(-EINVAL); 2677 goto out_err; 2678 } 2679 xprt_set_bound(xprt); 2680 xs_format_peer_addresses(xprt, "local", RPCBIND_NETID_LOCAL); 2681 ret = ERR_PTR(xs_local_setup_socket(transport)); 2682 if (ret) 2683 goto out_err; 2684 break; 2685 default: 2686 ret = ERR_PTR(-EAFNOSUPPORT); 2687 goto out_err; 2688 } 2689 2690 dprintk("RPC: set up xprt to %s via AF_LOCAL\n", 2691 xprt->address_strings[RPC_DISPLAY_ADDR]); 2692 2693 if (try_module_get(THIS_MODULE)) 2694 return xprt; 2695 ret = ERR_PTR(-EINVAL); 2696 out_err: 2697 xprt_free(xprt); 2698 return ret; 2699 } 2700 2701 static const struct rpc_timeout xs_udp_default_timeout = { 2702 .to_initval = 5 * HZ, 2703 .to_maxval = 30 * HZ, 2704 .to_increment = 5 * HZ, 2705 .to_retries = 5, 2706 }; 2707 2708 /** 2709 * xs_setup_udp - Set up transport to use a UDP socket 2710 * @args: rpc transport creation arguments 2711 * 2712 */ 2713 static struct rpc_xprt *xs_setup_udp(struct xprt_create *args) 2714 { 2715 struct sockaddr *addr = args->dstaddr; 2716 struct rpc_xprt *xprt; 2717 struct sock_xprt *transport; 2718 struct rpc_xprt *ret; 2719 2720 xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries, 2721 xprt_udp_slot_table_entries); 2722 if (IS_ERR(xprt)) 2723 return xprt; 2724 transport = container_of(xprt, struct sock_xprt, xprt); 2725 2726 xprt->prot = IPPROTO_UDP; 2727 xprt->tsh_size = 0; 2728 /* XXX: header size can vary due to auth type, IPv6, etc. */ 2729 xprt->max_payload = (1U << 16) - (MAX_HEADER << 3); 2730 2731 xprt->bind_timeout = XS_BIND_TO; 2732 xprt->reestablish_timeout = XS_UDP_REEST_TO; 2733 xprt->idle_timeout = XS_IDLE_DISC_TO; 2734 2735 xprt->ops = &xs_udp_ops; 2736 2737 xprt->timeout = &xs_udp_default_timeout; 2738 2739 switch (addr->sa_family) { 2740 case AF_INET: 2741 if (((struct sockaddr_in *)addr)->sin_port != htons(0)) 2742 xprt_set_bound(xprt); 2743 2744 INIT_DELAYED_WORK(&transport->connect_worker, 2745 xs_udp_setup_socket); 2746 xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP); 2747 break; 2748 case AF_INET6: 2749 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0)) 2750 xprt_set_bound(xprt); 2751 2752 INIT_DELAYED_WORK(&transport->connect_worker, 2753 xs_udp_setup_socket); 2754 xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6); 2755 break; 2756 default: 2757 ret = ERR_PTR(-EAFNOSUPPORT); 2758 goto out_err; 2759 } 2760 2761 if (xprt_bound(xprt)) 2762 dprintk("RPC: set up xprt to %s (port %s) via %s\n", 2763 xprt->address_strings[RPC_DISPLAY_ADDR], 2764 xprt->address_strings[RPC_DISPLAY_PORT], 2765 xprt->address_strings[RPC_DISPLAY_PROTO]); 2766 else 2767 dprintk("RPC: set up xprt to %s (autobind) via %s\n", 2768 xprt->address_strings[RPC_DISPLAY_ADDR], 2769 xprt->address_strings[RPC_DISPLAY_PROTO]); 2770 2771 if (try_module_get(THIS_MODULE)) 2772 return xprt; 2773 ret = ERR_PTR(-EINVAL); 2774 out_err: 2775 xprt_free(xprt); 2776 return ret; 2777 } 2778 2779 static const struct rpc_timeout xs_tcp_default_timeout = { 2780 .to_initval = 60 * HZ, 2781 .to_maxval = 60 * HZ, 2782 .to_retries = 2, 2783 }; 2784 2785 /** 2786 * xs_setup_tcp - Set up transport to use a TCP socket 2787 * @args: rpc transport creation arguments 2788 * 2789 */ 2790 static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args) 2791 { 2792 struct sockaddr *addr = args->dstaddr; 2793 struct rpc_xprt *xprt; 2794 struct sock_xprt *transport; 2795 struct rpc_xprt *ret; 2796 unsigned int max_slot_table_size = xprt_max_tcp_slot_table_entries; 2797 2798 if (args->flags & XPRT_CREATE_INFINITE_SLOTS) 2799 max_slot_table_size = RPC_MAX_SLOT_TABLE_LIMIT; 2800 2801 xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries, 2802 max_slot_table_size); 2803 if (IS_ERR(xprt)) 2804 return xprt; 2805 transport = container_of(xprt, struct sock_xprt, xprt); 2806 2807 xprt->prot = IPPROTO_TCP; 2808 xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32); 2809 xprt->max_payload = RPC_MAX_FRAGMENT_SIZE; 2810 2811 xprt->bind_timeout = XS_BIND_TO; 2812 xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO; 2813 xprt->idle_timeout = XS_IDLE_DISC_TO; 2814 2815 xprt->ops = &xs_tcp_ops; 2816 xprt->timeout = &xs_tcp_default_timeout; 2817 2818 switch (addr->sa_family) { 2819 case AF_INET: 2820 if (((struct sockaddr_in *)addr)->sin_port != htons(0)) 2821 xprt_set_bound(xprt); 2822 2823 INIT_DELAYED_WORK(&transport->connect_worker, 2824 xs_tcp_setup_socket); 2825 xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP); 2826 break; 2827 case AF_INET6: 2828 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0)) 2829 xprt_set_bound(xprt); 2830 2831 INIT_DELAYED_WORK(&transport->connect_worker, 2832 xs_tcp_setup_socket); 2833 xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6); 2834 break; 2835 default: 2836 ret = ERR_PTR(-EAFNOSUPPORT); 2837 goto out_err; 2838 } 2839 2840 if (xprt_bound(xprt)) 2841 dprintk("RPC: set up xprt to %s (port %s) via %s\n", 2842 xprt->address_strings[RPC_DISPLAY_ADDR], 2843 xprt->address_strings[RPC_DISPLAY_PORT], 2844 xprt->address_strings[RPC_DISPLAY_PROTO]); 2845 else 2846 dprintk("RPC: set up xprt to %s (autobind) via %s\n", 2847 xprt->address_strings[RPC_DISPLAY_ADDR], 2848 xprt->address_strings[RPC_DISPLAY_PROTO]); 2849 2850 2851 if (try_module_get(THIS_MODULE)) 2852 return xprt; 2853 ret = ERR_PTR(-EINVAL); 2854 out_err: 2855 xprt_free(xprt); 2856 return ret; 2857 } 2858 2859 /** 2860 * xs_setup_bc_tcp - Set up transport to use a TCP backchannel socket 2861 * @args: rpc transport creation arguments 2862 * 2863 */ 2864 static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args) 2865 { 2866 struct sockaddr *addr = args->dstaddr; 2867 struct rpc_xprt *xprt; 2868 struct sock_xprt *transport; 2869 struct svc_sock *bc_sock; 2870 struct rpc_xprt *ret; 2871 2872 if (args->bc_xprt->xpt_bc_xprt) { 2873 /* 2874 * This server connection already has a backchannel 2875 * transport; we can't create a new one, as we wouldn't 2876 * be able to match replies based on xid any more. So, 2877 * reuse the already-existing one: 2878 */ 2879 return args->bc_xprt->xpt_bc_xprt; 2880 } 2881 xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries, 2882 xprt_tcp_slot_table_entries); 2883 if (IS_ERR(xprt)) 2884 return xprt; 2885 transport = container_of(xprt, struct sock_xprt, xprt); 2886 2887 xprt->prot = IPPROTO_TCP; 2888 xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32); 2889 xprt->max_payload = RPC_MAX_FRAGMENT_SIZE; 2890 xprt->timeout = &xs_tcp_default_timeout; 2891 2892 /* backchannel */ 2893 xprt_set_bound(xprt); 2894 xprt->bind_timeout = 0; 2895 xprt->reestablish_timeout = 0; 2896 xprt->idle_timeout = 0; 2897 2898 xprt->ops = &bc_tcp_ops; 2899 2900 switch (addr->sa_family) { 2901 case AF_INET: 2902 xs_format_peer_addresses(xprt, "tcp", 2903 RPCBIND_NETID_TCP); 2904 break; 2905 case AF_INET6: 2906 xs_format_peer_addresses(xprt, "tcp", 2907 RPCBIND_NETID_TCP6); 2908 break; 2909 default: 2910 ret = ERR_PTR(-EAFNOSUPPORT); 2911 goto out_err; 2912 } 2913 2914 dprintk("RPC: set up xprt to %s (port %s) via %s\n", 2915 xprt->address_strings[RPC_DISPLAY_ADDR], 2916 xprt->address_strings[RPC_DISPLAY_PORT], 2917 xprt->address_strings[RPC_DISPLAY_PROTO]); 2918 2919 /* 2920 * Once we've associated a backchannel xprt with a connection, 2921 * we want to keep it around as long as long as the connection 2922 * lasts, in case we need to start using it for a backchannel 2923 * again; this reference won't be dropped until bc_xprt is 2924 * destroyed. 2925 */ 2926 xprt_get(xprt); 2927 args->bc_xprt->xpt_bc_xprt = xprt; 2928 xprt->bc_xprt = args->bc_xprt; 2929 bc_sock = container_of(args->bc_xprt, struct svc_sock, sk_xprt); 2930 transport->sock = bc_sock->sk_sock; 2931 transport->inet = bc_sock->sk_sk; 2932 2933 /* 2934 * Since we don't want connections for the backchannel, we set 2935 * the xprt status to connected 2936 */ 2937 xprt_set_connected(xprt); 2938 2939 2940 if (try_module_get(THIS_MODULE)) 2941 return xprt; 2942 xprt_put(xprt); 2943 ret = ERR_PTR(-EINVAL); 2944 out_err: 2945 xprt_free(xprt); 2946 return ret; 2947 } 2948 2949 static struct xprt_class xs_local_transport = { 2950 .list = LIST_HEAD_INIT(xs_local_transport.list), 2951 .name = "named UNIX socket", 2952 .owner = THIS_MODULE, 2953 .ident = XPRT_TRANSPORT_LOCAL, 2954 .setup = xs_setup_local, 2955 }; 2956 2957 static struct xprt_class xs_udp_transport = { 2958 .list = LIST_HEAD_INIT(xs_udp_transport.list), 2959 .name = "udp", 2960 .owner = THIS_MODULE, 2961 .ident = XPRT_TRANSPORT_UDP, 2962 .setup = xs_setup_udp, 2963 }; 2964 2965 static struct xprt_class xs_tcp_transport = { 2966 .list = LIST_HEAD_INIT(xs_tcp_transport.list), 2967 .name = "tcp", 2968 .owner = THIS_MODULE, 2969 .ident = XPRT_TRANSPORT_TCP, 2970 .setup = xs_setup_tcp, 2971 }; 2972 2973 static struct xprt_class xs_bc_tcp_transport = { 2974 .list = LIST_HEAD_INIT(xs_bc_tcp_transport.list), 2975 .name = "tcp NFSv4.1 backchannel", 2976 .owner = THIS_MODULE, 2977 .ident = XPRT_TRANSPORT_BC_TCP, 2978 .setup = xs_setup_bc_tcp, 2979 }; 2980 2981 /** 2982 * init_socket_xprt - set up xprtsock's sysctls, register with RPC client 2983 * 2984 */ 2985 int init_socket_xprt(void) 2986 { 2987 #ifdef RPC_DEBUG 2988 if (!sunrpc_table_header) 2989 sunrpc_table_header = register_sysctl_table(sunrpc_table); 2990 #endif 2991 2992 xprt_register_transport(&xs_local_transport); 2993 xprt_register_transport(&xs_udp_transport); 2994 xprt_register_transport(&xs_tcp_transport); 2995 xprt_register_transport(&xs_bc_tcp_transport); 2996 2997 return 0; 2998 } 2999 3000 /** 3001 * cleanup_socket_xprt - remove xprtsock's sysctls, unregister 3002 * 3003 */ 3004 void cleanup_socket_xprt(void) 3005 { 3006 #ifdef RPC_DEBUG 3007 if (sunrpc_table_header) { 3008 unregister_sysctl_table(sunrpc_table_header); 3009 sunrpc_table_header = NULL; 3010 } 3011 #endif 3012 3013 xprt_unregister_transport(&xs_local_transport); 3014 xprt_unregister_transport(&xs_udp_transport); 3015 xprt_unregister_transport(&xs_tcp_transport); 3016 xprt_unregister_transport(&xs_bc_tcp_transport); 3017 } 3018 3019 static int param_set_uint_minmax(const char *val, 3020 const struct kernel_param *kp, 3021 unsigned int min, unsigned int max) 3022 { 3023 unsigned long num; 3024 int ret; 3025 3026 if (!val) 3027 return -EINVAL; 3028 ret = strict_strtoul(val, 0, &num); 3029 if (ret == -EINVAL || num < min || num > max) 3030 return -EINVAL; 3031 *((unsigned int *)kp->arg) = num; 3032 return 0; 3033 } 3034 3035 static int param_set_portnr(const char *val, const struct kernel_param *kp) 3036 { 3037 return param_set_uint_minmax(val, kp, 3038 RPC_MIN_RESVPORT, 3039 RPC_MAX_RESVPORT); 3040 } 3041 3042 static struct kernel_param_ops param_ops_portnr = { 3043 .set = param_set_portnr, 3044 .get = param_get_uint, 3045 }; 3046 3047 #define param_check_portnr(name, p) \ 3048 __param_check(name, p, unsigned int); 3049 3050 module_param_named(min_resvport, xprt_min_resvport, portnr, 0644); 3051 module_param_named(max_resvport, xprt_max_resvport, portnr, 0644); 3052 3053 static int param_set_slot_table_size(const char *val, 3054 const struct kernel_param *kp) 3055 { 3056 return param_set_uint_minmax(val, kp, 3057 RPC_MIN_SLOT_TABLE, 3058 RPC_MAX_SLOT_TABLE); 3059 } 3060 3061 static struct kernel_param_ops param_ops_slot_table_size = { 3062 .set = param_set_slot_table_size, 3063 .get = param_get_uint, 3064 }; 3065 3066 #define param_check_slot_table_size(name, p) \ 3067 __param_check(name, p, unsigned int); 3068 3069 static int param_set_max_slot_table_size(const char *val, 3070 const struct kernel_param *kp) 3071 { 3072 return param_set_uint_minmax(val, kp, 3073 RPC_MIN_SLOT_TABLE, 3074 RPC_MAX_SLOT_TABLE_LIMIT); 3075 } 3076 3077 static struct kernel_param_ops param_ops_max_slot_table_size = { 3078 .set = param_set_max_slot_table_size, 3079 .get = param_get_uint, 3080 }; 3081 3082 #define param_check_max_slot_table_size(name, p) \ 3083 __param_check(name, p, unsigned int); 3084 3085 module_param_named(tcp_slot_table_entries, xprt_tcp_slot_table_entries, 3086 slot_table_size, 0644); 3087 module_param_named(tcp_max_slot_table_entries, xprt_max_tcp_slot_table_entries, 3088 max_slot_table_size, 0644); 3089 module_param_named(udp_slot_table_entries, xprt_udp_slot_table_entries, 3090 slot_table_size, 0644); 3091 3092