xref: /openbmc/linux/net/tipc/node.c (revision 48c926cd)
1  /*
2   * net/tipc/node.c: TIPC node management routines
3   *
4   * Copyright (c) 2000-2006, 2012-2016, Ericsson AB
5   * Copyright (c) 2005-2006, 2010-2014, Wind River Systems
6   * All rights reserved.
7   *
8   * Redistribution and use in source and binary forms, with or without
9   * modification, are permitted provided that the following conditions are met:
10   *
11   * 1. Redistributions of source code must retain the above copyright
12   *    notice, this list of conditions and the following disclaimer.
13   * 2. Redistributions in binary form must reproduce the above copyright
14   *    notice, this list of conditions and the following disclaimer in the
15   *    documentation and/or other materials provided with the distribution.
16   * 3. Neither the names of the copyright holders nor the names of its
17   *    contributors may be used to endorse or promote products derived from
18   *    this software without specific prior written permission.
19   *
20   * Alternatively, this software may be distributed under the terms of the
21   * GNU General Public License ("GPL") version 2 as published by the Free
22   * Software Foundation.
23   *
24   * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25   * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26   * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27   * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28   * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29   * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30   * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31   * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32   * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34   * POSSIBILITY OF SUCH DAMAGE.
35   */
36  
37  #include "core.h"
38  #include "link.h"
39  #include "node.h"
40  #include "name_distr.h"
41  #include "socket.h"
42  #include "bcast.h"
43  #include "monitor.h"
44  #include "discover.h"
45  #include "netlink.h"
46  
47  #define INVALID_NODE_SIG	0x10000
48  
49  /* Flags used to take different actions according to flag type
50   * TIPC_NOTIFY_NODE_DOWN: notify node is down
51   * TIPC_NOTIFY_NODE_UP: notify node is up
52   * TIPC_DISTRIBUTE_NAME: publish or withdraw link state name type
53   */
54  enum {
55  	TIPC_NOTIFY_NODE_DOWN		= (1 << 3),
56  	TIPC_NOTIFY_NODE_UP		= (1 << 4),
57  	TIPC_NOTIFY_LINK_UP		= (1 << 6),
58  	TIPC_NOTIFY_LINK_DOWN		= (1 << 7)
59  };
60  
61  struct tipc_link_entry {
62  	struct tipc_link *link;
63  	spinlock_t lock; /* per link */
64  	u32 mtu;
65  	struct sk_buff_head inputq;
66  	struct tipc_media_addr maddr;
67  };
68  
69  struct tipc_bclink_entry {
70  	struct tipc_link *link;
71  	struct sk_buff_head inputq1;
72  	struct sk_buff_head arrvq;
73  	struct sk_buff_head inputq2;
74  	struct sk_buff_head namedq;
75  };
76  
77  /**
78   * struct tipc_node - TIPC node structure
79   * @addr: network address of node
80   * @ref: reference counter to node object
81   * @lock: rwlock governing access to structure
82   * @net: the applicable net namespace
83   * @hash: links to adjacent nodes in unsorted hash chain
84   * @inputq: pointer to input queue containing messages for msg event
85   * @namedq: pointer to name table input queue with name table messages
86   * @active_links: bearer ids of active links, used as index into links[] array
87   * @links: array containing references to all links to node
88   * @action_flags: bit mask of different types of node actions
89   * @state: connectivity state vs peer node
90   * @sync_point: sequence number where synch/failover is finished
91   * @list: links to adjacent nodes in sorted list of cluster's nodes
92   * @working_links: number of working links to node (both active and standby)
93   * @link_cnt: number of links to node
94   * @capabilities: bitmap, indicating peer node's functional capabilities
95   * @signature: node instance identifier
96   * @link_id: local and remote bearer ids of changing link, if any
97   * @publ_list: list of publications
98   * @rcu: rcu struct for tipc_node
99   */
100  struct tipc_node {
101  	u32 addr;
102  	struct kref kref;
103  	rwlock_t lock;
104  	struct net *net;
105  	struct hlist_node hash;
106  	int active_links[2];
107  	struct tipc_link_entry links[MAX_BEARERS];
108  	struct tipc_bclink_entry bc_entry;
109  	int action_flags;
110  	struct list_head list;
111  	int state;
112  	u16 sync_point;
113  	int link_cnt;
114  	u16 working_links;
115  	u16 capabilities;
116  	u32 signature;
117  	u32 link_id;
118  	struct list_head publ_list;
119  	struct list_head conn_sks;
120  	unsigned long keepalive_intv;
121  	struct timer_list timer;
122  	struct rcu_head rcu;
123  };
124  
125  /* Node FSM states and events:
126   */
127  enum {
128  	SELF_DOWN_PEER_DOWN    = 0xdd,
129  	SELF_UP_PEER_UP        = 0xaa,
130  	SELF_DOWN_PEER_LEAVING = 0xd1,
131  	SELF_UP_PEER_COMING    = 0xac,
132  	SELF_COMING_PEER_UP    = 0xca,
133  	SELF_LEAVING_PEER_DOWN = 0x1d,
134  	NODE_FAILINGOVER       = 0xf0,
135  	NODE_SYNCHING          = 0xcc
136  };
137  
138  enum {
139  	SELF_ESTABL_CONTACT_EVT = 0xece,
140  	SELF_LOST_CONTACT_EVT   = 0x1ce,
141  	PEER_ESTABL_CONTACT_EVT = 0x9ece,
142  	PEER_LOST_CONTACT_EVT   = 0x91ce,
143  	NODE_FAILOVER_BEGIN_EVT = 0xfbe,
144  	NODE_FAILOVER_END_EVT   = 0xfee,
145  	NODE_SYNCH_BEGIN_EVT    = 0xcbe,
146  	NODE_SYNCH_END_EVT      = 0xcee
147  };
148  
149  static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id,
150  				  struct sk_buff_head *xmitq,
151  				  struct tipc_media_addr **maddr);
152  static void tipc_node_link_down(struct tipc_node *n, int bearer_id,
153  				bool delete);
154  static void node_lost_contact(struct tipc_node *n, struct sk_buff_head *inputq);
155  static void tipc_node_delete(struct tipc_node *node);
156  static void tipc_node_timeout(unsigned long data);
157  static void tipc_node_fsm_evt(struct tipc_node *n, int evt);
158  static struct tipc_node *tipc_node_find(struct net *net, u32 addr);
159  static void tipc_node_put(struct tipc_node *node);
160  static bool tipc_node_is_up(struct tipc_node *n);
161  
162  struct tipc_sock_conn {
163  	u32 port;
164  	u32 peer_port;
165  	u32 peer_node;
166  	struct list_head list;
167  };
168  
169  static struct tipc_link *node_active_link(struct tipc_node *n, int sel)
170  {
171  	int bearer_id = n->active_links[sel & 1];
172  
173  	if (unlikely(bearer_id == INVALID_BEARER_ID))
174  		return NULL;
175  
176  	return n->links[bearer_id].link;
177  }
178  
179  int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel)
180  {
181  	struct tipc_node *n;
182  	int bearer_id;
183  	unsigned int mtu = MAX_MSG_SIZE;
184  
185  	n = tipc_node_find(net, addr);
186  	if (unlikely(!n))
187  		return mtu;
188  
189  	bearer_id = n->active_links[sel & 1];
190  	if (likely(bearer_id != INVALID_BEARER_ID))
191  		mtu = n->links[bearer_id].mtu;
192  	tipc_node_put(n);
193  	return mtu;
194  }
195  
196  u16 tipc_node_get_capabilities(struct net *net, u32 addr)
197  {
198  	struct tipc_node *n;
199  	u16 caps;
200  
201  	n = tipc_node_find(net, addr);
202  	if (unlikely(!n))
203  		return TIPC_NODE_CAPABILITIES;
204  	caps = n->capabilities;
205  	tipc_node_put(n);
206  	return caps;
207  }
208  
209  static void tipc_node_kref_release(struct kref *kref)
210  {
211  	struct tipc_node *n = container_of(kref, struct tipc_node, kref);
212  
213  	kfree(n->bc_entry.link);
214  	kfree_rcu(n, rcu);
215  }
216  
217  static void tipc_node_put(struct tipc_node *node)
218  {
219  	kref_put(&node->kref, tipc_node_kref_release);
220  }
221  
222  static void tipc_node_get(struct tipc_node *node)
223  {
224  	kref_get(&node->kref);
225  }
226  
227  /*
228   * tipc_node_find - locate specified node object, if it exists
229   */
230  static struct tipc_node *tipc_node_find(struct net *net, u32 addr)
231  {
232  	struct tipc_net *tn = tipc_net(net);
233  	struct tipc_node *node;
234  	unsigned int thash = tipc_hashfn(addr);
235  
236  	if (unlikely(!in_own_cluster_exact(net, addr)))
237  		return NULL;
238  
239  	rcu_read_lock();
240  	hlist_for_each_entry_rcu(node, &tn->node_htable[thash], hash) {
241  		if (node->addr != addr)
242  			continue;
243  		if (!kref_get_unless_zero(&node->kref))
244  			node = NULL;
245  		break;
246  	}
247  	rcu_read_unlock();
248  	return node;
249  }
250  
251  static void tipc_node_read_lock(struct tipc_node *n)
252  {
253  	read_lock_bh(&n->lock);
254  }
255  
256  static void tipc_node_read_unlock(struct tipc_node *n)
257  {
258  	read_unlock_bh(&n->lock);
259  }
260  
261  static void tipc_node_write_lock(struct tipc_node *n)
262  {
263  	write_lock_bh(&n->lock);
264  }
265  
266  static void tipc_node_write_unlock_fast(struct tipc_node *n)
267  {
268  	write_unlock_bh(&n->lock);
269  }
270  
271  static void tipc_node_write_unlock(struct tipc_node *n)
272  {
273  	struct net *net = n->net;
274  	u32 addr = 0;
275  	u32 flags = n->action_flags;
276  	u32 link_id = 0;
277  	u32 bearer_id;
278  	struct list_head *publ_list;
279  
280  	if (likely(!flags)) {
281  		write_unlock_bh(&n->lock);
282  		return;
283  	}
284  
285  	addr = n->addr;
286  	link_id = n->link_id;
287  	bearer_id = link_id & 0xffff;
288  	publ_list = &n->publ_list;
289  
290  	n->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
291  			     TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP);
292  
293  	write_unlock_bh(&n->lock);
294  
295  	if (flags & TIPC_NOTIFY_NODE_DOWN)
296  		tipc_publ_notify(net, publ_list, addr);
297  
298  	if (flags & TIPC_NOTIFY_NODE_UP)
299  		tipc_named_node_up(net, addr);
300  
301  	if (flags & TIPC_NOTIFY_LINK_UP) {
302  		tipc_mon_peer_up(net, addr, bearer_id);
303  		tipc_nametbl_publish(net, TIPC_LINK_STATE, addr, addr,
304  				     TIPC_NODE_SCOPE, link_id, addr);
305  	}
306  	if (flags & TIPC_NOTIFY_LINK_DOWN) {
307  		tipc_mon_peer_down(net, addr, bearer_id);
308  		tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,
309  				      link_id, addr);
310  	}
311  }
312  
313  struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
314  {
315  	struct tipc_net *tn = net_generic(net, tipc_net_id);
316  	struct tipc_node *n, *temp_node;
317  	int i;
318  
319  	spin_lock_bh(&tn->node_list_lock);
320  	n = tipc_node_find(net, addr);
321  	if (n) {
322  		/* Same node may come back with new capabilities */
323  		n->capabilities = capabilities;
324  		goto exit;
325  	}
326  	n = kzalloc(sizeof(*n), GFP_ATOMIC);
327  	if (!n) {
328  		pr_warn("Node creation failed, no memory\n");
329  		goto exit;
330  	}
331  	n->addr = addr;
332  	n->net = net;
333  	n->capabilities = capabilities;
334  	kref_init(&n->kref);
335  	rwlock_init(&n->lock);
336  	INIT_HLIST_NODE(&n->hash);
337  	INIT_LIST_HEAD(&n->list);
338  	INIT_LIST_HEAD(&n->publ_list);
339  	INIT_LIST_HEAD(&n->conn_sks);
340  	skb_queue_head_init(&n->bc_entry.namedq);
341  	skb_queue_head_init(&n->bc_entry.inputq1);
342  	__skb_queue_head_init(&n->bc_entry.arrvq);
343  	skb_queue_head_init(&n->bc_entry.inputq2);
344  	for (i = 0; i < MAX_BEARERS; i++)
345  		spin_lock_init(&n->links[i].lock);
346  	n->state = SELF_DOWN_PEER_LEAVING;
347  	n->signature = INVALID_NODE_SIG;
348  	n->active_links[0] = INVALID_BEARER_ID;
349  	n->active_links[1] = INVALID_BEARER_ID;
350  	if (!tipc_link_bc_create(net, tipc_own_addr(net), n->addr,
351  				 U16_MAX,
352  				 tipc_link_window(tipc_bc_sndlink(net)),
353  				 n->capabilities,
354  				 &n->bc_entry.inputq1,
355  				 &n->bc_entry.namedq,
356  				 tipc_bc_sndlink(net),
357  				 &n->bc_entry.link)) {
358  		pr_warn("Broadcast rcv link creation failed, no memory\n");
359  		kfree(n);
360  		n = NULL;
361  		goto exit;
362  	}
363  	tipc_node_get(n);
364  	setup_timer(&n->timer, tipc_node_timeout, (unsigned long)n);
365  	n->keepalive_intv = U32_MAX;
366  	hlist_add_head_rcu(&n->hash, &tn->node_htable[tipc_hashfn(addr)]);
367  	list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
368  		if (n->addr < temp_node->addr)
369  			break;
370  	}
371  	list_add_tail_rcu(&n->list, &temp_node->list);
372  exit:
373  	spin_unlock_bh(&tn->node_list_lock);
374  	return n;
375  }
376  
377  static void tipc_node_calculate_timer(struct tipc_node *n, struct tipc_link *l)
378  {
379  	unsigned long tol = tipc_link_tolerance(l);
380  	unsigned long intv = ((tol / 4) > 500) ? 500 : tol / 4;
381  
382  	/* Link with lowest tolerance determines timer interval */
383  	if (intv < n->keepalive_intv)
384  		n->keepalive_intv = intv;
385  
386  	/* Ensure link's abort limit corresponds to current tolerance */
387  	tipc_link_set_abort_limit(l, tol / n->keepalive_intv);
388  }
389  
390  static void tipc_node_delete(struct tipc_node *node)
391  {
392  	list_del_rcu(&node->list);
393  	hlist_del_rcu(&node->hash);
394  	tipc_node_put(node);
395  
396  	del_timer_sync(&node->timer);
397  	tipc_node_put(node);
398  }
399  
400  void tipc_node_stop(struct net *net)
401  {
402  	struct tipc_net *tn = tipc_net(net);
403  	struct tipc_node *node, *t_node;
404  
405  	spin_lock_bh(&tn->node_list_lock);
406  	list_for_each_entry_safe(node, t_node, &tn->node_list, list)
407  		tipc_node_delete(node);
408  	spin_unlock_bh(&tn->node_list_lock);
409  }
410  
411  void tipc_node_subscribe(struct net *net, struct list_head *subscr, u32 addr)
412  {
413  	struct tipc_node *n;
414  
415  	if (in_own_node(net, addr))
416  		return;
417  
418  	n = tipc_node_find(net, addr);
419  	if (!n) {
420  		pr_warn("Node subscribe rejected, unknown node 0x%x\n", addr);
421  		return;
422  	}
423  	tipc_node_write_lock(n);
424  	list_add_tail(subscr, &n->publ_list);
425  	tipc_node_write_unlock_fast(n);
426  	tipc_node_put(n);
427  }
428  
429  void tipc_node_unsubscribe(struct net *net, struct list_head *subscr, u32 addr)
430  {
431  	struct tipc_node *n;
432  
433  	if (in_own_node(net, addr))
434  		return;
435  
436  	n = tipc_node_find(net, addr);
437  	if (!n) {
438  		pr_warn("Node unsubscribe rejected, unknown node 0x%x\n", addr);
439  		return;
440  	}
441  	tipc_node_write_lock(n);
442  	list_del_init(subscr);
443  	tipc_node_write_unlock_fast(n);
444  	tipc_node_put(n);
445  }
446  
447  int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port)
448  {
449  	struct tipc_node *node;
450  	struct tipc_sock_conn *conn;
451  	int err = 0;
452  
453  	if (in_own_node(net, dnode))
454  		return 0;
455  
456  	node = tipc_node_find(net, dnode);
457  	if (!node) {
458  		pr_warn("Connecting sock to node 0x%x failed\n", dnode);
459  		return -EHOSTUNREACH;
460  	}
461  	conn = kmalloc(sizeof(*conn), GFP_ATOMIC);
462  	if (!conn) {
463  		err = -EHOSTUNREACH;
464  		goto exit;
465  	}
466  	conn->peer_node = dnode;
467  	conn->port = port;
468  	conn->peer_port = peer_port;
469  
470  	tipc_node_write_lock(node);
471  	list_add_tail(&conn->list, &node->conn_sks);
472  	tipc_node_write_unlock(node);
473  exit:
474  	tipc_node_put(node);
475  	return err;
476  }
477  
478  void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port)
479  {
480  	struct tipc_node *node;
481  	struct tipc_sock_conn *conn, *safe;
482  
483  	if (in_own_node(net, dnode))
484  		return;
485  
486  	node = tipc_node_find(net, dnode);
487  	if (!node)
488  		return;
489  
490  	tipc_node_write_lock(node);
491  	list_for_each_entry_safe(conn, safe, &node->conn_sks, list) {
492  		if (port != conn->port)
493  			continue;
494  		list_del(&conn->list);
495  		kfree(conn);
496  	}
497  	tipc_node_write_unlock(node);
498  	tipc_node_put(node);
499  }
500  
501  /* tipc_node_timeout - handle expiration of node timer
502   */
503  static void tipc_node_timeout(unsigned long data)
504  {
505  	struct tipc_node *n = (struct tipc_node *)data;
506  	struct tipc_link_entry *le;
507  	struct sk_buff_head xmitq;
508  	int bearer_id;
509  	int rc = 0;
510  
511  	__skb_queue_head_init(&xmitq);
512  
513  	for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
514  		tipc_node_read_lock(n);
515  		le = &n->links[bearer_id];
516  		spin_lock_bh(&le->lock);
517  		if (le->link) {
518  			/* Link tolerance may change asynchronously: */
519  			tipc_node_calculate_timer(n, le->link);
520  			rc = tipc_link_timeout(le->link, &xmitq);
521  		}
522  		spin_unlock_bh(&le->lock);
523  		tipc_node_read_unlock(n);
524  		tipc_bearer_xmit(n->net, bearer_id, &xmitq, &le->maddr);
525  		if (rc & TIPC_LINK_DOWN_EVT)
526  			tipc_node_link_down(n, bearer_id, false);
527  	}
528  	mod_timer(&n->timer, jiffies + msecs_to_jiffies(n->keepalive_intv));
529  }
530  
531  /**
532   * __tipc_node_link_up - handle addition of link
533   * Node lock must be held by caller
534   * Link becomes active (alone or shared) or standby, depending on its priority.
535   */
536  static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,
537  				struct sk_buff_head *xmitq)
538  {
539  	int *slot0 = &n->active_links[0];
540  	int *slot1 = &n->active_links[1];
541  	struct tipc_link *ol = node_active_link(n, 0);
542  	struct tipc_link *nl = n->links[bearer_id].link;
543  
544  	if (!nl || tipc_link_is_up(nl))
545  		return;
546  
547  	tipc_link_fsm_evt(nl, LINK_ESTABLISH_EVT);
548  	if (!tipc_link_is_up(nl))
549  		return;
550  
551  	n->working_links++;
552  	n->action_flags |= TIPC_NOTIFY_LINK_UP;
553  	n->link_id = tipc_link_id(nl);
554  
555  	/* Leave room for tunnel header when returning 'mtu' to users: */
556  	n->links[bearer_id].mtu = tipc_link_mtu(nl) - INT_H_SIZE;
557  
558  	tipc_bearer_add_dest(n->net, bearer_id, n->addr);
559  	tipc_bcast_inc_bearer_dst_cnt(n->net, bearer_id);
560  
561  	pr_debug("Established link <%s> on network plane %c\n",
562  		 tipc_link_name(nl), tipc_link_plane(nl));
563  
564  	/* Ensure that a STATE message goes first */
565  	tipc_link_build_state_msg(nl, xmitq);
566  
567  	/* First link? => give it both slots */
568  	if (!ol) {
569  		*slot0 = bearer_id;
570  		*slot1 = bearer_id;
571  		tipc_node_fsm_evt(n, SELF_ESTABL_CONTACT_EVT);
572  		n->action_flags |= TIPC_NOTIFY_NODE_UP;
573  		tipc_link_set_active(nl, true);
574  		tipc_bcast_add_peer(n->net, nl, xmitq);
575  		return;
576  	}
577  
578  	/* Second link => redistribute slots */
579  	if (tipc_link_prio(nl) > tipc_link_prio(ol)) {
580  		pr_debug("Old link <%s> becomes standby\n", tipc_link_name(ol));
581  		*slot0 = bearer_id;
582  		*slot1 = bearer_id;
583  		tipc_link_set_active(nl, true);
584  		tipc_link_set_active(ol, false);
585  	} else if (tipc_link_prio(nl) == tipc_link_prio(ol)) {
586  		tipc_link_set_active(nl, true);
587  		*slot1 = bearer_id;
588  	} else {
589  		pr_debug("New link <%s> is standby\n", tipc_link_name(nl));
590  	}
591  
592  	/* Prepare synchronization with first link */
593  	tipc_link_tnl_prepare(ol, nl, SYNCH_MSG, xmitq);
594  }
595  
596  /**
597   * tipc_node_link_up - handle addition of link
598   *
599   * Link becomes active (alone or shared) or standby, depending on its priority.
600   */
601  static void tipc_node_link_up(struct tipc_node *n, int bearer_id,
602  			      struct sk_buff_head *xmitq)
603  {
604  	struct tipc_media_addr *maddr;
605  
606  	tipc_node_write_lock(n);
607  	__tipc_node_link_up(n, bearer_id, xmitq);
608  	maddr = &n->links[bearer_id].maddr;
609  	tipc_bearer_xmit(n->net, bearer_id, xmitq, maddr);
610  	tipc_node_write_unlock(n);
611  }
612  
613  /**
614   * __tipc_node_link_down - handle loss of link
615   */
616  static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id,
617  				  struct sk_buff_head *xmitq,
618  				  struct tipc_media_addr **maddr)
619  {
620  	struct tipc_link_entry *le = &n->links[*bearer_id];
621  	int *slot0 = &n->active_links[0];
622  	int *slot1 = &n->active_links[1];
623  	int i, highest = 0, prio;
624  	struct tipc_link *l, *_l, *tnl;
625  
626  	l = n->links[*bearer_id].link;
627  	if (!l || tipc_link_is_reset(l))
628  		return;
629  
630  	n->working_links--;
631  	n->action_flags |= TIPC_NOTIFY_LINK_DOWN;
632  	n->link_id = tipc_link_id(l);
633  
634  	tipc_bearer_remove_dest(n->net, *bearer_id, n->addr);
635  
636  	pr_debug("Lost link <%s> on network plane %c\n",
637  		 tipc_link_name(l), tipc_link_plane(l));
638  
639  	/* Select new active link if any available */
640  	*slot0 = INVALID_BEARER_ID;
641  	*slot1 = INVALID_BEARER_ID;
642  	for (i = 0; i < MAX_BEARERS; i++) {
643  		_l = n->links[i].link;
644  		if (!_l || !tipc_link_is_up(_l))
645  			continue;
646  		if (_l == l)
647  			continue;
648  		prio = tipc_link_prio(_l);
649  		if (prio < highest)
650  			continue;
651  		if (prio > highest) {
652  			highest = prio;
653  			*slot0 = i;
654  			*slot1 = i;
655  			continue;
656  		}
657  		*slot1 = i;
658  	}
659  
660  	if (!tipc_node_is_up(n)) {
661  		if (tipc_link_peer_is_down(l))
662  			tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT);
663  		tipc_node_fsm_evt(n, SELF_LOST_CONTACT_EVT);
664  		tipc_link_fsm_evt(l, LINK_RESET_EVT);
665  		tipc_link_reset(l);
666  		tipc_link_build_reset_msg(l, xmitq);
667  		*maddr = &n->links[*bearer_id].maddr;
668  		node_lost_contact(n, &le->inputq);
669  		tipc_bcast_dec_bearer_dst_cnt(n->net, *bearer_id);
670  		return;
671  	}
672  	tipc_bcast_dec_bearer_dst_cnt(n->net, *bearer_id);
673  
674  	/* There is still a working link => initiate failover */
675  	*bearer_id = n->active_links[0];
676  	tnl = n->links[*bearer_id].link;
677  	tipc_link_fsm_evt(tnl, LINK_SYNCH_END_EVT);
678  	tipc_node_fsm_evt(n, NODE_SYNCH_END_EVT);
679  	n->sync_point = tipc_link_rcv_nxt(tnl) + (U16_MAX / 2 - 1);
680  	tipc_link_tnl_prepare(l, tnl, FAILOVER_MSG, xmitq);
681  	tipc_link_reset(l);
682  	tipc_link_fsm_evt(l, LINK_RESET_EVT);
683  	tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT);
684  	tipc_node_fsm_evt(n, NODE_FAILOVER_BEGIN_EVT);
685  	*maddr = &n->links[*bearer_id].maddr;
686  }
687  
688  static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete)
689  {
690  	struct tipc_link_entry *le = &n->links[bearer_id];
691  	struct tipc_link *l = le->link;
692  	struct tipc_media_addr *maddr;
693  	struct sk_buff_head xmitq;
694  	int old_bearer_id = bearer_id;
695  
696  	if (!l)
697  		return;
698  
699  	__skb_queue_head_init(&xmitq);
700  
701  	tipc_node_write_lock(n);
702  	if (!tipc_link_is_establishing(l)) {
703  		__tipc_node_link_down(n, &bearer_id, &xmitq, &maddr);
704  		if (delete) {
705  			kfree(l);
706  			le->link = NULL;
707  			n->link_cnt--;
708  		}
709  	} else {
710  		/* Defuse pending tipc_node_link_up() */
711  		tipc_link_fsm_evt(l, LINK_RESET_EVT);
712  	}
713  	tipc_node_write_unlock(n);
714  	if (delete)
715  		tipc_mon_remove_peer(n->net, n->addr, old_bearer_id);
716  	tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr);
717  	tipc_sk_rcv(n->net, &le->inputq);
718  }
719  
720  static bool tipc_node_is_up(struct tipc_node *n)
721  {
722  	return n->active_links[0] != INVALID_BEARER_ID;
723  }
724  
725  void tipc_node_check_dest(struct net *net, u32 onode,
726  			  struct tipc_bearer *b,
727  			  u16 capabilities, u32 signature,
728  			  struct tipc_media_addr *maddr,
729  			  bool *respond, bool *dupl_addr)
730  {
731  	struct tipc_node *n;
732  	struct tipc_link *l;
733  	struct tipc_link_entry *le;
734  	bool addr_match = false;
735  	bool sign_match = false;
736  	bool link_up = false;
737  	bool accept_addr = false;
738  	bool reset = true;
739  	char *if_name;
740  	unsigned long intv;
741  
742  	*dupl_addr = false;
743  	*respond = false;
744  
745  	n = tipc_node_create(net, onode, capabilities);
746  	if (!n)
747  		return;
748  
749  	tipc_node_write_lock(n);
750  
751  	le = &n->links[b->identity];
752  
753  	/* Prepare to validate requesting node's signature and media address */
754  	l = le->link;
755  	link_up = l && tipc_link_is_up(l);
756  	addr_match = l && !memcmp(&le->maddr, maddr, sizeof(*maddr));
757  	sign_match = (signature == n->signature);
758  
759  	/* These three flags give us eight permutations: */
760  
761  	if (sign_match && addr_match && link_up) {
762  		/* All is fine. Do nothing. */
763  		reset = false;
764  	} else if (sign_match && addr_match && !link_up) {
765  		/* Respond. The link will come up in due time */
766  		*respond = true;
767  	} else if (sign_match && !addr_match && link_up) {
768  		/* Peer has changed i/f address without rebooting.
769  		 * If so, the link will reset soon, and the next
770  		 * discovery will be accepted. So we can ignore it.
771  		 * It may also be an cloned or malicious peer having
772  		 * chosen the same node address and signature as an
773  		 * existing one.
774  		 * Ignore requests until the link goes down, if ever.
775  		 */
776  		*dupl_addr = true;
777  	} else if (sign_match && !addr_match && !link_up) {
778  		/* Peer link has changed i/f address without rebooting.
779  		 * It may also be a cloned or malicious peer; we can't
780  		 * distinguish between the two.
781  		 * The signature is correct, so we must accept.
782  		 */
783  		accept_addr = true;
784  		*respond = true;
785  	} else if (!sign_match && addr_match && link_up) {
786  		/* Peer node rebooted. Two possibilities:
787  		 *  - Delayed re-discovery; this link endpoint has already
788  		 *    reset and re-established contact with the peer, before
789  		 *    receiving a discovery message from that node.
790  		 *    (The peer happened to receive one from this node first).
791  		 *  - The peer came back so fast that our side has not
792  		 *    discovered it yet. Probing from this side will soon
793  		 *    reset the link, since there can be no working link
794  		 *    endpoint at the peer end, and the link will re-establish.
795  		 *  Accept the signature, since it comes from a known peer.
796  		 */
797  		n->signature = signature;
798  	} else if (!sign_match && addr_match && !link_up) {
799  		/*  The peer node has rebooted.
800  		 *  Accept signature, since it is a known peer.
801  		 */
802  		n->signature = signature;
803  		*respond = true;
804  	} else if (!sign_match && !addr_match && link_up) {
805  		/* Peer rebooted with new address, or a new/duplicate peer.
806  		 * Ignore until the link goes down, if ever.
807  		 */
808  		*dupl_addr = true;
809  	} else if (!sign_match && !addr_match && !link_up) {
810  		/* Peer rebooted with new address, or it is a new peer.
811  		 * Accept signature and address.
812  		 */
813  		n->signature = signature;
814  		accept_addr = true;
815  		*respond = true;
816  	}
817  
818  	if (!accept_addr)
819  		goto exit;
820  
821  	/* Now create new link if not already existing */
822  	if (!l) {
823  		if (n->link_cnt == 2) {
824  			pr_warn("Cannot establish 3rd link to %x\n", n->addr);
825  			goto exit;
826  		}
827  		if_name = strchr(b->name, ':') + 1;
828  		if (!tipc_link_create(net, if_name, b->identity, b->tolerance,
829  				      b->net_plane, b->mtu, b->priority,
830  				      b->window, mod(tipc_net(net)->random),
831  				      tipc_own_addr(net), onode,
832  				      n->capabilities,
833  				      tipc_bc_sndlink(n->net), n->bc_entry.link,
834  				      &le->inputq,
835  				      &n->bc_entry.namedq, &l)) {
836  			*respond = false;
837  			goto exit;
838  		}
839  		tipc_link_reset(l);
840  		tipc_link_fsm_evt(l, LINK_RESET_EVT);
841  		if (n->state == NODE_FAILINGOVER)
842  			tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT);
843  		le->link = l;
844  		n->link_cnt++;
845  		tipc_node_calculate_timer(n, l);
846  		if (n->link_cnt == 1) {
847  			intv = jiffies + msecs_to_jiffies(n->keepalive_intv);
848  			if (!mod_timer(&n->timer, intv))
849  				tipc_node_get(n);
850  		}
851  	}
852  	memcpy(&le->maddr, maddr, sizeof(*maddr));
853  exit:
854  	tipc_node_write_unlock(n);
855  	if (reset && l && !tipc_link_is_reset(l))
856  		tipc_node_link_down(n, b->identity, false);
857  	tipc_node_put(n);
858  }
859  
860  void tipc_node_delete_links(struct net *net, int bearer_id)
861  {
862  	struct tipc_net *tn = net_generic(net, tipc_net_id);
863  	struct tipc_node *n;
864  
865  	rcu_read_lock();
866  	list_for_each_entry_rcu(n, &tn->node_list, list) {
867  		tipc_node_link_down(n, bearer_id, true);
868  	}
869  	rcu_read_unlock();
870  }
871  
872  static void tipc_node_reset_links(struct tipc_node *n)
873  {
874  	char addr_string[16];
875  	int i;
876  
877  	pr_warn("Resetting all links to %s\n",
878  		tipc_addr_string_fill(addr_string, n->addr));
879  
880  	for (i = 0; i < MAX_BEARERS; i++) {
881  		tipc_node_link_down(n, i, false);
882  	}
883  }
884  
885  /* tipc_node_fsm_evt - node finite state machine
886   * Determines when contact is allowed with peer node
887   */
888  static void tipc_node_fsm_evt(struct tipc_node *n, int evt)
889  {
890  	int state = n->state;
891  
892  	switch (state) {
893  	case SELF_DOWN_PEER_DOWN:
894  		switch (evt) {
895  		case SELF_ESTABL_CONTACT_EVT:
896  			state = SELF_UP_PEER_COMING;
897  			break;
898  		case PEER_ESTABL_CONTACT_EVT:
899  			state = SELF_COMING_PEER_UP;
900  			break;
901  		case SELF_LOST_CONTACT_EVT:
902  		case PEER_LOST_CONTACT_EVT:
903  			break;
904  		case NODE_SYNCH_END_EVT:
905  		case NODE_SYNCH_BEGIN_EVT:
906  		case NODE_FAILOVER_BEGIN_EVT:
907  		case NODE_FAILOVER_END_EVT:
908  		default:
909  			goto illegal_evt;
910  		}
911  		break;
912  	case SELF_UP_PEER_UP:
913  		switch (evt) {
914  		case SELF_LOST_CONTACT_EVT:
915  			state = SELF_DOWN_PEER_LEAVING;
916  			break;
917  		case PEER_LOST_CONTACT_EVT:
918  			state = SELF_LEAVING_PEER_DOWN;
919  			break;
920  		case NODE_SYNCH_BEGIN_EVT:
921  			state = NODE_SYNCHING;
922  			break;
923  		case NODE_FAILOVER_BEGIN_EVT:
924  			state = NODE_FAILINGOVER;
925  			break;
926  		case SELF_ESTABL_CONTACT_EVT:
927  		case PEER_ESTABL_CONTACT_EVT:
928  		case NODE_SYNCH_END_EVT:
929  		case NODE_FAILOVER_END_EVT:
930  			break;
931  		default:
932  			goto illegal_evt;
933  		}
934  		break;
935  	case SELF_DOWN_PEER_LEAVING:
936  		switch (evt) {
937  		case PEER_LOST_CONTACT_EVT:
938  			state = SELF_DOWN_PEER_DOWN;
939  			break;
940  		case SELF_ESTABL_CONTACT_EVT:
941  		case PEER_ESTABL_CONTACT_EVT:
942  		case SELF_LOST_CONTACT_EVT:
943  			break;
944  		case NODE_SYNCH_END_EVT:
945  		case NODE_SYNCH_BEGIN_EVT:
946  		case NODE_FAILOVER_BEGIN_EVT:
947  		case NODE_FAILOVER_END_EVT:
948  		default:
949  			goto illegal_evt;
950  		}
951  		break;
952  	case SELF_UP_PEER_COMING:
953  		switch (evt) {
954  		case PEER_ESTABL_CONTACT_EVT:
955  			state = SELF_UP_PEER_UP;
956  			break;
957  		case SELF_LOST_CONTACT_EVT:
958  			state = SELF_DOWN_PEER_DOWN;
959  			break;
960  		case SELF_ESTABL_CONTACT_EVT:
961  		case PEER_LOST_CONTACT_EVT:
962  		case NODE_SYNCH_END_EVT:
963  		case NODE_FAILOVER_BEGIN_EVT:
964  			break;
965  		case NODE_SYNCH_BEGIN_EVT:
966  		case NODE_FAILOVER_END_EVT:
967  		default:
968  			goto illegal_evt;
969  		}
970  		break;
971  	case SELF_COMING_PEER_UP:
972  		switch (evt) {
973  		case SELF_ESTABL_CONTACT_EVT:
974  			state = SELF_UP_PEER_UP;
975  			break;
976  		case PEER_LOST_CONTACT_EVT:
977  			state = SELF_DOWN_PEER_DOWN;
978  			break;
979  		case SELF_LOST_CONTACT_EVT:
980  		case PEER_ESTABL_CONTACT_EVT:
981  			break;
982  		case NODE_SYNCH_END_EVT:
983  		case NODE_SYNCH_BEGIN_EVT:
984  		case NODE_FAILOVER_BEGIN_EVT:
985  		case NODE_FAILOVER_END_EVT:
986  		default:
987  			goto illegal_evt;
988  		}
989  		break;
990  	case SELF_LEAVING_PEER_DOWN:
991  		switch (evt) {
992  		case SELF_LOST_CONTACT_EVT:
993  			state = SELF_DOWN_PEER_DOWN;
994  			break;
995  		case SELF_ESTABL_CONTACT_EVT:
996  		case PEER_ESTABL_CONTACT_EVT:
997  		case PEER_LOST_CONTACT_EVT:
998  			break;
999  		case NODE_SYNCH_END_EVT:
1000  		case NODE_SYNCH_BEGIN_EVT:
1001  		case NODE_FAILOVER_BEGIN_EVT:
1002  		case NODE_FAILOVER_END_EVT:
1003  		default:
1004  			goto illegal_evt;
1005  		}
1006  		break;
1007  	case NODE_FAILINGOVER:
1008  		switch (evt) {
1009  		case SELF_LOST_CONTACT_EVT:
1010  			state = SELF_DOWN_PEER_LEAVING;
1011  			break;
1012  		case PEER_LOST_CONTACT_EVT:
1013  			state = SELF_LEAVING_PEER_DOWN;
1014  			break;
1015  		case NODE_FAILOVER_END_EVT:
1016  			state = SELF_UP_PEER_UP;
1017  			break;
1018  		case NODE_FAILOVER_BEGIN_EVT:
1019  		case SELF_ESTABL_CONTACT_EVT:
1020  		case PEER_ESTABL_CONTACT_EVT:
1021  			break;
1022  		case NODE_SYNCH_BEGIN_EVT:
1023  		case NODE_SYNCH_END_EVT:
1024  		default:
1025  			goto illegal_evt;
1026  		}
1027  		break;
1028  	case NODE_SYNCHING:
1029  		switch (evt) {
1030  		case SELF_LOST_CONTACT_EVT:
1031  			state = SELF_DOWN_PEER_LEAVING;
1032  			break;
1033  		case PEER_LOST_CONTACT_EVT:
1034  			state = SELF_LEAVING_PEER_DOWN;
1035  			break;
1036  		case NODE_SYNCH_END_EVT:
1037  			state = SELF_UP_PEER_UP;
1038  			break;
1039  		case NODE_FAILOVER_BEGIN_EVT:
1040  			state = NODE_FAILINGOVER;
1041  			break;
1042  		case NODE_SYNCH_BEGIN_EVT:
1043  		case SELF_ESTABL_CONTACT_EVT:
1044  		case PEER_ESTABL_CONTACT_EVT:
1045  			break;
1046  		case NODE_FAILOVER_END_EVT:
1047  		default:
1048  			goto illegal_evt;
1049  		}
1050  		break;
1051  	default:
1052  		pr_err("Unknown node fsm state %x\n", state);
1053  		break;
1054  	}
1055  	n->state = state;
1056  	return;
1057  
1058  illegal_evt:
1059  	pr_err("Illegal node fsm evt %x in state %x\n", evt, state);
1060  }
1061  
1062  static void node_lost_contact(struct tipc_node *n,
1063  			      struct sk_buff_head *inputq)
1064  {
1065  	char addr_string[16];
1066  	struct tipc_sock_conn *conn, *safe;
1067  	struct tipc_link *l;
1068  	struct list_head *conns = &n->conn_sks;
1069  	struct sk_buff *skb;
1070  	uint i;
1071  
1072  	pr_debug("Lost contact with %s\n",
1073  		 tipc_addr_string_fill(addr_string, n->addr));
1074  
1075  	/* Clean up broadcast state */
1076  	tipc_bcast_remove_peer(n->net, n->bc_entry.link);
1077  
1078  	/* Abort any ongoing link failover */
1079  	for (i = 0; i < MAX_BEARERS; i++) {
1080  		l = n->links[i].link;
1081  		if (l)
1082  			tipc_link_fsm_evt(l, LINK_FAILOVER_END_EVT);
1083  	}
1084  
1085  	/* Notify publications from this node */
1086  	n->action_flags |= TIPC_NOTIFY_NODE_DOWN;
1087  
1088  	/* Notify sockets connected to node */
1089  	list_for_each_entry_safe(conn, safe, conns, list) {
1090  		skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
1091  				      SHORT_H_SIZE, 0, tipc_own_addr(n->net),
1092  				      conn->peer_node, conn->port,
1093  				      conn->peer_port, TIPC_ERR_NO_NODE);
1094  		if (likely(skb))
1095  			skb_queue_tail(inputq, skb);
1096  		list_del(&conn->list);
1097  		kfree(conn);
1098  	}
1099  }
1100  
1101  /**
1102   * tipc_node_get_linkname - get the name of a link
1103   *
1104   * @bearer_id: id of the bearer
1105   * @node: peer node address
1106   * @linkname: link name output buffer
1107   *
1108   * Returns 0 on success
1109   */
1110  int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 addr,
1111  			   char *linkname, size_t len)
1112  {
1113  	struct tipc_link *link;
1114  	int err = -EINVAL;
1115  	struct tipc_node *node = tipc_node_find(net, addr);
1116  
1117  	if (!node)
1118  		return err;
1119  
1120  	if (bearer_id >= MAX_BEARERS)
1121  		goto exit;
1122  
1123  	tipc_node_read_lock(node);
1124  	link = node->links[bearer_id].link;
1125  	if (link) {
1126  		strncpy(linkname, tipc_link_name(link), len);
1127  		err = 0;
1128  	}
1129  	tipc_node_read_unlock(node);
1130  exit:
1131  	tipc_node_put(node);
1132  	return err;
1133  }
1134  
1135  /* Caller should hold node lock for the passed node */
1136  static int __tipc_nl_add_node(struct tipc_nl_msg *msg, struct tipc_node *node)
1137  {
1138  	void *hdr;
1139  	struct nlattr *attrs;
1140  
1141  	hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_family,
1142  			  NLM_F_MULTI, TIPC_NL_NODE_GET);
1143  	if (!hdr)
1144  		return -EMSGSIZE;
1145  
1146  	attrs = nla_nest_start(msg->skb, TIPC_NLA_NODE);
1147  	if (!attrs)
1148  		goto msg_full;
1149  
1150  	if (nla_put_u32(msg->skb, TIPC_NLA_NODE_ADDR, node->addr))
1151  		goto attr_msg_full;
1152  	if (tipc_node_is_up(node))
1153  		if (nla_put_flag(msg->skb, TIPC_NLA_NODE_UP))
1154  			goto attr_msg_full;
1155  
1156  	nla_nest_end(msg->skb, attrs);
1157  	genlmsg_end(msg->skb, hdr);
1158  
1159  	return 0;
1160  
1161  attr_msg_full:
1162  	nla_nest_cancel(msg->skb, attrs);
1163  msg_full:
1164  	genlmsg_cancel(msg->skb, hdr);
1165  
1166  	return -EMSGSIZE;
1167  }
1168  
1169  /**
1170   * tipc_node_xmit() is the general link level function for message sending
1171   * @net: the applicable net namespace
1172   * @list: chain of buffers containing message
1173   * @dnode: address of destination node
1174   * @selector: a number used for deterministic link selection
1175   * Consumes the buffer chain.
1176   * Returns 0 if success, otherwise: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE,-ENOBUF
1177   */
1178  int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
1179  		   u32 dnode, int selector)
1180  {
1181  	struct tipc_link_entry *le = NULL;
1182  	struct tipc_node *n;
1183  	struct sk_buff_head xmitq;
1184  	int bearer_id;
1185  	int rc;
1186  
1187  	if (in_own_node(net, dnode)) {
1188  		tipc_sk_rcv(net, list);
1189  		return 0;
1190  	}
1191  
1192  	n = tipc_node_find(net, dnode);
1193  	if (unlikely(!n)) {
1194  		skb_queue_purge(list);
1195  		return -EHOSTUNREACH;
1196  	}
1197  
1198  	tipc_node_read_lock(n);
1199  	bearer_id = n->active_links[selector & 1];
1200  	if (unlikely(bearer_id == INVALID_BEARER_ID)) {
1201  		tipc_node_read_unlock(n);
1202  		tipc_node_put(n);
1203  		skb_queue_purge(list);
1204  		return -EHOSTUNREACH;
1205  	}
1206  
1207  	__skb_queue_head_init(&xmitq);
1208  	le = &n->links[bearer_id];
1209  	spin_lock_bh(&le->lock);
1210  	rc = tipc_link_xmit(le->link, list, &xmitq);
1211  	spin_unlock_bh(&le->lock);
1212  	tipc_node_read_unlock(n);
1213  
1214  	if (unlikely(rc == -ENOBUFS))
1215  		tipc_node_link_down(n, bearer_id, false);
1216  	else
1217  		tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
1218  
1219  	tipc_node_put(n);
1220  
1221  	return rc;
1222  }
1223  
1224  /* tipc_node_xmit_skb(): send single buffer to destination
1225   * Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE
1226   * messages, which will not be rejected
1227   * The only exception is datagram messages rerouted after secondary
1228   * lookup, which are rare and safe to dispose of anyway.
1229   */
1230  int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
1231  		       u32 selector)
1232  {
1233  	struct sk_buff_head head;
1234  
1235  	skb_queue_head_init(&head);
1236  	__skb_queue_tail(&head, skb);
1237  	tipc_node_xmit(net, &head, dnode, selector);
1238  	return 0;
1239  }
1240  
1241  void tipc_node_broadcast(struct net *net, struct sk_buff *skb)
1242  {
1243  	struct sk_buff *txskb;
1244  	struct tipc_node *n;
1245  	u32 dst;
1246  
1247  	rcu_read_lock();
1248  	list_for_each_entry_rcu(n, tipc_nodes(net), list) {
1249  		dst = n->addr;
1250  		if (in_own_node(net, dst))
1251  			continue;
1252  		if (!tipc_node_is_up(n))
1253  			continue;
1254  		txskb = pskb_copy(skb, GFP_ATOMIC);
1255  		if (!txskb)
1256  			break;
1257  		msg_set_destnode(buf_msg(txskb), dst);
1258  		tipc_node_xmit_skb(net, txskb, dst, 0);
1259  	}
1260  	rcu_read_unlock();
1261  
1262  	kfree_skb(skb);
1263  }
1264  
1265  static void tipc_node_mcast_rcv(struct tipc_node *n)
1266  {
1267  	struct tipc_bclink_entry *be = &n->bc_entry;
1268  
1269  	/* 'arrvq' is under inputq2's lock protection */
1270  	spin_lock_bh(&be->inputq2.lock);
1271  	spin_lock_bh(&be->inputq1.lock);
1272  	skb_queue_splice_tail_init(&be->inputq1, &be->arrvq);
1273  	spin_unlock_bh(&be->inputq1.lock);
1274  	spin_unlock_bh(&be->inputq2.lock);
1275  	tipc_sk_mcast_rcv(n->net, &be->arrvq, &be->inputq2);
1276  }
1277  
1278  static void tipc_node_bc_sync_rcv(struct tipc_node *n, struct tipc_msg *hdr,
1279  				  int bearer_id, struct sk_buff_head *xmitq)
1280  {
1281  	struct tipc_link *ucl;
1282  	int rc;
1283  
1284  	rc = tipc_bcast_sync_rcv(n->net, n->bc_entry.link, hdr);
1285  
1286  	if (rc & TIPC_LINK_DOWN_EVT) {
1287  		tipc_node_reset_links(n);
1288  		return;
1289  	}
1290  
1291  	if (!(rc & TIPC_LINK_SND_STATE))
1292  		return;
1293  
1294  	/* If probe message, a STATE response will be sent anyway */
1295  	if (msg_probe(hdr))
1296  		return;
1297  
1298  	/* Produce a STATE message carrying broadcast NACK */
1299  	tipc_node_read_lock(n);
1300  	ucl = n->links[bearer_id].link;
1301  	if (ucl)
1302  		tipc_link_build_state_msg(ucl, xmitq);
1303  	tipc_node_read_unlock(n);
1304  }
1305  
1306  /**
1307   * tipc_node_bc_rcv - process TIPC broadcast packet arriving from off-node
1308   * @net: the applicable net namespace
1309   * @skb: TIPC packet
1310   * @bearer_id: id of bearer message arrived on
1311   *
1312   * Invoked with no locks held.
1313   */
1314  static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id)
1315  {
1316  	int rc;
1317  	struct sk_buff_head xmitq;
1318  	struct tipc_bclink_entry *be;
1319  	struct tipc_link_entry *le;
1320  	struct tipc_msg *hdr = buf_msg(skb);
1321  	int usr = msg_user(hdr);
1322  	u32 dnode = msg_destnode(hdr);
1323  	struct tipc_node *n;
1324  
1325  	__skb_queue_head_init(&xmitq);
1326  
1327  	/* If NACK for other node, let rcv link for that node peek into it */
1328  	if ((usr == BCAST_PROTOCOL) && (dnode != tipc_own_addr(net)))
1329  		n = tipc_node_find(net, dnode);
1330  	else
1331  		n = tipc_node_find(net, msg_prevnode(hdr));
1332  	if (!n) {
1333  		kfree_skb(skb);
1334  		return;
1335  	}
1336  	be = &n->bc_entry;
1337  	le = &n->links[bearer_id];
1338  
1339  	rc = tipc_bcast_rcv(net, be->link, skb);
1340  
1341  	/* Broadcast ACKs are sent on a unicast link */
1342  	if (rc & TIPC_LINK_SND_STATE) {
1343  		tipc_node_read_lock(n);
1344  		tipc_link_build_state_msg(le->link, &xmitq);
1345  		tipc_node_read_unlock(n);
1346  	}
1347  
1348  	if (!skb_queue_empty(&xmitq))
1349  		tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
1350  
1351  	if (!skb_queue_empty(&be->inputq1))
1352  		tipc_node_mcast_rcv(n);
1353  
1354  	/* If reassembly or retransmission failure => reset all links to peer */
1355  	if (rc & TIPC_LINK_DOWN_EVT)
1356  		tipc_node_reset_links(n);
1357  
1358  	tipc_node_put(n);
1359  }
1360  
1361  /**
1362   * tipc_node_check_state - check and if necessary update node state
1363   * @skb: TIPC packet
1364   * @bearer_id: identity of bearer delivering the packet
1365   * Returns true if state is ok, otherwise consumes buffer and returns false
1366   */
1367  static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb,
1368  				  int bearer_id, struct sk_buff_head *xmitq)
1369  {
1370  	struct tipc_msg *hdr = buf_msg(skb);
1371  	int usr = msg_user(hdr);
1372  	int mtyp = msg_type(hdr);
1373  	u16 oseqno = msg_seqno(hdr);
1374  	u16 iseqno = msg_seqno(msg_get_wrapped(hdr));
1375  	u16 exp_pkts = msg_msgcnt(hdr);
1376  	u16 rcv_nxt, syncpt, dlv_nxt, inputq_len;
1377  	int state = n->state;
1378  	struct tipc_link *l, *tnl, *pl = NULL;
1379  	struct tipc_media_addr *maddr;
1380  	int pb_id;
1381  
1382  	l = n->links[bearer_id].link;
1383  	if (!l)
1384  		return false;
1385  	rcv_nxt = tipc_link_rcv_nxt(l);
1386  
1387  
1388  	if (likely((state == SELF_UP_PEER_UP) && (usr != TUNNEL_PROTOCOL)))
1389  		return true;
1390  
1391  	/* Find parallel link, if any */
1392  	for (pb_id = 0; pb_id < MAX_BEARERS; pb_id++) {
1393  		if ((pb_id != bearer_id) && n->links[pb_id].link) {
1394  			pl = n->links[pb_id].link;
1395  			break;
1396  		}
1397  	}
1398  
1399  	/* Check and update node accesibility if applicable */
1400  	if (state == SELF_UP_PEER_COMING) {
1401  		if (!tipc_link_is_up(l))
1402  			return true;
1403  		if (!msg_peer_link_is_up(hdr))
1404  			return true;
1405  		tipc_node_fsm_evt(n, PEER_ESTABL_CONTACT_EVT);
1406  	}
1407  
1408  	if (state == SELF_DOWN_PEER_LEAVING) {
1409  		if (msg_peer_node_is_up(hdr))
1410  			return false;
1411  		tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT);
1412  		return true;
1413  	}
1414  
1415  	if (state == SELF_LEAVING_PEER_DOWN)
1416  		return false;
1417  
1418  	/* Ignore duplicate packets */
1419  	if ((usr != LINK_PROTOCOL) && less(oseqno, rcv_nxt))
1420  		return true;
1421  
1422  	/* Initiate or update failover mode if applicable */
1423  	if ((usr == TUNNEL_PROTOCOL) && (mtyp == FAILOVER_MSG)) {
1424  		syncpt = oseqno + exp_pkts - 1;
1425  		if (pl && tipc_link_is_up(pl)) {
1426  			__tipc_node_link_down(n, &pb_id, xmitq, &maddr);
1427  			tipc_skb_queue_splice_tail_init(tipc_link_inputq(pl),
1428  							tipc_link_inputq(l));
1429  		}
1430  		/* If pkts arrive out of order, use lowest calculated syncpt */
1431  		if (less(syncpt, n->sync_point))
1432  			n->sync_point = syncpt;
1433  	}
1434  
1435  	/* Open parallel link when tunnel link reaches synch point */
1436  	if ((n->state == NODE_FAILINGOVER) && tipc_link_is_up(l)) {
1437  		if (!more(rcv_nxt, n->sync_point))
1438  			return true;
1439  		tipc_node_fsm_evt(n, NODE_FAILOVER_END_EVT);
1440  		if (pl)
1441  			tipc_link_fsm_evt(pl, LINK_FAILOVER_END_EVT);
1442  		return true;
1443  	}
1444  
1445  	/* No synching needed if only one link */
1446  	if (!pl || !tipc_link_is_up(pl))
1447  		return true;
1448  
1449  	/* Initiate synch mode if applicable */
1450  	if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG) && (oseqno == 1)) {
1451  		syncpt = iseqno + exp_pkts - 1;
1452  		if (!tipc_link_is_up(l))
1453  			__tipc_node_link_up(n, bearer_id, xmitq);
1454  		if (n->state == SELF_UP_PEER_UP) {
1455  			n->sync_point = syncpt;
1456  			tipc_link_fsm_evt(l, LINK_SYNCH_BEGIN_EVT);
1457  			tipc_node_fsm_evt(n, NODE_SYNCH_BEGIN_EVT);
1458  		}
1459  	}
1460  
1461  	/* Open tunnel link when parallel link reaches synch point */
1462  	if (n->state == NODE_SYNCHING) {
1463  		if (tipc_link_is_synching(l)) {
1464  			tnl = l;
1465  		} else {
1466  			tnl = pl;
1467  			pl = l;
1468  		}
1469  		inputq_len = skb_queue_len(tipc_link_inputq(pl));
1470  		dlv_nxt = tipc_link_rcv_nxt(pl) - inputq_len;
1471  		if (more(dlv_nxt, n->sync_point)) {
1472  			tipc_link_fsm_evt(tnl, LINK_SYNCH_END_EVT);
1473  			tipc_node_fsm_evt(n, NODE_SYNCH_END_EVT);
1474  			return true;
1475  		}
1476  		if (l == pl)
1477  			return true;
1478  		if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG))
1479  			return true;
1480  		if (usr == LINK_PROTOCOL)
1481  			return true;
1482  		return false;
1483  	}
1484  	return true;
1485  }
1486  
1487  /**
1488   * tipc_rcv - process TIPC packets/messages arriving from off-node
1489   * @net: the applicable net namespace
1490   * @skb: TIPC packet
1491   * @bearer: pointer to bearer message arrived on
1492   *
1493   * Invoked with no locks held. Bearer pointer must point to a valid bearer
1494   * structure (i.e. cannot be NULL), but bearer can be inactive.
1495   */
1496  void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
1497  {
1498  	struct sk_buff_head xmitq;
1499  	struct tipc_node *n;
1500  	struct tipc_msg *hdr;
1501  	int bearer_id = b->identity;
1502  	struct tipc_link_entry *le;
1503  	u32 self = tipc_own_addr(net);
1504  	int usr, rc = 0;
1505  	u16 bc_ack;
1506  
1507  	__skb_queue_head_init(&xmitq);
1508  
1509  	/* Ensure message is well-formed before touching the header */
1510  	if (unlikely(!tipc_msg_validate(skb)))
1511  		goto discard;
1512  	hdr = buf_msg(skb);
1513  	usr = msg_user(hdr);
1514  	bc_ack = msg_bcast_ack(hdr);
1515  
1516  	/* Handle arrival of discovery or broadcast packet */
1517  	if (unlikely(msg_non_seq(hdr))) {
1518  		if (unlikely(usr == LINK_CONFIG))
1519  			return tipc_disc_rcv(net, skb, b);
1520  		else
1521  			return tipc_node_bc_rcv(net, skb, bearer_id);
1522  	}
1523  
1524  	/* Discard unicast link messages destined for another node */
1525  	if (unlikely(!msg_short(hdr) && (msg_destnode(hdr) != self)))
1526  		goto discard;
1527  
1528  	/* Locate neighboring node that sent packet */
1529  	n = tipc_node_find(net, msg_prevnode(hdr));
1530  	if (unlikely(!n))
1531  		goto discard;
1532  	le = &n->links[bearer_id];
1533  
1534  	/* Ensure broadcast reception is in synch with peer's send state */
1535  	if (unlikely(usr == LINK_PROTOCOL))
1536  		tipc_node_bc_sync_rcv(n, hdr, bearer_id, &xmitq);
1537  	else if (unlikely(tipc_link_acked(n->bc_entry.link) != bc_ack))
1538  		tipc_bcast_ack_rcv(net, n->bc_entry.link, hdr);
1539  
1540  	/* Receive packet directly if conditions permit */
1541  	tipc_node_read_lock(n);
1542  	if (likely((n->state == SELF_UP_PEER_UP) && (usr != TUNNEL_PROTOCOL))) {
1543  		spin_lock_bh(&le->lock);
1544  		if (le->link) {
1545  			rc = tipc_link_rcv(le->link, skb, &xmitq);
1546  			skb = NULL;
1547  		}
1548  		spin_unlock_bh(&le->lock);
1549  	}
1550  	tipc_node_read_unlock(n);
1551  
1552  	/* Check/update node state before receiving */
1553  	if (unlikely(skb)) {
1554  		if (unlikely(skb_linearize(skb)))
1555  			goto discard;
1556  		tipc_node_write_lock(n);
1557  		if (tipc_node_check_state(n, skb, bearer_id, &xmitq)) {
1558  			if (le->link) {
1559  				rc = tipc_link_rcv(le->link, skb, &xmitq);
1560  				skb = NULL;
1561  			}
1562  		}
1563  		tipc_node_write_unlock(n);
1564  	}
1565  
1566  	if (unlikely(rc & TIPC_LINK_UP_EVT))
1567  		tipc_node_link_up(n, bearer_id, &xmitq);
1568  
1569  	if (unlikely(rc & TIPC_LINK_DOWN_EVT))
1570  		tipc_node_link_down(n, bearer_id, false);
1571  
1572  	if (unlikely(!skb_queue_empty(&n->bc_entry.namedq)))
1573  		tipc_named_rcv(net, &n->bc_entry.namedq);
1574  
1575  	if (unlikely(!skb_queue_empty(&n->bc_entry.inputq1)))
1576  		tipc_node_mcast_rcv(n);
1577  
1578  	if (!skb_queue_empty(&le->inputq))
1579  		tipc_sk_rcv(net, &le->inputq);
1580  
1581  	if (!skb_queue_empty(&xmitq))
1582  		tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
1583  
1584  	tipc_node_put(n);
1585  discard:
1586  	kfree_skb(skb);
1587  }
1588  
1589  int tipc_nl_peer_rm(struct sk_buff *skb, struct genl_info *info)
1590  {
1591  	struct net *net = sock_net(skb->sk);
1592  	struct tipc_net *tn = net_generic(net, tipc_net_id);
1593  	struct nlattr *attrs[TIPC_NLA_NET_MAX + 1];
1594  	struct tipc_node *peer;
1595  	u32 addr;
1596  	int err;
1597  	int i;
1598  
1599  	/* We identify the peer by its net */
1600  	if (!info->attrs[TIPC_NLA_NET])
1601  		return -EINVAL;
1602  
1603  	err = nla_parse_nested(attrs, TIPC_NLA_NET_MAX,
1604  			       info->attrs[TIPC_NLA_NET], tipc_nl_net_policy,
1605  			       info->extack);
1606  	if (err)
1607  		return err;
1608  
1609  	if (!attrs[TIPC_NLA_NET_ADDR])
1610  		return -EINVAL;
1611  
1612  	addr = nla_get_u32(attrs[TIPC_NLA_NET_ADDR]);
1613  
1614  	if (in_own_node(net, addr))
1615  		return -ENOTSUPP;
1616  
1617  	spin_lock_bh(&tn->node_list_lock);
1618  	peer = tipc_node_find(net, addr);
1619  	if (!peer) {
1620  		spin_unlock_bh(&tn->node_list_lock);
1621  		return -ENXIO;
1622  	}
1623  
1624  	tipc_node_write_lock(peer);
1625  	if (peer->state != SELF_DOWN_PEER_DOWN &&
1626  	    peer->state != SELF_DOWN_PEER_LEAVING) {
1627  		tipc_node_write_unlock(peer);
1628  		err = -EBUSY;
1629  		goto err_out;
1630  	}
1631  
1632  	for (i = 0; i < MAX_BEARERS; i++) {
1633  		struct tipc_link_entry *le = &peer->links[i];
1634  
1635  		if (le->link) {
1636  			kfree(le->link);
1637  			le->link = NULL;
1638  			peer->link_cnt--;
1639  		}
1640  	}
1641  	tipc_node_write_unlock(peer);
1642  	tipc_node_delete(peer);
1643  
1644  	err = 0;
1645  err_out:
1646  	tipc_node_put(peer);
1647  	spin_unlock_bh(&tn->node_list_lock);
1648  
1649  	return err;
1650  }
1651  
1652  int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb)
1653  {
1654  	int err;
1655  	struct net *net = sock_net(skb->sk);
1656  	struct tipc_net *tn = net_generic(net, tipc_net_id);
1657  	int done = cb->args[0];
1658  	int last_addr = cb->args[1];
1659  	struct tipc_node *node;
1660  	struct tipc_nl_msg msg;
1661  
1662  	if (done)
1663  		return 0;
1664  
1665  	msg.skb = skb;
1666  	msg.portid = NETLINK_CB(cb->skb).portid;
1667  	msg.seq = cb->nlh->nlmsg_seq;
1668  
1669  	rcu_read_lock();
1670  	if (last_addr) {
1671  		node = tipc_node_find(net, last_addr);
1672  		if (!node) {
1673  			rcu_read_unlock();
1674  			/* We never set seq or call nl_dump_check_consistent()
1675  			 * this means that setting prev_seq here will cause the
1676  			 * consistence check to fail in the netlink callback
1677  			 * handler. Resulting in the NLMSG_DONE message having
1678  			 * the NLM_F_DUMP_INTR flag set if the node state
1679  			 * changed while we released the lock.
1680  			 */
1681  			cb->prev_seq = 1;
1682  			return -EPIPE;
1683  		}
1684  		tipc_node_put(node);
1685  	}
1686  
1687  	list_for_each_entry_rcu(node, &tn->node_list, list) {
1688  		if (last_addr) {
1689  			if (node->addr == last_addr)
1690  				last_addr = 0;
1691  			else
1692  				continue;
1693  		}
1694  
1695  		tipc_node_read_lock(node);
1696  		err = __tipc_nl_add_node(&msg, node);
1697  		if (err) {
1698  			last_addr = node->addr;
1699  			tipc_node_read_unlock(node);
1700  			goto out;
1701  		}
1702  
1703  		tipc_node_read_unlock(node);
1704  	}
1705  	done = 1;
1706  out:
1707  	cb->args[0] = done;
1708  	cb->args[1] = last_addr;
1709  	rcu_read_unlock();
1710  
1711  	return skb->len;
1712  }
1713  
1714  /* tipc_node_find_by_name - locate owner node of link by link's name
1715   * @net: the applicable net namespace
1716   * @name: pointer to link name string
1717   * @bearer_id: pointer to index in 'node->links' array where the link was found.
1718   *
1719   * Returns pointer to node owning the link, or 0 if no matching link is found.
1720   */
1721  static struct tipc_node *tipc_node_find_by_name(struct net *net,
1722  						const char *link_name,
1723  						unsigned int *bearer_id)
1724  {
1725  	struct tipc_net *tn = net_generic(net, tipc_net_id);
1726  	struct tipc_link *l;
1727  	struct tipc_node *n;
1728  	struct tipc_node *found_node = NULL;
1729  	int i;
1730  
1731  	*bearer_id = 0;
1732  	rcu_read_lock();
1733  	list_for_each_entry_rcu(n, &tn->node_list, list) {
1734  		tipc_node_read_lock(n);
1735  		for (i = 0; i < MAX_BEARERS; i++) {
1736  			l = n->links[i].link;
1737  			if (l && !strcmp(tipc_link_name(l), link_name)) {
1738  				*bearer_id = i;
1739  				found_node = n;
1740  				break;
1741  			}
1742  		}
1743  		tipc_node_read_unlock(n);
1744  		if (found_node)
1745  			break;
1746  	}
1747  	rcu_read_unlock();
1748  
1749  	return found_node;
1750  }
1751  
1752  int tipc_nl_node_set_link(struct sk_buff *skb, struct genl_info *info)
1753  {
1754  	int err;
1755  	int res = 0;
1756  	int bearer_id;
1757  	char *name;
1758  	struct tipc_link *link;
1759  	struct tipc_node *node;
1760  	struct sk_buff_head xmitq;
1761  	struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1];
1762  	struct net *net = sock_net(skb->sk);
1763  
1764  	__skb_queue_head_init(&xmitq);
1765  
1766  	if (!info->attrs[TIPC_NLA_LINK])
1767  		return -EINVAL;
1768  
1769  	err = nla_parse_nested(attrs, TIPC_NLA_LINK_MAX,
1770  			       info->attrs[TIPC_NLA_LINK],
1771  			       tipc_nl_link_policy, info->extack);
1772  	if (err)
1773  		return err;
1774  
1775  	if (!attrs[TIPC_NLA_LINK_NAME])
1776  		return -EINVAL;
1777  
1778  	name = nla_data(attrs[TIPC_NLA_LINK_NAME]);
1779  
1780  	if (strcmp(name, tipc_bclink_name) == 0)
1781  		return tipc_nl_bc_link_set(net, attrs);
1782  
1783  	node = tipc_node_find_by_name(net, name, &bearer_id);
1784  	if (!node)
1785  		return -EINVAL;
1786  
1787  	tipc_node_read_lock(node);
1788  
1789  	link = node->links[bearer_id].link;
1790  	if (!link) {
1791  		res = -EINVAL;
1792  		goto out;
1793  	}
1794  
1795  	if (attrs[TIPC_NLA_LINK_PROP]) {
1796  		struct nlattr *props[TIPC_NLA_PROP_MAX + 1];
1797  
1798  		err = tipc_nl_parse_link_prop(attrs[TIPC_NLA_LINK_PROP],
1799  					      props);
1800  		if (err) {
1801  			res = err;
1802  			goto out;
1803  		}
1804  
1805  		if (props[TIPC_NLA_PROP_TOL]) {
1806  			u32 tol;
1807  
1808  			tol = nla_get_u32(props[TIPC_NLA_PROP_TOL]);
1809  			tipc_link_set_tolerance(link, tol, &xmitq);
1810  		}
1811  		if (props[TIPC_NLA_PROP_PRIO]) {
1812  			u32 prio;
1813  
1814  			prio = nla_get_u32(props[TIPC_NLA_PROP_PRIO]);
1815  			tipc_link_set_prio(link, prio, &xmitq);
1816  		}
1817  		if (props[TIPC_NLA_PROP_WIN]) {
1818  			u32 win;
1819  
1820  			win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
1821  			tipc_link_set_queue_limits(link, win);
1822  		}
1823  	}
1824  
1825  out:
1826  	tipc_node_read_unlock(node);
1827  	tipc_bearer_xmit(net, bearer_id, &xmitq, &node->links[bearer_id].maddr);
1828  	return res;
1829  }
1830  
1831  int tipc_nl_node_get_link(struct sk_buff *skb, struct genl_info *info)
1832  {
1833  	struct net *net = genl_info_net(info);
1834  	struct tipc_nl_msg msg;
1835  	char *name;
1836  	int err;
1837  
1838  	msg.portid = info->snd_portid;
1839  	msg.seq = info->snd_seq;
1840  
1841  	if (!info->attrs[TIPC_NLA_LINK_NAME])
1842  		return -EINVAL;
1843  	name = nla_data(info->attrs[TIPC_NLA_LINK_NAME]);
1844  
1845  	msg.skb = nlmsg_new(NLMSG_GOODSIZE, GFP_KERNEL);
1846  	if (!msg.skb)
1847  		return -ENOMEM;
1848  
1849  	if (strcmp(name, tipc_bclink_name) == 0) {
1850  		err = tipc_nl_add_bc_link(net, &msg);
1851  		if (err) {
1852  			nlmsg_free(msg.skb);
1853  			return err;
1854  		}
1855  	} else {
1856  		int bearer_id;
1857  		struct tipc_node *node;
1858  		struct tipc_link *link;
1859  
1860  		node = tipc_node_find_by_name(net, name, &bearer_id);
1861  		if (!node)
1862  			return -EINVAL;
1863  
1864  		tipc_node_read_lock(node);
1865  		link = node->links[bearer_id].link;
1866  		if (!link) {
1867  			tipc_node_read_unlock(node);
1868  			nlmsg_free(msg.skb);
1869  			return -EINVAL;
1870  		}
1871  
1872  		err = __tipc_nl_add_link(net, &msg, link, 0);
1873  		tipc_node_read_unlock(node);
1874  		if (err) {
1875  			nlmsg_free(msg.skb);
1876  			return err;
1877  		}
1878  	}
1879  
1880  	return genlmsg_reply(msg.skb, info);
1881  }
1882  
1883  int tipc_nl_node_reset_link_stats(struct sk_buff *skb, struct genl_info *info)
1884  {
1885  	int err;
1886  	char *link_name;
1887  	unsigned int bearer_id;
1888  	struct tipc_link *link;
1889  	struct tipc_node *node;
1890  	struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1];
1891  	struct net *net = sock_net(skb->sk);
1892  	struct tipc_link_entry *le;
1893  
1894  	if (!info->attrs[TIPC_NLA_LINK])
1895  		return -EINVAL;
1896  
1897  	err = nla_parse_nested(attrs, TIPC_NLA_LINK_MAX,
1898  			       info->attrs[TIPC_NLA_LINK],
1899  			       tipc_nl_link_policy, info->extack);
1900  	if (err)
1901  		return err;
1902  
1903  	if (!attrs[TIPC_NLA_LINK_NAME])
1904  		return -EINVAL;
1905  
1906  	link_name = nla_data(attrs[TIPC_NLA_LINK_NAME]);
1907  
1908  	if (strcmp(link_name, tipc_bclink_name) == 0) {
1909  		err = tipc_bclink_reset_stats(net);
1910  		if (err)
1911  			return err;
1912  		return 0;
1913  	}
1914  
1915  	node = tipc_node_find_by_name(net, link_name, &bearer_id);
1916  	if (!node)
1917  		return -EINVAL;
1918  
1919  	le = &node->links[bearer_id];
1920  	tipc_node_read_lock(node);
1921  	spin_lock_bh(&le->lock);
1922  	link = node->links[bearer_id].link;
1923  	if (!link) {
1924  		spin_unlock_bh(&le->lock);
1925  		tipc_node_read_unlock(node);
1926  		return -EINVAL;
1927  	}
1928  	tipc_link_reset_stats(link);
1929  	spin_unlock_bh(&le->lock);
1930  	tipc_node_read_unlock(node);
1931  	return 0;
1932  }
1933  
1934  /* Caller should hold node lock  */
1935  static int __tipc_nl_add_node_links(struct net *net, struct tipc_nl_msg *msg,
1936  				    struct tipc_node *node, u32 *prev_link)
1937  {
1938  	u32 i;
1939  	int err;
1940  
1941  	for (i = *prev_link; i < MAX_BEARERS; i++) {
1942  		*prev_link = i;
1943  
1944  		if (!node->links[i].link)
1945  			continue;
1946  
1947  		err = __tipc_nl_add_link(net, msg,
1948  					 node->links[i].link, NLM_F_MULTI);
1949  		if (err)
1950  			return err;
1951  	}
1952  	*prev_link = 0;
1953  
1954  	return 0;
1955  }
1956  
1957  int tipc_nl_node_dump_link(struct sk_buff *skb, struct netlink_callback *cb)
1958  {
1959  	struct net *net = sock_net(skb->sk);
1960  	struct tipc_net *tn = net_generic(net, tipc_net_id);
1961  	struct tipc_node *node;
1962  	struct tipc_nl_msg msg;
1963  	u32 prev_node = cb->args[0];
1964  	u32 prev_link = cb->args[1];
1965  	int done = cb->args[2];
1966  	int err;
1967  
1968  	if (done)
1969  		return 0;
1970  
1971  	msg.skb = skb;
1972  	msg.portid = NETLINK_CB(cb->skb).portid;
1973  	msg.seq = cb->nlh->nlmsg_seq;
1974  
1975  	rcu_read_lock();
1976  	if (prev_node) {
1977  		node = tipc_node_find(net, prev_node);
1978  		if (!node) {
1979  			/* We never set seq or call nl_dump_check_consistent()
1980  			 * this means that setting prev_seq here will cause the
1981  			 * consistence check to fail in the netlink callback
1982  			 * handler. Resulting in the last NLMSG_DONE message
1983  			 * having the NLM_F_DUMP_INTR flag set.
1984  			 */
1985  			cb->prev_seq = 1;
1986  			goto out;
1987  		}
1988  		tipc_node_put(node);
1989  
1990  		list_for_each_entry_continue_rcu(node, &tn->node_list,
1991  						 list) {
1992  			tipc_node_read_lock(node);
1993  			err = __tipc_nl_add_node_links(net, &msg, node,
1994  						       &prev_link);
1995  			tipc_node_read_unlock(node);
1996  			if (err)
1997  				goto out;
1998  
1999  			prev_node = node->addr;
2000  		}
2001  	} else {
2002  		err = tipc_nl_add_bc_link(net, &msg);
2003  		if (err)
2004  			goto out;
2005  
2006  		list_for_each_entry_rcu(node, &tn->node_list, list) {
2007  			tipc_node_read_lock(node);
2008  			err = __tipc_nl_add_node_links(net, &msg, node,
2009  						       &prev_link);
2010  			tipc_node_read_unlock(node);
2011  			if (err)
2012  				goto out;
2013  
2014  			prev_node = node->addr;
2015  		}
2016  	}
2017  	done = 1;
2018  out:
2019  	rcu_read_unlock();
2020  
2021  	cb->args[0] = prev_node;
2022  	cb->args[1] = prev_link;
2023  	cb->args[2] = done;
2024  
2025  	return skb->len;
2026  }
2027  
2028  int tipc_nl_node_set_monitor(struct sk_buff *skb, struct genl_info *info)
2029  {
2030  	struct nlattr *attrs[TIPC_NLA_MON_MAX + 1];
2031  	struct net *net = sock_net(skb->sk);
2032  	int err;
2033  
2034  	if (!info->attrs[TIPC_NLA_MON])
2035  		return -EINVAL;
2036  
2037  	err = nla_parse_nested(attrs, TIPC_NLA_MON_MAX,
2038  			       info->attrs[TIPC_NLA_MON],
2039  			       tipc_nl_monitor_policy, info->extack);
2040  	if (err)
2041  		return err;
2042  
2043  	if (attrs[TIPC_NLA_MON_ACTIVATION_THRESHOLD]) {
2044  		u32 val;
2045  
2046  		val = nla_get_u32(attrs[TIPC_NLA_MON_ACTIVATION_THRESHOLD]);
2047  		err = tipc_nl_monitor_set_threshold(net, val);
2048  		if (err)
2049  			return err;
2050  	}
2051  
2052  	return 0;
2053  }
2054  
2055  static int __tipc_nl_add_monitor_prop(struct net *net, struct tipc_nl_msg *msg)
2056  {
2057  	struct nlattr *attrs;
2058  	void *hdr;
2059  	u32 val;
2060  
2061  	hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_family,
2062  			  0, TIPC_NL_MON_GET);
2063  	if (!hdr)
2064  		return -EMSGSIZE;
2065  
2066  	attrs = nla_nest_start(msg->skb, TIPC_NLA_MON);
2067  	if (!attrs)
2068  		goto msg_full;
2069  
2070  	val = tipc_nl_monitor_get_threshold(net);
2071  
2072  	if (nla_put_u32(msg->skb, TIPC_NLA_MON_ACTIVATION_THRESHOLD, val))
2073  		goto attr_msg_full;
2074  
2075  	nla_nest_end(msg->skb, attrs);
2076  	genlmsg_end(msg->skb, hdr);
2077  
2078  	return 0;
2079  
2080  attr_msg_full:
2081  	nla_nest_cancel(msg->skb, attrs);
2082  msg_full:
2083  	genlmsg_cancel(msg->skb, hdr);
2084  
2085  	return -EMSGSIZE;
2086  }
2087  
2088  int tipc_nl_node_get_monitor(struct sk_buff *skb, struct genl_info *info)
2089  {
2090  	struct net *net = sock_net(skb->sk);
2091  	struct tipc_nl_msg msg;
2092  	int err;
2093  
2094  	msg.skb = nlmsg_new(NLMSG_GOODSIZE, GFP_KERNEL);
2095  	if (!msg.skb)
2096  		return -ENOMEM;
2097  	msg.portid = info->snd_portid;
2098  	msg.seq = info->snd_seq;
2099  
2100  	err = __tipc_nl_add_monitor_prop(net, &msg);
2101  	if (err) {
2102  		nlmsg_free(msg.skb);
2103  		return err;
2104  	}
2105  
2106  	return genlmsg_reply(msg.skb, info);
2107  }
2108  
2109  int tipc_nl_node_dump_monitor(struct sk_buff *skb, struct netlink_callback *cb)
2110  {
2111  	struct net *net = sock_net(skb->sk);
2112  	u32 prev_bearer = cb->args[0];
2113  	struct tipc_nl_msg msg;
2114  	int err;
2115  	int i;
2116  
2117  	if (prev_bearer == MAX_BEARERS)
2118  		return 0;
2119  
2120  	msg.skb = skb;
2121  	msg.portid = NETLINK_CB(cb->skb).portid;
2122  	msg.seq = cb->nlh->nlmsg_seq;
2123  
2124  	rtnl_lock();
2125  	for (i = prev_bearer; i < MAX_BEARERS; i++) {
2126  		prev_bearer = i;
2127  		err = __tipc_nl_add_monitor(net, &msg, prev_bearer);
2128  		if (err)
2129  			goto out;
2130  	}
2131  
2132  out:
2133  	rtnl_unlock();
2134  	cb->args[0] = prev_bearer;
2135  
2136  	return skb->len;
2137  }
2138  
2139  int tipc_nl_node_dump_monitor_peer(struct sk_buff *skb,
2140  				   struct netlink_callback *cb)
2141  {
2142  	struct net *net = sock_net(skb->sk);
2143  	u32 prev_node = cb->args[1];
2144  	u32 bearer_id = cb->args[2];
2145  	int done = cb->args[0];
2146  	struct tipc_nl_msg msg;
2147  	int err;
2148  
2149  	if (!prev_node) {
2150  		struct nlattr **attrs;
2151  		struct nlattr *mon[TIPC_NLA_MON_MAX + 1];
2152  
2153  		err = tipc_nlmsg_parse(cb->nlh, &attrs);
2154  		if (err)
2155  			return err;
2156  
2157  		if (!attrs[TIPC_NLA_MON])
2158  			return -EINVAL;
2159  
2160  		err = nla_parse_nested(mon, TIPC_NLA_MON_MAX,
2161  				       attrs[TIPC_NLA_MON],
2162  				       tipc_nl_monitor_policy, NULL);
2163  		if (err)
2164  			return err;
2165  
2166  		if (!mon[TIPC_NLA_MON_REF])
2167  			return -EINVAL;
2168  
2169  		bearer_id = nla_get_u32(mon[TIPC_NLA_MON_REF]);
2170  
2171  		if (bearer_id >= MAX_BEARERS)
2172  			return -EINVAL;
2173  	}
2174  
2175  	if (done)
2176  		return 0;
2177  
2178  	msg.skb = skb;
2179  	msg.portid = NETLINK_CB(cb->skb).portid;
2180  	msg.seq = cb->nlh->nlmsg_seq;
2181  
2182  	rtnl_lock();
2183  	err = tipc_nl_add_monitor_peer(net, &msg, bearer_id, &prev_node);
2184  	if (!err)
2185  		done = 1;
2186  
2187  	rtnl_unlock();
2188  	cb->args[0] = done;
2189  	cb->args[1] = prev_node;
2190  	cb->args[2] = bearer_id;
2191  
2192  	return skb->len;
2193  }
2194