1 // SPDX-License-Identifier: GPL-2.0-or-later 2 /* RxRPC recvmsg() implementation 3 * 4 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved. 5 * Written by David Howells (dhowells@redhat.com) 6 */ 7 8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt 9 10 #include <linux/net.h> 11 #include <linux/skbuff.h> 12 #include <linux/export.h> 13 #include <linux/sched/signal.h> 14 15 #include <net/sock.h> 16 #include <net/af_rxrpc.h> 17 #include "ar-internal.h" 18 19 /* 20 * Post a call for attention by the socket or kernel service. Further 21 * notifications are suppressed by putting recvmsg_link on a dummy queue. 22 */ 23 void rxrpc_notify_socket(struct rxrpc_call *call) 24 { 25 struct rxrpc_sock *rx; 26 struct sock *sk; 27 28 _enter("%d", call->debug_id); 29 30 if (!list_empty(&call->recvmsg_link)) 31 return; 32 33 rcu_read_lock(); 34 35 rx = rcu_dereference(call->socket); 36 sk = &rx->sk; 37 if (rx && sk->sk_state < RXRPC_CLOSE) { 38 if (call->notify_rx) { 39 spin_lock(&call->notify_lock); 40 call->notify_rx(sk, call, call->user_call_ID); 41 spin_unlock(&call->notify_lock); 42 } else { 43 write_lock(&rx->recvmsg_lock); 44 if (list_empty(&call->recvmsg_link)) { 45 rxrpc_get_call(call, rxrpc_call_get_notify_socket); 46 list_add_tail(&call->recvmsg_link, &rx->recvmsg_q); 47 } 48 write_unlock(&rx->recvmsg_lock); 49 50 if (!sock_flag(sk, SOCK_DEAD)) { 51 _debug("call %ps", sk->sk_data_ready); 52 sk->sk_data_ready(sk); 53 } 54 } 55 } 56 57 rcu_read_unlock(); 58 _leave(""); 59 } 60 61 /* 62 * Pass a call terminating message to userspace. 
63 */ 64 static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg) 65 { 66 u32 tmp = 0; 67 int ret; 68 69 switch (call->completion) { 70 case RXRPC_CALL_SUCCEEDED: 71 ret = 0; 72 if (rxrpc_is_service_call(call)) 73 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &tmp); 74 break; 75 case RXRPC_CALL_REMOTELY_ABORTED: 76 tmp = call->abort_code; 77 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp); 78 break; 79 case RXRPC_CALL_LOCALLY_ABORTED: 80 tmp = call->abort_code; 81 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp); 82 break; 83 case RXRPC_CALL_NETWORK_ERROR: 84 tmp = -call->error; 85 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &tmp); 86 break; 87 case RXRPC_CALL_LOCAL_ERROR: 88 tmp = -call->error; 89 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp); 90 break; 91 default: 92 pr_err("Invalid terminal call state %u\n", call->state); 93 BUG(); 94 break; 95 } 96 97 trace_rxrpc_recvdata(call, rxrpc_recvmsg_terminal, 98 lower_32_bits(atomic64_read(&call->ackr_window)) - 1, 99 call->rx_pkt_offset, call->rx_pkt_len, ret); 100 return ret; 101 } 102 103 /* 104 * End the packet reception phase. 
 */
static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
{
	rxrpc_seq_t whigh = READ_ONCE(call->rx_highest_seq);

	_enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]);

	trace_rxrpc_receive(call, rxrpc_receive_end, 0, whigh);

	/* A client call completes when the whole reply has been received, so
	 * propose a delayed final ACK before taking the state lock.
	 */
	if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY)
		rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_terminal_ack);

	write_lock(&call->state_lock);

	switch (call->state) {
	case RXRPC_CALL_CLIENT_RECV_REPLY:
		/* Reply fully received: mark the call complete and poke the
		 * I/O thread to finish it off.
		 */
		__rxrpc_call_completed(call);
		write_unlock(&call->state_lock);
		rxrpc_poke_call(call, rxrpc_call_poke_complete);
		break;

	case RXRPC_CALL_SERVER_RECV_REQUEST:
		call->state = RXRPC_CALL_SERVER_ACK_REQUEST;
		/* Request fully received: effectively disable the
		 * expect-request timeout by pushing it out to the maximum.
		 */
		call->expect_req_by = jiffies + MAX_JIFFY_OFFSET;
		write_unlock(&call->state_lock);
		rxrpc_propose_delay_ACK(call, serial,
					rxrpc_propose_ack_processing_op);
		break;
	default:
		write_unlock(&call->state_lock);
		break;
	}
}

/*
 * Discard a packet we've used up and advance the Rx window by one.
 */
static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
{
	struct rxrpc_skb_priv *sp;
	struct sk_buff *skb;
	rxrpc_serial_t serial;
	rxrpc_seq_t old_consumed = call->rx_consumed, tseq;
	bool last;
	int acked;

	_enter("%d", call->debug_id);

	/* Only recvmsg consumes from this queue, so no lock is needed to
	 * dequeue here.
	 */
	skb = skb_dequeue(&call->recvmsg_queue);
	rxrpc_see_skb(skb, rxrpc_skb_see_rotate);

	sp = rxrpc_skb(skb);
	tseq = sp->hdr.seq;
	serial = sp->hdr.serial;
	last = sp->hdr.flags & RXRPC_LAST_PACKET;

	/* Barrier against rxrpc_input_data(). */
	if (after(tseq, call->rx_consumed))
		smp_store_release(&call->rx_consumed, tseq);

	rxrpc_free_skb(skb, rxrpc_skb_put_rotate);

	trace_rxrpc_receive(call, last ? rxrpc_receive_rotate_last : rxrpc_receive_rotate,
			    serial, call->rx_consumed);
	if (last) {
		/* That was the final DATA packet; move the call on. */
		rxrpc_end_rx_phase(call, serial);
		return;
	}

	/* Check to see if there's an ACK that needs sending. */
	acked = atomic_add_return(call->rx_consumed - old_consumed,
				  &call->ackr_nr_consumed);
	if (acked > 2 &&
	    !test_and_set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags))
		rxrpc_poke_call(call, rxrpc_call_poke_idle);
}

/*
 * Decrypt and verify a DATA packet.
 */
static int rxrpc_verify_data(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);

	/* Skip packets that have already been verified on a previous pass
	 * (e.g. a prior MSG_PEEK).
	 */
	if (sp->flags & RXRPC_RX_VERIFIED)
		return 0;
	return call->security->verify_packet(call, skb);
}

/*
 * Deliver messages to a call.  This keeps processing packets until the buffer
 * is filled and we find either more DATA (returns 0) or the end of the DATA
 * (returns 1).  If more packets are required, it returns -EAGAIN.
 */
static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
			      struct msghdr *msg, struct iov_iter *iter,
			      size_t len, int flags, size_t *_offset)
{
	struct rxrpc_skb_priv *sp;
	struct sk_buff *skb;
	rxrpc_seq_t seq = 0;
	size_t remain;
	unsigned int rx_pkt_offset, rx_pkt_len;
	int copy, ret = -EAGAIN, ret2;

	/* Resume from any partially-consumed packet left by a previous
	 * recvmsg call.
	 */
	rx_pkt_offset = call->rx_pkt_offset;
	rx_pkt_len = call->rx_pkt_len;

	/* Past ACK_REQUEST the receive phase is already over; report end of
	 * data immediately.
	 */
	if (call->state >= RXRPC_CALL_SERVER_ACK_REQUEST) {
		seq = lower_32_bits(atomic64_read(&call->ackr_window)) - 1;
		ret = 1;
		goto done;
	}

	/* No one else can be removing stuff from the queue, so we shouldn't
	 * need the Rx lock to walk it.
	 */
	skb = skb_peek(&call->recvmsg_queue);
	while (skb) {
		rxrpc_see_skb(skb, rxrpc_skb_see_recvmsg);
		sp = rxrpc_skb(skb);
		seq = sp->hdr.seq;

		if (!(flags & MSG_PEEK))
			trace_rxrpc_receive(call, rxrpc_receive_front,
					    sp->hdr.serial, seq);

		if (msg)
			sock_recv_timestamp(msg, sock->sk, skb);

		if (rx_pkt_offset == 0) {
			/* First look at this packet: decrypt/verify it and
			 * pick up the payload bounds it yields.
			 */
			ret2 = rxrpc_verify_data(call, skb);
			rx_pkt_offset = sp->offset;
			rx_pkt_len = sp->len;
			trace_rxrpc_recvdata(call, rxrpc_recvmsg_next, seq,
					     rx_pkt_offset, rx_pkt_len, ret2);
			if (ret2 < 0) {
				ret = ret2;
				goto out;
			}
		} else {
			/* Continuing a packet partially consumed earlier. */
			trace_rxrpc_recvdata(call, rxrpc_recvmsg_cont, seq,
					     rx_pkt_offset, rx_pkt_len, 0);
		}

		/* We have to handle short, empty and used-up DATA packets. */
		remain = len - *_offset;
		copy = rx_pkt_len;
		if (copy > remain)
			copy = remain;
		if (copy > 0) {
			ret2 = skb_copy_datagram_iter(skb, rx_pkt_offset, iter,
						      copy);
			if (ret2 < 0) {
				ret = ret2;
				goto out;
			}

			/* handle piecemeal consumption of data packets */
			rx_pkt_offset += copy;
			rx_pkt_len -= copy;
			*_offset += copy;
		}

		if (rx_pkt_len > 0) {
			/* Buffer filled before this packet was exhausted;
			 * leave the remainder for the next call.
			 */
			trace_rxrpc_recvdata(call, rxrpc_recvmsg_full, seq,
					     rx_pkt_offset, rx_pkt_len, 0);
			ASSERTCMP(*_offset, ==, len);
			ret = 0;
			break;
		}

		/* The whole packet has been transferred.
		 */
		if (sp->hdr.flags & RXRPC_LAST_PACKET)
			ret = 1;
		rx_pkt_offset = 0;
		rx_pkt_len = 0;

		/* Peek the next packet before rotation frees the current skb. */
		skb = skb_peek_next(skb, &call->recvmsg_queue);

		if (!(flags & MSG_PEEK))
			rxrpc_rotate_rx_window(call);
	}

out:
	/* Persist partial-packet progress unless we were only peeking. */
	if (!(flags & MSG_PEEK)) {
		call->rx_pkt_offset = rx_pkt_offset;
		call->rx_pkt_len = rx_pkt_len;
	}
done:
	trace_rxrpc_recvdata(call, rxrpc_recvmsg_data_return, seq,
			     rx_pkt_offset, rx_pkt_len, ret);
	if (ret == -EAGAIN)
		set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags);
	return ret;
}

/*
 * Receive a message from an RxRPC socket
 * - we need to be careful about two or more threads calling recvmsg
 *   simultaneously
 */
int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
		  int flags)
{
	struct rxrpc_call *call;
	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
	struct list_head *l;
	unsigned int call_debug_id = 0;
	size_t copied = 0;
	long timeo;
	int ret;

	DEFINE_WAIT(wait);

	trace_rxrpc_recvmsg(0, rxrpc_recvmsg_enter, 0);

	if (flags & (MSG_OOB | MSG_TRUNC))
		return -EOPNOTSUPP;

	timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);

try_again:
	lock_sock(&rx->sk);

	/* Return immediately if a client socket has no outstanding calls */
	if (RB_EMPTY_ROOT(&rx->calls) &&
	    list_empty(&rx->recvmsg_q) &&
	    rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
		release_sock(&rx->sk);
		return -EAGAIN;
	}

	if (list_empty(&rx->recvmsg_q)) {
		ret = -EWOULDBLOCK;
		if (timeo == 0) {
			call = NULL;
			goto error_no_call;
		}

		release_sock(&rx->sk);

		/* Wait for something to happen */
		prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
					  TASK_INTERRUPTIBLE);
		ret = sock_error(&rx->sk);
		if (ret)
			goto wait_error;

		/* Recheck under the waitqueue entry to close the race with a
		 * notification arriving between the check above and sleeping.
		 */
		if (list_empty(&rx->recvmsg_q)) {
			if (signal_pending(current))
				goto wait_interrupted;
			trace_rxrpc_recvmsg(0, rxrpc_recvmsg_wait, 0);
			timeo = schedule_timeout(timeo);
		}
		finish_wait(sk_sleep(&rx->sk), &wait);
		goto try_again;
	}

	/* Find the next call and dequeue it if we're not just peeking.  If we
	 * do dequeue it, that comes with a ref that we will need to release.
	 */
	write_lock(&rx->recvmsg_lock);
	l = rx->recvmsg_q.next;
	call = list_entry(l, struct rxrpc_call, recvmsg_link);
	if (!(flags & MSG_PEEK))
		list_del_init(&call->recvmsg_link);
	else
		rxrpc_get_call(call, rxrpc_call_get_recvmsg);
	write_unlock(&rx->recvmsg_lock);

	call_debug_id = call->debug_id;
	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_dequeue, 0);

	/* We're going to drop the socket lock, so we need to lock the call
	 * against interference by sendmsg.
	 */
	if (!mutex_trylock(&call->user_mutex)) {
		ret = -EWOULDBLOCK;
		if (flags & MSG_DONTWAIT)
			goto error_requeue_call;
		ret = -ERESTARTSYS;
		if (mutex_lock_interruptible(&call->user_mutex) < 0)
			goto error_requeue_call;
	}

	release_sock(&rx->sk);

	if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
		BUG();

	/* Report the user call ID as a control message, sized to match the
	 * caller's ABI (compat tasks get a 32-bit ID).
	 */
	if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
		if (flags & MSG_CMSG_COMPAT) {
			unsigned int id32 = call->user_call_ID;

			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
				       sizeof(unsigned int), &id32);
		} else {
			unsigned long idl = call->user_call_ID;

			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
				       sizeof(unsigned long), &idl);
		}
		if (ret < 0)
			goto error_unlock_call;
	}

	/* Fill in the source address if the caller asked for it. */
	if (msg->msg_name && call->peer) {
		size_t len = sizeof(call->dest_srx);

		memcpy(msg->msg_name, &call->dest_srx, len);
		msg->msg_namelen = len;
	}

	switch (READ_ONCE(call->state)) {
	case RXRPC_CALL_CLIENT_RECV_REPLY:
	case RXRPC_CALL_SERVER_RECV_REQUEST:
	case RXRPC_CALL_SERVER_ACK_REQUEST:
		ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
					 flags, &copied);
		if (ret == -EAGAIN)
			ret = 0;

		/* More data still queued: renotify so another recvmsg can
		 * pick the call up again.
		 */
		if (!skb_queue_empty(&call->recvmsg_queue))
			rxrpc_notify_socket(call);
		break;
	default:
		ret = 0;
		break;
	}

	if (ret < 0)
		goto error_unlock_call;

	if (call->state == RXRPC_CALL_COMPLETE) {
		ret = rxrpc_recvmsg_term(call, msg);
		if (ret < 0)
			goto error_unlock_call;
		/* Unless peeking, detach the completed call from the socket. */
		if (!(flags & MSG_PEEK))
			rxrpc_release_call(rx, call);
		msg->msg_flags |= MSG_EOR;
		ret = 1;
	}

	if (ret == 0)
		msg->msg_flags |= MSG_MORE;
	else
		msg->msg_flags &= ~MSG_MORE;
	ret = copied;

error_unlock_call:
	mutex_unlock(&call->user_mutex);
	rxrpc_put_call(call, rxrpc_call_put_recvmsg);
	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_return, ret);
	return ret;

error_requeue_call:
	/* Put the call back at the head of the queue (or drop our peek ref)
	 * so it isn't lost when we bail out early.
	 */
	if (!(flags & MSG_PEEK)) {
		write_lock(&rx->recvmsg_lock);
		list_add(&call->recvmsg_link, &rx->recvmsg_q);
		write_unlock(&rx->recvmsg_lock);
		trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_requeue, 0);
	} else {
		rxrpc_put_call(call, rxrpc_call_put_recvmsg);
	}
error_no_call:
	release_sock(&rx->sk);
error_trace:
	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_return, ret);
	return ret;

wait_interrupted:
	ret = sock_intr_errno(timeo);
wait_error:
	finish_wait(sk_sleep(&rx->sk), &wait);
	call = NULL;
	goto error_trace;
}

/**
 * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
 * @sock: The socket that the call exists on
 * @call: The call to send data through
 * @iter: The buffer to receive into
 * @_len: The amount of data we want to receive (decreased on return)
 * @want_more: True if more data is expected to be read
 * @_abort: Where the abort code is stored if -ECONNABORTED is returned
 * @_service: Where to store the actual service ID (may be upgraded)
 *
 * Allow a kernel service to receive data and pick up information
 * about the
 * state of a call.  Returns 0 if got what was asked for and there's more
 * available, 1 if we got what was asked for and we're at the end of the data
 * and -EAGAIN if we need more data.
 *
 * Note that we may return -EAGAIN to drain empty packets at the end of the
 * data, even if we've already copied over the requested data.
 *
 * *_abort should also be initialised to 0.
 */
int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
			   struct iov_iter *iter, size_t *_len,
			   bool want_more, u32 *_abort, u16 *_service)
{
	size_t offset = 0;
	int ret;

	_enter("{%d,%s},%zu,%d",
	       call->debug_id, rxrpc_call_states[call->state],
	       *_len, want_more);

	ASSERTCMP(call->state, !=, RXRPC_CALL_SERVER_SECURING);

	/* Serialise against sendmsg and other users of the call. */
	mutex_lock(&call->user_mutex);

	switch (READ_ONCE(call->state)) {
	case RXRPC_CALL_CLIENT_RECV_REPLY:
	case RXRPC_CALL_SERVER_RECV_REQUEST:
	case RXRPC_CALL_SERVER_ACK_REQUEST:
		/* No msghdr for kernel callers; consume up to *_len bytes and
		 * report how much was taken via offset.
		 */
		ret = rxrpc_recvmsg_data(sock, call, NULL, iter,
					 *_len, 0, &offset);
		*_len -= offset;
		if (ret < 0)
			goto out;

		/* We can only reach here with a partially full buffer if we
		 * have reached the end of the data.  We must otherwise have a
		 * full buffer or have been given -EAGAIN.
		 */
		if (ret == 1) {
			if (iov_iter_count(iter) > 0)
				goto short_data;
			if (!want_more)
				goto read_phase_complete;
			ret = 0;
			goto out;
		}

		/* ret == 0: buffer filled but more DATA remains.  The caller
		 * said it didn't want more, so that's excess data.
		 */
		if (!want_more)
			goto excess_data;
		goto out;

	case RXRPC_CALL_COMPLETE:
		goto call_complete;

	default:
		/* Still in an earlier phase (e.g. still sending). */
		ret = -EINPROGRESS;
		goto out;
	}

read_phase_complete:
	ret = 1;
out:
	if (_service)
		*_service = call->dest_srx.srx_service;
	mutex_unlock(&call->user_mutex);
	_leave(" = %d [%zu,%d]", ret, iov_iter_count(iter), *_abort);
	return ret;

short_data:
	/* Data ran out before the caller's buffer was satisfied. */
	trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_short_data,
			  call->cid, call->call_id, call->rx_consumed,
			  0, -EBADMSG);
	ret = -EBADMSG;
	goto out;
excess_data:
	trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_excess_data,
			  call->cid, call->call_id, call->rx_consumed,
			  0, -EMSGSIZE);
	ret = -EMSGSIZE;
	goto out;
call_complete:
	/* Report how the call ended: abort code plus error, or success with
	 * a reset if the buffer wasn't filled.
	 */
	*_abort = call->abort_code;
	ret = call->error;
	if (call->completion == RXRPC_CALL_SUCCEEDED) {
		ret = 1;
		if (iov_iter_count(iter) > 0)
			ret = -ECONNRESET;
	}
	goto out;
}
EXPORT_SYMBOL(rxrpc_kernel_recv_data);