/*
 * net/tipc/link.c: TIPC link code
 *
 * Copyright (c) 1996-2007, 2012-2014, Ericsson AB
 * Copyright (c) 2004-2007, 2010-2013, Wind River Systems
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the names of the copyright holders nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
 *
 * Alternatively, this software may be distributed under the terms of the
 * GNU General Public License ("GPL") version 2 as published by the Free
 * Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "core.h"
#include "link.h"
#include "socket.h"
#include "name_distr.h"
#include "discover.h"
#include "config.h"

#include <linux/pkt_sched.h>

/*
 * Error message prefixes
 */
static const char *link_co_err = "Link changeover error, ";
static const char *link_rst_msg = "Resetting link ";
static const char *link_unk_evt = "Unknown link event ";

/*
 * Out-of-range value for link session numbers
 */
#define INVALID_SESSION 0x10000

/*
 * Link state events:
 */
#define STARTING_EVT    856384768	/* link processing trigger */
#define TRAFFIC_MSG_EVT 560815u	/* rx'd ??? */
#define TIMEOUT_EVT     560817u	/* link timer expired */

/*
 * The following two 'message types' are really just implementation
 * data conveniently stored in the message header.
 * They must not be considered part of the protocol
 */
#define OPEN_MSG   0
#define CLOSED_MSG 1

/*
 * State value stored in 'exp_msg_count'
 */
#define START_CHANGEOVER 100000u

static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
				       struct sk_buff *buf);
static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf);
static int tipc_link_tunnel_rcv(struct tipc_node *n_ptr,
				struct sk_buff **buf);
static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance);
static void link_state_event(struct tipc_link *l_ptr, u32 event);
static void link_reset_statistics(struct tipc_link *l_ptr);
static void link_print(struct tipc_link *l_ptr, const char *str);
static void tipc_link_sync_xmit(struct tipc_link *l);
static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf);
static int tipc_link_prepare_input(struct tipc_link *l, struct sk_buff **buf);

/*
 * Simple link routines
 */
static unsigned int align(unsigned int i)
{
	return (i + 3) & ~3u;
}

static void link_init_max_pkt(struct tipc_link *l_ptr)
{
	struct tipc_bearer *b_ptr;
	u32 max_pkt;

	rcu_read_lock();
	b_ptr = rcu_dereference_rtnl(bearer_list[l_ptr->bearer_id]);
	if (!b_ptr) {
		rcu_read_unlock();
		return;
	}
	max_pkt = (b_ptr->mtu & ~3);
	rcu_read_unlock();

	if (max_pkt > MAX_MSG_SIZE)
		max_pkt = MAX_MSG_SIZE;

	l_ptr->max_pkt_target = max_pkt;
	if (l_ptr->max_pkt_target < MAX_PKT_DEFAULT)
		l_ptr->max_pkt = l_ptr->max_pkt_target;
	else
		l_ptr->max_pkt = MAX_PKT_DEFAULT;

	l_ptr->max_pkt_probes = 0;
}

static u32 link_next_sent(struct tipc_link *l_ptr)
{
	if (l_ptr->next_out)
		return buf_seqno(l_ptr->next_out);
	return mod(l_ptr->next_out_no);
}

static u32 link_last_sent(struct tipc_link *l_ptr)
{
	return mod(link_next_sent(l_ptr) - 1);
}
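
/*
 * Note: link sequence numbers are 16-bit values, wrapped by mod()
 * (defined outside this file), so e.g. link_last_sent() yields 0xffff
 * when the next packet to send is number 0.
 */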

/*
 * Simple non-static link routines (i.e. referenced outside this file)
 */
int tipc_link_is_up(struct tipc_link *l_ptr)
{
	if (!l_ptr)
		return 0;
	return link_working_working(l_ptr) || link_working_unknown(l_ptr);
}

int tipc_link_is_active(struct tipc_link *l_ptr)
{
	return (l_ptr->owner->active_links[0] == l_ptr) ||
	       (l_ptr->owner->active_links[1] == l_ptr);
}

/**
 * link_timeout - handle expiration of link timer
 * @l_ptr: pointer to link
 */
static void link_timeout(struct tipc_link *l_ptr)
{
	tipc_node_lock(l_ptr->owner);

	/* update counters used in statistical profiling of send traffic */
	l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size;
	l_ptr->stats.queue_sz_counts++;

	if (l_ptr->first_out) {
		struct tipc_msg *msg = buf_msg(l_ptr->first_out);
		u32 length = msg_size(msg);

		if ((msg_user(msg) == MSG_FRAGMENTER) &&
		    (msg_type(msg) == FIRST_FRAGMENT)) {
			length = msg_size(msg_get_wrapped(msg));
		}
		if (length) {
			l_ptr->stats.msg_lengths_total += length;
			l_ptr->stats.msg_length_counts++;
			if (length <= 64)
				l_ptr->stats.msg_length_profile[0]++;
			else if (length <= 256)
				l_ptr->stats.msg_length_profile[1]++;
			else if (length <= 1024)
				l_ptr->stats.msg_length_profile[2]++;
			else if (length <= 4096)
				l_ptr->stats.msg_length_profile[3]++;
			else if (length <= 16384)
				l_ptr->stats.msg_length_profile[4]++;
			else if (length <= 32768)
				l_ptr->stats.msg_length_profile[5]++;
			else
				l_ptr->stats.msg_length_profile[6]++;
		}
	}

	/* do all other link processing performed on a periodic basis */

	link_state_event(l_ptr, TIMEOUT_EVT);

	if (l_ptr->next_out)
		tipc_link_push_queue(l_ptr);

	tipc_node_unlock(l_ptr->owner);
}

static void link_set_timer(struct tipc_link *l_ptr, u32 time)
{
	k_start_timer(&l_ptr->timer, time);
}

/**
 * tipc_link_create - create a new link
 * @n_ptr: pointer to associated node
 * @b_ptr: pointer to associated bearer
 * @media_addr: media address to use when sending messages over link
 *
 * Returns pointer to link.
 */
struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
				   struct tipc_bearer *b_ptr,
				   const struct tipc_media_addr *media_addr)
{
	struct tipc_link *l_ptr;
	struct tipc_msg *msg;
	char *if_name;
	char addr_string[16];
	u32 peer = n_ptr->addr;

	if (n_ptr->link_cnt >= 2) {
		tipc_addr_string_fill(addr_string, n_ptr->addr);
		pr_err("Attempt to establish third link to %s\n", addr_string);
		return NULL;
	}

	if (n_ptr->links[b_ptr->identity]) {
		tipc_addr_string_fill(addr_string, n_ptr->addr);
		pr_err("Attempt to establish second link on <%s> to %s\n",
		       b_ptr->name, addr_string);
		return NULL;
	}

	l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC);
	if (!l_ptr) {
		pr_warn("Link creation failed, no memory\n");
		return NULL;
	}

	l_ptr->addr = peer;
	if_name = strchr(b_ptr->name, ':') + 1;
	sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:unknown",
		tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
		tipc_node(tipc_own_addr),
		if_name,
		tipc_zone(peer), tipc_cluster(peer), tipc_node(peer));
	/* note: peer i/f name is updated by reset/activate message */
	memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr));
	l_ptr->owner = n_ptr;
	l_ptr->checkpoint = 1;
	l_ptr->peer_session = INVALID_SESSION;
	l_ptr->bearer_id = b_ptr->identity;
	link_set_supervision_props(l_ptr, b_ptr->tolerance);
	l_ptr->state = RESET_UNKNOWN;

	l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg;
	msg = l_ptr->pmsg;
	tipc_msg_init(msg, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, l_ptr->addr);
	msg_set_size(msg, sizeof(l_ptr->proto_msg));
	msg_set_session(msg, (tipc_random & 0xffff));
	msg_set_bearer_id(msg, b_ptr->identity);
	strcpy((char *)msg_data(msg), if_name);

	l_ptr->priority = b_ptr->priority;
	tipc_link_set_queue_limits(l_ptr, b_ptr->window);

	l_ptr->net_plane = b_ptr->net_plane;
	link_init_max_pkt(l_ptr);

	l_ptr->next_out_no = 1;
	__skb_queue_head_init(&l_ptr->waiting_sks);

	link_reset_statistics(l_ptr);

	tipc_node_attach_link(n_ptr, l_ptr);

	k_init_timer(&l_ptr->timer, (Handler)link_timeout,
		     (unsigned long)l_ptr);

	link_state_event(l_ptr, STARTING_EVT);

	return l_ptr;
}

void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)
{
	struct tipc_link *l_ptr;
	struct tipc_node *n_ptr;

	rcu_read_lock();
	list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
		tipc_node_lock(n_ptr);
		l_ptr = n_ptr->links[bearer_id];
		if (l_ptr) {
			tipc_link_reset(l_ptr);
			if (shutting_down || !tipc_node_is_up(n_ptr)) {
				tipc_node_detach_link(l_ptr->owner, l_ptr);
				tipc_link_reset_fragments(l_ptr);
				tipc_node_unlock(n_ptr);

				/* Nobody else can access this link now: */
				del_timer_sync(&l_ptr->timer);
				kfree(l_ptr);
			} else {
				/* Detach/delete when failover is finished: */
				l_ptr->flags |= LINK_STOPPED;
				tipc_node_unlock(n_ptr);
				del_timer_sync(&l_ptr->timer);
			}
			continue;
		}
		tipc_node_unlock(n_ptr);
	}
	rcu_read_unlock();
}
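
/*
 * Send-side congestion handling, in brief: when a message cannot be
 * accepted because the link send queue has reached the limit for its
 * importance level, a SOCK_WAKEUP pseudo message naming the sending port
 * is parked on link->waiting_sks (link_schedule_user below). As the queue
 * later drains, or when the link resets, these entries are moved to the
 * owner node's wait queue and TIPC_WAKEUP_USERS is set, so the blocked
 * sockets get woken up.
 */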

/**
 * link_schedule_user - schedule user for wakeup after congestion
 * @link: congested link
 * @oport: sending port
 * @chain_sz: size of buffer chain that was attempted sent
 * @imp: importance of message attempted sent
 * Create pseudo msg to send back to user when congestion abates
 */
static bool link_schedule_user(struct tipc_link *link, u32 oport,
			       uint chain_sz, uint imp)
{
	struct sk_buff *buf;

	buf = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0, tipc_own_addr,
			      tipc_own_addr, oport, 0, 0);
	if (!buf)
		return false;
	TIPC_SKB_CB(buf)->chain_sz = chain_sz;
	TIPC_SKB_CB(buf)->chain_imp = imp;
	__skb_queue_tail(&link->waiting_sks, buf);
	link->stats.link_congs++;
	return true;
}

/**
 * link_prepare_wakeup - prepare users for wakeup after congestion
 * @link: congested link
 * Move a number of waiting users, as permitted by available space in
 * the send queue, from link wait queue to node wait queue for wakeup
 */
static void link_prepare_wakeup(struct tipc_link *link)
{
	struct sk_buff_head *wq = &link->waiting_sks;
	struct sk_buff *buf;
	uint pend_qsz = link->out_queue_size;

	for (buf = skb_peek(wq); buf; buf = skb_peek(wq)) {
		if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(buf)->chain_imp])
			break;
		pend_qsz += TIPC_SKB_CB(buf)->chain_sz;
		__skb_queue_tail(&link->owner->waiting_sks, __skb_dequeue(wq));
	}
}

/**
 * link_release_outqueue - purge link's outbound message queue
 * @l_ptr: pointer to link
 */
static void link_release_outqueue(struct tipc_link *l_ptr)
{
	kfree_skb_list(l_ptr->first_out);
	l_ptr->first_out = NULL;
	l_ptr->out_queue_size = 0;
}

/**
 * tipc_link_reset_fragments - purge link's inbound message fragments queue
 * @l_ptr: pointer to link
 */
void tipc_link_reset_fragments(struct tipc_link *l_ptr)
{
	kfree_skb(l_ptr->reasm_buf);
	l_ptr->reasm_buf = NULL;
}

/**
 * tipc_link_purge_queues - purge all pkt queues associated with link
 * @l_ptr: pointer to link
 */
void tipc_link_purge_queues(struct tipc_link *l_ptr)
{
	kfree_skb_list(l_ptr->oldest_deferred_in);
	kfree_skb_list(l_ptr->first_out);
	tipc_link_reset_fragments(l_ptr);
	kfree_skb(l_ptr->proto_msg_queue);
	l_ptr->proto_msg_queue = NULL;
}
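
/*
 * Resetting a link (below) bumps the local session number, invalidates
 * the peer session so that any session is accepted when the link comes
 * back, and restarts max packet size negotiation. If the link was active
 * while a parallel link remains up, the current receive position is
 * checkpointed and exp_msg_count is armed with START_CHANGEOVER, so that
 * failover traffic tunnelled over the remaining link can be accounted for.
 */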

void tipc_link_reset(struct tipc_link *l_ptr)
{
	u32 prev_state = l_ptr->state;
	u32 checkpoint = l_ptr->next_in_no;
	int was_active_link = tipc_link_is_active(l_ptr);
	struct tipc_node *owner = l_ptr->owner;

	msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));

	/* Link is down, accept any session */
	l_ptr->peer_session = INVALID_SESSION;

	/* Prepare for max packet size negotiation */
	link_init_max_pkt(l_ptr);

	l_ptr->state = RESET_UNKNOWN;

	if ((prev_state == RESET_UNKNOWN) || (prev_state == RESET_RESET))
		return;

	tipc_node_link_down(l_ptr->owner, l_ptr);
	tipc_bearer_remove_dest(l_ptr->bearer_id, l_ptr->addr);

	if (was_active_link && tipc_node_active_links(l_ptr->owner)) {
		l_ptr->reset_checkpoint = checkpoint;
		l_ptr->exp_msg_count = START_CHANGEOVER;
	}

	/* Clean up all queues: */
	link_release_outqueue(l_ptr);
	kfree_skb(l_ptr->proto_msg_queue);
	l_ptr->proto_msg_queue = NULL;
	kfree_skb_list(l_ptr->oldest_deferred_in);
	if (!skb_queue_empty(&l_ptr->waiting_sks)) {
		skb_queue_splice_init(&l_ptr->waiting_sks, &owner->waiting_sks);
		owner->action_flags |= TIPC_WAKEUP_USERS;
	}
	l_ptr->retransm_queue_head = 0;
	l_ptr->retransm_queue_size = 0;
	l_ptr->last_out = NULL;
	l_ptr->first_out = NULL;
	l_ptr->next_out = NULL;
	l_ptr->unacked_window = 0;
	l_ptr->checkpoint = 1;
	l_ptr->next_out_no = 1;
	l_ptr->deferred_inqueue_sz = 0;
	l_ptr->oldest_deferred_in = NULL;
	l_ptr->newest_deferred_in = NULL;
	l_ptr->fsm_msg_cnt = 0;
	l_ptr->stale_count = 0;
	link_reset_statistics(l_ptr);
}

void tipc_link_reset_list(unsigned int bearer_id)
{
	struct tipc_link *l_ptr;
	struct tipc_node *n_ptr;

	rcu_read_lock();
	list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
		tipc_node_lock(n_ptr);
		l_ptr = n_ptr->links[bearer_id];
		if (l_ptr)
			tipc_link_reset(l_ptr);
		tipc_node_unlock(n_ptr);
	}
	rcu_read_unlock();
}

static void link_activate(struct tipc_link *l_ptr)
{
	l_ptr->next_in_no = l_ptr->stats.recv_info = 1;
	tipc_node_link_up(l_ptr->owner, l_ptr);
	tipc_bearer_add_dest(l_ptr->bearer_id, l_ptr->addr);
}
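
/*
 * The link FSM below has four states: RESET_UNKNOWN and RESET_RESET while
 * the endpoints are (re)synchronizing, WORKING_WORKING while traffic or
 * probe replies are being seen, and WORKING_UNKNOWN while the peer is
 * being probed. It is driven by received protocol messages (RESET_MSG,
 * ACTIVATE_MSG), by incoming traffic (TRAFFIC_MSG_EVT) and by the
 * periodic link timer (TIMEOUT_EVT).
 */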

/**
 * link_state_event - link finite state machine
 * @l_ptr: pointer to link
 * @event: state machine event to process
 */
static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
{
	struct tipc_link *other;
	u32 cont_intv = l_ptr->continuity_interval;

	if (l_ptr->flags & LINK_STOPPED)
		return;

	if (!(l_ptr->flags & LINK_STARTED) && (event != STARTING_EVT))
		return;	/* Not yet. */

	/* Check whether changeover is going on */
	if (l_ptr->exp_msg_count) {
		if (event == TIMEOUT_EVT)
			link_set_timer(l_ptr, cont_intv);
		return;
	}

	switch (l_ptr->state) {
	case WORKING_WORKING:
		switch (event) {
		case TRAFFIC_MSG_EVT:
		case ACTIVATE_MSG:
			break;
		case TIMEOUT_EVT:
			if (l_ptr->next_in_no != l_ptr->checkpoint) {
				l_ptr->checkpoint = l_ptr->next_in_no;
				if (tipc_bclink_acks_missing(l_ptr->owner)) {
					tipc_link_proto_xmit(l_ptr, STATE_MSG,
							     0, 0, 0, 0, 0);
					l_ptr->fsm_msg_cnt++;
				} else if (l_ptr->max_pkt < l_ptr->max_pkt_target) {
					tipc_link_proto_xmit(l_ptr, STATE_MSG,
							     1, 0, 0, 0, 0);
					l_ptr->fsm_msg_cnt++;
				}
				link_set_timer(l_ptr, cont_intv);
				break;
			}
			l_ptr->state = WORKING_UNKNOWN;
			l_ptr->fsm_msg_cnt = 0;
			tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv / 4);
			break;
		case RESET_MSG:
			pr_info("%s<%s>, requested by peer\n", link_rst_msg,
				l_ptr->name);
			tipc_link_reset(l_ptr);
			l_ptr->state = RESET_RESET;
			l_ptr->fsm_msg_cnt = 0;
			tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
					     0, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv);
			break;
		default:
			pr_err("%s%u in WW state\n", link_unk_evt, event);
		}
		break;
	case WORKING_UNKNOWN:
		switch (event) {
		case TRAFFIC_MSG_EVT:
		case ACTIVATE_MSG:
			l_ptr->state = WORKING_WORKING;
			l_ptr->fsm_msg_cnt = 0;
			link_set_timer(l_ptr, cont_intv);
			break;
		case RESET_MSG:
			pr_info("%s<%s>, requested by peer while probing\n",
				link_rst_msg, l_ptr->name);
			tipc_link_reset(l_ptr);
			l_ptr->state = RESET_RESET;
			l_ptr->fsm_msg_cnt = 0;
			tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
					     0, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv);
			break;
		case TIMEOUT_EVT:
			if (l_ptr->next_in_no != l_ptr->checkpoint) {
				l_ptr->state = WORKING_WORKING;
				l_ptr->fsm_msg_cnt = 0;
				l_ptr->checkpoint = l_ptr->next_in_no;
				if (tipc_bclink_acks_missing(l_ptr->owner)) {
					tipc_link_proto_xmit(l_ptr, STATE_MSG,
							     0, 0, 0, 0, 0);
					l_ptr->fsm_msg_cnt++;
				}
				link_set_timer(l_ptr, cont_intv);
			} else if (l_ptr->fsm_msg_cnt < l_ptr->abort_limit) {
				tipc_link_proto_xmit(l_ptr, STATE_MSG,
						     1, 0, 0, 0, 0);
				l_ptr->fsm_msg_cnt++;
				link_set_timer(l_ptr, cont_intv / 4);
			} else {	/* Link has failed */
				pr_warn("%s<%s>, peer not responding\n",
					link_rst_msg, l_ptr->name);
				tipc_link_reset(l_ptr);
				l_ptr->state = RESET_UNKNOWN;
				l_ptr->fsm_msg_cnt = 0;
				tipc_link_proto_xmit(l_ptr, RESET_MSG,
						     0, 0, 0, 0, 0);
				l_ptr->fsm_msg_cnt++;
				link_set_timer(l_ptr, cont_intv);
			}
			break;
		default:
			pr_err("%s%u in WU state\n", link_unk_evt, event);
		}
		break;
	case RESET_UNKNOWN:
		switch (event) {
		case TRAFFIC_MSG_EVT:
			break;
		case ACTIVATE_MSG:
			other = l_ptr->owner->active_links[0];
			if (other && link_working_unknown(other))
				break;
			l_ptr->state = WORKING_WORKING;
			l_ptr->fsm_msg_cnt = 0;
			link_activate(l_ptr);
			tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			if (l_ptr->owner->working_links == 1)
				tipc_link_sync_xmit(l_ptr);
			link_set_timer(l_ptr, cont_intv);
			break;
		case RESET_MSG:
			l_ptr->state = RESET_RESET;
			l_ptr->fsm_msg_cnt = 0;
			tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
					     1, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv);
			break;
		case STARTING_EVT:
			l_ptr->flags |= LINK_STARTED;
			/* fall through */
		case TIMEOUT_EVT:
			tipc_link_proto_xmit(l_ptr, RESET_MSG, 0, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv);
			break;
		default:
			pr_err("%s%u in RU state\n", link_unk_evt, event);
		}
		break;
	case RESET_RESET:
		switch (event) {
		case TRAFFIC_MSG_EVT:
		case ACTIVATE_MSG:
			other = l_ptr->owner->active_links[0];
			if (other && link_working_unknown(other))
				break;
			l_ptr->state = WORKING_WORKING;
			l_ptr->fsm_msg_cnt = 0;
			link_activate(l_ptr);
			tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			if (l_ptr->owner->working_links == 1)
				tipc_link_sync_xmit(l_ptr);
			link_set_timer(l_ptr, cont_intv);
			break;
		case RESET_MSG:
			break;
		case TIMEOUT_EVT:
			tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
					     0, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv);
			break;
		default:
			pr_err("%s%u in RR state\n", link_unk_evt, event);
		}
		break;
	default:
		pr_err("Unknown link state %u/%u\n", l_ptr->state, event);
	}
}
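
/*
 * Summary of the main transitions implemented above:
 *
 *   WORKING_WORKING --TIMEOUT, no new traffic--> WORKING_UNKNOWN (probe)
 *   WORKING_UNKNOWN --traffic/ACTIVATE_MSG-----> WORKING_WORKING
 *   WORKING_UNKNOWN --abort_limit probes-------> RESET_UNKNOWN (link reset)
 *   WORKING_*       --RESET_MSG----------------> RESET_RESET (send ACTIVATE)
 *   RESET_UNKNOWN   --RESET_MSG----------------> RESET_RESET (send ACTIVATE)
 *   RESET_UNKNOWN   --ACTIVATE_MSG-------------> WORKING_WORKING
 *   RESET_RESET     --traffic/ACTIVATE_MSG-----> WORKING_WORKING
 */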

/* tipc_link_cong: determine return value and how to treat the
 * sent buffer during link congestion.
 * - For plain, errorless user data messages we keep the buffer and
 *   return -ELINKCONG.
 * - For all other messages we discard the buffer and return -EHOSTUNREACH
 * - For TIPC internal messages we also reset the link
 */
static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
{
	struct tipc_msg *msg = buf_msg(buf);
	uint imp = tipc_msg_tot_importance(msg);
	u32 oport = msg_tot_origport(msg);

	if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
		pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
		tipc_link_reset(link);
		goto drop;
	}
	if (unlikely(msg_errcode(msg)))
		goto drop;
	if (unlikely(msg_reroute_cnt(msg)))
		goto drop;
	if (TIPC_SKB_CB(buf)->wakeup_pending)
		return -ELINKCONG;
	if (link_schedule_user(link, oport, TIPC_SKB_CB(buf)->chain_sz, imp))
		return -ELINKCONG;
drop:
	kfree_skb_list(buf);
	return -EHOSTUNREACH;
}

/**
 * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
 * @link: link to use
 * @buf: chain of buffers containing message
 * Consumes the buffer chain, except when returning -ELINKCONG
 * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket
 * user data messages) or -EHOSTUNREACH (all other messages/senders)
 * Only the socket functions tipc_send_stream() and tipc_send_packet() need
 * to act on the return value, since they may need to do more send attempts.
 */
int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf)
{
	struct tipc_msg *msg = buf_msg(buf);
	uint psz = msg_size(msg);
	uint qsz = link->out_queue_size;
	uint sndlim = link->queue_limit[0];
	uint imp = tipc_msg_tot_importance(msg);
	uint mtu = link->max_pkt;
	uint ack = mod(link->next_in_no - 1);
	uint seqno = link->next_out_no;
	uint bc_last_in = link->owner->bclink.last_in;
	struct tipc_media_addr *addr = &link->media_addr;
	struct sk_buff *next = buf->next;

	/* Match queue limits against msg importance: */
	if (unlikely(qsz >= link->queue_limit[imp]))
		return tipc_link_cong(link, buf);

	/* Has valid packet limit been used ? */
	if (unlikely(psz > mtu)) {
		kfree_skb_list(buf);
		return -EMSGSIZE;
	}

	/* Prepare each packet for sending, and add to outqueue: */
	while (buf) {
		next = buf->next;
		msg = buf_msg(buf);
		msg_set_word(msg, 2, ((ack << 16) | mod(seqno)));
		msg_set_bcast_ack(msg, bc_last_in);

		if (!link->first_out) {
			link->first_out = buf;
		} else if (qsz < sndlim) {
			link->last_out->next = buf;
		} else if (tipc_msg_bundle(link->last_out, buf, mtu)) {
			link->stats.sent_bundled++;
			buf = next;
			next = buf->next;
			continue;
		} else if (tipc_msg_make_bundle(&buf, mtu, link->addr)) {
			link->stats.sent_bundled++;
			link->stats.sent_bundles++;
			link->last_out->next = buf;
			if (!link->next_out)
				link->next_out = buf;
		} else {
			link->last_out->next = buf;
			if (!link->next_out)
				link->next_out = buf;
		}

		/* Send packet if possible: */
		if (likely(++qsz <= sndlim)) {
			tipc_bearer_send(link->bearer_id, buf, addr);
			link->next_out = next;
			link->unacked_window = 0;
		}
		seqno++;
		link->last_out = buf;
		buf = next;
	}
	link->next_out_no = seqno;
	link->out_queue_size = qsz;
	return 0;
}
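
/*
 * Note on the queueing strategy above: queue_limit[0] doubles as the send
 * window. Packets within the window are transmitted immediately; once the
 * window is full, new packets are either bundled into the last queued
 * packet (tipc_msg_bundle), wrapped into a fresh bundle
 * (tipc_msg_make_bundle), or appended as-is, with link->next_out marking
 * the first packet that has not yet been put on the wire.
 */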

/**
 * tipc_link_xmit() is the general link level function for message sending
 * @buf: chain of buffers containing message
 * @dnode: address of destination node
 * @selector: a number used for deterministic link selection
 * Consumes the buffer chain, except when returning -ELINKCONG
 * Returns 0 if success, otherwise errno: -ELINKCONG, -EHOSTUNREACH, -EMSGSIZE
 */
int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector)
{
	struct tipc_link *link = NULL;
	struct tipc_node *node;
	int rc = -EHOSTUNREACH;

	node = tipc_node_find(dnode);
	if (node) {
		tipc_node_lock(node);
		link = node->active_links[selector & 1];
		if (link)
			rc = __tipc_link_xmit(link, buf);
		tipc_node_unlock(node);
	}

	if (link)
		return rc;

	if (likely(in_own_node(dnode)))
		return tipc_sk_rcv(buf);

	kfree_skb_list(buf);
	return rc;
}

/*
 * tipc_link_sync_xmit - synchronize broadcast link endpoints.
 *
 * Give a newly added peer node the sequence number where it should
 * start receiving and acking broadcast packets.
 *
 * Called with node locked
 */
static void tipc_link_sync_xmit(struct tipc_link *link)
{
	struct sk_buff *buf;
	struct tipc_msg *msg;

	buf = tipc_buf_acquire(INT_H_SIZE);
	if (!buf)
		return;

	msg = buf_msg(buf);
	tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, link->addr);
	msg_set_last_bcast(msg, link->owner->bclink.acked);
	__tipc_link_xmit(link, buf);
}

/*
 * tipc_link_sync_rcv - synchronize broadcast link endpoints.
 * Receive the sequence number where we should start receiving and
 * acking broadcast packets from a newly added peer node, and open
 * up for reception of such packets.
 *
 * Called with node locked
 */
static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
{
	struct tipc_msg *msg = buf_msg(buf);

	n->bclink.last_sent = n->bclink.last_in = msg_last_bcast(msg);
	n->bclink.recv_permitted = true;
	kfree_skb(buf);
}

/*
 * tipc_link_push_packet: Push one unsent packet to the media
 */
static u32 tipc_link_push_packet(struct tipc_link *l_ptr)
{
	struct sk_buff *buf = l_ptr->first_out;
	u32 r_q_size = l_ptr->retransm_queue_size;
	u32 r_q_head = l_ptr->retransm_queue_head;

	/* Step to position where retransmission failed, if any,    */
	/* consider that buffers may have been released in meantime */
	if (r_q_size && buf) {
		u32 last = lesser(mod(r_q_head + r_q_size),
				  link_last_sent(l_ptr));
		u32 first = buf_seqno(buf);

		while (buf && less(first, r_q_head)) {
			first = mod(first + 1);
			buf = buf->next;
		}
		l_ptr->retransm_queue_head = r_q_head = first;
		l_ptr->retransm_queue_size = r_q_size = mod(last - first);
	}

	/* Continue retransmission now, if there is anything: */
	if (r_q_size && buf) {
		msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
		msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
		tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
		l_ptr->retransm_queue_head = mod(++r_q_head);
		l_ptr->retransm_queue_size = --r_q_size;
		l_ptr->stats.retransmitted++;
		return 0;
	}

	/* Send deferred protocol message, if any: */
	buf = l_ptr->proto_msg_queue;
	if (buf) {
		msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
		msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
		tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
		l_ptr->unacked_window = 0;
		kfree_skb(buf);
		l_ptr->proto_msg_queue = NULL;
		return 0;
	}

	/* Send one deferred data message, if send window not full: */
	buf = l_ptr->next_out;
	if (buf) {
		struct tipc_msg *msg = buf_msg(buf);
		u32 next = msg_seqno(msg);
		u32 first = buf_seqno(l_ptr->first_out);

		if (mod(next - first) < l_ptr->queue_limit[0]) {
			msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
			msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
			tipc_bearer_send(l_ptr->bearer_id, buf,
					 &l_ptr->media_addr);
			if (msg_user(msg) == MSG_BUNDLER)
				msg_set_type(msg, BUNDLE_CLOSED);
			l_ptr->next_out = buf->next;
			return 0;
		}
	}
	return 1;
}

/*
 * push_queue(): push out the unsent messages of a link where
 *               congestion has abated. Node is locked
 */
void tipc_link_push_queue(struct tipc_link *l_ptr)
{
	u32 res;

	do {
		res = tipc_link_push_packet(l_ptr);
	} while (!res);
}

void tipc_link_reset_all(struct tipc_node *node)
{
	char addr_string[16];
	u32 i;

	tipc_node_lock(node);

	pr_warn("Resetting all links to %s\n",
		tipc_addr_string_fill(addr_string, node->addr));

	for (i = 0; i < MAX_BEARERS; i++) {
		if (node->links[i]) {
			link_print(node->links[i], "Resetting link\n");
			tipc_link_reset(node->links[i]);
		}
	}

	tipc_node_unlock(node);
}

static void link_retransmit_failure(struct tipc_link *l_ptr,
				    struct sk_buff *buf)
{
	struct tipc_msg *msg = buf_msg(buf);

	pr_warn("Retransmission failure on link <%s>\n", l_ptr->name);

	if (l_ptr->addr) {
		/* Handle failure on standard link */
		link_print(l_ptr, "Resetting link\n");
		tipc_link_reset(l_ptr);

	} else {
		/* Handle failure on broadcast link */
		struct tipc_node *n_ptr;
		char addr_string[16];

		pr_info("Msg seq number: %u,  ", msg_seqno(msg));
		pr_cont("Outstanding acks: %lu\n",
			(unsigned long) TIPC_SKB_CB(buf)->handle);

		n_ptr = tipc_bclink_retransmit_to();
		tipc_node_lock(n_ptr);

		tipc_addr_string_fill(addr_string, n_ptr->addr);
		pr_info("Broadcast link info for %s\n", addr_string);
		pr_info("Reception permitted: %d,  Acked: %u\n",
			n_ptr->bclink.recv_permitted,
			n_ptr->bclink.acked);
		pr_info("Last in: %u,  Oos state: %u,  Last sent: %u\n",
			n_ptr->bclink.last_in,
			n_ptr->bclink.oos_state,
			n_ptr->bclink.last_sent);

		tipc_node_unlock(n_ptr);

		tipc_bclink_set_flags(TIPC_BCLINK_RESET);
		l_ptr->stale_count = 0;
	}
}

void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
			  u32 retransmits)
{
	struct tipc_msg *msg;

	if (!buf)
		return;

	msg = buf_msg(buf);

	/* Detect repeated retransmit failures */
	if (l_ptr->last_retransmitted == msg_seqno(msg)) {
		if (++l_ptr->stale_count > 100) {
			link_retransmit_failure(l_ptr, buf);
			return;
		}
	} else {
		l_ptr->last_retransmitted = msg_seqno(msg);
		l_ptr->stale_count = 1;
	}

	while (retransmits && (buf != l_ptr->next_out) && buf) {
		msg = buf_msg(buf);
		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
		tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
		buf = buf->next;
		retransmits--;
		l_ptr->stats.retransmitted++;
	}

	l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0;
}

/**
 * link_insert_deferred_queue - insert deferred messages back into receive chain
 */
static struct sk_buff *link_insert_deferred_queue(struct tipc_link *l_ptr,
						  struct sk_buff *buf)
{
	u32 seq_no;

	if (l_ptr->oldest_deferred_in == NULL)
		return buf;

	seq_no = buf_seqno(l_ptr->oldest_deferred_in);
	if (seq_no == mod(l_ptr->next_in_no)) {
		l_ptr->newest_deferred_in->next = buf;
		buf = l_ptr->oldest_deferred_in;
		l_ptr->oldest_deferred_in = NULL;
		l_ptr->deferred_inqueue_sz = 0;
	}
	return buf;
}

/**
 * link_recv_buf_validate - validate basic format of received message
 *
 * This routine ensures a TIPC message has an acceptable header, and at least
 * as much data as the header indicates it should.  The routine also ensures
 * that the entire message header is stored in the main fragment of the
 * message buffer, to simplify future access to message header fields.
 *
 * Note: Having extra info present in the message header or data areas is OK.
 * TIPC will ignore the excess, under the assumption that it is optional info
 * introduced by a later release of the protocol.
 */
static int link_recv_buf_validate(struct sk_buff *buf)
{
	static u32 min_data_hdr_size[8] = {
		SHORT_H_SIZE, MCAST_H_SIZE, NAMED_H_SIZE, BASIC_H_SIZE,
		MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE
	};

	struct tipc_msg *msg;
	u32 tipc_hdr[2];
	u32 size;
	u32 hdr_size;
	u32 min_hdr_size;

	/* If this packet comes from the defer queue, the skb has already
	 * been validated
	 */
	if (unlikely(TIPC_SKB_CB(buf)->deferred))
		return 1;

	if (unlikely(buf->len < MIN_H_SIZE))
		return 0;

	msg = skb_header_pointer(buf, 0, sizeof(tipc_hdr), tipc_hdr);
	if (msg == NULL)
		return 0;

	if (unlikely(msg_version(msg) != TIPC_VERSION))
		return 0;

	size = msg_size(msg);
	hdr_size = msg_hdr_sz(msg);
	min_hdr_size = msg_isdata(msg) ?
		min_data_hdr_size[msg_type(msg)] : INT_H_SIZE;

	if (unlikely((hdr_size < min_hdr_size) ||
		     (size < hdr_size) ||
		     (buf->len < size) ||
		     (size - hdr_size > TIPC_MAX_USER_MSG_SIZE)))
		return 0;

	return pskb_may_pull(buf, hdr_size);
}
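
/*
 * Reception pipeline, in brief: each arriving buffer is validated and
 * linearized, broadcast/discovery traffic is diverted, the sending node
 * and link endpoint are looked up, acked packets are released from the
 * send queue, and the packet is then either handled by the link protocol
 * machinery, deferred if out of sequence, or delivered upwards via
 * tipc_link_prepare_input()/tipc_link_input().
 */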

/**
 * tipc_rcv - process TIPC packets/messages arriving from off-node
 * @head: pointer to message buffer chain
 * @b_ptr: pointer to bearer message arrived on
 *
 * Invoked with no locks held. Bearer pointer must point to a valid bearer
 * structure (i.e. cannot be NULL), but bearer can be inactive.
 */
void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
{
	while (head) {
		struct tipc_node *n_ptr;
		struct tipc_link *l_ptr;
		struct sk_buff *crs;
		struct sk_buff *buf = head;
		struct tipc_msg *msg;
		u32 seq_no;
		u32 ackd;
		u32 released = 0;

		head = head->next;
		buf->next = NULL;

		/* Ensure message is well-formed */
		if (unlikely(!link_recv_buf_validate(buf)))
			goto discard;

		/* Ensure message data is a single contiguous unit */
		if (unlikely(skb_linearize(buf)))
			goto discard;

		/* Handle arrival of a non-unicast link message */
		msg = buf_msg(buf);

		if (unlikely(msg_non_seq(msg))) {
			if (msg_user(msg) == LINK_CONFIG)
				tipc_disc_rcv(buf, b_ptr);
			else
				tipc_bclink_rcv(buf);
			continue;
		}

		/* Discard unicast link messages destined for another node */
		if (unlikely(!msg_short(msg) &&
			     (msg_destnode(msg) != tipc_own_addr)))
			goto discard;

		/* Locate neighboring node that sent message */
		n_ptr = tipc_node_find(msg_prevnode(msg));
		if (unlikely(!n_ptr))
			goto discard;
		tipc_node_lock(n_ptr);

		/* Locate unicast link endpoint that should handle message */
		l_ptr = n_ptr->links[b_ptr->identity];
		if (unlikely(!l_ptr))
			goto unlock_discard;

		/* Verify that communication with node is currently allowed */
		if ((n_ptr->action_flags & TIPC_WAIT_PEER_LINKS_DOWN) &&
		    msg_user(msg) == LINK_PROTOCOL &&
		    (msg_type(msg) == RESET_MSG ||
		     msg_type(msg) == ACTIVATE_MSG) &&
		    !msg_redundant_link(msg))
			n_ptr->action_flags &= ~TIPC_WAIT_PEER_LINKS_DOWN;

		if (tipc_node_blocked(n_ptr))
			goto unlock_discard;

		/* Validate message sequence number info */
		seq_no = msg_seqno(msg);
		ackd = msg_ack(msg);

		/* Release acked messages */
		if (n_ptr->bclink.recv_permitted)
			tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));

		crs = l_ptr->first_out;
		while ((crs != l_ptr->next_out) &&
		       less_eq(buf_seqno(crs), ackd)) {
			struct sk_buff *next = crs->next;
			kfree_skb(crs);
			crs = next;
			released++;
		}
		if (released) {
			l_ptr->first_out = crs;
			l_ptr->out_queue_size -= released;
		}

		/* Try sending any messages link endpoint has pending */
		if (unlikely(l_ptr->next_out))
			tipc_link_push_queue(l_ptr);

		if (released && !skb_queue_empty(&l_ptr->waiting_sks)) {
			link_prepare_wakeup(l_ptr);
			l_ptr->owner->action_flags |= TIPC_WAKEUP_USERS;
		}

		/* Process the incoming packet */
		if (unlikely(!link_working_working(l_ptr))) {
			if (msg_user(msg) == LINK_PROTOCOL) {
				tipc_link_proto_rcv(l_ptr, buf);
				head = link_insert_deferred_queue(l_ptr, head);
				tipc_node_unlock(n_ptr);
				continue;
			}

			/* Traffic message. Conditionally activate link */
			link_state_event(l_ptr, TRAFFIC_MSG_EVT);

			if (link_working_working(l_ptr)) {
				/* Re-insert buffer in front of queue */
				buf->next = head;
				head = buf;
				tipc_node_unlock(n_ptr);
				continue;
			}
			goto unlock_discard;
		}

		/* Link is now in state WORKING_WORKING */
		if (unlikely(seq_no != mod(l_ptr->next_in_no))) {
			link_handle_out_of_seq_msg(l_ptr, buf);
			head = link_insert_deferred_queue(l_ptr, head);
			tipc_node_unlock(n_ptr);
			continue;
		}
		l_ptr->next_in_no++;
		if (unlikely(l_ptr->oldest_deferred_in))
			head = link_insert_deferred_queue(l_ptr, head);

		if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
			l_ptr->stats.sent_acks++;
			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
		}

		if (tipc_link_prepare_input(l_ptr, &buf)) {
			tipc_node_unlock(n_ptr);
			continue;
		}
		tipc_node_unlock(n_ptr);
		msg = buf_msg(buf);
		if (tipc_link_input(l_ptr, buf) != 0)
			goto discard;
		continue;
unlock_discard:
		tipc_node_unlock(n_ptr);
discard:
		kfree_skb(buf);
	}
}

/**
 * tipc_link_prepare_input - process TIPC link messages
 *
 * returns nonzero if the message was consumed
 *
 * Node lock must be held
 */
static int tipc_link_prepare_input(struct tipc_link *l, struct sk_buff **buf)
{
	struct tipc_node *n;
	struct tipc_msg *msg;
	int res = -EINVAL;

	n = l->owner;
	msg = buf_msg(*buf);
	switch (msg_user(msg)) {
	case CHANGEOVER_PROTOCOL:
		if (tipc_link_tunnel_rcv(n, buf))
			res = 0;
		break;
	case MSG_FRAGMENTER:
		l->stats.recv_fragments++;
		if (tipc_buf_append(&l->reasm_buf, buf)) {
			l->stats.recv_fragmented++;
			res = 0;
		} else if (!l->reasm_buf) {
			tipc_link_reset(l);
		}
		break;
	case MSG_BUNDLER:
		l->stats.recv_bundles++;
		l->stats.recv_bundled += msg_msgcnt(msg);
		res = 0;
		break;
	case NAME_DISTRIBUTOR:
		n->bclink.recv_permitted = true;
		res = 0;
		break;
	case BCAST_PROTOCOL:
		tipc_link_sync_rcv(n, *buf);
		break;
	default:
		res = 0;
	}
	return res;
}

/**
 * tipc_link_input - deliver message to higher layers
 */
static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf)
{
	struct tipc_msg *msg = buf_msg(buf);
	int res = 0;

	switch (msg_user(msg)) {
	case TIPC_LOW_IMPORTANCE:
	case TIPC_MEDIUM_IMPORTANCE:
	case TIPC_HIGH_IMPORTANCE:
	case TIPC_CRITICAL_IMPORTANCE:
	case CONN_MANAGER:
		tipc_sk_rcv(buf);
		break;
	case NAME_DISTRIBUTOR:
		tipc_named_rcv(buf);
		break;
	case MSG_BUNDLER:
		tipc_link_bundle_rcv(buf);
		break;
	default:
		res = -EINVAL;
	}
	return res;
}
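
/*
 * The deferred reception queue below is kept sorted in ascending sequence
 * number order, so that link_insert_deferred_queue() can splice it back
 * into the receive chain as soon as the missing packet arrives. Duplicates
 * are detected during insertion and dropped.
 */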

/**
 * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue
 *
 * Returns increase in queue length (i.e. 0 or 1)
 */
u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail,
			struct sk_buff *buf)
{
	struct sk_buff *queue_buf;
	struct sk_buff **prev;
	u32 seq_no = buf_seqno(buf);

	buf->next = NULL;

	/* Empty queue ? */
	if (*head == NULL) {
		*head = *tail = buf;
		return 1;
	}

	/* Last ? */
	if (less(buf_seqno(*tail), seq_no)) {
		(*tail)->next = buf;
		*tail = buf;
		return 1;
	}

	/* Locate insertion point in queue, then insert; discard if duplicate */
	prev = head;
	queue_buf = *head;
	for (;;) {
		u32 curr_seqno = buf_seqno(queue_buf);

		if (seq_no == curr_seqno) {
			kfree_skb(buf);
			return 0;
		}

		if (less(seq_no, curr_seqno))
			break;

		prev = &queue_buf->next;
		queue_buf = queue_buf->next;
	}

	buf->next = queue_buf;
	*prev = buf;
	return 1;
}

/*
 * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet
 */
static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
				       struct sk_buff *buf)
{
	u32 seq_no = buf_seqno(buf);

	if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) {
		tipc_link_proto_rcv(l_ptr, buf);
		return;
	}

	/* Record OOS packet arrival (force mismatch on next timeout) */
	l_ptr->checkpoint--;

	/*
	 * Discard packet if a duplicate; otherwise add it to deferred queue
	 * and notify peer of gap as per protocol specification
	 */
	if (less(seq_no, mod(l_ptr->next_in_no))) {
		l_ptr->stats.duplicates++;
		kfree_skb(buf);
		return;
	}

	if (tipc_link_defer_pkt(&l_ptr->oldest_deferred_in,
				&l_ptr->newest_deferred_in, buf)) {
		l_ptr->deferred_inqueue_sz++;
		l_ptr->stats.deferred_recv++;
		TIPC_SKB_CB(buf)->deferred = true;
		if ((l_ptr->deferred_inqueue_sz % 16) == 1)
			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
	} else
		l_ptr->stats.duplicates++;
}

/*
 * Send protocol message to the other endpoint.
 */
void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
			  u32 gap, u32 tolerance, u32 priority, u32 ack_mtu)
{
	struct sk_buff *buf = NULL;
	struct tipc_msg *msg = l_ptr->pmsg;
	u32 msg_size = sizeof(l_ptr->proto_msg);
	int r_flag;

	/* Discard any previous message that was deferred due to congestion */
	if (l_ptr->proto_msg_queue) {
		kfree_skb(l_ptr->proto_msg_queue);
		l_ptr->proto_msg_queue = NULL;
	}

	/* Don't send protocol message during link changeover */
	if (l_ptr->exp_msg_count)
		return;

	/* Abort non-RESET send if communication with node is prohibited */
	if ((tipc_node_blocked(l_ptr->owner)) && (msg_typ != RESET_MSG))
		return;

	/* Create protocol message with "out-of-sequence" sequence number */
	msg_set_type(msg, msg_typ);
	msg_set_net_plane(msg, l_ptr->net_plane);
	msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
	msg_set_last_bcast(msg, tipc_bclink_get_last_sent());

	if (msg_typ == STATE_MSG) {
		u32 next_sent = mod(l_ptr->next_out_no);

		if (!tipc_link_is_up(l_ptr))
			return;
		if (l_ptr->next_out)
			next_sent = buf_seqno(l_ptr->next_out);
		msg_set_next_sent(msg, next_sent);
		if (l_ptr->oldest_deferred_in) {
			u32 rec = buf_seqno(l_ptr->oldest_deferred_in);
			gap = mod(rec - mod(l_ptr->next_in_no));
		}
		msg_set_seq_gap(msg, gap);
		if (gap)
			l_ptr->stats.sent_nacks++;
		msg_set_link_tolerance(msg, tolerance);
		msg_set_linkprio(msg, priority);
		msg_set_max_pkt(msg, ack_mtu);
		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
		msg_set_probe(msg, probe_msg != 0);
		if (probe_msg) {
			u32 mtu = l_ptr->max_pkt;

			if ((mtu < l_ptr->max_pkt_target) &&
			    link_working_working(l_ptr) &&
			    l_ptr->fsm_msg_cnt) {
				msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
				if (l_ptr->max_pkt_probes == 10) {
					l_ptr->max_pkt_target = (msg_size - 4);
					l_ptr->max_pkt_probes = 0;
					msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
				}
				l_ptr->max_pkt_probes++;
			}

			l_ptr->stats.sent_probes++;
		}
		l_ptr->stats.sent_states++;
	} else {		/* RESET_MSG or ACTIVATE_MSG */
		msg_set_ack(msg, mod(l_ptr->reset_checkpoint - 1));
		msg_set_seq_gap(msg, 0);
		msg_set_next_sent(msg, 1);
		msg_set_probe(msg, 0);
		msg_set_link_tolerance(msg, l_ptr->tolerance);
		msg_set_linkprio(msg, l_ptr->priority);
		msg_set_max_pkt(msg, l_ptr->max_pkt_target);
	}

	r_flag = (l_ptr->owner->working_links > tipc_link_is_up(l_ptr));
	msg_set_redundant_link(msg, r_flag);
	msg_set_linkprio(msg, l_ptr->priority);
	msg_set_size(msg, msg_size);

	msg_set_seqno(msg, mod(l_ptr->next_out_no + (0xffff/2)));

	buf = tipc_buf_acquire(msg_size);
	if (!buf)
		return;

	skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
	buf->priority = TC_PRIO_CONTROL;

	tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
	l_ptr->unacked_window = 0;
	kfree_skb(buf);
}
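
/*
 * MTU discovery works via the probe logic above: a probe STATE_MSG is
 * padded to roughly halfway between the confirmed max_pkt and the bearer
 * limit max_pkt_target (rounded down to a 4-byte multiple). A probe that
 * reaches the peer is acked with its own size, which raises max_pkt at
 * this end (see tipc_link_proto_rcv below). After 10 unanswered probes
 * at a given size, max_pkt_target is lowered to just below that size and
 * probing restarts.
 */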

/*
 * Receive protocol message :
 * Note that network plane id propagates through the network, and may
 * change at any time. The node with lowest address rules
 */
static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf)
{
	u32 rec_gap = 0;
	u32 max_pkt_info;
	u32 max_pkt_ack;
	u32 msg_tol;
	struct tipc_msg *msg = buf_msg(buf);

	/* Discard protocol message during link changeover */
	if (l_ptr->exp_msg_count)
		goto exit;

	if (l_ptr->net_plane != msg_net_plane(msg))
		if (tipc_own_addr > msg_prevnode(msg))
			l_ptr->net_plane = msg_net_plane(msg);

	switch (msg_type(msg)) {

	case RESET_MSG:
		if (!link_working_unknown(l_ptr) &&
		    (l_ptr->peer_session != INVALID_SESSION)) {
			if (less_eq(msg_session(msg), l_ptr->peer_session))
				break; /* duplicate or old reset: ignore */
		}

		if (!msg_redundant_link(msg) && (link_working_working(l_ptr) ||
						 link_working_unknown(l_ptr))) {
			/*
			 * peer has lost contact -- don't allow peer's links
			 * to reactivate before we recognize loss & clean up
			 */
			l_ptr->owner->action_flags |= TIPC_WAIT_OWN_LINKS_DOWN;
		}

		link_state_event(l_ptr, RESET_MSG);

		/* fall thru' */
	case ACTIVATE_MSG:
		/* Update link settings according to other endpoint's values */
		strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg));

		msg_tol = msg_link_tolerance(msg);
		if (msg_tol > l_ptr->tolerance)
			link_set_supervision_props(l_ptr, msg_tol);

		if (msg_linkprio(msg) > l_ptr->priority)
			l_ptr->priority = msg_linkprio(msg);

		max_pkt_info = msg_max_pkt(msg);
		if (max_pkt_info) {
			if (max_pkt_info < l_ptr->max_pkt_target)
				l_ptr->max_pkt_target = max_pkt_info;
			if (l_ptr->max_pkt > l_ptr->max_pkt_target)
				l_ptr->max_pkt = l_ptr->max_pkt_target;
		} else {
			l_ptr->max_pkt = l_ptr->max_pkt_target;
		}

		/* Synchronize broadcast link info, if not done previously */
		if (!tipc_node_is_up(l_ptr->owner)) {
			l_ptr->owner->bclink.last_sent =
				l_ptr->owner->bclink.last_in =
				msg_last_bcast(msg);
			l_ptr->owner->bclink.oos_state = 0;
		}

		l_ptr->peer_session = msg_session(msg);
		l_ptr->peer_bearer_id = msg_bearer_id(msg);

		if (msg_type(msg) == ACTIVATE_MSG)
			link_state_event(l_ptr, ACTIVATE_MSG);
		break;
	case STATE_MSG:

		msg_tol = msg_link_tolerance(msg);
		if (msg_tol)
			link_set_supervision_props(l_ptr, msg_tol);

		if (msg_linkprio(msg) &&
		    (msg_linkprio(msg) != l_ptr->priority)) {
			pr_warn("%s<%s>, priority change %u->%u\n",
				link_rst_msg, l_ptr->name, l_ptr->priority,
				msg_linkprio(msg));
			l_ptr->priority = msg_linkprio(msg);
			tipc_link_reset(l_ptr); /* Enforce change to take effect */
			break;
		}

		/* Record reception; force mismatch at next timeout: */
		l_ptr->checkpoint--;

		link_state_event(l_ptr, TRAFFIC_MSG_EVT);
		l_ptr->stats.recv_states++;
		if (link_reset_unknown(l_ptr))
			break;

		if (less_eq(mod(l_ptr->next_in_no), msg_next_sent(msg))) {
			rec_gap = mod(msg_next_sent(msg) -
				      mod(l_ptr->next_in_no));
		}

		max_pkt_ack = msg_max_pkt(msg);
		if (max_pkt_ack > l_ptr->max_pkt) {
			l_ptr->max_pkt = max_pkt_ack;
			l_ptr->max_pkt_probes = 0;
		}

		max_pkt_ack = 0;
		if (msg_probe(msg)) {
			l_ptr->stats.recv_probes++;
			if (msg_size(msg) > sizeof(l_ptr->proto_msg))
				max_pkt_ack = msg_size(msg);
		}

		/* Protocol message before retransmits, reduce loss risk */
		if (l_ptr->owner->bclink.recv_permitted)
			tipc_bclink_update_link_state(l_ptr->owner,
						      msg_last_bcast(msg));

		if (rec_gap || (msg_probe(msg))) {
			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, rec_gap, 0,
					     0, max_pkt_ack);
		}
		if (msg_seq_gap(msg)) {
			l_ptr->stats.recv_nacks++;
			tipc_link_retransmit(l_ptr, l_ptr->first_out,
					     msg_seq_gap(msg));
		}
		break;
	}
exit:
	kfree_skb(buf);
}

/* tipc_link_tunnel_xmit(): Tunnel one packet via a link belonging to
 * a different bearer. Owner node is locked.
 */
static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
				  struct tipc_msg *tunnel_hdr,
				  struct tipc_msg *msg,
				  u32 selector)
{
	struct tipc_link *tunnel;
	struct sk_buff *buf;
	u32 length = msg_size(msg);

	tunnel = l_ptr->owner->active_links[selector & 1];
	if (!tipc_link_is_up(tunnel)) {
		pr_warn("%stunnel link no longer available\n", link_co_err);
		return;
	}
	msg_set_size(tunnel_hdr, length + INT_H_SIZE);
	buf = tipc_buf_acquire(length + INT_H_SIZE);
	if (!buf) {
		pr_warn("%sunable to send tunnel msg\n", link_co_err);
		return;
	}
	skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE);
	skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length);
	__tipc_link_xmit(tunnel, buf);
}

/* tipc_link_failover_send_queue(): A link has gone down, but a second
 * link is still active. We can do failover. Tunnel the failing link's
 * whole send queue via the remaining link. This way, we don't lose
 * any packets, and sequence order is preserved for subsequent traffic
 * sent over the remaining link. Owner node is locked.
 */
void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
{
	u32 msgcount = l_ptr->out_queue_size;
	struct sk_buff *crs = l_ptr->first_out;
	struct tipc_link *tunnel = l_ptr->owner->active_links[0];
	struct tipc_msg tunnel_hdr;
	int split_bundles;

	if (!tunnel)
		return;

	tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
		      ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
	msg_set_msgcnt(&tunnel_hdr, msgcount);

	if (!l_ptr->first_out) {
		struct sk_buff *buf;

		buf = tipc_buf_acquire(INT_H_SIZE);
		if (buf) {
			skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE);
			msg_set_size(&tunnel_hdr, INT_H_SIZE);
			__tipc_link_xmit(tunnel, buf);
		} else {
			pr_warn("%sunable to send changeover msg\n",
				link_co_err);
		}
		return;
	}

	split_bundles = (l_ptr->owner->active_links[0] !=
			 l_ptr->owner->active_links[1]);

	while (crs) {
		struct tipc_msg *msg = buf_msg(crs);

		if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
			struct tipc_msg *m = msg_get_wrapped(msg);
			unchar *pos = (unchar *)m;

			msgcount = msg_msgcnt(msg);
			while (msgcount--) {
				msg_set_seqno(m, msg_seqno(msg));
				tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, m,
						      msg_link_selector(m));
				pos += align(msg_size(m));
				m = (struct tipc_msg *)pos;
			}
		} else {
			tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, msg,
					      msg_link_selector(msg));
		}
		crs = crs->next;
	}
}

/* tipc_link_dup_queue_xmit(): A second link has become active. Tunnel a
 * duplicate of the first link's send queue via the new link. This way, we
 * are guaranteed that currently queued packets from a socket are delivered
 * before future traffic from the same socket, even if this is using the
 * new link. The last arriving copy of each duplicate packet is dropped at
 * the receiving end by the regular protocol check, so packet cardinality
 * and sequence order is preserved per sender/receiver socket pair.
 * Owner node is locked.
 */
void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
			      struct tipc_link *tunnel)
{
	struct sk_buff *iter;
	struct tipc_msg tunnel_hdr;

	tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
		      DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr);
	msg_set_msgcnt(&tunnel_hdr, l_ptr->out_queue_size);
	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
	iter = l_ptr->first_out;
	while (iter) {
		struct sk_buff *outbuf;
		struct tipc_msg *msg = buf_msg(iter);
		u32 length = msg_size(msg);

		if (msg_user(msg) == MSG_BUNDLER)
			msg_set_type(msg, CLOSED_MSG);
		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));	/* Update */
		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
		msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
		outbuf = tipc_buf_acquire(length + INT_H_SIZE);
		if (outbuf == NULL) {
			pr_warn("%sunable to send duplicate msg\n",
				link_co_err);
			return;
		}
		skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE);
		skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data,
					       length);
		__tipc_link_xmit(tunnel, outbuf);
		if (!tipc_link_is_up(l_ptr))
			return;
		iter = iter->next;
	}
}

/**
 * buf_extract - extracts embedded TIPC message from another message
 * @skb: encapsulating message buffer
 * @from_pos: offset to extract from
 *
 * Returns a new message buffer containing an embedded message.  The
 * encapsulating message itself is left unchanged.
 */
static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
{
	struct tipc_msg *msg = (struct tipc_msg *)(skb->data + from_pos);
	u32 size = msg_size(msg);
	struct sk_buff *eb;

	eb = tipc_buf_acquire(size);
	if (eb)
		skb_copy_to_linear_data(eb, msg, size);
	return eb;
}

/* tipc_link_dup_rcv(): Receive a tunnelled DUPLICATE_MSG packet.
 * Owner node is locked.
 */
static void tipc_link_dup_rcv(struct tipc_link *l_ptr,
			      struct sk_buff *t_buf)
{
	struct sk_buff *buf;

	if (!tipc_link_is_up(l_ptr))
		return;

	buf = buf_extract(t_buf, INT_H_SIZE);
	if (buf == NULL) {
		pr_warn("%sfailed to extract inner dup pkt\n", link_co_err);
		return;
	}

	/* Add buffer to deferred queue, if applicable: */
	link_handle_out_of_seq_msg(l_ptr, buf);
}

/* tipc_link_failover_rcv(): Receive a tunnelled ORIGINAL_MSG packet
 * Owner node is locked.
 */
static struct sk_buff *tipc_link_failover_rcv(struct tipc_link *l_ptr,
					      struct sk_buff *t_buf)
{
	struct tipc_msg *t_msg = buf_msg(t_buf);
	struct sk_buff *buf = NULL;
	struct tipc_msg *msg;

	if (tipc_link_is_up(l_ptr))
		tipc_link_reset(l_ptr);

	/* First failover packet? */
	if (l_ptr->exp_msg_count == START_CHANGEOVER)
		l_ptr->exp_msg_count = msg_msgcnt(t_msg);

	/* Should there be an inner packet? */
	if (l_ptr->exp_msg_count) {
		l_ptr->exp_msg_count--;
		buf = buf_extract(t_buf, INT_H_SIZE);
		if (buf == NULL) {
			pr_warn("%sno inner failover pkt\n", link_co_err);
			goto exit;
		}
		msg = buf_msg(buf);

		if (less(msg_seqno(msg), l_ptr->reset_checkpoint)) {
			kfree_skb(buf);
			buf = NULL;
			goto exit;
		}
		if (msg_user(msg) == MSG_FRAGMENTER) {
			l_ptr->stats.recv_fragments++;
			tipc_buf_append(&l_ptr->reasm_buf, &buf);
		}
	}
exit:
	if ((l_ptr->exp_msg_count == 0) && (l_ptr->flags & LINK_STOPPED)) {
		tipc_node_detach_link(l_ptr->owner, l_ptr);
		kfree(l_ptr);
	}
	return buf;
}

/* tipc_link_tunnel_rcv(): Receive a tunnelled packet, sent
 * via other link as result of a failover (ORIGINAL_MSG) or
 * a new active link (DUPLICATE_MSG). Failover packets are
 * returned to the active link for delivery upwards.
 * Owner node is locked.
 */
static int tipc_link_tunnel_rcv(struct tipc_node *n_ptr,
				struct sk_buff **buf)
{
	struct sk_buff *t_buf = *buf;
	struct tipc_link *l_ptr;
	struct tipc_msg *t_msg = buf_msg(t_buf);
	u32 bearer_id = msg_bearer_id(t_msg);

	*buf = NULL;

	if (bearer_id >= MAX_BEARERS)
		goto exit;

	l_ptr = n_ptr->links[bearer_id];
	if (!l_ptr)
		goto exit;

	if (msg_type(t_msg) == DUPLICATE_MSG)
		tipc_link_dup_rcv(l_ptr, t_buf);
	else if (msg_type(t_msg) == ORIGINAL_MSG)
		*buf = tipc_link_failover_rcv(l_ptr, t_buf);
	else
		pr_warn("%sunknown tunnel pkt received\n", link_co_err);
exit:
	kfree_skb(t_buf);
	return *buf != NULL;
}
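
/*
 * Bundle layout, as unpacked below: the bundle header (INT_H_SIZE bytes)
 * carries the number of inner messages in msg_msgcnt(); the inner messages
 * follow back-to-back, each starting on a 4-byte boundary (see align()),
 * so 'pos' advances by align(msg_size(omsg)) per message.
 */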
void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window)
{
	/* Data messages from this node, including FIRST_FRAGM */
	l_ptr->queue_limit[TIPC_LOW_IMPORTANCE] = window;
	l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE] = (window / 3) * 4;
	l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE] = (window / 3) * 5;
	l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE] = (window / 3) * 6;
	/* Transiting data messages, including FIRST_FRAGM */
	l_ptr->queue_limit[TIPC_LOW_IMPORTANCE + 4] = 300;
	l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE + 4] = 600;
	l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE + 4] = 900;
	l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE + 4] = 1200;
	l_ptr->queue_limit[CONN_MANAGER] = 1200;
	l_ptr->queue_limit[CHANGEOVER_PROTOCOL] = 2500;
	l_ptr->queue_limit[NAME_DISTRIBUTOR] = 3000;
	/* FRAGMENT and LAST_FRAGMENT packets */
	l_ptr->queue_limit[MSG_FRAGMENTER] = 4000;
}
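
/*
 * Worked example (editor's addition): with a window of, say, 50
 * packets, the limits for locally originated traffic become 50 (low),
 * (50 / 3) * 4 = 64 (medium), 80 (high) and 96 (critical), so more
 * important messages may queue roughly 4/3, 5/3 and 2 times deeper
 * before the link signals congestion to the sender.
 */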
/* tipc_link_find_owner - locate owner node of link by link's name
 * @link_name: pointer to link name string
 * @bearer_id: pointer to index in 'node->links' array where the link was found.
 *
 * Returns pointer to node owning the link, or NULL if no matching link is found.
 */
static struct tipc_node *tipc_link_find_owner(const char *link_name,
					      unsigned int *bearer_id)
{
	struct tipc_link *l_ptr;
	struct tipc_node *n_ptr;
	struct tipc_node *found_node = NULL;
	int i;

	*bearer_id = 0;
	rcu_read_lock();
	list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
		tipc_node_lock(n_ptr);
		for (i = 0; i < MAX_BEARERS; i++) {
			l_ptr = n_ptr->links[i];
			if (l_ptr && !strcmp(l_ptr->name, link_name)) {
				*bearer_id = i;
				found_node = n_ptr;
				break;
			}
		}
		tipc_node_unlock(n_ptr);
		if (found_node)
			break;
	}
	rcu_read_unlock();

	return found_node;
}

/**
 * link_value_is_valid - validate proposed link tolerance/priority/window
 * @cmd: value type (TIPC_CMD_SET_LINK_*)
 * @new_value: the new value
 *
 * Returns 1 if value is within range, 0 if not.
 */
static int link_value_is_valid(u16 cmd, u32 new_value)
{
	switch (cmd) {
	case TIPC_CMD_SET_LINK_TOL:
		return (new_value >= TIPC_MIN_LINK_TOL) &&
			(new_value <= TIPC_MAX_LINK_TOL);
	case TIPC_CMD_SET_LINK_PRI:
		return (new_value <= TIPC_MAX_LINK_PRI);
	case TIPC_CMD_SET_LINK_WINDOW:
		return (new_value >= TIPC_MIN_LINK_WIN) &&
			(new_value <= TIPC_MAX_LINK_WIN);
	}
	return 0;
}

/**
 * link_cmd_set_value - change priority/tolerance/window for link/bearer/media
 * @name: ptr to link, bearer, or media name
 * @new_value: new value of link, bearer, or media setting
 * @cmd: which link, bearer, or media attribute to set (TIPC_CMD_SET_LINK_*)
 *
 * Caller must hold RTNL lock to ensure link/bearer/media is not deleted.
 *
 * Returns 0 if value updated, negative value on error.
 */
static int link_cmd_set_value(const char *name, u32 new_value, u16 cmd)
{
	struct tipc_node *node;
	struct tipc_link *l_ptr;
	struct tipc_bearer *b_ptr;
	struct tipc_media *m_ptr;
	unsigned int bearer_id;
	int res = 0;

	node = tipc_link_find_owner(name, &bearer_id);
	if (node) {
		tipc_node_lock(node);
		l_ptr = node->links[bearer_id];

		if (l_ptr) {
			switch (cmd) {
			case TIPC_CMD_SET_LINK_TOL:
				link_set_supervision_props(l_ptr, new_value);
				tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0,
						     new_value, 0, 0);
				break;
			case TIPC_CMD_SET_LINK_PRI:
				l_ptr->priority = new_value;
				tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0,
						     0, new_value, 0);
				break;
			case TIPC_CMD_SET_LINK_WINDOW:
				tipc_link_set_queue_limits(l_ptr, new_value);
				break;
			default:
				res = -EINVAL;
				break;
			}
		}
		tipc_node_unlock(node);
		return res;
	}

	b_ptr = tipc_bearer_find(name);
	if (b_ptr) {
		switch (cmd) {
		case TIPC_CMD_SET_LINK_TOL:
			b_ptr->tolerance = new_value;
			break;
		case TIPC_CMD_SET_LINK_PRI:
			b_ptr->priority = new_value;
			break;
		case TIPC_CMD_SET_LINK_WINDOW:
			b_ptr->window = new_value;
			break;
		default:
			res = -EINVAL;
			break;
		}
		return res;
	}

	m_ptr = tipc_media_find(name);
	if (!m_ptr)
		return -ENODEV;
	switch (cmd) {
	case TIPC_CMD_SET_LINK_TOL:
		m_ptr->tolerance = new_value;
		break;
	case TIPC_CMD_SET_LINK_PRI:
		m_ptr->priority = new_value;
		break;
	case TIPC_CMD_SET_LINK_WINDOW:
		m_ptr->window = new_value;
		break;
	default:
		res = -EINVAL;
		break;
	}
	return res;
}

struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, int req_tlv_space,
				     u16 cmd)
{
	struct tipc_link_config *args;
	u32 new_value;
	int res;

	if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_CONFIG))
		return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);

	args = (struct tipc_link_config *)TLV_DATA(req_tlv_area);
	new_value = ntohl(args->value);

	if (!link_value_is_valid(cmd, new_value))
		return tipc_cfg_reply_error_string(
			"cannot change, value invalid");

	if (!strcmp(args->name, tipc_bclink_name)) {
		if ((cmd == TIPC_CMD_SET_LINK_WINDOW) &&
		    (tipc_bclink_set_queue_limits(new_value) == 0))
			return tipc_cfg_reply_none();
		return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
						   " (cannot change setting on broadcast link)");
	}

	res = link_cmd_set_value(args->name, new_value, cmd);
	if (res)
		return tipc_cfg_reply_error_string("cannot change link setting");

	return tipc_cfg_reply_none();
}

/**
 * link_reset_statistics - reset link statistics
 * @l_ptr: pointer to link
 */
static void link_reset_statistics(struct tipc_link *l_ptr)
{
	memset(&l_ptr->stats, 0, sizeof(l_ptr->stats));
	l_ptr->stats.sent_info = l_ptr->next_out_no;
	l_ptr->stats.recv_info = l_ptr->next_in_no;
}

struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area,
					  int req_tlv_space)
{
	char *link_name;
	struct tipc_link *l_ptr;
	struct tipc_node *node;
	unsigned int bearer_id;

	if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
		return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);

	link_name = (char *)TLV_DATA(req_tlv_area);
	if (!strcmp(link_name, tipc_bclink_name)) {
		if (tipc_bclink_reset_stats())
			return tipc_cfg_reply_error_string("link not found");
		return tipc_cfg_reply_none();
	}
	node = tipc_link_find_owner(link_name, &bearer_id);
	if (!node)
		return tipc_cfg_reply_error_string("link not found");

	tipc_node_lock(node);
	l_ptr = node->links[bearer_id];
	if (!l_ptr) {
		tipc_node_unlock(node);
		return tipc_cfg_reply_error_string("link not found");
	}
	link_reset_statistics(l_ptr);
	tipc_node_unlock(node);
	return tipc_cfg_reply_none();
}
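
/*
 * Illustrative sketch (editor's addition; the names are hypothetical):
 * link_cmd_set_value() above resolves its name argument in three
 * stages, so one management command can target any level of the
 * hierarchy:
 *
 *	link_cmd_set_value("1.1.1:eth0-1.1.2:eth0", 2000,
 *			   TIPC_CMD_SET_LINK_TOL);	// one link
 *	link_cmd_set_value("eth:eth0", 2000,
 *			   TIPC_CMD_SET_LINK_TOL);	// bearer default
 *	link_cmd_set_value("eth", 2000,
 *			   TIPC_CMD_SET_LINK_TOL);	// media default
 *
 * A name matching no link, bearer or media yields -ENODEV.
 */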
/**
 * percent - convert count to a percentage of total (rounding to nearest)
 */
static u32 percent(u32 count, u32 total)
{
	return (count * 100 + (total / 2)) / total;
}

/**
 * tipc_link_stats - print link statistics
 * @name: link name
 * @buf: print buffer area
 * @buf_size: size of print buffer area
 *
 * Returns length of print buffer data string (or 0 if error)
 */
static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)
{
	struct tipc_link *l;
	struct tipc_stats *s;
	struct tipc_node *node;
	char *status;
	u32 profile_total = 0;
	unsigned int bearer_id;
	int ret;

	if (!strcmp(name, tipc_bclink_name))
		return tipc_bclink_stats(buf, buf_size);

	node = tipc_link_find_owner(name, &bearer_id);
	if (!node)
		return 0;

	tipc_node_lock(node);

	l = node->links[bearer_id];
	if (!l) {
		tipc_node_unlock(node);
		return 0;
	}

	s = &l->stats;

	if (tipc_link_is_active(l))
		status = "ACTIVE";
	else if (tipc_link_is_up(l))
		status = "STANDBY";
	else
		status = "DEFUNCT";

	ret = tipc_snprintf(buf, buf_size, "Link <%s>\n"
			    "  %s  MTU:%u  Priority:%u  Tolerance:%u ms"
			    "  Window:%u packets\n",
			    l->name, status, l->max_pkt, l->priority,
			    l->tolerance, l->queue_limit[0]);

	ret += tipc_snprintf(buf + ret, buf_size - ret,
			     "  RX packets:%u fragments:%u/%u bundles:%u/%u\n",
			     l->next_in_no - s->recv_info, s->recv_fragments,
			     s->recv_fragmented, s->recv_bundles,
			     s->recv_bundled);

	ret += tipc_snprintf(buf + ret, buf_size - ret,
			     "  TX packets:%u fragments:%u/%u bundles:%u/%u\n",
			     l->next_out_no - s->sent_info, s->sent_fragments,
			     s->sent_fragmented, s->sent_bundles,
			     s->sent_bundled);

	profile_total = s->msg_length_counts;
	if (!profile_total)
		profile_total = 1;

	ret += tipc_snprintf(buf + ret, buf_size - ret,
			     "  TX profile sample:%u packets  average:%u octets\n"
			     "  0-64:%u%% -256:%u%% -1024:%u%% -4096:%u%% "
			     "-16384:%u%% -32768:%u%% -66000:%u%%\n",
			     s->msg_length_counts,
			     s->msg_lengths_total / profile_total,
			     percent(s->msg_length_profile[0], profile_total),
			     percent(s->msg_length_profile[1], profile_total),
			     percent(s->msg_length_profile[2], profile_total),
			     percent(s->msg_length_profile[3], profile_total),
			     percent(s->msg_length_profile[4], profile_total),
			     percent(s->msg_length_profile[5], profile_total),
			     percent(s->msg_length_profile[6], profile_total));

	ret += tipc_snprintf(buf + ret, buf_size - ret,
			     "  RX states:%u probes:%u naks:%u defs:%u"
			     " dups:%u\n", s->recv_states, s->recv_probes,
			     s->recv_nacks, s->deferred_recv, s->duplicates);

	ret += tipc_snprintf(buf + ret, buf_size - ret,
			     "  TX states:%u probes:%u naks:%u acks:%u"
			     " dups:%u\n", s->sent_states, s->sent_probes,
			     s->sent_nacks, s->sent_acks, s->retransmitted);

	ret += tipc_snprintf(buf + ret, buf_size - ret,
			     "  Congestion link:%u  Send queue"
			     " max:%u avg:%u\n", s->link_congs,
			     s->max_queue_sz, s->queue_sz_counts ?
			     (s->accu_queue_sz / s->queue_sz_counts) : 0);

	tipc_node_unlock(node);
	return ret;
}
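
/*
 * Worked example (editor's addition): percent() above rounds to the
 * nearest whole percent by adding half the divisor before dividing:
 *
 *	percent(1, 3) = (100 + 1) / 3 = 33
 *	percent(2, 3) = (200 + 1) / 3 = 67
 *
 * whereas truncating division would have reported 2/3 as 66%.
 */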
struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area,
					 int req_tlv_space)
{
	struct sk_buff *buf;
	struct tlv_desc *rep_tlv;
	int str_len;
	int pb_len;
	char *pb;

	if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
		return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);

	buf = tipc_cfg_reply_alloc(TLV_SPACE(ULTRA_STRING_MAX_LEN));
	if (!buf)
		return NULL;

	rep_tlv = (struct tlv_desc *)buf->data;
	pb = TLV_DATA(rep_tlv);
	pb_len = ULTRA_STRING_MAX_LEN;
	str_len = tipc_link_stats((char *)TLV_DATA(req_tlv_area),
				  pb, pb_len);
	if (!str_len) {
		kfree_skb(buf);
		return tipc_cfg_reply_error_string("link not found");
	}
	str_len += 1;	/* for "\0" */
	skb_put(buf, TLV_SPACE(str_len));
	TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);

	return buf;
}

/**
 * tipc_link_get_max_pkt - get maximum packet size to use when sending to destination
 * @dest: network address of destination node
 * @selector: used to select from set of active links
 *
 * If no active link can be found, uses default maximum packet size.
 */
u32 tipc_link_get_max_pkt(u32 dest, u32 selector)
{
	struct tipc_node *n_ptr;
	struct tipc_link *l_ptr;
	u32 res = MAX_PKT_DEFAULT;

	if (dest == tipc_own_addr)
		return MAX_MSG_SIZE;

	n_ptr = tipc_node_find(dest);
	if (n_ptr) {
		tipc_node_lock(n_ptr);
		l_ptr = n_ptr->active_links[selector & 1];
		if (l_ptr)
			res = l_ptr->max_pkt;
		tipc_node_unlock(n_ptr);
	}
	return res;
}

static void link_print(struct tipc_link *l_ptr, const char *str)
{
	struct tipc_bearer *b_ptr;

	rcu_read_lock();
	b_ptr = rcu_dereference_rtnl(bearer_list[l_ptr->bearer_id]);
	if (b_ptr)
		pr_info("%s Link %x<%s>:", str, l_ptr->addr, b_ptr->name);
	rcu_read_unlock();

	if (link_working_unknown(l_ptr))
		pr_cont(":WU\n");
	else if (link_reset_reset(l_ptr))
		pr_cont(":RR\n");
	else if (link_reset_unknown(l_ptr))
		pr_cont(":RU\n");
	else if (link_working_working(l_ptr))
		pr_cont(":WW\n");
	else
		pr_cont("\n");
}
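
/*
 * Editor's note (not part of the original source): the two-letter
 * codes printed by link_print() correspond to the link FSM states:
 *
 *	WW - WORKING_WORKING	(link up, traffic flowing)
 *	WU - WORKING_UNKNOWN	(probing after a silent interval)
 *	RU - RESET_UNKNOWN	(reset, peer state not yet known)
 *	RR - RESET_RESET	(both endpoints have reset)
 */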