/*
 * linux/net/sunrpc/svcsock.c
 *
 * These are the RPC server socket internals.
 *
 * The server scheduling algorithm does not always distribute the load
 * evenly when servicing a single client. May need to modify the
 * svc_sock_enqueue procedure...
 *
 * TCP support is largely untested and may be a little slow. The problem
 * is that we currently do two separate recvfrom's, one for the 4-byte
 * record length, and the second for the actual record. This could possibly
 * be improved by always reading a minimum size of around 100 bytes and
 * tucking any superfluous bytes away in a temporary store. Still, that
 * leaves write requests out in the rain. An alternative may be to peek at
 * the first skb in the queue, and if it matches the next TCP sequence
 * number, to extract the record marker. Yuck.
 *
 * Copyright (C) 1995, 1996 Olaf Kirch <okir@monad.swb.de>
 */

#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/errno.h>
#include <linux/fcntl.h>
#include <linux/net.h>
#include <linux/in.h>
#include <linux/inet.h>
#include <linux/udp.h>
#include <linux/tcp.h>
#include <linux/unistd.h>
#include <linux/slab.h>
#include <linux/netdevice.h>
#include <linux/skbuff.h>
#include <linux/file.h>
#include <linux/freezer.h>
#include <net/sock.h>
#include <net/checksum.h>
#include <net/ip.h>
#include <net/ipv6.h>
#include <net/tcp_states.h>
#include <asm/uaccess.h>
#include <asm/ioctls.h>

#include <linux/sunrpc/types.h>
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/svcsock.h>
#include <linux/sunrpc/stats.h>

/* SMP locking strategy:
 *
 *	svc_pool->sp_lock protects most of the fields of that pool.
 *	svc_serv->sv_lock protects sv_tempsocks, sv_permsocks, sv_tmpcnt.
 *	when both need to be taken (rare), svc_serv->sv_lock is first.
 *	BKL protects svc_serv->sv_nrthread.
 *	svc_sock->sk_lock protects the svc_sock->sk_deferred list
 *	and the ->sk_info_authunix cache.
 *	svc_sock->sk_xprt.xpt_flags.XPT_BUSY prevents a svc_sock being
 *	enqueued multiply.
 *
 *	Some flags can be set to certain values at any time
 *	providing that certain rules are followed:
 *
 *	XPT_CONN, XPT_DATA, can be set or cleared at any time.
 *		after a set, svc_sock_enqueue must be called.
 *		after a clear, the socket must be read/accepted
 *		 if this succeeds, it must be set again.
 *	XPT_CLOSE can be set at any time. It is never cleared.
 *	xpt_ref contains a bias of '1' until XPT_DEAD is set.
 *		so when xpt_ref hits zero, we know the transport is dead
 *		and no-one is using it.
 *	XPT_DEAD can only be set while XPT_BUSY is held which ensures
 *		no other thread will be using the socket or will try to
 *		set XPT_DEAD.
 *
 */
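
/*
 * Illustrative sketch (comment only, not part of the build): the
 * XPT_DATA rules above play out in the receive path roughly like
 * this, mirroring svc_udp_data_ready() and svc_udp_recvfrom() below:
 *
 *	set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);	(data arrived)
 *	svc_sock_enqueue(svsk);				(required after set)
 *	...
 *	clear_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);	(before reading)
 *	if (the read found data)
 *		set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags); (maybe more)
 */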

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT


static struct svc_sock *svc_setup_socket(struct svc_serv *, struct socket *,
					 int *errp, int flags);
static void		svc_delete_xprt(struct svc_xprt *xprt);
static void		svc_udp_data_ready(struct sock *, int);
static int		svc_udp_recvfrom(struct svc_rqst *);
static int		svc_udp_sendto(struct svc_rqst *);
static void		svc_close_xprt(struct svc_xprt *xprt);
static void		svc_sock_detach(struct svc_xprt *);
static void		svc_sock_free(struct svc_xprt *);

static struct svc_deferred_req *svc_deferred_dequeue(struct svc_sock *svsk);
static int svc_deferred_recv(struct svc_rqst *rqstp);
static struct cache_deferred_req *svc_defer(struct cache_req *req);
static struct svc_xprt *svc_create_socket(struct svc_serv *, int,
					  struct sockaddr *, int, int);

/* apparently the "standard" is that clients close
 * idle connections after 5 minutes, servers after
 * 6 minutes
 *   http://www.connectathon.org/talks96/nfstcp.pdf
 */
static int svc_conn_age_period = 6*60;

#ifdef CONFIG_DEBUG_LOCK_ALLOC
static struct lock_class_key svc_key[2];
static struct lock_class_key svc_slock_key[2];

static inline void svc_reclassify_socket(struct socket *sock)
{
	struct sock *sk = sock->sk;
	BUG_ON(sock_owned_by_user(sk));
	switch (sk->sk_family) {
	case AF_INET:
		sock_lock_init_class_and_name(sk, "slock-AF_INET-NFSD",
		    &svc_slock_key[0], "sk_lock-AF_INET-NFSD", &svc_key[0]);
		break;

	case AF_INET6:
		sock_lock_init_class_and_name(sk, "slock-AF_INET6-NFSD",
		    &svc_slock_key[1], "sk_lock-AF_INET6-NFSD", &svc_key[1]);
		break;

	default:
		BUG();
	}
}
#else
static inline void svc_reclassify_socket(struct socket *sock)
{
}
#endif

static char *__svc_print_addr(struct sockaddr *addr, char *buf, size_t len)
{
	switch (addr->sa_family) {
	case AF_INET:
		snprintf(buf, len, "%u.%u.%u.%u, port=%u",
			NIPQUAD(((struct sockaddr_in *) addr)->sin_addr),
			ntohs(((struct sockaddr_in *) addr)->sin_port));
		break;

	case AF_INET6:
		snprintf(buf, len, "%x:%x:%x:%x:%x:%x:%x:%x, port=%u",
			NIP6(((struct sockaddr_in6 *) addr)->sin6_addr),
			ntohs(((struct sockaddr_in6 *) addr)->sin6_port));
		break;

	default:
		snprintf(buf, len, "unknown address type: %d", addr->sa_family);
		break;
	}
	return buf;
}

/**
 * svc_print_addr - Format rq_addr field for printing
 * @rqstp: svc_rqst struct containing address to print
 * @buf: target buffer for formatted address
 * @len: length of target buffer
 *
 */
char *svc_print_addr(struct svc_rqst *rqstp, char *buf, size_t len)
{
	return __svc_print_addr(svc_addr(rqstp), buf, len);
}
EXPORT_SYMBOL_GPL(svc_print_addr);

/*
 * Queue up an idle server thread.  Must have pool->sp_lock held.
 * Note: this is really a stack rather than a queue, so that we only
 * use as many different threads as we need, and the rest don't pollute
 * the cache.
 */
static inline void
svc_thread_enqueue(struct svc_pool *pool, struct svc_rqst *rqstp)
{
	list_add(&rqstp->rq_list, &pool->sp_threads);
}

/*
 * Dequeue an nfsd thread.  Must have pool->sp_lock held.
 */
static inline void
svc_thread_dequeue(struct svc_pool *pool, struct svc_rqst *rqstp)
{
	list_del(&rqstp->rq_list);
}

/*
 * Release an skbuff after use
 */
static void svc_release_skb(struct svc_rqst *rqstp)
{
	struct sk_buff *skb = rqstp->rq_xprt_ctxt;
	struct svc_deferred_req *dr = rqstp->rq_deferred;

	if (skb) {
		rqstp->rq_xprt_ctxt = NULL;

		dprintk("svc: service %p, releasing skb %p\n", rqstp, skb);
		skb_free_datagram(rqstp->rq_sock->sk_sk, skb);
	}
	if (dr) {
		rqstp->rq_deferred = NULL;
		kfree(dr);
	}
}

/*
 * Queue up a socket with data pending. If there are idle nfsd
 * processes, wake 'em up.
 *
 */
static void
svc_sock_enqueue(struct svc_sock *svsk)
{
	struct svc_serv *serv = svsk->sk_xprt.xpt_server;
	struct svc_pool *pool;
	struct svc_rqst *rqstp;
	int cpu;

	if (!(svsk->sk_xprt.xpt_flags &
	      ((1<<XPT_CONN)|(1<<XPT_DATA)|(1<<XPT_CLOSE)|(1<<XPT_DEFERRED))))
		return;
	if (test_bit(XPT_DEAD, &svsk->sk_xprt.xpt_flags))
		return;

	cpu = get_cpu();
	pool = svc_pool_for_cpu(svsk->sk_xprt.xpt_server, cpu);
	put_cpu();

	spin_lock_bh(&pool->sp_lock);

	if (!list_empty(&pool->sp_threads) &&
	    !list_empty(&pool->sp_sockets))
		printk(KERN_ERR
			"svc_sock_enqueue: threads and sockets both waiting??\n");

	if (test_bit(XPT_DEAD, &svsk->sk_xprt.xpt_flags)) {
		/* Don't enqueue dead sockets */
		dprintk("svc: socket %p is dead, not enqueued\n", svsk->sk_sk);
		goto out_unlock;
	}

	/* Mark socket as busy. It will remain in this state until the
	 * server has processed all pending data and put the socket back
	 * on the idle list.  We update XPT_BUSY atomically because
	 * it also guards against trying to enqueue the svc_sock twice.
	 */
	if (test_and_set_bit(XPT_BUSY, &svsk->sk_xprt.xpt_flags)) {
		/* Don't enqueue socket while already enqueued */
		dprintk("svc: socket %p busy, not enqueued\n", svsk->sk_sk);
		goto out_unlock;
	}
	BUG_ON(svsk->sk_xprt.xpt_pool != NULL);
	svsk->sk_xprt.xpt_pool = pool;

	/* Handle pending connection */
	if (test_bit(XPT_CONN, &svsk->sk_xprt.xpt_flags))
		goto process;

	/* Handle close in-progress */
	if (test_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags))
		goto process;

	/* Check if we have space to reply to a request */
	if (!svsk->sk_xprt.xpt_ops->xpo_has_wspace(&svsk->sk_xprt)) {
		/* Don't enqueue while not enough space for reply */
		dprintk("svc: no write space, socket %p not enqueued\n", svsk);
		svsk->sk_xprt.xpt_pool = NULL;
		clear_bit(XPT_BUSY, &svsk->sk_xprt.xpt_flags);
		goto out_unlock;
	}

 process:
	if (!list_empty(&pool->sp_threads)) {
		rqstp = list_entry(pool->sp_threads.next,
				   struct svc_rqst,
				   rq_list);
		dprintk("svc: socket %p served by daemon %p\n",
			svsk->sk_sk, rqstp);
		svc_thread_dequeue(pool, rqstp);
		if (rqstp->rq_sock)
			printk(KERN_ERR
				"svc_sock_enqueue: server %p, rq_sock=%p!\n",
				rqstp, rqstp->rq_sock);
		rqstp->rq_sock = svsk;
		svc_xprt_get(&svsk->sk_xprt);
		rqstp->rq_reserved = serv->sv_max_mesg;
		atomic_add(rqstp->rq_reserved, &svsk->sk_reserved);
		BUG_ON(svsk->sk_xprt.xpt_pool != pool);
		wake_up(&rqstp->rq_wait);
	} else {
		dprintk("svc: socket %p put into queue\n", svsk->sk_sk);
		list_add_tail(&svsk->sk_xprt.xpt_ready, &pool->sp_sockets);
		BUG_ON(svsk->sk_xprt.xpt_pool != pool);
	}

out_unlock:
	spin_unlock_bh(&pool->sp_lock);
}
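
/*
 * For orientation, the life cycle implied by svc_sock_enqueue() above
 * and svc_sock_received() below:
 *
 *	event (conn/data/close/deferred)  ->  svc_sock_enqueue()
 *	    XPT_BUSY set; socket handed to an idle thread, or queued
 *	    on pool->sp_sockets if all threads are busy
 *	thread runs xpo_recvfrom()  ->  svc_sock_received()
 *	    XPT_BUSY cleared, then svc_sock_enqueue() is retried in
 *	    case more work arrived while the socket was busy
 */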

/*
 * Dequeue the first socket.  Must be called with the pool->sp_lock held.
 */
static inline struct svc_sock *
svc_sock_dequeue(struct svc_pool *pool)
{
	struct svc_sock *svsk;

	if (list_empty(&pool->sp_sockets))
		return NULL;

	svsk = list_entry(pool->sp_sockets.next,
			  struct svc_sock, sk_xprt.xpt_ready);
	list_del_init(&svsk->sk_xprt.xpt_ready);

	dprintk("svc: socket %p dequeued, inuse=%d\n",
		svsk->sk_sk, atomic_read(&svsk->sk_xprt.xpt_ref.refcount));

	return svsk;
}

/*
 * Having read something from a socket, check whether it
 * needs to be re-enqueued.
 * Note: XPT_DATA only gets cleared when a read-attempt finds
 * no (or insufficient) data.
 */
static inline void
svc_sock_received(struct svc_sock *svsk)
{
	svsk->sk_xprt.xpt_pool = NULL;
	clear_bit(XPT_BUSY, &svsk->sk_xprt.xpt_flags);
	svc_sock_enqueue(svsk);
}

/**
 * svc_reserve - change the space reserved for the reply to a request.
 * @rqstp:  The request in question
 * @space: new max space to reserve
 *
 * Each request reserves some space on the output queue of the socket
 * to make sure the reply fits.  This function reduces that reserved
 * space to be the amount of space used already, plus @space.
 *
 */
void svc_reserve(struct svc_rqst *rqstp, int space)
{
	space += rqstp->rq_res.head[0].iov_len;

	if (space < rqstp->rq_reserved) {
		struct svc_sock *svsk = rqstp->rq_sock;
		atomic_sub((rqstp->rq_reserved - space), &svsk->sk_reserved);
		rqstp->rq_reserved = space;

		svc_sock_enqueue(svsk);
	}
}

static void
svc_sock_release(struct svc_rqst *rqstp)
{
	struct svc_sock *svsk = rqstp->rq_sock;

	rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp);

	svc_free_res_pages(rqstp);
	rqstp->rq_res.page_len = 0;
	rqstp->rq_res.page_base = 0;


	/* Reset response buffer and release
	 * the reservation.
	 * But first, check that enough space was reserved
	 * for the reply, otherwise we have a bug!
	 */
	if ((rqstp->rq_res.len) > rqstp->rq_reserved)
		printk(KERN_ERR "RPC request reserved %d but used %d\n",
		       rqstp->rq_reserved,
		       rqstp->rq_res.len);

	rqstp->rq_res.head[0].iov_len = 0;
	svc_reserve(rqstp, 0);
	rqstp->rq_sock = NULL;

	svc_xprt_put(&svsk->sk_xprt);
}

/*
 * External function to wake up a server waiting for data
 * This really only makes sense for services like lockd
 * which have exactly one thread anyway.
 */
void
svc_wake_up(struct svc_serv *serv)
{
	struct svc_rqst *rqstp;
	unsigned int i;
	struct svc_pool *pool;

	for (i = 0; i < serv->sv_nrpools; i++) {
		pool = &serv->sv_pools[i];

		spin_lock_bh(&pool->sp_lock);
		if (!list_empty(&pool->sp_threads)) {
			rqstp = list_entry(pool->sp_threads.next,
					   struct svc_rqst,
					   rq_list);
			dprintk("svc: daemon %p woken up.\n", rqstp);
			/*
			svc_thread_dequeue(pool, rqstp);
			rqstp->rq_sock = NULL;
			 */
			wake_up(&rqstp->rq_wait);
		}
		spin_unlock_bh(&pool->sp_lock);
	}
}

union svc_pktinfo_u {
	struct in_pktinfo pkti;
	struct in6_pktinfo pkti6;
};
#define SVC_PKTINFO_SPACE \
	CMSG_SPACE(sizeof(union svc_pktinfo_u))

static void svc_set_cmsg_data(struct svc_rqst *rqstp, struct cmsghdr *cmh)
{
	switch (rqstp->rq_sock->sk_sk->sk_family) {
	case AF_INET: {
			struct in_pktinfo *pki = CMSG_DATA(cmh);

			cmh->cmsg_level = SOL_IP;
			cmh->cmsg_type = IP_PKTINFO;
			pki->ipi_ifindex = 0;
			pki->ipi_spec_dst.s_addr = rqstp->rq_daddr.addr.s_addr;
			cmh->cmsg_len = CMSG_LEN(sizeof(*pki));
		}
		break;

	case AF_INET6: {
			struct in6_pktinfo *pki = CMSG_DATA(cmh);

			cmh->cmsg_level = SOL_IPV6;
			cmh->cmsg_type = IPV6_PKTINFO;
			pki->ipi6_ifindex = 0;
			ipv6_addr_copy(&pki->ipi6_addr,
					&rqstp->rq_daddr.addr6);
			cmh->cmsg_len = CMSG_LEN(sizeof(*pki));
		}
		break;
	}
	return;
}
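
/*
 * Usage sketch (mirrors svc_sendto() and svc_udp_recvfrom() below):
 * callers reserve an aligned buffer of SVC_PKTINFO_SPACE bytes, point
 * msg_control at it, and let svc_set_cmsg_data() fill in the pktinfo
 * ancillary data:
 *
 *	union {
 *		struct cmsghdr	hdr;
 *		long		all[SVC_PKTINFO_SPACE / sizeof(long)];
 *	} buffer;
 *	struct msghdr msg = {
 *		.msg_control	= &buffer.hdr,
 *		.msg_controllen	= sizeof(buffer),
 *	};
 *	svc_set_cmsg_data(rqstp, &buffer.hdr);
 *
 * The union with 'long' supplies the alignment that CMSG_DATA()
 * assumes.
 */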

/*
 * Generic sendto routine
 */
static int
svc_sendto(struct svc_rqst *rqstp, struct xdr_buf *xdr)
{
	struct svc_sock *svsk = rqstp->rq_sock;
	struct socket *sock = svsk->sk_sock;
	int slen;
	union {
		struct cmsghdr	hdr;
		long		all[SVC_PKTINFO_SPACE / sizeof(long)];
	} buffer;
	struct cmsghdr *cmh = &buffer.hdr;
	int len = 0;
	int result;
	int size;
	struct page **ppage = xdr->pages;
	size_t base = xdr->page_base;
	unsigned int pglen = xdr->page_len;
	unsigned int flags = MSG_MORE;
	char buf[RPC_MAX_ADDRBUFLEN];

	slen = xdr->len;

	if (rqstp->rq_prot == IPPROTO_UDP) {
		struct msghdr msg = {
			.msg_name	= &rqstp->rq_addr,
			.msg_namelen	= rqstp->rq_addrlen,
			.msg_control	= cmh,
			.msg_controllen	= sizeof(buffer),
			.msg_flags	= MSG_MORE,
		};

		svc_set_cmsg_data(rqstp, cmh);

		if (sock_sendmsg(sock, &msg, 0) < 0)
			goto out;
	}

	/* send head */
	if (slen == xdr->head[0].iov_len)
		flags = 0;
	len = kernel_sendpage(sock, rqstp->rq_respages[0], 0,
			      xdr->head[0].iov_len, flags);
	if (len != xdr->head[0].iov_len)
		goto out;
	slen -= xdr->head[0].iov_len;
	if (slen == 0)
		goto out;

	/* send page data */
	size = PAGE_SIZE - base < pglen ? PAGE_SIZE - base : pglen;
	while (pglen > 0) {
		if (slen == size)
			flags = 0;
		result = kernel_sendpage(sock, *ppage, base, size, flags);
		if (result > 0)
			len += result;
		if (result != size)
			goto out;
		slen -= size;
		pglen -= size;
		size = PAGE_SIZE < pglen ? PAGE_SIZE : pglen;
		base = 0;
		ppage++;
	}
	/* send tail */
	if (xdr->tail[0].iov_len) {
		result = kernel_sendpage(sock, rqstp->rq_respages[0],
					 ((unsigned long)xdr->tail[0].iov_base)
						& (PAGE_SIZE-1),
					 xdr->tail[0].iov_len, 0);

		if (result > 0)
			len += result;
	}
out:
	dprintk("svc: socket %p sendto([%p %Zu... ], %d) = %d (addr %s)\n",
		rqstp->rq_sock, xdr->head[0].iov_base, xdr->head[0].iov_len,
		xdr->len, len, svc_print_addr(rqstp, buf, sizeof(buf)));

	return len;
}
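
/*
 * For reference, the xdr_buf sent above has three parts, transmitted
 * in order with MSG_MORE held on every fragment except the last:
 *
 *	head[0]	- the RPC header and start of the body
 *	pages[]	- bulk payload, sent page by page
 *	tail[0]	- XDR padding/trailer, which shares the first
 *		  response page (hence the offset arithmetic above)
 */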

/*
 * Report socket names for nfsdfs
 */
static int one_sock_name(char *buf, struct svc_sock *svsk)
{
	int len;

	switch(svsk->sk_sk->sk_family) {
	case AF_INET:
		len = sprintf(buf, "ipv4 %s %u.%u.%u.%u %d\n",
			      svsk->sk_sk->sk_protocol==IPPROTO_UDP?
			      "udp" : "tcp",
			      NIPQUAD(inet_sk(svsk->sk_sk)->rcv_saddr),
			      inet_sk(svsk->sk_sk)->num);
		break;
	default:
		len = sprintf(buf, "*unknown-%d*\n",
			      svsk->sk_sk->sk_family);
	}
	return len;
}

int
svc_sock_names(char *buf, struct svc_serv *serv, char *toclose)
{
	struct svc_sock *svsk, *closesk = NULL;
	int len = 0;

	if (!serv)
		return 0;
	spin_lock_bh(&serv->sv_lock);
	list_for_each_entry(svsk, &serv->sv_permsocks, sk_xprt.xpt_list) {
		int onelen = one_sock_name(buf+len, svsk);
		if (toclose && strcmp(toclose, buf+len) == 0)
			closesk = svsk;
		else
			len += onelen;
	}
	spin_unlock_bh(&serv->sv_lock);
	if (closesk)
		/* Should unregister with portmap, but you cannot
		 * unregister just one protocol...
		 */
		svc_close_xprt(&closesk->sk_xprt);
	else if (toclose)
		return -ENOENT;
	return len;
}
EXPORT_SYMBOL(svc_sock_names);

/*
 * Check input queue length
 */
static int
svc_recv_available(struct svc_sock *svsk)
{
	struct socket *sock = svsk->sk_sock;
	int avail, err;

	err = kernel_sock_ioctl(sock, TIOCINQ, (unsigned long) &avail);

	return (err >= 0)? avail : err;
}

/*
 * Generic recvfrom routine.
 */
static int
svc_recvfrom(struct svc_rqst *rqstp, struct kvec *iov, int nr, int buflen)
{
	struct svc_sock *svsk = rqstp->rq_sock;
	struct msghdr msg = {
		.msg_flags	= MSG_DONTWAIT,
	};
	struct sockaddr *sin;
	int len;

	len = kernel_recvmsg(svsk->sk_sock, &msg, iov, nr, buflen,
			     msg.msg_flags);

	/* sock_recvmsg doesn't fill in the name/namelen, so we must..
	 */
	memcpy(&rqstp->rq_addr, &svsk->sk_remote, svsk->sk_remotelen);
	rqstp->rq_addrlen = svsk->sk_remotelen;

	/* Destination address in request is needed for binding the
	 * source address in RPC callbacks later.
	 */
	sin = (struct sockaddr *)&svsk->sk_local;
	switch (sin->sa_family) {
	case AF_INET:
		rqstp->rq_daddr.addr = ((struct sockaddr_in *)sin)->sin_addr;
		break;
	case AF_INET6:
		rqstp->rq_daddr.addr6 = ((struct sockaddr_in6 *)sin)->sin6_addr;
		break;
	}

	dprintk("svc: socket %p recvfrom(%p, %Zu) = %d\n",
		svsk, iov[0].iov_base, iov[0].iov_len, len);

	return len;
}

/*
 * Set socket snd and rcv buffer lengths
 */
static inline void
svc_sock_setbufsize(struct socket *sock, unsigned int snd, unsigned int rcv)
{
#if 0
	mm_segment_t	oldfs;
	oldfs = get_fs(); set_fs(KERNEL_DS);
	sock_setsockopt(sock, SOL_SOCKET, SO_SNDBUF,
			(char*)&snd, sizeof(snd));
	sock_setsockopt(sock, SOL_SOCKET, SO_RCVBUF,
			(char*)&rcv, sizeof(rcv));
#else
	/* sock_setsockopt limits use to sysctl_?mem_max,
	 * which isn't acceptable.  Until that is made conditional
	 * on not having CAP_SYS_RESOURCE or similar, we go direct...
	 * DaveM said I could!
	 */
	lock_sock(sock->sk);
	sock->sk->sk_sndbuf = snd * 2;
	sock->sk->sk_rcvbuf = rcv * 2;
	sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK|SOCK_RCVBUF_LOCK;
	release_sock(sock->sk);
#endif
}
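
/*
 * Note on the doubling above: sock_setsockopt(SO_SNDBUF) likewise
 * stores twice the requested value, to leave room for sk_buff
 * overhead, so writing snd * 2 and rcv * 2 directly keeps the
 * effective buffer sizes consistent with what the #if 0 branch
 * would have produced.
 */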

/*
 * INET callback when data has been received on the socket.
 */
static void
svc_udp_data_ready(struct sock *sk, int count)
{
	struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data;

	if (svsk) {
		dprintk("svc: socket %p(inet %p), count=%d, busy=%d\n",
			svsk, sk, count,
			test_bit(XPT_BUSY, &svsk->sk_xprt.xpt_flags));
		set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
		svc_sock_enqueue(svsk);
	}
	if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
		wake_up_interruptible(sk->sk_sleep);
}

/*
 * INET callback when space is newly available on the socket.
 */
static void
svc_write_space(struct sock *sk)
{
	struct svc_sock *svsk = (struct svc_sock *)(sk->sk_user_data);

	if (svsk) {
		dprintk("svc: socket %p(inet %p), write_space busy=%d\n",
			svsk, sk, test_bit(XPT_BUSY, &svsk->sk_xprt.xpt_flags));
		svc_sock_enqueue(svsk);
	}

	if (sk->sk_sleep && waitqueue_active(sk->sk_sleep)) {
		dprintk("RPC svc_write_space: someone sleeping on %p\n",
			svsk);
		wake_up_interruptible(sk->sk_sleep);
	}
}

static inline void svc_udp_get_dest_address(struct svc_rqst *rqstp,
					    struct cmsghdr *cmh)
{
	switch (rqstp->rq_sock->sk_sk->sk_family) {
	case AF_INET: {
		struct in_pktinfo *pki = CMSG_DATA(cmh);
		rqstp->rq_daddr.addr.s_addr = pki->ipi_spec_dst.s_addr;
		break;
	}
	case AF_INET6: {
		struct in6_pktinfo *pki = CMSG_DATA(cmh);
		ipv6_addr_copy(&rqstp->rq_daddr.addr6, &pki->ipi6_addr);
		break;
	}
	}
}

/*
 * Receive a datagram from a UDP socket.
 */
static int
svc_udp_recvfrom(struct svc_rqst *rqstp)
{
	struct svc_sock *svsk = rqstp->rq_sock;
	struct svc_serv *serv = svsk->sk_xprt.xpt_server;
	struct sk_buff *skb;
	union {
		struct cmsghdr	hdr;
		long		all[SVC_PKTINFO_SPACE / sizeof(long)];
	} buffer;
	struct cmsghdr *cmh = &buffer.hdr;
	int err, len;
	struct msghdr msg = {
		.msg_name = svc_addr(rqstp),
		.msg_control = cmh,
		.msg_controllen = sizeof(buffer),
		.msg_flags = MSG_DONTWAIT,
	};

	if (test_and_clear_bit(XPT_CHNGBUF, &svsk->sk_xprt.xpt_flags))
		/* udp sockets need large rcvbuf as all pending
		 * requests are still in that buffer.  sndbuf must
		 * also be large enough that there is enough space
		 * for one reply per thread.  We count all threads
		 * rather than threads in a particular pool, which
		 * provides an upper bound on the number of threads
		 * which will access the socket.
		 */
		svc_sock_setbufsize(svsk->sk_sock,
				    (serv->sv_nrthreads+3) * serv->sv_max_mesg,
				    (serv->sv_nrthreads+3) * serv->sv_max_mesg);

	if ((rqstp->rq_deferred = svc_deferred_dequeue(svsk))) {
		svc_sock_received(svsk);
		return svc_deferred_recv(rqstp);
	}

	clear_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
	skb = NULL;
	err = kernel_recvmsg(svsk->sk_sock, &msg, NULL,
			     0, 0, MSG_PEEK | MSG_DONTWAIT);
	if (err >= 0)
		skb = skb_recv_datagram(svsk->sk_sk, 0, 1, &err);

	if (skb == NULL) {
		if (err != -EAGAIN) {
			/* possibly an icmp error */
			dprintk("svc: recvfrom returned error %d\n", -err);
			set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
		}
		svc_sock_received(svsk);
		return -EAGAIN;
	}
	rqstp->rq_addrlen = sizeof(rqstp->rq_addr);
	if (skb->tstamp.tv64 == 0) {
		skb->tstamp = ktime_get_real();
		/* Don't enable netstamp, sunrpc doesn't
		   need that much accuracy */
	}
	svsk->sk_sk->sk_stamp = skb->tstamp;
	set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags); /* there may be more data... */

	/*
	 * Maybe more packets - kick another thread ASAP.
	 */
	svc_sock_received(svsk);

	len = skb->len - sizeof(struct udphdr);
	rqstp->rq_arg.len = len;

	rqstp->rq_prot = IPPROTO_UDP;

	if (cmh->cmsg_level != IPPROTO_IP ||
	    cmh->cmsg_type != IP_PKTINFO) {
		if (net_ratelimit())
			printk("rpcsvc: received unknown control message: %d/%d\n",
			       cmh->cmsg_level, cmh->cmsg_type);
		skb_free_datagram(svsk->sk_sk, skb);
		return 0;
	}
	svc_udp_get_dest_address(rqstp, cmh);

	if (skb_is_nonlinear(skb)) {
		/* we have to copy */
		local_bh_disable();
		if (csum_partial_copy_to_xdr(&rqstp->rq_arg, skb)) {
			local_bh_enable();
			/* checksum error */
			skb_free_datagram(svsk->sk_sk, skb);
			return 0;
		}
		local_bh_enable();
		skb_free_datagram(svsk->sk_sk, skb);
	} else {
		/* we can use it in-place */
		rqstp->rq_arg.head[0].iov_base = skb->data + sizeof(struct udphdr);
		rqstp->rq_arg.head[0].iov_len = len;
		if (skb_checksum_complete(skb)) {
			skb_free_datagram(svsk->sk_sk, skb);
			return 0;
		}
		rqstp->rq_xprt_ctxt = skb;
	}

	rqstp->rq_arg.page_base = 0;
	if (len <= rqstp->rq_arg.head[0].iov_len) {
		rqstp->rq_arg.head[0].iov_len = len;
		rqstp->rq_arg.page_len = 0;
		rqstp->rq_respages = rqstp->rq_pages+1;
	} else {
		rqstp->rq_arg.page_len = len - rqstp->rq_arg.head[0].iov_len;
		rqstp->rq_respages = rqstp->rq_pages + 1 +
			DIV_ROUND_UP(rqstp->rq_arg.page_len, PAGE_SIZE);
	}

	if (serv->sv_stats)
		serv->sv_stats->netudpcnt++;

	return len;
}

static int
svc_udp_sendto(struct svc_rqst *rqstp)
{
	int error;

	error = svc_sendto(rqstp, &rqstp->rq_res);
	if (error == -ECONNREFUSED)
		/* ICMP error on earlier request. */
		error = svc_sendto(rqstp, &rqstp->rq_res);

	return error;
}
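
/*
 * Why the zero-length MSG_PEEK in svc_udp_recvfrom() above: the
 * kernel_recvmsg() call is there purely to collect the IP_PKTINFO
 * control message carrying the datagram's destination address; the
 * peek leaves the datagram on the queue so skb_recv_datagram() can
 * then claim the skb itself for in-place use.
 */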

static void svc_udp_prep_reply_hdr(struct svc_rqst *rqstp)
{
}

static int svc_udp_has_wspace(struct svc_xprt *xprt)
{
	struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt);
	struct svc_serv *serv = xprt->xpt_server;
	unsigned long required;

	/*
	 * Set the SOCK_NOSPACE flag before checking the available
	 * sock space.
	 */
	set_bit(SOCK_NOSPACE, &svsk->sk_sock->flags);
	required = atomic_read(&svsk->sk_reserved) + serv->sv_max_mesg;
	if (required*2 > sock_wspace(svsk->sk_sk))
		return 0;
	clear_bit(SOCK_NOSPACE, &svsk->sk_sock->flags);
	return 1;
}
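
/*
 * Worked example for svc_udp_has_wspace() above: with sv_max_mesg of
 * 32K and 64K already reserved by in-flight replies, required is 96K,
 * so the socket is considered writable only while sock_wspace()
 * reports more than 192K free.  The factor of two presumably mirrors
 * the sk_sndbuf doubling done in svc_sock_setbufsize().
 */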

static struct svc_xprt *svc_udp_accept(struct svc_xprt *xprt)
{
	BUG();
	return NULL;
}

static struct svc_xprt *svc_udp_create(struct svc_serv *serv,
				       struct sockaddr *sa, int salen,
				       int flags)
{
	return svc_create_socket(serv, IPPROTO_UDP, sa, salen, flags);
}

static struct svc_xprt_ops svc_udp_ops = {
	.xpo_create = svc_udp_create,
	.xpo_recvfrom = svc_udp_recvfrom,
	.xpo_sendto = svc_udp_sendto,
	.xpo_release_rqst = svc_release_skb,
	.xpo_detach = svc_sock_detach,
	.xpo_free = svc_sock_free,
	.xpo_prep_reply_hdr = svc_udp_prep_reply_hdr,
	.xpo_has_wspace = svc_udp_has_wspace,
	.xpo_accept = svc_udp_accept,
};

static struct svc_xprt_class svc_udp_class = {
	.xcl_name = "udp",
	.xcl_owner = THIS_MODULE,
	.xcl_ops = &svc_udp_ops,
	.xcl_max_payload = RPCSVC_MAXPAYLOAD_UDP,
};

static void svc_udp_init(struct svc_sock *svsk, struct svc_serv *serv)
{
	int one = 1;
	mm_segment_t oldfs;

	svc_xprt_init(&svc_udp_class, &svsk->sk_xprt, serv);
	svsk->sk_sk->sk_data_ready = svc_udp_data_ready;
	svsk->sk_sk->sk_write_space = svc_write_space;

	/* initial setting must leave enough space to
	 * receive and respond to one request.
	 * svc_udp_recvfrom will re-adjust if necessary
	 */
	svc_sock_setbufsize(svsk->sk_sock,
			    3 * svsk->sk_xprt.xpt_server->sv_max_mesg,
			    3 * svsk->sk_xprt.xpt_server->sv_max_mesg);

	set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags); /* might have come in before data_ready set up */
	set_bit(XPT_CHNGBUF, &svsk->sk_xprt.xpt_flags);

	oldfs = get_fs();
	set_fs(KERNEL_DS);
	/* make sure we get destination address info */
	svsk->sk_sock->ops->setsockopt(svsk->sk_sock, IPPROTO_IP, IP_PKTINFO,
				       (char __user *)&one, sizeof(one));
	set_fs(oldfs);
}

/*
 * A data_ready event on a listening socket means there's a connection
 * pending. Do not use state_change as a substitute for it.
 */
static void
svc_tcp_listen_data_ready(struct sock *sk, int count_unused)
{
	struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data;

	dprintk("svc: socket %p TCP (listen) state change %d\n",
		sk, sk->sk_state);

	/*
	 * This callback may be called twice when a new connection
	 * is established as a child socket inherits everything
	 * from a parent LISTEN socket.
	 * 1) data_ready method of the parent socket will be called
	 *    when one of the child sockets becomes ESTABLISHED.
	 * 2) data_ready method of the child socket may be called
	 *    when it receives data before the socket is accepted.
	 * In case of 2, we should ignore it silently.
	 */
	if (sk->sk_state == TCP_LISTEN) {
		if (svsk) {
			set_bit(XPT_CONN, &svsk->sk_xprt.xpt_flags);
			svc_sock_enqueue(svsk);
		} else
			printk("svc: socket %p: no user data\n", sk);
	}

	if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
		wake_up_interruptible_all(sk->sk_sleep);
}

/*
 * A state change on a connected socket means it's dying or dead.
 */
static void
svc_tcp_state_change(struct sock *sk)
{
	struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data;

	dprintk("svc: socket %p TCP (connected) state change %d (svsk %p)\n",
		sk, sk->sk_state, sk->sk_user_data);

	if (!svsk)
		printk("svc: socket %p: no user data\n", sk);
	else {
		set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags);
		svc_sock_enqueue(svsk);
	}
	if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
		wake_up_interruptible_all(sk->sk_sleep);
}

static void
svc_tcp_data_ready(struct sock *sk, int count)
{
	struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data;

	dprintk("svc: socket %p TCP data ready (svsk %p)\n",
		sk, sk->sk_user_data);
	if (svsk) {
		set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
		svc_sock_enqueue(svsk);
	}
	if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
		wake_up_interruptible(sk->sk_sleep);
}

static inline int svc_port_is_privileged(struct sockaddr *sin)
{
	switch (sin->sa_family) {
	case AF_INET:
		return ntohs(((struct sockaddr_in *)sin)->sin_port)
			< PROT_SOCK;
	case AF_INET6:
		return ntohs(((struct sockaddr_in6 *)sin)->sin6_port)
			< PROT_SOCK;
	default:
		return 0;
	}
}

/*
 * Accept a TCP connection
 */
static struct svc_xprt *svc_tcp_accept(struct svc_xprt *xprt)
{
	struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt);
	struct sockaddr_storage addr;
	struct sockaddr *sin = (struct sockaddr *) &addr;
	struct svc_serv *serv = svsk->sk_xprt.xpt_server;
	struct socket *sock = svsk->sk_sock;
	struct socket *newsock;
	struct svc_sock *newsvsk;
	int err, slen;
	char buf[RPC_MAX_ADDRBUFLEN];

	dprintk("svc: tcp_accept %p sock %p\n", svsk, sock);
	if (!sock)
		return NULL;

	clear_bit(XPT_CONN, &svsk->sk_xprt.xpt_flags);
	err = kernel_accept(sock, &newsock, O_NONBLOCK);
	if (err < 0) {
		if (err == -ENOMEM)
			printk(KERN_WARNING "%s: no more sockets!\n",
			       serv->sv_name);
		else if (err != -EAGAIN && net_ratelimit())
			printk(KERN_WARNING "%s: accept failed (err %d)!\n",
			       serv->sv_name, -err);
		return NULL;
	}
	set_bit(XPT_CONN, &svsk->sk_xprt.xpt_flags);

	err = kernel_getpeername(newsock, sin, &slen);
	if (err < 0) {
		if (net_ratelimit())
			printk(KERN_WARNING "%s: peername failed (err %d)!\n",
			       serv->sv_name, -err);
		goto failed;		/* aborted connection or whatever */
	}

	/* Ideally, we would want to reject connections from unauthorized
	 * hosts here, but when we get encryption, the IP of the host won't
	 * tell us anything.  For now just warn about unpriv connections.
	 */
	if (!svc_port_is_privileged(sin)) {
		dprintk(KERN_WARNING
			"%s: connect from unprivileged port: %s\n",
			serv->sv_name,
			__svc_print_addr(sin, buf, sizeof(buf)));
	}
	dprintk("%s: connect from %s\n", serv->sv_name,
		__svc_print_addr(sin, buf, sizeof(buf)));

	/* make sure that a write doesn't block forever when
	 * low on memory
	 */
	newsock->sk->sk_sndtimeo = HZ*30;

	if (!(newsvsk = svc_setup_socket(serv, newsock, &err,
				 (SVC_SOCK_ANONYMOUS | SVC_SOCK_TEMPORARY))))
		goto failed;
	memcpy(&newsvsk->sk_remote, sin, slen);
	newsvsk->sk_remotelen = slen;
	err = kernel_getsockname(newsock, sin, &slen);
	if (unlikely(err < 0)) {
		dprintk("svc_tcp_accept: kernel_getsockname error %d\n", -err);
		slen = offsetof(struct sockaddr, sa_data);
	}
	memcpy(&newsvsk->sk_local, sin, slen);

	svc_sock_received(newsvsk);

	if (serv->sv_stats)
		serv->sv_stats->nettcpconn++;

	return &newsvsk->sk_xprt;

failed:
	sock_release(newsock);
	return NULL;
}

/*
 * Receive data from a TCP socket.
 */
static int
svc_tcp_recvfrom(struct svc_rqst *rqstp)
{
	struct svc_sock *svsk = rqstp->rq_sock;
	struct svc_serv *serv = svsk->sk_xprt.xpt_server;
	int len;
	struct kvec *vec;
	int pnum, vlen;

	dprintk("svc: tcp_recv %p data %d conn %d close %d\n",
		svsk, test_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags),
		test_bit(XPT_CONN, &svsk->sk_xprt.xpt_flags),
		test_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags));

	if ((rqstp->rq_deferred = svc_deferred_dequeue(svsk))) {
		svc_sock_received(svsk);
		return svc_deferred_recv(rqstp);
	}

	if (test_and_clear_bit(XPT_CHNGBUF, &svsk->sk_xprt.xpt_flags))
		/* sndbuf needs to have room for one request
		 * per thread, otherwise we can stall even when the
		 * network isn't a bottleneck.
		 *
		 * We count all threads rather than threads in a
		 * particular pool, which provides an upper bound
		 * on the number of threads which will access the socket.
		 *
		 * rcvbuf just needs to be able to hold a few requests.
		 * Normally they will be removed from the queue
		 * as soon as a complete request arrives.
		 */
		svc_sock_setbufsize(svsk->sk_sock,
				    (serv->sv_nrthreads+3) * serv->sv_max_mesg,
				    3 * serv->sv_max_mesg);

	clear_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);

	/* Receive data. If we haven't got the record length yet, get
	 * the next four bytes. Otherwise try to gobble up as much as
	 * possible up to the complete record length.
	 */
	if (svsk->sk_tcplen < 4) {
		unsigned long want = 4 - svsk->sk_tcplen;
		struct kvec iov;

		iov.iov_base = ((char *) &svsk->sk_reclen) + svsk->sk_tcplen;
		iov.iov_len  = want;
		if ((len = svc_recvfrom(rqstp, &iov, 1, want)) < 0)
			goto error;
		svsk->sk_tcplen += len;

		if (len < want) {
			dprintk("svc: short recvfrom while reading record length (%d of %lu)\n",
				len, want);
			svc_sock_received(svsk);
			return -EAGAIN; /* record header not complete */
		}

		svsk->sk_reclen = ntohl(svsk->sk_reclen);
		if (!(svsk->sk_reclen & 0x80000000)) {
			/* FIXME: technically, a record can be fragmented,
			 *  and non-terminal fragments will not have the top
			 *  bit set in the fragment length header.
			 *  But apparently no known nfs clients send fragmented
			 *  records. */
			if (net_ratelimit())
				printk(KERN_NOTICE "RPC: bad TCP reclen 0x%08lx"
				       " (non-terminal)\n",
				       (unsigned long) svsk->sk_reclen);
			goto err_delete;
		}
		svsk->sk_reclen &= 0x7fffffff;
		dprintk("svc: TCP record, %d bytes\n", svsk->sk_reclen);
		if (svsk->sk_reclen > serv->sv_max_mesg) {
			if (net_ratelimit())
				printk(KERN_NOTICE "RPC: bad TCP reclen 0x%08lx"
				       " (large)\n",
				       (unsigned long) svsk->sk_reclen);
			goto err_delete;
		}
	}

	/* Check whether enough data is available */
	len = svc_recv_available(svsk);
	if (len < 0)
		goto error;

	if (len < svsk->sk_reclen) {
		dprintk("svc: incomplete TCP record (%d of %d)\n",
			len, svsk->sk_reclen);
		svc_sock_received(svsk);
		return -EAGAIN;	/* record not complete */
	}
	len = svsk->sk_reclen;
	set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);

	vec = rqstp->rq_vec;
	vec[0] = rqstp->rq_arg.head[0];
	vlen = PAGE_SIZE;
	pnum = 1;
	while (vlen < len) {
		vec[pnum].iov_base = page_address(rqstp->rq_pages[pnum]);
		vec[pnum].iov_len = PAGE_SIZE;
		pnum++;
		vlen += PAGE_SIZE;
	}
	rqstp->rq_respages = &rqstp->rq_pages[pnum];

	/* Now receive data */
	len = svc_recvfrom(rqstp, vec, pnum, len);
	if (len < 0)
		goto error;

	dprintk("svc: TCP complete record (%d bytes)\n", len);
	rqstp->rq_arg.len = len;
	rqstp->rq_arg.page_base = 0;
	if (len <= rqstp->rq_arg.head[0].iov_len) {
		rqstp->rq_arg.head[0].iov_len = len;
		rqstp->rq_arg.page_len = 0;
	} else {
		rqstp->rq_arg.page_len = len - rqstp->rq_arg.head[0].iov_len;
	}

	rqstp->rq_xprt_ctxt = NULL;
	rqstp->rq_prot = IPPROTO_TCP;

	/* Reset TCP read info */
	svsk->sk_reclen = 0;
	svsk->sk_tcplen = 0;

	svc_sock_received(svsk);
	if (serv->sv_stats)
		serv->sv_stats->nettcpcnt++;

	return len;

err_delete:
	set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags);
	return -EAGAIN;

error:
	if (len == -EAGAIN) {
		dprintk("RPC: TCP recvfrom got EAGAIN\n");
		svc_sock_received(svsk);
	} else {
		printk(KERN_NOTICE "%s: recvfrom returned errno %d\n",
		       svsk->sk_xprt.xpt_server->sv_name, -len);
		goto err_delete;
	}

	return len;
}
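
/*
 * Worked example for the record marker handling above: a client that
 * sends a 100-byte RPC call as a single fragment transmits the 4-byte
 * marker 0x80000064.  After ntohl(), bit 31 (the "last fragment"
 * flag) is set, and sk_reclen & 0x7fffffff yields 100, the number of
 * record bytes that follow on the stream.
 */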

/*
 * Send out data on TCP socket.
 */
static int
svc_tcp_sendto(struct svc_rqst *rqstp)
{
	struct xdr_buf *xbufp = &rqstp->rq_res;
	int sent;
	__be32 reclen;

	/* Set up the first element of the reply kvec.
	 * Any other kvecs that may be in use have been taken
	 * care of by the server implementation itself.
	 */
	reclen = htonl(0x80000000|((xbufp->len) - 4));
	memcpy(xbufp->head[0].iov_base, &reclen, 4);

	if (test_bit(XPT_DEAD, &rqstp->rq_sock->sk_xprt.xpt_flags))
		return -ENOTCONN;

	sent = svc_sendto(rqstp, &rqstp->rq_res);
	if (sent != xbufp->len) {
		printk(KERN_NOTICE "rpc-srv/tcp: %s: %s %d when sending %d bytes - shutting down socket\n",
		       rqstp->rq_sock->sk_xprt.xpt_server->sv_name,
		       (sent<0)?"got error":"sent only",
		       sent, xbufp->len);
		set_bit(XPT_CLOSE, &rqstp->rq_sock->sk_xprt.xpt_flags);
		svc_sock_enqueue(rqstp->rq_sock);
		sent = -EAGAIN;
	}
	return sent;
}

/*
 * Setup response header. TCP has a 4B record length field.
 */
static void svc_tcp_prep_reply_hdr(struct svc_rqst *rqstp)
{
	struct kvec *resv = &rqstp->rq_res.head[0];

	/* tcp needs a space for the record length... */
	svc_putnl(resv, 0);
}
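
/*
 * Example of the reply marker computed in svc_tcp_sendto() above: for
 * an rq_res of 132 bytes (including the 4-byte slot reserved by
 * svc_tcp_prep_reply_hdr() above), reclen becomes
 * htonl(0x80000000 | 128): one final fragment carrying 128 bytes of
 * RPC reply.
 */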

static int svc_tcp_has_wspace(struct svc_xprt *xprt)
{
	struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt);
	struct svc_serv *serv = svsk->sk_xprt.xpt_server;
	int required;
	int wspace;

	/*
	 * Set the SOCK_NOSPACE flag before checking the available
	 * sock space.
	 */
	set_bit(SOCK_NOSPACE, &svsk->sk_sock->flags);
	required = atomic_read(&svsk->sk_reserved) + serv->sv_max_mesg;
	wspace = sk_stream_wspace(svsk->sk_sk);

	if (wspace < sk_stream_min_wspace(svsk->sk_sk))
		return 0;
	if (required * 2 > wspace)
		return 0;

	clear_bit(SOCK_NOSPACE, &svsk->sk_sock->flags);
	return 1;
}

static struct svc_xprt *svc_tcp_create(struct svc_serv *serv,
				       struct sockaddr *sa, int salen,
				       int flags)
{
	return svc_create_socket(serv, IPPROTO_TCP, sa, salen, flags);
}

static struct svc_xprt_ops svc_tcp_ops = {
	.xpo_create = svc_tcp_create,
	.xpo_recvfrom = svc_tcp_recvfrom,
	.xpo_sendto = svc_tcp_sendto,
	.xpo_release_rqst = svc_release_skb,
	.xpo_detach = svc_sock_detach,
	.xpo_free = svc_sock_free,
	.xpo_prep_reply_hdr = svc_tcp_prep_reply_hdr,
	.xpo_has_wspace = svc_tcp_has_wspace,
	.xpo_accept = svc_tcp_accept,
};

static struct svc_xprt_class svc_tcp_class = {
	.xcl_name = "tcp",
	.xcl_owner = THIS_MODULE,
	.xcl_ops = &svc_tcp_ops,
	.xcl_max_payload = RPCSVC_MAXPAYLOAD_TCP,
};

void svc_init_xprt_sock(void)
{
	svc_reg_xprt_class(&svc_tcp_class);
	svc_reg_xprt_class(&svc_udp_class);
}

void svc_cleanup_xprt_sock(void)
{
	svc_unreg_xprt_class(&svc_tcp_class);
	svc_unreg_xprt_class(&svc_udp_class);
}

static void svc_tcp_init(struct svc_sock *svsk, struct svc_serv *serv)
{
	struct sock *sk = svsk->sk_sk;
	struct tcp_sock *tp = tcp_sk(sk);

	svc_xprt_init(&svc_tcp_class, &svsk->sk_xprt, serv);

	if (sk->sk_state == TCP_LISTEN) {
		dprintk("setting up TCP socket for listening\n");
		set_bit(XPT_LISTENER, &svsk->sk_xprt.xpt_flags);
		sk->sk_data_ready = svc_tcp_listen_data_ready;
		set_bit(XPT_CONN, &svsk->sk_xprt.xpt_flags);
	} else {
		dprintk("setting up TCP socket for reading\n");
		sk->sk_state_change = svc_tcp_state_change;
		sk->sk_data_ready = svc_tcp_data_ready;
		sk->sk_write_space = svc_write_space;

		svsk->sk_reclen = 0;
		svsk->sk_tcplen = 0;

		tp->nonagle = 1;	/* disable Nagle's algorithm */

		/* initial setting must leave enough space to
		 * receive and respond to one request.
		 * svc_tcp_recvfrom will re-adjust if necessary
		 */
		svc_sock_setbufsize(svsk->sk_sock,
				    3 * svsk->sk_xprt.xpt_server->sv_max_mesg,
				    3 * svsk->sk_xprt.xpt_server->sv_max_mesg);

		set_bit(XPT_CHNGBUF, &svsk->sk_xprt.xpt_flags);
		set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
		if (sk->sk_state != TCP_ESTABLISHED)
			set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags);
	}
}

void
svc_sock_update_bufs(struct svc_serv *serv)
{
	/*
	 * The number of server threads has changed. Update
	 * rcvbuf and sndbuf accordingly on all sockets
	 */
	struct list_head *le;

	spin_lock_bh(&serv->sv_lock);
	list_for_each(le, &serv->sv_permsocks) {
		struct svc_sock *svsk =
			list_entry(le, struct svc_sock, sk_xprt.xpt_list);
		set_bit(XPT_CHNGBUF, &svsk->sk_xprt.xpt_flags);
	}
	list_for_each(le, &serv->sv_tempsocks) {
		struct svc_sock *svsk =
			list_entry(le, struct svc_sock, sk_xprt.xpt_list);
		set_bit(XPT_CHNGBUF, &svsk->sk_xprt.xpt_flags);
	}
	spin_unlock_bh(&serv->sv_lock);
}

/*
 * Make sure that we don't have too many active connections. If we
 * have, something must be dropped.
 *
 * There's no point in trying to do random drop here for DoS
 * prevention. The NFS client does 1 reconnect in 15 seconds. An
 * attacker can easily beat that.
 *
 * The only somewhat efficient mechanism would be to drop old
 * connections from the same IP first. But right now we don't even
 * record the client IP in svc_sock.
 */
static void svc_check_conn_limits(struct svc_serv *serv)
{
	if (serv->sv_tmpcnt > (serv->sv_nrthreads+3)*20) {
		struct svc_sock *svsk = NULL;
		spin_lock_bh(&serv->sv_lock);
		if (!list_empty(&serv->sv_tempsocks)) {
			if (net_ratelimit()) {
				/* Try to help the admin */
				printk(KERN_NOTICE "%s: too many open TCP "
				       "sockets, consider increasing the "
				       "number of nfsd threads\n",
				       serv->sv_name);
			}
			/*
			 * Always select the oldest socket. It's not fair,
			 * but so is life
			 */
			svsk = list_entry(serv->sv_tempsocks.prev,
					  struct svc_sock,
					  sk_xprt.xpt_list);
			set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags);
			svc_xprt_get(&svsk->sk_xprt);
		}
		spin_unlock_bh(&serv->sv_lock);

		if (svsk) {
			svc_sock_enqueue(svsk);
			svc_xprt_put(&svsk->sk_xprt);
		}
	}
}

/*
 * Receive the next request on any socket.  This code is carefully
 * organised not to touch any cachelines in the shared svc_serv
 * structure, only cachelines in the local svc_pool.
 */
int
svc_recv(struct svc_rqst *rqstp, long timeout)
{
	struct svc_sock *svsk = NULL;
	struct svc_serv *serv = rqstp->rq_server;
	struct svc_pool *pool = rqstp->rq_pool;
	int len, i;
	int pages;
	struct xdr_buf *arg;
	DECLARE_WAITQUEUE(wait, current);

	dprintk("svc: server %p waiting for data (to = %ld)\n",
		rqstp, timeout);

	if (rqstp->rq_sock)
		printk(KERN_ERR
			"svc_recv: service %p, socket not NULL!\n",
			rqstp);
	if (waitqueue_active(&rqstp->rq_wait))
		printk(KERN_ERR
			"svc_recv: service %p, wait queue active!\n",
			rqstp);


	/* now allocate needed pages.  If we get a failure, sleep briefly */
	pages = (serv->sv_max_mesg + PAGE_SIZE) / PAGE_SIZE;
	for (i = 0; i < pages; i++)
		while (rqstp->rq_pages[i] == NULL) {
			struct page *p = alloc_page(GFP_KERNEL);
			if (!p)
				schedule_timeout_uninterruptible(msecs_to_jiffies(500));
			rqstp->rq_pages[i] = p;
		}
	rqstp->rq_pages[i++] = NULL; /* this might be seen in nfs_read_actor */
	BUG_ON(pages >= RPCSVC_MAXPAGES);

	/* Make arg->head point to first page and arg->pages point to rest */
	arg = &rqstp->rq_arg;
	arg->head[0].iov_base = page_address(rqstp->rq_pages[0]);
	arg->head[0].iov_len = PAGE_SIZE;
	arg->pages = rqstp->rq_pages + 1;
	arg->page_base = 0;
	/* save at least one page for response */
	arg->page_len = (pages-2)*PAGE_SIZE;
	arg->len = (pages-1)*PAGE_SIZE;
	arg->tail[0].iov_len = 0;

	try_to_freeze();
	cond_resched();
	if (signalled())
		return -EINTR;

	spin_lock_bh(&pool->sp_lock);
	if ((svsk = svc_sock_dequeue(pool)) != NULL) {
		rqstp->rq_sock = svsk;
		svc_xprt_get(&svsk->sk_xprt);
		rqstp->rq_reserved = serv->sv_max_mesg;
		atomic_add(rqstp->rq_reserved, &svsk->sk_reserved);
	} else {
		/* No data pending. Go to sleep */
		svc_thread_enqueue(pool, rqstp);

		/*
		 * We have to be able to interrupt this wait
		 * to bring down the daemons ...
		 */
		set_current_state(TASK_INTERRUPTIBLE);
		add_wait_queue(&rqstp->rq_wait, &wait);
		spin_unlock_bh(&pool->sp_lock);

		schedule_timeout(timeout);

		try_to_freeze();

		spin_lock_bh(&pool->sp_lock);
		remove_wait_queue(&rqstp->rq_wait, &wait);

		if (!(svsk = rqstp->rq_sock)) {
			svc_thread_dequeue(pool, rqstp);
			spin_unlock_bh(&pool->sp_lock);
			dprintk("svc: server %p, no data yet\n", rqstp);
			return signalled()? -EINTR : -EAGAIN;
		}
	}
	spin_unlock_bh(&pool->sp_lock);

	len = 0;
	if (test_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags)) {
		dprintk("svc_recv: found XPT_CLOSE\n");
		svc_delete_xprt(&svsk->sk_xprt);
	} else if (test_bit(XPT_LISTENER, &svsk->sk_xprt.xpt_flags)) {
		struct svc_xprt *newxpt;
		newxpt = svsk->sk_xprt.xpt_ops->xpo_accept(&svsk->sk_xprt);
		if (newxpt) {
			/*
			 * We know this module_get will succeed because the
			 * listener holds a reference too
			 */
			__module_get(newxpt->xpt_class->xcl_owner);
			svc_check_conn_limits(svsk->sk_xprt.xpt_server);
		}
		svc_sock_received(svsk);
	} else {
		dprintk("svc: server %p, pool %u, socket %p, inuse=%d\n",
			rqstp, pool->sp_id, svsk,
			atomic_read(&svsk->sk_xprt.xpt_ref.refcount));
		len = svsk->sk_xprt.xpt_ops->xpo_recvfrom(rqstp);
		dprintk("svc: got len=%d\n", len);
	}

	/* No data, incomplete (TCP) read, or accept() */
	if (len == 0 || len == -EAGAIN) {
		rqstp->rq_res.len = 0;
		svc_sock_release(rqstp);
		return -EAGAIN;
	}
	svsk->sk_lastrecv = get_seconds();
	clear_bit(XPT_OLD, &svsk->sk_xprt.xpt_flags);

	rqstp->rq_secure = svc_port_is_privileged(svc_addr(rqstp));
	rqstp->rq_chandle.defer = svc_defer;

	if (serv->sv_stats)
		serv->sv_stats->netcnt++;
	return len;
}

/*
 * Drop request
 */
void
svc_drop(struct svc_rqst *rqstp)
{
	dprintk("svc: socket %p dropped request\n", rqstp->rq_sock);
	svc_sock_release(rqstp);
}
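
/*
 * Worked example for the buffer setup in svc_recv() above, assuming
 * sv_max_mesg of 32K and 4K pages: pages = (32K + 4K) / 4K = 9.
 * Page 0 becomes arg->head[0], pages 1-7 carry page data
 * (arg->page_len = 7 * 4K = 28K), and the ninth page is held back
 * for the response, giving arg->len = 8 * 4K = 32K.
 */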

/*
 * Return reply to client.
 */
int
svc_send(struct svc_rqst *rqstp)
{
	struct svc_sock *svsk;
	int len;
	struct xdr_buf *xb;

	if ((svsk = rqstp->rq_sock) == NULL) {
		printk(KERN_WARNING "NULL socket pointer in %s:%d\n",
		       __FILE__, __LINE__);
		return -EFAULT;
	}

	/* release the receive skb before sending the reply */
	rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp);

	/* calculate over-all length */
	xb = &rqstp->rq_res;
	xb->len = xb->head[0].iov_len +
		xb->page_len +
		xb->tail[0].iov_len;

	/* Grab svsk->sk_mutex to serialize outgoing data. */
	mutex_lock(&svsk->sk_mutex);
	if (test_bit(XPT_DEAD, &svsk->sk_xprt.xpt_flags))
		len = -ENOTCONN;
	else
		len = svsk->sk_xprt.xpt_ops->xpo_sendto(rqstp);
	mutex_unlock(&svsk->sk_mutex);
	svc_sock_release(rqstp);

	if (len == -ECONNREFUSED || len == -ENOTCONN || len == -EAGAIN)
		return 0;
	return len;
}

/*
 * Timer function to close old temporary sockets, using
 * a mark-and-sweep algorithm.
 */
static void
svc_age_temp_sockets(unsigned long closure)
{
	struct svc_serv *serv = (struct svc_serv *)closure;
	struct svc_sock *svsk;
	struct list_head *le, *next;
	LIST_HEAD(to_be_aged);

	dprintk("svc_age_temp_sockets\n");

	if (!spin_trylock_bh(&serv->sv_lock)) {
		/* busy, try again 1 sec later */
		dprintk("svc_age_temp_sockets: busy\n");
		mod_timer(&serv->sv_temptimer, jiffies + HZ);
		return;
	}

	list_for_each_safe(le, next, &serv->sv_tempsocks) {
		svsk = list_entry(le, struct svc_sock, sk_xprt.xpt_list);

		if (!test_and_set_bit(XPT_OLD, &svsk->sk_xprt.xpt_flags))
			continue;
		if (atomic_read(&svsk->sk_xprt.xpt_ref.refcount) > 1
		    || test_bit(XPT_BUSY, &svsk->sk_xprt.xpt_flags))
			continue;
		svc_xprt_get(&svsk->sk_xprt);
		list_move(le, &to_be_aged);
		set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags);
		set_bit(XPT_DETACHED, &svsk->sk_xprt.xpt_flags);
	}
	spin_unlock_bh(&serv->sv_lock);

	while (!list_empty(&to_be_aged)) {
		le = to_be_aged.next;
		/* fiddling the sk_xprt.xpt_list node is safe 'cos we're XPT_DETACHED */
		list_del_init(le);
		svsk = list_entry(le, struct svc_sock, sk_xprt.xpt_list);

		dprintk("queuing svsk %p for closing, %lu seconds old\n",
			svsk, get_seconds() - svsk->sk_lastrecv);

		/* a thread will dequeue and close it soon */
		svc_sock_enqueue(svsk);
		svc_xprt_put(&svsk->sk_xprt);
	}

	mod_timer(&serv->sv_temptimer, jiffies + svc_conn_age_period * HZ);
}
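
/*
 * The mark-and-sweep above works in two passes per timer tick: the
 * first pass sets XPT_OLD on every temporary socket ("mark"); a
 * socket that is still XPT_OLD when the timer fires again has seen
 * no traffic for a full svc_conn_age_period (6 minutes) and is swept
 * onto to_be_aged for closing.  svc_recv() clears XPT_OLD whenever a
 * request arrives, which is what keeps an active socket alive.
 */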

/*
 * Initialize socket for RPC use and create svc_sock struct
 * XXX: May want to setsockopt SO_SNDBUF and SO_RCVBUF.
 */
static struct svc_sock *svc_setup_socket(struct svc_serv *serv,
					 struct socket *sock,
					 int *errp, int flags)
{
	struct svc_sock *svsk;
	struct sock *inet;
	int pmap_register = !(flags & SVC_SOCK_ANONYMOUS);
	int is_temporary = flags & SVC_SOCK_TEMPORARY;

	dprintk("svc: svc_setup_socket %p\n", sock);
	if (!(svsk = kzalloc(sizeof(*svsk), GFP_KERNEL))) {
		*errp = -ENOMEM;
		return NULL;
	}

	inet = sock->sk;

	/* Register socket with portmapper */
	if (*errp >= 0 && pmap_register)
		*errp = svc_register(serv, inet->sk_protocol,
				     ntohs(inet_sk(inet)->sport));

	if (*errp < 0) {
		kfree(svsk);
		return NULL;
	}

	set_bit(XPT_BUSY, &svsk->sk_xprt.xpt_flags);
	inet->sk_user_data = svsk;
	svsk->sk_sock = sock;
	svsk->sk_sk = inet;
	svsk->sk_ostate = inet->sk_state_change;
	svsk->sk_odata = inet->sk_data_ready;
	svsk->sk_owspace = inet->sk_write_space;
	svsk->sk_lastrecv = get_seconds();
	spin_lock_init(&svsk->sk_lock);
	INIT_LIST_HEAD(&svsk->sk_deferred);
	mutex_init(&svsk->sk_mutex);

	/* Initialize the socket */
	if (sock->type == SOCK_DGRAM)
		svc_udp_init(svsk, serv);
	else
		svc_tcp_init(svsk, serv);

	spin_lock_bh(&serv->sv_lock);
	if (is_temporary) {
		set_bit(XPT_TEMP, &svsk->sk_xprt.xpt_flags);
		list_add(&svsk->sk_xprt.xpt_list, &serv->sv_tempsocks);
		serv->sv_tmpcnt++;
		if (serv->sv_temptimer.function == NULL) {
			/* setup timer to age temp sockets */
			setup_timer(&serv->sv_temptimer, svc_age_temp_sockets,
				    (unsigned long)serv);
			mod_timer(&serv->sv_temptimer,
				  jiffies + svc_conn_age_period * HZ);
		}
	} else {
		clear_bit(XPT_TEMP, &svsk->sk_xprt.xpt_flags);
		list_add(&svsk->sk_xprt.xpt_list, &serv->sv_permsocks);
	}
	spin_unlock_bh(&serv->sv_lock);

	dprintk("svc: svc_setup_socket created %p (inet %p)\n",
		svsk, svsk->sk_sk);

	return svsk;
}

int svc_addsock(struct svc_serv *serv,
		int fd,
		char *name_return,
		int *proto)
{
	int err = 0;
	struct socket *so = sockfd_lookup(fd, &err);
	struct svc_sock *svsk = NULL;

	if (!so)
		return err;
	if (so->sk->sk_family != AF_INET)
		err = -EAFNOSUPPORT;
	else if (so->sk->sk_protocol != IPPROTO_TCP &&
	    so->sk->sk_protocol != IPPROTO_UDP)
		err = -EPROTONOSUPPORT;
	else if (so->state > SS_UNCONNECTED)
		err = -EISCONN;
	else {
		svsk = svc_setup_socket(serv, so, &err, SVC_SOCK_DEFAULTS);
		if (svsk) {
			svc_sock_received(svsk);
			err = 0;
		}
	}
	if (err) {
		sockfd_put(so);
		return err;
	}
	if (proto)
		*proto = so->sk->sk_protocol;
	return one_sock_name(name_return, svsk);
}
EXPORT_SYMBOL_GPL(svc_addsock);

/*
 * Create socket for RPC service.
 */
static struct svc_xprt *svc_create_socket(struct svc_serv *serv,
					  int protocol,
					  struct sockaddr *sin, int len,
					  int flags)
{
	struct svc_sock *svsk;
	struct socket *sock;
	int error;
	int type;
	char buf[RPC_MAX_ADDRBUFLEN];

	dprintk("svc: svc_create_socket(%s, %d, %s)\n",
		serv->sv_program->pg_name, protocol,
		__svc_print_addr(sin, buf, sizeof(buf)));

	if (protocol != IPPROTO_UDP && protocol != IPPROTO_TCP) {
		printk(KERN_WARNING "svc: only UDP and TCP "
		       "sockets supported\n");
		return ERR_PTR(-EINVAL);
	}
	type = (protocol == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;

	error = sock_create_kern(sin->sa_family, type, protocol, &sock);
	if (error < 0)
		return ERR_PTR(error);

	svc_reclassify_socket(sock);

	if (type == SOCK_STREAM)
		sock->sk->sk_reuse = 1;		/* allow address reuse */
	error = kernel_bind(sock, sin, len);
	if (error < 0)
		goto bummer;

	if (protocol == IPPROTO_TCP) {
		if ((error = kernel_listen(sock, 64)) < 0)
			goto bummer;
	}

	if ((svsk = svc_setup_socket(serv, sock, &error, flags)) != NULL) {
		svc_sock_received(svsk);
		return (struct svc_xprt *)svsk;
	}

bummer:
	dprintk("svc: svc_create_socket error = %d\n", -error);
	sock_release(sock);
	return ERR_PTR(error);
}

/*
 * Detach the svc_sock from the socket so that no
 * more callbacks occur.
 */
static void svc_sock_detach(struct svc_xprt *xprt)
{
	struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt);
	struct sock *sk = svsk->sk_sk;

	dprintk("svc: svc_sock_detach(%p)\n", svsk);

	/* put back the old socket callbacks */
	sk->sk_state_change = svsk->sk_ostate;
	sk->sk_data_ready = svsk->sk_odata;
	sk->sk_write_space = svsk->sk_owspace;
}

/*
 * Free the svc_sock's socket resources and the svc_sock itself.
 */
static void svc_sock_free(struct svc_xprt *xprt)
{
	struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt);
	dprintk("svc: svc_sock_free(%p)\n", svsk);

	if (svsk->sk_info_authunix != NULL)
		svcauth_unix_info_release(svsk->sk_info_authunix);
	if (svsk->sk_sock->file)
		sockfd_put(svsk->sk_sock);
	else
		sock_release(svsk->sk_sock);
	kfree(svsk);
}

/*
 * Remove a dead transport
 */
static void svc_delete_xprt(struct svc_xprt *xprt)
{
	struct svc_serv *serv = xprt->xpt_server;

	dprintk("svc: svc_delete_xprt(%p)\n", xprt);
	xprt->xpt_ops->xpo_detach(xprt);

	spin_lock_bh(&serv->sv_lock);
	if (!test_and_set_bit(XPT_DETACHED, &xprt->xpt_flags))
		list_del_init(&xprt->xpt_list);
	/*
	 * We used to delete the transport from whichever list
	 * its sk_xprt.xpt_ready node was on, but we don't actually
	 * need to.  This is because the only time we're called
	 * while still attached to a queue, the queue itself
	 * is about to be destroyed (in svc_destroy).
	 */
	if (!test_and_set_bit(XPT_DEAD, &xprt->xpt_flags)) {
		BUG_ON(atomic_read(&xprt->xpt_ref.refcount) < 2);
		if (test_bit(XPT_TEMP, &xprt->xpt_flags))
			serv->sv_tmpcnt--;
		svc_xprt_put(xprt);
	}
	spin_unlock_bh(&serv->sv_lock);
}

static void svc_close_xprt(struct svc_xprt *xprt)
{
	set_bit(XPT_CLOSE, &xprt->xpt_flags);
	if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags))
		/* someone else will have to effect the close */
		return;

	svc_xprt_get(xprt);
	svc_delete_xprt(xprt);
	clear_bit(XPT_BUSY, &xprt->xpt_flags);
	svc_xprt_put(xprt);
}

void svc_close_all(struct list_head *xprt_list)
{
	struct svc_xprt *xprt;
	struct svc_xprt *tmp;

	list_for_each_entry_safe(xprt, tmp, xprt_list, xpt_list) {
		set_bit(XPT_CLOSE, &xprt->xpt_flags);
		if (test_bit(XPT_BUSY, &xprt->xpt_flags)) {
			/* Waiting to be processed, but no threads left,
			 * So just remove it from the waiting list
			 */
			list_del_init(&xprt->xpt_ready);
			clear_bit(XPT_BUSY, &xprt->xpt_flags);
		}
		svc_close_xprt(xprt);
	}
}

/*
 * Handle defer and revisit of requests
 */

static void svc_revisit(struct cache_deferred_req *dreq, int too_many)
{
	struct svc_deferred_req *dr = container_of(dreq, struct svc_deferred_req, handle);
	struct svc_sock *svsk;

	if (too_many) {
		svc_xprt_put(&dr->svsk->sk_xprt);
		kfree(dr);
		return;
	}
	dprintk("revisit queued\n");
	svsk = dr->svsk;
	dr->svsk = NULL;
	spin_lock(&svsk->sk_lock);
	list_add(&dr->handle.recent, &svsk->sk_deferred);
	spin_unlock(&svsk->sk_lock);
	set_bit(XPT_DEFERRED, &svsk->sk_xprt.xpt_flags);
	svc_sock_enqueue(svsk);
	svc_xprt_put(&svsk->sk_xprt);
}

static struct cache_deferred_req *
svc_defer(struct cache_req *req)
{
	struct svc_rqst *rqstp = container_of(req, struct svc_rqst, rq_chandle);
	int size = sizeof(struct svc_deferred_req) + (rqstp->rq_arg.len);
	struct svc_deferred_req *dr;

	if (rqstp->rq_arg.page_len)
		return NULL; /* if more than a page, give up FIXME */
	if (rqstp->rq_deferred) {
		dr = rqstp->rq_deferred;
		rqstp->rq_deferred = NULL;
	} else {
		int skip = rqstp->rq_arg.len - rqstp->rq_arg.head[0].iov_len;
		/* FIXME maybe discard if size too large */
		dr = kmalloc(size, GFP_KERNEL);
		if (dr == NULL)
			return NULL;

		dr->handle.owner = rqstp->rq_server;
		dr->prot = rqstp->rq_prot;
		memcpy(&dr->addr, &rqstp->rq_addr, rqstp->rq_addrlen);
		dr->addrlen = rqstp->rq_addrlen;
		dr->daddr = rqstp->rq_daddr;
		dr->argslen = rqstp->rq_arg.len >> 2;
		memcpy(dr->args, rqstp->rq_arg.head[0].iov_base-skip, dr->argslen<<2);
	}
	svc_xprt_get(rqstp->rq_xprt);
	dr->svsk = rqstp->rq_sock;

	dr->handle.revisit = svc_revisit;
	return &dr->handle;
}
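
/*
 * Deferral round trip, for orientation: svc_defer() above snapshots a
 * request into a svc_deferred_req when a cache lookup would block;
 * svc_revisit() later re-queues it on sk_deferred and sets
 * XPT_DEFERRED; the next thread to service the socket picks it up via
 * svc_deferred_dequeue() below and replays it through
 * svc_deferred_recv().
 */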

/*
 * recv data from a deferred request into an active one
 */
static int svc_deferred_recv(struct svc_rqst *rqstp)
{
	struct svc_deferred_req *dr = rqstp->rq_deferred;

	rqstp->rq_arg.head[0].iov_base = dr->args;
	rqstp->rq_arg.head[0].iov_len = dr->argslen<<2;
	rqstp->rq_arg.page_len = 0;
	rqstp->rq_arg.len = dr->argslen<<2;
	rqstp->rq_prot = dr->prot;
	memcpy(&rqstp->rq_addr, &dr->addr, dr->addrlen);
	rqstp->rq_addrlen = dr->addrlen;
	rqstp->rq_daddr = dr->daddr;
	rqstp->rq_respages = rqstp->rq_pages;
	return dr->argslen<<2;
}


static struct svc_deferred_req *svc_deferred_dequeue(struct svc_sock *svsk)
{
	struct svc_deferred_req *dr = NULL;

	if (!test_bit(XPT_DEFERRED, &svsk->sk_xprt.xpt_flags))
		return NULL;
	spin_lock(&svsk->sk_lock);
	clear_bit(XPT_DEFERRED, &svsk->sk_xprt.xpt_flags);
	if (!list_empty(&svsk->sk_deferred)) {
		dr = list_entry(svsk->sk_deferred.next,
				struct svc_deferred_req,
				handle.recent);
		list_del_init(&dr->handle.recent);
		set_bit(XPT_DEFERRED, &svsk->sk_xprt.xpt_flags);
	}
	spin_unlock(&svsk->sk_lock);
	return dr;
}