1 /* 2 * net/tipc/group.c: TIPC group messaging code 3 * 4 * Copyright (c) 2017, Ericsson AB 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 3. Neither the names of the copyright holders nor the names of its 16 * contributors may be used to endorse or promote products derived from 17 * this software without specific prior written permission. 18 * 19 * Alternatively, this software may be distributed under the terms of the 20 * GNU General Public License ("GPL") version 2 as published by the Free 21 * Software Foundation. 22 * 23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 33 * POSSIBILITY OF SUCH DAMAGE. 34 */ 35 36 #include "core.h" 37 #include "addr.h" 38 #include "group.h" 39 #include "bcast.h" 40 #include "server.h" 41 #include "msg.h" 42 #include "socket.h" 43 #include "node.h" 44 #include "name_table.h" 45 #include "subscr.h" 46 47 #define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1) 48 #define ADV_IDLE ADV_UNIT 49 #define ADV_ACTIVE (ADV_UNIT * 12) 50 51 enum mbr_state { 52 MBR_QUARANTINED, 53 MBR_DISCOVERED, 54 MBR_JOINING, 55 MBR_PUBLISHED, 56 MBR_JOINED, 57 MBR_PENDING, 58 MBR_ACTIVE, 59 MBR_RECLAIMING, 60 MBR_REMITTED, 61 MBR_LEAVING 62 }; 63 64 struct tipc_member { 65 struct rb_node tree_node; 66 struct list_head list; 67 struct list_head congested; 68 struct sk_buff *event_msg; 69 struct sk_buff_head deferredq; 70 struct tipc_group *group; 71 u32 node; 72 u32 port; 73 u32 instance; 74 enum mbr_state state; 75 u16 advertised; 76 u16 window; 77 u16 bc_rcv_nxt; 78 u16 bc_syncpt; 79 u16 bc_acked; 80 bool usr_pending; 81 }; 82 83 struct tipc_group { 84 struct rb_root members; 85 struct list_head congested; 86 struct list_head pending; 87 struct list_head active; 88 struct list_head reclaiming; 89 struct tipc_nlist dests; 90 struct net *net; 91 int subid; 92 u32 type; 93 u32 instance; 94 u32 domain; 95 u32 scope; 96 u32 portid; 97 u16 member_cnt; 98 u16 active_cnt; 99 u16 max_active; 100 u16 bc_snd_nxt; 101 u16 bc_ackers; 102 bool loopback; 103 bool events; 104 }; 105 106 static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, 107 int mtyp, struct sk_buff_head *xmitq); 108 109 static void tipc_group_decr_active(struct tipc_group *grp, 110 struct tipc_member *m) 111 { 112 if (m->state == MBR_ACTIVE || m->state == MBR_RECLAIMING) 113 grp->active_cnt--; 114 } 115 116 static int tipc_group_rcvbuf_limit(struct tipc_group *grp) 117 { 118 int max_active, active_pool, idle_pool; 119 int mcnt = grp->member_cnt + 1; 120 121 /* Limit simultaneous reception from other members */ 122 max_active = min(mcnt / 8, 64); 123 max_active = max(max_active, 16); 124 grp->max_active = max_active; 125 126 /* Reserve blocks for active and idle members */ 127 active_pool = max_active * ADV_ACTIVE; 128 idle_pool = (mcnt - max_active) * ADV_IDLE; 129 130 /* Scale to bytes, considering worst-case truesize/msgsize ratio */ 131 return (active_pool + idle_pool) * FLOWCTL_BLK_SZ * 4; 132 } 133 134 u16 tipc_group_bc_snd_nxt(struct tipc_group *grp) 135 { 136 return grp->bc_snd_nxt; 137 } 138 139 static bool tipc_group_is_enabled(struct tipc_member *m) 140 { 141 return m->state != MBR_QUARANTINED && m->state != MBR_LEAVING; 142 } 143 144 static bool tipc_group_is_receiver(struct tipc_member *m) 145 { 146 return m && m->state >= MBR_JOINED; 147 } 148 149 u32 tipc_group_exclude(struct tipc_group *grp) 150 { 151 if (!grp->loopback) 152 return grp->portid; 153 return 0; 154 } 155 156 int tipc_group_size(struct tipc_group *grp) 157 { 158 return grp->member_cnt; 159 } 160 161 struct tipc_group *tipc_group_create(struct net *net, u32 portid, 162 struct tipc_group_req *mreq) 163 { 164 struct tipc_group *grp; 165 u32 type = mreq->type; 166 167 grp = kzalloc(sizeof(*grp), GFP_ATOMIC); 168 if (!grp) 169 return NULL; 170 tipc_nlist_init(&grp->dests, tipc_own_addr(net)); 171 INIT_LIST_HEAD(&grp->congested); 172 INIT_LIST_HEAD(&grp->active); 173 INIT_LIST_HEAD(&grp->pending); 174 INIT_LIST_HEAD(&grp->reclaiming); 175 grp->members = RB_ROOT; 176 grp->net = net; 177 grp->portid = portid; 178 grp->domain = addr_domain(net, mreq->scope); 179 grp->type = type; 180 grp->instance = mreq->instance; 181 grp->scope = mreq->scope; 182 grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK; 183 grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS; 184 if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid)) 185 return grp; 186 kfree(grp); 187 return NULL; 188 } 189 190 void tipc_group_delete(struct net *net, struct tipc_group *grp) 191 { 192 struct rb_root *tree = &grp->members; 193 struct tipc_member *m, *tmp; 194 struct sk_buff_head xmitq; 195 196 __skb_queue_head_init(&xmitq); 197 198 rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) { 199 tipc_group_proto_xmit(grp, m, GRP_LEAVE_MSG, &xmitq); 200 list_del(&m->list); 201 kfree(m); 202 } 203 tipc_node_distr_xmit(net, &xmitq); 204 tipc_nlist_purge(&grp->dests); 205 tipc_topsrv_kern_unsubscr(net, grp->subid); 206 kfree(grp); 207 } 208 209 struct tipc_member *tipc_group_find_member(struct tipc_group *grp, 210 u32 node, u32 port) 211 { 212 struct rb_node *n = grp->members.rb_node; 213 u64 nkey, key = (u64)node << 32 | port; 214 struct tipc_member *m; 215 216 while (n) { 217 m = container_of(n, struct tipc_member, tree_node); 218 nkey = (u64)m->node << 32 | m->port; 219 if (key < nkey) 220 n = n->rb_left; 221 else if (key > nkey) 222 n = n->rb_right; 223 else 224 return m; 225 } 226 return NULL; 227 } 228 229 static struct tipc_member *tipc_group_find_dest(struct tipc_group *grp, 230 u32 node, u32 port) 231 { 232 struct tipc_member *m; 233 234 m = tipc_group_find_member(grp, node, port); 235 if (m && tipc_group_is_enabled(m)) 236 return m; 237 return NULL; 238 } 239 240 static struct tipc_member *tipc_group_find_node(struct tipc_group *grp, 241 u32 node) 242 { 243 struct tipc_member *m; 244 struct rb_node *n; 245 246 for (n = rb_first(&grp->members); n; n = rb_next(n)) { 247 m = container_of(n, struct tipc_member, tree_node); 248 if (m->node == node) 249 return m; 250 } 251 return NULL; 252 } 253 254 static void tipc_group_add_to_tree(struct tipc_group *grp, 255 struct tipc_member *m) 256 { 257 u64 nkey, key = (u64)m->node << 32 | m->port; 258 struct rb_node **n, *parent = NULL; 259 struct tipc_member *tmp; 260 261 n = &grp->members.rb_node; 262 while (*n) { 263 tmp = container_of(*n, struct tipc_member, tree_node); 264 parent = *n; 265 tmp = container_of(parent, struct tipc_member, tree_node); 266 nkey = (u64)tmp->node << 32 | tmp->port; 267 if (key < nkey) 268 n = &(*n)->rb_left; 269 else if (key > nkey) 270 n = &(*n)->rb_right; 271 else 272 return; 273 } 274 rb_link_node(&m->tree_node, parent, n); 275 rb_insert_color(&m->tree_node, &grp->members); 276 } 277 278 static struct tipc_member *tipc_group_create_member(struct tipc_group *grp, 279 u32 node, u32 port, 280 int state) 281 { 282 struct tipc_member *m; 283 284 m = kzalloc(sizeof(*m), GFP_ATOMIC); 285 if (!m) 286 return NULL; 287 INIT_LIST_HEAD(&m->list); 288 INIT_LIST_HEAD(&m->congested); 289 __skb_queue_head_init(&m->deferredq); 290 m->group = grp; 291 m->node = node; 292 m->port = port; 293 m->bc_acked = grp->bc_snd_nxt - 1; 294 grp->member_cnt++; 295 tipc_group_add_to_tree(grp, m); 296 tipc_nlist_add(&grp->dests, m->node); 297 m->state = state; 298 return m; 299 } 300 301 void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port) 302 { 303 tipc_group_create_member(grp, node, port, MBR_DISCOVERED); 304 } 305 306 static void tipc_group_delete_member(struct tipc_group *grp, 307 struct tipc_member *m) 308 { 309 rb_erase(&m->tree_node, &grp->members); 310 grp->member_cnt--; 311 312 /* Check if we were waiting for replicast ack from this member */ 313 if (grp->bc_ackers && less(m->bc_acked, grp->bc_snd_nxt - 1)) 314 grp->bc_ackers--; 315 316 list_del_init(&m->list); 317 list_del_init(&m->congested); 318 tipc_group_decr_active(grp, m); 319 320 /* If last member on a node, remove node from dest list */ 321 if (!tipc_group_find_node(grp, m->node)) 322 tipc_nlist_del(&grp->dests, m->node); 323 324 kfree(m); 325 } 326 327 struct tipc_nlist *tipc_group_dests(struct tipc_group *grp) 328 { 329 return &grp->dests; 330 } 331 332 void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq, 333 int *scope) 334 { 335 seq->type = grp->type; 336 seq->lower = grp->instance; 337 seq->upper = grp->instance; 338 *scope = grp->scope; 339 } 340 341 void tipc_group_update_member(struct tipc_member *m, int len) 342 { 343 struct tipc_group *grp = m->group; 344 struct tipc_member *_m, *tmp; 345 346 if (!tipc_group_is_enabled(m)) 347 return; 348 349 m->window -= len; 350 351 if (m->window >= ADV_IDLE) 352 return; 353 354 if (!list_empty(&m->congested)) 355 return; 356 357 /* Sort member into congested members' list */ 358 list_for_each_entry_safe(_m, tmp, &grp->congested, congested) { 359 if (m->window > _m->window) 360 continue; 361 list_add_tail(&m->congested, &_m->congested); 362 return; 363 } 364 list_add_tail(&m->congested, &grp->congested); 365 } 366 367 void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack) 368 { 369 u16 prev = grp->bc_snd_nxt - 1; 370 struct tipc_member *m; 371 struct rb_node *n; 372 373 for (n = rb_first(&grp->members); n; n = rb_next(n)) { 374 m = container_of(n, struct tipc_member, tree_node); 375 if (tipc_group_is_enabled(m)) { 376 tipc_group_update_member(m, len); 377 m->bc_acked = prev; 378 } 379 } 380 381 /* Mark number of acknowledges to expect, if any */ 382 if (ack) 383 grp->bc_ackers = grp->member_cnt; 384 grp->bc_snd_nxt++; 385 } 386 387 bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport, 388 int len, struct tipc_member **mbr) 389 { 390 struct sk_buff_head xmitq; 391 struct tipc_member *m; 392 int adv, state; 393 394 m = tipc_group_find_dest(grp, dnode, dport); 395 *mbr = m; 396 if (!m) 397 return false; 398 if (m->usr_pending) 399 return true; 400 if (m->window >= len) 401 return false; 402 m->usr_pending = true; 403 404 /* If not fully advertised, do it now to prevent mutual blocking */ 405 adv = m->advertised; 406 state = m->state; 407 if (state < MBR_JOINED) 408 return true; 409 if (state == MBR_JOINED && adv == ADV_IDLE) 410 return true; 411 if (state == MBR_ACTIVE && adv == ADV_ACTIVE) 412 return true; 413 if (state == MBR_PENDING && adv == ADV_IDLE) 414 return true; 415 skb_queue_head_init(&xmitq); 416 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq); 417 tipc_node_distr_xmit(grp->net, &xmitq); 418 return true; 419 } 420 421 bool tipc_group_bc_cong(struct tipc_group *grp, int len) 422 { 423 struct tipc_member *m = NULL; 424 425 /* If prev bcast was replicast, reject until all receivers have acked */ 426 if (grp->bc_ackers) 427 return true; 428 429 if (list_empty(&grp->congested)) 430 return false; 431 432 m = list_first_entry(&grp->congested, struct tipc_member, congested); 433 if (m->window >= len) 434 return false; 435 436 return tipc_group_cong(grp, m->node, m->port, len, &m); 437 } 438 439 /* tipc_group_sort_msg() - sort msg into queue by bcast sequence number 440 */ 441 static void tipc_group_sort_msg(struct sk_buff *skb, struct sk_buff_head *defq) 442 { 443 struct tipc_msg *_hdr, *hdr = buf_msg(skb); 444 u16 bc_seqno = msg_grp_bc_seqno(hdr); 445 struct sk_buff *_skb, *tmp; 446 int mtyp = msg_type(hdr); 447 448 /* Bcast/mcast may be bypassed by ucast or other bcast, - sort it in */ 449 if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) { 450 skb_queue_walk_safe(defq, _skb, tmp) { 451 _hdr = buf_msg(_skb); 452 if (!less(bc_seqno, msg_grp_bc_seqno(_hdr))) 453 continue; 454 __skb_queue_before(defq, _skb, skb); 455 return; 456 } 457 /* Bcast was not bypassed, - add to tail */ 458 } 459 /* Unicasts are never bypassed, - always add to tail */ 460 __skb_queue_tail(defq, skb); 461 } 462 463 /* tipc_group_filter_msg() - determine if we should accept arriving message 464 */ 465 void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, 466 struct sk_buff_head *xmitq) 467 { 468 struct sk_buff *skb = __skb_dequeue(inputq); 469 bool ack, deliver, update, leave = false; 470 struct sk_buff_head *defq; 471 struct tipc_member *m; 472 struct tipc_msg *hdr; 473 u32 node, port; 474 int mtyp, blks; 475 476 if (!skb) 477 return; 478 479 hdr = buf_msg(skb); 480 node = msg_orignode(hdr); 481 port = msg_origport(hdr); 482 483 if (!msg_in_group(hdr)) 484 goto drop; 485 486 m = tipc_group_find_member(grp, node, port); 487 if (!tipc_group_is_receiver(m)) 488 goto drop; 489 490 if (less(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt)) 491 goto drop; 492 493 TIPC_SKB_CB(skb)->orig_member = m->instance; 494 defq = &m->deferredq; 495 tipc_group_sort_msg(skb, defq); 496 497 while ((skb = skb_peek(defq))) { 498 hdr = buf_msg(skb); 499 mtyp = msg_type(hdr); 500 deliver = true; 501 ack = false; 502 update = false; 503 504 if (more(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt)) 505 break; 506 507 /* Decide what to do with message */ 508 switch (mtyp) { 509 case TIPC_GRP_MCAST_MSG: 510 if (msg_nameinst(hdr) != grp->instance) { 511 update = true; 512 deliver = false; 513 } 514 /* Fall thru */ 515 case TIPC_GRP_BCAST_MSG: 516 m->bc_rcv_nxt++; 517 ack = msg_grp_bc_ack_req(hdr); 518 break; 519 case TIPC_GRP_UCAST_MSG: 520 break; 521 case TIPC_GRP_MEMBER_EVT: 522 if (m->state == MBR_LEAVING) 523 leave = true; 524 if (!grp->events) 525 deliver = false; 526 break; 527 default: 528 break; 529 } 530 531 /* Execute decisions */ 532 __skb_dequeue(defq); 533 if (deliver) 534 __skb_queue_tail(inputq, skb); 535 else 536 kfree_skb(skb); 537 538 if (ack) 539 tipc_group_proto_xmit(grp, m, GRP_ACK_MSG, xmitq); 540 541 if (leave) { 542 __skb_queue_purge(defq); 543 tipc_group_delete_member(grp, m); 544 break; 545 } 546 if (!update) 547 continue; 548 549 blks = msg_blocks(hdr); 550 tipc_group_update_rcv_win(grp, blks, node, port, xmitq); 551 } 552 return; 553 drop: 554 kfree_skb(skb); 555 } 556 557 void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, 558 u32 port, struct sk_buff_head *xmitq) 559 { 560 struct list_head *active = &grp->active; 561 int max_active = grp->max_active; 562 int reclaim_limit = max_active * 3 / 4; 563 int active_cnt = grp->active_cnt; 564 struct tipc_member *m, *rm; 565 566 m = tipc_group_find_member(grp, node, port); 567 if (!m) 568 return; 569 570 m->advertised -= blks; 571 572 switch (m->state) { 573 case MBR_JOINED: 574 /* Reclaim advertised space from least active member */ 575 if (!list_empty(active) && active_cnt >= reclaim_limit) { 576 rm = list_first_entry(active, struct tipc_member, list); 577 rm->state = MBR_RECLAIMING; 578 list_move_tail(&rm->list, &grp->reclaiming); 579 tipc_group_proto_xmit(grp, rm, GRP_RECLAIM_MSG, xmitq); 580 } 581 /* If max active, become pending and wait for reclaimed space */ 582 if (active_cnt >= max_active) { 583 m->state = MBR_PENDING; 584 list_add_tail(&m->list, &grp->pending); 585 break; 586 } 587 /* Otherwise become active */ 588 m->state = MBR_ACTIVE; 589 list_add_tail(&m->list, &grp->active); 590 grp->active_cnt++; 591 /* Fall through */ 592 case MBR_ACTIVE: 593 if (!list_is_last(&m->list, &grp->active)) 594 list_move_tail(&m->list, &grp->active); 595 if (m->advertised > (ADV_ACTIVE * 3 / 4)) 596 break; 597 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 598 break; 599 case MBR_REMITTED: 600 if (m->advertised > ADV_IDLE) 601 break; 602 m->state = MBR_JOINED; 603 if (m->advertised < ADV_IDLE) { 604 pr_warn_ratelimited("Rcv unexpected msg after REMIT\n"); 605 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 606 } 607 break; 608 case MBR_RECLAIMING: 609 case MBR_DISCOVERED: 610 case MBR_JOINING: 611 case MBR_LEAVING: 612 default: 613 break; 614 } 615 } 616 617 static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, 618 int mtyp, struct sk_buff_head *xmitq) 619 { 620 struct tipc_msg *hdr; 621 struct sk_buff *skb; 622 int adv = 0; 623 624 skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0, 625 m->node, tipc_own_addr(grp->net), 626 m->port, grp->portid, 0); 627 if (!skb) 628 return; 629 630 if (m->state == MBR_ACTIVE) 631 adv = ADV_ACTIVE - m->advertised; 632 else if (m->state == MBR_JOINED || m->state == MBR_PENDING) 633 adv = ADV_IDLE - m->advertised; 634 635 hdr = buf_msg(skb); 636 637 if (mtyp == GRP_JOIN_MSG) { 638 msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); 639 msg_set_adv_win(hdr, adv); 640 m->advertised += adv; 641 } else if (mtyp == GRP_LEAVE_MSG) { 642 msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); 643 } else if (mtyp == GRP_ADV_MSG) { 644 msg_set_adv_win(hdr, adv); 645 m->advertised += adv; 646 } else if (mtyp == GRP_ACK_MSG) { 647 msg_set_grp_bc_acked(hdr, m->bc_rcv_nxt); 648 } else if (mtyp == GRP_REMIT_MSG) { 649 msg_set_grp_remitted(hdr, m->window); 650 } 651 __skb_queue_tail(xmitq, skb); 652 } 653 654 void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, 655 struct tipc_msg *hdr, struct sk_buff_head *inputq, 656 struct sk_buff_head *xmitq) 657 { 658 u32 node = msg_orignode(hdr); 659 u32 port = msg_origport(hdr); 660 struct tipc_member *m, *pm; 661 struct tipc_msg *ehdr; 662 u16 remitted, in_flight; 663 664 if (!grp) 665 return; 666 667 m = tipc_group_find_member(grp, node, port); 668 669 switch (msg_type(hdr)) { 670 case GRP_JOIN_MSG: 671 if (!m) 672 m = tipc_group_create_member(grp, node, port, 673 MBR_QUARANTINED); 674 if (!m) 675 return; 676 m->bc_syncpt = msg_grp_bc_syncpt(hdr); 677 m->bc_rcv_nxt = m->bc_syncpt; 678 m->window += msg_adv_win(hdr); 679 680 /* Wait until PUBLISH event is received */ 681 if (m->state == MBR_DISCOVERED) { 682 m->state = MBR_JOINING; 683 } else if (m->state == MBR_PUBLISHED) { 684 m->state = MBR_JOINED; 685 *usr_wakeup = true; 686 m->usr_pending = false; 687 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 688 ehdr = buf_msg(m->event_msg); 689 msg_set_grp_bc_seqno(ehdr, m->bc_syncpt); 690 __skb_queue_tail(inputq, m->event_msg); 691 } 692 if (m->window < ADV_IDLE) 693 tipc_group_update_member(m, 0); 694 else 695 list_del_init(&m->congested); 696 return; 697 case GRP_LEAVE_MSG: 698 if (!m) 699 return; 700 m->bc_syncpt = msg_grp_bc_syncpt(hdr); 701 702 /* Wait until WITHDRAW event is received */ 703 if (m->state != MBR_LEAVING) { 704 tipc_group_decr_active(grp, m); 705 m->state = MBR_LEAVING; 706 return; 707 } 708 /* Otherwise deliver already received WITHDRAW event */ 709 ehdr = buf_msg(m->event_msg); 710 msg_set_grp_bc_seqno(ehdr, m->bc_syncpt); 711 __skb_queue_tail(inputq, m->event_msg); 712 *usr_wakeup = true; 713 list_del_init(&m->congested); 714 return; 715 case GRP_ADV_MSG: 716 if (!m) 717 return; 718 m->window += msg_adv_win(hdr); 719 *usr_wakeup = m->usr_pending; 720 m->usr_pending = false; 721 list_del_init(&m->congested); 722 return; 723 case GRP_ACK_MSG: 724 if (!m) 725 return; 726 m->bc_acked = msg_grp_bc_acked(hdr); 727 if (--grp->bc_ackers) 728 break; 729 *usr_wakeup = true; 730 m->usr_pending = false; 731 return; 732 case GRP_RECLAIM_MSG: 733 if (!m) 734 return; 735 *usr_wakeup = m->usr_pending; 736 m->usr_pending = false; 737 tipc_group_proto_xmit(grp, m, GRP_REMIT_MSG, xmitq); 738 m->window = ADV_IDLE; 739 return; 740 case GRP_REMIT_MSG: 741 if (!m || m->state != MBR_RECLAIMING) 742 return; 743 744 list_del_init(&m->list); 745 grp->active_cnt--; 746 remitted = msg_grp_remitted(hdr); 747 748 /* Messages preceding the REMIT still in receive queue */ 749 if (m->advertised > remitted) { 750 m->state = MBR_REMITTED; 751 in_flight = m->advertised - remitted; 752 } 753 /* All messages preceding the REMIT have been read */ 754 if (m->advertised <= remitted) { 755 m->state = MBR_JOINED; 756 in_flight = 0; 757 } 758 /* ..and the REMIT overtaken by more messages => re-advertise */ 759 if (m->advertised < remitted) 760 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 761 762 m->advertised = ADV_IDLE + in_flight; 763 764 /* Set oldest pending member to active and advertise */ 765 if (list_empty(&grp->pending)) 766 return; 767 pm = list_first_entry(&grp->pending, struct tipc_member, list); 768 pm->state = MBR_ACTIVE; 769 list_move_tail(&pm->list, &grp->active); 770 grp->active_cnt++; 771 if (pm->advertised <= (ADV_ACTIVE * 3 / 4)) 772 tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); 773 return; 774 default: 775 pr_warn("Received unknown GROUP_PROTO message\n"); 776 } 777 } 778 779 /* tipc_group_member_evt() - receive and handle a member up/down event 780 */ 781 void tipc_group_member_evt(struct tipc_group *grp, 782 bool *usr_wakeup, 783 int *sk_rcvbuf, 784 struct sk_buff *skb, 785 struct sk_buff_head *inputq, 786 struct sk_buff_head *xmitq) 787 { 788 struct tipc_msg *hdr = buf_msg(skb); 789 struct tipc_event *evt = (void *)msg_data(hdr); 790 u32 instance = evt->found_lower; 791 u32 node = evt->port.node; 792 u32 port = evt->port.ref; 793 int event = evt->event; 794 struct tipc_member *m; 795 struct net *net; 796 bool node_up; 797 u32 self; 798 799 if (!grp) 800 goto drop; 801 802 net = grp->net; 803 self = tipc_own_addr(net); 804 if (!grp->loopback && node == self && port == grp->portid) 805 goto drop; 806 807 /* Convert message before delivery to user */ 808 msg_set_hdr_sz(hdr, GROUP_H_SIZE); 809 msg_set_user(hdr, TIPC_CRITICAL_IMPORTANCE); 810 msg_set_type(hdr, TIPC_GRP_MEMBER_EVT); 811 msg_set_origport(hdr, port); 812 msg_set_orignode(hdr, node); 813 msg_set_nametype(hdr, grp->type); 814 msg_set_grp_evt(hdr, event); 815 816 m = tipc_group_find_member(grp, node, port); 817 818 if (event == TIPC_PUBLISHED) { 819 if (!m) 820 m = tipc_group_create_member(grp, node, port, 821 MBR_DISCOVERED); 822 if (!m) 823 goto drop; 824 825 /* Hold back event if JOIN message not yet received */ 826 if (m->state == MBR_DISCOVERED) { 827 m->event_msg = skb; 828 m->state = MBR_PUBLISHED; 829 } else { 830 msg_set_grp_bc_seqno(hdr, m->bc_syncpt); 831 __skb_queue_tail(inputq, skb); 832 m->state = MBR_JOINED; 833 *usr_wakeup = true; 834 m->usr_pending = false; 835 } 836 m->instance = instance; 837 TIPC_SKB_CB(skb)->orig_member = m->instance; 838 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); 839 if (m->window < ADV_IDLE) 840 tipc_group_update_member(m, 0); 841 else 842 list_del_init(&m->congested); 843 } else if (event == TIPC_WITHDRAWN) { 844 if (!m) 845 goto drop; 846 847 TIPC_SKB_CB(skb)->orig_member = m->instance; 848 849 *usr_wakeup = true; 850 m->usr_pending = false; 851 node_up = tipc_node_is_up(net, node); 852 853 /* Hold back event if more messages might be expected */ 854 if (m->state != MBR_LEAVING && node_up) { 855 m->event_msg = skb; 856 tipc_group_decr_active(grp, m); 857 m->state = MBR_LEAVING; 858 } else { 859 if (node_up) 860 msg_set_grp_bc_seqno(hdr, m->bc_syncpt); 861 else 862 msg_set_grp_bc_seqno(hdr, m->bc_rcv_nxt); 863 __skb_queue_tail(inputq, skb); 864 } 865 list_del_init(&m->congested); 866 } 867 *sk_rcvbuf = tipc_group_rcvbuf_limit(grp); 868 return; 869 drop: 870 kfree_skb(skb); 871 } 872