1 /* 2 * net/tipc/group.c: TIPC group messaging code 3 * 4 * Copyright (c) 2017, Ericsson AB 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 3. Neither the names of the copyright holders nor the names of its 16 * contributors may be used to endorse or promote products derived from 17 * this software without specific prior written permission. 18 * 19 * Alternatively, this software may be distributed under the terms of the 20 * GNU General Public License ("GPL") version 2 as published by the Free 21 * Software Foundation. 22 * 23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 33 * POSSIBILITY OF SUCH DAMAGE. 34 */ 35 36 #include "core.h" 37 #include "addr.h" 38 #include "group.h" 39 #include "bcast.h" 40 #include "server.h" 41 #include "msg.h" 42 #include "socket.h" 43 #include "node.h" 44 #include "name_table.h" 45 #include "subscr.h" 46 47 #define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1) 48 #define ADV_IDLE ADV_UNIT 49 #define ADV_ACTIVE (ADV_UNIT * 12) 50 51 enum mbr_state { 52 MBR_JOINING, 53 MBR_PUBLISHED, 54 MBR_JOINED, 55 MBR_PENDING, 56 MBR_ACTIVE, 57 MBR_RECLAIMING, 58 MBR_REMITTED, 59 MBR_LEAVING 60 }; 61 62 struct tipc_member { 63 struct rb_node tree_node; 64 struct list_head list; 65 struct list_head small_win; 66 struct sk_buff_head deferredq; 67 struct tipc_group *group; 68 u32 node; 69 u32 port; 70 u32 instance; 71 enum mbr_state state; 72 u16 advertised; 73 u16 window; 74 u16 bc_rcv_nxt; 75 u16 bc_syncpt; 76 u16 bc_acked; 77 }; 78 79 struct tipc_group { 80 struct rb_root members; 81 struct list_head small_win; 82 struct list_head pending; 83 struct list_head active; 84 struct tipc_nlist dests; 85 struct net *net; 86 int subid; 87 u32 type; 88 u32 instance; 89 u32 scope; 90 u32 portid; 91 u16 member_cnt; 92 u16 active_cnt; 93 u16 max_active; 94 u16 bc_snd_nxt; 95 u16 bc_ackers; 96 bool *open; 97 bool loopback; 98 bool events; 99 }; 100 101 static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, 102 int mtyp, struct sk_buff_head *xmitq); 103 104 static void tipc_group_open(struct tipc_member *m, bool *wakeup) 105 { 106 *wakeup = false; 107 if (list_empty(&m->small_win)) 108 return; 109 list_del_init(&m->small_win); 110 *m->group->open = true; 111 *wakeup = true; 112 } 113 114 static void tipc_group_decr_active(struct tipc_group *grp, 115 struct tipc_member *m) 116 { 117 if (m->state == MBR_ACTIVE || m->state == MBR_RECLAIMING || 118 m->state == MBR_REMITTED) 119 grp->active_cnt--; 120 } 121 122 static int tipc_group_rcvbuf_limit(struct tipc_group *grp) 123 { 124 int max_active, active_pool, idle_pool; 125 int mcnt = grp->member_cnt + 1; 126 127 /* Limit simultaneous reception from other members */ 128 max_active = min(mcnt / 8, 64); 129 max_active = max(max_active, 16); 130 grp->max_active = max_active; 131 132 /* Reserve blocks for active and idle members */ 133 active_pool = max_active * ADV_ACTIVE; 134 idle_pool = (mcnt - max_active) * ADV_IDLE; 135 136 /* Scale to bytes, considering worst-case truesize/msgsize ratio */ 137 return (active_pool + idle_pool) * FLOWCTL_BLK_SZ * 4; 138 } 139 140 u16 tipc_group_bc_snd_nxt(struct tipc_group *grp) 141 { 142 return grp->bc_snd_nxt; 143 } 144 145 static bool tipc_group_is_receiver(struct tipc_member *m) 146 { 147 return m && m->state != MBR_JOINING && m->state != MBR_LEAVING; 148 } 149 150 static bool tipc_group_is_sender(struct tipc_member *m) 151 { 152 return m && m->state != MBR_JOINING && m->state != MBR_PUBLISHED; 153 } 154 155 u32 tipc_group_exclude(struct tipc_group *grp) 156 { 157 if (!grp->loopback) 158 return grp->portid; 159 return 0; 160 } 161 162 int tipc_group_size(struct tipc_group *grp) 163 { 164 return grp->member_cnt; 165 } 166 167 struct tipc_group *tipc_group_create(struct net *net, u32 portid, 168 struct tipc_group_req *mreq, 169 bool *group_is_open) 170 { 171 u32 filter = TIPC_SUB_PORTS | TIPC_SUB_NO_STATUS; 172 bool global = mreq->scope != TIPC_NODE_SCOPE; 173 struct tipc_group *grp; 174 u32 type = mreq->type; 175 176 grp = kzalloc(sizeof(*grp), GFP_ATOMIC); 177 if (!grp) 178 return NULL; 179 tipc_nlist_init(&grp->dests, tipc_own_addr(net)); 180 INIT_LIST_HEAD(&grp->small_win); 181 INIT_LIST_HEAD(&grp->active); 182 INIT_LIST_HEAD(&grp->pending); 183 grp->members = RB_ROOT; 184 grp->net = net; 185 grp->portid = portid; 186 grp->type = type; 187 grp->instance = mreq->instance; 188 grp->scope = mreq->scope; 189 grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK; 190 grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS; 191 grp->open = group_is_open; 192 filter |= global ? TIPC_SUB_CLUSTER_SCOPE : TIPC_SUB_NODE_SCOPE; 193 if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, 194 filter, &grp->subid)) 195 return grp; 196 kfree(grp); 197 return NULL; 198 } 199 200 void tipc_group_join(struct net *net, struct tipc_group *grp, int *sk_rcvbuf) 201 { 202 struct rb_root *tree = &grp->members; 203 struct tipc_member *m, *tmp; 204 struct sk_buff_head xmitq; 205 206 skb_queue_head_init(&xmitq); 207 rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) { 208 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, &xmitq); 209 tipc_group_update_member(m, 0); 210 } 211 tipc_node_distr_xmit(net, &xmitq); 212 *sk_rcvbuf = tipc_group_rcvbuf_limit(grp); 213 } 214 215 void tipc_group_delete(struct net *net, struct tipc_group *grp) 216 { 217 struct rb_root *tree = &grp->members; 218 struct tipc_member *m, *tmp; 219 struct sk_buff_head xmitq; 220 221 __skb_queue_head_init(&xmitq); 222 223 rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) { 224 tipc_group_proto_xmit(grp, m, GRP_LEAVE_MSG, &xmitq); 225 list_del(&m->list); 226 kfree(m); 227 } 228 tipc_node_distr_xmit(net, &xmitq); 229 tipc_nlist_purge(&grp->dests); 230 tipc_topsrv_kern_unsubscr(net, grp->subid); 231 kfree(grp); 232 } 233 234 struct tipc_member *tipc_group_find_member(struct tipc_group *grp, 235 u32 node, u32 port) 236 { 237 struct rb_node *n = grp->members.rb_node; 238 u64 nkey, key = (u64)node << 32 | port; 239 struct tipc_member *m; 240 241 while (n) { 242 m = container_of(n, struct tipc_member, tree_node); 243 nkey = (u64)m->node << 32 | m->port; 244 if (key < nkey) 245 n = n->rb_left; 246 else if (key > nkey) 247 n = n->rb_right; 248 else 249 return m; 250 } 251 return NULL; 252 } 253 254 static struct tipc_member *tipc_group_find_dest(struct tipc_group *grp, 255 u32 node, u32 port) 256 { 257 struct tipc_member *m; 258 259 m = tipc_group_find_member(grp, node, port); 260 if (m && tipc_group_is_receiver(m)) 261 return m; 262 return NULL; 263 } 264 265 static struct tipc_member *tipc_group_find_node(struct tipc_group *grp, 266 u32 node) 267 { 268 struct tipc_member *m; 269 struct rb_node *n; 270 271 for (n = rb_first(&grp->members); n; n = rb_next(n)) { 272 m = container_of(n, struct tipc_member, tree_node); 273 if (m->node == node) 274 return m; 275 } 276 return NULL; 277 } 278 279 static void tipc_group_add_to_tree(struct tipc_group *grp, 280 struct tipc_member *m) 281 { 282 u64 nkey, key = (u64)m->node << 32 | m->port; 283 struct rb_node **n, *parent = NULL; 284 struct tipc_member *tmp; 285 286 n = &grp->members.rb_node; 287 while (*n) { 288 tmp = container_of(*n, struct tipc_member, tree_node); 289 parent = *n; 290 tmp = container_of(parent, struct tipc_member, tree_node); 291 nkey = (u64)tmp->node << 32 | tmp->port; 292 if (key < nkey) 293 n = &(*n)->rb_left; 294 else if (key > nkey) 295 n = &(*n)->rb_right; 296 else 297 return; 298 } 299 rb_link_node(&m->tree_node, parent, n); 300 rb_insert_color(&m->tree_node, &grp->members); 301 } 302 303 static struct tipc_member *tipc_group_create_member(struct tipc_group *grp, 304 u32 node, u32 port, 305 u32 instance, int state) 306 { 307 struct tipc_member *m; 308 309 m = kzalloc(sizeof(*m), GFP_ATOMIC); 310 if (!m) 311 return NULL; 312 INIT_LIST_HEAD(&m->list); 313 INIT_LIST_HEAD(&m->small_win); 314 __skb_queue_head_init(&m->deferredq); 315 m->group = grp; 316 m->node = node; 317 m->port = port; 318 m->instance = instance; 319 m->bc_acked = grp->bc_snd_nxt - 1; 320 grp->member_cnt++; 321 tipc_group_add_to_tree(grp, m); 322 tipc_nlist_add(&grp->dests, m->node); 323 m->state = state; 324 return m; 325 } 326 327 void tipc_group_add_member(struct tipc_group *grp, u32 node, 328 u32 port, u32 instance) 329 { 330 tipc_group_create_member(grp, node, port, instance, MBR_PUBLISHED); 331 } 332 333 static void tipc_group_delete_member(struct tipc_group *grp, 334 struct tipc_member *m) 335 { 336 rb_erase(&m->tree_node, &grp->members); 337 grp->member_cnt--; 338 339 /* Check if we were waiting for replicast ack from this member */ 340 if (grp->bc_ackers && less(m->bc_acked, grp->bc_snd_nxt - 1)) 341 grp->bc_ackers--; 342 343 list_del_init(&m->list); 344 list_del_init(&m->small_win); 345 tipc_group_decr_active(grp, m); 346 347 /* If last member on a node, remove node from dest list */ 348 if (!tipc_group_find_node(grp, m->node)) 349 tipc_nlist_del(&grp->dests, m->node); 350 351 kfree(m); 352 } 353 354 struct tipc_nlist *tipc_group_dests(struct tipc_group *grp) 355 { 356 return &grp->dests; 357 } 358 359 void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq, 360 int *scope) 361 { 362 seq->type = grp->type; 363 seq->lower = grp->instance; 364 seq->upper = grp->instance; 365 *scope = grp->scope; 366 } 367 368 void tipc_group_update_member(struct tipc_member *m, int len) 369 { 370 struct tipc_group *grp = m->group; 371 struct tipc_member *_m, *tmp; 372 373 if (!tipc_group_is_receiver(m)) 374 return; 375 376 m->window -= len; 377 378 if (m->window >= ADV_IDLE) 379 return; 380 381 list_del_init(&m->small_win); 382 383 /* Sort member into small_window members' list */ 384 list_for_each_entry_safe(_m, tmp, &grp->small_win, small_win) { 385 if (_m->window > m->window) 386 break; 387 } 388 list_add_tail(&m->small_win, &_m->small_win); 389 } 390 391 void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack) 392 { 393 u16 prev = grp->bc_snd_nxt - 1; 394 struct tipc_member *m; 395 struct rb_node *n; 396 u16 ackers = 0; 397 398 for (n = rb_first(&grp->members); n; n = rb_next(n)) { 399 m = container_of(n, struct tipc_member, tree_node); 400 if (tipc_group_is_receiver(m)) { 401 tipc_group_update_member(m, len); 402 m->bc_acked = prev; 403 ackers++; 404 } 405 } 406 407 /* Mark number of acknowledges to expect, if any */ 408 if (ack) 409 grp->bc_ackers = ackers; 410 grp->bc_snd_nxt++; 411 } 412 413 bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport, 414 int len, struct tipc_member **mbr) 415 { 416 struct sk_buff_head xmitq; 417 struct tipc_member *m; 418 int adv, state; 419 420 m = tipc_group_find_dest(grp, dnode, dport); 421 if (!tipc_group_is_receiver(m)) { 422 *mbr = NULL; 423 return false; 424 } 425 *mbr = m; 426 427 if (m->window >= len) 428 return false; 429 430 *grp->open = false; 431 432 /* If not fully advertised, do it now to prevent mutual blocking */ 433 adv = m->advertised; 434 state = m->state; 435 if (state == MBR_JOINED && adv == ADV_IDLE) 436 return true; 437 if (state == MBR_ACTIVE && adv == ADV_ACTIVE) 438 return true; 439 if (state == MBR_PENDING && adv == ADV_IDLE) 440 return true; 441 skb_queue_head_init(&xmitq); 442 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq); 443 tipc_node_distr_xmit(grp->net, &xmitq); 444 return true; 445 } 446 447 bool tipc_group_bc_cong(struct tipc_group *grp, int len) 448 { 449 struct tipc_member *m = NULL; 450 451 /* If prev bcast was replicast, reject until all receivers have acked */ 452 if (grp->bc_ackers) { 453 *grp->open = false; 454 return true; 455 } 456 if (list_empty(&grp->small_win)) 457 return false; 458 459 m = list_first_entry(&grp->small_win, struct tipc_member, small_win); 460 if (m->window >= len) 461 return false; 462 463 return tipc_group_cong(grp, m->node, m->port, len, &m); 464 } 465 466 /* tipc_group_sort_msg() - sort msg into queue by bcast sequence number 467 */ 468 static void tipc_group_sort_msg(struct sk_buff *skb, struct sk_buff_head *defq) 469 { 470 struct tipc_msg *_hdr, *hdr = buf_msg(skb); 471 u16 bc_seqno = msg_grp_bc_seqno(hdr); 472 struct sk_buff *_skb, *tmp; 473 int mtyp = msg_type(hdr); 474 475 /* Bcast/mcast may be bypassed by ucast or other bcast, - sort it in */ 476 if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) { 477 skb_queue_walk_safe(defq, _skb, tmp) { 478 _hdr = buf_msg(_skb); 479 if (!less(bc_seqno, msg_grp_bc_seqno(_hdr))) 480 continue; 481 __skb_queue_before(defq, _skb, skb); 482 return; 483 } 484 /* Bcast was not bypassed, - add to tail */ 485 } 486 /* Unicasts are never bypassed, - always add to tail */ 487 __skb_queue_tail(defq, skb); 488 } 489 490 /* tipc_group_filter_msg() - determine if we should accept arriving message 491 */ 492 void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, 493 struct sk_buff_head *xmitq) 494 { 495 struct sk_buff *skb = __skb_dequeue(inputq); 496 bool ack, deliver, update, leave = false; 497 struct sk_buff_head *defq; 498 struct tipc_member *m; 499 struct tipc_msg *hdr; 500 u32 node, port; 501 int mtyp, blks; 502 503 if (!skb) 504 return; 505 506 hdr = buf_msg(skb); 507 node = msg_orignode(hdr); 508 port = msg_origport(hdr); 509 510 if (!msg_in_group(hdr)) 511 goto drop; 512 513 m = tipc_group_find_member(grp, node, port); 514 if (!tipc_group_is_sender(m)) 515 goto drop; 516 517 if (less(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt)) 518 goto drop; 519 520 TIPC_SKB_CB(skb)->orig_member = m->instance; 521 defq = &m->deferredq; 522 tipc_group_sort_msg(skb, defq); 523 524 while ((skb = skb_peek(defq))) { 525 hdr = buf_msg(skb); 526 mtyp = msg_type(hdr); 527 blks = msg_blocks(hdr); 528 deliver = true; 529 ack = false; 530 update = false; 531 532 if (more(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt)) 533 break; 534 535 /* Decide what to do with message */ 536 switch (mtyp) { 537 case TIPC_GRP_MCAST_MSG: 538 if (msg_nameinst(hdr) != grp->instance) { 539 update = true; 540 deliver = false; 541 } 542 /* Fall thru */ 543 case TIPC_GRP_BCAST_MSG: 544 m->bc_rcv_nxt++; 545 ack = msg_grp_bc_ack_req(hdr); 546 break; 547 case TIPC_GRP_UCAST_MSG: 548 break; 549 case TIPC_GRP_MEMBER_EVT: 550 if (m->state == MBR_LEAVING) 551 leave = true; 552 if (!grp->events) 553 deliver = false; 554 break; 555 default: 556 break; 557 } 558 559 /* Execute decisions */ 560 __skb_dequeue(defq); 561 if (deliver) 562 __skb_queue_tail(inputq, skb); 563 else 564 kfree_skb(skb); 565 566 if (ack) 567 tipc_group_proto_xmit(grp, m, GRP_ACK_MSG, xmitq); 568 569 if (leave) { 570 __skb_queue_purge(defq); 571 tipc_group_delete_member(grp, m); 572 break; 573 } 574 if (!update) 575 continue; 576 577 tipc_group_update_rcv_win(grp, blks, node, port, xmitq); 578 } 579 return; 580 drop: 581 kfree_skb(skb); 582 } 583 584 void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, 585 u32 port, struct sk_buff_head *xmitq) 586 { 587 struct list_head *active = &grp->active; 588 int max_active = grp->max_active; 589 int reclaim_limit = max_active * 3 / 4; 590 int active_cnt = grp->active_cnt; 591 struct tipc_member *m, *rm, *pm; 592 593 m = tipc_group_find_member(grp, node, port); 594 if (!m) 595 return; 596 597 m->advertised -= blks; 598 599 switch (m->state) { 600 case MBR_JOINED: 601 /* First, decide if member can go active */ 602 if (active_cnt <= max_active) { 603 m->state = MBR_ACTIVE; 604 list_add_tail(&m->list, active); 605 grp->active_cnt++; 606 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 607 } else { 608 m->state = MBR_PENDING; 609 list_add_tail(&m->list, &grp->pending); 610 } 611 612 if (active_cnt < reclaim_limit) 613 break; 614 615 /* Reclaim from oldest active member, if possible */ 616 if (!list_empty(active)) { 617 rm = list_first_entry(active, struct tipc_member, list); 618 rm->state = MBR_RECLAIMING; 619 list_del_init(&rm->list); 620 tipc_group_proto_xmit(grp, rm, GRP_RECLAIM_MSG, xmitq); 621 break; 622 } 623 /* Nobody to reclaim from; - revert oldest pending to JOINED */ 624 pm = list_first_entry(&grp->pending, struct tipc_member, list); 625 list_del_init(&pm->list); 626 pm->state = MBR_JOINED; 627 tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); 628 break; 629 case MBR_ACTIVE: 630 if (!list_is_last(&m->list, &grp->active)) 631 list_move_tail(&m->list, &grp->active); 632 if (m->advertised > (ADV_ACTIVE * 3 / 4)) 633 break; 634 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 635 break; 636 case MBR_REMITTED: 637 if (m->advertised > ADV_IDLE) 638 break; 639 m->state = MBR_JOINED; 640 grp->active_cnt--; 641 if (m->advertised < ADV_IDLE) { 642 pr_warn_ratelimited("Rcv unexpected msg after REMIT\n"); 643 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 644 } 645 646 if (list_empty(&grp->pending)) 647 return; 648 649 /* Set oldest pending member to active and advertise */ 650 pm = list_first_entry(&grp->pending, struct tipc_member, list); 651 pm->state = MBR_ACTIVE; 652 list_move_tail(&pm->list, &grp->active); 653 grp->active_cnt++; 654 tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); 655 break; 656 case MBR_RECLAIMING: 657 case MBR_JOINING: 658 case MBR_LEAVING: 659 default: 660 break; 661 } 662 } 663 664 static void tipc_group_create_event(struct tipc_group *grp, 665 struct tipc_member *m, 666 u32 event, u16 seqno, 667 struct sk_buff_head *inputq) 668 { u32 dnode = tipc_own_addr(grp->net); 669 struct tipc_event evt; 670 struct sk_buff *skb; 671 struct tipc_msg *hdr; 672 673 evt.event = event; 674 evt.found_lower = m->instance; 675 evt.found_upper = m->instance; 676 evt.port.ref = m->port; 677 evt.port.node = m->node; 678 evt.s.seq.type = grp->type; 679 evt.s.seq.lower = m->instance; 680 evt.s.seq.upper = m->instance; 681 682 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_GRP_MEMBER_EVT, 683 GROUP_H_SIZE, sizeof(evt), dnode, m->node, 684 grp->portid, m->port, 0); 685 if (!skb) 686 return; 687 688 hdr = buf_msg(skb); 689 msg_set_nametype(hdr, grp->type); 690 msg_set_grp_evt(hdr, event); 691 msg_set_dest_droppable(hdr, true); 692 msg_set_grp_bc_seqno(hdr, seqno); 693 memcpy(msg_data(hdr), &evt, sizeof(evt)); 694 TIPC_SKB_CB(skb)->orig_member = m->instance; 695 __skb_queue_tail(inputq, skb); 696 } 697 698 static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, 699 int mtyp, struct sk_buff_head *xmitq) 700 { 701 struct tipc_msg *hdr; 702 struct sk_buff *skb; 703 int adv = 0; 704 705 skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0, 706 m->node, tipc_own_addr(grp->net), 707 m->port, grp->portid, 0); 708 if (!skb) 709 return; 710 711 if (m->state == MBR_ACTIVE) 712 adv = ADV_ACTIVE - m->advertised; 713 else if (m->state == MBR_JOINED || m->state == MBR_PENDING) 714 adv = ADV_IDLE - m->advertised; 715 716 hdr = buf_msg(skb); 717 718 if (mtyp == GRP_JOIN_MSG) { 719 msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); 720 msg_set_adv_win(hdr, adv); 721 m->advertised += adv; 722 } else if (mtyp == GRP_LEAVE_MSG) { 723 msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); 724 } else if (mtyp == GRP_ADV_MSG) { 725 msg_set_adv_win(hdr, adv); 726 m->advertised += adv; 727 } else if (mtyp == GRP_ACK_MSG) { 728 msg_set_grp_bc_acked(hdr, m->bc_rcv_nxt); 729 } else if (mtyp == GRP_REMIT_MSG) { 730 msg_set_grp_remitted(hdr, m->window); 731 } 732 msg_set_dest_droppable(hdr, true); 733 __skb_queue_tail(xmitq, skb); 734 } 735 736 void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, 737 struct tipc_msg *hdr, struct sk_buff_head *inputq, 738 struct sk_buff_head *xmitq) 739 { 740 u32 node = msg_orignode(hdr); 741 u32 port = msg_origport(hdr); 742 struct tipc_member *m, *pm; 743 u16 remitted, in_flight; 744 745 if (!grp) 746 return; 747 748 if (grp->scope == TIPC_NODE_SCOPE && node != tipc_own_addr(grp->net)) 749 return; 750 751 m = tipc_group_find_member(grp, node, port); 752 753 switch (msg_type(hdr)) { 754 case GRP_JOIN_MSG: 755 if (!m) 756 m = tipc_group_create_member(grp, node, port, 757 0, MBR_JOINING); 758 if (!m) 759 return; 760 m->bc_syncpt = msg_grp_bc_syncpt(hdr); 761 m->bc_rcv_nxt = m->bc_syncpt; 762 m->window += msg_adv_win(hdr); 763 764 /* Wait until PUBLISH event is received if necessary */ 765 if (m->state != MBR_PUBLISHED) 766 return; 767 768 /* Member can be taken into service */ 769 m->state = MBR_JOINED; 770 tipc_group_open(m, usr_wakeup); 771 tipc_group_update_member(m, 0); 772 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 773 tipc_group_create_event(grp, m, TIPC_PUBLISHED, 774 m->bc_syncpt, inputq); 775 return; 776 case GRP_LEAVE_MSG: 777 if (!m) 778 return; 779 m->bc_syncpt = msg_grp_bc_syncpt(hdr); 780 list_del_init(&m->list); 781 tipc_group_open(m, usr_wakeup); 782 tipc_group_decr_active(grp, m); 783 m->state = MBR_LEAVING; 784 tipc_group_create_event(grp, m, TIPC_WITHDRAWN, 785 m->bc_syncpt, inputq); 786 return; 787 case GRP_ADV_MSG: 788 if (!m) 789 return; 790 m->window += msg_adv_win(hdr); 791 tipc_group_open(m, usr_wakeup); 792 return; 793 case GRP_ACK_MSG: 794 if (!m) 795 return; 796 m->bc_acked = msg_grp_bc_acked(hdr); 797 if (--grp->bc_ackers) 798 return; 799 list_del_init(&m->small_win); 800 *m->group->open = true; 801 *usr_wakeup = true; 802 tipc_group_update_member(m, 0); 803 return; 804 case GRP_RECLAIM_MSG: 805 if (!m) 806 return; 807 tipc_group_proto_xmit(grp, m, GRP_REMIT_MSG, xmitq); 808 m->window = ADV_IDLE; 809 tipc_group_open(m, usr_wakeup); 810 return; 811 case GRP_REMIT_MSG: 812 if (!m || m->state != MBR_RECLAIMING) 813 return; 814 815 remitted = msg_grp_remitted(hdr); 816 817 /* Messages preceding the REMIT still in receive queue */ 818 if (m->advertised > remitted) { 819 m->state = MBR_REMITTED; 820 in_flight = m->advertised - remitted; 821 m->advertised = ADV_IDLE + in_flight; 822 return; 823 } 824 /* This should never happen */ 825 if (m->advertised < remitted) 826 pr_warn_ratelimited("Unexpected REMIT msg\n"); 827 828 /* All messages preceding the REMIT have been read */ 829 m->state = MBR_JOINED; 830 grp->active_cnt--; 831 m->advertised = ADV_IDLE; 832 833 /* Set oldest pending member to active and advertise */ 834 if (list_empty(&grp->pending)) 835 return; 836 pm = list_first_entry(&grp->pending, struct tipc_member, list); 837 pm->state = MBR_ACTIVE; 838 list_move_tail(&pm->list, &grp->active); 839 grp->active_cnt++; 840 if (pm->advertised <= (ADV_ACTIVE * 3 / 4)) 841 tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); 842 return; 843 default: 844 pr_warn("Received unknown GROUP_PROTO message\n"); 845 } 846 } 847 848 /* tipc_group_member_evt() - receive and handle a member up/down event 849 */ 850 void tipc_group_member_evt(struct tipc_group *grp, 851 bool *usr_wakeup, 852 int *sk_rcvbuf, 853 struct tipc_msg *hdr, 854 struct sk_buff_head *inputq, 855 struct sk_buff_head *xmitq) 856 { 857 struct tipc_event *evt = (void *)msg_data(hdr); 858 u32 instance = evt->found_lower; 859 u32 node = evt->port.node; 860 u32 port = evt->port.ref; 861 int event = evt->event; 862 struct tipc_member *m; 863 struct net *net; 864 u32 self; 865 866 if (!grp) 867 return; 868 869 net = grp->net; 870 self = tipc_own_addr(net); 871 if (!grp->loopback && node == self && port == grp->portid) 872 return; 873 874 m = tipc_group_find_member(grp, node, port); 875 876 switch (event) { 877 case TIPC_PUBLISHED: 878 /* Send and wait for arrival of JOIN message if necessary */ 879 if (!m) { 880 m = tipc_group_create_member(grp, node, port, instance, 881 MBR_PUBLISHED); 882 if (!m) 883 break; 884 tipc_group_update_member(m, 0); 885 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); 886 break; 887 } 888 889 if (m->state != MBR_JOINING) 890 break; 891 892 /* Member can be taken into service */ 893 m->instance = instance; 894 m->state = MBR_JOINED; 895 tipc_group_open(m, usr_wakeup); 896 tipc_group_update_member(m, 0); 897 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); 898 tipc_group_create_event(grp, m, TIPC_PUBLISHED, 899 m->bc_syncpt, inputq); 900 break; 901 case TIPC_WITHDRAWN: 902 if (!m) 903 break; 904 905 tipc_group_decr_active(grp, m); 906 m->state = MBR_LEAVING; 907 list_del_init(&m->list); 908 tipc_group_open(m, usr_wakeup); 909 910 /* Only send event if no LEAVE message can be expected */ 911 if (!tipc_node_is_up(net, node)) 912 tipc_group_create_event(grp, m, TIPC_WITHDRAWN, 913 m->bc_rcv_nxt, inputq); 914 break; 915 default: 916 break; 917 } 918 *sk_rcvbuf = tipc_group_rcvbuf_limit(grp); 919 } 920