1 // SPDX-License-Identifier: GPL-2.0-only 2 /****************************************************************************** 3 ******************************************************************************* 4 ** 5 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 6 ** Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved. 7 ** 8 ** 9 ******************************************************************************* 10 ******************************************************************************/ 11 12 /* 13 * lowcomms.c 14 * 15 * This is the "low-level" comms layer. 16 * 17 * It is responsible for sending/receiving messages 18 * from other nodes in the cluster. 19 * 20 * Cluster nodes are referred to by their nodeids. nodeids are 21 * simply 32 bit numbers to the locking module - if they need to 22 * be expanded for the cluster infrastructure then that is its 23 * responsibility. It is this layer's 24 * responsibility to resolve these into IP address or 25 * whatever it needs for inter-node communication. 26 * 27 * The comms level is two kernel threads that deal mainly with 28 * the receiving of messages from other nodes and passing them 29 * up to the mid-level comms layer (which understands the 30 * message format) for execution by the locking core, and 31 * a send thread which does all the setting up of connections 32 * to remote nodes and the sending of data. Threads are not allowed 33 * to send their own data because it may cause them to wait in times 34 * of high load. Also, this way, the sending thread can collect together 35 * messages bound for one node and send them in one block. 36 * 37 * lowcomms will choose to use either TCP or SCTP as its transport layer 38 * depending on the configuration variable 'protocol'. This should be set 39 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a 40 * cluster-wide mechanism as it must be the same on all nodes of the cluster 41 * for the DLM to function. 
42 * 43 */ 44 45 #include <asm/ioctls.h> 46 #include <net/sock.h> 47 #include <net/tcp.h> 48 #include <linux/pagemap.h> 49 #include <linux/file.h> 50 #include <linux/mutex.h> 51 #include <linux/sctp.h> 52 #include <linux/slab.h> 53 #include <net/sctp/sctp.h> 54 #include <net/ipv6.h> 55 56 #include <trace/events/dlm.h> 57 58 #include "dlm_internal.h" 59 #include "lowcomms.h" 60 #include "midcomms.h" 61 #include "memory.h" 62 #include "config.h" 63 64 #define NEEDED_RMEM (4*1024*1024) 65 66 struct connection { 67 struct socket *sock; /* NULL if not connected */ 68 uint32_t nodeid; /* So we know who we are in the list */ 69 /* this semaphore is used to allow parallel recv/send in read 70 * lock mode. When we release a sock we need to held the write lock. 71 * 72 * However this is locking code and not nice. When we remove the 73 * othercon handling we can look into other mechanism to synchronize 74 * io handling to call sock_release() at the right time. 75 */ 76 struct rw_semaphore sock_lock; 77 unsigned long flags; 78 #define CF_APP_LIMITED 0 79 #define CF_RECV_PENDING 1 80 #define CF_SEND_PENDING 2 81 #define CF_RECV_INTR 3 82 #define CF_IO_STOP 4 83 #define CF_IS_OTHERCON 5 84 struct list_head writequeue; /* List of outgoing writequeue_entries */ 85 spinlock_t writequeue_lock; 86 int retries; 87 struct hlist_node list; 88 /* due some connect()/accept() races we currently have this cross over 89 * connection attempt second connection for one node. 90 * 91 * There is a solution to avoid the race by introducing a connect 92 * rule as e.g. our_nodeid > nodeid_to_connect who is allowed to 93 * connect. Otherside can connect but will only be considered that 94 * the other side wants to have a reconnect. 95 * 96 * However changing to this behaviour will break backwards compatible. 97 * In a DLM protocol major version upgrade we should remove this! 
98 */ 99 struct connection *othercon; 100 struct work_struct rwork; /* receive worker */ 101 struct work_struct swork; /* send worker */ 102 unsigned char rx_leftover_buf[DLM_MAX_SOCKET_BUFSIZE]; 103 int rx_leftover; 104 int mark; 105 int addr_count; 106 int curr_addr_index; 107 struct sockaddr_storage addr[DLM_MAX_ADDR_COUNT]; 108 spinlock_t addrs_lock; 109 struct rcu_head rcu; 110 }; 111 #define sock2con(x) ((struct connection *)(x)->sk_user_data) 112 113 struct listen_connection { 114 struct socket *sock; 115 struct work_struct rwork; 116 }; 117 118 #define DLM_WQ_REMAIN_BYTES(e) (PAGE_SIZE - e->end) 119 #define DLM_WQ_LENGTH_BYTES(e) (e->end - e->offset) 120 121 /* An entry waiting to be sent */ 122 struct writequeue_entry { 123 struct list_head list; 124 struct page *page; 125 int offset; 126 int len; 127 int end; 128 int users; 129 bool dirty; 130 struct connection *con; 131 struct list_head msgs; 132 struct kref ref; 133 }; 134 135 struct dlm_msg { 136 struct writequeue_entry *entry; 137 struct dlm_msg *orig_msg; 138 bool retransmit; 139 void *ppc; 140 int len; 141 int idx; /* new()/commit() idx exchange */ 142 143 struct list_head list; 144 struct kref ref; 145 }; 146 147 struct processqueue_entry { 148 unsigned char *buf; 149 int nodeid; 150 int buflen; 151 152 struct list_head list; 153 }; 154 155 struct dlm_proto_ops { 156 bool try_new_addr; 157 const char *name; 158 int proto; 159 160 int (*connect)(struct connection *con, struct socket *sock, 161 struct sockaddr *addr, int addr_len); 162 void (*sockopts)(struct socket *sock); 163 int (*bind)(struct socket *sock); 164 int (*listen_validate)(void); 165 void (*listen_sockopts)(struct socket *sock); 166 int (*listen_bind)(struct socket *sock); 167 }; 168 169 static struct listen_sock_callbacks { 170 void (*sk_error_report)(struct sock *); 171 void (*sk_data_ready)(struct sock *); 172 void (*sk_state_change)(struct sock *); 173 void (*sk_write_space)(struct sock *); 174 } listen_sock; 175 176 static struct 
listen_connection listen_con;
static struct sockaddr_storage dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;

/* Work queues */
static struct workqueue_struct *io_workqueue;
static struct workqueue_struct *process_workqueue;

static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_SPINLOCK(connections_lock);
DEFINE_STATIC_SRCU(connections_srcu);

static const struct dlm_proto_ops *dlm_proto_ops;

/* return codes of the io helpers below */
#define DLM_IO_SUCCESS 0
#define DLM_IO_END 1
#define DLM_IO_EOF 2
#define DLM_IO_RESCHED 3

static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);
static void process_dlm_messages(struct work_struct *work);

static DECLARE_WORK(process_work, process_dlm_messages);
static DEFINE_SPINLOCK(processqueue_lock);
static bool process_dlm_messages_pending;
static LIST_HEAD(processqueue);

/* true while the listening socket exists, i.e. lowcomms is started */
bool dlm_lowcomms_is_running(void)
{
	return !!listen_con.sock;
}

/* queue the send worker unless io is stopped or the socket buffer is
 * currently full (CF_APP_LIMITED); caller must hold writequeue_lock
 */
static void lowcomms_queue_swork(struct connection *con)
{
	WARN_ON_ONCE(!lockdep_is_held(&con->writequeue_lock));

	if (!test_bit(CF_IO_STOP, &con->flags) &&
	    !test_bit(CF_APP_LIMITED, &con->flags) &&
	    !test_and_set_bit(CF_SEND_PENDING, &con->flags))
		queue_work(io_workqueue, &con->swork);
}

/* queue the receive worker unless io is stopped; caller must hold the
 * socket lock
 */
static void lowcomms_queue_rwork(struct connection *con)
{
	WARN_ON_ONCE(!lockdep_sock_is_held(con->sock->sk));

	if (!test_bit(CF_IO_STOP, &con->flags) &&
	    !test_and_set_bit(CF_RECV_PENDING, &con->flags))
		queue_work(io_workqueue, &con->rwork);
}

/* slab constructor: msgs list must be valid before first use */
static void writequeue_entry_ctor(void *data)
{
	struct writequeue_entry *entry = data;

	INIT_LIST_HEAD(&entry->msgs);
}

struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void)
{
	return kmem_cache_create("dlm_writequeue", sizeof(struct writequeue_entry),
				 0, 0, writequeue_entry_ctor);
}

struct kmem_cache *dlm_lowcomms_msg_cache_create(void)
{
	return kmem_cache_create("dlm_msg", sizeof(struct dlm_msg), 0, 0, NULL);
}

/* need to hold writequeue_lock */
static struct writequeue_entry *con_next_wq(struct connection *con)
{
	struct writequeue_entry *e;

	e = list_first_entry_or_null(&con->writequeue, struct writequeue_entry,
				     list);
	/* if len is zero nothing is to send, if there are users filling
	 * buffers we wait until the users are done so we can send more.
	 */
	if (!e || e->users || e->len == 0)
		return NULL;

	return e;
}

/* hash-bucket lookup; caller must be in an SRCU read-side section */
static struct connection *__find_con(int nodeid, int r)
{
	struct connection *con;

	hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
		if (con->nodeid == nodeid)
			return con;
	}

	return NULL;
}

static void dlm_con_init(struct connection *con, int nodeid)
{
	con->nodeid = nodeid;
	init_rwsem(&con->sock_lock);
	INIT_LIST_HEAD(&con->writequeue);
	spin_lock_init(&con->writequeue_lock);
	INIT_WORK(&con->swork, process_send_sockets);
	INIT_WORK(&con->rwork, process_recv_sockets);
	spin_lock_init(&con->addrs_lock);
}

/*
 * If 'allocation' is zero then we don't attempt to create a new
 * connection structure for this node.
 */
static struct connection *nodeid2con(int nodeid, gfp_t alloc)
{
	struct connection *con, *tmp;
	int r;

	r = nodeid_hash(nodeid);
	con = __find_con(nodeid, r);
	if (con || !alloc)
		return con;

	con = kzalloc(sizeof(*con), alloc);
	if (!con)
		return NULL;

	dlm_con_init(con, nodeid);

	spin_lock(&connections_lock);
	/* Because multiple workqueues/threads call this function it can
	 * race on multiple cpu's. Instead of locking the hot path __find_con()
	 * we just check in rare cases of recently added nodes again
	 * under protection of connections_lock. If this is the case we
	 * abort our connection creation and return the existing connection.
	 */
	tmp = __find_con(nodeid, r);
	if (tmp) {
		spin_unlock(&connections_lock);
		kfree(con);
		return tmp;
	}

	hlist_add_head_rcu(&con->list, &connection_hash[r]);
	spin_unlock(&connections_lock);

	return con;
}

/* returns 1 if the two addresses match in family, address and port */
static int addr_compare(const struct sockaddr_storage *x,
			const struct sockaddr_storage *y)
{
	switch (x->ss_family) {
	case AF_INET: {
		struct sockaddr_in *sinx = (struct sockaddr_in *)x;
		struct sockaddr_in *siny = (struct sockaddr_in *)y;
		if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
			return 0;
		if (sinx->sin_port != siny->sin_port)
			return 0;
		break;
	}
	case AF_INET6: {
		struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
		struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
		if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
			return 0;
		if (sinx->sin6_port != siny->sin6_port)
			return 0;
		break;
	}
	default:
		return 0;
	}
	return 1;
}

/* look up the current address (and skb mark) for a nodeid; with
 * try_new_addr the next call rotates to the node's next known address
 */
static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
			  struct sockaddr *sa_out, bool try_new_addr,
			  unsigned int *mark)
{
	struct sockaddr_storage sas;
	struct connection *con;
	int idx;

	if (!dlm_local_count)
		return -1;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (!con) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	spin_lock(&con->addrs_lock);
	if (!con->addr_count) {
		spin_unlock(&con->addrs_lock);
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	memcpy(&sas, &con->addr[con->curr_addr_index],
	       sizeof(struct sockaddr_storage));

	if (try_new_addr) {
con->curr_addr_index++; 383 if (con->curr_addr_index == con->addr_count) 384 con->curr_addr_index = 0; 385 } 386 387 *mark = con->mark; 388 spin_unlock(&con->addrs_lock); 389 390 if (sas_out) 391 memcpy(sas_out, &sas, sizeof(struct sockaddr_storage)); 392 393 if (!sa_out) { 394 srcu_read_unlock(&connections_srcu, idx); 395 return 0; 396 } 397 398 if (dlm_local_addr[0].ss_family == AF_INET) { 399 struct sockaddr_in *in4 = (struct sockaddr_in *) &sas; 400 struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out; 401 ret4->sin_addr.s_addr = in4->sin_addr.s_addr; 402 } else { 403 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &sas; 404 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out; 405 ret6->sin6_addr = in6->sin6_addr; 406 } 407 408 srcu_read_unlock(&connections_srcu, idx); 409 return 0; 410 } 411 412 static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid, 413 unsigned int *mark) 414 { 415 struct connection *con; 416 int i, idx, addr_i; 417 418 idx = srcu_read_lock(&connections_srcu); 419 for (i = 0; i < CONN_HASH_SIZE; i++) { 420 hlist_for_each_entry_rcu(con, &connection_hash[i], list) { 421 WARN_ON_ONCE(!con->addr_count); 422 423 spin_lock(&con->addrs_lock); 424 for (addr_i = 0; addr_i < con->addr_count; addr_i++) { 425 if (addr_compare(&con->addr[addr_i], addr)) { 426 *nodeid = con->nodeid; 427 *mark = con->mark; 428 spin_unlock(&con->addrs_lock); 429 srcu_read_unlock(&connections_srcu, idx); 430 return 0; 431 } 432 } 433 spin_unlock(&con->addrs_lock); 434 } 435 } 436 srcu_read_unlock(&connections_srcu, idx); 437 438 return -ENOENT; 439 } 440 441 static bool dlm_lowcomms_con_has_addr(const struct connection *con, 442 const struct sockaddr_storage *addr) 443 { 444 int i; 445 446 for (i = 0; i < con->addr_count; i++) { 447 if (addr_compare(&con->addr[i], addr)) 448 return true; 449 } 450 451 return false; 452 } 453 454 int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len) 455 { 456 struct connection *con; 457 bool 
ret, idx; 458 459 idx = srcu_read_lock(&connections_srcu); 460 con = nodeid2con(nodeid, GFP_NOFS); 461 if (!con) { 462 srcu_read_unlock(&connections_srcu, idx); 463 return -ENOMEM; 464 } 465 466 spin_lock(&con->addrs_lock); 467 if (!con->addr_count) { 468 memcpy(&con->addr[0], addr, sizeof(*addr)); 469 con->addr_count = 1; 470 con->mark = dlm_config.ci_mark; 471 spin_unlock(&con->addrs_lock); 472 srcu_read_unlock(&connections_srcu, idx); 473 return 0; 474 } 475 476 ret = dlm_lowcomms_con_has_addr(con, addr); 477 if (ret) { 478 spin_unlock(&con->addrs_lock); 479 srcu_read_unlock(&connections_srcu, idx); 480 return -EEXIST; 481 } 482 483 if (con->addr_count >= DLM_MAX_ADDR_COUNT) { 484 spin_unlock(&con->addrs_lock); 485 srcu_read_unlock(&connections_srcu, idx); 486 return -ENOSPC; 487 } 488 489 memcpy(&con->addr[con->addr_count++], addr, sizeof(*addr)); 490 srcu_read_unlock(&connections_srcu, idx); 491 spin_unlock(&con->addrs_lock); 492 return 0; 493 } 494 495 /* Data available on socket or listen socket received a connect */ 496 static void lowcomms_data_ready(struct sock *sk) 497 { 498 struct connection *con = sock2con(sk); 499 500 set_bit(CF_RECV_INTR, &con->flags); 501 lowcomms_queue_rwork(con); 502 } 503 504 static void lowcomms_write_space(struct sock *sk) 505 { 506 struct connection *con = sock2con(sk); 507 508 clear_bit(SOCK_NOSPACE, &con->sock->flags); 509 510 spin_lock_bh(&con->writequeue_lock); 511 if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) { 512 con->sock->sk->sk_write_pending--; 513 clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags); 514 } 515 516 lowcomms_queue_swork(con); 517 spin_unlock_bh(&con->writequeue_lock); 518 } 519 520 static void lowcomms_state_change(struct sock *sk) 521 { 522 /* SCTP layer is not calling sk_data_ready when the connection 523 * is done, so we catch the signal through here. 
	 */
	if (sk->sk_shutdown == RCV_SHUTDOWN)
		lowcomms_data_ready(sk);
}

/* listen socket has a pending connect: run the accept worker */
static void lowcomms_listen_data_ready(struct sock *sk)
{
	queue_work(io_workqueue, &listen_con.rwork);
}

/* trigger a connect attempt to a node by queueing its send worker;
 * a no-op for our own nodeid or when a socket already exists
 */
int dlm_lowcomms_connect_node(int nodeid)
{
	struct connection *con;
	int idx;

	if (nodeid == dlm_our_nodeid())
		return 0;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!con)) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	down_read(&con->sock_lock);
	if (!con->sock) {
		spin_lock_bh(&con->writequeue_lock);
		lowcomms_queue_swork(con);
		spin_unlock_bh(&con->writequeue_lock);
	}
	up_read(&con->sock_lock);
	srcu_read_unlock(&connections_srcu, idx);

	cond_resched();
	return 0;
}

/* set the skb mark used for future sockets to this node */
int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark)
{
	struct connection *con;
	int idx;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (!con) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	spin_lock(&con->addrs_lock);
	con->mark = mark;
	spin_unlock(&con->addrs_lock);
	srcu_read_unlock(&connections_srcu, idx);
	return 0;
}

/* sk_error_report hook: log the error, trigger midcomms retransmit and
 * chain to the saved original callback
 */
static void lowcomms_error_report(struct sock *sk)
{
	struct connection *con = sock2con(sk);
	struct inet_sock *inet;

	inet = inet_sk(sk);
	switch (sk->sk_family) {
	case AF_INET:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI4, dport %d, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &inet->inet_daddr,
				   ntohs(inet->inet_dport), sk->sk_err,
				   sk->sk_err_soft);
		break;
#if IS_ENABLED(CONFIG_IPV6)
	case AF_INET6:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI6c, "
				   "dport %d, sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &sk->sk_v6_daddr,
				   ntohs(inet->inet_dport), sk->sk_err,
				   sk->sk_err_soft);
		break;
#endif
	default:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "invalid socket family %d set, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   sk->sk_family, sk->sk_err, sk->sk_err_soft);
		break;
	}

	dlm_midcomms_unack_msg_resend(con->nodeid);

	listen_sock.sk_error_report(sk);
}

/* put back the callbacks saved in listen_sock; caller holds the sock lock */
static void restore_callbacks(struct sock *sk)
{
	WARN_ON_ONCE(!lockdep_sock_is_held(sk));

	sk->sk_user_data = NULL;
	sk->sk_data_ready = listen_sock.sk_data_ready;
	sk->sk_state_change = listen_sock.sk_state_change;
	sk->sk_write_space = listen_sock.sk_write_space;
	sk->sk_error_report = listen_sock.sk_error_report;
}

/* Make a socket active */
static void add_sock(struct socket *sock, struct connection *con)
{
	struct sock *sk = sock->sk;

	lock_sock(sk);
	con->sock = sock;

	/* Install a data_ready callback */
	sk->sk_user_data = con;
	sk->sk_data_ready = lowcomms_data_ready;
	sk->sk_write_space = lowcomms_write_space;
	if (dlm_config.ci_protocol == DLM_PROTO_SCTP)
		sk->sk_state_change = lowcomms_state_change;
	sk->sk_allocation = GFP_NOFS;
	sk->sk_error_report = lowcomms_error_report;
	release_sock(sk);
}

/* Add the port number to an IPv6 or 4 sockaddr and return the address
   length */
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
			  int *addr_len)
{
	saddr->ss_family = dlm_local_addr[0].ss_family;
	if (saddr->ss_family == AF_INET) {
		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
		in4_addr->sin_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in);
		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
	} else {
		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
		in6_addr->sin6_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in6);
	}
	/* zero the tail of the storage beyond the family-specific part */
	memset((char *)saddr + *addr_len, 0,
	       sizeof(struct sockaddr_storage) - *addr_len);
}

/* kref release: free the page and the writequeue entry itself */
static void dlm_page_release(struct kref *kref)
{
	struct writequeue_entry *e = container_of(kref, struct writequeue_entry,
						  ref);

	__free_page(e->page);
	dlm_free_writequeue(e);
}

/* kref release: drop our reference on the backing entry, then the msg */
static void dlm_msg_release(struct kref *kref)
{
	struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);

	kref_put(&msg->entry->ref, dlm_page_release);
	dlm_free_msg(msg);
}

/* drop an entry and every message carved out of it */
static void free_entry(struct writequeue_entry *e)
{
	struct dlm_msg *msg, *tmp;

	list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
		if (msg->orig_msg) {
			msg->orig_msg->retransmit = false;
			kref_put(&msg->orig_msg->ref, dlm_msg_release);
		}

		list_del(&msg->list);
		kref_put(&msg->ref, dlm_msg_release);
	}

	list_del(&e->list);
	kref_put(&e->ref, dlm_page_release);
}

/* restore original callbacks before releasing so no lowcomms callback
 * can run on a socket that is going away
 */
static void dlm_close_sock(struct socket **sock)
{
	lock_sock((*sock)->sk);
	restore_callbacks((*sock)->sk);
	release_sock((*sock)->sk);

	sock_release(*sock);
	*sock = NULL;
}

/* re-enable io worker scheduling for con and its othercon */
static void allow_connection_io(struct connection *con)
{
	if (con->othercon)
		clear_bit(CF_IO_STOP, &con->othercon->flags);
	clear_bit(CF_IO_STOP, &con->flags);
}

/* stop all io on con (and its othercon): detach callbacks, set
 * CF_IO_STOP under writequeue_lock so no new work is queued, then wait
 * for any running workers to finish
 */
static void stop_connection_io(struct connection *con)
{
	if (con->othercon)
		stop_connection_io(con->othercon);

	down_write(&con->sock_lock);
	if (con->sock) {
		lock_sock(con->sock->sk);
		restore_callbacks(con->sock->sk);

		spin_lock_bh(&con->writequeue_lock);
		set_bit(CF_IO_STOP, &con->flags);
		spin_unlock_bh(&con->writequeue_lock);
		release_sock(con->sock->sk);
	} else {
		spin_lock_bh(&con->writequeue_lock);
		set_bit(CF_IO_STOP, &con->flags);
		spin_unlock_bh(&con->writequeue_lock);
	}
	up_write(&con->sock_lock);

	cancel_work_sync(&con->swork);
	cancel_work_sync(&con->rwork);
}

/* Close a
remote connection and tidy up */
static void close_connection(struct connection *con, bool and_other)
{
	struct writequeue_entry *e;

	if (con->othercon && and_other)
		close_connection(con->othercon, false);

	down_write(&con->sock_lock);
	if (!con->sock) {
		up_write(&con->sock_lock);
		return;
	}

	dlm_close_sock(&con->sock);

	/* if we sent a writequeue entry only half way, we drop the
	 * whole entry on reconnection so that we never resume in the
	 * middle of a msg, which would confuse the other end.
	 *
	 * we can always drop messages because of retransmits, but what we
	 * cannot allow is to transmit half messages which may be processed
	 * at the other side.
	 *
	 * our policy is to start in a clean state after a disconnect; we
	 * don't know what was sent/received on the transport layer in this
	 * case.
	 */
	spin_lock_bh(&con->writequeue_lock);
	if (!list_empty(&con->writequeue)) {
		e = list_first_entry(&con->writequeue, struct writequeue_entry,
				     list);
		if (e->dirty)
			free_entry(e);
	}
	spin_unlock_bh(&con->writequeue_lock);

	con->rx_leftover = 0;
	con->retries = 0;
	clear_bit(CF_APP_LIMITED, &con->flags);
	clear_bit(CF_RECV_PENDING, &con->flags);
	clear_bit(CF_SEND_PENDING, &con->flags);
	up_write(&con->sock_lock);
}

/* allocate a queue entry plus its receive buffer; NOTE: buflen is set
 * later by the caller once the valid length is known
 */
static struct processqueue_entry *new_processqueue_entry(int nodeid,
							 int buflen)
{
	struct processqueue_entry *pentry;

	pentry = kmalloc(sizeof(*pentry), GFP_NOFS);
	if (!pentry)
		return NULL;

	pentry->buf = kmalloc(buflen, GFP_NOFS);
	if (!pentry->buf) {
		kfree(pentry);
		return NULL;
	}

	pentry->nodeid = nodeid;
	return pentry;
}

static void free_processqueue_entry(struct processqueue_entry *pentry)
{
	kfree(pentry->buf);
	kfree(pentry);
}

/* node that had at least one message processed; used to send one ack
 * per node after a processing batch
 */
struct dlm_processed_nodes {
	int nodeid;

	struct list_head list;
};

static void add_processed_node(int nodeid, struct list_head *processed_nodes)
{
	struct dlm_processed_nodes *n;

	list_for_each_entry(n, processed_nodes, list) {
		/* we already remembered this node */
		if (n->nodeid == nodeid)
			return;
	}

	/* if the allocation fails, in the worst case we simply don't send
	 * an ack back. We try it next time.
	 */
	n = kmalloc(sizeof(*n), GFP_NOFS);
	if (!n)
		return;

	n->nodeid = nodeid;
	list_add(&n->list, processed_nodes);
}

/* worker: drain the processqueue, hand each buffer to midcomms, then
 * ack once per node seen in this batch
 */
static void process_dlm_messages(struct work_struct *work)
{
	struct dlm_processed_nodes *n, *n_tmp;
	struct processqueue_entry *pentry;
	LIST_HEAD(processed_nodes);

	spin_lock(&processqueue_lock);
	pentry = list_first_entry_or_null(&processqueue,
					  struct processqueue_entry, list);
	if (WARN_ON_ONCE(!pentry)) {
		spin_unlock(&processqueue_lock);
		return;
	}

	list_del(&pentry->list);
	spin_unlock(&processqueue_lock);

	for (;;) {
		dlm_process_incoming_buffer(pentry->nodeid, pentry->buf,
					    pentry->buflen);
		add_processed_node(pentry->nodeid, &processed_nodes);
		free_processqueue_entry(pentry);

		spin_lock(&processqueue_lock);
		pentry = list_first_entry_or_null(&processqueue,
						  struct processqueue_entry, list);
		if (!pentry) {
			/* must be cleared under the lock so the producer
			 * side in receive_from_sock() requeues us
			 */
			process_dlm_messages_pending = false;
			spin_unlock(&processqueue_lock);
			break;
		}

		list_del(&pentry->list);
		spin_unlock(&processqueue_lock);
	}

	/* send ack back after we processed couple of messages */
	list_for_each_entry_safe(n, n_tmp, &processed_nodes, list) {
		list_del(&n->list);
		dlm_midcomms_receive_done(n->nodeid);
		kfree(n);
	}
}

/* Data received from remote end */
static int receive_from_sock(struct connection *con, int buflen)
{
	struct processqueue_entry *pentry;
	int ret, buflen_real;
	struct msghdr msg;
	struct kvec iov;

	pentry = new_processqueue_entry(con->nodeid, buflen);
	if (!pentry)
		return DLM_IO_RESCHED;

	/* start the buffer with the leftover of the previous receive */
	memcpy(pentry->buf, con->rx_leftover_buf, con->rx_leftover);

	/* calculate new buffer parameter regarding last receive and
	 * possible leftover bytes
	 */
	iov.iov_base = pentry->buf + con->rx_leftover;
	iov.iov_len = buflen - con->rx_leftover;

	memset(&msg, 0, sizeof(msg));
	msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	clear_bit(CF_RECV_INTR, &con->flags);
again:
	ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
			     msg.msg_flags);
	trace_dlm_recv(con->nodeid, ret);
	if (ret == -EAGAIN) {
		lock_sock(con->sock->sk);
		/* data_ready fired while we were reading: try once more
		 * instead of dropping the pending state
		 */
		if (test_and_clear_bit(CF_RECV_INTR, &con->flags)) {
			release_sock(con->sock->sk);
			goto again;
		}

		clear_bit(CF_RECV_PENDING, &con->flags);
		release_sock(con->sock->sk);
		free_processqueue_entry(pentry);
		return DLM_IO_END;
	} else if (ret == 0) {
		/* close will clear CF_RECV_PENDING */
		free_processqueue_entry(pentry);
		return DLM_IO_EOF;
	} else if (ret < 0) {
		free_processqueue_entry(pentry);
		return ret;
	}

	/* new buflen according to received bytes and leftover from last receive */
	buflen_real = ret + con->rx_leftover;
	ret = dlm_validate_incoming_buffer(con->nodeid, pentry->buf,
					   buflen_real);
	if (ret < 0) {
		free_processqueue_entry(pentry);
		return ret;
	}

	pentry->buflen = ret;

	/* calculate leftover bytes from process and put them at the begin of
	 * the receive buffer, so next receive we have the full message
	 * at the start address of the receive buffer.
	 */
	con->rx_leftover = buflen_real - ret;
	memmove(con->rx_leftover_buf, pentry->buf + ret,
		con->rx_leftover);

	spin_lock(&processqueue_lock);
	list_add_tail(&pentry->list, &processqueue);
	if (!process_dlm_messages_pending) {
		process_dlm_messages_pending = true;
		queue_work(process_workqueue, &process_work);
	}
	spin_unlock(&processqueue_lock);

	return DLM_IO_SUCCESS;
}

/* Listening socket is busy, accept a connection */
static int accept_from_sock(void)
{
	struct sockaddr_storage peeraddr;
	int len, idx, result, nodeid;
	struct connection *newcon;
	struct socket *newsock;
	unsigned int mark;

	result = kernel_accept(listen_con.sock, &newsock, O_NONBLOCK);
	if (result == -EAGAIN)
		return DLM_IO_END;
	else if (result < 0)
		goto accept_err;

	/* Get the connected socket's peer */
	memset(&peeraddr, 0, sizeof(peeraddr));
	len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2);
	if (len < 0) {
		result = -ECONNABORTED;
		goto accept_err;
	}

	/* Get the new node's NODEID */
	make_sockaddr(&peeraddr, 0, &len);
	if (addr_to_nodeid(&peeraddr, &nodeid, &mark)) {
		/* peer is not a configured cluster member: log and refuse */
		switch (peeraddr.ss_family) {
		case AF_INET: {
			struct sockaddr_in *sin = (struct sockaddr_in *)&peeraddr;

			log_print("connect from non cluster IPv4 node %pI4",
				  &sin->sin_addr);
			break;
		}
#if IS_ENABLED(CONFIG_IPV6)
		case AF_INET6: {
			struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&peeraddr;

			log_print("connect from non cluster IPv6 node %pI6c",
				  &sin6->sin6_addr);
			break;
		}
#endif
		default:
			log_print("invalid family from non cluster node");
			break;
		}

		sock_release(newsock);
		return -1;
	}

	log_print("got connection from %d", nodeid);

	/* Check to see if we already have a connection to this node. This
	 * could happen if the two nodes initiate a connection at roughly
	 * the same time and the connections cross on the wire.
	 * In this case we store the incoming one in "othercon"
	 */
	idx = srcu_read_lock(&connections_srcu);
	newcon = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!newcon)) {
		srcu_read_unlock(&connections_srcu, idx);
		result = -ENOENT;
		goto accept_err;
	}

	sock_set_mark(newsock->sk, mark);

	down_write(&newcon->sock_lock);
	if (newcon->sock) {
		struct connection *othercon = newcon->othercon;

		if (!othercon) {
			othercon = kzalloc(sizeof(*othercon), GFP_NOFS);
			if (!othercon) {
				log_print("failed to allocate incoming socket");
				up_write(&newcon->sock_lock);
				srcu_read_unlock(&connections_srcu, idx);
				result = -ENOMEM;
				goto accept_err;
			}

			dlm_con_init(othercon, nodeid);
			/* subclass 1: othercon->sock_lock nests inside
			 * newcon->sock_lock (see below) without upsetting
			 * lockdep
			 */
			lockdep_set_subclass(&othercon->sock_lock, 1);
			newcon->othercon = othercon;
			set_bit(CF_IS_OTHERCON, &othercon->flags);
		} else {
			/* close other sock con if we have something new */
			close_connection(othercon, false);
		}

		down_write(&othercon->sock_lock);
		add_sock(newsock, othercon);

		/* check if we received something while adding */
		lock_sock(othercon->sock->sk);
		lowcomms_queue_rwork(othercon);
		release_sock(othercon->sock->sk);
		up_write(&othercon->sock_lock);
	}
	else {
		/* accept copies the sk after we've saved the callbacks, so we
		   don't want to save them a second time or comm errors will
		   result in calling sk_error_report recursively.
		 */
		add_sock(newsock, newcon);

		/* check if we received something while adding */
		lock_sock(newcon->sock->sk);
		lowcomms_queue_rwork(newcon);
		release_sock(newcon->sock->sk);
	}
	up_write(&newcon->sock_lock);
	srcu_read_unlock(&connections_srcu, idx);

	return DLM_IO_SUCCESS;

accept_err:
	if (newsock)
		sock_release(newsock);

	return result;
}

/*
 * writequeue_entry_complete - try to delete and free write queue entry
 * @e: write queue entry to try to delete
 * @completed: bytes completed
 *
 * writequeue_lock must be held.
 */
static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
{
	e->offset += completed;
	e->len -= completed;
	/* signal that page was half way transmitted */
	e->dirty = true;

	if (e->len == 0 && e->users == 0)
		free_entry(e);
}

/*
 * sctp_bind_addrs - bind a SCTP socket to all our addresses
 */
static int sctp_bind_addrs(struct socket *sock, uint16_t port)
{
	struct sockaddr_storage localaddr;
	struct sockaddr *addr = (struct sockaddr *)&localaddr;
	int i, addr_len, result = 0;

	for (i = 0; i < dlm_local_count; i++) {
		memcpy(&localaddr, &dlm_local_addr[i], sizeof(localaddr));
		make_sockaddr(&localaddr, port, &addr_len);

		/* first address is a plain bind, the rest are added to the
		 * SCTP association via sock_bind_add()
		 */
		if (!i)
			result = kernel_bind(sock, addr, addr_len);
		else
			result = sock_bind_add(sock->sk, addr, addr_len);

		if (result < 0) {
			log_print("Can't bind to %d addr number %d, %d.\n",
				  port, i + 1, result);
			break;
		}
	}
	return result;
}

/* Get local addresses */
static void init_local(void)
{
	struct sockaddr_storage sas;
	int i;

	dlm_local_count = 0;
	for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
		if (dlm_our_addr(&sas, i))
			break;

		memcpy(&dlm_local_addr[dlm_local_count++], &sas,
sizeof(sas)); 1142 } 1143 } 1144 1145 static struct writequeue_entry *new_writequeue_entry(struct connection *con) 1146 { 1147 struct writequeue_entry *entry; 1148 1149 entry = dlm_allocate_writequeue(); 1150 if (!entry) 1151 return NULL; 1152 1153 entry->page = alloc_page(GFP_ATOMIC | __GFP_ZERO); 1154 if (!entry->page) { 1155 dlm_free_writequeue(entry); 1156 return NULL; 1157 } 1158 1159 entry->offset = 0; 1160 entry->len = 0; 1161 entry->end = 0; 1162 entry->dirty = false; 1163 entry->con = con; 1164 entry->users = 1; 1165 kref_init(&entry->ref); 1166 return entry; 1167 } 1168 1169 static struct writequeue_entry *new_wq_entry(struct connection *con, int len, 1170 char **ppc, void (*cb)(void *data), 1171 void *data) 1172 { 1173 struct writequeue_entry *e; 1174 1175 spin_lock_bh(&con->writequeue_lock); 1176 if (!list_empty(&con->writequeue)) { 1177 e = list_last_entry(&con->writequeue, struct writequeue_entry, list); 1178 if (DLM_WQ_REMAIN_BYTES(e) >= len) { 1179 kref_get(&e->ref); 1180 1181 *ppc = page_address(e->page) + e->end; 1182 if (cb) 1183 cb(data); 1184 1185 e->end += len; 1186 e->users++; 1187 goto out; 1188 } 1189 } 1190 1191 e = new_writequeue_entry(con); 1192 if (!e) 1193 goto out; 1194 1195 kref_get(&e->ref); 1196 *ppc = page_address(e->page); 1197 e->end += len; 1198 if (cb) 1199 cb(data); 1200 1201 list_add_tail(&e->list, &con->writequeue); 1202 1203 out: 1204 spin_unlock_bh(&con->writequeue_lock); 1205 return e; 1206 }; 1207 1208 static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len, 1209 gfp_t allocation, char **ppc, 1210 void (*cb)(void *data), 1211 void *data) 1212 { 1213 struct writequeue_entry *e; 1214 struct dlm_msg *msg; 1215 1216 msg = dlm_allocate_msg(allocation); 1217 if (!msg) 1218 return NULL; 1219 1220 kref_init(&msg->ref); 1221 1222 e = new_wq_entry(con, len, ppc, cb, data); 1223 if (!e) { 1224 dlm_free_msg(msg); 1225 return NULL; 1226 } 1227 1228 msg->retransmit = false; 1229 msg->orig_msg = NULL; 1230 
	msg->ppc = *ppc;
	msg->len = len;
	msg->entry = e;

	return msg;
}

/* avoid false positive for nodes_srcu; the srcu unlock happens in
 * dlm_lowcomms_commit_msg() which must be called on success
 */
#ifndef __CHECKER__
struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
				     char **ppc, void (*cb)(void *data),
				     void *data)
{
	struct connection *con;
	struct dlm_msg *msg;
	int idx;

	/* a message must fit one writequeue page and carry at least a
	 * dlm_header
	 */
	if (len > DLM_MAX_SOCKET_BUFSIZE ||
	    len < sizeof(struct dlm_header)) {
		BUILD_BUG_ON(PAGE_SIZE < DLM_MAX_SOCKET_BUFSIZE);
		log_print("failed to allocate a buffer of size %d", len);
		WARN_ON_ONCE(1);
		return NULL;
	}

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!con)) {
		srcu_read_unlock(&connections_srcu, idx);
		return NULL;
	}

	msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, data);
	if (!msg) {
		srcu_read_unlock(&connections_srcu, idx);
		return NULL;
	}

	/* for dlm_lowcomms_commit_msg() */
	kref_get(&msg->ref);
	/* we assume if successful commit must be called; the held srcu
	 * read index is stashed so commit can release it
	 */
	msg->idx = idx;
	return msg;
}
#endif

static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
{
	struct writequeue_entry *e = msg->entry;
	struct connection *con = e->con;
	int users;

	spin_lock_bh(&con->writequeue_lock);
	kref_get(&msg->ref);
	list_add(&msg->list, &e->msgs);

	users = --e->users;
	if (users)
		goto out;

	e->len = DLM_WQ_LENGTH_BYTES(e);

	/* no writers left on this entry, hand it to the send worker */
	lowcomms_queue_swork(con);

out:
	spin_unlock_bh(&con->writequeue_lock);
	return;
}

/* avoid false positive for nodes_srcu; the matching srcu lock was taken
 * in dlm_lowcomms_new_msg
 */
#ifndef __CHECKER__
void dlm_lowcomms_commit_msg(struct dlm_msg *msg)
{
	_dlm_lowcomms_commit_msg(msg);
	srcu_read_unlock(&connections_srcu, msg->idx);
	/* because dlm_lowcomms_new_msg() */
	kref_put(&msg->ref, dlm_msg_release);
}
#endif

void dlm_lowcomms_put_msg(struct dlm_msg *msg)
{
	kref_put(&msg->ref, dlm_msg_release);
}

/* does not hold connections_srcu, used by lowcomms_error_report only */
int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
{
	struct dlm_msg *msg_resend;
	char *ppc;

	/* only retransmit once */
	if (msg->retransmit)
		return 1;

	msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len,
					      GFP_ATOMIC, &ppc, NULL, NULL);
	if (!msg_resend)
		return -ENOMEM;

	msg->retransmit = true;
	/* the resend message keeps a reference on the original */
	kref_get(&msg->ref);
	msg_resend->orig_msg = msg;

	memcpy(ppc, msg->ppc, msg->len);
	_dlm_lowcomms_commit_msg(msg_resend);
	dlm_lowcomms_put_msg(msg_resend);

	return 0;
}

/* Send a message */
static int send_to_sock(struct connection *con)
{
	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	struct writequeue_entry *e;
	int len, offset, ret;

	spin_lock_bh(&con->writequeue_lock);
	e = con_next_wq(con);
	if (!e) {
		/* nothing left to send */
		clear_bit(CF_SEND_PENDING, &con->flags);
		spin_unlock_bh(&con->writequeue_lock);
		return DLM_IO_END;
	}

	len = e->len;
	offset = e->offset;
	WARN_ON_ONCE(len == 0 && e->users == 0);
	spin_unlock_bh(&con->writequeue_lock);

	ret = kernel_sendpage(con->sock, e->page, offset, len,
			      msg_flags);
	trace_dlm_send(con->nodeid, ret);
	if (ret == -EAGAIN || ret == 0) {
		lock_sock(con->sock->sk);
		spin_lock_bh(&con->writequeue_lock);
		if (test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
		    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
			/* Notify TCP that we're limited by the
			 * application window size.
1374 */ 1375 set_bit(SOCK_NOSPACE, &con->sock->sk->sk_socket->flags); 1376 con->sock->sk->sk_write_pending++; 1377 1378 clear_bit(CF_SEND_PENDING, &con->flags); 1379 spin_unlock_bh(&con->writequeue_lock); 1380 release_sock(con->sock->sk); 1381 1382 /* wait for write_space() event */ 1383 return DLM_IO_END; 1384 } 1385 spin_unlock_bh(&con->writequeue_lock); 1386 release_sock(con->sock->sk); 1387 1388 return DLM_IO_RESCHED; 1389 } else if (ret < 0) { 1390 return ret; 1391 } 1392 1393 spin_lock_bh(&con->writequeue_lock); 1394 writequeue_entry_complete(e, ret); 1395 spin_unlock_bh(&con->writequeue_lock); 1396 1397 return DLM_IO_SUCCESS; 1398 } 1399 1400 static void clean_one_writequeue(struct connection *con) 1401 { 1402 struct writequeue_entry *e, *safe; 1403 1404 spin_lock_bh(&con->writequeue_lock); 1405 list_for_each_entry_safe(e, safe, &con->writequeue, list) { 1406 free_entry(e); 1407 } 1408 spin_unlock_bh(&con->writequeue_lock); 1409 } 1410 1411 static void connection_release(struct rcu_head *rcu) 1412 { 1413 struct connection *con = container_of(rcu, struct connection, rcu); 1414 1415 WARN_ON_ONCE(!list_empty(&con->writequeue)); 1416 WARN_ON_ONCE(con->sock); 1417 kfree(con); 1418 } 1419 1420 /* Called from recovery when it knows that a node has 1421 left the cluster */ 1422 int dlm_lowcomms_close(int nodeid) 1423 { 1424 struct connection *con; 1425 int idx; 1426 1427 log_print("closing connection to node %d", nodeid); 1428 1429 idx = srcu_read_lock(&connections_srcu); 1430 con = nodeid2con(nodeid, 0); 1431 if (WARN_ON_ONCE(!con)) { 1432 srcu_read_unlock(&connections_srcu, idx); 1433 return -ENOENT; 1434 } 1435 1436 stop_connection_io(con); 1437 log_print("io handling for node: %d stopped", nodeid); 1438 close_connection(con, true); 1439 1440 spin_lock(&connections_lock); 1441 hlist_del_rcu(&con->list); 1442 spin_unlock(&connections_lock); 1443 1444 clean_one_writequeue(con); 1445 call_srcu(&connections_srcu, &con->rcu, connection_release); 1446 if 
(con->othercon) { 1447 clean_one_writequeue(con->othercon); 1448 if (con->othercon) 1449 call_srcu(&connections_srcu, &con->othercon->rcu, connection_release); 1450 } 1451 srcu_read_unlock(&connections_srcu, idx); 1452 1453 /* for debugging we print when we are done to compare with other 1454 * messages in between. This function need to be correctly synchronized 1455 * with io handling 1456 */ 1457 log_print("closing connection to node %d done", nodeid); 1458 1459 return 0; 1460 } 1461 1462 /* Receive worker function */ 1463 static void process_recv_sockets(struct work_struct *work) 1464 { 1465 struct connection *con = container_of(work, struct connection, rwork); 1466 int ret, buflen; 1467 1468 down_read(&con->sock_lock); 1469 if (!con->sock) { 1470 up_read(&con->sock_lock); 1471 return; 1472 } 1473 1474 buflen = READ_ONCE(dlm_config.ci_buffer_size); 1475 do { 1476 ret = receive_from_sock(con, buflen); 1477 } while (ret == DLM_IO_SUCCESS); 1478 up_read(&con->sock_lock); 1479 1480 switch (ret) { 1481 case DLM_IO_END: 1482 /* CF_RECV_PENDING cleared */ 1483 break; 1484 case DLM_IO_EOF: 1485 close_connection(con, false); 1486 /* CF_RECV_PENDING cleared */ 1487 break; 1488 case DLM_IO_RESCHED: 1489 cond_resched(); 1490 queue_work(io_workqueue, &con->rwork); 1491 /* CF_RECV_PENDING not cleared */ 1492 break; 1493 default: 1494 if (ret < 0) { 1495 if (test_bit(CF_IS_OTHERCON, &con->flags)) { 1496 close_connection(con, false); 1497 } else { 1498 spin_lock_bh(&con->writequeue_lock); 1499 lowcomms_queue_swork(con); 1500 spin_unlock_bh(&con->writequeue_lock); 1501 } 1502 1503 /* CF_RECV_PENDING cleared for othercon 1504 * we trigger send queue if not already done 1505 * and process_send_sockets will handle it 1506 */ 1507 break; 1508 } 1509 1510 WARN_ON_ONCE(1); 1511 break; 1512 } 1513 } 1514 1515 static void process_listen_recv_socket(struct work_struct *work) 1516 { 1517 int ret; 1518 1519 if (WARN_ON_ONCE(!listen_con.sock)) 1520 return; 1521 1522 do { 1523 ret = 
accept_from_sock(); 1524 } while (ret == DLM_IO_SUCCESS); 1525 1526 if (ret < 0) 1527 log_print("critical error accepting connection: %d", ret); 1528 } 1529 1530 static int dlm_connect(struct connection *con) 1531 { 1532 struct sockaddr_storage addr; 1533 int result, addr_len; 1534 struct socket *sock; 1535 unsigned int mark; 1536 1537 memset(&addr, 0, sizeof(addr)); 1538 result = nodeid_to_addr(con->nodeid, &addr, NULL, 1539 dlm_proto_ops->try_new_addr, &mark); 1540 if (result < 0) { 1541 log_print("no address for nodeid %d", con->nodeid); 1542 return result; 1543 } 1544 1545 /* Create a socket to communicate with */ 1546 result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family, 1547 SOCK_STREAM, dlm_proto_ops->proto, &sock); 1548 if (result < 0) 1549 return result; 1550 1551 sock_set_mark(sock->sk, mark); 1552 dlm_proto_ops->sockopts(sock); 1553 1554 result = dlm_proto_ops->bind(sock); 1555 if (result < 0) { 1556 sock_release(sock); 1557 return result; 1558 } 1559 1560 add_sock(sock, con); 1561 1562 log_print_ratelimited("connecting to %d", con->nodeid); 1563 make_sockaddr(&addr, dlm_config.ci_tcp_port, &addr_len); 1564 result = dlm_proto_ops->connect(con, sock, (struct sockaddr *)&addr, 1565 addr_len); 1566 switch (result) { 1567 case -EINPROGRESS: 1568 /* not an error */ 1569 fallthrough; 1570 case 0: 1571 break; 1572 default: 1573 if (result < 0) 1574 dlm_close_sock(&con->sock); 1575 1576 break; 1577 } 1578 1579 return result; 1580 } 1581 1582 /* Send worker function */ 1583 static void process_send_sockets(struct work_struct *work) 1584 { 1585 struct connection *con = container_of(work, struct connection, swork); 1586 int ret; 1587 1588 WARN_ON_ONCE(test_bit(CF_IS_OTHERCON, &con->flags)); 1589 1590 down_read(&con->sock_lock); 1591 if (!con->sock) { 1592 up_read(&con->sock_lock); 1593 down_write(&con->sock_lock); 1594 if (!con->sock) { 1595 ret = dlm_connect(con); 1596 switch (ret) { 1597 case 0: 1598 break; 1599 case -EINPROGRESS: 1600 /* avoid 
spamming resched on connection 1601 * we might can switch to a state_change 1602 * event based mechanism if established 1603 */ 1604 msleep(100); 1605 break; 1606 default: 1607 /* CF_SEND_PENDING not cleared */ 1608 up_write(&con->sock_lock); 1609 log_print("connect to node %d try %d error %d", 1610 con->nodeid, con->retries++, ret); 1611 msleep(1000); 1612 /* For now we try forever to reconnect. In 1613 * future we should send a event to cluster 1614 * manager to fence itself after certain amount 1615 * of retries. 1616 */ 1617 queue_work(io_workqueue, &con->swork); 1618 return; 1619 } 1620 } 1621 downgrade_write(&con->sock_lock); 1622 } 1623 1624 do { 1625 ret = send_to_sock(con); 1626 } while (ret == DLM_IO_SUCCESS); 1627 up_read(&con->sock_lock); 1628 1629 switch (ret) { 1630 case DLM_IO_END: 1631 /* CF_SEND_PENDING cleared */ 1632 break; 1633 case DLM_IO_RESCHED: 1634 /* CF_SEND_PENDING not cleared */ 1635 cond_resched(); 1636 queue_work(io_workqueue, &con->swork); 1637 break; 1638 default: 1639 if (ret < 0) { 1640 close_connection(con, false); 1641 1642 /* CF_SEND_PENDING cleared */ 1643 spin_lock_bh(&con->writequeue_lock); 1644 lowcomms_queue_swork(con); 1645 spin_unlock_bh(&con->writequeue_lock); 1646 break; 1647 } 1648 1649 WARN_ON_ONCE(1); 1650 break; 1651 } 1652 } 1653 1654 static void work_stop(void) 1655 { 1656 if (io_workqueue) { 1657 destroy_workqueue(io_workqueue); 1658 io_workqueue = NULL; 1659 } 1660 1661 if (process_workqueue) { 1662 destroy_workqueue(process_workqueue); 1663 process_workqueue = NULL; 1664 } 1665 } 1666 1667 static int work_start(void) 1668 { 1669 io_workqueue = alloc_workqueue("dlm_io", WQ_HIGHPRI | WQ_MEM_RECLAIM, 1670 0); 1671 if (!io_workqueue) { 1672 log_print("can't start dlm_io"); 1673 return -ENOMEM; 1674 } 1675 1676 /* ordered dlm message process queue, 1677 * should be converted to a tasklet 1678 */ 1679 process_workqueue = alloc_ordered_workqueue("dlm_process", 1680 WQ_HIGHPRI | WQ_MEM_RECLAIM); 1681 if 
(!process_workqueue) { 1682 log_print("can't start dlm_process"); 1683 destroy_workqueue(io_workqueue); 1684 io_workqueue = NULL; 1685 return -ENOMEM; 1686 } 1687 1688 return 0; 1689 } 1690 1691 void dlm_lowcomms_shutdown(void) 1692 { 1693 /* stop lowcomms_listen_data_ready calls */ 1694 lock_sock(listen_con.sock->sk); 1695 listen_con.sock->sk->sk_data_ready = listen_sock.sk_data_ready; 1696 release_sock(listen_con.sock->sk); 1697 1698 cancel_work_sync(&listen_con.rwork); 1699 dlm_close_sock(&listen_con.sock); 1700 1701 flush_workqueue(process_workqueue); 1702 } 1703 1704 void dlm_lowcomms_shutdown_node(int nodeid, bool force) 1705 { 1706 struct connection *con; 1707 int idx; 1708 1709 idx = srcu_read_lock(&connections_srcu); 1710 con = nodeid2con(nodeid, 0); 1711 if (WARN_ON_ONCE(!con)) { 1712 srcu_read_unlock(&connections_srcu, idx); 1713 return; 1714 } 1715 1716 flush_work(&con->swork); 1717 stop_connection_io(con); 1718 WARN_ON_ONCE(!force && !list_empty(&con->writequeue)); 1719 close_connection(con, true); 1720 clean_one_writequeue(con); 1721 if (con->othercon) 1722 clean_one_writequeue(con->othercon); 1723 allow_connection_io(con); 1724 srcu_read_unlock(&connections_srcu, idx); 1725 } 1726 1727 void dlm_lowcomms_stop(void) 1728 { 1729 work_stop(); 1730 dlm_proto_ops = NULL; 1731 } 1732 1733 static int dlm_listen_for_all(void) 1734 { 1735 struct socket *sock; 1736 int result; 1737 1738 log_print("Using %s for communications", 1739 dlm_proto_ops->name); 1740 1741 result = dlm_proto_ops->listen_validate(); 1742 if (result < 0) 1743 return result; 1744 1745 result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family, 1746 SOCK_STREAM, dlm_proto_ops->proto, &sock); 1747 if (result < 0) { 1748 log_print("Can't create comms socket: %d", result); 1749 return result; 1750 } 1751 1752 sock_set_mark(sock->sk, dlm_config.ci_mark); 1753 dlm_proto_ops->listen_sockopts(sock); 1754 1755 result = dlm_proto_ops->listen_bind(sock); 1756 if (result < 0) 1757 goto out; 1758 
	lock_sock(sock->sk);
	/* stash the default sk callbacks before installing our own
	 * data_ready hook; dlm_lowcomms_shutdown() restores sk_data_ready
	 * from listen_sock
	 */
	listen_sock.sk_data_ready = sock->sk->sk_data_ready;
	listen_sock.sk_write_space = sock->sk->sk_write_space;
	listen_sock.sk_error_report = sock->sk->sk_error_report;
	listen_sock.sk_state_change = sock->sk->sk_state_change;

	listen_con.sock = sock;

	sock->sk->sk_allocation = GFP_NOFS;
	sock->sk->sk_data_ready = lowcomms_listen_data_ready;
	release_sock(sock->sk);

	result = sock->ops->listen(sock, 5);
	if (result < 0) {
		dlm_close_sock(&listen_con.sock);
		return result;
	}

	return 0;

out:
	sock_release(sock);
	return result;
}

static int dlm_tcp_bind(struct socket *sock)
{
	struct sockaddr_storage src_addr;
	int result, addr_len;

	/* Bind to our cluster-known address connecting to avoid
	 * routing problems.
	 */
	memcpy(&src_addr, &dlm_local_addr[0], sizeof(src_addr));
	make_sockaddr(&src_addr, 0, &addr_len);

	result = sock->ops->bind(sock, (struct sockaddr *)&src_addr,
				 addr_len);
	if (result < 0) {
		/* This *may* not indicate a critical error */
		log_print("could not bind for connect: %d", result);
	}

	/* bind failure is deliberately non-fatal here */
	return 0;
}

static int dlm_tcp_connect(struct connection *con, struct socket *sock,
			   struct sockaddr *addr, int addr_len)
{
	/* non-blocking connect; caller treats -EINPROGRESS as success */
	return sock->ops->connect(sock, addr, addr_len, O_NONBLOCK);
}

static int dlm_tcp_listen_validate(void)
{
	/* We don't support multi-homed hosts */
	if (dlm_local_count > 1) {
		log_print("TCP protocol can't handle multi-homed hosts, try SCTP");
		return -EINVAL;
	}

	return 0;
}

static void dlm_tcp_sockopts(struct socket *sock)
{
	/* Turn off Nagle's algorithm */
	tcp_sock_set_nodelay(sock->sk);
}

static void dlm_tcp_listen_sockopts(struct socket *sock)
{
	dlm_tcp_sockopts(sock);
	sock_set_reuseaddr(sock->sk);
}

static int dlm_tcp_listen_bind(struct socket *sock)
{
	int addr_len;

	/* Bind to our port */
	make_sockaddr(&dlm_local_addr[0], dlm_config.ci_tcp_port, &addr_len);
	return sock->ops->bind(sock, (struct sockaddr *)&dlm_local_addr[0],
			       addr_len);
}

static const struct dlm_proto_ops dlm_tcp_ops = {
	.name = "TCP",
	.proto = IPPROTO_TCP,
	.connect = dlm_tcp_connect,
	.sockopts = dlm_tcp_sockopts,
	.bind = dlm_tcp_bind,
	.listen_validate = dlm_tcp_listen_validate,
	.listen_sockopts = dlm_tcp_listen_sockopts,
	.listen_bind = dlm_tcp_listen_bind,
};

static int dlm_sctp_bind(struct socket *sock)
{
	/* bind all local addresses, port 0 lets the stack pick one */
	return sctp_bind_addrs(sock, 0);
}

static int dlm_sctp_connect(struct connection *con, struct socket *sock,
			    struct sockaddr *addr, int addr_len)
{
	int ret;

	/*
	 * Make sock->ops->connect() function return in specified time,
	 * since O_NONBLOCK argument in connect() function does not work here,
	 * then, we should restore the default value of this attribute.
	 */
	sock_set_sndtimeo(sock->sk, 5);
	ret = sock->ops->connect(sock, addr, addr_len, 0);
	/* 0 restores the default (blocking) send timeout */
	sock_set_sndtimeo(sock->sk, 0);
	return ret;
}

static int dlm_sctp_listen_validate(void)
{
	if (!IS_ENABLED(CONFIG_IP_SCTP)) {
		log_print("SCTP is not enabled by this kernel");
		return -EOPNOTSUPP;
	}

	request_module("sctp");
	return 0;
}

static int dlm_sctp_bind_listen(struct socket *sock)
{
	return sctp_bind_addrs(sock, dlm_config.ci_tcp_port);
}

static void dlm_sctp_sockopts(struct socket *sock)
{
	/* Turn off Nagle's algorithm */
	sctp_sock_set_nodelay(sock->sk);
	sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
}

static const struct dlm_proto_ops dlm_sctp_ops = {
	.name = "SCTP",
	.proto = IPPROTO_SCTP,
	.try_new_addr = true,
	.connect = dlm_sctp_connect,
	.sockopts = dlm_sctp_sockopts,
	.bind = dlm_sctp_bind,
	.listen_validate = dlm_sctp_listen_validate,
	.listen_sockopts = dlm_sctp_sockopts,
	.listen_bind = dlm_sctp_bind_listen,
};

int dlm_lowcomms_start(void)
{
	int error;

	init_local();
	if (!dlm_local_count) {
		error = -ENOTCONN;
		log_print("no local IP address has been set");
		goto fail;
	}

	error = work_start();
	if (error)
		goto fail;

	/* Start listening */
	switch (dlm_config.ci_protocol) {
	case DLM_PROTO_TCP:
		dlm_proto_ops = &dlm_tcp_ops;
		break;
	case DLM_PROTO_SCTP:
		dlm_proto_ops = &dlm_sctp_ops;
		break;
	default:
		log_print("Invalid protocol identifier %d set",
			  dlm_config.ci_protocol);
		error = -EINVAL;
		goto fail_proto_ops;
	}

	error = dlm_listen_for_all();
	if (error)
		goto fail_listen;

	return 0;

fail_listen:
	dlm_proto_ops = NULL;
fail_proto_ops:
	work_stop();
fail:
	return error;
}

void dlm_lowcomms_init(void)
{
	int i;

	for (i = 0; i < CONN_HASH_SIZE; i++)
		INIT_HLIST_HEAD(&connection_hash[i]);

	INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
}

void dlm_lowcomms_exit(void)
{
	struct connection *con;
	int i, idx;

	idx = srcu_read_lock(&connections_srcu);
	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
			spin_lock(&connections_lock);
			hlist_del_rcu(&con->list);
			spin_unlock(&connections_lock);

			/* freeing is deferred past the srcu grace period;
			 * hlist_del_rcu() keeps next pointers valid for the
			 * ongoing traversal
			 */
			if (con->othercon)
				call_srcu(&connections_srcu, &con->othercon->rcu,
					  connection_release);
			call_srcu(&connections_srcu, &con->rcu, connection_release);
		}
	}
	srcu_read_unlock(&connections_srcu, idx);
}