1 // SPDX-License-Identifier: GPL-2.0-only 2 /****************************************************************************** 3 ******************************************************************************* 4 ** 5 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 6 ** Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved. 7 ** 8 ** 9 ******************************************************************************* 10 ******************************************************************************/ 11 12 /* 13 * lowcomms.c 14 * 15 * This is the "low-level" comms layer. 16 * 17 * It is responsible for sending/receiving messages 18 * from other nodes in the cluster. 19 * 20 * Cluster nodes are referred to by their nodeids. nodeids are 21 * simply 32 bit numbers to the locking module - if they need to 22 * be expanded for the cluster infrastructure then that is its 23 * responsibility. It is this layer's 24 * responsibility to resolve these into IP address or 25 * whatever it needs for inter-node communication. 26 * 27 * The comms level is two kernel threads that deal mainly with 28 * the receiving of messages from other nodes and passing them 29 * up to the mid-level comms layer (which understands the 30 * message format) for execution by the locking core, and 31 * a send thread which does all the setting up of connections 32 * to remote nodes and the sending of data. Threads are not allowed 33 * to send their own data because it may cause them to wait in times 34 * of high load. Also, this way, the sending thread can collect together 35 * messages bound for one node and send them in one block. 36 * 37 * lowcomms will choose to use either TCP or SCTP as its transport layer 38 * depending on the configuration variable 'protocol'. This should be set 39 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a 40 * cluster-wide mechanism as it must be the same on all nodes of the cluster 41 * for the DLM to function. 
 *
 */

#include <asm/ioctls.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <linux/pagemap.h>
#include <linux/file.h>
#include <linux/mutex.h>
#include <linux/sctp.h>
#include <linux/slab.h>
#include <net/sctp/sctp.h>
#include <net/ipv6.h>

#include "dlm_internal.h"
#include "lowcomms.h"
#include "midcomms.h"
#include "config.h"

/* receive buffer size requested for the SCTP listen socket */
#define NEEDED_RMEM (4*1024*1024)
#define CONN_HASH_SIZE 32

/* Number of messages to send before rescheduling */
#define MAX_SEND_MSG_COUNT 25
#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000)

/* Per-peer connection state, hashed by nodeid in connection_hash. */
struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* So we know who we are in the list */
	struct mutex sock_mutex;
	unsigned long flags;	/* bit numbers below, used via {test,set,clear}_bit */
#define CF_READ_PENDING 1
#define CF_WRITE_PENDING 2
#define CF_INIT_PENDING 4
#define CF_IS_OTHERCON 5
#define CF_CLOSE 6
#define CF_APP_LIMITED 7
#define CF_CLOSING 8
#define CF_SHUTDOWN 9
#define CF_CONNECTED 10
	struct list_head writequeue;	/* List of outgoing writequeue_entries */
	spinlock_t writequeue_lock;
	void (*connect_action) (struct connection *);	/* What to do to connect */
	void (*shutdown_action)(struct connection *con); /* What to do to shutdown */
	int retries;
#define MAX_CONNECT_RETRIES 3
	struct hlist_node list;
	struct connection *othercon;	/* second con for a crossed connect race */
	struct work_struct rwork; /* Receive workqueue */
	struct work_struct swork; /* Send workqueue */
	wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */
	unsigned char *rx_buf;
	int rx_buflen;
	int rx_leftover;	/* bytes of a partial message kept for next recv */
	struct rcu_head rcu;
};
#define sock2con(x) ((struct connection *)(x)->sk_user_data)

struct listen_connection {
	struct socket *sock;
	struct work_struct rwork;
};

/* An entry waiting to be sent */
struct writequeue_entry {
	struct list_head list;
	struct page *page;
	int offset;
	int len;
	int end;
	int users;
	struct connection *con;
};

struct dlm_node_addr {
	struct list_head list;
	int nodeid;
	int addr_count;
	int curr_addr_index;
	struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
};

/* original callbacks of the listen socket, saved so they can be restored */
static struct listen_sock_callbacks {
	void (*sk_error_report)(struct sock *);
	void (*sk_data_ready)(struct sock *);
	void (*sk_state_change)(struct sock *);
	void (*sk_write_space)(struct sock *);
} listen_sock;

static LIST_HEAD(dlm_node_addrs);
static DEFINE_SPINLOCK(dlm_node_addrs_spin);

static struct listen_connection listen_con;
static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;
static int dlm_allow_conn;

/* Work queues */
static struct workqueue_struct *recv_workqueue;
static struct workqueue_struct *send_workqueue;

static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_SPINLOCK(connections_lock);
DEFINE_STATIC_SRCU(connections_srcu);

static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);

static void sctp_connect_to_sock(struct connection *con);
static void tcp_connect_to_sock(struct connection *con);
static void dlm_tcp_shutdown(struct connection *con);

/* This is deliberately very simple because most clusters have simple
   sequential nodeids, so we should be able to go straight to a connection
   struct in the array */
static inline int nodeid_hash(int nodeid)
{
	return nodeid & (CONN_HASH_SIZE-1);
}

/* Look up an existing connection by nodeid under SRCU; NULL if none. */
static struct connection *__find_con(int nodeid)
{
	int r, idx;
	struct connection *con;

	r = nodeid_hash(nodeid);

	idx = srcu_read_lock(&connections_srcu);
	hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
		if (con->nodeid == nodeid) {
			srcu_read_unlock(&connections_srcu, idx);
			return con;
		}
	}
	srcu_read_unlock(&connections_srcu, idx);

	return NULL;
}

/* One-time init of a freshly allocated connection: receive buffer,
 * locks, work items and the protocol-specific connect/shutdown ops.
 */
static int dlm_con_init(struct connection *con, int nodeid)
{
	con->rx_buflen = dlm_config.ci_buffer_size;
	con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS);
	if (!con->rx_buf)
		return -ENOMEM;

	con->nodeid = nodeid;
	mutex_init(&con->sock_mutex);
	INIT_LIST_HEAD(&con->writequeue);
	spin_lock_init(&con->writequeue_lock);
	INIT_WORK(&con->swork, process_send_sockets);
	INIT_WORK(&con->rwork, process_recv_sockets);
	init_waitqueue_head(&con->shutdown_wait);

	/* transport is chosen by the cluster-wide 'protocol' config:
	 * 0 = TCP (with graceful shutdown), otherwise SCTP
	 */
	if (dlm_config.ci_protocol == 0) {
		con->connect_action = tcp_connect_to_sock;
		con->shutdown_action = dlm_tcp_shutdown;
	} else {
		con->connect_action = sctp_connect_to_sock;
	}

	return 0;
}

/*
 * If 'allocation' is zero then we don't attempt to create a new
 * connection structure for this node.
 */
static struct connection *nodeid2con(int nodeid, gfp_t alloc)
{
	struct connection *con, *tmp;
	int r, ret;

	con = __find_con(nodeid);
	if (con || !alloc)
		return con;

	con = kzalloc(sizeof(*con), alloc);
	if (!con)
		return NULL;

	ret = dlm_con_init(con, nodeid);
	if (ret) {
		kfree(con);
		return NULL;
	}

	r = nodeid_hash(nodeid);

	spin_lock(&connections_lock);
	/* Because multiple workqueues/threads calls this function it can
	 * race on multiple cpu's. Instead of locking hot path __find_con()
	 * we just check in rare cases of recently added nodes again
	 * under protection of connections_lock. If this is the case we
	 * abort our connection creation and return the existing connection.
*/
	tmp = __find_con(nodeid);
	if (tmp) {
		spin_unlock(&connections_lock);
		kfree(con->rx_buf);
		kfree(con);
		return tmp;
	}

	hlist_add_head_rcu(&con->list, &connection_hash[r]);
	spin_unlock(&connections_lock);

	return con;
}

/* Loop round all connections */
static void foreach_conn(void (*conn_func)(struct connection *c))
{
	int i, idx;
	struct connection *con;

	idx = srcu_read_lock(&connections_srcu);
	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry_rcu(con, &connection_hash[i], list)
			conn_func(con);
	}
	srcu_read_unlock(&connections_srcu, idx);
}

/* Find a node's address entry; caller must hold dlm_node_addrs_spin. */
static struct dlm_node_addr *find_node_addr(int nodeid)
{
	struct dlm_node_addr *na;

	list_for_each_entry(na, &dlm_node_addrs, list) {
		if (na->nodeid == nodeid)
			return na;
	}
	return NULL;
}

/* Compare two socket addresses (family, address, port); 1 if equal. */
static int addr_compare(const struct sockaddr_storage *x,
			const struct sockaddr_storage *y)
{
	switch (x->ss_family) {
	case AF_INET: {
		struct sockaddr_in *sinx = (struct sockaddr_in *)x;
		struct sockaddr_in *siny = (struct sockaddr_in *)y;
		if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
			return 0;
		if (sinx->sin_port != siny->sin_port)
			return 0;
		break;
	}
	case AF_INET6: {
		struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
		struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
		if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
			return 0;
		if (sinx->sin6_port != siny->sin6_port)
			return 0;
		break;
	}
	default:
		return 0;
	}
	return 1;
}

/* Resolve a nodeid to one of its configured addresses; with try_new_addr
 * the per-node address index is advanced round-robin for the next call.
 */
static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
			  struct sockaddr *sa_out, bool try_new_addr)
{
	struct sockaddr_storage sas;
	struct dlm_node_addr *na;

	if (!dlm_local_count)
		return -1;

	spin_lock(&dlm_node_addrs_spin);
	na = find_node_addr(nodeid);
	if (na && na->addr_count) {
		memcpy(&sas, na->addr[na->curr_addr_index],
		       sizeof(struct sockaddr_storage));

		if (try_new_addr) {
			na->curr_addr_index++;
			if (na->curr_addr_index == na->addr_count)
				na->curr_addr_index = 0;
		}
	}
	spin_unlock(&dlm_node_addrs_spin);

	if (!na)
		return -EEXIST;

	if (!na->addr_count)
		return -ENOENT;

	if (sas_out)
		memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));

	if (!sa_out)
		return 0;

	if (dlm_local_addr[0]->ss_family == AF_INET) {
		struct sockaddr_in *in4 = (struct sockaddr_in *) &sas;
		struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
		ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
	} else {
		struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &sas;
		struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
		ret6->sin6_addr = in6->sin6_addr;
	}

	return 0;
}

/* Reverse lookup: find which nodeid owns a given address. */
static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid)
{
	struct dlm_node_addr *na;
	int rv = -EEXIST;
	int addr_i;

	spin_lock(&dlm_node_addrs_spin);
	list_for_each_entry(na, &dlm_node_addrs, list) {
		if (!na->addr_count)
			continue;

		for (addr_i = 0; addr_i < na->addr_count; addr_i++) {
			if (addr_compare(na->addr[addr_i], addr)) {
				*nodeid = na->nodeid;
				rv = 0;
				goto unlock;
			}
		}
	}
unlock:
	spin_unlock(&dlm_node_addrs_spin);
	return rv;
}

/* Register an additional address for a node (up to DLM_MAX_ADDR_COUNT). */
int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
{
	struct sockaddr_storage *new_addr;
	struct dlm_node_addr *new_node, *na;

	new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS);
	if (!new_node)
		return -ENOMEM;

	new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS);
	if (!new_addr) {
		kfree(new_node);
		return -ENOMEM;
	}

	memcpy(new_addr, addr, len);

	spin_lock(&dlm_node_addrs_spin);
	na = find_node_addr(nodeid);
	if (!na) {
		new_node->nodeid = nodeid;
		new_node->addr[0] = new_addr;
		new_node->addr_count = 1;
		list_add(&new_node->list, &dlm_node_addrs);
		spin_unlock(&dlm_node_addrs_spin);
		return 0;
	}

	if (na->addr_count >= DLM_MAX_ADDR_COUNT) {
		spin_unlock(&dlm_node_addrs_spin);
		kfree(new_addr);
		kfree(new_node);
		return -ENOSPC;
	}

	na->addr[na->addr_count++] = new_addr;
	spin_unlock(&dlm_node_addrs_spin);
	/* node entry already existed; the spare one is not needed */
	kfree(new_node);
	return 0;
}

/* Data available on socket or listen socket received a connect */
static void lowcomms_data_ready(struct sock *sk)
{
	struct connection *con;

	read_lock_bh(&sk->sk_callback_lock);
	con = sock2con(sk);
	if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
		queue_work(recv_workqueue, &con->rwork);
	read_unlock_bh(&sk->sk_callback_lock);
}

static void lowcomms_listen_data_ready(struct sock *sk)
{
	queue_work(recv_workqueue, &listen_con.rwork);
}

static void lowcomms_write_space(struct sock *sk)
{
	struct connection *con;

	read_lock_bh(&sk->sk_callback_lock);
	con = sock2con(sk);
	if (!con)
		goto out;

	/* first write-space event after a connect completes */
	if (!test_and_set_bit(CF_CONNECTED, &con->flags)) {
		log_print("successful connected to node %d", con->nodeid);
		queue_work(send_workqueue, &con->swork);
		goto out;
	}

	clear_bit(SOCK_NOSPACE, &con->sock->flags);

	if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
		con->sock->sk->sk_write_pending--;
		clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
	}

	queue_work(send_workqueue, &con->swork);
out:
	read_unlock_bh(&sk->sk_callback_lock);
}

static inline void lowcomms_connect_sock(struct connection *con)
{
	if (test_bit(CF_CLOSE, &con->flags))
		return;
	queue_work(send_workqueue, &con->swork);
	cond_resched();
}

static void lowcomms_state_change(struct sock *sk)
{
	/* SCTP layer is not
calling sk_data_ready when the connection 473 * is done, so we catch the signal through here. Also, it 474 * doesn't switch socket state when entering shutdown, so we 475 * skip the write in that case. 476 */ 477 if (sk->sk_shutdown) { 478 if (sk->sk_shutdown == RCV_SHUTDOWN) 479 lowcomms_data_ready(sk); 480 } else if (sk->sk_state == TCP_ESTABLISHED) { 481 lowcomms_write_space(sk); 482 } 483 } 484 485 int dlm_lowcomms_connect_node(int nodeid) 486 { 487 struct connection *con; 488 489 if (nodeid == dlm_our_nodeid()) 490 return 0; 491 492 con = nodeid2con(nodeid, GFP_NOFS); 493 if (!con) 494 return -ENOMEM; 495 lowcomms_connect_sock(con); 496 return 0; 497 } 498 499 static void lowcomms_error_report(struct sock *sk) 500 { 501 struct connection *con; 502 struct sockaddr_storage saddr; 503 void (*orig_report)(struct sock *) = NULL; 504 505 read_lock_bh(&sk->sk_callback_lock); 506 con = sock2con(sk); 507 if (con == NULL) 508 goto out; 509 510 orig_report = listen_sock.sk_error_report; 511 if (con->sock == NULL || 512 kernel_getpeername(con->sock, (struct sockaddr *)&saddr) < 0) { 513 printk_ratelimited(KERN_ERR "dlm: node %d: socket error " 514 "sending to node %d, port %d, " 515 "sk_err=%d/%d\n", dlm_our_nodeid(), 516 con->nodeid, dlm_config.ci_tcp_port, 517 sk->sk_err, sk->sk_err_soft); 518 } else if (saddr.ss_family == AF_INET) { 519 struct sockaddr_in *sin4 = (struct sockaddr_in *)&saddr; 520 521 printk_ratelimited(KERN_ERR "dlm: node %d: socket error " 522 "sending to node %d at %pI4, port %d, " 523 "sk_err=%d/%d\n", dlm_our_nodeid(), 524 con->nodeid, &sin4->sin_addr.s_addr, 525 dlm_config.ci_tcp_port, sk->sk_err, 526 sk->sk_err_soft); 527 } else { 528 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&saddr; 529 530 printk_ratelimited(KERN_ERR "dlm: node %d: socket error " 531 "sending to node %d at %u.%u.%u.%u, " 532 "port %d, sk_err=%d/%d\n", dlm_our_nodeid(), 533 con->nodeid, sin6->sin6_addr.s6_addr32[0], 534 sin6->sin6_addr.s6_addr32[1], 535 
sin6->sin6_addr.s6_addr32[2], 536 sin6->sin6_addr.s6_addr32[3], 537 dlm_config.ci_tcp_port, sk->sk_err, 538 sk->sk_err_soft); 539 } 540 out: 541 read_unlock_bh(&sk->sk_callback_lock); 542 if (orig_report) 543 orig_report(sk); 544 } 545 546 /* Note: sk_callback_lock must be locked before calling this function. */ 547 static void save_listen_callbacks(struct socket *sock) 548 { 549 struct sock *sk = sock->sk; 550 551 listen_sock.sk_data_ready = sk->sk_data_ready; 552 listen_sock.sk_state_change = sk->sk_state_change; 553 listen_sock.sk_write_space = sk->sk_write_space; 554 listen_sock.sk_error_report = sk->sk_error_report; 555 } 556 557 static void restore_callbacks(struct socket *sock) 558 { 559 struct sock *sk = sock->sk; 560 561 write_lock_bh(&sk->sk_callback_lock); 562 sk->sk_user_data = NULL; 563 sk->sk_data_ready = listen_sock.sk_data_ready; 564 sk->sk_state_change = listen_sock.sk_state_change; 565 sk->sk_write_space = listen_sock.sk_write_space; 566 sk->sk_error_report = listen_sock.sk_error_report; 567 write_unlock_bh(&sk->sk_callback_lock); 568 } 569 570 static void add_listen_sock(struct socket *sock, struct listen_connection *con) 571 { 572 struct sock *sk = sock->sk; 573 574 write_lock_bh(&sk->sk_callback_lock); 575 save_listen_callbacks(sock); 576 con->sock = sock; 577 578 sk->sk_user_data = con; 579 sk->sk_allocation = GFP_NOFS; 580 /* Install a data_ready callback */ 581 sk->sk_data_ready = lowcomms_listen_data_ready; 582 write_unlock_bh(&sk->sk_callback_lock); 583 } 584 585 /* Make a socket active */ 586 static void add_sock(struct socket *sock, struct connection *con) 587 { 588 struct sock *sk = sock->sk; 589 590 write_lock_bh(&sk->sk_callback_lock); 591 con->sock = sock; 592 593 sk->sk_user_data = con; 594 /* Install a data_ready callback */ 595 sk->sk_data_ready = lowcomms_data_ready; 596 sk->sk_write_space = lowcomms_write_space; 597 sk->sk_state_change = lowcomms_state_change; 598 sk->sk_allocation = GFP_NOFS; 599 sk->sk_error_report = 
lowcomms_error_report;
	write_unlock_bh(&sk->sk_callback_lock);
}

/* Add the port number to an IPv6 or 4 sockaddr and return the address
   length */
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
			  int *addr_len)
{
	saddr->ss_family = dlm_local_addr[0]->ss_family;
	if (saddr->ss_family == AF_INET) {
		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
		in4_addr->sin_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in);
		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
	} else {
		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
		in6_addr->sin6_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in6);
	}
	/* zero the tail so addresses can be compared byte-wise */
	memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
}

static void dlm_close_sock(struct socket **sock)
{
	if (*sock) {
		restore_callbacks(*sock);
		sock_release(*sock);
		*sock = NULL;
	}
}

/* Close a remote connection and tidy up */
static void close_connection(struct connection *con, bool and_other,
			     bool tx, bool rx)
{
	bool closing = test_and_set_bit(CF_CLOSING, &con->flags);

	/* only cancel pending work if no other closer is already active */
	if (tx && !closing && cancel_work_sync(&con->swork)) {
		log_print("canceled swork for node %d", con->nodeid);
		clear_bit(CF_WRITE_PENDING, &con->flags);
	}
	if (rx && !closing && cancel_work_sync(&con->rwork)) {
		log_print("canceled rwork for node %d", con->nodeid);
		clear_bit(CF_READ_PENDING, &con->flags);
	}

	mutex_lock(&con->sock_mutex);
	dlm_close_sock(&con->sock);

	if (con->othercon && and_other) {
		/* Will only re-enter once. */
		close_connection(con->othercon, false, true, true);
	}

	con->rx_leftover = 0;
	con->retries = 0;
	clear_bit(CF_CONNECTED, &con->flags);
	mutex_unlock(&con->sock_mutex);
	clear_bit(CF_CLOSING, &con->flags);
}

/* Half-close (SHUT_WR) the socket and wait for the peer's EOF to clear
 * CF_SHUTDOWN; force-close on error or timeout.
 */
static void shutdown_connection(struct connection *con)
{
	int ret;

	if (cancel_work_sync(&con->swork)) {
		log_print("canceled swork for node %d", con->nodeid);
		clear_bit(CF_WRITE_PENDING, &con->flags);
	}

	mutex_lock(&con->sock_mutex);
	/* nothing to shutdown */
	if (!con->sock) {
		mutex_unlock(&con->sock_mutex);
		return;
	}

	set_bit(CF_SHUTDOWN, &con->flags);
	ret = kernel_sock_shutdown(con->sock, SHUT_WR);
	mutex_unlock(&con->sock_mutex);
	if (ret) {
		log_print("Connection %p failed to shutdown: %d will force close",
			  con, ret);
		goto force_close;
	} else {
		/* receive_from_sock() clears CF_SHUTDOWN when it sees EOF */
		ret = wait_event_timeout(con->shutdown_wait,
					 !test_bit(CF_SHUTDOWN, &con->flags),
					 DLM_SHUTDOWN_WAIT_TIMEOUT);
		if (ret == 0) {
			log_print("Connection %p shutdown timed out, will force close",
				  con);
			goto force_close;
		}
	}

	return;

force_close:
	clear_bit(CF_SHUTDOWN, &con->flags);
	close_connection(con, false, true, true);
}

static void dlm_tcp_shutdown(struct connection *con)
{
	if (con->othercon)
		shutdown_connection(con->othercon);
	shutdown_connection(con);
}

/* Resize the receive buffer, preserving any leftover partial message. */
static int con_realloc_receive_buf(struct connection *con, int newlen)
{
	unsigned char *newbuf;

	newbuf = kmalloc(newlen, GFP_NOFS);
	if (!newbuf)
		return -ENOMEM;

	/* copy any leftover from last receive */
	if (con->rx_leftover)
		memmove(newbuf, con->rx_buf, con->rx_leftover);

	/* swap to new buffer space */
	kfree(con->rx_buf);
	con->rx_buflen = newlen;
	con->rx_buf = newbuf;

	return 0;
}

/* Data received from remote end */
static int receive_from_sock(struct connection *con)
{
	int call_again_soon = 0;
	struct msghdr msg;
	struct kvec iov;
	int ret, buflen;

	mutex_lock(&con->sock_mutex);

	if (con->sock == NULL) {
		ret = -EAGAIN;
		goto out_close;
	}

	/* realloc if we get new buffer size to read out */
	buflen = dlm_config.ci_buffer_size;
	if (con->rx_buflen != buflen && con->rx_leftover <= buflen) {
		ret = con_realloc_receive_buf(con, buflen);
		if (ret < 0)
			goto out_resched;
	}

	/* calculate new buffer parameter regarding last receive and
	 * possible leftover bytes
	 */
	iov.iov_base = con->rx_buf + con->rx_leftover;
	iov.iov_len = con->rx_buflen - con->rx_leftover;

	memset(&msg, 0, sizeof(msg));
	msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
			     msg.msg_flags);
	if (ret <= 0)
		goto out_close;
	else if (ret == iov.iov_len)
		call_again_soon = 1;

	/* new buflen according to bytes read plus leftover from last receive */
	buflen = ret + con->rx_leftover;
	ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen);
	if (ret < 0)
		goto out_close;

	/* calculate leftover bytes from process and put it into begin of
	 * the receive buffer, so next receive we have the full message
	 * at the start address of the receive buffer.
776 */ 777 con->rx_leftover = buflen - ret; 778 if (con->rx_leftover) { 779 memmove(con->rx_buf, con->rx_buf + ret, 780 con->rx_leftover); 781 call_again_soon = true; 782 } 783 784 if (call_again_soon) 785 goto out_resched; 786 787 mutex_unlock(&con->sock_mutex); 788 return 0; 789 790 out_resched: 791 if (!test_and_set_bit(CF_READ_PENDING, &con->flags)) 792 queue_work(recv_workqueue, &con->rwork); 793 mutex_unlock(&con->sock_mutex); 794 return -EAGAIN; 795 796 out_close: 797 mutex_unlock(&con->sock_mutex); 798 if (ret != -EAGAIN) { 799 /* Reconnect when there is something to send */ 800 close_connection(con, false, true, false); 801 if (ret == 0) { 802 log_print("connection %p got EOF from %d", 803 con, con->nodeid); 804 /* handling for tcp shutdown */ 805 clear_bit(CF_SHUTDOWN, &con->flags); 806 wake_up(&con->shutdown_wait); 807 /* signal to breaking receive worker */ 808 ret = -1; 809 } 810 } 811 return ret; 812 } 813 814 /* Listening socket is busy, accept a connection */ 815 static int accept_from_sock(struct listen_connection *con) 816 { 817 int result; 818 struct sockaddr_storage peeraddr; 819 struct socket *newsock; 820 int len; 821 int nodeid; 822 struct connection *newcon; 823 struct connection *addcon; 824 unsigned int mark; 825 826 if (!dlm_allow_conn) { 827 return -1; 828 } 829 830 if (!con->sock) 831 return -ENOTCONN; 832 833 result = kernel_accept(con->sock, &newsock, O_NONBLOCK); 834 if (result < 0) 835 goto accept_err; 836 837 /* Get the connected socket's peer */ 838 memset(&peeraddr, 0, sizeof(peeraddr)); 839 len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2); 840 if (len < 0) { 841 result = -ECONNABORTED; 842 goto accept_err; 843 } 844 845 /* Get the new node's NODEID */ 846 make_sockaddr(&peeraddr, 0, &len); 847 if (addr_to_nodeid(&peeraddr, &nodeid)) { 848 unsigned char *b=(unsigned char *)&peeraddr; 849 log_print("connect from non cluster node"); 850 print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, 851 b, sizeof(struct 
sockaddr_storage)); 852 sock_release(newsock); 853 return -1; 854 } 855 856 dlm_comm_mark(nodeid, &mark); 857 sock_set_mark(newsock->sk, mark); 858 859 log_print("got connection from %d", nodeid); 860 861 /* Check to see if we already have a connection to this node. This 862 * could happen if the two nodes initiate a connection at roughly 863 * the same time and the connections cross on the wire. 864 * In this case we store the incoming one in "othercon" 865 */ 866 newcon = nodeid2con(nodeid, GFP_NOFS); 867 if (!newcon) { 868 result = -ENOMEM; 869 goto accept_err; 870 } 871 872 mutex_lock(&newcon->sock_mutex); 873 if (newcon->sock) { 874 struct connection *othercon = newcon->othercon; 875 876 if (!othercon) { 877 othercon = kzalloc(sizeof(*othercon), GFP_NOFS); 878 if (!othercon) { 879 log_print("failed to allocate incoming socket"); 880 mutex_unlock(&newcon->sock_mutex); 881 result = -ENOMEM; 882 goto accept_err; 883 } 884 885 result = dlm_con_init(othercon, nodeid); 886 if (result < 0) { 887 kfree(othercon); 888 goto accept_err; 889 } 890 891 newcon->othercon = othercon; 892 } else { 893 /* close other sock con if we have something new */ 894 close_connection(othercon, false, true, false); 895 } 896 897 mutex_lock_nested(&othercon->sock_mutex, 1); 898 add_sock(newsock, othercon); 899 addcon = othercon; 900 mutex_unlock(&othercon->sock_mutex); 901 } 902 else { 903 /* accept copies the sk after we've saved the callbacks, so we 904 don't want to save them a second time or comm errors will 905 result in calling sk_error_report recursively. 
*/ 906 add_sock(newsock, newcon); 907 addcon = newcon; 908 } 909 910 mutex_unlock(&newcon->sock_mutex); 911 912 /* 913 * Add it to the active queue in case we got data 914 * between processing the accept adding the socket 915 * to the read_sockets list 916 */ 917 if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags)) 918 queue_work(recv_workqueue, &addcon->rwork); 919 920 return 0; 921 922 accept_err: 923 if (newsock) 924 sock_release(newsock); 925 926 if (result != -EAGAIN) 927 log_print("error accepting connection from node: %d", result); 928 return result; 929 } 930 931 static void free_entry(struct writequeue_entry *e) 932 { 933 __free_page(e->page); 934 kfree(e); 935 } 936 937 /* 938 * writequeue_entry_complete - try to delete and free write queue entry 939 * @e: write queue entry to try to delete 940 * @completed: bytes completed 941 * 942 * writequeue_lock must be held. 943 */ 944 static void writequeue_entry_complete(struct writequeue_entry *e, int completed) 945 { 946 e->offset += completed; 947 e->len -= completed; 948 949 if (e->len == 0 && e->users == 0) { 950 list_del(&e->list); 951 free_entry(e); 952 } 953 } 954 955 /* 956 * sctp_bind_addrs - bind a SCTP socket to all our addresses 957 */ 958 static int sctp_bind_addrs(struct socket *sock, uint16_t port) 959 { 960 struct sockaddr_storage localaddr; 961 struct sockaddr *addr = (struct sockaddr *)&localaddr; 962 int i, addr_len, result = 0; 963 964 for (i = 0; i < dlm_local_count; i++) { 965 memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr)); 966 make_sockaddr(&localaddr, port, &addr_len); 967 968 if (!i) 969 result = kernel_bind(sock, addr, addr_len); 970 else 971 result = sock_bind_add(sock->sk, addr, addr_len); 972 973 if (result < 0) { 974 log_print("Can't bind to %d addr number %d, %d.\n", 975 port, i + 1, result); 976 break; 977 } 978 } 979 return result; 980 } 981 982 /* Initiate an SCTP association. 
   This is a special case of send_to_sock() in that we don't yet have a
   peeled-off socket for this association, so we use the listening socket
   and add the primary IP address of the remote node.
 */
static void sctp_connect_to_sock(struct connection *con)
{
	struct sockaddr_storage daddr;
	int result;
	int addr_len;
	struct socket *sock;
	unsigned int mark;

	dlm_comm_mark(con->nodeid, &mark);

	mutex_lock(&con->sock_mutex);

	/* Some odd races can cause double-connects, ignore them */
	if (con->retries++ > MAX_CONNECT_RETRIES)
		goto out;

	if (con->sock) {
		log_print("node %d already connected.", con->nodeid);
		goto out;
	}

	memset(&daddr, 0, sizeof(daddr));
	/* try_new_addr: rotate through the peer's addresses on each retry */
	result = nodeid_to_addr(con->nodeid, &daddr, NULL, true);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		goto out;
	}

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_SCTP, &sock);
	if (result < 0)
		goto socket_err;

	sock_set_mark(sock->sk, mark);

	add_sock(sock, con);

	/* Bind to all addresses. */
	if (sctp_bind_addrs(con->sock, 0))
		goto bind_err;

	make_sockaddr(&daddr, dlm_config.ci_tcp_port, &addr_len);

	log_print("connecting to %d", con->nodeid);

	/* Turn off Nagle's algorithm */
	sctp_sock_set_nodelay(sock->sk);

	/*
	 * Make sock->ops->connect() function return in specified time,
	 * since O_NONBLOCK argument in connect() function does not work here,
	 * then, we should restore the default value of this attribute.
	 */
	sock_set_sndtimeo(sock->sk, 5);
	result = sock->ops->connect(sock, (struct sockaddr *)&daddr, addr_len,
				    0);
	sock_set_sndtimeo(sock->sk, 0);

	if (result == -EINPROGRESS)
		result = 0;
	if (result == 0) {
		if (!test_and_set_bit(CF_CONNECTED, &con->flags))
			log_print("successful connected to node %d", con->nodeid);
		goto out;
	}

bind_err:
	con->sock = NULL;
	sock_release(sock);

socket_err:
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH &&
	    result != -ENETUNREACH &&
	    result != -ENETDOWN &&
	    result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		log_print("connect %d try %d error %d", con->nodeid,
			  con->retries, result);
		mutex_unlock(&con->sock_mutex);
		msleep(1000);
		lowcomms_connect_sock(con);
		return;
	}

out:
	mutex_unlock(&con->sock_mutex);
}

/* Connect a new socket to its peer */
static void tcp_connect_to_sock(struct connection *con)
{
	struct sockaddr_storage saddr, src_addr;
	int addr_len;
	struct socket *sock = NULL;
	unsigned int mark;
	int result;

	dlm_comm_mark(con->nodeid, &mark);

	mutex_lock(&con->sock_mutex);
	if (con->retries++ > MAX_CONNECT_RETRIES)
		goto out;

	/* Some odd races can cause double-connects, ignore them */
	if (con->sock)
		goto out;

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_TCP, &sock);
	if (result < 0)
		goto out_err;

	sock_set_mark(sock->sk, mark);

	memset(&saddr, 0, sizeof(saddr));
	result = nodeid_to_addr(con->nodeid, &saddr, NULL, false);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		goto out_err;
	}

	add_sock(sock, con);

	/* Bind to our cluster-known address connecting to avoid
	   routing problems */
	memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
	make_sockaddr(&src_addr, 0, &addr_len);
	result = sock->ops->bind(sock, (struct sockaddr *) &src_addr,
				 addr_len);
	if (result < 0) {
		log_print("could not bind for connect: %d", result);
		/* This *may* not indicate a critical error */
	}

	make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);

	log_print("connecting to %d", con->nodeid);

	/* Turn off Nagle's algorithm */
	tcp_sock_set_nodelay(sock->sk);

	result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
				    O_NONBLOCK);
	if (result == -EINPROGRESS)
		result = 0;
	if (result == 0)
		goto out;

out_err:
	/* con->sock is only set once add_sock() has run */
	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	} else if (sock) {
		sock_release(sock);
	}
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH &&
	    result != -ENETUNREACH &&
	    result != -ENETDOWN &&
	    result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		log_print("connect %d try %d error %d", con->nodeid,
			  con->retries, result);
		mutex_unlock(&con->sock_mutex);
		msleep(1000);
		lowcomms_connect_sock(con);
		return;
	}
out:
	mutex_unlock(&con->sock_mutex);
	return;
}

/* On error caller must run dlm_close_sock() for the
 * listen connection socket.
1171 */ 1172 static int tcp_create_listen_sock(struct listen_connection *con, 1173 struct sockaddr_storage *saddr) 1174 { 1175 struct socket *sock = NULL; 1176 int result = 0; 1177 int addr_len; 1178 1179 if (dlm_local_addr[0]->ss_family == AF_INET) 1180 addr_len = sizeof(struct sockaddr_in); 1181 else 1182 addr_len = sizeof(struct sockaddr_in6); 1183 1184 /* Create a socket to communicate with */ 1185 result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family, 1186 SOCK_STREAM, IPPROTO_TCP, &sock); 1187 if (result < 0) { 1188 log_print("Can't create listening comms socket"); 1189 goto create_out; 1190 } 1191 1192 sock_set_mark(sock->sk, dlm_config.ci_mark); 1193 1194 /* Turn off Nagle's algorithm */ 1195 tcp_sock_set_nodelay(sock->sk); 1196 1197 sock_set_reuseaddr(sock->sk); 1198 1199 add_listen_sock(sock, con); 1200 1201 /* Bind to our port */ 1202 make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len); 1203 result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len); 1204 if (result < 0) { 1205 log_print("Can't bind to port %d", dlm_config.ci_tcp_port); 1206 goto create_out; 1207 } 1208 sock_set_keepalive(sock->sk); 1209 1210 result = sock->ops->listen(sock, 5); 1211 if (result < 0) { 1212 log_print("Can't listen on port %d", dlm_config.ci_tcp_port); 1213 goto create_out; 1214 } 1215 1216 return 0; 1217 1218 create_out: 1219 return result; 1220 } 1221 1222 /* Get local addresses */ 1223 static void init_local(void) 1224 { 1225 struct sockaddr_storage sas, *addr; 1226 int i; 1227 1228 dlm_local_count = 0; 1229 for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) { 1230 if (dlm_our_addr(&sas, i)) 1231 break; 1232 1233 addr = kmemdup(&sas, sizeof(*addr), GFP_NOFS); 1234 if (!addr) 1235 break; 1236 dlm_local_addr[dlm_local_count++] = addr; 1237 } 1238 } 1239 1240 static void deinit_local(void) 1241 { 1242 int i; 1243 1244 for (i = 0; i < dlm_local_count; i++) 1245 kfree(dlm_local_addr[i]); 1246 } 1247 1248 /* Initialise SCTP socket and bind to all interfaces 
1249 * On error caller must run dlm_close_sock() for the 1250 * listen connection socket. 1251 */ 1252 static int sctp_listen_for_all(struct listen_connection *con) 1253 { 1254 struct socket *sock = NULL; 1255 int result = -EINVAL; 1256 1257 log_print("Using SCTP for communications"); 1258 1259 result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family, 1260 SOCK_STREAM, IPPROTO_SCTP, &sock); 1261 if (result < 0) { 1262 log_print("Can't create comms socket, check SCTP is loaded"); 1263 goto out; 1264 } 1265 1266 sock_set_rcvbuf(sock->sk, NEEDED_RMEM); 1267 sock_set_mark(sock->sk, dlm_config.ci_mark); 1268 sctp_sock_set_nodelay(sock->sk); 1269 1270 add_listen_sock(sock, con); 1271 1272 /* Bind to all addresses. */ 1273 result = sctp_bind_addrs(con->sock, dlm_config.ci_tcp_port); 1274 if (result < 0) 1275 goto out; 1276 1277 result = sock->ops->listen(sock, 5); 1278 if (result < 0) { 1279 log_print("Can't set socket listening"); 1280 goto out; 1281 } 1282 1283 return 0; 1284 1285 out: 1286 return result; 1287 } 1288 1289 static int tcp_listen_for_all(void) 1290 { 1291 /* We don't support multi-homed hosts */ 1292 if (dlm_local_count > 1) { 1293 log_print("TCP protocol can't handle multi-homed hosts, " 1294 "try SCTP"); 1295 return -EINVAL; 1296 } 1297 1298 log_print("Using TCP for communications"); 1299 1300 return tcp_create_listen_sock(&listen_con, dlm_local_addr[0]); 1301 } 1302 1303 1304 1305 static struct writequeue_entry *new_writequeue_entry(struct connection *con, 1306 gfp_t allocation) 1307 { 1308 struct writequeue_entry *entry; 1309 1310 entry = kmalloc(sizeof(struct writequeue_entry), allocation); 1311 if (!entry) 1312 return NULL; 1313 1314 entry->page = alloc_page(allocation); 1315 if (!entry->page) { 1316 kfree(entry); 1317 return NULL; 1318 } 1319 1320 entry->offset = 0; 1321 entry->len = 0; 1322 entry->end = 0; 1323 entry->users = 0; 1324 entry->con = con; 1325 1326 return entry; 1327 } 1328 1329 void *dlm_lowcomms_get_buffer(int nodeid, int 
len, gfp_t allocation, char **ppc) 1330 { 1331 struct connection *con; 1332 struct writequeue_entry *e; 1333 int offset = 0; 1334 1335 if (len > LOWCOMMS_MAX_TX_BUFFER_LEN) { 1336 BUILD_BUG_ON(PAGE_SIZE < LOWCOMMS_MAX_TX_BUFFER_LEN); 1337 log_print("failed to allocate a buffer of size %d", len); 1338 return NULL; 1339 } 1340 1341 con = nodeid2con(nodeid, allocation); 1342 if (!con) 1343 return NULL; 1344 1345 spin_lock(&con->writequeue_lock); 1346 e = list_entry(con->writequeue.prev, struct writequeue_entry, list); 1347 if ((&e->list == &con->writequeue) || 1348 (PAGE_SIZE - e->end < len)) { 1349 e = NULL; 1350 } else { 1351 offset = e->end; 1352 e->end += len; 1353 e->users++; 1354 } 1355 spin_unlock(&con->writequeue_lock); 1356 1357 if (e) { 1358 got_one: 1359 *ppc = page_address(e->page) + offset; 1360 return e; 1361 } 1362 1363 e = new_writequeue_entry(con, allocation); 1364 if (e) { 1365 spin_lock(&con->writequeue_lock); 1366 offset = e->end; 1367 e->end += len; 1368 e->users++; 1369 list_add_tail(&e->list, &con->writequeue); 1370 spin_unlock(&con->writequeue_lock); 1371 goto got_one; 1372 } 1373 return NULL; 1374 } 1375 1376 void dlm_lowcomms_commit_buffer(void *mh) 1377 { 1378 struct writequeue_entry *e = (struct writequeue_entry *)mh; 1379 struct connection *con = e->con; 1380 int users; 1381 1382 spin_lock(&con->writequeue_lock); 1383 users = --e->users; 1384 if (users) 1385 goto out; 1386 e->len = e->end - e->offset; 1387 spin_unlock(&con->writequeue_lock); 1388 1389 queue_work(send_workqueue, &con->swork); 1390 return; 1391 1392 out: 1393 spin_unlock(&con->writequeue_lock); 1394 return; 1395 } 1396 1397 /* Send a message */ 1398 static void send_to_sock(struct connection *con) 1399 { 1400 int ret = 0; 1401 const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; 1402 struct writequeue_entry *e; 1403 int len, offset; 1404 int count = 0; 1405 1406 mutex_lock(&con->sock_mutex); 1407 if (con->sock == NULL) 1408 goto out_connect; 1409 1410 
spin_lock(&con->writequeue_lock); 1411 for (;;) { 1412 e = list_entry(con->writequeue.next, struct writequeue_entry, 1413 list); 1414 if ((struct list_head *) e == &con->writequeue) 1415 break; 1416 1417 len = e->len; 1418 offset = e->offset; 1419 BUG_ON(len == 0 && e->users == 0); 1420 spin_unlock(&con->writequeue_lock); 1421 1422 ret = 0; 1423 if (len) { 1424 ret = kernel_sendpage(con->sock, e->page, offset, len, 1425 msg_flags); 1426 if (ret == -EAGAIN || ret == 0) { 1427 if (ret == -EAGAIN && 1428 test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) && 1429 !test_and_set_bit(CF_APP_LIMITED, &con->flags)) { 1430 /* Notify TCP that we're limited by the 1431 * application window size. 1432 */ 1433 set_bit(SOCK_NOSPACE, &con->sock->flags); 1434 con->sock->sk->sk_write_pending++; 1435 } 1436 cond_resched(); 1437 goto out; 1438 } else if (ret < 0) 1439 goto send_error; 1440 } 1441 1442 /* Don't starve people filling buffers */ 1443 if (++count >= MAX_SEND_MSG_COUNT) { 1444 cond_resched(); 1445 count = 0; 1446 } 1447 1448 spin_lock(&con->writequeue_lock); 1449 writequeue_entry_complete(e, ret); 1450 } 1451 spin_unlock(&con->writequeue_lock); 1452 out: 1453 mutex_unlock(&con->sock_mutex); 1454 return; 1455 1456 send_error: 1457 mutex_unlock(&con->sock_mutex); 1458 close_connection(con, false, false, true); 1459 /* Requeue the send work. When the work daemon runs again, it will try 1460 a new connection, then call this function again. 
*/ 1461 queue_work(send_workqueue, &con->swork); 1462 return; 1463 1464 out_connect: 1465 mutex_unlock(&con->sock_mutex); 1466 queue_work(send_workqueue, &con->swork); 1467 cond_resched(); 1468 } 1469 1470 static void clean_one_writequeue(struct connection *con) 1471 { 1472 struct writequeue_entry *e, *safe; 1473 1474 spin_lock(&con->writequeue_lock); 1475 list_for_each_entry_safe(e, safe, &con->writequeue, list) { 1476 list_del(&e->list); 1477 free_entry(e); 1478 } 1479 spin_unlock(&con->writequeue_lock); 1480 } 1481 1482 /* Called from recovery when it knows that a node has 1483 left the cluster */ 1484 int dlm_lowcomms_close(int nodeid) 1485 { 1486 struct connection *con; 1487 struct dlm_node_addr *na; 1488 1489 log_print("closing connection to node %d", nodeid); 1490 con = nodeid2con(nodeid, 0); 1491 if (con) { 1492 set_bit(CF_CLOSE, &con->flags); 1493 close_connection(con, true, true, true); 1494 clean_one_writequeue(con); 1495 if (con->othercon) 1496 clean_one_writequeue(con->othercon); 1497 } 1498 1499 spin_lock(&dlm_node_addrs_spin); 1500 na = find_node_addr(nodeid); 1501 if (na) { 1502 list_del(&na->list); 1503 while (na->addr_count--) 1504 kfree(na->addr[na->addr_count]); 1505 kfree(na); 1506 } 1507 spin_unlock(&dlm_node_addrs_spin); 1508 1509 return 0; 1510 } 1511 1512 /* Receive workqueue function */ 1513 static void process_recv_sockets(struct work_struct *work) 1514 { 1515 struct connection *con = container_of(work, struct connection, rwork); 1516 int err; 1517 1518 clear_bit(CF_READ_PENDING, &con->flags); 1519 do { 1520 err = receive_from_sock(con); 1521 } while (!err); 1522 } 1523 1524 static void process_listen_recv_socket(struct work_struct *work) 1525 { 1526 accept_from_sock(&listen_con); 1527 } 1528 1529 /* Send workqueue function */ 1530 static void process_send_sockets(struct work_struct *work) 1531 { 1532 struct connection *con = container_of(work, struct connection, swork); 1533 1534 clear_bit(CF_WRITE_PENDING, &con->flags); 1535 if 
(con->sock == NULL) /* not mutex protected so check it inside too */ 1536 con->connect_action(con); 1537 if (!list_empty(&con->writequeue)) 1538 send_to_sock(con); 1539 } 1540 1541 static void work_stop(void) 1542 { 1543 if (recv_workqueue) 1544 destroy_workqueue(recv_workqueue); 1545 if (send_workqueue) 1546 destroy_workqueue(send_workqueue); 1547 } 1548 1549 static int work_start(void) 1550 { 1551 recv_workqueue = alloc_workqueue("dlm_recv", 1552 WQ_UNBOUND | WQ_MEM_RECLAIM, 1); 1553 if (!recv_workqueue) { 1554 log_print("can't start dlm_recv"); 1555 return -ENOMEM; 1556 } 1557 1558 send_workqueue = alloc_workqueue("dlm_send", 1559 WQ_UNBOUND | WQ_MEM_RECLAIM, 1); 1560 if (!send_workqueue) { 1561 log_print("can't start dlm_send"); 1562 destroy_workqueue(recv_workqueue); 1563 return -ENOMEM; 1564 } 1565 1566 return 0; 1567 } 1568 1569 static void _stop_conn(struct connection *con, bool and_other) 1570 { 1571 mutex_lock(&con->sock_mutex); 1572 set_bit(CF_CLOSE, &con->flags); 1573 set_bit(CF_READ_PENDING, &con->flags); 1574 set_bit(CF_WRITE_PENDING, &con->flags); 1575 if (con->sock && con->sock->sk) { 1576 write_lock_bh(&con->sock->sk->sk_callback_lock); 1577 con->sock->sk->sk_user_data = NULL; 1578 write_unlock_bh(&con->sock->sk->sk_callback_lock); 1579 } 1580 if (con->othercon && and_other) 1581 _stop_conn(con->othercon, false); 1582 mutex_unlock(&con->sock_mutex); 1583 } 1584 1585 static void stop_conn(struct connection *con) 1586 { 1587 _stop_conn(con, true); 1588 } 1589 1590 static void shutdown_conn(struct connection *con) 1591 { 1592 if (con->shutdown_action) 1593 con->shutdown_action(con); 1594 } 1595 1596 static void connection_release(struct rcu_head *rcu) 1597 { 1598 struct connection *con = container_of(rcu, struct connection, rcu); 1599 1600 kfree(con->rx_buf); 1601 kfree(con); 1602 } 1603 1604 static void free_conn(struct connection *con) 1605 { 1606 close_connection(con, true, true, true); 1607 spin_lock(&connections_lock); 1608 
hlist_del_rcu(&con->list); 1609 spin_unlock(&connections_lock); 1610 if (con->othercon) { 1611 clean_one_writequeue(con->othercon); 1612 call_srcu(&connections_srcu, &con->othercon->rcu, 1613 connection_release); 1614 } 1615 clean_one_writequeue(con); 1616 call_srcu(&connections_srcu, &con->rcu, connection_release); 1617 } 1618 1619 static void work_flush(void) 1620 { 1621 int ok, idx; 1622 int i; 1623 struct connection *con; 1624 1625 do { 1626 ok = 1; 1627 foreach_conn(stop_conn); 1628 if (recv_workqueue) 1629 flush_workqueue(recv_workqueue); 1630 if (send_workqueue) 1631 flush_workqueue(send_workqueue); 1632 idx = srcu_read_lock(&connections_srcu); 1633 for (i = 0; i < CONN_HASH_SIZE && ok; i++) { 1634 hlist_for_each_entry_rcu(con, &connection_hash[i], 1635 list) { 1636 ok &= test_bit(CF_READ_PENDING, &con->flags); 1637 ok &= test_bit(CF_WRITE_PENDING, &con->flags); 1638 if (con->othercon) { 1639 ok &= test_bit(CF_READ_PENDING, 1640 &con->othercon->flags); 1641 ok &= test_bit(CF_WRITE_PENDING, 1642 &con->othercon->flags); 1643 } 1644 } 1645 } 1646 srcu_read_unlock(&connections_srcu, idx); 1647 } while (!ok); 1648 } 1649 1650 void dlm_lowcomms_stop(void) 1651 { 1652 /* Set all the flags to prevent any 1653 socket activity. 
1654 */ 1655 dlm_allow_conn = 0; 1656 1657 if (recv_workqueue) 1658 flush_workqueue(recv_workqueue); 1659 if (send_workqueue) 1660 flush_workqueue(send_workqueue); 1661 1662 dlm_close_sock(&listen_con.sock); 1663 1664 foreach_conn(shutdown_conn); 1665 work_flush(); 1666 foreach_conn(free_conn); 1667 work_stop(); 1668 deinit_local(); 1669 } 1670 1671 int dlm_lowcomms_start(void) 1672 { 1673 int error = -EINVAL; 1674 int i; 1675 1676 for (i = 0; i < CONN_HASH_SIZE; i++) 1677 INIT_HLIST_HEAD(&connection_hash[i]); 1678 1679 init_local(); 1680 if (!dlm_local_count) { 1681 error = -ENOTCONN; 1682 log_print("no local IP address has been set"); 1683 goto fail; 1684 } 1685 1686 INIT_WORK(&listen_con.rwork, process_listen_recv_socket); 1687 1688 error = work_start(); 1689 if (error) 1690 goto fail; 1691 1692 dlm_allow_conn = 1; 1693 1694 /* Start listening */ 1695 if (dlm_config.ci_protocol == 0) 1696 error = tcp_listen_for_all(); 1697 else 1698 error = sctp_listen_for_all(&listen_con); 1699 if (error) 1700 goto fail_unlisten; 1701 1702 return 0; 1703 1704 fail_unlisten: 1705 dlm_allow_conn = 0; 1706 dlm_close_sock(&listen_con.sock); 1707 fail: 1708 return error; 1709 } 1710 1711 void dlm_lowcomms_exit(void) 1712 { 1713 struct dlm_node_addr *na, *safe; 1714 1715 spin_lock(&dlm_node_addrs_spin); 1716 list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) { 1717 list_del(&na->list); 1718 while (na->addr_count--) 1719 kfree(na->addr[na->addr_count]); 1720 kfree(na); 1721 } 1722 spin_unlock(&dlm_node_addrs_spin); 1723 } 1724