1 // SPDX-License-Identifier: GPL-2.0-only 2 /****************************************************************************** 3 ******************************************************************************* 4 ** 5 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 6 ** Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved. 7 ** 8 ** 9 ******************************************************************************* 10 ******************************************************************************/ 11 12 /* 13 * lowcomms.c 14 * 15 * This is the "low-level" comms layer. 16 * 17 * It is responsible for sending/receiving messages 18 * from other nodes in the cluster. 19 * 20 * Cluster nodes are referred to by their nodeids. nodeids are 21 * simply 32 bit numbers to the locking module - if they need to 22 * be expanded for the cluster infrastructure then that is its 23 * responsibility. It is this layer's 24 * responsibility to resolve these into IP address or 25 * whatever it needs for inter-node communication. 26 * 27 * The comms level is two kernel threads that deal mainly with 28 * the receiving of messages from other nodes and passing them 29 * up to the mid-level comms layer (which understands the 30 * message format) for execution by the locking core, and 31 * a send thread which does all the setting up of connections 32 * to remote nodes and the sending of data. Threads are not allowed 33 * to send their own data because it may cause them to wait in times 34 * of high load. Also, this way, the sending thread can collect together 35 * messages bound for one node and send them in one block. 36 * 37 * lowcomms will choose to use either TCP or SCTP as its transport layer 38 * depending on the configuration variable 'protocol'. This should be set 39 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a 40 * cluster-wide mechanism as it must be the same on all nodes of the cluster 41 * for the DLM to function. 
42 * 43 */ 44 45 #include <asm/ioctls.h> 46 #include <net/sock.h> 47 #include <net/tcp.h> 48 #include <linux/pagemap.h> 49 #include <linux/file.h> 50 #include <linux/mutex.h> 51 #include <linux/sctp.h> 52 #include <linux/slab.h> 53 #include <net/sctp/sctp.h> 54 #include <net/ipv6.h> 55 56 #include "dlm_internal.h" 57 #include "lowcomms.h" 58 #include "midcomms.h" 59 #include "config.h" 60 61 #define NEEDED_RMEM (4*1024*1024) 62 #define CONN_HASH_SIZE 32 63 64 /* Number of messages to send before rescheduling */ 65 #define MAX_SEND_MSG_COUNT 25 66 #define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000) 67 68 struct connection { 69 struct socket *sock; /* NULL if not connected */ 70 uint32_t nodeid; /* So we know who we are in the list */ 71 struct mutex sock_mutex; 72 unsigned long flags; 73 #define CF_READ_PENDING 1 74 #define CF_WRITE_PENDING 2 75 #define CF_INIT_PENDING 4 76 #define CF_IS_OTHERCON 5 77 #define CF_CLOSE 6 78 #define CF_APP_LIMITED 7 79 #define CF_CLOSING 8 80 #define CF_SHUTDOWN 9 81 #define CF_CONNECTED 10 82 struct list_head writequeue; /* List of outgoing writequeue_entries */ 83 spinlock_t writequeue_lock; 84 void (*connect_action) (struct connection *); /* What to do to connect */ 85 void (*shutdown_action)(struct connection *con); /* What to do to shutdown */ 86 int retries; 87 #define MAX_CONNECT_RETRIES 3 88 struct hlist_node list; 89 struct connection *othercon; 90 struct work_struct rwork; /* Receive workqueue */ 91 struct work_struct swork; /* Send workqueue */ 92 wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */ 93 unsigned char *rx_buf; 94 int rx_buflen; 95 int rx_leftover; 96 struct rcu_head rcu; 97 }; 98 #define sock2con(x) ((struct connection *)(x)->sk_user_data) 99 100 struct listen_connection { 101 struct socket *sock; 102 struct work_struct rwork; 103 }; 104 105 /* An entry waiting to be sent */ 106 struct writequeue_entry { 107 struct list_head list; 108 struct page *page; 109 int offset; 110 int len; 111 int 
end; 112 int users; 113 struct connection *con; 114 }; 115 116 struct dlm_node_addr { 117 struct list_head list; 118 int nodeid; 119 int addr_count; 120 int curr_addr_index; 121 struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT]; 122 }; 123 124 static struct listen_sock_callbacks { 125 void (*sk_error_report)(struct sock *); 126 void (*sk_data_ready)(struct sock *); 127 void (*sk_state_change)(struct sock *); 128 void (*sk_write_space)(struct sock *); 129 } listen_sock; 130 131 static LIST_HEAD(dlm_node_addrs); 132 static DEFINE_SPINLOCK(dlm_node_addrs_spin); 133 134 static struct listen_connection listen_con; 135 static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT]; 136 static int dlm_local_count; 137 static int dlm_allow_conn; 138 139 /* Work queues */ 140 static struct workqueue_struct *recv_workqueue; 141 static struct workqueue_struct *send_workqueue; 142 143 static struct hlist_head connection_hash[CONN_HASH_SIZE]; 144 static DEFINE_SPINLOCK(connections_lock); 145 DEFINE_STATIC_SRCU(connections_srcu); 146 147 static void process_recv_sockets(struct work_struct *work); 148 static void process_send_sockets(struct work_struct *work); 149 150 static void sctp_connect_to_sock(struct connection *con); 151 static void tcp_connect_to_sock(struct connection *con); 152 static void dlm_tcp_shutdown(struct connection *con); 153 154 /* This is deliberately very simple because most clusters have simple 155 sequential nodeids, so we should be able to go straight to a connection 156 struct in the array */ 157 static inline int nodeid_hash(int nodeid) 158 { 159 return nodeid & (CONN_HASH_SIZE-1); 160 } 161 162 static struct connection *__find_con(int nodeid) 163 { 164 int r, idx; 165 struct connection *con; 166 167 r = nodeid_hash(nodeid); 168 169 idx = srcu_read_lock(&connections_srcu); 170 hlist_for_each_entry_rcu(con, &connection_hash[r], list) { 171 if (con->nodeid == nodeid) { 172 srcu_read_unlock(&connections_srcu, idx); 173 return con; 174 } 175 } 176 
srcu_read_unlock(&connections_srcu, idx); 177 178 return NULL; 179 } 180 181 static int dlm_con_init(struct connection *con, int nodeid) 182 { 183 con->rx_buflen = dlm_config.ci_buffer_size; 184 con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS); 185 if (!con->rx_buf) 186 return -ENOMEM; 187 188 con->nodeid = nodeid; 189 mutex_init(&con->sock_mutex); 190 INIT_LIST_HEAD(&con->writequeue); 191 spin_lock_init(&con->writequeue_lock); 192 INIT_WORK(&con->swork, process_send_sockets); 193 INIT_WORK(&con->rwork, process_recv_sockets); 194 init_waitqueue_head(&con->shutdown_wait); 195 196 if (dlm_config.ci_protocol == 0) { 197 con->connect_action = tcp_connect_to_sock; 198 con->shutdown_action = dlm_tcp_shutdown; 199 } else { 200 con->connect_action = sctp_connect_to_sock; 201 } 202 203 return 0; 204 } 205 206 /* 207 * If 'allocation' is zero then we don't attempt to create a new 208 * connection structure for this node. 209 */ 210 static struct connection *nodeid2con(int nodeid, gfp_t alloc) 211 { 212 struct connection *con, *tmp; 213 int r, ret; 214 215 con = __find_con(nodeid); 216 if (con || !alloc) 217 return con; 218 219 con = kzalloc(sizeof(*con), alloc); 220 if (!con) 221 return NULL; 222 223 ret = dlm_con_init(con, nodeid); 224 if (ret) { 225 kfree(con); 226 return NULL; 227 } 228 229 r = nodeid_hash(nodeid); 230 231 spin_lock(&connections_lock); 232 /* Because multiple workqueues/threads calls this function it can 233 * race on multiple cpu's. Instead of locking hot path __find_con() 234 * we just check in rare cases of recently added nodes again 235 * under protection of connections_lock. If this is the case we 236 * abort our connection creation and return the existing connection. 
237 */ 238 tmp = __find_con(nodeid); 239 if (tmp) { 240 spin_unlock(&connections_lock); 241 kfree(con->rx_buf); 242 kfree(con); 243 return tmp; 244 } 245 246 hlist_add_head_rcu(&con->list, &connection_hash[r]); 247 spin_unlock(&connections_lock); 248 249 return con; 250 } 251 252 /* Loop round all connections */ 253 static void foreach_conn(void (*conn_func)(struct connection *c)) 254 { 255 int i, idx; 256 struct connection *con; 257 258 idx = srcu_read_lock(&connections_srcu); 259 for (i = 0; i < CONN_HASH_SIZE; i++) { 260 hlist_for_each_entry_rcu(con, &connection_hash[i], list) 261 conn_func(con); 262 } 263 srcu_read_unlock(&connections_srcu, idx); 264 } 265 266 static struct dlm_node_addr *find_node_addr(int nodeid) 267 { 268 struct dlm_node_addr *na; 269 270 list_for_each_entry(na, &dlm_node_addrs, list) { 271 if (na->nodeid == nodeid) 272 return na; 273 } 274 return NULL; 275 } 276 277 static int addr_compare(const struct sockaddr_storage *x, 278 const struct sockaddr_storage *y) 279 { 280 switch (x->ss_family) { 281 case AF_INET: { 282 struct sockaddr_in *sinx = (struct sockaddr_in *)x; 283 struct sockaddr_in *siny = (struct sockaddr_in *)y; 284 if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr) 285 return 0; 286 if (sinx->sin_port != siny->sin_port) 287 return 0; 288 break; 289 } 290 case AF_INET6: { 291 struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x; 292 struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y; 293 if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr)) 294 return 0; 295 if (sinx->sin6_port != siny->sin6_port) 296 return 0; 297 break; 298 } 299 default: 300 return 0; 301 } 302 return 1; 303 } 304 305 static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out, 306 struct sockaddr *sa_out, bool try_new_addr) 307 { 308 struct sockaddr_storage sas; 309 struct dlm_node_addr *na; 310 311 if (!dlm_local_count) 312 return -1; 313 314 spin_lock(&dlm_node_addrs_spin); 315 na = find_node_addr(nodeid); 316 if (na && na->addr_count) { 
317 memcpy(&sas, na->addr[na->curr_addr_index], 318 sizeof(struct sockaddr_storage)); 319 320 if (try_new_addr) { 321 na->curr_addr_index++; 322 if (na->curr_addr_index == na->addr_count) 323 na->curr_addr_index = 0; 324 } 325 } 326 spin_unlock(&dlm_node_addrs_spin); 327 328 if (!na) 329 return -EEXIST; 330 331 if (!na->addr_count) 332 return -ENOENT; 333 334 if (sas_out) 335 memcpy(sas_out, &sas, sizeof(struct sockaddr_storage)); 336 337 if (!sa_out) 338 return 0; 339 340 if (dlm_local_addr[0]->ss_family == AF_INET) { 341 struct sockaddr_in *in4 = (struct sockaddr_in *) &sas; 342 struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out; 343 ret4->sin_addr.s_addr = in4->sin_addr.s_addr; 344 } else { 345 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &sas; 346 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out; 347 ret6->sin6_addr = in6->sin6_addr; 348 } 349 350 return 0; 351 } 352 353 static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid) 354 { 355 struct dlm_node_addr *na; 356 int rv = -EEXIST; 357 int addr_i; 358 359 spin_lock(&dlm_node_addrs_spin); 360 list_for_each_entry(na, &dlm_node_addrs, list) { 361 if (!na->addr_count) 362 continue; 363 364 for (addr_i = 0; addr_i < na->addr_count; addr_i++) { 365 if (addr_compare(na->addr[addr_i], addr)) { 366 *nodeid = na->nodeid; 367 rv = 0; 368 goto unlock; 369 } 370 } 371 } 372 unlock: 373 spin_unlock(&dlm_node_addrs_spin); 374 return rv; 375 } 376 377 /* caller need to held dlm_node_addrs_spin lock */ 378 static bool dlm_lowcomms_na_has_addr(const struct dlm_node_addr *na, 379 const struct sockaddr_storage *addr) 380 { 381 int i; 382 383 for (i = 0; i < na->addr_count; i++) { 384 if (addr_compare(na->addr[i], addr)) 385 return true; 386 } 387 388 return false; 389 } 390 391 int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len) 392 { 393 struct sockaddr_storage *new_addr; 394 struct dlm_node_addr *new_node, *na; 395 bool ret; 396 397 new_node = kzalloc(sizeof(struct 
dlm_node_addr), GFP_NOFS); 398 if (!new_node) 399 return -ENOMEM; 400 401 new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS); 402 if (!new_addr) { 403 kfree(new_node); 404 return -ENOMEM; 405 } 406 407 memcpy(new_addr, addr, len); 408 409 spin_lock(&dlm_node_addrs_spin); 410 na = find_node_addr(nodeid); 411 if (!na) { 412 new_node->nodeid = nodeid; 413 new_node->addr[0] = new_addr; 414 new_node->addr_count = 1; 415 list_add(&new_node->list, &dlm_node_addrs); 416 spin_unlock(&dlm_node_addrs_spin); 417 return 0; 418 } 419 420 ret = dlm_lowcomms_na_has_addr(na, addr); 421 if (ret) { 422 spin_unlock(&dlm_node_addrs_spin); 423 kfree(new_addr); 424 kfree(new_node); 425 return -EEXIST; 426 } 427 428 if (na->addr_count >= DLM_MAX_ADDR_COUNT) { 429 spin_unlock(&dlm_node_addrs_spin); 430 kfree(new_addr); 431 kfree(new_node); 432 return -ENOSPC; 433 } 434 435 na->addr[na->addr_count++] = new_addr; 436 spin_unlock(&dlm_node_addrs_spin); 437 kfree(new_node); 438 return 0; 439 } 440 441 /* Data available on socket or listen socket received a connect */ 442 static void lowcomms_data_ready(struct sock *sk) 443 { 444 struct connection *con; 445 446 read_lock_bh(&sk->sk_callback_lock); 447 con = sock2con(sk); 448 if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags)) 449 queue_work(recv_workqueue, &con->rwork); 450 read_unlock_bh(&sk->sk_callback_lock); 451 } 452 453 static void lowcomms_listen_data_ready(struct sock *sk) 454 { 455 queue_work(recv_workqueue, &listen_con.rwork); 456 } 457 458 static void lowcomms_write_space(struct sock *sk) 459 { 460 struct connection *con; 461 462 read_lock_bh(&sk->sk_callback_lock); 463 con = sock2con(sk); 464 if (!con) 465 goto out; 466 467 if (!test_and_set_bit(CF_CONNECTED, &con->flags)) { 468 log_print("successful connected to node %d", con->nodeid); 469 queue_work(send_workqueue, &con->swork); 470 goto out; 471 } 472 473 clear_bit(SOCK_NOSPACE, &con->sock->flags); 474 475 if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) { 
476 con->sock->sk->sk_write_pending--; 477 clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags); 478 } 479 480 queue_work(send_workqueue, &con->swork); 481 out: 482 read_unlock_bh(&sk->sk_callback_lock); 483 } 484 485 static inline void lowcomms_connect_sock(struct connection *con) 486 { 487 if (test_bit(CF_CLOSE, &con->flags)) 488 return; 489 queue_work(send_workqueue, &con->swork); 490 cond_resched(); 491 } 492 493 static void lowcomms_state_change(struct sock *sk) 494 { 495 /* SCTP layer is not calling sk_data_ready when the connection 496 * is done, so we catch the signal through here. Also, it 497 * doesn't switch socket state when entering shutdown, so we 498 * skip the write in that case. 499 */ 500 if (sk->sk_shutdown) { 501 if (sk->sk_shutdown == RCV_SHUTDOWN) 502 lowcomms_data_ready(sk); 503 } else if (sk->sk_state == TCP_ESTABLISHED) { 504 lowcomms_write_space(sk); 505 } 506 } 507 508 int dlm_lowcomms_connect_node(int nodeid) 509 { 510 struct connection *con; 511 512 if (nodeid == dlm_our_nodeid()) 513 return 0; 514 515 con = nodeid2con(nodeid, GFP_NOFS); 516 if (!con) 517 return -ENOMEM; 518 lowcomms_connect_sock(con); 519 return 0; 520 } 521 522 static void lowcomms_error_report(struct sock *sk) 523 { 524 struct connection *con; 525 struct sockaddr_storage saddr; 526 void (*orig_report)(struct sock *) = NULL; 527 528 read_lock_bh(&sk->sk_callback_lock); 529 con = sock2con(sk); 530 if (con == NULL) 531 goto out; 532 533 orig_report = listen_sock.sk_error_report; 534 if (con->sock == NULL || 535 kernel_getpeername(con->sock, (struct sockaddr *)&saddr) < 0) { 536 printk_ratelimited(KERN_ERR "dlm: node %d: socket error " 537 "sending to node %d, port %d, " 538 "sk_err=%d/%d\n", dlm_our_nodeid(), 539 con->nodeid, dlm_config.ci_tcp_port, 540 sk->sk_err, sk->sk_err_soft); 541 } else if (saddr.ss_family == AF_INET) { 542 struct sockaddr_in *sin4 = (struct sockaddr_in *)&saddr; 543 544 printk_ratelimited(KERN_ERR "dlm: node %d: socket error " 545 "sending to node 
%d at %pI4, port %d, " 546 "sk_err=%d/%d\n", dlm_our_nodeid(), 547 con->nodeid, &sin4->sin_addr.s_addr, 548 dlm_config.ci_tcp_port, sk->sk_err, 549 sk->sk_err_soft); 550 } else { 551 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&saddr; 552 553 printk_ratelimited(KERN_ERR "dlm: node %d: socket error " 554 "sending to node %d at %u.%u.%u.%u, " 555 "port %d, sk_err=%d/%d\n", dlm_our_nodeid(), 556 con->nodeid, sin6->sin6_addr.s6_addr32[0], 557 sin6->sin6_addr.s6_addr32[1], 558 sin6->sin6_addr.s6_addr32[2], 559 sin6->sin6_addr.s6_addr32[3], 560 dlm_config.ci_tcp_port, sk->sk_err, 561 sk->sk_err_soft); 562 } 563 out: 564 read_unlock_bh(&sk->sk_callback_lock); 565 if (orig_report) 566 orig_report(sk); 567 } 568 569 /* Note: sk_callback_lock must be locked before calling this function. */ 570 static void save_listen_callbacks(struct socket *sock) 571 { 572 struct sock *sk = sock->sk; 573 574 listen_sock.sk_data_ready = sk->sk_data_ready; 575 listen_sock.sk_state_change = sk->sk_state_change; 576 listen_sock.sk_write_space = sk->sk_write_space; 577 listen_sock.sk_error_report = sk->sk_error_report; 578 } 579 580 static void restore_callbacks(struct socket *sock) 581 { 582 struct sock *sk = sock->sk; 583 584 write_lock_bh(&sk->sk_callback_lock); 585 sk->sk_user_data = NULL; 586 sk->sk_data_ready = listen_sock.sk_data_ready; 587 sk->sk_state_change = listen_sock.sk_state_change; 588 sk->sk_write_space = listen_sock.sk_write_space; 589 sk->sk_error_report = listen_sock.sk_error_report; 590 write_unlock_bh(&sk->sk_callback_lock); 591 } 592 593 static void add_listen_sock(struct socket *sock, struct listen_connection *con) 594 { 595 struct sock *sk = sock->sk; 596 597 write_lock_bh(&sk->sk_callback_lock); 598 save_listen_callbacks(sock); 599 con->sock = sock; 600 601 sk->sk_user_data = con; 602 sk->sk_allocation = GFP_NOFS; 603 /* Install a data_ready callback */ 604 sk->sk_data_ready = lowcomms_listen_data_ready; 605 write_unlock_bh(&sk->sk_callback_lock); 606 } 607 608 
/* Make a socket active */ 609 static void add_sock(struct socket *sock, struct connection *con) 610 { 611 struct sock *sk = sock->sk; 612 613 write_lock_bh(&sk->sk_callback_lock); 614 con->sock = sock; 615 616 sk->sk_user_data = con; 617 /* Install a data_ready callback */ 618 sk->sk_data_ready = lowcomms_data_ready; 619 sk->sk_write_space = lowcomms_write_space; 620 sk->sk_state_change = lowcomms_state_change; 621 sk->sk_allocation = GFP_NOFS; 622 sk->sk_error_report = lowcomms_error_report; 623 write_unlock_bh(&sk->sk_callback_lock); 624 } 625 626 /* Add the port number to an IPv6 or 4 sockaddr and return the address 627 length */ 628 static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port, 629 int *addr_len) 630 { 631 saddr->ss_family = dlm_local_addr[0]->ss_family; 632 if (saddr->ss_family == AF_INET) { 633 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr; 634 in4_addr->sin_port = cpu_to_be16(port); 635 *addr_len = sizeof(struct sockaddr_in); 636 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero)); 637 } else { 638 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr; 639 in6_addr->sin6_port = cpu_to_be16(port); 640 *addr_len = sizeof(struct sockaddr_in6); 641 } 642 memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len); 643 } 644 645 static void dlm_close_sock(struct socket **sock) 646 { 647 if (*sock) { 648 restore_callbacks(*sock); 649 sock_release(*sock); 650 *sock = NULL; 651 } 652 } 653 654 /* Close a remote connection and tidy up */ 655 static void close_connection(struct connection *con, bool and_other, 656 bool tx, bool rx) 657 { 658 bool closing = test_and_set_bit(CF_CLOSING, &con->flags); 659 660 if (tx && !closing && cancel_work_sync(&con->swork)) { 661 log_print("canceled swork for node %d", con->nodeid); 662 clear_bit(CF_WRITE_PENDING, &con->flags); 663 } 664 if (rx && !closing && cancel_work_sync(&con->rwork)) { 665 log_print("canceled rwork for node %d", con->nodeid); 666 
clear_bit(CF_READ_PENDING, &con->flags); 667 } 668 669 mutex_lock(&con->sock_mutex); 670 dlm_close_sock(&con->sock); 671 672 if (con->othercon && and_other) { 673 /* Will only re-enter once. */ 674 close_connection(con->othercon, false, true, true); 675 } 676 677 con->rx_leftover = 0; 678 con->retries = 0; 679 clear_bit(CF_CONNECTED, &con->flags); 680 mutex_unlock(&con->sock_mutex); 681 clear_bit(CF_CLOSING, &con->flags); 682 } 683 684 static void shutdown_connection(struct connection *con) 685 { 686 int ret; 687 688 if (cancel_work_sync(&con->swork)) { 689 log_print("canceled swork for node %d", con->nodeid); 690 clear_bit(CF_WRITE_PENDING, &con->flags); 691 } 692 693 mutex_lock(&con->sock_mutex); 694 /* nothing to shutdown */ 695 if (!con->sock) { 696 mutex_unlock(&con->sock_mutex); 697 return; 698 } 699 700 set_bit(CF_SHUTDOWN, &con->flags); 701 ret = kernel_sock_shutdown(con->sock, SHUT_WR); 702 mutex_unlock(&con->sock_mutex); 703 if (ret) { 704 log_print("Connection %p failed to shutdown: %d will force close", 705 con, ret); 706 goto force_close; 707 } else { 708 ret = wait_event_timeout(con->shutdown_wait, 709 !test_bit(CF_SHUTDOWN, &con->flags), 710 DLM_SHUTDOWN_WAIT_TIMEOUT); 711 if (ret == 0) { 712 log_print("Connection %p shutdown timed out, will force close", 713 con); 714 goto force_close; 715 } 716 } 717 718 return; 719 720 force_close: 721 clear_bit(CF_SHUTDOWN, &con->flags); 722 close_connection(con, false, true, true); 723 } 724 725 static void dlm_tcp_shutdown(struct connection *con) 726 { 727 if (con->othercon) 728 shutdown_connection(con->othercon); 729 shutdown_connection(con); 730 } 731 732 static int con_realloc_receive_buf(struct connection *con, int newlen) 733 { 734 unsigned char *newbuf; 735 736 newbuf = kmalloc(newlen, GFP_NOFS); 737 if (!newbuf) 738 return -ENOMEM; 739 740 /* copy any leftover from last receive */ 741 if (con->rx_leftover) 742 memmove(newbuf, con->rx_buf, con->rx_leftover); 743 744 /* swap to new buffer space */ 745 
kfree(con->rx_buf); 746 con->rx_buflen = newlen; 747 con->rx_buf = newbuf; 748 749 return 0; 750 } 751 752 /* Data received from remote end */ 753 static int receive_from_sock(struct connection *con) 754 { 755 int call_again_soon = 0; 756 struct msghdr msg; 757 struct kvec iov; 758 int ret, buflen; 759 760 mutex_lock(&con->sock_mutex); 761 762 if (con->sock == NULL) { 763 ret = -EAGAIN; 764 goto out_close; 765 } 766 767 /* realloc if we get new buffer size to read out */ 768 buflen = dlm_config.ci_buffer_size; 769 if (con->rx_buflen != buflen && con->rx_leftover <= buflen) { 770 ret = con_realloc_receive_buf(con, buflen); 771 if (ret < 0) 772 goto out_resched; 773 } 774 775 /* calculate new buffer parameter regarding last receive and 776 * possible leftover bytes 777 */ 778 iov.iov_base = con->rx_buf + con->rx_leftover; 779 iov.iov_len = con->rx_buflen - con->rx_leftover; 780 781 memset(&msg, 0, sizeof(msg)); 782 msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; 783 ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, 784 msg.msg_flags); 785 if (ret <= 0) 786 goto out_close; 787 else if (ret == iov.iov_len) 788 call_again_soon = 1; 789 790 /* new buflen according readed bytes and leftover from last receive */ 791 buflen = ret + con->rx_leftover; 792 ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen); 793 if (ret < 0) 794 goto out_close; 795 796 /* calculate leftover bytes from process and put it into begin of 797 * the receive buffer, so next receive we have the full message 798 * at the start address of the receive buffer. 
799 */ 800 con->rx_leftover = buflen - ret; 801 if (con->rx_leftover) { 802 memmove(con->rx_buf, con->rx_buf + ret, 803 con->rx_leftover); 804 call_again_soon = true; 805 } 806 807 if (call_again_soon) 808 goto out_resched; 809 810 mutex_unlock(&con->sock_mutex); 811 return 0; 812 813 out_resched: 814 if (!test_and_set_bit(CF_READ_PENDING, &con->flags)) 815 queue_work(recv_workqueue, &con->rwork); 816 mutex_unlock(&con->sock_mutex); 817 return -EAGAIN; 818 819 out_close: 820 mutex_unlock(&con->sock_mutex); 821 if (ret != -EAGAIN) { 822 /* Reconnect when there is something to send */ 823 close_connection(con, false, true, false); 824 if (ret == 0) { 825 log_print("connection %p got EOF from %d", 826 con, con->nodeid); 827 /* handling for tcp shutdown */ 828 clear_bit(CF_SHUTDOWN, &con->flags); 829 wake_up(&con->shutdown_wait); 830 /* signal to breaking receive worker */ 831 ret = -1; 832 } 833 } 834 return ret; 835 } 836 837 /* Listening socket is busy, accept a connection */ 838 static int accept_from_sock(struct listen_connection *con) 839 { 840 int result; 841 struct sockaddr_storage peeraddr; 842 struct socket *newsock; 843 int len; 844 int nodeid; 845 struct connection *newcon; 846 struct connection *addcon; 847 unsigned int mark; 848 849 if (!dlm_allow_conn) { 850 return -1; 851 } 852 853 if (!con->sock) 854 return -ENOTCONN; 855 856 result = kernel_accept(con->sock, &newsock, O_NONBLOCK); 857 if (result < 0) 858 goto accept_err; 859 860 /* Get the connected socket's peer */ 861 memset(&peeraddr, 0, sizeof(peeraddr)); 862 len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2); 863 if (len < 0) { 864 result = -ECONNABORTED; 865 goto accept_err; 866 } 867 868 /* Get the new node's NODEID */ 869 make_sockaddr(&peeraddr, 0, &len); 870 if (addr_to_nodeid(&peeraddr, &nodeid)) { 871 unsigned char *b=(unsigned char *)&peeraddr; 872 log_print("connect from non cluster node"); 873 print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, 874 b, sizeof(struct 
sockaddr_storage)); 875 sock_release(newsock); 876 return -1; 877 } 878 879 dlm_comm_mark(nodeid, &mark); 880 sock_set_mark(newsock->sk, mark); 881 882 log_print("got connection from %d", nodeid); 883 884 /* Check to see if we already have a connection to this node. This 885 * could happen if the two nodes initiate a connection at roughly 886 * the same time and the connections cross on the wire. 887 * In this case we store the incoming one in "othercon" 888 */ 889 newcon = nodeid2con(nodeid, GFP_NOFS); 890 if (!newcon) { 891 result = -ENOMEM; 892 goto accept_err; 893 } 894 895 mutex_lock(&newcon->sock_mutex); 896 if (newcon->sock) { 897 struct connection *othercon = newcon->othercon; 898 899 if (!othercon) { 900 othercon = kzalloc(sizeof(*othercon), GFP_NOFS); 901 if (!othercon) { 902 log_print("failed to allocate incoming socket"); 903 mutex_unlock(&newcon->sock_mutex); 904 result = -ENOMEM; 905 goto accept_err; 906 } 907 908 result = dlm_con_init(othercon, nodeid); 909 if (result < 0) { 910 kfree(othercon); 911 goto accept_err; 912 } 913 914 newcon->othercon = othercon; 915 } else { 916 /* close other sock con if we have something new */ 917 close_connection(othercon, false, true, false); 918 } 919 920 mutex_lock_nested(&othercon->sock_mutex, 1); 921 add_sock(newsock, othercon); 922 addcon = othercon; 923 mutex_unlock(&othercon->sock_mutex); 924 } 925 else { 926 /* accept copies the sk after we've saved the callbacks, so we 927 don't want to save them a second time or comm errors will 928 result in calling sk_error_report recursively. 
*/ 929 add_sock(newsock, newcon); 930 addcon = newcon; 931 } 932 933 mutex_unlock(&newcon->sock_mutex); 934 935 /* 936 * Add it to the active queue in case we got data 937 * between processing the accept adding the socket 938 * to the read_sockets list 939 */ 940 if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags)) 941 queue_work(recv_workqueue, &addcon->rwork); 942 943 return 0; 944 945 accept_err: 946 if (newsock) 947 sock_release(newsock); 948 949 if (result != -EAGAIN) 950 log_print("error accepting connection from node: %d", result); 951 return result; 952 } 953 954 static void free_entry(struct writequeue_entry *e) 955 { 956 __free_page(e->page); 957 kfree(e); 958 } 959 960 /* 961 * writequeue_entry_complete - try to delete and free write queue entry 962 * @e: write queue entry to try to delete 963 * @completed: bytes completed 964 * 965 * writequeue_lock must be held. 966 */ 967 static void writequeue_entry_complete(struct writequeue_entry *e, int completed) 968 { 969 e->offset += completed; 970 e->len -= completed; 971 972 if (e->len == 0 && e->users == 0) { 973 list_del(&e->list); 974 free_entry(e); 975 } 976 } 977 978 /* 979 * sctp_bind_addrs - bind a SCTP socket to all our addresses 980 */ 981 static int sctp_bind_addrs(struct socket *sock, uint16_t port) 982 { 983 struct sockaddr_storage localaddr; 984 struct sockaddr *addr = (struct sockaddr *)&localaddr; 985 int i, addr_len, result = 0; 986 987 for (i = 0; i < dlm_local_count; i++) { 988 memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr)); 989 make_sockaddr(&localaddr, port, &addr_len); 990 991 if (!i) 992 result = kernel_bind(sock, addr, addr_len); 993 else 994 result = sock_bind_add(sock->sk, addr, addr_len); 995 996 if (result < 0) { 997 log_print("Can't bind to %d addr number %d, %d.\n", 998 port, i + 1, result); 999 break; 1000 } 1001 } 1002 return result; 1003 } 1004 1005 /* Initiate an SCTP association. 
1006 This is a special case of send_to_sock() in that we don't yet have a 1007 peeled-off socket for this association, so we use the listening socket 1008 and add the primary IP address of the remote node. 1009 */ 1010 static void sctp_connect_to_sock(struct connection *con) 1011 { 1012 struct sockaddr_storage daddr; 1013 int result; 1014 int addr_len; 1015 struct socket *sock; 1016 unsigned int mark; 1017 1018 dlm_comm_mark(con->nodeid, &mark); 1019 1020 mutex_lock(&con->sock_mutex); 1021 1022 /* Some odd races can cause double-connects, ignore them */ 1023 if (con->retries++ > MAX_CONNECT_RETRIES) 1024 goto out; 1025 1026 if (con->sock) { 1027 log_print("node %d already connected.", con->nodeid); 1028 goto out; 1029 } 1030 1031 memset(&daddr, 0, sizeof(daddr)); 1032 result = nodeid_to_addr(con->nodeid, &daddr, NULL, true); 1033 if (result < 0) { 1034 log_print("no address for nodeid %d", con->nodeid); 1035 goto out; 1036 } 1037 1038 /* Create a socket to communicate with */ 1039 result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family, 1040 SOCK_STREAM, IPPROTO_SCTP, &sock); 1041 if (result < 0) 1042 goto socket_err; 1043 1044 sock_set_mark(sock->sk, mark); 1045 1046 add_sock(sock, con); 1047 1048 /* Bind to all addresses. */ 1049 if (sctp_bind_addrs(con->sock, 0)) 1050 goto bind_err; 1051 1052 make_sockaddr(&daddr, dlm_config.ci_tcp_port, &addr_len); 1053 1054 log_print("connecting to %d", con->nodeid); 1055 1056 /* Turn off Nagle's algorithm */ 1057 sctp_sock_set_nodelay(sock->sk); 1058 1059 /* 1060 * Make sock->ops->connect() function return in specified time, 1061 * since O_NONBLOCK argument in connect() function does not work here, 1062 * then, we should restore the default value of this attribute. 
*/
	sock_set_sndtimeo(sock->sk, 5);
	result = sock->ops->connect(sock, (struct sockaddr *)&daddr, addr_len,
				   0);
	sock_set_sndtimeo(sock->sk, 0);

	if (result == -EINPROGRESS)
		result = 0;
	if (result == 0) {
		if (!test_and_set_bit(CF_CONNECTED, &con->flags))
			log_print("successful connected to node %d", con->nodeid);
		goto out;
	}

bind_err:
	con->sock = NULL;
	sock_release(sock);

socket_err:
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH &&
	    result != -ENETUNREACH &&
	    result != -ENETDOWN &&
	    result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		log_print("connect %d try %d error %d", con->nodeid,
			  con->retries, result);
		mutex_unlock(&con->sock_mutex);
		msleep(1000);
		lowcomms_connect_sock(con);
		return;
	}

out:
	mutex_unlock(&con->sock_mutex);
}

/*
 * Connect a new socket to its peer.
 *
 * Runs with con->sock_mutex held. It does nothing once con->retries has
 * been incremented past MAX_CONNECT_RETRIES, and it does nothing if @con
 * already owns a socket.
 *
 * Otherwise it:
 *  - creates a TCP socket in init_net with the address family of
 *    dlm_local_addr[0];
 *  - applies the mark reported by dlm_comm_mark();
 *  - attaches the socket to @con with add_sock();
 *  - binds it to our first local address with port 0;
 *  - starts a non-blocking connect to the peer's dlm_config.ci_tcp_port.
 *
 * -EINPROGRESS from connect counts as success.
 *
 * On failure the socket is released. Unless the error is one of the fatal
 * ones checked after out_err, the mutex is dropped, we sleep for one second
 * and lowcomms_connect_sock() is called to try again.
 */
static void tcp_connect_to_sock(struct connection *con)
{
	struct sockaddr_storage saddr, src_addr;
	int addr_len;
	struct socket *sock = NULL;
	unsigned int mark;
	int result;

	/* NOTE(review): the return value is ignored, so this relies on
	 * dlm_comm_mark() always writing @mark - confirm.
	 */
	dlm_comm_mark(con->nodeid, &mark);

	mutex_lock(&con->sock_mutex);
	/* Post-increment: the limit is compared against the previous count */
	if (con->retries++ > MAX_CONNECT_RETRIES)
		goto out;

	/* Some odd races can cause double-connects, ignore them */
	if (con->sock)
		goto out;

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_TCP, &sock);
	if (result < 0)
		goto out_err;

	sock_set_mark(sock->sk, mark);

	memset(&saddr, 0, sizeof(saddr));
	result = nodeid_to_addr(con->nodeid, &saddr, NULL, false);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		goto out_err;
	}

	/* add_sock() is expected to publish sock as con->sock; out_err relies
	 * on that to decide which pointer to release.
	 */
	add_sock(sock, con);

	/* Bind to our cluster-known address connecting to avoid
	   routing problems */
	memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
	make_sockaddr(&src_addr, 0, &addr_len);
	result = sock->ops->bind(sock, (struct sockaddr *) &src_addr,
				 addr_len);
	if (result < 0) {
		log_print("could not bind for connect: %d", result);
		/* This *may* not indicate a critical error */
	}

	make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);

	log_print("connecting to %d", con->nodeid);

	/* Turn off Nagle's algorithm */
	tcp_sock_set_nodelay(sock->sk);

	result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
				   O_NONBLOCK);
	/* Non-blocking connect: "in progress" is the normal outcome */
	if (result == -EINPROGRESS)
		result = 0;
	if (result == 0)
		goto out;

out_err:
	/* con->sock was NULL on entry, so a non-NULL value here means the
	 * socket was already attached to con; detach it while releasing.
	 * Otherwise only the local sock (possibly NULL) exists.
	 */
	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	} else if (sock) {
		sock_release(sock);
	}
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH &&
	    result != -ENETUNREACH &&
	    result != -ENETDOWN &&
	    result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		log_print("connect %d try %d error %d", con->nodeid,
			  con->retries, result);
		/* Drop the mutex before sleeping and before asking for a retry */
		mutex_unlock(&con->sock_mutex);
		msleep(1000);
		lowcomms_connect_sock(con);
		return;
	}
out:
	mutex_unlock(&con->sock_mutex);
	return;
}

/* On error caller must run dlm_close_sock() for the
 * listen connection socket.
1194 */ 1195 static int tcp_create_listen_sock(struct listen_connection *con, 1196 struct sockaddr_storage *saddr) 1197 { 1198 struct socket *sock = NULL; 1199 int result = 0; 1200 int addr_len; 1201 1202 if (dlm_local_addr[0]->ss_family == AF_INET) 1203 addr_len = sizeof(struct sockaddr_in); 1204 else 1205 addr_len = sizeof(struct sockaddr_in6); 1206 1207 /* Create a socket to communicate with */ 1208 result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family, 1209 SOCK_STREAM, IPPROTO_TCP, &sock); 1210 if (result < 0) { 1211 log_print("Can't create listening comms socket"); 1212 goto create_out; 1213 } 1214 1215 sock_set_mark(sock->sk, dlm_config.ci_mark); 1216 1217 /* Turn off Nagle's algorithm */ 1218 tcp_sock_set_nodelay(sock->sk); 1219 1220 sock_set_reuseaddr(sock->sk); 1221 1222 add_listen_sock(sock, con); 1223 1224 /* Bind to our port */ 1225 make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len); 1226 result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len); 1227 if (result < 0) { 1228 log_print("Can't bind to port %d", dlm_config.ci_tcp_port); 1229 goto create_out; 1230 } 1231 sock_set_keepalive(sock->sk); 1232 1233 result = sock->ops->listen(sock, 5); 1234 if (result < 0) { 1235 log_print("Can't listen on port %d", dlm_config.ci_tcp_port); 1236 goto create_out; 1237 } 1238 1239 return 0; 1240 1241 create_out: 1242 return result; 1243 } 1244 1245 /* Get local addresses */ 1246 static void init_local(void) 1247 { 1248 struct sockaddr_storage sas, *addr; 1249 int i; 1250 1251 dlm_local_count = 0; 1252 for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) { 1253 if (dlm_our_addr(&sas, i)) 1254 break; 1255 1256 addr = kmemdup(&sas, sizeof(*addr), GFP_NOFS); 1257 if (!addr) 1258 break; 1259 dlm_local_addr[dlm_local_count++] = addr; 1260 } 1261 } 1262 1263 static void deinit_local(void) 1264 { 1265 int i; 1266 1267 for (i = 0; i < dlm_local_count; i++) 1268 kfree(dlm_local_addr[i]); 1269 } 1270 1271 /* Initialise SCTP socket and bind to all interfaces 
1272 * On error caller must run dlm_close_sock() for the 1273 * listen connection socket. 1274 */ 1275 static int sctp_listen_for_all(struct listen_connection *con) 1276 { 1277 struct socket *sock = NULL; 1278 int result = -EINVAL; 1279 1280 log_print("Using SCTP for communications"); 1281 1282 result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family, 1283 SOCK_STREAM, IPPROTO_SCTP, &sock); 1284 if (result < 0) { 1285 log_print("Can't create comms socket, check SCTP is loaded"); 1286 goto out; 1287 } 1288 1289 sock_set_rcvbuf(sock->sk, NEEDED_RMEM); 1290 sock_set_mark(sock->sk, dlm_config.ci_mark); 1291 sctp_sock_set_nodelay(sock->sk); 1292 1293 add_listen_sock(sock, con); 1294 1295 /* Bind to all addresses. */ 1296 result = sctp_bind_addrs(con->sock, dlm_config.ci_tcp_port); 1297 if (result < 0) 1298 goto out; 1299 1300 result = sock->ops->listen(sock, 5); 1301 if (result < 0) { 1302 log_print("Can't set socket listening"); 1303 goto out; 1304 } 1305 1306 return 0; 1307 1308 out: 1309 return result; 1310 } 1311 1312 static int tcp_listen_for_all(void) 1313 { 1314 /* We don't support multi-homed hosts */ 1315 if (dlm_local_count > 1) { 1316 log_print("TCP protocol can't handle multi-homed hosts, " 1317 "try SCTP"); 1318 return -EINVAL; 1319 } 1320 1321 log_print("Using TCP for communications"); 1322 1323 return tcp_create_listen_sock(&listen_con, dlm_local_addr[0]); 1324 } 1325 1326 1327 1328 static struct writequeue_entry *new_writequeue_entry(struct connection *con, 1329 gfp_t allocation) 1330 { 1331 struct writequeue_entry *entry; 1332 1333 entry = kmalloc(sizeof(struct writequeue_entry), allocation); 1334 if (!entry) 1335 return NULL; 1336 1337 entry->page = alloc_page(allocation); 1338 if (!entry->page) { 1339 kfree(entry); 1340 return NULL; 1341 } 1342 1343 entry->offset = 0; 1344 entry->len = 0; 1345 entry->end = 0; 1346 entry->users = 0; 1347 entry->con = con; 1348 1349 return entry; 1350 } 1351 1352 void *dlm_lowcomms_get_buffer(int nodeid, int 
len, gfp_t allocation, char **ppc) 1353 { 1354 struct connection *con; 1355 struct writequeue_entry *e; 1356 int offset = 0; 1357 1358 if (len > LOWCOMMS_MAX_TX_BUFFER_LEN) { 1359 BUILD_BUG_ON(PAGE_SIZE < LOWCOMMS_MAX_TX_BUFFER_LEN); 1360 log_print("failed to allocate a buffer of size %d", len); 1361 return NULL; 1362 } 1363 1364 con = nodeid2con(nodeid, allocation); 1365 if (!con) 1366 return NULL; 1367 1368 spin_lock(&con->writequeue_lock); 1369 e = list_entry(con->writequeue.prev, struct writequeue_entry, list); 1370 if ((&e->list == &con->writequeue) || 1371 (PAGE_SIZE - e->end < len)) { 1372 e = NULL; 1373 } else { 1374 offset = e->end; 1375 e->end += len; 1376 e->users++; 1377 } 1378 spin_unlock(&con->writequeue_lock); 1379 1380 if (e) { 1381 got_one: 1382 *ppc = page_address(e->page) + offset; 1383 return e; 1384 } 1385 1386 e = new_writequeue_entry(con, allocation); 1387 if (e) { 1388 spin_lock(&con->writequeue_lock); 1389 offset = e->end; 1390 e->end += len; 1391 e->users++; 1392 list_add_tail(&e->list, &con->writequeue); 1393 spin_unlock(&con->writequeue_lock); 1394 goto got_one; 1395 } 1396 return NULL; 1397 } 1398 1399 void dlm_lowcomms_commit_buffer(void *mh) 1400 { 1401 struct writequeue_entry *e = (struct writequeue_entry *)mh; 1402 struct connection *con = e->con; 1403 int users; 1404 1405 spin_lock(&con->writequeue_lock); 1406 users = --e->users; 1407 if (users) 1408 goto out; 1409 e->len = e->end - e->offset; 1410 spin_unlock(&con->writequeue_lock); 1411 1412 queue_work(send_workqueue, &con->swork); 1413 return; 1414 1415 out: 1416 spin_unlock(&con->writequeue_lock); 1417 return; 1418 } 1419 1420 /* Send a message */ 1421 static void send_to_sock(struct connection *con) 1422 { 1423 int ret = 0; 1424 const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; 1425 struct writequeue_entry *e; 1426 int len, offset; 1427 int count = 0; 1428 1429 mutex_lock(&con->sock_mutex); 1430 if (con->sock == NULL) 1431 goto out_connect; 1432 1433 
spin_lock(&con->writequeue_lock); 1434 for (;;) { 1435 e = list_entry(con->writequeue.next, struct writequeue_entry, 1436 list); 1437 if ((struct list_head *) e == &con->writequeue) 1438 break; 1439 1440 len = e->len; 1441 offset = e->offset; 1442 BUG_ON(len == 0 && e->users == 0); 1443 spin_unlock(&con->writequeue_lock); 1444 1445 ret = 0; 1446 if (len) { 1447 ret = kernel_sendpage(con->sock, e->page, offset, len, 1448 msg_flags); 1449 if (ret == -EAGAIN || ret == 0) { 1450 if (ret == -EAGAIN && 1451 test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) && 1452 !test_and_set_bit(CF_APP_LIMITED, &con->flags)) { 1453 /* Notify TCP that we're limited by the 1454 * application window size. 1455 */ 1456 set_bit(SOCK_NOSPACE, &con->sock->flags); 1457 con->sock->sk->sk_write_pending++; 1458 } 1459 cond_resched(); 1460 goto out; 1461 } else if (ret < 0) 1462 goto send_error; 1463 } 1464 1465 /* Don't starve people filling buffers */ 1466 if (++count >= MAX_SEND_MSG_COUNT) { 1467 cond_resched(); 1468 count = 0; 1469 } 1470 1471 spin_lock(&con->writequeue_lock); 1472 writequeue_entry_complete(e, ret); 1473 } 1474 spin_unlock(&con->writequeue_lock); 1475 out: 1476 mutex_unlock(&con->sock_mutex); 1477 return; 1478 1479 send_error: 1480 mutex_unlock(&con->sock_mutex); 1481 close_connection(con, false, false, true); 1482 /* Requeue the send work. When the work daemon runs again, it will try 1483 a new connection, then call this function again. 
*/ 1484 queue_work(send_workqueue, &con->swork); 1485 return; 1486 1487 out_connect: 1488 mutex_unlock(&con->sock_mutex); 1489 queue_work(send_workqueue, &con->swork); 1490 cond_resched(); 1491 } 1492 1493 static void clean_one_writequeue(struct connection *con) 1494 { 1495 struct writequeue_entry *e, *safe; 1496 1497 spin_lock(&con->writequeue_lock); 1498 list_for_each_entry_safe(e, safe, &con->writequeue, list) { 1499 list_del(&e->list); 1500 free_entry(e); 1501 } 1502 spin_unlock(&con->writequeue_lock); 1503 } 1504 1505 /* Called from recovery when it knows that a node has 1506 left the cluster */ 1507 int dlm_lowcomms_close(int nodeid) 1508 { 1509 struct connection *con; 1510 struct dlm_node_addr *na; 1511 1512 log_print("closing connection to node %d", nodeid); 1513 con = nodeid2con(nodeid, 0); 1514 if (con) { 1515 set_bit(CF_CLOSE, &con->flags); 1516 close_connection(con, true, true, true); 1517 clean_one_writequeue(con); 1518 if (con->othercon) 1519 clean_one_writequeue(con->othercon); 1520 } 1521 1522 spin_lock(&dlm_node_addrs_spin); 1523 na = find_node_addr(nodeid); 1524 if (na) { 1525 list_del(&na->list); 1526 while (na->addr_count--) 1527 kfree(na->addr[na->addr_count]); 1528 kfree(na); 1529 } 1530 spin_unlock(&dlm_node_addrs_spin); 1531 1532 return 0; 1533 } 1534 1535 /* Receive workqueue function */ 1536 static void process_recv_sockets(struct work_struct *work) 1537 { 1538 struct connection *con = container_of(work, struct connection, rwork); 1539 int err; 1540 1541 clear_bit(CF_READ_PENDING, &con->flags); 1542 do { 1543 err = receive_from_sock(con); 1544 } while (!err); 1545 } 1546 1547 static void process_listen_recv_socket(struct work_struct *work) 1548 { 1549 accept_from_sock(&listen_con); 1550 } 1551 1552 /* Send workqueue function */ 1553 static void process_send_sockets(struct work_struct *work) 1554 { 1555 struct connection *con = container_of(work, struct connection, swork); 1556 1557 clear_bit(CF_WRITE_PENDING, &con->flags); 1558 if 
(con->sock == NULL) /* not mutex protected so check it inside too */ 1559 con->connect_action(con); 1560 if (!list_empty(&con->writequeue)) 1561 send_to_sock(con); 1562 } 1563 1564 static void work_stop(void) 1565 { 1566 if (recv_workqueue) 1567 destroy_workqueue(recv_workqueue); 1568 if (send_workqueue) 1569 destroy_workqueue(send_workqueue); 1570 } 1571 1572 static int work_start(void) 1573 { 1574 recv_workqueue = alloc_workqueue("dlm_recv", 1575 WQ_UNBOUND | WQ_MEM_RECLAIM, 1); 1576 if (!recv_workqueue) { 1577 log_print("can't start dlm_recv"); 1578 return -ENOMEM; 1579 } 1580 1581 send_workqueue = alloc_workqueue("dlm_send", 1582 WQ_UNBOUND | WQ_MEM_RECLAIM, 1); 1583 if (!send_workqueue) { 1584 log_print("can't start dlm_send"); 1585 destroy_workqueue(recv_workqueue); 1586 return -ENOMEM; 1587 } 1588 1589 return 0; 1590 } 1591 1592 static void _stop_conn(struct connection *con, bool and_other) 1593 { 1594 mutex_lock(&con->sock_mutex); 1595 set_bit(CF_CLOSE, &con->flags); 1596 set_bit(CF_READ_PENDING, &con->flags); 1597 set_bit(CF_WRITE_PENDING, &con->flags); 1598 if (con->sock && con->sock->sk) { 1599 write_lock_bh(&con->sock->sk->sk_callback_lock); 1600 con->sock->sk->sk_user_data = NULL; 1601 write_unlock_bh(&con->sock->sk->sk_callback_lock); 1602 } 1603 if (con->othercon && and_other) 1604 _stop_conn(con->othercon, false); 1605 mutex_unlock(&con->sock_mutex); 1606 } 1607 1608 static void stop_conn(struct connection *con) 1609 { 1610 _stop_conn(con, true); 1611 } 1612 1613 static void shutdown_conn(struct connection *con) 1614 { 1615 if (con->shutdown_action) 1616 con->shutdown_action(con); 1617 } 1618 1619 static void connection_release(struct rcu_head *rcu) 1620 { 1621 struct connection *con = container_of(rcu, struct connection, rcu); 1622 1623 kfree(con->rx_buf); 1624 kfree(con); 1625 } 1626 1627 static void free_conn(struct connection *con) 1628 { 1629 close_connection(con, true, true, true); 1630 spin_lock(&connections_lock); 1631 
hlist_del_rcu(&con->list); 1632 spin_unlock(&connections_lock); 1633 if (con->othercon) { 1634 clean_one_writequeue(con->othercon); 1635 call_srcu(&connections_srcu, &con->othercon->rcu, 1636 connection_release); 1637 } 1638 clean_one_writequeue(con); 1639 call_srcu(&connections_srcu, &con->rcu, connection_release); 1640 } 1641 1642 static void work_flush(void) 1643 { 1644 int ok, idx; 1645 int i; 1646 struct connection *con; 1647 1648 do { 1649 ok = 1; 1650 foreach_conn(stop_conn); 1651 if (recv_workqueue) 1652 flush_workqueue(recv_workqueue); 1653 if (send_workqueue) 1654 flush_workqueue(send_workqueue); 1655 idx = srcu_read_lock(&connections_srcu); 1656 for (i = 0; i < CONN_HASH_SIZE && ok; i++) { 1657 hlist_for_each_entry_rcu(con, &connection_hash[i], 1658 list) { 1659 ok &= test_bit(CF_READ_PENDING, &con->flags); 1660 ok &= test_bit(CF_WRITE_PENDING, &con->flags); 1661 if (con->othercon) { 1662 ok &= test_bit(CF_READ_PENDING, 1663 &con->othercon->flags); 1664 ok &= test_bit(CF_WRITE_PENDING, 1665 &con->othercon->flags); 1666 } 1667 } 1668 } 1669 srcu_read_unlock(&connections_srcu, idx); 1670 } while (!ok); 1671 } 1672 1673 void dlm_lowcomms_stop(void) 1674 { 1675 /* Set all the flags to prevent any 1676 socket activity. 
1677 */ 1678 dlm_allow_conn = 0; 1679 1680 if (recv_workqueue) 1681 flush_workqueue(recv_workqueue); 1682 if (send_workqueue) 1683 flush_workqueue(send_workqueue); 1684 1685 dlm_close_sock(&listen_con.sock); 1686 1687 foreach_conn(shutdown_conn); 1688 work_flush(); 1689 foreach_conn(free_conn); 1690 work_stop(); 1691 deinit_local(); 1692 } 1693 1694 int dlm_lowcomms_start(void) 1695 { 1696 int error = -EINVAL; 1697 int i; 1698 1699 for (i = 0; i < CONN_HASH_SIZE; i++) 1700 INIT_HLIST_HEAD(&connection_hash[i]); 1701 1702 init_local(); 1703 if (!dlm_local_count) { 1704 error = -ENOTCONN; 1705 log_print("no local IP address has been set"); 1706 goto fail; 1707 } 1708 1709 INIT_WORK(&listen_con.rwork, process_listen_recv_socket); 1710 1711 error = work_start(); 1712 if (error) 1713 goto fail; 1714 1715 dlm_allow_conn = 1; 1716 1717 /* Start listening */ 1718 if (dlm_config.ci_protocol == 0) 1719 error = tcp_listen_for_all(); 1720 else 1721 error = sctp_listen_for_all(&listen_con); 1722 if (error) 1723 goto fail_unlisten; 1724 1725 return 0; 1726 1727 fail_unlisten: 1728 dlm_allow_conn = 0; 1729 dlm_close_sock(&listen_con.sock); 1730 fail: 1731 return error; 1732 } 1733 1734 void dlm_lowcomms_exit(void) 1735 { 1736 struct dlm_node_addr *na, *safe; 1737 1738 spin_lock(&dlm_node_addrs_spin); 1739 list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) { 1740 list_del(&na->list); 1741 while (na->addr_count--) 1742 kfree(na->addr[na->addr_count]); 1743 kfree(na); 1744 } 1745 spin_unlock(&dlm_node_addrs_spin); 1746 } 1747