1 /* 2 * net/tipc/group.c: TIPC group messaging code 3 * 4 * Copyright (c) 2017, Ericsson AB 5 * Copyright (c) 2020, Red Hat Inc 6 * All rights reserved. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 3. Neither the names of the copyright holders nor the names of its 17 * contributors may be used to endorse or promote products derived from 18 * this software without specific prior written permission. 19 * 20 * Alternatively, this software may be distributed under the terms of the 21 * GNU General Public License ("GPL") version 2 as published by the Free 22 * Software Foundation. 23 * 24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 34 * POSSIBILITY OF SUCH DAMAGE. 35 */ 36 37 #include "core.h" 38 #include "addr.h" 39 #include "group.h" 40 #include "bcast.h" 41 #include "topsrv.h" 42 #include "msg.h" 43 #include "socket.h" 44 #include "node.h" 45 #include "name_table.h" 46 #include "subscr.h" 47 48 #define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1) 49 #define ADV_IDLE ADV_UNIT 50 #define ADV_ACTIVE (ADV_UNIT * 12) 51 52 enum mbr_state { 53 MBR_JOINING, 54 MBR_PUBLISHED, 55 MBR_JOINED, 56 MBR_PENDING, 57 MBR_ACTIVE, 58 MBR_RECLAIMING, 59 MBR_REMITTED, 60 MBR_LEAVING 61 }; 62 63 struct tipc_member { 64 struct rb_node tree_node; 65 struct list_head list; 66 struct list_head small_win; 67 struct sk_buff_head deferredq; 68 struct tipc_group *group; 69 u32 node; 70 u32 port; 71 u32 instance; 72 enum mbr_state state; 73 u16 advertised; 74 u16 window; 75 u16 bc_rcv_nxt; 76 u16 bc_syncpt; 77 u16 bc_acked; 78 }; 79 80 struct tipc_group { 81 struct rb_root members; 82 struct list_head small_win; 83 struct list_head pending; 84 struct list_head active; 85 struct tipc_nlist dests; 86 struct net *net; 87 int subid; 88 u32 type; 89 u32 instance; 90 u32 scope; 91 u32 portid; 92 u16 member_cnt; 93 u16 active_cnt; 94 u16 max_active; 95 u16 bc_snd_nxt; 96 u16 bc_ackers; 97 bool *open; 98 bool loopback; 99 bool events; 100 }; 101 102 static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, 103 int mtyp, struct sk_buff_head *xmitq); 104 105 static void tipc_group_open(struct tipc_member *m, bool *wakeup) 106 { 107 *wakeup = false; 108 if (list_empty(&m->small_win)) 109 return; 110 list_del_init(&m->small_win); 111 *m->group->open = true; 112 *wakeup = true; 113 } 114 115 static void tipc_group_decr_active(struct tipc_group *grp, 116 struct tipc_member *m) 117 { 118 if (m->state == MBR_ACTIVE || m->state == MBR_RECLAIMING || 119 m->state == MBR_REMITTED) 120 grp->active_cnt--; 121 } 122 123 static int tipc_group_rcvbuf_limit(struct tipc_group *grp) 124 { 125 int max_active, active_pool, idle_pool; 126 int mcnt = grp->member_cnt + 1; 127 128 /* Limit simultaneous reception from other members */ 129 max_active = min(mcnt / 8, 64); 130 max_active = max(max_active, 16); 131 grp->max_active = max_active; 132 133 /* Reserve blocks for active and idle members */ 134 active_pool = max_active * ADV_ACTIVE; 135 idle_pool = (mcnt - max_active) * ADV_IDLE; 136 137 /* Scale to bytes, considering worst-case truesize/msgsize ratio */ 138 return (active_pool + idle_pool) * FLOWCTL_BLK_SZ * 4; 139 } 140 141 u16 tipc_group_bc_snd_nxt(struct tipc_group *grp) 142 { 143 return grp->bc_snd_nxt; 144 } 145 146 static bool tipc_group_is_receiver(struct tipc_member *m) 147 { 148 return m && m->state != MBR_JOINING && m->state != MBR_LEAVING; 149 } 150 151 static bool tipc_group_is_sender(struct tipc_member *m) 152 { 153 return m && m->state != MBR_JOINING && m->state != MBR_PUBLISHED; 154 } 155 156 u32 tipc_group_exclude(struct tipc_group *grp) 157 { 158 if (!grp->loopback) 159 return grp->portid; 160 return 0; 161 } 162 163 struct tipc_group *tipc_group_create(struct net *net, u32 portid, 164 struct tipc_group_req *mreq, 165 bool *group_is_open) 166 { 167 u32 filter = TIPC_SUB_PORTS | TIPC_SUB_NO_STATUS; 168 bool global = mreq->scope != TIPC_NODE_SCOPE; 169 struct tipc_group *grp; 170 u32 type = mreq->type; 171 172 grp = kzalloc(sizeof(*grp), GFP_ATOMIC); 173 if (!grp) 174 return NULL; 175 tipc_nlist_init(&grp->dests, tipc_own_addr(net)); 176 INIT_LIST_HEAD(&grp->small_win); 177 INIT_LIST_HEAD(&grp->active); 178 INIT_LIST_HEAD(&grp->pending); 179 grp->members = RB_ROOT; 180 grp->net = net; 181 grp->portid = portid; 182 grp->type = type; 183 grp->instance = mreq->instance; 184 grp->scope = mreq->scope; 185 grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK; 186 grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS; 187 grp->open = group_is_open; 188 *grp->open = false; 189 filter |= global ? TIPC_SUB_CLUSTER_SCOPE : TIPC_SUB_NODE_SCOPE; 190 if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, 191 filter, &grp->subid)) 192 return grp; 193 kfree(grp); 194 return NULL; 195 } 196 197 void tipc_group_join(struct net *net, struct tipc_group *grp, int *sk_rcvbuf) 198 { 199 struct rb_root *tree = &grp->members; 200 struct tipc_member *m, *tmp; 201 struct sk_buff_head xmitq; 202 203 __skb_queue_head_init(&xmitq); 204 rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) { 205 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, &xmitq); 206 tipc_group_update_member(m, 0); 207 } 208 tipc_node_distr_xmit(net, &xmitq); 209 *sk_rcvbuf = tipc_group_rcvbuf_limit(grp); 210 } 211 212 void tipc_group_delete(struct net *net, struct tipc_group *grp) 213 { 214 struct rb_root *tree = &grp->members; 215 struct tipc_member *m, *tmp; 216 struct sk_buff_head xmitq; 217 218 __skb_queue_head_init(&xmitq); 219 220 rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) { 221 tipc_group_proto_xmit(grp, m, GRP_LEAVE_MSG, &xmitq); 222 __skb_queue_purge(&m->deferredq); 223 list_del(&m->list); 224 kfree(m); 225 } 226 tipc_node_distr_xmit(net, &xmitq); 227 tipc_nlist_purge(&grp->dests); 228 tipc_topsrv_kern_unsubscr(net, grp->subid); 229 kfree(grp); 230 } 231 232 static struct tipc_member *tipc_group_find_member(struct tipc_group *grp, 233 u32 node, u32 port) 234 { 235 struct rb_node *n = grp->members.rb_node; 236 u64 nkey, key = (u64)node << 32 | port; 237 struct tipc_member *m; 238 239 while (n) { 240 m = container_of(n, struct tipc_member, tree_node); 241 nkey = (u64)m->node << 32 | m->port; 242 if (key < nkey) 243 n = n->rb_left; 244 else if (key > nkey) 245 n = n->rb_right; 246 else 247 return m; 248 } 249 return NULL; 250 } 251 252 static struct tipc_member *tipc_group_find_dest(struct tipc_group *grp, 253 u32 node, u32 port) 254 { 255 struct tipc_member *m; 256 257 m = tipc_group_find_member(grp, node, port); 258 if (m && tipc_group_is_receiver(m)) 259 return m; 260 return NULL; 261 } 262 263 static struct tipc_member *tipc_group_find_node(struct tipc_group *grp, 264 u32 node) 265 { 266 struct tipc_member *m; 267 struct rb_node *n; 268 269 for (n = rb_first(&grp->members); n; n = rb_next(n)) { 270 m = container_of(n, struct tipc_member, tree_node); 271 if (m->node == node) 272 return m; 273 } 274 return NULL; 275 } 276 277 static int tipc_group_add_to_tree(struct tipc_group *grp, 278 struct tipc_member *m) 279 { 280 u64 nkey, key = (u64)m->node << 32 | m->port; 281 struct rb_node **n, *parent = NULL; 282 struct tipc_member *tmp; 283 284 n = &grp->members.rb_node; 285 while (*n) { 286 tmp = container_of(*n, struct tipc_member, tree_node); 287 parent = *n; 288 tmp = container_of(parent, struct tipc_member, tree_node); 289 nkey = (u64)tmp->node << 32 | tmp->port; 290 if (key < nkey) 291 n = &(*n)->rb_left; 292 else if (key > nkey) 293 n = &(*n)->rb_right; 294 else 295 return -EEXIST; 296 } 297 rb_link_node(&m->tree_node, parent, n); 298 rb_insert_color(&m->tree_node, &grp->members); 299 return 0; 300 } 301 302 static struct tipc_member *tipc_group_create_member(struct tipc_group *grp, 303 u32 node, u32 port, 304 u32 instance, int state) 305 { 306 struct tipc_member *m; 307 int ret; 308 309 m = kzalloc(sizeof(*m), GFP_ATOMIC); 310 if (!m) 311 return NULL; 312 INIT_LIST_HEAD(&m->list); 313 INIT_LIST_HEAD(&m->small_win); 314 __skb_queue_head_init(&m->deferredq); 315 m->group = grp; 316 m->node = node; 317 m->port = port; 318 m->instance = instance; 319 m->bc_acked = grp->bc_snd_nxt - 1; 320 ret = tipc_group_add_to_tree(grp, m); 321 if (ret < 0) { 322 kfree(m); 323 return NULL; 324 } 325 grp->member_cnt++; 326 tipc_nlist_add(&grp->dests, m->node); 327 m->state = state; 328 return m; 329 } 330 331 void tipc_group_add_member(struct tipc_group *grp, u32 node, 332 u32 port, u32 instance) 333 { 334 tipc_group_create_member(grp, node, port, instance, MBR_PUBLISHED); 335 } 336 337 static void tipc_group_delete_member(struct tipc_group *grp, 338 struct tipc_member *m) 339 { 340 rb_erase(&m->tree_node, &grp->members); 341 grp->member_cnt--; 342 343 /* Check if we were waiting for replicast ack from this member */ 344 if (grp->bc_ackers && less(m->bc_acked, grp->bc_snd_nxt - 1)) 345 grp->bc_ackers--; 346 347 list_del_init(&m->list); 348 list_del_init(&m->small_win); 349 tipc_group_decr_active(grp, m); 350 351 /* If last member on a node, remove node from dest list */ 352 if (!tipc_group_find_node(grp, m->node)) 353 tipc_nlist_del(&grp->dests, m->node); 354 355 kfree(m); 356 } 357 358 struct tipc_nlist *tipc_group_dests(struct tipc_group *grp) 359 { 360 return &grp->dests; 361 } 362 363 void tipc_group_self(struct tipc_group *grp, struct tipc_service_range *seq, 364 int *scope) 365 { 366 seq->type = grp->type; 367 seq->lower = grp->instance; 368 seq->upper = grp->instance; 369 *scope = grp->scope; 370 } 371 372 void tipc_group_update_member(struct tipc_member *m, int len) 373 { 374 struct tipc_group *grp = m->group; 375 struct tipc_member *_m, *tmp; 376 377 if (!tipc_group_is_receiver(m)) 378 return; 379 380 m->window -= len; 381 382 if (m->window >= ADV_IDLE) 383 return; 384 385 list_del_init(&m->small_win); 386 387 /* Sort member into small_window members' list */ 388 list_for_each_entry_safe(_m, tmp, &grp->small_win, small_win) { 389 if (_m->window > m->window) 390 break; 391 } 392 list_add_tail(&m->small_win, &_m->small_win); 393 } 394 395 void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack) 396 { 397 u16 prev = grp->bc_snd_nxt - 1; 398 struct tipc_member *m; 399 struct rb_node *n; 400 u16 ackers = 0; 401 402 for (n = rb_first(&grp->members); n; n = rb_next(n)) { 403 m = container_of(n, struct tipc_member, tree_node); 404 if (tipc_group_is_receiver(m)) { 405 tipc_group_update_member(m, len); 406 m->bc_acked = prev; 407 ackers++; 408 } 409 } 410 411 /* Mark number of acknowledges to expect, if any */ 412 if (ack) 413 grp->bc_ackers = ackers; 414 grp->bc_snd_nxt++; 415 } 416 417 bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport, 418 int len, struct tipc_member **mbr) 419 { 420 struct sk_buff_head xmitq; 421 struct tipc_member *m; 422 int adv, state; 423 424 m = tipc_group_find_dest(grp, dnode, dport); 425 if (!tipc_group_is_receiver(m)) { 426 *mbr = NULL; 427 return false; 428 } 429 *mbr = m; 430 431 if (m->window >= len) 432 return false; 433 434 *grp->open = false; 435 436 /* If not fully advertised, do it now to prevent mutual blocking */ 437 adv = m->advertised; 438 state = m->state; 439 if (state == MBR_JOINED && adv == ADV_IDLE) 440 return true; 441 if (state == MBR_ACTIVE && adv == ADV_ACTIVE) 442 return true; 443 if (state == MBR_PENDING && adv == ADV_IDLE) 444 return true; 445 __skb_queue_head_init(&xmitq); 446 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq); 447 tipc_node_distr_xmit(grp->net, &xmitq); 448 return true; 449 } 450 451 bool tipc_group_bc_cong(struct tipc_group *grp, int len) 452 { 453 struct tipc_member *m = NULL; 454 455 /* If prev bcast was replicast, reject until all receivers have acked */ 456 if (grp->bc_ackers) { 457 *grp->open = false; 458 return true; 459 } 460 if (list_empty(&grp->small_win)) 461 return false; 462 463 m = list_first_entry(&grp->small_win, struct tipc_member, small_win); 464 if (m->window >= len) 465 return false; 466 467 return tipc_group_cong(grp, m->node, m->port, len, &m); 468 } 469 470 /* tipc_group_sort_msg() - sort msg into queue by bcast sequence number 471 */ 472 static void tipc_group_sort_msg(struct sk_buff *skb, struct sk_buff_head *defq) 473 { 474 struct tipc_msg *_hdr, *hdr = buf_msg(skb); 475 u16 bc_seqno = msg_grp_bc_seqno(hdr); 476 struct sk_buff *_skb, *tmp; 477 int mtyp = msg_type(hdr); 478 479 /* Bcast/mcast may be bypassed by ucast or other bcast, - sort it in */ 480 if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) { 481 skb_queue_walk_safe(defq, _skb, tmp) { 482 _hdr = buf_msg(_skb); 483 if (!less(bc_seqno, msg_grp_bc_seqno(_hdr))) 484 continue; 485 __skb_queue_before(defq, _skb, skb); 486 return; 487 } 488 /* Bcast was not bypassed, - add to tail */ 489 } 490 /* Unicasts are never bypassed, - always add to tail */ 491 __skb_queue_tail(defq, skb); 492 } 493 494 /* tipc_group_filter_msg() - determine if we should accept arriving message 495 */ 496 void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, 497 struct sk_buff_head *xmitq) 498 { 499 struct sk_buff *skb = __skb_dequeue(inputq); 500 bool ack, deliver, update, leave = false; 501 struct sk_buff_head *defq; 502 struct tipc_member *m; 503 struct tipc_msg *hdr; 504 u32 node, port; 505 int mtyp, blks; 506 507 if (!skb) 508 return; 509 510 hdr = buf_msg(skb); 511 node = msg_orignode(hdr); 512 port = msg_origport(hdr); 513 514 if (!msg_in_group(hdr)) 515 goto drop; 516 517 m = tipc_group_find_member(grp, node, port); 518 if (!tipc_group_is_sender(m)) 519 goto drop; 520 521 if (less(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt)) 522 goto drop; 523 524 TIPC_SKB_CB(skb)->orig_member = m->instance; 525 defq = &m->deferredq; 526 tipc_group_sort_msg(skb, defq); 527 528 while ((skb = skb_peek(defq))) { 529 hdr = buf_msg(skb); 530 mtyp = msg_type(hdr); 531 blks = msg_blocks(hdr); 532 deliver = true; 533 ack = false; 534 update = false; 535 536 if (more(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt)) 537 break; 538 539 /* Decide what to do with message */ 540 switch (mtyp) { 541 case TIPC_GRP_MCAST_MSG: 542 if (msg_nameinst(hdr) != grp->instance) { 543 update = true; 544 deliver = false; 545 } 546 fallthrough; 547 case TIPC_GRP_BCAST_MSG: 548 m->bc_rcv_nxt++; 549 ack = msg_grp_bc_ack_req(hdr); 550 break; 551 case TIPC_GRP_UCAST_MSG: 552 break; 553 case TIPC_GRP_MEMBER_EVT: 554 if (m->state == MBR_LEAVING) 555 leave = true; 556 if (!grp->events) 557 deliver = false; 558 break; 559 default: 560 break; 561 } 562 563 /* Execute decisions */ 564 __skb_dequeue(defq); 565 if (deliver) 566 __skb_queue_tail(inputq, skb); 567 else 568 kfree_skb(skb); 569 570 if (ack) 571 tipc_group_proto_xmit(grp, m, GRP_ACK_MSG, xmitq); 572 573 if (leave) { 574 __skb_queue_purge(defq); 575 tipc_group_delete_member(grp, m); 576 break; 577 } 578 if (!update) 579 continue; 580 581 tipc_group_update_rcv_win(grp, blks, node, port, xmitq); 582 } 583 return; 584 drop: 585 kfree_skb(skb); 586 } 587 588 void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, 589 u32 port, struct sk_buff_head *xmitq) 590 { 591 struct list_head *active = &grp->active; 592 int max_active = grp->max_active; 593 int reclaim_limit = max_active * 3 / 4; 594 int active_cnt = grp->active_cnt; 595 struct tipc_member *m, *rm, *pm; 596 597 m = tipc_group_find_member(grp, node, port); 598 if (!m) 599 return; 600 601 m->advertised -= blks; 602 603 switch (m->state) { 604 case MBR_JOINED: 605 /* First, decide if member can go active */ 606 if (active_cnt <= max_active) { 607 m->state = MBR_ACTIVE; 608 list_add_tail(&m->list, active); 609 grp->active_cnt++; 610 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 611 } else { 612 m->state = MBR_PENDING; 613 list_add_tail(&m->list, &grp->pending); 614 } 615 616 if (active_cnt < reclaim_limit) 617 break; 618 619 /* Reclaim from oldest active member, if possible */ 620 if (!list_empty(active)) { 621 rm = list_first_entry(active, struct tipc_member, list); 622 rm->state = MBR_RECLAIMING; 623 list_del_init(&rm->list); 624 tipc_group_proto_xmit(grp, rm, GRP_RECLAIM_MSG, xmitq); 625 break; 626 } 627 /* Nobody to reclaim from; - revert oldest pending to JOINED */ 628 pm = list_first_entry(&grp->pending, struct tipc_member, list); 629 list_del_init(&pm->list); 630 pm->state = MBR_JOINED; 631 tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); 632 break; 633 case MBR_ACTIVE: 634 if (!list_is_last(&m->list, &grp->active)) 635 list_move_tail(&m->list, &grp->active); 636 if (m->advertised > (ADV_ACTIVE * 3 / 4)) 637 break; 638 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 639 break; 640 case MBR_REMITTED: 641 if (m->advertised > ADV_IDLE) 642 break; 643 m->state = MBR_JOINED; 644 grp->active_cnt--; 645 if (m->advertised < ADV_IDLE) { 646 pr_warn_ratelimited("Rcv unexpected msg after REMIT\n"); 647 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 648 } 649 650 if (list_empty(&grp->pending)) 651 return; 652 653 /* Set oldest pending member to active and advertise */ 654 pm = list_first_entry(&grp->pending, struct tipc_member, list); 655 pm->state = MBR_ACTIVE; 656 list_move_tail(&pm->list, &grp->active); 657 grp->active_cnt++; 658 tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); 659 break; 660 case MBR_RECLAIMING: 661 case MBR_JOINING: 662 case MBR_LEAVING: 663 default: 664 break; 665 } 666 } 667 668 static void tipc_group_create_event(struct tipc_group *grp, 669 struct tipc_member *m, 670 u32 event, u16 seqno, 671 struct sk_buff_head *inputq) 672 { u32 dnode = tipc_own_addr(grp->net); 673 struct tipc_event evt; 674 struct sk_buff *skb; 675 struct tipc_msg *hdr; 676 677 memset(&evt, 0, sizeof(evt)); 678 evt.event = event; 679 evt.found_lower = m->instance; 680 evt.found_upper = m->instance; 681 evt.port.ref = m->port; 682 evt.port.node = m->node; 683 evt.s.seq.type = grp->type; 684 evt.s.seq.lower = m->instance; 685 evt.s.seq.upper = m->instance; 686 687 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_GRP_MEMBER_EVT, 688 GROUP_H_SIZE, sizeof(evt), dnode, m->node, 689 grp->portid, m->port, 0); 690 if (!skb) 691 return; 692 693 hdr = buf_msg(skb); 694 msg_set_nametype(hdr, grp->type); 695 msg_set_grp_evt(hdr, event); 696 msg_set_dest_droppable(hdr, true); 697 msg_set_grp_bc_seqno(hdr, seqno); 698 memcpy(msg_data(hdr), &evt, sizeof(evt)); 699 TIPC_SKB_CB(skb)->orig_member = m->instance; 700 __skb_queue_tail(inputq, skb); 701 } 702 703 static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, 704 int mtyp, struct sk_buff_head *xmitq) 705 { 706 struct tipc_msg *hdr; 707 struct sk_buff *skb; 708 int adv = 0; 709 710 skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0, 711 m->node, tipc_own_addr(grp->net), 712 m->port, grp->portid, 0); 713 if (!skb) 714 return; 715 716 if (m->state == MBR_ACTIVE) 717 adv = ADV_ACTIVE - m->advertised; 718 else if (m->state == MBR_JOINED || m->state == MBR_PENDING) 719 adv = ADV_IDLE - m->advertised; 720 721 hdr = buf_msg(skb); 722 723 if (mtyp == GRP_JOIN_MSG) { 724 msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); 725 msg_set_adv_win(hdr, adv); 726 m->advertised += adv; 727 } else if (mtyp == GRP_LEAVE_MSG) { 728 msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); 729 } else if (mtyp == GRP_ADV_MSG) { 730 msg_set_adv_win(hdr, adv); 731 m->advertised += adv; 732 } else if (mtyp == GRP_ACK_MSG) { 733 msg_set_grp_bc_acked(hdr, m->bc_rcv_nxt); 734 } else if (mtyp == GRP_REMIT_MSG) { 735 msg_set_grp_remitted(hdr, m->window); 736 } 737 msg_set_dest_droppable(hdr, true); 738 __skb_queue_tail(xmitq, skb); 739 } 740 741 void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, 742 struct tipc_msg *hdr, struct sk_buff_head *inputq, 743 struct sk_buff_head *xmitq) 744 { 745 u32 node = msg_orignode(hdr); 746 u32 port = msg_origport(hdr); 747 struct tipc_member *m, *pm; 748 u16 remitted, in_flight; 749 750 if (!grp) 751 return; 752 753 if (grp->scope == TIPC_NODE_SCOPE && node != tipc_own_addr(grp->net)) 754 return; 755 756 m = tipc_group_find_member(grp, node, port); 757 758 switch (msg_type(hdr)) { 759 case GRP_JOIN_MSG: 760 if (!m) 761 m = tipc_group_create_member(grp, node, port, 762 0, MBR_JOINING); 763 if (!m) 764 return; 765 m->bc_syncpt = msg_grp_bc_syncpt(hdr); 766 m->bc_rcv_nxt = m->bc_syncpt; 767 m->window += msg_adv_win(hdr); 768 769 /* Wait until PUBLISH event is received if necessary */ 770 if (m->state != MBR_PUBLISHED) 771 return; 772 773 /* Member can be taken into service */ 774 m->state = MBR_JOINED; 775 tipc_group_open(m, usr_wakeup); 776 tipc_group_update_member(m, 0); 777 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 778 tipc_group_create_event(grp, m, TIPC_PUBLISHED, 779 m->bc_syncpt, inputq); 780 return; 781 case GRP_LEAVE_MSG: 782 if (!m) 783 return; 784 m->bc_syncpt = msg_grp_bc_syncpt(hdr); 785 list_del_init(&m->list); 786 tipc_group_open(m, usr_wakeup); 787 tipc_group_decr_active(grp, m); 788 m->state = MBR_LEAVING; 789 tipc_group_create_event(grp, m, TIPC_WITHDRAWN, 790 m->bc_syncpt, inputq); 791 return; 792 case GRP_ADV_MSG: 793 if (!m) 794 return; 795 m->window += msg_adv_win(hdr); 796 tipc_group_open(m, usr_wakeup); 797 return; 798 case GRP_ACK_MSG: 799 if (!m) 800 return; 801 m->bc_acked = msg_grp_bc_acked(hdr); 802 if (--grp->bc_ackers) 803 return; 804 list_del_init(&m->small_win); 805 *m->group->open = true; 806 *usr_wakeup = true; 807 tipc_group_update_member(m, 0); 808 return; 809 case GRP_RECLAIM_MSG: 810 if (!m) 811 return; 812 tipc_group_proto_xmit(grp, m, GRP_REMIT_MSG, xmitq); 813 m->window = ADV_IDLE; 814 tipc_group_open(m, usr_wakeup); 815 return; 816 case GRP_REMIT_MSG: 817 if (!m || m->state != MBR_RECLAIMING) 818 return; 819 820 remitted = msg_grp_remitted(hdr); 821 822 /* Messages preceding the REMIT still in receive queue */ 823 if (m->advertised > remitted) { 824 m->state = MBR_REMITTED; 825 in_flight = m->advertised - remitted; 826 m->advertised = ADV_IDLE + in_flight; 827 return; 828 } 829 /* This should never happen */ 830 if (m->advertised < remitted) 831 pr_warn_ratelimited("Unexpected REMIT msg\n"); 832 833 /* All messages preceding the REMIT have been read */ 834 m->state = MBR_JOINED; 835 grp->active_cnt--; 836 m->advertised = ADV_IDLE; 837 838 /* Set oldest pending member to active and advertise */ 839 if (list_empty(&grp->pending)) 840 return; 841 pm = list_first_entry(&grp->pending, struct tipc_member, list); 842 pm->state = MBR_ACTIVE; 843 list_move_tail(&pm->list, &grp->active); 844 grp->active_cnt++; 845 if (pm->advertised <= (ADV_ACTIVE * 3 / 4)) 846 tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); 847 return; 848 default: 849 pr_warn("Received unknown GROUP_PROTO message\n"); 850 } 851 } 852 853 /* tipc_group_member_evt() - receive and handle a member up/down event 854 */ 855 void tipc_group_member_evt(struct tipc_group *grp, 856 bool *usr_wakeup, 857 int *sk_rcvbuf, 858 struct tipc_msg *hdr, 859 struct sk_buff_head *inputq, 860 struct sk_buff_head *xmitq) 861 { 862 struct tipc_event *evt = (void *)msg_data(hdr); 863 u32 instance = evt->found_lower; 864 u32 node = evt->port.node; 865 u32 port = evt->port.ref; 866 int event = evt->event; 867 struct tipc_member *m; 868 struct net *net; 869 u32 self; 870 871 if (!grp) 872 return; 873 874 net = grp->net; 875 self = tipc_own_addr(net); 876 if (!grp->loopback && node == self && port == grp->portid) 877 return; 878 879 m = tipc_group_find_member(grp, node, port); 880 881 switch (event) { 882 case TIPC_PUBLISHED: 883 /* Send and wait for arrival of JOIN message if necessary */ 884 if (!m) { 885 m = tipc_group_create_member(grp, node, port, instance, 886 MBR_PUBLISHED); 887 if (!m) 888 break; 889 tipc_group_update_member(m, 0); 890 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); 891 break; 892 } 893 894 if (m->state != MBR_JOINING) 895 break; 896 897 /* Member can be taken into service */ 898 m->instance = instance; 899 m->state = MBR_JOINED; 900 tipc_group_open(m, usr_wakeup); 901 tipc_group_update_member(m, 0); 902 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); 903 tipc_group_create_event(grp, m, TIPC_PUBLISHED, 904 m->bc_syncpt, inputq); 905 break; 906 case TIPC_WITHDRAWN: 907 if (!m) 908 break; 909 910 tipc_group_decr_active(grp, m); 911 m->state = MBR_LEAVING; 912 list_del_init(&m->list); 913 tipc_group_open(m, usr_wakeup); 914 915 /* Only send event if no LEAVE message can be expected */ 916 if (!tipc_node_is_up(net, node)) 917 tipc_group_create_event(grp, m, TIPC_WITHDRAWN, 918 m->bc_rcv_nxt, inputq); 919 break; 920 default: 921 break; 922 } 923 *sk_rcvbuf = tipc_group_rcvbuf_limit(grp); 924 } 925 926 int tipc_group_fill_sock_diag(struct tipc_group *grp, struct sk_buff *skb) 927 { 928 struct nlattr *group = nla_nest_start_noflag(skb, TIPC_NLA_SOCK_GROUP); 929 930 if (!group) 931 return -EMSGSIZE; 932 933 if (nla_put_u32(skb, TIPC_NLA_SOCK_GROUP_ID, 934 grp->type) || 935 nla_put_u32(skb, TIPC_NLA_SOCK_GROUP_INSTANCE, 936 grp->instance) || 937 nla_put_u32(skb, TIPC_NLA_SOCK_GROUP_BC_SEND_NEXT, 938 grp->bc_snd_nxt)) 939 goto group_msg_cancel; 940 941 if (grp->scope == TIPC_NODE_SCOPE) 942 if (nla_put_flag(skb, TIPC_NLA_SOCK_GROUP_NODE_SCOPE)) 943 goto group_msg_cancel; 944 945 if (grp->scope == TIPC_CLUSTER_SCOPE) 946 if (nla_put_flag(skb, TIPC_NLA_SOCK_GROUP_CLUSTER_SCOPE)) 947 goto group_msg_cancel; 948 949 if (*grp->open) 950 if (nla_put_flag(skb, TIPC_NLA_SOCK_GROUP_OPEN)) 951 goto group_msg_cancel; 952 953 nla_nest_end(skb, group); 954 return 0; 955 956 group_msg_cancel: 957 nla_nest_cancel(skb, group); 958 return -1; 959 } 960