xref: /openbmc/linux/net/tipc/socket.c (revision ca79522c)
1 /*
2  * net/tipc/socket.c: TIPC socket API
3  *
4  * Copyright (c) 2001-2007, 2012 Ericsson AB
5  * Copyright (c) 2004-2008, 2010-2012, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36 
37 #include "core.h"
38 #include "port.h"
39 
40 #include <linux/export.h>
41 #include <net/sock.h>
42 
/*
 * TIPC-private socket states, stored in sock->state next to the generic
 * SS_* values.  The negative literals are parenthesized so that each macro
 * always expands to a single operand, whatever expression it lands in.
 */
#define SS_LISTENING	(-1)	/* socket is listening */
#define SS_READY	(-2)	/* socket is connectionless */

/*
 * Room for (2 * flow control window + 1) maximum-sized user messages,
 * each accounted for with SKB_TRUESIZE().
 */
#define CONN_OVERLOAD_LIMIT	((TIPC_FLOW_CONTROL_WIN * 2 + 1) * \
				SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE))
#define CONN_TIMEOUT_DEFAULT	8000	/* default connect timeout = 8s */
49 
/**
 * struct tipc_sock - TIPC socket: generic sock plus TIPC-specific state
 * @sk: generic socket; must stay the first member because tipc_sk() casts a
 *      struct sock pointer straight to a struct tipc_sock pointer
 * @p: TIPC port used by this socket (assigned in tipc_create())
 * @peer_name: port ID (ref and node) of the peer; written by auto_connect()
 *             and reported by get_name()
 * @conn_timeout: connect timeout, initialized to CONN_TIMEOUT_DEFAULT
 *                in tipc_create()
 */
struct tipc_sock {
	struct sock sk;
	struct tipc_port *p;
	struct tipc_portid peer_name;
	unsigned int conn_timeout;
};
56 
/* Convert a struct sock pointer into the TIPC socket that embeds it */
#define tipc_sk(sk) ((struct tipc_sock *)(sk))
/* Fetch the TIPC port pointer stored in the TIPC socket behind @sk */
#define tipc_sk_port(sk) (tipc_sk(sk)->p)

/*
 * True when a receiver has something to act on: the receive queue is not
 * empty, or the socket has entered SS_DISCONNECTING.
 *
 * The argument is parenthesized so that any pointer-valued expression can be
 * passed.  Note that it is evaluated twice, so it must not have side effects.
 */
#define tipc_rx_ready(sock) (!skb_queue_empty(&(sock)->sk->sk_receive_queue) || \
			((sock)->state == SS_DISCONNECTING))
62 
/*
 * Callbacks defined elsewhere in this file.  tipc_create() installs
 * backlog_rcv, tipc_data_ready and tipc_write_space on the sock, and hands
 * dispatch and wakeupdispatch to tipc_createport_raw().
 */
static int backlog_rcv(struct sock *sk, struct sk_buff *skb);
static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf);
static void wakeupdispatch(struct tipc_port *tport);
static void tipc_data_ready(struct sock *sk, int len);
static void tipc_write_space(struct sock *sk);

/* Operations tables; tipc_create() picks one according to sock->type */
static const struct proto_ops packet_ops;
static const struct proto_ops stream_ops;
static const struct proto_ops msg_ops;

/* Protocol descriptor passed to sk_alloc() in tipc_create() */
static struct proto tipc_proto;

/* NOTE(review): users of this flag lie outside the code reviewed here */
static int sockets_enabled;
76 
77 /*
78  * Revised TIPC socket locking policy:
79  *
80  * Most socket operations take the standard socket lock when they start
81  * and hold it until they finish (or until they need to sleep).  Acquiring
82  * this lock grants the owner exclusive access to the fields of the socket
83  * data structures, with the exception of the backlog queue.  A few socket
84  * operations can be done without taking the socket lock because they only
85  * read socket information that never changes during the life of the socket.
86  *
87  * Socket operations may acquire the lock for the associated TIPC port if they
88  * need to perform an operation on the port.  If any routine needs to acquire
89  * both the socket lock and the port lock it must take the socket lock first
90  * to avoid the risk of deadlock.
91  *
 * The dispatcher handling incoming messages cannot grab the socket lock in
 * the standard fashion, since it is invoked at the BH level and cannot block.
 * Instead, it checks to see if the socket lock is currently owned by someone,
 * and either handles the message itself or adds it to the socket's backlog
 * queue; in the latter case the queued message is processed once the process
 * owning the socket lock releases it.
98  *
99  * NOTE: Releasing the socket lock while an operation is sleeping overcomes
100  * the problem of a blocked socket operation preventing any other operations
101  * from occurring.  However, applications must be careful if they have
102  * multiple threads trying to send (or receive) on the same socket, as these
103  * operations might interfere with each other.  For example, doing a connect
104  * and a receive at the same time might allow the receive to consume the
105  * ACK message meant for the connect.  While additional work could be done
106  * to try and overcome this, it doesn't seem to be worthwhile at the present.
107  *
108  * NOTE: Releasing the socket lock while an operation is sleeping also ensures
109  * that another operation that must be performed in a non-blocking manner is
110  * not delayed for very long because the lock has already been taken.
111  *
112  * NOTE: This code assumes that certain fields of a port/socket pair are
113  * constant over its lifetime; such fields can be examined without taking
114  * the socket lock and/or port lock, and do not need to be re-read even
115  * after resuming processing after waiting.  These fields include:
116  *   - socket type
117  *   - pointer to socket sk structure (aka tipc_sock structure)
118  *   - pointer to port structure
119  *   - port reference
120  */
121 
122 /**
123  * advance_rx_queue - discard first buffer in socket receive queue
124  *
125  * Caller must hold socket lock
126  */
127 static void advance_rx_queue(struct sock *sk)
128 {
129 	kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
130 }
131 
132 /**
133  * reject_rx_queue - reject all buffers in socket receive queue
134  *
135  * Caller must hold socket lock
136  */
137 static void reject_rx_queue(struct sock *sk)
138 {
139 	struct sk_buff *buf;
140 
141 	while ((buf = __skb_dequeue(&sk->sk_receive_queue)))
142 		tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
143 }
144 
145 /**
146  * tipc_create - create a TIPC socket
147  * @net: network namespace (must be default network)
148  * @sock: pre-allocated socket structure
149  * @protocol: protocol indicator (must be 0)
150  * @kern: caused by kernel or by userspace?
151  *
152  * This routine creates additional data structures used by the TIPC socket,
153  * initializes them, and links them together.
154  *
155  * Returns 0 on success, errno otherwise
156  */
157 static int tipc_create(struct net *net, struct socket *sock, int protocol,
158 		       int kern)
159 {
160 	const struct proto_ops *ops;
161 	socket_state state;
162 	struct sock *sk;
163 	struct tipc_port *tp_ptr;
164 
165 	/* Validate arguments */
166 	if (unlikely(protocol != 0))
167 		return -EPROTONOSUPPORT;
168 
169 	switch (sock->type) {
170 	case SOCK_STREAM:
171 		ops = &stream_ops;
172 		state = SS_UNCONNECTED;
173 		break;
174 	case SOCK_SEQPACKET:
175 		ops = &packet_ops;
176 		state = SS_UNCONNECTED;
177 		break;
178 	case SOCK_DGRAM:
179 	case SOCK_RDM:
180 		ops = &msg_ops;
181 		state = SS_READY;
182 		break;
183 	default:
184 		return -EPROTOTYPE;
185 	}
186 
187 	/* Allocate socket's protocol area */
188 	sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
189 	if (sk == NULL)
190 		return -ENOMEM;
191 
192 	/* Allocate TIPC port for socket to use */
193 	tp_ptr = tipc_createport_raw(sk, &dispatch, &wakeupdispatch,
194 				     TIPC_LOW_IMPORTANCE);
195 	if (unlikely(!tp_ptr)) {
196 		sk_free(sk);
197 		return -ENOMEM;
198 	}
199 
200 	/* Finish initializing socket data structures */
201 	sock->ops = ops;
202 	sock->state = state;
203 
204 	sock_init_data(sock, sk);
205 	sk->sk_backlog_rcv = backlog_rcv;
206 	sk->sk_data_ready = tipc_data_ready;
207 	sk->sk_write_space = tipc_write_space;
208 	tipc_sk(sk)->p = tp_ptr;
209 	tipc_sk(sk)->conn_timeout = CONN_TIMEOUT_DEFAULT;
210 
211 	spin_unlock_bh(tp_ptr->lock);
212 
213 	if (sock->state == SS_READY) {
214 		tipc_set_portunreturnable(tp_ptr->ref, 1);
215 		if (sock->type == SOCK_DGRAM)
216 			tipc_set_portunreliable(tp_ptr->ref, 1);
217 	}
218 
219 	return 0;
220 }
221 
222 /**
223  * release - destroy a TIPC socket
224  * @sock: socket to destroy
225  *
226  * This routine cleans up any messages that are still queued on the socket.
227  * For DGRAM and RDM socket types, all queued messages are rejected.
228  * For SEQPACKET and STREAM socket types, the first message is rejected
229  * and any others are discarded.  (If the first message on a STREAM socket
230  * is partially-read, it is discarded and the next one is rejected instead.)
231  *
232  * NOTE: Rejected messages are not necessarily returned to the sender!  They
233  * are returned or discarded according to the "destination droppable" setting
234  * specified for the message by the sender.
235  *
236  * Returns 0 on success, errno otherwise
237  */
238 static int release(struct socket *sock)
239 {
240 	struct sock *sk = sock->sk;
241 	struct tipc_port *tport;
242 	struct sk_buff *buf;
243 	int res;
244 
245 	/*
246 	 * Exit if socket isn't fully initialized (occurs when a failed accept()
247 	 * releases a pre-allocated child socket that was never used)
248 	 */
249 	if (sk == NULL)
250 		return 0;
251 
252 	tport = tipc_sk_port(sk);
253 	lock_sock(sk);
254 
255 	/*
256 	 * Reject all unreceived messages, except on an active connection
257 	 * (which disconnects locally & sends a 'FIN+' to peer)
258 	 */
259 	while (sock->state != SS_DISCONNECTING) {
260 		buf = __skb_dequeue(&sk->sk_receive_queue);
261 		if (buf == NULL)
262 			break;
263 		if (TIPC_SKB_CB(buf)->handle != 0)
264 			kfree_skb(buf);
265 		else {
266 			if ((sock->state == SS_CONNECTING) ||
267 			    (sock->state == SS_CONNECTED)) {
268 				sock->state = SS_DISCONNECTING;
269 				tipc_disconnect(tport->ref);
270 			}
271 			tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
272 		}
273 	}
274 
275 	/*
276 	 * Delete TIPC port; this ensures no more messages are queued
277 	 * (also disconnects an active connection & sends a 'FIN-' to peer)
278 	 */
279 	res = tipc_deleteport(tport->ref);
280 
281 	/* Discard any remaining (connection-based) messages in receive queue */
282 	__skb_queue_purge(&sk->sk_receive_queue);
283 
284 	/* Reject any messages that accumulated in backlog queue */
285 	sock->state = SS_DISCONNECTING;
286 	release_sock(sk);
287 
288 	sock_put(sk);
289 	sock->sk = NULL;
290 
291 	return res;
292 }
293 
294 /**
295  * bind - associate or disassocate TIPC name(s) with a socket
296  * @sock: socket structure
297  * @uaddr: socket address describing name(s) and desired operation
298  * @uaddr_len: size of socket address data structure
299  *
300  * Name and name sequence binding is indicated using a positive scope value;
301  * a negative scope value unbinds the specified name.  Specifying no name
302  * (i.e. a socket address length of 0) unbinds all names from the socket.
303  *
304  * Returns 0 on success, errno otherwise
305  *
306  * NOTE: This routine doesn't need to take the socket lock since it doesn't
307  *       access any non-constant socket information.
308  */
309 static int bind(struct socket *sock, struct sockaddr *uaddr, int uaddr_len)
310 {
311 	struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
312 	u32 portref = tipc_sk_port(sock->sk)->ref;
313 
314 	if (unlikely(!uaddr_len))
315 		return tipc_withdraw(portref, 0, NULL);
316 
317 	if (uaddr_len < sizeof(struct sockaddr_tipc))
318 		return -EINVAL;
319 	if (addr->family != AF_TIPC)
320 		return -EAFNOSUPPORT;
321 
322 	if (addr->addrtype == TIPC_ADDR_NAME)
323 		addr->addr.nameseq.upper = addr->addr.nameseq.lower;
324 	else if (addr->addrtype != TIPC_ADDR_NAMESEQ)
325 		return -EAFNOSUPPORT;
326 
327 	if (addr->addr.nameseq.type < TIPC_RESERVED_TYPES)
328 		return -EACCES;
329 
330 	return (addr->scope > 0) ?
331 		tipc_publish(portref, addr->scope, &addr->addr.nameseq) :
332 		tipc_withdraw(portref, -addr->scope, &addr->addr.nameseq);
333 }
334 
335 /**
336  * get_name - get port ID of socket or peer socket
337  * @sock: socket structure
338  * @uaddr: area for returned socket address
339  * @uaddr_len: area for returned length of socket address
340  * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
341  *
342  * Returns 0 on success, errno otherwise
343  *
344  * NOTE: This routine doesn't need to take the socket lock since it only
345  *       accesses socket information that is unchanging (or which changes in
346  *       a completely predictable manner).
347  */
348 static int get_name(struct socket *sock, struct sockaddr *uaddr,
349 		    int *uaddr_len, int peer)
350 {
351 	struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
352 	struct tipc_sock *tsock = tipc_sk(sock->sk);
353 
354 	memset(addr, 0, sizeof(*addr));
355 	if (peer) {
356 		if ((sock->state != SS_CONNECTED) &&
357 			((peer != 2) || (sock->state != SS_DISCONNECTING)))
358 			return -ENOTCONN;
359 		addr->addr.id.ref = tsock->peer_name.ref;
360 		addr->addr.id.node = tsock->peer_name.node;
361 	} else {
362 		addr->addr.id.ref = tsock->p->ref;
363 		addr->addr.id.node = tipc_own_addr;
364 	}
365 
366 	*uaddr_len = sizeof(*addr);
367 	addr->addrtype = TIPC_ADDR_ID;
368 	addr->family = AF_TIPC;
369 	addr->scope = 0;
370 	addr->addr.name.domain = 0;
371 
372 	return 0;
373 }
374 
375 /**
376  * poll - read and possibly block on pollmask
377  * @file: file structure associated with the socket
378  * @sock: socket for which to calculate the poll bits
379  * @wait: ???
380  *
381  * Returns pollmask value
382  *
383  * COMMENTARY:
384  * It appears that the usual socket locking mechanisms are not useful here
385  * since the pollmask info is potentially out-of-date the moment this routine
386  * exits.  TCP and other protocols seem to rely on higher level poll routines
387  * to handle any preventable race conditions, so TIPC will do the same ...
388  *
389  * TIPC sets the returned events as follows:
390  *
391  * socket state		flags set
392  * ------------		---------
393  * unconnected		no read flags
394  *			POLLOUT if port is not congested
395  *
396  * connecting		POLLIN/POLLRDNORM if ACK/NACK in rx queue
397  *			no write flags
398  *
399  * connected		POLLIN/POLLRDNORM if data in rx queue
400  *			POLLOUT if port is not congested
401  *
402  * disconnecting	POLLIN/POLLRDNORM/POLLHUP
403  *			no write flags
404  *
405  * listening		POLLIN if SYN in rx queue
406  *			no write flags
407  *
408  * ready		POLLIN/POLLRDNORM if data in rx queue
409  * [connectionless]	POLLOUT (since port cannot be congested)
410  *
411  * IMPORTANT: The fact that a read or write operation is indicated does NOT
412  * imply that the operation will succeed, merely that it should be performed
413  * and will not block.
414  */
415 static unsigned int poll(struct file *file, struct socket *sock,
416 			 poll_table *wait)
417 {
418 	struct sock *sk = sock->sk;
419 	u32 mask = 0;
420 
421 	sock_poll_wait(file, sk_sleep(sk), wait);
422 
423 	switch ((int)sock->state) {
424 	case SS_UNCONNECTED:
425 		if (!tipc_sk_port(sk)->congested)
426 			mask |= POLLOUT;
427 		break;
428 	case SS_READY:
429 	case SS_CONNECTED:
430 		if (!tipc_sk_port(sk)->congested)
431 			mask |= POLLOUT;
432 		/* fall thru' */
433 	case SS_CONNECTING:
434 	case SS_LISTENING:
435 		if (!skb_queue_empty(&sk->sk_receive_queue))
436 			mask |= (POLLIN | POLLRDNORM);
437 		break;
438 	case SS_DISCONNECTING:
439 		mask = (POLLIN | POLLRDNORM | POLLHUP);
440 		break;
441 	}
442 
443 	return mask;
444 }
445 
446 /**
447  * dest_name_check - verify user is permitted to send to specified port name
448  * @dest: destination address
449  * @m: descriptor for message to be sent
450  *
451  * Prevents restricted configuration commands from being issued by
452  * unauthorized users.
453  *
454  * Returns 0 if permission is granted, otherwise errno
455  */
456 static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m)
457 {
458 	struct tipc_cfg_msg_hdr hdr;
459 
460 	if (likely(dest->addr.name.name.type >= TIPC_RESERVED_TYPES))
461 		return 0;
462 	if (likely(dest->addr.name.name.type == TIPC_TOP_SRV))
463 		return 0;
464 	if (likely(dest->addr.name.name.type != TIPC_CFG_SRV))
465 		return -EACCES;
466 
467 	if (!m->msg_iovlen || (m->msg_iov[0].iov_len < sizeof(hdr)))
468 		return -EMSGSIZE;
469 	if (copy_from_user(&hdr, m->msg_iov[0].iov_base, sizeof(hdr)))
470 		return -EFAULT;
471 	if ((ntohs(hdr.tcm_type) & 0xC000) && (!capable(CAP_NET_ADMIN)))
472 		return -EACCES;
473 
474 	return 0;
475 }
476 
477 /**
478  * send_msg - send message in connectionless manner
479  * @iocb: if NULL, indicates that socket lock is already held
480  * @sock: socket structure
481  * @m: message to send
482  * @total_len: length of message
483  *
484  * Message must have an destination specified explicitly.
485  * Used for SOCK_RDM and SOCK_DGRAM messages,
486  * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
487  * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
488  *
489  * Returns the number of bytes sent on success, or errno otherwise
490  */
491 static int send_msg(struct kiocb *iocb, struct socket *sock,
492 		    struct msghdr *m, size_t total_len)
493 {
494 	struct sock *sk = sock->sk;
495 	struct tipc_port *tport = tipc_sk_port(sk);
496 	struct sockaddr_tipc *dest = (struct sockaddr_tipc *)m->msg_name;
497 	int needs_conn;
498 	long timeout_val;
499 	int res = -EINVAL;
500 
501 	if (unlikely(!dest))
502 		return -EDESTADDRREQ;
503 	if (unlikely((m->msg_namelen < sizeof(*dest)) ||
504 		     (dest->family != AF_TIPC)))
505 		return -EINVAL;
506 	if (total_len > TIPC_MAX_USER_MSG_SIZE)
507 		return -EMSGSIZE;
508 
509 	if (iocb)
510 		lock_sock(sk);
511 
512 	needs_conn = (sock->state != SS_READY);
513 	if (unlikely(needs_conn)) {
514 		if (sock->state == SS_LISTENING) {
515 			res = -EPIPE;
516 			goto exit;
517 		}
518 		if (sock->state != SS_UNCONNECTED) {
519 			res = -EISCONN;
520 			goto exit;
521 		}
522 		if ((tport->published) ||
523 		    ((sock->type == SOCK_STREAM) && (total_len != 0))) {
524 			res = -EOPNOTSUPP;
525 			goto exit;
526 		}
527 		if (dest->addrtype == TIPC_ADDR_NAME) {
528 			tport->conn_type = dest->addr.name.name.type;
529 			tport->conn_instance = dest->addr.name.name.instance;
530 		}
531 
532 		/* Abort any pending connection attempts (very unlikely) */
533 		reject_rx_queue(sk);
534 	}
535 
536 	timeout_val = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
537 
538 	do {
539 		if (dest->addrtype == TIPC_ADDR_NAME) {
540 			res = dest_name_check(dest, m);
541 			if (res)
542 				break;
543 			res = tipc_send2name(tport->ref,
544 					     &dest->addr.name.name,
545 					     dest->addr.name.domain,
546 					     m->msg_iovlen,
547 					     m->msg_iov,
548 					     total_len);
549 		} else if (dest->addrtype == TIPC_ADDR_ID) {
550 			res = tipc_send2port(tport->ref,
551 					     &dest->addr.id,
552 					     m->msg_iovlen,
553 					     m->msg_iov,
554 					     total_len);
555 		} else if (dest->addrtype == TIPC_ADDR_MCAST) {
556 			if (needs_conn) {
557 				res = -EOPNOTSUPP;
558 				break;
559 			}
560 			res = dest_name_check(dest, m);
561 			if (res)
562 				break;
563 			res = tipc_multicast(tport->ref,
564 					     &dest->addr.nameseq,
565 					     m->msg_iovlen,
566 					     m->msg_iov,
567 					     total_len);
568 		}
569 		if (likely(res != -ELINKCONG)) {
570 			if (needs_conn && (res >= 0))
571 				sock->state = SS_CONNECTING;
572 			break;
573 		}
574 		if (timeout_val <= 0L) {
575 			res = timeout_val ? timeout_val : -EWOULDBLOCK;
576 			break;
577 		}
578 		release_sock(sk);
579 		timeout_val = wait_event_interruptible_timeout(*sk_sleep(sk),
580 					       !tport->congested, timeout_val);
581 		lock_sock(sk);
582 	} while (1);
583 
584 exit:
585 	if (iocb)
586 		release_sock(sk);
587 	return res;
588 }
589 
590 /**
591  * send_packet - send a connection-oriented message
592  * @iocb: if NULL, indicates that socket lock is already held
593  * @sock: socket structure
594  * @m: message to send
595  * @total_len: length of message
596  *
597  * Used for SOCK_SEQPACKET messages and SOCK_STREAM data.
598  *
599  * Returns the number of bytes sent on success, or errno otherwise
600  */
601 static int send_packet(struct kiocb *iocb, struct socket *sock,
602 		       struct msghdr *m, size_t total_len)
603 {
604 	struct sock *sk = sock->sk;
605 	struct tipc_port *tport = tipc_sk_port(sk);
606 	struct sockaddr_tipc *dest = (struct sockaddr_tipc *)m->msg_name;
607 	long timeout_val;
608 	int res;
609 
610 	/* Handle implied connection establishment */
611 	if (unlikely(dest))
612 		return send_msg(iocb, sock, m, total_len);
613 
614 	if (total_len > TIPC_MAX_USER_MSG_SIZE)
615 		return -EMSGSIZE;
616 
617 	if (iocb)
618 		lock_sock(sk);
619 
620 	timeout_val = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
621 
622 	do {
623 		if (unlikely(sock->state != SS_CONNECTED)) {
624 			if (sock->state == SS_DISCONNECTING)
625 				res = -EPIPE;
626 			else
627 				res = -ENOTCONN;
628 			break;
629 		}
630 
631 		res = tipc_send(tport->ref, m->msg_iovlen, m->msg_iov,
632 				total_len);
633 		if (likely(res != -ELINKCONG))
634 			break;
635 		if (timeout_val <= 0L) {
636 			res = timeout_val ? timeout_val : -EWOULDBLOCK;
637 			break;
638 		}
639 		release_sock(sk);
640 		timeout_val = wait_event_interruptible_timeout(*sk_sleep(sk),
641 			(!tport->congested || !tport->connected), timeout_val);
642 		lock_sock(sk);
643 	} while (1);
644 
645 	if (iocb)
646 		release_sock(sk);
647 	return res;
648 }
649 
650 /**
651  * send_stream - send stream-oriented data
652  * @iocb: (unused)
653  * @sock: socket structure
654  * @m: data to send
655  * @total_len: total length of data to be sent
656  *
657  * Used for SOCK_STREAM data.
658  *
659  * Returns the number of bytes sent on success (or partial success),
660  * or errno if no data sent
661  */
662 static int send_stream(struct kiocb *iocb, struct socket *sock,
663 		       struct msghdr *m, size_t total_len)
664 {
665 	struct sock *sk = sock->sk;
666 	struct tipc_port *tport = tipc_sk_port(sk);
667 	struct msghdr my_msg;
668 	struct iovec my_iov;
669 	struct iovec *curr_iov;
670 	int curr_iovlen;
671 	char __user *curr_start;
672 	u32 hdr_size;
673 	int curr_left;
674 	int bytes_to_send;
675 	int bytes_sent;
676 	int res;
677 
678 	lock_sock(sk);
679 
680 	/* Handle special cases where there is no connection */
681 	if (unlikely(sock->state != SS_CONNECTED)) {
682 		if (sock->state == SS_UNCONNECTED) {
683 			res = send_packet(NULL, sock, m, total_len);
684 			goto exit;
685 		} else if (sock->state == SS_DISCONNECTING) {
686 			res = -EPIPE;
687 			goto exit;
688 		} else {
689 			res = -ENOTCONN;
690 			goto exit;
691 		}
692 	}
693 
694 	if (unlikely(m->msg_name)) {
695 		res = -EISCONN;
696 		goto exit;
697 	}
698 
699 	if (total_len > (unsigned int)INT_MAX) {
700 		res = -EMSGSIZE;
701 		goto exit;
702 	}
703 
704 	/*
705 	 * Send each iovec entry using one or more messages
706 	 *
707 	 * Note: This algorithm is good for the most likely case
708 	 * (i.e. one large iovec entry), but could be improved to pass sets
709 	 * of small iovec entries into send_packet().
710 	 */
711 	curr_iov = m->msg_iov;
712 	curr_iovlen = m->msg_iovlen;
713 	my_msg.msg_iov = &my_iov;
714 	my_msg.msg_iovlen = 1;
715 	my_msg.msg_flags = m->msg_flags;
716 	my_msg.msg_name = NULL;
717 	bytes_sent = 0;
718 
719 	hdr_size = msg_hdr_sz(&tport->phdr);
720 
721 	while (curr_iovlen--) {
722 		curr_start = curr_iov->iov_base;
723 		curr_left = curr_iov->iov_len;
724 
725 		while (curr_left) {
726 			bytes_to_send = tport->max_pkt - hdr_size;
727 			if (bytes_to_send > TIPC_MAX_USER_MSG_SIZE)
728 				bytes_to_send = TIPC_MAX_USER_MSG_SIZE;
729 			if (curr_left < bytes_to_send)
730 				bytes_to_send = curr_left;
731 			my_iov.iov_base = curr_start;
732 			my_iov.iov_len = bytes_to_send;
733 			res = send_packet(NULL, sock, &my_msg, bytes_to_send);
734 			if (res < 0) {
735 				if (bytes_sent)
736 					res = bytes_sent;
737 				goto exit;
738 			}
739 			curr_left -= bytes_to_send;
740 			curr_start += bytes_to_send;
741 			bytes_sent += bytes_to_send;
742 		}
743 
744 		curr_iov++;
745 	}
746 	res = bytes_sent;
747 exit:
748 	release_sock(sk);
749 	return res;
750 }
751 
752 /**
753  * auto_connect - complete connection setup to a remote port
754  * @sock: socket structure
755  * @msg: peer's response message
756  *
757  * Returns 0 on success, errno otherwise
758  */
759 static int auto_connect(struct socket *sock, struct tipc_msg *msg)
760 {
761 	struct tipc_sock *tsock = tipc_sk(sock->sk);
762 	struct tipc_port *p_ptr;
763 
764 	tsock->peer_name.ref = msg_origport(msg);
765 	tsock->peer_name.node = msg_orignode(msg);
766 	p_ptr = tipc_port_deref(tsock->p->ref);
767 	if (!p_ptr)
768 		return -EINVAL;
769 
770 	__tipc_connect(tsock->p->ref, p_ptr, &tsock->peer_name);
771 
772 	if (msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE)
773 		return -EINVAL;
774 	msg_set_importance(&p_ptr->phdr, (u32)msg_importance(msg));
775 	sock->state = SS_CONNECTED;
776 	return 0;
777 }
778 
779 /**
780  * set_orig_addr - capture sender's address for received message
781  * @m: descriptor for message info
782  * @msg: received message header
783  *
784  * Note: Address is not captured if not requested by receiver.
785  */
786 static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
787 {
788 	struct sockaddr_tipc *addr = (struct sockaddr_tipc *)m->msg_name;
789 
790 	if (addr) {
791 		addr->family = AF_TIPC;
792 		addr->addrtype = TIPC_ADDR_ID;
793 		memset(&addr->addr, 0, sizeof(addr->addr));
794 		addr->addr.id.ref = msg_origport(msg);
795 		addr->addr.id.node = msg_orignode(msg);
796 		addr->addr.name.domain = 0;	/* could leave uninitialized */
797 		addr->scope = 0;		/* could leave uninitialized */
798 		m->msg_namelen = sizeof(struct sockaddr_tipc);
799 	}
800 }
801 
802 /**
803  * anc_data_recv - optionally capture ancillary data for received message
804  * @m: descriptor for message info
805  * @msg: received message header
806  * @tport: TIPC port associated with message
807  *
808  * Note: Ancillary data is not captured if not requested by receiver.
809  *
810  * Returns 0 if successful, otherwise errno
811  */
812 static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
813 				struct tipc_port *tport)
814 {
815 	u32 anc_data[3];
816 	u32 err;
817 	u32 dest_type;
818 	int has_name;
819 	int res;
820 
821 	if (likely(m->msg_controllen == 0))
822 		return 0;
823 
824 	/* Optionally capture errored message object(s) */
825 	err = msg ? msg_errcode(msg) : 0;
826 	if (unlikely(err)) {
827 		anc_data[0] = err;
828 		anc_data[1] = msg_data_sz(msg);
829 		res = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, anc_data);
830 		if (res)
831 			return res;
832 		if (anc_data[1]) {
833 			res = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, anc_data[1],
834 				       msg_data(msg));
835 			if (res)
836 				return res;
837 		}
838 	}
839 
840 	/* Optionally capture message destination object */
841 	dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
842 	switch (dest_type) {
843 	case TIPC_NAMED_MSG:
844 		has_name = 1;
845 		anc_data[0] = msg_nametype(msg);
846 		anc_data[1] = msg_namelower(msg);
847 		anc_data[2] = msg_namelower(msg);
848 		break;
849 	case TIPC_MCAST_MSG:
850 		has_name = 1;
851 		anc_data[0] = msg_nametype(msg);
852 		anc_data[1] = msg_namelower(msg);
853 		anc_data[2] = msg_nameupper(msg);
854 		break;
855 	case TIPC_CONN_MSG:
856 		has_name = (tport->conn_type != 0);
857 		anc_data[0] = tport->conn_type;
858 		anc_data[1] = tport->conn_instance;
859 		anc_data[2] = tport->conn_instance;
860 		break;
861 	default:
862 		has_name = 0;
863 	}
864 	if (has_name) {
865 		res = put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, anc_data);
866 		if (res)
867 			return res;
868 	}
869 
870 	return 0;
871 }
872 
873 /**
874  * recv_msg - receive packet-oriented message
875  * @iocb: (unused)
876  * @m: descriptor for message info
877  * @buf_len: total size of user buffer area
878  * @flags: receive flags
879  *
880  * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
881  * If the complete message doesn't fit in user area, truncate it.
882  *
883  * Returns size of returned message data, errno otherwise
884  */
885 static int recv_msg(struct kiocb *iocb, struct socket *sock,
886 		    struct msghdr *m, size_t buf_len, int flags)
887 {
888 	struct sock *sk = sock->sk;
889 	struct tipc_port *tport = tipc_sk_port(sk);
890 	struct sk_buff *buf;
891 	struct tipc_msg *msg;
892 	long timeout;
893 	unsigned int sz;
894 	u32 err;
895 	int res;
896 
897 	/* Catch invalid receive requests */
898 	if (unlikely(!buf_len))
899 		return -EINVAL;
900 
901 	lock_sock(sk);
902 
903 	if (unlikely(sock->state == SS_UNCONNECTED)) {
904 		res = -ENOTCONN;
905 		goto exit;
906 	}
907 
908 	/* will be updated in set_orig_addr() if needed */
909 	m->msg_namelen = 0;
910 
911 	timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
912 restart:
913 
914 	/* Look for a message in receive queue; wait if necessary */
915 	while (skb_queue_empty(&sk->sk_receive_queue)) {
916 		if (sock->state == SS_DISCONNECTING) {
917 			res = -ENOTCONN;
918 			goto exit;
919 		}
920 		if (timeout <= 0L) {
921 			res = timeout ? timeout : -EWOULDBLOCK;
922 			goto exit;
923 		}
924 		release_sock(sk);
925 		timeout = wait_event_interruptible_timeout(*sk_sleep(sk),
926 							   tipc_rx_ready(sock),
927 							   timeout);
928 		lock_sock(sk);
929 	}
930 
931 	/* Look at first message in receive queue */
932 	buf = skb_peek(&sk->sk_receive_queue);
933 	msg = buf_msg(buf);
934 	sz = msg_data_sz(msg);
935 	err = msg_errcode(msg);
936 
937 	/* Discard an empty non-errored message & try again */
938 	if ((!sz) && (!err)) {
939 		advance_rx_queue(sk);
940 		goto restart;
941 	}
942 
943 	/* Capture sender's address (optional) */
944 	set_orig_addr(m, msg);
945 
946 	/* Capture ancillary data (optional) */
947 	res = anc_data_recv(m, msg, tport);
948 	if (res)
949 		goto exit;
950 
951 	/* Capture message data (if valid) & compute return value (always) */
952 	if (!err) {
953 		if (unlikely(buf_len < sz)) {
954 			sz = buf_len;
955 			m->msg_flags |= MSG_TRUNC;
956 		}
957 		res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg),
958 					      m->msg_iov, sz);
959 		if (res)
960 			goto exit;
961 		res = sz;
962 	} else {
963 		if ((sock->state == SS_READY) ||
964 		    ((err == TIPC_CONN_SHUTDOWN) || m->msg_control))
965 			res = 0;
966 		else
967 			res = -ECONNRESET;
968 	}
969 
970 	/* Consume received message (optional) */
971 	if (likely(!(flags & MSG_PEEK))) {
972 		if ((sock->state != SS_READY) &&
973 		    (++tport->conn_unacked >= TIPC_FLOW_CONTROL_WIN))
974 			tipc_acknowledge(tport->ref, tport->conn_unacked);
975 		advance_rx_queue(sk);
976 	}
977 exit:
978 	release_sock(sk);
979 	return res;
980 }
981 
982 /**
983  * recv_stream - receive stream-oriented data
984  * @iocb: (unused)
985  * @m: descriptor for message info
986  * @buf_len: total size of user buffer area
987  * @flags: receive flags
988  *
989  * Used for SOCK_STREAM messages only.  If not enough data is available
990  * will optionally wait for more; never truncates data.
991  *
992  * Returns size of returned message data, errno otherwise
993  */
994 static int recv_stream(struct kiocb *iocb, struct socket *sock,
995 		       struct msghdr *m, size_t buf_len, int flags)
996 {
997 	struct sock *sk = sock->sk;
998 	struct tipc_port *tport = tipc_sk_port(sk);
999 	struct sk_buff *buf;
1000 	struct tipc_msg *msg;
1001 	long timeout;
1002 	unsigned int sz;
1003 	int sz_to_copy, target, needed;
1004 	int sz_copied = 0;
1005 	u32 err;
1006 	int res = 0;
1007 
1008 	/* Catch invalid receive attempts */
1009 	if (unlikely(!buf_len))
1010 		return -EINVAL;
1011 
1012 	lock_sock(sk);
1013 
1014 	if (unlikely((sock->state == SS_UNCONNECTED) ||
1015 		     (sock->state == SS_CONNECTING))) {
1016 		res = -ENOTCONN;
1017 		goto exit;
1018 	}
1019 
1020 	/* will be updated in set_orig_addr() if needed */
1021 	m->msg_namelen = 0;
1022 
1023 	target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
1024 	timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1025 
1026 restart:
1027 	/* Look for a message in receive queue; wait if necessary */
1028 	while (skb_queue_empty(&sk->sk_receive_queue)) {
1029 		if (sock->state == SS_DISCONNECTING) {
1030 			res = -ENOTCONN;
1031 			goto exit;
1032 		}
1033 		if (timeout <= 0L) {
1034 			res = timeout ? timeout : -EWOULDBLOCK;
1035 			goto exit;
1036 		}
1037 		release_sock(sk);
1038 		timeout = wait_event_interruptible_timeout(*sk_sleep(sk),
1039 							   tipc_rx_ready(sock),
1040 							   timeout);
1041 		lock_sock(sk);
1042 	}
1043 
1044 	/* Look at first message in receive queue */
1045 	buf = skb_peek(&sk->sk_receive_queue);
1046 	msg = buf_msg(buf);
1047 	sz = msg_data_sz(msg);
1048 	err = msg_errcode(msg);
1049 
1050 	/* Discard an empty non-errored message & try again */
1051 	if ((!sz) && (!err)) {
1052 		advance_rx_queue(sk);
1053 		goto restart;
1054 	}
1055 
1056 	/* Optionally capture sender's address & ancillary data of first msg */
1057 	if (sz_copied == 0) {
1058 		set_orig_addr(m, msg);
1059 		res = anc_data_recv(m, msg, tport);
1060 		if (res)
1061 			goto exit;
1062 	}
1063 
1064 	/* Capture message data (if valid) & compute return value (always) */
1065 	if (!err) {
1066 		u32 offset = (u32)(unsigned long)(TIPC_SKB_CB(buf)->handle);
1067 
1068 		sz -= offset;
1069 		needed = (buf_len - sz_copied);
1070 		sz_to_copy = (sz <= needed) ? sz : needed;
1071 
1072 		res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg) + offset,
1073 					      m->msg_iov, sz_to_copy);
1074 		if (res)
1075 			goto exit;
1076 
1077 		sz_copied += sz_to_copy;
1078 
1079 		if (sz_to_copy < sz) {
1080 			if (!(flags & MSG_PEEK))
1081 				TIPC_SKB_CB(buf)->handle =
1082 				(void *)(unsigned long)(offset + sz_to_copy);
1083 			goto exit;
1084 		}
1085 	} else {
1086 		if (sz_copied != 0)
1087 			goto exit; /* can't add error msg to valid data */
1088 
1089 		if ((err == TIPC_CONN_SHUTDOWN) || m->msg_control)
1090 			res = 0;
1091 		else
1092 			res = -ECONNRESET;
1093 	}
1094 
1095 	/* Consume received message (optional) */
1096 	if (likely(!(flags & MSG_PEEK))) {
1097 		if (unlikely(++tport->conn_unacked >= TIPC_FLOW_CONTROL_WIN))
1098 			tipc_acknowledge(tport->ref, tport->conn_unacked);
1099 		advance_rx_queue(sk);
1100 	}
1101 
1102 	/* Loop around if more data is required */
1103 	if ((sz_copied < buf_len) &&	/* didn't get all requested data */
1104 	    (!skb_queue_empty(&sk->sk_receive_queue) ||
1105 	    (sz_copied < target)) &&	/* and more is ready or required */
1106 	    (!(flags & MSG_PEEK)) &&	/* and aren't just peeking at data */
1107 	    (!err))			/* and haven't reached a FIN */
1108 		goto restart;
1109 
1110 exit:
1111 	release_sock(sk);
1112 	return sz_copied ? sz_copied : res;
1113 }
1114 
1115 /**
1116  * tipc_write_space - wake up thread if port congestion is released
1117  * @sk: socket
1118  */
1119 static void tipc_write_space(struct sock *sk)
1120 {
1121 	struct socket_wq *wq;
1122 
1123 	rcu_read_lock();
1124 	wq = rcu_dereference(sk->sk_wq);
1125 	if (wq_has_sleeper(wq))
1126 		wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
1127 						POLLWRNORM | POLLWRBAND);
1128 	rcu_read_unlock();
1129 }
1130 
1131 /**
1132  * tipc_data_ready - wake up threads to indicate messages have been received
1133  * @sk: socket
1134  * @len: the length of messages
1135  */
1136 static void tipc_data_ready(struct sock *sk, int len)
1137 {
1138 	struct socket_wq *wq;
1139 
1140 	rcu_read_lock();
1141 	wq = rcu_dereference(sk->sk_wq);
1142 	if (wq_has_sleeper(wq))
1143 		wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
1144 						POLLRDNORM | POLLRDBAND);
1145 	rcu_read_unlock();
1146 }
1147 
/**
 * filter_connect - Handle all incoming messages for a connection-based socket
 * @tsock: TIPC socket
 * @buf: address of the message buffer pointer; the pointer is set to NULL
 *       when the buffer is freed here (empty 'ACK-' during connection setup)
 *
 * Decides, based on the socket's connection state, whether the message may
 * be accepted.  When connection setup fails, the socket is moved to
 * SS_DISCONNECTING and the reason is stored in sk->sk_err as a positive
 * errno value, which is what sock_error() expects.
 *
 * Returns TIPC_OK if the message is accepted (or was consumed here),
 * TIPC_ERR_NO_PORT if it must be rejected.
 */
static u32 filter_connect(struct tipc_sock *tsock, struct sk_buff **buf)
{
	struct socket *sock = tsock->sk.sk_socket;
	struct tipc_msg *msg = buf_msg(*buf);
	struct sock *sk = &tsock->sk;
	u32 retval = TIPC_ERR_NO_PORT;
	int res;

	/* Multicast messages never belong to a connection */
	if (msg_mcast(msg))
		return retval;

	switch ((int)sock->state) {
	case SS_CONNECTED:
		/* Accept only connection-based messages sent by peer */
		if (msg_connected(msg) && tipc_port_peer_msg(tsock->p, msg)) {
			/* An errored message from the peer ends the connection */
			if (unlikely(msg_errcode(msg))) {
				sock->state = SS_DISCONNECTING;
				__tipc_disconnect(tsock->p);
			}
			retval = TIPC_OK;
		}
		break;
	case SS_CONNECTING:
		/* Accept only ACK or NACK message */
		if (unlikely(msg_errcode(msg))) {
			sock->state = SS_DISCONNECTING;
			/*
			 * sk_err must hold a positive errno: sock_error()
			 * negates it before handing it back to connect().
			 */
			sk->sk_err = ECONNREFUSED;
			retval = TIPC_OK;
			break;
		}

		if (unlikely(!msg_connected(msg)))
			break;

		res = auto_connect(sock, msg);
		if (res) {
			sock->state = SS_DISCONNECTING;
			/*
			 * NOTE(review): auto_connect() is expected to follow
			 * the kernel convention of returning a negative
			 * errno; store its positive counterpart.
			 */
			sk->sk_err = -res;
			retval = TIPC_OK;
			break;
		}

		/* If an incoming message is an 'ACK-', it should be
		 * discarded here because it doesn't contain useful
		 * data. In addition, we should try to wake up
		 * connect() routine if sleeping.
		 */
		if (msg_data_sz(msg) == 0) {
			kfree_skb(*buf);
			*buf = NULL;
			if (waitqueue_active(sk_sleep(sk)))
				wake_up_interruptible(sk_sleep(sk));
		}
		retval = TIPC_OK;
		break;
	case SS_LISTENING:
	case SS_UNCONNECTED:
		/* Accept only SYN message */
		if (!msg_connected(msg) && !(msg_errcode(msg)))
			retval = TIPC_OK;
		break;
	case SS_DISCONNECTING:
		/* Nothing is accepted once the connection is going down */
		break;
	default:
		pr_err("Unknown socket state %u\n", sock->state);
	}
	return retval;
}
1224 
1225 /**
1226  * rcvbuf_limit - get proper overload limit of socket receive queue
1227  * @sk: socket
1228  * @buf: message
1229  *
1230  * For all connection oriented messages, irrespective of importance,
1231  * the default overload value (i.e. 67MB) is set as limit.
1232  *
1233  * For all connectionless messages, by default new queue limits are
1234  * as belows:
1235  *
1236  * TIPC_LOW_IMPORTANCE       (5MB)
1237  * TIPC_MEDIUM_IMPORTANCE    (10MB)
1238  * TIPC_HIGH_IMPORTANCE      (20MB)
1239  * TIPC_CRITICAL_IMPORTANCE  (40MB)
1240  *
1241  * Returns overload limit according to corresponding message importance
1242  */
1243 static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
1244 {
1245 	struct tipc_msg *msg = buf_msg(buf);
1246 	unsigned int limit;
1247 
1248 	if (msg_connected(msg))
1249 		limit = CONN_OVERLOAD_LIMIT;
1250 	else
1251 		limit = sk->sk_rcvbuf << (msg_importance(msg) + 5);
1252 	return limit;
1253 }
1254 
1255 /**
1256  * filter_rcv - validate incoming message
1257  * @sk: socket
1258  * @buf: message
1259  *
1260  * Enqueues message on receive queue if acceptable; optionally handles
1261  * disconnect indication for a connected socket.
1262  *
1263  * Called with socket lock already taken; port lock may also be taken.
1264  *
1265  * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
1266  */
1267 static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
1268 {
1269 	struct socket *sock = sk->sk_socket;
1270 	struct tipc_msg *msg = buf_msg(buf);
1271 	unsigned int limit = rcvbuf_limit(sk, buf);
1272 	u32 res = TIPC_OK;
1273 
1274 	/* Reject message if it is wrong sort of message for socket */
1275 	if (msg_type(msg) > TIPC_DIRECT_MSG)
1276 		return TIPC_ERR_NO_PORT;
1277 
1278 	if (sock->state == SS_READY) {
1279 		if (msg_connected(msg))
1280 			return TIPC_ERR_NO_PORT;
1281 	} else {
1282 		res = filter_connect(tipc_sk(sk), &buf);
1283 		if (res != TIPC_OK || buf == NULL)
1284 			return res;
1285 	}
1286 
1287 	/* Reject message if there isn't room to queue it */
1288 	if (sk_rmem_alloc_get(sk) + buf->truesize >= limit)
1289 		return TIPC_ERR_OVERLOAD;
1290 
1291 	/* Enqueue message */
1292 	TIPC_SKB_CB(buf)->handle = 0;
1293 	__skb_queue_tail(&sk->sk_receive_queue, buf);
1294 	skb_set_owner_r(buf, sk);
1295 
1296 	sk->sk_data_ready(sk, 0);
1297 	return TIPC_OK;
1298 }
1299 
1300 /**
1301  * backlog_rcv - handle incoming message from backlog queue
1302  * @sk: socket
1303  * @buf: message
1304  *
1305  * Caller must hold socket lock, but not port lock.
1306  *
1307  * Returns 0
1308  */
1309 static int backlog_rcv(struct sock *sk, struct sk_buff *buf)
1310 {
1311 	u32 res;
1312 
1313 	res = filter_rcv(sk, buf);
1314 	if (res)
1315 		tipc_reject_msg(buf, res);
1316 	return 0;
1317 }
1318 
1319 /**
1320  * dispatch - handle incoming message
1321  * @tport: TIPC port that received message
1322  * @buf: message
1323  *
1324  * Called with port lock already taken.
1325  *
1326  * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
1327  */
1328 static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf)
1329 {
1330 	struct sock *sk = (struct sock *)tport->usr_handle;
1331 	u32 res;
1332 
1333 	/*
1334 	 * Process message if socket is unlocked; otherwise add to backlog queue
1335 	 *
1336 	 * This code is based on sk_receive_skb(), but must be distinct from it
1337 	 * since a TIPC-specific filter/reject mechanism is utilized
1338 	 */
1339 	bh_lock_sock(sk);
1340 	if (!sock_owned_by_user(sk)) {
1341 		res = filter_rcv(sk, buf);
1342 	} else {
1343 		if (sk_add_backlog(sk, buf, rcvbuf_limit(sk, buf)))
1344 			res = TIPC_ERR_OVERLOAD;
1345 		else
1346 			res = TIPC_OK;
1347 	}
1348 	bh_unlock_sock(sk);
1349 
1350 	return res;
1351 }
1352 
1353 /**
1354  * wakeupdispatch - wake up port after congestion
1355  * @tport: port to wakeup
1356  *
1357  * Called with port lock already taken.
1358  */
1359 static void wakeupdispatch(struct tipc_port *tport)
1360 {
1361 	struct sock *sk = (struct sock *)tport->usr_handle;
1362 
1363 	sk->sk_write_space(sk);
1364 }
1365 
1366 /**
1367  * connect - establish a connection to another TIPC port
1368  * @sock: socket structure
1369  * @dest: socket address for destination port
1370  * @destlen: size of socket address data structure
1371  * @flags: file-related flags associated with socket
1372  *
1373  * Returns 0 on success, errno otherwise
1374  */
1375 static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
1376 		   int flags)
1377 {
1378 	struct sock *sk = sock->sk;
1379 	struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
1380 	struct msghdr m = {NULL,};
1381 	unsigned int timeout;
1382 	int res;
1383 
1384 	lock_sock(sk);
1385 
1386 	/* For now, TIPC does not allow use of connect() with DGRAM/RDM types */
1387 	if (sock->state == SS_READY) {
1388 		res = -EOPNOTSUPP;
1389 		goto exit;
1390 	}
1391 
1392 	/*
1393 	 * Reject connection attempt using multicast address
1394 	 *
1395 	 * Note: send_msg() validates the rest of the address fields,
1396 	 *       so there's no need to do it here
1397 	 */
1398 	if (dst->addrtype == TIPC_ADDR_MCAST) {
1399 		res = -EINVAL;
1400 		goto exit;
1401 	}
1402 
1403 	timeout = (flags & O_NONBLOCK) ? 0 : tipc_sk(sk)->conn_timeout;
1404 
1405 	switch (sock->state) {
1406 	case SS_UNCONNECTED:
1407 		/* Send a 'SYN-' to destination */
1408 		m.msg_name = dest;
1409 		m.msg_namelen = destlen;
1410 
1411 		/* If connect is in non-blocking case, set MSG_DONTWAIT to
1412 		 * indicate send_msg() is never blocked.
1413 		 */
1414 		if (!timeout)
1415 			m.msg_flags = MSG_DONTWAIT;
1416 
1417 		res = send_msg(NULL, sock, &m, 0);
1418 		if ((res < 0) && (res != -EWOULDBLOCK))
1419 			goto exit;
1420 
1421 		/* Just entered SS_CONNECTING state; the only
1422 		 * difference is that return value in non-blocking
1423 		 * case is EINPROGRESS, rather than EALREADY.
1424 		 */
1425 		res = -EINPROGRESS;
1426 		break;
1427 	case SS_CONNECTING:
1428 		res = -EALREADY;
1429 		break;
1430 	case SS_CONNECTED:
1431 		res = -EISCONN;
1432 		break;
1433 	default:
1434 		res = -EINVAL;
1435 		goto exit;
1436 	}
1437 
1438 	if (sock->state == SS_CONNECTING) {
1439 		if (!timeout)
1440 			goto exit;
1441 
1442 		/* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
1443 		release_sock(sk);
1444 		res = wait_event_interruptible_timeout(*sk_sleep(sk),
1445 				sock->state != SS_CONNECTING,
1446 				timeout ? (long)msecs_to_jiffies(timeout)
1447 					: MAX_SCHEDULE_TIMEOUT);
1448 		lock_sock(sk);
1449 		if (res <= 0) {
1450 			if (res == 0)
1451 				res = -ETIMEDOUT;
1452 			else
1453 				; /* leave "res" unchanged */
1454 			goto exit;
1455 		}
1456 	}
1457 
1458 	if (unlikely(sock->state == SS_DISCONNECTING))
1459 		res = sock_error(sk);
1460 	else
1461 		res = 0;
1462 
1463 exit:
1464 	release_sock(sk);
1465 	return res;
1466 }
1467 
1468 /**
1469  * listen - allow socket to listen for incoming connections
1470  * @sock: socket structure
1471  * @len: (unused)
1472  *
1473  * Returns 0 on success, errno otherwise
1474  */
1475 static int listen(struct socket *sock, int len)
1476 {
1477 	struct sock *sk = sock->sk;
1478 	int res;
1479 
1480 	lock_sock(sk);
1481 
1482 	if (sock->state != SS_UNCONNECTED)
1483 		res = -EINVAL;
1484 	else {
1485 		sock->state = SS_LISTENING;
1486 		res = 0;
1487 	}
1488 
1489 	release_sock(sk);
1490 	return res;
1491 }
1492 
1493 /**
1494  * accept - wait for connection request
1495  * @sock: listening socket
1496  * @newsock: new socket that is to be connected
1497  * @flags: file-related flags associated with socket
1498  *
1499  * Returns 0 on success, errno otherwise
1500  */
1501 static int accept(struct socket *sock, struct socket *new_sock, int flags)
1502 {
1503 	struct sock *new_sk, *sk = sock->sk;
1504 	struct sk_buff *buf;
1505 	struct tipc_sock *new_tsock;
1506 	struct tipc_port *new_tport;
1507 	struct tipc_msg *msg;
1508 	u32 new_ref;
1509 
1510 	int res;
1511 
1512 	lock_sock(sk);
1513 
1514 	if (sock->state != SS_LISTENING) {
1515 		res = -EINVAL;
1516 		goto exit;
1517 	}
1518 
1519 	while (skb_queue_empty(&sk->sk_receive_queue)) {
1520 		if (flags & O_NONBLOCK) {
1521 			res = -EWOULDBLOCK;
1522 			goto exit;
1523 		}
1524 		release_sock(sk);
1525 		res = wait_event_interruptible(*sk_sleep(sk),
1526 				(!skb_queue_empty(&sk->sk_receive_queue)));
1527 		lock_sock(sk);
1528 		if (res)
1529 			goto exit;
1530 	}
1531 
1532 	buf = skb_peek(&sk->sk_receive_queue);
1533 
1534 	res = tipc_create(sock_net(sock->sk), new_sock, 0, 0);
1535 	if (res)
1536 		goto exit;
1537 
1538 	new_sk = new_sock->sk;
1539 	new_tsock = tipc_sk(new_sk);
1540 	new_tport = new_tsock->p;
1541 	new_ref = new_tport->ref;
1542 	msg = buf_msg(buf);
1543 
1544 	/* we lock on new_sk; but lockdep sees the lock on sk */
1545 	lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
1546 
1547 	/*
1548 	 * Reject any stray messages received by new socket
1549 	 * before the socket lock was taken (very, very unlikely)
1550 	 */
1551 	reject_rx_queue(new_sk);
1552 
1553 	/* Connect new socket to it's peer */
1554 	new_tsock->peer_name.ref = msg_origport(msg);
1555 	new_tsock->peer_name.node = msg_orignode(msg);
1556 	tipc_connect(new_ref, &new_tsock->peer_name);
1557 	new_sock->state = SS_CONNECTED;
1558 
1559 	tipc_set_portimportance(new_ref, msg_importance(msg));
1560 	if (msg_named(msg)) {
1561 		new_tport->conn_type = msg_nametype(msg);
1562 		new_tport->conn_instance = msg_nameinst(msg);
1563 	}
1564 
1565 	/*
1566 	 * Respond to 'SYN-' by discarding it & returning 'ACK'-.
1567 	 * Respond to 'SYN+' by queuing it on new socket.
1568 	 */
1569 	if (!msg_data_sz(msg)) {
1570 		struct msghdr m = {NULL,};
1571 
1572 		advance_rx_queue(sk);
1573 		send_packet(NULL, new_sock, &m, 0);
1574 	} else {
1575 		__skb_dequeue(&sk->sk_receive_queue);
1576 		__skb_queue_head(&new_sk->sk_receive_queue, buf);
1577 		skb_set_owner_r(buf, new_sk);
1578 	}
1579 	release_sock(new_sk);
1580 
1581 exit:
1582 	release_sock(sk);
1583 	return res;
1584 }
1585 
1586 /**
1587  * shutdown - shutdown socket connection
1588  * @sock: socket structure
1589  * @how: direction to close (must be SHUT_RDWR)
1590  *
1591  * Terminates connection (if necessary), then purges socket's receive queue.
1592  *
1593  * Returns 0 on success, errno otherwise
1594  */
1595 static int shutdown(struct socket *sock, int how)
1596 {
1597 	struct sock *sk = sock->sk;
1598 	struct tipc_port *tport = tipc_sk_port(sk);
1599 	struct sk_buff *buf;
1600 	int res;
1601 
1602 	if (how != SHUT_RDWR)
1603 		return -EINVAL;
1604 
1605 	lock_sock(sk);
1606 
1607 	switch (sock->state) {
1608 	case SS_CONNECTING:
1609 	case SS_CONNECTED:
1610 
1611 restart:
1612 		/* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
1613 		buf = __skb_dequeue(&sk->sk_receive_queue);
1614 		if (buf) {
1615 			if (TIPC_SKB_CB(buf)->handle != 0) {
1616 				kfree_skb(buf);
1617 				goto restart;
1618 			}
1619 			tipc_disconnect(tport->ref);
1620 			tipc_reject_msg(buf, TIPC_CONN_SHUTDOWN);
1621 		} else {
1622 			tipc_shutdown(tport->ref);
1623 		}
1624 
1625 		sock->state = SS_DISCONNECTING;
1626 
1627 		/* fall through */
1628 
1629 	case SS_DISCONNECTING:
1630 
1631 		/* Discard any unreceived messages */
1632 		__skb_queue_purge(&sk->sk_receive_queue);
1633 
1634 		/* Wake up anyone sleeping in poll */
1635 		sk->sk_state_change(sk);
1636 		res = 0;
1637 		break;
1638 
1639 	default:
1640 		res = -ENOTCONN;
1641 	}
1642 
1643 	release_sock(sk);
1644 	return res;
1645 }
1646 
1647 /**
1648  * setsockopt - set socket option
1649  * @sock: socket structure
1650  * @lvl: option level
1651  * @opt: option identifier
1652  * @ov: pointer to new option value
1653  * @ol: length of option value
1654  *
1655  * For stream sockets only, accepts and ignores all IPPROTO_TCP options
1656  * (to ease compatibility).
1657  *
1658  * Returns 0 on success, errno otherwise
1659  */
1660 static int setsockopt(struct socket *sock,
1661 		      int lvl, int opt, char __user *ov, unsigned int ol)
1662 {
1663 	struct sock *sk = sock->sk;
1664 	struct tipc_port *tport = tipc_sk_port(sk);
1665 	u32 value;
1666 	int res;
1667 
1668 	if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
1669 		return 0;
1670 	if (lvl != SOL_TIPC)
1671 		return -ENOPROTOOPT;
1672 	if (ol < sizeof(value))
1673 		return -EINVAL;
1674 	res = get_user(value, (u32 __user *)ov);
1675 	if (res)
1676 		return res;
1677 
1678 	lock_sock(sk);
1679 
1680 	switch (opt) {
1681 	case TIPC_IMPORTANCE:
1682 		res = tipc_set_portimportance(tport->ref, value);
1683 		break;
1684 	case TIPC_SRC_DROPPABLE:
1685 		if (sock->type != SOCK_STREAM)
1686 			res = tipc_set_portunreliable(tport->ref, value);
1687 		else
1688 			res = -ENOPROTOOPT;
1689 		break;
1690 	case TIPC_DEST_DROPPABLE:
1691 		res = tipc_set_portunreturnable(tport->ref, value);
1692 		break;
1693 	case TIPC_CONN_TIMEOUT:
1694 		tipc_sk(sk)->conn_timeout = value;
1695 		/* no need to set "res", since already 0 at this point */
1696 		break;
1697 	default:
1698 		res = -EINVAL;
1699 	}
1700 
1701 	release_sock(sk);
1702 
1703 	return res;
1704 }
1705 
1706 /**
1707  * getsockopt - get socket option
1708  * @sock: socket structure
1709  * @lvl: option level
1710  * @opt: option identifier
1711  * @ov: receptacle for option value
1712  * @ol: receptacle for length of option value
1713  *
1714  * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
1715  * (to ease compatibility).
1716  *
1717  * Returns 0 on success, errno otherwise
1718  */
1719 static int getsockopt(struct socket *sock,
1720 		      int lvl, int opt, char __user *ov, int __user *ol)
1721 {
1722 	struct sock *sk = sock->sk;
1723 	struct tipc_port *tport = tipc_sk_port(sk);
1724 	int len;
1725 	u32 value;
1726 	int res;
1727 
1728 	if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
1729 		return put_user(0, ol);
1730 	if (lvl != SOL_TIPC)
1731 		return -ENOPROTOOPT;
1732 	res = get_user(len, ol);
1733 	if (res)
1734 		return res;
1735 
1736 	lock_sock(sk);
1737 
1738 	switch (opt) {
1739 	case TIPC_IMPORTANCE:
1740 		res = tipc_portimportance(tport->ref, &value);
1741 		break;
1742 	case TIPC_SRC_DROPPABLE:
1743 		res = tipc_portunreliable(tport->ref, &value);
1744 		break;
1745 	case TIPC_DEST_DROPPABLE:
1746 		res = tipc_portunreturnable(tport->ref, &value);
1747 		break;
1748 	case TIPC_CONN_TIMEOUT:
1749 		value = tipc_sk(sk)->conn_timeout;
1750 		/* no need to set "res", since already 0 at this point */
1751 		break;
1752 	case TIPC_NODE_RECVQ_DEPTH:
1753 		value = 0; /* was tipc_queue_size, now obsolete */
1754 		break;
1755 	case TIPC_SOCK_RECVQ_DEPTH:
1756 		value = skb_queue_len(&sk->sk_receive_queue);
1757 		break;
1758 	default:
1759 		res = -EINVAL;
1760 	}
1761 
1762 	release_sock(sk);
1763 
1764 	if (res)
1765 		return res;	/* "get" failed */
1766 
1767 	if (len < sizeof(value))
1768 		return -EINVAL;
1769 
1770 	if (copy_to_user(ov, &value, sizeof(value)))
1771 		return -EFAULT;
1772 
1773 	return put_user(sizeof(value), ol);
1774 }
1775 
1776 /* Protocol switches for the various types of TIPC sockets */
1777 
1778 static const struct proto_ops msg_ops = {
1779 	.owner		= THIS_MODULE,
1780 	.family		= AF_TIPC,
1781 	.release	= release,
1782 	.bind		= bind,
1783 	.connect	= connect,
1784 	.socketpair	= sock_no_socketpair,
1785 	.accept		= sock_no_accept,
1786 	.getname	= get_name,
1787 	.poll		= poll,
1788 	.ioctl		= sock_no_ioctl,
1789 	.listen		= sock_no_listen,
1790 	.shutdown	= shutdown,
1791 	.setsockopt	= setsockopt,
1792 	.getsockopt	= getsockopt,
1793 	.sendmsg	= send_msg,
1794 	.recvmsg	= recv_msg,
1795 	.mmap		= sock_no_mmap,
1796 	.sendpage	= sock_no_sendpage
1797 };
1798 
1799 static const struct proto_ops packet_ops = {
1800 	.owner		= THIS_MODULE,
1801 	.family		= AF_TIPC,
1802 	.release	= release,
1803 	.bind		= bind,
1804 	.connect	= connect,
1805 	.socketpair	= sock_no_socketpair,
1806 	.accept		= accept,
1807 	.getname	= get_name,
1808 	.poll		= poll,
1809 	.ioctl		= sock_no_ioctl,
1810 	.listen		= listen,
1811 	.shutdown	= shutdown,
1812 	.setsockopt	= setsockopt,
1813 	.getsockopt	= getsockopt,
1814 	.sendmsg	= send_packet,
1815 	.recvmsg	= recv_msg,
1816 	.mmap		= sock_no_mmap,
1817 	.sendpage	= sock_no_sendpage
1818 };
1819 
1820 static const struct proto_ops stream_ops = {
1821 	.owner		= THIS_MODULE,
1822 	.family		= AF_TIPC,
1823 	.release	= release,
1824 	.bind		= bind,
1825 	.connect	= connect,
1826 	.socketpair	= sock_no_socketpair,
1827 	.accept		= accept,
1828 	.getname	= get_name,
1829 	.poll		= poll,
1830 	.ioctl		= sock_no_ioctl,
1831 	.listen		= listen,
1832 	.shutdown	= shutdown,
1833 	.setsockopt	= setsockopt,
1834 	.getsockopt	= getsockopt,
1835 	.sendmsg	= send_stream,
1836 	.recvmsg	= recv_stream,
1837 	.mmap		= sock_no_mmap,
1838 	.sendpage	= sock_no_sendpage
1839 };
1840 
1841 static const struct net_proto_family tipc_family_ops = {
1842 	.owner		= THIS_MODULE,
1843 	.family		= AF_TIPC,
1844 	.create		= tipc_create
1845 };
1846 
1847 static struct proto tipc_proto = {
1848 	.name		= "TIPC",
1849 	.owner		= THIS_MODULE,
1850 	.obj_size	= sizeof(struct tipc_sock)
1851 };
1852 
1853 /**
1854  * tipc_socket_init - initialize TIPC socket interface
1855  *
1856  * Returns 0 on success, errno otherwise
1857  */
1858 int tipc_socket_init(void)
1859 {
1860 	int res;
1861 
1862 	res = proto_register(&tipc_proto, 1);
1863 	if (res) {
1864 		pr_err("Failed to register TIPC protocol type\n");
1865 		goto out;
1866 	}
1867 
1868 	res = sock_register(&tipc_family_ops);
1869 	if (res) {
1870 		pr_err("Failed to register TIPC socket type\n");
1871 		proto_unregister(&tipc_proto);
1872 		goto out;
1873 	}
1874 
1875 	sockets_enabled = 1;
1876  out:
1877 	return res;
1878 }
1879 
1880 /**
1881  * tipc_socket_stop - stop TIPC socket interface
1882  */
1883 void tipc_socket_stop(void)
1884 {
1885 	if (!sockets_enabled)
1886 		return;
1887 
1888 	sockets_enabled = 0;
1889 	sock_unregister(tipc_family_ops.family);
1890 	proto_unregister(&tipc_proto);
1891 }
1892