xref: /openbmc/linux/net/rxrpc/sendmsg.c (revision 96b4059f43ce69e9c590f77d6ce3e99888d5cfe6)
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /* AF_RXRPC sendmsg() implementation.
3  *
4  * Copyright (C) 2007, 2016 Red Hat, Inc. All Rights Reserved.
5  * Written by David Howells (dhowells@redhat.com)
6  */
7 
8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
9 
10 #include <linux/net.h>
11 #include <linux/gfp.h>
12 #include <linux/skbuff.h>
13 #include <linux/export.h>
14 #include <linux/sched/signal.h>
15 
16 #include <net/sock.h>
17 #include <net/af_rxrpc.h>
18 #include "ar-internal.h"
19 
20 /*
21  * Propose an abort to be made in the I/O thread.
22  */
23 bool rxrpc_propose_abort(struct rxrpc_call *call, s32 abort_code, int error,
24 			 enum rxrpc_abort_reason why)
25 {
26 	_enter("{%d},%d,%d,%u", call->debug_id, abort_code, error, why);
27 
28 	if (!call->send_abort && !rxrpc_call_is_complete(call)) {
29 		call->send_abort_why = why;
30 		call->send_abort_err = error;
31 		call->send_abort_seq = 0;
32 		/* Request abort locklessly vs rxrpc_input_call_event(). */
33 		smp_store_release(&call->send_abort, abort_code);
34 		rxrpc_poke_call(call, rxrpc_call_poke_abort);
35 		return true;
36 	}
37 
38 	return false;
39 }
40 
41 /*
42  * Return true if there's sufficient Tx queue space.
43  */
44 static bool rxrpc_check_tx_space(struct rxrpc_call *call, rxrpc_seq_t *_tx_win)
45 {
46 	if (_tx_win)
47 		*_tx_win = call->tx_bottom;
48 	return call->tx_prepared - call->tx_bottom < 256;
49 }
50 
51 /*
52  * Wait for space to appear in the Tx queue or a signal to occur.
53  */
54 static int rxrpc_wait_for_tx_window_intr(struct rxrpc_sock *rx,
55 					 struct rxrpc_call *call,
56 					 long *timeo)
57 {
58 	for (;;) {
59 		set_current_state(TASK_INTERRUPTIBLE);
60 		if (rxrpc_check_tx_space(call, NULL))
61 			return 0;
62 
63 		if (rxrpc_call_is_complete(call))
64 			return call->error;
65 
66 		if (signal_pending(current))
67 			return sock_intr_errno(*timeo);
68 
69 		trace_rxrpc_txqueue(call, rxrpc_txqueue_wait);
70 		*timeo = schedule_timeout(*timeo);
71 	}
72 }
73 
74 /*
75  * Wait for space to appear in the Tx queue uninterruptibly, but with
76  * a timeout of 2*RTT if no progress was made and a signal occurred.
77  */
78 static int rxrpc_wait_for_tx_window_waitall(struct rxrpc_sock *rx,
79 					    struct rxrpc_call *call)
80 {
81 	rxrpc_seq_t tx_start, tx_win;
82 	signed long rtt, timeout;
83 
84 	rtt = READ_ONCE(call->peer->srtt_us) >> 3;
85 	rtt = usecs_to_jiffies(rtt) * 2;
86 	if (rtt < 2)
87 		rtt = 2;
88 
89 	timeout = rtt;
90 	tx_start = smp_load_acquire(&call->acks_hard_ack);
91 
92 	for (;;) {
93 		set_current_state(TASK_UNINTERRUPTIBLE);
94 
95 		if (rxrpc_check_tx_space(call, &tx_win))
96 			return 0;
97 
98 		if (rxrpc_call_is_complete(call))
99 			return call->error;
100 
101 		if (timeout == 0 &&
102 		    tx_win == tx_start && signal_pending(current))
103 			return -EINTR;
104 
105 		if (tx_win != tx_start) {
106 			timeout = rtt;
107 			tx_start = tx_win;
108 		}
109 
110 		trace_rxrpc_txqueue(call, rxrpc_txqueue_wait);
111 		timeout = schedule_timeout(timeout);
112 	}
113 }
114 
115 /*
116  * Wait for space to appear in the Tx queue uninterruptibly.
117  */
118 static int rxrpc_wait_for_tx_window_nonintr(struct rxrpc_sock *rx,
119 					    struct rxrpc_call *call,
120 					    long *timeo)
121 {
122 	for (;;) {
123 		set_current_state(TASK_UNINTERRUPTIBLE);
124 		if (rxrpc_check_tx_space(call, NULL))
125 			return 0;
126 
127 		if (rxrpc_call_is_complete(call))
128 			return call->error;
129 
130 		trace_rxrpc_txqueue(call, rxrpc_txqueue_wait);
131 		*timeo = schedule_timeout(*timeo);
132 	}
133 }
134 
135 /*
136  * wait for space to appear in the transmit/ACK window
137  * - caller holds the socket locked
138  */
139 static int rxrpc_wait_for_tx_window(struct rxrpc_sock *rx,
140 				    struct rxrpc_call *call,
141 				    long *timeo,
142 				    bool waitall)
143 {
144 	DECLARE_WAITQUEUE(myself, current);
145 	int ret;
146 
147 	_enter(",{%u,%u,%u,%u}",
148 	       call->tx_bottom, call->acks_hard_ack, call->tx_top, call->tx_winsize);
149 
150 	add_wait_queue(&call->waitq, &myself);
151 
152 	switch (call->interruptibility) {
153 	case RXRPC_INTERRUPTIBLE:
154 		if (waitall)
155 			ret = rxrpc_wait_for_tx_window_waitall(rx, call);
156 		else
157 			ret = rxrpc_wait_for_tx_window_intr(rx, call, timeo);
158 		break;
159 	case RXRPC_PREINTERRUPTIBLE:
160 	case RXRPC_UNINTERRUPTIBLE:
161 	default:
162 		ret = rxrpc_wait_for_tx_window_nonintr(rx, call, timeo);
163 		break;
164 	}
165 
166 	remove_wait_queue(&call->waitq, &myself);
167 	set_current_state(TASK_RUNNING);
168 	_leave(" = %d", ret);
169 	return ret;
170 }
171 
172 /*
173  * Notify the owner of the call that the transmit phase is ended and the last
174  * packet has been queued.
175  */
176 static void rxrpc_notify_end_tx(struct rxrpc_sock *rx, struct rxrpc_call *call,
177 				rxrpc_notify_end_tx_t notify_end_tx)
178 {
179 	if (notify_end_tx)
180 		notify_end_tx(&rx->sk, call, call->user_call_ID);
181 }
182 
183 /*
184  * Queue a DATA packet for transmission, set the resend timeout and send
185  * the packet immediately.  Returns the error from rxrpc_send_data_packet()
186  * in case the caller wants to do something with it.
187  */
188 static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
189 			       struct rxrpc_txbuf *txb,
190 			       rxrpc_notify_end_tx_t notify_end_tx)
191 {
192 	rxrpc_seq_t seq = txb->seq;
193 	bool last = test_bit(RXRPC_TXBUF_LAST, &txb->flags), poke;
194 
195 	rxrpc_inc_stat(call->rxnet, stat_tx_data);
196 
197 	ASSERTCMP(txb->seq, ==, call->tx_prepared + 1);
198 
199 	/* We have to set the timestamp before queueing as the retransmit
200 	 * algorithm can see the packet as soon as we queue it.
201 	 */
202 	txb->last_sent = ktime_get_real();
203 
204 	if (last)
205 		trace_rxrpc_txqueue(call, rxrpc_txqueue_queue_last);
206 	else
207 		trace_rxrpc_txqueue(call, rxrpc_txqueue_queue);
208 
209 	/* Add the packet to the call's output buffer */
210 	spin_lock(&call->tx_lock);
211 	poke = list_empty(&call->tx_sendmsg);
212 	list_add_tail(&txb->call_link, &call->tx_sendmsg);
213 	call->tx_prepared = seq;
214 	if (last)
215 		rxrpc_notify_end_tx(rx, call, notify_end_tx);
216 	spin_unlock(&call->tx_lock);
217 
218 	if (poke)
219 		rxrpc_poke_call(call, rxrpc_call_poke_start);
220 }
221 
222 /*
223  * send data through a socket
224  * - must be called in process context
225  * - The caller holds the call user access mutex, but not the socket lock.
226  */
227 static int rxrpc_send_data(struct rxrpc_sock *rx,
228 			   struct rxrpc_call *call,
229 			   struct msghdr *msg, size_t len,
230 			   rxrpc_notify_end_tx_t notify_end_tx,
231 			   bool *_dropped_lock)
232 {
233 	struct rxrpc_txbuf *txb;
234 	struct sock *sk = &rx->sk;
235 	enum rxrpc_call_state state;
236 	long timeo;
237 	bool more = msg->msg_flags & MSG_MORE;
238 	int ret, copied = 0;
239 
240 	timeo = sock_sndtimeo(sk, msg->msg_flags & MSG_DONTWAIT);
241 
242 	/* this should be in poll */
243 	sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);
244 
245 reload:
246 	ret = -EPIPE;
247 	if (sk->sk_shutdown & SEND_SHUTDOWN)
248 		goto maybe_error;
249 	state = rxrpc_call_state(call);
250 	ret = -ESHUTDOWN;
251 	if (state >= RXRPC_CALL_COMPLETE)
252 		goto maybe_error;
253 	ret = -EPROTO;
254 	if (state != RXRPC_CALL_CLIENT_SEND_REQUEST &&
255 	    state != RXRPC_CALL_SERVER_ACK_REQUEST &&
256 	    state != RXRPC_CALL_SERVER_SEND_REPLY) {
257 		/* Request phase complete for this client call */
258 		trace_rxrpc_abort(call->debug_id, rxrpc_sendmsg_late_send,
259 				  call->cid, call->call_id, call->rx_consumed,
260 				  0, -EPROTO);
261 		goto maybe_error;
262 	}
263 
264 	ret = -EMSGSIZE;
265 	if (call->tx_total_len != -1) {
266 		if (len - copied > call->tx_total_len)
267 			goto maybe_error;
268 		if (!more && len - copied != call->tx_total_len)
269 			goto maybe_error;
270 	}
271 
272 	txb = call->tx_pending;
273 	call->tx_pending = NULL;
274 	if (txb)
275 		rxrpc_see_txbuf(txb, rxrpc_txbuf_see_send_more);
276 
277 	do {
278 		if (!txb) {
279 			size_t remain, bufsize, chunk, offset;
280 
281 			_debug("alloc");
282 
283 			if (!rxrpc_check_tx_space(call, NULL))
284 				goto wait_for_space;
285 
286 			/* Work out the maximum size of a packet.  Assume that
287 			 * the security header is going to be in the padded
288 			 * region (enc blocksize), but the trailer is not.
289 			 */
290 			remain = more ? INT_MAX : msg_data_left(msg);
291 			ret = call->conn->security->how_much_data(call, remain,
292 								  &bufsize, &chunk, &offset);
293 			if (ret < 0)
294 				goto maybe_error;
295 
296 			_debug("SIZE: %zu/%zu @%zu", chunk, bufsize, offset);
297 
298 			/* create a buffer that we can retain until it's ACK'd */
299 			ret = -ENOMEM;
300 			txb = rxrpc_alloc_txbuf(call, RXRPC_PACKET_TYPE_DATA,
301 						GFP_KERNEL);
302 			if (!txb)
303 				goto maybe_error;
304 
305 			txb->offset = offset;
306 			txb->space -= offset;
307 			txb->space = min_t(size_t, chunk, txb->space);
308 		}
309 
310 		_debug("append");
311 
312 		/* append next segment of data to the current buffer */
313 		if (msg_data_left(msg) > 0) {
314 			size_t copy = min_t(size_t, txb->space, msg_data_left(msg));
315 
316 			_debug("add %zu", copy);
317 			if (!copy_from_iter_full(txb->data + txb->offset, copy,
318 						 &msg->msg_iter))
319 				goto efault;
320 			_debug("added");
321 			txb->space -= copy;
322 			txb->len += copy;
323 			txb->offset += copy;
324 			copied += copy;
325 			if (call->tx_total_len != -1)
326 				call->tx_total_len -= copy;
327 		}
328 
329 		/* check for the far side aborting the call or a network error
330 		 * occurring */
331 		if (rxrpc_call_is_complete(call))
332 			goto call_terminated;
333 
334 		/* add the packet to the send queue if it's now full */
335 		if (!txb->space ||
336 		    (msg_data_left(msg) == 0 && !more)) {
337 			if (msg_data_left(msg) == 0 && !more) {
338 				txb->wire.flags |= RXRPC_LAST_PACKET;
339 				__set_bit(RXRPC_TXBUF_LAST, &txb->flags);
340 			}
341 			else if (call->tx_top - call->acks_hard_ack <
342 				 call->tx_winsize)
343 				txb->wire.flags |= RXRPC_MORE_PACKETS;
344 
345 			ret = call->security->secure_packet(call, txb);
346 			if (ret < 0)
347 				goto out;
348 
349 			rxrpc_queue_packet(rx, call, txb, notify_end_tx);
350 			txb = NULL;
351 		}
352 	} while (msg_data_left(msg) > 0);
353 
354 success:
355 	ret = copied;
356 	if (rxrpc_call_is_complete(call) &&
357 	    call->error < 0)
358 		ret = call->error;
359 out:
360 	call->tx_pending = txb;
361 	_leave(" = %d", ret);
362 	return ret;
363 
364 call_terminated:
365 	rxrpc_put_txbuf(txb, rxrpc_txbuf_put_send_aborted);
366 	_leave(" = %d", call->error);
367 	return call->error;
368 
369 maybe_error:
370 	if (copied)
371 		goto success;
372 	goto out;
373 
374 efault:
375 	ret = -EFAULT;
376 	goto out;
377 
378 wait_for_space:
379 	ret = -EAGAIN;
380 	if (msg->msg_flags & MSG_DONTWAIT)
381 		goto maybe_error;
382 	mutex_unlock(&call->user_mutex);
383 	*_dropped_lock = true;
384 	ret = rxrpc_wait_for_tx_window(rx, call, &timeo,
385 				       msg->msg_flags & MSG_WAITALL);
386 	if (ret < 0)
387 		goto maybe_error;
388 	if (call->interruptibility == RXRPC_INTERRUPTIBLE) {
389 		if (mutex_lock_interruptible(&call->user_mutex) < 0) {
390 			ret = sock_intr_errno(timeo);
391 			goto maybe_error;
392 		}
393 	} else {
394 		mutex_lock(&call->user_mutex);
395 	}
396 	*_dropped_lock = false;
397 	goto reload;
398 }
399 
400 /*
401  * extract control messages from the sendmsg() control buffer
402  */
403 static int rxrpc_sendmsg_cmsg(struct msghdr *msg, struct rxrpc_send_params *p)
404 {
405 	struct cmsghdr *cmsg;
406 	bool got_user_ID = false;
407 	int len;
408 
409 	if (msg->msg_controllen == 0)
410 		return -EINVAL;
411 
412 	for_each_cmsghdr(cmsg, msg) {
413 		if (!CMSG_OK(msg, cmsg))
414 			return -EINVAL;
415 
416 		len = cmsg->cmsg_len - sizeof(struct cmsghdr);
417 		_debug("CMSG %d, %d, %d",
418 		       cmsg->cmsg_level, cmsg->cmsg_type, len);
419 
420 		if (cmsg->cmsg_level != SOL_RXRPC)
421 			continue;
422 
423 		switch (cmsg->cmsg_type) {
424 		case RXRPC_USER_CALL_ID:
425 			if (msg->msg_flags & MSG_CMSG_COMPAT) {
426 				if (len != sizeof(u32))
427 					return -EINVAL;
428 				p->call.user_call_ID = *(u32 *)CMSG_DATA(cmsg);
429 			} else {
430 				if (len != sizeof(unsigned long))
431 					return -EINVAL;
432 				p->call.user_call_ID = *(unsigned long *)
433 					CMSG_DATA(cmsg);
434 			}
435 			got_user_ID = true;
436 			break;
437 
438 		case RXRPC_ABORT:
439 			if (p->command != RXRPC_CMD_SEND_DATA)
440 				return -EINVAL;
441 			p->command = RXRPC_CMD_SEND_ABORT;
442 			if (len != sizeof(p->abort_code))
443 				return -EINVAL;
444 			p->abort_code = *(unsigned int *)CMSG_DATA(cmsg);
445 			if (p->abort_code == 0)
446 				return -EINVAL;
447 			break;
448 
449 		case RXRPC_CHARGE_ACCEPT:
450 			if (p->command != RXRPC_CMD_SEND_DATA)
451 				return -EINVAL;
452 			p->command = RXRPC_CMD_CHARGE_ACCEPT;
453 			if (len != 0)
454 				return -EINVAL;
455 			break;
456 
457 		case RXRPC_EXCLUSIVE_CALL:
458 			p->exclusive = true;
459 			if (len != 0)
460 				return -EINVAL;
461 			break;
462 
463 		case RXRPC_UPGRADE_SERVICE:
464 			p->upgrade = true;
465 			if (len != 0)
466 				return -EINVAL;
467 			break;
468 
469 		case RXRPC_TX_LENGTH:
470 			if (p->call.tx_total_len != -1 || len != sizeof(__s64))
471 				return -EINVAL;
472 			p->call.tx_total_len = *(__s64 *)CMSG_DATA(cmsg);
473 			if (p->call.tx_total_len < 0)
474 				return -EINVAL;
475 			break;
476 
477 		case RXRPC_SET_CALL_TIMEOUT:
478 			if (len & 3 || len < 4 || len > 12)
479 				return -EINVAL;
480 			memcpy(&p->call.timeouts, CMSG_DATA(cmsg), len);
481 			p->call.nr_timeouts = len / 4;
482 			if (p->call.timeouts.hard > INT_MAX / HZ)
483 				return -ERANGE;
484 			if (p->call.nr_timeouts >= 2 && p->call.timeouts.idle > 60 * 60 * 1000)
485 				return -ERANGE;
486 			if (p->call.nr_timeouts >= 3 && p->call.timeouts.normal > 60 * 60 * 1000)
487 				return -ERANGE;
488 			break;
489 
490 		default:
491 			return -EINVAL;
492 		}
493 	}
494 
495 	if (!got_user_ID)
496 		return -EINVAL;
497 	if (p->call.tx_total_len != -1 && p->command != RXRPC_CMD_SEND_DATA)
498 		return -EINVAL;
499 	_leave(" = 0");
500 	return 0;
501 }
502 
503 /*
504  * Create a new client call for sendmsg().
505  * - Called with the socket lock held, which it must release.
506  * - If it returns a call, the call's lock will need releasing by the caller.
507  */
508 static struct rxrpc_call *
509 rxrpc_new_client_call_for_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg,
510 				  struct rxrpc_send_params *p)
511 	__releases(&rx->sk.sk_lock.slock)
512 	__acquires(&call->user_mutex)
513 {
514 	struct rxrpc_conn_parameters cp;
515 	struct rxrpc_call *call;
516 	struct key *key;
517 
518 	DECLARE_SOCKADDR(struct sockaddr_rxrpc *, srx, msg->msg_name);
519 
520 	_enter("");
521 
522 	if (!msg->msg_name) {
523 		release_sock(&rx->sk);
524 		return ERR_PTR(-EDESTADDRREQ);
525 	}
526 
527 	key = rx->key;
528 	if (key && !rx->key->payload.data[0])
529 		key = NULL;
530 
531 	memset(&cp, 0, sizeof(cp));
532 	cp.local		= rx->local;
533 	cp.key			= rx->key;
534 	cp.security_level	= rx->min_sec_level;
535 	cp.exclusive		= rx->exclusive | p->exclusive;
536 	cp.upgrade		= p->upgrade;
537 	cp.service_id		= srx->srx_service;
538 	call = rxrpc_new_client_call(rx, &cp, srx, &p->call, GFP_KERNEL,
539 				     atomic_inc_return(&rxrpc_debug_id));
540 	/* The socket is now unlocked */
541 
542 	_leave(" = %p\n", call);
543 	return call;
544 }
545 
546 /*
547  * send a message forming part of a client call through an RxRPC socket
548  * - caller holds the socket locked
549  * - the socket may be either a client socket or a server socket
550  */
551 int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
552 	__releases(&rx->sk.sk_lock.slock)
553 {
554 	struct rxrpc_call *call;
555 	unsigned long now, j;
556 	bool dropped_lock = false;
557 	int ret;
558 
559 	struct rxrpc_send_params p = {
560 		.call.tx_total_len	= -1,
561 		.call.user_call_ID	= 0,
562 		.call.nr_timeouts	= 0,
563 		.call.interruptibility	= RXRPC_INTERRUPTIBLE,
564 		.abort_code		= 0,
565 		.command		= RXRPC_CMD_SEND_DATA,
566 		.exclusive		= false,
567 		.upgrade		= false,
568 	};
569 
570 	_enter("");
571 
572 	ret = rxrpc_sendmsg_cmsg(msg, &p);
573 	if (ret < 0)
574 		goto error_release_sock;
575 
576 	if (p.command == RXRPC_CMD_CHARGE_ACCEPT) {
577 		ret = -EINVAL;
578 		if (rx->sk.sk_state != RXRPC_SERVER_LISTENING)
579 			goto error_release_sock;
580 		ret = rxrpc_user_charge_accept(rx, p.call.user_call_ID);
581 		goto error_release_sock;
582 	}
583 
584 	call = rxrpc_find_call_by_user_ID(rx, p.call.user_call_ID);
585 	if (!call) {
586 		ret = -EBADSLT;
587 		if (p.command != RXRPC_CMD_SEND_DATA)
588 			goto error_release_sock;
589 		call = rxrpc_new_client_call_for_sendmsg(rx, msg, &p);
590 		/* The socket is now unlocked... */
591 		if (IS_ERR(call))
592 			return PTR_ERR(call);
593 		/* ... and we have the call lock. */
594 		ret = 0;
595 		if (rxrpc_call_is_complete(call))
596 			goto out_put_unlock;
597 	} else {
598 		switch (rxrpc_call_state(call)) {
599 		case RXRPC_CALL_UNINITIALISED:
600 		case RXRPC_CALL_CLIENT_AWAIT_CONN:
601 		case RXRPC_CALL_SERVER_PREALLOC:
602 		case RXRPC_CALL_SERVER_SECURING:
603 			rxrpc_put_call(call, rxrpc_call_put_sendmsg);
604 			ret = -EBUSY;
605 			goto error_release_sock;
606 		default:
607 			break;
608 		}
609 
610 		ret = mutex_lock_interruptible(&call->user_mutex);
611 		release_sock(&rx->sk);
612 		if (ret < 0) {
613 			ret = -ERESTARTSYS;
614 			goto error_put;
615 		}
616 
617 		if (p.call.tx_total_len != -1) {
618 			ret = -EINVAL;
619 			if (call->tx_total_len != -1 ||
620 			    call->tx_pending ||
621 			    call->tx_top != 0)
622 				goto out_put_unlock;
623 			call->tx_total_len = p.call.tx_total_len;
624 		}
625 	}
626 
627 	switch (p.call.nr_timeouts) {
628 	case 3:
629 		j = msecs_to_jiffies(p.call.timeouts.normal);
630 		if (p.call.timeouts.normal > 0 && j == 0)
631 			j = 1;
632 		WRITE_ONCE(call->next_rx_timo, j);
633 		fallthrough;
634 	case 2:
635 		j = msecs_to_jiffies(p.call.timeouts.idle);
636 		if (p.call.timeouts.idle > 0 && j == 0)
637 			j = 1;
638 		WRITE_ONCE(call->next_req_timo, j);
639 		fallthrough;
640 	case 1:
641 		if (p.call.timeouts.hard > 0) {
642 			j = msecs_to_jiffies(p.call.timeouts.hard);
643 			now = jiffies;
644 			j += now;
645 			WRITE_ONCE(call->expect_term_by, j);
646 			rxrpc_reduce_call_timer(call, j, now,
647 						rxrpc_timer_set_for_hard);
648 		}
649 		break;
650 	}
651 
652 	if (rxrpc_call_is_complete(call)) {
653 		/* it's too late for this call */
654 		ret = -ESHUTDOWN;
655 	} else if (p.command == RXRPC_CMD_SEND_ABORT) {
656 		rxrpc_propose_abort(call, p.abort_code, -ECONNABORTED,
657 				    rxrpc_abort_call_sendmsg);
658 		ret = 0;
659 	} else if (p.command != RXRPC_CMD_SEND_DATA) {
660 		ret = -EINVAL;
661 	} else {
662 		ret = rxrpc_send_data(rx, call, msg, len, NULL, &dropped_lock);
663 	}
664 
665 out_put_unlock:
666 	if (!dropped_lock)
667 		mutex_unlock(&call->user_mutex);
668 error_put:
669 	rxrpc_put_call(call, rxrpc_call_put_sendmsg);
670 	_leave(" = %d", ret);
671 	return ret;
672 
673 error_release_sock:
674 	release_sock(&rx->sk);
675 	return ret;
676 }
677 
678 /**
679  * rxrpc_kernel_send_data - Allow a kernel service to send data on a call
680  * @sock: The socket the call is on
681  * @call: The call to send data through
682  * @msg: The data to send
683  * @len: The amount of data to send
684  * @notify_end_tx: Notification that the last packet is queued.
685  *
686  * Allow a kernel service to send data on a call.  The call must be in an state
687  * appropriate to sending data.  No control data should be supplied in @msg,
688  * nor should an address be supplied.  MSG_MORE should be flagged if there's
689  * more data to come, otherwise this data will end the transmission phase.
690  */
691 int rxrpc_kernel_send_data(struct socket *sock, struct rxrpc_call *call,
692 			   struct msghdr *msg, size_t len,
693 			   rxrpc_notify_end_tx_t notify_end_tx)
694 {
695 	bool dropped_lock = false;
696 	int ret;
697 
698 	_enter("{%d},", call->debug_id);
699 
700 	ASSERTCMP(msg->msg_name, ==, NULL);
701 	ASSERTCMP(msg->msg_control, ==, NULL);
702 
703 	mutex_lock(&call->user_mutex);
704 
705 	ret = rxrpc_send_data(rxrpc_sk(sock->sk), call, msg, len,
706 			      notify_end_tx, &dropped_lock);
707 	if (ret == -ESHUTDOWN)
708 		ret = call->error;
709 
710 	if (!dropped_lock)
711 		mutex_unlock(&call->user_mutex);
712 	_leave(" = %d", ret);
713 	return ret;
714 }
715 EXPORT_SYMBOL(rxrpc_kernel_send_data);
716 
717 /**
718  * rxrpc_kernel_abort_call - Allow a kernel service to abort a call
719  * @sock: The socket the call is on
720  * @call: The call to be aborted
721  * @abort_code: The abort code to stick into the ABORT packet
722  * @error: Local error value
723  * @why: Indication as to why.
724  *
725  * Allow a kernel service to abort a call, if it's still in an abortable state
726  * and return true if the call was aborted, false if it was already complete.
727  */
728 bool rxrpc_kernel_abort_call(struct socket *sock, struct rxrpc_call *call,
729 			     u32 abort_code, int error, enum rxrpc_abort_reason why)
730 {
731 	bool aborted;
732 
733 	_enter("{%d},%d,%d,%u", call->debug_id, abort_code, error, why);
734 
735 	mutex_lock(&call->user_mutex);
736 	aborted = rxrpc_propose_abort(call, abort_code, error, why);
737 	mutex_unlock(&call->user_mutex);
738 	return aborted;
739 }
740 EXPORT_SYMBOL(rxrpc_kernel_abort_call);
741 
742 /**
743  * rxrpc_kernel_set_tx_length - Set the total Tx length on a call
744  * @sock: The socket the call is on
745  * @call: The call to be informed
746  * @tx_total_len: The amount of data to be transmitted for this call
747  *
748  * Allow a kernel service to set the total transmit length on a call.  This
749  * allows buffer-to-packet encrypt-and-copy to be performed.
750  *
751  * This function is primarily for use for setting the reply length since the
752  * request length can be set when beginning the call.
753  */
754 void rxrpc_kernel_set_tx_length(struct socket *sock, struct rxrpc_call *call,
755 				s64 tx_total_len)
756 {
757 	WARN_ON(call->tx_total_len != -1);
758 	call->tx_total_len = tx_total_len;
759 }
760 EXPORT_SYMBOL(rxrpc_kernel_set_tx_length);
761