1 /* 2 * net/tipc/bcast.c: TIPC broadcast code 3 * 4 * Copyright (c) 2004-2006, 2014-2015, Ericsson AB 5 * Copyright (c) 2004, Intel Corporation. 6 * Copyright (c) 2005, 2010-2011, Wind River Systems 7 * All rights reserved. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions are met: 11 * 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in the 16 * documentation and/or other materials provided with the distribution. 17 * 3. Neither the names of the copyright holders nor the names of its 18 * contributors may be used to endorse or promote products derived from 19 * this software without specific prior written permission. 20 * 21 * Alternatively, this software may be distributed under the terms of the 22 * GNU General Public License ("GPL") version 2 as published by the Free 23 * Software Foundation. 24 * 25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 26 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 27 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 28 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 29 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 30 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 31 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 32 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 33 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 34 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 35 * POSSIBILITY OF SUCH DAMAGE. 36 */ 37 38 #include "socket.h" 39 #include "msg.h" 40 #include "bcast.h" 41 #include "name_distr.h" 42 #include "core.h" 43 44 #define MAX_PKT_DEFAULT_MCAST 1500 /* bcast link max packet size (fixed) */ 45 #define BCLINK_WIN_DEFAULT 20 /* bcast link window size (default) */ 46 47 const char tipc_bclink_name[] = "broadcast-link"; 48 49 static void tipc_nmap_diff(struct tipc_node_map *nm_a, 50 struct tipc_node_map *nm_b, 51 struct tipc_node_map *nm_diff); 52 static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node); 53 static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node); 54 55 static void tipc_bclink_lock(struct net *net) 56 { 57 struct tipc_net *tn = net_generic(net, tipc_net_id); 58 59 spin_lock_bh(&tn->bclink->lock); 60 } 61 62 static void tipc_bclink_unlock(struct net *net) 63 { 64 struct tipc_net *tn = net_generic(net, tipc_net_id); 65 struct tipc_node *node = NULL; 66 67 if (likely(!tn->bclink->flags)) { 68 spin_unlock_bh(&tn->bclink->lock); 69 return; 70 } 71 72 if (tn->bclink->flags & TIPC_BCLINK_RESET) { 73 tn->bclink->flags &= ~TIPC_BCLINK_RESET; 74 node = tipc_bclink_retransmit_to(net); 75 } 76 spin_unlock_bh(&tn->bclink->lock); 77 78 if (node) 79 tipc_link_reset_all(node); 80 } 81 82 void tipc_bclink_input(struct net *net) 83 { 84 struct tipc_net *tn = net_generic(net, tipc_net_id); 85 86 tipc_sk_mcast_rcv(net, &tn->bclink->arrvq, &tn->bclink->inputq); 87 } 88 89 uint tipc_bclink_get_mtu(void) 90 { 91 return MAX_PKT_DEFAULT_MCAST; 92 } 93 94 void tipc_bclink_set_flags(struct net *net, unsigned int flags) 95 { 96 struct tipc_net *tn = net_generic(net, tipc_net_id); 97 98 tn->bclink->flags |= flags; 99 } 100 101 static u32 bcbuf_acks(struct sk_buff *buf) 102 { 103 return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle; 104 } 105 106 static void bcbuf_set_acks(struct sk_buff *buf, u32 acks) 107 { 108 TIPC_SKB_CB(buf)->handle = (void *)(unsigned long)acks; 109 } 110 111 static void bcbuf_decr_acks(struct sk_buff *buf) 112 { 113 bcbuf_set_acks(buf, bcbuf_acks(buf) - 1); 114 } 115 116 void tipc_bclink_add_node(struct net *net, u32 addr) 117 { 118 struct tipc_net *tn = net_generic(net, tipc_net_id); 119 120 tipc_bclink_lock(net); 121 tipc_nmap_add(&tn->bclink->bcast_nodes, addr); 122 tipc_bclink_unlock(net); 123 } 124 125 void tipc_bclink_remove_node(struct net *net, u32 addr) 126 { 127 struct tipc_net *tn = net_generic(net, tipc_net_id); 128 129 tipc_bclink_lock(net); 130 tipc_nmap_remove(&tn->bclink->bcast_nodes, addr); 131 tipc_bclink_unlock(net); 132 } 133 134 static void bclink_set_last_sent(struct net *net) 135 { 136 struct tipc_net *tn = net_generic(net, tipc_net_id); 137 struct tipc_link *bcl = tn->bcl; 138 139 if (bcl->next_out) 140 bcl->fsm_msg_cnt = mod(buf_seqno(bcl->next_out) - 1); 141 else 142 bcl->fsm_msg_cnt = mod(bcl->next_out_no - 1); 143 } 144 145 u32 tipc_bclink_get_last_sent(struct net *net) 146 { 147 struct tipc_net *tn = net_generic(net, tipc_net_id); 148 149 return tn->bcl->fsm_msg_cnt; 150 } 151 152 static void bclink_update_last_sent(struct tipc_node *node, u32 seqno) 153 { 154 node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ? 155 seqno : node->bclink.last_sent; 156 } 157 158 159 /** 160 * tipc_bclink_retransmit_to - get most recent node to request retransmission 161 * 162 * Called with bclink_lock locked 163 */ 164 struct tipc_node *tipc_bclink_retransmit_to(struct net *net) 165 { 166 struct tipc_net *tn = net_generic(net, tipc_net_id); 167 168 return tn->bclink->retransmit_to; 169 } 170 171 /** 172 * bclink_retransmit_pkt - retransmit broadcast packets 173 * @after: sequence number of last packet to *not* retransmit 174 * @to: sequence number of last packet to retransmit 175 * 176 * Called with bclink_lock locked 177 */ 178 static void bclink_retransmit_pkt(struct tipc_net *tn, u32 after, u32 to) 179 { 180 struct sk_buff *skb; 181 struct tipc_link *bcl = tn->bcl; 182 183 skb_queue_walk(&bcl->outqueue, skb) { 184 if (more(buf_seqno(skb), after)) { 185 tipc_link_retransmit(bcl, skb, mod(to - after)); 186 break; 187 } 188 } 189 } 190 191 /** 192 * tipc_bclink_wakeup_users - wake up pending users 193 * 194 * Called with no locks taken 195 */ 196 void tipc_bclink_wakeup_users(struct net *net) 197 { 198 struct tipc_net *tn = net_generic(net, tipc_net_id); 199 200 tipc_sk_rcv(net, &tn->bclink->link.wakeupq); 201 } 202 203 /** 204 * tipc_bclink_acknowledge - handle acknowledgement of broadcast packets 205 * @n_ptr: node that sent acknowledgement info 206 * @acked: broadcast sequence # that has been acknowledged 207 * 208 * Node is locked, bclink_lock unlocked. 209 */ 210 void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked) 211 { 212 struct sk_buff *skb, *tmp; 213 struct sk_buff *next; 214 unsigned int released = 0; 215 struct net *net = n_ptr->net; 216 struct tipc_net *tn = net_generic(net, tipc_net_id); 217 218 tipc_bclink_lock(net); 219 /* Bail out if tx queue is empty (no clean up is required) */ 220 skb = skb_peek(&tn->bcl->outqueue); 221 if (!skb) 222 goto exit; 223 224 /* Determine which messages need to be acknowledged */ 225 if (acked == INVALID_LINK_SEQ) { 226 /* 227 * Contact with specified node has been lost, so need to 228 * acknowledge sent messages only (if other nodes still exist) 229 * or both sent and unsent messages (otherwise) 230 */ 231 if (tn->bclink->bcast_nodes.count) 232 acked = tn->bcl->fsm_msg_cnt; 233 else 234 acked = tn->bcl->next_out_no; 235 } else { 236 /* 237 * Bail out if specified sequence number does not correspond 238 * to a message that has been sent and not yet acknowledged 239 */ 240 if (less(acked, buf_seqno(skb)) || 241 less(tn->bcl->fsm_msg_cnt, acked) || 242 less_eq(acked, n_ptr->bclink.acked)) 243 goto exit; 244 } 245 246 /* Skip over packets that node has previously acknowledged */ 247 skb_queue_walk(&tn->bcl->outqueue, skb) { 248 if (more(buf_seqno(skb), n_ptr->bclink.acked)) 249 break; 250 } 251 252 /* Update packets that node is now acknowledging */ 253 skb_queue_walk_from_safe(&tn->bcl->outqueue, skb, tmp) { 254 if (more(buf_seqno(skb), acked)) 255 break; 256 257 next = tipc_skb_queue_next(&tn->bcl->outqueue, skb); 258 if (skb != tn->bcl->next_out) { 259 bcbuf_decr_acks(skb); 260 } else { 261 bcbuf_set_acks(skb, 0); 262 tn->bcl->next_out = next; 263 bclink_set_last_sent(net); 264 } 265 266 if (bcbuf_acks(skb) == 0) { 267 __skb_unlink(skb, &tn->bcl->outqueue); 268 kfree_skb(skb); 269 released = 1; 270 } 271 } 272 n_ptr->bclink.acked = acked; 273 274 /* Try resolving broadcast link congestion, if necessary */ 275 if (unlikely(tn->bcl->next_out)) { 276 tipc_link_push_packets(tn->bcl); 277 bclink_set_last_sent(net); 278 } 279 if (unlikely(released && !skb_queue_empty(&tn->bcl->wakeupq))) 280 n_ptr->action_flags |= TIPC_WAKEUP_BCAST_USERS; 281 exit: 282 tipc_bclink_unlock(net); 283 } 284 285 /** 286 * tipc_bclink_update_link_state - update broadcast link state 287 * 288 * RCU and node lock set 289 */ 290 void tipc_bclink_update_link_state(struct tipc_node *n_ptr, 291 u32 last_sent) 292 { 293 struct sk_buff *buf; 294 struct net *net = n_ptr->net; 295 struct tipc_net *tn = net_generic(net, tipc_net_id); 296 297 /* Ignore "stale" link state info */ 298 if (less_eq(last_sent, n_ptr->bclink.last_in)) 299 return; 300 301 /* Update link synchronization state; quit if in sync */ 302 bclink_update_last_sent(n_ptr, last_sent); 303 304 if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in) 305 return; 306 307 /* Update out-of-sync state; quit if loss is still unconfirmed */ 308 if ((++n_ptr->bclink.oos_state) == 1) { 309 if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2)) 310 return; 311 n_ptr->bclink.oos_state++; 312 } 313 314 /* Don't NACK if one has been recently sent (or seen) */ 315 if (n_ptr->bclink.oos_state & 0x1) 316 return; 317 318 /* Send NACK */ 319 buf = tipc_buf_acquire(INT_H_SIZE); 320 if (buf) { 321 struct tipc_msg *msg = buf_msg(buf); 322 struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferred_queue); 323 u32 to = skb ? buf_seqno(skb) - 1 : n_ptr->bclink.last_sent; 324 325 tipc_msg_init(tn->own_addr, msg, BCAST_PROTOCOL, STATE_MSG, 326 INT_H_SIZE, n_ptr->addr); 327 msg_set_non_seq(msg, 1); 328 msg_set_mc_netid(msg, tn->net_id); 329 msg_set_bcast_ack(msg, n_ptr->bclink.last_in); 330 msg_set_bcgap_after(msg, n_ptr->bclink.last_in); 331 msg_set_bcgap_to(msg, to); 332 333 tipc_bclink_lock(net); 334 tipc_bearer_send(net, MAX_BEARERS, buf, NULL); 335 tn->bcl->stats.sent_nacks++; 336 tipc_bclink_unlock(net); 337 kfree_skb(buf); 338 339 n_ptr->bclink.oos_state++; 340 } 341 } 342 343 /** 344 * bclink_peek_nack - monitor retransmission requests sent by other nodes 345 * 346 * Delay any upcoming NACK by this node if another node has already 347 * requested the first message this node is going to ask for. 348 */ 349 static void bclink_peek_nack(struct net *net, struct tipc_msg *msg) 350 { 351 struct tipc_node *n_ptr = tipc_node_find(net, msg_destnode(msg)); 352 353 if (unlikely(!n_ptr)) 354 return; 355 356 tipc_node_lock(n_ptr); 357 358 if (n_ptr->bclink.recv_permitted && 359 (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) && 360 (n_ptr->bclink.last_in == msg_bcgap_after(msg))) 361 n_ptr->bclink.oos_state = 2; 362 363 tipc_node_unlock(n_ptr); 364 } 365 366 /* tipc_bclink_xmit - deliver buffer chain to all nodes in cluster 367 * and to identified node local sockets 368 * @net: the applicable net namespace 369 * @list: chain of buffers containing message 370 * Consumes the buffer chain, except when returning -ELINKCONG 371 * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE 372 */ 373 int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list) 374 { 375 struct tipc_net *tn = net_generic(net, tipc_net_id); 376 struct tipc_link *bcl = tn->bcl; 377 struct tipc_bclink *bclink = tn->bclink; 378 int rc = 0; 379 int bc = 0; 380 struct sk_buff *skb; 381 struct sk_buff_head arrvq; 382 struct sk_buff_head inputq; 383 384 /* Prepare clone of message for local node */ 385 skb = tipc_msg_reassemble(list); 386 if (unlikely(!skb)) { 387 __skb_queue_purge(list); 388 return -EHOSTUNREACH; 389 } 390 391 /* Broadcast to all nodes */ 392 if (likely(bclink)) { 393 tipc_bclink_lock(net); 394 if (likely(bclink->bcast_nodes.count)) { 395 rc = __tipc_link_xmit(net, bcl, list); 396 if (likely(!rc)) { 397 u32 len = skb_queue_len(&bcl->outqueue); 398 399 bclink_set_last_sent(net); 400 bcl->stats.queue_sz_counts++; 401 bcl->stats.accu_queue_sz += len; 402 } 403 bc = 1; 404 } 405 tipc_bclink_unlock(net); 406 } 407 408 if (unlikely(!bc)) 409 __skb_queue_purge(list); 410 411 if (unlikely(rc)) { 412 kfree_skb(skb); 413 return rc; 414 } 415 /* Deliver message clone */ 416 __skb_queue_head_init(&arrvq); 417 skb_queue_head_init(&inputq); 418 __skb_queue_tail(&arrvq, skb); 419 tipc_sk_mcast_rcv(net, &arrvq, &inputq); 420 return rc; 421 } 422 423 /** 424 * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet 425 * 426 * Called with both sending node's lock and bclink_lock taken. 427 */ 428 static void bclink_accept_pkt(struct tipc_node *node, u32 seqno) 429 { 430 struct tipc_net *tn = net_generic(node->net, tipc_net_id); 431 432 bclink_update_last_sent(node, seqno); 433 node->bclink.last_in = seqno; 434 node->bclink.oos_state = 0; 435 tn->bcl->stats.recv_info++; 436 437 /* 438 * Unicast an ACK periodically, ensuring that 439 * all nodes in the cluster don't ACK at the same time 440 */ 441 if (((seqno - tn->own_addr) % TIPC_MIN_LINK_WIN) == 0) { 442 tipc_link_proto_xmit(node->active_links[node->addr & 1], 443 STATE_MSG, 0, 0, 0, 0, 0); 444 tn->bcl->stats.sent_acks++; 445 } 446 } 447 448 /** 449 * tipc_bclink_rcv - receive a broadcast packet, and deliver upwards 450 * 451 * RCU is locked, no other locks set 452 */ 453 void tipc_bclink_rcv(struct net *net, struct sk_buff *buf) 454 { 455 struct tipc_net *tn = net_generic(net, tipc_net_id); 456 struct tipc_link *bcl = tn->bcl; 457 struct tipc_msg *msg = buf_msg(buf); 458 struct tipc_node *node; 459 u32 next_in; 460 u32 seqno; 461 int deferred = 0; 462 int pos = 0; 463 struct sk_buff *iskb; 464 struct sk_buff_head *arrvq, *inputq; 465 466 /* Screen out unwanted broadcast messages */ 467 if (msg_mc_netid(msg) != tn->net_id) 468 goto exit; 469 470 node = tipc_node_find(net, msg_prevnode(msg)); 471 if (unlikely(!node)) 472 goto exit; 473 474 tipc_node_lock(node); 475 if (unlikely(!node->bclink.recv_permitted)) 476 goto unlock; 477 478 /* Handle broadcast protocol message */ 479 if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) { 480 if (msg_type(msg) != STATE_MSG) 481 goto unlock; 482 if (msg_destnode(msg) == tn->own_addr) { 483 tipc_bclink_acknowledge(node, msg_bcast_ack(msg)); 484 tipc_node_unlock(node); 485 tipc_bclink_lock(net); 486 bcl->stats.recv_nacks++; 487 tn->bclink->retransmit_to = node; 488 bclink_retransmit_pkt(tn, msg_bcgap_after(msg), 489 msg_bcgap_to(msg)); 490 tipc_bclink_unlock(net); 491 } else { 492 tipc_node_unlock(node); 493 bclink_peek_nack(net, msg); 494 } 495 goto exit; 496 } 497 498 /* Handle in-sequence broadcast message */ 499 seqno = msg_seqno(msg); 500 next_in = mod(node->bclink.last_in + 1); 501 arrvq = &tn->bclink->arrvq; 502 inputq = &tn->bclink->inputq; 503 504 if (likely(seqno == next_in)) { 505 receive: 506 /* Deliver message to destination */ 507 if (likely(msg_isdata(msg))) { 508 tipc_bclink_lock(net); 509 bclink_accept_pkt(node, seqno); 510 spin_lock_bh(&inputq->lock); 511 __skb_queue_tail(arrvq, buf); 512 spin_unlock_bh(&inputq->lock); 513 node->action_flags |= TIPC_BCAST_MSG_EVT; 514 tipc_bclink_unlock(net); 515 tipc_node_unlock(node); 516 } else if (msg_user(msg) == MSG_BUNDLER) { 517 tipc_bclink_lock(net); 518 bclink_accept_pkt(node, seqno); 519 bcl->stats.recv_bundles++; 520 bcl->stats.recv_bundled += msg_msgcnt(msg); 521 pos = 0; 522 while (tipc_msg_extract(buf, &iskb, &pos)) { 523 spin_lock_bh(&inputq->lock); 524 __skb_queue_tail(arrvq, iskb); 525 spin_unlock_bh(&inputq->lock); 526 } 527 node->action_flags |= TIPC_BCAST_MSG_EVT; 528 tipc_bclink_unlock(net); 529 tipc_node_unlock(node); 530 } else if (msg_user(msg) == MSG_FRAGMENTER) { 531 tipc_buf_append(&node->bclink.reasm_buf, &buf); 532 if (unlikely(!buf && !node->bclink.reasm_buf)) 533 goto unlock; 534 tipc_bclink_lock(net); 535 bclink_accept_pkt(node, seqno); 536 bcl->stats.recv_fragments++; 537 if (buf) { 538 bcl->stats.recv_fragmented++; 539 msg = buf_msg(buf); 540 tipc_bclink_unlock(net); 541 goto receive; 542 } 543 tipc_bclink_unlock(net); 544 tipc_node_unlock(node); 545 } else { 546 tipc_bclink_lock(net); 547 bclink_accept_pkt(node, seqno); 548 tipc_bclink_unlock(net); 549 tipc_node_unlock(node); 550 kfree_skb(buf); 551 } 552 buf = NULL; 553 554 /* Determine new synchronization state */ 555 tipc_node_lock(node); 556 if (unlikely(!tipc_node_is_up(node))) 557 goto unlock; 558 559 if (node->bclink.last_in == node->bclink.last_sent) 560 goto unlock; 561 562 if (skb_queue_empty(&node->bclink.deferred_queue)) { 563 node->bclink.oos_state = 1; 564 goto unlock; 565 } 566 567 msg = buf_msg(skb_peek(&node->bclink.deferred_queue)); 568 seqno = msg_seqno(msg); 569 next_in = mod(next_in + 1); 570 if (seqno != next_in) 571 goto unlock; 572 573 /* Take in-sequence message from deferred queue & deliver it */ 574 buf = __skb_dequeue(&node->bclink.deferred_queue); 575 goto receive; 576 } 577 578 /* Handle out-of-sequence broadcast message */ 579 if (less(next_in, seqno)) { 580 deferred = tipc_link_defer_pkt(&node->bclink.deferred_queue, 581 buf); 582 bclink_update_last_sent(node, seqno); 583 buf = NULL; 584 } 585 586 tipc_bclink_lock(net); 587 588 if (deferred) 589 bcl->stats.deferred_recv++; 590 else 591 bcl->stats.duplicates++; 592 593 tipc_bclink_unlock(net); 594 595 unlock: 596 tipc_node_unlock(node); 597 exit: 598 kfree_skb(buf); 599 } 600 601 u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr) 602 { 603 return (n_ptr->bclink.recv_permitted && 604 (tipc_bclink_get_last_sent(n_ptr->net) != n_ptr->bclink.acked)); 605 } 606 607 608 /** 609 * tipc_bcbearer_send - send a packet through the broadcast pseudo-bearer 610 * 611 * Send packet over as many bearers as necessary to reach all nodes 612 * that have joined the broadcast link. 613 * 614 * Returns 0 (packet sent successfully) under all circumstances, 615 * since the broadcast link's pseudo-bearer never blocks 616 */ 617 static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf, 618 struct tipc_bearer *unused1, 619 struct tipc_media_addr *unused2) 620 { 621 int bp_index; 622 struct tipc_msg *msg = buf_msg(buf); 623 struct tipc_net *tn = net_generic(net, tipc_net_id); 624 struct tipc_bcbearer *bcbearer = tn->bcbearer; 625 struct tipc_bclink *bclink = tn->bclink; 626 627 /* Prepare broadcast link message for reliable transmission, 628 * if first time trying to send it; 629 * preparation is skipped for broadcast link protocol messages 630 * since they are sent in an unreliable manner and don't need it 631 */ 632 if (likely(!msg_non_seq(buf_msg(buf)))) { 633 bcbuf_set_acks(buf, bclink->bcast_nodes.count); 634 msg_set_non_seq(msg, 1); 635 msg_set_mc_netid(msg, tn->net_id); 636 tn->bcl->stats.sent_info++; 637 638 if (WARN_ON(!bclink->bcast_nodes.count)) { 639 dump_stack(); 640 return 0; 641 } 642 } 643 644 /* Send buffer over bearers until all targets reached */ 645 bcbearer->remains = bclink->bcast_nodes; 646 647 for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) { 648 struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary; 649 struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary; 650 struct tipc_bearer *bp[2] = {p, s}; 651 struct tipc_bearer *b = bp[msg_link_selector(msg)]; 652 struct sk_buff *tbuf; 653 654 if (!p) 655 break; /* No more bearers to try */ 656 if (!b) 657 b = p; 658 tipc_nmap_diff(&bcbearer->remains, &b->nodes, 659 &bcbearer->remains_new); 660 if (bcbearer->remains_new.count == bcbearer->remains.count) 661 continue; /* Nothing added by bearer pair */ 662 663 if (bp_index == 0) { 664 /* Use original buffer for first bearer */ 665 tipc_bearer_send(net, b->identity, buf, &b->bcast_addr); 666 } else { 667 /* Avoid concurrent buffer access */ 668 tbuf = pskb_copy_for_clone(buf, GFP_ATOMIC); 669 if (!tbuf) 670 break; 671 tipc_bearer_send(net, b->identity, tbuf, 672 &b->bcast_addr); 673 kfree_skb(tbuf); /* Bearer keeps a clone */ 674 } 675 if (bcbearer->remains_new.count == 0) 676 break; /* All targets reached */ 677 678 bcbearer->remains = bcbearer->remains_new; 679 } 680 681 return 0; 682 } 683 684 /** 685 * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer 686 */ 687 void tipc_bcbearer_sort(struct net *net, struct tipc_node_map *nm_ptr, 688 u32 node, bool action) 689 { 690 struct tipc_net *tn = net_generic(net, tipc_net_id); 691 struct tipc_bcbearer *bcbearer = tn->bcbearer; 692 struct tipc_bcbearer_pair *bp_temp = bcbearer->bpairs_temp; 693 struct tipc_bcbearer_pair *bp_curr; 694 struct tipc_bearer *b; 695 int b_index; 696 int pri; 697 698 tipc_bclink_lock(net); 699 700 if (action) 701 tipc_nmap_add(nm_ptr, node); 702 else 703 tipc_nmap_remove(nm_ptr, node); 704 705 /* Group bearers by priority (can assume max of two per priority) */ 706 memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp)); 707 708 rcu_read_lock(); 709 for (b_index = 0; b_index < MAX_BEARERS; b_index++) { 710 b = rcu_dereference_rtnl(tn->bearer_list[b_index]); 711 if (!b || !b->nodes.count) 712 continue; 713 714 if (!bp_temp[b->priority].primary) 715 bp_temp[b->priority].primary = b; 716 else 717 bp_temp[b->priority].secondary = b; 718 } 719 rcu_read_unlock(); 720 721 /* Create array of bearer pairs for broadcasting */ 722 bp_curr = bcbearer->bpairs; 723 memset(bcbearer->bpairs, 0, sizeof(bcbearer->bpairs)); 724 725 for (pri = TIPC_MAX_LINK_PRI; pri >= 0; pri--) { 726 727 if (!bp_temp[pri].primary) 728 continue; 729 730 bp_curr->primary = bp_temp[pri].primary; 731 732 if (bp_temp[pri].secondary) { 733 if (tipc_nmap_equal(&bp_temp[pri].primary->nodes, 734 &bp_temp[pri].secondary->nodes)) { 735 bp_curr->secondary = bp_temp[pri].secondary; 736 } else { 737 bp_curr++; 738 bp_curr->primary = bp_temp[pri].secondary; 739 } 740 } 741 742 bp_curr++; 743 } 744 745 tipc_bclink_unlock(net); 746 } 747 748 static int __tipc_nl_add_bc_link_stat(struct sk_buff *skb, 749 struct tipc_stats *stats) 750 { 751 int i; 752 struct nlattr *nest; 753 754 struct nla_map { 755 __u32 key; 756 __u32 val; 757 }; 758 759 struct nla_map map[] = { 760 {TIPC_NLA_STATS_RX_INFO, stats->recv_info}, 761 {TIPC_NLA_STATS_RX_FRAGMENTS, stats->recv_fragments}, 762 {TIPC_NLA_STATS_RX_FRAGMENTED, stats->recv_fragmented}, 763 {TIPC_NLA_STATS_RX_BUNDLES, stats->recv_bundles}, 764 {TIPC_NLA_STATS_RX_BUNDLED, stats->recv_bundled}, 765 {TIPC_NLA_STATS_TX_INFO, stats->sent_info}, 766 {TIPC_NLA_STATS_TX_FRAGMENTS, stats->sent_fragments}, 767 {TIPC_NLA_STATS_TX_FRAGMENTED, stats->sent_fragmented}, 768 {TIPC_NLA_STATS_TX_BUNDLES, stats->sent_bundles}, 769 {TIPC_NLA_STATS_TX_BUNDLED, stats->sent_bundled}, 770 {TIPC_NLA_STATS_RX_NACKS, stats->recv_nacks}, 771 {TIPC_NLA_STATS_RX_DEFERRED, stats->deferred_recv}, 772 {TIPC_NLA_STATS_TX_NACKS, stats->sent_nacks}, 773 {TIPC_NLA_STATS_TX_ACKS, stats->sent_acks}, 774 {TIPC_NLA_STATS_RETRANSMITTED, stats->retransmitted}, 775 {TIPC_NLA_STATS_DUPLICATES, stats->duplicates}, 776 {TIPC_NLA_STATS_LINK_CONGS, stats->link_congs}, 777 {TIPC_NLA_STATS_MAX_QUEUE, stats->max_queue_sz}, 778 {TIPC_NLA_STATS_AVG_QUEUE, stats->queue_sz_counts ? 779 (stats->accu_queue_sz / stats->queue_sz_counts) : 0} 780 }; 781 782 nest = nla_nest_start(skb, TIPC_NLA_LINK_STATS); 783 if (!nest) 784 return -EMSGSIZE; 785 786 for (i = 0; i < ARRAY_SIZE(map); i++) 787 if (nla_put_u32(skb, map[i].key, map[i].val)) 788 goto msg_full; 789 790 nla_nest_end(skb, nest); 791 792 return 0; 793 msg_full: 794 nla_nest_cancel(skb, nest); 795 796 return -EMSGSIZE; 797 } 798 799 int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg) 800 { 801 int err; 802 void *hdr; 803 struct nlattr *attrs; 804 struct nlattr *prop; 805 struct tipc_net *tn = net_generic(net, tipc_net_id); 806 struct tipc_link *bcl = tn->bcl; 807 808 if (!bcl) 809 return 0; 810 811 tipc_bclink_lock(net); 812 813 hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_family, 814 NLM_F_MULTI, TIPC_NL_LINK_GET); 815 if (!hdr) 816 return -EMSGSIZE; 817 818 attrs = nla_nest_start(msg->skb, TIPC_NLA_LINK); 819 if (!attrs) 820 goto msg_full; 821 822 /* The broadcast link is always up */ 823 if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP)) 824 goto attr_msg_full; 825 826 if (nla_put_flag(msg->skb, TIPC_NLA_LINK_BROADCAST)) 827 goto attr_msg_full; 828 if (nla_put_string(msg->skb, TIPC_NLA_LINK_NAME, bcl->name)) 829 goto attr_msg_full; 830 if (nla_put_u32(msg->skb, TIPC_NLA_LINK_RX, bcl->next_in_no)) 831 goto attr_msg_full; 832 if (nla_put_u32(msg->skb, TIPC_NLA_LINK_TX, bcl->next_out_no)) 833 goto attr_msg_full; 834 835 prop = nla_nest_start(msg->skb, TIPC_NLA_LINK_PROP); 836 if (!prop) 837 goto attr_msg_full; 838 if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bcl->queue_limit[0])) 839 goto prop_msg_full; 840 nla_nest_end(msg->skb, prop); 841 842 err = __tipc_nl_add_bc_link_stat(msg->skb, &bcl->stats); 843 if (err) 844 goto attr_msg_full; 845 846 tipc_bclink_unlock(net); 847 nla_nest_end(msg->skb, attrs); 848 genlmsg_end(msg->skb, hdr); 849 850 return 0; 851 852 prop_msg_full: 853 nla_nest_cancel(msg->skb, prop); 854 attr_msg_full: 855 nla_nest_cancel(msg->skb, attrs); 856 msg_full: 857 tipc_bclink_unlock(net); 858 genlmsg_cancel(msg->skb, hdr); 859 860 return -EMSGSIZE; 861 } 862 863 int tipc_bclink_reset_stats(struct net *net) 864 { 865 struct tipc_net *tn = net_generic(net, tipc_net_id); 866 struct tipc_link *bcl = tn->bcl; 867 868 if (!bcl) 869 return -ENOPROTOOPT; 870 871 tipc_bclink_lock(net); 872 memset(&bcl->stats, 0, sizeof(bcl->stats)); 873 tipc_bclink_unlock(net); 874 return 0; 875 } 876 877 int tipc_bclink_set_queue_limits(struct net *net, u32 limit) 878 { 879 struct tipc_net *tn = net_generic(net, tipc_net_id); 880 struct tipc_link *bcl = tn->bcl; 881 882 if (!bcl) 883 return -ENOPROTOOPT; 884 if ((limit < TIPC_MIN_LINK_WIN) || (limit > TIPC_MAX_LINK_WIN)) 885 return -EINVAL; 886 887 tipc_bclink_lock(net); 888 tipc_link_set_queue_limits(bcl, limit); 889 tipc_bclink_unlock(net); 890 return 0; 891 } 892 893 int tipc_bclink_init(struct net *net) 894 { 895 struct tipc_net *tn = net_generic(net, tipc_net_id); 896 struct tipc_bcbearer *bcbearer; 897 struct tipc_bclink *bclink; 898 struct tipc_link *bcl; 899 900 bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC); 901 if (!bcbearer) 902 return -ENOMEM; 903 904 bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC); 905 if (!bclink) { 906 kfree(bcbearer); 907 return -ENOMEM; 908 } 909 910 bcl = &bclink->link; 911 bcbearer->bearer.media = &bcbearer->media; 912 bcbearer->media.send_msg = tipc_bcbearer_send; 913 sprintf(bcbearer->media.name, "tipc-broadcast"); 914 915 spin_lock_init(&bclink->lock); 916 __skb_queue_head_init(&bcl->outqueue); 917 __skb_queue_head_init(&bcl->deferred_queue); 918 skb_queue_head_init(&bcl->wakeupq); 919 bcl->next_out_no = 1; 920 spin_lock_init(&bclink->node.lock); 921 __skb_queue_head_init(&bclink->arrvq); 922 skb_queue_head_init(&bclink->inputq); 923 bcl->owner = &bclink->node; 924 bcl->owner->net = net; 925 bcl->max_pkt = MAX_PKT_DEFAULT_MCAST; 926 tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT); 927 bcl->bearer_id = MAX_BEARERS; 928 rcu_assign_pointer(tn->bearer_list[MAX_BEARERS], &bcbearer->bearer); 929 bcl->state = WORKING_WORKING; 930 bcl->pmsg = (struct tipc_msg *)&bcl->proto_msg; 931 msg_set_prevnode(bcl->pmsg, tn->own_addr); 932 strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME); 933 tn->bcbearer = bcbearer; 934 tn->bclink = bclink; 935 tn->bcl = bcl; 936 return 0; 937 } 938 939 void tipc_bclink_stop(struct net *net) 940 { 941 struct tipc_net *tn = net_generic(net, tipc_net_id); 942 943 tipc_bclink_lock(net); 944 tipc_link_purge_queues(tn->bcl); 945 tipc_bclink_unlock(net); 946 947 RCU_INIT_POINTER(tn->bearer_list[BCBEARER], NULL); 948 synchronize_net(); 949 kfree(tn->bcbearer); 950 kfree(tn->bclink); 951 } 952 953 /** 954 * tipc_nmap_add - add a node to a node map 955 */ 956 static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node) 957 { 958 int n = tipc_node(node); 959 int w = n / WSIZE; 960 u32 mask = (1 << (n % WSIZE)); 961 962 if ((nm_ptr->map[w] & mask) == 0) { 963 nm_ptr->count++; 964 nm_ptr->map[w] |= mask; 965 } 966 } 967 968 /** 969 * tipc_nmap_remove - remove a node from a node map 970 */ 971 static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node) 972 { 973 int n = tipc_node(node); 974 int w = n / WSIZE; 975 u32 mask = (1 << (n % WSIZE)); 976 977 if ((nm_ptr->map[w] & mask) != 0) { 978 nm_ptr->map[w] &= ~mask; 979 nm_ptr->count--; 980 } 981 } 982 983 /** 984 * tipc_nmap_diff - find differences between node maps 985 * @nm_a: input node map A 986 * @nm_b: input node map B 987 * @nm_diff: output node map A-B (i.e. nodes of A that are not in B) 988 */ 989 static void tipc_nmap_diff(struct tipc_node_map *nm_a, 990 struct tipc_node_map *nm_b, 991 struct tipc_node_map *nm_diff) 992 { 993 int stop = ARRAY_SIZE(nm_a->map); 994 int w; 995 int b; 996 u32 map; 997 998 memset(nm_diff, 0, sizeof(*nm_diff)); 999 for (w = 0; w < stop; w++) { 1000 map = nm_a->map[w] ^ (nm_a->map[w] & nm_b->map[w]); 1001 nm_diff->map[w] = map; 1002 if (map != 0) { 1003 for (b = 0 ; b < WSIZE; b++) { 1004 if (map & (1 << b)) 1005 nm_diff->count++; 1006 } 1007 } 1008 } 1009 } 1010