1 // SPDX-License-Identifier: GPL-2.0-or-later 2 /* RxRPC recvmsg() implementation 3 * 4 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved. 5 * Written by David Howells (dhowells@redhat.com) 6 */ 7 8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt 9 10 #include <linux/net.h> 11 #include <linux/skbuff.h> 12 #include <linux/export.h> 13 #include <linux/sched/signal.h> 14 15 #include <net/sock.h> 16 #include <net/af_rxrpc.h> 17 #include "ar-internal.h" 18 19 /* 20 * Post a call for attention by the socket or kernel service. Further 21 * notifications are suppressed by putting recvmsg_link on a dummy queue. 22 */ 23 void rxrpc_notify_socket(struct rxrpc_call *call) 24 { 25 struct rxrpc_sock *rx; 26 struct sock *sk; 27 28 _enter("%d", call->debug_id); 29 30 if (!list_empty(&call->recvmsg_link)) 31 return; 32 33 rcu_read_lock(); 34 35 rx = rcu_dereference(call->socket); 36 sk = &rx->sk; 37 if (rx && sk->sk_state < RXRPC_CLOSE) { 38 if (call->notify_rx) { 39 spin_lock_bh(&call->notify_lock); 40 call->notify_rx(sk, call, call->user_call_ID); 41 spin_unlock_bh(&call->notify_lock); 42 } else { 43 write_lock_bh(&rx->recvmsg_lock); 44 if (list_empty(&call->recvmsg_link)) { 45 rxrpc_get_call(call, rxrpc_call_got); 46 list_add_tail(&call->recvmsg_link, &rx->recvmsg_q); 47 } 48 write_unlock_bh(&rx->recvmsg_lock); 49 50 if (!sock_flag(sk, SOCK_DEAD)) { 51 _debug("call %ps", sk->sk_data_ready); 52 sk->sk_data_ready(sk); 53 } 54 } 55 } 56 57 rcu_read_unlock(); 58 _leave(""); 59 } 60 61 /* 62 * Transition a call to the complete state. 63 */ 64 bool __rxrpc_set_call_completion(struct rxrpc_call *call, 65 enum rxrpc_call_completion compl, 66 u32 abort_code, 67 int error) 68 { 69 if (call->state < RXRPC_CALL_COMPLETE) { 70 call->abort_code = abort_code; 71 call->error = error; 72 call->completion = compl; 73 call->state = RXRPC_CALL_COMPLETE; 74 trace_rxrpc_call_complete(call); 75 wake_up(&call->waitq); 76 rxrpc_notify_socket(call); 77 return true; 78 } 79 return false; 80 } 81 82 bool rxrpc_set_call_completion(struct rxrpc_call *call, 83 enum rxrpc_call_completion compl, 84 u32 abort_code, 85 int error) 86 { 87 bool ret = false; 88 89 if (call->state < RXRPC_CALL_COMPLETE) { 90 write_lock_bh(&call->state_lock); 91 ret = __rxrpc_set_call_completion(call, compl, abort_code, error); 92 write_unlock_bh(&call->state_lock); 93 } 94 return ret; 95 } 96 97 /* 98 * Record that a call successfully completed. 99 */ 100 bool __rxrpc_call_completed(struct rxrpc_call *call) 101 { 102 return __rxrpc_set_call_completion(call, RXRPC_CALL_SUCCEEDED, 0, 0); 103 } 104 105 bool rxrpc_call_completed(struct rxrpc_call *call) 106 { 107 bool ret = false; 108 109 if (call->state < RXRPC_CALL_COMPLETE) { 110 write_lock_bh(&call->state_lock); 111 ret = __rxrpc_call_completed(call); 112 write_unlock_bh(&call->state_lock); 113 } 114 return ret; 115 } 116 117 /* 118 * Record that a call is locally aborted. 119 */ 120 bool __rxrpc_abort_call(const char *why, struct rxrpc_call *call, 121 rxrpc_seq_t seq, u32 abort_code, int error) 122 { 123 trace_rxrpc_abort(call->debug_id, why, call->cid, call->call_id, seq, 124 abort_code, error); 125 return __rxrpc_set_call_completion(call, RXRPC_CALL_LOCALLY_ABORTED, 126 abort_code, error); 127 } 128 129 bool rxrpc_abort_call(const char *why, struct rxrpc_call *call, 130 rxrpc_seq_t seq, u32 abort_code, int error) 131 { 132 bool ret; 133 134 write_lock_bh(&call->state_lock); 135 ret = __rxrpc_abort_call(why, call, seq, abort_code, error); 136 write_unlock_bh(&call->state_lock); 137 return ret; 138 } 139 140 /* 141 * Pass a call terminating message to userspace. 142 */ 143 static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg) 144 { 145 u32 tmp = 0; 146 int ret; 147 148 switch (call->completion) { 149 case RXRPC_CALL_SUCCEEDED: 150 ret = 0; 151 if (rxrpc_is_service_call(call)) 152 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &tmp); 153 break; 154 case RXRPC_CALL_REMOTELY_ABORTED: 155 tmp = call->abort_code; 156 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp); 157 break; 158 case RXRPC_CALL_LOCALLY_ABORTED: 159 tmp = call->abort_code; 160 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp); 161 break; 162 case RXRPC_CALL_NETWORK_ERROR: 163 tmp = -call->error; 164 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &tmp); 165 break; 166 case RXRPC_CALL_LOCAL_ERROR: 167 tmp = -call->error; 168 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp); 169 break; 170 default: 171 pr_err("Invalid terminal call state %u\n", call->state); 172 BUG(); 173 break; 174 } 175 176 trace_rxrpc_recvdata(call, rxrpc_recvmsg_terminal, 177 lower_32_bits(atomic64_read(&call->ackr_window)) - 1, 178 call->rx_pkt_offset, call->rx_pkt_len, ret); 179 return ret; 180 } 181 182 /* 183 * End the packet reception phase. 184 */ 185 static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial) 186 { 187 rxrpc_seq_t whigh = READ_ONCE(call->rx_highest_seq); 188 189 _enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]); 190 191 trace_rxrpc_receive(call, rxrpc_receive_end, 0, whigh); 192 193 if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY) 194 rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_terminal_ack); 195 196 write_lock_bh(&call->state_lock); 197 198 switch (call->state) { 199 case RXRPC_CALL_CLIENT_RECV_REPLY: 200 __rxrpc_call_completed(call); 201 write_unlock_bh(&call->state_lock); 202 break; 203 204 case RXRPC_CALL_SERVER_RECV_REQUEST: 205 call->state = RXRPC_CALL_SERVER_ACK_REQUEST; 206 call->expect_req_by = jiffies + MAX_JIFFY_OFFSET; 207 write_unlock_bh(&call->state_lock); 208 rxrpc_propose_delay_ACK(call, serial, 209 rxrpc_propose_ack_processing_op); 210 break; 211 default: 212 write_unlock_bh(&call->state_lock); 213 break; 214 } 215 } 216 217 /* 218 * Discard a packet we've used up and advance the Rx window by one. 219 */ 220 static void rxrpc_rotate_rx_window(struct rxrpc_call *call) 221 { 222 struct rxrpc_skb_priv *sp; 223 struct sk_buff *skb; 224 rxrpc_serial_t serial; 225 rxrpc_seq_t old_consumed = call->rx_consumed, tseq; 226 bool last; 227 int acked; 228 229 _enter("%d", call->debug_id); 230 231 further_rotation: 232 skb = skb_dequeue(&call->recvmsg_queue); 233 rxrpc_see_skb(skb, rxrpc_skb_rotated); 234 235 sp = rxrpc_skb(skb); 236 tseq = sp->hdr.seq; 237 serial = sp->hdr.serial; 238 last = sp->hdr.flags & RXRPC_LAST_PACKET; 239 240 /* Barrier against rxrpc_input_data(). */ 241 if (after(tseq, call->rx_consumed)) 242 smp_store_release(&call->rx_consumed, tseq); 243 244 rxrpc_free_skb(skb, rxrpc_skb_freed); 245 246 trace_rxrpc_receive(call, last ? rxrpc_receive_rotate_last : rxrpc_receive_rotate, 247 serial, call->rx_consumed); 248 if (last) { 249 rxrpc_end_rx_phase(call, serial); 250 return; 251 } 252 253 /* The next packet on the queue might entirely overlap with the one we 254 * just consumed; if so, rotate that away also. 255 */ 256 skb = skb_peek(&call->recvmsg_queue); 257 if (skb) { 258 sp = rxrpc_skb(skb); 259 if (sp->hdr.seq != call->rx_consumed && 260 after_eq(call->rx_consumed, sp->hdr.seq)) 261 goto further_rotation; 262 } 263 264 /* Check to see if there's an ACK that needs sending. */ 265 acked = atomic_add_return(call->rx_consumed - old_consumed, 266 &call->ackr_nr_consumed); 267 if (acked > 2 && 268 !test_and_set_bit(RXRPC_CALL_IDLE_ACK_PENDING, &call->flags)) { 269 rxrpc_send_ACK(call, RXRPC_ACK_IDLE, serial, 270 rxrpc_propose_ack_rotate_rx); 271 rxrpc_transmit_ack_packets(call->peer->local); 272 } 273 } 274 275 /* 276 * Decrypt and verify a DATA packet. 277 */ 278 static int rxrpc_verify_data(struct rxrpc_call *call, struct sk_buff *skb) 279 { 280 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 281 282 if (sp->flags & RXRPC_RX_VERIFIED) 283 return 0; 284 return call->security->verify_packet(call, skb); 285 } 286 287 /* 288 * Deliver messages to a call. This keeps processing packets until the buffer 289 * is filled and we find either more DATA (returns 0) or the end of the DATA 290 * (returns 1). If more packets are required, it returns -EAGAIN. 291 */ 292 static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call, 293 struct msghdr *msg, struct iov_iter *iter, 294 size_t len, int flags, size_t *_offset) 295 { 296 struct rxrpc_skb_priv *sp; 297 struct sk_buff *skb; 298 rxrpc_seq_t seq = 0; 299 size_t remain; 300 unsigned int rx_pkt_offset, rx_pkt_len; 301 int copy, ret = -EAGAIN, ret2; 302 303 rx_pkt_offset = call->rx_pkt_offset; 304 rx_pkt_len = call->rx_pkt_len; 305 306 if (call->state >= RXRPC_CALL_SERVER_ACK_REQUEST) { 307 seq = lower_32_bits(atomic64_read(&call->ackr_window)) - 1; 308 ret = 1; 309 goto done; 310 } 311 312 /* No one else can be removing stuff from the queue, so we shouldn't 313 * need the Rx lock to walk it. 314 */ 315 skb = skb_peek(&call->recvmsg_queue); 316 while (skb) { 317 rxrpc_see_skb(skb, rxrpc_skb_seen); 318 sp = rxrpc_skb(skb); 319 seq = sp->hdr.seq; 320 321 if (after_eq(call->rx_consumed, seq)) { 322 kdebug("obsolete %x %x", call->rx_consumed, seq); 323 goto skip_obsolete; 324 } 325 326 if (!(flags & MSG_PEEK)) 327 trace_rxrpc_receive(call, rxrpc_receive_front, 328 sp->hdr.serial, seq); 329 330 if (msg) 331 sock_recv_timestamp(msg, sock->sk, skb); 332 333 if (rx_pkt_offset == 0) { 334 ret2 = rxrpc_verify_data(call, skb); 335 rx_pkt_offset = sp->offset; 336 rx_pkt_len = sp->len; 337 trace_rxrpc_recvdata(call, rxrpc_recvmsg_next, seq, 338 rx_pkt_offset, rx_pkt_len, ret2); 339 if (ret2 < 0) { 340 ret = ret2; 341 goto out; 342 } 343 rxrpc_transmit_ack_packets(call->peer->local); 344 } else { 345 trace_rxrpc_recvdata(call, rxrpc_recvmsg_cont, seq, 346 rx_pkt_offset, rx_pkt_len, 0); 347 } 348 349 /* We have to handle short, empty and used-up DATA packets. */ 350 remain = len - *_offset; 351 copy = rx_pkt_len; 352 if (copy > remain) 353 copy = remain; 354 if (copy > 0) { 355 ret2 = skb_copy_datagram_iter(skb, rx_pkt_offset, iter, 356 copy); 357 if (ret2 < 0) { 358 ret = ret2; 359 goto out; 360 } 361 362 /* handle piecemeal consumption of data packets */ 363 rx_pkt_offset += copy; 364 rx_pkt_len -= copy; 365 *_offset += copy; 366 } 367 368 if (rx_pkt_len > 0) { 369 trace_rxrpc_recvdata(call, rxrpc_recvmsg_full, seq, 370 rx_pkt_offset, rx_pkt_len, 0); 371 ASSERTCMP(*_offset, ==, len); 372 ret = 0; 373 break; 374 } 375 376 skip_obsolete: 377 /* The whole packet has been transferred. */ 378 if (sp->hdr.flags & RXRPC_LAST_PACKET) 379 ret = 1; 380 rx_pkt_offset = 0; 381 rx_pkt_len = 0; 382 383 skb = skb_peek_next(skb, &call->recvmsg_queue); 384 385 if (!(flags & MSG_PEEK)) 386 rxrpc_rotate_rx_window(call); 387 } 388 389 out: 390 if (!(flags & MSG_PEEK)) { 391 call->rx_pkt_offset = rx_pkt_offset; 392 call->rx_pkt_len = rx_pkt_len; 393 } 394 done: 395 trace_rxrpc_recvdata(call, rxrpc_recvmsg_data_return, seq, 396 rx_pkt_offset, rx_pkt_len, ret); 397 if (ret == -EAGAIN) 398 set_bit(RXRPC_CALL_RX_UNDERRUN, &call->flags); 399 return ret; 400 } 401 402 /* 403 * Receive a message from an RxRPC socket 404 * - we need to be careful about two or more threads calling recvmsg 405 * simultaneously 406 */ 407 int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len, 408 int flags) 409 { 410 struct rxrpc_call *call; 411 struct rxrpc_sock *rx = rxrpc_sk(sock->sk); 412 struct list_head *l; 413 size_t copied = 0; 414 long timeo; 415 int ret; 416 417 DEFINE_WAIT(wait); 418 419 trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_enter, 0); 420 421 if (flags & (MSG_OOB | MSG_TRUNC)) 422 return -EOPNOTSUPP; 423 424 timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT); 425 426 try_again: 427 lock_sock(&rx->sk); 428 429 /* Return immediately if a client socket has no outstanding calls */ 430 if (RB_EMPTY_ROOT(&rx->calls) && 431 list_empty(&rx->recvmsg_q) && 432 rx->sk.sk_state != RXRPC_SERVER_LISTENING) { 433 release_sock(&rx->sk); 434 return -EAGAIN; 435 } 436 437 if (list_empty(&rx->recvmsg_q)) { 438 ret = -EWOULDBLOCK; 439 if (timeo == 0) { 440 call = NULL; 441 goto error_no_call; 442 } 443 444 release_sock(&rx->sk); 445 446 /* Wait for something to happen */ 447 prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait, 448 TASK_INTERRUPTIBLE); 449 ret = sock_error(&rx->sk); 450 if (ret) 451 goto wait_error; 452 453 if (list_empty(&rx->recvmsg_q)) { 454 if (signal_pending(current)) 455 goto wait_interrupted; 456 trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_wait, 0); 457 timeo = schedule_timeout(timeo); 458 } 459 finish_wait(sk_sleep(&rx->sk), &wait); 460 goto try_again; 461 } 462 463 /* Find the next call and dequeue it if we're not just peeking. If we 464 * do dequeue it, that comes with a ref that we will need to release. 465 */ 466 write_lock_bh(&rx->recvmsg_lock); 467 l = rx->recvmsg_q.next; 468 call = list_entry(l, struct rxrpc_call, recvmsg_link); 469 if (!(flags & MSG_PEEK)) 470 list_del_init(&call->recvmsg_link); 471 else 472 rxrpc_get_call(call, rxrpc_call_got); 473 write_unlock_bh(&rx->recvmsg_lock); 474 475 trace_rxrpc_recvmsg(call, rxrpc_recvmsg_dequeue, 0); 476 477 /* We're going to drop the socket lock, so we need to lock the call 478 * against interference by sendmsg. 479 */ 480 if (!mutex_trylock(&call->user_mutex)) { 481 ret = -EWOULDBLOCK; 482 if (flags & MSG_DONTWAIT) 483 goto error_requeue_call; 484 ret = -ERESTARTSYS; 485 if (mutex_lock_interruptible(&call->user_mutex) < 0) 486 goto error_requeue_call; 487 } 488 489 release_sock(&rx->sk); 490 491 if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) 492 BUG(); 493 494 if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) { 495 if (flags & MSG_CMSG_COMPAT) { 496 unsigned int id32 = call->user_call_ID; 497 498 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID, 499 sizeof(unsigned int), &id32); 500 } else { 501 unsigned long idl = call->user_call_ID; 502 503 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID, 504 sizeof(unsigned long), &idl); 505 } 506 if (ret < 0) 507 goto error_unlock_call; 508 } 509 510 if (msg->msg_name && call->peer) { 511 struct sockaddr_rxrpc *srx = msg->msg_name; 512 size_t len = sizeof(call->peer->srx); 513 514 memcpy(msg->msg_name, &call->peer->srx, len); 515 srx->srx_service = call->service_id; 516 msg->msg_namelen = len; 517 } 518 519 switch (READ_ONCE(call->state)) { 520 case RXRPC_CALL_CLIENT_RECV_REPLY: 521 case RXRPC_CALL_SERVER_RECV_REQUEST: 522 case RXRPC_CALL_SERVER_ACK_REQUEST: 523 ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len, 524 flags, &copied); 525 if (ret == -EAGAIN) 526 ret = 0; 527 528 rxrpc_transmit_ack_packets(call->peer->local); 529 if (!skb_queue_empty(&call->recvmsg_queue)) 530 rxrpc_notify_socket(call); 531 break; 532 default: 533 ret = 0; 534 break; 535 } 536 537 if (ret < 0) 538 goto error_unlock_call; 539 540 if (call->state == RXRPC_CALL_COMPLETE) { 541 ret = rxrpc_recvmsg_term(call, msg); 542 if (ret < 0) 543 goto error_unlock_call; 544 if (!(flags & MSG_PEEK)) 545 rxrpc_release_call(rx, call); 546 msg->msg_flags |= MSG_EOR; 547 ret = 1; 548 } 549 550 if (ret == 0) 551 msg->msg_flags |= MSG_MORE; 552 else 553 msg->msg_flags &= ~MSG_MORE; 554 ret = copied; 555 556 error_unlock_call: 557 mutex_unlock(&call->user_mutex); 558 rxrpc_put_call(call, rxrpc_call_put); 559 trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, ret); 560 return ret; 561 562 error_requeue_call: 563 if (!(flags & MSG_PEEK)) { 564 write_lock_bh(&rx->recvmsg_lock); 565 list_add(&call->recvmsg_link, &rx->recvmsg_q); 566 write_unlock_bh(&rx->recvmsg_lock); 567 trace_rxrpc_recvmsg(call, rxrpc_recvmsg_requeue, 0); 568 } else { 569 rxrpc_put_call(call, rxrpc_call_put); 570 } 571 error_no_call: 572 release_sock(&rx->sk); 573 error_trace: 574 trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, ret); 575 return ret; 576 577 wait_interrupted: 578 ret = sock_intr_errno(timeo); 579 wait_error: 580 finish_wait(sk_sleep(&rx->sk), &wait); 581 call = NULL; 582 goto error_trace; 583 } 584 585 /** 586 * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info 587 * @sock: The socket that the call exists on 588 * @call: The call to send data through 589 * @iter: The buffer to receive into 590 * @_len: The amount of data we want to receive (decreased on return) 591 * @want_more: True if more data is expected to be read 592 * @_abort: Where the abort code is stored if -ECONNABORTED is returned 593 * @_service: Where to store the actual service ID (may be upgraded) 594 * 595 * Allow a kernel service to receive data and pick up information about the 596 * state of a call. Returns 0 if got what was asked for and there's more 597 * available, 1 if we got what was asked for and we're at the end of the data 598 * and -EAGAIN if we need more data. 599 * 600 * Note that we may return -EAGAIN to drain empty packets at the end of the 601 * data, even if we've already copied over the requested data. 602 * 603 * *_abort should also be initialised to 0. 604 */ 605 int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call, 606 struct iov_iter *iter, size_t *_len, 607 bool want_more, u32 *_abort, u16 *_service) 608 { 609 size_t offset = 0; 610 int ret; 611 612 _enter("{%d,%s},%zu,%d", 613 call->debug_id, rxrpc_call_states[call->state], 614 *_len, want_more); 615 616 ASSERTCMP(call->state, !=, RXRPC_CALL_SERVER_SECURING); 617 618 mutex_lock(&call->user_mutex); 619 620 switch (READ_ONCE(call->state)) { 621 case RXRPC_CALL_CLIENT_RECV_REPLY: 622 case RXRPC_CALL_SERVER_RECV_REQUEST: 623 case RXRPC_CALL_SERVER_ACK_REQUEST: 624 ret = rxrpc_recvmsg_data(sock, call, NULL, iter, 625 *_len, 0, &offset); 626 *_len -= offset; 627 if (ret < 0) 628 goto out; 629 630 /* We can only reach here with a partially full buffer if we 631 * have reached the end of the data. We must otherwise have a 632 * full buffer or have been given -EAGAIN. 633 */ 634 if (ret == 1) { 635 if (iov_iter_count(iter) > 0) 636 goto short_data; 637 if (!want_more) 638 goto read_phase_complete; 639 ret = 0; 640 goto out; 641 } 642 643 if (!want_more) 644 goto excess_data; 645 goto out; 646 647 case RXRPC_CALL_COMPLETE: 648 goto call_complete; 649 650 default: 651 ret = -EINPROGRESS; 652 goto out; 653 } 654 655 read_phase_complete: 656 ret = 1; 657 out: 658 rxrpc_transmit_ack_packets(call->peer->local); 659 if (_service) 660 *_service = call->service_id; 661 mutex_unlock(&call->user_mutex); 662 _leave(" = %d [%zu,%d]", ret, iov_iter_count(iter), *_abort); 663 return ret; 664 665 short_data: 666 trace_rxrpc_rx_eproto(call, 0, tracepoint_string("short_data")); 667 ret = -EBADMSG; 668 goto out; 669 excess_data: 670 trace_rxrpc_rx_eproto(call, 0, tracepoint_string("excess_data")); 671 ret = -EMSGSIZE; 672 goto out; 673 call_complete: 674 *_abort = call->abort_code; 675 ret = call->error; 676 if (call->completion == RXRPC_CALL_SUCCEEDED) { 677 ret = 1; 678 if (iov_iter_count(iter) > 0) 679 ret = -ECONNRESET; 680 } 681 goto out; 682 } 683 EXPORT_SYMBOL(rxrpc_kernel_recv_data); 684