// SPDX-License-Identifier: GPL-2.0-or-later
/* RxRPC recvmsg() implementation
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 */

#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include <linux/net.h>
#include <linux/skbuff.h>
#include <linux/export.h>
#include <linux/sched/signal.h>

#include <net/sock.h>
#include <net/af_rxrpc.h>
#include "ar-internal.h"

/*
 * Post a call for attention by the socket or kernel service.  Further
 * notifications are suppressed by putting recvmsg_link on a dummy queue.
 */
void rxrpc_notify_socket(struct rxrpc_call *call)
{
	struct rxrpc_sock *rx;
	struct sock *sk;

	_enter("%d", call->debug_id);

	if (!list_empty(&call->recvmsg_link))
		return;

	rcu_read_lock();

	rx = rcu_dereference(call->socket);
	sk = &rx->sk;
	if (rx && sk->sk_state < RXRPC_CLOSE) {
		if (call->notify_rx) {
			spin_lock(&call->notify_lock);
			call->notify_rx(sk, call, call->user_call_ID);
			spin_unlock(&call->notify_lock);
		} else {
			write_lock(&rx->recvmsg_lock);
			if (list_empty(&call->recvmsg_link)) {
				rxrpc_get_call(call, rxrpc_call_get_notify_socket);
				list_add_tail(&call->recvmsg_link, &rx->recvmsg_q);
			}
			write_unlock(&rx->recvmsg_lock);

			if (!sock_flag(sk, SOCK_DEAD)) {
				_debug("call %ps", sk->sk_data_ready);
				sk->sk_data_ready(sk);
			}
		}
	}

	rcu_read_unlock();
	_leave("");
}
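/*
 * Illustrative sketch (not part of this file): a kernel service that
 * registered a ->notify_rx hook when it began the call is called back
 * through the branch above with call->notify_lock held, so the hook must
 * not sleep.  A hypothetical service would usually just defer the real
 * work; every name below other than the callback signature is invented:
 *
 *	static void example_notify_rx(struct sock *sk, struct rxrpc_call *call,
 *				      unsigned long user_call_ID)
 *	{
 *		struct example_call *ec = (struct example_call *)user_call_ID;
 *
 *		queue_work(example_wq, &ec->work);  // drain in process context
 *	}
 */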
/*
 * Transition a call to the complete state.
 */
bool __rxrpc_set_call_completion(struct rxrpc_call *call,
				 enum rxrpc_call_completion compl,
				 u32 abort_code,
				 int error)
{
	if (call->state < RXRPC_CALL_COMPLETE) {
		call->abort_code = abort_code;
		call->error = error;
		call->completion = compl;
		call->state = RXRPC_CALL_COMPLETE;
		trace_rxrpc_call_complete(call);
		wake_up(&call->waitq);
		rxrpc_notify_socket(call);
		return true;
	}
	return false;
}

bool rxrpc_set_call_completion(struct rxrpc_call *call,
			       enum rxrpc_call_completion compl,
			       u32 abort_code,
			       int error)
{
	bool ret = false;

	if (call->state < RXRPC_CALL_COMPLETE) {
		write_lock(&call->state_lock);
		ret = __rxrpc_set_call_completion(call, compl, abort_code, error);
		write_unlock(&call->state_lock);
	}
	return ret;
}

/*
 * Record that a call successfully completed.
 */
bool __rxrpc_call_completed(struct rxrpc_call *call)
{
	return __rxrpc_set_call_completion(call, RXRPC_CALL_SUCCEEDED, 0, 0);
}

bool rxrpc_call_completed(struct rxrpc_call *call)
{
	bool ret = false;

	if (call->state < RXRPC_CALL_COMPLETE) {
		write_lock(&call->state_lock);
		ret = __rxrpc_call_completed(call);
		write_unlock(&call->state_lock);
	}
	return ret;
}

/*
 * Record that a call is locally aborted.
 */
bool __rxrpc_abort_call(const char *why, struct rxrpc_call *call,
			rxrpc_seq_t seq, u32 abort_code, int error)
{
	trace_rxrpc_abort(call->debug_id, why, call->cid, call->call_id, seq,
			  abort_code, error);
	return __rxrpc_set_call_completion(call, RXRPC_CALL_LOCALLY_ABORTED,
					   abort_code, error);
}

bool rxrpc_abort_call(const char *why, struct rxrpc_call *call,
		      rxrpc_seq_t seq, u32 abort_code, int error)
{
	bool ret;

	write_lock(&call->state_lock);
	ret = __rxrpc_abort_call(why, call, seq, abort_code, error);
	write_unlock(&call->state_lock);
	return ret;
}

/*
 * Pass a call terminating message to userspace.
 */
static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
{
	u32 tmp = 0;
	int ret;

	switch (call->completion) {
	case RXRPC_CALL_SUCCEEDED:
		ret = 0;
		if (rxrpc_is_service_call(call))
			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &tmp);
		break;
	case RXRPC_CALL_REMOTELY_ABORTED:
		tmp = call->abort_code;
		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
		break;
	case RXRPC_CALL_LOCALLY_ABORTED:
		tmp = call->abort_code;
		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
		break;
	case RXRPC_CALL_NETWORK_ERROR:
		tmp = -call->error;
		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &tmp);
		break;
	case RXRPC_CALL_LOCAL_ERROR:
		tmp = -call->error;
		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
		break;
	default:
		pr_err("Invalid terminal call state %u\n", call->state);
		BUG();
		break;
	}

	trace_rxrpc_recvdata(call, rxrpc_recvmsg_terminal,
			     lower_32_bits(atomic64_read(&call->ackr_window)) - 1,
			     call->rx_pkt_offset, call->rx_pkt_len, ret);
	return ret;
}
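/*
 * Illustrative sketch (userspace side, not part of this file): the terminal
 * control messages built by rxrpc_recvmsg_term() above can be picked apart
 * with the standard CMSG macros.  Only the cmsg level/types and the 4-byte
 * payloads come from the code above; the rest is an invented example:
 *
 *	struct cmsghdr *cmsg;
 *	unsigned int code;
 *
 *	for (cmsg = CMSG_FIRSTHDR(&m); cmsg; cmsg = CMSG_NXTHDR(&m, cmsg)) {
 *		if (cmsg->cmsg_level != SOL_RXRPC)
 *			continue;
 *		switch (cmsg->cmsg_type) {
 *		case RXRPC_ABORT:	// 4-byte abort code from either side
 *		case RXRPC_NET_ERROR:	// 4-byte positive errno
 *		case RXRPC_LOCAL_ERROR:	// 4-byte positive errno
 *			memcpy(&code, CMSG_DATA(cmsg), sizeof(code));
 *			break;
 *		case RXRPC_ACK:		// zero-length: service call fully ACK'd
 *			break;
 *		}
 *	}
 */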
/*
 * End the packet reception phase.
 */
static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
{
	rxrpc_seq_t whigh = READ_ONCE(call->rx_highest_seq);

	_enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]);

	trace_rxrpc_receive(call, rxrpc_receive_end, 0, whigh);

	if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY)
		rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_terminal_ack);

	write_lock(&call->state_lock);

	switch (call->state) {
	case RXRPC_CALL_CLIENT_RECV_REPLY:
		__rxrpc_call_completed(call);
		write_unlock(&call->state_lock);
		break;

	case RXRPC_CALL_SERVER_RECV_REQUEST:
		call->state = RXRPC_CALL_SERVER_ACK_REQUEST;
		call->expect_req_by = jiffies + MAX_JIFFY_OFFSET;
		write_unlock(&call->state_lock);
		rxrpc_propose_delay_ACK(call, serial,
					rxrpc_propose_ack_processing_op);
		break;
	default:
		write_unlock(&call->state_lock);
		break;
	}
}

/*
 * Discard a packet we've used up and advance the Rx window by one.
 */
static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
{
	struct rxrpc_skb_priv *sp;
	struct sk_buff *skb;
	rxrpc_serial_t serial;
	rxrpc_seq_t old_consumed = call->rx_consumed, tseq;
	bool last;
	int acked;

	_enter("%d", call->debug_id);

	skb = skb_dequeue(&call->recvmsg_queue);
	rxrpc_see_skb(skb, rxrpc_skb_see_rotate);

	sp = rxrpc_skb(skb);
	tseq   = sp->hdr.seq;
	serial = sp->hdr.serial;
	last   = sp->hdr.flags & RXRPC_LAST_PACKET;

	/* Barrier against rxrpc_input_data(). */
	if (after(tseq, call->rx_consumed))
		smp_store_release(&call->rx_consumed, tseq);

	rxrpc_free_skb(skb, rxrpc_skb_put_rotate);

	trace_rxrpc_receive(call, last ? rxrpc_receive_rotate_last : rxrpc_receive_rotate,
			    serial, call->rx_consumed);
	if (last) {
		rxrpc_end_rx_phase(call, serial);
		return;
	}

	/* Check to see if there's an ACK that needs sending. */
	acked = atomic_add_return(call->rx_consumed - old_consumed,
				  &call->ackr_nr_consumed);
	if (acked > 2 &&
	    !test_and_set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags))
		rxrpc_poke_call(call, rxrpc_call_poke_idle);
}
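/*
 * Note on the smp_store_release() in rxrpc_rotate_rx_window() above: it
 * publishes the new window edge to the data-input side, ensuring this
 * thread's last use of the skb is visible before the edge moves.  A
 * minimal sketch of the generic release/acquire pattern this relies on
 * (names invented; the matching read lives in the rxrpc input path):
 *
 *	// consumer				// producer
 *	finish_with(slot[seq]);			edge = smp_load_acquire(&consumed);
 *	smp_store_release(&consumed, seq);	if (in_window(pkt->seq, edge))
 *							accept(pkt);
 */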
/*
 * Decrypt and verify a DATA packet.
 */
static int rxrpc_verify_data(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);

	if (sp->flags & RXRPC_RX_VERIFIED)
		return 0;
	return call->security->verify_packet(call, skb);
}

/*
 * Deliver messages to a call.  This keeps processing packets until the buffer
 * is filled and we find either more DATA (returns 0) or the end of the DATA
 * (returns 1).  If more packets are required, it returns -EAGAIN.
 */
static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
			      struct msghdr *msg, struct iov_iter *iter,
			      size_t len, int flags, size_t *_offset)
{
	struct rxrpc_skb_priv *sp;
	struct sk_buff *skb;
	rxrpc_seq_t seq = 0;
	size_t remain;
	unsigned int rx_pkt_offset, rx_pkt_len;
	int copy, ret = -EAGAIN, ret2;

	rx_pkt_offset = call->rx_pkt_offset;
	rx_pkt_len = call->rx_pkt_len;

	if (call->state >= RXRPC_CALL_SERVER_ACK_REQUEST) {
		seq = lower_32_bits(atomic64_read(&call->ackr_window)) - 1;
		ret = 1;
		goto done;
	}

	/* No one else can be removing stuff from the queue, so we shouldn't
	 * need the Rx lock to walk it.
	 */
	skb = skb_peek(&call->recvmsg_queue);
	while (skb) {
		rxrpc_see_skb(skb, rxrpc_skb_see_recvmsg);
		sp = rxrpc_skb(skb);
		seq = sp->hdr.seq;

		if (!(flags & MSG_PEEK))
			trace_rxrpc_receive(call, rxrpc_receive_front,
					    sp->hdr.serial, seq);

		if (msg)
			sock_recv_timestamp(msg, sock->sk, skb);

		if (rx_pkt_offset == 0) {
			ret2 = rxrpc_verify_data(call, skb);
			rx_pkt_offset = sp->offset;
			rx_pkt_len = sp->len;
			trace_rxrpc_recvdata(call, rxrpc_recvmsg_next, seq,
					     rx_pkt_offset, rx_pkt_len, ret2);
			if (ret2 < 0) {
				ret = ret2;
				goto out;
			}
		} else {
			trace_rxrpc_recvdata(call, rxrpc_recvmsg_cont, seq,
					     rx_pkt_offset, rx_pkt_len, 0);
		}

		/* We have to handle short, empty and used-up DATA packets. */
		remain = len - *_offset;
		copy = rx_pkt_len;
		if (copy > remain)
			copy = remain;
		if (copy > 0) {
			ret2 = skb_copy_datagram_iter(skb, rx_pkt_offset, iter,
						      copy);
			if (ret2 < 0) {
				ret = ret2;
				goto out;
			}

			/* handle piecemeal consumption of data packets */
			rx_pkt_offset += copy;
			rx_pkt_len -= copy;
			*_offset += copy;
		}

		if (rx_pkt_len > 0) {
			trace_rxrpc_recvdata(call, rxrpc_recvmsg_full, seq,
					     rx_pkt_offset, rx_pkt_len, 0);
			ASSERTCMP(*_offset, ==, len);
			ret = 0;
			break;
		}

		/* The whole packet has been transferred. */
		if (sp->hdr.flags & RXRPC_LAST_PACKET)
			ret = 1;
		rx_pkt_offset = 0;
		rx_pkt_len = 0;

		skb = skb_peek_next(skb, &call->recvmsg_queue);

		if (!(flags & MSG_PEEK))
			rxrpc_rotate_rx_window(call);
	}

out:
	if (!(flags & MSG_PEEK)) {
		call->rx_pkt_offset = rx_pkt_offset;
		call->rx_pkt_len = rx_pkt_len;
	}
done:
	trace_rxrpc_recvdata(call, rxrpc_recvmsg_data_return, seq,
			     rx_pkt_offset, rx_pkt_len, ret);
	if (ret == -EAGAIN)
		set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags);
	return ret;
}
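/*
 * Illustrative note: unless MSG_PEEK was passed, the cursor into a partly
 * consumed DATA packet (rx_pkt_offset/rx_pkt_len) is saved back into the
 * call above, so userspace may drain a call in chunks of any size and only
 * the final chunk is flagged MSG_EOR by rxrpc_recvmsg().  Hypothetical
 * reader (fd, buf, iov and m are invented):
 *
 *	do {
 *		iov.iov_base = buf;
 *		iov.iov_len  = 128;	// any chunk size works
 *	} while (recvmsg(fd, &m, 0) > 0 && !(m.msg_flags & MSG_EOR));
 */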
/*
 * Receive a message from an RxRPC socket
 * - we need to be careful about two or more threads calling recvmsg
 *   simultaneously
 */
int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
		  int flags)
{
	struct rxrpc_call *call;
	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
	struct list_head *l;
	size_t copied = 0;
	long timeo;
	int ret;

	DEFINE_WAIT(wait);

	trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_enter, 0);

	if (flags & (MSG_OOB | MSG_TRUNC))
		return -EOPNOTSUPP;

	timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);

try_again:
	lock_sock(&rx->sk);

	/* Return immediately if a client socket has no outstanding calls */
	if (RB_EMPTY_ROOT(&rx->calls) &&
	    list_empty(&rx->recvmsg_q) &&
	    rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
		release_sock(&rx->sk);
		return -EAGAIN;
	}

	if (list_empty(&rx->recvmsg_q)) {
		ret = -EWOULDBLOCK;
		if (timeo == 0) {
			call = NULL;
			goto error_no_call;
		}

		release_sock(&rx->sk);

		/* Wait for something to happen */
		prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
					  TASK_INTERRUPTIBLE);
		ret = sock_error(&rx->sk);
		if (ret)
			goto wait_error;

		if (list_empty(&rx->recvmsg_q)) {
			if (signal_pending(current))
				goto wait_interrupted;
			trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_wait, 0);
			timeo = schedule_timeout(timeo);
		}
		finish_wait(sk_sleep(&rx->sk), &wait);
		goto try_again;
	}

	/* Find the next call and dequeue it if we're not just peeking.  If we
	 * do dequeue it, that comes with a ref that we will need to release.
	 */
	write_lock(&rx->recvmsg_lock);
	l = rx->recvmsg_q.next;
	call = list_entry(l, struct rxrpc_call, recvmsg_link);
	if (!(flags & MSG_PEEK))
		list_del_init(&call->recvmsg_link);
	else
		rxrpc_get_call(call, rxrpc_call_get_recvmsg);
	write_unlock(&rx->recvmsg_lock);

	trace_rxrpc_recvmsg(call, rxrpc_recvmsg_dequeue, 0);

	/* We're going to drop the socket lock, so we need to lock the call
	 * against interference by sendmsg.
	 */
	if (!mutex_trylock(&call->user_mutex)) {
		ret = -EWOULDBLOCK;
		if (flags & MSG_DONTWAIT)
			goto error_requeue_call;
		ret = -ERESTARTSYS;
		if (mutex_lock_interruptible(&call->user_mutex) < 0)
			goto error_requeue_call;
	}

	release_sock(&rx->sk);

	if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
		BUG();

	if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
		if (flags & MSG_CMSG_COMPAT) {
			unsigned int id32 = call->user_call_ID;

			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
				       sizeof(unsigned int), &id32);
		} else {
			unsigned long idl = call->user_call_ID;

			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
				       sizeof(unsigned long), &idl);
		}
		if (ret < 0)
			goto error_unlock_call;
	}

	if (msg->msg_name && call->peer) {
		size_t len = sizeof(call->dest_srx);

		memcpy(msg->msg_name, &call->dest_srx, len);
		msg->msg_namelen = len;
	}

	switch (READ_ONCE(call->state)) {
	case RXRPC_CALL_CLIENT_RECV_REPLY:
	case RXRPC_CALL_SERVER_RECV_REQUEST:
	case RXRPC_CALL_SERVER_ACK_REQUEST:
		ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
					 flags, &copied);
		if (ret == -EAGAIN)
			ret = 0;

		if (!skb_queue_empty(&call->recvmsg_queue))
			rxrpc_notify_socket(call);
		break;
	default:
		ret = 0;
		break;
	}

	if (ret < 0)
		goto error_unlock_call;

	if (call->state == RXRPC_CALL_COMPLETE) {
		ret = rxrpc_recvmsg_term(call, msg);
		if (ret < 0)
			goto error_unlock_call;
		if (!(flags & MSG_PEEK))
			rxrpc_release_call(rx, call);
		msg->msg_flags |= MSG_EOR;
		ret = 1;
	}

	if (ret == 0)
		msg->msg_flags |= MSG_MORE;
	else
		msg->msg_flags &= ~MSG_MORE;
	ret = copied;

error_unlock_call:
	mutex_unlock(&call->user_mutex);
	rxrpc_put_call(call, rxrpc_call_put_recvmsg);
	trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, ret);
	return ret;

error_requeue_call:
	if (!(flags & MSG_PEEK)) {
		write_lock(&rx->recvmsg_lock);
		list_add(&call->recvmsg_link, &rx->recvmsg_q);
		write_unlock(&rx->recvmsg_lock);
		trace_rxrpc_recvmsg(call, rxrpc_recvmsg_requeue, 0);
	} else {
		rxrpc_put_call(call, rxrpc_call_put_recvmsg);
	}
error_no_call:
	release_sock(&rx->sk);
error_trace:
	trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, ret);
	return ret;

wait_interrupted:
	ret = sock_intr_errno(timeo);
wait_error:
	finish_wait(sk_sleep(&rx->sk), &wait);
	call = NULL;
	goto error_trace;
}
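/*
 * Illustrative sketch (userspace side): one AF_RXRPC socket can multiplex
 * many calls, and rxrpc_recvmsg() above returns data for whichever call is
 * first on the recvmsg queue, so a recipient matches each message to a
 * call by the RXRPC_USER_CALL_ID control message.  lookup_call() and
 * call_done() are invented helpers:
 *
 *	unsigned long id;
 *
 *	for (cmsg = CMSG_FIRSTHDR(&m); cmsg; cmsg = CMSG_NXTHDR(&m, cmsg)) {
 *		if (cmsg->cmsg_level == SOL_RXRPC &&
 *		    cmsg->cmsg_type == RXRPC_USER_CALL_ID)
 *			memcpy(&id, CMSG_DATA(cmsg), sizeof(id));
 *	}
 *	ctx = lookup_call(id);
 *	if (m.msg_flags & MSG_EOR)
 *		call_done(ctx);		// terminal message: call is over
 */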
/**
 * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
 * @sock: The socket that the call exists on
 * @call: The call to receive data from
 * @iter: The buffer to receive into
 * @_len: The amount of data we want to receive (decreased on return)
 * @want_more: True if more data is expected to be read
 * @_abort: Where the abort code is stored if -ECONNABORTED is returned
 * @_service: Where to store the actual service ID (may be upgraded)
 *
 * Allow a kernel service to receive data and pick up information about the
 * state of a call.  Returns 0 if we got what was asked for and there's more
 * available, 1 if we got what was asked for and we're at the end of the data
 * and -EAGAIN if we need more data.
 *
 * Note that we may return -EAGAIN to drain empty packets at the end of the
 * data, even if we've already copied over the requested data.
 *
 * *_abort should also be initialised to 0.
 */
int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
			   struct iov_iter *iter, size_t *_len,
			   bool want_more, u32 *_abort, u16 *_service)
{
	size_t offset = 0;
	int ret;

	_enter("{%d,%s},%zu,%d",
	       call->debug_id, rxrpc_call_states[call->state],
	       *_len, want_more);

	ASSERTCMP(call->state, !=, RXRPC_CALL_SERVER_SECURING);

	mutex_lock(&call->user_mutex);

	switch (READ_ONCE(call->state)) {
	case RXRPC_CALL_CLIENT_RECV_REPLY:
	case RXRPC_CALL_SERVER_RECV_REQUEST:
	case RXRPC_CALL_SERVER_ACK_REQUEST:
		ret = rxrpc_recvmsg_data(sock, call, NULL, iter,
					 *_len, 0, &offset);
		*_len -= offset;
		if (ret < 0)
			goto out;

		/* We can only reach here with a partially full buffer if we
		 * have reached the end of the data.  We must otherwise have a
		 * full buffer or have been given -EAGAIN.
		 */
		if (ret == 1) {
			if (iov_iter_count(iter) > 0)
				goto short_data;
			if (!want_more)
				goto read_phase_complete;
			ret = 0;
			goto out;
		}

		if (!want_more)
			goto excess_data;
		goto out;

	case RXRPC_CALL_COMPLETE:
		goto call_complete;

	default:
		ret = -EINPROGRESS;
		goto out;
	}

read_phase_complete:
	ret = 1;
out:
	if (_service)
		*_service = call->dest_srx.srx_service;
	mutex_unlock(&call->user_mutex);
	_leave(" = %d [%zu,%d]", ret, iov_iter_count(iter), *_abort);
	return ret;

short_data:
	trace_rxrpc_rx_eproto(call, 0, tracepoint_string("short_data"));
	ret = -EBADMSG;
	goto out;
excess_data:
	trace_rxrpc_rx_eproto(call, 0, tracepoint_string("excess_data"));
	ret = -EMSGSIZE;
	goto out;
call_complete:
	*_abort = call->abort_code;
	ret = call->error;
	if (call->completion == RXRPC_CALL_SUCCEEDED) {
		ret = 1;
		if (iov_iter_count(iter) > 0)
			ret = -ECONNRESET;
	}
	goto out;
}
EXPORT_SYMBOL(rxrpc_kernel_recv_data);
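/*
 * Illustrative sketch (not part of this file): a kernel service reading a
 * fixed-size piece of a reply with rxrpc_kernel_recv_data().  Only the
 * exported function and its documented return convention come from the
 * code above; the surrounding variables are invented, and ITER_DEST
 * assumes a recent iov_iter API:
 *
 *	struct kvec kv = { .iov_base = &hdr, .iov_len = sizeof(hdr) };
 *	struct iov_iter iter;
 *	size_t len = sizeof(hdr);
 *	u32 abort_code = 0;
 *	int ret;
 *
 *	iov_iter_kvec(&iter, ITER_DEST, &kv, 1, len);
 *	ret = rxrpc_kernel_recv_data(rxsock, rxcall, &iter, &len,
 *				     true, &abort_code, NULL);
 *	// 0: buffer filled, more data to come;  1: filled and at the end
 *	// -EAGAIN: more packets needed;  -ECONNABORTED: see abort_code
 */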