1 // SPDX-License-Identifier: GPL-2.0-only 2 /****************************************************************************** 3 ******************************************************************************* 4 ** 5 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 6 ** Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved. 7 ** 8 ** 9 ******************************************************************************* 10 ******************************************************************************/ 11 12 /* 13 * lowcomms.c 14 * 15 * This is the "low-level" comms layer. 16 * 17 * It is responsible for sending/receiving messages 18 * from other nodes in the cluster. 19 * 20 * Cluster nodes are referred to by their nodeids. nodeids are 21 * simply 32 bit numbers to the locking module - if they need to 22 * be expanded for the cluster infrastructure then that is its 23 * responsibility. It is this layer's 24 * responsibility to resolve these into IP address or 25 * whatever it needs for inter-node communication. 26 * 27 * The comms level is two kernel threads that deal mainly with 28 * the receiving of messages from other nodes and passing them 29 * up to the mid-level comms layer (which understands the 30 * message format) for execution by the locking core, and 31 * a send thread which does all the setting up of connections 32 * to remote nodes and the sending of data. Threads are not allowed 33 * to send their own data because it may cause them to wait in times 34 * of high load. Also, this way, the sending thread can collect together 35 * messages bound for one node and send them in one block. 36 * 37 * lowcomms will choose to use either TCP or SCTP as its transport layer 38 * depending on the configuration variable 'protocol'. This should be set 39 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a 40 * cluster-wide mechanism as it must be the same on all nodes of the cluster 41 * for the DLM to function. 
 *
 */

#include <asm/ioctls.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <linux/pagemap.h>
#include <linux/file.h>
#include <linux/mutex.h>
#include <linux/sctp.h>
#include <linux/slab.h>
#include <net/sctp/sctp.h>
#include <net/ipv6.h>

#include <trace/events/dlm.h>

#include "dlm_internal.h"
#include "lowcomms.h"
#include "midcomms.h"
#include "memory.h"
#include "config.h"

/* how long we wait for the peer to finish an orderly shutdown */
#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(5000)
#define NEEDED_RMEM (4*1024*1024)

/* per remote node connection state */
struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* So we know who we are in the list */
	/* this semaphore is used to allow parallel recv/send in read
	 * lock mode. When we release a sock we need to held the write lock.
	 *
	 * However this is locking code and not nice. When we remove the
	 * othercon handling we can look into other mechanism to synchronize
	 * io handling to call sock_release() at the right time.
	 */
	struct rw_semaphore sock_lock;
	unsigned long flags;
/* bit numbers for "flags" above */
#define CF_APP_LIMITED 0
#define CF_RECV_PENDING 1
#define CF_SEND_PENDING 2
#define CF_RECV_INTR 3
#define CF_IO_STOP 4
#define CF_IS_OTHERCON 5
	struct list_head writequeue;	/* List of outgoing writequeue_entries */
	spinlock_t writequeue_lock;
	int retries;
	struct hlist_node list;
	/* due some connect()/accept() races we currently have this cross over
	 * connection attempt second connection for one node.
	 *
	 * There is a solution to avoid the race by introducing a connect
	 * rule as e.g. our_nodeid > nodeid_to_connect who is allowed to
	 * connect. Otherside can connect but will only be considered that
	 * the other side wants to have a reconnect.
	 *
	 * However changing to this behaviour will break backwards compatible.
	 * In a DLM protocol major version upgrade we should remove this!
	 */
	struct connection *othercon;
	struct work_struct rwork; /* receive worker */
	struct work_struct swork; /* send worker */
	wait_queue_head_t shutdown_wait;
	/* partial message carried over between receives */
	unsigned char rx_leftover_buf[DLM_MAX_SOCKET_BUFSIZE];
	int rx_leftover;
	int mark;	/* skb mark applied to this node's sockets */
	int addr_count;
	int curr_addr_index;
	struct sockaddr_storage addr[DLM_MAX_ADDR_COUNT];
	spinlock_t addrs_lock;	/* protects addr[], addr_count, mark */
	struct rcu_head rcu;
};
#define sock2con(x) ((struct connection *)(x)->sk_user_data)

struct listen_connection {
	struct socket *sock;
	struct work_struct rwork;
};

#define DLM_WQ_REMAIN_BYTES(e) (PAGE_SIZE - e->end)
#define DLM_WQ_LENGTH_BYTES(e) (e->end - e->offset)

/* An entry waiting to be sent */
struct writequeue_entry {
	struct list_head list;
	struct page *page;	/* backing page holding the message bytes */
	int offset;		/* first unsent byte in page */
	int len;		/* bytes ready to send */
	int end;		/* end of allocated region in page */
	int users;		/* writers still filling this entry */
	bool dirty;		/* entry was partially transmitted */
	struct connection *con;
	struct list_head msgs;
	struct kref ref;
};

struct dlm_msg {
	struct writequeue_entry *entry;
	struct dlm_msg *orig_msg;
	bool retransmit;
	void *ppc;
	int len;
	int idx; /* new()/commit() idx exchange */

	struct list_head list;
	struct kref ref;
};

/* a received buffer queued for processing by process_workqueue */
struct processqueue_entry {
	unsigned char *buf;
	int nodeid;
	int buflen;

	struct list_head list;
};

/* per transport (TCP/SCTP) operations */
struct dlm_proto_ops {
	bool try_new_addr;
	const char *name;
	int proto;

	int (*connect)(struct connection *con, struct socket *sock,
		       struct sockaddr *addr, int addr_len);
	void (*sockopts)(struct socket *sock);
	int (*bind)(struct socket *sock);
	int (*listen_validate)(void);
	void (*listen_sockopts)(struct socket *sock);
	int (*listen_bind)(struct socket *sock);
};

/* original socket callbacks, restored on teardown */
static struct listen_sock_callbacks {
	void (*sk_error_report)(struct sock *);
	void (*sk_data_ready)(struct sock *);
	void (*sk_state_change)(struct sock *);
	void (*sk_write_space)(struct sock *);
} listen_sock;

static struct listen_connection listen_con;
static struct sockaddr_storage dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;

/* Work queues */
static struct workqueue_struct *io_workqueue;
static struct workqueue_struct *process_workqueue;

static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_SPINLOCK(connections_lock);
DEFINE_STATIC_SRCU(connections_srcu);

static const struct dlm_proto_ops *dlm_proto_ops;

/* io worker return codes */
#define DLM_IO_SUCCESS 0
#define DLM_IO_END 1
#define DLM_IO_EOF 2
#define DLM_IO_RESCHED 3

static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);
static void process_dlm_messages(struct work_struct *work);

static DECLARE_WORK(process_work, process_dlm_messages);
static DEFINE_SPINLOCK(processqueue_lock);
static bool process_dlm_messages_pending;
static LIST_HEAD(processqueue);

bool dlm_lowcomms_is_running(void)
{
	return !!listen_con.sock;
}

/* queue send work unless io is stopped or the app is flow-controlled;
 * caller must hold con->writequeue_lock
 */
static void lowcomms_queue_swork(struct connection *con)
{
	assert_spin_locked(&con->writequeue_lock);

	if (!test_bit(CF_IO_STOP, &con->flags) &&
	    !test_bit(CF_APP_LIMITED, &con->flags) &&
	    !test_and_set_bit(CF_SEND_PENDING, &con->flags))
		queue_work(io_workqueue, &con->swork);
}

/* queue receive work; caller must hold the socket lock */
static void lowcomms_queue_rwork(struct connection *con)
{
#ifdef CONFIG_LOCKDEP
	WARN_ON_ONCE(!lockdep_sock_is_held(con->sock->sk));
#endif

	if (!test_bit(CF_IO_STOP, &con->flags) &&
	    !test_and_set_bit(CF_RECV_PENDING, &con->flags))
		queue_work(io_workqueue, &con->rwork);
}

static void writequeue_entry_ctor(void *data)
{
	struct writequeue_entry *entry = data;

	INIT_LIST_HEAD(&entry->msgs);
}

struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void)
{
	return kmem_cache_create("dlm_writequeue", sizeof(struct writequeue_entry),
				 0, 0, writequeue_entry_ctor);
}

struct kmem_cache *dlm_lowcomms_msg_cache_create(void)
{
	return kmem_cache_create("dlm_msg", sizeof(struct dlm_msg), 0, 0, NULL);
}

/* need to held writequeue_lock */
static struct writequeue_entry *con_next_wq(struct connection *con)
{
	struct writequeue_entry *e;

	e = list_first_entry_or_null(&con->writequeue, struct writequeue_entry,
				     list);
	/* if len is zero nothing is to send, if there are users filling
	 * buffers we wait until the users are done so we can send more.
	 */
	if (!e || e->users || e->len == 0)
		return NULL;

	return e;
}

/* lookup connection by nodeid in hash bucket r; caller holds SRCU or
 * connections_lock
 */
static struct connection *__find_con(int nodeid, int r)
{
	struct connection *con;

	hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
		if (con->nodeid == nodeid)
			return con;
	}

	return NULL;
}

static void dlm_con_init(struct connection *con, int nodeid)
{
	con->nodeid = nodeid;
	init_rwsem(&con->sock_lock);
	INIT_LIST_HEAD(&con->writequeue);
	spin_lock_init(&con->writequeue_lock);
	INIT_WORK(&con->swork, process_send_sockets);
	INIT_WORK(&con->rwork, process_recv_sockets);
	spin_lock_init(&con->addrs_lock);
	init_waitqueue_head(&con->shutdown_wait);
}

/*
 * If 'allocation' is zero then we don't attempt to create a new
 * connection structure for this node.
 */
static struct connection *nodeid2con(int nodeid, gfp_t alloc)
{
	struct connection *con, *tmp;
	int r;

	r = nodeid_hash(nodeid);
	con = __find_con(nodeid, r);
	if (con || !alloc)
		return con;

	con = kzalloc(sizeof(*con), alloc);
	if (!con)
		return NULL;

	dlm_con_init(con, nodeid);

	spin_lock(&connections_lock);
	/* Because multiple workqueues/threads calls this function it can
	 * race on multiple cpu's. Instead of locking hot path __find_con()
	 * we just check in rare cases of recently added nodes again
	 * under protection of connections_lock. If this is the case we
	 * abort our connection creation and return the existing connection.
	 */
	tmp = __find_con(nodeid, r);
	if (tmp) {
		spin_unlock(&connections_lock);
		kfree(con);
		return tmp;
	}

	hlist_add_head_rcu(&con->list, &connection_hash[r]);
	spin_unlock(&connections_lock);

	return con;
}

/* returns 1 when address and port match, 0 otherwise (or on unknown
 * address family)
 */
static int addr_compare(const struct sockaddr_storage *x,
			const struct sockaddr_storage *y)
{
	switch (x->ss_family) {
	case AF_INET: {
		struct sockaddr_in *sinx = (struct sockaddr_in *)x;
		struct sockaddr_in *siny = (struct sockaddr_in *)y;
		if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
			return 0;
		if (sinx->sin_port != siny->sin_port)
			return 0;
		break;
	}
	case AF_INET6: {
		struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
		struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
		if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
			return 0;
		if (sinx->sin6_port != siny->sin6_port)
			return 0;
		break;
	}
	default:
		return 0;
	}
	return 1;
}

/* resolve a nodeid to one of its configured addresses; with try_new_addr
 * the per-connection address index is advanced round-robin
 */
static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
			  struct sockaddr *sa_out, bool try_new_addr,
			  unsigned int *mark)
{
	struct sockaddr_storage sas;
	struct connection *con;
	int idx;

	if (!dlm_local_count)
		return -1;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (!con) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	spin_lock(&con->addrs_lock);
	if (!con->addr_count) {
		spin_unlock(&con->addrs_lock);
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	memcpy(&sas, &con->addr[con->curr_addr_index],
	       sizeof(struct sockaddr_storage));

	if (try_new_addr) {
con->curr_addr_index++; 388 if (con->curr_addr_index == con->addr_count) 389 con->curr_addr_index = 0; 390 } 391 392 *mark = con->mark; 393 spin_unlock(&con->addrs_lock); 394 395 if (sas_out) 396 memcpy(sas_out, &sas, sizeof(struct sockaddr_storage)); 397 398 if (!sa_out) { 399 srcu_read_unlock(&connections_srcu, idx); 400 return 0; 401 } 402 403 if (dlm_local_addr[0].ss_family == AF_INET) { 404 struct sockaddr_in *in4 = (struct sockaddr_in *) &sas; 405 struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out; 406 ret4->sin_addr.s_addr = in4->sin_addr.s_addr; 407 } else { 408 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &sas; 409 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out; 410 ret6->sin6_addr = in6->sin6_addr; 411 } 412 413 srcu_read_unlock(&connections_srcu, idx); 414 return 0; 415 } 416 417 static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid, 418 unsigned int *mark) 419 { 420 struct connection *con; 421 int i, idx, addr_i; 422 423 idx = srcu_read_lock(&connections_srcu); 424 for (i = 0; i < CONN_HASH_SIZE; i++) { 425 hlist_for_each_entry_rcu(con, &connection_hash[i], list) { 426 WARN_ON_ONCE(!con->addr_count); 427 428 spin_lock(&con->addrs_lock); 429 for (addr_i = 0; addr_i < con->addr_count; addr_i++) { 430 if (addr_compare(&con->addr[addr_i], addr)) { 431 *nodeid = con->nodeid; 432 *mark = con->mark; 433 spin_unlock(&con->addrs_lock); 434 srcu_read_unlock(&connections_srcu, idx); 435 return 0; 436 } 437 } 438 spin_unlock(&con->addrs_lock); 439 } 440 } 441 srcu_read_unlock(&connections_srcu, idx); 442 443 return -ENOENT; 444 } 445 446 static bool dlm_lowcomms_con_has_addr(const struct connection *con, 447 const struct sockaddr_storage *addr) 448 { 449 int i; 450 451 for (i = 0; i < con->addr_count; i++) { 452 if (addr_compare(&con->addr[i], addr)) 453 return true; 454 } 455 456 return false; 457 } 458 459 int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len) 460 { 461 struct connection *con; 462 bool 
ret, idx; 463 464 idx = srcu_read_lock(&connections_srcu); 465 con = nodeid2con(nodeid, GFP_NOFS); 466 if (!con) { 467 srcu_read_unlock(&connections_srcu, idx); 468 return -ENOMEM; 469 } 470 471 spin_lock(&con->addrs_lock); 472 if (!con->addr_count) { 473 memcpy(&con->addr[0], addr, sizeof(*addr)); 474 con->addr_count = 1; 475 con->mark = dlm_config.ci_mark; 476 spin_unlock(&con->addrs_lock); 477 srcu_read_unlock(&connections_srcu, idx); 478 return 0; 479 } 480 481 ret = dlm_lowcomms_con_has_addr(con, addr); 482 if (ret) { 483 spin_unlock(&con->addrs_lock); 484 srcu_read_unlock(&connections_srcu, idx); 485 return -EEXIST; 486 } 487 488 if (con->addr_count >= DLM_MAX_ADDR_COUNT) { 489 spin_unlock(&con->addrs_lock); 490 srcu_read_unlock(&connections_srcu, idx); 491 return -ENOSPC; 492 } 493 494 memcpy(&con->addr[con->addr_count++], addr, sizeof(*addr)); 495 srcu_read_unlock(&connections_srcu, idx); 496 spin_unlock(&con->addrs_lock); 497 return 0; 498 } 499 500 /* Data available on socket or listen socket received a connect */ 501 static void lowcomms_data_ready(struct sock *sk) 502 { 503 struct connection *con = sock2con(sk); 504 505 set_bit(CF_RECV_INTR, &con->flags); 506 lowcomms_queue_rwork(con); 507 } 508 509 static void lowcomms_write_space(struct sock *sk) 510 { 511 struct connection *con = sock2con(sk); 512 513 clear_bit(SOCK_NOSPACE, &con->sock->flags); 514 515 spin_lock_bh(&con->writequeue_lock); 516 if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) { 517 con->sock->sk->sk_write_pending--; 518 clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags); 519 } 520 521 lowcomms_queue_swork(con); 522 spin_unlock_bh(&con->writequeue_lock); 523 } 524 525 static void lowcomms_state_change(struct sock *sk) 526 { 527 /* SCTP layer is not calling sk_data_ready when the connection 528 * is done, so we catch the signal through here. 
	 */
	if (sk->sk_shutdown == RCV_SHUTDOWN)
		lowcomms_data_ready(sk);
}

static void lowcomms_listen_data_ready(struct sock *sk)
{
	queue_work(io_workqueue, &listen_con.rwork);
}

/* kick off connection establishment for nodeid by queueing send work
 * if no socket exists yet; no-op for our own nodeid
 */
int dlm_lowcomms_connect_node(int nodeid)
{
	struct connection *con;
	int idx;

	if (nodeid == dlm_our_nodeid())
		return 0;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!con)) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	down_read(&con->sock_lock);
	if (!con->sock) {
		spin_lock_bh(&con->writequeue_lock);
		lowcomms_queue_swork(con);
		spin_unlock_bh(&con->writequeue_lock);
	}
	up_read(&con->sock_lock);
	srcu_read_unlock(&connections_srcu, idx);

	cond_resched();
	return 0;
}

/* set the skb mark used for future sockets to this node */
int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark)
{
	struct connection *con;
	int idx;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (!con) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	spin_lock(&con->addrs_lock);
	con->mark = mark;
	spin_unlock(&con->addrs_lock);
	srcu_read_unlock(&connections_srcu, idx);
	return 0;
}

/* sk_error_report callback: log the error, trigger midcomms resend of
 * unacked messages, then chain to the saved original callback
 */
static void lowcomms_error_report(struct sock *sk)
{
	struct connection *con = sock2con(sk);
	struct inet_sock *inet;

	inet = inet_sk(sk);
	switch (sk->sk_family) {
	case AF_INET:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI4, dport %d, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &inet->inet_daddr,
				   ntohs(inet->inet_dport), sk->sk_err,
				   sk->sk_err_soft);
		break;
#if IS_ENABLED(CONFIG_IPV6)
	case AF_INET6:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI6c, "
				   "dport %d, sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &sk->sk_v6_daddr,
				   ntohs(inet->inet_dport), sk->sk_err,
				   sk->sk_err_soft);
		break;
#endif
	default:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "invalid socket family %d set, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   sk->sk_family, sk->sk_err, sk->sk_err_soft);
		break;
	}

	dlm_midcomms_unack_msg_resend(con->nodeid);

	listen_sock.sk_error_report(sk);
}

/* restore the socket callbacks saved in listen_sock; caller holds sk lock */
static void restore_callbacks(struct sock *sk)
{
#ifdef CONFIG_LOCKDEP
	WARN_ON_ONCE(!lockdep_sock_is_held(sk));
#endif

	sk->sk_user_data = NULL;
	sk->sk_data_ready = listen_sock.sk_data_ready;
	sk->sk_state_change = listen_sock.sk_state_change;
	sk->sk_write_space = listen_sock.sk_write_space;
	sk->sk_error_report = listen_sock.sk_error_report;
}

/* Make a socket active */
static void add_sock(struct socket *sock, struct connection *con)
{
	struct sock *sk = sock->sk;

	lock_sock(sk);
	con->sock = sock;

	sk->sk_user_data = con;
	sk->sk_data_ready = lowcomms_data_ready;
	sk->sk_write_space = lowcomms_write_space;
	if (dlm_config.ci_protocol == DLM_PROTO_SCTP)
		sk->sk_state_change = lowcomms_state_change;
	sk->sk_allocation = GFP_NOFS;
	sk->sk_use_task_frag = false;
	sk->sk_error_report = lowcomms_error_report;
	release_sock(sk);
}

/* Add the port number to an IPv6 or 4 sockaddr and return the address
   length */
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
			  int *addr_len)
{
	saddr->ss_family = dlm_local_addr[0].ss_family;
	if (saddr->ss_family == AF_INET) {
		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
		in4_addr->sin_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in);
		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
	} else {
		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
		in6_addr->sin6_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in6);
	}
	/* zero the tail so the storage compares equal in addr_compare() */
	memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
}

static void dlm_page_release(struct kref *kref)
{
	struct writequeue_entry *e = container_of(kref, struct writequeue_entry,
						  ref);

	__free_page(e->page);
	dlm_free_writequeue(e);
}

static void dlm_msg_release(struct kref *kref)
{
	struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);

	kref_put(&msg->entry->ref, dlm_page_release);
	dlm_free_msg(msg);
}

/* drop all messages attached to a writequeue entry and the entry itself */
static void free_entry(struct writequeue_entry *e)
{
	struct dlm_msg *msg, *tmp;

	list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
		if (msg->orig_msg) {
			msg->orig_msg->retransmit = false;
			kref_put(&msg->orig_msg->ref, dlm_msg_release);
		}

		list_del(&msg->list);
		kref_put(&msg->ref, dlm_msg_release);
	}

	list_del(&e->list);
	kref_put(&e->ref, dlm_page_release);
}

/* detach our callbacks and release the socket, clearing *sock */
static void dlm_close_sock(struct socket **sock)
{
	lock_sock((*sock)->sk);
	restore_callbacks((*sock)->sk);
	release_sock((*sock)->sk);

	sock_release(*sock);
	*sock = NULL;
}

static void allow_connection_io(struct connection *con)
{
	if (con->othercon)
		clear_bit(CF_IO_STOP, &con->othercon->flags);
	clear_bit(CF_IO_STOP, &con->flags);
}

/* stop all io on this connection (and its othercon): set CF_IO_STOP,
 * restore socket callbacks and cancel pending workers
 */
static void stop_connection_io(struct connection *con)
{
	if (con->othercon)
		stop_connection_io(con->othercon);

	down_write(&con->sock_lock);
	if (con->sock) {
		lock_sock(con->sock->sk);
		restore_callbacks(con->sock->sk);

		spin_lock_bh(&con->writequeue_lock);
		set_bit(CF_IO_STOP, &con->flags);
		spin_unlock_bh(&con->writequeue_lock);
		release_sock(con->sock->sk);
	} else {
		spin_lock_bh(&con->writequeue_lock);
		set_bit(CF_IO_STOP, &con->flags);
		spin_unlock_bh(&con->writequeue_lock);
	}
	up_write(&con->sock_lock);

	cancel_work_sync(&con->swork);
	cancel_work_sync(&con->rwork);
}

/* Close a remote connection and tidy up */
static void close_connection(struct connection *con, bool and_other)
{
	struct writequeue_entry *e;

	if (con->othercon && and_other)
		close_connection(con->othercon, false);

	down_write(&con->sock_lock);
	if (!con->sock) {
		up_write(&con->sock_lock);
		return;
	}

	dlm_close_sock(&con->sock);

	/* if we send a writequeue entry only a half way, we drop the
	 * whole entry because reconnection and that we not start of the
	 * middle of a msg which will confuse the other end.
	 *
	 * we can always drop messages because retransmits, but what we
	 * cannot allow is to transmit half messages which may be processed
	 * at the other side.
	 *
	 * our policy is to start on a clean state when disconnects, we don't
	 * know what's send/received on transport layer in this case.
	 */
	spin_lock_bh(&con->writequeue_lock);
	if (!list_empty(&con->writequeue)) {
		e = list_first_entry(&con->writequeue, struct writequeue_entry,
				     list);
		if (e->dirty)
			free_entry(e);
	}
	spin_unlock_bh(&con->writequeue_lock);

	con->rx_leftover = 0;
	con->retries = 0;
	clear_bit(CF_APP_LIMITED, &con->flags);
	clear_bit(CF_RECV_PENDING, &con->flags);
	clear_bit(CF_SEND_PENDING, &con->flags);
	up_write(&con->sock_lock);
}

/* orderly shutdown: send SHUT_WR and wait for the peer to close; fall
 * back to a forced close on error or timeout
 */
static void shutdown_connection(struct connection *con, bool and_other)
{
	int ret;

	if (con->othercon && and_other)
		shutdown_connection(con->othercon, false);

	flush_workqueue(io_workqueue);
	down_read(&con->sock_lock);
	/* nothing to shutdown */
	if (!con->sock) {
		up_read(&con->sock_lock);
		return;
	}

	ret = kernel_sock_shutdown(con->sock, SHUT_WR);
	up_read(&con->sock_lock);
	if (ret) {
		log_print("Connection %p failed to shutdown: %d will force close",
			  con, ret);
		goto force_close;
	} else {
		ret = wait_event_timeout(con->shutdown_wait, !con->sock,
					 DLM_SHUTDOWN_WAIT_TIMEOUT);
		if (ret == 0) {
			log_print("Connection %p shutdown timed out, will force close",
				  con);
			goto force_close;
		}
	}

	return;

force_close:
	close_connection(con, false);
}

static struct processqueue_entry *new_processqueue_entry(int nodeid,
							 int buflen)
{
	struct processqueue_entry *pentry;

	pentry = kmalloc(sizeof(*pentry), GFP_NOFS);
	if (!pentry)
		return NULL;

	pentry->buf = kmalloc(buflen, GFP_NOFS);
	if (!pentry->buf) {
		kfree(pentry);
		return NULL;
	}

	pentry->nodeid = nodeid;
	return pentry;
}

static void free_processqueue_entry(struct processqueue_entry *pentry)
{
	kfree(pentry->buf);
	kfree(pentry);
}

/* tracks which nodes had messages processed so we can ack them once */
struct dlm_processed_nodes {
	int nodeid;

	struct list_head list;
};

static void add_processed_node(int nodeid, struct list_head *processed_nodes)
{
	struct dlm_processed_nodes *n;

	list_for_each_entry(n, processed_nodes, list) {
		/* we already remembered this node */
		if (n->nodeid == nodeid)
			return;
	}

	/* if it's fails in worst case we simple don't send an ack back.
	 * We try it next time.
	 */
	n = kmalloc(sizeof(*n), GFP_NOFS);
	if (!n)
		return;

	n->nodeid = nodeid;
	list_add(&n->list, processed_nodes);
}

/* drain the processqueue, handing each buffer to midcomms, then send
 * one receive-done ack per node that had messages
 */
static void process_dlm_messages(struct work_struct *work)
{
	struct dlm_processed_nodes *n, *n_tmp;
	struct processqueue_entry *pentry;
	LIST_HEAD(processed_nodes);

	spin_lock(&processqueue_lock);
	pentry = list_first_entry_or_null(&processqueue,
					  struct processqueue_entry, list);
	if (WARN_ON_ONCE(!pentry)) {
		spin_unlock(&processqueue_lock);
		return;
	}

	list_del(&pentry->list);
	spin_unlock(&processqueue_lock);

	for (;;) {
		dlm_process_incoming_buffer(pentry->nodeid, pentry->buf,
					    pentry->buflen);
		add_processed_node(pentry->nodeid, &processed_nodes);
		free_processqueue_entry(pentry);

		spin_lock(&processqueue_lock);
		pentry = list_first_entry_or_null(&processqueue,
						  struct processqueue_entry, list);
		if (!pentry) {
			process_dlm_messages_pending = false;
			spin_unlock(&processqueue_lock);
			break;
		}

		list_del(&pentry->list);
		spin_unlock(&processqueue_lock);
	}

	/* send ack back after we processed couple of messages */
	list_for_each_entry_safe(n, n_tmp, &processed_nodes, list) {
		list_del(&n->list);
		dlm_midcomms_receive_done(n->nodeid);
		kfree(n);
	}
}

/* Data received from remote end */
static int receive_from_sock(struct connection *con, int buflen)
{
	struct processqueue_entry *pentry;
	int ret, buflen_real;
	struct msghdr msg;
	struct kvec iov;

	pentry = new_processqueue_entry(con->nodeid, buflen);
	if (!pentry)
		return DLM_IO_RESCHED;

	/* prepend leftover bytes of a partial message from the last receive */
	memcpy(pentry->buf, con->rx_leftover_buf, con->rx_leftover);

	/* calculate new buffer parameter regarding last receive and
	 * possible leftover bytes
	 */
	iov.iov_base = pentry->buf + con->rx_leftover;
	iov.iov_len = buflen - con->rx_leftover;

	memset(&msg, 0, sizeof(msg));
	msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	clear_bit(CF_RECV_INTR, &con->flags);
again:
	ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
			     msg.msg_flags);
	trace_dlm_recv(con->nodeid, ret);
	if (ret == -EAGAIN) {
		lock_sock(con->sock->sk);
		/* data_ready fired between recvmsg and here: retry */
		if (test_and_clear_bit(CF_RECV_INTR, &con->flags)) {
			release_sock(con->sock->sk);
			goto again;
		}

		clear_bit(CF_RECV_PENDING, &con->flags);
		release_sock(con->sock->sk);
		free_processqueue_entry(pentry);
		return DLM_IO_END;
	} else if (ret == 0) {
		/* close will clear CF_RECV_PENDING */
		free_processqueue_entry(pentry);
		return DLM_IO_EOF;
	} else if (ret < 0) {
		free_processqueue_entry(pentry);
		return ret;
	}

	/* new buflen according readed bytes and leftover from last receive */
	buflen_real = ret + con->rx_leftover;
	ret = dlm_validate_incoming_buffer(con->nodeid, pentry->buf,
					   buflen_real);
	if (ret < 0) {
		free_processqueue_entry(pentry);
		return ret;
	}

	pentry->buflen = ret;

	/* calculate leftover bytes from process and put it into begin of
	 * the receive buffer, so next receive we have the full message
	 * at the start address of the receive buffer.
	 */
	con->rx_leftover = buflen_real - ret;
	memmove(con->rx_leftover_buf, pentry->buf + ret,
		con->rx_leftover);

	spin_lock(&processqueue_lock);
	list_add_tail(&pentry->list, &processqueue);
	if (!process_dlm_messages_pending) {
		process_dlm_messages_pending = true;
		queue_work(process_workqueue, &process_work);
	}
	spin_unlock(&processqueue_lock);

	return DLM_IO_SUCCESS;
}

/* Listening socket is busy, accept a connection */
static int accept_from_sock(void)
{
	struct sockaddr_storage peeraddr;
	int len, idx, result, nodeid;
	struct connection *newcon;
	struct socket *newsock;
	unsigned int mark;

	result = kernel_accept(listen_con.sock, &newsock, O_NONBLOCK);
	if (result == -EAGAIN)
		return DLM_IO_END;
	else if (result < 0)
		goto accept_err;

	/* Get the connected socket's peer */
	memset(&peeraddr, 0, sizeof(peeraddr));
	len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2);
	if (len < 0) {
		result = -ECONNABORTED;
		goto accept_err;
	}

	/* Get the new node's NODEID */
	make_sockaddr(&peeraddr, 0, &len);
	if (addr_to_nodeid(&peeraddr, &nodeid, &mark)) {
		/* peer address is not configured for any cluster node */
		switch (peeraddr.ss_family) {
		case AF_INET: {
			struct sockaddr_in *sin = (struct sockaddr_in *)&peeraddr;

			log_print("connect from non cluster IPv4 node %pI4",
				  &sin->sin_addr);
			break;
		}
#if IS_ENABLED(CONFIG_IPV6)
		case AF_INET6: {
			struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&peeraddr;

			log_print("connect from non cluster IPv6 node %pI6c",
				  &sin6->sin6_addr);
			break;
		}
#endif
		default:
			log_print("invalid family from non cluster node");
			break;
		}

		sock_release(newsock);
		return -1;
	}

	log_print("got connection from %d", nodeid);

	/* Check to see if we already have a connection to this node. This
	 * could happen if the two nodes initiate a connection at roughly
	 * the same time and the connections cross on the wire.
	 * In this case we store the incoming one in "othercon"
	 */
	idx = srcu_read_lock(&connections_srcu);
	newcon = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!newcon)) {
		srcu_read_unlock(&connections_srcu, idx);
		result = -ENOENT;
		goto accept_err;
	}

	sock_set_mark(newsock->sk, mark);

	down_write(&newcon->sock_lock);
	if (newcon->sock) {
		struct connection *othercon = newcon->othercon;

		if (!othercon) {
			othercon = kzalloc(sizeof(*othercon), GFP_NOFS);
			if (!othercon) {
				log_print("failed to allocate incoming socket");
				up_write(&newcon->sock_lock);
				srcu_read_unlock(&connections_srcu, idx);
				result = -ENOMEM;
				goto accept_err;
			}

			dlm_con_init(othercon, nodeid);
			/* othercon->sock_lock nests inside newcon->sock_lock */
			lockdep_set_subclass(&othercon->sock_lock, 1);
			newcon->othercon = othercon;
			set_bit(CF_IS_OTHERCON, &othercon->flags);
		} else {
			/* close other sock con if we have something new */
			close_connection(othercon, false);
		}

		down_write(&othercon->sock_lock);
		add_sock(newsock, othercon);

		/* check if we receved something while adding */
		lock_sock(othercon->sock->sk);
		lowcomms_queue_rwork(othercon);
		release_sock(othercon->sock->sk);
		up_write(&othercon->sock_lock);
	} else {
		/* accept copies the sk after we've saved the callbacks, so we
		   don't want to save them a second time or comm errors will
		   result in calling sk_error_report recursively. */
		add_sock(newsock, newcon);

		/* check if we receved something while adding */
		lock_sock(newcon->sock->sk);
		lowcomms_queue_rwork(newcon);
		release_sock(newcon->sock->sk);
	}
	up_write(&newcon->sock_lock);
	srcu_read_unlock(&connections_srcu, idx);

	return DLM_IO_SUCCESS;

accept_err:
	if (newsock)
		sock_release(newsock);

	return result;
}

/*
 * writequeue_entry_complete - try to delete and free write queue entry
 * @e: write queue entry to try to delete
 * @completed: bytes completed
 *
 * writequeue_lock must be held.
 */
static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
{
	e->offset += completed;
	e->len -= completed;
	/* signal that page was half way transmitted */
	e->dirty = true;

	if (e->len == 0 && e->users == 0)
		free_entry(e);
}

/*
 * sctp_bind_addrs - bind a SCTP socket to all our addresses
 */
static int sctp_bind_addrs(struct socket *sock, uint16_t port)
{
	struct sockaddr_storage localaddr;
	struct sockaddr *addr = (struct sockaddr *)&localaddr;
	int i, addr_len, result = 0;

	for (i = 0; i < dlm_local_count; i++) {
		memcpy(&localaddr, &dlm_local_addr[i], sizeof(localaddr));
		make_sockaddr(&localaddr, port, &addr_len);

		/* first address uses bind(), the rest are added to the
		 * existing SCTP association
		 */
		if (!i)
			result = kernel_bind(sock, addr, addr_len);
		else
			result = sock_bind_add(sock->sk, addr, addr_len);

		if (result < 0) {
			log_print("Can't bind to %d addr number %d, %d.\n",
				  port, i + 1, result);
			break;
		}
	}
	return result;
}

/* Get local addresses */
static void init_local(void)
{
	struct sockaddr_storage sas;
	int i;

	dlm_local_count = 0;
	for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
		if (dlm_our_addr(&sas, i))
			break;

		memcpy(&dlm_local_addr[dlm_local_count++], &sas,
sizeof(sas)); 1187 } 1188 } 1189 1190 static struct writequeue_entry *new_writequeue_entry(struct connection *con) 1191 { 1192 struct writequeue_entry *entry; 1193 1194 entry = dlm_allocate_writequeue(); 1195 if (!entry) 1196 return NULL; 1197 1198 entry->page = alloc_page(GFP_ATOMIC | __GFP_ZERO); 1199 if (!entry->page) { 1200 dlm_free_writequeue(entry); 1201 return NULL; 1202 } 1203 1204 entry->offset = 0; 1205 entry->len = 0; 1206 entry->end = 0; 1207 entry->dirty = false; 1208 entry->con = con; 1209 entry->users = 1; 1210 kref_init(&entry->ref); 1211 return entry; 1212 } 1213 1214 static struct writequeue_entry *new_wq_entry(struct connection *con, int len, 1215 char **ppc, void (*cb)(void *data), 1216 void *data) 1217 { 1218 struct writequeue_entry *e; 1219 1220 spin_lock_bh(&con->writequeue_lock); 1221 if (!list_empty(&con->writequeue)) { 1222 e = list_last_entry(&con->writequeue, struct writequeue_entry, list); 1223 if (DLM_WQ_REMAIN_BYTES(e) >= len) { 1224 kref_get(&e->ref); 1225 1226 *ppc = page_address(e->page) + e->end; 1227 if (cb) 1228 cb(data); 1229 1230 e->end += len; 1231 e->users++; 1232 goto out; 1233 } 1234 } 1235 1236 e = new_writequeue_entry(con); 1237 if (!e) 1238 goto out; 1239 1240 kref_get(&e->ref); 1241 *ppc = page_address(e->page); 1242 e->end += len; 1243 if (cb) 1244 cb(data); 1245 1246 list_add_tail(&e->list, &con->writequeue); 1247 1248 out: 1249 spin_unlock_bh(&con->writequeue_lock); 1250 return e; 1251 }; 1252 1253 static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len, 1254 gfp_t allocation, char **ppc, 1255 void (*cb)(void *data), 1256 void *data) 1257 { 1258 struct writequeue_entry *e; 1259 struct dlm_msg *msg; 1260 1261 msg = dlm_allocate_msg(allocation); 1262 if (!msg) 1263 return NULL; 1264 1265 kref_init(&msg->ref); 1266 1267 e = new_wq_entry(con, len, ppc, cb, data); 1268 if (!e) { 1269 dlm_free_msg(msg); 1270 return NULL; 1271 } 1272 1273 msg->retransmit = false; 1274 msg->orig_msg = NULL; 1275 
	msg->ppc = *ppc;
	msg->len = len;
	msg->entry = e;

	return msg;
}

/* avoid false positive for connections_srcu: the srcu read lock taken
 * here is released in dlm_lowcomms_commit_msg(), which must be called
 * on success
 */
#ifndef __CHECKER__
/*
 * dlm_lowcomms_new_msg - allocate an outgoing message for a node
 * @nodeid: destination node
 * @len: payload length; must fit in DLM_MAX_SOCKET_BUFSIZE and be at
 *       least a dlm_header
 * @allocation: gfp flags for the message allocation
 * @ppc: out parameter pointing at the reserved payload buffer
 * @cb, @data: optional callback run while the writequeue is reserved
 *
 * On success the connections_srcu read lock is held (stored in
 * msg->idx) and a kref is taken on behalf of the mandatory
 * dlm_lowcomms_commit_msg() call.  Returns NULL on bad length,
 * unknown node, or allocation failure.
 */
struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
				     char **ppc, void (*cb)(void *data),
				     void *data)
{
	struct connection *con;
	struct dlm_msg *msg;
	int idx;

	if (len > DLM_MAX_SOCKET_BUFSIZE ||
	    len < sizeof(struct dlm_header)) {
		/* a message must always fit into one page */
		BUILD_BUG_ON(PAGE_SIZE < DLM_MAX_SOCKET_BUFSIZE);
		log_print("failed to allocate a buffer of size %d", len);
		WARN_ON_ONCE(1);
		return NULL;
	}

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!con)) {
		srcu_read_unlock(&connections_srcu, idx);
		return NULL;
	}

	msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, data);
	if (!msg) {
		srcu_read_unlock(&connections_srcu, idx);
		return NULL;
	}

	/* for dlm_lowcomms_commit_msg() */
	kref_get(&msg->ref);
	/* we assume commit must be called if we return successfully */
	msg->idx = idx;
	return msg;
}
#endif

/*
 * _dlm_lowcomms_commit_msg - attach @msg to its writequeue entry
 *
 * Drops one user reference on the entry; when the last user commits,
 * the entry length is finalized and the send worker is kicked.
 */
static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
{
	struct writequeue_entry *e = msg->entry;
	struct connection *con = e->con;
	int users;

	spin_lock_bh(&con->writequeue_lock);
	/* entry keeps a reference on the message until it is freed */
	kref_get(&msg->ref);
	list_add(&msg->list, &e->msgs);

	users = --e->users;
	if (users)
		goto out;

	e->len = DLM_WQ_LENGTH_BYTES(e);

	lowcomms_queue_swork(con);

out:
	spin_unlock_bh(&con->writequeue_lock);
	return;
}

/* avoid false positive for connections_srcu: the matching read lock
 * was taken in dlm_lowcomms_new_msg()
 */
#ifndef __CHECKER__
void dlm_lowcomms_commit_msg(struct dlm_msg *msg)
{
	_dlm_lowcomms_commit_msg(msg);
	srcu_read_unlock(&connections_srcu, msg->idx);
	/* drop the reference taken in dlm_lowcomms_new_msg() */
	kref_put(&msg->ref, dlm_msg_release);
}
#endif

/* release the caller's reference on a message */
void dlm_lowcomms_put_msg(struct dlm_msg *msg)
{
	kref_put(&msg->ref, dlm_msg_release);
}

/* does not hold connections_srcu, used by lowcomms_error_report only */
int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
{
	struct dlm_msg *msg_resend;
	char *ppc;

	/* already queued for retransmit, nothing to do */
	if (msg->retransmit)
		return 1;

	msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len,
					      GFP_ATOMIC, &ppc, NULL, NULL);
	if (!msg_resend)
		return -ENOMEM;

	msg->retransmit = true;
	/* the resend message pins the original via orig_msg */
	kref_get(&msg->ref);
	msg_resend->orig_msg = msg;

	memcpy(ppc, msg->ppc, msg->len);
	_dlm_lowcomms_commit_msg(msg_resend);
	dlm_lowcomms_put_msg(msg_resend);

	return 0;
}

/* Send a message */
static int send_to_sock(struct connection *con)
{
	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	struct writequeue_entry *e;
	int len, offset, ret;

	spin_lock_bh(&con->writequeue_lock);
	e = con_next_wq(con);
	if (!e) {
		/* queue drained, send worker can stop */
		clear_bit(CF_SEND_PENDING, &con->flags);
		spin_unlock_bh(&con->writequeue_lock);
		return DLM_IO_END;
	}

	len = e->len;
	offset = e->offset;
	WARN_ON_ONCE(len == 0 && e->users == 0);
	spin_unlock_bh(&con->writequeue_lock);

	/* non-blocking send of the entry's page region */
	ret = kernel_sendpage(con->sock, e->page, offset, len,
			      msg_flags);
	trace_dlm_send(con->nodeid, ret);
	if (ret == -EAGAIN || ret == 0) {
		lock_sock(con->sock->sk);
		spin_lock_bh(&con->writequeue_lock);
		if (test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
		    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
			/* Notify TCP that we're limited by the
			 * application window size.
1419 */ 1420 set_bit(SOCK_NOSPACE, &con->sock->sk->sk_socket->flags); 1421 con->sock->sk->sk_write_pending++; 1422 1423 clear_bit(CF_SEND_PENDING, &con->flags); 1424 spin_unlock_bh(&con->writequeue_lock); 1425 release_sock(con->sock->sk); 1426 1427 /* wait for write_space() event */ 1428 return DLM_IO_END; 1429 } 1430 spin_unlock_bh(&con->writequeue_lock); 1431 release_sock(con->sock->sk); 1432 1433 return DLM_IO_RESCHED; 1434 } else if (ret < 0) { 1435 return ret; 1436 } 1437 1438 spin_lock_bh(&con->writequeue_lock); 1439 writequeue_entry_complete(e, ret); 1440 spin_unlock_bh(&con->writequeue_lock); 1441 1442 return DLM_IO_SUCCESS; 1443 } 1444 1445 static void clean_one_writequeue(struct connection *con) 1446 { 1447 struct writequeue_entry *e, *safe; 1448 1449 spin_lock_bh(&con->writequeue_lock); 1450 list_for_each_entry_safe(e, safe, &con->writequeue, list) { 1451 free_entry(e); 1452 } 1453 spin_unlock_bh(&con->writequeue_lock); 1454 } 1455 1456 static void connection_release(struct rcu_head *rcu) 1457 { 1458 struct connection *con = container_of(rcu, struct connection, rcu); 1459 1460 WARN_ON_ONCE(!list_empty(&con->writequeue)); 1461 WARN_ON_ONCE(con->sock); 1462 kfree(con); 1463 } 1464 1465 /* Called from recovery when it knows that a node has 1466 left the cluster */ 1467 int dlm_lowcomms_close(int nodeid) 1468 { 1469 struct connection *con; 1470 int idx; 1471 1472 log_print("closing connection to node %d", nodeid); 1473 1474 idx = srcu_read_lock(&connections_srcu); 1475 con = nodeid2con(nodeid, 0); 1476 if (WARN_ON_ONCE(!con)) { 1477 srcu_read_unlock(&connections_srcu, idx); 1478 return -ENOENT; 1479 } 1480 1481 stop_connection_io(con); 1482 log_print("io handling for node: %d stopped", nodeid); 1483 close_connection(con, true); 1484 1485 spin_lock(&connections_lock); 1486 hlist_del_rcu(&con->list); 1487 spin_unlock(&connections_lock); 1488 1489 clean_one_writequeue(con); 1490 call_srcu(&connections_srcu, &con->rcu, connection_release); 1491 if 
(con->othercon) { 1492 clean_one_writequeue(con->othercon); 1493 if (con->othercon) 1494 call_srcu(&connections_srcu, &con->othercon->rcu, connection_release); 1495 } 1496 srcu_read_unlock(&connections_srcu, idx); 1497 1498 /* for debugging we print when we are done to compare with other 1499 * messages in between. This function need to be correctly synchronized 1500 * with io handling 1501 */ 1502 log_print("closing connection to node %d done", nodeid); 1503 1504 return 0; 1505 } 1506 1507 /* Receive worker function */ 1508 static void process_recv_sockets(struct work_struct *work) 1509 { 1510 struct connection *con = container_of(work, struct connection, rwork); 1511 int ret, buflen; 1512 1513 down_read(&con->sock_lock); 1514 if (!con->sock) { 1515 up_read(&con->sock_lock); 1516 return; 1517 } 1518 1519 buflen = READ_ONCE(dlm_config.ci_buffer_size); 1520 do { 1521 ret = receive_from_sock(con, buflen); 1522 } while (ret == DLM_IO_SUCCESS); 1523 up_read(&con->sock_lock); 1524 1525 switch (ret) { 1526 case DLM_IO_END: 1527 /* CF_RECV_PENDING cleared */ 1528 break; 1529 case DLM_IO_EOF: 1530 close_connection(con, false); 1531 wake_up(&con->shutdown_wait); 1532 /* CF_RECV_PENDING cleared */ 1533 break; 1534 case DLM_IO_RESCHED: 1535 cond_resched(); 1536 queue_work(io_workqueue, &con->rwork); 1537 /* CF_RECV_PENDING not cleared */ 1538 break; 1539 default: 1540 if (ret < 0) { 1541 if (test_bit(CF_IS_OTHERCON, &con->flags)) { 1542 close_connection(con, false); 1543 } else { 1544 spin_lock_bh(&con->writequeue_lock); 1545 lowcomms_queue_swork(con); 1546 spin_unlock_bh(&con->writequeue_lock); 1547 } 1548 1549 /* CF_RECV_PENDING cleared for othercon 1550 * we trigger send queue if not already done 1551 * and process_send_sockets will handle it 1552 */ 1553 break; 1554 } 1555 1556 WARN_ON_ONCE(1); 1557 break; 1558 } 1559 } 1560 1561 static void process_listen_recv_socket(struct work_struct *work) 1562 { 1563 int ret; 1564 1565 if (WARN_ON_ONCE(!listen_con.sock)) 1566 return; 

	do {
		ret = accept_from_sock();
	} while (ret == DLM_IO_SUCCESS);

	if (ret < 0)
		log_print("critical error accepting connection: %d", ret);
}

/*
 * dlm_connect - create and connect a socket to a remote node
 *
 * Resolves the node's address, creates a stream socket for the
 * configured protocol, binds it via the protocol ops and starts the
 * connect.  On any failure after socket creation the socket is
 * released/closed again.  Returns 0, -EINPROGRESS (non-blocking
 * connect still running, not an error) or a negative error.
 */
static int dlm_connect(struct connection *con)
{
	struct sockaddr_storage addr;
	int result, addr_len;
	struct socket *sock;
	unsigned int mark;

	memset(&addr, 0, sizeof(addr));
	result = nodeid_to_addr(con->nodeid, &addr, NULL,
				dlm_proto_ops->try_new_addr, &mark);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		return result;
	}

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family,
				  SOCK_STREAM, dlm_proto_ops->proto, &sock);
	if (result < 0)
		return result;

	sock_set_mark(sock->sk, mark);
	dlm_proto_ops->sockopts(sock);

	result = dlm_proto_ops->bind(sock);
	if (result < 0) {
		sock_release(sock);
		return result;
	}

	/* attach the socket to the connection before connecting so the
	 * sk callbacks are already installed
	 */
	add_sock(sock, con);

	log_print_ratelimited("connecting to %d", con->nodeid);
	make_sockaddr(&addr, dlm_config.ci_tcp_port, &addr_len);
	result = dlm_proto_ops->connect(con, sock, (struct sockaddr *)&addr,
					addr_len);
	switch (result) {
	case -EINPROGRESS:
		/* not an error */
		fallthrough;
	case 0:
		break;
	default:
		if (result < 0)
			dlm_close_sock(&con->sock);

		break;
	}

	return result;
}

/* Send worker function */
static void process_send_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, swork);
	int ret;

	/* othercon is receive-only; it must never reach the send worker */
	WARN_ON_ONCE(test_bit(CF_IS_OTHERCON, &con->flags));

	down_read(&con->sock_lock);
	if (!con->sock) {
		up_read(&con->sock_lock);
		/* re-check under the write lock before connecting */
		down_write(&con->sock_lock);
		if (!con->sock) {
			ret = dlm_connect(con);
			switch (ret) {
			case 0:
				break;
			case
-EINPROGRESS:
				/* avoid spamming resched on connection
				 * we might can switch to a state_change
				 * event based mechanism if established
				 */
				msleep(100);
				break;
			default:
				/* CF_SEND_PENDING not cleared */
				up_write(&con->sock_lock);
				log_print("connect to node %d try %d error %d",
					  con->nodeid, con->retries++, ret);
				msleep(1000);
				/* For now we try forever to reconnect. In
				 * future we should send a event to cluster
				 * manager to fence itself after certain amount
				 * of retries.
				 */
				queue_work(io_workqueue, &con->swork);
				return;
			}
		}
		/* keep the sock for sending, but allow concurrent readers */
		downgrade_write(&con->sock_lock);
	}

	do {
		ret = send_to_sock(con);
	} while (ret == DLM_IO_SUCCESS);
	up_read(&con->sock_lock);

	switch (ret) {
	case DLM_IO_END:
		/* CF_SEND_PENDING cleared */
		break;
	case DLM_IO_RESCHED:
		/* CF_SEND_PENDING not cleared */
		cond_resched();
		queue_work(io_workqueue, &con->swork);
		break;
	default:
		if (ret < 0) {
			/* send error: drop the socket and requeue the send
			 * work so the connect path above retries
			 */
			close_connection(con, false);

			/* CF_SEND_PENDING cleared */
			spin_lock_bh(&con->writequeue_lock);
			lowcomms_queue_swork(con);
			spin_unlock_bh(&con->writequeue_lock);
			break;
		}

		WARN_ON_ONCE(1);
		break;
	}
}

/* tear down both workqueues; safe to call when only one was created */
static void work_stop(void)
{
	if (io_workqueue) {
		destroy_workqueue(io_workqueue);
		io_workqueue = NULL;
	}

	if (process_workqueue) {
		destroy_workqueue(process_workqueue);
		process_workqueue = NULL;
	}
}

/* create the io and ordered process workqueues; returns 0 or -ENOMEM */
static int work_start(void)
{
	io_workqueue = alloc_workqueue("dlm_io", WQ_HIGHPRI | WQ_MEM_RECLAIM,
				       0);
	if (!io_workqueue) {
		log_print("can't start dlm_io");
		return -ENOMEM;
	}

	/* ordered dlm message process queue,
	 * should be converted to a tasklet
	 */
	process_workqueue = alloc_ordered_workqueue("dlm_process",
						    WQ_HIGHPRI |
WQ_MEM_RECLAIM);
	if (!process_workqueue) {
		log_print("can't start dlm_process");
		/* undo the io workqueue created above */
		destroy_workqueue(io_workqueue);
		io_workqueue = NULL;
		return -ENOMEM;
	}

	return 0;
}

/*
 * dlm_lowcomms_shutdown - orderly shutdown of all connections
 *
 * Restores the listening socket's original data_ready callback,
 * closes the listening socket, then shuts down, closes and drains
 * every connection in the hash (including othercons), re-enabling io
 * afterwards so the structures stay usable until dlm_lowcomms_exit().
 */
void dlm_lowcomms_shutdown(void)
{
	struct connection *con;
	int i, idx;

	/* stop lowcomms_listen_data_ready calls */
	lock_sock(listen_con.sock->sk);
	listen_con.sock->sk->sk_data_ready = listen_sock.sk_data_ready;
	release_sock(listen_con.sock->sk);

	cancel_work_sync(&listen_con.rwork);
	dlm_close_sock(&listen_con.sock);

	idx = srcu_read_lock(&connections_srcu);
	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
			shutdown_connection(con, true);
			stop_connection_io(con);
			/* let queued midcomms processing finish first */
			flush_workqueue(process_workqueue);
			close_connection(con, true);

			clean_one_writequeue(con);
			if (con->othercon)
				clean_one_writequeue(con->othercon);
			allow_connection_io(con);
		}
	}
	srcu_read_unlock(&connections_srcu, idx);
}

/* final stop: destroy workqueues and forget the protocol ops */
void dlm_lowcomms_stop(void)
{
	work_stop();
	dlm_proto_ops = NULL;
}

/*
 * dlm_listen_for_all - create and start the listening socket
 *
 * Validates the protocol configuration, creates the socket, saves the
 * original sk callbacks in listen_sock, installs our data_ready
 * handler and starts listening.  Returns 0 or a negative error.
 */
static int dlm_listen_for_all(void)
{
	struct socket *sock;
	int result;

	log_print("Using %s for communications",
		  dlm_proto_ops->name);

	result = dlm_proto_ops->listen_validate();
	if (result < 0)
		return result;

	result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family,
				  SOCK_STREAM, dlm_proto_ops->proto, &sock);
	if (result < 0) {
		log_print("Can't create comms socket: %d", result);
		return result;
	}

	sock_set_mark(sock->sk, dlm_config.ci_mark);
	dlm_proto_ops->listen_sockopts(sock);

	result = dlm_proto_ops->listen_bind(sock);
	if (result < 0)
		goto out;

	/* save the original callbacks so they can be restored on shutdown */
	lock_sock(sock->sk);
	listen_sock.sk_data_ready = sock->sk->sk_data_ready;
	listen_sock.sk_write_space
= sock->sk->sk_write_space;
	listen_sock.sk_error_report = sock->sk->sk_error_report;
	listen_sock.sk_state_change = sock->sk->sk_state_change;

	listen_con.sock = sock;

	sock->sk->sk_allocation = GFP_NOFS;
	sock->sk->sk_use_task_frag = false;
	sock->sk->sk_data_ready = lowcomms_listen_data_ready;
	release_sock(sock->sk);

	result = sock->ops->listen(sock, 5);
	if (result < 0) {
		dlm_close_sock(&listen_con.sock);
		return result;
	}

	return 0;

out:
	sock_release(sock);
	return result;
}

/* bind a connecting TCP socket to our cluster-known source address */
static int dlm_tcp_bind(struct socket *sock)
{
	struct sockaddr_storage src_addr;
	int result, addr_len;

	/* Bind to our cluster-known address connecting to avoid
	 * routing problems.
	 */
	memcpy(&src_addr, &dlm_local_addr[0], sizeof(src_addr));
	make_sockaddr(&src_addr, 0, &addr_len);

	result = sock->ops->bind(sock, (struct sockaddr *)&src_addr,
				 addr_len);
	if (result < 0) {
		/* This *may* not indicate a critical error */
		log_print("could not bind for connect: %d", result);
	}

	/* bind failure is deliberately non-fatal, always report success */
	return 0;
}

/* non-blocking TCP connect; completion is signalled via sk callbacks */
static int dlm_tcp_connect(struct connection *con, struct socket *sock,
			   struct sockaddr *addr, int addr_len)
{
	return sock->ops->connect(sock, addr, addr_len, O_NONBLOCK);
}

static int dlm_tcp_listen_validate(void)
{
	/* We don't support multi-homed hosts */
	if (dlm_local_count > 1) {
		log_print("TCP protocol can't handle multi-homed hosts, try SCTP");
		return -EINVAL;
	}

	return 0;
}

static void dlm_tcp_sockopts(struct socket *sock)
{
	/* Turn off Nagle's algorithm */
	tcp_sock_set_nodelay(sock->sk);
}

/* listening socket options: no Nagle plus SO_REUSEADDR for restarts */
static void dlm_tcp_listen_sockopts(struct socket *sock)
{
	dlm_tcp_sockopts(sock);
	sock_set_reuseaddr(sock->sk);
}

/* bind the TCP listening socket to our address and configured port */
static int dlm_tcp_listen_bind(struct socket *sock)
{
	int addr_len;

	/* Bind to our port */
	make_sockaddr(&dlm_local_addr[0], dlm_config.ci_tcp_port, &addr_len);
	return sock->ops->bind(sock, (struct sockaddr *)&dlm_local_addr[0],
			       addr_len);
}

/* protocol operations used when dlm_config.ci_protocol is TCP */
static const struct dlm_proto_ops dlm_tcp_ops = {
	.name = "TCP",
	.proto = IPPROTO_TCP,
	.connect = dlm_tcp_connect,
	.sockopts = dlm_tcp_sockopts,
	.bind = dlm_tcp_bind,
	.listen_validate = dlm_tcp_listen_validate,
	.listen_sockopts = dlm_tcp_listen_sockopts,
	.listen_bind = dlm_tcp_listen_bind,
};

/* bind a connecting SCTP socket to all local addresses, ephemeral port */
static int dlm_sctp_bind(struct socket *sock)
{
	return sctp_bind_addrs(sock, 0);
}

static int dlm_sctp_connect(struct connection *con, struct socket *sock,
			    struct sockaddr *addr, int addr_len)
{
	int ret;

	/*
	 * Make sock->ops->connect() function return in specified time,
	 * since O_NONBLOCK argument in connect() function does not work here,
	 * then, we should restore the default value of this attribute.
	 */
	sock_set_sndtimeo(sock->sk, 5);
	ret = sock->ops->connect(sock, addr, addr_len, 0);
	/* 0 restores the default (infinite) send timeout */
	sock_set_sndtimeo(sock->sk, 0);
	return ret;
}

static int dlm_sctp_listen_validate(void)
{
	if (!IS_ENABLED(CONFIG_IP_SCTP)) {
		log_print("SCTP is not enabled by this kernel");
		return -EOPNOTSUPP;
	}

	/* make sure the sctp module is loaded before we listen */
	request_module("sctp");
	return 0;
}

/* bind the SCTP listening socket to all local addresses on our port */
static int dlm_sctp_bind_listen(struct socket *sock)
{
	return sctp_bind_addrs(sock, dlm_config.ci_tcp_port);
}

static void dlm_sctp_sockopts(struct socket *sock)
{
	/* Turn off Nagle's algorithm */
	sctp_sock_set_nodelay(sock->sk);
	sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
}

/* protocol operations used when dlm_config.ci_protocol is SCTP */
static const struct dlm_proto_ops dlm_sctp_ops = {
	.name = "SCTP",
	.proto = IPPROTO_SCTP,
	.try_new_addr = true,
	.connect = dlm_sctp_connect,
	.sockopts = dlm_sctp_sockopts,
	.bind = dlm_sctp_bind,
	.listen_validate = dlm_sctp_listen_validate,
	.listen_sockopts = dlm_sctp_sockopts,
	.listen_bind = dlm_sctp_bind_listen,
};

/*
 * dlm_lowcomms_start - bring up the lowcomms layer
 *
 * Collects the local addresses, starts the workqueues, selects the
 * protocol ops from the configuration and starts listening.  Returns
 * 0 or a negative error; on error everything started so far is torn
 * down again.
 */
int dlm_lowcomms_start(void)
{
	int error;

	init_local();
	if (!dlm_local_count) {
		error = -ENOTCONN;
		log_print("no local IP address has been set");
		goto fail;
	}

	error = work_start();
	if (error)
		goto fail;

	/* Start listening */
	switch (dlm_config.ci_protocol) {
	case DLM_PROTO_TCP:
		dlm_proto_ops = &dlm_tcp_ops;
		break;
	case DLM_PROTO_SCTP:
		dlm_proto_ops = &dlm_sctp_ops;
		break;
	default:
		log_print("Invalid protocol identifier %d set",
			  dlm_config.ci_protocol);
		error = -EINVAL;
		goto fail_proto_ops;
	}

	error = dlm_listen_for_all();
	if (error)
		goto fail_listen;

	return 0;

fail_listen:
	dlm_proto_ops = NULL;
fail_proto_ops:
	work_stop();
fail:
	return error;
}

/* one-time module init: set up the connection hash and listen worker */
void
dlm_lowcomms_init(void)
{
	int i;

	for (i = 0; i < CONN_HASH_SIZE; i++)
		INIT_HLIST_HEAD(&connection_hash[i]);

	INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
}

/*
 * dlm_lowcomms_exit - module teardown
 *
 * Unhashes every remaining connection and schedules it (and its
 * othercon, if any) for freeing via srcu once all readers are done.
 */
void dlm_lowcomms_exit(void)
{
	struct connection *con;
	int i, idx;

	idx = srcu_read_lock(&connections_srcu);
	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
			spin_lock(&connections_lock);
			hlist_del_rcu(&con->list);
			spin_unlock(&connections_lock);

			/* connection_release() runs after the srcu grace
			 * period, not inside this loop
			 */
			if (con->othercon)
				call_srcu(&connections_srcu, &con->othercon->rcu,
					  connection_release);
			call_srcu(&connections_srcu, &con->rcu, connection_release);
		}
	}
	srcu_read_unlock(&connections_srcu, idx);
}