// SPDX-License-Identifier: GPL-2.0-or-later
/* Processing of received RxRPC packets
 *
 * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 */

#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include "ar-internal.h"

static void rxrpc_proto_abort(struct rxrpc_call *call, rxrpc_seq_t seq,
			      enum rxrpc_abort_reason why)
{
	rxrpc_abort_call(call, seq, RX_PROTOCOL_ERROR, -EBADMSG, why);
}

/*
 * Do TCP-style congestion management [RFC 5681].
 */
static void rxrpc_congestion_management(struct rxrpc_call *call,
					struct sk_buff *skb,
					struct rxrpc_ack_summary *summary,
					rxrpc_serial_t acked_serial)
{
	enum rxrpc_congest_change change = rxrpc_cong_no_change;
	unsigned int cumulative_acks = call->cong_cumul_acks;
	unsigned int cwnd = call->cong_cwnd;
	bool resend = false;

	summary->flight_size =
		(call->tx_top - call->acks_hard_ack) - summary->nr_acks;

	if (test_and_clear_bit(RXRPC_CALL_RETRANS_TIMEOUT, &call->flags)) {
		summary->retrans_timeo = true;
		call->cong_ssthresh = max_t(unsigned int,
					    summary->flight_size / 2, 2);
		cwnd = 1;
		if (cwnd >= call->cong_ssthresh &&
		    call->cong_mode == RXRPC_CALL_SLOW_START) {
			call->cong_mode = RXRPC_CALL_CONGEST_AVOIDANCE;
			call->cong_tstamp = skb->tstamp;
			cumulative_acks = 0;
		}
	}

	cumulative_acks += summary->nr_new_acks;
	if (cumulative_acks > 255)
		cumulative_acks = 255;

	summary->cwnd = call->cong_cwnd;
	summary->ssthresh = call->cong_ssthresh;
	summary->cumulative_acks = cumulative_acks;
	summary->dup_acks = call->cong_dup_acks;

	switch (call->cong_mode) {
	case RXRPC_CALL_SLOW_START:
		if (summary->saw_nacks)
			goto packet_loss_detected;
		if (summary->cumulative_acks > 0)
			cwnd += 1;
		if (cwnd >= call->cong_ssthresh) {
			call->cong_mode = RXRPC_CALL_CONGEST_AVOIDANCE;
			call->cong_tstamp = skb->tstamp;
		}
		goto out;

	case RXRPC_CALL_CONGEST_AVOIDANCE:
		if (summary->saw_nacks)
			goto packet_loss_detected;

		/* We analyse the number of packets that get ACK'd per RTT
		 * period and increase the window if we managed to fill it.
		 */
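		/* Note: srtt_us is maintained TCP-style, scaled by 8, so
		 * srtt_us >> 3 below is one smoothed RTT in microseconds.
		 * If we don't yet have an RTT sample (rtt_count == 0), we
		 * can't delimit an RTT period and leave the window alone.
		 */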
		if (call->peer->rtt_count == 0)
			goto out;
		if (ktime_before(skb->tstamp,
				 ktime_add_us(call->cong_tstamp,
					      call->peer->srtt_us >> 3)))
			goto out_no_clear_ca;
		change = rxrpc_cong_rtt_window_end;
		call->cong_tstamp = skb->tstamp;
		if (cumulative_acks >= cwnd)
			cwnd++;
		goto out;

	case RXRPC_CALL_PACKET_LOSS:
		if (!summary->saw_nacks)
			goto resume_normality;

		if (summary->new_low_nack) {
			change = rxrpc_cong_new_low_nack;
			call->cong_dup_acks = 1;
			if (call->cong_extra > 1)
				call->cong_extra = 1;
			goto send_extra_data;
		}

		call->cong_dup_acks++;
		if (call->cong_dup_acks < 3)
			goto send_extra_data;

		change = rxrpc_cong_begin_retransmission;
		call->cong_mode = RXRPC_CALL_FAST_RETRANSMIT;
		call->cong_ssthresh = max_t(unsigned int,
					    summary->flight_size / 2, 2);
		cwnd = call->cong_ssthresh + 3;
		call->cong_extra = 0;
		call->cong_dup_acks = 0;
		resend = true;
		goto out;

	case RXRPC_CALL_FAST_RETRANSMIT:
		if (!summary->new_low_nack) {
			if (summary->nr_new_acks == 0)
				cwnd += 1;
			call->cong_dup_acks++;
			if (call->cong_dup_acks == 2) {
				change = rxrpc_cong_retransmit_again;
				call->cong_dup_acks = 0;
				resend = true;
			}
		} else {
			change = rxrpc_cong_progress;
			cwnd = call->cong_ssthresh;
			if (!summary->saw_nacks)
				goto resume_normality;
		}
		goto out;

	default:
		BUG();
		goto out;
	}

resume_normality:
	change = rxrpc_cong_cleared_nacks;
	call->cong_dup_acks = 0;
	call->cong_extra = 0;
	call->cong_tstamp = skb->tstamp;
	if (cwnd < call->cong_ssthresh)
		call->cong_mode = RXRPC_CALL_SLOW_START;
	else
		call->cong_mode = RXRPC_CALL_CONGEST_AVOIDANCE;
out:
	cumulative_acks = 0;
out_no_clear_ca:
	if (cwnd >= RXRPC_TX_MAX_WINDOW)
		cwnd = RXRPC_TX_MAX_WINDOW;
	call->cong_cwnd = cwnd;
	call->cong_cumul_acks = cumulative_acks;
	summary->mode = call->cong_mode;
	trace_rxrpc_congest(call, summary, acked_serial, change);
	if (resend)
		rxrpc_resend(call, skb);
	return;

packet_loss_detected:
	change = rxrpc_cong_saw_nack;
	call->cong_mode = RXRPC_CALL_PACKET_LOSS;
	call->cong_dup_acks = 0;
	goto send_extra_data;

send_extra_data:
	/* Send some previously unsent DATA if we have some to advance the ACK
	 * state.
	 */
	if (test_bit(RXRPC_CALL_TX_LAST, &call->flags) ||
	    summary->nr_acks != call->tx_top - call->acks_hard_ack) {
		call->cong_extra++;
		wake_up(&call->waitq);
	}
	goto out_no_clear_ca;
}
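/*
 * Worked example of the loss handling above (arbitrary, illustrative
 * numbers): with 14 packets in flight, a run of NACKs first moves the call
 * into RXRPC_CALL_PACKET_LOSS; on the third duplicate report the sender sets
 * ssthresh = max(14 / 2, 2) = 7, opens cwnd to ssthresh + 3 = 10, switches to
 * RXRPC_CALL_FAST_RETRANSMIT and schedules a resend.  Once the NACKs clear,
 * cwnd falls back to ssthresh and the call resumes slow start or congestion
 * avoidance depending on whether cwnd is still below ssthresh.
 */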
/*
 * Degrade the congestion window if we haven't transmitted a packet for >1RTT.
 */
void rxrpc_congestion_degrade(struct rxrpc_call *call)
{
	ktime_t rtt, now;

	if (call->cong_mode != RXRPC_CALL_SLOW_START &&
	    call->cong_mode != RXRPC_CALL_CONGEST_AVOIDANCE)
		return;
	if (__rxrpc_call_state(call) == RXRPC_CALL_CLIENT_AWAIT_REPLY)
		return;

	rtt = ns_to_ktime(call->peer->srtt_us * (1000 / 8));
	now = ktime_get_real();
	if (!ktime_before(ktime_add(call->tx_last_sent, rtt), now))
		return;

	trace_rxrpc_reset_cwnd(call, now);
	rxrpc_inc_stat(call->rxnet, stat_tx_data_cwnd_reset);
	call->tx_last_sent = now;
	call->cong_mode = RXRPC_CALL_SLOW_START;
	call->cong_ssthresh = max_t(unsigned int, call->cong_ssthresh,
				    call->cong_cwnd * 3 / 4);
	call->cong_cwnd = max_t(unsigned int, call->cong_cwnd / 2, RXRPC_MIN_CWND);
}

/*
 * Apply a hard ACK by advancing the Tx window.
 */
static bool rxrpc_rotate_tx_window(struct rxrpc_call *call, rxrpc_seq_t to,
				   struct rxrpc_ack_summary *summary)
{
	struct rxrpc_txbuf *txb;
	bool rot_last = false;

	list_for_each_entry_rcu(txb, &call->tx_buffer, call_link, false) {
		if (before_eq(txb->seq, call->acks_hard_ack))
			continue;
		if (test_bit(RXRPC_TXBUF_LAST, &txb->flags)) {
			set_bit(RXRPC_CALL_TX_LAST, &call->flags);
			rot_last = true;
		}
		if (txb->seq == to)
			break;
	}

	if (rot_last)
		set_bit(RXRPC_CALL_TX_ALL_ACKED, &call->flags);

	_enter("%x,%x,%x,%d", to, call->acks_hard_ack, call->tx_top, rot_last);

	if (call->acks_lowest_nak == call->acks_hard_ack) {
		call->acks_lowest_nak = to;
	} else if (after(to, call->acks_lowest_nak)) {
		summary->new_low_nack = true;
		call->acks_lowest_nak = to;
	}

	smp_store_release(&call->acks_hard_ack, to);

	trace_rxrpc_txqueue(call, (rot_last ?
				   rxrpc_txqueue_rotate_last :
				   rxrpc_txqueue_rotate));
	wake_up(&call->waitq);
	return rot_last;
}
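/*
 * For example (arbitrary sequence numbers): with acks_hard_ack at 5 and
 * tx_top at 10, a hard-ACK up to 8 walks txbufs 6-8, notes whether the last
 * of them carries RXRPC_TXBUF_LAST, advances acks_hard_ack to 8 and wakes
 * anyone blocked in sendmsg() waiting for Tx window space to open up.
 */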
/*
 * End the transmission phase of a call.
 *
 * This occurs when we get an ACKALL packet, the first DATA packet of a reply,
 * or a final ACK packet.
 */
static void rxrpc_end_tx_phase(struct rxrpc_call *call, bool reply_begun,
			       enum rxrpc_abort_reason abort_why)
{
	ASSERT(test_bit(RXRPC_CALL_TX_LAST, &call->flags));

	if (unlikely(call->cong_last_nack)) {
		rxrpc_free_skb(call->cong_last_nack, rxrpc_skb_put_last_nack);
		call->cong_last_nack = NULL;
	}

	switch (__rxrpc_call_state(call)) {
	case RXRPC_CALL_CLIENT_SEND_REQUEST:
	case RXRPC_CALL_CLIENT_AWAIT_REPLY:
		if (reply_begun) {
			rxrpc_set_call_state(call, RXRPC_CALL_CLIENT_RECV_REPLY);
			trace_rxrpc_txqueue(call, rxrpc_txqueue_end);
			break;
		}

		rxrpc_set_call_state(call, RXRPC_CALL_CLIENT_AWAIT_REPLY);
		trace_rxrpc_txqueue(call, rxrpc_txqueue_await_reply);
		break;

	case RXRPC_CALL_SERVER_AWAIT_ACK:
		rxrpc_call_completed(call);
		trace_rxrpc_txqueue(call, rxrpc_txqueue_end);
		break;

	default:
		kdebug("end_tx %s", rxrpc_call_states[__rxrpc_call_state(call)]);
		rxrpc_proto_abort(call, call->tx_top, abort_why);
		break;
	}
}

/*
 * Begin the reply reception phase of a call.
 */
static bool rxrpc_receiving_reply(struct rxrpc_call *call)
{
	struct rxrpc_ack_summary summary = { 0 };
	unsigned long now, timo;
	rxrpc_seq_t top = READ_ONCE(call->tx_top);

	if (call->ackr_reason) {
		now = jiffies;
		timo = now + MAX_JIFFY_OFFSET;

		WRITE_ONCE(call->delay_ack_at, timo);
		trace_rxrpc_timer(call, rxrpc_timer_init_for_reply, now);
	}

	if (!test_bit(RXRPC_CALL_TX_LAST, &call->flags)) {
		if (!rxrpc_rotate_tx_window(call, top, &summary)) {
			rxrpc_proto_abort(call, top, rxrpc_eproto_early_reply);
			return false;
		}
	}

	rxrpc_end_tx_phase(call, true, rxrpc_eproto_unexpected_reply);
	return true;
}

/*
 * End the packet reception phase.
 */
static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
{
	rxrpc_seq_t whigh = READ_ONCE(call->rx_highest_seq);

	_enter("%d,%s", call->debug_id, rxrpc_call_states[__rxrpc_call_state(call)]);

	trace_rxrpc_receive(call, rxrpc_receive_end, 0, whigh);

	switch (__rxrpc_call_state(call)) {
	case RXRPC_CALL_CLIENT_RECV_REPLY:
		rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_terminal_ack);
		rxrpc_call_completed(call);
		break;

	case RXRPC_CALL_SERVER_RECV_REQUEST:
		rxrpc_set_call_state(call, RXRPC_CALL_SERVER_ACK_REQUEST);
		call->expect_req_by = jiffies + MAX_JIFFY_OFFSET;
		rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_processing_op);
		break;

	default:
		break;
	}
}

static void rxrpc_input_update_ack_window(struct rxrpc_call *call,
					  rxrpc_seq_t window, rxrpc_seq_t wtop)
{
	call->ackr_window = window;
	call->ackr_wtop = wtop;
}

/*
 * Push a DATA packet onto the Rx queue.
 */
static void rxrpc_input_queue_data(struct rxrpc_call *call, struct sk_buff *skb,
				   rxrpc_seq_t window, rxrpc_seq_t wtop,
				   enum rxrpc_receive_trace why)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	bool last = sp->hdr.flags & RXRPC_LAST_PACKET;

	__skb_queue_tail(&call->recvmsg_queue, skb);
	rxrpc_input_update_ack_window(call, window, wtop);
	trace_rxrpc_receive(call, last ? why + 1 : why, sp->hdr.serial, sp->hdr.seq);
	if (last)
		rxrpc_end_rx_phase(call, sp->hdr.serial);
}
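/*
 * Window terminology used below: ackr_window is the sequence number of the
 * next packet we expect to queue in order for recvmsg(), and ackr_wtop is one
 * past the highest sequence number seen so far.  Packets in between that
 * arrived out of order sit in rx_oos_queue and are marked in the SACK table
 * until the hole in front of them is filled.
 */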
/*
 * Process a DATA packet.
 */
static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb,
				 bool *_notify)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	struct sk_buff *oos;
	rxrpc_serial_t serial = sp->hdr.serial;
	unsigned int sack = call->ackr_sack_base;
	rxrpc_seq_t window = call->ackr_window;
	rxrpc_seq_t wtop = call->ackr_wtop;
	rxrpc_seq_t wlimit = window + call->rx_winsize - 1;
	rxrpc_seq_t seq = sp->hdr.seq;
	bool last = sp->hdr.flags & RXRPC_LAST_PACKET;
	int ack_reason = -1;

	rxrpc_inc_stat(call->rxnet, stat_rx_data);
	if (sp->hdr.flags & RXRPC_REQUEST_ACK)
		rxrpc_inc_stat(call->rxnet, stat_rx_data_reqack);
	if (sp->hdr.flags & RXRPC_JUMBO_PACKET)
		rxrpc_inc_stat(call->rxnet, stat_rx_data_jumbo);

	if (last) {
		if (test_and_set_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
		    seq + 1 != wtop)
			return rxrpc_proto_abort(call, seq, rxrpc_eproto_different_last);
	} else {
		if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
		    after_eq(seq, wtop)) {
			pr_warn("Packet beyond last: c=%x q=%x window=%x-%x wlimit=%x\n",
				call->debug_id, seq, window, wtop, wlimit);
			return rxrpc_proto_abort(call, seq, rxrpc_eproto_data_after_last);
		}
	}

	if (after(seq, call->rx_highest_seq))
		call->rx_highest_seq = seq;

	trace_rxrpc_rx_data(call->debug_id, seq, serial, sp->hdr.flags);

	if (before(seq, window)) {
		ack_reason = RXRPC_ACK_DUPLICATE;
		goto send_ack;
	}
	if (after(seq, wlimit)) {
		ack_reason = RXRPC_ACK_EXCEEDS_WINDOW;
		goto send_ack;
	}

	/* Queue the packet. */
	if (seq == window) {
		if (sp->hdr.flags & RXRPC_REQUEST_ACK)
			ack_reason = RXRPC_ACK_REQUESTED;
		/* Send an immediate ACK if we fill in a hole */
		else if (!skb_queue_empty(&call->rx_oos_queue))
			ack_reason = RXRPC_ACK_DELAY;
		else
			call->ackr_nr_unacked++;

		window++;
		if (after(window, wtop)) {
			trace_rxrpc_sack(call, seq, sack, rxrpc_sack_none);
			wtop = window;
		} else {
			trace_rxrpc_sack(call, seq, sack, rxrpc_sack_advance);
			sack = (sack + 1) % RXRPC_SACK_SIZE;
		}

		rxrpc_get_skb(skb, rxrpc_skb_get_to_recvmsg);

		spin_lock(&call->recvmsg_queue.lock);
		rxrpc_input_queue_data(call, skb, window, wtop, rxrpc_receive_queue);
		*_notify = true;

		while ((oos = skb_peek(&call->rx_oos_queue))) {
			struct rxrpc_skb_priv *osp = rxrpc_skb(oos);

			if (after(osp->hdr.seq, window))
				break;

			__skb_unlink(oos, &call->rx_oos_queue);
			last = osp->hdr.flags & RXRPC_LAST_PACKET;
			seq = osp->hdr.seq;
			call->ackr_sack_table[sack] = 0;
			trace_rxrpc_sack(call, seq, sack, rxrpc_sack_fill);
			sack = (sack + 1) % RXRPC_SACK_SIZE;

			window++;
			rxrpc_input_queue_data(call, oos, window, wtop,
					       rxrpc_receive_queue_oos);
		}

		spin_unlock(&call->recvmsg_queue.lock);

		call->ackr_sack_base = sack;
	} else {
		unsigned int slot;

		ack_reason = RXRPC_ACK_OUT_OF_SEQUENCE;

		slot = seq - window;
		sack = (sack + slot) % RXRPC_SACK_SIZE;

		if (call->ackr_sack_table[sack % RXRPC_SACK_SIZE]) {
			ack_reason = RXRPC_ACK_DUPLICATE;
			goto send_ack;
		}

		call->ackr_sack_table[sack % RXRPC_SACK_SIZE] |= 1;
		trace_rxrpc_sack(call, seq, sack, rxrpc_sack_oos);

		if (after(seq + 1, wtop)) {
			wtop = seq + 1;
			rxrpc_input_update_ack_window(call, window, wtop);
		}
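		/* rx_oos_queue is kept sorted by sequence number: insert this
		 * packet before the first queued packet with a higher
		 * sequence, or append it at the tail if none is found.
		 */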
		skb_queue_walk(&call->rx_oos_queue, oos) {
			struct rxrpc_skb_priv *osp = rxrpc_skb(oos);

			if (after(osp->hdr.seq, seq)) {
				rxrpc_get_skb(skb, rxrpc_skb_get_to_recvmsg_oos);
				__skb_queue_before(&call->rx_oos_queue, oos, skb);
				goto oos_queued;
			}
		}

		rxrpc_get_skb(skb, rxrpc_skb_get_to_recvmsg_oos);
		__skb_queue_tail(&call->rx_oos_queue, skb);
oos_queued:
		trace_rxrpc_receive(call, last ? rxrpc_receive_oos_last : rxrpc_receive_oos,
				    sp->hdr.serial, sp->hdr.seq);
	}

send_ack:
	if (ack_reason >= 0)
		rxrpc_send_ACK(call, ack_reason, serial,
			       rxrpc_propose_ack_input_data);
	else
		rxrpc_propose_delay_ACK(call, serial,
					rxrpc_propose_ack_input_data);
}
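/*
 * Layout assumed by the jumbo handling below: a jumbo DATA packet is a train
 * of subpackets, each carrying RXRPC_JUMBO_DATALEN bytes of data followed by
 * a struct rxrpc_jumbo_header holding the flags and reserved field of the
 * next subpacket; the final subpacket has no jumbo header and takes whatever
 * length remains.  Each successive subpacket implicitly carries the next
 * sequence number.
 */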
/*
 * Split a jumbo packet and file the bits separately.
 */
static bool rxrpc_input_split_jumbo(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_jumbo_header jhdr;
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb), *jsp;
	struct sk_buff *jskb;
	unsigned int offset = sizeof(struct rxrpc_wire_header);
	unsigned int len = skb->len - offset;
	bool notify = false;

	while (sp->hdr.flags & RXRPC_JUMBO_PACKET) {
		if (len < RXRPC_JUMBO_SUBPKTLEN)
			goto protocol_error;
		if (sp->hdr.flags & RXRPC_LAST_PACKET)
			goto protocol_error;
		if (skb_copy_bits(skb, offset + RXRPC_JUMBO_DATALEN,
				  &jhdr, sizeof(jhdr)) < 0)
			goto protocol_error;

		jskb = skb_clone(skb, GFP_NOFS);
		if (!jskb) {
			kdebug("couldn't clone");
			return false;
		}
		rxrpc_new_skb(jskb, rxrpc_skb_new_jumbo_subpacket);
		jsp = rxrpc_skb(jskb);
		jsp->offset = offset;
		jsp->len = RXRPC_JUMBO_DATALEN;
		rxrpc_input_data_one(call, jskb, &notify);
		rxrpc_free_skb(jskb, rxrpc_skb_put_jumbo_subpacket);

		sp->hdr.flags = jhdr.flags;
		sp->hdr._rsvd = ntohs(jhdr._rsvd);
		sp->hdr.seq++;
		sp->hdr.serial++;
		offset += RXRPC_JUMBO_SUBPKTLEN;
		len -= RXRPC_JUMBO_SUBPKTLEN;
	}

	sp->offset = offset;
	sp->len = len;
	rxrpc_input_data_one(call, skb, &notify);
	if (notify) {
		trace_rxrpc_notify_socket(call->debug_id, sp->hdr.serial);
		rxrpc_notify_socket(call);
	}
	return true;

protocol_error:
	return false;
}

/*
 * Process a DATA packet, adding the packet to the Rx ring.  The caller's
 * packet ref must be passed on or discarded.
 */
static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	rxrpc_serial_t serial = sp->hdr.serial;
	rxrpc_seq_t seq0 = sp->hdr.seq;

	_enter("{%x,%x,%x},{%u,%x}",
	       call->ackr_window, call->ackr_wtop, call->rx_highest_seq,
	       skb->len, seq0);

	if (__rxrpc_call_is_complete(call))
		return;

	switch (__rxrpc_call_state(call)) {
	case RXRPC_CALL_CLIENT_SEND_REQUEST:
	case RXRPC_CALL_CLIENT_AWAIT_REPLY:
		/* Received data implicitly ACKs all of the request
		 * packets we sent when we're acting as a client.
		 */
		if (!rxrpc_receiving_reply(call))
			goto out_notify;
		break;

	case RXRPC_CALL_SERVER_RECV_REQUEST: {
		unsigned long timo = READ_ONCE(call->next_req_timo);
		unsigned long now, expect_req_by;

		if (timo) {
			now = jiffies;
			expect_req_by = now + timo;
			WRITE_ONCE(call->expect_req_by, expect_req_by);
			rxrpc_reduce_call_timer(call, expect_req_by, now,
						rxrpc_timer_set_for_idle);
		}
		break;
	}

	default:
		break;
	}

	if (!rxrpc_input_split_jumbo(call, skb)) {
		rxrpc_proto_abort(call, sp->hdr.seq, rxrpc_badmsg_bad_jumbo);
		goto out_notify;
	}
	return;

out_notify:
	trace_rxrpc_notify_socket(call->debug_id, serial);
	rxrpc_notify_socket(call);
	_leave(" [queued]");
}

/*
 * See if there's a cached RTT probe to complete.
 */
static void rxrpc_complete_rtt_probe(struct rxrpc_call *call,
				     ktime_t resp_time,
				     rxrpc_serial_t acked_serial,
				     rxrpc_serial_t ack_serial,
				     enum rxrpc_rtt_rx_trace type)
{
	rxrpc_serial_t orig_serial;
	unsigned long avail;
	ktime_t sent_at;
	bool matched = false;
	int i;

	avail = READ_ONCE(call->rtt_avail);
	smp_rmb(); /* Read avail bits before accessing data. */

	for (i = 0; i < ARRAY_SIZE(call->rtt_serial); i++) {
		if (!test_bit(i + RXRPC_CALL_RTT_PEND_SHIFT, &avail))
			continue;

		sent_at = call->rtt_sent_at[i];
		orig_serial = call->rtt_serial[i];

		if (orig_serial == acked_serial) {
			clear_bit(i + RXRPC_CALL_RTT_PEND_SHIFT, &call->rtt_avail);
			smp_mb(); /* Read data before setting avail bit */
			set_bit(i, &call->rtt_avail);
			rxrpc_peer_add_rtt(call, type, i, acked_serial, ack_serial,
					   sent_at, resp_time);
			matched = true;
		}

		/* If a later serial is being acked, then mark this slot as
		 * being available.
		 */
		if (after(acked_serial, orig_serial)) {
			trace_rxrpc_rtt_rx(call, rxrpc_rtt_rx_obsolete, i,
					   orig_serial, acked_serial, 0, 0);
			clear_bit(i + RXRPC_CALL_RTT_PEND_SHIFT, &call->rtt_avail);
			smp_wmb();
			set_bit(i, &call->rtt_avail);
		}
	}

	if (!matched)
		trace_rxrpc_rtt_rx(call, rxrpc_rtt_rx_lost, 9, 0, acked_serial, 0, 0);
}

/*
 * Process the extra information that may be appended to an ACK packet
 */
static void rxrpc_input_ackinfo(struct rxrpc_call *call, struct sk_buff *skb,
				struct rxrpc_ackinfo *ackinfo)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	struct rxrpc_peer *peer;
	unsigned int mtu;
	bool wake = false;
	u32 rwind = ntohl(ackinfo->rwind);

	if (rwind > RXRPC_TX_MAX_WINDOW)
		rwind = RXRPC_TX_MAX_WINDOW;
	if (call->tx_winsize != rwind) {
		if (rwind > call->tx_winsize)
			wake = true;
		trace_rxrpc_rx_rwind_change(call, sp->hdr.serial, rwind, wake);
		call->tx_winsize = rwind;
	}

	if (call->cong_ssthresh > rwind)
		call->cong_ssthresh = rwind;

	mtu = min(ntohl(ackinfo->rxMTU), ntohl(ackinfo->maxMTU));

	peer = call->peer;
	if (mtu < peer->maxdata) {
		spin_lock(&peer->lock);
		peer->maxdata = mtu;
		peer->mtu = mtu + peer->hdrsize;
		spin_unlock(&peer->lock);
	}

	if (wake)
		wake_up(&call->waitq);
}
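/*
 * For reference when reading the ACK parsing below (this describes the wire
 * format the code assumes): an ACK packet carries, after the wire header, a
 * struct rxrpc_ackpacket, then nAcks bytes of per-packet soft-ACK/NACK flags,
 * then three bytes of padding, then an optional struct rxrpc_ackinfo giving
 * rxMTU, maxMTU, rwind and jumbo_max - hence the offset and ioffset
 * calculations in rxrpc_input_ack().
 */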
/*
 * Determine how many nacks from the previous ACK have now been satisfied.
 */
static rxrpc_seq_t rxrpc_input_check_prev_ack(struct rxrpc_call *call,
					      struct rxrpc_ack_summary *summary,
					      rxrpc_seq_t seq)
{
	struct sk_buff *skb = call->cong_last_nack;
	struct rxrpc_ackpacket ack;
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	unsigned int i, new_acks = 0, retained_nacks = 0;
	rxrpc_seq_t old_seq = sp->first_ack;
	u8 *acks = skb->data + sizeof(struct rxrpc_wire_header) + sizeof(ack);

	if (after_eq(seq, old_seq + sp->nr_acks)) {
		summary->nr_new_acks += sp->nr_nacks;
		summary->nr_new_acks += seq - (old_seq + sp->nr_acks);
		summary->nr_retained_nacks = 0;
	} else if (seq == old_seq) {
		summary->nr_retained_nacks = sp->nr_nacks;
	} else {
		for (i = 0; i < sp->nr_acks; i++) {
			if (acks[i] == RXRPC_ACK_TYPE_NACK) {
				if (before(old_seq + i, seq))
					new_acks++;
				else
					retained_nacks++;
			}
		}

		summary->nr_new_acks += new_acks;
		summary->nr_retained_nacks = retained_nacks;
	}

	return old_seq + sp->nr_acks;
}

/*
 * Process individual soft ACKs.
 *
 * Each ACK in the array corresponds to one packet and can be either an ACK or
 * a NAK.  If we find an explicitly NAK'd packet, we resend it immediately;
 * packets that lie beyond the end of the ACK list are scheduled for resend by
 * the timer on the basis that the peer might just not have processed them at
 * the time the ACK was sent.
 */
static void rxrpc_input_soft_acks(struct rxrpc_call *call,
				  struct rxrpc_ack_summary *summary,
				  struct sk_buff *skb,
				  rxrpc_seq_t seq,
				  rxrpc_seq_t since)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	unsigned int i, old_nacks = 0;
	rxrpc_seq_t lowest_nak = seq + sp->nr_acks;
	u8 *acks = skb->data + sizeof(struct rxrpc_wire_header) + sizeof(struct rxrpc_ackpacket);

	for (i = 0; i < sp->nr_acks; i++) {
		if (acks[i] == RXRPC_ACK_TYPE_ACK) {
			summary->nr_acks++;
			if (after_eq(seq, since))
				summary->nr_new_acks++;
		} else {
			summary->saw_nacks = true;
			if (before(seq, since)) {
				/* Overlap with previous ACK */
				old_nacks++;
			} else {
				summary->nr_new_nacks++;
				sp->nr_nacks++;
			}

			if (before(seq, lowest_nak))
				lowest_nak = seq;
		}
		seq++;
	}

	if (lowest_nak != call->acks_lowest_nak) {
		call->acks_lowest_nak = lowest_nak;
		summary->new_low_nack = true;
	}

	/* We *can* have more nacks than we did - the peer is permitted to drop
	 * packets it has soft-acked and re-request them.  Further, it is
	 * possible for the nack distribution to change whilst the number of
	 * nacks stays the same or goes down.
	 */
	if (old_nacks < summary->nr_retained_nacks)
		summary->nr_new_acks += summary->nr_retained_nacks - old_nacks;
	summary->nr_retained_nacks = old_nacks;
}
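/*
 * To make the soft-ACK walk above concrete (arbitrary, illustrative numbers):
 * an ACK with firstPacket 5 and nAcks 4 describes packets 5-8, where acks[i]
 * refers to packet 5 + i.  With "since" at 7, an ACK mark on packet 8 counts
 * as a new ack, a NACK on packet 6 is an overlap with the previous ACK, and a
 * NACK on packet 7 or 8 counts as a new nack and feeds the lowest-NACK
 * tracking.
 */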
/*
 * Return true if the ACK is valid - ie. it doesn't appear to have regressed
 * with respect to the ack state conveyed by preceding ACKs.
 */
static bool rxrpc_is_ack_valid(struct rxrpc_call *call,
			       rxrpc_seq_t first_pkt, rxrpc_seq_t prev_pkt)
{
	rxrpc_seq_t base = READ_ONCE(call->acks_first_seq);

	if (after(first_pkt, base))
		return true; /* The window advanced */

	if (before(first_pkt, base))
		return false; /* firstPacket regressed */

	if (after_eq(prev_pkt, call->acks_prev_seq))
		return true; /* previousPacket hasn't regressed. */

	/* Some rx implementations put a serial number in previousPacket. */
	if (after_eq(prev_pkt, base + call->tx_winsize))
		return false;
	return true;
}
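/*
 * For example (arbitrary values): with acks_first_seq at 20, an ACK carrying
 * firstPacket 21 is always accepted, one carrying firstPacket 19 is always
 * discarded, and one carrying firstPacket 20 is accepted unless its
 * previousPacket has regressed and lies at or beyond acks_first_seq plus
 * tx_winsize - the tolerance below that point exists for rx implementations
 * that put a serial number in previousPacket.
 */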
/*
 * Process an ACK packet.
 *
 * ack.firstPacket is the sequence number of the first soft-ACK'd/NAK'd packet
 * in the ACK array.  Anything before that is hard-ACK'd and may be discarded.
 *
 * A hard-ACK means that a packet has been processed and may be discarded; a
 * soft-ACK means that the packet may be discarded and retransmission
 * requested.  A phase is complete when all packets are hard-ACK'd.
 */
static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_ack_summary summary = { 0 };
	struct rxrpc_ackpacket ack;
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	struct rxrpc_ackinfo info;
	rxrpc_serial_t ack_serial, acked_serial;
	rxrpc_seq_t first_soft_ack, hard_ack, prev_pkt, since;
	int nr_acks, offset, ioffset;

	_enter("");

	offset = sizeof(struct rxrpc_wire_header);
	if (skb_copy_bits(skb, offset, &ack, sizeof(ack)) < 0)
		return rxrpc_proto_abort(call, 0, rxrpc_badmsg_short_ack);
	offset += sizeof(ack);

	ack_serial = sp->hdr.serial;
	acked_serial = ntohl(ack.serial);
	first_soft_ack = ntohl(ack.firstPacket);
	prev_pkt = ntohl(ack.previousPacket);
	hard_ack = first_soft_ack - 1;
	nr_acks = ack.nAcks;
	sp->first_ack = first_soft_ack;
	sp->nr_acks = nr_acks;
	summary.ack_reason = (ack.reason < RXRPC_ACK__INVALID ?
			      ack.reason : RXRPC_ACK__INVALID);

	trace_rxrpc_rx_ack(call, ack_serial, acked_serial,
			   first_soft_ack, prev_pkt,
			   summary.ack_reason, nr_acks);
	rxrpc_inc_stat(call->rxnet, stat_rx_acks[ack.reason]);

	if (acked_serial != 0) {
		switch (ack.reason) {
		case RXRPC_ACK_PING_RESPONSE:
			rxrpc_complete_rtt_probe(call, skb->tstamp, acked_serial, ack_serial,
						 rxrpc_rtt_rx_ping_response);
			break;
		case RXRPC_ACK_REQUESTED:
			rxrpc_complete_rtt_probe(call, skb->tstamp, acked_serial, ack_serial,
						 rxrpc_rtt_rx_requested_ack);
			break;
		default:
			rxrpc_complete_rtt_probe(call, skb->tstamp, acked_serial, ack_serial,
						 rxrpc_rtt_rx_other_ack);
			break;
		}
	}

	/* If we get an EXCEEDS_WINDOW ACK from the server, it probably
	 * indicates that the client address changed due to NAT.  The server
	 * lost the call because it switched to a different peer.
	 */
	if (unlikely(ack.reason == RXRPC_ACK_EXCEEDS_WINDOW) &&
	    first_soft_ack == 1 &&
	    prev_pkt == 0 &&
	    rxrpc_is_client_call(call)) {
		rxrpc_set_call_completion(call, RXRPC_CALL_REMOTELY_ABORTED,
					  0, -ENETRESET);
		goto send_response;
	}

	/* If we get an OUT_OF_SEQUENCE ACK from the server, that can also
	 * indicate a change of address.  However, we can retransmit the call
	 * if we still have it buffered to the beginning.
	 */
	if (unlikely(ack.reason == RXRPC_ACK_OUT_OF_SEQUENCE) &&
	    first_soft_ack == 1 &&
	    prev_pkt == 0 &&
	    call->acks_hard_ack == 0 &&
	    rxrpc_is_client_call(call)) {
		rxrpc_set_call_completion(call, RXRPC_CALL_REMOTELY_ABORTED,
					  0, -ENETRESET);
		goto send_response;
	}

	/* Discard any out-of-order or duplicate ACKs (outside lock). */
	if (!rxrpc_is_ack_valid(call, first_soft_ack, prev_pkt)) {
		trace_rxrpc_rx_discard_ack(call->debug_id, ack_serial,
					   first_soft_ack, call->acks_first_seq,
					   prev_pkt, call->acks_prev_seq);
		goto send_response;
	}

	info.rxMTU = 0;
	ioffset = offset + nr_acks + 3;
	if (skb->len >= ioffset + sizeof(info) &&
	    skb_copy_bits(skb, ioffset, &info, sizeof(info)) < 0)
		return rxrpc_proto_abort(call, 0, rxrpc_badmsg_short_ack_info);

	if (nr_acks > 0)
		skb_condense(skb);

	if (call->cong_last_nack) {
		since = rxrpc_input_check_prev_ack(call, &summary, first_soft_ack);
		rxrpc_free_skb(call->cong_last_nack, rxrpc_skb_put_last_nack);
		call->cong_last_nack = NULL;
	} else {
		summary.nr_new_acks = first_soft_ack - call->acks_first_seq;
		call->acks_lowest_nak = first_soft_ack + nr_acks;
		since = first_soft_ack;
	}

	call->acks_latest_ts = skb->tstamp;
	call->acks_first_seq = first_soft_ack;
	call->acks_prev_seq = prev_pkt;

	switch (ack.reason) {
	case RXRPC_ACK_PING:
		break;
	default:
		if (acked_serial && after(acked_serial, call->acks_highest_serial))
			call->acks_highest_serial = acked_serial;
		break;
	}

	/* Parse rwind and mtu sizes if provided. */
	if (info.rxMTU)
		rxrpc_input_ackinfo(call, skb, &info);

	if (first_soft_ack == 0)
		return rxrpc_proto_abort(call, 0, rxrpc_eproto_ackr_zero);

	/* Ignore ACKs unless we are or have just been transmitting. */
	switch (__rxrpc_call_state(call)) {
	case RXRPC_CALL_CLIENT_SEND_REQUEST:
	case RXRPC_CALL_CLIENT_AWAIT_REPLY:
	case RXRPC_CALL_SERVER_SEND_REPLY:
	case RXRPC_CALL_SERVER_AWAIT_ACK:
		break;
	default:
		goto send_response;
	}

	if (before(hard_ack, call->acks_hard_ack) ||
	    after(hard_ack, call->tx_top))
		return rxrpc_proto_abort(call, 0, rxrpc_eproto_ackr_outside_window);
	if (nr_acks > call->tx_top - hard_ack)
		return rxrpc_proto_abort(call, 0, rxrpc_eproto_ackr_sack_overflow);

	if (after(hard_ack, call->acks_hard_ack)) {
		if (rxrpc_rotate_tx_window(call, hard_ack, &summary)) {
			rxrpc_end_tx_phase(call, false, rxrpc_eproto_unexpected_ack);
			goto send_response;
		}
	}

	if (nr_acks > 0) {
		if (offset > (int)skb->len - nr_acks)
			return rxrpc_proto_abort(call, 0, rxrpc_eproto_ackr_short_sack);
		rxrpc_input_soft_acks(call, &summary, skb, first_soft_ack, since);
		rxrpc_get_skb(skb, rxrpc_skb_get_last_nack);
		call->cong_last_nack = skb;
	}

	if (test_bit(RXRPC_CALL_TX_LAST, &call->flags) &&
	    summary.nr_acks == call->tx_top - hard_ack &&
	    rxrpc_is_client_call(call))
		rxrpc_propose_ping(call, ack_serial,
				   rxrpc_propose_ack_ping_for_lost_reply);

	rxrpc_congestion_management(call, skb, &summary, acked_serial);

send_response:
	if (ack.reason == RXRPC_ACK_PING)
		rxrpc_send_ACK(call, RXRPC_ACK_PING_RESPONSE, ack_serial,
			       rxrpc_propose_ack_respond_to_ping);
	else if (sp->hdr.flags & RXRPC_REQUEST_ACK)
		rxrpc_send_ACK(call, RXRPC_ACK_REQUESTED, ack_serial,
			       rxrpc_propose_ack_respond_to_ack);
}
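/*
 * An ACK of reason PING, and any packet with RXRPC_REQUEST_ACK set, is
 * answered immediately above; the response carries the serial number of the
 * packet being answered so that the peer can match it against its pending
 * RTT probes (see rxrpc_complete_rtt_probe()).
 */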
/*
 * Process an ACKALL packet.
 */
static void rxrpc_input_ackall(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_ack_summary summary = { 0 };

	if (rxrpc_rotate_tx_window(call, call->tx_top, &summary))
		rxrpc_end_tx_phase(call, false, rxrpc_eproto_unexpected_ackall);
}

/*
 * Process an ABORT packet directed at a call.
 */
static void rxrpc_input_abort(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);

	trace_rxrpc_rx_abort(call, sp->hdr.serial, skb->priority);

	rxrpc_set_call_completion(call, RXRPC_CALL_REMOTELY_ABORTED,
				  skb->priority, -ECONNABORTED);
}

/*
 * Process an incoming call packet.
 */
void rxrpc_input_call_packet(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	unsigned long timo;

	_enter("%p,%p", call, skb);

	if (sp->hdr.serviceId != call->dest_srx.srx_service)
		call->dest_srx.srx_service = sp->hdr.serviceId;
	if ((int)sp->hdr.serial - (int)call->rx_serial > 0)
		call->rx_serial = sp->hdr.serial;
	if (!test_bit(RXRPC_CALL_RX_HEARD, &call->flags))
		set_bit(RXRPC_CALL_RX_HEARD, &call->flags);

	timo = READ_ONCE(call->next_rx_timo);
	if (timo) {
		unsigned long now = jiffies, expect_rx_by;

		expect_rx_by = now + timo;
		WRITE_ONCE(call->expect_rx_by, expect_rx_by);
		rxrpc_reduce_call_timer(call, expect_rx_by, now,
					rxrpc_timer_set_for_normal);
	}

	switch (sp->hdr.type) {
	case RXRPC_PACKET_TYPE_DATA:
		return rxrpc_input_data(call, skb);

	case RXRPC_PACKET_TYPE_ACK:
		return rxrpc_input_ack(call, skb);

	case RXRPC_PACKET_TYPE_BUSY:
		/* Just ignore BUSY packets from the server; the retry and
		 * lifespan timers will take care of business.  BUSY packets
		 * from the client don't make sense.
		 */
		return;

	case RXRPC_PACKET_TYPE_ABORT:
		return rxrpc_input_abort(call, skb);

	case RXRPC_PACKET_TYPE_ACKALL:
		return rxrpc_input_ackall(call, skb);

	default:
		break;
	}
}

/*
 * Handle a new service call on a channel implicitly completing the preceding
 * call on that channel.  This does not apply to client conns.
 *
 * TODO: If callNumber > call_id + 1, renegotiate security.
 */
void rxrpc_implicit_end_call(struct rxrpc_call *call, struct sk_buff *skb)
{
	switch (__rxrpc_call_state(call)) {
	case RXRPC_CALL_SERVER_AWAIT_ACK:
		rxrpc_call_completed(call);
		fallthrough;
	case RXRPC_CALL_COMPLETE:
		break;
	default:
		rxrpc_abort_call(call, 0, RX_CALL_DEAD, -ESHUTDOWN,
				 rxrpc_eproto_improper_term);
		trace_rxrpc_improper_term(call);
		break;
	}

	rxrpc_input_call_event(call, skb);
}