/*
 * linux/net/sunrpc/svc_xprt.c
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

#include <linux/sched.h>
#include <linux/errno.h>
#include <linux/fcntl.h>
#include <linux/net.h>
#include <linux/in.h>
#include <linux/inet.h>
#include <linux/udp.h>
#include <linux/tcp.h>
#include <linux/unistd.h>
#include <linux/slab.h>
#include <linux/netdevice.h>
#include <linux/skbuff.h>
#include <linux/file.h>
#include <linux/freezer.h>
#include <net/sock.h>
#include <net/checksum.h>
#include <net/ip.h>
#include <net/ipv6.h>
#include <net/tcp_states.h>
#include <linux/uaccess.h>
#include <asm/ioctls.h>

#include <linux/sunrpc/types.h>
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/stats.h>
#include <linux/sunrpc/svc_xprt.h>

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT

static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt);
static int svc_deferred_recv(struct svc_rqst *rqstp);
static struct cache_deferred_req *svc_defer(struct cache_req *req);
static void svc_age_temp_xprts(unsigned long closure);

/* apparently the "standard" is that clients close
 * idle connections after 5 minutes, servers after
 * 6 minutes
 *   http://www.connectathon.org/talks96/nfstcp.pdf
 */
static int svc_conn_age_period = 6*60;

/* List of registered transport classes */
static DEFINE_SPINLOCK(svc_xprt_class_lock);
static LIST_HEAD(svc_xprt_class_list);

/* SMP locking strategy:
 *
 *	svc_pool->sp_lock protects most of the fields of that pool.
 *	svc_serv->sv_lock protects sv_tempsocks, sv_permsocks, sv_tmpcnt.
 *	when both need to be taken (rare), svc_serv->sv_lock is first.
 *	BKL protects svc_serv->sv_nrthread.
 *	svc_sock->sk_lock protects the svc_sock->sk_deferred list
 *	and the ->sk_info_authunix cache.
 *
 *	The XPT_BUSY bit in xprt->xpt_flags prevents a transport being
 *	enqueued multiply. During normal transport processing this bit
 *	is set by svc_xprt_enqueue and cleared by svc_xprt_received.
 *	Providers should not manipulate this bit directly.
 *
 *	Some flags can be set to certain values at any time
 *	providing that certain rules are followed:
 *
 *	XPT_CONN, XPT_DATA:
 *		- Can be set or cleared at any time.
 *		- After a set, svc_xprt_enqueue must be called to enqueue
 *		  the transport for processing.
 *		- After a clear, the transport must be read/accepted.
 *		  If this succeeds, it must be set again.
 *	XPT_CLOSE:
 *		- Can set at any time. It is never cleared.
 *	XPT_DEAD:
 *		- Can only be set while XPT_BUSY is held which ensures
 *		  that no other thread will be using the transport or will
 *		  try to set XPT_DEAD.
 */
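
/*
 * Illustrative sketch of the provider-side pattern the rules above
 * imply (the function name my_data_ready is hypothetical; the flag and
 * calls are the real ones): on arrival of data, set XPT_DATA and hand
 * the transport to a pool thread, which later clears XPT_BUSY by
 * calling svc_xprt_received() once a read/accept has been attempted.
 *
 *	static void my_data_ready(struct svc_xprt *xprt)
 *	{
 *		set_bit(XPT_DATA, &xprt->xpt_flags);	// work is pending
 *		svc_xprt_enqueue(xprt);			// wake a pool thread
 *	}
 */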

int svc_reg_xprt_class(struct svc_xprt_class *xcl)
{
	struct svc_xprt_class *cl;
	int res = -EEXIST;

	dprintk("svc: Adding svc transport class '%s'\n", xcl->xcl_name);

	INIT_LIST_HEAD(&xcl->xcl_list);
	spin_lock(&svc_xprt_class_lock);
	/* Make sure there isn't already a class with the same name */
	list_for_each_entry(cl, &svc_xprt_class_list, xcl_list) {
		if (strcmp(xcl->xcl_name, cl->xcl_name) == 0)
			goto out;
	}
	list_add_tail(&xcl->xcl_list, &svc_xprt_class_list);
	res = 0;
out:
	spin_unlock(&svc_xprt_class_lock);
	return res;
}
EXPORT_SYMBOL_GPL(svc_reg_xprt_class);

void svc_unreg_xprt_class(struct svc_xprt_class *xcl)
{
	dprintk("svc: Removing svc transport class '%s'\n", xcl->xcl_name);
	spin_lock(&svc_xprt_class_lock);
	list_del_init(&xcl->xcl_list);
	spin_unlock(&svc_xprt_class_lock);
}
EXPORT_SYMBOL_GPL(svc_unreg_xprt_class);
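
/*
 * Illustrative sketch of how a transport module might register a class
 * using the fields this file relies on (xcl_name, xcl_owner, xcl_ops,
 * xcl_max_payload).  The names svc_foo_class, svc_foo_ops and
 * FOO_MAX_PAYLOAD are hypothetical.
 *
 *	static struct svc_xprt_class svc_foo_class = {
 *		.xcl_name	 = "foo",
 *		.xcl_owner	 = THIS_MODULE,
 *		.xcl_ops	 = &svc_foo_ops,
 *		.xcl_max_payload = FOO_MAX_PAYLOAD,
 *	};
 *
 *	// module init:  svc_reg_xprt_class(&svc_foo_class);
 *	// module exit:  svc_unreg_xprt_class(&svc_foo_class);
 */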

/*
 * Format the transport list for printing
 */
int svc_print_xprts(char *buf, int maxlen)
{
	struct list_head *le;
	char tmpstr[80];
	int len = 0;
	buf[0] = '\0';

	spin_lock(&svc_xprt_class_lock);
	list_for_each(le, &svc_xprt_class_list) {
		int slen;
		struct svc_xprt_class *xcl =
			list_entry(le, struct svc_xprt_class, xcl_list);

		sprintf(tmpstr, "%s %d\n", xcl->xcl_name, xcl->xcl_max_payload);
		slen = strlen(tmpstr);
		if (len + slen > maxlen)
			break;
		len += slen;
		strcat(buf, tmpstr);
	}
	spin_unlock(&svc_xprt_class_lock);

	return len;
}

static void svc_xprt_free(struct kref *kref)
{
	struct svc_xprt *xprt =
		container_of(kref, struct svc_xprt, xpt_ref);
	struct module *owner = xprt->xpt_class->xcl_owner;
	if (test_bit(XPT_CACHE_AUTH, &xprt->xpt_flags)
	    && xprt->xpt_auth_cache != NULL)
		svcauth_unix_info_release(xprt->xpt_auth_cache);
	xprt->xpt_ops->xpo_free(xprt);
	module_put(owner);
}

void svc_xprt_put(struct svc_xprt *xprt)
{
	kref_put(&xprt->xpt_ref, svc_xprt_free);
}
EXPORT_SYMBOL_GPL(svc_xprt_put);

/*
 * Called by transport drivers to initialize the transport independent
 * portion of the transport instance.
 */
void svc_xprt_init(struct svc_xprt_class *xcl, struct svc_xprt *xprt,
		   struct svc_serv *serv)
{
	memset(xprt, 0, sizeof(*xprt));
	xprt->xpt_class = xcl;
	xprt->xpt_ops = xcl->xcl_ops;
	kref_init(&xprt->xpt_ref);
	xprt->xpt_server = serv;
	INIT_LIST_HEAD(&xprt->xpt_list);
	INIT_LIST_HEAD(&xprt->xpt_ready);
	INIT_LIST_HEAD(&xprt->xpt_deferred);
	mutex_init(&xprt->xpt_mutex);
	spin_lock_init(&xprt->xpt_lock);
	set_bit(XPT_BUSY, &xprt->xpt_flags);
}
EXPORT_SYMBOL_GPL(svc_xprt_init);

int svc_create_xprt(struct svc_serv *serv, char *xprt_name, unsigned short port,
		    int flags)
{
	struct svc_xprt_class *xcl;
	struct sockaddr_in sin = {
		.sin_family		= AF_INET,
		.sin_addr.s_addr	= htonl(INADDR_ANY),
		.sin_port		= htons(port),
	};
	dprintk("svc: creating transport %s[%d]\n", xprt_name, port);
	spin_lock(&svc_xprt_class_lock);
	list_for_each_entry(xcl, &svc_xprt_class_list, xcl_list) {
		struct svc_xprt *newxprt;

		if (strcmp(xprt_name, xcl->xcl_name))
			continue;

		if (!try_module_get(xcl->xcl_owner))
			goto err;

		spin_unlock(&svc_xprt_class_lock);
		newxprt = xcl->xcl_ops->
			xpo_create(serv, (struct sockaddr *)&sin, sizeof(sin),
				   flags);
		if (IS_ERR(newxprt)) {
			module_put(xcl->xcl_owner);
			return PTR_ERR(newxprt);
		}

		clear_bit(XPT_TEMP, &newxprt->xpt_flags);
		spin_lock_bh(&serv->sv_lock);
		list_add(&newxprt->xpt_list, &serv->sv_permsocks);
		spin_unlock_bh(&serv->sv_lock);
		clear_bit(XPT_BUSY, &newxprt->xpt_flags);
		return svc_xprt_local_port(newxprt);
	}
 err:
	spin_unlock(&svc_xprt_class_lock);
	dprintk("svc: transport %s not found\n", xprt_name);
	return -ENOENT;
}
EXPORT_SYMBOL_GPL(svc_create_xprt);
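
/*
 * Illustrative sketch of a caller creating a permanent listener by
 * class name.  The port number and the 0 flags value are arbitrary
 * examples; flags are passed straight through to the class's
 * xpo_create method.  On success the bound local port is returned.
 *
 *	int port = svc_create_xprt(serv, "tcp", 2049, 0);
 *	if (port < 0)
 *		return port;	// e.g. -ENOENT: no such class registered
 */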

/*
 * Copy the local and remote xprt addresses to the rqstp structure
 */
void svc_xprt_copy_addrs(struct svc_rqst *rqstp, struct svc_xprt *xprt)
{
	struct sockaddr *sin;

	memcpy(&rqstp->rq_addr, &xprt->xpt_remote, xprt->xpt_remotelen);
	rqstp->rq_addrlen = xprt->xpt_remotelen;

	/*
	 * Destination address in request is needed for binding the
	 * source address in RPC replies/callbacks later.
	 */
	sin = (struct sockaddr *)&xprt->xpt_local;
	switch (sin->sa_family) {
	case AF_INET:
		rqstp->rq_daddr.addr = ((struct sockaddr_in *)sin)->sin_addr;
		break;
	case AF_INET6:
		rqstp->rq_daddr.addr6 = ((struct sockaddr_in6 *)sin)->sin6_addr;
		break;
	}
}
EXPORT_SYMBOL_GPL(svc_xprt_copy_addrs);

/**
 * svc_print_addr - Format rq_addr field for printing
 * @rqstp: svc_rqst struct containing address to print
 * @buf: target buffer for formatted address
 * @len: length of target buffer
 *
 */
char *svc_print_addr(struct svc_rqst *rqstp, char *buf, size_t len)
{
	return __svc_print_addr(svc_addr(rqstp), buf, len);
}
EXPORT_SYMBOL_GPL(svc_print_addr);

/*
 * Queue up an idle server thread.  Must have pool->sp_lock held.
 * Note: this is really a stack rather than a queue, so that we only
 * use as many different threads as we need, and the rest don't pollute
 * the cache.
 */
static void svc_thread_enqueue(struct svc_pool *pool, struct svc_rqst *rqstp)
{
	list_add(&rqstp->rq_list, &pool->sp_threads);
}

/*
 * Dequeue an nfsd thread.  Must have pool->sp_lock held.
 */
static void svc_thread_dequeue(struct svc_pool *pool, struct svc_rqst *rqstp)
{
	list_del(&rqstp->rq_list);
}

/*
 * Queue up a transport with data pending. If there are idle nfsd
 * processes, wake 'em up.
 *
 */
void svc_xprt_enqueue(struct svc_xprt *xprt)
{
	struct svc_serv *serv = xprt->xpt_server;
	struct svc_pool *pool;
	struct svc_rqst *rqstp;
	int cpu;

	if (!(xprt->xpt_flags &
	      ((1<<XPT_CONN)|(1<<XPT_DATA)|(1<<XPT_CLOSE)|(1<<XPT_DEFERRED))))
		return;
	if (test_bit(XPT_DEAD, &xprt->xpt_flags))
		return;

	cpu = get_cpu();
	pool = svc_pool_for_cpu(xprt->xpt_server, cpu);
	put_cpu();

	spin_lock_bh(&pool->sp_lock);

	if (!list_empty(&pool->sp_threads) &&
	    !list_empty(&pool->sp_sockets))
		printk(KERN_ERR
		       "svc_xprt_enqueue: "
		       "threads and transports both waiting??\n");

	if (test_bit(XPT_DEAD, &xprt->xpt_flags)) {
		/* Don't enqueue dead transports */
		dprintk("svc: transport %p is dead, not enqueued\n", xprt);
		goto out_unlock;
	}

	/* Mark transport as busy. It will remain in this state until
	 * the provider calls svc_xprt_received. We update XPT_BUSY
	 * atomically because it also guards against trying to enqueue
	 * the transport twice.
	 */
	if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags)) {
		/* Don't enqueue transport while already enqueued */
		dprintk("svc: transport %p busy, not enqueued\n", xprt);
		goto out_unlock;
	}
	BUG_ON(xprt->xpt_pool != NULL);
	xprt->xpt_pool = pool;

	/* Handle pending connection */
	if (test_bit(XPT_CONN, &xprt->xpt_flags))
		goto process;

	/* Handle close in-progress */
	if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
		goto process;

	/* Check if we have space to reply to a request */
	if (!xprt->xpt_ops->xpo_has_wspace(xprt)) {
		/* Don't enqueue while not enough space for reply */
		dprintk("svc: no write space, transport %p not enqueued\n",
			xprt);
		xprt->xpt_pool = NULL;
		clear_bit(XPT_BUSY, &xprt->xpt_flags);
		goto out_unlock;
	}

 process:
	if (!list_empty(&pool->sp_threads)) {
		rqstp = list_entry(pool->sp_threads.next,
				   struct svc_rqst,
				   rq_list);
		dprintk("svc: transport %p served by daemon %p\n",
			xprt, rqstp);
		svc_thread_dequeue(pool, rqstp);
		if (rqstp->rq_xprt)
			printk(KERN_ERR
				"svc_xprt_enqueue: server %p, rq_xprt=%p!\n",
				rqstp, rqstp->rq_xprt);
		rqstp->rq_xprt = xprt;
		svc_xprt_get(xprt);
		rqstp->rq_reserved = serv->sv_max_mesg;
		atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved);
		BUG_ON(xprt->xpt_pool != pool);
		wake_up(&rqstp->rq_wait);
	} else {
		dprintk("svc: transport %p put into queue\n", xprt);
		list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
		BUG_ON(xprt->xpt_pool != pool);
	}

out_unlock:
	spin_unlock_bh(&pool->sp_lock);
}
EXPORT_SYMBOL_GPL(svc_xprt_enqueue);

/*
 * Dequeue the first transport.  Must be called with the pool->sp_lock held.
 */
static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool)
{
	struct svc_xprt *xprt;

	if (list_empty(&pool->sp_sockets))
		return NULL;

	xprt = list_entry(pool->sp_sockets.next,
			  struct svc_xprt, xpt_ready);
	list_del_init(&xprt->xpt_ready);

	dprintk("svc: transport %p dequeued, inuse=%d\n",
		xprt, atomic_read(&xprt->xpt_ref.refcount));

	return xprt;
}

/*
 * svc_xprt_received conditionally queues the transport for processing
 * by another thread. The caller must hold the XPT_BUSY bit and must
 * not thereafter touch transport data.
 *
 * Note: XPT_DATA only gets cleared when a read-attempt finds no (or
 * insufficient) data.
 */
void svc_xprt_received(struct svc_xprt *xprt)
{
	BUG_ON(!test_bit(XPT_BUSY, &xprt->xpt_flags));
	xprt->xpt_pool = NULL;
	clear_bit(XPT_BUSY, &xprt->xpt_flags);
	svc_xprt_enqueue(xprt);
}
EXPORT_SYMBOL_GPL(svc_xprt_received);

/**
 * svc_reserve - change the space reserved for the reply to a request.
 * @rqstp:  The request in question
 * @space: new max space to reserve
 *
 * Each request reserves some space on the output queue of the transport
 * to make sure the reply fits.  This function reduces that reserved
 * space to be the amount of space used already, plus @space.
 *
 */
void svc_reserve(struct svc_rqst *rqstp, int space)
{
	space += rqstp->rq_res.head[0].iov_len;

	if (space < rqstp->rq_reserved) {
		struct svc_xprt *xprt = rqstp->rq_xprt;
		atomic_sub((rqstp->rq_reserved - space), &xprt->xpt_reserved);
		rqstp->rq_reserved = space;

		svc_xprt_enqueue(xprt);
	}
}
EXPORT_SYMBOL(svc_reserve);
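
/*
 * Illustrative sketch: a service calls svc_reserve() as soon as it
 * knows its reply will be small, so the write-space accounting in
 * xpt_reserved (checked by xpo_has_wspace in svc_xprt_enqueue) frees
 * room for other requests.  The 512-byte figure is an arbitrary example.
 *
 *	svc_reserve(rqstp, 512);	// reply needs at most ~512 bytes
 */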

static void svc_xprt_release(struct svc_rqst *rqstp)
{
	struct svc_xprt *xprt = rqstp->rq_xprt;

	rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp);

	svc_free_res_pages(rqstp);
	rqstp->rq_res.page_len = 0;
	rqstp->rq_res.page_base = 0;

	/* Reset response buffer and release
	 * the reservation.
	 * But first, check that enough space was reserved
	 * for the reply, otherwise we have a bug!
	 */
	if ((rqstp->rq_res.len) > rqstp->rq_reserved)
		printk(KERN_ERR "RPC request reserved %d but used %d\n",
		       rqstp->rq_reserved,
		       rqstp->rq_res.len);

	rqstp->rq_res.head[0].iov_len = 0;
	svc_reserve(rqstp, 0);
	rqstp->rq_xprt = NULL;

	svc_xprt_put(xprt);
}

/*
 * External function to wake up a server waiting for data.
 * This really only makes sense for services like lockd
 * which have exactly one thread anyway.
 */
void svc_wake_up(struct svc_serv *serv)
{
	struct svc_rqst *rqstp;
	unsigned int i;
	struct svc_pool *pool;

	for (i = 0; i < serv->sv_nrpools; i++) {
		pool = &serv->sv_pools[i];

		spin_lock_bh(&pool->sp_lock);
		if (!list_empty(&pool->sp_threads)) {
			rqstp = list_entry(pool->sp_threads.next,
					   struct svc_rqst,
					   rq_list);
			dprintk("svc: daemon %p woken up.\n", rqstp);
			/*
			svc_thread_dequeue(pool, rqstp);
			rqstp->rq_xprt = NULL;
			 */
			wake_up(&rqstp->rq_wait);
		}
		spin_unlock_bh(&pool->sp_lock);
	}
}
EXPORT_SYMBOL(svc_wake_up);

int svc_port_is_privileged(struct sockaddr *sin)
{
	switch (sin->sa_family) {
	case AF_INET:
		return ntohs(((struct sockaddr_in *)sin)->sin_port)
			< PROT_SOCK;
	case AF_INET6:
		return ntohs(((struct sockaddr_in6 *)sin)->sin6_port)
			< PROT_SOCK;
	default:
		return 0;
	}
}

/*
 * Make sure that we don't have too many active connections. If we
 * have, something must be dropped.
 *
 * There's no point in trying to do random drop here for DoS
 * prevention. The NFS client does 1 reconnect in 15 seconds. An
 * attacker can easily beat that.
 *
 * The only somewhat efficient mechanism would be to drop old
 * connections from the same IP first. But right now we don't even
 * record the client IP in svc_sock.
 */
static void svc_check_conn_limits(struct svc_serv *serv)
{
	if (serv->sv_tmpcnt > (serv->sv_nrthreads+3)*20) {
		struct svc_xprt *xprt = NULL;
		spin_lock_bh(&serv->sv_lock);
		if (!list_empty(&serv->sv_tempsocks)) {
			if (net_ratelimit()) {
				/* Try to help the admin */
				printk(KERN_NOTICE "%s: too many open "
				       "connections, consider increasing the "
				       "number of nfsd threads\n",
				       serv->sv_name);
			}
			/*
			 * Always select the oldest connection. It's not fair,
			 * but so is life
			 */
			xprt = list_entry(serv->sv_tempsocks.prev,
					  struct svc_xprt,
					  xpt_list);
			set_bit(XPT_CLOSE, &xprt->xpt_flags);
			svc_xprt_get(xprt);
		}
		spin_unlock_bh(&serv->sv_lock);

		if (xprt) {
			svc_xprt_enqueue(xprt);
			svc_xprt_put(xprt);
		}
	}
}

/*
 * Receive the next request on any transport.  This code is carefully
 * organised not to touch any cachelines in the shared svc_serv
 * structure, only cachelines in the local svc_pool.
 */
int svc_recv(struct svc_rqst *rqstp, long timeout)
{
	struct svc_xprt *xprt = NULL;
	struct svc_serv *serv = rqstp->rq_server;
	struct svc_pool *pool = rqstp->rq_pool;
	int len, i;
	int pages;
	struct xdr_buf *arg;
	DECLARE_WAITQUEUE(wait, current);

	dprintk("svc: server %p waiting for data (to = %ld)\n",
		rqstp, timeout);

	if (rqstp->rq_xprt)
		printk(KERN_ERR
			"svc_recv: service %p, transport not NULL!\n",
			rqstp);
	if (waitqueue_active(&rqstp->rq_wait))
		printk(KERN_ERR
			"svc_recv: service %p, wait queue active!\n",
			rqstp);

	/* now allocate needed pages.  If we get a failure, sleep briefly */
	pages = (serv->sv_max_mesg + PAGE_SIZE) / PAGE_SIZE;
	for (i = 0; i < pages ; i++)
		while (rqstp->rq_pages[i] == NULL) {
			struct page *p = alloc_page(GFP_KERNEL);
			if (!p) {
				int j = msecs_to_jiffies(500);
				schedule_timeout_uninterruptible(j);
			}
			rqstp->rq_pages[i] = p;
		}
	rqstp->rq_pages[i++] = NULL; /* this might be seen in nfs_read_actor */
	BUG_ON(pages >= RPCSVC_MAXPAGES);

	/* Make arg->head point to first page and arg->pages point to rest */
	arg = &rqstp->rq_arg;
	arg->head[0].iov_base = page_address(rqstp->rq_pages[0]);
	arg->head[0].iov_len = PAGE_SIZE;
	arg->pages = rqstp->rq_pages + 1;
	arg->page_base = 0;
	/* save at least one page for response */
	arg->page_len = (pages-2)*PAGE_SIZE;
	arg->len = (pages-1)*PAGE_SIZE;
	arg->tail[0].iov_len = 0;

	try_to_freeze();
	cond_resched();
	if (signalled())
		return -EINTR;

	spin_lock_bh(&pool->sp_lock);
	xprt = svc_xprt_dequeue(pool);
	if (xprt) {
		rqstp->rq_xprt = xprt;
		svc_xprt_get(xprt);
		rqstp->rq_reserved = serv->sv_max_mesg;
		atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved);
	} else {
		/* No data pending. Go to sleep */
		svc_thread_enqueue(pool, rqstp);

		/*
		 * We have to be able to interrupt this wait
		 * to bring down the daemons ...
		 */
		set_current_state(TASK_INTERRUPTIBLE);
		add_wait_queue(&rqstp->rq_wait, &wait);
		spin_unlock_bh(&pool->sp_lock);

		schedule_timeout(timeout);

		try_to_freeze();

		spin_lock_bh(&pool->sp_lock);
		remove_wait_queue(&rqstp->rq_wait, &wait);

		xprt = rqstp->rq_xprt;
		if (!xprt) {
			svc_thread_dequeue(pool, rqstp);
			spin_unlock_bh(&pool->sp_lock);
			dprintk("svc: server %p, no data yet\n", rqstp);
			return signalled()? -EINTR : -EAGAIN;
		}
	}
	spin_unlock_bh(&pool->sp_lock);

	len = 0;
	if (test_bit(XPT_CLOSE, &xprt->xpt_flags)) {
		dprintk("svc_recv: found XPT_CLOSE\n");
		svc_delete_xprt(xprt);
	} else if (test_bit(XPT_LISTENER, &xprt->xpt_flags)) {
		struct svc_xprt *newxpt;
		newxpt = xprt->xpt_ops->xpo_accept(xprt);
		if (newxpt) {
			/*
			 * We know this module_get will succeed because the
			 * listener holds a reference too
			 */
			__module_get(newxpt->xpt_class->xcl_owner);
			svc_check_conn_limits(xprt->xpt_server);
			spin_lock_bh(&serv->sv_lock);
			set_bit(XPT_TEMP, &newxpt->xpt_flags);
			list_add(&newxpt->xpt_list, &serv->sv_tempsocks);
			serv->sv_tmpcnt++;
			if (serv->sv_temptimer.function == NULL) {
				/* setup timer to age temp transports */
				setup_timer(&serv->sv_temptimer,
					    svc_age_temp_xprts,
					    (unsigned long)serv);
				mod_timer(&serv->sv_temptimer,
					  jiffies + svc_conn_age_period * HZ);
			}
			spin_unlock_bh(&serv->sv_lock);
			svc_xprt_received(newxpt);
		}
		svc_xprt_received(xprt);
	} else {
		dprintk("svc: server %p, pool %u, transport %p, inuse=%d\n",
			rqstp, pool->sp_id, xprt,
			atomic_read(&xprt->xpt_ref.refcount));
		rqstp->rq_deferred = svc_deferred_dequeue(xprt);
		if (rqstp->rq_deferred) {
			svc_xprt_received(xprt);
			len = svc_deferred_recv(rqstp);
		} else
			len = xprt->xpt_ops->xpo_recvfrom(rqstp);
		dprintk("svc: got len=%d\n", len);
	}

	/* No data, incomplete (TCP) read, or accept() */
	if (len == 0 || len == -EAGAIN) {
		rqstp->rq_res.len = 0;
		svc_xprt_release(rqstp);
		return -EAGAIN;
	}
	clear_bit(XPT_OLD, &xprt->xpt_flags);

	rqstp->rq_secure = svc_port_is_privileged(svc_addr(rqstp));
	rqstp->rq_chandle.defer = svc_defer;

	if (serv->sv_stats)
		serv->sv_stats->netcnt++;
	return len;
}
EXPORT_SYMBOL(svc_recv);

/*
 * Drop request
 */
void svc_drop(struct svc_rqst *rqstp)
{
	dprintk("svc: xprt %p dropped request\n", rqstp->rq_xprt);
	svc_xprt_release(rqstp);
}
EXPORT_SYMBOL(svc_drop);

/*
 * Return reply to client.
 */
int svc_send(struct svc_rqst *rqstp)
{
	struct svc_xprt *xprt;
	int len;
	struct xdr_buf *xb;

	xprt = rqstp->rq_xprt;
	if (!xprt)
		return -EFAULT;

	/* release the receive skb before sending the reply */
	rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp);

	/* calculate over-all length */
	xb = &rqstp->rq_res;
	xb->len = xb->head[0].iov_len +
		xb->page_len +
		xb->tail[0].iov_len;

	/* Grab mutex to serialize outgoing data. */
	mutex_lock(&xprt->xpt_mutex);
	if (test_bit(XPT_DEAD, &xprt->xpt_flags))
		len = -ENOTCONN;
	else
		len = xprt->xpt_ops->xpo_sendto(rqstp);
	mutex_unlock(&xprt->xpt_mutex);
	svc_xprt_release(rqstp);

	if (len == -ECONNREFUSED || len == -ENOTCONN || len == -EAGAIN)
		return 0;
	return len;
}
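
/*
 * Illustrative sketch of the loop a service thread (nfsd, lockd, ...)
 * runs around svc_recv().  svc_process() is the dispatcher in
 * net/sunrpc/svc.c and ends up calling svc_send(); the timeout and
 * error handling shown are simplified assumptions, not copied from any
 * particular service.
 *
 *	for (;;) {
 *		int err = svc_recv(rqstp, 60 * 60 * HZ);
 *
 *		if (err == -EAGAIN)	// nothing arrived in time, retry
 *			continue;
 *		if (err == -EINTR)	// signalled: shut the thread down
 *			break;
 *		svc_process(rqstp);	// dispatch and send the reply
 *	}
 */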

/*
 * Timer function to close old temporary transports, using
 * a mark-and-sweep algorithm.
 */
static void svc_age_temp_xprts(unsigned long closure)
{
	struct svc_serv *serv = (struct svc_serv *)closure;
	struct svc_xprt *xprt;
	struct list_head *le, *next;
	LIST_HEAD(to_be_aged);

	dprintk("svc_age_temp_xprts\n");

	if (!spin_trylock_bh(&serv->sv_lock)) {
		/* busy, try again 1 sec later */
		dprintk("svc_age_temp_xprts: busy\n");
		mod_timer(&serv->sv_temptimer, jiffies + HZ);
		return;
	}

	list_for_each_safe(le, next, &serv->sv_tempsocks) {
		xprt = list_entry(le, struct svc_xprt, xpt_list);

		/* First time through, just mark it OLD. Second time
		 * through, close it. */
		if (!test_and_set_bit(XPT_OLD, &xprt->xpt_flags))
			continue;
		if (atomic_read(&xprt->xpt_ref.refcount) > 1
		    || test_bit(XPT_BUSY, &xprt->xpt_flags))
			continue;
		svc_xprt_get(xprt);
		list_move(le, &to_be_aged);
		set_bit(XPT_CLOSE, &xprt->xpt_flags);
		set_bit(XPT_DETACHED, &xprt->xpt_flags);
	}
	spin_unlock_bh(&serv->sv_lock);

	while (!list_empty(&to_be_aged)) {
		le = to_be_aged.next;
		/* fiddling the xpt_list node is safe 'cos we're XPT_DETACHED */
		list_del_init(le);
		xprt = list_entry(le, struct svc_xprt, xpt_list);

		dprintk("queuing xprt %p for closing\n", xprt);

		/* a thread will dequeue and close it soon */
		svc_xprt_enqueue(xprt);
		svc_xprt_put(xprt);
	}

	mod_timer(&serv->sv_temptimer, jiffies + svc_conn_age_period * HZ);
}

/*
 * Remove a dead transport
 */
void svc_delete_xprt(struct svc_xprt *xprt)
{
	struct svc_serv *serv = xprt->xpt_server;

	dprintk("svc: svc_delete_xprt(%p)\n", xprt);
	xprt->xpt_ops->xpo_detach(xprt);

	spin_lock_bh(&serv->sv_lock);
	if (!test_and_set_bit(XPT_DETACHED, &xprt->xpt_flags))
		list_del_init(&xprt->xpt_list);
	/*
	 * We used to delete the transport from whichever list
	 * its sk_xprt.xpt_ready node was on, but we don't actually
	 * need to.  This is because the only time we're called
	 * while still attached to a queue, the queue itself
	 * is about to be destroyed (in svc_destroy).
	 */
	if (!test_and_set_bit(XPT_DEAD, &xprt->xpt_flags)) {
		BUG_ON(atomic_read(&xprt->xpt_ref.refcount) < 2);
		if (test_bit(XPT_TEMP, &xprt->xpt_flags))
			serv->sv_tmpcnt--;
		svc_xprt_put(xprt);
	}
	spin_unlock_bh(&serv->sv_lock);
}

void svc_close_xprt(struct svc_xprt *xprt)
{
	set_bit(XPT_CLOSE, &xprt->xpt_flags);
	if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags))
		/* someone else will have to effect the close */
		return;

	svc_xprt_get(xprt);
	svc_delete_xprt(xprt);
	clear_bit(XPT_BUSY, &xprt->xpt_flags);
	svc_xprt_put(xprt);
}
EXPORT_SYMBOL_GPL(svc_close_xprt);

void svc_close_all(struct list_head *xprt_list)
{
	struct svc_xprt *xprt;
	struct svc_xprt *tmp;

	list_for_each_entry_safe(xprt, tmp, xprt_list, xpt_list) {
		set_bit(XPT_CLOSE, &xprt->xpt_flags);
		if (test_bit(XPT_BUSY, &xprt->xpt_flags)) {
			/* Waiting to be processed, but no threads left,
			 * so just remove it from the waiting list
			 */
			list_del_init(&xprt->xpt_ready);
			clear_bit(XPT_BUSY, &xprt->xpt_flags);
		}
		svc_close_xprt(xprt);
	}
}

/*
 * Handle defer and revisit of requests
 */

static void svc_revisit(struct cache_deferred_req *dreq, int too_many)
{
	struct svc_deferred_req *dr =
		container_of(dreq, struct svc_deferred_req, handle);
	struct svc_xprt *xprt = dr->xprt;

	if (too_many) {
		svc_xprt_put(xprt);
		kfree(dr);
		return;
	}
	dprintk("revisit queued\n");
	dr->xprt = NULL;
	spin_lock(&xprt->xpt_lock);
	list_add(&dr->handle.recent, &xprt->xpt_deferred);
	spin_unlock(&xprt->xpt_lock);
	set_bit(XPT_DEFERRED, &xprt->xpt_flags);
	svc_xprt_enqueue(xprt);
	svc_xprt_put(xprt);
}

/*
 * Save the request off for later processing. The request buffer looks
 * like this:
 *
 *     <xprt-header><rpc-header><rpc-pagelist><rpc-tail>
 *
 * This code can only handle requests that consist of an xprt-header
 * and rpc-header.
 */
static struct cache_deferred_req *svc_defer(struct cache_req *req)
{
	struct svc_rqst *rqstp = container_of(req, struct svc_rqst, rq_chandle);
	struct svc_deferred_req *dr;

	if (rqstp->rq_arg.page_len)
		return NULL; /* if more than a page, give up FIXME */
	if (rqstp->rq_deferred) {
		dr = rqstp->rq_deferred;
		rqstp->rq_deferred = NULL;
	} else {
		size_t skip;
		size_t size;
		/* FIXME maybe discard if size too large */
		size = sizeof(struct svc_deferred_req) + rqstp->rq_arg.len;
		dr = kmalloc(size, GFP_KERNEL);
		if (dr == NULL)
			return NULL;

		dr->handle.owner = rqstp->rq_server;
		dr->prot = rqstp->rq_prot;
		memcpy(&dr->addr, &rqstp->rq_addr, rqstp->rq_addrlen);
		dr->addrlen = rqstp->rq_addrlen;
		dr->daddr = rqstp->rq_daddr;
		dr->argslen = rqstp->rq_arg.len >> 2;
		dr->xprt_hlen = rqstp->rq_xprt_hlen;

		/* back up head to the start of the buffer and copy */
		skip = rqstp->rq_arg.len - rqstp->rq_arg.head[0].iov_len;
		memcpy(dr->args, rqstp->rq_arg.head[0].iov_base - skip,
		       dr->argslen << 2);
	}
	svc_xprt_get(rqstp->rq_xprt);
	dr->xprt = rqstp->rq_xprt;

	dr->handle.revisit = svc_revisit;
	return &dr->handle;
}

/*
 * recv data from a deferred request into an active one
 */
static int svc_deferred_recv(struct svc_rqst *rqstp)
{
	struct svc_deferred_req *dr = rqstp->rq_deferred;

	/* setup iov_base past transport header */
	rqstp->rq_arg.head[0].iov_base = dr->args + (dr->xprt_hlen>>2);
	/* The iov_len does not include the transport header bytes */
	rqstp->rq_arg.head[0].iov_len = (dr->argslen<<2) - dr->xprt_hlen;
	rqstp->rq_arg.page_len = 0;
	/* The rq_arg.len includes the transport header bytes */
	rqstp->rq_arg.len = dr->argslen<<2;
	rqstp->rq_prot = dr->prot;
	memcpy(&rqstp->rq_addr, &dr->addr, dr->addrlen);
	rqstp->rq_addrlen = dr->addrlen;
	/* Save off transport header len in case we get deferred again */
	rqstp->rq_xprt_hlen = dr->xprt_hlen;
	rqstp->rq_daddr = dr->daddr;
	rqstp->rq_respages = rqstp->rq_pages;
	return (dr->argslen<<2) - dr->xprt_hlen;
}


static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt)
{
	struct svc_deferred_req *dr = NULL;

	if (!test_bit(XPT_DEFERRED, &xprt->xpt_flags))
		return NULL;
	spin_lock(&xprt->xpt_lock);
	clear_bit(XPT_DEFERRED, &xprt->xpt_flags);
	if (!list_empty(&xprt->xpt_deferred)) {
		dr = list_entry(xprt->xpt_deferred.next,
				struct svc_deferred_req,
				handle.recent);
		list_del_init(&dr->handle.recent);
		set_bit(XPT_DEFERRED, &xprt->xpt_flags);
	}
	spin_unlock(&xprt->xpt_lock);
	return dr;
}
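
/*
 * How the defer/revisit pieces above fit together: a cache lookup that
 * cannot complete immediately calls rqstp->rq_chandle.defer (svc_defer)
 * to snapshot the request; when the cache entry is later filled in, the
 * cache code calls the handle's ->revisit method (svc_revisit), which
 * queues the snapshot on xpt_deferred, sets XPT_DEFERRED and re-enqueues
 * the transport.  The next thread to pick the transport up in svc_recv()
 * replays the request via svc_deferred_dequeue()/svc_deferred_recv()
 * instead of reading from the wire.  Simplified sketch of the cache
 * side (not a literal excerpt):
 *
 *	dreq = req->defer(req);		// req is &rqstp->rq_chandle
 *	...
 *	dreq->revisit(dreq, 0);		// later, when the answer is ready
 */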

/*
 * Return the transport instance pointer for the endpoint accepting
 * connections/peer traffic from the specified transport class,
 * address family and port.
 *
 * Specifying 0 for the address family or port is effectively a
 * wild-card, and will result in matching the first transport in the
 * service's list that has a matching class name.
 */
struct svc_xprt *svc_find_xprt(struct svc_serv *serv, char *xcl_name,
			       int af, int port)
{
	struct svc_xprt *xprt;
	struct svc_xprt *found = NULL;

	/* Sanity check the args */
	if (!serv || !xcl_name)
		return found;

	spin_lock_bh(&serv->sv_lock);
	list_for_each_entry(xprt, &serv->sv_permsocks, xpt_list) {
		if (strcmp(xprt->xpt_class->xcl_name, xcl_name))
			continue;
		if (af != AF_UNSPEC && af != xprt->xpt_local.ss_family)
			continue;
		if (port && port != svc_xprt_local_port(xprt))
			continue;
		found = xprt;
		svc_xprt_get(xprt);
		break;
	}
	spin_unlock_bh(&serv->sv_lock);
	return found;
}
EXPORT_SYMBOL_GPL(svc_find_xprt);
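
/*
 * Illustrative sketch: checking whether a "tcp" listener exists on any
 * port.  Address family 0 and port 0 act as wild-cards, and the
 * reference taken by svc_find_xprt() must be dropped with
 * svc_xprt_put().
 *
 *	struct svc_xprt *xprt = svc_find_xprt(serv, "tcp", 0, 0);
 *	if (xprt) {
 *		// ... inspect the transport ...
 *		svc_xprt_put(xprt);
 *	}
 */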

/*
 * Format a buffer with a list of the active transports. A zero for
 * the buflen parameter disables target buffer overflow checking.
 */
int svc_xprt_names(struct svc_serv *serv, char *buf, int buflen)
{
	struct svc_xprt *xprt;
	char xprt_str[64];
	int totlen = 0;
	int len;

	/* Sanity check args */
	if (!serv)
		return 0;

	spin_lock_bh(&serv->sv_lock);
	list_for_each_entry(xprt, &serv->sv_permsocks, xpt_list) {
		len = snprintf(xprt_str, sizeof(xprt_str),
			       "%s %d\n", xprt->xpt_class->xcl_name,
			       svc_xprt_local_port(xprt));
		/* If the string was truncated, replace with error string */
		if (len >= sizeof(xprt_str))
			strcpy(xprt_str, "name-too-long\n");
		/* Don't overflow buffer */
		len = strlen(xprt_str);
		if (buflen && (len + totlen >= buflen))
			break;
		strcpy(buf+totlen, xprt_str);
		totlen += len;
	}
	spin_unlock_bh(&serv->sv_lock);
	return totlen;
}
EXPORT_SYMBOL_GPL(svc_xprt_names);