// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

/*
 * lowcomms.c
 *
 * This is the "low-level" comms layer.
 *
 * It is responsible for sending/receiving messages
 * from other nodes in the cluster.
 *
 * Cluster nodes are referred to by their nodeids. nodeids are
 * simply 32 bit numbers to the locking module - if they need to
 * be expanded for the cluster infrastructure then that is its
 * responsibility. It is this layer's
 * responsibility to resolve these into IP address or
 * whatever it needs for inter-node communication.
 *
 * The comms level is two kernel threads that deal mainly with
 * the receiving of messages from other nodes and passing them
 * up to the mid-level comms layer (which understands the
 * message format) for execution by the locking core, and
 * a send thread which does all the setting up of connections
 * to remote nodes and the sending of data. Threads are not allowed
 * to send their own data because it may cause them to wait in times
 * of high load. Also, this way, the sending thread can collect together
 * messages bound for one node and send them in one block.
 *
 * lowcomms will choose to use either TCP or SCTP as its transport layer
 * depending on the configuration variable 'protocol'. This should be set
 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
 * cluster-wide mechanism as it must be the same on all nodes of the cluster
 * for the DLM to function.
 *
 */

#include <asm/ioctls.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <linux/pagemap.h>
#include <linux/file.h>
#include <linux/mutex.h>
#include <linux/sctp.h>
#include <linux/slab.h>
#include <net/sctp/sctp.h>
#include <net/ipv6.h>

#include <trace/events/dlm.h>
#include <trace/events/sock.h>

#include "dlm_internal.h"
#include "lowcomms.h"
#include "midcomms.h"
#include "memory.h"
#include "config.h"

/* requested socket receive buffer size */
#define NEEDED_RMEM (4*1024*1024)

/* per-remote-node connection state */
struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* So we know who we are in the list */
	/* this semaphore is used to allow parallel recv/send in read
	 * lock mode. When we release a sock we need to held the write lock.
	 *
	 * However this is locking code and not nice. When we remove the
	 * othercon handling we can look into other mechanism to synchronize
	 * io handling to call sock_release() at the right time.
	 */
	struct rw_semaphore sock_lock;
	unsigned long flags;
#define CF_APP_LIMITED 0
#define CF_RECV_PENDING 1
#define CF_SEND_PENDING 2
#define CF_RECV_INTR 3
#define CF_IO_STOP 4
#define CF_IS_OTHERCON 5
	struct list_head writequeue;	/* List of outgoing writequeue_entries */
	spinlock_t writequeue_lock;
	int retries;
	struct hlist_node list;
	/* due some connect()/accept() races we currently have this cross over
	 * connection attempt second connection for one node.
	 *
	 * There is a solution to avoid the race by introducing a connect
	 * rule as e.g. our_nodeid > nodeid_to_connect who is allowed to
	 * connect. Otherside can connect but will only be considered that
	 * the other side wants to have a reconnect.
	 *
	 * However changing to this behaviour will break backwards compatible.
	 * In a DLM protocol major version upgrade we should remove this!
	 */
	struct connection *othercon;
	struct work_struct rwork; /* receive worker */
	struct work_struct swork; /* send worker */
	/* partial message carried over between receives */
	unsigned char rx_leftover_buf[DLM_MAX_SOCKET_BUFSIZE];
	int rx_leftover;
	int mark;		/* skb mark, protected by addrs_lock */
	int addr_count;
	int curr_addr_index;
	struct sockaddr_storage addr[DLM_MAX_ADDR_COUNT];
	spinlock_t addrs_lock;
	struct rcu_head rcu;
};
#define sock2con(x) ((struct connection *)(x)->sk_user_data)

struct listen_connection {
	struct socket *sock;
	struct work_struct rwork;
};

#define DLM_WQ_REMAIN_BYTES(e) (PAGE_SIZE - e->end)
#define DLM_WQ_LENGTH_BYTES(e) (e->end - e->offset)

/* An entry waiting to be sent */
struct writequeue_entry {
	struct list_head list;
	struct page *page;
	int offset;
	int len;
	int end;
	int users;
	bool dirty;
	struct connection *con;
	struct list_head msgs;
	struct kref ref;
};

struct dlm_msg {
	struct writequeue_entry *entry;
	struct dlm_msg *orig_msg;
	bool retransmit;
	void *ppc;
	int len;
	int idx; /* new()/commit() idx exchange */

	struct list_head list;
	struct kref ref;
};

/* one received buffer queued for the process workqueue */
struct processqueue_entry {
	unsigned char *buf;
	int nodeid;
	int buflen;

	struct list_head list;
};

/* transport-specific hooks, TCP or SCTP */
struct dlm_proto_ops {
	bool try_new_addr;
	const char *name;
	int proto;

	int (*connect)(struct connection *con, struct socket *sock,
		       struct sockaddr *addr, int addr_len);
	void (*sockopts)(struct socket *sock);
	int (*bind)(struct socket *sock);
	int (*listen_validate)(void);
	void (*listen_sockopts)(struct socket *sock);
	int (*listen_bind)(struct socket *sock);
};

/* original sk callbacks saved so they can be restored on close */
static struct listen_sock_callbacks {
	void (*sk_error_report)(struct sock *);
	void (*sk_data_ready)(struct sock *);
	void (*sk_state_change)(struct sock *);
	void (*sk_write_space)(struct sock *);
} listen_sock;

static struct listen_connection listen_con;
static struct sockaddr_storage dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;

/* Work queues */
static struct workqueue_struct *io_workqueue;
static struct workqueue_struct *process_workqueue;

static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_SPINLOCK(connections_lock);
DEFINE_STATIC_SRCU(connections_srcu);

static const struct dlm_proto_ops *dlm_proto_ops;

/* return codes of the io helpers below */
#define DLM_IO_SUCCESS 0
#define DLM_IO_END 1
#define DLM_IO_EOF 2
#define DLM_IO_RESCHED 3

static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);
static void process_dlm_messages(struct work_struct *work);

static DECLARE_WORK(process_work, process_dlm_messages);
static DEFINE_SPINLOCK(processqueue_lock);
static bool process_dlm_messages_pending;
static LIST_HEAD(processqueue);

/* lowcomms is considered running while the listen socket exists */
bool dlm_lowcomms_is_running(void)
{
	return !!listen_con.sock;
}

/* queue send work unless io is stopped or the app is flow-controlled;
 * caller must hold con->writequeue_lock
 */
static void lowcomms_queue_swork(struct connection *con)
{
	assert_spin_locked(&con->writequeue_lock);

	if (!test_bit(CF_IO_STOP, &con->flags) &&
	    !test_bit(CF_APP_LIMITED, &con->flags) &&
	    !test_and_set_bit(CF_SEND_PENDING, &con->flags))
		queue_work(io_workqueue, &con->swork);
}

/* queue receive work unless io is stopped; caller must hold the sock lock */
static void lowcomms_queue_rwork(struct connection *con)
{
#ifdef CONFIG_LOCKDEP
	WARN_ON_ONCE(!lockdep_sock_is_held(con->sock->sk));
#endif

	if (!test_bit(CF_IO_STOP, &con->flags) &&
	    !test_and_set_bit(CF_RECV_PENDING, &con->flags))
		queue_work(io_workqueue, &con->rwork);
}

/* slab constructor: init the msgs list once per cached object */
static void writequeue_entry_ctor(void *data)
{
	struct writequeue_entry *entry = data;

	INIT_LIST_HEAD(&entry->msgs);
}

struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void)
{
	return kmem_cache_create("dlm_writequeue", sizeof(struct writequeue_entry),
241 0, 0, writequeue_entry_ctor); 242 } 243 244 struct kmem_cache *dlm_lowcomms_msg_cache_create(void) 245 { 246 return kmem_cache_create("dlm_msg", sizeof(struct dlm_msg), 0, 0, NULL); 247 } 248 249 /* need to held writequeue_lock */ 250 static struct writequeue_entry *con_next_wq(struct connection *con) 251 { 252 struct writequeue_entry *e; 253 254 e = list_first_entry_or_null(&con->writequeue, struct writequeue_entry, 255 list); 256 /* if len is zero nothing is to send, if there are users filling 257 * buffers we wait until the users are done so we can send more. 258 */ 259 if (!e || e->users || e->len == 0) 260 return NULL; 261 262 return e; 263 } 264 265 static struct connection *__find_con(int nodeid, int r) 266 { 267 struct connection *con; 268 269 hlist_for_each_entry_rcu(con, &connection_hash[r], list) { 270 if (con->nodeid == nodeid) 271 return con; 272 } 273 274 return NULL; 275 } 276 277 static void dlm_con_init(struct connection *con, int nodeid) 278 { 279 con->nodeid = nodeid; 280 init_rwsem(&con->sock_lock); 281 INIT_LIST_HEAD(&con->writequeue); 282 spin_lock_init(&con->writequeue_lock); 283 INIT_WORK(&con->swork, process_send_sockets); 284 INIT_WORK(&con->rwork, process_recv_sockets); 285 spin_lock_init(&con->addrs_lock); 286 } 287 288 /* 289 * If 'allocation' is zero then we don't attempt to create a new 290 * connection structure for this node. 291 */ 292 static struct connection *nodeid2con(int nodeid, gfp_t alloc) 293 { 294 struct connection *con, *tmp; 295 int r; 296 297 r = nodeid_hash(nodeid); 298 con = __find_con(nodeid, r); 299 if (con || !alloc) 300 return con; 301 302 con = kzalloc(sizeof(*con), alloc); 303 if (!con) 304 return NULL; 305 306 dlm_con_init(con, nodeid); 307 308 spin_lock(&connections_lock); 309 /* Because multiple workqueues/threads calls this function it can 310 * race on multiple cpu's. 
Instead of locking hot path __find_con() 311 * we just check in rare cases of recently added nodes again 312 * under protection of connections_lock. If this is the case we 313 * abort our connection creation and return the existing connection. 314 */ 315 tmp = __find_con(nodeid, r); 316 if (tmp) { 317 spin_unlock(&connections_lock); 318 kfree(con); 319 return tmp; 320 } 321 322 hlist_add_head_rcu(&con->list, &connection_hash[r]); 323 spin_unlock(&connections_lock); 324 325 return con; 326 } 327 328 static int addr_compare(const struct sockaddr_storage *x, 329 const struct sockaddr_storage *y) 330 { 331 switch (x->ss_family) { 332 case AF_INET: { 333 struct sockaddr_in *sinx = (struct sockaddr_in *)x; 334 struct sockaddr_in *siny = (struct sockaddr_in *)y; 335 if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr) 336 return 0; 337 if (sinx->sin_port != siny->sin_port) 338 return 0; 339 break; 340 } 341 case AF_INET6: { 342 struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x; 343 struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y; 344 if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr)) 345 return 0; 346 if (sinx->sin6_port != siny->sin6_port) 347 return 0; 348 break; 349 } 350 default: 351 return 0; 352 } 353 return 1; 354 } 355 356 static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out, 357 struct sockaddr *sa_out, bool try_new_addr, 358 unsigned int *mark) 359 { 360 struct sockaddr_storage sas; 361 struct connection *con; 362 int idx; 363 364 if (!dlm_local_count) 365 return -1; 366 367 idx = srcu_read_lock(&connections_srcu); 368 con = nodeid2con(nodeid, 0); 369 if (!con) { 370 srcu_read_unlock(&connections_srcu, idx); 371 return -ENOENT; 372 } 373 374 spin_lock(&con->addrs_lock); 375 if (!con->addr_count) { 376 spin_unlock(&con->addrs_lock); 377 srcu_read_unlock(&connections_srcu, idx); 378 return -ENOENT; 379 } 380 381 memcpy(&sas, &con->addr[con->curr_addr_index], 382 sizeof(struct sockaddr_storage)); 383 384 if (try_new_addr) { 385 
con->curr_addr_index++; 386 if (con->curr_addr_index == con->addr_count) 387 con->curr_addr_index = 0; 388 } 389 390 *mark = con->mark; 391 spin_unlock(&con->addrs_lock); 392 393 if (sas_out) 394 memcpy(sas_out, &sas, sizeof(struct sockaddr_storage)); 395 396 if (!sa_out) { 397 srcu_read_unlock(&connections_srcu, idx); 398 return 0; 399 } 400 401 if (dlm_local_addr[0].ss_family == AF_INET) { 402 struct sockaddr_in *in4 = (struct sockaddr_in *) &sas; 403 struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out; 404 ret4->sin_addr.s_addr = in4->sin_addr.s_addr; 405 } else { 406 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &sas; 407 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out; 408 ret6->sin6_addr = in6->sin6_addr; 409 } 410 411 srcu_read_unlock(&connections_srcu, idx); 412 return 0; 413 } 414 415 static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid, 416 unsigned int *mark) 417 { 418 struct connection *con; 419 int i, idx, addr_i; 420 421 idx = srcu_read_lock(&connections_srcu); 422 for (i = 0; i < CONN_HASH_SIZE; i++) { 423 hlist_for_each_entry_rcu(con, &connection_hash[i], list) { 424 WARN_ON_ONCE(!con->addr_count); 425 426 spin_lock(&con->addrs_lock); 427 for (addr_i = 0; addr_i < con->addr_count; addr_i++) { 428 if (addr_compare(&con->addr[addr_i], addr)) { 429 *nodeid = con->nodeid; 430 *mark = con->mark; 431 spin_unlock(&con->addrs_lock); 432 srcu_read_unlock(&connections_srcu, idx); 433 return 0; 434 } 435 } 436 spin_unlock(&con->addrs_lock); 437 } 438 } 439 srcu_read_unlock(&connections_srcu, idx); 440 441 return -ENOENT; 442 } 443 444 static bool dlm_lowcomms_con_has_addr(const struct connection *con, 445 const struct sockaddr_storage *addr) 446 { 447 int i; 448 449 for (i = 0; i < con->addr_count; i++) { 450 if (addr_compare(&con->addr[i], addr)) 451 return true; 452 } 453 454 return false; 455 } 456 457 int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len) 458 { 459 struct connection *con; 460 bool 
ret, idx; 461 462 idx = srcu_read_lock(&connections_srcu); 463 con = nodeid2con(nodeid, GFP_NOFS); 464 if (!con) { 465 srcu_read_unlock(&connections_srcu, idx); 466 return -ENOMEM; 467 } 468 469 spin_lock(&con->addrs_lock); 470 if (!con->addr_count) { 471 memcpy(&con->addr[0], addr, sizeof(*addr)); 472 con->addr_count = 1; 473 con->mark = dlm_config.ci_mark; 474 spin_unlock(&con->addrs_lock); 475 srcu_read_unlock(&connections_srcu, idx); 476 return 0; 477 } 478 479 ret = dlm_lowcomms_con_has_addr(con, addr); 480 if (ret) { 481 spin_unlock(&con->addrs_lock); 482 srcu_read_unlock(&connections_srcu, idx); 483 return -EEXIST; 484 } 485 486 if (con->addr_count >= DLM_MAX_ADDR_COUNT) { 487 spin_unlock(&con->addrs_lock); 488 srcu_read_unlock(&connections_srcu, idx); 489 return -ENOSPC; 490 } 491 492 memcpy(&con->addr[con->addr_count++], addr, sizeof(*addr)); 493 srcu_read_unlock(&connections_srcu, idx); 494 spin_unlock(&con->addrs_lock); 495 return 0; 496 } 497 498 /* Data available on socket or listen socket received a connect */ 499 static void lowcomms_data_ready(struct sock *sk) 500 { 501 struct connection *con = sock2con(sk); 502 503 trace_sk_data_ready(sk); 504 505 set_bit(CF_RECV_INTR, &con->flags); 506 lowcomms_queue_rwork(con); 507 } 508 509 static void lowcomms_write_space(struct sock *sk) 510 { 511 struct connection *con = sock2con(sk); 512 513 clear_bit(SOCK_NOSPACE, &con->sock->flags); 514 515 spin_lock_bh(&con->writequeue_lock); 516 if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) { 517 con->sock->sk->sk_write_pending--; 518 clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags); 519 } 520 521 lowcomms_queue_swork(con); 522 spin_unlock_bh(&con->writequeue_lock); 523 } 524 525 static void lowcomms_state_change(struct sock *sk) 526 { 527 /* SCTP layer is not calling sk_data_ready when the connection 528 * is done, so we catch the signal through here. 
	 */
	if (sk->sk_shutdown == RCV_SHUTDOWN)
		lowcomms_data_ready(sk);
}

static void lowcomms_listen_data_ready(struct sock *sk)
{
	trace_sk_data_ready(sk);

	queue_work(io_workqueue, &listen_con.rwork);
}

/* kick off a connect attempt to nodeid by queueing send work when the
 * connection has no socket yet; connecting to ourselves is a no-op
 */
int dlm_lowcomms_connect_node(int nodeid)
{
	struct connection *con;
	int idx;

	if (nodeid == dlm_our_nodeid())
		return 0;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!con)) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	down_read(&con->sock_lock);
	if (!con->sock) {
		spin_lock_bh(&con->writequeue_lock);
		lowcomms_queue_swork(con);
		spin_unlock_bh(&con->writequeue_lock);
	}
	up_read(&con->sock_lock);
	srcu_read_unlock(&connections_srcu, idx);

	cond_resched();
	return 0;
}

/* set the skb mark used for future sockets of this node */
int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark)
{
	struct connection *con;
	int idx;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (!con) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	spin_lock(&con->addrs_lock);
	con->mark = mark;
	spin_unlock(&con->addrs_lock);
	srcu_read_unlock(&connections_srcu, idx);
	return 0;
}

/* sk error callback: log the error, ask midcomms to resend unacked
 * messages, then chain to the saved original callback
 */
static void lowcomms_error_report(struct sock *sk)
{
	struct connection *con = sock2con(sk);
	struct inet_sock *inet;

	inet = inet_sk(sk);
	switch (sk->sk_family) {
	case AF_INET:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI4, dport %d, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &inet->inet_daddr,
				   ntohs(inet->inet_dport), sk->sk_err,
				   sk->sk_err_soft);
		break;
#if IS_ENABLED(CONFIG_IPV6)
	case AF_INET6:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI6c, "
				   "dport %d, sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &sk->sk_v6_daddr,
				   ntohs(inet->inet_dport), sk->sk_err,
				   sk->sk_err_soft);
		break;
#endif
	default:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "invalid socket family %d set, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   sk->sk_family, sk->sk_err, sk->sk_err_soft);
		break;
	}

	dlm_midcomms_unack_msg_resend(con->nodeid);

	listen_sock.sk_error_report(sk);
}

/* put back the callbacks saved in listen_sock; caller holds the sock lock */
static void restore_callbacks(struct sock *sk)
{
#ifdef CONFIG_LOCKDEP
	WARN_ON_ONCE(!lockdep_sock_is_held(sk));
#endif

	sk->sk_user_data = NULL;
	sk->sk_data_ready = listen_sock.sk_data_ready;
	sk->sk_state_change = listen_sock.sk_state_change;
	sk->sk_write_space = listen_sock.sk_write_space;
	sk->sk_error_report = listen_sock.sk_error_report;
}

/* Make a socket active */
static void add_sock(struct socket *sock, struct connection *con)
{
	struct sock *sk = sock->sk;

	lock_sock(sk);
	con->sock = sock;

	sk->sk_user_data = con;
	sk->sk_data_ready = lowcomms_data_ready;
	sk->sk_write_space = lowcomms_write_space;
	if (dlm_config.ci_protocol == DLM_PROTO_SCTP)
		sk->sk_state_change = lowcomms_state_change;
	sk->sk_allocation = GFP_NOFS;
	sk->sk_use_task_frag = false;
	sk->sk_error_report = lowcomms_error_report;
	release_sock(sk);
}

/* Add the port number to an IPv6 or 4 sockaddr and return the address
   length */
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
			  int *addr_len)
{
	saddr->ss_family = dlm_local_addr[0].ss_family;
	if (saddr->ss_family == AF_INET) {
		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
		in4_addr->sin_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in);
		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
	} else {
		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
		in6_addr->sin6_port =
		    cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in6);
	}
	/* zero the tail of the storage beyond the family-specific part */
	memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
}

/* kref release: free page and writequeue entry */
static void dlm_page_release(struct kref *kref)
{
	struct writequeue_entry *e = container_of(kref, struct writequeue_entry,
						  ref);

	__free_page(e->page);
	dlm_free_writequeue(e);
}

/* kref release: drop the entry reference held by the msg, then free it */
static void dlm_msg_release(struct kref *kref)
{
	struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);

	kref_put(&msg->entry->ref, dlm_page_release);
	dlm_free_msg(msg);
}

/* drop all messages of a writequeue entry and the entry itself;
 * caller holds writequeue_lock
 */
static void free_entry(struct writequeue_entry *e)
{
	struct dlm_msg *msg, *tmp;

	list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
		if (msg->orig_msg) {
			msg->orig_msg->retransmit = false;
			kref_put(&msg->orig_msg->ref, dlm_msg_release);
		}

		list_del(&msg->list);
		kref_put(&msg->ref, dlm_msg_release);
	}

	list_del(&e->list);
	kref_put(&e->ref, dlm_page_release);
}

/* restore callbacks and release the socket, clearing the pointer */
static void dlm_close_sock(struct socket **sock)
{
	lock_sock((*sock)->sk);
	restore_callbacks((*sock)->sk);
	release_sock((*sock)->sk);

	sock_release(*sock);
	*sock = NULL;
}

/* re-enable io work on con and its othercon */
static void allow_connection_io(struct connection *con)
{
	if (con->othercon)
		clear_bit(CF_IO_STOP, &con->othercon->flags);
	clear_bit(CF_IO_STOP, &con->flags);
}

/* stop all io work on con (and othercon, one recursion level only) and
 * wait for any in-flight send/receive work to finish
 */
static void stop_connection_io(struct connection *con)
{
	if (con->othercon)
		stop_connection_io(con->othercon);

	down_write(&con->sock_lock);
	if (con->sock) {
		lock_sock(con->sock->sk);
		restore_callbacks(con->sock->sk);

		spin_lock_bh(&con->writequeue_lock);
		set_bit(CF_IO_STOP, &con->flags);
		spin_unlock_bh(&con->writequeue_lock);
		release_sock(con->sock->sk);
	} else {
		spin_lock_bh(&con->writequeue_lock);
		set_bit(CF_IO_STOP, &con->flags);
		spin_unlock_bh(&con->writequeue_lock);
	}
	up_write(&con->sock_lock);

	cancel_work_sync(&con->swork);
	cancel_work_sync(&con->rwork);
}

/* Close a remote connection and tidy up */
static void close_connection(struct connection *con, bool and_other)
{
	struct writequeue_entry *e;

	if (con->othercon && and_other)
		close_connection(con->othercon, false);

	down_write(&con->sock_lock);
	if (!con->sock) {
		up_write(&con->sock_lock);
		return;
	}

	dlm_close_sock(&con->sock);

	/* if we send a writequeue entry only a half way, we drop the
	 * whole entry because reconnection and that we not start of the
	 * middle of a msg which will confuse the other end.
	 *
	 * we can always drop messages because retransmits, but what we
	 * cannot allow is to transmit half messages which may be processed
	 * at the other side.
	 *
	 * our policy is to start on a clean state when disconnects, we don't
	 * know what's send/received on transport layer in this case.
	 */
	spin_lock_bh(&con->writequeue_lock);
	if (!list_empty(&con->writequeue)) {
		e = list_first_entry(&con->writequeue, struct writequeue_entry,
				     list);
		if (e->dirty)
			free_entry(e);
	}
	spin_unlock_bh(&con->writequeue_lock);

	con->rx_leftover = 0;
	con->retries = 0;
	clear_bit(CF_APP_LIMITED, &con->flags);
	clear_bit(CF_RECV_PENDING, &con->flags);
	clear_bit(CF_SEND_PENDING, &con->flags);
	up_write(&con->sock_lock);
}

/* allocate a queue entry plus its receive buffer of buflen bytes;
 * note: buflen field is filled in later by the caller
 */
static struct processqueue_entry *new_processqueue_entry(int nodeid,
							 int buflen)
{
	struct processqueue_entry *pentry;

	pentry = kmalloc(sizeof(*pentry), GFP_NOFS);
	if (!pentry)
		return NULL;

	pentry->buf = kmalloc(buflen, GFP_NOFS);
	if (!pentry->buf) {
		kfree(pentry);
		return NULL;
	}

	pentry->nodeid = nodeid;
	return pentry;
}

static void free_processqueue_entry(struct processqueue_entry *pentry)
{
	kfree(pentry->buf);
	kfree(pentry);
}

/* dedup record of nodeids seen in one process_dlm_messages() run */
struct dlm_processed_nodes {
	int nodeid;

	struct list_head list;
};

static void add_processed_node(int nodeid, struct list_head *processed_nodes)
{
	struct dlm_processed_nodes *n;

	list_for_each_entry(n, processed_nodes, list) {
		/* we already remembered this node */
		if (n->nodeid == nodeid)
			return;
	}

	/* if it's fails in worst case we simple don't send an ack back.
	 * We try it next time.
841 */ 842 n = kmalloc(sizeof(*n), GFP_NOFS); 843 if (!n) 844 return; 845 846 n->nodeid = nodeid; 847 list_add(&n->list, processed_nodes); 848 } 849 850 static void process_dlm_messages(struct work_struct *work) 851 { 852 struct dlm_processed_nodes *n, *n_tmp; 853 struct processqueue_entry *pentry; 854 LIST_HEAD(processed_nodes); 855 856 spin_lock(&processqueue_lock); 857 pentry = list_first_entry_or_null(&processqueue, 858 struct processqueue_entry, list); 859 if (WARN_ON_ONCE(!pentry)) { 860 spin_unlock(&processqueue_lock); 861 return; 862 } 863 864 list_del(&pentry->list); 865 spin_unlock(&processqueue_lock); 866 867 for (;;) { 868 dlm_process_incoming_buffer(pentry->nodeid, pentry->buf, 869 pentry->buflen); 870 add_processed_node(pentry->nodeid, &processed_nodes); 871 free_processqueue_entry(pentry); 872 873 spin_lock(&processqueue_lock); 874 pentry = list_first_entry_or_null(&processqueue, 875 struct processqueue_entry, list); 876 if (!pentry) { 877 process_dlm_messages_pending = false; 878 spin_unlock(&processqueue_lock); 879 break; 880 } 881 882 list_del(&pentry->list); 883 spin_unlock(&processqueue_lock); 884 } 885 886 /* send ack back after we processed couple of messages */ 887 list_for_each_entry_safe(n, n_tmp, &processed_nodes, list) { 888 list_del(&n->list); 889 dlm_midcomms_receive_done(n->nodeid); 890 kfree(n); 891 } 892 } 893 894 /* Data received from remote end */ 895 static int receive_from_sock(struct connection *con, int buflen) 896 { 897 struct processqueue_entry *pentry; 898 int ret, buflen_real; 899 struct msghdr msg; 900 struct kvec iov; 901 902 pentry = new_processqueue_entry(con->nodeid, buflen); 903 if (!pentry) 904 return DLM_IO_RESCHED; 905 906 memcpy(pentry->buf, con->rx_leftover_buf, con->rx_leftover); 907 908 /* calculate new buffer parameter regarding last receive and 909 * possible leftover bytes 910 */ 911 iov.iov_base = pentry->buf + con->rx_leftover; 912 iov.iov_len = buflen - con->rx_leftover; 913 914 memset(&msg, 0, 
sizeof(msg)); 915 msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; 916 clear_bit(CF_RECV_INTR, &con->flags); 917 again: 918 ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, 919 msg.msg_flags); 920 trace_dlm_recv(con->nodeid, ret); 921 if (ret == -EAGAIN) { 922 lock_sock(con->sock->sk); 923 if (test_and_clear_bit(CF_RECV_INTR, &con->flags)) { 924 release_sock(con->sock->sk); 925 goto again; 926 } 927 928 clear_bit(CF_RECV_PENDING, &con->flags); 929 release_sock(con->sock->sk); 930 free_processqueue_entry(pentry); 931 return DLM_IO_END; 932 } else if (ret == 0) { 933 /* close will clear CF_RECV_PENDING */ 934 free_processqueue_entry(pentry); 935 return DLM_IO_EOF; 936 } else if (ret < 0) { 937 free_processqueue_entry(pentry); 938 return ret; 939 } 940 941 /* new buflen according readed bytes and leftover from last receive */ 942 buflen_real = ret + con->rx_leftover; 943 ret = dlm_validate_incoming_buffer(con->nodeid, pentry->buf, 944 buflen_real); 945 if (ret < 0) { 946 free_processqueue_entry(pentry); 947 return ret; 948 } 949 950 pentry->buflen = ret; 951 952 /* calculate leftover bytes from process and put it into begin of 953 * the receive buffer, so next receive we have the full message 954 * at the start address of the receive buffer. 
955 */ 956 con->rx_leftover = buflen_real - ret; 957 memmove(con->rx_leftover_buf, pentry->buf + ret, 958 con->rx_leftover); 959 960 spin_lock(&processqueue_lock); 961 list_add_tail(&pentry->list, &processqueue); 962 if (!process_dlm_messages_pending) { 963 process_dlm_messages_pending = true; 964 queue_work(process_workqueue, &process_work); 965 } 966 spin_unlock(&processqueue_lock); 967 968 return DLM_IO_SUCCESS; 969 } 970 971 /* Listening socket is busy, accept a connection */ 972 static int accept_from_sock(void) 973 { 974 struct sockaddr_storage peeraddr; 975 int len, idx, result, nodeid; 976 struct connection *newcon; 977 struct socket *newsock; 978 unsigned int mark; 979 980 result = kernel_accept(listen_con.sock, &newsock, O_NONBLOCK); 981 if (result == -EAGAIN) 982 return DLM_IO_END; 983 else if (result < 0) 984 goto accept_err; 985 986 /* Get the connected socket's peer */ 987 memset(&peeraddr, 0, sizeof(peeraddr)); 988 len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2); 989 if (len < 0) { 990 result = -ECONNABORTED; 991 goto accept_err; 992 } 993 994 /* Get the new node's NODEID */ 995 make_sockaddr(&peeraddr, 0, &len); 996 if (addr_to_nodeid(&peeraddr, &nodeid, &mark)) { 997 switch (peeraddr.ss_family) { 998 case AF_INET: { 999 struct sockaddr_in *sin = (struct sockaddr_in *)&peeraddr; 1000 1001 log_print("connect from non cluster IPv4 node %pI4", 1002 &sin->sin_addr); 1003 break; 1004 } 1005 #if IS_ENABLED(CONFIG_IPV6) 1006 case AF_INET6: { 1007 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&peeraddr; 1008 1009 log_print("connect from non cluster IPv6 node %pI6c", 1010 &sin6->sin6_addr); 1011 break; 1012 } 1013 #endif 1014 default: 1015 log_print("invalid family from non cluster node"); 1016 break; 1017 } 1018 1019 sock_release(newsock); 1020 return -1; 1021 } 1022 1023 log_print("got connection from %d", nodeid); 1024 1025 /* Check to see if we already have a connection to this node. 
	 * This could happen if the two nodes initiate a connection at roughly
	 * the same time and the connections cross on the wire.
	 * In this case we store the incoming one in "othercon"
	 */
	idx = srcu_read_lock(&connections_srcu);
	newcon = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!newcon)) {
		srcu_read_unlock(&connections_srcu, idx);
		result = -ENOENT;
		goto accept_err;
	}

	sock_set_mark(newsock->sk, mark);

	down_write(&newcon->sock_lock);
	if (newcon->sock) {
		struct connection *othercon = newcon->othercon;

		if (!othercon) {
			othercon = kzalloc(sizeof(*othercon), GFP_NOFS);
			if (!othercon) {
				log_print("failed to allocate incoming socket");
				up_write(&newcon->sock_lock);
				srcu_read_unlock(&connections_srcu, idx);
				result = -ENOMEM;
				goto accept_err;
			}

			dlm_con_init(othercon, nodeid);
			/* nested lock class: othercon->sock_lock is taken
			 * while newcon->sock_lock is already held
			 */
			lockdep_set_subclass(&othercon->sock_lock, 1);
			newcon->othercon = othercon;
			set_bit(CF_IS_OTHERCON, &othercon->flags);
		} else {
			/* close other sock con if we have something new */
			close_connection(othercon, false);
		}

		down_write(&othercon->sock_lock);
		add_sock(newsock, othercon);

		/* check if we receved something while adding */
		lock_sock(othercon->sock->sk);
		lowcomms_queue_rwork(othercon);
		release_sock(othercon->sock->sk);
		up_write(&othercon->sock_lock);
	}
	else {
		/* accept copies the sk after we've saved the callbacks, so we
		   don't want to save them a second time or comm errors will
		   result in calling sk_error_report recursively. */
		add_sock(newsock, newcon);

		/* check if we receved something while adding */
		lock_sock(newcon->sock->sk);
		lowcomms_queue_rwork(newcon);
		release_sock(newcon->sock->sk);
	}
	up_write(&newcon->sock_lock);
	srcu_read_unlock(&connections_srcu, idx);

	return DLM_IO_SUCCESS;

accept_err:
	if (newsock)
		sock_release(newsock);

	return result;
}

/*
 * writequeue_entry_complete - try to delete and free write queue entry
 * @e: write queue entry to try to delete
 * @completed: bytes completed
 *
 * writequeue_lock must be held.
 */
static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
{
	e->offset += completed;
	e->len -= completed;
	/* signal that page was half way transmitted */
	e->dirty = true;

	if (e->len == 0 && e->users == 0)
		free_entry(e);
}

/*
 * sctp_bind_addrs - bind a SCTP socket to all our addresses
 */
static int sctp_bind_addrs(struct socket *sock, uint16_t port)
{
	struct sockaddr_storage localaddr;
	struct sockaddr *addr = (struct sockaddr *)&localaddr;
	int i, addr_len, result = 0;

	for (i = 0; i < dlm_local_count; i++) {
		memcpy(&localaddr, &dlm_local_addr[i], sizeof(localaddr));
		make_sockaddr(&localaddr, port, &addr_len);

		/* first address is a plain bind, the rest are added for
		 * SCTP multi-homing
		 */
		if (!i)
			result = kernel_bind(sock, addr, addr_len);
		else
			result = sock_bind_add(sock->sk, addr, addr_len);

		if (result < 0) {
			log_print("Can't bind to %d addr number %d, %d.\n",
				  port, i + 1, result);
			break;
		}
	}
	return result;
}

/* Get local addresses */
static void init_local(void)
{
	struct sockaddr_storage sas;
	int i;

	dlm_local_count = 0;
	for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
		if (dlm_our_addr(&sas, i))
			break;

		memcpy(&dlm_local_addr[dlm_local_count++], &sas, sizeof(sas));
	}
}

/* allocate a writequeue entry with a fresh zeroed page; starts with one
 * user so the buffer is not sent before the caller commits it
 */
static struct writequeue_entry *new_writequeue_entry(struct connection *con)
{
	struct writequeue_entry *entry;

	entry = dlm_allocate_writequeue();
	if (!entry)
		return NULL;

	entry->page = alloc_page(GFP_ATOMIC | __GFP_ZERO);
	if (!entry->page) {
		dlm_free_writequeue(entry);
		return NULL;
	}

	entry->offset = 0;
	entry->len = 0;
	entry->end = 0;
	entry->dirty = false;
	entry->con = con;
	entry->users = 1;
	kref_init(&entry->ref);
	return entry;
}

/* reserve len bytes in the connection's writequeue, reusing the tail
 * entry's page when it has room, otherwise allocating a new entry;
 * *ppc points at the reserved region, cb runs under writequeue_lock
 */
static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
					     char **ppc, void (*cb)(void *data),
					     void *data)
{
	struct writequeue_entry *e;

	spin_lock_bh(&con->writequeue_lock);
	if (!list_empty(&con->writequeue)) {
		e = list_last_entry(&con->writequeue, struct writequeue_entry, list);
		if (DLM_WQ_REMAIN_BYTES(e) >= len) {
			kref_get(&e->ref);

			*ppc = page_address(e->page) + e->end;
			if (cb)
				cb(data);

			e->end += len;
			e->users++;
			goto out;
		}
	}

	e = new_writequeue_entry(con);
	if (!e)
		goto out;

	kref_get(&e->ref);
	*ppc = page_address(e->page);
	e->end += len;
	if (cb)
		cb(data);

	list_add_tail(&e->list, &con->writequeue);

out:
	spin_unlock_bh(&con->writequeue_lock);
	return e;
};

/* allocate a dlm_msg backed by writequeue space on con; returns NULL on
 * allocation failure
 */
static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
						gfp_t allocation, char **ppc,
						void (*cb)(void *data),
						void *data)
{
	struct writequeue_entry *e;
	struct dlm_msg *msg;

	msg = dlm_allocate_msg(allocation);
	if (!msg)
		return NULL;

	kref_init(&msg->ref);

	e = new_wq_entry(con, len, ppc, cb, data);
	if (!e) {
		dlm_free_msg(msg);
		return NULL;
	}

	msg->retransmit = false;
	msg->orig_msg = NULL;
	msg->ppc = *ppc;
	msg->len = len;
	msg->entry = e;

	return msg;
}

/* avoid false positive for nodes_srcu, unlock happens in
 * dlm_lowcomms_commit_msg which is a must call if success
 */
#ifndef __CHECKER__
struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
				     char **ppc, void (*cb)(void *data),
				     void *data)
{
	struct connection *con;
	struct dlm_msg *msg;
	int idx;

	if (len > DLM_MAX_SOCKET_BUFSIZE ||
	    len < sizeof(struct dlm_header)) {
		BUILD_BUG_ON(PAGE_SIZE < DLM_MAX_SOCKET_BUFSIZE);
		log_print("failed to allocate a buffer of size %d", len);
		WARN_ON_ONCE(1);
		return NULL;
	}

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!con)) {
		srcu_read_unlock(&connections_srcu, idx);
		return NULL;
	}

	msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, data);
	if (!msg) {
		srcu_read_unlock(&connections_srcu, idx);
		return NULL;
	}

	/* for dlm_lowcomms_commit_msg() */
	kref_get(&msg->ref);
	/* we assume if successful commit must called */
	msg->idx = idx;
	return msg;
}
#endif

/* publish a reserved msg: drop the user count on its entry and, once the
 * last user committed, mark the entry sendable and queue send work
 */
static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
{
	struct writequeue_entry *e = msg->entry;
	struct connection *con = e->con;
	int users;

	spin_lock_bh(&con->writequeue_lock);
	kref_get(&msg->ref);
	list_add(&msg->list, &e->msgs);

	users = --e->users;
	if (users)
		goto out;

	e->len = DLM_WQ_LENGTH_BYTES(e);

	lowcomms_queue_swork(con);

out:
	spin_unlock_bh(&con->writequeue_lock);
	return;
}

/* avoid false positive for nodes_srcu, lock was happen in
 * dlm_lowcomms_new_msg
 */
#ifndef __CHECKER__
void dlm_lowcomms_commit_msg(struct dlm_msg *msg)
{
	_dlm_lowcomms_commit_msg(msg);
	srcu_read_unlock(&connections_srcu, msg->idx);
	/* because dlm_lowcomms_new_msg() */
	kref_put(&msg->ref, dlm_msg_release);
}
#endif

/* Drop a caller's reference on @msg; frees it on the last put. */
void dlm_lowcomms_put_msg(struct dlm_msg *msg)
{
	kref_put(&msg->ref, dlm_msg_release);
}

/* does not hold connections_srcu; used by lowcomms_error_report only */
int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
{
	struct dlm_msg *msg_resend;
	char *ppc;

	/* already queued for retransmit once, nothing to do */
	if (msg->retransmit)
		return 1;

	msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len,
					      GFP_ATOMIC, &ppc, NULL, NULL);
	if (!msg_resend)
		return -ENOMEM;

	/* keep the original alive until the copy is released */
	msg->retransmit = true;
	kref_get(&msg->ref);
	msg_resend->orig_msg = msg;

	memcpy(ppc, msg->ppc, msg->len);
	_dlm_lowcomms_commit_msg(msg_resend);
	dlm_lowcomms_put_msg(msg_resend);

	return 0;
}

/* Send a message */
static int send_to_sock(struct connection *con)
{
	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	struct writequeue_entry *e;
	int len, offset, ret;

	spin_lock_bh(&con->writequeue_lock);
	e = con_next_wq(con);
	if (!e) {
		/* nothing left to transmit */
		clear_bit(CF_SEND_PENDING, &con->flags);
		spin_unlock_bh(&con->writequeue_lock);
		return DLM_IO_END;
	}

	len = e->len;
	offset = e->offset;
	WARN_ON_ONCE(len == 0 && e->users == 0);
	/* drop the lock across the (non-blocking) page send */
	spin_unlock_bh(&con->writequeue_lock);

	ret = kernel_sendpage(con->sock, e->page, offset, len,
			      msg_flags);
	trace_dlm_send(con->nodeid, ret);
	if (ret == -EAGAIN || ret == 0) {
		lock_sock(con->sock->sk);
		spin_lock_bh(&con->writequeue_lock);
		if (test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
		    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
			/* Notify TCP that we're limited by the
			 * application window size.
1384 */ 1385 set_bit(SOCK_NOSPACE, &con->sock->sk->sk_socket->flags); 1386 con->sock->sk->sk_write_pending++; 1387 1388 clear_bit(CF_SEND_PENDING, &con->flags); 1389 spin_unlock_bh(&con->writequeue_lock); 1390 release_sock(con->sock->sk); 1391 1392 /* wait for write_space() event */ 1393 return DLM_IO_END; 1394 } 1395 spin_unlock_bh(&con->writequeue_lock); 1396 release_sock(con->sock->sk); 1397 1398 return DLM_IO_RESCHED; 1399 } else if (ret < 0) { 1400 return ret; 1401 } 1402 1403 spin_lock_bh(&con->writequeue_lock); 1404 writequeue_entry_complete(e, ret); 1405 spin_unlock_bh(&con->writequeue_lock); 1406 1407 return DLM_IO_SUCCESS; 1408 } 1409 1410 static void clean_one_writequeue(struct connection *con) 1411 { 1412 struct writequeue_entry *e, *safe; 1413 1414 spin_lock_bh(&con->writequeue_lock); 1415 list_for_each_entry_safe(e, safe, &con->writequeue, list) { 1416 free_entry(e); 1417 } 1418 spin_unlock_bh(&con->writequeue_lock); 1419 } 1420 1421 static void connection_release(struct rcu_head *rcu) 1422 { 1423 struct connection *con = container_of(rcu, struct connection, rcu); 1424 1425 WARN_ON_ONCE(!list_empty(&con->writequeue)); 1426 WARN_ON_ONCE(con->sock); 1427 kfree(con); 1428 } 1429 1430 /* Called from recovery when it knows that a node has 1431 left the cluster */ 1432 int dlm_lowcomms_close(int nodeid) 1433 { 1434 struct connection *con; 1435 int idx; 1436 1437 log_print("closing connection to node %d", nodeid); 1438 1439 idx = srcu_read_lock(&connections_srcu); 1440 con = nodeid2con(nodeid, 0); 1441 if (WARN_ON_ONCE(!con)) { 1442 srcu_read_unlock(&connections_srcu, idx); 1443 return -ENOENT; 1444 } 1445 1446 stop_connection_io(con); 1447 log_print("io handling for node: %d stopped", nodeid); 1448 close_connection(con, true); 1449 1450 spin_lock(&connections_lock); 1451 hlist_del_rcu(&con->list); 1452 spin_unlock(&connections_lock); 1453 1454 clean_one_writequeue(con); 1455 call_srcu(&connections_srcu, &con->rcu, connection_release); 1456 if 
(con->othercon) { 1457 clean_one_writequeue(con->othercon); 1458 if (con->othercon) 1459 call_srcu(&connections_srcu, &con->othercon->rcu, connection_release); 1460 } 1461 srcu_read_unlock(&connections_srcu, idx); 1462 1463 /* for debugging we print when we are done to compare with other 1464 * messages in between. This function need to be correctly synchronized 1465 * with io handling 1466 */ 1467 log_print("closing connection to node %d done", nodeid); 1468 1469 return 0; 1470 } 1471 1472 /* Receive worker function */ 1473 static void process_recv_sockets(struct work_struct *work) 1474 { 1475 struct connection *con = container_of(work, struct connection, rwork); 1476 int ret, buflen; 1477 1478 down_read(&con->sock_lock); 1479 if (!con->sock) { 1480 up_read(&con->sock_lock); 1481 return; 1482 } 1483 1484 buflen = READ_ONCE(dlm_config.ci_buffer_size); 1485 do { 1486 ret = receive_from_sock(con, buflen); 1487 } while (ret == DLM_IO_SUCCESS); 1488 up_read(&con->sock_lock); 1489 1490 switch (ret) { 1491 case DLM_IO_END: 1492 /* CF_RECV_PENDING cleared */ 1493 break; 1494 case DLM_IO_EOF: 1495 close_connection(con, false); 1496 /* CF_RECV_PENDING cleared */ 1497 break; 1498 case DLM_IO_RESCHED: 1499 cond_resched(); 1500 queue_work(io_workqueue, &con->rwork); 1501 /* CF_RECV_PENDING not cleared */ 1502 break; 1503 default: 1504 if (ret < 0) { 1505 if (test_bit(CF_IS_OTHERCON, &con->flags)) { 1506 close_connection(con, false); 1507 } else { 1508 spin_lock_bh(&con->writequeue_lock); 1509 lowcomms_queue_swork(con); 1510 spin_unlock_bh(&con->writequeue_lock); 1511 } 1512 1513 /* CF_RECV_PENDING cleared for othercon 1514 * we trigger send queue if not already done 1515 * and process_send_sockets will handle it 1516 */ 1517 break; 1518 } 1519 1520 WARN_ON_ONCE(1); 1521 break; 1522 } 1523 } 1524 1525 static void process_listen_recv_socket(struct work_struct *work) 1526 { 1527 int ret; 1528 1529 if (WARN_ON_ONCE(!listen_con.sock)) 1530 return; 1531 1532 do { 1533 ret = 
		      accept_from_sock();
	} while (ret == DLM_IO_SUCCESS);

	if (ret < 0)
		log_print("critical error accepting connection: %d", ret);
}

/* Resolve the peer address, create and bind a socket, and start a
 * connect via the active protocol ops. Returns 0, -EINPROGRESS (both
 * mean the attempt is underway) or a negative error; on error the
 * socket attached by add_sock() is closed again.
 */
static int dlm_connect(struct connection *con)
{
	struct sockaddr_storage addr;
	int result, addr_len;
	struct socket *sock;
	unsigned int mark;

	memset(&addr, 0, sizeof(addr));
	result = nodeid_to_addr(con->nodeid, &addr, NULL,
				dlm_proto_ops->try_new_addr, &mark);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		return result;
	}

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family,
				  SOCK_STREAM, dlm_proto_ops->proto, &sock);
	if (result < 0)
		return result;

	sock_set_mark(sock->sk, mark);
	dlm_proto_ops->sockopts(sock);

	result = dlm_proto_ops->bind(sock);
	if (result < 0) {
		sock_release(sock);
		return result;
	}

	add_sock(sock, con);

	log_print_ratelimited("connecting to %d", con->nodeid);
	make_sockaddr(&addr, dlm_config.ci_tcp_port, &addr_len);
	result = dlm_proto_ops->connect(con, sock, (struct sockaddr *)&addr,
					addr_len);
	switch (result) {
	case -EINPROGRESS:
		/* not an error */
		fallthrough;
	case 0:
		break;
	default:
		if (result < 0)
			dlm_close_sock(&con->sock);

		break;
	}

	return result;
}

/* Send worker function */
static void process_send_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, swork);
	int ret;

	/* only the primary connection ever sends */
	WARN_ON_ONCE(test_bit(CF_IS_OTHERCON, &con->flags));

	down_read(&con->sock_lock);
	if (!con->sock) {
		/* no socket yet: retake as writer and connect; the second
		 * !con->sock check handles a racing connect in between
		 */
		up_read(&con->sock_lock);
		down_write(&con->sock_lock);
		if (!con->sock) {
			ret = dlm_connect(con);
			switch (ret) {
			case 0:
				break;
			case -EINPROGRESS:
				/* avoid
				 * spamming resched on connection
				 * we might can switch to a state_change
				 * event based mechanism if established
				 */
				msleep(100);
				break;
			default:
				/* CF_SEND_PENDING not cleared */
				up_write(&con->sock_lock);
				log_print("connect to node %d try %d error %d",
					  con->nodeid, con->retries++, ret);
				msleep(1000);
				/* For now we try forever to reconnect. In
				 * future we should send an event to the cluster
				 * manager to fence itself after a certain amount
				 * of retries.
				 */
				queue_work(io_workqueue, &con->swork);
				return;
			}
		}
		/* keep the socket stable while sending below */
		downgrade_write(&con->sock_lock);
	}

	do {
		ret = send_to_sock(con);
	} while (ret == DLM_IO_SUCCESS);
	up_read(&con->sock_lock);

	switch (ret) {
	case DLM_IO_END:
		/* CF_SEND_PENDING cleared */
		break;
	case DLM_IO_RESCHED:
		/* CF_SEND_PENDING not cleared */
		cond_resched();
		queue_work(io_workqueue, &con->swork);
		break;
	default:
		if (ret < 0) {
			close_connection(con, false);

			/* CF_SEND_PENDING cleared */
			spin_lock_bh(&con->writequeue_lock);
			lowcomms_queue_swork(con);
			spin_unlock_bh(&con->writequeue_lock);
			break;
		}

		WARN_ON_ONCE(1);
		break;
	}
}

/* Tear down both workqueues; safe to call when partially initialized. */
static void work_stop(void)
{
	if (io_workqueue) {
		destroy_workqueue(io_workqueue);
		io_workqueue = NULL;
	}

	if (process_workqueue) {
		destroy_workqueue(process_workqueue);
		process_workqueue = NULL;
	}
}

/* Create the io and (ordered) process workqueues.
 * Returns 0 or -ENOMEM, in which case nothing is left allocated.
 */
static int work_start(void)
{
	io_workqueue = alloc_workqueue("dlm_io", WQ_HIGHPRI | WQ_MEM_RECLAIM,
				       0);
	if (!io_workqueue) {
		log_print("can't start dlm_io");
		return -ENOMEM;
	}

	/* ordered dlm message process queue,
	 * should be converted to a tasklet
	 */
	process_workqueue = alloc_ordered_workqueue("dlm_process",
						    WQ_HIGHPRI | WQ_MEM_RECLAIM);
	if
	   (!process_workqueue) {
		log_print("can't start dlm_process");
		destroy_workqueue(io_workqueue);
		io_workqueue = NULL;
		return -ENOMEM;
	}

	return 0;
}

/* Stop accepting new connections and drain pending message processing. */
void dlm_lowcomms_shutdown(void)
{
	/* stop lowcomms_listen_data_ready calls */
	lock_sock(listen_con.sock->sk);
	listen_con.sock->sk->sk_data_ready = listen_sock.sk_data_ready;
	release_sock(listen_con.sock->sk);

	cancel_work_sync(&listen_con.rwork);
	dlm_close_sock(&listen_con.sock);

	flush_workqueue(process_workqueue);
}

/* Quiesce and close the connection to @nodeid; with !force a non-empty
 * writequeue triggers a warning because data would be dropped.
 */
void dlm_lowcomms_shutdown_node(int nodeid, bool force)
{
	struct connection *con;
	int idx;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!con)) {
		srcu_read_unlock(&connections_srcu, idx);
		return;
	}

	flush_work(&con->swork);
	stop_connection_io(con);
	WARN_ON_ONCE(!force && !list_empty(&con->writequeue));
	close_connection(con, true);
	clean_one_writequeue(con);
	if (con->othercon)
		clean_one_writequeue(con->othercon);
	allow_connection_io(con);
	srcu_read_unlock(&connections_srcu, idx);
}

void dlm_lowcomms_stop(void)
{
	work_stop();
	dlm_proto_ops = NULL;
}

/* Create, configure and bind the listen socket for the active protocol. */
static int dlm_listen_for_all(void)
{
	struct socket *sock;
	int result;

	log_print("Using %s for communications",
		  dlm_proto_ops->name);

	result = dlm_proto_ops->listen_validate();
	if (result < 0)
		return result;

	result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family,
				  SOCK_STREAM, dlm_proto_ops->proto, &sock);
	if (result < 0) {
		log_print("Can't create comms socket: %d", result);
		return result;
	}

	sock_set_mark(sock->sk, dlm_config.ci_mark);
	dlm_proto_ops->listen_sockopts(sock);

	result = dlm_proto_ops->listen_bind(sock);
	if (result < 0)
		goto out;
	/* save the original socket callbacks before installing ours so
	 * shutdown can restore them
	 */
	lock_sock(sock->sk);
	listen_sock.sk_data_ready = sock->sk->sk_data_ready;
	listen_sock.sk_write_space = sock->sk->sk_write_space;
	listen_sock.sk_error_report = sock->sk->sk_error_report;
	listen_sock.sk_state_change = sock->sk->sk_state_change;

	listen_con.sock = sock;

	sock->sk->sk_allocation = GFP_NOFS;
	sock->sk->sk_use_task_frag = false;
	sock->sk->sk_data_ready = lowcomms_listen_data_ready;
	release_sock(sock->sk);

	result = sock->ops->listen(sock, 5);
	if (result < 0) {
		dlm_close_sock(&listen_con.sock);
		return result;
	}

	return 0;

out:
	sock_release(sock);
	return result;
}

/* Bind an outgoing TCP socket to our first local address (port 0).
 * A bind failure is deliberately not fatal — see comment below.
 */
static int dlm_tcp_bind(struct socket *sock)
{
	struct sockaddr_storage src_addr;
	int result, addr_len;

	/* Bind to our cluster-known address connecting to avoid
	 * routing problems.
	 */
	memcpy(&src_addr, &dlm_local_addr[0], sizeof(src_addr));
	make_sockaddr(&src_addr, 0, &addr_len);

	result = sock->ops->bind(sock, (struct sockaddr *)&src_addr,
				 addr_len);
	if (result < 0) {
		/* This *may* not indicate a critical error */
		log_print("could not bind for connect: %d", result);
	}

	return 0;
}

/* Non-blocking TCP connect; completion is signalled via socket events. */
static int dlm_tcp_connect(struct connection *con, struct socket *sock,
			   struct sockaddr *addr, int addr_len)
{
	return sock->ops->connect(sock, addr, addr_len, O_NONBLOCK);
}

static int dlm_tcp_listen_validate(void)
{
	/* We don't support multi-homed hosts */
	if (dlm_local_count > 1) {
		log_print("TCP protocol can't handle multi-homed hosts, try SCTP");
		return -EINVAL;
	}

	return 0;
}

static void dlm_tcp_sockopts(struct socket *sock)
{
	/* Turn off Nagle's algorithm */
	tcp_sock_set_nodelay(sock->sk);
}

static void dlm_tcp_listen_sockopts(struct socket *sock)
{
	dlm_tcp_sockopts(sock);
	/* allow fast rebind of the listen port after restart */
	sock_set_reuseaddr(sock->sk);
}

/* Bind the TCP listen socket to our address and the configured port. */
static int dlm_tcp_listen_bind(struct socket *sock)
{
	int addr_len;

	/* Bind to our port */
	make_sockaddr(&dlm_local_addr[0], dlm_config.ci_tcp_port, &addr_len);
	return sock->ops->bind(sock, (struct sockaddr *)&dlm_local_addr[0],
			       addr_len);
}

/* Transport callbacks used when dlm_config.ci_protocol selects TCP. */
static const struct dlm_proto_ops dlm_tcp_ops = {
	.name = "TCP",
	.proto = IPPROTO_TCP,
	.connect = dlm_tcp_connect,
	.sockopts = dlm_tcp_sockopts,
	.bind = dlm_tcp_bind,
	.listen_validate = dlm_tcp_listen_validate,
	.listen_sockopts = dlm_tcp_listen_sockopts,
	.listen_bind = dlm_tcp_listen_bind,
};

/* Bind an outgoing SCTP socket to all local addresses (ephemeral port). */
static int dlm_sctp_bind(struct socket *sock)
{
	return sctp_bind_addrs(sock, 0);
}

static int dlm_sctp_connect(struct connection *con, struct socket *sock,
			    struct sockaddr *addr, int addr_len)
{
	int ret;

	/*
	 * Make sock->ops->connect() function return in specified time,
	 * since O_NONBLOCK argument in connect() function does not work here,
	 * then, we should restore the default value of this attribute.
	 */
	sock_set_sndtimeo(sock->sk, 5);
	ret = sock->ops->connect(sock, addr, addr_len, 0);
	/* 0 restores the default (infinite) send timeout */
	sock_set_sndtimeo(sock->sk, 0);
	return ret;
}

static int dlm_sctp_listen_validate(void)
{
	if (!IS_ENABLED(CONFIG_IP_SCTP)) {
		log_print("SCTP is not enabled by this kernel");
		return -EOPNOTSUPP;
	}

	request_module("sctp");
	return 0;
}

/* Bind the SCTP listen socket to all local addresses on the DLM port. */
static int dlm_sctp_bind_listen(struct socket *sock)
{
	return sctp_bind_addrs(sock, dlm_config.ci_tcp_port);
}

static void dlm_sctp_sockopts(struct socket *sock)
{
	/* Turn off Nagle's algorithm */
	sctp_sock_set_nodelay(sock->sk);
	sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
}

/* Transport callbacks used when dlm_config.ci_protocol selects SCTP. */
static const struct dlm_proto_ops dlm_sctp_ops = {
	.name = "SCTP",
	.proto = IPPROTO_SCTP,
	.try_new_addr = true,
	.connect = dlm_sctp_connect,
	.sockopts = dlm_sctp_sockopts,
	.bind = dlm_sctp_bind,
	.listen_validate = dlm_sctp_listen_validate,
	.listen_sockopts = dlm_sctp_sockopts,
	.listen_bind = dlm_sctp_bind_listen,
};

/* Bring up lowcomms: resolve local addresses, start the workqueues,
 * select the transport and open the listen socket.
 * Returns 0 or a negative error with everything torn down again.
 */
int dlm_lowcomms_start(void)
{
	int error;

	init_local();
	if (!dlm_local_count) {
		error = -ENOTCONN;
		log_print("no local IP address has been set");
		goto fail;
	}

	error = work_start();
	if (error)
		goto fail;

	/* Start listening */
	switch (dlm_config.ci_protocol) {
	case DLM_PROTO_TCP:
		dlm_proto_ops = &dlm_tcp_ops;
		break;
	case DLM_PROTO_SCTP:
		dlm_proto_ops = &dlm_sctp_ops;
		break;
	default:
		log_print("Invalid protocol identifier %d set",
			  dlm_config.ci_protocol);
		error = -EINVAL;
		goto fail_proto_ops;
	}

	error = dlm_listen_for_all();
	if (error)
		goto fail_listen;

	return 0;

fail_listen:
	dlm_proto_ops = NULL;
fail_proto_ops:
	work_stop();
fail:
	return error;
}

/* One-time module init: empty the connection hash and prepare the
 * listen-socket work item.
 */
void dlm_lowcomms_init(void)
{
	int i;

	for (i = 0; i < CONN_HASH_SIZE; i++)
		INIT_HLIST_HEAD(&connection_hash[i]);

	INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
}

/* Module exit: unhash every connection and free each (and its othercon)
 * after an SRCU grace period.
 */
void dlm_lowcomms_exit(void)
{
	struct connection *con;
	int i, idx;

	idx = srcu_read_lock(&connections_srcu);
	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
			spin_lock(&connections_lock);
			hlist_del_rcu(&con->list);
			spin_unlock(&connections_lock);

			if (con->othercon)
				call_srcu(&connections_srcu, &con->othercon->rcu,
					  connection_release);
			call_srcu(&connections_srcu, &con->rcu, connection_release);
		}
	}
	srcu_read_unlock(&connections_srcu, idx);
}