/*
 * linux/net/sunrpc/svc_xprt.c
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

#include <linux/sched.h>
#include <linux/smp_lock.h>
#include <linux/errno.h>
#include <linux/freezer.h>
#include <linux/kthread.h>
#include <linux/slab.h>
#include <net/sock.h>
#include <linux/sunrpc/stats.h>
#include <linux/sunrpc/svc_xprt.h>
#include <linux/sunrpc/svcsock.h>

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT

static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt);
static int svc_deferred_recv(struct svc_rqst *rqstp);
static struct cache_deferred_req *svc_defer(struct cache_req *req);
static void svc_age_temp_xprts(unsigned long closure);

/* apparently the "standard" is that clients close
 * idle connections after 5 minutes, servers after
 * 6 minutes
 *   http://www.connectathon.org/talks96/nfstcp.pdf
 */
static int svc_conn_age_period = 6*60;

/* List of registered transport classes */
static DEFINE_SPINLOCK(svc_xprt_class_lock);
static LIST_HEAD(svc_xprt_class_list);

/* SMP locking strategy:
 *
 *	svc_pool->sp_lock protects most of the fields of that pool.
 *	svc_serv->sv_lock protects sv_tempsocks, sv_permsocks, sv_tmpcnt.
 *	when both need to be taken (rare), svc_serv->sv_lock is first.
 *	BKL protects svc_serv->sv_nrthread.
 *	svc_sock->sk_lock protects the svc_sock->sk_deferred list
 *	and the ->sk_info_authunix cache.
 *
 *	The XPT_BUSY bit in xprt->xpt_flags prevents a transport being
 *	enqueued multiple times. During normal transport processing this
 *	bit is set by svc_xprt_enqueue and cleared by svc_xprt_received.
 *	Providers should not manipulate this bit directly.
 *
 *	Some flags can be set to certain values at any time
 *	providing that certain rules are followed:
 *
 *	XPT_CONN, XPT_DATA:
 *		- Can be set or cleared at any time.
 *		- After a set, svc_xprt_enqueue must be called to enqueue
 *		  the transport for processing.
 *		- After a clear, the transport must be read/accepted.
 *		  If this succeeds, it must be set again.
 *	XPT_CLOSE:
 *		- Can be set at any time. It is never cleared.
 *	XPT_DEAD:
 *		- Can only be set while XPT_BUSY is held, which ensures
 *		  that no other thread will be using the transport or will
 *		  try to set XPT_DEAD.
 */
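/*
 * Illustrative sketch (not part of the original file): under the rules
 * above, a transport provider that notices incoming data would signal
 * it from its data-ready callback roughly like so:
 *
 *	set_bit(XPT_DATA, &xprt->xpt_flags);
 *	svc_xprt_enqueue(xprt);
 *
 * A server thread then clears XPT_DATA before reading, and sets it
 * again only if the read succeeds, so the transport gets re-polled.
 */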
int svc_reg_xprt_class(struct svc_xprt_class *xcl)
{
	struct svc_xprt_class *cl;
	int res = -EEXIST;

	dprintk("svc: Adding svc transport class '%s'\n", xcl->xcl_name);

	INIT_LIST_HEAD(&xcl->xcl_list);
	spin_lock(&svc_xprt_class_lock);
	/* Make sure there isn't already a class with the same name */
	list_for_each_entry(cl, &svc_xprt_class_list, xcl_list) {
		if (strcmp(xcl->xcl_name, cl->xcl_name) == 0)
			goto out;
	}
	list_add_tail(&xcl->xcl_list, &svc_xprt_class_list);
	res = 0;
out:
	spin_unlock(&svc_xprt_class_lock);
	return res;
}
EXPORT_SYMBOL_GPL(svc_reg_xprt_class);

void svc_unreg_xprt_class(struct svc_xprt_class *xcl)
{
	dprintk("svc: Removing svc transport class '%s'\n", xcl->xcl_name);
	spin_lock(&svc_xprt_class_lock);
	list_del_init(&xcl->xcl_list);
	spin_unlock(&svc_xprt_class_lock);
}
EXPORT_SYMBOL_GPL(svc_unreg_xprt_class);

/*
 * Format the transport list for printing
 */
int svc_print_xprts(char *buf, int maxlen)
{
	struct list_head *le;
	char tmpstr[80];
	int len = 0;
	buf[0] = '\0';

	spin_lock(&svc_xprt_class_lock);
	list_for_each(le, &svc_xprt_class_list) {
		int slen;
		struct svc_xprt_class *xcl =
			list_entry(le, struct svc_xprt_class, xcl_list);

		sprintf(tmpstr, "%s %d\n", xcl->xcl_name, xcl->xcl_max_payload);
		slen = strlen(tmpstr);
		if (len + slen > maxlen)
			break;
		len += slen;
		strcat(buf, tmpstr);
	}
	spin_unlock(&svc_xprt_class_lock);

	return len;
}

static void svc_xprt_free(struct kref *kref)
{
	struct svc_xprt *xprt =
		container_of(kref, struct svc_xprt, xpt_ref);
	struct module *owner = xprt->xpt_class->xcl_owner;
	if (test_bit(XPT_CACHE_AUTH, &xprt->xpt_flags) &&
	    xprt->xpt_auth_cache != NULL)
		svcauth_unix_info_release(xprt->xpt_auth_cache);
	xprt->xpt_ops->xpo_free(xprt);
	module_put(owner);
}

void svc_xprt_put(struct svc_xprt *xprt)
{
	kref_put(&xprt->xpt_ref, svc_xprt_free);
}
EXPORT_SYMBOL_GPL(svc_xprt_put);
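/*
 * Usage sketch (hypothetical names, for illustration only): a transport
 * module typically registers its class on load and unregisters on exit:
 *
 *	static struct svc_xprt_class my_xprt_class = {
 *		.xcl_name	 = "myxprt",
 *		.xcl_owner	 = THIS_MODULE,
 *		.xcl_ops	 = &my_xprt_ops,
 *		.xcl_max_payload = RPCSVC_MAXPAYLOAD,
 *	};
 *
 *	svc_reg_xprt_class(&my_xprt_class);
 *
 * svc_reg_xprt_class() returns -EEXIST if the name is already taken.
 */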
/*
 * Called by transport drivers to initialize the transport independent
 * portion of the transport instance.
 */
void svc_xprt_init(struct svc_xprt_class *xcl, struct svc_xprt *xprt,
		   struct svc_serv *serv)
{
	memset(xprt, 0, sizeof(*xprt));
	xprt->xpt_class = xcl;
	xprt->xpt_ops = xcl->xcl_ops;
	kref_init(&xprt->xpt_ref);
	xprt->xpt_server = serv;
	INIT_LIST_HEAD(&xprt->xpt_list);
	INIT_LIST_HEAD(&xprt->xpt_ready);
	INIT_LIST_HEAD(&xprt->xpt_deferred);
	mutex_init(&xprt->xpt_mutex);
	spin_lock_init(&xprt->xpt_lock);
	set_bit(XPT_BUSY, &xprt->xpt_flags);
	rpc_init_wait_queue(&xprt->xpt_bc_pending, "xpt_bc_pending");
}
EXPORT_SYMBOL_GPL(svc_xprt_init);

static struct svc_xprt *__svc_xpo_create(struct svc_xprt_class *xcl,
					 struct svc_serv *serv,
					 const int family,
					 const unsigned short port,
					 int flags)
{
	struct sockaddr_in sin = {
		.sin_family		= AF_INET,
		.sin_addr.s_addr	= htonl(INADDR_ANY),
		.sin_port		= htons(port),
	};
#if defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE)
	struct sockaddr_in6 sin6 = {
		.sin6_family		= AF_INET6,
		.sin6_addr		= IN6ADDR_ANY_INIT,
		.sin6_port		= htons(port),
	};
#endif	/* defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE) */
	struct sockaddr *sap;
	size_t len;

	switch (family) {
	case PF_INET:
		sap = (struct sockaddr *)&sin;
		len = sizeof(sin);
		break;
#if defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE)
	case PF_INET6:
		sap = (struct sockaddr *)&sin6;
		len = sizeof(sin6);
		break;
#endif	/* defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE) */
	default:
		return ERR_PTR(-EAFNOSUPPORT);
	}

	return xcl->xcl_ops->xpo_create(serv, sap, len, flags);
}

int svc_create_xprt(struct svc_serv *serv, const char *xprt_name,
		    const int family, const unsigned short port,
		    int flags)
{
	struct svc_xprt_class *xcl;

	dprintk("svc: creating transport %s[%d]\n", xprt_name, port);
	spin_lock(&svc_xprt_class_lock);
	list_for_each_entry(xcl, &svc_xprt_class_list, xcl_list) {
		struct svc_xprt *newxprt;

		if (strcmp(xprt_name, xcl->xcl_name))
			continue;

		if (!try_module_get(xcl->xcl_owner))
			goto err;

		spin_unlock(&svc_xprt_class_lock);
		newxprt = __svc_xpo_create(xcl, serv, family, port, flags);
		if (IS_ERR(newxprt)) {
			module_put(xcl->xcl_owner);
			return PTR_ERR(newxprt);
		}

		clear_bit(XPT_TEMP, &newxprt->xpt_flags);
		spin_lock_bh(&serv->sv_lock);
		list_add(&newxprt->xpt_list, &serv->sv_permsocks);
		spin_unlock_bh(&serv->sv_lock);
		clear_bit(XPT_BUSY, &newxprt->xpt_flags);
		return svc_xprt_local_port(newxprt);
	}
 err:
	spin_unlock(&svc_xprt_class_lock);
	dprintk("svc: transport %s not found\n", xprt_name);

	/* This errno is exposed to user space.  Provide a reasonable
	 * perror msg for a bad transport. */
	return -EPROTONOSUPPORT;
}
EXPORT_SYMBOL_GPL(svc_create_xprt);
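/*
 * Usage sketch (illustrative; the class name and port are assumptions):
 * a service such as nfsd would create a permanent TCP listener with
 *
 *	err = svc_create_xprt(serv, "tcp", PF_INET, 2049,
 *			      SVC_SOCK_DEFAULTS);
 *
 * and get back the bound local port on success, or a negative errno
 * such as -EPROTONOSUPPORT when no registered class matches the name.
 */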
/*
 * Copy the local and remote xprt addresses to the rqstp structure
 */
void svc_xprt_copy_addrs(struct svc_rqst *rqstp, struct svc_xprt *xprt)
{
	struct sockaddr *sin;

	memcpy(&rqstp->rq_addr, &xprt->xpt_remote, xprt->xpt_remotelen);
	rqstp->rq_addrlen = xprt->xpt_remotelen;

	/*
	 * Destination address in request is needed for binding the
	 * source address in RPC replies/callbacks later.
	 */
	sin = (struct sockaddr *)&xprt->xpt_local;
	switch (sin->sa_family) {
	case AF_INET:
		rqstp->rq_daddr.addr = ((struct sockaddr_in *)sin)->sin_addr;
		break;
	case AF_INET6:
		rqstp->rq_daddr.addr6 = ((struct sockaddr_in6 *)sin)->sin6_addr;
		break;
	}
}
EXPORT_SYMBOL_GPL(svc_xprt_copy_addrs);

/**
 * svc_print_addr - Format rq_addr field for printing
 * @rqstp: svc_rqst struct containing address to print
 * @buf: target buffer for formatted address
 * @len: length of target buffer
 *
 */
char *svc_print_addr(struct svc_rqst *rqstp, char *buf, size_t len)
{
	return __svc_print_addr(svc_addr(rqstp), buf, len);
}
EXPORT_SYMBOL_GPL(svc_print_addr);

/*
 * Queue up an idle server thread.  Must have pool->sp_lock held.
 * Note: this is really a stack rather than a queue, so that we only
 * use as many different threads as we need, and the rest don't pollute
 * the cache.
 */
static void svc_thread_enqueue(struct svc_pool *pool, struct svc_rqst *rqstp)
{
	list_add(&rqstp->rq_list, &pool->sp_threads);
}

/*
 * Dequeue an nfsd thread.  Must have pool->sp_lock held.
 */
static void svc_thread_dequeue(struct svc_pool *pool, struct svc_rqst *rqstp)
{
	list_del(&rqstp->rq_list);
}

/*
 * Queue up a transport with data pending. If there are idle nfsd
 * processes, wake 'em up.
 *
 */
void svc_xprt_enqueue(struct svc_xprt *xprt)
{
	struct svc_serv *serv = xprt->xpt_server;
	struct svc_pool *pool;
	struct svc_rqst *rqstp;
	int cpu;

	if (!(xprt->xpt_flags &
	      ((1<<XPT_CONN)|(1<<XPT_DATA)|(1<<XPT_CLOSE)|(1<<XPT_DEFERRED))))
		return;

	cpu = get_cpu();
	pool = svc_pool_for_cpu(xprt->xpt_server, cpu);
	put_cpu();

	spin_lock_bh(&pool->sp_lock);

	if (!list_empty(&pool->sp_threads) &&
	    !list_empty(&pool->sp_sockets))
		printk(KERN_ERR
		       "svc_xprt_enqueue: "
		       "threads and transports both waiting??\n");

	if (test_bit(XPT_DEAD, &xprt->xpt_flags)) {
		/* Don't enqueue dead transports */
		dprintk("svc: transport %p is dead, not enqueued\n", xprt);
		goto out_unlock;
	}

	pool->sp_stats.packets++;

	/* Mark transport as busy. It will remain in this state until
	 * the provider calls svc_xprt_received. We update XPT_BUSY
	 * atomically because it also guards against trying to enqueue
	 * the transport twice.
	 */
	if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags)) {
		/* Don't enqueue transport while already enqueued */
		dprintk("svc: transport %p busy, not enqueued\n", xprt);
		goto out_unlock;
	}
	BUG_ON(xprt->xpt_pool != NULL);
	xprt->xpt_pool = pool;

	/* Handle pending connection */
	if (test_bit(XPT_CONN, &xprt->xpt_flags))
		goto process;

	/* Handle close in-progress */
	if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
		goto process;

	/* Check if we have space to reply to a request */
	if (!xprt->xpt_ops->xpo_has_wspace(xprt)) {
		/* Don't enqueue while not enough space for reply */
		dprintk("svc: no write space, transport %p not enqueued\n",
			xprt);
		xprt->xpt_pool = NULL;
		clear_bit(XPT_BUSY, &xprt->xpt_flags);
		goto out_unlock;
	}

 process:
	if (!list_empty(&pool->sp_threads)) {
		rqstp = list_entry(pool->sp_threads.next,
				   struct svc_rqst,
				   rq_list);
		dprintk("svc: transport %p served by daemon %p\n",
			xprt, rqstp);
		svc_thread_dequeue(pool, rqstp);
		if (rqstp->rq_xprt)
			printk(KERN_ERR
				"svc_xprt_enqueue: server %p, rq_xprt=%p!\n",
				rqstp, rqstp->rq_xprt);
		rqstp->rq_xprt = xprt;
		svc_xprt_get(xprt);
		rqstp->rq_reserved = serv->sv_max_mesg;
		atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved);
		pool->sp_stats.threads_woken++;
		BUG_ON(xprt->xpt_pool != pool);
		wake_up(&rqstp->rq_wait);
	} else {
		dprintk("svc: transport %p put into queue\n", xprt);
		list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
		pool->sp_stats.sockets_queued++;
		BUG_ON(xprt->xpt_pool != pool);
	}

out_unlock:
	spin_unlock_bh(&pool->sp_lock);
}
EXPORT_SYMBOL_GPL(svc_xprt_enqueue);

/*
 * Dequeue the first transport.  Must be called with the pool->sp_lock held.
 */
static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool)
{
	struct svc_xprt *xprt;

	if (list_empty(&pool->sp_sockets))
		return NULL;

	xprt = list_entry(pool->sp_sockets.next,
			  struct svc_xprt, xpt_ready);
	list_del_init(&xprt->xpt_ready);

	dprintk("svc: transport %p dequeued, inuse=%d\n",
		xprt, atomic_read(&xprt->xpt_ref.refcount));

	return xprt;
}

/*
 * svc_xprt_received conditionally queues the transport for processing
 * by another thread. The caller must hold the XPT_BUSY bit and must
 * not thereafter touch transport data.
 *
 * Note: XPT_DATA only gets cleared when a read-attempt finds no (or
 * insufficient) data.
 */
void svc_xprt_received(struct svc_xprt *xprt)
{
	BUG_ON(!test_bit(XPT_BUSY, &xprt->xpt_flags));
	xprt->xpt_pool = NULL;
	clear_bit(XPT_BUSY, &xprt->xpt_flags);
	svc_xprt_enqueue(xprt);
}
EXPORT_SYMBOL_GPL(svc_xprt_received);
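/*
 * Illustrative note (summarizing the contract above): the normal
 * receive-side pattern, as used by svc_recv() below, is
 *
 *	len = xprt->xpt_ops->xpo_recvfrom(rqstp);
 *	svc_xprt_received(xprt);
 *
 * where svc_xprt_received() drops XPT_BUSY and re-enqueues the
 * transport in case more work (XPT_DATA, XPT_DEFERRED, ...) is pending.
 */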
/**
 * svc_reserve - change the space reserved for the reply to a request.
 * @rqstp:  The request in question
 * @space: new max space to reserve
 *
 * Each request reserves some space on the output queue of the transport
 * to make sure the reply fits.  This function reduces that reserved
 * space to be the amount of space used already, plus @space.
 *
 */
void svc_reserve(struct svc_rqst *rqstp, int space)
{
	space += rqstp->rq_res.head[0].iov_len;

	if (space < rqstp->rq_reserved) {
		struct svc_xprt *xprt = rqstp->rq_xprt;
		atomic_sub((rqstp->rq_reserved - space), &xprt->xpt_reserved);
		rqstp->rq_reserved = space;

		svc_xprt_enqueue(xprt);
	}
}
EXPORT_SYMBOL_GPL(svc_reserve);

static void svc_xprt_release(struct svc_rqst *rqstp)
{
	struct svc_xprt *xprt = rqstp->rq_xprt;

	rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp);

	kfree(rqstp->rq_deferred);
	rqstp->rq_deferred = NULL;

	svc_free_res_pages(rqstp);
	rqstp->rq_res.page_len = 0;
	rqstp->rq_res.page_base = 0;

	/* Reset response buffer and release
	 * the reservation.
	 * But first, check that enough space was reserved
	 * for the reply, otherwise we have a bug!
	 */
	if ((rqstp->rq_res.len) > rqstp->rq_reserved)
		printk(KERN_ERR "RPC request reserved %d but used %d\n",
		       rqstp->rq_reserved,
		       rqstp->rq_res.len);

	rqstp->rq_res.head[0].iov_len = 0;
	svc_reserve(rqstp, 0);
	rqstp->rq_xprt = NULL;

	svc_xprt_put(xprt);
}

/*
 * External function to wake up a server waiting for data
 * This really only makes sense for services like lockd
 * which have exactly one thread anyway.
 */
void svc_wake_up(struct svc_serv *serv)
{
	struct svc_rqst *rqstp;
	unsigned int i;
	struct svc_pool *pool;

	for (i = 0; i < serv->sv_nrpools; i++) {
		pool = &serv->sv_pools[i];

		spin_lock_bh(&pool->sp_lock);
		if (!list_empty(&pool->sp_threads)) {
			rqstp = list_entry(pool->sp_threads.next,
					   struct svc_rqst,
					   rq_list);
			dprintk("svc: daemon %p woken up.\n", rqstp);
			/*
			svc_thread_dequeue(pool, rqstp);
			rqstp->rq_xprt = NULL;
			 */
			wake_up(&rqstp->rq_wait);
		}
		spin_unlock_bh(&pool->sp_lock);
	}
}
EXPORT_SYMBOL_GPL(svc_wake_up);

int svc_port_is_privileged(struct sockaddr *sin)
{
	switch (sin->sa_family) {
	case AF_INET:
		return ntohs(((struct sockaddr_in *)sin)->sin_port)
			< PROT_SOCK;
	case AF_INET6:
		return ntohs(((struct sockaddr_in6 *)sin)->sin6_port)
			< PROT_SOCK;
	default:
		return 0;
	}
}
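/*
 * Note (illustrative; mirrors the use in svc_recv() below): the result
 * of svc_port_is_privileged() feeds rqstp->rq_secure,
 *
 *	rqstp->rq_secure = svc_port_is_privileged(svc_addr(rqstp));
 *
 * letting exports that require a reserved port reject peers whose
 * source port is >= PROT_SOCK.
 */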
/*
 * Make sure that we don't have too many active connections. If we have,
 * something must be dropped. It's not clear what will happen if we allow
 * "too many" connections, but when dealing with network-facing software,
 * we have to code defensively. Here we do that by imposing hard limits.
 *
 * There's no point in trying to do random drop here for DoS
 * prevention. The NFS client does 1 reconnect in 15 seconds. An
 * attacker can easily beat that.
 *
 * The only somewhat efficient mechanism would be to drop old
 * connections from the same IP first. But right now we don't even
 * record the client IP in svc_sock.
 *
 * Single-threaded services that expect a lot of clients will probably
 * need to set sv_maxconn to override the default value, which is based
 * on the number of threads.
 */
static void svc_check_conn_limits(struct svc_serv *serv)
{
	unsigned int limit = serv->sv_maxconn ? serv->sv_maxconn :
				(serv->sv_nrthreads+3) * 20;

	if (serv->sv_tmpcnt > limit) {
		struct svc_xprt *xprt = NULL;
		spin_lock_bh(&serv->sv_lock);
		if (!list_empty(&serv->sv_tempsocks)) {
			if (net_ratelimit()) {
				/* Try to help the admin */
				printk(KERN_NOTICE "%s: too many open "
				       "connections, consider increasing %s\n",
				       serv->sv_name, serv->sv_maxconn ?
				       "the max number of connections." :
				       "the number of threads.");
			}
			/*
			 * Always select the oldest connection. It's not fair,
			 * but so is life
			 */
			xprt = list_entry(serv->sv_tempsocks.prev,
					  struct svc_xprt,
					  xpt_list);
			set_bit(XPT_CLOSE, &xprt->xpt_flags);
			svc_xprt_get(xprt);
		}
		spin_unlock_bh(&serv->sv_lock);

		if (xprt) {
			svc_xprt_enqueue(xprt);
			svc_xprt_put(xprt);
		}
	}
}

/*
 * Receive the next request on any transport.  This code is carefully
 * organised not to touch any cachelines in the shared svc_serv
 * structure, only cachelines in the local svc_pool.
 */
int svc_recv(struct svc_rqst *rqstp, long timeout)
{
	struct svc_xprt *xprt = NULL;
	struct svc_serv *serv = rqstp->rq_server;
	struct svc_pool *pool = rqstp->rq_pool;
	int len, i;
	int pages;
	struct xdr_buf *arg;
	DECLARE_WAITQUEUE(wait, current);
	long time_left;

	dprintk("svc: server %p waiting for data (to = %ld)\n",
		rqstp, timeout);

	if (rqstp->rq_xprt)
		printk(KERN_ERR
			"svc_recv: service %p, transport not NULL!\n",
			rqstp);
	if (waitqueue_active(&rqstp->rq_wait))
		printk(KERN_ERR
			"svc_recv: service %p, wait queue active!\n",
			rqstp);

	/* now allocate needed pages.  If we get a failure, sleep briefly */
	pages = (serv->sv_max_mesg + PAGE_SIZE) / PAGE_SIZE;
	for (i = 0; i < pages ; i++)
		while (rqstp->rq_pages[i] == NULL) {
			struct page *p = alloc_page(GFP_KERNEL);
			if (!p) {
				set_current_state(TASK_INTERRUPTIBLE);
				if (signalled() || kthread_should_stop()) {
					set_current_state(TASK_RUNNING);
					return -EINTR;
				}
				schedule_timeout(msecs_to_jiffies(500));
			}
			rqstp->rq_pages[i] = p;
		}
	rqstp->rq_pages[i++] = NULL; /* this might be seen in nfs_read_actor */
	BUG_ON(pages >= RPCSVC_MAXPAGES);

	/* Make arg->head point to first page and arg->pages point to rest */
	arg = &rqstp->rq_arg;
	arg->head[0].iov_base = page_address(rqstp->rq_pages[0]);
	arg->head[0].iov_len = PAGE_SIZE;
	arg->pages = rqstp->rq_pages + 1;
	arg->page_base = 0;
	/* save at least one page for response */
	arg->page_len = (pages-2)*PAGE_SIZE;
	arg->len = (pages-1)*PAGE_SIZE;
	arg->tail[0].iov_len = 0;

	try_to_freeze();
	cond_resched();
	if (signalled() || kthread_should_stop())
		return -EINTR;

	spin_lock_bh(&pool->sp_lock);
	xprt = svc_xprt_dequeue(pool);
	if (xprt) {
		rqstp->rq_xprt = xprt;
		svc_xprt_get(xprt);
		rqstp->rq_reserved = serv->sv_max_mesg;
		atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved);
	} else {
		/* No data pending. Go to sleep */
		svc_thread_enqueue(pool, rqstp);

		/*
		 * We have to be able to interrupt this wait
		 * to bring down the daemons ...
		 */
		set_current_state(TASK_INTERRUPTIBLE);

		/*
		 * checking kthread_should_stop() here allows us to avoid
		 * locking and signalling when stopping kthreads that call
		 * svc_recv. If the thread has already been woken up, then
		 * we can exit here without sleeping. If not, then it'll
		 * be woken up quickly during the schedule_timeout
		 */
		if (kthread_should_stop()) {
			set_current_state(TASK_RUNNING);
			spin_unlock_bh(&pool->sp_lock);
			return -EINTR;
		}

		add_wait_queue(&rqstp->rq_wait, &wait);
		spin_unlock_bh(&pool->sp_lock);

		time_left = schedule_timeout(timeout);

		try_to_freeze();

		spin_lock_bh(&pool->sp_lock);
		remove_wait_queue(&rqstp->rq_wait, &wait);
		if (!time_left)
			pool->sp_stats.threads_timedout++;

		xprt = rqstp->rq_xprt;
		if (!xprt) {
			svc_thread_dequeue(pool, rqstp);
			spin_unlock_bh(&pool->sp_lock);
			dprintk("svc: server %p, no data yet\n", rqstp);
			if (signalled() || kthread_should_stop())
				return -EINTR;
			else
				return -EAGAIN;
		}
	}
	spin_unlock_bh(&pool->sp_lock);

	len = 0;
	if (test_bit(XPT_CLOSE, &xprt->xpt_flags)) {
		dprintk("svc_recv: found XPT_CLOSE\n");
		svc_delete_xprt(xprt);
	} else if (test_bit(XPT_LISTENER, &xprt->xpt_flags)) {
		struct svc_xprt *newxpt;
		newxpt = xprt->xpt_ops->xpo_accept(xprt);
		if (newxpt) {
			/*
			 * We know this module_get will succeed because the
			 * listener holds a reference too
			 */
			__module_get(newxpt->xpt_class->xcl_owner);
			svc_check_conn_limits(xprt->xpt_server);
			spin_lock_bh(&serv->sv_lock);
			set_bit(XPT_TEMP, &newxpt->xpt_flags);
			list_add(&newxpt->xpt_list, &serv->sv_tempsocks);
			serv->sv_tmpcnt++;
			if (serv->sv_temptimer.function == NULL) {
				/* setup timer to age temp transports */
				setup_timer(&serv->sv_temptimer,
					    svc_age_temp_xprts,
					    (unsigned long)serv);
				mod_timer(&serv->sv_temptimer,
					  jiffies + svc_conn_age_period * HZ);
			}
			spin_unlock_bh(&serv->sv_lock);
			svc_xprt_received(newxpt);
		}
		svc_xprt_received(xprt);
	} else {
		dprintk("svc: server %p, pool %u, transport %p, inuse=%d\n",
			rqstp, pool->sp_id, xprt,
			atomic_read(&xprt->xpt_ref.refcount));
		rqstp->rq_deferred = svc_deferred_dequeue(xprt);
		if (rqstp->rq_deferred) {
			svc_xprt_received(xprt);
			len = svc_deferred_recv(rqstp);
		} else {
			len = xprt->xpt_ops->xpo_recvfrom(rqstp);
			svc_xprt_received(xprt);
		}
		dprintk("svc: got len=%d\n", len);
	}

	/* No data, incomplete (TCP) read, or accept() */
	if (len == 0 || len == -EAGAIN) {
		rqstp->rq_res.len = 0;
		svc_xprt_release(rqstp);
		return -EAGAIN;
	}
	clear_bit(XPT_OLD, &xprt->xpt_flags);

	rqstp->rq_secure = svc_port_is_privileged(svc_addr(rqstp));
	rqstp->rq_chandle.defer = svc_defer;

	if (serv->sv_stats)
		serv->sv_stats->netcnt++;
	return len;
}
EXPORT_SYMBOL_GPL(svc_recv);

/*
 * Drop request
 */
void svc_drop(struct svc_rqst *rqstp)
{
	dprintk("svc: xprt %p dropped request\n", rqstp->rq_xprt);
	svc_xprt_release(rqstp);
}
EXPORT_SYMBOL_GPL(svc_drop);
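/*
 * Illustrative sketch (assumption, not part of this file): a service
 * thread built on this interface typically loops along these lines:
 *
 *	while (!kthread_should_stop()) {
 *		err = svc_recv(rqstp, 60*60*HZ);
 *		if (err == -EINTR)
 *			break;
 *		if (err == -EAGAIN)
 *			continue;
 *		svc_process(rqstp);
 *	}
 *
 * svc_process() dispatches the request and sends the reply via
 * svc_send() below; svc_drop() is the escape hatch for discarding a
 * request without replying.
 */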
/*
 * Return reply to client.
 */
int svc_send(struct svc_rqst *rqstp)
{
	struct svc_xprt *xprt;
	int len;
	struct xdr_buf *xb;

	xprt = rqstp->rq_xprt;
	if (!xprt)
		return -EFAULT;

	/* release the receive skb before sending the reply */
	rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp);

	/* calculate over-all length */
	xb = &rqstp->rq_res;
	xb->len = xb->head[0].iov_len +
		xb->page_len +
		xb->tail[0].iov_len;

	/* Grab mutex to serialize outgoing data. */
	mutex_lock(&xprt->xpt_mutex);
	if (test_bit(XPT_DEAD, &xprt->xpt_flags))
		len = -ENOTCONN;
	else
		len = xprt->xpt_ops->xpo_sendto(rqstp);
	mutex_unlock(&xprt->xpt_mutex);
	rpc_wake_up(&xprt->xpt_bc_pending);
	svc_xprt_release(rqstp);

	if (len == -ECONNREFUSED || len == -ENOTCONN || len == -EAGAIN)
		return 0;
	return len;
}

/*
 * Timer function to close old temporary transports, using
 * a mark-and-sweep algorithm.
 */
static void svc_age_temp_xprts(unsigned long closure)
{
	struct svc_serv *serv = (struct svc_serv *)closure;
	struct svc_xprt *xprt;
	struct list_head *le, *next;
	LIST_HEAD(to_be_aged);

	dprintk("svc_age_temp_xprts\n");

	if (!spin_trylock_bh(&serv->sv_lock)) {
		/* busy, try again 1 sec later */
		dprintk("svc_age_temp_xprts: busy\n");
		mod_timer(&serv->sv_temptimer, jiffies + HZ);
		return;
	}

	list_for_each_safe(le, next, &serv->sv_tempsocks) {
		xprt = list_entry(le, struct svc_xprt, xpt_list);

		/* First time through, just mark it OLD. Second time
		 * through, close it. */
		if (!test_and_set_bit(XPT_OLD, &xprt->xpt_flags))
			continue;
		if (atomic_read(&xprt->xpt_ref.refcount) > 1 ||
		    test_bit(XPT_BUSY, &xprt->xpt_flags))
			continue;
		svc_xprt_get(xprt);
		list_move(le, &to_be_aged);
		set_bit(XPT_CLOSE, &xprt->xpt_flags);
		set_bit(XPT_DETACHED, &xprt->xpt_flags);
	}
	spin_unlock_bh(&serv->sv_lock);

	while (!list_empty(&to_be_aged)) {
		le = to_be_aged.next;
		/* fiddling the xpt_list node is safe 'cos we're XPT_DETACHED */
		list_del_init(le);
		xprt = list_entry(le, struct svc_xprt, xpt_list);

		dprintk("queuing xprt %p for closing\n", xprt);

		/* a thread will dequeue and close it soon */
		svc_xprt_enqueue(xprt);
		svc_xprt_put(xprt);
	}

	mod_timer(&serv->sv_temptimer, jiffies + svc_conn_age_period * HZ);
}
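/*
 * Note (illustrative summary): svc_recv() clears XPT_OLD each time a
 * transport yields a complete request, so the sweep above only closes
 * connections that stayed idle across two consecutive timer firings,
 * i.e. between svc_conn_age_period and 2*svc_conn_age_period seconds.
 */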
/*
 * Remove a dead transport
 */
void svc_delete_xprt(struct svc_xprt *xprt)
{
	struct svc_serv *serv = xprt->xpt_server;
	struct svc_deferred_req *dr;

	/* Only do this once */
	if (test_and_set_bit(XPT_DEAD, &xprt->xpt_flags))
		return;

	dprintk("svc: svc_delete_xprt(%p)\n", xprt);
	xprt->xpt_ops->xpo_detach(xprt);

	spin_lock_bh(&serv->sv_lock);
	if (!test_and_set_bit(XPT_DETACHED, &xprt->xpt_flags))
		list_del_init(&xprt->xpt_list);
	/*
	 * We used to delete the transport from whichever list
	 * its sk_xprt.xpt_ready node was on, but we don't actually
	 * need to.  This is because the only time we're called
	 * while still attached to a queue, the queue itself
	 * is about to be destroyed (in svc_destroy).
	 */
	if (test_bit(XPT_TEMP, &xprt->xpt_flags))
		serv->sv_tmpcnt--;
	spin_unlock_bh(&serv->sv_lock);

	while ((dr = svc_deferred_dequeue(xprt)) != NULL)
		kfree(dr);

	svc_xprt_put(xprt);
}

void svc_close_xprt(struct svc_xprt *xprt)
{
	set_bit(XPT_CLOSE, &xprt->xpt_flags);
	if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags))
		/* someone else will have to effect the close */
		return;

	svc_xprt_get(xprt);
	svc_delete_xprt(xprt);
	clear_bit(XPT_BUSY, &xprt->xpt_flags);
	svc_xprt_put(xprt);
}
EXPORT_SYMBOL_GPL(svc_close_xprt);

void svc_close_all(struct list_head *xprt_list)
{
	struct svc_xprt *xprt;
	struct svc_xprt *tmp;

	list_for_each_entry_safe(xprt, tmp, xprt_list, xpt_list) {
		set_bit(XPT_CLOSE, &xprt->xpt_flags);
		if (test_bit(XPT_BUSY, &xprt->xpt_flags)) {
			/* Waiting to be processed, but no threads left,
			 * so just remove it from the waiting list
			 */
			list_del_init(&xprt->xpt_ready);
			clear_bit(XPT_BUSY, &xprt->xpt_flags);
		}
		svc_close_xprt(xprt);
	}
}

/*
 * Handle defer and revisit of requests
 */

static void svc_revisit(struct cache_deferred_req *dreq, int too_many)
{
	struct svc_deferred_req *dr =
		container_of(dreq, struct svc_deferred_req, handle);
	struct svc_xprt *xprt = dr->xprt;

	spin_lock(&xprt->xpt_lock);
	set_bit(XPT_DEFERRED, &xprt->xpt_flags);
	if (too_many || test_bit(XPT_DEAD, &xprt->xpt_flags)) {
		spin_unlock(&xprt->xpt_lock);
		dprintk("revisit canceled\n");
		svc_xprt_put(xprt);
		kfree(dr);
		return;
	}
	dprintk("revisit queued\n");
	dr->xprt = NULL;
	list_add(&dr->handle.recent, &xprt->xpt_deferred);
	spin_unlock(&xprt->xpt_lock);
	svc_xprt_enqueue(xprt);
	svc_xprt_put(xprt);
}
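/*
 * Illustrative note (assumption, based on the cache interface): when a
 * request must wait for a cache upcall, the cache layer calls
 * rqstp->rq_chandle.defer, i.e. svc_defer() below, to park the request.
 * Once the cache entry is resolved (or dropped), it calls
 * dr->handle.revisit, i.e. svc_revisit() above, which re-queues the
 * saved request on the transport's xpt_deferred list.
 */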
/*
 * Save the request off for later processing. The request buffer looks
 * like this:
 *
 * <xprt-header><rpc-header><rpc-pagelist><rpc-tail>
 *
 * This code can only handle requests that consist of an xprt-header
 * and rpc-header.
 */
static struct cache_deferred_req *svc_defer(struct cache_req *req)
{
	struct svc_rqst *rqstp = container_of(req, struct svc_rqst, rq_chandle);
	struct svc_deferred_req *dr;

	if (rqstp->rq_arg.page_len || !rqstp->rq_usedeferral)
		return NULL; /* if more than a page, give up FIXME */
	if (rqstp->rq_deferred) {
		dr = rqstp->rq_deferred;
		rqstp->rq_deferred = NULL;
	} else {
		size_t skip;
		size_t size;
		/* FIXME maybe discard if size too large */
		size = sizeof(struct svc_deferred_req) + rqstp->rq_arg.len;
		dr = kmalloc(size, GFP_KERNEL);
		if (dr == NULL)
			return NULL;

		dr->handle.owner = rqstp->rq_server;
		dr->prot = rqstp->rq_prot;
		memcpy(&dr->addr, &rqstp->rq_addr, rqstp->rq_addrlen);
		dr->addrlen = rqstp->rq_addrlen;
		dr->daddr = rqstp->rq_daddr;
		dr->argslen = rqstp->rq_arg.len >> 2;
		dr->xprt_hlen = rqstp->rq_xprt_hlen;

		/* back up head to the start of the buffer and copy */
		skip = rqstp->rq_arg.len - rqstp->rq_arg.head[0].iov_len;
		memcpy(dr->args, rqstp->rq_arg.head[0].iov_base - skip,
		       dr->argslen << 2);
	}
	svc_xprt_get(rqstp->rq_xprt);
	dr->xprt = rqstp->rq_xprt;

	dr->handle.revisit = svc_revisit;
	return &dr->handle;
}

/*
 * recv data from a deferred request into an active one
 */
static int svc_deferred_recv(struct svc_rqst *rqstp)
{
	struct svc_deferred_req *dr = rqstp->rq_deferred;

	/* setup iov_base past transport header */
	rqstp->rq_arg.head[0].iov_base = dr->args + (dr->xprt_hlen>>2);
	/* The iov_len does not include the transport header bytes */
	rqstp->rq_arg.head[0].iov_len = (dr->argslen<<2) - dr->xprt_hlen;
	rqstp->rq_arg.page_len = 0;
	/* The rq_arg.len includes the transport header bytes */
	rqstp->rq_arg.len = dr->argslen<<2;
	rqstp->rq_prot = dr->prot;
	memcpy(&rqstp->rq_addr, &dr->addr, dr->addrlen);
	rqstp->rq_addrlen = dr->addrlen;
	/* Save off transport header len in case we get deferred again */
	rqstp->rq_xprt_hlen = dr->xprt_hlen;
	rqstp->rq_daddr = dr->daddr;
	rqstp->rq_respages = rqstp->rq_pages;
	return (dr->argslen<<2) - dr->xprt_hlen;
}


static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt)
{
	struct svc_deferred_req *dr = NULL;

	if (!test_bit(XPT_DEFERRED, &xprt->xpt_flags))
		return NULL;
	spin_lock(&xprt->xpt_lock);
	clear_bit(XPT_DEFERRED, &xprt->xpt_flags);
	if (!list_empty(&xprt->xpt_deferred)) {
		dr = list_entry(xprt->xpt_deferred.next,
				struct svc_deferred_req,
				handle.recent);
		list_del_init(&dr->handle.recent);
		set_bit(XPT_DEFERRED, &xprt->xpt_flags);
	}
	spin_unlock(&xprt->xpt_lock);
	return dr;
}
/**
 * svc_find_xprt - find an RPC transport instance
 * @serv: pointer to svc_serv to search
 * @xcl_name: C string containing transport's class name
 * @af: Address family of transport's local address
 * @port: transport's IP port number
 *
 * Return the transport instance pointer for the endpoint accepting
 * connections/peer traffic from the specified transport class,
 * address family and port.
 *
 * Specifying 0 for the address family or port is effectively a
 * wild-card, and will result in matching the first transport in the
 * service's list that has a matching class name.
 */
struct svc_xprt *svc_find_xprt(struct svc_serv *serv, const char *xcl_name,
			       const sa_family_t af, const unsigned short port)
{
	struct svc_xprt *xprt;
	struct svc_xprt *found = NULL;

	/* Sanity check the args */
	if (serv == NULL || xcl_name == NULL)
		return found;

	spin_lock_bh(&serv->sv_lock);
	list_for_each_entry(xprt, &serv->sv_permsocks, xpt_list) {
		if (strcmp(xprt->xpt_class->xcl_name, xcl_name))
			continue;
		if (af != AF_UNSPEC && af != xprt->xpt_local.ss_family)
			continue;
		if (port != 0 && port != svc_xprt_local_port(xprt))
			continue;
		found = xprt;
		svc_xprt_get(xprt);
		break;
	}
	spin_unlock_bh(&serv->sv_lock);
	return found;
}
EXPORT_SYMBOL_GPL(svc_find_xprt);

static int svc_one_xprt_name(const struct svc_xprt *xprt,
			     char *pos, int remaining)
{
	int len;

	len = snprintf(pos, remaining, "%s %u\n",
		       xprt->xpt_class->xcl_name,
		       svc_xprt_local_port(xprt));
	if (len >= remaining)
		return -ENAMETOOLONG;
	return len;
}

/**
 * svc_xprt_names - format a buffer with a list of transport names
 * @serv: pointer to an RPC service
 * @buf: pointer to a buffer to be filled in
 * @buflen: length of buffer to be filled in
 *
 * Fills in @buf with a string containing a list of transport names,
 * each name terminated with '\n'.
 *
 * Returns positive length of the filled-in string on success; otherwise
 * a negative errno value is returned if an error occurs.
 */
int svc_xprt_names(struct svc_serv *serv, char *buf, const int buflen)
{
	struct svc_xprt *xprt;
	int len, totlen;
	char *pos;

	/* Sanity check args */
	if (!serv)
		return 0;

	spin_lock_bh(&serv->sv_lock);

	pos = buf;
	totlen = 0;
	list_for_each_entry(xprt, &serv->sv_permsocks, xpt_list) {
		len = svc_one_xprt_name(xprt, pos, buflen - totlen);
		if (len < 0) {
			*buf = '\0';
			totlen = len;
		}
		if (len <= 0)
			break;

		pos += len;
		totlen += len;
	}

	spin_unlock_bh(&serv->sv_lock);
	return totlen;
}
EXPORT_SYMBOL_GPL(svc_xprt_names);
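/*
 * Example output (illustrative; class names depend on what was
 * registered): for a service bound to UDP and TCP port 2049,
 * svc_xprt_names() would fill @buf with
 *
 *	udp 2049
 *	tcp 2049
 */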
/*----------------------------------------------------------------------------*/

static void *svc_pool_stats_start(struct seq_file *m, loff_t *pos)
{
	unsigned int pidx = (unsigned int)*pos;
	struct svc_serv *serv = m->private;

	dprintk("svc_pool_stats_start, *pidx=%u\n", pidx);

	if (!pidx)
		return SEQ_START_TOKEN;
	return (pidx > serv->sv_nrpools ? NULL : &serv->sv_pools[pidx-1]);
}

static void *svc_pool_stats_next(struct seq_file *m, void *p, loff_t *pos)
{
	struct svc_pool *pool = p;
	struct svc_serv *serv = m->private;

	dprintk("svc_pool_stats_next, *pos=%llu\n", *pos);

	if (p == SEQ_START_TOKEN) {
		pool = &serv->sv_pools[0];
	} else {
		unsigned int pidx = (pool - &serv->sv_pools[0]);
		if (pidx < serv->sv_nrpools-1)
			pool = &serv->sv_pools[pidx+1];
		else
			pool = NULL;
	}
	++*pos;
	return pool;
}

static void svc_pool_stats_stop(struct seq_file *m, void *p)
{
}

static int svc_pool_stats_show(struct seq_file *m, void *p)
{
	struct svc_pool *pool = p;

	if (p == SEQ_START_TOKEN) {
		seq_puts(m, "# pool packets-arrived sockets-enqueued threads-woken threads-timedout\n");
		return 0;
	}

	seq_printf(m, "%u %lu %lu %lu %lu\n",
		pool->sp_id,
		pool->sp_stats.packets,
		pool->sp_stats.sockets_queued,
		pool->sp_stats.threads_woken,
		pool->sp_stats.threads_timedout);

	return 0;
}

static const struct seq_operations svc_pool_stats_seq_ops = {
	.start	= svc_pool_stats_start,
	.next	= svc_pool_stats_next,
	.stop	= svc_pool_stats_stop,
	.show	= svc_pool_stats_show,
};

int svc_pool_stats_open(struct svc_serv *serv, struct file *file)
{
	int err;

	err = seq_open(file, &svc_pool_stats_seq_ops);
	if (!err)
		((struct seq_file *) file->private_data)->private = serv;
	return err;
}
EXPORT_SYMBOL(svc_pool_stats_open);

/*----------------------------------------------------------------------------*/
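/*
 * Usage sketch (assumption, mirroring how a service would wire this up
 * to a procfs file): the seq_file open method reduces to
 *
 *	static int my_pool_stats_open(struct inode *inode,
 *				      struct file *file)
 *	{
 *		return svc_pool_stats_open(my_serv, file);
 *	}
 *
 * after which reads emit the header line and one line per pool, as
 * formatted by svc_pool_stats_show() above.
 */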