xref: /openbmc/linux/net/rxrpc/recvmsg.c (revision f5c17aae)
1 /* RxRPC recvmsg() implementation
2  *
3  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
4  * Written by David Howells (dhowells@redhat.com)
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version
9  * 2 of the License, or (at your option) any later version.
10  */
11 
12 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
13 
14 #include <linux/net.h>
15 #include <linux/skbuff.h>
16 #include <linux/export.h>
17 #include <net/sock.h>
18 #include <net/af_rxrpc.h>
19 #include "ar-internal.h"
20 
21 /*
22  * removal a call's user ID from the socket tree to make the user ID available
23  * again and so that it won't be seen again in association with that call
24  */
25 void rxrpc_remove_user_ID(struct rxrpc_sock *rx, struct rxrpc_call *call)
26 {
27 	_debug("RELEASE CALL %d", call->debug_id);
28 
29 	if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
30 		write_lock_bh(&rx->call_lock);
31 		rb_erase(&call->sock_node, &call->socket->calls);
32 		clear_bit(RXRPC_CALL_HAS_USERID, &call->flags);
33 		write_unlock_bh(&rx->call_lock);
34 	}
35 
36 	read_lock_bh(&call->state_lock);
37 	if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
38 	    !test_and_set_bit(RXRPC_CALL_EV_RELEASE, &call->events))
39 		rxrpc_queue_call(call);
40 	read_unlock_bh(&call->state_lock);
41 }
42 
43 /*
44  * receive a message from an RxRPC socket
45  * - we need to be careful about two or more threads calling recvmsg
46  *   simultaneously
47  */
48 int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
49 		  int flags)
50 {
51 	struct rxrpc_skb_priv *sp;
52 	struct rxrpc_call *call = NULL, *continue_call = NULL;
53 	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
54 	struct sk_buff *skb;
55 	long timeo;
56 	int copy, ret, ullen, offset, copied = 0;
57 	u32 abort_code;
58 
59 	DEFINE_WAIT(wait);
60 
61 	_enter(",,,%zu,%d", len, flags);
62 
63 	if (flags & (MSG_OOB | MSG_TRUNC))
64 		return -EOPNOTSUPP;
65 
66 	ullen = msg->msg_flags & MSG_CMSG_COMPAT ? 4 : sizeof(unsigned long);
67 
68 	timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);
69 	msg->msg_flags |= MSG_MORE;
70 
71 	lock_sock(&rx->sk);
72 
73 	for (;;) {
74 		/* return immediately if a client socket has no outstanding
75 		 * calls */
76 		if (RB_EMPTY_ROOT(&rx->calls)) {
77 			if (copied)
78 				goto out;
79 			if (rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
80 				release_sock(&rx->sk);
81 				if (continue_call)
82 					rxrpc_put_call(continue_call);
83 				return -ENODATA;
84 			}
85 		}
86 
87 		/* get the next message on the Rx queue */
88 		skb = skb_peek(&rx->sk.sk_receive_queue);
89 		if (!skb) {
90 			/* nothing remains on the queue */
91 			if (copied &&
92 			    (flags & MSG_PEEK || timeo == 0))
93 				goto out;
94 
95 			/* wait for a message to turn up */
96 			release_sock(&rx->sk);
97 			prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
98 						  TASK_INTERRUPTIBLE);
99 			ret = sock_error(&rx->sk);
100 			if (ret)
101 				goto wait_error;
102 
103 			if (skb_queue_empty(&rx->sk.sk_receive_queue)) {
104 				if (signal_pending(current))
105 					goto wait_interrupted;
106 				timeo = schedule_timeout(timeo);
107 			}
108 			finish_wait(sk_sleep(&rx->sk), &wait);
109 			lock_sock(&rx->sk);
110 			continue;
111 		}
112 
113 	peek_next_packet:
114 		rxrpc_see_skb(skb);
115 		sp = rxrpc_skb(skb);
116 		call = sp->call;
117 		ASSERT(call != NULL);
118 
119 		_debug("next pkt %s", rxrpc_pkts[sp->hdr.type]);
120 
121 		/* make sure we wait for the state to be updated in this call */
122 		spin_lock_bh(&call->lock);
123 		spin_unlock_bh(&call->lock);
124 
125 		if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
126 			_debug("packet from released call");
127 			if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
128 				BUG();
129 			rxrpc_free_skb(skb);
130 			continue;
131 		}
132 
133 		/* determine whether to continue last data receive */
134 		if (continue_call) {
135 			_debug("maybe cont");
136 			if (call != continue_call ||
137 			    skb->mark != RXRPC_SKB_MARK_DATA) {
138 				release_sock(&rx->sk);
139 				rxrpc_put_call(continue_call);
140 				_leave(" = %d [noncont]", copied);
141 				return copied;
142 			}
143 		}
144 
145 		rxrpc_get_call(call);
146 
147 		/* copy the peer address and timestamp */
148 		if (!continue_call) {
149 			if (msg->msg_name) {
150 				size_t len =
151 					sizeof(call->conn->params.peer->srx);
152 				memcpy(msg->msg_name,
153 				       &call->conn->params.peer->srx, len);
154 				msg->msg_namelen = len;
155 			}
156 			sock_recv_timestamp(msg, &rx->sk, skb);
157 		}
158 
159 		/* receive the message */
160 		if (skb->mark != RXRPC_SKB_MARK_DATA)
161 			goto receive_non_data_message;
162 
163 		_debug("recvmsg DATA #%u { %d, %d }",
164 		       sp->hdr.seq, skb->len, sp->offset);
165 
166 		if (!continue_call) {
167 			/* only set the control data once per recvmsg() */
168 			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
169 				       ullen, &call->user_call_ID);
170 			if (ret < 0)
171 				goto copy_error;
172 			ASSERT(test_bit(RXRPC_CALL_HAS_USERID, &call->flags));
173 		}
174 
175 		ASSERTCMP(sp->hdr.seq, >=, call->rx_data_recv);
176 		ASSERTCMP(sp->hdr.seq, <=, call->rx_data_recv + 1);
177 		call->rx_data_recv = sp->hdr.seq;
178 
179 		ASSERTCMP(sp->hdr.seq, >, call->rx_data_eaten);
180 
181 		offset = sp->offset;
182 		copy = skb->len - offset;
183 		if (copy > len - copied)
184 			copy = len - copied;
185 
186 		ret = skb_copy_datagram_msg(skb, offset, msg, copy);
187 
188 		if (ret < 0)
189 			goto copy_error;
190 
191 		/* handle piecemeal consumption of data packets */
192 		_debug("copied %d+%d", copy, copied);
193 
194 		offset += copy;
195 		copied += copy;
196 
197 		if (!(flags & MSG_PEEK))
198 			sp->offset = offset;
199 
200 		if (sp->offset < skb->len) {
201 			_debug("buffer full");
202 			ASSERTCMP(copied, ==, len);
203 			break;
204 		}
205 
206 		/* we transferred the whole data packet */
207 		if (!(flags & MSG_PEEK))
208 			rxrpc_kernel_data_consumed(call, skb);
209 
210 		if (sp->hdr.flags & RXRPC_LAST_PACKET) {
211 			_debug("last");
212 			if (rxrpc_conn_is_client(call->conn)) {
213 				 /* last byte of reply received */
214 				ret = copied;
215 				goto terminal_message;
216 			}
217 
218 			/* last bit of request received */
219 			if (!(flags & MSG_PEEK)) {
220 				_debug("eat packet");
221 				if (skb_dequeue(&rx->sk.sk_receive_queue) !=
222 				    skb)
223 					BUG();
224 				rxrpc_free_skb(skb);
225 			}
226 			msg->msg_flags &= ~MSG_MORE;
227 			break;
228 		}
229 
230 		/* move on to the next data message */
231 		_debug("next");
232 		if (!continue_call)
233 			continue_call = sp->call;
234 		else
235 			rxrpc_put_call(call);
236 		call = NULL;
237 
238 		if (flags & MSG_PEEK) {
239 			_debug("peek next");
240 			skb = skb->next;
241 			if (skb == (struct sk_buff *) &rx->sk.sk_receive_queue)
242 				break;
243 			goto peek_next_packet;
244 		}
245 
246 		_debug("eat packet");
247 		if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
248 			BUG();
249 		rxrpc_free_skb(skb);
250 	}
251 
252 	/* end of non-terminal data packet reception for the moment */
253 	_debug("end rcv data");
254 out:
255 	release_sock(&rx->sk);
256 	if (call)
257 		rxrpc_put_call(call);
258 	if (continue_call)
259 		rxrpc_put_call(continue_call);
260 	_leave(" = %d [data]", copied);
261 	return copied;
262 
263 	/* handle non-DATA messages such as aborts, incoming connections and
264 	 * final ACKs */
265 receive_non_data_message:
266 	_debug("non-data");
267 
268 	if (skb->mark == RXRPC_SKB_MARK_NEW_CALL) {
269 		_debug("RECV NEW CALL");
270 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NEW_CALL, 0, &abort_code);
271 		if (ret < 0)
272 			goto copy_error;
273 		if (!(flags & MSG_PEEK)) {
274 			if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
275 				BUG();
276 			rxrpc_free_skb(skb);
277 		}
278 		goto out;
279 	}
280 
281 	ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
282 		       ullen, &call->user_call_ID);
283 	if (ret < 0)
284 		goto copy_error;
285 	ASSERT(test_bit(RXRPC_CALL_HAS_USERID, &call->flags));
286 
287 	switch (skb->mark) {
288 	case RXRPC_SKB_MARK_DATA:
289 		BUG();
290 	case RXRPC_SKB_MARK_FINAL_ACK:
291 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &abort_code);
292 		break;
293 	case RXRPC_SKB_MARK_BUSY:
294 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_BUSY, 0, &abort_code);
295 		break;
296 	case RXRPC_SKB_MARK_REMOTE_ABORT:
297 		abort_code = call->abort_code;
298 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &abort_code);
299 		break;
300 	case RXRPC_SKB_MARK_LOCAL_ABORT:
301 		abort_code = call->abort_code;
302 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &abort_code);
303 		if (call->error) {
304 			abort_code = call->error;
305 			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4,
306 				       &abort_code);
307 		}
308 		break;
309 	case RXRPC_SKB_MARK_NET_ERROR:
310 		_debug("RECV NET ERROR %d", sp->error);
311 		abort_code = sp->error;
312 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &abort_code);
313 		break;
314 	case RXRPC_SKB_MARK_LOCAL_ERROR:
315 		_debug("RECV LOCAL ERROR %d", sp->error);
316 		abort_code = sp->error;
317 		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4,
318 			       &abort_code);
319 		break;
320 	default:
321 		pr_err("Unknown packet mark %u\n", skb->mark);
322 		BUG();
323 		break;
324 	}
325 
326 	if (ret < 0)
327 		goto copy_error;
328 
329 terminal_message:
330 	_debug("terminal");
331 	msg->msg_flags &= ~MSG_MORE;
332 	msg->msg_flags |= MSG_EOR;
333 
334 	if (!(flags & MSG_PEEK)) {
335 		_net("free terminal skb %p", skb);
336 		if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
337 			BUG();
338 		rxrpc_free_skb(skb);
339 		rxrpc_remove_user_ID(rx, call);
340 	}
341 
342 	release_sock(&rx->sk);
343 	rxrpc_put_call(call);
344 	if (continue_call)
345 		rxrpc_put_call(continue_call);
346 	_leave(" = %d", ret);
347 	return ret;
348 
349 copy_error:
350 	_debug("copy error");
351 	release_sock(&rx->sk);
352 	rxrpc_put_call(call);
353 	if (continue_call)
354 		rxrpc_put_call(continue_call);
355 	_leave(" = %d", ret);
356 	return ret;
357 
358 wait_interrupted:
359 	ret = sock_intr_errno(timeo);
360 wait_error:
361 	finish_wait(sk_sleep(&rx->sk), &wait);
362 	if (continue_call)
363 		rxrpc_put_call(continue_call);
364 	if (copied)
365 		copied = ret;
366 	_leave(" = %d [waitfail %d]", copied, ret);
367 	return copied;
368 
369 }
370 
371 /**
372  * rxrpc_kernel_is_data_last - Determine if data message is last one
373  * @skb: Message holding data
374  *
375  * Determine if data message is last one for the parent call.
376  */
377 bool rxrpc_kernel_is_data_last(struct sk_buff *skb)
378 {
379 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
380 
381 	ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_DATA);
382 
383 	return sp->hdr.flags & RXRPC_LAST_PACKET;
384 }
385 
386 EXPORT_SYMBOL(rxrpc_kernel_is_data_last);
387 
388 /**
389  * rxrpc_kernel_get_abort_code - Get the abort code from an RxRPC abort message
390  * @skb: Message indicating an abort
391  *
392  * Get the abort code from an RxRPC abort message.
393  */
394 u32 rxrpc_kernel_get_abort_code(struct sk_buff *skb)
395 {
396 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
397 
398 	switch (skb->mark) {
399 	case RXRPC_SKB_MARK_REMOTE_ABORT:
400 	case RXRPC_SKB_MARK_LOCAL_ABORT:
401 		return sp->call->abort_code;
402 	default:
403 		BUG();
404 	}
405 }
406 
407 EXPORT_SYMBOL(rxrpc_kernel_get_abort_code);
408 
409 /**
410  * rxrpc_kernel_get_error - Get the error number from an RxRPC error message
411  * @skb: Message indicating an error
412  *
413  * Get the error number from an RxRPC error message.
414  */
415 int rxrpc_kernel_get_error_number(struct sk_buff *skb)
416 {
417 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
418 
419 	return sp->error;
420 }
421 
422 EXPORT_SYMBOL(rxrpc_kernel_get_error_number);
423