xref: /openbmc/linux/net/tipc/link.c (revision 1fa6ac37)
1 /*
2  * net/tipc/link.c: TIPC link code
3  *
4  * Copyright (c) 1996-2007, Ericsson AB
5  * Copyright (c) 2004-2007, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36 
37 #include "core.h"
38 #include "dbg.h"
39 #include "link.h"
40 #include "net.h"
41 #include "node.h"
42 #include "port.h"
43 #include "addr.h"
44 #include "node_subscr.h"
45 #include "name_distr.h"
46 #include "bearer.h"
47 #include "name_table.h"
48 #include "discover.h"
49 #include "config.h"
50 #include "bcast.h"
51 
52 
53 /*
54  * Out-of-range value for link session numbers: this file masks every
    * session number it stores in a protocol message to 16 bits (0xffff),
    * so this value can never equal a real one.  tipc_link_reset() stores
    * it in 'peer_session'.
55  */
56 
57 #define INVALID_SESSION 0x10000
58 
59 /*
60  * Limit for deferred reception queue
61  */
62 
63 #define DEF_QUEUE_LIMIT 256u
64 
65 /*
66  * Link state events: internal events passed to link_state_event(), which
    * also accepts the link protocol message types RESET_MSG and ACTIVATE_MSG
    * as events.
67  */
68 
69 #define  STARTING_EVT    856384768	/* link processing trigger */
70 #define  TRAFFIC_MSG_EVT 560815u	/* traffic msg seen on link (presumably rx'd; TODO confirm) */
71 #define  TIMEOUT_EVT     560817u	/* link timer expired */
72 
73 /*
74  * The following two 'message types' are really just implementation
75  * data conveniently stored in the message header.
76  * They must not be considered part of the protocol.
    * (link_bundle_buf() only appends to a bundle whose type is OPEN_MSG.)
77  */
78 #define OPEN_MSG   0
79 #define CLOSED_MSG 1
80 
81 /*
82  * State value stored in 'exp_msg_count'; tipc_link_reset() sets it when an
    * active link goes down while its node still has active links and permits
    * changeover.
83  */
84 
85 #define START_CHANGEOVER 100000u
86 
87 /**
88  * struct link_name - deconstructed link name
89  * @addr_local: network address of node at this end
90  * @if_local: name of interface at this end
91  * @addr_peer: network address of node at far end
92  * @if_peer: name of interface at far end
    *
    * Filled in by link_name_validate() from a link name of the form
    * "<z>.<c>.<n>:<if_local>-<z>.<c>.<n>:<if_peer>".  Both interface names
    * are NUL-terminated and fit in TIPC_MAX_IF_NAME bytes, terminator
    * included (link_name_validate() rejects anything longer).
93  */
94 
95 struct link_name {
96 	u32 addr_local;
97 	char if_local[TIPC_MAX_IF_NAME];
98 	u32 addr_peer;
99 	char if_peer[TIPC_MAX_IF_NAME];
100 };
101 
102 #if 0
103 
104 /* LINK EVENT CODE IS NOT SUPPORTED AT PRESENT */
    /* (everything up to the matching #endif is compiled out) */
105 
106 /**
107  * struct link_event - link up/down event notification
     * @addr: network address copied from the link ('addr' field)
     * @up: up/down indication handed to the callback
     * @fcn: callback invoked with (@addr, @name, @up)
     * @name: copy of the link's name
108  */
109 
110 struct link_event {
111 	u32 addr;
112 	int up;
113 	void (*fcn)(u32, char *, int);
114 	char name[TIPC_MAX_LINK_NAME];
115 };
116 
117 #endif
118 
119 static void link_handle_out_of_seq_msg(struct link *l_ptr,
120 				       struct sk_buff *buf);
121 static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf);
122 static int  link_recv_changeover_msg(struct link **l_ptr, struct sk_buff **buf);
123 static void link_set_supervision_props(struct link *l_ptr, u32 tolerance);
124 static int  link_send_sections_long(struct port *sender,
125 				    struct iovec const *msg_sect,
126 				    u32 num_sect, u32 destnode);
127 static void link_check_defragm_bufs(struct link *l_ptr);
128 static void link_state_event(struct link *l_ptr, u32 event);
129 static void link_reset_statistics(struct link *l_ptr);
130 static void link_print(struct link *l_ptr, struct print_buf *buf,
131 		       const char *str);
132 
/*
 * Per-link debugging support (used by link routines only)
 *
 * On a system with several links the standard TIPC debugging routines
 * are of limited use, because the output of all links ends up intermixed.
 * The dbg_link_XXX() macros below instead capture debug info in the print
 * buffer belonging to one link; dbg_link_dump() then moves that buffer
 * into the TIPC system log.
 *
 * LINK_LOG_BUF_SIZE is the size of the print buffer given to each link.
 * A non-zero value enables per-link debugging; with a value of 0 every
 * dbg_link_XXX() macro below tests a constant-false condition and so
 * produces no output.
 *
 * Notes:
 * - When enabled, LINK_LOG_BUF_SIZE should be set to at least TIPC_PB_MIN_SIZE
 * - a valid variable named "l_ptr" must be in scope wherever these macros
 *   are used, since they reference it directly
 */

#define LINK_LOG_BUF_SIZE 0

/* formatted text -> link's print buffer */
#define dbg_link(fmt, ...) do { \
	if (LINK_LOG_BUF_SIZE) \
		tipc_printf(&l_ptr->print_buf, fmt, ##__VA_ARGS__); \
} while (0)

/* message header dump, prefixed by 'txt' -> link's print buffer */
#define dbg_link_msg(msg, txt) do { \
	if (LINK_LOG_BUF_SIZE) \
		tipc_msg_dbg(&l_ptr->print_buf, msg, txt); \
} while (0)

/* link_print() output, prefixed by 'txt' -> link's print buffer */
#define dbg_link_state(txt) do { \
	if (LINK_LOG_BUF_SIZE) \
		link_print(l_ptr, &l_ptr->print_buf, txt); \
} while (0)

/* move the link's print buffer contents into LOG */
#define dbg_link_dump() do { \
	if (LINK_LOG_BUF_SIZE) { \
		tipc_printf(LOG, "\n\nDumping link <%s>:\n", l_ptr->name); \
		tipc_printbuf_move(LOG, &l_ptr->print_buf); \
	} \
} while (0)
177 
178 static void dbg_print_link(struct link *l_ptr, const char *str)
179 {
180 	if (DBG_OUTPUT != TIPC_NULL)
181 		link_print(l_ptr, DBG_OUTPUT, str);
182 }
183 
184 static void dbg_print_buf_chain(struct sk_buff *root_buf)
185 {
186 	if (DBG_OUTPUT != TIPC_NULL) {
187 		struct sk_buff *buf = root_buf;
188 
189 		while (buf) {
190 			msg_dbg(buf_msg(buf), "In chain: ");
191 			buf = buf->next;
192 		}
193 	}
194 }
195 
196 /*
197  *  Simple link routines
198  */
199 
/*
 * align - round 'i' up to the next multiple of 4
 *
 * Values that are already a multiple of 4 are returned unchanged.
 */

static unsigned int align(unsigned int i)
{
	const unsigned int mask = 3u;

	return (i + mask) & ~mask;
}
204 
205 static void link_init_max_pkt(struct link *l_ptr)
206 {
207 	u32 max_pkt;
208 
209 	max_pkt = (l_ptr->b_ptr->publ.mtu & ~3);
210 	if (max_pkt > MAX_MSG_SIZE)
211 		max_pkt = MAX_MSG_SIZE;
212 
213 	l_ptr->max_pkt_target = max_pkt;
214 	if (l_ptr->max_pkt_target < MAX_PKT_DEFAULT)
215 		l_ptr->max_pkt = l_ptr->max_pkt_target;
216 	else
217 		l_ptr->max_pkt = MAX_PKT_DEFAULT;
218 
219 	l_ptr->max_pkt_probes = 0;
220 }
221 
222 static u32 link_next_sent(struct link *l_ptr)
223 {
224 	if (l_ptr->next_out)
225 		return msg_seqno(buf_msg(l_ptr->next_out));
226 	return mod(l_ptr->next_out_no);
227 }
228 
229 static u32 link_last_sent(struct link *l_ptr)
230 {
231 	return mod(link_next_sent(l_ptr) - 1);
232 }
233 
234 /*
235  *  Simple non-static link routines (i.e. referenced outside this file)
236  */
237 
/*
 * tipc_link_is_up - test whether a link is in one of the two working states
 *
 * Returns 1 if 'l_ptr' is non-NULL and the link is WORKING_WORKING or
 * WORKING_UNKNOWN, otherwise 0.
 */

int tipc_link_is_up(struct link *l_ptr)
{
	if (l_ptr &&
	    (link_working_working(l_ptr) || link_working_unknown(l_ptr)))
		return 1;

	return 0;
}
244 
245 int tipc_link_is_active(struct link *l_ptr)
246 {
247 	return ((l_ptr->owner->active_links[0] == l_ptr) ||
248 		(l_ptr->owner->active_links[1] == l_ptr));
249 }
250 
251 /**
252  * link_name_validate - validate & (optionally) deconstruct link name
253  * @name - ptr to link name string
254  * @name_parts - ptr to area for link name components (or NULL if not needed)
255  *
256  * Returns 1 if link name is valid, otherwise 0.
257  */
258 
259 static int link_name_validate(const char *name, struct link_name *name_parts)
260 {
261 	char name_copy[TIPC_MAX_LINK_NAME];
262 	char *addr_local;
263 	char *if_local;
264 	char *addr_peer;
265 	char *if_peer;
266 	char dummy;
267 	u32 z_local, c_local, n_local;
268 	u32 z_peer, c_peer, n_peer;
269 	u32 if_local_len;
270 	u32 if_peer_len;
271 
272 	/* copy link name & ensure length is OK */
273 
274 	name_copy[TIPC_MAX_LINK_NAME - 1] = 0;
275 	/* need above in case non-Posix strncpy() doesn't pad with nulls */
276 	strncpy(name_copy, name, TIPC_MAX_LINK_NAME);
277 	if (name_copy[TIPC_MAX_LINK_NAME - 1] != 0)
278 		return 0;
279 
280 	/* ensure all component parts of link name are present */
281 
282 	addr_local = name_copy;
283 	if ((if_local = strchr(addr_local, ':')) == NULL)
284 		return 0;
285 	*(if_local++) = 0;
286 	if ((addr_peer = strchr(if_local, '-')) == NULL)
287 		return 0;
288 	*(addr_peer++) = 0;
289 	if_local_len = addr_peer - if_local;
290 	if ((if_peer = strchr(addr_peer, ':')) == NULL)
291 		return 0;
292 	*(if_peer++) = 0;
293 	if_peer_len = strlen(if_peer) + 1;
294 
295 	/* validate component parts of link name */
296 
297 	if ((sscanf(addr_local, "%u.%u.%u%c",
298 		    &z_local, &c_local, &n_local, &dummy) != 3) ||
299 	    (sscanf(addr_peer, "%u.%u.%u%c",
300 		    &z_peer, &c_peer, &n_peer, &dummy) != 3) ||
301 	    (z_local > 255) || (c_local > 4095) || (n_local > 4095) ||
302 	    (z_peer  > 255) || (c_peer  > 4095) || (n_peer  > 4095) ||
303 	    (if_local_len <= 1) || (if_local_len > TIPC_MAX_IF_NAME) ||
304 	    (if_peer_len  <= 1) || (if_peer_len  > TIPC_MAX_IF_NAME) ||
305 	    (strspn(if_local, tipc_alphabet) != (if_local_len - 1)) ||
306 	    (strspn(if_peer, tipc_alphabet) != (if_peer_len - 1)))
307 		return 0;
308 
309 	/* return link name components, if necessary */
310 
311 	if (name_parts) {
312 		name_parts->addr_local = tipc_addr(z_local, c_local, n_local);
313 		strcpy(name_parts->if_local, if_local);
314 		name_parts->addr_peer = tipc_addr(z_peer, c_peer, n_peer);
315 		strcpy(name_parts->if_peer, if_peer);
316 	}
317 	return 1;
318 }
319 
320 /**
321  * link_timeout - handle expiration of link timer
322  * @l_ptr: pointer to link
323  *
324  * This routine must not grab "tipc_net_lock" to avoid a potential deadlock conflict
325  * with tipc_link_delete().  (There is no risk that the node will be deleted by
326  * another thread because tipc_link_delete() always cancels the link timer before
327  * tipc_node_delete() is called.)
328  */
329 
330 static void link_timeout(struct link *l_ptr)
331 {
332 	tipc_node_lock(l_ptr->owner);
333 
334 	/* update counters used in statistical profiling of send traffic */
335 
336 	l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size;
337 	l_ptr->stats.queue_sz_counts++;
338 
339 	if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz)
340 		l_ptr->stats.max_queue_sz = l_ptr->out_queue_size;
341 
342 	if (l_ptr->first_out) {
343 		struct tipc_msg *msg = buf_msg(l_ptr->first_out);
344 		u32 length = msg_size(msg);
345 
346 		if ((msg_user(msg) == MSG_FRAGMENTER) &&
347 		    (msg_type(msg) == FIRST_FRAGMENT)) {
348 			length = msg_size(msg_get_wrapped(msg));
349 		}
350 		if (length) {
351 			l_ptr->stats.msg_lengths_total += length;
352 			l_ptr->stats.msg_length_counts++;
353 			if (length <= 64)
354 				l_ptr->stats.msg_length_profile[0]++;
355 			else if (length <= 256)
356 				l_ptr->stats.msg_length_profile[1]++;
357 			else if (length <= 1024)
358 				l_ptr->stats.msg_length_profile[2]++;
359 			else if (length <= 4096)
360 				l_ptr->stats.msg_length_profile[3]++;
361 			else if (length <= 16384)
362 				l_ptr->stats.msg_length_profile[4]++;
363 			else if (length <= 32768)
364 				l_ptr->stats.msg_length_profile[5]++;
365 			else
366 				l_ptr->stats.msg_length_profile[6]++;
367 		}
368 	}
369 
370 	/* do all other link processing performed on a periodic basis */
371 
372 	link_check_defragm_bufs(l_ptr);
373 
374 	link_state_event(l_ptr, TIMEOUT_EVT);
375 
376 	if (l_ptr->next_out)
377 		tipc_link_push_queue(l_ptr);
378 
379 	tipc_node_unlock(l_ptr->owner);
380 }
381 
382 static void link_set_timer(struct link *l_ptr, u32 time)
383 {
384 	k_start_timer(&l_ptr->timer, time);
385 }
386 
387 /**
388  * tipc_link_create - create a new link
389  * @b_ptr: pointer to associated bearer
390  * @peer: network address of node at other end of link
391  * @media_addr: media address to use when sending messages over link
392  *
393  * Returns pointer to link.
394  */
395 
396 struct link *tipc_link_create(struct bearer *b_ptr, const u32 peer,
397 			      const struct tipc_media_addr *media_addr)
398 {
399 	struct link *l_ptr;
400 	struct tipc_msg *msg;
401 	char *if_name;
402 
403 	l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC);
404 	if (!l_ptr) {
405 		warn("Link creation failed, no memory\n");
406 		return NULL;
407 	}
408 
409 	if (LINK_LOG_BUF_SIZE) {
410 		char *pb = kmalloc(LINK_LOG_BUF_SIZE, GFP_ATOMIC);
411 
412 		if (!pb) {
413 			kfree(l_ptr);
414 			warn("Link creation failed, no memory for print buffer\n");
415 			return NULL;
416 		}
417 		tipc_printbuf_init(&l_ptr->print_buf, pb, LINK_LOG_BUF_SIZE);
418 	}
419 
420 	l_ptr->addr = peer;
421 	if_name = strchr(b_ptr->publ.name, ':') + 1;
422 	sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:",
423 		tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
424 		tipc_node(tipc_own_addr),
425 		if_name,
426 		tipc_zone(peer), tipc_cluster(peer), tipc_node(peer));
427 		/* note: peer i/f is appended to link name by reset/activate */
428 	memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr));
429 	l_ptr->checkpoint = 1;
430 	l_ptr->b_ptr = b_ptr;
431 	link_set_supervision_props(l_ptr, b_ptr->media->tolerance);
432 	l_ptr->state = RESET_UNKNOWN;
433 
434 	l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg;
435 	msg = l_ptr->pmsg;
436 	tipc_msg_init(msg, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, l_ptr->addr);
437 	msg_set_size(msg, sizeof(l_ptr->proto_msg));
438 	msg_set_session(msg, (tipc_random & 0xffff));
439 	msg_set_bearer_id(msg, b_ptr->identity);
440 	strcpy((char *)msg_data(msg), if_name);
441 
442 	l_ptr->priority = b_ptr->priority;
443 	tipc_link_set_queue_limits(l_ptr, b_ptr->media->window);
444 
445 	link_init_max_pkt(l_ptr);
446 
447 	l_ptr->next_out_no = 1;
448 	INIT_LIST_HEAD(&l_ptr->waiting_ports);
449 
450 	link_reset_statistics(l_ptr);
451 
452 	l_ptr->owner = tipc_node_attach_link(l_ptr);
453 	if (!l_ptr->owner) {
454 		if (LINK_LOG_BUF_SIZE)
455 			kfree(l_ptr->print_buf.buf);
456 		kfree(l_ptr);
457 		return NULL;
458 	}
459 
460 	k_init_timer(&l_ptr->timer, (Handler)link_timeout, (unsigned long)l_ptr);
461 	list_add_tail(&l_ptr->link_list, &b_ptr->links);
462 	tipc_k_signal((Handler)tipc_link_start, (unsigned long)l_ptr);
463 
464 	dbg("tipc_link_create(): tolerance = %u,cont intv = %u, abort_limit = %u\n",
465 	    l_ptr->tolerance, l_ptr->continuity_interval, l_ptr->abort_limit);
466 
467 	return l_ptr;
468 }
469 
470 /**
471  * tipc_link_delete - delete a link
472  * @l_ptr: pointer to link
473  *
474  * Note: 'tipc_net_lock' is write_locked, bearer is locked.
475  * This routine must not grab the node lock until after link timer cancellation
476  * to avoid a potential deadlock situation.
477  */
478 
479 void tipc_link_delete(struct link *l_ptr)
480 {
481 	if (!l_ptr) {
482 		err("Attempt to delete non-existent link\n");
483 		return;
484 	}
485 
486 	dbg("tipc_link_delete()\n");
487 
488 	k_cancel_timer(&l_ptr->timer);
489 
490 	tipc_node_lock(l_ptr->owner);
491 	tipc_link_reset(l_ptr);
492 	tipc_node_detach_link(l_ptr->owner, l_ptr);
493 	tipc_link_stop(l_ptr);
494 	list_del_init(&l_ptr->link_list);
495 	if (LINK_LOG_BUF_SIZE)
496 		kfree(l_ptr->print_buf.buf);
497 	tipc_node_unlock(l_ptr->owner);
498 	k_term_timer(&l_ptr->timer);
499 	kfree(l_ptr);
500 }
501 
502 void tipc_link_start(struct link *l_ptr)
503 {
504 	dbg("tipc_link_start %x\n", l_ptr);
505 	link_state_event(l_ptr, STARTING_EVT);
506 }
507 
508 /**
509  * link_schedule_port - schedule port for deferred sending
510  * @l_ptr: pointer to link
511  * @origport: reference to sending port
512  * @sz: amount of data to be sent
513  *
514  * Schedules port for renewed sending of messages after link congestion
515  * has abated.
516  */
517 
518 static int link_schedule_port(struct link *l_ptr, u32 origport, u32 sz)
519 {
520 	struct port *p_ptr;
521 
522 	spin_lock_bh(&tipc_port_list_lock);
523 	p_ptr = tipc_port_lock(origport);
524 	if (p_ptr) {
525 		if (!p_ptr->wakeup)
526 			goto exit;
527 		if (!list_empty(&p_ptr->wait_list))
528 			goto exit;
529 		p_ptr->publ.congested = 1;
530 		p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt);
531 		list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
532 		l_ptr->stats.link_congs++;
533 exit:
534 		tipc_port_unlock(p_ptr);
535 	}
536 	spin_unlock_bh(&tipc_port_list_lock);
537 	return -ELINKCONG;
538 }
539 
540 void tipc_link_wakeup_ports(struct link *l_ptr, int all)
541 {
542 	struct port *p_ptr;
543 	struct port *temp_p_ptr;
544 	int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;
545 
546 	if (all)
547 		win = 100000;
548 	if (win <= 0)
549 		return;
550 	if (!spin_trylock_bh(&tipc_port_list_lock))
551 		return;
552 	if (link_congested(l_ptr))
553 		goto exit;
554 	list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports,
555 				 wait_list) {
556 		if (win <= 0)
557 			break;
558 		list_del_init(&p_ptr->wait_list);
559 		spin_lock_bh(p_ptr->publ.lock);
560 		p_ptr->publ.congested = 0;
561 		p_ptr->wakeup(&p_ptr->publ);
562 		win -= p_ptr->waiting_pkts;
563 		spin_unlock_bh(p_ptr->publ.lock);
564 	}
565 
566 exit:
567 	spin_unlock_bh(&tipc_port_list_lock);
568 }
569 
570 /**
571  * link_release_outqueue - purge link's outbound message queue
572  * @l_ptr: pointer to link
573  */
574 
575 static void link_release_outqueue(struct link *l_ptr)
576 {
577 	struct sk_buff *buf = l_ptr->first_out;
578 	struct sk_buff *next;
579 
580 	while (buf) {
581 		next = buf->next;
582 		buf_discard(buf);
583 		buf = next;
584 	}
585 	l_ptr->first_out = NULL;
586 	l_ptr->out_queue_size = 0;
587 }
588 
589 /**
590  * tipc_link_reset_fragments - purge link's inbound message fragments queue
591  * @l_ptr: pointer to link
592  */
593 
594 void tipc_link_reset_fragments(struct link *l_ptr)
595 {
596 	struct sk_buff *buf = l_ptr->defragm_buf;
597 	struct sk_buff *next;
598 
599 	while (buf) {
600 		next = buf->next;
601 		buf_discard(buf);
602 		buf = next;
603 	}
604 	l_ptr->defragm_buf = NULL;
605 }
606 
607 /**
608  * tipc_link_stop - purge all inbound and outbound messages associated with link
609  * @l_ptr: pointer to link
610  */
611 
612 void tipc_link_stop(struct link *l_ptr)
613 {
614 	struct sk_buff *buf;
615 	struct sk_buff *next;
616 
617 	buf = l_ptr->oldest_deferred_in;
618 	while (buf) {
619 		next = buf->next;
620 		buf_discard(buf);
621 		buf = next;
622 	}
623 
624 	buf = l_ptr->first_out;
625 	while (buf) {
626 		next = buf->next;
627 		buf_discard(buf);
628 		buf = next;
629 	}
630 
631 	tipc_link_reset_fragments(l_ptr);
632 
633 	buf_discard(l_ptr->proto_msg_queue);
634 	l_ptr->proto_msg_queue = NULL;
635 }
636 
637 #if 0
638 
639 /* LINK EVENT CODE IS NOT SUPPORTED AT PRESENT */
    /* (this branch is compiled out; the #else branch below is the live one) */
640 
    /*
     * link_recv_event - deliver a queued link event
     *
     * Calls the event's callback with the saved address, name and up/down
     * flag, then frees the event.
     */
641 static void link_recv_event(struct link_event *ev)
642 {
643 	ev->fcn(ev->addr, ev->name, ev->up);
644 	kfree(ev);
645 }
646 
    /*
     * link_send_event - queue a link up/down notification
     *
     * Allocates an event, fills it in from the link (address and name) and
     * the arguments, and hands it to tipc_k_signal() with link_recv_event()
     * as handler.  On allocation failure a warning is issued and the event
     * is dropped.
     */
647 static void link_send_event(void (*fcn)(u32 a, char *n, int up),
648 			    struct link *l_ptr, int up)
649 {
650 	struct link_event *ev;
651 
652 	ev = kmalloc(sizeof(*ev), GFP_ATOMIC);
653 	if (!ev) {
654 		warn("Link event allocation failure\n");
655 		return;
656 	}
657 	ev->addr = l_ptr->addr;
658 	ev->up = up;
659 	ev->fcn = fcn;
660 	memcpy(ev->name, l_ptr->name, TIPC_MAX_LINK_NAME);
661 	tipc_k_signal((Handler)link_recv_event, (unsigned long)ev);
662 }
663 
664 #else
665 
    /* link events unsupported: callers' link_send_event() uses expand to nothing */
666 #define link_send_event(fcn, l_ptr, up) do { } while (0)
667 
668 #endif
669 
670 void tipc_link_reset(struct link *l_ptr)
671 {
672 	struct sk_buff *buf;
673 	u32 prev_state = l_ptr->state;
674 	u32 checkpoint = l_ptr->next_in_no;
675 	int was_active_link = tipc_link_is_active(l_ptr);
676 
677 	msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));
678 
679 	/* Link is down, accept any session */
680 	l_ptr->peer_session = INVALID_SESSION;
681 
682 	/* Prepare for max packet size negotiation */
683 	link_init_max_pkt(l_ptr);
684 
685 	l_ptr->state = RESET_UNKNOWN;
686 	dbg_link_state("Resetting Link\n");
687 
688 	if ((prev_state == RESET_UNKNOWN) || (prev_state == RESET_RESET))
689 		return;
690 
691 	tipc_node_link_down(l_ptr->owner, l_ptr);
692 	tipc_bearer_remove_dest(l_ptr->b_ptr, l_ptr->addr);
693 #if 0
694 	tipc_printf(TIPC_CONS, "\nReset link <%s>\n", l_ptr->name);
695 	dbg_link_dump();
696 #endif
697 	if (was_active_link && tipc_node_has_active_links(l_ptr->owner) &&
698 	    l_ptr->owner->permit_changeover) {
699 		l_ptr->reset_checkpoint = checkpoint;
700 		l_ptr->exp_msg_count = START_CHANGEOVER;
701 	}
702 
703 	/* Clean up all queues: */
704 
705 	link_release_outqueue(l_ptr);
706 	buf_discard(l_ptr->proto_msg_queue);
707 	l_ptr->proto_msg_queue = NULL;
708 	buf = l_ptr->oldest_deferred_in;
709 	while (buf) {
710 		struct sk_buff *next = buf->next;
711 		buf_discard(buf);
712 		buf = next;
713 	}
714 	if (!list_empty(&l_ptr->waiting_ports))
715 		tipc_link_wakeup_ports(l_ptr, 1);
716 
717 	l_ptr->retransm_queue_head = 0;
718 	l_ptr->retransm_queue_size = 0;
719 	l_ptr->last_out = NULL;
720 	l_ptr->first_out = NULL;
721 	l_ptr->next_out = NULL;
722 	l_ptr->unacked_window = 0;
723 	l_ptr->checkpoint = 1;
724 	l_ptr->next_out_no = 1;
725 	l_ptr->deferred_inqueue_sz = 0;
726 	l_ptr->oldest_deferred_in = NULL;
727 	l_ptr->newest_deferred_in = NULL;
728 	l_ptr->fsm_msg_cnt = 0;
729 	l_ptr->stale_count = 0;
730 	link_reset_statistics(l_ptr);
731 
732 	link_send_event(tipc_cfg_link_event, l_ptr, 0);
733 	if (!in_own_cluster(l_ptr->addr))
734 		link_send_event(tipc_disc_link_event, l_ptr, 0);
735 }
736 
737 
738 static void link_activate(struct link *l_ptr)
739 {
740 	l_ptr->next_in_no = l_ptr->stats.recv_info = 1;
741 	tipc_node_link_up(l_ptr->owner, l_ptr);
742 	tipc_bearer_add_dest(l_ptr->b_ptr, l_ptr->addr);
743 	link_send_event(tipc_cfg_link_event, l_ptr, 1);
744 	if (!in_own_cluster(l_ptr->addr))
745 		link_send_event(tipc_disc_link_event, l_ptr, 1);
746 }
747 
748 /**
749  * link_state_event - link finite state machine
750  * @l_ptr: pointer to link
751  * @event: state machine event to process
752  */
753 
754 static void link_state_event(struct link *l_ptr, unsigned event)
755 {
756 	struct link *other;
757 	u32 cont_intv = l_ptr->continuity_interval;
758 
759 	if (!l_ptr->started && (event != STARTING_EVT))
760 		return;		/* Not yet. */
761 
762 	if (link_blocked(l_ptr)) {
763 		if (event == TIMEOUT_EVT) {
764 			link_set_timer(l_ptr, cont_intv);
765 		}
766 		return;	  /* Changeover going on */
767 	}
768 	dbg_link("STATE_EV: <%s> ", l_ptr->name);
769 
770 	switch (l_ptr->state) {
771 	case WORKING_WORKING:
772 		dbg_link("WW/");
773 		switch (event) {
774 		case TRAFFIC_MSG_EVT:
775 			dbg_link("TRF-");
776 			/* fall through */
777 		case ACTIVATE_MSG:
778 			dbg_link("ACT\n");
779 			break;
780 		case TIMEOUT_EVT:
781 			dbg_link("TIM ");
782 			if (l_ptr->next_in_no != l_ptr->checkpoint) {
783 				l_ptr->checkpoint = l_ptr->next_in_no;
784 				if (tipc_bclink_acks_missing(l_ptr->owner)) {
785 					tipc_link_send_proto_msg(l_ptr, STATE_MSG,
786 								 0, 0, 0, 0, 0);
787 					l_ptr->fsm_msg_cnt++;
788 				} else if (l_ptr->max_pkt < l_ptr->max_pkt_target) {
789 					tipc_link_send_proto_msg(l_ptr, STATE_MSG,
790 								 1, 0, 0, 0, 0);
791 					l_ptr->fsm_msg_cnt++;
792 				}
793 				link_set_timer(l_ptr, cont_intv);
794 				break;
795 			}
796 			dbg_link(" -> WU\n");
797 			l_ptr->state = WORKING_UNKNOWN;
798 			l_ptr->fsm_msg_cnt = 0;
799 			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
800 			l_ptr->fsm_msg_cnt++;
801 			link_set_timer(l_ptr, cont_intv / 4);
802 			break;
803 		case RESET_MSG:
804 			dbg_link("RES -> RR\n");
805 			info("Resetting link <%s>, requested by peer\n",
806 			     l_ptr->name);
807 			tipc_link_reset(l_ptr);
808 			l_ptr->state = RESET_RESET;
809 			l_ptr->fsm_msg_cnt = 0;
810 			tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
811 			l_ptr->fsm_msg_cnt++;
812 			link_set_timer(l_ptr, cont_intv);
813 			break;
814 		default:
815 			err("Unknown link event %u in WW state\n", event);
816 		}
817 		break;
818 	case WORKING_UNKNOWN:
819 		dbg_link("WU/");
820 		switch (event) {
821 		case TRAFFIC_MSG_EVT:
822 			dbg_link("TRF-");
823 		case ACTIVATE_MSG:
824 			dbg_link("ACT -> WW\n");
825 			l_ptr->state = WORKING_WORKING;
826 			l_ptr->fsm_msg_cnt = 0;
827 			link_set_timer(l_ptr, cont_intv);
828 			break;
829 		case RESET_MSG:
830 			dbg_link("RES -> RR\n");
831 			info("Resetting link <%s>, requested by peer "
832 			     "while probing\n", l_ptr->name);
833 			tipc_link_reset(l_ptr);
834 			l_ptr->state = RESET_RESET;
835 			l_ptr->fsm_msg_cnt = 0;
836 			tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
837 			l_ptr->fsm_msg_cnt++;
838 			link_set_timer(l_ptr, cont_intv);
839 			break;
840 		case TIMEOUT_EVT:
841 			dbg_link("TIM ");
842 			if (l_ptr->next_in_no != l_ptr->checkpoint) {
843 				dbg_link("-> WW\n");
844 				l_ptr->state = WORKING_WORKING;
845 				l_ptr->fsm_msg_cnt = 0;
846 				l_ptr->checkpoint = l_ptr->next_in_no;
847 				if (tipc_bclink_acks_missing(l_ptr->owner)) {
848 					tipc_link_send_proto_msg(l_ptr, STATE_MSG,
849 								 0, 0, 0, 0, 0);
850 					l_ptr->fsm_msg_cnt++;
851 				}
852 				link_set_timer(l_ptr, cont_intv);
853 			} else if (l_ptr->fsm_msg_cnt < l_ptr->abort_limit) {
854 				dbg_link("Probing %u/%u,timer = %u ms)\n",
855 					 l_ptr->fsm_msg_cnt, l_ptr->abort_limit,
856 					 cont_intv / 4);
857 				tipc_link_send_proto_msg(l_ptr, STATE_MSG,
858 							 1, 0, 0, 0, 0);
859 				l_ptr->fsm_msg_cnt++;
860 				link_set_timer(l_ptr, cont_intv / 4);
861 			} else {	/* Link has failed */
862 				dbg_link("-> RU (%u probes unanswered)\n",
863 					 l_ptr->fsm_msg_cnt);
864 				warn("Resetting link <%s>, peer not responding\n",
865 				     l_ptr->name);
866 				tipc_link_reset(l_ptr);
867 				l_ptr->state = RESET_UNKNOWN;
868 				l_ptr->fsm_msg_cnt = 0;
869 				tipc_link_send_proto_msg(l_ptr, RESET_MSG,
870 							 0, 0, 0, 0, 0);
871 				l_ptr->fsm_msg_cnt++;
872 				link_set_timer(l_ptr, cont_intv);
873 			}
874 			break;
875 		default:
876 			err("Unknown link event %u in WU state\n", event);
877 		}
878 		break;
879 	case RESET_UNKNOWN:
880 		dbg_link("RU/");
881 		switch (event) {
882 		case TRAFFIC_MSG_EVT:
883 			dbg_link("TRF-\n");
884 			break;
885 		case ACTIVATE_MSG:
886 			other = l_ptr->owner->active_links[0];
887 			if (other && link_working_unknown(other)) {
888 				dbg_link("ACT\n");
889 				break;
890 			}
891 			dbg_link("ACT -> WW\n");
892 			l_ptr->state = WORKING_WORKING;
893 			l_ptr->fsm_msg_cnt = 0;
894 			link_activate(l_ptr);
895 			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
896 			l_ptr->fsm_msg_cnt++;
897 			link_set_timer(l_ptr, cont_intv);
898 			break;
899 		case RESET_MSG:
900 			dbg_link("RES\n");
901 			dbg_link(" -> RR\n");
902 			l_ptr->state = RESET_RESET;
903 			l_ptr->fsm_msg_cnt = 0;
904 			tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 1, 0, 0, 0, 0);
905 			l_ptr->fsm_msg_cnt++;
906 			link_set_timer(l_ptr, cont_intv);
907 			break;
908 		case STARTING_EVT:
909 			dbg_link("START-");
910 			l_ptr->started = 1;
911 			/* fall through */
912 		case TIMEOUT_EVT:
913 			dbg_link("TIM\n");
914 			tipc_link_send_proto_msg(l_ptr, RESET_MSG, 0, 0, 0, 0, 0);
915 			l_ptr->fsm_msg_cnt++;
916 			link_set_timer(l_ptr, cont_intv);
917 			break;
918 		default:
919 			err("Unknown link event %u in RU state\n", event);
920 		}
921 		break;
922 	case RESET_RESET:
923 		dbg_link("RR/ ");
924 		switch (event) {
925 		case TRAFFIC_MSG_EVT:
926 			dbg_link("TRF-");
927 			/* fall through */
928 		case ACTIVATE_MSG:
929 			other = l_ptr->owner->active_links[0];
930 			if (other && link_working_unknown(other)) {
931 				dbg_link("ACT\n");
932 				break;
933 			}
934 			dbg_link("ACT -> WW\n");
935 			l_ptr->state = WORKING_WORKING;
936 			l_ptr->fsm_msg_cnt = 0;
937 			link_activate(l_ptr);
938 			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
939 			l_ptr->fsm_msg_cnt++;
940 			link_set_timer(l_ptr, cont_intv);
941 			break;
942 		case RESET_MSG:
943 			dbg_link("RES\n");
944 			break;
945 		case TIMEOUT_EVT:
946 			dbg_link("TIM\n");
947 			tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
948 			l_ptr->fsm_msg_cnt++;
949 			link_set_timer(l_ptr, cont_intv);
950 			dbg_link("fsm_msg_cnt %u\n", l_ptr->fsm_msg_cnt);
951 			break;
952 		default:
953 			err("Unknown link event %u in RR state\n", event);
954 		}
955 		break;
956 	default:
957 		err("Unknown link state %u/%u\n", l_ptr->state, event);
958 	}
959 }
960 
961 /*
962  * link_bundle_buf(): Append contents of a buffer to
963  * the tail of an existing one.
964  */
965 
966 static int link_bundle_buf(struct link *l_ptr,
967 			   struct sk_buff *bundler,
968 			   struct sk_buff *buf)
969 {
970 	struct tipc_msg *bundler_msg = buf_msg(bundler);
971 	struct tipc_msg *msg = buf_msg(buf);
972 	u32 size = msg_size(msg);
973 	u32 bundle_size = msg_size(bundler_msg);
974 	u32 to_pos = align(bundle_size);
975 	u32 pad = to_pos - bundle_size;
976 
977 	if (msg_user(bundler_msg) != MSG_BUNDLER)
978 		return 0;
979 	if (msg_type(bundler_msg) != OPEN_MSG)
980 		return 0;
981 	if (skb_tailroom(bundler) < (pad + size))
982 		return 0;
983 	if (l_ptr->max_pkt < (to_pos + size))
984 		return 0;
985 
986 	skb_put(bundler, pad + size);
987 	skb_copy_to_linear_data_offset(bundler, to_pos, buf->data, size);
988 	msg_set_size(bundler_msg, to_pos + size);
989 	msg_set_msgcnt(bundler_msg, msg_msgcnt(bundler_msg) + 1);
990 	dbg("Packed msg # %u(%u octets) into pos %u in buf(#%u)\n",
991 	    msg_msgcnt(bundler_msg), size, to_pos, msg_seqno(bundler_msg));
992 	msg_dbg(msg, "PACKD:");
993 	buf_discard(buf);
994 	l_ptr->stats.sent_bundled++;
995 	return 1;
996 }
997 
998 static void link_add_to_outqueue(struct link *l_ptr,
999 				 struct sk_buff *buf,
1000 				 struct tipc_msg *msg)
1001 {
1002 	u32 ack = mod(l_ptr->next_in_no - 1);
1003 	u32 seqno = mod(l_ptr->next_out_no++);
1004 
1005 	msg_set_word(msg, 2, ((ack << 16) | seqno));
1006 	msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1007 	buf->next = NULL;
1008 	if (l_ptr->first_out) {
1009 		l_ptr->last_out->next = buf;
1010 		l_ptr->last_out = buf;
1011 	} else
1012 		l_ptr->first_out = l_ptr->last_out = buf;
1013 	l_ptr->out_queue_size++;
1014 }
1015 
1016 /*
1017  * tipc_link_send_buf() is the 'full path' for messages, called from
1018  * inside TIPC when the 'fast path' in tipc_send_buf
1019  * has failed, and from link_send()
1020  */
1021 
1022 int tipc_link_send_buf(struct link *l_ptr, struct sk_buff *buf)
1023 {
1024 	struct tipc_msg *msg = buf_msg(buf);
1025 	u32 size = msg_size(msg);
1026 	u32 dsz = msg_data_sz(msg);
1027 	u32 queue_size = l_ptr->out_queue_size;
1028 	u32 imp = tipc_msg_tot_importance(msg);
1029 	u32 queue_limit = l_ptr->queue_limit[imp];
1030 	u32 max_packet = l_ptr->max_pkt;
1031 
1032 	msg_set_prevnode(msg, tipc_own_addr);	/* If routed message */
1033 
1034 	/* Match msg importance against queue limits: */
1035 
1036 	if (unlikely(queue_size >= queue_limit)) {
1037 		if (imp <= TIPC_CRITICAL_IMPORTANCE) {
1038 			return link_schedule_port(l_ptr, msg_origport(msg),
1039 						  size);
1040 		}
1041 		msg_dbg(msg, "TIPC: Congestion, throwing away\n");
1042 		buf_discard(buf);
1043 		if (imp > CONN_MANAGER) {
1044 			warn("Resetting link <%s>, send queue full", l_ptr->name);
1045 			tipc_link_reset(l_ptr);
1046 		}
1047 		return dsz;
1048 	}
1049 
1050 	/* Fragmentation needed ? */
1051 
1052 	if (size > max_packet)
1053 		return tipc_link_send_long_buf(l_ptr, buf);
1054 
1055 	/* Packet can be queued or sent: */
1056 
1057 	if (queue_size > l_ptr->stats.max_queue_sz)
1058 		l_ptr->stats.max_queue_sz = queue_size;
1059 
1060 	if (likely(!tipc_bearer_congested(l_ptr->b_ptr, l_ptr) &&
1061 		   !link_congested(l_ptr))) {
1062 		link_add_to_outqueue(l_ptr, buf, msg);
1063 
1064 		if (likely(tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr))) {
1065 			l_ptr->unacked_window = 0;
1066 		} else {
1067 			tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1068 			l_ptr->stats.bearer_congs++;
1069 			l_ptr->next_out = buf;
1070 		}
1071 		return dsz;
1072 	}
1073 	/* Congestion: can message be bundled ?: */
1074 
1075 	if ((msg_user(msg) != CHANGEOVER_PROTOCOL) &&
1076 	    (msg_user(msg) != MSG_FRAGMENTER)) {
1077 
1078 		/* Try adding message to an existing bundle */
1079 
1080 		if (l_ptr->next_out &&
1081 		    link_bundle_buf(l_ptr, l_ptr->last_out, buf)) {
1082 			tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
1083 			return dsz;
1084 		}
1085 
1086 		/* Try creating a new bundle */
1087 
1088 		if (size <= max_packet * 2 / 3) {
1089 			struct sk_buff *bundler = buf_acquire(max_packet);
1090 			struct tipc_msg bundler_hdr;
1091 
1092 			if (bundler) {
1093 				tipc_msg_init(&bundler_hdr, MSG_BUNDLER, OPEN_MSG,
1094 					 INT_H_SIZE, l_ptr->addr);
1095 				skb_copy_to_linear_data(bundler, &bundler_hdr,
1096 							INT_H_SIZE);
1097 				skb_trim(bundler, INT_H_SIZE);
1098 				link_bundle_buf(l_ptr, bundler, buf);
1099 				buf = bundler;
1100 				msg = buf_msg(buf);
1101 				l_ptr->stats.sent_bundles++;
1102 			}
1103 		}
1104 	}
1105 	if (!l_ptr->next_out)
1106 		l_ptr->next_out = buf;
1107 	link_add_to_outqueue(l_ptr, buf, msg);
1108 	tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
1109 	return dsz;
1110 }
1111 
1112 /*
1113  * tipc_link_send(): same as tipc_link_send_buf(), but the link to use has
1114  * not been selected yet, and the the owner node is not locked
1115  * Called by TIPC internal users, e.g. the name distributor
1116  */
1117 
1118 int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)
1119 {
1120 	struct link *l_ptr;
1121 	struct tipc_node *n_ptr;
1122 	int res = -ELINKCONG;
1123 
1124 	read_lock_bh(&tipc_net_lock);
1125 	n_ptr = tipc_node_select(dest, selector);
1126 	if (n_ptr) {
1127 		tipc_node_lock(n_ptr);
1128 		l_ptr = n_ptr->active_links[selector & 1];
1129 		if (l_ptr) {
1130 			dbg("tipc_link_send: found link %x for dest %x\n", l_ptr, dest);
1131 			res = tipc_link_send_buf(l_ptr, buf);
1132 		} else {
1133 			dbg("Attempt to send msg to unreachable node:\n");
1134 			msg_dbg(buf_msg(buf),">>>");
1135 			buf_discard(buf);
1136 		}
1137 		tipc_node_unlock(n_ptr);
1138 	} else {
1139 		dbg("Attempt to send msg to unknown node:\n");
1140 		msg_dbg(buf_msg(buf),">>>");
1141 		buf_discard(buf);
1142 	}
1143 	read_unlock_bh(&tipc_net_lock);
1144 	return res;
1145 }
1146 
1147 /*
1148  * link_send_buf_fast: Entry for data messages where the
1149  * destination link is known and the header is complete,
1150  * inclusive total message length. Very time critical.
1151  * Link is locked. Returns user data length.
1152  */
1153 
1154 static int link_send_buf_fast(struct link *l_ptr, struct sk_buff *buf,
1155 			      u32 *used_max_pkt)
1156 {
1157 	struct tipc_msg *msg = buf_msg(buf);
1158 	int res = msg_data_sz(msg);
1159 
1160 	if (likely(!link_congested(l_ptr))) {
1161 		if (likely(msg_size(msg) <= l_ptr->max_pkt)) {
1162 			if (likely(list_empty(&l_ptr->b_ptr->cong_links))) {
1163 				link_add_to_outqueue(l_ptr, buf, msg);
1164 				if (likely(tipc_bearer_send(l_ptr->b_ptr, buf,
1165 							    &l_ptr->media_addr))) {
1166 					l_ptr->unacked_window = 0;
1167 					msg_dbg(msg,"SENT_FAST:");
1168 					return res;
1169 				}
1170 				dbg("failed sent fast...\n");
1171 				tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1172 				l_ptr->stats.bearer_congs++;
1173 				l_ptr->next_out = buf;
1174 				return res;
1175 			}
1176 		}
1177 		else
1178 			*used_max_pkt = l_ptr->max_pkt;
1179 	}
1180 	return tipc_link_send_buf(l_ptr, buf);  /* All other cases */
1181 }
1182 
1183 /*
1184  * tipc_send_buf_fast: Entry for data messages where the
1185  * destination node is known and the header is complete,
1186  * inclusive total message length.
1187  * Returns user data length.
1188  */
1189 int tipc_send_buf_fast(struct sk_buff *buf, u32 destnode)
1190 {
1191 	struct link *l_ptr;
1192 	struct tipc_node *n_ptr;
1193 	int res;
1194 	u32 selector = msg_origport(buf_msg(buf)) & 1;
1195 	u32 dummy;
1196 
1197 	if (destnode == tipc_own_addr)
1198 		return tipc_port_recv_msg(buf);
1199 
1200 	read_lock_bh(&tipc_net_lock);
1201 	n_ptr = tipc_node_select(destnode, selector);
1202 	if (likely(n_ptr)) {
1203 		tipc_node_lock(n_ptr);
1204 		l_ptr = n_ptr->active_links[selector];
1205 		dbg("send_fast: buf %x selected %x, destnode = %x\n",
1206 		    buf, l_ptr, destnode);
1207 		if (likely(l_ptr)) {
1208 			res = link_send_buf_fast(l_ptr, buf, &dummy);
1209 			tipc_node_unlock(n_ptr);
1210 			read_unlock_bh(&tipc_net_lock);
1211 			return res;
1212 		}
1213 		tipc_node_unlock(n_ptr);
1214 	}
1215 	read_unlock_bh(&tipc_net_lock);
1216 	res = msg_data_sz(buf_msg(buf));
1217 	tipc_reject_msg(buf, TIPC_ERR_NO_NODE);
1218 	return res;
1219 }
1220 
1221 
1222 /*
1223  * tipc_link_send_sections_fast: Entry for messages where the
1224  * destination processor is known and the header is complete,
1225  * except for total message length.
1226  * Returns user data length or errno.
1227  */
1228 int tipc_link_send_sections_fast(struct port *sender,
1229 				 struct iovec const *msg_sect,
1230 				 const u32 num_sect,
1231 				 u32 destaddr)
1232 {
1233 	struct tipc_msg *hdr = &sender->publ.phdr;
1234 	struct link *l_ptr;
1235 	struct sk_buff *buf;
1236 	struct tipc_node *node;
1237 	int res;
1238 	u32 selector = msg_origport(hdr) & 1;
1239 
1240 again:
1241 	/*
1242 	 * Try building message using port's max_pkt hint.
1243 	 * (Must not hold any locks while building message.)
1244 	 */
1245 
1246 	res = tipc_msg_build(hdr, msg_sect, num_sect, sender->publ.max_pkt,
1247 			!sender->user_port, &buf);
1248 
1249 	read_lock_bh(&tipc_net_lock);
1250 	node = tipc_node_select(destaddr, selector);
1251 	if (likely(node)) {
1252 		tipc_node_lock(node);
1253 		l_ptr = node->active_links[selector];
1254 		if (likely(l_ptr)) {
1255 			if (likely(buf)) {
1256 				res = link_send_buf_fast(l_ptr, buf,
1257 							 &sender->publ.max_pkt);
1258 				if (unlikely(res < 0))
1259 					buf_discard(buf);
1260 exit:
1261 				tipc_node_unlock(node);
1262 				read_unlock_bh(&tipc_net_lock);
1263 				return res;
1264 			}
1265 
1266 			/* Exit if build request was invalid */
1267 
1268 			if (unlikely(res < 0))
1269 				goto exit;
1270 
1271 			/* Exit if link (or bearer) is congested */
1272 
1273 			if (link_congested(l_ptr) ||
1274 			    !list_empty(&l_ptr->b_ptr->cong_links)) {
1275 				res = link_schedule_port(l_ptr,
1276 							 sender->publ.ref, res);
1277 				goto exit;
1278 			}
1279 
1280 			/*
1281 			 * Message size exceeds max_pkt hint; update hint,
1282 			 * then re-try fast path or fragment the message
1283 			 */
1284 
1285 			sender->publ.max_pkt = l_ptr->max_pkt;
1286 			tipc_node_unlock(node);
1287 			read_unlock_bh(&tipc_net_lock);
1288 
1289 
1290 			if ((msg_hdr_sz(hdr) + res) <= sender->publ.max_pkt)
1291 				goto again;
1292 
1293 			return link_send_sections_long(sender, msg_sect,
1294 						       num_sect, destaddr);
1295 		}
1296 		tipc_node_unlock(node);
1297 	}
1298 	read_unlock_bh(&tipc_net_lock);
1299 
1300 	/* Couldn't find a link to the destination node */
1301 
1302 	if (buf)
1303 		return tipc_reject_msg(buf, TIPC_ERR_NO_NODE);
1304 	if (res >= 0)
1305 		return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect,
1306 						 TIPC_ERR_NO_NODE);
1307 	return res;
1308 }
1309 
1310 /*
1311  * link_send_sections_long(): Entry for long messages where the
1312  * destination node is known and the header is complete,
1313  * inclusive total message length.
1314  * Link and bearer congestion status have been checked to be ok,
1315  * and are ignored if they change.
1316  *
1317  * Note that fragments do not use the full link MTU so that they won't have
1318  * to undergo refragmentation if link changeover causes them to be sent
1319  * over another link with an additional tunnel header added as prefix.
1320  * (Refragmentation will still occur if the other link has a smaller MTU.)
1321  *
1322  * Returns user data length or errno.
1323  */
1324 static int link_send_sections_long(struct port *sender,
1325 				   struct iovec const *msg_sect,
1326 				   u32 num_sect,
1327 				   u32 destaddr)
1328 {
1329 	struct link *l_ptr;
1330 	struct tipc_node *node;
1331 	struct tipc_msg *hdr = &sender->publ.phdr;
1332 	u32 dsz = msg_data_sz(hdr);
1333 	u32 max_pkt,fragm_sz,rest;
1334 	struct tipc_msg fragm_hdr;
1335 	struct sk_buff *buf,*buf_chain,*prev;
1336 	u32 fragm_crs,fragm_rest,hsz,sect_rest;
1337 	const unchar *sect_crs;
1338 	int curr_sect;
1339 	u32 fragm_no;
1340 
1341 again:
1342 	fragm_no = 1;
1343 	max_pkt = sender->publ.max_pkt - INT_H_SIZE;
1344 		/* leave room for tunnel header in case of link changeover */
1345 	fragm_sz = max_pkt - INT_H_SIZE;
1346 		/* leave room for fragmentation header in each fragment */
1347 	rest = dsz;
1348 	fragm_crs = 0;
1349 	fragm_rest = 0;
1350 	sect_rest = 0;
1351 	sect_crs = NULL;
1352 	curr_sect = -1;
1353 
1354 	/* Prepare reusable fragment header: */
1355 
1356 	msg_dbg(hdr, ">FRAGMENTING>");
1357 	tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
1358 		 INT_H_SIZE, msg_destnode(hdr));
1359 	msg_set_link_selector(&fragm_hdr, sender->publ.ref);
1360 	msg_set_size(&fragm_hdr, max_pkt);
1361 	msg_set_fragm_no(&fragm_hdr, 1);
1362 
1363 	/* Prepare header of first fragment: */
1364 
1365 	buf_chain = buf = buf_acquire(max_pkt);
1366 	if (!buf)
1367 		return -ENOMEM;
1368 	buf->next = NULL;
1369 	skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
1370 	hsz = msg_hdr_sz(hdr);
1371 	skb_copy_to_linear_data_offset(buf, INT_H_SIZE, hdr, hsz);
1372 	msg_dbg(buf_msg(buf), ">BUILD>");
1373 
1374 	/* Chop up message: */
1375 
1376 	fragm_crs = INT_H_SIZE + hsz;
1377 	fragm_rest = fragm_sz - hsz;
1378 
1379 	do {		/* For all sections */
1380 		u32 sz;
1381 
1382 		if (!sect_rest) {
1383 			sect_rest = msg_sect[++curr_sect].iov_len;
1384 			sect_crs = (const unchar *)msg_sect[curr_sect].iov_base;
1385 		}
1386 
1387 		if (sect_rest < fragm_rest)
1388 			sz = sect_rest;
1389 		else
1390 			sz = fragm_rest;
1391 
1392 		if (likely(!sender->user_port)) {
1393 			if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) {
1394 error:
1395 				for (; buf_chain; buf_chain = buf) {
1396 					buf = buf_chain->next;
1397 					buf_discard(buf_chain);
1398 				}
1399 				return -EFAULT;
1400 			}
1401 		} else
1402 			skb_copy_to_linear_data_offset(buf, fragm_crs,
1403 						       sect_crs, sz);
1404 		sect_crs += sz;
1405 		sect_rest -= sz;
1406 		fragm_crs += sz;
1407 		fragm_rest -= sz;
1408 		rest -= sz;
1409 
1410 		if (!fragm_rest && rest) {
1411 
1412 			/* Initiate new fragment: */
1413 			if (rest <= fragm_sz) {
1414 				fragm_sz = rest;
1415 				msg_set_type(&fragm_hdr,LAST_FRAGMENT);
1416 			} else {
1417 				msg_set_type(&fragm_hdr, FRAGMENT);
1418 			}
1419 			msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
1420 			msg_set_fragm_no(&fragm_hdr, ++fragm_no);
1421 			prev = buf;
1422 			buf = buf_acquire(fragm_sz + INT_H_SIZE);
1423 			if (!buf)
1424 				goto error;
1425 
1426 			buf->next = NULL;
1427 			prev->next = buf;
1428 			skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
1429 			fragm_crs = INT_H_SIZE;
1430 			fragm_rest = fragm_sz;
1431 			msg_dbg(buf_msg(buf),"  >BUILD>");
1432 		}
1433 	}
1434 	while (rest > 0);
1435 
1436 	/*
1437 	 * Now we have a buffer chain. Select a link and check
1438 	 * that packet size is still OK
1439 	 */
1440 	node = tipc_node_select(destaddr, sender->publ.ref & 1);
1441 	if (likely(node)) {
1442 		tipc_node_lock(node);
1443 		l_ptr = node->active_links[sender->publ.ref & 1];
1444 		if (!l_ptr) {
1445 			tipc_node_unlock(node);
1446 			goto reject;
1447 		}
1448 		if (l_ptr->max_pkt < max_pkt) {
1449 			sender->publ.max_pkt = l_ptr->max_pkt;
1450 			tipc_node_unlock(node);
1451 			for (; buf_chain; buf_chain = buf) {
1452 				buf = buf_chain->next;
1453 				buf_discard(buf_chain);
1454 			}
1455 			goto again;
1456 		}
1457 	} else {
1458 reject:
1459 		for (; buf_chain; buf_chain = buf) {
1460 			buf = buf_chain->next;
1461 			buf_discard(buf_chain);
1462 		}
1463 		return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect,
1464 						 TIPC_ERR_NO_NODE);
1465 	}
1466 
1467 	/* Append whole chain to send queue: */
1468 
1469 	buf = buf_chain;
1470 	l_ptr->long_msg_seq_no = mod(l_ptr->long_msg_seq_no + 1);
1471 	if (!l_ptr->next_out)
1472 		l_ptr->next_out = buf_chain;
1473 	l_ptr->stats.sent_fragmented++;
1474 	while (buf) {
1475 		struct sk_buff *next = buf->next;
1476 		struct tipc_msg *msg = buf_msg(buf);
1477 
1478 		l_ptr->stats.sent_fragments++;
1479 		msg_set_long_msgno(msg, l_ptr->long_msg_seq_no);
1480 		link_add_to_outqueue(l_ptr, buf, msg);
1481 		msg_dbg(msg, ">ADD>");
1482 		buf = next;
1483 	}
1484 
1485 	/* Send it, if possible: */
1486 
1487 	tipc_link_push_queue(l_ptr);
1488 	tipc_node_unlock(node);
1489 	return dsz;
1490 }
1491 
1492 /*
1493  * tipc_link_push_packet: Push one unsent packet to the media
1494  */
1495 u32 tipc_link_push_packet(struct link *l_ptr)
1496 {
1497 	struct sk_buff *buf = l_ptr->first_out;
1498 	u32 r_q_size = l_ptr->retransm_queue_size;
1499 	u32 r_q_head = l_ptr->retransm_queue_head;
1500 
1501 	/* Step to position where retransmission failed, if any,    */
1502 	/* consider that buffers may have been released in meantime */
1503 
1504 	if (r_q_size && buf) {
1505 		u32 last = lesser(mod(r_q_head + r_q_size),
1506 				  link_last_sent(l_ptr));
1507 		u32 first = msg_seqno(buf_msg(buf));
1508 
1509 		while (buf && less(first, r_q_head)) {
1510 			first = mod(first + 1);
1511 			buf = buf->next;
1512 		}
1513 		l_ptr->retransm_queue_head = r_q_head = first;
1514 		l_ptr->retransm_queue_size = r_q_size = mod(last - first);
1515 	}
1516 
1517 	/* Continue retransmission now, if there is anything: */
1518 
1519 	if (r_q_size && buf) {
1520 		msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
1521 		msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
1522 		if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1523 			msg_dbg(buf_msg(buf), ">DEF-RETR>");
1524 			l_ptr->retransm_queue_head = mod(++r_q_head);
1525 			l_ptr->retransm_queue_size = --r_q_size;
1526 			l_ptr->stats.retransmitted++;
1527 			return 0;
1528 		} else {
1529 			l_ptr->stats.bearer_congs++;
1530 			msg_dbg(buf_msg(buf), "|>DEF-RETR>");
1531 			return PUSH_FAILED;
1532 		}
1533 	}
1534 
1535 	/* Send deferred protocol message, if any: */
1536 
1537 	buf = l_ptr->proto_msg_queue;
1538 	if (buf) {
1539 		msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
1540 		msg_set_bcast_ack(buf_msg(buf),l_ptr->owner->bclink.last_in);
1541 		if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1542 			msg_dbg(buf_msg(buf), ">DEF-PROT>");
1543 			l_ptr->unacked_window = 0;
1544 			buf_discard(buf);
1545 			l_ptr->proto_msg_queue = NULL;
1546 			return 0;
1547 		} else {
1548 			msg_dbg(buf_msg(buf), "|>DEF-PROT>");
1549 			l_ptr->stats.bearer_congs++;
1550 			return PUSH_FAILED;
1551 		}
1552 	}
1553 
1554 	/* Send one deferred data message, if send window not full: */
1555 
1556 	buf = l_ptr->next_out;
1557 	if (buf) {
1558 		struct tipc_msg *msg = buf_msg(buf);
1559 		u32 next = msg_seqno(msg);
1560 		u32 first = msg_seqno(buf_msg(l_ptr->first_out));
1561 
1562 		if (mod(next - first) < l_ptr->queue_limit[0]) {
1563 			msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1564 			msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1565 			if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1566 				if (msg_user(msg) == MSG_BUNDLER)
1567 					msg_set_type(msg, CLOSED_MSG);
1568 				msg_dbg(msg, ">PUSH-DATA>");
1569 				l_ptr->next_out = buf->next;
1570 				return 0;
1571 			} else {
1572 				msg_dbg(msg, "|PUSH-DATA|");
1573 				l_ptr->stats.bearer_congs++;
1574 				return PUSH_FAILED;
1575 			}
1576 		}
1577 	}
1578 	return PUSH_FINISHED;
1579 }
1580 
1581 /*
1582  * push_queue(): push out the unsent messages of a link where
1583  *               congestion has abated. Node is locked
1584  */
1585 void tipc_link_push_queue(struct link *l_ptr)
1586 {
1587 	u32 res;
1588 
1589 	if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr))
1590 		return;
1591 
1592 	do {
1593 		res = tipc_link_push_packet(l_ptr);
1594 	} while (!res);
1595 
1596 	if (res == PUSH_FAILED)
1597 		tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1598 }
1599 
1600 static void link_reset_all(unsigned long addr)
1601 {
1602 	struct tipc_node *n_ptr;
1603 	char addr_string[16];
1604 	u32 i;
1605 
1606 	read_lock_bh(&tipc_net_lock);
1607 	n_ptr = tipc_node_find((u32)addr);
1608 	if (!n_ptr) {
1609 		read_unlock_bh(&tipc_net_lock);
1610 		return;	/* node no longer exists */
1611 	}
1612 
1613 	tipc_node_lock(n_ptr);
1614 
1615 	warn("Resetting all links to %s\n",
1616 	     tipc_addr_string_fill(addr_string, n_ptr->addr));
1617 
1618 	for (i = 0; i < MAX_BEARERS; i++) {
1619 		if (n_ptr->links[i]) {
1620 			link_print(n_ptr->links[i], TIPC_OUTPUT,
1621 				   "Resetting link\n");
1622 			tipc_link_reset(n_ptr->links[i]);
1623 		}
1624 	}
1625 
1626 	tipc_node_unlock(n_ptr);
1627 	read_unlock_bh(&tipc_net_lock);
1628 }
1629 
1630 static void link_retransmit_failure(struct link *l_ptr, struct sk_buff *buf)
1631 {
1632 	struct tipc_msg *msg = buf_msg(buf);
1633 
1634 	warn("Retransmission failure on link <%s>\n", l_ptr->name);
1635 	tipc_msg_dbg(TIPC_OUTPUT, msg, ">RETR-FAIL>");
1636 
1637 	if (l_ptr->addr) {
1638 
1639 		/* Handle failure on standard link */
1640 
1641 		link_print(l_ptr, TIPC_OUTPUT, "Resetting link\n");
1642 		tipc_link_reset(l_ptr);
1643 
1644 	} else {
1645 
1646 		/* Handle failure on broadcast link */
1647 
1648 		struct tipc_node *n_ptr;
1649 		char addr_string[16];
1650 
1651 		tipc_printf(TIPC_OUTPUT, "Msg seq number: %u,  ", msg_seqno(msg));
1652 		tipc_printf(TIPC_OUTPUT, "Outstanding acks: %lu\n",
1653 				     (unsigned long) TIPC_SKB_CB(buf)->handle);
1654 
1655 		n_ptr = l_ptr->owner->next;
1656 		tipc_node_lock(n_ptr);
1657 
1658 		tipc_addr_string_fill(addr_string, n_ptr->addr);
1659 		tipc_printf(TIPC_OUTPUT, "Multicast link info for %s\n", addr_string);
1660 		tipc_printf(TIPC_OUTPUT, "Supported: %d,  ", n_ptr->bclink.supported);
1661 		tipc_printf(TIPC_OUTPUT, "Acked: %u\n", n_ptr->bclink.acked);
1662 		tipc_printf(TIPC_OUTPUT, "Last in: %u,  ", n_ptr->bclink.last_in);
1663 		tipc_printf(TIPC_OUTPUT, "Gap after: %u,  ", n_ptr->bclink.gap_after);
1664 		tipc_printf(TIPC_OUTPUT, "Gap to: %u\n", n_ptr->bclink.gap_to);
1665 		tipc_printf(TIPC_OUTPUT, "Nack sync: %u\n\n", n_ptr->bclink.nack_sync);
1666 
1667 		tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr);
1668 
1669 		tipc_node_unlock(n_ptr);
1670 
1671 		l_ptr->stale_count = 0;
1672 	}
1673 }
1674 
1675 void tipc_link_retransmit(struct link *l_ptr, struct sk_buff *buf,
1676 			  u32 retransmits)
1677 {
1678 	struct tipc_msg *msg;
1679 
1680 	if (!buf)
1681 		return;
1682 
1683 	msg = buf_msg(buf);
1684 
1685 	dbg("Retransmitting %u in link %x\n", retransmits, l_ptr);
1686 
1687 	if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
1688 		if (l_ptr->retransm_queue_size == 0) {
1689 			msg_dbg(msg, ">NO_RETR->BCONG>");
1690 			dbg_print_link(l_ptr, "   ");
1691 			l_ptr->retransm_queue_head = msg_seqno(msg);
1692 			l_ptr->retransm_queue_size = retransmits;
1693 		} else {
1694 			err("Unexpected retransmit on link %s (qsize=%d)\n",
1695 			    l_ptr->name, l_ptr->retransm_queue_size);
1696 		}
1697 		return;
1698 	} else {
1699 		/* Detect repeated retransmit failures on uncongested bearer */
1700 
1701 		if (l_ptr->last_retransmitted == msg_seqno(msg)) {
1702 			if (++l_ptr->stale_count > 100) {
1703 				link_retransmit_failure(l_ptr, buf);
1704 				return;
1705 			}
1706 		} else {
1707 			l_ptr->last_retransmitted = msg_seqno(msg);
1708 			l_ptr->stale_count = 1;
1709 		}
1710 	}
1711 
1712 	while (retransmits && (buf != l_ptr->next_out) && buf) {
1713 		msg = buf_msg(buf);
1714 		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1715 		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1716 		if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1717 			msg_dbg(buf_msg(buf), ">RETR>");
1718 			buf = buf->next;
1719 			retransmits--;
1720 			l_ptr->stats.retransmitted++;
1721 		} else {
1722 			tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1723 			l_ptr->stats.bearer_congs++;
1724 			l_ptr->retransm_queue_head = msg_seqno(buf_msg(buf));
1725 			l_ptr->retransm_queue_size = retransmits;
1726 			return;
1727 		}
1728 	}
1729 
1730 	l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0;
1731 }
1732 
1733 /**
1734  * link_insert_deferred_queue - insert deferred messages back into receive chain
1735  */
1736 
1737 static struct sk_buff *link_insert_deferred_queue(struct link *l_ptr,
1738 						  struct sk_buff *buf)
1739 {
1740 	u32 seq_no;
1741 
1742 	if (l_ptr->oldest_deferred_in == NULL)
1743 		return buf;
1744 
1745 	seq_no = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
1746 	if (seq_no == mod(l_ptr->next_in_no)) {
1747 		l_ptr->newest_deferred_in->next = buf;
1748 		buf = l_ptr->oldest_deferred_in;
1749 		l_ptr->oldest_deferred_in = NULL;
1750 		l_ptr->deferred_inqueue_sz = 0;
1751 	}
1752 	return buf;
1753 }
1754 
1755 /**
1756  * link_recv_buf_validate - validate basic format of received message
1757  *
1758  * This routine ensures a TIPC message has an acceptable header, and at least
1759  * as much data as the header indicates it should.  The routine also ensures
1760  * that the entire message header is stored in the main fragment of the message
1761  * buffer, to simplify future access to message header fields.
1762  *
1763  * Note: Having extra info present in the message header or data areas is OK.
1764  * TIPC will ignore the excess, under the assumption that it is optional info
1765  * introduced by a later release of the protocol.
1766  */
1767 
1768 static int link_recv_buf_validate(struct sk_buff *buf)
1769 {
1770 	static u32 min_data_hdr_size[8] = {
1771 		SHORT_H_SIZE, MCAST_H_SIZE, LONG_H_SIZE, DIR_MSG_H_SIZE,
1772 		MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE
1773 		};
1774 
1775 	struct tipc_msg *msg;
1776 	u32 tipc_hdr[2];
1777 	u32 size;
1778 	u32 hdr_size;
1779 	u32 min_hdr_size;
1780 
1781 	if (unlikely(buf->len < MIN_H_SIZE))
1782 		return 0;
1783 
1784 	msg = skb_header_pointer(buf, 0, sizeof(tipc_hdr), tipc_hdr);
1785 	if (msg == NULL)
1786 		return 0;
1787 
1788 	if (unlikely(msg_version(msg) != TIPC_VERSION))
1789 		return 0;
1790 
1791 	size = msg_size(msg);
1792 	hdr_size = msg_hdr_sz(msg);
1793 	min_hdr_size = msg_isdata(msg) ?
1794 		min_data_hdr_size[msg_type(msg)] : INT_H_SIZE;
1795 
1796 	if (unlikely((hdr_size < min_hdr_size) ||
1797 		     (size < hdr_size) ||
1798 		     (buf->len < size) ||
1799 		     (size - hdr_size > TIPC_MAX_USER_MSG_SIZE)))
1800 		return 0;
1801 
1802 	return pskb_may_pull(buf, hdr_size);
1803 }
1804 
/*
 * tipc_recv_msg(): process a chain of buffers received on a bearer.
 *
 * @head:   first buffer of a chain linked through ->next
 * @tb_ptr: bearer the buffers arrived on (cast to struct bearer below)
 *
 * The whole chain is processed with tipc_net_lock read-held. Each buffer
 * is unlinked from the chain, validated, and then either passed to one of
 * the receive routines called below (followed by 'continue') or discarded
 * at the 'cont' label. Buffers taken from a link's deferred queue, and a
 * buffer that has to be re-examined after a link state change, are put
 * back at the front of the chain.
 *
 * NOTE(review): the 'continue' paths do not discard the buffer here, so
 * the called routine presumably takes ownership of it - confirm.
 */
void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *tb_ptr)
{
	read_lock_bh(&tipc_net_lock);
	while (head) {
		struct bearer *b_ptr = (struct bearer *)tb_ptr;
		struct tipc_node *n_ptr;
		struct link *l_ptr;
		struct sk_buff *crs;
		struct sk_buff *buf = head;
		struct tipc_msg *msg;
		u32 seq_no;
		u32 ackd;
		u32 released = 0;
		int type;

		/* Detach the current buffer from the chain */
		head = head->next;

		/* Ensure message is well-formed */

		if (unlikely(!link_recv_buf_validate(buf)))
			goto cont;

		/* Ensure message data is a single contiguous unit */

		if (unlikely(buf_linearize(buf))) {
			goto cont;
		}

		/* Handle arrival of a non-unicast link message */

		msg = buf_msg(buf);

		if (unlikely(msg_non_seq(msg))) {
			if (msg_user(msg) ==  LINK_CONFIG)
				tipc_disc_recv_msg(buf, b_ptr);
			else
				tipc_bclink_recv_pkt(buf);
			continue;
		}

		/* Discard non-short messages destined for another node */

		if (unlikely(!msg_short(msg) &&
			     (msg_destnode(msg) != tipc_own_addr)))
			goto cont;

		/* Discard non-routeable messages destined for another node */

		if (unlikely(!msg_isdata(msg) &&
			     (msg_destnode(msg) != tipc_own_addr))) {
			if ((msg_user(msg) != CONN_MANAGER) &&
			    (msg_user(msg) != MSG_FRAGMENTER))
				goto cont;
		}

		/* Locate unicast link endpoint that should handle message */

		n_ptr = tipc_node_find(msg_prevnode(msg));
		if (unlikely(!n_ptr))
			goto cont;
		tipc_node_lock(n_ptr);

		l_ptr = n_ptr->links[b_ptr->identity];
		if (unlikely(!l_ptr)) {
			tipc_node_unlock(n_ptr);
			goto cont;
		}

		/* Validate message sequence number info */

		seq_no = msg_seqno(msg);
		ackd = msg_ack(msg);

		/* Release acked messages */

		if (less(n_ptr->bclink.acked, msg_bcast_ack(msg))) {
			if (tipc_node_is_up(n_ptr) && n_ptr->bclink.supported)
				tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
		}

		/*
		 * Walk the send queue from its head, freeing every buffer
		 * that has been sent (i.e. precedes next_out) and whose
		 * sequence number is covered by the received ack
		 */
		crs = l_ptr->first_out;
		while ((crs != l_ptr->next_out) &&
		       less_eq(msg_seqno(buf_msg(crs)), ackd)) {
			struct sk_buff *next = crs->next;

			buf_discard(crs);
			crs = next;
			released++;
		}
		if (released) {
			l_ptr->first_out = crs;
			l_ptr->out_queue_size -= released;
		}

		/* Try sending any messages link endpoint has pending */

		if (unlikely(l_ptr->next_out))
			tipc_link_push_queue(l_ptr);
		if (unlikely(!list_empty(&l_ptr->waiting_ports)))
			tipc_link_wakeup_ports(l_ptr, 0);
		if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
			l_ptr->stats.sent_acks++;
			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
		}

		/* Now (finally!) process the incoming message */

protocol_check:
		if (likely(link_working_working(l_ptr))) {
			if (likely(seq_no == mod(l_ptr->next_in_no))) {
				/* In-sequence message: accept it */
				l_ptr->next_in_no++;
				if (unlikely(l_ptr->oldest_deferred_in))
					head = link_insert_deferred_queue(l_ptr,
									  head);
				if (likely(msg_is_dest(msg, tipc_own_addr))) {
deliver:
					/* Node lock is dropped before handing buf on */
					if (likely(msg_isdata(msg))) {
						tipc_node_unlock(n_ptr);
						tipc_port_recv_msg(buf);
						continue;
					}
					switch (msg_user(msg)) {
					case MSG_BUNDLER:
						l_ptr->stats.recv_bundles++;
						l_ptr->stats.recv_bundled +=
							msg_msgcnt(msg);
						tipc_node_unlock(n_ptr);
						tipc_link_recv_bundle(buf);
						continue;
					case ROUTE_DISTRIBUTOR:
						tipc_node_unlock(n_ptr);
						tipc_cltr_recv_routing_table(buf);
						continue;
					case NAME_DISTRIBUTOR:
						tipc_node_unlock(n_ptr);
						tipc_named_recv(buf);
						continue;
					case CONN_MANAGER:
						tipc_node_unlock(n_ptr);
						tipc_port_recv_proto_msg(buf);
						continue;
					case MSG_FRAGMENTER:
						l_ptr->stats.recv_fragments++;
						/*
						 * A non-zero result means buf/msg now
						 * refer to a message to be delivered
						 */
						if (tipc_link_recv_fragment(&l_ptr->defragm_buf,
									    &buf, &msg)) {
							l_ptr->stats.recv_fragmented++;
							goto deliver;
						}
						break;
					case CHANGEOVER_PROTOCOL:
						/* Save type: buf/msg may be replaced below */
						type = msg_type(msg);
						if (link_recv_changeover_msg(&l_ptr, &buf)) {
							msg = buf_msg(buf);
							seq_no = msg_seqno(msg);
							if (type == ORIGINAL_MSG)
								goto deliver;
							goto protocol_check;
						}
						break;
					}
				}
				/* Not delivered locally: hand over to routing code */
				tipc_node_unlock(n_ptr);
				tipc_net_route_msg(buf);
				continue;
			}
			/* Sequence number is not the expected one */
			link_handle_out_of_seq_msg(l_ptr, buf);
			head = link_insert_deferred_queue(l_ptr, head);
			tipc_node_unlock(n_ptr);
			continue;
		}

		/* Link is not in the working-working state */

		if (msg_user(msg) == LINK_PROTOCOL) {
			link_recv_proto_msg(l_ptr, buf);
			head = link_insert_deferred_queue(l_ptr, head);
			tipc_node_unlock(n_ptr);
			continue;
		}
		msg_dbg(msg,"NSEQ<REC<");
		link_state_event(l_ptr, TRAFFIC_MSG_EVT);

		/* The traffic event may have moved the link to working-working */
		if (link_working_working(l_ptr)) {
			/* Re-insert in front of queue */
			msg_dbg(msg,"RECV-REINS:");
			buf->next = head;
			head = buf;
			tipc_node_unlock(n_ptr);
			continue;
		}
		tipc_node_unlock(n_ptr);
cont:
		buf_discard(buf);
	}
	read_unlock_bh(&tipc_net_lock);
}
1997 
1998 /*
1999  * link_defer_buf(): Sort a received out-of-sequence packet
2000  *                   into the deferred reception queue.
2001  * Returns the increase of the queue length,i.e. 0 or 1
2002  */
2003 
2004 u32 tipc_link_defer_pkt(struct sk_buff **head,
2005 			struct sk_buff **tail,
2006 			struct sk_buff *buf)
2007 {
2008 	struct sk_buff *prev = NULL;
2009 	struct sk_buff *crs = *head;
2010 	u32 seq_no = msg_seqno(buf_msg(buf));
2011 
2012 	buf->next = NULL;
2013 
2014 	/* Empty queue ? */
2015 	if (*head == NULL) {
2016 		*head = *tail = buf;
2017 		return 1;
2018 	}
2019 
2020 	/* Last ? */
2021 	if (less(msg_seqno(buf_msg(*tail)), seq_no)) {
2022 		(*tail)->next = buf;
2023 		*tail = buf;
2024 		return 1;
2025 	}
2026 
2027 	/* Scan through queue and sort it in */
2028 	do {
2029 		struct tipc_msg *msg = buf_msg(crs);
2030 
2031 		if (less(seq_no, msg_seqno(msg))) {
2032 			buf->next = crs;
2033 			if (prev)
2034 				prev->next = buf;
2035 			else
2036 				*head = buf;
2037 			return 1;
2038 		}
2039 		if (seq_no == msg_seqno(msg)) {
2040 			break;
2041 		}
2042 		prev = crs;
2043 		crs = crs->next;
2044 	}
2045 	while (crs);
2046 
2047 	/* Message is a duplicate of an existing message */
2048 
2049 	buf_discard(buf);
2050 	return 0;
2051 }
2052 
2053 /**
2054  * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet
2055  */
2056 
2057 static void link_handle_out_of_seq_msg(struct link *l_ptr,
2058 				       struct sk_buff *buf)
2059 {
2060 	u32 seq_no = msg_seqno(buf_msg(buf));
2061 
2062 	if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) {
2063 		link_recv_proto_msg(l_ptr, buf);
2064 		return;
2065 	}
2066 
2067 	dbg("rx OOS msg: seq_no %u, expecting %u (%u)\n",
2068 	    seq_no, mod(l_ptr->next_in_no), l_ptr->next_in_no);
2069 
2070 	/* Record OOS packet arrival (force mismatch on next timeout) */
2071 
2072 	l_ptr->checkpoint--;
2073 
2074 	/*
2075 	 * Discard packet if a duplicate; otherwise add it to deferred queue
2076 	 * and notify peer of gap as per protocol specification
2077 	 */
2078 
2079 	if (less(seq_no, mod(l_ptr->next_in_no))) {
2080 		l_ptr->stats.duplicates++;
2081 		buf_discard(buf);
2082 		return;
2083 	}
2084 
2085 	if (tipc_link_defer_pkt(&l_ptr->oldest_deferred_in,
2086 				&l_ptr->newest_deferred_in, buf)) {
2087 		l_ptr->deferred_inqueue_sz++;
2088 		l_ptr->stats.deferred_recv++;
2089 		if ((l_ptr->deferred_inqueue_sz % 16) == 1)
2090 			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
2091 	} else
2092 		l_ptr->stats.duplicates++;
2093 }
2094 
2095 /*
2096  * Send protocol message to the other endpoint.
2097  */
2098 void tipc_link_send_proto_msg(struct link *l_ptr, u32 msg_typ, int probe_msg,
2099 			      u32 gap, u32 tolerance, u32 priority, u32 ack_mtu)
2100 {
2101 	struct sk_buff *buf = NULL;
2102 	struct tipc_msg *msg = l_ptr->pmsg;
2103 	u32 msg_size = sizeof(l_ptr->proto_msg);
2104 
2105 	if (link_blocked(l_ptr))
2106 		return;
2107 	msg_set_type(msg, msg_typ);
2108 	msg_set_net_plane(msg, l_ptr->b_ptr->net_plane);
2109 	msg_set_bcast_ack(msg, mod(l_ptr->owner->bclink.last_in));
2110 	msg_set_last_bcast(msg, tipc_bclink_get_last_sent());
2111 
2112 	if (msg_typ == STATE_MSG) {
2113 		u32 next_sent = mod(l_ptr->next_out_no);
2114 
2115 		if (!tipc_link_is_up(l_ptr))
2116 			return;
2117 		if (l_ptr->next_out)
2118 			next_sent = msg_seqno(buf_msg(l_ptr->next_out));
2119 		msg_set_next_sent(msg, next_sent);
2120 		if (l_ptr->oldest_deferred_in) {
2121 			u32 rec = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
2122 			gap = mod(rec - mod(l_ptr->next_in_no));
2123 		}
2124 		msg_set_seq_gap(msg, gap);
2125 		if (gap)
2126 			l_ptr->stats.sent_nacks++;
2127 		msg_set_link_tolerance(msg, tolerance);
2128 		msg_set_linkprio(msg, priority);
2129 		msg_set_max_pkt(msg, ack_mtu);
2130 		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
2131 		msg_set_probe(msg, probe_msg != 0);
2132 		if (probe_msg) {
2133 			u32 mtu = l_ptr->max_pkt;
2134 
2135 			if ((mtu < l_ptr->max_pkt_target) &&
2136 			    link_working_working(l_ptr) &&
2137 			    l_ptr->fsm_msg_cnt) {
2138 				msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
2139 				if (l_ptr->max_pkt_probes == 10) {
2140 					l_ptr->max_pkt_target = (msg_size - 4);
2141 					l_ptr->max_pkt_probes = 0;
2142 					msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
2143 				}
2144 				l_ptr->max_pkt_probes++;
2145 			}
2146 
2147 			l_ptr->stats.sent_probes++;
2148 		}
2149 		l_ptr->stats.sent_states++;
2150 	} else {		/* RESET_MSG or ACTIVATE_MSG */
2151 		msg_set_ack(msg, mod(l_ptr->reset_checkpoint - 1));
2152 		msg_set_seq_gap(msg, 0);
2153 		msg_set_next_sent(msg, 1);
2154 		msg_set_link_tolerance(msg, l_ptr->tolerance);
2155 		msg_set_linkprio(msg, l_ptr->priority);
2156 		msg_set_max_pkt(msg, l_ptr->max_pkt_target);
2157 	}
2158 
2159 	if (tipc_node_has_redundant_links(l_ptr->owner)) {
2160 		msg_set_redundant_link(msg);
2161 	} else {
2162 		msg_clear_redundant_link(msg);
2163 	}
2164 	msg_set_linkprio(msg, l_ptr->priority);
2165 
2166 	/* Ensure sequence number will not fit : */
2167 
2168 	msg_set_seqno(msg, mod(l_ptr->next_out_no + (0xffff/2)));
2169 
2170 	/* Congestion? */
2171 
2172 	if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
2173 		if (!l_ptr->proto_msg_queue) {
2174 			l_ptr->proto_msg_queue =
2175 				buf_acquire(sizeof(l_ptr->proto_msg));
2176 		}
2177 		buf = l_ptr->proto_msg_queue;
2178 		if (!buf)
2179 			return;
2180 		skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
2181 		return;
2182 	}
2183 	msg_set_timestamp(msg, jiffies_to_msecs(jiffies));
2184 
2185 	/* Message can be sent */
2186 
2187 	msg_dbg(msg, ">>");
2188 
2189 	buf = buf_acquire(msg_size);
2190 	if (!buf)
2191 		return;
2192 
2193 	skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
2194 	msg_set_size(buf_msg(buf), msg_size);
2195 
2196 	if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
2197 		l_ptr->unacked_window = 0;
2198 		buf_discard(buf);
2199 		return;
2200 	}
2201 
2202 	/* New congestion */
2203 	tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
2204 	l_ptr->proto_msg_queue = buf;
2205 	l_ptr->stats.bearer_congs++;
2206 }
2207 
2208 /*
2209  * Receive protocol message :
2210  * Note that network plane id propagates through the network, and may
2211  * change at any time. The node with lowest address rules
2212  */
2213 
2214 static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf)
2215 {
2216 	u32 rec_gap = 0;
2217 	u32 max_pkt_info;
2218 	u32 max_pkt_ack;
2219 	u32 msg_tol;
2220 	struct tipc_msg *msg = buf_msg(buf);
2221 
2222 	dbg("AT(%u):", jiffies_to_msecs(jiffies));
2223 	msg_dbg(msg, "<<");
2224 	if (link_blocked(l_ptr))
2225 		goto exit;
2226 
2227 	/* record unnumbered packet arrival (force mismatch on next timeout) */
2228 
2229 	l_ptr->checkpoint--;
2230 
2231 	if (l_ptr->b_ptr->net_plane != msg_net_plane(msg))
2232 		if (tipc_own_addr > msg_prevnode(msg))
2233 			l_ptr->b_ptr->net_plane = msg_net_plane(msg);
2234 
2235 	l_ptr->owner->permit_changeover = msg_redundant_link(msg);
2236 
2237 	switch (msg_type(msg)) {
2238 
2239 	case RESET_MSG:
2240 		if (!link_working_unknown(l_ptr) &&
2241 		    (l_ptr->peer_session != INVALID_SESSION)) {
2242 			if (msg_session(msg) == l_ptr->peer_session) {
2243 				dbg("Duplicate RESET: %u<->%u\n",
2244 				    msg_session(msg), l_ptr->peer_session);
2245 				break; /* duplicate: ignore */
2246 			}
2247 		}
2248 		/* fall thru' */
2249 	case ACTIVATE_MSG:
2250 		/* Update link settings according other endpoint's values */
2251 
2252 		strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg));
2253 
2254 		if ((msg_tol = msg_link_tolerance(msg)) &&
2255 		    (msg_tol > l_ptr->tolerance))
2256 			link_set_supervision_props(l_ptr, msg_tol);
2257 
2258 		if (msg_linkprio(msg) > l_ptr->priority)
2259 			l_ptr->priority = msg_linkprio(msg);
2260 
2261 		max_pkt_info = msg_max_pkt(msg);
2262 		if (max_pkt_info) {
2263 			if (max_pkt_info < l_ptr->max_pkt_target)
2264 				l_ptr->max_pkt_target = max_pkt_info;
2265 			if (l_ptr->max_pkt > l_ptr->max_pkt_target)
2266 				l_ptr->max_pkt = l_ptr->max_pkt_target;
2267 		} else {
2268 			l_ptr->max_pkt = l_ptr->max_pkt_target;
2269 		}
2270 		l_ptr->owner->bclink.supported = (max_pkt_info != 0);
2271 
2272 		link_state_event(l_ptr, msg_type(msg));
2273 
2274 		l_ptr->peer_session = msg_session(msg);
2275 		l_ptr->peer_bearer_id = msg_bearer_id(msg);
2276 
2277 		/* Synchronize broadcast sequence numbers */
2278 		if (!tipc_node_has_redundant_links(l_ptr->owner)) {
2279 			l_ptr->owner->bclink.last_in = mod(msg_last_bcast(msg));
2280 		}
2281 		break;
2282 	case STATE_MSG:
2283 
2284 		if ((msg_tol = msg_link_tolerance(msg)))
2285 			link_set_supervision_props(l_ptr, msg_tol);
2286 
2287 		if (msg_linkprio(msg) &&
2288 		    (msg_linkprio(msg) != l_ptr->priority)) {
2289 			warn("Resetting link <%s>, priority change %u->%u\n",
2290 			     l_ptr->name, l_ptr->priority, msg_linkprio(msg));
2291 			l_ptr->priority = msg_linkprio(msg);
2292 			tipc_link_reset(l_ptr); /* Enforce change to take effect */
2293 			break;
2294 		}
2295 		link_state_event(l_ptr, TRAFFIC_MSG_EVT);
2296 		l_ptr->stats.recv_states++;
2297 		if (link_reset_unknown(l_ptr))
2298 			break;
2299 
2300 		if (less_eq(mod(l_ptr->next_in_no), msg_next_sent(msg))) {
2301 			rec_gap = mod(msg_next_sent(msg) -
2302 				      mod(l_ptr->next_in_no));
2303 		}
2304 
2305 		max_pkt_ack = msg_max_pkt(msg);
2306 		if (max_pkt_ack > l_ptr->max_pkt) {
2307 			dbg("Link <%s> updated MTU %u -> %u\n",
2308 			    l_ptr->name, l_ptr->max_pkt, max_pkt_ack);
2309 			l_ptr->max_pkt = max_pkt_ack;
2310 			l_ptr->max_pkt_probes = 0;
2311 		}
2312 
2313 		max_pkt_ack = 0;
2314 		if (msg_probe(msg)) {
2315 			l_ptr->stats.recv_probes++;
2316 			if (msg_size(msg) > sizeof(l_ptr->proto_msg)) {
2317 				max_pkt_ack = msg_size(msg);
2318 			}
2319 		}
2320 
2321 		/* Protocol message before retransmits, reduce loss risk */
2322 
2323 		tipc_bclink_check_gap(l_ptr->owner, msg_last_bcast(msg));
2324 
2325 		if (rec_gap || (msg_probe(msg))) {
2326 			tipc_link_send_proto_msg(l_ptr, STATE_MSG,
2327 						 0, rec_gap, 0, 0, max_pkt_ack);
2328 		}
2329 		if (msg_seq_gap(msg)) {
2330 			msg_dbg(msg, "With Gap:");
2331 			l_ptr->stats.recv_nacks++;
2332 			tipc_link_retransmit(l_ptr, l_ptr->first_out,
2333 					     msg_seq_gap(msg));
2334 		}
2335 		break;
2336 	default:
2337 		msg_dbg(buf_msg(buf), "<DISCARDING UNKNOWN<");
2338 	}
2339 exit:
2340 	buf_discard(buf);
2341 }
2342 
2343 
2344 /*
2345  * tipc_link_tunnel(): Send one message via a link belonging to
2346  * another bearer. Owner node is locked.
2347  */
2348 void tipc_link_tunnel(struct link *l_ptr,
2349 		      struct tipc_msg *tunnel_hdr,
2350 		      struct tipc_msg  *msg,
2351 		      u32 selector)
2352 {
2353 	struct link *tunnel;
2354 	struct sk_buff *buf;
2355 	u32 length = msg_size(msg);
2356 
2357 	tunnel = l_ptr->owner->active_links[selector & 1];
2358 	if (!tipc_link_is_up(tunnel)) {
2359 		warn("Link changeover error, "
2360 		     "tunnel link no longer available\n");
2361 		return;
2362 	}
2363 	msg_set_size(tunnel_hdr, length + INT_H_SIZE);
2364 	buf = buf_acquire(length + INT_H_SIZE);
2365 	if (!buf) {
2366 		warn("Link changeover error, "
2367 		     "unable to send tunnel msg\n");
2368 		return;
2369 	}
2370 	skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE);
2371 	skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length);
2372 	dbg("%c->%c:", l_ptr->b_ptr->net_plane, tunnel->b_ptr->net_plane);
2373 	msg_dbg(buf_msg(buf), ">SEND>");
2374 	tipc_link_send_buf(tunnel, buf);
2375 }
2376 
2377 
2378 
2379 /*
2380  * changeover(): Send whole message queue via the remaining link
2381  *               Owner node is locked.
2382  */
2383 
2384 void tipc_link_changeover(struct link *l_ptr)
2385 {
2386 	u32 msgcount = l_ptr->out_queue_size;
2387 	struct sk_buff *crs = l_ptr->first_out;
2388 	struct link *tunnel = l_ptr->owner->active_links[0];
2389 	struct tipc_msg tunnel_hdr;
2390 	int split_bundles;
2391 
2392 	if (!tunnel)
2393 		return;
2394 
2395 	if (!l_ptr->owner->permit_changeover) {
2396 		warn("Link changeover error, "
2397 		     "peer did not permit changeover\n");
2398 		return;
2399 	}
2400 
2401 	tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
2402 		 ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
2403 	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
2404 	msg_set_msgcnt(&tunnel_hdr, msgcount);
2405 	dbg("Link changeover requires %u tunnel messages\n", msgcount);
2406 
2407 	if (!l_ptr->first_out) {
2408 		struct sk_buff *buf;
2409 
2410 		buf = buf_acquire(INT_H_SIZE);
2411 		if (buf) {
2412 			skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE);
2413 			msg_set_size(&tunnel_hdr, INT_H_SIZE);
2414 			dbg("%c->%c:", l_ptr->b_ptr->net_plane,
2415 			    tunnel->b_ptr->net_plane);
2416 			msg_dbg(&tunnel_hdr, "EMPTY>SEND>");
2417 			tipc_link_send_buf(tunnel, buf);
2418 		} else {
2419 			warn("Link changeover error, "
2420 			     "unable to send changeover msg\n");
2421 		}
2422 		return;
2423 	}
2424 
2425 	split_bundles = (l_ptr->owner->active_links[0] !=
2426 			 l_ptr->owner->active_links[1]);
2427 
2428 	while (crs) {
2429 		struct tipc_msg *msg = buf_msg(crs);
2430 
2431 		if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
2432 			struct tipc_msg *m = msg_get_wrapped(msg);
2433 			unchar* pos = (unchar*)m;
2434 
2435 			msgcount = msg_msgcnt(msg);
2436 			while (msgcount--) {
2437 				msg_set_seqno(m,msg_seqno(msg));
2438 				tipc_link_tunnel(l_ptr, &tunnel_hdr, m,
2439 						 msg_link_selector(m));
2440 				pos += align(msg_size(m));
2441 				m = (struct tipc_msg *)pos;
2442 			}
2443 		} else {
2444 			tipc_link_tunnel(l_ptr, &tunnel_hdr, msg,
2445 					 msg_link_selector(msg));
2446 		}
2447 		crs = crs->next;
2448 	}
2449 }
2450 
2451 void tipc_link_send_duplicate(struct link *l_ptr, struct link *tunnel)
2452 {
2453 	struct sk_buff *iter;
2454 	struct tipc_msg tunnel_hdr;
2455 
2456 	tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
2457 		 DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr);
2458 	msg_set_msgcnt(&tunnel_hdr, l_ptr->out_queue_size);
2459 	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
2460 	iter = l_ptr->first_out;
2461 	while (iter) {
2462 		struct sk_buff *outbuf;
2463 		struct tipc_msg *msg = buf_msg(iter);
2464 		u32 length = msg_size(msg);
2465 
2466 		if (msg_user(msg) == MSG_BUNDLER)
2467 			msg_set_type(msg, CLOSED_MSG);
2468 		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));	/* Update */
2469 		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
2470 		msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
2471 		outbuf = buf_acquire(length + INT_H_SIZE);
2472 		if (outbuf == NULL) {
2473 			warn("Link changeover error, "
2474 			     "unable to send duplicate msg\n");
2475 			return;
2476 		}
2477 		skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE);
2478 		skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data,
2479 					       length);
2480 		dbg("%c->%c:", l_ptr->b_ptr->net_plane,
2481 		    tunnel->b_ptr->net_plane);
2482 		msg_dbg(buf_msg(outbuf), ">SEND>");
2483 		tipc_link_send_buf(tunnel, outbuf);
2484 		if (!tipc_link_is_up(l_ptr))
2485 			return;
2486 		iter = iter->next;
2487 	}
2488 }
2489 
2490 
2491 
2492 /**
2493  * buf_extract - extracts embedded TIPC message from another message
2494  * @skb: encapsulating message buffer
2495  * @from_pos: offset to extract from
2496  *
2497  * Returns a new message buffer containing an embedded message.  The
2498  * encapsulating message itself is left unchanged.
2499  */
2500 
2501 static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
2502 {
2503 	struct tipc_msg *msg = (struct tipc_msg *)(skb->data + from_pos);
2504 	u32 size = msg_size(msg);
2505 	struct sk_buff *eb;
2506 
2507 	eb = buf_acquire(size);
2508 	if (eb)
2509 		skb_copy_to_linear_data(eb, msg, size);
2510 	return eb;
2511 }
2512 
2513 /*
2514  *  link_recv_changeover_msg(): Receive tunneled packet sent
2515  *  via other link. Node is locked. Return extracted buffer.
2516  */
2517 
2518 static int link_recv_changeover_msg(struct link **l_ptr,
2519 				    struct sk_buff **buf)
2520 {
2521 	struct sk_buff *tunnel_buf = *buf;
2522 	struct link *dest_link;
2523 	struct tipc_msg *msg;
2524 	struct tipc_msg *tunnel_msg = buf_msg(tunnel_buf);
2525 	u32 msg_typ = msg_type(tunnel_msg);
2526 	u32 msg_count = msg_msgcnt(tunnel_msg);
2527 
2528 	dest_link = (*l_ptr)->owner->links[msg_bearer_id(tunnel_msg)];
2529 	if (!dest_link) {
2530 		msg_dbg(tunnel_msg, "NOLINK/<REC<");
2531 		goto exit;
2532 	}
2533 	if (dest_link == *l_ptr) {
2534 		err("Unexpected changeover message on link <%s>\n",
2535 		    (*l_ptr)->name);
2536 		goto exit;
2537 	}
2538 	dbg("%c<-%c:", dest_link->b_ptr->net_plane,
2539 	    (*l_ptr)->b_ptr->net_plane);
2540 	*l_ptr = dest_link;
2541 	msg = msg_get_wrapped(tunnel_msg);
2542 
2543 	if (msg_typ == DUPLICATE_MSG) {
2544 		if (less(msg_seqno(msg), mod(dest_link->next_in_no))) {
2545 			msg_dbg(tunnel_msg, "DROP/<REC<");
2546 			goto exit;
2547 		}
2548 		*buf = buf_extract(tunnel_buf,INT_H_SIZE);
2549 		if (*buf == NULL) {
2550 			warn("Link changeover error, duplicate msg dropped\n");
2551 			goto exit;
2552 		}
2553 		msg_dbg(tunnel_msg, "TNL<REC<");
2554 		buf_discard(tunnel_buf);
2555 		return 1;
2556 	}
2557 
2558 	/* First original message ?: */
2559 
2560 	if (tipc_link_is_up(dest_link)) {
2561 		msg_dbg(tunnel_msg, "UP/FIRST/<REC<");
2562 		info("Resetting link <%s>, changeover initiated by peer\n",
2563 		     dest_link->name);
2564 		tipc_link_reset(dest_link);
2565 		dest_link->exp_msg_count = msg_count;
2566 		dbg("Expecting %u tunnelled messages\n", msg_count);
2567 		if (!msg_count)
2568 			goto exit;
2569 	} else if (dest_link->exp_msg_count == START_CHANGEOVER) {
2570 		msg_dbg(tunnel_msg, "BLK/FIRST/<REC<");
2571 		dest_link->exp_msg_count = msg_count;
2572 		dbg("Expecting %u tunnelled messages\n", msg_count);
2573 		if (!msg_count)
2574 			goto exit;
2575 	}
2576 
2577 	/* Receive original message */
2578 
2579 	if (dest_link->exp_msg_count == 0) {
2580 		warn("Link switchover error, "
2581 		     "got too many tunnelled messages\n");
2582 		msg_dbg(tunnel_msg, "OVERDUE/DROP/<REC<");
2583 		dbg_print_link(dest_link, "LINK:");
2584 		goto exit;
2585 	}
2586 	dest_link->exp_msg_count--;
2587 	if (less(msg_seqno(msg), dest_link->reset_checkpoint)) {
2588 		msg_dbg(tunnel_msg, "DROP/DUPL/<REC<");
2589 		goto exit;
2590 	} else {
2591 		*buf = buf_extract(tunnel_buf, INT_H_SIZE);
2592 		if (*buf != NULL) {
2593 			msg_dbg(tunnel_msg, "TNL<REC<");
2594 			buf_discard(tunnel_buf);
2595 			return 1;
2596 		} else {
2597 			warn("Link changeover error, original msg dropped\n");
2598 		}
2599 	}
2600 exit:
2601 	*buf = NULL;
2602 	buf_discard(tunnel_buf);
2603 	return 0;
2604 }
2605 
2606 /*
2607  *  Bundler functionality:
2608  */
2609 void tipc_link_recv_bundle(struct sk_buff *buf)
2610 {
2611 	u32 msgcount = msg_msgcnt(buf_msg(buf));
2612 	u32 pos = INT_H_SIZE;
2613 	struct sk_buff *obuf;
2614 
2615 	msg_dbg(buf_msg(buf), "<BNDL<: ");
2616 	while (msgcount--) {
2617 		obuf = buf_extract(buf, pos);
2618 		if (obuf == NULL) {
2619 			warn("Link unable to unbundle message(s)\n");
2620 			break;
2621 		}
2622 		pos += align(msg_size(buf_msg(obuf)));
2623 		msg_dbg(buf_msg(obuf), "     /");
2624 		tipc_net_route_msg(obuf);
2625 	}
2626 	buf_discard(buf);
2627 }
2628 
2629 /*
2630  *  Fragmentation/defragmentation:
2631  */
2632 
2633 
2634 /*
2635  * tipc_link_send_long_buf: Entry for buffers needing fragmentation.
2636  * The buffer is complete, inclusive total message length.
2637  * Returns user data length.
2638  */
2639 int tipc_link_send_long_buf(struct link *l_ptr, struct sk_buff *buf)
2640 {
2641 	struct tipc_msg *inmsg = buf_msg(buf);
2642 	struct tipc_msg fragm_hdr;
2643 	u32 insize = msg_size(inmsg);
2644 	u32 dsz = msg_data_sz(inmsg);
2645 	unchar *crs = buf->data;
2646 	u32 rest = insize;
2647 	u32 pack_sz = l_ptr->max_pkt;
2648 	u32 fragm_sz = pack_sz - INT_H_SIZE;
2649 	u32 fragm_no = 1;
2650 	u32 destaddr;
2651 
2652 	if (msg_short(inmsg))
2653 		destaddr = l_ptr->addr;
2654 	else
2655 		destaddr = msg_destnode(inmsg);
2656 
2657 	if (msg_routed(inmsg))
2658 		msg_set_prevnode(inmsg, tipc_own_addr);
2659 
2660 	/* Prepare reusable fragment header: */
2661 
2662 	tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
2663 		 INT_H_SIZE, destaddr);
2664 	msg_set_link_selector(&fragm_hdr, msg_link_selector(inmsg));
2665 	msg_set_long_msgno(&fragm_hdr, mod(l_ptr->long_msg_seq_no++));
2666 	msg_set_fragm_no(&fragm_hdr, fragm_no);
2667 	l_ptr->stats.sent_fragmented++;
2668 
2669 	/* Chop up message: */
2670 
2671 	while (rest > 0) {
2672 		struct sk_buff *fragm;
2673 
2674 		if (rest <= fragm_sz) {
2675 			fragm_sz = rest;
2676 			msg_set_type(&fragm_hdr, LAST_FRAGMENT);
2677 		}
2678 		fragm = buf_acquire(fragm_sz + INT_H_SIZE);
2679 		if (fragm == NULL) {
2680 			warn("Link unable to fragment message\n");
2681 			dsz = -ENOMEM;
2682 			goto exit;
2683 		}
2684 		msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
2685 		skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE);
2686 		skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs,
2687 					       fragm_sz);
2688 		/*  Send queued messages first, if any: */
2689 
2690 		l_ptr->stats.sent_fragments++;
2691 		tipc_link_send_buf(l_ptr, fragm);
2692 		if (!tipc_link_is_up(l_ptr))
2693 			return dsz;
2694 		msg_set_fragm_no(&fragm_hdr, ++fragm_no);
2695 		rest -= fragm_sz;
2696 		crs += fragm_sz;
2697 		msg_set_type(&fragm_hdr, FRAGMENT);
2698 	}
2699 exit:
2700 	buf_discard(buf);
2701 	return dsz;
2702 }
2703 
2704 /*
2705  * A pending message being re-assembled must store certain values
2706  * to handle subsequent fragments correctly. The following functions
2707  * help storing these values in unused, available fields in the
2708  * pending message. This makes dynamic memory allocation unecessary.
2709  */
2710 
2711 static void set_long_msg_seqno(struct sk_buff *buf, u32 seqno)
2712 {
2713 	msg_set_seqno(buf_msg(buf), seqno);
2714 }
2715 
2716 static u32 get_fragm_size(struct sk_buff *buf)
2717 {
2718 	return msg_ack(buf_msg(buf));
2719 }
2720 
2721 static void set_fragm_size(struct sk_buff *buf, u32 sz)
2722 {
2723 	msg_set_ack(buf_msg(buf), sz);
2724 }
2725 
2726 static u32 get_expected_frags(struct sk_buff *buf)
2727 {
2728 	return msg_bcast_ack(buf_msg(buf));
2729 }
2730 
2731 static void set_expected_frags(struct sk_buff *buf, u32 exp)
2732 {
2733 	msg_set_bcast_ack(buf_msg(buf), exp);
2734 }
2735 
2736 static u32 get_timer_cnt(struct sk_buff *buf)
2737 {
2738 	return msg_reroute_cnt(buf_msg(buf));
2739 }
2740 
2741 static void incr_timer_cnt(struct sk_buff *buf)
2742 {
2743 	msg_incr_reroute_cnt(buf_msg(buf));
2744 }
2745 
2746 /*
2747  * tipc_link_recv_fragment(): Called with node lock on. Returns
2748  * the reassembled buffer if message is complete.
2749  */
2750 int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
2751 			    struct tipc_msg **m)
2752 {
2753 	struct sk_buff *prev = NULL;
2754 	struct sk_buff *fbuf = *fb;
2755 	struct tipc_msg *fragm = buf_msg(fbuf);
2756 	struct sk_buff *pbuf = *pending;
2757 	u32 long_msg_seq_no = msg_long_msgno(fragm);
2758 
2759 	*fb = NULL;
2760 	msg_dbg(fragm,"FRG<REC<");
2761 
2762 	/* Is there an incomplete message waiting for this fragment? */
2763 
2764 	while (pbuf && ((msg_seqno(buf_msg(pbuf)) != long_msg_seq_no) ||
2765 			(msg_orignode(fragm) != msg_orignode(buf_msg(pbuf))))) {
2766 		prev = pbuf;
2767 		pbuf = pbuf->next;
2768 	}
2769 
2770 	if (!pbuf && (msg_type(fragm) == FIRST_FRAGMENT)) {
2771 		struct tipc_msg *imsg = (struct tipc_msg *)msg_data(fragm);
2772 		u32 msg_sz = msg_size(imsg);
2773 		u32 fragm_sz = msg_data_sz(fragm);
2774 		u32 exp_fragm_cnt = msg_sz/fragm_sz + !!(msg_sz % fragm_sz);
2775 		u32 max =  TIPC_MAX_USER_MSG_SIZE + LONG_H_SIZE;
2776 		if (msg_type(imsg) == TIPC_MCAST_MSG)
2777 			max = TIPC_MAX_USER_MSG_SIZE + MCAST_H_SIZE;
2778 		if (msg_size(imsg) > max) {
2779 			msg_dbg(fragm,"<REC<Oversized: ");
2780 			buf_discard(fbuf);
2781 			return 0;
2782 		}
2783 		pbuf = buf_acquire(msg_size(imsg));
2784 		if (pbuf != NULL) {
2785 			pbuf->next = *pending;
2786 			*pending = pbuf;
2787 			skb_copy_to_linear_data(pbuf, imsg,
2788 						msg_data_sz(fragm));
2789 			/*  Prepare buffer for subsequent fragments. */
2790 
2791 			set_long_msg_seqno(pbuf, long_msg_seq_no);
2792 			set_fragm_size(pbuf,fragm_sz);
2793 			set_expected_frags(pbuf,exp_fragm_cnt - 1);
2794 		} else {
2795 			warn("Link unable to reassemble fragmented message\n");
2796 		}
2797 		buf_discard(fbuf);
2798 		return 0;
2799 	} else if (pbuf && (msg_type(fragm) != FIRST_FRAGMENT)) {
2800 		u32 dsz = msg_data_sz(fragm);
2801 		u32 fsz = get_fragm_size(pbuf);
2802 		u32 crs = ((msg_fragm_no(fragm) - 1) * fsz);
2803 		u32 exp_frags = get_expected_frags(pbuf) - 1;
2804 		skb_copy_to_linear_data_offset(pbuf, crs,
2805 					       msg_data(fragm), dsz);
2806 		buf_discard(fbuf);
2807 
2808 		/* Is message complete? */
2809 
2810 		if (exp_frags == 0) {
2811 			if (prev)
2812 				prev->next = pbuf->next;
2813 			else
2814 				*pending = pbuf->next;
2815 			msg_reset_reroute_cnt(buf_msg(pbuf));
2816 			*fb = pbuf;
2817 			*m = buf_msg(pbuf);
2818 			return 1;
2819 		}
2820 		set_expected_frags(pbuf,exp_frags);
2821 		return 0;
2822 	}
2823 	dbg(" Discarding orphan fragment %x\n",fbuf);
2824 	msg_dbg(fragm,"ORPHAN:");
2825 	dbg("Pending long buffers:\n");
2826 	dbg_print_buf_chain(*pending);
2827 	buf_discard(fbuf);
2828 	return 0;
2829 }
2830 
2831 /**
2832  * link_check_defragm_bufs - flush stale incoming message fragments
2833  * @l_ptr: pointer to link
2834  */
2835 
2836 static void link_check_defragm_bufs(struct link *l_ptr)
2837 {
2838 	struct sk_buff *prev = NULL;
2839 	struct sk_buff *next = NULL;
2840 	struct sk_buff *buf = l_ptr->defragm_buf;
2841 
2842 	if (!buf)
2843 		return;
2844 	if (!link_working_working(l_ptr))
2845 		return;
2846 	while (buf) {
2847 		u32 cnt = get_timer_cnt(buf);
2848 
2849 		next = buf->next;
2850 		if (cnt < 4) {
2851 			incr_timer_cnt(buf);
2852 			prev = buf;
2853 		} else {
2854 			dbg(" Discarding incomplete long buffer\n");
2855 			msg_dbg(buf_msg(buf), "LONG:");
2856 			dbg_print_link(l_ptr, "curr:");
2857 			dbg("Pending long buffers:\n");
2858 			dbg_print_buf_chain(l_ptr->defragm_buf);
2859 			if (prev)
2860 				prev->next = buf->next;
2861 			else
2862 				l_ptr->defragm_buf = buf->next;
2863 			buf_discard(buf);
2864 		}
2865 		buf = next;
2866 	}
2867 }
2868 
2869 
2870 
2871 static void link_set_supervision_props(struct link *l_ptr, u32 tolerance)
2872 {
2873 	l_ptr->tolerance = tolerance;
2874 	l_ptr->continuity_interval =
2875 		((tolerance / 4) > 500) ? 500 : tolerance / 4;
2876 	l_ptr->abort_limit = tolerance / (l_ptr->continuity_interval / 4);
2877 }
2878 
2879 
2880 void tipc_link_set_queue_limits(struct link *l_ptr, u32 window)
2881 {
2882 	/* Data messages from this node, inclusive FIRST_FRAGM */
2883 	l_ptr->queue_limit[TIPC_LOW_IMPORTANCE] = window;
2884 	l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE] = (window / 3) * 4;
2885 	l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE] = (window / 3) * 5;
2886 	l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE] = (window / 3) * 6;
2887 	/* Transiting data messages,inclusive FIRST_FRAGM */
2888 	l_ptr->queue_limit[TIPC_LOW_IMPORTANCE + 4] = 300;
2889 	l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE + 4] = 600;
2890 	l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE + 4] = 900;
2891 	l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE + 4] = 1200;
2892 	l_ptr->queue_limit[CONN_MANAGER] = 1200;
2893 	l_ptr->queue_limit[ROUTE_DISTRIBUTOR] = 1200;
2894 	l_ptr->queue_limit[CHANGEOVER_PROTOCOL] = 2500;
2895 	l_ptr->queue_limit[NAME_DISTRIBUTOR] = 3000;
2896 	/* FRAGMENT and LAST_FRAGMENT packets */
2897 	l_ptr->queue_limit[MSG_FRAGMENTER] = 4000;
2898 }
2899 
2900 /**
2901  * link_find_link - locate link by name
2902  * @name - ptr to link name string
2903  * @node - ptr to area to be filled with ptr to associated node
2904  *
2905  * Caller must hold 'tipc_net_lock' to ensure node and bearer are not deleted;
2906  * this also prevents link deletion.
2907  *
2908  * Returns pointer to link (or 0 if invalid link name).
2909  */
2910 
2911 static struct link *link_find_link(const char *name, struct tipc_node **node)
2912 {
2913 	struct link_name link_name_parts;
2914 	struct bearer *b_ptr;
2915 	struct link *l_ptr;
2916 
2917 	if (!link_name_validate(name, &link_name_parts))
2918 		return NULL;
2919 
2920 	b_ptr = tipc_bearer_find_interface(link_name_parts.if_local);
2921 	if (!b_ptr)
2922 		return NULL;
2923 
2924 	*node = tipc_node_find(link_name_parts.addr_peer);
2925 	if (!*node)
2926 		return NULL;
2927 
2928 	l_ptr = (*node)->links[b_ptr->identity];
2929 	if (!l_ptr || strcmp(l_ptr->name, name))
2930 		return NULL;
2931 
2932 	return l_ptr;
2933 }
2934 
2935 struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, int req_tlv_space,
2936 				     u16 cmd)
2937 {
2938 	struct tipc_link_config *args;
2939 	u32 new_value;
2940 	struct link *l_ptr;
2941 	struct tipc_node *node;
2942 	int res;
2943 
2944 	if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_CONFIG))
2945 		return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2946 
2947 	args = (struct tipc_link_config *)TLV_DATA(req_tlv_area);
2948 	new_value = ntohl(args->value);
2949 
2950 	if (!strcmp(args->name, tipc_bclink_name)) {
2951 		if ((cmd == TIPC_CMD_SET_LINK_WINDOW) &&
2952 		    (tipc_bclink_set_queue_limits(new_value) == 0))
2953 			return tipc_cfg_reply_none();
2954 		return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
2955 						   " (cannot change setting on broadcast link)");
2956 	}
2957 
2958 	read_lock_bh(&tipc_net_lock);
2959 	l_ptr = link_find_link(args->name, &node);
2960 	if (!l_ptr) {
2961 		read_unlock_bh(&tipc_net_lock);
2962 		return tipc_cfg_reply_error_string("link not found");
2963 	}
2964 
2965 	tipc_node_lock(node);
2966 	res = -EINVAL;
2967 	switch (cmd) {
2968 	case TIPC_CMD_SET_LINK_TOL:
2969 		if ((new_value >= TIPC_MIN_LINK_TOL) &&
2970 		    (new_value <= TIPC_MAX_LINK_TOL)) {
2971 			link_set_supervision_props(l_ptr, new_value);
2972 			tipc_link_send_proto_msg(l_ptr, STATE_MSG,
2973 						 0, 0, new_value, 0, 0);
2974 			res = 0;
2975 		}
2976 		break;
2977 	case TIPC_CMD_SET_LINK_PRI:
2978 		if ((new_value >= TIPC_MIN_LINK_PRI) &&
2979 		    (new_value <= TIPC_MAX_LINK_PRI)) {
2980 			l_ptr->priority = new_value;
2981 			tipc_link_send_proto_msg(l_ptr, STATE_MSG,
2982 						 0, 0, 0, new_value, 0);
2983 			res = 0;
2984 		}
2985 		break;
2986 	case TIPC_CMD_SET_LINK_WINDOW:
2987 		if ((new_value >= TIPC_MIN_LINK_WIN) &&
2988 		    (new_value <= TIPC_MAX_LINK_WIN)) {
2989 			tipc_link_set_queue_limits(l_ptr, new_value);
2990 			res = 0;
2991 		}
2992 		break;
2993 	}
2994 	tipc_node_unlock(node);
2995 
2996 	read_unlock_bh(&tipc_net_lock);
2997 	if (res)
2998 		return tipc_cfg_reply_error_string("cannot change link setting");
2999 
3000 	return tipc_cfg_reply_none();
3001 }
3002 
3003 /**
3004  * link_reset_statistics - reset link statistics
3005  * @l_ptr: pointer to link
3006  */
3007 
3008 static void link_reset_statistics(struct link *l_ptr)
3009 {
3010 	memset(&l_ptr->stats, 0, sizeof(l_ptr->stats));
3011 	l_ptr->stats.sent_info = l_ptr->next_out_no;
3012 	l_ptr->stats.recv_info = l_ptr->next_in_no;
3013 }
3014 
3015 struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_space)
3016 {
3017 	char *link_name;
3018 	struct link *l_ptr;
3019 	struct tipc_node *node;
3020 
3021 	if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
3022 		return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
3023 
3024 	link_name = (char *)TLV_DATA(req_tlv_area);
3025 	if (!strcmp(link_name, tipc_bclink_name)) {
3026 		if (tipc_bclink_reset_stats())
3027 			return tipc_cfg_reply_error_string("link not found");
3028 		return tipc_cfg_reply_none();
3029 	}
3030 
3031 	read_lock_bh(&tipc_net_lock);
3032 	l_ptr = link_find_link(link_name, &node);
3033 	if (!l_ptr) {
3034 		read_unlock_bh(&tipc_net_lock);
3035 		return tipc_cfg_reply_error_string("link not found");
3036 	}
3037 
3038 	tipc_node_lock(node);
3039 	link_reset_statistics(l_ptr);
3040 	tipc_node_unlock(node);
3041 	read_unlock_bh(&tipc_net_lock);
3042 	return tipc_cfg_reply_none();
3043 }
3044 
3045 /**
3046  * percent - convert count to a percentage of total (rounding up or down)
3047  */
3048 
3049 static u32 percent(u32 count, u32 total)
3050 {
3051 	return (count * 100 + (total / 2)) / total;
3052 }
3053 
3054 /**
3055  * tipc_link_stats - print link statistics
3056  * @name: link name
3057  * @buf: print buffer area
3058  * @buf_size: size of print buffer area
3059  *
3060  * Returns length of print buffer data string (or 0 if error)
3061  */
3062 
3063 static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)
3064 {
3065 	struct print_buf pb;
3066 	struct link *l_ptr;
3067 	struct tipc_node *node;
3068 	char *status;
3069 	u32 profile_total = 0;
3070 
3071 	if (!strcmp(name, tipc_bclink_name))
3072 		return tipc_bclink_stats(buf, buf_size);
3073 
3074 	tipc_printbuf_init(&pb, buf, buf_size);
3075 
3076 	read_lock_bh(&tipc_net_lock);
3077 	l_ptr = link_find_link(name, &node);
3078 	if (!l_ptr) {
3079 		read_unlock_bh(&tipc_net_lock);
3080 		return 0;
3081 	}
3082 	tipc_node_lock(node);
3083 
3084 	if (tipc_link_is_active(l_ptr))
3085 		status = "ACTIVE";
3086 	else if (tipc_link_is_up(l_ptr))
3087 		status = "STANDBY";
3088 	else
3089 		status = "DEFUNCT";
3090 	tipc_printf(&pb, "Link <%s>\n"
3091 			 "  %s  MTU:%u  Priority:%u  Tolerance:%u ms"
3092 			 "  Window:%u packets\n",
3093 		    l_ptr->name, status, l_ptr->max_pkt,
3094 		    l_ptr->priority, l_ptr->tolerance, l_ptr->queue_limit[0]);
3095 	tipc_printf(&pb, "  RX packets:%u fragments:%u/%u bundles:%u/%u\n",
3096 		    l_ptr->next_in_no - l_ptr->stats.recv_info,
3097 		    l_ptr->stats.recv_fragments,
3098 		    l_ptr->stats.recv_fragmented,
3099 		    l_ptr->stats.recv_bundles,
3100 		    l_ptr->stats.recv_bundled);
3101 	tipc_printf(&pb, "  TX packets:%u fragments:%u/%u bundles:%u/%u\n",
3102 		    l_ptr->next_out_no - l_ptr->stats.sent_info,
3103 		    l_ptr->stats.sent_fragments,
3104 		    l_ptr->stats.sent_fragmented,
3105 		    l_ptr->stats.sent_bundles,
3106 		    l_ptr->stats.sent_bundled);
3107 	profile_total = l_ptr->stats.msg_length_counts;
3108 	if (!profile_total)
3109 		profile_total = 1;
3110 	tipc_printf(&pb, "  TX profile sample:%u packets  average:%u octets\n"
3111 			 "  0-64:%u%% -256:%u%% -1024:%u%% -4096:%u%% "
3112 			 "-16354:%u%% -32768:%u%% -66000:%u%%\n",
3113 		    l_ptr->stats.msg_length_counts,
3114 		    l_ptr->stats.msg_lengths_total / profile_total,
3115 		    percent(l_ptr->stats.msg_length_profile[0], profile_total),
3116 		    percent(l_ptr->stats.msg_length_profile[1], profile_total),
3117 		    percent(l_ptr->stats.msg_length_profile[2], profile_total),
3118 		    percent(l_ptr->stats.msg_length_profile[3], profile_total),
3119 		    percent(l_ptr->stats.msg_length_profile[4], profile_total),
3120 		    percent(l_ptr->stats.msg_length_profile[5], profile_total),
3121 		    percent(l_ptr->stats.msg_length_profile[6], profile_total));
3122 	tipc_printf(&pb, "  RX states:%u probes:%u naks:%u defs:%u dups:%u\n",
3123 		    l_ptr->stats.recv_states,
3124 		    l_ptr->stats.recv_probes,
3125 		    l_ptr->stats.recv_nacks,
3126 		    l_ptr->stats.deferred_recv,
3127 		    l_ptr->stats.duplicates);
3128 	tipc_printf(&pb, "  TX states:%u probes:%u naks:%u acks:%u dups:%u\n",
3129 		    l_ptr->stats.sent_states,
3130 		    l_ptr->stats.sent_probes,
3131 		    l_ptr->stats.sent_nacks,
3132 		    l_ptr->stats.sent_acks,
3133 		    l_ptr->stats.retransmitted);
3134 	tipc_printf(&pb, "  Congestion bearer:%u link:%u  Send queue max:%u avg:%u\n",
3135 		    l_ptr->stats.bearer_congs,
3136 		    l_ptr->stats.link_congs,
3137 		    l_ptr->stats.max_queue_sz,
3138 		    l_ptr->stats.queue_sz_counts
3139 		    ? (l_ptr->stats.accu_queue_sz / l_ptr->stats.queue_sz_counts)
3140 		    : 0);
3141 
3142 	tipc_node_unlock(node);
3143 	read_unlock_bh(&tipc_net_lock);
3144 	return tipc_printbuf_validate(&pb);
3145 }
3146 
3147 #define MAX_LINK_STATS_INFO 2000
3148 
3149 struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, int req_tlv_space)
3150 {
3151 	struct sk_buff *buf;
3152 	struct tlv_desc *rep_tlv;
3153 	int str_len;
3154 
3155 	if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
3156 		return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
3157 
3158 	buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_LINK_STATS_INFO));
3159 	if (!buf)
3160 		return NULL;
3161 
3162 	rep_tlv = (struct tlv_desc *)buf->data;
3163 
3164 	str_len = tipc_link_stats((char *)TLV_DATA(req_tlv_area),
3165 				  (char *)TLV_DATA(rep_tlv), MAX_LINK_STATS_INFO);
3166 	if (!str_len) {
3167 		buf_discard(buf);
3168 		return tipc_cfg_reply_error_string("link not found");
3169 	}
3170 
3171 	skb_put(buf, TLV_SPACE(str_len));
3172 	TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
3173 
3174 	return buf;
3175 }
3176 
#if 0
/*
 * link_control - remove, block or unblock the link identified by name
 *
 * Compiled out.  Returns 0 if the link was found, -EINVAL otherwise;
 * 'val' is not used.
 */
int link_control(const char *name, u32 op, u32 val)
{
	struct tipc_node *n_ptr;
	struct link *link;
	u32 bearer_id;
	u32 addr;
	int res = -EINVAL;

	addr = link_name2addr(name, &bearer_id);
	read_lock_bh(&tipc_net_lock);
	n_ptr = tipc_node_find(addr);
	if (!n_ptr)
		goto unlock_net;

	tipc_node_lock(n_ptr);
	link = n_ptr->links[bearer_id];
	if (!link)
		goto unlock_node;

	if (op == TIPC_REMOVE_LINK) {
		struct bearer *b_ptr = link->b_ptr;

		/* deletion is done under the bearer's lock */
		spin_lock_bh(&b_ptr->publ.lock);
		tipc_link_delete(link);
		spin_unlock_bh(&b_ptr->publ.lock);
	}
	if (op == TIPC_CMD_BLOCK_LINK) {
		tipc_link_reset(link);
		link->blocked = 1;
	}
	if (op == TIPC_CMD_UNBLOCK_LINK)
		link->blocked = 0;
	res = 0;

unlock_node:
	tipc_node_unlock(n_ptr);
unlock_net:
	read_unlock_bh(&tipc_net_lock);
	return res;
}
#endif
3214 
3215 /**
3216  * tipc_link_get_max_pkt - get maximum packet size to use when sending to destination
3217  * @dest: network address of destination node
3218  * @selector: used to select from set of active links
3219  *
3220  * If no active link can be found, uses default maximum packet size.
3221  */
3222 
3223 u32 tipc_link_get_max_pkt(u32 dest, u32 selector)
3224 {
3225 	struct tipc_node *n_ptr;
3226 	struct link *l_ptr;
3227 	u32 res = MAX_PKT_DEFAULT;
3228 
3229 	if (dest == tipc_own_addr)
3230 		return MAX_MSG_SIZE;
3231 
3232 	read_lock_bh(&tipc_net_lock);
3233 	n_ptr = tipc_node_select(dest, selector);
3234 	if (n_ptr) {
3235 		tipc_node_lock(n_ptr);
3236 		l_ptr = n_ptr->active_links[selector & 1];
3237 		if (l_ptr)
3238 			res = l_ptr->max_pkt;
3239 		tipc_node_unlock(n_ptr);
3240 	}
3241 	read_unlock_bh(&tipc_net_lock);
3242 	return res;
3243 }
3244 
#if 0
/*
 * link_dump_rec_queue - log every buffer on the link's deferred receive queue
 *
 * Compiled out.  Stops early if a buffer's data pointer holds the marker
 * value 0x0000a3a3.
 */
static void link_dump_rec_queue(struct link *l_ptr)
{
	struct sk_buff *skb = l_ptr->oldest_deferred_in;

	if (!skb) {
		info("Reception queue empty\n");
		return;
	}

	info("Contents of Reception queue:\n");
	for (; skb; skb = skb->next) {
		if (skb->data == (void *)0x0000a3a3) {
			info("buffer %x invalid\n", skb);
			return;
		}
		msg_dbg(buf_msg(skb), "In rec queue:\n");
	}
}
#endif
3266 
3267 static void link_dump_send_queue(struct link *l_ptr)
3268 {
3269 	if (l_ptr->next_out) {
3270 		info("\nContents of unsent queue:\n");
3271 		dbg_print_buf_chain(l_ptr->next_out);
3272 	}
3273 	info("\nContents of send queue:\n");
3274 	if (l_ptr->first_out) {
3275 		dbg_print_buf_chain(l_ptr->first_out);
3276 	}
3277 	info("Empty send queue\n");
3278 }
3279 
3280 static void link_print(struct link *l_ptr, struct print_buf *buf,
3281 		       const char *str)
3282 {
3283 	tipc_printf(buf, str);
3284 	if (link_reset_reset(l_ptr) || link_reset_unknown(l_ptr))
3285 		return;
3286 	tipc_printf(buf, "Link %x<%s>:",
3287 		    l_ptr->addr, l_ptr->b_ptr->publ.name);
3288 	tipc_printf(buf, ": NXO(%u):", mod(l_ptr->next_out_no));
3289 	tipc_printf(buf, "NXI(%u):", mod(l_ptr->next_in_no));
3290 	tipc_printf(buf, "SQUE");
3291 	if (l_ptr->first_out) {
3292 		tipc_printf(buf, "[%u..", msg_seqno(buf_msg(l_ptr->first_out)));
3293 		if (l_ptr->next_out)
3294 			tipc_printf(buf, "%u..",
3295 				    msg_seqno(buf_msg(l_ptr->next_out)));
3296 		tipc_printf(buf, "%u]", msg_seqno(buf_msg(l_ptr->last_out)));
3297 		if ((mod(msg_seqno(buf_msg(l_ptr->last_out)) -
3298 			 msg_seqno(buf_msg(l_ptr->first_out)))
3299 		     != (l_ptr->out_queue_size - 1)) ||
3300 		    (l_ptr->last_out->next != NULL)) {
3301 			tipc_printf(buf, "\nSend queue inconsistency\n");
3302 			tipc_printf(buf, "first_out= %x ", l_ptr->first_out);
3303 			tipc_printf(buf, "next_out= %x ", l_ptr->next_out);
3304 			tipc_printf(buf, "last_out= %x ", l_ptr->last_out);
3305 			link_dump_send_queue(l_ptr);
3306 		}
3307 	} else
3308 		tipc_printf(buf, "[]");
3309 	tipc_printf(buf, "SQSIZ(%u)", l_ptr->out_queue_size);
3310 	if (l_ptr->oldest_deferred_in) {
3311 		u32 o = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
3312 		u32 n = msg_seqno(buf_msg(l_ptr->newest_deferred_in));
3313 		tipc_printf(buf, ":RQUE[%u..%u]", o, n);
3314 		if (l_ptr->deferred_inqueue_sz != mod((n + 1) - o)) {
3315 			tipc_printf(buf, ":RQSIZ(%u)",
3316 				    l_ptr->deferred_inqueue_sz);
3317 		}
3318 	}
3319 	if (link_working_unknown(l_ptr))
3320 		tipc_printf(buf, ":WU");
3321 	if (link_reset_reset(l_ptr))
3322 		tipc_printf(buf, ":RR");
3323 	if (link_reset_unknown(l_ptr))
3324 		tipc_printf(buf, ":RU");
3325 	if (link_working_working(l_ptr))
3326 		tipc_printf(buf, ":WW");
3327 	tipc_printf(buf, "\n");
3328 }
3329 
3330