// SPDX-License-Identifier: GPL-2.0-or-later
/* RxRPC recvmsg() implementation
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 */

#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include <linux/net.h>
#include <linux/skbuff.h>
#include <linux/export.h>
#include <linux/sched/signal.h>

#include <net/sock.h>
#include <net/af_rxrpc.h>
#include "ar-internal.h"

/*
 * Post a call for attention by the socket or kernel service.  Further
 * notifications are suppressed by putting recvmsg_link on a dummy queue.
 */
void rxrpc_notify_socket(struct rxrpc_call *call)
{
	struct rxrpc_sock *rx;
	struct sock *sk;

	_enter("%d", call->debug_id);

	if (!list_empty(&call->recvmsg_link))
		return;

	rcu_read_lock();

	rx = rcu_dereference(call->socket);
	sk = &rx->sk;
	if (rx && sk->sk_state < RXRPC_CLOSE) {
		if (call->notify_rx) {
			spin_lock(&call->notify_lock);
			call->notify_rx(sk, call, call->user_call_ID);
			spin_unlock(&call->notify_lock);
		} else {
			write_lock(&rx->recvmsg_lock);
			if (list_empty(&call->recvmsg_link)) {
				rxrpc_get_call(call, rxrpc_call_get_notify_socket);
				list_add_tail(&call->recvmsg_link, &rx->recvmsg_q);
			}
			write_unlock(&rx->recvmsg_lock);

			if (!sock_flag(sk, SOCK_DEAD)) {
				_debug("call %ps", sk->sk_data_ready);
				sk->sk_data_ready(sk);
			}
		}
	}

	rcu_read_unlock();
	_leave("");
}

/*
 * Transition a call to the complete state.
 */
bool __rxrpc_set_call_completion(struct rxrpc_call *call,
				 enum rxrpc_call_completion compl,
				 u32 abort_code,
				 int error)
{
	if (call->state < RXRPC_CALL_COMPLETE) {
		call->abort_code = abort_code;
		call->error = error;
		call->completion = compl;
		call->state = RXRPC_CALL_COMPLETE;
		trace_rxrpc_call_complete(call);
		wake_up(&call->waitq);
		rxrpc_notify_socket(call);
		return true;
	}
	return false;
}

bool rxrpc_set_call_completion(struct rxrpc_call *call,
			       enum rxrpc_call_completion compl,
			       u32 abort_code,
			       int error)
{
	bool ret = false;

	if (call->state < RXRPC_CALL_COMPLETE) {
		write_lock(&call->state_lock);
		ret = __rxrpc_set_call_completion(call, compl, abort_code, error);
		write_unlock(&call->state_lock);
	}
	return ret;
}

/*
 * Record that a call successfully completed.
 */
bool __rxrpc_call_completed(struct rxrpc_call *call)
{
	return __rxrpc_set_call_completion(call, RXRPC_CALL_SUCCEEDED, 0, 0);
}

bool rxrpc_call_completed(struct rxrpc_call *call)
{
	bool ret = false;

	if (call->state < RXRPC_CALL_COMPLETE) {
		write_lock(&call->state_lock);
		ret = __rxrpc_call_completed(call);
		write_unlock(&call->state_lock);
	}
	return ret;
}
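/*
 * Note on the convention above: the __-prefixed variants assume the caller
 * already holds call->state_lock; the unprefixed wrappers take it themselves.
 * A sketch of a caller using the locked variant directly (the helper name
 * my_fail_call is hypothetical, mirroring rxrpc_set_call_completion()):
 *
 *	static bool my_fail_call(struct rxrpc_call *call, int error)
 *	{
 *		bool ret;
 *
 *		write_lock(&call->state_lock);
 *		ret = __rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
 *						  0, error);
 *		write_unlock(&call->state_lock);
 *		return ret;
 *	}
 */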
/*
 * Record that a call is locally aborted.
 */
bool __rxrpc_abort_call(struct rxrpc_call *call, rxrpc_seq_t seq,
			u32 abort_code, int error, enum rxrpc_abort_reason why)
{
	trace_rxrpc_abort(call->debug_id, why, call->cid, call->call_id, seq,
			  abort_code, error);
	return __rxrpc_set_call_completion(call, RXRPC_CALL_LOCALLY_ABORTED,
					   abort_code, error);
}

bool rxrpc_abort_call(struct rxrpc_call *call, rxrpc_seq_t seq,
		      u32 abort_code, int error, enum rxrpc_abort_reason why)
{
	bool ret;

	write_lock(&call->state_lock);
	ret = __rxrpc_abort_call(call, seq, abort_code, error, why);
	write_unlock(&call->state_lock);
	if (ret && test_bit(RXRPC_CALL_EXPOSED, &call->flags))
		rxrpc_send_abort_packet(call);
	return ret;
}

/*
 * Pass a call terminating message to userspace.
 */
static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
{
	u32 tmp = 0;
	int ret;

	switch (call->completion) {
	case RXRPC_CALL_SUCCEEDED:
		ret = 0;
		if (rxrpc_is_service_call(call))
			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &tmp);
		break;
	case RXRPC_CALL_REMOTELY_ABORTED:
		tmp = call->abort_code;
		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
		break;
	case RXRPC_CALL_LOCALLY_ABORTED:
		tmp = call->abort_code;
		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
		break;
	case RXRPC_CALL_NETWORK_ERROR:
		tmp = -call->error;
		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &tmp);
		break;
	case RXRPC_CALL_LOCAL_ERROR:
		tmp = -call->error;
		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
		break;
	default:
		pr_err("Invalid terminal call state %u\n", call->state);
		BUG();
		break;
	}

	trace_rxrpc_recvdata(call, rxrpc_recvmsg_terminal,
			     lower_32_bits(atomic64_read(&call->ackr_window)) - 1,
			     call->rx_pkt_offset, call->rx_pkt_len, ret);
	return ret;
}

/*
 * End the packet reception phase.
 */
static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
{
	rxrpc_seq_t whigh = READ_ONCE(call->rx_highest_seq);

	_enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]);

	trace_rxrpc_receive(call, rxrpc_receive_end, 0, whigh);

	if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY)
		rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_terminal_ack);

	write_lock(&call->state_lock);

	switch (call->state) {
	case RXRPC_CALL_CLIENT_RECV_REPLY:
		__rxrpc_call_completed(call);
		write_unlock(&call->state_lock);
		rxrpc_poke_call(call, rxrpc_call_poke_complete);
		break;

	case RXRPC_CALL_SERVER_RECV_REQUEST:
		call->state = RXRPC_CALL_SERVER_ACK_REQUEST;
		call->expect_req_by = jiffies + MAX_JIFFY_OFFSET;
		write_unlock(&call->state_lock);
		rxrpc_propose_delay_ACK(call, serial,
					rxrpc_propose_ack_processing_op);
		break;
	default:
		write_unlock(&call->state_lock);
		break;
	}
}
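/*
 * For reference, the transitions made by rxrpc_end_rx_phase() above are:
 *
 *	CLIENT_RECV_REPLY   -> COMPLETE (RXRPC_CALL_SUCCEEDED, call poked)
 *	SERVER_RECV_REQUEST -> SERVER_ACK_REQUEST (delayed ACK proposed)
 *
 * Any other state is left unchanged.
 */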
/*
 * Discard a packet we've used up and advance the Rx window by one.
 */
static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
{
	struct rxrpc_skb_priv *sp;
	struct sk_buff *skb;
	rxrpc_serial_t serial;
	rxrpc_seq_t old_consumed = call->rx_consumed, tseq;
	bool last;
	int acked;

	_enter("%d", call->debug_id);

	skb = skb_dequeue(&call->recvmsg_queue);
	rxrpc_see_skb(skb, rxrpc_skb_see_rotate);

	sp = rxrpc_skb(skb);
	tseq   = sp->hdr.seq;
	serial = sp->hdr.serial;
	last   = sp->hdr.flags & RXRPC_LAST_PACKET;

	/* Barrier against rxrpc_input_data(). */
	if (after(tseq, call->rx_consumed))
		smp_store_release(&call->rx_consumed, tseq);

	rxrpc_free_skb(skb, rxrpc_skb_put_rotate);

	trace_rxrpc_receive(call, last ? rxrpc_receive_rotate_last : rxrpc_receive_rotate,
			    serial, call->rx_consumed);
	if (last) {
		rxrpc_end_rx_phase(call, serial);
		return;
	}

	/* Check to see if there's an ACK that needs sending. */
	acked = atomic_add_return(call->rx_consumed - old_consumed,
				  &call->ackr_nr_consumed);
	if (acked > 2 &&
	    !test_and_set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags))
		rxrpc_poke_call(call, rxrpc_call_poke_idle);
}

/*
 * Decrypt and verify a DATA packet.
 */
static int rxrpc_verify_data(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);

	if (sp->flags & RXRPC_RX_VERIFIED)
		return 0;
	return call->security->verify_packet(call, skb);
}

/*
 * Deliver messages to a call.  This keeps processing packets until the buffer
 * is filled and we find either more DATA (returns 0) or the end of the DATA
 * (returns 1).  If more packets are required, it returns -EAGAIN.
 */
static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
			      struct msghdr *msg, struct iov_iter *iter,
			      size_t len, int flags, size_t *_offset)
{
	struct rxrpc_skb_priv *sp;
	struct sk_buff *skb;
	rxrpc_seq_t seq = 0;
	size_t remain;
	unsigned int rx_pkt_offset, rx_pkt_len;
	int copy, ret = -EAGAIN, ret2;

	rx_pkt_offset = call->rx_pkt_offset;
	rx_pkt_len = call->rx_pkt_len;

	if (call->state >= RXRPC_CALL_SERVER_ACK_REQUEST) {
		seq = lower_32_bits(atomic64_read(&call->ackr_window)) - 1;
		ret = 1;
		goto done;
	}

	/* No one else can be removing stuff from the queue, so we shouldn't
	 * need the Rx lock to walk it.
	 */
	skb = skb_peek(&call->recvmsg_queue);
	while (skb) {
		rxrpc_see_skb(skb, rxrpc_skb_see_recvmsg);
		sp = rxrpc_skb(skb);
		seq = sp->hdr.seq;

		if (!(flags & MSG_PEEK))
			trace_rxrpc_receive(call, rxrpc_receive_front,
					    sp->hdr.serial, seq);

		if (msg)
			sock_recv_timestamp(msg, sock->sk, skb);

		if (rx_pkt_offset == 0) {
			ret2 = rxrpc_verify_data(call, skb);
			rx_pkt_offset = sp->offset;
			rx_pkt_len = sp->len;
			trace_rxrpc_recvdata(call, rxrpc_recvmsg_next, seq,
					     rx_pkt_offset, rx_pkt_len, ret2);
			if (ret2 < 0) {
				ret = ret2;
				goto out;
			}
		} else {
			trace_rxrpc_recvdata(call, rxrpc_recvmsg_cont, seq,
					     rx_pkt_offset, rx_pkt_len, 0);
		}
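		/* At this point rx_pkt_offset/rx_pkt_len describe the
		 * unconsumed remainder of the packet at the head of the
		 * queue, whether we just started on it or are resuming a
		 * partially-read one.
		 */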
		/* We have to handle short, empty and used-up DATA packets. */
		remain = len - *_offset;
		copy = rx_pkt_len;
		if (copy > remain)
			copy = remain;
		if (copy > 0) {
			ret2 = skb_copy_datagram_iter(skb, rx_pkt_offset, iter,
						      copy);
			if (ret2 < 0) {
				ret = ret2;
				goto out;
			}

			/* handle piecemeal consumption of data packets */
			rx_pkt_offset += copy;
			rx_pkt_len -= copy;
			*_offset += copy;
		}

		if (rx_pkt_len > 0) {
			trace_rxrpc_recvdata(call, rxrpc_recvmsg_full, seq,
					     rx_pkt_offset, rx_pkt_len, 0);
			ASSERTCMP(*_offset, ==, len);
			ret = 0;
			break;
		}

		/* The whole packet has been transferred. */
		if (sp->hdr.flags & RXRPC_LAST_PACKET)
			ret = 1;
		rx_pkt_offset = 0;
		rx_pkt_len = 0;

		skb = skb_peek_next(skb, &call->recvmsg_queue);

		if (!(flags & MSG_PEEK))
			rxrpc_rotate_rx_window(call);
	}

out:
	if (!(flags & MSG_PEEK)) {
		call->rx_pkt_offset = rx_pkt_offset;
		call->rx_pkt_len = rx_pkt_len;
	}
done:
	trace_rxrpc_recvdata(call, rxrpc_recvmsg_data_return, seq,
			     rx_pkt_offset, rx_pkt_len, ret);
	if (ret == -EAGAIN)
		set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags);
	return ret;
}

/*
 * Receive a message from an RxRPC socket
 * - we need to be careful about two or more threads calling recvmsg
 *   simultaneously
 */
int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
		  int flags)
{
	struct rxrpc_call *call;
	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
	struct list_head *l;
	unsigned int call_debug_id = 0;
	size_t copied = 0;
	long timeo;
	int ret;

	DEFINE_WAIT(wait);

	trace_rxrpc_recvmsg(0, rxrpc_recvmsg_enter, 0);

	if (flags & (MSG_OOB | MSG_TRUNC))
		return -EOPNOTSUPP;

	timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);

try_again:
	lock_sock(&rx->sk);

	/* Return immediately if a client socket has no outstanding calls */
	if (RB_EMPTY_ROOT(&rx->calls) &&
	    list_empty(&rx->recvmsg_q) &&
	    rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
		release_sock(&rx->sk);
		return -EAGAIN;
	}

	if (list_empty(&rx->recvmsg_q)) {
		ret = -EWOULDBLOCK;
		if (timeo == 0) {
			call = NULL;
			goto error_no_call;
		}

		release_sock(&rx->sk);

		/* Wait for something to happen */
		prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
					  TASK_INTERRUPTIBLE);
		ret = sock_error(&rx->sk);
		if (ret)
			goto wait_error;

		if (list_empty(&rx->recvmsg_q)) {
			if (signal_pending(current))
				goto wait_interrupted;
			trace_rxrpc_recvmsg(0, rxrpc_recvmsg_wait, 0);
			timeo = schedule_timeout(timeo);
		}
		finish_wait(sk_sleep(&rx->sk), &wait);
		goto try_again;
	}

	/* Find the next call and dequeue it if we're not just peeking.  If we
	 * do dequeue it, that comes with a ref that we will need to release.
	 */
	write_lock(&rx->recvmsg_lock);
	l = rx->recvmsg_q.next;
	call = list_entry(l, struct rxrpc_call, recvmsg_link);
	if (!(flags & MSG_PEEK))
		list_del_init(&call->recvmsg_link);
	else
		rxrpc_get_call(call, rxrpc_call_get_recvmsg);
	write_unlock(&rx->recvmsg_lock);

	call_debug_id = call->debug_id;
	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_dequeue, 0);
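	/* Try the call's user_mutex without blocking first: if it's contended
	 * and the caller asked not to wait (MSG_DONTWAIT), give the call back
	 * and return -EWOULDBLOCK; otherwise take the mutex, sleeping
	 * interruptibly.
	 */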
	/* We're going to drop the socket lock, so we need to lock the call
	 * against interference by sendmsg.
	 */
	if (!mutex_trylock(&call->user_mutex)) {
		ret = -EWOULDBLOCK;
		if (flags & MSG_DONTWAIT)
			goto error_requeue_call;
		ret = -ERESTARTSYS;
		if (mutex_lock_interruptible(&call->user_mutex) < 0)
			goto error_requeue_call;
	}

	release_sock(&rx->sk);

	if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
		BUG();

	if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
		if (flags & MSG_CMSG_COMPAT) {
			unsigned int id32 = call->user_call_ID;

			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
				       sizeof(unsigned int), &id32);
		} else {
			unsigned long idl = call->user_call_ID;

			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
				       sizeof(unsigned long), &idl);
		}
		if (ret < 0)
			goto error_unlock_call;
	}

	if (msg->msg_name && call->peer) {
		size_t len = sizeof(call->dest_srx);

		memcpy(msg->msg_name, &call->dest_srx, len);
		msg->msg_namelen = len;
	}

	switch (READ_ONCE(call->state)) {
	case RXRPC_CALL_CLIENT_RECV_REPLY:
	case RXRPC_CALL_SERVER_RECV_REQUEST:
	case RXRPC_CALL_SERVER_ACK_REQUEST:
		ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
					 flags, &copied);
		if (ret == -EAGAIN)
			ret = 0;

		if (!skb_queue_empty(&call->recvmsg_queue))
			rxrpc_notify_socket(call);
		break;
	default:
		ret = 0;
		break;
	}

	if (ret < 0)
		goto error_unlock_call;

	if (call->state == RXRPC_CALL_COMPLETE) {
		ret = rxrpc_recvmsg_term(call, msg);
		if (ret < 0)
			goto error_unlock_call;
		if (!(flags & MSG_PEEK))
			rxrpc_release_call(rx, call);
		msg->msg_flags |= MSG_EOR;
		ret = 1;
	}

	if (ret == 0)
		msg->msg_flags |= MSG_MORE;
	else
		msg->msg_flags &= ~MSG_MORE;
	ret = copied;

error_unlock_call:
	mutex_unlock(&call->user_mutex);
	rxrpc_put_call(call, rxrpc_call_put_recvmsg);
	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_return, ret);
	return ret;

error_requeue_call:
	if (!(flags & MSG_PEEK)) {
		write_lock(&rx->recvmsg_lock);
		list_add(&call->recvmsg_link, &rx->recvmsg_q);
		write_unlock(&rx->recvmsg_lock);
		trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_requeue, 0);
	} else {
		rxrpc_put_call(call, rxrpc_call_put_recvmsg);
	}
error_no_call:
	release_sock(&rx->sk);
error_trace:
	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_return, ret);
	return ret;

wait_interrupted:
	ret = sock_intr_errno(timeo);
wait_error:
	finish_wait(sk_sleep(&rx->sk), &wait);
	call = NULL;
	goto error_trace;
}

/**
 * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
 * @sock: The socket that the call exists on
 * @call: The call to receive data from
 * @iter: The buffer to receive into
 * @_len: The amount of data we want to receive (decreased on return)
 * @want_more: True if more data is expected to be read
 * @_abort: Where the abort code is stored if -ECONNABORTED is returned
 * @_service: Where to store the actual service ID (may be upgraded)
 *
 * Allow a kernel service to receive data and pick up information about the
 * state of a call.  Returns 0 if it got what was asked for and there's more
 * available, 1 if it got what was asked for and we're at the end of the data
 * and -EAGAIN if we need more data.
 *
 * Note that we may return -EAGAIN to drain empty packets at the end of the
 * data, even if we've already copied over the requested data.
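 *
 * On each call, *_len is reduced by the amount of data actually copied, so a
 * caller that wants everything can keep calling (typically after a further
 * notification) until 1 or an error is returned.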
582 * 583 * *_abort should also be initialised to 0. 584 */ 585 int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call, 586 struct iov_iter *iter, size_t *_len, 587 bool want_more, u32 *_abort, u16 *_service) 588 { 589 size_t offset = 0; 590 int ret; 591 592 _enter("{%d,%s},%zu,%d", 593 call->debug_id, rxrpc_call_states[call->state], 594 *_len, want_more); 595 596 ASSERTCMP(call->state, !=, RXRPC_CALL_SERVER_SECURING); 597 598 mutex_lock(&call->user_mutex); 599 600 switch (READ_ONCE(call->state)) { 601 case RXRPC_CALL_CLIENT_RECV_REPLY: 602 case RXRPC_CALL_SERVER_RECV_REQUEST: 603 case RXRPC_CALL_SERVER_ACK_REQUEST: 604 ret = rxrpc_recvmsg_data(sock, call, NULL, iter, 605 *_len, 0, &offset); 606 *_len -= offset; 607 if (ret < 0) 608 goto out; 609 610 /* We can only reach here with a partially full buffer if we 611 * have reached the end of the data. We must otherwise have a 612 * full buffer or have been given -EAGAIN. 613 */ 614 if (ret == 1) { 615 if (iov_iter_count(iter) > 0) 616 goto short_data; 617 if (!want_more) 618 goto read_phase_complete; 619 ret = 0; 620 goto out; 621 } 622 623 if (!want_more) 624 goto excess_data; 625 goto out; 626 627 case RXRPC_CALL_COMPLETE: 628 goto call_complete; 629 630 default: 631 ret = -EINPROGRESS; 632 goto out; 633 } 634 635 read_phase_complete: 636 ret = 1; 637 out: 638 if (_service) 639 *_service = call->dest_srx.srx_service; 640 mutex_unlock(&call->user_mutex); 641 _leave(" = %d [%zu,%d]", ret, iov_iter_count(iter), *_abort); 642 return ret; 643 644 short_data: 645 trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_short_data, 646 call->cid, call->call_id, call->rx_consumed, 647 0, -EBADMSG); 648 ret = -EBADMSG; 649 goto out; 650 excess_data: 651 trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_excess_data, 652 call->cid, call->call_id, call->rx_consumed, 653 0, -EMSGSIZE); 654 ret = -EMSGSIZE; 655 goto out; 656 call_complete: 657 *_abort = call->abort_code; 658 ret = call->error; 659 if (call->completion == RXRPC_CALL_SUCCEEDED) { 660 ret = 1; 661 if (iov_iter_count(iter) > 0) 662 ret = -ECONNRESET; 663 } 664 goto out; 665 } 666 EXPORT_SYMBOL(rxrpc_kernel_recv_data); 667