xref: /openbmc/linux/net/tipc/socket.c (revision 206204a1)
1 /*
2  * net/tipc/socket.c: TIPC socket API
3  *
4  * Copyright (c) 2001-2007, 2012-2014, Ericsson AB
5  * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36 
37 #include "core.h"
38 #include "port.h"
39 #include "node.h"
40 
41 #include <linux/export.h>
42 
/*
 * TIPC-specific values stored in sock->state alongside the generic SS_*
 * socket states.  The replacement lists are parenthesized so that each
 * macro always expands to a single operand.
 */
#define SS_LISTENING	(-1)	/* socket is listening */
#define SS_READY	(-2)	/* socket is connectionless */

#define CONN_TIMEOUT_DEFAULT	8000	/* default connect timeout = 8s */
47 
48 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
49 static void tipc_data_ready(struct sock *sk);
50 static void tipc_write_space(struct sock *sk);
51 static int tipc_release(struct socket *sock);
52 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
53 
54 static const struct proto_ops packet_ops;
55 static const struct proto_ops stream_ops;
56 static const struct proto_ops msg_ops;
57 
58 static struct proto tipc_proto;
59 static struct proto tipc_proto_kern;
60 
61 /*
62  * Revised TIPC socket locking policy:
63  *
64  * Most socket operations take the standard socket lock when they start
65  * and hold it until they finish (or until they need to sleep).  Acquiring
66  * this lock grants the owner exclusive access to the fields of the socket
67  * data structures, with the exception of the backlog queue.  A few socket
68  * operations can be done without taking the socket lock because they only
69  * read socket information that never changes during the life of the socket.
70  *
71  * Socket operations may acquire the lock for the associated TIPC port if they
72  * need to perform an operation on the port.  If any routine needs to acquire
73  * both the socket lock and the port lock it must take the socket lock first
74  * to avoid the risk of deadlock.
75  *
76  * The dispatcher handling incoming messages cannot grab the socket lock in
 * the standard fashion, since it is invoked at the BH level and cannot block.
78  * Instead, it checks to see if the socket lock is currently owned by someone,
79  * and either handles the message itself or adds it to the socket's backlog
80  * queue; in the latter case the queued message is processed once the process
81  * owning the socket lock releases it.
82  *
83  * NOTE: Releasing the socket lock while an operation is sleeping overcomes
84  * the problem of a blocked socket operation preventing any other operations
85  * from occurring.  However, applications must be careful if they have
86  * multiple threads trying to send (or receive) on the same socket, as these
87  * operations might interfere with each other.  For example, doing a connect
88  * and a receive at the same time might allow the receive to consume the
89  * ACK message meant for the connect.  While additional work could be done
90  * to try and overcome this, it doesn't seem to be worthwhile at the present.
91  *
92  * NOTE: Releasing the socket lock while an operation is sleeping also ensures
93  * that another operation that must be performed in a non-blocking manner is
94  * not delayed for very long because the lock has already been taken.
95  *
96  * NOTE: This code assumes that certain fields of a port/socket pair are
97  * constant over its lifetime; such fields can be examined without taking
98  * the socket lock and/or port lock, and do not need to be re-read even
99  * after resuming processing after waiting.  These fields include:
100  *   - socket type
101  *   - pointer to socket sk structure (aka tipc_sock structure)
102  *   - pointer to port structure
103  *   - port reference
104  */
105 
106 #include "socket.h"
107 
108 /**
109  * advance_rx_queue - discard first buffer in socket receive queue
110  *
111  * Caller must hold socket lock
112  */
113 static void advance_rx_queue(struct sock *sk)
114 {
115 	kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
116 }
117 
118 /**
119  * reject_rx_queue - reject all buffers in socket receive queue
120  *
121  * Caller must hold socket lock
122  */
123 static void reject_rx_queue(struct sock *sk)
124 {
125 	struct sk_buff *buf;
126 
127 	while ((buf = __skb_dequeue(&sk->sk_receive_queue)))
128 		tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
129 }
130 
131 /**
132  * tipc_sk_create - create a TIPC socket
133  * @net: network namespace (must be default network)
134  * @sock: pre-allocated socket structure
135  * @protocol: protocol indicator (must be 0)
136  * @kern: caused by kernel or by userspace?
137  *
138  * This routine creates additional data structures used by the TIPC socket,
139  * initializes them, and links them together.
140  *
141  * Returns 0 on success, errno otherwise
142  */
143 static int tipc_sk_create(struct net *net, struct socket *sock,
144 			  int protocol, int kern)
145 {
146 	const struct proto_ops *ops;
147 	socket_state state;
148 	struct sock *sk;
149 	struct tipc_sock *tsk;
150 	struct tipc_port *port;
151 	u32 ref;
152 
153 	/* Validate arguments */
154 	if (unlikely(protocol != 0))
155 		return -EPROTONOSUPPORT;
156 
157 	switch (sock->type) {
158 	case SOCK_STREAM:
159 		ops = &stream_ops;
160 		state = SS_UNCONNECTED;
161 		break;
162 	case SOCK_SEQPACKET:
163 		ops = &packet_ops;
164 		state = SS_UNCONNECTED;
165 		break;
166 	case SOCK_DGRAM:
167 	case SOCK_RDM:
168 		ops = &msg_ops;
169 		state = SS_READY;
170 		break;
171 	default:
172 		return -EPROTOTYPE;
173 	}
174 
175 	/* Allocate socket's protocol area */
176 	if (!kern)
177 		sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
178 	else
179 		sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto_kern);
180 
181 	if (sk == NULL)
182 		return -ENOMEM;
183 
184 	tsk = tipc_sk(sk);
185 	port = &tsk->port;
186 
187 	ref = tipc_port_init(port, TIPC_LOW_IMPORTANCE);
188 	if (!ref) {
189 		pr_warn("Socket registration failed, ref. table exhausted\n");
190 		sk_free(sk);
191 		return -ENOMEM;
192 	}
193 
194 	/* Finish initializing socket data structures */
195 	sock->ops = ops;
196 	sock->state = state;
197 
198 	sock_init_data(sock, sk);
199 	sk->sk_backlog_rcv = tipc_backlog_rcv;
200 	sk->sk_rcvbuf = sysctl_tipc_rmem[1];
201 	sk->sk_data_ready = tipc_data_ready;
202 	sk->sk_write_space = tipc_write_space;
203 	tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
204 	atomic_set(&tsk->dupl_rcvcnt, 0);
205 	tipc_port_unlock(port);
206 
207 	if (sock->state == SS_READY) {
208 		tipc_port_set_unreturnable(port, true);
209 		if (sock->type == SOCK_DGRAM)
210 			tipc_port_set_unreliable(port, true);
211 	}
212 	return 0;
213 }
214 
215 /**
216  * tipc_sock_create_local - create TIPC socket from inside TIPC module
217  * @type: socket type - SOCK_RDM or SOCK_SEQPACKET
218  *
219  * We cannot use sock_creat_kern here because it bumps module user count.
220  * Since socket owner and creator is the same module we must make sure
221  * that module count remains zero for module local sockets, otherwise
222  * we cannot do rmmod.
223  *
224  * Returns 0 on success, errno otherwise
225  */
226 int tipc_sock_create_local(int type, struct socket **res)
227 {
228 	int rc;
229 
230 	rc = sock_create_lite(AF_TIPC, type, 0, res);
231 	if (rc < 0) {
232 		pr_err("Failed to create kernel socket\n");
233 		return rc;
234 	}
235 	tipc_sk_create(&init_net, *res, 0, 1);
236 
237 	return 0;
238 }
239 
240 /**
241  * tipc_sock_release_local - release socket created by tipc_sock_create_local
242  * @sock: the socket to be released.
243  *
244  * Module reference count is not incremented when such sockets are created,
245  * so we must keep it from being decremented when they are released.
246  */
247 void tipc_sock_release_local(struct socket *sock)
248 {
249 	tipc_release(sock);
250 	sock->ops = NULL;
251 	sock_release(sock);
252 }
253 
254 /**
255  * tipc_sock_accept_local - accept a connection on a socket created
256  * with tipc_sock_create_local. Use this function to avoid that
257  * module reference count is inadvertently incremented.
258  *
259  * @sock:    the accepting socket
260  * @newsock: reference to the new socket to be created
261  * @flags:   socket flags
262  */
263 
264 int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
265 			   int flags)
266 {
267 	struct sock *sk = sock->sk;
268 	int ret;
269 
270 	ret = sock_create_lite(sk->sk_family, sk->sk_type,
271 			       sk->sk_protocol, newsock);
272 	if (ret < 0)
273 		return ret;
274 
275 	ret = tipc_accept(sock, *newsock, flags);
276 	if (ret < 0) {
277 		sock_release(*newsock);
278 		return ret;
279 	}
280 	(*newsock)->ops = sock->ops;
281 	return ret;
282 }
283 
284 /**
285  * tipc_release - destroy a TIPC socket
286  * @sock: socket to destroy
287  *
288  * This routine cleans up any messages that are still queued on the socket.
289  * For DGRAM and RDM socket types, all queued messages are rejected.
290  * For SEQPACKET and STREAM socket types, the first message is rejected
291  * and any others are discarded.  (If the first message on a STREAM socket
292  * is partially-read, it is discarded and the next one is rejected instead.)
293  *
294  * NOTE: Rejected messages are not necessarily returned to the sender!  They
295  * are returned or discarded according to the "destination droppable" setting
296  * specified for the message by the sender.
297  *
298  * Returns 0 on success, errno otherwise
299  */
300 static int tipc_release(struct socket *sock)
301 {
302 	struct sock *sk = sock->sk;
303 	struct tipc_sock *tsk;
304 	struct tipc_port *port;
305 	struct sk_buff *buf;
306 
307 	/*
308 	 * Exit if socket isn't fully initialized (occurs when a failed accept()
309 	 * releases a pre-allocated child socket that was never used)
310 	 */
311 	if (sk == NULL)
312 		return 0;
313 
314 	tsk = tipc_sk(sk);
315 	port = &tsk->port;
316 	lock_sock(sk);
317 
318 	/*
319 	 * Reject all unreceived messages, except on an active connection
320 	 * (which disconnects locally & sends a 'FIN+' to peer)
321 	 */
322 	while (sock->state != SS_DISCONNECTING) {
323 		buf = __skb_dequeue(&sk->sk_receive_queue);
324 		if (buf == NULL)
325 			break;
326 		if (TIPC_SKB_CB(buf)->handle != NULL)
327 			kfree_skb(buf);
328 		else {
329 			if ((sock->state == SS_CONNECTING) ||
330 			    (sock->state == SS_CONNECTED)) {
331 				sock->state = SS_DISCONNECTING;
332 				tipc_port_disconnect(port->ref);
333 			}
334 			tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
335 		}
336 	}
337 
338 	/* Destroy TIPC port; also disconnects an active connection and
339 	 * sends a 'FIN-' to peer.
340 	 */
341 	tipc_port_destroy(port);
342 
343 	/* Discard any remaining (connection-based) messages in receive queue */
344 	__skb_queue_purge(&sk->sk_receive_queue);
345 
346 	/* Reject any messages that accumulated in backlog queue */
347 	sock->state = SS_DISCONNECTING;
348 	release_sock(sk);
349 
350 	sock_put(sk);
351 	sock->sk = NULL;
352 
353 	return 0;
354 }
355 
356 /**
357  * tipc_bind - associate or disassocate TIPC name(s) with a socket
358  * @sock: socket structure
359  * @uaddr: socket address describing name(s) and desired operation
360  * @uaddr_len: size of socket address data structure
361  *
362  * Name and name sequence binding is indicated using a positive scope value;
363  * a negative scope value unbinds the specified name.  Specifying no name
364  * (i.e. a socket address length of 0) unbinds all names from the socket.
365  *
366  * Returns 0 on success, errno otherwise
367  *
368  * NOTE: This routine doesn't need to take the socket lock since it doesn't
369  *       access any non-constant socket information.
370  */
371 static int tipc_bind(struct socket *sock, struct sockaddr *uaddr,
372 		     int uaddr_len)
373 {
374 	struct sock *sk = sock->sk;
375 	struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
376 	struct tipc_sock *tsk = tipc_sk(sk);
377 	int res = -EINVAL;
378 
379 	lock_sock(sk);
380 	if (unlikely(!uaddr_len)) {
381 		res = tipc_withdraw(&tsk->port, 0, NULL);
382 		goto exit;
383 	}
384 
385 	if (uaddr_len < sizeof(struct sockaddr_tipc)) {
386 		res = -EINVAL;
387 		goto exit;
388 	}
389 	if (addr->family != AF_TIPC) {
390 		res = -EAFNOSUPPORT;
391 		goto exit;
392 	}
393 
394 	if (addr->addrtype == TIPC_ADDR_NAME)
395 		addr->addr.nameseq.upper = addr->addr.nameseq.lower;
396 	else if (addr->addrtype != TIPC_ADDR_NAMESEQ) {
397 		res = -EAFNOSUPPORT;
398 		goto exit;
399 	}
400 
401 	if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) &&
402 	    (addr->addr.nameseq.type != TIPC_TOP_SRV) &&
403 	    (addr->addr.nameseq.type != TIPC_CFG_SRV)) {
404 		res = -EACCES;
405 		goto exit;
406 	}
407 
408 	res = (addr->scope > 0) ?
409 		tipc_publish(&tsk->port, addr->scope, &addr->addr.nameseq) :
410 		tipc_withdraw(&tsk->port, -addr->scope, &addr->addr.nameseq);
411 exit:
412 	release_sock(sk);
413 	return res;
414 }
415 
416 /**
417  * tipc_getname - get port ID of socket or peer socket
418  * @sock: socket structure
419  * @uaddr: area for returned socket address
420  * @uaddr_len: area for returned length of socket address
421  * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
422  *
423  * Returns 0 on success, errno otherwise
424  *
425  * NOTE: This routine doesn't need to take the socket lock since it only
426  *       accesses socket information that is unchanging (or which changes in
427  *       a completely predictable manner).
428  */
429 static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
430 			int *uaddr_len, int peer)
431 {
432 	struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
433 	struct tipc_sock *tsk = tipc_sk(sock->sk);
434 
435 	memset(addr, 0, sizeof(*addr));
436 	if (peer) {
437 		if ((sock->state != SS_CONNECTED) &&
438 			((peer != 2) || (sock->state != SS_DISCONNECTING)))
439 			return -ENOTCONN;
440 		addr->addr.id.ref = tipc_port_peerport(&tsk->port);
441 		addr->addr.id.node = tipc_port_peernode(&tsk->port);
442 	} else {
443 		addr->addr.id.ref = tsk->port.ref;
444 		addr->addr.id.node = tipc_own_addr;
445 	}
446 
447 	*uaddr_len = sizeof(*addr);
448 	addr->addrtype = TIPC_ADDR_ID;
449 	addr->family = AF_TIPC;
450 	addr->scope = 0;
451 	addr->addr.name.domain = 0;
452 
453 	return 0;
454 }
455 
456 /**
457  * tipc_poll - read and possibly block on pollmask
458  * @file: file structure associated with the socket
459  * @sock: socket for which to calculate the poll bits
460  * @wait: ???
461  *
462  * Returns pollmask value
463  *
464  * COMMENTARY:
465  * It appears that the usual socket locking mechanisms are not useful here
466  * since the pollmask info is potentially out-of-date the moment this routine
467  * exits.  TCP and other protocols seem to rely on higher level poll routines
468  * to handle any preventable race conditions, so TIPC will do the same ...
469  *
470  * TIPC sets the returned events as follows:
471  *
472  * socket state		flags set
473  * ------------		---------
474  * unconnected		no read flags
475  *			POLLOUT if port is not congested
476  *
477  * connecting		POLLIN/POLLRDNORM if ACK/NACK in rx queue
478  *			no write flags
479  *
480  * connected		POLLIN/POLLRDNORM if data in rx queue
481  *			POLLOUT if port is not congested
482  *
483  * disconnecting	POLLIN/POLLRDNORM/POLLHUP
484  *			no write flags
485  *
486  * listening		POLLIN if SYN in rx queue
487  *			no write flags
488  *
489  * ready		POLLIN/POLLRDNORM if data in rx queue
490  * [connectionless]	POLLOUT (since port cannot be congested)
491  *
492  * IMPORTANT: The fact that a read or write operation is indicated does NOT
493  * imply that the operation will succeed, merely that it should be performed
494  * and will not block.
495  */
496 static unsigned int tipc_poll(struct file *file, struct socket *sock,
497 			      poll_table *wait)
498 {
499 	struct sock *sk = sock->sk;
500 	struct tipc_sock *tsk = tipc_sk(sk);
501 	u32 mask = 0;
502 
503 	sock_poll_wait(file, sk_sleep(sk), wait);
504 
505 	switch ((int)sock->state) {
506 	case SS_UNCONNECTED:
507 		if (!tsk->port.congested)
508 			mask |= POLLOUT;
509 		break;
510 	case SS_READY:
511 	case SS_CONNECTED:
512 		if (!tsk->port.congested)
513 			mask |= POLLOUT;
514 		/* fall thru' */
515 	case SS_CONNECTING:
516 	case SS_LISTENING:
517 		if (!skb_queue_empty(&sk->sk_receive_queue))
518 			mask |= (POLLIN | POLLRDNORM);
519 		break;
520 	case SS_DISCONNECTING:
521 		mask = (POLLIN | POLLRDNORM | POLLHUP);
522 		break;
523 	}
524 
525 	return mask;
526 }
527 
528 /**
529  * dest_name_check - verify user is permitted to send to specified port name
530  * @dest: destination address
531  * @m: descriptor for message to be sent
532  *
533  * Prevents restricted configuration commands from being issued by
534  * unauthorized users.
535  *
536  * Returns 0 if permission is granted, otherwise errno
537  */
538 static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m)
539 {
540 	struct tipc_cfg_msg_hdr hdr;
541 
542 	if (likely(dest->addr.name.name.type >= TIPC_RESERVED_TYPES))
543 		return 0;
544 	if (likely(dest->addr.name.name.type == TIPC_TOP_SRV))
545 		return 0;
546 	if (likely(dest->addr.name.name.type != TIPC_CFG_SRV))
547 		return -EACCES;
548 
549 	if (!m->msg_iovlen || (m->msg_iov[0].iov_len < sizeof(hdr)))
550 		return -EMSGSIZE;
551 	if (copy_from_user(&hdr, m->msg_iov[0].iov_base, sizeof(hdr)))
552 		return -EFAULT;
553 	if ((ntohs(hdr.tcm_type) & 0xC000) && (!capable(CAP_NET_ADMIN)))
554 		return -EACCES;
555 
556 	return 0;
557 }
558 
559 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
560 {
561 	struct sock *sk = sock->sk;
562 	struct tipc_sock *tsk = tipc_sk(sk);
563 	DEFINE_WAIT(wait);
564 	int done;
565 
566 	do {
567 		int err = sock_error(sk);
568 		if (err)
569 			return err;
570 		if (sock->state == SS_DISCONNECTING)
571 			return -EPIPE;
572 		if (!*timeo_p)
573 			return -EAGAIN;
574 		if (signal_pending(current))
575 			return sock_intr_errno(*timeo_p);
576 
577 		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
578 		done = sk_wait_event(sk, timeo_p, !tsk->port.congested);
579 		finish_wait(sk_sleep(sk), &wait);
580 	} while (!done);
581 	return 0;
582 }
583 
584 
585 /**
586  * tipc_sendmsg - send message in connectionless manner
587  * @iocb: if NULL, indicates that socket lock is already held
588  * @sock: socket structure
589  * @m: message to send
590  * @total_len: length of message
591  *
592  * Message must have an destination specified explicitly.
593  * Used for SOCK_RDM and SOCK_DGRAM messages,
594  * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
595  * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
596  *
597  * Returns the number of bytes sent on success, or errno otherwise
598  */
599 static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
600 			struct msghdr *m, size_t total_len)
601 {
602 	struct sock *sk = sock->sk;
603 	struct tipc_sock *tsk = tipc_sk(sk);
604 	struct tipc_port *port = &tsk->port;
605 	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
606 	int needs_conn;
607 	long timeo;
608 	int res = -EINVAL;
609 
610 	if (unlikely(!dest))
611 		return -EDESTADDRREQ;
612 	if (unlikely((m->msg_namelen < sizeof(*dest)) ||
613 		     (dest->family != AF_TIPC)))
614 		return -EINVAL;
615 	if (total_len > TIPC_MAX_USER_MSG_SIZE)
616 		return -EMSGSIZE;
617 
618 	if (iocb)
619 		lock_sock(sk);
620 
621 	needs_conn = (sock->state != SS_READY);
622 	if (unlikely(needs_conn)) {
623 		if (sock->state == SS_LISTENING) {
624 			res = -EPIPE;
625 			goto exit;
626 		}
627 		if (sock->state != SS_UNCONNECTED) {
628 			res = -EISCONN;
629 			goto exit;
630 		}
631 		if (tsk->port.published) {
632 			res = -EOPNOTSUPP;
633 			goto exit;
634 		}
635 		if (dest->addrtype == TIPC_ADDR_NAME) {
636 			tsk->port.conn_type = dest->addr.name.name.type;
637 			tsk->port.conn_instance = dest->addr.name.name.instance;
638 		}
639 
640 		/* Abort any pending connection attempts (very unlikely) */
641 		reject_rx_queue(sk);
642 	}
643 
644 	timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
645 	do {
646 		if (dest->addrtype == TIPC_ADDR_NAME) {
647 			res = dest_name_check(dest, m);
648 			if (res)
649 				break;
650 			res = tipc_send2name(port,
651 					     &dest->addr.name.name,
652 					     dest->addr.name.domain,
653 					     m->msg_iov,
654 					     total_len);
655 		} else if (dest->addrtype == TIPC_ADDR_ID) {
656 			res = tipc_send2port(port,
657 					     &dest->addr.id,
658 					     m->msg_iov,
659 					     total_len);
660 		} else if (dest->addrtype == TIPC_ADDR_MCAST) {
661 			if (needs_conn) {
662 				res = -EOPNOTSUPP;
663 				break;
664 			}
665 			res = dest_name_check(dest, m);
666 			if (res)
667 				break;
668 			res = tipc_port_mcast_xmit(port,
669 						   &dest->addr.nameseq,
670 						   m->msg_iov,
671 						   total_len);
672 		}
673 		if (likely(res != -ELINKCONG)) {
674 			if (needs_conn && (res >= 0))
675 				sock->state = SS_CONNECTING;
676 			break;
677 		}
678 		res = tipc_wait_for_sndmsg(sock, &timeo);
679 		if (res)
680 			break;
681 	} while (1);
682 
683 exit:
684 	if (iocb)
685 		release_sock(sk);
686 	return res;
687 }
688 
689 static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
690 {
691 	struct sock *sk = sock->sk;
692 	struct tipc_sock *tsk = tipc_sk(sk);
693 	struct tipc_port *port = &tsk->port;
694 	DEFINE_WAIT(wait);
695 	int done;
696 
697 	do {
698 		int err = sock_error(sk);
699 		if (err)
700 			return err;
701 		if (sock->state == SS_DISCONNECTING)
702 			return -EPIPE;
703 		else if (sock->state != SS_CONNECTED)
704 			return -ENOTCONN;
705 		if (!*timeo_p)
706 			return -EAGAIN;
707 		if (signal_pending(current))
708 			return sock_intr_errno(*timeo_p);
709 
710 		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
711 		done = sk_wait_event(sk, timeo_p,
712 				     (!port->congested || !port->connected));
713 		finish_wait(sk_sleep(sk), &wait);
714 	} while (!done);
715 	return 0;
716 }
717 
718 /**
719  * tipc_send_packet - send a connection-oriented message
720  * @iocb: if NULL, indicates that socket lock is already held
721  * @sock: socket structure
722  * @m: message to send
723  * @total_len: length of message
724  *
725  * Used for SOCK_SEQPACKET messages and SOCK_STREAM data.
726  *
727  * Returns the number of bytes sent on success, or errno otherwise
728  */
729 static int tipc_send_packet(struct kiocb *iocb, struct socket *sock,
730 			    struct msghdr *m, size_t total_len)
731 {
732 	struct sock *sk = sock->sk;
733 	struct tipc_sock *tsk = tipc_sk(sk);
734 	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
735 	int res = -EINVAL;
736 	long timeo;
737 
738 	/* Handle implied connection establishment */
739 	if (unlikely(dest))
740 		return tipc_sendmsg(iocb, sock, m, total_len);
741 
742 	if (total_len > TIPC_MAX_USER_MSG_SIZE)
743 		return -EMSGSIZE;
744 
745 	if (iocb)
746 		lock_sock(sk);
747 
748 	if (unlikely(sock->state != SS_CONNECTED)) {
749 		if (sock->state == SS_DISCONNECTING)
750 			res = -EPIPE;
751 		else
752 			res = -ENOTCONN;
753 		goto exit;
754 	}
755 
756 	timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
757 	do {
758 		res = tipc_send(&tsk->port, m->msg_iov, total_len);
759 		if (likely(res != -ELINKCONG))
760 			break;
761 		res = tipc_wait_for_sndpkt(sock, &timeo);
762 		if (res)
763 			break;
764 	} while (1);
765 exit:
766 	if (iocb)
767 		release_sock(sk);
768 	return res;
769 }
770 
771 /**
772  * tipc_send_stream - send stream-oriented data
773  * @iocb: (unused)
774  * @sock: socket structure
775  * @m: data to send
776  * @total_len: total length of data to be sent
777  *
778  * Used for SOCK_STREAM data.
779  *
780  * Returns the number of bytes sent on success (or partial success),
781  * or errno if no data sent
782  */
783 static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
784 			    struct msghdr *m, size_t total_len)
785 {
786 	struct sock *sk = sock->sk;
787 	struct tipc_sock *tsk = tipc_sk(sk);
788 	struct msghdr my_msg;
789 	struct iovec my_iov;
790 	struct iovec *curr_iov;
791 	int curr_iovlen;
792 	char __user *curr_start;
793 	u32 hdr_size;
794 	int curr_left;
795 	int bytes_to_send;
796 	int bytes_sent;
797 	int res;
798 
799 	lock_sock(sk);
800 
801 	/* Handle special cases where there is no connection */
802 	if (unlikely(sock->state != SS_CONNECTED)) {
803 		if (sock->state == SS_UNCONNECTED)
804 			res = tipc_send_packet(NULL, sock, m, total_len);
805 		else
806 			res = sock->state == SS_DISCONNECTING ? -EPIPE : -ENOTCONN;
807 		goto exit;
808 	}
809 
810 	if (unlikely(m->msg_name)) {
811 		res = -EISCONN;
812 		goto exit;
813 	}
814 
815 	if (total_len > (unsigned int)INT_MAX) {
816 		res = -EMSGSIZE;
817 		goto exit;
818 	}
819 
820 	/*
821 	 * Send each iovec entry using one or more messages
822 	 *
823 	 * Note: This algorithm is good for the most likely case
824 	 * (i.e. one large iovec entry), but could be improved to pass sets
825 	 * of small iovec entries into send_packet().
826 	 */
827 	curr_iov = m->msg_iov;
828 	curr_iovlen = m->msg_iovlen;
829 	my_msg.msg_iov = &my_iov;
830 	my_msg.msg_iovlen = 1;
831 	my_msg.msg_flags = m->msg_flags;
832 	my_msg.msg_name = NULL;
833 	bytes_sent = 0;
834 
835 	hdr_size = msg_hdr_sz(&tsk->port.phdr);
836 
837 	while (curr_iovlen--) {
838 		curr_start = curr_iov->iov_base;
839 		curr_left = curr_iov->iov_len;
840 
841 		while (curr_left) {
842 			bytes_to_send = tsk->port.max_pkt - hdr_size;
843 			if (bytes_to_send > TIPC_MAX_USER_MSG_SIZE)
844 				bytes_to_send = TIPC_MAX_USER_MSG_SIZE;
845 			if (curr_left < bytes_to_send)
846 				bytes_to_send = curr_left;
847 			my_iov.iov_base = curr_start;
848 			my_iov.iov_len = bytes_to_send;
849 			res = tipc_send_packet(NULL, sock, &my_msg,
850 					       bytes_to_send);
851 			if (res < 0) {
852 				if (bytes_sent)
853 					res = bytes_sent;
854 				goto exit;
855 			}
856 			curr_left -= bytes_to_send;
857 			curr_start += bytes_to_send;
858 			bytes_sent += bytes_to_send;
859 		}
860 
861 		curr_iov++;
862 	}
863 	res = bytes_sent;
864 exit:
865 	release_sock(sk);
866 	return res;
867 }
868 
869 /**
870  * auto_connect - complete connection setup to a remote port
871  * @tsk: tipc socket structure
872  * @msg: peer's response message
873  *
874  * Returns 0 on success, errno otherwise
875  */
876 static int auto_connect(struct tipc_sock *tsk, struct tipc_msg *msg)
877 {
878 	struct tipc_port *port = &tsk->port;
879 	struct socket *sock = tsk->sk.sk_socket;
880 	struct tipc_portid peer;
881 
882 	peer.ref = msg_origport(msg);
883 	peer.node = msg_orignode(msg);
884 
885 	__tipc_port_connect(port->ref, port, &peer);
886 
887 	if (msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE)
888 		return -EINVAL;
889 	msg_set_importance(&port->phdr, (u32)msg_importance(msg));
890 	sock->state = SS_CONNECTED;
891 	return 0;
892 }
893 
894 /**
895  * set_orig_addr - capture sender's address for received message
896  * @m: descriptor for message info
897  * @msg: received message header
898  *
899  * Note: Address is not captured if not requested by receiver.
900  */
901 static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
902 {
903 	DECLARE_SOCKADDR(struct sockaddr_tipc *, addr, m->msg_name);
904 
905 	if (addr) {
906 		addr->family = AF_TIPC;
907 		addr->addrtype = TIPC_ADDR_ID;
908 		memset(&addr->addr, 0, sizeof(addr->addr));
909 		addr->addr.id.ref = msg_origport(msg);
910 		addr->addr.id.node = msg_orignode(msg);
911 		addr->addr.name.domain = 0;	/* could leave uninitialized */
912 		addr->scope = 0;		/* could leave uninitialized */
913 		m->msg_namelen = sizeof(struct sockaddr_tipc);
914 	}
915 }
916 
917 /**
918  * anc_data_recv - optionally capture ancillary data for received message
919  * @m: descriptor for message info
920  * @msg: received message header
921  * @tport: TIPC port associated with message
922  *
923  * Note: Ancillary data is not captured if not requested by receiver.
924  *
925  * Returns 0 if successful, otherwise errno
926  */
927 static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
928 			 struct tipc_port *tport)
929 {
930 	u32 anc_data[3];
931 	u32 err;
932 	u32 dest_type;
933 	int has_name;
934 	int res;
935 
936 	if (likely(m->msg_controllen == 0))
937 		return 0;
938 
939 	/* Optionally capture errored message object(s) */
940 	err = msg ? msg_errcode(msg) : 0;
941 	if (unlikely(err)) {
942 		anc_data[0] = err;
943 		anc_data[1] = msg_data_sz(msg);
944 		res = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, anc_data);
945 		if (res)
946 			return res;
947 		if (anc_data[1]) {
948 			res = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, anc_data[1],
949 				       msg_data(msg));
950 			if (res)
951 				return res;
952 		}
953 	}
954 
955 	/* Optionally capture message destination object */
956 	dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
957 	switch (dest_type) {
958 	case TIPC_NAMED_MSG:
959 		has_name = 1;
960 		anc_data[0] = msg_nametype(msg);
961 		anc_data[1] = msg_namelower(msg);
962 		anc_data[2] = msg_namelower(msg);
963 		break;
964 	case TIPC_MCAST_MSG:
965 		has_name = 1;
966 		anc_data[0] = msg_nametype(msg);
967 		anc_data[1] = msg_namelower(msg);
968 		anc_data[2] = msg_nameupper(msg);
969 		break;
970 	case TIPC_CONN_MSG:
971 		has_name = (tport->conn_type != 0);
972 		anc_data[0] = tport->conn_type;
973 		anc_data[1] = tport->conn_instance;
974 		anc_data[2] = tport->conn_instance;
975 		break;
976 	default:
977 		has_name = 0;
978 	}
979 	if (has_name) {
980 		res = put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, anc_data);
981 		if (res)
982 			return res;
983 	}
984 
985 	return 0;
986 }
987 
988 static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
989 {
990 	struct sock *sk = sock->sk;
991 	DEFINE_WAIT(wait);
992 	long timeo = *timeop;
993 	int err;
994 
995 	for (;;) {
996 		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
997 		if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
998 			if (sock->state == SS_DISCONNECTING) {
999 				err = -ENOTCONN;
1000 				break;
1001 			}
1002 			release_sock(sk);
1003 			timeo = schedule_timeout(timeo);
1004 			lock_sock(sk);
1005 		}
1006 		err = 0;
1007 		if (!skb_queue_empty(&sk->sk_receive_queue))
1008 			break;
1009 		err = sock_intr_errno(timeo);
1010 		if (signal_pending(current))
1011 			break;
1012 		err = -EAGAIN;
1013 		if (!timeo)
1014 			break;
1015 	}
1016 	finish_wait(sk_sleep(sk), &wait);
1017 	*timeop = timeo;
1018 	return err;
1019 }
1020 
1021 /**
1022  * tipc_recvmsg - receive packet-oriented message
1023  * @iocb: (unused)
1024  * @m: descriptor for message info
1025  * @buf_len: total size of user buffer area
1026  * @flags: receive flags
1027  *
1028  * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
1029  * If the complete message doesn't fit in user area, truncate it.
1030  *
1031  * Returns size of returned message data, errno otherwise
1032  */
1033 static int tipc_recvmsg(struct kiocb *iocb, struct socket *sock,
1034 			struct msghdr *m, size_t buf_len, int flags)
1035 {
1036 	struct sock *sk = sock->sk;
1037 	struct tipc_sock *tsk = tipc_sk(sk);
1038 	struct tipc_port *port = &tsk->port;
1039 	struct sk_buff *buf;
1040 	struct tipc_msg *msg;
1041 	long timeo;
1042 	unsigned int sz;
1043 	u32 err;
1044 	int res;
1045 
1046 	/* Catch invalid receive requests */
1047 	if (unlikely(!buf_len))
1048 		return -EINVAL;
1049 
1050 	lock_sock(sk);
1051 
1052 	if (unlikely(sock->state == SS_UNCONNECTED)) {
1053 		res = -ENOTCONN;
1054 		goto exit;
1055 	}
1056 
1057 	timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1058 restart:
1059 
1060 	/* Look for a message in receive queue; wait if necessary */
1061 	res = tipc_wait_for_rcvmsg(sock, &timeo);
1062 	if (res)
1063 		goto exit;
1064 
1065 	/* Look at first message in receive queue */
1066 	buf = skb_peek(&sk->sk_receive_queue);
1067 	msg = buf_msg(buf);
1068 	sz = msg_data_sz(msg);
1069 	err = msg_errcode(msg);
1070 
1071 	/* Discard an empty non-errored message & try again */
1072 	if ((!sz) && (!err)) {
1073 		advance_rx_queue(sk);
1074 		goto restart;
1075 	}
1076 
1077 	/* Capture sender's address (optional) */
1078 	set_orig_addr(m, msg);
1079 
1080 	/* Capture ancillary data (optional) */
1081 	res = anc_data_recv(m, msg, port);
1082 	if (res)
1083 		goto exit;
1084 
1085 	/* Capture message data (if valid) & compute return value (always) */
1086 	if (!err) {
1087 		if (unlikely(buf_len < sz)) {
1088 			sz = buf_len;
1089 			m->msg_flags |= MSG_TRUNC;
1090 		}
1091 		res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg),
1092 					      m->msg_iov, sz);
1093 		if (res)
1094 			goto exit;
1095 		res = sz;
1096 	} else {
1097 		if ((sock->state == SS_READY) ||
1098 		    ((err == TIPC_CONN_SHUTDOWN) || m->msg_control))
1099 			res = 0;
1100 		else
1101 			res = -ECONNRESET;
1102 	}
1103 
1104 	/* Consume received message (optional) */
1105 	if (likely(!(flags & MSG_PEEK))) {
1106 		if ((sock->state != SS_READY) &&
1107 		    (++port->conn_unacked >= TIPC_CONNACK_INTV))
1108 			tipc_acknowledge(port->ref, port->conn_unacked);
1109 		advance_rx_queue(sk);
1110 	}
1111 exit:
1112 	release_sock(sk);
1113 	return res;
1114 }
1115 
1116 /**
1117  * tipc_recv_stream - receive stream-oriented data
1118  * @iocb: (unused)
1119  * @m: descriptor for message info
1120  * @buf_len: total size of user buffer area
1121  * @flags: receive flags
1122  *
1123  * Used for SOCK_STREAM messages only.  If not enough data is available
1124  * will optionally wait for more; never truncates data.
1125  *
1126  * Returns size of returned message data, errno otherwise
1127  */
1128 static int tipc_recv_stream(struct kiocb *iocb, struct socket *sock,
1129 			    struct msghdr *m, size_t buf_len, int flags)
1130 {
1131 	struct sock *sk = sock->sk;
1132 	struct tipc_sock *tsk = tipc_sk(sk);
1133 	struct tipc_port *port = &tsk->port;
1134 	struct sk_buff *buf;
1135 	struct tipc_msg *msg;
1136 	long timeo;
1137 	unsigned int sz;
1138 	int sz_to_copy, target, needed;
1139 	int sz_copied = 0;
1140 	u32 err;
1141 	int res = 0;
1142 
1143 	/* Catch invalid receive attempts */
1144 	if (unlikely(!buf_len))
1145 		return -EINVAL;
1146 
1147 	lock_sock(sk);
1148 
1149 	if (unlikely(sock->state == SS_UNCONNECTED)) {
1150 		res = -ENOTCONN;
1151 		goto exit;
1152 	}
1153 
1154 	target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
1155 	timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1156 
1157 restart:
1158 	/* Look for a message in receive queue; wait if necessary */
1159 	res = tipc_wait_for_rcvmsg(sock, &timeo);
1160 	if (res)
1161 		goto exit;
1162 
1163 	/* Look at first message in receive queue */
1164 	buf = skb_peek(&sk->sk_receive_queue);
1165 	msg = buf_msg(buf);
1166 	sz = msg_data_sz(msg);
1167 	err = msg_errcode(msg);
1168 
1169 	/* Discard an empty non-errored message & try again */
1170 	if ((!sz) && (!err)) {
1171 		advance_rx_queue(sk);
1172 		goto restart;
1173 	}
1174 
1175 	/* Optionally capture sender's address & ancillary data of first msg */
1176 	if (sz_copied == 0) {
1177 		set_orig_addr(m, msg);
1178 		res = anc_data_recv(m, msg, port);
1179 		if (res)
1180 			goto exit;
1181 	}
1182 
1183 	/* Capture message data (if valid) & compute return value (always) */
1184 	if (!err) {
1185 		u32 offset = (u32)(unsigned long)(TIPC_SKB_CB(buf)->handle);
1186 
1187 		sz -= offset;
1188 		needed = (buf_len - sz_copied);
1189 		sz_to_copy = (sz <= needed) ? sz : needed;
1190 
1191 		res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg) + offset,
1192 					      m->msg_iov, sz_to_copy);
1193 		if (res)
1194 			goto exit;
1195 
1196 		sz_copied += sz_to_copy;
1197 
1198 		if (sz_to_copy < sz) {
1199 			if (!(flags & MSG_PEEK))
1200 				TIPC_SKB_CB(buf)->handle =
1201 				(void *)(unsigned long)(offset + sz_to_copy);
1202 			goto exit;
1203 		}
1204 	} else {
1205 		if (sz_copied != 0)
1206 			goto exit; /* can't add error msg to valid data */
1207 
1208 		if ((err == TIPC_CONN_SHUTDOWN) || m->msg_control)
1209 			res = 0;
1210 		else
1211 			res = -ECONNRESET;
1212 	}
1213 
1214 	/* Consume received message (optional) */
1215 	if (likely(!(flags & MSG_PEEK))) {
1216 		if (unlikely(++port->conn_unacked >= TIPC_CONNACK_INTV))
1217 			tipc_acknowledge(port->ref, port->conn_unacked);
1218 		advance_rx_queue(sk);
1219 	}
1220 
1221 	/* Loop around if more data is required */
1222 	if ((sz_copied < buf_len) &&	/* didn't get all requested data */
1223 	    (!skb_queue_empty(&sk->sk_receive_queue) ||
1224 	    (sz_copied < target)) &&	/* and more is ready or required */
1225 	    (!(flags & MSG_PEEK)) &&	/* and aren't just peeking at data */
1226 	    (!err))			/* and haven't reached a FIN */
1227 		goto restart;
1228 
1229 exit:
1230 	release_sock(sk);
1231 	return sz_copied ? sz_copied : res;
1232 }
1233 
1234 /**
1235  * tipc_write_space - wake up thread if port congestion is released
1236  * @sk: socket
1237  */
1238 static void tipc_write_space(struct sock *sk)
1239 {
1240 	struct socket_wq *wq;
1241 
1242 	rcu_read_lock();
1243 	wq = rcu_dereference(sk->sk_wq);
1244 	if (wq_has_sleeper(wq))
1245 		wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
1246 						POLLWRNORM | POLLWRBAND);
1247 	rcu_read_unlock();
1248 }
1249 
1250 /**
1251  * tipc_data_ready - wake up threads to indicate messages have been received
1252  * @sk: socket
1253  * @len: the length of messages
1254  */
1255 static void tipc_data_ready(struct sock *sk)
1256 {
1257 	struct socket_wq *wq;
1258 
1259 	rcu_read_lock();
1260 	wq = rcu_dereference(sk->sk_wq);
1261 	if (wq_has_sleeper(wq))
1262 		wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
1263 						POLLRDNORM | POLLRDBAND);
1264 	rcu_read_unlock();
1265 }
1266 
/**
 * filter_connect - Handle all incoming messages for a connection-based socket
 * @tsk: TIPC socket
 * @buf: in: pointer to the message buffer; set to NULL if the buffer is
 *       consumed (freed) here
 *
 * Returns TIPC_OK if the message is acceptable in the socket's current
 * state, TIPC_ERR_NO_PORT otherwise.  On connection setup failure the
 * socket error (sk_err) is set and the socket moves to SS_DISCONNECTING.
 */
static u32 filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)
{
	struct sock *sk = &tsk->sk;
	struct tipc_port *port = &tsk->port;
	struct socket *sock = sk->sk_socket;
	struct tipc_msg *msg = buf_msg(*buf);
	int rc;

	/* Multicast messages are never accepted here */
	if (msg_mcast(msg))
		return TIPC_ERR_NO_PORT;

	switch ((int)sock->state) {
	case SS_CONNECTED:
		/* Accept only connection-based messages sent by peer */
		if (!msg_connected(msg) || !tipc_port_peer_msg(port, msg))
			return TIPC_ERR_NO_PORT;

		/* An errored message from the peer tears the connection down */
		if (unlikely(msg_errcode(msg))) {
			sock->state = SS_DISCONNECTING;
			__tipc_port_disconnect(port);
		}
		return TIPC_OK;

	case SS_CONNECTING:
		/* Accept only ACK or NACK message */
		if (unlikely(msg_errcode(msg))) {
			sock->state = SS_DISCONNECTING;
			sk->sk_err = ECONNREFUSED;
			return TIPC_OK;
		}

		if (unlikely(!msg_connected(msg)))
			return TIPC_ERR_NO_PORT;

		rc = auto_connect(tsk, msg);
		if (rc) {
			sock->state = SS_DISCONNECTING;
			sk->sk_err = -rc;
			return TIPC_OK;
		}

		/* If an incoming message is an 'ACK-', it should be
		 * discarded here because it doesn't contain useful
		 * data. In addition, we should try to wake up
		 * connect() routine if sleeping.
		 */
		if (msg_data_sz(msg) == 0) {
			kfree_skb(*buf);
			*buf = NULL;
			if (waitqueue_active(sk_sleep(sk)))
				wake_up_interruptible(sk_sleep(sk));
		}
		return TIPC_OK;

	case SS_LISTENING:
	case SS_UNCONNECTED:
		/* Accept only SYN message */
		if (!msg_connected(msg) && !(msg_errcode(msg)))
			return TIPC_OK;
		return TIPC_ERR_NO_PORT;

	case SS_DISCONNECTING:
		return TIPC_ERR_NO_PORT;

	default:
		pr_err("Unknown socket state %u\n", sock->state);
		return TIPC_ERR_NO_PORT;
	}
}
1345 
1346 /**
1347  * rcvbuf_limit - get proper overload limit of socket receive queue
1348  * @sk: socket
1349  * @buf: message
1350  *
1351  * For all connection oriented messages, irrespective of importance,
1352  * the default overload value (i.e. 67MB) is set as limit.
1353  *
1354  * For all connectionless messages, by default new queue limits are
1355  * as belows:
1356  *
1357  * TIPC_LOW_IMPORTANCE       (4 MB)
1358  * TIPC_MEDIUM_IMPORTANCE    (8 MB)
1359  * TIPC_HIGH_IMPORTANCE      (16 MB)
1360  * TIPC_CRITICAL_IMPORTANCE  (32 MB)
1361  *
1362  * Returns overload limit according to corresponding message importance
1363  */
1364 static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
1365 {
1366 	struct tipc_msg *msg = buf_msg(buf);
1367 
1368 	if (msg_connected(msg))
1369 		return sysctl_tipc_rmem[2];
1370 
1371 	return sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE <<
1372 		msg_importance(msg);
1373 }
1374 
1375 /**
1376  * filter_rcv - validate incoming message
1377  * @sk: socket
1378  * @buf: message
1379  *
1380  * Enqueues message on receive queue if acceptable; optionally handles
1381  * disconnect indication for a connected socket.
1382  *
1383  * Called with socket lock already taken; port lock may also be taken.
1384  *
1385  * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
1386  */
1387 static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
1388 {
1389 	struct socket *sock = sk->sk_socket;
1390 	struct tipc_sock *tsk = tipc_sk(sk);
1391 	struct tipc_msg *msg = buf_msg(buf);
1392 	unsigned int limit = rcvbuf_limit(sk, buf);
1393 	u32 res = TIPC_OK;
1394 
1395 	/* Reject message if it is wrong sort of message for socket */
1396 	if (msg_type(msg) > TIPC_DIRECT_MSG)
1397 		return TIPC_ERR_NO_PORT;
1398 
1399 	if (sock->state == SS_READY) {
1400 		if (msg_connected(msg))
1401 			return TIPC_ERR_NO_PORT;
1402 	} else {
1403 		res = filter_connect(tsk, &buf);
1404 		if (res != TIPC_OK || buf == NULL)
1405 			return res;
1406 	}
1407 
1408 	/* Reject message if there isn't room to queue it */
1409 	if (sk_rmem_alloc_get(sk) + buf->truesize >= limit)
1410 		return TIPC_ERR_OVERLOAD;
1411 
1412 	/* Enqueue message */
1413 	TIPC_SKB_CB(buf)->handle = NULL;
1414 	__skb_queue_tail(&sk->sk_receive_queue, buf);
1415 	skb_set_owner_r(buf, sk);
1416 
1417 	sk->sk_data_ready(sk);
1418 	return TIPC_OK;
1419 }
1420 
1421 /**
1422  * tipc_backlog_rcv - handle incoming message from backlog queue
1423  * @sk: socket
1424  * @buf: message
1425  *
1426  * Caller must hold socket lock, but not port lock.
1427  *
1428  * Returns 0
1429  */
1430 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf)
1431 {
1432 	u32 res;
1433 	struct tipc_sock *tsk = tipc_sk(sk);
1434 	uint truesize = buf->truesize;
1435 
1436 	res = filter_rcv(sk, buf);
1437 	if (unlikely(res))
1438 		tipc_reject_msg(buf, res);
1439 
1440 	if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT)
1441 		atomic_add(truesize, &tsk->dupl_rcvcnt);
1442 
1443 	return 0;
1444 }
1445 
1446 /**
1447  * tipc_sk_rcv - handle incoming message
1448  * @buf: buffer containing arriving message
1449  * Consumes buffer
1450  * Returns 0 if success, or errno: -EHOSTUNREACH
1451  */
1452 int tipc_sk_rcv(struct sk_buff *buf)
1453 {
1454 	struct tipc_sock *tsk;
1455 	struct tipc_port *port;
1456 	struct sock *sk;
1457 	u32 dport = msg_destport(buf_msg(buf));
1458 	int err = TIPC_OK;
1459 	uint limit;
1460 
1461 	/* Forward unresolved named message */
1462 	if (unlikely(!dport)) {
1463 		tipc_net_route_msg(buf);
1464 		return 0;
1465 	}
1466 
1467 	/* Validate destination */
1468 	port = tipc_port_lock(dport);
1469 	if (unlikely(!port)) {
1470 		err = TIPC_ERR_NO_PORT;
1471 		goto exit;
1472 	}
1473 
1474 	tsk = tipc_port_to_sock(port);
1475 	sk = &tsk->sk;
1476 
1477 	/* Queue message */
1478 	bh_lock_sock(sk);
1479 
1480 	if (!sock_owned_by_user(sk)) {
1481 		err = filter_rcv(sk, buf);
1482 	} else {
1483 		if (sk->sk_backlog.len == 0)
1484 			atomic_set(&tsk->dupl_rcvcnt, 0);
1485 		limit = rcvbuf_limit(sk, buf) + atomic_read(&tsk->dupl_rcvcnt);
1486 		if (sk_add_backlog(sk, buf, limit))
1487 			err = TIPC_ERR_OVERLOAD;
1488 	}
1489 
1490 	bh_unlock_sock(sk);
1491 	tipc_port_unlock(port);
1492 
1493 	if (likely(!err))
1494 		return 0;
1495 exit:
1496 	tipc_reject_msg(buf, err);
1497 	return -EHOSTUNREACH;
1498 }
1499 
1500 static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
1501 {
1502 	struct sock *sk = sock->sk;
1503 	DEFINE_WAIT(wait);
1504 	int done;
1505 
1506 	do {
1507 		int err = sock_error(sk);
1508 		if (err)
1509 			return err;
1510 		if (!*timeo_p)
1511 			return -ETIMEDOUT;
1512 		if (signal_pending(current))
1513 			return sock_intr_errno(*timeo_p);
1514 
1515 		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1516 		done = sk_wait_event(sk, timeo_p, sock->state != SS_CONNECTING);
1517 		finish_wait(sk_sleep(sk), &wait);
1518 	} while (!done);
1519 	return 0;
1520 }
1521 
1522 /**
1523  * tipc_connect - establish a connection to another TIPC port
1524  * @sock: socket structure
1525  * @dest: socket address for destination port
1526  * @destlen: size of socket address data structure
1527  * @flags: file-related flags associated with socket
1528  *
1529  * Returns 0 on success, errno otherwise
1530  */
1531 static int tipc_connect(struct socket *sock, struct sockaddr *dest,
1532 			int destlen, int flags)
1533 {
1534 	struct sock *sk = sock->sk;
1535 	struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
1536 	struct msghdr m = {NULL,};
1537 	long timeout = (flags & O_NONBLOCK) ? 0 : tipc_sk(sk)->conn_timeout;
1538 	socket_state previous;
1539 	int res;
1540 
1541 	lock_sock(sk);
1542 
1543 	/* For now, TIPC does not allow use of connect() with DGRAM/RDM types */
1544 	if (sock->state == SS_READY) {
1545 		res = -EOPNOTSUPP;
1546 		goto exit;
1547 	}
1548 
1549 	/*
1550 	 * Reject connection attempt using multicast address
1551 	 *
1552 	 * Note: send_msg() validates the rest of the address fields,
1553 	 *       so there's no need to do it here
1554 	 */
1555 	if (dst->addrtype == TIPC_ADDR_MCAST) {
1556 		res = -EINVAL;
1557 		goto exit;
1558 	}
1559 
1560 	previous = sock->state;
1561 	switch (sock->state) {
1562 	case SS_UNCONNECTED:
1563 		/* Send a 'SYN-' to destination */
1564 		m.msg_name = dest;
1565 		m.msg_namelen = destlen;
1566 
1567 		/* If connect is in non-blocking case, set MSG_DONTWAIT to
1568 		 * indicate send_msg() is never blocked.
1569 		 */
1570 		if (!timeout)
1571 			m.msg_flags = MSG_DONTWAIT;
1572 
1573 		res = tipc_sendmsg(NULL, sock, &m, 0);
1574 		if ((res < 0) && (res != -EWOULDBLOCK))
1575 			goto exit;
1576 
1577 		/* Just entered SS_CONNECTING state; the only
1578 		 * difference is that return value in non-blocking
1579 		 * case is EINPROGRESS, rather than EALREADY.
1580 		 */
1581 		res = -EINPROGRESS;
1582 	case SS_CONNECTING:
1583 		if (previous == SS_CONNECTING)
1584 			res = -EALREADY;
1585 		if (!timeout)
1586 			goto exit;
1587 		timeout = msecs_to_jiffies(timeout);
1588 		/* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
1589 		res = tipc_wait_for_connect(sock, &timeout);
1590 		break;
1591 	case SS_CONNECTED:
1592 		res = -EISCONN;
1593 		break;
1594 	default:
1595 		res = -EINVAL;
1596 		break;
1597 	}
1598 exit:
1599 	release_sock(sk);
1600 	return res;
1601 }
1602 
1603 /**
1604  * tipc_listen - allow socket to listen for incoming connections
1605  * @sock: socket structure
1606  * @len: (unused)
1607  *
1608  * Returns 0 on success, errno otherwise
1609  */
1610 static int tipc_listen(struct socket *sock, int len)
1611 {
1612 	struct sock *sk = sock->sk;
1613 	int res;
1614 
1615 	lock_sock(sk);
1616 
1617 	if (sock->state != SS_UNCONNECTED)
1618 		res = -EINVAL;
1619 	else {
1620 		sock->state = SS_LISTENING;
1621 		res = 0;
1622 	}
1623 
1624 	release_sock(sk);
1625 	return res;
1626 }
1627 
1628 static int tipc_wait_for_accept(struct socket *sock, long timeo)
1629 {
1630 	struct sock *sk = sock->sk;
1631 	DEFINE_WAIT(wait);
1632 	int err;
1633 
1634 	/* True wake-one mechanism for incoming connections: only
1635 	 * one process gets woken up, not the 'whole herd'.
1636 	 * Since we do not 'race & poll' for established sockets
1637 	 * anymore, the common case will execute the loop only once.
1638 	*/
1639 	for (;;) {
1640 		prepare_to_wait_exclusive(sk_sleep(sk), &wait,
1641 					  TASK_INTERRUPTIBLE);
1642 		if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1643 			release_sock(sk);
1644 			timeo = schedule_timeout(timeo);
1645 			lock_sock(sk);
1646 		}
1647 		err = 0;
1648 		if (!skb_queue_empty(&sk->sk_receive_queue))
1649 			break;
1650 		err = -EINVAL;
1651 		if (sock->state != SS_LISTENING)
1652 			break;
1653 		err = sock_intr_errno(timeo);
1654 		if (signal_pending(current))
1655 			break;
1656 		err = -EAGAIN;
1657 		if (!timeo)
1658 			break;
1659 	}
1660 	finish_wait(sk_sleep(sk), &wait);
1661 	return err;
1662 }
1663 
1664 /**
1665  * tipc_accept - wait for connection request
1666  * @sock: listening socket
1667  * @newsock: new socket that is to be connected
1668  * @flags: file-related flags associated with socket
1669  *
1670  * Returns 0 on success, errno otherwise
1671  */
1672 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
1673 {
1674 	struct sock *new_sk, *sk = sock->sk;
1675 	struct sk_buff *buf;
1676 	struct tipc_port *new_port;
1677 	struct tipc_msg *msg;
1678 	struct tipc_portid peer;
1679 	u32 new_ref;
1680 	long timeo;
1681 	int res;
1682 
1683 	lock_sock(sk);
1684 
1685 	if (sock->state != SS_LISTENING) {
1686 		res = -EINVAL;
1687 		goto exit;
1688 	}
1689 	timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
1690 	res = tipc_wait_for_accept(sock, timeo);
1691 	if (res)
1692 		goto exit;
1693 
1694 	buf = skb_peek(&sk->sk_receive_queue);
1695 
1696 	res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, 1);
1697 	if (res)
1698 		goto exit;
1699 
1700 	new_sk = new_sock->sk;
1701 	new_port = &tipc_sk(new_sk)->port;
1702 	new_ref = new_port->ref;
1703 	msg = buf_msg(buf);
1704 
1705 	/* we lock on new_sk; but lockdep sees the lock on sk */
1706 	lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
1707 
1708 	/*
1709 	 * Reject any stray messages received by new socket
1710 	 * before the socket lock was taken (very, very unlikely)
1711 	 */
1712 	reject_rx_queue(new_sk);
1713 
1714 	/* Connect new socket to it's peer */
1715 	peer.ref = msg_origport(msg);
1716 	peer.node = msg_orignode(msg);
1717 	tipc_port_connect(new_ref, &peer);
1718 	new_sock->state = SS_CONNECTED;
1719 
1720 	tipc_port_set_importance(new_port, msg_importance(msg));
1721 	if (msg_named(msg)) {
1722 		new_port->conn_type = msg_nametype(msg);
1723 		new_port->conn_instance = msg_nameinst(msg);
1724 	}
1725 
1726 	/*
1727 	 * Respond to 'SYN-' by discarding it & returning 'ACK'-.
1728 	 * Respond to 'SYN+' by queuing it on new socket.
1729 	 */
1730 	if (!msg_data_sz(msg)) {
1731 		struct msghdr m = {NULL,};
1732 
1733 		advance_rx_queue(sk);
1734 		tipc_send_packet(NULL, new_sock, &m, 0);
1735 	} else {
1736 		__skb_dequeue(&sk->sk_receive_queue);
1737 		__skb_queue_head(&new_sk->sk_receive_queue, buf);
1738 		skb_set_owner_r(buf, new_sk);
1739 	}
1740 	release_sock(new_sk);
1741 exit:
1742 	release_sock(sk);
1743 	return res;
1744 }
1745 
1746 /**
1747  * tipc_shutdown - shutdown socket connection
1748  * @sock: socket structure
1749  * @how: direction to close (must be SHUT_RDWR)
1750  *
1751  * Terminates connection (if necessary), then purges socket's receive queue.
1752  *
1753  * Returns 0 on success, errno otherwise
1754  */
1755 static int tipc_shutdown(struct socket *sock, int how)
1756 {
1757 	struct sock *sk = sock->sk;
1758 	struct tipc_sock *tsk = tipc_sk(sk);
1759 	struct tipc_port *port = &tsk->port;
1760 	struct sk_buff *buf;
1761 	int res;
1762 
1763 	if (how != SHUT_RDWR)
1764 		return -EINVAL;
1765 
1766 	lock_sock(sk);
1767 
1768 	switch (sock->state) {
1769 	case SS_CONNECTING:
1770 	case SS_CONNECTED:
1771 
1772 restart:
1773 		/* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
1774 		buf = __skb_dequeue(&sk->sk_receive_queue);
1775 		if (buf) {
1776 			if (TIPC_SKB_CB(buf)->handle != NULL) {
1777 				kfree_skb(buf);
1778 				goto restart;
1779 			}
1780 			tipc_port_disconnect(port->ref);
1781 			tipc_reject_msg(buf, TIPC_CONN_SHUTDOWN);
1782 		} else {
1783 			tipc_port_shutdown(port->ref);
1784 		}
1785 
1786 		sock->state = SS_DISCONNECTING;
1787 
1788 		/* fall through */
1789 
1790 	case SS_DISCONNECTING:
1791 
1792 		/* Discard any unreceived messages */
1793 		__skb_queue_purge(&sk->sk_receive_queue);
1794 
1795 		/* Wake up anyone sleeping in poll */
1796 		sk->sk_state_change(sk);
1797 		res = 0;
1798 		break;
1799 
1800 	default:
1801 		res = -ENOTCONN;
1802 	}
1803 
1804 	release_sock(sk);
1805 	return res;
1806 }
1807 
1808 /**
1809  * tipc_setsockopt - set socket option
1810  * @sock: socket structure
1811  * @lvl: option level
1812  * @opt: option identifier
1813  * @ov: pointer to new option value
1814  * @ol: length of option value
1815  *
1816  * For stream sockets only, accepts and ignores all IPPROTO_TCP options
1817  * (to ease compatibility).
1818  *
1819  * Returns 0 on success, errno otherwise
1820  */
1821 static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
1822 			   char __user *ov, unsigned int ol)
1823 {
1824 	struct sock *sk = sock->sk;
1825 	struct tipc_sock *tsk = tipc_sk(sk);
1826 	struct tipc_port *port = &tsk->port;
1827 	u32 value;
1828 	int res;
1829 
1830 	if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
1831 		return 0;
1832 	if (lvl != SOL_TIPC)
1833 		return -ENOPROTOOPT;
1834 	if (ol < sizeof(value))
1835 		return -EINVAL;
1836 	res = get_user(value, (u32 __user *)ov);
1837 	if (res)
1838 		return res;
1839 
1840 	lock_sock(sk);
1841 
1842 	switch (opt) {
1843 	case TIPC_IMPORTANCE:
1844 		tipc_port_set_importance(port, value);
1845 		break;
1846 	case TIPC_SRC_DROPPABLE:
1847 		if (sock->type != SOCK_STREAM)
1848 			tipc_port_set_unreliable(port, value);
1849 		else
1850 			res = -ENOPROTOOPT;
1851 		break;
1852 	case TIPC_DEST_DROPPABLE:
1853 		tipc_port_set_unreturnable(port, value);
1854 		break;
1855 	case TIPC_CONN_TIMEOUT:
1856 		tipc_sk(sk)->conn_timeout = value;
1857 		/* no need to set "res", since already 0 at this point */
1858 		break;
1859 	default:
1860 		res = -EINVAL;
1861 	}
1862 
1863 	release_sock(sk);
1864 
1865 	return res;
1866 }
1867 
1868 /**
1869  * tipc_getsockopt - get socket option
1870  * @sock: socket structure
1871  * @lvl: option level
1872  * @opt: option identifier
1873  * @ov: receptacle for option value
1874  * @ol: receptacle for length of option value
1875  *
1876  * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
1877  * (to ease compatibility).
1878  *
1879  * Returns 0 on success, errno otherwise
1880  */
1881 static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
1882 			   char __user *ov, int __user *ol)
1883 {
1884 	struct sock *sk = sock->sk;
1885 	struct tipc_sock *tsk = tipc_sk(sk);
1886 	struct tipc_port *port = &tsk->port;
1887 	int len;
1888 	u32 value;
1889 	int res;
1890 
1891 	if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
1892 		return put_user(0, ol);
1893 	if (lvl != SOL_TIPC)
1894 		return -ENOPROTOOPT;
1895 	res = get_user(len, ol);
1896 	if (res)
1897 		return res;
1898 
1899 	lock_sock(sk);
1900 
1901 	switch (opt) {
1902 	case TIPC_IMPORTANCE:
1903 		value = tipc_port_importance(port);
1904 		break;
1905 	case TIPC_SRC_DROPPABLE:
1906 		value = tipc_port_unreliable(port);
1907 		break;
1908 	case TIPC_DEST_DROPPABLE:
1909 		value = tipc_port_unreturnable(port);
1910 		break;
1911 	case TIPC_CONN_TIMEOUT:
1912 		value = tipc_sk(sk)->conn_timeout;
1913 		/* no need to set "res", since already 0 at this point */
1914 		break;
1915 	case TIPC_NODE_RECVQ_DEPTH:
1916 		value = 0; /* was tipc_queue_size, now obsolete */
1917 		break;
1918 	case TIPC_SOCK_RECVQ_DEPTH:
1919 		value = skb_queue_len(&sk->sk_receive_queue);
1920 		break;
1921 	default:
1922 		res = -EINVAL;
1923 	}
1924 
1925 	release_sock(sk);
1926 
1927 	if (res)
1928 		return res;	/* "get" failed */
1929 
1930 	if (len < sizeof(value))
1931 		return -EINVAL;
1932 
1933 	if (copy_to_user(ov, &value, sizeof(value)))
1934 		return -EFAULT;
1935 
1936 	return put_user(sizeof(value), ol);
1937 }
1938 
1939 int tipc_ioctl(struct socket *sk, unsigned int cmd, unsigned long arg)
1940 {
1941 	struct tipc_sioc_ln_req lnr;
1942 	void __user *argp = (void __user *)arg;
1943 
1944 	switch (cmd) {
1945 	case SIOCGETLINKNAME:
1946 		if (copy_from_user(&lnr, argp, sizeof(lnr)))
1947 			return -EFAULT;
1948 		if (!tipc_node_get_linkname(lnr.bearer_id, lnr.peer,
1949 					    lnr.linkname, TIPC_MAX_LINK_NAME)) {
1950 			if (copy_to_user(argp, &lnr, sizeof(lnr)))
1951 				return -EFAULT;
1952 			return 0;
1953 		}
1954 		return -EADDRNOTAVAIL;
1955 		break;
1956 	default:
1957 		return -ENOIOCTLCMD;
1958 	}
1959 }
1960 
1961 /* Protocol switches for the various types of TIPC sockets */
1962 
1963 static const struct proto_ops msg_ops = {
1964 	.owner		= THIS_MODULE,
1965 	.family		= AF_TIPC,
1966 	.release	= tipc_release,
1967 	.bind		= tipc_bind,
1968 	.connect	= tipc_connect,
1969 	.socketpair	= sock_no_socketpair,
1970 	.accept		= sock_no_accept,
1971 	.getname	= tipc_getname,
1972 	.poll		= tipc_poll,
1973 	.ioctl		= tipc_ioctl,
1974 	.listen		= sock_no_listen,
1975 	.shutdown	= tipc_shutdown,
1976 	.setsockopt	= tipc_setsockopt,
1977 	.getsockopt	= tipc_getsockopt,
1978 	.sendmsg	= tipc_sendmsg,
1979 	.recvmsg	= tipc_recvmsg,
1980 	.mmap		= sock_no_mmap,
1981 	.sendpage	= sock_no_sendpage
1982 };
1983 
1984 static const struct proto_ops packet_ops = {
1985 	.owner		= THIS_MODULE,
1986 	.family		= AF_TIPC,
1987 	.release	= tipc_release,
1988 	.bind		= tipc_bind,
1989 	.connect	= tipc_connect,
1990 	.socketpair	= sock_no_socketpair,
1991 	.accept		= tipc_accept,
1992 	.getname	= tipc_getname,
1993 	.poll		= tipc_poll,
1994 	.ioctl		= tipc_ioctl,
1995 	.listen		= tipc_listen,
1996 	.shutdown	= tipc_shutdown,
1997 	.setsockopt	= tipc_setsockopt,
1998 	.getsockopt	= tipc_getsockopt,
1999 	.sendmsg	= tipc_send_packet,
2000 	.recvmsg	= tipc_recvmsg,
2001 	.mmap		= sock_no_mmap,
2002 	.sendpage	= sock_no_sendpage
2003 };
2004 
2005 static const struct proto_ops stream_ops = {
2006 	.owner		= THIS_MODULE,
2007 	.family		= AF_TIPC,
2008 	.release	= tipc_release,
2009 	.bind		= tipc_bind,
2010 	.connect	= tipc_connect,
2011 	.socketpair	= sock_no_socketpair,
2012 	.accept		= tipc_accept,
2013 	.getname	= tipc_getname,
2014 	.poll		= tipc_poll,
2015 	.ioctl		= tipc_ioctl,
2016 	.listen		= tipc_listen,
2017 	.shutdown	= tipc_shutdown,
2018 	.setsockopt	= tipc_setsockopt,
2019 	.getsockopt	= tipc_getsockopt,
2020 	.sendmsg	= tipc_send_stream,
2021 	.recvmsg	= tipc_recv_stream,
2022 	.mmap		= sock_no_mmap,
2023 	.sendpage	= sock_no_sendpage
2024 };
2025 
2026 static const struct net_proto_family tipc_family_ops = {
2027 	.owner		= THIS_MODULE,
2028 	.family		= AF_TIPC,
2029 	.create		= tipc_sk_create
2030 };
2031 
2032 static struct proto tipc_proto = {
2033 	.name		= "TIPC",
2034 	.owner		= THIS_MODULE,
2035 	.obj_size	= sizeof(struct tipc_sock),
2036 	.sysctl_rmem	= sysctl_tipc_rmem
2037 };
2038 
2039 static struct proto tipc_proto_kern = {
2040 	.name		= "TIPC",
2041 	.obj_size	= sizeof(struct tipc_sock),
2042 	.sysctl_rmem	= sysctl_tipc_rmem
2043 };
2044 
2045 /**
2046  * tipc_socket_init - initialize TIPC socket interface
2047  *
2048  * Returns 0 on success, errno otherwise
2049  */
2050 int tipc_socket_init(void)
2051 {
2052 	int res;
2053 
2054 	res = proto_register(&tipc_proto, 1);
2055 	if (res) {
2056 		pr_err("Failed to register TIPC protocol type\n");
2057 		goto out;
2058 	}
2059 
2060 	res = sock_register(&tipc_family_ops);
2061 	if (res) {
2062 		pr_err("Failed to register TIPC socket type\n");
2063 		proto_unregister(&tipc_proto);
2064 		goto out;
2065 	}
2066  out:
2067 	return res;
2068 }
2069 
2070 /**
2071  * tipc_socket_stop - stop TIPC socket interface
2072  */
2073 void tipc_socket_stop(void)
2074 {
2075 	sock_unregister(tipc_family_ops.family);
2076 	proto_unregister(&tipc_proto);
2077 }
2078