/*
 * net/tipc/bcast.c: TIPC broadcast code
 *
 * Copyright (c) 2004-2006, 2014-2017, Ericsson AB
 * Copyright (c) 2004, Intel Corporation.
 * Copyright (c) 2005, 2010-2011, Wind River Systems
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the names of the copyright holders nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
 *
 * Alternatively, this software may be distributed under the terms of the
 * GNU General Public License ("GPL") version 2 as published by the Free
 * Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */
#include <linux/tipc_config.h>
#include "socket.h"
#include "msg.h"
#include "bcast.h"
#include "link.h"
#include "name_table.h"

#define BCLINK_WIN_DEFAULT	50	/* bcast link window size (default) */
#define BCLINK_WIN_MIN		32	/* bcast minimum link window size */

const char tipc_bclink_name[] = "broadcast-link";
unsigned long sysctl_tipc_bc_retruni __read_mostly;

/**
 * struct tipc_bc_base - base structure for keeping broadcast send state
 * @link: broadcast send link structure
 * @inputq: data input queue; will only carry SOCK_WAKEUP messages
 * @dests: array keeping number of reachable destinations per bearer
 * @primary_bearer: a bearer having links to all broadcast destinations, if any
 * @bcast_support: indicates if primary bearer, if any, supports broadcast
 * @force_bcast: forces broadcast for multicast traffic
 * @rcast_support: indicates if all peer nodes support replicast
 * @force_rcast: forces replicast for multicast traffic
 * @rc_ratio: dest count as percentage of cluster size where send method changes
 * @bc_threshold: calculated from rc_ratio; if dests > threshold use broadcast
 */
struct tipc_bc_base {
	struct tipc_link *link;
	struct sk_buff_head inputq;
	int dests[MAX_BEARERS];
	int primary_bearer;
	bool bcast_support;
	bool force_bcast;
	bool rcast_support;
	bool force_rcast;
	int rc_ratio;
	int bc_threshold;
};

static struct tipc_bc_base *tipc_bc_base(struct net *net)
{
	return tipc_net(net)->bcbase;
}

/* tipc_bcast_get_mtu(): get the MTU currently used by the broadcast link
 * Note: the MTU is decremented to give room for a tunnel header, in
 * case the message needs to be sent as replicast
 */
int tipc_bcast_get_mtu(struct net *net)
{
	return tipc_link_mss(tipc_bc_sndlink(net));
}

void tipc_bcast_toggle_rcast(struct net *net, bool supp)
{
	tipc_bc_base(net)->rcast_support = supp;
}

static void tipc_bcbase_calc_bc_threshold(struct net *net)
{
	struct tipc_bc_base *bb = tipc_bc_base(net);
	int cluster_size = tipc_link_bc_peers(tipc_bc_sndlink(net));

	bb->bc_threshold = 1 + (cluster_size * bb->rc_ratio / 100);
}

/* tipc_bcbase_select_primary(): find a bearer with links to all destinations,
 * if any, and make it primary bearer
 */
static void tipc_bcbase_select_primary(struct net *net)
{
	struct tipc_bc_base *bb = tipc_bc_base(net);
	int all_dests = tipc_link_bc_peers(bb->link);
	int i, mtu, prim;

	bb->primary_bearer = INVALID_BEARER_ID;
	bb->bcast_support = true;

	if (!all_dests)
		return;

	for (i = 0; i < MAX_BEARERS; i++) {
		if (!bb->dests[i])
			continue;

		mtu = tipc_bearer_mtu(net, i);
		if (mtu < tipc_link_mtu(bb->link))
			tipc_link_set_mtu(bb->link, mtu);
		bb->bcast_support &= tipc_bearer_bcast_support(net, i);
		if (bb->dests[i] < all_dests)
			continue;

		bb->primary_bearer = i;

		/* Reduce risk that all nodes select same primary */
		if ((i ^ tipc_own_addr(net)) & 1)
			break;
	}
	prim = bb->primary_bearer;
	if (prim != INVALID_BEARER_ID)
		bb->bcast_support = tipc_bearer_bcast_support(net, prim);
}
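/* tipc_bcast_inc_bearer_dst_cnt(), tipc_bcast_dec_bearer_dst_cnt():
 * update the number of broadcast destinations reachable via a bearer,
 * then re-evaluate which bearer, if any, should act as primary bearer
 */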
void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id)
{
	struct tipc_bc_base *bb = tipc_bc_base(net);

	tipc_bcast_lock(net);
	bb->dests[bearer_id]++;
	tipc_bcbase_select_primary(net);
	tipc_bcast_unlock(net);
}

void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id)
{
	struct tipc_bc_base *bb = tipc_bc_base(net);

	tipc_bcast_lock(net);
	bb->dests[bearer_id]--;
	tipc_bcbase_select_primary(net);
	tipc_bcast_unlock(net);
}

/* tipc_bcbase_xmit - broadcast a packet queue across one or more bearers
 *
 * Note that the number of reachable destinations, as indicated in the dests[]
 * array, may temporarily differ from the number of destinations indicated
 * in each sent buffer. We can sustain this. Excess destination nodes will
 * drop and never acknowledge the unexpected packets, and missing destinations
 * will either require retransmission (if they are just about to be added to
 * the bearer), or be removed from the buffer's 'ackers' counter (if they
 * just went down)
 */
static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq)
{
	int bearer_id;
	struct tipc_bc_base *bb = tipc_bc_base(net);
	struct sk_buff *skb, *_skb;
	struct sk_buff_head _xmitq;

	if (skb_queue_empty(xmitq))
		return;

	/* The typical case: at least one bearer has links to all nodes */
	bearer_id = bb->primary_bearer;
	if (bearer_id >= 0) {
		tipc_bearer_bc_xmit(net, bearer_id, xmitq);
		return;
	}

	/* We have to transmit across all bearers */
	__skb_queue_head_init(&_xmitq);
	for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
		if (!bb->dests[bearer_id])
			continue;

		skb_queue_walk(xmitq, skb) {
			_skb = pskb_copy_for_clone(skb, GFP_ATOMIC);
			if (!_skb)
				break;
			__skb_queue_tail(&_xmitq, _skb);
		}
		tipc_bearer_bc_xmit(net, bearer_id, &_xmitq);
	}
	__skb_queue_purge(xmitq);
	__skb_queue_purge(&_xmitq);
}

/* tipc_bcast_select_xmit_method - determine transmit method for a multicast
 * message: broadcast or replicast, based on bearer capabilities, peer
 * capabilities, configuration and destination count
 */
static void tipc_bcast_select_xmit_method(struct net *net, int dests,
					  struct tipc_mc_method *method)
{
	struct tipc_bc_base *bb = tipc_bc_base(net);
	unsigned long exp = method->expires;

	/* Broadcast supported by used bearer/bearers? */
	if (!bb->bcast_support) {
		method->rcast = true;
		return;
	}
	/* Any destinations which don't support replicast ? */
	if (!bb->rcast_support) {
		method->rcast = false;
		return;
	}
	/* Can current method be changed ? */
	method->expires = jiffies + TIPC_METHOD_EXPIRE;
	if (method->mandatory)
		return;

	if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL) &&
	    time_before(jiffies, exp))
		return;

	/* Configured to force 'broadcast' method */
	if (bb->force_bcast) {
		method->rcast = false;
		return;
	}
	/* Configured to force 'replicast' method */
	if (bb->force_rcast) {
		method->rcast = true;
		return;
	}
	/* 'Autoselect' or default method: determine method to use now */
	method->rcast = dests <= bb->bc_threshold;
}

/* tipc_bcast_xmit - broadcast the buffer chain to all external nodes
 * @net: the applicable net namespace
 * @pkts: chain of buffers containing message
 * @cong_link_cnt: set to 1 if broadcast link is congested, otherwise 0
 * Consumes the buffer chain.
 * Returns 0 if success, otherwise errno: -EHOSTUNREACH,-EMSGSIZE
 */
static int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
			   u16 *cong_link_cnt)
{
	struct tipc_link *l = tipc_bc_sndlink(net);
	struct sk_buff_head xmitq;
	int rc = 0;

	__skb_queue_head_init(&xmitq);
	tipc_bcast_lock(net);
	if (tipc_link_bc_peers(l))
		rc = tipc_link_xmit(l, pkts, &xmitq);
	tipc_bcast_unlock(net);
	tipc_bcbase_xmit(net, &xmitq);
	__skb_queue_purge(pkts);
	if (rc == -ELINKCONG) {
		*cong_link_cnt = 1;
		rc = 0;
	}
	return rc;
}

/* tipc_rcast_xmit - replicate and send a message to given destination nodes
 * @net: the applicable net namespace
 * @pkts: chain of buffers containing message
 * @dests: list of destination nodes
 * @cong_link_cnt: returns number of congested links
 * Returns 0 if success, otherwise errno
 */
static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts,
			   struct tipc_nlist *dests, u16 *cong_link_cnt)
{
	struct tipc_dest *dst, *tmp;
	struct sk_buff_head _pkts;
	u32 dnode, selector;

	selector = msg_link_selector(buf_msg(skb_peek(pkts)));
	__skb_queue_head_init(&_pkts);

	list_for_each_entry_safe(dst, tmp, &dests->list, list) {
		dnode = dst->node;
		if (!tipc_msg_pskb_copy(dnode, pkts, &_pkts))
			return -ENOMEM;

		/* Any other return value than -ELINKCONG is ignored */
		if (tipc_node_xmit(net, &_pkts, dnode, selector) == -ELINKCONG)
			(*cong_link_cnt)++;
	}
	return 0;
}

/* tipc_mcast_send_sync - deliver a dummy message with SYN bit
 * @net: the applicable net namespace
 * @skb: socket buffer to copy
 * @method: send method to be used
 * @dests: destination nodes for message.
 * Returns 0 if success, otherwise errno
 */
static int tipc_mcast_send_sync(struct net *net, struct sk_buff *skb,
				struct tipc_mc_method *method,
				struct tipc_nlist *dests)
{
	struct tipc_msg *hdr, *_hdr;
	struct sk_buff_head tmpq;
	struct sk_buff *_skb;
	u16 cong_link_cnt;
	int rc = 0;

	/* Does the cluster support the new capabilities ? */
	if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL))
		return 0;

	hdr = buf_msg(skb);
	if (msg_user(hdr) == MSG_FRAGMENTER)
		hdr = msg_inner_hdr(hdr);
	if (msg_type(hdr) != TIPC_MCAST_MSG)
		return 0;

	/* Allocate dummy message */
	_skb = tipc_buf_acquire(MCAST_H_SIZE, GFP_KERNEL);
	if (!_skb)
		return -ENOMEM;

	/* Preparing for 'synching' header */
	msg_set_syn(hdr, 1);

	/* Copy skb's header into a dummy header */
	skb_copy_to_linear_data(_skb, hdr, MCAST_H_SIZE);
	skb_orphan(_skb);

	/* Reverse method for dummy message */
	_hdr = buf_msg(_skb);
	msg_set_size(_hdr, MCAST_H_SIZE);
	msg_set_is_rcast(_hdr, !msg_is_rcast(hdr));
	msg_set_errcode(_hdr, TIPC_ERR_NO_PORT);

	__skb_queue_head_init(&tmpq);
	__skb_queue_tail(&tmpq, _skb);
	if (method->rcast)
		rc = tipc_bcast_xmit(net, &tmpq, &cong_link_cnt);
	else
		rc = tipc_rcast_xmit(net, &tmpq, dests, &cong_link_cnt);

	/* This queue should normally be empty by now */
	__skb_queue_purge(&tmpq);

	return rc;
}

/* tipc_mcast_xmit - deliver message to indicated destination nodes
 *                   and to identified node local sockets
 * @net: the applicable net namespace
 * @pkts: chain of buffers containing message
 * @method: send method to be used
 * @dests: destination nodes for message.
 * @cong_link_cnt: returns number of encountered congested destination links
 * Consumes buffer chain.
 * Returns 0 if success, otherwise errno
 */
int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
		    struct tipc_mc_method *method, struct tipc_nlist *dests,
		    u16 *cong_link_cnt)
{
	struct sk_buff_head inputq, localq;
	bool rcast = method->rcast;
	struct tipc_msg *hdr;
	struct sk_buff *skb;
	int rc = 0;

	skb_queue_head_init(&inputq);
	__skb_queue_head_init(&localq);

	/* Clone packets before they are consumed by next call */
	if (dests->local && !tipc_msg_reassemble(pkts, &localq)) {
		rc = -ENOMEM;
		goto exit;
	}
	/* Send according to determined transmit method */
	if (dests->remote) {
		tipc_bcast_select_xmit_method(net, dests->remote, method);

		skb = skb_peek(pkts);
		hdr = buf_msg(skb);
		if (msg_user(hdr) == MSG_FRAGMENTER)
			hdr = msg_inner_hdr(hdr);
		msg_set_is_rcast(hdr, method->rcast);

		/* Switch method ? */
		if (rcast != method->rcast) {
			rc = tipc_mcast_send_sync(net, skb, method, dests);
			if (unlikely(rc)) {
				pr_err("Unable to send SYN: method %d, rc %d\n",
				       rcast, rc);
				goto exit;
			}
		}

		if (method->rcast)
			rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt);
		else
			rc = tipc_bcast_xmit(net, pkts, cong_link_cnt);
	}

	if (dests->local) {
		tipc_loopback_trace(net, &localq);
		tipc_sk_mcast_rcv(net, &localq, &inputq);
	}
exit:
	/* This queue should normally be empty by now */
	__skb_queue_purge(pkts);
	return rc;
}

/* tipc_bcast_rcv - receive a broadcast packet, and deliver to rcv link
 *
 * RCU is locked, no other locks set
 */
int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb)
{
	struct tipc_msg *hdr = buf_msg(skb);
	struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
	struct sk_buff_head xmitq;
	int rc;

	__skb_queue_head_init(&xmitq);

	if (msg_mc_netid(hdr) != tipc_netid(net) || !tipc_link_is_up(l)) {
		kfree_skb(skb);
		return 0;
	}

	tipc_bcast_lock(net);
	if (msg_user(hdr) == BCAST_PROTOCOL)
		rc = tipc_link_bc_nack_rcv(l, skb, &xmitq);
	else
		rc = tipc_link_rcv(l, skb, NULL);
	tipc_bcast_unlock(net);

	tipc_bcbase_xmit(net, &xmitq);

	/* Any socket wakeup messages ? */
	if (!skb_queue_empty(inputq))
		tipc_sk_rcv(net, inputq);

	return rc;
}

/* tipc_bcast_ack_rcv - receive and handle a broadcast acknowledge
 *
 * RCU is locked, no other locks set
 */
void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
			struct tipc_msg *hdr)
{
	struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
	u16 acked = msg_bcast_ack(hdr);
	struct sk_buff_head xmitq;

	/* Ignore bc acks sent by peer before bcast synch point was received */
	if (msg_bc_ack_invalid(hdr))
		return;

	__skb_queue_head_init(&xmitq);

	tipc_bcast_lock(net);
	tipc_link_bc_ack_rcv(l, acked, 0, NULL, &xmitq, NULL);
	tipc_bcast_unlock(net);

	tipc_bcbase_xmit(net, &xmitq);

	/* Any socket wakeup messages ? */
	if (!skb_queue_empty(inputq))
		tipc_sk_rcv(net, inputq);
}

/* tipc_bcast_sync_rcv - check and update rcv link with peer's send state
 *
 * RCU is locked, no other locks set
 */
int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
			struct tipc_msg *hdr,
			struct sk_buff_head *retrq)
{
	struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
	struct tipc_gap_ack_blks *ga;
	struct sk_buff_head xmitq;
	int rc = 0;

	__skb_queue_head_init(&xmitq);

	tipc_bcast_lock(net);
	if (msg_type(hdr) != STATE_MSG) {
		tipc_link_bc_init_rcv(l, hdr);
	} else if (!msg_bc_ack_invalid(hdr)) {
		tipc_get_gap_ack_blks(&ga, l, hdr, false);
		if (!sysctl_tipc_bc_retruni)
			retrq = &xmitq;
		rc = tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr),
					  msg_bc_gap(hdr), ga, &xmitq,
					  retrq);
		rc |= tipc_link_bc_sync_rcv(l, hdr, &xmitq);
	}
	tipc_bcast_unlock(net);

	tipc_bcbase_xmit(net, &xmitq);

	/* Any socket wakeup messages ? */
	if (!skb_queue_empty(inputq))
		tipc_sk_rcv(net, inputq);
	return rc;
}

/* tipc_bcast_add_peer - add a peer node to broadcast link and bearer
 *
 * RCU is locked, node lock is set
 */
void tipc_bcast_add_peer(struct net *net, struct tipc_link *uc_l,
			 struct sk_buff_head *xmitq)
{
	struct tipc_link *snd_l = tipc_bc_sndlink(net);

	tipc_bcast_lock(net);
	tipc_link_add_bc_peer(snd_l, uc_l, xmitq);
	tipc_bcbase_select_primary(net);
	tipc_bcbase_calc_bc_threshold(net);
	tipc_bcast_unlock(net);
}

/* tipc_bcast_remove_peer - remove a peer node from broadcast link and bearer
 *
 * RCU is locked, node lock is set
 */
void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_l)
{
	struct tipc_link *snd_l = tipc_bc_sndlink(net);
	struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
	struct sk_buff_head xmitq;

	__skb_queue_head_init(&xmitq);

	tipc_bcast_lock(net);
	tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq);
	tipc_bcbase_select_primary(net);
	tipc_bcbase_calc_bc_threshold(net);
	tipc_bcast_unlock(net);

	tipc_bcbase_xmit(net, &xmitq);

	/* Any socket wakeup messages ? */
	if (!skb_queue_empty(inputq))
		tipc_sk_rcv(net, inputq);
}

int tipc_bclink_reset_stats(struct net *net, struct tipc_link *l)
{
	if (!l)
		return -ENOPROTOOPT;

	tipc_bcast_lock(net);
	tipc_link_reset_stats(l);
	tipc_bcast_unlock(net);
	return 0;
}

static int tipc_bc_link_set_queue_limits(struct net *net, u32 max_win)
{
	struct tipc_link *l = tipc_bc_sndlink(net);

	if (!l)
		return -ENOPROTOOPT;
	if (max_win < BCLINK_WIN_MIN)
		max_win = BCLINK_WIN_MIN;
	if (max_win > TIPC_MAX_LINK_WIN)
		return -EINVAL;
	tipc_bcast_lock(net);
	tipc_link_set_queue_limits(l, BCLINK_WIN_MIN, max_win);
	tipc_bcast_unlock(net);
	return 0;
}

static int tipc_bc_link_set_broadcast_mode(struct net *net, u32 bc_mode)
{
	struct tipc_bc_base *bb = tipc_bc_base(net);

	switch (bc_mode) {
	case BCLINK_MODE_BCAST:
		if (!bb->bcast_support)
			return -ENOPROTOOPT;

		bb->force_bcast = true;
		bb->force_rcast = false;
		break;
	case BCLINK_MODE_RCAST:
		if (!bb->rcast_support)
			return -ENOPROTOOPT;

		bb->force_bcast = false;
		bb->force_rcast = true;
		break;
	case BCLINK_MODE_SEL:
		if (!bb->bcast_support || !bb->rcast_support)
			return -ENOPROTOOPT;

		bb->force_bcast = false;
		bb->force_rcast = false;
		break;
	default:
		return -EINVAL;
	}

	return 0;
}

static int tipc_bc_link_set_broadcast_ratio(struct net *net, u32 bc_ratio)
{
	struct tipc_bc_base *bb = tipc_bc_base(net);

	if (!bb->bcast_support || !bb->rcast_support)
		return -ENOPROTOOPT;

	if (bc_ratio > 100 || bc_ratio <= 0)
		return -EINVAL;

	bb->rc_ratio = bc_ratio;
	tipc_bcast_lock(net);
	tipc_bcbase_calc_bc_threshold(net);
	tipc_bcast_unlock(net);

	return 0;
}
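/* tipc_nl_bc_link_set - apply broadcast link properties received via netlink:
 * transmit method ('broadcast'/'replicast'/'autoselect'), broadcast ratio
 * and/or link window
 */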
int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[])
{
	int err;
	u32 win;
	u32 bc_mode;
	u32 bc_ratio;
	struct nlattr *props[TIPC_NLA_PROP_MAX + 1];

	if (!attrs[TIPC_NLA_LINK_PROP])
		return -EINVAL;

	err = tipc_nl_parse_link_prop(attrs[TIPC_NLA_LINK_PROP], props);
	if (err)
		return err;

	if (!props[TIPC_NLA_PROP_WIN] &&
	    !props[TIPC_NLA_PROP_BROADCAST] &&
	    !props[TIPC_NLA_PROP_BROADCAST_RATIO]) {
		return -EOPNOTSUPP;
	}

	if (props[TIPC_NLA_PROP_BROADCAST]) {
		bc_mode = nla_get_u32(props[TIPC_NLA_PROP_BROADCAST]);
		err = tipc_bc_link_set_broadcast_mode(net, bc_mode);
	}

	if (!err && props[TIPC_NLA_PROP_BROADCAST_RATIO]) {
		bc_ratio = nla_get_u32(props[TIPC_NLA_PROP_BROADCAST_RATIO]);
		err = tipc_bc_link_set_broadcast_ratio(net, bc_ratio);
	}

	if (!err && props[TIPC_NLA_PROP_WIN]) {
		win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
		err = tipc_bc_link_set_queue_limits(net, win);
	}

	return err;
}

int tipc_bcast_init(struct net *net)
{
	struct tipc_net *tn = tipc_net(net);
	struct tipc_bc_base *bb = NULL;
	struct tipc_link *l = NULL;

	bb = kzalloc(sizeof(*bb), GFP_KERNEL);
	if (!bb)
		goto enomem;
	tn->bcbase = bb;
	spin_lock_init(&tipc_net(net)->bclock);

	if (!tipc_link_bc_create(net, 0, 0, NULL,
				 FB_MTU,
				 BCLINK_WIN_DEFAULT,
				 BCLINK_WIN_DEFAULT,
				 0,
				 &bb->inputq,
				 NULL,
				 NULL,
				 &l))
		goto enomem;
	bb->link = l;
	tn->bcl = l;
	bb->rc_ratio = 10;
	bb->rcast_support = true;
	return 0;
enomem:
	kfree(bb);
	kfree(l);
	return -ENOMEM;
}

void tipc_bcast_stop(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	synchronize_net();
	kfree(tn->bcbase);
	kfree(tn->bcl);
}

void tipc_nlist_init(struct tipc_nlist *nl, u32 self)
{
	memset(nl, 0, sizeof(*nl));
	INIT_LIST_HEAD(&nl->list);
	nl->self = self;
}

void tipc_nlist_add(struct tipc_nlist *nl, u32 node)
{
	if (node == nl->self)
		nl->local = true;
	else if (tipc_dest_push(&nl->list, node, 0))
		nl->remote++;
}

void tipc_nlist_del(struct tipc_nlist *nl, u32 node)
{
	if (node == nl->self)
		nl->local = false;
	else if (tipc_dest_del(&nl->list, node, 0))
		nl->remote--;
}

void tipc_nlist_purge(struct tipc_nlist *nl)
{
	tipc_dest_list_purge(&nl->list);
	nl->remote = 0;
	nl->local = false;
}

u32 tipc_bcast_get_broadcast_mode(struct net *net)
{
	struct tipc_bc_base *bb = tipc_bc_base(net);

	if (bb->force_bcast)
		return BCLINK_MODE_BCAST;

	if (bb->force_rcast)
		return BCLINK_MODE_RCAST;

	if (bb->bcast_support && bb->rcast_support)
		return BCLINK_MODE_SEL;

	return 0;
}

u32 tipc_bcast_get_broadcast_ratio(struct net *net)
{
	struct tipc_bc_base *bb = tipc_bc_base(net);

	return bb->rc_ratio;
}
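/* tipc_mcast_filter_msg - filter the multicast input queue against the
 * deferred queue, so that a message arriving via both broadcast and
 * replicast around a method switch is delivered to the socket only once
 */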
void tipc_mcast_filter_msg(struct net *net, struct sk_buff_head *defq,
			   struct sk_buff_head *inputq)
{
	struct sk_buff *skb, *_skb, *tmp;
	struct tipc_msg *hdr, *_hdr;
	bool match = false;
	u32 node, port;

	skb = skb_peek(inputq);
	if (!skb)
		return;

	hdr = buf_msg(skb);

	if (likely(!msg_is_syn(hdr) && skb_queue_empty(defq)))
		return;

	node = msg_orignode(hdr);
	if (node == tipc_own_addr(net))
		return;

	port = msg_origport(hdr);

	/* Has the twin SYN message already arrived ? */
	skb_queue_walk(defq, _skb) {
		_hdr = buf_msg(_skb);
		if (msg_orignode(_hdr) != node)
			continue;
		if (msg_origport(_hdr) != port)
			continue;
		match = true;
		break;
	}

	if (!match) {
		if (!msg_is_syn(hdr))
			return;
		__skb_dequeue(inputq);
		__skb_queue_tail(defq, skb);
		return;
	}

	/* Deliver non-SYN message from other link, otherwise queue it */
	if (!msg_is_syn(hdr)) {
		if (msg_is_rcast(hdr) != msg_is_rcast(_hdr))
			return;
		__skb_dequeue(inputq);
		__skb_queue_tail(defq, skb);
		return;
	}

	/* Queue SYN message from same link */
	if (msg_is_rcast(hdr) == msg_is_rcast(_hdr)) {
		__skb_dequeue(inputq);
		__skb_queue_tail(defq, skb);
		return;
	}

	/* Matching SYN messages => return the one with data, if any */
	__skb_unlink(_skb, defq);
	if (msg_data_sz(hdr)) {
		kfree_skb(_skb);
	} else {
		__skb_dequeue(inputq);
		kfree_skb(skb);
		__skb_queue_tail(inputq, _skb);
	}

	/* Deliver subsequent non-SYN messages from same peer */
	skb_queue_walk_safe(defq, _skb, tmp) {
		_hdr = buf_msg(_skb);
		if (msg_orignode(_hdr) != node)
			continue;
		if (msg_origport(_hdr) != port)
			continue;
		if (msg_is_syn(_hdr))
			break;
		__skb_unlink(_skb, defq);
		__skb_queue_tail(inputq, _skb);
	}
}