/*
 * net/tipc/link.c: TIPC link code
 *
 * Copyright (c) 1996-2007, Ericsson AB
 * Copyright (c) 2004-2007, Wind River Systems
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the names of the copyright holders nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
 *
 * Alternatively, this software may be distributed under the terms of the
 * GNU General Public License ("GPL") version 2 as published by the Free
 * Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "core.h"
#include "link.h"
#include "port.h"
#include "name_distr.h"
#include "discover.h"
#include "config.h"


/*
 * Out-of-range value for link session numbers
 */

#define INVALID_SESSION 0x10000

/*
 * Link state events:
 */

#define STARTING_EVT     856384768	/* link processing trigger */
#define TRAFFIC_MSG_EVT  560815u	/* rx'd ??? */
#define TIMEOUT_EVT      560817u	/* link timer expired */

/*
 * The following two 'message types' are really just implementation
 * data conveniently stored in the message header.
 * They must not be considered part of the protocol
 */
#define OPEN_MSG   0
#define CLOSED_MSG 1

/*
 * State value stored in 'exp_msg_count'
 */

#define START_CHANGEOVER 100000u

/**
 * struct link_name - deconstructed link name
 * @addr_local: network address of node at this end
 * @if_local: name of interface at this end
 * @addr_peer: network address of node at far end
 * @if_peer: name of interface at far end
 */

struct link_name {
	u32 addr_local;
	char if_local[TIPC_MAX_IF_NAME];
	u32 addr_peer;
	char if_peer[TIPC_MAX_IF_NAME];
};

static void link_handle_out_of_seq_msg(struct link *l_ptr,
				       struct sk_buff *buf);
static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf);
static int  link_recv_changeover_msg(struct link **l_ptr, struct sk_buff **buf);
static void link_set_supervision_props(struct link *l_ptr, u32 tolerance);
static int  link_send_sections_long(struct port *sender,
				    struct iovec const *msg_sect,
				    u32 num_sect, u32 destnode);
static void link_check_defragm_bufs(struct link *l_ptr);
static void link_state_event(struct link *l_ptr, u32 event);
static void link_reset_statistics(struct link *l_ptr);
static void link_print(struct link *l_ptr, const char *str);
static void link_start(struct link *l_ptr);
static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf);

/*
 *  Simple link routines
 */

static unsigned int align(unsigned int i)
{
	return (i + 3) & ~3u;
}

static void link_init_max_pkt(struct link *l_ptr)
{
	u32 max_pkt;

	max_pkt = (l_ptr->b_ptr->publ.mtu & ~3);
	if (max_pkt > MAX_MSG_SIZE)
		max_pkt = MAX_MSG_SIZE;

	l_ptr->max_pkt_target = max_pkt;
	if (l_ptr->max_pkt_target < MAX_PKT_DEFAULT)
		l_ptr->max_pkt = l_ptr->max_pkt_target;
	else
		l_ptr->max_pkt = MAX_PKT_DEFAULT;

	l_ptr->max_pkt_probes = 0;
}

static u32 link_next_sent(struct link *l_ptr)
{
	if (l_ptr->next_out)
		return msg_seqno(buf_msg(l_ptr->next_out));
	return mod(l_ptr->next_out_no);
}

static u32 link_last_sent(struct link *l_ptr)
{
	return mod(link_next_sent(l_ptr) - 1);
}

/*
 *  Simple non-static link routines (i.e. referenced outside this file)
 */

int tipc_link_is_up(struct link *l_ptr)
{
	if (!l_ptr)
		return 0;
	return link_working_working(l_ptr) || link_working_unknown(l_ptr);
}

int tipc_link_is_active(struct link *l_ptr)
{
	return	(l_ptr->owner->active_links[0] == l_ptr) ||
		(l_ptr->owner->active_links[1] == l_ptr);
}

/**
 * link_name_validate - validate & (optionally) deconstruct link name
 * @name: ptr to link name string
 * @name_parts: ptr to area for link name components (or NULL if not needed)
 *
 * Returns 1 if link name is valid, otherwise 0.
 */

static int link_name_validate(const char *name, struct link_name *name_parts)
{
	char name_copy[TIPC_MAX_LINK_NAME];
	char *addr_local;
	char *if_local;
	char *addr_peer;
	char *if_peer;
	char dummy;
	u32 z_local, c_local, n_local;
	u32 z_peer, c_peer, n_peer;
	u32 if_local_len;
	u32 if_peer_len;

	/* copy link name & ensure length is OK */

	name_copy[TIPC_MAX_LINK_NAME - 1] = 0;
	/* need above in case non-Posix strncpy() doesn't pad with nulls */
	strncpy(name_copy, name, TIPC_MAX_LINK_NAME);
	if (name_copy[TIPC_MAX_LINK_NAME - 1] != 0)
		return 0;

	/* ensure all component parts of link name are present */

	addr_local = name_copy;
	if_local = strchr(addr_local, ':');
	if (if_local == NULL)
		return 0;
	*(if_local++) = 0;
	addr_peer = strchr(if_local, '-');
	if (addr_peer == NULL)
		return 0;
	*(addr_peer++) = 0;
	if_local_len = addr_peer - if_local;
	if_peer = strchr(addr_peer, ':');
	if (if_peer == NULL)
		return 0;
	*(if_peer++) = 0;
	if_peer_len = strlen(if_peer) + 1;

	/* validate component parts of link name */

	if ((sscanf(addr_local, "%u.%u.%u%c",
		    &z_local, &c_local, &n_local, &dummy) != 3) ||
	    (sscanf(addr_peer, "%u.%u.%u%c",
		    &z_peer, &c_peer, &n_peer, &dummy) != 3) ||
	    (z_local > 255) || (c_local > 4095) || (n_local > 4095) ||
	    (z_peer > 255) || (c_peer > 4095) || (n_peer > 4095) ||
	    (if_local_len <= 1) || (if_local_len > TIPC_MAX_IF_NAME) ||
	    (if_peer_len <= 1) || (if_peer_len > TIPC_MAX_IF_NAME) ||
	    (strspn(if_local, tipc_alphabet) != (if_local_len - 1)) ||
	    (strspn(if_peer, tipc_alphabet) != (if_peer_len - 1)))
		return 0;

	/* return link name components, if necessary */

	if (name_parts) {
		name_parts->addr_local = tipc_addr(z_local, c_local, n_local);
		strcpy(name_parts->if_local, if_local);
		name_parts->addr_peer = tipc_addr(z_peer, c_peer, n_peer);
		strcpy(name_parts->if_peer, if_peer);
	}
	return 1;
}

/**
 * link_timeout - handle expiration of link timer
 * @l_ptr: pointer to link
 *
 * This routine must not grab "tipc_net_lock" to avoid a potential deadlock conflict
 * with tipc_link_delete().  (There is no risk that the node will be deleted by
 * another thread because tipc_link_delete() always cancels the link timer before
 * tipc_node_delete() is called.)
 */

static void link_timeout(struct link *l_ptr)
{
	tipc_node_lock(l_ptr->owner);

	/* update counters used in statistical profiling of send traffic */

	l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size;
	l_ptr->stats.queue_sz_counts++;

	if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz)
		l_ptr->stats.max_queue_sz = l_ptr->out_queue_size;

	if (l_ptr->first_out) {
		struct tipc_msg *msg = buf_msg(l_ptr->first_out);
		u32 length = msg_size(msg);

		if ((msg_user(msg) == MSG_FRAGMENTER) &&
		    (msg_type(msg) == FIRST_FRAGMENT)) {
			length = msg_size(msg_get_wrapped(msg));
		}
		if (length) {
			l_ptr->stats.msg_lengths_total += length;
			l_ptr->stats.msg_length_counts++;
			if (length <= 64)
				l_ptr->stats.msg_length_profile[0]++;
			else if (length <= 256)
				l_ptr->stats.msg_length_profile[1]++;
			else if (length <= 1024)
				l_ptr->stats.msg_length_profile[2]++;
			else if (length <= 4096)
				l_ptr->stats.msg_length_profile[3]++;
			else if (length <= 16384)
				l_ptr->stats.msg_length_profile[4]++;
			else if (length <= 32768)
				l_ptr->stats.msg_length_profile[5]++;
			else
				l_ptr->stats.msg_length_profile[6]++;
		}
	}

	/* do all other link processing performed on a periodic basis */

	link_check_defragm_bufs(l_ptr);

	link_state_event(l_ptr, TIMEOUT_EVT);

	if (l_ptr->next_out)
		tipc_link_push_queue(l_ptr);

	tipc_node_unlock(l_ptr->owner);
}

static void link_set_timer(struct link *l_ptr, u32 time)
{
	k_start_timer(&l_ptr->timer, time);
}

/**
 * tipc_link_create - create a new link
 * @b_ptr: pointer to associated bearer
 * @peer: network address of node at other end of link
 * @media_addr: media address to use when sending messages over link
 *
 * Returns pointer to link.
 */

struct link *tipc_link_create(struct bearer *b_ptr, const u32 peer,
			      const struct tipc_media_addr *media_addr)
{
	struct link *l_ptr;
	struct tipc_msg *msg;
	char *if_name;

	l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC);
	if (!l_ptr) {
		warn("Link creation failed, no memory\n");
		return NULL;
	}

	l_ptr->addr = peer;
	if_name = strchr(b_ptr->publ.name, ':') + 1;
	sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:",
		tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
		tipc_node(tipc_own_addr),
		if_name,
		tipc_zone(peer), tipc_cluster(peer), tipc_node(peer));
		/* note: peer i/f is appended to link name by reset/activate */
	memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr));
	l_ptr->checkpoint = 1;
	l_ptr->b_ptr = b_ptr;
	link_set_supervision_props(l_ptr, b_ptr->media->tolerance);
	l_ptr->state = RESET_UNKNOWN;

	l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg;
	msg = l_ptr->pmsg;
	tipc_msg_init(msg, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, l_ptr->addr);
	msg_set_size(msg, sizeof(l_ptr->proto_msg));
	msg_set_session(msg, (tipc_random & 0xffff));
	msg_set_bearer_id(msg, b_ptr->identity);
	strcpy((char *)msg_data(msg), if_name);

	l_ptr->priority = b_ptr->priority;
	tipc_link_set_queue_limits(l_ptr, b_ptr->media->window);

	link_init_max_pkt(l_ptr);

	l_ptr->next_out_no = 1;
	INIT_LIST_HEAD(&l_ptr->waiting_ports);

	link_reset_statistics(l_ptr);

	l_ptr->owner = tipc_node_attach_link(l_ptr);
	if (!l_ptr->owner) {
		kfree(l_ptr);
		return NULL;
	}

	k_init_timer(&l_ptr->timer, (Handler)link_timeout, (unsigned long)l_ptr);
	list_add_tail(&l_ptr->link_list, &b_ptr->links);
	tipc_k_signal((Handler)link_start, (unsigned long)l_ptr);

	return l_ptr;
}

/**
 * tipc_link_delete - delete a link
 * @l_ptr: pointer to link
 *
 * Note: 'tipc_net_lock' is write_locked, bearer is locked.
 * This routine must not grab the node lock until after link timer cancellation
 * to avoid a potential deadlock situation.
 */

void tipc_link_delete(struct link *l_ptr)
{
	if (!l_ptr) {
		err("Attempt to delete non-existent link\n");
		return;
	}

	k_cancel_timer(&l_ptr->timer);

	tipc_node_lock(l_ptr->owner);
	tipc_link_reset(l_ptr);
	tipc_node_detach_link(l_ptr->owner, l_ptr);
	tipc_link_stop(l_ptr);
	list_del_init(&l_ptr->link_list);
	tipc_node_unlock(l_ptr->owner);
	k_term_timer(&l_ptr->timer);
	kfree(l_ptr);
}

static void link_start(struct link *l_ptr)
{
	link_state_event(l_ptr, STARTING_EVT);
}

/**
 * link_schedule_port - schedule port for deferred sending
 * @l_ptr: pointer to link
 * @origport: reference to sending port
 * @sz: amount of data to be sent
 *
 * Schedules port for renewed sending of messages after link congestion
 * has abated.
 */

static int link_schedule_port(struct link *l_ptr, u32 origport, u32 sz)
{
	struct port *p_ptr;

	spin_lock_bh(&tipc_port_list_lock);
	p_ptr = tipc_port_lock(origport);
	if (p_ptr) {
		if (!p_ptr->wakeup)
			goto exit;
		if (!list_empty(&p_ptr->wait_list))
			goto exit;
		p_ptr->publ.congested = 1;
		p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt);
		list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
		l_ptr->stats.link_congs++;
exit:
		tipc_port_unlock(p_ptr);
	}
	spin_unlock_bh(&tipc_port_list_lock);
	return -ELINKCONG;
}

void tipc_link_wakeup_ports(struct link *l_ptr, int all)
{
	struct port *p_ptr;
	struct port *temp_p_ptr;
	int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;

	if (all)
		win = 100000;
	if (win <= 0)
		return;
	if (!spin_trylock_bh(&tipc_port_list_lock))
		return;
	if (link_congested(l_ptr))
		goto exit;
	list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports,
				 wait_list) {
		if (win <= 0)
			break;
		list_del_init(&p_ptr->wait_list);
		spin_lock_bh(p_ptr->publ.lock);
		p_ptr->publ.congested = 0;
		p_ptr->wakeup(&p_ptr->publ);
		win -= p_ptr->waiting_pkts;
		spin_unlock_bh(p_ptr->publ.lock);
	}

exit:
	spin_unlock_bh(&tipc_port_list_lock);
}

/**
 * link_release_outqueue - purge link's outbound message queue
 * @l_ptr: pointer to link
 */

static void link_release_outqueue(struct link *l_ptr)
{
	struct sk_buff *buf = l_ptr->first_out;
	struct sk_buff *next;

	while (buf) {
		next = buf->next;
		buf_discard(buf);
		buf = next;
	}
	l_ptr->first_out = NULL;
	l_ptr->out_queue_size = 0;
}

/**
 * tipc_link_reset_fragments - purge link's inbound message fragments queue
 * @l_ptr: pointer to link
 */

void tipc_link_reset_fragments(struct link *l_ptr)
{
	struct sk_buff *buf = l_ptr->defragm_buf;
	struct sk_buff *next;

	while (buf) {
		next = buf->next;
		buf_discard(buf);
		buf = next;
	}
	l_ptr->defragm_buf = NULL;
}

/**
 * tipc_link_stop - purge all inbound and outbound messages associated with link
 * @l_ptr: pointer to link
 */

void tipc_link_stop(struct link *l_ptr)
{
	struct sk_buff *buf;
	struct sk_buff *next;

	buf = l_ptr->oldest_deferred_in;
	while (buf) {
		next = buf->next;
		buf_discard(buf);
		buf = next;
	}

	buf = l_ptr->first_out;
	while (buf) {
		next = buf->next;
		buf_discard(buf);
		buf = next;
	}

	tipc_link_reset_fragments(l_ptr);

	buf_discard(l_ptr->proto_msg_queue);
	l_ptr->proto_msg_queue = NULL;
}

/* LINK EVENT CODE IS NOT SUPPORTED AT PRESENT */
#define link_send_event(fcn, l_ptr, up) do { } while (0)

void tipc_link_reset(struct link *l_ptr)
{
	struct sk_buff *buf;
	u32 prev_state = l_ptr->state;
	u32 checkpoint = l_ptr->next_in_no;
	int was_active_link = tipc_link_is_active(l_ptr);

	msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));

	/* Link is down, accept any session */
	l_ptr->peer_session = INVALID_SESSION;

	/* Prepare for max packet size negotiation */
	link_init_max_pkt(l_ptr);

	l_ptr->state = RESET_UNKNOWN;

	if ((prev_state == RESET_UNKNOWN) || (prev_state == RESET_RESET))
		return;

	tipc_node_link_down(l_ptr->owner, l_ptr);
	tipc_bearer_remove_dest(l_ptr->b_ptr, l_ptr->addr);

	if (was_active_link && tipc_node_has_active_links(l_ptr->owner) &&
	    l_ptr->owner->permit_changeover) {
		l_ptr->reset_checkpoint = checkpoint;
		l_ptr->exp_msg_count = START_CHANGEOVER;
	}

	/* Clean up all queues: */

	link_release_outqueue(l_ptr);
	buf_discard(l_ptr->proto_msg_queue);
	l_ptr->proto_msg_queue = NULL;
	buf = l_ptr->oldest_deferred_in;
	while (buf) {
		struct sk_buff *next = buf->next;
		buf_discard(buf);
		buf = next;
	}
	if (!list_empty(&l_ptr->waiting_ports))
		tipc_link_wakeup_ports(l_ptr, 1);

	l_ptr->retransm_queue_head = 0;
	l_ptr->retransm_queue_size = 0;
	l_ptr->last_out = NULL;
	l_ptr->first_out = NULL;
	l_ptr->next_out = NULL;
	l_ptr->unacked_window = 0;
	l_ptr->checkpoint = 1;
	l_ptr->next_out_no = 1;
	l_ptr->deferred_inqueue_sz = 0;
	l_ptr->oldest_deferred_in = NULL;
	l_ptr->newest_deferred_in = NULL;
	l_ptr->fsm_msg_cnt = 0;
	l_ptr->stale_count = 0;
	link_reset_statistics(l_ptr);

	link_send_event(tipc_cfg_link_event, l_ptr, 0);
	if (!in_own_cluster(l_ptr->addr))
		link_send_event(tipc_disc_link_event, l_ptr, 0);
}


static void link_activate(struct link *l_ptr)
{
	l_ptr->next_in_no = l_ptr->stats.recv_info = 1;
	tipc_node_link_up(l_ptr->owner, l_ptr);
	tipc_bearer_add_dest(l_ptr->b_ptr, l_ptr->addr);
	link_send_event(tipc_cfg_link_event, l_ptr, 1);
	if (!in_own_cluster(l_ptr->addr))
		link_send_event(tipc_disc_link_event, l_ptr, 1);
}
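/*
 * Rough transition summary for the state machine below, derived from its
 * switch statement (WW = WORKING_WORKING, WU = WORKING_UNKNOWN,
 * RU = RESET_UNKNOWN, RR = RESET_RESET):
 *
 *   WW --TIMEOUT_EVT, no traffic seen--> WU   (send probe)
 *   WW --RESET_MSG---------------------> RR   (reset link, send ACTIVATE)
 *   WU --TRAFFIC_MSG/ACTIVATE_MSG------> WW
 *   WU --TIMEOUT_EVT, abort limit hit--> RU   (reset link, send RESET)
 *   RU --ACTIVATE_MSG------------------> WW   (activate link)
 *   RU --RESET_MSG---------------------> RR   (send ACTIVATE)
 *   RR --TRAFFIC_MSG/ACTIVATE_MSG------> WW   (activate link)
 */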
/**
 * link_state_event - link finite state machine
 * @l_ptr: pointer to link
 * @event: state machine event to process
 */

static void link_state_event(struct link *l_ptr, unsigned event)
{
	struct link *other;
	u32 cont_intv = l_ptr->continuity_interval;

	if (!l_ptr->started && (event != STARTING_EVT))
		return;		/* Not yet. */

	if (link_blocked(l_ptr)) {
		if (event == TIMEOUT_EVT)
			link_set_timer(l_ptr, cont_intv);
		return;		/* Changeover going on */
	}

	switch (l_ptr->state) {
	case WORKING_WORKING:
		switch (event) {
		case TRAFFIC_MSG_EVT:
		case ACTIVATE_MSG:
			break;
		case TIMEOUT_EVT:
			if (l_ptr->next_in_no != l_ptr->checkpoint) {
				l_ptr->checkpoint = l_ptr->next_in_no;
				if (tipc_bclink_acks_missing(l_ptr->owner)) {
					tipc_link_send_proto_msg(l_ptr, STATE_MSG,
								 0, 0, 0, 0, 0);
					l_ptr->fsm_msg_cnt++;
				} else if (l_ptr->max_pkt < l_ptr->max_pkt_target) {
					tipc_link_send_proto_msg(l_ptr, STATE_MSG,
								 1, 0, 0, 0, 0);
					l_ptr->fsm_msg_cnt++;
				}
				link_set_timer(l_ptr, cont_intv);
				break;
			}
			l_ptr->state = WORKING_UNKNOWN;
			l_ptr->fsm_msg_cnt = 0;
			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv / 4);
			break;
		case RESET_MSG:
			info("Resetting link <%s>, requested by peer\n",
			     l_ptr->name);
			tipc_link_reset(l_ptr);
			l_ptr->state = RESET_RESET;
			l_ptr->fsm_msg_cnt = 0;
			tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv);
			break;
		default:
			err("Unknown link event %u in WW state\n", event);
		}
		break;
	case WORKING_UNKNOWN:
		switch (event) {
		case TRAFFIC_MSG_EVT:
		case ACTIVATE_MSG:
			l_ptr->state = WORKING_WORKING;
			l_ptr->fsm_msg_cnt = 0;
			link_set_timer(l_ptr, cont_intv);
			break;
		case RESET_MSG:
			info("Resetting link <%s>, requested by peer "
			     "while probing\n", l_ptr->name);
			tipc_link_reset(l_ptr);
			l_ptr->state = RESET_RESET;
			l_ptr->fsm_msg_cnt = 0;
			tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv);
			break;
		case TIMEOUT_EVT:
			if (l_ptr->next_in_no != l_ptr->checkpoint) {
				l_ptr->state = WORKING_WORKING;
				l_ptr->fsm_msg_cnt = 0;
				l_ptr->checkpoint = l_ptr->next_in_no;
				if (tipc_bclink_acks_missing(l_ptr->owner)) {
					tipc_link_send_proto_msg(l_ptr, STATE_MSG,
								 0, 0, 0, 0, 0);
					l_ptr->fsm_msg_cnt++;
				}
				link_set_timer(l_ptr, cont_intv);
			} else if (l_ptr->fsm_msg_cnt < l_ptr->abort_limit) {
				tipc_link_send_proto_msg(l_ptr, STATE_MSG,
							 1, 0, 0, 0, 0);
				l_ptr->fsm_msg_cnt++;
				link_set_timer(l_ptr, cont_intv / 4);
			} else {	/* Link has failed */
				warn("Resetting link <%s>, peer not responding\n",
				     l_ptr->name);
				tipc_link_reset(l_ptr);
				l_ptr->state = RESET_UNKNOWN;
				l_ptr->fsm_msg_cnt = 0;
				tipc_link_send_proto_msg(l_ptr, RESET_MSG,
							 0, 0, 0, 0, 0);
				l_ptr->fsm_msg_cnt++;
				link_set_timer(l_ptr, cont_intv);
			}
			break;
		default:
			err("Unknown link event %u in WU state\n", event);
		}
		break;
	case RESET_UNKNOWN:
		switch (event) {
		case TRAFFIC_MSG_EVT:
			break;
		case ACTIVATE_MSG:
			other = l_ptr->owner->active_links[0];
			if (other && link_working_unknown(other))
				break;
			l_ptr->state = WORKING_WORKING;
			l_ptr->fsm_msg_cnt = 0;
			link_activate(l_ptr);
			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv);
			break;
		case RESET_MSG:
			l_ptr->state = RESET_RESET;
			l_ptr->fsm_msg_cnt = 0;
			tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 1, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv);
			break;
		case STARTING_EVT:
			l_ptr->started = 1;
			/* fall through */
		case TIMEOUT_EVT:
			tipc_link_send_proto_msg(l_ptr, RESET_MSG, 0, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv);
			break;
		default:
			err("Unknown link event %u in RU state\n", event);
		}
		break;
	case RESET_RESET:
		switch (event) {
		case TRAFFIC_MSG_EVT:
		case ACTIVATE_MSG:
			other = l_ptr->owner->active_links[0];
			if (other && link_working_unknown(other))
				break;
			l_ptr->state = WORKING_WORKING;
			l_ptr->fsm_msg_cnt = 0;
			link_activate(l_ptr);
			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv);
			break;
		case RESET_MSG:
			break;
		case TIMEOUT_EVT:
			tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv);
			break;
		default:
			err("Unknown link event %u in RR state\n", event);
		}
		break;
	default:
		err("Unknown link state %u/%u\n", l_ptr->state, event);
	}
}
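/*
 * Worked example for link_bundle_buf() below (hypothetical sizes): if the
 * bundle's current msg_size() is 100 bytes, then to_pos = align(100) = 100
 * and pad = 0, so a 60-byte message is copied in at offset 100, provided
 * the bundle buffer has 60 bytes of tailroom and 160 <= max_pkt; the
 * bundle's size then becomes 160 and its message count rises by one.
 */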
/*
 * link_bundle_buf(): Append contents of a buffer to
 * the tail of an existing one.
 */

static int link_bundle_buf(struct link *l_ptr,
			   struct sk_buff *bundler,
			   struct sk_buff *buf)
{
	struct tipc_msg *bundler_msg = buf_msg(bundler);
	struct tipc_msg *msg = buf_msg(buf);
	u32 size = msg_size(msg);
	u32 bundle_size = msg_size(bundler_msg);
	u32 to_pos = align(bundle_size);
	u32 pad = to_pos - bundle_size;

	if (msg_user(bundler_msg) != MSG_BUNDLER)
		return 0;
	if (msg_type(bundler_msg) != OPEN_MSG)
		return 0;
	if (skb_tailroom(bundler) < (pad + size))
		return 0;
	if (l_ptr->max_pkt < (to_pos + size))
		return 0;

	skb_put(bundler, pad + size);
	skb_copy_to_linear_data_offset(bundler, to_pos, buf->data, size);
	msg_set_size(bundler_msg, to_pos + size);
	msg_set_msgcnt(bundler_msg, msg_msgcnt(bundler_msg) + 1);
	buf_discard(buf);
	l_ptr->stats.sent_bundled++;
	return 1;
}

static void link_add_to_outqueue(struct link *l_ptr,
				 struct sk_buff *buf,
				 struct tipc_msg *msg)
{
	u32 ack = mod(l_ptr->next_in_no - 1);
	u32 seqno = mod(l_ptr->next_out_no++);

	msg_set_word(msg, 2, ((ack << 16) | seqno));
	msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
	buf->next = NULL;
	if (l_ptr->first_out) {
		l_ptr->last_out->next = buf;
		l_ptr->last_out = buf;
	} else
		l_ptr->first_out = l_ptr->last_out = buf;
	l_ptr->out_queue_size++;
}
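/*
 * Order of decisions in tipc_link_send_buf() below: the message is first
 * checked against the send queue limit for its importance level, then
 * fragmented if it exceeds the link MTU, then sent directly if neither
 * bearer nor link is congested; under congestion an eligible message is
 * bundled into (or opens) a MSG_BUNDLER packet so it can ride along once
 * the congestion clears.
 */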
/*
 * tipc_link_send_buf() is the 'full path' for messages, called from
 * inside TIPC when the 'fast path' in tipc_send_buf
 * has failed, and from link_send()
 */

int tipc_link_send_buf(struct link *l_ptr, struct sk_buff *buf)
{
	struct tipc_msg *msg = buf_msg(buf);
	u32 size = msg_size(msg);
	u32 dsz = msg_data_sz(msg);
	u32 queue_size = l_ptr->out_queue_size;
	u32 imp = tipc_msg_tot_importance(msg);
	u32 queue_limit = l_ptr->queue_limit[imp];
	u32 max_packet = l_ptr->max_pkt;

	msg_set_prevnode(msg, tipc_own_addr);	/* If routed message */

	/* Match msg importance against queue limits: */

	if (unlikely(queue_size >= queue_limit)) {
		if (imp <= TIPC_CRITICAL_IMPORTANCE) {
			return link_schedule_port(l_ptr, msg_origport(msg),
						  size);
		}
		buf_discard(buf);
		if (imp > CONN_MANAGER) {
			warn("Resetting link <%s>, send queue full", l_ptr->name);
			tipc_link_reset(l_ptr);
		}
		return dsz;
	}

	/* Fragmentation needed ? */

	if (size > max_packet)
		return link_send_long_buf(l_ptr, buf);

	/* Packet can be queued or sent: */

	if (queue_size > l_ptr->stats.max_queue_sz)
		l_ptr->stats.max_queue_sz = queue_size;

	if (likely(!tipc_bearer_congested(l_ptr->b_ptr, l_ptr) &&
		   !link_congested(l_ptr))) {
		link_add_to_outqueue(l_ptr, buf, msg);

		if (likely(tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr))) {
			l_ptr->unacked_window = 0;
		} else {
			tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
			l_ptr->stats.bearer_congs++;
			l_ptr->next_out = buf;
		}
		return dsz;
	}
	/* Congestion: can message be bundled ?: */

	if ((msg_user(msg) != CHANGEOVER_PROTOCOL) &&
	    (msg_user(msg) != MSG_FRAGMENTER)) {

		/* Try adding message to an existing bundle */

		if (l_ptr->next_out &&
		    link_bundle_buf(l_ptr, l_ptr->last_out, buf)) {
			tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
			return dsz;
		}

		/* Try creating a new bundle */

		if (size <= max_packet * 2 / 3) {
			struct sk_buff *bundler = tipc_buf_acquire(max_packet);
			struct tipc_msg bundler_hdr;

			if (bundler) {
				tipc_msg_init(&bundler_hdr, MSG_BUNDLER, OPEN_MSG,
					      INT_H_SIZE, l_ptr->addr);
				skb_copy_to_linear_data(bundler, &bundler_hdr,
							INT_H_SIZE);
				skb_trim(bundler, INT_H_SIZE);
				link_bundle_buf(l_ptr, bundler, buf);
				buf = bundler;
				msg = buf_msg(buf);
				l_ptr->stats.sent_bundles++;
			}
		}
	}
	if (!l_ptr->next_out)
		l_ptr->next_out = buf;
	link_add_to_outqueue(l_ptr, buf, msg);
	tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
	return dsz;
}

/*
 * tipc_link_send(): same as tipc_link_send_buf(), but the link to use has
 * not been selected yet, and the owner node is not locked.
 * Called by TIPC internal users, e.g. the name distributor.
 */

int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)
{
	struct link *l_ptr;
	struct tipc_node *n_ptr;
	int res = -ELINKCONG;

	read_lock_bh(&tipc_net_lock);
	n_ptr = tipc_node_find(dest);
	if (n_ptr) {
		tipc_node_lock(n_ptr);
		l_ptr = n_ptr->active_links[selector & 1];
		if (l_ptr)
			res = tipc_link_send_buf(l_ptr, buf);
		else
			buf_discard(buf);
		tipc_node_unlock(n_ptr);
	} else {
		buf_discard(buf);
	}
	read_unlock_bh(&tipc_net_lock);
	return res;
}

/*
 * link_send_buf_fast: Entry for data messages where the
 * destination link is known and the header is complete,
 * including total message length. Very time critical.
 * Link is locked. Returns user data length.
 */

static int link_send_buf_fast(struct link *l_ptr, struct sk_buff *buf,
			      u32 *used_max_pkt)
{
	struct tipc_msg *msg = buf_msg(buf);
	int res = msg_data_sz(msg);

	if (likely(!link_congested(l_ptr))) {
		if (likely(msg_size(msg) <= l_ptr->max_pkt)) {
			if (likely(list_empty(&l_ptr->b_ptr->cong_links))) {
				link_add_to_outqueue(l_ptr, buf, msg);
				if (likely(tipc_bearer_send(l_ptr->b_ptr, buf,
							    &l_ptr->media_addr))) {
					l_ptr->unacked_window = 0;
					return res;
				}
				tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
				l_ptr->stats.bearer_congs++;
				l_ptr->next_out = buf;
				return res;
			}
		} else
			*used_max_pkt = l_ptr->max_pkt;
	}
	return tipc_link_send_buf(l_ptr, buf);	/* All other cases */
}

/*
 * tipc_send_buf_fast: Entry for data messages where the
 * destination node is known and the header is complete,
 * including total message length.
 * Returns user data length.
 */
int tipc_send_buf_fast(struct sk_buff *buf, u32 destnode)
{
	struct link *l_ptr;
	struct tipc_node *n_ptr;
	int res;
	u32 selector = msg_origport(buf_msg(buf)) & 1;
	u32 dummy;

	if (destnode == tipc_own_addr)
		return tipc_port_recv_msg(buf);

	read_lock_bh(&tipc_net_lock);
	n_ptr = tipc_node_find(destnode);
	if (likely(n_ptr)) {
		tipc_node_lock(n_ptr);
		l_ptr = n_ptr->active_links[selector];
		if (likely(l_ptr)) {
			res = link_send_buf_fast(l_ptr, buf, &dummy);
			tipc_node_unlock(n_ptr);
			read_unlock_bh(&tipc_net_lock);
			return res;
		}
		tipc_node_unlock(n_ptr);
	}
	read_unlock_bh(&tipc_net_lock);
	res = msg_data_sz(buf_msg(buf));
	tipc_reject_msg(buf, TIPC_ERR_NO_NODE);
	return res;
}


/*
 * tipc_link_send_sections_fast: Entry for messages where the
 * destination processor is known and the header is complete,
 * except for total message length.
 * Returns user data length or errno.
 */
int tipc_link_send_sections_fast(struct port *sender,
				 struct iovec const *msg_sect,
				 const u32 num_sect,
				 u32 destaddr)
{
	struct tipc_msg *hdr = &sender->publ.phdr;
	struct link *l_ptr;
	struct sk_buff *buf;
	struct tipc_node *node;
	int res;
	u32 selector = msg_origport(hdr) & 1;

again:
	/*
	 * Try building message using port's max_pkt hint.
	 * (Must not hold any locks while building message.)
	 */

	res = tipc_msg_build(hdr, msg_sect, num_sect, sender->publ.max_pkt,
			     !sender->user_port, &buf);

	read_lock_bh(&tipc_net_lock);
	node = tipc_node_find(destaddr);
	if (likely(node)) {
		tipc_node_lock(node);
		l_ptr = node->active_links[selector];
		if (likely(l_ptr)) {
			if (likely(buf)) {
				res = link_send_buf_fast(l_ptr, buf,
							 &sender->publ.max_pkt);
				if (unlikely(res < 0))
					buf_discard(buf);
exit:
				tipc_node_unlock(node);
				read_unlock_bh(&tipc_net_lock);
				return res;
			}

			/* Exit if build request was invalid */

			if (unlikely(res < 0))
				goto exit;

			/* Exit if link (or bearer) is congested */

			if (link_congested(l_ptr) ||
			    !list_empty(&l_ptr->b_ptr->cong_links)) {
				res = link_schedule_port(l_ptr,
							 sender->publ.ref, res);
				goto exit;
			}

			/*
			 * Message size exceeds max_pkt hint; update hint,
			 * then re-try fast path or fragment the message
			 */

			sender->publ.max_pkt = l_ptr->max_pkt;
			tipc_node_unlock(node);
			read_unlock_bh(&tipc_net_lock);


			if ((msg_hdr_sz(hdr) + res) <= sender->publ.max_pkt)
				goto again;

			return link_send_sections_long(sender, msg_sect,
						       num_sect, destaddr);
		}
		tipc_node_unlock(node);
	}
	read_unlock_bh(&tipc_net_lock);

	/* Couldn't find a link to the destination node */

	if (buf)
		return tipc_reject_msg(buf, TIPC_ERR_NO_NODE);
	if (res >= 0)
		return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect,
						 TIPC_ERR_NO_NODE);
	return res;
}

/*
 * link_send_sections_long(): Entry for long messages where the
 * destination node is known and the header is complete,
 * including total message length.
 * Link and bearer congestion status have been checked to be ok,
 * and are ignored if they change.
 *
 * Note that fragments do not use the full link MTU so that they won't have
 * to undergo refragmentation if link changeover causes them to be sent
 * over another link with an additional tunnel header added as prefix.
 * (Refragmentation will still occur if the other link has a smaller MTU.)
 *
 * Returns user data length or errno.
 */
static int link_send_sections_long(struct port *sender,
				   struct iovec const *msg_sect,
				   u32 num_sect,
				   u32 destaddr)
{
	struct link *l_ptr;
	struct tipc_node *node;
	struct tipc_msg *hdr = &sender->publ.phdr;
	u32 dsz = msg_data_sz(hdr);
	u32 max_pkt, fragm_sz, rest;
	struct tipc_msg fragm_hdr;
	struct sk_buff *buf, *buf_chain, *prev;
	u32 fragm_crs, fragm_rest, hsz, sect_rest;
	const unchar *sect_crs;
	int curr_sect;
	u32 fragm_no;

again:
	fragm_no = 1;
	max_pkt = sender->publ.max_pkt - INT_H_SIZE;
		/* leave room for tunnel header in case of link changeover */
	fragm_sz = max_pkt - INT_H_SIZE;
		/* leave room for fragmentation header in each fragment */
	rest = dsz;
	fragm_crs = 0;
	fragm_rest = 0;
	sect_rest = 0;
	sect_crs = NULL;
	curr_sect = -1;

	/* Prepare reusable fragment header: */

	tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
		      INT_H_SIZE, msg_destnode(hdr));
	msg_set_link_selector(&fragm_hdr, sender->publ.ref);
	msg_set_size(&fragm_hdr, max_pkt);
	msg_set_fragm_no(&fragm_hdr, 1);

	/* Prepare header of first fragment: */

	buf_chain = buf = tipc_buf_acquire(max_pkt);
	if (!buf)
		return -ENOMEM;
	buf->next = NULL;
	skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
	hsz = msg_hdr_sz(hdr);
	skb_copy_to_linear_data_offset(buf, INT_H_SIZE, hdr, hsz);

	/* Chop up message: */

	fragm_crs = INT_H_SIZE + hsz;
	fragm_rest = fragm_sz - hsz;

	do {		/* For all sections */
		u32 sz;

		if (!sect_rest) {
			sect_rest = msg_sect[++curr_sect].iov_len;
			sect_crs = (const unchar *)msg_sect[curr_sect].iov_base;
		}

		if (sect_rest < fragm_rest)
			sz = sect_rest;
		else
			sz = fragm_rest;

		if (likely(!sender->user_port)) {
			if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) {
error:
				for (; buf_chain; buf_chain = buf) {
					buf = buf_chain->next;
					buf_discard(buf_chain);
				}
				return -EFAULT;
			}
		} else
			skb_copy_to_linear_data_offset(buf, fragm_crs,
						       sect_crs, sz);
		sect_crs += sz;
		sect_rest -= sz;
		fragm_crs += sz;
		fragm_rest -= sz;
		rest -= sz;

		if (!fragm_rest && rest) {

			/* Initiate new fragment: */
			if (rest <= fragm_sz) {
				fragm_sz = rest;
				msg_set_type(&fragm_hdr, LAST_FRAGMENT);
			} else {
				msg_set_type(&fragm_hdr, FRAGMENT);
			}
			msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
			msg_set_fragm_no(&fragm_hdr, ++fragm_no);
			prev = buf;
			buf = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
			if (!buf)
				goto error;

			buf->next = NULL;
			prev->next = buf;
			skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
			fragm_crs = INT_H_SIZE;
			fragm_rest = fragm_sz;
		}
	} while (rest > 0);
	/*
	 * Now we have a buffer chain. Select a link and check
	 * that packet size is still OK
	 */
	node = tipc_node_find(destaddr);
	if (likely(node)) {
		tipc_node_lock(node);
		l_ptr = node->active_links[sender->publ.ref & 1];
		if (!l_ptr) {
			tipc_node_unlock(node);
			goto reject;
		}
		if (l_ptr->max_pkt < max_pkt) {
			sender->publ.max_pkt = l_ptr->max_pkt;
			tipc_node_unlock(node);
			for (; buf_chain; buf_chain = buf) {
				buf = buf_chain->next;
				buf_discard(buf_chain);
			}
			goto again;
		}
	} else {
reject:
		for (; buf_chain; buf_chain = buf) {
			buf = buf_chain->next;
			buf_discard(buf_chain);
		}
		return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect,
						 TIPC_ERR_NO_NODE);
	}

	/* Append whole chain to send queue: */

	buf = buf_chain;
	l_ptr->long_msg_seq_no = mod(l_ptr->long_msg_seq_no + 1);
	if (!l_ptr->next_out)
		l_ptr->next_out = buf_chain;
	l_ptr->stats.sent_fragmented++;
	while (buf) {
		struct sk_buff *next = buf->next;
		struct tipc_msg *msg = buf_msg(buf);

		l_ptr->stats.sent_fragments++;
		msg_set_long_msgno(msg, l_ptr->long_msg_seq_no);
		link_add_to_outqueue(l_ptr, buf, msg);
		buf = next;
	}

	/* Send it, if possible: */

	tipc_link_push_queue(l_ptr);
	tipc_node_unlock(node);
	return dsz;
}
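/*
 * Push priority, as implemented below: pending retransmissions go out
 * first, then any deferred protocol message, and only then the next
 * unsent data packet, and that last one only while the send window has
 * room. A return of 0 means "sent one, call again"; PUSH_FAILED signals
 * bearer congestion; PUSH_FINISHED means there is nothing left to push.
 */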
/*
 * tipc_link_push_packet: Push one unsent packet to the media
 */
u32 tipc_link_push_packet(struct link *l_ptr)
{
	struct sk_buff *buf = l_ptr->first_out;
	u32 r_q_size = l_ptr->retransm_queue_size;
	u32 r_q_head = l_ptr->retransm_queue_head;

	/* Step to position where retransmission failed, if any,    */
	/* consider that buffers may have been released in meantime */

	if (r_q_size && buf) {
		u32 last = lesser(mod(r_q_head + r_q_size),
				  link_last_sent(l_ptr));
		u32 first = msg_seqno(buf_msg(buf));

		while (buf && less(first, r_q_head)) {
			first = mod(first + 1);
			buf = buf->next;
		}
		l_ptr->retransm_queue_head = r_q_head = first;
		l_ptr->retransm_queue_size = r_q_size = mod(last - first);
	}

	/* Continue retransmission now, if there is anything: */

	if (r_q_size && buf) {
		msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
		msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
		if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
			l_ptr->retransm_queue_head = mod(++r_q_head);
			l_ptr->retransm_queue_size = --r_q_size;
			l_ptr->stats.retransmitted++;
			return 0;
		} else {
			l_ptr->stats.bearer_congs++;
			return PUSH_FAILED;
		}
	}

	/* Send deferred protocol message, if any: */

	buf = l_ptr->proto_msg_queue;
	if (buf) {
		msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
		msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
		if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
			l_ptr->unacked_window = 0;
			buf_discard(buf);
			l_ptr->proto_msg_queue = NULL;
			return 0;
		} else {
			l_ptr->stats.bearer_congs++;
			return PUSH_FAILED;
		}
	}

	/* Send one deferred data message, if send window not full: */

	buf = l_ptr->next_out;
	if (buf) {
		struct tipc_msg *msg = buf_msg(buf);
		u32 next = msg_seqno(msg);
		u32 first = msg_seqno(buf_msg(l_ptr->first_out));

		if (mod(next - first) < l_ptr->queue_limit[0]) {
			msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
			msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
			if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
				if (msg_user(msg) == MSG_BUNDLER)
					msg_set_type(msg, CLOSED_MSG);
				l_ptr->next_out = buf->next;
				return 0;
			} else {
				l_ptr->stats.bearer_congs++;
				return PUSH_FAILED;
			}
		}
	}
	return PUSH_FINISHED;
}

/*
 * push_queue(): push out the unsent messages of a link where
 *               congestion has abated. Node is locked
 */
void tipc_link_push_queue(struct link *l_ptr)
{
	u32 res;

	if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr))
		return;

	do {
		res = tipc_link_push_packet(l_ptr);
	} while (!res);

	if (res == PUSH_FAILED)
		tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
}

static void link_reset_all(unsigned long addr)
{
	struct tipc_node *n_ptr;
	char addr_string[16];
	u32 i;

	read_lock_bh(&tipc_net_lock);
	n_ptr = tipc_node_find((u32)addr);
	if (!n_ptr) {
		read_unlock_bh(&tipc_net_lock);
		return;	/* node no longer exists */
	}

	tipc_node_lock(n_ptr);

	warn("Resetting all links to %s\n",
	     tipc_addr_string_fill(addr_string, n_ptr->addr));

	for (i = 0; i < MAX_BEARERS; i++) {
		if (n_ptr->links[i]) {
			link_print(n_ptr->links[i], "Resetting link\n");
			tipc_link_reset(n_ptr->links[i]);
		}
	}

	tipc_node_unlock(n_ptr);
	read_unlock_bh(&tipc_net_lock);
}

static void link_retransmit_failure(struct link *l_ptr, struct sk_buff *buf)
{
	struct tipc_msg *msg = buf_msg(buf);

	warn("Retransmission failure on link <%s>\n", l_ptr->name);

	if (l_ptr->addr) {

		/* Handle failure on standard link */

		link_print(l_ptr, "Resetting link\n");
		tipc_link_reset(l_ptr);

	} else {

		/* Handle failure on broadcast link */

		struct tipc_node *n_ptr;
		char addr_string[16];

		info("Msg seq number: %u, ", msg_seqno(msg));
		info("Outstanding acks: %lu\n",
		     (unsigned long) TIPC_SKB_CB(buf)->handle);

		n_ptr = l_ptr->owner->next;
		tipc_node_lock(n_ptr);

		tipc_addr_string_fill(addr_string, n_ptr->addr);
		info("Multicast link info for %s\n", addr_string);
		info("Supported: %d, ", n_ptr->bclink.supported);
		info("Acked: %u\n", n_ptr->bclink.acked);
		info("Last in: %u, ", n_ptr->bclink.last_in);
		info("Gap after: %u, ", n_ptr->bclink.gap_after);
		info("Gap to: %u\n", n_ptr->bclink.gap_to);
		info("Nack sync: %u\n\n", n_ptr->bclink.nack_sync);

		tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr);

		tipc_node_unlock(n_ptr);

		l_ptr->stale_count = 0;
	}
}
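/*
 * tipc_link_retransmit() below also watches for pathological loss: if the
 * same sequence number heads a retransmit request more than 100 times in
 * a row on an uncongested bearer, link_retransmit_failure() declares the
 * link failed and resets it (or, for the broadcast link, resets every
 * link to the affected node).
 */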
void tipc_link_retransmit(struct link *l_ptr, struct sk_buff *buf,
			  u32 retransmits)
{
	struct tipc_msg *msg;

	if (!buf)
		return;

	msg = buf_msg(buf);

	if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
		if (l_ptr->retransm_queue_size == 0) {
			l_ptr->retransm_queue_head = msg_seqno(msg);
			l_ptr->retransm_queue_size = retransmits;
		} else {
			err("Unexpected retransmit on link %s (qsize=%d)\n",
			    l_ptr->name, l_ptr->retransm_queue_size);
		}
		return;
	} else {
		/* Detect repeated retransmit failures on uncongested bearer */

		if (l_ptr->last_retransmitted == msg_seqno(msg)) {
			if (++l_ptr->stale_count > 100) {
				link_retransmit_failure(l_ptr, buf);
				return;
			}
		} else {
			l_ptr->last_retransmitted = msg_seqno(msg);
			l_ptr->stale_count = 1;
		}
	}

	while (retransmits && (buf != l_ptr->next_out) && buf) {
		msg = buf_msg(buf);
		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
		if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
			buf = buf->next;
			retransmits--;
			l_ptr->stats.retransmitted++;
		} else {
			tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
			l_ptr->stats.bearer_congs++;
			l_ptr->retransm_queue_head = msg_seqno(buf_msg(buf));
			l_ptr->retransm_queue_size = retransmits;
			return;
		}
	}

	l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0;
}

/**
 * link_insert_deferred_queue - insert deferred messages back into receive chain
 */

static struct sk_buff *link_insert_deferred_queue(struct link *l_ptr,
						  struct sk_buff *buf)
{
	u32 seq_no;

	if (l_ptr->oldest_deferred_in == NULL)
		return buf;

	seq_no = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
	if (seq_no == mod(l_ptr->next_in_no)) {
		l_ptr->newest_deferred_in->next = buf;
		buf = l_ptr->oldest_deferred_in;
		l_ptr->oldest_deferred_in = NULL;
		l_ptr->deferred_inqueue_sz = 0;
	}
	return buf;
}

/**
 * link_recv_buf_validate - validate basic format of received message
 *
 * This routine ensures a TIPC message has an acceptable header, and at least
 * as much data as the header indicates it should.  The routine also ensures
 * that the entire message header is stored in the main fragment of the message
 * buffer, to simplify future access to message header fields.
 *
 * Note: Having extra info present in the message header or data areas is OK.
 * TIPC will ignore the excess, under the assumption that it is optional info
 * introduced by a later release of the protocol.
 */

static int link_recv_buf_validate(struct sk_buff *buf)
{
	static u32 min_data_hdr_size[8] = {
		SHORT_H_SIZE, MCAST_H_SIZE, LONG_H_SIZE, DIR_MSG_H_SIZE,
		MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE
	};

	struct tipc_msg *msg;
	u32 tipc_hdr[2];
	u32 size;
	u32 hdr_size;
	u32 min_hdr_size;

	if (unlikely(buf->len < MIN_H_SIZE))
		return 0;

	msg = skb_header_pointer(buf, 0, sizeof(tipc_hdr), tipc_hdr);
	if (msg == NULL)
		return 0;

	if (unlikely(msg_version(msg) != TIPC_VERSION))
		return 0;

	size = msg_size(msg);
	hdr_size = msg_hdr_sz(msg);
	min_hdr_size = msg_isdata(msg) ?
		min_data_hdr_size[msg_type(msg)] : INT_H_SIZE;

	if (unlikely((hdr_size < min_hdr_size) ||
		     (size < hdr_size) ||
		     (buf->len < size) ||
		     (size - hdr_size > TIPC_MAX_USER_MSG_SIZE)))
		return 0;

	return pskb_may_pull(buf, hdr_size);
}

/**
 * tipc_recv_msg - process TIPC messages arriving from off-node
 * @head: pointer to message buffer chain
 * @tb_ptr: pointer to bearer message arrived on
 *
 * Invoked with no locks held.  Bearer pointer must point to a valid bearer
 * structure (i.e. cannot be NULL), but bearer can be inactive.
 */

void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *tb_ptr)
{
	read_lock_bh(&tipc_net_lock);
	while (head) {
		struct bearer *b_ptr = (struct bearer *)tb_ptr;
		struct tipc_node *n_ptr;
		struct link *l_ptr;
		struct sk_buff *crs;
		struct sk_buff *buf = head;
		struct tipc_msg *msg;
		u32 seq_no;
		u32 ackd;
		u32 released = 0;
		int type;

		head = head->next;

		/* Ensure bearer is still enabled */

		if (unlikely(!b_ptr->active))
			goto cont;

		/* Ensure message is well-formed */

		if (unlikely(!link_recv_buf_validate(buf)))
			goto cont;

		/* Ensure message data is a single contiguous unit */

		if (unlikely(buf_linearize(buf)))
			goto cont;

		/* Handle arrival of a non-unicast link message */

		msg = buf_msg(buf);

		if (unlikely(msg_non_seq(msg))) {
			if (msg_user(msg) == LINK_CONFIG)
				tipc_disc_recv_msg(buf, b_ptr);
			else
				tipc_bclink_recv_pkt(buf);
			continue;
		}

		if (unlikely(!msg_short(msg) &&
			     (msg_destnode(msg) != tipc_own_addr)))
			goto cont;

		/* Discard non-routeable messages destined for another node */

		if (unlikely(!msg_isdata(msg) &&
			     (msg_destnode(msg) != tipc_own_addr))) {
			if ((msg_user(msg) != CONN_MANAGER) &&
			    (msg_user(msg) != MSG_FRAGMENTER))
				goto cont;
		}

		/* Locate neighboring node that sent message */

		n_ptr = tipc_node_find(msg_prevnode(msg));
		if (unlikely(!n_ptr))
			goto cont;
		tipc_node_lock(n_ptr);

		/* Don't talk to neighbor during cleanup after last session */

		if (n_ptr->cleanup_required) {
			tipc_node_unlock(n_ptr);
			goto cont;
		}

		/* Locate unicast link endpoint that should handle message */

		l_ptr = n_ptr->links[b_ptr->identity];
		if (unlikely(!l_ptr)) {
			tipc_node_unlock(n_ptr);
			goto cont;
		}

		/* Validate message sequence number info */

		seq_no = msg_seqno(msg);
		ackd = msg_ack(msg);

		/* Release acked messages */

		if (less(n_ptr->bclink.acked, msg_bcast_ack(msg))) {
			if (tipc_node_is_up(n_ptr) && n_ptr->bclink.supported)
				tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
		}

		crs = l_ptr->first_out;
		while ((crs != l_ptr->next_out) &&
		       less_eq(msg_seqno(buf_msg(crs)), ackd)) {
			struct sk_buff *next = crs->next;

			buf_discard(crs);
			crs = next;
			released++;
		}
		if (released) {
			l_ptr->first_out = crs;
			l_ptr->out_queue_size -= released;
		}

		/* Try sending any messages link endpoint has pending */

		if (unlikely(l_ptr->next_out))
			tipc_link_push_queue(l_ptr);
		if (unlikely(!list_empty(&l_ptr->waiting_ports)))
			tipc_link_wakeup_ports(l_ptr, 0);
		if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
			l_ptr->stats.sent_acks++;
			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
		}
		/* Now (finally!) process the incoming message */

protocol_check:
		if (likely(link_working_working(l_ptr))) {
			if (likely(seq_no == mod(l_ptr->next_in_no))) {
				l_ptr->next_in_no++;
				if (unlikely(l_ptr->oldest_deferred_in))
					head = link_insert_deferred_queue(l_ptr,
									  head);
				if (likely(msg_is_dest(msg, tipc_own_addr))) {
deliver:
					if (likely(msg_isdata(msg))) {
						tipc_node_unlock(n_ptr);
						tipc_port_recv_msg(buf);
						continue;
					}
					switch (msg_user(msg)) {
					case MSG_BUNDLER:
						l_ptr->stats.recv_bundles++;
						l_ptr->stats.recv_bundled +=
							msg_msgcnt(msg);
						tipc_node_unlock(n_ptr);
						tipc_link_recv_bundle(buf);
						continue;
					case ROUTE_DISTRIBUTOR:
						tipc_node_unlock(n_ptr);
						buf_discard(buf);
						continue;
					case NAME_DISTRIBUTOR:
						tipc_node_unlock(n_ptr);
						tipc_named_recv(buf);
						continue;
					case CONN_MANAGER:
						tipc_node_unlock(n_ptr);
						tipc_port_recv_proto_msg(buf);
						continue;
					case MSG_FRAGMENTER:
						l_ptr->stats.recv_fragments++;
						if (tipc_link_recv_fragment(&l_ptr->defragm_buf,
									    &buf, &msg)) {
							l_ptr->stats.recv_fragmented++;
							goto deliver;
						}
						break;
					case CHANGEOVER_PROTOCOL:
						type = msg_type(msg);
						if (link_recv_changeover_msg(&l_ptr, &buf)) {
							msg = buf_msg(buf);
							seq_no = msg_seqno(msg);
							if (type == ORIGINAL_MSG)
								goto deliver;
							goto protocol_check;
						}
						break;
					}
				}
				tipc_node_unlock(n_ptr);
				tipc_net_route_msg(buf);
				continue;
			}
			link_handle_out_of_seq_msg(l_ptr, buf);
			head = link_insert_deferred_queue(l_ptr, head);
			tipc_node_unlock(n_ptr);
			continue;
		}

		if (msg_user(msg) == LINK_PROTOCOL) {
			link_recv_proto_msg(l_ptr, buf);
			head = link_insert_deferred_queue(l_ptr, head);
			tipc_node_unlock(n_ptr);
			continue;
		}
		link_state_event(l_ptr, TRAFFIC_MSG_EVT);

		if (link_working_working(l_ptr)) {
			/* Re-insert in front of queue */
			buf->next = head;
			head = buf;
			tipc_node_unlock(n_ptr);
			continue;
		}
		tipc_node_unlock(n_ptr);
cont:
		buf_discard(buf);
	}
	read_unlock_bh(&tipc_net_lock);
}
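/*
 * Worked example for tipc_link_defer_pkt() below (hypothetical sequence
 * numbers): with packets 5, 6 and 9 already deferred, an arriving packet
 * 8 is inserted between 6 and 9 and the function returns 1; a second copy
 * of packet 6 is recognized as a duplicate, discarded, and 0 is returned,
 * leaving the queue length unchanged.
 */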
/*
 * link_defer_buf(): Sort a received out-of-sequence packet
 *                   into the deferred reception queue.
 * Returns the increase of the queue length, i.e. 0 or 1
 */

u32 tipc_link_defer_pkt(struct sk_buff **head,
			struct sk_buff **tail,
			struct sk_buff *buf)
{
	struct sk_buff *prev = NULL;
	struct sk_buff *crs = *head;
	u32 seq_no = msg_seqno(buf_msg(buf));

	buf->next = NULL;

	/* Empty queue ? */
	if (*head == NULL) {
		*head = *tail = buf;
		return 1;
	}

	/* Last ? */
	if (less(msg_seqno(buf_msg(*tail)), seq_no)) {
		(*tail)->next = buf;
		*tail = buf;
		return 1;
	}

	/* Scan through queue and sort it in */
	do {
		struct tipc_msg *msg = buf_msg(crs);

		if (less(seq_no, msg_seqno(msg))) {
			buf->next = crs;
			if (prev)
				prev->next = buf;
			else
				*head = buf;
			return 1;
		}
		if (seq_no == msg_seqno(msg))
			break;
		prev = crs;
		crs = crs->next;
	} while (crs);

	/* Message is a duplicate of an existing message */

	buf_discard(buf);
	return 0;
}

/**
 * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet
 */

static void link_handle_out_of_seq_msg(struct link *l_ptr,
				       struct sk_buff *buf)
{
	u32 seq_no = msg_seqno(buf_msg(buf));

	if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) {
		link_recv_proto_msg(l_ptr, buf);
		return;
	}

	/* Record OOS packet arrival (force mismatch on next timeout) */

	l_ptr->checkpoint--;

	/*
	 * Discard packet if a duplicate; otherwise add it to deferred queue
	 * and notify peer of gap as per protocol specification
	 */

	if (less(seq_no, mod(l_ptr->next_in_no))) {
		l_ptr->stats.duplicates++;
		buf_discard(buf);
		return;
	}

	if (tipc_link_defer_pkt(&l_ptr->oldest_deferred_in,
				&l_ptr->newest_deferred_in, buf)) {
		l_ptr->deferred_inqueue_sz++;
		l_ptr->stats.deferred_recv++;
		if ((l_ptr->deferred_inqueue_sz % 16) == 1)
			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
	} else
		l_ptr->stats.duplicates++;
}
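/*
 * Two details of the protocol-message sender below are worth spelling
 * out. Probe sizing steps halfway from the current MTU toward the
 * negotiation target, msg_size = (mtu + (max_pkt_target - mtu)/2 + 2) & ~3,
 * so with (hypothetical) mtu == 1460 and target == 4000 the probe is
 * 2732 bytes; after 10 unanswered probes the target itself is pulled down
 * to just under the probe size and probing restarts. Protocol messages
 * also carry a sequence number half a window (0xffff/2) beyond
 * next_out_no, deliberately outside the receive window so they can never
 * be mistaken for data packets.
 */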
/*
 * Send protocol message to the other endpoint.
 */
void tipc_link_send_proto_msg(struct link *l_ptr, u32 msg_typ, int probe_msg,
			      u32 gap, u32 tolerance, u32 priority, u32 ack_mtu)
{
	struct sk_buff *buf = NULL;
	struct tipc_msg *msg = l_ptr->pmsg;
	u32 msg_size = sizeof(l_ptr->proto_msg);

	if (link_blocked(l_ptr))
		return;
	msg_set_type(msg, msg_typ);
	msg_set_net_plane(msg, l_ptr->b_ptr->net_plane);
	msg_set_bcast_ack(msg, mod(l_ptr->owner->bclink.last_in));
	msg_set_last_bcast(msg, tipc_bclink_get_last_sent());

	if (msg_typ == STATE_MSG) {
		u32 next_sent = mod(l_ptr->next_out_no);

		if (!tipc_link_is_up(l_ptr))
			return;
		if (l_ptr->next_out)
			next_sent = msg_seqno(buf_msg(l_ptr->next_out));
		msg_set_next_sent(msg, next_sent);
		if (l_ptr->oldest_deferred_in) {
			u32 rec = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
			gap = mod(rec - mod(l_ptr->next_in_no));
		}
		msg_set_seq_gap(msg, gap);
		if (gap)
			l_ptr->stats.sent_nacks++;
		msg_set_link_tolerance(msg, tolerance);
		msg_set_linkprio(msg, priority);
		msg_set_max_pkt(msg, ack_mtu);
		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
		msg_set_probe(msg, probe_msg != 0);
		if (probe_msg) {
			u32 mtu = l_ptr->max_pkt;

			if ((mtu < l_ptr->max_pkt_target) &&
			    link_working_working(l_ptr) &&
			    l_ptr->fsm_msg_cnt) {
				msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
				if (l_ptr->max_pkt_probes == 10) {
					l_ptr->max_pkt_target = (msg_size - 4);
					l_ptr->max_pkt_probes = 0;
					msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
				}
				l_ptr->max_pkt_probes++;
			}

			l_ptr->stats.sent_probes++;
		}
		l_ptr->stats.sent_states++;
	} else {		/* RESET_MSG or ACTIVATE_MSG */
		msg_set_ack(msg, mod(l_ptr->reset_checkpoint - 1));
		msg_set_seq_gap(msg, 0);
		msg_set_next_sent(msg, 1);
		msg_set_link_tolerance(msg, l_ptr->tolerance);
		msg_set_linkprio(msg, l_ptr->priority);
		msg_set_max_pkt(msg, l_ptr->max_pkt_target);
	}

	if (tipc_node_has_redundant_links(l_ptr->owner))
		msg_set_redundant_link(msg);
	else
		msg_clear_redundant_link(msg);
	msg_set_linkprio(msg, l_ptr->priority);

	/* Ensure sequence number will not fit: */

	msg_set_seqno(msg, mod(l_ptr->next_out_no + (0xffff/2)));

	/* Congestion? */

	if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
		if (!l_ptr->proto_msg_queue) {
			l_ptr->proto_msg_queue =
				tipc_buf_acquire(sizeof(l_ptr->proto_msg));
		}
		buf = l_ptr->proto_msg_queue;
		if (!buf)
			return;
		skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
		return;
	}
	msg_set_timestamp(msg, jiffies_to_msecs(jiffies));

	/* Message can be sent */

	buf = tipc_buf_acquire(msg_size);
	if (!buf)
		return;

	skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
	msg_set_size(buf_msg(buf), msg_size);

	if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
		l_ptr->unacked_window = 0;
		buf_discard(buf);
		return;
	}

	/* New congestion */
	tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
	l_ptr->proto_msg_queue = buf;
	l_ptr->stats.bearer_congs++;
}
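/*
 * On RESET_MSG handling below: a RESET that carries the session number we
 * already know is treated as a duplicate of an earlier reset and ignored,
 * unless the link is in the WORKING_UNKNOWN state or peer_session still
 * holds the INVALID_SESSION sentinel (link just came down), in which case
 * any session is accepted.
 */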
/*
 * Receive protocol message.
 * Note that network plane id propagates through the network, and may
 * change at any time. The node with lowest address rules.
 */

static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf)
{
	u32 rec_gap = 0;
	u32 max_pkt_info;
	u32 max_pkt_ack;
	u32 msg_tol;
	struct tipc_msg *msg = buf_msg(buf);

	if (link_blocked(l_ptr))
		goto exit;

	/* record unnumbered packet arrival (force mismatch on next timeout) */

	l_ptr->checkpoint--;

	if (l_ptr->b_ptr->net_plane != msg_net_plane(msg))
		if (tipc_own_addr > msg_prevnode(msg))
			l_ptr->b_ptr->net_plane = msg_net_plane(msg);

	l_ptr->owner->permit_changeover = msg_redundant_link(msg);

	switch (msg_type(msg)) {

	case RESET_MSG:
		if (!link_working_unknown(l_ptr) &&
		    (l_ptr->peer_session != INVALID_SESSION)) {
			if (msg_session(msg) == l_ptr->peer_session)
				break; /* duplicate: ignore */
		}
		/* fall through */
	case ACTIVATE_MSG:
		/* Update link settings according to the other endpoint's values */

		strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg));

		msg_tol = msg_link_tolerance(msg);
		if (msg_tol > l_ptr->tolerance)
			link_set_supervision_props(l_ptr, msg_tol);

		if (msg_linkprio(msg) > l_ptr->priority)
			l_ptr->priority = msg_linkprio(msg);

		max_pkt_info = msg_max_pkt(msg);
		if (max_pkt_info) {
			if (max_pkt_info < l_ptr->max_pkt_target)
				l_ptr->max_pkt_target = max_pkt_info;
			if (l_ptr->max_pkt > l_ptr->max_pkt_target)
				l_ptr->max_pkt = l_ptr->max_pkt_target;
		} else {
			l_ptr->max_pkt = l_ptr->max_pkt_target;
		}
		l_ptr->owner->bclink.supported = (max_pkt_info != 0);

		link_state_event(l_ptr, msg_type(msg));

		l_ptr->peer_session = msg_session(msg);
		l_ptr->peer_bearer_id = msg_bearer_id(msg);

		/* Synchronize broadcast sequence numbers */

		if (!tipc_node_has_redundant_links(l_ptr->owner))
			l_ptr->owner->bclink.last_in = mod(msg_last_bcast(msg));
		break;
	case STATE_MSG:

		msg_tol = msg_link_tolerance(msg);
		if (msg_tol)
			link_set_supervision_props(l_ptr, msg_tol);

		if (msg_linkprio(msg) &&
		    (msg_linkprio(msg) != l_ptr->priority)) {
			warn("Resetting link <%s>, priority change %u->%u\n",
			     l_ptr->name, l_ptr->priority, msg_linkprio(msg));
			l_ptr->priority = msg_linkprio(msg);
			tipc_link_reset(l_ptr); /* Enforce change to take effect */
			break;
		}
		link_state_event(l_ptr, TRAFFIC_MSG_EVT);
		l_ptr->stats.recv_states++;
		if (link_reset_unknown(l_ptr))
			break;

		if (less_eq(mod(l_ptr->next_in_no), msg_next_sent(msg))) {
			rec_gap = mod(msg_next_sent(msg) -
				      mod(l_ptr->next_in_no));
		}

		max_pkt_ack = msg_max_pkt(msg);
		if (max_pkt_ack > l_ptr->max_pkt) {
			l_ptr->max_pkt = max_pkt_ack;
			l_ptr->max_pkt_probes = 0;
		}

		max_pkt_ack = 0;
		if (msg_probe(msg)) {
			l_ptr->stats.recv_probes++;
			if (msg_size(msg) > sizeof(l_ptr->proto_msg))
				max_pkt_ack = msg_size(msg);
		}

		/* Protocol message before retransmits, reduce loss risk */

		tipc_bclink_check_gap(l_ptr->owner, msg_last_bcast(msg));

		if (rec_gap || (msg_probe(msg))) {
			tipc_link_send_proto_msg(l_ptr, STATE_MSG,
						 0, rec_gap, 0, 0, max_pkt_ack);
		}
		if (msg_seq_gap(msg)) {
			l_ptr->stats.recv_nacks++;
			tipc_link_retransmit(l_ptr, l_ptr->first_out,
					     msg_seq_gap(msg));
		}
		break;
	}
exit:
	buf_discard(buf);
}


/*
 * tipc_link_tunnel(): Send one message via a link belonging to
 * another bearer. Owner node is locked.
 */

static void tipc_link_tunnel(struct link *l_ptr,
			     struct tipc_msg *tunnel_hdr,
			     struct tipc_msg *msg,
			     u32 selector)
{
	struct link *tunnel;
	struct sk_buff *buf;
	u32 length = msg_size(msg);

	tunnel = l_ptr->owner->active_links[selector & 1];
	if (!tipc_link_is_up(tunnel)) {
		warn("Link changeover error, "
		     "tunnel link no longer available\n");
		return;
	}
	msg_set_size(tunnel_hdr, length + INT_H_SIZE);
	buf = tipc_buf_acquire(length + INT_H_SIZE);
	if (!buf) {
		warn("Link changeover error, "
		     "unable to send tunnel msg\n");
		return;
	}
	skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE);
	skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length);
	tipc_link_send_buf(tunnel, buf);
}
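/*
 * Wire layout of a tunnelled packet as assembled above (a sketch derived
 * from the msg_set_size() and skb copy calls, not a normative diagram):
 *
 *	+---------------------+------------------------------------+
 *	|     tunnel_hdr      |  original message, msg_size(msg)   |
 *	| (INT_H_SIZE octets) |  octets, its own header included   |
 *	+---------------------+------------------------------------+
 *
 * so the tunnel message's total size is always msg_size(msg) + INT_H_SIZE.
 */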
/*
 * tipc_link_changeover(): Send whole message queue via the remaining link.
 * Owner node is locked.
 */

void tipc_link_changeover(struct link *l_ptr)
{
	u32 msgcount = l_ptr->out_queue_size;
	struct sk_buff *crs = l_ptr->first_out;
	struct link *tunnel = l_ptr->owner->active_links[0];
	struct tipc_msg tunnel_hdr;
	int split_bundles;

	if (!tunnel)
		return;

	if (!l_ptr->owner->permit_changeover) {
		warn("Link changeover error, "
		     "peer did not permit changeover\n");
		return;
	}

	tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
		      ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
	msg_set_msgcnt(&tunnel_hdr, msgcount);

	if (!l_ptr->first_out) {
		struct sk_buff *buf;

		buf = tipc_buf_acquire(INT_H_SIZE);
		if (buf) {
			skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE);
			msg_set_size(&tunnel_hdr, INT_H_SIZE);
			tipc_link_send_buf(tunnel, buf);
		} else {
			warn("Link changeover error, "
			     "unable to send changeover msg\n");
		}
		return;
	}

	split_bundles = (l_ptr->owner->active_links[0] !=
			 l_ptr->owner->active_links[1]);

	while (crs) {
		struct tipc_msg *msg = buf_msg(crs);

		if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
			struct tipc_msg *m = msg_get_wrapped(msg);
			unchar *pos = (unchar *)m;

			msgcount = msg_msgcnt(msg);
			while (msgcount--) {
				msg_set_seqno(m, msg_seqno(msg));
				tipc_link_tunnel(l_ptr, &tunnel_hdr, m,
						 msg_link_selector(m));
				pos += align(msg_size(m));
				m = (struct tipc_msg *)pos;
			}
		} else {
			tipc_link_tunnel(l_ptr, &tunnel_hdr, msg,
					 msg_link_selector(msg));
		}
		crs = crs->next;
	}
}

void tipc_link_send_duplicate(struct link *l_ptr, struct link *tunnel)
{
	struct sk_buff *iter;
	struct tipc_msg tunnel_hdr;

	tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
		      DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr);
	msg_set_msgcnt(&tunnel_hdr, l_ptr->out_queue_size);
	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
	iter = l_ptr->first_out;
	while (iter) {
		struct sk_buff *outbuf;
		struct tipc_msg *msg = buf_msg(iter);
		u32 length = msg_size(msg);

		if (msg_user(msg) == MSG_BUNDLER)
			msg_set_type(msg, CLOSED_MSG);
		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));	/* Update */
		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
		msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
		outbuf = tipc_buf_acquire(length + INT_H_SIZE);
		if (outbuf == NULL) {
			warn("Link changeover error, "
			     "unable to send duplicate msg\n");
			return;
		}
		skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE);
		skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data,
					       length);
		tipc_link_send_buf(tunnel, outbuf);
		if (!tipc_link_is_up(l_ptr))
			return;
		iter = iter->next;
	}
}
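/*
 * Note on the two tunnelling flavours implemented above:
 * tipc_link_changeover() wraps each queued packet as an ORIGINAL_MSG and
 * unpacks bundles first when the two active links differ, so each wrapped
 * message can be routed by its own link selector; tipc_link_send_duplicate()
 * wraps the queue as DUPLICATE_MSG copies, leaving bundles intact (merely
 * marking them CLOSED_MSG), since the receiver discards any copy whose
 * sequence number it has already seen (see link_recv_changeover_msg() below).
 */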
/**
 * buf_extract - extracts embedded TIPC message from another message
 * @skb: encapsulating message buffer
 * @from_pos: offset to extract from
 *
 * Returns a new message buffer containing an embedded message. The
 * encapsulating message itself is left unchanged.
 */

static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
{
	struct tipc_msg *msg = (struct tipc_msg *)(skb->data + from_pos);
	u32 size = msg_size(msg);
	struct sk_buff *eb;

	eb = tipc_buf_acquire(size);
	if (eb)
		skb_copy_to_linear_data(eb, msg, size);
	return eb;
}

/*
 * link_recv_changeover_msg(): Receive tunnelled packet sent
 * via other link. Node is locked. Return extracted buffer.
 */

static int link_recv_changeover_msg(struct link **l_ptr,
				    struct sk_buff **buf)
{
	struct sk_buff *tunnel_buf = *buf;
	struct link *dest_link;
	struct tipc_msg *msg;
	struct tipc_msg *tunnel_msg = buf_msg(tunnel_buf);
	u32 msg_typ = msg_type(tunnel_msg);
	u32 msg_count = msg_msgcnt(tunnel_msg);

	dest_link = (*l_ptr)->owner->links[msg_bearer_id(tunnel_msg)];
	if (!dest_link)
		goto exit;
	if (dest_link == *l_ptr) {
		err("Unexpected changeover message on link <%s>\n",
		    (*l_ptr)->name);
		goto exit;
	}
	*l_ptr = dest_link;
	msg = msg_get_wrapped(tunnel_msg);

	if (msg_typ == DUPLICATE_MSG) {
		if (less(msg_seqno(msg), mod(dest_link->next_in_no)))
			goto exit;
		*buf = buf_extract(tunnel_buf, INT_H_SIZE);
		if (*buf == NULL) {
			warn("Link changeover error, duplicate msg dropped\n");
			goto exit;
		}
		buf_discard(tunnel_buf);
		return 1;
	}

	/* First original message? */

	if (tipc_link_is_up(dest_link)) {
		info("Resetting link <%s>, changeover initiated by peer\n",
		     dest_link->name);
		tipc_link_reset(dest_link);
		dest_link->exp_msg_count = msg_count;
		if (!msg_count)
			goto exit;
	} else if (dest_link->exp_msg_count == START_CHANGEOVER) {
		dest_link->exp_msg_count = msg_count;
		if (!msg_count)
			goto exit;
	}

	/* Receive original message */

	if (dest_link->exp_msg_count == 0) {
		warn("Link changeover error, "
		     "got too many tunnelled messages\n");
		goto exit;
	}
	dest_link->exp_msg_count--;
	if (less(msg_seqno(msg), dest_link->reset_checkpoint)) {
		goto exit;
	} else {
		*buf = buf_extract(tunnel_buf, INT_H_SIZE);
		if (*buf != NULL) {
			buf_discard(tunnel_buf);
			return 1;
		} else {
			warn("Link changeover error, original msg dropped\n");
		}
	}
exit:
	*buf = NULL;
	buf_discard(tunnel_buf);
	return 0;
}

/*
 * Bundler functionality:
 */

void tipc_link_recv_bundle(struct sk_buff *buf)
{
	u32 msgcount = msg_msgcnt(buf_msg(buf));
	u32 pos = INT_H_SIZE;
	struct sk_buff *obuf;

	while (msgcount--) {
		obuf = buf_extract(buf, pos);
		if (obuf == NULL) {
			warn("Link unable to unbundle message(s)\n");
			break;
		}
		pos += align(msg_size(buf_msg(obuf)));
		tipc_net_route_msg(obuf);
	}
	buf_discard(buf);
}
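/*
 * A worked unbundling example (message sizes are illustrative): for a
 * bundle carrying three messages of 100, 60 and 25 octets, the extraction
 * offsets computed above are INT_H_SIZE, INT_H_SIZE + 100 and
 * INT_H_SIZE + 160; align() rounds each size up to a multiple of four,
 * which matters only for the 25-octet message (align(25) == 28).
 */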
/*
 * Fragmentation/defragmentation:
 */


/*
 * link_send_long_buf: Entry for buffers needing fragmentation.
 * The buffer is complete, including the total message length.
 * Returns user data length.
 */

static int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf)
{
	struct tipc_msg *inmsg = buf_msg(buf);
	struct tipc_msg fragm_hdr;
	u32 insize = msg_size(inmsg);
	u32 dsz = msg_data_sz(inmsg);
	unchar *crs = buf->data;
	u32 rest = insize;
	u32 pack_sz = l_ptr->max_pkt;
	u32 fragm_sz = pack_sz - INT_H_SIZE;
	u32 fragm_no = 1;
	u32 destaddr;

	if (msg_short(inmsg))
		destaddr = l_ptr->addr;
	else
		destaddr = msg_destnode(inmsg);

	if (msg_routed(inmsg))
		msg_set_prevnode(inmsg, tipc_own_addr);

	/* Prepare reusable fragment header: */

	tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
		      INT_H_SIZE, destaddr);
	msg_set_link_selector(&fragm_hdr, msg_link_selector(inmsg));
	msg_set_long_msgno(&fragm_hdr, mod(l_ptr->long_msg_seq_no++));
	msg_set_fragm_no(&fragm_hdr, fragm_no);
	l_ptr->stats.sent_fragmented++;

	/* Chop up message: */

	while (rest > 0) {
		struct sk_buff *fragm;

		if (rest <= fragm_sz) {
			fragm_sz = rest;
			msg_set_type(&fragm_hdr, LAST_FRAGMENT);
		}
		fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
		if (fragm == NULL) {
			warn("Link unable to fragment message\n");
			dsz = -ENOMEM;
			goto exit;
		}
		msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
		skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE);
		skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs,
					       fragm_sz);

		/* Send queued messages first, if any: */

		l_ptr->stats.sent_fragments++;
		tipc_link_send_buf(l_ptr, fragm);
		if (!tipc_link_is_up(l_ptr))
			return dsz;
		msg_set_fragm_no(&fragm_hdr, ++fragm_no);
		rest -= fragm_sz;
		crs += fragm_sz;
		msg_set_type(&fragm_hdr, FRAGMENT);
	}
exit:
	buf_discard(buf);
	return dsz;
}

/*
 * A pending message being re-assembled must store certain values
 * to handle subsequent fragments correctly. The following functions
 * help storing these values in unused, available fields in the
 * pending message. This makes dynamic memory allocation unnecessary.
 */

static void set_long_msg_seqno(struct sk_buff *buf, u32 seqno)
{
	msg_set_seqno(buf_msg(buf), seqno);
}

static u32 get_fragm_size(struct sk_buff *buf)
{
	return msg_ack(buf_msg(buf));
}

static void set_fragm_size(struct sk_buff *buf, u32 sz)
{
	msg_set_ack(buf_msg(buf), sz);
}

static u32 get_expected_frags(struct sk_buff *buf)
{
	return msg_bcast_ack(buf_msg(buf));
}

static void set_expected_frags(struct sk_buff *buf, u32 exp)
{
	msg_set_bcast_ack(buf_msg(buf), exp);
}

static u32 get_timer_cnt(struct sk_buff *buf)
{
	return msg_reroute_cnt(buf_msg(buf));
}

static void incr_timer_cnt(struct sk_buff *buf)
{
	msg_incr_reroute_cnt(buf_msg(buf));
}
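/*
 * Summary of the header-field reuse implemented by the helpers above
 * (first-fragment buffer only; the fields are otherwise unused while a
 * message sits on the reassembly queue):
 *
 *	seqno		<- long message sequence number
 *	ack		<- fragment size
 *	bcast_ack	<- number of fragments still expected
 *	reroute cnt	<- stale-fragment timer count
 */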
/*
 * tipc_link_recv_fragment(): Called with node lock on. Returns
 * the reassembled buffer if message is complete.
 */

int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
			    struct tipc_msg **m)
{
	struct sk_buff *prev = NULL;
	struct sk_buff *fbuf = *fb;
	struct tipc_msg *fragm = buf_msg(fbuf);
	struct sk_buff *pbuf = *pending;
	u32 long_msg_seq_no = msg_long_msgno(fragm);

	*fb = NULL;

	/* Is there an incomplete message waiting for this fragment? */

	while (pbuf && ((msg_seqno(buf_msg(pbuf)) != long_msg_seq_no) ||
			(msg_orignode(fragm) != msg_orignode(buf_msg(pbuf))))) {
		prev = pbuf;
		pbuf = pbuf->next;
	}

	if (!pbuf && (msg_type(fragm) == FIRST_FRAGMENT)) {
		struct tipc_msg *imsg = (struct tipc_msg *)msg_data(fragm);
		u32 msg_sz = msg_size(imsg);
		u32 fragm_sz = msg_data_sz(fragm);
		u32 exp_fragm_cnt;
		u32 max = TIPC_MAX_USER_MSG_SIZE + LONG_H_SIZE;

		if (msg_type(imsg) == TIPC_MCAST_MSG)
			max = TIPC_MAX_USER_MSG_SIZE + MCAST_H_SIZE;

		/*
		 * Reject malformed first fragments; this also avoids a
		 * division by zero in the fragment count below
		 */

		if (fragm_sz == 0 || msg_size(imsg) > max) {
			buf_discard(fbuf);
			return 0;
		}
		exp_fragm_cnt = msg_sz / fragm_sz + !!(msg_sz % fragm_sz);
		pbuf = tipc_buf_acquire(msg_size(imsg));
		if (pbuf != NULL) {
			pbuf->next = *pending;
			*pending = pbuf;
			skb_copy_to_linear_data(pbuf, imsg,
						msg_data_sz(fragm));

			/* Prepare buffer for subsequent fragments. */

			set_long_msg_seqno(pbuf, long_msg_seq_no);
			set_fragm_size(pbuf, fragm_sz);
			set_expected_frags(pbuf, exp_fragm_cnt - 1);
		} else {
			warn("Link unable to reassemble fragmented message\n");
		}
		buf_discard(fbuf);
		return 0;
	} else if (pbuf && (msg_type(fragm) != FIRST_FRAGMENT)) {
		u32 dsz = msg_data_sz(fragm);
		u32 fsz = get_fragm_size(pbuf);
		u32 crs = ((msg_fragm_no(fragm) - 1) * fsz);
		u32 exp_frags = get_expected_frags(pbuf) - 1;

		skb_copy_to_linear_data_offset(pbuf, crs,
					       msg_data(fragm), dsz);
		buf_discard(fbuf);

		/* Is message complete? */

		if (exp_frags == 0) {
			if (prev)
				prev->next = pbuf->next;
			else
				*pending = pbuf->next;
			msg_reset_reroute_cnt(buf_msg(pbuf));
			*fb = pbuf;
			*m = buf_msg(pbuf);
			return 1;
		}
		set_expected_frags(pbuf, exp_frags);
		return 0;
	}
	buf_discard(fbuf);
	return 0;
}
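/*
 * The expected-fragment computation above is a ceiling division.  As a
 * worked example (sizes illustrative): a 66000-octet message carried in
 * 1460-octet fragments gives 66000 / 1460 == 45 with a remainder, so
 * exp_fragm_cnt == 46; the first fragment accounts for one of these,
 * leaving 45 in the buffer's expected-fragments field.
 */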
/**
 * link_check_defragm_bufs - flush stale incoming message fragments
 * @l_ptr: pointer to link
 */

static void link_check_defragm_bufs(struct link *l_ptr)
{
	struct sk_buff *prev = NULL;
	struct sk_buff *next = NULL;
	struct sk_buff *buf = l_ptr->defragm_buf;

	if (!buf)
		return;
	if (!link_working_working(l_ptr))
		return;
	while (buf) {
		u32 cnt = get_timer_cnt(buf);

		next = buf->next;
		if (cnt < 4) {
			incr_timer_cnt(buf);
			prev = buf;
		} else {
			if (prev)
				prev->next = buf->next;
			else
				l_ptr->defragm_buf = buf->next;
			buf_discard(buf);
		}
		buf = next;
	}
}


static void link_set_supervision_props(struct link *l_ptr, u32 tolerance)
{
	l_ptr->tolerance = tolerance;
	l_ptr->continuity_interval =
		((tolerance / 4) > 500) ? 500 : tolerance / 4;
	l_ptr->abort_limit = tolerance / (l_ptr->continuity_interval / 4);
}


void tipc_link_set_queue_limits(struct link *l_ptr, u32 window)
{
	/* Data messages from this node, including FIRST_FRAGM */
	l_ptr->queue_limit[TIPC_LOW_IMPORTANCE] = window;
	l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE] = (window / 3) * 4;
	l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE] = (window / 3) * 5;
	l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE] = (window / 3) * 6;
	/* Transiting data messages, including FIRST_FRAGM */
	l_ptr->queue_limit[TIPC_LOW_IMPORTANCE + 4] = 300;
	l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE + 4] = 600;
	l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE + 4] = 900;
	l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE + 4] = 1200;
	l_ptr->queue_limit[CONN_MANAGER] = 1200;
	l_ptr->queue_limit[CHANGEOVER_PROTOCOL] = 2500;
	l_ptr->queue_limit[NAME_DISTRIBUTOR] = 3000;
	/* FRAGMENT and LAST_FRAGMENT packets */
	l_ptr->queue_limit[MSG_FRAGMENTER] = 4000;
}
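/*
 * Worked examples for the two routines above (all numbers illustrative):
 * with a window of 50 packets, (window / 3) truncates to 16, so the
 * send-queue limits become 50, 64, 80 and 96 packets for LOW through
 * CRITICAL importance.  With a tolerance of 1500 ms, the continuity
 * interval is min(1500 / 4, 500) == 375 ms, and with integer division
 * the abort limit is 1500 / (375 / 4) == 1500 / 93 == 16 intervals.
 */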
/**
 * link_find_link - locate link by name
 * @name - ptr to link name string
 * @node - ptr to area to be filled with ptr to associated node
 *
 * Caller must hold 'tipc_net_lock' to ensure node and bearer are not deleted;
 * this also prevents link deletion.
 *
 * Returns pointer to link (or NULL if invalid link name).
 */

static struct link *link_find_link(const char *name, struct tipc_node **node)
{
	struct link_name link_name_parts;
	struct bearer *b_ptr;
	struct link *l_ptr;

	if (!link_name_validate(name, &link_name_parts))
		return NULL;

	b_ptr = tipc_bearer_find_interface(link_name_parts.if_local);
	if (!b_ptr)
		return NULL;

	*node = tipc_node_find(link_name_parts.addr_peer);
	if (!*node)
		return NULL;

	l_ptr = (*node)->links[b_ptr->identity];
	if (!l_ptr || strcmp(l_ptr->name, name))
		return NULL;

	return l_ptr;
}
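/*
 * Usage sketch for link_find_link(), using a hypothetical link name; a
 * link name combines both endpoints' network addresses and interface
 * names, and the caller supplies storage for the owning node pointer:
 *
 *	struct tipc_node *node;
 *	struct link *l_ptr;
 *
 *	read_lock_bh(&tipc_net_lock);
 *	l_ptr = link_find_link("1.1.1:eth0-1.1.2:eth0", &node);
 *	if (l_ptr) {
 *		tipc_node_lock(node);
 *		...
 *		tipc_node_unlock(node);
 *	}
 *	read_unlock_bh(&tipc_net_lock);
 *
 * as in the configuration handlers below, tipc_net_lock is held across
 * the lookup and any subsequent use of the returned link.
 */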
struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, int req_tlv_space,
				     u16 cmd)
{
	struct tipc_link_config *args;
	u32 new_value;
	struct link *l_ptr;
	struct tipc_node *node;
	int res;

	if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_CONFIG))
		return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);

	args = (struct tipc_link_config *)TLV_DATA(req_tlv_area);
	new_value = ntohl(args->value);

	if (!strcmp(args->name, tipc_bclink_name)) {
		if ((cmd == TIPC_CMD_SET_LINK_WINDOW) &&
		    (tipc_bclink_set_queue_limits(new_value) == 0))
			return tipc_cfg_reply_none();
		return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
						   " (cannot change setting on broadcast link)");
	}

	read_lock_bh(&tipc_net_lock);
	l_ptr = link_find_link(args->name, &node);
	if (!l_ptr) {
		read_unlock_bh(&tipc_net_lock);
		return tipc_cfg_reply_error_string("link not found");
	}

	tipc_node_lock(node);
	res = -EINVAL;
	switch (cmd) {
	case TIPC_CMD_SET_LINK_TOL:
		if ((new_value >= TIPC_MIN_LINK_TOL) &&
		    (new_value <= TIPC_MAX_LINK_TOL)) {
			link_set_supervision_props(l_ptr, new_value);
			tipc_link_send_proto_msg(l_ptr, STATE_MSG,
						 0, 0, new_value, 0, 0);
			res = 0;
		}
		break;
	case TIPC_CMD_SET_LINK_PRI:
		if ((new_value >= TIPC_MIN_LINK_PRI) &&
		    (new_value <= TIPC_MAX_LINK_PRI)) {
			l_ptr->priority = new_value;
			tipc_link_send_proto_msg(l_ptr, STATE_MSG,
						 0, 0, 0, new_value, 0);
			res = 0;
		}
		break;
	case TIPC_CMD_SET_LINK_WINDOW:
		if ((new_value >= TIPC_MIN_LINK_WIN) &&
		    (new_value <= TIPC_MAX_LINK_WIN)) {
			tipc_link_set_queue_limits(l_ptr, new_value);
			res = 0;
		}
		break;
	}
	tipc_node_unlock(node);

	read_unlock_bh(&tipc_net_lock);
	if (res)
		return tipc_cfg_reply_error_string("cannot change link setting");

	return tipc_cfg_reply_none();
}

/**
 * link_reset_statistics - reset link statistics
 * @l_ptr: pointer to link
 */

static void link_reset_statistics(struct link *l_ptr)
{
	memset(&l_ptr->stats, 0, sizeof(l_ptr->stats));
	l_ptr->stats.sent_info = l_ptr->next_out_no;
	l_ptr->stats.recv_info = l_ptr->next_in_no;
}

struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_space)
{
	char *link_name;
	struct link *l_ptr;
	struct tipc_node *node;

	if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
		return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);

	link_name = (char *)TLV_DATA(req_tlv_area);
	if (!strcmp(link_name, tipc_bclink_name)) {
		if (tipc_bclink_reset_stats())
			return tipc_cfg_reply_error_string("link not found");
		return tipc_cfg_reply_none();
	}

	read_lock_bh(&tipc_net_lock);
	l_ptr = link_find_link(link_name, &node);
	if (!l_ptr) {
		read_unlock_bh(&tipc_net_lock);
		return tipc_cfg_reply_error_string("link not found");
	}

	tipc_node_lock(node);
	link_reset_statistics(l_ptr);
	tipc_node_unlock(node);
	read_unlock_bh(&tipc_net_lock);
	return tipc_cfg_reply_none();
}

/**
 * percent - convert count to a percentage of total (rounded to nearest)
 */

static u32 percent(u32 count, u32 total)
{
	return (count * 100 + (total / 2)) / total;
}
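/*
 * percent() rounds to the nearest integer: percent(1, 3) is
 * (100 + 1) / 3 == 33, while percent(2, 3) is (200 + 1) / 3 == 67.
 * Callers must guarantee a non-zero total; tipc_link_stats() below
 * substitutes 1 when the length profile is empty.
 */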
/**
 * tipc_link_stats - print link statistics
 * @name: link name
 * @buf: print buffer area
 * @buf_size: size of print buffer area
 *
 * Returns length of print buffer data string (or 0 if error)
 */

static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)
{
	struct print_buf pb;
	struct link *l_ptr;
	struct tipc_node *node;
	char *status;
	u32 profile_total = 0;

	if (!strcmp(name, tipc_bclink_name))
		return tipc_bclink_stats(buf, buf_size);

	tipc_printbuf_init(&pb, buf, buf_size);

	read_lock_bh(&tipc_net_lock);
	l_ptr = link_find_link(name, &node);
	if (!l_ptr) {
		read_unlock_bh(&tipc_net_lock);
		return 0;
	}
	tipc_node_lock(node);

	if (tipc_link_is_active(l_ptr))
		status = "ACTIVE";
	else if (tipc_link_is_up(l_ptr))
		status = "STANDBY";
	else
		status = "DEFUNCT";
	tipc_printf(&pb, "Link <%s>\n"
		    " %s MTU:%u Priority:%u Tolerance:%u ms"
		    " Window:%u packets\n",
		    l_ptr->name, status, l_ptr->max_pkt,
		    l_ptr->priority, l_ptr->tolerance, l_ptr->queue_limit[0]);
	tipc_printf(&pb, " RX packets:%u fragments:%u/%u bundles:%u/%u\n",
		    l_ptr->next_in_no - l_ptr->stats.recv_info,
		    l_ptr->stats.recv_fragments,
		    l_ptr->stats.recv_fragmented,
		    l_ptr->stats.recv_bundles,
		    l_ptr->stats.recv_bundled);
	tipc_printf(&pb, " TX packets:%u fragments:%u/%u bundles:%u/%u\n",
		    l_ptr->next_out_no - l_ptr->stats.sent_info,
		    l_ptr->stats.sent_fragments,
		    l_ptr->stats.sent_fragmented,
		    l_ptr->stats.sent_bundles,
		    l_ptr->stats.sent_bundled);
	profile_total = l_ptr->stats.msg_length_counts;
	if (!profile_total)
		profile_total = 1;
	tipc_printf(&pb, " TX profile sample:%u packets average:%u octets\n"
		    " 0-64:%u%% -256:%u%% -1024:%u%% -4096:%u%% "
		    "-16384:%u%% -32768:%u%% -66000:%u%%\n",
		    l_ptr->stats.msg_length_counts,
		    l_ptr->stats.msg_lengths_total / profile_total,
		    percent(l_ptr->stats.msg_length_profile[0], profile_total),
		    percent(l_ptr->stats.msg_length_profile[1], profile_total),
		    percent(l_ptr->stats.msg_length_profile[2], profile_total),
		    percent(l_ptr->stats.msg_length_profile[3], profile_total),
		    percent(l_ptr->stats.msg_length_profile[4], profile_total),
		    percent(l_ptr->stats.msg_length_profile[5], profile_total),
		    percent(l_ptr->stats.msg_length_profile[6], profile_total));
	tipc_printf(&pb, " RX states:%u probes:%u naks:%u defs:%u dups:%u\n",
		    l_ptr->stats.recv_states,
		    l_ptr->stats.recv_probes,
		    l_ptr->stats.recv_nacks,
		    l_ptr->stats.deferred_recv,
		    l_ptr->stats.duplicates);
	tipc_printf(&pb, " TX states:%u probes:%u naks:%u acks:%u dups:%u\n",
		    l_ptr->stats.sent_states,
		    l_ptr->stats.sent_probes,
		    l_ptr->stats.sent_nacks,
		    l_ptr->stats.sent_acks,
		    l_ptr->stats.retransmitted);
	tipc_printf(&pb, " Congestion bearer:%u link:%u Send queue max:%u avg:%u\n",
		    l_ptr->stats.bearer_congs,
		    l_ptr->stats.link_congs,
		    l_ptr->stats.max_queue_sz,
		    l_ptr->stats.queue_sz_counts
		    ? (l_ptr->stats.accu_queue_sz / l_ptr->stats.queue_sz_counts)
		    : 0);

	tipc_node_unlock(node);
	read_unlock_bh(&tipc_net_lock);
	return tipc_printbuf_validate(&pb);
}

#define MAX_LINK_STATS_INFO 2000

struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, int req_tlv_space)
{
	struct sk_buff *buf;
	struct tlv_desc *rep_tlv;
	int str_len;

	if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
		return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);

	buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_LINK_STATS_INFO));
	if (!buf)
		return NULL;

	rep_tlv = (struct tlv_desc *)buf->data;

	str_len = tipc_link_stats((char *)TLV_DATA(req_tlv_area),
				  (char *)TLV_DATA(rep_tlv), MAX_LINK_STATS_INFO);
	if (!str_len) {
		buf_discard(buf);
		return tipc_cfg_reply_error_string("link not found");
	}

	skb_put(buf, TLV_SPACE(str_len));
	TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);

	return buf;
}

/**
 * tipc_link_get_max_pkt - get maximum packet size to use when sending to destination
 * @dest: network address of destination node
 * @selector: used to select from set of active links
 *
 * If no active link can be found, uses default maximum packet size.
 */

u32 tipc_link_get_max_pkt(u32 dest, u32 selector)
{
	struct tipc_node *n_ptr;
	struct link *l_ptr;
	u32 res = MAX_PKT_DEFAULT;

	if (dest == tipc_own_addr)
		return MAX_MSG_SIZE;

	read_lock_bh(&tipc_net_lock);
	n_ptr = tipc_node_find(dest);
	if (n_ptr) {
		tipc_node_lock(n_ptr);
		l_ptr = n_ptr->active_links[selector & 1];
		if (l_ptr)
			res = l_ptr->max_pkt;
		tipc_node_unlock(n_ptr);
	}
	read_unlock_bh(&tipc_net_lock);
	return res;
}

static void link_print(struct link *l_ptr, const char *str)
{
	char print_area[256];
	struct print_buf pb;
	struct print_buf *buf = &pb;

	tipc_printbuf_init(buf, print_area, sizeof(print_area));

	tipc_printf(buf, "%s", str);
	tipc_printf(buf, "Link %x<%s>:",
		    l_ptr->addr, l_ptr->b_ptr->publ.name);

#ifdef CONFIG_TIPC_DEBUG
	if (link_reset_reset(l_ptr) || link_reset_unknown(l_ptr))
		goto print_state;

	tipc_printf(buf, ": NXO(%u):", mod(l_ptr->next_out_no));
	tipc_printf(buf, "NXI(%u):", mod(l_ptr->next_in_no));
	tipc_printf(buf, "SQUE");
	if (l_ptr->first_out) {
		tipc_printf(buf, "[%u..", msg_seqno(buf_msg(l_ptr->first_out)));
		if (l_ptr->next_out)
			tipc_printf(buf, "%u..",
				    msg_seqno(buf_msg(l_ptr->next_out)));
		tipc_printf(buf, "%u]", msg_seqno(buf_msg(l_ptr->last_out)));
		if ((mod(msg_seqno(buf_msg(l_ptr->last_out)) -
			 msg_seqno(buf_msg(l_ptr->first_out)))
		     != (l_ptr->out_queue_size - 1)) ||
		    (l_ptr->last_out->next != NULL)) {
			tipc_printf(buf, "\nSend queue inconsistency\n");
			tipc_printf(buf, "first_out= %p ", l_ptr->first_out);
			tipc_printf(buf, "next_out= %p ", l_ptr->next_out);
			tipc_printf(buf, "last_out= %p ", l_ptr->last_out);
		}
	} else
		tipc_printf(buf, "[]");
	tipc_printf(buf, "SQSIZ(%u)", l_ptr->out_queue_size);
	if (l_ptr->oldest_deferred_in) {
		u32 o = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
		u32 n = msg_seqno(buf_msg(l_ptr->newest_deferred_in));
		tipc_printf(buf, ":RQUE[%u..%u]", o, n);
		if (l_ptr->deferred_inqueue_sz != mod((n + 1) - o)) {
			tipc_printf(buf, ":RQSIZ(%u)",
				    l_ptr->deferred_inqueue_sz);
		}
	}
print_state:
#endif

	if (link_working_unknown(l_ptr))
		tipc_printf(buf, ":WU");
	else if (link_reset_reset(l_ptr))
		tipc_printf(buf, ":RR");
	else if (link_reset_unknown(l_ptr))
		tipc_printf(buf, ":RU");
	else if (link_working_working(l_ptr))
		tipc_printf(buf, ":WW");
	tipc_printf(buf, "\n");

	tipc_printbuf_validate(buf);
	info("%s", print_area);
}