1 /* RxRPC individual remote procedure call handling 2 * 3 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved. 4 * Written by David Howells (dhowells@redhat.com) 5 * 6 * This program is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU General Public License 8 * as published by the Free Software Foundation; either version 9 * 2 of the License, or (at your option) any later version. 10 */ 11 12 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt 13 14 #include <linux/slab.h> 15 #include <linux/module.h> 16 #include <linux/circ_buf.h> 17 #include <linux/spinlock_types.h> 18 #include <net/sock.h> 19 #include <net/af_rxrpc.h> 20 #include "ar-internal.h" 21 22 /* 23 * Maximum lifetime of a call (in jiffies). 24 */ 25 unsigned int rxrpc_max_call_lifetime = 60 * HZ; 26 27 /* 28 * Time till dead call expires after last use (in jiffies). 29 */ 30 unsigned int rxrpc_dead_call_expiry = 2 * HZ; 31 32 const char *const rxrpc_call_states[NR__RXRPC_CALL_STATES] = { 33 [RXRPC_CALL_UNINITIALISED] = "Uninit ", 34 [RXRPC_CALL_CLIENT_AWAIT_CONN] = "ClWtConn", 35 [RXRPC_CALL_CLIENT_SEND_REQUEST] = "ClSndReq", 36 [RXRPC_CALL_CLIENT_AWAIT_REPLY] = "ClAwtRpl", 37 [RXRPC_CALL_CLIENT_RECV_REPLY] = "ClRcvRpl", 38 [RXRPC_CALL_CLIENT_FINAL_ACK] = "ClFnlACK", 39 [RXRPC_CALL_SERVER_SECURING] = "SvSecure", 40 [RXRPC_CALL_SERVER_ACCEPTING] = "SvAccept", 41 [RXRPC_CALL_SERVER_RECV_REQUEST] = "SvRcvReq", 42 [RXRPC_CALL_SERVER_ACK_REQUEST] = "SvAckReq", 43 [RXRPC_CALL_SERVER_SEND_REPLY] = "SvSndRpl", 44 [RXRPC_CALL_SERVER_AWAIT_ACK] = "SvAwtACK", 45 [RXRPC_CALL_COMPLETE] = "Complete", 46 [RXRPC_CALL_DEAD] = "Dead ", 47 }; 48 49 const char *const rxrpc_call_completions[NR__RXRPC_CALL_COMPLETIONS] = { 50 [RXRPC_CALL_SUCCEEDED] = "Complete", 51 [RXRPC_CALL_SERVER_BUSY] = "SvBusy ", 52 [RXRPC_CALL_REMOTELY_ABORTED] = "RmtAbort", 53 [RXRPC_CALL_LOCALLY_ABORTED] = "LocAbort", 54 [RXRPC_CALL_LOCAL_ERROR] = "LocError", 55 [RXRPC_CALL_NETWORK_ERROR] = "NetError", 56 }; 57 58 const char rxrpc_call_traces[rxrpc_call__nr_trace][4] = { 59 [rxrpc_call_new_client] = "NWc", 60 [rxrpc_call_new_service] = "NWs", 61 [rxrpc_call_queued] = "QUE", 62 [rxrpc_call_queued_ref] = "QUR", 63 [rxrpc_call_seen] = "SEE", 64 [rxrpc_call_got] = "GOT", 65 [rxrpc_call_got_skb] = "Gsk", 66 [rxrpc_call_got_userid] = "Gus", 67 [rxrpc_call_put] = "PUT", 68 [rxrpc_call_put_skb] = "Psk", 69 [rxrpc_call_put_userid] = "Pus", 70 [rxrpc_call_put_noqueue] = "PNQ", 71 }; 72 73 struct kmem_cache *rxrpc_call_jar; 74 LIST_HEAD(rxrpc_calls); 75 DEFINE_RWLOCK(rxrpc_call_lock); 76 77 static void rxrpc_destroy_call(struct work_struct *work); 78 static void rxrpc_call_life_expired(unsigned long _call); 79 static void rxrpc_dead_call_expired(unsigned long _call); 80 static void rxrpc_ack_time_expired(unsigned long _call); 81 static void rxrpc_resend_time_expired(unsigned long _call); 82 83 /* 84 * find an extant server call 85 * - called in process context with IRQs enabled 86 */ 87 struct rxrpc_call *rxrpc_find_call_by_user_ID(struct rxrpc_sock *rx, 88 unsigned long user_call_ID) 89 { 90 struct rxrpc_call *call; 91 struct rb_node *p; 92 93 _enter("%p,%lx", rx, user_call_ID); 94 95 read_lock(&rx->call_lock); 96 97 p = rx->calls.rb_node; 98 while (p) { 99 call = rb_entry(p, struct rxrpc_call, sock_node); 100 101 if (user_call_ID < call->user_call_ID) 102 p = p->rb_left; 103 else if (user_call_ID > call->user_call_ID) 104 p = p->rb_right; 105 else 106 goto found_extant_call; 107 } 108 109 read_unlock(&rx->call_lock); 110 _leave(" = NULL"); 111 return NULL; 112 113 found_extant_call: 114 rxrpc_get_call(call, rxrpc_call_got); 115 read_unlock(&rx->call_lock); 116 _leave(" = %p [%d]", call, atomic_read(&call->usage)); 117 return call; 118 } 119 120 /* 121 * allocate a new call 122 */ 123 static struct rxrpc_call *rxrpc_alloc_call(gfp_t gfp) 124 { 125 struct rxrpc_call *call; 126 127 call = kmem_cache_zalloc(rxrpc_call_jar, gfp); 128 if (!call) 129 return NULL; 130 131 call->acks_winsz = 16; 132 call->acks_window = kmalloc(call->acks_winsz * sizeof(unsigned long), 133 gfp); 134 if (!call->acks_window) { 135 kmem_cache_free(rxrpc_call_jar, call); 136 return NULL; 137 } 138 139 setup_timer(&call->lifetimer, &rxrpc_call_life_expired, 140 (unsigned long) call); 141 setup_timer(&call->deadspan, &rxrpc_dead_call_expired, 142 (unsigned long) call); 143 setup_timer(&call->ack_timer, &rxrpc_ack_time_expired, 144 (unsigned long) call); 145 setup_timer(&call->resend_timer, &rxrpc_resend_time_expired, 146 (unsigned long) call); 147 INIT_WORK(&call->destroyer, &rxrpc_destroy_call); 148 INIT_WORK(&call->processor, &rxrpc_process_call); 149 INIT_LIST_HEAD(&call->link); 150 INIT_LIST_HEAD(&call->chan_wait_link); 151 INIT_LIST_HEAD(&call->accept_link); 152 skb_queue_head_init(&call->rx_queue); 153 skb_queue_head_init(&call->rx_oos_queue); 154 skb_queue_head_init(&call->knlrecv_queue); 155 init_waitqueue_head(&call->waitq); 156 spin_lock_init(&call->lock); 157 rwlock_init(&call->state_lock); 158 atomic_set(&call->usage, 1); 159 call->debug_id = atomic_inc_return(&rxrpc_debug_id); 160 161 memset(&call->sock_node, 0xed, sizeof(call->sock_node)); 162 163 call->rx_data_expect = 1; 164 call->rx_data_eaten = 0; 165 call->rx_first_oos = 0; 166 call->ackr_win_top = call->rx_data_eaten + 1 + rxrpc_rx_window_size; 167 call->creation_jif = jiffies; 168 return call; 169 } 170 171 /* 172 * Allocate a new client call. 173 */ 174 static struct rxrpc_call *rxrpc_alloc_client_call(struct rxrpc_sock *rx, 175 struct sockaddr_rxrpc *srx, 176 gfp_t gfp) 177 { 178 struct rxrpc_call *call; 179 180 _enter(""); 181 182 ASSERT(rx->local != NULL); 183 184 call = rxrpc_alloc_call(gfp); 185 if (!call) 186 return ERR_PTR(-ENOMEM); 187 call->state = RXRPC_CALL_CLIENT_AWAIT_CONN; 188 189 sock_hold(&rx->sk); 190 call->socket = rx; 191 call->rx_data_post = 1; 192 call->service_id = srx->srx_service; 193 194 _leave(" = %p", call); 195 return call; 196 } 197 198 /* 199 * Begin client call. 200 */ 201 static int rxrpc_begin_client_call(struct rxrpc_call *call, 202 struct rxrpc_conn_parameters *cp, 203 struct sockaddr_rxrpc *srx, 204 gfp_t gfp) 205 { 206 int ret; 207 208 /* Set up or get a connection record and set the protocol parameters, 209 * including channel number and call ID. 210 */ 211 ret = rxrpc_connect_call(call, cp, srx, gfp); 212 if (ret < 0) 213 return ret; 214 215 spin_lock(&call->conn->params.peer->lock); 216 hlist_add_head(&call->error_link, &call->conn->params.peer->error_targets); 217 spin_unlock(&call->conn->params.peer->lock); 218 219 call->lifetimer.expires = jiffies + rxrpc_max_call_lifetime; 220 add_timer(&call->lifetimer); 221 return 0; 222 } 223 224 /* 225 * set up a call for the given data 226 * - called in process context with IRQs enabled 227 */ 228 struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx, 229 struct rxrpc_conn_parameters *cp, 230 struct sockaddr_rxrpc *srx, 231 unsigned long user_call_ID, 232 gfp_t gfp) 233 { 234 struct rxrpc_call *call, *xcall; 235 struct rb_node *parent, **pp; 236 const void *here = __builtin_return_address(0); 237 int ret; 238 239 _enter("%p,%lx", rx, user_call_ID); 240 241 call = rxrpc_alloc_client_call(rx, srx, gfp); 242 if (IS_ERR(call)) { 243 _leave(" = %ld", PTR_ERR(call)); 244 return call; 245 } 246 247 trace_rxrpc_call(call, 0, atomic_read(&call->usage), 0, here, 248 (const void *)user_call_ID); 249 250 /* Publish the call, even though it is incompletely set up as yet */ 251 call->user_call_ID = user_call_ID; 252 __set_bit(RXRPC_CALL_HAS_USERID, &call->flags); 253 254 write_lock(&rx->call_lock); 255 256 pp = &rx->calls.rb_node; 257 parent = NULL; 258 while (*pp) { 259 parent = *pp; 260 xcall = rb_entry(parent, struct rxrpc_call, sock_node); 261 262 if (user_call_ID < xcall->user_call_ID) 263 pp = &(*pp)->rb_left; 264 else if (user_call_ID > xcall->user_call_ID) 265 pp = &(*pp)->rb_right; 266 else 267 goto found_user_ID_now_present; 268 } 269 270 rxrpc_get_call(call, rxrpc_call_got_userid); 271 rb_link_node(&call->sock_node, parent, pp); 272 rb_insert_color(&call->sock_node, &rx->calls); 273 write_unlock(&rx->call_lock); 274 275 write_lock_bh(&rxrpc_call_lock); 276 list_add_tail(&call->link, &rxrpc_calls); 277 write_unlock_bh(&rxrpc_call_lock); 278 279 ret = rxrpc_begin_client_call(call, cp, srx, gfp); 280 if (ret < 0) 281 goto error; 282 283 _net("CALL new %d on CONN %d", call->debug_id, call->conn->debug_id); 284 285 _leave(" = %p [new]", call); 286 return call; 287 288 error: 289 write_lock(&rx->call_lock); 290 rb_erase(&call->sock_node, &rx->calls); 291 write_unlock(&rx->call_lock); 292 rxrpc_put_call(call, rxrpc_call_put_userid); 293 294 write_lock_bh(&rxrpc_call_lock); 295 list_del_init(&call->link); 296 write_unlock_bh(&rxrpc_call_lock); 297 298 set_bit(RXRPC_CALL_RELEASED, &call->flags); 299 call->state = RXRPC_CALL_DEAD; 300 rxrpc_put_call(call, rxrpc_call_put); 301 _leave(" = %d", ret); 302 return ERR_PTR(ret); 303 304 /* We unexpectedly found the user ID in the list after taking 305 * the call_lock. This shouldn't happen unless the user races 306 * with itself and tries to add the same user ID twice at the 307 * same time in different threads. 308 */ 309 found_user_ID_now_present: 310 write_unlock(&rx->call_lock); 311 set_bit(RXRPC_CALL_RELEASED, &call->flags); 312 call->state = RXRPC_CALL_DEAD; 313 rxrpc_put_call(call, rxrpc_call_put); 314 _leave(" = -EEXIST [%p]", call); 315 return ERR_PTR(-EEXIST); 316 } 317 318 /* 319 * set up an incoming call 320 * - called in process context with IRQs enabled 321 */ 322 struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx, 323 struct rxrpc_connection *conn, 324 struct sk_buff *skb) 325 { 326 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 327 struct rxrpc_call *call, *candidate; 328 const void *here = __builtin_return_address(0); 329 u32 call_id, chan; 330 331 _enter(",%d", conn->debug_id); 332 333 ASSERT(rx != NULL); 334 335 candidate = rxrpc_alloc_call(GFP_NOIO); 336 if (!candidate) 337 return ERR_PTR(-EBUSY); 338 339 trace_rxrpc_call(candidate, rxrpc_call_new_service, 340 atomic_read(&candidate->usage), 0, here, NULL); 341 342 chan = sp->hdr.cid & RXRPC_CHANNELMASK; 343 candidate->socket = rx; 344 candidate->conn = conn; 345 candidate->peer = conn->params.peer; 346 candidate->cid = sp->hdr.cid; 347 candidate->call_id = sp->hdr.callNumber; 348 candidate->rx_data_post = 0; 349 candidate->state = RXRPC_CALL_SERVER_ACCEPTING; 350 candidate->flags |= (1 << RXRPC_CALL_IS_SERVICE); 351 if (conn->security_ix > 0) 352 candidate->state = RXRPC_CALL_SERVER_SECURING; 353 354 spin_lock(&conn->channel_lock); 355 356 /* set the channel for this call */ 357 call = rcu_dereference_protected(conn->channels[chan].call, 358 lockdep_is_held(&conn->channel_lock)); 359 360 _debug("channel[%u] is %p", candidate->cid & RXRPC_CHANNELMASK, call); 361 if (call && call->call_id == sp->hdr.callNumber) { 362 /* already set; must've been a duplicate packet */ 363 _debug("extant call [%d]", call->state); 364 ASSERTCMP(call->conn, ==, conn); 365 366 read_lock(&call->state_lock); 367 switch (call->state) { 368 case RXRPC_CALL_LOCALLY_ABORTED: 369 if (!test_and_set_bit(RXRPC_CALL_EV_ABORT, &call->events)) 370 rxrpc_queue_call(call); 371 case RXRPC_CALL_REMOTELY_ABORTED: 372 read_unlock(&call->state_lock); 373 goto aborted_call; 374 default: 375 rxrpc_get_call(call, rxrpc_call_got); 376 read_unlock(&call->state_lock); 377 goto extant_call; 378 } 379 } 380 381 if (call) { 382 /* it seems the channel is still in use from the previous call 383 * - ditch the old binding if its call is now complete */ 384 _debug("CALL: %u { %s }", 385 call->debug_id, rxrpc_call_states[call->state]); 386 387 if (call->state == RXRPC_CALL_COMPLETE) { 388 __rxrpc_disconnect_call(conn, call); 389 } else { 390 spin_unlock(&conn->channel_lock); 391 kmem_cache_free(rxrpc_call_jar, candidate); 392 _leave(" = -EBUSY"); 393 return ERR_PTR(-EBUSY); 394 } 395 } 396 397 /* check the call number isn't duplicate */ 398 _debug("check dup"); 399 call_id = sp->hdr.callNumber; 400 401 /* We just ignore calls prior to the current call ID. Terminated calls 402 * are handled via the connection. 403 */ 404 if (call_id <= conn->channels[chan].call_counter) 405 goto old_call; /* TODO: Just drop packet */ 406 407 /* make the call available */ 408 _debug("new call"); 409 call = candidate; 410 candidate = NULL; 411 conn->channels[chan].call_counter = call_id; 412 rcu_assign_pointer(conn->channels[chan].call, call); 413 sock_hold(&rx->sk); 414 rxrpc_get_connection(conn); 415 rxrpc_get_peer(call->peer); 416 spin_unlock(&conn->channel_lock); 417 418 spin_lock(&conn->params.peer->lock); 419 hlist_add_head(&call->error_link, &conn->params.peer->error_targets); 420 spin_unlock(&conn->params.peer->lock); 421 422 write_lock_bh(&rxrpc_call_lock); 423 list_add_tail(&call->link, &rxrpc_calls); 424 write_unlock_bh(&rxrpc_call_lock); 425 426 call->service_id = conn->params.service_id; 427 428 _net("CALL incoming %d on CONN %d", call->debug_id, call->conn->debug_id); 429 430 call->lifetimer.expires = jiffies + rxrpc_max_call_lifetime; 431 add_timer(&call->lifetimer); 432 _leave(" = %p {%d} [new]", call, call->debug_id); 433 return call; 434 435 extant_call: 436 spin_unlock(&conn->channel_lock); 437 kmem_cache_free(rxrpc_call_jar, candidate); 438 _leave(" = %p {%d} [extant]", call, call ? call->debug_id : -1); 439 return call; 440 441 aborted_call: 442 spin_unlock(&conn->channel_lock); 443 kmem_cache_free(rxrpc_call_jar, candidate); 444 _leave(" = -ECONNABORTED"); 445 return ERR_PTR(-ECONNABORTED); 446 447 old_call: 448 spin_unlock(&conn->channel_lock); 449 kmem_cache_free(rxrpc_call_jar, candidate); 450 _leave(" = -ECONNRESET [old]"); 451 return ERR_PTR(-ECONNRESET); 452 } 453 454 /* 455 * Note the re-emergence of a call. 456 */ 457 void rxrpc_see_call(struct rxrpc_call *call) 458 { 459 const void *here = __builtin_return_address(0); 460 if (call) { 461 int n = atomic_read(&call->usage); 462 int m = atomic_read(&call->skb_count); 463 464 trace_rxrpc_call(call, rxrpc_call_seen, n, m, here, NULL); 465 } 466 } 467 468 /* 469 * Note the addition of a ref on a call. 470 */ 471 void rxrpc_get_call(struct rxrpc_call *call, enum rxrpc_call_trace op) 472 { 473 const void *here = __builtin_return_address(0); 474 int n = atomic_inc_return(&call->usage); 475 int m = atomic_read(&call->skb_count); 476 477 trace_rxrpc_call(call, op, n, m, here, NULL); 478 } 479 480 /* 481 * Note the addition of a ref on a call for a socket buffer. 482 */ 483 void rxrpc_get_call_for_skb(struct rxrpc_call *call, struct sk_buff *skb) 484 { 485 const void *here = __builtin_return_address(0); 486 int n = atomic_inc_return(&call->usage); 487 int m = atomic_inc_return(&call->skb_count); 488 489 trace_rxrpc_call(call, rxrpc_call_got_skb, n, m, here, skb); 490 } 491 492 /* 493 * detach a call from a socket and set up for release 494 */ 495 void rxrpc_release_call(struct rxrpc_call *call) 496 { 497 struct rxrpc_connection *conn = call->conn; 498 struct rxrpc_sock *rx = call->socket; 499 500 _enter("{%d,%d,%d,%d}", 501 call->debug_id, atomic_read(&call->usage), 502 atomic_read(&call->ackr_not_idle), 503 call->rx_first_oos); 504 505 rxrpc_see_call(call); 506 507 spin_lock_bh(&call->lock); 508 if (test_and_set_bit(RXRPC_CALL_RELEASED, &call->flags)) 509 BUG(); 510 spin_unlock_bh(&call->lock); 511 512 /* dissociate from the socket 513 * - the socket's ref on the call is passed to the death timer 514 */ 515 _debug("RELEASE CALL %p (%d CONN %p)", call, call->debug_id, conn); 516 517 spin_lock(&conn->params.peer->lock); 518 hlist_del_init(&call->error_link); 519 spin_unlock(&conn->params.peer->lock); 520 521 write_lock_bh(&rx->call_lock); 522 if (!list_empty(&call->accept_link)) { 523 _debug("unlinking once-pending call %p { e=%lx f=%lx }", 524 call, call->events, call->flags); 525 ASSERT(!test_bit(RXRPC_CALL_HAS_USERID, &call->flags)); 526 list_del_init(&call->accept_link); 527 sk_acceptq_removed(&rx->sk); 528 } else if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) { 529 rb_erase(&call->sock_node, &rx->calls); 530 memset(&call->sock_node, 0xdd, sizeof(call->sock_node)); 531 clear_bit(RXRPC_CALL_HAS_USERID, &call->flags); 532 } 533 write_unlock_bh(&rx->call_lock); 534 535 /* free up the channel for reuse */ 536 write_lock_bh(&call->state_lock); 537 538 if (call->state < RXRPC_CALL_COMPLETE && 539 call->state != RXRPC_CALL_CLIENT_FINAL_ACK) { 540 _debug("+++ ABORTING STATE %d +++\n", call->state); 541 __rxrpc_abort_call(call, RX_CALL_DEAD, ECONNRESET); 542 } 543 write_unlock_bh(&call->state_lock); 544 545 rxrpc_disconnect_call(call); 546 547 /* clean up the Rx queue */ 548 if (!skb_queue_empty(&call->rx_queue) || 549 !skb_queue_empty(&call->rx_oos_queue)) { 550 struct rxrpc_skb_priv *sp; 551 struct sk_buff *skb; 552 553 _debug("purge Rx queues"); 554 555 spin_lock_bh(&call->lock); 556 while ((skb = skb_dequeue(&call->rx_queue)) || 557 (skb = skb_dequeue(&call->rx_oos_queue))) { 558 spin_unlock_bh(&call->lock); 559 560 sp = rxrpc_skb(skb); 561 _debug("- zap %s %%%u #%u", 562 rxrpc_pkts[sp->hdr.type], 563 sp->hdr.serial, sp->hdr.seq); 564 rxrpc_free_skb(skb); 565 spin_lock_bh(&call->lock); 566 } 567 spin_unlock_bh(&call->lock); 568 } 569 570 del_timer_sync(&call->resend_timer); 571 del_timer_sync(&call->ack_timer); 572 del_timer_sync(&call->lifetimer); 573 call->deadspan.expires = jiffies + rxrpc_dead_call_expiry; 574 add_timer(&call->deadspan); 575 576 _leave(""); 577 } 578 579 /* 580 * handle a dead call being ready for reaping 581 */ 582 static void rxrpc_dead_call_expired(unsigned long _call) 583 { 584 struct rxrpc_call *call = (struct rxrpc_call *) _call; 585 586 _enter("{%d}", call->debug_id); 587 588 rxrpc_see_call(call); 589 write_lock_bh(&call->state_lock); 590 call->state = RXRPC_CALL_DEAD; 591 write_unlock_bh(&call->state_lock); 592 rxrpc_put_call(call, rxrpc_call_put); 593 } 594 595 /* 596 * mark a call as to be released, aborting it if it's still in progress 597 * - called with softirqs disabled 598 */ 599 static void rxrpc_mark_call_released(struct rxrpc_call *call) 600 { 601 bool sched = false; 602 603 rxrpc_see_call(call); 604 write_lock(&call->state_lock); 605 if (call->state < RXRPC_CALL_DEAD) { 606 sched = __rxrpc_abort_call(call, RX_CALL_DEAD, ECONNRESET); 607 if (!test_and_set_bit(RXRPC_CALL_EV_RELEASE, &call->events)) 608 sched = true; 609 } 610 write_unlock(&call->state_lock); 611 if (sched) 612 rxrpc_queue_call(call); 613 } 614 615 /* 616 * release all the calls associated with a socket 617 */ 618 void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx) 619 { 620 struct rxrpc_call *call; 621 struct rb_node *p; 622 623 _enter("%p", rx); 624 625 read_lock_bh(&rx->call_lock); 626 627 /* kill the not-yet-accepted incoming calls */ 628 list_for_each_entry(call, &rx->secureq, accept_link) { 629 rxrpc_mark_call_released(call); 630 } 631 632 list_for_each_entry(call, &rx->acceptq, accept_link) { 633 rxrpc_mark_call_released(call); 634 } 635 636 /* mark all the calls as no longer wanting incoming packets */ 637 for (p = rb_first(&rx->calls); p; p = rb_next(p)) { 638 call = rb_entry(p, struct rxrpc_call, sock_node); 639 rxrpc_mark_call_released(call); 640 } 641 642 read_unlock_bh(&rx->call_lock); 643 _leave(""); 644 } 645 646 /* 647 * release a call 648 */ 649 void rxrpc_put_call(struct rxrpc_call *call, enum rxrpc_call_trace op) 650 { 651 const void *here = __builtin_return_address(0); 652 int n, m; 653 654 ASSERT(call != NULL); 655 656 n = atomic_dec_return(&call->usage); 657 m = atomic_read(&call->skb_count); 658 trace_rxrpc_call(call, op, n, m, here, NULL); 659 ASSERTCMP(n, >=, 0); 660 if (n == 0) { 661 _debug("call %d dead", call->debug_id); 662 WARN_ON(m != 0); 663 ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD); 664 rxrpc_queue_work(&call->destroyer); 665 } 666 } 667 668 /* 669 * Release a call ref held by a socket buffer. 670 */ 671 void rxrpc_put_call_for_skb(struct rxrpc_call *call, struct sk_buff *skb) 672 { 673 const void *here = __builtin_return_address(0); 674 int n, m; 675 676 n = atomic_dec_return(&call->usage); 677 m = atomic_dec_return(&call->skb_count); 678 trace_rxrpc_call(call, rxrpc_call_put_skb, n, m, here, skb); 679 ASSERTCMP(n, >=, 0); 680 if (n == 0) { 681 _debug("call %d dead", call->debug_id); 682 WARN_ON(m != 0); 683 ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD); 684 rxrpc_queue_work(&call->destroyer); 685 } 686 } 687 688 /* 689 * Final call destruction under RCU. 690 */ 691 static void rxrpc_rcu_destroy_call(struct rcu_head *rcu) 692 { 693 struct rxrpc_call *call = container_of(rcu, struct rxrpc_call, rcu); 694 695 rxrpc_purge_queue(&call->rx_queue); 696 rxrpc_purge_queue(&call->knlrecv_queue); 697 rxrpc_put_peer(call->peer); 698 kmem_cache_free(rxrpc_call_jar, call); 699 } 700 701 /* 702 * clean up a call 703 */ 704 static void rxrpc_cleanup_call(struct rxrpc_call *call) 705 { 706 _net("DESTROY CALL %d", call->debug_id); 707 708 ASSERT(call->socket); 709 710 memset(&call->sock_node, 0xcd, sizeof(call->sock_node)); 711 712 del_timer_sync(&call->lifetimer); 713 del_timer_sync(&call->deadspan); 714 del_timer_sync(&call->ack_timer); 715 del_timer_sync(&call->resend_timer); 716 717 ASSERT(test_bit(RXRPC_CALL_RELEASED, &call->flags)); 718 ASSERTCMP(call->events, ==, 0); 719 if (work_pending(&call->processor)) { 720 _debug("defer destroy"); 721 rxrpc_queue_work(&call->destroyer); 722 return; 723 } 724 725 ASSERTCMP(call->conn, ==, NULL); 726 727 if (call->acks_window) { 728 _debug("kill Tx window %d", 729 CIRC_CNT(call->acks_head, call->acks_tail, 730 call->acks_winsz)); 731 smp_mb(); 732 while (CIRC_CNT(call->acks_head, call->acks_tail, 733 call->acks_winsz) > 0) { 734 struct rxrpc_skb_priv *sp; 735 unsigned long _skb; 736 737 _skb = call->acks_window[call->acks_tail] & ~1; 738 sp = rxrpc_skb((struct sk_buff *)_skb); 739 _debug("+++ clear Tx %u", sp->hdr.seq); 740 rxrpc_free_skb((struct sk_buff *)_skb); 741 call->acks_tail = 742 (call->acks_tail + 1) & (call->acks_winsz - 1); 743 } 744 745 kfree(call->acks_window); 746 } 747 748 rxrpc_free_skb(call->tx_pending); 749 750 rxrpc_purge_queue(&call->rx_queue); 751 ASSERT(skb_queue_empty(&call->rx_oos_queue)); 752 rxrpc_purge_queue(&call->knlrecv_queue); 753 sock_put(&call->socket->sk); 754 call_rcu(&call->rcu, rxrpc_rcu_destroy_call); 755 } 756 757 /* 758 * destroy a call 759 */ 760 static void rxrpc_destroy_call(struct work_struct *work) 761 { 762 struct rxrpc_call *call = 763 container_of(work, struct rxrpc_call, destroyer); 764 765 _enter("%p{%d,%x,%p}", 766 call, atomic_read(&call->usage), call->cid, call->conn); 767 768 ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD); 769 770 write_lock_bh(&rxrpc_call_lock); 771 list_del_init(&call->link); 772 write_unlock_bh(&rxrpc_call_lock); 773 774 rxrpc_cleanup_call(call); 775 _leave(""); 776 } 777 778 /* 779 * preemptively destroy all the call records from a transport endpoint rather 780 * than waiting for them to time out 781 */ 782 void __exit rxrpc_destroy_all_calls(void) 783 { 784 struct rxrpc_call *call; 785 786 _enter(""); 787 write_lock_bh(&rxrpc_call_lock); 788 789 while (!list_empty(&rxrpc_calls)) { 790 call = list_entry(rxrpc_calls.next, struct rxrpc_call, link); 791 _debug("Zapping call %p", call); 792 793 rxrpc_see_call(call); 794 list_del_init(&call->link); 795 796 switch (atomic_read(&call->usage)) { 797 case 0: 798 ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD); 799 break; 800 case 1: 801 if (del_timer_sync(&call->deadspan) != 0 && 802 call->state != RXRPC_CALL_DEAD) 803 rxrpc_dead_call_expired((unsigned long) call); 804 if (call->state != RXRPC_CALL_DEAD) 805 break; 806 default: 807 pr_err("Call %p still in use (%d,%d,%s,%lx,%lx)!\n", 808 call, atomic_read(&call->usage), 809 atomic_read(&call->ackr_not_idle), 810 rxrpc_call_states[call->state], 811 call->flags, call->events); 812 if (!skb_queue_empty(&call->rx_queue)) 813 pr_err("Rx queue occupied\n"); 814 if (!skb_queue_empty(&call->rx_oos_queue)) 815 pr_err("OOS queue occupied\n"); 816 break; 817 } 818 819 write_unlock_bh(&rxrpc_call_lock); 820 cond_resched(); 821 write_lock_bh(&rxrpc_call_lock); 822 } 823 824 write_unlock_bh(&rxrpc_call_lock); 825 _leave(""); 826 } 827 828 /* 829 * handle call lifetime being exceeded 830 */ 831 static void rxrpc_call_life_expired(unsigned long _call) 832 { 833 struct rxrpc_call *call = (struct rxrpc_call *) _call; 834 835 _enter("{%d}", call->debug_id); 836 837 rxrpc_see_call(call); 838 if (call->state >= RXRPC_CALL_COMPLETE) 839 return; 840 841 set_bit(RXRPC_CALL_EV_LIFE_TIMER, &call->events); 842 rxrpc_queue_call(call); 843 } 844 845 /* 846 * handle resend timer expiry 847 * - may not take call->state_lock as this can deadlock against del_timer_sync() 848 */ 849 static void rxrpc_resend_time_expired(unsigned long _call) 850 { 851 struct rxrpc_call *call = (struct rxrpc_call *) _call; 852 853 _enter("{%d}", call->debug_id); 854 855 rxrpc_see_call(call); 856 if (call->state >= RXRPC_CALL_COMPLETE) 857 return; 858 859 clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags); 860 if (!test_and_set_bit(RXRPC_CALL_EV_RESEND_TIMER, &call->events)) 861 rxrpc_queue_call(call); 862 } 863 864 /* 865 * handle ACK timer expiry 866 */ 867 static void rxrpc_ack_time_expired(unsigned long _call) 868 { 869 struct rxrpc_call *call = (struct rxrpc_call *) _call; 870 871 _enter("{%d}", call->debug_id); 872 873 rxrpc_see_call(call); 874 if (call->state >= RXRPC_CALL_COMPLETE) 875 return; 876 877 if (!test_and_set_bit(RXRPC_CALL_EV_ACK, &call->events)) 878 rxrpc_queue_call(call); 879 } 880