xref: /openbmc/linux/net/tipc/socket.c (revision afb46f79)
1 /*
2  * net/tipc/socket.c: TIPC socket API
3  *
4  * Copyright (c) 2001-2007, 2012-2014, Ericsson AB
5  * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36 
37 #include "core.h"
38 #include "port.h"
39 
40 #include <linux/export.h>
41 
42 #define SS_LISTENING	-1	/* socket is listening */
43 #define SS_READY	-2	/* socket is connectionless */
44 
45 #define CONN_TIMEOUT_DEFAULT	8000	/* default connect timeout = 8s */
46 
47 static int backlog_rcv(struct sock *sk, struct sk_buff *skb);
48 static void tipc_data_ready(struct sock *sk);
49 static void tipc_write_space(struct sock *sk);
50 static int tipc_release(struct socket *sock);
51 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
52 
53 static const struct proto_ops packet_ops;
54 static const struct proto_ops stream_ops;
55 static const struct proto_ops msg_ops;
56 
57 static struct proto tipc_proto;
58 static struct proto tipc_proto_kern;
59 
60 /*
61  * Revised TIPC socket locking policy:
62  *
63  * Most socket operations take the standard socket lock when they start
64  * and hold it until they finish (or until they need to sleep).  Acquiring
65  * this lock grants the owner exclusive access to the fields of the socket
66  * data structures, with the exception of the backlog queue.  A few socket
67  * operations can be done without taking the socket lock because they only
68  * read socket information that never changes during the life of the socket.
69  *
70  * Socket operations may acquire the lock for the associated TIPC port if they
71  * need to perform an operation on the port.  If any routine needs to acquire
72  * both the socket lock and the port lock it must take the socket lock first
73  * to avoid the risk of deadlock.
74  *
75  * The dispatcher handling incoming messages cannot grab the socket lock in
76  * the standard fashion, since invoked it runs at the BH level and cannot block.
77  * Instead, it checks to see if the socket lock is currently owned by someone,
78  * and either handles the message itself or adds it to the socket's backlog
79  * queue; in the latter case the queued message is processed once the process
80  * owning the socket lock releases it.
81  *
82  * NOTE: Releasing the socket lock while an operation is sleeping overcomes
83  * the problem of a blocked socket operation preventing any other operations
84  * from occurring.  However, applications must be careful if they have
85  * multiple threads trying to send (or receive) on the same socket, as these
86  * operations might interfere with each other.  For example, doing a connect
87  * and a receive at the same time might allow the receive to consume the
88  * ACK message meant for the connect.  While additional work could be done
89  * to try and overcome this, it doesn't seem to be worthwhile at the present.
90  *
91  * NOTE: Releasing the socket lock while an operation is sleeping also ensures
92  * that another operation that must be performed in a non-blocking manner is
93  * not delayed for very long because the lock has already been taken.
94  *
95  * NOTE: This code assumes that certain fields of a port/socket pair are
96  * constant over its lifetime; such fields can be examined without taking
97  * the socket lock and/or port lock, and do not need to be re-read even
98  * after resuming processing after waiting.  These fields include:
99  *   - socket type
100  *   - pointer to socket sk structure (aka tipc_sock structure)
101  *   - pointer to port structure
102  *   - port reference
103  */
104 
105 #include "socket.h"
106 
107 /**
108  * advance_rx_queue - discard first buffer in socket receive queue
109  *
110  * Caller must hold socket lock
111  */
112 static void advance_rx_queue(struct sock *sk)
113 {
114 	kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
115 }
116 
117 /**
118  * reject_rx_queue - reject all buffers in socket receive queue
119  *
120  * Caller must hold socket lock
121  */
122 static void reject_rx_queue(struct sock *sk)
123 {
124 	struct sk_buff *buf;
125 
126 	while ((buf = __skb_dequeue(&sk->sk_receive_queue)))
127 		tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
128 }
129 
130 /**
131  * tipc_sk_create - create a TIPC socket
132  * @net: network namespace (must be default network)
133  * @sock: pre-allocated socket structure
134  * @protocol: protocol indicator (must be 0)
135  * @kern: caused by kernel or by userspace?
136  *
137  * This routine creates additional data structures used by the TIPC socket,
138  * initializes them, and links them together.
139  *
140  * Returns 0 on success, errno otherwise
141  */
142 static int tipc_sk_create(struct net *net, struct socket *sock,
143 			  int protocol, int kern)
144 {
145 	const struct proto_ops *ops;
146 	socket_state state;
147 	struct sock *sk;
148 	struct tipc_sock *tsk;
149 	struct tipc_port *port;
150 	u32 ref;
151 
152 	/* Validate arguments */
153 	if (unlikely(protocol != 0))
154 		return -EPROTONOSUPPORT;
155 
156 	switch (sock->type) {
157 	case SOCK_STREAM:
158 		ops = &stream_ops;
159 		state = SS_UNCONNECTED;
160 		break;
161 	case SOCK_SEQPACKET:
162 		ops = &packet_ops;
163 		state = SS_UNCONNECTED;
164 		break;
165 	case SOCK_DGRAM:
166 	case SOCK_RDM:
167 		ops = &msg_ops;
168 		state = SS_READY;
169 		break;
170 	default:
171 		return -EPROTOTYPE;
172 	}
173 
174 	/* Allocate socket's protocol area */
175 	if (!kern)
176 		sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
177 	else
178 		sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto_kern);
179 
180 	if (sk == NULL)
181 		return -ENOMEM;
182 
183 	tsk = tipc_sk(sk);
184 	port = &tsk->port;
185 
186 	ref = tipc_port_init(port, TIPC_LOW_IMPORTANCE);
187 	if (!ref) {
188 		pr_warn("Socket registration failed, ref. table exhausted\n");
189 		sk_free(sk);
190 		return -ENOMEM;
191 	}
192 
193 	/* Finish initializing socket data structures */
194 	sock->ops = ops;
195 	sock->state = state;
196 
197 	sock_init_data(sock, sk);
198 	sk->sk_backlog_rcv = backlog_rcv;
199 	sk->sk_rcvbuf = sysctl_tipc_rmem[1];
200 	sk->sk_data_ready = tipc_data_ready;
201 	sk->sk_write_space = tipc_write_space;
202 	tipc_sk(sk)->conn_timeout = CONN_TIMEOUT_DEFAULT;
203 	tipc_port_unlock(port);
204 
205 	if (sock->state == SS_READY) {
206 		tipc_port_set_unreturnable(port, true);
207 		if (sock->type == SOCK_DGRAM)
208 			tipc_port_set_unreliable(port, true);
209 	}
210 	return 0;
211 }
212 
213 /**
214  * tipc_sock_create_local - create TIPC socket from inside TIPC module
215  * @type: socket type - SOCK_RDM or SOCK_SEQPACKET
216  *
217  * We cannot use sock_creat_kern here because it bumps module user count.
218  * Since socket owner and creator is the same module we must make sure
219  * that module count remains zero for module local sockets, otherwise
220  * we cannot do rmmod.
221  *
222  * Returns 0 on success, errno otherwise
223  */
224 int tipc_sock_create_local(int type, struct socket **res)
225 {
226 	int rc;
227 
228 	rc = sock_create_lite(AF_TIPC, type, 0, res);
229 	if (rc < 0) {
230 		pr_err("Failed to create kernel socket\n");
231 		return rc;
232 	}
233 	tipc_sk_create(&init_net, *res, 0, 1);
234 
235 	return 0;
236 }
237 
238 /**
239  * tipc_sock_release_local - release socket created by tipc_sock_create_local
240  * @sock: the socket to be released.
241  *
242  * Module reference count is not incremented when such sockets are created,
243  * so we must keep it from being decremented when they are released.
244  */
245 void tipc_sock_release_local(struct socket *sock)
246 {
247 	tipc_release(sock);
248 	sock->ops = NULL;
249 	sock_release(sock);
250 }
251 
252 /**
253  * tipc_sock_accept_local - accept a connection on a socket created
254  * with tipc_sock_create_local. Use this function to avoid that
255  * module reference count is inadvertently incremented.
256  *
257  * @sock:    the accepting socket
258  * @newsock: reference to the new socket to be created
259  * @flags:   socket flags
260  */
261 
262 int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
263 			   int flags)
264 {
265 	struct sock *sk = sock->sk;
266 	int ret;
267 
268 	ret = sock_create_lite(sk->sk_family, sk->sk_type,
269 			       sk->sk_protocol, newsock);
270 	if (ret < 0)
271 		return ret;
272 
273 	ret = tipc_accept(sock, *newsock, flags);
274 	if (ret < 0) {
275 		sock_release(*newsock);
276 		return ret;
277 	}
278 	(*newsock)->ops = sock->ops;
279 	return ret;
280 }
281 
282 /**
283  * tipc_release - destroy a TIPC socket
284  * @sock: socket to destroy
285  *
286  * This routine cleans up any messages that are still queued on the socket.
287  * For DGRAM and RDM socket types, all queued messages are rejected.
288  * For SEQPACKET and STREAM socket types, the first message is rejected
289  * and any others are discarded.  (If the first message on a STREAM socket
290  * is partially-read, it is discarded and the next one is rejected instead.)
291  *
292  * NOTE: Rejected messages are not necessarily returned to the sender!  They
293  * are returned or discarded according to the "destination droppable" setting
294  * specified for the message by the sender.
295  *
296  * Returns 0 on success, errno otherwise
297  */
298 static int tipc_release(struct socket *sock)
299 {
300 	struct sock *sk = sock->sk;
301 	struct tipc_sock *tsk;
302 	struct tipc_port *port;
303 	struct sk_buff *buf;
304 
305 	/*
306 	 * Exit if socket isn't fully initialized (occurs when a failed accept()
307 	 * releases a pre-allocated child socket that was never used)
308 	 */
309 	if (sk == NULL)
310 		return 0;
311 
312 	tsk = tipc_sk(sk);
313 	port = &tsk->port;
314 	lock_sock(sk);
315 
316 	/*
317 	 * Reject all unreceived messages, except on an active connection
318 	 * (which disconnects locally & sends a 'FIN+' to peer)
319 	 */
320 	while (sock->state != SS_DISCONNECTING) {
321 		buf = __skb_dequeue(&sk->sk_receive_queue);
322 		if (buf == NULL)
323 			break;
324 		if (TIPC_SKB_CB(buf)->handle != NULL)
325 			kfree_skb(buf);
326 		else {
327 			if ((sock->state == SS_CONNECTING) ||
328 			    (sock->state == SS_CONNECTED)) {
329 				sock->state = SS_DISCONNECTING;
330 				tipc_port_disconnect(port->ref);
331 			}
332 			tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
333 		}
334 	}
335 
336 	/* Destroy TIPC port; also disconnects an active connection and
337 	 * sends a 'FIN-' to peer.
338 	 */
339 	tipc_port_destroy(port);
340 
341 	/* Discard any remaining (connection-based) messages in receive queue */
342 	__skb_queue_purge(&sk->sk_receive_queue);
343 
344 	/* Reject any messages that accumulated in backlog queue */
345 	sock->state = SS_DISCONNECTING;
346 	release_sock(sk);
347 
348 	sock_put(sk);
349 	sock->sk = NULL;
350 
351 	return 0;
352 }
353 
354 /**
355  * tipc_bind - associate or disassocate TIPC name(s) with a socket
356  * @sock: socket structure
357  * @uaddr: socket address describing name(s) and desired operation
358  * @uaddr_len: size of socket address data structure
359  *
360  * Name and name sequence binding is indicated using a positive scope value;
361  * a negative scope value unbinds the specified name.  Specifying no name
362  * (i.e. a socket address length of 0) unbinds all names from the socket.
363  *
364  * Returns 0 on success, errno otherwise
365  *
366  * NOTE: This routine doesn't need to take the socket lock since it doesn't
367  *       access any non-constant socket information.
368  */
369 static int tipc_bind(struct socket *sock, struct sockaddr *uaddr,
370 		     int uaddr_len)
371 {
372 	struct sock *sk = sock->sk;
373 	struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
374 	struct tipc_sock *tsk = tipc_sk(sk);
375 	int res = -EINVAL;
376 
377 	lock_sock(sk);
378 	if (unlikely(!uaddr_len)) {
379 		res = tipc_withdraw(&tsk->port, 0, NULL);
380 		goto exit;
381 	}
382 
383 	if (uaddr_len < sizeof(struct sockaddr_tipc)) {
384 		res = -EINVAL;
385 		goto exit;
386 	}
387 	if (addr->family != AF_TIPC) {
388 		res = -EAFNOSUPPORT;
389 		goto exit;
390 	}
391 
392 	if (addr->addrtype == TIPC_ADDR_NAME)
393 		addr->addr.nameseq.upper = addr->addr.nameseq.lower;
394 	else if (addr->addrtype != TIPC_ADDR_NAMESEQ) {
395 		res = -EAFNOSUPPORT;
396 		goto exit;
397 	}
398 
399 	if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) &&
400 	    (addr->addr.nameseq.type != TIPC_TOP_SRV) &&
401 	    (addr->addr.nameseq.type != TIPC_CFG_SRV)) {
402 		res = -EACCES;
403 		goto exit;
404 	}
405 
406 	res = (addr->scope > 0) ?
407 		tipc_publish(&tsk->port, addr->scope, &addr->addr.nameseq) :
408 		tipc_withdraw(&tsk->port, -addr->scope, &addr->addr.nameseq);
409 exit:
410 	release_sock(sk);
411 	return res;
412 }
413 
414 /**
415  * tipc_getname - get port ID of socket or peer socket
416  * @sock: socket structure
417  * @uaddr: area for returned socket address
418  * @uaddr_len: area for returned length of socket address
419  * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
420  *
421  * Returns 0 on success, errno otherwise
422  *
423  * NOTE: This routine doesn't need to take the socket lock since it only
424  *       accesses socket information that is unchanging (or which changes in
425  *       a completely predictable manner).
426  */
427 static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
428 			int *uaddr_len, int peer)
429 {
430 	struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
431 	struct tipc_sock *tsk = tipc_sk(sock->sk);
432 
433 	memset(addr, 0, sizeof(*addr));
434 	if (peer) {
435 		if ((sock->state != SS_CONNECTED) &&
436 			((peer != 2) || (sock->state != SS_DISCONNECTING)))
437 			return -ENOTCONN;
438 		addr->addr.id.ref = tipc_port_peerport(&tsk->port);
439 		addr->addr.id.node = tipc_port_peernode(&tsk->port);
440 	} else {
441 		addr->addr.id.ref = tsk->port.ref;
442 		addr->addr.id.node = tipc_own_addr;
443 	}
444 
445 	*uaddr_len = sizeof(*addr);
446 	addr->addrtype = TIPC_ADDR_ID;
447 	addr->family = AF_TIPC;
448 	addr->scope = 0;
449 	addr->addr.name.domain = 0;
450 
451 	return 0;
452 }
453 
454 /**
455  * tipc_poll - read and possibly block on pollmask
456  * @file: file structure associated with the socket
457  * @sock: socket for which to calculate the poll bits
458  * @wait: ???
459  *
460  * Returns pollmask value
461  *
462  * COMMENTARY:
463  * It appears that the usual socket locking mechanisms are not useful here
464  * since the pollmask info is potentially out-of-date the moment this routine
465  * exits.  TCP and other protocols seem to rely on higher level poll routines
466  * to handle any preventable race conditions, so TIPC will do the same ...
467  *
468  * TIPC sets the returned events as follows:
469  *
470  * socket state		flags set
471  * ------------		---------
472  * unconnected		no read flags
473  *			POLLOUT if port is not congested
474  *
475  * connecting		POLLIN/POLLRDNORM if ACK/NACK in rx queue
476  *			no write flags
477  *
478  * connected		POLLIN/POLLRDNORM if data in rx queue
479  *			POLLOUT if port is not congested
480  *
481  * disconnecting	POLLIN/POLLRDNORM/POLLHUP
482  *			no write flags
483  *
484  * listening		POLLIN if SYN in rx queue
485  *			no write flags
486  *
487  * ready		POLLIN/POLLRDNORM if data in rx queue
488  * [connectionless]	POLLOUT (since port cannot be congested)
489  *
490  * IMPORTANT: The fact that a read or write operation is indicated does NOT
491  * imply that the operation will succeed, merely that it should be performed
492  * and will not block.
493  */
494 static unsigned int tipc_poll(struct file *file, struct socket *sock,
495 			      poll_table *wait)
496 {
497 	struct sock *sk = sock->sk;
498 	struct tipc_sock *tsk = tipc_sk(sk);
499 	u32 mask = 0;
500 
501 	sock_poll_wait(file, sk_sleep(sk), wait);
502 
503 	switch ((int)sock->state) {
504 	case SS_UNCONNECTED:
505 		if (!tsk->port.congested)
506 			mask |= POLLOUT;
507 		break;
508 	case SS_READY:
509 	case SS_CONNECTED:
510 		if (!tsk->port.congested)
511 			mask |= POLLOUT;
512 		/* fall thru' */
513 	case SS_CONNECTING:
514 	case SS_LISTENING:
515 		if (!skb_queue_empty(&sk->sk_receive_queue))
516 			mask |= (POLLIN | POLLRDNORM);
517 		break;
518 	case SS_DISCONNECTING:
519 		mask = (POLLIN | POLLRDNORM | POLLHUP);
520 		break;
521 	}
522 
523 	return mask;
524 }
525 
526 /**
527  * dest_name_check - verify user is permitted to send to specified port name
528  * @dest: destination address
529  * @m: descriptor for message to be sent
530  *
531  * Prevents restricted configuration commands from being issued by
532  * unauthorized users.
533  *
534  * Returns 0 if permission is granted, otherwise errno
535  */
536 static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m)
537 {
538 	struct tipc_cfg_msg_hdr hdr;
539 
540 	if (likely(dest->addr.name.name.type >= TIPC_RESERVED_TYPES))
541 		return 0;
542 	if (likely(dest->addr.name.name.type == TIPC_TOP_SRV))
543 		return 0;
544 	if (likely(dest->addr.name.name.type != TIPC_CFG_SRV))
545 		return -EACCES;
546 
547 	if (!m->msg_iovlen || (m->msg_iov[0].iov_len < sizeof(hdr)))
548 		return -EMSGSIZE;
549 	if (copy_from_user(&hdr, m->msg_iov[0].iov_base, sizeof(hdr)))
550 		return -EFAULT;
551 	if ((ntohs(hdr.tcm_type) & 0xC000) && (!capable(CAP_NET_ADMIN)))
552 		return -EACCES;
553 
554 	return 0;
555 }
556 
557 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
558 {
559 	struct sock *sk = sock->sk;
560 	struct tipc_sock *tsk = tipc_sk(sk);
561 	DEFINE_WAIT(wait);
562 	int done;
563 
564 	do {
565 		int err = sock_error(sk);
566 		if (err)
567 			return err;
568 		if (sock->state == SS_DISCONNECTING)
569 			return -EPIPE;
570 		if (!*timeo_p)
571 			return -EAGAIN;
572 		if (signal_pending(current))
573 			return sock_intr_errno(*timeo_p);
574 
575 		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
576 		done = sk_wait_event(sk, timeo_p, !tsk->port.congested);
577 		finish_wait(sk_sleep(sk), &wait);
578 	} while (!done);
579 	return 0;
580 }
581 
582 
583 /**
584  * tipc_sendmsg - send message in connectionless manner
585  * @iocb: if NULL, indicates that socket lock is already held
586  * @sock: socket structure
587  * @m: message to send
588  * @total_len: length of message
589  *
590  * Message must have an destination specified explicitly.
591  * Used for SOCK_RDM and SOCK_DGRAM messages,
592  * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
593  * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
594  *
595  * Returns the number of bytes sent on success, or errno otherwise
596  */
597 static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
598 			struct msghdr *m, size_t total_len)
599 {
600 	struct sock *sk = sock->sk;
601 	struct tipc_sock *tsk = tipc_sk(sk);
602 	struct tipc_port *port = &tsk->port;
603 	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
604 	int needs_conn;
605 	long timeo;
606 	int res = -EINVAL;
607 
608 	if (unlikely(!dest))
609 		return -EDESTADDRREQ;
610 	if (unlikely((m->msg_namelen < sizeof(*dest)) ||
611 		     (dest->family != AF_TIPC)))
612 		return -EINVAL;
613 	if (total_len > TIPC_MAX_USER_MSG_SIZE)
614 		return -EMSGSIZE;
615 
616 	if (iocb)
617 		lock_sock(sk);
618 
619 	needs_conn = (sock->state != SS_READY);
620 	if (unlikely(needs_conn)) {
621 		if (sock->state == SS_LISTENING) {
622 			res = -EPIPE;
623 			goto exit;
624 		}
625 		if (sock->state != SS_UNCONNECTED) {
626 			res = -EISCONN;
627 			goto exit;
628 		}
629 		if (tsk->port.published) {
630 			res = -EOPNOTSUPP;
631 			goto exit;
632 		}
633 		if (dest->addrtype == TIPC_ADDR_NAME) {
634 			tsk->port.conn_type = dest->addr.name.name.type;
635 			tsk->port.conn_instance = dest->addr.name.name.instance;
636 		}
637 
638 		/* Abort any pending connection attempts (very unlikely) */
639 		reject_rx_queue(sk);
640 	}
641 
642 	timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
643 	do {
644 		if (dest->addrtype == TIPC_ADDR_NAME) {
645 			res = dest_name_check(dest, m);
646 			if (res)
647 				break;
648 			res = tipc_send2name(port,
649 					     &dest->addr.name.name,
650 					     dest->addr.name.domain,
651 					     m->msg_iov,
652 					     total_len);
653 		} else if (dest->addrtype == TIPC_ADDR_ID) {
654 			res = tipc_send2port(port,
655 					     &dest->addr.id,
656 					     m->msg_iov,
657 					     total_len);
658 		} else if (dest->addrtype == TIPC_ADDR_MCAST) {
659 			if (needs_conn) {
660 				res = -EOPNOTSUPP;
661 				break;
662 			}
663 			res = dest_name_check(dest, m);
664 			if (res)
665 				break;
666 			res = tipc_port_mcast_xmit(port,
667 						   &dest->addr.nameseq,
668 						   m->msg_iov,
669 						   total_len);
670 		}
671 		if (likely(res != -ELINKCONG)) {
672 			if (needs_conn && (res >= 0))
673 				sock->state = SS_CONNECTING;
674 			break;
675 		}
676 		res = tipc_wait_for_sndmsg(sock, &timeo);
677 		if (res)
678 			break;
679 	} while (1);
680 
681 exit:
682 	if (iocb)
683 		release_sock(sk);
684 	return res;
685 }
686 
687 static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
688 {
689 	struct sock *sk = sock->sk;
690 	struct tipc_sock *tsk = tipc_sk(sk);
691 	struct tipc_port *port = &tsk->port;
692 	DEFINE_WAIT(wait);
693 	int done;
694 
695 	do {
696 		int err = sock_error(sk);
697 		if (err)
698 			return err;
699 		if (sock->state == SS_DISCONNECTING)
700 			return -EPIPE;
701 		else if (sock->state != SS_CONNECTED)
702 			return -ENOTCONN;
703 		if (!*timeo_p)
704 			return -EAGAIN;
705 		if (signal_pending(current))
706 			return sock_intr_errno(*timeo_p);
707 
708 		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
709 		done = sk_wait_event(sk, timeo_p,
710 				     (!port->congested || !port->connected));
711 		finish_wait(sk_sleep(sk), &wait);
712 	} while (!done);
713 	return 0;
714 }
715 
716 /**
717  * tipc_send_packet - send a connection-oriented message
718  * @iocb: if NULL, indicates that socket lock is already held
719  * @sock: socket structure
720  * @m: message to send
721  * @total_len: length of message
722  *
723  * Used for SOCK_SEQPACKET messages and SOCK_STREAM data.
724  *
725  * Returns the number of bytes sent on success, or errno otherwise
726  */
727 static int tipc_send_packet(struct kiocb *iocb, struct socket *sock,
728 			    struct msghdr *m, size_t total_len)
729 {
730 	struct sock *sk = sock->sk;
731 	struct tipc_sock *tsk = tipc_sk(sk);
732 	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
733 	int res = -EINVAL;
734 	long timeo;
735 
736 	/* Handle implied connection establishment */
737 	if (unlikely(dest))
738 		return tipc_sendmsg(iocb, sock, m, total_len);
739 
740 	if (total_len > TIPC_MAX_USER_MSG_SIZE)
741 		return -EMSGSIZE;
742 
743 	if (iocb)
744 		lock_sock(sk);
745 
746 	if (unlikely(sock->state != SS_CONNECTED)) {
747 		if (sock->state == SS_DISCONNECTING)
748 			res = -EPIPE;
749 		else
750 			res = -ENOTCONN;
751 		goto exit;
752 	}
753 
754 	timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
755 	do {
756 		res = tipc_send(&tsk->port, m->msg_iov, total_len);
757 		if (likely(res != -ELINKCONG))
758 			break;
759 		res = tipc_wait_for_sndpkt(sock, &timeo);
760 		if (res)
761 			break;
762 	} while (1);
763 exit:
764 	if (iocb)
765 		release_sock(sk);
766 	return res;
767 }
768 
769 /**
770  * tipc_send_stream - send stream-oriented data
771  * @iocb: (unused)
772  * @sock: socket structure
773  * @m: data to send
774  * @total_len: total length of data to be sent
775  *
776  * Used for SOCK_STREAM data.
777  *
778  * Returns the number of bytes sent on success (or partial success),
779  * or errno if no data sent
780  */
781 static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
782 			    struct msghdr *m, size_t total_len)
783 {
784 	struct sock *sk = sock->sk;
785 	struct tipc_sock *tsk = tipc_sk(sk);
786 	struct msghdr my_msg;
787 	struct iovec my_iov;
788 	struct iovec *curr_iov;
789 	int curr_iovlen;
790 	char __user *curr_start;
791 	u32 hdr_size;
792 	int curr_left;
793 	int bytes_to_send;
794 	int bytes_sent;
795 	int res;
796 
797 	lock_sock(sk);
798 
799 	/* Handle special cases where there is no connection */
800 	if (unlikely(sock->state != SS_CONNECTED)) {
801 		if (sock->state == SS_UNCONNECTED)
802 			res = tipc_send_packet(NULL, sock, m, total_len);
803 		else
804 			res = sock->state == SS_DISCONNECTING ? -EPIPE : -ENOTCONN;
805 		goto exit;
806 	}
807 
808 	if (unlikely(m->msg_name)) {
809 		res = -EISCONN;
810 		goto exit;
811 	}
812 
813 	if (total_len > (unsigned int)INT_MAX) {
814 		res = -EMSGSIZE;
815 		goto exit;
816 	}
817 
818 	/*
819 	 * Send each iovec entry using one or more messages
820 	 *
821 	 * Note: This algorithm is good for the most likely case
822 	 * (i.e. one large iovec entry), but could be improved to pass sets
823 	 * of small iovec entries into send_packet().
824 	 */
825 	curr_iov = m->msg_iov;
826 	curr_iovlen = m->msg_iovlen;
827 	my_msg.msg_iov = &my_iov;
828 	my_msg.msg_iovlen = 1;
829 	my_msg.msg_flags = m->msg_flags;
830 	my_msg.msg_name = NULL;
831 	bytes_sent = 0;
832 
833 	hdr_size = msg_hdr_sz(&tsk->port.phdr);
834 
835 	while (curr_iovlen--) {
836 		curr_start = curr_iov->iov_base;
837 		curr_left = curr_iov->iov_len;
838 
839 		while (curr_left) {
840 			bytes_to_send = tsk->port.max_pkt - hdr_size;
841 			if (bytes_to_send > TIPC_MAX_USER_MSG_SIZE)
842 				bytes_to_send = TIPC_MAX_USER_MSG_SIZE;
843 			if (curr_left < bytes_to_send)
844 				bytes_to_send = curr_left;
845 			my_iov.iov_base = curr_start;
846 			my_iov.iov_len = bytes_to_send;
847 			res = tipc_send_packet(NULL, sock, &my_msg,
848 					       bytes_to_send);
849 			if (res < 0) {
850 				if (bytes_sent)
851 					res = bytes_sent;
852 				goto exit;
853 			}
854 			curr_left -= bytes_to_send;
855 			curr_start += bytes_to_send;
856 			bytes_sent += bytes_to_send;
857 		}
858 
859 		curr_iov++;
860 	}
861 	res = bytes_sent;
862 exit:
863 	release_sock(sk);
864 	return res;
865 }
866 
867 /**
868  * auto_connect - complete connection setup to a remote port
869  * @tsk: tipc socket structure
870  * @msg: peer's response message
871  *
872  * Returns 0 on success, errno otherwise
873  */
874 static int auto_connect(struct tipc_sock *tsk, struct tipc_msg *msg)
875 {
876 	struct tipc_port *port = &tsk->port;
877 	struct socket *sock = tsk->sk.sk_socket;
878 	struct tipc_portid peer;
879 
880 	peer.ref = msg_origport(msg);
881 	peer.node = msg_orignode(msg);
882 
883 	__tipc_port_connect(port->ref, port, &peer);
884 
885 	if (msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE)
886 		return -EINVAL;
887 	msg_set_importance(&port->phdr, (u32)msg_importance(msg));
888 	sock->state = SS_CONNECTED;
889 	return 0;
890 }
891 
892 /**
893  * set_orig_addr - capture sender's address for received message
894  * @m: descriptor for message info
895  * @msg: received message header
896  *
897  * Note: Address is not captured if not requested by receiver.
898  */
899 static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
900 {
901 	DECLARE_SOCKADDR(struct sockaddr_tipc *, addr, m->msg_name);
902 
903 	if (addr) {
904 		addr->family = AF_TIPC;
905 		addr->addrtype = TIPC_ADDR_ID;
906 		memset(&addr->addr, 0, sizeof(addr->addr));
907 		addr->addr.id.ref = msg_origport(msg);
908 		addr->addr.id.node = msg_orignode(msg);
909 		addr->addr.name.domain = 0;	/* could leave uninitialized */
910 		addr->scope = 0;		/* could leave uninitialized */
911 		m->msg_namelen = sizeof(struct sockaddr_tipc);
912 	}
913 }
914 
915 /**
916  * anc_data_recv - optionally capture ancillary data for received message
917  * @m: descriptor for message info
918  * @msg: received message header
919  * @tport: TIPC port associated with message
920  *
921  * Note: Ancillary data is not captured if not requested by receiver.
922  *
923  * Returns 0 if successful, otherwise errno
924  */
925 static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
926 			 struct tipc_port *tport)
927 {
928 	u32 anc_data[3];
929 	u32 err;
930 	u32 dest_type;
931 	int has_name;
932 	int res;
933 
934 	if (likely(m->msg_controllen == 0))
935 		return 0;
936 
937 	/* Optionally capture errored message object(s) */
938 	err = msg ? msg_errcode(msg) : 0;
939 	if (unlikely(err)) {
940 		anc_data[0] = err;
941 		anc_data[1] = msg_data_sz(msg);
942 		res = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, anc_data);
943 		if (res)
944 			return res;
945 		if (anc_data[1]) {
946 			res = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, anc_data[1],
947 				       msg_data(msg));
948 			if (res)
949 				return res;
950 		}
951 	}
952 
953 	/* Optionally capture message destination object */
954 	dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
955 	switch (dest_type) {
956 	case TIPC_NAMED_MSG:
957 		has_name = 1;
958 		anc_data[0] = msg_nametype(msg);
959 		anc_data[1] = msg_namelower(msg);
960 		anc_data[2] = msg_namelower(msg);
961 		break;
962 	case TIPC_MCAST_MSG:
963 		has_name = 1;
964 		anc_data[0] = msg_nametype(msg);
965 		anc_data[1] = msg_namelower(msg);
966 		anc_data[2] = msg_nameupper(msg);
967 		break;
968 	case TIPC_CONN_MSG:
969 		has_name = (tport->conn_type != 0);
970 		anc_data[0] = tport->conn_type;
971 		anc_data[1] = tport->conn_instance;
972 		anc_data[2] = tport->conn_instance;
973 		break;
974 	default:
975 		has_name = 0;
976 	}
977 	if (has_name) {
978 		res = put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, anc_data);
979 		if (res)
980 			return res;
981 	}
982 
983 	return 0;
984 }
985 
986 static int tipc_wait_for_rcvmsg(struct socket *sock, long timeo)
987 {
988 	struct sock *sk = sock->sk;
989 	DEFINE_WAIT(wait);
990 	int err;
991 
992 	for (;;) {
993 		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
994 		if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
995 			if (sock->state == SS_DISCONNECTING) {
996 				err = -ENOTCONN;
997 				break;
998 			}
999 			release_sock(sk);
1000 			timeo = schedule_timeout(timeo);
1001 			lock_sock(sk);
1002 		}
1003 		err = 0;
1004 		if (!skb_queue_empty(&sk->sk_receive_queue))
1005 			break;
1006 		err = sock_intr_errno(timeo);
1007 		if (signal_pending(current))
1008 			break;
1009 		err = -EAGAIN;
1010 		if (!timeo)
1011 			break;
1012 	}
1013 	finish_wait(sk_sleep(sk), &wait);
1014 	return err;
1015 }
1016 
1017 /**
1018  * tipc_recvmsg - receive packet-oriented message
1019  * @iocb: (unused)
1020  * @m: descriptor for message info
1021  * @buf_len: total size of user buffer area
1022  * @flags: receive flags
1023  *
1024  * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
1025  * If the complete message doesn't fit in user area, truncate it.
1026  *
1027  * Returns size of returned message data, errno otherwise
1028  */
1029 static int tipc_recvmsg(struct kiocb *iocb, struct socket *sock,
1030 			struct msghdr *m, size_t buf_len, int flags)
1031 {
1032 	struct sock *sk = sock->sk;
1033 	struct tipc_sock *tsk = tipc_sk(sk);
1034 	struct tipc_port *port = &tsk->port;
1035 	struct sk_buff *buf;
1036 	struct tipc_msg *msg;
1037 	long timeo;
1038 	unsigned int sz;
1039 	u32 err;
1040 	int res;
1041 
1042 	/* Catch invalid receive requests */
1043 	if (unlikely(!buf_len))
1044 		return -EINVAL;
1045 
1046 	lock_sock(sk);
1047 
1048 	if (unlikely(sock->state == SS_UNCONNECTED)) {
1049 		res = -ENOTCONN;
1050 		goto exit;
1051 	}
1052 
1053 	timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1054 restart:
1055 
1056 	/* Look for a message in receive queue; wait if necessary */
1057 	res = tipc_wait_for_rcvmsg(sock, timeo);
1058 	if (res)
1059 		goto exit;
1060 
1061 	/* Look at first message in receive queue */
1062 	buf = skb_peek(&sk->sk_receive_queue);
1063 	msg = buf_msg(buf);
1064 	sz = msg_data_sz(msg);
1065 	err = msg_errcode(msg);
1066 
1067 	/* Discard an empty non-errored message & try again */
1068 	if ((!sz) && (!err)) {
1069 		advance_rx_queue(sk);
1070 		goto restart;
1071 	}
1072 
1073 	/* Capture sender's address (optional) */
1074 	set_orig_addr(m, msg);
1075 
1076 	/* Capture ancillary data (optional) */
1077 	res = anc_data_recv(m, msg, port);
1078 	if (res)
1079 		goto exit;
1080 
1081 	/* Capture message data (if valid) & compute return value (always) */
1082 	if (!err) {
1083 		if (unlikely(buf_len < sz)) {
1084 			sz = buf_len;
1085 			m->msg_flags |= MSG_TRUNC;
1086 		}
1087 		res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg),
1088 					      m->msg_iov, sz);
1089 		if (res)
1090 			goto exit;
1091 		res = sz;
1092 	} else {
1093 		if ((sock->state == SS_READY) ||
1094 		    ((err == TIPC_CONN_SHUTDOWN) || m->msg_control))
1095 			res = 0;
1096 		else
1097 			res = -ECONNRESET;
1098 	}
1099 
1100 	/* Consume received message (optional) */
1101 	if (likely(!(flags & MSG_PEEK))) {
1102 		if ((sock->state != SS_READY) &&
1103 		    (++port->conn_unacked >= TIPC_FLOW_CONTROL_WIN))
1104 			tipc_acknowledge(port->ref, port->conn_unacked);
1105 		advance_rx_queue(sk);
1106 	}
1107 exit:
1108 	release_sock(sk);
1109 	return res;
1110 }
1111 
1112 /**
1113  * tipc_recv_stream - receive stream-oriented data
1114  * @iocb: (unused)
1115  * @m: descriptor for message info
1116  * @buf_len: total size of user buffer area
1117  * @flags: receive flags
1118  *
1119  * Used for SOCK_STREAM messages only.  If not enough data is available
1120  * will optionally wait for more; never truncates data.
1121  *
1122  * Returns size of returned message data, errno otherwise
1123  */
1124 static int tipc_recv_stream(struct kiocb *iocb, struct socket *sock,
1125 			    struct msghdr *m, size_t buf_len, int flags)
1126 {
1127 	struct sock *sk = sock->sk;
1128 	struct tipc_sock *tsk = tipc_sk(sk);
1129 	struct tipc_port *port = &tsk->port;
1130 	struct sk_buff *buf;
1131 	struct tipc_msg *msg;
1132 	long timeo;
1133 	unsigned int sz;
1134 	int sz_to_copy, target, needed;
1135 	int sz_copied = 0;
1136 	u32 err;
1137 	int res = 0;
1138 
1139 	/* Catch invalid receive attempts */
1140 	if (unlikely(!buf_len))
1141 		return -EINVAL;
1142 
1143 	lock_sock(sk);
1144 
1145 	if (unlikely(sock->state == SS_UNCONNECTED)) {
1146 		res = -ENOTCONN;
1147 		goto exit;
1148 	}
1149 
1150 	target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
1151 	timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1152 
1153 restart:
1154 	/* Look for a message in receive queue; wait if necessary */
1155 	res = tipc_wait_for_rcvmsg(sock, timeo);
1156 	if (res)
1157 		goto exit;
1158 
1159 	/* Look at first message in receive queue */
1160 	buf = skb_peek(&sk->sk_receive_queue);
1161 	msg = buf_msg(buf);
1162 	sz = msg_data_sz(msg);
1163 	err = msg_errcode(msg);
1164 
1165 	/* Discard an empty non-errored message & try again */
1166 	if ((!sz) && (!err)) {
1167 		advance_rx_queue(sk);
1168 		goto restart;
1169 	}
1170 
1171 	/* Optionally capture sender's address & ancillary data of first msg */
1172 	if (sz_copied == 0) {
1173 		set_orig_addr(m, msg);
1174 		res = anc_data_recv(m, msg, port);
1175 		if (res)
1176 			goto exit;
1177 	}
1178 
1179 	/* Capture message data (if valid) & compute return value (always) */
1180 	if (!err) {
1181 		u32 offset = (u32)(unsigned long)(TIPC_SKB_CB(buf)->handle);
1182 
1183 		sz -= offset;
1184 		needed = (buf_len - sz_copied);
1185 		sz_to_copy = (sz <= needed) ? sz : needed;
1186 
1187 		res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg) + offset,
1188 					      m->msg_iov, sz_to_copy);
1189 		if (res)
1190 			goto exit;
1191 
1192 		sz_copied += sz_to_copy;
1193 
1194 		if (sz_to_copy < sz) {
1195 			if (!(flags & MSG_PEEK))
1196 				TIPC_SKB_CB(buf)->handle =
1197 				(void *)(unsigned long)(offset + sz_to_copy);
1198 			goto exit;
1199 		}
1200 	} else {
1201 		if (sz_copied != 0)
1202 			goto exit; /* can't add error msg to valid data */
1203 
1204 		if ((err == TIPC_CONN_SHUTDOWN) || m->msg_control)
1205 			res = 0;
1206 		else
1207 			res = -ECONNRESET;
1208 	}
1209 
1210 	/* Consume received message (optional) */
1211 	if (likely(!(flags & MSG_PEEK))) {
1212 		if (unlikely(++port->conn_unacked >= TIPC_FLOW_CONTROL_WIN))
1213 			tipc_acknowledge(port->ref, port->conn_unacked);
1214 		advance_rx_queue(sk);
1215 	}
1216 
1217 	/* Loop around if more data is required */
1218 	if ((sz_copied < buf_len) &&	/* didn't get all requested data */
1219 	    (!skb_queue_empty(&sk->sk_receive_queue) ||
1220 	    (sz_copied < target)) &&	/* and more is ready or required */
1221 	    (!(flags & MSG_PEEK)) &&	/* and aren't just peeking at data */
1222 	    (!err))			/* and haven't reached a FIN */
1223 		goto restart;
1224 
1225 exit:
1226 	release_sock(sk);
1227 	return sz_copied ? sz_copied : res;
1228 }
1229 
1230 /**
1231  * tipc_write_space - wake up thread if port congestion is released
1232  * @sk: socket
1233  */
1234 static void tipc_write_space(struct sock *sk)
1235 {
1236 	struct socket_wq *wq;
1237 
1238 	rcu_read_lock();
1239 	wq = rcu_dereference(sk->sk_wq);
1240 	if (wq_has_sleeper(wq))
1241 		wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
1242 						POLLWRNORM | POLLWRBAND);
1243 	rcu_read_unlock();
1244 }
1245 
1246 /**
1247  * tipc_data_ready - wake up threads to indicate messages have been received
1248  * @sk: socket
1249  * @len: the length of messages
1250  */
1251 static void tipc_data_ready(struct sock *sk)
1252 {
1253 	struct socket_wq *wq;
1254 
1255 	rcu_read_lock();
1256 	wq = rcu_dereference(sk->sk_wq);
1257 	if (wq_has_sleeper(wq))
1258 		wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
1259 						POLLRDNORM | POLLRDBAND);
1260 	rcu_read_unlock();
1261 }
1262 
1263 /**
1264  * filter_connect - Handle all incoming messages for a connection-based socket
1265  * @tsk: TIPC socket
1266  * @msg: message
1267  *
1268  * Returns TIPC error status code and socket error status code
1269  * once it encounters some errors
1270  */
1271 static u32 filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)
1272 {
1273 	struct sock *sk = &tsk->sk;
1274 	struct tipc_port *port = &tsk->port;
1275 	struct socket *sock = sk->sk_socket;
1276 	struct tipc_msg *msg = buf_msg(*buf);
1277 
1278 	u32 retval = TIPC_ERR_NO_PORT;
1279 	int res;
1280 
1281 	if (msg_mcast(msg))
1282 		return retval;
1283 
1284 	switch ((int)sock->state) {
1285 	case SS_CONNECTED:
1286 		/* Accept only connection-based messages sent by peer */
1287 		if (msg_connected(msg) && tipc_port_peer_msg(port, msg)) {
1288 			if (unlikely(msg_errcode(msg))) {
1289 				sock->state = SS_DISCONNECTING;
1290 				__tipc_port_disconnect(port);
1291 			}
1292 			retval = TIPC_OK;
1293 		}
1294 		break;
1295 	case SS_CONNECTING:
1296 		/* Accept only ACK or NACK message */
1297 		if (unlikely(msg_errcode(msg))) {
1298 			sock->state = SS_DISCONNECTING;
1299 			sk->sk_err = ECONNREFUSED;
1300 			retval = TIPC_OK;
1301 			break;
1302 		}
1303 
1304 		if (unlikely(!msg_connected(msg)))
1305 			break;
1306 
1307 		res = auto_connect(tsk, msg);
1308 		if (res) {
1309 			sock->state = SS_DISCONNECTING;
1310 			sk->sk_err = -res;
1311 			retval = TIPC_OK;
1312 			break;
1313 		}
1314 
1315 		/* If an incoming message is an 'ACK-', it should be
1316 		 * discarded here because it doesn't contain useful
1317 		 * data. In addition, we should try to wake up
1318 		 * connect() routine if sleeping.
1319 		 */
1320 		if (msg_data_sz(msg) == 0) {
1321 			kfree_skb(*buf);
1322 			*buf = NULL;
1323 			if (waitqueue_active(sk_sleep(sk)))
1324 				wake_up_interruptible(sk_sleep(sk));
1325 		}
1326 		retval = TIPC_OK;
1327 		break;
1328 	case SS_LISTENING:
1329 	case SS_UNCONNECTED:
1330 		/* Accept only SYN message */
1331 		if (!msg_connected(msg) && !(msg_errcode(msg)))
1332 			retval = TIPC_OK;
1333 		break;
1334 	case SS_DISCONNECTING:
1335 		break;
1336 	default:
1337 		pr_err("Unknown socket state %u\n", sock->state);
1338 	}
1339 	return retval;
1340 }
1341 
1342 /**
1343  * rcvbuf_limit - get proper overload limit of socket receive queue
1344  * @sk: socket
1345  * @buf: message
1346  *
1347  * For all connection oriented messages, irrespective of importance,
1348  * the default overload value (i.e. 67MB) is set as limit.
1349  *
1350  * For all connectionless messages, by default new queue limits are
1351  * as belows:
1352  *
1353  * TIPC_LOW_IMPORTANCE       (4 MB)
1354  * TIPC_MEDIUM_IMPORTANCE    (8 MB)
1355  * TIPC_HIGH_IMPORTANCE      (16 MB)
1356  * TIPC_CRITICAL_IMPORTANCE  (32 MB)
1357  *
1358  * Returns overload limit according to corresponding message importance
1359  */
1360 static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
1361 {
1362 	struct tipc_msg *msg = buf_msg(buf);
1363 
1364 	if (msg_connected(msg))
1365 		return sysctl_tipc_rmem[2];
1366 
1367 	return sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE <<
1368 		msg_importance(msg);
1369 }
1370 
1371 /**
1372  * filter_rcv - validate incoming message
1373  * @sk: socket
1374  * @buf: message
1375  *
1376  * Enqueues message on receive queue if acceptable; optionally handles
1377  * disconnect indication for a connected socket.
1378  *
1379  * Called with socket lock already taken; port lock may also be taken.
1380  *
1381  * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
1382  */
1383 static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
1384 {
1385 	struct socket *sock = sk->sk_socket;
1386 	struct tipc_sock *tsk = tipc_sk(sk);
1387 	struct tipc_msg *msg = buf_msg(buf);
1388 	unsigned int limit = rcvbuf_limit(sk, buf);
1389 	u32 res = TIPC_OK;
1390 
1391 	/* Reject message if it is wrong sort of message for socket */
1392 	if (msg_type(msg) > TIPC_DIRECT_MSG)
1393 		return TIPC_ERR_NO_PORT;
1394 
1395 	if (sock->state == SS_READY) {
1396 		if (msg_connected(msg))
1397 			return TIPC_ERR_NO_PORT;
1398 	} else {
1399 		res = filter_connect(tsk, &buf);
1400 		if (res != TIPC_OK || buf == NULL)
1401 			return res;
1402 	}
1403 
1404 	/* Reject message if there isn't room to queue it */
1405 	if (sk_rmem_alloc_get(sk) + buf->truesize >= limit)
1406 		return TIPC_ERR_OVERLOAD;
1407 
1408 	/* Enqueue message */
1409 	TIPC_SKB_CB(buf)->handle = NULL;
1410 	__skb_queue_tail(&sk->sk_receive_queue, buf);
1411 	skb_set_owner_r(buf, sk);
1412 
1413 	sk->sk_data_ready(sk);
1414 	return TIPC_OK;
1415 }
1416 
1417 /**
1418  * backlog_rcv - handle incoming message from backlog queue
1419  * @sk: socket
1420  * @buf: message
1421  *
1422  * Caller must hold socket lock, but not port lock.
1423  *
1424  * Returns 0
1425  */
1426 static int backlog_rcv(struct sock *sk, struct sk_buff *buf)
1427 {
1428 	u32 res;
1429 
1430 	res = filter_rcv(sk, buf);
1431 	if (res)
1432 		tipc_reject_msg(buf, res);
1433 	return 0;
1434 }
1435 
1436 /**
1437  * tipc_sk_rcv - handle incoming message
1438  * @sk:  socket receiving message
1439  * @buf: message
1440  *
1441  * Called with port lock already taken.
1442  *
1443  * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
1444  */
1445 u32 tipc_sk_rcv(struct sock *sk, struct sk_buff *buf)
1446 {
1447 	u32 res;
1448 
1449 	/*
1450 	 * Process message if socket is unlocked; otherwise add to backlog queue
1451 	 *
1452 	 * This code is based on sk_receive_skb(), but must be distinct from it
1453 	 * since a TIPC-specific filter/reject mechanism is utilized
1454 	 */
1455 	bh_lock_sock(sk);
1456 	if (!sock_owned_by_user(sk)) {
1457 		res = filter_rcv(sk, buf);
1458 	} else {
1459 		if (sk_add_backlog(sk, buf, rcvbuf_limit(sk, buf)))
1460 			res = TIPC_ERR_OVERLOAD;
1461 		else
1462 			res = TIPC_OK;
1463 	}
1464 	bh_unlock_sock(sk);
1465 
1466 	return res;
1467 }
1468 
1469 static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
1470 {
1471 	struct sock *sk = sock->sk;
1472 	DEFINE_WAIT(wait);
1473 	int done;
1474 
1475 	do {
1476 		int err = sock_error(sk);
1477 		if (err)
1478 			return err;
1479 		if (!*timeo_p)
1480 			return -ETIMEDOUT;
1481 		if (signal_pending(current))
1482 			return sock_intr_errno(*timeo_p);
1483 
1484 		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1485 		done = sk_wait_event(sk, timeo_p, sock->state != SS_CONNECTING);
1486 		finish_wait(sk_sleep(sk), &wait);
1487 	} while (!done);
1488 	return 0;
1489 }
1490 
1491 /**
1492  * tipc_connect - establish a connection to another TIPC port
1493  * @sock: socket structure
1494  * @dest: socket address for destination port
1495  * @destlen: size of socket address data structure
1496  * @flags: file-related flags associated with socket
1497  *
1498  * Returns 0 on success, errno otherwise
1499  */
1500 static int tipc_connect(struct socket *sock, struct sockaddr *dest,
1501 			int destlen, int flags)
1502 {
1503 	struct sock *sk = sock->sk;
1504 	struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
1505 	struct msghdr m = {NULL,};
1506 	long timeout = (flags & O_NONBLOCK) ? 0 : tipc_sk(sk)->conn_timeout;
1507 	socket_state previous;
1508 	int res;
1509 
1510 	lock_sock(sk);
1511 
1512 	/* For now, TIPC does not allow use of connect() with DGRAM/RDM types */
1513 	if (sock->state == SS_READY) {
1514 		res = -EOPNOTSUPP;
1515 		goto exit;
1516 	}
1517 
1518 	/*
1519 	 * Reject connection attempt using multicast address
1520 	 *
1521 	 * Note: send_msg() validates the rest of the address fields,
1522 	 *       so there's no need to do it here
1523 	 */
1524 	if (dst->addrtype == TIPC_ADDR_MCAST) {
1525 		res = -EINVAL;
1526 		goto exit;
1527 	}
1528 
1529 	previous = sock->state;
1530 	switch (sock->state) {
1531 	case SS_UNCONNECTED:
1532 		/* Send a 'SYN-' to destination */
1533 		m.msg_name = dest;
1534 		m.msg_namelen = destlen;
1535 
1536 		/* If connect is in non-blocking case, set MSG_DONTWAIT to
1537 		 * indicate send_msg() is never blocked.
1538 		 */
1539 		if (!timeout)
1540 			m.msg_flags = MSG_DONTWAIT;
1541 
1542 		res = tipc_sendmsg(NULL, sock, &m, 0);
1543 		if ((res < 0) && (res != -EWOULDBLOCK))
1544 			goto exit;
1545 
1546 		/* Just entered SS_CONNECTING state; the only
1547 		 * difference is that return value in non-blocking
1548 		 * case is EINPROGRESS, rather than EALREADY.
1549 		 */
1550 		res = -EINPROGRESS;
1551 	case SS_CONNECTING:
1552 		if (previous == SS_CONNECTING)
1553 			res = -EALREADY;
1554 		if (!timeout)
1555 			goto exit;
1556 		timeout = msecs_to_jiffies(timeout);
1557 		/* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
1558 		res = tipc_wait_for_connect(sock, &timeout);
1559 		break;
1560 	case SS_CONNECTED:
1561 		res = -EISCONN;
1562 		break;
1563 	default:
1564 		res = -EINVAL;
1565 		break;
1566 	}
1567 exit:
1568 	release_sock(sk);
1569 	return res;
1570 }
1571 
1572 /**
1573  * tipc_listen - allow socket to listen for incoming connections
1574  * @sock: socket structure
1575  * @len: (unused)
1576  *
1577  * Returns 0 on success, errno otherwise
1578  */
1579 static int tipc_listen(struct socket *sock, int len)
1580 {
1581 	struct sock *sk = sock->sk;
1582 	int res;
1583 
1584 	lock_sock(sk);
1585 
1586 	if (sock->state != SS_UNCONNECTED)
1587 		res = -EINVAL;
1588 	else {
1589 		sock->state = SS_LISTENING;
1590 		res = 0;
1591 	}
1592 
1593 	release_sock(sk);
1594 	return res;
1595 }
1596 
1597 static int tipc_wait_for_accept(struct socket *sock, long timeo)
1598 {
1599 	struct sock *sk = sock->sk;
1600 	DEFINE_WAIT(wait);
1601 	int err;
1602 
1603 	/* True wake-one mechanism for incoming connections: only
1604 	 * one process gets woken up, not the 'whole herd'.
1605 	 * Since we do not 'race & poll' for established sockets
1606 	 * anymore, the common case will execute the loop only once.
1607 	*/
1608 	for (;;) {
1609 		prepare_to_wait_exclusive(sk_sleep(sk), &wait,
1610 					  TASK_INTERRUPTIBLE);
1611 		if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1612 			release_sock(sk);
1613 			timeo = schedule_timeout(timeo);
1614 			lock_sock(sk);
1615 		}
1616 		err = 0;
1617 		if (!skb_queue_empty(&sk->sk_receive_queue))
1618 			break;
1619 		err = -EINVAL;
1620 		if (sock->state != SS_LISTENING)
1621 			break;
1622 		err = sock_intr_errno(timeo);
1623 		if (signal_pending(current))
1624 			break;
1625 		err = -EAGAIN;
1626 		if (!timeo)
1627 			break;
1628 	}
1629 	finish_wait(sk_sleep(sk), &wait);
1630 	return err;
1631 }
1632 
1633 /**
1634  * tipc_accept - wait for connection request
1635  * @sock: listening socket
1636  * @newsock: new socket that is to be connected
1637  * @flags: file-related flags associated with socket
1638  *
1639  * Returns 0 on success, errno otherwise
1640  */
1641 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
1642 {
1643 	struct sock *new_sk, *sk = sock->sk;
1644 	struct sk_buff *buf;
1645 	struct tipc_port *new_port;
1646 	struct tipc_msg *msg;
1647 	struct tipc_portid peer;
1648 	u32 new_ref;
1649 	long timeo;
1650 	int res;
1651 
1652 	lock_sock(sk);
1653 
1654 	if (sock->state != SS_LISTENING) {
1655 		res = -EINVAL;
1656 		goto exit;
1657 	}
1658 	timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
1659 	res = tipc_wait_for_accept(sock, timeo);
1660 	if (res)
1661 		goto exit;
1662 
1663 	buf = skb_peek(&sk->sk_receive_queue);
1664 
1665 	res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, 1);
1666 	if (res)
1667 		goto exit;
1668 
1669 	new_sk = new_sock->sk;
1670 	new_port = &tipc_sk(new_sk)->port;
1671 	new_ref = new_port->ref;
1672 	msg = buf_msg(buf);
1673 
1674 	/* we lock on new_sk; but lockdep sees the lock on sk */
1675 	lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
1676 
1677 	/*
1678 	 * Reject any stray messages received by new socket
1679 	 * before the socket lock was taken (very, very unlikely)
1680 	 */
1681 	reject_rx_queue(new_sk);
1682 
1683 	/* Connect new socket to it's peer */
1684 	peer.ref = msg_origport(msg);
1685 	peer.node = msg_orignode(msg);
1686 	tipc_port_connect(new_ref, &peer);
1687 	new_sock->state = SS_CONNECTED;
1688 
1689 	tipc_port_set_importance(new_port, msg_importance(msg));
1690 	if (msg_named(msg)) {
1691 		new_port->conn_type = msg_nametype(msg);
1692 		new_port->conn_instance = msg_nameinst(msg);
1693 	}
1694 
1695 	/*
1696 	 * Respond to 'SYN-' by discarding it & returning 'ACK'-.
1697 	 * Respond to 'SYN+' by queuing it on new socket.
1698 	 */
1699 	if (!msg_data_sz(msg)) {
1700 		struct msghdr m = {NULL,};
1701 
1702 		advance_rx_queue(sk);
1703 		tipc_send_packet(NULL, new_sock, &m, 0);
1704 	} else {
1705 		__skb_dequeue(&sk->sk_receive_queue);
1706 		__skb_queue_head(&new_sk->sk_receive_queue, buf);
1707 		skb_set_owner_r(buf, new_sk);
1708 	}
1709 	release_sock(new_sk);
1710 exit:
1711 	release_sock(sk);
1712 	return res;
1713 }
1714 
1715 /**
1716  * tipc_shutdown - shutdown socket connection
1717  * @sock: socket structure
1718  * @how: direction to close (must be SHUT_RDWR)
1719  *
1720  * Terminates connection (if necessary), then purges socket's receive queue.
1721  *
1722  * Returns 0 on success, errno otherwise
1723  */
1724 static int tipc_shutdown(struct socket *sock, int how)
1725 {
1726 	struct sock *sk = sock->sk;
1727 	struct tipc_sock *tsk = tipc_sk(sk);
1728 	struct tipc_port *port = &tsk->port;
1729 	struct sk_buff *buf;
1730 	int res;
1731 
1732 	if (how != SHUT_RDWR)
1733 		return -EINVAL;
1734 
1735 	lock_sock(sk);
1736 
1737 	switch (sock->state) {
1738 	case SS_CONNECTING:
1739 	case SS_CONNECTED:
1740 
1741 restart:
1742 		/* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
1743 		buf = __skb_dequeue(&sk->sk_receive_queue);
1744 		if (buf) {
1745 			if (TIPC_SKB_CB(buf)->handle != NULL) {
1746 				kfree_skb(buf);
1747 				goto restart;
1748 			}
1749 			tipc_port_disconnect(port->ref);
1750 			tipc_reject_msg(buf, TIPC_CONN_SHUTDOWN);
1751 		} else {
1752 			tipc_port_shutdown(port->ref);
1753 		}
1754 
1755 		sock->state = SS_DISCONNECTING;
1756 
1757 		/* fall through */
1758 
1759 	case SS_DISCONNECTING:
1760 
1761 		/* Discard any unreceived messages */
1762 		__skb_queue_purge(&sk->sk_receive_queue);
1763 
1764 		/* Wake up anyone sleeping in poll */
1765 		sk->sk_state_change(sk);
1766 		res = 0;
1767 		break;
1768 
1769 	default:
1770 		res = -ENOTCONN;
1771 	}
1772 
1773 	release_sock(sk);
1774 	return res;
1775 }
1776 
1777 /**
1778  * tipc_setsockopt - set socket option
1779  * @sock: socket structure
1780  * @lvl: option level
1781  * @opt: option identifier
1782  * @ov: pointer to new option value
1783  * @ol: length of option value
1784  *
1785  * For stream sockets only, accepts and ignores all IPPROTO_TCP options
1786  * (to ease compatibility).
1787  *
1788  * Returns 0 on success, errno otherwise
1789  */
1790 static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
1791 			   char __user *ov, unsigned int ol)
1792 {
1793 	struct sock *sk = sock->sk;
1794 	struct tipc_sock *tsk = tipc_sk(sk);
1795 	struct tipc_port *port = &tsk->port;
1796 	u32 value;
1797 	int res;
1798 
1799 	if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
1800 		return 0;
1801 	if (lvl != SOL_TIPC)
1802 		return -ENOPROTOOPT;
1803 	if (ol < sizeof(value))
1804 		return -EINVAL;
1805 	res = get_user(value, (u32 __user *)ov);
1806 	if (res)
1807 		return res;
1808 
1809 	lock_sock(sk);
1810 
1811 	switch (opt) {
1812 	case TIPC_IMPORTANCE:
1813 		tipc_port_set_importance(port, value);
1814 		break;
1815 	case TIPC_SRC_DROPPABLE:
1816 		if (sock->type != SOCK_STREAM)
1817 			tipc_port_set_unreliable(port, value);
1818 		else
1819 			res = -ENOPROTOOPT;
1820 		break;
1821 	case TIPC_DEST_DROPPABLE:
1822 		tipc_port_set_unreturnable(port, value);
1823 		break;
1824 	case TIPC_CONN_TIMEOUT:
1825 		tipc_sk(sk)->conn_timeout = value;
1826 		/* no need to set "res", since already 0 at this point */
1827 		break;
1828 	default:
1829 		res = -EINVAL;
1830 	}
1831 
1832 	release_sock(sk);
1833 
1834 	return res;
1835 }
1836 
1837 /**
1838  * tipc_getsockopt - get socket option
1839  * @sock: socket structure
1840  * @lvl: option level
1841  * @opt: option identifier
1842  * @ov: receptacle for option value
1843  * @ol: receptacle for length of option value
1844  *
1845  * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
1846  * (to ease compatibility).
1847  *
1848  * Returns 0 on success, errno otherwise
1849  */
1850 static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
1851 			   char __user *ov, int __user *ol)
1852 {
1853 	struct sock *sk = sock->sk;
1854 	struct tipc_sock *tsk = tipc_sk(sk);
1855 	struct tipc_port *port = &tsk->port;
1856 	int len;
1857 	u32 value;
1858 	int res;
1859 
1860 	if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
1861 		return put_user(0, ol);
1862 	if (lvl != SOL_TIPC)
1863 		return -ENOPROTOOPT;
1864 	res = get_user(len, ol);
1865 	if (res)
1866 		return res;
1867 
1868 	lock_sock(sk);
1869 
1870 	switch (opt) {
1871 	case TIPC_IMPORTANCE:
1872 		value = tipc_port_importance(port);
1873 		break;
1874 	case TIPC_SRC_DROPPABLE:
1875 		value = tipc_port_unreliable(port);
1876 		break;
1877 	case TIPC_DEST_DROPPABLE:
1878 		value = tipc_port_unreturnable(port);
1879 		break;
1880 	case TIPC_CONN_TIMEOUT:
1881 		value = tipc_sk(sk)->conn_timeout;
1882 		/* no need to set "res", since already 0 at this point */
1883 		break;
1884 	case TIPC_NODE_RECVQ_DEPTH:
1885 		value = 0; /* was tipc_queue_size, now obsolete */
1886 		break;
1887 	case TIPC_SOCK_RECVQ_DEPTH:
1888 		value = skb_queue_len(&sk->sk_receive_queue);
1889 		break;
1890 	default:
1891 		res = -EINVAL;
1892 	}
1893 
1894 	release_sock(sk);
1895 
1896 	if (res)
1897 		return res;	/* "get" failed */
1898 
1899 	if (len < sizeof(value))
1900 		return -EINVAL;
1901 
1902 	if (copy_to_user(ov, &value, sizeof(value)))
1903 		return -EFAULT;
1904 
1905 	return put_user(sizeof(value), ol);
1906 }
1907 
1908 /* Protocol switches for the various types of TIPC sockets */
1909 
1910 static const struct proto_ops msg_ops = {
1911 	.owner		= THIS_MODULE,
1912 	.family		= AF_TIPC,
1913 	.release	= tipc_release,
1914 	.bind		= tipc_bind,
1915 	.connect	= tipc_connect,
1916 	.socketpair	= sock_no_socketpair,
1917 	.accept		= sock_no_accept,
1918 	.getname	= tipc_getname,
1919 	.poll		= tipc_poll,
1920 	.ioctl		= sock_no_ioctl,
1921 	.listen		= sock_no_listen,
1922 	.shutdown	= tipc_shutdown,
1923 	.setsockopt	= tipc_setsockopt,
1924 	.getsockopt	= tipc_getsockopt,
1925 	.sendmsg	= tipc_sendmsg,
1926 	.recvmsg	= tipc_recvmsg,
1927 	.mmap		= sock_no_mmap,
1928 	.sendpage	= sock_no_sendpage
1929 };
1930 
1931 static const struct proto_ops packet_ops = {
1932 	.owner		= THIS_MODULE,
1933 	.family		= AF_TIPC,
1934 	.release	= tipc_release,
1935 	.bind		= tipc_bind,
1936 	.connect	= tipc_connect,
1937 	.socketpair	= sock_no_socketpair,
1938 	.accept		= tipc_accept,
1939 	.getname	= tipc_getname,
1940 	.poll		= tipc_poll,
1941 	.ioctl		= sock_no_ioctl,
1942 	.listen		= tipc_listen,
1943 	.shutdown	= tipc_shutdown,
1944 	.setsockopt	= tipc_setsockopt,
1945 	.getsockopt	= tipc_getsockopt,
1946 	.sendmsg	= tipc_send_packet,
1947 	.recvmsg	= tipc_recvmsg,
1948 	.mmap		= sock_no_mmap,
1949 	.sendpage	= sock_no_sendpage
1950 };
1951 
1952 static const struct proto_ops stream_ops = {
1953 	.owner		= THIS_MODULE,
1954 	.family		= AF_TIPC,
1955 	.release	= tipc_release,
1956 	.bind		= tipc_bind,
1957 	.connect	= tipc_connect,
1958 	.socketpair	= sock_no_socketpair,
1959 	.accept		= tipc_accept,
1960 	.getname	= tipc_getname,
1961 	.poll		= tipc_poll,
1962 	.ioctl		= sock_no_ioctl,
1963 	.listen		= tipc_listen,
1964 	.shutdown	= tipc_shutdown,
1965 	.setsockopt	= tipc_setsockopt,
1966 	.getsockopt	= tipc_getsockopt,
1967 	.sendmsg	= tipc_send_stream,
1968 	.recvmsg	= tipc_recv_stream,
1969 	.mmap		= sock_no_mmap,
1970 	.sendpage	= sock_no_sendpage
1971 };
1972 
1973 static const struct net_proto_family tipc_family_ops = {
1974 	.owner		= THIS_MODULE,
1975 	.family		= AF_TIPC,
1976 	.create		= tipc_sk_create
1977 };
1978 
1979 static struct proto tipc_proto = {
1980 	.name		= "TIPC",
1981 	.owner		= THIS_MODULE,
1982 	.obj_size	= sizeof(struct tipc_sock),
1983 	.sysctl_rmem	= sysctl_tipc_rmem
1984 };
1985 
1986 static struct proto tipc_proto_kern = {
1987 	.name		= "TIPC",
1988 	.obj_size	= sizeof(struct tipc_sock),
1989 	.sysctl_rmem	= sysctl_tipc_rmem
1990 };
1991 
1992 /**
1993  * tipc_socket_init - initialize TIPC socket interface
1994  *
1995  * Returns 0 on success, errno otherwise
1996  */
1997 int tipc_socket_init(void)
1998 {
1999 	int res;
2000 
2001 	res = proto_register(&tipc_proto, 1);
2002 	if (res) {
2003 		pr_err("Failed to register TIPC protocol type\n");
2004 		goto out;
2005 	}
2006 
2007 	res = sock_register(&tipc_family_ops);
2008 	if (res) {
2009 		pr_err("Failed to register TIPC socket type\n");
2010 		proto_unregister(&tipc_proto);
2011 		goto out;
2012 	}
2013  out:
2014 	return res;
2015 }
2016 
2017 /**
2018  * tipc_socket_stop - stop TIPC socket interface
2019  */
2020 void tipc_socket_stop(void)
2021 {
2022 	sock_unregister(tipc_family_ops.family);
2023 	proto_unregister(&tipc_proto);
2024 }
2025