/*
 * net/tipc/socket.c: TIPC socket API
 *
 * Copyright (c) 2001-2007, 2012-2019, Ericsson AB
 * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
 * Copyright (c) 2020-2021, Red Hat Inc
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the names of the copyright holders nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
 *
 * Alternatively, this software may be distributed under the terms of the
 * GNU General Public License ("GPL") version 2 as published by the Free
 * Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include <linux/rhashtable.h>
#include <linux/sched/signal.h>
#include <trace/events/sock.h>

#include "core.h"
#include "name_table.h"
#include "node.h"
#include "link.h"
#include "name_distr.h"
#include "socket.h"
#include "bcast.h"
#include "netlink.h"
#include "group.h"
#include "trace.h"

#define NAGLE_START_INIT	4
#define NAGLE_START_MAX		1024
#define CONN_TIMEOUT_DEFAULT    8000    /* default connect timeout = 8s */
#define CONN_PROBING_INTV	msecs_to_jiffies(3600000)  /* [ms] => 1 h */
#define TIPC_MAX_PORT		0xffffffff
#define TIPC_MIN_PORT		1
#define TIPC_ACK_RATE		4       /* ACK at 1/4 of rcv window size */

enum {
	TIPC_LISTEN = TCP_LISTEN,
	TIPC_ESTABLISHED = TCP_ESTABLISHED,
	TIPC_OPEN = TCP_CLOSE,
	TIPC_DISCONNECTING = TCP_CLOSE_WAIT,
	TIPC_CONNECTING = TCP_SYN_SENT,
};

struct sockaddr_pair {
	struct sockaddr_tipc sock;
	struct sockaddr_tipc member;
};

/**
 * struct tipc_sock - TIPC socket structure
 * @sk: socket - interacts with 'port' and with user via the socket API
 * @max_pkt: maximum packet size "hint" used when building messages sent by port
 * @maxnagle: maximum size of msg which can be subject to nagle
 * @portid: unique port identity in TIPC socket hash table
 * @phdr: preformatted message header used when sending messages
 * @cong_links: list of congested links
 * @publications: list of publications for port
 * @blocking_link: address of the congested link we are currently sleeping on
 * @pub_count: total # of publications port has made during its lifetime
 * @conn_timeout: the time we can wait for an unresponded setup request
 * @probe_unacked: probe has not received ack yet
 * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
 * @cong_link_cnt: number of congested links
 * @snt_unacked: # messages sent by socket, and not yet acked by peer
 * @snd_win: send window size
 * @peer_caps: peer capabilities mask
 * @rcv_unacked: # messages read by user, but not yet acked back to peer
 * @rcv_win: receive window size
 * @peer: 'connected' peer for dgram/rdm
 * @node: hash table node
 * @mc_method: cookie for use between socket and broadcast layer
 * @rcu: rcu struct for tipc_sock
 * @group: TIPC communications group
 * @oneway: message count in one direction (FIXME)
 * @nagle_start: current nagle value
 * @snd_backlog: send backlog count
 * @msg_acc: messages accepted; used in managing backlog and nagle
 * @pkt_cnt: TIPC socket packet count
 * @expect_ack: whether this TIPC socket is expecting an ack
 * @nodelay: setsockopt() TIPC_NODELAY setting
 * @group_is_open: TIPC socket group is fully open (FIXME)
 * @published: true if port has one or more associated names
 * @conn_addrtype: address type used when establishing connection
 */
struct tipc_sock {
	struct sock sk;
	u32 max_pkt;
	u32 maxnagle;
	u32 portid;
	struct tipc_msg phdr;
	struct list_head cong_links;
	struct list_head publications;
	u32 pub_count;
	atomic_t dupl_rcvcnt;
	u16 conn_timeout;
	bool probe_unacked;
	u16 cong_link_cnt;
	u16 snt_unacked;
	u16 snd_win;
	u16 peer_caps;
	u16 rcv_unacked;
	u16 rcv_win;
	struct sockaddr_tipc peer;
	struct rhash_head node;
	struct tipc_mc_method mc_method;
	struct rcu_head rcu;
	struct tipc_group *group;
	u32 oneway;
	u32 nagle_start;
	u16 snd_backlog;
	u16 msg_acc;
	u16 pkt_cnt;
	bool expect_ack;
	bool nodelay;
	bool group_is_open;
	bool published;
	u8 conn_addrtype;
};

static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb);
static void tipc_data_ready(struct sock *sk);
static void tipc_write_space(struct sock *sk);
static void tipc_sock_destruct(struct sock *sk);
static int tipc_release(struct socket *sock);
static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags,
		       bool kern);
static void tipc_sk_timeout(struct timer_list *t);
static int tipc_sk_publish(struct tipc_sock *tsk, struct tipc_uaddr *ua);
static int tipc_sk_withdraw(struct tipc_sock *tsk, struct tipc_uaddr *ua);
static int tipc_sk_leave(struct tipc_sock *tsk);
static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
static int tipc_sk_insert(struct tipc_sock *tsk);
static void tipc_sk_remove(struct tipc_sock *tsk);
static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz);
static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
static void tipc_sk_push_backlog(struct tipc_sock *tsk, bool nagle_ack);
static int tipc_wait_for_connect(struct socket *sock, long *timeo_p);

static const struct proto_ops packet_ops;
static const struct proto_ops stream_ops;
static const struct proto_ops msg_ops;
static struct proto tipc_proto;
static const struct rhashtable_params tsk_rht_params;

static u32 tsk_own_node(struct tipc_sock *tsk)
{
	return msg_prevnode(&tsk->phdr);
}

static u32 tsk_peer_node(struct tipc_sock *tsk)
{
	return msg_destnode(&tsk->phdr);
}

static u32 tsk_peer_port(struct tipc_sock *tsk)
{
	return msg_destport(&tsk->phdr);
}

static bool tsk_unreliable(struct tipc_sock *tsk)
{
	return msg_src_droppable(&tsk->phdr) != 0;
}

static void tsk_set_unreliable(struct tipc_sock *tsk, bool unreliable)
{
	msg_set_src_droppable(&tsk->phdr, unreliable ? 1 : 0);
}

static bool tsk_unreturnable(struct tipc_sock *tsk)
{
	return msg_dest_droppable(&tsk->phdr) != 0;
}

static void tsk_set_unreturnable(struct tipc_sock *tsk, bool unreturnable)
{
	msg_set_dest_droppable(&tsk->phdr, unreturnable ? 1 : 0);
}

static int tsk_importance(struct tipc_sock *tsk)
{
	return msg_importance(&tsk->phdr);
}

static struct tipc_sock *tipc_sk(const struct sock *sk)
{
	return container_of(sk, struct tipc_sock, sk);
}

int tsk_set_importance(struct sock *sk, int imp)
{
	if (imp > TIPC_CRITICAL_IMPORTANCE)
		return -EINVAL;
	msg_set_importance(&tipc_sk(sk)->phdr, (u32)imp);
	return 0;
}
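
/* Illustrative userspace counterpart (an assumption, not taken from this
 * file): tsk_set_importance() is typically reached via
 *
 *	int imp = TIPC_HIGH_IMPORTANCE;
 *	setsockopt(sd, SOL_TIPC, TIPC_IMPORTANCE, &imp, sizeof(imp));
 *
 * Values above TIPC_CRITICAL_IMPORTANCE are rejected with -EINVAL.
 */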

static bool tsk_conn_cong(struct tipc_sock *tsk)
{
	return tsk->snt_unacked > tsk->snd_win;
}

static u16 tsk_blocks(int len)
{
	return ((len / FLOWCTL_BLK_SZ) + 1);
}

/* tsk_adv_blocks(): translate a buffer size in bytes to number of
 * advertisable blocks, taking into account the ratio truesize(len)/len
 * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ
 */
static u16 tsk_adv_blocks(int len)
{
	return len / FLOWCTL_BLK_SZ / 4;
}
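
/* Worked example (illustrative; assumes FLOWCTL_BLK_SZ == 1024): a 64 KB
 * buffer advertises 65536 / 1024 / 4 = 16 blocks, i.e. the divide-by-4
 * absorbs the worst-case truesize(len)/len overhead noted above.
 */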

/* tsk_inc(): increment counter for sent or received data
 * - If block based flow control is not supported by peer we
 *   fall back to message based ditto, incrementing the counter
 */
static u16 tsk_inc(struct tipc_sock *tsk, int msglen)
{
	if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
		return ((msglen / FLOWCTL_BLK_SZ) + 1);
	return 1;
}
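
/* Illustration (not from the original source): with block based flow
 * control a 3000 byte message counts as 3000 / FLOWCTL_BLK_SZ + 1 = 3
 * blocks (assuming FLOWCTL_BLK_SZ == 1024), while a legacy peer without
 * TIPC_BLOCK_FLOWCTL consumes exactly one unit per message, whatever
 * its size.
 */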

/* tsk_set_nagle - enable/disable nagle property by manipulating maxnagle
 */
static void tsk_set_nagle(struct tipc_sock *tsk)
{
	struct sock *sk = &tsk->sk;

	tsk->maxnagle = 0;
	if (sk->sk_type != SOCK_STREAM)
		return;
	if (tsk->nodelay)
		return;
	if (!(tsk->peer_caps & TIPC_NAGLE))
		return;
	/* Limit node local buffer size to avoid receive queue overflow */
	if (tsk->max_pkt == MAX_MSG_SIZE)
		tsk->maxnagle = 1500;
	else
		tsk->maxnagle = tsk->max_pkt;
}

/**
 * tsk_advance_rx_queue - discard first buffer in socket receive queue
 * @sk: network socket
 *
 * Caller must hold socket lock
 */
static void tsk_advance_rx_queue(struct sock *sk)
{
	trace_tipc_sk_advance_rx(sk, NULL, TIPC_DUMP_SK_RCVQ, " ");
	kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
}

/* tipc_sk_respond() : send response message back to sender
 */
static void tipc_sk_respond(struct sock *sk, struct sk_buff *skb, int err)
{
	u32 selector;
	u32 dnode;
	u32 onode = tipc_own_addr(sock_net(sk));

	if (!tipc_msg_reverse(onode, &skb, err))
		return;

	trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_NONE, "@sk_respond!");
	dnode = msg_destnode(buf_msg(skb));
	selector = msg_origport(buf_msg(skb));
	tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
}

/**
 * tsk_rej_rx_queue - reject all buffers in socket receive queue
 * @sk: network socket
 * @error: response error code
 *
 * Caller must hold socket lock
 */
static void tsk_rej_rx_queue(struct sock *sk, int error)
{
	struct sk_buff *skb;

	while ((skb = __skb_dequeue(&sk->sk_receive_queue)))
		tipc_sk_respond(sk, skb, error);
}

static bool tipc_sk_connected(const struct sock *sk)
{
	return READ_ONCE(sk->sk_state) == TIPC_ESTABLISHED;
}

/* tipc_sk_type_connectionless - check if the socket is datagram socket
 * @sk: socket
 *
 * Returns true if connection less, false otherwise
 */
static bool tipc_sk_type_connectionless(struct sock *sk)
{
	return sk->sk_type == SOCK_RDM || sk->sk_type == SOCK_DGRAM;
}

/* tsk_peer_msg - verify if message was sent by connected port's peer
 *
 * Handles cases where the node's network address has changed from
 * the default of <0.0.0> to its configured setting.
 */
static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg)
{
	struct sock *sk = &tsk->sk;
	u32 self = tipc_own_addr(sock_net(sk));
	u32 peer_port = tsk_peer_port(tsk);
	u32 orig_node, peer_node;

	if (unlikely(!tipc_sk_connected(sk)))
		return false;

	if (unlikely(msg_origport(msg) != peer_port))
		return false;

	orig_node = msg_orignode(msg);
	peer_node = tsk_peer_node(tsk);

	if (likely(orig_node == peer_node))
		return true;

	if (!orig_node && peer_node == self)
		return true;

	if (!peer_node && orig_node == self)
		return true;

	return false;
}

/* tipc_set_sk_state - set the sk_state of the socket
 * @sk: socket
 *
 * Caller must hold socket lock
 *
 * Returns 0 on success, errno otherwise
 */
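/* Summary of the transitions accepted below (any state may fall back to
 * TIPC_OPEN unconditionally):
 *
 *	OPEN        -> LISTEN | CONNECTING | ESTABLISHED
 *	CONNECTING  -> ESTABLISHED | DISCONNECTING
 *	ESTABLISHED -> DISCONNECTING
 */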
static int tipc_set_sk_state(struct sock *sk, int state)
{
	int oldsk_state = sk->sk_state;
	int res = -EINVAL;

	switch (state) {
	case TIPC_OPEN:
		res = 0;
		break;
	case TIPC_LISTEN:
	case TIPC_CONNECTING:
		if (oldsk_state == TIPC_OPEN)
			res = 0;
		break;
	case TIPC_ESTABLISHED:
		if (oldsk_state == TIPC_CONNECTING ||
		    oldsk_state == TIPC_OPEN)
			res = 0;
		break;
	case TIPC_DISCONNECTING:
		if (oldsk_state == TIPC_CONNECTING ||
		    oldsk_state == TIPC_ESTABLISHED)
			res = 0;
		break;
	}

	if (!res)
		sk->sk_state = state;

	return res;
}

static int tipc_sk_sock_err(struct socket *sock, long *timeout)
{
	struct sock *sk = sock->sk;
	int err = sock_error(sk);
	int typ = sock->type;

	if (err)
		return err;
	if (typ == SOCK_STREAM || typ == SOCK_SEQPACKET) {
		if (sk->sk_state == TIPC_DISCONNECTING)
			return -EPIPE;
		else if (!tipc_sk_connected(sk))
			return -ENOTCONN;
	}
	if (!*timeout)
		return -EAGAIN;
	if (signal_pending(current))
		return sock_intr_errno(*timeout);

	return 0;
}

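/* tipc_wait_for_cond - sleep, with the socket lock dropped, until either
 * condition_ holds, the timeout expires, a signal arrives, or a socket
 * error occurs. Evaluates to 0 once condition_ is true, errno otherwise.
 */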
#define tipc_wait_for_cond(sock_, timeo_, condition_)			       \
({                                                                             \
	DEFINE_WAIT_FUNC(wait_, woken_wake_function);                          \
	struct sock *sk_;						       \
	int rc_;							       \
									       \
	while ((rc_ = !(condition_))) {					       \
		/* coupled with smp_wmb() in tipc_sk_proto_rcv() */            \
		smp_rmb();                                                     \
		sk_ = (sock_)->sk;					       \
		rc_ = tipc_sk_sock_err((sock_), timeo_);		       \
		if (rc_)						       \
			break;						       \
		add_wait_queue(sk_sleep(sk_), &wait_);                         \
		release_sock(sk_);					       \
		*(timeo_) = wait_woken(&wait_, TASK_INTERRUPTIBLE, *(timeo_)); \
		sched_annotate_sleep();				               \
		lock_sock(sk_);						       \
		remove_wait_queue(sk_sleep(sk_), &wait_);		       \
	}								       \
	rc_;								       \
})

/**
 * tipc_sk_create - create a TIPC socket
 * @net: network namespace (must be default network)
 * @sock: pre-allocated socket structure
 * @protocol: protocol indicator (must be 0)
 * @kern: caused by kernel or by userspace?
 *
 * This routine creates additional data structures used by the TIPC socket,
 * initializes them, and links them together.
 *
 * Return: 0 on success, errno otherwise
 */
static int tipc_sk_create(struct net *net, struct socket *sock,
			  int protocol, int kern)
{
	const struct proto_ops *ops;
	struct sock *sk;
	struct tipc_sock *tsk;
	struct tipc_msg *msg;

	/* Validate arguments */
	if (unlikely(protocol != 0))
		return -EPROTONOSUPPORT;

	switch (sock->type) {
	case SOCK_STREAM:
		ops = &stream_ops;
		break;
	case SOCK_SEQPACKET:
		ops = &packet_ops;
		break;
	case SOCK_DGRAM:
	case SOCK_RDM:
		ops = &msg_ops;
		break;
	default:
		return -EPROTOTYPE;
	}

	/* Allocate socket's protocol area */
	sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto, kern);
	if (sk == NULL)
		return -ENOMEM;

	tsk = tipc_sk(sk);
	tsk->max_pkt = MAX_PKT_DEFAULT;
	tsk->maxnagle = 0;
	tsk->nagle_start = NAGLE_START_INIT;
	INIT_LIST_HEAD(&tsk->publications);
	INIT_LIST_HEAD(&tsk->cong_links);
	msg = &tsk->phdr;

	/* Finish initializing socket data structures */
	sock->ops = ops;
	sock_init_data(sock, sk);
	tipc_set_sk_state(sk, TIPC_OPEN);
	if (tipc_sk_insert(tsk)) {
		sk_free(sk);
		pr_warn("Socket create failed; port number exhausted\n");
		return -EINVAL;
	}

	/* Ensure tsk is visible before we read own_addr. */
	smp_mb();

	tipc_msg_init(tipc_own_addr(net), msg, TIPC_LOW_IMPORTANCE,
		      TIPC_NAMED_MSG, NAMED_H_SIZE, 0);

	msg_set_origport(msg, tsk->portid);
	timer_setup(&sk->sk_timer, tipc_sk_timeout, 0);
	sk->sk_shutdown = 0;
	sk->sk_backlog_rcv = tipc_sk_backlog_rcv;
	sk->sk_rcvbuf = READ_ONCE(sysctl_tipc_rmem[1]);
	sk->sk_data_ready = tipc_data_ready;
	sk->sk_write_space = tipc_write_space;
	sk->sk_destruct = tipc_sock_destruct;
	tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
	tsk->group_is_open = true;
	atomic_set(&tsk->dupl_rcvcnt, 0);

	/* Start out with safe limits until we receive an advertised window */
	tsk->snd_win = tsk_adv_blocks(RCVBUF_MIN);
	tsk->rcv_win = tsk->snd_win;

	if (tipc_sk_type_connectionless(sk)) {
		tsk_set_unreturnable(tsk, true);
		if (sock->type == SOCK_DGRAM)
			tsk_set_unreliable(tsk, true);
	}
	__skb_queue_head_init(&tsk->mc_method.deferredq);
	trace_tipc_sk_create(sk, NULL, TIPC_DUMP_NONE, " ");
	return 0;
}
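
/* Userspace sketch (illustrative, not part of this file): the type switch
 * above maps socket types to proto_ops tables, e.g.
 *
 *	int sd = socket(AF_TIPC, SOCK_RDM, 0);     <- msg_ops
 *	int sd = socket(AF_TIPC, SOCK_STREAM, 0);  <- stream_ops
 *
 * A nonzero protocol argument fails with EPROTONOSUPPORT.
 */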

static void tipc_sk_callback(struct rcu_head *head)
{
	struct tipc_sock *tsk = container_of(head, struct tipc_sock, rcu);

	sock_put(&tsk->sk);
}

/* Caller should hold socket lock for the socket. */
static void __tipc_shutdown(struct socket *sock, int error)
{
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk = tipc_sk(sk);
	struct net *net = sock_net(sk);
	long timeout = msecs_to_jiffies(CONN_TIMEOUT_DEFAULT);
	u32 dnode = tsk_peer_node(tsk);
	struct sk_buff *skb;

	/* Avoid that hi-prio shutdown msgs bypass msgs in link wakeup queue */
	tipc_wait_for_cond(sock, &timeout, (!tsk->cong_link_cnt &&
					    !tsk_conn_cong(tsk)));

	/* Push out delayed messages if in Nagle mode */
	tipc_sk_push_backlog(tsk, false);
	/* Remove pending SYN */
	__skb_queue_purge(&sk->sk_write_queue);

	/* Remove partially received buffer if any */
	skb = skb_peek(&sk->sk_receive_queue);
	if (skb && TIPC_SKB_CB(skb)->bytes_read) {
		__skb_unlink(skb, &sk->sk_receive_queue);
		kfree_skb(skb);
	}

	/* Reject all unreceived messages if connectionless */
	if (tipc_sk_type_connectionless(sk)) {
		tsk_rej_rx_queue(sk, error);
		return;
	}

	switch (sk->sk_state) {
	case TIPC_CONNECTING:
	case TIPC_ESTABLISHED:
		tipc_set_sk_state(sk, TIPC_DISCONNECTING);
		tipc_node_remove_conn(net, dnode, tsk->portid);
		/* Send a FIN+/- to its peer */
		skb = __skb_dequeue(&sk->sk_receive_queue);
		if (skb) {
			__skb_queue_purge(&sk->sk_receive_queue);
			tipc_sk_respond(sk, skb, error);
			break;
		}
		skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
				      TIPC_CONN_MSG, SHORT_H_SIZE, 0, dnode,
				      tsk_own_node(tsk), tsk_peer_port(tsk),
				      tsk->portid, error);
		if (skb)
			tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
		break;
	case TIPC_LISTEN:
		/* Reject all SYN messages */
		tsk_rej_rx_queue(sk, error);
		break;
	default:
		__skb_queue_purge(&sk->sk_receive_queue);
		break;
	}
}

/**
 * tipc_release - destroy a TIPC socket
 * @sock: socket to destroy
 *
 * This routine cleans up any messages that are still queued on the socket.
 * For DGRAM and RDM socket types, all queued messages are rejected.
 * For SEQPACKET and STREAM socket types, the first message is rejected
 * and any others are discarded.  (If the first message on a STREAM socket
 * is partially-read, it is discarded and the next one is rejected instead.)
 *
 * NOTE: Rejected messages are not necessarily returned to the sender!  They
 * are returned or discarded according to the "destination droppable" setting
 * specified for the message by the sender.
 *
 * Return: 0 on success, errno otherwise
 */
static int tipc_release(struct socket *sock)
{
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk;

	/*
	 * Exit if socket isn't fully initialized (occurs when a failed accept()
	 * releases a pre-allocated child socket that was never used)
	 */
	if (sk == NULL)
		return 0;

	tsk = tipc_sk(sk);
	lock_sock(sk);

	trace_tipc_sk_release(sk, NULL, TIPC_DUMP_ALL, " ");
	__tipc_shutdown(sock, TIPC_ERR_NO_PORT);
	sk->sk_shutdown = SHUTDOWN_MASK;
	tipc_sk_leave(tsk);
	tipc_sk_withdraw(tsk, NULL);
	__skb_queue_purge(&tsk->mc_method.deferredq);
	sk_stop_timer(sk, &sk->sk_timer);
	tipc_sk_remove(tsk);

	sock_orphan(sk);
	/* Reject any messages that accumulated in backlog queue */
	release_sock(sk);
	tipc_dest_list_purge(&tsk->cong_links);
	tsk->cong_link_cnt = 0;
	call_rcu(&tsk->rcu, tipc_sk_callback);
	sock->sk = NULL;

	return 0;
}

/**
 * __tipc_bind - associate or disassociate TIPC name(s) with a socket
 * @sock: socket structure
 * @skaddr: socket address describing name(s) and desired operation
 * @alen: size of socket address data structure
 *
 * Name and name sequence binding are indicated using a positive scope value;
 * a negative scope value unbinds the specified name.  Specifying no name
 * (i.e. a socket address length of 0) unbinds all names from the socket.
 *
 * Return: 0 on success, errno otherwise
 *
 * NOTE: This routine doesn't need to take the socket lock since it doesn't
 *       access any non-constant socket information.
 */
static int __tipc_bind(struct socket *sock, struct sockaddr *skaddr, int alen)
{
	struct tipc_uaddr *ua = (struct tipc_uaddr *)skaddr;
	struct tipc_sock *tsk = tipc_sk(sock->sk);
	bool unbind = false;

	if (unlikely(!alen))
		return tipc_sk_withdraw(tsk, NULL);

	if (ua->addrtype == TIPC_SERVICE_ADDR) {
		ua->addrtype = TIPC_SERVICE_RANGE;
		ua->sr.upper = ua->sr.lower;
	}
	if (ua->scope < 0) {
		unbind = true;
		ua->scope = -ua->scope;
	}
	/* Users may still use deprecated TIPC_ZONE_SCOPE */
	if (ua->scope != TIPC_NODE_SCOPE)
		ua->scope = TIPC_CLUSTER_SCOPE;

	if (tsk->group)
		return -EACCES;

	if (unbind)
		return tipc_sk_withdraw(tsk, ua);
	return tipc_sk_publish(tsk, ua);
}

int tipc_sk_bind(struct socket *sock, struct sockaddr *skaddr, int alen)
{
	int res;

	lock_sock(sock->sk);
	res = __tipc_bind(sock, skaddr, alen);
	release_sock(sock->sk);
	return res;
}

static int tipc_bind(struct socket *sock, struct sockaddr *skaddr, int alen)
{
	struct tipc_uaddr *ua = (struct tipc_uaddr *)skaddr;
	u32 atype = ua->addrtype;

	if (alen) {
		if (!tipc_uaddr_valid(ua, alen))
			return -EINVAL;
		if (atype == TIPC_SOCKET_ADDR)
			return -EAFNOSUPPORT;
		if (ua->sr.type < TIPC_RESERVED_TYPES) {
			pr_warn_once("Can't bind to reserved service type %u\n",
				     ua->sr.type);
			return -EACCES;
		}
	}
	return tipc_sk_bind(sock, skaddr, alen);
}
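
/* Illustrative userspace bind (an assumption; service type 1000 is an
 * arbitrary unreserved example):
 *
 *	struct sockaddr_tipc a = {
 *		.family = AF_TIPC,
 *		.addrtype = TIPC_SERVICE_RANGE,
 *		.scope = TIPC_CLUSTER_SCOPE,
 *		.addr.nameseq = { .type = 1000, .lower = 0, .upper = 99 },
 *	};
 *	bind(sd, (struct sockaddr *)&a, sizeof(a));
 *
 * Reserved types (< TIPC_RESERVED_TYPES) and TIPC_SOCKET_ADDR binds are
 * rejected above.
 */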

/**
 * tipc_getname - get port ID of socket or peer socket
 * @sock: socket structure
 * @uaddr: area for returned socket address
 * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
 *
 * Return: 0 on success, errno otherwise
 *
 * NOTE: This routine doesn't need to take the socket lock since it only
 *       accesses socket information that is unchanging (or which changes in
 *       a completely predictable manner).
 */
static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
			int peer)
{
	struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk = tipc_sk(sk);

	memset(addr, 0, sizeof(*addr));
	if (peer) {
		if ((!tipc_sk_connected(sk)) &&
		    ((peer != 2) || (sk->sk_state != TIPC_DISCONNECTING)))
			return -ENOTCONN;
		addr->addr.id.ref = tsk_peer_port(tsk);
		addr->addr.id.node = tsk_peer_node(tsk);
	} else {
		addr->addr.id.ref = tsk->portid;
		addr->addr.id.node = tipc_own_addr(sock_net(sk));
	}

	addr->addrtype = TIPC_SOCKET_ADDR;
	addr->family = AF_TIPC;
	addr->scope = 0;
	addr->addr.name.domain = 0;

	return sizeof(*addr);
}

/**
 * tipc_poll - read and possibly block on pollmask
 * @file: file structure associated with the socket
 * @sock: socket for which to calculate the poll bits
 * @wait: ???
 *
 * Return: pollmask value
 *
 * COMMENTARY:
 * It appears that the usual socket locking mechanisms are not useful here
 * since the pollmask info is potentially out-of-date the moment this routine
 * exits.  TCP and other protocols seem to rely on higher level poll routines
 * to handle any preventable race conditions, so TIPC will do the same ...
 *
 * IMPORTANT: The fact that a read or write operation is indicated does NOT
 * imply that the operation will succeed, merely that it should be performed
 * and will not block.
 */
static __poll_t tipc_poll(struct file *file, struct socket *sock,
			      poll_table *wait)
{
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk = tipc_sk(sk);
	__poll_t revents = 0;

	sock_poll_wait(file, sock, wait);
	trace_tipc_sk_poll(sk, NULL, TIPC_DUMP_ALL, " ");

	if (sk->sk_shutdown & RCV_SHUTDOWN)
		revents |= EPOLLRDHUP | EPOLLIN | EPOLLRDNORM;
	if (sk->sk_shutdown == SHUTDOWN_MASK)
		revents |= EPOLLHUP;

	switch (sk->sk_state) {
	case TIPC_ESTABLISHED:
		if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk))
			revents |= EPOLLOUT;
		fallthrough;
	case TIPC_LISTEN:
	case TIPC_CONNECTING:
		if (!skb_queue_empty_lockless(&sk->sk_receive_queue))
			revents |= EPOLLIN | EPOLLRDNORM;
		break;
	case TIPC_OPEN:
		if (tsk->group_is_open && !tsk->cong_link_cnt)
			revents |= EPOLLOUT;
		if (!tipc_sk_type_connectionless(sk))
			break;
		if (skb_queue_empty_lockless(&sk->sk_receive_queue))
			break;
		revents |= EPOLLIN | EPOLLRDNORM;
		break;
	case TIPC_DISCONNECTING:
		revents = EPOLLIN | EPOLLRDNORM | EPOLLHUP;
		break;
	}
	return revents;
}

/**
 * tipc_sendmcast - send multicast message
 * @sock: socket structure
 * @ua: destination address struct
 * @msg: message to send
 * @dlen: length of data to send
 * @timeout: timeout to wait for wakeup
 *
 * Called from function tipc_sendmsg(), which has done all sanity checks
 * Return: the number of bytes sent on success, or errno
 */
static int tipc_sendmcast(struct socket *sock, struct tipc_uaddr *ua,
			  struct msghdr *msg, size_t dlen, long timeout)
{
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk = tipc_sk(sk);
	struct tipc_msg *hdr = &tsk->phdr;
	struct net *net = sock_net(sk);
	int mtu = tipc_bcast_get_mtu(net);
	struct sk_buff_head pkts;
	struct tipc_nlist dsts;
	int rc;

	if (tsk->group)
		return -EACCES;

	/* Block or return if any destination link is congested */
	rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
	if (unlikely(rc))
		return rc;

	/* Lookup destination nodes */
	tipc_nlist_init(&dsts, tipc_own_addr(net));
	tipc_nametbl_lookup_mcast_nodes(net, ua, &dsts);
	if (!dsts.local && !dsts.remote)
		return -EHOSTUNREACH;

	/* Build message header */
	msg_set_type(hdr, TIPC_MCAST_MSG);
	msg_set_hdr_sz(hdr, MCAST_H_SIZE);
	msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
	msg_set_destport(hdr, 0);
	msg_set_destnode(hdr, 0);
	msg_set_nametype(hdr, ua->sr.type);
	msg_set_namelower(hdr, ua->sr.lower);
	msg_set_nameupper(hdr, ua->sr.upper);

	/* Build message as chain of buffers */
	__skb_queue_head_init(&pkts);
	rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);

	/* Send message if build was successful */
	if (unlikely(rc == dlen)) {
		trace_tipc_sk_sendmcast(sk, skb_peek(&pkts),
					TIPC_DUMP_SK_SNDQ, " ");
		rc = tipc_mcast_xmit(net, &pkts, &tsk->mc_method, &dsts,
				     &tsk->cong_link_cnt);
	}

	tipc_nlist_purge(&dsts);

	return rc ? rc : dlen;
}

/**
 * tipc_send_group_msg - send a message to a member in the group
 * @net: network namespace
 * @tsk: tipc socket
 * @m: message to send
 * @mb: group member
 * @dnode: destination node
 * @dport: destination port
 * @dlen: total length of message data
 */
static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk,
			       struct msghdr *m, struct tipc_member *mb,
			       u32 dnode, u32 dport, int dlen)
{
	u16 bc_snd_nxt = tipc_group_bc_snd_nxt(tsk->group);
	struct tipc_mc_method *method = &tsk->mc_method;
	int blks = tsk_blocks(GROUP_H_SIZE + dlen);
	struct tipc_msg *hdr = &tsk->phdr;
	struct sk_buff_head pkts;
	int mtu, rc;

	/* Complete message header */
	msg_set_type(hdr, TIPC_GRP_UCAST_MSG);
	msg_set_hdr_sz(hdr, GROUP_H_SIZE);
	msg_set_destport(hdr, dport);
	msg_set_destnode(hdr, dnode);
	msg_set_grp_bc_seqno(hdr, bc_snd_nxt);

	/* Build message as chain of buffers */
	__skb_queue_head_init(&pkts);
	mtu = tipc_node_get_mtu(net, dnode, tsk->portid, false);
	rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
	if (unlikely(rc != dlen))
		return rc;

	/* Send message */
	rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
	if (unlikely(rc == -ELINKCONG)) {
		tipc_dest_push(&tsk->cong_links, dnode, 0);
		tsk->cong_link_cnt++;
	}

	/* Update send window */
	tipc_group_update_member(mb, blks);

	/* A broadcast sent within next EXPIRE period must follow same path */
	method->rcast = true;
	method->mandatory = true;
	return dlen;
}

/**
 * tipc_send_group_unicast - send message to a member in the group
 * @sock: socket structure
 * @m: message to send
 * @dlen: total length of message data
 * @timeout: timeout to wait for wakeup
 *
 * Called from function tipc_sendmsg(), which has done all sanity checks
 * Return: the number of bytes sent on success, or errno
 */
static int tipc_send_group_unicast(struct socket *sock, struct msghdr *m,
				   int dlen, long timeout)
{
	struct sock *sk = sock->sk;
	struct tipc_uaddr *ua = (struct tipc_uaddr *)m->msg_name;
	int blks = tsk_blocks(GROUP_H_SIZE + dlen);
	struct tipc_sock *tsk = tipc_sk(sk);
	struct net *net = sock_net(sk);
	struct tipc_member *mb = NULL;
	u32 node, port;
	int rc;

	node = ua->sk.node;
	port = ua->sk.ref;
	if (!port && !node)
		return -EHOSTUNREACH;

	/* Block or return if destination link or member is congested */
	rc = tipc_wait_for_cond(sock, &timeout,
				!tipc_dest_find(&tsk->cong_links, node, 0) &&
				tsk->group &&
				!tipc_group_cong(tsk->group, node, port, blks,
						 &mb));
	if (unlikely(rc))
		return rc;

	if (unlikely(!mb))
		return -EHOSTUNREACH;

	rc = tipc_send_group_msg(net, tsk, m, mb, node, port, dlen);

	return rc ? rc : dlen;
}

/**
 * tipc_send_group_anycast - send message to any member with given identity
 * @sock: socket structure
 * @m: message to send
 * @dlen: total length of message data
 * @timeout: timeout to wait for wakeup
 *
 * Called from function tipc_sendmsg(), which has done all sanity checks
 * Return: the number of bytes sent on success, or errno
 */
static int tipc_send_group_anycast(struct socket *sock, struct msghdr *m,
				   int dlen, long timeout)
{
	struct tipc_uaddr *ua = (struct tipc_uaddr *)m->msg_name;
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk = tipc_sk(sk);
	struct list_head *cong_links = &tsk->cong_links;
	int blks = tsk_blocks(GROUP_H_SIZE + dlen);
	struct tipc_msg *hdr = &tsk->phdr;
	struct tipc_member *first = NULL;
	struct tipc_member *mbr = NULL;
	struct net *net = sock_net(sk);
	u32 node, port, exclude;
	struct list_head dsts;
	int lookups = 0;
	int dstcnt, rc;
	bool cong;

	INIT_LIST_HEAD(&dsts);
	ua->sa.type = msg_nametype(hdr);
	ua->scope = msg_lookup_scope(hdr);

	while (++lookups < 4) {
		exclude = tipc_group_exclude(tsk->group);

		first = NULL;

		/* Look for a non-congested destination member, if any */
		while (1) {
			if (!tipc_nametbl_lookup_group(net, ua, &dsts, &dstcnt,
						       exclude, false))
				return -EHOSTUNREACH;
			tipc_dest_pop(&dsts, &node, &port);
			cong = tipc_group_cong(tsk->group, node, port, blks,
					       &mbr);
			if (!cong)
				break;
			if (mbr == first)
				break;
			if (!first)
				first = mbr;
		}

		/* Start over if destination was not in member list */
		if (unlikely(!mbr))
			continue;

		if (likely(!cong && !tipc_dest_find(cong_links, node, 0)))
			break;

		/* Block or return if destination link or member is congested */
		rc = tipc_wait_for_cond(sock, &timeout,
					!tipc_dest_find(cong_links, node, 0) &&
					tsk->group &&
					!tipc_group_cong(tsk->group, node, port,
							 blks, &mbr));
		if (unlikely(rc))
			return rc;

		/* Send, unless destination disappeared while waiting */
		if (likely(mbr))
			break;
	}

	if (unlikely(lookups >= 4))
		return -EHOSTUNREACH;

	rc = tipc_send_group_msg(net, tsk, m, mbr, node, port, dlen);

	return rc ? rc : dlen;
}

/**
 * tipc_send_group_bcast - send message to all members in communication group
 * @sock: socket structure
 * @m: message to send
 * @dlen: total length of message data
 * @timeout: timeout to wait for wakeup
 *
 * Called from function tipc_sendmsg(), which has done all sanity checks
 * Return: the number of bytes sent on success, or errno
 */
static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
				 int dlen, long timeout)
{
	struct tipc_uaddr *ua = (struct tipc_uaddr *)m->msg_name;
	struct sock *sk = sock->sk;
	struct net *net = sock_net(sk);
	struct tipc_sock *tsk = tipc_sk(sk);
	struct tipc_nlist *dsts;
	struct tipc_mc_method *method = &tsk->mc_method;
	bool ack = method->mandatory && method->rcast;
	int blks = tsk_blocks(MCAST_H_SIZE + dlen);
	struct tipc_msg *hdr = &tsk->phdr;
	int mtu = tipc_bcast_get_mtu(net);
	struct sk_buff_head pkts;
	int rc = -EHOSTUNREACH;

	/* Block or return if any destination link or member is congested */
	rc = tipc_wait_for_cond(sock, &timeout,
				!tsk->cong_link_cnt && tsk->group &&
				!tipc_group_bc_cong(tsk->group, blks));
	if (unlikely(rc))
		return rc;

	dsts = tipc_group_dests(tsk->group);
	if (!dsts->local && !dsts->remote)
		return -EHOSTUNREACH;

	/* Complete message header */
	if (ua) {
		msg_set_type(hdr, TIPC_GRP_MCAST_MSG);
		msg_set_nameinst(hdr, ua->sa.instance);
	} else {
		msg_set_type(hdr, TIPC_GRP_BCAST_MSG);
		msg_set_nameinst(hdr, 0);
	}
	msg_set_hdr_sz(hdr, GROUP_H_SIZE);
	msg_set_destport(hdr, 0);
	msg_set_destnode(hdr, 0);
	msg_set_grp_bc_seqno(hdr, tipc_group_bc_snd_nxt(tsk->group));

	/* Avoid getting stuck with repeated forced replicasts */
	msg_set_grp_bc_ack_req(hdr, ack);

	/* Build message as chain of buffers */
	__skb_queue_head_init(&pkts);
	rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
	if (unlikely(rc != dlen))
		return rc;

	/* Send message */
	rc = tipc_mcast_xmit(net, &pkts, method, dsts, &tsk->cong_link_cnt);
	if (unlikely(rc))
		return rc;

	/* Update broadcast sequence number and send windows */
	tipc_group_update_bc_members(tsk->group, blks, ack);

	/* Broadcast link is now free to choose method for next broadcast */
	method->mandatory = false;
	method->expires = jiffies;

	return dlen;
}

/**
 * tipc_send_group_mcast - send message to all members with given identity
 * @sock: socket structure
 * @m: message to send
 * @dlen: total length of message data
 * @timeout: timeout to wait for wakeup
 *
 * Called from function tipc_sendmsg(), which has done all sanity checks
 * Return: the number of bytes sent on success, or errno
 */
static int tipc_send_group_mcast(struct socket *sock, struct msghdr *m,
				 int dlen, long timeout)
{
	struct tipc_uaddr *ua = (struct tipc_uaddr *)m->msg_name;
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk = tipc_sk(sk);
	struct tipc_group *grp = tsk->group;
	struct tipc_msg *hdr = &tsk->phdr;
	struct net *net = sock_net(sk);
	struct list_head dsts;
	u32 dstcnt, exclude;

	INIT_LIST_HEAD(&dsts);
	ua->sa.type = msg_nametype(hdr);
	ua->scope = msg_lookup_scope(hdr);
	exclude = tipc_group_exclude(grp);

	if (!tipc_nametbl_lookup_group(net, ua, &dsts, &dstcnt, exclude, true))
		return -EHOSTUNREACH;

	if (dstcnt == 1) {
		tipc_dest_pop(&dsts, &ua->sk.node, &ua->sk.ref);
		return tipc_send_group_unicast(sock, m, dlen, timeout);
	}

	tipc_dest_list_purge(&dsts);
	return tipc_send_group_bcast(sock, m, dlen, timeout);
}

/**
 * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
 * @net: the associated network namespace
 * @arrvq: queue with arriving messages, to be cloned after destination lookup
 * @inputq: queue with cloned messages, delivered to socket after dest lookup
 *
 * Multi-threaded: parallel calls with reference to same queues may occur
 */
void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
		       struct sk_buff_head *inputq)
{
	u32 self = tipc_own_addr(net);
	struct sk_buff *skb, *_skb;
	u32 portid, onode;
	struct sk_buff_head tmpq;
	struct list_head dports;
	struct tipc_msg *hdr;
	struct tipc_uaddr ua;
	int user, mtyp, hlen;

	__skb_queue_head_init(&tmpq);
	INIT_LIST_HEAD(&dports);
	ua.addrtype = TIPC_SERVICE_RANGE;

	/* tipc_skb_peek() increments the head skb's reference counter */
	skb = tipc_skb_peek(arrvq, &inputq->lock);
	for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
		hdr = buf_msg(skb);
		user = msg_user(hdr);
		mtyp = msg_type(hdr);
		hlen = skb_headroom(skb) + msg_hdr_sz(hdr);
		onode = msg_orignode(hdr);
		ua.sr.type = msg_nametype(hdr);
		ua.sr.lower = msg_namelower(hdr);
		ua.sr.upper = msg_nameupper(hdr);
		if (onode == self)
			ua.scope = TIPC_ANY_SCOPE;
		else
			ua.scope = TIPC_CLUSTER_SCOPE;

		if (mtyp == TIPC_GRP_UCAST_MSG || user == GROUP_PROTOCOL) {
			spin_lock_bh(&inputq->lock);
			if (skb_peek(arrvq) == skb) {
				__skb_dequeue(arrvq);
				__skb_queue_tail(inputq, skb);
			}
			kfree_skb(skb);
			spin_unlock_bh(&inputq->lock);
			continue;
		}

		/* Group messages require exact scope match */
		if (msg_in_group(hdr)) {
			ua.sr.lower = 0;
			ua.sr.upper = ~0;
			ua.scope = msg_lookup_scope(hdr);
		}

		/* Create destination port list: */
		tipc_nametbl_lookup_mcast_sockets(net, &ua, &dports);

		/* Clone message per destination */
		while (tipc_dest_pop(&dports, NULL, &portid)) {
			_skb = __pskb_copy(skb, hlen, GFP_ATOMIC);
			if (_skb) {
				msg_set_destport(buf_msg(_skb), portid);
				__skb_queue_tail(&tmpq, _skb);
				continue;
			}
			pr_warn("Failed to clone mcast rcv buffer\n");
		}
		/* Append clones to inputq only if skb is still head of arrvq */
		spin_lock_bh(&inputq->lock);
		if (skb_peek(arrvq) == skb) {
			skb_queue_splice_tail_init(&tmpq, inputq);
			/* Decrement the skb's refcnt */
			kfree_skb(__skb_dequeue(arrvq));
		}
		spin_unlock_bh(&inputq->lock);
		__skb_queue_purge(&tmpq);
		kfree_skb(skb);
	}
	tipc_sk_rcv(net, inputq);
}

/* tipc_sk_push_backlog(): send accumulated buffers in socket write queue
 * when socket is in Nagle mode. On a nagle ack the message/packet ratio is
 * evaluated: a poor ratio (fewer than two messages bundled per sent packet)
 * doubles the nagle_start threshold, up to NAGLE_START_MAX, so bundling
 * kicks in later; a good ratio resets it to NAGLE_START_INIT and requests
 * another ack.
 */
static void tipc_sk_push_backlog(struct tipc_sock *tsk, bool nagle_ack)
{
	struct sk_buff_head *txq = &tsk->sk.sk_write_queue;
	struct sk_buff *skb = skb_peek_tail(txq);
	struct net *net = sock_net(&tsk->sk);
	u32 dnode = tsk_peer_node(tsk);
	int rc;

	if (nagle_ack) {
		tsk->pkt_cnt += skb_queue_len(txq);
		if (!tsk->pkt_cnt || tsk->msg_acc / tsk->pkt_cnt < 2) {
			tsk->oneway = 0;
			if (tsk->nagle_start < NAGLE_START_MAX)
				tsk->nagle_start *= 2;
			tsk->expect_ack = false;
			pr_debug("tsk %10u: bad nagle %u -> %u, next start %u!\n",
				 tsk->portid, tsk->msg_acc, tsk->pkt_cnt,
				 tsk->nagle_start);
		} else {
			tsk->nagle_start = NAGLE_START_INIT;
			if (skb) {
				msg_set_ack_required(buf_msg(skb));
				tsk->expect_ack = true;
			} else {
				tsk->expect_ack = false;
			}
		}
		tsk->msg_acc = 0;
		tsk->pkt_cnt = 0;
	}

	if (!skb || tsk->cong_link_cnt)
		return;

	/* Do not send SYN again after congestion */
	if (msg_is_syn(buf_msg(skb)))
		return;

	if (tsk->msg_acc)
		tsk->pkt_cnt += skb_queue_len(txq);
	tsk->snt_unacked += tsk->snd_backlog;
	tsk->snd_backlog = 0;
	rc = tipc_node_xmit(net, txq, dnode, tsk->portid);
	if (rc == -ELINKCONG)
		tsk->cong_link_cnt = 1;
}

/**
 * tipc_sk_conn_proto_rcv - receive a connection mng protocol message
 * @tsk: receiving socket
 * @skb: pointer to message buffer.
 * @inputq: buffer list containing the buffers
 * @xmitq: output message area
 */
static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
				   struct sk_buff_head *inputq,
				   struct sk_buff_head *xmitq)
{
	struct tipc_msg *hdr = buf_msg(skb);
	u32 onode = tsk_own_node(tsk);
	struct sock *sk = &tsk->sk;
	int mtyp = msg_type(hdr);
	bool was_cong;

	/* Ignore if connection cannot be validated: */
	if (!tsk_peer_msg(tsk, hdr)) {
		trace_tipc_sk_drop_msg(sk, skb, TIPC_DUMP_NONE, "@proto_rcv!");
		goto exit;
	}

	if (unlikely(msg_errcode(hdr))) {
		tipc_set_sk_state(sk, TIPC_DISCONNECTING);
		tipc_node_remove_conn(sock_net(sk), tsk_peer_node(tsk),
				      tsk_peer_port(tsk));
		sk->sk_state_change(sk);

		/* State change is ignored if socket already awake,
		 * - convert msg to abort msg and add to inqueue
		 */
		msg_set_user(hdr, TIPC_CRITICAL_IMPORTANCE);
		msg_set_type(hdr, TIPC_CONN_MSG);
		msg_set_size(hdr, BASIC_H_SIZE);
		msg_set_hdr_sz(hdr, BASIC_H_SIZE);
		__skb_queue_tail(inputq, skb);
		return;
	}

	tsk->probe_unacked = false;

	if (mtyp == CONN_PROBE) {
		msg_set_type(hdr, CONN_PROBE_REPLY);
		if (tipc_msg_reverse(onode, &skb, TIPC_OK))
			__skb_queue_tail(xmitq, skb);
		return;
	} else if (mtyp == CONN_ACK) {
		was_cong = tsk_conn_cong(tsk);
		tipc_sk_push_backlog(tsk, msg_nagle_ack(hdr));
		tsk->snt_unacked -= msg_conn_ack(hdr);
		if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
			tsk->snd_win = msg_adv_win(hdr);
		if (was_cong && !tsk_conn_cong(tsk))
			sk->sk_write_space(sk);
	} else if (mtyp != CONN_PROBE_REPLY) {
		pr_warn("Received unknown CONN_PROTO msg\n");
	}
exit:
	kfree_skb(skb);
}

/**
 * tipc_sendmsg - send message in connectionless manner
 * @sock: socket structure
 * @m: message to send
 * @dsz: amount of user data to be sent
 *
 * Message must have a destination specified explicitly.
 * Used for SOCK_RDM and SOCK_DGRAM messages,
 * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
 * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
 *
 * Return: the number of bytes sent on success, or errno otherwise
 */
static int tipc_sendmsg(struct socket *sock,
			struct msghdr *m, size_t dsz)
{
	struct sock *sk = sock->sk;
	int ret;

	lock_sock(sk);
	ret = __tipc_sendmsg(sock, m, dsz);
	release_sock(sk);

	return ret;
}

static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
{
	struct sock *sk = sock->sk;
	struct net *net = sock_net(sk);
	struct tipc_sock *tsk = tipc_sk(sk);
	struct tipc_uaddr *ua = (struct tipc_uaddr *)m->msg_name;
	long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
	struct list_head *clinks = &tsk->cong_links;
	bool syn = !tipc_sk_type_connectionless(sk);
	struct tipc_group *grp = tsk->group;
	struct tipc_msg *hdr = &tsk->phdr;
	struct tipc_socket_addr skaddr;
	struct sk_buff_head pkts;
	int atype, mtu, rc;

	if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
		return -EMSGSIZE;

	if (ua) {
		if (!tipc_uaddr_valid(ua, m->msg_namelen))
			return -EINVAL;
		atype = ua->addrtype;
	}

	/* If socket belongs to a communication group follow other paths */
	if (grp) {
		if (!ua)
			return tipc_send_group_bcast(sock, m, dlen, timeout);
		if (atype == TIPC_SERVICE_ADDR)
			return tipc_send_group_anycast(sock, m, dlen, timeout);
		if (atype == TIPC_SOCKET_ADDR)
			return tipc_send_group_unicast(sock, m, dlen, timeout);
		if (atype == TIPC_SERVICE_RANGE)
			return tipc_send_group_mcast(sock, m, dlen, timeout);
		return -EINVAL;
	}

	if (!ua) {
		ua = (struct tipc_uaddr *)&tsk->peer;
		if (!syn && ua->family != AF_TIPC)
			return -EDESTADDRREQ;
		atype = ua->addrtype;
	}

	if (unlikely(syn)) {
		if (sk->sk_state == TIPC_LISTEN)
			return -EPIPE;
		if (sk->sk_state != TIPC_OPEN)
			return -EISCONN;
		if (tsk->published)
			return -EOPNOTSUPP;
		if (atype == TIPC_SERVICE_ADDR)
			tsk->conn_addrtype = atype;
		msg_set_syn(hdr, 1);
	}

	memset(&skaddr, 0, sizeof(skaddr));

	/* Determine destination */
	if (atype == TIPC_SERVICE_RANGE) {
		return tipc_sendmcast(sock, ua, m, dlen, timeout);
	} else if (atype == TIPC_SERVICE_ADDR) {
		skaddr.node = ua->lookup_node;
		ua->scope = tipc_node2scope(skaddr.node);
		if (!tipc_nametbl_lookup_anycast(net, ua, &skaddr))
			return -EHOSTUNREACH;
	} else if (atype == TIPC_SOCKET_ADDR) {
		skaddr = ua->sk;
	} else {
		return -EINVAL;
	}

	/* Block or return if destination link is congested */
	rc = tipc_wait_for_cond(sock, &timeout,
				!tipc_dest_find(clinks, skaddr.node, 0));
	if (unlikely(rc))
		return rc;

	/* Finally build message header */
	msg_set_destnode(hdr, skaddr.node);
	msg_set_destport(hdr, skaddr.ref);
	if (atype == TIPC_SERVICE_ADDR) {
		msg_set_type(hdr, TIPC_NAMED_MSG);
		msg_set_hdr_sz(hdr, NAMED_H_SIZE);
		msg_set_nametype(hdr, ua->sa.type);
		msg_set_nameinst(hdr, ua->sa.instance);
		msg_set_lookup_scope(hdr, ua->scope);
	} else { /* TIPC_SOCKET_ADDR */
		msg_set_type(hdr, TIPC_DIRECT_MSG);
		msg_set_lookup_scope(hdr, 0);
		msg_set_hdr_sz(hdr, BASIC_H_SIZE);
	}

	/* Add message body */
	__skb_queue_head_init(&pkts);
	mtu = tipc_node_get_mtu(net, skaddr.node, tsk->portid, true);
	rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
	if (unlikely(rc != dlen))
		return rc;
	if (unlikely(syn && !tipc_msg_skb_clone(&pkts, &sk->sk_write_queue))) {
		__skb_queue_purge(&pkts);
		return -ENOMEM;
	}

	/* Send message */
	trace_tipc_sk_sendmsg(sk, skb_peek(&pkts), TIPC_DUMP_SK_SNDQ, " ");
	rc = tipc_node_xmit(net, &pkts, skaddr.node, tsk->portid);
	if (unlikely(rc == -ELINKCONG)) {
		tipc_dest_push(clinks, skaddr.node, 0);
		tsk->cong_link_cnt++;
		rc = 0;
	}

	if (unlikely(syn && !rc)) {
		tipc_set_sk_state(sk, TIPC_CONNECTING);
		if (dlen && timeout) {
			timeout = msecs_to_jiffies(timeout);
			tipc_wait_for_connect(sock, &timeout);
		}
	}

	return rc ? rc : dlen;
}
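
/* Illustrative userspace counterpart (an assumption; type/instance values
 * are arbitrary examples): a sendto() with a TIPC_SERVICE_ADDR destination
 * exercises the anycast lookup path above:
 *
 *	struct sockaddr_tipc dst = {
 *		.family = AF_TIPC,
 *		.addrtype = TIPC_SERVICE_ADDR,
 *		.addr.name.name = { .type = 1000, .instance = 17 },
 *	};
 *	sendto(sd, buf, len, 0, (struct sockaddr *)&dst, sizeof(dst));
 */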
1533  
1534  /**
1535   * tipc_sendstream - send stream-oriented data
1536   * @sock: socket structure
1537   * @m: data to send
1538   * @dsz: total length of data to be transmitted
1539   *
1540   * Used for SOCK_STREAM data.
1541   *
1542   * Return: the number of bytes sent on success (or partial success),
1543   * or errno if no data sent
1544   */
tipc_sendstream(struct socket * sock,struct msghdr * m,size_t dsz)1545  static int tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz)
1546  {
1547  	struct sock *sk = sock->sk;
1548  	int ret;
1549  
1550  	lock_sock(sk);
1551  	ret = __tipc_sendstream(sock, m, dsz);
1552  	release_sock(sk);
1553  
1554  	return ret;
1555  }
1556  
__tipc_sendstream(struct socket * sock,struct msghdr * m,size_t dlen)1557  static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
1558  {
1559  	struct sock *sk = sock->sk;
1560  	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1561  	long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
1562  	struct sk_buff_head *txq = &sk->sk_write_queue;
1563  	struct tipc_sock *tsk = tipc_sk(sk);
1564  	struct tipc_msg *hdr = &tsk->phdr;
1565  	struct net *net = sock_net(sk);
1566  	struct sk_buff *skb;
1567  	u32 dnode = tsk_peer_node(tsk);
1568  	int maxnagle = tsk->maxnagle;
1569  	int maxpkt = tsk->max_pkt;
1570  	int send, sent = 0;
1571  	int blocks, rc = 0;
1572  
1573  	if (unlikely(dlen > INT_MAX))
1574  		return -EMSGSIZE;
1575  
1576  	/* Handle implicit connection setup */
1577  	if (unlikely(dest && sk->sk_state == TIPC_OPEN)) {
1578  		rc = __tipc_sendmsg(sock, m, dlen);
1579  		if (dlen && dlen == rc) {
1580  			tsk->peer_caps = tipc_node_get_capabilities(net, dnode);
1581  			tsk->snt_unacked = tsk_inc(tsk, dlen + msg_hdr_sz(hdr));
1582  		}
1583  		return rc;
1584  	}
1585  
1586  	do {
1587  		rc = tipc_wait_for_cond(sock, &timeout,
1588  					(!tsk->cong_link_cnt &&
1589  					 !tsk_conn_cong(tsk) &&
1590  					 tipc_sk_connected(sk)));
1591  		if (unlikely(rc))
1592  			break;
1593  		send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE);
1594  		blocks = tsk->snd_backlog;
1595  		if (tsk->oneway++ >= tsk->nagle_start && maxnagle &&
1596  		    send <= maxnagle) {
1597  			rc = tipc_msg_append(hdr, m, send, maxnagle, txq);
1598  			if (unlikely(rc < 0))
1599  				break;
1600  			blocks += rc;
1601  			tsk->msg_acc++;
1602  			if (blocks <= 64 && tsk->expect_ack) {
1603  				tsk->snd_backlog = blocks;
1604  				sent += send;
1605  				break;
1606  			} else if (blocks > 64) {
1607  				tsk->pkt_cnt += skb_queue_len(txq);
1608  			} else {
1609  				skb = skb_peek_tail(txq);
1610  				if (skb) {
1611  					msg_set_ack_required(buf_msg(skb));
1612  					tsk->expect_ack = true;
1613  				} else {
1614  					tsk->expect_ack = false;
1615  				}
1616  				tsk->msg_acc = 0;
1617  				tsk->pkt_cnt = 0;
1618  			}
1619  		} else {
1620  			rc = tipc_msg_build(hdr, m, sent, send, maxpkt, txq);
1621  			if (unlikely(rc != send))
1622  				break;
1623  			blocks += tsk_inc(tsk, send + MIN_H_SIZE);
1624  		}
1625  		trace_tipc_sk_sendstream(sk, skb_peek(txq),
1626  					 TIPC_DUMP_SK_SNDQ, " ");
1627  		rc = tipc_node_xmit(net, txq, dnode, tsk->portid);
1628  		if (unlikely(rc == -ELINKCONG)) {
1629  			tsk->cong_link_cnt = 1;
1630  			rc = 0;
1631  		}
1632  		if (likely(!rc)) {
1633  			tsk->snt_unacked += blocks;
1634  			tsk->snd_backlog = 0;
1635  			sent += send;
1636  		}
1637  	} while (sent < dlen && !rc);
1638  
1639  	return sent ? sent : rc;
1640  }
1641  
1642  /**
1643   * tipc_send_packet - send a connection-oriented message
1644   * @sock: socket structure
1645   * @m: message to send
1646   * @dsz: length of data to be transmitted
1647   *
1648   * Used for SOCK_SEQPACKET messages.
1649   *
1650   * Return: the number of bytes sent on success, or errno otherwise
1651   */
1652  static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz)
1653  {
1654  	if (dsz > TIPC_MAX_USER_MSG_SIZE)
1655  		return -EMSGSIZE;
1656  
1657  	return tipc_sendstream(sock, m, dsz);
1658  }
1659  
1660  /* tipc_sk_finish_conn - complete the setup of a connection
1661   */
1662  static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
1663  				u32 peer_node)
1664  {
1665  	struct sock *sk = &tsk->sk;
1666  	struct net *net = sock_net(sk);
1667  	struct tipc_msg *msg = &tsk->phdr;
1668  
1669  	msg_set_syn(msg, 0);
1670  	msg_set_destnode(msg, peer_node);
1671  	msg_set_destport(msg, peer_port);
1672  	msg_set_type(msg, TIPC_CONN_MSG);
1673  	msg_set_lookup_scope(msg, 0);
1674  	msg_set_hdr_sz(msg, SHORT_H_SIZE);
1675  
1676  	sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTV);
1677  	tipc_set_sk_state(sk, TIPC_ESTABLISHED);
1678  	tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
1679  	tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid, true);
1680  	tsk->peer_caps = tipc_node_get_capabilities(net, peer_node);
1681  	tsk_set_nagle(tsk);
1682  	__skb_queue_purge(&sk->sk_write_queue);
1683  	if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
1684  		return;
1685  
1686  	/* Fall back to message based flow control */
1687  	tsk->rcv_win = FLOWCTL_MSG_WIN;
1688  	tsk->snd_win = FLOWCTL_MSG_WIN;
1689  }
1690  
1691  /**
1692   * tipc_sk_set_orig_addr - capture sender's address for received message
1693   * @m: descriptor for message info
1694   * @skb: received message
1695   *
1696   * Note: Address is not captured if not requested by receiver.
1697   */
1698  static void tipc_sk_set_orig_addr(struct msghdr *m, struct sk_buff *skb)
1699  {
1700  	DECLARE_SOCKADDR(struct sockaddr_pair *, srcaddr, m->msg_name);
1701  	struct tipc_msg *hdr = buf_msg(skb);
1702  
1703  	if (!srcaddr)
1704  		return;
1705  
1706  	srcaddr->sock.family = AF_TIPC;
1707  	srcaddr->sock.addrtype = TIPC_SOCKET_ADDR;
1708  	srcaddr->sock.scope = 0;
1709  	srcaddr->sock.addr.id.ref = msg_origport(hdr);
1710  	srcaddr->sock.addr.id.node = msg_orignode(hdr);
1711  	srcaddr->sock.addr.name.domain = 0;
1712  	m->msg_namelen = sizeof(struct sockaddr_tipc);
1713  
1714  	if (!msg_in_group(hdr))
1715  		return;
1716  
1717  	/* Group message users may also want to know sending member's id */
1718  	srcaddr->member.family = AF_TIPC;
1719  	srcaddr->member.addrtype = TIPC_SERVICE_ADDR;
1720  	srcaddr->member.scope = 0;
1721  	srcaddr->member.addr.name.name.type = msg_nametype(hdr);
1722  	srcaddr->member.addr.name.name.instance = TIPC_SKB_CB(skb)->orig_member;
1723  	srcaddr->member.addr.name.domain = 0;
1724  	m->msg_namelen = sizeof(*srcaddr);
1725  }
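
/* Example (illustrative userspace sketch, not part of this file): a
 * receiver opts in to address capture simply by supplying msg_name.
 * For group messages, room for both addresses of the sockaddr_pair
 * layout above can be provided; 'sd', 'buf' and handle_member_addr()
 * are assumed:
 *
 *	struct sockaddr_tipc src[2];
 *	struct iovec iov = { buf, sizeof(buf) };
 *	struct msghdr m = { .msg_name = src, .msg_namelen = sizeof(src),
 *			    .msg_iov = &iov, .msg_iovlen = 1 };
 *
 *	if (recvmsg(sd, &m, 0) >= 0 && m.msg_namelen == sizeof(src))
 *		handle_member_addr(&src[1]);
 */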
1726  
1727  /**
1728   * tipc_sk_anc_data_recv - optionally capture ancillary data for received message
1729   * @m: descriptor for message info
1730   * @skb: received message buffer
1731   * @tsk: TIPC port associated with message
1732   *
1733   * Note: Ancillary data is not captured if not requested by receiver.
1734   *
1735   * Return: 0 if successful, otherwise errno
1736   */
1737  static int tipc_sk_anc_data_recv(struct msghdr *m, struct sk_buff *skb,
1738  				 struct tipc_sock *tsk)
1739  {
1740  	struct tipc_msg *hdr;
1741  	u32 data[3] = {0,};
1742  	bool has_addr;
1743  	int dlen, rc;
1744  
1745  	if (likely(m->msg_controllen == 0))
1746  		return 0;
1747  
1748  	hdr = buf_msg(skb);
1749  	dlen = msg_data_sz(hdr);
1750  
1751  	/* Capture errored message object, if any */
1752  	if (msg_errcode(hdr)) {
1753  		if (skb_linearize(skb))
1754  			return -ENOMEM;
1755  		hdr = buf_msg(skb);
1756  		data[0] = msg_errcode(hdr);
1757  		data[1] = dlen;
1758  		rc = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, data);
1759  		if (rc || !dlen)
1760  			return rc;
1761  		rc = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, dlen, msg_data(hdr));
1762  		if (rc)
1763  			return rc;
1764  	}
1765  
1766  	/* Capture TIPC_SERVICE_ADDR/RANGE destination address, if any */
1767  	switch (msg_type(hdr)) {
1768  	case TIPC_NAMED_MSG:
1769  		has_addr = true;
1770  		data[0] = msg_nametype(hdr);
1771  		data[1] = msg_namelower(hdr);
1772  		data[2] = data[1];
1773  		break;
1774  	case TIPC_MCAST_MSG:
1775  		has_addr = true;
1776  		data[0] = msg_nametype(hdr);
1777  		data[1] = msg_namelower(hdr);
1778  		data[2] = msg_nameupper(hdr);
1779  		break;
1780  	case TIPC_CONN_MSG:
1781  		has_addr = !!tsk->conn_addrtype;
1782  		data[0] = msg_nametype(&tsk->phdr);
1783  		data[1] = msg_nameinst(&tsk->phdr);
1784  		data[2] = data[1];
1785  		break;
1786  	default:
1787  		has_addr = false;
1788  	}
1789  	if (!has_addr)
1790  		return 0;
1791  	return put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, data);
1792  }
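
/* Example (illustrative userspace sketch, not part of this file): the
 * ancillary objects built above can be walked with the standard CMSG
 * macros; 'sd' is an assumed AF_TIPC socket, and returned payloads are
 * assumed to fit in 1 kB of control space:
 *
 *	union { struct cmsghdr align; char buf[CMSG_SPACE(1024)]; } ctl;
 *	char data[4096];
 *	struct iovec iov = { data, sizeof(data) };
 *	struct msghdr m = { .msg_iov = &iov, .msg_iovlen = 1,
 *			    .msg_control = ctl.buf,
 *			    .msg_controllen = sizeof(ctl.buf) };
 *	struct cmsghdr *c;
 *	__u32 *v;
 *
 *	if (recvmsg(sd, &m, 0) < 0)
 *		return -1;
 *	for (c = CMSG_FIRSTHDR(&m); c; c = CMSG_NXTHDR(&m, c)) {
 *		if (c->cmsg_level != SOL_TIPC)
 *			continue;
 *		v = (__u32 *)CMSG_DATA(c);
 *		if (c->cmsg_type == TIPC_ERRINFO)
 *			printf("error %u, %u bytes returned\n", v[0], v[1]);
 *		else if (c->cmsg_type == TIPC_DESTNAME)
 *			printf("dest %u:%u:%u\n", v[0], v[1], v[2]);
 *	}
 */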
1793  
1794  static struct sk_buff *tipc_sk_build_ack(struct tipc_sock *tsk)
1795  {
1796  	struct sock *sk = &tsk->sk;
1797  	struct sk_buff *skb = NULL;
1798  	struct tipc_msg *msg;
1799  	u32 peer_port = tsk_peer_port(tsk);
1800  	u32 dnode = tsk_peer_node(tsk);
1801  
1802  	if (!tipc_sk_connected(sk))
1803  		return NULL;
1804  	skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0,
1805  			      dnode, tsk_own_node(tsk), peer_port,
1806  			      tsk->portid, TIPC_OK);
1807  	if (!skb)
1808  		return NULL;
1809  	msg = buf_msg(skb);
1810  	msg_set_conn_ack(msg, tsk->rcv_unacked);
1811  	tsk->rcv_unacked = 0;
1812  
1813  	/* Adjust to and advertise the correct window limit */
1814  	if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) {
1815  		tsk->rcv_win = tsk_adv_blocks(tsk->sk.sk_rcvbuf);
1816  		msg_set_adv_win(msg, tsk->rcv_win);
1817  	}
1818  	return skb;
1819  }
1820  
1821  static void tipc_sk_send_ack(struct tipc_sock *tsk)
1822  {
1823  	struct sk_buff *skb;
1824  
1825  	skb = tipc_sk_build_ack(tsk);
1826  	if (!skb)
1827  		return;
1828  
1829  	tipc_node_xmit_skb(sock_net(&tsk->sk), skb, tsk_peer_node(tsk),
1830  			   msg_link_selector(buf_msg(skb)));
1831  }
1832  
1833  static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
1834  {
1835  	struct sock *sk = sock->sk;
1836  	DEFINE_WAIT_FUNC(wait, woken_wake_function);
1837  	long timeo = *timeop;
1838  	int err = sock_error(sk);
1839  
1840  	if (err)
1841  		return err;
1842  
1843  	for (;;) {
1844  		if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1845  			if (sk->sk_shutdown & RCV_SHUTDOWN) {
1846  				err = -ENOTCONN;
1847  				break;
1848  			}
1849  			add_wait_queue(sk_sleep(sk), &wait);
1850  			release_sock(sk);
1851  			timeo = wait_woken(&wait, TASK_INTERRUPTIBLE, timeo);
1852  			sched_annotate_sleep();
1853  			lock_sock(sk);
1854  			remove_wait_queue(sk_sleep(sk), &wait);
1855  		}
1856  		err = 0;
1857  		if (!skb_queue_empty(&sk->sk_receive_queue))
1858  			break;
1859  		err = -EAGAIN;
1860  		if (!timeo)
1861  			break;
1862  		err = sock_intr_errno(timeo);
1863  		if (signal_pending(current))
1864  			break;
1865  
1866  		err = sock_error(sk);
1867  		if (err)
1868  			break;
1869  	}
1870  	*timeop = timeo;
1871  	return err;
1872  }
1873  
1874  /**
1875   * tipc_recvmsg - receive packet-oriented message
1876   * @sock: network socket
1877   * @m: descriptor for message info
1878   * @buflen: length of user buffer area
1879   * @flags: receive flags
1880   *
1881   * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
1882   * If the complete message doesn't fit in user area, truncate it.
1883   *
1884   * Return: size of returned message data, errno otherwise
1885   */
1886  static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
1887  			size_t buflen, int flags)
1888  {
1889  	struct sock *sk = sock->sk;
1890  	bool connected = !tipc_sk_type_connectionless(sk);
1891  	struct tipc_sock *tsk = tipc_sk(sk);
1892  	int rc, err, hlen, dlen, copy;
1893  	struct tipc_skb_cb *skb_cb;
1894  	struct sk_buff_head xmitq;
1895  	struct tipc_msg *hdr;
1896  	struct sk_buff *skb;
1897  	bool grp_evt;
1898  	long timeout;
1899  
1900  	/* Catch invalid receive requests */
1901  	if (unlikely(!buflen))
1902  		return -EINVAL;
1903  
1904  	lock_sock(sk);
1905  	if (unlikely(connected && sk->sk_state == TIPC_OPEN)) {
1906  		rc = -ENOTCONN;
1907  		goto exit;
1908  	}
1909  	timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1910  
1911  	/* Step rcv queue to first msg with data or error; wait if necessary */
1912  	do {
1913  		rc = tipc_wait_for_rcvmsg(sock, &timeout);
1914  		if (unlikely(rc))
1915  			goto exit;
1916  		skb = skb_peek(&sk->sk_receive_queue);
1917  		skb_cb = TIPC_SKB_CB(skb);
1918  		hdr = buf_msg(skb);
1919  		dlen = msg_data_sz(hdr);
1920  		hlen = msg_hdr_sz(hdr);
1921  		err = msg_errcode(hdr);
1922  		grp_evt = msg_is_grp_evt(hdr);
1923  		if (likely(dlen || err))
1924  			break;
1925  		tsk_advance_rx_queue(sk);
1926  	} while (1);
1927  
1928  	/* Collect msg meta data, including error code and rejected data */
1929  	tipc_sk_set_orig_addr(m, skb);
1930  	rc = tipc_sk_anc_data_recv(m, skb, tsk);
1931  	if (unlikely(rc))
1932  		goto exit;
1933  	hdr = buf_msg(skb);
1934  
1935  	/* Capture data if non-error msg, otherwise just set return value */
1936  	if (likely(!err)) {
1937  		int offset = skb_cb->bytes_read;
1938  
1939  		copy = min_t(int, dlen - offset, buflen);
1940  		rc = skb_copy_datagram_msg(skb, hlen + offset, m, copy);
1941  		if (unlikely(rc))
1942  			goto exit;
1943  		if (unlikely(offset + copy < dlen)) {
1944  			if (flags & MSG_EOR) {
1945  				if (!(flags & MSG_PEEK))
1946  					skb_cb->bytes_read = offset + copy;
1947  			} else {
1948  				m->msg_flags |= MSG_TRUNC;
1949  				skb_cb->bytes_read = 0;
1950  			}
1951  		} else {
1952  			if (flags & MSG_EOR)
1953  				m->msg_flags |= MSG_EOR;
1954  			skb_cb->bytes_read = 0;
1955  		}
1956  	} else {
1957  		copy = 0;
1958  		rc = 0;
1959  		if (err != TIPC_CONN_SHUTDOWN && connected && !m->msg_control) {
1960  			rc = -ECONNRESET;
1961  			goto exit;
1962  		}
1963  	}
1964  
1965  	/* Mark message as group event if applicable */
1966  	if (unlikely(grp_evt)) {
1967  		if (msg_grp_evt(hdr) == TIPC_WITHDRAWN)
1968  			m->msg_flags |= MSG_EOR;
1969  		m->msg_flags |= MSG_OOB;
1970  		copy = 0;
1971  	}
1972  
1973  	/* Capture of data or error code/rejected data was successful */
1974  	if (unlikely(flags & MSG_PEEK))
1975  		goto exit;
1976  
1977  	/* Send group flow control advertisement when applicable */
1978  	if (tsk->group && msg_in_group(hdr) && !grp_evt) {
1979  		__skb_queue_head_init(&xmitq);
1980  		tipc_group_update_rcv_win(tsk->group, tsk_blocks(hlen + dlen),
1981  					  msg_orignode(hdr), msg_origport(hdr),
1982  					  &xmitq);
1983  		tipc_node_distr_xmit(sock_net(sk), &xmitq);
1984  	}
1985  
1986  	if (skb_cb->bytes_read)
1987  		goto exit;
1988  
1989  	tsk_advance_rx_queue(sk);
1990  
1991  	if (likely(!connected))
1992  		goto exit;
1993  
1994  	/* Send connection flow control advertisement when applicable */
1995  	tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
1996  	if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
1997  		tipc_sk_send_ack(tsk);
1998  exit:
1999  	release_sock(sk);
2000  	return rc ? rc : copy;
2001  }
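
/* Example (illustrative userspace sketch, not part of this file):
 * detecting the truncation performed above; 'sd' is assumed:
 *
 *	char buf[4096];
 *	struct iovec iov = { buf, sizeof(buf) };
 *	struct msghdr m = { .msg_iov = &iov, .msg_iovlen = 1 };
 *	ssize_t n = recvmsg(sd, &m, 0);
 *
 *	if (n >= 0 && (m.msg_flags & MSG_TRUNC))
 *		fprintf(stderr, "datagram truncated to %zd bytes\n", n);
 */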
2002  
2003  /**
2004   * tipc_recvstream - receive stream-oriented data
2005   * @sock: network socket
2006   * @m: descriptor for message info
2007   * @buflen: total size of user buffer area
2008   * @flags: receive flags
2009   *
2010   * Used for SOCK_STREAM messages only.  If not enough data is available
2011   * it will optionally wait for more; it never truncates data.
2012   *
2013   * Return: size of returned message data, errno otherwise
2014   */
2015  static int tipc_recvstream(struct socket *sock, struct msghdr *m,
2016  			   size_t buflen, int flags)
2017  {
2018  	struct sock *sk = sock->sk;
2019  	struct tipc_sock *tsk = tipc_sk(sk);
2020  	struct sk_buff *skb;
2021  	struct tipc_msg *hdr;
2022  	struct tipc_skb_cb *skb_cb;
2023  	bool peek = flags & MSG_PEEK;
2024  	int offset, required, copy, copied = 0;
2025  	int hlen, dlen, err, rc;
2026  	long timeout;
2027  
2028  	/* Catch invalid receive attempts */
2029  	if (unlikely(!buflen))
2030  		return -EINVAL;
2031  
2032  	lock_sock(sk);
2033  
2034  	if (unlikely(sk->sk_state == TIPC_OPEN)) {
2035  		rc = -ENOTCONN;
2036  		goto exit;
2037  	}
2038  	required = sock_rcvlowat(sk, flags & MSG_WAITALL, buflen);
2039  	timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
2040  
2041  	do {
2042  		/* Look at first msg in receive queue; wait if necessary */
2043  		rc = tipc_wait_for_rcvmsg(sock, &timeout);
2044  		if (unlikely(rc))
2045  			break;
2046  		skb = skb_peek(&sk->sk_receive_queue);
2047  		skb_cb = TIPC_SKB_CB(skb);
2048  		hdr = buf_msg(skb);
2049  		dlen = msg_data_sz(hdr);
2050  		hlen = msg_hdr_sz(hdr);
2051  		err = msg_errcode(hdr);
2052  
2053  		/* Discard any empty non-errored (SYN-) message */
2054  		if (unlikely(!dlen && !err)) {
2055  			tsk_advance_rx_queue(sk);
2056  			continue;
2057  		}
2058  
2059  		/* Collect msg meta data, incl. error code and rejected data */
2060  		if (!copied) {
2061  			tipc_sk_set_orig_addr(m, skb);
2062  			rc = tipc_sk_anc_data_recv(m, skb, tsk);
2063  			if (rc)
2064  				break;
2065  			hdr = buf_msg(skb);
2066  		}
2067  
2068  		/* Copy data if msg ok, otherwise return error/partial data */
2069  		if (likely(!err)) {
2070  			offset = skb_cb->bytes_read;
2071  			copy = min_t(int, dlen - offset, buflen - copied);
2072  			rc = skb_copy_datagram_msg(skb, hlen + offset, m, copy);
2073  			if (unlikely(rc))
2074  				break;
2075  			copied += copy;
2076  			offset += copy;
2077  			if (unlikely(offset < dlen)) {
2078  				if (!peek)
2079  					skb_cb->bytes_read = offset;
2080  				break;
2081  			}
2082  		} else {
2083  			rc = 0;
2084  			if ((err != TIPC_CONN_SHUTDOWN) && !m->msg_control)
2085  				rc = -ECONNRESET;
2086  			if (copied || rc)
2087  				break;
2088  		}
2089  
2090  		if (unlikely(peek))
2091  			break;
2092  
2093  		tsk_advance_rx_queue(sk);
2094  
2095  		/* Send connection flow control advertisement when applicable */
2096  		tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
2097  		if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
2098  			tipc_sk_send_ack(tsk);
2099  
2100  		/* Exit if all requested data or FIN/error received */
2101  		if (copied == buflen || err)
2102  			break;
2103  
2104  	} while (!skb_queue_empty(&sk->sk_receive_queue) || copied < required);
2105  exit:
2106  	release_sock(sk);
2107  	return copied ? copied : rc;
2108  }
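
/* Example (illustrative userspace sketch, not part of this file): since
 * tipc_recvstream() honours sock_rcvlowat(), a fixed-size record can be
 * collected in a single call with MSG_WAITALL; 'sd' is assumed, and
 * struct record and process_record() are hypothetical application names:
 *
 *	struct record rec;
 *	ssize_t n = recv(sd, &rec, sizeof(rec), MSG_WAITALL);
 *
 *	if (n == (ssize_t)sizeof(rec))
 *		process_record(&rec);
 */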
2109  
2110  /**
2111   * tipc_write_space - wake up thread if port congestion is released
2112   * @sk: socket
2113   */
2114  static void tipc_write_space(struct sock *sk)
2115  {
2116  	struct socket_wq *wq;
2117  
2118  	rcu_read_lock();
2119  	wq = rcu_dereference(sk->sk_wq);
2120  	if (skwq_has_sleeper(wq))
2121  		wake_up_interruptible_sync_poll(&wq->wait, EPOLLOUT |
2122  						EPOLLWRNORM | EPOLLWRBAND);
2123  	rcu_read_unlock();
2124  }
2125  
2126  /**
2127   * tipc_data_ready - wake up threads to indicate messages have been received
2128   * @sk: socket
2129   */
2130  static void tipc_data_ready(struct sock *sk)
2131  {
2132  	struct socket_wq *wq;
2133  
2134  	trace_sk_data_ready(sk);
2135  
2136  	rcu_read_lock();
2137  	wq = rcu_dereference(sk->sk_wq);
2138  	if (skwq_has_sleeper(wq))
2139  		wake_up_interruptible_sync_poll(&wq->wait, EPOLLIN |
2140  						EPOLLRDNORM | EPOLLRDBAND);
2141  	rcu_read_unlock();
2142  }
2143  
2144  static void tipc_sock_destruct(struct sock *sk)
2145  {
2146  	__skb_queue_purge(&sk->sk_receive_queue);
2147  }
2148  
2149  static void tipc_sk_proto_rcv(struct sock *sk,
2150  			      struct sk_buff_head *inputq,
2151  			      struct sk_buff_head *xmitq)
2152  {
2153  	struct sk_buff *skb = __skb_dequeue(inputq);
2154  	struct tipc_sock *tsk = tipc_sk(sk);
2155  	struct tipc_msg *hdr = buf_msg(skb);
2156  	struct tipc_group *grp = tsk->group;
2157  	bool wakeup = false;
2158  
2159  	switch (msg_user(hdr)) {
2160  	case CONN_MANAGER:
2161  		tipc_sk_conn_proto_rcv(tsk, skb, inputq, xmitq);
2162  		return;
2163  	case SOCK_WAKEUP:
2164  		tipc_dest_del(&tsk->cong_links, msg_orignode(hdr), 0);
2165  		/* coupled with smp_rmb() in tipc_wait_for_cond() */
2166  		smp_wmb();
2167  		tsk->cong_link_cnt--;
2168  		wakeup = true;
2169  		tipc_sk_push_backlog(tsk, false);
2170  		break;
2171  	case GROUP_PROTOCOL:
2172  		tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq);
2173  		break;
2174  	case TOP_SRV:
2175  		tipc_group_member_evt(tsk->group, &wakeup, &sk->sk_rcvbuf,
2176  				      hdr, inputq, xmitq);
2177  		break;
2178  	default:
2179  		break;
2180  	}
2181  
2182  	if (wakeup)
2183  		sk->sk_write_space(sk);
2184  
2185  	kfree_skb(skb);
2186  }
2187  
2188  /**
2189   * tipc_sk_filter_connect - check incoming message for a connection-based socket
2190   * @tsk: TIPC socket
2191   * @skb: pointer to message buffer.
2192   * @xmitq: for Nagle ACK if any
2193   * Return: true if message should be added to receive queue, false otherwise
2194   */
2195  static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb,
2196  				   struct sk_buff_head *xmitq)
2197  {
2198  	struct sock *sk = &tsk->sk;
2199  	struct net *net = sock_net(sk);
2200  	struct tipc_msg *hdr = buf_msg(skb);
2201  	bool con_msg = msg_connected(hdr);
2202  	u32 pport = tsk_peer_port(tsk);
2203  	u32 pnode = tsk_peer_node(tsk);
2204  	u32 oport = msg_origport(hdr);
2205  	u32 onode = msg_orignode(hdr);
2206  	int err = msg_errcode(hdr);
2207  	unsigned long delay;
2208  
2209  	if (unlikely(msg_mcast(hdr)))
2210  		return false;
2211  	tsk->oneway = 0;
2212  
2213  	switch (sk->sk_state) {
2214  	case TIPC_CONNECTING:
2215  		/* Setup ACK */
2216  		if (likely(con_msg)) {
2217  			if (err)
2218  				break;
2219  			tipc_sk_finish_conn(tsk, oport, onode);
2220  			msg_set_importance(&tsk->phdr, msg_importance(hdr));
2221  			/* ACK+ message with data is added to receive queue */
2222  			if (msg_data_sz(hdr))
2223  				return true;
2224  			/* Empty 'ACK-': wake up sleeping connect() and drop */
2225  			sk->sk_state_change(sk);
2226  			msg_set_dest_droppable(hdr, 1);
2227  			return false;
2228  		}
2229  		/* Ignore connectionless message if not from listening socket */
2230  		if (oport != pport || onode != pnode)
2231  			return false;
2232  
2233  		/* Rejected SYN */
2234  		if (err != TIPC_ERR_OVERLOAD)
2235  			break;
2236  
2237  		/* Prepare for new setup attempt if we have a SYN clone */
2238  		if (skb_queue_empty(&sk->sk_write_queue))
2239  			break;
2240  		get_random_bytes(&delay, 2);
2241  		delay %= (tsk->conn_timeout / 4);
2242  		delay = msecs_to_jiffies(delay + 100);
2243  		sk_reset_timer(sk, &sk->sk_timer, jiffies + delay);
2244  		return false;
2245  	case TIPC_OPEN:
2246  	case TIPC_DISCONNECTING:
2247  		return false;
2248  	case TIPC_LISTEN:
2249  		/* Accept only SYN message */
2250  		if (!msg_is_syn(hdr) &&
2251  		    tipc_node_get_capabilities(net, onode) & TIPC_SYN_BIT)
2252  			return false;
2253  		if (!con_msg && !err)
2254  			return true;
2255  		return false;
2256  	case TIPC_ESTABLISHED:
2257  		if (!skb_queue_empty(&sk->sk_write_queue))
2258  			tipc_sk_push_backlog(tsk, false);
2259  		/* Accept only connection-based messages sent by peer */
2260  		if (likely(con_msg && !err && pport == oport &&
2261  			   pnode == onode)) {
2262  			if (msg_ack_required(hdr)) {
2263  				struct sk_buff *skb;
2264  
2265  				skb = tipc_sk_build_ack(tsk);
2266  				if (skb) {
2267  					msg_set_nagle_ack(buf_msg(skb));
2268  					__skb_queue_tail(xmitq, skb);
2269  				}
2270  			}
2271  			return true;
2272  		}
2273  		if (!tsk_peer_msg(tsk, hdr))
2274  			return false;
2275  		if (!err)
2276  			return true;
2277  		tipc_set_sk_state(sk, TIPC_DISCONNECTING);
2278  		tipc_node_remove_conn(net, pnode, tsk->portid);
2279  		sk->sk_state_change(sk);
2280  		return true;
2281  	default:
2282  		pr_err("Unknown sk_state %u\n", sk->sk_state);
2283  	}
2284  	/* Abort connection setup attempt */
2285  	tipc_set_sk_state(sk, TIPC_DISCONNECTING);
2286  	sk->sk_err = ECONNREFUSED;
2287  	sk->sk_state_change(sk);
2288  	return true;
2289  }
2290  
2291  /**
2292   * rcvbuf_limit - get proper overload limit of socket receive queue
2293   * @sk: socket
2294   * @skb: message
2295   *
2296   * For connection oriented messages, irrespective of importance,
2297   * the default queue limit is 2 MB.
2298   *
2299   * For connectionless messages, queue limits are based on message
2300   * importance as follows:
2301   *
2302   * TIPC_LOW_IMPORTANCE       (2 MB)
2303   * TIPC_MEDIUM_IMPORTANCE    (4 MB)
2304   * TIPC_HIGH_IMPORTANCE      (8 MB)
2305   * TIPC_CRITICAL_IMPORTANCE  (16 MB)
2306   *
2307   * Return: overload limit according to corresponding message importance
2308   */
2309  static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
2310  {
2311  	struct tipc_sock *tsk = tipc_sk(sk);
2312  	struct tipc_msg *hdr = buf_msg(skb);
2313  
2314  	if (unlikely(msg_in_group(hdr)))
2315  		return READ_ONCE(sk->sk_rcvbuf);
2316  
2317  	if (unlikely(!msg_connected(hdr)))
2318  		return READ_ONCE(sk->sk_rcvbuf) << msg_importance(hdr);
2319  
2320  	if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
2321  		return READ_ONCE(sk->sk_rcvbuf);
2322  
2323  	return FLOWCTL_MSG_LIM;
2324  }
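
/* For example, with the default sk_rcvbuf of 2 MB, the shift above yields
 * the connectionless limits listed in the kernel-doc: 2 MB << 0 = 2 MB
 * for TIPC_LOW_IMPORTANCE up to 2 MB << 3 = 16 MB for
 * TIPC_CRITICAL_IMPORTANCE.
 */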
2325  
2326  /**
2327   * tipc_sk_filter_rcv - validate incoming message
2328   * @sk: socket
2329   * @skb: pointer to message.
2330   * @xmitq: output message area (FIXME)
2331   *
2332   * Enqueues message on receive queue if acceptable; optionally handles
2333   * disconnect indication for a connected socket.
2334   *
2335   * Called with socket lock already taken
2336   */
2337  static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
2338  			       struct sk_buff_head *xmitq)
2339  {
2340  	bool sk_conn = !tipc_sk_type_connectionless(sk);
2341  	struct tipc_sock *tsk = tipc_sk(sk);
2342  	struct tipc_group *grp = tsk->group;
2343  	struct tipc_msg *hdr = buf_msg(skb);
2344  	struct net *net = sock_net(sk);
2345  	struct sk_buff_head inputq;
2346  	int mtyp = msg_type(hdr);
2347  	int limit, err = TIPC_OK;
2348  
2349  	trace_tipc_sk_filter_rcv(sk, skb, TIPC_DUMP_ALL, " ");
2350  	TIPC_SKB_CB(skb)->bytes_read = 0;
2351  	__skb_queue_head_init(&inputq);
2352  	__skb_queue_tail(&inputq, skb);
2353  
2354  	if (unlikely(!msg_isdata(hdr)))
2355  		tipc_sk_proto_rcv(sk, &inputq, xmitq);
2356  
2357  	if (unlikely(grp))
2358  		tipc_group_filter_msg(grp, &inputq, xmitq);
2359  
2360  	if (unlikely(!grp) && mtyp == TIPC_MCAST_MSG)
2361  		tipc_mcast_filter_msg(net, &tsk->mc_method.deferredq, &inputq);
2362  
2363  	/* Validate and add to receive buffer if there is space */
2364  	while ((skb = __skb_dequeue(&inputq))) {
2365  		hdr = buf_msg(skb);
2366  		limit = rcvbuf_limit(sk, skb);
2367  		if ((sk_conn && !tipc_sk_filter_connect(tsk, skb, xmitq)) ||
2368  		    (!sk_conn && msg_connected(hdr)) ||
2369  		    (!grp && msg_in_group(hdr)))
2370  			err = TIPC_ERR_NO_PORT;
2371  		else if (sk_rmem_alloc_get(sk) + skb->truesize >= limit) {
2372  			trace_tipc_sk_dump(sk, skb, TIPC_DUMP_ALL,
2373  					   "err_overload2!");
2374  			atomic_inc(&sk->sk_drops);
2375  			err = TIPC_ERR_OVERLOAD;
2376  		}
2377  
2378  		if (unlikely(err)) {
2379  			if (tipc_msg_reverse(tipc_own_addr(net), &skb, err)) {
2380  				trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_NONE,
2381  						      "@filter_rcv!");
2382  				__skb_queue_tail(xmitq, skb);
2383  			}
2384  			err = TIPC_OK;
2385  			continue;
2386  		}
2387  		__skb_queue_tail(&sk->sk_receive_queue, skb);
2388  		skb_set_owner_r(skb, sk);
2389  		trace_tipc_sk_overlimit2(sk, skb, TIPC_DUMP_ALL,
2390  					 "rcvq >90% allocated!");
2391  		sk->sk_data_ready(sk);
2392  	}
2393  }
2394  
2395  /**
2396   * tipc_sk_backlog_rcv - handle incoming message from backlog queue
2397   * @sk: socket
2398   * @skb: message
2399   *
2400   * Caller must hold socket lock
2401   */
2402  static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb)
2403  {
2404  	unsigned int before = sk_rmem_alloc_get(sk);
2405  	struct sk_buff_head xmitq;
2406  	unsigned int added;
2407  
2408  	__skb_queue_head_init(&xmitq);
2409  
2410  	tipc_sk_filter_rcv(sk, skb, &xmitq);
2411  	added = sk_rmem_alloc_get(sk) - before;
2412  	atomic_add(added, &tipc_sk(sk)->dupl_rcvcnt);
2413  
2414  	/* Send pending response/rejected messages, if any */
2415  	tipc_node_distr_xmit(sock_net(sk), &xmitq);
2416  	return 0;
2417  }
2418  
2419  /**
2420   * tipc_sk_enqueue - extract all buffers with destination 'dport' from
2421   *                   inputq and try adding them to socket or backlog queue
2422   * @inputq: list of incoming buffers with potentially different destinations
2423   * @sk: socket where the buffers should be enqueued
2424   * @dport: port number for the socket
2425   * @xmitq: output queue
2426   *
2427   * Caller must hold socket lock
2428   */
2429  static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
2430  			    u32 dport, struct sk_buff_head *xmitq)
2431  {
2432  	unsigned long time_limit = jiffies + usecs_to_jiffies(20000);
2433  	struct sk_buff *skb;
2434  	unsigned int lim;
2435  	atomic_t *dcnt;
2436  	u32 onode;
2437  
2438  	while (skb_queue_len(inputq)) {
2439  		if (unlikely(time_after_eq(jiffies, time_limit)))
2440  			return;
2441  
2442  		skb = tipc_skb_dequeue(inputq, dport);
2443  		if (unlikely(!skb))
2444  			return;
2445  
2446  		/* Add message directly to receive queue if possible */
2447  		if (!sock_owned_by_user(sk)) {
2448  			tipc_sk_filter_rcv(sk, skb, xmitq);
2449  			continue;
2450  		}
2451  
2452  		/* Try backlog, compensating for double-counted bytes */
2453  		dcnt = &tipc_sk(sk)->dupl_rcvcnt;
2454  		if (!sk->sk_backlog.len)
2455  			atomic_set(dcnt, 0);
2456  		lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
2457  		if (likely(!sk_add_backlog(sk, skb, lim))) {
2458  			trace_tipc_sk_overlimit1(sk, skb, TIPC_DUMP_ALL,
2459  						 "bklg & rcvq >90% allocated!");
2460  			continue;
2461  		}
2462  
2463  		trace_tipc_sk_dump(sk, skb, TIPC_DUMP_ALL, "err_overload!");
2464  		/* Overload => reject message back to sender */
2465  		onode = tipc_own_addr(sock_net(sk));
2466  		atomic_inc(&sk->sk_drops);
2467  		if (tipc_msg_reverse(onode, &skb, TIPC_ERR_OVERLOAD)) {
2468  			trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_ALL,
2469  					      "@sk_enqueue!");
2470  			__skb_queue_tail(xmitq, skb);
2471  		}
2472  		break;
2473  	}
2474  }
2475  
2476  /**
2477   * tipc_sk_rcv - handle a chain of incoming buffers
2478   * @net: the associated network namespace
2479   * @inputq: buffer list containing the buffers
2480   * Consumes all buffers in list until inputq is empty
2481   * Note: may be called in multiple threads referring to the same queue
2482   */
2483  void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
2484  {
2485  	struct sk_buff_head xmitq;
2486  	u32 dnode, dport = 0;
2487  	int err;
2488  	struct tipc_sock *tsk;
2489  	struct sock *sk;
2490  	struct sk_buff *skb;
2491  
2492  	__skb_queue_head_init(&xmitq);
2493  	while (skb_queue_len(inputq)) {
2494  		dport = tipc_skb_peek_port(inputq, dport);
2495  		tsk = tipc_sk_lookup(net, dport);
2496  
2497  		if (likely(tsk)) {
2498  			sk = &tsk->sk;
2499  			if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
2500  				tipc_sk_enqueue(inputq, sk, dport, &xmitq);
2501  				spin_unlock_bh(&sk->sk_lock.slock);
2502  			}
2503  			/* Send pending response/rejected messages, if any */
2504  			tipc_node_distr_xmit(sock_net(sk), &xmitq);
2505  			sock_put(sk);
2506  			continue;
2507  		}
2508  		/* No destination socket => dequeue skb if still there */
2509  		skb = tipc_skb_dequeue(inputq, dport);
2510  		if (!skb)
2511  			return;
2512  
2513  		/* Try secondary lookup if unresolved named message */
2514  		err = TIPC_ERR_NO_PORT;
2515  		if (tipc_msg_lookup_dest(net, skb, &err))
2516  			goto xmit;
2517  
2518  		/* Prepare for message rejection */
2519  		if (!tipc_msg_reverse(tipc_own_addr(net), &skb, err))
2520  			continue;
2521  
2522  		trace_tipc_sk_rej_msg(NULL, skb, TIPC_DUMP_NONE, "@sk_rcv!");
2523  xmit:
2524  		dnode = msg_destnode(buf_msg(skb));
2525  		tipc_node_xmit_skb(net, skb, dnode, dport);
2526  	}
2527  }
2528  
2529  static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
2530  {
2531  	DEFINE_WAIT_FUNC(wait, woken_wake_function);
2532  	struct sock *sk = sock->sk;
2533  	int done;
2534  
2535  	do {
2536  		int err = sock_error(sk);
2537  		if (err)
2538  			return err;
2539  		if (!*timeo_p)
2540  			return -ETIMEDOUT;
2541  		if (signal_pending(current))
2542  			return sock_intr_errno(*timeo_p);
2543  		if (sk->sk_state == TIPC_DISCONNECTING)
2544  			break;
2545  
2546  		add_wait_queue(sk_sleep(sk), &wait);
2547  		done = sk_wait_event(sk, timeo_p, tipc_sk_connected(sk),
2548  				     &wait);
2549  		remove_wait_queue(sk_sleep(sk), &wait);
2550  	} while (!done);
2551  	return 0;
2552  }
2553  
2554  static bool tipc_sockaddr_is_sane(struct sockaddr_tipc *addr)
2555  {
2556  	if (addr->family != AF_TIPC)
2557  		return false;
2558  	if (addr->addrtype == TIPC_SERVICE_RANGE)
2559  		return (addr->addr.nameseq.lower <= addr->addr.nameseq.upper);
2560  	return (addr->addrtype == TIPC_SERVICE_ADDR ||
2561  		addr->addrtype == TIPC_SOCKET_ADDR);
2562  }
2563  
2564  /**
2565   * tipc_connect - establish a connection to another TIPC port
2566   * @sock: socket structure
2567   * @dest: socket address for destination port
2568   * @destlen: size of socket address data structure
2569   * @flags: file-related flags associated with socket
2570   *
2571   * Return: 0 on success, errno otherwise
2572   */
2573  static int tipc_connect(struct socket *sock, struct sockaddr *dest,
2574  			int destlen, int flags)
2575  {
2576  	struct sock *sk = sock->sk;
2577  	struct tipc_sock *tsk = tipc_sk(sk);
2578  	struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
2579  	struct msghdr m = {NULL,};
2580  	long timeout = (flags & O_NONBLOCK) ? 0 : tsk->conn_timeout;
2581  	int previous;
2582  	int res = 0;
2583  
2584  	if (destlen != sizeof(struct sockaddr_tipc))
2585  		return -EINVAL;
2586  
2587  	lock_sock(sk);
2588  
2589  	if (tsk->group) {
2590  		res = -EINVAL;
2591  		goto exit;
2592  	}
2593  
2594  	if (dst->family == AF_UNSPEC) {
2595  		memset(&tsk->peer, 0, sizeof(struct sockaddr_tipc));
2596  		if (!tipc_sk_type_connectionless(sk))
2597  			res = -EINVAL;
2598  		goto exit;
2599  	}
2600  	if (!tipc_sockaddr_is_sane(dst)) {
2601  		res = -EINVAL;
2602  		goto exit;
2603  	}
2604  	/* DGRAM/RDM connect(), just save the destaddr */
2605  	if (tipc_sk_type_connectionless(sk)) {
2606  		memcpy(&tsk->peer, dest, destlen);
2607  		goto exit;
2608  	} else if (dst->addrtype == TIPC_SERVICE_RANGE) {
2609  		res = -EINVAL;
2610  		goto exit;
2611  	}
2612  
2613  	previous = sk->sk_state;
2614  
2615  	switch (sk->sk_state) {
2616  	case TIPC_OPEN:
2617  		/* Send a 'SYN-' to destination */
2618  		m.msg_name = dest;
2619  		m.msg_namelen = destlen;
2620  		iov_iter_kvec(&m.msg_iter, ITER_SOURCE, NULL, 0, 0);
2621  
2622  		/* If connect() is non-blocking, set MSG_DONTWAIT so that
2623  		 * __tipc_sendmsg() never blocks.
2624  		 */
2625  		if (!timeout)
2626  			m.msg_flags = MSG_DONTWAIT;
2627  
2628  		res = __tipc_sendmsg(sock, &m, 0);
2629  		if ((res < 0) && (res != -EWOULDBLOCK))
2630  			goto exit;
2631  
2632  		/* Just entered TIPC_CONNECTING state; the only
2633  		 * difference is that the return value in the non-blocking
2634  		 * case is EINPROGRESS, rather than EALREADY.
2635  		 */
2636  		res = -EINPROGRESS;
2637  		fallthrough;
2638  	case TIPC_CONNECTING:
2639  		if (!timeout) {
2640  			if (previous == TIPC_CONNECTING)
2641  				res = -EALREADY;
2642  			goto exit;
2643  		}
2644  		timeout = msecs_to_jiffies(timeout);
2645  		/* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
2646  		res = tipc_wait_for_connect(sock, &timeout);
2647  		break;
2648  	case TIPC_ESTABLISHED:
2649  		res = -EISCONN;
2650  		break;
2651  	default:
2652  		res = -EINVAL;
2653  	}
2654  
2655  exit:
2656  	release_sock(sk);
2657  	return res;
2658  }
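
/* Example (illustrative userspace sketch, not part of this file): a
 * blocking connect to a service address; service type 18888 and
 * instance 17 are made-up values, and TIPC_CONN_TIMEOUT is given in
 * milliseconds:
 *
 *	struct sockaddr_tipc peer = {
 *		.family = AF_TIPC,
 *		.addrtype = TIPC_SERVICE_ADDR,
 *		.addr.name.name.type = 18888,
 *		.addr.name.name.instance = 17,
 *	};
 *	int sd = socket(AF_TIPC, SOCK_STREAM, 0);
 *	__u32 tmo = 10000;
 *
 *	setsockopt(sd, SOL_TIPC, TIPC_CONN_TIMEOUT, &tmo, sizeof(tmo));
 *	if (connect(sd, (struct sockaddr *)&peer, sizeof(peer)) < 0)
 *		perror("connect");
 */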
2659  
2660  /**
2661   * tipc_listen - allow socket to listen for incoming connections
2662   * @sock: socket structure
2663   * @len: (unused)
2664   *
2665   * Return: 0 on success, errno otherwise
2666   */
2667  static int tipc_listen(struct socket *sock, int len)
2668  {
2669  	struct sock *sk = sock->sk;
2670  	int res;
2671  
2672  	lock_sock(sk);
2673  	res = tipc_set_sk_state(sk, TIPC_LISTEN);
2674  	release_sock(sk);
2675  
2676  	return res;
2677  }
2678  
2679  static int tipc_wait_for_accept(struct socket *sock, long timeo)
2680  {
2681  	struct sock *sk = sock->sk;
2682  	DEFINE_WAIT_FUNC(wait, woken_wake_function);
2683  	int err;
2684  
2685  	/* True wake-one mechanism for incoming connections: only
2686  	 * one process gets woken up, not the 'whole herd'.
2687  	 * Since we do not 'race & poll' for established sockets
2688  	 * anymore, the common case will execute the loop only once.
2689  	 */
2690  	for (;;) {
2691  		if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
2692  			add_wait_queue(sk_sleep(sk), &wait);
2693  			release_sock(sk);
2694  			timeo = wait_woken(&wait, TASK_INTERRUPTIBLE, timeo);
2695  			lock_sock(sk);
2696  			remove_wait_queue(sk_sleep(sk), &wait);
2697  		}
2698  		err = 0;
2699  		if (!skb_queue_empty(&sk->sk_receive_queue))
2700  			break;
2701  		err = -EAGAIN;
2702  		if (!timeo)
2703  			break;
2704  		err = sock_intr_errno(timeo);
2705  		if (signal_pending(current))
2706  			break;
2707  	}
2708  	return err;
2709  }
2710  
2711  /**
2712   * tipc_accept - wait for connection request
2713   * @sock: listening socket
2714   * @new_sock: new socket that is to be connected
2715   * @flags: file-related flags associated with socket
2716   * @kern: caused by kernel or by userspace?
2717   *
2718   * Return: 0 on success, errno otherwise
2719   */
2720  static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags,
2721  		       bool kern)
2722  {
2723  	struct sock *new_sk, *sk = sock->sk;
2724  	struct tipc_sock *new_tsock;
2725  	struct msghdr m = {NULL,};
2726  	struct tipc_msg *msg;
2727  	struct sk_buff *buf;
2728  	long timeo;
2729  	int res;
2730  
2731  	lock_sock(sk);
2732  
2733  	if (sk->sk_state != TIPC_LISTEN) {
2734  		res = -EINVAL;
2735  		goto exit;
2736  	}
2737  	timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
2738  	res = tipc_wait_for_accept(sock, timeo);
2739  	if (res)
2740  		goto exit;
2741  
2742  	buf = skb_peek(&sk->sk_receive_queue);
2743  
2744  	res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, kern);
2745  	if (res)
2746  		goto exit;
2747  	security_sk_clone(sock->sk, new_sock->sk);
2748  
2749  	new_sk = new_sock->sk;
2750  	new_tsock = tipc_sk(new_sk);
2751  	msg = buf_msg(buf);
2752  
2753  	/* we lock on new_sk; but lockdep sees the lock on sk */
2754  	lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
2755  
2756  	/*
2757  	 * Reject any stray messages received by new socket
2758  	 * before the socket lock was taken (very, very unlikely)
2759  	 */
2760  	tsk_rej_rx_queue(new_sk, TIPC_ERR_NO_PORT);
2761  
2762  	/* Connect new socket to its peer */
2763  	tipc_sk_finish_conn(new_tsock, msg_origport(msg), msg_orignode(msg));
2764  
2765  	tsk_set_importance(new_sk, msg_importance(msg));
2766  	if (msg_named(msg)) {
2767  		new_tsock->conn_addrtype = TIPC_SERVICE_ADDR;
2768  		msg_set_nametype(&new_tsock->phdr, msg_nametype(msg));
2769  		msg_set_nameinst(&new_tsock->phdr, msg_nameinst(msg));
2770  	}
2771  
2772  	/*
2773  	 * Respond to 'SYN-' by discarding it & returning 'ACK'.
2774  	 * Respond to 'SYN+' by queuing it on new socket & returning 'ACK'.
2775  	 */
2776  	if (!msg_data_sz(msg)) {
2777  		tsk_advance_rx_queue(sk);
2778  	} else {
2779  		__skb_dequeue(&sk->sk_receive_queue);
2780  		__skb_queue_head(&new_sk->sk_receive_queue, buf);
2781  		skb_set_owner_r(buf, new_sk);
2782  	}
2783  	iov_iter_kvec(&m.msg_iter, ITER_SOURCE, NULL, 0, 0);
2784  	__tipc_sendstream(new_sock, &m, 0);
2785  	release_sock(new_sk);
2786  exit:
2787  	release_sock(sk);
2788  	return res;
2789  }
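
/* Example (illustrative userspace sketch, not part of this file): the
 * accepting side of the handshake above. The backlog argument is ignored
 * by tipc_listen(), and the bound service values are made up:
 *
 *	struct sockaddr_tipc srv = {
 *		.family = AF_TIPC,
 *		.addrtype = TIPC_SERVICE_ADDR,
 *		.scope = TIPC_CLUSTER_SCOPE,
 *		.addr.name.name.type = 18888,
 *		.addr.name.name.instance = 17,
 *	};
 *	int lsd = socket(AF_TIPC, SOCK_SEQPACKET, 0);
 *	int sd;
 *
 *	bind(lsd, (struct sockaddr *)&srv, sizeof(srv));
 *	listen(lsd, 0);
 *	sd = accept(lsd, NULL, NULL);
 */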
2790  
2791  /**
2792   * tipc_shutdown - shutdown socket connection
2793   * @sock: socket structure
2794   * @how: direction to close (must be SHUT_RDWR)
2795   *
2796   * Terminates connection (if necessary), then purges socket's receive queue.
2797   *
2798   * Return: 0 on success, errno otherwise
2799   */
2800  static int tipc_shutdown(struct socket *sock, int how)
2801  {
2802  	struct sock *sk = sock->sk;
2803  	int res;
2804  
2805  	if (how != SHUT_RDWR)
2806  		return -EINVAL;
2807  
2808  	lock_sock(sk);
2809  
2810  	trace_tipc_sk_shutdown(sk, NULL, TIPC_DUMP_ALL, " ");
2811  	__tipc_shutdown(sock, TIPC_CONN_SHUTDOWN);
2812  	sk->sk_shutdown = SHUTDOWN_MASK;
2813  
2814  	if (sk->sk_state == TIPC_DISCONNECTING) {
2815  		/* Discard any unreceived messages */
2816  		__skb_queue_purge(&sk->sk_receive_queue);
2817  
2818  		res = 0;
2819  	} else {
2820  		res = -ENOTCONN;
2821  	}
2822  	/* Wake up anyone sleeping in poll. */
2823  	sk->sk_state_change(sk);
2824  
2825  	release_sock(sk);
2826  	return res;
2827  }
2828  
2829  static void tipc_sk_check_probing_state(struct sock *sk,
2830  					struct sk_buff_head *list)
2831  {
2832  	struct tipc_sock *tsk = tipc_sk(sk);
2833  	u32 pnode = tsk_peer_node(tsk);
2834  	u32 pport = tsk_peer_port(tsk);
2835  	u32 self = tsk_own_node(tsk);
2836  	u32 oport = tsk->portid;
2837  	struct sk_buff *skb;
2838  
2839  	if (tsk->probe_unacked) {
2840  		tipc_set_sk_state(sk, TIPC_DISCONNECTING);
2841  		sk->sk_err = ECONNABORTED;
2842  		tipc_node_remove_conn(sock_net(sk), pnode, pport);
2843  		sk->sk_state_change(sk);
2844  		return;
2845  	}
2846  	/* Prepare new probe */
2847  	skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE, INT_H_SIZE, 0,
2848  			      pnode, self, pport, oport, TIPC_OK);
2849  	if (skb)
2850  		__skb_queue_tail(list, skb);
2851  	tsk->probe_unacked = true;
2852  	sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTV);
2853  }
2854  
2855  static void tipc_sk_retry_connect(struct sock *sk, struct sk_buff_head *list)
2856  {
2857  	struct tipc_sock *tsk = tipc_sk(sk);
2858  
2859  	/* Try again later if dest link is congested */
2860  	if (tsk->cong_link_cnt) {
2861  		sk_reset_timer(sk, &sk->sk_timer,
2862  			       jiffies + msecs_to_jiffies(100));
2863  		return;
2864  	}
2865  	/* Prepare SYN for retransmit */
2866  	tipc_msg_skb_clone(&sk->sk_write_queue, list);
2867  }
2868  
2869  static void tipc_sk_timeout(struct timer_list *t)
2870  {
2871  	struct sock *sk = from_timer(sk, t, sk_timer);
2872  	struct tipc_sock *tsk = tipc_sk(sk);
2873  	u32 pnode = tsk_peer_node(tsk);
2874  	struct sk_buff_head list;
2875  	int rc = 0;
2876  
2877  	__skb_queue_head_init(&list);
2878  	bh_lock_sock(sk);
2879  
2880  	/* Try again later if socket is busy */
2881  	if (sock_owned_by_user(sk)) {
2882  		sk_reset_timer(sk, &sk->sk_timer, jiffies + HZ / 20);
2883  		bh_unlock_sock(sk);
2884  		sock_put(sk);
2885  		return;
2886  	}
2887  
2888  	if (sk->sk_state == TIPC_ESTABLISHED)
2889  		tipc_sk_check_probing_state(sk, &list);
2890  	else if (sk->sk_state == TIPC_CONNECTING)
2891  		tipc_sk_retry_connect(sk, &list);
2892  
2893  	bh_unlock_sock(sk);
2894  
2895  	if (!skb_queue_empty(&list))
2896  		rc = tipc_node_xmit(sock_net(sk), &list, pnode, tsk->portid);
2897  
2898  	/* SYN messages may cause link congestion */
2899  	if (rc == -ELINKCONG) {
2900  		tipc_dest_push(&tsk->cong_links, pnode, 0);
2901  		tsk->cong_link_cnt = 1;
2902  	}
2903  	sock_put(sk);
2904  }
2905  
2906  static int tipc_sk_publish(struct tipc_sock *tsk, struct tipc_uaddr *ua)
2907  {
2908  	struct sock *sk = &tsk->sk;
2909  	struct net *net = sock_net(sk);
2910  	struct tipc_socket_addr skaddr;
2911  	struct publication *p;
2912  	u32 key;
2913  
2914  	if (tipc_sk_connected(sk))
2915  		return -EINVAL;
2916  	key = tsk->portid + tsk->pub_count + 1;
2917  	if (key == tsk->portid)
2918  		return -EADDRINUSE;
2919  	skaddr.ref = tsk->portid;
2920  	skaddr.node = tipc_own_addr(net);
2921  	p = tipc_nametbl_publish(net, ua, &skaddr, key);
2922  	if (unlikely(!p))
2923  		return -EINVAL;
2924  
2925  	list_add(&p->binding_sock, &tsk->publications);
2926  	tsk->pub_count++;
2927  	tsk->published = true;
2928  	return 0;
2929  }
2930  
2931  static int tipc_sk_withdraw(struct tipc_sock *tsk, struct tipc_uaddr *ua)
2932  {
2933  	struct net *net = sock_net(&tsk->sk);
2934  	struct publication *safe, *p;
2935  	struct tipc_uaddr _ua;
2936  	int rc = -EINVAL;
2937  
2938  	list_for_each_entry_safe(p, safe, &tsk->publications, binding_sock) {
2939  		if (!ua) {
2940  			tipc_uaddr(&_ua, TIPC_SERVICE_RANGE, p->scope,
2941  				   p->sr.type, p->sr.lower, p->sr.upper);
2942  			tipc_nametbl_withdraw(net, &_ua, &p->sk, p->key);
2943  			continue;
2944  		}
2945  		/* Unbind specific publication */
2946  		if (p->scope != ua->scope)
2947  			continue;
2948  		if (p->sr.type != ua->sr.type)
2949  			continue;
2950  		if (p->sr.lower != ua->sr.lower)
2951  			continue;
2952  		if (p->sr.upper != ua->sr.upper)
2953  			break;
2954  		tipc_nametbl_withdraw(net, ua, &p->sk, p->key);
2955  		rc = 0;
2956  		break;
2957  	}
2958  	if (list_empty(&tsk->publications)) {
2959  		tsk->published = 0;
2960  		rc = 0;
2961  	}
2962  	return rc;
2963  }
2964  
2965  /* tipc_sk_reinit: set non-zero address in all existing sockets
2966   *                 when we go from standalone to network mode.
2967   */
2968  void tipc_sk_reinit(struct net *net)
2969  {
2970  	struct tipc_net *tn = net_generic(net, tipc_net_id);
2971  	struct rhashtable_iter iter;
2972  	struct tipc_sock *tsk;
2973  	struct tipc_msg *msg;
2974  
2975  	rhashtable_walk_enter(&tn->sk_rht, &iter);
2976  
2977  	do {
2978  		rhashtable_walk_start(&iter);
2979  
2980  		while ((tsk = rhashtable_walk_next(&iter)) && !IS_ERR(tsk)) {
2981  			sock_hold(&tsk->sk);
2982  			rhashtable_walk_stop(&iter);
2983  			lock_sock(&tsk->sk);
2984  			msg = &tsk->phdr;
2985  			msg_set_prevnode(msg, tipc_own_addr(net));
2986  			msg_set_orignode(msg, tipc_own_addr(net));
2987  			release_sock(&tsk->sk);
2988  			rhashtable_walk_start(&iter);
2989  			sock_put(&tsk->sk);
2990  		}
2991  
2992  		rhashtable_walk_stop(&iter);
2993  	} while (tsk == ERR_PTR(-EAGAIN));
2994  
2995  	rhashtable_walk_exit(&iter);
2996  }
2997  
2998  static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid)
2999  {
3000  	struct tipc_net *tn = net_generic(net, tipc_net_id);
3001  	struct tipc_sock *tsk;
3002  
3003  	rcu_read_lock();
3004  	tsk = rhashtable_lookup(&tn->sk_rht, &portid, tsk_rht_params);
3005  	if (tsk)
3006  		sock_hold(&tsk->sk);
3007  	rcu_read_unlock();
3008  
3009  	return tsk;
3010  }
3011  
3012  static int tipc_sk_insert(struct tipc_sock *tsk)
3013  {
3014  	struct sock *sk = &tsk->sk;
3015  	struct net *net = sock_net(sk);
3016  	struct tipc_net *tn = net_generic(net, tipc_net_id);
3017  	u32 remaining = (TIPC_MAX_PORT - TIPC_MIN_PORT) + 1;
3018  	u32 portid = get_random_u32_below(remaining) + TIPC_MIN_PORT;
3019  
3020  	while (remaining--) {
3021  		portid++;
3022  		if ((portid < TIPC_MIN_PORT) || (portid > TIPC_MAX_PORT))
3023  			portid = TIPC_MIN_PORT;
3024  		tsk->portid = portid;
3025  		sock_hold(&tsk->sk);
3026  		if (!rhashtable_lookup_insert_fast(&tn->sk_rht, &tsk->node,
3027  						   tsk_rht_params))
3028  			return 0;
3029  		sock_put(&tsk->sk);
3030  	}
3031  
3032  	return -1;
3033  }
3034  
3035  static void tipc_sk_remove(struct tipc_sock *tsk)
3036  {
3037  	struct sock *sk = &tsk->sk;
3038  	struct tipc_net *tn = net_generic(sock_net(sk), tipc_net_id);
3039  
3040  	if (!rhashtable_remove_fast(&tn->sk_rht, &tsk->node, tsk_rht_params)) {
3041  		WARN_ON(refcount_read(&sk->sk_refcnt) == 1);
3042  		__sock_put(sk);
3043  	}
3044  }
3045  
3046  static const struct rhashtable_params tsk_rht_params = {
3047  	.nelem_hint = 192,
3048  	.head_offset = offsetof(struct tipc_sock, node),
3049  	.key_offset = offsetof(struct tipc_sock, portid),
3050  	.key_len = sizeof(u32), /* portid */
3051  	.max_size = 1048576,
3052  	.min_size = 256,
3053  	.automatic_shrinking = true,
3054  };
3055  
3056  int tipc_sk_rht_init(struct net *net)
3057  {
3058  	struct tipc_net *tn = net_generic(net, tipc_net_id);
3059  
3060  	return rhashtable_init(&tn->sk_rht, &tsk_rht_params);
3061  }
3062  
3063  void tipc_sk_rht_destroy(struct net *net)
3064  {
3065  	struct tipc_net *tn = net_generic(net, tipc_net_id);
3066  
3067  	/* Wait for socket readers to complete */
3068  	synchronize_net();
3069  
3070  	rhashtable_destroy(&tn->sk_rht);
3071  }
3072  
3073  static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq)
3074  {
3075  	struct net *net = sock_net(&tsk->sk);
3076  	struct tipc_group *grp = tsk->group;
3077  	struct tipc_msg *hdr = &tsk->phdr;
3078  	struct tipc_uaddr ua;
3079  	int rc;
3080  
3081  	if (mreq->type < TIPC_RESERVED_TYPES)
3082  		return -EACCES;
3083  	if (mreq->scope > TIPC_NODE_SCOPE)
3084  		return -EINVAL;
3085  	if (mreq->scope != TIPC_NODE_SCOPE)
3086  		mreq->scope = TIPC_CLUSTER_SCOPE;
3087  	if (grp)
3088  		return -EACCES;
3089  	grp = tipc_group_create(net, tsk->portid, mreq, &tsk->group_is_open);
3090  	if (!grp)
3091  		return -ENOMEM;
3092  	tsk->group = grp;
3093  	msg_set_lookup_scope(hdr, mreq->scope);
3094  	msg_set_nametype(hdr, mreq->type);
3095  	msg_set_dest_droppable(hdr, true);
3096  	tipc_uaddr(&ua, TIPC_SERVICE_RANGE, mreq->scope,
3097  		   mreq->type, mreq->instance, mreq->instance);
3098  	tipc_nametbl_build_group(net, grp, &ua);
3099  	rc = tipc_sk_publish(tsk, &ua);
3100  	if (rc) {
3101  		tipc_group_delete(net, grp);
3102  		tsk->group = NULL;
3103  		return rc;
3104  	}
3105  	/* Eliminate any risk that a broadcast overtakes sent JOINs */
3106  	tsk->mc_method.rcast = true;
3107  	tsk->mc_method.mandatory = true;
3108  	tipc_group_join(net, grp, &tsk->sk.sk_rcvbuf);
3109  	return rc;
3110  }
3111  
3112  static int tipc_sk_leave(struct tipc_sock *tsk)
3113  {
3114  	struct net *net = sock_net(&tsk->sk);
3115  	struct tipc_group *grp = tsk->group;
3116  	struct tipc_uaddr ua;
3117  	int scope;
3118  
3119  	if (!grp)
3120  		return -EINVAL;
3121  	ua.addrtype = TIPC_SERVICE_RANGE;
3122  	tipc_group_self(grp, &ua.sr, &scope);
3123  	ua.scope = scope;
3124  	tipc_group_delete(net, grp);
3125  	tsk->group = NULL;
3126  	tipc_sk_withdraw(tsk, &ua);
3127  	return 0;
3128  }
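
/* Example (illustrative userspace sketch, not part of this file): joining
 * and leaving a communication group; group type 4711 and member instance
 * 37 are made-up values, and 'sd' is assumed:
 *
 *	struct tipc_group_req req = {
 *		.type = 4711,
 *		.instance = 37,
 *		.scope = TIPC_CLUSTER_SCOPE,
 *		.flags = TIPC_GROUP_LOOPBACK,
 *	};
 *
 *	setsockopt(sd, SOL_TIPC, TIPC_GROUP_JOIN, &req, sizeof(req));
 *	...
 *	setsockopt(sd, SOL_TIPC, TIPC_GROUP_LEAVE, NULL, 0);
 */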
3129  
3130  /**
3131   * tipc_setsockopt - set socket option
3132   * @sock: socket structure
3133   * @lvl: option level
3134   * @opt: option identifier
3135   * @ov: pointer to new option value
3136   * @ol: length of option value
3137   *
3138   * For stream sockets only, accepts and ignores all IPPROTO_TCP options
3139   * (to ease compatibility).
3140   *
3141   * Return: 0 on success, errno otherwise
3142   */
3143  static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
3144  			   sockptr_t ov, unsigned int ol)
3145  {
3146  	struct sock *sk = sock->sk;
3147  	struct tipc_sock *tsk = tipc_sk(sk);
3148  	struct tipc_group_req mreq;
3149  	u32 value = 0;
3150  	int res = 0;
3151  
3152  	if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
3153  		return 0;
3154  	if (lvl != SOL_TIPC)
3155  		return -ENOPROTOOPT;
3156  
3157  	switch (opt) {
3158  	case TIPC_IMPORTANCE:
3159  	case TIPC_SRC_DROPPABLE:
3160  	case TIPC_DEST_DROPPABLE:
3161  	case TIPC_CONN_TIMEOUT:
3162  	case TIPC_NODELAY:
3163  		if (ol < sizeof(value))
3164  			return -EINVAL;
3165  		if (copy_from_sockptr(&value, ov, sizeof(u32)))
3166  			return -EFAULT;
3167  		break;
3168  	case TIPC_GROUP_JOIN:
3169  		if (ol < sizeof(mreq))
3170  			return -EINVAL;
3171  		if (copy_from_sockptr(&mreq, ov, sizeof(mreq)))
3172  			return -EFAULT;
3173  		break;
3174  	default:
3175  		if (!sockptr_is_null(ov) || ol)
3176  			return -EINVAL;
3177  	}
3178  
3179  	lock_sock(sk);
3180  
3181  	switch (opt) {
3182  	case TIPC_IMPORTANCE:
3183  		res = tsk_set_importance(sk, value);
3184  		break;
3185  	case TIPC_SRC_DROPPABLE:
3186  		if (sock->type != SOCK_STREAM)
3187  			tsk_set_unreliable(tsk, value);
3188  		else
3189  			res = -ENOPROTOOPT;
3190  		break;
3191  	case TIPC_DEST_DROPPABLE:
3192  		tsk_set_unreturnable(tsk, value);
3193  		break;
3194  	case TIPC_CONN_TIMEOUT:
3195  		tipc_sk(sk)->conn_timeout = value;
3196  		break;
3197  	case TIPC_MCAST_BROADCAST:
3198  		tsk->mc_method.rcast = false;
3199  		tsk->mc_method.mandatory = true;
3200  		break;
3201  	case TIPC_MCAST_REPLICAST:
3202  		tsk->mc_method.rcast = true;
3203  		tsk->mc_method.mandatory = true;
3204  		break;
3205  	case TIPC_GROUP_JOIN:
3206  		res = tipc_sk_join(tsk, &mreq);
3207  		break;
3208  	case TIPC_GROUP_LEAVE:
3209  		res = tipc_sk_leave(tsk);
3210  		break;
3211  	case TIPC_NODELAY:
3212  		tsk->nodelay = !!value;
3213  		tsk_set_nagle(tsk);
3214  		break;
3215  	default:
3216  		res = -EINVAL;
3217  	}
3218  
3219  	release_sock(sk);
3220  
3221  	return res;
3222  }
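
/* Example (illustrative userspace sketch, not part of this file): setting
 * two of the options handled above on an assumed SOCK_STREAM socket 'sd':
 *
 *	__u32 imp = TIPC_HIGH_IMPORTANCE;
 *	__u32 on = 1;
 *
 *	setsockopt(sd, SOL_TIPC, TIPC_IMPORTANCE, &imp, sizeof(imp));
 *	setsockopt(sd, SOL_TIPC, TIPC_NODELAY, &on, sizeof(on));
 */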
3223  
3224  /**
3225   * tipc_getsockopt - get socket option
3226   * @sock: socket structure
3227   * @lvl: option level
3228   * @opt: option identifier
3229   * @ov: receptacle for option value
3230   * @ol: receptacle for length of option value
3231   *
3232   * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
3233   * (to ease compatibility).
3234   *
3235   * Return: 0 on success, errno otherwise
3236   */
3237  static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
3238  			   char __user *ov, int __user *ol)
3239  {
3240  	struct sock *sk = sock->sk;
3241  	struct tipc_sock *tsk = tipc_sk(sk);
3242  	struct tipc_service_range seq;
3243  	int len, scope;
3244  	u32 value;
3245  	int res;
3246  
3247  	if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
3248  		return put_user(0, ol);
3249  	if (lvl != SOL_TIPC)
3250  		return -ENOPROTOOPT;
3251  	res = get_user(len, ol);
3252  	if (res)
3253  		return res;
3254  
3255  	lock_sock(sk);
3256  
3257  	switch (opt) {
3258  	case TIPC_IMPORTANCE:
3259  		value = tsk_importance(tsk);
3260  		break;
3261  	case TIPC_SRC_DROPPABLE:
3262  		value = tsk_unreliable(tsk);
3263  		break;
3264  	case TIPC_DEST_DROPPABLE:
3265  		value = tsk_unreturnable(tsk);
3266  		break;
3267  	case TIPC_CONN_TIMEOUT:
3268  		value = tsk->conn_timeout;
3269  		/* no need to set "res", since already 0 at this point */
3270  		break;
3271  	case TIPC_NODE_RECVQ_DEPTH:
3272  		value = 0; /* was tipc_queue_size, now obsolete */
3273  		break;
3274  	case TIPC_SOCK_RECVQ_DEPTH:
3275  		value = skb_queue_len(&sk->sk_receive_queue);
3276  		break;
3277  	case TIPC_SOCK_RECVQ_USED:
3278  		value = sk_rmem_alloc_get(sk);
3279  		break;
3280  	case TIPC_GROUP_JOIN:
3281  		seq.type = 0;
3282  		if (tsk->group)
3283  			tipc_group_self(tsk->group, &seq, &scope);
3284  		value = seq.type;
3285  		break;
3286  	default:
3287  		res = -EINVAL;
3288  	}
3289  
3290  	release_sock(sk);
3291  
3292  	if (res)
3293  		return res;	/* "get" failed */
3294  
3295  	if (len < sizeof(value))
3296  		return -EINVAL;
3297  
3298  	if (copy_to_user(ov, &value, sizeof(value)))
3299  		return -EFAULT;
3300  
3301  	return put_user(sizeof(value), ol);
3302  }
3303  
3304  static int tipc_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
3305  {
3306  	struct net *net = sock_net(sock->sk);
3307  	struct tipc_sioc_nodeid_req nr = {0};
3308  	struct tipc_sioc_ln_req lnr;
3309  	void __user *argp = (void __user *)arg;
3310  
3311  	switch (cmd) {
3312  	case SIOCGETLINKNAME:
3313  		if (copy_from_user(&lnr, argp, sizeof(lnr)))
3314  			return -EFAULT;
3315  		if (!tipc_node_get_linkname(net,
3316  					    lnr.bearer_id & 0xffff, lnr.peer,
3317  					    lnr.linkname, TIPC_MAX_LINK_NAME)) {
3318  			if (copy_to_user(argp, &lnr, sizeof(lnr)))
3319  				return -EFAULT;
3320  			return 0;
3321  		}
3322  		return -EADDRNOTAVAIL;
3323  	case SIOCGETNODEID:
3324  		if (copy_from_user(&nr, argp, sizeof(nr)))
3325  			return -EFAULT;
3326  		if (!tipc_node_get_id(net, nr.peer, nr.node_id))
3327  			return -EADDRNOTAVAIL;
3328  		if (copy_to_user(argp, &nr, sizeof(nr)))
3329  			return -EFAULT;
3330  		return 0;
3331  	default:
3332  		return -ENOIOCTLCMD;
3333  	}
3334  }
3335  
3336  static int tipc_socketpair(struct socket *sock1, struct socket *sock2)
3337  {
3338  	struct tipc_sock *tsk2 = tipc_sk(sock2->sk);
3339  	struct tipc_sock *tsk1 = tipc_sk(sock1->sk);
3340  	u32 onode = tipc_own_addr(sock_net(sock1->sk));
3341  
3342  	tsk1->peer.family = AF_TIPC;
3343  	tsk1->peer.addrtype = TIPC_SOCKET_ADDR;
3344  	tsk1->peer.scope = TIPC_NODE_SCOPE;
3345  	tsk1->peer.addr.id.ref = tsk2->portid;
3346  	tsk1->peer.addr.id.node = onode;
3347  	tsk2->peer.family = AF_TIPC;
3348  	tsk2->peer.addrtype = TIPC_SOCKET_ADDR;
3349  	tsk2->peer.scope = TIPC_NODE_SCOPE;
3350  	tsk2->peer.addr.id.ref = tsk1->portid;
3351  	tsk2->peer.addr.id.node = onode;
3352  
3353  	tipc_sk_finish_conn(tsk1, tsk2->portid, onode);
3354  	tipc_sk_finish_conn(tsk2, tsk1->portid, onode);
3355  	return 0;
3356  }
3357  
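/* Example (userspace sketch): the handler above backs socketpair(2) for
 * AF_TIPC by cross-connecting the two sockets on the own node:
 *
 *	int sv[2];
 *
 *	if (!socketpair(AF_TIPC, SOCK_SEQPACKET, 0, sv))
 *		... sv[0] and sv[1] now form a node-local connection ...
 */
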
3358  /* Protocol switches for the various types of TIPC sockets */
3359  
3360  static const struct proto_ops msg_ops = {
3361  	.owner		= THIS_MODULE,
3362  	.family		= AF_TIPC,
3363  	.release	= tipc_release,
3364  	.bind		= tipc_bind,
3365  	.connect	= tipc_connect,
3366  	.socketpair	= tipc_socketpair,
3367  	.accept		= sock_no_accept,
3368  	.getname	= tipc_getname,
3369  	.poll		= tipc_poll,
3370  	.ioctl		= tipc_ioctl,
3371  	.listen		= sock_no_listen,
3372  	.shutdown	= tipc_shutdown,
3373  	.setsockopt	= tipc_setsockopt,
3374  	.getsockopt	= tipc_getsockopt,
3375  	.sendmsg	= tipc_sendmsg,
3376  	.recvmsg	= tipc_recvmsg,
3377  	.mmap		= sock_no_mmap,
3378  };
3379  
3380  static const struct proto_ops packet_ops = {
3381  	.owner		= THIS_MODULE,
3382  	.family		= AF_TIPC,
3383  	.release	= tipc_release,
3384  	.bind		= tipc_bind,
3385  	.connect	= tipc_connect,
3386  	.socketpair	= tipc_socketpair,
3387  	.accept		= tipc_accept,
3388  	.getname	= tipc_getname,
3389  	.poll		= tipc_poll,
3390  	.ioctl		= tipc_ioctl,
3391  	.listen		= tipc_listen,
3392  	.shutdown	= tipc_shutdown,
3393  	.setsockopt	= tipc_setsockopt,
3394  	.getsockopt	= tipc_getsockopt,
3395  	.sendmsg	= tipc_send_packet,
3396  	.recvmsg	= tipc_recvmsg,
3397  	.mmap		= sock_no_mmap,
3398  };
3399  
3400  static const struct proto_ops stream_ops = {
3401  	.owner		= THIS_MODULE,
3402  	.family		= AF_TIPC,
3403  	.release	= tipc_release,
3404  	.bind		= tipc_bind,
3405  	.connect	= tipc_connect,
3406  	.socketpair	= tipc_socketpair,
3407  	.accept		= tipc_accept,
3408  	.getname	= tipc_getname,
3409  	.poll		= tipc_poll,
3410  	.ioctl		= tipc_ioctl,
3411  	.listen		= tipc_listen,
3412  	.shutdown	= tipc_shutdown,
3413  	.setsockopt	= tipc_setsockopt,
3414  	.getsockopt	= tipc_getsockopt,
3415  	.sendmsg	= tipc_sendstream,
3416  	.recvmsg	= tipc_recvstream,
3417  	.mmap		= sock_no_mmap,
3418  };
3419  
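/* The socket type chosen at socket(2) time selects one of the ops tables
 * above (mapping per tipc_sk_create() earlier in this file):
 *
 *	socket(AF_TIPC, SOCK_RDM, 0);        .. msg_ops
 *	socket(AF_TIPC, SOCK_DGRAM, 0);      .. msg_ops
 *	socket(AF_TIPC, SOCK_SEQPACKET, 0);  .. packet_ops
 *	socket(AF_TIPC, SOCK_STREAM, 0);     .. stream_ops
 */
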
3420  static const struct net_proto_family tipc_family_ops = {
3421  	.owner		= THIS_MODULE,
3422  	.family		= AF_TIPC,
3423  	.create		= tipc_sk_create
3424  };
3425  
3426  static struct proto tipc_proto = {
3427  	.name		= "TIPC",
3428  	.owner		= THIS_MODULE,
3429  	.obj_size	= sizeof(struct tipc_sock),
3430  	.sysctl_rmem	= sysctl_tipc_rmem
3431  };
3432  
3433  /**
3434   * tipc_socket_init - initialize TIPC socket interface
3435   *
3436   * Return: 0 on success, errno otherwise
3437   */
3438  int tipc_socket_init(void)
3439  {
3440  	int res;
3441  
3442  	res = proto_register(&tipc_proto, 1);
3443  	if (res) {
3444  		pr_err("Failed to register TIPC protocol type\n");
3445  		goto out;
3446  	}
3447  
3448  	res = sock_register(&tipc_family_ops);
3449  	if (res) {
3450  		pr_err("Failed to register TIPC socket type\n");
3451  		proto_unregister(&tipc_proto);
3452  		goto out;
3453  	}
3454   out:
3455  	return res;
3456  }
3457  
3458  /**
3459   * tipc_socket_stop - stop TIPC socket interface
3460   */
3461  void tipc_socket_stop(void)
3462  {
3463  	sock_unregister(tipc_family_ops.family);
3464  	proto_unregister(&tipc_proto);
3465  }
3466  
3467  /* Caller should hold socket lock for the passed tipc socket. */
3468  static int __tipc_nl_add_sk_con(struct sk_buff *skb, struct tipc_sock *tsk)
3469  {
3470  	u32 peer_node, peer_port;
3471  	u32 conn_type, conn_instance;
3472  	struct nlattr *nest;
3473  
3474  	peer_node = tsk_peer_node(tsk);
3475  	peer_port = tsk_peer_port(tsk);
3476  	conn_type = msg_nametype(&tsk->phdr);
3477  	conn_instance = msg_nameinst(&tsk->phdr);
3478  	nest = nla_nest_start_noflag(skb, TIPC_NLA_SOCK_CON);
3479  	if (!nest)
3480  		return -EMSGSIZE;
3481  
3482  	if (nla_put_u32(skb, TIPC_NLA_CON_NODE, peer_node))
3483  		goto msg_full;
3484  	if (nla_put_u32(skb, TIPC_NLA_CON_SOCK, peer_port))
3485  		goto msg_full;
3486  
3487  	if (tsk->conn_addrtype != 0) {
3488  		if (nla_put_flag(skb, TIPC_NLA_CON_FLAG))
3489  			goto msg_full;
3490  		if (nla_put_u32(skb, TIPC_NLA_CON_TYPE, conn_type))
3491  			goto msg_full;
3492  		if (nla_put_u32(skb, TIPC_NLA_CON_INST, conn_instance))
3493  			goto msg_full;
3494  	}
3495  	nla_nest_end(skb, nest);
3496  
3497  	return 0;
3498  
3499  msg_full:
3500  	nla_nest_cancel(skb, nest);
3501  
3502  	return -EMSGSIZE;
3503  }
3504  
3505  static int __tipc_nl_add_sk_info(struct sk_buff *skb, struct tipc_sock
3506  			  *tsk)
3507  {
3508  	struct net *net = sock_net(skb->sk);
3509  	struct sock *sk = &tsk->sk;
3510  
3511  	if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->portid) ||
3512  	    nla_put_u32(skb, TIPC_NLA_SOCK_ADDR, tipc_own_addr(net)))
3513  		return -EMSGSIZE;
3514  
3515  	if (tipc_sk_connected(sk)) {
3516  		if (__tipc_nl_add_sk_con(skb, tsk))
3517  			return -EMSGSIZE;
3518  	} else if (!list_empty(&tsk->publications)) {
3519  		if (nla_put_flag(skb, TIPC_NLA_SOCK_HAS_PUBL))
3520  			return -EMSGSIZE;
3521  	}
3522  	return 0;
3523  }
3524  
3525  /* Caller should hold socket lock for the passed tipc socket. */
3526  static int __tipc_nl_add_sk(struct sk_buff *skb, struct netlink_callback *cb,
3527  			    struct tipc_sock *tsk)
3528  {
3529  	struct nlattr *attrs;
3530  	void *hdr;
3531  
3532  	hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
3533  			  &tipc_genl_family, NLM_F_MULTI, TIPC_NL_SOCK_GET);
3534  	if (!hdr)
3535  		goto msg_cancel;
3536  
3537  	attrs = nla_nest_start_noflag(skb, TIPC_NLA_SOCK);
3538  	if (!attrs)
3539  		goto genlmsg_cancel;
3540  
3541  	if (__tipc_nl_add_sk_info(skb, tsk))
3542  		goto attr_msg_cancel;
3543  
3544  	nla_nest_end(skb, attrs);
3545  	genlmsg_end(skb, hdr);
3546  
3547  	return 0;
3548  
3549  attr_msg_cancel:
3550  	nla_nest_cancel(skb, attrs);
3551  genlmsg_cancel:
3552  	genlmsg_cancel(skb, hdr);
3553  msg_cancel:
3554  	return -EMSGSIZE;
3555  }
3556  
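/* Resumable walk over the socket hash table: a reference is taken and
 * the walk paused before lock_sock(), since the RCU-protected rhashtable
 * walk must not be held across a sleeping lock.
 */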
3557  int tipc_nl_sk_walk(struct sk_buff *skb, struct netlink_callback *cb,
3558  		    int (*skb_handler)(struct sk_buff *skb,
3559  				       struct netlink_callback *cb,
3560  				       struct tipc_sock *tsk))
3561  {
3562  	struct rhashtable_iter *iter = (void *)cb->args[4];
3563  	struct tipc_sock *tsk;
3564  	int err;
3565  
3566  	rhashtable_walk_start(iter);
3567  	while ((tsk = rhashtable_walk_next(iter)) != NULL) {
3568  		if (IS_ERR(tsk)) {
3569  			err = PTR_ERR(tsk);
3570  			if (err == -EAGAIN) {
3571  				err = 0;
3572  				continue;
3573  			}
3574  			break;
3575  		}
3576  
3577  		sock_hold(&tsk->sk);
3578  		rhashtable_walk_stop(iter);
3579  		lock_sock(&tsk->sk);
3580  		err = skb_handler(skb, cb, tsk);
3581  		if (err) {
3582  			release_sock(&tsk->sk);
3583  			sock_put(&tsk->sk);
3584  			goto out;
3585  		}
3586  		release_sock(&tsk->sk);
3587  		rhashtable_walk_start(iter);
3588  		sock_put(&tsk->sk);
3589  	}
3590  	rhashtable_walk_stop(iter);
3591  out:
3592  	return skb->len;
3593  }
3594  EXPORT_SYMBOL(tipc_nl_sk_walk);
3595  
3596  int tipc_dump_start(struct netlink_callback *cb)
3597  {
3598  	return __tipc_dump_start(cb, sock_net(cb->skb->sk));
3599  }
3600  EXPORT_SYMBOL(tipc_dump_start);
3601  
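/* The iterator in cb->args[4] persists across netlink dump resumptions:
 * __tipc_dump_start() allocates it and enters the walk,
 * tipc_nl_sk_walk() resumes from it on each ->dumpit() invocation, and
 * tipc_dump_done() exits the walk and frees it.
 */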
3602  int __tipc_dump_start(struct netlink_callback *cb, struct net *net)
3603  {
3604  	/* tipc_nl_name_table_dump() uses cb->args[0...3]. */
3605  	struct rhashtable_iter *iter = (void *)cb->args[4];
3606  	struct tipc_net *tn = tipc_net(net);
3607  
3608  	if (!iter) {
3609  		iter = kmalloc(sizeof(*iter), GFP_KERNEL);
3610  		if (!iter)
3611  			return -ENOMEM;
3612  
3613  		cb->args[4] = (long)iter;
3614  	}
3615  
3616  	rhashtable_walk_enter(&tn->sk_rht, iter);
3617  	return 0;
3618  }
3619  
3620  int tipc_dump_done(struct netlink_callback *cb)
3621  {
3622  	struct rhashtable_iter *hti = (void *)cb->args[4];
3623  
3624  	rhashtable_walk_exit(hti);
3625  	kfree(hti);
3626  	return 0;
3627  }
3628  EXPORT_SYMBOL(tipc_dump_done);
3629  
3630  int tipc_sk_fill_sock_diag(struct sk_buff *skb, struct netlink_callback *cb,
3631  			   struct tipc_sock *tsk, u32 sk_filter_state,
3632  			   u64 (*tipc_diag_gen_cookie)(struct sock *sk))
3633  {
3634  	struct sock *sk = &tsk->sk;
3635  	struct nlattr *attrs;
3636  	struct nlattr *stat;
3637  
3638  	/* filter response w.r.t. sk_state */
3639  	if (!(sk_filter_state & (1 << sk->sk_state)))
3640  		return 0;
3641  
3642  	attrs = nla_nest_start_noflag(skb, TIPC_NLA_SOCK);
3643  	if (!attrs)
3644  		goto msg_cancel;
3645  
3646  	if (__tipc_nl_add_sk_info(skb, tsk))
3647  		goto attr_msg_cancel;
3648  
3649  	if (nla_put_u32(skb, TIPC_NLA_SOCK_TYPE, (u32)sk->sk_type) ||
3650  	    nla_put_u32(skb, TIPC_NLA_SOCK_TIPC_STATE, (u32)sk->sk_state) ||
3651  	    nla_put_u32(skb, TIPC_NLA_SOCK_INO, sock_i_ino(sk)) ||
3652  	    nla_put_u32(skb, TIPC_NLA_SOCK_UID,
3653  			from_kuid_munged(sk_user_ns(NETLINK_CB(cb->skb).sk),
3654  					 sock_i_uid(sk))) ||
3655  	    nla_put_u64_64bit(skb, TIPC_NLA_SOCK_COOKIE,
3656  			      tipc_diag_gen_cookie(sk),
3657  			      TIPC_NLA_SOCK_PAD))
3658  		goto attr_msg_cancel;
3659  
3660  	stat = nla_nest_start_noflag(skb, TIPC_NLA_SOCK_STAT);
3661  	if (!stat)
3662  		goto attr_msg_cancel;
3663  
3664  	if (nla_put_u32(skb, TIPC_NLA_SOCK_STAT_RCVQ,
3665  			skb_queue_len(&sk->sk_receive_queue)) ||
3666  	    nla_put_u32(skb, TIPC_NLA_SOCK_STAT_SENDQ,
3667  			skb_queue_len(&sk->sk_write_queue)) ||
3668  	    nla_put_u32(skb, TIPC_NLA_SOCK_STAT_DROP,
3669  			atomic_read(&sk->sk_drops)))
3670  		goto stat_msg_cancel;
3671  
3672  	if (tsk->cong_link_cnt &&
3673  	    nla_put_flag(skb, TIPC_NLA_SOCK_STAT_LINK_CONG))
3674  		goto stat_msg_cancel;
3675  
3676  	if (tsk_conn_cong(tsk) &&
3677  	    nla_put_flag(skb, TIPC_NLA_SOCK_STAT_CONN_CONG))
3678  		goto stat_msg_cancel;
3679  
3680  	nla_nest_end(skb, stat);
3681  
3682  	if (tsk->group)
3683  		if (tipc_group_fill_sock_diag(tsk->group, skb))
3684  			goto stat_msg_cancel;
3685  
3686  	nla_nest_end(skb, attrs);
3687  
3688  	return 0;
3689  
3690  stat_msg_cancel:
3691  	nla_nest_cancel(skb, stat);
3692  attr_msg_cancel:
3693  	nla_nest_cancel(skb, attrs);
3694  msg_cancel:
3695  	return -EMSGSIZE;
3696  }
3697  EXPORT_SYMBOL(tipc_sk_fill_sock_diag);
3698  
3699  int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb)
3700  {
3701  	return tipc_nl_sk_walk(skb, cb, __tipc_nl_add_sk);
3702  }
3703  
3704  /* Caller should hold socket lock for the passed tipc socket. */
3705  static int __tipc_nl_add_sk_publ(struct sk_buff *skb,
3706  				 struct netlink_callback *cb,
3707  				 struct publication *publ)
3708  {
3709  	void *hdr;
3710  	struct nlattr *attrs;
3711  
3712  	hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
3713  			  &tipc_genl_family, NLM_F_MULTI, TIPC_NL_PUBL_GET);
3714  	if (!hdr)
3715  		goto msg_cancel;
3716  
3717  	attrs = nla_nest_start_noflag(skb, TIPC_NLA_PUBL);
3718  	if (!attrs)
3719  		goto genlmsg_cancel;
3720  
3721  	if (nla_put_u32(skb, TIPC_NLA_PUBL_KEY, publ->key))
3722  		goto attr_msg_cancel;
3723  	if (nla_put_u32(skb, TIPC_NLA_PUBL_TYPE, publ->sr.type))
3724  		goto attr_msg_cancel;
3725  	if (nla_put_u32(skb, TIPC_NLA_PUBL_LOWER, publ->sr.lower))
3726  		goto attr_msg_cancel;
3727  	if (nla_put_u32(skb, TIPC_NLA_PUBL_UPPER, publ->sr.upper))
3728  		goto attr_msg_cancel;
3729  
3730  	nla_nest_end(skb, attrs);
3731  	genlmsg_end(skb, hdr);
3732  
3733  	return 0;
3734  
3735  attr_msg_cancel:
3736  	nla_nest_cancel(skb, attrs);
3737  genlmsg_cancel:
3738  	genlmsg_cancel(skb, hdr);
3739  msg_cancel:
3740  	return -EMSGSIZE;
3741  }
3742  
3743  /* Caller should hold socket lock for the passed tipc socket. */
3744  static int __tipc_nl_list_sk_publ(struct sk_buff *skb,
3745  				  struct netlink_callback *cb,
3746  				  struct tipc_sock *tsk, u32 *last_publ)
3747  {
3748  	int err;
3749  	struct publication *p;
3750  
3751  	if (*last_publ) {
3752  		list_for_each_entry(p, &tsk->publications, binding_sock) {
3753  			if (p->key == *last_publ)
3754  				break;
3755  		}
3756  		if (list_entry_is_head(p, &tsk->publications, binding_sock)) {
3757  			/* We never set seq or call nl_dump_check_consistent(),
3758  			 * which means that setting prev_seq here will cause the
3759  			 * consistency check to fail in the netlink callback
3760  			 * handler, resulting in the last NLMSG_DONE message
3761  			 * having the NLM_F_DUMP_INTR flag set.
3762  			 */
3763  			cb->prev_seq = 1;
3764  			*last_publ = 0;
3765  			return -EPIPE;
3766  		}
3767  	} else {
3768  		p = list_first_entry(&tsk->publications, struct publication,
3769  				     binding_sock);
3770  	}
3771  
3772  	list_for_each_entry_from(p, &tsk->publications, binding_sock) {
3773  		err = __tipc_nl_add_sk_publ(skb, cb, p);
3774  		if (err) {
3775  			*last_publ = p->key;
3776  			return err;
3777  		}
3778  	}
3779  	*last_publ = 0;
3780  
3781  	return 0;
3782  }
3783  
3784  int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
3785  {
3786  	int err;
3787  	u32 tsk_portid = cb->args[0];
3788  	u32 last_publ = cb->args[1];
3789  	u32 done = cb->args[2];
3790  	struct net *net = sock_net(skb->sk);
3791  	struct tipc_sock *tsk;
3792  
3793  	if (!tsk_portid) {
3794  		struct nlattr **attrs = genl_dumpit_info(cb)->info.attrs;
3795  		struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1];
3796  
3797  		if (!attrs[TIPC_NLA_SOCK])
3798  			return -EINVAL;
3799  
3800  		err = nla_parse_nested_deprecated(sock, TIPC_NLA_SOCK_MAX,
3801  						  attrs[TIPC_NLA_SOCK],
3802  						  tipc_nl_sock_policy, NULL);
3803  		if (err)
3804  			return err;
3805  
3806  		if (!sock[TIPC_NLA_SOCK_REF])
3807  			return -EINVAL;
3808  
3809  		tsk_portid = nla_get_u32(sock[TIPC_NLA_SOCK_REF]);
3810  	}
3811  
3812  	if (done)
3813  		return 0;
3814  
3815  	tsk = tipc_sk_lookup(net, tsk_portid);
3816  	if (!tsk)
3817  		return -EINVAL;
3818  
3819  	lock_sock(&tsk->sk);
3820  	err = __tipc_nl_list_sk_publ(skb, cb, tsk, &last_publ);
3821  	if (!err)
3822  		done = 1;
3823  	release_sock(&tsk->sk);
3824  	sock_put(&tsk->sk);
3825  
3826  	cb->args[0] = tsk_portid;
3827  	cb->args[1] = last_publ;
3828  	cb->args[2] = done;
3829  
3830  	return skb->len;
3831  }
3832  
3833  /**
3834   * tipc_sk_filtering - check if a socket should be traced
3835   * @sk: the socket to be examined
3836   *
3837   * @sysctl_tipc_sk_filter is used as the socket tuple for filtering:
3838   * (portid, sock type, name type, name lower, name upper)
3839   *
3840   * Return: true if the socket matches the tuple (a field value of
3841   * 0 means 'any') or when no tuple is set (all fields 0),
3842   * otherwise false
3843   */
3844  bool tipc_sk_filtering(struct sock *sk)
3845  {
3846  	struct tipc_sock *tsk;
3847  	struct publication *p;
3848  	u32 _port, _sktype, _type, _lower, _upper;
3849  	u32 type = 0, lower = 0, upper = 0;
3850  
3851  	if (!sk)
3852  		return true;
3853  
3854  	tsk = tipc_sk(sk);
3855  
3856  	_port = sysctl_tipc_sk_filter[0];
3857  	_sktype = sysctl_tipc_sk_filter[1];
3858  	_type = sysctl_tipc_sk_filter[2];
3859  	_lower = sysctl_tipc_sk_filter[3];
3860  	_upper = sysctl_tipc_sk_filter[4];
3861  
3862  	if (!_port && !_sktype && !_type && !_lower && !_upper)
3863  		return true;
3864  
3865  	if (_port)
3866  		return (_port == tsk->portid);
3867  
3868  	if (_sktype && _sktype != sk->sk_type)
3869  		return false;
3870  
3871  	if (tsk->published) {
3872  		p = list_first_entry_or_null(&tsk->publications,
3873  					     struct publication, binding_sock);
3874  		if (p) {
3875  			type = p->sr.type;
3876  			lower = p->sr.lower;
3877  			upper = p->sr.upper;
3878  		}
3879  	}
3880  
3881  	if (!tipc_sk_type_connectionless(sk)) {
3882  		type = msg_nametype(&tsk->phdr);
3883  		lower = msg_nameinst(&tsk->phdr);
3884  		upper = lower;
3885  	}
3886  
3887  	if ((_type && _type != type) || (_lower && _lower != lower) ||
3888  	    (_upper && _upper != upper))
3889  		return false;
3890  
3891  	return true;
3892  }
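
/* Example (shell sketch, assuming the tuple is exposed as the
 * net.tipc.sk_filter sysctl, cf. net/tipc/sysctl.c): trace only the
 * socket with port id 3456789, any socket type and any service name:
 *
 *	sysctl -w net.tipc.sk_filter="3456789 0 0 0 0"
 */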
3893  
3894  u32 tipc_sock_get_portid(struct sock *sk)
3895  {
3896  	return (sk) ? (tipc_sk(sk))->portid : 0;
3897  }
3898  
3899  /**
3900   * tipc_sk_overlimit1 - check if socket rx queue is about to be overloaded;
3901   *			both the rcv and backlog queues are considered
3902   * @sk: tipc sk to be checked
3903   * @skb: tipc msg to be checked
3904   *
3905   * Return: true if the socket rx queue allocation is > 90% of the limit, otherwise false
3906   */
3907  
3908  bool tipc_sk_overlimit1(struct sock *sk, struct sk_buff *skb)
3909  {
3910  	atomic_t *dcnt = &tipc_sk(sk)->dupl_rcvcnt;
3911  	unsigned int lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
3912  	unsigned int qsize = sk->sk_backlog.len + sk_rmem_alloc_get(sk);
3913  
3914  	return (qsize > lim * 90 / 100);
3915  }
3916  
3917  /**
3918   * tipc_sk_overlimit2 - check if socket rx queue is about to be overloaded;
3919   *			only the rcv queue is considered
3920   * @sk: tipc sk to be checked
3921   * @skb: tipc msg to be checked
3922   *
3923   * Return: true if the socket rx queue allocation is > 90% of the limit, otherwise false
3924   */
3925  
3926  bool tipc_sk_overlimit2(struct sock *sk, struct sk_buff *skb)
3927  {
3928  	unsigned int lim = rcvbuf_limit(sk, skb);
3929  	unsigned int qsize = sk_rmem_alloc_get(sk);
3930  
3931  	return (qsize > lim * 90 / 100);
3932  }
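
/* For both checks above the threshold is 90% of the rcvbuf_limit() value
 * (plus the duplicate-receive count for the first variant): e.g. with
 * lim = 2 MB, a socket is reported as over limit once the counted queue
 * allocation exceeds ~1.8 MB.
 */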
3933  
3934  /**
3935   * tipc_sk_dump - dump TIPC socket
3936   * @sk: tipc sk to be dumped
3937   * @dqueues: bitmask selecting which socket queues, if any, are dumped:
3938   *           - TIPC_DUMP_NONE: don't dump socket queues
3939   *           - TIPC_DUMP_SK_SNDQ: dump socket send queue
3940   *           - TIPC_DUMP_SK_RCVQ: dump socket rcv queue
3941   *           - TIPC_DUMP_SK_BKLGQ: dump socket backlog queue
3942   *           - TIPC_DUMP_ALL: dump all the socket queues above
3943   * @buf: buffer into which the formatted dump data is returned
3944   */
3945  int tipc_sk_dump(struct sock *sk, u16 dqueues, char *buf)
3946  {
3947  	int i = 0;
3948  	size_t sz = (dqueues) ? SK_LMAX : SK_LMIN;
3949  	u32 conn_type, conn_instance;
3950  	struct tipc_sock *tsk;
3951  	struct publication *p;
3952  	bool tsk_connected;
3953  
3954  	if (!sk) {
3955  		i += scnprintf(buf, sz, "sk data: (null)\n");
3956  		return i;
3957  	}
3958  
3959  	tsk = tipc_sk(sk);
3960  	tsk_connected = !tipc_sk_type_connectionless(sk);
3961  
3962  	i += scnprintf(buf, sz, "sk data: %u", sk->sk_type);
3963  	i += scnprintf(buf + i, sz - i, " %d", sk->sk_state);
3964  	i += scnprintf(buf + i, sz - i, " %x", tsk_own_node(tsk));
3965  	i += scnprintf(buf + i, sz - i, " %u", tsk->portid);
3966  	i += scnprintf(buf + i, sz - i, " | %u", tsk_connected);
3967  	if (tsk_connected) {
3968  		i += scnprintf(buf + i, sz - i, " %x", tsk_peer_node(tsk));
3969  		i += scnprintf(buf + i, sz - i, " %u", tsk_peer_port(tsk));
3970  		conn_type = msg_nametype(&tsk->phdr);
3971  		conn_instance = msg_nameinst(&tsk->phdr);
3972  		i += scnprintf(buf + i, sz - i, " %u", conn_type);
3973  		i += scnprintf(buf + i, sz - i, " %u", conn_instance);
3974  	}
3975  	i += scnprintf(buf + i, sz - i, " | %u", tsk->published);
3976  	if (tsk->published) {
3977  		p = list_first_entry_or_null(&tsk->publications,
3978  					     struct publication, binding_sock);
3979  		i += scnprintf(buf + i, sz - i, " %u", (p) ? p->sr.type : 0);
3980  		i += scnprintf(buf + i, sz - i, " %u", (p) ? p->sr.lower : 0);
3981  		i += scnprintf(buf + i, sz - i, " %u", (p) ? p->sr.upper : 0);
3982  	}
3983  	i += scnprintf(buf + i, sz - i, " | %u", tsk->snd_win);
3984  	i += scnprintf(buf + i, sz - i, " %u", tsk->rcv_win);
3985  	i += scnprintf(buf + i, sz - i, " %u", tsk->max_pkt);
3986  	i += scnprintf(buf + i, sz - i, " %x", tsk->peer_caps);
3987  	i += scnprintf(buf + i, sz - i, " %u", tsk->cong_link_cnt);
3988  	i += scnprintf(buf + i, sz - i, " %u", tsk->snt_unacked);
3989  	i += scnprintf(buf + i, sz - i, " %u", tsk->rcv_unacked);
3990  	i += scnprintf(buf + i, sz - i, " %u", atomic_read(&tsk->dupl_rcvcnt));
3991  	i += scnprintf(buf + i, sz - i, " %u", sk->sk_shutdown);
3992  	i += scnprintf(buf + i, sz - i, " | %d", sk_wmem_alloc_get(sk));
3993  	i += scnprintf(buf + i, sz - i, " %d", sk->sk_sndbuf);
3994  	i += scnprintf(buf + i, sz - i, " | %d", sk_rmem_alloc_get(sk));
3995  	i += scnprintf(buf + i, sz - i, " %d", sk->sk_rcvbuf);
3996  	i += scnprintf(buf + i, sz - i, " | %d\n", READ_ONCE(sk->sk_backlog.len));
3997  
3998  	if (dqueues & TIPC_DUMP_SK_SNDQ) {
3999  		i += scnprintf(buf + i, sz - i, "sk_write_queue: ");
4000  		i += tipc_list_dump(&sk->sk_write_queue, false, buf + i);
4001  	}
4002  
4003  	if (dqueues & TIPC_DUMP_SK_RCVQ) {
4004  		i += scnprintf(buf + i, sz - i, "sk_receive_queue: ");
4005  		i += tipc_list_dump(&sk->sk_receive_queue, false, buf + i);
4006  	}
4007  
4008  	if (dqueues & TIPC_DUMP_SK_BKLGQ) {
4009  		i += scnprintf(buf + i, sz - i, "sk_backlog:\n  head ");
4010  		i += tipc_skb_dump(sk->sk_backlog.head, false, buf + i);
4011  		if (sk->sk_backlog.tail != sk->sk_backlog.head) {
4012  			i += scnprintf(buf + i, sz - i, "  tail ");
4013  			i += tipc_skb_dump(sk->sk_backlog.tail, false,
4014  					   buf + i);
4015  		}
4016  	}
4017  
4018  	return i;
4019  }
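
/* Example (in-kernel sketch): dumping a socket together with all of its
 * queues, assuming a caller-provided buffer of at least SK_LMAX bytes
 * (cf. trace.h):
 *
 *	char buf[SK_LMAX];
 *
 *	tipc_sk_dump(sk, TIPC_DUMP_ALL, buf);
 *	pr_info("%s", buf);
 */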
4020