xref: /openbmc/linux/net/rxrpc/recvmsg.c (revision 57af281e)
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /* RxRPC recvmsg() implementation
3  *
4  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
5  * Written by David Howells (dhowells@redhat.com)
6  */
7 
8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
9 
10 #include <linux/net.h>
11 #include <linux/skbuff.h>
12 #include <linux/export.h>
13 #include <linux/sched/signal.h>
14 
15 #include <net/sock.h>
16 #include <net/af_rxrpc.h>
17 #include "ar-internal.h"
18 
19 /*
20  * Post a call for attention by the socket or kernel service.  Further
21  * notifications are suppressed by putting recvmsg_link on a dummy queue.
22  */
23 void rxrpc_notify_socket(struct rxrpc_call *call)
24 {
25 	struct rxrpc_sock *rx;
26 	struct sock *sk;
27 
28 	_enter("%d", call->debug_id);
29 
30 	if (!list_empty(&call->recvmsg_link))
31 		return;
32 
33 	rcu_read_lock();
34 
35 	rx = rcu_dereference(call->socket);
36 	sk = &rx->sk;
37 	if (rx && sk->sk_state < RXRPC_CLOSE) {
38 		if (call->notify_rx) {
39 			spin_lock(&call->notify_lock);
40 			call->notify_rx(sk, call, call->user_call_ID);
41 			spin_unlock(&call->notify_lock);
42 		} else {
43 			write_lock(&rx->recvmsg_lock);
44 			if (list_empty(&call->recvmsg_link)) {
45 				rxrpc_get_call(call, rxrpc_call_get_notify_socket);
46 				list_add_tail(&call->recvmsg_link, &rx->recvmsg_q);
47 			}
48 			write_unlock(&rx->recvmsg_lock);
49 
50 			if (!sock_flag(sk, SOCK_DEAD)) {
51 				_debug("call %ps", sk->sk_data_ready);
52 				sk->sk_data_ready(sk);
53 			}
54 		}
55 	}
56 
57 	rcu_read_unlock();
58 	_leave("");
59 }
60 
61 /*
62  * Transition a call to the complete state.
63  */
64 bool __rxrpc_set_call_completion(struct rxrpc_call *call,
65 				 enum rxrpc_call_completion compl,
66 				 u32 abort_code,
67 				 int error)
68 {
69 	if (call->state < RXRPC_CALL_COMPLETE) {
70 		call->abort_code = abort_code;
71 		call->error = error;
72 		call->completion = compl;
73 		call->state = RXRPC_CALL_COMPLETE;
74 		trace_rxrpc_call_complete(call);
75 		wake_up(&call->waitq);
76 		rxrpc_notify_socket(call);
77 		return true;
78 	}
79 	return false;
80 }
81 
82 bool rxrpc_set_call_completion(struct rxrpc_call *call,
83 			       enum rxrpc_call_completion compl,
84 			       u32 abort_code,
85 			       int error)
86 {
87 	bool ret = false;
88 
89 	if (call->state < RXRPC_CALL_COMPLETE) {
90 		write_lock(&call->state_lock);
91 		ret = __rxrpc_set_call_completion(call, compl, abort_code, error);
92 		write_unlock(&call->state_lock);
93 	}
94 	return ret;
95 }
96 
97 /*
98  * Record that a call successfully completed.
99  */
100 bool __rxrpc_call_completed(struct rxrpc_call *call)
101 {
102 	return __rxrpc_set_call_completion(call, RXRPC_CALL_SUCCEEDED, 0, 0);
103 }
104 
105 bool rxrpc_call_completed(struct rxrpc_call *call)
106 {
107 	bool ret = false;
108 
109 	if (call->state < RXRPC_CALL_COMPLETE) {
110 		write_lock(&call->state_lock);
111 		ret = __rxrpc_call_completed(call);
112 		write_unlock(&call->state_lock);
113 	}
114 	return ret;
115 }
116 
117 /*
118  * Record that a call is locally aborted.
119  */
120 bool __rxrpc_abort_call(struct rxrpc_call *call, rxrpc_seq_t seq,
121 			u32 abort_code, int error, enum rxrpc_abort_reason why)
122 {
123 	trace_rxrpc_abort(call->debug_id, why, call->cid, call->call_id, seq,
124 			  abort_code, error);
125 	return __rxrpc_set_call_completion(call, RXRPC_CALL_LOCALLY_ABORTED,
126 					   abort_code, error);
127 }
128 
129 bool rxrpc_abort_call(struct rxrpc_call *call, rxrpc_seq_t seq,
130 		      u32 abort_code, int error, enum rxrpc_abort_reason why)
131 {
132 	bool ret;
133 
134 	write_lock(&call->state_lock);
135 	ret = __rxrpc_abort_call(call, seq, abort_code, error, why);
136 	write_unlock(&call->state_lock);
137 	if (ret && test_bit(RXRPC_CALL_EXPOSED, &call->flags))
138 		rxrpc_send_abort_packet(call);
139 	return ret;
140 }
141 
142 /*
143  * Pass a call terminating message to userspace.
144  */
145 static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
146 {
147 	u32 tmp = 0;
148 	int ret;
149 
150 	switch (call->completion) {
151 	case RXRPC_CALL_SUCCEEDED:
152 		ret = 0;
153 		if (rxrpc_is_service_call(call))
154 			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &tmp);
155 		break;
156 	case RXRPC_CALL_REMOTELY_ABORTED:
157 		tmp = call->abort_code;
158 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
159 		break;
160 	case RXRPC_CALL_LOCALLY_ABORTED:
161 		tmp = call->abort_code;
162 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
163 		break;
164 	case RXRPC_CALL_NETWORK_ERROR:
165 		tmp = -call->error;
166 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &tmp);
167 		break;
168 	case RXRPC_CALL_LOCAL_ERROR:
169 		tmp = -call->error;
170 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
171 		break;
172 	default:
173 		pr_err("Invalid terminal call state %u\n", call->state);
174 		BUG();
175 		break;
176 	}
177 
178 	trace_rxrpc_recvdata(call, rxrpc_recvmsg_terminal,
179 			     lower_32_bits(atomic64_read(&call->ackr_window)) - 1,
180 			     call->rx_pkt_offset, call->rx_pkt_len, ret);
181 	return ret;
182 }
183 
184 /*
185  * End the packet reception phase.
186  */
187 static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
188 {
189 	rxrpc_seq_t whigh = READ_ONCE(call->rx_highest_seq);
190 
191 	_enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]);
192 
193 	trace_rxrpc_receive(call, rxrpc_receive_end, 0, whigh);
194 
195 	if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY)
196 		rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_terminal_ack);
197 
198 	write_lock(&call->state_lock);
199 
200 	switch (call->state) {
201 	case RXRPC_CALL_CLIENT_RECV_REPLY:
202 		__rxrpc_call_completed(call);
203 		write_unlock(&call->state_lock);
204 		rxrpc_poke_call(call, rxrpc_call_poke_complete);
205 		break;
206 
207 	case RXRPC_CALL_SERVER_RECV_REQUEST:
208 		call->state = RXRPC_CALL_SERVER_ACK_REQUEST;
209 		call->expect_req_by = jiffies + MAX_JIFFY_OFFSET;
210 		write_unlock(&call->state_lock);
211 		rxrpc_propose_delay_ACK(call, serial,
212 					rxrpc_propose_ack_processing_op);
213 		break;
214 	default:
215 		write_unlock(&call->state_lock);
216 		break;
217 	}
218 }
219 
220 /*
221  * Discard a packet we've used up and advance the Rx window by one.
222  */
223 static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
224 {
225 	struct rxrpc_skb_priv *sp;
226 	struct sk_buff *skb;
227 	rxrpc_serial_t serial;
228 	rxrpc_seq_t old_consumed = call->rx_consumed, tseq;
229 	bool last;
230 	int acked;
231 
232 	_enter("%d", call->debug_id);
233 
234 	skb = skb_dequeue(&call->recvmsg_queue);
235 	rxrpc_see_skb(skb, rxrpc_skb_see_rotate);
236 
237 	sp = rxrpc_skb(skb);
238 	tseq   = sp->hdr.seq;
239 	serial = sp->hdr.serial;
240 	last   = sp->hdr.flags & RXRPC_LAST_PACKET;
241 
242 	/* Barrier against rxrpc_input_data(). */
243 	if (after(tseq, call->rx_consumed))
244 		smp_store_release(&call->rx_consumed, tseq);
245 
246 	rxrpc_free_skb(skb, rxrpc_skb_put_rotate);
247 
248 	trace_rxrpc_receive(call, last ? rxrpc_receive_rotate_last : rxrpc_receive_rotate,
249 			    serial, call->rx_consumed);
250 	if (last) {
251 		rxrpc_end_rx_phase(call, serial);
252 		return;
253 	}
254 
255 	/* Check to see if there's an ACK that needs sending. */
256 	acked = atomic_add_return(call->rx_consumed - old_consumed,
257 				  &call->ackr_nr_consumed);
258 	if (acked > 2 &&
259 	    !test_and_set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags))
260 		rxrpc_poke_call(call, rxrpc_call_poke_idle);
261 }
262 
263 /*
264  * Decrypt and verify a DATA packet.
265  */
266 static int rxrpc_verify_data(struct rxrpc_call *call, struct sk_buff *skb)
267 {
268 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
269 
270 	if (sp->flags & RXRPC_RX_VERIFIED)
271 		return 0;
272 	return call->security->verify_packet(call, skb);
273 }
274 
275 /*
276  * Deliver messages to a call.  This keeps processing packets until the buffer
277  * is filled and we find either more DATA (returns 0) or the end of the DATA
278  * (returns 1).  If more packets are required, it returns -EAGAIN.
279  */
280 static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
281 			      struct msghdr *msg, struct iov_iter *iter,
282 			      size_t len, int flags, size_t *_offset)
283 {
284 	struct rxrpc_skb_priv *sp;
285 	struct sk_buff *skb;
286 	rxrpc_seq_t seq = 0;
287 	size_t remain;
288 	unsigned int rx_pkt_offset, rx_pkt_len;
289 	int copy, ret = -EAGAIN, ret2;
290 
291 	rx_pkt_offset = call->rx_pkt_offset;
292 	rx_pkt_len = call->rx_pkt_len;
293 
294 	if (call->state >= RXRPC_CALL_SERVER_ACK_REQUEST) {
295 		seq = lower_32_bits(atomic64_read(&call->ackr_window)) - 1;
296 		ret = 1;
297 		goto done;
298 	}
299 
300 	/* No one else can be removing stuff from the queue, so we shouldn't
301 	 * need the Rx lock to walk it.
302 	 */
303 	skb = skb_peek(&call->recvmsg_queue);
304 	while (skb) {
305 		rxrpc_see_skb(skb, rxrpc_skb_see_recvmsg);
306 		sp = rxrpc_skb(skb);
307 		seq = sp->hdr.seq;
308 
309 		if (!(flags & MSG_PEEK))
310 			trace_rxrpc_receive(call, rxrpc_receive_front,
311 					    sp->hdr.serial, seq);
312 
313 		if (msg)
314 			sock_recv_timestamp(msg, sock->sk, skb);
315 
316 		if (rx_pkt_offset == 0) {
317 			ret2 = rxrpc_verify_data(call, skb);
318 			rx_pkt_offset = sp->offset;
319 			rx_pkt_len = sp->len;
320 			trace_rxrpc_recvdata(call, rxrpc_recvmsg_next, seq,
321 					     rx_pkt_offset, rx_pkt_len, ret2);
322 			if (ret2 < 0) {
323 				ret = ret2;
324 				goto out;
325 			}
326 		} else {
327 			trace_rxrpc_recvdata(call, rxrpc_recvmsg_cont, seq,
328 					     rx_pkt_offset, rx_pkt_len, 0);
329 		}
330 
331 		/* We have to handle short, empty and used-up DATA packets. */
332 		remain = len - *_offset;
333 		copy = rx_pkt_len;
334 		if (copy > remain)
335 			copy = remain;
336 		if (copy > 0) {
337 			ret2 = skb_copy_datagram_iter(skb, rx_pkt_offset, iter,
338 						      copy);
339 			if (ret2 < 0) {
340 				ret = ret2;
341 				goto out;
342 			}
343 
344 			/* handle piecemeal consumption of data packets */
345 			rx_pkt_offset += copy;
346 			rx_pkt_len -= copy;
347 			*_offset += copy;
348 		}
349 
350 		if (rx_pkt_len > 0) {
351 			trace_rxrpc_recvdata(call, rxrpc_recvmsg_full, seq,
352 					     rx_pkt_offset, rx_pkt_len, 0);
353 			ASSERTCMP(*_offset, ==, len);
354 			ret = 0;
355 			break;
356 		}
357 
358 		/* The whole packet has been transferred. */
359 		if (sp->hdr.flags & RXRPC_LAST_PACKET)
360 			ret = 1;
361 		rx_pkt_offset = 0;
362 		rx_pkt_len = 0;
363 
364 		skb = skb_peek_next(skb, &call->recvmsg_queue);
365 
366 		if (!(flags & MSG_PEEK))
367 			rxrpc_rotate_rx_window(call);
368 	}
369 
370 out:
371 	if (!(flags & MSG_PEEK)) {
372 		call->rx_pkt_offset = rx_pkt_offset;
373 		call->rx_pkt_len = rx_pkt_len;
374 	}
375 done:
376 	trace_rxrpc_recvdata(call, rxrpc_recvmsg_data_return, seq,
377 			     rx_pkt_offset, rx_pkt_len, ret);
378 	if (ret == -EAGAIN)
379 		set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags);
380 	return ret;
381 }
382 
383 /*
384  * Receive a message from an RxRPC socket
385  * - we need to be careful about two or more threads calling recvmsg
386  *   simultaneously
387  */
388 int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
389 		  int flags)
390 {
391 	struct rxrpc_call *call;
392 	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
393 	struct list_head *l;
394 	unsigned int call_debug_id = 0;
395 	size_t copied = 0;
396 	long timeo;
397 	int ret;
398 
399 	DEFINE_WAIT(wait);
400 
401 	trace_rxrpc_recvmsg(0, rxrpc_recvmsg_enter, 0);
402 
403 	if (flags & (MSG_OOB | MSG_TRUNC))
404 		return -EOPNOTSUPP;
405 
406 	timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);
407 
408 try_again:
409 	lock_sock(&rx->sk);
410 
411 	/* Return immediately if a client socket has no outstanding calls */
412 	if (RB_EMPTY_ROOT(&rx->calls) &&
413 	    list_empty(&rx->recvmsg_q) &&
414 	    rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
415 		release_sock(&rx->sk);
416 		return -EAGAIN;
417 	}
418 
419 	if (list_empty(&rx->recvmsg_q)) {
420 		ret = -EWOULDBLOCK;
421 		if (timeo == 0) {
422 			call = NULL;
423 			goto error_no_call;
424 		}
425 
426 		release_sock(&rx->sk);
427 
428 		/* Wait for something to happen */
429 		prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
430 					  TASK_INTERRUPTIBLE);
431 		ret = sock_error(&rx->sk);
432 		if (ret)
433 			goto wait_error;
434 
435 		if (list_empty(&rx->recvmsg_q)) {
436 			if (signal_pending(current))
437 				goto wait_interrupted;
438 			trace_rxrpc_recvmsg(0, rxrpc_recvmsg_wait, 0);
439 			timeo = schedule_timeout(timeo);
440 		}
441 		finish_wait(sk_sleep(&rx->sk), &wait);
442 		goto try_again;
443 	}
444 
445 	/* Find the next call and dequeue it if we're not just peeking.  If we
446 	 * do dequeue it, that comes with a ref that we will need to release.
447 	 */
448 	write_lock(&rx->recvmsg_lock);
449 	l = rx->recvmsg_q.next;
450 	call = list_entry(l, struct rxrpc_call, recvmsg_link);
451 	if (!(flags & MSG_PEEK))
452 		list_del_init(&call->recvmsg_link);
453 	else
454 		rxrpc_get_call(call, rxrpc_call_get_recvmsg);
455 	write_unlock(&rx->recvmsg_lock);
456 
457 	call_debug_id = call->debug_id;
458 	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_dequeue, 0);
459 
460 	/* We're going to drop the socket lock, so we need to lock the call
461 	 * against interference by sendmsg.
462 	 */
463 	if (!mutex_trylock(&call->user_mutex)) {
464 		ret = -EWOULDBLOCK;
465 		if (flags & MSG_DONTWAIT)
466 			goto error_requeue_call;
467 		ret = -ERESTARTSYS;
468 		if (mutex_lock_interruptible(&call->user_mutex) < 0)
469 			goto error_requeue_call;
470 	}
471 
472 	release_sock(&rx->sk);
473 
474 	if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
475 		BUG();
476 
477 	if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
478 		if (flags & MSG_CMSG_COMPAT) {
479 			unsigned int id32 = call->user_call_ID;
480 
481 			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
482 				       sizeof(unsigned int), &id32);
483 		} else {
484 			unsigned long idl = call->user_call_ID;
485 
486 			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
487 				       sizeof(unsigned long), &idl);
488 		}
489 		if (ret < 0)
490 			goto error_unlock_call;
491 	}
492 
493 	if (msg->msg_name && call->peer) {
494 		size_t len = sizeof(call->dest_srx);
495 
496 		memcpy(msg->msg_name, &call->dest_srx, len);
497 		msg->msg_namelen = len;
498 	}
499 
500 	switch (READ_ONCE(call->state)) {
501 	case RXRPC_CALL_CLIENT_RECV_REPLY:
502 	case RXRPC_CALL_SERVER_RECV_REQUEST:
503 	case RXRPC_CALL_SERVER_ACK_REQUEST:
504 		ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
505 					 flags, &copied);
506 		if (ret == -EAGAIN)
507 			ret = 0;
508 
509 		if (!skb_queue_empty(&call->recvmsg_queue))
510 			rxrpc_notify_socket(call);
511 		break;
512 	default:
513 		ret = 0;
514 		break;
515 	}
516 
517 	if (ret < 0)
518 		goto error_unlock_call;
519 
520 	if (call->state == RXRPC_CALL_COMPLETE) {
521 		ret = rxrpc_recvmsg_term(call, msg);
522 		if (ret < 0)
523 			goto error_unlock_call;
524 		if (!(flags & MSG_PEEK))
525 			rxrpc_release_call(rx, call);
526 		msg->msg_flags |= MSG_EOR;
527 		ret = 1;
528 	}
529 
530 	if (ret == 0)
531 		msg->msg_flags |= MSG_MORE;
532 	else
533 		msg->msg_flags &= ~MSG_MORE;
534 	ret = copied;
535 
536 error_unlock_call:
537 	mutex_unlock(&call->user_mutex);
538 	rxrpc_put_call(call, rxrpc_call_put_recvmsg);
539 	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_return, ret);
540 	return ret;
541 
542 error_requeue_call:
543 	if (!(flags & MSG_PEEK)) {
544 		write_lock(&rx->recvmsg_lock);
545 		list_add(&call->recvmsg_link, &rx->recvmsg_q);
546 		write_unlock(&rx->recvmsg_lock);
547 		trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_requeue, 0);
548 	} else {
549 		rxrpc_put_call(call, rxrpc_call_put_recvmsg);
550 	}
551 error_no_call:
552 	release_sock(&rx->sk);
553 error_trace:
554 	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_return, ret);
555 	return ret;
556 
557 wait_interrupted:
558 	ret = sock_intr_errno(timeo);
559 wait_error:
560 	finish_wait(sk_sleep(&rx->sk), &wait);
561 	call = NULL;
562 	goto error_trace;
563 }
564 
565 /**
566  * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
567  * @sock: The socket that the call exists on
568  * @call: The call to send data through
569  * @iter: The buffer to receive into
570  * @_len: The amount of data we want to receive (decreased on return)
571  * @want_more: True if more data is expected to be read
572  * @_abort: Where the abort code is stored if -ECONNABORTED is returned
573  * @_service: Where to store the actual service ID (may be upgraded)
574  *
575  * Allow a kernel service to receive data and pick up information about the
576  * state of a call.  Returns 0 if got what was asked for and there's more
577  * available, 1 if we got what was asked for and we're at the end of the data
578  * and -EAGAIN if we need more data.
579  *
580  * Note that we may return -EAGAIN to drain empty packets at the end of the
581  * data, even if we've already copied over the requested data.
582  *
583  * *_abort should also be initialised to 0.
584  */
585 int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
586 			   struct iov_iter *iter, size_t *_len,
587 			   bool want_more, u32 *_abort, u16 *_service)
588 {
589 	size_t offset = 0;
590 	int ret;
591 
592 	_enter("{%d,%s},%zu,%d",
593 	       call->debug_id, rxrpc_call_states[call->state],
594 	       *_len, want_more);
595 
596 	ASSERTCMP(call->state, !=, RXRPC_CALL_SERVER_SECURING);
597 
598 	mutex_lock(&call->user_mutex);
599 
600 	switch (READ_ONCE(call->state)) {
601 	case RXRPC_CALL_CLIENT_RECV_REPLY:
602 	case RXRPC_CALL_SERVER_RECV_REQUEST:
603 	case RXRPC_CALL_SERVER_ACK_REQUEST:
604 		ret = rxrpc_recvmsg_data(sock, call, NULL, iter,
605 					 *_len, 0, &offset);
606 		*_len -= offset;
607 		if (ret < 0)
608 			goto out;
609 
610 		/* We can only reach here with a partially full buffer if we
611 		 * have reached the end of the data.  We must otherwise have a
612 		 * full buffer or have been given -EAGAIN.
613 		 */
614 		if (ret == 1) {
615 			if (iov_iter_count(iter) > 0)
616 				goto short_data;
617 			if (!want_more)
618 				goto read_phase_complete;
619 			ret = 0;
620 			goto out;
621 		}
622 
623 		if (!want_more)
624 			goto excess_data;
625 		goto out;
626 
627 	case RXRPC_CALL_COMPLETE:
628 		goto call_complete;
629 
630 	default:
631 		ret = -EINPROGRESS;
632 		goto out;
633 	}
634 
635 read_phase_complete:
636 	ret = 1;
637 out:
638 	if (_service)
639 		*_service = call->dest_srx.srx_service;
640 	mutex_unlock(&call->user_mutex);
641 	_leave(" = %d [%zu,%d]", ret, iov_iter_count(iter), *_abort);
642 	return ret;
643 
644 short_data:
645 	trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_short_data,
646 			  call->cid, call->call_id, call->rx_consumed,
647 			  0, -EBADMSG);
648 	ret = -EBADMSG;
649 	goto out;
650 excess_data:
651 	trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_excess_data,
652 			  call->cid, call->call_id, call->rx_consumed,
653 			  0, -EMSGSIZE);
654 	ret = -EMSGSIZE;
655 	goto out;
656 call_complete:
657 	*_abort = call->abort_code;
658 	ret = call->error;
659 	if (call->completion == RXRPC_CALL_SUCCEEDED) {
660 		ret = 1;
661 		if (iov_iter_count(iter) > 0)
662 			ret = -ECONNRESET;
663 	}
664 	goto out;
665 }
666 EXPORT_SYMBOL(rxrpc_kernel_recv_data);
667