1 /* 2 * net/tipc/group.c: TIPC group messaging code 3 * 4 * Copyright (c) 2017, Ericsson AB 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 3. Neither the names of the copyright holders nor the names of its 16 * contributors may be used to endorse or promote products derived from 17 * this software without specific prior written permission. 18 * 19 * Alternatively, this software may be distributed under the terms of the 20 * GNU General Public License ("GPL") version 2 as published by the Free 21 * Software Foundation. 22 * 23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 33 * POSSIBILITY OF SUCH DAMAGE. 34 */ 35 36 #include "core.h" 37 #include "addr.h" 38 #include "group.h" 39 #include "bcast.h" 40 #include "topsrv.h" 41 #include "msg.h" 42 #include "socket.h" 43 #include "node.h" 44 #include "name_table.h" 45 #include "subscr.h" 46 47 #define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1) 48 #define ADV_IDLE ADV_UNIT 49 #define ADV_ACTIVE (ADV_UNIT * 12) 50 51 enum mbr_state { 52 MBR_JOINING, 53 MBR_PUBLISHED, 54 MBR_JOINED, 55 MBR_PENDING, 56 MBR_ACTIVE, 57 MBR_RECLAIMING, 58 MBR_REMITTED, 59 MBR_LEAVING 60 }; 61 62 struct tipc_member { 63 struct rb_node tree_node; 64 struct list_head list; 65 struct list_head small_win; 66 struct sk_buff_head deferredq; 67 struct tipc_group *group; 68 u32 node; 69 u32 port; 70 u32 instance; 71 enum mbr_state state; 72 u16 advertised; 73 u16 window; 74 u16 bc_rcv_nxt; 75 u16 bc_syncpt; 76 u16 bc_acked; 77 }; 78 79 struct tipc_group { 80 struct rb_root members; 81 struct list_head small_win; 82 struct list_head pending; 83 struct list_head active; 84 struct tipc_nlist dests; 85 struct net *net; 86 int subid; 87 u32 type; 88 u32 instance; 89 u32 scope; 90 u32 portid; 91 u16 member_cnt; 92 u16 active_cnt; 93 u16 max_active; 94 u16 bc_snd_nxt; 95 u16 bc_ackers; 96 bool *open; 97 bool loopback; 98 bool events; 99 }; 100 101 static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, 102 int mtyp, struct sk_buff_head *xmitq); 103 104 static void tipc_group_open(struct tipc_member *m, bool *wakeup) 105 { 106 *wakeup = false; 107 if (list_empty(&m->small_win)) 108 return; 109 list_del_init(&m->small_win); 110 *m->group->open = true; 111 *wakeup = true; 112 } 113 114 static void tipc_group_decr_active(struct tipc_group *grp, 115 struct tipc_member *m) 116 { 117 if (m->state == MBR_ACTIVE || m->state == MBR_RECLAIMING || 118 m->state == MBR_REMITTED) 119 grp->active_cnt--; 120 } 121 122 static int tipc_group_rcvbuf_limit(struct tipc_group *grp) 123 { 124 int max_active, active_pool, idle_pool; 125 int mcnt = grp->member_cnt + 1; 126 127 /* Limit simultaneous reception from other members */ 128 max_active = min(mcnt / 8, 64); 129 max_active = max(max_active, 16); 130 grp->max_active = max_active; 131 132 /* Reserve blocks for active and idle members */ 133 active_pool = max_active * ADV_ACTIVE; 134 idle_pool = (mcnt - max_active) * ADV_IDLE; 135 136 /* Scale to bytes, considering worst-case truesize/msgsize ratio */ 137 return (active_pool + idle_pool) * FLOWCTL_BLK_SZ * 4; 138 } 139 140 u16 tipc_group_bc_snd_nxt(struct tipc_group *grp) 141 { 142 return grp->bc_snd_nxt; 143 } 144 145 static bool tipc_group_is_receiver(struct tipc_member *m) 146 { 147 return m && m->state != MBR_JOINING && m->state != MBR_LEAVING; 148 } 149 150 static bool tipc_group_is_sender(struct tipc_member *m) 151 { 152 return m && m->state != MBR_JOINING && m->state != MBR_PUBLISHED; 153 } 154 155 u32 tipc_group_exclude(struct tipc_group *grp) 156 { 157 if (!grp->loopback) 158 return grp->portid; 159 return 0; 160 } 161 162 struct tipc_group *tipc_group_create(struct net *net, u32 portid, 163 struct tipc_group_req *mreq, 164 bool *group_is_open) 165 { 166 u32 filter = TIPC_SUB_PORTS | TIPC_SUB_NO_STATUS; 167 bool global = mreq->scope != TIPC_NODE_SCOPE; 168 struct tipc_group *grp; 169 u32 type = mreq->type; 170 171 grp = kzalloc(sizeof(*grp), GFP_ATOMIC); 172 if (!grp) 173 return NULL; 174 tipc_nlist_init(&grp->dests, tipc_own_addr(net)); 175 INIT_LIST_HEAD(&grp->small_win); 176 INIT_LIST_HEAD(&grp->active); 177 INIT_LIST_HEAD(&grp->pending); 178 grp->members = RB_ROOT; 179 grp->net = net; 180 grp->portid = portid; 181 grp->type = type; 182 grp->instance = mreq->instance; 183 grp->scope = mreq->scope; 184 grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK; 185 grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS; 186 grp->open = group_is_open; 187 *grp->open = false; 188 filter |= global ? TIPC_SUB_CLUSTER_SCOPE : TIPC_SUB_NODE_SCOPE; 189 if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, 190 filter, &grp->subid)) 191 return grp; 192 kfree(grp); 193 return NULL; 194 } 195 196 void tipc_group_join(struct net *net, struct tipc_group *grp, int *sk_rcvbuf) 197 { 198 struct rb_root *tree = &grp->members; 199 struct tipc_member *m, *tmp; 200 struct sk_buff_head xmitq; 201 202 __skb_queue_head_init(&xmitq); 203 rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) { 204 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, &xmitq); 205 tipc_group_update_member(m, 0); 206 } 207 tipc_node_distr_xmit(net, &xmitq); 208 *sk_rcvbuf = tipc_group_rcvbuf_limit(grp); 209 } 210 211 void tipc_group_delete(struct net *net, struct tipc_group *grp) 212 { 213 struct rb_root *tree = &grp->members; 214 struct tipc_member *m, *tmp; 215 struct sk_buff_head xmitq; 216 217 __skb_queue_head_init(&xmitq); 218 219 rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) { 220 tipc_group_proto_xmit(grp, m, GRP_LEAVE_MSG, &xmitq); 221 __skb_queue_purge(&m->deferredq); 222 list_del(&m->list); 223 kfree(m); 224 } 225 tipc_node_distr_xmit(net, &xmitq); 226 tipc_nlist_purge(&grp->dests); 227 tipc_topsrv_kern_unsubscr(net, grp->subid); 228 kfree(grp); 229 } 230 231 static struct tipc_member *tipc_group_find_member(struct tipc_group *grp, 232 u32 node, u32 port) 233 { 234 struct rb_node *n = grp->members.rb_node; 235 u64 nkey, key = (u64)node << 32 | port; 236 struct tipc_member *m; 237 238 while (n) { 239 m = container_of(n, struct tipc_member, tree_node); 240 nkey = (u64)m->node << 32 | m->port; 241 if (key < nkey) 242 n = n->rb_left; 243 else if (key > nkey) 244 n = n->rb_right; 245 else 246 return m; 247 } 248 return NULL; 249 } 250 251 static struct tipc_member *tipc_group_find_dest(struct tipc_group *grp, 252 u32 node, u32 port) 253 { 254 struct tipc_member *m; 255 256 m = tipc_group_find_member(grp, node, port); 257 if (m && tipc_group_is_receiver(m)) 258 return m; 259 return NULL; 260 } 261 262 static struct tipc_member *tipc_group_find_node(struct tipc_group *grp, 263 u32 node) 264 { 265 struct tipc_member *m; 266 struct rb_node *n; 267 268 for (n = rb_first(&grp->members); n; n = rb_next(n)) { 269 m = container_of(n, struct tipc_member, tree_node); 270 if (m->node == node) 271 return m; 272 } 273 return NULL; 274 } 275 276 static int tipc_group_add_to_tree(struct tipc_group *grp, 277 struct tipc_member *m) 278 { 279 u64 nkey, key = (u64)m->node << 32 | m->port; 280 struct rb_node **n, *parent = NULL; 281 struct tipc_member *tmp; 282 283 n = &grp->members.rb_node; 284 while (*n) { 285 tmp = container_of(*n, struct tipc_member, tree_node); 286 parent = *n; 287 tmp = container_of(parent, struct tipc_member, tree_node); 288 nkey = (u64)tmp->node << 32 | tmp->port; 289 if (key < nkey) 290 n = &(*n)->rb_left; 291 else if (key > nkey) 292 n = &(*n)->rb_right; 293 else 294 return -EEXIST; 295 } 296 rb_link_node(&m->tree_node, parent, n); 297 rb_insert_color(&m->tree_node, &grp->members); 298 return 0; 299 } 300 301 static struct tipc_member *tipc_group_create_member(struct tipc_group *grp, 302 u32 node, u32 port, 303 u32 instance, int state) 304 { 305 struct tipc_member *m; 306 int ret; 307 308 m = kzalloc(sizeof(*m), GFP_ATOMIC); 309 if (!m) 310 return NULL; 311 INIT_LIST_HEAD(&m->list); 312 INIT_LIST_HEAD(&m->small_win); 313 __skb_queue_head_init(&m->deferredq); 314 m->group = grp; 315 m->node = node; 316 m->port = port; 317 m->instance = instance; 318 m->bc_acked = grp->bc_snd_nxt - 1; 319 ret = tipc_group_add_to_tree(grp, m); 320 if (ret < 0) { 321 kfree(m); 322 return NULL; 323 } 324 grp->member_cnt++; 325 tipc_nlist_add(&grp->dests, m->node); 326 m->state = state; 327 return m; 328 } 329 330 void tipc_group_add_member(struct tipc_group *grp, u32 node, 331 u32 port, u32 instance) 332 { 333 tipc_group_create_member(grp, node, port, instance, MBR_PUBLISHED); 334 } 335 336 static void tipc_group_delete_member(struct tipc_group *grp, 337 struct tipc_member *m) 338 { 339 rb_erase(&m->tree_node, &grp->members); 340 grp->member_cnt--; 341 342 /* Check if we were waiting for replicast ack from this member */ 343 if (grp->bc_ackers && less(m->bc_acked, grp->bc_snd_nxt - 1)) 344 grp->bc_ackers--; 345 346 list_del_init(&m->list); 347 list_del_init(&m->small_win); 348 tipc_group_decr_active(grp, m); 349 350 /* If last member on a node, remove node from dest list */ 351 if (!tipc_group_find_node(grp, m->node)) 352 tipc_nlist_del(&grp->dests, m->node); 353 354 kfree(m); 355 } 356 357 struct tipc_nlist *tipc_group_dests(struct tipc_group *grp) 358 { 359 return &grp->dests; 360 } 361 362 void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq, 363 int *scope) 364 { 365 seq->type = grp->type; 366 seq->lower = grp->instance; 367 seq->upper = grp->instance; 368 *scope = grp->scope; 369 } 370 371 void tipc_group_update_member(struct tipc_member *m, int len) 372 { 373 struct tipc_group *grp = m->group; 374 struct tipc_member *_m, *tmp; 375 376 if (!tipc_group_is_receiver(m)) 377 return; 378 379 m->window -= len; 380 381 if (m->window >= ADV_IDLE) 382 return; 383 384 list_del_init(&m->small_win); 385 386 /* Sort member into small_window members' list */ 387 list_for_each_entry_safe(_m, tmp, &grp->small_win, small_win) { 388 if (_m->window > m->window) 389 break; 390 } 391 list_add_tail(&m->small_win, &_m->small_win); 392 } 393 394 void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack) 395 { 396 u16 prev = grp->bc_snd_nxt - 1; 397 struct tipc_member *m; 398 struct rb_node *n; 399 u16 ackers = 0; 400 401 for (n = rb_first(&grp->members); n; n = rb_next(n)) { 402 m = container_of(n, struct tipc_member, tree_node); 403 if (tipc_group_is_receiver(m)) { 404 tipc_group_update_member(m, len); 405 m->bc_acked = prev; 406 ackers++; 407 } 408 } 409 410 /* Mark number of acknowledges to expect, if any */ 411 if (ack) 412 grp->bc_ackers = ackers; 413 grp->bc_snd_nxt++; 414 } 415 416 bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport, 417 int len, struct tipc_member **mbr) 418 { 419 struct sk_buff_head xmitq; 420 struct tipc_member *m; 421 int adv, state; 422 423 m = tipc_group_find_dest(grp, dnode, dport); 424 if (!tipc_group_is_receiver(m)) { 425 *mbr = NULL; 426 return false; 427 } 428 *mbr = m; 429 430 if (m->window >= len) 431 return false; 432 433 *grp->open = false; 434 435 /* If not fully advertised, do it now to prevent mutual blocking */ 436 adv = m->advertised; 437 state = m->state; 438 if (state == MBR_JOINED && adv == ADV_IDLE) 439 return true; 440 if (state == MBR_ACTIVE && adv == ADV_ACTIVE) 441 return true; 442 if (state == MBR_PENDING && adv == ADV_IDLE) 443 return true; 444 __skb_queue_head_init(&xmitq); 445 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq); 446 tipc_node_distr_xmit(grp->net, &xmitq); 447 return true; 448 } 449 450 bool tipc_group_bc_cong(struct tipc_group *grp, int len) 451 { 452 struct tipc_member *m = NULL; 453 454 /* If prev bcast was replicast, reject until all receivers have acked */ 455 if (grp->bc_ackers) { 456 *grp->open = false; 457 return true; 458 } 459 if (list_empty(&grp->small_win)) 460 return false; 461 462 m = list_first_entry(&grp->small_win, struct tipc_member, small_win); 463 if (m->window >= len) 464 return false; 465 466 return tipc_group_cong(grp, m->node, m->port, len, &m); 467 } 468 469 /* tipc_group_sort_msg() - sort msg into queue by bcast sequence number 470 */ 471 static void tipc_group_sort_msg(struct sk_buff *skb, struct sk_buff_head *defq) 472 { 473 struct tipc_msg *_hdr, *hdr = buf_msg(skb); 474 u16 bc_seqno = msg_grp_bc_seqno(hdr); 475 struct sk_buff *_skb, *tmp; 476 int mtyp = msg_type(hdr); 477 478 /* Bcast/mcast may be bypassed by ucast or other bcast, - sort it in */ 479 if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) { 480 skb_queue_walk_safe(defq, _skb, tmp) { 481 _hdr = buf_msg(_skb); 482 if (!less(bc_seqno, msg_grp_bc_seqno(_hdr))) 483 continue; 484 __skb_queue_before(defq, _skb, skb); 485 return; 486 } 487 /* Bcast was not bypassed, - add to tail */ 488 } 489 /* Unicasts are never bypassed, - always add to tail */ 490 __skb_queue_tail(defq, skb); 491 } 492 493 /* tipc_group_filter_msg() - determine if we should accept arriving message 494 */ 495 void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, 496 struct sk_buff_head *xmitq) 497 { 498 struct sk_buff *skb = __skb_dequeue(inputq); 499 bool ack, deliver, update, leave = false; 500 struct sk_buff_head *defq; 501 struct tipc_member *m; 502 struct tipc_msg *hdr; 503 u32 node, port; 504 int mtyp, blks; 505 506 if (!skb) 507 return; 508 509 hdr = buf_msg(skb); 510 node = msg_orignode(hdr); 511 port = msg_origport(hdr); 512 513 if (!msg_in_group(hdr)) 514 goto drop; 515 516 m = tipc_group_find_member(grp, node, port); 517 if (!tipc_group_is_sender(m)) 518 goto drop; 519 520 if (less(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt)) 521 goto drop; 522 523 TIPC_SKB_CB(skb)->orig_member = m->instance; 524 defq = &m->deferredq; 525 tipc_group_sort_msg(skb, defq); 526 527 while ((skb = skb_peek(defq))) { 528 hdr = buf_msg(skb); 529 mtyp = msg_type(hdr); 530 blks = msg_blocks(hdr); 531 deliver = true; 532 ack = false; 533 update = false; 534 535 if (more(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt)) 536 break; 537 538 /* Decide what to do with message */ 539 switch (mtyp) { 540 case TIPC_GRP_MCAST_MSG: 541 if (msg_nameinst(hdr) != grp->instance) { 542 update = true; 543 deliver = false; 544 } 545 fallthrough; 546 case TIPC_GRP_BCAST_MSG: 547 m->bc_rcv_nxt++; 548 ack = msg_grp_bc_ack_req(hdr); 549 break; 550 case TIPC_GRP_UCAST_MSG: 551 break; 552 case TIPC_GRP_MEMBER_EVT: 553 if (m->state == MBR_LEAVING) 554 leave = true; 555 if (!grp->events) 556 deliver = false; 557 break; 558 default: 559 break; 560 } 561 562 /* Execute decisions */ 563 __skb_dequeue(defq); 564 if (deliver) 565 __skb_queue_tail(inputq, skb); 566 else 567 kfree_skb(skb); 568 569 if (ack) 570 tipc_group_proto_xmit(grp, m, GRP_ACK_MSG, xmitq); 571 572 if (leave) { 573 __skb_queue_purge(defq); 574 tipc_group_delete_member(grp, m); 575 break; 576 } 577 if (!update) 578 continue; 579 580 tipc_group_update_rcv_win(grp, blks, node, port, xmitq); 581 } 582 return; 583 drop: 584 kfree_skb(skb); 585 } 586 587 void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, 588 u32 port, struct sk_buff_head *xmitq) 589 { 590 struct list_head *active = &grp->active; 591 int max_active = grp->max_active; 592 int reclaim_limit = max_active * 3 / 4; 593 int active_cnt = grp->active_cnt; 594 struct tipc_member *m, *rm, *pm; 595 596 m = tipc_group_find_member(grp, node, port); 597 if (!m) 598 return; 599 600 m->advertised -= blks; 601 602 switch (m->state) { 603 case MBR_JOINED: 604 /* First, decide if member can go active */ 605 if (active_cnt <= max_active) { 606 m->state = MBR_ACTIVE; 607 list_add_tail(&m->list, active); 608 grp->active_cnt++; 609 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 610 } else { 611 m->state = MBR_PENDING; 612 list_add_tail(&m->list, &grp->pending); 613 } 614 615 if (active_cnt < reclaim_limit) 616 break; 617 618 /* Reclaim from oldest active member, if possible */ 619 if (!list_empty(active)) { 620 rm = list_first_entry(active, struct tipc_member, list); 621 rm->state = MBR_RECLAIMING; 622 list_del_init(&rm->list); 623 tipc_group_proto_xmit(grp, rm, GRP_RECLAIM_MSG, xmitq); 624 break; 625 } 626 /* Nobody to reclaim from; - revert oldest pending to JOINED */ 627 pm = list_first_entry(&grp->pending, struct tipc_member, list); 628 list_del_init(&pm->list); 629 pm->state = MBR_JOINED; 630 tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); 631 break; 632 case MBR_ACTIVE: 633 if (!list_is_last(&m->list, &grp->active)) 634 list_move_tail(&m->list, &grp->active); 635 if (m->advertised > (ADV_ACTIVE * 3 / 4)) 636 break; 637 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 638 break; 639 case MBR_REMITTED: 640 if (m->advertised > ADV_IDLE) 641 break; 642 m->state = MBR_JOINED; 643 grp->active_cnt--; 644 if (m->advertised < ADV_IDLE) { 645 pr_warn_ratelimited("Rcv unexpected msg after REMIT\n"); 646 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 647 } 648 649 if (list_empty(&grp->pending)) 650 return; 651 652 /* Set oldest pending member to active and advertise */ 653 pm = list_first_entry(&grp->pending, struct tipc_member, list); 654 pm->state = MBR_ACTIVE; 655 list_move_tail(&pm->list, &grp->active); 656 grp->active_cnt++; 657 tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); 658 break; 659 case MBR_RECLAIMING: 660 case MBR_JOINING: 661 case MBR_LEAVING: 662 default: 663 break; 664 } 665 } 666 667 static void tipc_group_create_event(struct tipc_group *grp, 668 struct tipc_member *m, 669 u32 event, u16 seqno, 670 struct sk_buff_head *inputq) 671 { u32 dnode = tipc_own_addr(grp->net); 672 struct tipc_event evt; 673 struct sk_buff *skb; 674 struct tipc_msg *hdr; 675 676 memset(&evt, 0, sizeof(evt)); 677 evt.event = event; 678 evt.found_lower = m->instance; 679 evt.found_upper = m->instance; 680 evt.port.ref = m->port; 681 evt.port.node = m->node; 682 evt.s.seq.type = grp->type; 683 evt.s.seq.lower = m->instance; 684 evt.s.seq.upper = m->instance; 685 686 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_GRP_MEMBER_EVT, 687 GROUP_H_SIZE, sizeof(evt), dnode, m->node, 688 grp->portid, m->port, 0); 689 if (!skb) 690 return; 691 692 hdr = buf_msg(skb); 693 msg_set_nametype(hdr, grp->type); 694 msg_set_grp_evt(hdr, event); 695 msg_set_dest_droppable(hdr, true); 696 msg_set_grp_bc_seqno(hdr, seqno); 697 memcpy(msg_data(hdr), &evt, sizeof(evt)); 698 TIPC_SKB_CB(skb)->orig_member = m->instance; 699 __skb_queue_tail(inputq, skb); 700 } 701 702 static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, 703 int mtyp, struct sk_buff_head *xmitq) 704 { 705 struct tipc_msg *hdr; 706 struct sk_buff *skb; 707 int adv = 0; 708 709 skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0, 710 m->node, tipc_own_addr(grp->net), 711 m->port, grp->portid, 0); 712 if (!skb) 713 return; 714 715 if (m->state == MBR_ACTIVE) 716 adv = ADV_ACTIVE - m->advertised; 717 else if (m->state == MBR_JOINED || m->state == MBR_PENDING) 718 adv = ADV_IDLE - m->advertised; 719 720 hdr = buf_msg(skb); 721 722 if (mtyp == GRP_JOIN_MSG) { 723 msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); 724 msg_set_adv_win(hdr, adv); 725 m->advertised += adv; 726 } else if (mtyp == GRP_LEAVE_MSG) { 727 msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); 728 } else if (mtyp == GRP_ADV_MSG) { 729 msg_set_adv_win(hdr, adv); 730 m->advertised += adv; 731 } else if (mtyp == GRP_ACK_MSG) { 732 msg_set_grp_bc_acked(hdr, m->bc_rcv_nxt); 733 } else if (mtyp == GRP_REMIT_MSG) { 734 msg_set_grp_remitted(hdr, m->window); 735 } 736 msg_set_dest_droppable(hdr, true); 737 __skb_queue_tail(xmitq, skb); 738 } 739 740 void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, 741 struct tipc_msg *hdr, struct sk_buff_head *inputq, 742 struct sk_buff_head *xmitq) 743 { 744 u32 node = msg_orignode(hdr); 745 u32 port = msg_origport(hdr); 746 struct tipc_member *m, *pm; 747 u16 remitted, in_flight; 748 749 if (!grp) 750 return; 751 752 if (grp->scope == TIPC_NODE_SCOPE && node != tipc_own_addr(grp->net)) 753 return; 754 755 m = tipc_group_find_member(grp, node, port); 756 757 switch (msg_type(hdr)) { 758 case GRP_JOIN_MSG: 759 if (!m) 760 m = tipc_group_create_member(grp, node, port, 761 0, MBR_JOINING); 762 if (!m) 763 return; 764 m->bc_syncpt = msg_grp_bc_syncpt(hdr); 765 m->bc_rcv_nxt = m->bc_syncpt; 766 m->window += msg_adv_win(hdr); 767 768 /* Wait until PUBLISH event is received if necessary */ 769 if (m->state != MBR_PUBLISHED) 770 return; 771 772 /* Member can be taken into service */ 773 m->state = MBR_JOINED; 774 tipc_group_open(m, usr_wakeup); 775 tipc_group_update_member(m, 0); 776 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 777 tipc_group_create_event(grp, m, TIPC_PUBLISHED, 778 m->bc_syncpt, inputq); 779 return; 780 case GRP_LEAVE_MSG: 781 if (!m) 782 return; 783 m->bc_syncpt = msg_grp_bc_syncpt(hdr); 784 list_del_init(&m->list); 785 tipc_group_open(m, usr_wakeup); 786 tipc_group_decr_active(grp, m); 787 m->state = MBR_LEAVING; 788 tipc_group_create_event(grp, m, TIPC_WITHDRAWN, 789 m->bc_syncpt, inputq); 790 return; 791 case GRP_ADV_MSG: 792 if (!m) 793 return; 794 m->window += msg_adv_win(hdr); 795 tipc_group_open(m, usr_wakeup); 796 return; 797 case GRP_ACK_MSG: 798 if (!m) 799 return; 800 m->bc_acked = msg_grp_bc_acked(hdr); 801 if (--grp->bc_ackers) 802 return; 803 list_del_init(&m->small_win); 804 *m->group->open = true; 805 *usr_wakeup = true; 806 tipc_group_update_member(m, 0); 807 return; 808 case GRP_RECLAIM_MSG: 809 if (!m) 810 return; 811 tipc_group_proto_xmit(grp, m, GRP_REMIT_MSG, xmitq); 812 m->window = ADV_IDLE; 813 tipc_group_open(m, usr_wakeup); 814 return; 815 case GRP_REMIT_MSG: 816 if (!m || m->state != MBR_RECLAIMING) 817 return; 818 819 remitted = msg_grp_remitted(hdr); 820 821 /* Messages preceding the REMIT still in receive queue */ 822 if (m->advertised > remitted) { 823 m->state = MBR_REMITTED; 824 in_flight = m->advertised - remitted; 825 m->advertised = ADV_IDLE + in_flight; 826 return; 827 } 828 /* This should never happen */ 829 if (m->advertised < remitted) 830 pr_warn_ratelimited("Unexpected REMIT msg\n"); 831 832 /* All messages preceding the REMIT have been read */ 833 m->state = MBR_JOINED; 834 grp->active_cnt--; 835 m->advertised = ADV_IDLE; 836 837 /* Set oldest pending member to active and advertise */ 838 if (list_empty(&grp->pending)) 839 return; 840 pm = list_first_entry(&grp->pending, struct tipc_member, list); 841 pm->state = MBR_ACTIVE; 842 list_move_tail(&pm->list, &grp->active); 843 grp->active_cnt++; 844 if (pm->advertised <= (ADV_ACTIVE * 3 / 4)) 845 tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); 846 return; 847 default: 848 pr_warn("Received unknown GROUP_PROTO message\n"); 849 } 850 } 851 852 /* tipc_group_member_evt() - receive and handle a member up/down event 853 */ 854 void tipc_group_member_evt(struct tipc_group *grp, 855 bool *usr_wakeup, 856 int *sk_rcvbuf, 857 struct tipc_msg *hdr, 858 struct sk_buff_head *inputq, 859 struct sk_buff_head *xmitq) 860 { 861 struct tipc_event *evt = (void *)msg_data(hdr); 862 u32 instance = evt->found_lower; 863 u32 node = evt->port.node; 864 u32 port = evt->port.ref; 865 int event = evt->event; 866 struct tipc_member *m; 867 struct net *net; 868 u32 self; 869 870 if (!grp) 871 return; 872 873 net = grp->net; 874 self = tipc_own_addr(net); 875 if (!grp->loopback && node == self && port == grp->portid) 876 return; 877 878 m = tipc_group_find_member(grp, node, port); 879 880 switch (event) { 881 case TIPC_PUBLISHED: 882 /* Send and wait for arrival of JOIN message if necessary */ 883 if (!m) { 884 m = tipc_group_create_member(grp, node, port, instance, 885 MBR_PUBLISHED); 886 if (!m) 887 break; 888 tipc_group_update_member(m, 0); 889 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); 890 break; 891 } 892 893 if (m->state != MBR_JOINING) 894 break; 895 896 /* Member can be taken into service */ 897 m->instance = instance; 898 m->state = MBR_JOINED; 899 tipc_group_open(m, usr_wakeup); 900 tipc_group_update_member(m, 0); 901 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); 902 tipc_group_create_event(grp, m, TIPC_PUBLISHED, 903 m->bc_syncpt, inputq); 904 break; 905 case TIPC_WITHDRAWN: 906 if (!m) 907 break; 908 909 tipc_group_decr_active(grp, m); 910 m->state = MBR_LEAVING; 911 list_del_init(&m->list); 912 tipc_group_open(m, usr_wakeup); 913 914 /* Only send event if no LEAVE message can be expected */ 915 if (!tipc_node_is_up(net, node)) 916 tipc_group_create_event(grp, m, TIPC_WITHDRAWN, 917 m->bc_rcv_nxt, inputq); 918 break; 919 default: 920 break; 921 } 922 *sk_rcvbuf = tipc_group_rcvbuf_limit(grp); 923 } 924 925 int tipc_group_fill_sock_diag(struct tipc_group *grp, struct sk_buff *skb) 926 { 927 struct nlattr *group = nla_nest_start_noflag(skb, TIPC_NLA_SOCK_GROUP); 928 929 if (!group) 930 return -EMSGSIZE; 931 932 if (nla_put_u32(skb, TIPC_NLA_SOCK_GROUP_ID, 933 grp->type) || 934 nla_put_u32(skb, TIPC_NLA_SOCK_GROUP_INSTANCE, 935 grp->instance) || 936 nla_put_u32(skb, TIPC_NLA_SOCK_GROUP_BC_SEND_NEXT, 937 grp->bc_snd_nxt)) 938 goto group_msg_cancel; 939 940 if (grp->scope == TIPC_NODE_SCOPE) 941 if (nla_put_flag(skb, TIPC_NLA_SOCK_GROUP_NODE_SCOPE)) 942 goto group_msg_cancel; 943 944 if (grp->scope == TIPC_CLUSTER_SCOPE) 945 if (nla_put_flag(skb, TIPC_NLA_SOCK_GROUP_CLUSTER_SCOPE)) 946 goto group_msg_cancel; 947 948 if (*grp->open) 949 if (nla_put_flag(skb, TIPC_NLA_SOCK_GROUP_OPEN)) 950 goto group_msg_cancel; 951 952 nla_nest_end(skb, group); 953 return 0; 954 955 group_msg_cancel: 956 nla_nest_cancel(skb, group); 957 return -1; 958 } 959