1 /* 2 * net/tipc/bcast.c: TIPC broadcast code 3 * 4 * Copyright (c) 2004-2006, 2014-2017, Ericsson AB 5 * Copyright (c) 2004, Intel Corporation. 6 * Copyright (c) 2005, 2010-2011, Wind River Systems 7 * All rights reserved. 8 * 9 * Redistribution and use in source and binary forms, with or without 10 * modification, are permitted provided that the following conditions are met: 11 * 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in the 16 * documentation and/or other materials provided with the distribution. 17 * 3. Neither the names of the copyright holders nor the names of its 18 * contributors may be used to endorse or promote products derived from 19 * this software without specific prior written permission. 20 * 21 * Alternatively, this software may be distributed under the terms of the 22 * GNU General Public License ("GPL") version 2 as published by the Free 23 * Software Foundation. 24 * 25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 26 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 27 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 28 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 29 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 30 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 31 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 32 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 33 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 34 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 35 * POSSIBILITY OF SUCH DAMAGE. 36 */ 37 38 #include <linux/tipc_config.h> 39 #include "socket.h" 40 #include "msg.h" 41 #include "bcast.h" 42 #include "link.h" 43 #include "name_table.h" 44 45 #define BCLINK_WIN_DEFAULT 50 /* bcast link window size (default) */ 46 #define BCLINK_WIN_MIN 32 /* bcast minimum link window size */ 47 48 const char tipc_bclink_name[] = "broadcast-link"; 49 unsigned long sysctl_tipc_bc_retruni __read_mostly; 50 51 /** 52 * struct tipc_bc_base - base structure for keeping broadcast send state 53 * @link: broadcast send link structure 54 * @inputq: data input queue; will only carry SOCK_WAKEUP messages 55 * @dests: array keeping number of reachable destinations per bearer 56 * @primary_bearer: a bearer having links to all broadcast destinations, if any 57 * @bcast_support: indicates if primary bearer, if any, supports broadcast 58 * @force_bcast: forces broadcast for multicast traffic 59 * @rcast_support: indicates if all peer nodes support replicast 60 * @force_rcast: forces replicast for multicast traffic 61 * @rc_ratio: dest count as percentage of cluster size where send method changes 62 * @bc_threshold: calculated from rc_ratio; if dests > threshold use broadcast 63 */ 64 struct tipc_bc_base { 65 struct tipc_link *link; 66 struct sk_buff_head inputq; 67 int dests[MAX_BEARERS]; 68 int primary_bearer; 69 bool bcast_support; 70 bool force_bcast; 71 bool rcast_support; 72 bool force_rcast; 73 int rc_ratio; 74 int bc_threshold; 75 }; 76 77 static struct tipc_bc_base *tipc_bc_base(struct net *net) 78 { 79 return tipc_net(net)->bcbase; 80 } 81 82 /* tipc_bcast_get_mtu(): -get the MTU currently used by broadcast link 83 * Note: the MTU is decremented to give room for a tunnel header, in 84 * case the message needs to be sent as replicast 85 */ 86 int tipc_bcast_get_mtu(struct net *net) 87 { 88 return tipc_link_mss(tipc_bc_sndlink(net)); 89 } 90 91 void tipc_bcast_toggle_rcast(struct net *net, bool supp) 92 { 93 tipc_bc_base(net)->rcast_support = supp; 94 } 95 96 static void tipc_bcbase_calc_bc_threshold(struct net *net) 97 { 98 struct tipc_bc_base *bb = tipc_bc_base(net); 99 int cluster_size = tipc_link_bc_peers(tipc_bc_sndlink(net)); 100 101 bb->bc_threshold = 1 + (cluster_size * bb->rc_ratio / 100); 102 } 103 104 /* tipc_bcbase_select_primary(): find a bearer with links to all destinations, 105 * if any, and make it primary bearer 106 */ 107 static void tipc_bcbase_select_primary(struct net *net) 108 { 109 struct tipc_bc_base *bb = tipc_bc_base(net); 110 int all_dests = tipc_link_bc_peers(bb->link); 111 int max_win = tipc_link_max_win(bb->link); 112 int min_win = tipc_link_min_win(bb->link); 113 int i, mtu, prim; 114 115 bb->primary_bearer = INVALID_BEARER_ID; 116 bb->bcast_support = true; 117 118 if (!all_dests) 119 return; 120 121 for (i = 0; i < MAX_BEARERS; i++) { 122 if (!bb->dests[i]) 123 continue; 124 125 mtu = tipc_bearer_mtu(net, i); 126 if (mtu < tipc_link_mtu(bb->link)) { 127 tipc_link_set_mtu(bb->link, mtu); 128 tipc_link_set_queue_limits(bb->link, 129 min_win, 130 max_win); 131 } 132 bb->bcast_support &= tipc_bearer_bcast_support(net, i); 133 if (bb->dests[i] < all_dests) 134 continue; 135 136 bb->primary_bearer = i; 137 138 /* Reduce risk that all nodes select same primary */ 139 if ((i ^ tipc_own_addr(net)) & 1) 140 break; 141 } 142 prim = bb->primary_bearer; 143 if (prim != INVALID_BEARER_ID) 144 bb->bcast_support = tipc_bearer_bcast_support(net, prim); 145 } 146 147 void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id) 148 { 149 struct tipc_bc_base *bb = tipc_bc_base(net); 150 151 tipc_bcast_lock(net); 152 bb->dests[bearer_id]++; 153 tipc_bcbase_select_primary(net); 154 tipc_bcast_unlock(net); 155 } 156 157 void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id) 158 { 159 struct tipc_bc_base *bb = tipc_bc_base(net); 160 161 tipc_bcast_lock(net); 162 bb->dests[bearer_id]--; 163 tipc_bcbase_select_primary(net); 164 tipc_bcast_unlock(net); 165 } 166 167 /* tipc_bcbase_xmit - broadcast a packet queue across one or more bearers 168 * 169 * Note that number of reachable destinations, as indicated in the dests[] 170 * array, may transitionally differ from the number of destinations indicated 171 * in each sent buffer. We can sustain this. Excess destination nodes will 172 * drop and never acknowledge the unexpected packets, and missing destinations 173 * will either require retransmission (if they are just about to be added to 174 * the bearer), or be removed from the buffer's 'ackers' counter (if they 175 * just went down) 176 */ 177 static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq) 178 { 179 int bearer_id; 180 struct tipc_bc_base *bb = tipc_bc_base(net); 181 struct sk_buff *skb, *_skb; 182 struct sk_buff_head _xmitq; 183 184 if (skb_queue_empty(xmitq)) 185 return; 186 187 /* The typical case: at least one bearer has links to all nodes */ 188 bearer_id = bb->primary_bearer; 189 if (bearer_id >= 0) { 190 tipc_bearer_bc_xmit(net, bearer_id, xmitq); 191 return; 192 } 193 194 /* We have to transmit across all bearers */ 195 __skb_queue_head_init(&_xmitq); 196 for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) { 197 if (!bb->dests[bearer_id]) 198 continue; 199 200 skb_queue_walk(xmitq, skb) { 201 _skb = pskb_copy_for_clone(skb, GFP_ATOMIC); 202 if (!_skb) 203 break; 204 __skb_queue_tail(&_xmitq, _skb); 205 } 206 tipc_bearer_bc_xmit(net, bearer_id, &_xmitq); 207 } 208 __skb_queue_purge(xmitq); 209 __skb_queue_purge(&_xmitq); 210 } 211 212 static void tipc_bcast_select_xmit_method(struct net *net, int dests, 213 struct tipc_mc_method *method) 214 { 215 struct tipc_bc_base *bb = tipc_bc_base(net); 216 unsigned long exp = method->expires; 217 218 /* Broadcast supported by used bearer/bearers? */ 219 if (!bb->bcast_support) { 220 method->rcast = true; 221 return; 222 } 223 /* Any destinations which don't support replicast ? */ 224 if (!bb->rcast_support) { 225 method->rcast = false; 226 return; 227 } 228 /* Can current method be changed ? */ 229 method->expires = jiffies + TIPC_METHOD_EXPIRE; 230 if (method->mandatory) 231 return; 232 233 if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL) && 234 time_before(jiffies, exp)) 235 return; 236 237 /* Configuration as force 'broadcast' method */ 238 if (bb->force_bcast) { 239 method->rcast = false; 240 return; 241 } 242 /* Configuration as force 'replicast' method */ 243 if (bb->force_rcast) { 244 method->rcast = true; 245 return; 246 } 247 /* Configuration as 'autoselect' or default method */ 248 /* Determine method to use now */ 249 method->rcast = dests <= bb->bc_threshold; 250 } 251 252 /* tipc_bcast_xmit - broadcast the buffer chain to all external nodes 253 * @net: the applicable net namespace 254 * @pkts: chain of buffers containing message 255 * @cong_link_cnt: set to 1 if broadcast link is congested, otherwise 0 256 * Consumes the buffer chain. 257 * Returns 0 if success, otherwise errno: -EHOSTUNREACH,-EMSGSIZE 258 */ 259 int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts, 260 u16 *cong_link_cnt) 261 { 262 struct tipc_link *l = tipc_bc_sndlink(net); 263 struct sk_buff_head xmitq; 264 int rc = 0; 265 266 __skb_queue_head_init(&xmitq); 267 tipc_bcast_lock(net); 268 if (tipc_link_bc_peers(l)) 269 rc = tipc_link_xmit(l, pkts, &xmitq); 270 tipc_bcast_unlock(net); 271 tipc_bcbase_xmit(net, &xmitq); 272 __skb_queue_purge(pkts); 273 if (rc == -ELINKCONG) { 274 *cong_link_cnt = 1; 275 rc = 0; 276 } 277 return rc; 278 } 279 280 /* tipc_rcast_xmit - replicate and send a message to given destination nodes 281 * @net: the applicable net namespace 282 * @pkts: chain of buffers containing message 283 * @dests: list of destination nodes 284 * @cong_link_cnt: returns number of congested links 285 * @cong_links: returns identities of congested links 286 * Returns 0 if success, otherwise errno 287 */ 288 static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts, 289 struct tipc_nlist *dests, u16 *cong_link_cnt) 290 { 291 struct tipc_dest *dst, *tmp; 292 struct sk_buff_head _pkts; 293 u32 dnode, selector; 294 295 selector = msg_link_selector(buf_msg(skb_peek(pkts))); 296 __skb_queue_head_init(&_pkts); 297 298 list_for_each_entry_safe(dst, tmp, &dests->list, list) { 299 dnode = dst->node; 300 if (!tipc_msg_pskb_copy(dnode, pkts, &_pkts)) 301 return -ENOMEM; 302 303 /* Any other return value than -ELINKCONG is ignored */ 304 if (tipc_node_xmit(net, &_pkts, dnode, selector) == -ELINKCONG) 305 (*cong_link_cnt)++; 306 } 307 return 0; 308 } 309 310 /* tipc_mcast_send_sync - deliver a dummy message with SYN bit 311 * @net: the applicable net namespace 312 * @skb: socket buffer to copy 313 * @method: send method to be used 314 * @dests: destination nodes for message. 315 * Returns 0 if success, otherwise errno 316 */ 317 static int tipc_mcast_send_sync(struct net *net, struct sk_buff *skb, 318 struct tipc_mc_method *method, 319 struct tipc_nlist *dests) 320 { 321 struct tipc_msg *hdr, *_hdr; 322 struct sk_buff_head tmpq; 323 struct sk_buff *_skb; 324 u16 cong_link_cnt; 325 int rc = 0; 326 327 /* Is a cluster supporting with new capabilities ? */ 328 if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL)) 329 return 0; 330 331 hdr = buf_msg(skb); 332 if (msg_user(hdr) == MSG_FRAGMENTER) 333 hdr = msg_inner_hdr(hdr); 334 if (msg_type(hdr) != TIPC_MCAST_MSG) 335 return 0; 336 337 /* Allocate dummy message */ 338 _skb = tipc_buf_acquire(MCAST_H_SIZE, GFP_KERNEL); 339 if (!_skb) 340 return -ENOMEM; 341 342 /* Preparing for 'synching' header */ 343 msg_set_syn(hdr, 1); 344 345 /* Copy skb's header into a dummy header */ 346 skb_copy_to_linear_data(_skb, hdr, MCAST_H_SIZE); 347 skb_orphan(_skb); 348 349 /* Reverse method for dummy message */ 350 _hdr = buf_msg(_skb); 351 msg_set_size(_hdr, MCAST_H_SIZE); 352 msg_set_is_rcast(_hdr, !msg_is_rcast(hdr)); 353 msg_set_errcode(_hdr, TIPC_ERR_NO_PORT); 354 355 __skb_queue_head_init(&tmpq); 356 __skb_queue_tail(&tmpq, _skb); 357 if (method->rcast) 358 rc = tipc_bcast_xmit(net, &tmpq, &cong_link_cnt); 359 else 360 rc = tipc_rcast_xmit(net, &tmpq, dests, &cong_link_cnt); 361 362 /* This queue should normally be empty by now */ 363 __skb_queue_purge(&tmpq); 364 365 return rc; 366 } 367 368 /* tipc_mcast_xmit - deliver message to indicated destination nodes 369 * and to identified node local sockets 370 * @net: the applicable net namespace 371 * @pkts: chain of buffers containing message 372 * @method: send method to be used 373 * @dests: destination nodes for message. 374 * @cong_link_cnt: returns number of encountered congested destination links 375 * Consumes buffer chain. 376 * Returns 0 if success, otherwise errno 377 */ 378 int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts, 379 struct tipc_mc_method *method, struct tipc_nlist *dests, 380 u16 *cong_link_cnt) 381 { 382 struct sk_buff_head inputq, localq; 383 bool rcast = method->rcast; 384 struct tipc_msg *hdr; 385 struct sk_buff *skb; 386 int rc = 0; 387 388 skb_queue_head_init(&inputq); 389 __skb_queue_head_init(&localq); 390 391 /* Clone packets before they are consumed by next call */ 392 if (dests->local && !tipc_msg_reassemble(pkts, &localq)) { 393 rc = -ENOMEM; 394 goto exit; 395 } 396 /* Send according to determined transmit method */ 397 if (dests->remote) { 398 tipc_bcast_select_xmit_method(net, dests->remote, method); 399 400 skb = skb_peek(pkts); 401 hdr = buf_msg(skb); 402 if (msg_user(hdr) == MSG_FRAGMENTER) 403 hdr = msg_inner_hdr(hdr); 404 msg_set_is_rcast(hdr, method->rcast); 405 406 /* Switch method ? */ 407 if (rcast != method->rcast) { 408 rc = tipc_mcast_send_sync(net, skb, method, dests); 409 if (unlikely(rc)) { 410 pr_err("Unable to send SYN: method %d, rc %d\n", 411 rcast, rc); 412 goto exit; 413 } 414 } 415 416 if (method->rcast) 417 rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt); 418 else 419 rc = tipc_bcast_xmit(net, pkts, cong_link_cnt); 420 } 421 422 if (dests->local) { 423 tipc_loopback_trace(net, &localq); 424 tipc_sk_mcast_rcv(net, &localq, &inputq); 425 } 426 exit: 427 /* This queue should normally be empty by now */ 428 __skb_queue_purge(pkts); 429 return rc; 430 } 431 432 /* tipc_bcast_rcv - receive a broadcast packet, and deliver to rcv link 433 * 434 * RCU is locked, no other locks set 435 */ 436 int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb) 437 { 438 struct tipc_msg *hdr = buf_msg(skb); 439 struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq; 440 struct sk_buff_head xmitq; 441 int rc; 442 443 __skb_queue_head_init(&xmitq); 444 445 if (msg_mc_netid(hdr) != tipc_netid(net) || !tipc_link_is_up(l)) { 446 kfree_skb(skb); 447 return 0; 448 } 449 450 tipc_bcast_lock(net); 451 if (msg_user(hdr) == BCAST_PROTOCOL) 452 rc = tipc_link_bc_nack_rcv(l, skb, &xmitq); 453 else 454 rc = tipc_link_rcv(l, skb, NULL); 455 tipc_bcast_unlock(net); 456 457 tipc_bcbase_xmit(net, &xmitq); 458 459 /* Any socket wakeup messages ? */ 460 if (!skb_queue_empty(inputq)) 461 tipc_sk_rcv(net, inputq); 462 463 return rc; 464 } 465 466 /* tipc_bcast_ack_rcv - receive and handle a broadcast acknowledge 467 * 468 * RCU is locked, no other locks set 469 */ 470 void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, 471 struct tipc_msg *hdr) 472 { 473 struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq; 474 u16 acked = msg_bcast_ack(hdr); 475 struct sk_buff_head xmitq; 476 477 /* Ignore bc acks sent by peer before bcast synch point was received */ 478 if (msg_bc_ack_invalid(hdr)) 479 return; 480 481 __skb_queue_head_init(&xmitq); 482 483 tipc_bcast_lock(net); 484 tipc_link_bc_ack_rcv(l, acked, 0, NULL, &xmitq, NULL); 485 tipc_bcast_unlock(net); 486 487 tipc_bcbase_xmit(net, &xmitq); 488 489 /* Any socket wakeup messages ? */ 490 if (!skb_queue_empty(inputq)) 491 tipc_sk_rcv(net, inputq); 492 } 493 494 /* tipc_bcast_synch_rcv - check and update rcv link with peer's send state 495 * 496 * RCU is locked, no other locks set 497 */ 498 int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l, 499 struct tipc_msg *hdr, 500 struct sk_buff_head *retrq) 501 { 502 struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq; 503 struct tipc_gap_ack_blks *ga; 504 struct sk_buff_head xmitq; 505 int rc = 0; 506 507 __skb_queue_head_init(&xmitq); 508 509 tipc_bcast_lock(net); 510 if (msg_type(hdr) != STATE_MSG) { 511 tipc_link_bc_init_rcv(l, hdr); 512 } else if (!msg_bc_ack_invalid(hdr)) { 513 tipc_get_gap_ack_blks(&ga, l, hdr, false); 514 if (!sysctl_tipc_bc_retruni) 515 retrq = &xmitq; 516 rc = tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr), 517 msg_bc_gap(hdr), ga, &xmitq, 518 retrq); 519 rc |= tipc_link_bc_sync_rcv(l, hdr, &xmitq); 520 } 521 tipc_bcast_unlock(net); 522 523 tipc_bcbase_xmit(net, &xmitq); 524 525 /* Any socket wakeup messages ? */ 526 if (!skb_queue_empty(inputq)) 527 tipc_sk_rcv(net, inputq); 528 return rc; 529 } 530 531 /* tipc_bcast_add_peer - add a peer node to broadcast link and bearer 532 * 533 * RCU is locked, node lock is set 534 */ 535 void tipc_bcast_add_peer(struct net *net, struct tipc_link *uc_l, 536 struct sk_buff_head *xmitq) 537 { 538 struct tipc_link *snd_l = tipc_bc_sndlink(net); 539 540 tipc_bcast_lock(net); 541 tipc_link_add_bc_peer(snd_l, uc_l, xmitq); 542 tipc_bcbase_select_primary(net); 543 tipc_bcbase_calc_bc_threshold(net); 544 tipc_bcast_unlock(net); 545 } 546 547 /* tipc_bcast_remove_peer - remove a peer node from broadcast link and bearer 548 * 549 * RCU is locked, node lock is set 550 */ 551 void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_l) 552 { 553 struct tipc_link *snd_l = tipc_bc_sndlink(net); 554 struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq; 555 struct sk_buff_head xmitq; 556 557 __skb_queue_head_init(&xmitq); 558 559 tipc_bcast_lock(net); 560 tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq); 561 tipc_bcbase_select_primary(net); 562 tipc_bcbase_calc_bc_threshold(net); 563 tipc_bcast_unlock(net); 564 565 tipc_bcbase_xmit(net, &xmitq); 566 567 /* Any socket wakeup messages ? */ 568 if (!skb_queue_empty(inputq)) 569 tipc_sk_rcv(net, inputq); 570 } 571 572 int tipc_bclink_reset_stats(struct net *net, struct tipc_link *l) 573 { 574 if (!l) 575 return -ENOPROTOOPT; 576 577 tipc_bcast_lock(net); 578 tipc_link_reset_stats(l); 579 tipc_bcast_unlock(net); 580 return 0; 581 } 582 583 static int tipc_bc_link_set_queue_limits(struct net *net, u32 max_win) 584 { 585 struct tipc_link *l = tipc_bc_sndlink(net); 586 587 if (!l) 588 return -ENOPROTOOPT; 589 if (max_win < BCLINK_WIN_MIN) 590 max_win = BCLINK_WIN_MIN; 591 if (max_win > TIPC_MAX_LINK_WIN) 592 return -EINVAL; 593 tipc_bcast_lock(net); 594 tipc_link_set_queue_limits(l, tipc_link_min_win(l), max_win); 595 tipc_bcast_unlock(net); 596 return 0; 597 } 598 599 static int tipc_bc_link_set_broadcast_mode(struct net *net, u32 bc_mode) 600 { 601 struct tipc_bc_base *bb = tipc_bc_base(net); 602 603 switch (bc_mode) { 604 case BCLINK_MODE_BCAST: 605 if (!bb->bcast_support) 606 return -ENOPROTOOPT; 607 608 bb->force_bcast = true; 609 bb->force_rcast = false; 610 break; 611 case BCLINK_MODE_RCAST: 612 if (!bb->rcast_support) 613 return -ENOPROTOOPT; 614 615 bb->force_bcast = false; 616 bb->force_rcast = true; 617 break; 618 case BCLINK_MODE_SEL: 619 if (!bb->bcast_support || !bb->rcast_support) 620 return -ENOPROTOOPT; 621 622 bb->force_bcast = false; 623 bb->force_rcast = false; 624 break; 625 default: 626 return -EINVAL; 627 } 628 629 return 0; 630 } 631 632 static int tipc_bc_link_set_broadcast_ratio(struct net *net, u32 bc_ratio) 633 { 634 struct tipc_bc_base *bb = tipc_bc_base(net); 635 636 if (!bb->bcast_support || !bb->rcast_support) 637 return -ENOPROTOOPT; 638 639 if (bc_ratio > 100 || bc_ratio <= 0) 640 return -EINVAL; 641 642 bb->rc_ratio = bc_ratio; 643 tipc_bcast_lock(net); 644 tipc_bcbase_calc_bc_threshold(net); 645 tipc_bcast_unlock(net); 646 647 return 0; 648 } 649 650 int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]) 651 { 652 int err; 653 u32 win; 654 u32 bc_mode; 655 u32 bc_ratio; 656 struct nlattr *props[TIPC_NLA_PROP_MAX + 1]; 657 658 if (!attrs[TIPC_NLA_LINK_PROP]) 659 return -EINVAL; 660 661 err = tipc_nl_parse_link_prop(attrs[TIPC_NLA_LINK_PROP], props); 662 if (err) 663 return err; 664 665 if (!props[TIPC_NLA_PROP_WIN] && 666 !props[TIPC_NLA_PROP_BROADCAST] && 667 !props[TIPC_NLA_PROP_BROADCAST_RATIO]) { 668 return -EOPNOTSUPP; 669 } 670 671 if (props[TIPC_NLA_PROP_BROADCAST]) { 672 bc_mode = nla_get_u32(props[TIPC_NLA_PROP_BROADCAST]); 673 err = tipc_bc_link_set_broadcast_mode(net, bc_mode); 674 } 675 676 if (!err && props[TIPC_NLA_PROP_BROADCAST_RATIO]) { 677 bc_ratio = nla_get_u32(props[TIPC_NLA_PROP_BROADCAST_RATIO]); 678 err = tipc_bc_link_set_broadcast_ratio(net, bc_ratio); 679 } 680 681 if (!err && props[TIPC_NLA_PROP_WIN]) { 682 win = nla_get_u32(props[TIPC_NLA_PROP_WIN]); 683 err = tipc_bc_link_set_queue_limits(net, win); 684 } 685 686 return err; 687 } 688 689 int tipc_bcast_init(struct net *net) 690 { 691 struct tipc_net *tn = tipc_net(net); 692 struct tipc_bc_base *bb = NULL; 693 struct tipc_link *l = NULL; 694 695 bb = kzalloc(sizeof(*bb), GFP_KERNEL); 696 if (!bb) 697 goto enomem; 698 tn->bcbase = bb; 699 spin_lock_init(&tipc_net(net)->bclock); 700 701 if (!tipc_link_bc_create(net, 0, 0, NULL, 702 FB_MTU, 703 BCLINK_WIN_DEFAULT, 704 BCLINK_WIN_DEFAULT, 705 0, 706 &bb->inputq, 707 NULL, 708 NULL, 709 &l)) 710 goto enomem; 711 bb->link = l; 712 tn->bcl = l; 713 bb->rc_ratio = 10; 714 bb->rcast_support = true; 715 return 0; 716 enomem: 717 kfree(bb); 718 kfree(l); 719 return -ENOMEM; 720 } 721 722 void tipc_bcast_stop(struct net *net) 723 { 724 struct tipc_net *tn = net_generic(net, tipc_net_id); 725 726 synchronize_net(); 727 kfree(tn->bcbase); 728 kfree(tn->bcl); 729 } 730 731 void tipc_nlist_init(struct tipc_nlist *nl, u32 self) 732 { 733 memset(nl, 0, sizeof(*nl)); 734 INIT_LIST_HEAD(&nl->list); 735 nl->self = self; 736 } 737 738 void tipc_nlist_add(struct tipc_nlist *nl, u32 node) 739 { 740 if (node == nl->self) 741 nl->local = true; 742 else if (tipc_dest_push(&nl->list, node, 0)) 743 nl->remote++; 744 } 745 746 void tipc_nlist_del(struct tipc_nlist *nl, u32 node) 747 { 748 if (node == nl->self) 749 nl->local = false; 750 else if (tipc_dest_del(&nl->list, node, 0)) 751 nl->remote--; 752 } 753 754 void tipc_nlist_purge(struct tipc_nlist *nl) 755 { 756 tipc_dest_list_purge(&nl->list); 757 nl->remote = 0; 758 nl->local = false; 759 } 760 761 u32 tipc_bcast_get_mode(struct net *net) 762 { 763 struct tipc_bc_base *bb = tipc_bc_base(net); 764 765 if (bb->force_bcast) 766 return BCLINK_MODE_BCAST; 767 768 if (bb->force_rcast) 769 return BCLINK_MODE_RCAST; 770 771 if (bb->bcast_support && bb->rcast_support) 772 return BCLINK_MODE_SEL; 773 774 return 0; 775 } 776 777 u32 tipc_bcast_get_broadcast_ratio(struct net *net) 778 { 779 struct tipc_bc_base *bb = tipc_bc_base(net); 780 781 return bb->rc_ratio; 782 } 783 784 void tipc_mcast_filter_msg(struct net *net, struct sk_buff_head *defq, 785 struct sk_buff_head *inputq) 786 { 787 struct sk_buff *skb, *_skb, *tmp; 788 struct tipc_msg *hdr, *_hdr; 789 bool match = false; 790 u32 node, port; 791 792 skb = skb_peek(inputq); 793 if (!skb) 794 return; 795 796 hdr = buf_msg(skb); 797 798 if (likely(!msg_is_syn(hdr) && skb_queue_empty(defq))) 799 return; 800 801 node = msg_orignode(hdr); 802 if (node == tipc_own_addr(net)) 803 return; 804 805 port = msg_origport(hdr); 806 807 /* Has the twin SYN message already arrived ? */ 808 skb_queue_walk(defq, _skb) { 809 _hdr = buf_msg(_skb); 810 if (msg_orignode(_hdr) != node) 811 continue; 812 if (msg_origport(_hdr) != port) 813 continue; 814 match = true; 815 break; 816 } 817 818 if (!match) { 819 if (!msg_is_syn(hdr)) 820 return; 821 __skb_dequeue(inputq); 822 __skb_queue_tail(defq, skb); 823 return; 824 } 825 826 /* Deliver non-SYN message from other link, otherwise queue it */ 827 if (!msg_is_syn(hdr)) { 828 if (msg_is_rcast(hdr) != msg_is_rcast(_hdr)) 829 return; 830 __skb_dequeue(inputq); 831 __skb_queue_tail(defq, skb); 832 return; 833 } 834 835 /* Queue non-SYN/SYN message from same link */ 836 if (msg_is_rcast(hdr) == msg_is_rcast(_hdr)) { 837 __skb_dequeue(inputq); 838 __skb_queue_tail(defq, skb); 839 return; 840 } 841 842 /* Matching SYN messages => return the one with data, if any */ 843 __skb_unlink(_skb, defq); 844 if (msg_data_sz(hdr)) { 845 kfree_skb(_skb); 846 } else { 847 __skb_dequeue(inputq); 848 kfree_skb(skb); 849 __skb_queue_tail(inputq, _skb); 850 } 851 852 /* Deliver subsequent non-SYN messages from same peer */ 853 skb_queue_walk_safe(defq, _skb, tmp) { 854 _hdr = buf_msg(_skb); 855 if (msg_orignode(_hdr) != node) 856 continue; 857 if (msg_origport(_hdr) != port) 858 continue; 859 if (msg_is_syn(_hdr)) 860 break; 861 __skb_unlink(_skb, defq); 862 __skb_queue_tail(inputq, _skb); 863 } 864 } 865