// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

/*
 * lowcomms.c
 *
 * This is the "low-level" comms layer.
 *
 * It is responsible for sending/receiving messages
 * from other nodes in the cluster.
 *
 * Cluster nodes are referred to by their nodeids. nodeids are
 * simply 32 bit numbers to the locking module - if they need to
 * be expanded for the cluster infrastructure then that is its
 * responsibility. It is this layer's
 * responsibility to resolve these into IP addresses or
 * whatever it needs for inter-node communication.
 *
 * The comms level is two kernel threads that deal mainly with
 * the receiving of messages from other nodes and passing them
 * up to the mid-level comms layer (which understands the
 * message format) for execution by the locking core, and
 * a send thread which does all the setting up of connections
 * to remote nodes and the sending of data. Threads are not allowed
 * to send their own data because it may cause them to wait in times
 * of high load. Also, this way, the sending thread can collect together
 * messages bound for one node and send them in one block.
 *
 * lowcomms will choose to use either TCP or SCTP as its transport layer
 * depending on the configuration variable 'protocol'. This should be set
 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
 * cluster-wide mechanism as it must be the same on all nodes of the cluster
 * for the DLM to function.
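 *
 * For example (illustrative only; assuming the standard dlm configfs
 * layout exposed by fs/dlm/config.c), a cluster manager would select
 * SCTP on every node before any lockspace is created:
 *
 *   echo 1 > /sys/kernel/config/dlm/cluster/protocol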
 *
 */

#include <asm/ioctls.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <linux/pagemap.h>
#include <linux/file.h>
#include <linux/mutex.h>
#include <linux/sctp.h>
#include <linux/slab.h>
#include <net/sctp/sctp.h>
#include <net/ipv6.h>

#include <trace/events/dlm.h>

#include "dlm_internal.h"
#include "lowcomms.h"
#include "midcomms.h"
#include "memory.h"
#include "config.h"

#define NEEDED_RMEM (4*1024*1024)

/* Number of messages to send before rescheduling */
#define MAX_SEND_MSG_COUNT 25

struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* So we know who we are in the list */
	struct mutex sock_mutex;
	unsigned long flags;
#define CF_READ_PENDING 1
#define CF_WRITE_PENDING 2
#define CF_INIT_PENDING 4
#define CF_IS_OTHERCON 5
#define CF_CLOSE 6
#define CF_APP_LIMITED 7
#define CF_CLOSING 8
#define CF_CONNECTED 9
#define CF_RECONNECT 10
#define CF_DELAY_CONNECT 11
	struct list_head writequeue;	/* List of outgoing writequeue_entries */
	spinlock_t writequeue_lock;
	int retries;
#define MAX_CONNECT_RETRIES 3
	struct hlist_node list;
	struct connection *othercon;
	struct connection *sendcon;
	struct work_struct rwork;	/* Receive workqueue */
	struct work_struct swork;	/* Send workqueue */
	unsigned char *rx_buf;
	int rx_buflen;
	int rx_leftover;
	int mark;
	int addr_count;
	int curr_addr_index;
	struct sockaddr_storage addr[DLM_MAX_ADDR_COUNT];
	spinlock_t addrs_lock;
	struct rcu_head rcu;
};
#define sock2con(x) ((struct connection *)(x)->sk_user_data)

struct listen_connection {
	struct socket *sock;
	struct work_struct rwork;
};

#define DLM_WQ_REMAIN_BYTES(e) (PAGE_SIZE - e->end)
#define DLM_WQ_LENGTH_BYTES(e) (e->end - e->offset)

/* An entry waiting to be sent */
struct writequeue_entry {
	struct list_head list;
	struct page *page;
	int offset;
	int len;
	int end;
	int users;
	bool dirty;
	struct connection *con;
	struct list_head msgs;
	struct kref ref;
};

struct dlm_msg {
	struct writequeue_entry *entry;
	struct dlm_msg *orig_msg;
	bool retransmit;
	void *ppc;
	int len;
	int idx; /* new()/commit() idx exchange */

	struct list_head list;
	struct kref ref;
};

struct dlm_proto_ops {
	bool try_new_addr;
	const char *name;
	int proto;

	int (*connect)(struct connection *con, struct socket *sock,
		       struct sockaddr *addr, int addr_len);
	void (*sockopts)(struct socket *sock);
	int (*bind)(struct socket *sock);
	int (*listen_validate)(void);
	void (*listen_sockopts)(struct socket *sock);
	int (*listen_bind)(struct socket *sock);
};

static struct listen_sock_callbacks {
	void (*sk_error_report)(struct sock *);
	void (*sk_data_ready)(struct sock *);
	void (*sk_state_change)(struct sock *);
	void (*sk_write_space)(struct sock *);
} listen_sock;

static struct listen_connection listen_con;
static struct sockaddr_storage dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;

/* Work queues */
static struct workqueue_struct *recv_workqueue;
static struct workqueue_struct *send_workqueue;

static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_SPINLOCK(connections_lock);
DEFINE_STATIC_SRCU(connections_srcu);
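
/* selected transport: points at dlm_tcp_ops or dlm_sctp_ops below; set in
 * dlm_lowcomms_start() from dlm_config.ci_protocol and reset to NULL in
 * dlm_lowcomms_stop()
 */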
static const struct dlm_proto_ops *dlm_proto_ops;

static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);

bool dlm_lowcomms_is_running(void)
{
	return !!listen_con.sock;
}

static void writequeue_entry_ctor(void *data)
{
	struct writequeue_entry *entry = data;

	INIT_LIST_HEAD(&entry->msgs);
}

struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void)
{
	return kmem_cache_create("dlm_writequeue", sizeof(struct writequeue_entry),
				 0, 0, writequeue_entry_ctor);
}

struct kmem_cache *dlm_lowcomms_msg_cache_create(void)
{
	return kmem_cache_create("dlm_msg", sizeof(struct dlm_msg), 0, 0, NULL);
}

/* must be called while holding writequeue_lock */
static struct writequeue_entry *con_next_wq(struct connection *con)
{
	struct writequeue_entry *e;

	e = list_first_entry_or_null(&con->writequeue, struct writequeue_entry,
				     list);
	/* if len is zero there is nothing to send; if there are users still
	 * filling buffers we wait until the users are done so we can send
	 * more.
	 */
	if (!e || e->users || e->len == 0)
		return NULL;

	return e;
}

static struct connection *__find_con(int nodeid, int r)
{
	struct connection *con;

	hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
		if (con->nodeid == nodeid)
			return con;
	}

	return NULL;
}

static int dlm_con_init(struct connection *con, int nodeid)
{
	con->rx_buflen = dlm_config.ci_buffer_size;
	con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS);
	if (!con->rx_buf)
		return -ENOMEM;

	con->nodeid = nodeid;
	mutex_init(&con->sock_mutex);
	INIT_LIST_HEAD(&con->writequeue);
	spin_lock_init(&con->writequeue_lock);
	/* addrs_lock is taken before addr_count is first set, so it must be
	 * initialized here as well
	 */
	spin_lock_init(&con->addrs_lock);
	INIT_WORK(&con->swork, process_send_sockets);
	INIT_WORK(&con->rwork, process_recv_sockets);

	return 0;
}

/*
 * If 'alloc' is zero then we don't attempt to create a new
 * connection structure for this node.
 */
static struct connection *nodeid2con(int nodeid, gfp_t alloc)
{
	struct connection *con, *tmp;
	int r, ret;

	r = nodeid_hash(nodeid);
	con = __find_con(nodeid, r);
	if (con || !alloc)
		return con;

	con = kzalloc(sizeof(*con), alloc);
	if (!con)
		return NULL;

	ret = dlm_con_init(con, nodeid);
	if (ret) {
		kfree(con);
		return NULL;
	}

	spin_lock(&connections_lock);
	/* Because multiple workqueues/threads call this function it can
	 * race on multiple cpus. Instead of locking the hot path of
	 * __find_con() we simply re-check here, under the protection of
	 * connections_lock, for the rare case of a recently added node.
	 * If one is found we abort our connection creation and return the
	 * existing connection.
	 */
	tmp = __find_con(nodeid, r);
	if (tmp) {
		spin_unlock(&connections_lock);
		kfree(con->rx_buf);
		kfree(con);
		return tmp;
	}

	hlist_add_head_rcu(&con->list, &connection_hash[r]);
	spin_unlock(&connections_lock);

	return con;
}

/* Loop round all connections */
static void foreach_conn(void (*conn_func)(struct connection *c))
{
	int i;
	struct connection *con;

	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry_rcu(con, &connection_hash[i], list)
			conn_func(con);
	}
}

static int addr_compare(const struct sockaddr_storage *x,
			const struct sockaddr_storage *y)
{
	switch (x->ss_family) {
	case AF_INET: {
		struct sockaddr_in *sinx = (struct sockaddr_in *)x;
		struct sockaddr_in *siny = (struct sockaddr_in *)y;
		if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
			return 0;
		if (sinx->sin_port != siny->sin_port)
			return 0;
		break;
	}
	case AF_INET6: {
		struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
		struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
		if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
			return 0;
		if (sinx->sin6_port != siny->sin6_port)
			return 0;
		break;
	}
	default:
		return 0;
	}
	return 1;
}

static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
			  struct sockaddr *sa_out, bool try_new_addr,
			  unsigned int *mark)
{
	struct sockaddr_storage sas;
	struct connection *con;
	int idx;

	if (!dlm_local_count)
		return -1;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (!con) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	spin_lock(&con->addrs_lock);
	if (!con->addr_count) {
		spin_unlock(&con->addrs_lock);
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	memcpy(&sas, &con->addr[con->curr_addr_index],
	       sizeof(struct sockaddr_storage));

	if (try_new_addr) {
		con->curr_addr_index++;
		if (con->curr_addr_index == con->addr_count)
			con->curr_addr_index = 0;
	}

	*mark = con->mark;
	spin_unlock(&con->addrs_lock);

	if (sas_out)
		memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));

	if (!sa_out) {
		srcu_read_unlock(&connections_srcu, idx);
		return 0;
	}

	if (dlm_local_addr[0].ss_family == AF_INET) {
		struct sockaddr_in *in4 = (struct sockaddr_in *)&sas;
		struct sockaddr_in *ret4 = (struct sockaddr_in *)sa_out;
		ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
	} else {
		struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)&sas;
		struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *)sa_out;
		ret6->sin6_addr = in6->sin6_addr;
	}

	srcu_read_unlock(&connections_srcu, idx);
	return 0;
}

static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid,
			  unsigned int *mark)
{
	struct connection *con;
	int i, idx, addr_i;

	idx = srcu_read_lock(&connections_srcu);
	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
			WARN_ON_ONCE(!con->addr_count);

			spin_lock(&con->addrs_lock);
			for (addr_i = 0; addr_i < con->addr_count; addr_i++) {
				if (addr_compare(&con->addr[addr_i], addr)) {
					*nodeid = con->nodeid;
					*mark = con->mark;
					spin_unlock(&con->addrs_lock);
					srcu_read_unlock(&connections_srcu, idx);
					return 0;
				}
			}
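
			/* no address of this connection matched; keep
			 * scanning the remaining connections
			 */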
			spin_unlock(&con->addrs_lock);
		}
	}
	srcu_read_unlock(&connections_srcu, idx);

	return -ENOENT;
}

static bool dlm_lowcomms_con_has_addr(const struct connection *con,
				      const struct sockaddr_storage *addr)
{
	int i;

	for (i = 0; i < con->addr_count; i++) {
		if (addr_compare(&con->addr[i], addr))
			return true;
	}

	return false;
}

int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
{
	struct connection *con;
	bool ret;
	int idx;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, GFP_NOFS);
	if (!con) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOMEM;
	}

	spin_lock(&con->addrs_lock);
	if (!con->addr_count) {
		memcpy(&con->addr[0], addr, sizeof(*addr));
		con->addr_count = 1;
		con->mark = dlm_config.ci_mark;
		spin_unlock(&con->addrs_lock);
		srcu_read_unlock(&connections_srcu, idx);
		return 0;
	}

	ret = dlm_lowcomms_con_has_addr(con, addr);
	if (ret) {
		spin_unlock(&con->addrs_lock);
		srcu_read_unlock(&connections_srcu, idx);
		return -EEXIST;
	}

	if (con->addr_count >= DLM_MAX_ADDR_COUNT) {
		spin_unlock(&con->addrs_lock);
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOSPC;
	}

	memcpy(&con->addr[con->addr_count++], addr, sizeof(*addr));
	spin_unlock(&con->addrs_lock);
	srcu_read_unlock(&connections_srcu, idx);
	return 0;
}

/* Data available on socket or listen socket received a connect */
static void lowcomms_data_ready(struct sock *sk)
{
	struct connection *con = sock2con(sk);

	if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
		queue_work(recv_workqueue, &con->rwork);
}

static void lowcomms_listen_data_ready(struct sock *sk)
{
	queue_work(recv_workqueue, &listen_con.rwork);
}

static void lowcomms_write_space(struct sock *sk)
{
	struct connection *con = sock2con(sk);

	if (!test_and_set_bit(CF_CONNECTED, &con->flags)) {
		log_print("connected to node %d", con->nodeid);
		queue_work(send_workqueue, &con->swork);
		return;
	}

	clear_bit(SOCK_NOSPACE, &con->sock->flags);

	if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
		con->sock->sk->sk_write_pending--;
		clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
	}

	queue_work(send_workqueue, &con->swork);
}

static inline void lowcomms_connect_sock(struct connection *con)
{
	if (test_bit(CF_CLOSE, &con->flags))
		return;
	queue_work(send_workqueue, &con->swork);
	cond_resched();
}

static void lowcomms_state_change(struct sock *sk)
{
	/* SCTP layer is not calling sk_data_ready when the connection
	 * is done, so we catch the signal through here. Also, it
	 * doesn't switch socket state when entering shutdown, so we
	 * skip the write in that case.
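	 *
	 * (sk_shutdown == RCV_SHUTDOWN means the peer has shut down its
	 * send side; treating it as data ready lets the receive worker
	 * observe the EOF and tear the connection down.)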
	 */
	if (sk->sk_shutdown) {
		if (sk->sk_shutdown == RCV_SHUTDOWN)
			lowcomms_data_ready(sk);
	} else if (sk->sk_state == TCP_ESTABLISHED) {
		lowcomms_write_space(sk);
	}
}

int dlm_lowcomms_connect_node(int nodeid)
{
	struct connection *con;
	int idx;

	if (nodeid == dlm_our_nodeid())
		return 0;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!con)) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	lowcomms_connect_sock(con);
	srcu_read_unlock(&connections_srcu, idx);

	return 0;
}

int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark)
{
	struct connection *con;
	int idx;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (!con) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	spin_lock(&con->addrs_lock);
	con->mark = mark;
	spin_unlock(&con->addrs_lock);
	srcu_read_unlock(&connections_srcu, idx);
	return 0;
}

static void lowcomms_error_report(struct sock *sk)
{
	struct connection *con = sock2con(sk);
	struct inet_sock *inet;

	inet = inet_sk(sk);
	switch (sk->sk_family) {
	case AF_INET:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI4, dport %d, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &inet->inet_daddr,
				   ntohs(inet->inet_dport), sk->sk_err,
				   sk->sk_err_soft);
		break;
#if IS_ENABLED(CONFIG_IPV6)
	case AF_INET6:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI6c, "
				   "dport %d, sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &sk->sk_v6_daddr,
				   ntohs(inet->inet_dport), sk->sk_err,
				   sk->sk_err_soft);
		break;
#endif
	default:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "invalid socket family %d set, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   sk->sk_family, sk->sk_err, sk->sk_err_soft);
		goto out;
	}

	/* from here on we only handle the sendcon */
	if (test_bit(CF_IS_OTHERCON, &con->flags))
		con = con->sendcon;

	switch (sk->sk_err) {
	case ECONNREFUSED:
		set_bit(CF_DELAY_CONNECT, &con->flags);
		break;
	default:
		break;
	}

	if (!test_and_set_bit(CF_RECONNECT, &con->flags))
		queue_work(send_workqueue, &con->swork);

out:
	listen_sock.sk_error_report(sk);
}

static void restore_callbacks(struct socket *sock)
{
	struct sock *sk = sock->sk;

	lock_sock(sk);
	sk->sk_user_data = NULL;
	sk->sk_data_ready = listen_sock.sk_data_ready;
	sk->sk_state_change = listen_sock.sk_state_change;
	sk->sk_write_space = listen_sock.sk_write_space;
	sk->sk_error_report = listen_sock.sk_error_report;
	release_sock(sk);
}

/* Make a socket active */
static void add_sock(struct socket *sock, struct connection *con)
{
	struct sock *sk = sock->sk;

	lock_sock(sk);
	con->sock = sock;

	sk->sk_user_data = con;
	/* Install a data_ready callback */
	sk->sk_data_ready = lowcomms_data_ready;
	sk->sk_write_space = lowcomms_write_space;
	sk->sk_state_change = lowcomms_state_change;
	sk->sk_allocation = GFP_NOFS;
	sk->sk_error_report = lowcomms_error_report;
	release_sock(sk);
}

/* Add the port number to an IPv6 or 4 sockaddr and return the address
   length */
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
			  int *addr_len)
{
	saddr->ss_family = dlm_local_addr[0].ss_family;
	if (saddr->ss_family == AF_INET) {
		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
		in4_addr->sin_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in);
		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
	} else {
		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
		in6_addr->sin6_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in6);
	}
	memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
}

static void dlm_page_release(struct kref *kref)
{
	struct writequeue_entry *e = container_of(kref, struct writequeue_entry,
						  ref);

	__free_page(e->page);
	dlm_free_writequeue(e);
}

static void dlm_msg_release(struct kref *kref)
{
	struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);

	kref_put(&msg->entry->ref, dlm_page_release);
	dlm_free_msg(msg);
}

static void free_entry(struct writequeue_entry *e)
{
	struct dlm_msg *msg, *tmp;

	list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
		if (msg->orig_msg) {
			msg->orig_msg->retransmit = false;
			kref_put(&msg->orig_msg->ref, dlm_msg_release);
		}

		list_del(&msg->list);
		kref_put(&msg->ref, dlm_msg_release);
	}

	list_del(&e->list);
	kref_put(&e->ref, dlm_page_release);
}

static void dlm_close_sock(struct socket **sock)
{
	if (*sock) {
		restore_callbacks(*sock);
		sock_release(*sock);
		*sock = NULL;
	}
}

/* Close a remote connection and tidy up */
static void close_connection(struct connection *con, bool and_other,
			     bool tx, bool rx)
{
	bool closing = test_and_set_bit(CF_CLOSING, &con->flags);
	struct writequeue_entry *e;

	if (tx && !closing && cancel_work_sync(&con->swork)) {
		log_print("canceled swork for node %d", con->nodeid);
		clear_bit(CF_WRITE_PENDING, &con->flags);
	}
	if (rx && !closing && cancel_work_sync(&con->rwork)) {
		log_print("canceled rwork for node %d", con->nodeid);
		clear_bit(CF_READ_PENDING, &con->flags);
	}

	mutex_lock(&con->sock_mutex);
	dlm_close_sock(&con->sock);

	if (con->othercon && and_other) {
		/* Will only re-enter once. */
		close_connection(con->othercon, false, tx, rx);
	}

	/* If a writequeue entry was only partially sent, drop the whole
	 * entry on reconnection so that we never resume in the middle of
	 * a message, which would confuse the other end.
	 *
	 * Dropping complete messages is always safe because they can be
	 * retransmitted; what we cannot allow is transmitting half a
	 * message that may then be processed at the other side.
	 *
	 * Our policy is to start from a clean state after a disconnect,
	 * since we cannot know what was sent or received at the transport
	 * layer in that case.
	 */
	spin_lock(&con->writequeue_lock);
	if (!list_empty(&con->writequeue)) {
		e = list_first_entry(&con->writequeue, struct writequeue_entry,
				     list);
		if (e->dirty)
			free_entry(e);
	}
	spin_unlock(&con->writequeue_lock);

	con->rx_leftover = 0;
	con->retries = 0;
	clear_bit(CF_APP_LIMITED, &con->flags);
	clear_bit(CF_CONNECTED, &con->flags);
	clear_bit(CF_DELAY_CONNECT, &con->flags);
	clear_bit(CF_RECONNECT, &con->flags);
	mutex_unlock(&con->sock_mutex);
	clear_bit(CF_CLOSING, &con->flags);
}

static int con_realloc_receive_buf(struct connection *con, int newlen)
{
	unsigned char *newbuf;

	newbuf = kmalloc(newlen, GFP_NOFS);
	if (!newbuf)
		return -ENOMEM;

	/* copy any leftover from the last receive */
	if (con->rx_leftover)
		memmove(newbuf, con->rx_buf, con->rx_leftover);

	/* swap to the new buffer space */
	kfree(con->rx_buf);
	con->rx_buflen = newlen;
	con->rx_buf = newbuf;

	return 0;
}

/* Data received from remote end */
static int receive_from_sock(struct connection *con)
{
	struct msghdr msg;
	struct kvec iov;
	int ret, buflen;

	mutex_lock(&con->sock_mutex);

	if (con->sock == NULL) {
		ret = -EAGAIN;
		goto out_close;
	}

	/* reallocate if the configured receive buffer size has changed */
	buflen = dlm_config.ci_buffer_size;
	if (con->rx_buflen != buflen && con->rx_leftover <= buflen) {
		ret = con_realloc_receive_buf(con, buflen);
		if (ret < 0)
			goto out_resched;
	}

	for (;;) {
		/* calculate new buffer parameters based on the last receive
		 * and possible leftover bytes
		 */
		iov.iov_base = con->rx_buf + con->rx_leftover;
		iov.iov_len = con->rx_buflen - con->rx_leftover;

		memset(&msg, 0, sizeof(msg));
		msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
		ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
				     msg.msg_flags);
		trace_dlm_recv(con->nodeid, ret);
		if (ret == -EAGAIN)
			break;
		else if (ret <= 0)
			goto out_close;

		/* new buflen according to the received bytes and the
		 * leftover from the last receive
		 */
		buflen = ret + con->rx_leftover;
		ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen);
		if (ret < 0)
			goto out_close;

		/* calculate the leftover bytes that were not processed and
		 * move them to the beginning of the receive buffer, so the
		 * next receive sees the full message at the start address
		 * of the receive buffer.
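		 *
		 * (e.g. if 1000 bytes arrived and dlm_process_incoming_buffer()
		 * consumed 960 of them, the trailing 40 bytes of a partial
		 * message are moved to the front of rx_buf and the next
		 * kernel_recvmsg() appends right after them.)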
		 */
		con->rx_leftover = buflen - ret;
		if (con->rx_leftover) {
			memmove(con->rx_buf, con->rx_buf + ret,
				con->rx_leftover);
		}
	}

	dlm_midcomms_receive_done(con->nodeid);
	mutex_unlock(&con->sock_mutex);
	return 0;

out_resched:
	if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
		queue_work(recv_workqueue, &con->rwork);
	mutex_unlock(&con->sock_mutex);
	return -EAGAIN;

out_close:
	if (ret == 0) {
		log_print("connection %p got EOF from %d",
			  con, con->nodeid);

		mutex_unlock(&con->sock_mutex);
		close_connection(con, false, true, false);
		/* signal the receive worker to break out of its loop */
		ret = -1;
	} else {
		mutex_unlock(&con->sock_mutex);
	}
	return ret;
}

/* Listening socket is busy, accept a connection */
static int accept_from_sock(struct listen_connection *con)
{
	int result;
	struct sockaddr_storage peeraddr;
	struct socket *newsock = NULL;
	int len, idx;
	int nodeid;
	struct connection *newcon;
	struct connection *addcon;
	unsigned int mark;

	if (!con->sock)
		return -ENOTCONN;

	result = kernel_accept(con->sock, &newsock, O_NONBLOCK);
	if (result < 0)
		goto accept_err;

	/* Get the connected socket's peer */
	memset(&peeraddr, 0, sizeof(peeraddr));
	len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2);
	if (len < 0) {
		result = -ECONNABORTED;
		goto accept_err;
	}

	/* Get the new node's NODEID */
	make_sockaddr(&peeraddr, 0, &len);
	if (addr_to_nodeid(&peeraddr, &nodeid, &mark)) {
		switch (peeraddr.ss_family) {
		case AF_INET: {
			struct sockaddr_in *sin = (struct sockaddr_in *)&peeraddr;

			log_print("connect from non cluster IPv4 node %pI4",
				  &sin->sin_addr);
			break;
		}
#if IS_ENABLED(CONFIG_IPV6)
		case AF_INET6: {
			struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&peeraddr;

			log_print("connect from non cluster IPv6 node %pI6c",
				  &sin6->sin6_addr);
			break;
		}
#endif
		default:
			log_print("invalid family from non cluster node");
			break;
		}

		sock_release(newsock);
		return -1;
	}

	log_print("got connection from %d", nodeid);

	/* Check to see if we already have a connection to this node. This
	 * could happen if the two nodes initiate a connection at roughly
	 * the same time and the connections cross on the wire.
	 * In this case we store the incoming one in "othercon".
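	 *
	 * (Both sockets then remain in use: the connection we initiated
	 * keeps doing the sending, while the accepted socket parked on
	 * "othercon" is only ever read from.)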
	 */
	idx = srcu_read_lock(&connections_srcu);
	newcon = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!newcon)) {
		srcu_read_unlock(&connections_srcu, idx);
		result = -ENOENT;
		goto accept_err;
	}

	sock_set_mark(newsock->sk, mark);

	mutex_lock(&newcon->sock_mutex);
	if (newcon->sock) {
		struct connection *othercon = newcon->othercon;

		if (!othercon) {
			othercon = kzalloc(sizeof(*othercon), GFP_NOFS);
			if (!othercon) {
				log_print("failed to allocate incoming socket");
				mutex_unlock(&newcon->sock_mutex);
				srcu_read_unlock(&connections_srcu, idx);
				result = -ENOMEM;
				goto accept_err;
			}

			result = dlm_con_init(othercon, nodeid);
			if (result < 0) {
				kfree(othercon);
				mutex_unlock(&newcon->sock_mutex);
				srcu_read_unlock(&connections_srcu, idx);
				goto accept_err;
			}

			lockdep_set_subclass(&othercon->sock_mutex, 1);
			set_bit(CF_IS_OTHERCON, &othercon->flags);
			newcon->othercon = othercon;
			othercon->sendcon = newcon;
		} else {
			/* close the other sock con if we have something new */
			close_connection(othercon, false, true, false);
		}

		mutex_lock(&othercon->sock_mutex);
		add_sock(newsock, othercon);
		addcon = othercon;
		mutex_unlock(&othercon->sock_mutex);
	} else {
		/* accept copies the sk after we've saved the callbacks, so we
		   don't want to save them a second time or comm errors will
		   result in calling sk_error_report recursively. */
		add_sock(newsock, newcon);
		addcon = newcon;
	}

	set_bit(CF_CONNECTED, &addcon->flags);
	mutex_unlock(&newcon->sock_mutex);

	/*
	 * Add it to the active queue in case we got data between
	 * processing the accept and adding the socket to the
	 * read_sockets list
	 */
	if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
		queue_work(recv_workqueue, &addcon->rwork);

	srcu_read_unlock(&connections_srcu, idx);

	return 0;

accept_err:
	if (newsock)
		sock_release(newsock);

	if (result != -EAGAIN)
		log_print("error accepting connection from node: %d", result);
	return result;
}

/*
 * writequeue_entry_complete - try to delete and free write queue entry
 * @e: write queue entry to try to delete
 * @completed: bytes completed
 *
 * writequeue_lock must be held.
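 *
 * Also marks the entry dirty, so if the connection drops before the entry
 * is fully transmitted, close_connection() will drop the whole entry
 * rather than risk resuming mid-message after a reconnect.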
 */
static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
{
	e->offset += completed;
	e->len -= completed;
	/* signal that the page was partially transmitted */
	e->dirty = true;

	if (e->len == 0 && e->users == 0)
		free_entry(e);
}

/*
 * sctp_bind_addrs - bind a SCTP socket to all our addresses
 */
static int sctp_bind_addrs(struct socket *sock, uint16_t port)
{
	struct sockaddr_storage localaddr;
	struct sockaddr *addr = (struct sockaddr *)&localaddr;
	int i, addr_len, result = 0;

	for (i = 0; i < dlm_local_count; i++) {
		memcpy(&localaddr, &dlm_local_addr[i], sizeof(localaddr));
		make_sockaddr(&localaddr, port, &addr_len);

		if (!i)
			result = kernel_bind(sock, addr, addr_len);
		else
			result = sock_bind_add(sock->sk, addr, addr_len);

		if (result < 0) {
			log_print("Can't bind to %d addr number %d, %d.\n",
				  port, i + 1, result);
			break;
		}
	}
	return result;
}

/* Get local addresses */
static void init_local(void)
{
	struct sockaddr_storage sas;
	int i;

	dlm_local_count = 0;
	for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
		if (dlm_our_addr(&sas, i))
			break;

		memcpy(&dlm_local_addr[dlm_local_count++], &sas, sizeof(sas));
	}
}

static struct writequeue_entry *new_writequeue_entry(struct connection *con)
{
	struct writequeue_entry *entry;

	entry = dlm_allocate_writequeue();
	if (!entry)
		return NULL;

	entry->page = alloc_page(GFP_ATOMIC | __GFP_ZERO);
	if (!entry->page) {
		dlm_free_writequeue(entry);
		return NULL;
	}

	entry->offset = 0;
	entry->len = 0;
	entry->end = 0;
	entry->dirty = false;
	entry->con = con;
	entry->users = 1;
	kref_init(&entry->ref);
	return entry;
}

static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
					     char **ppc, void (*cb)(void *data),
					     void *data)
{
	struct writequeue_entry *e;

	spin_lock(&con->writequeue_lock);
	if (!list_empty(&con->writequeue)) {
		e = list_last_entry(&con->writequeue, struct writequeue_entry, list);
		if (DLM_WQ_REMAIN_BYTES(e) >= len) {
			kref_get(&e->ref);

			*ppc = page_address(e->page) + e->end;
			if (cb)
				cb(data);

			e->end += len;
			e->users++;
			goto out;
		}
	}

	e = new_writequeue_entry(con);
	if (!e)
		goto out;

	kref_get(&e->ref);
	*ppc = page_address(e->page);
	e->end += len;
	if (cb)
		cb(data);

	list_add_tail(&e->list, &con->writequeue);

out:
	spin_unlock(&con->writequeue_lock);
	return e;
}

static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
						gfp_t allocation, char **ppc,
						void (*cb)(void *data),
						void *data)
{
	struct writequeue_entry *e;
	struct dlm_msg *msg;

	msg = dlm_allocate_msg(allocation);
	if (!msg)
		return NULL;

	kref_init(&msg->ref);

	e = new_wq_entry(con, len, ppc, cb, data);
	if (!e) {
		dlm_free_msg(msg);
		return NULL;
	}

	msg->retransmit = false;
	msg->orig_msg = NULL;
	msg->ppc = *ppc;
	msg->len = len;
	msg->entry = e;

	return msg;
}

/* Avoid a false positive from sparse for connections_srcu: the matching
 * unlock happens in dlm_lowcomms_commit_msg(), which must be
 * called on success.
 */
#ifndef __CHECKER__
struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
				     char **ppc, void (*cb)(void *data),
				     void *data)
{
	struct connection *con;
	struct dlm_msg *msg;
	int idx;

	if (len > DLM_MAX_SOCKET_BUFSIZE ||
	    len < sizeof(struct dlm_header)) {
		BUILD_BUG_ON(PAGE_SIZE < DLM_MAX_SOCKET_BUFSIZE);
		log_print("failed to allocate a buffer of size %d", len);
		WARN_ON(1);
		return NULL;
	}

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!con)) {
		srcu_read_unlock(&connections_srcu, idx);
		return NULL;
	}

	msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, data);
	if (!msg) {
		srcu_read_unlock(&connections_srcu, idx);
		return NULL;
	}

	/* for dlm_lowcomms_commit_msg() */
	kref_get(&msg->ref);
	/* we assume the caller will commit the message on success */
	msg->idx = idx;
	return msg;
}
#endif

static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
{
	struct writequeue_entry *e = msg->entry;
	struct connection *con = e->con;
	int users;

	spin_lock(&con->writequeue_lock);
	kref_get(&msg->ref);
	list_add(&msg->list, &e->msgs);

	users = --e->users;
	if (users)
		goto out;

	e->len = DLM_WQ_LENGTH_BYTES(e);
	spin_unlock(&con->writequeue_lock);

	queue_work(send_workqueue, &con->swork);
	return;

out:
	spin_unlock(&con->writequeue_lock);
}

/* Avoid a false positive from sparse for connections_srcu: the matching
 * lock was taken in dlm_lowcomms_new_msg()
 */
#ifndef __CHECKER__
void dlm_lowcomms_commit_msg(struct dlm_msg *msg)
{
	_dlm_lowcomms_commit_msg(msg);
	srcu_read_unlock(&connections_srcu, msg->idx);
	/* because dlm_lowcomms_new_msg() */
	kref_put(&msg->ref, dlm_msg_release);
}
#endif

void dlm_lowcomms_put_msg(struct dlm_msg *msg)
{
	kref_put(&msg->ref, dlm_msg_release);
}

/* does not hold connections_srcu, used from the workqueue only */
int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
{
	struct dlm_msg *msg_resend;
	char *ppc;

	if (msg->retransmit)
		return 1;

	msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len,
					      GFP_ATOMIC, &ppc, NULL, NULL);
	if (!msg_resend)
		return -ENOMEM;

	msg->retransmit = true;
	kref_get(&msg->ref);
	msg_resend->orig_msg = msg;

	memcpy(ppc, msg->ppc, msg->len);
	_dlm_lowcomms_commit_msg(msg_resend);
	dlm_lowcomms_put_msg(msg_resend);

	return 0;
}

/* Send a message */
static void send_to_sock(struct connection *con)
{
	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	struct writequeue_entry *e;
	int len, offset, ret;
	int count;

again:
	count = 0;

	mutex_lock(&con->sock_mutex);
	if (con->sock == NULL)
		goto out_connect;

	spin_lock(&con->writequeue_lock);
	for (;;) {
		e = con_next_wq(con);
		if (!e)
			break;

		len = e->len;
		offset = e->offset;
		BUG_ON(len == 0 && e->users == 0);
		spin_unlock(&con->writequeue_lock);

		ret = kernel_sendpage(con->sock, e->page, offset, len,
				      msg_flags);
		trace_dlm_send(con->nodeid, ret);
		if (ret == -EAGAIN || ret == 0) {
			if (ret == -EAGAIN &&
			    test_bit(SOCKWQ_ASYNC_NOSPACE,
				     &con->sock->flags) &&
			    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
				/* Notify TCP that we're limited by the
				 * application window size.
				 */
				set_bit(SOCK_NOSPACE, &con->sock->flags);
				con->sock->sk->sk_write_pending++;
			}
			cond_resched();
			goto out;
		} else if (ret < 0)
			goto out;

		spin_lock(&con->writequeue_lock);
		writequeue_entry_complete(e, ret);

		/* Don't starve people filling buffers */
		if (++count >= MAX_SEND_MSG_COUNT) {
			spin_unlock(&con->writequeue_lock);
			mutex_unlock(&con->sock_mutex);
			cond_resched();
			goto again;
		}
	}
	spin_unlock(&con->writequeue_lock);

out:
	mutex_unlock(&con->sock_mutex);
	return;

out_connect:
	mutex_unlock(&con->sock_mutex);
	queue_work(send_workqueue, &con->swork);
	cond_resched();
}

static void clean_one_writequeue(struct connection *con)
{
	struct writequeue_entry *e, *safe;

	spin_lock(&con->writequeue_lock);
	list_for_each_entry_safe(e, safe, &con->writequeue, list) {
		free_entry(e);
	}
	spin_unlock(&con->writequeue_lock);
}

static void connection_release(struct rcu_head *rcu)
{
	struct connection *con = container_of(rcu, struct connection, rcu);

	kfree(con->rx_buf);
	kfree(con);
}

/* Called from recovery when it knows that a node has
   left the cluster */
int dlm_lowcomms_close(int nodeid)
{
	struct connection *con;
	int idx;

	log_print("closing connection to node %d", nodeid);

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!con)) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	spin_lock(&connections_lock);
	hlist_del_rcu(&con->list);
	spin_unlock(&connections_lock);

	close_connection(con, true, true, true);

	clean_one_writequeue(con);
	call_srcu(&connections_srcu, &con->rcu, connection_release);
	if (con->othercon) {
		clean_one_writequeue(con->othercon);
		call_srcu(&connections_srcu, &con->othercon->rcu, connection_release);
	}
	srcu_read_unlock(&connections_srcu, idx);

	return 0;
}

/* Receive workqueue function */
static void process_recv_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, rwork);

	clear_bit(CF_READ_PENDING, &con->flags);
	receive_from_sock(con);
}

static void process_listen_recv_socket(struct work_struct *work)
{
	int ret;

	do {
		ret = accept_from_sock(&listen_con);
	} while (!ret);
}

static void dlm_connect(struct connection *con)
{
	struct sockaddr_storage addr;
	int result, addr_len;
	struct socket *sock;
	unsigned int mark;

	/* Some odd races can cause double-connects, ignore them */
	if (con->retries++ > MAX_CONNECT_RETRIES)
		return;

	if (con->sock) {
		log_print("node %d already connected.", con->nodeid);
		return;
	}

	memset(&addr, 0, sizeof(addr));
	result = nodeid_to_addr(con->nodeid, &addr, NULL,
				dlm_proto_ops->try_new_addr, &mark);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		return;
	}

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family,
				  SOCK_STREAM, dlm_proto_ops->proto, &sock);
	if (result < 0)
		goto socket_err;

	sock_set_mark(sock->sk, mark);
	dlm_proto_ops->sockopts(sock);

	add_sock(sock, con);

	result = dlm_proto_ops->bind(sock);
	if (result < 0)
		goto add_sock_err;

	log_print_ratelimited("connecting to %d", con->nodeid);
	make_sockaddr(&addr, dlm_config.ci_tcp_port, &addr_len);
	result = dlm_proto_ops->connect(con, sock, (struct sockaddr *)&addr,
					addr_len);
	if (result < 0)
		goto add_sock_err;

	return;

add_sock_err:
	dlm_close_sock(&con->sock);

socket_err:
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH &&
	    result != -ENETUNREACH &&
	    result != -ENETDOWN &&
	    result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		log_print("connect %d try %d error %d", con->nodeid,
			  con->retries, result);
		msleep(1000);
		lowcomms_connect_sock(con);
	}
}

/* Send workqueue function */
static void process_send_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, swork);

	WARN_ON(test_bit(CF_IS_OTHERCON, &con->flags));

	clear_bit(CF_WRITE_PENDING, &con->flags);

	if (test_and_clear_bit(CF_RECONNECT, &con->flags)) {
		close_connection(con, false, false, true);
		dlm_midcomms_unack_msg_resend(con->nodeid);
	}

	if (con->sock == NULL) {
		if (test_and_clear_bit(CF_DELAY_CONNECT, &con->flags))
			msleep(1000);

		mutex_lock(&con->sock_mutex);
		dlm_connect(con);
		mutex_unlock(&con->sock_mutex);
	}

	if (!list_empty(&con->writequeue))
		send_to_sock(con);
}

static void work_stop(void)
{
	if (recv_workqueue) {
		destroy_workqueue(recv_workqueue);
		recv_workqueue = NULL;
	}

	if (send_workqueue) {
		destroy_workqueue(send_workqueue);
		send_workqueue = NULL;
	}
}

static int work_start(void)
{
	recv_workqueue = alloc_ordered_workqueue("dlm_recv", WQ_MEM_RECLAIM);
	if (!recv_workqueue) {
		log_print("can't start dlm_recv");
		return -ENOMEM;
	}

	send_workqueue = alloc_ordered_workqueue("dlm_send", WQ_MEM_RECLAIM);
	if (!send_workqueue) {
		log_print("can't start dlm_send");
		destroy_workqueue(recv_workqueue);
		recv_workqueue = NULL;
		return -ENOMEM;
	}

	return 0;
}

void dlm_lowcomms_shutdown(void)
{
	/* stop lowcomms_listen_data_ready calls */
	lock_sock(listen_con.sock->sk);
	listen_con.sock->sk->sk_data_ready = listen_sock.sk_data_ready;
	release_sock(listen_con.sock->sk);

	cancel_work_sync(&listen_con.rwork);
	dlm_close_sock(&listen_con.sock);
}

void dlm_lowcomms_shutdown_node(int nodeid, bool force)
{
	struct connection *con;
	int idx;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!con)) {
		srcu_read_unlock(&connections_srcu, idx);
		return;
	}

	flush_work(&con->swork);
	WARN_ON_ONCE(!force && !list_empty(&con->writequeue));
	clean_one_writequeue(con);
	if (con->othercon)
		clean_one_writequeue(con->othercon);
	close_connection(con, true, true, true);
	srcu_read_unlock(&connections_srcu, idx);
}
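
/* Quiesce a connection before final teardown: mark it closed, pre-set the
 * pending bits (the workers clear them when they run, which lets work_flush()
 * detect stragglers), and detach sk_user_data so the socket callbacks stop
 * touching the connection.
 */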
static void _stop_conn(struct connection *con, bool and_other)
{
	mutex_lock(&con->sock_mutex);
	set_bit(CF_CLOSE, &con->flags);
	set_bit(CF_READ_PENDING, &con->flags);
	set_bit(CF_WRITE_PENDING, &con->flags);
	if (con->sock && con->sock->sk) {
		lock_sock(con->sock->sk);
		con->sock->sk->sk_user_data = NULL;
		release_sock(con->sock->sk);
	}
	if (con->othercon && and_other)
		_stop_conn(con->othercon, false);
	mutex_unlock(&con->sock_mutex);
}

static void stop_conn(struct connection *con)
{
	_stop_conn(con, true);
}

static void free_conn(struct connection *con)
{
	close_connection(con, true, true, true);
}

static void work_flush(void)
{
	int ok;
	int i;
	struct connection *con;

	do {
		ok = 1;
		foreach_conn(stop_conn);
		if (recv_workqueue)
			flush_workqueue(recv_workqueue);
		if (send_workqueue)
			flush_workqueue(send_workqueue);
		for (i = 0; i < CONN_HASH_SIZE && ok; i++) {
			hlist_for_each_entry_rcu(con, &connection_hash[i],
						 list) {
				ok &= test_bit(CF_READ_PENDING, &con->flags);
				ok &= test_bit(CF_WRITE_PENDING, &con->flags);
				if (con->othercon) {
					ok &= test_bit(CF_READ_PENDING,
						       &con->othercon->flags);
					ok &= test_bit(CF_WRITE_PENDING,
						       &con->othercon->flags);
				}
			}
		}
	} while (!ok);
}

void dlm_lowcomms_stop(void)
{
	int idx;

	idx = srcu_read_lock(&connections_srcu);
	work_flush();
	foreach_conn(free_conn);
	srcu_read_unlock(&connections_srcu, idx);
	work_stop();

	dlm_proto_ops = NULL;
}

static int dlm_listen_for_all(void)
{
	struct socket *sock;
	int result;

	log_print("Using %s for communications",
		  dlm_proto_ops->name);

	result = dlm_proto_ops->listen_validate();
	if (result < 0)
		return result;

	result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family,
				  SOCK_STREAM, dlm_proto_ops->proto, &sock);
	if (result < 0) {
		log_print("Can't create comms socket: %d", result);
		return result;
	}

	sock_set_mark(sock->sk, dlm_config.ci_mark);
	dlm_proto_ops->listen_sockopts(sock);

	result = dlm_proto_ops->listen_bind(sock);
	if (result < 0)
		goto out;

	lock_sock(sock->sk);
	listen_sock.sk_data_ready = sock->sk->sk_data_ready;
	listen_sock.sk_write_space = sock->sk->sk_write_space;
	listen_sock.sk_error_report = sock->sk->sk_error_report;
	listen_sock.sk_state_change = sock->sk->sk_state_change;

	listen_con.sock = sock;

	sock->sk->sk_allocation = GFP_NOFS;
	sock->sk->sk_data_ready = lowcomms_listen_data_ready;
	release_sock(sock->sk);

	result = sock->ops->listen(sock, 5);
	if (result < 0) {
		dlm_close_sock(&listen_con.sock);
		return result;
	}

	return 0;

out:
	sock_release(sock);
	return result;
}

static int dlm_tcp_bind(struct socket *sock)
{
	struct sockaddr_storage src_addr;
	int result, addr_len;

	/* Bind to our cluster-known address when connecting to avoid
	 * routing problems.
	 */
	memcpy(&src_addr, &dlm_local_addr[0], sizeof(src_addr));
	make_sockaddr(&src_addr, 0, &addr_len);

	result = sock->ops->bind(sock, (struct sockaddr *)&src_addr,
				 addr_len);
	if (result < 0) {
		/* This *may* not indicate a critical error */
		log_print("could not bind for connect: %d", result);
	}

	return 0;
}

static int dlm_tcp_connect(struct connection *con, struct socket *sock,
			   struct sockaddr *addr, int addr_len)
{
	int ret;

	ret = sock->ops->connect(sock, addr, addr_len, O_NONBLOCK);
	switch (ret) {
	case -EINPROGRESS:
		fallthrough;
	case 0:
		return 0;
	}

	return ret;
}

static int dlm_tcp_listen_validate(void)
{
	/* We don't support multi-homed hosts */
	if (dlm_local_count > 1) {
		log_print("TCP protocol can't handle multi-homed hosts, try SCTP");
		return -EINVAL;
	}

	return 0;
}

static void dlm_tcp_sockopts(struct socket *sock)
{
	/* Turn off Nagle's algorithm */
	tcp_sock_set_nodelay(sock->sk);
}

static void dlm_tcp_listen_sockopts(struct socket *sock)
{
	dlm_tcp_sockopts(sock);
	sock_set_reuseaddr(sock->sk);
}

static int dlm_tcp_listen_bind(struct socket *sock)
{
	int addr_len;

	/* Bind to our port */
	make_sockaddr(&dlm_local_addr[0], dlm_config.ci_tcp_port, &addr_len);
	return sock->ops->bind(sock, (struct sockaddr *)&dlm_local_addr[0],
			       addr_len);
}

static const struct dlm_proto_ops dlm_tcp_ops = {
	.name = "TCP",
	.proto = IPPROTO_TCP,
	.connect = dlm_tcp_connect,
	.sockopts = dlm_tcp_sockopts,
	.bind = dlm_tcp_bind,
	.listen_validate = dlm_tcp_listen_validate,
	.listen_sockopts = dlm_tcp_listen_sockopts,
	.listen_bind = dlm_tcp_listen_bind,
};

static int dlm_sctp_bind(struct socket *sock)
{
	return sctp_bind_addrs(sock, 0);
}

static int dlm_sctp_connect(struct connection *con, struct socket *sock,
			    struct sockaddr *addr, int addr_len)
{
	int ret;

	/*
	 * Make sock->ops->connect() return within a bounded time. The
	 * O_NONBLOCK argument to connect() does not work for SCTP here,
	 * so we use a send timeout and restore the default afterwards.
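	 *
	 * (sock_set_sndtimeo(sk, 5) bounds the blocking connect to about
	 * five seconds; sock_set_sndtimeo(sk, 0) restores the default of
	 * no timeout.)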
	 */
	sock_set_sndtimeo(sock->sk, 5);
	ret = sock->ops->connect(sock, addr, addr_len, 0);
	sock_set_sndtimeo(sock->sk, 0);
	if (ret < 0)
		return ret;

	if (!test_and_set_bit(CF_CONNECTED, &con->flags))
		log_print("connected to node %d", con->nodeid);

	return 0;
}

static int dlm_sctp_listen_validate(void)
{
	if (!IS_ENABLED(CONFIG_IP_SCTP)) {
		log_print("SCTP is not enabled by this kernel");
		return -EOPNOTSUPP;
	}

	request_module("sctp");
	return 0;
}

static int dlm_sctp_bind_listen(struct socket *sock)
{
	return sctp_bind_addrs(sock, dlm_config.ci_tcp_port);
}

static void dlm_sctp_sockopts(struct socket *sock)
{
	/* Turn off Nagle's algorithm */
	sctp_sock_set_nodelay(sock->sk);
	sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
}

static const struct dlm_proto_ops dlm_sctp_ops = {
	.name = "SCTP",
	.proto = IPPROTO_SCTP,
	.try_new_addr = true,
	.connect = dlm_sctp_connect,
	.sockopts = dlm_sctp_sockopts,
	.bind = dlm_sctp_bind,
	.listen_validate = dlm_sctp_listen_validate,
	.listen_sockopts = dlm_sctp_sockopts,
	.listen_bind = dlm_sctp_bind_listen,
};

int dlm_lowcomms_start(void)
{
	int error = -EINVAL;

	init_local();
	if (!dlm_local_count) {
		error = -ENOTCONN;
		log_print("no local IP address has been set");
		goto fail;
	}

	error = work_start();
	if (error)
		goto fail;

	/* Start listening */
	switch (dlm_config.ci_protocol) {
	case DLM_PROTO_TCP:
		dlm_proto_ops = &dlm_tcp_ops;
		break;
	case DLM_PROTO_SCTP:
		dlm_proto_ops = &dlm_sctp_ops;
		break;
	default:
		log_print("Invalid protocol identifier %d set",
			  dlm_config.ci_protocol);
		error = -EINVAL;
		goto fail_proto_ops;
	}

	error = dlm_listen_for_all();
	if (error)
		goto fail_listen;

	return 0;

fail_listen:
	dlm_proto_ops = NULL;
fail_proto_ops:
	work_stop();
fail:
	return error;
}

void dlm_lowcomms_init(void)
{
	int i;

	for (i = 0; i < CONN_HASH_SIZE; i++)
		INIT_HLIST_HEAD(&connection_hash[i]);

	INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
}

void dlm_lowcomms_exit(void)
{
	struct connection *con;
	int i, idx;

	idx = srcu_read_lock(&connections_srcu);
	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
			spin_lock(&connections_lock);
			hlist_del_rcu(&con->list);
			spin_unlock(&connections_lock);

			if (con->othercon)
				call_srcu(&connections_srcu, &con->othercon->rcu,
					  connection_release);
			call_srcu(&connections_srcu, &con->rcu, connection_release);
		}
	}
	srcu_read_unlock(&connections_srcu, idx);
}
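
/*
 * Illustrative caller sketch (hypothetical; midcomms.c is the real user of
 * this API). A message is built in writequeue space returned via ppc and
 * only hits the wire once it has been committed:
 *
 *	char *ppc;
 *	struct dlm_msg *msg;
 *
 *	msg = dlm_lowcomms_new_msg(nodeid, len, GFP_NOFS, &ppc, NULL, NULL);
 *	if (!msg)
 *		return -ENOMEM;
 *
 *	memcpy(ppc, data, len);			// fill the reserved space
 *	dlm_lowcomms_commit_msg(msg);		// queue the send, drop srcu
 *	dlm_lowcomms_put_msg(msg);		// drop the caller's reference
 *
 * The remaining reference is held by the writequeue and is released once
 * the entry has been fully transmitted (or dropped by close_connection()).
 */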