1 /* 2 * net/tipc/bcast.c: TIPC broadcast code 3 * 4 * Copyright (c) 2004-2006, Ericsson AB 5 * Copyright (c) 2004, Intel Corporation. 6 * Copyright (c) 2005, 2010-2011, Wind River Systems 7 * All rights reserved. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions are met: 11 * 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in the 16 * documentation and/or other materials provided with the distribution. 17 * 3. Neither the names of the copyright holders nor the names of its 18 * contributors may be used to endorse or promote products derived from 19 * this software without specific prior written permission. 20 * 21 * Alternatively, this software may be distributed under the terms of the 22 * GNU General Public License ("GPL") version 2 as published by the Free 23 * Software Foundation. 24 * 25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 26 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 27 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 28 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 29 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 30 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 31 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 32 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 33 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 34 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 35 * POSSIBILITY OF SUCH DAMAGE. 36 */ 37 38 #include "core.h" 39 #include "link.h" 40 #include "port.h" 41 #include "bcast.h" 42 #include "name_distr.h" 43 44 #define MAX_PKT_DEFAULT_MCAST 1500 /* bcast link max packet size (fixed) */ 45 46 #define BCLINK_WIN_DEFAULT 20 /* bcast link window size (default) */ 47 48 /** 49 * struct tipc_bcbearer_pair - a pair of bearers used by broadcast link 50 * @primary: pointer to primary bearer 51 * @secondary: pointer to secondary bearer 52 * 53 * Bearers must have same priority and same set of reachable destinations 54 * to be paired. 55 */ 56 57 struct tipc_bcbearer_pair { 58 struct tipc_bearer *primary; 59 struct tipc_bearer *secondary; 60 }; 61 62 /** 63 * struct tipc_bcbearer - bearer used by broadcast link 64 * @bearer: (non-standard) broadcast bearer structure 65 * @media: (non-standard) broadcast media structure 66 * @bpairs: array of bearer pairs 67 * @bpairs_temp: temporary array of bearer pairs used by tipc_bcbearer_sort() 68 * @remains: temporary node map used by tipc_bcbearer_send() 69 * @remains_new: temporary node map used tipc_bcbearer_send() 70 * 71 * Note: The fields labelled "temporary" are incorporated into the bearer 72 * to avoid consuming potentially limited stack space through the use of 73 * large local variables within multicast routines. Concurrent access is 74 * prevented through use of the spinlock "bc_lock". 75 */ 76 77 struct tipc_bcbearer { 78 struct tipc_bearer bearer; 79 struct tipc_media media; 80 struct tipc_bcbearer_pair bpairs[MAX_BEARERS]; 81 struct tipc_bcbearer_pair bpairs_temp[TIPC_MAX_LINK_PRI + 1]; 82 struct tipc_node_map remains; 83 struct tipc_node_map remains_new; 84 }; 85 86 /** 87 * struct tipc_bclink - link used for broadcast messages 88 * @link: (non-standard) broadcast link structure 89 * @node: (non-standard) node structure representing b'cast link's peer node 90 * @bcast_nodes: map of broadcast-capable nodes 91 * @retransmit_to: node that most recently requested a retransmit 92 * 93 * Handles sequence numbering, fragmentation, bundling, etc. 94 */ 95 96 struct tipc_bclink { 97 struct tipc_link link; 98 struct tipc_node node; 99 struct tipc_node_map bcast_nodes; 100 struct tipc_node *retransmit_to; 101 }; 102 103 static struct tipc_bcbearer bcast_bearer; 104 static struct tipc_bclink bcast_link; 105 106 static struct tipc_bcbearer *bcbearer = &bcast_bearer; 107 static struct tipc_bclink *bclink = &bcast_link; 108 static struct tipc_link *bcl = &bcast_link.link; 109 110 static DEFINE_SPINLOCK(bc_lock); 111 112 const char tipc_bclink_name[] = "broadcast-link"; 113 114 static void tipc_nmap_diff(struct tipc_node_map *nm_a, 115 struct tipc_node_map *nm_b, 116 struct tipc_node_map *nm_diff); 117 118 static u32 bcbuf_acks(struct sk_buff *buf) 119 { 120 return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle; 121 } 122 123 static void bcbuf_set_acks(struct sk_buff *buf, u32 acks) 124 { 125 TIPC_SKB_CB(buf)->handle = (void *)(unsigned long)acks; 126 } 127 128 static void bcbuf_decr_acks(struct sk_buff *buf) 129 { 130 bcbuf_set_acks(buf, bcbuf_acks(buf) - 1); 131 } 132 133 void tipc_bclink_add_node(u32 addr) 134 { 135 spin_lock_bh(&bc_lock); 136 tipc_nmap_add(&bclink->bcast_nodes, addr); 137 spin_unlock_bh(&bc_lock); 138 } 139 140 void tipc_bclink_remove_node(u32 addr) 141 { 142 spin_lock_bh(&bc_lock); 143 tipc_nmap_remove(&bclink->bcast_nodes, addr); 144 spin_unlock_bh(&bc_lock); 145 } 146 147 static void bclink_set_last_sent(void) 148 { 149 if (bcl->next_out) 150 bcl->fsm_msg_cnt = mod(buf_seqno(bcl->next_out) - 1); 151 else 152 bcl->fsm_msg_cnt = mod(bcl->next_out_no - 1); 153 } 154 155 u32 tipc_bclink_get_last_sent(void) 156 { 157 return bcl->fsm_msg_cnt; 158 } 159 160 static void bclink_update_last_sent(struct tipc_node *node, u32 seqno) 161 { 162 node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ? 163 seqno : node->bclink.last_sent; 164 } 165 166 167 /* 168 * tipc_bclink_retransmit_to - get most recent node to request retransmission 169 * 170 * Called with bc_lock locked 171 */ 172 173 struct tipc_node *tipc_bclink_retransmit_to(void) 174 { 175 return bclink->retransmit_to; 176 } 177 178 /** 179 * bclink_retransmit_pkt - retransmit broadcast packets 180 * @after: sequence number of last packet to *not* retransmit 181 * @to: sequence number of last packet to retransmit 182 * 183 * Called with bc_lock locked 184 */ 185 186 static void bclink_retransmit_pkt(u32 after, u32 to) 187 { 188 struct sk_buff *buf; 189 190 buf = bcl->first_out; 191 while (buf && less_eq(buf_seqno(buf), after)) 192 buf = buf->next; 193 tipc_link_retransmit(bcl, buf, mod(to - after)); 194 } 195 196 /** 197 * tipc_bclink_acknowledge - handle acknowledgement of broadcast packets 198 * @n_ptr: node that sent acknowledgement info 199 * @acked: broadcast sequence # that has been acknowledged 200 * 201 * Node is locked, bc_lock unlocked. 202 */ 203 204 void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked) 205 { 206 struct sk_buff *crs; 207 struct sk_buff *next; 208 unsigned int released = 0; 209 210 spin_lock_bh(&bc_lock); 211 212 /* Bail out if tx queue is empty (no clean up is required) */ 213 crs = bcl->first_out; 214 if (!crs) 215 goto exit; 216 217 /* Determine which messages need to be acknowledged */ 218 if (acked == INVALID_LINK_SEQ) { 219 /* 220 * Contact with specified node has been lost, so need to 221 * acknowledge sent messages only (if other nodes still exist) 222 * or both sent and unsent messages (otherwise) 223 */ 224 if (bclink->bcast_nodes.count) 225 acked = bcl->fsm_msg_cnt; 226 else 227 acked = bcl->next_out_no; 228 } else { 229 /* 230 * Bail out if specified sequence number does not correspond 231 * to a message that has been sent and not yet acknowledged 232 */ 233 if (less(acked, buf_seqno(crs)) || 234 less(bcl->fsm_msg_cnt, acked) || 235 less_eq(acked, n_ptr->bclink.acked)) 236 goto exit; 237 } 238 239 /* Skip over packets that node has previously acknowledged */ 240 while (crs && less_eq(buf_seqno(crs), n_ptr->bclink.acked)) 241 crs = crs->next; 242 243 /* Update packets that node is now acknowledging */ 244 245 while (crs && less_eq(buf_seqno(crs), acked)) { 246 next = crs->next; 247 248 if (crs != bcl->next_out) 249 bcbuf_decr_acks(crs); 250 else { 251 bcbuf_set_acks(crs, 0); 252 bcl->next_out = next; 253 bclink_set_last_sent(); 254 } 255 256 if (bcbuf_acks(crs) == 0) { 257 bcl->first_out = next; 258 bcl->out_queue_size--; 259 kfree_skb(crs); 260 released = 1; 261 } 262 crs = next; 263 } 264 n_ptr->bclink.acked = acked; 265 266 /* Try resolving broadcast link congestion, if necessary */ 267 268 if (unlikely(bcl->next_out)) { 269 tipc_link_push_queue(bcl); 270 bclink_set_last_sent(); 271 } 272 if (unlikely(released && !list_empty(&bcl->waiting_ports))) 273 tipc_link_wakeup_ports(bcl, 0); 274 exit: 275 spin_unlock_bh(&bc_lock); 276 } 277 278 /* 279 * tipc_bclink_update_link_state - update broadcast link state 280 * 281 * tipc_net_lock and node lock set 282 */ 283 284 void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent) 285 { 286 struct sk_buff *buf; 287 288 /* Ignore "stale" link state info */ 289 290 if (less_eq(last_sent, n_ptr->bclink.last_in)) 291 return; 292 293 /* Update link synchronization state; quit if in sync */ 294 295 bclink_update_last_sent(n_ptr, last_sent); 296 297 if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in) 298 return; 299 300 /* Update out-of-sync state; quit if loss is still unconfirmed */ 301 302 if ((++n_ptr->bclink.oos_state) == 1) { 303 if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2)) 304 return; 305 n_ptr->bclink.oos_state++; 306 } 307 308 /* Don't NACK if one has been recently sent (or seen) */ 309 310 if (n_ptr->bclink.oos_state & 0x1) 311 return; 312 313 /* Send NACK */ 314 315 buf = tipc_buf_acquire(INT_H_SIZE); 316 if (buf) { 317 struct tipc_msg *msg = buf_msg(buf); 318 319 tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, 320 INT_H_SIZE, n_ptr->addr); 321 msg_set_non_seq(msg, 1); 322 msg_set_mc_netid(msg, tipc_net_id); 323 msg_set_bcast_ack(msg, n_ptr->bclink.last_in); 324 msg_set_bcgap_after(msg, n_ptr->bclink.last_in); 325 msg_set_bcgap_to(msg, n_ptr->bclink.deferred_head 326 ? buf_seqno(n_ptr->bclink.deferred_head) - 1 327 : n_ptr->bclink.last_sent); 328 329 spin_lock_bh(&bc_lock); 330 tipc_bearer_send(&bcbearer->bearer, buf, NULL); 331 bcl->stats.sent_nacks++; 332 spin_unlock_bh(&bc_lock); 333 kfree_skb(buf); 334 335 n_ptr->bclink.oos_state++; 336 } 337 } 338 339 /* 340 * bclink_peek_nack - monitor retransmission requests sent by other nodes 341 * 342 * Delay any upcoming NACK by this node if another node has already 343 * requested the first message this node is going to ask for. 344 * 345 * Only tipc_net_lock set. 346 */ 347 348 static void bclink_peek_nack(struct tipc_msg *msg) 349 { 350 struct tipc_node *n_ptr = tipc_node_find(msg_destnode(msg)); 351 352 if (unlikely(!n_ptr)) 353 return; 354 355 tipc_node_lock(n_ptr); 356 357 if (n_ptr->bclink.supported && 358 (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) && 359 (n_ptr->bclink.last_in == msg_bcgap_after(msg))) 360 n_ptr->bclink.oos_state = 2; 361 362 tipc_node_unlock(n_ptr); 363 } 364 365 /* 366 * tipc_bclink_send_msg - broadcast a packet to all nodes in cluster 367 */ 368 369 int tipc_bclink_send_msg(struct sk_buff *buf) 370 { 371 int res; 372 373 spin_lock_bh(&bc_lock); 374 375 if (!bclink->bcast_nodes.count) { 376 res = msg_data_sz(buf_msg(buf)); 377 kfree_skb(buf); 378 goto exit; 379 } 380 381 res = tipc_link_send_buf(bcl, buf); 382 if (likely(res >= 0)) { 383 bclink_set_last_sent(); 384 bcl->stats.queue_sz_counts++; 385 bcl->stats.accu_queue_sz += bcl->out_queue_size; 386 } 387 exit: 388 spin_unlock_bh(&bc_lock); 389 return res; 390 } 391 392 /* 393 * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet 394 * 395 * Called with both sending node's lock and bc_lock taken. 396 */ 397 398 static void bclink_accept_pkt(struct tipc_node *node, u32 seqno) 399 { 400 bclink_update_last_sent(node, seqno); 401 node->bclink.last_in = seqno; 402 node->bclink.oos_state = 0; 403 bcl->stats.recv_info++; 404 405 /* 406 * Unicast an ACK periodically, ensuring that 407 * all nodes in the cluster don't ACK at the same time 408 */ 409 410 if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) { 411 tipc_link_send_proto_msg( 412 node->active_links[node->addr & 1], 413 STATE_MSG, 0, 0, 0, 0, 0); 414 bcl->stats.sent_acks++; 415 } 416 } 417 418 /* 419 * tipc_bclink_recv_pkt - receive a broadcast packet, and deliver upwards 420 * 421 * tipc_net_lock is read_locked, no other locks set 422 */ 423 424 void tipc_bclink_recv_pkt(struct sk_buff *buf) 425 { 426 struct tipc_msg *msg = buf_msg(buf); 427 struct tipc_node *node; 428 u32 next_in; 429 u32 seqno; 430 int deferred; 431 432 /* Screen out unwanted broadcast messages */ 433 434 if (msg_mc_netid(msg) != tipc_net_id) 435 goto exit; 436 437 node = tipc_node_find(msg_prevnode(msg)); 438 if (unlikely(!node)) 439 goto exit; 440 441 tipc_node_lock(node); 442 if (unlikely(!node->bclink.supported)) 443 goto unlock; 444 445 /* Handle broadcast protocol message */ 446 447 if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) { 448 if (msg_type(msg) != STATE_MSG) 449 goto unlock; 450 if (msg_destnode(msg) == tipc_own_addr) { 451 tipc_bclink_acknowledge(node, msg_bcast_ack(msg)); 452 tipc_node_unlock(node); 453 spin_lock_bh(&bc_lock); 454 bcl->stats.recv_nacks++; 455 bclink->retransmit_to = node; 456 bclink_retransmit_pkt(msg_bcgap_after(msg), 457 msg_bcgap_to(msg)); 458 spin_unlock_bh(&bc_lock); 459 } else { 460 tipc_node_unlock(node); 461 bclink_peek_nack(msg); 462 } 463 goto exit; 464 } 465 466 /* Handle in-sequence broadcast message */ 467 468 seqno = msg_seqno(msg); 469 next_in = mod(node->bclink.last_in + 1); 470 471 if (likely(seqno == next_in)) { 472 receive: 473 /* Deliver message to destination */ 474 475 if (likely(msg_isdata(msg))) { 476 spin_lock_bh(&bc_lock); 477 bclink_accept_pkt(node, seqno); 478 spin_unlock_bh(&bc_lock); 479 tipc_node_unlock(node); 480 if (likely(msg_mcast(msg))) 481 tipc_port_recv_mcast(buf, NULL); 482 else 483 kfree_skb(buf); 484 } else if (msg_user(msg) == MSG_BUNDLER) { 485 spin_lock_bh(&bc_lock); 486 bclink_accept_pkt(node, seqno); 487 bcl->stats.recv_bundles++; 488 bcl->stats.recv_bundled += msg_msgcnt(msg); 489 spin_unlock_bh(&bc_lock); 490 tipc_node_unlock(node); 491 tipc_link_recv_bundle(buf); 492 } else if (msg_user(msg) == MSG_FRAGMENTER) { 493 int ret = tipc_link_recv_fragment(&node->bclink.defragm, 494 &buf, &msg); 495 if (ret < 0) 496 goto unlock; 497 spin_lock_bh(&bc_lock); 498 bclink_accept_pkt(node, seqno); 499 bcl->stats.recv_fragments++; 500 if (ret > 0) 501 bcl->stats.recv_fragmented++; 502 spin_unlock_bh(&bc_lock); 503 tipc_node_unlock(node); 504 tipc_net_route_msg(buf); 505 } else if (msg_user(msg) == NAME_DISTRIBUTOR) { 506 spin_lock_bh(&bc_lock); 507 bclink_accept_pkt(node, seqno); 508 spin_unlock_bh(&bc_lock); 509 tipc_node_unlock(node); 510 tipc_named_recv(buf); 511 } else { 512 spin_lock_bh(&bc_lock); 513 bclink_accept_pkt(node, seqno); 514 spin_unlock_bh(&bc_lock); 515 tipc_node_unlock(node); 516 kfree_skb(buf); 517 } 518 buf = NULL; 519 520 /* Determine new synchronization state */ 521 522 tipc_node_lock(node); 523 if (unlikely(!tipc_node_is_up(node))) 524 goto unlock; 525 526 if (node->bclink.last_in == node->bclink.last_sent) 527 goto unlock; 528 529 if (!node->bclink.deferred_head) { 530 node->bclink.oos_state = 1; 531 goto unlock; 532 } 533 534 msg = buf_msg(node->bclink.deferred_head); 535 seqno = msg_seqno(msg); 536 next_in = mod(next_in + 1); 537 if (seqno != next_in) 538 goto unlock; 539 540 /* Take in-sequence message from deferred queue & deliver it */ 541 542 buf = node->bclink.deferred_head; 543 node->bclink.deferred_head = buf->next; 544 node->bclink.deferred_size--; 545 goto receive; 546 } 547 548 /* Handle out-of-sequence broadcast message */ 549 550 if (less(next_in, seqno)) { 551 deferred = tipc_link_defer_pkt(&node->bclink.deferred_head, 552 &node->bclink.deferred_tail, 553 buf); 554 node->bclink.deferred_size += deferred; 555 bclink_update_last_sent(node, seqno); 556 buf = NULL; 557 } else 558 deferred = 0; 559 560 spin_lock_bh(&bc_lock); 561 562 if (deferred) 563 bcl->stats.deferred_recv++; 564 else 565 bcl->stats.duplicates++; 566 567 spin_unlock_bh(&bc_lock); 568 569 unlock: 570 tipc_node_unlock(node); 571 exit: 572 kfree_skb(buf); 573 } 574 575 u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr) 576 { 577 return (n_ptr->bclink.supported && 578 (tipc_bclink_get_last_sent() != n_ptr->bclink.acked)); 579 } 580 581 582 /** 583 * tipc_bcbearer_send - send a packet through the broadcast pseudo-bearer 584 * 585 * Send packet over as many bearers as necessary to reach all nodes 586 * that have joined the broadcast link. 587 * 588 * Returns 0 (packet sent successfully) under all circumstances, 589 * since the broadcast link's pseudo-bearer never blocks 590 */ 591 592 static int tipc_bcbearer_send(struct sk_buff *buf, 593 struct tipc_bearer *unused1, 594 struct tipc_media_addr *unused2) 595 { 596 int bp_index; 597 598 /* 599 * Prepare broadcast link message for reliable transmission, 600 * if first time trying to send it; 601 * preparation is skipped for broadcast link protocol messages 602 * since they are sent in an unreliable manner and don't need it 603 */ 604 605 if (likely(!msg_non_seq(buf_msg(buf)))) { 606 struct tipc_msg *msg; 607 608 bcbuf_set_acks(buf, bclink->bcast_nodes.count); 609 msg = buf_msg(buf); 610 msg_set_non_seq(msg, 1); 611 msg_set_mc_netid(msg, tipc_net_id); 612 bcl->stats.sent_info++; 613 614 if (WARN_ON(!bclink->bcast_nodes.count)) { 615 dump_stack(); 616 return 0; 617 } 618 } 619 620 /* Send buffer over bearers until all targets reached */ 621 622 bcbearer->remains = bclink->bcast_nodes; 623 624 for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) { 625 struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary; 626 struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary; 627 628 if (!p) 629 break; /* no more bearers to try */ 630 631 tipc_nmap_diff(&bcbearer->remains, &p->nodes, &bcbearer->remains_new); 632 if (bcbearer->remains_new.count == bcbearer->remains.count) 633 continue; /* bearer pair doesn't add anything */ 634 635 if (p->blocked || 636 p->media->send_msg(buf, p, &p->media->bcast_addr)) { 637 /* unable to send on primary bearer */ 638 if (!s || s->blocked || 639 s->media->send_msg(buf, s, 640 &s->media->bcast_addr)) { 641 /* unable to send on either bearer */ 642 continue; 643 } 644 } 645 646 if (s) { 647 bcbearer->bpairs[bp_index].primary = s; 648 bcbearer->bpairs[bp_index].secondary = p; 649 } 650 651 if (bcbearer->remains_new.count == 0) 652 break; /* all targets reached */ 653 654 bcbearer->remains = bcbearer->remains_new; 655 } 656 657 return 0; 658 } 659 660 /** 661 * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer 662 */ 663 664 void tipc_bcbearer_sort(void) 665 { 666 struct tipc_bcbearer_pair *bp_temp = bcbearer->bpairs_temp; 667 struct tipc_bcbearer_pair *bp_curr; 668 int b_index; 669 int pri; 670 671 spin_lock_bh(&bc_lock); 672 673 /* Group bearers by priority (can assume max of two per priority) */ 674 675 memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp)); 676 677 for (b_index = 0; b_index < MAX_BEARERS; b_index++) { 678 struct tipc_bearer *b = &tipc_bearers[b_index]; 679 680 if (!b->active || !b->nodes.count) 681 continue; 682 683 if (!bp_temp[b->priority].primary) 684 bp_temp[b->priority].primary = b; 685 else 686 bp_temp[b->priority].secondary = b; 687 } 688 689 /* Create array of bearer pairs for broadcasting */ 690 691 bp_curr = bcbearer->bpairs; 692 memset(bcbearer->bpairs, 0, sizeof(bcbearer->bpairs)); 693 694 for (pri = TIPC_MAX_LINK_PRI; pri >= 0; pri--) { 695 696 if (!bp_temp[pri].primary) 697 continue; 698 699 bp_curr->primary = bp_temp[pri].primary; 700 701 if (bp_temp[pri].secondary) { 702 if (tipc_nmap_equal(&bp_temp[pri].primary->nodes, 703 &bp_temp[pri].secondary->nodes)) { 704 bp_curr->secondary = bp_temp[pri].secondary; 705 } else { 706 bp_curr++; 707 bp_curr->primary = bp_temp[pri].secondary; 708 } 709 } 710 711 bp_curr++; 712 } 713 714 spin_unlock_bh(&bc_lock); 715 } 716 717 718 int tipc_bclink_stats(char *buf, const u32 buf_size) 719 { 720 struct print_buf pb; 721 722 if (!bcl) 723 return 0; 724 725 tipc_printbuf_init(&pb, buf, buf_size); 726 727 spin_lock_bh(&bc_lock); 728 729 tipc_printf(&pb, "Link <%s>\n" 730 " Window:%u packets\n", 731 bcl->name, bcl->queue_limit[0]); 732 tipc_printf(&pb, " RX packets:%u fragments:%u/%u bundles:%u/%u\n", 733 bcl->stats.recv_info, 734 bcl->stats.recv_fragments, 735 bcl->stats.recv_fragmented, 736 bcl->stats.recv_bundles, 737 bcl->stats.recv_bundled); 738 tipc_printf(&pb, " TX packets:%u fragments:%u/%u bundles:%u/%u\n", 739 bcl->stats.sent_info, 740 bcl->stats.sent_fragments, 741 bcl->stats.sent_fragmented, 742 bcl->stats.sent_bundles, 743 bcl->stats.sent_bundled); 744 tipc_printf(&pb, " RX naks:%u defs:%u dups:%u\n", 745 bcl->stats.recv_nacks, 746 bcl->stats.deferred_recv, 747 bcl->stats.duplicates); 748 tipc_printf(&pb, " TX naks:%u acks:%u dups:%u\n", 749 bcl->stats.sent_nacks, 750 bcl->stats.sent_acks, 751 bcl->stats.retransmitted); 752 tipc_printf(&pb, " Congestion bearer:%u link:%u Send queue max:%u avg:%u\n", 753 bcl->stats.bearer_congs, 754 bcl->stats.link_congs, 755 bcl->stats.max_queue_sz, 756 bcl->stats.queue_sz_counts 757 ? (bcl->stats.accu_queue_sz / bcl->stats.queue_sz_counts) 758 : 0); 759 760 spin_unlock_bh(&bc_lock); 761 return tipc_printbuf_validate(&pb); 762 } 763 764 int tipc_bclink_reset_stats(void) 765 { 766 if (!bcl) 767 return -ENOPROTOOPT; 768 769 spin_lock_bh(&bc_lock); 770 memset(&bcl->stats, 0, sizeof(bcl->stats)); 771 spin_unlock_bh(&bc_lock); 772 return 0; 773 } 774 775 int tipc_bclink_set_queue_limits(u32 limit) 776 { 777 if (!bcl) 778 return -ENOPROTOOPT; 779 if ((limit < TIPC_MIN_LINK_WIN) || (limit > TIPC_MAX_LINK_WIN)) 780 return -EINVAL; 781 782 spin_lock_bh(&bc_lock); 783 tipc_link_set_queue_limits(bcl, limit); 784 spin_unlock_bh(&bc_lock); 785 return 0; 786 } 787 788 void tipc_bclink_init(void) 789 { 790 INIT_LIST_HEAD(&bcbearer->bearer.cong_links); 791 bcbearer->bearer.media = &bcbearer->media; 792 bcbearer->media.send_msg = tipc_bcbearer_send; 793 sprintf(bcbearer->media.name, "tipc-broadcast"); 794 795 INIT_LIST_HEAD(&bcl->waiting_ports); 796 bcl->next_out_no = 1; 797 spin_lock_init(&bclink->node.lock); 798 bcl->owner = &bclink->node; 799 bcl->max_pkt = MAX_PKT_DEFAULT_MCAST; 800 tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT); 801 bcl->b_ptr = &bcbearer->bearer; 802 bcl->state = WORKING_WORKING; 803 strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME); 804 } 805 806 void tipc_bclink_stop(void) 807 { 808 spin_lock_bh(&bc_lock); 809 tipc_link_stop(bcl); 810 spin_unlock_bh(&bc_lock); 811 812 memset(bclink, 0, sizeof(*bclink)); 813 memset(bcbearer, 0, sizeof(*bcbearer)); 814 } 815 816 817 /** 818 * tipc_nmap_add - add a node to a node map 819 */ 820 821 void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node) 822 { 823 int n = tipc_node(node); 824 int w = n / WSIZE; 825 u32 mask = (1 << (n % WSIZE)); 826 827 if ((nm_ptr->map[w] & mask) == 0) { 828 nm_ptr->count++; 829 nm_ptr->map[w] |= mask; 830 } 831 } 832 833 /** 834 * tipc_nmap_remove - remove a node from a node map 835 */ 836 837 void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node) 838 { 839 int n = tipc_node(node); 840 int w = n / WSIZE; 841 u32 mask = (1 << (n % WSIZE)); 842 843 if ((nm_ptr->map[w] & mask) != 0) { 844 nm_ptr->map[w] &= ~mask; 845 nm_ptr->count--; 846 } 847 } 848 849 /** 850 * tipc_nmap_diff - find differences between node maps 851 * @nm_a: input node map A 852 * @nm_b: input node map B 853 * @nm_diff: output node map A-B (i.e. nodes of A that are not in B) 854 */ 855 856 static void tipc_nmap_diff(struct tipc_node_map *nm_a, 857 struct tipc_node_map *nm_b, 858 struct tipc_node_map *nm_diff) 859 { 860 int stop = ARRAY_SIZE(nm_a->map); 861 int w; 862 int b; 863 u32 map; 864 865 memset(nm_diff, 0, sizeof(*nm_diff)); 866 for (w = 0; w < stop; w++) { 867 map = nm_a->map[w] ^ (nm_a->map[w] & nm_b->map[w]); 868 nm_diff->map[w] = map; 869 if (map != 0) { 870 for (b = 0 ; b < WSIZE; b++) { 871 if (map & (1 << b)) 872 nm_diff->count++; 873 } 874 } 875 } 876 } 877 878 /** 879 * tipc_port_list_add - add a port to a port list, ensuring no duplicates 880 */ 881 882 void tipc_port_list_add(struct tipc_port_list *pl_ptr, u32 port) 883 { 884 struct tipc_port_list *item = pl_ptr; 885 int i; 886 int item_sz = PLSIZE; 887 int cnt = pl_ptr->count; 888 889 for (; ; cnt -= item_sz, item = item->next) { 890 if (cnt < PLSIZE) 891 item_sz = cnt; 892 for (i = 0; i < item_sz; i++) 893 if (item->ports[i] == port) 894 return; 895 if (i < PLSIZE) { 896 item->ports[i] = port; 897 pl_ptr->count++; 898 return; 899 } 900 if (!item->next) { 901 item->next = kmalloc(sizeof(*item), GFP_ATOMIC); 902 if (!item->next) { 903 warn("Incomplete multicast delivery, no memory\n"); 904 return; 905 } 906 item->next->next = NULL; 907 } 908 } 909 } 910 911 /** 912 * tipc_port_list_free - free dynamically created entries in port_list chain 913 * 914 */ 915 916 void tipc_port_list_free(struct tipc_port_list *pl_ptr) 917 { 918 struct tipc_port_list *item; 919 struct tipc_port_list *next; 920 921 for (item = pl_ptr->next; item; item = next) { 922 next = item->next; 923 kfree(item); 924 } 925 } 926 927