xref: /openbmc/linux/net/rxrpc/recvmsg.c (revision faf92e8d)
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /* RxRPC recvmsg() implementation
3  *
4  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
5  * Written by David Howells (dhowells@redhat.com)
6  */
7 
8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
9 
10 #include <linux/net.h>
11 #include <linux/skbuff.h>
12 #include <linux/export.h>
13 #include <linux/sched/signal.h>
14 
15 #include <net/sock.h>
16 #include <net/af_rxrpc.h>
17 #include "ar-internal.h"
18 
19 /*
20  * Post a call for attention by the socket or kernel service.  Further
21  * notifications are suppressed by putting recvmsg_link on a dummy queue.
22  */
23 void rxrpc_notify_socket(struct rxrpc_call *call)
24 {
25 	struct rxrpc_sock *rx;
26 	struct sock *sk;
27 
28 	_enter("%d", call->debug_id);
29 
30 	if (!list_empty(&call->recvmsg_link))
31 		return;
32 
33 	rcu_read_lock();
34 
35 	rx = rcu_dereference(call->socket);
36 	sk = &rx->sk;
37 	if (rx && sk->sk_state < RXRPC_CLOSE) {
38 		if (call->notify_rx) {
39 			spin_lock_bh(&call->notify_lock);
40 			call->notify_rx(sk, call, call->user_call_ID);
41 			spin_unlock_bh(&call->notify_lock);
42 		} else {
43 			write_lock_bh(&rx->recvmsg_lock);
44 			if (list_empty(&call->recvmsg_link)) {
45 				rxrpc_get_call(call, rxrpc_call_got);
46 				list_add_tail(&call->recvmsg_link, &rx->recvmsg_q);
47 			}
48 			write_unlock_bh(&rx->recvmsg_lock);
49 
50 			if (!sock_flag(sk, SOCK_DEAD)) {
51 				_debug("call %ps", sk->sk_data_ready);
52 				sk->sk_data_ready(sk);
53 			}
54 		}
55 	}
56 
57 	rcu_read_unlock();
58 	_leave("");
59 }
60 
61 /*
62  * Transition a call to the complete state.
63  */
64 bool __rxrpc_set_call_completion(struct rxrpc_call *call,
65 				 enum rxrpc_call_completion compl,
66 				 u32 abort_code,
67 				 int error)
68 {
69 	if (call->state < RXRPC_CALL_COMPLETE) {
70 		call->abort_code = abort_code;
71 		call->error = error;
72 		call->completion = compl;
73 		call->state = RXRPC_CALL_COMPLETE;
74 		trace_rxrpc_call_complete(call);
75 		wake_up(&call->waitq);
76 		rxrpc_notify_socket(call);
77 		return true;
78 	}
79 	return false;
80 }
81 
82 bool rxrpc_set_call_completion(struct rxrpc_call *call,
83 			       enum rxrpc_call_completion compl,
84 			       u32 abort_code,
85 			       int error)
86 {
87 	bool ret = false;
88 
89 	if (call->state < RXRPC_CALL_COMPLETE) {
90 		write_lock_bh(&call->state_lock);
91 		ret = __rxrpc_set_call_completion(call, compl, abort_code, error);
92 		write_unlock_bh(&call->state_lock);
93 	}
94 	return ret;
95 }
96 
97 /*
98  * Record that a call successfully completed.
99  */
100 bool __rxrpc_call_completed(struct rxrpc_call *call)
101 {
102 	return __rxrpc_set_call_completion(call, RXRPC_CALL_SUCCEEDED, 0, 0);
103 }
104 
105 bool rxrpc_call_completed(struct rxrpc_call *call)
106 {
107 	bool ret = false;
108 
109 	if (call->state < RXRPC_CALL_COMPLETE) {
110 		write_lock_bh(&call->state_lock);
111 		ret = __rxrpc_call_completed(call);
112 		write_unlock_bh(&call->state_lock);
113 	}
114 	return ret;
115 }
116 
117 /*
118  * Record that a call is locally aborted.
119  */
120 bool __rxrpc_abort_call(const char *why, struct rxrpc_call *call,
121 			rxrpc_seq_t seq, u32 abort_code, int error)
122 {
123 	trace_rxrpc_abort(call->debug_id, why, call->cid, call->call_id, seq,
124 			  abort_code, error);
125 	return __rxrpc_set_call_completion(call, RXRPC_CALL_LOCALLY_ABORTED,
126 					   abort_code, error);
127 }
128 
129 bool rxrpc_abort_call(const char *why, struct rxrpc_call *call,
130 		      rxrpc_seq_t seq, u32 abort_code, int error)
131 {
132 	bool ret;
133 
134 	write_lock_bh(&call->state_lock);
135 	ret = __rxrpc_abort_call(why, call, seq, abort_code, error);
136 	write_unlock_bh(&call->state_lock);
137 	return ret;
138 }
139 
140 /*
141  * Pass a call terminating message to userspace.
142  */
143 static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
144 {
145 	u32 tmp = 0;
146 	int ret;
147 
148 	switch (call->completion) {
149 	case RXRPC_CALL_SUCCEEDED:
150 		ret = 0;
151 		if (rxrpc_is_service_call(call))
152 			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &tmp);
153 		break;
154 	case RXRPC_CALL_REMOTELY_ABORTED:
155 		tmp = call->abort_code;
156 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
157 		break;
158 	case RXRPC_CALL_LOCALLY_ABORTED:
159 		tmp = call->abort_code;
160 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
161 		break;
162 	case RXRPC_CALL_NETWORK_ERROR:
163 		tmp = -call->error;
164 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &tmp);
165 		break;
166 	case RXRPC_CALL_LOCAL_ERROR:
167 		tmp = -call->error;
168 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
169 		break;
170 	default:
171 		pr_err("Invalid terminal call state %u\n", call->state);
172 		BUG();
173 		break;
174 	}
175 
176 	trace_rxrpc_recvdata(call, rxrpc_recvmsg_terminal, call->rx_hard_ack,
177 			     call->rx_pkt_offset, call->rx_pkt_len, ret);
178 	return ret;
179 }
180 
181 /*
182  * End the packet reception phase.
183  */
184 static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
185 {
186 	_enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]);
187 
188 	trace_rxrpc_receive(call, rxrpc_receive_end, 0, call->rx_top);
189 	ASSERTCMP(call->rx_hard_ack, ==, call->rx_top);
190 
191 	if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY)
192 		rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_terminal_ack);
193 
194 	write_lock_bh(&call->state_lock);
195 
196 	switch (call->state) {
197 	case RXRPC_CALL_CLIENT_RECV_REPLY:
198 		__rxrpc_call_completed(call);
199 		write_unlock_bh(&call->state_lock);
200 		break;
201 
202 	case RXRPC_CALL_SERVER_RECV_REQUEST:
203 		call->state = RXRPC_CALL_SERVER_ACK_REQUEST;
204 		call->expect_req_by = jiffies + MAX_JIFFY_OFFSET;
205 		write_unlock_bh(&call->state_lock);
206 		rxrpc_propose_delay_ACK(call, serial,
207 					rxrpc_propose_ack_processing_op);
208 		break;
209 	default:
210 		write_unlock_bh(&call->state_lock);
211 		break;
212 	}
213 }
214 
215 /*
216  * Discard a packet we've used up and advance the Rx window by one.
217  */
218 static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
219 {
220 	struct rxrpc_skb_priv *sp;
221 	struct sk_buff *skb;
222 	rxrpc_serial_t serial;
223 	rxrpc_seq_t hard_ack, top;
224 	bool last = false;
225 	u8 subpacket;
226 	int ix;
227 
228 	_enter("%d", call->debug_id);
229 
230 	hard_ack = call->rx_hard_ack;
231 	top = smp_load_acquire(&call->rx_top);
232 	ASSERT(before(hard_ack, top));
233 
234 	hard_ack++;
235 	ix = hard_ack & RXRPC_RXTX_BUFF_MASK;
236 	skb = call->rxtx_buffer[ix];
237 	rxrpc_see_skb(skb, rxrpc_skb_rotated);
238 	sp = rxrpc_skb(skb);
239 
240 	subpacket = call->rxtx_annotations[ix] & RXRPC_RX_ANNO_SUBPACKET;
241 	serial = sp->hdr.serial + subpacket;
242 
243 	if (subpacket == sp->nr_subpackets - 1 &&
244 	    sp->rx_flags & RXRPC_SKB_INCL_LAST)
245 		last = true;
246 
247 	call->rxtx_buffer[ix] = NULL;
248 	call->rxtx_annotations[ix] = 0;
249 	/* Barrier against rxrpc_input_data(). */
250 	smp_store_release(&call->rx_hard_ack, hard_ack);
251 
252 	rxrpc_free_skb(skb, rxrpc_skb_freed);
253 
254 	trace_rxrpc_receive(call, rxrpc_receive_rotate, serial, hard_ack);
255 	if (last) {
256 		rxrpc_end_rx_phase(call, serial);
257 	} else {
258 		/* Check to see if there's an ACK that needs sending. */
259 		if (atomic_inc_return(&call->ackr_nr_consumed) > 2 &&
260 		    !test_and_set_bit(RXRPC_CALL_IDLE_ACK_PENDING, &call->flags)) {
261 			rxrpc_send_ACK(call, RXRPC_ACK_IDLE, serial,
262 				       rxrpc_propose_ack_rotate_rx);
263 			rxrpc_transmit_ack_packets(call->peer->local);
264 		}
265 	}
266 }
267 
268 /*
269  * Decrypt and verify a (sub)packet.  The packet's length may be changed due to
270  * padding, but if this is the case, the packet length will be resident in the
271  * socket buffer.  Note that we can't modify the master skb info as the skb may
272  * be the home to multiple subpackets.
273  */
274 static int rxrpc_verify_packet(struct rxrpc_call *call, struct sk_buff *skb,
275 			       u8 annotation,
276 			       unsigned int offset, unsigned int len)
277 {
278 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
279 	rxrpc_seq_t seq = sp->hdr.seq;
280 	u16 cksum = sp->hdr.cksum;
281 	u8 subpacket = annotation & RXRPC_RX_ANNO_SUBPACKET;
282 
283 	_enter("");
284 
285 	/* For all but the head jumbo subpacket, the security checksum is in a
286 	 * jumbo header immediately prior to the data.
287 	 */
288 	if (subpacket > 0) {
289 		__be16 tmp;
290 		if (skb_copy_bits(skb, offset - 2, &tmp, 2) < 0)
291 			BUG();
292 		cksum = ntohs(tmp);
293 		seq += subpacket;
294 	}
295 
296 	return call->security->verify_packet(call, skb, offset, len,
297 					     seq, cksum);
298 }
299 
300 /*
301  * Locate the data within a packet.  This is complicated by:
302  *
303  * (1) An skb may contain a jumbo packet - so we have to find the appropriate
304  *     subpacket.
305  *
306  * (2) The (sub)packets may be encrypted and, if so, the encrypted portion
307  *     contains an extra header which includes the true length of the data,
308  *     excluding any encrypted padding.
309  */
310 static int rxrpc_locate_data(struct rxrpc_call *call, struct sk_buff *skb,
311 			     u8 *_annotation,
312 			     unsigned int *_offset, unsigned int *_len,
313 			     bool *_last)
314 {
315 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
316 	unsigned int offset = sizeof(struct rxrpc_wire_header);
317 	unsigned int len;
318 	bool last = false;
319 	int ret;
320 	u8 annotation = *_annotation;
321 	u8 subpacket = annotation & RXRPC_RX_ANNO_SUBPACKET;
322 
323 	/* Locate the subpacket */
324 	offset += subpacket * RXRPC_JUMBO_SUBPKTLEN;
325 	len = skb->len - offset;
326 	if (subpacket < sp->nr_subpackets - 1)
327 		len = RXRPC_JUMBO_DATALEN;
328 	else if (sp->rx_flags & RXRPC_SKB_INCL_LAST)
329 		last = true;
330 
331 	if (!(annotation & RXRPC_RX_ANNO_VERIFIED)) {
332 		ret = rxrpc_verify_packet(call, skb, annotation, offset, len);
333 		if (ret < 0)
334 			return ret;
335 		*_annotation |= RXRPC_RX_ANNO_VERIFIED;
336 	}
337 
338 	*_offset = offset;
339 	*_len = len;
340 	*_last = last;
341 	call->security->locate_data(call, skb, _offset, _len);
342 	return 0;
343 }
344 
345 /*
346  * Deliver messages to a call.  This keeps processing packets until the buffer
347  * is filled and we find either more DATA (returns 0) or the end of the DATA
348  * (returns 1).  If more packets are required, it returns -EAGAIN.
349  */
350 static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
351 			      struct msghdr *msg, struct iov_iter *iter,
352 			      size_t len, int flags, size_t *_offset)
353 {
354 	struct rxrpc_skb_priv *sp;
355 	struct sk_buff *skb;
356 	rxrpc_serial_t serial;
357 	rxrpc_seq_t hard_ack, top, seq;
358 	size_t remain;
359 	bool rx_pkt_last;
360 	unsigned int rx_pkt_offset, rx_pkt_len;
361 	int ix, copy, ret = -EAGAIN, ret2;
362 
363 	rx_pkt_offset = call->rx_pkt_offset;
364 	rx_pkt_len = call->rx_pkt_len;
365 	rx_pkt_last = call->rx_pkt_last;
366 
367 	if (call->state >= RXRPC_CALL_SERVER_ACK_REQUEST) {
368 		seq = call->rx_hard_ack;
369 		ret = 1;
370 		goto done;
371 	}
372 
373 	/* Barriers against rxrpc_input_data(). */
374 	hard_ack = call->rx_hard_ack;
375 	seq = hard_ack + 1;
376 
377 	while (top = smp_load_acquire(&call->rx_top),
378 	       before_eq(seq, top)
379 	       ) {
380 		ix = seq & RXRPC_RXTX_BUFF_MASK;
381 		skb = call->rxtx_buffer[ix];
382 		if (!skb) {
383 			trace_rxrpc_recvdata(call, rxrpc_recvmsg_hole, seq,
384 					     rx_pkt_offset, rx_pkt_len, 0);
385 			rxrpc_transmit_ack_packets(call->peer->local);
386 			break;
387 		}
388 		smp_rmb();
389 		rxrpc_see_skb(skb, rxrpc_skb_seen);
390 		sp = rxrpc_skb(skb);
391 
392 		if (!(flags & MSG_PEEK)) {
393 			serial = sp->hdr.serial;
394 			serial += call->rxtx_annotations[ix] & RXRPC_RX_ANNO_SUBPACKET;
395 			trace_rxrpc_receive(call, rxrpc_receive_front,
396 					    serial, seq);
397 		}
398 
399 		if (msg)
400 			sock_recv_timestamp(msg, sock->sk, skb);
401 
402 		if (rx_pkt_offset == 0) {
403 			ret2 = rxrpc_locate_data(call, skb,
404 						 &call->rxtx_annotations[ix],
405 						 &rx_pkt_offset, &rx_pkt_len,
406 						 &rx_pkt_last);
407 			trace_rxrpc_recvdata(call, rxrpc_recvmsg_next, seq,
408 					     rx_pkt_offset, rx_pkt_len, ret2);
409 			if (ret2 < 0) {
410 				ret = ret2;
411 				goto out;
412 			}
413 		} else {
414 			trace_rxrpc_recvdata(call, rxrpc_recvmsg_cont, seq,
415 					     rx_pkt_offset, rx_pkt_len, 0);
416 		}
417 
418 		/* We have to handle short, empty and used-up DATA packets. */
419 		remain = len - *_offset;
420 		copy = rx_pkt_len;
421 		if (copy > remain)
422 			copy = remain;
423 		if (copy > 0) {
424 			ret2 = skb_copy_datagram_iter(skb, rx_pkt_offset, iter,
425 						      copy);
426 			if (ret2 < 0) {
427 				ret = ret2;
428 				goto out;
429 			}
430 
431 			/* handle piecemeal consumption of data packets */
432 			rx_pkt_offset += copy;
433 			rx_pkt_len -= copy;
434 			*_offset += copy;
435 		}
436 
437 		if (rx_pkt_len > 0) {
438 			trace_rxrpc_recvdata(call, rxrpc_recvmsg_full, seq,
439 					     rx_pkt_offset, rx_pkt_len, 0);
440 			ASSERTCMP(*_offset, ==, len);
441 			ret = 0;
442 			break;
443 		}
444 
445 		/* The whole packet has been transferred. */
446 		if (!(flags & MSG_PEEK))
447 			rxrpc_rotate_rx_window(call);
448 		rx_pkt_offset = 0;
449 		rx_pkt_len = 0;
450 
451 		if (rx_pkt_last) {
452 			ASSERTCMP(seq, ==, READ_ONCE(call->rx_top));
453 			ret = 1;
454 			goto out;
455 		}
456 
457 		seq++;
458 	}
459 
460 out:
461 	if (!(flags & MSG_PEEK)) {
462 		call->rx_pkt_offset = rx_pkt_offset;
463 		call->rx_pkt_len = rx_pkt_len;
464 		call->rx_pkt_last = rx_pkt_last;
465 	}
466 done:
467 	trace_rxrpc_recvdata(call, rxrpc_recvmsg_data_return, seq,
468 			     rx_pkt_offset, rx_pkt_len, ret);
469 	if (ret == -EAGAIN)
470 		set_bit(RXRPC_CALL_RX_UNDERRUN, &call->flags);
471 	return ret;
472 }
473 
474 /*
475  * Receive a message from an RxRPC socket
476  * - we need to be careful about two or more threads calling recvmsg
477  *   simultaneously
478  */
479 int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
480 		  int flags)
481 {
482 	struct rxrpc_call *call;
483 	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
484 	struct list_head *l;
485 	size_t copied = 0;
486 	long timeo;
487 	int ret;
488 
489 	DEFINE_WAIT(wait);
490 
491 	trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_enter, 0);
492 
493 	if (flags & (MSG_OOB | MSG_TRUNC))
494 		return -EOPNOTSUPP;
495 
496 	timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);
497 
498 try_again:
499 	lock_sock(&rx->sk);
500 
501 	/* Return immediately if a client socket has no outstanding calls */
502 	if (RB_EMPTY_ROOT(&rx->calls) &&
503 	    list_empty(&rx->recvmsg_q) &&
504 	    rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
505 		release_sock(&rx->sk);
506 		return -EAGAIN;
507 	}
508 
509 	if (list_empty(&rx->recvmsg_q)) {
510 		ret = -EWOULDBLOCK;
511 		if (timeo == 0) {
512 			call = NULL;
513 			goto error_no_call;
514 		}
515 
516 		release_sock(&rx->sk);
517 
518 		/* Wait for something to happen */
519 		prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
520 					  TASK_INTERRUPTIBLE);
521 		ret = sock_error(&rx->sk);
522 		if (ret)
523 			goto wait_error;
524 
525 		if (list_empty(&rx->recvmsg_q)) {
526 			if (signal_pending(current))
527 				goto wait_interrupted;
528 			trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_wait, 0);
529 			timeo = schedule_timeout(timeo);
530 		}
531 		finish_wait(sk_sleep(&rx->sk), &wait);
532 		goto try_again;
533 	}
534 
535 	/* Find the next call and dequeue it if we're not just peeking.  If we
536 	 * do dequeue it, that comes with a ref that we will need to release.
537 	 */
538 	write_lock_bh(&rx->recvmsg_lock);
539 	l = rx->recvmsg_q.next;
540 	call = list_entry(l, struct rxrpc_call, recvmsg_link);
541 	if (!(flags & MSG_PEEK))
542 		list_del_init(&call->recvmsg_link);
543 	else
544 		rxrpc_get_call(call, rxrpc_call_got);
545 	write_unlock_bh(&rx->recvmsg_lock);
546 
547 	trace_rxrpc_recvmsg(call, rxrpc_recvmsg_dequeue, 0);
548 
549 	/* We're going to drop the socket lock, so we need to lock the call
550 	 * against interference by sendmsg.
551 	 */
552 	if (!mutex_trylock(&call->user_mutex)) {
553 		ret = -EWOULDBLOCK;
554 		if (flags & MSG_DONTWAIT)
555 			goto error_requeue_call;
556 		ret = -ERESTARTSYS;
557 		if (mutex_lock_interruptible(&call->user_mutex) < 0)
558 			goto error_requeue_call;
559 	}
560 
561 	release_sock(&rx->sk);
562 
563 	if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
564 		BUG();
565 
566 	if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
567 		if (flags & MSG_CMSG_COMPAT) {
568 			unsigned int id32 = call->user_call_ID;
569 
570 			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
571 				       sizeof(unsigned int), &id32);
572 		} else {
573 			unsigned long idl = call->user_call_ID;
574 
575 			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
576 				       sizeof(unsigned long), &idl);
577 		}
578 		if (ret < 0)
579 			goto error_unlock_call;
580 	}
581 
582 	if (msg->msg_name && call->peer) {
583 		struct sockaddr_rxrpc *srx = msg->msg_name;
584 		size_t len = sizeof(call->peer->srx);
585 
586 		memcpy(msg->msg_name, &call->peer->srx, len);
587 		srx->srx_service = call->service_id;
588 		msg->msg_namelen = len;
589 	}
590 
591 	switch (READ_ONCE(call->state)) {
592 	case RXRPC_CALL_CLIENT_RECV_REPLY:
593 	case RXRPC_CALL_SERVER_RECV_REQUEST:
594 	case RXRPC_CALL_SERVER_ACK_REQUEST:
595 		ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
596 					 flags, &copied);
597 		if (ret == -EAGAIN)
598 			ret = 0;
599 
600 		rxrpc_transmit_ack_packets(call->peer->local);
601 		if (after(call->rx_top, call->rx_hard_ack) &&
602 		    call->rxtx_buffer[(call->rx_hard_ack + 1) & RXRPC_RXTX_BUFF_MASK])
603 			rxrpc_notify_socket(call);
604 		break;
605 	default:
606 		ret = 0;
607 		break;
608 	}
609 
610 	if (ret < 0)
611 		goto error_unlock_call;
612 
613 	if (call->state == RXRPC_CALL_COMPLETE) {
614 		ret = rxrpc_recvmsg_term(call, msg);
615 		if (ret < 0)
616 			goto error_unlock_call;
617 		if (!(flags & MSG_PEEK))
618 			rxrpc_release_call(rx, call);
619 		msg->msg_flags |= MSG_EOR;
620 		ret = 1;
621 	}
622 
623 	if (ret == 0)
624 		msg->msg_flags |= MSG_MORE;
625 	else
626 		msg->msg_flags &= ~MSG_MORE;
627 	ret = copied;
628 
629 error_unlock_call:
630 	mutex_unlock(&call->user_mutex);
631 	rxrpc_put_call(call, rxrpc_call_put);
632 	trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, ret);
633 	return ret;
634 
635 error_requeue_call:
636 	if (!(flags & MSG_PEEK)) {
637 		write_lock_bh(&rx->recvmsg_lock);
638 		list_add(&call->recvmsg_link, &rx->recvmsg_q);
639 		write_unlock_bh(&rx->recvmsg_lock);
640 		trace_rxrpc_recvmsg(call, rxrpc_recvmsg_requeue, 0);
641 	} else {
642 		rxrpc_put_call(call, rxrpc_call_put);
643 	}
644 error_no_call:
645 	release_sock(&rx->sk);
646 error_trace:
647 	trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, ret);
648 	return ret;
649 
650 wait_interrupted:
651 	ret = sock_intr_errno(timeo);
652 wait_error:
653 	finish_wait(sk_sleep(&rx->sk), &wait);
654 	call = NULL;
655 	goto error_trace;
656 }
657 
658 /**
659  * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
660  * @sock: The socket that the call exists on
661  * @call: The call to send data through
662  * @iter: The buffer to receive into
663  * @_len: The amount of data we want to receive (decreased on return)
664  * @want_more: True if more data is expected to be read
665  * @_abort: Where the abort code is stored if -ECONNABORTED is returned
666  * @_service: Where to store the actual service ID (may be upgraded)
667  *
668  * Allow a kernel service to receive data and pick up information about the
669  * state of a call.  Returns 0 if got what was asked for and there's more
670  * available, 1 if we got what was asked for and we're at the end of the data
671  * and -EAGAIN if we need more data.
672  *
673  * Note that we may return -EAGAIN to drain empty packets at the end of the
674  * data, even if we've already copied over the requested data.
675  *
676  * *_abort should also be initialised to 0.
677  */
678 int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
679 			   struct iov_iter *iter, size_t *_len,
680 			   bool want_more, u32 *_abort, u16 *_service)
681 {
682 	size_t offset = 0;
683 	int ret;
684 
685 	_enter("{%d,%s},%zu,%d",
686 	       call->debug_id, rxrpc_call_states[call->state],
687 	       *_len, want_more);
688 
689 	ASSERTCMP(call->state, !=, RXRPC_CALL_SERVER_SECURING);
690 
691 	mutex_lock(&call->user_mutex);
692 
693 	switch (READ_ONCE(call->state)) {
694 	case RXRPC_CALL_CLIENT_RECV_REPLY:
695 	case RXRPC_CALL_SERVER_RECV_REQUEST:
696 	case RXRPC_CALL_SERVER_ACK_REQUEST:
697 		ret = rxrpc_recvmsg_data(sock, call, NULL, iter,
698 					 *_len, 0, &offset);
699 		*_len -= offset;
700 		if (ret < 0)
701 			goto out;
702 
703 		/* We can only reach here with a partially full buffer if we
704 		 * have reached the end of the data.  We must otherwise have a
705 		 * full buffer or have been given -EAGAIN.
706 		 */
707 		if (ret == 1) {
708 			if (iov_iter_count(iter) > 0)
709 				goto short_data;
710 			if (!want_more)
711 				goto read_phase_complete;
712 			ret = 0;
713 			goto out;
714 		}
715 
716 		if (!want_more)
717 			goto excess_data;
718 		goto out;
719 
720 	case RXRPC_CALL_COMPLETE:
721 		goto call_complete;
722 
723 	default:
724 		ret = -EINPROGRESS;
725 		goto out;
726 	}
727 
728 read_phase_complete:
729 	ret = 1;
730 out:
731 	rxrpc_transmit_ack_packets(call->peer->local);
732 	if (_service)
733 		*_service = call->service_id;
734 	mutex_unlock(&call->user_mutex);
735 	_leave(" = %d [%zu,%d]", ret, iov_iter_count(iter), *_abort);
736 	return ret;
737 
738 short_data:
739 	trace_rxrpc_rx_eproto(call, 0, tracepoint_string("short_data"));
740 	ret = -EBADMSG;
741 	goto out;
742 excess_data:
743 	trace_rxrpc_rx_eproto(call, 0, tracepoint_string("excess_data"));
744 	ret = -EMSGSIZE;
745 	goto out;
746 call_complete:
747 	*_abort = call->abort_code;
748 	ret = call->error;
749 	if (call->completion == RXRPC_CALL_SUCCEEDED) {
750 		ret = 1;
751 		if (iov_iter_count(iter) > 0)
752 			ret = -ECONNRESET;
753 	}
754 	goto out;
755 }
756 EXPORT_SYMBOL(rxrpc_kernel_recv_data);
757