1 /* 2 * net/tipc/bcast.c: TIPC broadcast code 3 * 4 * Copyright (c) 2004-2006, 2014-2015, Ericsson AB 5 * Copyright (c) 2004, Intel Corporation. 6 * Copyright (c) 2005, 2010-2011, Wind River Systems 7 * All rights reserved. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions are met: 11 * 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in the 16 * documentation and/or other materials provided with the distribution. 17 * 3. Neither the names of the copyright holders nor the names of its 18 * contributors may be used to endorse or promote products derived from 19 * this software without specific prior written permission. 20 * 21 * Alternatively, this software may be distributed under the terms of the 22 * GNU General Public License ("GPL") version 2 as published by the Free 23 * Software Foundation. 24 * 25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 26 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 27 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 28 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 29 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 30 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 31 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 32 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 33 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 34 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 35 * POSSIBILITY OF SUCH DAMAGE. 36 */ 37 38 #include "socket.h" 39 #include "msg.h" 40 #include "bcast.h" 41 #include "name_distr.h" 42 #include "core.h" 43 44 #define MAX_PKT_DEFAULT_MCAST 1500 /* bcast link max packet size (fixed) */ 45 #define BCLINK_WIN_DEFAULT 20 /* bcast link window size (default) */ 46 47 const char tipc_bclink_name[] = "broadcast-link"; 48 49 static void tipc_nmap_diff(struct tipc_node_map *nm_a, 50 struct tipc_node_map *nm_b, 51 struct tipc_node_map *nm_diff); 52 static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node); 53 static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node); 54 55 static void tipc_bclink_lock(struct net *net) 56 { 57 struct tipc_net *tn = net_generic(net, tipc_net_id); 58 59 spin_lock_bh(&tn->bclink->lock); 60 } 61 62 static void tipc_bclink_unlock(struct net *net) 63 { 64 struct tipc_net *tn = net_generic(net, tipc_net_id); 65 struct tipc_node *node = NULL; 66 67 if (likely(!tn->bclink->flags)) { 68 spin_unlock_bh(&tn->bclink->lock); 69 return; 70 } 71 72 if (tn->bclink->flags & TIPC_BCLINK_RESET) { 73 tn->bclink->flags &= ~TIPC_BCLINK_RESET; 74 node = tipc_bclink_retransmit_to(net); 75 } 76 spin_unlock_bh(&tn->bclink->lock); 77 78 if (node) 79 tipc_link_reset_all(node); 80 } 81 82 void tipc_bclink_input(struct net *net) 83 { 84 struct tipc_net *tn = net_generic(net, tipc_net_id); 85 86 tipc_sk_mcast_rcv(net, &tn->bclink->arrvq, &tn->bclink->inputq); 87 } 88 89 uint tipc_bclink_get_mtu(void) 90 { 91 return MAX_PKT_DEFAULT_MCAST; 92 } 93 94 void tipc_bclink_set_flags(struct net *net, unsigned int flags) 95 { 96 struct tipc_net *tn = net_generic(net, tipc_net_id); 97 98 tn->bclink->flags |= flags; 99 } 100 101 static u32 bcbuf_acks(struct sk_buff *buf) 102 { 103 return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle; 104 } 105 106 static void bcbuf_set_acks(struct sk_buff *buf, u32 acks) 107 { 108 TIPC_SKB_CB(buf)->handle = (void *)(unsigned long)acks; 109 } 110 111 static void bcbuf_decr_acks(struct sk_buff *buf) 112 { 113 bcbuf_set_acks(buf, bcbuf_acks(buf) - 1); 114 } 115 116 void tipc_bclink_add_node(struct net *net, u32 addr) 117 { 118 struct tipc_net *tn = net_generic(net, tipc_net_id); 119 120 tipc_bclink_lock(net); 121 tipc_nmap_add(&tn->bclink->bcast_nodes, addr); 122 tipc_bclink_unlock(net); 123 } 124 125 void tipc_bclink_remove_node(struct net *net, u32 addr) 126 { 127 struct tipc_net *tn = net_generic(net, tipc_net_id); 128 129 tipc_bclink_lock(net); 130 tipc_nmap_remove(&tn->bclink->bcast_nodes, addr); 131 tipc_bclink_unlock(net); 132 } 133 134 static void bclink_set_last_sent(struct net *net) 135 { 136 struct tipc_net *tn = net_generic(net, tipc_net_id); 137 struct tipc_link *bcl = tn->bcl; 138 struct sk_buff *skb = skb_peek(&bcl->backlogq); 139 140 if (skb) 141 bcl->fsm_msg_cnt = mod(buf_seqno(skb) - 1); 142 else 143 bcl->fsm_msg_cnt = mod(bcl->next_out_no - 1); 144 } 145 146 u32 tipc_bclink_get_last_sent(struct net *net) 147 { 148 struct tipc_net *tn = net_generic(net, tipc_net_id); 149 150 return tn->bcl->fsm_msg_cnt; 151 } 152 153 static void bclink_update_last_sent(struct tipc_node *node, u32 seqno) 154 { 155 node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ? 156 seqno : node->bclink.last_sent; 157 } 158 159 160 /** 161 * tipc_bclink_retransmit_to - get most recent node to request retransmission 162 * 163 * Called with bclink_lock locked 164 */ 165 struct tipc_node *tipc_bclink_retransmit_to(struct net *net) 166 { 167 struct tipc_net *tn = net_generic(net, tipc_net_id); 168 169 return tn->bclink->retransmit_to; 170 } 171 172 /** 173 * bclink_retransmit_pkt - retransmit broadcast packets 174 * @after: sequence number of last packet to *not* retransmit 175 * @to: sequence number of last packet to retransmit 176 * 177 * Called with bclink_lock locked 178 */ 179 static void bclink_retransmit_pkt(struct tipc_net *tn, u32 after, u32 to) 180 { 181 struct sk_buff *skb; 182 struct tipc_link *bcl = tn->bcl; 183 184 skb_queue_walk(&bcl->transmq, skb) { 185 if (more(buf_seqno(skb), after)) { 186 tipc_link_retransmit(bcl, skb, mod(to - after)); 187 break; 188 } 189 } 190 } 191 192 /** 193 * tipc_bclink_wakeup_users - wake up pending users 194 * 195 * Called with no locks taken 196 */ 197 void tipc_bclink_wakeup_users(struct net *net) 198 { 199 struct tipc_net *tn = net_generic(net, tipc_net_id); 200 201 tipc_sk_rcv(net, &tn->bclink->link.wakeupq); 202 } 203 204 /** 205 * tipc_bclink_acknowledge - handle acknowledgement of broadcast packets 206 * @n_ptr: node that sent acknowledgement info 207 * @acked: broadcast sequence # that has been acknowledged 208 * 209 * Node is locked, bclink_lock unlocked. 210 */ 211 void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked) 212 { 213 struct sk_buff *skb, *tmp; 214 unsigned int released = 0; 215 struct net *net = n_ptr->net; 216 struct tipc_net *tn = net_generic(net, tipc_net_id); 217 218 if (unlikely(!n_ptr->bclink.recv_permitted)) 219 return; 220 221 tipc_bclink_lock(net); 222 223 /* Bail out if tx queue is empty (no clean up is required) */ 224 skb = skb_peek(&tn->bcl->transmq); 225 if (!skb) 226 goto exit; 227 228 /* Determine which messages need to be acknowledged */ 229 if (acked == INVALID_LINK_SEQ) { 230 /* 231 * Contact with specified node has been lost, so need to 232 * acknowledge sent messages only (if other nodes still exist) 233 * or both sent and unsent messages (otherwise) 234 */ 235 if (tn->bclink->bcast_nodes.count) 236 acked = tn->bcl->fsm_msg_cnt; 237 else 238 acked = tn->bcl->next_out_no; 239 } else { 240 /* 241 * Bail out if specified sequence number does not correspond 242 * to a message that has been sent and not yet acknowledged 243 */ 244 if (less(acked, buf_seqno(skb)) || 245 less(tn->bcl->fsm_msg_cnt, acked) || 246 less_eq(acked, n_ptr->bclink.acked)) 247 goto exit; 248 } 249 250 /* Skip over packets that node has previously acknowledged */ 251 skb_queue_walk(&tn->bcl->transmq, skb) { 252 if (more(buf_seqno(skb), n_ptr->bclink.acked)) 253 break; 254 } 255 256 /* Update packets that node is now acknowledging */ 257 skb_queue_walk_from_safe(&tn->bcl->transmq, skb, tmp) { 258 if (more(buf_seqno(skb), acked)) 259 break; 260 bcbuf_decr_acks(skb); 261 bclink_set_last_sent(net); 262 if (bcbuf_acks(skb) == 0) { 263 __skb_unlink(skb, &tn->bcl->transmq); 264 kfree_skb(skb); 265 released = 1; 266 } 267 } 268 n_ptr->bclink.acked = acked; 269 270 /* Try resolving broadcast link congestion, if necessary */ 271 if (unlikely(skb_peek(&tn->bcl->backlogq))) { 272 tipc_link_push_packets(tn->bcl); 273 bclink_set_last_sent(net); 274 } 275 if (unlikely(released && !skb_queue_empty(&tn->bcl->wakeupq))) 276 n_ptr->action_flags |= TIPC_WAKEUP_BCAST_USERS; 277 exit: 278 tipc_bclink_unlock(net); 279 } 280 281 /** 282 * tipc_bclink_update_link_state - update broadcast link state 283 * 284 * RCU and node lock set 285 */ 286 void tipc_bclink_update_link_state(struct tipc_node *n_ptr, 287 u32 last_sent) 288 { 289 struct sk_buff *buf; 290 struct net *net = n_ptr->net; 291 struct tipc_net *tn = net_generic(net, tipc_net_id); 292 293 /* Ignore "stale" link state info */ 294 if (less_eq(last_sent, n_ptr->bclink.last_in)) 295 return; 296 297 /* Update link synchronization state; quit if in sync */ 298 bclink_update_last_sent(n_ptr, last_sent); 299 300 if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in) 301 return; 302 303 /* Update out-of-sync state; quit if loss is still unconfirmed */ 304 if ((++n_ptr->bclink.oos_state) == 1) { 305 if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2)) 306 return; 307 n_ptr->bclink.oos_state++; 308 } 309 310 /* Don't NACK if one has been recently sent (or seen) */ 311 if (n_ptr->bclink.oos_state & 0x1) 312 return; 313 314 /* Send NACK */ 315 buf = tipc_buf_acquire(INT_H_SIZE); 316 if (buf) { 317 struct tipc_msg *msg = buf_msg(buf); 318 struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferdq); 319 u32 to = skb ? buf_seqno(skb) - 1 : n_ptr->bclink.last_sent; 320 321 tipc_msg_init(tn->own_addr, msg, BCAST_PROTOCOL, STATE_MSG, 322 INT_H_SIZE, n_ptr->addr); 323 msg_set_non_seq(msg, 1); 324 msg_set_mc_netid(msg, tn->net_id); 325 msg_set_bcast_ack(msg, n_ptr->bclink.last_in); 326 msg_set_bcgap_after(msg, n_ptr->bclink.last_in); 327 msg_set_bcgap_to(msg, to); 328 329 tipc_bclink_lock(net); 330 tipc_bearer_send(net, MAX_BEARERS, buf, NULL); 331 tn->bcl->stats.sent_nacks++; 332 tipc_bclink_unlock(net); 333 kfree_skb(buf); 334 335 n_ptr->bclink.oos_state++; 336 } 337 } 338 339 /** 340 * bclink_peek_nack - monitor retransmission requests sent by other nodes 341 * 342 * Delay any upcoming NACK by this node if another node has already 343 * requested the first message this node is going to ask for. 344 */ 345 static void bclink_peek_nack(struct net *net, struct tipc_msg *msg) 346 { 347 struct tipc_node *n_ptr = tipc_node_find(net, msg_destnode(msg)); 348 349 if (unlikely(!n_ptr)) 350 return; 351 352 tipc_node_lock(n_ptr); 353 354 if (n_ptr->bclink.recv_permitted && 355 (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) && 356 (n_ptr->bclink.last_in == msg_bcgap_after(msg))) 357 n_ptr->bclink.oos_state = 2; 358 359 tipc_node_unlock(n_ptr); 360 } 361 362 /* tipc_bclink_xmit - deliver buffer chain to all nodes in cluster 363 * and to identified node local sockets 364 * @net: the applicable net namespace 365 * @list: chain of buffers containing message 366 * Consumes the buffer chain, except when returning -ELINKCONG 367 * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE 368 */ 369 int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list) 370 { 371 struct tipc_net *tn = net_generic(net, tipc_net_id); 372 struct tipc_link *bcl = tn->bcl; 373 struct tipc_bclink *bclink = tn->bclink; 374 int rc = 0; 375 int bc = 0; 376 struct sk_buff *skb; 377 struct sk_buff_head arrvq; 378 struct sk_buff_head inputq; 379 380 /* Prepare clone of message for local node */ 381 skb = tipc_msg_reassemble(list); 382 if (unlikely(!skb)) { 383 __skb_queue_purge(list); 384 return -EHOSTUNREACH; 385 } 386 /* Broadcast to all nodes */ 387 if (likely(bclink)) { 388 tipc_bclink_lock(net); 389 if (likely(bclink->bcast_nodes.count)) { 390 rc = __tipc_link_xmit(net, bcl, list); 391 if (likely(!rc)) { 392 u32 len = skb_queue_len(&bcl->transmq); 393 394 bclink_set_last_sent(net); 395 bcl->stats.queue_sz_counts++; 396 bcl->stats.accu_queue_sz += len; 397 } 398 bc = 1; 399 } 400 tipc_bclink_unlock(net); 401 } 402 403 if (unlikely(!bc)) 404 __skb_queue_purge(list); 405 406 if (unlikely(rc)) { 407 kfree_skb(skb); 408 return rc; 409 } 410 /* Deliver message clone */ 411 __skb_queue_head_init(&arrvq); 412 skb_queue_head_init(&inputq); 413 __skb_queue_tail(&arrvq, skb); 414 tipc_sk_mcast_rcv(net, &arrvq, &inputq); 415 return rc; 416 } 417 418 /** 419 * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet 420 * 421 * Called with both sending node's lock and bclink_lock taken. 422 */ 423 static void bclink_accept_pkt(struct tipc_node *node, u32 seqno) 424 { 425 struct tipc_net *tn = net_generic(node->net, tipc_net_id); 426 427 bclink_update_last_sent(node, seqno); 428 node->bclink.last_in = seqno; 429 node->bclink.oos_state = 0; 430 tn->bcl->stats.recv_info++; 431 432 /* 433 * Unicast an ACK periodically, ensuring that 434 * all nodes in the cluster don't ACK at the same time 435 */ 436 if (((seqno - tn->own_addr) % TIPC_MIN_LINK_WIN) == 0) { 437 tipc_link_proto_xmit(node->active_links[node->addr & 1], 438 STATE_MSG, 0, 0, 0, 0, 0); 439 tn->bcl->stats.sent_acks++; 440 } 441 } 442 443 /** 444 * tipc_bclink_rcv - receive a broadcast packet, and deliver upwards 445 * 446 * RCU is locked, no other locks set 447 */ 448 void tipc_bclink_rcv(struct net *net, struct sk_buff *buf) 449 { 450 struct tipc_net *tn = net_generic(net, tipc_net_id); 451 struct tipc_link *bcl = tn->bcl; 452 struct tipc_msg *msg = buf_msg(buf); 453 struct tipc_node *node; 454 u32 next_in; 455 u32 seqno; 456 int deferred = 0; 457 int pos = 0; 458 struct sk_buff *iskb; 459 struct sk_buff_head *arrvq, *inputq; 460 461 /* Screen out unwanted broadcast messages */ 462 if (msg_mc_netid(msg) != tn->net_id) 463 goto exit; 464 465 node = tipc_node_find(net, msg_prevnode(msg)); 466 if (unlikely(!node)) 467 goto exit; 468 469 tipc_node_lock(node); 470 if (unlikely(!node->bclink.recv_permitted)) 471 goto unlock; 472 473 /* Handle broadcast protocol message */ 474 if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) { 475 if (msg_type(msg) != STATE_MSG) 476 goto unlock; 477 if (msg_destnode(msg) == tn->own_addr) { 478 tipc_bclink_acknowledge(node, msg_bcast_ack(msg)); 479 tipc_node_unlock(node); 480 tipc_bclink_lock(net); 481 bcl->stats.recv_nacks++; 482 tn->bclink->retransmit_to = node; 483 bclink_retransmit_pkt(tn, msg_bcgap_after(msg), 484 msg_bcgap_to(msg)); 485 tipc_bclink_unlock(net); 486 } else { 487 tipc_node_unlock(node); 488 bclink_peek_nack(net, msg); 489 } 490 goto exit; 491 } 492 493 /* Handle in-sequence broadcast message */ 494 seqno = msg_seqno(msg); 495 next_in = mod(node->bclink.last_in + 1); 496 arrvq = &tn->bclink->arrvq; 497 inputq = &tn->bclink->inputq; 498 499 if (likely(seqno == next_in)) { 500 receive: 501 /* Deliver message to destination */ 502 if (likely(msg_isdata(msg))) { 503 tipc_bclink_lock(net); 504 bclink_accept_pkt(node, seqno); 505 spin_lock_bh(&inputq->lock); 506 __skb_queue_tail(arrvq, buf); 507 spin_unlock_bh(&inputq->lock); 508 node->action_flags |= TIPC_BCAST_MSG_EVT; 509 tipc_bclink_unlock(net); 510 tipc_node_unlock(node); 511 } else if (msg_user(msg) == MSG_BUNDLER) { 512 tipc_bclink_lock(net); 513 bclink_accept_pkt(node, seqno); 514 bcl->stats.recv_bundles++; 515 bcl->stats.recv_bundled += msg_msgcnt(msg); 516 pos = 0; 517 while (tipc_msg_extract(buf, &iskb, &pos)) { 518 spin_lock_bh(&inputq->lock); 519 __skb_queue_tail(arrvq, iskb); 520 spin_unlock_bh(&inputq->lock); 521 } 522 node->action_flags |= TIPC_BCAST_MSG_EVT; 523 tipc_bclink_unlock(net); 524 tipc_node_unlock(node); 525 } else if (msg_user(msg) == MSG_FRAGMENTER) { 526 tipc_buf_append(&node->bclink.reasm_buf, &buf); 527 if (unlikely(!buf && !node->bclink.reasm_buf)) 528 goto unlock; 529 tipc_bclink_lock(net); 530 bclink_accept_pkt(node, seqno); 531 bcl->stats.recv_fragments++; 532 if (buf) { 533 bcl->stats.recv_fragmented++; 534 msg = buf_msg(buf); 535 tipc_bclink_unlock(net); 536 goto receive; 537 } 538 tipc_bclink_unlock(net); 539 tipc_node_unlock(node); 540 } else { 541 tipc_bclink_lock(net); 542 bclink_accept_pkt(node, seqno); 543 tipc_bclink_unlock(net); 544 tipc_node_unlock(node); 545 kfree_skb(buf); 546 } 547 buf = NULL; 548 549 /* Determine new synchronization state */ 550 tipc_node_lock(node); 551 if (unlikely(!tipc_node_is_up(node))) 552 goto unlock; 553 554 if (node->bclink.last_in == node->bclink.last_sent) 555 goto unlock; 556 557 if (skb_queue_empty(&node->bclink.deferdq)) { 558 node->bclink.oos_state = 1; 559 goto unlock; 560 } 561 562 msg = buf_msg(skb_peek(&node->bclink.deferdq)); 563 seqno = msg_seqno(msg); 564 next_in = mod(next_in + 1); 565 if (seqno != next_in) 566 goto unlock; 567 568 /* Take in-sequence message from deferred queue & deliver it */ 569 buf = __skb_dequeue(&node->bclink.deferdq); 570 goto receive; 571 } 572 573 /* Handle out-of-sequence broadcast message */ 574 if (less(next_in, seqno)) { 575 deferred = tipc_link_defer_pkt(&node->bclink.deferdq, 576 buf); 577 bclink_update_last_sent(node, seqno); 578 buf = NULL; 579 } 580 581 tipc_bclink_lock(net); 582 583 if (deferred) 584 bcl->stats.deferred_recv++; 585 else 586 bcl->stats.duplicates++; 587 588 tipc_bclink_unlock(net); 589 590 unlock: 591 tipc_node_unlock(node); 592 exit: 593 kfree_skb(buf); 594 } 595 596 u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr) 597 { 598 return (n_ptr->bclink.recv_permitted && 599 (tipc_bclink_get_last_sent(n_ptr->net) != n_ptr->bclink.acked)); 600 } 601 602 603 /** 604 * tipc_bcbearer_send - send a packet through the broadcast pseudo-bearer 605 * 606 * Send packet over as many bearers as necessary to reach all nodes 607 * that have joined the broadcast link. 608 * 609 * Returns 0 (packet sent successfully) under all circumstances, 610 * since the broadcast link's pseudo-bearer never blocks 611 */ 612 static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf, 613 struct tipc_bearer *unused1, 614 struct tipc_media_addr *unused2) 615 { 616 int bp_index; 617 struct tipc_msg *msg = buf_msg(buf); 618 struct tipc_net *tn = net_generic(net, tipc_net_id); 619 struct tipc_bcbearer *bcbearer = tn->bcbearer; 620 struct tipc_bclink *bclink = tn->bclink; 621 622 /* Prepare broadcast link message for reliable transmission, 623 * if first time trying to send it; 624 * preparation is skipped for broadcast link protocol messages 625 * since they are sent in an unreliable manner and don't need it 626 */ 627 if (likely(!msg_non_seq(buf_msg(buf)))) { 628 bcbuf_set_acks(buf, bclink->bcast_nodes.count); 629 msg_set_non_seq(msg, 1); 630 msg_set_mc_netid(msg, tn->net_id); 631 tn->bcl->stats.sent_info++; 632 if (WARN_ON(!bclink->bcast_nodes.count)) { 633 dump_stack(); 634 return 0; 635 } 636 } 637 638 /* Send buffer over bearers until all targets reached */ 639 bcbearer->remains = bclink->bcast_nodes; 640 641 for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) { 642 struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary; 643 struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary; 644 struct tipc_bearer *bp[2] = {p, s}; 645 struct tipc_bearer *b = bp[msg_link_selector(msg)]; 646 struct sk_buff *tbuf; 647 648 if (!p) 649 break; /* No more bearers to try */ 650 if (!b) 651 b = p; 652 tipc_nmap_diff(&bcbearer->remains, &b->nodes, 653 &bcbearer->remains_new); 654 if (bcbearer->remains_new.count == bcbearer->remains.count) 655 continue; /* Nothing added by bearer pair */ 656 657 if (bp_index == 0) { 658 /* Use original buffer for first bearer */ 659 tipc_bearer_send(net, b->identity, buf, &b->bcast_addr); 660 } else { 661 /* Avoid concurrent buffer access */ 662 tbuf = pskb_copy_for_clone(buf, GFP_ATOMIC); 663 if (!tbuf) 664 break; 665 tipc_bearer_send(net, b->identity, tbuf, 666 &b->bcast_addr); 667 kfree_skb(tbuf); /* Bearer keeps a clone */ 668 } 669 if (bcbearer->remains_new.count == 0) 670 break; /* All targets reached */ 671 672 bcbearer->remains = bcbearer->remains_new; 673 } 674 675 return 0; 676 } 677 678 /** 679 * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer 680 */ 681 void tipc_bcbearer_sort(struct net *net, struct tipc_node_map *nm_ptr, 682 u32 node, bool action) 683 { 684 struct tipc_net *tn = net_generic(net, tipc_net_id); 685 struct tipc_bcbearer *bcbearer = tn->bcbearer; 686 struct tipc_bcbearer_pair *bp_temp = bcbearer->bpairs_temp; 687 struct tipc_bcbearer_pair *bp_curr; 688 struct tipc_bearer *b; 689 int b_index; 690 int pri; 691 692 tipc_bclink_lock(net); 693 694 if (action) 695 tipc_nmap_add(nm_ptr, node); 696 else 697 tipc_nmap_remove(nm_ptr, node); 698 699 /* Group bearers by priority (can assume max of two per priority) */ 700 memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp)); 701 702 rcu_read_lock(); 703 for (b_index = 0; b_index < MAX_BEARERS; b_index++) { 704 b = rcu_dereference_rtnl(tn->bearer_list[b_index]); 705 if (!b || !b->nodes.count) 706 continue; 707 708 if (!bp_temp[b->priority].primary) 709 bp_temp[b->priority].primary = b; 710 else 711 bp_temp[b->priority].secondary = b; 712 } 713 rcu_read_unlock(); 714 715 /* Create array of bearer pairs for broadcasting */ 716 bp_curr = bcbearer->bpairs; 717 memset(bcbearer->bpairs, 0, sizeof(bcbearer->bpairs)); 718 719 for (pri = TIPC_MAX_LINK_PRI; pri >= 0; pri--) { 720 721 if (!bp_temp[pri].primary) 722 continue; 723 724 bp_curr->primary = bp_temp[pri].primary; 725 726 if (bp_temp[pri].secondary) { 727 if (tipc_nmap_equal(&bp_temp[pri].primary->nodes, 728 &bp_temp[pri].secondary->nodes)) { 729 bp_curr->secondary = bp_temp[pri].secondary; 730 } else { 731 bp_curr++; 732 bp_curr->primary = bp_temp[pri].secondary; 733 } 734 } 735 736 bp_curr++; 737 } 738 739 tipc_bclink_unlock(net); 740 } 741 742 static int __tipc_nl_add_bc_link_stat(struct sk_buff *skb, 743 struct tipc_stats *stats) 744 { 745 int i; 746 struct nlattr *nest; 747 748 struct nla_map { 749 __u32 key; 750 __u32 val; 751 }; 752 753 struct nla_map map[] = { 754 {TIPC_NLA_STATS_RX_INFO, stats->recv_info}, 755 {TIPC_NLA_STATS_RX_FRAGMENTS, stats->recv_fragments}, 756 {TIPC_NLA_STATS_RX_FRAGMENTED, stats->recv_fragmented}, 757 {TIPC_NLA_STATS_RX_BUNDLES, stats->recv_bundles}, 758 {TIPC_NLA_STATS_RX_BUNDLED, stats->recv_bundled}, 759 {TIPC_NLA_STATS_TX_INFO, stats->sent_info}, 760 {TIPC_NLA_STATS_TX_FRAGMENTS, stats->sent_fragments}, 761 {TIPC_NLA_STATS_TX_FRAGMENTED, stats->sent_fragmented}, 762 {TIPC_NLA_STATS_TX_BUNDLES, stats->sent_bundles}, 763 {TIPC_NLA_STATS_TX_BUNDLED, stats->sent_bundled}, 764 {TIPC_NLA_STATS_RX_NACKS, stats->recv_nacks}, 765 {TIPC_NLA_STATS_RX_DEFERRED, stats->deferred_recv}, 766 {TIPC_NLA_STATS_TX_NACKS, stats->sent_nacks}, 767 {TIPC_NLA_STATS_TX_ACKS, stats->sent_acks}, 768 {TIPC_NLA_STATS_RETRANSMITTED, stats->retransmitted}, 769 {TIPC_NLA_STATS_DUPLICATES, stats->duplicates}, 770 {TIPC_NLA_STATS_LINK_CONGS, stats->link_congs}, 771 {TIPC_NLA_STATS_MAX_QUEUE, stats->max_queue_sz}, 772 {TIPC_NLA_STATS_AVG_QUEUE, stats->queue_sz_counts ? 773 (stats->accu_queue_sz / stats->queue_sz_counts) : 0} 774 }; 775 776 nest = nla_nest_start(skb, TIPC_NLA_LINK_STATS); 777 if (!nest) 778 return -EMSGSIZE; 779 780 for (i = 0; i < ARRAY_SIZE(map); i++) 781 if (nla_put_u32(skb, map[i].key, map[i].val)) 782 goto msg_full; 783 784 nla_nest_end(skb, nest); 785 786 return 0; 787 msg_full: 788 nla_nest_cancel(skb, nest); 789 790 return -EMSGSIZE; 791 } 792 793 int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg) 794 { 795 int err; 796 void *hdr; 797 struct nlattr *attrs; 798 struct nlattr *prop; 799 struct tipc_net *tn = net_generic(net, tipc_net_id); 800 struct tipc_link *bcl = tn->bcl; 801 802 if (!bcl) 803 return 0; 804 805 tipc_bclink_lock(net); 806 807 hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_family, 808 NLM_F_MULTI, TIPC_NL_LINK_GET); 809 if (!hdr) 810 return -EMSGSIZE; 811 812 attrs = nla_nest_start(msg->skb, TIPC_NLA_LINK); 813 if (!attrs) 814 goto msg_full; 815 816 /* The broadcast link is always up */ 817 if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP)) 818 goto attr_msg_full; 819 820 if (nla_put_flag(msg->skb, TIPC_NLA_LINK_BROADCAST)) 821 goto attr_msg_full; 822 if (nla_put_string(msg->skb, TIPC_NLA_LINK_NAME, bcl->name)) 823 goto attr_msg_full; 824 if (nla_put_u32(msg->skb, TIPC_NLA_LINK_RX, bcl->next_in_no)) 825 goto attr_msg_full; 826 if (nla_put_u32(msg->skb, TIPC_NLA_LINK_TX, bcl->next_out_no)) 827 goto attr_msg_full; 828 829 prop = nla_nest_start(msg->skb, TIPC_NLA_LINK_PROP); 830 if (!prop) 831 goto attr_msg_full; 832 if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bcl->queue_limit[0])) 833 goto prop_msg_full; 834 nla_nest_end(msg->skb, prop); 835 836 err = __tipc_nl_add_bc_link_stat(msg->skb, &bcl->stats); 837 if (err) 838 goto attr_msg_full; 839 840 tipc_bclink_unlock(net); 841 nla_nest_end(msg->skb, attrs); 842 genlmsg_end(msg->skb, hdr); 843 844 return 0; 845 846 prop_msg_full: 847 nla_nest_cancel(msg->skb, prop); 848 attr_msg_full: 849 nla_nest_cancel(msg->skb, attrs); 850 msg_full: 851 tipc_bclink_unlock(net); 852 genlmsg_cancel(msg->skb, hdr); 853 854 return -EMSGSIZE; 855 } 856 857 int tipc_bclink_reset_stats(struct net *net) 858 { 859 struct tipc_net *tn = net_generic(net, tipc_net_id); 860 struct tipc_link *bcl = tn->bcl; 861 862 if (!bcl) 863 return -ENOPROTOOPT; 864 865 tipc_bclink_lock(net); 866 memset(&bcl->stats, 0, sizeof(bcl->stats)); 867 tipc_bclink_unlock(net); 868 return 0; 869 } 870 871 int tipc_bclink_set_queue_limits(struct net *net, u32 limit) 872 { 873 struct tipc_net *tn = net_generic(net, tipc_net_id); 874 struct tipc_link *bcl = tn->bcl; 875 876 if (!bcl) 877 return -ENOPROTOOPT; 878 if ((limit < TIPC_MIN_LINK_WIN) || (limit > TIPC_MAX_LINK_WIN)) 879 return -EINVAL; 880 881 tipc_bclink_lock(net); 882 tipc_link_set_queue_limits(bcl, limit); 883 tipc_bclink_unlock(net); 884 return 0; 885 } 886 887 int tipc_bclink_init(struct net *net) 888 { 889 struct tipc_net *tn = net_generic(net, tipc_net_id); 890 struct tipc_bcbearer *bcbearer; 891 struct tipc_bclink *bclink; 892 struct tipc_link *bcl; 893 894 bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC); 895 if (!bcbearer) 896 return -ENOMEM; 897 898 bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC); 899 if (!bclink) { 900 kfree(bcbearer); 901 return -ENOMEM; 902 } 903 904 bcl = &bclink->link; 905 bcbearer->bearer.media = &bcbearer->media; 906 bcbearer->media.send_msg = tipc_bcbearer_send; 907 sprintf(bcbearer->media.name, "tipc-broadcast"); 908 909 spin_lock_init(&bclink->lock); 910 __skb_queue_head_init(&bcl->transmq); 911 __skb_queue_head_init(&bcl->backlogq); 912 __skb_queue_head_init(&bcl->deferdq); 913 skb_queue_head_init(&bcl->wakeupq); 914 bcl->next_out_no = 1; 915 spin_lock_init(&bclink->node.lock); 916 __skb_queue_head_init(&bclink->arrvq); 917 skb_queue_head_init(&bclink->inputq); 918 bcl->owner = &bclink->node; 919 bcl->owner->net = net; 920 bcl->max_pkt = MAX_PKT_DEFAULT_MCAST; 921 tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT); 922 bcl->bearer_id = MAX_BEARERS; 923 rcu_assign_pointer(tn->bearer_list[MAX_BEARERS], &bcbearer->bearer); 924 bcl->state = WORKING_WORKING; 925 bcl->pmsg = (struct tipc_msg *)&bcl->proto_msg; 926 msg_set_prevnode(bcl->pmsg, tn->own_addr); 927 strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME); 928 tn->bcbearer = bcbearer; 929 tn->bclink = bclink; 930 tn->bcl = bcl; 931 return 0; 932 } 933 934 void tipc_bclink_stop(struct net *net) 935 { 936 struct tipc_net *tn = net_generic(net, tipc_net_id); 937 938 tipc_bclink_lock(net); 939 tipc_link_purge_queues(tn->bcl); 940 tipc_bclink_unlock(net); 941 942 RCU_INIT_POINTER(tn->bearer_list[BCBEARER], NULL); 943 synchronize_net(); 944 kfree(tn->bcbearer); 945 kfree(tn->bclink); 946 } 947 948 /** 949 * tipc_nmap_add - add a node to a node map 950 */ 951 static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node) 952 { 953 int n = tipc_node(node); 954 int w = n / WSIZE; 955 u32 mask = (1 << (n % WSIZE)); 956 957 if ((nm_ptr->map[w] & mask) == 0) { 958 nm_ptr->count++; 959 nm_ptr->map[w] |= mask; 960 } 961 } 962 963 /** 964 * tipc_nmap_remove - remove a node from a node map 965 */ 966 static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node) 967 { 968 int n = tipc_node(node); 969 int w = n / WSIZE; 970 u32 mask = (1 << (n % WSIZE)); 971 972 if ((nm_ptr->map[w] & mask) != 0) { 973 nm_ptr->map[w] &= ~mask; 974 nm_ptr->count--; 975 } 976 } 977 978 /** 979 * tipc_nmap_diff - find differences between node maps 980 * @nm_a: input node map A 981 * @nm_b: input node map B 982 * @nm_diff: output node map A-B (i.e. nodes of A that are not in B) 983 */ 984 static void tipc_nmap_diff(struct tipc_node_map *nm_a, 985 struct tipc_node_map *nm_b, 986 struct tipc_node_map *nm_diff) 987 { 988 int stop = ARRAY_SIZE(nm_a->map); 989 int w; 990 int b; 991 u32 map; 992 993 memset(nm_diff, 0, sizeof(*nm_diff)); 994 for (w = 0; w < stop; w++) { 995 map = nm_a->map[w] ^ (nm_a->map[w] & nm_b->map[w]); 996 nm_diff->map[w] = map; 997 if (map != 0) { 998 for (b = 0 ; b < WSIZE; b++) { 999 if (map & (1 << b)) 1000 nm_diff->count++; 1001 } 1002 } 1003 } 1004 } 1005