// SPDX-License-Identifier: GPL-2.0-or-later
/* RxRPC recvmsg() implementation
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 */

#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include <linux/net.h>
#include <linux/skbuff.h>
#include <linux/export.h>
#include <linux/sched/signal.h>

#include <net/sock.h>
#include <net/af_rxrpc.h>
#include "ar-internal.h"

/*
 * Post a call for attention by the socket or kernel service.  Further
 * notifications are suppressed by putting recvmsg_link on a dummy queue.
 */
void rxrpc_notify_socket(struct rxrpc_call *call)
{
	struct rxrpc_sock *rx;
	struct sock *sk;

	_enter("%d", call->debug_id);

	if (!list_empty(&call->recvmsg_link))
		return;

	rcu_read_lock();

	rx = rcu_dereference(call->socket);
	sk = &rx->sk;
	if (rx && sk->sk_state < RXRPC_CLOSE) {
		if (call->notify_rx) {
			spin_lock(&call->notify_lock);
			call->notify_rx(sk, call, call->user_call_ID);
			spin_unlock(&call->notify_lock);
		} else {
			write_lock(&rx->recvmsg_lock);
			if (list_empty(&call->recvmsg_link)) {
				rxrpc_get_call(call, rxrpc_call_get_notify_socket);
				list_add_tail(&call->recvmsg_link, &rx->recvmsg_q);
			}
			write_unlock(&rx->recvmsg_lock);

			if (!sock_flag(sk, SOCK_DEAD)) {
				_debug("call %ps", sk->sk_data_ready);
				sk->sk_data_ready(sk);
			}
		}
	}

	rcu_read_unlock();
	_leave("");
}
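
/*
 * Illustrative sketch (hypothetical names): a kernel service that wants the
 * ->notify_rx path above, rather than queuing on recvmsg_q, supplies a
 * notification function when it begins the call, e.g. via
 * rxrpc_kernel_begin_call().  The callback runs under notify_lock, so it must
 * not sleep and should just kick the service's own worker:
 *
 *	static void my_notify_rx(struct sock *sk, struct rxrpc_call *rxcall,
 *				 unsigned long user_call_ID)
 *	{
 *		struct my_call *c = (struct my_call *)user_call_ID;
 *
 *		queue_work(my_wq, &c->work);
 *	}
 *
 * my_notify_rx, my_call and my_wq are placeholders for illustration.
 */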

/*
 * Pass a call terminating message to userspace.
 */
static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
{
	u32 tmp = 0;
	int ret;

	switch (call->completion) {
	case RXRPC_CALL_SUCCEEDED:
		ret = 0;
		if (rxrpc_is_service_call(call))
			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &tmp);
		break;
	case RXRPC_CALL_REMOTELY_ABORTED:
		tmp = call->abort_code;
		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
		break;
	case RXRPC_CALL_LOCALLY_ABORTED:
		tmp = call->abort_code;
		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
		break;
	case RXRPC_CALL_NETWORK_ERROR:
		tmp = -call->error;
		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &tmp);
		break;
	case RXRPC_CALL_LOCAL_ERROR:
		tmp = -call->error;
		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
		break;
	default:
		pr_err("Invalid terminal call state %u\n", call->state);
		BUG();
		break;
	}

	trace_rxrpc_recvdata(call, rxrpc_recvmsg_terminal,
			     lower_32_bits(atomic64_read(&call->ackr_window)) - 1,
			     call->rx_pkt_offset, call->rx_pkt_len, ret);
	return ret;
}
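
/*
 * Illustrative sketch of how userspace might consume the terminal indication
 * posted above (fd, buf and abort_code are placeholders):
 *
 *	struct msghdr msg = { .msg_control = buf, .msg_controllen = sizeof(buf) };
 *	struct cmsghdr *cmsg;
 *
 *	recvmsg(fd, &msg, 0);
 *	for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg))
 *		if (cmsg->cmsg_level == SOL_RXRPC &&
 *		    cmsg->cmsg_type == RXRPC_ABORT)
 *			memcpy(&abort_code, CMSG_DATA(cmsg), sizeof(abort_code));
 *
 * RXRPC_NET_ERROR and RXRPC_LOCAL_ERROR carry an errno value the same way.
 */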

/*
 * End the packet reception phase.
 */
static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
{
	rxrpc_seq_t whigh = READ_ONCE(call->rx_highest_seq);

	_enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]);

	trace_rxrpc_receive(call, rxrpc_receive_end, 0, whigh);

	if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY)
		rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_terminal_ack);

	write_lock(&call->state_lock);

	switch (call->state) {
	case RXRPC_CALL_CLIENT_RECV_REPLY:
		__rxrpc_call_completed(call);
		write_unlock(&call->state_lock);
		rxrpc_poke_call(call, rxrpc_call_poke_complete);
		break;

	case RXRPC_CALL_SERVER_RECV_REQUEST:
		call->state = RXRPC_CALL_SERVER_ACK_REQUEST;
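		/* The whole request has now arrived, so in effect disable the
		 * expect-request timeout by pushing it out as far as the
		 * jiffies arithmetic allows.
		 */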
		call->expect_req_by = jiffies + MAX_JIFFY_OFFSET;
		write_unlock(&call->state_lock);
		rxrpc_propose_delay_ACK(call, serial,
					rxrpc_propose_ack_processing_op);
		break;
	default:
		write_unlock(&call->state_lock);
		break;
	}
}

/*
 * Discard a packet we've used up and advance the Rx window by one.
 */
static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
{
	struct rxrpc_skb_priv *sp;
	struct sk_buff *skb;
	rxrpc_serial_t serial;
	rxrpc_seq_t old_consumed = call->rx_consumed, tseq;
	bool last;
	int acked;

	_enter("%d", call->debug_id);

	skb = skb_dequeue(&call->recvmsg_queue);
	rxrpc_see_skb(skb, rxrpc_skb_see_rotate);

	sp = rxrpc_skb(skb);
	tseq   = sp->hdr.seq;
	serial = sp->hdr.serial;
	last   = sp->hdr.flags & RXRPC_LAST_PACKET;

	/* Barrier against rxrpc_input_data(). */
	if (after(tseq, call->rx_consumed))
		smp_store_release(&call->rx_consumed, tseq);

	rxrpc_free_skb(skb, rxrpc_skb_put_rotate);

	trace_rxrpc_receive(call, last ? rxrpc_receive_rotate_last : rxrpc_receive_rotate,
			    serial, call->rx_consumed);
	if (last) {
		rxrpc_end_rx_phase(call, serial);
		return;
	}

	/* Check to see if there's an ACK that needs sending. */
	acked = atomic_add_return(call->rx_consumed - old_consumed,
				  &call->ackr_nr_consumed);
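	/* Batch window advertisements: only poke the I/O side for an IDLE ACK
	 * once a few packets have been consumed and it hasn't already been
	 * primed to send one.
	 */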
	if (acked > 2 &&
	    !test_and_set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags))
		rxrpc_poke_call(call, rxrpc_call_poke_idle);
}

/*
 * Decrypt and verify a DATA packet.
 */
static int rxrpc_verify_data(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);

	if (sp->flags & RXRPC_RX_VERIFIED)
		return 0;
	return call->security->verify_packet(call, skb);
}

/*
 * Deliver messages to a call.  This keeps processing packets until the buffer
 * is filled and we find either more DATA (returns 0) or the end of the DATA
 * (returns 1).  If more packets are required, it returns -EAGAIN.
 */
static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
			      struct msghdr *msg, struct iov_iter *iter,
			      size_t len, int flags, size_t *_offset)
{
	struct rxrpc_skb_priv *sp;
	struct sk_buff *skb;
	rxrpc_seq_t seq = 0;
	size_t remain;
	unsigned int rx_pkt_offset, rx_pkt_len;
	int copy, ret = -EAGAIN, ret2;

	rx_pkt_offset = call->rx_pkt_offset;
	rx_pkt_len = call->rx_pkt_len;

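	/* The call has already moved past its receiving phase, so just report
	 * the end of the data.
	 */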
	if (call->state >= RXRPC_CALL_SERVER_ACK_REQUEST) {
		seq = lower_32_bits(atomic64_read(&call->ackr_window)) - 1;
		ret = 1;
		goto done;
	}

	/* No one else can be removing stuff from the queue, so we shouldn't
	 * need the Rx lock to walk it.
	 */
	skb = skb_peek(&call->recvmsg_queue);
	while (skb) {
		rxrpc_see_skb(skb, rxrpc_skb_see_recvmsg);
		sp = rxrpc_skb(skb);
		seq = sp->hdr.seq;

		if (!(flags & MSG_PEEK))
			trace_rxrpc_receive(call, rxrpc_receive_front,
					    sp->hdr.serial, seq);

		if (msg)
			sock_recv_timestamp(msg, sock->sk, skb);

		if (rx_pkt_offset == 0) {
			ret2 = rxrpc_verify_data(call, skb);
			rx_pkt_offset = sp->offset;
			rx_pkt_len = sp->len;
			trace_rxrpc_recvdata(call, rxrpc_recvmsg_next, seq,
					     rx_pkt_offset, rx_pkt_len, ret2);
			if (ret2 < 0) {
				ret = ret2;
				goto out;
			}
		} else {
			trace_rxrpc_recvdata(call, rxrpc_recvmsg_cont, seq,
					     rx_pkt_offset, rx_pkt_len, 0);
		}

		/* We have to handle short, empty and used-up DATA packets. */
		remain = len - *_offset;
		copy = rx_pkt_len;
		if (copy > remain)
			copy = remain;
		if (copy > 0) {
			ret2 = skb_copy_datagram_iter(skb, rx_pkt_offset, iter,
						      copy);
			if (ret2 < 0) {
				ret = ret2;
				goto out;
			}

			/* handle piecemeal consumption of data packets */
			rx_pkt_offset += copy;
			rx_pkt_len -= copy;
			*_offset += copy;
		}

		if (rx_pkt_len > 0) {
			trace_rxrpc_recvdata(call, rxrpc_recvmsg_full, seq,
					     rx_pkt_offset, rx_pkt_len, 0);
			ASSERTCMP(*_offset, ==, len);
			ret = 0;
			break;
		}

		/* The whole packet has been transferred. */
		if (sp->hdr.flags & RXRPC_LAST_PACKET)
			ret = 1;
		rx_pkt_offset = 0;
		rx_pkt_len = 0;

		skb = skb_peek_next(skb, &call->recvmsg_queue);

		if (!(flags & MSG_PEEK))
			rxrpc_rotate_rx_window(call);
	}

out:
	if (!(flags & MSG_PEEK)) {
		call->rx_pkt_offset = rx_pkt_offset;
		call->rx_pkt_len = rx_pkt_len;
	}
done:
	trace_rxrpc_recvdata(call, rxrpc_recvmsg_data_return, seq,
			     rx_pkt_offset, rx_pkt_len, ret);
	if (ret == -EAGAIN)
		set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags);
	return ret;
}

/*
 * Receive a message from an RxRPC socket
 * - we need to be careful about two or more threads calling recvmsg
 *   simultaneously
 */
int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
		  int flags)
{
	struct rxrpc_call *call;
	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
	struct list_head *l;
	unsigned int call_debug_id = 0;
	size_t copied = 0;
	long timeo;
	int ret;

	DEFINE_WAIT(wait);

	trace_rxrpc_recvmsg(0, rxrpc_recvmsg_enter, 0);

	if (flags & (MSG_OOB | MSG_TRUNC))
		return -EOPNOTSUPP;

	timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);

try_again:
	lock_sock(&rx->sk);

	/* Return immediately if a client socket has no outstanding calls */
	if (RB_EMPTY_ROOT(&rx->calls) &&
	    list_empty(&rx->recvmsg_q) &&
	    rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
		release_sock(&rx->sk);
		return -EAGAIN;
	}

	if (list_empty(&rx->recvmsg_q)) {
		ret = -EWOULDBLOCK;
		if (timeo == 0) {
			call = NULL;
			goto error_no_call;
		}

		release_sock(&rx->sk);

		/* Wait for something to happen */
		prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
					  TASK_INTERRUPTIBLE);
		ret = sock_error(&rx->sk);
		if (ret)
			goto wait_error;

		if (list_empty(&rx->recvmsg_q)) {
			if (signal_pending(current))
				goto wait_interrupted;
			trace_rxrpc_recvmsg(0, rxrpc_recvmsg_wait, 0);
			timeo = schedule_timeout(timeo);
		}
		finish_wait(sk_sleep(&rx->sk), &wait);
		goto try_again;
	}

	/* Find the next call and dequeue it if we're not just peeking.  If we
	 * do dequeue it, that comes with a ref that we will need to release.
	 */
	write_lock(&rx->recvmsg_lock);
	l = rx->recvmsg_q.next;
	call = list_entry(l, struct rxrpc_call, recvmsg_link);
	if (!(flags & MSG_PEEK))
		list_del_init(&call->recvmsg_link);
	else
		rxrpc_get_call(call, rxrpc_call_get_recvmsg);
	write_unlock(&rx->recvmsg_lock);

	call_debug_id = call->debug_id;
	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_dequeue, 0);

	/* We're going to drop the socket lock, so we need to lock the call
	 * against interference by sendmsg.
	 */
	if (!mutex_trylock(&call->user_mutex)) {
		ret = -EWOULDBLOCK;
		if (flags & MSG_DONTWAIT)
			goto error_requeue_call;
		ret = -ERESTARTSYS;
		if (mutex_lock_interruptible(&call->user_mutex) < 0)
			goto error_requeue_call;
	}

	release_sock(&rx->sk);

	if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
		BUG();

	if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
		if (flags & MSG_CMSG_COMPAT) {
			unsigned int id32 = call->user_call_ID;

			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
				       sizeof(unsigned int), &id32);
		} else {
			unsigned long idl = call->user_call_ID;

			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
				       sizeof(unsigned long), &idl);
		}
		if (ret < 0)
			goto error_unlock_call;
	}

	if (msg->msg_name && call->peer) {
		size_t len = sizeof(call->dest_srx);

		memcpy(msg->msg_name, &call->dest_srx, len);
		msg->msg_namelen = len;
	}

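	/* Deliver queued data if the call is still in a receiving phase. */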
	switch (READ_ONCE(call->state)) {
	case RXRPC_CALL_CLIENT_RECV_REPLY:
	case RXRPC_CALL_SERVER_RECV_REQUEST:
	case RXRPC_CALL_SERVER_ACK_REQUEST:
		ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
					 flags, &copied);
		if (ret == -EAGAIN)
			ret = 0;

		if (!skb_queue_empty(&call->recvmsg_queue))
			rxrpc_notify_socket(call);
		break;
	default:
		ret = 0;
		break;
	}

	if (ret < 0)
		goto error_unlock_call;

	if (call->state == RXRPC_CALL_COMPLETE) {
		ret = rxrpc_recvmsg_term(call, msg);
		if (ret < 0)
			goto error_unlock_call;
		if (!(flags & MSG_PEEK))
			rxrpc_release_call(rx, call);
		msg->msg_flags |= MSG_EOR;
		ret = 1;
	}

	if (ret == 0)
		msg->msg_flags |= MSG_MORE;
	else
		msg->msg_flags &= ~MSG_MORE;
	ret = copied;

error_unlock_call:
	mutex_unlock(&call->user_mutex);
	rxrpc_put_call(call, rxrpc_call_put_recvmsg);
	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_return, ret);
	return ret;

error_requeue_call:
	if (!(flags & MSG_PEEK)) {
		write_lock(&rx->recvmsg_lock);
		list_add(&call->recvmsg_link, &rx->recvmsg_q);
		write_unlock(&rx->recvmsg_lock);
		trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_requeue, 0);
	} else {
		rxrpc_put_call(call, rxrpc_call_put_recvmsg);
	}
error_no_call:
	release_sock(&rx->sk);
error_trace:
	trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_return, ret);
	return ret;

wait_interrupted:
	ret = sock_intr_errno(timeo);
wait_error:
	finish_wait(sk_sleep(&rx->sk), &wait);
	call = NULL;
	goto error_trace;
}
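
/*
 * Illustrative sketch of a userspace read loop over this path (fd, iov and
 * ctrl are placeholders; error handling elided).  MSG_MORE is set while more
 * data is expected on the call; MSG_EOR marks the final message:
 *
 *	struct msghdr msg = { .msg_iov = &iov, .msg_iovlen = 1,
 *			      .msg_control = ctrl };
 *	ssize_t n;
 *
 *	do {
 *		msg.msg_controllen = sizeof(ctrl);	// reset before each call
 *		n = recvmsg(fd, &msg, 0);
 *	} while (n >= 0 && !(msg.msg_flags & MSG_EOR));
 */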

/**
 * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
 * @sock: The socket that the call exists on
 * @call: The call to receive data from
 * @iter: The buffer to receive into
 * @_len: The amount of data we want to receive (decreased on return)
 * @want_more: True if more data is expected to be read
 * @_abort: Where the abort code is stored if -ECONNABORTED is returned
 * @_service: Where to store the actual service ID (may be upgraded)
 *
 * Allow a kernel service to receive data and pick up information about the
 * state of a call.  Returns 0 if we got what was asked for and there's more
 * available, 1 if we got what was asked for and we're at the end of the data,
 * and -EAGAIN if we need more data.
 *
 * Note that we may return -EAGAIN to drain empty packets at the end of the
 * data, even if we've already copied over the requested data.
 *
 * *_abort should be initialised to 0.
 */
int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
			   struct iov_iter *iter, size_t *_len,
			   bool want_more, u32 *_abort, u16 *_service)
{
	size_t offset = 0;
	int ret;

	_enter("{%d,%s},%zu,%d",
	       call->debug_id, rxrpc_call_states[call->state],
	       *_len, want_more);

	ASSERTCMP(call->state, !=, RXRPC_CALL_SERVER_SECURING);

	mutex_lock(&call->user_mutex);

	switch (READ_ONCE(call->state)) {
	case RXRPC_CALL_CLIENT_RECV_REPLY:
	case RXRPC_CALL_SERVER_RECV_REQUEST:
	case RXRPC_CALL_SERVER_ACK_REQUEST:
		ret = rxrpc_recvmsg_data(sock, call, NULL, iter,
					 *_len, 0, &offset);
		*_len -= offset;
		if (ret < 0)
			goto out;

		/* We can only reach here with a partially full buffer if we
		 * have reached the end of the data.  We must otherwise have a
		 * full buffer or have been given -EAGAIN.
		 */
		if (ret == 1) {
			if (iov_iter_count(iter) > 0)
				goto short_data;
			if (!want_more)
				goto read_phase_complete;
			ret = 0;
			goto out;
		}

		if (!want_more)
			goto excess_data;
		goto out;

	case RXRPC_CALL_COMPLETE:
		goto call_complete;

	default:
		ret = -EINPROGRESS;
		goto out;
	}

read_phase_complete:
	ret = 1;
out:
	if (_service)
		*_service = call->dest_srx.srx_service;
	mutex_unlock(&call->user_mutex);
	_leave(" = %d [%zu,%d]", ret, iov_iter_count(iter), *_abort);
	return ret;

short_data:
	trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_short_data,
			  call->cid, call->call_id, call->rx_consumed,
			  0, -EBADMSG);
	ret = -EBADMSG;
	goto out;
excess_data:
	trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_excess_data,
			  call->cid, call->call_id, call->rx_consumed,
			  0, -EMSGSIZE);
	ret = -EMSGSIZE;
	goto out;
call_complete:
	*_abort = call->abort_code;
	ret = call->error;
	if (call->completion == RXRPC_CALL_SUCCEEDED) {
		ret = 1;
		if (iov_iter_count(iter) > 0)
			ret = -ECONNRESET;
	}
	goto out;
}
EXPORT_SYMBOL(rxrpc_kernel_recv_data);
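
/*
 * Illustrative sketch of a kernel-service caller (loosely in the style of
 * AFS; my_sock, my_call and friends are hypothetical).  A return of 1 means
 * end-of-data, -EAGAIN means more packets are needed, and -ECONNABORTED
 * leaves the remote abort code in abort_code:
 *
 *	struct kvec kv = { .iov_base = my_call->buffer, .iov_len = 64 };
 *	struct iov_iter iter;
 *	size_t len = kv.iov_len;
 *	u32 abort_code = 0;
 *	u16 service_id;
 *	int ret;
 *
 *	iov_iter_kvec(&iter, ITER_DEST, &kv, 1, len);
 *	ret = rxrpc_kernel_recv_data(my_sock, my_call->rxcall, &iter, &len,
 *				     true, &abort_code, &service_id);
 *
 * Passing want_more as false on the final read makes the core check that the
 * data ends exactly where expected (-EBADMSG or -EMSGSIZE otherwise).
 */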