1 /* 2 * net/tipc/bcast.c: TIPC broadcast code 3 * 4 * Copyright (c) 2004-2006, Ericsson AB 5 * Copyright (c) 2004, Intel Corporation. 6 * Copyright (c) 2005, Wind River Systems 7 * All rights reserved. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions are met: 11 * 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in the 16 * documentation and/or other materials provided with the distribution. 17 * 3. Neither the names of the copyright holders nor the names of its 18 * contributors may be used to endorse or promote products derived from 19 * this software without specific prior written permission. 20 * 21 * Alternatively, this software may be distributed under the terms of the 22 * GNU General Public License ("GPL") version 2 as published by the Free 23 * Software Foundation. 24 * 25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 26 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 27 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 28 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 29 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 30 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 31 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 32 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 33 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 34 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 35 * POSSIBILITY OF SUCH DAMAGE. 36 */ 37 38 #include "core.h" 39 #include "link.h" 40 #include "port.h" 41 #include "bcast.h" 42 43 #define MAX_PKT_DEFAULT_MCAST 1500 /* bcast link max packet size (fixed) */ 44 45 #define BCLINK_WIN_DEFAULT 20 /* bcast link window size (default) */ 46 47 /* 48 * Loss rate for incoming broadcast frames; used to test retransmission code. 49 * Set to N to cause every N'th frame to be discarded; 0 => don't discard any. 50 */ 51 52 #define TIPC_BCAST_LOSS_RATE 0 53 54 /** 55 * struct bcbearer_pair - a pair of bearers used by broadcast link 56 * @primary: pointer to primary bearer 57 * @secondary: pointer to secondary bearer 58 * 59 * Bearers must have same priority and same set of reachable destinations 60 * to be paired. 61 */ 62 63 struct bcbearer_pair { 64 struct bearer *primary; 65 struct bearer *secondary; 66 }; 67 68 /** 69 * struct bcbearer - bearer used by broadcast link 70 * @bearer: (non-standard) broadcast bearer structure 71 * @media: (non-standard) broadcast media structure 72 * @bpairs: array of bearer pairs 73 * @bpairs_temp: temporary array of bearer pairs used by tipc_bcbearer_sort() 74 * @remains: temporary node map used by tipc_bcbearer_send() 75 * @remains_new: temporary node map used tipc_bcbearer_send() 76 * 77 * Note: The fields labelled "temporary" are incorporated into the bearer 78 * to avoid consuming potentially limited stack space through the use of 79 * large local variables within multicast routines. Concurrent access is 80 * prevented through use of the spinlock "bc_lock". 81 */ 82 83 struct bcbearer { 84 struct bearer bearer; 85 struct media media; 86 struct bcbearer_pair bpairs[MAX_BEARERS]; 87 struct bcbearer_pair bpairs_temp[TIPC_MAX_LINK_PRI + 1]; 88 struct tipc_node_map remains; 89 struct tipc_node_map remains_new; 90 }; 91 92 /** 93 * struct bclink - link used for broadcast messages 94 * @link: (non-standard) broadcast link structure 95 * @node: (non-standard) node structure representing b'cast link's peer node 96 * 97 * Handles sequence numbering, fragmentation, bundling, etc. 98 */ 99 100 struct bclink { 101 struct link link; 102 struct tipc_node node; 103 }; 104 105 106 static struct bcbearer *bcbearer; 107 static struct bclink *bclink; 108 static struct link *bcl; 109 static DEFINE_SPINLOCK(bc_lock); 110 111 /* broadcast-capable node map */ 112 struct tipc_node_map tipc_bcast_nmap; 113 114 const char tipc_bclink_name[] = "broadcast-link"; 115 116 static void tipc_nmap_diff(struct tipc_node_map *nm_a, 117 struct tipc_node_map *nm_b, 118 struct tipc_node_map *nm_diff); 119 120 static u32 buf_seqno(struct sk_buff *buf) 121 { 122 return msg_seqno(buf_msg(buf)); 123 } 124 125 static u32 bcbuf_acks(struct sk_buff *buf) 126 { 127 return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle; 128 } 129 130 static void bcbuf_set_acks(struct sk_buff *buf, u32 acks) 131 { 132 TIPC_SKB_CB(buf)->handle = (void *)(unsigned long)acks; 133 } 134 135 static void bcbuf_decr_acks(struct sk_buff *buf) 136 { 137 bcbuf_set_acks(buf, bcbuf_acks(buf) - 1); 138 } 139 140 141 static void bclink_set_last_sent(void) 142 { 143 if (bcl->next_out) 144 bcl->fsm_msg_cnt = mod(buf_seqno(bcl->next_out) - 1); 145 else 146 bcl->fsm_msg_cnt = mod(bcl->next_out_no - 1); 147 } 148 149 u32 tipc_bclink_get_last_sent(void) 150 { 151 return bcl->fsm_msg_cnt; 152 } 153 154 /** 155 * bclink_set_gap - set gap according to contents of current deferred pkt queue 156 * 157 * Called with 'node' locked, bc_lock unlocked 158 */ 159 160 static void bclink_set_gap(struct tipc_node *n_ptr) 161 { 162 struct sk_buff *buf = n_ptr->bclink.deferred_head; 163 164 n_ptr->bclink.gap_after = n_ptr->bclink.gap_to = 165 mod(n_ptr->bclink.last_in); 166 if (unlikely(buf != NULL)) 167 n_ptr->bclink.gap_to = mod(buf_seqno(buf) - 1); 168 } 169 170 /** 171 * bclink_ack_allowed - test if ACK or NACK message can be sent at this moment 172 * 173 * This mechanism endeavours to prevent all nodes in network from trying 174 * to ACK or NACK at the same time. 175 * 176 * Note: TIPC uses a different trigger to distribute ACKs than it does to 177 * distribute NACKs, but tries to use the same spacing (divide by 16). 178 */ 179 180 static int bclink_ack_allowed(u32 n) 181 { 182 return (n % TIPC_MIN_LINK_WIN) == tipc_own_tag; 183 } 184 185 186 /** 187 * bclink_retransmit_pkt - retransmit broadcast packets 188 * @after: sequence number of last packet to *not* retransmit 189 * @to: sequence number of last packet to retransmit 190 * 191 * Called with bc_lock locked 192 */ 193 194 static void bclink_retransmit_pkt(u32 after, u32 to) 195 { 196 struct sk_buff *buf; 197 198 buf = bcl->first_out; 199 while (buf && less_eq(buf_seqno(buf), after)) 200 buf = buf->next; 201 tipc_link_retransmit(bcl, buf, mod(to - after)); 202 } 203 204 /** 205 * tipc_bclink_acknowledge - handle acknowledgement of broadcast packets 206 * @n_ptr: node that sent acknowledgement info 207 * @acked: broadcast sequence # that has been acknowledged 208 * 209 * Node is locked, bc_lock unlocked. 210 */ 211 212 void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked) 213 { 214 struct sk_buff *crs; 215 struct sk_buff *next; 216 unsigned int released = 0; 217 218 if (less_eq(acked, n_ptr->bclink.acked)) 219 return; 220 221 spin_lock_bh(&bc_lock); 222 223 /* Skip over packets that node has previously acknowledged */ 224 225 crs = bcl->first_out; 226 while (crs && less_eq(buf_seqno(crs), n_ptr->bclink.acked)) 227 crs = crs->next; 228 229 /* Update packets that node is now acknowledging */ 230 231 while (crs && less_eq(buf_seqno(crs), acked)) { 232 next = crs->next; 233 bcbuf_decr_acks(crs); 234 if (bcbuf_acks(crs) == 0) { 235 bcl->first_out = next; 236 bcl->out_queue_size--; 237 buf_discard(crs); 238 released = 1; 239 } 240 crs = next; 241 } 242 n_ptr->bclink.acked = acked; 243 244 /* Try resolving broadcast link congestion, if necessary */ 245 246 if (unlikely(bcl->next_out)) { 247 tipc_link_push_queue(bcl); 248 bclink_set_last_sent(); 249 } 250 if (unlikely(released && !list_empty(&bcl->waiting_ports))) 251 tipc_link_wakeup_ports(bcl, 0); 252 spin_unlock_bh(&bc_lock); 253 } 254 255 /** 256 * bclink_send_ack - unicast an ACK msg 257 * 258 * tipc_net_lock and node lock set 259 */ 260 261 static void bclink_send_ack(struct tipc_node *n_ptr) 262 { 263 struct link *l_ptr = n_ptr->active_links[n_ptr->addr & 1]; 264 265 if (l_ptr != NULL) 266 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); 267 } 268 269 /** 270 * bclink_send_nack- broadcast a NACK msg 271 * 272 * tipc_net_lock and node lock set 273 */ 274 275 static void bclink_send_nack(struct tipc_node *n_ptr) 276 { 277 struct sk_buff *buf; 278 struct tipc_msg *msg; 279 280 if (!less(n_ptr->bclink.gap_after, n_ptr->bclink.gap_to)) 281 return; 282 283 buf = tipc_buf_acquire(INT_H_SIZE); 284 if (buf) { 285 msg = buf_msg(buf); 286 tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, 287 INT_H_SIZE, n_ptr->addr); 288 msg_set_mc_netid(msg, tipc_net_id); 289 msg_set_bcast_ack(msg, mod(n_ptr->bclink.last_in)); 290 msg_set_bcgap_after(msg, n_ptr->bclink.gap_after); 291 msg_set_bcgap_to(msg, n_ptr->bclink.gap_to); 292 msg_set_bcast_tag(msg, tipc_own_tag); 293 294 if (tipc_bearer_send(&bcbearer->bearer, buf, NULL)) { 295 bcl->stats.sent_nacks++; 296 buf_discard(buf); 297 } else { 298 tipc_bearer_schedule(bcl->b_ptr, bcl); 299 bcl->proto_msg_queue = buf; 300 bcl->stats.bearer_congs++; 301 } 302 303 /* 304 * Ensure we doesn't send another NACK msg to the node 305 * until 16 more deferred messages arrive from it 306 * (i.e. helps prevent all nodes from NACK'ing at same time) 307 */ 308 309 n_ptr->bclink.nack_sync = tipc_own_tag; 310 } 311 } 312 313 /** 314 * tipc_bclink_check_gap - send a NACK if a sequence gap exists 315 * 316 * tipc_net_lock and node lock set 317 */ 318 319 void tipc_bclink_check_gap(struct tipc_node *n_ptr, u32 last_sent) 320 { 321 if (!n_ptr->bclink.supported || 322 less_eq(last_sent, mod(n_ptr->bclink.last_in))) 323 return; 324 325 bclink_set_gap(n_ptr); 326 if (n_ptr->bclink.gap_after == n_ptr->bclink.gap_to) 327 n_ptr->bclink.gap_to = last_sent; 328 bclink_send_nack(n_ptr); 329 } 330 331 /** 332 * tipc_bclink_peek_nack - process a NACK msg meant for another node 333 * 334 * Only tipc_net_lock set. 335 */ 336 337 static void tipc_bclink_peek_nack(u32 dest, u32 sender_tag, u32 gap_after, u32 gap_to) 338 { 339 struct tipc_node *n_ptr = tipc_node_find(dest); 340 u32 my_after, my_to; 341 342 if (unlikely(!n_ptr || !tipc_node_is_up(n_ptr))) 343 return; 344 tipc_node_lock(n_ptr); 345 /* 346 * Modify gap to suppress unnecessary NACKs from this node 347 */ 348 my_after = n_ptr->bclink.gap_after; 349 my_to = n_ptr->bclink.gap_to; 350 351 if (less_eq(gap_after, my_after)) { 352 if (less(my_after, gap_to) && less(gap_to, my_to)) 353 n_ptr->bclink.gap_after = gap_to; 354 else if (less_eq(my_to, gap_to)) 355 n_ptr->bclink.gap_to = n_ptr->bclink.gap_after; 356 } else if (less_eq(gap_after, my_to)) { 357 if (less_eq(my_to, gap_to)) 358 n_ptr->bclink.gap_to = gap_after; 359 } else { 360 /* 361 * Expand gap if missing bufs not in deferred queue: 362 */ 363 struct sk_buff *buf = n_ptr->bclink.deferred_head; 364 u32 prev = n_ptr->bclink.gap_to; 365 366 for (; buf; buf = buf->next) { 367 u32 seqno = buf_seqno(buf); 368 369 if (mod(seqno - prev) != 1) { 370 buf = NULL; 371 break; 372 } 373 if (seqno == gap_after) 374 break; 375 prev = seqno; 376 } 377 if (buf == NULL) 378 n_ptr->bclink.gap_to = gap_after; 379 } 380 /* 381 * Some nodes may send a complementary NACK now: 382 */ 383 if (bclink_ack_allowed(sender_tag + 1)) { 384 if (n_ptr->bclink.gap_to != n_ptr->bclink.gap_after) { 385 bclink_send_nack(n_ptr); 386 bclink_set_gap(n_ptr); 387 } 388 } 389 tipc_node_unlock(n_ptr); 390 } 391 392 /** 393 * tipc_bclink_send_msg - broadcast a packet to all nodes in cluster 394 */ 395 396 int tipc_bclink_send_msg(struct sk_buff *buf) 397 { 398 int res; 399 400 spin_lock_bh(&bc_lock); 401 402 res = tipc_link_send_buf(bcl, buf); 403 if (unlikely(res == -ELINKCONG)) 404 buf_discard(buf); 405 else 406 bclink_set_last_sent(); 407 408 if (bcl->out_queue_size > bcl->stats.max_queue_sz) 409 bcl->stats.max_queue_sz = bcl->out_queue_size; 410 bcl->stats.queue_sz_counts++; 411 bcl->stats.accu_queue_sz += bcl->out_queue_size; 412 413 spin_unlock_bh(&bc_lock); 414 return res; 415 } 416 417 /** 418 * tipc_bclink_recv_pkt - receive a broadcast packet, and deliver upwards 419 * 420 * tipc_net_lock is read_locked, no other locks set 421 */ 422 423 void tipc_bclink_recv_pkt(struct sk_buff *buf) 424 { 425 #if (TIPC_BCAST_LOSS_RATE) 426 static int rx_count; 427 #endif 428 struct tipc_msg *msg = buf_msg(buf); 429 struct tipc_node *node = tipc_node_find(msg_prevnode(msg)); 430 u32 next_in; 431 u32 seqno; 432 struct sk_buff *deferred; 433 434 if (unlikely(!node || !tipc_node_is_up(node) || !node->bclink.supported || 435 (msg_mc_netid(msg) != tipc_net_id))) { 436 buf_discard(buf); 437 return; 438 } 439 440 if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) { 441 if (msg_destnode(msg) == tipc_own_addr) { 442 tipc_node_lock(node); 443 tipc_bclink_acknowledge(node, msg_bcast_ack(msg)); 444 tipc_node_unlock(node); 445 spin_lock_bh(&bc_lock); 446 bcl->stats.recv_nacks++; 447 bcl->owner->next = node; /* remember requestor */ 448 bclink_retransmit_pkt(msg_bcgap_after(msg), 449 msg_bcgap_to(msg)); 450 bcl->owner->next = NULL; 451 spin_unlock_bh(&bc_lock); 452 } else { 453 tipc_bclink_peek_nack(msg_destnode(msg), 454 msg_bcast_tag(msg), 455 msg_bcgap_after(msg), 456 msg_bcgap_to(msg)); 457 } 458 buf_discard(buf); 459 return; 460 } 461 462 #if (TIPC_BCAST_LOSS_RATE) 463 if (++rx_count == TIPC_BCAST_LOSS_RATE) { 464 rx_count = 0; 465 buf_discard(buf); 466 return; 467 } 468 #endif 469 470 tipc_node_lock(node); 471 receive: 472 deferred = node->bclink.deferred_head; 473 next_in = mod(node->bclink.last_in + 1); 474 seqno = msg_seqno(msg); 475 476 if (likely(seqno == next_in)) { 477 bcl->stats.recv_info++; 478 node->bclink.last_in++; 479 bclink_set_gap(node); 480 if (unlikely(bclink_ack_allowed(seqno))) { 481 bclink_send_ack(node); 482 bcl->stats.sent_acks++; 483 } 484 if (likely(msg_isdata(msg))) { 485 tipc_node_unlock(node); 486 tipc_port_recv_mcast(buf, NULL); 487 } else if (msg_user(msg) == MSG_BUNDLER) { 488 bcl->stats.recv_bundles++; 489 bcl->stats.recv_bundled += msg_msgcnt(msg); 490 tipc_node_unlock(node); 491 tipc_link_recv_bundle(buf); 492 } else if (msg_user(msg) == MSG_FRAGMENTER) { 493 bcl->stats.recv_fragments++; 494 if (tipc_link_recv_fragment(&node->bclink.defragm, 495 &buf, &msg)) 496 bcl->stats.recv_fragmented++; 497 tipc_node_unlock(node); 498 tipc_net_route_msg(buf); 499 } else { 500 tipc_node_unlock(node); 501 tipc_net_route_msg(buf); 502 } 503 if (deferred && (buf_seqno(deferred) == mod(next_in + 1))) { 504 tipc_node_lock(node); 505 buf = deferred; 506 msg = buf_msg(buf); 507 node->bclink.deferred_head = deferred->next; 508 goto receive; 509 } 510 return; 511 } else if (less(next_in, seqno)) { 512 u32 gap_after = node->bclink.gap_after; 513 u32 gap_to = node->bclink.gap_to; 514 515 if (tipc_link_defer_pkt(&node->bclink.deferred_head, 516 &node->bclink.deferred_tail, 517 buf)) { 518 node->bclink.nack_sync++; 519 bcl->stats.deferred_recv++; 520 if (seqno == mod(gap_after + 1)) 521 node->bclink.gap_after = seqno; 522 else if (less(gap_after, seqno) && less(seqno, gap_to)) 523 node->bclink.gap_to = seqno; 524 } 525 if (bclink_ack_allowed(node->bclink.nack_sync)) { 526 if (gap_to != gap_after) 527 bclink_send_nack(node); 528 bclink_set_gap(node); 529 } 530 } else { 531 bcl->stats.duplicates++; 532 buf_discard(buf); 533 } 534 tipc_node_unlock(node); 535 } 536 537 u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr) 538 { 539 return (n_ptr->bclink.supported && 540 (tipc_bclink_get_last_sent() != n_ptr->bclink.acked)); 541 } 542 543 544 /** 545 * tipc_bcbearer_send - send a packet through the broadcast pseudo-bearer 546 * 547 * Send through as many bearers as necessary to reach all nodes 548 * that support TIPC multicasting. 549 * 550 * Returns 0 if packet sent successfully, non-zero if not 551 */ 552 553 static int tipc_bcbearer_send(struct sk_buff *buf, 554 struct tipc_bearer *unused1, 555 struct tipc_media_addr *unused2) 556 { 557 int bp_index; 558 559 /* Prepare buffer for broadcasting (if first time trying to send it) */ 560 561 if (likely(!msg_non_seq(buf_msg(buf)))) { 562 struct tipc_msg *msg; 563 564 assert(tipc_bcast_nmap.count != 0); 565 bcbuf_set_acks(buf, tipc_bcast_nmap.count); 566 msg = buf_msg(buf); 567 msg_set_non_seq(msg, 1); 568 msg_set_mc_netid(msg, tipc_net_id); 569 bcl->stats.sent_info++; 570 } 571 572 /* Send buffer over bearers until all targets reached */ 573 574 bcbearer->remains = tipc_bcast_nmap; 575 576 for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) { 577 struct bearer *p = bcbearer->bpairs[bp_index].primary; 578 struct bearer *s = bcbearer->bpairs[bp_index].secondary; 579 580 if (!p) 581 break; /* no more bearers to try */ 582 583 tipc_nmap_diff(&bcbearer->remains, &p->nodes, &bcbearer->remains_new); 584 if (bcbearer->remains_new.count == bcbearer->remains.count) 585 continue; /* bearer pair doesn't add anything */ 586 587 if (p->publ.blocked || 588 p->media->send_msg(buf, &p->publ, &p->media->bcast_addr)) { 589 /* unable to send on primary bearer */ 590 if (!s || s->publ.blocked || 591 s->media->send_msg(buf, &s->publ, 592 &s->media->bcast_addr)) { 593 /* unable to send on either bearer */ 594 continue; 595 } 596 } 597 598 if (s) { 599 bcbearer->bpairs[bp_index].primary = s; 600 bcbearer->bpairs[bp_index].secondary = p; 601 } 602 603 if (bcbearer->remains_new.count == 0) 604 return 0; 605 606 bcbearer->remains = bcbearer->remains_new; 607 } 608 609 /* 610 * Unable to reach all targets (indicate success, since currently 611 * there isn't code in place to properly block & unblock the 612 * pseudo-bearer used by the broadcast link) 613 */ 614 615 return TIPC_OK; 616 } 617 618 /** 619 * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer 620 */ 621 622 void tipc_bcbearer_sort(void) 623 { 624 struct bcbearer_pair *bp_temp = bcbearer->bpairs_temp; 625 struct bcbearer_pair *bp_curr; 626 int b_index; 627 int pri; 628 629 spin_lock_bh(&bc_lock); 630 631 /* Group bearers by priority (can assume max of two per priority) */ 632 633 memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp)); 634 635 for (b_index = 0; b_index < MAX_BEARERS; b_index++) { 636 struct bearer *b = &tipc_bearers[b_index]; 637 638 if (!b->active || !b->nodes.count) 639 continue; 640 641 if (!bp_temp[b->priority].primary) 642 bp_temp[b->priority].primary = b; 643 else 644 bp_temp[b->priority].secondary = b; 645 } 646 647 /* Create array of bearer pairs for broadcasting */ 648 649 bp_curr = bcbearer->bpairs; 650 memset(bcbearer->bpairs, 0, sizeof(bcbearer->bpairs)); 651 652 for (pri = TIPC_MAX_LINK_PRI; pri >= 0; pri--) { 653 654 if (!bp_temp[pri].primary) 655 continue; 656 657 bp_curr->primary = bp_temp[pri].primary; 658 659 if (bp_temp[pri].secondary) { 660 if (tipc_nmap_equal(&bp_temp[pri].primary->nodes, 661 &bp_temp[pri].secondary->nodes)) { 662 bp_curr->secondary = bp_temp[pri].secondary; 663 } else { 664 bp_curr++; 665 bp_curr->primary = bp_temp[pri].secondary; 666 } 667 } 668 669 bp_curr++; 670 } 671 672 spin_unlock_bh(&bc_lock); 673 } 674 675 /** 676 * tipc_bcbearer_push - resolve bearer congestion 677 * 678 * Forces bclink to push out any unsent packets, until all packets are gone 679 * or congestion reoccurs. 680 * No locks set when function called 681 */ 682 683 void tipc_bcbearer_push(void) 684 { 685 struct bearer *b_ptr; 686 687 spin_lock_bh(&bc_lock); 688 b_ptr = &bcbearer->bearer; 689 if (b_ptr->publ.blocked) { 690 b_ptr->publ.blocked = 0; 691 tipc_bearer_lock_push(b_ptr); 692 } 693 spin_unlock_bh(&bc_lock); 694 } 695 696 697 int tipc_bclink_stats(char *buf, const u32 buf_size) 698 { 699 struct print_buf pb; 700 701 if (!bcl) 702 return 0; 703 704 tipc_printbuf_init(&pb, buf, buf_size); 705 706 spin_lock_bh(&bc_lock); 707 708 tipc_printf(&pb, "Link <%s>\n" 709 " Window:%u packets\n", 710 bcl->name, bcl->queue_limit[0]); 711 tipc_printf(&pb, " RX packets:%u fragments:%u/%u bundles:%u/%u\n", 712 bcl->stats.recv_info, 713 bcl->stats.recv_fragments, 714 bcl->stats.recv_fragmented, 715 bcl->stats.recv_bundles, 716 bcl->stats.recv_bundled); 717 tipc_printf(&pb, " TX packets:%u fragments:%u/%u bundles:%u/%u\n", 718 bcl->stats.sent_info, 719 bcl->stats.sent_fragments, 720 bcl->stats.sent_fragmented, 721 bcl->stats.sent_bundles, 722 bcl->stats.sent_bundled); 723 tipc_printf(&pb, " RX naks:%u defs:%u dups:%u\n", 724 bcl->stats.recv_nacks, 725 bcl->stats.deferred_recv, 726 bcl->stats.duplicates); 727 tipc_printf(&pb, " TX naks:%u acks:%u dups:%u\n", 728 bcl->stats.sent_nacks, 729 bcl->stats.sent_acks, 730 bcl->stats.retransmitted); 731 tipc_printf(&pb, " Congestion bearer:%u link:%u Send queue max:%u avg:%u\n", 732 bcl->stats.bearer_congs, 733 bcl->stats.link_congs, 734 bcl->stats.max_queue_sz, 735 bcl->stats.queue_sz_counts 736 ? (bcl->stats.accu_queue_sz / bcl->stats.queue_sz_counts) 737 : 0); 738 739 spin_unlock_bh(&bc_lock); 740 return tipc_printbuf_validate(&pb); 741 } 742 743 int tipc_bclink_reset_stats(void) 744 { 745 if (!bcl) 746 return -ENOPROTOOPT; 747 748 spin_lock_bh(&bc_lock); 749 memset(&bcl->stats, 0, sizeof(bcl->stats)); 750 spin_unlock_bh(&bc_lock); 751 return 0; 752 } 753 754 int tipc_bclink_set_queue_limits(u32 limit) 755 { 756 if (!bcl) 757 return -ENOPROTOOPT; 758 if ((limit < TIPC_MIN_LINK_WIN) || (limit > TIPC_MAX_LINK_WIN)) 759 return -EINVAL; 760 761 spin_lock_bh(&bc_lock); 762 tipc_link_set_queue_limits(bcl, limit); 763 spin_unlock_bh(&bc_lock); 764 return 0; 765 } 766 767 int tipc_bclink_init(void) 768 { 769 bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC); 770 bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC); 771 if (!bcbearer || !bclink) { 772 warn("Multicast link creation failed, no memory\n"); 773 kfree(bcbearer); 774 bcbearer = NULL; 775 kfree(bclink); 776 bclink = NULL; 777 return -ENOMEM; 778 } 779 780 INIT_LIST_HEAD(&bcbearer->bearer.cong_links); 781 bcbearer->bearer.media = &bcbearer->media; 782 bcbearer->media.send_msg = tipc_bcbearer_send; 783 sprintf(bcbearer->media.name, "tipc-multicast"); 784 785 bcl = &bclink->link; 786 INIT_LIST_HEAD(&bcl->waiting_ports); 787 bcl->next_out_no = 1; 788 spin_lock_init(&bclink->node.lock); 789 bcl->owner = &bclink->node; 790 bcl->max_pkt = MAX_PKT_DEFAULT_MCAST; 791 tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT); 792 bcl->b_ptr = &bcbearer->bearer; 793 bcl->state = WORKING_WORKING; 794 strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME); 795 796 return 0; 797 } 798 799 void tipc_bclink_stop(void) 800 { 801 spin_lock_bh(&bc_lock); 802 if (bcbearer) { 803 tipc_link_stop(bcl); 804 bcl = NULL; 805 kfree(bclink); 806 bclink = NULL; 807 kfree(bcbearer); 808 bcbearer = NULL; 809 } 810 spin_unlock_bh(&bc_lock); 811 } 812 813 814 /** 815 * tipc_nmap_add - add a node to a node map 816 */ 817 818 void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node) 819 { 820 int n = tipc_node(node); 821 int w = n / WSIZE; 822 u32 mask = (1 << (n % WSIZE)); 823 824 if ((nm_ptr->map[w] & mask) == 0) { 825 nm_ptr->count++; 826 nm_ptr->map[w] |= mask; 827 } 828 } 829 830 /** 831 * tipc_nmap_remove - remove a node from a node map 832 */ 833 834 void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node) 835 { 836 int n = tipc_node(node); 837 int w = n / WSIZE; 838 u32 mask = (1 << (n % WSIZE)); 839 840 if ((nm_ptr->map[w] & mask) != 0) { 841 nm_ptr->map[w] &= ~mask; 842 nm_ptr->count--; 843 } 844 } 845 846 /** 847 * tipc_nmap_diff - find differences between node maps 848 * @nm_a: input node map A 849 * @nm_b: input node map B 850 * @nm_diff: output node map A-B (i.e. nodes of A that are not in B) 851 */ 852 853 static void tipc_nmap_diff(struct tipc_node_map *nm_a, 854 struct tipc_node_map *nm_b, 855 struct tipc_node_map *nm_diff) 856 { 857 int stop = ARRAY_SIZE(nm_a->map); 858 int w; 859 int b; 860 u32 map; 861 862 memset(nm_diff, 0, sizeof(*nm_diff)); 863 for (w = 0; w < stop; w++) { 864 map = nm_a->map[w] ^ (nm_a->map[w] & nm_b->map[w]); 865 nm_diff->map[w] = map; 866 if (map != 0) { 867 for (b = 0 ; b < WSIZE; b++) { 868 if (map & (1 << b)) 869 nm_diff->count++; 870 } 871 } 872 } 873 } 874 875 /** 876 * tipc_port_list_add - add a port to a port list, ensuring no duplicates 877 */ 878 879 void tipc_port_list_add(struct port_list *pl_ptr, u32 port) 880 { 881 struct port_list *item = pl_ptr; 882 int i; 883 int item_sz = PLSIZE; 884 int cnt = pl_ptr->count; 885 886 for (; ; cnt -= item_sz, item = item->next) { 887 if (cnt < PLSIZE) 888 item_sz = cnt; 889 for (i = 0; i < item_sz; i++) 890 if (item->ports[i] == port) 891 return; 892 if (i < PLSIZE) { 893 item->ports[i] = port; 894 pl_ptr->count++; 895 return; 896 } 897 if (!item->next) { 898 item->next = kmalloc(sizeof(*item), GFP_ATOMIC); 899 if (!item->next) { 900 warn("Incomplete multicast delivery, no memory\n"); 901 return; 902 } 903 item->next->next = NULL; 904 } 905 } 906 } 907 908 /** 909 * tipc_port_list_free - free dynamically created entries in port_list chain 910 * 911 */ 912 913 void tipc_port_list_free(struct port_list *pl_ptr) 914 { 915 struct port_list *item; 916 struct port_list *next; 917 918 for (item = pl_ptr->next; item; item = next) { 919 next = item->next; 920 kfree(item); 921 } 922 } 923 924