1 /* 2 * net/tipc/group.c: TIPC group messaging code 3 * 4 * Copyright (c) 2017, Ericsson AB 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 3. Neither the names of the copyright holders nor the names of its 16 * contributors may be used to endorse or promote products derived from 17 * this software without specific prior written permission. 18 * 19 * Alternatively, this software may be distributed under the terms of the 20 * GNU General Public License ("GPL") version 2 as published by the Free 21 * Software Foundation. 22 * 23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 33 * POSSIBILITY OF SUCH DAMAGE. 34 */ 35 36 #include "core.h" 37 #include "addr.h" 38 #include "group.h" 39 #include "bcast.h" 40 #include "server.h" 41 #include "msg.h" 42 #include "socket.h" 43 #include "node.h" 44 #include "name_table.h" 45 #include "subscr.h" 46 47 #define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1) 48 #define ADV_IDLE ADV_UNIT 49 #define ADV_ACTIVE (ADV_UNIT * 12) 50 51 enum mbr_state { 52 MBR_QUARANTINED, 53 MBR_DISCOVERED, 54 MBR_JOINING, 55 MBR_PUBLISHED, 56 MBR_JOINED, 57 MBR_PENDING, 58 MBR_ACTIVE, 59 MBR_RECLAIMING, 60 MBR_REMITTED, 61 MBR_LEAVING 62 }; 63 64 struct tipc_member { 65 struct rb_node tree_node; 66 struct list_head list; 67 struct list_head congested; 68 struct sk_buff *event_msg; 69 struct sk_buff_head deferredq; 70 struct tipc_group *group; 71 u32 node; 72 u32 port; 73 u32 instance; 74 enum mbr_state state; 75 u16 advertised; 76 u16 window; 77 u16 bc_rcv_nxt; 78 u16 bc_syncpt; 79 u16 bc_acked; 80 bool usr_pending; 81 }; 82 83 struct tipc_group { 84 struct rb_root members; 85 struct list_head congested; 86 struct list_head pending; 87 struct list_head active; 88 struct list_head reclaiming; 89 struct tipc_nlist dests; 90 struct net *net; 91 int subid; 92 u32 type; 93 u32 instance; 94 u32 domain; 95 u32 scope; 96 u32 portid; 97 u16 member_cnt; 98 u16 active_cnt; 99 u16 max_active; 100 u16 bc_snd_nxt; 101 u16 bc_ackers; 102 bool loopback; 103 bool events; 104 }; 105 106 static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, 107 int mtyp, struct sk_buff_head *xmitq); 108 109 static void tipc_group_decr_active(struct tipc_group *grp, 110 struct tipc_member *m) 111 { 112 if (m->state == MBR_ACTIVE || m->state == MBR_RECLAIMING) 113 grp->active_cnt--; 114 } 115 116 static int tipc_group_rcvbuf_limit(struct tipc_group *grp) 117 { 118 int max_active, active_pool, idle_pool; 119 int mcnt = grp->member_cnt + 1; 120 121 /* Limit simultaneous reception from other members */ 122 max_active = min(mcnt / 8, 64); 123 max_active = max(max_active, 16); 124 grp->max_active = max_active; 125 126 /* Reserve blocks for active and idle members */ 127 active_pool = max_active * ADV_ACTIVE; 128 idle_pool = (mcnt - max_active) * ADV_IDLE; 129 130 /* Scale to bytes, considering worst-case truesize/msgsize ratio */ 131 return (active_pool + idle_pool) * FLOWCTL_BLK_SZ * 4; 132 } 133 134 u16 tipc_group_bc_snd_nxt(struct tipc_group *grp) 135 { 136 return grp->bc_snd_nxt; 137 } 138 139 static bool tipc_group_is_enabled(struct tipc_member *m) 140 { 141 return m->state != MBR_QUARANTINED && m->state != MBR_LEAVING; 142 } 143 144 static bool tipc_group_is_receiver(struct tipc_member *m) 145 { 146 return m && m->state >= MBR_JOINED; 147 } 148 149 u32 tipc_group_exclude(struct tipc_group *grp) 150 { 151 if (!grp->loopback) 152 return grp->portid; 153 return 0; 154 } 155 156 int tipc_group_size(struct tipc_group *grp) 157 { 158 return grp->member_cnt; 159 } 160 161 struct tipc_group *tipc_group_create(struct net *net, u32 portid, 162 struct tipc_group_req *mreq) 163 { 164 struct tipc_group *grp; 165 u32 type = mreq->type; 166 167 grp = kzalloc(sizeof(*grp), GFP_ATOMIC); 168 if (!grp) 169 return NULL; 170 tipc_nlist_init(&grp->dests, tipc_own_addr(net)); 171 INIT_LIST_HEAD(&grp->congested); 172 INIT_LIST_HEAD(&grp->active); 173 INIT_LIST_HEAD(&grp->pending); 174 INIT_LIST_HEAD(&grp->reclaiming); 175 grp->members = RB_ROOT; 176 grp->net = net; 177 grp->portid = portid; 178 grp->domain = addr_domain(net, mreq->scope); 179 grp->type = type; 180 grp->instance = mreq->instance; 181 grp->scope = mreq->scope; 182 grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK; 183 grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS; 184 if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid)) 185 return grp; 186 kfree(grp); 187 return NULL; 188 } 189 190 void tipc_group_delete(struct net *net, struct tipc_group *grp) 191 { 192 struct rb_root *tree = &grp->members; 193 struct tipc_member *m, *tmp; 194 struct sk_buff_head xmitq; 195 196 __skb_queue_head_init(&xmitq); 197 198 rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) { 199 tipc_group_proto_xmit(grp, m, GRP_LEAVE_MSG, &xmitq); 200 list_del(&m->list); 201 kfree(m); 202 } 203 tipc_node_distr_xmit(net, &xmitq); 204 tipc_nlist_purge(&grp->dests); 205 tipc_topsrv_kern_unsubscr(net, grp->subid); 206 kfree(grp); 207 } 208 209 struct tipc_member *tipc_group_find_member(struct tipc_group *grp, 210 u32 node, u32 port) 211 { 212 struct rb_node *n = grp->members.rb_node; 213 u64 nkey, key = (u64)node << 32 | port; 214 struct tipc_member *m; 215 216 while (n) { 217 m = container_of(n, struct tipc_member, tree_node); 218 nkey = (u64)m->node << 32 | m->port; 219 if (key < nkey) 220 n = n->rb_left; 221 else if (key > nkey) 222 n = n->rb_right; 223 else 224 return m; 225 } 226 return NULL; 227 } 228 229 static struct tipc_member *tipc_group_find_dest(struct tipc_group *grp, 230 u32 node, u32 port) 231 { 232 struct tipc_member *m; 233 234 m = tipc_group_find_member(grp, node, port); 235 if (m && tipc_group_is_enabled(m)) 236 return m; 237 return NULL; 238 } 239 240 static struct tipc_member *tipc_group_find_node(struct tipc_group *grp, 241 u32 node) 242 { 243 struct tipc_member *m; 244 struct rb_node *n; 245 246 for (n = rb_first(&grp->members); n; n = rb_next(n)) { 247 m = container_of(n, struct tipc_member, tree_node); 248 if (m->node == node) 249 return m; 250 } 251 return NULL; 252 } 253 254 static void tipc_group_add_to_tree(struct tipc_group *grp, 255 struct tipc_member *m) 256 { 257 u64 nkey, key = (u64)m->node << 32 | m->port; 258 struct rb_node **n, *parent = NULL; 259 struct tipc_member *tmp; 260 261 n = &grp->members.rb_node; 262 while (*n) { 263 tmp = container_of(*n, struct tipc_member, tree_node); 264 parent = *n; 265 tmp = container_of(parent, struct tipc_member, tree_node); 266 nkey = (u64)tmp->node << 32 | tmp->port; 267 if (key < nkey) 268 n = &(*n)->rb_left; 269 else if (key > nkey) 270 n = &(*n)->rb_right; 271 else 272 return; 273 } 274 rb_link_node(&m->tree_node, parent, n); 275 rb_insert_color(&m->tree_node, &grp->members); 276 } 277 278 static struct tipc_member *tipc_group_create_member(struct tipc_group *grp, 279 u32 node, u32 port, 280 int state) 281 { 282 struct tipc_member *m; 283 284 m = kzalloc(sizeof(*m), GFP_ATOMIC); 285 if (!m) 286 return NULL; 287 INIT_LIST_HEAD(&m->list); 288 INIT_LIST_HEAD(&m->congested); 289 __skb_queue_head_init(&m->deferredq); 290 m->group = grp; 291 m->node = node; 292 m->port = port; 293 m->bc_acked = grp->bc_snd_nxt - 1; 294 grp->member_cnt++; 295 tipc_group_add_to_tree(grp, m); 296 tipc_nlist_add(&grp->dests, m->node); 297 m->state = state; 298 return m; 299 } 300 301 void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port) 302 { 303 tipc_group_create_member(grp, node, port, MBR_DISCOVERED); 304 } 305 306 static void tipc_group_delete_member(struct tipc_group *grp, 307 struct tipc_member *m) 308 { 309 rb_erase(&m->tree_node, &grp->members); 310 grp->member_cnt--; 311 312 /* Check if we were waiting for replicast ack from this member */ 313 if (grp->bc_ackers && less(m->bc_acked, grp->bc_snd_nxt - 1)) 314 grp->bc_ackers--; 315 316 list_del_init(&m->list); 317 list_del_init(&m->congested); 318 tipc_group_decr_active(grp, m); 319 320 /* If last member on a node, remove node from dest list */ 321 if (!tipc_group_find_node(grp, m->node)) 322 tipc_nlist_del(&grp->dests, m->node); 323 324 kfree(m); 325 } 326 327 struct tipc_nlist *tipc_group_dests(struct tipc_group *grp) 328 { 329 return &grp->dests; 330 } 331 332 void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq, 333 int *scope) 334 { 335 seq->type = grp->type; 336 seq->lower = grp->instance; 337 seq->upper = grp->instance; 338 *scope = grp->scope; 339 } 340 341 void tipc_group_update_member(struct tipc_member *m, int len) 342 { 343 struct tipc_group *grp = m->group; 344 struct tipc_member *_m, *tmp; 345 346 if (!tipc_group_is_enabled(m)) 347 return; 348 349 m->window -= len; 350 351 if (m->window >= ADV_IDLE) 352 return; 353 354 list_del_init(&m->congested); 355 356 /* Sort member into congested members' list */ 357 list_for_each_entry_safe(_m, tmp, &grp->congested, congested) { 358 if (m->window > _m->window) 359 continue; 360 list_add_tail(&m->congested, &_m->congested); 361 return; 362 } 363 list_add_tail(&m->congested, &grp->congested); 364 } 365 366 void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack) 367 { 368 u16 prev = grp->bc_snd_nxt - 1; 369 struct tipc_member *m; 370 struct rb_node *n; 371 u16 ackers = 0; 372 373 for (n = rb_first(&grp->members); n; n = rb_next(n)) { 374 m = container_of(n, struct tipc_member, tree_node); 375 if (tipc_group_is_enabled(m)) { 376 tipc_group_update_member(m, len); 377 m->bc_acked = prev; 378 ackers++; 379 } 380 } 381 382 /* Mark number of acknowledges to expect, if any */ 383 if (ack) 384 grp->bc_ackers = ackers; 385 grp->bc_snd_nxt++; 386 } 387 388 bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport, 389 int len, struct tipc_member **mbr) 390 { 391 struct sk_buff_head xmitq; 392 struct tipc_member *m; 393 int adv, state; 394 395 m = tipc_group_find_dest(grp, dnode, dport); 396 *mbr = m; 397 if (!m) 398 return false; 399 if (m->usr_pending) 400 return true; 401 if (m->window >= len) 402 return false; 403 m->usr_pending = true; 404 405 /* If not fully advertised, do it now to prevent mutual blocking */ 406 adv = m->advertised; 407 state = m->state; 408 if (state < MBR_JOINED) 409 return true; 410 if (state == MBR_JOINED && adv == ADV_IDLE) 411 return true; 412 if (state == MBR_ACTIVE && adv == ADV_ACTIVE) 413 return true; 414 if (state == MBR_PENDING && adv == ADV_IDLE) 415 return true; 416 skb_queue_head_init(&xmitq); 417 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq); 418 tipc_node_distr_xmit(grp->net, &xmitq); 419 return true; 420 } 421 422 bool tipc_group_bc_cong(struct tipc_group *grp, int len) 423 { 424 struct tipc_member *m = NULL; 425 426 /* If prev bcast was replicast, reject until all receivers have acked */ 427 if (grp->bc_ackers) 428 return true; 429 430 if (list_empty(&grp->congested)) 431 return false; 432 433 m = list_first_entry(&grp->congested, struct tipc_member, congested); 434 if (m->window >= len) 435 return false; 436 437 return tipc_group_cong(grp, m->node, m->port, len, &m); 438 } 439 440 /* tipc_group_sort_msg() - sort msg into queue by bcast sequence number 441 */ 442 static void tipc_group_sort_msg(struct sk_buff *skb, struct sk_buff_head *defq) 443 { 444 struct tipc_msg *_hdr, *hdr = buf_msg(skb); 445 u16 bc_seqno = msg_grp_bc_seqno(hdr); 446 struct sk_buff *_skb, *tmp; 447 int mtyp = msg_type(hdr); 448 449 /* Bcast/mcast may be bypassed by ucast or other bcast, - sort it in */ 450 if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) { 451 skb_queue_walk_safe(defq, _skb, tmp) { 452 _hdr = buf_msg(_skb); 453 if (!less(bc_seqno, msg_grp_bc_seqno(_hdr))) 454 continue; 455 __skb_queue_before(defq, _skb, skb); 456 return; 457 } 458 /* Bcast was not bypassed, - add to tail */ 459 } 460 /* Unicasts are never bypassed, - always add to tail */ 461 __skb_queue_tail(defq, skb); 462 } 463 464 /* tipc_group_filter_msg() - determine if we should accept arriving message 465 */ 466 void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, 467 struct sk_buff_head *xmitq) 468 { 469 struct sk_buff *skb = __skb_dequeue(inputq); 470 bool ack, deliver, update, leave = false; 471 struct sk_buff_head *defq; 472 struct tipc_member *m; 473 struct tipc_msg *hdr; 474 u32 node, port; 475 int mtyp, blks; 476 477 if (!skb) 478 return; 479 480 hdr = buf_msg(skb); 481 node = msg_orignode(hdr); 482 port = msg_origport(hdr); 483 484 if (!msg_in_group(hdr)) 485 goto drop; 486 487 m = tipc_group_find_member(grp, node, port); 488 if (!tipc_group_is_receiver(m)) 489 goto drop; 490 491 if (less(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt)) 492 goto drop; 493 494 TIPC_SKB_CB(skb)->orig_member = m->instance; 495 defq = &m->deferredq; 496 tipc_group_sort_msg(skb, defq); 497 498 while ((skb = skb_peek(defq))) { 499 hdr = buf_msg(skb); 500 mtyp = msg_type(hdr); 501 blks = msg_blocks(hdr); 502 deliver = true; 503 ack = false; 504 update = false; 505 506 if (more(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt)) 507 break; 508 509 /* Decide what to do with message */ 510 switch (mtyp) { 511 case TIPC_GRP_MCAST_MSG: 512 if (msg_nameinst(hdr) != grp->instance) { 513 update = true; 514 deliver = false; 515 } 516 /* Fall thru */ 517 case TIPC_GRP_BCAST_MSG: 518 m->bc_rcv_nxt++; 519 ack = msg_grp_bc_ack_req(hdr); 520 break; 521 case TIPC_GRP_UCAST_MSG: 522 break; 523 case TIPC_GRP_MEMBER_EVT: 524 if (m->state == MBR_LEAVING) 525 leave = true; 526 if (!grp->events) 527 deliver = false; 528 break; 529 default: 530 break; 531 } 532 533 /* Execute decisions */ 534 __skb_dequeue(defq); 535 if (deliver) 536 __skb_queue_tail(inputq, skb); 537 else 538 kfree_skb(skb); 539 540 if (ack) 541 tipc_group_proto_xmit(grp, m, GRP_ACK_MSG, xmitq); 542 543 if (leave) { 544 __skb_queue_purge(defq); 545 tipc_group_delete_member(grp, m); 546 break; 547 } 548 if (!update) 549 continue; 550 551 tipc_group_update_rcv_win(grp, blks, node, port, xmitq); 552 } 553 return; 554 drop: 555 kfree_skb(skb); 556 } 557 558 void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, 559 u32 port, struct sk_buff_head *xmitq) 560 { 561 struct list_head *active = &grp->active; 562 int max_active = grp->max_active; 563 int reclaim_limit = max_active * 3 / 4; 564 int active_cnt = grp->active_cnt; 565 struct tipc_member *m, *rm; 566 567 m = tipc_group_find_member(grp, node, port); 568 if (!m) 569 return; 570 571 m->advertised -= blks; 572 573 switch (m->state) { 574 case MBR_JOINED: 575 /* Reclaim advertised space from least active member */ 576 if (!list_empty(active) && active_cnt >= reclaim_limit) { 577 rm = list_first_entry(active, struct tipc_member, list); 578 rm->state = MBR_RECLAIMING; 579 list_move_tail(&rm->list, &grp->reclaiming); 580 tipc_group_proto_xmit(grp, rm, GRP_RECLAIM_MSG, xmitq); 581 } 582 /* If max active, become pending and wait for reclaimed space */ 583 if (active_cnt >= max_active) { 584 m->state = MBR_PENDING; 585 list_add_tail(&m->list, &grp->pending); 586 break; 587 } 588 /* Otherwise become active */ 589 m->state = MBR_ACTIVE; 590 list_add_tail(&m->list, &grp->active); 591 grp->active_cnt++; 592 /* Fall through */ 593 case MBR_ACTIVE: 594 if (!list_is_last(&m->list, &grp->active)) 595 list_move_tail(&m->list, &grp->active); 596 if (m->advertised > (ADV_ACTIVE * 3 / 4)) 597 break; 598 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 599 break; 600 case MBR_REMITTED: 601 if (m->advertised > ADV_IDLE) 602 break; 603 m->state = MBR_JOINED; 604 if (m->advertised < ADV_IDLE) { 605 pr_warn_ratelimited("Rcv unexpected msg after REMIT\n"); 606 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 607 } 608 break; 609 case MBR_RECLAIMING: 610 case MBR_DISCOVERED: 611 case MBR_JOINING: 612 case MBR_LEAVING: 613 default: 614 break; 615 } 616 } 617 618 static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, 619 int mtyp, struct sk_buff_head *xmitq) 620 { 621 struct tipc_msg *hdr; 622 struct sk_buff *skb; 623 int adv = 0; 624 625 skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0, 626 m->node, tipc_own_addr(grp->net), 627 m->port, grp->portid, 0); 628 if (!skb) 629 return; 630 631 if (m->state == MBR_ACTIVE) 632 adv = ADV_ACTIVE - m->advertised; 633 else if (m->state == MBR_JOINED || m->state == MBR_PENDING) 634 adv = ADV_IDLE - m->advertised; 635 636 hdr = buf_msg(skb); 637 638 if (mtyp == GRP_JOIN_MSG) { 639 msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); 640 msg_set_adv_win(hdr, adv); 641 m->advertised += adv; 642 } else if (mtyp == GRP_LEAVE_MSG) { 643 msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); 644 } else if (mtyp == GRP_ADV_MSG) { 645 msg_set_adv_win(hdr, adv); 646 m->advertised += adv; 647 } else if (mtyp == GRP_ACK_MSG) { 648 msg_set_grp_bc_acked(hdr, m->bc_rcv_nxt); 649 } else if (mtyp == GRP_REMIT_MSG) { 650 msg_set_grp_remitted(hdr, m->window); 651 } 652 msg_set_dest_droppable(hdr, true); 653 __skb_queue_tail(xmitq, skb); 654 } 655 656 void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, 657 struct tipc_msg *hdr, struct sk_buff_head *inputq, 658 struct sk_buff_head *xmitq) 659 { 660 u32 node = msg_orignode(hdr); 661 u32 port = msg_origport(hdr); 662 struct tipc_member *m, *pm; 663 struct tipc_msg *ehdr; 664 u16 remitted, in_flight; 665 666 if (!grp) 667 return; 668 669 m = tipc_group_find_member(grp, node, port); 670 671 switch (msg_type(hdr)) { 672 case GRP_JOIN_MSG: 673 if (!m) 674 m = tipc_group_create_member(grp, node, port, 675 MBR_QUARANTINED); 676 if (!m) 677 return; 678 m->bc_syncpt = msg_grp_bc_syncpt(hdr); 679 m->bc_rcv_nxt = m->bc_syncpt; 680 m->window += msg_adv_win(hdr); 681 682 /* Wait until PUBLISH event is received */ 683 if (m->state == MBR_DISCOVERED) { 684 m->state = MBR_JOINING; 685 } else if (m->state == MBR_PUBLISHED) { 686 m->state = MBR_JOINED; 687 *usr_wakeup = true; 688 m->usr_pending = false; 689 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 690 ehdr = buf_msg(m->event_msg); 691 msg_set_grp_bc_seqno(ehdr, m->bc_syncpt); 692 __skb_queue_tail(inputq, m->event_msg); 693 } 694 list_del_init(&m->congested); 695 tipc_group_update_member(m, 0); 696 return; 697 case GRP_LEAVE_MSG: 698 if (!m) 699 return; 700 m->bc_syncpt = msg_grp_bc_syncpt(hdr); 701 list_del_init(&m->list); 702 list_del_init(&m->congested); 703 *usr_wakeup = true; 704 705 /* Wait until WITHDRAW event is received */ 706 if (m->state != MBR_LEAVING) { 707 tipc_group_decr_active(grp, m); 708 m->state = MBR_LEAVING; 709 return; 710 } 711 /* Otherwise deliver already received WITHDRAW event */ 712 ehdr = buf_msg(m->event_msg); 713 msg_set_grp_bc_seqno(ehdr, m->bc_syncpt); 714 __skb_queue_tail(inputq, m->event_msg); 715 return; 716 case GRP_ADV_MSG: 717 if (!m) 718 return; 719 m->window += msg_adv_win(hdr); 720 *usr_wakeup = m->usr_pending; 721 m->usr_pending = false; 722 list_del_init(&m->congested); 723 return; 724 case GRP_ACK_MSG: 725 if (!m) 726 return; 727 m->bc_acked = msg_grp_bc_acked(hdr); 728 if (--grp->bc_ackers) 729 break; 730 *usr_wakeup = true; 731 m->usr_pending = false; 732 return; 733 case GRP_RECLAIM_MSG: 734 if (!m) 735 return; 736 *usr_wakeup = m->usr_pending; 737 m->usr_pending = false; 738 tipc_group_proto_xmit(grp, m, GRP_REMIT_MSG, xmitq); 739 m->window = ADV_IDLE; 740 return; 741 case GRP_REMIT_MSG: 742 if (!m || m->state != MBR_RECLAIMING) 743 return; 744 745 list_del_init(&m->list); 746 grp->active_cnt--; 747 remitted = msg_grp_remitted(hdr); 748 749 /* Messages preceding the REMIT still in receive queue */ 750 if (m->advertised > remitted) { 751 m->state = MBR_REMITTED; 752 in_flight = m->advertised - remitted; 753 } 754 /* All messages preceding the REMIT have been read */ 755 if (m->advertised <= remitted) { 756 m->state = MBR_JOINED; 757 in_flight = 0; 758 } 759 /* ..and the REMIT overtaken by more messages => re-advertise */ 760 if (m->advertised < remitted) 761 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 762 763 m->advertised = ADV_IDLE + in_flight; 764 765 /* Set oldest pending member to active and advertise */ 766 if (list_empty(&grp->pending)) 767 return; 768 pm = list_first_entry(&grp->pending, struct tipc_member, list); 769 pm->state = MBR_ACTIVE; 770 list_move_tail(&pm->list, &grp->active); 771 grp->active_cnt++; 772 if (pm->advertised <= (ADV_ACTIVE * 3 / 4)) 773 tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); 774 return; 775 default: 776 pr_warn("Received unknown GROUP_PROTO message\n"); 777 } 778 } 779 780 /* tipc_group_member_evt() - receive and handle a member up/down event 781 */ 782 void tipc_group_member_evt(struct tipc_group *grp, 783 bool *usr_wakeup, 784 int *sk_rcvbuf, 785 struct sk_buff *skb, 786 struct sk_buff_head *inputq, 787 struct sk_buff_head *xmitq) 788 { 789 struct tipc_msg *hdr = buf_msg(skb); 790 struct tipc_event *evt = (void *)msg_data(hdr); 791 u32 instance = evt->found_lower; 792 u32 node = evt->port.node; 793 u32 port = evt->port.ref; 794 int event = evt->event; 795 struct tipc_member *m; 796 struct net *net; 797 bool node_up; 798 u32 self; 799 800 if (!grp) 801 goto drop; 802 803 net = grp->net; 804 self = tipc_own_addr(net); 805 if (!grp->loopback && node == self && port == grp->portid) 806 goto drop; 807 808 /* Convert message before delivery to user */ 809 msg_set_hdr_sz(hdr, GROUP_H_SIZE); 810 msg_set_user(hdr, TIPC_CRITICAL_IMPORTANCE); 811 msg_set_type(hdr, TIPC_GRP_MEMBER_EVT); 812 msg_set_origport(hdr, port); 813 msg_set_orignode(hdr, node); 814 msg_set_nametype(hdr, grp->type); 815 msg_set_grp_evt(hdr, event); 816 817 m = tipc_group_find_member(grp, node, port); 818 819 if (event == TIPC_PUBLISHED) { 820 if (!m) 821 m = tipc_group_create_member(grp, node, port, 822 MBR_DISCOVERED); 823 if (!m) 824 goto drop; 825 826 /* Hold back event if JOIN message not yet received */ 827 if (m->state == MBR_DISCOVERED) { 828 m->event_msg = skb; 829 m->state = MBR_PUBLISHED; 830 } else { 831 msg_set_grp_bc_seqno(hdr, m->bc_syncpt); 832 __skb_queue_tail(inputq, skb); 833 m->state = MBR_JOINED; 834 *usr_wakeup = true; 835 m->usr_pending = false; 836 } 837 m->instance = instance; 838 TIPC_SKB_CB(skb)->orig_member = m->instance; 839 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); 840 if (m->window < ADV_IDLE) 841 tipc_group_update_member(m, 0); 842 else 843 list_del_init(&m->congested); 844 } else if (event == TIPC_WITHDRAWN) { 845 if (!m) 846 goto drop; 847 848 TIPC_SKB_CB(skb)->orig_member = m->instance; 849 850 *usr_wakeup = true; 851 m->usr_pending = false; 852 node_up = tipc_node_is_up(net, node); 853 m->event_msg = NULL; 854 855 if (node_up) { 856 /* Hold back event if a LEAVE msg should be expected */ 857 if (m->state != MBR_LEAVING) { 858 m->event_msg = skb; 859 tipc_group_decr_active(grp, m); 860 m->state = MBR_LEAVING; 861 } else { 862 msg_set_grp_bc_seqno(hdr, m->bc_syncpt); 863 __skb_queue_tail(inputq, skb); 864 } 865 } else { 866 if (m->state != MBR_LEAVING) { 867 tipc_group_decr_active(grp, m); 868 m->state = MBR_LEAVING; 869 msg_set_grp_bc_seqno(hdr, m->bc_rcv_nxt); 870 } else { 871 msg_set_grp_bc_seqno(hdr, m->bc_syncpt); 872 } 873 __skb_queue_tail(inputq, skb); 874 } 875 list_del_init(&m->list); 876 list_del_init(&m->congested); 877 } 878 *sk_rcvbuf = tipc_group_rcvbuf_limit(grp); 879 return; 880 drop: 881 kfree_skb(skb); 882 } 883