1 // SPDX-License-Identifier: GPL-2.0-or-later 2 /* Management of Tx window, Tx resend, ACKs and out-of-sequence reception 3 * 4 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved. 5 * Written by David Howells (dhowells@redhat.com) 6 */ 7 8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt 9 10 #include <linux/module.h> 11 #include <linux/circ_buf.h> 12 #include <linux/net.h> 13 #include <linux/skbuff.h> 14 #include <linux/slab.h> 15 #include <linux/udp.h> 16 #include <net/sock.h> 17 #include <net/af_rxrpc.h> 18 #include "ar-internal.h" 19 20 /* 21 * Propose a PING ACK be sent. 22 */ 23 void rxrpc_propose_ping(struct rxrpc_call *call, u32 serial, 24 enum rxrpc_propose_ack_trace why) 25 { 26 unsigned long now = jiffies; 27 unsigned long ping_at = now + rxrpc_idle_ack_delay; 28 29 if (time_before(ping_at, call->ping_at)) { 30 WRITE_ONCE(call->ping_at, ping_at); 31 rxrpc_reduce_call_timer(call, ping_at, now, 32 rxrpc_timer_set_for_ping); 33 trace_rxrpc_propose_ack(call, why, RXRPC_ACK_PING, serial); 34 } 35 } 36 37 /* 38 * Propose a DELAY ACK be sent in the future. 39 */ 40 void rxrpc_propose_delay_ACK(struct rxrpc_call *call, rxrpc_serial_t serial, 41 enum rxrpc_propose_ack_trace why) 42 { 43 unsigned long expiry = rxrpc_soft_ack_delay; 44 unsigned long now = jiffies, ack_at; 45 46 if (rxrpc_soft_ack_delay < expiry) 47 expiry = rxrpc_soft_ack_delay; 48 if (call->peer->srtt_us != 0) 49 ack_at = usecs_to_jiffies(call->peer->srtt_us >> 3); 50 else 51 ack_at = expiry; 52 53 ack_at += READ_ONCE(call->tx_backoff); 54 ack_at += now; 55 if (time_before(ack_at, call->delay_ack_at)) { 56 WRITE_ONCE(call->delay_ack_at, ack_at); 57 rxrpc_reduce_call_timer(call, ack_at, now, 58 rxrpc_timer_set_for_ack); 59 } 60 61 trace_rxrpc_propose_ack(call, why, RXRPC_ACK_DELAY, serial); 62 } 63 64 /* 65 * Queue an ACK for immediate transmission. 66 */ 67 void rxrpc_send_ACK(struct rxrpc_call *call, u8 ack_reason, 68 rxrpc_serial_t serial, enum rxrpc_propose_ack_trace why) 69 { 70 struct rxrpc_txbuf *txb; 71 72 if (test_bit(RXRPC_CALL_DISCONNECTED, &call->flags)) 73 return; 74 75 rxrpc_inc_stat(call->rxnet, stat_tx_acks[ack_reason]); 76 77 txb = rxrpc_alloc_txbuf(call, RXRPC_PACKET_TYPE_ACK, 78 rcu_read_lock_held() ? GFP_ATOMIC | __GFP_NOWARN : GFP_NOFS); 79 if (!txb) { 80 kleave(" = -ENOMEM"); 81 return; 82 } 83 84 txb->ack_why = why; 85 txb->wire.seq = 0; 86 txb->wire.type = RXRPC_PACKET_TYPE_ACK; 87 txb->wire.flags |= RXRPC_SLOW_START_OK; 88 txb->ack.bufferSpace = 0; 89 txb->ack.maxSkew = 0; 90 txb->ack.firstPacket = 0; 91 txb->ack.previousPacket = 0; 92 txb->ack.serial = htonl(serial); 93 txb->ack.reason = ack_reason; 94 txb->ack.nAcks = 0; 95 96 trace_rxrpc_send_ack(call, why, ack_reason, serial); 97 rxrpc_send_ack_packet(call, txb); 98 rxrpc_put_txbuf(txb, rxrpc_txbuf_put_ack_tx); 99 } 100 101 /* 102 * Handle congestion being detected by the retransmit timeout. 103 */ 104 static void rxrpc_congestion_timeout(struct rxrpc_call *call) 105 { 106 set_bit(RXRPC_CALL_RETRANS_TIMEOUT, &call->flags); 107 } 108 109 /* 110 * Perform retransmission of NAK'd and unack'd packets. 111 */ 112 void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb) 113 { 114 struct rxrpc_ackpacket *ack = NULL; 115 struct rxrpc_skb_priv *sp; 116 struct rxrpc_txbuf *txb; 117 unsigned long resend_at; 118 rxrpc_seq_t transmitted = READ_ONCE(call->tx_transmitted); 119 ktime_t now, max_age, oldest, ack_ts; 120 bool unacked = false; 121 unsigned int i; 122 LIST_HEAD(retrans_queue); 123 124 _enter("{%d,%d}", call->acks_hard_ack, call->tx_top); 125 126 now = ktime_get_real(); 127 max_age = ktime_sub_us(now, jiffies_to_usecs(call->peer->rto_j)); 128 oldest = now; 129 130 if (list_empty(&call->tx_buffer)) 131 goto no_resend; 132 133 if (list_empty(&call->tx_buffer)) 134 goto no_further_resend; 135 136 trace_rxrpc_resend(call, ack_skb); 137 txb = list_first_entry(&call->tx_buffer, struct rxrpc_txbuf, call_link); 138 139 /* Scan the soft ACK table without dropping the lock and resend any 140 * explicitly NAK'd packets. 141 */ 142 if (ack_skb) { 143 sp = rxrpc_skb(ack_skb); 144 ack = (void *)ack_skb->data + sizeof(struct rxrpc_wire_header); 145 146 for (i = 0; i < sp->nr_acks; i++) { 147 rxrpc_seq_t seq; 148 149 if (ack->acks[i] & 1) 150 continue; 151 seq = sp->first_ack + i; 152 if (after(txb->seq, transmitted)) 153 break; 154 if (after(txb->seq, seq)) 155 continue; /* A new hard ACK probably came in */ 156 list_for_each_entry_from(txb, &call->tx_buffer, call_link) { 157 if (txb->seq == seq) 158 goto found_txb; 159 } 160 goto no_further_resend; 161 162 found_txb: 163 if (after(ntohl(txb->wire.serial), call->acks_highest_serial)) 164 continue; /* Ack point not yet reached */ 165 166 rxrpc_see_txbuf(txb, rxrpc_txbuf_see_unacked); 167 168 if (list_empty(&txb->tx_link)) { 169 list_add_tail(&txb->tx_link, &retrans_queue); 170 set_bit(RXRPC_TXBUF_RESENT, &txb->flags); 171 } 172 173 trace_rxrpc_retransmit(call, txb->seq, 174 ktime_to_ns(ktime_sub(txb->last_sent, 175 max_age))); 176 177 if (list_is_last(&txb->call_link, &call->tx_buffer)) 178 goto no_further_resend; 179 txb = list_next_entry(txb, call_link); 180 } 181 } 182 183 /* Fast-forward through the Tx queue to the point the peer says it has 184 * seen. Anything between the soft-ACK table and that point will get 185 * ACK'd or NACK'd in due course, so don't worry about it here; here we 186 * need to consider retransmitting anything beyond that point. 187 * 188 * Note that ACK for a packet can beat the update of tx_transmitted. 189 */ 190 if (after_eq(READ_ONCE(call->acks_prev_seq), READ_ONCE(call->tx_transmitted))) 191 goto no_further_resend; 192 193 list_for_each_entry_from(txb, &call->tx_buffer, call_link) { 194 if (before_eq(txb->seq, READ_ONCE(call->acks_prev_seq))) 195 continue; 196 if (after(txb->seq, READ_ONCE(call->tx_transmitted))) 197 break; /* Not transmitted yet */ 198 199 if (ack && ack->reason == RXRPC_ACK_PING_RESPONSE && 200 before(ntohl(txb->wire.serial), ntohl(ack->serial))) 201 goto do_resend; /* Wasn't accounted for by a more recent ping. */ 202 203 if (ktime_after(txb->last_sent, max_age)) { 204 if (ktime_before(txb->last_sent, oldest)) 205 oldest = txb->last_sent; 206 continue; 207 } 208 209 do_resend: 210 unacked = true; 211 if (list_empty(&txb->tx_link)) { 212 list_add_tail(&txb->tx_link, &retrans_queue); 213 set_bit(RXRPC_TXBUF_RESENT, &txb->flags); 214 rxrpc_inc_stat(call->rxnet, stat_tx_data_retrans); 215 } 216 } 217 218 no_further_resend: 219 no_resend: 220 resend_at = nsecs_to_jiffies(ktime_to_ns(ktime_sub(now, oldest))); 221 resend_at += jiffies + rxrpc_get_rto_backoff(call->peer, 222 !list_empty(&retrans_queue)); 223 WRITE_ONCE(call->resend_at, resend_at); 224 225 if (unacked) 226 rxrpc_congestion_timeout(call); 227 228 /* If there was nothing that needed retransmission then it's likely 229 * that an ACK got lost somewhere. Send a ping to find out instead of 230 * retransmitting data. 231 */ 232 if (list_empty(&retrans_queue)) { 233 rxrpc_reduce_call_timer(call, resend_at, jiffies, 234 rxrpc_timer_set_for_resend); 235 ack_ts = ktime_sub(now, call->acks_latest_ts); 236 if (ktime_to_us(ack_ts) < (call->peer->srtt_us >> 3)) 237 goto out; 238 rxrpc_send_ACK(call, RXRPC_ACK_PING, 0, 239 rxrpc_propose_ack_ping_for_lost_ack); 240 goto out; 241 } 242 243 /* Retransmit the queue */ 244 while ((txb = list_first_entry_or_null(&retrans_queue, 245 struct rxrpc_txbuf, tx_link))) { 246 list_del_init(&txb->tx_link); 247 rxrpc_transmit_one(call, txb); 248 } 249 250 out: 251 _leave(""); 252 } 253 254 /* 255 * Start transmitting the reply to a service. This cancels the need to ACK the 256 * request if we haven't yet done so. 257 */ 258 static void rxrpc_begin_service_reply(struct rxrpc_call *call) 259 { 260 unsigned long now = jiffies; 261 262 rxrpc_set_call_state(call, RXRPC_CALL_SERVER_SEND_REPLY); 263 WRITE_ONCE(call->delay_ack_at, now + MAX_JIFFY_OFFSET); 264 if (call->ackr_reason == RXRPC_ACK_DELAY) 265 call->ackr_reason = 0; 266 trace_rxrpc_timer(call, rxrpc_timer_init_for_send_reply, now); 267 } 268 269 /* 270 * Close the transmission phase. After this point there is no more data to be 271 * transmitted in the call. 272 */ 273 static void rxrpc_close_tx_phase(struct rxrpc_call *call) 274 { 275 _debug("________awaiting reply/ACK__________"); 276 277 switch (__rxrpc_call_state(call)) { 278 case RXRPC_CALL_CLIENT_SEND_REQUEST: 279 rxrpc_set_call_state(call, RXRPC_CALL_CLIENT_AWAIT_REPLY); 280 break; 281 case RXRPC_CALL_SERVER_SEND_REPLY: 282 rxrpc_set_call_state(call, RXRPC_CALL_SERVER_AWAIT_ACK); 283 break; 284 default: 285 break; 286 } 287 } 288 289 static bool rxrpc_tx_window_has_space(struct rxrpc_call *call) 290 { 291 unsigned int winsize = min_t(unsigned int, call->tx_winsize, 292 call->cong_cwnd + call->cong_extra); 293 rxrpc_seq_t window = call->acks_hard_ack, wtop = window + winsize; 294 rxrpc_seq_t tx_top = call->tx_top; 295 int space; 296 297 space = wtop - tx_top; 298 return space > 0; 299 } 300 301 /* 302 * Decant some if the sendmsg prepared queue into the transmission buffer. 303 */ 304 static void rxrpc_decant_prepared_tx(struct rxrpc_call *call) 305 { 306 struct rxrpc_txbuf *txb; 307 308 if (!test_bit(RXRPC_CALL_EXPOSED, &call->flags)) { 309 if (list_empty(&call->tx_sendmsg)) 310 return; 311 rxrpc_expose_client_call(call); 312 } 313 314 while ((txb = list_first_entry_or_null(&call->tx_sendmsg, 315 struct rxrpc_txbuf, call_link))) { 316 spin_lock(&call->tx_lock); 317 list_del(&txb->call_link); 318 spin_unlock(&call->tx_lock); 319 320 call->tx_top = txb->seq; 321 list_add_tail(&txb->call_link, &call->tx_buffer); 322 323 if (txb->wire.flags & RXRPC_LAST_PACKET) 324 rxrpc_close_tx_phase(call); 325 326 rxrpc_transmit_one(call, txb); 327 328 if (!rxrpc_tx_window_has_space(call)) 329 break; 330 } 331 } 332 333 static void rxrpc_transmit_some_data(struct rxrpc_call *call) 334 { 335 switch (__rxrpc_call_state(call)) { 336 case RXRPC_CALL_SERVER_ACK_REQUEST: 337 if (list_empty(&call->tx_sendmsg)) 338 return; 339 rxrpc_begin_service_reply(call); 340 fallthrough; 341 342 case RXRPC_CALL_SERVER_SEND_REPLY: 343 case RXRPC_CALL_CLIENT_SEND_REQUEST: 344 if (!rxrpc_tx_window_has_space(call)) 345 return; 346 if (list_empty(&call->tx_sendmsg)) { 347 rxrpc_inc_stat(call->rxnet, stat_tx_data_underflow); 348 return; 349 } 350 rxrpc_decant_prepared_tx(call); 351 break; 352 default: 353 return; 354 } 355 } 356 357 /* 358 * Ping the other end to fill our RTT cache and to retrieve the rwind 359 * and MTU parameters. 360 */ 361 static void rxrpc_send_initial_ping(struct rxrpc_call *call) 362 { 363 if (call->peer->rtt_count < 3 || 364 ktime_before(ktime_add_ms(call->peer->rtt_last_req, 1000), 365 ktime_get_real())) 366 rxrpc_send_ACK(call, RXRPC_ACK_PING, 0, 367 rxrpc_propose_ack_ping_for_params); 368 } 369 370 /* 371 * Handle retransmission and deferred ACK/abort generation. 372 */ 373 bool rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb) 374 { 375 unsigned long now, next, t; 376 bool resend = false, expired = false; 377 s32 abort_code; 378 379 rxrpc_see_call(call, rxrpc_call_see_input); 380 381 //printk("\n--------------------\n"); 382 _enter("{%d,%s,%lx}", 383 call->debug_id, rxrpc_call_states[__rxrpc_call_state(call)], 384 call->events); 385 386 if (__rxrpc_call_is_complete(call)) 387 goto out; 388 389 /* Handle abort request locklessly, vs rxrpc_propose_abort(). */ 390 abort_code = smp_load_acquire(&call->send_abort); 391 if (abort_code) { 392 rxrpc_abort_call(call, 0, call->send_abort, call->send_abort_err, 393 call->send_abort_why); 394 goto out; 395 } 396 397 if (skb && skb->mark == RXRPC_SKB_MARK_ERROR) 398 goto out; 399 400 /* If we see our async-event poke, check for timeout trippage. */ 401 now = jiffies; 402 t = READ_ONCE(call->expect_rx_by); 403 if (time_after_eq(now, t)) { 404 trace_rxrpc_timer(call, rxrpc_timer_exp_normal, now); 405 expired = true; 406 } 407 408 t = READ_ONCE(call->expect_req_by); 409 if (__rxrpc_call_state(call) == RXRPC_CALL_SERVER_RECV_REQUEST && 410 time_after_eq(now, t)) { 411 trace_rxrpc_timer(call, rxrpc_timer_exp_idle, now); 412 expired = true; 413 } 414 415 t = READ_ONCE(call->expect_term_by); 416 if (time_after_eq(now, t)) { 417 trace_rxrpc_timer(call, rxrpc_timer_exp_hard, now); 418 expired = true; 419 } 420 421 t = READ_ONCE(call->delay_ack_at); 422 if (time_after_eq(now, t)) { 423 trace_rxrpc_timer(call, rxrpc_timer_exp_ack, now); 424 cmpxchg(&call->delay_ack_at, t, now + MAX_JIFFY_OFFSET); 425 rxrpc_send_ACK(call, RXRPC_ACK_DELAY, 0, 426 rxrpc_propose_ack_ping_for_lost_ack); 427 } 428 429 t = READ_ONCE(call->ack_lost_at); 430 if (time_after_eq(now, t)) { 431 trace_rxrpc_timer(call, rxrpc_timer_exp_lost_ack, now); 432 cmpxchg(&call->ack_lost_at, t, now + MAX_JIFFY_OFFSET); 433 set_bit(RXRPC_CALL_EV_ACK_LOST, &call->events); 434 } 435 436 t = READ_ONCE(call->keepalive_at); 437 if (time_after_eq(now, t)) { 438 trace_rxrpc_timer(call, rxrpc_timer_exp_keepalive, now); 439 cmpxchg(&call->keepalive_at, t, now + MAX_JIFFY_OFFSET); 440 rxrpc_send_ACK(call, RXRPC_ACK_PING, 0, 441 rxrpc_propose_ack_ping_for_keepalive); 442 } 443 444 t = READ_ONCE(call->ping_at); 445 if (time_after_eq(now, t)) { 446 trace_rxrpc_timer(call, rxrpc_timer_exp_ping, now); 447 cmpxchg(&call->ping_at, t, now + MAX_JIFFY_OFFSET); 448 rxrpc_send_ACK(call, RXRPC_ACK_PING, 0, 449 rxrpc_propose_ack_ping_for_keepalive); 450 } 451 452 t = READ_ONCE(call->resend_at); 453 if (time_after_eq(now, t)) { 454 trace_rxrpc_timer(call, rxrpc_timer_exp_resend, now); 455 cmpxchg(&call->resend_at, t, now + MAX_JIFFY_OFFSET); 456 resend = true; 457 } 458 459 if (skb) 460 rxrpc_input_call_packet(call, skb); 461 462 rxrpc_transmit_some_data(call); 463 464 if (skb) { 465 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 466 467 if (sp->hdr.type == RXRPC_PACKET_TYPE_ACK) 468 rxrpc_congestion_degrade(call); 469 } 470 471 if (test_and_clear_bit(RXRPC_CALL_EV_INITIAL_PING, &call->events)) 472 rxrpc_send_initial_ping(call); 473 474 /* Process events */ 475 if (expired) { 476 if (test_bit(RXRPC_CALL_RX_HEARD, &call->flags) && 477 (int)call->conn->hi_serial - (int)call->rx_serial > 0) { 478 trace_rxrpc_call_reset(call); 479 rxrpc_abort_call(call, 0, RX_CALL_DEAD, -ECONNRESET, 480 rxrpc_abort_call_reset); 481 } else { 482 rxrpc_abort_call(call, 0, RX_CALL_TIMEOUT, -ETIME, 483 rxrpc_abort_call_timeout); 484 } 485 goto out; 486 } 487 488 if (test_and_clear_bit(RXRPC_CALL_EV_ACK_LOST, &call->events)) 489 rxrpc_send_ACK(call, RXRPC_ACK_PING, 0, 490 rxrpc_propose_ack_ping_for_lost_ack); 491 492 if (resend && __rxrpc_call_state(call) != RXRPC_CALL_CLIENT_RECV_REPLY) 493 rxrpc_resend(call, NULL); 494 495 if (test_and_clear_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags)) 496 rxrpc_send_ACK(call, RXRPC_ACK_IDLE, 0, 497 rxrpc_propose_ack_rx_idle); 498 499 if (call->ackr_nr_unacked > 2) { 500 if (call->peer->rtt_count < 3) 501 rxrpc_send_ACK(call, RXRPC_ACK_PING, 0, 502 rxrpc_propose_ack_ping_for_rtt); 503 else if (ktime_before(ktime_add_ms(call->peer->rtt_last_req, 1000), 504 ktime_get_real())) 505 rxrpc_send_ACK(call, RXRPC_ACK_PING, 0, 506 rxrpc_propose_ack_ping_for_old_rtt); 507 else 508 rxrpc_send_ACK(call, RXRPC_ACK_IDLE, 0, 509 rxrpc_propose_ack_input_data); 510 } 511 512 /* Make sure the timer is restarted */ 513 if (!__rxrpc_call_is_complete(call)) { 514 next = call->expect_rx_by; 515 516 #define set(T) { t = READ_ONCE(T); if (time_before(t, next)) next = t; } 517 518 set(call->expect_req_by); 519 set(call->expect_term_by); 520 set(call->delay_ack_at); 521 set(call->ack_lost_at); 522 set(call->resend_at); 523 set(call->keepalive_at); 524 set(call->ping_at); 525 526 now = jiffies; 527 if (time_after_eq(now, next)) 528 rxrpc_poke_call(call, rxrpc_call_poke_timer_now); 529 530 rxrpc_reduce_call_timer(call, next, now, rxrpc_timer_restart); 531 } 532 533 out: 534 if (__rxrpc_call_is_complete(call)) { 535 del_timer_sync(&call->timer); 536 if (!test_bit(RXRPC_CALL_DISCONNECTED, &call->flags)) 537 rxrpc_disconnect_call(call); 538 if (call->security) 539 call->security->free_call_crypto(call); 540 } 541 if (call->acks_hard_ack != call->tx_bottom) 542 rxrpc_shrink_call_tx_buffer(call); 543 _leave(""); 544 return true; 545 } 546