1 /* 2 * net/tipc/group.c: TIPC group messaging code 3 * 4 * Copyright (c) 2017, Ericsson AB 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions are met: 9 * 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 3. Neither the names of the copyright holders nor the names of its 16 * contributors may be used to endorse or promote products derived from 17 * this software without specific prior written permission. 18 * 19 * Alternatively, this software may be distributed under the terms of the 20 * GNU General Public License ("GPL") version 2 as published by the Free 21 * Software Foundation. 22 * 23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 33 * POSSIBILITY OF SUCH DAMAGE. 34 */ 35 36 #include "core.h" 37 #include "addr.h" 38 #include "group.h" 39 #include "bcast.h" 40 #include "topsrv.h" 41 #include "msg.h" 42 #include "socket.h" 43 #include "node.h" 44 #include "name_table.h" 45 #include "subscr.h" 46 47 #define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1) 48 #define ADV_IDLE ADV_UNIT 49 #define ADV_ACTIVE (ADV_UNIT * 12) 50 51 enum mbr_state { 52 MBR_JOINING, 53 MBR_PUBLISHED, 54 MBR_JOINED, 55 MBR_PENDING, 56 MBR_ACTIVE, 57 MBR_RECLAIMING, 58 MBR_REMITTED, 59 MBR_LEAVING 60 }; 61 62 struct tipc_member { 63 struct rb_node tree_node; 64 struct list_head list; 65 struct list_head small_win; 66 struct sk_buff_head deferredq; 67 struct tipc_group *group; 68 u32 node; 69 u32 port; 70 u32 instance; 71 enum mbr_state state; 72 u16 advertised; 73 u16 window; 74 u16 bc_rcv_nxt; 75 u16 bc_syncpt; 76 u16 bc_acked; 77 }; 78 79 struct tipc_group { 80 struct rb_root members; 81 struct list_head small_win; 82 struct list_head pending; 83 struct list_head active; 84 struct tipc_nlist dests; 85 struct net *net; 86 int subid; 87 u32 type; 88 u32 instance; 89 u32 scope; 90 u32 portid; 91 u16 member_cnt; 92 u16 active_cnt; 93 u16 max_active; 94 u16 bc_snd_nxt; 95 u16 bc_ackers; 96 bool *open; 97 bool loopback; 98 bool events; 99 }; 100 101 static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, 102 int mtyp, struct sk_buff_head *xmitq); 103 104 static void tipc_group_open(struct tipc_member *m, bool *wakeup) 105 { 106 *wakeup = false; 107 if (list_empty(&m->small_win)) 108 return; 109 list_del_init(&m->small_win); 110 *m->group->open = true; 111 *wakeup = true; 112 } 113 114 static void tipc_group_decr_active(struct tipc_group *grp, 115 struct tipc_member *m) 116 { 117 if (m->state == MBR_ACTIVE || m->state == MBR_RECLAIMING || 118 m->state == MBR_REMITTED) 119 grp->active_cnt--; 120 } 121 122 static int tipc_group_rcvbuf_limit(struct tipc_group *grp) 123 { 124 int max_active, active_pool, idle_pool; 125 int mcnt = grp->member_cnt + 1; 126 127 /* Limit simultaneous reception from other members */ 128 max_active = min(mcnt / 8, 64); 129 max_active = max(max_active, 16); 130 grp->max_active = max_active; 131 132 /* Reserve blocks for active and idle members */ 133 active_pool = max_active * ADV_ACTIVE; 134 idle_pool = (mcnt - max_active) * ADV_IDLE; 135 136 /* Scale to bytes, considering worst-case truesize/msgsize ratio */ 137 return (active_pool + idle_pool) * FLOWCTL_BLK_SZ * 4; 138 } 139 140 u16 tipc_group_bc_snd_nxt(struct tipc_group *grp) 141 { 142 return grp->bc_snd_nxt; 143 } 144 145 static bool tipc_group_is_receiver(struct tipc_member *m) 146 { 147 return m && m->state != MBR_JOINING && m->state != MBR_LEAVING; 148 } 149 150 static bool tipc_group_is_sender(struct tipc_member *m) 151 { 152 return m && m->state != MBR_JOINING && m->state != MBR_PUBLISHED; 153 } 154 155 u32 tipc_group_exclude(struct tipc_group *grp) 156 { 157 if (!grp->loopback) 158 return grp->portid; 159 return 0; 160 } 161 162 struct tipc_group *tipc_group_create(struct net *net, u32 portid, 163 struct tipc_group_req *mreq, 164 bool *group_is_open) 165 { 166 u32 filter = TIPC_SUB_PORTS | TIPC_SUB_NO_STATUS; 167 bool global = mreq->scope != TIPC_NODE_SCOPE; 168 struct tipc_group *grp; 169 u32 type = mreq->type; 170 171 grp = kzalloc(sizeof(*grp), GFP_ATOMIC); 172 if (!grp) 173 return NULL; 174 tipc_nlist_init(&grp->dests, tipc_own_addr(net)); 175 INIT_LIST_HEAD(&grp->small_win); 176 INIT_LIST_HEAD(&grp->active); 177 INIT_LIST_HEAD(&grp->pending); 178 grp->members = RB_ROOT; 179 grp->net = net; 180 grp->portid = portid; 181 grp->type = type; 182 grp->instance = mreq->instance; 183 grp->scope = mreq->scope; 184 grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK; 185 grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS; 186 grp->open = group_is_open; 187 *grp->open = false; 188 filter |= global ? TIPC_SUB_CLUSTER_SCOPE : TIPC_SUB_NODE_SCOPE; 189 if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, 190 filter, &grp->subid)) 191 return grp; 192 kfree(grp); 193 return NULL; 194 } 195 196 void tipc_group_join(struct net *net, struct tipc_group *grp, int *sk_rcvbuf) 197 { 198 struct rb_root *tree = &grp->members; 199 struct tipc_member *m, *tmp; 200 struct sk_buff_head xmitq; 201 202 skb_queue_head_init(&xmitq); 203 rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) { 204 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, &xmitq); 205 tipc_group_update_member(m, 0); 206 } 207 tipc_node_distr_xmit(net, &xmitq); 208 *sk_rcvbuf = tipc_group_rcvbuf_limit(grp); 209 } 210 211 void tipc_group_delete(struct net *net, struct tipc_group *grp) 212 { 213 struct rb_root *tree = &grp->members; 214 struct tipc_member *m, *tmp; 215 struct sk_buff_head xmitq; 216 217 __skb_queue_head_init(&xmitq); 218 219 rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) { 220 tipc_group_proto_xmit(grp, m, GRP_LEAVE_MSG, &xmitq); 221 list_del(&m->list); 222 kfree(m); 223 } 224 tipc_node_distr_xmit(net, &xmitq); 225 tipc_nlist_purge(&grp->dests); 226 tipc_topsrv_kern_unsubscr(net, grp->subid); 227 kfree(grp); 228 } 229 230 static struct tipc_member *tipc_group_find_member(struct tipc_group *grp, 231 u32 node, u32 port) 232 { 233 struct rb_node *n = grp->members.rb_node; 234 u64 nkey, key = (u64)node << 32 | port; 235 struct tipc_member *m; 236 237 while (n) { 238 m = container_of(n, struct tipc_member, tree_node); 239 nkey = (u64)m->node << 32 | m->port; 240 if (key < nkey) 241 n = n->rb_left; 242 else if (key > nkey) 243 n = n->rb_right; 244 else 245 return m; 246 } 247 return NULL; 248 } 249 250 static struct tipc_member *tipc_group_find_dest(struct tipc_group *grp, 251 u32 node, u32 port) 252 { 253 struct tipc_member *m; 254 255 m = tipc_group_find_member(grp, node, port); 256 if (m && tipc_group_is_receiver(m)) 257 return m; 258 return NULL; 259 } 260 261 static struct tipc_member *tipc_group_find_node(struct tipc_group *grp, 262 u32 node) 263 { 264 struct tipc_member *m; 265 struct rb_node *n; 266 267 for (n = rb_first(&grp->members); n; n = rb_next(n)) { 268 m = container_of(n, struct tipc_member, tree_node); 269 if (m->node == node) 270 return m; 271 } 272 return NULL; 273 } 274 275 static void tipc_group_add_to_tree(struct tipc_group *grp, 276 struct tipc_member *m) 277 { 278 u64 nkey, key = (u64)m->node << 32 | m->port; 279 struct rb_node **n, *parent = NULL; 280 struct tipc_member *tmp; 281 282 n = &grp->members.rb_node; 283 while (*n) { 284 tmp = container_of(*n, struct tipc_member, tree_node); 285 parent = *n; 286 tmp = container_of(parent, struct tipc_member, tree_node); 287 nkey = (u64)tmp->node << 32 | tmp->port; 288 if (key < nkey) 289 n = &(*n)->rb_left; 290 else if (key > nkey) 291 n = &(*n)->rb_right; 292 else 293 return; 294 } 295 rb_link_node(&m->tree_node, parent, n); 296 rb_insert_color(&m->tree_node, &grp->members); 297 } 298 299 static struct tipc_member *tipc_group_create_member(struct tipc_group *grp, 300 u32 node, u32 port, 301 u32 instance, int state) 302 { 303 struct tipc_member *m; 304 305 m = kzalloc(sizeof(*m), GFP_ATOMIC); 306 if (!m) 307 return NULL; 308 INIT_LIST_HEAD(&m->list); 309 INIT_LIST_HEAD(&m->small_win); 310 __skb_queue_head_init(&m->deferredq); 311 m->group = grp; 312 m->node = node; 313 m->port = port; 314 m->instance = instance; 315 m->bc_acked = grp->bc_snd_nxt - 1; 316 grp->member_cnt++; 317 tipc_group_add_to_tree(grp, m); 318 tipc_nlist_add(&grp->dests, m->node); 319 m->state = state; 320 return m; 321 } 322 323 void tipc_group_add_member(struct tipc_group *grp, u32 node, 324 u32 port, u32 instance) 325 { 326 tipc_group_create_member(grp, node, port, instance, MBR_PUBLISHED); 327 } 328 329 static void tipc_group_delete_member(struct tipc_group *grp, 330 struct tipc_member *m) 331 { 332 rb_erase(&m->tree_node, &grp->members); 333 grp->member_cnt--; 334 335 /* Check if we were waiting for replicast ack from this member */ 336 if (grp->bc_ackers && less(m->bc_acked, grp->bc_snd_nxt - 1)) 337 grp->bc_ackers--; 338 339 list_del_init(&m->list); 340 list_del_init(&m->small_win); 341 tipc_group_decr_active(grp, m); 342 343 /* If last member on a node, remove node from dest list */ 344 if (!tipc_group_find_node(grp, m->node)) 345 tipc_nlist_del(&grp->dests, m->node); 346 347 kfree(m); 348 } 349 350 struct tipc_nlist *tipc_group_dests(struct tipc_group *grp) 351 { 352 return &grp->dests; 353 } 354 355 void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq, 356 int *scope) 357 { 358 seq->type = grp->type; 359 seq->lower = grp->instance; 360 seq->upper = grp->instance; 361 *scope = grp->scope; 362 } 363 364 void tipc_group_update_member(struct tipc_member *m, int len) 365 { 366 struct tipc_group *grp = m->group; 367 struct tipc_member *_m, *tmp; 368 369 if (!tipc_group_is_receiver(m)) 370 return; 371 372 m->window -= len; 373 374 if (m->window >= ADV_IDLE) 375 return; 376 377 list_del_init(&m->small_win); 378 379 /* Sort member into small_window members' list */ 380 list_for_each_entry_safe(_m, tmp, &grp->small_win, small_win) { 381 if (_m->window > m->window) 382 break; 383 } 384 list_add_tail(&m->small_win, &_m->small_win); 385 } 386 387 void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack) 388 { 389 u16 prev = grp->bc_snd_nxt - 1; 390 struct tipc_member *m; 391 struct rb_node *n; 392 u16 ackers = 0; 393 394 for (n = rb_first(&grp->members); n; n = rb_next(n)) { 395 m = container_of(n, struct tipc_member, tree_node); 396 if (tipc_group_is_receiver(m)) { 397 tipc_group_update_member(m, len); 398 m->bc_acked = prev; 399 ackers++; 400 } 401 } 402 403 /* Mark number of acknowledges to expect, if any */ 404 if (ack) 405 grp->bc_ackers = ackers; 406 grp->bc_snd_nxt++; 407 } 408 409 bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport, 410 int len, struct tipc_member **mbr) 411 { 412 struct sk_buff_head xmitq; 413 struct tipc_member *m; 414 int adv, state; 415 416 m = tipc_group_find_dest(grp, dnode, dport); 417 if (!tipc_group_is_receiver(m)) { 418 *mbr = NULL; 419 return false; 420 } 421 *mbr = m; 422 423 if (m->window >= len) 424 return false; 425 426 *grp->open = false; 427 428 /* If not fully advertised, do it now to prevent mutual blocking */ 429 adv = m->advertised; 430 state = m->state; 431 if (state == MBR_JOINED && adv == ADV_IDLE) 432 return true; 433 if (state == MBR_ACTIVE && adv == ADV_ACTIVE) 434 return true; 435 if (state == MBR_PENDING && adv == ADV_IDLE) 436 return true; 437 skb_queue_head_init(&xmitq); 438 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq); 439 tipc_node_distr_xmit(grp->net, &xmitq); 440 return true; 441 } 442 443 bool tipc_group_bc_cong(struct tipc_group *grp, int len) 444 { 445 struct tipc_member *m = NULL; 446 447 /* If prev bcast was replicast, reject until all receivers have acked */ 448 if (grp->bc_ackers) { 449 *grp->open = false; 450 return true; 451 } 452 if (list_empty(&grp->small_win)) 453 return false; 454 455 m = list_first_entry(&grp->small_win, struct tipc_member, small_win); 456 if (m->window >= len) 457 return false; 458 459 return tipc_group_cong(grp, m->node, m->port, len, &m); 460 } 461 462 /* tipc_group_sort_msg() - sort msg into queue by bcast sequence number 463 */ 464 static void tipc_group_sort_msg(struct sk_buff *skb, struct sk_buff_head *defq) 465 { 466 struct tipc_msg *_hdr, *hdr = buf_msg(skb); 467 u16 bc_seqno = msg_grp_bc_seqno(hdr); 468 struct sk_buff *_skb, *tmp; 469 int mtyp = msg_type(hdr); 470 471 /* Bcast/mcast may be bypassed by ucast or other bcast, - sort it in */ 472 if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) { 473 skb_queue_walk_safe(defq, _skb, tmp) { 474 _hdr = buf_msg(_skb); 475 if (!less(bc_seqno, msg_grp_bc_seqno(_hdr))) 476 continue; 477 __skb_queue_before(defq, _skb, skb); 478 return; 479 } 480 /* Bcast was not bypassed, - add to tail */ 481 } 482 /* Unicasts are never bypassed, - always add to tail */ 483 __skb_queue_tail(defq, skb); 484 } 485 486 /* tipc_group_filter_msg() - determine if we should accept arriving message 487 */ 488 void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq, 489 struct sk_buff_head *xmitq) 490 { 491 struct sk_buff *skb = __skb_dequeue(inputq); 492 bool ack, deliver, update, leave = false; 493 struct sk_buff_head *defq; 494 struct tipc_member *m; 495 struct tipc_msg *hdr; 496 u32 node, port; 497 int mtyp, blks; 498 499 if (!skb) 500 return; 501 502 hdr = buf_msg(skb); 503 node = msg_orignode(hdr); 504 port = msg_origport(hdr); 505 506 if (!msg_in_group(hdr)) 507 goto drop; 508 509 m = tipc_group_find_member(grp, node, port); 510 if (!tipc_group_is_sender(m)) 511 goto drop; 512 513 if (less(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt)) 514 goto drop; 515 516 TIPC_SKB_CB(skb)->orig_member = m->instance; 517 defq = &m->deferredq; 518 tipc_group_sort_msg(skb, defq); 519 520 while ((skb = skb_peek(defq))) { 521 hdr = buf_msg(skb); 522 mtyp = msg_type(hdr); 523 blks = msg_blocks(hdr); 524 deliver = true; 525 ack = false; 526 update = false; 527 528 if (more(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt)) 529 break; 530 531 /* Decide what to do with message */ 532 switch (mtyp) { 533 case TIPC_GRP_MCAST_MSG: 534 if (msg_nameinst(hdr) != grp->instance) { 535 update = true; 536 deliver = false; 537 } 538 /* Fall thru */ 539 case TIPC_GRP_BCAST_MSG: 540 m->bc_rcv_nxt++; 541 ack = msg_grp_bc_ack_req(hdr); 542 break; 543 case TIPC_GRP_UCAST_MSG: 544 break; 545 case TIPC_GRP_MEMBER_EVT: 546 if (m->state == MBR_LEAVING) 547 leave = true; 548 if (!grp->events) 549 deliver = false; 550 break; 551 default: 552 break; 553 } 554 555 /* Execute decisions */ 556 __skb_dequeue(defq); 557 if (deliver) 558 __skb_queue_tail(inputq, skb); 559 else 560 kfree_skb(skb); 561 562 if (ack) 563 tipc_group_proto_xmit(grp, m, GRP_ACK_MSG, xmitq); 564 565 if (leave) { 566 __skb_queue_purge(defq); 567 tipc_group_delete_member(grp, m); 568 break; 569 } 570 if (!update) 571 continue; 572 573 tipc_group_update_rcv_win(grp, blks, node, port, xmitq); 574 } 575 return; 576 drop: 577 kfree_skb(skb); 578 } 579 580 void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, 581 u32 port, struct sk_buff_head *xmitq) 582 { 583 struct list_head *active = &grp->active; 584 int max_active = grp->max_active; 585 int reclaim_limit = max_active * 3 / 4; 586 int active_cnt = grp->active_cnt; 587 struct tipc_member *m, *rm, *pm; 588 589 m = tipc_group_find_member(grp, node, port); 590 if (!m) 591 return; 592 593 m->advertised -= blks; 594 595 switch (m->state) { 596 case MBR_JOINED: 597 /* First, decide if member can go active */ 598 if (active_cnt <= max_active) { 599 m->state = MBR_ACTIVE; 600 list_add_tail(&m->list, active); 601 grp->active_cnt++; 602 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 603 } else { 604 m->state = MBR_PENDING; 605 list_add_tail(&m->list, &grp->pending); 606 } 607 608 if (active_cnt < reclaim_limit) 609 break; 610 611 /* Reclaim from oldest active member, if possible */ 612 if (!list_empty(active)) { 613 rm = list_first_entry(active, struct tipc_member, list); 614 rm->state = MBR_RECLAIMING; 615 list_del_init(&rm->list); 616 tipc_group_proto_xmit(grp, rm, GRP_RECLAIM_MSG, xmitq); 617 break; 618 } 619 /* Nobody to reclaim from; - revert oldest pending to JOINED */ 620 pm = list_first_entry(&grp->pending, struct tipc_member, list); 621 list_del_init(&pm->list); 622 pm->state = MBR_JOINED; 623 tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); 624 break; 625 case MBR_ACTIVE: 626 if (!list_is_last(&m->list, &grp->active)) 627 list_move_tail(&m->list, &grp->active); 628 if (m->advertised > (ADV_ACTIVE * 3 / 4)) 629 break; 630 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 631 break; 632 case MBR_REMITTED: 633 if (m->advertised > ADV_IDLE) 634 break; 635 m->state = MBR_JOINED; 636 grp->active_cnt--; 637 if (m->advertised < ADV_IDLE) { 638 pr_warn_ratelimited("Rcv unexpected msg after REMIT\n"); 639 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 640 } 641 642 if (list_empty(&grp->pending)) 643 return; 644 645 /* Set oldest pending member to active and advertise */ 646 pm = list_first_entry(&grp->pending, struct tipc_member, list); 647 pm->state = MBR_ACTIVE; 648 list_move_tail(&pm->list, &grp->active); 649 grp->active_cnt++; 650 tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); 651 break; 652 case MBR_RECLAIMING: 653 case MBR_JOINING: 654 case MBR_LEAVING: 655 default: 656 break; 657 } 658 } 659 660 static void tipc_group_create_event(struct tipc_group *grp, 661 struct tipc_member *m, 662 u32 event, u16 seqno, 663 struct sk_buff_head *inputq) 664 { u32 dnode = tipc_own_addr(grp->net); 665 struct tipc_event evt; 666 struct sk_buff *skb; 667 struct tipc_msg *hdr; 668 669 memset(&evt, 0, sizeof(evt)); 670 evt.event = event; 671 evt.found_lower = m->instance; 672 evt.found_upper = m->instance; 673 evt.port.ref = m->port; 674 evt.port.node = m->node; 675 evt.s.seq.type = grp->type; 676 evt.s.seq.lower = m->instance; 677 evt.s.seq.upper = m->instance; 678 679 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_GRP_MEMBER_EVT, 680 GROUP_H_SIZE, sizeof(evt), dnode, m->node, 681 grp->portid, m->port, 0); 682 if (!skb) 683 return; 684 685 hdr = buf_msg(skb); 686 msg_set_nametype(hdr, grp->type); 687 msg_set_grp_evt(hdr, event); 688 msg_set_dest_droppable(hdr, true); 689 msg_set_grp_bc_seqno(hdr, seqno); 690 memcpy(msg_data(hdr), &evt, sizeof(evt)); 691 TIPC_SKB_CB(skb)->orig_member = m->instance; 692 __skb_queue_tail(inputq, skb); 693 } 694 695 static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, 696 int mtyp, struct sk_buff_head *xmitq) 697 { 698 struct tipc_msg *hdr; 699 struct sk_buff *skb; 700 int adv = 0; 701 702 skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0, 703 m->node, tipc_own_addr(grp->net), 704 m->port, grp->portid, 0); 705 if (!skb) 706 return; 707 708 if (m->state == MBR_ACTIVE) 709 adv = ADV_ACTIVE - m->advertised; 710 else if (m->state == MBR_JOINED || m->state == MBR_PENDING) 711 adv = ADV_IDLE - m->advertised; 712 713 hdr = buf_msg(skb); 714 715 if (mtyp == GRP_JOIN_MSG) { 716 msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); 717 msg_set_adv_win(hdr, adv); 718 m->advertised += adv; 719 } else if (mtyp == GRP_LEAVE_MSG) { 720 msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt); 721 } else if (mtyp == GRP_ADV_MSG) { 722 msg_set_adv_win(hdr, adv); 723 m->advertised += adv; 724 } else if (mtyp == GRP_ACK_MSG) { 725 msg_set_grp_bc_acked(hdr, m->bc_rcv_nxt); 726 } else if (mtyp == GRP_REMIT_MSG) { 727 msg_set_grp_remitted(hdr, m->window); 728 } 729 msg_set_dest_droppable(hdr, true); 730 __skb_queue_tail(xmitq, skb); 731 } 732 733 void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup, 734 struct tipc_msg *hdr, struct sk_buff_head *inputq, 735 struct sk_buff_head *xmitq) 736 { 737 u32 node = msg_orignode(hdr); 738 u32 port = msg_origport(hdr); 739 struct tipc_member *m, *pm; 740 u16 remitted, in_flight; 741 742 if (!grp) 743 return; 744 745 if (grp->scope == TIPC_NODE_SCOPE && node != tipc_own_addr(grp->net)) 746 return; 747 748 m = tipc_group_find_member(grp, node, port); 749 750 switch (msg_type(hdr)) { 751 case GRP_JOIN_MSG: 752 if (!m) 753 m = tipc_group_create_member(grp, node, port, 754 0, MBR_JOINING); 755 if (!m) 756 return; 757 m->bc_syncpt = msg_grp_bc_syncpt(hdr); 758 m->bc_rcv_nxt = m->bc_syncpt; 759 m->window += msg_adv_win(hdr); 760 761 /* Wait until PUBLISH event is received if necessary */ 762 if (m->state != MBR_PUBLISHED) 763 return; 764 765 /* Member can be taken into service */ 766 m->state = MBR_JOINED; 767 tipc_group_open(m, usr_wakeup); 768 tipc_group_update_member(m, 0); 769 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 770 tipc_group_create_event(grp, m, TIPC_PUBLISHED, 771 m->bc_syncpt, inputq); 772 return; 773 case GRP_LEAVE_MSG: 774 if (!m) 775 return; 776 m->bc_syncpt = msg_grp_bc_syncpt(hdr); 777 list_del_init(&m->list); 778 tipc_group_open(m, usr_wakeup); 779 tipc_group_decr_active(grp, m); 780 m->state = MBR_LEAVING; 781 tipc_group_create_event(grp, m, TIPC_WITHDRAWN, 782 m->bc_syncpt, inputq); 783 return; 784 case GRP_ADV_MSG: 785 if (!m) 786 return; 787 m->window += msg_adv_win(hdr); 788 tipc_group_open(m, usr_wakeup); 789 return; 790 case GRP_ACK_MSG: 791 if (!m) 792 return; 793 m->bc_acked = msg_grp_bc_acked(hdr); 794 if (--grp->bc_ackers) 795 return; 796 list_del_init(&m->small_win); 797 *m->group->open = true; 798 *usr_wakeup = true; 799 tipc_group_update_member(m, 0); 800 return; 801 case GRP_RECLAIM_MSG: 802 if (!m) 803 return; 804 tipc_group_proto_xmit(grp, m, GRP_REMIT_MSG, xmitq); 805 m->window = ADV_IDLE; 806 tipc_group_open(m, usr_wakeup); 807 return; 808 case GRP_REMIT_MSG: 809 if (!m || m->state != MBR_RECLAIMING) 810 return; 811 812 remitted = msg_grp_remitted(hdr); 813 814 /* Messages preceding the REMIT still in receive queue */ 815 if (m->advertised > remitted) { 816 m->state = MBR_REMITTED; 817 in_flight = m->advertised - remitted; 818 m->advertised = ADV_IDLE + in_flight; 819 return; 820 } 821 /* This should never happen */ 822 if (m->advertised < remitted) 823 pr_warn_ratelimited("Unexpected REMIT msg\n"); 824 825 /* All messages preceding the REMIT have been read */ 826 m->state = MBR_JOINED; 827 grp->active_cnt--; 828 m->advertised = ADV_IDLE; 829 830 /* Set oldest pending member to active and advertise */ 831 if (list_empty(&grp->pending)) 832 return; 833 pm = list_first_entry(&grp->pending, struct tipc_member, list); 834 pm->state = MBR_ACTIVE; 835 list_move_tail(&pm->list, &grp->active); 836 grp->active_cnt++; 837 if (pm->advertised <= (ADV_ACTIVE * 3 / 4)) 838 tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); 839 return; 840 default: 841 pr_warn("Received unknown GROUP_PROTO message\n"); 842 } 843 } 844 845 /* tipc_group_member_evt() - receive and handle a member up/down event 846 */ 847 void tipc_group_member_evt(struct tipc_group *grp, 848 bool *usr_wakeup, 849 int *sk_rcvbuf, 850 struct tipc_msg *hdr, 851 struct sk_buff_head *inputq, 852 struct sk_buff_head *xmitq) 853 { 854 struct tipc_event *evt = (void *)msg_data(hdr); 855 u32 instance = evt->found_lower; 856 u32 node = evt->port.node; 857 u32 port = evt->port.ref; 858 int event = evt->event; 859 struct tipc_member *m; 860 struct net *net; 861 u32 self; 862 863 if (!grp) 864 return; 865 866 net = grp->net; 867 self = tipc_own_addr(net); 868 if (!grp->loopback && node == self && port == grp->portid) 869 return; 870 871 m = tipc_group_find_member(grp, node, port); 872 873 switch (event) { 874 case TIPC_PUBLISHED: 875 /* Send and wait for arrival of JOIN message if necessary */ 876 if (!m) { 877 m = tipc_group_create_member(grp, node, port, instance, 878 MBR_PUBLISHED); 879 if (!m) 880 break; 881 tipc_group_update_member(m, 0); 882 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); 883 break; 884 } 885 886 if (m->state != MBR_JOINING) 887 break; 888 889 /* Member can be taken into service */ 890 m->instance = instance; 891 m->state = MBR_JOINED; 892 tipc_group_open(m, usr_wakeup); 893 tipc_group_update_member(m, 0); 894 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); 895 tipc_group_create_event(grp, m, TIPC_PUBLISHED, 896 m->bc_syncpt, inputq); 897 break; 898 case TIPC_WITHDRAWN: 899 if (!m) 900 break; 901 902 tipc_group_decr_active(grp, m); 903 m->state = MBR_LEAVING; 904 list_del_init(&m->list); 905 tipc_group_open(m, usr_wakeup); 906 907 /* Only send event if no LEAVE message can be expected */ 908 if (!tipc_node_is_up(net, node)) 909 tipc_group_create_event(grp, m, TIPC_WITHDRAWN, 910 m->bc_rcv_nxt, inputq); 911 break; 912 default: 913 break; 914 } 915 *sk_rcvbuf = tipc_group_rcvbuf_limit(grp); 916 } 917 918 int tipc_group_fill_sock_diag(struct tipc_group *grp, struct sk_buff *skb) 919 { 920 struct nlattr *group = nla_nest_start(skb, TIPC_NLA_SOCK_GROUP); 921 922 if (!group) 923 return -EMSGSIZE; 924 925 if (nla_put_u32(skb, TIPC_NLA_SOCK_GROUP_ID, 926 grp->type) || 927 nla_put_u32(skb, TIPC_NLA_SOCK_GROUP_INSTANCE, 928 grp->instance) || 929 nla_put_u32(skb, TIPC_NLA_SOCK_GROUP_BC_SEND_NEXT, 930 grp->bc_snd_nxt)) 931 goto group_msg_cancel; 932 933 if (grp->scope == TIPC_NODE_SCOPE) 934 if (nla_put_flag(skb, TIPC_NLA_SOCK_GROUP_NODE_SCOPE)) 935 goto group_msg_cancel; 936 937 if (grp->scope == TIPC_CLUSTER_SCOPE) 938 if (nla_put_flag(skb, TIPC_NLA_SOCK_GROUP_CLUSTER_SCOPE)) 939 goto group_msg_cancel; 940 941 if (*grp->open) 942 if (nla_put_flag(skb, TIPC_NLA_SOCK_GROUP_OPEN)) 943 goto group_msg_cancel; 944 945 nla_nest_end(skb, group); 946 return 0; 947 948 group_msg_cancel: 949 nla_nest_cancel(skb, group); 950 return -1; 951 } 952