1 // SPDX-License-Identifier: GPL-2.0-or-later 2 /* RxRPC recvmsg() implementation 3 * 4 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved. 5 * Written by David Howells (dhowells@redhat.com) 6 */ 7 8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt 9 10 #include <linux/net.h> 11 #include <linux/skbuff.h> 12 #include <linux/export.h> 13 #include <linux/sched/signal.h> 14 15 #include <net/sock.h> 16 #include <net/af_rxrpc.h> 17 #include "ar-internal.h" 18 19 /* 20 * Post a call for attention by the socket or kernel service. Further 21 * notifications are suppressed by putting recvmsg_link on a dummy queue. 22 */ 23 void rxrpc_notify_socket(struct rxrpc_call *call) 24 { 25 struct rxrpc_sock *rx; 26 struct sock *sk; 27 28 _enter("%d", call->debug_id); 29 30 if (!list_empty(&call->recvmsg_link)) 31 return; 32 33 rcu_read_lock(); 34 35 rx = rcu_dereference(call->socket); 36 sk = &rx->sk; 37 if (rx && sk->sk_state < RXRPC_CLOSE) { 38 if (call->notify_rx) { 39 spin_lock_bh(&call->notify_lock); 40 call->notify_rx(sk, call, call->user_call_ID); 41 spin_unlock_bh(&call->notify_lock); 42 } else { 43 write_lock_bh(&rx->recvmsg_lock); 44 if (list_empty(&call->recvmsg_link)) { 45 rxrpc_get_call(call, rxrpc_call_got); 46 list_add_tail(&call->recvmsg_link, &rx->recvmsg_q); 47 } 48 write_unlock_bh(&rx->recvmsg_lock); 49 50 if (!sock_flag(sk, SOCK_DEAD)) { 51 _debug("call %ps", sk->sk_data_ready); 52 sk->sk_data_ready(sk); 53 } 54 } 55 } 56 57 rcu_read_unlock(); 58 _leave(""); 59 } 60 61 /* 62 * Transition a call to the complete state. 63 */ 64 bool __rxrpc_set_call_completion(struct rxrpc_call *call, 65 enum rxrpc_call_completion compl, 66 u32 abort_code, 67 int error) 68 { 69 if (call->state < RXRPC_CALL_COMPLETE) { 70 call->abort_code = abort_code; 71 call->error = error; 72 call->completion = compl; 73 call->state = RXRPC_CALL_COMPLETE; 74 trace_rxrpc_call_complete(call); 75 wake_up(&call->waitq); 76 rxrpc_notify_socket(call); 77 return true; 78 } 79 return false; 80 } 81 82 bool rxrpc_set_call_completion(struct rxrpc_call *call, 83 enum rxrpc_call_completion compl, 84 u32 abort_code, 85 int error) 86 { 87 bool ret = false; 88 89 if (call->state < RXRPC_CALL_COMPLETE) { 90 write_lock_bh(&call->state_lock); 91 ret = __rxrpc_set_call_completion(call, compl, abort_code, error); 92 write_unlock_bh(&call->state_lock); 93 } 94 return ret; 95 } 96 97 /* 98 * Record that a call successfully completed. 99 */ 100 bool __rxrpc_call_completed(struct rxrpc_call *call) 101 { 102 return __rxrpc_set_call_completion(call, RXRPC_CALL_SUCCEEDED, 0, 0); 103 } 104 105 bool rxrpc_call_completed(struct rxrpc_call *call) 106 { 107 bool ret = false; 108 109 if (call->state < RXRPC_CALL_COMPLETE) { 110 write_lock_bh(&call->state_lock); 111 ret = __rxrpc_call_completed(call); 112 write_unlock_bh(&call->state_lock); 113 } 114 return ret; 115 } 116 117 /* 118 * Record that a call is locally aborted. 119 */ 120 bool __rxrpc_abort_call(const char *why, struct rxrpc_call *call, 121 rxrpc_seq_t seq, u32 abort_code, int error) 122 { 123 trace_rxrpc_abort(call->debug_id, why, call->cid, call->call_id, seq, 124 abort_code, error); 125 return __rxrpc_set_call_completion(call, RXRPC_CALL_LOCALLY_ABORTED, 126 abort_code, error); 127 } 128 129 bool rxrpc_abort_call(const char *why, struct rxrpc_call *call, 130 rxrpc_seq_t seq, u32 abort_code, int error) 131 { 132 bool ret; 133 134 write_lock_bh(&call->state_lock); 135 ret = __rxrpc_abort_call(why, call, seq, abort_code, error); 136 write_unlock_bh(&call->state_lock); 137 return ret; 138 } 139 140 /* 141 * Pass a call terminating message to userspace. 142 */ 143 static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg) 144 { 145 u32 tmp = 0; 146 int ret; 147 148 switch (call->completion) { 149 case RXRPC_CALL_SUCCEEDED: 150 ret = 0; 151 if (rxrpc_is_service_call(call)) 152 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &tmp); 153 break; 154 case RXRPC_CALL_REMOTELY_ABORTED: 155 tmp = call->abort_code; 156 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp); 157 break; 158 case RXRPC_CALL_LOCALLY_ABORTED: 159 tmp = call->abort_code; 160 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp); 161 break; 162 case RXRPC_CALL_NETWORK_ERROR: 163 tmp = -call->error; 164 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &tmp); 165 break; 166 case RXRPC_CALL_LOCAL_ERROR: 167 tmp = -call->error; 168 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp); 169 break; 170 default: 171 pr_err("Invalid terminal call state %u\n", call->state); 172 BUG(); 173 break; 174 } 175 176 trace_rxrpc_recvdata(call, rxrpc_recvmsg_terminal, 177 lower_32_bits(atomic64_read(&call->ackr_window)) - 1, 178 call->rx_pkt_offset, call->rx_pkt_len, ret); 179 return ret; 180 } 181 182 /* 183 * End the packet reception phase. 184 */ 185 static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial) 186 { 187 rxrpc_seq_t whigh = READ_ONCE(call->rx_highest_seq); 188 189 _enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]); 190 191 trace_rxrpc_receive(call, rxrpc_receive_end, 0, whigh); 192 193 if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY) 194 rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_terminal_ack); 195 196 write_lock_bh(&call->state_lock); 197 198 switch (call->state) { 199 case RXRPC_CALL_CLIENT_RECV_REPLY: 200 __rxrpc_call_completed(call); 201 write_unlock_bh(&call->state_lock); 202 break; 203 204 case RXRPC_CALL_SERVER_RECV_REQUEST: 205 call->state = RXRPC_CALL_SERVER_ACK_REQUEST; 206 call->expect_req_by = jiffies + MAX_JIFFY_OFFSET; 207 write_unlock_bh(&call->state_lock); 208 rxrpc_propose_delay_ACK(call, serial, 209 rxrpc_propose_ack_processing_op); 210 break; 211 default: 212 write_unlock_bh(&call->state_lock); 213 break; 214 } 215 } 216 217 /* 218 * Discard a packet we've used up and advance the Rx window by one. 219 */ 220 static void rxrpc_rotate_rx_window(struct rxrpc_call *call) 221 { 222 struct rxrpc_skb_priv *sp; 223 struct sk_buff *skb; 224 rxrpc_serial_t serial; 225 rxrpc_seq_t old_consumed = call->rx_consumed, tseq; 226 bool last; 227 int acked; 228 229 _enter("%d", call->debug_id); 230 231 skb = skb_dequeue(&call->recvmsg_queue); 232 rxrpc_see_skb(skb, rxrpc_skb_rotated); 233 234 sp = rxrpc_skb(skb); 235 tseq = sp->hdr.seq; 236 serial = sp->hdr.serial; 237 last = sp->hdr.flags & RXRPC_LAST_PACKET; 238 239 /* Barrier against rxrpc_input_data(). */ 240 if (after(tseq, call->rx_consumed)) 241 smp_store_release(&call->rx_consumed, tseq); 242 243 rxrpc_free_skb(skb, rxrpc_skb_freed); 244 245 trace_rxrpc_receive(call, last ? rxrpc_receive_rotate_last : rxrpc_receive_rotate, 246 serial, call->rx_consumed); 247 if (last) { 248 rxrpc_end_rx_phase(call, serial); 249 return; 250 } 251 252 /* Check to see if there's an ACK that needs sending. */ 253 acked = atomic_add_return(call->rx_consumed - old_consumed, 254 &call->ackr_nr_consumed); 255 if (acked > 2 && 256 !test_and_set_bit(RXRPC_CALL_IDLE_ACK_PENDING, &call->flags)) { 257 rxrpc_send_ACK(call, RXRPC_ACK_IDLE, serial, 258 rxrpc_propose_ack_rotate_rx); 259 rxrpc_transmit_ack_packets(call->peer->local); 260 } 261 } 262 263 /* 264 * Decrypt and verify a DATA packet. 265 */ 266 static int rxrpc_verify_data(struct rxrpc_call *call, struct sk_buff *skb) 267 { 268 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 269 270 if (sp->flags & RXRPC_RX_VERIFIED) 271 return 0; 272 return call->security->verify_packet(call, skb); 273 } 274 275 /* 276 * Deliver messages to a call. This keeps processing packets until the buffer 277 * is filled and we find either more DATA (returns 0) or the end of the DATA 278 * (returns 1). If more packets are required, it returns -EAGAIN. 279 */ 280 static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call, 281 struct msghdr *msg, struct iov_iter *iter, 282 size_t len, int flags, size_t *_offset) 283 { 284 struct rxrpc_skb_priv *sp; 285 struct sk_buff *skb; 286 rxrpc_seq_t seq = 0; 287 size_t remain; 288 unsigned int rx_pkt_offset, rx_pkt_len; 289 int copy, ret = -EAGAIN, ret2; 290 291 rx_pkt_offset = call->rx_pkt_offset; 292 rx_pkt_len = call->rx_pkt_len; 293 294 if (call->state >= RXRPC_CALL_SERVER_ACK_REQUEST) { 295 seq = lower_32_bits(atomic64_read(&call->ackr_window)) - 1; 296 ret = 1; 297 goto done; 298 } 299 300 /* No one else can be removing stuff from the queue, so we shouldn't 301 * need the Rx lock to walk it. 302 */ 303 skb = skb_peek(&call->recvmsg_queue); 304 while (skb) { 305 rxrpc_see_skb(skb, rxrpc_skb_seen); 306 sp = rxrpc_skb(skb); 307 seq = sp->hdr.seq; 308 309 if (!(flags & MSG_PEEK)) 310 trace_rxrpc_receive(call, rxrpc_receive_front, 311 sp->hdr.serial, seq); 312 313 if (msg) 314 sock_recv_timestamp(msg, sock->sk, skb); 315 316 if (rx_pkt_offset == 0) { 317 ret2 = rxrpc_verify_data(call, skb); 318 rx_pkt_offset = sp->offset; 319 rx_pkt_len = sp->len; 320 trace_rxrpc_recvdata(call, rxrpc_recvmsg_next, seq, 321 rx_pkt_offset, rx_pkt_len, ret2); 322 if (ret2 < 0) { 323 ret = ret2; 324 goto out; 325 } 326 rxrpc_transmit_ack_packets(call->peer->local); 327 } else { 328 trace_rxrpc_recvdata(call, rxrpc_recvmsg_cont, seq, 329 rx_pkt_offset, rx_pkt_len, 0); 330 } 331 332 /* We have to handle short, empty and used-up DATA packets. */ 333 remain = len - *_offset; 334 copy = rx_pkt_len; 335 if (copy > remain) 336 copy = remain; 337 if (copy > 0) { 338 ret2 = skb_copy_datagram_iter(skb, rx_pkt_offset, iter, 339 copy); 340 if (ret2 < 0) { 341 ret = ret2; 342 goto out; 343 } 344 345 /* handle piecemeal consumption of data packets */ 346 rx_pkt_offset += copy; 347 rx_pkt_len -= copy; 348 *_offset += copy; 349 } 350 351 if (rx_pkt_len > 0) { 352 trace_rxrpc_recvdata(call, rxrpc_recvmsg_full, seq, 353 rx_pkt_offset, rx_pkt_len, 0); 354 ASSERTCMP(*_offset, ==, len); 355 ret = 0; 356 break; 357 } 358 359 /* The whole packet has been transferred. */ 360 if (sp->hdr.flags & RXRPC_LAST_PACKET) 361 ret = 1; 362 rx_pkt_offset = 0; 363 rx_pkt_len = 0; 364 365 skb = skb_peek_next(skb, &call->recvmsg_queue); 366 367 if (!(flags & MSG_PEEK)) 368 rxrpc_rotate_rx_window(call); 369 } 370 371 out: 372 if (!(flags & MSG_PEEK)) { 373 call->rx_pkt_offset = rx_pkt_offset; 374 call->rx_pkt_len = rx_pkt_len; 375 } 376 done: 377 trace_rxrpc_recvdata(call, rxrpc_recvmsg_data_return, seq, 378 rx_pkt_offset, rx_pkt_len, ret); 379 if (ret == -EAGAIN) 380 set_bit(RXRPC_CALL_RX_UNDERRUN, &call->flags); 381 return ret; 382 } 383 384 /* 385 * Receive a message from an RxRPC socket 386 * - we need to be careful about two or more threads calling recvmsg 387 * simultaneously 388 */ 389 int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len, 390 int flags) 391 { 392 struct rxrpc_call *call; 393 struct rxrpc_sock *rx = rxrpc_sk(sock->sk); 394 struct list_head *l; 395 size_t copied = 0; 396 long timeo; 397 int ret; 398 399 DEFINE_WAIT(wait); 400 401 trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_enter, 0); 402 403 if (flags & (MSG_OOB | MSG_TRUNC)) 404 return -EOPNOTSUPP; 405 406 timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT); 407 408 try_again: 409 lock_sock(&rx->sk); 410 411 /* Return immediately if a client socket has no outstanding calls */ 412 if (RB_EMPTY_ROOT(&rx->calls) && 413 list_empty(&rx->recvmsg_q) && 414 rx->sk.sk_state != RXRPC_SERVER_LISTENING) { 415 release_sock(&rx->sk); 416 return -EAGAIN; 417 } 418 419 if (list_empty(&rx->recvmsg_q)) { 420 ret = -EWOULDBLOCK; 421 if (timeo == 0) { 422 call = NULL; 423 goto error_no_call; 424 } 425 426 release_sock(&rx->sk); 427 428 /* Wait for something to happen */ 429 prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait, 430 TASK_INTERRUPTIBLE); 431 ret = sock_error(&rx->sk); 432 if (ret) 433 goto wait_error; 434 435 if (list_empty(&rx->recvmsg_q)) { 436 if (signal_pending(current)) 437 goto wait_interrupted; 438 trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_wait, 0); 439 timeo = schedule_timeout(timeo); 440 } 441 finish_wait(sk_sleep(&rx->sk), &wait); 442 goto try_again; 443 } 444 445 /* Find the next call and dequeue it if we're not just peeking. If we 446 * do dequeue it, that comes with a ref that we will need to release. 447 */ 448 write_lock_bh(&rx->recvmsg_lock); 449 l = rx->recvmsg_q.next; 450 call = list_entry(l, struct rxrpc_call, recvmsg_link); 451 if (!(flags & MSG_PEEK)) 452 list_del_init(&call->recvmsg_link); 453 else 454 rxrpc_get_call(call, rxrpc_call_got); 455 write_unlock_bh(&rx->recvmsg_lock); 456 457 trace_rxrpc_recvmsg(call, rxrpc_recvmsg_dequeue, 0); 458 459 /* We're going to drop the socket lock, so we need to lock the call 460 * against interference by sendmsg. 461 */ 462 if (!mutex_trylock(&call->user_mutex)) { 463 ret = -EWOULDBLOCK; 464 if (flags & MSG_DONTWAIT) 465 goto error_requeue_call; 466 ret = -ERESTARTSYS; 467 if (mutex_lock_interruptible(&call->user_mutex) < 0) 468 goto error_requeue_call; 469 } 470 471 release_sock(&rx->sk); 472 473 if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) 474 BUG(); 475 476 if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) { 477 if (flags & MSG_CMSG_COMPAT) { 478 unsigned int id32 = call->user_call_ID; 479 480 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID, 481 sizeof(unsigned int), &id32); 482 } else { 483 unsigned long idl = call->user_call_ID; 484 485 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID, 486 sizeof(unsigned long), &idl); 487 } 488 if (ret < 0) 489 goto error_unlock_call; 490 } 491 492 if (msg->msg_name && call->peer) { 493 struct sockaddr_rxrpc *srx = msg->msg_name; 494 size_t len = sizeof(call->peer->srx); 495 496 memcpy(msg->msg_name, &call->peer->srx, len); 497 srx->srx_service = call->service_id; 498 msg->msg_namelen = len; 499 } 500 501 switch (READ_ONCE(call->state)) { 502 case RXRPC_CALL_CLIENT_RECV_REPLY: 503 case RXRPC_CALL_SERVER_RECV_REQUEST: 504 case RXRPC_CALL_SERVER_ACK_REQUEST: 505 ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len, 506 flags, &copied); 507 if (ret == -EAGAIN) 508 ret = 0; 509 510 rxrpc_transmit_ack_packets(call->peer->local); 511 if (!skb_queue_empty(&call->recvmsg_queue)) 512 rxrpc_notify_socket(call); 513 break; 514 default: 515 ret = 0; 516 break; 517 } 518 519 if (ret < 0) 520 goto error_unlock_call; 521 522 if (call->state == RXRPC_CALL_COMPLETE) { 523 ret = rxrpc_recvmsg_term(call, msg); 524 if (ret < 0) 525 goto error_unlock_call; 526 if (!(flags & MSG_PEEK)) 527 rxrpc_release_call(rx, call); 528 msg->msg_flags |= MSG_EOR; 529 ret = 1; 530 } 531 532 if (ret == 0) 533 msg->msg_flags |= MSG_MORE; 534 else 535 msg->msg_flags &= ~MSG_MORE; 536 ret = copied; 537 538 error_unlock_call: 539 mutex_unlock(&call->user_mutex); 540 rxrpc_put_call(call, rxrpc_call_put); 541 trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, ret); 542 return ret; 543 544 error_requeue_call: 545 if (!(flags & MSG_PEEK)) { 546 write_lock_bh(&rx->recvmsg_lock); 547 list_add(&call->recvmsg_link, &rx->recvmsg_q); 548 write_unlock_bh(&rx->recvmsg_lock); 549 trace_rxrpc_recvmsg(call, rxrpc_recvmsg_requeue, 0); 550 } else { 551 rxrpc_put_call(call, rxrpc_call_put); 552 } 553 error_no_call: 554 release_sock(&rx->sk); 555 error_trace: 556 trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, ret); 557 return ret; 558 559 wait_interrupted: 560 ret = sock_intr_errno(timeo); 561 wait_error: 562 finish_wait(sk_sleep(&rx->sk), &wait); 563 call = NULL; 564 goto error_trace; 565 } 566 567 /** 568 * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info 569 * @sock: The socket that the call exists on 570 * @call: The call to send data through 571 * @iter: The buffer to receive into 572 * @_len: The amount of data we want to receive (decreased on return) 573 * @want_more: True if more data is expected to be read 574 * @_abort: Where the abort code is stored if -ECONNABORTED is returned 575 * @_service: Where to store the actual service ID (may be upgraded) 576 * 577 * Allow a kernel service to receive data and pick up information about the 578 * state of a call. Returns 0 if got what was asked for and there's more 579 * available, 1 if we got what was asked for and we're at the end of the data 580 * and -EAGAIN if we need more data. 581 * 582 * Note that we may return -EAGAIN to drain empty packets at the end of the 583 * data, even if we've already copied over the requested data. 584 * 585 * *_abort should also be initialised to 0. 586 */ 587 int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call, 588 struct iov_iter *iter, size_t *_len, 589 bool want_more, u32 *_abort, u16 *_service) 590 { 591 size_t offset = 0; 592 int ret; 593 594 _enter("{%d,%s},%zu,%d", 595 call->debug_id, rxrpc_call_states[call->state], 596 *_len, want_more); 597 598 ASSERTCMP(call->state, !=, RXRPC_CALL_SERVER_SECURING); 599 600 mutex_lock(&call->user_mutex); 601 602 switch (READ_ONCE(call->state)) { 603 case RXRPC_CALL_CLIENT_RECV_REPLY: 604 case RXRPC_CALL_SERVER_RECV_REQUEST: 605 case RXRPC_CALL_SERVER_ACK_REQUEST: 606 ret = rxrpc_recvmsg_data(sock, call, NULL, iter, 607 *_len, 0, &offset); 608 *_len -= offset; 609 if (ret < 0) 610 goto out; 611 612 /* We can only reach here with a partially full buffer if we 613 * have reached the end of the data. We must otherwise have a 614 * full buffer or have been given -EAGAIN. 615 */ 616 if (ret == 1) { 617 if (iov_iter_count(iter) > 0) 618 goto short_data; 619 if (!want_more) 620 goto read_phase_complete; 621 ret = 0; 622 goto out; 623 } 624 625 if (!want_more) 626 goto excess_data; 627 goto out; 628 629 case RXRPC_CALL_COMPLETE: 630 goto call_complete; 631 632 default: 633 ret = -EINPROGRESS; 634 goto out; 635 } 636 637 read_phase_complete: 638 ret = 1; 639 out: 640 rxrpc_transmit_ack_packets(call->peer->local); 641 if (_service) 642 *_service = call->service_id; 643 mutex_unlock(&call->user_mutex); 644 _leave(" = %d [%zu,%d]", ret, iov_iter_count(iter), *_abort); 645 return ret; 646 647 short_data: 648 trace_rxrpc_rx_eproto(call, 0, tracepoint_string("short_data")); 649 ret = -EBADMSG; 650 goto out; 651 excess_data: 652 trace_rxrpc_rx_eproto(call, 0, tracepoint_string("excess_data")); 653 ret = -EMSGSIZE; 654 goto out; 655 call_complete: 656 *_abort = call->abort_code; 657 ret = call->error; 658 if (call->completion == RXRPC_CALL_SUCCEEDED) { 659 ret = 1; 660 if (iov_iter_count(iter) > 0) 661 ret = -ECONNRESET; 662 } 663 goto out; 664 } 665 EXPORT_SYMBOL(rxrpc_kernel_recv_data); 666