xref: /openbmc/linux/net/tipc/socket.c (revision 9d749629)
1 /*
2  * net/tipc/socket.c: TIPC socket API
3  *
4  * Copyright (c) 2001-2007, 2012 Ericsson AB
5  * Copyright (c) 2004-2008, 2010-2012, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36 
37 #include "core.h"
38 #include "port.h"
39 
40 #include <linux/export.h>
41 #include <net/sock.h>
42 
/* Pseudo socket states, used alongside the standard SS_* values; chosen
 * negative so they cannot collide with the socket_state enum range
 */
#define SS_LISTENING	-1	/* socket is listening */
#define SS_READY	-2	/* socket is connectionless */

/* Receive queue limit for a connection: (2 * flow-control window + 1)
 * maximum-size user messages, measured in true skb size
 */
#define CONN_OVERLOAD_LIMIT	((TIPC_FLOW_CONTROL_WIN * 2 + 1) * \
				SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE))
#define CONN_TIMEOUT_DEFAULT	8000	/* default connect timeout = 8s */

/**
 * struct tipc_sock - TIPC socket structure
 * @sk: embedded network socket; must be first so the tipc_sk() cast works
 * @p: associated TIPC port
 * @peer_name: port ID of the connection peer (valid once connected)
 * @conn_timeout: connection setup timeout, in ms
 */
struct tipc_sock {
	struct sock sk;
	struct tipc_port *p;
	struct tipc_portid peer_name;
	unsigned int conn_timeout;
};

/* Cast helpers; valid because 'sk' is the first member of struct tipc_sock */
#define tipc_sk(sk) ((struct tipc_sock *)(sk))
#define tipc_sk_port(sk) (tipc_sk(sk)->p)

/* True when a receive can complete without blocking: either data is
 * queued or the connection has been shut down
 */
#define tipc_rx_ready(sock) (!skb_queue_empty(&sock->sk->sk_receive_queue) || \
			(sock->state == SS_DISCONNECTING))

static int backlog_rcv(struct sock *sk, struct sk_buff *skb);
static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf);
static void wakeupdispatch(struct tipc_port *tport);
static void tipc_data_ready(struct sock *sk, int len);
static void tipc_write_space(struct sock *sk);

static const struct proto_ops packet_ops;
static const struct proto_ops stream_ops;
static const struct proto_ops msg_ops;

static struct proto tipc_proto;

static int sockets_enabled;
77 /*
78  * Revised TIPC socket locking policy:
79  *
80  * Most socket operations take the standard socket lock when they start
81  * and hold it until they finish (or until they need to sleep).  Acquiring
82  * this lock grants the owner exclusive access to the fields of the socket
83  * data structures, with the exception of the backlog queue.  A few socket
84  * operations can be done without taking the socket lock because they only
85  * read socket information that never changes during the life of the socket.
86  *
87  * Socket operations may acquire the lock for the associated TIPC port if they
88  * need to perform an operation on the port.  If any routine needs to acquire
89  * both the socket lock and the port lock it must take the socket lock first
90  * to avoid the risk of deadlock.
91  *
92  * The dispatcher handling incoming messages cannot grab the socket lock in
93  * the standard fashion, since it is invoked at the BH level and cannot block.
94  * Instead, it checks to see if the socket lock is currently owned by someone,
95  * and either handles the message itself or adds it to the socket's backlog
96  * queue; in the latter case the queued message is processed once the process
97  * owning the socket lock releases it.
98  *
99  * NOTE: Releasing the socket lock while an operation is sleeping overcomes
100  * the problem of a blocked socket operation preventing any other operations
101  * from occurring.  However, applications must be careful if they have
102  * multiple threads trying to send (or receive) on the same socket, as these
103  * operations might interfere with each other.  For example, doing a connect
104  * and a receive at the same time might allow the receive to consume the
105  * ACK message meant for the connect.  While additional work could be done
106  * to try and overcome this, it doesn't seem to be worthwhile at the present.
107  *
108  * NOTE: Releasing the socket lock while an operation is sleeping also ensures
109  * that another operation that must be performed in a non-blocking manner is
110  * not delayed for very long because the lock has already been taken.
111  *
112  * NOTE: This code assumes that certain fields of a port/socket pair are
113  * constant over its lifetime; such fields can be examined without taking
114  * the socket lock and/or port lock, and do not need to be re-read even
115  * after resuming processing after waiting.  These fields include:
116  *   - socket type
117  *   - pointer to socket sk structure (aka tipc_sock structure)
118  *   - pointer to port structure
119  *   - port reference
120  */
121 
122 /**
123  * advance_rx_queue - discard first buffer in socket receive queue
124  *
125  * Caller must hold socket lock
126  */
127 static void advance_rx_queue(struct sock *sk)
128 {
129 	kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
130 }
131 
132 /**
133  * reject_rx_queue - reject all buffers in socket receive queue
134  *
135  * Caller must hold socket lock
136  */
137 static void reject_rx_queue(struct sock *sk)
138 {
139 	struct sk_buff *buf;
140 
141 	while ((buf = __skb_dequeue(&sk->sk_receive_queue)))
142 		tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
143 }
144 
/**
 * tipc_create - create a TIPC socket
 * @net: network namespace (must be default network)
 * @sock: pre-allocated socket structure
 * @protocol: protocol indicator (must be 0)
 * @kern: caused by kernel or by userspace?
 *
 * This routine creates additional data structures used by the TIPC socket,
 * initializes them, and links them together.
 *
 * Returns 0 on success, errno otherwise
 */
static int tipc_create(struct net *net, struct socket *sock, int protocol,
		       int kern)
{
	const struct proto_ops *ops;
	socket_state state;
	struct sock *sk;
	struct tipc_port *tp_ptr;

	/* Validate arguments */
	if (unlikely(protocol != 0))
		return -EPROTONOSUPPORT;

	/* Select ops vector and initial state implied by the socket type;
	 * DGRAM/RDM sockets are connectionless (SS_READY), the others
	 * start unconnected
	 */
	switch (sock->type) {
	case SOCK_STREAM:
		ops = &stream_ops;
		state = SS_UNCONNECTED;
		break;
	case SOCK_SEQPACKET:
		ops = &packet_ops;
		state = SS_UNCONNECTED;
		break;
	case SOCK_DGRAM:
	case SOCK_RDM:
		ops = &msg_ops;
		state = SS_READY;
		break;
	default:
		return -EPROTOTYPE;
	}

	/* Allocate socket's protocol area */
	sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
	if (sk == NULL)
		return -ENOMEM;

	/* Allocate TIPC port for socket to use */
	tp_ptr = tipc_createport_raw(sk, &dispatch, &wakeupdispatch,
				     TIPC_LOW_IMPORTANCE);
	if (unlikely(!tp_ptr)) {
		sk_free(sk);
		return -ENOMEM;
	}

	/* Finish initializing socket data structures */
	sock->ops = ops;
	sock->state = state;

	sock_init_data(sock, sk);
	sk->sk_backlog_rcv = backlog_rcv;
	sk->sk_data_ready = tipc_data_ready;
	sk->sk_write_space = tipc_write_space;
	tipc_sk(sk)->p = tp_ptr;
	tipc_sk(sk)->conn_timeout = CONN_TIMEOUT_DEFAULT;

	/* The port lock is held on return from tipc_createport_raw();
	 * release it now that the socket side is fully initialized
	 */
	spin_unlock_bh(tp_ptr->lock);

	if (sock->state == SS_READY) {
		/* Connectionless ports are marked "unreturnable";
		 * DGRAM ports are additionally marked "unreliable"
		 */
		tipc_set_portunreturnable(tp_ptr->ref, 1);
		if (sock->type == SOCK_DGRAM)
			tipc_set_portunreliable(tp_ptr->ref, 1);
	}

	return 0;
}
221 
/**
 * release - destroy a TIPC socket
 * @sock: socket to destroy
 *
 * This routine cleans up any messages that are still queued on the socket.
 * For DGRAM and RDM socket types, all queued messages are rejected.
 * For SEQPACKET and STREAM socket types, the first message is rejected
 * and any others are discarded.  (If the first message on a STREAM socket
 * is partially-read, it is discarded and the next one is rejected instead.)
 *
 * NOTE: Rejected messages are not necessarily returned to the sender!  They
 * are returned or discarded according to the "destination droppable" setting
 * specified for the message by the sender.
 *
 * Returns 0 on success, errno otherwise
 */
static int release(struct socket *sock)
{
	struct sock *sk = sock->sk;
	struct tipc_port *tport;
	struct sk_buff *buf;
	int res;

	/*
	 * Exit if socket isn't fully initialized (occurs when a failed accept()
	 * releases a pre-allocated child socket that was never used)
	 */
	if (sk == NULL)
		return 0;

	tport = tipc_sk_port(sk);
	lock_sock(sk);

	/*
	 * Reject all unreceived messages, except on an active connection
	 * (which disconnects locally & sends a 'FIN+' to peer)
	 */
	while (sock->state != SS_DISCONNECTING) {
		buf = __skb_dequeue(&sk->sk_receive_queue);
		if (buf == NULL)
			break;
		/* A non-zero handle marks a partially-read buffer; it can
		 * simply be freed rather than rejected back to the sender
		 */
		if (TIPC_SKB_CB(buf)->handle != 0)
			kfree_skb(buf);
		else {
			/* Dropping an unread message on a connection:
			 * disconnect first so the local side is torn down
			 * before the rejection is issued
			 */
			if ((sock->state == SS_CONNECTING) ||
			    (sock->state == SS_CONNECTED)) {
				sock->state = SS_DISCONNECTING;
				tipc_disconnect(tport->ref);
			}
			tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
		}
	}

	/*
	 * Delete TIPC port; this ensures no more messages are queued
	 * (also disconnects an active connection & sends a 'FIN-' to peer)
	 */
	res = tipc_deleteport(tport->ref);

	/* Discard any remaining (connection-based) messages in receive queue */
	__skb_queue_purge(&sk->sk_receive_queue);

	/* Reject any messages that accumulated in backlog queue; the
	 * SS_DISCONNECTING state makes backlog_rcv() bounce them as the
	 * lock is released
	 */
	sock->state = SS_DISCONNECTING;
	release_sock(sk);

	sock_put(sk);
	sock->sk = NULL;

	return res;
}
293 
294 /**
295  * bind - associate or disassocate TIPC name(s) with a socket
296  * @sock: socket structure
297  * @uaddr: socket address describing name(s) and desired operation
298  * @uaddr_len: size of socket address data structure
299  *
300  * Name and name sequence binding is indicated using a positive scope value;
301  * a negative scope value unbinds the specified name.  Specifying no name
302  * (i.e. a socket address length of 0) unbinds all names from the socket.
303  *
304  * Returns 0 on success, errno otherwise
305  *
306  * NOTE: This routine doesn't need to take the socket lock since it doesn't
307  *       access any non-constant socket information.
308  */
309 static int bind(struct socket *sock, struct sockaddr *uaddr, int uaddr_len)
310 {
311 	struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
312 	u32 portref = tipc_sk_port(sock->sk)->ref;
313 
314 	if (unlikely(!uaddr_len))
315 		return tipc_withdraw(portref, 0, NULL);
316 
317 	if (uaddr_len < sizeof(struct sockaddr_tipc))
318 		return -EINVAL;
319 	if (addr->family != AF_TIPC)
320 		return -EAFNOSUPPORT;
321 
322 	if (addr->addrtype == TIPC_ADDR_NAME)
323 		addr->addr.nameseq.upper = addr->addr.nameseq.lower;
324 	else if (addr->addrtype != TIPC_ADDR_NAMESEQ)
325 		return -EAFNOSUPPORT;
326 
327 	if (addr->addr.nameseq.type < TIPC_RESERVED_TYPES)
328 		return -EACCES;
329 
330 	return (addr->scope > 0) ?
331 		tipc_publish(portref, addr->scope, &addr->addr.nameseq) :
332 		tipc_withdraw(portref, -addr->scope, &addr->addr.nameseq);
333 }
334 
335 /**
336  * get_name - get port ID of socket or peer socket
337  * @sock: socket structure
338  * @uaddr: area for returned socket address
339  * @uaddr_len: area for returned length of socket address
340  * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
341  *
342  * Returns 0 on success, errno otherwise
343  *
344  * NOTE: This routine doesn't need to take the socket lock since it only
345  *       accesses socket information that is unchanging (or which changes in
346  *       a completely predictable manner).
347  */
348 static int get_name(struct socket *sock, struct sockaddr *uaddr,
349 		    int *uaddr_len, int peer)
350 {
351 	struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
352 	struct tipc_sock *tsock = tipc_sk(sock->sk);
353 
354 	memset(addr, 0, sizeof(*addr));
355 	if (peer) {
356 		if ((sock->state != SS_CONNECTED) &&
357 			((peer != 2) || (sock->state != SS_DISCONNECTING)))
358 			return -ENOTCONN;
359 		addr->addr.id.ref = tsock->peer_name.ref;
360 		addr->addr.id.node = tsock->peer_name.node;
361 	} else {
362 		addr->addr.id.ref = tsock->p->ref;
363 		addr->addr.id.node = tipc_own_addr;
364 	}
365 
366 	*uaddr_len = sizeof(*addr);
367 	addr->addrtype = TIPC_ADDR_ID;
368 	addr->family = AF_TIPC;
369 	addr->scope = 0;
370 	addr->addr.name.domain = 0;
371 
372 	return 0;
373 }
374 
/**
 * poll - read and possibly block on pollmask
 * @file: file structure associated with the socket
 * @sock: socket for which to calculate the poll bits
 * @wait: poll table, used to register for wakeup callbacks
 *
 * Returns pollmask value
 *
 * COMMENTARY:
 * It appears that the usual socket locking mechanisms are not useful here
 * since the pollmask info is potentially out-of-date the moment this routine
 * exits.  TCP and other protocols seem to rely on higher level poll routines
 * to handle any preventable race conditions, so TIPC will do the same ...
 *
 * TIPC sets the returned events as follows:
 *
 * socket state		flags set
 * ------------		---------
 * unconnected		no read flags
 *			POLLOUT if port is not congested
 *
 * connecting		POLLIN/POLLRDNORM if ACK/NACK in rx queue
 *			no write flags
 *
 * connected		POLLIN/POLLRDNORM if data in rx queue
 *			POLLOUT if port is not congested
 *
 * disconnecting	POLLIN/POLLRDNORM/POLLHUP
 *			no write flags
 *
 * listening		POLLIN if SYN in rx queue
 *			no write flags
 *
 * ready		POLLIN/POLLRDNORM if data in rx queue
 * [connectionless]	POLLOUT (since port cannot be congested)
 *
 * IMPORTANT: The fact that a read or write operation is indicated does NOT
 * imply that the operation will succeed, merely that it should be performed
 * and will not block.
 */
static unsigned int poll(struct file *file, struct socket *sock,
			 poll_table *wait)
{
	struct sock *sk = sock->sk;
	u32 mask = 0;

	sock_poll_wait(file, sk_sleep(sk), wait);

	/* Cast needed: SS_LISTENING/SS_READY are negative pseudo-states
	 * outside the socket_state enum
	 */
	switch ((int)sock->state) {
	case SS_UNCONNECTED:
		if (!tipc_sk_port(sk)->congested)
			mask |= POLLOUT;
		break;
	case SS_READY:
	case SS_CONNECTED:
		if (!tipc_sk_port(sk)->congested)
			mask |= POLLOUT;
		/* fall through: readable states also check the rx queue */
	case SS_CONNECTING:
	case SS_LISTENING:
		if (!skb_queue_empty(&sk->sk_receive_queue))
			mask |= (POLLIN | POLLRDNORM);
		break;
	case SS_DISCONNECTING:
		mask = (POLLIN | POLLRDNORM | POLLHUP);
		break;
	}

	return mask;
}
445 
446 /**
447  * dest_name_check - verify user is permitted to send to specified port name
448  * @dest: destination address
449  * @m: descriptor for message to be sent
450  *
451  * Prevents restricted configuration commands from being issued by
452  * unauthorized users.
453  *
454  * Returns 0 if permission is granted, otherwise errno
455  */
456 static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m)
457 {
458 	struct tipc_cfg_msg_hdr hdr;
459 
460 	if (likely(dest->addr.name.name.type >= TIPC_RESERVED_TYPES))
461 		return 0;
462 	if (likely(dest->addr.name.name.type == TIPC_TOP_SRV))
463 		return 0;
464 	if (likely(dest->addr.name.name.type != TIPC_CFG_SRV))
465 		return -EACCES;
466 
467 	if (!m->msg_iovlen || (m->msg_iov[0].iov_len < sizeof(hdr)))
468 		return -EMSGSIZE;
469 	if (copy_from_user(&hdr, m->msg_iov[0].iov_base, sizeof(hdr)))
470 		return -EFAULT;
471 	if ((ntohs(hdr.tcm_type) & 0xC000) && (!capable(CAP_NET_ADMIN)))
472 		return -EACCES;
473 
474 	return 0;
475 }
476 
/**
 * send_msg - send message in connectionless manner
 * @iocb: if NULL, indicates that socket lock is already held
 * @sock: socket structure
 * @m: message to send
 * @total_len: length of message
 *
 * Message must have a destination specified explicitly.
 * Used for SOCK_RDM and SOCK_DGRAM messages,
 * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
 * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
 *
 * Returns the number of bytes sent on success, or errno otherwise
 */
static int send_msg(struct kiocb *iocb, struct socket *sock,
		    struct msghdr *m, size_t total_len)
{
	struct sock *sk = sock->sk;
	struct tipc_port *tport = tipc_sk_port(sk);
	struct sockaddr_tipc *dest = (struct sockaddr_tipc *)m->msg_name;
	int needs_conn;
	long timeout_val;
	int res = -EINVAL;

	if (unlikely(!dest))
		return -EDESTADDRREQ;
	if (unlikely((m->msg_namelen < sizeof(*dest)) ||
		     (dest->family != AF_TIPC)))
		return -EINVAL;
	if (total_len > TIPC_MAX_USER_MSG_SIZE)
		return -EMSGSIZE;

	if (iocb)
		lock_sock(sk);

	/* A non-SS_READY socket sending here is performing implied
	 * connection establishment (the 'SYN' of a connect-by-send)
	 */
	needs_conn = (sock->state != SS_READY);
	if (unlikely(needs_conn)) {
		if (sock->state == SS_LISTENING) {
			res = -EPIPE;
			goto exit;
		}
		if (sock->state != SS_UNCONNECTED) {
			res = -EISCONN;
			goto exit;
		}
		if ((tport->published) ||
		    ((sock->type == SOCK_STREAM) && (total_len != 0))) {
			res = -EOPNOTSUPP;
			goto exit;
		}
		/* Remember the destination name for later ancillary data */
		if (dest->addrtype == TIPC_ADDR_NAME) {
			tport->conn_type = dest->addr.name.name.type;
			tport->conn_instance = dest->addr.name.name.instance;
		}

		/* Abort any pending connection attempts (very unlikely) */
		reject_rx_queue(sk);
	}

	timeout_val = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);

	/* Retry loop: on link congestion (-ELINKCONG), sleep until the
	 * port is no longer congested or the send timeout expires
	 */
	do {
		if (dest->addrtype == TIPC_ADDR_NAME) {
			res = dest_name_check(dest, m);
			if (res)
				break;
			res = tipc_send2name(tport->ref,
					     &dest->addr.name.name,
					     dest->addr.name.domain,
					     m->msg_iovlen,
					     m->msg_iov,
					     total_len);
		} else if (dest->addrtype == TIPC_ADDR_ID) {
			res = tipc_send2port(tport->ref,
					     &dest->addr.id,
					     m->msg_iovlen,
					     m->msg_iov,
					     total_len);
		} else if (dest->addrtype == TIPC_ADDR_MCAST) {
			/* Multicast cannot establish a connection */
			if (needs_conn) {
				res = -EOPNOTSUPP;
				break;
			}
			res = dest_name_check(dest, m);
			if (res)
				break;
			res = tipc_multicast(tport->ref,
					     &dest->addr.nameseq,
					     m->msg_iovlen,
					     m->msg_iov,
					     total_len);
		}
		if (likely(res != -ELINKCONG)) {
			/* A successful implied-connect SYN moves the socket
			 * to SS_CONNECTING; the ACK completes it elsewhere
			 */
			if (needs_conn && (res >= 0))
				sock->state = SS_CONNECTING;
			break;
		}
		if (timeout_val <= 0L) {
			res = timeout_val ? timeout_val : -EWOULDBLOCK;
			break;
		}
		/* Drop the socket lock while sleeping so other socket
		 * operations (and the BH dispatcher) are not blocked
		 */
		release_sock(sk);
		timeout_val = wait_event_interruptible_timeout(*sk_sleep(sk),
					       !tport->congested, timeout_val);
		lock_sock(sk);
	} while (1);

exit:
	if (iocb)
		release_sock(sk);
	return res;
}
589 
/**
 * send_packet - send a connection-oriented message
 * @iocb: if NULL, indicates that socket lock is already held
 * @sock: socket structure
 * @m: message to send
 * @total_len: length of message
 *
 * Used for SOCK_SEQPACKET messages and SOCK_STREAM data.
 *
 * Returns the number of bytes sent on success, or errno otherwise
 */
static int send_packet(struct kiocb *iocb, struct socket *sock,
		       struct msghdr *m, size_t total_len)
{
	struct sock *sk = sock->sk;
	struct tipc_port *tport = tipc_sk_port(sk);
	struct sockaddr_tipc *dest = (struct sockaddr_tipc *)m->msg_name;
	long timeout_val;
	int res;

	/* Handle implied connection establishment: an explicit destination
	 * on a connection-oriented socket is really a connect-by-send
	 */
	if (unlikely(dest))
		return send_msg(iocb, sock, m, total_len);

	if (total_len > TIPC_MAX_USER_MSG_SIZE)
		return -EMSGSIZE;

	if (iocb)
		lock_sock(sk);

	timeout_val = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);

	/* Retry loop: on link congestion (-ELINKCONG), sleep until the
	 * port is uncongested/disconnected or the send timeout expires
	 */
	do {
		/* Re-check state on each pass: it may change while asleep */
		if (unlikely(sock->state != SS_CONNECTED)) {
			if (sock->state == SS_DISCONNECTING)
				res = -EPIPE;
			else
				res = -ENOTCONN;
			break;
		}

		res = tipc_send(tport->ref, m->msg_iovlen, m->msg_iov,
				total_len);
		if (likely(res != -ELINKCONG))
			break;
		if (timeout_val <= 0L) {
			res = timeout_val ? timeout_val : -EWOULDBLOCK;
			break;
		}
		/* Drop the socket lock while sleeping so other socket
		 * operations (and the BH dispatcher) are not blocked
		 */
		release_sock(sk);
		timeout_val = wait_event_interruptible_timeout(*sk_sleep(sk),
			(!tport->congested || !tport->connected), timeout_val);
		lock_sock(sk);
	} while (1);

	if (iocb)
		release_sock(sk);
	return res;
}
649 
/**
 * send_stream - send stream-oriented data
 * @iocb: (unused)
 * @sock: socket structure
 * @m: data to send
 * @total_len: total length of data to be sent
 *
 * Used for SOCK_STREAM data.
 *
 * Returns the number of bytes sent on success (or partial success),
 * or errno if no data sent
 */
static int send_stream(struct kiocb *iocb, struct socket *sock,
		       struct msghdr *m, size_t total_len)
{
	struct sock *sk = sock->sk;
	struct tipc_port *tport = tipc_sk_port(sk);
	struct msghdr my_msg;
	struct iovec my_iov;
	struct iovec *curr_iov;
	int curr_iovlen;
	char __user *curr_start;
	u32 hdr_size;
	int curr_left;
	int bytes_to_send;
	int bytes_sent;
	int res;

	lock_sock(sk);

	/* Handle special cases where there is no connection */
	if (unlikely(sock->state != SS_CONNECTED)) {
		/* Unconnected stream socket: delegate so the send is
		 * treated as implied connection establishment
		 */
		if (sock->state == SS_UNCONNECTED) {
			res = send_packet(NULL, sock, m, total_len);
			goto exit;
		} else if (sock->state == SS_DISCONNECTING) {
			res = -EPIPE;
			goto exit;
		} else {
			res = -ENOTCONN;
			goto exit;
		}
	}

	if (unlikely(m->msg_name)) {
		res = -EISCONN;
		goto exit;
	}

	/* Return value is an int, so cap the total at INT_MAX */
	if (total_len > (unsigned int)INT_MAX) {
		res = -EMSGSIZE;
		goto exit;
	}

	/*
	 * Send each iovec entry using one or more messages
	 *
	 * Note: This algorithm is good for the most likely case
	 * (i.e. one large iovec entry), but could be improved to pass sets
	 * of small iovec entries into send_packet().
	 */
	curr_iov = m->msg_iov;
	curr_iovlen = m->msg_iovlen;
	/* Build a private single-entry msghdr reused for every fragment */
	my_msg.msg_iov = &my_iov;
	my_msg.msg_iovlen = 1;
	my_msg.msg_flags = m->msg_flags;
	my_msg.msg_name = NULL;
	bytes_sent = 0;

	hdr_size = msg_hdr_sz(&tport->phdr);

	while (curr_iovlen--) {
		curr_start = curr_iov->iov_base;
		curr_left = curr_iov->iov_len;

		while (curr_left) {
			/* Fragment size: what fits in one packet after the
			 * header, bounded by the max user message size and
			 * by what remains of this iovec entry
			 */
			bytes_to_send = tport->max_pkt - hdr_size;
			if (bytes_to_send > TIPC_MAX_USER_MSG_SIZE)
				bytes_to_send = TIPC_MAX_USER_MSG_SIZE;
			if (curr_left < bytes_to_send)
				bytes_to_send = curr_left;
			my_iov.iov_base = curr_start;
			my_iov.iov_len = bytes_to_send;
			res = send_packet(NULL, sock, &my_msg, bytes_to_send);
			if (res < 0) {
				/* Report partial success if any data went out;
				 * only surface the error when nothing did
				 */
				if (bytes_sent)
					res = bytes_sent;
				goto exit;
			}
			curr_left -= bytes_to_send;
			curr_start += bytes_to_send;
			bytes_sent += bytes_to_send;
		}

		curr_iov++;
	}
	res = bytes_sent;
exit:
	release_sock(sk);
	return res;
}
751 
752 /**
753  * auto_connect - complete connection setup to a remote port
754  * @sock: socket structure
755  * @msg: peer's response message
756  *
757  * Returns 0 on success, errno otherwise
758  */
759 static int auto_connect(struct socket *sock, struct tipc_msg *msg)
760 {
761 	struct tipc_sock *tsock = tipc_sk(sock->sk);
762 	struct tipc_port *p_ptr;
763 
764 	tsock->peer_name.ref = msg_origport(msg);
765 	tsock->peer_name.node = msg_orignode(msg);
766 	p_ptr = tipc_port_deref(tsock->p->ref);
767 	if (!p_ptr)
768 		return -EINVAL;
769 
770 	__tipc_connect(tsock->p->ref, p_ptr, &tsock->peer_name);
771 
772 	if (msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE)
773 		return -EINVAL;
774 	msg_set_importance(&p_ptr->phdr, (u32)msg_importance(msg));
775 	sock->state = SS_CONNECTED;
776 	return 0;
777 }
778 
779 /**
780  * set_orig_addr - capture sender's address for received message
781  * @m: descriptor for message info
782  * @msg: received message header
783  *
784  * Note: Address is not captured if not requested by receiver.
785  */
786 static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
787 {
788 	struct sockaddr_tipc *addr = (struct sockaddr_tipc *)m->msg_name;
789 
790 	if (addr) {
791 		addr->family = AF_TIPC;
792 		addr->addrtype = TIPC_ADDR_ID;
793 		addr->addr.id.ref = msg_origport(msg);
794 		addr->addr.id.node = msg_orignode(msg);
795 		addr->addr.name.domain = 0;	/* could leave uninitialized */
796 		addr->scope = 0;		/* could leave uninitialized */
797 		m->msg_namelen = sizeof(struct sockaddr_tipc);
798 	}
799 }
800 
/**
 * anc_data_recv - optionally capture ancillary data for received message
 * @m: descriptor for message info
 * @msg: received message header
 * @tport: TIPC port associated with message
 *
 * Note: Ancillary data is not captured if not requested by receiver.
 *
 * Returns 0 if successful, otherwise errno
 */
static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
				struct tipc_port *tport)
{
	u32 anc_data[3];
	u32 err;
	u32 dest_type;
	int has_name;
	int res;

	/* Receiver supplied no control buffer -- nothing to capture */
	if (likely(m->msg_controllen == 0))
		return 0;

	/* Optionally capture errored message object(s) */
	err = msg ? msg_errcode(msg) : 0;
	if (unlikely(err)) {
		anc_data[0] = err;
		anc_data[1] = msg_data_sz(msg);
		/* TIPC_ERRINFO carries two u32 values (8 bytes) */
		res = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, anc_data);
		if (res)
			return res;
		/* Returned message payload, if any, goes in a second cmsg */
		if (anc_data[1]) {
			res = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, anc_data[1],
				       msg_data(msg));
			if (res)
				return res;
		}
	}

	/* Optionally capture message destination object as
	 * {type, lower instance, upper instance}
	 */
	dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
	switch (dest_type) {
	case TIPC_NAMED_MSG:
		/* Single name: upper bound of the range equals the lower */
		has_name = 1;
		anc_data[0] = msg_nametype(msg);
		anc_data[1] = msg_namelower(msg);
		anc_data[2] = msg_namelower(msg);
		break;
	case TIPC_MCAST_MSG:
		has_name = 1;
		anc_data[0] = msg_nametype(msg);
		anc_data[1] = msg_namelower(msg);
		anc_data[2] = msg_nameupper(msg);
		break;
	case TIPC_CONN_MSG:
		/* Name recorded at connection setup; absent if the
		 * connection was made by port ID (conn_type == 0)
		 */
		has_name = (tport->conn_type != 0);
		anc_data[0] = tport->conn_type;
		anc_data[1] = tport->conn_instance;
		anc_data[2] = tport->conn_instance;
		break;
	default:
		has_name = 0;
	}
	if (has_name) {
		/* TIPC_DESTNAME carries three u32 values (12 bytes) */
		res = put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, anc_data);
		if (res)
			return res;
	}

	return 0;
}
871 
/**
 * recv_msg - receive packet-oriented message
 * @iocb: (unused)
 * @m: descriptor for message info
 * @buf_len: total size of user buffer area
 * @flags: receive flags
 *
 * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
 * If the complete message doesn't fit in user area, truncate it.
 *
 * Returns size of returned message data, errno otherwise
 */
static int recv_msg(struct kiocb *iocb, struct socket *sock,
		    struct msghdr *m, size_t buf_len, int flags)
{
	struct sock *sk = sock->sk;
	struct tipc_port *tport = tipc_sk_port(sk);
	struct sk_buff *buf;
	struct tipc_msg *msg;
	long timeout;
	unsigned int sz;
	u32 err;
	int res;

	/* Catch invalid receive requests */
	if (unlikely(!buf_len))
		return -EINVAL;

	lock_sock(sk);

	if (unlikely(sock->state == SS_UNCONNECTED)) {
		res = -ENOTCONN;
		goto exit;
	}

	/* 0 = non-blocking, MAX_SCHEDULE_TIMEOUT = no timeout set */
	timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
restart:

	/* Look for a message in receive queue; wait if necessary */
	while (skb_queue_empty(&sk->sk_receive_queue)) {
		if (sock->state == SS_DISCONNECTING) {
			res = -ENOTCONN;
			goto exit;
		}
		if (timeout <= 0L) {
			res = timeout ? timeout : -EWOULDBLOCK;
			goto exit;
		}
		/* Sleep without the socket lock so senders and the
		 * dispatcher can make progress
		 */
		release_sock(sk);
		timeout = wait_event_interruptible_timeout(*sk_sleep(sk),
							   tipc_rx_ready(sock),
							   timeout);
		lock_sock(sk);
	}

	/* Look at first message in receive queue */
	buf = skb_peek(&sk->sk_receive_queue);
	msg = buf_msg(buf);
	sz = msg_data_sz(msg);
	err = msg_errcode(msg);

	/* Discard an empty non-errored message & try again */
	if ((!sz) && (!err)) {
		advance_rx_queue(sk);
		goto restart;
	}

	/* Capture sender's address (optional) */
	set_orig_addr(m, msg);

	/* Capture ancillary data (optional) */
	res = anc_data_recv(m, msg, tport);
	if (res)
		goto exit;

	/* Capture message data (if valid) & compute return value (always) */
	if (!err) {
		if (unlikely(buf_len < sz)) {
			sz = buf_len;
			m->msg_flags |= MSG_TRUNC;
		}
		res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg),
					      m->msg_iov, sz);
		if (res)
			goto exit;
		res = sz;
	} else {
		/* Errored message: return 0 for a normal shutdown, for a
		 * connectionless socket, or when the caller asked for the
		 * error details via ancillary data; -ECONNRESET otherwise
		 */
		if ((sock->state == SS_READY) ||
		    ((err == TIPC_CONN_SHUTDOWN) || m->msg_control))
			res = 0;
		else
			res = -ECONNRESET;
	}

	/* Consume received message (optional) */
	if (likely(!(flags & MSG_PEEK))) {
		/* Connection flow control: acknowledge consumed messages
		 * once a full flow-control window has accumulated
		 */
		if ((sock->state != SS_READY) &&
		    (++tport->conn_unacked >= TIPC_FLOW_CONTROL_WIN))
			tipc_acknowledge(tport->ref, tport->conn_unacked);
		advance_rx_queue(sk);
	}
exit:
	release_sock(sk);
	return res;
}
977 
978 /**
979  * recv_stream - receive stream-oriented data
980  * @iocb: (unused)
981  * @m: descriptor for message info
982  * @buf_len: total size of user buffer area
983  * @flags: receive flags
984  *
985  * Used for SOCK_STREAM messages only.  If not enough data is available
986  * will optionally wait for more; never truncates data.
987  *
988  * Returns size of returned message data, errno otherwise
989  */
990 static int recv_stream(struct kiocb *iocb, struct socket *sock,
991 		       struct msghdr *m, size_t buf_len, int flags)
992 {
993 	struct sock *sk = sock->sk;
994 	struct tipc_port *tport = tipc_sk_port(sk);
995 	struct sk_buff *buf;
996 	struct tipc_msg *msg;
997 	long timeout;
998 	unsigned int sz;
999 	int sz_to_copy, target, needed;
1000 	int sz_copied = 0;
1001 	u32 err;
1002 	int res = 0;
1003 
1004 	/* Catch invalid receive attempts */
1005 	if (unlikely(!buf_len))
1006 		return -EINVAL;
1007 
1008 	lock_sock(sk);
1009 
1010 	if (unlikely((sock->state == SS_UNCONNECTED) ||
1011 		     (sock->state == SS_CONNECTING))) {
1012 		res = -ENOTCONN;
1013 		goto exit;
1014 	}
1015 
1016 	target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
1017 	timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1018 
1019 restart:
1020 	/* Look for a message in receive queue; wait if necessary */
1021 	while (skb_queue_empty(&sk->sk_receive_queue)) {
1022 		if (sock->state == SS_DISCONNECTING) {
1023 			res = -ENOTCONN;
1024 			goto exit;
1025 		}
1026 		if (timeout <= 0L) {
1027 			res = timeout ? timeout : -EWOULDBLOCK;
1028 			goto exit;
1029 		}
1030 		release_sock(sk);
1031 		timeout = wait_event_interruptible_timeout(*sk_sleep(sk),
1032 							   tipc_rx_ready(sock),
1033 							   timeout);
1034 		lock_sock(sk);
1035 	}
1036 
1037 	/* Look at first message in receive queue */
1038 	buf = skb_peek(&sk->sk_receive_queue);
1039 	msg = buf_msg(buf);
1040 	sz = msg_data_sz(msg);
1041 	err = msg_errcode(msg);
1042 
1043 	/* Discard an empty non-errored message & try again */
1044 	if ((!sz) && (!err)) {
1045 		advance_rx_queue(sk);
1046 		goto restart;
1047 	}
1048 
1049 	/* Optionally capture sender's address & ancillary data of first msg */
1050 	if (sz_copied == 0) {
1051 		set_orig_addr(m, msg);
1052 		res = anc_data_recv(m, msg, tport);
1053 		if (res)
1054 			goto exit;
1055 	}
1056 
1057 	/* Capture message data (if valid) & compute return value (always) */
1058 	if (!err) {
1059 		u32 offset = (u32)(unsigned long)(TIPC_SKB_CB(buf)->handle);
1060 
1061 		sz -= offset;
1062 		needed = (buf_len - sz_copied);
1063 		sz_to_copy = (sz <= needed) ? sz : needed;
1064 
1065 		res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg) + offset,
1066 					      m->msg_iov, sz_to_copy);
1067 		if (res)
1068 			goto exit;
1069 
1070 		sz_copied += sz_to_copy;
1071 
1072 		if (sz_to_copy < sz) {
1073 			if (!(flags & MSG_PEEK))
1074 				TIPC_SKB_CB(buf)->handle =
1075 				(void *)(unsigned long)(offset + sz_to_copy);
1076 			goto exit;
1077 		}
1078 	} else {
1079 		if (sz_copied != 0)
1080 			goto exit; /* can't add error msg to valid data */
1081 
1082 		if ((err == TIPC_CONN_SHUTDOWN) || m->msg_control)
1083 			res = 0;
1084 		else
1085 			res = -ECONNRESET;
1086 	}
1087 
1088 	/* Consume received message (optional) */
1089 	if (likely(!(flags & MSG_PEEK))) {
1090 		if (unlikely(++tport->conn_unacked >= TIPC_FLOW_CONTROL_WIN))
1091 			tipc_acknowledge(tport->ref, tport->conn_unacked);
1092 		advance_rx_queue(sk);
1093 	}
1094 
1095 	/* Loop around if more data is required */
1096 	if ((sz_copied < buf_len) &&	/* didn't get all requested data */
1097 	    (!skb_queue_empty(&sk->sk_receive_queue) ||
1098 	    (sz_copied < target)) &&	/* and more is ready or required */
1099 	    (!(flags & MSG_PEEK)) &&	/* and aren't just peeking at data */
1100 	    (!err))			/* and haven't reached a FIN */
1101 		goto restart;
1102 
1103 exit:
1104 	release_sock(sk);
1105 	return sz_copied ? sz_copied : res;
1106 }
1107 
1108 /**
1109  * tipc_write_space - wake up thread if port congestion is released
1110  * @sk: socket
1111  */
1112 static void tipc_write_space(struct sock *sk)
1113 {
1114 	struct socket_wq *wq;
1115 
1116 	rcu_read_lock();
1117 	wq = rcu_dereference(sk->sk_wq);
1118 	if (wq_has_sleeper(wq))
1119 		wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
1120 						POLLWRNORM | POLLWRBAND);
1121 	rcu_read_unlock();
1122 }
1123 
1124 /**
1125  * tipc_data_ready - wake up threads to indicate messages have been received
1126  * @sk: socket
1127  * @len: the length of messages
1128  */
1129 static void tipc_data_ready(struct sock *sk, int len)
1130 {
1131 	struct socket_wq *wq;
1132 
1133 	rcu_read_lock();
1134 	wq = rcu_dereference(sk->sk_wq);
1135 	if (wq_has_sleeper(wq))
1136 		wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
1137 						POLLRDNORM | POLLRDBAND);
1138 	rcu_read_unlock();
1139 }
1140 
1141 /**
1142  * filter_connect - Handle all incoming messages for a connection-based socket
1143  * @tsock: TIPC socket
1144  * @msg: message
1145  *
1146  * Returns TIPC error status code and socket error status code
1147  * once it encounters some errors
1148  */
1149 static u32 filter_connect(struct tipc_sock *tsock, struct sk_buff **buf)
1150 {
1151 	struct socket *sock = tsock->sk.sk_socket;
1152 	struct tipc_msg *msg = buf_msg(*buf);
1153 	struct sock *sk = &tsock->sk;
1154 	u32 retval = TIPC_ERR_NO_PORT;
1155 	int res;
1156 
1157 	if (msg_mcast(msg))
1158 		return retval;
1159 
1160 	switch ((int)sock->state) {
1161 	case SS_CONNECTED:
1162 		/* Accept only connection-based messages sent by peer */
1163 		if (msg_connected(msg) && tipc_port_peer_msg(tsock->p, msg)) {
1164 			if (unlikely(msg_errcode(msg))) {
1165 				sock->state = SS_DISCONNECTING;
1166 				__tipc_disconnect(tsock->p);
1167 			}
1168 			retval = TIPC_OK;
1169 		}
1170 		break;
1171 	case SS_CONNECTING:
1172 		/* Accept only ACK or NACK message */
1173 		if (unlikely(msg_errcode(msg))) {
1174 			sock->state = SS_DISCONNECTING;
1175 			sk->sk_err = -ECONNREFUSED;
1176 			retval = TIPC_OK;
1177 			break;
1178 		}
1179 
1180 		if (unlikely(!msg_connected(msg)))
1181 			break;
1182 
1183 		res = auto_connect(sock, msg);
1184 		if (res) {
1185 			sock->state = SS_DISCONNECTING;
1186 			sk->sk_err = res;
1187 			retval = TIPC_OK;
1188 			break;
1189 		}
1190 
1191 		/* If an incoming message is an 'ACK-', it should be
1192 		 * discarded here because it doesn't contain useful
1193 		 * data. In addition, we should try to wake up
1194 		 * connect() routine if sleeping.
1195 		 */
1196 		if (msg_data_sz(msg) == 0) {
1197 			kfree_skb(*buf);
1198 			*buf = NULL;
1199 			if (waitqueue_active(sk_sleep(sk)))
1200 				wake_up_interruptible(sk_sleep(sk));
1201 		}
1202 		retval = TIPC_OK;
1203 		break;
1204 	case SS_LISTENING:
1205 	case SS_UNCONNECTED:
1206 		/* Accept only SYN message */
1207 		if (!msg_connected(msg) && !(msg_errcode(msg)))
1208 			retval = TIPC_OK;
1209 		break;
1210 	case SS_DISCONNECTING:
1211 		break;
1212 	default:
1213 		pr_err("Unknown socket state %u\n", sock->state);
1214 	}
1215 	return retval;
1216 }
1217 
1218 /**
1219  * rcvbuf_limit - get proper overload limit of socket receive queue
1220  * @sk: socket
1221  * @buf: message
1222  *
1223  * For all connection oriented messages, irrespective of importance,
1224  * the default overload value (i.e. 67MB) is set as limit.
1225  *
1226  * For all connectionless messages, by default new queue limits are
1227  * as belows:
1228  *
1229  * TIPC_LOW_IMPORTANCE       (5MB)
1230  * TIPC_MEDIUM_IMPORTANCE    (10MB)
1231  * TIPC_HIGH_IMPORTANCE      (20MB)
1232  * TIPC_CRITICAL_IMPORTANCE  (40MB)
1233  *
1234  * Returns overload limit according to corresponding message importance
1235  */
1236 static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
1237 {
1238 	struct tipc_msg *msg = buf_msg(buf);
1239 	unsigned int limit;
1240 
1241 	if (msg_connected(msg))
1242 		limit = CONN_OVERLOAD_LIMIT;
1243 	else
1244 		limit = sk->sk_rcvbuf << (msg_importance(msg) + 5);
1245 	return limit;
1246 }
1247 
1248 /**
1249  * filter_rcv - validate incoming message
1250  * @sk: socket
1251  * @buf: message
1252  *
1253  * Enqueues message on receive queue if acceptable; optionally handles
1254  * disconnect indication for a connected socket.
1255  *
1256  * Called with socket lock already taken; port lock may also be taken.
1257  *
1258  * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
1259  */
1260 static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
1261 {
1262 	struct socket *sock = sk->sk_socket;
1263 	struct tipc_msg *msg = buf_msg(buf);
1264 	unsigned int limit = rcvbuf_limit(sk, buf);
1265 	u32 res = TIPC_OK;
1266 
1267 	/* Reject message if it is wrong sort of message for socket */
1268 	if (msg_type(msg) > TIPC_DIRECT_MSG)
1269 		return TIPC_ERR_NO_PORT;
1270 
1271 	if (sock->state == SS_READY) {
1272 		if (msg_connected(msg))
1273 			return TIPC_ERR_NO_PORT;
1274 	} else {
1275 		res = filter_connect(tipc_sk(sk), &buf);
1276 		if (res != TIPC_OK || buf == NULL)
1277 			return res;
1278 	}
1279 
1280 	/* Reject message if there isn't room to queue it */
1281 	if (sk_rmem_alloc_get(sk) + buf->truesize >= limit)
1282 		return TIPC_ERR_OVERLOAD;
1283 
1284 	/* Enqueue message */
1285 	TIPC_SKB_CB(buf)->handle = 0;
1286 	__skb_queue_tail(&sk->sk_receive_queue, buf);
1287 	skb_set_owner_r(buf, sk);
1288 
1289 	sk->sk_data_ready(sk, 0);
1290 	return TIPC_OK;
1291 }
1292 
1293 /**
1294  * backlog_rcv - handle incoming message from backlog queue
1295  * @sk: socket
1296  * @buf: message
1297  *
1298  * Caller must hold socket lock, but not port lock.
1299  *
1300  * Returns 0
1301  */
1302 static int backlog_rcv(struct sock *sk, struct sk_buff *buf)
1303 {
1304 	u32 res;
1305 
1306 	res = filter_rcv(sk, buf);
1307 	if (res)
1308 		tipc_reject_msg(buf, res);
1309 	return 0;
1310 }
1311 
1312 /**
1313  * dispatch - handle incoming message
1314  * @tport: TIPC port that received message
1315  * @buf: message
1316  *
1317  * Called with port lock already taken.
1318  *
1319  * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
1320  */
1321 static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf)
1322 {
1323 	struct sock *sk = (struct sock *)tport->usr_handle;
1324 	u32 res;
1325 
1326 	/*
1327 	 * Process message if socket is unlocked; otherwise add to backlog queue
1328 	 *
1329 	 * This code is based on sk_receive_skb(), but must be distinct from it
1330 	 * since a TIPC-specific filter/reject mechanism is utilized
1331 	 */
1332 	bh_lock_sock(sk);
1333 	if (!sock_owned_by_user(sk)) {
1334 		res = filter_rcv(sk, buf);
1335 	} else {
1336 		if (sk_add_backlog(sk, buf, rcvbuf_limit(sk, buf)))
1337 			res = TIPC_ERR_OVERLOAD;
1338 		else
1339 			res = TIPC_OK;
1340 	}
1341 	bh_unlock_sock(sk);
1342 
1343 	return res;
1344 }
1345 
1346 /**
1347  * wakeupdispatch - wake up port after congestion
1348  * @tport: port to wakeup
1349  *
1350  * Called with port lock already taken.
1351  */
1352 static void wakeupdispatch(struct tipc_port *tport)
1353 {
1354 	struct sock *sk = (struct sock *)tport->usr_handle;
1355 
1356 	sk->sk_write_space(sk);
1357 }
1358 
1359 /**
1360  * connect - establish a connection to another TIPC port
1361  * @sock: socket structure
1362  * @dest: socket address for destination port
1363  * @destlen: size of socket address data structure
1364  * @flags: file-related flags associated with socket
1365  *
1366  * Returns 0 on success, errno otherwise
1367  */
1368 static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
1369 		   int flags)
1370 {
1371 	struct sock *sk = sock->sk;
1372 	struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
1373 	struct msghdr m = {NULL,};
1374 	unsigned int timeout;
1375 	int res;
1376 
1377 	lock_sock(sk);
1378 
1379 	/* For now, TIPC does not allow use of connect() with DGRAM/RDM types */
1380 	if (sock->state == SS_READY) {
1381 		res = -EOPNOTSUPP;
1382 		goto exit;
1383 	}
1384 
1385 	/*
1386 	 * Reject connection attempt using multicast address
1387 	 *
1388 	 * Note: send_msg() validates the rest of the address fields,
1389 	 *       so there's no need to do it here
1390 	 */
1391 	if (dst->addrtype == TIPC_ADDR_MCAST) {
1392 		res = -EINVAL;
1393 		goto exit;
1394 	}
1395 
1396 	timeout = (flags & O_NONBLOCK) ? 0 : tipc_sk(sk)->conn_timeout;
1397 
1398 	switch (sock->state) {
1399 	case SS_UNCONNECTED:
1400 		/* Send a 'SYN-' to destination */
1401 		m.msg_name = dest;
1402 		m.msg_namelen = destlen;
1403 
1404 		/* If connect is in non-blocking case, set MSG_DONTWAIT to
1405 		 * indicate send_msg() is never blocked.
1406 		 */
1407 		if (!timeout)
1408 			m.msg_flags = MSG_DONTWAIT;
1409 
1410 		res = send_msg(NULL, sock, &m, 0);
1411 		if ((res < 0) && (res != -EWOULDBLOCK))
1412 			goto exit;
1413 
1414 		/* Just entered SS_CONNECTING state; the only
1415 		 * difference is that return value in non-blocking
1416 		 * case is EINPROGRESS, rather than EALREADY.
1417 		 */
1418 		res = -EINPROGRESS;
1419 		break;
1420 	case SS_CONNECTING:
1421 		res = -EALREADY;
1422 		break;
1423 	case SS_CONNECTED:
1424 		res = -EISCONN;
1425 		break;
1426 	default:
1427 		res = -EINVAL;
1428 		goto exit;
1429 	}
1430 
1431 	if (sock->state == SS_CONNECTING) {
1432 		if (!timeout)
1433 			goto exit;
1434 
1435 		/* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
1436 		release_sock(sk);
1437 		res = wait_event_interruptible_timeout(*sk_sleep(sk),
1438 				sock->state != SS_CONNECTING,
1439 				timeout ? (long)msecs_to_jiffies(timeout)
1440 					: MAX_SCHEDULE_TIMEOUT);
1441 		lock_sock(sk);
1442 		if (res <= 0) {
1443 			if (res == 0)
1444 				res = -ETIMEDOUT;
1445 			else
1446 				; /* leave "res" unchanged */
1447 			goto exit;
1448 		}
1449 	}
1450 
1451 	if (unlikely(sock->state == SS_DISCONNECTING))
1452 		res = sock_error(sk);
1453 	else
1454 		res = 0;
1455 
1456 exit:
1457 	release_sock(sk);
1458 	return res;
1459 }
1460 
1461 /**
1462  * listen - allow socket to listen for incoming connections
1463  * @sock: socket structure
1464  * @len: (unused)
1465  *
1466  * Returns 0 on success, errno otherwise
1467  */
1468 static int listen(struct socket *sock, int len)
1469 {
1470 	struct sock *sk = sock->sk;
1471 	int res;
1472 
1473 	lock_sock(sk);
1474 
1475 	if (sock->state != SS_UNCONNECTED)
1476 		res = -EINVAL;
1477 	else {
1478 		sock->state = SS_LISTENING;
1479 		res = 0;
1480 	}
1481 
1482 	release_sock(sk);
1483 	return res;
1484 }
1485 
1486 /**
1487  * accept - wait for connection request
1488  * @sock: listening socket
1489  * @newsock: new socket that is to be connected
1490  * @flags: file-related flags associated with socket
1491  *
1492  * Returns 0 on success, errno otherwise
1493  */
1494 static int accept(struct socket *sock, struct socket *new_sock, int flags)
1495 {
1496 	struct sock *new_sk, *sk = sock->sk;
1497 	struct sk_buff *buf;
1498 	struct tipc_sock *new_tsock;
1499 	struct tipc_port *new_tport;
1500 	struct tipc_msg *msg;
1501 	u32 new_ref;
1502 
1503 	int res;
1504 
1505 	lock_sock(sk);
1506 
1507 	if (sock->state != SS_LISTENING) {
1508 		res = -EINVAL;
1509 		goto exit;
1510 	}
1511 
1512 	while (skb_queue_empty(&sk->sk_receive_queue)) {
1513 		if (flags & O_NONBLOCK) {
1514 			res = -EWOULDBLOCK;
1515 			goto exit;
1516 		}
1517 		release_sock(sk);
1518 		res = wait_event_interruptible(*sk_sleep(sk),
1519 				(!skb_queue_empty(&sk->sk_receive_queue)));
1520 		lock_sock(sk);
1521 		if (res)
1522 			goto exit;
1523 	}
1524 
1525 	buf = skb_peek(&sk->sk_receive_queue);
1526 
1527 	res = tipc_create(sock_net(sock->sk), new_sock, 0, 0);
1528 	if (res)
1529 		goto exit;
1530 
1531 	new_sk = new_sock->sk;
1532 	new_tsock = tipc_sk(new_sk);
1533 	new_tport = new_tsock->p;
1534 	new_ref = new_tport->ref;
1535 	msg = buf_msg(buf);
1536 
1537 	/* we lock on new_sk; but lockdep sees the lock on sk */
1538 	lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
1539 
1540 	/*
1541 	 * Reject any stray messages received by new socket
1542 	 * before the socket lock was taken (very, very unlikely)
1543 	 */
1544 	reject_rx_queue(new_sk);
1545 
1546 	/* Connect new socket to it's peer */
1547 	new_tsock->peer_name.ref = msg_origport(msg);
1548 	new_tsock->peer_name.node = msg_orignode(msg);
1549 	tipc_connect(new_ref, &new_tsock->peer_name);
1550 	new_sock->state = SS_CONNECTED;
1551 
1552 	tipc_set_portimportance(new_ref, msg_importance(msg));
1553 	if (msg_named(msg)) {
1554 		new_tport->conn_type = msg_nametype(msg);
1555 		new_tport->conn_instance = msg_nameinst(msg);
1556 	}
1557 
1558 	/*
1559 	 * Respond to 'SYN-' by discarding it & returning 'ACK'-.
1560 	 * Respond to 'SYN+' by queuing it on new socket.
1561 	 */
1562 	if (!msg_data_sz(msg)) {
1563 		struct msghdr m = {NULL,};
1564 
1565 		advance_rx_queue(sk);
1566 		send_packet(NULL, new_sock, &m, 0);
1567 	} else {
1568 		__skb_dequeue(&sk->sk_receive_queue);
1569 		__skb_queue_head(&new_sk->sk_receive_queue, buf);
1570 		skb_set_owner_r(buf, new_sk);
1571 	}
1572 	release_sock(new_sk);
1573 
1574 exit:
1575 	release_sock(sk);
1576 	return res;
1577 }
1578 
1579 /**
1580  * shutdown - shutdown socket connection
1581  * @sock: socket structure
1582  * @how: direction to close (must be SHUT_RDWR)
1583  *
1584  * Terminates connection (if necessary), then purges socket's receive queue.
1585  *
1586  * Returns 0 on success, errno otherwise
1587  */
1588 static int shutdown(struct socket *sock, int how)
1589 {
1590 	struct sock *sk = sock->sk;
1591 	struct tipc_port *tport = tipc_sk_port(sk);
1592 	struct sk_buff *buf;
1593 	int res;
1594 
1595 	if (how != SHUT_RDWR)
1596 		return -EINVAL;
1597 
1598 	lock_sock(sk);
1599 
1600 	switch (sock->state) {
1601 	case SS_CONNECTING:
1602 	case SS_CONNECTED:
1603 
1604 restart:
1605 		/* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
1606 		buf = __skb_dequeue(&sk->sk_receive_queue);
1607 		if (buf) {
1608 			if (TIPC_SKB_CB(buf)->handle != 0) {
1609 				kfree_skb(buf);
1610 				goto restart;
1611 			}
1612 			tipc_disconnect(tport->ref);
1613 			tipc_reject_msg(buf, TIPC_CONN_SHUTDOWN);
1614 		} else {
1615 			tipc_shutdown(tport->ref);
1616 		}
1617 
1618 		sock->state = SS_DISCONNECTING;
1619 
1620 		/* fall through */
1621 
1622 	case SS_DISCONNECTING:
1623 
1624 		/* Discard any unreceived messages */
1625 		__skb_queue_purge(&sk->sk_receive_queue);
1626 
1627 		/* Wake up anyone sleeping in poll */
1628 		sk->sk_state_change(sk);
1629 		res = 0;
1630 		break;
1631 
1632 	default:
1633 		res = -ENOTCONN;
1634 	}
1635 
1636 	release_sock(sk);
1637 	return res;
1638 }
1639 
1640 /**
1641  * setsockopt - set socket option
1642  * @sock: socket structure
1643  * @lvl: option level
1644  * @opt: option identifier
1645  * @ov: pointer to new option value
1646  * @ol: length of option value
1647  *
1648  * For stream sockets only, accepts and ignores all IPPROTO_TCP options
1649  * (to ease compatibility).
1650  *
1651  * Returns 0 on success, errno otherwise
1652  */
1653 static int setsockopt(struct socket *sock,
1654 		      int lvl, int opt, char __user *ov, unsigned int ol)
1655 {
1656 	struct sock *sk = sock->sk;
1657 	struct tipc_port *tport = tipc_sk_port(sk);
1658 	u32 value;
1659 	int res;
1660 
1661 	if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
1662 		return 0;
1663 	if (lvl != SOL_TIPC)
1664 		return -ENOPROTOOPT;
1665 	if (ol < sizeof(value))
1666 		return -EINVAL;
1667 	res = get_user(value, (u32 __user *)ov);
1668 	if (res)
1669 		return res;
1670 
1671 	lock_sock(sk);
1672 
1673 	switch (opt) {
1674 	case TIPC_IMPORTANCE:
1675 		res = tipc_set_portimportance(tport->ref, value);
1676 		break;
1677 	case TIPC_SRC_DROPPABLE:
1678 		if (sock->type != SOCK_STREAM)
1679 			res = tipc_set_portunreliable(tport->ref, value);
1680 		else
1681 			res = -ENOPROTOOPT;
1682 		break;
1683 	case TIPC_DEST_DROPPABLE:
1684 		res = tipc_set_portunreturnable(tport->ref, value);
1685 		break;
1686 	case TIPC_CONN_TIMEOUT:
1687 		tipc_sk(sk)->conn_timeout = value;
1688 		/* no need to set "res", since already 0 at this point */
1689 		break;
1690 	default:
1691 		res = -EINVAL;
1692 	}
1693 
1694 	release_sock(sk);
1695 
1696 	return res;
1697 }
1698 
1699 /**
1700  * getsockopt - get socket option
1701  * @sock: socket structure
1702  * @lvl: option level
1703  * @opt: option identifier
1704  * @ov: receptacle for option value
1705  * @ol: receptacle for length of option value
1706  *
1707  * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
1708  * (to ease compatibility).
1709  *
1710  * Returns 0 on success, errno otherwise
1711  */
1712 static int getsockopt(struct socket *sock,
1713 		      int lvl, int opt, char __user *ov, int __user *ol)
1714 {
1715 	struct sock *sk = sock->sk;
1716 	struct tipc_port *tport = tipc_sk_port(sk);
1717 	int len;
1718 	u32 value;
1719 	int res;
1720 
1721 	if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
1722 		return put_user(0, ol);
1723 	if (lvl != SOL_TIPC)
1724 		return -ENOPROTOOPT;
1725 	res = get_user(len, ol);
1726 	if (res)
1727 		return res;
1728 
1729 	lock_sock(sk);
1730 
1731 	switch (opt) {
1732 	case TIPC_IMPORTANCE:
1733 		res = tipc_portimportance(tport->ref, &value);
1734 		break;
1735 	case TIPC_SRC_DROPPABLE:
1736 		res = tipc_portunreliable(tport->ref, &value);
1737 		break;
1738 	case TIPC_DEST_DROPPABLE:
1739 		res = tipc_portunreturnable(tport->ref, &value);
1740 		break;
1741 	case TIPC_CONN_TIMEOUT:
1742 		value = tipc_sk(sk)->conn_timeout;
1743 		/* no need to set "res", since already 0 at this point */
1744 		break;
1745 	case TIPC_NODE_RECVQ_DEPTH:
1746 		value = 0; /* was tipc_queue_size, now obsolete */
1747 		break;
1748 	case TIPC_SOCK_RECVQ_DEPTH:
1749 		value = skb_queue_len(&sk->sk_receive_queue);
1750 		break;
1751 	default:
1752 		res = -EINVAL;
1753 	}
1754 
1755 	release_sock(sk);
1756 
1757 	if (res)
1758 		return res;	/* "get" failed */
1759 
1760 	if (len < sizeof(value))
1761 		return -EINVAL;
1762 
1763 	if (copy_to_user(ov, &value, sizeof(value)))
1764 		return -EFAULT;
1765 
1766 	return put_user(sizeof(value), ol);
1767 }
1768 
1769 /* Protocol switches for the various types of TIPC sockets */
1770 
/* Datagram-style sockets: no listen/accept, send_msg/recv_msg handlers */
static const struct proto_ops msg_ops = {
	.owner		= THIS_MODULE,
	.family		= AF_TIPC,
	.release	= release,
	.bind		= bind,
	.connect	= connect,
	.socketpair	= sock_no_socketpair,
	.accept		= sock_no_accept,
	.getname	= get_name,
	.poll		= poll,
	.ioctl		= sock_no_ioctl,
	.listen		= sock_no_listen,
	.shutdown	= shutdown,
	.setsockopt	= setsockopt,
	.getsockopt	= getsockopt,
	.sendmsg	= send_msg,
	.recvmsg	= recv_msg,
	.mmap		= sock_no_mmap,
	.sendpage	= sock_no_sendpage
};
1791 
/* Packet-oriented sockets: listen/accept supported, send_packet/recv_msg */
static const struct proto_ops packet_ops = {
	.owner		= THIS_MODULE,
	.family		= AF_TIPC,
	.release	= release,
	.bind		= bind,
	.connect	= connect,
	.socketpair	= sock_no_socketpair,
	.accept		= accept,
	.getname	= get_name,
	.poll		= poll,
	.ioctl		= sock_no_ioctl,
	.listen		= listen,
	.shutdown	= shutdown,
	.setsockopt	= setsockopt,
	.getsockopt	= getsockopt,
	.sendmsg	= send_packet,
	.recvmsg	= recv_msg,
	.mmap		= sock_no_mmap,
	.sendpage	= sock_no_sendpage
};
1812 
/* Stream sockets: listen/accept supported, send_stream/recv_stream handlers */
static const struct proto_ops stream_ops = {
	.owner		= THIS_MODULE,
	.family		= AF_TIPC,
	.release	= release,
	.bind		= bind,
	.connect	= connect,
	.socketpair	= sock_no_socketpair,
	.accept		= accept,
	.getname	= get_name,
	.poll		= poll,
	.ioctl		= sock_no_ioctl,
	.listen		= listen,
	.shutdown	= shutdown,
	.setsockopt	= setsockopt,
	.getsockopt	= getsockopt,
	.sendmsg	= send_stream,
	.recvmsg	= recv_stream,
	.mmap		= sock_no_mmap,
	.sendpage	= sock_no_sendpage
};
1833 
/* AF_TIPC address family registration; tipc_create builds new sockets */
static const struct net_proto_family tipc_family_ops = {
	.owner		= THIS_MODULE,
	.family		= AF_TIPC,
	.create		= tipc_create
};
1839 
/* Protocol descriptor; obj_size sizes the per-socket allocation */
static struct proto tipc_proto = {
	.name		= "TIPC",
	.owner		= THIS_MODULE,
	.obj_size	= sizeof(struct tipc_sock)
};
1845 
1846 /**
1847  * tipc_socket_init - initialize TIPC socket interface
1848  *
1849  * Returns 0 on success, errno otherwise
1850  */
1851 int tipc_socket_init(void)
1852 {
1853 	int res;
1854 
1855 	res = proto_register(&tipc_proto, 1);
1856 	if (res) {
1857 		pr_err("Failed to register TIPC protocol type\n");
1858 		goto out;
1859 	}
1860 
1861 	res = sock_register(&tipc_family_ops);
1862 	if (res) {
1863 		pr_err("Failed to register TIPC socket type\n");
1864 		proto_unregister(&tipc_proto);
1865 		goto out;
1866 	}
1867 
1868 	sockets_enabled = 1;
1869  out:
1870 	return res;
1871 }
1872 
1873 /**
1874  * tipc_socket_stop - stop TIPC socket interface
1875  */
1876 void tipc_socket_stop(void)
1877 {
1878 	if (!sockets_enabled)
1879 		return;
1880 
1881 	sockets_enabled = 0;
1882 	sock_unregister(tipc_family_ops.family);
1883 	proto_unregister(&tipc_proto);
1884 }
1885