// SPDX-License-Identifier: GPL-2.0-or-later
/* RxRPC packet reception
 *
 * Copyright (C) 2007, 2016, 2022 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 */

#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include "ar-internal.h"

static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn,
				      struct sockaddr_rxrpc *peer_srx,
				      struct sk_buff *skb);

/*
 * handle data received on the local endpoint
 * - may be called in interrupt context
 *
 * [!] Note that as this is called from the encap_rcv hook, the socket is not
 * held locked by the caller and nothing prevents sk_user_data on the UDP from
 * being cleared in the middle of processing this function.
 *
 * Called with the RCU read lock held from the IP layer via UDP.
 */
int rxrpc_encap_rcv(struct sock *udp_sk, struct sk_buff *skb)
{
	struct sk_buff_head *rx_queue;
	struct rxrpc_local *local = rcu_dereference_sk_user_data(udp_sk);

	if (unlikely(!local)) {
		kfree_skb(skb);
		return 0;
	}
	if (skb->tstamp == 0)
		skb->tstamp = ktime_get_real();

	skb->mark = RXRPC_SKB_MARK_PACKET;
	rxrpc_new_skb(skb, rxrpc_skb_new_encap_rcv);
	rx_queue = &local->rx_queue;
#ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
	if (rxrpc_inject_rx_delay ||
	    !skb_queue_empty(&local->rx_delay_queue)) {
		skb->tstamp = ktime_add_ms(skb->tstamp, rxrpc_inject_rx_delay);
		rx_queue = &local->rx_delay_queue;
	}
#endif

	skb_queue_tail(rx_queue, skb);
	rxrpc_wake_up_io_thread(local);
	return 0;
}

/*
 * Handle an error received on the local endpoint.
 */
void rxrpc_error_report(struct sock *sk)
{
	struct rxrpc_local *local;
	struct sk_buff *skb;

	rcu_read_lock();
	local = rcu_dereference_sk_user_data(sk);
	if (unlikely(!local)) {
		rcu_read_unlock();
		return;
	}

	while ((skb = skb_dequeue(&sk->sk_error_queue))) {
		skb->mark = RXRPC_SKB_MARK_ERROR;
		rxrpc_new_skb(skb, rxrpc_skb_new_error_report);
		skb_queue_tail(&local->rx_queue, skb);
	}

	rxrpc_wake_up_io_thread(local);
	rcu_read_unlock();
}

/*
 * Directly produce an abort from a packet.
 */
bool rxrpc_direct_abort(struct sk_buff *skb, enum rxrpc_abort_reason why,
			s32 abort_code, int err)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);

	trace_rxrpc_abort(0, why, sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
			  abort_code, err);
	skb->mark = RXRPC_SKB_MARK_REJECT_ABORT;
	skb->priority = abort_code;
	return false;
}

static bool rxrpc_bad_message(struct sk_buff *skb, enum rxrpc_abort_reason why)
{
	return rxrpc_direct_abort(skb, why, RX_PROTOCOL_ERROR, -EBADMSG);
}

#define just_discard true

/*
 * Process event packets targeted at a local endpoint.
 */
static bool rxrpc_input_version(struct rxrpc_local *local, struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	char v;

	_enter("");

	rxrpc_see_skb(skb, rxrpc_skb_see_version);
	if (skb_copy_bits(skb, sizeof(struct rxrpc_wire_header), &v, 1) >= 0) {
		if (v == 0)
			rxrpc_send_version_request(local, &sp->hdr, skb);
	}

	return true;
}

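/*
 * For reference, the on-the-wire Rx header that the extractor below parses
 * looks roughly like this (a sketch; see struct rxrpc_wire_header in
 * protocol.h for the authoritative definition).  All multi-byte fields
 * arrive in network byte order:
 *
 *	__be32	epoch;		// client boot timestamp / epoch
 *	__be32	cid;		// connection ID + channel number
 *	__be32	callNumber;	// call number (0 = connection-level packet)
 *	__be32	seq;		// DATA sequence number within the call
 *	__be32	serial;		// serial number of packet on the wire
 *	u8	type;		// RXRPC_PACKET_TYPE_*
 *	u8	flags;		// packet flags
 *	u8	userStatus;	// app-defined status
 *	u8	securityIndex;	// security class index
 *	__be16	_rsvd;		// reserved (checksum for some classes)
 *	__be16	serviceId;	// service ID
 */
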
/*
 * Extract the wire header from a packet and translate the byte order.
 */
static bool rxrpc_extract_header(struct rxrpc_skb_priv *sp,
				 struct sk_buff *skb)
{
	struct rxrpc_wire_header whdr;

	/* dig out the RxRPC connection details */
	if (skb_copy_bits(skb, 0, &whdr, sizeof(whdr)) < 0)
		return rxrpc_bad_message(skb, rxrpc_badmsg_short_hdr);

	memset(sp, 0, sizeof(*sp));
	sp->hdr.epoch		= ntohl(whdr.epoch);
	sp->hdr.cid		= ntohl(whdr.cid);
	sp->hdr.callNumber	= ntohl(whdr.callNumber);
	sp->hdr.seq		= ntohl(whdr.seq);
	sp->hdr.serial		= ntohl(whdr.serial);
	sp->hdr.flags		= whdr.flags;
	sp->hdr.type		= whdr.type;
	sp->hdr.userStatus	= whdr.userStatus;
	sp->hdr.securityIndex	= whdr.securityIndex;
	sp->hdr._rsvd		= ntohs(whdr._rsvd);
	sp->hdr.serviceId	= ntohs(whdr.serviceId);
	return true;
}

/*
 * Extract the abort code from an ABORT packet and stash it in skb->priority.
 */
static bool rxrpc_extract_abort(struct sk_buff *skb)
{
	__be32 wtmp;

	if (skb_copy_bits(skb, sizeof(struct rxrpc_wire_header),
			  &wtmp, sizeof(wtmp)) < 0)
		return false;
	skb->priority = ntohl(wtmp);
	return true;
}

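/*
 * Dispatch overview (an informal summary of the checks that follow, not a
 * normative spec): the input function sanity-checks the header per packet
 * type, then routes by direction.  Client-bound packets must match an
 * extant client connection; server-bound packets are looked up via the
 * peer record first and may create a new incoming call if no connection
 * or call yet exists.
 */
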
/*
 * Process packets received on the local endpoint
 */
static bool rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff **_skb)
{
	struct rxrpc_connection *conn;
	struct sockaddr_rxrpc peer_srx;
	struct rxrpc_skb_priv *sp;
	struct rxrpc_peer *peer = NULL;
	struct sk_buff *skb = *_skb;
	bool ret = false;

	skb_pull(skb, sizeof(struct udphdr));

	sp = rxrpc_skb(skb);

	/* dig out the RxRPC connection details */
	if (!rxrpc_extract_header(sp, skb))
		return just_discard;

	if (IS_ENABLED(CONFIG_AF_RXRPC_INJECT_LOSS)) {
		static int lose;

		if ((lose++ & 7) == 7) {
			trace_rxrpc_rx_lose(sp);
			return just_discard;
		}
	}

	trace_rxrpc_rx_packet(sp);

	switch (sp->hdr.type) {
	case RXRPC_PACKET_TYPE_VERSION:
		if (rxrpc_to_client(sp))
			return just_discard;
		return rxrpc_input_version(local, skb);

	case RXRPC_PACKET_TYPE_BUSY:
		if (rxrpc_to_server(sp))
			return just_discard;
		fallthrough;
	case RXRPC_PACKET_TYPE_ACK:
	case RXRPC_PACKET_TYPE_ACKALL:
		if (sp->hdr.callNumber == 0)
			return rxrpc_bad_message(skb, rxrpc_badmsg_zero_call);
		break;
	case RXRPC_PACKET_TYPE_ABORT:
		if (!rxrpc_extract_abort(skb))
			return just_discard; /* Just discard if malformed */
		break;

	case RXRPC_PACKET_TYPE_DATA:
		if (sp->hdr.callNumber == 0)
			return rxrpc_bad_message(skb, rxrpc_badmsg_zero_call);
		if (sp->hdr.seq == 0)
			return rxrpc_bad_message(skb, rxrpc_badmsg_zero_seq);

		/* Unshare the packet so that it can be modified for in-place
		 * decryption.
		 */
		if (sp->hdr.securityIndex != 0) {
			skb = skb_unshare(skb, GFP_ATOMIC);
			if (!skb) {
				rxrpc_eaten_skb(*_skb, rxrpc_skb_eaten_by_unshare_nomem);
				*_skb = NULL;
				return just_discard;
			}

			if (skb != *_skb) {
				rxrpc_eaten_skb(*_skb, rxrpc_skb_eaten_by_unshare);
				*_skb = skb;
				rxrpc_new_skb(skb, rxrpc_skb_new_unshared);
				sp = rxrpc_skb(skb);
			}
		}
		break;

	case RXRPC_PACKET_TYPE_CHALLENGE:
		if (rxrpc_to_server(sp))
			return just_discard;
		break;
	case RXRPC_PACKET_TYPE_RESPONSE:
		if (rxrpc_to_client(sp))
			return just_discard;
		break;

	/* Packet types 9-11 should just be ignored. */
	case RXRPC_PACKET_TYPE_PARAMS:
	case RXRPC_PACKET_TYPE_10:
	case RXRPC_PACKET_TYPE_11:
		return just_discard;

	default:
		return rxrpc_bad_message(skb, rxrpc_badmsg_unsupported_packet);
	}

	if (sp->hdr.serviceId == 0)
		return rxrpc_bad_message(skb, rxrpc_badmsg_zero_service);

	if (WARN_ON_ONCE(rxrpc_extract_addr_from_skb(&peer_srx, skb) < 0))
		return just_discard; /* Unsupported address type. */

	if (peer_srx.transport.family != local->srx.transport.family &&
	    (peer_srx.transport.family == AF_INET &&
	     local->srx.transport.family != AF_INET6)) {
		pr_warn_ratelimited("AF_RXRPC: Protocol mismatch %u not %u\n",
				    peer_srx.transport.family,
				    local->srx.transport.family);
		return just_discard; /* Wrong address type. */
	}

	if (rxrpc_to_client(sp)) {
		rcu_read_lock();
		conn = rxrpc_find_client_connection_rcu(local, &peer_srx, skb);
		conn = rxrpc_get_connection_maybe(conn, rxrpc_conn_get_call_input);
		rcu_read_unlock();
		if (!conn)
			return rxrpc_protocol_error(skb, rxrpc_eproto_no_client_conn);

		ret = rxrpc_input_packet_on_conn(conn, &peer_srx, skb);
		rxrpc_put_connection(conn, rxrpc_conn_put_call_input);
		return ret;
	}

	/* We need to look up service connections by the full protocol
	 * parameter set.  We look up the peer first as an intermediate step
	 * and then the connection from the peer's tree.
	 */
	rcu_read_lock();

	peer = rxrpc_lookup_peer_rcu(local, &peer_srx);
	if (!peer) {
		rcu_read_unlock();
		return rxrpc_new_incoming_call(local, NULL, NULL, &peer_srx, skb);
	}

	conn = rxrpc_find_service_conn_rcu(peer, skb);
	conn = rxrpc_get_connection_maybe(conn, rxrpc_conn_get_call_input);
	if (conn) {
		rcu_read_unlock();
		ret = rxrpc_input_packet_on_conn(conn, &peer_srx, skb);
		rxrpc_put_connection(conn, rxrpc_conn_put_call_input);
		return ret;
	}

	peer = rxrpc_get_peer_maybe(peer, rxrpc_peer_get_input);
	rcu_read_unlock();

	ret = rxrpc_new_incoming_call(local, peer, NULL, &peer_srx, skb);
	rxrpc_put_peer(peer, rxrpc_peer_put_input);
	return ret;
}

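/*
 * Note on call routing (informal): an Rx connection multiplexes up to four
 * concurrent calls, one per channel, selected by the bottom bits of the cid
 * (RXRPC_CHANNELMASK).  Each channel remembers the call number it last
 * hosted; a higher call number arriving on a channel implicitly completes
 * the previous call, which the function below handles via
 * rxrpc_implicit_end_call().
 */
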
/*
 * Deal with a packet that's associated with an extant connection.
 */
static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn,
				      struct sockaddr_rxrpc *peer_srx,
				      struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	struct rxrpc_channel *chan;
	struct rxrpc_call *call = NULL;
	unsigned int channel;
	bool ret;

	if (sp->hdr.securityIndex != conn->security_ix)
		return rxrpc_direct_abort(skb, rxrpc_eproto_wrong_security,
					  RXKADINCONSISTENCY, -EBADMSG);

	if (sp->hdr.serviceId != conn->service_id) {
		int old_id;

		if (!test_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags))
			return rxrpc_protocol_error(skb, rxrpc_eproto_reupgrade);

		old_id = cmpxchg(&conn->service_id, conn->orig_service_id,
				 sp->hdr.serviceId);
		if (old_id != conn->orig_service_id &&
		    old_id != sp->hdr.serviceId)
			return rxrpc_protocol_error(skb, rxrpc_eproto_bad_upgrade);
	}

	if (after(sp->hdr.serial, conn->hi_serial))
		conn->hi_serial = sp->hdr.serial;

	/* It's a connection-level packet if the call number is 0. */
	if (sp->hdr.callNumber == 0)
		return rxrpc_input_conn_packet(conn, skb);

	/* Call-bound packets are routed by connection channel. */
	channel = sp->hdr.cid & RXRPC_CHANNELMASK;
	chan = &conn->channels[channel];

	/* Ignore really old calls */
	if (sp->hdr.callNumber < chan->last_call)
		return just_discard;

	if (sp->hdr.callNumber == chan->last_call) {
		if (chan->call ||
		    sp->hdr.type == RXRPC_PACKET_TYPE_ABORT)
			return just_discard;

		/* For the previous service call, if completed successfully, we
		 * discard all further packets.
		 */
		if (rxrpc_conn_is_service(conn) &&
		    chan->last_type == RXRPC_PACKET_TYPE_ACK)
			return just_discard;

		/* But otherwise we need to retransmit the final packet from
		 * data cached in the connection record.
		 */
		if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA)
			trace_rxrpc_rx_data(chan->call_debug_id,
					    sp->hdr.seq,
					    sp->hdr.serial,
					    sp->hdr.flags);
		rxrpc_conn_retransmit_call(conn, skb, channel);
		return just_discard;
	}

	call = rxrpc_try_get_call(chan->call, rxrpc_call_get_input);

	if (sp->hdr.callNumber > chan->call_id) {
		if (rxrpc_to_client(sp)) {
			rxrpc_put_call(call, rxrpc_call_put_input);
			return rxrpc_protocol_error(skb,
						    rxrpc_eproto_unexpected_implicit_end);
		}

		if (call) {
			rxrpc_implicit_end_call(call, skb);
			rxrpc_put_call(call, rxrpc_call_put_input);
			call = NULL;
		}
	}

	if (!call) {
		if (rxrpc_to_client(sp))
			return rxrpc_protocol_error(skb, rxrpc_eproto_no_client_call);
		return rxrpc_new_incoming_call(conn->local, conn->peer, conn,
					       peer_srx, skb);
	}

	ret = rxrpc_input_call_event(call, skb);
	rxrpc_put_call(call, rxrpc_call_put_input);
	return ret;
}

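/*
 * The I/O thread below is the sole consumer of local->rx_queue and of the
 * attention lists.  It is started per local endpoint when the transport
 * socket is opened -- roughly like this ('port' here standing in for the
 * endpoint's bound UDP port; see rxrpc_open_socket() in local_object.c):
 *
 *	io_thread = kthread_run(rxrpc_io_thread, local,
 *				"krxrpcio/%u", ntohs(port));
 *
 * It shuts itself down once kthread_should_stop() turns true, tearing down
 * the endpoint on the way out.
 */
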
/*
 * I/O and event handling thread.
 */
int rxrpc_io_thread(void *data)
{
	struct rxrpc_connection *conn;
	struct sk_buff_head rx_queue;
	struct rxrpc_local *local = data;
	struct rxrpc_call *call;
	struct sk_buff *skb;
#ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
	ktime_t now;
#endif
	bool should_stop;

	complete(&local->io_thread_ready);

	skb_queue_head_init(&rx_queue);

	set_user_nice(current, MIN_NICE);

	for (;;) {
		rxrpc_inc_stat(local->rxnet, stat_io_loop);

		/* Deal with connections that want immediate attention. */
		conn = list_first_entry_or_null(&local->conn_attend_q,
						struct rxrpc_connection,
						attend_link);
		if (conn) {
			spin_lock_bh(&local->lock);
			list_del_init(&conn->attend_link);
			spin_unlock_bh(&local->lock);

			rxrpc_input_conn_event(conn, NULL);
			rxrpc_put_connection(conn, rxrpc_conn_put_poke);
			continue;
		}

		if (test_and_clear_bit(RXRPC_CLIENT_CONN_REAP_TIMER,
				       &local->client_conn_flags))
			rxrpc_discard_expired_client_conns(local);

		/* Deal with calls that want immediate attention. */
		if ((call = list_first_entry_or_null(&local->call_attend_q,
						     struct rxrpc_call,
						     attend_link))) {
			spin_lock_bh(&local->lock);
			list_del_init(&call->attend_link);
			spin_unlock_bh(&local->lock);

			trace_rxrpc_call_poked(call);
			rxrpc_input_call_event(call, NULL);
			rxrpc_put_call(call, rxrpc_call_put_poke);
			continue;
		}

		if (!list_empty(&local->new_client_calls))
			rxrpc_connect_client_calls(local);

		/* Process received packets and errors. */
		if ((skb = __skb_dequeue(&rx_queue))) {
			struct rxrpc_skb_priv *sp = rxrpc_skb(skb);

			switch (skb->mark) {
			case RXRPC_SKB_MARK_PACKET:
				skb->priority = 0;
				if (!rxrpc_input_packet(local, &skb))
					rxrpc_reject_packet(local, skb);
				trace_rxrpc_rx_done(skb->mark, skb->priority);
				rxrpc_free_skb(skb, rxrpc_skb_put_input);
				break;
			case RXRPC_SKB_MARK_ERROR:
				rxrpc_input_error(local, skb);
				rxrpc_free_skb(skb, rxrpc_skb_put_error_report);
				break;
			case RXRPC_SKB_MARK_SERVICE_CONN_SECURED:
				rxrpc_input_conn_event(sp->conn, skb);
				rxrpc_put_connection(sp->conn, rxrpc_conn_put_poke);
				rxrpc_free_skb(skb, rxrpc_skb_put_conn_secured);
				break;
			default:
				WARN_ON_ONCE(1);
				rxrpc_free_skb(skb, rxrpc_skb_put_unknown);
				break;
			}
			continue;
		}

		/* Inject a delay into packets if requested. */
#ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
		now = ktime_get_real();
		while ((skb = skb_peek(&local->rx_delay_queue))) {
			if (ktime_before(now, skb->tstamp))
				break;
			skb = skb_dequeue(&local->rx_delay_queue);
			skb_queue_tail(&local->rx_queue, skb);
		}
#endif

		if (!skb_queue_empty(&local->rx_queue)) {
			spin_lock_irq(&local->rx_queue.lock);
			skb_queue_splice_tail_init(&local->rx_queue, &rx_queue);
			spin_unlock_irq(&local->rx_queue.lock);
			continue;
		}

		set_current_state(TASK_INTERRUPTIBLE);
		should_stop = kthread_should_stop();
		if (!skb_queue_empty(&local->rx_queue) ||
		    !list_empty(&local->call_attend_q) ||
		    !list_empty(&local->conn_attend_q) ||
		    !list_empty(&local->new_client_calls) ||
		    test_bit(RXRPC_CLIENT_CONN_REAP_TIMER,
			     &local->client_conn_flags)) {
			__set_current_state(TASK_RUNNING);
			continue;
		}

		if (should_stop)
			break;

#ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
		skb = skb_peek(&local->rx_delay_queue);
		if (skb) {
			unsigned long timeout;
			ktime_t tstamp = skb->tstamp;
			ktime_t now = ktime_get_real();
			s64 delay_ns = ktime_to_ns(ktime_sub(tstamp, now));

			if (delay_ns <= 0) {
				__set_current_state(TASK_RUNNING);
				continue;
			}

			timeout = nsecs_to_jiffies(delay_ns);
			timeout = max(timeout, 1UL);
			schedule_timeout(timeout);
			__set_current_state(TASK_RUNNING);
			continue;
		}
#endif

		schedule();
	}

	__set_current_state(TASK_RUNNING);
	rxrpc_see_local(local, rxrpc_local_stop);
	rxrpc_destroy_local(local);
	local->io_thread = NULL;
	rxrpc_see_local(local, rxrpc_local_stopped);
	return 0;
}