1 /* 2 * linux/net/sunrpc/svc_xprt.c 3 * 4 * Author: Tom Tucker <tom@opengridcomputing.com> 5 */ 6 7 #include <linux/sched.h> 8 #include <linux/smp_lock.h> 9 #include <linux/errno.h> 10 #include <linux/freezer.h> 11 #include <linux/kthread.h> 12 #include <net/sock.h> 13 #include <linux/sunrpc/stats.h> 14 #include <linux/sunrpc/svc_xprt.h> 15 #include <linux/sunrpc/svcsock.h> 16 17 #define RPCDBG_FACILITY RPCDBG_SVCXPRT 18 19 static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt); 20 static int svc_deferred_recv(struct svc_rqst *rqstp); 21 static struct cache_deferred_req *svc_defer(struct cache_req *req); 22 static void svc_age_temp_xprts(unsigned long closure); 23 24 /* apparently the "standard" is that clients close 25 * idle connections after 5 minutes, servers after 26 * 6 minutes 27 * http://www.connectathon.org/talks96/nfstcp.pdf 28 */ 29 static int svc_conn_age_period = 6*60; 30 31 /* List of registered transport classes */ 32 static DEFINE_SPINLOCK(svc_xprt_class_lock); 33 static LIST_HEAD(svc_xprt_class_list); 34 35 /* SMP locking strategy: 36 * 37 * svc_pool->sp_lock protects most of the fields of that pool. 38 * svc_serv->sv_lock protects sv_tempsocks, sv_permsocks, sv_tmpcnt. 39 * when both need to be taken (rare), svc_serv->sv_lock is first. 40 * BKL protects svc_serv->sv_nrthread. 41 * svc_sock->sk_lock protects the svc_sock->sk_deferred list 42 * and the ->sk_info_authunix cache. 43 * 44 * The XPT_BUSY bit in xprt->xpt_flags prevents a transport being 45 * enqueued multiply. During normal transport processing this bit 46 * is set by svc_xprt_enqueue and cleared by svc_xprt_received. 47 * Providers should not manipulate this bit directly. 48 * 49 * Some flags can be set to certain values at any time 50 * providing that certain rules are followed: 51 * 52 * XPT_CONN, XPT_DATA: 53 * - Can be set or cleared at any time. 54 * - After a set, svc_xprt_enqueue must be called to enqueue 55 * the transport for processing. 56 * - After a clear, the transport must be read/accepted. 57 * If this succeeds, it must be set again. 58 * XPT_CLOSE: 59 * - Can set at any time. It is never cleared. 60 * XPT_DEAD: 61 * - Can only be set while XPT_BUSY is held which ensures 62 * that no other thread will be using the transport or will 63 * try to set XPT_DEAD. 64 */ 65 66 int svc_reg_xprt_class(struct svc_xprt_class *xcl) 67 { 68 struct svc_xprt_class *cl; 69 int res = -EEXIST; 70 71 dprintk("svc: Adding svc transport class '%s'\n", xcl->xcl_name); 72 73 INIT_LIST_HEAD(&xcl->xcl_list); 74 spin_lock(&svc_xprt_class_lock); 75 /* Make sure there isn't already a class with the same name */ 76 list_for_each_entry(cl, &svc_xprt_class_list, xcl_list) { 77 if (strcmp(xcl->xcl_name, cl->xcl_name) == 0) 78 goto out; 79 } 80 list_add_tail(&xcl->xcl_list, &svc_xprt_class_list); 81 res = 0; 82 out: 83 spin_unlock(&svc_xprt_class_lock); 84 return res; 85 } 86 EXPORT_SYMBOL_GPL(svc_reg_xprt_class); 87 88 void svc_unreg_xprt_class(struct svc_xprt_class *xcl) 89 { 90 dprintk("svc: Removing svc transport class '%s'\n", xcl->xcl_name); 91 spin_lock(&svc_xprt_class_lock); 92 list_del_init(&xcl->xcl_list); 93 spin_unlock(&svc_xprt_class_lock); 94 } 95 EXPORT_SYMBOL_GPL(svc_unreg_xprt_class); 96 97 /* 98 * Format the transport list for printing 99 */ 100 int svc_print_xprts(char *buf, int maxlen) 101 { 102 struct list_head *le; 103 char tmpstr[80]; 104 int len = 0; 105 buf[0] = '\0'; 106 107 spin_lock(&svc_xprt_class_lock); 108 list_for_each(le, &svc_xprt_class_list) { 109 int slen; 110 struct svc_xprt_class *xcl = 111 list_entry(le, struct svc_xprt_class, xcl_list); 112 113 sprintf(tmpstr, "%s %d\n", xcl->xcl_name, xcl->xcl_max_payload); 114 slen = strlen(tmpstr); 115 if (len + slen > maxlen) 116 break; 117 len += slen; 118 strcat(buf, tmpstr); 119 } 120 spin_unlock(&svc_xprt_class_lock); 121 122 return len; 123 } 124 125 static void svc_xprt_free(struct kref *kref) 126 { 127 struct svc_xprt *xprt = 128 container_of(kref, struct svc_xprt, xpt_ref); 129 struct module *owner = xprt->xpt_class->xcl_owner; 130 if (test_bit(XPT_CACHE_AUTH, &xprt->xpt_flags) && 131 xprt->xpt_auth_cache != NULL) 132 svcauth_unix_info_release(xprt->xpt_auth_cache); 133 xprt->xpt_ops->xpo_free(xprt); 134 module_put(owner); 135 } 136 137 void svc_xprt_put(struct svc_xprt *xprt) 138 { 139 kref_put(&xprt->xpt_ref, svc_xprt_free); 140 } 141 EXPORT_SYMBOL_GPL(svc_xprt_put); 142 143 /* 144 * Called by transport drivers to initialize the transport independent 145 * portion of the transport instance. 146 */ 147 void svc_xprt_init(struct svc_xprt_class *xcl, struct svc_xprt *xprt, 148 struct svc_serv *serv) 149 { 150 memset(xprt, 0, sizeof(*xprt)); 151 xprt->xpt_class = xcl; 152 xprt->xpt_ops = xcl->xcl_ops; 153 kref_init(&xprt->xpt_ref); 154 xprt->xpt_server = serv; 155 INIT_LIST_HEAD(&xprt->xpt_list); 156 INIT_LIST_HEAD(&xprt->xpt_ready); 157 INIT_LIST_HEAD(&xprt->xpt_deferred); 158 mutex_init(&xprt->xpt_mutex); 159 spin_lock_init(&xprt->xpt_lock); 160 set_bit(XPT_BUSY, &xprt->xpt_flags); 161 rpc_init_wait_queue(&xprt->xpt_bc_pending, "xpt_bc_pending"); 162 } 163 EXPORT_SYMBOL_GPL(svc_xprt_init); 164 165 static struct svc_xprt *__svc_xpo_create(struct svc_xprt_class *xcl, 166 struct svc_serv *serv, 167 const int family, 168 const unsigned short port, 169 int flags) 170 { 171 struct sockaddr_in sin = { 172 .sin_family = AF_INET, 173 .sin_addr.s_addr = htonl(INADDR_ANY), 174 .sin_port = htons(port), 175 }; 176 struct sockaddr_in6 sin6 = { 177 .sin6_family = AF_INET6, 178 .sin6_addr = IN6ADDR_ANY_INIT, 179 .sin6_port = htons(port), 180 }; 181 struct sockaddr *sap; 182 size_t len; 183 184 switch (family) { 185 case PF_INET: 186 sap = (struct sockaddr *)&sin; 187 len = sizeof(sin); 188 break; 189 case PF_INET6: 190 sap = (struct sockaddr *)&sin6; 191 len = sizeof(sin6); 192 break; 193 default: 194 return ERR_PTR(-EAFNOSUPPORT); 195 } 196 197 return xcl->xcl_ops->xpo_create(serv, sap, len, flags); 198 } 199 200 int svc_create_xprt(struct svc_serv *serv, const char *xprt_name, 201 const int family, const unsigned short port, 202 int flags) 203 { 204 struct svc_xprt_class *xcl; 205 206 dprintk("svc: creating transport %s[%d]\n", xprt_name, port); 207 spin_lock(&svc_xprt_class_lock); 208 list_for_each_entry(xcl, &svc_xprt_class_list, xcl_list) { 209 struct svc_xprt *newxprt; 210 211 if (strcmp(xprt_name, xcl->xcl_name)) 212 continue; 213 214 if (!try_module_get(xcl->xcl_owner)) 215 goto err; 216 217 spin_unlock(&svc_xprt_class_lock); 218 newxprt = __svc_xpo_create(xcl, serv, family, port, flags); 219 if (IS_ERR(newxprt)) { 220 module_put(xcl->xcl_owner); 221 return PTR_ERR(newxprt); 222 } 223 224 clear_bit(XPT_TEMP, &newxprt->xpt_flags); 225 spin_lock_bh(&serv->sv_lock); 226 list_add(&newxprt->xpt_list, &serv->sv_permsocks); 227 spin_unlock_bh(&serv->sv_lock); 228 clear_bit(XPT_BUSY, &newxprt->xpt_flags); 229 return svc_xprt_local_port(newxprt); 230 } 231 err: 232 spin_unlock(&svc_xprt_class_lock); 233 dprintk("svc: transport %s not found\n", xprt_name); 234 return -ENOENT; 235 } 236 EXPORT_SYMBOL_GPL(svc_create_xprt); 237 238 /* 239 * Copy the local and remote xprt addresses to the rqstp structure 240 */ 241 void svc_xprt_copy_addrs(struct svc_rqst *rqstp, struct svc_xprt *xprt) 242 { 243 struct sockaddr *sin; 244 245 memcpy(&rqstp->rq_addr, &xprt->xpt_remote, xprt->xpt_remotelen); 246 rqstp->rq_addrlen = xprt->xpt_remotelen; 247 248 /* 249 * Destination address in request is needed for binding the 250 * source address in RPC replies/callbacks later. 251 */ 252 sin = (struct sockaddr *)&xprt->xpt_local; 253 switch (sin->sa_family) { 254 case AF_INET: 255 rqstp->rq_daddr.addr = ((struct sockaddr_in *)sin)->sin_addr; 256 break; 257 case AF_INET6: 258 rqstp->rq_daddr.addr6 = ((struct sockaddr_in6 *)sin)->sin6_addr; 259 break; 260 } 261 } 262 EXPORT_SYMBOL_GPL(svc_xprt_copy_addrs); 263 264 /** 265 * svc_print_addr - Format rq_addr field for printing 266 * @rqstp: svc_rqst struct containing address to print 267 * @buf: target buffer for formatted address 268 * @len: length of target buffer 269 * 270 */ 271 char *svc_print_addr(struct svc_rqst *rqstp, char *buf, size_t len) 272 { 273 return __svc_print_addr(svc_addr(rqstp), buf, len); 274 } 275 EXPORT_SYMBOL_GPL(svc_print_addr); 276 277 /* 278 * Queue up an idle server thread. Must have pool->sp_lock held. 279 * Note: this is really a stack rather than a queue, so that we only 280 * use as many different threads as we need, and the rest don't pollute 281 * the cache. 282 */ 283 static void svc_thread_enqueue(struct svc_pool *pool, struct svc_rqst *rqstp) 284 { 285 list_add(&rqstp->rq_list, &pool->sp_threads); 286 } 287 288 /* 289 * Dequeue an nfsd thread. Must have pool->sp_lock held. 290 */ 291 static void svc_thread_dequeue(struct svc_pool *pool, struct svc_rqst *rqstp) 292 { 293 list_del(&rqstp->rq_list); 294 } 295 296 /* 297 * Queue up a transport with data pending. If there are idle nfsd 298 * processes, wake 'em up. 299 * 300 */ 301 void svc_xprt_enqueue(struct svc_xprt *xprt) 302 { 303 struct svc_serv *serv = xprt->xpt_server; 304 struct svc_pool *pool; 305 struct svc_rqst *rqstp; 306 int cpu; 307 308 if (!(xprt->xpt_flags & 309 ((1<<XPT_CONN)|(1<<XPT_DATA)|(1<<XPT_CLOSE)|(1<<XPT_DEFERRED)))) 310 return; 311 312 cpu = get_cpu(); 313 pool = svc_pool_for_cpu(xprt->xpt_server, cpu); 314 put_cpu(); 315 316 spin_lock_bh(&pool->sp_lock); 317 318 if (!list_empty(&pool->sp_threads) && 319 !list_empty(&pool->sp_sockets)) 320 printk(KERN_ERR 321 "svc_xprt_enqueue: " 322 "threads and transports both waiting??\n"); 323 324 if (test_bit(XPT_DEAD, &xprt->xpt_flags)) { 325 /* Don't enqueue dead transports */ 326 dprintk("svc: transport %p is dead, not enqueued\n", xprt); 327 goto out_unlock; 328 } 329 330 pool->sp_stats.packets++; 331 332 /* Mark transport as busy. It will remain in this state until 333 * the provider calls svc_xprt_received. We update XPT_BUSY 334 * atomically because it also guards against trying to enqueue 335 * the transport twice. 336 */ 337 if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags)) { 338 /* Don't enqueue transport while already enqueued */ 339 dprintk("svc: transport %p busy, not enqueued\n", xprt); 340 goto out_unlock; 341 } 342 BUG_ON(xprt->xpt_pool != NULL); 343 xprt->xpt_pool = pool; 344 345 /* Handle pending connection */ 346 if (test_bit(XPT_CONN, &xprt->xpt_flags)) 347 goto process; 348 349 /* Handle close in-progress */ 350 if (test_bit(XPT_CLOSE, &xprt->xpt_flags)) 351 goto process; 352 353 /* Check if we have space to reply to a request */ 354 if (!xprt->xpt_ops->xpo_has_wspace(xprt)) { 355 /* Don't enqueue while not enough space for reply */ 356 dprintk("svc: no write space, transport %p not enqueued\n", 357 xprt); 358 xprt->xpt_pool = NULL; 359 clear_bit(XPT_BUSY, &xprt->xpt_flags); 360 goto out_unlock; 361 } 362 363 process: 364 if (!list_empty(&pool->sp_threads)) { 365 rqstp = list_entry(pool->sp_threads.next, 366 struct svc_rqst, 367 rq_list); 368 dprintk("svc: transport %p served by daemon %p\n", 369 xprt, rqstp); 370 svc_thread_dequeue(pool, rqstp); 371 if (rqstp->rq_xprt) 372 printk(KERN_ERR 373 "svc_xprt_enqueue: server %p, rq_xprt=%p!\n", 374 rqstp, rqstp->rq_xprt); 375 rqstp->rq_xprt = xprt; 376 svc_xprt_get(xprt); 377 rqstp->rq_reserved = serv->sv_max_mesg; 378 atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved); 379 pool->sp_stats.threads_woken++; 380 BUG_ON(xprt->xpt_pool != pool); 381 wake_up(&rqstp->rq_wait); 382 } else { 383 dprintk("svc: transport %p put into queue\n", xprt); 384 list_add_tail(&xprt->xpt_ready, &pool->sp_sockets); 385 pool->sp_stats.sockets_queued++; 386 BUG_ON(xprt->xpt_pool != pool); 387 } 388 389 out_unlock: 390 spin_unlock_bh(&pool->sp_lock); 391 } 392 EXPORT_SYMBOL_GPL(svc_xprt_enqueue); 393 394 /* 395 * Dequeue the first transport. Must be called with the pool->sp_lock held. 396 */ 397 static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool) 398 { 399 struct svc_xprt *xprt; 400 401 if (list_empty(&pool->sp_sockets)) 402 return NULL; 403 404 xprt = list_entry(pool->sp_sockets.next, 405 struct svc_xprt, xpt_ready); 406 list_del_init(&xprt->xpt_ready); 407 408 dprintk("svc: transport %p dequeued, inuse=%d\n", 409 xprt, atomic_read(&xprt->xpt_ref.refcount)); 410 411 return xprt; 412 } 413 414 /* 415 * svc_xprt_received conditionally queues the transport for processing 416 * by another thread. The caller must hold the XPT_BUSY bit and must 417 * not thereafter touch transport data. 418 * 419 * Note: XPT_DATA only gets cleared when a read-attempt finds no (or 420 * insufficient) data. 421 */ 422 void svc_xprt_received(struct svc_xprt *xprt) 423 { 424 BUG_ON(!test_bit(XPT_BUSY, &xprt->xpt_flags)); 425 xprt->xpt_pool = NULL; 426 clear_bit(XPT_BUSY, &xprt->xpt_flags); 427 svc_xprt_enqueue(xprt); 428 } 429 EXPORT_SYMBOL_GPL(svc_xprt_received); 430 431 /** 432 * svc_reserve - change the space reserved for the reply to a request. 433 * @rqstp: The request in question 434 * @space: new max space to reserve 435 * 436 * Each request reserves some space on the output queue of the transport 437 * to make sure the reply fits. This function reduces that reserved 438 * space to be the amount of space used already, plus @space. 439 * 440 */ 441 void svc_reserve(struct svc_rqst *rqstp, int space) 442 { 443 space += rqstp->rq_res.head[0].iov_len; 444 445 if (space < rqstp->rq_reserved) { 446 struct svc_xprt *xprt = rqstp->rq_xprt; 447 atomic_sub((rqstp->rq_reserved - space), &xprt->xpt_reserved); 448 rqstp->rq_reserved = space; 449 450 svc_xprt_enqueue(xprt); 451 } 452 } 453 EXPORT_SYMBOL_GPL(svc_reserve); 454 455 static void svc_xprt_release(struct svc_rqst *rqstp) 456 { 457 struct svc_xprt *xprt = rqstp->rq_xprt; 458 459 rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp); 460 461 kfree(rqstp->rq_deferred); 462 rqstp->rq_deferred = NULL; 463 464 svc_free_res_pages(rqstp); 465 rqstp->rq_res.page_len = 0; 466 rqstp->rq_res.page_base = 0; 467 468 /* Reset response buffer and release 469 * the reservation. 470 * But first, check that enough space was reserved 471 * for the reply, otherwise we have a bug! 472 */ 473 if ((rqstp->rq_res.len) > rqstp->rq_reserved) 474 printk(KERN_ERR "RPC request reserved %d but used %d\n", 475 rqstp->rq_reserved, 476 rqstp->rq_res.len); 477 478 rqstp->rq_res.head[0].iov_len = 0; 479 svc_reserve(rqstp, 0); 480 rqstp->rq_xprt = NULL; 481 482 svc_xprt_put(xprt); 483 } 484 485 /* 486 * External function to wake up a server waiting for data 487 * This really only makes sense for services like lockd 488 * which have exactly one thread anyway. 489 */ 490 void svc_wake_up(struct svc_serv *serv) 491 { 492 struct svc_rqst *rqstp; 493 unsigned int i; 494 struct svc_pool *pool; 495 496 for (i = 0; i < serv->sv_nrpools; i++) { 497 pool = &serv->sv_pools[i]; 498 499 spin_lock_bh(&pool->sp_lock); 500 if (!list_empty(&pool->sp_threads)) { 501 rqstp = list_entry(pool->sp_threads.next, 502 struct svc_rqst, 503 rq_list); 504 dprintk("svc: daemon %p woken up.\n", rqstp); 505 /* 506 svc_thread_dequeue(pool, rqstp); 507 rqstp->rq_xprt = NULL; 508 */ 509 wake_up(&rqstp->rq_wait); 510 } 511 spin_unlock_bh(&pool->sp_lock); 512 } 513 } 514 EXPORT_SYMBOL_GPL(svc_wake_up); 515 516 int svc_port_is_privileged(struct sockaddr *sin) 517 { 518 switch (sin->sa_family) { 519 case AF_INET: 520 return ntohs(((struct sockaddr_in *)sin)->sin_port) 521 < PROT_SOCK; 522 case AF_INET6: 523 return ntohs(((struct sockaddr_in6 *)sin)->sin6_port) 524 < PROT_SOCK; 525 default: 526 return 0; 527 } 528 } 529 530 /* 531 * Make sure that we don't have too many active connections. If we have, 532 * something must be dropped. It's not clear what will happen if we allow 533 * "too many" connections, but when dealing with network-facing software, 534 * we have to code defensively. Here we do that by imposing hard limits. 535 * 536 * There's no point in trying to do random drop here for DoS 537 * prevention. The NFS clients does 1 reconnect in 15 seconds. An 538 * attacker can easily beat that. 539 * 540 * The only somewhat efficient mechanism would be if drop old 541 * connections from the same IP first. But right now we don't even 542 * record the client IP in svc_sock. 543 * 544 * single-threaded services that expect a lot of clients will probably 545 * need to set sv_maxconn to override the default value which is based 546 * on the number of threads 547 */ 548 static void svc_check_conn_limits(struct svc_serv *serv) 549 { 550 unsigned int limit = serv->sv_maxconn ? serv->sv_maxconn : 551 (serv->sv_nrthreads+3) * 20; 552 553 if (serv->sv_tmpcnt > limit) { 554 struct svc_xprt *xprt = NULL; 555 spin_lock_bh(&serv->sv_lock); 556 if (!list_empty(&serv->sv_tempsocks)) { 557 if (net_ratelimit()) { 558 /* Try to help the admin */ 559 printk(KERN_NOTICE "%s: too many open " 560 "connections, consider increasing %s\n", 561 serv->sv_name, serv->sv_maxconn ? 562 "the max number of connections." : 563 "the number of threads."); 564 } 565 /* 566 * Always select the oldest connection. It's not fair, 567 * but so is life 568 */ 569 xprt = list_entry(serv->sv_tempsocks.prev, 570 struct svc_xprt, 571 xpt_list); 572 set_bit(XPT_CLOSE, &xprt->xpt_flags); 573 svc_xprt_get(xprt); 574 } 575 spin_unlock_bh(&serv->sv_lock); 576 577 if (xprt) { 578 svc_xprt_enqueue(xprt); 579 svc_xprt_put(xprt); 580 } 581 } 582 } 583 584 /* 585 * Receive the next request on any transport. This code is carefully 586 * organised not to touch any cachelines in the shared svc_serv 587 * structure, only cachelines in the local svc_pool. 588 */ 589 int svc_recv(struct svc_rqst *rqstp, long timeout) 590 { 591 struct svc_xprt *xprt = NULL; 592 struct svc_serv *serv = rqstp->rq_server; 593 struct svc_pool *pool = rqstp->rq_pool; 594 int len, i; 595 int pages; 596 struct xdr_buf *arg; 597 DECLARE_WAITQUEUE(wait, current); 598 long time_left; 599 600 dprintk("svc: server %p waiting for data (to = %ld)\n", 601 rqstp, timeout); 602 603 if (rqstp->rq_xprt) 604 printk(KERN_ERR 605 "svc_recv: service %p, transport not NULL!\n", 606 rqstp); 607 if (waitqueue_active(&rqstp->rq_wait)) 608 printk(KERN_ERR 609 "svc_recv: service %p, wait queue active!\n", 610 rqstp); 611 612 /* now allocate needed pages. If we get a failure, sleep briefly */ 613 pages = (serv->sv_max_mesg + PAGE_SIZE) / PAGE_SIZE; 614 for (i = 0; i < pages ; i++) 615 while (rqstp->rq_pages[i] == NULL) { 616 struct page *p = alloc_page(GFP_KERNEL); 617 if (!p) { 618 set_current_state(TASK_INTERRUPTIBLE); 619 if (signalled() || kthread_should_stop()) { 620 set_current_state(TASK_RUNNING); 621 return -EINTR; 622 } 623 schedule_timeout(msecs_to_jiffies(500)); 624 } 625 rqstp->rq_pages[i] = p; 626 } 627 rqstp->rq_pages[i++] = NULL; /* this might be seen in nfs_read_actor */ 628 BUG_ON(pages >= RPCSVC_MAXPAGES); 629 630 /* Make arg->head point to first page and arg->pages point to rest */ 631 arg = &rqstp->rq_arg; 632 arg->head[0].iov_base = page_address(rqstp->rq_pages[0]); 633 arg->head[0].iov_len = PAGE_SIZE; 634 arg->pages = rqstp->rq_pages + 1; 635 arg->page_base = 0; 636 /* save at least one page for response */ 637 arg->page_len = (pages-2)*PAGE_SIZE; 638 arg->len = (pages-1)*PAGE_SIZE; 639 arg->tail[0].iov_len = 0; 640 641 try_to_freeze(); 642 cond_resched(); 643 if (signalled() || kthread_should_stop()) 644 return -EINTR; 645 646 spin_lock_bh(&pool->sp_lock); 647 xprt = svc_xprt_dequeue(pool); 648 if (xprt) { 649 rqstp->rq_xprt = xprt; 650 svc_xprt_get(xprt); 651 rqstp->rq_reserved = serv->sv_max_mesg; 652 atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved); 653 } else { 654 /* No data pending. Go to sleep */ 655 svc_thread_enqueue(pool, rqstp); 656 657 /* 658 * We have to be able to interrupt this wait 659 * to bring down the daemons ... 660 */ 661 set_current_state(TASK_INTERRUPTIBLE); 662 663 /* 664 * checking kthread_should_stop() here allows us to avoid 665 * locking and signalling when stopping kthreads that call 666 * svc_recv. If the thread has already been woken up, then 667 * we can exit here without sleeping. If not, then it 668 * it'll be woken up quickly during the schedule_timeout 669 */ 670 if (kthread_should_stop()) { 671 set_current_state(TASK_RUNNING); 672 spin_unlock_bh(&pool->sp_lock); 673 return -EINTR; 674 } 675 676 add_wait_queue(&rqstp->rq_wait, &wait); 677 spin_unlock_bh(&pool->sp_lock); 678 679 time_left = schedule_timeout(timeout); 680 681 try_to_freeze(); 682 683 spin_lock_bh(&pool->sp_lock); 684 remove_wait_queue(&rqstp->rq_wait, &wait); 685 if (!time_left) 686 pool->sp_stats.threads_timedout++; 687 688 xprt = rqstp->rq_xprt; 689 if (!xprt) { 690 svc_thread_dequeue(pool, rqstp); 691 spin_unlock_bh(&pool->sp_lock); 692 dprintk("svc: server %p, no data yet\n", rqstp); 693 if (signalled() || kthread_should_stop()) 694 return -EINTR; 695 else 696 return -EAGAIN; 697 } 698 } 699 spin_unlock_bh(&pool->sp_lock); 700 701 len = 0; 702 if (test_bit(XPT_LISTENER, &xprt->xpt_flags) && 703 !test_bit(XPT_CLOSE, &xprt->xpt_flags)) { 704 struct svc_xprt *newxpt; 705 newxpt = xprt->xpt_ops->xpo_accept(xprt); 706 if (newxpt) { 707 /* 708 * We know this module_get will succeed because the 709 * listener holds a reference too 710 */ 711 __module_get(newxpt->xpt_class->xcl_owner); 712 svc_check_conn_limits(xprt->xpt_server); 713 spin_lock_bh(&serv->sv_lock); 714 set_bit(XPT_TEMP, &newxpt->xpt_flags); 715 list_add(&newxpt->xpt_list, &serv->sv_tempsocks); 716 serv->sv_tmpcnt++; 717 if (serv->sv_temptimer.function == NULL) { 718 /* setup timer to age temp transports */ 719 setup_timer(&serv->sv_temptimer, 720 svc_age_temp_xprts, 721 (unsigned long)serv); 722 mod_timer(&serv->sv_temptimer, 723 jiffies + svc_conn_age_period * HZ); 724 } 725 spin_unlock_bh(&serv->sv_lock); 726 svc_xprt_received(newxpt); 727 } 728 svc_xprt_received(xprt); 729 } else if (!test_bit(XPT_CLOSE, &xprt->xpt_flags)) { 730 dprintk("svc: server %p, pool %u, transport %p, inuse=%d\n", 731 rqstp, pool->sp_id, xprt, 732 atomic_read(&xprt->xpt_ref.refcount)); 733 rqstp->rq_deferred = svc_deferred_dequeue(xprt); 734 if (rqstp->rq_deferred) { 735 svc_xprt_received(xprt); 736 len = svc_deferred_recv(rqstp); 737 } else 738 len = xprt->xpt_ops->xpo_recvfrom(rqstp); 739 dprintk("svc: got len=%d\n", len); 740 } 741 742 if (test_bit(XPT_CLOSE, &xprt->xpt_flags)) { 743 dprintk("svc_recv: found XPT_CLOSE\n"); 744 svc_delete_xprt(xprt); 745 } 746 747 /* No data, incomplete (TCP) read, or accept() */ 748 if (len == 0 || len == -EAGAIN) { 749 rqstp->rq_res.len = 0; 750 svc_xprt_release(rqstp); 751 return -EAGAIN; 752 } 753 clear_bit(XPT_OLD, &xprt->xpt_flags); 754 755 rqstp->rq_secure = svc_port_is_privileged(svc_addr(rqstp)); 756 rqstp->rq_chandle.defer = svc_defer; 757 758 if (serv->sv_stats) 759 serv->sv_stats->netcnt++; 760 return len; 761 } 762 EXPORT_SYMBOL_GPL(svc_recv); 763 764 /* 765 * Drop request 766 */ 767 void svc_drop(struct svc_rqst *rqstp) 768 { 769 dprintk("svc: xprt %p dropped request\n", rqstp->rq_xprt); 770 svc_xprt_release(rqstp); 771 } 772 EXPORT_SYMBOL_GPL(svc_drop); 773 774 /* 775 * Return reply to client. 776 */ 777 int svc_send(struct svc_rqst *rqstp) 778 { 779 struct svc_xprt *xprt; 780 int len; 781 struct xdr_buf *xb; 782 783 xprt = rqstp->rq_xprt; 784 if (!xprt) 785 return -EFAULT; 786 787 /* release the receive skb before sending the reply */ 788 rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp); 789 790 /* calculate over-all length */ 791 xb = &rqstp->rq_res; 792 xb->len = xb->head[0].iov_len + 793 xb->page_len + 794 xb->tail[0].iov_len; 795 796 /* Grab mutex to serialize outgoing data. */ 797 mutex_lock(&xprt->xpt_mutex); 798 if (test_bit(XPT_DEAD, &xprt->xpt_flags)) 799 len = -ENOTCONN; 800 else 801 len = xprt->xpt_ops->xpo_sendto(rqstp); 802 mutex_unlock(&xprt->xpt_mutex); 803 rpc_wake_up(&xprt->xpt_bc_pending); 804 svc_xprt_release(rqstp); 805 806 if (len == -ECONNREFUSED || len == -ENOTCONN || len == -EAGAIN) 807 return 0; 808 return len; 809 } 810 811 /* 812 * Timer function to close old temporary transports, using 813 * a mark-and-sweep algorithm. 814 */ 815 static void svc_age_temp_xprts(unsigned long closure) 816 { 817 struct svc_serv *serv = (struct svc_serv *)closure; 818 struct svc_xprt *xprt; 819 struct list_head *le, *next; 820 LIST_HEAD(to_be_aged); 821 822 dprintk("svc_age_temp_xprts\n"); 823 824 if (!spin_trylock_bh(&serv->sv_lock)) { 825 /* busy, try again 1 sec later */ 826 dprintk("svc_age_temp_xprts: busy\n"); 827 mod_timer(&serv->sv_temptimer, jiffies + HZ); 828 return; 829 } 830 831 list_for_each_safe(le, next, &serv->sv_tempsocks) { 832 xprt = list_entry(le, struct svc_xprt, xpt_list); 833 834 /* First time through, just mark it OLD. Second time 835 * through, close it. */ 836 if (!test_and_set_bit(XPT_OLD, &xprt->xpt_flags)) 837 continue; 838 if (atomic_read(&xprt->xpt_ref.refcount) > 1 || 839 test_bit(XPT_BUSY, &xprt->xpt_flags)) 840 continue; 841 svc_xprt_get(xprt); 842 list_move(le, &to_be_aged); 843 set_bit(XPT_CLOSE, &xprt->xpt_flags); 844 set_bit(XPT_DETACHED, &xprt->xpt_flags); 845 } 846 spin_unlock_bh(&serv->sv_lock); 847 848 while (!list_empty(&to_be_aged)) { 849 le = to_be_aged.next; 850 /* fiddling the xpt_list node is safe 'cos we're XPT_DETACHED */ 851 list_del_init(le); 852 xprt = list_entry(le, struct svc_xprt, xpt_list); 853 854 dprintk("queuing xprt %p for closing\n", xprt); 855 856 /* a thread will dequeue and close it soon */ 857 svc_xprt_enqueue(xprt); 858 svc_xprt_put(xprt); 859 } 860 861 mod_timer(&serv->sv_temptimer, jiffies + svc_conn_age_period * HZ); 862 } 863 864 /* 865 * Remove a dead transport 866 */ 867 void svc_delete_xprt(struct svc_xprt *xprt) 868 { 869 struct svc_serv *serv = xprt->xpt_server; 870 struct svc_deferred_req *dr; 871 872 /* Only do this once */ 873 if (test_and_set_bit(XPT_DEAD, &xprt->xpt_flags)) 874 return; 875 876 dprintk("svc: svc_delete_xprt(%p)\n", xprt); 877 xprt->xpt_ops->xpo_detach(xprt); 878 879 spin_lock_bh(&serv->sv_lock); 880 if (!test_and_set_bit(XPT_DETACHED, &xprt->xpt_flags)) 881 list_del_init(&xprt->xpt_list); 882 /* 883 * We used to delete the transport from whichever list 884 * it's sk_xprt.xpt_ready node was on, but we don't actually 885 * need to. This is because the only time we're called 886 * while still attached to a queue, the queue itself 887 * is about to be destroyed (in svc_destroy). 888 */ 889 if (test_bit(XPT_TEMP, &xprt->xpt_flags)) 890 serv->sv_tmpcnt--; 891 892 for (dr = svc_deferred_dequeue(xprt); dr; 893 dr = svc_deferred_dequeue(xprt)) { 894 svc_xprt_put(xprt); 895 kfree(dr); 896 } 897 898 svc_xprt_put(xprt); 899 spin_unlock_bh(&serv->sv_lock); 900 } 901 902 void svc_close_xprt(struct svc_xprt *xprt) 903 { 904 set_bit(XPT_CLOSE, &xprt->xpt_flags); 905 if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags)) 906 /* someone else will have to effect the close */ 907 return; 908 909 svc_xprt_get(xprt); 910 svc_delete_xprt(xprt); 911 clear_bit(XPT_BUSY, &xprt->xpt_flags); 912 svc_xprt_put(xprt); 913 } 914 EXPORT_SYMBOL_GPL(svc_close_xprt); 915 916 void svc_close_all(struct list_head *xprt_list) 917 { 918 struct svc_xprt *xprt; 919 struct svc_xprt *tmp; 920 921 list_for_each_entry_safe(xprt, tmp, xprt_list, xpt_list) { 922 set_bit(XPT_CLOSE, &xprt->xpt_flags); 923 if (test_bit(XPT_BUSY, &xprt->xpt_flags)) { 924 /* Waiting to be processed, but no threads left, 925 * So just remove it from the waiting list 926 */ 927 list_del_init(&xprt->xpt_ready); 928 clear_bit(XPT_BUSY, &xprt->xpt_flags); 929 } 930 svc_close_xprt(xprt); 931 } 932 } 933 934 /* 935 * Handle defer and revisit of requests 936 */ 937 938 static void svc_revisit(struct cache_deferred_req *dreq, int too_many) 939 { 940 struct svc_deferred_req *dr = 941 container_of(dreq, struct svc_deferred_req, handle); 942 struct svc_xprt *xprt = dr->xprt; 943 944 spin_lock(&xprt->xpt_lock); 945 set_bit(XPT_DEFERRED, &xprt->xpt_flags); 946 if (too_many || test_bit(XPT_DEAD, &xprt->xpt_flags)) { 947 spin_unlock(&xprt->xpt_lock); 948 dprintk("revisit canceled\n"); 949 svc_xprt_put(xprt); 950 kfree(dr); 951 return; 952 } 953 dprintk("revisit queued\n"); 954 dr->xprt = NULL; 955 list_add(&dr->handle.recent, &xprt->xpt_deferred); 956 spin_unlock(&xprt->xpt_lock); 957 svc_xprt_enqueue(xprt); 958 svc_xprt_put(xprt); 959 } 960 961 /* 962 * Save the request off for later processing. The request buffer looks 963 * like this: 964 * 965 * <xprt-header><rpc-header><rpc-pagelist><rpc-tail> 966 * 967 * This code can only handle requests that consist of an xprt-header 968 * and rpc-header. 969 */ 970 static struct cache_deferred_req *svc_defer(struct cache_req *req) 971 { 972 struct svc_rqst *rqstp = container_of(req, struct svc_rqst, rq_chandle); 973 struct svc_deferred_req *dr; 974 975 if (rqstp->rq_arg.page_len || !rqstp->rq_usedeferral) 976 return NULL; /* if more than a page, give up FIXME */ 977 if (rqstp->rq_deferred) { 978 dr = rqstp->rq_deferred; 979 rqstp->rq_deferred = NULL; 980 } else { 981 size_t skip; 982 size_t size; 983 /* FIXME maybe discard if size too large */ 984 size = sizeof(struct svc_deferred_req) + rqstp->rq_arg.len; 985 dr = kmalloc(size, GFP_KERNEL); 986 if (dr == NULL) 987 return NULL; 988 989 dr->handle.owner = rqstp->rq_server; 990 dr->prot = rqstp->rq_prot; 991 memcpy(&dr->addr, &rqstp->rq_addr, rqstp->rq_addrlen); 992 dr->addrlen = rqstp->rq_addrlen; 993 dr->daddr = rqstp->rq_daddr; 994 dr->argslen = rqstp->rq_arg.len >> 2; 995 dr->xprt_hlen = rqstp->rq_xprt_hlen; 996 997 /* back up head to the start of the buffer and copy */ 998 skip = rqstp->rq_arg.len - rqstp->rq_arg.head[0].iov_len; 999 memcpy(dr->args, rqstp->rq_arg.head[0].iov_base - skip, 1000 dr->argslen << 2); 1001 } 1002 svc_xprt_get(rqstp->rq_xprt); 1003 dr->xprt = rqstp->rq_xprt; 1004 1005 dr->handle.revisit = svc_revisit; 1006 return &dr->handle; 1007 } 1008 1009 /* 1010 * recv data from a deferred request into an active one 1011 */ 1012 static int svc_deferred_recv(struct svc_rqst *rqstp) 1013 { 1014 struct svc_deferred_req *dr = rqstp->rq_deferred; 1015 1016 /* setup iov_base past transport header */ 1017 rqstp->rq_arg.head[0].iov_base = dr->args + (dr->xprt_hlen>>2); 1018 /* The iov_len does not include the transport header bytes */ 1019 rqstp->rq_arg.head[0].iov_len = (dr->argslen<<2) - dr->xprt_hlen; 1020 rqstp->rq_arg.page_len = 0; 1021 /* The rq_arg.len includes the transport header bytes */ 1022 rqstp->rq_arg.len = dr->argslen<<2; 1023 rqstp->rq_prot = dr->prot; 1024 memcpy(&rqstp->rq_addr, &dr->addr, dr->addrlen); 1025 rqstp->rq_addrlen = dr->addrlen; 1026 /* Save off transport header len in case we get deferred again */ 1027 rqstp->rq_xprt_hlen = dr->xprt_hlen; 1028 rqstp->rq_daddr = dr->daddr; 1029 rqstp->rq_respages = rqstp->rq_pages; 1030 return (dr->argslen<<2) - dr->xprt_hlen; 1031 } 1032 1033 1034 static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt) 1035 { 1036 struct svc_deferred_req *dr = NULL; 1037 1038 if (!test_bit(XPT_DEFERRED, &xprt->xpt_flags)) 1039 return NULL; 1040 spin_lock(&xprt->xpt_lock); 1041 clear_bit(XPT_DEFERRED, &xprt->xpt_flags); 1042 if (!list_empty(&xprt->xpt_deferred)) { 1043 dr = list_entry(xprt->xpt_deferred.next, 1044 struct svc_deferred_req, 1045 handle.recent); 1046 list_del_init(&dr->handle.recent); 1047 set_bit(XPT_DEFERRED, &xprt->xpt_flags); 1048 } 1049 spin_unlock(&xprt->xpt_lock); 1050 return dr; 1051 } 1052 1053 /** 1054 * svc_find_xprt - find an RPC transport instance 1055 * @serv: pointer to svc_serv to search 1056 * @xcl_name: C string containing transport's class name 1057 * @af: Address family of transport's local address 1058 * @port: transport's IP port number 1059 * 1060 * Return the transport instance pointer for the endpoint accepting 1061 * connections/peer traffic from the specified transport class, 1062 * address family and port. 1063 * 1064 * Specifying 0 for the address family or port is effectively a 1065 * wild-card, and will result in matching the first transport in the 1066 * service's list that has a matching class name. 1067 */ 1068 struct svc_xprt *svc_find_xprt(struct svc_serv *serv, const char *xcl_name, 1069 const sa_family_t af, const unsigned short port) 1070 { 1071 struct svc_xprt *xprt; 1072 struct svc_xprt *found = NULL; 1073 1074 /* Sanity check the args */ 1075 if (serv == NULL || xcl_name == NULL) 1076 return found; 1077 1078 spin_lock_bh(&serv->sv_lock); 1079 list_for_each_entry(xprt, &serv->sv_permsocks, xpt_list) { 1080 if (strcmp(xprt->xpt_class->xcl_name, xcl_name)) 1081 continue; 1082 if (af != AF_UNSPEC && af != xprt->xpt_local.ss_family) 1083 continue; 1084 if (port != 0 && port != svc_xprt_local_port(xprt)) 1085 continue; 1086 found = xprt; 1087 svc_xprt_get(xprt); 1088 break; 1089 } 1090 spin_unlock_bh(&serv->sv_lock); 1091 return found; 1092 } 1093 EXPORT_SYMBOL_GPL(svc_find_xprt); 1094 1095 static int svc_one_xprt_name(const struct svc_xprt *xprt, 1096 char *pos, int remaining) 1097 { 1098 int len; 1099 1100 len = snprintf(pos, remaining, "%s %u\n", 1101 xprt->xpt_class->xcl_name, 1102 svc_xprt_local_port(xprt)); 1103 if (len >= remaining) 1104 return -ENAMETOOLONG; 1105 return len; 1106 } 1107 1108 /** 1109 * svc_xprt_names - format a buffer with a list of transport names 1110 * @serv: pointer to an RPC service 1111 * @buf: pointer to a buffer to be filled in 1112 * @buflen: length of buffer to be filled in 1113 * 1114 * Fills in @buf with a string containing a list of transport names, 1115 * each name terminated with '\n'. 1116 * 1117 * Returns positive length of the filled-in string on success; otherwise 1118 * a negative errno value is returned if an error occurs. 1119 */ 1120 int svc_xprt_names(struct svc_serv *serv, char *buf, const int buflen) 1121 { 1122 struct svc_xprt *xprt; 1123 int len, totlen; 1124 char *pos; 1125 1126 /* Sanity check args */ 1127 if (!serv) 1128 return 0; 1129 1130 spin_lock_bh(&serv->sv_lock); 1131 1132 pos = buf; 1133 totlen = 0; 1134 list_for_each_entry(xprt, &serv->sv_permsocks, xpt_list) { 1135 len = svc_one_xprt_name(xprt, pos, buflen - totlen); 1136 if (len < 0) { 1137 *buf = '\0'; 1138 totlen = len; 1139 } 1140 if (len <= 0) 1141 break; 1142 1143 pos += len; 1144 totlen += len; 1145 } 1146 1147 spin_unlock_bh(&serv->sv_lock); 1148 return totlen; 1149 } 1150 EXPORT_SYMBOL_GPL(svc_xprt_names); 1151 1152 1153 /*----------------------------------------------------------------------------*/ 1154 1155 static void *svc_pool_stats_start(struct seq_file *m, loff_t *pos) 1156 { 1157 unsigned int pidx = (unsigned int)*pos; 1158 struct svc_serv *serv = m->private; 1159 1160 dprintk("svc_pool_stats_start, *pidx=%u\n", pidx); 1161 1162 if (!pidx) 1163 return SEQ_START_TOKEN; 1164 return (pidx > serv->sv_nrpools ? NULL : &serv->sv_pools[pidx-1]); 1165 } 1166 1167 static void *svc_pool_stats_next(struct seq_file *m, void *p, loff_t *pos) 1168 { 1169 struct svc_pool *pool = p; 1170 struct svc_serv *serv = m->private; 1171 1172 dprintk("svc_pool_stats_next, *pos=%llu\n", *pos); 1173 1174 if (p == SEQ_START_TOKEN) { 1175 pool = &serv->sv_pools[0]; 1176 } else { 1177 unsigned int pidx = (pool - &serv->sv_pools[0]); 1178 if (pidx < serv->sv_nrpools-1) 1179 pool = &serv->sv_pools[pidx+1]; 1180 else 1181 pool = NULL; 1182 } 1183 ++*pos; 1184 return pool; 1185 } 1186 1187 static void svc_pool_stats_stop(struct seq_file *m, void *p) 1188 { 1189 } 1190 1191 static int svc_pool_stats_show(struct seq_file *m, void *p) 1192 { 1193 struct svc_pool *pool = p; 1194 1195 if (p == SEQ_START_TOKEN) { 1196 seq_puts(m, "# pool packets-arrived sockets-enqueued threads-woken threads-timedout\n"); 1197 return 0; 1198 } 1199 1200 seq_printf(m, "%u %lu %lu %lu %lu\n", 1201 pool->sp_id, 1202 pool->sp_stats.packets, 1203 pool->sp_stats.sockets_queued, 1204 pool->sp_stats.threads_woken, 1205 pool->sp_stats.threads_timedout); 1206 1207 return 0; 1208 } 1209 1210 static const struct seq_operations svc_pool_stats_seq_ops = { 1211 .start = svc_pool_stats_start, 1212 .next = svc_pool_stats_next, 1213 .stop = svc_pool_stats_stop, 1214 .show = svc_pool_stats_show, 1215 }; 1216 1217 int svc_pool_stats_open(struct svc_serv *serv, struct file *file) 1218 { 1219 int err; 1220 1221 err = seq_open(file, &svc_pool_stats_seq_ops); 1222 if (!err) 1223 ((struct seq_file *) file->private_data)->private = serv; 1224 return err; 1225 } 1226 EXPORT_SYMBOL(svc_pool_stats_open); 1227 1228 /*----------------------------------------------------------------------------*/ 1229