/*
 * linux/net/sunrpc/svc_xprt.c
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

#include <linux/sched.h>
#include <linux/errno.h>
#include <linux/freezer.h>
#include <linux/kthread.h>
#include <linux/slab.h>
#include <net/sock.h>
#include <linux/sunrpc/stats.h>
#include <linux/sunrpc/svc_xprt.h>
#include <linux/sunrpc/svcsock.h>
#include <linux/sunrpc/xprt.h>
#include <linux/module.h>

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT

static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt);
static int svc_deferred_recv(struct svc_rqst *rqstp);
static struct cache_deferred_req *svc_defer(struct cache_req *req);
static void svc_age_temp_xprts(unsigned long closure);

/* apparently the "standard" is that clients close
 * idle connections after 5 minutes, servers after
 * 6 minutes
 *   http://www.connectathon.org/talks96/nfstcp.pdf
 */
static int svc_conn_age_period = 6*60;

/* List of registered transport classes */
static DEFINE_SPINLOCK(svc_xprt_class_lock);
static LIST_HEAD(svc_xprt_class_list);

/* SMP locking strategy:
 *
 *	svc_pool->sp_lock protects most of the fields of that pool.
 *	svc_serv->sv_lock protects sv_tempsocks, sv_permsocks, sv_tmpcnt.
 *	when both need to be taken (rare), svc_serv->sv_lock is first.
 *	BKL protects svc_serv->sv_nrthread.
 *	svc_sock->sk_lock protects the svc_sock->sk_deferred list
 *	and the ->sk_info_authunix cache.
 *
 *	The XPT_BUSY bit in xprt->xpt_flags prevents a transport from being
 *	enqueued more than once. During normal transport processing this bit
 *	is set by svc_xprt_enqueue and cleared by svc_xprt_received.
 *	Providers should not manipulate this bit directly.
 *
 *	Some flags can be set to certain values at any time
 *	provided that certain rules are followed:
 *
 *	XPT_CONN, XPT_DATA:
 *		- Can be set or cleared at any time.
 *		- After a set, svc_xprt_enqueue must be called to enqueue
 *		  the transport for processing.
 *		- After a clear, the transport must be read/accepted.
 *		  If this succeeds, it must be set again.
 *	XPT_CLOSE:
 *		- Can be set at any time. It is never cleared.
 *	XPT_DEAD:
 *		- Can only be set while XPT_BUSY is held, which ensures
 *		  that no other thread will be using the transport or will
 *		  try to set XPT_DEAD.
 */
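
/*
 * An illustrative sketch of the rule set above from a provider's point of
 * view: a hypothetical data-ready callback marks the transport and hands it
 * to the serving threads; a server thread later re-enables the transport
 * with svc_xprt_received() once it has drained the data.  The example_*
 * symbol below is made up, not an in-tree provider.
 */
#if 0
static void example_provider_data_ready(struct svc_xprt *xprt)
{
	/* New bytes (or a new connection) are available on the transport. */
	set_bit(XPT_DATA, &xprt->xpt_flags);
	/* Wake an idle thread, or queue the transport on its pool. */
	svc_xprt_enqueue(xprt);
}
#endif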

int svc_reg_xprt_class(struct svc_xprt_class *xcl)
{
	struct svc_xprt_class *cl;
	int res = -EEXIST;

	dprintk("svc: Adding svc transport class '%s'\n", xcl->xcl_name);

	INIT_LIST_HEAD(&xcl->xcl_list);
	spin_lock(&svc_xprt_class_lock);
	/* Make sure there isn't already a class with the same name */
	list_for_each_entry(cl, &svc_xprt_class_list, xcl_list) {
		if (strcmp(xcl->xcl_name, cl->xcl_name) == 0)
			goto out;
	}
	list_add_tail(&xcl->xcl_list, &svc_xprt_class_list);
	res = 0;
out:
	spin_unlock(&svc_xprt_class_lock);
	return res;
}
EXPORT_SYMBOL_GPL(svc_reg_xprt_class);

void svc_unreg_xprt_class(struct svc_xprt_class *xcl)
{
	dprintk("svc: Removing svc transport class '%s'\n", xcl->xcl_name);
	spin_lock(&svc_xprt_class_lock);
	list_del_init(&xcl->xcl_list);
	spin_unlock(&svc_xprt_class_lock);
}
EXPORT_SYMBOL_GPL(svc_unreg_xprt_class);

/*
 * Format the transport list for printing
 */
int svc_print_xprts(char *buf, int maxlen)
{
	struct svc_xprt_class *xcl;
	char tmpstr[80];
	int len = 0;
	buf[0] = '\0';

	spin_lock(&svc_xprt_class_lock);
	list_for_each_entry(xcl, &svc_xprt_class_list, xcl_list) {
		int slen;

		sprintf(tmpstr, "%s %d\n", xcl->xcl_name, xcl->xcl_max_payload);
		slen = strlen(tmpstr);
		if (len + slen > maxlen)
			break;
		len += slen;
		strcat(buf, tmpstr);
	}
	spin_unlock(&svc_xprt_class_lock);

	return len;
}

static void svc_xprt_free(struct kref *kref)
{
	struct svc_xprt *xprt =
		container_of(kref, struct svc_xprt, xpt_ref);
	struct module *owner = xprt->xpt_class->xcl_owner;
	if (test_bit(XPT_CACHE_AUTH, &xprt->xpt_flags))
		svcauth_unix_info_release(xprt);
	put_net(xprt->xpt_net);
	/* See comment on corresponding get in xs_setup_bc_tcp(): */
	if (xprt->xpt_bc_xprt)
		xprt_put(xprt->xpt_bc_xprt);
	xprt->xpt_ops->xpo_free(xprt);
	module_put(owner);
}

void svc_xprt_put(struct svc_xprt *xprt)
{
	kref_put(&xprt->xpt_ref, svc_xprt_free);
}
EXPORT_SYMBOL_GPL(svc_xprt_put);
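
/*
 * For illustration, a transport module registers its class once at load
 * time and unregisters it on unload.  Everything named example_* below is
 * hypothetical; only the svc_xprt_class fields and the register/unregister
 * calls are real.
 */
#if 0
static struct svc_xprt_ops example_xprt_ops;	/* provider callbacks, elided */

static struct svc_xprt_class example_xprt_class = {
	.xcl_name	= "example",
	.xcl_owner	= THIS_MODULE,
	.xcl_ops	= &example_xprt_ops,
	.xcl_max_payload = RPCSVC_MAXPAYLOAD_TCP,
};

static int __init example_init(void)
{
	/* Returns -EEXIST if a class with the same name is already present. */
	return svc_reg_xprt_class(&example_xprt_class);
}

static void __exit example_exit(void)
{
	svc_unreg_xprt_class(&example_xprt_class);
}
#endif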

/*
 * Called by transport drivers to initialize the transport independent
 * portion of the transport instance.
 */
void svc_xprt_init(struct svc_xprt_class *xcl, struct svc_xprt *xprt,
		   struct svc_serv *serv)
{
	memset(xprt, 0, sizeof(*xprt));
	xprt->xpt_class = xcl;
	xprt->xpt_ops = xcl->xcl_ops;
	kref_init(&xprt->xpt_ref);
	xprt->xpt_server = serv;
	INIT_LIST_HEAD(&xprt->xpt_list);
	INIT_LIST_HEAD(&xprt->xpt_ready);
	INIT_LIST_HEAD(&xprt->xpt_deferred);
	INIT_LIST_HEAD(&xprt->xpt_users);
	mutex_init(&xprt->xpt_mutex);
	spin_lock_init(&xprt->xpt_lock);
	set_bit(XPT_BUSY, &xprt->xpt_flags);
	rpc_init_wait_queue(&xprt->xpt_bc_pending, "xpt_bc_pending");
	xprt->xpt_net = get_net(&init_net);
}
EXPORT_SYMBOL_GPL(svc_xprt_init);

static struct svc_xprt *__svc_xpo_create(struct svc_xprt_class *xcl,
					 struct svc_serv *serv,
					 struct net *net,
					 const int family,
					 const unsigned short port,
					 int flags)
{
	struct sockaddr_in sin = {
		.sin_family = AF_INET,
		.sin_addr.s_addr = htonl(INADDR_ANY),
		.sin_port = htons(port),
	};
#if defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE)
	struct sockaddr_in6 sin6 = {
		.sin6_family = AF_INET6,
		.sin6_addr = IN6ADDR_ANY_INIT,
		.sin6_port = htons(port),
	};
#endif	/* defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE) */
	struct sockaddr *sap;
	size_t len;

	switch (family) {
	case PF_INET:
		sap = (struct sockaddr *)&sin;
		len = sizeof(sin);
		break;
#if defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE)
	case PF_INET6:
		sap = (struct sockaddr *)&sin6;
		len = sizeof(sin6);
		break;
#endif	/* defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE) */
	default:
		return ERR_PTR(-EAFNOSUPPORT);
	}

	return xcl->xcl_ops->xpo_create(serv, net, sap, len, flags);
}

int svc_create_xprt(struct svc_serv *serv, const char *xprt_name,
		    struct net *net, const int family,
		    const unsigned short port, int flags)
{
	struct svc_xprt_class *xcl;

	dprintk("svc: creating transport %s[%d]\n", xprt_name, port);
	spin_lock(&svc_xprt_class_lock);
	list_for_each_entry(xcl, &svc_xprt_class_list, xcl_list) {
		struct svc_xprt *newxprt;
		unsigned short newport;

		if (strcmp(xprt_name, xcl->xcl_name))
			continue;

		if (!try_module_get(xcl->xcl_owner))
			goto err;

		spin_unlock(&svc_xprt_class_lock);
		newxprt = __svc_xpo_create(xcl, serv, net, family, port, flags);
		if (IS_ERR(newxprt)) {
			module_put(xcl->xcl_owner);
			return PTR_ERR(newxprt);
		}

		clear_bit(XPT_TEMP, &newxprt->xpt_flags);
		spin_lock_bh(&serv->sv_lock);
		list_add(&newxprt->xpt_list, &serv->sv_permsocks);
		spin_unlock_bh(&serv->sv_lock);
		newport = svc_xprt_local_port(newxprt);
		clear_bit(XPT_BUSY, &newxprt->xpt_flags);
		return newport;
	}
 err:
	spin_unlock(&svc_xprt_class_lock);
	dprintk("svc: transport %s not found\n", xprt_name);

	/* This errno is exposed to user space.  Provide a reasonable
	 * perror msg for a bad transport. */
	return -EPROTONOSUPPORT;
}
EXPORT_SYMBOL_GPL(svc_create_xprt);
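
/*
 * Typical use of svc_create_xprt() when a service sets up its listeners.
 * The wrapper below is a hypothetical sketch; the class name "tcp", the
 * address family and SVC_SOCK_DEFAULTS are values a caller might pass, and
 * port 2049 is just the familiar NFS port used as an example.
 */
#if 0
static int example_setup_listener(struct svc_serv *serv)
{
	int port;

	port = svc_create_xprt(serv, "tcp", &init_net, PF_INET, 2049,
			       SVC_SOCK_DEFAULTS);
	if (port < 0)
		return port;	/* e.g. -EPROTONOSUPPORT if "tcp" is not registered */
	return 0;
}
#endif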

/*
 * Copy the local and remote xprt addresses to the rqstp structure
 */
void svc_xprt_copy_addrs(struct svc_rqst *rqstp, struct svc_xprt *xprt)
{
	memcpy(&rqstp->rq_addr, &xprt->xpt_remote, xprt->xpt_remotelen);
	rqstp->rq_addrlen = xprt->xpt_remotelen;

	/*
	 * Destination address in request is needed for binding the
	 * source address in RPC replies/callbacks later.
	 */
	memcpy(&rqstp->rq_daddr, &xprt->xpt_local, xprt->xpt_locallen);
	rqstp->rq_daddrlen = xprt->xpt_locallen;
}
EXPORT_SYMBOL_GPL(svc_xprt_copy_addrs);

/**
 * svc_print_addr - Format rq_addr field for printing
 * @rqstp: svc_rqst struct containing address to print
 * @buf: target buffer for formatted address
 * @len: length of target buffer
 *
 */
char *svc_print_addr(struct svc_rqst *rqstp, char *buf, size_t len)
{
	return __svc_print_addr(svc_addr(rqstp), buf, len);
}
EXPORT_SYMBOL_GPL(svc_print_addr);

/*
 * Queue up an idle server thread.  Must have pool->sp_lock held.
 * Note: this is really a stack rather than a queue, so that we only
 * use as many different threads as we need, and the rest don't pollute
 * the cache.
 */
static void svc_thread_enqueue(struct svc_pool *pool, struct svc_rqst *rqstp)
{
	list_add(&rqstp->rq_list, &pool->sp_threads);
}

/*
 * Dequeue an nfsd thread.  Must have pool->sp_lock held.
 */
static void svc_thread_dequeue(struct svc_pool *pool, struct svc_rqst *rqstp)
{
	list_del(&rqstp->rq_list);
}

static bool svc_xprt_has_something_to_do(struct svc_xprt *xprt)
{
	if (xprt->xpt_flags & ((1<<XPT_CONN)|(1<<XPT_CLOSE)))
		return true;
	if (xprt->xpt_flags & ((1<<XPT_DATA)|(1<<XPT_DEFERRED)))
		return xprt->xpt_ops->xpo_has_wspace(xprt);
	return false;
}

/*
 * Queue up a transport with data pending. If there are idle nfsd
 * processes, wake 'em up.
 *
 */
void svc_xprt_enqueue(struct svc_xprt *xprt)
{
	struct svc_serv *serv = xprt->xpt_server;
	struct svc_pool *pool;
	struct svc_rqst *rqstp;
	int cpu;

	if (!svc_xprt_has_something_to_do(xprt))
		return;

	cpu = get_cpu();
	pool = svc_pool_for_cpu(xprt->xpt_server, cpu);
	put_cpu();

	spin_lock_bh(&pool->sp_lock);

	if (!list_empty(&pool->sp_threads) &&
	    !list_empty(&pool->sp_sockets))
		printk(KERN_ERR
		       "svc_xprt_enqueue: "
		       "threads and transports both waiting??\n");

	pool->sp_stats.packets++;

	/* Mark transport as busy. It will remain in this state until
	 * the provider calls svc_xprt_received. We update XPT_BUSY
	 * atomically because it also guards against trying to enqueue
	 * the transport twice.
	 */
	if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags)) {
		/* Don't enqueue transport while already enqueued */
		dprintk("svc: transport %p busy, not enqueued\n", xprt);
		goto out_unlock;
	}

	if (!list_empty(&pool->sp_threads)) {
		rqstp = list_entry(pool->sp_threads.next,
				   struct svc_rqst,
				   rq_list);
		dprintk("svc: transport %p served by daemon %p\n",
			xprt, rqstp);
		svc_thread_dequeue(pool, rqstp);
		if (rqstp->rq_xprt)
			printk(KERN_ERR
				"svc_xprt_enqueue: server %p, rq_xprt=%p!\n",
				rqstp, rqstp->rq_xprt);
		rqstp->rq_xprt = xprt;
		svc_xprt_get(xprt);
		rqstp->rq_reserved = serv->sv_max_mesg;
		atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved);
		pool->sp_stats.threads_woken++;
		wake_up(&rqstp->rq_wait);
	} else {
		dprintk("svc: transport %p put into queue\n", xprt);
		list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
		pool->sp_stats.sockets_queued++;
	}

out_unlock:
	spin_unlock_bh(&pool->sp_lock);
}
EXPORT_SYMBOL_GPL(svc_xprt_enqueue);

/*
 * Dequeue the first transport.  Must be called with the pool->sp_lock held.
 */
static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool)
{
	struct svc_xprt *xprt;

	if (list_empty(&pool->sp_sockets))
		return NULL;

	xprt = list_entry(pool->sp_sockets.next,
			  struct svc_xprt, xpt_ready);
	list_del_init(&xprt->xpt_ready);

	dprintk("svc: transport %p dequeued, inuse=%d\n",
		xprt, atomic_read(&xprt->xpt_ref.refcount));

	return xprt;
}

/*
 * svc_xprt_received conditionally queues the transport for processing
 * by another thread. The caller must hold the XPT_BUSY bit and must
 * not thereafter touch transport data.
 *
 * Note: XPT_DATA only gets cleared when a read-attempt finds no (or
 * insufficient) data.
 */
void svc_xprt_received(struct svc_xprt *xprt)
{
	BUG_ON(!test_bit(XPT_BUSY, &xprt->xpt_flags));
	/* As soon as we clear busy, the xprt could be closed and
	 * 'put', so we need a reference to call svc_xprt_enqueue with:
	 */
	svc_xprt_get(xprt);
	clear_bit(XPT_BUSY, &xprt->xpt_flags);
	svc_xprt_enqueue(xprt);
	svc_xprt_put(xprt);
}
EXPORT_SYMBOL_GPL(svc_xprt_received);

/**
 * svc_reserve - change the space reserved for the reply to a request.
 * @rqstp:  The request in question
 * @space: new max space to reserve
 *
 * Each request reserves some space on the output queue of the transport
 * to make sure the reply fits.  This function reduces that reserved
 * space to be the amount of space used already, plus @space.
 *
 */
void svc_reserve(struct svc_rqst *rqstp, int space)
{
	space += rqstp->rq_res.head[0].iov_len;

	if (space < rqstp->rq_reserved) {
		struct svc_xprt *xprt = rqstp->rq_xprt;
		atomic_sub((rqstp->rq_reserved - space), &xprt->xpt_reserved);
		rqstp->rq_reserved = space;

		svc_xprt_enqueue(xprt);
	}
}
EXPORT_SYMBOL_GPL(svc_reserve);
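
/*
 * For example, a service procedure that already knows its reply will be
 * small can hand back most of the space reserved at svc_recv() time, so
 * that the transport's write-space accounting does not hold up other
 * requests.  The helper below is a hypothetical sketch.
 */
#if 0
static void example_trim_reservation(struct svc_rqst *rqstp)
{
	/* Keep what the reply head already uses, plus about 256 bytes. */
	svc_reserve(rqstp, 256);
}
#endif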

static void svc_xprt_release(struct svc_rqst *rqstp)
{
	struct svc_xprt *xprt = rqstp->rq_xprt;

	rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp);

	kfree(rqstp->rq_deferred);
	rqstp->rq_deferred = NULL;

	svc_free_res_pages(rqstp);
	rqstp->rq_res.page_len = 0;
	rqstp->rq_res.page_base = 0;

	/* Reset response buffer and release
	 * the reservation.
	 * But first, check that enough space was reserved
	 * for the reply, otherwise we have a bug!
	 */
	if ((rqstp->rq_res.len) > rqstp->rq_reserved)
		printk(KERN_ERR "RPC request reserved %d but used %d\n",
		       rqstp->rq_reserved,
		       rqstp->rq_res.len);

	rqstp->rq_res.head[0].iov_len = 0;
	svc_reserve(rqstp, 0);
	rqstp->rq_xprt = NULL;

	svc_xprt_put(xprt);
}

/*
 * External function to wake up a server waiting for data.
 * This really only makes sense for services like lockd
 * which have exactly one thread anyway.
 */
void svc_wake_up(struct svc_serv *serv)
{
	struct svc_rqst *rqstp;
	unsigned int i;
	struct svc_pool *pool;

	for (i = 0; i < serv->sv_nrpools; i++) {
		pool = &serv->sv_pools[i];

		spin_lock_bh(&pool->sp_lock);
		if (!list_empty(&pool->sp_threads)) {
			rqstp = list_entry(pool->sp_threads.next,
					   struct svc_rqst,
					   rq_list);
			dprintk("svc: daemon %p woken up.\n", rqstp);
			/*
			svc_thread_dequeue(pool, rqstp);
			rqstp->rq_xprt = NULL;
			 */
			wake_up(&rqstp->rq_wait);
		}
		spin_unlock_bh(&pool->sp_lock);
	}
}
EXPORT_SYMBOL_GPL(svc_wake_up);

int svc_port_is_privileged(struct sockaddr *sin)
{
	switch (sin->sa_family) {
	case AF_INET:
		return ntohs(((struct sockaddr_in *)sin)->sin_port)
			< PROT_SOCK;
	case AF_INET6:
		return ntohs(((struct sockaddr_in6 *)sin)->sin6_port)
			< PROT_SOCK;
	default:
		return 0;
	}
}

/*
 * Make sure that we don't have too many active connections. If we have,
 * something must be dropped. It's not clear what will happen if we allow
 * "too many" connections, but when dealing with network-facing software,
 * we have to code defensively. Here we do that by imposing hard limits.
 *
 * There's no point in trying to do random drop here for DoS
 * prevention. The NFS client does 1 reconnect in 15 seconds. An
 * attacker can easily beat that.
 *
 * The only somewhat efficient mechanism would be to drop old
 * connections from the same IP first. But right now we don't even
 * record the client IP in svc_sock.
 *
 * Single-threaded services that expect a lot of clients will probably
 * need to set sv_maxconn to override the default value, which is based
 * on the number of threads.
 */
static void svc_check_conn_limits(struct svc_serv *serv)
{
	unsigned int limit = serv->sv_maxconn ? serv->sv_maxconn :
				(serv->sv_nrthreads+3) * 20;

	if (serv->sv_tmpcnt > limit) {
		struct svc_xprt *xprt = NULL;
		spin_lock_bh(&serv->sv_lock);
		if (!list_empty(&serv->sv_tempsocks)) {
			if (net_ratelimit()) {
				/* Try to help the admin */
				printk(KERN_NOTICE "%s: too many open "
				       "connections, consider increasing %s\n",
				       serv->sv_name, serv->sv_maxconn ?
				       "the max number of connections." :
				       "the number of threads.");
			}
			/*
			 * Always select the oldest connection. It's not fair,
			 * but so is life
			 */
			xprt = list_entry(serv->sv_tempsocks.prev,
					  struct svc_xprt,
					  xpt_list);
			set_bit(XPT_CLOSE, &xprt->xpt_flags);
			svc_xprt_get(xprt);
		}
		spin_unlock_bh(&serv->sv_lock);

		if (xprt) {
			svc_xprt_enqueue(xprt);
			svc_xprt_put(xprt);
		}
	}
}
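
/*
 * As the comment above notes, a single-threaded service expecting many
 * clients can raise the limit explicitly instead of relying on the
 * thread-count heuristic of (sv_nrthreads + 3) * 20.  A hypothetical
 * sketch, with an arbitrary value:
 */
#if 0
static void example_tune_conn_limit(struct svc_serv *serv)
{
	serv->sv_maxconn = 1024;	/* allow up to 1024 temporary sockets */
}
#endif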

/*
 * Receive the next request on any transport.  This code is carefully
 * organised not to touch any cachelines in the shared svc_serv
 * structure, only cachelines in the local svc_pool.
 */
int svc_recv(struct svc_rqst *rqstp, long timeout)
{
	struct svc_xprt *xprt = NULL;
	struct svc_serv *serv = rqstp->rq_server;
	struct svc_pool *pool = rqstp->rq_pool;
	int len, i;
	int pages;
	struct xdr_buf *arg;
	DECLARE_WAITQUEUE(wait, current);
	long time_left;

	dprintk("svc: server %p waiting for data (to = %ld)\n",
		rqstp, timeout);

	if (rqstp->rq_xprt)
		printk(KERN_ERR
			"svc_recv: service %p, transport not NULL!\n",
			rqstp);
	if (waitqueue_active(&rqstp->rq_wait))
		printk(KERN_ERR
			"svc_recv: service %p, wait queue active!\n",
			rqstp);

	/* now allocate needed pages.  If we get a failure, sleep briefly */
	pages = (serv->sv_max_mesg + PAGE_SIZE) / PAGE_SIZE;
	for (i = 0; i < pages ; i++)
		while (rqstp->rq_pages[i] == NULL) {
			struct page *p = alloc_page(GFP_KERNEL);
			if (!p) {
				set_current_state(TASK_INTERRUPTIBLE);
				if (signalled() || kthread_should_stop()) {
					set_current_state(TASK_RUNNING);
					return -EINTR;
				}
				schedule_timeout(msecs_to_jiffies(500));
			}
			rqstp->rq_pages[i] = p;
		}
	rqstp->rq_pages[i++] = NULL; /* this might be seen in nfs_read_actor */
	BUG_ON(pages >= RPCSVC_MAXPAGES);

	/* Make arg->head point to first page and arg->pages point to rest */
	arg = &rqstp->rq_arg;
	arg->head[0].iov_base = page_address(rqstp->rq_pages[0]);
	arg->head[0].iov_len = PAGE_SIZE;
	arg->pages = rqstp->rq_pages + 1;
	arg->page_base = 0;
	/* save at least one page for response */
	arg->page_len = (pages-2)*PAGE_SIZE;
	arg->len = (pages-1)*PAGE_SIZE;
	arg->tail[0].iov_len = 0;

	try_to_freeze();
	cond_resched();
	if (signalled() || kthread_should_stop())
		return -EINTR;

	/* Normally we will wait up to 5 seconds for any required
	 * cache information to be provided.
	 */
	rqstp->rq_chandle.thread_wait = 5*HZ;

	spin_lock_bh(&pool->sp_lock);
	xprt = svc_xprt_dequeue(pool);
	if (xprt) {
		rqstp->rq_xprt = xprt;
		svc_xprt_get(xprt);
		rqstp->rq_reserved = serv->sv_max_mesg;
		atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved);

		/* As there is a shortage of threads and this request
		 * had to be queued, don't allow the thread to wait so
		 * long for cache updates.
		 */
		rqstp->rq_chandle.thread_wait = 1*HZ;
	} else {
		/* No data pending. Go to sleep */
		svc_thread_enqueue(pool, rqstp);

		/*
		 * We have to be able to interrupt this wait
		 * to bring down the daemons ...
		 */
		set_current_state(TASK_INTERRUPTIBLE);

		/*
		 * checking kthread_should_stop() here allows us to avoid
		 * locking and signalling when stopping kthreads that call
		 * svc_recv. If the thread has already been woken up, then
		 * we can exit here without sleeping. If not, it'll be
		 * woken up quickly during the schedule_timeout.
		 */
		if (kthread_should_stop()) {
			set_current_state(TASK_RUNNING);
			spin_unlock_bh(&pool->sp_lock);
			return -EINTR;
		}

		add_wait_queue(&rqstp->rq_wait, &wait);
		spin_unlock_bh(&pool->sp_lock);

		time_left = schedule_timeout(timeout);

		try_to_freeze();

		spin_lock_bh(&pool->sp_lock);
		remove_wait_queue(&rqstp->rq_wait, &wait);
		if (!time_left)
			pool->sp_stats.threads_timedout++;

		xprt = rqstp->rq_xprt;
		if (!xprt) {
			svc_thread_dequeue(pool, rqstp);
			spin_unlock_bh(&pool->sp_lock);
			dprintk("svc: server %p, no data yet\n", rqstp);
			if (signalled() || kthread_should_stop())
				return -EINTR;
			else
				return -EAGAIN;
		}
	}
	spin_unlock_bh(&pool->sp_lock);

	len = 0;
	if (test_bit(XPT_CLOSE, &xprt->xpt_flags)) {
		dprintk("svc_recv: found XPT_CLOSE\n");
		svc_delete_xprt(xprt);
		/* Leave XPT_BUSY set on the dead xprt: */
		goto out;
	}
	if (test_bit(XPT_LISTENER, &xprt->xpt_flags)) {
		struct svc_xprt *newxpt;
		newxpt = xprt->xpt_ops->xpo_accept(xprt);
		if (newxpt) {
			/*
			 * We know this module_get will succeed because the
			 * listener holds a reference too
			 */
			__module_get(newxpt->xpt_class->xcl_owner);
			svc_check_conn_limits(xprt->xpt_server);
			spin_lock_bh(&serv->sv_lock);
			set_bit(XPT_TEMP, &newxpt->xpt_flags);
			list_add(&newxpt->xpt_list, &serv->sv_tempsocks);
			serv->sv_tmpcnt++;
			if (serv->sv_temptimer.function == NULL) {
				/* setup timer to age temp transports */
				setup_timer(&serv->sv_temptimer,
					    svc_age_temp_xprts,
					    (unsigned long)serv);
				mod_timer(&serv->sv_temptimer,
					  jiffies + svc_conn_age_period * HZ);
			}
			spin_unlock_bh(&serv->sv_lock);
			svc_xprt_received(newxpt);
		}
	} else if (xprt->xpt_ops->xpo_has_wspace(xprt)) {
		dprintk("svc: server %p, pool %u, transport %p, inuse=%d\n",
			rqstp, pool->sp_id, xprt,
			atomic_read(&xprt->xpt_ref.refcount));
		rqstp->rq_deferred = svc_deferred_dequeue(xprt);
		if (rqstp->rq_deferred)
			len = svc_deferred_recv(rqstp);
		else
			len = xprt->xpt_ops->xpo_recvfrom(rqstp);
		dprintk("svc: got len=%d\n", len);
	}
	svc_xprt_received(xprt);

	/* No data, incomplete (TCP) read, or accept() */
	if (len == 0 || len == -EAGAIN)
		goto out;

	clear_bit(XPT_OLD, &xprt->xpt_flags);

	rqstp->rq_secure = svc_port_is_privileged(svc_addr(rqstp));
	rqstp->rq_chandle.defer = svc_defer;

	if (serv->sv_stats)
		serv->sv_stats->netcnt++;
	return len;
out:
	rqstp->rq_res.len = 0;
	svc_xprt_release(rqstp);
	return -EAGAIN;
}
EXPORT_SYMBOL_GPL(svc_recv);

/*
 * Drop request
 */
void svc_drop(struct svc_rqst *rqstp)
{
	dprintk("svc: xprt %p dropped request\n", rqstp->rq_xprt);
	svc_xprt_release(rqstp);
}
EXPORT_SYMBOL_GPL(svc_drop);

/*
 * Return reply to client.
 */
int svc_send(struct svc_rqst *rqstp)
{
	struct svc_xprt *xprt;
	int len;
	struct xdr_buf *xb;

	xprt = rqstp->rq_xprt;
	if (!xprt)
		return -EFAULT;

	/* release the receive skb before sending the reply */
	rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp);

	/* calculate over-all length */
	xb = &rqstp->rq_res;
	xb->len = xb->head[0].iov_len +
		xb->page_len +
		xb->tail[0].iov_len;

	/* Grab mutex to serialize outgoing data. */
	mutex_lock(&xprt->xpt_mutex);
	if (test_bit(XPT_DEAD, &xprt->xpt_flags))
		len = -ENOTCONN;
	else
		len = xprt->xpt_ops->xpo_sendto(rqstp);
	mutex_unlock(&xprt->xpt_mutex);
	rpc_wake_up(&xprt->xpt_bc_pending);
	svc_xprt_release(rqstp);

	if (len == -ECONNREFUSED || len == -ENOTCONN || len == -EAGAIN)
		return 0;
	return len;
}
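
/*
 * The receive/dispatch/send trio above is normally driven by a loop of the
 * following shape in the service's kthread (lockd and nfsd both follow this
 * pattern, with service-specific details omitted).  The function below is an
 * illustrative sketch, not code lifted from either caller.
 */
#if 0
static int example_service_thread(void *vrqstp)
{
	struct svc_rqst *rqstp = vrqstp;
	int err;

	while (!kthread_should_stop()) {
		/* Wait up to an hour for a request to arrive. */
		err = svc_recv(rqstp, 60 * 60 * HZ);
		if (err == -EINTR)
			break;		/* signal or kthread_stop() */
		if (err == -EAGAIN)
			continue;	/* nothing usable yet, try again */
		svc_process(rqstp);	/* dispatches and calls svc_send() */
	}
	svc_exit_thread(rqstp);
	return 0;
}
#endif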

/*
 * Timer function to close old temporary transports, using
 * a mark-and-sweep algorithm.
 */
static void svc_age_temp_xprts(unsigned long closure)
{
	struct svc_serv *serv = (struct svc_serv *)closure;
	struct svc_xprt *xprt;
	struct list_head *le, *next;
	LIST_HEAD(to_be_aged);

	dprintk("svc_age_temp_xprts\n");

	if (!spin_trylock_bh(&serv->sv_lock)) {
		/* busy, try again 1 sec later */
		dprintk("svc_age_temp_xprts: busy\n");
		mod_timer(&serv->sv_temptimer, jiffies + HZ);
		return;
	}

	list_for_each_safe(le, next, &serv->sv_tempsocks) {
		xprt = list_entry(le, struct svc_xprt, xpt_list);

		/* First time through, just mark it OLD. Second time
		 * through, close it. */
		if (!test_and_set_bit(XPT_OLD, &xprt->xpt_flags))
			continue;
		if (atomic_read(&xprt->xpt_ref.refcount) > 1 ||
		    test_bit(XPT_BUSY, &xprt->xpt_flags))
			continue;
		svc_xprt_get(xprt);
		list_move(le, &to_be_aged);
		set_bit(XPT_CLOSE, &xprt->xpt_flags);
		set_bit(XPT_DETACHED, &xprt->xpt_flags);
	}
	spin_unlock_bh(&serv->sv_lock);

	while (!list_empty(&to_be_aged)) {
		le = to_be_aged.next;
		/* fiddling the xpt_list node is safe 'cos we're XPT_DETACHED */
		list_del_init(le);
		xprt = list_entry(le, struct svc_xprt, xpt_list);

		dprintk("queuing xprt %p for closing\n", xprt);

		/* a thread will dequeue and close it soon */
		svc_xprt_enqueue(xprt);
		svc_xprt_put(xprt);
	}

	mod_timer(&serv->sv_temptimer, jiffies + svc_conn_age_period * HZ);
}

static void call_xpt_users(struct svc_xprt *xprt)
{
	struct svc_xpt_user *u;

	spin_lock(&xprt->xpt_lock);
	while (!list_empty(&xprt->xpt_users)) {
		u = list_first_entry(&xprt->xpt_users, struct svc_xpt_user, list);
		list_del(&u->list);
		u->callback(u);
	}
	spin_unlock(&xprt->xpt_lock);
}

/*
 * Remove a dead transport
 */
void svc_delete_xprt(struct svc_xprt *xprt)
{
	struct svc_serv *serv = xprt->xpt_server;
	struct svc_deferred_req *dr;

	/* Only do this once */
	if (test_and_set_bit(XPT_DEAD, &xprt->xpt_flags))
		BUG();

	dprintk("svc: svc_delete_xprt(%p)\n", xprt);
	xprt->xpt_ops->xpo_detach(xprt);

	spin_lock_bh(&serv->sv_lock);
	if (!test_and_set_bit(XPT_DETACHED, &xprt->xpt_flags))
		list_del_init(&xprt->xpt_list);
	/*
	 * The only time we're called while xpt_ready is still on a list
	 * is while the list itself is about to be destroyed (in
	 * svc_destroy).  BUT svc_xprt_enqueue could still be attempting
	 * to add new entries to the sp_sockets list, so we can't leave
	 * a freed xprt on it.
	 */
	list_del_init(&xprt->xpt_ready);
	if (test_bit(XPT_TEMP, &xprt->xpt_flags))
		serv->sv_tmpcnt--;
	spin_unlock_bh(&serv->sv_lock);

	while ((dr = svc_deferred_dequeue(xprt)) != NULL)
		kfree(dr);

	call_xpt_users(xprt);
	svc_xprt_put(xprt);
}

void svc_close_xprt(struct svc_xprt *xprt)
{
	set_bit(XPT_CLOSE, &xprt->xpt_flags);
	if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags))
		/* someone else will have to effect the close */
		return;
	/*
	 * We expect svc_close_xprt() to work even when no threads are
	 * running (e.g., while configuring the server before starting
	 * any threads), so if the transport isn't busy, we delete
	 * it ourself:
	 */
	svc_delete_xprt(xprt);
}
EXPORT_SYMBOL_GPL(svc_close_xprt);

void svc_close_all(struct list_head *xprt_list)
{
	struct svc_xprt *xprt;
	struct svc_xprt *tmp;

	/*
	 * The server is shutting down, and no more threads are running.
	 * svc_xprt_enqueue() might still be running, but at worst it
	 * will re-add the xprt to sp_sockets, which will soon get
	 * freed.  So we don't bother with any more locking, and don't
	 * leave the close to the (nonexistent) server threads:
	 */
	list_for_each_entry_safe(xprt, tmp, xprt_list, xpt_list) {
		set_bit(XPT_CLOSE, &xprt->xpt_flags);
		svc_delete_xprt(xprt);
	}
}

/*
 * Handle defer and revisit of requests
 */

static void svc_revisit(struct cache_deferred_req *dreq, int too_many)
{
	struct svc_deferred_req *dr =
		container_of(dreq, struct svc_deferred_req, handle);
	struct svc_xprt *xprt = dr->xprt;

	spin_lock(&xprt->xpt_lock);
	set_bit(XPT_DEFERRED, &xprt->xpt_flags);
	if (too_many || test_bit(XPT_DEAD, &xprt->xpt_flags)) {
		spin_unlock(&xprt->xpt_lock);
		dprintk("revisit canceled\n");
		svc_xprt_put(xprt);
		kfree(dr);
		return;
	}
	dprintk("revisit queued\n");
	dr->xprt = NULL;
	list_add(&dr->handle.recent, &xprt->xpt_deferred);
	spin_unlock(&xprt->xpt_lock);
	svc_xprt_enqueue(xprt);
	svc_xprt_put(xprt);
}
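
/*
 * svc_revisit() above is the second half of a round trip that starts with
 * svc_defer() below: the cache layer defers a request whose answer is not
 * yet available and revisits it once the upcall completes.  A rough sketch
 * of that flow from the cache side (the function is hypothetical; only the
 * ->defer and ->revisit hooks are real):
 */
#if 0
static void example_defer_and_revisit(struct cache_req *req)
{
	struct cache_deferred_req *dreq;

	dreq = req->defer(req);		/* reaches svc_defer() */
	if (!dreq)
		return;			/* too big to defer; the request is dropped */
	/* ... later, once the cache entry has been filled in ... */
	dreq->revisit(dreq, 0);		/* reaches svc_revisit(), re-queues the xprt */
}
#endif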

/*
 * Save the request off for later processing. The request buffer looks
 * like this:
 *
 * <xprt-header><rpc-header><rpc-pagelist><rpc-tail>
 *
 * This code can only handle requests that consist of an xprt-header
 * and rpc-header.
 */
static struct cache_deferred_req *svc_defer(struct cache_req *req)
{
	struct svc_rqst *rqstp = container_of(req, struct svc_rqst, rq_chandle);
	struct svc_deferred_req *dr;

	if (rqstp->rq_arg.page_len || !rqstp->rq_usedeferral)
		return NULL; /* if more than a page, give up FIXME */
	if (rqstp->rq_deferred) {
		dr = rqstp->rq_deferred;
		rqstp->rq_deferred = NULL;
	} else {
		size_t skip;
		size_t size;
		/* FIXME maybe discard if size too large */
		size = sizeof(struct svc_deferred_req) + rqstp->rq_arg.len;
		dr = kmalloc(size, GFP_KERNEL);
		if (dr == NULL)
			return NULL;

		dr->handle.owner = rqstp->rq_server;
		dr->prot = rqstp->rq_prot;
		memcpy(&dr->addr, &rqstp->rq_addr, rqstp->rq_addrlen);
		dr->addrlen = rqstp->rq_addrlen;
		dr->daddr = rqstp->rq_daddr;
		dr->argslen = rqstp->rq_arg.len >> 2;
		dr->xprt_hlen = rqstp->rq_xprt_hlen;

		/* back up head to the start of the buffer and copy */
		skip = rqstp->rq_arg.len - rqstp->rq_arg.head[0].iov_len;
		memcpy(dr->args, rqstp->rq_arg.head[0].iov_base - skip,
		       dr->argslen << 2);
	}
	svc_xprt_get(rqstp->rq_xprt);
	dr->xprt = rqstp->rq_xprt;
	rqstp->rq_dropme = true;

	dr->handle.revisit = svc_revisit;
	return &dr->handle;
}

/*
 * recv data from a deferred request into an active one
 */
static int svc_deferred_recv(struct svc_rqst *rqstp)
{
	struct svc_deferred_req *dr = rqstp->rq_deferred;

	/* setup iov_base past transport header */
	rqstp->rq_arg.head[0].iov_base = dr->args + (dr->xprt_hlen>>2);
	/* The iov_len does not include the transport header bytes */
	rqstp->rq_arg.head[0].iov_len = (dr->argslen<<2) - dr->xprt_hlen;
	rqstp->rq_arg.page_len = 0;
	/* The rq_arg.len includes the transport header bytes */
	rqstp->rq_arg.len = dr->argslen<<2;
	rqstp->rq_prot = dr->prot;
	memcpy(&rqstp->rq_addr, &dr->addr, dr->addrlen);
	rqstp->rq_addrlen = dr->addrlen;
	/* Save off transport header len in case we get deferred again */
	rqstp->rq_xprt_hlen = dr->xprt_hlen;
	rqstp->rq_daddr = dr->daddr;
	rqstp->rq_respages = rqstp->rq_pages;
	return (dr->argslen<<2) - dr->xprt_hlen;
}


static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt)
{
	struct svc_deferred_req *dr = NULL;

	if (!test_bit(XPT_DEFERRED, &xprt->xpt_flags))
		return NULL;
	spin_lock(&xprt->xpt_lock);
	if (!list_empty(&xprt->xpt_deferred)) {
		dr = list_entry(xprt->xpt_deferred.next,
				struct svc_deferred_req,
				handle.recent);
		list_del_init(&dr->handle.recent);
	} else
		clear_bit(XPT_DEFERRED, &xprt->xpt_flags);
	spin_unlock(&xprt->xpt_lock);
	return dr;
}

/**
 * svc_find_xprt - find an RPC transport instance
 * @serv: pointer to svc_serv to search
 * @xcl_name: C string containing transport's class name
 * @af: Address family of transport's local address
 * @port: transport's IP port number
 *
 * Return the transport instance pointer for the endpoint accepting
 * connections/peer traffic from the specified transport class,
 * address family and port.
 *
 * Specifying 0 for the address family or port is effectively a
 * wild-card, and will result in matching the first transport in the
 * service's list that has a matching class name.
 */
struct svc_xprt *svc_find_xprt(struct svc_serv *serv, const char *xcl_name,
			       const sa_family_t af, const unsigned short port)
{
	struct svc_xprt *xprt;
	struct svc_xprt *found = NULL;

	/* Sanity check the args */
	if (serv == NULL || xcl_name == NULL)
		return found;

	spin_lock_bh(&serv->sv_lock);
	list_for_each_entry(xprt, &serv->sv_permsocks, xpt_list) {
		if (strcmp(xprt->xpt_class->xcl_name, xcl_name))
			continue;
		if (af != AF_UNSPEC && af != xprt->xpt_local.ss_family)
			continue;
		if (port != 0 && port != svc_xprt_local_port(xprt))
			continue;
		found = xprt;
		svc_xprt_get(xprt);
		break;
	}
	spin_unlock_bh(&serv->sv_lock);
	return found;
}
EXPORT_SYMBOL_GPL(svc_find_xprt);
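
/*
 * Example lookup of a service's TCP listener port; the class name, address
 * family and port act as filters, with 0 as a wild-card.  The helper below
 * is a hypothetical sketch of such a caller.
 */
#if 0
static unsigned short example_tcp_port(struct svc_serv *serv)
{
	struct svc_xprt *xprt;
	unsigned short port = 0;

	xprt = svc_find_xprt(serv, "tcp", AF_INET, 0);
	if (xprt) {
		port = svc_xprt_local_port(xprt);
		svc_xprt_put(xprt);	/* svc_find_xprt took a reference */
	}
	return port;
}
#endif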

static int svc_one_xprt_name(const struct svc_xprt *xprt,
			     char *pos, int remaining)
{
	int len;

	len = snprintf(pos, remaining, "%s %u\n",
			xprt->xpt_class->xcl_name,
			svc_xprt_local_port(xprt));
	if (len >= remaining)
		return -ENAMETOOLONG;
	return len;
}

/**
 * svc_xprt_names - format a buffer with a list of transport names
 * @serv: pointer to an RPC service
 * @buf: pointer to a buffer to be filled in
 * @buflen: length of buffer to be filled in
 *
 * Fills in @buf with a string containing a list of transport names,
 * each name terminated with '\n'.
 *
 * Returns positive length of the filled-in string on success; otherwise
 * a negative errno value is returned if an error occurs.
 */
int svc_xprt_names(struct svc_serv *serv, char *buf, const int buflen)
{
	struct svc_xprt *xprt;
	int len, totlen;
	char *pos;

	/* Sanity check args */
	if (!serv)
		return 0;

	spin_lock_bh(&serv->sv_lock);

	pos = buf;
	totlen = 0;
	list_for_each_entry(xprt, &serv->sv_permsocks, xpt_list) {
		len = svc_one_xprt_name(xprt, pos, buflen - totlen);
		if (len < 0) {
			*buf = '\0';
			totlen = len;
		}
		if (len <= 0)
			break;

		pos += len;
		totlen += len;
	}

	spin_unlock_bh(&serv->sv_lock);
	return totlen;
}
EXPORT_SYMBOL_GPL(svc_xprt_names);
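
/*
 * A caller typically hands svc_xprt_names() a page-sized buffer and returns
 * the result to user space (nfsd's "portlist" file works along these lines).
 * The wrapper below is a hypothetical sketch of such a read helper.
 */
#if 0
static ssize_t example_list_ports(struct svc_serv *serv, char *page)
{
	/* Returns the formatted length, or -ENAMETOOLONG if the page is too small. */
	return svc_xprt_names(serv, page, PAGE_SIZE);
}
#endif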

/*----------------------------------------------------------------------------*/

static void *svc_pool_stats_start(struct seq_file *m, loff_t *pos)
{
	unsigned int pidx = (unsigned int)*pos;
	struct svc_serv *serv = m->private;

	dprintk("svc_pool_stats_start, *pidx=%u\n", pidx);

	if (!pidx)
		return SEQ_START_TOKEN;
	return (pidx > serv->sv_nrpools ? NULL : &serv->sv_pools[pidx-1]);
}

static void *svc_pool_stats_next(struct seq_file *m, void *p, loff_t *pos)
{
	struct svc_pool *pool = p;
	struct svc_serv *serv = m->private;

	dprintk("svc_pool_stats_next, *pos=%llu\n", *pos);

	if (p == SEQ_START_TOKEN) {
		pool = &serv->sv_pools[0];
	} else {
		unsigned int pidx = (pool - &serv->sv_pools[0]);
		if (pidx < serv->sv_nrpools-1)
			pool = &serv->sv_pools[pidx+1];
		else
			pool = NULL;
	}
	++*pos;
	return pool;
}

static void svc_pool_stats_stop(struct seq_file *m, void *p)
{
}

static int svc_pool_stats_show(struct seq_file *m, void *p)
{
	struct svc_pool *pool = p;

	if (p == SEQ_START_TOKEN) {
		seq_puts(m, "# pool packets-arrived sockets-enqueued threads-woken threads-timedout\n");
		return 0;
	}

	seq_printf(m, "%u %lu %lu %lu %lu\n",
		pool->sp_id,
		pool->sp_stats.packets,
		pool->sp_stats.sockets_queued,
		pool->sp_stats.threads_woken,
		pool->sp_stats.threads_timedout);

	return 0;
}

static const struct seq_operations svc_pool_stats_seq_ops = {
	.start	= svc_pool_stats_start,
	.next	= svc_pool_stats_next,
	.stop	= svc_pool_stats_stop,
	.show	= svc_pool_stats_show,
};

int svc_pool_stats_open(struct svc_serv *serv, struct file *file)
{
	int err;

	err = seq_open(file, &svc_pool_stats_seq_ops);
	if (!err)
		((struct seq_file *) file->private_data)->private = serv;
	return err;
}
EXPORT_SYMBOL(svc_pool_stats_open);

/*----------------------------------------------------------------------------*/
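
/*
 * A service can expose these statistics by routing a file's ->open through
 * svc_pool_stats_open() and reusing the generic seq_file helpers for the
 * rest.  The example below is a hypothetical sketch; example_serv stands in
 * for whatever svc_serv the caller wants to report on.
 */
#if 0
static struct svc_serv *example_serv;

static int example_pool_stats_open(struct inode *inode, struct file *file)
{
	return svc_pool_stats_open(example_serv, file);
}

static const struct file_operations example_pool_stats_fops = {
	.owner		= THIS_MODULE,
	.open		= example_pool_stats_open,
	.read		= seq_read,
	.llseek		= seq_lseek,
	.release	= seq_release,
};
#endif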