/*
 *  linux/net/sunrpc/xprtsock.c
 *
 *  Client-side transport implementation for sockets.
 *
 *  TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
 *  TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
 *  TCP NFS related read + write fixes
 *   (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 *
 *  Rewrite of large parts of the code in order to stabilize TCP stuff.
 *  Fix behaviour when socket buffer is full.
 *   (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
 */

#include <linux/types.h>
#include <linux/slab.h>
#include <linux/capability.h>
#include <linux/sched.h>
#include <linux/pagemap.h>
#include <linux/errno.h>
#include <linux/socket.h>
#include <linux/in.h>
#include <linux/net.h>
#include <linux/mm.h>
#include <linux/udp.h>
#include <linux/tcp.h>
#include <linux/sunrpc/clnt.h>
#include <linux/file.h>

#include <net/sock.h>
#include <net/checksum.h>
#include <net/udp.h>
#include <net/tcp.h>

#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY	RPCDBG_XPRT
#endif

/* Highest port number considered "reserved" when binding a privileged port */
#define XPRT_MAX_RESVPORT	(800)

#ifdef RPC_DEBUG_DATA
/*
 * Print the buffer contents (first 128 bytes only--just enough for
 * diropres return).
 */
static void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{
	u8 *buf = (u8 *) packet;
	int j;

	dprintk("RPC: %s\n", msg);
	for (j = 0; j < count && j < 128; j += 4) {
		/* Start a new output line every 32 bytes, prefixed by the offset */
		if (!(j & 31)) {
			if (j)
				dprintk("\n");
			dprintk("0x%04x ", j);
		}
		dprintk("%02x%02x%02x%02x ",
			buf[j], buf[j+1], buf[j+2], buf[j+3]);
	}
	dprintk("\n");
}
#else
/* Debug-data build disabled: compile the dump away entirely */
static inline void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{
	/* NOP */
}
#endif

/*
 * Look up RPC transport given an INET socket
 */
static inline struct rpc_xprt *
xprt_from_sock(struct sock *sk)
{
	return (struct rpc_xprt *) sk->sk_user_data;
}

/*
 * Push an xdr_buf out through a socket, starting @base bytes into it.
 *
 * The xdr_buf is sent in three pieces: head kvec, page list, tail kvec.
 * @base counts bytes already transmitted and is consumed section by
 * section as we skip over previously-sent data.
 *
 * Returns the total number of bytes sent, or the first error code if
 * nothing was sent.  A partial send (err != requested len) stops the
 * loop early via "goto out" so that the caller can retry from the new
 * offset.
 */
static int
xdr_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen,
		struct xdr_buf *xdr, unsigned int base, int msgflags)
{
	struct page **ppage = xdr->pages;
	unsigned int len, pglen = xdr->page_len;
	int err, ret = 0;
	ssize_t (*sendpage)(struct socket *, struct page *, int, size_t, int);

	len = xdr->head[0].iov_len;
	/* Send the head.  NOTE(review): the "addr != NULL && base == 0"
	 * leg forces a (possibly zero-length) sendmsg at the start of an
	 * addressed (datagram) transmission — presumably so the
	 * destination address is always supplied; confirm against the
	 * UDP caller. */
	if (base < len || (addr != NULL && base == 0)) {
		struct kvec iov = {
			.iov_base = xdr->head[0].iov_base + base,
			.iov_len  = len - base,
		};
		struct msghdr msg = {
			.msg_name	= addr,
			.msg_namelen	= addrlen,
			.msg_flags	= msgflags,
		};
		/* More of the buffer follows the head */
		if (xdr->len > len)
			msg.msg_flags |= MSG_MORE;

		if (iov.iov_len != 0)
			err = kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
		else
			err = kernel_sendmsg(sock, &msg, NULL, 0, 0);
		/* Accumulate bytes sent; keep the first error in ret */
		if (ret == 0)
			ret = err;
		else if (err > 0)
			ret += err;
		if (err != iov.iov_len)
			goto out;
		base = 0;
	} else
		base -= len;

	/* Send the page list (if any), skipping already-sent pages */
	if (pglen == 0)
		goto copy_tail;
	if (base >= pglen) {
		base -= pglen;
		goto copy_tail;
	}
	if (base || xdr->page_base) {
		/* Translate the resume offset into a page pointer plus
		 * an intra-page offset */
		pglen -= base;
		base += xdr->page_base;
		ppage += base >> PAGE_CACHE_SHIFT;
		base &= ~PAGE_CACHE_MASK;
	}

	/* Fall back to sock_no_sendpage when the protocol lacks sendpage */
	sendpage = sock->ops->sendpage ? : sock_no_sendpage;
	do {
		int flags = msgflags;

		len = PAGE_CACHE_SIZE;
		if (base)
			len -= base;
		if (pglen < len)
			len = pglen;

		/* More pages or a tail still to come after this page */
		if (pglen != len || xdr->tail[0].iov_len != 0)
			flags |= MSG_MORE;

		/* Hmm... We might be dealing with highmem pages */
		if (PageHighMem(*ppage))
			sendpage = sock_no_sendpage;
		err = sendpage(sock, *ppage, base, len, flags);
		if (ret == 0)
			ret = err;
		else if (err > 0)
			ret += err;
		if (err != len)
			goto out;
		base = 0;
		ppage++;
	} while ((pglen -= len) != 0);
copy_tail:
	/* Finally, the tail kvec (base now counts into the tail) */
	len = xdr->tail[0].iov_len;
	if (base < len) {
		struct kvec iov = {
			.iov_base = xdr->tail[0].iov_base + base,
			.iov_len  = len - base,
		};
		struct msghdr msg = {
			.msg_flags	= msgflags,
		};
		err = kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
		if (ret == 0)
			ret = err;
		else if (err > 0)
			ret += err;
	}
out:
	return ret;
}

/*
 * Write data to socket.
 *
 * Transmits req->rq_snd_buf, skipping the req->rq_bytes_sent bytes that
 * earlier calls already pushed out.  Returns bytes written this call, or
 * a negative errno.  Stream (TCP) transports map connection-loss errors
 * to -ENOTCONN so the caller reconnects.
 */
static inline int
xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	struct socket *sock = xprt->sock;
	struct xdr_buf *xdr = &req->rq_snd_buf;
	struct sockaddr *addr = NULL;
	int addrlen = 0;
	unsigned int skip;
	int result;

	if (!sock)
		return -ENOTCONN;

	xprt_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	/* For UDP, we need to provide an address */
	if (!xprt->stream) {
		addr = (struct sockaddr *) &xprt->addr;
		addrlen = sizeof(xprt->addr);
	}
	/* Dont repeat bytes */
	skip = req->rq_bytes_sent;

	/* Clear the async-nospace flag before the non-blocking send so a
	 * concurrent write_space callback is not lost */
	clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
	result = xdr_sendpages(sock, addr, addrlen, xdr, skip, MSG_DONTWAIT);

	dprintk("RPC: xprt_sendmsg(%d) = %d\n", xdr->len - skip, result);

	if (result >= 0)
		return result;

	switch (result) {
	case -ECONNREFUSED:
		/* When the server has died, an ICMP port unreachable message
		 * prompts ECONNREFUSED.
		 */
	case -EAGAIN:
		break;
	case -ECONNRESET:
	case -ENOTCONN:
	case -EPIPE:
		/* connection broken */
		if (xprt->stream)
			result = -ENOTCONN;
		break;
	default:
		printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result);
	}
	return result;
}

/*
 * Transmit one RPC request, retrying partial sends.
 *
 * For stream transports a 4-byte record marker (high bit = last
 * fragment, low 31 bits = length) is written over the start of the
 * first iovec before sending.  On -EAGAIN with no socket space the task
 * is put to sleep on xprt->pending to be woken by xprt_write_space().
 */
static int
xprt_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	int status, retry = 0;

	/* set up everything as needed. */
	/* Write the record marker */
	if (xprt->stream) {
		u32 *marker = req->rq_svec[0].iov_base;

		*marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));
	}

	/* Continue transmitting the packet/record. We must be careful
	 * to cope with writespace callbacks arriving _after_ we have
	 * called xprt_sendmsg().
	 */
	while (1) {
		req->rq_xtime = jiffies;
		status = xprt_sendmsg(xprt, req);

		if (status < 0)
			break;

		if (xprt->stream) {
			req->rq_bytes_sent += status;

			/* If we've sent the entire packet, immediately
			 * reset the count of bytes sent. */
			if (req->rq_bytes_sent >= req->rq_slen) {
				req->rq_bytes_sent = 0;
				return 0;
			}
		} else {
			/* Datagram: all-or-nothing — a short send is
			 * retried from the beginning */
			if (status >= req->rq_slen)
				return 0;
			status = -EAGAIN;
			break;
		}

		dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",
				task->tk_pid, req->rq_slen - req->rq_bytes_sent,
				req->rq_slen);

		status = -EAGAIN;
		/* Give up after 50 consecutive partial sends */
		if (retry++ > 50)
			break;
	}

	if (status == -EAGAIN) {
		if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) {
			/* Protect against races with xprt_write_space */
			spin_lock_bh(&xprt->sock_lock);
			/* Don't race with disconnect */
			if (!xprt_connected(xprt))
				task->tk_status = -ENOTCONN;
			else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) {
				task->tk_timeout = req->rq_timeout;
				rpc_sleep_on(&xprt->pending, task, NULL, NULL);
			}
			spin_unlock_bh(&xprt->sock_lock);
			return status;
		}
		/* Keep holding the socket if it is blocked */
		rpc_delay(task, HZ>>4);
	}
	return status;
}

/*
 * Close down a transport socket
 *
 * Detaches the socket from the xprt under sk_callback_lock, restores
 * the saved sk callbacks, then releases the socket.
 */
static void
xprt_close(struct rpc_xprt *xprt)
{
	struct socket *sock = xprt->sock;
	struct sock *sk = xprt->inet;

	if (!sk)
		return;

	write_lock_bh(&sk->sk_callback_lock);
	xprt->inet = NULL;
	xprt->sock = NULL;

	sk->sk_user_data = NULL;
	sk->sk_data_ready = xprt->old_data_ready;
	sk->sk_state_change = xprt->old_state_change;
	sk->sk_write_space = xprt->old_write_space;
	write_unlock_bh(&sk->sk_callback_lock);

	/* NOTE(review): clears the UDP no-checksum flag outside the
	 * callback lock — presumably undoing socket setup done at
	 * connect time; confirm against the socket-creation path. */
	sk->sk_no_check = 0;

	sock_release(sock);
}

/*
 * Tear down a transport: stop the pending connect worker, disconnect,
 * close the socket, and free the request slot table.
 */
static void xprt_socket_destroy(struct rpc_xprt *xprt)
{
	cancel_delayed_work(&xprt->sock_connect);
	/* Ensure any already-running connect work has finished */
	flush_scheduled_work();

	xprt_disconnect(xprt);
	xprt_close(xprt);
	kfree(xprt->slot);
}

/*
 * Input handler for RPC replies. Called from a bottom half and hence
 * atomic.
 *
 * Pulls one UDP datagram off the socket, extracts the XID, matches it
 * to a pending request under xprt->sock_lock, copies the payload into
 * the request's receive buffer (with checksum verification), and
 * completes the request.
 */
static void
udp_data_ready(struct sock *sk, int len)
{
	struct rpc_task *task;
	struct rpc_xprt *xprt;
	struct rpc_rqst *rovr;
	struct sk_buff *skb;
	int err, repsize, copied;
	u32 _xid, *xp;

	read_lock(&sk->sk_callback_lock);
	dprintk("RPC: udp_data_ready...\n");
	if (!(xprt = xprt_from_sock(sk))) {
		printk("RPC: udp_data_ready request not found!\n");
		goto out;
	}

	dprintk("RPC: udp_data_ready client %p\n", xprt);

	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
		goto out;

	if (xprt->shutdown)
		goto dropit;

	/* A reply must hold at least an XID (4 bytes) past the UDP header */
	repsize = skb->len - sizeof(struct udphdr);
	if (repsize < 4) {
		printk("RPC: impossible RPC reply size %d!\n", repsize);
		goto dropit;
	}

	/* Copy the XID from the skb... */
	xp = skb_header_pointer(skb, sizeof(struct udphdr),
				sizeof(_xid), &_xid);
	if (xp == NULL)
		goto dropit;

	/* Look up and lock the request corresponding to the given XID */
	spin_lock(&xprt->sock_lock);
	rovr = xprt_lookup_rqst(xprt, *xp);
	if (!rovr)
		goto out_unlock;
	task = rovr->rq_task;

	dprintk("RPC: %4d received reply\n", task->tk_pid);

	/* Never copy more than the receive buffer can hold */
	if ((copied = rovr->rq_private_buf.buflen) > repsize)
		copied = repsize;

	/* Suck it into the iovec, verify checksum if not done by hw. */
	if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb))
		goto out_unlock;

	/* Something worked... */
	dst_confirm(skb->dst);

	xprt_complete_rqst(xprt, rovr, copied);

out_unlock:
	spin_unlock(&xprt->sock_lock);
dropit:
	skb_free_datagram(sk, skb);
out:
	read_unlock(&sk->sk_callback_lock);
}

/*
 * Copy from an skb into memory and shrink the skb.
 *
 * Copies up to @len bytes (clamped to what remains in @desc) to @p and
 * advances the reader's offset/count.  Returns the number of bytes
 * copied, or 0 on copy failure.
 */
static inline size_t
tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
{
	if (len > desc->count)
		len = desc->count;
	if (skb_copy_bits(desc->skb, desc->offset, p, len)) {
		dprintk("RPC: failed to copy %zu bytes from skb. %zu bytes remain\n",
				len, desc->count);
		return 0;
	}
	desc->offset += len;
	desc->count -= len;
	dprintk("RPC: copied %zu bytes from skb. %zu bytes remain\n",
			len, desc->count);
	return len;
}

/*
 * TCP read fragment marker
 *
 * Accumulates the 4-byte record marker into xprt->tcp_recm across
 * possibly several calls (tcp_offset tracks progress).  Once complete,
 * decodes the length and last-fragment bit and switches the state
 * machine to XID copying.
 */
static inline void
tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
{
	size_t len, used;
	char *p;

	p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset;
	len = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
	used = tcp_copy_data(desc, p, len);
	xprt->tcp_offset += used;
	if (used != len)
		return;		/* marker still incomplete; wait for more data */
	xprt->tcp_reclen = ntohl(xprt->tcp_recm);
	/* High bit of the marker flags the last fragment of the record */
	if (xprt->tcp_reclen & 0x80000000)
		xprt->tcp_flags |= XPRT_LAST_FRAG;
	else
		xprt->tcp_flags &= ~XPRT_LAST_FRAG;
	xprt->tcp_reclen &= 0x7fffffff;
	xprt->tcp_flags &= ~XPRT_COPY_RECM;
	xprt->tcp_offset = 0;
	/* Sanity check of the record length */
	if (xprt->tcp_reclen < 4) {
		printk(KERN_ERR "RPC: Invalid TCP record fragment length\n");
		/* NOTE(review): execution continues (and logs) after the
		 * disconnect rather than returning — confirm intended. */
		xprt_disconnect(xprt);
	}
	dprintk("RPC: reading TCP record fragment of length %d\n",
			xprt->tcp_reclen);
}

/*
 * Check whether the current record fragment has been fully consumed;
 * if so, rearm the state machine for the next fragment marker (and,
 * on the last fragment, for the next record's XID).
 */
static void
tcp_check_recm(struct rpc_xprt *xprt)
{
	dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u, tcp_flags = %lx\n",
			xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_flags);
	if (xprt->tcp_offset == xprt->tcp_reclen) {
		xprt->tcp_flags |= XPRT_COPY_RECM;
		xprt->tcp_offset = 0;
		if (xprt->tcp_flags & XPRT_LAST_FRAG) {
			xprt->tcp_flags &= ~XPRT_COPY_DATA;
			xprt->tcp_flags |= XPRT_COPY_XID;
			xprt->tcp_copied = 0;
		}
	}
}

/*
 * TCP read xid
 *
 * Accumulates the reply XID (again possibly across calls), then
 * switches the state machine to data copying.  tcp_copied starts at 4
 * because the XID counts as the first 4 bytes of the reply.
 */
static inline void
tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
{
	size_t len, used;
	char *p;

	len = sizeof(xprt->tcp_xid) - xprt->tcp_offset;
	dprintk("RPC: reading XID (%Zu bytes)\n", len);
	p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset;
	used = tcp_copy_data(desc, p, len);
	xprt->tcp_offset += used;
	if (used != len)
		return;
	xprt->tcp_flags &= ~XPRT_COPY_XID;
	xprt->tcp_flags |= XPRT_COPY_DATA;
	xprt->tcp_copied = 4;
	dprintk("RPC: reading reply for XID %08x\n",
			ntohl(xprt->tcp_xid));
	tcp_check_recm(xprt);
}

/*
 * TCP read and complete request
 *
 * Finds the pending request for the current XID under xprt->sock_lock,
 * copies available record data into its receive buffer, and completes
 * the request once the reply (or the last fragment) has been consumed.
 * On copy failure the request is abandoned (XPRT_COPY_DATA cleared) and
 * left to time out.
 */
static inline void
tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
{
	struct rpc_rqst *req;
	struct xdr_buf *rcvbuf;
	size_t len;
	ssize_t r;

	/* Find and lock the request corresponding to this xid */
	spin_lock(&xprt->sock_lock);
	req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
	if (!req) {
		xprt->tcp_flags &= ~XPRT_COPY_DATA;
		dprintk("RPC: XID %08x request not found!\n",
				ntohl(xprt->tcp_xid));
		spin_unlock(&xprt->sock_lock);
		return;
	}

	rcvbuf = &req->rq_private_buf;
	len = desc->count;
	if (len > xprt->tcp_reclen - xprt->tcp_offset) {
		/* The skb holds more than the rest of this record:
		 * clamp the copy with a private reader so we do not
		 * swallow the next record's bytes */
		skb_reader_t my_desc;

		len = xprt->tcp_reclen - xprt->tcp_offset;
		memcpy(&my_desc, desc, sizeof(my_desc));
		my_desc.count = len;
		r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
					  &my_desc, tcp_copy_data);
		/* Propagate the private reader's progress to the caller's */
		desc->count -= r;
		desc->offset += r;
	} else
		r = xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
					  desc, tcp_copy_data);

	if (r > 0) {
		xprt->tcp_copied += r;
		xprt->tcp_offset += r;
	}
	if (r != len) {
		/* Error when copying to the receive buffer,
		 * usually because we weren't able to allocate
		 * additional buffer pages. All we can do now
		 * is turn off XPRT_COPY_DATA, so the request
		 * will not receive any additional updates,
		 * and time out.
		 * Any remaining data from this record will
		 * be discarded.
		 */
		xprt->tcp_flags &= ~XPRT_COPY_DATA;
		dprintk("RPC: XID %08x truncated request\n",
				ntohl(xprt->tcp_xid));
		dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n",
				xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen);
		goto out;
	}

	dprintk("RPC: XID %08x read %Zd bytes\n",
			ntohl(xprt->tcp_xid), r);
	dprintk("RPC: xprt = %p, tcp_copied = %lu, tcp_offset = %u, tcp_reclen = %u\n",
			xprt, xprt->tcp_copied, xprt->tcp_offset, xprt->tcp_reclen);

	/* Reply complete when the buffer is full, or when this was the
	 * last fragment and the record has been fully consumed */
	if (xprt->tcp_copied == req->rq_private_buf.buflen)
		xprt->tcp_flags &= ~XPRT_COPY_DATA;
	else if (xprt->tcp_offset == xprt->tcp_reclen) {
		if (xprt->tcp_flags & XPRT_LAST_FRAG)
			xprt->tcp_flags &= ~XPRT_COPY_DATA;
	}

out:
	if (!(xprt->tcp_flags & XPRT_COPY_DATA)) {
		dprintk("RPC: %4d received reply complete\n",
				req->rq_task->tk_pid);
		xprt_complete_rqst(xprt, req, xprt->tcp_copied);
	}
	spin_unlock(&xprt->sock_lock);
	tcp_check_recm(xprt);
}

/*
 * TCP discard extra bytes from a short read
 *
 * Skips over record bytes nobody wants (e.g. after a truncated copy),
 * advancing both the skb reader and the record offset.
 */
static inline void
tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc)
{
	size_t len;

	len = xprt->tcp_reclen - xprt->tcp_offset;
	if (len > desc->count)
		len = desc->count;
	desc->count -= len;
	desc->offset += len;
	xprt->tcp_offset += len;
	dprintk("RPC: discarded %Zu bytes\n", len);
	tcp_check_recm(xprt);
}

/*
 * TCP record receive routine
 * We first have to grab the record marker, then the XID, then the data.
 *
 * Callback for tcp_read_sock(); dispatches on the XPRT_COPY_* state
 * flags until the skb data is exhausted.  Returns the number of bytes
 * consumed.
 */
static int
tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb,
		unsigned int offset, size_t len)
{
	struct rpc_xprt *xprt = rd_desc->arg.data;
	skb_reader_t desc = {
		.skb	= skb,
		.offset	= offset,
		.count	= len,
		.csum	= 0
	};

	dprintk("RPC: tcp_data_recv\n");
	do {
		/* Read in a new fragment marker if necessary */
		/* Can we ever really expect to get completely empty fragments? */
		if (xprt->tcp_flags & XPRT_COPY_RECM) {
			tcp_read_fraghdr(xprt, &desc);
			continue;
		}
		/* Read in the xid if necessary */
		if (xprt->tcp_flags & XPRT_COPY_XID) {
			tcp_read_xid(xprt, &desc);
			continue;
		}
		/* Read in the request data */
		if (xprt->tcp_flags & XPRT_COPY_DATA) {
			tcp_read_request(xprt, &desc);
			continue;
		}
		/* Skip over any trailing bytes on short reads */
		tcp_read_discard(xprt, &desc);
	} while (desc.count);
	dprintk("RPC: tcp_data_recv done\n");
	return len - desc.count;
}

/*
 * sk_data_ready callback for TCP transports: feed all available socket
 * data through tcp_data_recv via tcp_read_sock().
 */
static void tcp_data_ready(struct sock *sk, int bytes)
{
	struct rpc_xprt *xprt;
	read_descriptor_t rd_desc;

	read_lock(&sk->sk_callback_lock);
	dprintk("RPC: tcp_data_ready...\n");
	if (!(xprt = xprt_from_sock(sk))) {
		printk("RPC: tcp_data_ready socket info not found!\n");
		goto out;
	}
	if (xprt->shutdown)
		goto out;

	/* We use rd_desc to pass struct xprt to tcp_data_recv */
	rd_desc.arg.data = xprt;
	rd_desc.count = 65536;
	tcp_read_sock(sk, &rd_desc, tcp_data_recv);
out:
	read_unlock(&sk->sk_callback_lock);
}

/*
 * sk_state_change callback: track TCP connection state.  On
 * ESTABLISHED, reset the record-reading state machine and wake tasks
 * waiting on xprt->pending; on any terminal state, disconnect.
 */
static void
tcp_state_change(struct sock *sk)
{
	struct rpc_xprt *xprt;

	read_lock(&sk->sk_callback_lock);
	if (!(xprt = xprt_from_sock(sk)))
		goto out;
	dprintk("RPC: tcp_state_change client %p...\n", xprt);
	dprintk("RPC: state %x conn %d dead %d zapped %d\n",
			sk->sk_state, xprt_connected(xprt),
			sock_flag(sk, SOCK_DEAD),
			sock_flag(sk, SOCK_ZAPPED));

	switch (sk->sk_state) {
	case TCP_ESTABLISHED:
		spin_lock_bh(&xprt->sock_lock);
		if (!xprt_test_and_set_connected(xprt)) {
			/* Reset TCP record info */
			xprt->tcp_offset = 0;
			xprt->tcp_reclen = 0;
			xprt->tcp_copied = 0;
			xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID;
			rpc_wake_up(&xprt->pending);
		}
		spin_unlock_bh(&xprt->sock_lock);
		break;
	case TCP_SYN_SENT:
	case TCP_SYN_RECV:
		/* Connection attempt in progress: nothing to do yet */
		break;
	default:
		xprt_disconnect(xprt);
		break;
	}
out:
	read_unlock(&sk->sk_callback_lock);
}

/*
 * Called when more output buffer space is available for this socket.
708*a246b010SChuck Lever * We try not to wake our writers until they can make "significant" 709*a246b010SChuck Lever * progress, otherwise we'll waste resources thrashing sock_sendmsg 710*a246b010SChuck Lever * with a bunch of small requests. 711*a246b010SChuck Lever */ 712*a246b010SChuck Lever static void 713*a246b010SChuck Lever xprt_write_space(struct sock *sk) 714*a246b010SChuck Lever { 715*a246b010SChuck Lever struct rpc_xprt *xprt; 716*a246b010SChuck Lever struct socket *sock; 717*a246b010SChuck Lever 718*a246b010SChuck Lever read_lock(&sk->sk_callback_lock); 719*a246b010SChuck Lever if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->sk_socket)) 720*a246b010SChuck Lever goto out; 721*a246b010SChuck Lever if (xprt->shutdown) 722*a246b010SChuck Lever goto out; 723*a246b010SChuck Lever 724*a246b010SChuck Lever /* Wait until we have enough socket memory */ 725*a246b010SChuck Lever if (xprt->stream) { 726*a246b010SChuck Lever /* from net/core/stream.c:sk_stream_write_space */ 727*a246b010SChuck Lever if (sk_stream_wspace(sk) < sk_stream_min_wspace(sk)) 728*a246b010SChuck Lever goto out; 729*a246b010SChuck Lever } else { 730*a246b010SChuck Lever /* from net/core/sock.c:sock_def_write_space */ 731*a246b010SChuck Lever if (!sock_writeable(sk)) 732*a246b010SChuck Lever goto out; 733*a246b010SChuck Lever } 734*a246b010SChuck Lever 735*a246b010SChuck Lever if (!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)) 736*a246b010SChuck Lever goto out; 737*a246b010SChuck Lever 738*a246b010SChuck Lever spin_lock_bh(&xprt->sock_lock); 739*a246b010SChuck Lever if (xprt->snd_task) 740*a246b010SChuck Lever rpc_wake_up_task(xprt->snd_task); 741*a246b010SChuck Lever spin_unlock_bh(&xprt->sock_lock); 742*a246b010SChuck Lever out: 743*a246b010SChuck Lever read_unlock(&sk->sk_callback_lock); 744*a246b010SChuck Lever } 745*a246b010SChuck Lever 746*a246b010SChuck Lever /* 747*a246b010SChuck Lever * Set socket buffer length 748*a246b010SChuck Lever */ 749*a246b010SChuck Lever static void 
750*a246b010SChuck Lever xprt_sock_setbufsize(struct rpc_xprt *xprt) 751*a246b010SChuck Lever { 752*a246b010SChuck Lever struct sock *sk = xprt->inet; 753*a246b010SChuck Lever 754*a246b010SChuck Lever if (xprt->stream) 755*a246b010SChuck Lever return; 756*a246b010SChuck Lever if (xprt->rcvsize) { 757*a246b010SChuck Lever sk->sk_userlocks |= SOCK_RCVBUF_LOCK; 758*a246b010SChuck Lever sk->sk_rcvbuf = xprt->rcvsize * xprt->max_reqs * 2; 759*a246b010SChuck Lever } 760*a246b010SChuck Lever if (xprt->sndsize) { 761*a246b010SChuck Lever sk->sk_userlocks |= SOCK_SNDBUF_LOCK; 762*a246b010SChuck Lever sk->sk_sndbuf = xprt->sndsize * xprt->max_reqs * 2; 763*a246b010SChuck Lever sk->sk_write_space(sk); 764*a246b010SChuck Lever } 765*a246b010SChuck Lever } 766*a246b010SChuck Lever 767*a246b010SChuck Lever /* 768*a246b010SChuck Lever * Bind to a reserved port 769*a246b010SChuck Lever */ 770*a246b010SChuck Lever static inline int xprt_bindresvport(struct rpc_xprt *xprt, struct socket *sock) 771*a246b010SChuck Lever { 772*a246b010SChuck Lever struct sockaddr_in myaddr = { 773*a246b010SChuck Lever .sin_family = AF_INET, 774*a246b010SChuck Lever }; 775*a246b010SChuck Lever int err, port; 776*a246b010SChuck Lever 777*a246b010SChuck Lever /* Were we already bound to a given port? 
Try to reuse it */ 778*a246b010SChuck Lever port = xprt->port; 779*a246b010SChuck Lever do { 780*a246b010SChuck Lever myaddr.sin_port = htons(port); 781*a246b010SChuck Lever err = sock->ops->bind(sock, (struct sockaddr *) &myaddr, 782*a246b010SChuck Lever sizeof(myaddr)); 783*a246b010SChuck Lever if (err == 0) { 784*a246b010SChuck Lever xprt->port = port; 785*a246b010SChuck Lever return 0; 786*a246b010SChuck Lever } 787*a246b010SChuck Lever if (--port == 0) 788*a246b010SChuck Lever port = XPRT_MAX_RESVPORT; 789*a246b010SChuck Lever } while (err == -EADDRINUSE && port != xprt->port); 790*a246b010SChuck Lever 791*a246b010SChuck Lever printk("RPC: Can't bind to reserved port (%d).\n", -err); 792*a246b010SChuck Lever return err; 793*a246b010SChuck Lever } 794*a246b010SChuck Lever 795*a246b010SChuck Lever static void 796*a246b010SChuck Lever xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock) 797*a246b010SChuck Lever { 798*a246b010SChuck Lever struct sock *sk = sock->sk; 799*a246b010SChuck Lever 800*a246b010SChuck Lever if (xprt->inet) 801*a246b010SChuck Lever return; 802*a246b010SChuck Lever 803*a246b010SChuck Lever write_lock_bh(&sk->sk_callback_lock); 804*a246b010SChuck Lever sk->sk_user_data = xprt; 805*a246b010SChuck Lever xprt->old_data_ready = sk->sk_data_ready; 806*a246b010SChuck Lever xprt->old_state_change = sk->sk_state_change; 807*a246b010SChuck Lever xprt->old_write_space = sk->sk_write_space; 808*a246b010SChuck Lever if (xprt->prot == IPPROTO_UDP) { 809*a246b010SChuck Lever sk->sk_data_ready = udp_data_ready; 810*a246b010SChuck Lever sk->sk_no_check = UDP_CSUM_NORCV; 811*a246b010SChuck Lever xprt_set_connected(xprt); 812*a246b010SChuck Lever } else { 813*a246b010SChuck Lever tcp_sk(sk)->nonagle = 1; /* disable Nagle's algorithm */ 814*a246b010SChuck Lever sk->sk_data_ready = tcp_data_ready; 815*a246b010SChuck Lever sk->sk_state_change = tcp_state_change; 816*a246b010SChuck Lever xprt_clear_connected(xprt); 817*a246b010SChuck Lever } 
818*a246b010SChuck Lever sk->sk_write_space = xprt_write_space; 819*a246b010SChuck Lever 820*a246b010SChuck Lever /* Reset to new socket */ 821*a246b010SChuck Lever xprt->sock = sock; 822*a246b010SChuck Lever xprt->inet = sk; 823*a246b010SChuck Lever write_unlock_bh(&sk->sk_callback_lock); 824*a246b010SChuck Lever 825*a246b010SChuck Lever return; 826*a246b010SChuck Lever } 827*a246b010SChuck Lever 828*a246b010SChuck Lever /* 829*a246b010SChuck Lever * Datastream sockets are created here, but xprt_connect will create 830*a246b010SChuck Lever * and connect stream sockets. 831*a246b010SChuck Lever */ 832*a246b010SChuck Lever static struct socket * xprt_create_socket(struct rpc_xprt *xprt, int proto, int resvport) 833*a246b010SChuck Lever { 834*a246b010SChuck Lever struct socket *sock; 835*a246b010SChuck Lever int type, err; 836*a246b010SChuck Lever 837*a246b010SChuck Lever dprintk("RPC: xprt_create_socket(%s %d)\n", 838*a246b010SChuck Lever (proto == IPPROTO_UDP)? "udp" : "tcp", proto); 839*a246b010SChuck Lever 840*a246b010SChuck Lever type = (proto == IPPROTO_UDP)? 
SOCK_DGRAM : SOCK_STREAM; 841*a246b010SChuck Lever 842*a246b010SChuck Lever if ((err = sock_create_kern(PF_INET, type, proto, &sock)) < 0) { 843*a246b010SChuck Lever printk("RPC: can't create socket (%d).\n", -err); 844*a246b010SChuck Lever return NULL; 845*a246b010SChuck Lever } 846*a246b010SChuck Lever 847*a246b010SChuck Lever /* If the caller has the capability, bind to a reserved port */ 848*a246b010SChuck Lever if (resvport && xprt_bindresvport(xprt, sock) < 0) { 849*a246b010SChuck Lever printk("RPC: can't bind to reserved port.\n"); 850*a246b010SChuck Lever goto failed; 851*a246b010SChuck Lever } 852*a246b010SChuck Lever 853*a246b010SChuck Lever return sock; 854*a246b010SChuck Lever 855*a246b010SChuck Lever failed: 856*a246b010SChuck Lever sock_release(sock); 857*a246b010SChuck Lever return NULL; 858*a246b010SChuck Lever } 859*a246b010SChuck Lever 860*a246b010SChuck Lever static void xprt_socket_connect(void *args) 861*a246b010SChuck Lever { 862*a246b010SChuck Lever struct rpc_xprt *xprt = (struct rpc_xprt *)args; 863*a246b010SChuck Lever struct socket *sock = xprt->sock; 864*a246b010SChuck Lever int status = -EIO; 865*a246b010SChuck Lever 866*a246b010SChuck Lever if (xprt->shutdown || xprt->addr.sin_port == 0) 867*a246b010SChuck Lever goto out; 868*a246b010SChuck Lever 869*a246b010SChuck Lever /* 870*a246b010SChuck Lever * Start by resetting any existing state 871*a246b010SChuck Lever */ 872*a246b010SChuck Lever xprt_close(xprt); 873*a246b010SChuck Lever sock = xprt_create_socket(xprt, xprt->prot, xprt->resvport); 874*a246b010SChuck Lever if (sock == NULL) { 875*a246b010SChuck Lever /* couldn't create socket or bind to reserved port; 876*a246b010SChuck Lever * this is likely a permanent error, so cause an abort */ 877*a246b010SChuck Lever goto out; 878*a246b010SChuck Lever } 879*a246b010SChuck Lever xprt_bind_socket(xprt, sock); 880*a246b010SChuck Lever xprt_sock_setbufsize(xprt); 881*a246b010SChuck Lever 882*a246b010SChuck Lever status = 0; 
883*a246b010SChuck Lever if (!xprt->stream) 884*a246b010SChuck Lever goto out; 885*a246b010SChuck Lever 886*a246b010SChuck Lever /* 887*a246b010SChuck Lever * Tell the socket layer to start connecting... 888*a246b010SChuck Lever */ 889*a246b010SChuck Lever status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr, 890*a246b010SChuck Lever sizeof(xprt->addr), O_NONBLOCK); 891*a246b010SChuck Lever dprintk("RPC: %p connect status %d connected %d sock state %d\n", 892*a246b010SChuck Lever xprt, -status, xprt_connected(xprt), sock->sk->sk_state); 893*a246b010SChuck Lever if (status < 0) { 894*a246b010SChuck Lever switch (status) { 895*a246b010SChuck Lever case -EINPROGRESS: 896*a246b010SChuck Lever case -EALREADY: 897*a246b010SChuck Lever goto out_clear; 898*a246b010SChuck Lever } 899*a246b010SChuck Lever } 900*a246b010SChuck Lever out: 901*a246b010SChuck Lever if (status < 0) 902*a246b010SChuck Lever rpc_wake_up_status(&xprt->pending, status); 903*a246b010SChuck Lever else 904*a246b010SChuck Lever rpc_wake_up(&xprt->pending); 905*a246b010SChuck Lever out_clear: 906*a246b010SChuck Lever smp_mb__before_clear_bit(); 907*a246b010SChuck Lever clear_bit(XPRT_CONNECTING, &xprt->sockstate); 908*a246b010SChuck Lever smp_mb__after_clear_bit(); 909*a246b010SChuck Lever } 910*a246b010SChuck Lever 911*a246b010SChuck Lever static void 912*a246b010SChuck Lever xprt_connect_sock(struct rpc_task *task) 913*a246b010SChuck Lever { 914*a246b010SChuck Lever struct rpc_xprt *xprt = task->tk_xprt; 915*a246b010SChuck Lever 916*a246b010SChuck Lever if (!test_and_set_bit(XPRT_CONNECTING, &xprt->sockstate)) { 917*a246b010SChuck Lever /* Note: if we are here due to a dropped connection 918*a246b010SChuck Lever * we delay reconnecting by RPC_REESTABLISH_TIMEOUT/HZ 919*a246b010SChuck Lever * seconds 920*a246b010SChuck Lever */ 921*a246b010SChuck Lever if (xprt->sock != NULL) 922*a246b010SChuck Lever schedule_delayed_work(&xprt->sock_connect, 923*a246b010SChuck Lever 
RPC_REESTABLISH_TIMEOUT); 924*a246b010SChuck Lever else { 925*a246b010SChuck Lever schedule_work(&xprt->sock_connect); 926*a246b010SChuck Lever /* flush_scheduled_work can sleep... */ 927*a246b010SChuck Lever if (!RPC_IS_ASYNC(task)) 928*a246b010SChuck Lever flush_scheduled_work(); 929*a246b010SChuck Lever } 930*a246b010SChuck Lever } 931*a246b010SChuck Lever } 932*a246b010SChuck Lever 933*a246b010SChuck Lever /* 934*a246b010SChuck Lever * Set default timeout parameters 935*a246b010SChuck Lever */ 936*a246b010SChuck Lever static void 937*a246b010SChuck Lever xprt_default_timeout(struct rpc_timeout *to, int proto) 938*a246b010SChuck Lever { 939*a246b010SChuck Lever if (proto == IPPROTO_UDP) 940*a246b010SChuck Lever xprt_set_timeout(to, 5, 5 * HZ); 941*a246b010SChuck Lever else 942*a246b010SChuck Lever xprt_set_timeout(to, 2, 60 * HZ); 943*a246b010SChuck Lever } 944*a246b010SChuck Lever 945*a246b010SChuck Lever static struct rpc_xprt_ops xprt_socket_ops = { 946*a246b010SChuck Lever .set_buffer_size = xprt_sock_setbufsize, 947*a246b010SChuck Lever .connect = xprt_connect_sock, 948*a246b010SChuck Lever .send_request = xprt_send_request, 949*a246b010SChuck Lever .close = xprt_close, 950*a246b010SChuck Lever .destroy = xprt_socket_destroy, 951*a246b010SChuck Lever }; 952*a246b010SChuck Lever 953*a246b010SChuck Lever extern unsigned int xprt_udp_slot_table_entries; 954*a246b010SChuck Lever extern unsigned int xprt_tcp_slot_table_entries; 955*a246b010SChuck Lever 956*a246b010SChuck Lever int xs_setup_udp(struct rpc_xprt *xprt, struct rpc_timeout *to) 957*a246b010SChuck Lever { 958*a246b010SChuck Lever size_t slot_table_size; 959*a246b010SChuck Lever 960*a246b010SChuck Lever dprintk("RPC: setting up udp-ipv4 transport...\n"); 961*a246b010SChuck Lever 962*a246b010SChuck Lever xprt->max_reqs = xprt_udp_slot_table_entries; 963*a246b010SChuck Lever slot_table_size = xprt->max_reqs * sizeof(xprt->slot[0]); 964*a246b010SChuck Lever xprt->slot = kmalloc(slot_table_size, 
GFP_KERNEL); 965*a246b010SChuck Lever if (xprt->slot == NULL) 966*a246b010SChuck Lever return -ENOMEM; 967*a246b010SChuck Lever memset(xprt->slot, 0, slot_table_size); 968*a246b010SChuck Lever 969*a246b010SChuck Lever xprt->prot = IPPROTO_UDP; 970*a246b010SChuck Lever xprt->port = XPRT_MAX_RESVPORT; 971*a246b010SChuck Lever xprt->stream = 0; 972*a246b010SChuck Lever xprt->nocong = 0; 973*a246b010SChuck Lever xprt->cwnd = RPC_INITCWND; 974*a246b010SChuck Lever xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0; 975*a246b010SChuck Lever /* XXX: header size can vary due to auth type, IPv6, etc. */ 976*a246b010SChuck Lever xprt->max_payload = (1U << 16) - (MAX_HEADER << 3); 977*a246b010SChuck Lever 978*a246b010SChuck Lever INIT_WORK(&xprt->sock_connect, xprt_socket_connect, xprt); 979*a246b010SChuck Lever 980*a246b010SChuck Lever xprt->ops = &xprt_socket_ops; 981*a246b010SChuck Lever 982*a246b010SChuck Lever if (to) 983*a246b010SChuck Lever xprt->timeout = *to; 984*a246b010SChuck Lever else 985*a246b010SChuck Lever xprt_default_timeout(to, xprt->prot); 986*a246b010SChuck Lever 987*a246b010SChuck Lever return 0; 988*a246b010SChuck Lever } 989*a246b010SChuck Lever 990*a246b010SChuck Lever int xs_setup_tcp(struct rpc_xprt *xprt, struct rpc_timeout *to) 991*a246b010SChuck Lever { 992*a246b010SChuck Lever size_t slot_table_size; 993*a246b010SChuck Lever 994*a246b010SChuck Lever dprintk("RPC: setting up tcp-ipv4 transport...\n"); 995*a246b010SChuck Lever 996*a246b010SChuck Lever xprt->max_reqs = xprt_tcp_slot_table_entries; 997*a246b010SChuck Lever slot_table_size = xprt->max_reqs * sizeof(xprt->slot[0]); 998*a246b010SChuck Lever xprt->slot = kmalloc(slot_table_size, GFP_KERNEL); 999*a246b010SChuck Lever if (xprt->slot == NULL) 1000*a246b010SChuck Lever return -ENOMEM; 1001*a246b010SChuck Lever memset(xprt->slot, 0, slot_table_size); 1002*a246b010SChuck Lever 1003*a246b010SChuck Lever xprt->prot = IPPROTO_TCP; 1004*a246b010SChuck Lever xprt->port = XPRT_MAX_RESVPORT; 
1005*a246b010SChuck Lever xprt->stream = 1; 1006*a246b010SChuck Lever xprt->nocong = 1; 1007*a246b010SChuck Lever xprt->cwnd = RPC_MAXCWND(xprt); 1008*a246b010SChuck Lever xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0; 1009*a246b010SChuck Lever xprt->max_payload = (1U << 31) - 1; 1010*a246b010SChuck Lever 1011*a246b010SChuck Lever INIT_WORK(&xprt->sock_connect, xprt_socket_connect, xprt); 1012*a246b010SChuck Lever 1013*a246b010SChuck Lever xprt->ops = &xprt_socket_ops; 1014*a246b010SChuck Lever 1015*a246b010SChuck Lever if (to) 1016*a246b010SChuck Lever xprt->timeout = *to; 1017*a246b010SChuck Lever else 1018*a246b010SChuck Lever xprt_default_timeout(to, xprt->prot); 1019*a246b010SChuck Lever 1020*a246b010SChuck Lever return 0; 1021*a246b010SChuck Lever } 1022