/*
 * linux/net/sunrpc/svc_xprt.c
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

#include <linux/sched.h>
#include <linux/errno.h>
#include <linux/freezer.h>
#include <linux/kthread.h>
#include <linux/slab.h>
#include <net/sock.h>
#include <linux/sunrpc/stats.h>
#include <linux/sunrpc/svc_xprt.h>
#include <linux/sunrpc/svcsock.h>
#include <linux/sunrpc/xprt.h>

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT

static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt);
static int svc_deferred_recv(struct svc_rqst *rqstp);
static struct cache_deferred_req *svc_defer(struct cache_req *req);
static void svc_age_temp_xprts(unsigned long closure);

/* Apparently the "standard" is that clients close
 * idle connections after 5 minutes, servers after
 * 6 minutes
 *   http://www.connectathon.org/talks96/nfstcp.pdf
 */
static int svc_conn_age_period = 6*60;

/* List of registered transport classes */
static DEFINE_SPINLOCK(svc_xprt_class_lock);
static LIST_HEAD(svc_xprt_class_list);

/* SMP locking strategy:
 *
 *	svc_pool->sp_lock protects most of the fields of that pool.
 *	svc_serv->sv_lock protects sv_tempsocks, sv_permsocks, sv_tmpcnt.
 *	when both need to be taken (rare), svc_serv->sv_lock is first.
 *	BKL protects svc_serv->sv_nrthread.
 *	svc_sock->sk_lock protects the svc_sock->sk_deferred list
 *	and the ->sk_info_authunix cache.
 *
 *	The XPT_BUSY bit in xprt->xpt_flags prevents a transport being
 *	enqueued multiple times. During normal transport processing this bit
 *	is set by svc_xprt_enqueue and cleared by svc_xprt_received.
 *	Providers should not manipulate this bit directly.
 *
 *	Some flags can be set to certain values at any time
 *	provided that certain rules are followed:
 *
 *	XPT_CONN, XPT_DATA:
 *		- Can be set or cleared at any time.
 *		- After a set, svc_xprt_enqueue must be called to enqueue
 *		  the transport for processing.
 *		- After a clear, the transport must be read/accepted.
 *		  If this succeeds, it must be set again.
 *	XPT_CLOSE:
 *		- Can be set at any time. It is never cleared.
 *	XPT_DEAD:
 *		- Can only be set while XPT_BUSY is held, which ensures
 *		  that no other thread will be using the transport or will
 *		  try to set XPT_DEAD.
 */
int svc_reg_xprt_class(struct svc_xprt_class *xcl)
{
	struct svc_xprt_class *cl;
	int res = -EEXIST;

	dprintk("svc: Adding svc transport class '%s'\n", xcl->xcl_name);

	INIT_LIST_HEAD(&xcl->xcl_list);
	spin_lock(&svc_xprt_class_lock);
	/* Make sure there isn't already a class with the same name */
	list_for_each_entry(cl, &svc_xprt_class_list, xcl_list) {
		if (strcmp(xcl->xcl_name, cl->xcl_name) == 0)
			goto out;
	}
	list_add_tail(&xcl->xcl_list, &svc_xprt_class_list);
	res = 0;
out:
	spin_unlock(&svc_xprt_class_lock);
	return res;
}
EXPORT_SYMBOL_GPL(svc_reg_xprt_class);

void svc_unreg_xprt_class(struct svc_xprt_class *xcl)
{
	dprintk("svc: Removing svc transport class '%s'\n", xcl->xcl_name);
	spin_lock(&svc_xprt_class_lock);
	list_del_init(&xcl->xcl_list);
	spin_unlock(&svc_xprt_class_lock);
}
EXPORT_SYMBOL_GPL(svc_unreg_xprt_class);

/*
 * Format the transport list for printing
 */
int svc_print_xprts(char *buf, int maxlen)
{
	struct svc_xprt_class *xcl;
	char tmpstr[80];
	int len = 0;
	buf[0] = '\0';

	spin_lock(&svc_xprt_class_lock);
	list_for_each_entry(xcl, &svc_xprt_class_list, xcl_list) {
		int slen;

		sprintf(tmpstr, "%s %d\n", xcl->xcl_name, xcl->xcl_max_payload);
		slen = strlen(tmpstr);
		if (len + slen > maxlen)
			break;
		len += slen;
		strcat(buf, tmpstr);
	}
	spin_unlock(&svc_xprt_class_lock);

	return len;
}

static void svc_xprt_free(struct kref *kref)
{
	struct svc_xprt *xprt =
		container_of(kref, struct svc_xprt, xpt_ref);
	struct module *owner = xprt->xpt_class->xcl_owner;
	if (test_bit(XPT_CACHE_AUTH, &xprt->xpt_flags))
		svcauth_unix_info_release(xprt);
	put_net(xprt->xpt_net);
	/* See comment on corresponding get in xs_setup_bc_tcp(): */
	if (xprt->xpt_bc_xprt)
		xprt_put(xprt->xpt_bc_xprt);
	xprt->xpt_ops->xpo_free(xprt);
	module_put(owner);
}

void svc_xprt_put(struct svc_xprt *xprt)
{
	kref_put(&xprt->xpt_ref, svc_xprt_free);
}
EXPORT_SYMBOL_GPL(svc_xprt_put);
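/*
 * Illustrative sketch (not part of this file): a transport provider
 * typically defines an svc_xprt_class and registers it from its module
 * init path with svc_reg_xprt_class() above. The "foo" names are
 * hypothetical; xcl_ops would point at the provider's xpo_* methods.
 *
 *	static struct svc_xprt_class svc_foo_class = {
 *		.xcl_name	 = "foo",
 *		.xcl_owner	 = THIS_MODULE,
 *		.xcl_ops	 = &svc_foo_ops,
 *		.xcl_max_payload = RPCSVC_MAXPAYLOAD_TCP,
 *	};
 *
 *	static int __init svc_foo_init(void)
 *	{
 *		return svc_reg_xprt_class(&svc_foo_class);
 *	}
 *
 *	static void __exit svc_foo_exit(void)
 *	{
 *		svc_unreg_xprt_class(&svc_foo_class);
 *	}
 */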
/*
 * Called by transport drivers to initialize the transport independent
 * portion of the transport instance.
 */
void svc_xprt_init(struct svc_xprt_class *xcl, struct svc_xprt *xprt,
		   struct svc_serv *serv)
{
	memset(xprt, 0, sizeof(*xprt));
	xprt->xpt_class = xcl;
	xprt->xpt_ops = xcl->xcl_ops;
	kref_init(&xprt->xpt_ref);
	xprt->xpt_server = serv;
	INIT_LIST_HEAD(&xprt->xpt_list);
	INIT_LIST_HEAD(&xprt->xpt_ready);
	INIT_LIST_HEAD(&xprt->xpt_deferred);
	INIT_LIST_HEAD(&xprt->xpt_users);
	mutex_init(&xprt->xpt_mutex);
	spin_lock_init(&xprt->xpt_lock);
	set_bit(XPT_BUSY, &xprt->xpt_flags);
	rpc_init_wait_queue(&xprt->xpt_bc_pending, "xpt_bc_pending");
	xprt->xpt_net = get_net(&init_net);
}
EXPORT_SYMBOL_GPL(svc_xprt_init);

static struct svc_xprt *__svc_xpo_create(struct svc_xprt_class *xcl,
					 struct svc_serv *serv,
					 struct net *net,
					 const int family,
					 const unsigned short port,
					 int flags)
{
	struct sockaddr_in sin = {
		.sin_family		= AF_INET,
		.sin_addr.s_addr	= htonl(INADDR_ANY),
		.sin_port		= htons(port),
	};
#if defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE)
	struct sockaddr_in6 sin6 = {
		.sin6_family		= AF_INET6,
		.sin6_addr		= IN6ADDR_ANY_INIT,
		.sin6_port		= htons(port),
	};
#endif	/* defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE) */
	struct sockaddr *sap;
	size_t len;

	switch (family) {
	case PF_INET:
		sap = (struct sockaddr *)&sin;
		len = sizeof(sin);
		break;
#if defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE)
	case PF_INET6:
		sap = (struct sockaddr *)&sin6;
		len = sizeof(sin6);
		break;
#endif	/* defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE) */
	default:
		return ERR_PTR(-EAFNOSUPPORT);
	}

	return xcl->xcl_ops->xpo_create(serv, net, sap, len, flags);
}

int svc_create_xprt(struct svc_serv *serv, const char *xprt_name,
		    struct net *net, const int family,
		    const unsigned short port, int flags)
{
	struct svc_xprt_class *xcl;

	dprintk("svc: creating transport %s[%d]\n", xprt_name, port);
	spin_lock(&svc_xprt_class_lock);
	list_for_each_entry(xcl, &svc_xprt_class_list, xcl_list) {
		struct svc_xprt *newxprt;
		unsigned short newport;

		if (strcmp(xprt_name, xcl->xcl_name))
			continue;

		if (!try_module_get(xcl->xcl_owner))
			goto err;

		spin_unlock(&svc_xprt_class_lock);
		newxprt = __svc_xpo_create(xcl, serv, net, family, port, flags);
		if (IS_ERR(newxprt)) {
			module_put(xcl->xcl_owner);
			return PTR_ERR(newxprt);
		}

		clear_bit(XPT_TEMP, &newxprt->xpt_flags);
		spin_lock_bh(&serv->sv_lock);
		list_add(&newxprt->xpt_list, &serv->sv_permsocks);
		spin_unlock_bh(&serv->sv_lock);
		newport = svc_xprt_local_port(newxprt);
		clear_bit(XPT_BUSY, &newxprt->xpt_flags);
		return newport;
	}
 err:
	spin_unlock(&svc_xprt_class_lock);
	dprintk("svc: transport %s not found\n", xprt_name);

	/* This errno is exposed to user space.  Provide a reasonable
	 * perror msg for a bad transport. */
	return -EPROTONOSUPPORT;
}
EXPORT_SYMBOL_GPL(svc_create_xprt);
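/*
 * Illustrative sketch (not part of this file): a service such as nfsd or
 * lockd creates its listeners by class name with svc_create_xprt(). On
 * success the bound local port is returned; -EPROTONOSUPPORT means no
 * class with that name is registered. Port and flags below are examples.
 *
 *	int err;
 *
 *	err = svc_create_xprt(serv, "tcp", &init_net, PF_INET,
 *			      2049, SVC_SOCK_DEFAULTS);
 *	if (err < 0)
 *		return err;
 */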
/*
 * Copy the local and remote xprt addresses to the rqstp structure
 */
void svc_xprt_copy_addrs(struct svc_rqst *rqstp, struct svc_xprt *xprt)
{
	struct sockaddr *sin;

	memcpy(&rqstp->rq_addr, &xprt->xpt_remote, xprt->xpt_remotelen);
	rqstp->rq_addrlen = xprt->xpt_remotelen;

	/*
	 * Destination address in request is needed for binding the
	 * source address in RPC replies/callbacks later.
	 */
	sin = (struct sockaddr *)&xprt->xpt_local;
	switch (sin->sa_family) {
	case AF_INET:
		rqstp->rq_daddr.addr = ((struct sockaddr_in *)sin)->sin_addr;
		break;
	case AF_INET6:
		rqstp->rq_daddr.addr6 = ((struct sockaddr_in6 *)sin)->sin6_addr;
		break;
	}
}
EXPORT_SYMBOL_GPL(svc_xprt_copy_addrs);

/**
 * svc_print_addr - Format rq_addr field for printing
 * @rqstp: svc_rqst struct containing address to print
 * @buf: target buffer for formatted address
 * @len: length of target buffer
 *
 */
char *svc_print_addr(struct svc_rqst *rqstp, char *buf, size_t len)
{
	return __svc_print_addr(svc_addr(rqstp), buf, len);
}
EXPORT_SYMBOL_GPL(svc_print_addr);
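/*
 * Usage sketch for svc_print_addr() (illustrative only): callers pass a
 * stack buffer of RPC_MAX_ADDRBUFLEN bytes, e.g. for debug logging.
 *
 *	char buf[RPC_MAX_ADDRBUFLEN];
 *
 *	dprintk("svc: request from %s\n",
 *		svc_print_addr(rqstp, buf, sizeof(buf)));
 */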
/*
 * Queue up an idle server thread.  Must have pool->sp_lock held.
 * Note: this is really a stack rather than a queue, so that we only
 * use as many different threads as we need, and the rest don't pollute
 * the cache.
 */
static void svc_thread_enqueue(struct svc_pool *pool, struct svc_rqst *rqstp)
{
	list_add(&rqstp->rq_list, &pool->sp_threads);
}

/*
 * Dequeue an nfsd thread.  Must have pool->sp_lock held.
 */
static void svc_thread_dequeue(struct svc_pool *pool, struct svc_rqst *rqstp)
{
	list_del(&rqstp->rq_list);
}

static bool svc_xprt_has_something_to_do(struct svc_xprt *xprt)
{
	if (xprt->xpt_flags & ((1<<XPT_CONN)|(1<<XPT_CLOSE)))
		return true;
	if (xprt->xpt_flags & ((1<<XPT_DATA)|(1<<XPT_DEFERRED)))
		return xprt->xpt_ops->xpo_has_wspace(xprt);
	return false;
}

/*
 * Queue up a transport with data pending. If there are idle nfsd
 * processes, wake 'em up.
 *
 */
void svc_xprt_enqueue(struct svc_xprt *xprt)
{
	struct svc_serv	*serv = xprt->xpt_server;
	struct svc_pool *pool;
	struct svc_rqst	*rqstp;
	int cpu;

	if (!svc_xprt_has_something_to_do(xprt))
		return;

	cpu = get_cpu();
	pool = svc_pool_for_cpu(xprt->xpt_server, cpu);
	put_cpu();

	spin_lock_bh(&pool->sp_lock);

	if (!list_empty(&pool->sp_threads) &&
	    !list_empty(&pool->sp_sockets))
		printk(KERN_ERR
		       "svc_xprt_enqueue: "
		       "threads and transports both waiting??\n");

	pool->sp_stats.packets++;

	/* Mark transport as busy. It will remain in this state until
	 * the provider calls svc_xprt_received. We update XPT_BUSY
	 * atomically because it also guards against trying to enqueue
	 * the transport twice.
	 */
	if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags)) {
		/* Don't enqueue transport while already enqueued */
		dprintk("svc: transport %p busy, not enqueued\n", xprt);
		goto out_unlock;
	}

	if (!list_empty(&pool->sp_threads)) {
		rqstp = list_entry(pool->sp_threads.next,
				   struct svc_rqst,
				   rq_list);
		dprintk("svc: transport %p served by daemon %p\n",
			xprt, rqstp);
		svc_thread_dequeue(pool, rqstp);
		if (rqstp->rq_xprt)
			printk(KERN_ERR
				"svc_xprt_enqueue: server %p, rq_xprt=%p!\n",
				rqstp, rqstp->rq_xprt);
		rqstp->rq_xprt = xprt;
		svc_xprt_get(xprt);
		rqstp->rq_reserved = serv->sv_max_mesg;
		atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved);
		pool->sp_stats.threads_woken++;
		wake_up(&rqstp->rq_wait);
	} else {
		dprintk("svc: transport %p put into queue\n", xprt);
		list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
		pool->sp_stats.sockets_queued++;
	}

out_unlock:
	spin_unlock_bh(&pool->sp_lock);
}
EXPORT_SYMBOL_GPL(svc_xprt_enqueue);

/*
 * Dequeue the first transport.  Must be called with the pool->sp_lock held.
 */
static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool)
{
	struct svc_xprt	*xprt;

	if (list_empty(&pool->sp_sockets))
		return NULL;

	xprt = list_entry(pool->sp_sockets.next,
			  struct svc_xprt, xpt_ready);
	list_del_init(&xprt->xpt_ready);

	dprintk("svc: transport %p dequeued, inuse=%d\n",
		xprt, atomic_read(&xprt->xpt_ref.refcount));

	return xprt;
}

/*
 * svc_xprt_received conditionally queues the transport for processing
 * by another thread. The caller must hold the XPT_BUSY bit and must
 * not thereafter touch transport data.
 *
 * Note: XPT_DATA only gets cleared when a read-attempt finds no (or
 * insufficient) data.
 */
void svc_xprt_received(struct svc_xprt *xprt)
{
	BUG_ON(!test_bit(XPT_BUSY, &xprt->xpt_flags));
	/* As soon as we clear busy, the xprt could be closed and
	 * 'put', so we need a reference to call svc_xprt_enqueue with:
	 */
	svc_xprt_get(xprt);
	clear_bit(XPT_BUSY, &xprt->xpt_flags);
	svc_xprt_enqueue(xprt);
	svc_xprt_put(xprt);
}
EXPORT_SYMBOL_GPL(svc_xprt_received);

/**
 * svc_reserve - change the space reserved for the reply to a request.
 * @rqstp:  The request in question
 * @space: new max space to reserve
 *
 * Each request reserves some space on the output queue of the transport
 * to make sure the reply fits.  This function reduces that reserved
 * space to be the amount of space used already, plus @space.
 *
 */
void svc_reserve(struct svc_rqst *rqstp, int space)
{
	space += rqstp->rq_res.head[0].iov_len;

	if (space < rqstp->rq_reserved) {
		struct svc_xprt *xprt = rqstp->rq_xprt;
		atomic_sub((rqstp->rq_reserved - space), &xprt->xpt_reserved);
		rqstp->rq_reserved = space;

		svc_xprt_enqueue(xprt);
	}
}
EXPORT_SYMBOL_GPL(svc_reserve);
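/*
 * Illustrative sketch (not part of this file): a provider's data-ready
 * callback typically just flags the transport and lets svc_xprt_enqueue()
 * decide whether to hand it to an idle thread or queue it on the pool,
 * in the spirit of the svc_sock callbacks in svcsock.c. The function name
 * below is hypothetical.
 *
 *	static void svc_foo_data_ready(struct svc_xprt *xprt)
 *	{
 *		set_bit(XPT_DATA, &xprt->xpt_flags);
 *		svc_xprt_enqueue(xprt);
 *	}
 *
 * New connections are signalled the same way with XPT_CONN, and a
 * detected disconnect with XPT_CLOSE.
 */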
static void svc_xprt_release(struct svc_rqst *rqstp)
{
	struct svc_xprt	*xprt = rqstp->rq_xprt;

	rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp);

	kfree(rqstp->rq_deferred);
	rqstp->rq_deferred = NULL;

	svc_free_res_pages(rqstp);
	rqstp->rq_res.page_len = 0;
	rqstp->rq_res.page_base = 0;

	/* Reset response buffer and release
	 * the reservation.
	 * But first, check that enough space was reserved
	 * for the reply, otherwise we have a bug!
	 */
	if ((rqstp->rq_res.len) > rqstp->rq_reserved)
		printk(KERN_ERR "RPC request reserved %d but used %d\n",
		       rqstp->rq_reserved,
		       rqstp->rq_res.len);

	rqstp->rq_res.head[0].iov_len = 0;
	svc_reserve(rqstp, 0);
	rqstp->rq_xprt = NULL;

	svc_xprt_put(xprt);
}

/*
 * External function to wake up a server waiting for data
 * This really only makes sense for services like lockd
 * which have exactly one thread anyway.
 */
void svc_wake_up(struct svc_serv *serv)
{
	struct svc_rqst	*rqstp;
	unsigned int i;
	struct svc_pool *pool;

	for (i = 0; i < serv->sv_nrpools; i++) {
		pool = &serv->sv_pools[i];

		spin_lock_bh(&pool->sp_lock);
		if (!list_empty(&pool->sp_threads)) {
			rqstp = list_entry(pool->sp_threads.next,
					   struct svc_rqst,
					   rq_list);
			dprintk("svc: daemon %p woken up.\n", rqstp);
			/*
			svc_thread_dequeue(pool, rqstp);
			rqstp->rq_xprt = NULL;
			 */
			wake_up(&rqstp->rq_wait);
		}
		spin_unlock_bh(&pool->sp_lock);
	}
}
EXPORT_SYMBOL_GPL(svc_wake_up);

int svc_port_is_privileged(struct sockaddr *sin)
{
	switch (sin->sa_family) {
	case AF_INET:
		return ntohs(((struct sockaddr_in *)sin)->sin_port)
			< PROT_SOCK;
	case AF_INET6:
		return ntohs(((struct sockaddr_in6 *)sin)->sin6_port)
			< PROT_SOCK;
	default:
		return 0;
	}
}

/*
 * Make sure that we don't have too many active connections. If we have,
 * something must be dropped. It's not clear what will happen if we allow
 * "too many" connections, but when dealing with network-facing software,
 * we have to code defensively. Here we do that by imposing hard limits.
 *
 * There's no point in trying to do random drop here for DoS
 * prevention. The NFS client does 1 reconnect in 15 seconds. An
 * attacker can easily beat that.
 *
 * The only somewhat efficient mechanism would be to drop old
 * connections from the same IP first. But right now we don't even
 * record the client IP in svc_sock.
 *
 * Single-threaded services that expect a lot of clients will probably
 * need to set sv_maxconn to override the default value, which is based
 * on the number of threads.
 */
static void svc_check_conn_limits(struct svc_serv *serv)
{
	unsigned int limit = serv->sv_maxconn ? serv->sv_maxconn :
				(serv->sv_nrthreads+3) * 20;

	if (serv->sv_tmpcnt > limit) {
		struct svc_xprt *xprt = NULL;
		spin_lock_bh(&serv->sv_lock);
		if (!list_empty(&serv->sv_tempsocks)) {
			if (net_ratelimit()) {
				/* Try to help the admin */
				printk(KERN_NOTICE "%s: too many open "
				       "connections, consider increasing %s\n",
				       serv->sv_name, serv->sv_maxconn ?
				       "the max number of connections." :
				       "the number of threads.");
			}
			/*
			 * Always select the oldest connection. It's not fair,
			 * but so is life
			 */
			xprt = list_entry(serv->sv_tempsocks.prev,
					  struct svc_xprt,
					  xpt_list);
			set_bit(XPT_CLOSE, &xprt->xpt_flags);
			svc_xprt_get(xprt);
		}
		spin_unlock_bh(&serv->sv_lock);

		if (xprt) {
			svc_xprt_enqueue(xprt);
			svc_xprt_put(xprt);
		}
	}
}
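/*
 * Worked example of the default limit above (illustrative arithmetic
 * only): with sv_maxconn unset and 8 server threads, the limit is
 * (8 + 3) * 20 = 220 temporary connections before the oldest one is
 * marked for closing.
 */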
/*
 * Receive the next request on any transport.  This code is carefully
 * organised not to touch any cachelines in the shared svc_serv
 * structure, only cachelines in the local svc_pool.
 */
int svc_recv(struct svc_rqst *rqstp, long timeout)
{
	struct svc_xprt		*xprt = NULL;
	struct svc_serv		*serv = rqstp->rq_server;
	struct svc_pool		*pool = rqstp->rq_pool;
	int			len, i;
	int			pages;
	struct xdr_buf		*arg;
	DECLARE_WAITQUEUE(wait, current);
	long			time_left;

	dprintk("svc: server %p waiting for data (to = %ld)\n",
		rqstp, timeout);

	if (rqstp->rq_xprt)
		printk(KERN_ERR
			"svc_recv: service %p, transport not NULL!\n",
			rqstp);
	if (waitqueue_active(&rqstp->rq_wait))
		printk(KERN_ERR
			"svc_recv: service %p, wait queue active!\n",
			rqstp);

	/* now allocate needed pages.  If we get a failure, sleep briefly */
	pages = (serv->sv_max_mesg + PAGE_SIZE) / PAGE_SIZE;
	for (i = 0; i < pages ; i++)
		while (rqstp->rq_pages[i] == NULL) {
			struct page *p = alloc_page(GFP_KERNEL);
			if (!p) {
				set_current_state(TASK_INTERRUPTIBLE);
				if (signalled() || kthread_should_stop()) {
					set_current_state(TASK_RUNNING);
					return -EINTR;
				}
				schedule_timeout(msecs_to_jiffies(500));
			}
			rqstp->rq_pages[i] = p;
		}
	rqstp->rq_pages[i++] = NULL; /* this might be seen in nfs_read_actor */
	BUG_ON(pages >= RPCSVC_MAXPAGES);

	/* Make arg->head point to first page and arg->pages point to rest */
	arg = &rqstp->rq_arg;
	arg->head[0].iov_base = page_address(rqstp->rq_pages[0]);
	arg->head[0].iov_len = PAGE_SIZE;
	arg->pages = rqstp->rq_pages + 1;
	arg->page_base = 0;
	/* save at least one page for response */
	arg->page_len = (pages-2)*PAGE_SIZE;
	arg->len = (pages-1)*PAGE_SIZE;
	arg->tail[0].iov_len = 0;

	try_to_freeze();
	cond_resched();
	if (signalled() || kthread_should_stop())
		return -EINTR;

	/* Normally we will wait up to 5 seconds for any required
	 * cache information to be provided.
	 */
	rqstp->rq_chandle.thread_wait = 5*HZ;

	spin_lock_bh(&pool->sp_lock);
	xprt = svc_xprt_dequeue(pool);
	if (xprt) {
		rqstp->rq_xprt = xprt;
		svc_xprt_get(xprt);
		rqstp->rq_reserved = serv->sv_max_mesg;
		atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved);

		/* As there is a shortage of threads and this request
		 * had to be queued, don't allow the thread to wait so
		 * long for cache updates.
		 */
		rqstp->rq_chandle.thread_wait = 1*HZ;
	} else {
		/* No data pending. Go to sleep */
		svc_thread_enqueue(pool, rqstp);

		/*
		 * We have to be able to interrupt this wait
		 * to bring down the daemons ...
		 */
		set_current_state(TASK_INTERRUPTIBLE);

		/*
		 * checking kthread_should_stop() here allows us to avoid
		 * locking and signalling when stopping kthreads that call
		 * svc_recv. If the thread has already been woken up, then
		 * we can exit here without sleeping. If not, then it'll be
		 * woken up quickly during the schedule_timeout.
		 */
		if (kthread_should_stop()) {
			set_current_state(TASK_RUNNING);
			spin_unlock_bh(&pool->sp_lock);
			return -EINTR;
		}

		add_wait_queue(&rqstp->rq_wait, &wait);
		spin_unlock_bh(&pool->sp_lock);

		time_left = schedule_timeout(timeout);

		try_to_freeze();

		spin_lock_bh(&pool->sp_lock);
		remove_wait_queue(&rqstp->rq_wait, &wait);
		if (!time_left)
			pool->sp_stats.threads_timedout++;

		xprt = rqstp->rq_xprt;
		if (!xprt) {
			svc_thread_dequeue(pool, rqstp);
			spin_unlock_bh(&pool->sp_lock);
			dprintk("svc: server %p, no data yet\n", rqstp);
			if (signalled() || kthread_should_stop())
				return -EINTR;
			else
				return -EAGAIN;
		}
	}
	spin_unlock_bh(&pool->sp_lock);

	len = 0;
	if (test_bit(XPT_CLOSE, &xprt->xpt_flags)) {
		dprintk("svc_recv: found XPT_CLOSE\n");
		svc_delete_xprt(xprt);
		/* Leave XPT_BUSY set on the dead xprt: */
		goto out;
	}
	if (test_bit(XPT_LISTENER, &xprt->xpt_flags)) {
		struct svc_xprt *newxpt;
		newxpt = xprt->xpt_ops->xpo_accept(xprt);
		if (newxpt) {
			/*
			 * We know this module_get will succeed because the
			 * listener holds a reference too
			 */
			__module_get(newxpt->xpt_class->xcl_owner);
			svc_check_conn_limits(xprt->xpt_server);
			spin_lock_bh(&serv->sv_lock);
			set_bit(XPT_TEMP, &newxpt->xpt_flags);
			list_add(&newxpt->xpt_list, &serv->sv_tempsocks);
			serv->sv_tmpcnt++;
			if (serv->sv_temptimer.function == NULL) {
				/* setup timer to age temp transports */
				setup_timer(&serv->sv_temptimer,
					    svc_age_temp_xprts,
					    (unsigned long)serv);
				mod_timer(&serv->sv_temptimer,
					  jiffies + svc_conn_age_period * HZ);
			}
			spin_unlock_bh(&serv->sv_lock);
			svc_xprt_received(newxpt);
		}
	} else if (xprt->xpt_ops->xpo_has_wspace(xprt)) {
		dprintk("svc: server %p, pool %u, transport %p, inuse=%d\n",
			rqstp, pool->sp_id, xprt,
			atomic_read(&xprt->xpt_ref.refcount));
		rqstp->rq_deferred = svc_deferred_dequeue(xprt);
		if (rqstp->rq_deferred)
			len = svc_deferred_recv(rqstp);
		else
			len = xprt->xpt_ops->xpo_recvfrom(rqstp);
		dprintk("svc: got len=%d\n", len);
	}
	svc_xprt_received(xprt);

	/* No data, incomplete (TCP) read, or accept() */
	if (len == 0 || len == -EAGAIN)
		goto out;

	clear_bit(XPT_OLD, &xprt->xpt_flags);

	rqstp->rq_secure = svc_port_is_privileged(svc_addr(rqstp));
	rqstp->rq_chandle.defer = svc_defer;

	if (serv->sv_stats)
		serv->sv_stats->netcnt++;
	return len;
out:
	rqstp->rq_res.len = 0;
	svc_xprt_release(rqstp);
	return -EAGAIN;
}
EXPORT_SYMBOL_GPL(svc_recv);
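/*
 * Illustrative sketch (not part of this file): service threads drive
 * svc_recv() from a simple loop, in the spirit of the lockd and nfsd
 * main loops; svc_process() builds the reply and sends it via svc_send().
 * Error handling is condensed here.
 *
 *	for (;;) {
 *		err = svc_recv(rqstp, 60*60*HZ);
 *		if (err == -EINTR)
 *			break;
 *		if (err == -EAGAIN)
 *			continue;
 *		svc_process(rqstp);
 *	}
 */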
/*
 * Drop request
 */
void svc_drop(struct svc_rqst *rqstp)
{
	dprintk("svc: xprt %p dropped request\n", rqstp->rq_xprt);
	svc_xprt_release(rqstp);
}
EXPORT_SYMBOL_GPL(svc_drop);

/*
 * Return reply to client.
 */
int svc_send(struct svc_rqst *rqstp)
{
	struct svc_xprt	*xprt;
	int		len;
	struct xdr_buf	*xb;

	xprt = rqstp->rq_xprt;
	if (!xprt)
		return -EFAULT;

	/* release the receive skb before sending the reply */
	rqstp->rq_xprt->xpt_ops->xpo_release_rqst(rqstp);

	/* calculate over-all length */
	xb = &rqstp->rq_res;
	xb->len = xb->head[0].iov_len +
		xb->page_len +
		xb->tail[0].iov_len;

	/* Grab mutex to serialize outgoing data. */
	mutex_lock(&xprt->xpt_mutex);
	if (test_bit(XPT_DEAD, &xprt->xpt_flags))
		len = -ENOTCONN;
	else
		len = xprt->xpt_ops->xpo_sendto(rqstp);
	mutex_unlock(&xprt->xpt_mutex);
	rpc_wake_up(&xprt->xpt_bc_pending);
	svc_xprt_release(rqstp);

	if (len == -ECONNREFUSED || len == -ENOTCONN || len == -EAGAIN)
		return 0;
	return len;
}

/*
 * Timer function to close old temporary transports, using
 * a mark-and-sweep algorithm.
 */
static void svc_age_temp_xprts(unsigned long closure)
{
	struct svc_serv *serv = (struct svc_serv *)closure;
	struct svc_xprt *xprt;
	struct list_head *le, *next;
	LIST_HEAD(to_be_aged);

	dprintk("svc_age_temp_xprts\n");

	if (!spin_trylock_bh(&serv->sv_lock)) {
		/* busy, try again 1 sec later */
		dprintk("svc_age_temp_xprts: busy\n");
		mod_timer(&serv->sv_temptimer, jiffies + HZ);
		return;
	}

	list_for_each_safe(le, next, &serv->sv_tempsocks) {
		xprt = list_entry(le, struct svc_xprt, xpt_list);

		/* First time through, just mark it OLD. Second time
		 * through, close it. */
		if (!test_and_set_bit(XPT_OLD, &xprt->xpt_flags))
			continue;
		if (atomic_read(&xprt->xpt_ref.refcount) > 1 ||
		    test_bit(XPT_BUSY, &xprt->xpt_flags))
			continue;
		svc_xprt_get(xprt);
		list_move(le, &to_be_aged);
		set_bit(XPT_CLOSE, &xprt->xpt_flags);
		set_bit(XPT_DETACHED, &xprt->xpt_flags);
	}
	spin_unlock_bh(&serv->sv_lock);

	while (!list_empty(&to_be_aged)) {
		le = to_be_aged.next;
		/* fiddling the xpt_list node is safe 'cos we're XPT_DETACHED */
		list_del_init(le);
		xprt = list_entry(le, struct svc_xprt, xpt_list);

		dprintk("queuing xprt %p for closing\n", xprt);

		/* a thread will dequeue and close it soon */
		svc_xprt_enqueue(xprt);
		svc_xprt_put(xprt);
	}

	mod_timer(&serv->sv_temptimer, jiffies + svc_conn_age_period * HZ);
}

static void call_xpt_users(struct svc_xprt *xprt)
{
	struct svc_xpt_user *u;

	spin_lock(&xprt->xpt_lock);
	while (!list_empty(&xprt->xpt_users)) {
		u = list_first_entry(&xprt->xpt_users, struct svc_xpt_user, list);
		list_del(&u->list);
		u->callback(u);
	}
	spin_unlock(&xprt->xpt_lock);
}
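/*
 * Illustrative sketch (not part of this file): call_xpt_users() above runs
 * callbacks that were attached to the transport earlier, assuming the
 * register_xpt_user() helper declared in include/linux/sunrpc/svc_xprt.h.
 * A caller usually embeds the svc_xpt_user in its own state; the "foo"
 * names below are hypothetical.
 *
 *	struct foo_state {
 *		struct svc_xpt_user	xpt_user;
 *		struct svc_xprt		*xprt;
 *	};
 *
 *	static void foo_xprt_gone(struct svc_xpt_user *u)
 *	{
 *		struct foo_state *st = container_of(u, struct foo_state,
 *						    xpt_user);
 *		// the transport is being deleted; tear down st here
 *	}
 *
 *	st->xpt_user.callback = foo_xprt_gone;
 *	register_xpt_user(xprt, &st->xpt_user);
 */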
/*
 * Remove a dead transport
 */
void svc_delete_xprt(struct svc_xprt *xprt)
{
	struct svc_serv	*serv = xprt->xpt_server;
	struct svc_deferred_req *dr;

	/* Only do this once */
	if (test_and_set_bit(XPT_DEAD, &xprt->xpt_flags))
		BUG();

	dprintk("svc: svc_delete_xprt(%p)\n", xprt);
	xprt->xpt_ops->xpo_detach(xprt);

	spin_lock_bh(&serv->sv_lock);
	if (!test_and_set_bit(XPT_DETACHED, &xprt->xpt_flags))
		list_del_init(&xprt->xpt_list);
	/*
	 * We used to delete the transport from whichever list
	 * its sk_xprt.xpt_ready node was on, but we don't actually
	 * need to.  This is because the only time we're called
	 * while still attached to a queue, the queue itself
	 * is about to be destroyed (in svc_destroy).
	 */
	if (test_bit(XPT_TEMP, &xprt->xpt_flags))
		serv->sv_tmpcnt--;
	spin_unlock_bh(&serv->sv_lock);

	while ((dr = svc_deferred_dequeue(xprt)) != NULL)
		kfree(dr);

	call_xpt_users(xprt);
	svc_xprt_put(xprt);
}

void svc_close_xprt(struct svc_xprt *xprt)
{
	set_bit(XPT_CLOSE, &xprt->xpt_flags);
	if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags))
		/* someone else will have to effect the close */
		return;
	/*
	 * We expect svc_close_xprt() to work even when no threads are
	 * running (e.g., while configuring the server before starting
	 * any threads), so if the transport isn't busy, we delete
	 * it ourself:
	 */
	svc_delete_xprt(xprt);
}
EXPORT_SYMBOL_GPL(svc_close_xprt);

void svc_close_all(struct list_head *xprt_list)
{
	struct svc_xprt *xprt;
	struct svc_xprt *tmp;

	/*
	 * The server is shutting down, and no more threads are running.
	 * svc_xprt_enqueue() might still be running, but at worst it
	 * will re-add the xprt to sp_sockets, which will soon get
	 * freed.  So we don't bother with any more locking, and don't
	 * leave the close to the (nonexistent) server threads:
	 */
	list_for_each_entry_safe(xprt, tmp, xprt_list, xpt_list) {
		set_bit(XPT_CLOSE, &xprt->xpt_flags);
		svc_delete_xprt(xprt);
	}
}

/*
 * Handle defer and revisit of requests
 */

static void svc_revisit(struct cache_deferred_req *dreq, int too_many)
{
	struct svc_deferred_req *dr =
		container_of(dreq, struct svc_deferred_req, handle);
	struct svc_xprt *xprt = dr->xprt;

	spin_lock(&xprt->xpt_lock);
	set_bit(XPT_DEFERRED, &xprt->xpt_flags);
	if (too_many || test_bit(XPT_DEAD, &xprt->xpt_flags)) {
		spin_unlock(&xprt->xpt_lock);
		dprintk("revisit canceled\n");
		svc_xprt_put(xprt);
		kfree(dr);
		return;
	}
	dprintk("revisit queued\n");
	dr->xprt = NULL;
	list_add(&dr->handle.recent, &xprt->xpt_deferred);
	spin_unlock(&xprt->xpt_lock);
	svc_xprt_enqueue(xprt);
	svc_xprt_put(xprt);
}
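/*
 * Illustrative note (not part of this file): svc_defer() below is not
 * called by services directly. svc_recv() installs it as
 * rqstp->rq_chandle.defer, and the sunrpc cache invokes it through the
 * generic cache_req handle while an upcall is outstanding, roughly:
 *
 *	dreq = req->defer(req);		// lands in svc_defer()
 *
 * If that returns NULL the request cannot be deferred and is dropped
 * instead. svc_revisit() above is later called through dreq->revisit
 * once the cache item is filled in (or the deferral is discarded).
 */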
/*
 * Save the request off for later processing. The request buffer looks
 * like this:
 *
 * <xprt-header><rpc-header><rpc-pagelist><rpc-tail>
 *
 * This code can only handle requests that consist of an xprt-header
 * and rpc-header.
 */
static struct cache_deferred_req *svc_defer(struct cache_req *req)
{
	struct svc_rqst *rqstp = container_of(req, struct svc_rqst, rq_chandle);
	struct svc_deferred_req *dr;

	if (rqstp->rq_arg.page_len || !rqstp->rq_usedeferral)
		return NULL; /* if more than a page, give up FIXME */
	if (rqstp->rq_deferred) {
		dr = rqstp->rq_deferred;
		rqstp->rq_deferred = NULL;
	} else {
		size_t skip;
		size_t size;
		/* FIXME maybe discard if size too large */
		size = sizeof(struct svc_deferred_req) + rqstp->rq_arg.len;
		dr = kmalloc(size, GFP_KERNEL);
		if (dr == NULL)
			return NULL;

		dr->handle.owner = rqstp->rq_server;
		dr->prot = rqstp->rq_prot;
		memcpy(&dr->addr, &rqstp->rq_addr, rqstp->rq_addrlen);
		dr->addrlen = rqstp->rq_addrlen;
		dr->daddr = rqstp->rq_daddr;
		dr->argslen = rqstp->rq_arg.len >> 2;
		dr->xprt_hlen = rqstp->rq_xprt_hlen;

		/* back up head to the start of the buffer and copy */
		skip = rqstp->rq_arg.len - rqstp->rq_arg.head[0].iov_len;
		memcpy(dr->args, rqstp->rq_arg.head[0].iov_base - skip,
		       dr->argslen << 2);
	}
	svc_xprt_get(rqstp->rq_xprt);
	dr->xprt = rqstp->rq_xprt;
	rqstp->rq_dropme = true;

	dr->handle.revisit = svc_revisit;
	return &dr->handle;
}

/*
 * recv data from a deferred request into an active one
 */
static int svc_deferred_recv(struct svc_rqst *rqstp)
{
	struct svc_deferred_req *dr = rqstp->rq_deferred;

	/* setup iov_base past transport header */
	rqstp->rq_arg.head[0].iov_base = dr->args + (dr->xprt_hlen>>2);
	/* The iov_len does not include the transport header bytes */
	rqstp->rq_arg.head[0].iov_len = (dr->argslen<<2) - dr->xprt_hlen;
	rqstp->rq_arg.page_len = 0;
	/* The rq_arg.len includes the transport header bytes */
	rqstp->rq_arg.len = dr->argslen<<2;
	rqstp->rq_prot = dr->prot;
	memcpy(&rqstp->rq_addr, &dr->addr, dr->addrlen);
	rqstp->rq_addrlen = dr->addrlen;
	/* Save off transport header len in case we get deferred again */
	rqstp->rq_xprt_hlen = dr->xprt_hlen;
	rqstp->rq_daddr = dr->daddr;
	rqstp->rq_respages = rqstp->rq_pages;
	return (dr->argslen<<2) - dr->xprt_hlen;
}


static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt)
{
	struct svc_deferred_req *dr = NULL;

	if (!test_bit(XPT_DEFERRED, &xprt->xpt_flags))
		return NULL;
	spin_lock(&xprt->xpt_lock);
	if (!list_empty(&xprt->xpt_deferred)) {
		dr = list_entry(xprt->xpt_deferred.next,
				struct svc_deferred_req,
				handle.recent);
		list_del_init(&dr->handle.recent);
	} else
		clear_bit(XPT_DEFERRED, &xprt->xpt_flags);
	spin_unlock(&xprt->xpt_lock);
	return dr;
}
/**
 * svc_find_xprt - find an RPC transport instance
 * @serv: pointer to svc_serv to search
 * @xcl_name: C string containing transport's class name
 * @af: Address family of transport's local address
 * @port: transport's IP port number
 *
 * Return the transport instance pointer for the endpoint accepting
 * connections/peer traffic from the specified transport class,
 * address family and port.
 *
 * Specifying 0 for the address family or port is effectively a
 * wild-card, and will result in matching the first transport in the
 * service's list that has a matching class name.
 */
struct svc_xprt *svc_find_xprt(struct svc_serv *serv, const char *xcl_name,
			       const sa_family_t af, const unsigned short port)
{
	struct svc_xprt *xprt;
	struct svc_xprt *found = NULL;

	/* Sanity check the args */
	if (serv == NULL || xcl_name == NULL)
		return found;

	spin_lock_bh(&serv->sv_lock);
	list_for_each_entry(xprt, &serv->sv_permsocks, xpt_list) {
		if (strcmp(xprt->xpt_class->xcl_name, xcl_name))
			continue;
		if (af != AF_UNSPEC && af != xprt->xpt_local.ss_family)
			continue;
		if (port != 0 && port != svc_xprt_local_port(xprt))
			continue;
		found = xprt;
		svc_xprt_get(xprt);
		break;
	}
	spin_unlock_bh(&serv->sv_lock);
	return found;
}
EXPORT_SYMBOL_GPL(svc_find_xprt);
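/*
 * Illustrative sketch (not part of this file): a caller checking whether
 * a service already has a listener of a given class receives a referenced
 * transport from svc_find_xprt() and must drop that reference again with
 * svc_xprt_put():
 *
 *	struct svc_xprt *xprt;
 *
 *	xprt = svc_find_xprt(serv, "tcp", AF_UNSPEC, 0);
 *	if (xprt != NULL) {
 *		// a "tcp" listener exists on some port
 *		svc_xprt_put(xprt);
 *	}
 */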
static int svc_one_xprt_name(const struct svc_xprt *xprt,
			     char *pos, int remaining)
{
	int len;

	len = snprintf(pos, remaining, "%s %u\n",
			xprt->xpt_class->xcl_name,
			svc_xprt_local_port(xprt));
	if (len >= remaining)
		return -ENAMETOOLONG;
	return len;
}

/**
 * svc_xprt_names - format a buffer with a list of transport names
 * @serv: pointer to an RPC service
 * @buf: pointer to a buffer to be filled in
 * @buflen: length of buffer to be filled in
 *
 * Fills in @buf with a string containing a list of transport names,
 * each name terminated with '\n'.
 *
 * Returns positive length of the filled-in string on success; otherwise
 * a negative errno value is returned if an error occurs.
 */
int svc_xprt_names(struct svc_serv *serv, char *buf, const int buflen)
{
	struct svc_xprt *xprt;
	int len, totlen;
	char *pos;

	/* Sanity check args */
	if (!serv)
		return 0;

	spin_lock_bh(&serv->sv_lock);

	pos = buf;
	totlen = 0;
	list_for_each_entry(xprt, &serv->sv_permsocks, xpt_list) {
		len = svc_one_xprt_name(xprt, pos, buflen - totlen);
		if (len < 0) {
			*buf = '\0';
			totlen = len;
		}
		if (len <= 0)
			break;

		pos += len;
		totlen += len;
	}

	spin_unlock_bh(&serv->sv_lock);
	return totlen;
}
EXPORT_SYMBOL_GPL(svc_xprt_names);


/*----------------------------------------------------------------------------*/

static void *svc_pool_stats_start(struct seq_file *m, loff_t *pos)
{
	unsigned int pidx = (unsigned int)*pos;
	struct svc_serv *serv = m->private;

	dprintk("svc_pool_stats_start, *pidx=%u\n", pidx);

	if (!pidx)
		return SEQ_START_TOKEN;
	return (pidx > serv->sv_nrpools ? NULL : &serv->sv_pools[pidx-1]);
}

static void *svc_pool_stats_next(struct seq_file *m, void *p, loff_t *pos)
{
	struct svc_pool *pool = p;
	struct svc_serv *serv = m->private;

	dprintk("svc_pool_stats_next, *pos=%llu\n", *pos);

	if (p == SEQ_START_TOKEN) {
		pool = &serv->sv_pools[0];
	} else {
		unsigned int pidx = (pool - &serv->sv_pools[0]);
		if (pidx < serv->sv_nrpools-1)
			pool = &serv->sv_pools[pidx+1];
		else
			pool = NULL;
	}
	++*pos;
	return pool;
}

static void svc_pool_stats_stop(struct seq_file *m, void *p)
{
}

static int svc_pool_stats_show(struct seq_file *m, void *p)
{
	struct svc_pool *pool = p;

	if (p == SEQ_START_TOKEN) {
		seq_puts(m, "# pool packets-arrived sockets-enqueued threads-woken threads-timedout\n");
		return 0;
	}

	seq_printf(m, "%u %lu %lu %lu %lu\n",
		pool->sp_id,
		pool->sp_stats.packets,
		pool->sp_stats.sockets_queued,
		pool->sp_stats.threads_woken,
		pool->sp_stats.threads_timedout);

	return 0;
}

static const struct seq_operations svc_pool_stats_seq_ops = {
	.start	= svc_pool_stats_start,
	.next	= svc_pool_stats_next,
	.stop	= svc_pool_stats_stop,
	.show	= svc_pool_stats_show,
};

int svc_pool_stats_open(struct svc_serv *serv, struct file *file)
{
	int err;

	err = seq_open(file, &svc_pool_stats_seq_ops);
	if (!err)
		((struct seq_file *) file->private_data)->private = serv;
	return err;
}
EXPORT_SYMBOL(svc_pool_stats_open);

/*----------------------------------------------------------------------------*/
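/*
 * Illustrative sketch (not part of this file): a service exposes its pool
 * statistics by wiring svc_pool_stats_open() into the open method of a
 * procfs file, along the lines of nfsd's pool_stats file. The "foo" names
 * below are hypothetical.
 *
 *	static int foo_pool_stats_open(struct inode *inode, struct file *file)
 *	{
 *		return svc_pool_stats_open(foo_serv, file);
 *	}
 *
 *	static const struct file_operations foo_pool_stats_fops = {
 *		.owner		= THIS_MODULE,
 *		.open		= foo_pool_stats_open,
 *		.read		= seq_read,
 *		.llseek		= seq_lseek,
 *		.release	= seq_release,
 *	};
 */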