1 // SPDX-License-Identifier: GPL-2.0-or-later 2 /* RxRPC packet reception 3 * 4 * Copyright (C) 2007, 2016, 2022 Red Hat, Inc. All Rights Reserved. 5 * Written by David Howells (dhowells@redhat.com) 6 */ 7 8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt 9 10 #include "ar-internal.h" 11 12 static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn, 13 struct sockaddr_rxrpc *peer_srx, 14 struct sk_buff *skb); 15 16 /* 17 * handle data received on the local endpoint 18 * - may be called in interrupt context 19 * 20 * [!] Note that as this is called from the encap_rcv hook, the socket is not 21 * held locked by the caller and nothing prevents sk_user_data on the UDP from 22 * being cleared in the middle of processing this function. 23 * 24 * Called with the RCU read lock held from the IP layer via UDP. 25 */ 26 int rxrpc_encap_rcv(struct sock *udp_sk, struct sk_buff *skb) 27 { 28 struct rxrpc_local *local = rcu_dereference_sk_user_data(udp_sk); 29 30 if (unlikely(!local)) { 31 kfree_skb(skb); 32 return 0; 33 } 34 if (skb->tstamp == 0) 35 skb->tstamp = ktime_get_real(); 36 37 skb->mark = RXRPC_SKB_MARK_PACKET; 38 rxrpc_new_skb(skb, rxrpc_skb_new_encap_rcv); 39 skb_queue_tail(&local->rx_queue, skb); 40 rxrpc_wake_up_io_thread(local); 41 return 0; 42 } 43 44 /* 45 * Handle an error received on the local endpoint. 46 */ 47 void rxrpc_error_report(struct sock *sk) 48 { 49 struct rxrpc_local *local; 50 struct sk_buff *skb; 51 52 rcu_read_lock(); 53 local = rcu_dereference_sk_user_data(sk); 54 if (unlikely(!local)) { 55 rcu_read_unlock(); 56 return; 57 } 58 59 while ((skb = skb_dequeue(&sk->sk_error_queue))) { 60 skb->mark = RXRPC_SKB_MARK_ERROR; 61 rxrpc_new_skb(skb, rxrpc_skb_new_error_report); 62 skb_queue_tail(&local->rx_queue, skb); 63 } 64 65 rxrpc_wake_up_io_thread(local); 66 rcu_read_unlock(); 67 } 68 69 /* 70 * Process event packets targeted at a local endpoint. 71 */ 72 static void rxrpc_input_version(struct rxrpc_local *local, struct sk_buff *skb) 73 { 74 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 75 char v; 76 77 _enter(""); 78 79 rxrpc_see_skb(skb, rxrpc_skb_see_version); 80 if (skb_copy_bits(skb, sizeof(struct rxrpc_wire_header), &v, 1) >= 0) { 81 if (v == 0) 82 rxrpc_send_version_request(local, &sp->hdr, skb); 83 } 84 } 85 86 /* 87 * Extract the wire header from a packet and translate the byte order. 88 */ 89 static noinline 90 int rxrpc_extract_header(struct rxrpc_skb_priv *sp, struct sk_buff *skb) 91 { 92 struct rxrpc_wire_header whdr; 93 94 /* dig out the RxRPC connection details */ 95 if (skb_copy_bits(skb, 0, &whdr, sizeof(whdr)) < 0) { 96 trace_rxrpc_rx_eproto(NULL, sp->hdr.serial, 97 tracepoint_string("bad_hdr")); 98 return -EBADMSG; 99 } 100 101 memset(sp, 0, sizeof(*sp)); 102 sp->hdr.epoch = ntohl(whdr.epoch); 103 sp->hdr.cid = ntohl(whdr.cid); 104 sp->hdr.callNumber = ntohl(whdr.callNumber); 105 sp->hdr.seq = ntohl(whdr.seq); 106 sp->hdr.serial = ntohl(whdr.serial); 107 sp->hdr.flags = whdr.flags; 108 sp->hdr.type = whdr.type; 109 sp->hdr.userStatus = whdr.userStatus; 110 sp->hdr.securityIndex = whdr.securityIndex; 111 sp->hdr._rsvd = ntohs(whdr._rsvd); 112 sp->hdr.serviceId = ntohs(whdr.serviceId); 113 return 0; 114 } 115 116 /* 117 * Extract the abort code from an ABORT packet and stash it in skb->priority. 118 */ 119 static bool rxrpc_extract_abort(struct sk_buff *skb) 120 { 121 __be32 wtmp; 122 123 if (skb_copy_bits(skb, sizeof(struct rxrpc_wire_header), 124 &wtmp, sizeof(wtmp)) < 0) 125 return false; 126 skb->priority = ntohl(wtmp); 127 return true; 128 } 129 130 /* 131 * Process packets received on the local endpoint 132 */ 133 static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff **_skb) 134 { 135 struct rxrpc_connection *conn; 136 struct sockaddr_rxrpc peer_srx; 137 struct rxrpc_skb_priv *sp; 138 struct rxrpc_peer *peer = NULL; 139 struct sk_buff *skb = *_skb; 140 int ret = 0; 141 142 skb_pull(skb, sizeof(struct udphdr)); 143 144 sp = rxrpc_skb(skb); 145 146 /* dig out the RxRPC connection details */ 147 if (rxrpc_extract_header(sp, skb) < 0) 148 goto bad_message; 149 150 if (IS_ENABLED(CONFIG_AF_RXRPC_INJECT_LOSS)) { 151 static int lose; 152 if ((lose++ & 7) == 7) { 153 trace_rxrpc_rx_lose(sp); 154 return 0; 155 } 156 } 157 158 trace_rxrpc_rx_packet(sp); 159 160 switch (sp->hdr.type) { 161 case RXRPC_PACKET_TYPE_VERSION: 162 if (rxrpc_to_client(sp)) 163 return 0; 164 rxrpc_input_version(local, skb); 165 return 0; 166 167 case RXRPC_PACKET_TYPE_BUSY: 168 if (rxrpc_to_server(sp)) 169 return 0; 170 fallthrough; 171 case RXRPC_PACKET_TYPE_ACK: 172 case RXRPC_PACKET_TYPE_ACKALL: 173 if (sp->hdr.callNumber == 0) 174 goto bad_message; 175 break; 176 case RXRPC_PACKET_TYPE_ABORT: 177 if (!rxrpc_extract_abort(skb)) 178 return 0; /* Just discard if malformed */ 179 break; 180 181 case RXRPC_PACKET_TYPE_DATA: 182 if (sp->hdr.callNumber == 0 || 183 sp->hdr.seq == 0) 184 goto bad_message; 185 186 /* Unshare the packet so that it can be modified for in-place 187 * decryption. 188 */ 189 if (sp->hdr.securityIndex != 0) { 190 skb = skb_unshare(skb, GFP_ATOMIC); 191 if (!skb) { 192 rxrpc_eaten_skb(*_skb, rxrpc_skb_eaten_by_unshare_nomem); 193 *_skb = NULL; 194 return 0; 195 } 196 197 if (skb != *_skb) { 198 rxrpc_eaten_skb(*_skb, rxrpc_skb_eaten_by_unshare); 199 *_skb = skb; 200 rxrpc_new_skb(skb, rxrpc_skb_new_unshared); 201 sp = rxrpc_skb(skb); 202 } 203 } 204 break; 205 206 case RXRPC_PACKET_TYPE_CHALLENGE: 207 if (rxrpc_to_server(sp)) 208 return 0; 209 break; 210 case RXRPC_PACKET_TYPE_RESPONSE: 211 if (rxrpc_to_client(sp)) 212 return 0; 213 break; 214 215 /* Packet types 9-11 should just be ignored. */ 216 case RXRPC_PACKET_TYPE_PARAMS: 217 case RXRPC_PACKET_TYPE_10: 218 case RXRPC_PACKET_TYPE_11: 219 return 0; 220 221 default: 222 goto bad_message; 223 } 224 225 if (sp->hdr.serviceId == 0) 226 goto bad_message; 227 228 if (WARN_ON_ONCE(rxrpc_extract_addr_from_skb(&peer_srx, skb) < 0)) 229 return true; /* Unsupported address type - discard. */ 230 231 if (peer_srx.transport.family != local->srx.transport.family && 232 (peer_srx.transport.family == AF_INET && 233 local->srx.transport.family != AF_INET6)) { 234 pr_warn_ratelimited("AF_RXRPC: Protocol mismatch %u not %u\n", 235 peer_srx.transport.family, 236 local->srx.transport.family); 237 return true; /* Wrong address type - discard. */ 238 } 239 240 if (rxrpc_to_client(sp)) { 241 rcu_read_lock(); 242 conn = rxrpc_find_client_connection_rcu(local, &peer_srx, skb); 243 conn = rxrpc_get_connection_maybe(conn, rxrpc_conn_get_call_input); 244 rcu_read_unlock(); 245 if (!conn) { 246 trace_rxrpc_abort(0, "NCC", sp->hdr.cid, 247 sp->hdr.callNumber, sp->hdr.seq, 248 RXKADINCONSISTENCY, EBADMSG); 249 goto protocol_error; 250 } 251 252 ret = rxrpc_input_packet_on_conn(conn, &peer_srx, skb); 253 rxrpc_put_connection(conn, rxrpc_conn_put_call_input); 254 return ret; 255 } 256 257 /* We need to look up service connections by the full protocol 258 * parameter set. We look up the peer first as an intermediate step 259 * and then the connection from the peer's tree. 260 */ 261 rcu_read_lock(); 262 263 peer = rxrpc_lookup_peer_rcu(local, &peer_srx); 264 if (!peer) { 265 rcu_read_unlock(); 266 return rxrpc_new_incoming_call(local, NULL, NULL, &peer_srx, skb); 267 } 268 269 conn = rxrpc_find_service_conn_rcu(peer, skb); 270 conn = rxrpc_get_connection_maybe(conn, rxrpc_conn_get_call_input); 271 if (conn) { 272 rcu_read_unlock(); 273 ret = rxrpc_input_packet_on_conn(conn, &peer_srx, skb); 274 rxrpc_put_connection(conn, rxrpc_conn_put_call_input); 275 return ret; 276 } 277 278 peer = rxrpc_get_peer_maybe(peer, rxrpc_peer_get_input); 279 rcu_read_unlock(); 280 281 ret = rxrpc_new_incoming_call(local, peer, NULL, &peer_srx, skb); 282 rxrpc_put_peer(peer, rxrpc_peer_put_input); 283 if (ret < 0) 284 goto reject_packet; 285 return 0; 286 287 bad_message: 288 trace_rxrpc_abort(0, "BAD", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq, 289 RX_PROTOCOL_ERROR, EBADMSG); 290 protocol_error: 291 skb->priority = RX_PROTOCOL_ERROR; 292 skb->mark = RXRPC_SKB_MARK_REJECT_ABORT; 293 reject_packet: 294 rxrpc_reject_packet(local, skb); 295 return ret; 296 } 297 298 /* 299 * Deal with a packet that's associated with an extant connection. 300 */ 301 static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn, 302 struct sockaddr_rxrpc *peer_srx, 303 struct sk_buff *skb) 304 { 305 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 306 struct rxrpc_channel *chan; 307 struct rxrpc_call *call = NULL; 308 unsigned int channel; 309 310 if (sp->hdr.securityIndex != conn->security_ix) 311 goto wrong_security; 312 313 if (sp->hdr.serviceId != conn->service_id) { 314 int old_id; 315 316 if (!test_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags)) 317 goto reupgrade; 318 old_id = cmpxchg(&conn->service_id, conn->orig_service_id, 319 sp->hdr.serviceId); 320 321 if (old_id != conn->orig_service_id && 322 old_id != sp->hdr.serviceId) 323 goto reupgrade; 324 } 325 326 if (after(sp->hdr.serial, conn->hi_serial)) 327 conn->hi_serial = sp->hdr.serial; 328 329 /* It's a connection-level packet if the call number is 0. */ 330 if (sp->hdr.callNumber == 0) 331 return rxrpc_input_conn_packet(conn, skb); 332 333 /* Call-bound packets are routed by connection channel. */ 334 channel = sp->hdr.cid & RXRPC_CHANNELMASK; 335 chan = &conn->channels[channel]; 336 337 /* Ignore really old calls */ 338 if (sp->hdr.callNumber < chan->last_call) 339 return 0; 340 341 if (sp->hdr.callNumber == chan->last_call) { 342 if (chan->call || 343 sp->hdr.type == RXRPC_PACKET_TYPE_ABORT) 344 return 0; 345 346 /* For the previous service call, if completed successfully, we 347 * discard all further packets. 348 */ 349 if (rxrpc_conn_is_service(conn) && 350 chan->last_type == RXRPC_PACKET_TYPE_ACK) 351 return 0; 352 353 /* But otherwise we need to retransmit the final packet from 354 * data cached in the connection record. 355 */ 356 if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA) 357 trace_rxrpc_rx_data(chan->call_debug_id, 358 sp->hdr.seq, 359 sp->hdr.serial, 360 sp->hdr.flags); 361 rxrpc_input_conn_packet(conn, skb); 362 return 0; 363 } 364 365 rcu_read_lock(); 366 call = rxrpc_try_get_call(rcu_dereference(chan->call), 367 rxrpc_call_get_input); 368 rcu_read_unlock(); 369 370 if (sp->hdr.callNumber > chan->call_id) { 371 if (rxrpc_to_client(sp)) { 372 rxrpc_put_call(call, rxrpc_call_put_input); 373 goto reject_packet; 374 } 375 376 if (call) { 377 rxrpc_implicit_end_call(call, skb); 378 rxrpc_put_call(call, rxrpc_call_put_input); 379 call = NULL; 380 } 381 } 382 383 if (!call) { 384 if (rxrpc_to_client(sp)) 385 goto bad_message; 386 if (rxrpc_new_incoming_call(conn->local, conn->peer, conn, 387 peer_srx, skb)) 388 return 0; 389 goto reject_packet; 390 } 391 392 rxrpc_input_call_event(call, skb); 393 rxrpc_put_call(call, rxrpc_call_put_input); 394 return 0; 395 396 wrong_security: 397 trace_rxrpc_abort(0, "SEC", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq, 398 RXKADINCONSISTENCY, EBADMSG); 399 skb->priority = RXKADINCONSISTENCY; 400 goto post_abort; 401 402 reupgrade: 403 trace_rxrpc_abort(0, "UPG", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq, 404 RX_PROTOCOL_ERROR, EBADMSG); 405 goto protocol_error; 406 407 bad_message: 408 trace_rxrpc_abort(0, "BAD", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq, 409 RX_PROTOCOL_ERROR, EBADMSG); 410 protocol_error: 411 skb->priority = RX_PROTOCOL_ERROR; 412 post_abort: 413 skb->mark = RXRPC_SKB_MARK_REJECT_ABORT; 414 reject_packet: 415 rxrpc_reject_packet(conn->local, skb); 416 return 0; 417 } 418 419 /* 420 * I/O and event handling thread. 421 */ 422 int rxrpc_io_thread(void *data) 423 { 424 struct sk_buff_head rx_queue; 425 struct rxrpc_local *local = data; 426 struct rxrpc_call *call; 427 struct sk_buff *skb; 428 429 skb_queue_head_init(&rx_queue); 430 431 set_user_nice(current, MIN_NICE); 432 433 for (;;) { 434 rxrpc_inc_stat(local->rxnet, stat_io_loop); 435 436 /* Deal with calls that want immediate attention. */ 437 if ((call = list_first_entry_or_null(&local->call_attend_q, 438 struct rxrpc_call, 439 attend_link))) { 440 spin_lock_bh(&local->lock); 441 list_del_init(&call->attend_link); 442 spin_unlock_bh(&local->lock); 443 444 trace_rxrpc_call_poked(call); 445 rxrpc_input_call_event(call, NULL); 446 rxrpc_put_call(call, rxrpc_call_put_poke); 447 continue; 448 } 449 450 /* Process received packets and errors. */ 451 if ((skb = __skb_dequeue(&rx_queue))) { 452 switch (skb->mark) { 453 case RXRPC_SKB_MARK_PACKET: 454 skb->priority = 0; 455 rxrpc_input_packet(local, &skb); 456 trace_rxrpc_rx_done(skb->mark, skb->priority); 457 rxrpc_free_skb(skb, rxrpc_skb_put_input); 458 break; 459 case RXRPC_SKB_MARK_ERROR: 460 rxrpc_input_error(local, skb); 461 rxrpc_free_skb(skb, rxrpc_skb_put_error_report); 462 break; 463 default: 464 WARN_ON_ONCE(1); 465 rxrpc_free_skb(skb, rxrpc_skb_put_unknown); 466 break; 467 } 468 continue; 469 } 470 471 if (!skb_queue_empty(&local->rx_queue)) { 472 spin_lock_irq(&local->rx_queue.lock); 473 skb_queue_splice_tail_init(&local->rx_queue, &rx_queue); 474 spin_unlock_irq(&local->rx_queue.lock); 475 continue; 476 } 477 478 set_current_state(TASK_INTERRUPTIBLE); 479 if (!skb_queue_empty(&local->rx_queue) || 480 !list_empty(&local->call_attend_q)) { 481 __set_current_state(TASK_RUNNING); 482 continue; 483 } 484 485 if (kthread_should_stop()) 486 break; 487 schedule(); 488 } 489 490 __set_current_state(TASK_RUNNING); 491 rxrpc_see_local(local, rxrpc_local_stop); 492 rxrpc_destroy_local(local); 493 local->io_thread = NULL; 494 rxrpc_see_local(local, rxrpc_local_stopped); 495 return 0; 496 } 497