xref: /openbmc/linux/net/tipc/bcast.c (revision 455f9726)
1 /*
2  * net/tipc/bcast.c: TIPC broadcast code
3  *
4  * Copyright (c) 2004-2006, 2014, Ericsson AB
5  * Copyright (c) 2004, Intel Corporation.
6  * Copyright (c) 2005, 2010-2011, Wind River Systems
7  * All rights reserved.
8  *
9  * Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in the
16  *    documentation and/or other materials provided with the distribution.
17  * 3. Neither the names of the copyright holders nor the names of its
18  *    contributors may be used to endorse or promote products derived from
19  *    this software without specific prior written permission.
20  *
21  * Alternatively, this software may be distributed under the terms of the
22  * GNU General Public License ("GPL") version 2 as published by the Free
23  * Software Foundation.
24  *
25  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
26  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
28  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
29  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
30  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
31  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
32  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
33  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
34  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
35  * POSSIBILITY OF SUCH DAMAGE.
36  */
37 
38 #include "core.h"
39 #include "link.h"
40 #include "port.h"
41 #include "socket.h"
42 #include "msg.h"
43 #include "bcast.h"
44 #include "name_distr.h"
45 
/* Fixed maximum packet size of the broadcast link */
#define MAX_PKT_DEFAULT_MCAST	1500
/* Default window size of the broadcast link */
#define BCLINK_WIN_DEFAULT	20
/* Slot in bearer_list[] that holds the broadcast pseudo-bearer */
#define BCBEARER		MAX_BEARERS
49 
/**
 * struct tipc_bcbearer_pair - two bearers the broadcast link treats as one unit
 * @primary: first bearer of the pair; NULL marks an unused array slot
 * @secondary: second bearer of the pair, NULL when the pair has one member
 *
 * Two bearers are only paired when they have the same priority and reach
 * the same set of destination nodes.
 */
struct tipc_bcbearer_pair {
	struct tipc_bearer *primary;
	struct tipc_bearer *secondary;
};
63 
64 /**
65  * struct tipc_bcbearer - bearer used by broadcast link
66  * @bearer: (non-standard) broadcast bearer structure
67  * @media: (non-standard) broadcast media structure
68  * @bpairs: array of bearer pairs
69  * @bpairs_temp: temporary array of bearer pairs used by tipc_bcbearer_sort()
70  * @remains: temporary node map used by tipc_bcbearer_send()
71  * @remains_new: temporary node map used tipc_bcbearer_send()
72  *
73  * Note: The fields labelled "temporary" are incorporated into the bearer
74  * to avoid consuming potentially limited stack space through the use of
75  * large local variables within multicast routines.  Concurrent access is
76  * prevented through use of the spinlock "bclink_lock".
77  */
78 struct tipc_bcbearer {
79 	struct tipc_bearer bearer;
80 	struct tipc_media media;
81 	struct tipc_bcbearer_pair bpairs[MAX_BEARERS];
82 	struct tipc_bcbearer_pair bpairs_temp[TIPC_MAX_LINK_PRI + 1];
83 	struct tipc_node_map remains;
84 	struct tipc_node_map remains_new;
85 };
86 
87 /**
88  * struct tipc_bclink - link used for broadcast messages
89  * @lock: spinlock governing access to structure
90  * @link: (non-standard) broadcast link structure
91  * @node: (non-standard) node structure representing b'cast link's peer node
92  * @flags: represent bclink states
93  * @bcast_nodes: map of broadcast-capable nodes
94  * @retransmit_to: node that most recently requested a retransmit
95  *
96  * Handles sequence numbering, fragmentation, bundling, etc.
97  */
98 struct tipc_bclink {
99 	spinlock_t lock;
100 	struct tipc_link link;
101 	struct tipc_node node;
102 	unsigned int flags;
103 	struct tipc_node_map bcast_nodes;
104 	struct tipc_node *retransmit_to;
105 };
106 
107 static struct tipc_bcbearer *bcbearer;
108 static struct tipc_bclink *bclink;
109 static struct tipc_link *bcl;
110 
111 const char tipc_bclink_name[] = "broadcast-link";
112 
113 static void tipc_nmap_diff(struct tipc_node_map *nm_a,
114 			   struct tipc_node_map *nm_b,
115 			   struct tipc_node_map *nm_diff);
116 static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node);
117 static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node);
118 
119 static void tipc_bclink_lock(void)
120 {
121 	spin_lock_bh(&bclink->lock);
122 }
123 
124 static void tipc_bclink_unlock(void)
125 {
126 	struct tipc_node *node = NULL;
127 
128 	if (likely(!bclink->flags)) {
129 		spin_unlock_bh(&bclink->lock);
130 		return;
131 	}
132 
133 	if (bclink->flags & TIPC_BCLINK_RESET) {
134 		bclink->flags &= ~TIPC_BCLINK_RESET;
135 		node = tipc_bclink_retransmit_to();
136 	}
137 	spin_unlock_bh(&bclink->lock);
138 
139 	if (node)
140 		tipc_link_reset_all(node);
141 }
142 
143 uint  tipc_bclink_get_mtu(void)
144 {
145 	return MAX_PKT_DEFAULT_MCAST;
146 }
147 
148 void tipc_bclink_set_flags(unsigned int flags)
149 {
150 	bclink->flags |= flags;
151 }
152 
153 static u32 bcbuf_acks(struct sk_buff *buf)
154 {
155 	return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle;
156 }
157 
158 static void bcbuf_set_acks(struct sk_buff *buf, u32 acks)
159 {
160 	TIPC_SKB_CB(buf)->handle = (void *)(unsigned long)acks;
161 }
162 
163 static void bcbuf_decr_acks(struct sk_buff *buf)
164 {
165 	bcbuf_set_acks(buf, bcbuf_acks(buf) - 1);
166 }
167 
168 void tipc_bclink_add_node(u32 addr)
169 {
170 	tipc_bclink_lock();
171 	tipc_nmap_add(&bclink->bcast_nodes, addr);
172 	tipc_bclink_unlock();
173 }
174 
175 void tipc_bclink_remove_node(u32 addr)
176 {
177 	tipc_bclink_lock();
178 	tipc_nmap_remove(&bclink->bcast_nodes, addr);
179 	tipc_bclink_unlock();
180 }
181 
182 static void bclink_set_last_sent(void)
183 {
184 	if (bcl->next_out)
185 		bcl->fsm_msg_cnt = mod(buf_seqno(bcl->next_out) - 1);
186 	else
187 		bcl->fsm_msg_cnt = mod(bcl->next_out_no - 1);
188 }
189 
190 u32 tipc_bclink_get_last_sent(void)
191 {
192 	return bcl->fsm_msg_cnt;
193 }
194 
195 static void bclink_update_last_sent(struct tipc_node *node, u32 seqno)
196 {
197 	node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ?
198 						seqno : node->bclink.last_sent;
199 }
200 
201 
202 /**
203  * tipc_bclink_retransmit_to - get most recent node to request retransmission
204  *
205  * Called with bclink_lock locked
206  */
207 struct tipc_node *tipc_bclink_retransmit_to(void)
208 {
209 	return bclink->retransmit_to;
210 }
211 
212 /**
213  * bclink_retransmit_pkt - retransmit broadcast packets
214  * @after: sequence number of last packet to *not* retransmit
215  * @to: sequence number of last packet to retransmit
216  *
217  * Called with bclink_lock locked
218  */
219 static void bclink_retransmit_pkt(u32 after, u32 to)
220 {
221 	struct sk_buff *buf;
222 
223 	buf = bcl->first_out;
224 	while (buf && less_eq(buf_seqno(buf), after))
225 		buf = buf->next;
226 	tipc_link_retransmit(bcl, buf, mod(to - after));
227 }
228 
229 /**
230  * tipc_bclink_acknowledge - handle acknowledgement of broadcast packets
231  * @n_ptr: node that sent acknowledgement info
232  * @acked: broadcast sequence # that has been acknowledged
233  *
234  * Node is locked, bclink_lock unlocked.
235  */
236 void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
237 {
238 	struct sk_buff *crs;
239 	struct sk_buff *next;
240 	unsigned int released = 0;
241 
242 	tipc_bclink_lock();
243 	/* Bail out if tx queue is empty (no clean up is required) */
244 	crs = bcl->first_out;
245 	if (!crs)
246 		goto exit;
247 
248 	/* Determine which messages need to be acknowledged */
249 	if (acked == INVALID_LINK_SEQ) {
250 		/*
251 		 * Contact with specified node has been lost, so need to
252 		 * acknowledge sent messages only (if other nodes still exist)
253 		 * or both sent and unsent messages (otherwise)
254 		 */
255 		if (bclink->bcast_nodes.count)
256 			acked = bcl->fsm_msg_cnt;
257 		else
258 			acked = bcl->next_out_no;
259 	} else {
260 		/*
261 		 * Bail out if specified sequence number does not correspond
262 		 * to a message that has been sent and not yet acknowledged
263 		 */
264 		if (less(acked, buf_seqno(crs)) ||
265 		    less(bcl->fsm_msg_cnt, acked) ||
266 		    less_eq(acked, n_ptr->bclink.acked))
267 			goto exit;
268 	}
269 
270 	/* Skip over packets that node has previously acknowledged */
271 	while (crs && less_eq(buf_seqno(crs), n_ptr->bclink.acked))
272 		crs = crs->next;
273 
274 	/* Update packets that node is now acknowledging */
275 
276 	while (crs && less_eq(buf_seqno(crs), acked)) {
277 		next = crs->next;
278 
279 		if (crs != bcl->next_out)
280 			bcbuf_decr_acks(crs);
281 		else {
282 			bcbuf_set_acks(crs, 0);
283 			bcl->next_out = next;
284 			bclink_set_last_sent();
285 		}
286 
287 		if (bcbuf_acks(crs) == 0) {
288 			bcl->first_out = next;
289 			bcl->out_queue_size--;
290 			kfree_skb(crs);
291 			released = 1;
292 		}
293 		crs = next;
294 	}
295 	n_ptr->bclink.acked = acked;
296 
297 	/* Try resolving broadcast link congestion, if necessary */
298 
299 	if (unlikely(bcl->next_out)) {
300 		tipc_link_push_queue(bcl);
301 		bclink_set_last_sent();
302 	}
303 	if (unlikely(released && !list_empty(&bcl->waiting_ports)))
304 		tipc_link_wakeup_ports(bcl, 0);
305 exit:
306 	tipc_bclink_unlock();
307 }
308 
309 /**
310  * tipc_bclink_update_link_state - update broadcast link state
311  *
312  * RCU and node lock set
313  */
314 void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
315 {
316 	struct sk_buff *buf;
317 
318 	/* Ignore "stale" link state info */
319 
320 	if (less_eq(last_sent, n_ptr->bclink.last_in))
321 		return;
322 
323 	/* Update link synchronization state; quit if in sync */
324 
325 	bclink_update_last_sent(n_ptr, last_sent);
326 
327 	if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
328 		return;
329 
330 	/* Update out-of-sync state; quit if loss is still unconfirmed */
331 
332 	if ((++n_ptr->bclink.oos_state) == 1) {
333 		if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2))
334 			return;
335 		n_ptr->bclink.oos_state++;
336 	}
337 
338 	/* Don't NACK if one has been recently sent (or seen) */
339 
340 	if (n_ptr->bclink.oos_state & 0x1)
341 		return;
342 
343 	/* Send NACK */
344 
345 	buf = tipc_buf_acquire(INT_H_SIZE);
346 	if (buf) {
347 		struct tipc_msg *msg = buf_msg(buf);
348 
349 		tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG,
350 			      INT_H_SIZE, n_ptr->addr);
351 		msg_set_non_seq(msg, 1);
352 		msg_set_mc_netid(msg, tipc_net_id);
353 		msg_set_bcast_ack(msg, n_ptr->bclink.last_in);
354 		msg_set_bcgap_after(msg, n_ptr->bclink.last_in);
355 		msg_set_bcgap_to(msg, n_ptr->bclink.deferred_head
356 				 ? buf_seqno(n_ptr->bclink.deferred_head) - 1
357 				 : n_ptr->bclink.last_sent);
358 
359 		tipc_bclink_lock();
360 		tipc_bearer_send(MAX_BEARERS, buf, NULL);
361 		bcl->stats.sent_nacks++;
362 		tipc_bclink_unlock();
363 		kfree_skb(buf);
364 
365 		n_ptr->bclink.oos_state++;
366 	}
367 }
368 
369 /**
370  * bclink_peek_nack - monitor retransmission requests sent by other nodes
371  *
372  * Delay any upcoming NACK by this node if another node has already
373  * requested the first message this node is going to ask for.
374  */
375 static void bclink_peek_nack(struct tipc_msg *msg)
376 {
377 	struct tipc_node *n_ptr = tipc_node_find(msg_destnode(msg));
378 
379 	if (unlikely(!n_ptr))
380 		return;
381 
382 	tipc_node_lock(n_ptr);
383 
384 	if (n_ptr->bclink.recv_permitted &&
385 	    (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) &&
386 	    (n_ptr->bclink.last_in == msg_bcgap_after(msg)))
387 		n_ptr->bclink.oos_state = 2;
388 
389 	tipc_node_unlock(n_ptr);
390 }
391 
392 /* tipc_bclink_xmit - broadcast buffer chain to all nodes in cluster
393  *                    and to identified node local sockets
394  * @buf: chain of buffers containing message
395  * Consumes the buffer chain, except when returning -ELINKCONG
396  * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
397  */
398 int tipc_bclink_xmit(struct sk_buff *buf)
399 {
400 	int rc = 0;
401 	int bc = 0;
402 	struct sk_buff *clbuf;
403 
404 	/* Prepare clone of message for local node */
405 	clbuf = tipc_msg_reassemble(buf);
406 	if (unlikely(!clbuf)) {
407 		kfree_skb_list(buf);
408 		return -EHOSTUNREACH;
409 	}
410 
411 	/* Broadcast to all other nodes */
412 	if (likely(bclink)) {
413 		tipc_bclink_lock();
414 		if (likely(bclink->bcast_nodes.count)) {
415 			rc = __tipc_link_xmit(bcl, buf);
416 			if (likely(!rc)) {
417 				bclink_set_last_sent();
418 				bcl->stats.queue_sz_counts++;
419 				bcl->stats.accu_queue_sz += bcl->out_queue_size;
420 			}
421 			bc = 1;
422 		}
423 		tipc_bclink_unlock();
424 	}
425 
426 	if (unlikely(!bc))
427 		kfree_skb_list(buf);
428 
429 	/* Deliver message clone */
430 	if (likely(!rc))
431 		tipc_sk_mcast_rcv(clbuf);
432 	else
433 		kfree_skb(clbuf);
434 
435 	return rc;
436 }
437 
438 /**
439  * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
440  *
441  * Called with both sending node's lock and bclink_lock taken.
442  */
443 static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
444 {
445 	bclink_update_last_sent(node, seqno);
446 	node->bclink.last_in = seqno;
447 	node->bclink.oos_state = 0;
448 	bcl->stats.recv_info++;
449 
450 	/*
451 	 * Unicast an ACK periodically, ensuring that
452 	 * all nodes in the cluster don't ACK at the same time
453 	 */
454 
455 	if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) {
456 		tipc_link_proto_xmit(node->active_links[node->addr & 1],
457 				     STATE_MSG, 0, 0, 0, 0, 0);
458 		bcl->stats.sent_acks++;
459 	}
460 }
461 
462 /**
463  * tipc_bclink_rcv - receive a broadcast packet, and deliver upwards
464  *
465  * RCU is locked, no other locks set
466  */
467 void tipc_bclink_rcv(struct sk_buff *buf)
468 {
469 	struct tipc_msg *msg = buf_msg(buf);
470 	struct tipc_node *node;
471 	u32 next_in;
472 	u32 seqno;
473 	int deferred = 0;
474 
475 	/* Screen out unwanted broadcast messages */
476 
477 	if (msg_mc_netid(msg) != tipc_net_id)
478 		goto exit;
479 
480 	node = tipc_node_find(msg_prevnode(msg));
481 	if (unlikely(!node))
482 		goto exit;
483 
484 	tipc_node_lock(node);
485 	if (unlikely(!node->bclink.recv_permitted))
486 		goto unlock;
487 
488 	/* Handle broadcast protocol message */
489 
490 	if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) {
491 		if (msg_type(msg) != STATE_MSG)
492 			goto unlock;
493 		if (msg_destnode(msg) == tipc_own_addr) {
494 			tipc_bclink_acknowledge(node, msg_bcast_ack(msg));
495 			tipc_node_unlock(node);
496 			tipc_bclink_lock();
497 			bcl->stats.recv_nacks++;
498 			bclink->retransmit_to = node;
499 			bclink_retransmit_pkt(msg_bcgap_after(msg),
500 					      msg_bcgap_to(msg));
501 			tipc_bclink_unlock();
502 		} else {
503 			tipc_node_unlock(node);
504 			bclink_peek_nack(msg);
505 		}
506 		goto exit;
507 	}
508 
509 	/* Handle in-sequence broadcast message */
510 
511 	seqno = msg_seqno(msg);
512 	next_in = mod(node->bclink.last_in + 1);
513 
514 	if (likely(seqno == next_in)) {
515 receive:
516 		/* Deliver message to destination */
517 
518 		if (likely(msg_isdata(msg))) {
519 			tipc_bclink_lock();
520 			bclink_accept_pkt(node, seqno);
521 			tipc_bclink_unlock();
522 			tipc_node_unlock(node);
523 			if (likely(msg_mcast(msg)))
524 				tipc_sk_mcast_rcv(buf);
525 			else
526 				kfree_skb(buf);
527 		} else if (msg_user(msg) == MSG_BUNDLER) {
528 			tipc_bclink_lock();
529 			bclink_accept_pkt(node, seqno);
530 			bcl->stats.recv_bundles++;
531 			bcl->stats.recv_bundled += msg_msgcnt(msg);
532 			tipc_bclink_unlock();
533 			tipc_node_unlock(node);
534 			tipc_link_bundle_rcv(buf);
535 		} else if (msg_user(msg) == MSG_FRAGMENTER) {
536 			tipc_buf_append(&node->bclink.reasm_buf, &buf);
537 			if (unlikely(!buf && !node->bclink.reasm_buf))
538 				goto unlock;
539 			tipc_bclink_lock();
540 			bclink_accept_pkt(node, seqno);
541 			bcl->stats.recv_fragments++;
542 			if (buf) {
543 				bcl->stats.recv_fragmented++;
544 				msg = buf_msg(buf);
545 				tipc_bclink_unlock();
546 				goto receive;
547 			}
548 			tipc_bclink_unlock();
549 			tipc_node_unlock(node);
550 		} else if (msg_user(msg) == NAME_DISTRIBUTOR) {
551 			tipc_bclink_lock();
552 			bclink_accept_pkt(node, seqno);
553 			tipc_bclink_unlock();
554 			tipc_node_unlock(node);
555 			tipc_named_rcv(buf);
556 		} else {
557 			tipc_bclink_lock();
558 			bclink_accept_pkt(node, seqno);
559 			tipc_bclink_unlock();
560 			tipc_node_unlock(node);
561 			kfree_skb(buf);
562 		}
563 		buf = NULL;
564 
565 		/* Determine new synchronization state */
566 
567 		tipc_node_lock(node);
568 		if (unlikely(!tipc_node_is_up(node)))
569 			goto unlock;
570 
571 		if (node->bclink.last_in == node->bclink.last_sent)
572 			goto unlock;
573 
574 		if (!node->bclink.deferred_head) {
575 			node->bclink.oos_state = 1;
576 			goto unlock;
577 		}
578 
579 		msg = buf_msg(node->bclink.deferred_head);
580 		seqno = msg_seqno(msg);
581 		next_in = mod(next_in + 1);
582 		if (seqno != next_in)
583 			goto unlock;
584 
585 		/* Take in-sequence message from deferred queue & deliver it */
586 
587 		buf = node->bclink.deferred_head;
588 		node->bclink.deferred_head = buf->next;
589 		buf->next = NULL;
590 		node->bclink.deferred_size--;
591 		goto receive;
592 	}
593 
594 	/* Handle out-of-sequence broadcast message */
595 
596 	if (less(next_in, seqno)) {
597 		deferred = tipc_link_defer_pkt(&node->bclink.deferred_head,
598 					       &node->bclink.deferred_tail,
599 					       buf);
600 		node->bclink.deferred_size += deferred;
601 		bclink_update_last_sent(node, seqno);
602 		buf = NULL;
603 	}
604 
605 	tipc_bclink_lock();
606 
607 	if (deferred)
608 		bcl->stats.deferred_recv++;
609 	else
610 		bcl->stats.duplicates++;
611 
612 	tipc_bclink_unlock();
613 
614 unlock:
615 	tipc_node_unlock(node);
616 exit:
617 	kfree_skb(buf);
618 }
619 
620 u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr)
621 {
622 	return (n_ptr->bclink.recv_permitted &&
623 		(tipc_bclink_get_last_sent() != n_ptr->bclink.acked));
624 }
625 
626 
627 /**
628  * tipc_bcbearer_send - send a packet through the broadcast pseudo-bearer
629  *
630  * Send packet over as many bearers as necessary to reach all nodes
631  * that have joined the broadcast link.
632  *
633  * Returns 0 (packet sent successfully) under all circumstances,
634  * since the broadcast link's pseudo-bearer never blocks
635  */
636 static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1,
637 			      struct tipc_media_addr *unused2)
638 {
639 	int bp_index;
640 	struct tipc_msg *msg = buf_msg(buf);
641 
642 	/* Prepare broadcast link message for reliable transmission,
643 	 * if first time trying to send it;
644 	 * preparation is skipped for broadcast link protocol messages
645 	 * since they are sent in an unreliable manner and don't need it
646 	 */
647 	if (likely(!msg_non_seq(buf_msg(buf)))) {
648 		bcbuf_set_acks(buf, bclink->bcast_nodes.count);
649 		msg_set_non_seq(msg, 1);
650 		msg_set_mc_netid(msg, tipc_net_id);
651 		bcl->stats.sent_info++;
652 
653 		if (WARN_ON(!bclink->bcast_nodes.count)) {
654 			dump_stack();
655 			return 0;
656 		}
657 	}
658 
659 	/* Send buffer over bearers until all targets reached */
660 	bcbearer->remains = bclink->bcast_nodes;
661 
662 	for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) {
663 		struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary;
664 		struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary;
665 		struct tipc_bearer *bp[2] = {p, s};
666 		struct tipc_bearer *b = bp[msg_link_selector(msg)];
667 		struct sk_buff *tbuf;
668 
669 		if (!p)
670 			break; /* No more bearers to try */
671 		if (!b)
672 			b = p;
673 		tipc_nmap_diff(&bcbearer->remains, &b->nodes,
674 			       &bcbearer->remains_new);
675 		if (bcbearer->remains_new.count == bcbearer->remains.count)
676 			continue; /* Nothing added by bearer pair */
677 
678 		if (bp_index == 0) {
679 			/* Use original buffer for first bearer */
680 			tipc_bearer_send(b->identity, buf, &b->bcast_addr);
681 		} else {
682 			/* Avoid concurrent buffer access */
683 			tbuf = pskb_copy_for_clone(buf, GFP_ATOMIC);
684 			if (!tbuf)
685 				break;
686 			tipc_bearer_send(b->identity, tbuf, &b->bcast_addr);
687 			kfree_skb(tbuf); /* Bearer keeps a clone */
688 		}
689 		if (bcbearer->remains_new.count == 0)
690 			break; /* All targets reached */
691 
692 		bcbearer->remains = bcbearer->remains_new;
693 	}
694 
695 	return 0;
696 }
697 
698 /**
699  * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer
700  */
701 void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action)
702 {
703 	struct tipc_bcbearer_pair *bp_temp = bcbearer->bpairs_temp;
704 	struct tipc_bcbearer_pair *bp_curr;
705 	struct tipc_bearer *b;
706 	int b_index;
707 	int pri;
708 
709 	tipc_bclink_lock();
710 
711 	if (action)
712 		tipc_nmap_add(nm_ptr, node);
713 	else
714 		tipc_nmap_remove(nm_ptr, node);
715 
716 	/* Group bearers by priority (can assume max of two per priority) */
717 	memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp));
718 
719 	rcu_read_lock();
720 	for (b_index = 0; b_index < MAX_BEARERS; b_index++) {
721 		b = rcu_dereference_rtnl(bearer_list[b_index]);
722 		if (!b || !b->nodes.count)
723 			continue;
724 
725 		if (!bp_temp[b->priority].primary)
726 			bp_temp[b->priority].primary = b;
727 		else
728 			bp_temp[b->priority].secondary = b;
729 	}
730 	rcu_read_unlock();
731 
732 	/* Create array of bearer pairs for broadcasting */
733 	bp_curr = bcbearer->bpairs;
734 	memset(bcbearer->bpairs, 0, sizeof(bcbearer->bpairs));
735 
736 	for (pri = TIPC_MAX_LINK_PRI; pri >= 0; pri--) {
737 
738 		if (!bp_temp[pri].primary)
739 			continue;
740 
741 		bp_curr->primary = bp_temp[pri].primary;
742 
743 		if (bp_temp[pri].secondary) {
744 			if (tipc_nmap_equal(&bp_temp[pri].primary->nodes,
745 					    &bp_temp[pri].secondary->nodes)) {
746 				bp_curr->secondary = bp_temp[pri].secondary;
747 			} else {
748 				bp_curr++;
749 				bp_curr->primary = bp_temp[pri].secondary;
750 			}
751 		}
752 
753 		bp_curr++;
754 	}
755 
756 	tipc_bclink_unlock();
757 }
758 
759 
760 int tipc_bclink_stats(char *buf, const u32 buf_size)
761 {
762 	int ret;
763 	struct tipc_stats *s;
764 
765 	if (!bcl)
766 		return 0;
767 
768 	tipc_bclink_lock();
769 
770 	s = &bcl->stats;
771 
772 	ret = tipc_snprintf(buf, buf_size, "Link <%s>\n"
773 			    "  Window:%u packets\n",
774 			    bcl->name, bcl->queue_limit[0]);
775 	ret += tipc_snprintf(buf + ret, buf_size - ret,
776 			     "  RX packets:%u fragments:%u/%u bundles:%u/%u\n",
777 			     s->recv_info, s->recv_fragments,
778 			     s->recv_fragmented, s->recv_bundles,
779 			     s->recv_bundled);
780 	ret += tipc_snprintf(buf + ret, buf_size - ret,
781 			     "  TX packets:%u fragments:%u/%u bundles:%u/%u\n",
782 			     s->sent_info, s->sent_fragments,
783 			     s->sent_fragmented, s->sent_bundles,
784 			     s->sent_bundled);
785 	ret += tipc_snprintf(buf + ret, buf_size - ret,
786 			     "  RX naks:%u defs:%u dups:%u\n",
787 			     s->recv_nacks, s->deferred_recv, s->duplicates);
788 	ret += tipc_snprintf(buf + ret, buf_size - ret,
789 			     "  TX naks:%u acks:%u dups:%u\n",
790 			     s->sent_nacks, s->sent_acks, s->retransmitted);
791 	ret += tipc_snprintf(buf + ret, buf_size - ret,
792 			     "  Congestion link:%u  Send queue max:%u avg:%u\n",
793 			     s->link_congs, s->max_queue_sz,
794 			     s->queue_sz_counts ?
795 			     (s->accu_queue_sz / s->queue_sz_counts) : 0);
796 
797 	tipc_bclink_unlock();
798 	return ret;
799 }
800 
801 int tipc_bclink_reset_stats(void)
802 {
803 	if (!bcl)
804 		return -ENOPROTOOPT;
805 
806 	tipc_bclink_lock();
807 	memset(&bcl->stats, 0, sizeof(bcl->stats));
808 	tipc_bclink_unlock();
809 	return 0;
810 }
811 
812 int tipc_bclink_set_queue_limits(u32 limit)
813 {
814 	if (!bcl)
815 		return -ENOPROTOOPT;
816 	if ((limit < TIPC_MIN_LINK_WIN) || (limit > TIPC_MAX_LINK_WIN))
817 		return -EINVAL;
818 
819 	tipc_bclink_lock();
820 	tipc_link_set_queue_limits(bcl, limit);
821 	tipc_bclink_unlock();
822 	return 0;
823 }
824 
825 int tipc_bclink_init(void)
826 {
827 	bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC);
828 	if (!bcbearer)
829 		return -ENOMEM;
830 
831 	bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC);
832 	if (!bclink) {
833 		kfree(bcbearer);
834 		return -ENOMEM;
835 	}
836 
837 	bcl = &bclink->link;
838 	bcbearer->bearer.media = &bcbearer->media;
839 	bcbearer->media.send_msg = tipc_bcbearer_send;
840 	sprintf(bcbearer->media.name, "tipc-broadcast");
841 
842 	spin_lock_init(&bclink->lock);
843 	INIT_LIST_HEAD(&bcl->waiting_ports);
844 	bcl->next_out_no = 1;
845 	spin_lock_init(&bclink->node.lock);
846 	bcl->owner = &bclink->node;
847 	bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;
848 	tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT);
849 	bcl->bearer_id = MAX_BEARERS;
850 	rcu_assign_pointer(bearer_list[MAX_BEARERS], &bcbearer->bearer);
851 	bcl->state = WORKING_WORKING;
852 	strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME);
853 	return 0;
854 }
855 
856 void tipc_bclink_stop(void)
857 {
858 	tipc_bclink_lock();
859 	tipc_link_purge_queues(bcl);
860 	tipc_bclink_unlock();
861 
862 	RCU_INIT_POINTER(bearer_list[BCBEARER], NULL);
863 	synchronize_net();
864 	kfree(bcbearer);
865 	kfree(bclink);
866 }
867 
868 /**
869  * tipc_nmap_add - add a node to a node map
870  */
871 static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)
872 {
873 	int n = tipc_node(node);
874 	int w = n / WSIZE;
875 	u32 mask = (1 << (n % WSIZE));
876 
877 	if ((nm_ptr->map[w] & mask) == 0) {
878 		nm_ptr->count++;
879 		nm_ptr->map[w] |= mask;
880 	}
881 }
882 
883 /**
884  * tipc_nmap_remove - remove a node from a node map
885  */
886 static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node)
887 {
888 	int n = tipc_node(node);
889 	int w = n / WSIZE;
890 	u32 mask = (1 << (n % WSIZE));
891 
892 	if ((nm_ptr->map[w] & mask) != 0) {
893 		nm_ptr->map[w] &= ~mask;
894 		nm_ptr->count--;
895 	}
896 }
897 
898 /**
899  * tipc_nmap_diff - find differences between node maps
900  * @nm_a: input node map A
901  * @nm_b: input node map B
902  * @nm_diff: output node map A-B (i.e. nodes of A that are not in B)
903  */
904 static void tipc_nmap_diff(struct tipc_node_map *nm_a,
905 			   struct tipc_node_map *nm_b,
906 			   struct tipc_node_map *nm_diff)
907 {
908 	int stop = ARRAY_SIZE(nm_a->map);
909 	int w;
910 	int b;
911 	u32 map;
912 
913 	memset(nm_diff, 0, sizeof(*nm_diff));
914 	for (w = 0; w < stop; w++) {
915 		map = nm_a->map[w] ^ (nm_a->map[w] & nm_b->map[w]);
916 		nm_diff->map[w] = map;
917 		if (map != 0) {
918 			for (b = 0 ; b < WSIZE; b++) {
919 				if (map & (1 << b))
920 					nm_diff->count++;
921 			}
922 		}
923 	}
924 }
925 
926 /**
927  * tipc_port_list_add - add a port to a port list, ensuring no duplicates
928  */
929 void tipc_port_list_add(struct tipc_port_list *pl_ptr, u32 port)
930 {
931 	struct tipc_port_list *item = pl_ptr;
932 	int i;
933 	int item_sz = PLSIZE;
934 	int cnt = pl_ptr->count;
935 
936 	for (; ; cnt -= item_sz, item = item->next) {
937 		if (cnt < PLSIZE)
938 			item_sz = cnt;
939 		for (i = 0; i < item_sz; i++)
940 			if (item->ports[i] == port)
941 				return;
942 		if (i < PLSIZE) {
943 			item->ports[i] = port;
944 			pl_ptr->count++;
945 			return;
946 		}
947 		if (!item->next) {
948 			item->next = kmalloc(sizeof(*item), GFP_ATOMIC);
949 			if (!item->next) {
950 				pr_warn("Incomplete multicast delivery, no memory\n");
951 				return;
952 			}
953 			item->next->next = NULL;
954 		}
955 	}
956 }
957 
958 /**
959  * tipc_port_list_free - free dynamically created entries in port_list chain
960  *
961  */
962 void tipc_port_list_free(struct tipc_port_list *pl_ptr)
963 {
964 	struct tipc_port_list *item;
965 	struct tipc_port_list *next;
966 
967 	for (item = pl_ptr->next; item; item = next) {
968 		next = item->next;
969 		kfree(item);
970 	}
971 }
972