1 /*
2  * net/tipc/link.c: TIPC link code
3  *
4  * Copyright (c) 1996-2007, 2012-2015, Ericsson AB
5  * Copyright (c) 2004-2007, 2010-2013, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36 
37 #include "core.h"
38 #include "subscr.h"
39 #include "link.h"
40 #include "bcast.h"
41 #include "socket.h"
42 #include "name_distr.h"
43 #include "discover.h"
44 #include "netlink.h"
45 
46 #include <linux/pkt_sched.h>
47 
48 /*
49  * Error message prefixes
50  */
51 static const char *link_co_err = "Link changeover error, ";
52 static const char *link_rst_msg = "Resetting link ";
53 static const char *link_unk_evt = "Unknown link event ";
54 
55 static const struct nla_policy tipc_nl_link_policy[TIPC_NLA_LINK_MAX + 1] = {
56 	[TIPC_NLA_LINK_UNSPEC]		= { .type = NLA_UNSPEC },
57 	[TIPC_NLA_LINK_NAME] = {
58 		.type = NLA_STRING,
59 		.len = TIPC_MAX_LINK_NAME
60 	},
61 	[TIPC_NLA_LINK_MTU]		= { .type = NLA_U32 },
62 	[TIPC_NLA_LINK_BROADCAST]	= { .type = NLA_FLAG },
63 	[TIPC_NLA_LINK_UP]		= { .type = NLA_FLAG },
64 	[TIPC_NLA_LINK_ACTIVE]		= { .type = NLA_FLAG },
65 	[TIPC_NLA_LINK_PROP]		= { .type = NLA_NESTED },
66 	[TIPC_NLA_LINK_STATS]		= { .type = NLA_NESTED },
67 	[TIPC_NLA_LINK_RX]		= { .type = NLA_U32 },
68 	[TIPC_NLA_LINK_TX]		= { .type = NLA_U32 }
69 };
70 
71 /* Properties valid for media, bearer and link */
72 static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = {
73 	[TIPC_NLA_PROP_UNSPEC]		= { .type = NLA_UNSPEC },
74 	[TIPC_NLA_PROP_PRIO]		= { .type = NLA_U32 },
75 	[TIPC_NLA_PROP_TOL]		= { .type = NLA_U32 },
76 	[TIPC_NLA_PROP_WIN]		= { .type = NLA_U32 }
77 };
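/*
 * Illustrative sketch, not part of the kernel build: how a nested
 * TIPC_NLA_LINK_PROP attribute would typically be parsed against the
 * policy above. demo_parse_link_props() and its 'attr' argument are
 * hypothetical; nla_parse_nested() and nla_get_u32() are the standard
 * netlink helpers of this kernel era.
 */
#if 0 /* sketch only */
static int demo_parse_link_props(struct nlattr *attr, u32 *prio)
{
	struct nlattr *props[TIPC_NLA_PROP_MAX + 1];
	int err;

	err = nla_parse_nested(props, TIPC_NLA_PROP_MAX, attr,
			       tipc_nl_prop_policy);
	if (err)
		return err;
	if (props[TIPC_NLA_PROP_PRIO])
		*prio = nla_get_u32(props[TIPC_NLA_PROP_PRIO]);
	return 0;
}
#endif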
78 
79 /*
80  * Out-of-range value for link session numbers
81  */
82 #define INVALID_SESSION 0x10000
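/*
 * Illustrative sketch, not part of the kernel build: session numbers are
 * carried in a 16-bit header field and always masked with 0xffff (see
 * tipc_link_create() and tipc_link_reset() below), so the 17-bit value
 * 0x10000 can never collide with a real session.
 */
#if 0 /* standalone user-space demo */
#include <assert.h>
#include <stdint.h>

int main(void)
{
	uint32_t raw;

	/* every masked session fits in 16 bits, so none equals 0x10000 */
	for (raw = 0; raw < 0x30000; raw += 0x1234)
		assert((raw & 0xffff) != 0x10000);
	return 0;
}
#endif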
83 
84 /*
85  * Link state events:
86  */
87 #define  STARTING_EVT    856384768	/* link processing trigger */
88 #define  TRAFFIC_MSG_EVT 560815u	/* rx'd ??? */
89 #define  TIMEOUT_EVT     560817u	/* link timer expired */
90 
91 /*
92  * The following two 'message types' are really just implementation
93  * data conveniently stored in the message header.
94  * They must not be considered part of the protocol.
95  */
96 #define OPEN_MSG   0
97 #define CLOSED_MSG 1
98 
99 /*
100  * State value stored in 'exp_msg_count'
101  */
102 #define START_CHANGEOVER 100000u
103 
104 static void link_handle_out_of_seq_msg(struct tipc_link *link,
105 				       struct sk_buff *skb);
106 static void tipc_link_proto_rcv(struct tipc_link *link,
107 				struct sk_buff *skb);
108 static int  tipc_link_tunnel_rcv(struct tipc_node *node,
109 				 struct sk_buff **skb);
110 static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol);
111 static void link_state_event(struct tipc_link *l_ptr, u32 event);
112 static void link_reset_statistics(struct tipc_link *l_ptr);
113 static void link_print(struct tipc_link *l_ptr, const char *str);
114 static void tipc_link_sync_xmit(struct tipc_link *l);
115 static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
116 static void tipc_link_input(struct tipc_link *l, struct sk_buff *skb);
117 static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb);
118 
119 /*
120  *  Simple link routines
121  */
122 static unsigned int align(unsigned int i)
123 {
124 	return (i + 3) & ~3u;
125 }
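/*
 * Illustrative sketch, not part of the kernel build: align() rounds a
 * size up to the next multiple of four, the step used when walking
 * bundled messages in tipc_link_failover_send_queue() below.
 */
#if 0 /* standalone user-space demo */
#include <stdio.h>

static unsigned int demo_align(unsigned int i)
{
	return (i + 3) & ~3u;
}

int main(void)
{
	unsigned int sizes[] = { 0, 1, 4, 5, 1470 };
	int k;

	/* prints: 0->0 1->4 4->4 5->8 1470->1472 */
	for (k = 0; k < 5; k++)
		printf("%u->%u ", sizes[k], demo_align(sizes[k]));
	printf("\n");
	return 0;
}
#endif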
126 
127 static void tipc_link_release(struct kref *kref)
128 {
129 	kfree(container_of(kref, struct tipc_link, ref));
130 }
131 
132 static void tipc_link_get(struct tipc_link *l_ptr)
133 {
134 	kref_get(&l_ptr->ref);
135 }
136 
137 static void tipc_link_put(struct tipc_link *l_ptr)
138 {
139 	kref_put(&l_ptr->ref, tipc_link_release);
140 }
141 
142 static struct tipc_link *tipc_parallel_link(struct tipc_link *l)
143 {
144 	if (l->owner->active_links[0] != l)
145 		return l->owner->active_links[0];
146 	return l->owner->active_links[1];
147 }
148 
149 static void link_init_max_pkt(struct tipc_link *l_ptr)
150 {
151 	struct tipc_node *node = l_ptr->owner;
152 	struct tipc_net *tn = net_generic(node->net, tipc_net_id);
153 	struct tipc_bearer *b_ptr;
154 	u32 max_pkt;
155 
156 	rcu_read_lock();
157 	b_ptr = rcu_dereference_rtnl(tn->bearer_list[l_ptr->bearer_id]);
158 	if (!b_ptr) {
159 		rcu_read_unlock();
160 		return;
161 	}
162 	max_pkt = (b_ptr->mtu & ~3);
163 	rcu_read_unlock();
164 
165 	if (max_pkt > MAX_MSG_SIZE)
166 		max_pkt = MAX_MSG_SIZE;
167 
168 	l_ptr->max_pkt_target = max_pkt;
169 	if (l_ptr->max_pkt_target < MAX_PKT_DEFAULT)
170 		l_ptr->max_pkt = l_ptr->max_pkt_target;
171 	else
172 		l_ptr->max_pkt = MAX_PKT_DEFAULT;
173 
174 	l_ptr->max_pkt_probes = 0;
175 }
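/*
 * Illustrative sketch, not part of the kernel build: the negotiation
 * start point chosen above is min(bearer MTU rounded down to a 4-byte
 * multiple, MAX_MSG_SIZE) as the probe target, with the working max_pkt
 * capped at MAX_PKT_DEFAULT until probing raises it. The DEMO_* constants
 * are assumed stand-ins for the real header values.
 */
#if 0 /* standalone user-space demo */
#include <stdio.h>

#define DEMO_MAX_MSG_SIZE    66000u	/* assumed stand-in */
#define DEMO_MAX_PKT_DEFAULT  1500u	/* assumed stand-in */

int main(void)
{
	unsigned int mtu = 9000;		/* e.g. jumbo-frame bearer */
	unsigned int target = mtu & ~3u;

	if (target > DEMO_MAX_MSG_SIZE)
		target = DEMO_MAX_MSG_SIZE;
	/* prints: target=9000 initial max_pkt=1500 */
	printf("target=%u initial max_pkt=%u\n", target,
	       target < DEMO_MAX_PKT_DEFAULT ? target : DEMO_MAX_PKT_DEFAULT);
	return 0;
}
#endif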
176 
177 /*
178  *  Simple non-static link routines (i.e. referenced outside this file)
179  */
180 int tipc_link_is_up(struct tipc_link *l_ptr)
181 {
182 	if (!l_ptr)
183 		return 0;
184 	return link_working_working(l_ptr) || link_working_unknown(l_ptr);
185 }
186 
187 int tipc_link_is_active(struct tipc_link *l_ptr)
188 {
189 	return	(l_ptr->owner->active_links[0] == l_ptr) ||
190 		(l_ptr->owner->active_links[1] == l_ptr);
191 }
192 
193 /**
194  * link_timeout - handle expiration of link timer
195  * @data: pointer to link, passed to the timer as an unsigned long
196  */
197 static void link_timeout(unsigned long data)
198 {
199 	struct tipc_link *l_ptr = (struct tipc_link *)data;
200 	struct sk_buff *skb;
201 
202 	tipc_node_lock(l_ptr->owner);
203 
204 	/* update counters used in statistical profiling of send traffic */
205 	l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->transmq);
206 	l_ptr->stats.queue_sz_counts++;
207 
208 	skb = skb_peek(&l_ptr->transmq);
209 	if (skb) {
210 		struct tipc_msg *msg = buf_msg(skb);
211 		u32 length = msg_size(msg);
212 
213 		if ((msg_user(msg) == MSG_FRAGMENTER) &&
214 		    (msg_type(msg) == FIRST_FRAGMENT)) {
215 			length = msg_size(msg_get_wrapped(msg));
216 		}
217 		if (length) {
218 			l_ptr->stats.msg_lengths_total += length;
219 			l_ptr->stats.msg_length_counts++;
220 			if (length <= 64)
221 				l_ptr->stats.msg_length_profile[0]++;
222 			else if (length <= 256)
223 				l_ptr->stats.msg_length_profile[1]++;
224 			else if (length <= 1024)
225 				l_ptr->stats.msg_length_profile[2]++;
226 			else if (length <= 4096)
227 				l_ptr->stats.msg_length_profile[3]++;
228 			else if (length <= 16384)
229 				l_ptr->stats.msg_length_profile[4]++;
230 			else if (length <= 32768)
231 				l_ptr->stats.msg_length_profile[5]++;
232 			else
233 				l_ptr->stats.msg_length_profile[6]++;
234 		}
235 	}
236 
237 	/* do all other link processing performed on a periodic basis */
238 	link_state_event(l_ptr, TIMEOUT_EVT);
239 
240 	if (skb_queue_len(&l_ptr->backlogq))
241 		tipc_link_push_packets(l_ptr);
242 
243 	tipc_node_unlock(l_ptr->owner);
244 	tipc_link_put(l_ptr);
245 }
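/*
 * Illustrative sketch, not part of the kernel build: the profiling block
 * in link_timeout() buckets message lengths into the seven
 * msg_length_profile[] bins with upper bounds 64, 256, 1024, 4096, 16384
 * and 32768 bytes.
 */
#if 0 /* standalone user-space demo */
#include <stdio.h>

static int demo_length_bucket(unsigned int len)
{
	static const unsigned int bound[] = { 64, 256, 1024, 4096,
					      16384, 32768 };
	int i;

	for (i = 0; i < 6; i++)
		if (len <= bound[i])
			return i;
	return 6;	/* everything above 32768 bytes */
}

int main(void)
{
	/* prints: 60->0 300->2 40000->6 */
	printf("60->%d 300->%d 40000->%d\n", demo_length_bucket(60),
	       demo_length_bucket(300), demo_length_bucket(40000));
	return 0;
}
#endif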
246 
247 static void link_set_timer(struct tipc_link *link, unsigned long time)
248 {
249 	if (!mod_timer(&link->timer, jiffies + time))
250 		tipc_link_get(link);
251 }
252 
253 /**
254  * tipc_link_create - create a new link
255  * @n_ptr: pointer to associated node
256  * @b_ptr: pointer to associated bearer
257  * @media_addr: media address to use when sending messages over link
258  *
259  * Returns pointer to link, or NULL on failure.
260  */
261 struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
262 				   struct tipc_bearer *b_ptr,
263 				   const struct tipc_media_addr *media_addr)
264 {
265 	struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id);
266 	struct tipc_link *l_ptr;
267 	struct tipc_msg *msg;
268 	char *if_name;
269 	char addr_string[16];
270 	u32 peer = n_ptr->addr;
271 
272 	if (n_ptr->link_cnt >= MAX_BEARERS) {
273 		tipc_addr_string_fill(addr_string, n_ptr->addr);
274 		pr_err("Attempt to establish %uth link to %s. Max %u allowed.\n",
275 			n_ptr->link_cnt, addr_string, MAX_BEARERS);
276 		return NULL;
277 	}
278 
279 	if (n_ptr->links[b_ptr->identity]) {
280 		tipc_addr_string_fill(addr_string, n_ptr->addr);
281 		pr_err("Attempt to establish second link on <%s> to %s\n",
282 		       b_ptr->name, addr_string);
283 		return NULL;
284 	}
285 
286 	l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC);
287 	if (!l_ptr) {
288 		pr_warn("Link creation failed, no memory\n");
289 		return NULL;
290 	}
291 	kref_init(&l_ptr->ref);
292 	l_ptr->addr = peer;
293 	if_name = strchr(b_ptr->name, ':') + 1;
294 	sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:unknown",
295 		tipc_zone(tn->own_addr), tipc_cluster(tn->own_addr),
296 		tipc_node(tn->own_addr),
297 		if_name,
298 		tipc_zone(peer), tipc_cluster(peer), tipc_node(peer));
299 		/* note: peer i/f name is updated by reset/activate message */
300 	memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr));
301 	l_ptr->owner = n_ptr;
302 	l_ptr->checkpoint = 1;
303 	l_ptr->peer_session = INVALID_SESSION;
304 	l_ptr->bearer_id = b_ptr->identity;
305 	link_set_supervision_props(l_ptr, b_ptr->tolerance);
306 	l_ptr->state = RESET_UNKNOWN;
307 
308 	l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg;
309 	msg = l_ptr->pmsg;
310 	tipc_msg_init(tn->own_addr, msg, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE,
311 		      l_ptr->addr);
312 	msg_set_size(msg, sizeof(l_ptr->proto_msg));
313 	msg_set_session(msg, (tn->random & 0xffff));
314 	msg_set_bearer_id(msg, b_ptr->identity);
315 	strcpy((char *)msg_data(msg), if_name);
316 	l_ptr->net_plane = b_ptr->net_plane;
317 	link_init_max_pkt(l_ptr);
318 	l_ptr->priority = b_ptr->priority;
319 	tipc_link_set_queue_limits(l_ptr, b_ptr->window);
320 	l_ptr->next_out_no = 1;
321 	__skb_queue_head_init(&l_ptr->transmq);
322 	__skb_queue_head_init(&l_ptr->backlogq);
323 	__skb_queue_head_init(&l_ptr->deferdq);
324 	skb_queue_head_init(&l_ptr->wakeupq);
325 	skb_queue_head_init(&l_ptr->inputq);
326 	skb_queue_head_init(&l_ptr->namedq);
327 	link_reset_statistics(l_ptr);
328 	tipc_node_attach_link(n_ptr, l_ptr);
329 	setup_timer(&l_ptr->timer, link_timeout, (unsigned long)l_ptr);
330 	link_state_event(l_ptr, STARTING_EVT);
331 
332 	return l_ptr;
333 }
334 
335 /**
336  * tipc_link_delete - Conditional deletion of link.
337  *               If timer still running, real delete is done when it expires
338  * @link: link to be deleted
339  */
340 void tipc_link_delete(struct tipc_link *link)
341 {
342 	tipc_link_reset_fragments(link);
343 	tipc_node_detach_link(link->owner, link);
344 	tipc_link_put(link);
345 }
346 
347 void tipc_link_delete_list(struct net *net, unsigned int bearer_id,
348 			   bool shutting_down)
349 {
350 	struct tipc_net *tn = net_generic(net, tipc_net_id);
351 	struct tipc_link *link;
352 	struct tipc_node *node;
353 	bool del_link;
354 
355 	rcu_read_lock();
356 	list_for_each_entry_rcu(node, &tn->node_list, list) {
357 		tipc_node_lock(node);
358 		link = node->links[bearer_id];
359 		if (!link) {
360 			tipc_node_unlock(node);
361 			continue;
362 		}
363 		del_link = !tipc_link_is_up(link) && !link->exp_msg_count;
364 		tipc_link_reset(link);
365 		if (del_timer(&link->timer))
366 			tipc_link_put(link);
367 		link->flags |= LINK_STOPPED;
368 		/* Delete link now, or when failover is finished: */
369 		if (shutting_down || !tipc_node_is_up(node) || del_link)
370 			tipc_link_delete(link);
371 		tipc_node_unlock(node);
372 	}
373 	rcu_read_unlock();
374 }
375 
376 /**
377  * link_schedule_user - schedule a message sender for wakeup after congestion
378  * @link: congested link
379  * @list: message whose transmission was attempted
380  * Create pseudo msg to send back to user when congestion abates
381  * Only consumes message if there is an error
382  */
383 static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
384 {
385 	struct tipc_msg *msg = buf_msg(skb_peek(list));
386 	int imp = msg_importance(msg);
387 	u32 oport = msg_origport(msg);
388 	u32 addr = link_own_addr(link);
389 	struct sk_buff *skb;
390 
391 	/* This really cannot happen...  */
392 	if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
393 		pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
394 		tipc_link_reset(link);
395 		goto err;
396 	}
397 	/* Non-blocking sender: */
398 	if (TIPC_SKB_CB(skb_peek(list))->wakeup_pending)
399 		return -ELINKCONG;
400 
401 	/* Create and schedule wakeup pseudo message */
402 	skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0,
403 			      addr, addr, oport, 0, 0);
404 	if (!skb)
405 		goto err;
406 	TIPC_SKB_CB(skb)->chain_sz = skb_queue_len(list);
407 	TIPC_SKB_CB(skb)->chain_imp = imp;
408 	skb_queue_tail(&link->wakeupq, skb);
409 	link->stats.link_congs++;
410 	return -ELINKCONG;
411 err:
412 	__skb_queue_purge(list);
413 	return -ENOBUFS;
414 }
415 
416 /**
417  * link_prepare_wakeup - prepare users for wakeup after congestion
418  * @link: congested link
419  * Move a number of waiting users, as permitted by available space in
420  * the send queue, from link wait queue to node wait queue for wakeup
421  */
422 void link_prepare_wakeup(struct tipc_link *l)
423 {
424 	int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,};
425 	int imp, lim;
426 	struct sk_buff *skb, *tmp;
427 
428 	skb_queue_walk_safe(&l->wakeupq, skb, tmp) {
429 		imp = TIPC_SKB_CB(skb)->chain_imp;
430 		lim = l->window + l->backlog[imp].limit;
431 		pnd[imp] += TIPC_SKB_CB(skb)->chain_sz;
432 		if ((pnd[imp] + l->backlog[imp].len) >= lim)
433 			break;
434 		skb_unlink(skb, &l->wakeupq);
435 		skb_queue_tail(&l->inputq, skb);
436 		l->owner->inputq = &l->inputq;
437 		l->owner->action_flags |= TIPC_MSG_EVT;
438 	}
439 }
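/*
 * Illustrative sketch, not part of the kernel build: the loop above wakes
 * queued senders only while their pending wakeups plus the current
 * backlog stay below window + limit for the importance level. This demo
 * collapses everything to one level; all numbers are invented.
 */
#if 0 /* standalone user-space demo */
#include <stdio.h>

int main(void)
{
	unsigned int window = 50, limit = 100;	/* assumed level config */
	unsigned int backlog_len = 120;		/* packets already queued */
	unsigned int chain_sz[] = { 10, 15, 12 }; /* waiting senders */
	unsigned int pnd = 0;
	int i;

	for (i = 0; i < 3; i++) {
		pnd += chain_sz[i];
		if (pnd + backlog_len >= window + limit) {
			/* prints: sender 2 stays queued */
			printf("sender %d stays queued\n", i);
			break;
		}
		printf("sender %d woken\n", i);
	}
	return 0;
}
#endif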
440 
441 /**
442  * tipc_link_reset_fragments - purge link's inbound message fragments queue
443  * @l_ptr: pointer to link
444  */
445 void tipc_link_reset_fragments(struct tipc_link *l_ptr)
446 {
447 	kfree_skb(l_ptr->reasm_buf);
448 	l_ptr->reasm_buf = NULL;
449 }
450 
451 static void tipc_link_purge_backlog(struct tipc_link *l)
452 {
453 	__skb_queue_purge(&l->backlogq);
454 	l->backlog[TIPC_LOW_IMPORTANCE].len = 0;
455 	l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0;
456 	l->backlog[TIPC_HIGH_IMPORTANCE].len = 0;
457 	l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0;
458 	l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0;
459 }
460 
461 /**
462  * tipc_link_purge_queues - purge all pkt queues associated with link
463  * @l_ptr: pointer to link
464  */
465 void tipc_link_purge_queues(struct tipc_link *l_ptr)
466 {
467 	__skb_queue_purge(&l_ptr->deferdq);
468 	__skb_queue_purge(&l_ptr->transmq);
469 	tipc_link_purge_backlog(l_ptr);
470 	tipc_link_reset_fragments(l_ptr);
471 }
472 
473 void tipc_link_reset(struct tipc_link *l_ptr)
474 {
475 	u32 prev_state = l_ptr->state;
476 	u32 checkpoint = l_ptr->next_in_no;
477 	int was_active_link = tipc_link_is_active(l_ptr);
478 	struct tipc_node *owner = l_ptr->owner;
479 
480 	msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));
481 
482 	/* Link is down, accept any session */
483 	l_ptr->peer_session = INVALID_SESSION;
484 
485 	/* Prepare for max packet size negotiation */
486 	link_init_max_pkt(l_ptr);
487 
488 	l_ptr->state = RESET_UNKNOWN;
489 
490 	if ((prev_state == RESET_UNKNOWN) || (prev_state == RESET_RESET))
491 		return;
492 
493 	tipc_node_link_down(l_ptr->owner, l_ptr);
494 	tipc_bearer_remove_dest(owner->net, l_ptr->bearer_id, l_ptr->addr);
495 
496 	if (was_active_link && tipc_node_active_links(l_ptr->owner)) {
497 		l_ptr->reset_checkpoint = checkpoint;
498 		l_ptr->exp_msg_count = START_CHANGEOVER;
499 	}
500 
501 	/* Clean up all queues, except inputq: */
502 	__skb_queue_purge(&l_ptr->transmq);
503 	__skb_queue_purge(&l_ptr->deferdq);
504 	if (!owner->inputq)
505 		owner->inputq = &l_ptr->inputq;
506 	skb_queue_splice_init(&l_ptr->wakeupq, owner->inputq);
507 	if (!skb_queue_empty(owner->inputq))
508 		owner->action_flags |= TIPC_MSG_EVT;
509 	tipc_link_purge_backlog(l_ptr);
510 	l_ptr->rcv_unacked = 0;
511 	l_ptr->checkpoint = 1;
512 	l_ptr->next_out_no = 1;
513 	l_ptr->fsm_msg_cnt = 0;
514 	l_ptr->stale_count = 0;
515 	link_reset_statistics(l_ptr);
516 }
517 
518 void tipc_link_reset_list(struct net *net, unsigned int bearer_id)
519 {
520 	struct tipc_net *tn = net_generic(net, tipc_net_id);
521 	struct tipc_link *l_ptr;
522 	struct tipc_node *n_ptr;
523 
524 	rcu_read_lock();
525 	list_for_each_entry_rcu(n_ptr, &tn->node_list, list) {
526 		tipc_node_lock(n_ptr);
527 		l_ptr = n_ptr->links[bearer_id];
528 		if (l_ptr)
529 			tipc_link_reset(l_ptr);
530 		tipc_node_unlock(n_ptr);
531 	}
532 	rcu_read_unlock();
533 }
534 
535 static void link_activate(struct tipc_link *link)
536 {
537 	struct tipc_node *node = link->owner;
538 
539 	link->next_in_no = 1;
540 	link->stats.recv_info = 1;
541 	tipc_node_link_up(node, link);
542 	tipc_bearer_add_dest(node->net, link->bearer_id, link->addr);
543 }
544 
545 /**
546  * link_state_event - link finite state machine
547  * @l_ptr: pointer to link
548  * @event: state machine event to process
549  */
550 static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
551 {
552 	struct tipc_link *other;
553 	unsigned long cont_intv = l_ptr->cont_intv;
554 
555 	if (l_ptr->flags & LINK_STOPPED)
556 		return;
557 
558 	if (!(l_ptr->flags & LINK_STARTED) && (event != STARTING_EVT))
559 		return;		/* Not yet. */
560 
561 	/* Check whether changeover is going on */
562 	if (l_ptr->exp_msg_count) {
563 		if (event == TIMEOUT_EVT)
564 			link_set_timer(l_ptr, cont_intv);
565 		return;
566 	}
567 
568 	switch (l_ptr->state) {
569 	case WORKING_WORKING:
570 		switch (event) {
571 		case TRAFFIC_MSG_EVT:
572 		case ACTIVATE_MSG:
573 			break;
574 		case TIMEOUT_EVT:
575 			if (l_ptr->next_in_no != l_ptr->checkpoint) {
576 				l_ptr->checkpoint = l_ptr->next_in_no;
577 				if (tipc_bclink_acks_missing(l_ptr->owner)) {
578 					tipc_link_proto_xmit(l_ptr, STATE_MSG,
579 							     0, 0, 0, 0, 0);
580 					l_ptr->fsm_msg_cnt++;
581 				} else if (l_ptr->max_pkt < l_ptr->max_pkt_target) {
582 					tipc_link_proto_xmit(l_ptr, STATE_MSG,
583 							     1, 0, 0, 0, 0);
584 					l_ptr->fsm_msg_cnt++;
585 				}
586 				link_set_timer(l_ptr, cont_intv);
587 				break;
588 			}
589 			l_ptr->state = WORKING_UNKNOWN;
590 			l_ptr->fsm_msg_cnt = 0;
591 			tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
592 			l_ptr->fsm_msg_cnt++;
593 			link_set_timer(l_ptr, cont_intv / 4);
594 			break;
595 		case RESET_MSG:
596 			pr_debug("%s<%s>, requested by peer\n",
597 				 link_rst_msg, l_ptr->name);
598 			tipc_link_reset(l_ptr);
599 			l_ptr->state = RESET_RESET;
600 			l_ptr->fsm_msg_cnt = 0;
601 			tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
602 					     0, 0, 0, 0, 0);
603 			l_ptr->fsm_msg_cnt++;
604 			link_set_timer(l_ptr, cont_intv);
605 			break;
606 		default:
607 			pr_debug("%s%u in WW state\n", link_unk_evt, event);
608 		}
609 		break;
610 	case WORKING_UNKNOWN:
611 		switch (event) {
612 		case TRAFFIC_MSG_EVT:
613 		case ACTIVATE_MSG:
614 			l_ptr->state = WORKING_WORKING;
615 			l_ptr->fsm_msg_cnt = 0;
616 			link_set_timer(l_ptr, cont_intv);
617 			break;
618 		case RESET_MSG:
619 			pr_debug("%s<%s>, requested by peer while probing\n",
620 				 link_rst_msg, l_ptr->name);
621 			tipc_link_reset(l_ptr);
622 			l_ptr->state = RESET_RESET;
623 			l_ptr->fsm_msg_cnt = 0;
624 			tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
625 					     0, 0, 0, 0, 0);
626 			l_ptr->fsm_msg_cnt++;
627 			link_set_timer(l_ptr, cont_intv);
628 			break;
629 		case TIMEOUT_EVT:
630 			if (l_ptr->next_in_no != l_ptr->checkpoint) {
631 				l_ptr->state = WORKING_WORKING;
632 				l_ptr->fsm_msg_cnt = 0;
633 				l_ptr->checkpoint = l_ptr->next_in_no;
634 				if (tipc_bclink_acks_missing(l_ptr->owner)) {
635 					tipc_link_proto_xmit(l_ptr, STATE_MSG,
636 							     0, 0, 0, 0, 0);
637 					l_ptr->fsm_msg_cnt++;
638 				}
639 				link_set_timer(l_ptr, cont_intv);
640 			} else if (l_ptr->fsm_msg_cnt < l_ptr->abort_limit) {
641 				tipc_link_proto_xmit(l_ptr, STATE_MSG,
642 						     1, 0, 0, 0, 0);
643 				l_ptr->fsm_msg_cnt++;
644 				link_set_timer(l_ptr, cont_intv / 4);
645 			} else {	/* Link has failed */
646 				pr_debug("%s<%s>, peer not responding\n",
647 					 link_rst_msg, l_ptr->name);
648 				tipc_link_reset(l_ptr);
649 				l_ptr->state = RESET_UNKNOWN;
650 				l_ptr->fsm_msg_cnt = 0;
651 				tipc_link_proto_xmit(l_ptr, RESET_MSG,
652 						     0, 0, 0, 0, 0);
653 				l_ptr->fsm_msg_cnt++;
654 				link_set_timer(l_ptr, cont_intv);
655 			}
656 			break;
657 		default:
658 			pr_err("%s%u in WU state\n", link_unk_evt, event);
659 		}
660 		break;
661 	case RESET_UNKNOWN:
662 		switch (event) {
663 		case TRAFFIC_MSG_EVT:
664 			break;
665 		case ACTIVATE_MSG:
666 			other = l_ptr->owner->active_links[0];
667 			if (other && link_working_unknown(other))
668 				break;
669 			l_ptr->state = WORKING_WORKING;
670 			l_ptr->fsm_msg_cnt = 0;
671 			link_activate(l_ptr);
672 			tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
673 			l_ptr->fsm_msg_cnt++;
674 			if (l_ptr->owner->working_links == 1)
675 				tipc_link_sync_xmit(l_ptr);
676 			link_set_timer(l_ptr, cont_intv);
677 			break;
678 		case RESET_MSG:
679 			l_ptr->state = RESET_RESET;
680 			l_ptr->fsm_msg_cnt = 0;
681 			tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
682 					     1, 0, 0, 0, 0);
683 			l_ptr->fsm_msg_cnt++;
684 			link_set_timer(l_ptr, cont_intv);
685 			break;
686 		case STARTING_EVT:
687 			l_ptr->flags |= LINK_STARTED;
688 			l_ptr->fsm_msg_cnt++;
689 			link_set_timer(l_ptr, cont_intv);
690 			break;
691 		case TIMEOUT_EVT:
692 			tipc_link_proto_xmit(l_ptr, RESET_MSG, 0, 0, 0, 0, 0);
693 			l_ptr->fsm_msg_cnt++;
694 			link_set_timer(l_ptr, cont_intv);
695 			break;
696 		default:
697 			pr_err("%s%u in RU state\n", link_unk_evt, event);
698 		}
699 		break;
700 	case RESET_RESET:
701 		switch (event) {
702 		case TRAFFIC_MSG_EVT:
703 		case ACTIVATE_MSG:
704 			other = l_ptr->owner->active_links[0];
705 			if (other && link_working_unknown(other))
706 				break;
707 			l_ptr->state = WORKING_WORKING;
708 			l_ptr->fsm_msg_cnt = 0;
709 			link_activate(l_ptr);
710 			tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
711 			l_ptr->fsm_msg_cnt++;
712 			if (l_ptr->owner->working_links == 1)
713 				tipc_link_sync_xmit(l_ptr);
714 			link_set_timer(l_ptr, cont_intv);
715 			break;
716 		case RESET_MSG:
717 			break;
718 		case TIMEOUT_EVT:
719 			tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
720 					     0, 0, 0, 0, 0);
721 			l_ptr->fsm_msg_cnt++;
722 			link_set_timer(l_ptr, cont_intv);
723 			break;
724 		default:
725 			pr_err("%s%u in RR state\n", link_unk_evt, event);
726 		}
727 		break;
728 	default:
729 		pr_err("Unknown link state %u/%u\n", l_ptr->state, event);
730 	}
731 }
732 
733 /**
734  * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
735  * @link: link to use
736  * @list: chain of buffers containing message
737  *
738  * Consumes the buffer chain, except when returning -ELINKCONG,
739  * since the caller then may want to make more send attempts.
740  * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
741  * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
742  */
743 int __tipc_link_xmit(struct net *net, struct tipc_link *link,
744 		     struct sk_buff_head *list)
745 {
746 	struct tipc_msg *msg = buf_msg(skb_peek(list));
747 	unsigned int maxwin = link->window;
748 	unsigned int imp = msg_importance(msg);
749 	uint mtu = link->max_pkt;
750 	uint ack = mod(link->next_in_no - 1);
751 	uint seqno = link->next_out_no;
752 	uint bc_last_in = link->owner->bclink.last_in;
753 	struct tipc_media_addr *addr = &link->media_addr;
754 	struct sk_buff_head *transmq = &link->transmq;
755 	struct sk_buff_head *backlogq = &link->backlogq;
756 	struct sk_buff *skb, *tmp;
757 
758 	/* Match backlog limit against msg importance: */
759 	if (unlikely(link->backlog[imp].len >= link->backlog[imp].limit))
760 		return link_schedule_user(link, list);
761 
762 	if (unlikely(msg_size(msg) > mtu)) {
763 		__skb_queue_purge(list);
764 		return -EMSGSIZE;
765 	}
766 	/* Prepare each packet for sending, and add to relevant queue: */
767 	skb_queue_walk_safe(list, skb, tmp) {
768 		__skb_unlink(skb, list);
769 		msg = buf_msg(skb);
770 		msg_set_seqno(msg, seqno);
771 		msg_set_ack(msg, ack);
772 		msg_set_bcast_ack(msg, bc_last_in);
773 
774 		if (likely(skb_queue_len(transmq) < maxwin)) {
775 			__skb_queue_tail(transmq, skb);
776 			tipc_bearer_send(net, link->bearer_id, skb, addr);
777 			link->rcv_unacked = 0;
778 			seqno++;
779 			continue;
780 		}
781 		if (tipc_msg_bundle(skb_peek_tail(backlogq), skb, mtu)) {
782 			link->stats.sent_bundled++;
783 			continue;
784 		}
785 		if (tipc_msg_make_bundle(&skb, mtu, link->addr)) {
786 			link->stats.sent_bundled++;
787 			link->stats.sent_bundles++;
788 			imp = msg_importance(buf_msg(skb));
789 		}
790 		__skb_queue_tail(backlogq, skb);
791 		link->backlog[imp].len++;
792 		seqno++;
793 	}
794 	link->next_out_no = seqno;
795 	return 0;
796 }
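/*
 * Illustrative sketch, not part of the kernel build: how a caller is
 * expected to treat the return codes documented above. Only -ELINKCONG
 * leaves the buffer chain with the caller; every other outcome consumes
 * it. demo_send() is hypothetical.
 */
#if 0 /* pseudo-caller */
static int demo_send(struct net *net, struct tipc_link *link,
		     struct sk_buff_head *list)
{
	int rc = __tipc_link_xmit(net, link, list);

	if (rc == -ELINKCONG)	/* chain kept: caller may wait and retry */
		return rc;
	if (rc == -EMSGSIZE)	/* chain purged: oversized for link MTU */
		return rc;
	return rc;		/* 0 or -ENOBUFS: chain consumed */
}
#endif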
797 
798 static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)
799 {
800 	skb_queue_head_init(list);
801 	__skb_queue_tail(list, skb);
802 }
803 
804 static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
805 {
806 	struct sk_buff_head head;
807 
808 	skb2list(skb, &head);
809 	return __tipc_link_xmit(link->owner->net, link, &head);
810 }
811 
812 /* tipc_link_xmit_skb(): send single buffer to destination
813  * Buffers sent via this function are generally TIPC_SYSTEM_IMPORTANCE
814  * messages, which will not be rejected
815  * The only exception is datagram messages rerouted after secondary
816  * lookup, which are rare and safe to dispose of anyway.
817  * TODO: Return real return value, and let callers use
818  * tipc_wait_for_sendpkt() where applicable
819  */
820 int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
821 		       u32 selector)
822 {
823 	struct sk_buff_head head;
824 	int rc;
825 
826 	skb2list(skb, &head);
827 	rc = tipc_link_xmit(net, &head, dnode, selector);
828 	if (rc == -ELINKCONG)
829 		kfree_skb(skb);
830 	return 0;
831 }
832 
833 /**
834  * tipc_link_xmit() is the general link level function for message sending
835  * @net: the applicable net namespace
836  * @list: chain of buffers containing message
838  * @dnode: address of destination node
839  * @selector: a number used for deterministic link selection
840  * Consumes the buffer chain, except when returning -ELINKCONG
841  * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
842  */
843 int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
844 		   u32 selector)
845 {
846 	struct tipc_link *link = NULL;
847 	struct tipc_node *node;
848 	int rc = -EHOSTUNREACH;
849 
850 	node = tipc_node_find(net, dnode);
851 	if (node) {
852 		tipc_node_lock(node);
853 		link = node->active_links[selector & 1];
854 		if (link)
855 			rc = __tipc_link_xmit(net, link, list);
856 		tipc_node_unlock(node);
857 	}
858 	if (link)
859 		return rc;
860 
861 	if (likely(in_own_node(net, dnode))) {
862 		tipc_sk_rcv(net, list);
863 		return 0;
864 	}
865 
866 	__skb_queue_purge(list);
867 	return rc;
868 }
869 
870 /*
871  * tipc_link_sync_xmit - synchronize broadcast link endpoints.
872  *
873  * Give a newly added peer node the sequence number where it should
874  * start receiving and acking broadcast packets.
875  *
876  * Called with node locked
877  */
878 static void tipc_link_sync_xmit(struct tipc_link *link)
879 {
880 	struct sk_buff *skb;
881 	struct tipc_msg *msg;
882 
883 	skb = tipc_buf_acquire(INT_H_SIZE);
884 	if (!skb)
885 		return;
886 
887 	msg = buf_msg(skb);
888 	tipc_msg_init(link_own_addr(link), msg, BCAST_PROTOCOL, STATE_MSG,
889 		      INT_H_SIZE, link->addr);
890 	msg_set_last_bcast(msg, link->owner->bclink.acked);
891 	__tipc_link_xmit_skb(link, skb);
892 }
893 
894 /*
895  * tipc_link_sync_rcv - synchronize broadcast link endpoints.
896  * Receive the sequence number where we should start receiving and
897  * acking broadcast packets from a newly added peer node, and open
898  * up for reception of such packets.
899  *
900  * Called with node locked
901  */
902 static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
903 {
904 	struct tipc_msg *msg = buf_msg(buf);
905 
906 	n->bclink.last_sent = n->bclink.last_in = msg_last_bcast(msg);
907 	n->bclink.recv_permitted = true;
908 	kfree_skb(buf);
909 }
910 
911 /*
912  * tipc_link_push_packets - push unsent packets to bearer
913  *
914  * Push out the unsent messages of a link where congestion
915  * has abated.
916  *
917  * Called with node locked
918  */
919 void tipc_link_push_packets(struct tipc_link *link)
920 {
921 	struct sk_buff *skb;
922 	struct tipc_msg *msg;
923 	unsigned int ack = mod(link->next_in_no - 1);
924 
925 	while (skb_queue_len(&link->transmq) < link->window) {
926 		skb = __skb_dequeue(&link->backlogq);
927 		if (!skb)
928 			break;
929 		msg = buf_msg(skb);
930 		link->backlog[msg_importance(msg)].len--;
931 		msg_set_ack(msg, ack);
932 		msg_set_bcast_ack(msg, link->owner->bclink.last_in);
933 		link->rcv_unacked = 0;
934 		__skb_queue_tail(&link->transmq, skb);
935 		tipc_bearer_send(link->owner->net, link->bearer_id,
936 				 skb, &link->media_addr);
937 	}
938 }
939 
940 void tipc_link_reset_all(struct tipc_node *node)
941 {
942 	char addr_string[16];
943 	u32 i;
944 
945 	tipc_node_lock(node);
946 
947 	pr_warn("Resetting all links to %s\n",
948 		tipc_addr_string_fill(addr_string, node->addr));
949 
950 	for (i = 0; i < MAX_BEARERS; i++) {
951 		if (node->links[i]) {
952 			link_print(node->links[i], "Resetting link\n");
953 			tipc_link_reset(node->links[i]);
954 		}
955 	}
956 
957 	tipc_node_unlock(node);
958 }
959 
960 static void link_retransmit_failure(struct tipc_link *l_ptr,
961 				    struct sk_buff *buf)
962 {
963 	struct tipc_msg *msg = buf_msg(buf);
964 	struct net *net = l_ptr->owner->net;
965 
966 	pr_warn("Retransmission failure on link <%s>\n", l_ptr->name);
967 
968 	if (l_ptr->addr) {
969 		/* Handle failure on standard link */
970 		link_print(l_ptr, "Resetting link\n");
971 		tipc_link_reset(l_ptr);
972 
973 	} else {
974 		/* Handle failure on broadcast link */
975 		struct tipc_node *n_ptr;
976 		char addr_string[16];
977 
978 		pr_info("Msg seq number: %u,  ", msg_seqno(msg));
979 		pr_cont("Outstanding acks: %lu\n",
980 			(unsigned long) TIPC_SKB_CB(buf)->handle);
981 
982 		n_ptr = tipc_bclink_retransmit_to(net);
983 		tipc_node_lock(n_ptr);
984 
985 		tipc_addr_string_fill(addr_string, n_ptr->addr);
986 		pr_info("Broadcast link info for %s\n", addr_string);
987 		pr_info("Reception permitted: %d,  Acked: %u\n",
988 			n_ptr->bclink.recv_permitted,
989 			n_ptr->bclink.acked);
990 		pr_info("Last in: %u,  Oos state: %u,  Last sent: %u\n",
991 			n_ptr->bclink.last_in,
992 			n_ptr->bclink.oos_state,
993 			n_ptr->bclink.last_sent);
994 
995 		tipc_node_unlock(n_ptr);
996 
997 		tipc_bclink_set_flags(net, TIPC_BCLINK_RESET);
998 		l_ptr->stale_count = 0;
999 	}
1000 }
1001 
1002 void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
1003 			  u32 retransmits)
1004 {
1005 	struct tipc_msg *msg;
1006 
1007 	if (!skb)
1008 		return;
1009 
1010 	msg = buf_msg(skb);
1011 
1012 	/* Detect repeated retransmit failures */
1013 	if (l_ptr->last_retransmitted == msg_seqno(msg)) {
1014 		if (++l_ptr->stale_count > 100) {
1015 			link_retransmit_failure(l_ptr, skb);
1016 			return;
1017 		}
1018 	} else {
1019 		l_ptr->last_retransmitted = msg_seqno(msg);
1020 		l_ptr->stale_count = 1;
1021 	}
1022 
1023 	skb_queue_walk_from(&l_ptr->transmq, skb) {
1024 		if (!retransmits)
1025 			break;
1026 		msg = buf_msg(skb);
1027 		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1028 		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1029 		tipc_bearer_send(l_ptr->owner->net, l_ptr->bearer_id, skb,
1030 				 &l_ptr->media_addr);
1031 		retransmits--;
1032 		l_ptr->stats.retransmitted++;
1033 	}
1034 }
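/*
 * Illustrative sketch, not part of the kernel build: the stale-count
 * logic above declares retransmission failure once the same sequence
 * number has been retransmitted more than 100 times in a row; a new
 * seqno restarts the count at 1.
 */
#if 0 /* standalone user-space demo */
#include <stdio.h>

int main(void)
{
	unsigned int retransmits[] = { 7, 7, 8, 8, 8 };
	unsigned int last = 0, stale = 0, seqno;
	int i;

	for (i = 0; i < 5; i++) {
		seqno = retransmits[i];
		if (last == seqno) {
			if (++stale > 100)
				printf("seq %u: declare failure\n", seqno);
		} else {
			last = seqno;
			stale = 1;
		}
	}
	/* prints: final stale count 3 for seq 8 */
	printf("final stale count %u for seq %u\n", stale, last);
	return 0;
}
#endif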
1035 
1036 /* link_synch(): check if all packets arrived before the synch
1037  *               point have been consumed
1038  * Returns true if the parallel links are synched, otherwise false
1039  */
1040 static bool link_synch(struct tipc_link *l)
1041 {
1042 	unsigned int post_synch;
1043 	struct tipc_link *pl;
1044 
1045 	pl  = tipc_parallel_link(l);
1046 	if (pl == l)
1047 		goto synched;
1048 
1049 	/* Was last pre-synch packet added to input queue? */
1050 	if (less_eq(pl->next_in_no, l->synch_point))
1051 		return false;
1052 
1053 	/* Is it still in the input queue? */
1054 	post_synch = mod(pl->next_in_no - l->synch_point) - 1;
1055 	if (skb_queue_len(&pl->inputq) > post_synch)
1056 		return false;
1057 synched:
1058 	l->flags &= ~LINK_SYNCHING;
1059 	return true;
1060 }
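/*
 * Illustrative sketch, not part of the kernel build: a worked example of
 * the synch test above. With synch_point = 100 and the parallel link's
 * next_in_no = 104, three post-synch packets (101..103) exist, so the
 * links count as synched only once the parallel input queue holds at
 * most three buffers.
 */
#if 0 /* standalone user-space demo */
#include <stdio.h>

int main(void)
{
	unsigned int synch_point = 100, next_in_no = 104, inputq_len = 5;
	/* 16-bit sequence arithmetic, mirroring mod() in the real code */
	unsigned int post_synch = ((next_in_no - synch_point) & 0xffff) - 1;

	/* prints: post_synch=3 synched=no */
	printf("post_synch=%u synched=%s\n", post_synch,
	       inputq_len > post_synch ? "no" : "yes");
	return 0;
}
#endif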
1061 
1062 static void link_retrieve_defq(struct tipc_link *link,
1063 			       struct sk_buff_head *list)
1064 {
1065 	u32 seq_no;
1066 
1067 	if (skb_queue_empty(&link->deferdq))
1068 		return;
1069 
1070 	seq_no = buf_seqno(skb_peek(&link->deferdq));
1071 	if (seq_no == mod(link->next_in_no))
1072 		skb_queue_splice_tail_init(&link->deferdq, list);
1073 }
1074 
1075 /**
1076  * tipc_rcv - process TIPC packets/messages arriving from off-node
1077  * @net: the applicable net namespace
1078  * @skb: TIPC packet
1079  * @b_ptr: pointer to bearer message arrived on
1080  *
1081  * Invoked with no locks held.  Bearer pointer must point to a valid bearer
1082  * structure (i.e. cannot be NULL), but bearer can be inactive.
1083  */
1084 void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
1085 {
1086 	struct tipc_net *tn = net_generic(net, tipc_net_id);
1087 	struct sk_buff_head head;
1088 	struct tipc_node *n_ptr;
1089 	struct tipc_link *l_ptr;
1090 	struct sk_buff *skb1, *tmp;
1091 	struct tipc_msg *msg;
1092 	u32 seq_no;
1093 	u32 ackd;
1094 	u32 released;
1095 
1096 	skb2list(skb, &head);
1097 
1098 	while ((skb = __skb_dequeue(&head))) {
1099 		/* Ensure message is well-formed */
1100 		if (unlikely(!tipc_msg_validate(skb)))
1101 			goto discard;
1102 
1103 		/* Handle arrival of a non-unicast link message */
1104 		msg = buf_msg(skb);
1105 		if (unlikely(msg_non_seq(msg))) {
1106 			if (msg_user(msg) ==  LINK_CONFIG)
1107 				tipc_disc_rcv(net, skb, b_ptr);
1108 			else
1109 				tipc_bclink_rcv(net, skb);
1110 			continue;
1111 		}
1112 
1113 		/* Discard unicast link messages destined for another node */
1114 		if (unlikely(!msg_short(msg) &&
1115 			     (msg_destnode(msg) != tn->own_addr)))
1116 			goto discard;
1117 
1118 		/* Locate neighboring node that sent message */
1119 		n_ptr = tipc_node_find(net, msg_prevnode(msg));
1120 		if (unlikely(!n_ptr))
1121 			goto discard;
1122 		tipc_node_lock(n_ptr);
1123 
1124 		/* Locate unicast link endpoint that should handle message */
1125 		l_ptr = n_ptr->links[b_ptr->identity];
1126 		if (unlikely(!l_ptr))
1127 			goto unlock;
1128 
1129 		/* Verify that communication with node is currently allowed */
1130 		if ((n_ptr->action_flags & TIPC_WAIT_PEER_LINKS_DOWN) &&
1131 		    msg_user(msg) == LINK_PROTOCOL &&
1132 		    (msg_type(msg) == RESET_MSG ||
1133 		    msg_type(msg) == ACTIVATE_MSG) &&
1134 		    !msg_redundant_link(msg))
1135 			n_ptr->action_flags &= ~TIPC_WAIT_PEER_LINKS_DOWN;
1136 
1137 		if (tipc_node_blocked(n_ptr))
1138 			goto unlock;
1139 
1140 		/* Validate message sequence number info */
1141 		seq_no = msg_seqno(msg);
1142 		ackd = msg_ack(msg);
1143 
1144 		/* Release acked messages */
1145 		if (unlikely(n_ptr->bclink.acked != msg_bcast_ack(msg)))
1146 			tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
1147 
1148 		released = 0;
1149 		skb_queue_walk_safe(&l_ptr->transmq, skb1, tmp) {
1150 			if (more(buf_seqno(skb1), ackd))
1151 				break;
1152 			 __skb_unlink(skb1, &l_ptr->transmq);
1153 			 kfree_skb(skb1);
1154 			 released = 1;
1155 		}
1156 
1157 		/* Try sending any messages link endpoint has pending */
1158 		if (unlikely(skb_queue_len(&l_ptr->backlogq)))
1159 			tipc_link_push_packets(l_ptr);
1160 
1161 		if (released && !skb_queue_empty(&l_ptr->wakeupq))
1162 			link_prepare_wakeup(l_ptr);
1163 
1164 		/* Process the incoming packet */
1165 		if (unlikely(!link_working_working(l_ptr))) {
1166 			if (msg_user(msg) == LINK_PROTOCOL) {
1167 				tipc_link_proto_rcv(l_ptr, skb);
1168 				link_retrieve_defq(l_ptr, &head);
1169 				skb = NULL;
1170 				goto unlock;
1171 			}
1172 
1173 			/* Traffic message. Conditionally activate link */
1174 			link_state_event(l_ptr, TRAFFIC_MSG_EVT);
1175 
1176 			if (link_working_working(l_ptr)) {
1177 				/* Re-insert buffer in front of queue */
1178 				__skb_queue_head(&head, skb);
1179 				skb = NULL;
1180 				goto unlock;
1181 			}
1182 			goto unlock;
1183 		}
1184 
1185 		/* Link is now in state WORKING_WORKING */
1186 		if (unlikely(seq_no != mod(l_ptr->next_in_no))) {
1187 			link_handle_out_of_seq_msg(l_ptr, skb);
1188 			link_retrieve_defq(l_ptr, &head);
1189 			skb = NULL;
1190 			goto unlock;
1191 		}
1192 		/* Synchronize with parallel link if applicable */
1193 		if (unlikely((l_ptr->flags & LINK_SYNCHING) && !msg_dup(msg))) {
1194 			link_handle_out_of_seq_msg(l_ptr, skb);
1195 			if (link_synch(l_ptr))
1196 				link_retrieve_defq(l_ptr, &head);
1197 			skb = NULL;
1198 			goto unlock;
1199 		}
1200 		l_ptr->next_in_no++;
1201 		if (unlikely(!skb_queue_empty(&l_ptr->deferdq)))
1202 			link_retrieve_defq(l_ptr, &head);
1203 		if (unlikely(++l_ptr->rcv_unacked >= TIPC_MIN_LINK_WIN)) {
1204 			l_ptr->stats.sent_acks++;
1205 			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1206 		}
1207 		tipc_link_input(l_ptr, skb);
1208 		skb = NULL;
1209 unlock:
1210 		tipc_node_unlock(n_ptr);
1211 discard:
1212 		if (unlikely(skb))
1213 			kfree_skb(skb);
1214 	}
1215 }
1216 
1217 /* tipc_data_input - deliver data and name distr msgs to upper layer
1218  *
1219  * Consumes buffer if message is of right type
1220  * Node lock must be held
1221  */
1222 static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb)
1223 {
1224 	struct tipc_node *node = link->owner;
1225 	struct tipc_msg *msg = buf_msg(skb);
1226 	u32 dport = msg_destport(msg);
1227 
1228 	switch (msg_user(msg)) {
1229 	case TIPC_LOW_IMPORTANCE:
1230 	case TIPC_MEDIUM_IMPORTANCE:
1231 	case TIPC_HIGH_IMPORTANCE:
1232 	case TIPC_CRITICAL_IMPORTANCE:
1233 	case CONN_MANAGER:
1234 		if (tipc_skb_queue_tail(&link->inputq, skb, dport)) {
1235 			node->inputq = &link->inputq;
1236 			node->action_flags |= TIPC_MSG_EVT;
1237 		}
1238 		return true;
1239 	case NAME_DISTRIBUTOR:
1240 		node->bclink.recv_permitted = true;
1241 		node->namedq = &link->namedq;
1242 		skb_queue_tail(&link->namedq, skb);
1243 		if (skb_queue_len(&link->namedq) == 1)
1244 			node->action_flags |= TIPC_NAMED_MSG_EVT;
1245 		return true;
1246 	case MSG_BUNDLER:
1247 	case CHANGEOVER_PROTOCOL:
1248 	case MSG_FRAGMENTER:
1249 	case BCAST_PROTOCOL:
1250 		return false;
1251 	default:
1252 		pr_warn("Dropping received illegal msg type\n");
1253 		kfree_skb(skb);
1254 		return false;
1255 	}
1256 }
1257 
1258 /* tipc_link_input - process packet that has passed link protocol check
1259  *
1260  * Consumes buffer
1261  * Node lock must be held
1262  */
1263 static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
1264 {
1265 	struct tipc_node *node = link->owner;
1266 	struct tipc_msg *msg = buf_msg(skb);
1267 	struct sk_buff *iskb;
1268 	int pos = 0;
1269 
1270 	if (likely(tipc_data_input(link, skb)))
1271 		return;
1272 
1273 	switch (msg_user(msg)) {
1274 	case CHANGEOVER_PROTOCOL:
1275 		if (msg_dup(msg)) {
1276 			link->flags |= LINK_SYNCHING;
1277 			link->synch_point = msg_seqno(msg_get_wrapped(msg));
1278 		}
1279 		if (!tipc_link_tunnel_rcv(node, &skb))
1280 			break;
1281 		if (msg_user(buf_msg(skb)) != MSG_BUNDLER) {
1282 			tipc_data_input(link, skb);
1283 			break;
1284 		}
1285 	case MSG_BUNDLER:
1286 		link->stats.recv_bundles++;
1287 		link->stats.recv_bundled += msg_msgcnt(msg);
1288 
1289 		while (tipc_msg_extract(skb, &iskb, &pos))
1290 			tipc_data_input(link, iskb);
1291 		break;
1292 	case MSG_FRAGMENTER:
1293 		link->stats.recv_fragments++;
1294 		if (tipc_buf_append(&link->reasm_buf, &skb)) {
1295 			link->stats.recv_fragmented++;
1296 			tipc_data_input(link, skb);
1297 		} else if (!link->reasm_buf) {
1298 			tipc_link_reset(link);
1299 		}
1300 		break;
1301 	case BCAST_PROTOCOL:
1302 		tipc_link_sync_rcv(node, skb);
1303 		break;
1304 	default:
1305 		break;
1306 	}
1307 }
1308 
1309 /**
1310  * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue
1311  *
1312  * Returns increase in queue length (i.e. 0 or 1)
1313  */
1314 u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *skb)
1315 {
1316 	struct sk_buff *skb1;
1317 	u32 seq_no = buf_seqno(skb);
1318 
1319 	/* Empty queue ? */
1320 	if (skb_queue_empty(list)) {
1321 		__skb_queue_tail(list, skb);
1322 		return 1;
1323 	}
1324 
1325 	/* Last ? */
1326 	if (less(buf_seqno(skb_peek_tail(list)), seq_no)) {
1327 		__skb_queue_tail(list, skb);
1328 		return 1;
1329 	}
1330 
1331 	/* Locate insertion point in queue, then insert; discard if duplicate */
1332 	skb_queue_walk(list, skb1) {
1333 		u32 curr_seqno = buf_seqno(skb1);
1334 
1335 		if (seq_no == curr_seqno) {
1336 			kfree_skb(skb);
1337 			return 0;
1338 		}
1339 
1340 		if (less(seq_no, curr_seqno))
1341 			break;
1342 	}
1343 
1344 	__skb_queue_before(list, skb1, skb);
1345 	return 1;
1346 }
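/*
 * Illustrative sketch, not part of the kernel build: the insertion rules
 * above keep the deferred queue sorted by sequence number and free of
 * duplicates. A toy replay on plain integers:
 */
#if 0 /* standalone user-space demo */
#include <stdio.h>

int main(void)
{
	/* queue holds 5 9; arrivals: 7 (insert), 9 (dup), 12 (tail) */
	int queue[8] = { 5, 9 }, len = 2;
	int arrivals[] = { 7, 9, 12 };
	int a, i, j;

	for (a = 0; a < 3; a++) {
		int s = arrivals[a], dup = 0;

		for (i = 0; i < len && queue[i] < s; i++)
			;
		if (i < len && queue[i] == s) {
			dup = 1;	/* discard, queue unchanged */
		} else {
			for (j = len; j > i; j--)
				queue[j] = queue[j - 1];
			queue[i] = s;
			len++;
		}
		printf("seq %d: %s\n", s, dup ? "duplicate" : "queued");
	}
	return 0;
}
#endif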
1347 
1348 /*
1349  * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet
1350  */
1351 static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
1352 				       struct sk_buff *buf)
1353 {
1354 	u32 seq_no = buf_seqno(buf);
1355 
1356 	if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) {
1357 		tipc_link_proto_rcv(l_ptr, buf);
1358 		return;
1359 	}
1360 
1361 	/* Record OOS packet arrival (force mismatch on next timeout) */
1362 	l_ptr->checkpoint--;
1363 
1364 	/*
1365 	 * Discard packet if a duplicate; otherwise add it to deferred queue
1366 	 * and notify peer of gap as per protocol specification
1367 	 */
1368 	if (less(seq_no, mod(l_ptr->next_in_no))) {
1369 		l_ptr->stats.duplicates++;
1370 		kfree_skb(buf);
1371 		return;
1372 	}
1373 
1374 	if (tipc_link_defer_pkt(&l_ptr->deferdq, buf)) {
1375 		l_ptr->stats.deferred_recv++;
1376 		if ((skb_queue_len(&l_ptr->deferdq) % TIPC_MIN_LINK_WIN) == 1)
1377 			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1378 	} else {
1379 		l_ptr->stats.duplicates++;
1380 	}
1381 }
1382 
1383 /*
1384  * Send protocol message to the other endpoint.
1385  */
1386 void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
1387 			  u32 gap, u32 tolerance, u32 priority, u32 ack_mtu)
1388 {
1389 	struct sk_buff *buf = NULL;
1390 	struct tipc_msg *msg = l_ptr->pmsg;
1391 	u32 msg_size = sizeof(l_ptr->proto_msg);
1392 	int r_flag;
1393 
1394 	/* Don't send protocol message during link changeover */
1395 	if (l_ptr->exp_msg_count)
1396 		return;
1397 
1398 	/* Abort non-RESET send if communication with node is prohibited */
1399 	if ((tipc_node_blocked(l_ptr->owner)) && (msg_typ != RESET_MSG))
1400 		return;
1401 
1402 	/* Create protocol message with "out-of-sequence" sequence number */
1403 	msg_set_type(msg, msg_typ);
1404 	msg_set_net_plane(msg, l_ptr->net_plane);
1405 	msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1406 	msg_set_last_bcast(msg, tipc_bclink_get_last_sent(l_ptr->owner->net));
1407 
1408 	if (msg_typ == STATE_MSG) {
1409 		u32 next_sent = mod(l_ptr->next_out_no);
1410 
1411 		if (!tipc_link_is_up(l_ptr))
1412 			return;
1413 		if (skb_queue_len(&l_ptr->backlogq))
1414 			next_sent = buf_seqno(skb_peek(&l_ptr->backlogq));
1415 		msg_set_next_sent(msg, next_sent);
1416 		if (!skb_queue_empty(&l_ptr->deferdq)) {
1417 			u32 rec = buf_seqno(skb_peek(&l_ptr->deferdq));
1418 			gap = mod(rec - mod(l_ptr->next_in_no));
1419 		}
1420 		msg_set_seq_gap(msg, gap);
1421 		if (gap)
1422 			l_ptr->stats.sent_nacks++;
1423 		msg_set_link_tolerance(msg, tolerance);
1424 		msg_set_linkprio(msg, priority);
1425 		msg_set_max_pkt(msg, ack_mtu);
1426 		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1427 		msg_set_probe(msg, probe_msg != 0);
1428 		if (probe_msg) {
1429 			u32 mtu = l_ptr->max_pkt;
1430 
1431 			if ((mtu < l_ptr->max_pkt_target) &&
1432 			    link_working_working(l_ptr) &&
1433 			    l_ptr->fsm_msg_cnt) {
1434 				msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
1435 				if (l_ptr->max_pkt_probes == 10) {
1436 					l_ptr->max_pkt_target = (msg_size - 4);
1437 					l_ptr->max_pkt_probes = 0;
1438 					msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
1439 				}
1440 				l_ptr->max_pkt_probes++;
1441 			}
1442 
1443 			l_ptr->stats.sent_probes++;
1444 		}
1445 		l_ptr->stats.sent_states++;
1446 	} else {		/* RESET_MSG or ACTIVATE_MSG */
1447 		msg_set_ack(msg, mod(l_ptr->reset_checkpoint - 1));
1448 		msg_set_seq_gap(msg, 0);
1449 		msg_set_next_sent(msg, 1);
1450 		msg_set_probe(msg, 0);
1451 		msg_set_link_tolerance(msg, l_ptr->tolerance);
1452 		msg_set_linkprio(msg, l_ptr->priority);
1453 		msg_set_max_pkt(msg, l_ptr->max_pkt_target);
1454 	}
1455 
1456 	r_flag = (l_ptr->owner->working_links > tipc_link_is_up(l_ptr));
1457 	msg_set_redundant_link(msg, r_flag);
1458 	msg_set_linkprio(msg, l_ptr->priority);
1459 	msg_set_size(msg, msg_size);
1460 
1461 	msg_set_seqno(msg, mod(l_ptr->next_out_no + (0xffff/2)));
1462 
1463 	buf = tipc_buf_acquire(msg_size);
1464 	if (!buf)
1465 		return;
1466 
1467 	skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
1468 	buf->priority = TC_PRIO_CONTROL;
1469 	tipc_bearer_send(l_ptr->owner->net, l_ptr->bearer_id, buf,
1470 			 &l_ptr->media_addr);
1471 	l_ptr->rcv_unacked = 0;
1472 	kfree_skb(buf);
1473 }
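/*
 * Illustrative sketch, not part of the kernel build: the probe sizing in
 * tipc_link_proto_xmit() picks roughly the midpoint between the current
 * max_pkt and the probe target, rounded to a 4-byte multiple; after ten
 * unanswered probes at one size the target itself is lowered.
 */
#if 0 /* standalone user-space demo */
#include <stdio.h>

int main(void)
{
	unsigned int mtu = 1500, target = 9000;
	unsigned int probe = (mtu + (target - mtu) / 2 + 2) & ~3u;

	/* prints: probe size 5252 for window 1500..9000 */
	printf("probe size %u for window %u..%u\n", probe, mtu, target);
	return 0;
}
#endif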
1474 
1475 /*
1476  * Receive protocol message :
1477  * Note that network plane id propagates through the network, and may
1478  * change at any time. The node with lowest address rules
1479  */
1480 static void tipc_link_proto_rcv(struct tipc_link *l_ptr,
1481 				struct sk_buff *buf)
1482 {
1483 	u32 rec_gap = 0;
1484 	u32 max_pkt_info;
1485 	u32 max_pkt_ack;
1486 	u32 msg_tol;
1487 	struct tipc_msg *msg = buf_msg(buf);
1488 
1489 	/* Discard protocol message during link changeover */
1490 	if (l_ptr->exp_msg_count)
1491 		goto exit;
1492 
1493 	if (l_ptr->net_plane != msg_net_plane(msg))
1494 		if (link_own_addr(l_ptr) > msg_prevnode(msg))
1495 			l_ptr->net_plane = msg_net_plane(msg);
1496 
1497 	switch (msg_type(msg)) {
1498 
1499 	case RESET_MSG:
1500 		if (!link_working_unknown(l_ptr) &&
1501 		    (l_ptr->peer_session != INVALID_SESSION)) {
1502 			if (less_eq(msg_session(msg), l_ptr->peer_session))
1503 				break; /* duplicate or old reset: ignore */
1504 		}
1505 
1506 		if (!msg_redundant_link(msg) && (link_working_working(l_ptr) ||
1507 				link_working_unknown(l_ptr))) {
1508 			/*
1509 			 * peer has lost contact -- don't allow peer's links
1510 			 * to reactivate before we recognize loss & clean up
1511 			 */
1512 			l_ptr->owner->action_flags |= TIPC_WAIT_OWN_LINKS_DOWN;
1513 		}
1514 
1515 		link_state_event(l_ptr, RESET_MSG);
1516 
1517 		/* fall thru' */
1518 	case ACTIVATE_MSG:
1519 		/* Update link settings according to the other endpoint's values */
1520 		strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg));
1521 
1522 		msg_tol = msg_link_tolerance(msg);
1523 		if (msg_tol > l_ptr->tolerance)
1524 			link_set_supervision_props(l_ptr, msg_tol);
1525 
1526 		if (msg_linkprio(msg) > l_ptr->priority)
1527 			l_ptr->priority = msg_linkprio(msg);
1528 
1529 		max_pkt_info = msg_max_pkt(msg);
1530 		if (max_pkt_info) {
1531 			if (max_pkt_info < l_ptr->max_pkt_target)
1532 				l_ptr->max_pkt_target = max_pkt_info;
1533 			if (l_ptr->max_pkt > l_ptr->max_pkt_target)
1534 				l_ptr->max_pkt = l_ptr->max_pkt_target;
1535 		} else {
1536 			l_ptr->max_pkt = l_ptr->max_pkt_target;
1537 		}
1538 
1539 		/* Synchronize broadcast link info, if not done previously */
1540 		if (!tipc_node_is_up(l_ptr->owner)) {
1541 			l_ptr->owner->bclink.last_sent =
1542 				l_ptr->owner->bclink.last_in =
1543 				msg_last_bcast(msg);
1544 			l_ptr->owner->bclink.oos_state = 0;
1545 		}
1546 
1547 		l_ptr->peer_session = msg_session(msg);
1548 		l_ptr->peer_bearer_id = msg_bearer_id(msg);
1549 
1550 		if (msg_type(msg) == ACTIVATE_MSG)
1551 			link_state_event(l_ptr, ACTIVATE_MSG);
1552 		break;
1553 	case STATE_MSG:
1554 
1555 		msg_tol = msg_link_tolerance(msg);
1556 		if (msg_tol)
1557 			link_set_supervision_props(l_ptr, msg_tol);
1558 
1559 		if (msg_linkprio(msg) &&
1560 		    (msg_linkprio(msg) != l_ptr->priority)) {
1561 			pr_debug("%s<%s>, priority change %u->%u\n",
1562 				 link_rst_msg, l_ptr->name,
1563 				 l_ptr->priority, msg_linkprio(msg));
1564 			l_ptr->priority = msg_linkprio(msg);
1565 			tipc_link_reset(l_ptr); /* Enforce change to take effect */
1566 			break;
1567 		}
1568 
1569 		/* Record reception; force mismatch at next timeout: */
1570 		l_ptr->checkpoint--;
1571 
1572 		link_state_event(l_ptr, TRAFFIC_MSG_EVT);
1573 		l_ptr->stats.recv_states++;
1574 		if (link_reset_unknown(l_ptr))
1575 			break;
1576 
1577 		if (less_eq(mod(l_ptr->next_in_no), msg_next_sent(msg))) {
1578 			rec_gap = mod(msg_next_sent(msg) -
1579 				      mod(l_ptr->next_in_no));
1580 		}
1581 
1582 		max_pkt_ack = msg_max_pkt(msg);
1583 		if (max_pkt_ack > l_ptr->max_pkt) {
1584 			l_ptr->max_pkt = max_pkt_ack;
1585 			l_ptr->max_pkt_probes = 0;
1586 		}
1587 
1588 		max_pkt_ack = 0;
1589 		if (msg_probe(msg)) {
1590 			l_ptr->stats.recv_probes++;
1591 			if (msg_size(msg) > sizeof(l_ptr->proto_msg))
1592 				max_pkt_ack = msg_size(msg);
1593 		}
1594 
1595 		/* Protocol message before retransmits, reduce loss risk */
1596 		if (l_ptr->owner->bclink.recv_permitted)
1597 			tipc_bclink_update_link_state(l_ptr->owner,
1598 						      msg_last_bcast(msg));
1599 
1600 		if (rec_gap || (msg_probe(msg))) {
1601 			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, rec_gap, 0,
1602 					     0, max_pkt_ack);
1603 		}
1604 		if (msg_seq_gap(msg)) {
1605 			l_ptr->stats.recv_nacks++;
1606 			tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->transmq),
1607 					     msg_seq_gap(msg));
1608 		}
1609 		break;
1610 	}
1611 exit:
1612 	kfree_skb(buf);
1613 }
1614 
1615 
1616 /* tipc_link_tunnel_xmit(): Tunnel one packet via a link belonging to
1617  * a different bearer. Owner node is locked.
1618  */
1619 static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
1620 				  struct tipc_msg *tunnel_hdr,
1621 				  struct tipc_msg *msg,
1622 				  u32 selector)
1623 {
1624 	struct tipc_link *tunnel;
1625 	struct sk_buff *skb;
1626 	u32 length = msg_size(msg);
1627 
1628 	tunnel = l_ptr->owner->active_links[selector & 1];
1629 	if (!tipc_link_is_up(tunnel)) {
1630 		pr_warn("%stunnel link no longer available\n", link_co_err);
1631 		return;
1632 	}
1633 	msg_set_size(tunnel_hdr, length + INT_H_SIZE);
1634 	skb = tipc_buf_acquire(length + INT_H_SIZE);
1635 	if (!skb) {
1636 		pr_warn("%sunable to send tunnel msg\n", link_co_err);
1637 		return;
1638 	}
1639 	skb_copy_to_linear_data(skb, tunnel_hdr, INT_H_SIZE);
1640 	skb_copy_to_linear_data_offset(skb, INT_H_SIZE, msg, length);
1641 	__tipc_link_xmit_skb(tunnel, skb);
1642 }
1643 
1644 
1645 /* tipc_link_failover_send_queue(): A link has gone down, but a second
1646  * link is still active. We can do failover. Tunnel the failing link's
1647  * whole send queue via the remaining link. This way, we don't lose
1648  * any packets, and sequence order is preserved for subsequent traffic
1649  * sent over the remaining link. Owner node is locked.
1650  */
1651 void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
1652 {
1653 	int msgcount;
1654 	struct tipc_link *tunnel = l_ptr->owner->active_links[0];
1655 	struct tipc_msg tunnel_hdr;
1656 	struct sk_buff *skb;
1657 	int split_bundles;
1658 
1659 	if (!tunnel)
1660 		return;
1661 
1662 	tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, CHANGEOVER_PROTOCOL,
1663 		      ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
1664 	skb_queue_splice_tail_init(&l_ptr->backlogq, &l_ptr->transmq);
1665 	tipc_link_purge_backlog(l_ptr);
1666 	msgcount = skb_queue_len(&l_ptr->transmq);
1667 	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
1668 	msg_set_msgcnt(&tunnel_hdr, msgcount);
1669 
1670 	if (skb_queue_empty(&l_ptr->transmq)) {
1671 		skb = tipc_buf_acquire(INT_H_SIZE);
1672 		if (skb) {
1673 			skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE);
1674 			msg_set_size(&tunnel_hdr, INT_H_SIZE);
1675 			__tipc_link_xmit_skb(tunnel, skb);
1676 		} else {
1677 			pr_warn("%sunable to send changeover msg\n",
1678 				link_co_err);
1679 		}
1680 		return;
1681 	}
1682 
1683 	split_bundles = (l_ptr->owner->active_links[0] !=
1684 			 l_ptr->owner->active_links[1]);
1685 
1686 	skb_queue_walk(&l_ptr->transmq, skb) {
1687 		struct tipc_msg *msg = buf_msg(skb);
1688 
1689 		if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
1690 			struct tipc_msg *m = msg_get_wrapped(msg);
1691 			unchar *pos = (unchar *)m;
1692 
1693 			msgcount = msg_msgcnt(msg);
1694 			while (msgcount--) {
1695 				msg_set_seqno(m, msg_seqno(msg));
1696 				tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, m,
1697 						      msg_link_selector(m));
1698 				pos += align(msg_size(m));
1699 				m = (struct tipc_msg *)pos;
1700 			}
1701 		} else {
1702 			tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, msg,
1703 					      msg_link_selector(msg));
1704 		}
1705 	}
1706 }
1707 
1708 /* tipc_link_dup_queue_xmit(): A second link has become active. Tunnel a
1709  * duplicate of the first link's send queue via the new link. This way, we
1710  * are guaranteed that currently queued packets from a socket are delivered
1711  * before future traffic from the same socket, even if this is using the
1712  * new link. The last arriving copy of each duplicate packet is dropped at
1713  * the receiving end by the regular protocol check, so packet cardinality
1714  * and sequence order is preserved per sender/receiver socket pair.
1715  * Owner node is locked.
1716  */
1717 void tipc_link_dup_queue_xmit(struct tipc_link *link,
1718 			      struct tipc_link *tnl)
1719 {
1720 	struct sk_buff *skb;
1721 	struct tipc_msg tnl_hdr;
1722 	struct sk_buff_head *queue = &link->transmq;
1723 	int mcnt;
1724 
1725 	tipc_msg_init(link_own_addr(link), &tnl_hdr, CHANGEOVER_PROTOCOL,
1726 		      DUPLICATE_MSG, INT_H_SIZE, link->addr);
1727 	mcnt = skb_queue_len(&link->transmq) + skb_queue_len(&link->backlogq);
1728 	msg_set_msgcnt(&tnl_hdr, mcnt);
1729 	msg_set_bearer_id(&tnl_hdr, link->peer_bearer_id);
1730 
1731 tunnel_queue:
1732 	skb_queue_walk(queue, skb) {
1733 		struct sk_buff *outskb;
1734 		struct tipc_msg *msg = buf_msg(skb);
1735 		u32 len = msg_size(msg);
1736 
1737 		msg_set_ack(msg, mod(link->next_in_no - 1));
1738 		msg_set_bcast_ack(msg, link->owner->bclink.last_in);
1739 		msg_set_size(&tnl_hdr, len + INT_H_SIZE);
1740 		outskb = tipc_buf_acquire(len + INT_H_SIZE);
1741 		if (outskb == NULL) {
1742 			pr_warn("%sunable to send duplicate msg\n",
1743 				link_co_err);
1744 			return;
1745 		}
1746 		skb_copy_to_linear_data(outskb, &tnl_hdr, INT_H_SIZE);
1747 		skb_copy_to_linear_data_offset(outskb, INT_H_SIZE,
1748 					       skb->data, len);
1749 		__tipc_link_xmit_skb(tnl, outskb);
1750 		if (!tipc_link_is_up(link))
1751 			return;
1752 	}
1753 	if (queue == &link->backlogq)
1754 		return;
1755 	queue = &link->backlogq;
1756 	goto tunnel_queue;
1757 }
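
/* Illustrative sketch (not from the original source): layout of each
 * tunnelled duplicate built above. The original packet is carried
 * verbatim as the payload of a CHANGEOVER_PROTOCOL/DUPLICATE_MSG
 * envelope:
 *
 *	outskb:	[tnl_hdr, INT_H_SIZE bytes][original packet, len bytes]
 *
 * The envelope size is set to len + INT_H_SIZE before the header is
 * copied in, so the receive side can locate and peel out the inner
 * packet with tipc_msg_extract().
 */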
1758 
1759 /* tipc_link_dup_rcv(): Receive a tunnelled DUPLICATE_MSG packet.
1760  * Owner node is locked.
1761  */
1762 static void tipc_link_dup_rcv(struct tipc_link *link,
1763 			      struct sk_buff *skb)
1764 {
1765 	struct sk_buff *iskb;
1766 	int pos = 0;
1767 
1768 	if (!tipc_link_is_up(link))
1769 		return;
1770 
1771 	if (!tipc_msg_extract(skb, &iskb, &pos)) {
1772 		pr_warn("%sfailed to extract inner dup pkt\n", link_co_err);
1773 		return;
1774 	}
1775 	/* Append buffer to deferred queue, if applicable: */
1776 	link_handle_out_of_seq_msg(link, iskb);
1777 }
1778 
1779 /*  tipc_link_failover_rcv(): Receive a tunnelled ORIGINAL_MSG packet
1780  *  Owner node is locked.
1781  */
1782 static struct sk_buff *tipc_link_failover_rcv(struct tipc_link *l_ptr,
1783 					      struct sk_buff *t_buf)
1784 {
1785 	struct tipc_msg *t_msg = buf_msg(t_buf);
1786 	struct sk_buff *buf = NULL;
1787 	struct tipc_msg *msg;
1788 	int pos = 0;
1789 
1790 	if (tipc_link_is_up(l_ptr))
1791 		tipc_link_reset(l_ptr);
1792 
1793 	/* First failover packet? */
1794 	if (l_ptr->exp_msg_count == START_CHANGEOVER)
1795 		l_ptr->exp_msg_count = msg_msgcnt(t_msg);
1796 
1797 	/* Should there be an inner packet? */
1798 	if (l_ptr->exp_msg_count) {
1799 		l_ptr->exp_msg_count--;
1800 		if (!tipc_msg_extract(t_buf, &buf, &pos)) {
1801 			pr_warn("%sno inner failover pkt\n", link_co_err);
1802 			goto exit;
1803 		}
1804 		msg = buf_msg(buf);
1805 
1806 		if (less(msg_seqno(msg), l_ptr->reset_checkpoint)) {
1807 			kfree_skb(buf);
1808 			buf = NULL;
1809 			goto exit;
1810 		}
1811 		if (msg_user(msg) == MSG_FRAGMENTER) {
1812 			l_ptr->stats.recv_fragments++;
1813 			tipc_buf_append(&l_ptr->reasm_buf, &buf);
1814 		}
1815 	}
1816 exit:
1817 	if ((!l_ptr->exp_msg_count) && (l_ptr->flags & LINK_STOPPED))
1818 		tipc_link_delete(l_ptr);
1819 	return buf;
1820 }
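
/* Illustrative walk-through (not from the original source): lifecycle
 * of exp_msg_count during a failover, assuming the first envelope
 * announced three tunnelled packets:
 *
 *	START_CHANGEOVER -> 3 -> 2 -> 1 -> 0
 *
 * The first ORIGINAL_MSG latches the count from msg_msgcnt(); each
 * subsequent envelope decrements it. Inner packets with a sequence
 * number below reset_checkpoint were already delivered and are dropped.
 * Once the count hits zero, a link that was stopped mid-failover
 * (LINK_STOPPED) is finally deleted above.
 */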
1821 
1822 /*  tipc_link_tunnel_rcv(): Receive a tunnelled packet, sent
1823  *  via the other link as a result of a failover (ORIGINAL_MSG)
1824  *  or of a new active link (DUPLICATE_MSG). Failover packets are
1825  *  returned to the active link for delivery upwards.
1826  *  Owner node is locked.
1827  */
1828 static int tipc_link_tunnel_rcv(struct tipc_node *n_ptr,
1829 				struct sk_buff **buf)
1830 {
1831 	struct sk_buff *t_buf = *buf;
1832 	struct tipc_link *l_ptr;
1833 	struct tipc_msg *t_msg = buf_msg(t_buf);
1834 	u32 bearer_id = msg_bearer_id(t_msg);
1835 
1836 	*buf = NULL;
1837 
1838 	if (bearer_id >= MAX_BEARERS)
1839 		goto exit;
1840 
1841 	l_ptr = n_ptr->links[bearer_id];
1842 	if (!l_ptr)
1843 		goto exit;
1844 
1845 	if (msg_type(t_msg) == DUPLICATE_MSG)
1846 		tipc_link_dup_rcv(l_ptr, t_buf);
1847 	else if (msg_type(t_msg) == ORIGINAL_MSG)
1848 		*buf = tipc_link_failover_rcv(l_ptr, t_buf);
1849 	else
1850 		pr_warn("%sunknown tunnel pkt received\n", link_co_err);
1851 exit:
1852 	kfree_skb(t_buf);
1853 	return *buf != NULL;
1854 }
1855 
1856 static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol)
1857 {
1858 	unsigned long intv = ((tol / 4) > 500) ? 500 : tol / 4;
1859 
1860 	if ((tol < TIPC_MIN_LINK_TOL) || (tol > TIPC_MAX_LINK_TOL))
1861 		return;
1862 
1863 	l_ptr->tolerance = tol;
1864 	l_ptr->cont_intv = msecs_to_jiffies(intv);
1865 	l_ptr->abort_limit = tol / (jiffies_to_msecs(l_ptr->cont_intv) / 4);
1866 }
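
/* Worked example (assuming the msecs_to_jiffies()/jiffies_to_msecs()
 * round trip is exact): with tol = 1500 ms, intv = min(1500 / 4, 500)
 * = 375 ms, so the continuity timer fires every 375 ms and
 * abort_limit = 1500 / (375 / 4) = 1500 / 93 = 16 unanswered probes
 * (integer division). With tol = 4000 ms the interval caps at 500 ms
 * and abort_limit = 4000 / (500 / 4) = 32.
 */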
1867 
1868 void tipc_link_set_queue_limits(struct tipc_link *l, u32 win)
1869 {
1870 	int max_bulk = TIPC_MAX_PUBLICATIONS / (l->max_pkt / ITEM_SIZE);
1871 
1872 	l->window = win;
1873 	l->backlog[TIPC_LOW_IMPORTANCE].limit      = win / 2;
1874 	l->backlog[TIPC_MEDIUM_IMPORTANCE].limit   = win;
1875 	l->backlog[TIPC_HIGH_IMPORTANCE].limit     = win / 2 * 3;
1876 	l->backlog[TIPC_CRITICAL_IMPORTANCE].limit = win * 2;
1877 	l->backlog[TIPC_SYSTEM_IMPORTANCE].limit   = max_bulk;
1878 }
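
/* Worked example: with win = 50 the backlog limits above come out as
 * LOW = 25, MEDIUM = 50, HIGH = 75 (win / 2 * 3, evaluated left to
 * right) and CRITICAL = 100. SYSTEM importance is sized for name-table
 * bulk distribution instead: max_bulk is TIPC_MAX_PUBLICATIONS divided
 * by the number of ITEM_SIZE publication items that fit in one max_pkt.
 */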
1879 
1880 /* tipc_link_find_owner - locate owner node of link by link's name
1881  * @net: the applicable net namespace
1882  * @link_name: pointer to link name string
1883  * @bearer_id: pointer to index in 'node->links' array where the link was found.
1884  *
1885  * Returns pointer to node owning the link, or NULL if no matching link is found.
1886  */
1887 static struct tipc_node *tipc_link_find_owner(struct net *net,
1888 					      const char *link_name,
1889 					      unsigned int *bearer_id)
1890 {
1891 	struct tipc_net *tn = net_generic(net, tipc_net_id);
1892 	struct tipc_link *l_ptr;
1893 	struct tipc_node *n_ptr;
1894 	struct tipc_node *found_node = NULL;
1895 	int i;
1896 
1897 	*bearer_id = 0;
1898 	rcu_read_lock();
1899 	list_for_each_entry_rcu(n_ptr, &tn->node_list, list) {
1900 		tipc_node_lock(n_ptr);
1901 		for (i = 0; i < MAX_BEARERS; i++) {
1902 			l_ptr = n_ptr->links[i];
1903 			if (l_ptr && !strcmp(l_ptr->name, link_name)) {
1904 				*bearer_id = i;
1905 				found_node = n_ptr;
1906 				break;
1907 			}
1908 		}
1909 		tipc_node_unlock(n_ptr);
1910 		if (found_node)
1911 			break;
1912 	}
1913 	rcu_read_unlock();
1914 
1915 	return found_node;
1916 }
1917 
1918 /**
1919  * link_reset_statistics - reset link statistics
1920  * @l_ptr: pointer to link
1921  */
1922 static void link_reset_statistics(struct tipc_link *l_ptr)
1923 {
1924 	memset(&l_ptr->stats, 0, sizeof(l_ptr->stats));
1925 	l_ptr->stats.sent_info = l_ptr->next_out_no;
1926 	l_ptr->stats.recv_info = l_ptr->next_in_no;
1927 }
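
/* Note (editorial reading of the code above): sent_info/recv_info are
 * re-seeded with the current sequence counters, presumably so that
 * later stat readouts can report packets sent and received since this
 * reset rather than since the link was created.
 */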
1928 
1929 static void link_print(struct tipc_link *l_ptr, const char *str)
1930 {
1931 	struct tipc_net *tn = net_generic(l_ptr->owner->net, tipc_net_id);
1932 	struct tipc_bearer *b_ptr;
1933 
1934 	rcu_read_lock();
1935 	b_ptr = rcu_dereference_rtnl(tn->bearer_list[l_ptr->bearer_id]);
1936 	if (b_ptr)
1937 		pr_info("%s Link %x<%s>:", str, l_ptr->addr, b_ptr->name);
1938 	rcu_read_unlock();
1939 
1940 	if (link_working_unknown(l_ptr))
1941 		pr_cont(":WU\n");
1942 	else if (link_reset_reset(l_ptr))
1943 		pr_cont(":RR\n");
1944 	else if (link_reset_unknown(l_ptr))
1945 		pr_cont(":RU\n");
1946 	else if (link_working_working(l_ptr))
1947 		pr_cont(":WW\n");
1948 	else
1949 		pr_cont("\n");
1950 }
1951 
1952 /* Parse and validate nested (link) properties valid for media, bearer and link
1953  */
1954 int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[])
1955 {
1956 	int err;
1957 
1958 	err = nla_parse_nested(props, TIPC_NLA_PROP_MAX, prop,
1959 			       tipc_nl_prop_policy);
1960 	if (err)
1961 		return err;
1962 
1963 	if (props[TIPC_NLA_PROP_PRIO]) {
1964 		u32 prio;
1965 
1966 		prio = nla_get_u32(props[TIPC_NLA_PROP_PRIO]);
1967 		if (prio > TIPC_MAX_LINK_PRI)
1968 			return -EINVAL;
1969 	}
1970 
1971 	if (props[TIPC_NLA_PROP_TOL]) {
1972 		u32 tol;
1973 
1974 		tol = nla_get_u32(props[TIPC_NLA_PROP_TOL]);
1975 		if ((tol < TIPC_MIN_LINK_TOL) || (tol > TIPC_MAX_LINK_TOL))
1976 			return -EINVAL;
1977 	}
1978 
1979 	if (props[TIPC_NLA_PROP_WIN]) {
1980 		u32 win;
1981 
1982 		win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
1983 		if ((win < TIPC_MIN_LINK_WIN) || (win > TIPC_MAX_LINK_WIN))
1984 			return -EINVAL;
1985 	}
1986 
1987 	return 0;
1988 }
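
/* Illustrative sketch (not from the original source): shape of the
 * nested block accepted above, e.g. for a tolerance change:
 *
 *	TIPC_NLA_LINK_PROP (NLA_NESTED)
 *	    TIPC_NLA_PROP_TOL  (NLA_U32)  1500	TIPC_MIN/MAX_LINK_TOL
 *	    TIPC_NLA_PROP_PRIO (NLA_U32)  10	0..TIPC_MAX_LINK_PRI
 *	    TIPC_NLA_PROP_WIN  (NLA_U32)  50	TIPC_MIN/MAX_LINK_WIN
 *
 * Every attribute is optional; any value outside its range fails the
 * whole request with -EINVAL before anything is applied to the link.
 */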
1989 
1990 int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info)
1991 {
1992 	int err;
1993 	int res = 0;
1994 	int bearer_id;
1995 	char *name;
1996 	struct tipc_link *link;
1997 	struct tipc_node *node;
1998 	struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1];
1999 	struct net *net = sock_net(skb->sk);
2000 
2001 	if (!info->attrs[TIPC_NLA_LINK])
2002 		return -EINVAL;
2003 
2004 	err = nla_parse_nested(attrs, TIPC_NLA_LINK_MAX,
2005 			       info->attrs[TIPC_NLA_LINK],
2006 			       tipc_nl_link_policy);
2007 	if (err)
2008 		return err;
2009 
2010 	if (!attrs[TIPC_NLA_LINK_NAME])
2011 		return -EINVAL;
2012 
2013 	name = nla_data(attrs[TIPC_NLA_LINK_NAME]);
2014 
2015 	node = tipc_link_find_owner(net, name, &bearer_id);
2016 	if (!node)
2017 		return -EINVAL;
2018 
2019 	tipc_node_lock(node);
2020 
2021 	link = node->links[bearer_id];
2022 	if (!link) {
2023 		res = -EINVAL;
2024 		goto out;
2025 	}
2026 
2027 	if (attrs[TIPC_NLA_LINK_PROP]) {
2028 		struct nlattr *props[TIPC_NLA_PROP_MAX + 1];
2029 
2030 		err = tipc_nl_parse_link_prop(attrs[TIPC_NLA_LINK_PROP],
2031 					      props);
2032 		if (err) {
2033 			res = err;
2034 			goto out;
2035 		}
2036 
2037 		if (props[TIPC_NLA_PROP_TOL]) {
2038 			u32 tol;
2039 
2040 			tol = nla_get_u32(props[TIPC_NLA_PROP_TOL]);
2041 			link_set_supervision_props(link, tol);
2042 			tipc_link_proto_xmit(link, STATE_MSG, 0, 0, tol, 0, 0);
2043 		}
2044 		if (props[TIPC_NLA_PROP_PRIO]) {
2045 			u32 prio;
2046 
2047 			prio = nla_get_u32(props[TIPC_NLA_PROP_PRIO]);
2048 			link->priority = prio;
2049 			tipc_link_proto_xmit(link, STATE_MSG, 0, 0, 0, prio, 0);
2050 		}
2051 		if (props[TIPC_NLA_PROP_WIN]) {
2052 			u32 win;
2053 
2054 			win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
2055 			tipc_link_set_queue_limits(link, win);
2056 		}
2057 	}
2058 
2059 out:
2060 	tipc_node_unlock(node);
2061 
2062 	return res;
2063 }
2064 
2065 static int __tipc_nl_add_stats(struct sk_buff *skb, struct tipc_stats *s)
2066 {
2067 	int i;
2068 	struct nlattr *stats;
2069 
2070 	struct nla_map {
2071 		u32 key;
2072 		u32 val;
2073 	};
2074 
2075 	struct nla_map map[] = {
2076 		{TIPC_NLA_STATS_RX_INFO, s->recv_info},
2077 		{TIPC_NLA_STATS_RX_FRAGMENTS, s->recv_fragments},
2078 		{TIPC_NLA_STATS_RX_FRAGMENTED, s->recv_fragmented},
2079 		{TIPC_NLA_STATS_RX_BUNDLES, s->recv_bundles},
2080 		{TIPC_NLA_STATS_RX_BUNDLED, s->recv_bundled},
2081 		{TIPC_NLA_STATS_TX_INFO, s->sent_info},
2082 		{TIPC_NLA_STATS_TX_FRAGMENTS, s->sent_fragments},
2083 		{TIPC_NLA_STATS_TX_FRAGMENTED, s->sent_fragmented},
2084 		{TIPC_NLA_STATS_TX_BUNDLES, s->sent_bundles},
2085 		{TIPC_NLA_STATS_TX_BUNDLED, s->sent_bundled},
2086 		{TIPC_NLA_STATS_MSG_PROF_TOT, (s->msg_length_counts) ?
2087 			s->msg_length_counts : 1},
2088 		{TIPC_NLA_STATS_MSG_LEN_CNT, s->msg_length_counts},
2089 		{TIPC_NLA_STATS_MSG_LEN_TOT, s->msg_lengths_total},
2090 		{TIPC_NLA_STATS_MSG_LEN_P0, s->msg_length_profile[0]},
2091 		{TIPC_NLA_STATS_MSG_LEN_P1, s->msg_length_profile[1]},
2092 		{TIPC_NLA_STATS_MSG_LEN_P2, s->msg_length_profile[2]},
2093 		{TIPC_NLA_STATS_MSG_LEN_P3, s->msg_length_profile[3]},
2094 		{TIPC_NLA_STATS_MSG_LEN_P4, s->msg_length_profile[4]},
2095 		{TIPC_NLA_STATS_MSG_LEN_P5, s->msg_length_profile[5]},
2096 		{TIPC_NLA_STATS_MSG_LEN_P6, s->msg_length_profile[6]},
2097 		{TIPC_NLA_STATS_RX_STATES, s->recv_states},
2098 		{TIPC_NLA_STATS_RX_PROBES, s->recv_probes},
2099 		{TIPC_NLA_STATS_RX_NACKS, s->recv_nacks},
2100 		{TIPC_NLA_STATS_RX_DEFERRED, s->deferred_recv},
2101 		{TIPC_NLA_STATS_TX_STATES, s->sent_states},
2102 		{TIPC_NLA_STATS_TX_PROBES, s->sent_probes},
2103 		{TIPC_NLA_STATS_TX_NACKS, s->sent_nacks},
2104 		{TIPC_NLA_STATS_TX_ACKS, s->sent_acks},
2105 		{TIPC_NLA_STATS_RETRANSMITTED, s->retransmitted},
2106 		{TIPC_NLA_STATS_DUPLICATES, s->duplicates},
2107 		{TIPC_NLA_STATS_LINK_CONGS, s->link_congs},
2108 		{TIPC_NLA_STATS_MAX_QUEUE, s->max_queue_sz},
2109 		{TIPC_NLA_STATS_AVG_QUEUE, s->queue_sz_counts ?
2110 			(s->accu_queue_sz / s->queue_sz_counts) : 0}
2111 	};
2112 
2113 	stats = nla_nest_start(skb, TIPC_NLA_LINK_STATS);
2114 	if (!stats)
2115 		return -EMSGSIZE;
2116 
2117 	for (i = 0; i < ARRAY_SIZE(map); i++)
2118 		if (nla_put_u32(skb, map[i].key, map[i].val))
2119 			goto msg_full;
2120 
2121 	nla_nest_end(skb, stats);
2122 
2123 	return 0;
2124 msg_full:
2125 	nla_nest_cancel(skb, stats);
2126 
2127 	return -EMSGSIZE;
2128 }
2129 
2130 /* Caller should hold appropriate locks to protect the link */
2131 static int __tipc_nl_add_link(struct net *net, struct tipc_nl_msg *msg,
2132 			      struct tipc_link *link)
2133 {
2134 	int err;
2135 	void *hdr;
2136 	struct nlattr *attrs;
2137 	struct nlattr *prop;
2138 	struct tipc_net *tn = net_generic(net, tipc_net_id);
2139 
2140 	hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_family,
2141 			  NLM_F_MULTI, TIPC_NL_LINK_GET);
2142 	if (!hdr)
2143 		return -EMSGSIZE;
2144 
2145 	attrs = nla_nest_start(msg->skb, TIPC_NLA_LINK);
2146 	if (!attrs)
2147 		goto msg_full;
2148 
2149 	if (nla_put_string(msg->skb, TIPC_NLA_LINK_NAME, link->name))
2150 		goto attr_msg_full;
2151 	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_DEST,
2152 			tipc_cluster_mask(tn->own_addr)))
2153 		goto attr_msg_full;
2154 	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_MTU, link->max_pkt))
2155 		goto attr_msg_full;
2156 	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_RX, link->next_in_no))
2157 		goto attr_msg_full;
2158 	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_TX, link->next_out_no))
2159 		goto attr_msg_full;
2160 
2161 	if (tipc_link_is_up(link))
2162 		if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP))
2163 			goto attr_msg_full;
2164 	if (tipc_link_is_active(link))
2165 		if (nla_put_flag(msg->skb, TIPC_NLA_LINK_ACTIVE))
2166 			goto attr_msg_full;
2167 
2168 	prop = nla_nest_start(msg->skb, TIPC_NLA_LINK_PROP);
2169 	if (!prop)
2170 		goto attr_msg_full;
2171 	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_PRIO, link->priority))
2172 		goto prop_msg_full;
2173 	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_TOL, link->tolerance))
2174 		goto prop_msg_full;
2175 	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, link->window))
2176 		goto prop_msg_full;
2180 	nla_nest_end(msg->skb, prop);
2181 
2182 	err = __tipc_nl_add_stats(msg->skb, &link->stats);
2183 	if (err)
2184 		goto attr_msg_full;
2185 
2186 	nla_nest_end(msg->skb, attrs);
2187 	genlmsg_end(msg->skb, hdr);
2188 
2189 	return 0;
2190 
2191 prop_msg_full:
2192 	nla_nest_cancel(msg->skb, prop);
2193 attr_msg_full:
2194 	nla_nest_cancel(msg->skb, attrs);
2195 msg_full:
2196 	genlmsg_cancel(msg->skb, hdr);
2197 
2198 	return -EMSGSIZE;
2199 }
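
/* Note (editorial): the error unwind above follows the usual netlink
 * pattern: each nla_nest_start() has a matching nla_nest_cancel() on
 * its own label, and a failed put jumps to the label of the innermost
 * nest still open, so a partially built message is rolled back rather
 * than emitted truncated.
 */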
2200 
2201 /* Caller should hold node lock  */
2202 static int __tipc_nl_add_node_links(struct net *net, struct tipc_nl_msg *msg,
2203 				    struct tipc_node *node, u32 *prev_link)
2204 {
2205 	u32 i;
2206 	int err;
2207 
2208 	for (i = *prev_link; i < MAX_BEARERS; i++) {
2209 		*prev_link = i;
2210 
2211 		if (!node->links[i])
2212 			continue;
2213 
2214 		err = __tipc_nl_add_link(net, msg, node->links[i]);
2215 		if (err)
2216 			return err;
2217 	}
2218 	*prev_link = 0;
2219 
2220 	return 0;
2221 }
2222 
2223 int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb)
2224 {
2225 	struct net *net = sock_net(skb->sk);
2226 	struct tipc_net *tn = net_generic(net, tipc_net_id);
2227 	struct tipc_node *node;
2228 	struct tipc_nl_msg msg;
2229 	u32 prev_node = cb->args[0];
2230 	u32 prev_link = cb->args[1];
2231 	int done = cb->args[2];
2232 	int err;
2233 
2234 	if (done)
2235 		return 0;
2236 
2237 	msg.skb = skb;
2238 	msg.portid = NETLINK_CB(cb->skb).portid;
2239 	msg.seq = cb->nlh->nlmsg_seq;
2240 
2241 	rcu_read_lock();
2242 
2243 	if (prev_node) {
2244 		node = tipc_node_find(net, prev_node);
2245 		if (!node) {
2246 			/* We never set seq or call nl_dump_check_consistent(),
2247 			 * which means that setting prev_seq here will make the
2248 			 * consistency check in the netlink callback handler
2249 			 * fail, resulting in the last NLMSG_DONE message having
2250 			 * the NLM_F_DUMP_INTR flag set.
2251 			 */
2252 			cb->prev_seq = 1;
2253 			goto out;
2254 		}
2255 
2256 		list_for_each_entry_continue_rcu(node, &tn->node_list,
2257 						 list) {
2258 			tipc_node_lock(node);
2259 			err = __tipc_nl_add_node_links(net, &msg, node,
2260 						       &prev_link);
2261 			tipc_node_unlock(node);
2262 			if (err)
2263 				goto out;
2264 
2265 			prev_node = node->addr;
2266 		}
2267 	} else {
2268 		err = tipc_nl_add_bc_link(net, &msg);
2269 		if (err)
2270 			goto out;
2271 
2272 		list_for_each_entry_rcu(node, &tn->node_list, list) {
2273 			tipc_node_lock(node);
2274 			err = __tipc_nl_add_node_links(net, &msg, node,
2275 						       &prev_link);
2276 			tipc_node_unlock(node);
2277 			if (err)
2278 				goto out;
2279 
2280 			prev_node = node->addr;
2281 		}
2282 	}
2283 	done = 1;
2284 out:
2285 	rcu_read_unlock();
2286 
2287 	cb->args[0] = prev_node;
2288 	cb->args[1] = prev_link;
2289 	cb->args[2] = done;
2290 
2291 	return skb->len;
2292 }
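
/* Note (editorial): the dump above is resumable. Netlink may deliver
 * the result in several skbs, and the cb->args slots carry the cursor
 * between calls:
 *
 *	cb->args[0]	address of the last fully dumped node
 *	cb->args[1]	bearer index to resume from within that node
 *	cb->args[2]	done flag, set once every node has been dumped
 *
 * If the node under the cursor disappears between calls, cb->prev_seq
 * is set so the dump ends with NLM_F_DUMP_INTR instead of silently
 * skipping links.
 */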
2293 
2294 int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info)
2295 {
2296 	struct net *net = genl_info_net(info);
2297 	struct sk_buff *ans_skb;
2298 	struct tipc_nl_msg msg;
2299 	struct tipc_link *link;
2300 	struct tipc_node *node;
2301 	char *name;
2302 	int bearer_id;
2303 	int err;
2304 
2305 	if (!info->attrs[TIPC_NLA_LINK_NAME])
2306 		return -EINVAL;
2307 
2308 	name = nla_data(info->attrs[TIPC_NLA_LINK_NAME]);
2309 	node = tipc_link_find_owner(net, name, &bearer_id);
2310 	if (!node)
2311 		return -EINVAL;
2312 
2313 	ans_skb = nlmsg_new(NLMSG_GOODSIZE, GFP_KERNEL);
2314 	if (!ans_skb)
2315 		return -ENOMEM;
2316 
2317 	msg.skb = ans_skb;
2318 	msg.portid = info->snd_portid;
2319 	msg.seq = info->snd_seq;
2320 
2321 	tipc_node_lock(node);
2322 	link = node->links[bearer_id];
2323 	if (!link) {
2324 		err = -EINVAL;
2325 		goto err_out;
2326 	}
2327 
2328 	err = __tipc_nl_add_link(net, &msg, link);
2329 	if (err)
2330 		goto err_out;
2331 
2332 	tipc_node_unlock(node);
2333 
2334 	return genlmsg_reply(ans_skb, info);
2335 
2336 err_out:
2337 	tipc_node_unlock(node);
2338 	nlmsg_free(ans_skb);
2339 
2340 	return err;
2341 }
2342 
2343 int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info)
2344 {
2345 	int err;
2346 	char *link_name;
2347 	unsigned int bearer_id;
2348 	struct tipc_link *link;
2349 	struct tipc_node *node;
2350 	struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1];
2351 	struct net *net = sock_net(skb->sk);
2352 
2353 	if (!info->attrs[TIPC_NLA_LINK])
2354 		return -EINVAL;
2355 
2356 	err = nla_parse_nested(attrs, TIPC_NLA_LINK_MAX,
2357 			       info->attrs[TIPC_NLA_LINK],
2358 			       tipc_nl_link_policy);
2359 	if (err)
2360 		return err;
2361 
2362 	if (!attrs[TIPC_NLA_LINK_NAME])
2363 		return -EINVAL;
2364 
2365 	link_name = nla_data(attrs[TIPC_NLA_LINK_NAME]);
2366 
2367 	if (strcmp(link_name, tipc_bclink_name) == 0) {
2368 		err = tipc_bclink_reset_stats(net);
2369 		if (err)
2370 			return err;
2371 		return 0;
2372 	}
2373 
2374 	node = tipc_link_find_owner(net, link_name, &bearer_id);
2375 	if (!node)
2376 		return -EINVAL;
2377 
2378 	tipc_node_lock(node);
2379 
2380 	link = node->links[bearer_id];
2381 	if (!link) {
2382 		tipc_node_unlock(node);
2383 		return -EINVAL;
2384 	}
2385 
2386 	link_reset_statistics(link);
2387 
2388 	tipc_node_unlock(node);
2389 
2390 	return 0;
2391 }
2392