// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2021 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

/*
 * midcomms.c
 *
 * This is the appallingly named "mid-level" comms layer. It takes care of
 * delivering "reliable" application-layer communication on top of the
 * lowcomms transport layer in use.
 *
 * How it works:
 *
 * Each node keeps track of all sent DLM messages in send_queue with a
 * sequence number. The receiver sends a DLM_ACK message back for every DLM
 * message received at the other side. If a reconnect happens in lowcomms we
 * will send all unacknowledged dlm messages again. The receiving side might
 * drop any already received message by comparing sequence numbers.
 *
 * How version detection works:
 *
 * Due to the fact that dlm has pre-configured node addresses on every side,
 * it is in its nature that every side connects at start to transmit
 * dlm messages, which ends in a race. However DLM_RCOM_NAMES, DLM_RCOM_STATUS
 * and their replies are the first messages which are exchanged. Due to
 * backwards compatibility these messages are not covered by the midcomms
 * re-transmission layer. These messages have their own re-transmission
 * handling in the dlm application layer. The version field of every node will
 * be set on these RCOM messages as soon as they arrive and the node isn't yet
 * part of the nodes hash. There also exists logic to detect a version
 * mismatch if something weird is going on or the first message isn't an
 * expected one.
 *
 * Termination:
 *
 * The midcomms layer does a 4 way handshake for termination on the DLM
 * protocol, like TCP supports it with half-closed socket support. SCTP
 * doesn't support half-closed sockets, so we do it on the DLM layer. Also
 * socket shutdown() can be interrupted by e.g. a tcp reset itself.
 * Additionally there exists the othercon paradigm in lowcomms which cannot
 * be easily changed without breaking backwards compatibility. A node cannot
 * send anything to another node once a DLM_FIN message was sent. There exists
 * additional logic to print a warning if DLM wants to do it. There exists a
 * state handling like RFC 793 but reduced to termination only. The event
 * "member removal event" describes that the cluster manager removed the node
 * from its internal lists; at this point DLM does not send any message to
 * the other node. There exist two cases:
 *
 * 1. The cluster member was removed and we received a FIN
 * OR
 * 2. We received a FIN but the member was not removed yet
 *
 * One of these cases will do the CLOSE_WAIT to LAST_ACK change.
 *
 *
 *                              +---------+
 *                              | CLOSED  |
 *                              +---------+
 *                                   | add member/receive RCOM version
 *                                   |            detection msg
 *                                   V
 *                              +---------+
 *                              |  ESTAB  |
 *                              +---------+
 *                       CLOSE    |     |    rcv FIN
 *                      -------   |     |    -------
 * +---------+          snd FIN  /       \   snd ACK          +---------+
 * |  FIN    |<-----------------           ------------------>|  CLOSE  |
 * | WAIT-1  |------------------                              |   WAIT  |
 * +---------+          rcv FIN  \                            +---------+
 * | rcv ACK of FIN     -------   |                  CLOSE         | member
 * | --------------     snd ACK   |                 -------        | removal
 * V        x                     V                 snd FIN        V event
 * +---------+               +---------+                      +---------+
 * |FINWAIT-2|               | CLOSING |                      | LAST-ACK|
 * +---------+               +---------+                      +---------+
 * |              rcv ACK of FIN |                   rcv ACK of FIN |
 * |  rcv FIN     -------------- |                   -------------- |
 * |  -------            x       V                          x       V
 *  \ snd ACK                +---------+                      +---------+
 *   ----------------------->| CLOSED  |                      | CLOSED  |
 *                           +---------+                      +---------+
 *
 * NOTE: any state can be interrupted by midcomms_close() and the state will
 * be switched to CLOSED in case of fencing. There also exists some timeout
 * handling when we receive the version detection RCOM messages, which is
 * made by observation.
 *
 * Future improvements:
 *
 * There exist some known issues/improvements of the dlm handling. Some
 * of them should be done in a next major dlm version bump which makes
 * it incompatible with previous versions.
 *
 * Unaligned memory access:
 *
 * There exist cases when the dlm message buffer length is not aligned
 * to 8 bytes. However it seems nobody detected any problem with it. This
 * can be fixed in the next major version bump of dlm.
 *
 * Version detection:
 *
 * The version detection and how it's done is related to backwards
 * compatibility. There exist better ways to handle it.
 * However this should be changed in the next major version bump of dlm.
 *
 * Ack handling:
 *
 * Currently we send an ack message for every dlm message.
 * However we can ack multiple dlm messages with one ack by just delaying the
 * ack message. This will reduce some traffic but makes the drop detection
 * slower.
 *
 * Tail Size checking:
 *
 * There exists a message tail payload in e.g. DLM_MSG, however we don't
 * check it against the message length yet with regard to the receive buffer
 * length. That needs to be validated.
 *
 * Fencing bad nodes:
 *
 * At timeout places or on weird sequence number behaviours we should send
 * a fencing request to the cluster manager.
 */

/* Debug switch to enable a 5 seconds sleep while waiting for a termination.
 * This can be useful to test fencing while termination is running.
 * This requires a setup with only gfs2 as dlm user, so that the
 * last umount will terminate the connection.
 *
 * It turned out to be useful for testing: while umount blocks for the
 * 5 seconds, just press the reset button. With a lot of dropping, the
 * termination process could take several seconds.
 */
#define DLM_DEBUG_FENCE_TERMINATION	0

#include <net/tcp.h>

#include "dlm_internal.h"
#include "lowcomms.h"
#include "config.h"
#include "lock.h"
#include "util.h"
#include "midcomms.h"

/* init value for sequence numbers for testing purpose only e.g. overflows */
#define DLM_SEQ_INIT		0
/* 3 minutes wait to sync ending of dlm */
#define DLM_SHUTDOWN_TIMEOUT	msecs_to_jiffies(3 * 60 * 1000)
/* value of midcomms_node::version before a version check has stored one */
#define DLM_VERSION_NOT_SET	0

/* Per remote node midcomms state, kept in node_hash keyed by nodeid. */
struct midcomms_node {
	int nodeid;
	/* DLM_VERSION_NOT_SET until a version check callback stores the
	 * detected protocol version.
	 */
	uint32_t version;
	/* sequence number handed to the next queued outgoing message */
	uint32_t seq_send;
	/* sequence number expected on the next incoming message */
	uint32_t seq_next;
	/* These queues are unbound because we cannot drop any message in dlm.
	 * We could send a fence signal for a specific node to the cluster
	 * manager if queues hits some maximum value, however this handling
	 * not supported yet.
	 */
	struct list_head send_queue;
	/* serializes writers of send_queue; readers iterate under RCU */
	spinlock_t send_queue_lock;
	/* number of dlm_mhandle entries currently on send_queue */
	atomic_t send_queue_cnt;
	/* bit numbers for the flags word below (used with set_bit/test_bit) */
#define DLM_NODE_FLAG_CLOSE	1
#define DLM_NODE_FLAG_STOP_TX	2
#define DLM_NODE_FLAG_STOP_RX	3
	unsigned long flags;
	/* woken whenever the node is reset to DLM_CLOSED */
	wait_queue_head_t shutdown_wait;

	/* dlm tcp termination state */
#define DLM_CLOSED	1
#define DLM_ESTABLISHED	2
#define DLM_FIN_WAIT1	3
#define DLM_FIN_WAIT2	4
#define DLM_CLOSE_WAIT	5
#define DLM_LAST_ACK	6
#define DLM_CLOSING	7
	int state;
	/* protects state and users */
	spinlock_t state_lock;

	/* counts how many lockspaces are using this node
	 * this refcount is necessary to determine if the
	 * node wants to disconnect.
	 */
	int users;

	/* not protected by srcu, node_hash lifetime */
	void *debugfs;

	/* linkage in node_hash */
	struct hlist_node hlist;
	/* for deferred freeing of the node */
	struct rcu_head rcu;
};

/* Handle for one outgoing message handed out by dlm_midcomms_get_mhandle(). */
struct dlm_mhandle {
	/* header of the wrapped message that follows the dlm_opts header */
	const struct dlm_header *inner_hd;
	struct midcomms_node *node;
	/* the DLM_OPTS wrapper at the start of the lowcomms buffer */
	struct dlm_opts *opts;
	struct dlm_msg *msg;
	/* set once the message was committed; only committed entries are
	 * retransmitted
	 */
	bool committed;
	/* sequence number assigned when the handle was queued */
	uint32_t seq;

	/* optional callback run when this message gets acknowledged */
	void (*ack_rcv)(struct midcomms_node *node);

	/* get_mhandle/commit srcu idx exchange */
	int idx;

	/* linkage in midcomms_node::send_queue */
	struct list_head list;
	/* for deferred freeing after removal from send_queue */
	struct rcu_head rcu;
};

/* hash table of all known nodes; insertions are serialized by nodes_lock */
static struct hlist_head node_hash[CONN_HASH_SIZE];
static DEFINE_SPINLOCK(nodes_lock);
DEFINE_STATIC_SRCU(nodes_srcu);

/* This mutex prevents midcomms_close() from running concurrently with
 * stop() or remove(). I experienced invalid memory access
 * behaviours when DLM_DEBUG_FENCE_TERMINATION is enabled and
 * resetting machines. It will end in some double deletion in the nodes
 * datastructure.
225 */ 226 static DEFINE_MUTEX(close_lock); 227 228 static inline const char *dlm_state_str(int state) 229 { 230 switch (state) { 231 case DLM_CLOSED: 232 return "CLOSED"; 233 case DLM_ESTABLISHED: 234 return "ESTABLISHED"; 235 case DLM_FIN_WAIT1: 236 return "FIN_WAIT1"; 237 case DLM_FIN_WAIT2: 238 return "FIN_WAIT2"; 239 case DLM_CLOSE_WAIT: 240 return "CLOSE_WAIT"; 241 case DLM_LAST_ACK: 242 return "LAST_ACK"; 243 case DLM_CLOSING: 244 return "CLOSING"; 245 default: 246 return "UNKNOWN"; 247 } 248 } 249 250 const char *dlm_midcomms_state(struct midcomms_node *node) 251 { 252 return dlm_state_str(node->state); 253 } 254 255 unsigned long dlm_midcomms_flags(struct midcomms_node *node) 256 { 257 return node->flags; 258 } 259 260 int dlm_midcomms_send_queue_cnt(struct midcomms_node *node) 261 { 262 return atomic_read(&node->send_queue_cnt); 263 } 264 265 uint32_t dlm_midcomms_version(struct midcomms_node *node) 266 { 267 return node->version; 268 } 269 270 static struct midcomms_node *__find_node(int nodeid, int r) 271 { 272 struct midcomms_node *node; 273 274 hlist_for_each_entry_rcu(node, &node_hash[r], hlist) { 275 if (node->nodeid == nodeid) 276 return node; 277 } 278 279 return NULL; 280 } 281 282 static void dlm_mhandle_release(struct rcu_head *rcu) 283 { 284 struct dlm_mhandle *mh = container_of(rcu, struct dlm_mhandle, rcu); 285 286 dlm_lowcomms_put_msg(mh->msg); 287 kfree(mh); 288 } 289 290 static void dlm_mhandle_delete(struct midcomms_node *node, 291 struct dlm_mhandle *mh) 292 { 293 list_del_rcu(&mh->list); 294 atomic_dec(&node->send_queue_cnt); 295 call_rcu(&mh->rcu, dlm_mhandle_release); 296 } 297 298 static void dlm_send_queue_flush(struct midcomms_node *node) 299 { 300 struct dlm_mhandle *mh; 301 302 pr_debug("flush midcomms send queue of node %d\n", node->nodeid); 303 304 rcu_read_lock(); 305 spin_lock(&node->send_queue_lock); 306 list_for_each_entry_rcu(mh, &node->send_queue, list) { 307 dlm_mhandle_delete(node, mh); 308 } 309 
spin_unlock(&node->send_queue_lock); 310 rcu_read_unlock(); 311 } 312 313 static void midcomms_node_reset(struct midcomms_node *node) 314 { 315 pr_debug("reset node %d\n", node->nodeid); 316 317 node->seq_next = DLM_SEQ_INIT; 318 node->seq_send = DLM_SEQ_INIT; 319 node->version = DLM_VERSION_NOT_SET; 320 node->flags = 0; 321 322 dlm_send_queue_flush(node); 323 node->state = DLM_CLOSED; 324 wake_up(&node->shutdown_wait); 325 } 326 327 static struct midcomms_node *nodeid2node(int nodeid, gfp_t alloc) 328 { 329 struct midcomms_node *node, *tmp; 330 int r = nodeid_hash(nodeid); 331 332 node = __find_node(nodeid, r); 333 if (node || !alloc) 334 return node; 335 336 node = kmalloc(sizeof(*node), alloc); 337 if (!node) 338 return NULL; 339 340 node->nodeid = nodeid; 341 spin_lock_init(&node->state_lock); 342 spin_lock_init(&node->send_queue_lock); 343 atomic_set(&node->send_queue_cnt, 0); 344 INIT_LIST_HEAD(&node->send_queue); 345 init_waitqueue_head(&node->shutdown_wait); 346 node->users = 0; 347 midcomms_node_reset(node); 348 349 spin_lock(&nodes_lock); 350 /* check again if there was somebody else 351 * earlier here to add the node 352 */ 353 tmp = __find_node(nodeid, r); 354 if (tmp) { 355 spin_unlock(&nodes_lock); 356 kfree(node); 357 return tmp; 358 } 359 360 hlist_add_head_rcu(&node->hlist, &node_hash[r]); 361 spin_unlock(&nodes_lock); 362 363 node->debugfs = dlm_create_debug_comms_file(nodeid, node); 364 return node; 365 } 366 367 static int dlm_send_ack(int nodeid, uint32_t seq) 368 { 369 int mb_len = sizeof(struct dlm_header); 370 struct dlm_header *m_header; 371 struct dlm_msg *msg; 372 char *ppc; 373 374 msg = dlm_lowcomms_new_msg(nodeid, mb_len, GFP_NOFS, &ppc, 375 NULL, NULL); 376 if (!msg) 377 return -ENOMEM; 378 379 m_header = (struct dlm_header *)ppc; 380 381 m_header->h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR); 382 m_header->h_nodeid = dlm_our_nodeid(); 383 m_header->h_length = mb_len; 384 m_header->h_cmd = DLM_ACK; 385 m_header->u.h_seq = seq; 
386 387 header_out(m_header); 388 dlm_lowcomms_commit_msg(msg); 389 dlm_lowcomms_put_msg(msg); 390 391 return 0; 392 } 393 394 static int dlm_send_fin(struct midcomms_node *node, 395 void (*ack_rcv)(struct midcomms_node *node)) 396 { 397 int mb_len = sizeof(struct dlm_header); 398 struct dlm_header *m_header; 399 struct dlm_mhandle *mh; 400 char *ppc; 401 402 mh = dlm_midcomms_get_mhandle(node->nodeid, mb_len, GFP_NOFS, &ppc); 403 if (!mh) 404 return -ENOMEM; 405 406 mh->ack_rcv = ack_rcv; 407 408 m_header = (struct dlm_header *)ppc; 409 410 m_header->h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR); 411 m_header->h_nodeid = dlm_our_nodeid(); 412 m_header->h_length = mb_len; 413 m_header->h_cmd = DLM_FIN; 414 415 header_out(m_header); 416 417 pr_debug("sending fin msg to node %d\n", node->nodeid); 418 dlm_midcomms_commit_mhandle(mh); 419 set_bit(DLM_NODE_FLAG_STOP_TX, &node->flags); 420 421 return 0; 422 } 423 424 static void dlm_receive_ack(struct midcomms_node *node, uint32_t seq) 425 { 426 struct dlm_mhandle *mh; 427 428 rcu_read_lock(); 429 list_for_each_entry_rcu(mh, &node->send_queue, list) { 430 if (before(mh->seq, seq)) { 431 if (mh->ack_rcv) 432 mh->ack_rcv(node); 433 } else { 434 /* send queue should be ordered */ 435 break; 436 } 437 } 438 439 spin_lock(&node->send_queue_lock); 440 list_for_each_entry_rcu(mh, &node->send_queue, list) { 441 if (before(mh->seq, seq)) { 442 dlm_mhandle_delete(node, mh); 443 } else { 444 /* send queue should be ordered */ 445 break; 446 } 447 } 448 spin_unlock(&node->send_queue_lock); 449 rcu_read_unlock(); 450 } 451 452 static void dlm_pas_fin_ack_rcv(struct midcomms_node *node) 453 { 454 spin_lock(&node->state_lock); 455 pr_debug("receive passive fin ack from node %d with state %s\n", 456 node->nodeid, dlm_state_str(node->state)); 457 458 switch (node->state) { 459 case DLM_LAST_ACK: 460 /* DLM_CLOSED */ 461 midcomms_node_reset(node); 462 break; 463 case DLM_CLOSED: 464 /* not valid but somehow we got what we want */ 465 
wake_up(&node->shutdown_wait); 466 break; 467 default: 468 spin_unlock(&node->state_lock); 469 log_print("%s: unexpected state: %d\n", 470 __func__, node->state); 471 WARN_ON(1); 472 return; 473 } 474 spin_unlock(&node->state_lock); 475 } 476 477 static void dlm_midcomms_receive_buffer(union dlm_packet *p, 478 struct midcomms_node *node, 479 uint32_t seq) 480 { 481 if (seq == node->seq_next) { 482 node->seq_next++; 483 /* send ack before fin */ 484 dlm_send_ack(node->nodeid, node->seq_next); 485 486 switch (p->header.h_cmd) { 487 case DLM_FIN: 488 spin_lock(&node->state_lock); 489 pr_debug("receive fin msg from node %d with state %s\n", 490 node->nodeid, dlm_state_str(node->state)); 491 492 switch (node->state) { 493 case DLM_ESTABLISHED: 494 node->state = DLM_CLOSE_WAIT; 495 pr_debug("switch node %d to state %s\n", 496 node->nodeid, dlm_state_str(node->state)); 497 /* passive shutdown DLM_LAST_ACK case 1 498 * additional we check if the node is used by 499 * cluster manager events at all. 
500 */ 501 if (node->users == 0) { 502 node->state = DLM_LAST_ACK; 503 pr_debug("switch node %d to state %s case 1\n", 504 node->nodeid, dlm_state_str(node->state)); 505 spin_unlock(&node->state_lock); 506 goto send_fin; 507 } 508 break; 509 case DLM_FIN_WAIT1: 510 node->state = DLM_CLOSING; 511 pr_debug("switch node %d to state %s\n", 512 node->nodeid, dlm_state_str(node->state)); 513 break; 514 case DLM_FIN_WAIT2: 515 midcomms_node_reset(node); 516 pr_debug("switch node %d to state %s\n", 517 node->nodeid, dlm_state_str(node->state)); 518 wake_up(&node->shutdown_wait); 519 break; 520 case DLM_LAST_ACK: 521 /* probably remove_member caught it, do nothing */ 522 break; 523 default: 524 spin_unlock(&node->state_lock); 525 log_print("%s: unexpected state: %d\n", 526 __func__, node->state); 527 WARN_ON(1); 528 return; 529 } 530 spin_unlock(&node->state_lock); 531 532 set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags); 533 break; 534 default: 535 WARN_ON(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags)); 536 dlm_receive_buffer(p, node->nodeid); 537 break; 538 } 539 } else { 540 /* retry to ack message which we already have by sending back 541 * current node->seq_next number as ack. 
542 */ 543 if (seq < node->seq_next) 544 dlm_send_ack(node->nodeid, node->seq_next); 545 546 log_print_ratelimited("ignore dlm msg because seq mismatch, seq: %u, expected: %u, nodeid: %d", 547 seq, node->seq_next, node->nodeid); 548 } 549 550 return; 551 552 send_fin: 553 set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags); 554 dlm_send_fin(node, dlm_pas_fin_ack_rcv); 555 } 556 557 static struct midcomms_node * 558 dlm_midcomms_recv_node_lookup(int nodeid, const union dlm_packet *p, 559 uint16_t msglen, int (*cb)(struct midcomms_node *node)) 560 { 561 struct midcomms_node *node = NULL; 562 gfp_t allocation = 0; 563 int ret; 564 565 switch (p->header.h_cmd) { 566 case DLM_RCOM: 567 if (msglen < sizeof(struct dlm_rcom)) { 568 log_print("rcom msg too small: %u, will skip this message from node %d", 569 msglen, nodeid); 570 return NULL; 571 } 572 573 switch (le32_to_cpu(p->rcom.rc_type)) { 574 case DLM_RCOM_NAMES: 575 fallthrough; 576 case DLM_RCOM_NAMES_REPLY: 577 fallthrough; 578 case DLM_RCOM_STATUS: 579 fallthrough; 580 case DLM_RCOM_STATUS_REPLY: 581 node = nodeid2node(nodeid, 0); 582 if (node) { 583 spin_lock(&node->state_lock); 584 if (node->state != DLM_ESTABLISHED) 585 pr_debug("receive begin RCOM msg from node %d with state %s\n", 586 node->nodeid, dlm_state_str(node->state)); 587 588 switch (node->state) { 589 case DLM_CLOSED: 590 node->state = DLM_ESTABLISHED; 591 pr_debug("switch node %d to state %s\n", 592 node->nodeid, dlm_state_str(node->state)); 593 break; 594 case DLM_ESTABLISHED: 595 break; 596 default: 597 /* some invalid state passive shutdown 598 * was failed, we try to reset and 599 * hope it will go on. 
600 */ 601 log_print("reset node %d because shutdown stuck", 602 node->nodeid); 603 604 midcomms_node_reset(node); 605 node->state = DLM_ESTABLISHED; 606 break; 607 } 608 spin_unlock(&node->state_lock); 609 } 610 611 allocation = GFP_NOFS; 612 break; 613 default: 614 break; 615 } 616 617 break; 618 default: 619 break; 620 } 621 622 node = nodeid2node(nodeid, allocation); 623 if (!node) { 624 switch (p->header.h_cmd) { 625 case DLM_OPTS: 626 if (msglen < sizeof(struct dlm_opts)) { 627 log_print("opts msg too small: %u, will skip this message from node %d", 628 msglen, nodeid); 629 return NULL; 630 } 631 632 log_print_ratelimited("received dlm opts message nextcmd %d from node %d in an invalid sequence", 633 p->opts.o_nextcmd, nodeid); 634 break; 635 default: 636 log_print_ratelimited("received dlm message cmd %d from node %d in an invalid sequence", 637 p->header.h_cmd, nodeid); 638 break; 639 } 640 641 return NULL; 642 } 643 644 ret = cb(node); 645 if (ret < 0) 646 return NULL; 647 648 return node; 649 } 650 651 static int dlm_midcomms_version_check_3_2(struct midcomms_node *node) 652 { 653 switch (node->version) { 654 case DLM_VERSION_NOT_SET: 655 node->version = DLM_VERSION_3_2; 656 log_print("version 0x%08x for node %d detected", DLM_VERSION_3_2, 657 node->nodeid); 658 break; 659 case DLM_VERSION_3_2: 660 break; 661 default: 662 log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x", 663 DLM_VERSION_3_2, node->nodeid, node->version); 664 return -1; 665 } 666 667 return 0; 668 } 669 670 static int dlm_opts_check_msglen(union dlm_packet *p, uint16_t msglen, int nodeid) 671 { 672 int len = msglen; 673 674 /* we only trust outer header msglen because 675 * it's checked against receive buffer length. 
676 */ 677 if (len < sizeof(struct dlm_opts)) 678 return -1; 679 len -= sizeof(struct dlm_opts); 680 681 if (len < le16_to_cpu(p->opts.o_optlen)) 682 return -1; 683 len -= le16_to_cpu(p->opts.o_optlen); 684 685 switch (p->opts.o_nextcmd) { 686 case DLM_FIN: 687 if (len < sizeof(struct dlm_header)) { 688 log_print("fin too small: %d, will skip this message from node %d", 689 len, nodeid); 690 return -1; 691 } 692 693 break; 694 case DLM_MSG: 695 if (len < sizeof(struct dlm_message)) { 696 log_print("msg too small: %d, will skip this message from node %d", 697 msglen, nodeid); 698 return -1; 699 } 700 701 break; 702 case DLM_RCOM: 703 if (len < sizeof(struct dlm_rcom)) { 704 log_print("rcom msg too small: %d, will skip this message from node %d", 705 len, nodeid); 706 return -1; 707 } 708 709 break; 710 default: 711 log_print("unsupported o_nextcmd received: %u, will skip this message from node %d", 712 p->opts.o_nextcmd, nodeid); 713 return -1; 714 } 715 716 return 0; 717 } 718 719 static void dlm_midcomms_receive_buffer_3_2(union dlm_packet *p, int nodeid) 720 { 721 uint16_t msglen = le16_to_cpu(p->header.h_length); 722 struct midcomms_node *node; 723 uint32_t seq; 724 int ret, idx; 725 726 idx = srcu_read_lock(&nodes_srcu); 727 node = dlm_midcomms_recv_node_lookup(nodeid, p, msglen, 728 dlm_midcomms_version_check_3_2); 729 if (!node) 730 goto out; 731 732 switch (p->header.h_cmd) { 733 case DLM_RCOM: 734 /* these rcom message we use to determine version. 735 * they have their own retransmission handling and 736 * are the first messages of dlm. 737 * 738 * length already checked. 
739 */ 740 switch (le32_to_cpu(p->rcom.rc_type)) { 741 case DLM_RCOM_NAMES: 742 fallthrough; 743 case DLM_RCOM_NAMES_REPLY: 744 fallthrough; 745 case DLM_RCOM_STATUS: 746 fallthrough; 747 case DLM_RCOM_STATUS_REPLY: 748 break; 749 default: 750 log_print("unsupported rcom type received: %u, will skip this message from node %d", 751 le32_to_cpu(p->rcom.rc_type), nodeid); 752 goto out; 753 } 754 755 WARN_ON(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags)); 756 dlm_receive_buffer(p, nodeid); 757 break; 758 case DLM_OPTS: 759 seq = le32_to_cpu(p->header.u.h_seq); 760 761 ret = dlm_opts_check_msglen(p, msglen, nodeid); 762 if (ret < 0) { 763 log_print("opts msg too small: %u, will skip this message from node %d", 764 msglen, nodeid); 765 goto out; 766 } 767 768 p = (union dlm_packet *)((unsigned char *)p->opts.o_opts + 769 le16_to_cpu(p->opts.o_optlen)); 770 771 /* recheck inner msglen just if it's not garbage */ 772 msglen = le16_to_cpu(p->header.h_length); 773 switch (p->header.h_cmd) { 774 case DLM_RCOM: 775 if (msglen < sizeof(struct dlm_rcom)) { 776 log_print("inner rcom msg too small: %u, will skip this message from node %d", 777 msglen, nodeid); 778 goto out; 779 } 780 781 break; 782 case DLM_MSG: 783 if (msglen < sizeof(struct dlm_message)) { 784 log_print("inner msg too small: %u, will skip this message from node %d", 785 msglen, nodeid); 786 goto out; 787 } 788 789 break; 790 case DLM_FIN: 791 if (msglen < sizeof(struct dlm_header)) { 792 log_print("inner fin too small: %u, will skip this message from node %d", 793 msglen, nodeid); 794 goto out; 795 } 796 797 break; 798 default: 799 log_print("unsupported inner h_cmd received: %u, will skip this message from node %d", 800 msglen, nodeid); 801 goto out; 802 } 803 804 dlm_midcomms_receive_buffer(p, node, seq); 805 break; 806 case DLM_ACK: 807 seq = le32_to_cpu(p->header.u.h_seq); 808 dlm_receive_ack(node, seq); 809 break; 810 default: 811 log_print("unsupported h_cmd received: %u, will skip this message from node 
%d", 812 p->header.h_cmd, nodeid); 813 break; 814 } 815 816 out: 817 srcu_read_unlock(&nodes_srcu, idx); 818 } 819 820 static int dlm_midcomms_version_check_3_1(struct midcomms_node *node) 821 { 822 switch (node->version) { 823 case DLM_VERSION_NOT_SET: 824 node->version = DLM_VERSION_3_1; 825 log_print("version 0x%08x for node %d detected", DLM_VERSION_3_1, 826 node->nodeid); 827 break; 828 case DLM_VERSION_3_1: 829 break; 830 default: 831 log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x", 832 DLM_VERSION_3_1, node->nodeid, node->version); 833 return -1; 834 } 835 836 return 0; 837 } 838 839 static void dlm_midcomms_receive_buffer_3_1(union dlm_packet *p, int nodeid) 840 { 841 uint16_t msglen = le16_to_cpu(p->header.h_length); 842 struct midcomms_node *node; 843 int idx; 844 845 idx = srcu_read_lock(&nodes_srcu); 846 node = dlm_midcomms_recv_node_lookup(nodeid, p, msglen, 847 dlm_midcomms_version_check_3_1); 848 if (!node) { 849 srcu_read_unlock(&nodes_srcu, idx); 850 return; 851 } 852 srcu_read_unlock(&nodes_srcu, idx); 853 854 switch (p->header.h_cmd) { 855 case DLM_RCOM: 856 /* length already checked */ 857 break; 858 case DLM_MSG: 859 if (msglen < sizeof(struct dlm_message)) { 860 log_print("msg too small: %u, will skip this message from node %d", 861 msglen, nodeid); 862 return; 863 } 864 865 break; 866 default: 867 log_print("unsupported h_cmd received: %u, will skip this message from node %d", 868 p->header.h_cmd, nodeid); 869 return; 870 } 871 872 dlm_receive_buffer(p, nodeid); 873 } 874 875 /* 876 * Called from the low-level comms layer to process a buffer of 877 * commands. 
878 */ 879 880 int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len) 881 { 882 const unsigned char *ptr = buf; 883 const struct dlm_header *hd; 884 uint16_t msglen; 885 int ret = 0; 886 887 while (len >= sizeof(struct dlm_header)) { 888 hd = (struct dlm_header *)ptr; 889 890 /* no message should be more than DLM_MAX_SOCKET_BUFSIZE or 891 * less than dlm_header size. 892 * 893 * Some messages does not have a 8 byte length boundary yet 894 * which can occur in a unaligned memory access of some dlm 895 * messages. However this problem need to be fixed at the 896 * sending side, for now it seems nobody run into architecture 897 * related issues yet but it slows down some processing. 898 * Fixing this issue should be scheduled in future by doing 899 * the next major version bump. 900 */ 901 msglen = le16_to_cpu(hd->h_length); 902 if (msglen > DLM_MAX_SOCKET_BUFSIZE || 903 msglen < sizeof(struct dlm_header)) { 904 log_print("received invalid length header: %u from node %d, will abort message parsing", 905 msglen, nodeid); 906 return -EBADMSG; 907 } 908 909 /* caller will take care that leftover 910 * will be parsed next call with more data 911 */ 912 if (msglen > len) 913 break; 914 915 switch (le32_to_cpu(hd->h_version)) { 916 case DLM_VERSION_3_1: 917 dlm_midcomms_receive_buffer_3_1((union dlm_packet *)ptr, nodeid); 918 break; 919 case DLM_VERSION_3_2: 920 dlm_midcomms_receive_buffer_3_2((union dlm_packet *)ptr, nodeid); 921 break; 922 default: 923 log_print("received invalid version header: %u from node %d, will skip this message", 924 le32_to_cpu(hd->h_version), nodeid); 925 break; 926 } 927 928 ret += msglen; 929 len -= msglen; 930 ptr += msglen; 931 } 932 933 return ret; 934 } 935 936 void dlm_midcomms_unack_msg_resend(int nodeid) 937 { 938 struct midcomms_node *node; 939 struct dlm_mhandle *mh; 940 int idx, ret; 941 942 idx = srcu_read_lock(&nodes_srcu); 943 node = nodeid2node(nodeid, 0); 944 if (!node) { 945 srcu_read_unlock(&nodes_srcu, idx); 
946 return; 947 } 948 949 /* old protocol, we don't support to retransmit on failure */ 950 switch (node->version) { 951 case DLM_VERSION_3_2: 952 break; 953 default: 954 srcu_read_unlock(&nodes_srcu, idx); 955 return; 956 } 957 958 rcu_read_lock(); 959 list_for_each_entry_rcu(mh, &node->send_queue, list) { 960 if (!mh->committed) 961 continue; 962 963 ret = dlm_lowcomms_resend_msg(mh->msg); 964 if (!ret) 965 log_print_ratelimited("retransmit dlm msg, seq %u, nodeid %d", 966 mh->seq, node->nodeid); 967 } 968 rcu_read_unlock(); 969 srcu_read_unlock(&nodes_srcu, idx); 970 } 971 972 static void dlm_fill_opts_header(struct dlm_opts *opts, uint16_t inner_len, 973 uint32_t seq) 974 { 975 opts->o_header.h_cmd = DLM_OPTS; 976 opts->o_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR); 977 opts->o_header.h_nodeid = dlm_our_nodeid(); 978 opts->o_header.h_length = DLM_MIDCOMMS_OPT_LEN + inner_len; 979 opts->o_header.u.h_seq = seq; 980 header_out(&opts->o_header); 981 } 982 983 static void midcomms_new_msg_cb(struct dlm_mhandle *mh) 984 { 985 atomic_inc(&mh->node->send_queue_cnt); 986 987 spin_lock(&mh->node->send_queue_lock); 988 list_add_tail_rcu(&mh->list, &mh->node->send_queue); 989 spin_unlock(&mh->node->send_queue_lock); 990 991 mh->seq = mh->node->seq_send++; 992 } 993 994 static struct dlm_msg *dlm_midcomms_get_msg_3_2(struct dlm_mhandle *mh, int nodeid, 995 int len, gfp_t allocation, char **ppc) 996 { 997 struct dlm_opts *opts; 998 struct dlm_msg *msg; 999 1000 msg = dlm_lowcomms_new_msg(nodeid, len + DLM_MIDCOMMS_OPT_LEN, 1001 allocation, ppc, midcomms_new_msg_cb, mh); 1002 if (!msg) 1003 return NULL; 1004 1005 opts = (struct dlm_opts *)*ppc; 1006 mh->opts = opts; 1007 1008 /* add possible options here */ 1009 dlm_fill_opts_header(opts, len, mh->seq); 1010 1011 *ppc += sizeof(*opts); 1012 mh->inner_hd = (const struct dlm_header *)*ppc; 1013 return msg; 1014 } 1015 1016 struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len, 1017 gfp_t allocation, char 
**ppc) 1018 { 1019 struct midcomms_node *node; 1020 struct dlm_mhandle *mh; 1021 struct dlm_msg *msg; 1022 int idx; 1023 1024 idx = srcu_read_lock(&nodes_srcu); 1025 node = nodeid2node(nodeid, 0); 1026 if (!node) { 1027 WARN_ON_ONCE(1); 1028 goto err; 1029 } 1030 1031 /* this is a bug, however we going on and hope it will be resolved */ 1032 WARN_ON(test_bit(DLM_NODE_FLAG_STOP_TX, &node->flags)); 1033 1034 mh = kzalloc(sizeof(*mh), GFP_NOFS); 1035 if (!mh) 1036 goto err; 1037 1038 mh->idx = idx; 1039 mh->node = node; 1040 1041 switch (node->version) { 1042 case DLM_VERSION_3_1: 1043 msg = dlm_lowcomms_new_msg(nodeid, len, allocation, ppc, 1044 NULL, NULL); 1045 if (!msg) { 1046 kfree(mh); 1047 goto err; 1048 } 1049 1050 break; 1051 case DLM_VERSION_3_2: 1052 msg = dlm_midcomms_get_msg_3_2(mh, nodeid, len, allocation, 1053 ppc); 1054 if (!msg) { 1055 kfree(mh); 1056 goto err; 1057 } 1058 1059 break; 1060 default: 1061 kfree(mh); 1062 WARN_ON(1); 1063 goto err; 1064 } 1065 1066 mh->msg = msg; 1067 1068 /* keep in mind that is a must to call 1069 * dlm_midcomms_commit_msg() which releases 1070 * nodes_srcu using mh->idx which is assumed 1071 * here that the application will call it. 
1072 */ 1073 return mh; 1074 1075 err: 1076 srcu_read_unlock(&nodes_srcu, idx); 1077 return NULL; 1078 } 1079 1080 static void dlm_midcomms_commit_msg_3_2(struct dlm_mhandle *mh) 1081 { 1082 /* nexthdr chain for fast lookup */ 1083 mh->opts->o_nextcmd = mh->inner_hd->h_cmd; 1084 mh->committed = true; 1085 dlm_lowcomms_commit_msg(mh->msg); 1086 } 1087 1088 void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh) 1089 { 1090 switch (mh->node->version) { 1091 case DLM_VERSION_3_1: 1092 srcu_read_unlock(&nodes_srcu, mh->idx); 1093 1094 dlm_lowcomms_commit_msg(mh->msg); 1095 dlm_lowcomms_put_msg(mh->msg); 1096 /* mh is not part of rcu list in this case */ 1097 kfree(mh); 1098 break; 1099 case DLM_VERSION_3_2: 1100 dlm_midcomms_commit_msg_3_2(mh); 1101 srcu_read_unlock(&nodes_srcu, mh->idx); 1102 break; 1103 default: 1104 srcu_read_unlock(&nodes_srcu, mh->idx); 1105 WARN_ON(1); 1106 break; 1107 } 1108 } 1109 1110 int dlm_midcomms_start(void) 1111 { 1112 int i; 1113 1114 for (i = 0; i < CONN_HASH_SIZE; i++) 1115 INIT_HLIST_HEAD(&node_hash[i]); 1116 1117 return dlm_lowcomms_start(); 1118 } 1119 1120 static void dlm_act_fin_ack_rcv(struct midcomms_node *node) 1121 { 1122 spin_lock(&node->state_lock); 1123 pr_debug("receive active fin ack from node %d with state %s\n", 1124 node->nodeid, dlm_state_str(node->state)); 1125 1126 switch (node->state) { 1127 case DLM_FIN_WAIT1: 1128 node->state = DLM_FIN_WAIT2; 1129 pr_debug("switch node %d to state %s\n", 1130 node->nodeid, dlm_state_str(node->state)); 1131 break; 1132 case DLM_CLOSING: 1133 midcomms_node_reset(node); 1134 pr_debug("switch node %d to state %s\n", 1135 node->nodeid, dlm_state_str(node->state)); 1136 wake_up(&node->shutdown_wait); 1137 break; 1138 case DLM_CLOSED: 1139 /* not valid but somehow we got what we want */ 1140 wake_up(&node->shutdown_wait); 1141 break; 1142 default: 1143 spin_unlock(&node->state_lock); 1144 log_print("%s: unexpected state: %d\n", 1145 __func__, node->state); 1146 WARN_ON(1); 1147 
return; 1148 } 1149 spin_unlock(&node->state_lock); 1150 } 1151 1152 void dlm_midcomms_add_member(int nodeid) 1153 { 1154 struct midcomms_node *node; 1155 int idx; 1156 1157 if (nodeid == dlm_our_nodeid()) 1158 return; 1159 1160 idx = srcu_read_lock(&nodes_srcu); 1161 node = nodeid2node(nodeid, GFP_NOFS); 1162 if (!node) { 1163 srcu_read_unlock(&nodes_srcu, idx); 1164 return; 1165 } 1166 1167 spin_lock(&node->state_lock); 1168 if (!node->users) { 1169 pr_debug("receive add member from node %d with state %s\n", 1170 node->nodeid, dlm_state_str(node->state)); 1171 switch (node->state) { 1172 case DLM_ESTABLISHED: 1173 break; 1174 case DLM_CLOSED: 1175 node->state = DLM_ESTABLISHED; 1176 pr_debug("switch node %d to state %s\n", 1177 node->nodeid, dlm_state_str(node->state)); 1178 break; 1179 default: 1180 /* some invalid state passive shutdown 1181 * was failed, we try to reset and 1182 * hope it will go on. 1183 */ 1184 log_print("reset node %d because shutdown stuck", 1185 node->nodeid); 1186 1187 midcomms_node_reset(node); 1188 node->state = DLM_ESTABLISHED; 1189 break; 1190 } 1191 } 1192 1193 node->users++; 1194 pr_debug("users inc count %d\n", node->users); 1195 spin_unlock(&node->state_lock); 1196 1197 srcu_read_unlock(&nodes_srcu, idx); 1198 } 1199 1200 void dlm_midcomms_remove_member(int nodeid) 1201 { 1202 struct midcomms_node *node; 1203 int idx; 1204 1205 if (nodeid == dlm_our_nodeid()) 1206 return; 1207 1208 idx = srcu_read_lock(&nodes_srcu); 1209 node = nodeid2node(nodeid, 0); 1210 if (!node) { 1211 srcu_read_unlock(&nodes_srcu, idx); 1212 return; 1213 } 1214 1215 spin_lock(&node->state_lock); 1216 node->users--; 1217 pr_debug("users dec count %d\n", node->users); 1218 1219 /* hitting users count to zero means the 1220 * other side is running dlm_midcomms_stop() 1221 * we meet us to have a clean disconnect. 
1222 */ 1223 if (node->users == 0) { 1224 pr_debug("receive remove member from node %d with state %s\n", 1225 node->nodeid, dlm_state_str(node->state)); 1226 switch (node->state) { 1227 case DLM_ESTABLISHED: 1228 break; 1229 case DLM_CLOSE_WAIT: 1230 /* passive shutdown DLM_LAST_ACK case 2 */ 1231 node->state = DLM_LAST_ACK; 1232 spin_unlock(&node->state_lock); 1233 1234 pr_debug("switch node %d to state %s case 2\n", 1235 node->nodeid, dlm_state_str(node->state)); 1236 goto send_fin; 1237 case DLM_LAST_ACK: 1238 /* probably receive fin caught it, do nothing */ 1239 break; 1240 case DLM_CLOSED: 1241 /* already gone, do nothing */ 1242 break; 1243 default: 1244 log_print("%s: unexpected state: %d\n", 1245 __func__, node->state); 1246 break; 1247 } 1248 } 1249 spin_unlock(&node->state_lock); 1250 1251 srcu_read_unlock(&nodes_srcu, idx); 1252 return; 1253 1254 send_fin: 1255 set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags); 1256 dlm_send_fin(node, dlm_pas_fin_ack_rcv); 1257 srcu_read_unlock(&nodes_srcu, idx); 1258 } 1259 1260 static void midcomms_node_release(struct rcu_head *rcu) 1261 { 1262 struct midcomms_node *node = container_of(rcu, struct midcomms_node, rcu); 1263 1264 WARN_ON(atomic_read(&node->send_queue_cnt)); 1265 kfree(node); 1266 } 1267 1268 static void midcomms_shutdown(struct midcomms_node *node) 1269 { 1270 int ret; 1271 1272 /* old protocol, we don't wait for pending operations */ 1273 switch (node->version) { 1274 case DLM_VERSION_3_2: 1275 break; 1276 default: 1277 return; 1278 } 1279 1280 spin_lock(&node->state_lock); 1281 pr_debug("receive active shutdown for node %d with state %s\n", 1282 node->nodeid, dlm_state_str(node->state)); 1283 switch (node->state) { 1284 case DLM_ESTABLISHED: 1285 node->state = DLM_FIN_WAIT1; 1286 pr_debug("switch node %d to state %s case 2\n", 1287 node->nodeid, dlm_state_str(node->state)); 1288 break; 1289 case DLM_CLOSED: 1290 /* we have what we want */ 1291 spin_unlock(&node->state_lock); 1292 return; 1293 default: 1294 
/* busy to enter DLM_FIN_WAIT1, wait until passive 1295 * done in shutdown_wait to enter DLM_CLOSED. 1296 */ 1297 break; 1298 } 1299 spin_unlock(&node->state_lock); 1300 1301 if (node->state == DLM_FIN_WAIT1) { 1302 dlm_send_fin(node, dlm_act_fin_ack_rcv); 1303 1304 if (DLM_DEBUG_FENCE_TERMINATION) 1305 msleep(5000); 1306 } 1307 1308 /* wait for other side dlm + fin */ 1309 ret = wait_event_timeout(node->shutdown_wait, 1310 node->state == DLM_CLOSED || 1311 test_bit(DLM_NODE_FLAG_CLOSE, &node->flags), 1312 DLM_SHUTDOWN_TIMEOUT); 1313 if (!ret || test_bit(DLM_NODE_FLAG_CLOSE, &node->flags)) { 1314 pr_debug("active shutdown timed out for node %d with state %s\n", 1315 node->nodeid, dlm_state_str(node->state)); 1316 midcomms_node_reset(node); 1317 return; 1318 } 1319 1320 pr_debug("active shutdown done for node %d with state %s\n", 1321 node->nodeid, dlm_state_str(node->state)); 1322 } 1323 1324 void dlm_midcomms_shutdown(void) 1325 { 1326 struct midcomms_node *node; 1327 int i, idx; 1328 1329 mutex_lock(&close_lock); 1330 idx = srcu_read_lock(&nodes_srcu); 1331 for (i = 0; i < CONN_HASH_SIZE; i++) { 1332 hlist_for_each_entry_rcu(node, &node_hash[i], hlist) { 1333 midcomms_shutdown(node); 1334 1335 dlm_delete_debug_comms_file(node->debugfs); 1336 1337 spin_lock(&nodes_lock); 1338 hlist_del_rcu(&node->hlist); 1339 spin_unlock(&nodes_lock); 1340 1341 call_srcu(&nodes_srcu, &node->rcu, midcomms_node_release); 1342 } 1343 } 1344 srcu_read_unlock(&nodes_srcu, idx); 1345 mutex_unlock(&close_lock); 1346 1347 dlm_lowcomms_shutdown(); 1348 } 1349 1350 int dlm_midcomms_close(int nodeid) 1351 { 1352 struct midcomms_node *node; 1353 int idx, ret; 1354 1355 if (nodeid == dlm_our_nodeid()) 1356 return 0; 1357 1358 idx = srcu_read_lock(&nodes_srcu); 1359 /* Abort pending close/remove operation */ 1360 node = nodeid2node(nodeid, 0); 1361 if (node) { 1362 /* let shutdown waiters leave */ 1363 set_bit(DLM_NODE_FLAG_CLOSE, &node->flags); 1364 wake_up(&node->shutdown_wait); 1365 } 1366 
srcu_read_unlock(&nodes_srcu, idx); 1367 1368 synchronize_srcu(&nodes_srcu); 1369 1370 idx = srcu_read_lock(&nodes_srcu); 1371 mutex_lock(&close_lock); 1372 node = nodeid2node(nodeid, 0); 1373 if (!node) { 1374 mutex_unlock(&close_lock); 1375 srcu_read_unlock(&nodes_srcu, idx); 1376 return dlm_lowcomms_close(nodeid); 1377 } 1378 1379 ret = dlm_lowcomms_close(nodeid); 1380 spin_lock(&node->state_lock); 1381 midcomms_node_reset(node); 1382 spin_unlock(&node->state_lock); 1383 srcu_read_unlock(&nodes_srcu, idx); 1384 mutex_unlock(&close_lock); 1385 1386 return ret; 1387 } 1388