xref: /openbmc/linux/net/tipc/link.c (revision baa7eb025ab14f3cba2e35c0a8648f9c9f01d24f)
1 /*
2  * net/tipc/link.c: TIPC link code
3  *
4  * Copyright (c) 1996-2007, Ericsson AB
5  * Copyright (c) 2004-2007, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36 
37 #include "core.h"
38 #include "link.h"
39 #include "port.h"
40 #include "name_distr.h"
41 #include "discover.h"
42 #include "config.h"
43 
44 
45 /*
46  * Out-of-range value for link session numbers
47  */
48 
49 #define INVALID_SESSION 0x10000
50 
51 /*
52  * Link state events:
53  */
54 
55 #define  STARTING_EVT    856384768	/* link processing trigger */
56 #define  TRAFFIC_MSG_EVT 560815u	/* rx'd ??? */
57 #define  TIMEOUT_EVT     560817u	/* link timer expired */
58 
59 /*
60  * The following two 'message types' is really just implementation
61  * data conveniently stored in the message header.
62  * They must not be considered part of the protocol
63  */
64 #define OPEN_MSG   0
65 #define CLOSED_MSG 1
66 
67 /*
68  * State value stored in 'exp_msg_count'
69  */
70 
71 #define START_CHANGEOVER 100000u
72 
73 /**
74  * struct link_name - deconstructed link name
75  * @addr_local: network address of node at this end
76  * @if_local: name of interface at this end
77  * @addr_peer: network address of node at far end
78  * @if_peer: name of interface at far end
79  */
80 
81 struct link_name {
82 	u32 addr_local;
83 	char if_local[TIPC_MAX_IF_NAME];
84 	u32 addr_peer;
85 	char if_peer[TIPC_MAX_IF_NAME];
86 };
87 
88 static void link_handle_out_of_seq_msg(struct link *l_ptr,
89 				       struct sk_buff *buf);
90 static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf);
91 static int  link_recv_changeover_msg(struct link **l_ptr, struct sk_buff **buf);
92 static void link_set_supervision_props(struct link *l_ptr, u32 tolerance);
93 static int  link_send_sections_long(struct port *sender,
94 				    struct iovec const *msg_sect,
95 				    u32 num_sect, u32 destnode);
96 static void link_check_defragm_bufs(struct link *l_ptr);
97 static void link_state_event(struct link *l_ptr, u32 event);
98 static void link_reset_statistics(struct link *l_ptr);
99 static void link_print(struct link *l_ptr, const char *str);
100 static void link_start(struct link *l_ptr);
101 static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf);
102 
103 /*
104  *  Simple link routines
105  */
106 
107 static unsigned int align(unsigned int i)
108 {
109 	return (i + 3) & ~3u;
110 }
111 
112 static void link_init_max_pkt(struct link *l_ptr)
113 {
114 	u32 max_pkt;
115 
116 	max_pkt = (l_ptr->b_ptr->publ.mtu & ~3);
117 	if (max_pkt > MAX_MSG_SIZE)
118 		max_pkt = MAX_MSG_SIZE;
119 
120 	l_ptr->max_pkt_target = max_pkt;
121 	if (l_ptr->max_pkt_target < MAX_PKT_DEFAULT)
122 		l_ptr->max_pkt = l_ptr->max_pkt_target;
123 	else
124 		l_ptr->max_pkt = MAX_PKT_DEFAULT;
125 
126 	l_ptr->max_pkt_probes = 0;
127 }
128 
129 static u32 link_next_sent(struct link *l_ptr)
130 {
131 	if (l_ptr->next_out)
132 		return msg_seqno(buf_msg(l_ptr->next_out));
133 	return mod(l_ptr->next_out_no);
134 }
135 
136 static u32 link_last_sent(struct link *l_ptr)
137 {
138 	return mod(link_next_sent(l_ptr) - 1);
139 }
140 
141 /*
142  *  Simple non-static link routines (i.e. referenced outside this file)
143  */
144 
145 int tipc_link_is_up(struct link *l_ptr)
146 {
147 	if (!l_ptr)
148 		return 0;
149 	return link_working_working(l_ptr) || link_working_unknown(l_ptr);
150 }
151 
152 int tipc_link_is_active(struct link *l_ptr)
153 {
154 	return	(l_ptr->owner->active_links[0] == l_ptr) ||
155 		(l_ptr->owner->active_links[1] == l_ptr);
156 }
157 
158 /**
159  * link_name_validate - validate & (optionally) deconstruct link name
160  * @name - ptr to link name string
161  * @name_parts - ptr to area for link name components (or NULL if not needed)
162  *
163  * Returns 1 if link name is valid, otherwise 0.
164  */
165 
166 static int link_name_validate(const char *name, struct link_name *name_parts)
167 {
168 	char name_copy[TIPC_MAX_LINK_NAME];
169 	char *addr_local;
170 	char *if_local;
171 	char *addr_peer;
172 	char *if_peer;
173 	char dummy;
174 	u32 z_local, c_local, n_local;
175 	u32 z_peer, c_peer, n_peer;
176 	u32 if_local_len;
177 	u32 if_peer_len;
178 
179 	/* copy link name & ensure length is OK */
180 
181 	name_copy[TIPC_MAX_LINK_NAME - 1] = 0;
182 	/* need above in case non-Posix strncpy() doesn't pad with nulls */
183 	strncpy(name_copy, name, TIPC_MAX_LINK_NAME);
184 	if (name_copy[TIPC_MAX_LINK_NAME - 1] != 0)
185 		return 0;
186 
187 	/* ensure all component parts of link name are present */
188 
189 	addr_local = name_copy;
190 	if_local = strchr(addr_local, ':');
191 	if (if_local == NULL)
192 		return 0;
193 	*(if_local++) = 0;
194 	addr_peer = strchr(if_local, '-');
195 	if (addr_peer == NULL)
196 		return 0;
197 	*(addr_peer++) = 0;
198 	if_local_len = addr_peer - if_local;
199 	if_peer = strchr(addr_peer, ':');
200 	if (if_peer == NULL)
201 		return 0;
202 	*(if_peer++) = 0;
203 	if_peer_len = strlen(if_peer) + 1;
204 
205 	/* validate component parts of link name */
206 
207 	if ((sscanf(addr_local, "%u.%u.%u%c",
208 		    &z_local, &c_local, &n_local, &dummy) != 3) ||
209 	    (sscanf(addr_peer, "%u.%u.%u%c",
210 		    &z_peer, &c_peer, &n_peer, &dummy) != 3) ||
211 	    (z_local > 255) || (c_local > 4095) || (n_local > 4095) ||
212 	    (z_peer  > 255) || (c_peer  > 4095) || (n_peer  > 4095) ||
213 	    (if_local_len <= 1) || (if_local_len > TIPC_MAX_IF_NAME) ||
214 	    (if_peer_len  <= 1) || (if_peer_len  > TIPC_MAX_IF_NAME) ||
215 	    (strspn(if_local, tipc_alphabet) != (if_local_len - 1)) ||
216 	    (strspn(if_peer, tipc_alphabet) != (if_peer_len - 1)))
217 		return 0;
218 
219 	/* return link name components, if necessary */
220 
221 	if (name_parts) {
222 		name_parts->addr_local = tipc_addr(z_local, c_local, n_local);
223 		strcpy(name_parts->if_local, if_local);
224 		name_parts->addr_peer = tipc_addr(z_peer, c_peer, n_peer);
225 		strcpy(name_parts->if_peer, if_peer);
226 	}
227 	return 1;
228 }
229 
230 /**
231  * link_timeout - handle expiration of link timer
232  * @l_ptr: pointer to link
233  *
234  * This routine must not grab "tipc_net_lock" to avoid a potential deadlock conflict
235  * with tipc_link_delete().  (There is no risk that the node will be deleted by
236  * another thread because tipc_link_delete() always cancels the link timer before
237  * tipc_node_delete() is called.)
238  */
239 
240 static void link_timeout(struct link *l_ptr)
241 {
242 	tipc_node_lock(l_ptr->owner);
243 
244 	/* update counters used in statistical profiling of send traffic */
245 
246 	l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size;
247 	l_ptr->stats.queue_sz_counts++;
248 
249 	if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz)
250 		l_ptr->stats.max_queue_sz = l_ptr->out_queue_size;
251 
252 	if (l_ptr->first_out) {
253 		struct tipc_msg *msg = buf_msg(l_ptr->first_out);
254 		u32 length = msg_size(msg);
255 
256 		if ((msg_user(msg) == MSG_FRAGMENTER) &&
257 		    (msg_type(msg) == FIRST_FRAGMENT)) {
258 			length = msg_size(msg_get_wrapped(msg));
259 		}
260 		if (length) {
261 			l_ptr->stats.msg_lengths_total += length;
262 			l_ptr->stats.msg_length_counts++;
263 			if (length <= 64)
264 				l_ptr->stats.msg_length_profile[0]++;
265 			else if (length <= 256)
266 				l_ptr->stats.msg_length_profile[1]++;
267 			else if (length <= 1024)
268 				l_ptr->stats.msg_length_profile[2]++;
269 			else if (length <= 4096)
270 				l_ptr->stats.msg_length_profile[3]++;
271 			else if (length <= 16384)
272 				l_ptr->stats.msg_length_profile[4]++;
273 			else if (length <= 32768)
274 				l_ptr->stats.msg_length_profile[5]++;
275 			else
276 				l_ptr->stats.msg_length_profile[6]++;
277 		}
278 	}
279 
280 	/* do all other link processing performed on a periodic basis */
281 
282 	link_check_defragm_bufs(l_ptr);
283 
284 	link_state_event(l_ptr, TIMEOUT_EVT);
285 
286 	if (l_ptr->next_out)
287 		tipc_link_push_queue(l_ptr);
288 
289 	tipc_node_unlock(l_ptr->owner);
290 }
291 
292 static void link_set_timer(struct link *l_ptr, u32 time)
293 {
294 	k_start_timer(&l_ptr->timer, time);
295 }
296 
297 /**
298  * tipc_link_create - create a new link
299  * @b_ptr: pointer to associated bearer
300  * @peer: network address of node at other end of link
301  * @media_addr: media address to use when sending messages over link
302  *
303  * Returns pointer to link.
304  */
305 
306 struct link *tipc_link_create(struct bearer *b_ptr, const u32 peer,
307 			      const struct tipc_media_addr *media_addr)
308 {
309 	struct link *l_ptr;
310 	struct tipc_msg *msg;
311 	char *if_name;
312 
313 	l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC);
314 	if (!l_ptr) {
315 		warn("Link creation failed, no memory\n");
316 		return NULL;
317 	}
318 
319 	l_ptr->addr = peer;
320 	if_name = strchr(b_ptr->publ.name, ':') + 1;
321 	sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:",
322 		tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
323 		tipc_node(tipc_own_addr),
324 		if_name,
325 		tipc_zone(peer), tipc_cluster(peer), tipc_node(peer));
326 		/* note: peer i/f is appended to link name by reset/activate */
327 	memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr));
328 	l_ptr->checkpoint = 1;
329 	l_ptr->b_ptr = b_ptr;
330 	link_set_supervision_props(l_ptr, b_ptr->media->tolerance);
331 	l_ptr->state = RESET_UNKNOWN;
332 
333 	l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg;
334 	msg = l_ptr->pmsg;
335 	tipc_msg_init(msg, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, l_ptr->addr);
336 	msg_set_size(msg, sizeof(l_ptr->proto_msg));
337 	msg_set_session(msg, (tipc_random & 0xffff));
338 	msg_set_bearer_id(msg, b_ptr->identity);
339 	strcpy((char *)msg_data(msg), if_name);
340 
341 	l_ptr->priority = b_ptr->priority;
342 	tipc_link_set_queue_limits(l_ptr, b_ptr->media->window);
343 
344 	link_init_max_pkt(l_ptr);
345 
346 	l_ptr->next_out_no = 1;
347 	INIT_LIST_HEAD(&l_ptr->waiting_ports);
348 
349 	link_reset_statistics(l_ptr);
350 
351 	l_ptr->owner = tipc_node_attach_link(l_ptr);
352 	if (!l_ptr->owner) {
353 		kfree(l_ptr);
354 		return NULL;
355 	}
356 
357 	k_init_timer(&l_ptr->timer, (Handler)link_timeout, (unsigned long)l_ptr);
358 	list_add_tail(&l_ptr->link_list, &b_ptr->links);
359 	tipc_k_signal((Handler)link_start, (unsigned long)l_ptr);
360 
361 	return l_ptr;
362 }
363 
364 /**
365  * tipc_link_delete - delete a link
366  * @l_ptr: pointer to link
367  *
368  * Note: 'tipc_net_lock' is write_locked, bearer is locked.
369  * This routine must not grab the node lock until after link timer cancellation
370  * to avoid a potential deadlock situation.
371  */
372 
373 void tipc_link_delete(struct link *l_ptr)
374 {
375 	if (!l_ptr) {
376 		err("Attempt to delete non-existent link\n");
377 		return;
378 	}
379 
380 	k_cancel_timer(&l_ptr->timer);
381 
382 	tipc_node_lock(l_ptr->owner);
383 	tipc_link_reset(l_ptr);
384 	tipc_node_detach_link(l_ptr->owner, l_ptr);
385 	tipc_link_stop(l_ptr);
386 	list_del_init(&l_ptr->link_list);
387 	tipc_node_unlock(l_ptr->owner);
388 	k_term_timer(&l_ptr->timer);
389 	kfree(l_ptr);
390 }
391 
392 static void link_start(struct link *l_ptr)
393 {
394 	link_state_event(l_ptr, STARTING_EVT);
395 }
396 
397 /**
398  * link_schedule_port - schedule port for deferred sending
399  * @l_ptr: pointer to link
400  * @origport: reference to sending port
401  * @sz: amount of data to be sent
402  *
403  * Schedules port for renewed sending of messages after link congestion
404  * has abated.
405  */
406 
407 static int link_schedule_port(struct link *l_ptr, u32 origport, u32 sz)
408 {
409 	struct port *p_ptr;
410 
411 	spin_lock_bh(&tipc_port_list_lock);
412 	p_ptr = tipc_port_lock(origport);
413 	if (p_ptr) {
414 		if (!p_ptr->wakeup)
415 			goto exit;
416 		if (!list_empty(&p_ptr->wait_list))
417 			goto exit;
418 		p_ptr->publ.congested = 1;
419 		p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt);
420 		list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
421 		l_ptr->stats.link_congs++;
422 exit:
423 		tipc_port_unlock(p_ptr);
424 	}
425 	spin_unlock_bh(&tipc_port_list_lock);
426 	return -ELINKCONG;
427 }
428 
429 void tipc_link_wakeup_ports(struct link *l_ptr, int all)
430 {
431 	struct port *p_ptr;
432 	struct port *temp_p_ptr;
433 	int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;
434 
435 	if (all)
436 		win = 100000;
437 	if (win <= 0)
438 		return;
439 	if (!spin_trylock_bh(&tipc_port_list_lock))
440 		return;
441 	if (link_congested(l_ptr))
442 		goto exit;
443 	list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports,
444 				 wait_list) {
445 		if (win <= 0)
446 			break;
447 		list_del_init(&p_ptr->wait_list);
448 		spin_lock_bh(p_ptr->publ.lock);
449 		p_ptr->publ.congested = 0;
450 		p_ptr->wakeup(&p_ptr->publ);
451 		win -= p_ptr->waiting_pkts;
452 		spin_unlock_bh(p_ptr->publ.lock);
453 	}
454 
455 exit:
456 	spin_unlock_bh(&tipc_port_list_lock);
457 }
458 
459 /**
460  * link_release_outqueue - purge link's outbound message queue
461  * @l_ptr: pointer to link
462  */
463 
464 static void link_release_outqueue(struct link *l_ptr)
465 {
466 	struct sk_buff *buf = l_ptr->first_out;
467 	struct sk_buff *next;
468 
469 	while (buf) {
470 		next = buf->next;
471 		buf_discard(buf);
472 		buf = next;
473 	}
474 	l_ptr->first_out = NULL;
475 	l_ptr->out_queue_size = 0;
476 }
477 
478 /**
479  * tipc_link_reset_fragments - purge link's inbound message fragments queue
480  * @l_ptr: pointer to link
481  */
482 
483 void tipc_link_reset_fragments(struct link *l_ptr)
484 {
485 	struct sk_buff *buf = l_ptr->defragm_buf;
486 	struct sk_buff *next;
487 
488 	while (buf) {
489 		next = buf->next;
490 		buf_discard(buf);
491 		buf = next;
492 	}
493 	l_ptr->defragm_buf = NULL;
494 }
495 
496 /**
497  * tipc_link_stop - purge all inbound and outbound messages associated with link
498  * @l_ptr: pointer to link
499  */
500 
501 void tipc_link_stop(struct link *l_ptr)
502 {
503 	struct sk_buff *buf;
504 	struct sk_buff *next;
505 
506 	buf = l_ptr->oldest_deferred_in;
507 	while (buf) {
508 		next = buf->next;
509 		buf_discard(buf);
510 		buf = next;
511 	}
512 
513 	buf = l_ptr->first_out;
514 	while (buf) {
515 		next = buf->next;
516 		buf_discard(buf);
517 		buf = next;
518 	}
519 
520 	tipc_link_reset_fragments(l_ptr);
521 
522 	buf_discard(l_ptr->proto_msg_queue);
523 	l_ptr->proto_msg_queue = NULL;
524 }
525 
526 /* LINK EVENT CODE IS NOT SUPPORTED AT PRESENT */
527 #define link_send_event(fcn, l_ptr, up) do { } while (0)
528 
529 void tipc_link_reset(struct link *l_ptr)
530 {
531 	struct sk_buff *buf;
532 	u32 prev_state = l_ptr->state;
533 	u32 checkpoint = l_ptr->next_in_no;
534 	int was_active_link = tipc_link_is_active(l_ptr);
535 
536 	msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));
537 
538 	/* Link is down, accept any session */
539 	l_ptr->peer_session = INVALID_SESSION;
540 
541 	/* Prepare for max packet size negotiation */
542 	link_init_max_pkt(l_ptr);
543 
544 	l_ptr->state = RESET_UNKNOWN;
545 
546 	if ((prev_state == RESET_UNKNOWN) || (prev_state == RESET_RESET))
547 		return;
548 
549 	tipc_node_link_down(l_ptr->owner, l_ptr);
550 	tipc_bearer_remove_dest(l_ptr->b_ptr, l_ptr->addr);
551 
552 	if (was_active_link && tipc_node_has_active_links(l_ptr->owner) &&
553 	    l_ptr->owner->permit_changeover) {
554 		l_ptr->reset_checkpoint = checkpoint;
555 		l_ptr->exp_msg_count = START_CHANGEOVER;
556 	}
557 
558 	/* Clean up all queues: */
559 
560 	link_release_outqueue(l_ptr);
561 	buf_discard(l_ptr->proto_msg_queue);
562 	l_ptr->proto_msg_queue = NULL;
563 	buf = l_ptr->oldest_deferred_in;
564 	while (buf) {
565 		struct sk_buff *next = buf->next;
566 		buf_discard(buf);
567 		buf = next;
568 	}
569 	if (!list_empty(&l_ptr->waiting_ports))
570 		tipc_link_wakeup_ports(l_ptr, 1);
571 
572 	l_ptr->retransm_queue_head = 0;
573 	l_ptr->retransm_queue_size = 0;
574 	l_ptr->last_out = NULL;
575 	l_ptr->first_out = NULL;
576 	l_ptr->next_out = NULL;
577 	l_ptr->unacked_window = 0;
578 	l_ptr->checkpoint = 1;
579 	l_ptr->next_out_no = 1;
580 	l_ptr->deferred_inqueue_sz = 0;
581 	l_ptr->oldest_deferred_in = NULL;
582 	l_ptr->newest_deferred_in = NULL;
583 	l_ptr->fsm_msg_cnt = 0;
584 	l_ptr->stale_count = 0;
585 	link_reset_statistics(l_ptr);
586 
587 	link_send_event(tipc_cfg_link_event, l_ptr, 0);
588 	if (!in_own_cluster(l_ptr->addr))
589 		link_send_event(tipc_disc_link_event, l_ptr, 0);
590 }
591 
592 
593 static void link_activate(struct link *l_ptr)
594 {
595 	l_ptr->next_in_no = l_ptr->stats.recv_info = 1;
596 	tipc_node_link_up(l_ptr->owner, l_ptr);
597 	tipc_bearer_add_dest(l_ptr->b_ptr, l_ptr->addr);
598 	link_send_event(tipc_cfg_link_event, l_ptr, 1);
599 	if (!in_own_cluster(l_ptr->addr))
600 		link_send_event(tipc_disc_link_event, l_ptr, 1);
601 }
602 
603 /**
604  * link_state_event - link finite state machine
605  * @l_ptr: pointer to link
606  * @event: state machine event to process
607  */
608 
609 static void link_state_event(struct link *l_ptr, unsigned event)
610 {
611 	struct link *other;
612 	u32 cont_intv = l_ptr->continuity_interval;
613 
614 	if (!l_ptr->started && (event != STARTING_EVT))
615 		return;		/* Not yet. */
616 
617 	if (link_blocked(l_ptr)) {
618 		if (event == TIMEOUT_EVT)
619 			link_set_timer(l_ptr, cont_intv);
620 		return;	  /* Changeover going on */
621 	}
622 
623 	switch (l_ptr->state) {
624 	case WORKING_WORKING:
625 		switch (event) {
626 		case TRAFFIC_MSG_EVT:
627 		case ACTIVATE_MSG:
628 			break;
629 		case TIMEOUT_EVT:
630 			if (l_ptr->next_in_no != l_ptr->checkpoint) {
631 				l_ptr->checkpoint = l_ptr->next_in_no;
632 				if (tipc_bclink_acks_missing(l_ptr->owner)) {
633 					tipc_link_send_proto_msg(l_ptr, STATE_MSG,
634 								 0, 0, 0, 0, 0);
635 					l_ptr->fsm_msg_cnt++;
636 				} else if (l_ptr->max_pkt < l_ptr->max_pkt_target) {
637 					tipc_link_send_proto_msg(l_ptr, STATE_MSG,
638 								 1, 0, 0, 0, 0);
639 					l_ptr->fsm_msg_cnt++;
640 				}
641 				link_set_timer(l_ptr, cont_intv);
642 				break;
643 			}
644 			l_ptr->state = WORKING_UNKNOWN;
645 			l_ptr->fsm_msg_cnt = 0;
646 			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
647 			l_ptr->fsm_msg_cnt++;
648 			link_set_timer(l_ptr, cont_intv / 4);
649 			break;
650 		case RESET_MSG:
651 			info("Resetting link <%s>, requested by peer\n",
652 			     l_ptr->name);
653 			tipc_link_reset(l_ptr);
654 			l_ptr->state = RESET_RESET;
655 			l_ptr->fsm_msg_cnt = 0;
656 			tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
657 			l_ptr->fsm_msg_cnt++;
658 			link_set_timer(l_ptr, cont_intv);
659 			break;
660 		default:
661 			err("Unknown link event %u in WW state\n", event);
662 		}
663 		break;
664 	case WORKING_UNKNOWN:
665 		switch (event) {
666 		case TRAFFIC_MSG_EVT:
667 		case ACTIVATE_MSG:
668 			l_ptr->state = WORKING_WORKING;
669 			l_ptr->fsm_msg_cnt = 0;
670 			link_set_timer(l_ptr, cont_intv);
671 			break;
672 		case RESET_MSG:
673 			info("Resetting link <%s>, requested by peer "
674 			     "while probing\n", l_ptr->name);
675 			tipc_link_reset(l_ptr);
676 			l_ptr->state = RESET_RESET;
677 			l_ptr->fsm_msg_cnt = 0;
678 			tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
679 			l_ptr->fsm_msg_cnt++;
680 			link_set_timer(l_ptr, cont_intv);
681 			break;
682 		case TIMEOUT_EVT:
683 			if (l_ptr->next_in_no != l_ptr->checkpoint) {
684 				l_ptr->state = WORKING_WORKING;
685 				l_ptr->fsm_msg_cnt = 0;
686 				l_ptr->checkpoint = l_ptr->next_in_no;
687 				if (tipc_bclink_acks_missing(l_ptr->owner)) {
688 					tipc_link_send_proto_msg(l_ptr, STATE_MSG,
689 								 0, 0, 0, 0, 0);
690 					l_ptr->fsm_msg_cnt++;
691 				}
692 				link_set_timer(l_ptr, cont_intv);
693 			} else if (l_ptr->fsm_msg_cnt < l_ptr->abort_limit) {
694 				tipc_link_send_proto_msg(l_ptr, STATE_MSG,
695 							 1, 0, 0, 0, 0);
696 				l_ptr->fsm_msg_cnt++;
697 				link_set_timer(l_ptr, cont_intv / 4);
698 			} else {	/* Link has failed */
699 				warn("Resetting link <%s>, peer not responding\n",
700 				     l_ptr->name);
701 				tipc_link_reset(l_ptr);
702 				l_ptr->state = RESET_UNKNOWN;
703 				l_ptr->fsm_msg_cnt = 0;
704 				tipc_link_send_proto_msg(l_ptr, RESET_MSG,
705 							 0, 0, 0, 0, 0);
706 				l_ptr->fsm_msg_cnt++;
707 				link_set_timer(l_ptr, cont_intv);
708 			}
709 			break;
710 		default:
711 			err("Unknown link event %u in WU state\n", event);
712 		}
713 		break;
714 	case RESET_UNKNOWN:
715 		switch (event) {
716 		case TRAFFIC_MSG_EVT:
717 			break;
718 		case ACTIVATE_MSG:
719 			other = l_ptr->owner->active_links[0];
720 			if (other && link_working_unknown(other))
721 				break;
722 			l_ptr->state = WORKING_WORKING;
723 			l_ptr->fsm_msg_cnt = 0;
724 			link_activate(l_ptr);
725 			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
726 			l_ptr->fsm_msg_cnt++;
727 			link_set_timer(l_ptr, cont_intv);
728 			break;
729 		case RESET_MSG:
730 			l_ptr->state = RESET_RESET;
731 			l_ptr->fsm_msg_cnt = 0;
732 			tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 1, 0, 0, 0, 0);
733 			l_ptr->fsm_msg_cnt++;
734 			link_set_timer(l_ptr, cont_intv);
735 			break;
736 		case STARTING_EVT:
737 			l_ptr->started = 1;
738 			/* fall through */
739 		case TIMEOUT_EVT:
740 			tipc_link_send_proto_msg(l_ptr, RESET_MSG, 0, 0, 0, 0, 0);
741 			l_ptr->fsm_msg_cnt++;
742 			link_set_timer(l_ptr, cont_intv);
743 			break;
744 		default:
745 			err("Unknown link event %u in RU state\n", event);
746 		}
747 		break;
748 	case RESET_RESET:
749 		switch (event) {
750 		case TRAFFIC_MSG_EVT:
751 		case ACTIVATE_MSG:
752 			other = l_ptr->owner->active_links[0];
753 			if (other && link_working_unknown(other))
754 				break;
755 			l_ptr->state = WORKING_WORKING;
756 			l_ptr->fsm_msg_cnt = 0;
757 			link_activate(l_ptr);
758 			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
759 			l_ptr->fsm_msg_cnt++;
760 			link_set_timer(l_ptr, cont_intv);
761 			break;
762 		case RESET_MSG:
763 			break;
764 		case TIMEOUT_EVT:
765 			tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
766 			l_ptr->fsm_msg_cnt++;
767 			link_set_timer(l_ptr, cont_intv);
768 			break;
769 		default:
770 			err("Unknown link event %u in RR state\n", event);
771 		}
772 		break;
773 	default:
774 		err("Unknown link state %u/%u\n", l_ptr->state, event);
775 	}
776 }
777 
778 /*
779  * link_bundle_buf(): Append contents of a buffer to
780  * the tail of an existing one.
781  */
782 
783 static int link_bundle_buf(struct link *l_ptr,
784 			   struct sk_buff *bundler,
785 			   struct sk_buff *buf)
786 {
787 	struct tipc_msg *bundler_msg = buf_msg(bundler);
788 	struct tipc_msg *msg = buf_msg(buf);
789 	u32 size = msg_size(msg);
790 	u32 bundle_size = msg_size(bundler_msg);
791 	u32 to_pos = align(bundle_size);
792 	u32 pad = to_pos - bundle_size;
793 
794 	if (msg_user(bundler_msg) != MSG_BUNDLER)
795 		return 0;
796 	if (msg_type(bundler_msg) != OPEN_MSG)
797 		return 0;
798 	if (skb_tailroom(bundler) < (pad + size))
799 		return 0;
800 	if (l_ptr->max_pkt < (to_pos + size))
801 		return 0;
802 
803 	skb_put(bundler, pad + size);
804 	skb_copy_to_linear_data_offset(bundler, to_pos, buf->data, size);
805 	msg_set_size(bundler_msg, to_pos + size);
806 	msg_set_msgcnt(bundler_msg, msg_msgcnt(bundler_msg) + 1);
807 	buf_discard(buf);
808 	l_ptr->stats.sent_bundled++;
809 	return 1;
810 }
811 
812 static void link_add_to_outqueue(struct link *l_ptr,
813 				 struct sk_buff *buf,
814 				 struct tipc_msg *msg)
815 {
816 	u32 ack = mod(l_ptr->next_in_no - 1);
817 	u32 seqno = mod(l_ptr->next_out_no++);
818 
819 	msg_set_word(msg, 2, ((ack << 16) | seqno));
820 	msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
821 	buf->next = NULL;
822 	if (l_ptr->first_out) {
823 		l_ptr->last_out->next = buf;
824 		l_ptr->last_out = buf;
825 	} else
826 		l_ptr->first_out = l_ptr->last_out = buf;
827 	l_ptr->out_queue_size++;
828 }
829 
830 /*
831  * tipc_link_send_buf() is the 'full path' for messages, called from
832  * inside TIPC when the 'fast path' in tipc_send_buf
833  * has failed, and from link_send()
834  */
835 
836 int tipc_link_send_buf(struct link *l_ptr, struct sk_buff *buf)
837 {
838 	struct tipc_msg *msg = buf_msg(buf);
839 	u32 size = msg_size(msg);
840 	u32 dsz = msg_data_sz(msg);
841 	u32 queue_size = l_ptr->out_queue_size;
842 	u32 imp = tipc_msg_tot_importance(msg);
843 	u32 queue_limit = l_ptr->queue_limit[imp];
844 	u32 max_packet = l_ptr->max_pkt;
845 
846 	msg_set_prevnode(msg, tipc_own_addr);	/* If routed message */
847 
848 	/* Match msg importance against queue limits: */
849 
850 	if (unlikely(queue_size >= queue_limit)) {
851 		if (imp <= TIPC_CRITICAL_IMPORTANCE) {
852 			return link_schedule_port(l_ptr, msg_origport(msg),
853 						  size);
854 		}
855 		buf_discard(buf);
856 		if (imp > CONN_MANAGER) {
857 			warn("Resetting link <%s>, send queue full", l_ptr->name);
858 			tipc_link_reset(l_ptr);
859 		}
860 		return dsz;
861 	}
862 
863 	/* Fragmentation needed ? */
864 
865 	if (size > max_packet)
866 		return link_send_long_buf(l_ptr, buf);
867 
868 	/* Packet can be queued or sent: */
869 
870 	if (queue_size > l_ptr->stats.max_queue_sz)
871 		l_ptr->stats.max_queue_sz = queue_size;
872 
873 	if (likely(!tipc_bearer_congested(l_ptr->b_ptr, l_ptr) &&
874 		   !link_congested(l_ptr))) {
875 		link_add_to_outqueue(l_ptr, buf, msg);
876 
877 		if (likely(tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr))) {
878 			l_ptr->unacked_window = 0;
879 		} else {
880 			tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
881 			l_ptr->stats.bearer_congs++;
882 			l_ptr->next_out = buf;
883 		}
884 		return dsz;
885 	}
886 	/* Congestion: can message be bundled ?: */
887 
888 	if ((msg_user(msg) != CHANGEOVER_PROTOCOL) &&
889 	    (msg_user(msg) != MSG_FRAGMENTER)) {
890 
891 		/* Try adding message to an existing bundle */
892 
893 		if (l_ptr->next_out &&
894 		    link_bundle_buf(l_ptr, l_ptr->last_out, buf)) {
895 			tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
896 			return dsz;
897 		}
898 
899 		/* Try creating a new bundle */
900 
901 		if (size <= max_packet * 2 / 3) {
902 			struct sk_buff *bundler = tipc_buf_acquire(max_packet);
903 			struct tipc_msg bundler_hdr;
904 
905 			if (bundler) {
906 				tipc_msg_init(&bundler_hdr, MSG_BUNDLER, OPEN_MSG,
907 					 INT_H_SIZE, l_ptr->addr);
908 				skb_copy_to_linear_data(bundler, &bundler_hdr,
909 							INT_H_SIZE);
910 				skb_trim(bundler, INT_H_SIZE);
911 				link_bundle_buf(l_ptr, bundler, buf);
912 				buf = bundler;
913 				msg = buf_msg(buf);
914 				l_ptr->stats.sent_bundles++;
915 			}
916 		}
917 	}
918 	if (!l_ptr->next_out)
919 		l_ptr->next_out = buf;
920 	link_add_to_outqueue(l_ptr, buf, msg);
921 	tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
922 	return dsz;
923 }
924 
925 /*
926  * tipc_link_send(): same as tipc_link_send_buf(), but the link to use has
927  * not been selected yet, and the the owner node is not locked
928  * Called by TIPC internal users, e.g. the name distributor
929  */
930 
931 int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)
932 {
933 	struct link *l_ptr;
934 	struct tipc_node *n_ptr;
935 	int res = -ELINKCONG;
936 
937 	read_lock_bh(&tipc_net_lock);
938 	n_ptr = tipc_node_find(dest);
939 	if (n_ptr) {
940 		tipc_node_lock(n_ptr);
941 		l_ptr = n_ptr->active_links[selector & 1];
942 		if (l_ptr)
943 			res = tipc_link_send_buf(l_ptr, buf);
944 		else
945 			buf_discard(buf);
946 		tipc_node_unlock(n_ptr);
947 	} else {
948 		buf_discard(buf);
949 	}
950 	read_unlock_bh(&tipc_net_lock);
951 	return res;
952 }
953 
954 /*
955  * link_send_buf_fast: Entry for data messages where the
956  * destination link is known and the header is complete,
957  * inclusive total message length. Very time critical.
958  * Link is locked. Returns user data length.
959  */
960 
961 static int link_send_buf_fast(struct link *l_ptr, struct sk_buff *buf,
962 			      u32 *used_max_pkt)
963 {
964 	struct tipc_msg *msg = buf_msg(buf);
965 	int res = msg_data_sz(msg);
966 
967 	if (likely(!link_congested(l_ptr))) {
968 		if (likely(msg_size(msg) <= l_ptr->max_pkt)) {
969 			if (likely(list_empty(&l_ptr->b_ptr->cong_links))) {
970 				link_add_to_outqueue(l_ptr, buf, msg);
971 				if (likely(tipc_bearer_send(l_ptr->b_ptr, buf,
972 							    &l_ptr->media_addr))) {
973 					l_ptr->unacked_window = 0;
974 					return res;
975 				}
976 				tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
977 				l_ptr->stats.bearer_congs++;
978 				l_ptr->next_out = buf;
979 				return res;
980 			}
981 		} else
982 			*used_max_pkt = l_ptr->max_pkt;
983 	}
984 	return tipc_link_send_buf(l_ptr, buf);  /* All other cases */
985 }
986 
987 /*
988  * tipc_send_buf_fast: Entry for data messages where the
989  * destination node is known and the header is complete,
990  * inclusive total message length.
991  * Returns user data length.
992  */
993 int tipc_send_buf_fast(struct sk_buff *buf, u32 destnode)
994 {
995 	struct link *l_ptr;
996 	struct tipc_node *n_ptr;
997 	int res;
998 	u32 selector = msg_origport(buf_msg(buf)) & 1;
999 	u32 dummy;
1000 
1001 	if (destnode == tipc_own_addr)
1002 		return tipc_port_recv_msg(buf);
1003 
1004 	read_lock_bh(&tipc_net_lock);
1005 	n_ptr = tipc_node_find(destnode);
1006 	if (likely(n_ptr)) {
1007 		tipc_node_lock(n_ptr);
1008 		l_ptr = n_ptr->active_links[selector];
1009 		if (likely(l_ptr)) {
1010 			res = link_send_buf_fast(l_ptr, buf, &dummy);
1011 			tipc_node_unlock(n_ptr);
1012 			read_unlock_bh(&tipc_net_lock);
1013 			return res;
1014 		}
1015 		tipc_node_unlock(n_ptr);
1016 	}
1017 	read_unlock_bh(&tipc_net_lock);
1018 	res = msg_data_sz(buf_msg(buf));
1019 	tipc_reject_msg(buf, TIPC_ERR_NO_NODE);
1020 	return res;
1021 }
1022 
1023 
1024 /*
1025  * tipc_link_send_sections_fast: Entry for messages where the
1026  * destination processor is known and the header is complete,
1027  * except for total message length.
1028  * Returns user data length or errno.
1029  */
1030 int tipc_link_send_sections_fast(struct port *sender,
1031 				 struct iovec const *msg_sect,
1032 				 const u32 num_sect,
1033 				 u32 destaddr)
1034 {
1035 	struct tipc_msg *hdr = &sender->publ.phdr;
1036 	struct link *l_ptr;
1037 	struct sk_buff *buf;
1038 	struct tipc_node *node;
1039 	int res;
1040 	u32 selector = msg_origport(hdr) & 1;
1041 
1042 again:
1043 	/*
1044 	 * Try building message using port's max_pkt hint.
1045 	 * (Must not hold any locks while building message.)
1046 	 */
1047 
1048 	res = tipc_msg_build(hdr, msg_sect, num_sect, sender->publ.max_pkt,
1049 			!sender->user_port, &buf);
1050 
1051 	read_lock_bh(&tipc_net_lock);
1052 	node = tipc_node_find(destaddr);
1053 	if (likely(node)) {
1054 		tipc_node_lock(node);
1055 		l_ptr = node->active_links[selector];
1056 		if (likely(l_ptr)) {
1057 			if (likely(buf)) {
1058 				res = link_send_buf_fast(l_ptr, buf,
1059 							 &sender->publ.max_pkt);
1060 				if (unlikely(res < 0))
1061 					buf_discard(buf);
1062 exit:
1063 				tipc_node_unlock(node);
1064 				read_unlock_bh(&tipc_net_lock);
1065 				return res;
1066 			}
1067 
1068 			/* Exit if build request was invalid */
1069 
1070 			if (unlikely(res < 0))
1071 				goto exit;
1072 
1073 			/* Exit if link (or bearer) is congested */
1074 
1075 			if (link_congested(l_ptr) ||
1076 			    !list_empty(&l_ptr->b_ptr->cong_links)) {
1077 				res = link_schedule_port(l_ptr,
1078 							 sender->publ.ref, res);
1079 				goto exit;
1080 			}
1081 
1082 			/*
1083 			 * Message size exceeds max_pkt hint; update hint,
1084 			 * then re-try fast path or fragment the message
1085 			 */
1086 
1087 			sender->publ.max_pkt = l_ptr->max_pkt;
1088 			tipc_node_unlock(node);
1089 			read_unlock_bh(&tipc_net_lock);
1090 
1091 
1092 			if ((msg_hdr_sz(hdr) + res) <= sender->publ.max_pkt)
1093 				goto again;
1094 
1095 			return link_send_sections_long(sender, msg_sect,
1096 						       num_sect, destaddr);
1097 		}
1098 		tipc_node_unlock(node);
1099 	}
1100 	read_unlock_bh(&tipc_net_lock);
1101 
1102 	/* Couldn't find a link to the destination node */
1103 
1104 	if (buf)
1105 		return tipc_reject_msg(buf, TIPC_ERR_NO_NODE);
1106 	if (res >= 0)
1107 		return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect,
1108 						 TIPC_ERR_NO_NODE);
1109 	return res;
1110 }
1111 
1112 /*
1113  * link_send_sections_long(): Entry for long messages where the
1114  * destination node is known and the header is complete,
1115  * inclusive total message length.
1116  * Link and bearer congestion status have been checked to be ok,
1117  * and are ignored if they change.
1118  *
1119  * Note that fragments do not use the full link MTU so that they won't have
1120  * to undergo refragmentation if link changeover causes them to be sent
1121  * over another link with an additional tunnel header added as prefix.
1122  * (Refragmentation will still occur if the other link has a smaller MTU.)
1123  *
1124  * Returns user data length or errno.
1125  */
1126 static int link_send_sections_long(struct port *sender,
1127 				   struct iovec const *msg_sect,
1128 				   u32 num_sect,
1129 				   u32 destaddr)
1130 {
1131 	struct link *l_ptr;
1132 	struct tipc_node *node;
1133 	struct tipc_msg *hdr = &sender->publ.phdr;
1134 	u32 dsz = msg_data_sz(hdr);
1135 	u32 max_pkt, fragm_sz, rest;
1136 	struct tipc_msg fragm_hdr;
1137 	struct sk_buff *buf, *buf_chain, *prev;
1138 	u32 fragm_crs, fragm_rest, hsz, sect_rest;
1139 	const unchar *sect_crs;
1140 	int curr_sect;
1141 	u32 fragm_no;
1142 
1143 again:
1144 	fragm_no = 1;
1145 	max_pkt = sender->publ.max_pkt - INT_H_SIZE;
1146 		/* leave room for tunnel header in case of link changeover */
1147 	fragm_sz = max_pkt - INT_H_SIZE;
1148 		/* leave room for fragmentation header in each fragment */
1149 	rest = dsz;
1150 	fragm_crs = 0;
1151 	fragm_rest = 0;
1152 	sect_rest = 0;
1153 	sect_crs = NULL;
1154 	curr_sect = -1;
1155 
1156 	/* Prepare reusable fragment header: */
1157 
1158 	tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
1159 		 INT_H_SIZE, msg_destnode(hdr));
1160 	msg_set_link_selector(&fragm_hdr, sender->publ.ref);
1161 	msg_set_size(&fragm_hdr, max_pkt);
1162 	msg_set_fragm_no(&fragm_hdr, 1);
1163 
1164 	/* Prepare header of first fragment: */
1165 
1166 	buf_chain = buf = tipc_buf_acquire(max_pkt);
1167 	if (!buf)
1168 		return -ENOMEM;
1169 	buf->next = NULL;
1170 	skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
1171 	hsz = msg_hdr_sz(hdr);
1172 	skb_copy_to_linear_data_offset(buf, INT_H_SIZE, hdr, hsz);
1173 
1174 	/* Chop up message: */
1175 
1176 	fragm_crs = INT_H_SIZE + hsz;
1177 	fragm_rest = fragm_sz - hsz;
1178 
1179 	do {		/* For all sections */
1180 		u32 sz;
1181 
1182 		if (!sect_rest) {
1183 			sect_rest = msg_sect[++curr_sect].iov_len;
1184 			sect_crs = (const unchar *)msg_sect[curr_sect].iov_base;
1185 		}
1186 
1187 		if (sect_rest < fragm_rest)
1188 			sz = sect_rest;
1189 		else
1190 			sz = fragm_rest;
1191 
1192 		if (likely(!sender->user_port)) {
1193 			if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) {
1194 error:
1195 				for (; buf_chain; buf_chain = buf) {
1196 					buf = buf_chain->next;
1197 					buf_discard(buf_chain);
1198 				}
1199 				return -EFAULT;
1200 			}
1201 		} else
1202 			skb_copy_to_linear_data_offset(buf, fragm_crs,
1203 						       sect_crs, sz);
1204 		sect_crs += sz;
1205 		sect_rest -= sz;
1206 		fragm_crs += sz;
1207 		fragm_rest -= sz;
1208 		rest -= sz;
1209 
1210 		if (!fragm_rest && rest) {
1211 
1212 			/* Initiate new fragment: */
1213 			if (rest <= fragm_sz) {
1214 				fragm_sz = rest;
1215 				msg_set_type(&fragm_hdr, LAST_FRAGMENT);
1216 			} else {
1217 				msg_set_type(&fragm_hdr, FRAGMENT);
1218 			}
1219 			msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
1220 			msg_set_fragm_no(&fragm_hdr, ++fragm_no);
1221 			prev = buf;
1222 			buf = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
1223 			if (!buf)
1224 				goto error;
1225 
1226 			buf->next = NULL;
1227 			prev->next = buf;
1228 			skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
1229 			fragm_crs = INT_H_SIZE;
1230 			fragm_rest = fragm_sz;
1231 		}
1232 	} while (rest > 0);
1233 
1234 	/*
1235 	 * Now we have a buffer chain. Select a link and check
1236 	 * that packet size is still OK
1237 	 */
1238 	node = tipc_node_find(destaddr);
1239 	if (likely(node)) {
1240 		tipc_node_lock(node);
1241 		l_ptr = node->active_links[sender->publ.ref & 1];
1242 		if (!l_ptr) {
1243 			tipc_node_unlock(node);
1244 			goto reject;
1245 		}
1246 		if (l_ptr->max_pkt < max_pkt) {
1247 			sender->publ.max_pkt = l_ptr->max_pkt;
1248 			tipc_node_unlock(node);
1249 			for (; buf_chain; buf_chain = buf) {
1250 				buf = buf_chain->next;
1251 				buf_discard(buf_chain);
1252 			}
1253 			goto again;
1254 		}
1255 	} else {
1256 reject:
1257 		for (; buf_chain; buf_chain = buf) {
1258 			buf = buf_chain->next;
1259 			buf_discard(buf_chain);
1260 		}
1261 		return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect,
1262 						 TIPC_ERR_NO_NODE);
1263 	}
1264 
1265 	/* Append whole chain to send queue: */
1266 
1267 	buf = buf_chain;
1268 	l_ptr->long_msg_seq_no = mod(l_ptr->long_msg_seq_no + 1);
1269 	if (!l_ptr->next_out)
1270 		l_ptr->next_out = buf_chain;
1271 	l_ptr->stats.sent_fragmented++;
1272 	while (buf) {
1273 		struct sk_buff *next = buf->next;
1274 		struct tipc_msg *msg = buf_msg(buf);
1275 
1276 		l_ptr->stats.sent_fragments++;
1277 		msg_set_long_msgno(msg, l_ptr->long_msg_seq_no);
1278 		link_add_to_outqueue(l_ptr, buf, msg);
1279 		buf = next;
1280 	}
1281 
1282 	/* Send it, if possible: */
1283 
1284 	tipc_link_push_queue(l_ptr);
1285 	tipc_node_unlock(node);
1286 	return dsz;
1287 }
1288 
1289 /*
1290  * tipc_link_push_packet: Push one unsent packet to the media
1291  */
1292 u32 tipc_link_push_packet(struct link *l_ptr)
1293 {
1294 	struct sk_buff *buf = l_ptr->first_out;
1295 	u32 r_q_size = l_ptr->retransm_queue_size;
1296 	u32 r_q_head = l_ptr->retransm_queue_head;
1297 
1298 	/* Step to position where retransmission failed, if any,    */
1299 	/* consider that buffers may have been released in meantime */
1300 
1301 	if (r_q_size && buf) {
1302 		u32 last = lesser(mod(r_q_head + r_q_size),
1303 				  link_last_sent(l_ptr));
1304 		u32 first = msg_seqno(buf_msg(buf));
1305 
1306 		while (buf && less(first, r_q_head)) {
1307 			first = mod(first + 1);
1308 			buf = buf->next;
1309 		}
1310 		l_ptr->retransm_queue_head = r_q_head = first;
1311 		l_ptr->retransm_queue_size = r_q_size = mod(last - first);
1312 	}
1313 
1314 	/* Continue retransmission now, if there is anything: */
1315 
1316 	if (r_q_size && buf) {
1317 		msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
1318 		msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
1319 		if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1320 			l_ptr->retransm_queue_head = mod(++r_q_head);
1321 			l_ptr->retransm_queue_size = --r_q_size;
1322 			l_ptr->stats.retransmitted++;
1323 			return 0;
1324 		} else {
1325 			l_ptr->stats.bearer_congs++;
1326 			return PUSH_FAILED;
1327 		}
1328 	}
1329 
1330 	/* Send deferred protocol message, if any: */
1331 
1332 	buf = l_ptr->proto_msg_queue;
1333 	if (buf) {
1334 		msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
1335 		msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
1336 		if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1337 			l_ptr->unacked_window = 0;
1338 			buf_discard(buf);
1339 			l_ptr->proto_msg_queue = NULL;
1340 			return 0;
1341 		} else {
1342 			l_ptr->stats.bearer_congs++;
1343 			return PUSH_FAILED;
1344 		}
1345 	}
1346 
1347 	/* Send one deferred data message, if send window not full: */
1348 
1349 	buf = l_ptr->next_out;
1350 	if (buf) {
1351 		struct tipc_msg *msg = buf_msg(buf);
1352 		u32 next = msg_seqno(msg);
1353 		u32 first = msg_seqno(buf_msg(l_ptr->first_out));
1354 
1355 		if (mod(next - first) < l_ptr->queue_limit[0]) {
1356 			msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1357 			msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1358 			if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1359 				if (msg_user(msg) == MSG_BUNDLER)
1360 					msg_set_type(msg, CLOSED_MSG);
1361 				l_ptr->next_out = buf->next;
1362 				return 0;
1363 			} else {
1364 				l_ptr->stats.bearer_congs++;
1365 				return PUSH_FAILED;
1366 			}
1367 		}
1368 	}
1369 	return PUSH_FINISHED;
1370 }
1371 
1372 /*
1373  * push_queue(): push out the unsent messages of a link where
1374  *               congestion has abated. Node is locked
1375  */
1376 void tipc_link_push_queue(struct link *l_ptr)
1377 {
1378 	u32 res;
1379 
1380 	if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr))
1381 		return;
1382 
1383 	do {
1384 		res = tipc_link_push_packet(l_ptr);
1385 	} while (!res);
1386 
1387 	if (res == PUSH_FAILED)
1388 		tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1389 }
1390 
1391 static void link_reset_all(unsigned long addr)
1392 {
1393 	struct tipc_node *n_ptr;
1394 	char addr_string[16];
1395 	u32 i;
1396 
1397 	read_lock_bh(&tipc_net_lock);
1398 	n_ptr = tipc_node_find((u32)addr);
1399 	if (!n_ptr) {
1400 		read_unlock_bh(&tipc_net_lock);
1401 		return;	/* node no longer exists */
1402 	}
1403 
1404 	tipc_node_lock(n_ptr);
1405 
1406 	warn("Resetting all links to %s\n",
1407 	     tipc_addr_string_fill(addr_string, n_ptr->addr));
1408 
1409 	for (i = 0; i < MAX_BEARERS; i++) {
1410 		if (n_ptr->links[i]) {
1411 			link_print(n_ptr->links[i], "Resetting link\n");
1412 			tipc_link_reset(n_ptr->links[i]);
1413 		}
1414 	}
1415 
1416 	tipc_node_unlock(n_ptr);
1417 	read_unlock_bh(&tipc_net_lock);
1418 }
1419 
1420 static void link_retransmit_failure(struct link *l_ptr, struct sk_buff *buf)
1421 {
1422 	struct tipc_msg *msg = buf_msg(buf);
1423 
1424 	warn("Retransmission failure on link <%s>\n", l_ptr->name);
1425 
1426 	if (l_ptr->addr) {
1427 
1428 		/* Handle failure on standard link */
1429 
1430 		link_print(l_ptr, "Resetting link\n");
1431 		tipc_link_reset(l_ptr);
1432 
1433 	} else {
1434 
1435 		/* Handle failure on broadcast link */
1436 
1437 		struct tipc_node *n_ptr;
1438 		char addr_string[16];
1439 
1440 		info("Msg seq number: %u,  ", msg_seqno(msg));
1441 		info("Outstanding acks: %lu\n",
1442 		     (unsigned long) TIPC_SKB_CB(buf)->handle);
1443 
1444 		n_ptr = l_ptr->owner->next;
1445 		tipc_node_lock(n_ptr);
1446 
1447 		tipc_addr_string_fill(addr_string, n_ptr->addr);
1448 		info("Multicast link info for %s\n", addr_string);
1449 		info("Supported: %d,  ", n_ptr->bclink.supported);
1450 		info("Acked: %u\n", n_ptr->bclink.acked);
1451 		info("Last in: %u,  ", n_ptr->bclink.last_in);
1452 		info("Gap after: %u,  ", n_ptr->bclink.gap_after);
1453 		info("Gap to: %u\n", n_ptr->bclink.gap_to);
1454 		info("Nack sync: %u\n\n", n_ptr->bclink.nack_sync);
1455 
1456 		tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr);
1457 
1458 		tipc_node_unlock(n_ptr);
1459 
1460 		l_ptr->stale_count = 0;
1461 	}
1462 }
1463 
1464 void tipc_link_retransmit(struct link *l_ptr, struct sk_buff *buf,
1465 			  u32 retransmits)
1466 {
1467 	struct tipc_msg *msg;
1468 
1469 	if (!buf)
1470 		return;
1471 
1472 	msg = buf_msg(buf);
1473 
1474 	if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
1475 		if (l_ptr->retransm_queue_size == 0) {
1476 			l_ptr->retransm_queue_head = msg_seqno(msg);
1477 			l_ptr->retransm_queue_size = retransmits;
1478 		} else {
1479 			err("Unexpected retransmit on link %s (qsize=%d)\n",
1480 			    l_ptr->name, l_ptr->retransm_queue_size);
1481 		}
1482 		return;
1483 	} else {
1484 		/* Detect repeated retransmit failures on uncongested bearer */
1485 
1486 		if (l_ptr->last_retransmitted == msg_seqno(msg)) {
1487 			if (++l_ptr->stale_count > 100) {
1488 				link_retransmit_failure(l_ptr, buf);
1489 				return;
1490 			}
1491 		} else {
1492 			l_ptr->last_retransmitted = msg_seqno(msg);
1493 			l_ptr->stale_count = 1;
1494 		}
1495 	}
1496 
1497 	while (retransmits && (buf != l_ptr->next_out) && buf) {
1498 		msg = buf_msg(buf);
1499 		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1500 		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1501 		if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1502 			buf = buf->next;
1503 			retransmits--;
1504 			l_ptr->stats.retransmitted++;
1505 		} else {
1506 			tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1507 			l_ptr->stats.bearer_congs++;
1508 			l_ptr->retransm_queue_head = msg_seqno(buf_msg(buf));
1509 			l_ptr->retransm_queue_size = retransmits;
1510 			return;
1511 		}
1512 	}
1513 
1514 	l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0;
1515 }
1516 
1517 /**
1518  * link_insert_deferred_queue - insert deferred messages back into receive chain
1519  */
1520 
1521 static struct sk_buff *link_insert_deferred_queue(struct link *l_ptr,
1522 						  struct sk_buff *buf)
1523 {
1524 	u32 seq_no;
1525 
1526 	if (l_ptr->oldest_deferred_in == NULL)
1527 		return buf;
1528 
1529 	seq_no = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
1530 	if (seq_no == mod(l_ptr->next_in_no)) {
1531 		l_ptr->newest_deferred_in->next = buf;
1532 		buf = l_ptr->oldest_deferred_in;
1533 		l_ptr->oldest_deferred_in = NULL;
1534 		l_ptr->deferred_inqueue_sz = 0;
1535 	}
1536 	return buf;
1537 }
1538 
1539 /**
1540  * link_recv_buf_validate - validate basic format of received message
1541  *
1542  * This routine ensures a TIPC message has an acceptable header, and at least
1543  * as much data as the header indicates it should.  The routine also ensures
1544  * that the entire message header is stored in the main fragment of the message
1545  * buffer, to simplify future access to message header fields.
1546  *
1547  * Note: Having extra info present in the message header or data areas is OK.
1548  * TIPC will ignore the excess, under the assumption that it is optional info
1549  * introduced by a later release of the protocol.
1550  */
1551 
1552 static int link_recv_buf_validate(struct sk_buff *buf)
1553 {
1554 	static u32 min_data_hdr_size[8] = {
1555 		SHORT_H_SIZE, MCAST_H_SIZE, LONG_H_SIZE, DIR_MSG_H_SIZE,
1556 		MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE
1557 		};
1558 
1559 	struct tipc_msg *msg;
1560 	u32 tipc_hdr[2];
1561 	u32 size;
1562 	u32 hdr_size;
1563 	u32 min_hdr_size;
1564 
1565 	if (unlikely(buf->len < MIN_H_SIZE))
1566 		return 0;
1567 
1568 	msg = skb_header_pointer(buf, 0, sizeof(tipc_hdr), tipc_hdr);
1569 	if (msg == NULL)
1570 		return 0;
1571 
1572 	if (unlikely(msg_version(msg) != TIPC_VERSION))
1573 		return 0;
1574 
1575 	size = msg_size(msg);
1576 	hdr_size = msg_hdr_sz(msg);
1577 	min_hdr_size = msg_isdata(msg) ?
1578 		min_data_hdr_size[msg_type(msg)] : INT_H_SIZE;
1579 
1580 	if (unlikely((hdr_size < min_hdr_size) ||
1581 		     (size < hdr_size) ||
1582 		     (buf->len < size) ||
1583 		     (size - hdr_size > TIPC_MAX_USER_MSG_SIZE)))
1584 		return 0;
1585 
1586 	return pskb_may_pull(buf, hdr_size);
1587 }
1588 
1589 /**
1590  * tipc_recv_msg - process TIPC messages arriving from off-node
1591  * @head: pointer to message buffer chain
1592  * @tb_ptr: pointer to bearer message arrived on
1593  *
1594  * Invoked with no locks held.  Bearer pointer must point to a valid bearer
1595  * structure (i.e. cannot be NULL), but bearer can be inactive.
1596  */
1597 
1598 void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *tb_ptr)
1599 {
1600 	read_lock_bh(&tipc_net_lock);
1601 	while (head) {
1602 		struct bearer *b_ptr = (struct bearer *)tb_ptr;
1603 		struct tipc_node *n_ptr;
1604 		struct link *l_ptr;
1605 		struct sk_buff *crs;
1606 		struct sk_buff *buf = head;
1607 		struct tipc_msg *msg;
1608 		u32 seq_no;
1609 		u32 ackd;
1610 		u32 released = 0;
1611 		int type;
1612 
1613 		head = head->next;
1614 
1615 		/* Ensure bearer is still enabled */
1616 
1617 		if (unlikely(!b_ptr->active))
1618 			goto cont;
1619 
1620 		/* Ensure message is well-formed */
1621 
1622 		if (unlikely(!link_recv_buf_validate(buf)))
1623 			goto cont;
1624 
1625 		/* Ensure message data is a single contiguous unit */
1626 
1627 		if (unlikely(buf_linearize(buf)))
1628 			goto cont;
1629 
1630 		/* Handle arrival of a non-unicast link message */
1631 
1632 		msg = buf_msg(buf);
1633 
1634 		if (unlikely(msg_non_seq(msg))) {
1635 			if (msg_user(msg) ==  LINK_CONFIG)
1636 				tipc_disc_recv_msg(buf, b_ptr);
1637 			else
1638 				tipc_bclink_recv_pkt(buf);
1639 			continue;
1640 		}
1641 
1642 		if (unlikely(!msg_short(msg) &&
1643 			     (msg_destnode(msg) != tipc_own_addr)))
1644 			goto cont;
1645 
1646 		/* Discard non-routeable messages destined for another node */
1647 
1648 		if (unlikely(!msg_isdata(msg) &&
1649 			     (msg_destnode(msg) != tipc_own_addr))) {
1650 			if ((msg_user(msg) != CONN_MANAGER) &&
1651 			    (msg_user(msg) != MSG_FRAGMENTER))
1652 				goto cont;
1653 		}
1654 
1655 		/* Locate neighboring node that sent message */
1656 
1657 		n_ptr = tipc_node_find(msg_prevnode(msg));
1658 		if (unlikely(!n_ptr))
1659 			goto cont;
1660 		tipc_node_lock(n_ptr);
1661 
1662 		/* Don't talk to neighbor during cleanup after last session */
1663 
1664 		if (n_ptr->cleanup_required) {
1665 			tipc_node_unlock(n_ptr);
1666 			goto cont;
1667 		}
1668 
1669 		/* Locate unicast link endpoint that should handle message */
1670 
1671 		l_ptr = n_ptr->links[b_ptr->identity];
1672 		if (unlikely(!l_ptr)) {
1673 			tipc_node_unlock(n_ptr);
1674 			goto cont;
1675 		}
1676 
1677 		/* Validate message sequence number info */
1678 
1679 		seq_no = msg_seqno(msg);
1680 		ackd = msg_ack(msg);
1681 
1682 		/* Release acked messages */
1683 
1684 		if (less(n_ptr->bclink.acked, msg_bcast_ack(msg))) {
1685 			if (tipc_node_is_up(n_ptr) && n_ptr->bclink.supported)
1686 				tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
1687 		}
1688 
1689 		crs = l_ptr->first_out;
1690 		while ((crs != l_ptr->next_out) &&
1691 		       less_eq(msg_seqno(buf_msg(crs)), ackd)) {
1692 			struct sk_buff *next = crs->next;
1693 
1694 			buf_discard(crs);
1695 			crs = next;
1696 			released++;
1697 		}
1698 		if (released) {
1699 			l_ptr->first_out = crs;
1700 			l_ptr->out_queue_size -= released;
1701 		}
1702 
1703 		/* Try sending any messages link endpoint has pending */
1704 
1705 		if (unlikely(l_ptr->next_out))
1706 			tipc_link_push_queue(l_ptr);
1707 		if (unlikely(!list_empty(&l_ptr->waiting_ports)))
1708 			tipc_link_wakeup_ports(l_ptr, 0);
1709 		if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
1710 			l_ptr->stats.sent_acks++;
1711 			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1712 		}
1713 
1714 		/* Now (finally!) process the incoming message */
1715 
1716 protocol_check:
1717 		if (likely(link_working_working(l_ptr))) {
1718 			if (likely(seq_no == mod(l_ptr->next_in_no))) {
1719 				l_ptr->next_in_no++;
1720 				if (unlikely(l_ptr->oldest_deferred_in))
1721 					head = link_insert_deferred_queue(l_ptr,
1722 									  head);
1723 				if (likely(msg_is_dest(msg, tipc_own_addr))) {
1724 deliver:
1725 					if (likely(msg_isdata(msg))) {
1726 						tipc_node_unlock(n_ptr);
1727 						tipc_port_recv_msg(buf);
1728 						continue;
1729 					}
1730 					switch (msg_user(msg)) {
1731 					case MSG_BUNDLER:
1732 						l_ptr->stats.recv_bundles++;
1733 						l_ptr->stats.recv_bundled +=
1734 							msg_msgcnt(msg);
1735 						tipc_node_unlock(n_ptr);
1736 						tipc_link_recv_bundle(buf);
1737 						continue;
1738 					case ROUTE_DISTRIBUTOR:
1739 						tipc_node_unlock(n_ptr);
1740 						buf_discard(buf);
1741 						continue;
1742 					case NAME_DISTRIBUTOR:
1743 						tipc_node_unlock(n_ptr);
1744 						tipc_named_recv(buf);
1745 						continue;
1746 					case CONN_MANAGER:
1747 						tipc_node_unlock(n_ptr);
1748 						tipc_port_recv_proto_msg(buf);
1749 						continue;
1750 					case MSG_FRAGMENTER:
1751 						l_ptr->stats.recv_fragments++;
1752 						if (tipc_link_recv_fragment(&l_ptr->defragm_buf,
1753 									    &buf, &msg)) {
1754 							l_ptr->stats.recv_fragmented++;
1755 							goto deliver;
1756 						}
1757 						break;
1758 					case CHANGEOVER_PROTOCOL:
1759 						type = msg_type(msg);
1760 						if (link_recv_changeover_msg(&l_ptr, &buf)) {
1761 							msg = buf_msg(buf);
1762 							seq_no = msg_seqno(msg);
1763 							if (type == ORIGINAL_MSG)
1764 								goto deliver;
1765 							goto protocol_check;
1766 						}
1767 						break;
1768 					}
1769 				}
1770 				tipc_node_unlock(n_ptr);
1771 				tipc_net_route_msg(buf);
1772 				continue;
1773 			}
1774 			link_handle_out_of_seq_msg(l_ptr, buf);
1775 			head = link_insert_deferred_queue(l_ptr, head);
1776 			tipc_node_unlock(n_ptr);
1777 			continue;
1778 		}
1779 
1780 		if (msg_user(msg) == LINK_PROTOCOL) {
1781 			link_recv_proto_msg(l_ptr, buf);
1782 			head = link_insert_deferred_queue(l_ptr, head);
1783 			tipc_node_unlock(n_ptr);
1784 			continue;
1785 		}
1786 		link_state_event(l_ptr, TRAFFIC_MSG_EVT);
1787 
1788 		if (link_working_working(l_ptr)) {
1789 			/* Re-insert in front of queue */
1790 			buf->next = head;
1791 			head = buf;
1792 			tipc_node_unlock(n_ptr);
1793 			continue;
1794 		}
1795 		tipc_node_unlock(n_ptr);
1796 cont:
1797 		buf_discard(buf);
1798 	}
1799 	read_unlock_bh(&tipc_net_lock);
1800 }
1801 
1802 /*
1803  * link_defer_buf(): Sort a received out-of-sequence packet
1804  *                   into the deferred reception queue.
1805  * Returns the increase of the queue length,i.e. 0 or 1
1806  */
1807 
1808 u32 tipc_link_defer_pkt(struct sk_buff **head,
1809 			struct sk_buff **tail,
1810 			struct sk_buff *buf)
1811 {
1812 	struct sk_buff *prev = NULL;
1813 	struct sk_buff *crs = *head;
1814 	u32 seq_no = msg_seqno(buf_msg(buf));
1815 
1816 	buf->next = NULL;
1817 
1818 	/* Empty queue ? */
1819 	if (*head == NULL) {
1820 		*head = *tail = buf;
1821 		return 1;
1822 	}
1823 
1824 	/* Last ? */
1825 	if (less(msg_seqno(buf_msg(*tail)), seq_no)) {
1826 		(*tail)->next = buf;
1827 		*tail = buf;
1828 		return 1;
1829 	}
1830 
1831 	/* Scan through queue and sort it in */
1832 	do {
1833 		struct tipc_msg *msg = buf_msg(crs);
1834 
1835 		if (less(seq_no, msg_seqno(msg))) {
1836 			buf->next = crs;
1837 			if (prev)
1838 				prev->next = buf;
1839 			else
1840 				*head = buf;
1841 			return 1;
1842 		}
1843 		if (seq_no == msg_seqno(msg))
1844 			break;
1845 		prev = crs;
1846 		crs = crs->next;
1847 	} while (crs);
1848 
1849 	/* Message is a duplicate of an existing message */
1850 
1851 	buf_discard(buf);
1852 	return 0;
1853 }
1854 
1855 /**
1856  * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet
1857  */
1858 
1859 static void link_handle_out_of_seq_msg(struct link *l_ptr,
1860 				       struct sk_buff *buf)
1861 {
1862 	u32 seq_no = msg_seqno(buf_msg(buf));
1863 
1864 	if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) {
1865 		link_recv_proto_msg(l_ptr, buf);
1866 		return;
1867 	}
1868 
1869 	/* Record OOS packet arrival (force mismatch on next timeout) */
1870 
1871 	l_ptr->checkpoint--;
1872 
1873 	/*
1874 	 * Discard packet if a duplicate; otherwise add it to deferred queue
1875 	 * and notify peer of gap as per protocol specification
1876 	 */
1877 
1878 	if (less(seq_no, mod(l_ptr->next_in_no))) {
1879 		l_ptr->stats.duplicates++;
1880 		buf_discard(buf);
1881 		return;
1882 	}
1883 
1884 	if (tipc_link_defer_pkt(&l_ptr->oldest_deferred_in,
1885 				&l_ptr->newest_deferred_in, buf)) {
1886 		l_ptr->deferred_inqueue_sz++;
1887 		l_ptr->stats.deferred_recv++;
1888 		if ((l_ptr->deferred_inqueue_sz % 16) == 1)
1889 			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1890 	} else
1891 		l_ptr->stats.duplicates++;
1892 }
1893 
1894 /*
1895  * Send protocol message to the other endpoint.
1896  */
1897 void tipc_link_send_proto_msg(struct link *l_ptr, u32 msg_typ, int probe_msg,
1898 			      u32 gap, u32 tolerance, u32 priority, u32 ack_mtu)
1899 {
1900 	struct sk_buff *buf = NULL;
1901 	struct tipc_msg *msg = l_ptr->pmsg;
1902 	u32 msg_size = sizeof(l_ptr->proto_msg);
1903 
1904 	if (link_blocked(l_ptr))
1905 		return;
1906 	msg_set_type(msg, msg_typ);
1907 	msg_set_net_plane(msg, l_ptr->b_ptr->net_plane);
1908 	msg_set_bcast_ack(msg, mod(l_ptr->owner->bclink.last_in));
1909 	msg_set_last_bcast(msg, tipc_bclink_get_last_sent());
1910 
1911 	if (msg_typ == STATE_MSG) {
1912 		u32 next_sent = mod(l_ptr->next_out_no);
1913 
1914 		if (!tipc_link_is_up(l_ptr))
1915 			return;
1916 		if (l_ptr->next_out)
1917 			next_sent = msg_seqno(buf_msg(l_ptr->next_out));
1918 		msg_set_next_sent(msg, next_sent);
1919 		if (l_ptr->oldest_deferred_in) {
1920 			u32 rec = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
1921 			gap = mod(rec - mod(l_ptr->next_in_no));
1922 		}
1923 		msg_set_seq_gap(msg, gap);
1924 		if (gap)
1925 			l_ptr->stats.sent_nacks++;
1926 		msg_set_link_tolerance(msg, tolerance);
1927 		msg_set_linkprio(msg, priority);
1928 		msg_set_max_pkt(msg, ack_mtu);
1929 		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1930 		msg_set_probe(msg, probe_msg != 0);
1931 		if (probe_msg) {
1932 			u32 mtu = l_ptr->max_pkt;
1933 
1934 			if ((mtu < l_ptr->max_pkt_target) &&
1935 			    link_working_working(l_ptr) &&
1936 			    l_ptr->fsm_msg_cnt) {
1937 				msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
1938 				if (l_ptr->max_pkt_probes == 10) {
1939 					l_ptr->max_pkt_target = (msg_size - 4);
1940 					l_ptr->max_pkt_probes = 0;
1941 					msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
1942 				}
1943 				l_ptr->max_pkt_probes++;
1944 			}
1945 
1946 			l_ptr->stats.sent_probes++;
1947 		}
1948 		l_ptr->stats.sent_states++;
1949 	} else {		/* RESET_MSG or ACTIVATE_MSG */
1950 		msg_set_ack(msg, mod(l_ptr->reset_checkpoint - 1));
1951 		msg_set_seq_gap(msg, 0);
1952 		msg_set_next_sent(msg, 1);
1953 		msg_set_link_tolerance(msg, l_ptr->tolerance);
1954 		msg_set_linkprio(msg, l_ptr->priority);
1955 		msg_set_max_pkt(msg, l_ptr->max_pkt_target);
1956 	}
1957 
1958 	if (tipc_node_has_redundant_links(l_ptr->owner))
1959 		msg_set_redundant_link(msg);
1960 	else
1961 		msg_clear_redundant_link(msg);
1962 	msg_set_linkprio(msg, l_ptr->priority);
1963 
1964 	/* Ensure sequence number will not fit : */
1965 
1966 	msg_set_seqno(msg, mod(l_ptr->next_out_no + (0xffff/2)));
1967 
1968 	/* Congestion? */
1969 
1970 	if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
1971 		if (!l_ptr->proto_msg_queue) {
1972 			l_ptr->proto_msg_queue =
1973 				tipc_buf_acquire(sizeof(l_ptr->proto_msg));
1974 		}
1975 		buf = l_ptr->proto_msg_queue;
1976 		if (!buf)
1977 			return;
1978 		skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
1979 		return;
1980 	}
1981 	msg_set_timestamp(msg, jiffies_to_msecs(jiffies));
1982 
1983 	/* Message can be sent */
1984 
1985 	buf = tipc_buf_acquire(msg_size);
1986 	if (!buf)
1987 		return;
1988 
1989 	skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
1990 	msg_set_size(buf_msg(buf), msg_size);
1991 
1992 	if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1993 		l_ptr->unacked_window = 0;
1994 		buf_discard(buf);
1995 		return;
1996 	}
1997 
1998 	/* New congestion */
1999 	tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
2000 	l_ptr->proto_msg_queue = buf;
2001 	l_ptr->stats.bearer_congs++;
2002 }
2003 
2004 /*
2005  * Receive protocol message :
2006  * Note that network plane id propagates through the network, and may
2007  * change at any time. The node with lowest address rules
2008  */
2009 
2010 static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf)
2011 {
2012 	u32 rec_gap = 0;
2013 	u32 max_pkt_info;
2014 	u32 max_pkt_ack;
2015 	u32 msg_tol;
2016 	struct tipc_msg *msg = buf_msg(buf);
2017 
2018 	if (link_blocked(l_ptr))
2019 		goto exit;
2020 
2021 	/* record unnumbered packet arrival (force mismatch on next timeout) */
2022 
2023 	l_ptr->checkpoint--;
2024 
2025 	if (l_ptr->b_ptr->net_plane != msg_net_plane(msg))
2026 		if (tipc_own_addr > msg_prevnode(msg))
2027 			l_ptr->b_ptr->net_plane = msg_net_plane(msg);
2028 
2029 	l_ptr->owner->permit_changeover = msg_redundant_link(msg);
2030 
2031 	switch (msg_type(msg)) {
2032 
2033 	case RESET_MSG:
2034 		if (!link_working_unknown(l_ptr) &&
2035 		    (l_ptr->peer_session != INVALID_SESSION)) {
2036 			if (msg_session(msg) == l_ptr->peer_session)
2037 				break; /* duplicate: ignore */
2038 		}
2039 		/* fall thru' */
2040 	case ACTIVATE_MSG:
2041 		/* Update link settings according other endpoint's values */
2042 
2043 		strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg));
2044 
2045 		msg_tol = msg_link_tolerance(msg);
2046 		if (msg_tol > l_ptr->tolerance)
2047 			link_set_supervision_props(l_ptr, msg_tol);
2048 
2049 		if (msg_linkprio(msg) > l_ptr->priority)
2050 			l_ptr->priority = msg_linkprio(msg);
2051 
2052 		max_pkt_info = msg_max_pkt(msg);
2053 		if (max_pkt_info) {
2054 			if (max_pkt_info < l_ptr->max_pkt_target)
2055 				l_ptr->max_pkt_target = max_pkt_info;
2056 			if (l_ptr->max_pkt > l_ptr->max_pkt_target)
2057 				l_ptr->max_pkt = l_ptr->max_pkt_target;
2058 		} else {
2059 			l_ptr->max_pkt = l_ptr->max_pkt_target;
2060 		}
2061 		l_ptr->owner->bclink.supported = (max_pkt_info != 0);
2062 
2063 		link_state_event(l_ptr, msg_type(msg));
2064 
2065 		l_ptr->peer_session = msg_session(msg);
2066 		l_ptr->peer_bearer_id = msg_bearer_id(msg);
2067 
2068 		/* Synchronize broadcast sequence numbers */
2069 		if (!tipc_node_has_redundant_links(l_ptr->owner))
2070 			l_ptr->owner->bclink.last_in = mod(msg_last_bcast(msg));
2071 		break;
2072 	case STATE_MSG:
2073 
2074 		msg_tol = msg_link_tolerance(msg);
2075 		if (msg_tol)
2076 			link_set_supervision_props(l_ptr, msg_tol);
2077 
2078 		if (msg_linkprio(msg) &&
2079 		    (msg_linkprio(msg) != l_ptr->priority)) {
2080 			warn("Resetting link <%s>, priority change %u->%u\n",
2081 			     l_ptr->name, l_ptr->priority, msg_linkprio(msg));
2082 			l_ptr->priority = msg_linkprio(msg);
2083 			tipc_link_reset(l_ptr); /* Enforce change to take effect */
2084 			break;
2085 		}
2086 		link_state_event(l_ptr, TRAFFIC_MSG_EVT);
2087 		l_ptr->stats.recv_states++;
2088 		if (link_reset_unknown(l_ptr))
2089 			break;
2090 
2091 		if (less_eq(mod(l_ptr->next_in_no), msg_next_sent(msg))) {
2092 			rec_gap = mod(msg_next_sent(msg) -
2093 				      mod(l_ptr->next_in_no));
2094 		}
2095 
2096 		max_pkt_ack = msg_max_pkt(msg);
2097 		if (max_pkt_ack > l_ptr->max_pkt) {
2098 			l_ptr->max_pkt = max_pkt_ack;
2099 			l_ptr->max_pkt_probes = 0;
2100 		}
2101 
2102 		max_pkt_ack = 0;
2103 		if (msg_probe(msg)) {
2104 			l_ptr->stats.recv_probes++;
2105 			if (msg_size(msg) > sizeof(l_ptr->proto_msg))
2106 				max_pkt_ack = msg_size(msg);
2107 		}
2108 
2109 		/* Protocol message before retransmits, reduce loss risk */
2110 
2111 		tipc_bclink_check_gap(l_ptr->owner, msg_last_bcast(msg));
2112 
2113 		if (rec_gap || (msg_probe(msg))) {
2114 			tipc_link_send_proto_msg(l_ptr, STATE_MSG,
2115 						 0, rec_gap, 0, 0, max_pkt_ack);
2116 		}
2117 		if (msg_seq_gap(msg)) {
2118 			l_ptr->stats.recv_nacks++;
2119 			tipc_link_retransmit(l_ptr, l_ptr->first_out,
2120 					     msg_seq_gap(msg));
2121 		}
2122 		break;
2123 	}
2124 exit:
2125 	buf_discard(buf);
2126 }
2127 
2128 
2129 /*
2130  * tipc_link_tunnel(): Send one message via a link belonging to
2131  * another bearer. Owner node is locked.
2132  */
2133 static void tipc_link_tunnel(struct link *l_ptr,
2134 			     struct tipc_msg *tunnel_hdr,
2135 			     struct tipc_msg  *msg,
2136 			     u32 selector)
2137 {
2138 	struct link *tunnel;
2139 	struct sk_buff *buf;
2140 	u32 length = msg_size(msg);
2141 
2142 	tunnel = l_ptr->owner->active_links[selector & 1];
2143 	if (!tipc_link_is_up(tunnel)) {
2144 		warn("Link changeover error, "
2145 		     "tunnel link no longer available\n");
2146 		return;
2147 	}
2148 	msg_set_size(tunnel_hdr, length + INT_H_SIZE);
2149 	buf = tipc_buf_acquire(length + INT_H_SIZE);
2150 	if (!buf) {
2151 		warn("Link changeover error, "
2152 		     "unable to send tunnel msg\n");
2153 		return;
2154 	}
2155 	skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE);
2156 	skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length);
2157 	tipc_link_send_buf(tunnel, buf);
2158 }
2159 
2160 
2161 
2162 /*
2163  * changeover(): Send whole message queue via the remaining link
2164  *               Owner node is locked.
2165  */
2166 
2167 void tipc_link_changeover(struct link *l_ptr)
2168 {
2169 	u32 msgcount = l_ptr->out_queue_size;
2170 	struct sk_buff *crs = l_ptr->first_out;
2171 	struct link *tunnel = l_ptr->owner->active_links[0];
2172 	struct tipc_msg tunnel_hdr;
2173 	int split_bundles;
2174 
2175 	if (!tunnel)
2176 		return;
2177 
2178 	if (!l_ptr->owner->permit_changeover) {
2179 		warn("Link changeover error, "
2180 		     "peer did not permit changeover\n");
2181 		return;
2182 	}
2183 
2184 	tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
2185 		 ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
2186 	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
2187 	msg_set_msgcnt(&tunnel_hdr, msgcount);
2188 
2189 	if (!l_ptr->first_out) {
2190 		struct sk_buff *buf;
2191 
2192 		buf = tipc_buf_acquire(INT_H_SIZE);
2193 		if (buf) {
2194 			skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE);
2195 			msg_set_size(&tunnel_hdr, INT_H_SIZE);
2196 			tipc_link_send_buf(tunnel, buf);
2197 		} else {
2198 			warn("Link changeover error, "
2199 			     "unable to send changeover msg\n");
2200 		}
2201 		return;
2202 	}
2203 
2204 	split_bundles = (l_ptr->owner->active_links[0] !=
2205 			 l_ptr->owner->active_links[1]);
2206 
2207 	while (crs) {
2208 		struct tipc_msg *msg = buf_msg(crs);
2209 
2210 		if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
2211 			struct tipc_msg *m = msg_get_wrapped(msg);
2212 			unchar *pos = (unchar *)m;
2213 
2214 			msgcount = msg_msgcnt(msg);
2215 			while (msgcount--) {
2216 				msg_set_seqno(m, msg_seqno(msg));
2217 				tipc_link_tunnel(l_ptr, &tunnel_hdr, m,
2218 						 msg_link_selector(m));
2219 				pos += align(msg_size(m));
2220 				m = (struct tipc_msg *)pos;
2221 			}
2222 		} else {
2223 			tipc_link_tunnel(l_ptr, &tunnel_hdr, msg,
2224 					 msg_link_selector(msg));
2225 		}
2226 		crs = crs->next;
2227 	}
2228 }
2229 
2230 void tipc_link_send_duplicate(struct link *l_ptr, struct link *tunnel)
2231 {
2232 	struct sk_buff *iter;
2233 	struct tipc_msg tunnel_hdr;
2234 
2235 	tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
2236 		 DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr);
2237 	msg_set_msgcnt(&tunnel_hdr, l_ptr->out_queue_size);
2238 	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
2239 	iter = l_ptr->first_out;
2240 	while (iter) {
2241 		struct sk_buff *outbuf;
2242 		struct tipc_msg *msg = buf_msg(iter);
2243 		u32 length = msg_size(msg);
2244 
2245 		if (msg_user(msg) == MSG_BUNDLER)
2246 			msg_set_type(msg, CLOSED_MSG);
2247 		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));	/* Update */
2248 		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
2249 		msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
2250 		outbuf = tipc_buf_acquire(length + INT_H_SIZE);
2251 		if (outbuf == NULL) {
2252 			warn("Link changeover error, "
2253 			     "unable to send duplicate msg\n");
2254 			return;
2255 		}
2256 		skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE);
2257 		skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data,
2258 					       length);
2259 		tipc_link_send_buf(tunnel, outbuf);
2260 		if (!tipc_link_is_up(l_ptr))
2261 			return;
2262 		iter = iter->next;
2263 	}
2264 }
2265 
2266 
2267 
2268 /**
2269  * buf_extract - extracts embedded TIPC message from another message
2270  * @skb: encapsulating message buffer
2271  * @from_pos: offset to extract from
2272  *
2273  * Returns a new message buffer containing an embedded message.  The
2274  * encapsulating message itself is left unchanged.
2275  */
2276 
2277 static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
2278 {
2279 	struct tipc_msg *msg = (struct tipc_msg *)(skb->data + from_pos);
2280 	u32 size = msg_size(msg);
2281 	struct sk_buff *eb;
2282 
2283 	eb = tipc_buf_acquire(size);
2284 	if (eb)
2285 		skb_copy_to_linear_data(eb, msg, size);
2286 	return eb;
2287 }
2288 
2289 /*
2290  *  link_recv_changeover_msg(): Receive tunneled packet sent
2291  *  via other link. Node is locked. Return extracted buffer.
2292  */
2293 
2294 static int link_recv_changeover_msg(struct link **l_ptr,
2295 				    struct sk_buff **buf)
2296 {
2297 	struct sk_buff *tunnel_buf = *buf;
2298 	struct link *dest_link;
2299 	struct tipc_msg *msg;
2300 	struct tipc_msg *tunnel_msg = buf_msg(tunnel_buf);
2301 	u32 msg_typ = msg_type(tunnel_msg);
2302 	u32 msg_count = msg_msgcnt(tunnel_msg);
2303 
2304 	dest_link = (*l_ptr)->owner->links[msg_bearer_id(tunnel_msg)];
2305 	if (!dest_link)
2306 		goto exit;
2307 	if (dest_link == *l_ptr) {
2308 		err("Unexpected changeover message on link <%s>\n",
2309 		    (*l_ptr)->name);
2310 		goto exit;
2311 	}
2312 	*l_ptr = dest_link;
2313 	msg = msg_get_wrapped(tunnel_msg);
2314 
2315 	if (msg_typ == DUPLICATE_MSG) {
2316 		if (less(msg_seqno(msg), mod(dest_link->next_in_no)))
2317 			goto exit;
2318 		*buf = buf_extract(tunnel_buf, INT_H_SIZE);
2319 		if (*buf == NULL) {
2320 			warn("Link changeover error, duplicate msg dropped\n");
2321 			goto exit;
2322 		}
2323 		buf_discard(tunnel_buf);
2324 		return 1;
2325 	}
2326 
2327 	/* First original message ?: */
2328 
2329 	if (tipc_link_is_up(dest_link)) {
2330 		info("Resetting link <%s>, changeover initiated by peer\n",
2331 		     dest_link->name);
2332 		tipc_link_reset(dest_link);
2333 		dest_link->exp_msg_count = msg_count;
2334 		if (!msg_count)
2335 			goto exit;
2336 	} else if (dest_link->exp_msg_count == START_CHANGEOVER) {
2337 		dest_link->exp_msg_count = msg_count;
2338 		if (!msg_count)
2339 			goto exit;
2340 	}
2341 
2342 	/* Receive original message */
2343 
2344 	if (dest_link->exp_msg_count == 0) {
2345 		warn("Link switchover error, "
2346 		     "got too many tunnelled messages\n");
2347 		goto exit;
2348 	}
2349 	dest_link->exp_msg_count--;
2350 	if (less(msg_seqno(msg), dest_link->reset_checkpoint)) {
2351 		goto exit;
2352 	} else {
2353 		*buf = buf_extract(tunnel_buf, INT_H_SIZE);
2354 		if (*buf != NULL) {
2355 			buf_discard(tunnel_buf);
2356 			return 1;
2357 		} else {
2358 			warn("Link changeover error, original msg dropped\n");
2359 		}
2360 	}
2361 exit:
2362 	*buf = NULL;
2363 	buf_discard(tunnel_buf);
2364 	return 0;
2365 }
2366 
2367 /*
2368  *  Bundler functionality:
2369  */
2370 void tipc_link_recv_bundle(struct sk_buff *buf)
2371 {
2372 	u32 msgcount = msg_msgcnt(buf_msg(buf));
2373 	u32 pos = INT_H_SIZE;
2374 	struct sk_buff *obuf;
2375 
2376 	while (msgcount--) {
2377 		obuf = buf_extract(buf, pos);
2378 		if (obuf == NULL) {
2379 			warn("Link unable to unbundle message(s)\n");
2380 			break;
2381 		}
2382 		pos += align(msg_size(buf_msg(obuf)));
2383 		tipc_net_route_msg(obuf);
2384 	}
2385 	buf_discard(buf);
2386 }
2387 
2388 /*
2389  *  Fragmentation/defragmentation:
2390  */
2391 
2392 
2393 /*
2394  * link_send_long_buf: Entry for buffers needing fragmentation.
2395  * The buffer is complete, inclusive total message length.
2396  * Returns user data length.
2397  */
2398 static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf)
2399 {
2400 	struct tipc_msg *inmsg = buf_msg(buf);
2401 	struct tipc_msg fragm_hdr;
2402 	u32 insize = msg_size(inmsg);
2403 	u32 dsz = msg_data_sz(inmsg);
2404 	unchar *crs = buf->data;
2405 	u32 rest = insize;
2406 	u32 pack_sz = l_ptr->max_pkt;
2407 	u32 fragm_sz = pack_sz - INT_H_SIZE;
2408 	u32 fragm_no = 1;
2409 	u32 destaddr;
2410 
2411 	if (msg_short(inmsg))
2412 		destaddr = l_ptr->addr;
2413 	else
2414 		destaddr = msg_destnode(inmsg);
2415 
2416 	if (msg_routed(inmsg))
2417 		msg_set_prevnode(inmsg, tipc_own_addr);
2418 
2419 	/* Prepare reusable fragment header: */
2420 
2421 	tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
2422 		 INT_H_SIZE, destaddr);
2423 	msg_set_link_selector(&fragm_hdr, msg_link_selector(inmsg));
2424 	msg_set_long_msgno(&fragm_hdr, mod(l_ptr->long_msg_seq_no++));
2425 	msg_set_fragm_no(&fragm_hdr, fragm_no);
2426 	l_ptr->stats.sent_fragmented++;
2427 
2428 	/* Chop up message: */
2429 
2430 	while (rest > 0) {
2431 		struct sk_buff *fragm;
2432 
2433 		if (rest <= fragm_sz) {
2434 			fragm_sz = rest;
2435 			msg_set_type(&fragm_hdr, LAST_FRAGMENT);
2436 		}
2437 		fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
2438 		if (fragm == NULL) {
2439 			warn("Link unable to fragment message\n");
2440 			dsz = -ENOMEM;
2441 			goto exit;
2442 		}
2443 		msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
2444 		skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE);
2445 		skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs,
2446 					       fragm_sz);
2447 		/*  Send queued messages first, if any: */
2448 
2449 		l_ptr->stats.sent_fragments++;
2450 		tipc_link_send_buf(l_ptr, fragm);
2451 		if (!tipc_link_is_up(l_ptr))
2452 			return dsz;
2453 		msg_set_fragm_no(&fragm_hdr, ++fragm_no);
2454 		rest -= fragm_sz;
2455 		crs += fragm_sz;
2456 		msg_set_type(&fragm_hdr, FRAGMENT);
2457 	}
2458 exit:
2459 	buf_discard(buf);
2460 	return dsz;
2461 }
2462 
2463 /*
2464  * A pending message being re-assembled must store certain values
2465  * to handle subsequent fragments correctly. The following functions
2466  * help storing these values in unused, available fields in the
2467  * pending message. This makes dynamic memory allocation unecessary.
2468  */
2469 
2470 static void set_long_msg_seqno(struct sk_buff *buf, u32 seqno)
2471 {
2472 	msg_set_seqno(buf_msg(buf), seqno);
2473 }
2474 
2475 static u32 get_fragm_size(struct sk_buff *buf)
2476 {
2477 	return msg_ack(buf_msg(buf));
2478 }
2479 
2480 static void set_fragm_size(struct sk_buff *buf, u32 sz)
2481 {
2482 	msg_set_ack(buf_msg(buf), sz);
2483 }
2484 
2485 static u32 get_expected_frags(struct sk_buff *buf)
2486 {
2487 	return msg_bcast_ack(buf_msg(buf));
2488 }
2489 
2490 static void set_expected_frags(struct sk_buff *buf, u32 exp)
2491 {
2492 	msg_set_bcast_ack(buf_msg(buf), exp);
2493 }
2494 
2495 static u32 get_timer_cnt(struct sk_buff *buf)
2496 {
2497 	return msg_reroute_cnt(buf_msg(buf));
2498 }
2499 
2500 static void incr_timer_cnt(struct sk_buff *buf)
2501 {
2502 	msg_incr_reroute_cnt(buf_msg(buf));
2503 }
2504 
2505 /*
2506  * tipc_link_recv_fragment(): Called with node lock on. Returns
2507  * the reassembled buffer if message is complete.
2508  */
2509 int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
2510 			    struct tipc_msg **m)
2511 {
2512 	struct sk_buff *prev = NULL;
2513 	struct sk_buff *fbuf = *fb;
2514 	struct tipc_msg *fragm = buf_msg(fbuf);
2515 	struct sk_buff *pbuf = *pending;
2516 	u32 long_msg_seq_no = msg_long_msgno(fragm);
2517 
2518 	*fb = NULL;
2519 
2520 	/* Is there an incomplete message waiting for this fragment? */
2521 
2522 	while (pbuf && ((msg_seqno(buf_msg(pbuf)) != long_msg_seq_no) ||
2523 			(msg_orignode(fragm) != msg_orignode(buf_msg(pbuf))))) {
2524 		prev = pbuf;
2525 		pbuf = pbuf->next;
2526 	}
2527 
2528 	if (!pbuf && (msg_type(fragm) == FIRST_FRAGMENT)) {
2529 		struct tipc_msg *imsg = (struct tipc_msg *)msg_data(fragm);
2530 		u32 msg_sz = msg_size(imsg);
2531 		u32 fragm_sz = msg_data_sz(fragm);
2532 		u32 exp_fragm_cnt = msg_sz/fragm_sz + !!(msg_sz % fragm_sz);
2533 		u32 max =  TIPC_MAX_USER_MSG_SIZE + LONG_H_SIZE;
2534 		if (msg_type(imsg) == TIPC_MCAST_MSG)
2535 			max = TIPC_MAX_USER_MSG_SIZE + MCAST_H_SIZE;
2536 		if (msg_size(imsg) > max) {
2537 			buf_discard(fbuf);
2538 			return 0;
2539 		}
2540 		pbuf = tipc_buf_acquire(msg_size(imsg));
2541 		if (pbuf != NULL) {
2542 			pbuf->next = *pending;
2543 			*pending = pbuf;
2544 			skb_copy_to_linear_data(pbuf, imsg,
2545 						msg_data_sz(fragm));
2546 			/*  Prepare buffer for subsequent fragments. */
2547 
2548 			set_long_msg_seqno(pbuf, long_msg_seq_no);
2549 			set_fragm_size(pbuf, fragm_sz);
2550 			set_expected_frags(pbuf, exp_fragm_cnt - 1);
2551 		} else {
2552 			warn("Link unable to reassemble fragmented message\n");
2553 		}
2554 		buf_discard(fbuf);
2555 		return 0;
2556 	} else if (pbuf && (msg_type(fragm) != FIRST_FRAGMENT)) {
2557 		u32 dsz = msg_data_sz(fragm);
2558 		u32 fsz = get_fragm_size(pbuf);
2559 		u32 crs = ((msg_fragm_no(fragm) - 1) * fsz);
2560 		u32 exp_frags = get_expected_frags(pbuf) - 1;
2561 		skb_copy_to_linear_data_offset(pbuf, crs,
2562 					       msg_data(fragm), dsz);
2563 		buf_discard(fbuf);
2564 
2565 		/* Is message complete? */
2566 
2567 		if (exp_frags == 0) {
2568 			if (prev)
2569 				prev->next = pbuf->next;
2570 			else
2571 				*pending = pbuf->next;
2572 			msg_reset_reroute_cnt(buf_msg(pbuf));
2573 			*fb = pbuf;
2574 			*m = buf_msg(pbuf);
2575 			return 1;
2576 		}
2577 		set_expected_frags(pbuf, exp_frags);
2578 		return 0;
2579 	}
2580 	buf_discard(fbuf);
2581 	return 0;
2582 }
2583 
2584 /**
2585  * link_check_defragm_bufs - flush stale incoming message fragments
2586  * @l_ptr: pointer to link
2587  */
2588 
2589 static void link_check_defragm_bufs(struct link *l_ptr)
2590 {
2591 	struct sk_buff *prev = NULL;
2592 	struct sk_buff *next = NULL;
2593 	struct sk_buff *buf = l_ptr->defragm_buf;
2594 
2595 	if (!buf)
2596 		return;
2597 	if (!link_working_working(l_ptr))
2598 		return;
2599 	while (buf) {
2600 		u32 cnt = get_timer_cnt(buf);
2601 
2602 		next = buf->next;
2603 		if (cnt < 4) {
2604 			incr_timer_cnt(buf);
2605 			prev = buf;
2606 		} else {
2607 			if (prev)
2608 				prev->next = buf->next;
2609 			else
2610 				l_ptr->defragm_buf = buf->next;
2611 			buf_discard(buf);
2612 		}
2613 		buf = next;
2614 	}
2615 }
2616 
2617 
2618 
2619 static void link_set_supervision_props(struct link *l_ptr, u32 tolerance)
2620 {
2621 	l_ptr->tolerance = tolerance;
2622 	l_ptr->continuity_interval =
2623 		((tolerance / 4) > 500) ? 500 : tolerance / 4;
2624 	l_ptr->abort_limit = tolerance / (l_ptr->continuity_interval / 4);
2625 }
2626 
2627 
2628 void tipc_link_set_queue_limits(struct link *l_ptr, u32 window)
2629 {
2630 	/* Data messages from this node, inclusive FIRST_FRAGM */
2631 	l_ptr->queue_limit[TIPC_LOW_IMPORTANCE] = window;
2632 	l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE] = (window / 3) * 4;
2633 	l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE] = (window / 3) * 5;
2634 	l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE] = (window / 3) * 6;
2635 	/* Transiting data messages,inclusive FIRST_FRAGM */
2636 	l_ptr->queue_limit[TIPC_LOW_IMPORTANCE + 4] = 300;
2637 	l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE + 4] = 600;
2638 	l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE + 4] = 900;
2639 	l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE + 4] = 1200;
2640 	l_ptr->queue_limit[CONN_MANAGER] = 1200;
2641 	l_ptr->queue_limit[CHANGEOVER_PROTOCOL] = 2500;
2642 	l_ptr->queue_limit[NAME_DISTRIBUTOR] = 3000;
2643 	/* FRAGMENT and LAST_FRAGMENT packets */
2644 	l_ptr->queue_limit[MSG_FRAGMENTER] = 4000;
2645 }
2646 
2647 /**
2648  * link_find_link - locate link by name
2649  * @name - ptr to link name string
2650  * @node - ptr to area to be filled with ptr to associated node
2651  *
2652  * Caller must hold 'tipc_net_lock' to ensure node and bearer are not deleted;
2653  * this also prevents link deletion.
2654  *
2655  * Returns pointer to link (or 0 if invalid link name).
2656  */
2657 
2658 static struct link *link_find_link(const char *name, struct tipc_node **node)
2659 {
2660 	struct link_name link_name_parts;
2661 	struct bearer *b_ptr;
2662 	struct link *l_ptr;
2663 
2664 	if (!link_name_validate(name, &link_name_parts))
2665 		return NULL;
2666 
2667 	b_ptr = tipc_bearer_find_interface(link_name_parts.if_local);
2668 	if (!b_ptr)
2669 		return NULL;
2670 
2671 	*node = tipc_node_find(link_name_parts.addr_peer);
2672 	if (!*node)
2673 		return NULL;
2674 
2675 	l_ptr = (*node)->links[b_ptr->identity];
2676 	if (!l_ptr || strcmp(l_ptr->name, name))
2677 		return NULL;
2678 
2679 	return l_ptr;
2680 }
2681 
2682 struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, int req_tlv_space,
2683 				     u16 cmd)
2684 {
2685 	struct tipc_link_config *args;
2686 	u32 new_value;
2687 	struct link *l_ptr;
2688 	struct tipc_node *node;
2689 	int res;
2690 
2691 	if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_CONFIG))
2692 		return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2693 
2694 	args = (struct tipc_link_config *)TLV_DATA(req_tlv_area);
2695 	new_value = ntohl(args->value);
2696 
2697 	if (!strcmp(args->name, tipc_bclink_name)) {
2698 		if ((cmd == TIPC_CMD_SET_LINK_WINDOW) &&
2699 		    (tipc_bclink_set_queue_limits(new_value) == 0))
2700 			return tipc_cfg_reply_none();
2701 		return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
2702 						   " (cannot change setting on broadcast link)");
2703 	}
2704 
2705 	read_lock_bh(&tipc_net_lock);
2706 	l_ptr = link_find_link(args->name, &node);
2707 	if (!l_ptr) {
2708 		read_unlock_bh(&tipc_net_lock);
2709 		return tipc_cfg_reply_error_string("link not found");
2710 	}
2711 
2712 	tipc_node_lock(node);
2713 	res = -EINVAL;
2714 	switch (cmd) {
2715 	case TIPC_CMD_SET_LINK_TOL:
2716 		if ((new_value >= TIPC_MIN_LINK_TOL) &&
2717 		    (new_value <= TIPC_MAX_LINK_TOL)) {
2718 			link_set_supervision_props(l_ptr, new_value);
2719 			tipc_link_send_proto_msg(l_ptr, STATE_MSG,
2720 						 0, 0, new_value, 0, 0);
2721 			res = 0;
2722 		}
2723 		break;
2724 	case TIPC_CMD_SET_LINK_PRI:
2725 		if ((new_value >= TIPC_MIN_LINK_PRI) &&
2726 		    (new_value <= TIPC_MAX_LINK_PRI)) {
2727 			l_ptr->priority = new_value;
2728 			tipc_link_send_proto_msg(l_ptr, STATE_MSG,
2729 						 0, 0, 0, new_value, 0);
2730 			res = 0;
2731 		}
2732 		break;
2733 	case TIPC_CMD_SET_LINK_WINDOW:
2734 		if ((new_value >= TIPC_MIN_LINK_WIN) &&
2735 		    (new_value <= TIPC_MAX_LINK_WIN)) {
2736 			tipc_link_set_queue_limits(l_ptr, new_value);
2737 			res = 0;
2738 		}
2739 		break;
2740 	}
2741 	tipc_node_unlock(node);
2742 
2743 	read_unlock_bh(&tipc_net_lock);
2744 	if (res)
2745 		return tipc_cfg_reply_error_string("cannot change link setting");
2746 
2747 	return tipc_cfg_reply_none();
2748 }
2749 
2750 /**
2751  * link_reset_statistics - reset link statistics
2752  * @l_ptr: pointer to link
2753  */
2754 
2755 static void link_reset_statistics(struct link *l_ptr)
2756 {
2757 	memset(&l_ptr->stats, 0, sizeof(l_ptr->stats));
2758 	l_ptr->stats.sent_info = l_ptr->next_out_no;
2759 	l_ptr->stats.recv_info = l_ptr->next_in_no;
2760 }
2761 
2762 struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_space)
2763 {
2764 	char *link_name;
2765 	struct link *l_ptr;
2766 	struct tipc_node *node;
2767 
2768 	if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
2769 		return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2770 
2771 	link_name = (char *)TLV_DATA(req_tlv_area);
2772 	if (!strcmp(link_name, tipc_bclink_name)) {
2773 		if (tipc_bclink_reset_stats())
2774 			return tipc_cfg_reply_error_string("link not found");
2775 		return tipc_cfg_reply_none();
2776 	}
2777 
2778 	read_lock_bh(&tipc_net_lock);
2779 	l_ptr = link_find_link(link_name, &node);
2780 	if (!l_ptr) {
2781 		read_unlock_bh(&tipc_net_lock);
2782 		return tipc_cfg_reply_error_string("link not found");
2783 	}
2784 
2785 	tipc_node_lock(node);
2786 	link_reset_statistics(l_ptr);
2787 	tipc_node_unlock(node);
2788 	read_unlock_bh(&tipc_net_lock);
2789 	return tipc_cfg_reply_none();
2790 }
2791 
2792 /**
2793  * percent - convert count to a percentage of total (rounding up or down)
2794  */
2795 
2796 static u32 percent(u32 count, u32 total)
2797 {
2798 	return (count * 100 + (total / 2)) / total;
2799 }
2800 
2801 /**
2802  * tipc_link_stats - print link statistics
2803  * @name: link name
2804  * @buf: print buffer area
2805  * @buf_size: size of print buffer area
2806  *
2807  * Returns length of print buffer data string (or 0 if error)
2808  */
2809 
2810 static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)
2811 {
2812 	struct print_buf pb;
2813 	struct link *l_ptr;
2814 	struct tipc_node *node;
2815 	char *status;
2816 	u32 profile_total = 0;
2817 
2818 	if (!strcmp(name, tipc_bclink_name))
2819 		return tipc_bclink_stats(buf, buf_size);
2820 
2821 	tipc_printbuf_init(&pb, buf, buf_size);
2822 
2823 	read_lock_bh(&tipc_net_lock);
2824 	l_ptr = link_find_link(name, &node);
2825 	if (!l_ptr) {
2826 		read_unlock_bh(&tipc_net_lock);
2827 		return 0;
2828 	}
2829 	tipc_node_lock(node);
2830 
2831 	if (tipc_link_is_active(l_ptr))
2832 		status = "ACTIVE";
2833 	else if (tipc_link_is_up(l_ptr))
2834 		status = "STANDBY";
2835 	else
2836 		status = "DEFUNCT";
2837 	tipc_printf(&pb, "Link <%s>\n"
2838 			 "  %s  MTU:%u  Priority:%u  Tolerance:%u ms"
2839 			 "  Window:%u packets\n",
2840 		    l_ptr->name, status, l_ptr->max_pkt,
2841 		    l_ptr->priority, l_ptr->tolerance, l_ptr->queue_limit[0]);
2842 	tipc_printf(&pb, "  RX packets:%u fragments:%u/%u bundles:%u/%u\n",
2843 		    l_ptr->next_in_no - l_ptr->stats.recv_info,
2844 		    l_ptr->stats.recv_fragments,
2845 		    l_ptr->stats.recv_fragmented,
2846 		    l_ptr->stats.recv_bundles,
2847 		    l_ptr->stats.recv_bundled);
2848 	tipc_printf(&pb, "  TX packets:%u fragments:%u/%u bundles:%u/%u\n",
2849 		    l_ptr->next_out_no - l_ptr->stats.sent_info,
2850 		    l_ptr->stats.sent_fragments,
2851 		    l_ptr->stats.sent_fragmented,
2852 		    l_ptr->stats.sent_bundles,
2853 		    l_ptr->stats.sent_bundled);
2854 	profile_total = l_ptr->stats.msg_length_counts;
2855 	if (!profile_total)
2856 		profile_total = 1;
2857 	tipc_printf(&pb, "  TX profile sample:%u packets  average:%u octets\n"
2858 			 "  0-64:%u%% -256:%u%% -1024:%u%% -4096:%u%% "
2859 			 "-16354:%u%% -32768:%u%% -66000:%u%%\n",
2860 		    l_ptr->stats.msg_length_counts,
2861 		    l_ptr->stats.msg_lengths_total / profile_total,
2862 		    percent(l_ptr->stats.msg_length_profile[0], profile_total),
2863 		    percent(l_ptr->stats.msg_length_profile[1], profile_total),
2864 		    percent(l_ptr->stats.msg_length_profile[2], profile_total),
2865 		    percent(l_ptr->stats.msg_length_profile[3], profile_total),
2866 		    percent(l_ptr->stats.msg_length_profile[4], profile_total),
2867 		    percent(l_ptr->stats.msg_length_profile[5], profile_total),
2868 		    percent(l_ptr->stats.msg_length_profile[6], profile_total));
2869 	tipc_printf(&pb, "  RX states:%u probes:%u naks:%u defs:%u dups:%u\n",
2870 		    l_ptr->stats.recv_states,
2871 		    l_ptr->stats.recv_probes,
2872 		    l_ptr->stats.recv_nacks,
2873 		    l_ptr->stats.deferred_recv,
2874 		    l_ptr->stats.duplicates);
2875 	tipc_printf(&pb, "  TX states:%u probes:%u naks:%u acks:%u dups:%u\n",
2876 		    l_ptr->stats.sent_states,
2877 		    l_ptr->stats.sent_probes,
2878 		    l_ptr->stats.sent_nacks,
2879 		    l_ptr->stats.sent_acks,
2880 		    l_ptr->stats.retransmitted);
2881 	tipc_printf(&pb, "  Congestion bearer:%u link:%u  Send queue max:%u avg:%u\n",
2882 		    l_ptr->stats.bearer_congs,
2883 		    l_ptr->stats.link_congs,
2884 		    l_ptr->stats.max_queue_sz,
2885 		    l_ptr->stats.queue_sz_counts
2886 		    ? (l_ptr->stats.accu_queue_sz / l_ptr->stats.queue_sz_counts)
2887 		    : 0);
2888 
2889 	tipc_node_unlock(node);
2890 	read_unlock_bh(&tipc_net_lock);
2891 	return tipc_printbuf_validate(&pb);
2892 }
2893 
2894 #define MAX_LINK_STATS_INFO 2000
2895 
2896 struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, int req_tlv_space)
2897 {
2898 	struct sk_buff *buf;
2899 	struct tlv_desc *rep_tlv;
2900 	int str_len;
2901 
2902 	if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
2903 		return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2904 
2905 	buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_LINK_STATS_INFO));
2906 	if (!buf)
2907 		return NULL;
2908 
2909 	rep_tlv = (struct tlv_desc *)buf->data;
2910 
2911 	str_len = tipc_link_stats((char *)TLV_DATA(req_tlv_area),
2912 				  (char *)TLV_DATA(rep_tlv), MAX_LINK_STATS_INFO);
2913 	if (!str_len) {
2914 		buf_discard(buf);
2915 		return tipc_cfg_reply_error_string("link not found");
2916 	}
2917 
2918 	skb_put(buf, TLV_SPACE(str_len));
2919 	TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
2920 
2921 	return buf;
2922 }
2923 
2924 /**
2925  * tipc_link_get_max_pkt - get maximum packet size to use when sending to destination
2926  * @dest: network address of destination node
2927  * @selector: used to select from set of active links
2928  *
2929  * If no active link can be found, uses default maximum packet size.
2930  */
2931 
2932 u32 tipc_link_get_max_pkt(u32 dest, u32 selector)
2933 {
2934 	struct tipc_node *n_ptr;
2935 	struct link *l_ptr;
2936 	u32 res = MAX_PKT_DEFAULT;
2937 
2938 	if (dest == tipc_own_addr)
2939 		return MAX_MSG_SIZE;
2940 
2941 	read_lock_bh(&tipc_net_lock);
2942 	n_ptr = tipc_node_find(dest);
2943 	if (n_ptr) {
2944 		tipc_node_lock(n_ptr);
2945 		l_ptr = n_ptr->active_links[selector & 1];
2946 		if (l_ptr)
2947 			res = l_ptr->max_pkt;
2948 		tipc_node_unlock(n_ptr);
2949 	}
2950 	read_unlock_bh(&tipc_net_lock);
2951 	return res;
2952 }
2953 
2954 static void link_print(struct link *l_ptr, const char *str)
2955 {
2956 	char print_area[256];
2957 	struct print_buf pb;
2958 	struct print_buf *buf = &pb;
2959 
2960 	tipc_printbuf_init(buf, print_area, sizeof(print_area));
2961 
2962 	tipc_printf(buf, str);
2963 	tipc_printf(buf, "Link %x<%s>:",
2964 		    l_ptr->addr, l_ptr->b_ptr->publ.name);
2965 
2966 #ifdef CONFIG_TIPC_DEBUG
2967 	if (link_reset_reset(l_ptr) || link_reset_unknown(l_ptr))
2968 		goto print_state;
2969 
2970 	tipc_printf(buf, ": NXO(%u):", mod(l_ptr->next_out_no));
2971 	tipc_printf(buf, "NXI(%u):", mod(l_ptr->next_in_no));
2972 	tipc_printf(buf, "SQUE");
2973 	if (l_ptr->first_out) {
2974 		tipc_printf(buf, "[%u..", msg_seqno(buf_msg(l_ptr->first_out)));
2975 		if (l_ptr->next_out)
2976 			tipc_printf(buf, "%u..",
2977 				    msg_seqno(buf_msg(l_ptr->next_out)));
2978 		tipc_printf(buf, "%u]", msg_seqno(buf_msg(l_ptr->last_out)));
2979 		if ((mod(msg_seqno(buf_msg(l_ptr->last_out)) -
2980 			 msg_seqno(buf_msg(l_ptr->first_out)))
2981 		     != (l_ptr->out_queue_size - 1)) ||
2982 		    (l_ptr->last_out->next != NULL)) {
2983 			tipc_printf(buf, "\nSend queue inconsistency\n");
2984 			tipc_printf(buf, "first_out= %x ", l_ptr->first_out);
2985 			tipc_printf(buf, "next_out= %x ", l_ptr->next_out);
2986 			tipc_printf(buf, "last_out= %x ", l_ptr->last_out);
2987 		}
2988 	} else
2989 		tipc_printf(buf, "[]");
2990 	tipc_printf(buf, "SQSIZ(%u)", l_ptr->out_queue_size);
2991 	if (l_ptr->oldest_deferred_in) {
2992 		u32 o = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
2993 		u32 n = msg_seqno(buf_msg(l_ptr->newest_deferred_in));
2994 		tipc_printf(buf, ":RQUE[%u..%u]", o, n);
2995 		if (l_ptr->deferred_inqueue_sz != mod((n + 1) - o)) {
2996 			tipc_printf(buf, ":RQSIZ(%u)",
2997 				    l_ptr->deferred_inqueue_sz);
2998 		}
2999 	}
3000 print_state:
3001 #endif
3002 
3003 	if (link_working_unknown(l_ptr))
3004 		tipc_printf(buf, ":WU");
3005 	else if (link_reset_reset(l_ptr))
3006 		tipc_printf(buf, ":RR");
3007 	else if (link_reset_unknown(l_ptr))
3008 		tipc_printf(buf, ":RU");
3009 	else if (link_working_working(l_ptr))
3010 		tipc_printf(buf, ":WW");
3011 	tipc_printf(buf, "\n");
3012 
3013 	tipc_printbuf_validate(buf);
3014 	info("%s", print_area);
3015 }
3016 
3017