1 // SPDX-License-Identifier: GPL-2.0-or-later 2 /* RxRPC packet reception 3 * 4 * Copyright (C) 2007, 2016, 2022 Red Hat, Inc. All Rights Reserved. 5 * Written by David Howells (dhowells@redhat.com) 6 */ 7 8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt 9 10 #include "ar-internal.h" 11 12 static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn, 13 struct sockaddr_rxrpc *peer_srx, 14 struct sk_buff *skb); 15 16 /* 17 * handle data received on the local endpoint 18 * - may be called in interrupt context 19 * 20 * [!] Note that as this is called from the encap_rcv hook, the socket is not 21 * held locked by the caller and nothing prevents sk_user_data on the UDP from 22 * being cleared in the middle of processing this function. 23 * 24 * Called with the RCU read lock held from the IP layer via UDP. 25 */ 26 int rxrpc_encap_rcv(struct sock *udp_sk, struct sk_buff *skb) 27 { 28 struct sk_buff_head *rx_queue; 29 struct rxrpc_local *local = rcu_dereference_sk_user_data(udp_sk); 30 struct task_struct *io_thread; 31 32 if (unlikely(!local)) { 33 kfree_skb(skb); 34 return 0; 35 } 36 io_thread = READ_ONCE(local->io_thread); 37 if (!io_thread) { 38 kfree_skb(skb); 39 return 0; 40 } 41 if (skb->tstamp == 0) 42 skb->tstamp = ktime_get_real(); 43 44 skb->mark = RXRPC_SKB_MARK_PACKET; 45 rxrpc_new_skb(skb, rxrpc_skb_new_encap_rcv); 46 rx_queue = &local->rx_queue; 47 #ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY 48 if (rxrpc_inject_rx_delay || 49 !skb_queue_empty(&local->rx_delay_queue)) { 50 skb->tstamp = ktime_add_ms(skb->tstamp, rxrpc_inject_rx_delay); 51 rx_queue = &local->rx_delay_queue; 52 } 53 #endif 54 55 skb_queue_tail(rx_queue, skb); 56 wake_up_process(io_thread); 57 return 0; 58 } 59 60 /* 61 * Handle an error received on the local endpoint. 62 */ 63 void rxrpc_error_report(struct sock *sk) 64 { 65 struct rxrpc_local *local; 66 struct sk_buff *skb; 67 68 rcu_read_lock(); 69 local = rcu_dereference_sk_user_data(sk); 70 if (unlikely(!local)) { 71 rcu_read_unlock(); 72 return; 73 } 74 75 while ((skb = skb_dequeue(&sk->sk_error_queue))) { 76 skb->mark = RXRPC_SKB_MARK_ERROR; 77 rxrpc_new_skb(skb, rxrpc_skb_new_error_report); 78 skb_queue_tail(&local->rx_queue, skb); 79 } 80 81 rxrpc_wake_up_io_thread(local); 82 rcu_read_unlock(); 83 } 84 85 /* 86 * Directly produce an abort from a packet. 87 */ 88 bool rxrpc_direct_abort(struct sk_buff *skb, enum rxrpc_abort_reason why, 89 s32 abort_code, int err) 90 { 91 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 92 93 trace_rxrpc_abort(0, why, sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq, 94 abort_code, err); 95 skb->mark = RXRPC_SKB_MARK_REJECT_ABORT; 96 skb->priority = abort_code; 97 return false; 98 } 99 100 static bool rxrpc_bad_message(struct sk_buff *skb, enum rxrpc_abort_reason why) 101 { 102 return rxrpc_direct_abort(skb, why, RX_PROTOCOL_ERROR, -EBADMSG); 103 } 104 105 #define just_discard true 106 107 /* 108 * Process event packets targeted at a local endpoint. 109 */ 110 static bool rxrpc_input_version(struct rxrpc_local *local, struct sk_buff *skb) 111 { 112 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 113 char v; 114 115 _enter(""); 116 117 rxrpc_see_skb(skb, rxrpc_skb_see_version); 118 if (skb_copy_bits(skb, sizeof(struct rxrpc_wire_header), &v, 1) >= 0) { 119 if (v == 0) 120 rxrpc_send_version_request(local, &sp->hdr, skb); 121 } 122 123 return true; 124 } 125 126 /* 127 * Extract the wire header from a packet and translate the byte order. 128 */ 129 static bool rxrpc_extract_header(struct rxrpc_skb_priv *sp, 130 struct sk_buff *skb) 131 { 132 struct rxrpc_wire_header whdr; 133 134 /* dig out the RxRPC connection details */ 135 if (skb_copy_bits(skb, 0, &whdr, sizeof(whdr)) < 0) 136 return rxrpc_bad_message(skb, rxrpc_badmsg_short_hdr); 137 138 memset(sp, 0, sizeof(*sp)); 139 sp->hdr.epoch = ntohl(whdr.epoch); 140 sp->hdr.cid = ntohl(whdr.cid); 141 sp->hdr.callNumber = ntohl(whdr.callNumber); 142 sp->hdr.seq = ntohl(whdr.seq); 143 sp->hdr.serial = ntohl(whdr.serial); 144 sp->hdr.flags = whdr.flags; 145 sp->hdr.type = whdr.type; 146 sp->hdr.userStatus = whdr.userStatus; 147 sp->hdr.securityIndex = whdr.securityIndex; 148 sp->hdr._rsvd = ntohs(whdr._rsvd); 149 sp->hdr.serviceId = ntohs(whdr.serviceId); 150 return true; 151 } 152 153 /* 154 * Extract the abort code from an ABORT packet and stash it in skb->priority. 155 */ 156 static bool rxrpc_extract_abort(struct sk_buff *skb) 157 { 158 __be32 wtmp; 159 160 if (skb_copy_bits(skb, sizeof(struct rxrpc_wire_header), 161 &wtmp, sizeof(wtmp)) < 0) 162 return false; 163 skb->priority = ntohl(wtmp); 164 return true; 165 } 166 167 /* 168 * Process packets received on the local endpoint 169 */ 170 static bool rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff **_skb) 171 { 172 struct rxrpc_connection *conn; 173 struct sockaddr_rxrpc peer_srx; 174 struct rxrpc_skb_priv *sp; 175 struct rxrpc_peer *peer = NULL; 176 struct sk_buff *skb = *_skb; 177 bool ret = false; 178 179 skb_pull(skb, sizeof(struct udphdr)); 180 181 sp = rxrpc_skb(skb); 182 183 /* dig out the RxRPC connection details */ 184 if (!rxrpc_extract_header(sp, skb)) 185 return just_discard; 186 187 if (IS_ENABLED(CONFIG_AF_RXRPC_INJECT_LOSS)) { 188 static int lose; 189 if ((lose++ & 7) == 7) { 190 trace_rxrpc_rx_lose(sp); 191 return just_discard; 192 } 193 } 194 195 trace_rxrpc_rx_packet(sp); 196 197 switch (sp->hdr.type) { 198 case RXRPC_PACKET_TYPE_VERSION: 199 if (rxrpc_to_client(sp)) 200 return just_discard; 201 return rxrpc_input_version(local, skb); 202 203 case RXRPC_PACKET_TYPE_BUSY: 204 if (rxrpc_to_server(sp)) 205 return just_discard; 206 fallthrough; 207 case RXRPC_PACKET_TYPE_ACK: 208 case RXRPC_PACKET_TYPE_ACKALL: 209 if (sp->hdr.callNumber == 0) 210 return rxrpc_bad_message(skb, rxrpc_badmsg_zero_call); 211 break; 212 case RXRPC_PACKET_TYPE_ABORT: 213 if (!rxrpc_extract_abort(skb)) 214 return just_discard; /* Just discard if malformed */ 215 break; 216 217 case RXRPC_PACKET_TYPE_DATA: 218 if (sp->hdr.callNumber == 0) 219 return rxrpc_bad_message(skb, rxrpc_badmsg_zero_call); 220 if (sp->hdr.seq == 0) 221 return rxrpc_bad_message(skb, rxrpc_badmsg_zero_seq); 222 223 /* Unshare the packet so that it can be modified for in-place 224 * decryption. 225 */ 226 if (sp->hdr.securityIndex != 0) { 227 skb = skb_unshare(skb, GFP_ATOMIC); 228 if (!skb) { 229 rxrpc_eaten_skb(*_skb, rxrpc_skb_eaten_by_unshare_nomem); 230 *_skb = NULL; 231 return just_discard; 232 } 233 234 if (skb != *_skb) { 235 rxrpc_eaten_skb(*_skb, rxrpc_skb_eaten_by_unshare); 236 *_skb = skb; 237 rxrpc_new_skb(skb, rxrpc_skb_new_unshared); 238 sp = rxrpc_skb(skb); 239 } 240 } 241 break; 242 243 case RXRPC_PACKET_TYPE_CHALLENGE: 244 if (rxrpc_to_server(sp)) 245 return just_discard; 246 break; 247 case RXRPC_PACKET_TYPE_RESPONSE: 248 if (rxrpc_to_client(sp)) 249 return just_discard; 250 break; 251 252 /* Packet types 9-11 should just be ignored. */ 253 case RXRPC_PACKET_TYPE_PARAMS: 254 case RXRPC_PACKET_TYPE_10: 255 case RXRPC_PACKET_TYPE_11: 256 return just_discard; 257 258 default: 259 return rxrpc_bad_message(skb, rxrpc_badmsg_unsupported_packet); 260 } 261 262 if (sp->hdr.serviceId == 0) 263 return rxrpc_bad_message(skb, rxrpc_badmsg_zero_service); 264 265 if (WARN_ON_ONCE(rxrpc_extract_addr_from_skb(&peer_srx, skb) < 0)) 266 return just_discard; /* Unsupported address type. */ 267 268 if (peer_srx.transport.family != local->srx.transport.family && 269 (peer_srx.transport.family == AF_INET && 270 local->srx.transport.family != AF_INET6)) { 271 pr_warn_ratelimited("AF_RXRPC: Protocol mismatch %u not %u\n", 272 peer_srx.transport.family, 273 local->srx.transport.family); 274 return just_discard; /* Wrong address type. */ 275 } 276 277 if (rxrpc_to_client(sp)) { 278 rcu_read_lock(); 279 conn = rxrpc_find_client_connection_rcu(local, &peer_srx, skb); 280 conn = rxrpc_get_connection_maybe(conn, rxrpc_conn_get_call_input); 281 rcu_read_unlock(); 282 if (!conn) 283 return rxrpc_protocol_error(skb, rxrpc_eproto_no_client_conn); 284 285 ret = rxrpc_input_packet_on_conn(conn, &peer_srx, skb); 286 rxrpc_put_connection(conn, rxrpc_conn_put_call_input); 287 return ret; 288 } 289 290 /* We need to look up service connections by the full protocol 291 * parameter set. We look up the peer first as an intermediate step 292 * and then the connection from the peer's tree. 293 */ 294 rcu_read_lock(); 295 296 peer = rxrpc_lookup_peer_rcu(local, &peer_srx); 297 if (!peer) { 298 rcu_read_unlock(); 299 return rxrpc_new_incoming_call(local, NULL, NULL, &peer_srx, skb); 300 } 301 302 conn = rxrpc_find_service_conn_rcu(peer, skb); 303 conn = rxrpc_get_connection_maybe(conn, rxrpc_conn_get_call_input); 304 if (conn) { 305 rcu_read_unlock(); 306 ret = rxrpc_input_packet_on_conn(conn, &peer_srx, skb); 307 rxrpc_put_connection(conn, rxrpc_conn_put_call_input); 308 return ret; 309 } 310 311 peer = rxrpc_get_peer_maybe(peer, rxrpc_peer_get_input); 312 rcu_read_unlock(); 313 314 ret = rxrpc_new_incoming_call(local, peer, NULL, &peer_srx, skb); 315 rxrpc_put_peer(peer, rxrpc_peer_put_input); 316 return ret; 317 } 318 319 /* 320 * Deal with a packet that's associated with an extant connection. 321 */ 322 static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn, 323 struct sockaddr_rxrpc *peer_srx, 324 struct sk_buff *skb) 325 { 326 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 327 struct rxrpc_channel *chan; 328 struct rxrpc_call *call = NULL; 329 unsigned int channel; 330 bool ret; 331 332 if (sp->hdr.securityIndex != conn->security_ix) 333 return rxrpc_direct_abort(skb, rxrpc_eproto_wrong_security, 334 RXKADINCONSISTENCY, -EBADMSG); 335 336 if (sp->hdr.serviceId != conn->service_id) { 337 int old_id; 338 339 if (!test_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags)) 340 return rxrpc_protocol_error(skb, rxrpc_eproto_reupgrade); 341 342 old_id = cmpxchg(&conn->service_id, conn->orig_service_id, 343 sp->hdr.serviceId); 344 if (old_id != conn->orig_service_id && 345 old_id != sp->hdr.serviceId) 346 return rxrpc_protocol_error(skb, rxrpc_eproto_bad_upgrade); 347 } 348 349 if (after(sp->hdr.serial, conn->hi_serial)) 350 conn->hi_serial = sp->hdr.serial; 351 352 /* It's a connection-level packet if the call number is 0. */ 353 if (sp->hdr.callNumber == 0) 354 return rxrpc_input_conn_packet(conn, skb); 355 356 /* Call-bound packets are routed by connection channel. */ 357 channel = sp->hdr.cid & RXRPC_CHANNELMASK; 358 chan = &conn->channels[channel]; 359 360 /* Ignore really old calls */ 361 if (sp->hdr.callNumber < chan->last_call) 362 return just_discard; 363 364 if (sp->hdr.callNumber == chan->last_call) { 365 if (chan->call || 366 sp->hdr.type == RXRPC_PACKET_TYPE_ABORT) 367 return just_discard; 368 369 /* For the previous service call, if completed successfully, we 370 * discard all further packets. 371 */ 372 if (rxrpc_conn_is_service(conn) && 373 chan->last_type == RXRPC_PACKET_TYPE_ACK) 374 return just_discard; 375 376 /* But otherwise we need to retransmit the final packet from 377 * data cached in the connection record. 378 */ 379 if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA) 380 trace_rxrpc_rx_data(chan->call_debug_id, 381 sp->hdr.seq, 382 sp->hdr.serial, 383 sp->hdr.flags); 384 rxrpc_conn_retransmit_call(conn, skb, channel); 385 return just_discard; 386 } 387 388 call = rxrpc_try_get_call(chan->call, rxrpc_call_get_input); 389 390 if (sp->hdr.callNumber > chan->call_id) { 391 if (rxrpc_to_client(sp)) { 392 rxrpc_put_call(call, rxrpc_call_put_input); 393 return rxrpc_protocol_error(skb, 394 rxrpc_eproto_unexpected_implicit_end); 395 } 396 397 if (call) { 398 rxrpc_implicit_end_call(call, skb); 399 rxrpc_put_call(call, rxrpc_call_put_input); 400 call = NULL; 401 } 402 } 403 404 if (!call) { 405 if (rxrpc_to_client(sp)) 406 return rxrpc_protocol_error(skb, rxrpc_eproto_no_client_call); 407 return rxrpc_new_incoming_call(conn->local, conn->peer, conn, 408 peer_srx, skb); 409 } 410 411 ret = rxrpc_input_call_event(call, skb); 412 rxrpc_put_call(call, rxrpc_call_put_input); 413 return ret; 414 } 415 416 /* 417 * I/O and event handling thread. 418 */ 419 int rxrpc_io_thread(void *data) 420 { 421 struct rxrpc_connection *conn; 422 struct sk_buff_head rx_queue; 423 struct rxrpc_local *local = data; 424 struct rxrpc_call *call; 425 struct sk_buff *skb; 426 #ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY 427 ktime_t now; 428 #endif 429 bool should_stop; 430 431 complete(&local->io_thread_ready); 432 433 skb_queue_head_init(&rx_queue); 434 435 set_user_nice(current, MIN_NICE); 436 437 for (;;) { 438 rxrpc_inc_stat(local->rxnet, stat_io_loop); 439 440 /* Deal with connections that want immediate attention. */ 441 conn = list_first_entry_or_null(&local->conn_attend_q, 442 struct rxrpc_connection, 443 attend_link); 444 if (conn) { 445 spin_lock_bh(&local->lock); 446 list_del_init(&conn->attend_link); 447 spin_unlock_bh(&local->lock); 448 449 rxrpc_input_conn_event(conn, NULL); 450 rxrpc_put_connection(conn, rxrpc_conn_put_poke); 451 continue; 452 } 453 454 if (test_and_clear_bit(RXRPC_CLIENT_CONN_REAP_TIMER, 455 &local->client_conn_flags)) 456 rxrpc_discard_expired_client_conns(local); 457 458 /* Deal with calls that want immediate attention. */ 459 if ((call = list_first_entry_or_null(&local->call_attend_q, 460 struct rxrpc_call, 461 attend_link))) { 462 spin_lock_bh(&local->lock); 463 list_del_init(&call->attend_link); 464 spin_unlock_bh(&local->lock); 465 466 trace_rxrpc_call_poked(call); 467 rxrpc_input_call_event(call, NULL); 468 rxrpc_put_call(call, rxrpc_call_put_poke); 469 continue; 470 } 471 472 if (!list_empty(&local->new_client_calls)) 473 rxrpc_connect_client_calls(local); 474 475 /* Process received packets and errors. */ 476 if ((skb = __skb_dequeue(&rx_queue))) { 477 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 478 switch (skb->mark) { 479 case RXRPC_SKB_MARK_PACKET: 480 skb->priority = 0; 481 if (!rxrpc_input_packet(local, &skb)) 482 rxrpc_reject_packet(local, skb); 483 trace_rxrpc_rx_done(skb->mark, skb->priority); 484 rxrpc_free_skb(skb, rxrpc_skb_put_input); 485 break; 486 case RXRPC_SKB_MARK_ERROR: 487 rxrpc_input_error(local, skb); 488 rxrpc_free_skb(skb, rxrpc_skb_put_error_report); 489 break; 490 case RXRPC_SKB_MARK_SERVICE_CONN_SECURED: 491 rxrpc_input_conn_event(sp->conn, skb); 492 rxrpc_put_connection(sp->conn, rxrpc_conn_put_poke); 493 rxrpc_free_skb(skb, rxrpc_skb_put_conn_secured); 494 break; 495 default: 496 WARN_ON_ONCE(1); 497 rxrpc_free_skb(skb, rxrpc_skb_put_unknown); 498 break; 499 } 500 continue; 501 } 502 503 /* Inject a delay into packets if requested. */ 504 #ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY 505 now = ktime_get_real(); 506 while ((skb = skb_peek(&local->rx_delay_queue))) { 507 if (ktime_before(now, skb->tstamp)) 508 break; 509 skb = skb_dequeue(&local->rx_delay_queue); 510 skb_queue_tail(&local->rx_queue, skb); 511 } 512 #endif 513 514 if (!skb_queue_empty(&local->rx_queue)) { 515 spin_lock_irq(&local->rx_queue.lock); 516 skb_queue_splice_tail_init(&local->rx_queue, &rx_queue); 517 spin_unlock_irq(&local->rx_queue.lock); 518 continue; 519 } 520 521 set_current_state(TASK_INTERRUPTIBLE); 522 should_stop = kthread_should_stop(); 523 if (!skb_queue_empty(&local->rx_queue) || 524 !list_empty(&local->call_attend_q) || 525 !list_empty(&local->conn_attend_q) || 526 !list_empty(&local->new_client_calls) || 527 test_bit(RXRPC_CLIENT_CONN_REAP_TIMER, 528 &local->client_conn_flags)) { 529 __set_current_state(TASK_RUNNING); 530 continue; 531 } 532 533 if (should_stop) 534 break; 535 536 #ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY 537 skb = skb_peek(&local->rx_delay_queue); 538 if (skb) { 539 unsigned long timeout; 540 ktime_t tstamp = skb->tstamp; 541 ktime_t now = ktime_get_real(); 542 s64 delay_ns = ktime_to_ns(ktime_sub(tstamp, now)); 543 544 if (delay_ns <= 0) { 545 __set_current_state(TASK_RUNNING); 546 continue; 547 } 548 549 timeout = nsecs_to_jiffies(delay_ns); 550 timeout = max(timeout, 1UL); 551 schedule_timeout(timeout); 552 __set_current_state(TASK_RUNNING); 553 continue; 554 } 555 #endif 556 557 schedule(); 558 } 559 560 __set_current_state(TASK_RUNNING); 561 rxrpc_see_local(local, rxrpc_local_stop); 562 rxrpc_destroy_local(local); 563 WRITE_ONCE(local->io_thread, NULL); 564 rxrpc_see_local(local, rxrpc_local_stopped); 565 return 0; 566 } 567