// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

/*
 * lowcomms.c
 *
 * This is the "low-level" comms layer.
 *
 * It is responsible for sending/receiving messages
 * from other nodes in the cluster.
 *
 * Cluster nodes are referred to by their nodeids. nodeids are
 * simply 32 bit numbers to the locking module - if they need to
 * be expanded for the cluster infrastructure then that is its
 * responsibility. It is this layer's
 * responsibility to resolve these into IP addresses or
 * whatever it needs for inter-node communication.
 *
 * The comms level is two kernel threads that deal mainly with
 * the receiving of messages from other nodes and passing them
 * up to the mid-level comms layer (which understands the
 * message format) for execution by the locking core, and
 * a send thread which does all the setting up of connections
 * to remote nodes and the sending of data. Threads are not allowed
 * to send their own data because it may cause them to wait in times
 * of high load. Also, this way, the sending thread can collect together
 * messages bound for one node and send them in one block.
 *
 * lowcomms will choose to use either TCP or SCTP as its transport layer
 * depending on the configuration variable 'protocol'. This should be set
 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
 * cluster-wide mechanism as it must be the same on all nodes of the cluster
 * for the DLM to function.
42 * 43 */ 44 45 #include <asm/ioctls.h> 46 #include <net/sock.h> 47 #include <net/tcp.h> 48 #include <linux/pagemap.h> 49 #include <linux/file.h> 50 #include <linux/mutex.h> 51 #include <linux/sctp.h> 52 #include <linux/slab.h> 53 #include <net/sctp/sctp.h> 54 #include <net/ipv6.h> 55 56 #include "dlm_internal.h" 57 #include "lowcomms.h" 58 #include "midcomms.h" 59 #include "config.h" 60 61 #define NEEDED_RMEM (4*1024*1024) 62 #define CONN_HASH_SIZE 32 63 64 /* Number of messages to send before rescheduling */ 65 #define MAX_SEND_MSG_COUNT 25 66 #define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000) 67 68 struct connection { 69 struct socket *sock; /* NULL if not connected */ 70 uint32_t nodeid; /* So we know who we are in the list */ 71 struct mutex sock_mutex; 72 unsigned long flags; 73 #define CF_READ_PENDING 1 74 #define CF_WRITE_PENDING 2 75 #define CF_INIT_PENDING 4 76 #define CF_IS_OTHERCON 5 77 #define CF_CLOSE 6 78 #define CF_APP_LIMITED 7 79 #define CF_CLOSING 8 80 #define CF_SHUTDOWN 9 81 struct list_head writequeue; /* List of outgoing writequeue_entries */ 82 spinlock_t writequeue_lock; 83 int (*rx_action) (struct connection *); /* What to do when active */ 84 void (*connect_action) (struct connection *); /* What to do to connect */ 85 void (*shutdown_action)(struct connection *con); /* What to do to shutdown */ 86 int retries; 87 #define MAX_CONNECT_RETRIES 3 88 struct hlist_node list; 89 struct connection *othercon; 90 struct work_struct rwork; /* Receive workqueue */ 91 struct work_struct swork; /* Send workqueue */ 92 wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */ 93 unsigned char *rx_buf; 94 int rx_buflen; 95 int rx_leftover; 96 struct rcu_head rcu; 97 }; 98 #define sock2con(x) ((struct connection *)(x)->sk_user_data) 99 100 /* An entry waiting to be sent */ 101 struct writequeue_entry { 102 struct list_head list; 103 struct page *page; 104 int offset; 105 int len; 106 int end; 107 int users; 108 struct connection *con; 109 
}; 110 111 struct dlm_node_addr { 112 struct list_head list; 113 int nodeid; 114 int addr_count; 115 int curr_addr_index; 116 struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT]; 117 }; 118 119 static struct listen_sock_callbacks { 120 void (*sk_error_report)(struct sock *); 121 void (*sk_data_ready)(struct sock *); 122 void (*sk_state_change)(struct sock *); 123 void (*sk_write_space)(struct sock *); 124 } listen_sock; 125 126 static LIST_HEAD(dlm_node_addrs); 127 static DEFINE_SPINLOCK(dlm_node_addrs_spin); 128 129 static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT]; 130 static int dlm_local_count; 131 static int dlm_allow_conn; 132 133 /* Work queues */ 134 static struct workqueue_struct *recv_workqueue; 135 static struct workqueue_struct *send_workqueue; 136 137 static struct hlist_head connection_hash[CONN_HASH_SIZE]; 138 static DEFINE_SPINLOCK(connections_lock); 139 DEFINE_STATIC_SRCU(connections_srcu); 140 141 static void process_recv_sockets(struct work_struct *work); 142 static void process_send_sockets(struct work_struct *work); 143 144 145 /* This is deliberately very simple because most clusters have simple 146 sequential nodeids, so we should be able to go straight to a connection 147 struct in the array */ 148 static inline int nodeid_hash(int nodeid) 149 { 150 return nodeid & (CONN_HASH_SIZE-1); 151 } 152 153 static struct connection *__find_con(int nodeid) 154 { 155 int r, idx; 156 struct connection *con; 157 158 r = nodeid_hash(nodeid); 159 160 idx = srcu_read_lock(&connections_srcu); 161 hlist_for_each_entry_rcu(con, &connection_hash[r], list) { 162 if (con->nodeid == nodeid) { 163 srcu_read_unlock(&connections_srcu, idx); 164 return con; 165 } 166 } 167 srcu_read_unlock(&connections_srcu, idx); 168 169 return NULL; 170 } 171 172 /* 173 * If 'allocation' is zero then we don't attempt to create a new 174 * connection structure for this node. 
175 */ 176 static struct connection *nodeid2con(int nodeid, gfp_t alloc) 177 { 178 struct connection *con, *tmp; 179 int r; 180 181 con = __find_con(nodeid); 182 if (con || !alloc) 183 return con; 184 185 con = kzalloc(sizeof(*con), alloc); 186 if (!con) 187 return NULL; 188 189 con->rx_buflen = dlm_config.ci_buffer_size; 190 con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS); 191 if (!con->rx_buf) { 192 kfree(con); 193 return NULL; 194 } 195 196 con->nodeid = nodeid; 197 mutex_init(&con->sock_mutex); 198 INIT_LIST_HEAD(&con->writequeue); 199 spin_lock_init(&con->writequeue_lock); 200 INIT_WORK(&con->swork, process_send_sockets); 201 INIT_WORK(&con->rwork, process_recv_sockets); 202 init_waitqueue_head(&con->shutdown_wait); 203 204 /* Setup action pointers for child sockets */ 205 if (con->nodeid) { 206 struct connection *zerocon = __find_con(0); 207 208 con->connect_action = zerocon->connect_action; 209 if (!con->rx_action) 210 con->rx_action = zerocon->rx_action; 211 } 212 213 r = nodeid_hash(nodeid); 214 215 spin_lock(&connections_lock); 216 /* Because multiple workqueues/threads calls this function it can 217 * race on multiple cpu's. Instead of locking hot path __find_con() 218 * we just check in rare cases of recently added nodes again 219 * under protection of connections_lock. If this is the case we 220 * abort our connection creation and return the existing connection. 
221 */ 222 tmp = __find_con(nodeid); 223 if (tmp) { 224 spin_unlock(&connections_lock); 225 kfree(con->rx_buf); 226 kfree(con); 227 return tmp; 228 } 229 230 hlist_add_head_rcu(&con->list, &connection_hash[r]); 231 spin_unlock(&connections_lock); 232 233 return con; 234 } 235 236 /* Loop round all connections */ 237 static void foreach_conn(void (*conn_func)(struct connection *c)) 238 { 239 int i, idx; 240 struct connection *con; 241 242 idx = srcu_read_lock(&connections_srcu); 243 for (i = 0; i < CONN_HASH_SIZE; i++) { 244 hlist_for_each_entry_rcu(con, &connection_hash[i], list) 245 conn_func(con); 246 } 247 srcu_read_unlock(&connections_srcu, idx); 248 } 249 250 static struct dlm_node_addr *find_node_addr(int nodeid) 251 { 252 struct dlm_node_addr *na; 253 254 list_for_each_entry(na, &dlm_node_addrs, list) { 255 if (na->nodeid == nodeid) 256 return na; 257 } 258 return NULL; 259 } 260 261 static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y) 262 { 263 switch (x->ss_family) { 264 case AF_INET: { 265 struct sockaddr_in *sinx = (struct sockaddr_in *)x; 266 struct sockaddr_in *siny = (struct sockaddr_in *)y; 267 if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr) 268 return 0; 269 if (sinx->sin_port != siny->sin_port) 270 return 0; 271 break; 272 } 273 case AF_INET6: { 274 struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x; 275 struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y; 276 if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr)) 277 return 0; 278 if (sinx->sin6_port != siny->sin6_port) 279 return 0; 280 break; 281 } 282 default: 283 return 0; 284 } 285 return 1; 286 } 287 288 static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out, 289 struct sockaddr *sa_out, bool try_new_addr) 290 { 291 struct sockaddr_storage sas; 292 struct dlm_node_addr *na; 293 294 if (!dlm_local_count) 295 return -1; 296 297 spin_lock(&dlm_node_addrs_spin); 298 na = find_node_addr(nodeid); 299 if (na && na->addr_count) { 300 memcpy(&sas, 
na->addr[na->curr_addr_index], 301 sizeof(struct sockaddr_storage)); 302 303 if (try_new_addr) { 304 na->curr_addr_index++; 305 if (na->curr_addr_index == na->addr_count) 306 na->curr_addr_index = 0; 307 } 308 } 309 spin_unlock(&dlm_node_addrs_spin); 310 311 if (!na) 312 return -EEXIST; 313 314 if (!na->addr_count) 315 return -ENOENT; 316 317 if (sas_out) 318 memcpy(sas_out, &sas, sizeof(struct sockaddr_storage)); 319 320 if (!sa_out) 321 return 0; 322 323 if (dlm_local_addr[0]->ss_family == AF_INET) { 324 struct sockaddr_in *in4 = (struct sockaddr_in *) &sas; 325 struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out; 326 ret4->sin_addr.s_addr = in4->sin_addr.s_addr; 327 } else { 328 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &sas; 329 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out; 330 ret6->sin6_addr = in6->sin6_addr; 331 } 332 333 return 0; 334 } 335 336 static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid) 337 { 338 struct dlm_node_addr *na; 339 int rv = -EEXIST; 340 int addr_i; 341 342 spin_lock(&dlm_node_addrs_spin); 343 list_for_each_entry(na, &dlm_node_addrs, list) { 344 if (!na->addr_count) 345 continue; 346 347 for (addr_i = 0; addr_i < na->addr_count; addr_i++) { 348 if (addr_compare(na->addr[addr_i], addr)) { 349 *nodeid = na->nodeid; 350 rv = 0; 351 goto unlock; 352 } 353 } 354 } 355 unlock: 356 spin_unlock(&dlm_node_addrs_spin); 357 return rv; 358 } 359 360 int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len) 361 { 362 struct sockaddr_storage *new_addr; 363 struct dlm_node_addr *new_node, *na; 364 365 new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS); 366 if (!new_node) 367 return -ENOMEM; 368 369 new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS); 370 if (!new_addr) { 371 kfree(new_node); 372 return -ENOMEM; 373 } 374 375 memcpy(new_addr, addr, len); 376 377 spin_lock(&dlm_node_addrs_spin); 378 na = find_node_addr(nodeid); 379 if (!na) { 380 new_node->nodeid = nodeid; 381 
new_node->addr[0] = new_addr; 382 new_node->addr_count = 1; 383 list_add(&new_node->list, &dlm_node_addrs); 384 spin_unlock(&dlm_node_addrs_spin); 385 return 0; 386 } 387 388 if (na->addr_count >= DLM_MAX_ADDR_COUNT) { 389 spin_unlock(&dlm_node_addrs_spin); 390 kfree(new_addr); 391 kfree(new_node); 392 return -ENOSPC; 393 } 394 395 na->addr[na->addr_count++] = new_addr; 396 spin_unlock(&dlm_node_addrs_spin); 397 kfree(new_node); 398 return 0; 399 } 400 401 /* Data available on socket or listen socket received a connect */ 402 static void lowcomms_data_ready(struct sock *sk) 403 { 404 struct connection *con; 405 406 read_lock_bh(&sk->sk_callback_lock); 407 con = sock2con(sk); 408 if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags)) 409 queue_work(recv_workqueue, &con->rwork); 410 read_unlock_bh(&sk->sk_callback_lock); 411 } 412 413 static void lowcomms_write_space(struct sock *sk) 414 { 415 struct connection *con; 416 417 read_lock_bh(&sk->sk_callback_lock); 418 con = sock2con(sk); 419 if (!con) 420 goto out; 421 422 clear_bit(SOCK_NOSPACE, &con->sock->flags); 423 424 if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) { 425 con->sock->sk->sk_write_pending--; 426 clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags); 427 } 428 429 queue_work(send_workqueue, &con->swork); 430 out: 431 read_unlock_bh(&sk->sk_callback_lock); 432 } 433 434 static inline void lowcomms_connect_sock(struct connection *con) 435 { 436 if (test_bit(CF_CLOSE, &con->flags)) 437 return; 438 queue_work(send_workqueue, &con->swork); 439 cond_resched(); 440 } 441 442 static void lowcomms_state_change(struct sock *sk) 443 { 444 /* SCTP layer is not calling sk_data_ready when the connection 445 * is done, so we catch the signal through here. Also, it 446 * doesn't switch socket state when entering shutdown, so we 447 * skip the write in that case. 
448 */ 449 if (sk->sk_shutdown) { 450 if (sk->sk_shutdown == RCV_SHUTDOWN) 451 lowcomms_data_ready(sk); 452 } else if (sk->sk_state == TCP_ESTABLISHED) { 453 lowcomms_write_space(sk); 454 } 455 } 456 457 int dlm_lowcomms_connect_node(int nodeid) 458 { 459 struct connection *con; 460 461 if (nodeid == dlm_our_nodeid()) 462 return 0; 463 464 con = nodeid2con(nodeid, GFP_NOFS); 465 if (!con) 466 return -ENOMEM; 467 lowcomms_connect_sock(con); 468 return 0; 469 } 470 471 static void lowcomms_error_report(struct sock *sk) 472 { 473 struct connection *con; 474 struct sockaddr_storage saddr; 475 void (*orig_report)(struct sock *) = NULL; 476 477 read_lock_bh(&sk->sk_callback_lock); 478 con = sock2con(sk); 479 if (con == NULL) 480 goto out; 481 482 orig_report = listen_sock.sk_error_report; 483 if (con->sock == NULL || 484 kernel_getpeername(con->sock, (struct sockaddr *)&saddr) < 0) { 485 printk_ratelimited(KERN_ERR "dlm: node %d: socket error " 486 "sending to node %d, port %d, " 487 "sk_err=%d/%d\n", dlm_our_nodeid(), 488 con->nodeid, dlm_config.ci_tcp_port, 489 sk->sk_err, sk->sk_err_soft); 490 } else if (saddr.ss_family == AF_INET) { 491 struct sockaddr_in *sin4 = (struct sockaddr_in *)&saddr; 492 493 printk_ratelimited(KERN_ERR "dlm: node %d: socket error " 494 "sending to node %d at %pI4, port %d, " 495 "sk_err=%d/%d\n", dlm_our_nodeid(), 496 con->nodeid, &sin4->sin_addr.s_addr, 497 dlm_config.ci_tcp_port, sk->sk_err, 498 sk->sk_err_soft); 499 } else { 500 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&saddr; 501 502 printk_ratelimited(KERN_ERR "dlm: node %d: socket error " 503 "sending to node %d at %u.%u.%u.%u, " 504 "port %d, sk_err=%d/%d\n", dlm_our_nodeid(), 505 con->nodeid, sin6->sin6_addr.s6_addr32[0], 506 sin6->sin6_addr.s6_addr32[1], 507 sin6->sin6_addr.s6_addr32[2], 508 sin6->sin6_addr.s6_addr32[3], 509 dlm_config.ci_tcp_port, sk->sk_err, 510 sk->sk_err_soft); 511 } 512 out: 513 read_unlock_bh(&sk->sk_callback_lock); 514 if (orig_report) 515 
orig_report(sk); 516 } 517 518 /* Note: sk_callback_lock must be locked before calling this function. */ 519 static void save_listen_callbacks(struct socket *sock) 520 { 521 struct sock *sk = sock->sk; 522 523 listen_sock.sk_data_ready = sk->sk_data_ready; 524 listen_sock.sk_state_change = sk->sk_state_change; 525 listen_sock.sk_write_space = sk->sk_write_space; 526 listen_sock.sk_error_report = sk->sk_error_report; 527 } 528 529 static void restore_callbacks(struct socket *sock) 530 { 531 struct sock *sk = sock->sk; 532 533 write_lock_bh(&sk->sk_callback_lock); 534 sk->sk_user_data = NULL; 535 sk->sk_data_ready = listen_sock.sk_data_ready; 536 sk->sk_state_change = listen_sock.sk_state_change; 537 sk->sk_write_space = listen_sock.sk_write_space; 538 sk->sk_error_report = listen_sock.sk_error_report; 539 write_unlock_bh(&sk->sk_callback_lock); 540 } 541 542 /* Make a socket active */ 543 static void add_sock(struct socket *sock, struct connection *con) 544 { 545 struct sock *sk = sock->sk; 546 547 write_lock_bh(&sk->sk_callback_lock); 548 con->sock = sock; 549 550 sk->sk_user_data = con; 551 /* Install a data_ready callback */ 552 sk->sk_data_ready = lowcomms_data_ready; 553 sk->sk_write_space = lowcomms_write_space; 554 sk->sk_state_change = lowcomms_state_change; 555 sk->sk_allocation = GFP_NOFS; 556 sk->sk_error_report = lowcomms_error_report; 557 write_unlock_bh(&sk->sk_callback_lock); 558 } 559 560 /* Add the port number to an IPv6 or 4 sockaddr and return the address 561 length */ 562 static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port, 563 int *addr_len) 564 { 565 saddr->ss_family = dlm_local_addr[0]->ss_family; 566 if (saddr->ss_family == AF_INET) { 567 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr; 568 in4_addr->sin_port = cpu_to_be16(port); 569 *addr_len = sizeof(struct sockaddr_in); 570 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero)); 571 } else { 572 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 
*)saddr; 573 in6_addr->sin6_port = cpu_to_be16(port); 574 *addr_len = sizeof(struct sockaddr_in6); 575 } 576 memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len); 577 } 578 579 /* Close a remote connection and tidy up */ 580 static void close_connection(struct connection *con, bool and_other, 581 bool tx, bool rx) 582 { 583 bool closing = test_and_set_bit(CF_CLOSING, &con->flags); 584 585 if (tx && !closing && cancel_work_sync(&con->swork)) { 586 log_print("canceled swork for node %d", con->nodeid); 587 clear_bit(CF_WRITE_PENDING, &con->flags); 588 } 589 if (rx && !closing && cancel_work_sync(&con->rwork)) { 590 log_print("canceled rwork for node %d", con->nodeid); 591 clear_bit(CF_READ_PENDING, &con->flags); 592 } 593 594 mutex_lock(&con->sock_mutex); 595 if (con->sock) { 596 restore_callbacks(con->sock); 597 sock_release(con->sock); 598 con->sock = NULL; 599 } 600 if (con->othercon && and_other) { 601 /* Will only re-enter once. */ 602 close_connection(con->othercon, false, true, true); 603 } 604 605 con->rx_leftover = 0; 606 con->retries = 0; 607 mutex_unlock(&con->sock_mutex); 608 clear_bit(CF_CLOSING, &con->flags); 609 } 610 611 static void shutdown_connection(struct connection *con) 612 { 613 int ret; 614 615 if (cancel_work_sync(&con->swork)) { 616 log_print("canceled swork for node %d", con->nodeid); 617 clear_bit(CF_WRITE_PENDING, &con->flags); 618 } 619 620 mutex_lock(&con->sock_mutex); 621 /* nothing to shutdown */ 622 if (!con->sock) { 623 mutex_unlock(&con->sock_mutex); 624 return; 625 } 626 627 set_bit(CF_SHUTDOWN, &con->flags); 628 ret = kernel_sock_shutdown(con->sock, SHUT_WR); 629 mutex_unlock(&con->sock_mutex); 630 if (ret) { 631 log_print("Connection %p failed to shutdown: %d will force close", 632 con, ret); 633 goto force_close; 634 } else { 635 ret = wait_event_timeout(con->shutdown_wait, 636 !test_bit(CF_SHUTDOWN, &con->flags), 637 DLM_SHUTDOWN_WAIT_TIMEOUT); 638 if (ret == 0) { 639 log_print("Connection %p 
shutdown timed out, will force close", 640 con); 641 goto force_close; 642 } 643 } 644 645 return; 646 647 force_close: 648 clear_bit(CF_SHUTDOWN, &con->flags); 649 close_connection(con, false, true, true); 650 } 651 652 static void dlm_tcp_shutdown(struct connection *con) 653 { 654 if (con->othercon) 655 shutdown_connection(con->othercon); 656 shutdown_connection(con); 657 } 658 659 static int con_realloc_receive_buf(struct connection *con, int newlen) 660 { 661 unsigned char *newbuf; 662 663 newbuf = kmalloc(newlen, GFP_NOFS); 664 if (!newbuf) 665 return -ENOMEM; 666 667 /* copy any leftover from last receive */ 668 if (con->rx_leftover) 669 memmove(newbuf, con->rx_buf, con->rx_leftover); 670 671 /* swap to new buffer space */ 672 kfree(con->rx_buf); 673 con->rx_buflen = newlen; 674 con->rx_buf = newbuf; 675 676 return 0; 677 } 678 679 /* Data received from remote end */ 680 static int receive_from_sock(struct connection *con) 681 { 682 int call_again_soon = 0; 683 struct msghdr msg; 684 struct kvec iov; 685 int ret, buflen; 686 687 mutex_lock(&con->sock_mutex); 688 689 if (con->sock == NULL) { 690 ret = -EAGAIN; 691 goto out_close; 692 } 693 694 if (con->nodeid == 0) { 695 ret = -EINVAL; 696 goto out_close; 697 } 698 699 /* realloc if we get new buffer size to read out */ 700 buflen = dlm_config.ci_buffer_size; 701 if (con->rx_buflen != buflen && con->rx_leftover <= buflen) { 702 ret = con_realloc_receive_buf(con, buflen); 703 if (ret < 0) 704 goto out_resched; 705 } 706 707 /* calculate new buffer parameter regarding last receive and 708 * possible leftover bytes 709 */ 710 iov.iov_base = con->rx_buf + con->rx_leftover; 711 iov.iov_len = con->rx_buflen - con->rx_leftover; 712 713 memset(&msg, 0, sizeof(msg)); 714 msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; 715 ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, 716 msg.msg_flags); 717 if (ret <= 0) 718 goto out_close; 719 else if (ret == iov.iov_len) 720 call_again_soon = 1; 721 722 /* new buflen 
according readed bytes and leftover from last receive */ 723 buflen = ret + con->rx_leftover; 724 ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen); 725 if (ret < 0) 726 goto out_close; 727 728 /* calculate leftover bytes from process and put it into begin of 729 * the receive buffer, so next receive we have the full message 730 * at the start address of the receive buffer. 731 */ 732 con->rx_leftover = buflen - ret; 733 if (con->rx_leftover) { 734 memmove(con->rx_buf, con->rx_buf + ret, 735 con->rx_leftover); 736 call_again_soon = true; 737 } 738 739 if (call_again_soon) 740 goto out_resched; 741 742 mutex_unlock(&con->sock_mutex); 743 return 0; 744 745 out_resched: 746 if (!test_and_set_bit(CF_READ_PENDING, &con->flags)) 747 queue_work(recv_workqueue, &con->rwork); 748 mutex_unlock(&con->sock_mutex); 749 return -EAGAIN; 750 751 out_close: 752 mutex_unlock(&con->sock_mutex); 753 if (ret != -EAGAIN) { 754 /* Reconnect when there is something to send */ 755 close_connection(con, false, true, false); 756 if (ret == 0) { 757 log_print("connection %p got EOF from %d", 758 con, con->nodeid); 759 /* handling for tcp shutdown */ 760 clear_bit(CF_SHUTDOWN, &con->flags); 761 wake_up(&con->shutdown_wait); 762 /* signal to breaking receive worker */ 763 ret = -1; 764 } 765 } 766 return ret; 767 } 768 769 /* Listening socket is busy, accept a connection */ 770 static int accept_from_sock(struct connection *con) 771 { 772 int result; 773 struct sockaddr_storage peeraddr; 774 struct socket *newsock; 775 int len; 776 int nodeid; 777 struct connection *newcon; 778 struct connection *addcon; 779 unsigned int mark; 780 781 if (!dlm_allow_conn) { 782 return -1; 783 } 784 785 mutex_lock_nested(&con->sock_mutex, 0); 786 787 if (!con->sock) { 788 mutex_unlock(&con->sock_mutex); 789 return -ENOTCONN; 790 } 791 792 result = kernel_accept(con->sock, &newsock, O_NONBLOCK); 793 if (result < 0) 794 goto accept_err; 795 796 /* Get the connected socket's peer */ 797 
memset(&peeraddr, 0, sizeof(peeraddr)); 798 len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2); 799 if (len < 0) { 800 result = -ECONNABORTED; 801 goto accept_err; 802 } 803 804 /* Get the new node's NODEID */ 805 make_sockaddr(&peeraddr, 0, &len); 806 if (addr_to_nodeid(&peeraddr, &nodeid)) { 807 unsigned char *b=(unsigned char *)&peeraddr; 808 log_print("connect from non cluster node"); 809 print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, 810 b, sizeof(struct sockaddr_storage)); 811 sock_release(newsock); 812 mutex_unlock(&con->sock_mutex); 813 return -1; 814 } 815 816 dlm_comm_mark(nodeid, &mark); 817 sock_set_mark(newsock->sk, mark); 818 819 log_print("got connection from %d", nodeid); 820 821 /* Check to see if we already have a connection to this node. This 822 * could happen if the two nodes initiate a connection at roughly 823 * the same time and the connections cross on the wire. 824 * In this case we store the incoming one in "othercon" 825 */ 826 newcon = nodeid2con(nodeid, GFP_NOFS); 827 if (!newcon) { 828 result = -ENOMEM; 829 goto accept_err; 830 } 831 mutex_lock_nested(&newcon->sock_mutex, 1); 832 if (newcon->sock) { 833 struct connection *othercon = newcon->othercon; 834 835 if (!othercon) { 836 othercon = kzalloc(sizeof(*othercon), GFP_NOFS); 837 if (!othercon) { 838 log_print("failed to allocate incoming socket"); 839 mutex_unlock(&newcon->sock_mutex); 840 result = -ENOMEM; 841 goto accept_err; 842 } 843 844 othercon->rx_buflen = dlm_config.ci_buffer_size; 845 othercon->rx_buf = kmalloc(othercon->rx_buflen, GFP_NOFS); 846 if (!othercon->rx_buf) { 847 mutex_unlock(&newcon->sock_mutex); 848 kfree(othercon); 849 log_print("failed to allocate incoming socket receive buffer"); 850 result = -ENOMEM; 851 goto accept_err; 852 } 853 854 othercon->nodeid = nodeid; 855 othercon->rx_action = receive_from_sock; 856 mutex_init(&othercon->sock_mutex); 857 INIT_LIST_HEAD(&othercon->writequeue); 858 spin_lock_init(&othercon->writequeue_lock); 859 
INIT_WORK(&othercon->swork, process_send_sockets); 860 INIT_WORK(&othercon->rwork, process_recv_sockets); 861 init_waitqueue_head(&othercon->shutdown_wait); 862 set_bit(CF_IS_OTHERCON, &othercon->flags); 863 } else { 864 /* close other sock con if we have something new */ 865 close_connection(othercon, false, true, false); 866 } 867 868 mutex_lock_nested(&othercon->sock_mutex, 2); 869 newcon->othercon = othercon; 870 add_sock(newsock, othercon); 871 addcon = othercon; 872 mutex_unlock(&othercon->sock_mutex); 873 } 874 else { 875 newcon->rx_action = receive_from_sock; 876 /* accept copies the sk after we've saved the callbacks, so we 877 don't want to save them a second time or comm errors will 878 result in calling sk_error_report recursively. */ 879 add_sock(newsock, newcon); 880 addcon = newcon; 881 } 882 883 mutex_unlock(&newcon->sock_mutex); 884 885 /* 886 * Add it to the active queue in case we got data 887 * between processing the accept adding the socket 888 * to the read_sockets list 889 */ 890 if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags)) 891 queue_work(recv_workqueue, &addcon->rwork); 892 mutex_unlock(&con->sock_mutex); 893 894 return 0; 895 896 accept_err: 897 mutex_unlock(&con->sock_mutex); 898 if (newsock) 899 sock_release(newsock); 900 901 if (result != -EAGAIN) 902 log_print("error accepting connection from node: %d", result); 903 return result; 904 } 905 906 static void free_entry(struct writequeue_entry *e) 907 { 908 __free_page(e->page); 909 kfree(e); 910 } 911 912 /* 913 * writequeue_entry_complete - try to delete and free write queue entry 914 * @e: write queue entry to try to delete 915 * @completed: bytes completed 916 * 917 * writequeue_lock must be held. 
918 */ 919 static void writequeue_entry_complete(struct writequeue_entry *e, int completed) 920 { 921 e->offset += completed; 922 e->len -= completed; 923 924 if (e->len == 0 && e->users == 0) { 925 list_del(&e->list); 926 free_entry(e); 927 } 928 } 929 930 /* 931 * sctp_bind_addrs - bind a SCTP socket to all our addresses 932 */ 933 static int sctp_bind_addrs(struct connection *con, uint16_t port) 934 { 935 struct sockaddr_storage localaddr; 936 struct sockaddr *addr = (struct sockaddr *)&localaddr; 937 int i, addr_len, result = 0; 938 939 for (i = 0; i < dlm_local_count; i++) { 940 memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr)); 941 make_sockaddr(&localaddr, port, &addr_len); 942 943 if (!i) 944 result = kernel_bind(con->sock, addr, addr_len); 945 else 946 result = sock_bind_add(con->sock->sk, addr, addr_len); 947 948 if (result < 0) { 949 log_print("Can't bind to %d addr number %d, %d.\n", 950 port, i + 1, result); 951 break; 952 } 953 } 954 return result; 955 } 956 957 /* Initiate an SCTP association. 958 This is a special case of send_to_sock() in that we don't yet have a 959 peeled-off socket for this association, so we use the listening socket 960 and add the primary IP address of the remote node. 
961 */ 962 static void sctp_connect_to_sock(struct connection *con) 963 { 964 struct sockaddr_storage daddr; 965 int result; 966 int addr_len; 967 struct socket *sock; 968 unsigned int mark; 969 970 if (con->nodeid == 0) { 971 log_print("attempt to connect sock 0 foiled"); 972 return; 973 } 974 975 dlm_comm_mark(con->nodeid, &mark); 976 977 mutex_lock(&con->sock_mutex); 978 979 /* Some odd races can cause double-connects, ignore them */ 980 if (con->retries++ > MAX_CONNECT_RETRIES) 981 goto out; 982 983 if (con->sock) { 984 log_print("node %d already connected.", con->nodeid); 985 goto out; 986 } 987 988 memset(&daddr, 0, sizeof(daddr)); 989 result = nodeid_to_addr(con->nodeid, &daddr, NULL, true); 990 if (result < 0) { 991 log_print("no address for nodeid %d", con->nodeid); 992 goto out; 993 } 994 995 /* Create a socket to communicate with */ 996 result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family, 997 SOCK_STREAM, IPPROTO_SCTP, &sock); 998 if (result < 0) 999 goto socket_err; 1000 1001 sock_set_mark(sock->sk, mark); 1002 1003 con->rx_action = receive_from_sock; 1004 con->connect_action = sctp_connect_to_sock; 1005 add_sock(sock, con); 1006 1007 /* Bind to all addresses. */ 1008 if (sctp_bind_addrs(con, 0)) 1009 goto bind_err; 1010 1011 make_sockaddr(&daddr, dlm_config.ci_tcp_port, &addr_len); 1012 1013 log_print("connecting to %d", con->nodeid); 1014 1015 /* Turn off Nagle's algorithm */ 1016 sctp_sock_set_nodelay(sock->sk); 1017 1018 /* 1019 * Make sock->ops->connect() function return in specified time, 1020 * since O_NONBLOCK argument in connect() function does not work here, 1021 * then, we should restore the default value of this attribute. 
1022 */ 1023 sock_set_sndtimeo(sock->sk, 5); 1024 result = sock->ops->connect(sock, (struct sockaddr *)&daddr, addr_len, 1025 0); 1026 sock_set_sndtimeo(sock->sk, 0); 1027 1028 if (result == -EINPROGRESS) 1029 result = 0; 1030 if (result == 0) 1031 goto out; 1032 1033 bind_err: 1034 con->sock = NULL; 1035 sock_release(sock); 1036 1037 socket_err: 1038 /* 1039 * Some errors are fatal and this list might need adjusting. For other 1040 * errors we try again until the max number of retries is reached. 1041 */ 1042 if (result != -EHOSTUNREACH && 1043 result != -ENETUNREACH && 1044 result != -ENETDOWN && 1045 result != -EINVAL && 1046 result != -EPROTONOSUPPORT) { 1047 log_print("connect %d try %d error %d", con->nodeid, 1048 con->retries, result); 1049 mutex_unlock(&con->sock_mutex); 1050 msleep(1000); 1051 lowcomms_connect_sock(con); 1052 return; 1053 } 1054 1055 out: 1056 mutex_unlock(&con->sock_mutex); 1057 } 1058 1059 /* Connect a new socket to its peer */ 1060 static void tcp_connect_to_sock(struct connection *con) 1061 { 1062 struct sockaddr_storage saddr, src_addr; 1063 int addr_len; 1064 struct socket *sock = NULL; 1065 unsigned int mark; 1066 int result; 1067 1068 if (con->nodeid == 0) { 1069 log_print("attempt to connect sock 0 foiled"); 1070 return; 1071 } 1072 1073 dlm_comm_mark(con->nodeid, &mark); 1074 1075 mutex_lock(&con->sock_mutex); 1076 if (con->retries++ > MAX_CONNECT_RETRIES) 1077 goto out; 1078 1079 /* Some odd races can cause double-connects, ignore them */ 1080 if (con->sock) 1081 goto out; 1082 1083 /* Create a socket to communicate with */ 1084 result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family, 1085 SOCK_STREAM, IPPROTO_TCP, &sock); 1086 if (result < 0) 1087 goto out_err; 1088 1089 sock_set_mark(sock->sk, mark); 1090 1091 memset(&saddr, 0, sizeof(saddr)); 1092 result = nodeid_to_addr(con->nodeid, &saddr, NULL, false); 1093 if (result < 0) { 1094 log_print("no address for nodeid %d", con->nodeid); 1095 goto out_err; 1096 } 1097 
1098 con->rx_action = receive_from_sock; 1099 con->connect_action = tcp_connect_to_sock; 1100 con->shutdown_action = dlm_tcp_shutdown; 1101 add_sock(sock, con); 1102 1103 /* Bind to our cluster-known address connecting to avoid 1104 routing problems */ 1105 memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr)); 1106 make_sockaddr(&src_addr, 0, &addr_len); 1107 result = sock->ops->bind(sock, (struct sockaddr *) &src_addr, 1108 addr_len); 1109 if (result < 0) { 1110 log_print("could not bind for connect: %d", result); 1111 /* This *may* not indicate a critical error */ 1112 } 1113 1114 make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len); 1115 1116 log_print("connecting to %d", con->nodeid); 1117 1118 /* Turn off Nagle's algorithm */ 1119 tcp_sock_set_nodelay(sock->sk); 1120 1121 result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len, 1122 O_NONBLOCK); 1123 if (result == -EINPROGRESS) 1124 result = 0; 1125 if (result == 0) 1126 goto out; 1127 1128 out_err: 1129 if (con->sock) { 1130 sock_release(con->sock); 1131 con->sock = NULL; 1132 } else if (sock) { 1133 sock_release(sock); 1134 } 1135 /* 1136 * Some errors are fatal and this list might need adjusting. For other 1137 * errors we try again until the max number of retries is reached. 
1138 */ 1139 if (result != -EHOSTUNREACH && 1140 result != -ENETUNREACH && 1141 result != -ENETDOWN && 1142 result != -EINVAL && 1143 result != -EPROTONOSUPPORT) { 1144 log_print("connect %d try %d error %d", con->nodeid, 1145 con->retries, result); 1146 mutex_unlock(&con->sock_mutex); 1147 msleep(1000); 1148 lowcomms_connect_sock(con); 1149 return; 1150 } 1151 out: 1152 mutex_unlock(&con->sock_mutex); 1153 return; 1154 } 1155 1156 static struct socket *tcp_create_listen_sock(struct connection *con, 1157 struct sockaddr_storage *saddr) 1158 { 1159 struct socket *sock = NULL; 1160 int result = 0; 1161 int addr_len; 1162 1163 if (dlm_local_addr[0]->ss_family == AF_INET) 1164 addr_len = sizeof(struct sockaddr_in); 1165 else 1166 addr_len = sizeof(struct sockaddr_in6); 1167 1168 /* Create a socket to communicate with */ 1169 result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family, 1170 SOCK_STREAM, IPPROTO_TCP, &sock); 1171 if (result < 0) { 1172 log_print("Can't create listening comms socket"); 1173 goto create_out; 1174 } 1175 1176 sock_set_mark(sock->sk, dlm_config.ci_mark); 1177 1178 /* Turn off Nagle's algorithm */ 1179 tcp_sock_set_nodelay(sock->sk); 1180 1181 sock_set_reuseaddr(sock->sk); 1182 1183 write_lock_bh(&sock->sk->sk_callback_lock); 1184 sock->sk->sk_user_data = con; 1185 save_listen_callbacks(sock); 1186 con->rx_action = accept_from_sock; 1187 con->connect_action = tcp_connect_to_sock; 1188 write_unlock_bh(&sock->sk->sk_callback_lock); 1189 1190 /* Bind to our port */ 1191 make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len); 1192 result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len); 1193 if (result < 0) { 1194 log_print("Can't bind to port %d", dlm_config.ci_tcp_port); 1195 sock_release(sock); 1196 sock = NULL; 1197 con->sock = NULL; 1198 goto create_out; 1199 } 1200 sock_set_keepalive(sock->sk); 1201 1202 result = sock->ops->listen(sock, 5); 1203 if (result < 0) { 1204 log_print("Can't listen on port %d", 
dlm_config.ci_tcp_port); 1205 sock_release(sock); 1206 sock = NULL; 1207 goto create_out; 1208 } 1209 1210 create_out: 1211 return sock; 1212 } 1213 1214 /* Get local addresses */ 1215 static void init_local(void) 1216 { 1217 struct sockaddr_storage sas, *addr; 1218 int i; 1219 1220 dlm_local_count = 0; 1221 for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) { 1222 if (dlm_our_addr(&sas, i)) 1223 break; 1224 1225 addr = kmemdup(&sas, sizeof(*addr), GFP_NOFS); 1226 if (!addr) 1227 break; 1228 dlm_local_addr[dlm_local_count++] = addr; 1229 } 1230 } 1231 1232 static void deinit_local(void) 1233 { 1234 int i; 1235 1236 for (i = 0; i < dlm_local_count; i++) 1237 kfree(dlm_local_addr[i]); 1238 } 1239 1240 /* Initialise SCTP socket and bind to all interfaces */ 1241 static int sctp_listen_for_all(void) 1242 { 1243 struct socket *sock = NULL; 1244 int result = -EINVAL; 1245 struct connection *con = nodeid2con(0, GFP_NOFS); 1246 1247 if (!con) 1248 return -ENOMEM; 1249 1250 log_print("Using SCTP for communications"); 1251 1252 result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family, 1253 SOCK_STREAM, IPPROTO_SCTP, &sock); 1254 if (result < 0) { 1255 log_print("Can't create comms socket, check SCTP is loaded"); 1256 goto out; 1257 } 1258 1259 sock_set_rcvbuf(sock->sk, NEEDED_RMEM); 1260 sock_set_mark(sock->sk, dlm_config.ci_mark); 1261 sctp_sock_set_nodelay(sock->sk); 1262 1263 write_lock_bh(&sock->sk->sk_callback_lock); 1264 /* Init con struct */ 1265 sock->sk->sk_user_data = con; 1266 save_listen_callbacks(sock); 1267 con->sock = sock; 1268 con->sock->sk->sk_data_ready = lowcomms_data_ready; 1269 con->rx_action = accept_from_sock; 1270 con->connect_action = sctp_connect_to_sock; 1271 1272 write_unlock_bh(&sock->sk->sk_callback_lock); 1273 1274 /* Bind to all addresses. 
*/ 1275 if (sctp_bind_addrs(con, dlm_config.ci_tcp_port)) 1276 goto create_delsock; 1277 1278 result = sock->ops->listen(sock, 5); 1279 if (result < 0) { 1280 log_print("Can't set socket listening"); 1281 goto create_delsock; 1282 } 1283 1284 return 0; 1285 1286 create_delsock: 1287 sock_release(sock); 1288 con->sock = NULL; 1289 out: 1290 return result; 1291 } 1292 1293 static int tcp_listen_for_all(void) 1294 { 1295 struct socket *sock = NULL; 1296 struct connection *con = nodeid2con(0, GFP_NOFS); 1297 int result = -EINVAL; 1298 1299 if (!con) 1300 return -ENOMEM; 1301 1302 /* We don't support multi-homed hosts */ 1303 if (dlm_local_addr[1] != NULL) { 1304 log_print("TCP protocol can't handle multi-homed hosts, " 1305 "try SCTP"); 1306 return -EINVAL; 1307 } 1308 1309 log_print("Using TCP for communications"); 1310 1311 sock = tcp_create_listen_sock(con, dlm_local_addr[0]); 1312 if (sock) { 1313 add_sock(sock, con); 1314 result = 0; 1315 } 1316 else { 1317 result = -EADDRINUSE; 1318 } 1319 1320 return result; 1321 } 1322 1323 1324 1325 static struct writequeue_entry *new_writequeue_entry(struct connection *con, 1326 gfp_t allocation) 1327 { 1328 struct writequeue_entry *entry; 1329 1330 entry = kmalloc(sizeof(struct writequeue_entry), allocation); 1331 if (!entry) 1332 return NULL; 1333 1334 entry->page = alloc_page(allocation); 1335 if (!entry->page) { 1336 kfree(entry); 1337 return NULL; 1338 } 1339 1340 entry->offset = 0; 1341 entry->len = 0; 1342 entry->end = 0; 1343 entry->users = 0; 1344 entry->con = con; 1345 1346 return entry; 1347 } 1348 1349 void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc) 1350 { 1351 struct connection *con; 1352 struct writequeue_entry *e; 1353 int offset = 0; 1354 1355 con = nodeid2con(nodeid, allocation); 1356 if (!con) 1357 return NULL; 1358 1359 spin_lock(&con->writequeue_lock); 1360 e = list_entry(con->writequeue.prev, struct writequeue_entry, list); 1361 if ((&e->list == &con->writequeue) || 1362 
(PAGE_SIZE - e->end < len)) { 1363 e = NULL; 1364 } else { 1365 offset = e->end; 1366 e->end += len; 1367 e->users++; 1368 } 1369 spin_unlock(&con->writequeue_lock); 1370 1371 if (e) { 1372 got_one: 1373 *ppc = page_address(e->page) + offset; 1374 return e; 1375 } 1376 1377 e = new_writequeue_entry(con, allocation); 1378 if (e) { 1379 spin_lock(&con->writequeue_lock); 1380 offset = e->end; 1381 e->end += len; 1382 e->users++; 1383 list_add_tail(&e->list, &con->writequeue); 1384 spin_unlock(&con->writequeue_lock); 1385 goto got_one; 1386 } 1387 return NULL; 1388 } 1389 1390 void dlm_lowcomms_commit_buffer(void *mh) 1391 { 1392 struct writequeue_entry *e = (struct writequeue_entry *)mh; 1393 struct connection *con = e->con; 1394 int users; 1395 1396 spin_lock(&con->writequeue_lock); 1397 users = --e->users; 1398 if (users) 1399 goto out; 1400 e->len = e->end - e->offset; 1401 spin_unlock(&con->writequeue_lock); 1402 1403 queue_work(send_workqueue, &con->swork); 1404 return; 1405 1406 out: 1407 spin_unlock(&con->writequeue_lock); 1408 return; 1409 } 1410 1411 /* Send a message */ 1412 static void send_to_sock(struct connection *con) 1413 { 1414 int ret = 0; 1415 const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; 1416 struct writequeue_entry *e; 1417 int len, offset; 1418 int count = 0; 1419 1420 mutex_lock(&con->sock_mutex); 1421 if (con->sock == NULL) 1422 goto out_connect; 1423 1424 spin_lock(&con->writequeue_lock); 1425 for (;;) { 1426 e = list_entry(con->writequeue.next, struct writequeue_entry, 1427 list); 1428 if ((struct list_head *) e == &con->writequeue) 1429 break; 1430 1431 len = e->len; 1432 offset = e->offset; 1433 BUG_ON(len == 0 && e->users == 0); 1434 spin_unlock(&con->writequeue_lock); 1435 1436 ret = 0; 1437 if (len) { 1438 ret = kernel_sendpage(con->sock, e->page, offset, len, 1439 msg_flags); 1440 if (ret == -EAGAIN || ret == 0) { 1441 if (ret == -EAGAIN && 1442 test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) && 1443 
!test_and_set_bit(CF_APP_LIMITED, &con->flags)) { 1444 /* Notify TCP that we're limited by the 1445 * application window size. 1446 */ 1447 set_bit(SOCK_NOSPACE, &con->sock->flags); 1448 con->sock->sk->sk_write_pending++; 1449 } 1450 cond_resched(); 1451 goto out; 1452 } else if (ret < 0) 1453 goto send_error; 1454 } 1455 1456 /* Don't starve people filling buffers */ 1457 if (++count >= MAX_SEND_MSG_COUNT) { 1458 cond_resched(); 1459 count = 0; 1460 } 1461 1462 spin_lock(&con->writequeue_lock); 1463 writequeue_entry_complete(e, ret); 1464 } 1465 spin_unlock(&con->writequeue_lock); 1466 out: 1467 mutex_unlock(&con->sock_mutex); 1468 return; 1469 1470 send_error: 1471 mutex_unlock(&con->sock_mutex); 1472 close_connection(con, false, false, true); 1473 /* Requeue the send work. When the work daemon runs again, it will try 1474 a new connection, then call this function again. */ 1475 queue_work(send_workqueue, &con->swork); 1476 return; 1477 1478 out_connect: 1479 mutex_unlock(&con->sock_mutex); 1480 queue_work(send_workqueue, &con->swork); 1481 cond_resched(); 1482 } 1483 1484 static void clean_one_writequeue(struct connection *con) 1485 { 1486 struct writequeue_entry *e, *safe; 1487 1488 spin_lock(&con->writequeue_lock); 1489 list_for_each_entry_safe(e, safe, &con->writequeue, list) { 1490 list_del(&e->list); 1491 free_entry(e); 1492 } 1493 spin_unlock(&con->writequeue_lock); 1494 } 1495 1496 /* Called from recovery when it knows that a node has 1497 left the cluster */ 1498 int dlm_lowcomms_close(int nodeid) 1499 { 1500 struct connection *con; 1501 struct dlm_node_addr *na; 1502 1503 log_print("closing connection to node %d", nodeid); 1504 con = nodeid2con(nodeid, 0); 1505 if (con) { 1506 set_bit(CF_CLOSE, &con->flags); 1507 close_connection(con, true, true, true); 1508 clean_one_writequeue(con); 1509 } 1510 1511 spin_lock(&dlm_node_addrs_spin); 1512 na = find_node_addr(nodeid); 1513 if (na) { 1514 list_del(&na->list); 1515 while (na->addr_count--) 1516 
kfree(na->addr[na->addr_count]); 1517 kfree(na); 1518 } 1519 spin_unlock(&dlm_node_addrs_spin); 1520 1521 return 0; 1522 } 1523 1524 /* Receive workqueue function */ 1525 static void process_recv_sockets(struct work_struct *work) 1526 { 1527 struct connection *con = container_of(work, struct connection, rwork); 1528 int err; 1529 1530 clear_bit(CF_READ_PENDING, &con->flags); 1531 do { 1532 err = con->rx_action(con); 1533 } while (!err); 1534 } 1535 1536 /* Send workqueue function */ 1537 static void process_send_sockets(struct work_struct *work) 1538 { 1539 struct connection *con = container_of(work, struct connection, swork); 1540 1541 clear_bit(CF_WRITE_PENDING, &con->flags); 1542 if (con->sock == NULL) /* not mutex protected so check it inside too */ 1543 con->connect_action(con); 1544 if (!list_empty(&con->writequeue)) 1545 send_to_sock(con); 1546 } 1547 1548 static void work_stop(void) 1549 { 1550 if (recv_workqueue) 1551 destroy_workqueue(recv_workqueue); 1552 if (send_workqueue) 1553 destroy_workqueue(send_workqueue); 1554 } 1555 1556 static int work_start(void) 1557 { 1558 recv_workqueue = alloc_workqueue("dlm_recv", 1559 WQ_UNBOUND | WQ_MEM_RECLAIM, 1); 1560 if (!recv_workqueue) { 1561 log_print("can't start dlm_recv"); 1562 return -ENOMEM; 1563 } 1564 1565 send_workqueue = alloc_workqueue("dlm_send", 1566 WQ_UNBOUND | WQ_MEM_RECLAIM, 1); 1567 if (!send_workqueue) { 1568 log_print("can't start dlm_send"); 1569 destroy_workqueue(recv_workqueue); 1570 return -ENOMEM; 1571 } 1572 1573 return 0; 1574 } 1575 1576 static void _stop_conn(struct connection *con, bool and_other) 1577 { 1578 mutex_lock(&con->sock_mutex); 1579 set_bit(CF_CLOSE, &con->flags); 1580 set_bit(CF_READ_PENDING, &con->flags); 1581 set_bit(CF_WRITE_PENDING, &con->flags); 1582 if (con->sock && con->sock->sk) { 1583 write_lock_bh(&con->sock->sk->sk_callback_lock); 1584 con->sock->sk->sk_user_data = NULL; 1585 write_unlock_bh(&con->sock->sk->sk_callback_lock); 1586 } 1587 if (con->othercon && 
and_other) 1588 _stop_conn(con->othercon, false); 1589 mutex_unlock(&con->sock_mutex); 1590 } 1591 1592 static void stop_conn(struct connection *con) 1593 { 1594 _stop_conn(con, true); 1595 } 1596 1597 static void shutdown_conn(struct connection *con) 1598 { 1599 if (con->shutdown_action) 1600 con->shutdown_action(con); 1601 } 1602 1603 static void connection_release(struct rcu_head *rcu) 1604 { 1605 struct connection *con = container_of(rcu, struct connection, rcu); 1606 1607 kfree(con->rx_buf); 1608 kfree(con); 1609 } 1610 1611 static void free_conn(struct connection *con) 1612 { 1613 close_connection(con, true, true, true); 1614 spin_lock(&connections_lock); 1615 hlist_del_rcu(&con->list); 1616 spin_unlock(&connections_lock); 1617 if (con->othercon) { 1618 clean_one_writequeue(con->othercon); 1619 call_rcu(&con->othercon->rcu, connection_release); 1620 } 1621 clean_one_writequeue(con); 1622 call_rcu(&con->rcu, connection_release); 1623 } 1624 1625 static void work_flush(void) 1626 { 1627 int ok, idx; 1628 int i; 1629 struct connection *con; 1630 1631 do { 1632 ok = 1; 1633 foreach_conn(stop_conn); 1634 if (recv_workqueue) 1635 flush_workqueue(recv_workqueue); 1636 if (send_workqueue) 1637 flush_workqueue(send_workqueue); 1638 idx = srcu_read_lock(&connections_srcu); 1639 for (i = 0; i < CONN_HASH_SIZE && ok; i++) { 1640 hlist_for_each_entry_rcu(con, &connection_hash[i], 1641 list) { 1642 ok &= test_bit(CF_READ_PENDING, &con->flags); 1643 ok &= test_bit(CF_WRITE_PENDING, &con->flags); 1644 if (con->othercon) { 1645 ok &= test_bit(CF_READ_PENDING, 1646 &con->othercon->flags); 1647 ok &= test_bit(CF_WRITE_PENDING, 1648 &con->othercon->flags); 1649 } 1650 } 1651 } 1652 srcu_read_unlock(&connections_srcu, idx); 1653 } while (!ok); 1654 } 1655 1656 void dlm_lowcomms_stop(void) 1657 { 1658 /* Set all the flags to prevent any 1659 socket activity. 
1660 */ 1661 dlm_allow_conn = 0; 1662 1663 if (recv_workqueue) 1664 flush_workqueue(recv_workqueue); 1665 if (send_workqueue) 1666 flush_workqueue(send_workqueue); 1667 1668 foreach_conn(shutdown_conn); 1669 work_flush(); 1670 foreach_conn(free_conn); 1671 work_stop(); 1672 deinit_local(); 1673 } 1674 1675 int dlm_lowcomms_start(void) 1676 { 1677 int error = -EINVAL; 1678 struct connection *con; 1679 int i; 1680 1681 for (i = 0; i < CONN_HASH_SIZE; i++) 1682 INIT_HLIST_HEAD(&connection_hash[i]); 1683 1684 init_local(); 1685 if (!dlm_local_count) { 1686 error = -ENOTCONN; 1687 log_print("no local IP address has been set"); 1688 goto fail; 1689 } 1690 1691 error = work_start(); 1692 if (error) 1693 goto fail; 1694 1695 dlm_allow_conn = 1; 1696 1697 /* Start listening */ 1698 if (dlm_config.ci_protocol == 0) 1699 error = tcp_listen_for_all(); 1700 else 1701 error = sctp_listen_for_all(); 1702 if (error) 1703 goto fail_unlisten; 1704 1705 return 0; 1706 1707 fail_unlisten: 1708 dlm_allow_conn = 0; 1709 con = nodeid2con(0,0); 1710 if (con) 1711 free_conn(con); 1712 fail: 1713 return error; 1714 } 1715 1716 void dlm_lowcomms_exit(void) 1717 { 1718 struct dlm_node_addr *na, *safe; 1719 1720 spin_lock(&dlm_node_addrs_spin); 1721 list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) { 1722 list_del(&na->list); 1723 while (na->addr_count--) 1724 kfree(na->addr[na->addr_count]); 1725 kfree(na); 1726 } 1727 spin_unlock(&dlm_node_addrs_spin); 1728 } 1729