xref: /openbmc/linux/net/rxrpc/recvmsg.c (revision 248f219c)
1 /* RxRPC recvmsg() implementation
2  *
3  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
4  * Written by David Howells (dhowells@redhat.com)
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version
9  * 2 of the License, or (at your option) any later version.
10  */
11 
12 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
13 
14 #include <linux/net.h>
15 #include <linux/skbuff.h>
16 #include <linux/export.h>
17 #include <net/sock.h>
18 #include <net/af_rxrpc.h>
19 #include "ar-internal.h"
20 
21 /*
22  * Post a call for attention by the socket or kernel service.  Further
23  * notifications are suppressed by putting recvmsg_link on a dummy queue.
24  */
25 void rxrpc_notify_socket(struct rxrpc_call *call)
26 {
27 	struct rxrpc_sock *rx;
28 	struct sock *sk;
29 
30 	_enter("%d", call->debug_id);
31 
32 	if (!list_empty(&call->recvmsg_link))
33 		return;
34 
35 	rcu_read_lock();
36 
37 	rx = rcu_dereference(call->socket);
38 	sk = &rx->sk;
39 	if (rx && sk->sk_state < RXRPC_CLOSE) {
40 		if (call->notify_rx) {
41 			call->notify_rx(sk, call, call->user_call_ID);
42 		} else {
43 			write_lock_bh(&rx->recvmsg_lock);
44 			if (list_empty(&call->recvmsg_link)) {
45 				rxrpc_get_call(call, rxrpc_call_got);
46 				list_add_tail(&call->recvmsg_link, &rx->recvmsg_q);
47 			}
48 			write_unlock_bh(&rx->recvmsg_lock);
49 
50 			if (!sock_flag(sk, SOCK_DEAD)) {
51 				_debug("call %ps", sk->sk_data_ready);
52 				sk->sk_data_ready(sk);
53 			}
54 		}
55 	}
56 
57 	rcu_read_unlock();
58 	_leave("");
59 }
60 
61 /*
62  * Pass a call terminating message to userspace.
63  */
64 static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
65 {
66 	u32 tmp = 0;
67 	int ret;
68 
69 	switch (call->completion) {
70 	case RXRPC_CALL_SUCCEEDED:
71 		ret = 0;
72 		if (rxrpc_is_service_call(call))
73 			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &tmp);
74 		break;
75 	case RXRPC_CALL_REMOTELY_ABORTED:
76 		tmp = call->abort_code;
77 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
78 		break;
79 	case RXRPC_CALL_LOCALLY_ABORTED:
80 		tmp = call->abort_code;
81 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
82 		break;
83 	case RXRPC_CALL_NETWORK_ERROR:
84 		tmp = call->error;
85 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &tmp);
86 		break;
87 	case RXRPC_CALL_LOCAL_ERROR:
88 		tmp = call->error;
89 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
90 		break;
91 	default:
92 		pr_err("Invalid terminal call state %u\n", call->state);
93 		BUG();
94 		break;
95 	}
96 
97 	return ret;
98 }
99 
100 /*
101  * Pass back notification of a new call.  The call is added to the
102  * to-be-accepted list.  This means that the next call to be accepted might not
103  * be the last call seen awaiting acceptance, but unless we leave this on the
104  * front of the queue and block all other messages until someone gives us a
105  * user_ID for it, there's not a lot we can do.
106  */
107 static int rxrpc_recvmsg_new_call(struct rxrpc_sock *rx,
108 				  struct rxrpc_call *call,
109 				  struct msghdr *msg, int flags)
110 {
111 	int tmp = 0, ret;
112 
113 	ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NEW_CALL, 0, &tmp);
114 
115 	if (ret == 0 && !(flags & MSG_PEEK)) {
116 		_debug("to be accepted");
117 		write_lock_bh(&rx->recvmsg_lock);
118 		list_del_init(&call->recvmsg_link);
119 		write_unlock_bh(&rx->recvmsg_lock);
120 
121 		write_lock(&rx->call_lock);
122 		list_add_tail(&call->accept_link, &rx->to_be_accepted);
123 		write_unlock(&rx->call_lock);
124 	}
125 
126 	return ret;
127 }
128 
129 /*
130  * End the packet reception phase.
131  */
132 static void rxrpc_end_rx_phase(struct rxrpc_call *call)
133 {
134 	_enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]);
135 
136 	if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY) {
137 		rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, 0, 0, true, false);
138 		rxrpc_send_call_packet(call, RXRPC_PACKET_TYPE_ACK);
139 	} else {
140 		rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, 0, 0, false, false);
141 	}
142 
143 	write_lock_bh(&call->state_lock);
144 
145 	switch (call->state) {
146 	case RXRPC_CALL_CLIENT_RECV_REPLY:
147 		__rxrpc_call_completed(call);
148 		break;
149 
150 	case RXRPC_CALL_SERVER_RECV_REQUEST:
151 		call->state = RXRPC_CALL_SERVER_ACK_REQUEST;
152 		break;
153 	default:
154 		break;
155 	}
156 
157 	write_unlock_bh(&call->state_lock);
158 }
159 
/*
 * Discard a packet we've used up and advance the Rx window by one.
 *
 * Statement order here is load-bearing: the buffer slot must be cleared
 * before rx_hard_ack is published with release semantics so that the
 * producer (rxrpc_input_data()) never sees an advanced window with a stale
 * skb still in the slot.
 */
static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
{
	struct sk_buff *skb;
	rxrpc_seq_t hard_ack, top;
	int ix;

	_enter("%d", call->debug_id);

	/* Pairs with the store-release of rx_top by the input path. */
	hard_ack = call->rx_hard_ack;
	top = smp_load_acquire(&call->rx_top);
	ASSERT(before(hard_ack, top));

	/* Consume the next sequence number and empty its ring slot. */
	hard_ack++;
	ix = hard_ack & RXRPC_RXTX_BUFF_MASK;
	skb = call->rxtx_buffer[ix];
	rxrpc_see_skb(skb);
	call->rxtx_buffer[ix] = NULL;
	call->rxtx_annotations[ix] = 0;
	/* Barrier against rxrpc_input_data(). */
	smp_store_release(&call->rx_hard_ack, hard_ack);

	rxrpc_free_skb(skb);

	_debug("%u,%u,%lx", hard_ack, top, call->flags);
	/* If that was the last packet of the call's data, end the Rx phase. */
	if (hard_ack == top && test_bit(RXRPC_CALL_RX_LAST, &call->flags))
		rxrpc_end_rx_phase(call);
}
190 
/*
 * Decrypt and verify a (sub)packet.  The packet's length may be changed due to
 * padding, but if this is the case, the packet length will be resident in the
 * socket buffer.  Note that we can't modify the master skb info as the skb may
 * be the home to multiple subpackets.
 *
 * Returns the result of the connection security class's verify_packet op
 * (0 on success, negative on verification failure).
 */
static int rxrpc_verify_packet(struct rxrpc_call *call, struct sk_buff *skb,
			       u8 annotation,
			       unsigned int offset, unsigned int len)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	rxrpc_seq_t seq = sp->hdr.seq;	/* seq/cksum of the head packet */
	u16 cksum = sp->hdr.cksum;

	_enter("");

	/* For all but the head jumbo subpacket, the security checksum is in a
	 * jumbo header immediately prior to the data.
	 */
	if ((annotation & RXRPC_RX_ANNO_JUMBO) > 1) {
		__be16 tmp;
		/* offset points at the subpacket data, so the 2-byte wire
		 * checksum sits at offset - 2; copy_bits can only fail if
		 * the offset/length bookkeeping is broken, hence BUG().
		 */
		if (skb_copy_bits(skb, offset - 2, &tmp, 2) < 0)
			BUG();
		cksum = ntohs(tmp);
		/* Each jumbo subpacket carries the next sequence number. */
		seq += (annotation & RXRPC_RX_ANNO_JUMBO) - 1;
	}

	return call->conn->security->verify_packet(call, skb, offset, len,
						   seq, cksum);
}
221 
222 /*
223  * Locate the data within a packet.  This is complicated by:
224  *
225  * (1) An skb may contain a jumbo packet - so we have to find the appropriate
226  *     subpacket.
227  *
228  * (2) The (sub)packets may be encrypted and, if so, the encrypted portion
229  *     contains an extra header which includes the true length of the data,
230  *     excluding any encrypted padding.
231  */
232 static int rxrpc_locate_data(struct rxrpc_call *call, struct sk_buff *skb,
233 			     u8 *_annotation,
234 			     unsigned int *_offset, unsigned int *_len)
235 {
236 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
237 	unsigned int offset = *_offset;
238 	unsigned int len = *_len;
239 	int ret;
240 	u8 annotation = *_annotation;
241 
242 	if (offset > 0)
243 		return 0;
244 
245 	/* Locate the subpacket */
246 	offset = sp->offset;
247 	len = skb->len - sp->offset;
248 	if ((annotation & RXRPC_RX_ANNO_JUMBO) > 0) {
249 		offset += (((annotation & RXRPC_RX_ANNO_JUMBO) - 1) *
250 			   RXRPC_JUMBO_SUBPKTLEN);
251 		len = (annotation & RXRPC_RX_ANNO_JLAST) ?
252 			skb->len - offset : RXRPC_JUMBO_SUBPKTLEN;
253 	}
254 
255 	if (!(annotation & RXRPC_RX_ANNO_VERIFIED)) {
256 		ret = rxrpc_verify_packet(call, skb, annotation, offset, len);
257 		if (ret < 0)
258 			return ret;
259 		*_annotation |= RXRPC_RX_ANNO_VERIFIED;
260 	}
261 
262 	*_offset = offset;
263 	*_len = len;
264 	call->conn->security->locate_data(call, skb, _offset, _len);
265 	return 0;
266 }
267 
268 /*
269  * Deliver messages to a call.  This keeps processing packets until the buffer
270  * is filled and we find either more DATA (returns 0) or the end of the DATA
271  * (returns 1).  If more packets are required, it returns -EAGAIN.
272  */
273 static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
274 			      struct msghdr *msg, struct iov_iter *iter,
275 			      size_t len, int flags, size_t *_offset)
276 {
277 	struct rxrpc_skb_priv *sp;
278 	struct sk_buff *skb;
279 	rxrpc_seq_t hard_ack, top, seq;
280 	size_t remain;
281 	bool last;
282 	unsigned int rx_pkt_offset, rx_pkt_len;
283 	int ix, copy, ret = 0;
284 
285 	_enter("");
286 
287 	rx_pkt_offset = call->rx_pkt_offset;
288 	rx_pkt_len = call->rx_pkt_len;
289 
290 	/* Barriers against rxrpc_input_data(). */
291 	hard_ack = call->rx_hard_ack;
292 	top = smp_load_acquire(&call->rx_top);
293 	for (seq = hard_ack + 1; before_eq(seq, top); seq++) {
294 		ix = seq & RXRPC_RXTX_BUFF_MASK;
295 		skb = call->rxtx_buffer[ix];
296 		if (!skb)
297 			break;
298 		smp_rmb();
299 		rxrpc_see_skb(skb);
300 		sp = rxrpc_skb(skb);
301 
302 		if (msg)
303 			sock_recv_timestamp(msg, sock->sk, skb);
304 
305 		ret = rxrpc_locate_data(call, skb, &call->rxtx_annotations[ix],
306 					&rx_pkt_offset, &rx_pkt_len);
307 		_debug("recvmsg %x DATA #%u { %d, %d }",
308 		       sp->hdr.callNumber, seq, rx_pkt_offset, rx_pkt_len);
309 
310 		/* We have to handle short, empty and used-up DATA packets. */
311 		remain = len - *_offset;
312 		copy = rx_pkt_len;
313 		if (copy > remain)
314 			copy = remain;
315 		if (copy > 0) {
316 			ret = skb_copy_datagram_iter(skb, rx_pkt_offset, iter,
317 						     copy);
318 			if (ret < 0)
319 				goto out;
320 
321 			/* handle piecemeal consumption of data packets */
322 			_debug("copied %d @%zu", copy, *_offset);
323 
324 			rx_pkt_offset += copy;
325 			rx_pkt_len -= copy;
326 			*_offset += copy;
327 		}
328 
329 		if (rx_pkt_len > 0) {
330 			_debug("buffer full");
331 			ASSERTCMP(*_offset, ==, len);
332 			break;
333 		}
334 
335 		/* The whole packet has been transferred. */
336 		last = sp->hdr.flags & RXRPC_LAST_PACKET;
337 		if (!(flags & MSG_PEEK))
338 			rxrpc_rotate_rx_window(call);
339 		rx_pkt_offset = 0;
340 		rx_pkt_len = 0;
341 
342 		ASSERTIFCMP(last, seq, ==, top);
343 	}
344 
345 	if (after(seq, top)) {
346 		ret = -EAGAIN;
347 		if (test_bit(RXRPC_CALL_RX_LAST, &call->flags))
348 			ret = 1;
349 	}
350 out:
351 	if (!(flags & MSG_PEEK)) {
352 		call->rx_pkt_offset = rx_pkt_offset;
353 		call->rx_pkt_len = rx_pkt_len;
354 	}
355 	_leave(" = %d [%u/%u]", ret, seq, top);
356 	return ret;
357 }
358 
/*
 * Receive a message from an RxRPC socket
 * - we need to be careful about two or more threads calling recvmsg
 *   simultaneously
 *
 * Returns the number of bytes copied, or a negative error; MSG_EOR is set on
 * the final message of a call.
 */
int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
		  int flags)
{
	struct rxrpc_call *call;
	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
	struct list_head *l;
	size_t copied = 0;
	long timeo;
	int ret;

	DEFINE_WAIT(wait);

	_enter(",,,%zu,%d", len, flags);

	if (flags & (MSG_OOB | MSG_TRUNC))
		return -EOPNOTSUPP;

	timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);

try_again:
	lock_sock(&rx->sk);

	/* Return immediately if a client socket has no outstanding calls */
	if (RB_EMPTY_ROOT(&rx->calls) &&
	    list_empty(&rx->recvmsg_q) &&
	    rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
		release_sock(&rx->sk);
		return -ENODATA;
	}

	if (list_empty(&rx->recvmsg_q)) {
		ret = -EWOULDBLOCK;
		if (timeo == 0)
			goto error_no_call;

		/* Drop the socket lock before sleeping so notifications can
		 * proceed; the queue is re-checked after prepare_to_wait.
		 */
		release_sock(&rx->sk);

		/* Wait for something to happen */
		prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
					  TASK_INTERRUPTIBLE);
		ret = sock_error(&rx->sk);
		if (ret)
			goto wait_error;

		if (list_empty(&rx->recvmsg_q)) {
			if (signal_pending(current))
				goto wait_interrupted;
			timeo = schedule_timeout(timeo);
		}
		finish_wait(sk_sleep(&rx->sk), &wait);
		goto try_again;
	}

	/* Find the next call and dequeue it if we're not just peeking.  If we
	 * do dequeue it, that comes with a ref that we will need to release.
	 */
	write_lock_bh(&rx->recvmsg_lock);
	l = rx->recvmsg_q.next;
	call = list_entry(l, struct rxrpc_call, recvmsg_link);
	if (!(flags & MSG_PEEK))
		list_del_init(&call->recvmsg_link);
	else
		/* Peeking leaves the call queued, so take our own ref for the
		 * duration of this function; either way a ref is held and is
		 * put at the error label.
		 */
		rxrpc_get_call(call, rxrpc_call_got);
	write_unlock_bh(&rx->recvmsg_lock);

	_debug("recvmsg call %p", call);

	if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
		BUG();

	/* Tell userspace which call this data belongs to. */
	if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
		if (flags & MSG_CMSG_COMPAT) {
			/* 32-bit userspace expects a 32-bit user call ID. */
			unsigned int id32 = call->user_call_ID;

			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
				       sizeof(unsigned int), &id32);
		} else {
			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
				       sizeof(unsigned long),
				       &call->user_call_ID);
		}
		if (ret < 0)
			goto error;
	}

	/* Supply the peer's address if the caller asked for it. */
	if (msg->msg_name) {
		size_t len = sizeof(call->conn->params.peer->srx);
		memcpy(msg->msg_name, &call->conn->params.peer->srx, len);
		msg->msg_namelen = len;
	}

	switch (call->state) {
	case RXRPC_CALL_SERVER_ACCEPTING:
		/* New incoming call: notify and move to to-be-accepted. */
		ret = rxrpc_recvmsg_new_call(rx, call, msg, flags);
		break;
	case RXRPC_CALL_CLIENT_RECV_REPLY:
	case RXRPC_CALL_SERVER_RECV_REQUEST:
	case RXRPC_CALL_SERVER_ACK_REQUEST:
		ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
					 flags, &copied);
		/* -EAGAIN just means more packets are needed, which is not an
		 * error from userspace's point of view.
		 */
		if (ret == -EAGAIN)
			ret = 0;
		break;
	default:
		ret = 0;
		break;
	}

	if (ret < 0)
		goto error;

	/* If the call has ended, append the terminal indication and release
	 * the call unless we're only peeking.
	 */
	if (call->state == RXRPC_CALL_COMPLETE) {
		ret = rxrpc_recvmsg_term(call, msg);
		if (ret < 0)
			goto error;
		if (!(flags & MSG_PEEK))
			rxrpc_release_call(rx, call);
		msg->msg_flags |= MSG_EOR;
		ret = 1;
	}

	if (ret == 0)
		msg->msg_flags |= MSG_MORE;
	else
		msg->msg_flags &= ~MSG_MORE;
	ret = copied;

error:
	rxrpc_put_call(call, rxrpc_call_put);
error_no_call:
	release_sock(&rx->sk);
	_leave(" = %d", ret);
	return ret;

wait_interrupted:
	ret = sock_intr_errno(timeo);
wait_error:
	finish_wait(sk_sleep(&rx->sk), &wait);
	release_sock(&rx->sk);
	_leave(" = %d [wait]", ret);
	return ret;
}
506 
/**
 * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
 * @sock: The socket that the call exists on
 * @call: The call to send data through
 * @buf: The buffer to receive into
 * @size: The size of the buffer, including data already read
 * @_offset: The running offset into the buffer.
 * @want_more: True if more data is expected to be read
 * @_abort: Where the abort code is stored if -ECONNABORTED is returned
 *
 * Allow a kernel service to receive data and pick up information about the
 * state of a call.  Returns 0 if got what was asked for and there's more
 * available, 1 if we got what was asked for and we're at the end of the data
 * and -EAGAIN if we need more data.
 *
 * Note that we may return -EAGAIN to drain empty packets at the end of the
 * data, even if we've already copied over the requested data.
 *
 * This function adds the amount it transfers to *_offset, so this should be
 * precleared as appropriate.  Note that the amount remaining in the buffer is
 * taken to be size - *_offset.
 *
 * *_abort should also be initialised to 0.
 */
int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
			   void *buf, size_t size, size_t *_offset,
			   bool want_more, u32 *_abort)
{
	struct iov_iter iter;
	struct kvec iov;
	int ret;

	_enter("{%d,%s},%zu/%zu,%d",
	       call->debug_id, rxrpc_call_states[call->state],
	       *_offset, size, want_more);

	ASSERTCMP(*_offset, <=, size);
	ASSERTCMP(call->state, !=, RXRPC_CALL_SERVER_ACCEPTING);

	/* Build a kernel-space iterator over the unread tail of the buffer. */
	iov.iov_base = buf + *_offset;
	iov.iov_len = size - *_offset;
	iov_iter_kvec(&iter, ITER_KVEC | READ, &iov, 1, size - *_offset);

	lock_sock(sock->sk);

	switch (call->state) {
	case RXRPC_CALL_CLIENT_RECV_REPLY:
	case RXRPC_CALL_SERVER_RECV_REQUEST:
	case RXRPC_CALL_SERVER_ACK_REQUEST:
		/* flags=0: no peeking, so consumed packets are rotated out. */
		ret = rxrpc_recvmsg_data(sock, call, NULL, &iter, size, 0,
					 _offset);
		if (ret < 0)
			goto out;

		/* We can only reach here with a partially full buffer if we
		 * have reached the end of the data.  We must otherwise have a
		 * full buffer or have been given -EAGAIN.
		 */
		if (ret == 1) {
			if (*_offset < size)
				goto short_data;
			if (!want_more)
				goto read_phase_complete;
			ret = 0;
			goto out;
		}

		/* Buffer is full but the caller thought the data would end
		 * here - there is more on the wire than was expected.
		 */
		if (!want_more)
			goto excess_data;
		goto out;

	case RXRPC_CALL_COMPLETE:
		goto call_complete;

	default:
		/* Still in the Tx phase (or some other non-Rx state). */
		ret = -EINPROGRESS;
		goto out;
	}

read_phase_complete:
	ret = 1;
out:
	release_sock(sock->sk);
	_leave(" = %d [%zu,%d]", ret, *_offset, *_abort);
	return ret;

short_data:
	ret = -EBADMSG;
	goto out;
excess_data:
	ret = -EMSGSIZE;
	goto out;
call_complete:
	/* Report how the call ended: success maps to 1 (or -ECONNRESET if
	 * data was still expected), otherwise pass back the error and any
	 * abort code.
	 */
	*_abort = call->abort_code;
	ret = call->error;
	if (call->completion == RXRPC_CALL_SUCCEEDED) {
		ret = 1;
		if (size > 0)
			ret = -ECONNRESET;
	}
	goto out;
}
EXPORT_SYMBOL(rxrpc_kernel_recv_data);
610