/*
 * net/tipc/bcast.c: TIPC broadcast code
 *
 * Copyright (c) 2004-2006, 2014-2015, Ericsson AB
 * Copyright (c) 2004, Intel Corporation.
 * Copyright (c) 2005, 2010-2011, Wind River Systems
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the names of the copyright holders nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
 *
 * Alternatively, this software may be distributed under the terms of the
 * GNU General Public License ("GPL") version 2 as published by the Free
 * Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "socket.h"
#include "msg.h"
#include "bcast.h"
#include "name_distr.h"
#include "core.h"

#define MAX_PKT_DEFAULT_MCAST	1500	/* bcast link max packet size (fixed) */
#define BCLINK_WIN_DEFAULT	20	/* bcast link window size (default) */

const char tipc_bclink_name[] = "broadcast-link";

static void tipc_nmap_diff(struct tipc_node_map *nm_a,
			   struct tipc_node_map *nm_b,
			   struct tipc_node_map *nm_diff);
static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node);
static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node);

/* Serialize access to the broadcast link and its queues */
static void tipc_bclink_lock(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	spin_lock_bh(&tn->bclink->lock);
}

static void tipc_bclink_unlock(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	spin_unlock_bh(&tn->bclink->lock);
}

void tipc_bclink_input(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	tipc_sk_mcast_rcv(net, &tn->bclink->arrvq, &tn->bclink->inputq);
}

uint tipc_bclink_get_mtu(void)
{
	return MAX_PKT_DEFAULT_MCAST;
}

/* The skb control block's 'handle' field is reused to hold the number of
 * nodes that still have to acknowledge the buffer
 */
static u32 bcbuf_acks(struct sk_buff *buf)
{
	return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle;
}

static void bcbuf_set_acks(struct sk_buff *buf, u32 acks)
{
	TIPC_SKB_CB(buf)->handle = (void *)(unsigned long)acks;
}

static void bcbuf_decr_acks(struct sk_buff *buf)
{
	bcbuf_set_acks(buf, bcbuf_acks(buf) - 1);
}

void tipc_bclink_add_node(struct net *net, u32 addr)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	tipc_bclink_lock(net);
	tipc_nmap_add(&tn->bclink->bcast_nodes, addr);
	tipc_bclink_unlock(net);
}

void tipc_bclink_remove_node(struct net *net, u32 addr)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	tipc_bclink_lock(net);
	tipc_nmap_remove(&tn->bclink->bcast_nodes, addr);

	/* Last node? => reset backlog queue */
	if (!tn->bclink->bcast_nodes.count)
		tipc_link_purge_backlog(&tn->bclink->link);

	tipc_bclink_unlock(net);
}

static void bclink_set_last_sent(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;

	bcl->silent_intv_cnt = mod(bcl->snd_nxt - 1);
}

u32 tipc_bclink_get_last_sent(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	return tn->bcl->silent_intv_cnt;
}

static void bclink_update_last_sent(struct tipc_node *node, u32 seqno)
{
	node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ?
				 seqno : node->bclink.last_sent;
}

/**
 * tipc_bclink_retransmit_to - get most recent node to request retransmission
 *
 * Called with bclink_lock locked
 */
struct tipc_node *tipc_bclink_retransmit_to(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	return tn->bclink->retransmit_to;
}

/**
 * bclink_retransmit_pkt - retransmit broadcast packets
 * @after: sequence number of last packet to *not* retransmit
 * @to: sequence number of last packet to retransmit
 *
 * Called with bclink_lock locked
 */
static void bclink_retransmit_pkt(struct tipc_net *tn, u32 after, u32 to)
{
	struct sk_buff *skb;
	struct tipc_link *bcl = tn->bcl;

	skb_queue_walk(&bcl->transmq, skb) {
		if (more(buf_seqno(skb), after)) {
			tipc_link_retransmit(bcl, skb, mod(to - after));
			break;
		}
	}
}

/**
 * bclink_prepare_wakeup - prepare users for wakeup after congestion
 * @bcl: broadcast link
 * @resultq: queue for users which can be woken up
 * Move a number of waiting users, as permitted by available space in
 * the send queue, from link wait queue to specified queue for wakeup
 */
static void bclink_prepare_wakeup(struct tipc_link *bcl, struct sk_buff_head *resultq)
{
	int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,};
	int imp, lim;
	struct sk_buff *skb, *tmp;

	skb_queue_walk_safe(&bcl->wakeupq, skb, tmp) {
		imp = TIPC_SKB_CB(skb)->chain_imp;
		lim = bcl->window + bcl->backlog[imp].limit;
		pnd[imp] += TIPC_SKB_CB(skb)->chain_sz;
		if ((pnd[imp] + bcl->backlog[imp].len) >= lim)
			continue;
		skb_unlink(skb, &bcl->wakeupq);
		skb_queue_tail(resultq, skb);
	}
}

/**
 * tipc_bclink_wakeup_users - wake up pending users
 *
 * Called with no locks taken
 */
void tipc_bclink_wakeup_users(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;
	struct sk_buff_head resultq;

	skb_queue_head_init(&resultq);
	bclink_prepare_wakeup(bcl, &resultq);
	tipc_sk_rcv(net, &resultq);
}

/**
 * tipc_bclink_acknowledge - handle acknowledgement of broadcast packets
 * @n_ptr: node that sent acknowledgement info
 * @acked: broadcast sequence # that has been acknowledged
 *
 * Node is locked, bclink_lock unlocked.
 */
void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
{
	struct sk_buff *skb, *tmp;
	unsigned int released = 0;
	struct net *net = n_ptr->net;
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	if (unlikely(!n_ptr->bclink.recv_permitted))
		return;

	tipc_bclink_lock(net);

	/* Bail out if tx queue is empty (no clean up is required) */
	skb = skb_peek(&tn->bcl->transmq);
	if (!skb)
		goto exit;

	/* Determine which messages need to be acknowledged */
	if (acked == INVALID_LINK_SEQ) {
		/*
		 * Contact with specified node has been lost, so need to
		 * acknowledge sent messages only (if other nodes still exist)
		 * or both sent and unsent messages (otherwise)
		 */
		if (tn->bclink->bcast_nodes.count)
			acked = tn->bcl->silent_intv_cnt;
		else
			acked = tn->bcl->snd_nxt;
	} else {
		/*
		 * Bail out if specified sequence number does not correspond
		 * to a message that has been sent and not yet acknowledged
		 */
		if (less(acked, buf_seqno(skb)) ||
		    less(tn->bcl->silent_intv_cnt, acked) ||
		    less_eq(acked, n_ptr->bclink.acked))
			goto exit;
	}

	/* Skip over packets that node has previously acknowledged */
	skb_queue_walk(&tn->bcl->transmq, skb) {
		if (more(buf_seqno(skb), n_ptr->bclink.acked))
			break;
	}

	/* Update packets that node is now acknowledging */
	skb_queue_walk_from_safe(&tn->bcl->transmq, skb, tmp) {
		if (more(buf_seqno(skb), acked))
			break;
		bcbuf_decr_acks(skb);
		bclink_set_last_sent(net);
		if (bcbuf_acks(skb) == 0) {
			__skb_unlink(skb, &tn->bcl->transmq);
			kfree_skb(skb);
			released = 1;
		}
	}
	n_ptr->bclink.acked = acked;

	/* Try resolving broadcast link congestion, if necessary */
	if (unlikely(skb_peek(&tn->bcl->backlogq))) {
		tipc_link_push_packets(tn->bcl);
		bclink_set_last_sent(net);
	}
	if (unlikely(released && !skb_queue_empty(&tn->bcl->wakeupq)))
		n_ptr->action_flags |= TIPC_WAKEUP_BCAST_USERS;
exit:
	tipc_bclink_unlock(net);
}

/**
 * tipc_bclink_update_link_state - update broadcast link state
 *
 * RCU and node lock set
 */
void tipc_bclink_update_link_state(struct tipc_node *n_ptr,
				   u32 last_sent)
{
	struct sk_buff *buf;
	struct net *net = n_ptr->net;
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	/* Ignore "stale" link state info */
	if (less_eq(last_sent, n_ptr->bclink.last_in))
		return;

	/* Update link synchronization state; quit if in sync */
	bclink_update_last_sent(n_ptr, last_sent);

	if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
		return;

	/* Update out-of-sync state; quit if loss is still unconfirmed */
	if ((++n_ptr->bclink.oos_state) == 1) {
		if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2))
			return;
		n_ptr->bclink.oos_state++;
	}

	/* Don't NACK if one has been recently sent (or seen) */
	if (n_ptr->bclink.oos_state & 0x1)
		return;

	/* Send NACK */
	buf = tipc_buf_acquire(INT_H_SIZE);
	if (buf) {
		struct tipc_msg *msg = buf_msg(buf);
		struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferdq);
		u32 to = skb ?
			 buf_seqno(skb) - 1 : n_ptr->bclink.last_sent;

		tipc_msg_init(tn->own_addr, msg, BCAST_PROTOCOL, STATE_MSG,
			      INT_H_SIZE, n_ptr->addr);
		msg_set_non_seq(msg, 1);
		msg_set_mc_netid(msg, tn->net_id);
		msg_set_bcast_ack(msg, n_ptr->bclink.last_in);
		msg_set_bcgap_after(msg, n_ptr->bclink.last_in);
		msg_set_bcgap_to(msg, to);

		tipc_bclink_lock(net);
		tipc_bearer_send(net, MAX_BEARERS, buf, NULL);
		tn->bcl->stats.sent_nacks++;
		tipc_bclink_unlock(net);
		kfree_skb(buf);

		n_ptr->bclink.oos_state++;
	}
}

void tipc_bclink_sync_state(struct tipc_node *n, struct tipc_msg *hdr)
{
	u16 last = msg_last_bcast(hdr);
	int mtyp = msg_type(hdr);

	if (unlikely(msg_user(hdr) != LINK_PROTOCOL))
		return;
	if (mtyp == STATE_MSG) {
		tipc_bclink_update_link_state(n, last);
		return;
	}
	/* Compatibility: older nodes don't know BCAST_PROTOCOL synchronization,
	 * and transfer synch info in LINK_PROTOCOL messages.
	 */
	if (tipc_node_is_up(n))
		return;
	if ((mtyp != RESET_MSG) && (mtyp != ACTIVATE_MSG))
		return;
	n->bclink.last_sent = last;
	n->bclink.last_in = last;
	n->bclink.oos_state = 0;
}

/**
 * bclink_peek_nack - monitor retransmission requests sent by other nodes
 *
 * Delay any upcoming NACK by this node if another node has already
 * requested the first message this node is going to ask for.
 */
static void bclink_peek_nack(struct net *net, struct tipc_msg *msg)
{
	struct tipc_node *n_ptr = tipc_node_find(net, msg_destnode(msg));

	if (unlikely(!n_ptr))
		return;

	tipc_node_lock(n_ptr);
	if (n_ptr->bclink.recv_permitted &&
	    (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) &&
	    (n_ptr->bclink.last_in == msg_bcgap_after(msg)))
		n_ptr->bclink.oos_state = 2;
	tipc_node_unlock(n_ptr);
	tipc_node_put(n_ptr);
}

/* tipc_bclink_xmit - deliver buffer chain to all nodes in cluster
 *                    and to identified node local sockets
 * @net: the applicable net namespace
 * @list: chain of buffers containing message
 * Consumes the buffer chain, except when returning -ELINKCONG
 * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
 */
int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;
	struct tipc_bclink *bclink = tn->bclink;
	int rc = 0;
	int bc = 0;
	struct sk_buff *skb;
	struct sk_buff_head arrvq;
	struct sk_buff_head inputq;

	/* Prepare clone of message for local node */
	skb = tipc_msg_reassemble(list);
	if (unlikely(!skb))
		return -EHOSTUNREACH;

	/* Broadcast to all nodes */
	if (likely(bclink)) {
		tipc_bclink_lock(net);
		if (likely(bclink->bcast_nodes.count)) {
			rc = __tipc_link_xmit(net, bcl, list);
			if (likely(!rc)) {
				u32 len = skb_queue_len(&bcl->transmq);

				bclink_set_last_sent(net);
				bcl->stats.queue_sz_counts++;
				bcl->stats.accu_queue_sz += len;
			}
			bc = 1;
		}
		tipc_bclink_unlock(net);
	}

	if (unlikely(!bc))
		__skb_queue_purge(list);

	if (unlikely(rc)) {
		kfree_skb(skb);
		return rc;
	}
	/* Deliver message clone */
	__skb_queue_head_init(&arrvq);
	skb_queue_head_init(&inputq);
	__skb_queue_tail(&arrvq, skb);
	tipc_sk_mcast_rcv(net, &arrvq, &inputq);
	return rc;
}

/**
 * bclink_accept_pkt - accept an incoming,
 *		       in-sequence broadcast packet
 *
 * Called with both sending node's lock and bclink_lock taken.
 */
static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
{
	struct tipc_net *tn = net_generic(node->net, tipc_net_id);

	bclink_update_last_sent(node, seqno);
	node->bclink.last_in = seqno;
	node->bclink.oos_state = 0;
	tn->bcl->stats.recv_info++;

	/*
	 * Unicast an ACK periodically, ensuring that
	 * all nodes in the cluster don't ACK at the same time
	 */
	if (((seqno - tn->own_addr) % TIPC_MIN_LINK_WIN) == 0) {
		tipc_link_proto_xmit(node_active_link(node, node->addr),
				     STATE_MSG, 0, 0, 0, 0);
		tn->bcl->stats.sent_acks++;
	}
}

/**
 * tipc_bclink_rcv - receive a broadcast packet, and deliver upwards
 *
 * RCU is locked, no other locks set
 */
void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;
	struct tipc_msg *msg = buf_msg(buf);
	struct tipc_node *node;
	u32 next_in;
	u32 seqno;
	int deferred = 0;
	int pos = 0;
	struct sk_buff *iskb;
	struct sk_buff_head *arrvq, *inputq;

	/* Screen out unwanted broadcast messages */
	if (msg_mc_netid(msg) != tn->net_id)
		goto exit;

	node = tipc_node_find(net, msg_prevnode(msg));
	if (unlikely(!node))
		goto exit;

	tipc_node_lock(node);
	if (unlikely(!node->bclink.recv_permitted))
		goto unlock;

	/* Handle broadcast protocol message */
	if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) {
		if (msg_type(msg) != STATE_MSG)
			goto unlock;
		if (msg_destnode(msg) == tn->own_addr) {
			tipc_bclink_acknowledge(node, msg_bcast_ack(msg));
			tipc_bclink_lock(net);
			bcl->stats.recv_nacks++;
			tn->bclink->retransmit_to = node;
			bclink_retransmit_pkt(tn, msg_bcgap_after(msg),
					      msg_bcgap_to(msg));
			tipc_bclink_unlock(net);
			tipc_node_unlock(node);
		} else {
			tipc_node_unlock(node);
			bclink_peek_nack(net, msg);
		}
		tipc_node_put(node);
		goto exit;
	}

	/* Handle in-sequence broadcast message */
	seqno = msg_seqno(msg);
	next_in = mod(node->bclink.last_in + 1);
	arrvq = &tn->bclink->arrvq;
	inputq = &tn->bclink->inputq;

	if (likely(seqno == next_in)) {
receive:
		/* Deliver message to destination */
		if (likely(msg_isdata(msg))) {
			tipc_bclink_lock(net);
			bclink_accept_pkt(node, seqno);
			spin_lock_bh(&inputq->lock);
			__skb_queue_tail(arrvq, buf);
			spin_unlock_bh(&inputq->lock);
			node->action_flags |= TIPC_BCAST_MSG_EVT;
			tipc_bclink_unlock(net);
			tipc_node_unlock(node);
		} else if (msg_user(msg) == MSG_BUNDLER) {
			tipc_bclink_lock(net);
			bclink_accept_pkt(node, seqno);
			bcl->stats.recv_bundles++;
			bcl->stats.recv_bundled += msg_msgcnt(msg);
			pos = 0;
			while (tipc_msg_extract(buf, &iskb, &pos)) {
				spin_lock_bh(&inputq->lock);
				__skb_queue_tail(arrvq, iskb);
				spin_unlock_bh(&inputq->lock);
			}
			node->action_flags |= TIPC_BCAST_MSG_EVT;
			tipc_bclink_unlock(net);
			tipc_node_unlock(node);
		} else if (msg_user(msg) == MSG_FRAGMENTER) {
			tipc_bclink_lock(net);
			bclink_accept_pkt(node, seqno);
			tipc_buf_append(&node->bclink.reasm_buf, &buf);
			if (unlikely(!buf && !node->bclink.reasm_buf)) {
				tipc_bclink_unlock(net);
				goto unlock;
			}
			bcl->stats.recv_fragments++;
			if (buf) {
				bcl->stats.recv_fragmented++;
				msg = buf_msg(buf);
				tipc_bclink_unlock(net);
				goto receive;
			}
			tipc_bclink_unlock(net);
			tipc_node_unlock(node);
		} else {
			tipc_bclink_lock(net);
			bclink_accept_pkt(node, seqno);
			tipc_bclink_unlock(net);
			tipc_node_unlock(node);
			kfree_skb(buf);
		}
		buf = NULL;

		/* Determine new synchronization state */
		tipc_node_lock(node);
		if (unlikely(!tipc_node_is_up(node)))
			goto unlock;

		if (node->bclink.last_in == node->bclink.last_sent)
			goto unlock;

		if (skb_queue_empty(&node->bclink.deferdq)) {
			node->bclink.oos_state = 1;
			goto unlock;
		}

		msg = buf_msg(skb_peek(&node->bclink.deferdq));
		seqno = msg_seqno(msg);
		next_in = mod(next_in + 1);
		if (seqno != next_in)
			goto unlock;

		/* Take in-sequence message from deferred queue & deliver it */
		buf = __skb_dequeue(&node->bclink.deferdq);
		goto receive;
	}

	/* Handle out-of-sequence broadcast message */
	if (less(next_in, seqno)) {
		deferred = tipc_link_defer_pkt(&node->bclink.deferdq,
					       buf);
		bclink_update_last_sent(node, seqno);
		buf = NULL;
	}

	tipc_bclink_lock(net);

	if (deferred)
		bcl->stats.deferred_recv++;
	else
		bcl->stats.duplicates++;

	tipc_bclink_unlock(net);

unlock:
	tipc_node_unlock(node);
	tipc_node_put(node);
exit:
	kfree_skb(buf);
}

u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr)
{
	return (n_ptr->bclink.recv_permitted &&
		(tipc_bclink_get_last_sent(n_ptr->net) != n_ptr->bclink.acked));
}

/**
 * tipc_bcbearer_send - send a packet through the broadcast pseudo-bearer
 *
 * Send packet over as many bearers as necessary to reach all nodes
 * that have joined the broadcast link.
 *
 * Returns 0 (packet sent successfully) under all circumstances,
 * since the broadcast link's pseudo-bearer never blocks
 */
static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf,
			      struct tipc_bearer *unused1,
			      struct tipc_media_addr *unused2)
{
	int bp_index;
	struct tipc_msg *msg = buf_msg(buf);
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_bcbearer *bcbearer = tn->bcbearer;
	struct tipc_bclink *bclink = tn->bclink;

	/* Prepare broadcast link message for reliable transmission,
	 * if first time trying to send it;
	 * preparation is skipped for broadcast link protocol messages
	 * since they are sent in an unreliable manner and don't need it
	 */
	if (likely(!msg_non_seq(buf_msg(buf)))) {
		bcbuf_set_acks(buf, bclink->bcast_nodes.count);
		msg_set_non_seq(msg, 1);
		msg_set_mc_netid(msg, tn->net_id);
		tn->bcl->stats.sent_info++;
		if (WARN_ON(!bclink->bcast_nodes.count)) {
			dump_stack();
			return 0;
		}
	}

	/* Send buffer over bearers until all targets reached */
	bcbearer->remains = bclink->bcast_nodes;

	for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) {
		struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary;
		struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary;
		struct tipc_bearer *bp[2] = {p, s};
		struct tipc_bearer *b = bp[msg_link_selector(msg)];
		struct sk_buff *tbuf;

		if (!p)
			break; /* No more bearers to try */
		if (!b)
			b = p;
		tipc_nmap_diff(&bcbearer->remains, &b->nodes,
			       &bcbearer->remains_new);
		if (bcbearer->remains_new.count == bcbearer->remains.count)
			continue; /* Nothing added by bearer pair */

		if (bp_index == 0) {
			/* Use original buffer for first bearer */
			tipc_bearer_send(net, b->identity, buf, &b->bcast_addr);
		} else {
			/* Avoid concurrent buffer access */
			tbuf = pskb_copy_for_clone(buf, GFP_ATOMIC);
			if (!tbuf)
				break;
			tipc_bearer_send(net, b->identity, tbuf,
					 &b->bcast_addr);
			kfree_skb(tbuf); /* Bearer keeps a clone */
		}
		if (bcbearer->remains_new.count == 0)
			break; /* All targets reached */

		bcbearer->remains = bcbearer->remains_new;
	}

	return 0;
}

/**
 * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer
 */
void tipc_bcbearer_sort(struct net *net, struct tipc_node_map *nm_ptr,
			u32 node, bool action)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_bcbearer *bcbearer = tn->bcbearer;
	struct tipc_bcbearer_pair *bp_temp = bcbearer->bpairs_temp;
	struct tipc_bcbearer_pair *bp_curr;
	struct tipc_bearer *b;
	int b_index;
	int pri;

	tipc_bclink_lock(net);

	if (action)
		tipc_nmap_add(nm_ptr, node);
	else
		tipc_nmap_remove(nm_ptr, node);

	/* Group bearers by priority (can assume max of two per priority) */
	memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp));

	rcu_read_lock();
	for (b_index = 0; b_index < MAX_BEARERS; b_index++) {
		b = rcu_dereference_rtnl(tn->bearer_list[b_index]);
		if (!b || !b->nodes.count)
			continue;

		if (!bp_temp[b->priority].primary)
			bp_temp[b->priority].primary = b;
		else
			bp_temp[b->priority].secondary = b;
	}
	rcu_read_unlock();

	/* Create array of bearer pairs for broadcasting */
	bp_curr = bcbearer->bpairs;
	memset(bcbearer->bpairs, 0, sizeof(bcbearer->bpairs));

	for (pri = TIPC_MAX_LINK_PRI;
	     pri >= 0; pri--) {

		if (!bp_temp[pri].primary)
			continue;

		bp_curr->primary = bp_temp[pri].primary;

		if (bp_temp[pri].secondary) {
			if (tipc_nmap_equal(&bp_temp[pri].primary->nodes,
					    &bp_temp[pri].secondary->nodes)) {
				bp_curr->secondary = bp_temp[pri].secondary;
			} else {
				bp_curr++;
				bp_curr->primary = bp_temp[pri].secondary;
			}
		}

		bp_curr++;
	}

	tipc_bclink_unlock(net);
}

static int __tipc_nl_add_bc_link_stat(struct sk_buff *skb,
				      struct tipc_stats *stats)
{
	int i;
	struct nlattr *nest;

	struct nla_map {
		__u32 key;
		__u32 val;
	};

	struct nla_map map[] = {
		{TIPC_NLA_STATS_RX_INFO, stats->recv_info},
		{TIPC_NLA_STATS_RX_FRAGMENTS, stats->recv_fragments},
		{TIPC_NLA_STATS_RX_FRAGMENTED, stats->recv_fragmented},
		{TIPC_NLA_STATS_RX_BUNDLES, stats->recv_bundles},
		{TIPC_NLA_STATS_RX_BUNDLED, stats->recv_bundled},
		{TIPC_NLA_STATS_TX_INFO, stats->sent_info},
		{TIPC_NLA_STATS_TX_FRAGMENTS, stats->sent_fragments},
		{TIPC_NLA_STATS_TX_FRAGMENTED, stats->sent_fragmented},
		{TIPC_NLA_STATS_TX_BUNDLES, stats->sent_bundles},
		{TIPC_NLA_STATS_TX_BUNDLED, stats->sent_bundled},
		{TIPC_NLA_STATS_RX_NACKS, stats->recv_nacks},
		{TIPC_NLA_STATS_RX_DEFERRED, stats->deferred_recv},
		{TIPC_NLA_STATS_TX_NACKS, stats->sent_nacks},
		{TIPC_NLA_STATS_TX_ACKS, stats->sent_acks},
		{TIPC_NLA_STATS_RETRANSMITTED, stats->retransmitted},
		{TIPC_NLA_STATS_DUPLICATES, stats->duplicates},
		{TIPC_NLA_STATS_LINK_CONGS, stats->link_congs},
		{TIPC_NLA_STATS_MAX_QUEUE, stats->max_queue_sz},
		{TIPC_NLA_STATS_AVG_QUEUE, stats->queue_sz_counts ?
			(stats->accu_queue_sz / stats->queue_sz_counts) : 0}
	};

	nest = nla_nest_start(skb, TIPC_NLA_LINK_STATS);
	if (!nest)
		return -EMSGSIZE;

	for (i = 0; i < ARRAY_SIZE(map); i++)
		if (nla_put_u32(skb, map[i].key, map[i].val))
			goto msg_full;

	nla_nest_end(skb, nest);

	return 0;
msg_full:
	nla_nest_cancel(skb, nest);

	return -EMSGSIZE;
}

int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg)
{
	int err;
	void *hdr;
	struct nlattr *attrs;
	struct nlattr *prop;
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;

	if (!bcl)
		return 0;

	tipc_bclink_lock(net);

	hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_family,
			  NLM_F_MULTI, TIPC_NL_LINK_GET);
	if (!hdr) {
		/* Release bclink_lock before bailing out */
		tipc_bclink_unlock(net);
		return -EMSGSIZE;
	}

	attrs = nla_nest_start(msg->skb, TIPC_NLA_LINK);
	if (!attrs)
		goto msg_full;

	/* The broadcast link is always up */
	if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP))
		goto attr_msg_full;

	if (nla_put_flag(msg->skb, TIPC_NLA_LINK_BROADCAST))
		goto attr_msg_full;
	if (nla_put_string(msg->skb, TIPC_NLA_LINK_NAME, bcl->name))
		goto attr_msg_full;
	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_RX, bcl->rcv_nxt))
		goto attr_msg_full;
	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_TX, bcl->snd_nxt))
		goto attr_msg_full;

	prop = nla_nest_start(msg->skb, TIPC_NLA_LINK_PROP);
	if (!prop)
		goto attr_msg_full;
	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bcl->window))
		goto prop_msg_full;
	nla_nest_end(msg->skb, prop);

	err = __tipc_nl_add_bc_link_stat(msg->skb, &bcl->stats);
	if (err)
		goto attr_msg_full;

	tipc_bclink_unlock(net);
	nla_nest_end(msg->skb, attrs);
	genlmsg_end(msg->skb,
		    hdr);

	return 0;

prop_msg_full:
	nla_nest_cancel(msg->skb, prop);
attr_msg_full:
	nla_nest_cancel(msg->skb, attrs);
msg_full:
	tipc_bclink_unlock(net);
	genlmsg_cancel(msg->skb, hdr);

	return -EMSGSIZE;
}

int tipc_bclink_reset_stats(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;

	if (!bcl)
		return -ENOPROTOOPT;

	tipc_bclink_lock(net);
	memset(&bcl->stats, 0, sizeof(bcl->stats));
	tipc_bclink_unlock(net);
	return 0;
}

int tipc_bclink_set_queue_limits(struct net *net, u32 limit)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;

	if (!bcl)
		return -ENOPROTOOPT;
	if ((limit < TIPC_MIN_LINK_WIN) || (limit > TIPC_MAX_LINK_WIN))
		return -EINVAL;

	tipc_bclink_lock(net);
	tipc_link_set_queue_limits(bcl, limit);
	tipc_bclink_unlock(net);
	return 0;
}

int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[])
{
	int err;
	u32 win;
	struct nlattr *props[TIPC_NLA_PROP_MAX + 1];

	if (!attrs[TIPC_NLA_LINK_PROP])
		return -EINVAL;

	err = tipc_nl_parse_link_prop(attrs[TIPC_NLA_LINK_PROP], props);
	if (err)
		return err;

	if (!props[TIPC_NLA_PROP_WIN])
		return -EOPNOTSUPP;

	win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);

	return tipc_bclink_set_queue_limits(net, win);
}

int tipc_bclink_init(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_bcbearer *bcbearer;
	struct tipc_bclink *bclink;
	struct tipc_link *bcl;

	bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC);
	if (!bcbearer)
		return -ENOMEM;

	bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC);
	if (!bclink) {
		kfree(bcbearer);
		return -ENOMEM;
	}

	bcl = &bclink->link;
	bcbearer->bearer.media = &bcbearer->media;
	bcbearer->media.send_msg = tipc_bcbearer_send;
	sprintf(bcbearer->media.name, "tipc-broadcast");

	spin_lock_init(&bclink->lock);
	__skb_queue_head_init(&bcl->transmq);
	__skb_queue_head_init(&bcl->backlogq);
	__skb_queue_head_init(&bcl->deferdq);
	skb_queue_head_init(&bcl->wakeupq);
	bcl->snd_nxt = 1;
	spin_lock_init(&bclink->node.lock);
	__skb_queue_head_init(&bclink->arrvq);
	skb_queue_head_init(&bclink->inputq);
	bcl->owner = &bclink->node;
	bcl->owner->net = net;
	bcl->mtu = MAX_PKT_DEFAULT_MCAST;
	tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT);
	bcl->bearer_id = MAX_BEARERS;
	rcu_assign_pointer(tn->bearer_list[MAX_BEARERS], &bcbearer->bearer);
	bcl->pmsg = (struct tipc_msg *)&bcl->proto_msg;
	msg_set_prevnode(bcl->pmsg, tn->own_addr);
	strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME);
	tn->bcbearer = bcbearer;
	tn->bclink = bclink;
	tn->bcl = bcl;
	return 0;
}

void tipc_bclink_stop(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	tipc_bclink_lock(net);
	tipc_link_purge_queues(tn->bcl);
	tipc_bclink_unlock(net);

	RCU_INIT_POINTER(tn->bearer_list[BCBEARER], NULL);
	synchronize_net();
	kfree(tn->bcbearer);
	kfree(tn->bclink);
}

/**
 * tipc_nmap_add - add a node to a node map
 */
static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)
{
	int n = tipc_node(node);
	int w = n / WSIZE;
	u32 mask = (1 << (n % WSIZE));

	if ((nm_ptr->map[w] & mask) == 0) {
		nm_ptr->count++;
		nm_ptr->map[w] |= mask;
	}
}

/**
 * tipc_nmap_remove - remove a node from a node map
 */
static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node)
{
	int n = tipc_node(node);
	int w = n / WSIZE;
	u32 mask = (1 << (n % WSIZE));

	if ((nm_ptr->map[w] & mask) != 0) {
		nm_ptr->map[w] &= ~mask;
		nm_ptr->count--;
	}
}

/**
 * tipc_nmap_diff - find differences between node maps
 * @nm_a: input node map A
 * @nm_b: input node map B
 * @nm_diff: output node map A-B (i.e. nodes of A that are not in B)
 */
static void tipc_nmap_diff(struct tipc_node_map *nm_a,
			   struct tipc_node_map *nm_b,
			   struct tipc_node_map *nm_diff)
{
	int stop = ARRAY_SIZE(nm_a->map);
	int w;
	int b;
	u32 map;

	memset(nm_diff, 0, sizeof(*nm_diff));
	for (w = 0; w < stop; w++) {
		map = nm_a->map[w] ^ (nm_a->map[w] & nm_b->map[w]);
		nm_diff->map[w] = map;
		if (map != 0) {
			for (b = 0 ; b < WSIZE; b++) {
				if (map & (1 << b))
					nm_diff->count++;
			}
		}
	}
}