xref: /openbmc/linux/net/tipc/link.c (revision 275876e2)
1 /*
2  * net/tipc/link.c: TIPC link code
3  *
4  * Copyright (c) 1996-2007, 2012-2014, Ericsson AB
5  * Copyright (c) 2004-2007, 2010-2013, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36 
37 #include "core.h"
38 #include "link.h"
39 #include "port.h"
40 #include "socket.h"
41 #include "name_distr.h"
42 #include "discover.h"
43 #include "config.h"
44 
45 #include <linux/pkt_sched.h>
46 
47 /*
48  * Error message prefixes
49  */
50 static const char *link_co_err = "Link changeover error, ";
51 static const char *link_rst_msg = "Resetting link ";
52 static const char *link_unk_evt = "Unknown link event ";
53 
54 /*
55  * Out-of-range value for link session numbers
56  */
57 #define INVALID_SESSION 0x10000
58 
59 /*
60  * Link state events:
61  */
62 #define  STARTING_EVT    856384768	/* link processing trigger */
63 #define  TRAFFIC_MSG_EVT 560815u	/* rx'd ??? */
64 #define  TIMEOUT_EVT     560817u	/* link timer expired */
65 
66 /*
67  * The following two 'message types' is really just implementation
68  * data conveniently stored in the message header.
69  * They must not be considered part of the protocol
70  */
71 #define OPEN_MSG   0
72 #define CLOSED_MSG 1
73 
74 /*
75  * State value stored in 'exp_msg_count'
76  */
77 #define START_CHANGEOVER 100000u
78 
79 static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
80 				       struct sk_buff *buf);
81 static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf);
82 static int  tipc_link_tunnel_rcv(struct tipc_node *n_ptr,
83 				 struct sk_buff **buf);
84 static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance);
85 static void link_state_event(struct tipc_link *l_ptr, u32 event);
86 static void link_reset_statistics(struct tipc_link *l_ptr);
87 static void link_print(struct tipc_link *l_ptr, const char *str);
88 static void tipc_link_sync_xmit(struct tipc_link *l);
89 static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
90 static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf);
91 static int tipc_link_prepare_input(struct tipc_link *l, struct sk_buff **buf);
92 
93 /*
94  *  Simple link routines
95  */
96 static unsigned int align(unsigned int i)
97 {
98 	return (i + 3) & ~3u;
99 }
100 
101 static void link_init_max_pkt(struct tipc_link *l_ptr)
102 {
103 	struct tipc_bearer *b_ptr;
104 	u32 max_pkt;
105 
106 	rcu_read_lock();
107 	b_ptr = rcu_dereference_rtnl(bearer_list[l_ptr->bearer_id]);
108 	if (!b_ptr) {
109 		rcu_read_unlock();
110 		return;
111 	}
112 	max_pkt = (b_ptr->mtu & ~3);
113 	rcu_read_unlock();
114 
115 	if (max_pkt > MAX_MSG_SIZE)
116 		max_pkt = MAX_MSG_SIZE;
117 
118 	l_ptr->max_pkt_target = max_pkt;
119 	if (l_ptr->max_pkt_target < MAX_PKT_DEFAULT)
120 		l_ptr->max_pkt = l_ptr->max_pkt_target;
121 	else
122 		l_ptr->max_pkt = MAX_PKT_DEFAULT;
123 
124 	l_ptr->max_pkt_probes = 0;
125 }
126 
127 static u32 link_next_sent(struct tipc_link *l_ptr)
128 {
129 	if (l_ptr->next_out)
130 		return buf_seqno(l_ptr->next_out);
131 	return mod(l_ptr->next_out_no);
132 }
133 
134 static u32 link_last_sent(struct tipc_link *l_ptr)
135 {
136 	return mod(link_next_sent(l_ptr) - 1);
137 }
138 
139 /*
140  *  Simple non-static link routines (i.e. referenced outside this file)
141  */
142 int tipc_link_is_up(struct tipc_link *l_ptr)
143 {
144 	if (!l_ptr)
145 		return 0;
146 	return link_working_working(l_ptr) || link_working_unknown(l_ptr);
147 }
148 
149 int tipc_link_is_active(struct tipc_link *l_ptr)
150 {
151 	return	(l_ptr->owner->active_links[0] == l_ptr) ||
152 		(l_ptr->owner->active_links[1] == l_ptr);
153 }
154 
155 /**
156  * link_timeout - handle expiration of link timer
157  * @l_ptr: pointer to link
158  */
159 static void link_timeout(struct tipc_link *l_ptr)
160 {
161 	tipc_node_lock(l_ptr->owner);
162 
163 	/* update counters used in statistical profiling of send traffic */
164 	l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size;
165 	l_ptr->stats.queue_sz_counts++;
166 
167 	if (l_ptr->first_out) {
168 		struct tipc_msg *msg = buf_msg(l_ptr->first_out);
169 		u32 length = msg_size(msg);
170 
171 		if ((msg_user(msg) == MSG_FRAGMENTER) &&
172 		    (msg_type(msg) == FIRST_FRAGMENT)) {
173 			length = msg_size(msg_get_wrapped(msg));
174 		}
175 		if (length) {
176 			l_ptr->stats.msg_lengths_total += length;
177 			l_ptr->stats.msg_length_counts++;
178 			if (length <= 64)
179 				l_ptr->stats.msg_length_profile[0]++;
180 			else if (length <= 256)
181 				l_ptr->stats.msg_length_profile[1]++;
182 			else if (length <= 1024)
183 				l_ptr->stats.msg_length_profile[2]++;
184 			else if (length <= 4096)
185 				l_ptr->stats.msg_length_profile[3]++;
186 			else if (length <= 16384)
187 				l_ptr->stats.msg_length_profile[4]++;
188 			else if (length <= 32768)
189 				l_ptr->stats.msg_length_profile[5]++;
190 			else
191 				l_ptr->stats.msg_length_profile[6]++;
192 		}
193 	}
194 
195 	/* do all other link processing performed on a periodic basis */
196 
197 	link_state_event(l_ptr, TIMEOUT_EVT);
198 
199 	if (l_ptr->next_out)
200 		tipc_link_push_queue(l_ptr);
201 
202 	tipc_node_unlock(l_ptr->owner);
203 }
204 
205 static void link_set_timer(struct tipc_link *l_ptr, u32 time)
206 {
207 	k_start_timer(&l_ptr->timer, time);
208 }
209 
210 /**
211  * tipc_link_create - create a new link
212  * @n_ptr: pointer to associated node
213  * @b_ptr: pointer to associated bearer
214  * @media_addr: media address to use when sending messages over link
215  *
216  * Returns pointer to link.
217  */
218 struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
219 				   struct tipc_bearer *b_ptr,
220 				   const struct tipc_media_addr *media_addr)
221 {
222 	struct tipc_link *l_ptr;
223 	struct tipc_msg *msg;
224 	char *if_name;
225 	char addr_string[16];
226 	u32 peer = n_ptr->addr;
227 
228 	if (n_ptr->link_cnt >= 2) {
229 		tipc_addr_string_fill(addr_string, n_ptr->addr);
230 		pr_err("Attempt to establish third link to %s\n", addr_string);
231 		return NULL;
232 	}
233 
234 	if (n_ptr->links[b_ptr->identity]) {
235 		tipc_addr_string_fill(addr_string, n_ptr->addr);
236 		pr_err("Attempt to establish second link on <%s> to %s\n",
237 		       b_ptr->name, addr_string);
238 		return NULL;
239 	}
240 
241 	l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC);
242 	if (!l_ptr) {
243 		pr_warn("Link creation failed, no memory\n");
244 		return NULL;
245 	}
246 
247 	l_ptr->addr = peer;
248 	if_name = strchr(b_ptr->name, ':') + 1;
249 	sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:unknown",
250 		tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
251 		tipc_node(tipc_own_addr),
252 		if_name,
253 		tipc_zone(peer), tipc_cluster(peer), tipc_node(peer));
254 		/* note: peer i/f name is updated by reset/activate message */
255 	memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr));
256 	l_ptr->owner = n_ptr;
257 	l_ptr->checkpoint = 1;
258 	l_ptr->peer_session = INVALID_SESSION;
259 	l_ptr->bearer_id = b_ptr->identity;
260 	link_set_supervision_props(l_ptr, b_ptr->tolerance);
261 	l_ptr->state = RESET_UNKNOWN;
262 
263 	l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg;
264 	msg = l_ptr->pmsg;
265 	tipc_msg_init(msg, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, l_ptr->addr);
266 	msg_set_size(msg, sizeof(l_ptr->proto_msg));
267 	msg_set_session(msg, (tipc_random & 0xffff));
268 	msg_set_bearer_id(msg, b_ptr->identity);
269 	strcpy((char *)msg_data(msg), if_name);
270 
271 	l_ptr->priority = b_ptr->priority;
272 	tipc_link_set_queue_limits(l_ptr, b_ptr->window);
273 
274 	l_ptr->net_plane = b_ptr->net_plane;
275 	link_init_max_pkt(l_ptr);
276 
277 	l_ptr->next_out_no = 1;
278 	INIT_LIST_HEAD(&l_ptr->waiting_ports);
279 
280 	link_reset_statistics(l_ptr);
281 
282 	tipc_node_attach_link(n_ptr, l_ptr);
283 
284 	k_init_timer(&l_ptr->timer, (Handler)link_timeout,
285 		     (unsigned long)l_ptr);
286 
287 	link_state_event(l_ptr, STARTING_EVT);
288 
289 	return l_ptr;
290 }
291 
292 void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)
293 {
294 	struct tipc_link *l_ptr;
295 	struct tipc_node *n_ptr;
296 
297 	rcu_read_lock();
298 	list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
299 		tipc_node_lock(n_ptr);
300 		l_ptr = n_ptr->links[bearer_id];
301 		if (l_ptr) {
302 			tipc_link_reset(l_ptr);
303 			if (shutting_down || !tipc_node_is_up(n_ptr)) {
304 				tipc_node_detach_link(l_ptr->owner, l_ptr);
305 				tipc_link_reset_fragments(l_ptr);
306 				tipc_node_unlock(n_ptr);
307 
308 				/* Nobody else can access this link now: */
309 				del_timer_sync(&l_ptr->timer);
310 				kfree(l_ptr);
311 			} else {
312 				/* Detach/delete when failover is finished: */
313 				l_ptr->flags |= LINK_STOPPED;
314 				tipc_node_unlock(n_ptr);
315 				del_timer_sync(&l_ptr->timer);
316 			}
317 			continue;
318 		}
319 		tipc_node_unlock(n_ptr);
320 	}
321 	rcu_read_unlock();
322 }
323 
324 /**
325  * link_schedule_port - schedule port for deferred sending
326  * @l_ptr: pointer to link
327  * @origport: reference to sending port
328  * @sz: amount of data to be sent
329  *
330  * Schedules port for renewed sending of messages after link congestion
331  * has abated.
332  */
333 static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz)
334 {
335 	struct tipc_port *p_ptr;
336 	struct tipc_sock *tsk;
337 
338 	spin_lock_bh(&tipc_port_list_lock);
339 	p_ptr = tipc_port_lock(origport);
340 	if (p_ptr) {
341 		if (!list_empty(&p_ptr->wait_list))
342 			goto exit;
343 		tsk = tipc_port_to_sock(p_ptr);
344 		tsk->link_cong = 1;
345 		p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt);
346 		list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
347 		l_ptr->stats.link_congs++;
348 exit:
349 		tipc_port_unlock(p_ptr);
350 	}
351 	spin_unlock_bh(&tipc_port_list_lock);
352 	return -ELINKCONG;
353 }
354 
355 void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
356 {
357 	struct tipc_port *p_ptr;
358 	struct tipc_sock *tsk;
359 	struct tipc_port *temp_p_ptr;
360 	int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;
361 
362 	if (all)
363 		win = 100000;
364 	if (win <= 0)
365 		return;
366 	if (!spin_trylock_bh(&tipc_port_list_lock))
367 		return;
368 	if (link_congested(l_ptr))
369 		goto exit;
370 	list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports,
371 				 wait_list) {
372 		if (win <= 0)
373 			break;
374 		tsk = tipc_port_to_sock(p_ptr);
375 		list_del_init(&p_ptr->wait_list);
376 		spin_lock_bh(p_ptr->lock);
377 		tsk->link_cong = 0;
378 		tipc_sock_wakeup(tsk);
379 		win -= p_ptr->waiting_pkts;
380 		spin_unlock_bh(p_ptr->lock);
381 	}
382 
383 exit:
384 	spin_unlock_bh(&tipc_port_list_lock);
385 }
386 
387 /**
388  * link_release_outqueue - purge link's outbound message queue
389  * @l_ptr: pointer to link
390  */
391 static void link_release_outqueue(struct tipc_link *l_ptr)
392 {
393 	kfree_skb_list(l_ptr->first_out);
394 	l_ptr->first_out = NULL;
395 	l_ptr->out_queue_size = 0;
396 }
397 
398 /**
399  * tipc_link_reset_fragments - purge link's inbound message fragments queue
400  * @l_ptr: pointer to link
401  */
402 void tipc_link_reset_fragments(struct tipc_link *l_ptr)
403 {
404 	kfree_skb(l_ptr->reasm_buf);
405 	l_ptr->reasm_buf = NULL;
406 }
407 
408 /**
409  * tipc_link_purge_queues - purge all pkt queues associated with link
410  * @l_ptr: pointer to link
411  */
412 void tipc_link_purge_queues(struct tipc_link *l_ptr)
413 {
414 	kfree_skb_list(l_ptr->oldest_deferred_in);
415 	kfree_skb_list(l_ptr->first_out);
416 	tipc_link_reset_fragments(l_ptr);
417 	kfree_skb(l_ptr->proto_msg_queue);
418 	l_ptr->proto_msg_queue = NULL;
419 }
420 
421 void tipc_link_reset(struct tipc_link *l_ptr)
422 {
423 	u32 prev_state = l_ptr->state;
424 	u32 checkpoint = l_ptr->next_in_no;
425 	int was_active_link = tipc_link_is_active(l_ptr);
426 
427 	msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));
428 
429 	/* Link is down, accept any session */
430 	l_ptr->peer_session = INVALID_SESSION;
431 
432 	/* Prepare for max packet size negotiation */
433 	link_init_max_pkt(l_ptr);
434 
435 	l_ptr->state = RESET_UNKNOWN;
436 
437 	if ((prev_state == RESET_UNKNOWN) || (prev_state == RESET_RESET))
438 		return;
439 
440 	tipc_node_link_down(l_ptr->owner, l_ptr);
441 	tipc_bearer_remove_dest(l_ptr->bearer_id, l_ptr->addr);
442 
443 	if (was_active_link && tipc_node_active_links(l_ptr->owner)) {
444 		l_ptr->reset_checkpoint = checkpoint;
445 		l_ptr->exp_msg_count = START_CHANGEOVER;
446 	}
447 
448 	/* Clean up all queues: */
449 	link_release_outqueue(l_ptr);
450 	kfree_skb(l_ptr->proto_msg_queue);
451 	l_ptr->proto_msg_queue = NULL;
452 	kfree_skb_list(l_ptr->oldest_deferred_in);
453 	if (!list_empty(&l_ptr->waiting_ports))
454 		tipc_link_wakeup_ports(l_ptr, 1);
455 
456 	l_ptr->retransm_queue_head = 0;
457 	l_ptr->retransm_queue_size = 0;
458 	l_ptr->last_out = NULL;
459 	l_ptr->first_out = NULL;
460 	l_ptr->next_out = NULL;
461 	l_ptr->unacked_window = 0;
462 	l_ptr->checkpoint = 1;
463 	l_ptr->next_out_no = 1;
464 	l_ptr->deferred_inqueue_sz = 0;
465 	l_ptr->oldest_deferred_in = NULL;
466 	l_ptr->newest_deferred_in = NULL;
467 	l_ptr->fsm_msg_cnt = 0;
468 	l_ptr->stale_count = 0;
469 	link_reset_statistics(l_ptr);
470 }
471 
472 void tipc_link_reset_list(unsigned int bearer_id)
473 {
474 	struct tipc_link *l_ptr;
475 	struct tipc_node *n_ptr;
476 
477 	rcu_read_lock();
478 	list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
479 		tipc_node_lock(n_ptr);
480 		l_ptr = n_ptr->links[bearer_id];
481 		if (l_ptr)
482 			tipc_link_reset(l_ptr);
483 		tipc_node_unlock(n_ptr);
484 	}
485 	rcu_read_unlock();
486 }
487 
488 static void link_activate(struct tipc_link *l_ptr)
489 {
490 	l_ptr->next_in_no = l_ptr->stats.recv_info = 1;
491 	tipc_node_link_up(l_ptr->owner, l_ptr);
492 	tipc_bearer_add_dest(l_ptr->bearer_id, l_ptr->addr);
493 }
494 
495 /**
496  * link_state_event - link finite state machine
497  * @l_ptr: pointer to link
498  * @event: state machine event to process
499  */
500 static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
501 {
502 	struct tipc_link *other;
503 	u32 cont_intv = l_ptr->continuity_interval;
504 
505 	if (l_ptr->flags & LINK_STOPPED)
506 		return;
507 
508 	if (!(l_ptr->flags & LINK_STARTED) && (event != STARTING_EVT))
509 		return;		/* Not yet. */
510 
511 	/* Check whether changeover is going on */
512 	if (l_ptr->exp_msg_count) {
513 		if (event == TIMEOUT_EVT)
514 			link_set_timer(l_ptr, cont_intv);
515 		return;
516 	}
517 
518 	switch (l_ptr->state) {
519 	case WORKING_WORKING:
520 		switch (event) {
521 		case TRAFFIC_MSG_EVT:
522 		case ACTIVATE_MSG:
523 			break;
524 		case TIMEOUT_EVT:
525 			if (l_ptr->next_in_no != l_ptr->checkpoint) {
526 				l_ptr->checkpoint = l_ptr->next_in_no;
527 				if (tipc_bclink_acks_missing(l_ptr->owner)) {
528 					tipc_link_proto_xmit(l_ptr, STATE_MSG,
529 							     0, 0, 0, 0, 0);
530 					l_ptr->fsm_msg_cnt++;
531 				} else if (l_ptr->max_pkt < l_ptr->max_pkt_target) {
532 					tipc_link_proto_xmit(l_ptr, STATE_MSG,
533 							     1, 0, 0, 0, 0);
534 					l_ptr->fsm_msg_cnt++;
535 				}
536 				link_set_timer(l_ptr, cont_intv);
537 				break;
538 			}
539 			l_ptr->state = WORKING_UNKNOWN;
540 			l_ptr->fsm_msg_cnt = 0;
541 			tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
542 			l_ptr->fsm_msg_cnt++;
543 			link_set_timer(l_ptr, cont_intv / 4);
544 			break;
545 		case RESET_MSG:
546 			pr_info("%s<%s>, requested by peer\n", link_rst_msg,
547 				l_ptr->name);
548 			tipc_link_reset(l_ptr);
549 			l_ptr->state = RESET_RESET;
550 			l_ptr->fsm_msg_cnt = 0;
551 			tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
552 					     0, 0, 0, 0, 0);
553 			l_ptr->fsm_msg_cnt++;
554 			link_set_timer(l_ptr, cont_intv);
555 			break;
556 		default:
557 			pr_err("%s%u in WW state\n", link_unk_evt, event);
558 		}
559 		break;
560 	case WORKING_UNKNOWN:
561 		switch (event) {
562 		case TRAFFIC_MSG_EVT:
563 		case ACTIVATE_MSG:
564 			l_ptr->state = WORKING_WORKING;
565 			l_ptr->fsm_msg_cnt = 0;
566 			link_set_timer(l_ptr, cont_intv);
567 			break;
568 		case RESET_MSG:
569 			pr_info("%s<%s>, requested by peer while probing\n",
570 				link_rst_msg, l_ptr->name);
571 			tipc_link_reset(l_ptr);
572 			l_ptr->state = RESET_RESET;
573 			l_ptr->fsm_msg_cnt = 0;
574 			tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
575 					     0, 0, 0, 0, 0);
576 			l_ptr->fsm_msg_cnt++;
577 			link_set_timer(l_ptr, cont_intv);
578 			break;
579 		case TIMEOUT_EVT:
580 			if (l_ptr->next_in_no != l_ptr->checkpoint) {
581 				l_ptr->state = WORKING_WORKING;
582 				l_ptr->fsm_msg_cnt = 0;
583 				l_ptr->checkpoint = l_ptr->next_in_no;
584 				if (tipc_bclink_acks_missing(l_ptr->owner)) {
585 					tipc_link_proto_xmit(l_ptr, STATE_MSG,
586 							     0, 0, 0, 0, 0);
587 					l_ptr->fsm_msg_cnt++;
588 				}
589 				link_set_timer(l_ptr, cont_intv);
590 			} else if (l_ptr->fsm_msg_cnt < l_ptr->abort_limit) {
591 				tipc_link_proto_xmit(l_ptr, STATE_MSG,
592 						     1, 0, 0, 0, 0);
593 				l_ptr->fsm_msg_cnt++;
594 				link_set_timer(l_ptr, cont_intv / 4);
595 			} else {	/* Link has failed */
596 				pr_warn("%s<%s>, peer not responding\n",
597 					link_rst_msg, l_ptr->name);
598 				tipc_link_reset(l_ptr);
599 				l_ptr->state = RESET_UNKNOWN;
600 				l_ptr->fsm_msg_cnt = 0;
601 				tipc_link_proto_xmit(l_ptr, RESET_MSG,
602 						     0, 0, 0, 0, 0);
603 				l_ptr->fsm_msg_cnt++;
604 				link_set_timer(l_ptr, cont_intv);
605 			}
606 			break;
607 		default:
608 			pr_err("%s%u in WU state\n", link_unk_evt, event);
609 		}
610 		break;
611 	case RESET_UNKNOWN:
612 		switch (event) {
613 		case TRAFFIC_MSG_EVT:
614 			break;
615 		case ACTIVATE_MSG:
616 			other = l_ptr->owner->active_links[0];
617 			if (other && link_working_unknown(other))
618 				break;
619 			l_ptr->state = WORKING_WORKING;
620 			l_ptr->fsm_msg_cnt = 0;
621 			link_activate(l_ptr);
622 			tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
623 			l_ptr->fsm_msg_cnt++;
624 			if (l_ptr->owner->working_links == 1)
625 				tipc_link_sync_xmit(l_ptr);
626 			link_set_timer(l_ptr, cont_intv);
627 			break;
628 		case RESET_MSG:
629 			l_ptr->state = RESET_RESET;
630 			l_ptr->fsm_msg_cnt = 0;
631 			tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
632 					     1, 0, 0, 0, 0);
633 			l_ptr->fsm_msg_cnt++;
634 			link_set_timer(l_ptr, cont_intv);
635 			break;
636 		case STARTING_EVT:
637 			l_ptr->flags |= LINK_STARTED;
638 			/* fall through */
639 		case TIMEOUT_EVT:
640 			tipc_link_proto_xmit(l_ptr, RESET_MSG, 0, 0, 0, 0, 0);
641 			l_ptr->fsm_msg_cnt++;
642 			link_set_timer(l_ptr, cont_intv);
643 			break;
644 		default:
645 			pr_err("%s%u in RU state\n", link_unk_evt, event);
646 		}
647 		break;
648 	case RESET_RESET:
649 		switch (event) {
650 		case TRAFFIC_MSG_EVT:
651 		case ACTIVATE_MSG:
652 			other = l_ptr->owner->active_links[0];
653 			if (other && link_working_unknown(other))
654 				break;
655 			l_ptr->state = WORKING_WORKING;
656 			l_ptr->fsm_msg_cnt = 0;
657 			link_activate(l_ptr);
658 			tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
659 			l_ptr->fsm_msg_cnt++;
660 			if (l_ptr->owner->working_links == 1)
661 				tipc_link_sync_xmit(l_ptr);
662 			link_set_timer(l_ptr, cont_intv);
663 			break;
664 		case RESET_MSG:
665 			break;
666 		case TIMEOUT_EVT:
667 			tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
668 					     0, 0, 0, 0, 0);
669 			l_ptr->fsm_msg_cnt++;
670 			link_set_timer(l_ptr, cont_intv);
671 			break;
672 		default:
673 			pr_err("%s%u in RR state\n", link_unk_evt, event);
674 		}
675 		break;
676 	default:
677 		pr_err("Unknown link state %u/%u\n", l_ptr->state, event);
678 	}
679 }
680 
681 /* tipc_link_cong: determine return value and how to treat the
682  * sent buffer during link congestion.
683  * - For plain, errorless user data messages we keep the buffer and
684  *   return -ELINKONG.
685  * - For all other messages we discard the buffer and return -EHOSTUNREACH
686  * - For TIPC internal messages we also reset the link
687  */
688 static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
689 {
690 	struct tipc_msg *msg = buf_msg(buf);
691 	uint psz = msg_size(msg);
692 	uint imp = tipc_msg_tot_importance(msg);
693 	u32 oport = msg_tot_origport(msg);
694 
695 	if (likely(imp <= TIPC_CRITICAL_IMPORTANCE)) {
696 		if (!msg_errcode(msg) && !msg_reroute_cnt(msg)) {
697 			link_schedule_port(link, oport, psz);
698 			return -ELINKCONG;
699 		}
700 	} else {
701 		pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
702 		tipc_link_reset(link);
703 	}
704 	kfree_skb_list(buf);
705 	return -EHOSTUNREACH;
706 }
707 
708 /**
709  * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
710  * @link: link to use
711  * @buf: chain of buffers containing message
712  * Consumes the buffer chain, except when returning -ELINKCONG
713  * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket
714  * user data messages) or -EHOSTUNREACH (all other messages/senders)
715  * Only the socket functions tipc_send_stream() and tipc_send_packet() need
716  * to act on the return value, since they may need to do more send attempts.
717  */
718 int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf)
719 {
720 	struct tipc_msg *msg = buf_msg(buf);
721 	uint psz = msg_size(msg);
722 	uint qsz = link->out_queue_size;
723 	uint sndlim = link->queue_limit[0];
724 	uint imp = tipc_msg_tot_importance(msg);
725 	uint mtu = link->max_pkt;
726 	uint ack = mod(link->next_in_no - 1);
727 	uint seqno = link->next_out_no;
728 	uint bc_last_in = link->owner->bclink.last_in;
729 	struct tipc_media_addr *addr = &link->media_addr;
730 	struct sk_buff *next = buf->next;
731 
732 	/* Match queue limits against msg importance: */
733 	if (unlikely(qsz >= link->queue_limit[imp]))
734 		return tipc_link_cong(link, buf);
735 
736 	/* Has valid packet limit been used ? */
737 	if (unlikely(psz > mtu)) {
738 		kfree_skb_list(buf);
739 		return -EMSGSIZE;
740 	}
741 
742 	/* Prepare each packet for sending, and add to outqueue: */
743 	while (buf) {
744 		next = buf->next;
745 		msg = buf_msg(buf);
746 		msg_set_word(msg, 2, ((ack << 16) | mod(seqno)));
747 		msg_set_bcast_ack(msg, bc_last_in);
748 
749 		if (!link->first_out) {
750 			link->first_out = buf;
751 		} else if (qsz < sndlim) {
752 			link->last_out->next = buf;
753 		} else if (tipc_msg_bundle(link->last_out, buf, mtu)) {
754 			link->stats.sent_bundled++;
755 			buf = next;
756 			next = buf->next;
757 			continue;
758 		} else if (tipc_msg_make_bundle(&buf, mtu, link->addr)) {
759 			link->stats.sent_bundled++;
760 			link->stats.sent_bundles++;
761 			link->last_out->next = buf;
762 			if (!link->next_out)
763 				link->next_out = buf;
764 		} else {
765 			link->last_out->next = buf;
766 			if (!link->next_out)
767 				link->next_out = buf;
768 		}
769 
770 		/* Send packet if possible: */
771 		if (likely(++qsz <= sndlim)) {
772 			tipc_bearer_send(link->bearer_id, buf, addr);
773 			link->next_out = next;
774 			link->unacked_window = 0;
775 		}
776 		seqno++;
777 		link->last_out = buf;
778 		buf = next;
779 	}
780 	link->next_out_no = seqno;
781 	link->out_queue_size = qsz;
782 	return 0;
783 }
784 
785 /**
786  * tipc_link_xmit() is the general link level function for message sending
787  * @buf: chain of buffers containing message
788  * @dsz: amount of user data to be sent
789  * @dnode: address of destination node
790  * @selector: a number used for deterministic link selection
791  * Consumes the buffer chain, except when returning -ELINKCONG
792  * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
793  */
794 int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector)
795 {
796 	struct tipc_link *link = NULL;
797 	struct tipc_node *node;
798 	int rc = -EHOSTUNREACH;
799 
800 	node = tipc_node_find(dnode);
801 	if (node) {
802 		tipc_node_lock(node);
803 		link = node->active_links[selector & 1];
804 		if (link)
805 			rc = __tipc_link_xmit(link, buf);
806 		tipc_node_unlock(node);
807 	}
808 
809 	if (link)
810 		return rc;
811 
812 	if (likely(in_own_node(dnode)))
813 		return tipc_sk_rcv(buf);
814 
815 	kfree_skb_list(buf);
816 	return rc;
817 }
818 
819 /*
820  * tipc_link_sync_xmit - synchronize broadcast link endpoints.
821  *
822  * Give a newly added peer node the sequence number where it should
823  * start receiving and acking broadcast packets.
824  *
825  * Called with node locked
826  */
827 static void tipc_link_sync_xmit(struct tipc_link *link)
828 {
829 	struct sk_buff *buf;
830 	struct tipc_msg *msg;
831 
832 	buf = tipc_buf_acquire(INT_H_SIZE);
833 	if (!buf)
834 		return;
835 
836 	msg = buf_msg(buf);
837 	tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, link->addr);
838 	msg_set_last_bcast(msg, link->owner->bclink.acked);
839 	__tipc_link_xmit(link, buf);
840 }
841 
842 /*
843  * tipc_link_sync_rcv - synchronize broadcast link endpoints.
844  * Receive the sequence number where we should start receiving and
845  * acking broadcast packets from a newly added peer node, and open
846  * up for reception of such packets.
847  *
848  * Called with node locked
849  */
850 static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
851 {
852 	struct tipc_msg *msg = buf_msg(buf);
853 
854 	n->bclink.last_sent = n->bclink.last_in = msg_last_bcast(msg);
855 	n->bclink.recv_permitted = true;
856 	kfree_skb(buf);
857 }
858 
859 /*
860  * tipc_link_push_packet: Push one unsent packet to the media
861  */
862 static u32 tipc_link_push_packet(struct tipc_link *l_ptr)
863 {
864 	struct sk_buff *buf = l_ptr->first_out;
865 	u32 r_q_size = l_ptr->retransm_queue_size;
866 	u32 r_q_head = l_ptr->retransm_queue_head;
867 
868 	/* Step to position where retransmission failed, if any,    */
869 	/* consider that buffers may have been released in meantime */
870 	if (r_q_size && buf) {
871 		u32 last = lesser(mod(r_q_head + r_q_size),
872 				  link_last_sent(l_ptr));
873 		u32 first = buf_seqno(buf);
874 
875 		while (buf && less(first, r_q_head)) {
876 			first = mod(first + 1);
877 			buf = buf->next;
878 		}
879 		l_ptr->retransm_queue_head = r_q_head = first;
880 		l_ptr->retransm_queue_size = r_q_size = mod(last - first);
881 	}
882 
883 	/* Continue retransmission now, if there is anything: */
884 	if (r_q_size && buf) {
885 		msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
886 		msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
887 		tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
888 		l_ptr->retransm_queue_head = mod(++r_q_head);
889 		l_ptr->retransm_queue_size = --r_q_size;
890 		l_ptr->stats.retransmitted++;
891 		return 0;
892 	}
893 
894 	/* Send deferred protocol message, if any: */
895 	buf = l_ptr->proto_msg_queue;
896 	if (buf) {
897 		msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
898 		msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
899 		tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
900 		l_ptr->unacked_window = 0;
901 		kfree_skb(buf);
902 		l_ptr->proto_msg_queue = NULL;
903 		return 0;
904 	}
905 
906 	/* Send one deferred data message, if send window not full: */
907 	buf = l_ptr->next_out;
908 	if (buf) {
909 		struct tipc_msg *msg = buf_msg(buf);
910 		u32 next = msg_seqno(msg);
911 		u32 first = buf_seqno(l_ptr->first_out);
912 
913 		if (mod(next - first) < l_ptr->queue_limit[0]) {
914 			msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
915 			msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
916 			tipc_bearer_send(l_ptr->bearer_id, buf,
917 					 &l_ptr->media_addr);
918 			if (msg_user(msg) == MSG_BUNDLER)
919 				msg_set_type(msg, BUNDLE_CLOSED);
920 			l_ptr->next_out = buf->next;
921 			return 0;
922 		}
923 	}
924 	return 1;
925 }
926 
927 /*
928  * push_queue(): push out the unsent messages of a link where
929  *               congestion has abated. Node is locked
930  */
931 void tipc_link_push_queue(struct tipc_link *l_ptr)
932 {
933 	u32 res;
934 
935 	do {
936 		res = tipc_link_push_packet(l_ptr);
937 	} while (!res);
938 }
939 
940 void tipc_link_reset_all(struct tipc_node *node)
941 {
942 	char addr_string[16];
943 	u32 i;
944 
945 	tipc_node_lock(node);
946 
947 	pr_warn("Resetting all links to %s\n",
948 		tipc_addr_string_fill(addr_string, node->addr));
949 
950 	for (i = 0; i < MAX_BEARERS; i++) {
951 		if (node->links[i]) {
952 			link_print(node->links[i], "Resetting link\n");
953 			tipc_link_reset(node->links[i]);
954 		}
955 	}
956 
957 	tipc_node_unlock(node);
958 }
959 
960 static void link_retransmit_failure(struct tipc_link *l_ptr,
961 				    struct sk_buff *buf)
962 {
963 	struct tipc_msg *msg = buf_msg(buf);
964 
965 	pr_warn("Retransmission failure on link <%s>\n", l_ptr->name);
966 
967 	if (l_ptr->addr) {
968 		/* Handle failure on standard link */
969 		link_print(l_ptr, "Resetting link\n");
970 		tipc_link_reset(l_ptr);
971 
972 	} else {
973 		/* Handle failure on broadcast link */
974 		struct tipc_node *n_ptr;
975 		char addr_string[16];
976 
977 		pr_info("Msg seq number: %u,  ", msg_seqno(msg));
978 		pr_cont("Outstanding acks: %lu\n",
979 			(unsigned long) TIPC_SKB_CB(buf)->handle);
980 
981 		n_ptr = tipc_bclink_retransmit_to();
982 		tipc_node_lock(n_ptr);
983 
984 		tipc_addr_string_fill(addr_string, n_ptr->addr);
985 		pr_info("Broadcast link info for %s\n", addr_string);
986 		pr_info("Reception permitted: %d,  Acked: %u\n",
987 			n_ptr->bclink.recv_permitted,
988 			n_ptr->bclink.acked);
989 		pr_info("Last in: %u,  Oos state: %u,  Last sent: %u\n",
990 			n_ptr->bclink.last_in,
991 			n_ptr->bclink.oos_state,
992 			n_ptr->bclink.last_sent);
993 
994 		tipc_node_unlock(n_ptr);
995 
996 		tipc_bclink_set_flags(TIPC_BCLINK_RESET);
997 		l_ptr->stale_count = 0;
998 	}
999 }
1000 
1001 void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
1002 			  u32 retransmits)
1003 {
1004 	struct tipc_msg *msg;
1005 
1006 	if (!buf)
1007 		return;
1008 
1009 	msg = buf_msg(buf);
1010 
1011 	/* Detect repeated retransmit failures */
1012 	if (l_ptr->last_retransmitted == msg_seqno(msg)) {
1013 		if (++l_ptr->stale_count > 100) {
1014 			link_retransmit_failure(l_ptr, buf);
1015 			return;
1016 		}
1017 	} else {
1018 		l_ptr->last_retransmitted = msg_seqno(msg);
1019 		l_ptr->stale_count = 1;
1020 	}
1021 
1022 	while (retransmits && (buf != l_ptr->next_out) && buf) {
1023 		msg = buf_msg(buf);
1024 		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1025 		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1026 		tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
1027 		buf = buf->next;
1028 		retransmits--;
1029 		l_ptr->stats.retransmitted++;
1030 	}
1031 
1032 	l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0;
1033 }
1034 
1035 /**
1036  * link_insert_deferred_queue - insert deferred messages back into receive chain
1037  */
1038 static struct sk_buff *link_insert_deferred_queue(struct tipc_link *l_ptr,
1039 						  struct sk_buff *buf)
1040 {
1041 	u32 seq_no;
1042 
1043 	if (l_ptr->oldest_deferred_in == NULL)
1044 		return buf;
1045 
1046 	seq_no = buf_seqno(l_ptr->oldest_deferred_in);
1047 	if (seq_no == mod(l_ptr->next_in_no)) {
1048 		l_ptr->newest_deferred_in->next = buf;
1049 		buf = l_ptr->oldest_deferred_in;
1050 		l_ptr->oldest_deferred_in = NULL;
1051 		l_ptr->deferred_inqueue_sz = 0;
1052 	}
1053 	return buf;
1054 }
1055 
1056 /**
1057  * link_recv_buf_validate - validate basic format of received message
1058  *
1059  * This routine ensures a TIPC message has an acceptable header, and at least
1060  * as much data as the header indicates it should.  The routine also ensures
1061  * that the entire message header is stored in the main fragment of the message
1062  * buffer, to simplify future access to message header fields.
1063  *
1064  * Note: Having extra info present in the message header or data areas is OK.
1065  * TIPC will ignore the excess, under the assumption that it is optional info
1066  * introduced by a later release of the protocol.
1067  */
1068 static int link_recv_buf_validate(struct sk_buff *buf)
1069 {
1070 	static u32 min_data_hdr_size[8] = {
1071 		SHORT_H_SIZE, MCAST_H_SIZE, NAMED_H_SIZE, BASIC_H_SIZE,
1072 		MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE
1073 		};
1074 
1075 	struct tipc_msg *msg;
1076 	u32 tipc_hdr[2];
1077 	u32 size;
1078 	u32 hdr_size;
1079 	u32 min_hdr_size;
1080 
1081 	/* If this packet comes from the defer queue, the skb has already
1082 	 * been validated
1083 	 */
1084 	if (unlikely(TIPC_SKB_CB(buf)->deferred))
1085 		return 1;
1086 
1087 	if (unlikely(buf->len < MIN_H_SIZE))
1088 		return 0;
1089 
1090 	msg = skb_header_pointer(buf, 0, sizeof(tipc_hdr), tipc_hdr);
1091 	if (msg == NULL)
1092 		return 0;
1093 
1094 	if (unlikely(msg_version(msg) != TIPC_VERSION))
1095 		return 0;
1096 
1097 	size = msg_size(msg);
1098 	hdr_size = msg_hdr_sz(msg);
1099 	min_hdr_size = msg_isdata(msg) ?
1100 		min_data_hdr_size[msg_type(msg)] : INT_H_SIZE;
1101 
1102 	if (unlikely((hdr_size < min_hdr_size) ||
1103 		     (size < hdr_size) ||
1104 		     (buf->len < size) ||
1105 		     (size - hdr_size > TIPC_MAX_USER_MSG_SIZE)))
1106 		return 0;
1107 
1108 	return pskb_may_pull(buf, hdr_size);
1109 }
1110 
1111 /**
1112  * tipc_rcv - process TIPC packets/messages arriving from off-node
1113  * @head: pointer to message buffer chain
1114  * @b_ptr: pointer to bearer message arrived on
1115  *
1116  * Invoked with no locks held.  Bearer pointer must point to a valid bearer
1117  * structure (i.e. cannot be NULL), but bearer can be inactive.
1118  */
1119 void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
1120 {
1121 	while (head) {
1122 		struct tipc_node *n_ptr;
1123 		struct tipc_link *l_ptr;
1124 		struct sk_buff *crs;
1125 		struct sk_buff *buf = head;
1126 		struct tipc_msg *msg;
1127 		u32 seq_no;
1128 		u32 ackd;
1129 		u32 released = 0;
1130 
1131 		head = head->next;
1132 		buf->next = NULL;
1133 
1134 		/* Ensure message is well-formed */
1135 		if (unlikely(!link_recv_buf_validate(buf)))
1136 			goto discard;
1137 
1138 		/* Ensure message data is a single contiguous unit */
1139 		if (unlikely(skb_linearize(buf)))
1140 			goto discard;
1141 
1142 		/* Handle arrival of a non-unicast link message */
1143 		msg = buf_msg(buf);
1144 
1145 		if (unlikely(msg_non_seq(msg))) {
1146 			if (msg_user(msg) ==  LINK_CONFIG)
1147 				tipc_disc_rcv(buf, b_ptr);
1148 			else
1149 				tipc_bclink_rcv(buf);
1150 			continue;
1151 		}
1152 
1153 		/* Discard unicast link messages destined for another node */
1154 		if (unlikely(!msg_short(msg) &&
1155 			     (msg_destnode(msg) != tipc_own_addr)))
1156 			goto discard;
1157 
1158 		/* Locate neighboring node that sent message */
1159 		n_ptr = tipc_node_find(msg_prevnode(msg));
1160 		if (unlikely(!n_ptr))
1161 			goto discard;
1162 		tipc_node_lock(n_ptr);
1163 
1164 		/* Locate unicast link endpoint that should handle message */
1165 		l_ptr = n_ptr->links[b_ptr->identity];
1166 		if (unlikely(!l_ptr))
1167 			goto unlock_discard;
1168 
1169 		/* Verify that communication with node is currently allowed */
1170 		if ((n_ptr->action_flags & TIPC_WAIT_PEER_LINKS_DOWN) &&
1171 		    msg_user(msg) == LINK_PROTOCOL &&
1172 		    (msg_type(msg) == RESET_MSG ||
1173 		    msg_type(msg) == ACTIVATE_MSG) &&
1174 		    !msg_redundant_link(msg))
1175 			n_ptr->action_flags &= ~TIPC_WAIT_PEER_LINKS_DOWN;
1176 
1177 		if (tipc_node_blocked(n_ptr))
1178 			goto unlock_discard;
1179 
1180 		/* Validate message sequence number info */
1181 		seq_no = msg_seqno(msg);
1182 		ackd = msg_ack(msg);
1183 
1184 		/* Release acked messages */
1185 		if (n_ptr->bclink.recv_permitted)
1186 			tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
1187 
1188 		crs = l_ptr->first_out;
1189 		while ((crs != l_ptr->next_out) &&
1190 		       less_eq(buf_seqno(crs), ackd)) {
1191 			struct sk_buff *next = crs->next;
1192 			kfree_skb(crs);
1193 			crs = next;
1194 			released++;
1195 		}
1196 		if (released) {
1197 			l_ptr->first_out = crs;
1198 			l_ptr->out_queue_size -= released;
1199 		}
1200 
1201 		/* Try sending any messages link endpoint has pending */
1202 		if (unlikely(l_ptr->next_out))
1203 			tipc_link_push_queue(l_ptr);
1204 
1205 		if (unlikely(!list_empty(&l_ptr->waiting_ports)))
1206 			tipc_link_wakeup_ports(l_ptr, 0);
1207 
1208 		/* Process the incoming packet */
1209 		if (unlikely(!link_working_working(l_ptr))) {
1210 			if (msg_user(msg) == LINK_PROTOCOL) {
1211 				tipc_link_proto_rcv(l_ptr, buf);
1212 				head = link_insert_deferred_queue(l_ptr, head);
1213 				tipc_node_unlock(n_ptr);
1214 				continue;
1215 			}
1216 
1217 			/* Traffic message. Conditionally activate link */
1218 			link_state_event(l_ptr, TRAFFIC_MSG_EVT);
1219 
1220 			if (link_working_working(l_ptr)) {
1221 				/* Re-insert buffer in front of queue */
1222 				buf->next = head;
1223 				head = buf;
1224 				tipc_node_unlock(n_ptr);
1225 				continue;
1226 			}
1227 			goto unlock_discard;
1228 		}
1229 
1230 		/* Link is now in state WORKING_WORKING */
1231 		if (unlikely(seq_no != mod(l_ptr->next_in_no))) {
1232 			link_handle_out_of_seq_msg(l_ptr, buf);
1233 			head = link_insert_deferred_queue(l_ptr, head);
1234 			tipc_node_unlock(n_ptr);
1235 			continue;
1236 		}
1237 		l_ptr->next_in_no++;
1238 		if (unlikely(l_ptr->oldest_deferred_in))
1239 			head = link_insert_deferred_queue(l_ptr, head);
1240 
1241 		if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
1242 			l_ptr->stats.sent_acks++;
1243 			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1244 		}
1245 
1246 		if (tipc_link_prepare_input(l_ptr, &buf)) {
1247 			tipc_node_unlock(n_ptr);
1248 			continue;
1249 		}
1250 		tipc_node_unlock(n_ptr);
1251 		msg = buf_msg(buf);
1252 		if (tipc_link_input(l_ptr, buf) != 0)
1253 			goto discard;
1254 		continue;
1255 unlock_discard:
1256 		tipc_node_unlock(n_ptr);
1257 discard:
1258 		kfree_skb(buf);
1259 	}
1260 }
1261 
1262 /**
1263  * tipc_link_prepare_input - process TIPC link messages
1264  *
1265  * returns nonzero if the message was consumed
1266  *
1267  * Node lock must be held
1268  */
1269 static int tipc_link_prepare_input(struct tipc_link *l, struct sk_buff **buf)
1270 {
1271 	struct tipc_node *n;
1272 	struct tipc_msg *msg;
1273 	int res = -EINVAL;
1274 
1275 	n = l->owner;
1276 	msg = buf_msg(*buf);
1277 	switch (msg_user(msg)) {
1278 	case CHANGEOVER_PROTOCOL:
1279 		if (tipc_link_tunnel_rcv(n, buf))
1280 			res = 0;
1281 		break;
1282 	case MSG_FRAGMENTER:
1283 		l->stats.recv_fragments++;
1284 		if (tipc_buf_append(&l->reasm_buf, buf)) {
1285 			l->stats.recv_fragmented++;
1286 			res = 0;
1287 		} else if (!l->reasm_buf) {
1288 			tipc_link_reset(l);
1289 		}
1290 		break;
1291 	case MSG_BUNDLER:
1292 		l->stats.recv_bundles++;
1293 		l->stats.recv_bundled += msg_msgcnt(msg);
1294 		res = 0;
1295 		break;
1296 	case NAME_DISTRIBUTOR:
1297 		n->bclink.recv_permitted = true;
1298 		res = 0;
1299 		break;
1300 	case BCAST_PROTOCOL:
1301 		tipc_link_sync_rcv(n, *buf);
1302 		break;
1303 	default:
1304 		res = 0;
1305 	}
1306 	return res;
1307 }
1308 /**
1309  * tipc_link_input - Deliver message too higher layers
1310  */
1311 static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf)
1312 {
1313 	struct tipc_msg *msg = buf_msg(buf);
1314 	int res = 0;
1315 
1316 	switch (msg_user(msg)) {
1317 	case TIPC_LOW_IMPORTANCE:
1318 	case TIPC_MEDIUM_IMPORTANCE:
1319 	case TIPC_HIGH_IMPORTANCE:
1320 	case TIPC_CRITICAL_IMPORTANCE:
1321 	case CONN_MANAGER:
1322 		tipc_sk_rcv(buf);
1323 		break;
1324 	case NAME_DISTRIBUTOR:
1325 		tipc_named_rcv(buf);
1326 		break;
1327 	case MSG_BUNDLER:
1328 		tipc_link_bundle_rcv(buf);
1329 		break;
1330 	default:
1331 		res = -EINVAL;
1332 	}
1333 	return res;
1334 }
1335 
1336 /**
1337  * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue
1338  *
1339  * Returns increase in queue length (i.e. 0 or 1)
1340  */
1341 u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail,
1342 			struct sk_buff *buf)
1343 {
1344 	struct sk_buff *queue_buf;
1345 	struct sk_buff **prev;
1346 	u32 seq_no = buf_seqno(buf);
1347 
1348 	buf->next = NULL;
1349 
1350 	/* Empty queue ? */
1351 	if (*head == NULL) {
1352 		*head = *tail = buf;
1353 		return 1;
1354 	}
1355 
1356 	/* Last ? */
1357 	if (less(buf_seqno(*tail), seq_no)) {
1358 		(*tail)->next = buf;
1359 		*tail = buf;
1360 		return 1;
1361 	}
1362 
1363 	/* Locate insertion point in queue, then insert; discard if duplicate */
1364 	prev = head;
1365 	queue_buf = *head;
1366 	for (;;) {
1367 		u32 curr_seqno = buf_seqno(queue_buf);
1368 
1369 		if (seq_no == curr_seqno) {
1370 			kfree_skb(buf);
1371 			return 0;
1372 		}
1373 
1374 		if (less(seq_no, curr_seqno))
1375 			break;
1376 
1377 		prev = &queue_buf->next;
1378 		queue_buf = queue_buf->next;
1379 	}
1380 
1381 	buf->next = queue_buf;
1382 	*prev = buf;
1383 	return 1;
1384 }
1385 
1386 /*
1387  * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet
1388  */
1389 static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
1390 				       struct sk_buff *buf)
1391 {
1392 	u32 seq_no = buf_seqno(buf);
1393 
1394 	if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) {
1395 		tipc_link_proto_rcv(l_ptr, buf);
1396 		return;
1397 	}
1398 
1399 	/* Record OOS packet arrival (force mismatch on next timeout) */
1400 	l_ptr->checkpoint--;
1401 
1402 	/*
1403 	 * Discard packet if a duplicate; otherwise add it to deferred queue
1404 	 * and notify peer of gap as per protocol specification
1405 	 */
1406 	if (less(seq_no, mod(l_ptr->next_in_no))) {
1407 		l_ptr->stats.duplicates++;
1408 		kfree_skb(buf);
1409 		return;
1410 	}
1411 
1412 	if (tipc_link_defer_pkt(&l_ptr->oldest_deferred_in,
1413 				&l_ptr->newest_deferred_in, buf)) {
1414 		l_ptr->deferred_inqueue_sz++;
1415 		l_ptr->stats.deferred_recv++;
1416 		TIPC_SKB_CB(buf)->deferred = true;
1417 		if ((l_ptr->deferred_inqueue_sz % 16) == 1)
1418 			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1419 	} else
1420 		l_ptr->stats.duplicates++;
1421 }
1422 
1423 /*
1424  * Send protocol message to the other endpoint.
1425  */
1426 void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
1427 			  u32 gap, u32 tolerance, u32 priority, u32 ack_mtu)
1428 {
1429 	struct sk_buff *buf = NULL;
1430 	struct tipc_msg *msg = l_ptr->pmsg;
1431 	u32 msg_size = sizeof(l_ptr->proto_msg);
1432 	int r_flag;
1433 
1434 	/* Discard any previous message that was deferred due to congestion */
1435 	if (l_ptr->proto_msg_queue) {
1436 		kfree_skb(l_ptr->proto_msg_queue);
1437 		l_ptr->proto_msg_queue = NULL;
1438 	}
1439 
1440 	/* Don't send protocol message during link changeover */
1441 	if (l_ptr->exp_msg_count)
1442 		return;
1443 
1444 	/* Abort non-RESET send if communication with node is prohibited */
1445 	if ((tipc_node_blocked(l_ptr->owner)) && (msg_typ != RESET_MSG))
1446 		return;
1447 
1448 	/* Create protocol message with "out-of-sequence" sequence number */
1449 	msg_set_type(msg, msg_typ);
1450 	msg_set_net_plane(msg, l_ptr->net_plane);
1451 	msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1452 	msg_set_last_bcast(msg, tipc_bclink_get_last_sent());
1453 
1454 	if (msg_typ == STATE_MSG) {
1455 		u32 next_sent = mod(l_ptr->next_out_no);
1456 
1457 		if (!tipc_link_is_up(l_ptr))
1458 			return;
1459 		if (l_ptr->next_out)
1460 			next_sent = buf_seqno(l_ptr->next_out);
1461 		msg_set_next_sent(msg, next_sent);
1462 		if (l_ptr->oldest_deferred_in) {
1463 			u32 rec = buf_seqno(l_ptr->oldest_deferred_in);
1464 			gap = mod(rec - mod(l_ptr->next_in_no));
1465 		}
1466 		msg_set_seq_gap(msg, gap);
1467 		if (gap)
1468 			l_ptr->stats.sent_nacks++;
1469 		msg_set_link_tolerance(msg, tolerance);
1470 		msg_set_linkprio(msg, priority);
1471 		msg_set_max_pkt(msg, ack_mtu);
1472 		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1473 		msg_set_probe(msg, probe_msg != 0);
1474 		if (probe_msg) {
1475 			u32 mtu = l_ptr->max_pkt;
1476 
1477 			if ((mtu < l_ptr->max_pkt_target) &&
1478 			    link_working_working(l_ptr) &&
1479 			    l_ptr->fsm_msg_cnt) {
1480 				msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
1481 				if (l_ptr->max_pkt_probes == 10) {
1482 					l_ptr->max_pkt_target = (msg_size - 4);
1483 					l_ptr->max_pkt_probes = 0;
1484 					msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
1485 				}
1486 				l_ptr->max_pkt_probes++;
1487 			}
1488 
1489 			l_ptr->stats.sent_probes++;
1490 		}
1491 		l_ptr->stats.sent_states++;
1492 	} else {		/* RESET_MSG or ACTIVATE_MSG */
1493 		msg_set_ack(msg, mod(l_ptr->reset_checkpoint - 1));
1494 		msg_set_seq_gap(msg, 0);
1495 		msg_set_next_sent(msg, 1);
1496 		msg_set_probe(msg, 0);
1497 		msg_set_link_tolerance(msg, l_ptr->tolerance);
1498 		msg_set_linkprio(msg, l_ptr->priority);
1499 		msg_set_max_pkt(msg, l_ptr->max_pkt_target);
1500 	}
1501 
1502 	r_flag = (l_ptr->owner->working_links > tipc_link_is_up(l_ptr));
1503 	msg_set_redundant_link(msg, r_flag);
1504 	msg_set_linkprio(msg, l_ptr->priority);
1505 	msg_set_size(msg, msg_size);
1506 
1507 	msg_set_seqno(msg, mod(l_ptr->next_out_no + (0xffff/2)));
1508 
1509 	buf = tipc_buf_acquire(msg_size);
1510 	if (!buf)
1511 		return;
1512 
1513 	skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
1514 	buf->priority = TC_PRIO_CONTROL;
1515 
1516 	tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
1517 	l_ptr->unacked_window = 0;
1518 	kfree_skb(buf);
1519 }
1520 
1521 /*
1522  * Receive protocol message :
1523  * Note that network plane id propagates through the network, and may
1524  * change at any time. The node with lowest address rules
1525  */
1526 static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf)
1527 {
1528 	u32 rec_gap = 0;
1529 	u32 max_pkt_info;
1530 	u32 max_pkt_ack;
1531 	u32 msg_tol;
1532 	struct tipc_msg *msg = buf_msg(buf);
1533 
1534 	/* Discard protocol message during link changeover */
1535 	if (l_ptr->exp_msg_count)
1536 		goto exit;
1537 
1538 	if (l_ptr->net_plane != msg_net_plane(msg))
1539 		if (tipc_own_addr > msg_prevnode(msg))
1540 			l_ptr->net_plane = msg_net_plane(msg);
1541 
1542 	switch (msg_type(msg)) {
1543 
1544 	case RESET_MSG:
1545 		if (!link_working_unknown(l_ptr) &&
1546 		    (l_ptr->peer_session != INVALID_SESSION)) {
1547 			if (less_eq(msg_session(msg), l_ptr->peer_session))
1548 				break; /* duplicate or old reset: ignore */
1549 		}
1550 
1551 		if (!msg_redundant_link(msg) && (link_working_working(l_ptr) ||
1552 				link_working_unknown(l_ptr))) {
1553 			/*
1554 			 * peer has lost contact -- don't allow peer's links
1555 			 * to reactivate before we recognize loss & clean up
1556 			 */
1557 			l_ptr->owner->action_flags |= TIPC_WAIT_OWN_LINKS_DOWN;
1558 		}
1559 
1560 		link_state_event(l_ptr, RESET_MSG);
1561 
1562 		/* fall thru' */
1563 	case ACTIVATE_MSG:
1564 		/* Update link settings according other endpoint's values */
1565 		strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg));
1566 
1567 		msg_tol = msg_link_tolerance(msg);
1568 		if (msg_tol > l_ptr->tolerance)
1569 			link_set_supervision_props(l_ptr, msg_tol);
1570 
1571 		if (msg_linkprio(msg) > l_ptr->priority)
1572 			l_ptr->priority = msg_linkprio(msg);
1573 
1574 		max_pkt_info = msg_max_pkt(msg);
1575 		if (max_pkt_info) {
1576 			if (max_pkt_info < l_ptr->max_pkt_target)
1577 				l_ptr->max_pkt_target = max_pkt_info;
1578 			if (l_ptr->max_pkt > l_ptr->max_pkt_target)
1579 				l_ptr->max_pkt = l_ptr->max_pkt_target;
1580 		} else {
1581 			l_ptr->max_pkt = l_ptr->max_pkt_target;
1582 		}
1583 
1584 		/* Synchronize broadcast link info, if not done previously */
1585 		if (!tipc_node_is_up(l_ptr->owner)) {
1586 			l_ptr->owner->bclink.last_sent =
1587 				l_ptr->owner->bclink.last_in =
1588 				msg_last_bcast(msg);
1589 			l_ptr->owner->bclink.oos_state = 0;
1590 		}
1591 
1592 		l_ptr->peer_session = msg_session(msg);
1593 		l_ptr->peer_bearer_id = msg_bearer_id(msg);
1594 
1595 		if (msg_type(msg) == ACTIVATE_MSG)
1596 			link_state_event(l_ptr, ACTIVATE_MSG);
1597 		break;
1598 	case STATE_MSG:
1599 
1600 		msg_tol = msg_link_tolerance(msg);
1601 		if (msg_tol)
1602 			link_set_supervision_props(l_ptr, msg_tol);
1603 
1604 		if (msg_linkprio(msg) &&
1605 		    (msg_linkprio(msg) != l_ptr->priority)) {
1606 			pr_warn("%s<%s>, priority change %u->%u\n",
1607 				link_rst_msg, l_ptr->name, l_ptr->priority,
1608 				msg_linkprio(msg));
1609 			l_ptr->priority = msg_linkprio(msg);
1610 			tipc_link_reset(l_ptr); /* Enforce change to take effect */
1611 			break;
1612 		}
1613 
1614 		/* Record reception; force mismatch at next timeout: */
1615 		l_ptr->checkpoint--;
1616 
1617 		link_state_event(l_ptr, TRAFFIC_MSG_EVT);
1618 		l_ptr->stats.recv_states++;
1619 		if (link_reset_unknown(l_ptr))
1620 			break;
1621 
1622 		if (less_eq(mod(l_ptr->next_in_no), msg_next_sent(msg))) {
1623 			rec_gap = mod(msg_next_sent(msg) -
1624 				      mod(l_ptr->next_in_no));
1625 		}
1626 
1627 		max_pkt_ack = msg_max_pkt(msg);
1628 		if (max_pkt_ack > l_ptr->max_pkt) {
1629 			l_ptr->max_pkt = max_pkt_ack;
1630 			l_ptr->max_pkt_probes = 0;
1631 		}
1632 
1633 		max_pkt_ack = 0;
1634 		if (msg_probe(msg)) {
1635 			l_ptr->stats.recv_probes++;
1636 			if (msg_size(msg) > sizeof(l_ptr->proto_msg))
1637 				max_pkt_ack = msg_size(msg);
1638 		}
1639 
1640 		/* Protocol message before retransmits, reduce loss risk */
1641 		if (l_ptr->owner->bclink.recv_permitted)
1642 			tipc_bclink_update_link_state(l_ptr->owner,
1643 						      msg_last_bcast(msg));
1644 
1645 		if (rec_gap || (msg_probe(msg))) {
1646 			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, rec_gap, 0,
1647 					     0, max_pkt_ack);
1648 		}
1649 		if (msg_seq_gap(msg)) {
1650 			l_ptr->stats.recv_nacks++;
1651 			tipc_link_retransmit(l_ptr, l_ptr->first_out,
1652 					     msg_seq_gap(msg));
1653 		}
1654 		break;
1655 	}
1656 exit:
1657 	kfree_skb(buf);
1658 }
1659 
1660 
1661 /* tipc_link_tunnel_xmit(): Tunnel one packet via a link belonging to
1662  * a different bearer. Owner node is locked.
1663  */
1664 static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
1665 				  struct tipc_msg *tunnel_hdr,
1666 				  struct tipc_msg *msg,
1667 				  u32 selector)
1668 {
1669 	struct tipc_link *tunnel;
1670 	struct sk_buff *buf;
1671 	u32 length = msg_size(msg);
1672 
1673 	tunnel = l_ptr->owner->active_links[selector & 1];
1674 	if (!tipc_link_is_up(tunnel)) {
1675 		pr_warn("%stunnel link no longer available\n", link_co_err);
1676 		return;
1677 	}
1678 	msg_set_size(tunnel_hdr, length + INT_H_SIZE);
1679 	buf = tipc_buf_acquire(length + INT_H_SIZE);
1680 	if (!buf) {
1681 		pr_warn("%sunable to send tunnel msg\n", link_co_err);
1682 		return;
1683 	}
1684 	skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE);
1685 	skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length);
1686 	__tipc_link_xmit(tunnel, buf);
1687 }
1688 
1689 
1690 /* tipc_link_failover_send_queue(): A link has gone down, but a second
1691  * link is still active. We can do failover. Tunnel the failing link's
1692  * whole send queue via the remaining link. This way, we don't lose
1693  * any packets, and sequence order is preserved for subsequent traffic
1694  * sent over the remaining link. Owner node is locked.
1695  */
1696 void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
1697 {
1698 	u32 msgcount = l_ptr->out_queue_size;
1699 	struct sk_buff *crs = l_ptr->first_out;
1700 	struct tipc_link *tunnel = l_ptr->owner->active_links[0];
1701 	struct tipc_msg tunnel_hdr;
1702 	int split_bundles;
1703 
1704 	if (!tunnel)
1705 		return;
1706 
1707 	tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
1708 		 ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
1709 	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
1710 	msg_set_msgcnt(&tunnel_hdr, msgcount);
1711 
1712 	if (!l_ptr->first_out) {
1713 		struct sk_buff *buf;
1714 
1715 		buf = tipc_buf_acquire(INT_H_SIZE);
1716 		if (buf) {
1717 			skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE);
1718 			msg_set_size(&tunnel_hdr, INT_H_SIZE);
1719 			__tipc_link_xmit(tunnel, buf);
1720 		} else {
1721 			pr_warn("%sunable to send changeover msg\n",
1722 				link_co_err);
1723 		}
1724 		return;
1725 	}
1726 
1727 	split_bundles = (l_ptr->owner->active_links[0] !=
1728 			 l_ptr->owner->active_links[1]);
1729 
1730 	while (crs) {
1731 		struct tipc_msg *msg = buf_msg(crs);
1732 
1733 		if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
1734 			struct tipc_msg *m = msg_get_wrapped(msg);
1735 			unchar *pos = (unchar *)m;
1736 
1737 			msgcount = msg_msgcnt(msg);
1738 			while (msgcount--) {
1739 				msg_set_seqno(m, msg_seqno(msg));
1740 				tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, m,
1741 						      msg_link_selector(m));
1742 				pos += align(msg_size(m));
1743 				m = (struct tipc_msg *)pos;
1744 			}
1745 		} else {
1746 			tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, msg,
1747 					      msg_link_selector(msg));
1748 		}
1749 		crs = crs->next;
1750 	}
1751 }
1752 
1753 /* tipc_link_dup_queue_xmit(): A second link has become active. Tunnel a
1754  * duplicate of the first link's send queue via the new link. This way, we
1755  * are guaranteed that currently queued packets from a socket are delivered
1756  * before future traffic from the same socket, even if this is using the
1757  * new link. The last arriving copy of each duplicate packet is dropped at
1758  * the receiving end by the regular protocol check, so packet cardinality
1759  * and sequence order is preserved per sender/receiver socket pair.
1760  * Owner node is locked.
1761  */
1762 void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
1763 			      struct tipc_link *tunnel)
1764 {
1765 	struct sk_buff *iter;
1766 	struct tipc_msg tunnel_hdr;
1767 
1768 	tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
1769 		 DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr);
1770 	msg_set_msgcnt(&tunnel_hdr, l_ptr->out_queue_size);
1771 	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
1772 	iter = l_ptr->first_out;
1773 	while (iter) {
1774 		struct sk_buff *outbuf;
1775 		struct tipc_msg *msg = buf_msg(iter);
1776 		u32 length = msg_size(msg);
1777 
1778 		if (msg_user(msg) == MSG_BUNDLER)
1779 			msg_set_type(msg, CLOSED_MSG);
1780 		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));	/* Update */
1781 		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1782 		msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
1783 		outbuf = tipc_buf_acquire(length + INT_H_SIZE);
1784 		if (outbuf == NULL) {
1785 			pr_warn("%sunable to send duplicate msg\n",
1786 				link_co_err);
1787 			return;
1788 		}
1789 		skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE);
1790 		skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data,
1791 					       length);
1792 		__tipc_link_xmit(tunnel, outbuf);
1793 		if (!tipc_link_is_up(l_ptr))
1794 			return;
1795 		iter = iter->next;
1796 	}
1797 }
1798 
1799 /**
1800  * buf_extract - extracts embedded TIPC message from another message
1801  * @skb: encapsulating message buffer
1802  * @from_pos: offset to extract from
1803  *
1804  * Returns a new message buffer containing an embedded message.  The
1805  * encapsulating message itself is left unchanged.
1806  */
1807 static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
1808 {
1809 	struct tipc_msg *msg = (struct tipc_msg *)(skb->data + from_pos);
1810 	u32 size = msg_size(msg);
1811 	struct sk_buff *eb;
1812 
1813 	eb = tipc_buf_acquire(size);
1814 	if (eb)
1815 		skb_copy_to_linear_data(eb, msg, size);
1816 	return eb;
1817 }
1818 
1819 
1820 
1821 /* tipc_link_dup_rcv(): Receive a tunnelled DUPLICATE_MSG packet.
1822  * Owner node is locked.
1823  */
1824 static void tipc_link_dup_rcv(struct tipc_link *l_ptr,
1825 			      struct sk_buff *t_buf)
1826 {
1827 	struct sk_buff *buf;
1828 
1829 	if (!tipc_link_is_up(l_ptr))
1830 		return;
1831 
1832 	buf = buf_extract(t_buf, INT_H_SIZE);
1833 	if (buf == NULL) {
1834 		pr_warn("%sfailed to extract inner dup pkt\n", link_co_err);
1835 		return;
1836 	}
1837 
1838 	/* Add buffer to deferred queue, if applicable: */
1839 	link_handle_out_of_seq_msg(l_ptr, buf);
1840 }
1841 
1842 /*  tipc_link_failover_rcv(): Receive a tunnelled ORIGINAL_MSG packet
1843  *  Owner node is locked.
1844  */
1845 static struct sk_buff *tipc_link_failover_rcv(struct tipc_link *l_ptr,
1846 					      struct sk_buff *t_buf)
1847 {
1848 	struct tipc_msg *t_msg = buf_msg(t_buf);
1849 	struct sk_buff *buf = NULL;
1850 	struct tipc_msg *msg;
1851 
1852 	if (tipc_link_is_up(l_ptr))
1853 		tipc_link_reset(l_ptr);
1854 
1855 	/* First failover packet? */
1856 	if (l_ptr->exp_msg_count == START_CHANGEOVER)
1857 		l_ptr->exp_msg_count = msg_msgcnt(t_msg);
1858 
1859 	/* Should there be an inner packet? */
1860 	if (l_ptr->exp_msg_count) {
1861 		l_ptr->exp_msg_count--;
1862 		buf = buf_extract(t_buf, INT_H_SIZE);
1863 		if (buf == NULL) {
1864 			pr_warn("%sno inner failover pkt\n", link_co_err);
1865 			goto exit;
1866 		}
1867 		msg = buf_msg(buf);
1868 
1869 		if (less(msg_seqno(msg), l_ptr->reset_checkpoint)) {
1870 			kfree_skb(buf);
1871 			buf = NULL;
1872 			goto exit;
1873 		}
1874 		if (msg_user(msg) == MSG_FRAGMENTER) {
1875 			l_ptr->stats.recv_fragments++;
1876 			tipc_buf_append(&l_ptr->reasm_buf, &buf);
1877 		}
1878 	}
1879 exit:
1880 	if ((l_ptr->exp_msg_count == 0) && (l_ptr->flags & LINK_STOPPED)) {
1881 		tipc_node_detach_link(l_ptr->owner, l_ptr);
1882 		kfree(l_ptr);
1883 	}
1884 	return buf;
1885 }
1886 
1887 /*  tipc_link_tunnel_rcv(): Receive a tunnelled packet, sent
1888  *  via other link as result of a failover (ORIGINAL_MSG) or
1889  *  a new active link (DUPLICATE_MSG). Failover packets are
1890  *  returned to the active link for delivery upwards.
1891  *  Owner node is locked.
1892  */
1893 static int tipc_link_tunnel_rcv(struct tipc_node *n_ptr,
1894 				struct sk_buff **buf)
1895 {
1896 	struct sk_buff *t_buf = *buf;
1897 	struct tipc_link *l_ptr;
1898 	struct tipc_msg *t_msg = buf_msg(t_buf);
1899 	u32 bearer_id = msg_bearer_id(t_msg);
1900 
1901 	*buf = NULL;
1902 
1903 	if (bearer_id >= MAX_BEARERS)
1904 		goto exit;
1905 
1906 	l_ptr = n_ptr->links[bearer_id];
1907 	if (!l_ptr)
1908 		goto exit;
1909 
1910 	if (msg_type(t_msg) == DUPLICATE_MSG)
1911 		tipc_link_dup_rcv(l_ptr, t_buf);
1912 	else if (msg_type(t_msg) == ORIGINAL_MSG)
1913 		*buf = tipc_link_failover_rcv(l_ptr, t_buf);
1914 	else
1915 		pr_warn("%sunknown tunnel pkt received\n", link_co_err);
1916 exit:
1917 	kfree_skb(t_buf);
1918 	return *buf != NULL;
1919 }
1920 
1921 /*
1922  *  Bundler functionality:
1923  */
1924 void tipc_link_bundle_rcv(struct sk_buff *buf)
1925 {
1926 	u32 msgcount = msg_msgcnt(buf_msg(buf));
1927 	u32 pos = INT_H_SIZE;
1928 	struct sk_buff *obuf;
1929 	struct tipc_msg *omsg;
1930 
1931 	while (msgcount--) {
1932 		obuf = buf_extract(buf, pos);
1933 		if (obuf == NULL) {
1934 			pr_warn("Link unable to unbundle message(s)\n");
1935 			break;
1936 		}
1937 		omsg = buf_msg(obuf);
1938 		pos += align(msg_size(omsg));
1939 		if (msg_isdata(omsg) || (msg_user(omsg) == CONN_MANAGER)) {
1940 			tipc_sk_rcv(obuf);
1941 		} else if (msg_user(omsg) == NAME_DISTRIBUTOR) {
1942 			tipc_named_rcv(obuf);
1943 		} else {
1944 			pr_warn("Illegal bundled msg: %u\n", msg_user(omsg));
1945 			kfree_skb(obuf);
1946 		}
1947 	}
1948 	kfree_skb(buf);
1949 }
1950 
1951 static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance)
1952 {
1953 	if ((tolerance < TIPC_MIN_LINK_TOL) || (tolerance > TIPC_MAX_LINK_TOL))
1954 		return;
1955 
1956 	l_ptr->tolerance = tolerance;
1957 	l_ptr->continuity_interval =
1958 		((tolerance / 4) > 500) ? 500 : tolerance / 4;
1959 	l_ptr->abort_limit = tolerance / (l_ptr->continuity_interval / 4);
1960 }
1961 
1962 void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window)
1963 {
1964 	/* Data messages from this node, inclusive FIRST_FRAGM */
1965 	l_ptr->queue_limit[TIPC_LOW_IMPORTANCE] = window;
1966 	l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE] = (window / 3) * 4;
1967 	l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE] = (window / 3) * 5;
1968 	l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE] = (window / 3) * 6;
1969 	/* Transiting data messages,inclusive FIRST_FRAGM */
1970 	l_ptr->queue_limit[TIPC_LOW_IMPORTANCE + 4] = 300;
1971 	l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE + 4] = 600;
1972 	l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE + 4] = 900;
1973 	l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE + 4] = 1200;
1974 	l_ptr->queue_limit[CONN_MANAGER] = 1200;
1975 	l_ptr->queue_limit[CHANGEOVER_PROTOCOL] = 2500;
1976 	l_ptr->queue_limit[NAME_DISTRIBUTOR] = 3000;
1977 	/* FRAGMENT and LAST_FRAGMENT packets */
1978 	l_ptr->queue_limit[MSG_FRAGMENTER] = 4000;
1979 }
1980 
1981 /* tipc_link_find_owner - locate owner node of link by link's name
1982  * @name: pointer to link name string
1983  * @bearer_id: pointer to index in 'node->links' array where the link was found.
1984  *
1985  * Returns pointer to node owning the link, or 0 if no matching link is found.
1986  */
1987 static struct tipc_node *tipc_link_find_owner(const char *link_name,
1988 					      unsigned int *bearer_id)
1989 {
1990 	struct tipc_link *l_ptr;
1991 	struct tipc_node *n_ptr;
1992 	struct tipc_node *found_node = 0;
1993 	int i;
1994 
1995 	*bearer_id = 0;
1996 	rcu_read_lock();
1997 	list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
1998 		tipc_node_lock(n_ptr);
1999 		for (i = 0; i < MAX_BEARERS; i++) {
2000 			l_ptr = n_ptr->links[i];
2001 			if (l_ptr && !strcmp(l_ptr->name, link_name)) {
2002 				*bearer_id = i;
2003 				found_node = n_ptr;
2004 				break;
2005 			}
2006 		}
2007 		tipc_node_unlock(n_ptr);
2008 		if (found_node)
2009 			break;
2010 	}
2011 	rcu_read_unlock();
2012 
2013 	return found_node;
2014 }
2015 
2016 /**
2017  * link_value_is_valid -- validate proposed link tolerance/priority/window
2018  *
2019  * @cmd: value type (TIPC_CMD_SET_LINK_*)
2020  * @new_value: the new value
2021  *
2022  * Returns 1 if value is within range, 0 if not.
2023  */
2024 static int link_value_is_valid(u16 cmd, u32 new_value)
2025 {
2026 	switch (cmd) {
2027 	case TIPC_CMD_SET_LINK_TOL:
2028 		return (new_value >= TIPC_MIN_LINK_TOL) &&
2029 			(new_value <= TIPC_MAX_LINK_TOL);
2030 	case TIPC_CMD_SET_LINK_PRI:
2031 		return (new_value <= TIPC_MAX_LINK_PRI);
2032 	case TIPC_CMD_SET_LINK_WINDOW:
2033 		return (new_value >= TIPC_MIN_LINK_WIN) &&
2034 			(new_value <= TIPC_MAX_LINK_WIN);
2035 	}
2036 	return 0;
2037 }
2038 
2039 /**
2040  * link_cmd_set_value - change priority/tolerance/window for link/bearer/media
2041  * @name: ptr to link, bearer, or media name
2042  * @new_value: new value of link, bearer, or media setting
2043  * @cmd: which link, bearer, or media attribute to set (TIPC_CMD_SET_LINK_*)
2044  *
2045  * Caller must hold RTNL lock to ensure link/bearer/media is not deleted.
2046  *
2047  * Returns 0 if value updated and negative value on error.
2048  */
2049 static int link_cmd_set_value(const char *name, u32 new_value, u16 cmd)
2050 {
2051 	struct tipc_node *node;
2052 	struct tipc_link *l_ptr;
2053 	struct tipc_bearer *b_ptr;
2054 	struct tipc_media *m_ptr;
2055 	int bearer_id;
2056 	int res = 0;
2057 
2058 	node = tipc_link_find_owner(name, &bearer_id);
2059 	if (node) {
2060 		tipc_node_lock(node);
2061 		l_ptr = node->links[bearer_id];
2062 
2063 		if (l_ptr) {
2064 			switch (cmd) {
2065 			case TIPC_CMD_SET_LINK_TOL:
2066 				link_set_supervision_props(l_ptr, new_value);
2067 				tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0,
2068 						     new_value, 0, 0);
2069 				break;
2070 			case TIPC_CMD_SET_LINK_PRI:
2071 				l_ptr->priority = new_value;
2072 				tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0,
2073 						     0, new_value, 0);
2074 				break;
2075 			case TIPC_CMD_SET_LINK_WINDOW:
2076 				tipc_link_set_queue_limits(l_ptr, new_value);
2077 				break;
2078 			default:
2079 				res = -EINVAL;
2080 				break;
2081 			}
2082 		}
2083 		tipc_node_unlock(node);
2084 		return res;
2085 	}
2086 
2087 	b_ptr = tipc_bearer_find(name);
2088 	if (b_ptr) {
2089 		switch (cmd) {
2090 		case TIPC_CMD_SET_LINK_TOL:
2091 			b_ptr->tolerance = new_value;
2092 			break;
2093 		case TIPC_CMD_SET_LINK_PRI:
2094 			b_ptr->priority = new_value;
2095 			break;
2096 		case TIPC_CMD_SET_LINK_WINDOW:
2097 			b_ptr->window = new_value;
2098 			break;
2099 		default:
2100 			res = -EINVAL;
2101 			break;
2102 		}
2103 		return res;
2104 	}
2105 
2106 	m_ptr = tipc_media_find(name);
2107 	if (!m_ptr)
2108 		return -ENODEV;
2109 	switch (cmd) {
2110 	case TIPC_CMD_SET_LINK_TOL:
2111 		m_ptr->tolerance = new_value;
2112 		break;
2113 	case TIPC_CMD_SET_LINK_PRI:
2114 		m_ptr->priority = new_value;
2115 		break;
2116 	case TIPC_CMD_SET_LINK_WINDOW:
2117 		m_ptr->window = new_value;
2118 		break;
2119 	default:
2120 		res = -EINVAL;
2121 		break;
2122 	}
2123 	return res;
2124 }
2125 
2126 struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, int req_tlv_space,
2127 				     u16 cmd)
2128 {
2129 	struct tipc_link_config *args;
2130 	u32 new_value;
2131 	int res;
2132 
2133 	if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_CONFIG))
2134 		return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2135 
2136 	args = (struct tipc_link_config *)TLV_DATA(req_tlv_area);
2137 	new_value = ntohl(args->value);
2138 
2139 	if (!link_value_is_valid(cmd, new_value))
2140 		return tipc_cfg_reply_error_string(
2141 			"cannot change, value invalid");
2142 
2143 	if (!strcmp(args->name, tipc_bclink_name)) {
2144 		if ((cmd == TIPC_CMD_SET_LINK_WINDOW) &&
2145 		    (tipc_bclink_set_queue_limits(new_value) == 0))
2146 			return tipc_cfg_reply_none();
2147 		return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
2148 						   " (cannot change setting on broadcast link)");
2149 	}
2150 
2151 	res = link_cmd_set_value(args->name, new_value, cmd);
2152 	if (res)
2153 		return tipc_cfg_reply_error_string("cannot change link setting");
2154 
2155 	return tipc_cfg_reply_none();
2156 }
2157 
2158 /**
2159  * link_reset_statistics - reset link statistics
2160  * @l_ptr: pointer to link
2161  */
2162 static void link_reset_statistics(struct tipc_link *l_ptr)
2163 {
2164 	memset(&l_ptr->stats, 0, sizeof(l_ptr->stats));
2165 	l_ptr->stats.sent_info = l_ptr->next_out_no;
2166 	l_ptr->stats.recv_info = l_ptr->next_in_no;
2167 }
2168 
2169 struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_space)
2170 {
2171 	char *link_name;
2172 	struct tipc_link *l_ptr;
2173 	struct tipc_node *node;
2174 	unsigned int bearer_id;
2175 
2176 	if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
2177 		return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2178 
2179 	link_name = (char *)TLV_DATA(req_tlv_area);
2180 	if (!strcmp(link_name, tipc_bclink_name)) {
2181 		if (tipc_bclink_reset_stats())
2182 			return tipc_cfg_reply_error_string("link not found");
2183 		return tipc_cfg_reply_none();
2184 	}
2185 	node = tipc_link_find_owner(link_name, &bearer_id);
2186 	if (!node)
2187 		return tipc_cfg_reply_error_string("link not found");
2188 
2189 	tipc_node_lock(node);
2190 	l_ptr = node->links[bearer_id];
2191 	if (!l_ptr) {
2192 		tipc_node_unlock(node);
2193 		return tipc_cfg_reply_error_string("link not found");
2194 	}
2195 	link_reset_statistics(l_ptr);
2196 	tipc_node_unlock(node);
2197 	return tipc_cfg_reply_none();
2198 }
2199 
2200 /**
2201  * percent - convert count to a percentage of total (rounding up or down)
2202  */
2203 static u32 percent(u32 count, u32 total)
2204 {
2205 	return (count * 100 + (total / 2)) / total;
2206 }
2207 
2208 /**
2209  * tipc_link_stats - print link statistics
2210  * @name: link name
2211  * @buf: print buffer area
2212  * @buf_size: size of print buffer area
2213  *
2214  * Returns length of print buffer data string (or 0 if error)
2215  */
2216 static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)
2217 {
2218 	struct tipc_link *l;
2219 	struct tipc_stats *s;
2220 	struct tipc_node *node;
2221 	char *status;
2222 	u32 profile_total = 0;
2223 	unsigned int bearer_id;
2224 	int ret;
2225 
2226 	if (!strcmp(name, tipc_bclink_name))
2227 		return tipc_bclink_stats(buf, buf_size);
2228 
2229 	node = tipc_link_find_owner(name, &bearer_id);
2230 	if (!node)
2231 		return 0;
2232 
2233 	tipc_node_lock(node);
2234 
2235 	l = node->links[bearer_id];
2236 	if (!l) {
2237 		tipc_node_unlock(node);
2238 		return 0;
2239 	}
2240 
2241 	s = &l->stats;
2242 
2243 	if (tipc_link_is_active(l))
2244 		status = "ACTIVE";
2245 	else if (tipc_link_is_up(l))
2246 		status = "STANDBY";
2247 	else
2248 		status = "DEFUNCT";
2249 
2250 	ret = tipc_snprintf(buf, buf_size, "Link <%s>\n"
2251 			    "  %s  MTU:%u  Priority:%u  Tolerance:%u ms"
2252 			    "  Window:%u packets\n",
2253 			    l->name, status, l->max_pkt, l->priority,
2254 			    l->tolerance, l->queue_limit[0]);
2255 
2256 	ret += tipc_snprintf(buf + ret, buf_size - ret,
2257 			     "  RX packets:%u fragments:%u/%u bundles:%u/%u\n",
2258 			     l->next_in_no - s->recv_info, s->recv_fragments,
2259 			     s->recv_fragmented, s->recv_bundles,
2260 			     s->recv_bundled);
2261 
2262 	ret += tipc_snprintf(buf + ret, buf_size - ret,
2263 			     "  TX packets:%u fragments:%u/%u bundles:%u/%u\n",
2264 			     l->next_out_no - s->sent_info, s->sent_fragments,
2265 			     s->sent_fragmented, s->sent_bundles,
2266 			     s->sent_bundled);
2267 
2268 	profile_total = s->msg_length_counts;
2269 	if (!profile_total)
2270 		profile_total = 1;
2271 
2272 	ret += tipc_snprintf(buf + ret, buf_size - ret,
2273 			     "  TX profile sample:%u packets  average:%u octets\n"
2274 			     "  0-64:%u%% -256:%u%% -1024:%u%% -4096:%u%% "
2275 			     "-16384:%u%% -32768:%u%% -66000:%u%%\n",
2276 			     s->msg_length_counts,
2277 			     s->msg_lengths_total / profile_total,
2278 			     percent(s->msg_length_profile[0], profile_total),
2279 			     percent(s->msg_length_profile[1], profile_total),
2280 			     percent(s->msg_length_profile[2], profile_total),
2281 			     percent(s->msg_length_profile[3], profile_total),
2282 			     percent(s->msg_length_profile[4], profile_total),
2283 			     percent(s->msg_length_profile[5], profile_total),
2284 			     percent(s->msg_length_profile[6], profile_total));
2285 
2286 	ret += tipc_snprintf(buf + ret, buf_size - ret,
2287 			     "  RX states:%u probes:%u naks:%u defs:%u"
2288 			     " dups:%u\n", s->recv_states, s->recv_probes,
2289 			     s->recv_nacks, s->deferred_recv, s->duplicates);
2290 
2291 	ret += tipc_snprintf(buf + ret, buf_size - ret,
2292 			     "  TX states:%u probes:%u naks:%u acks:%u"
2293 			     " dups:%u\n", s->sent_states, s->sent_probes,
2294 			     s->sent_nacks, s->sent_acks, s->retransmitted);
2295 
2296 	ret += tipc_snprintf(buf + ret, buf_size - ret,
2297 			     "  Congestion link:%u  Send queue"
2298 			     " max:%u avg:%u\n", s->link_congs,
2299 			     s->max_queue_sz, s->queue_sz_counts ?
2300 			     (s->accu_queue_sz / s->queue_sz_counts) : 0);
2301 
2302 	tipc_node_unlock(node);
2303 	return ret;
2304 }
2305 
2306 struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, int req_tlv_space)
2307 {
2308 	struct sk_buff *buf;
2309 	struct tlv_desc *rep_tlv;
2310 	int str_len;
2311 	int pb_len;
2312 	char *pb;
2313 
2314 	if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
2315 		return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2316 
2317 	buf = tipc_cfg_reply_alloc(TLV_SPACE(ULTRA_STRING_MAX_LEN));
2318 	if (!buf)
2319 		return NULL;
2320 
2321 	rep_tlv = (struct tlv_desc *)buf->data;
2322 	pb = TLV_DATA(rep_tlv);
2323 	pb_len = ULTRA_STRING_MAX_LEN;
2324 	str_len = tipc_link_stats((char *)TLV_DATA(req_tlv_area),
2325 				  pb, pb_len);
2326 	if (!str_len) {
2327 		kfree_skb(buf);
2328 		return tipc_cfg_reply_error_string("link not found");
2329 	}
2330 	str_len += 1;	/* for "\0" */
2331 	skb_put(buf, TLV_SPACE(str_len));
2332 	TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
2333 
2334 	return buf;
2335 }
2336 
2337 /**
2338  * tipc_link_get_max_pkt - get maximum packet size to use when sending to destination
2339  * @dest: network address of destination node
2340  * @selector: used to select from set of active links
2341  *
2342  * If no active link can be found, uses default maximum packet size.
2343  */
2344 u32 tipc_link_get_max_pkt(u32 dest, u32 selector)
2345 {
2346 	struct tipc_node *n_ptr;
2347 	struct tipc_link *l_ptr;
2348 	u32 res = MAX_PKT_DEFAULT;
2349 
2350 	if (dest == tipc_own_addr)
2351 		return MAX_MSG_SIZE;
2352 
2353 	n_ptr = tipc_node_find(dest);
2354 	if (n_ptr) {
2355 		tipc_node_lock(n_ptr);
2356 		l_ptr = n_ptr->active_links[selector & 1];
2357 		if (l_ptr)
2358 			res = l_ptr->max_pkt;
2359 		tipc_node_unlock(n_ptr);
2360 	}
2361 	return res;
2362 }
2363 
2364 static void link_print(struct tipc_link *l_ptr, const char *str)
2365 {
2366 	struct tipc_bearer *b_ptr;
2367 
2368 	rcu_read_lock();
2369 	b_ptr = rcu_dereference_rtnl(bearer_list[l_ptr->bearer_id]);
2370 	if (b_ptr)
2371 		pr_info("%s Link %x<%s>:", str, l_ptr->addr, b_ptr->name);
2372 	rcu_read_unlock();
2373 
2374 	if (link_working_unknown(l_ptr))
2375 		pr_cont(":WU\n");
2376 	else if (link_reset_reset(l_ptr))
2377 		pr_cont(":RR\n");
2378 	else if (link_reset_unknown(l_ptr))
2379 		pr_cont(":RU\n");
2380 	else if (link_working_working(l_ptr))
2381 		pr_cont(":WW\n");
2382 	else
2383 		pr_cont("\n");
2384 }
2385