// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

/*
 * lowcomms.c
 *
 * This is the "low-level" comms layer.
 *
 * It is responsible for sending/receiving messages
 * from other nodes in the cluster.
 *
 * Cluster nodes are referred to by their nodeids. nodeids are
 * simply 32 bit numbers to the locking module - if they need to
 * be expanded for the cluster infrastructure then that is its
 * responsibility. It is this layer's
 * responsibility to resolve these into IP addresses or
 * whatever it needs for inter-node communication.
 *
 * The comms level is two kernel threads that deal mainly with
 * the receiving of messages from other nodes and passing them
 * up to the mid-level comms layer (which understands the
 * message format) for execution by the locking core, and
 * a send thread which does all the setting up of connections
 * to remote nodes and the sending of data. Threads are not allowed
 * to send their own data because it may cause them to wait in times
 * of high load. Also, this way, the sending thread can collect together
 * messages bound for one node and send them in one block.
 *
 * lowcomms will choose to use either TCP or SCTP as its transport layer
 * depending on the configuration variable 'protocol'. This should be set
 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
 * cluster-wide mechanism as it must be the same on all nodes of the cluster
 * for the DLM to function.
 *
 */

#include <asm/ioctls.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <linux/pagemap.h>
#include <linux/file.h>
#include <linux/mutex.h>
#include <linux/sctp.h>
#include <linux/slab.h>
#include <net/sctp/sctp.h>
#include <net/ipv6.h>

#include <trace/events/dlm.h>

#include "dlm_internal.h"
#include "lowcomms.h"
#include "midcomms.h"
#include "memory.h"
#include "config.h"

#define NEEDED_RMEM (4*1024*1024)

struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* So we know who we are in the list */
	/* this semaphore is used to allow parallel recv/send in read
	 * lock mode. When we release a sock we need to hold the write
	 * lock.
	 *
	 * However this is locking code and not nice. When we remove the
	 * othercon handling we can look into other mechanisms to
	 * synchronize io handling and call sock_release() at the right
	 * time.
	 */
	struct rw_semaphore sock_lock;
	unsigned long flags;
#define CF_APP_LIMITED 0
#define CF_RECV_PENDING 1
#define CF_SEND_PENDING 2
#define CF_RECV_INTR 3
#define CF_IO_STOP 4
#define CF_IS_OTHERCON 5
	struct list_head writequeue;	/* List of outgoing writequeue_entries */
	spinlock_t writequeue_lock;
	int retries;
	struct hlist_node list;
	/* due to some connect()/accept() races we can currently end up
	 * with a crossed-over second connection attempt for one node.
	 *
	 * There is a solution to avoid the race by introducing a connect
	 * rule, e.g. only the side with our_nodeid > nodeid_to_connect is
	 * allowed to connect. The other side can still connect, but that
	 * is then only taken as a signal that the other side wants to
	 * reconnect.
	 *
	 * However changing to this behaviour would break backwards
	 * compatibility. In a DLM protocol major version upgrade we
	 * should remove this!
	 */
	struct connection *othercon;
	struct work_struct rwork; /* receive worker */
	struct work_struct swork; /* send worker */
	unsigned char rx_leftover_buf[DLM_MAX_SOCKET_BUFSIZE];
	int rx_leftover;
	int mark;
	int addr_count;
	int curr_addr_index;
	struct sockaddr_storage addr[DLM_MAX_ADDR_COUNT];
	spinlock_t addrs_lock;
	struct rcu_head rcu;
};
#define sock2con(x) ((struct connection *)(x)->sk_user_data)

struct listen_connection {
	struct socket *sock;
	struct work_struct rwork;
};

#define DLM_WQ_REMAIN_BYTES(e) (PAGE_SIZE - e->end)
#define DLM_WQ_LENGTH_BYTES(e) (e->end - e->offset)

/* An entry waiting to be sent */
struct writequeue_entry {
	struct list_head list;
	struct page *page;
	int offset;
	int len;
	int end;
	int users;
	bool dirty;
	struct connection *con;
	struct list_head msgs;
	struct kref ref;
};
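
/* Worked example (illustrative, not part of the build): a page-backed
 * entry fills from the front; [offset, end) holds the not-yet-sent bytes
 * once all users have committed. Assuming PAGE_SIZE is 4096, a fresh
 * entry has offset = end = 0, so DLM_WQ_REMAIN_BYTES(e) is 4096. After
 * two committed messages of 96 bytes each, end = 192 and len = 192. If
 * send_to_sock() then transmits 100 bytes, writequeue_entry_complete()
 * leaves offset = 100, len = 92 and marks the entry dirty, so on a
 * reconnect it is dropped rather than resumed mid-message.
 */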

struct dlm_msg {
	struct writequeue_entry *entry;
	struct dlm_msg *orig_msg;
	bool retransmit;
	void *ppc;
	int len;
	int idx; /* new()/commit() idx exchange */

	struct list_head list;
	struct kref ref;
};

struct processqueue_entry {
	unsigned char *buf;
	int nodeid;
	int buflen;

	struct list_head list;
};

struct dlm_proto_ops {
	bool try_new_addr;
	const char *name;
	int proto;

	int (*connect)(struct connection *con, struct socket *sock,
		       struct sockaddr *addr, int addr_len);
	void (*sockopts)(struct socket *sock);
	int (*bind)(struct socket *sock);
	int (*listen_validate)(void);
	void (*listen_sockopts)(struct socket *sock);
	int (*listen_bind)(struct socket *sock);
};

static struct listen_sock_callbacks {
	void (*sk_error_report)(struct sock *);
	void (*sk_data_ready)(struct sock *);
	void (*sk_state_change)(struct sock *);
	void (*sk_write_space)(struct sock *);
} listen_sock;

static struct listen_connection listen_con;
static struct sockaddr_storage dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;

/* Work queues */
static struct workqueue_struct *io_workqueue;
static struct workqueue_struct *process_workqueue;

static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_SPINLOCK(connections_lock);
DEFINE_STATIC_SRCU(connections_srcu);

static const struct dlm_proto_ops *dlm_proto_ops;

#define DLM_IO_SUCCESS 0
#define DLM_IO_END 1
#define DLM_IO_EOF 2
#define DLM_IO_RESCHED 3

static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);
static void process_dlm_messages(struct work_struct *work);

static DECLARE_WORK(process_work, process_dlm_messages);
static DEFINE_SPINLOCK(processqueue_lock);
static bool process_dlm_messages_pending;
static LIST_HEAD(processqueue);

bool dlm_lowcomms_is_running(void)
{
	return !!listen_con.sock;
}

static void lowcomms_queue_swork(struct connection *con)
{
	assert_spin_locked(&con->writequeue_lock);

	if (!test_bit(CF_IO_STOP, &con->flags) &&
	    !test_bit(CF_APP_LIMITED, &con->flags) &&
	    !test_and_set_bit(CF_SEND_PENDING, &con->flags))
		queue_work(io_workqueue, &con->swork);
}

static void lowcomms_queue_rwork(struct connection *con)
{
#ifdef CONFIG_LOCKDEP
	WARN_ON_ONCE(!lockdep_sock_is_held(con->sock->sk));
#endif

	if (!test_bit(CF_IO_STOP, &con->flags) &&
	    !test_and_set_bit(CF_RECV_PENDING, &con->flags))
		queue_work(io_workqueue, &con->rwork);
}

static void writequeue_entry_ctor(void *data)
{
	struct writequeue_entry *entry = data;

	INIT_LIST_HEAD(&entry->msgs);
}

struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void)
{
	return kmem_cache_create("dlm_writequeue", sizeof(struct writequeue_entry),
				 0, 0, writequeue_entry_ctor);
}

struct kmem_cache *dlm_lowcomms_msg_cache_create(void)
{
	return kmem_cache_create("dlm_msg", sizeof(struct dlm_msg), 0, 0, NULL);
}

/* writequeue_lock must be held */
static struct writequeue_entry *con_next_wq(struct connection *con)
{
	struct writequeue_entry *e;

	e = list_first_entry_or_null(&con->writequeue, struct writequeue_entry,
				     list);
	/* if len is zero nothing is to send, if there are users filling
	 * buffers we wait until the users are done so we can send more.
	 */
	if (!e || e->users || e->len == 0)
		return NULL;

	return e;
}

static struct connection *__find_con(int nodeid, int r)
{
	struct connection *con;

	hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
		if (con->nodeid == nodeid)
			return con;
	}

	return NULL;
}

static void dlm_con_init(struct connection *con, int nodeid)
{
	con->nodeid = nodeid;
	init_rwsem(&con->sock_lock);
	INIT_LIST_HEAD(&con->writequeue);
	spin_lock_init(&con->writequeue_lock);
	INIT_WORK(&con->swork, process_send_sockets);
	INIT_WORK(&con->rwork, process_recv_sockets);
	spin_lock_init(&con->addrs_lock);
}

/*
 * If 'alloc' is zero then we don't attempt to create a new
 * connection structure for this node.
 */
static struct connection *nodeid2con(int nodeid, gfp_t alloc)
{
	struct connection *con, *tmp;
	int r;

	r = nodeid_hash(nodeid);
	con = __find_con(nodeid, r);
	if (con || !alloc)
		return con;

	con = kzalloc(sizeof(*con), alloc);
	if (!con)
		return NULL;

	dlm_con_init(con, nodeid);

	spin_lock(&connections_lock);
	/* Because multiple workqueues/threads call this function it can
	 * race on multiple CPUs. Instead of locking the hot path
	 * __find_con() we just check again, in the rare case of recently
	 * added nodes, under protection of connections_lock. If this is
	 * the case we abort our connection creation and return the
	 * existing connection.
	 */
	tmp = __find_con(nodeid, r);
	if (tmp) {
		spin_unlock(&connections_lock);
		kfree(con);
		return tmp;
	}

	hlist_add_head_rcu(&con->list, &connection_hash[r]);
	spin_unlock(&connections_lock);

	return con;
}

static int addr_compare(const struct sockaddr_storage *x,
			const struct sockaddr_storage *y)
{
	switch (x->ss_family) {
	case AF_INET: {
		struct sockaddr_in *sinx = (struct sockaddr_in *)x;
		struct sockaddr_in *siny = (struct sockaddr_in *)y;
		if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
			return 0;
		if (sinx->sin_port != siny->sin_port)
			return 0;
		break;
	}
	case AF_INET6: {
		struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
		struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
		if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
			return 0;
		if (sinx->sin6_port != siny->sin6_port)
			return 0;
		break;
	}
	default:
		return 0;
	}
	return 1;
}

static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
			  struct sockaddr *sa_out, bool try_new_addr,
			  unsigned int *mark)
{
	struct sockaddr_storage sas;
	struct connection *con;
	int idx;

	if (!dlm_local_count)
		return -1;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (!con) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	spin_lock(&con->addrs_lock);
	if (!con->addr_count) {
		spin_unlock(&con->addrs_lock);
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	memcpy(&sas, &con->addr[con->curr_addr_index],
	       sizeof(struct sockaddr_storage));

	if (try_new_addr) {
		con->curr_addr_index++;
		if (con->curr_addr_index == con->addr_count)
			con->curr_addr_index = 0;
	}

	*mark = con->mark;
	spin_unlock(&con->addrs_lock);

	if (sas_out)
		memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));

	if (!sa_out) {
		srcu_read_unlock(&connections_srcu, idx);
		return 0;
	}

	if (dlm_local_addr[0].ss_family == AF_INET) {
		struct sockaddr_in *in4 = (struct sockaddr_in *) &sas;
		struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
		ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
	} else {
		struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &sas;
		struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
		ret6->sin6_addr = in6->sin6_addr;
	}

	srcu_read_unlock(&connections_srcu, idx);
	return 0;
}

static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid,
			  unsigned int *mark)
{
	struct connection *con;
	int i, idx, addr_i;

	idx = srcu_read_lock(&connections_srcu);
	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
			WARN_ON_ONCE(!con->addr_count);

			spin_lock(&con->addrs_lock);
			for (addr_i = 0; addr_i < con->addr_count; addr_i++) {
				if (addr_compare(&con->addr[addr_i], addr)) {
					*nodeid = con->nodeid;
					*mark = con->mark;
					spin_unlock(&con->addrs_lock);
					srcu_read_unlock(&connections_srcu, idx);
					return 0;
				}
			}
			spin_unlock(&con->addrs_lock);
		}
	}
	srcu_read_unlock(&connections_srcu, idx);

	return -ENOENT;
}

static bool dlm_lowcomms_con_has_addr(const struct connection *con,
				      const struct sockaddr_storage *addr)
{
	int i;

	for (i = 0; i < con->addr_count; i++) {
		if (addr_compare(&con->addr[i], addr))
			return true;
	}

	return false;
}

int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
{
	struct connection *con;
	bool ret;
	int idx;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, GFP_NOFS);
	if (!con) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOMEM;
	}

	spin_lock(&con->addrs_lock);
	if (!con->addr_count) {
		memcpy(&con->addr[0], addr, sizeof(*addr));
		con->addr_count = 1;
		con->mark = dlm_config.ci_mark;
		spin_unlock(&con->addrs_lock);
		srcu_read_unlock(&connections_srcu, idx);
		return 0;
	}

	ret = dlm_lowcomms_con_has_addr(con, addr);
	if (ret) {
		spin_unlock(&con->addrs_lock);
		srcu_read_unlock(&connections_srcu, idx);
		return -EEXIST;
	}

	if (con->addr_count >= DLM_MAX_ADDR_COUNT) {
		spin_unlock(&con->addrs_lock);
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOSPC;
	}

	memcpy(&con->addr[con->addr_count++], addr, sizeof(*addr));
	spin_unlock(&con->addrs_lock);
	srcu_read_unlock(&connections_srcu, idx);
	return 0;
}
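
/* Illustrative sketch (hypothetical caller, not part of the build): the
 * configuration layer is expected to register every address of a node
 * before any connection attempt, e.g.:
 *
 *	struct sockaddr_storage addr;
 *	int error;
 *
 *	// fill addr with one of the node's IPv4/IPv6 addresses
 *	error = dlm_lowcomms_addr(nodeid, &addr, sizeof(addr));
 *	// -EEXIST: address already known, -ENOSPC: DLM_MAX_ADDR_COUNT hit
 *
 * Further addresses for the same nodeid extend con->addr[] up to
 * DLM_MAX_ADDR_COUNT entries; with SCTP, nodeid_to_addr() can rotate
 * through them via try_new_addr.
 */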

/* Data available on socket or listen socket received a connect */
static void lowcomms_data_ready(struct sock *sk)
{
	struct connection *con = sock2con(sk);

	set_bit(CF_RECV_INTR, &con->flags);
	lowcomms_queue_rwork(con);
}

static void lowcomms_write_space(struct sock *sk)
{
	struct connection *con = sock2con(sk);

	clear_bit(SOCK_NOSPACE, &con->sock->flags);

	spin_lock_bh(&con->writequeue_lock);
	if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
		con->sock->sk->sk_write_pending--;
		clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
	}

	lowcomms_queue_swork(con);
	spin_unlock_bh(&con->writequeue_lock);
}

static void lowcomms_state_change(struct sock *sk)
{
	/* SCTP layer is not calling sk_data_ready when the connection
	 * is done, so we catch the signal through here.
	 */
	if (sk->sk_shutdown == RCV_SHUTDOWN)
		lowcomms_data_ready(sk);
}

static void lowcomms_listen_data_ready(struct sock *sk)
{
	queue_work(io_workqueue, &listen_con.rwork);
}

int dlm_lowcomms_connect_node(int nodeid)
{
	struct connection *con;
	int idx;

	if (nodeid == dlm_our_nodeid())
		return 0;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!con)) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	down_read(&con->sock_lock);
	if (!con->sock) {
		spin_lock_bh(&con->writequeue_lock);
		lowcomms_queue_swork(con);
		spin_unlock_bh(&con->writequeue_lock);
	}
	up_read(&con->sock_lock);
	srcu_read_unlock(&connections_srcu, idx);

	cond_resched();
	return 0;
}

int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark)
{
	struct connection *con;
	int idx;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (!con) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	spin_lock(&con->addrs_lock);
	con->mark = mark;
	spin_unlock(&con->addrs_lock);
	srcu_read_unlock(&connections_srcu, idx);
	return 0;
}

static void lowcomms_error_report(struct sock *sk)
{
	struct connection *con = sock2con(sk);
	struct inet_sock *inet;

	inet = inet_sk(sk);
	switch (sk->sk_family) {
	case AF_INET:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI4, dport %d, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &inet->inet_daddr,
				   ntohs(inet->inet_dport), sk->sk_err,
				   sk->sk_err_soft);
		break;
#if IS_ENABLED(CONFIG_IPV6)
	case AF_INET6:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI6c, "
				   "dport %d, sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &sk->sk_v6_daddr,
				   ntohs(inet->inet_dport), sk->sk_err,
				   sk->sk_err_soft);
		break;
#endif
	default:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "invalid socket family %d set, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   sk->sk_family, sk->sk_err, sk->sk_err_soft);
		break;
	}

	dlm_midcomms_unack_msg_resend(con->nodeid);

	listen_sock.sk_error_report(sk);
}

static void restore_callbacks(struct sock *sk)
{
#ifdef CONFIG_LOCKDEP
	WARN_ON_ONCE(!lockdep_sock_is_held(sk));
#endif

	sk->sk_user_data = NULL;
	sk->sk_data_ready = listen_sock.sk_data_ready;
	sk->sk_state_change = listen_sock.sk_state_change;
	sk->sk_write_space = listen_sock.sk_write_space;
	sk->sk_error_report = listen_sock.sk_error_report;
}

/* Make a socket active */
static void add_sock(struct socket *sock, struct connection *con)
{
	struct sock *sk = sock->sk;

	lock_sock(sk);
	con->sock = sock;

	sk->sk_user_data = con;
	sk->sk_data_ready = lowcomms_data_ready;
	sk->sk_write_space = lowcomms_write_space;
	if (dlm_config.ci_protocol == DLM_PROTO_SCTP)
		sk->sk_state_change = lowcomms_state_change;
	sk->sk_allocation = GFP_NOFS;
	sk->sk_error_report = lowcomms_error_report;
	release_sock(sk);
}

/* Add the port number to an IPv6 or IPv4 sockaddr and return the address
   length */
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
			  int *addr_len)
{
	saddr->ss_family = dlm_local_addr[0].ss_family;
	if (saddr->ss_family == AF_INET) {
		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
		in4_addr->sin_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in);
		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
	} else {
		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
		in6_addr->sin6_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in6);
	}
	memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
}
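
/* Worked example (illustrative; 21064 as the usual dlm port is an
 * assumption of this example): with an IPv4 local address,
 * make_sockaddr(&addr, 21064, &addr_len) forces addr.ss_family to
 * AF_INET, sets sin_port = cpu_to_be16(21064), returns
 * addr_len = sizeof(struct sockaddr_in) and zeroes everything past
 * addr_len, giving a canonical form. accept_from_sock() calls this
 * with port 0 so the peer address compares equal in addr_to_nodeid()
 * regardless of the peer's source port.
 */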

static void dlm_page_release(struct kref *kref)
{
	struct writequeue_entry *e = container_of(kref, struct writequeue_entry,
						  ref);

	__free_page(e->page);
	dlm_free_writequeue(e);
}

static void dlm_msg_release(struct kref *kref)
{
	struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);

	kref_put(&msg->entry->ref, dlm_page_release);
	dlm_free_msg(msg);
}

static void free_entry(struct writequeue_entry *e)
{
	struct dlm_msg *msg, *tmp;

	list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
		if (msg->orig_msg) {
			msg->orig_msg->retransmit = false;
			kref_put(&msg->orig_msg->ref, dlm_msg_release);
		}

		list_del(&msg->list);
		kref_put(&msg->ref, dlm_msg_release);
	}

	list_del(&e->list);
	kref_put(&e->ref, dlm_page_release);
}

static void dlm_close_sock(struct socket **sock)
{
	lock_sock((*sock)->sk);
	restore_callbacks((*sock)->sk);
	release_sock((*sock)->sk);

	sock_release(*sock);
	*sock = NULL;
}

static void allow_connection_io(struct connection *con)
{
	if (con->othercon)
		clear_bit(CF_IO_STOP, &con->othercon->flags);
	clear_bit(CF_IO_STOP, &con->flags);
}

static void stop_connection_io(struct connection *con)
{
	if (con->othercon)
		stop_connection_io(con->othercon);

	down_write(&con->sock_lock);
	if (con->sock) {
		lock_sock(con->sock->sk);
		restore_callbacks(con->sock->sk);

		spin_lock_bh(&con->writequeue_lock);
		set_bit(CF_IO_STOP, &con->flags);
		spin_unlock_bh(&con->writequeue_lock);
		release_sock(con->sock->sk);
	} else {
		spin_lock_bh(&con->writequeue_lock);
		set_bit(CF_IO_STOP, &con->flags);
		spin_unlock_bh(&con->writequeue_lock);
	}
	up_write(&con->sock_lock);

	cancel_work_sync(&con->swork);
	cancel_work_sync(&con->rwork);
}

/* Close a remote connection and tidy up */
static void close_connection(struct connection *con, bool and_other)
{
	struct writequeue_entry *e;

	if (con->othercon && and_other)
		close_connection(con->othercon, false);

	down_write(&con->sock_lock);
	if (!con->sock) {
		up_write(&con->sock_lock);
		return;
	}

	dlm_close_sock(&con->sock);

	/* if a writequeue entry was only sent half way, we drop the
	 * whole entry on reconnection so that we do not restart in the
	 * middle of a message, which would confuse the other end.
	 *
	 * we can always drop messages because of retransmits, but what
	 * we cannot allow is transmitting half messages, which may be
	 * processed at the other side.
	 *
	 * our policy is to start from a clean state on disconnect; we
	 * don't know what was sent/received at the transport layer in
	 * this case.
	 */
	spin_lock_bh(&con->writequeue_lock);
	if (!list_empty(&con->writequeue)) {
		e = list_first_entry(&con->writequeue, struct writequeue_entry,
				     list);
		if (e->dirty)
			free_entry(e);
	}
	spin_unlock_bh(&con->writequeue_lock);

	con->rx_leftover = 0;
	con->retries = 0;
	clear_bit(CF_APP_LIMITED, &con->flags);
	clear_bit(CF_RECV_PENDING, &con->flags);
	clear_bit(CF_SEND_PENDING, &con->flags);
	up_write(&con->sock_lock);
}

static struct processqueue_entry *new_processqueue_entry(int nodeid,
							 int buflen)
{
	struct processqueue_entry *pentry;

	pentry = kmalloc(sizeof(*pentry), GFP_NOFS);
	if (!pentry)
		return NULL;

	pentry->buf = kmalloc(buflen, GFP_NOFS);
	if (!pentry->buf) {
		kfree(pentry);
		return NULL;
	}

	pentry->nodeid = nodeid;
	return pentry;
}

static void free_processqueue_entry(struct processqueue_entry *pentry)
{
	kfree(pentry->buf);
	kfree(pentry);
}

struct dlm_processed_nodes {
	int nodeid;

	struct list_head list;
};

static void add_processed_node(int nodeid, struct list_head *processed_nodes)
{
	struct dlm_processed_nodes *n;

	list_for_each_entry(n, processed_nodes, list) {
		/* we already remembered this node */
		if (n->nodeid == nodeid)
			return;
	}

	/* if the allocation fails, in the worst case we simply don't
	 * send an ack back. We will try again next time.
	 */
	n = kmalloc(sizeof(*n), GFP_NOFS);
	if (!n)
		return;

	n->nodeid = nodeid;
	list_add(&n->list, processed_nodes);
}

static void process_dlm_messages(struct work_struct *work)
{
	struct dlm_processed_nodes *n, *n_tmp;
	struct processqueue_entry *pentry;
	LIST_HEAD(processed_nodes);

	spin_lock(&processqueue_lock);
	pentry = list_first_entry_or_null(&processqueue,
					  struct processqueue_entry, list);
	if (WARN_ON_ONCE(!pentry)) {
		spin_unlock(&processqueue_lock);
		return;
	}

	list_del(&pentry->list);
	spin_unlock(&processqueue_lock);

	for (;;) {
		dlm_process_incoming_buffer(pentry->nodeid, pentry->buf,
					    pentry->buflen);
		add_processed_node(pentry->nodeid, &processed_nodes);
		free_processqueue_entry(pentry);

		spin_lock(&processqueue_lock);
		pentry = list_first_entry_or_null(&processqueue,
						  struct processqueue_entry, list);
		if (!pentry) {
			process_dlm_messages_pending = false;
			spin_unlock(&processqueue_lock);
			break;
		}

		list_del(&pentry->list);
		spin_unlock(&processqueue_lock);
	}

	/* send ack back after we processed a couple of messages */
	list_for_each_entry_safe(n, n_tmp, &processed_nodes, list) {
		list_del(&n->list);
		dlm_midcomms_receive_done(n->nodeid);
		kfree(n);
	}
}

/* Data received from remote end */
static int receive_from_sock(struct connection *con, int buflen)
{
	struct processqueue_entry *pentry;
	int ret, buflen_real;
	struct msghdr msg;
	struct kvec iov;

	pentry = new_processqueue_entry(con->nodeid, buflen);
	if (!pentry)
		return DLM_IO_RESCHED;

	memcpy(pentry->buf, con->rx_leftover_buf, con->rx_leftover);

	/* calculate new buffer parameters regarding the last receive and
	 * possible leftover bytes
	 */
	iov.iov_base = pentry->buf + con->rx_leftover;
	iov.iov_len = buflen - con->rx_leftover;

	memset(&msg, 0, sizeof(msg));
	msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	clear_bit(CF_RECV_INTR, &con->flags);
again:
	ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
			     msg.msg_flags);
	trace_dlm_recv(con->nodeid, ret);
	if (ret == -EAGAIN) {
		lock_sock(con->sock->sk);
		if (test_and_clear_bit(CF_RECV_INTR, &con->flags)) {
			release_sock(con->sock->sk);
			goto again;
		}

		clear_bit(CF_RECV_PENDING, &con->flags);
		release_sock(con->sock->sk);
		free_processqueue_entry(pentry);
		return DLM_IO_END;
	} else if (ret == 0) {
		/* close will clear CF_RECV_PENDING */
		free_processqueue_entry(pentry);
		return DLM_IO_EOF;
	} else if (ret < 0) {
		free_processqueue_entry(pentry);
		return ret;
	}

	/* new buflen according to the bytes read and the leftover from
	 * the last receive
	 */
	buflen_real = ret + con->rx_leftover;
	ret = dlm_validate_incoming_buffer(con->nodeid, pentry->buf,
					   buflen_real);
	if (ret < 0) {
		free_processqueue_entry(pentry);
		return ret;
	}

	pentry->buflen = ret;

	/* calculate the leftover bytes and move them to the beginning of
	 * the receive buffer, so the next receive has the full message
	 * at the start address of the receive buffer.
	 */
	con->rx_leftover = buflen_real - ret;
	memmove(con->rx_leftover_buf, pentry->buf + ret,
		con->rx_leftover);

	spin_lock(&processqueue_lock);
	list_add_tail(&pentry->list, &processqueue);
	if (!process_dlm_messages_pending) {
		process_dlm_messages_pending = true;
		queue_work(process_workqueue, &process_work);
	}
	spin_unlock(&processqueue_lock);

	return DLM_IO_SUCCESS;
}
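
/* Worked example (illustrative, not part of the build): with
 * rx_leftover = 10 stale bytes and a receive of ret = 100 bytes,
 * buflen_real is 110. If dlm_validate_incoming_buffer() accepts only
 * 104 bytes of complete messages, pentry->buflen becomes 104 and the
 * trailing 6 bytes are copied back to rx_leftover_buf, so the next
 * receive starts with a whole message header at the buffer start.
 */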

/* Listening socket is busy, accept a connection */
static int accept_from_sock(void)
{
	struct sockaddr_storage peeraddr;
	int len, idx, result, nodeid;
	struct connection *newcon;
	struct socket *newsock;
	unsigned int mark;

	result = kernel_accept(listen_con.sock, &newsock, O_NONBLOCK);
	if (result == -EAGAIN)
		return DLM_IO_END;
	else if (result < 0)
		goto accept_err;

	/* Get the connected socket's peer */
	memset(&peeraddr, 0, sizeof(peeraddr));
	len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2);
	if (len < 0) {
		result = -ECONNABORTED;
		goto accept_err;
	}

	/* Get the new node's NODEID */
	make_sockaddr(&peeraddr, 0, &len);
	if (addr_to_nodeid(&peeraddr, &nodeid, &mark)) {
		switch (peeraddr.ss_family) {
		case AF_INET: {
			struct sockaddr_in *sin = (struct sockaddr_in *)&peeraddr;

			log_print("connect from non cluster IPv4 node %pI4",
				  &sin->sin_addr);
			break;
		}
#if IS_ENABLED(CONFIG_IPV6)
		case AF_INET6: {
			struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&peeraddr;

			log_print("connect from non cluster IPv6 node %pI6c",
				  &sin6->sin6_addr);
			break;
		}
#endif
		default:
			log_print("invalid family from non cluster node");
			break;
		}

		sock_release(newsock);
		return -1;
	}

	log_print("got connection from %d", nodeid);

	/* Check to see if we already have a connection to this node. This
	 * could happen if the two nodes initiate a connection at roughly
	 * the same time and the connections cross on the wire.
	 * In this case we store the incoming one in "othercon"
	 */
	idx = srcu_read_lock(&connections_srcu);
	newcon = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!newcon)) {
		srcu_read_unlock(&connections_srcu, idx);
		result = -ENOENT;
		goto accept_err;
	}

	sock_set_mark(newsock->sk, mark);

	down_write(&newcon->sock_lock);
	if (newcon->sock) {
		struct connection *othercon = newcon->othercon;

		if (!othercon) {
			othercon = kzalloc(sizeof(*othercon), GFP_NOFS);
			if (!othercon) {
				log_print("failed to allocate incoming socket");
				up_write(&newcon->sock_lock);
				srcu_read_unlock(&connections_srcu, idx);
				result = -ENOMEM;
				goto accept_err;
			}

			dlm_con_init(othercon, nodeid);
			lockdep_set_subclass(&othercon->sock_lock, 1);
			newcon->othercon = othercon;
			set_bit(CF_IS_OTHERCON, &othercon->flags);
		} else {
			/* close other sock con if we have something new */
			close_connection(othercon, false);
		}

		down_write(&othercon->sock_lock);
		add_sock(newsock, othercon);

		/* check if we received something while adding */
		lock_sock(othercon->sock->sk);
		lowcomms_queue_rwork(othercon);
		release_sock(othercon->sock->sk);
		up_write(&othercon->sock_lock);
	} else {
		/* accept copies the sk after we've saved the callbacks, so we
		   don't want to save them a second time or comm errors will
		   result in calling sk_error_report recursively. */
		add_sock(newsock, newcon);

		/* check if we received something while adding */
		lock_sock(newcon->sock->sk);
		lowcomms_queue_rwork(newcon);
		release_sock(newcon->sock->sk);
	}
	up_write(&newcon->sock_lock);
	srcu_read_unlock(&connections_srcu, idx);

	return DLM_IO_SUCCESS;

accept_err:
	if (newsock)
		sock_release(newsock);

	return result;
}

/*
 * writequeue_entry_complete - try to delete and free write queue entry
 * @e: write queue entry to try to delete
 * @completed: bytes completed
 *
 * writequeue_lock must be held.
 */
static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
{
	e->offset += completed;
	e->len -= completed;
	/* signal that page was half way transmitted */
	e->dirty = true;

	if (e->len == 0 && e->users == 0)
		free_entry(e);
}

/*
 * sctp_bind_addrs - bind a SCTP socket to all our addresses
 */
static int sctp_bind_addrs(struct socket *sock, uint16_t port)
{
	struct sockaddr_storage localaddr;
	struct sockaddr *addr = (struct sockaddr *)&localaddr;
	int i, addr_len, result = 0;

	for (i = 0; i < dlm_local_count; i++) {
		memcpy(&localaddr, &dlm_local_addr[i], sizeof(localaddr));
		make_sockaddr(&localaddr, port, &addr_len);

		if (!i)
			result = kernel_bind(sock, addr, addr_len);
		else
			result = sock_bind_add(sock->sk, addr, addr_len);

		if (result < 0) {
			log_print("Can't bind to %d addr number %d, %d.\n",
				  port, i + 1, result);
			break;
		}
	}
	return result;
}

/* Get local addresses */
static void init_local(void)
{
	struct sockaddr_storage sas;
	int i;

	dlm_local_count = 0;
	for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
		if (dlm_our_addr(&sas, i))
			break;

		memcpy(&dlm_local_addr[dlm_local_count++], &sas, sizeof(sas));
	}
}

static struct writequeue_entry *new_writequeue_entry(struct connection *con)
{
	struct writequeue_entry *entry;

	entry = dlm_allocate_writequeue();
	if (!entry)
		return NULL;

	entry->page = alloc_page(GFP_ATOMIC | __GFP_ZERO);
	if (!entry->page) {
		dlm_free_writequeue(entry);
		return NULL;
	}

	entry->offset = 0;
	entry->len = 0;
	entry->end = 0;
	entry->dirty = false;
	entry->con = con;
	entry->users = 1;
	kref_init(&entry->ref);
	return entry;
}

static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
					     char **ppc, void (*cb)(void *data),
					     void *data)
{
	struct writequeue_entry *e;

	spin_lock_bh(&con->writequeue_lock);
	if (!list_empty(&con->writequeue)) {
		e = list_last_entry(&con->writequeue, struct writequeue_entry, list);
		if (DLM_WQ_REMAIN_BYTES(e) >= len) {
			kref_get(&e->ref);

			*ppc = page_address(e->page) + e->end;
			if (cb)
				cb(data);

			e->end += len;
			e->users++;
			goto out;
		}
	}

	e = new_writequeue_entry(con);
	if (!e)
		goto out;

	kref_get(&e->ref);
	*ppc = page_address(e->page);
	e->end += len;
	if (cb)
		cb(data);

	list_add_tail(&e->list, &con->writequeue);

out:
	spin_unlock_bh(&con->writequeue_lock);
	return e;
}

static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
						gfp_t allocation, char **ppc,
						void (*cb)(void *data),
						void *data)
{
	struct writequeue_entry *e;
	struct dlm_msg *msg;

	msg = dlm_allocate_msg(allocation);
	if (!msg)
		return NULL;

	kref_init(&msg->ref);

	e = new_wq_entry(con, len, ppc, cb, data);
	if (!e) {
		dlm_free_msg(msg);
		return NULL;
	}

	msg->retransmit = false;
	msg->orig_msg = NULL;
	msg->ppc = *ppc;
	msg->len = len;
	msg->entry = e;

	return msg;
}

/* avoid a false positive for connections_srcu; the unlock happens in
 * dlm_lowcomms_commit_msg(), which must be called on success
 */
#ifndef __CHECKER__
struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
				     char **ppc, void (*cb)(void *data),
				     void *data)
{
	struct connection *con;
	struct dlm_msg *msg;
	int idx;

	if (len > DLM_MAX_SOCKET_BUFSIZE ||
	    len < sizeof(struct dlm_header)) {
		BUILD_BUG_ON(PAGE_SIZE < DLM_MAX_SOCKET_BUFSIZE);
		log_print("failed to allocate a buffer of size %d", len);
		WARN_ON_ONCE(1);
		return NULL;
	}

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!con)) {
		srcu_read_unlock(&connections_srcu, idx);
		return NULL;
	}

	msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, data);
	if (!msg) {
		srcu_read_unlock(&connections_srcu, idx);
		return NULL;
	}

	/* for dlm_lowcomms_commit_msg() */
	kref_get(&msg->ref);
	/* we assume that on success commit must be called */
	msg->idx = idx;
	return msg;
}
#endif

static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
{
	struct writequeue_entry *e = msg->entry;
	struct connection *con = e->con;
	int users;

	spin_lock_bh(&con->writequeue_lock);
	kref_get(&msg->ref);
	list_add(&msg->list, &e->msgs);

	users = --e->users;
	if (users)
		goto out;

	e->len = DLM_WQ_LENGTH_BYTES(e);

	lowcomms_queue_swork(con);

out:
	spin_unlock_bh(&con->writequeue_lock);
	return;
}

/* avoid a false positive for connections_srcu; the lock was taken in
 * dlm_lowcomms_new_msg()
 */
#ifndef __CHECKER__
void dlm_lowcomms_commit_msg(struct dlm_msg *msg)
{
	_dlm_lowcomms_commit_msg(msg);
	srcu_read_unlock(&connections_srcu, msg->idx);
	/* because dlm_lowcomms_new_msg() */
	kref_put(&msg->ref, dlm_msg_release);
}
#endif

void dlm_lowcomms_put_msg(struct dlm_msg *msg)
{
	kref_put(&msg->ref, dlm_msg_release);
}

/* does not hold connections_srcu; used from the lowcomms_error_report
 * path only
 */
int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
{
	struct dlm_msg *msg_resend;
	char *ppc;

	if (msg->retransmit)
		return 1;

	msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len,
					      GFP_ATOMIC, &ppc, NULL, NULL);
	if (!msg_resend)
		return -ENOMEM;

	msg->retransmit = true;
	kref_get(&msg->ref);
	msg_resend->orig_msg = msg;

	memcpy(ppc, msg->ppc, msg->len);
	_dlm_lowcomms_commit_msg(msg_resend);
	dlm_lowcomms_put_msg(msg_resend);

	return 0;
}
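
/* Illustrative sketch (hypothetical caller, not part of the build): the
 * writequeue message API is used as allocate + fill + commit:
 *
 *	char *ppc;
 *	struct dlm_msg *msg;
 *
 *	msg = dlm_lowcomms_new_msg(nodeid, len, GFP_NOFS, &ppc, NULL, NULL);
 *	if (!msg)
 *		return -ENOMEM;
 *
 *	memcpy(ppc, buf, len);		// fill the reserved writequeue space
 *	dlm_lowcomms_commit_msg(msg);	// may queue swork, drops srcu idx
 *
 * A caller that never retransmits can call dlm_lowcomms_put_msg() right
 * after commit; one that may need dlm_lowcomms_resend_msg() on socket
 * errors keeps its reference until the message has been acked.
 */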

/* Send a message */
static int send_to_sock(struct connection *con)
{
	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	struct writequeue_entry *e;
	int len, offset, ret;

	spin_lock_bh(&con->writequeue_lock);
	e = con_next_wq(con);
	if (!e) {
		clear_bit(CF_SEND_PENDING, &con->flags);
		spin_unlock_bh(&con->writequeue_lock);
		return DLM_IO_END;
	}

	len = e->len;
	offset = e->offset;
	WARN_ON_ONCE(len == 0 && e->users == 0);
	spin_unlock_bh(&con->writequeue_lock);

	ret = kernel_sendpage(con->sock, e->page, offset, len,
			      msg_flags);
	trace_dlm_send(con->nodeid, ret);
	if (ret == -EAGAIN || ret == 0) {
		lock_sock(con->sock->sk);
		spin_lock_bh(&con->writequeue_lock);
		if (test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
		    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
			/* Notify TCP that we're limited by the
			 * application window size.
			 */
			set_bit(SOCK_NOSPACE, &con->sock->sk->sk_socket->flags);
			con->sock->sk->sk_write_pending++;

			clear_bit(CF_SEND_PENDING, &con->flags);
			spin_unlock_bh(&con->writequeue_lock);
			release_sock(con->sock->sk);

			/* wait for write_space() event */
			return DLM_IO_END;
		}
		spin_unlock_bh(&con->writequeue_lock);
		release_sock(con->sock->sk);

		return DLM_IO_RESCHED;
	} else if (ret < 0) {
		return ret;
	}

	spin_lock_bh(&con->writequeue_lock);
	writequeue_entry_complete(e, ret);
	spin_unlock_bh(&con->writequeue_lock);

	return DLM_IO_SUCCESS;
}

static void clean_one_writequeue(struct connection *con)
{
	struct writequeue_entry *e, *safe;

	spin_lock_bh(&con->writequeue_lock);
	list_for_each_entry_safe(e, safe, &con->writequeue, list) {
		free_entry(e);
	}
	spin_unlock_bh(&con->writequeue_lock);
}

static void connection_release(struct rcu_head *rcu)
{
	struct connection *con = container_of(rcu, struct connection, rcu);

	WARN_ON_ONCE(!list_empty(&con->writequeue));
	WARN_ON_ONCE(con->sock);
	kfree(con);
}

/* Called from recovery when it knows that a node has
   left the cluster */
int dlm_lowcomms_close(int nodeid)
{
	struct connection *con;
	int idx;

	log_print("closing connection to node %d", nodeid);

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!con)) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	stop_connection_io(con);
	log_print("io handling for node: %d stopped", nodeid);
	close_connection(con, true);

	spin_lock(&connections_lock);
	hlist_del_rcu(&con->list);
	spin_unlock(&connections_lock);

	clean_one_writequeue(con);
	call_srcu(&connections_srcu, &con->rcu, connection_release);
	if (con->othercon) {
		clean_one_writequeue(con->othercon);
		call_srcu(&connections_srcu, &con->othercon->rcu,
			  connection_release);
	}
	srcu_read_unlock(&connections_srcu, idx);

	/* for debugging we print when we are done, to compare with other
	 * messages in between. This function needs to be correctly
	 * synchronized with io handling.
	 */
	log_print("closing connection to node %d done", nodeid);

	return 0;
}
This function need to be correctly synchronized 1459 * with io handling 1460 */ 1461 log_print("closing connection to node %d done", nodeid); 1462 1463 return 0; 1464 } 1465 1466 /* Receive worker function */ 1467 static void process_recv_sockets(struct work_struct *work) 1468 { 1469 struct connection *con = container_of(work, struct connection, rwork); 1470 int ret, buflen; 1471 1472 down_read(&con->sock_lock); 1473 if (!con->sock) { 1474 up_read(&con->sock_lock); 1475 return; 1476 } 1477 1478 buflen = READ_ONCE(dlm_config.ci_buffer_size); 1479 do { 1480 ret = receive_from_sock(con, buflen); 1481 } while (ret == DLM_IO_SUCCESS); 1482 up_read(&con->sock_lock); 1483 1484 switch (ret) { 1485 case DLM_IO_END: 1486 /* CF_RECV_PENDING cleared */ 1487 break; 1488 case DLM_IO_EOF: 1489 close_connection(con, false); 1490 /* CF_RECV_PENDING cleared */ 1491 break; 1492 case DLM_IO_RESCHED: 1493 cond_resched(); 1494 queue_work(io_workqueue, &con->rwork); 1495 /* CF_RECV_PENDING not cleared */ 1496 break; 1497 default: 1498 if (ret < 0) { 1499 if (test_bit(CF_IS_OTHERCON, &con->flags)) { 1500 close_connection(con, false); 1501 } else { 1502 spin_lock_bh(&con->writequeue_lock); 1503 lowcomms_queue_swork(con); 1504 spin_unlock_bh(&con->writequeue_lock); 1505 } 1506 1507 /* CF_RECV_PENDING cleared for othercon 1508 * we trigger send queue if not already done 1509 * and process_send_sockets will handle it 1510 */ 1511 break; 1512 } 1513 1514 WARN_ON_ONCE(1); 1515 break; 1516 } 1517 } 1518 1519 static void process_listen_recv_socket(struct work_struct *work) 1520 { 1521 int ret; 1522 1523 if (WARN_ON_ONCE(!listen_con.sock)) 1524 return; 1525 1526 do { 1527 ret = accept_from_sock(); 1528 } while (ret == DLM_IO_SUCCESS); 1529 1530 if (ret < 0) 1531 log_print("critical error accepting connection: %d", ret); 1532 } 1533 1534 static int dlm_connect(struct connection *con) 1535 { 1536 struct sockaddr_storage addr; 1537 int result, addr_len; 1538 struct socket *sock; 1539 unsigned int mark; 1540 1541 memset(&addr, 0, sizeof(addr)); 1542 result = nodeid_to_addr(con->nodeid, &addr, NULL, 1543 dlm_proto_ops->try_new_addr, &mark); 1544 if (result < 0) { 1545 log_print("no address for nodeid %d", con->nodeid); 1546 return result; 1547 } 1548 1549 /* Create a socket to communicate with */ 1550 result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family, 1551 SOCK_STREAM, dlm_proto_ops->proto, &sock); 1552 if (result < 0) 1553 return result; 1554 1555 sock_set_mark(sock->sk, mark); 1556 dlm_proto_ops->sockopts(sock); 1557 1558 result = dlm_proto_ops->bind(sock); 1559 if (result < 0) { 1560 sock_release(sock); 1561 return result; 1562 } 1563 1564 add_sock(sock, con); 1565 1566 log_print_ratelimited("connecting to %d", con->nodeid); 1567 make_sockaddr(&addr, dlm_config.ci_tcp_port, &addr_len); 1568 result = dlm_proto_ops->connect(con, sock, (struct sockaddr *)&addr, 1569 addr_len); 1570 switch (result) { 1571 case -EINPROGRESS: 1572 /* not an error */ 1573 fallthrough; 1574 case 0: 1575 break; 1576 default: 1577 if (result < 0) 1578 dlm_close_sock(&con->sock); 1579 1580 break; 1581 } 1582 1583 return result; 1584 } 1585 1586 /* Send worker function */ 1587 static void process_send_sockets(struct work_struct *work) 1588 { 1589 struct connection *con = container_of(work, struct connection, swork); 1590 int ret; 1591 1592 WARN_ON_ONCE(test_bit(CF_IS_OTHERCON, &con->flags)); 1593 1594 down_read(&con->sock_lock); 1595 if (!con->sock) { 1596 up_read(&con->sock_lock); 1597 down_write(&con->sock_lock); 1598 if (!con->sock) { 

/* Send worker function */
static void process_send_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, swork);
	int ret;

	WARN_ON_ONCE(test_bit(CF_IS_OTHERCON, &con->flags));

	down_read(&con->sock_lock);
	if (!con->sock) {
		up_read(&con->sock_lock);
		down_write(&con->sock_lock);
		if (!con->sock) {
			ret = dlm_connect(con);
			switch (ret) {
			case 0:
				break;
			case -EINPROGRESS:
				/* avoid spamming resched on connection.
				 * We might switch to a state_change
				 * event based mechanism once established.
				 */
				msleep(100);
				break;
			default:
				/* CF_SEND_PENDING not cleared */
				up_write(&con->sock_lock);
				log_print("connect to node %d try %d error %d",
					  con->nodeid, con->retries++, ret);
				msleep(1000);
				/* For now we try forever to reconnect. In
				 * future we should send an event to the
				 * cluster manager to fence itself after a
				 * certain number of retries.
				 */
				queue_work(io_workqueue, &con->swork);
				return;
			}
		}
		downgrade_write(&con->sock_lock);
	}

	do {
		ret = send_to_sock(con);
	} while (ret == DLM_IO_SUCCESS);
	up_read(&con->sock_lock);

	switch (ret) {
	case DLM_IO_END:
		/* CF_SEND_PENDING cleared */
		break;
	case DLM_IO_RESCHED:
		/* CF_SEND_PENDING not cleared */
		cond_resched();
		queue_work(io_workqueue, &con->swork);
		break;
	default:
		if (ret < 0) {
			close_connection(con, false);

			/* CF_SEND_PENDING cleared */
			spin_lock_bh(&con->writequeue_lock);
			lowcomms_queue_swork(con);
			spin_unlock_bh(&con->writequeue_lock);
			break;
		}

		WARN_ON_ONCE(1);
		break;
	}
}

static void work_stop(void)
{
	if (io_workqueue) {
		destroy_workqueue(io_workqueue);
		io_workqueue = NULL;
	}

	if (process_workqueue) {
		destroy_workqueue(process_workqueue);
		process_workqueue = NULL;
	}
}

static int work_start(void)
{
	io_workqueue = alloc_workqueue("dlm_io", WQ_HIGHPRI | WQ_MEM_RECLAIM,
				       0);
	if (!io_workqueue) {
		log_print("can't start dlm_io");
		return -ENOMEM;
	}

	/* ordered dlm message process queue,
	 * should be converted to a tasklet
	 */
	process_workqueue = alloc_ordered_workqueue("dlm_process",
						    WQ_HIGHPRI | WQ_MEM_RECLAIM);
	if (!process_workqueue) {
		log_print("can't start dlm_process");
		destroy_workqueue(io_workqueue);
		io_workqueue = NULL;
		return -ENOMEM;
	}

	return 0;
}

void dlm_lowcomms_shutdown(void)
{
	/* stop lowcomms_listen_data_ready calls */
	lock_sock(listen_con.sock->sk);
	listen_con.sock->sk->sk_data_ready = listen_sock.sk_data_ready;
	release_sock(listen_con.sock->sk);

	cancel_work_sync(&listen_con.rwork);
	dlm_close_sock(&listen_con.sock);

	flush_workqueue(process_workqueue);
}

void dlm_lowcomms_shutdown_node(int nodeid, bool force)
{
	struct connection *con;
	int idx;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!con)) {
		srcu_read_unlock(&connections_srcu, idx);
		return;
	}

	flush_work(&con->swork);
	stop_connection_io(con);
	WARN_ON_ONCE(!force && !list_empty(&con->writequeue));
	close_connection(con, true);
	clean_one_writequeue(con);
	if (con->othercon)
		clean_one_writequeue(con->othercon);
	allow_connection_io(con);
	srcu_read_unlock(&connections_srcu, idx);
}

void dlm_lowcomms_stop(void)
{
	work_stop();
	dlm_proto_ops = NULL;
}

static int dlm_listen_for_all(void)
{
	struct socket *sock;
	int result;

	log_print("Using %s for communications",
		  dlm_proto_ops->name);

	result = dlm_proto_ops->listen_validate();
	if (result < 0)
		return result;

	result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family,
				  SOCK_STREAM, dlm_proto_ops->proto, &sock);
	if (result < 0) {
		log_print("Can't create comms socket: %d", result);
		return result;
	}

	sock_set_mark(sock->sk, dlm_config.ci_mark);
	dlm_proto_ops->listen_sockopts(sock);

	result = dlm_proto_ops->listen_bind(sock);
	if (result < 0)
		goto out;

	lock_sock(sock->sk);
	listen_sock.sk_data_ready = sock->sk->sk_data_ready;
	listen_sock.sk_write_space = sock->sk->sk_write_space;
	listen_sock.sk_error_report = sock->sk->sk_error_report;
	listen_sock.sk_state_change = sock->sk->sk_state_change;

	listen_con.sock = sock;

	sock->sk->sk_allocation = GFP_NOFS;
	sock->sk->sk_data_ready = lowcomms_listen_data_ready;
	release_sock(sock->sk);

	result = sock->ops->listen(sock, 5);
	if (result < 0) {
		dlm_close_sock(&listen_con.sock);
		return result;
	}

	return 0;

out:
	sock_release(sock);
	return result;
}

static int dlm_tcp_bind(struct socket *sock)
{
	struct sockaddr_storage src_addr;
	int result, addr_len;

	/* Bind to our cluster-known address when connecting to avoid
	 * routing problems.
	 */
	memcpy(&src_addr, &dlm_local_addr[0], sizeof(src_addr));
	make_sockaddr(&src_addr, 0, &addr_len);

	result = sock->ops->bind(sock, (struct sockaddr *)&src_addr,
				 addr_len);
	if (result < 0) {
		/* This *may* not indicate a critical error */
		log_print("could not bind for connect: %d", result);
	}

	return 0;
}

static int dlm_tcp_connect(struct connection *con, struct socket *sock,
			   struct sockaddr *addr, int addr_len)
{
	return sock->ops->connect(sock, addr, addr_len, O_NONBLOCK);
}

static int dlm_tcp_listen_validate(void)
{
	/* We don't support multi-homed hosts */
	if (dlm_local_count > 1) {
		log_print("TCP protocol can't handle multi-homed hosts, try SCTP");
		return -EINVAL;
	}

	return 0;
}

static void dlm_tcp_sockopts(struct socket *sock)
{
	/* Turn off Nagle's algorithm */
	tcp_sock_set_nodelay(sock->sk);
}

static void dlm_tcp_listen_sockopts(struct socket *sock)
{
	dlm_tcp_sockopts(sock);
	sock_set_reuseaddr(sock->sk);
}

static int dlm_tcp_listen_bind(struct socket *sock)
{
	int addr_len;

	/* Bind to our port */
	make_sockaddr(&dlm_local_addr[0], dlm_config.ci_tcp_port, &addr_len);
	return sock->ops->bind(sock, (struct sockaddr *)&dlm_local_addr[0],
			       addr_len);
}

static const struct dlm_proto_ops dlm_tcp_ops = {
	.name = "TCP",
	.proto = IPPROTO_TCP,
	.connect = dlm_tcp_connect,
	.sockopts = dlm_tcp_sockopts,
	.bind = dlm_tcp_bind,
	.listen_validate = dlm_tcp_listen_validate,
	.listen_sockopts = dlm_tcp_listen_sockopts,
	.listen_bind = dlm_tcp_listen_bind,
};

static int dlm_sctp_bind(struct socket *sock)
{
	return sctp_bind_addrs(sock, 0);
}

static int dlm_sctp_connect(struct connection *con, struct socket *sock,
			    struct sockaddr *addr, int addr_len)
{
	int ret;

	/*
	 * Make sock->ops->connect() return within the specified time,
	 * since the O_NONBLOCK argument to connect() does not work here.
	 * Afterwards, restore the default value of this attribute.
	 */
	sock_set_sndtimeo(sock->sk, 5);
	ret = sock->ops->connect(sock, addr, addr_len, 0);
	sock_set_sndtimeo(sock->sk, 0);
	return ret;
}

static int dlm_sctp_listen_validate(void)
{
	if (!IS_ENABLED(CONFIG_IP_SCTP)) {
		log_print("SCTP is not enabled by this kernel");
		return -EOPNOTSUPP;
	}

	request_module("sctp");
	return 0;
}

static int dlm_sctp_bind_listen(struct socket *sock)
{
	return sctp_bind_addrs(sock, dlm_config.ci_tcp_port);
}

static void dlm_sctp_sockopts(struct socket *sock)
{
	/* Turn off Nagle's algorithm */
	sctp_sock_set_nodelay(sock->sk);
	sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
}

static const struct dlm_proto_ops dlm_sctp_ops = {
	.name = "SCTP",
	.proto = IPPROTO_SCTP,
	.try_new_addr = true,
	.connect = dlm_sctp_connect,
	.sockopts = dlm_sctp_sockopts,
	.bind = dlm_sctp_bind,
	.listen_validate = dlm_sctp_listen_validate,
	.listen_sockopts = dlm_sctp_sockopts,
	.listen_bind = dlm_sctp_bind_listen,
};

int dlm_lowcomms_start(void)
{
	int error;

	init_local();
	if (!dlm_local_count) {
		error = -ENOTCONN;
		log_print("no local IP address has been set");
		goto fail;
	}

	error = work_start();
	if (error)
		goto fail;

	/* Start listening */
	switch (dlm_config.ci_protocol) {
	case DLM_PROTO_TCP:
		dlm_proto_ops = &dlm_tcp_ops;
		break;
	case DLM_PROTO_SCTP:
		dlm_proto_ops = &dlm_sctp_ops;
		break;
	default:
		log_print("Invalid protocol identifier %d set",
			  dlm_config.ci_protocol);
		error = -EINVAL;
		goto fail_proto_ops;
	}

	error = dlm_listen_for_all();
	if (error)
		goto fail_listen;

	return 0;

fail_listen:
	dlm_proto_ops = NULL;
fail_proto_ops:
	work_stop();
fail:
	return error;
}

void dlm_lowcomms_init(void)
{
	int i;

	for (i = 0; i < CONN_HASH_SIZE; i++)
		INIT_HLIST_HEAD(&connection_hash[i]);

	INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
}

void dlm_lowcomms_exit(void)
{
	struct connection *con;
	int i, idx;

	idx = srcu_read_lock(&connections_srcu);
	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
			spin_lock(&connections_lock);
			hlist_del_rcu(&con->list);
			spin_unlock(&connections_lock);

			if (con->othercon)
				call_srcu(&connections_srcu, &con->othercon->rcu,
					  connection_release);
			call_srcu(&connections_srcu, &con->rcu, connection_release);
		}
	}
	srcu_read_unlock(&connections_srcu, idx);
}
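
/* Illustrative summary (not part of the build): the expected call order
 * from the upper DLM layers is roughly:
 *
 *	dlm_lowcomms_init();		// prepare hash + listen work
 *	dlm_lowcomms_start();		// resolve local addrs, bind + listen
 *	...				// dlm_lowcomms_addr(), new_msg(), ...
 *	dlm_lowcomms_shutdown();	// stop listening, flush processing
 *	dlm_lowcomms_stop();		// destroy workqueues
 *	dlm_lowcomms_exit();		// free remaining connections
 */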