1 /* 2 * net/tipc/bcast.c: TIPC broadcast code 3 * 4 * Copyright (c) 2004-2006, 2014, Ericsson AB 5 * Copyright (c) 2004, Intel Corporation. 6 * Copyright (c) 2005, 2010-2011, Wind River Systems 7 * All rights reserved. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions are met: 11 * 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in the 16 * documentation and/or other materials provided with the distribution. 17 * 3. Neither the names of the copyright holders nor the names of its 18 * contributors may be used to endorse or promote products derived from 19 * this software without specific prior written permission. 20 * 21 * Alternatively, this software may be distributed under the terms of the 22 * GNU General Public License ("GPL") version 2 as published by the Free 23 * Software Foundation. 24 * 25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 26 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 27 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 28 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 29 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 30 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 31 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 32 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 33 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 34 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 35 * POSSIBILITY OF SUCH DAMAGE. 36 */ 37 38 #include "core.h" 39 #include "link.h" 40 #include "socket.h" 41 #include "msg.h" 42 #include "bcast.h" 43 #include "name_distr.h" 44 45 #define MAX_PKT_DEFAULT_MCAST 1500 /* bcast link max packet size (fixed) */ 46 #define BCLINK_WIN_DEFAULT 20 /* bcast link window size (default) */ 47 #define BCBEARER MAX_BEARERS 48 49 /** 50 * struct tipc_bcbearer_pair - a pair of bearers used by broadcast link 51 * @primary: pointer to primary bearer 52 * @secondary: pointer to secondary bearer 53 * 54 * Bearers must have same priority and same set of reachable destinations 55 * to be paired. 56 */ 57 58 struct tipc_bcbearer_pair { 59 struct tipc_bearer *primary; 60 struct tipc_bearer *secondary; 61 }; 62 63 /** 64 * struct tipc_bcbearer - bearer used by broadcast link 65 * @bearer: (non-standard) broadcast bearer structure 66 * @media: (non-standard) broadcast media structure 67 * @bpairs: array of bearer pairs 68 * @bpairs_temp: temporary array of bearer pairs used by tipc_bcbearer_sort() 69 * @remains: temporary node map used by tipc_bcbearer_send() 70 * @remains_new: temporary node map used tipc_bcbearer_send() 71 * 72 * Note: The fields labelled "temporary" are incorporated into the bearer 73 * to avoid consuming potentially limited stack space through the use of 74 * large local variables within multicast routines. Concurrent access is 75 * prevented through use of the spinlock "bclink_lock". 76 */ 77 struct tipc_bcbearer { 78 struct tipc_bearer bearer; 79 struct tipc_media media; 80 struct tipc_bcbearer_pair bpairs[MAX_BEARERS]; 81 struct tipc_bcbearer_pair bpairs_temp[TIPC_MAX_LINK_PRI + 1]; 82 struct tipc_node_map remains; 83 struct tipc_node_map remains_new; 84 }; 85 86 /** 87 * struct tipc_bclink - link used for broadcast messages 88 * @lock: spinlock governing access to structure 89 * @link: (non-standard) broadcast link structure 90 * @node: (non-standard) node structure representing b'cast link's peer node 91 * @flags: represent bclink states 92 * @bcast_nodes: map of broadcast-capable nodes 93 * @retransmit_to: node that most recently requested a retransmit 94 * 95 * Handles sequence numbering, fragmentation, bundling, etc. 96 */ 97 struct tipc_bclink { 98 spinlock_t lock; 99 struct tipc_link link; 100 struct tipc_node node; 101 unsigned int flags; 102 struct tipc_node_map bcast_nodes; 103 struct tipc_node *retransmit_to; 104 }; 105 106 static struct tipc_bcbearer *bcbearer; 107 static struct tipc_bclink *bclink; 108 static struct tipc_link *bcl; 109 110 const char tipc_bclink_name[] = "broadcast-link"; 111 112 static void tipc_nmap_diff(struct tipc_node_map *nm_a, 113 struct tipc_node_map *nm_b, 114 struct tipc_node_map *nm_diff); 115 static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node); 116 static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node); 117 118 static void tipc_bclink_lock(void) 119 { 120 spin_lock_bh(&bclink->lock); 121 } 122 123 static void tipc_bclink_unlock(void) 124 { 125 struct tipc_node *node = NULL; 126 127 if (likely(!bclink->flags)) { 128 spin_unlock_bh(&bclink->lock); 129 return; 130 } 131 132 if (bclink->flags & TIPC_BCLINK_RESET) { 133 bclink->flags &= ~TIPC_BCLINK_RESET; 134 node = tipc_bclink_retransmit_to(); 135 } 136 spin_unlock_bh(&bclink->lock); 137 138 if (node) 139 tipc_link_reset_all(node); 140 } 141 142 uint tipc_bclink_get_mtu(void) 143 { 144 return MAX_PKT_DEFAULT_MCAST; 145 } 146 147 void tipc_bclink_set_flags(unsigned int flags) 148 { 149 bclink->flags |= flags; 150 } 151 152 static u32 bcbuf_acks(struct sk_buff *buf) 153 { 154 return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle; 155 } 156 157 static void bcbuf_set_acks(struct sk_buff *buf, u32 acks) 158 { 159 TIPC_SKB_CB(buf)->handle = (void *)(unsigned long)acks; 160 } 161 162 static void bcbuf_decr_acks(struct sk_buff *buf) 163 { 164 bcbuf_set_acks(buf, bcbuf_acks(buf) - 1); 165 } 166 167 void tipc_bclink_add_node(u32 addr) 168 { 169 tipc_bclink_lock(); 170 tipc_nmap_add(&bclink->bcast_nodes, addr); 171 tipc_bclink_unlock(); 172 } 173 174 void tipc_bclink_remove_node(u32 addr) 175 { 176 tipc_bclink_lock(); 177 tipc_nmap_remove(&bclink->bcast_nodes, addr); 178 tipc_bclink_unlock(); 179 } 180 181 static void bclink_set_last_sent(void) 182 { 183 if (bcl->next_out) 184 bcl->fsm_msg_cnt = mod(buf_seqno(bcl->next_out) - 1); 185 else 186 bcl->fsm_msg_cnt = mod(bcl->next_out_no - 1); 187 } 188 189 u32 tipc_bclink_get_last_sent(void) 190 { 191 return bcl->fsm_msg_cnt; 192 } 193 194 static void bclink_update_last_sent(struct tipc_node *node, u32 seqno) 195 { 196 node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ? 197 seqno : node->bclink.last_sent; 198 } 199 200 201 /** 202 * tipc_bclink_retransmit_to - get most recent node to request retransmission 203 * 204 * Called with bclink_lock locked 205 */ 206 struct tipc_node *tipc_bclink_retransmit_to(void) 207 { 208 return bclink->retransmit_to; 209 } 210 211 /** 212 * bclink_retransmit_pkt - retransmit broadcast packets 213 * @after: sequence number of last packet to *not* retransmit 214 * @to: sequence number of last packet to retransmit 215 * 216 * Called with bclink_lock locked 217 */ 218 static void bclink_retransmit_pkt(u32 after, u32 to) 219 { 220 struct sk_buff *skb; 221 222 skb_queue_walk(&bcl->outqueue, skb) { 223 if (more(buf_seqno(skb), after)) 224 break; 225 } 226 tipc_link_retransmit(bcl, skb, mod(to - after)); 227 } 228 229 /** 230 * tipc_bclink_wakeup_users - wake up pending users 231 * 232 * Called with no locks taken 233 */ 234 void tipc_bclink_wakeup_users(void) 235 { 236 struct sk_buff *skb; 237 238 while ((skb = skb_dequeue(&bclink->link.waiting_sks))) 239 tipc_sk_rcv(skb); 240 241 } 242 243 /** 244 * tipc_bclink_acknowledge - handle acknowledgement of broadcast packets 245 * @n_ptr: node that sent acknowledgement info 246 * @acked: broadcast sequence # that has been acknowledged 247 * 248 * Node is locked, bclink_lock unlocked. 249 */ 250 void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked) 251 { 252 struct sk_buff *skb, *tmp; 253 struct sk_buff *next; 254 unsigned int released = 0; 255 256 tipc_bclink_lock(); 257 /* Bail out if tx queue is empty (no clean up is required) */ 258 skb = skb_peek(&bcl->outqueue); 259 if (!skb) 260 goto exit; 261 262 /* Determine which messages need to be acknowledged */ 263 if (acked == INVALID_LINK_SEQ) { 264 /* 265 * Contact with specified node has been lost, so need to 266 * acknowledge sent messages only (if other nodes still exist) 267 * or both sent and unsent messages (otherwise) 268 */ 269 if (bclink->bcast_nodes.count) 270 acked = bcl->fsm_msg_cnt; 271 else 272 acked = bcl->next_out_no; 273 } else { 274 /* 275 * Bail out if specified sequence number does not correspond 276 * to a message that has been sent and not yet acknowledged 277 */ 278 if (less(acked, buf_seqno(skb)) || 279 less(bcl->fsm_msg_cnt, acked) || 280 less_eq(acked, n_ptr->bclink.acked)) 281 goto exit; 282 } 283 284 /* Skip over packets that node has previously acknowledged */ 285 skb_queue_walk(&bcl->outqueue, skb) { 286 if (more(buf_seqno(skb), n_ptr->bclink.acked)) 287 break; 288 } 289 290 /* Update packets that node is now acknowledging */ 291 skb_queue_walk_from_safe(&bcl->outqueue, skb, tmp) { 292 if (more(buf_seqno(skb), acked)) 293 break; 294 295 next = tipc_skb_queue_next(&bcl->outqueue, skb); 296 if (skb != bcl->next_out) { 297 bcbuf_decr_acks(skb); 298 } else { 299 bcbuf_set_acks(skb, 0); 300 bcl->next_out = next; 301 bclink_set_last_sent(); 302 } 303 304 if (bcbuf_acks(skb) == 0) { 305 __skb_unlink(skb, &bcl->outqueue); 306 kfree_skb(skb); 307 released = 1; 308 } 309 } 310 n_ptr->bclink.acked = acked; 311 312 /* Try resolving broadcast link congestion, if necessary */ 313 if (unlikely(bcl->next_out)) { 314 tipc_link_push_packets(bcl); 315 bclink_set_last_sent(); 316 } 317 if (unlikely(released && !skb_queue_empty(&bcl->waiting_sks))) 318 n_ptr->action_flags |= TIPC_WAKEUP_BCAST_USERS; 319 320 exit: 321 tipc_bclink_unlock(); 322 } 323 324 /** 325 * tipc_bclink_update_link_state - update broadcast link state 326 * 327 * RCU and node lock set 328 */ 329 void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent) 330 { 331 struct sk_buff *buf; 332 333 /* Ignore "stale" link state info */ 334 if (less_eq(last_sent, n_ptr->bclink.last_in)) 335 return; 336 337 /* Update link synchronization state; quit if in sync */ 338 bclink_update_last_sent(n_ptr, last_sent); 339 340 if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in) 341 return; 342 343 /* Update out-of-sync state; quit if loss is still unconfirmed */ 344 if ((++n_ptr->bclink.oos_state) == 1) { 345 if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2)) 346 return; 347 n_ptr->bclink.oos_state++; 348 } 349 350 /* Don't NACK if one has been recently sent (or seen) */ 351 if (n_ptr->bclink.oos_state & 0x1) 352 return; 353 354 /* Send NACK */ 355 buf = tipc_buf_acquire(INT_H_SIZE); 356 if (buf) { 357 struct tipc_msg *msg = buf_msg(buf); 358 struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferred_queue); 359 u32 to = skb ? buf_seqno(skb) - 1 : n_ptr->bclink.last_sent; 360 361 tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, 362 INT_H_SIZE, n_ptr->addr); 363 msg_set_non_seq(msg, 1); 364 msg_set_mc_netid(msg, tipc_net_id); 365 msg_set_bcast_ack(msg, n_ptr->bclink.last_in); 366 msg_set_bcgap_after(msg, n_ptr->bclink.last_in); 367 msg_set_bcgap_to(msg, to); 368 369 tipc_bclink_lock(); 370 tipc_bearer_send(MAX_BEARERS, buf, NULL); 371 bcl->stats.sent_nacks++; 372 tipc_bclink_unlock(); 373 kfree_skb(buf); 374 375 n_ptr->bclink.oos_state++; 376 } 377 } 378 379 /** 380 * bclink_peek_nack - monitor retransmission requests sent by other nodes 381 * 382 * Delay any upcoming NACK by this node if another node has already 383 * requested the first message this node is going to ask for. 384 */ 385 static void bclink_peek_nack(struct tipc_msg *msg) 386 { 387 struct tipc_node *n_ptr = tipc_node_find(msg_destnode(msg)); 388 389 if (unlikely(!n_ptr)) 390 return; 391 392 tipc_node_lock(n_ptr); 393 394 if (n_ptr->bclink.recv_permitted && 395 (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) && 396 (n_ptr->bclink.last_in == msg_bcgap_after(msg))) 397 n_ptr->bclink.oos_state = 2; 398 399 tipc_node_unlock(n_ptr); 400 } 401 402 /* tipc_bclink_xmit - broadcast buffer chain to all nodes in cluster 403 * and to identified node local sockets 404 * @list: chain of buffers containing message 405 * Consumes the buffer chain, except when returning -ELINKCONG 406 * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE 407 */ 408 int tipc_bclink_xmit(struct sk_buff_head *list) 409 { 410 int rc = 0; 411 int bc = 0; 412 struct sk_buff *skb; 413 414 /* Prepare clone of message for local node */ 415 skb = tipc_msg_reassemble(list); 416 if (unlikely(!skb)) { 417 __skb_queue_purge(list); 418 return -EHOSTUNREACH; 419 } 420 421 /* Broadcast to all other nodes */ 422 if (likely(bclink)) { 423 tipc_bclink_lock(); 424 if (likely(bclink->bcast_nodes.count)) { 425 rc = __tipc_link_xmit(bcl, list); 426 if (likely(!rc)) { 427 u32 len = skb_queue_len(&bcl->outqueue); 428 429 bclink_set_last_sent(); 430 bcl->stats.queue_sz_counts++; 431 bcl->stats.accu_queue_sz += len; 432 } 433 bc = 1; 434 } 435 tipc_bclink_unlock(); 436 } 437 438 if (unlikely(!bc)) 439 __skb_queue_purge(list); 440 441 /* Deliver message clone */ 442 if (likely(!rc)) 443 tipc_sk_mcast_rcv(skb); 444 else 445 kfree_skb(skb); 446 447 return rc; 448 } 449 450 /** 451 * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet 452 * 453 * Called with both sending node's lock and bclink_lock taken. 454 */ 455 static void bclink_accept_pkt(struct tipc_node *node, u32 seqno) 456 { 457 bclink_update_last_sent(node, seqno); 458 node->bclink.last_in = seqno; 459 node->bclink.oos_state = 0; 460 bcl->stats.recv_info++; 461 462 /* 463 * Unicast an ACK periodically, ensuring that 464 * all nodes in the cluster don't ACK at the same time 465 */ 466 if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) { 467 tipc_link_proto_xmit(node->active_links[node->addr & 1], 468 STATE_MSG, 0, 0, 0, 0, 0); 469 bcl->stats.sent_acks++; 470 } 471 } 472 473 /** 474 * tipc_bclink_rcv - receive a broadcast packet, and deliver upwards 475 * 476 * RCU is locked, no other locks set 477 */ 478 void tipc_bclink_rcv(struct sk_buff *buf) 479 { 480 struct tipc_msg *msg = buf_msg(buf); 481 struct tipc_node *node; 482 u32 next_in; 483 u32 seqno; 484 int deferred = 0; 485 486 /* Screen out unwanted broadcast messages */ 487 if (msg_mc_netid(msg) != tipc_net_id) 488 goto exit; 489 490 node = tipc_node_find(msg_prevnode(msg)); 491 if (unlikely(!node)) 492 goto exit; 493 494 tipc_node_lock(node); 495 if (unlikely(!node->bclink.recv_permitted)) 496 goto unlock; 497 498 /* Handle broadcast protocol message */ 499 if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) { 500 if (msg_type(msg) != STATE_MSG) 501 goto unlock; 502 if (msg_destnode(msg) == tipc_own_addr) { 503 tipc_bclink_acknowledge(node, msg_bcast_ack(msg)); 504 tipc_node_unlock(node); 505 tipc_bclink_lock(); 506 bcl->stats.recv_nacks++; 507 bclink->retransmit_to = node; 508 bclink_retransmit_pkt(msg_bcgap_after(msg), 509 msg_bcgap_to(msg)); 510 tipc_bclink_unlock(); 511 } else { 512 tipc_node_unlock(node); 513 bclink_peek_nack(msg); 514 } 515 goto exit; 516 } 517 518 /* Handle in-sequence broadcast message */ 519 seqno = msg_seqno(msg); 520 next_in = mod(node->bclink.last_in + 1); 521 522 if (likely(seqno == next_in)) { 523 receive: 524 /* Deliver message to destination */ 525 if (likely(msg_isdata(msg))) { 526 tipc_bclink_lock(); 527 bclink_accept_pkt(node, seqno); 528 tipc_bclink_unlock(); 529 tipc_node_unlock(node); 530 if (likely(msg_mcast(msg))) 531 tipc_sk_mcast_rcv(buf); 532 else 533 kfree_skb(buf); 534 } else if (msg_user(msg) == MSG_BUNDLER) { 535 tipc_bclink_lock(); 536 bclink_accept_pkt(node, seqno); 537 bcl->stats.recv_bundles++; 538 bcl->stats.recv_bundled += msg_msgcnt(msg); 539 tipc_bclink_unlock(); 540 tipc_node_unlock(node); 541 tipc_link_bundle_rcv(buf); 542 } else if (msg_user(msg) == MSG_FRAGMENTER) { 543 tipc_buf_append(&node->bclink.reasm_buf, &buf); 544 if (unlikely(!buf && !node->bclink.reasm_buf)) 545 goto unlock; 546 tipc_bclink_lock(); 547 bclink_accept_pkt(node, seqno); 548 bcl->stats.recv_fragments++; 549 if (buf) { 550 bcl->stats.recv_fragmented++; 551 msg = buf_msg(buf); 552 tipc_bclink_unlock(); 553 goto receive; 554 } 555 tipc_bclink_unlock(); 556 tipc_node_unlock(node); 557 } else if (msg_user(msg) == NAME_DISTRIBUTOR) { 558 tipc_bclink_lock(); 559 bclink_accept_pkt(node, seqno); 560 tipc_bclink_unlock(); 561 tipc_node_unlock(node); 562 tipc_named_rcv(buf); 563 } else { 564 tipc_bclink_lock(); 565 bclink_accept_pkt(node, seqno); 566 tipc_bclink_unlock(); 567 tipc_node_unlock(node); 568 kfree_skb(buf); 569 } 570 buf = NULL; 571 572 /* Determine new synchronization state */ 573 tipc_node_lock(node); 574 if (unlikely(!tipc_node_is_up(node))) 575 goto unlock; 576 577 if (node->bclink.last_in == node->bclink.last_sent) 578 goto unlock; 579 580 if (skb_queue_empty(&node->bclink.deferred_queue)) { 581 node->bclink.oos_state = 1; 582 goto unlock; 583 } 584 585 msg = buf_msg(skb_peek(&node->bclink.deferred_queue)); 586 seqno = msg_seqno(msg); 587 next_in = mod(next_in + 1); 588 if (seqno != next_in) 589 goto unlock; 590 591 /* Take in-sequence message from deferred queue & deliver it */ 592 buf = __skb_dequeue(&node->bclink.deferred_queue); 593 goto receive; 594 } 595 596 /* Handle out-of-sequence broadcast message */ 597 if (less(next_in, seqno)) { 598 deferred = tipc_link_defer_pkt(&node->bclink.deferred_queue, 599 buf); 600 bclink_update_last_sent(node, seqno); 601 buf = NULL; 602 } 603 604 tipc_bclink_lock(); 605 606 if (deferred) 607 bcl->stats.deferred_recv++; 608 else 609 bcl->stats.duplicates++; 610 611 tipc_bclink_unlock(); 612 613 unlock: 614 tipc_node_unlock(node); 615 exit: 616 kfree_skb(buf); 617 } 618 619 u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr) 620 { 621 return (n_ptr->bclink.recv_permitted && 622 (tipc_bclink_get_last_sent() != n_ptr->bclink.acked)); 623 } 624 625 626 /** 627 * tipc_bcbearer_send - send a packet through the broadcast pseudo-bearer 628 * 629 * Send packet over as many bearers as necessary to reach all nodes 630 * that have joined the broadcast link. 631 * 632 * Returns 0 (packet sent successfully) under all circumstances, 633 * since the broadcast link's pseudo-bearer never blocks 634 */ 635 static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1, 636 struct tipc_media_addr *unused2) 637 { 638 int bp_index; 639 struct tipc_msg *msg = buf_msg(buf); 640 641 /* Prepare broadcast link message for reliable transmission, 642 * if first time trying to send it; 643 * preparation is skipped for broadcast link protocol messages 644 * since they are sent in an unreliable manner and don't need it 645 */ 646 if (likely(!msg_non_seq(buf_msg(buf)))) { 647 bcbuf_set_acks(buf, bclink->bcast_nodes.count); 648 msg_set_non_seq(msg, 1); 649 msg_set_mc_netid(msg, tipc_net_id); 650 bcl->stats.sent_info++; 651 652 if (WARN_ON(!bclink->bcast_nodes.count)) { 653 dump_stack(); 654 return 0; 655 } 656 } 657 658 /* Send buffer over bearers until all targets reached */ 659 bcbearer->remains = bclink->bcast_nodes; 660 661 for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) { 662 struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary; 663 struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary; 664 struct tipc_bearer *bp[2] = {p, s}; 665 struct tipc_bearer *b = bp[msg_link_selector(msg)]; 666 struct sk_buff *tbuf; 667 668 if (!p) 669 break; /* No more bearers to try */ 670 if (!b) 671 b = p; 672 tipc_nmap_diff(&bcbearer->remains, &b->nodes, 673 &bcbearer->remains_new); 674 if (bcbearer->remains_new.count == bcbearer->remains.count) 675 continue; /* Nothing added by bearer pair */ 676 677 if (bp_index == 0) { 678 /* Use original buffer for first bearer */ 679 tipc_bearer_send(b->identity, buf, &b->bcast_addr); 680 } else { 681 /* Avoid concurrent buffer access */ 682 tbuf = pskb_copy_for_clone(buf, GFP_ATOMIC); 683 if (!tbuf) 684 break; 685 tipc_bearer_send(b->identity, tbuf, &b->bcast_addr); 686 kfree_skb(tbuf); /* Bearer keeps a clone */ 687 } 688 if (bcbearer->remains_new.count == 0) 689 break; /* All targets reached */ 690 691 bcbearer->remains = bcbearer->remains_new; 692 } 693 694 return 0; 695 } 696 697 /** 698 * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer 699 */ 700 void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action) 701 { 702 struct tipc_bcbearer_pair *bp_temp = bcbearer->bpairs_temp; 703 struct tipc_bcbearer_pair *bp_curr; 704 struct tipc_bearer *b; 705 int b_index; 706 int pri; 707 708 tipc_bclink_lock(); 709 710 if (action) 711 tipc_nmap_add(nm_ptr, node); 712 else 713 tipc_nmap_remove(nm_ptr, node); 714 715 /* Group bearers by priority (can assume max of two per priority) */ 716 memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp)); 717 718 rcu_read_lock(); 719 for (b_index = 0; b_index < MAX_BEARERS; b_index++) { 720 b = rcu_dereference_rtnl(bearer_list[b_index]); 721 if (!b || !b->nodes.count) 722 continue; 723 724 if (!bp_temp[b->priority].primary) 725 bp_temp[b->priority].primary = b; 726 else 727 bp_temp[b->priority].secondary = b; 728 } 729 rcu_read_unlock(); 730 731 /* Create array of bearer pairs for broadcasting */ 732 bp_curr = bcbearer->bpairs; 733 memset(bcbearer->bpairs, 0, sizeof(bcbearer->bpairs)); 734 735 for (pri = TIPC_MAX_LINK_PRI; pri >= 0; pri--) { 736 737 if (!bp_temp[pri].primary) 738 continue; 739 740 bp_curr->primary = bp_temp[pri].primary; 741 742 if (bp_temp[pri].secondary) { 743 if (tipc_nmap_equal(&bp_temp[pri].primary->nodes, 744 &bp_temp[pri].secondary->nodes)) { 745 bp_curr->secondary = bp_temp[pri].secondary; 746 } else { 747 bp_curr++; 748 bp_curr->primary = bp_temp[pri].secondary; 749 } 750 } 751 752 bp_curr++; 753 } 754 755 tipc_bclink_unlock(); 756 } 757 758 static int __tipc_nl_add_bc_link_stat(struct sk_buff *skb, 759 struct tipc_stats *stats) 760 { 761 int i; 762 struct nlattr *nest; 763 764 struct nla_map { 765 __u32 key; 766 __u32 val; 767 }; 768 769 struct nla_map map[] = { 770 {TIPC_NLA_STATS_RX_INFO, stats->recv_info}, 771 {TIPC_NLA_STATS_RX_FRAGMENTS, stats->recv_fragments}, 772 {TIPC_NLA_STATS_RX_FRAGMENTED, stats->recv_fragmented}, 773 {TIPC_NLA_STATS_RX_BUNDLES, stats->recv_bundles}, 774 {TIPC_NLA_STATS_RX_BUNDLED, stats->recv_bundled}, 775 {TIPC_NLA_STATS_TX_INFO, stats->sent_info}, 776 {TIPC_NLA_STATS_TX_FRAGMENTS, stats->sent_fragments}, 777 {TIPC_NLA_STATS_TX_FRAGMENTED, stats->sent_fragmented}, 778 {TIPC_NLA_STATS_TX_BUNDLES, stats->sent_bundles}, 779 {TIPC_NLA_STATS_TX_BUNDLED, stats->sent_bundled}, 780 {TIPC_NLA_STATS_RX_NACKS, stats->recv_nacks}, 781 {TIPC_NLA_STATS_RX_DEFERRED, stats->deferred_recv}, 782 {TIPC_NLA_STATS_TX_NACKS, stats->sent_nacks}, 783 {TIPC_NLA_STATS_TX_ACKS, stats->sent_acks}, 784 {TIPC_NLA_STATS_RETRANSMITTED, stats->retransmitted}, 785 {TIPC_NLA_STATS_DUPLICATES, stats->duplicates}, 786 {TIPC_NLA_STATS_LINK_CONGS, stats->link_congs}, 787 {TIPC_NLA_STATS_MAX_QUEUE, stats->max_queue_sz}, 788 {TIPC_NLA_STATS_AVG_QUEUE, stats->queue_sz_counts ? 789 (stats->accu_queue_sz / stats->queue_sz_counts) : 0} 790 }; 791 792 nest = nla_nest_start(skb, TIPC_NLA_LINK_STATS); 793 if (!nest) 794 return -EMSGSIZE; 795 796 for (i = 0; i < ARRAY_SIZE(map); i++) 797 if (nla_put_u32(skb, map[i].key, map[i].val)) 798 goto msg_full; 799 800 nla_nest_end(skb, nest); 801 802 return 0; 803 msg_full: 804 nla_nest_cancel(skb, nest); 805 806 return -EMSGSIZE; 807 } 808 809 int tipc_nl_add_bc_link(struct tipc_nl_msg *msg) 810 { 811 int err; 812 void *hdr; 813 struct nlattr *attrs; 814 struct nlattr *prop; 815 816 if (!bcl) 817 return 0; 818 819 tipc_bclink_lock(); 820 821 hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_v2_family, 822 NLM_F_MULTI, TIPC_NL_LINK_GET); 823 if (!hdr) 824 return -EMSGSIZE; 825 826 attrs = nla_nest_start(msg->skb, TIPC_NLA_LINK); 827 if (!attrs) 828 goto msg_full; 829 830 /* The broadcast link is always up */ 831 if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP)) 832 goto attr_msg_full; 833 834 if (nla_put_flag(msg->skb, TIPC_NLA_LINK_BROADCAST)) 835 goto attr_msg_full; 836 if (nla_put_string(msg->skb, TIPC_NLA_LINK_NAME, bcl->name)) 837 goto attr_msg_full; 838 if (nla_put_u32(msg->skb, TIPC_NLA_LINK_RX, bcl->next_in_no)) 839 goto attr_msg_full; 840 if (nla_put_u32(msg->skb, TIPC_NLA_LINK_TX, bcl->next_out_no)) 841 goto attr_msg_full; 842 843 prop = nla_nest_start(msg->skb, TIPC_NLA_LINK_PROP); 844 if (!prop) 845 goto attr_msg_full; 846 if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bcl->queue_limit[0])) 847 goto prop_msg_full; 848 nla_nest_end(msg->skb, prop); 849 850 err = __tipc_nl_add_bc_link_stat(msg->skb, &bcl->stats); 851 if (err) 852 goto attr_msg_full; 853 854 tipc_bclink_unlock(); 855 nla_nest_end(msg->skb, attrs); 856 genlmsg_end(msg->skb, hdr); 857 858 return 0; 859 860 prop_msg_full: 861 nla_nest_cancel(msg->skb, prop); 862 attr_msg_full: 863 nla_nest_cancel(msg->skb, attrs); 864 msg_full: 865 tipc_bclink_unlock(); 866 genlmsg_cancel(msg->skb, hdr); 867 868 return -EMSGSIZE; 869 } 870 871 int tipc_bclink_stats(char *buf, const u32 buf_size) 872 { 873 int ret; 874 struct tipc_stats *s; 875 876 if (!bcl) 877 return 0; 878 879 tipc_bclink_lock(); 880 881 s = &bcl->stats; 882 883 ret = tipc_snprintf(buf, buf_size, "Link <%s>\n" 884 " Window:%u packets\n", 885 bcl->name, bcl->queue_limit[0]); 886 ret += tipc_snprintf(buf + ret, buf_size - ret, 887 " RX packets:%u fragments:%u/%u bundles:%u/%u\n", 888 s->recv_info, s->recv_fragments, 889 s->recv_fragmented, s->recv_bundles, 890 s->recv_bundled); 891 ret += tipc_snprintf(buf + ret, buf_size - ret, 892 " TX packets:%u fragments:%u/%u bundles:%u/%u\n", 893 s->sent_info, s->sent_fragments, 894 s->sent_fragmented, s->sent_bundles, 895 s->sent_bundled); 896 ret += tipc_snprintf(buf + ret, buf_size - ret, 897 " RX naks:%u defs:%u dups:%u\n", 898 s->recv_nacks, s->deferred_recv, s->duplicates); 899 ret += tipc_snprintf(buf + ret, buf_size - ret, 900 " TX naks:%u acks:%u dups:%u\n", 901 s->sent_nacks, s->sent_acks, s->retransmitted); 902 ret += tipc_snprintf(buf + ret, buf_size - ret, 903 " Congestion link:%u Send queue max:%u avg:%u\n", 904 s->link_congs, s->max_queue_sz, 905 s->queue_sz_counts ? 906 (s->accu_queue_sz / s->queue_sz_counts) : 0); 907 908 tipc_bclink_unlock(); 909 return ret; 910 } 911 912 int tipc_bclink_reset_stats(void) 913 { 914 if (!bcl) 915 return -ENOPROTOOPT; 916 917 tipc_bclink_lock(); 918 memset(&bcl->stats, 0, sizeof(bcl->stats)); 919 tipc_bclink_unlock(); 920 return 0; 921 } 922 923 int tipc_bclink_set_queue_limits(u32 limit) 924 { 925 if (!bcl) 926 return -ENOPROTOOPT; 927 if ((limit < TIPC_MIN_LINK_WIN) || (limit > TIPC_MAX_LINK_WIN)) 928 return -EINVAL; 929 930 tipc_bclink_lock(); 931 tipc_link_set_queue_limits(bcl, limit); 932 tipc_bclink_unlock(); 933 return 0; 934 } 935 936 int tipc_bclink_init(void) 937 { 938 bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC); 939 if (!bcbearer) 940 return -ENOMEM; 941 942 bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC); 943 if (!bclink) { 944 kfree(bcbearer); 945 return -ENOMEM; 946 } 947 948 bcl = &bclink->link; 949 bcbearer->bearer.media = &bcbearer->media; 950 bcbearer->media.send_msg = tipc_bcbearer_send; 951 sprintf(bcbearer->media.name, "tipc-broadcast"); 952 953 spin_lock_init(&bclink->lock); 954 __skb_queue_head_init(&bcl->outqueue); 955 __skb_queue_head_init(&bcl->deferred_queue); 956 skb_queue_head_init(&bcl->waiting_sks); 957 bcl->next_out_no = 1; 958 spin_lock_init(&bclink->node.lock); 959 __skb_queue_head_init(&bclink->node.waiting_sks); 960 bcl->owner = &bclink->node; 961 bcl->max_pkt = MAX_PKT_DEFAULT_MCAST; 962 tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT); 963 bcl->bearer_id = MAX_BEARERS; 964 rcu_assign_pointer(bearer_list[MAX_BEARERS], &bcbearer->bearer); 965 bcl->state = WORKING_WORKING; 966 strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME); 967 return 0; 968 } 969 970 void tipc_bclink_stop(void) 971 { 972 tipc_bclink_lock(); 973 tipc_link_purge_queues(bcl); 974 tipc_bclink_unlock(); 975 976 RCU_INIT_POINTER(bearer_list[BCBEARER], NULL); 977 synchronize_net(); 978 kfree(bcbearer); 979 kfree(bclink); 980 } 981 982 /** 983 * tipc_nmap_add - add a node to a node map 984 */ 985 static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node) 986 { 987 int n = tipc_node(node); 988 int w = n / WSIZE; 989 u32 mask = (1 << (n % WSIZE)); 990 991 if ((nm_ptr->map[w] & mask) == 0) { 992 nm_ptr->count++; 993 nm_ptr->map[w] |= mask; 994 } 995 } 996 997 /** 998 * tipc_nmap_remove - remove a node from a node map 999 */ 1000 static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node) 1001 { 1002 int n = tipc_node(node); 1003 int w = n / WSIZE; 1004 u32 mask = (1 << (n % WSIZE)); 1005 1006 if ((nm_ptr->map[w] & mask) != 0) { 1007 nm_ptr->map[w] &= ~mask; 1008 nm_ptr->count--; 1009 } 1010 } 1011 1012 /** 1013 * tipc_nmap_diff - find differences between node maps 1014 * @nm_a: input node map A 1015 * @nm_b: input node map B 1016 * @nm_diff: output node map A-B (i.e. nodes of A that are not in B) 1017 */ 1018 static void tipc_nmap_diff(struct tipc_node_map *nm_a, 1019 struct tipc_node_map *nm_b, 1020 struct tipc_node_map *nm_diff) 1021 { 1022 int stop = ARRAY_SIZE(nm_a->map); 1023 int w; 1024 int b; 1025 u32 map; 1026 1027 memset(nm_diff, 0, sizeof(*nm_diff)); 1028 for (w = 0; w < stop; w++) { 1029 map = nm_a->map[w] ^ (nm_a->map[w] & nm_b->map[w]); 1030 nm_diff->map[w] = map; 1031 if (map != 0) { 1032 for (b = 0 ; b < WSIZE; b++) { 1033 if (map & (1 << b)) 1034 nm_diff->count++; 1035 } 1036 } 1037 } 1038 } 1039 1040 /** 1041 * tipc_port_list_add - add a port to a port list, ensuring no duplicates 1042 */ 1043 void tipc_port_list_add(struct tipc_port_list *pl_ptr, u32 port) 1044 { 1045 struct tipc_port_list *item = pl_ptr; 1046 int i; 1047 int item_sz = PLSIZE; 1048 int cnt = pl_ptr->count; 1049 1050 for (; ; cnt -= item_sz, item = item->next) { 1051 if (cnt < PLSIZE) 1052 item_sz = cnt; 1053 for (i = 0; i < item_sz; i++) 1054 if (item->ports[i] == port) 1055 return; 1056 if (i < PLSIZE) { 1057 item->ports[i] = port; 1058 pl_ptr->count++; 1059 return; 1060 } 1061 if (!item->next) { 1062 item->next = kmalloc(sizeof(*item), GFP_ATOMIC); 1063 if (!item->next) { 1064 pr_warn("Incomplete multicast delivery, no memory\n"); 1065 return; 1066 } 1067 item->next->next = NULL; 1068 } 1069 } 1070 } 1071 1072 /** 1073 * tipc_port_list_free - free dynamically created entries in port_list chain 1074 * 1075 */ 1076 void tipc_port_list_free(struct tipc_port_list *pl_ptr) 1077 { 1078 struct tipc_port_list *item; 1079 struct tipc_port_list *next; 1080 1081 for (item = pl_ptr->next; item; item = next) { 1082 next = item->next; 1083 kfree(item); 1084 } 1085 } 1086