// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

/*
 * lowcomms.c
 *
 * This is the "low-level" comms layer.
 *
 * It is responsible for sending/receiving messages
 * from other nodes in the cluster.
 *
 * Cluster nodes are referred to by their nodeids. nodeids are
 * simply 32-bit numbers to the locking module - if they need to
 * be expanded for the cluster infrastructure then that is its
 * responsibility. It is this layer's
 * responsibility to resolve these into IP addresses or
 * whatever it needs for inter-node communication.
 *
 * The comms level consists of two kernel threads that deal mainly with
 * the receiving of messages from other nodes and passing them
 * up to the mid-level comms layer (which understands the
 * message format) for execution by the locking core, and
 * a send thread which does all the setting up of connections
 * to remote nodes and the sending of data. Threads are not allowed
 * to send their own data because it may cause them to wait in times
 * of high load. Also, this way, the sending thread can collect together
 * messages bound for one node and send them in one block.
 *
 * lowcomms will choose to use either TCP or SCTP as its transport layer
 * depending on the configuration variable 'protocol'. This should be set
 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
 * cluster-wide mechanism as it must be the same on all nodes of the cluster
 * for the DLM to function.
 */
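
/*
 * In practice 'protocol' is set cluster-wide by the user-space cluster
 * manager through dlm's configfs interface (see config.c); the path is
 * typically /sys/kernel/config/dlm/cluster/protocol.
 */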

#include <asm/ioctls.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <linux/pagemap.h>
#include <linux/file.h>
#include <linux/mutex.h>
#include <linux/sctp.h>
#include <linux/slab.h>
#include <net/sctp/sctp.h>
#include <net/ipv6.h>

#include "dlm_internal.h"
#include "lowcomms.h"
#include "midcomms.h"
#include "config.h"

#define NEEDED_RMEM (4*1024*1024)
#define CONN_HASH_SIZE 32

/* Number of messages to send before rescheduling */
#define MAX_SEND_MSG_COUNT 25
#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000)

struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* So we know who we are in the list */
	struct mutex sock_mutex;
	unsigned long flags;
#define CF_READ_PENDING 1
#define CF_WRITE_PENDING 2
#define CF_INIT_PENDING 4
#define CF_IS_OTHERCON 5
#define CF_CLOSE 6
#define CF_APP_LIMITED 7
#define CF_CLOSING 8
#define CF_SHUTDOWN 9
	struct list_head writequeue;  /* List of outgoing writequeue_entries */
	spinlock_t writequeue_lock;
	int (*rx_action) (struct connection *);	/* What to do when active */
	void (*connect_action) (struct connection *);	/* What to do to connect */
	void (*shutdown_action)(struct connection *con); /* What to do to shutdown */
	int retries;
#define MAX_CONNECT_RETRIES 3
	struct hlist_node list;
	struct connection *othercon;
	struct work_struct rwork; /* Receive workqueue */
	struct work_struct swork; /* Send workqueue */
	wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */
	unsigned char *rx_buf;
	int rx_buflen;
	int rx_leftover;
	struct rcu_head rcu;
};
#define sock2con(x) ((struct connection *)(x)->sk_user_data)

/* An entry waiting to be sent */
struct writequeue_entry {
	struct list_head list;
	struct page *page;
	int offset;
	int len;
	int end;
	int users;
	struct connection *con;
};

struct dlm_node_addr {
	struct list_head list;
	int nodeid;
	int addr_count;
	int curr_addr_index;
	struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
};

static struct listen_sock_callbacks {
	void (*sk_error_report)(struct sock *);
	void (*sk_data_ready)(struct sock *);
	void (*sk_state_change)(struct sock *);
	void (*sk_write_space)(struct sock *);
} listen_sock;

static LIST_HEAD(dlm_node_addrs);
static DEFINE_SPINLOCK(dlm_node_addrs_spin);

static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;
static int dlm_allow_conn;

/* Work queues */
static struct workqueue_struct *recv_workqueue;
static struct workqueue_struct *send_workqueue;

static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_SPINLOCK(connections_lock);
DEFINE_STATIC_SRCU(connections_srcu);

static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);


/* This is deliberately very simple because most clusters have simple
   sequential nodeids, so we should be able to go straight to a connection
   struct in the array */
static inline int nodeid_hash(int nodeid)
{
	return nodeid & (CONN_HASH_SIZE-1);
}
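
/*
 * Illustrative only: with CONN_HASH_SIZE == 32 the hash is the low five
 * bits of the nodeid, so nodeid 5 and nodeid 37 (37 & 31 == 5) share
 * bucket 5, while sequential nodeids 1..31 each get their own bucket.
 */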

static struct connection *__find_con(int nodeid)
{
	int r, idx;
	struct connection *con;

	r = nodeid_hash(nodeid);

	idx = srcu_read_lock(&connections_srcu);
	hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
		if (con->nodeid == nodeid) {
			srcu_read_unlock(&connections_srcu, idx);
			return con;
		}
	}
	srcu_read_unlock(&connections_srcu, idx);

	return NULL;
}

/*
 * If 'alloc' is zero then we don't attempt to create a new
 * connection structure for this node.
 */
static struct connection *nodeid2con(int nodeid, gfp_t alloc)
{
	struct connection *con, *tmp;
	int r;

	con = __find_con(nodeid);
	if (con || !alloc)
		return con;

	con = kzalloc(sizeof(*con), alloc);
	if (!con)
		return NULL;

	con->rx_buflen = dlm_config.ci_buffer_size;
	con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS);
	if (!con->rx_buf) {
		kfree(con);
		return NULL;
	}

	con->nodeid = nodeid;
	mutex_init(&con->sock_mutex);
	INIT_LIST_HEAD(&con->writequeue);
	spin_lock_init(&con->writequeue_lock);
	INIT_WORK(&con->swork, process_send_sockets);
	INIT_WORK(&con->rwork, process_recv_sockets);
	init_waitqueue_head(&con->shutdown_wait);

	/* Setup action pointers for child sockets */
	if (con->nodeid) {
		struct connection *zerocon = __find_con(0);

		con->connect_action = zerocon->connect_action;
		if (!con->rx_action)
			con->rx_action = zerocon->rx_action;
	}

	r = nodeid_hash(nodeid);

	spin_lock(&connections_lock);
	/* Because multiple workqueues/threads call this function it can
	 * race on multiple CPUs. Instead of locking the hot path in
	 * __find_con() we just check again, under protection of
	 * connections_lock, for the rare case of a recently added node.
	 * If one is found we abort our connection creation and return
	 * the existing connection.
	 */
	tmp = __find_con(nodeid);
	if (tmp) {
		spin_unlock(&connections_lock);
		kfree(con->rx_buf);
		kfree(con);
		return tmp;
	}

	hlist_add_head_rcu(&con->list, &connection_hash[r]);
	spin_unlock(&connections_lock);

	return con;
}

/* Loop round all connections */
static void foreach_conn(void (*conn_func)(struct connection *c))
{
	int i, idx;
	struct connection *con;

	idx = srcu_read_lock(&connections_srcu);
	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry_rcu(con, &connection_hash[i], list)
			conn_func(con);
	}
	srcu_read_unlock(&connections_srcu, idx);
}

static struct dlm_node_addr *find_node_addr(int nodeid)
{
	struct dlm_node_addr *na;

	list_for_each_entry(na, &dlm_node_addrs, list) {
		if (na->nodeid == nodeid)
			return na;
	}
	return NULL;
}

static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y)
{
	switch (x->ss_family) {
	case AF_INET: {
		struct sockaddr_in *sinx = (struct sockaddr_in *)x;
		struct sockaddr_in *siny = (struct sockaddr_in *)y;
		if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
			return 0;
		if (sinx->sin_port != siny->sin_port)
			return 0;
		break;
	}
	case AF_INET6: {
		struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
		struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
		if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
			return 0;
		if (sinx->sin6_port != siny->sin6_port)
			return 0;
		break;
	}
	default:
		return 0;
	}
	return 1;
}
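
/*
 * Resolve a nodeid to one of its configured addresses. When try_new_addr
 * is set, curr_addr_index is advanced so that successive connect attempts
 * rotate round-robin through all addresses known for the node.
 */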
static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
			  struct sockaddr *sa_out, bool try_new_addr)
{
	struct sockaddr_storage sas;
	struct dlm_node_addr *na;

	if (!dlm_local_count)
		return -1;

	spin_lock(&dlm_node_addrs_spin);
	na = find_node_addr(nodeid);
	if (na && na->addr_count) {
		memcpy(&sas, na->addr[na->curr_addr_index],
		       sizeof(struct sockaddr_storage));

		if (try_new_addr) {
			na->curr_addr_index++;
			if (na->curr_addr_index == na->addr_count)
				na->curr_addr_index = 0;
		}
	}
	spin_unlock(&dlm_node_addrs_spin);

	if (!na)
		return -EEXIST;

	if (!na->addr_count)
		return -ENOENT;

	if (sas_out)
		memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));

	if (!sa_out)
		return 0;

	if (dlm_local_addr[0]->ss_family == AF_INET) {
		struct sockaddr_in *in4 = (struct sockaddr_in *) &sas;
		struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
		ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
	} else {
		struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &sas;
		struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
		ret6->sin6_addr = in6->sin6_addr;
	}

	return 0;
}

static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid)
{
	struct dlm_node_addr *na;
	int rv = -EEXIST;
	int addr_i;

	spin_lock(&dlm_node_addrs_spin);
	list_for_each_entry(na, &dlm_node_addrs, list) {
		if (!na->addr_count)
			continue;

		for (addr_i = 0; addr_i < na->addr_count; addr_i++) {
			if (addr_compare(na->addr[addr_i], addr)) {
				*nodeid = na->nodeid;
				rv = 0;
				goto unlock;
			}
		}
	}
unlock:
	spin_unlock(&dlm_node_addrs_spin);
	return rv;
}

int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
{
	struct sockaddr_storage *new_addr;
	struct dlm_node_addr *new_node, *na;

	new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS);
	if (!new_node)
		return -ENOMEM;

	new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS);
	if (!new_addr) {
		kfree(new_node);
		return -ENOMEM;
	}

	memcpy(new_addr, addr, len);

	spin_lock(&dlm_node_addrs_spin);
	na = find_node_addr(nodeid);
	if (!na) {
		new_node->nodeid = nodeid;
		new_node->addr[0] = new_addr;
		new_node->addr_count = 1;
		list_add(&new_node->list, &dlm_node_addrs);
		spin_unlock(&dlm_node_addrs_spin);
		return 0;
	}

	if (na->addr_count >= DLM_MAX_ADDR_COUNT) {
		spin_unlock(&dlm_node_addrs_spin);
		kfree(new_addr);
		kfree(new_node);
		return -ENOSPC;
	}

	na->addr[na->addr_count++] = new_addr;
	spin_unlock(&dlm_node_addrs_spin);
	kfree(new_node);
	return 0;
}

/* Data available on socket, or the listen socket received a connect */
static void lowcomms_data_ready(struct sock *sk)
{
	struct connection *con;

	read_lock_bh(&sk->sk_callback_lock);
	con = sock2con(sk);
	if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
		queue_work(recv_workqueue, &con->rwork);
	read_unlock_bh(&sk->sk_callback_lock);
}

static void lowcomms_write_space(struct sock *sk)
{
	struct connection *con;

	read_lock_bh(&sk->sk_callback_lock);
	con = sock2con(sk);
	if (!con)
		goto out;

	clear_bit(SOCK_NOSPACE, &con->sock->flags);

	if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
		con->sock->sk->sk_write_pending--;
		clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
	}

	queue_work(send_workqueue, &con->swork);
out:
	read_unlock_bh(&sk->sk_callback_lock);
}
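
/*
 * The lowcomms_* socket callbacks here are invoked from the networking
 * stack, typically in softirq context, so they do as little as possible:
 * set a flag and queue work. The actual send and receive processing
 * happens on the workqueues in process context.
 */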

static inline void lowcomms_connect_sock(struct connection *con)
{
	if (test_bit(CF_CLOSE, &con->flags))
		return;
	queue_work(send_workqueue, &con->swork);
	cond_resched();
}

static void lowcomms_state_change(struct sock *sk)
{
	/* SCTP layer is not calling sk_data_ready when the connection
	 * is done, so we catch the signal through here. Also, it
	 * doesn't switch socket state when entering shutdown, so we
	 * skip the write in that case.
	 */
	if (sk->sk_shutdown) {
		if (sk->sk_shutdown == RCV_SHUTDOWN)
			lowcomms_data_ready(sk);
	} else if (sk->sk_state == TCP_ESTABLISHED) {
		lowcomms_write_space(sk);
	}
}

int dlm_lowcomms_connect_node(int nodeid)
{
	struct connection *con;

	if (nodeid == dlm_our_nodeid())
		return 0;

	con = nodeid2con(nodeid, GFP_NOFS);
	if (!con)
		return -ENOMEM;
	lowcomms_connect_sock(con);
	return 0;
}

static void lowcomms_error_report(struct sock *sk)
{
	struct connection *con;
	struct sockaddr_storage saddr;
	void (*orig_report)(struct sock *) = NULL;

	read_lock_bh(&sk->sk_callback_lock);
	con = sock2con(sk);
	if (con == NULL)
		goto out;

	orig_report = listen_sock.sk_error_report;
	if (con->sock == NULL ||
	    kernel_getpeername(con->sock, (struct sockaddr *)&saddr) < 0) {
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d, port %d, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, dlm_config.ci_tcp_port,
				   sk->sk_err, sk->sk_err_soft);
	} else if (saddr.ss_family == AF_INET) {
		struct sockaddr_in *sin4 = (struct sockaddr_in *)&saddr;

		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI4, port %d, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &sin4->sin_addr.s_addr,
				   dlm_config.ci_tcp_port, sk->sk_err,
				   sk->sk_err_soft);
	} else {
		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&saddr;

		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %u.%u.%u.%u, "
				   "port %d, sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, sin6->sin6_addr.s6_addr32[0],
				   sin6->sin6_addr.s6_addr32[1],
				   sin6->sin6_addr.s6_addr32[2],
				   sin6->sin6_addr.s6_addr32[3],
				   dlm_config.ci_tcp_port, sk->sk_err,
				   sk->sk_err_soft);
	}
out:
	read_unlock_bh(&sk->sk_callback_lock);
	if (orig_report)
		orig_report(sk);
}

/* Note: sk_callback_lock must be locked before calling this function. */
static void save_listen_callbacks(struct socket *sock)
{
	struct sock *sk = sock->sk;

	listen_sock.sk_data_ready = sk->sk_data_ready;
	listen_sock.sk_state_change = sk->sk_state_change;
	listen_sock.sk_write_space = sk->sk_write_space;
	listen_sock.sk_error_report = sk->sk_error_report;
}

static void restore_callbacks(struct socket *sock)
{
	struct sock *sk = sock->sk;

	write_lock_bh(&sk->sk_callback_lock);
	sk->sk_user_data = NULL;
	sk->sk_data_ready = listen_sock.sk_data_ready;
	sk->sk_state_change = listen_sock.sk_state_change;
	sk->sk_write_space = listen_sock.sk_write_space;
	sk->sk_error_report = listen_sock.sk_error_report;
	write_unlock_bh(&sk->sk_callback_lock);
}

/* Make a socket active */
static void add_sock(struct socket *sock, struct connection *con)
{
	struct sock *sk = sock->sk;

	write_lock_bh(&sk->sk_callback_lock);
	con->sock = sock;

	sk->sk_user_data = con;
	/* Install a data_ready callback */
	sk->sk_data_ready = lowcomms_data_ready;
	sk->sk_write_space = lowcomms_write_space;
	sk->sk_state_change = lowcomms_state_change;
	sk->sk_allocation = GFP_NOFS;
	sk->sk_error_report = lowcomms_error_report;
	write_unlock_bh(&sk->sk_callback_lock);
}

/* Add the port number to an IPv4 or IPv6 sockaddr and return the address
   length */
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
			  int *addr_len)
{
	saddr->ss_family = dlm_local_addr[0]->ss_family;
	if (saddr->ss_family == AF_INET) {
		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
		in4_addr->sin_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in);
		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
	} else {
		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
		in6_addr->sin6_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in6);
	}
	memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
}
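
/*
 * For example, with an IPv4 local address make_sockaddr() writes the port
 * in network byte order (cpu_to_be16()), reports an address length of
 * sizeof(struct sockaddr_in), and zeroes the unused tail of the storage.
 */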

/* Close a remote connection and tidy up */
static void close_connection(struct connection *con, bool and_other,
			     bool tx, bool rx)
{
	bool closing = test_and_set_bit(CF_CLOSING, &con->flags);

	if (tx && !closing && cancel_work_sync(&con->swork)) {
		log_print("canceled swork for node %d", con->nodeid);
		clear_bit(CF_WRITE_PENDING, &con->flags);
	}
	if (rx && !closing && cancel_work_sync(&con->rwork)) {
		log_print("canceled rwork for node %d", con->nodeid);
		clear_bit(CF_READ_PENDING, &con->flags);
	}

	mutex_lock(&con->sock_mutex);
	if (con->sock) {
		restore_callbacks(con->sock);
		sock_release(con->sock);
		con->sock = NULL;
	}
	if (con->othercon && and_other) {
		/* Will only re-enter once. */
		close_connection(con->othercon, false, true, true);
	}

	con->rx_leftover = 0;
	con->retries = 0;
	mutex_unlock(&con->sock_mutex);
	clear_bit(CF_CLOSING, &con->flags);
}

static void shutdown_connection(struct connection *con)
{
	int ret;

	if (cancel_work_sync(&con->swork)) {
		log_print("canceled swork for node %d", con->nodeid);
		clear_bit(CF_WRITE_PENDING, &con->flags);
	}

	mutex_lock(&con->sock_mutex);
	/* nothing to shutdown */
	if (!con->sock) {
		mutex_unlock(&con->sock_mutex);
		return;
	}

	set_bit(CF_SHUTDOWN, &con->flags);
	ret = kernel_sock_shutdown(con->sock, SHUT_WR);
	mutex_unlock(&con->sock_mutex);
	if (ret) {
		log_print("Connection %p failed to shutdown: %d will force close",
			  con, ret);
		goto force_close;
	} else {
		ret = wait_event_timeout(con->shutdown_wait,
					 !test_bit(CF_SHUTDOWN, &con->flags),
					 DLM_SHUTDOWN_WAIT_TIMEOUT);
		if (ret == 0) {
			log_print("Connection %p shutdown timed out, will force close",
				  con);
			goto force_close;
		}
	}

	return;

force_close:
	clear_bit(CF_SHUTDOWN, &con->flags);
	close_connection(con, false, true, true);
}

static void dlm_tcp_shutdown(struct connection *con)
{
	if (con->othercon)
		shutdown_connection(con->othercon);
	shutdown_connection(con);
}

static int con_realloc_receive_buf(struct connection *con, int newlen)
{
	unsigned char *newbuf;

	newbuf = kmalloc(newlen, GFP_NOFS);
	if (!newbuf)
		return -ENOMEM;

	/* copy any leftover from the last receive */
	if (con->rx_leftover)
		memmove(newbuf, con->rx_buf, con->rx_leftover);

	/* swap to the new buffer */
	kfree(con->rx_buf);
	con->rx_buflen = newlen;
	con->rx_buf = newbuf;

	return 0;
}
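
/*
 * Receive buffer layout: any bytes of a partial message left over from
 * the previous receive sit at the start of rx_buf, and new data is read
 * in behind them. If recvmsg() returns r bytes while rx_leftover bytes
 * were pending, a buffer of r + rx_leftover bytes is handed to the
 * midcomms layer, and whatever it does not consume is moved back to the
 * front of the buffer for next time.
 */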

/* Data received from remote end */
static int receive_from_sock(struct connection *con)
{
	int call_again_soon = 0;
	struct msghdr msg;
	struct kvec iov;
	int ret, buflen;

	mutex_lock(&con->sock_mutex);

	if (con->sock == NULL) {
		ret = -EAGAIN;
		goto out_close;
	}

	if (con->nodeid == 0) {
		ret = -EINVAL;
		goto out_close;
	}

	/* reallocate if the configured buffer size has changed and the
	 * leftover bytes still fit in the new size
	 */
	buflen = dlm_config.ci_buffer_size;
	if (con->rx_buflen != buflen && con->rx_leftover <= buflen) {
		ret = con_realloc_receive_buf(con, buflen);
		if (ret < 0)
			goto out_resched;
	}

	/* set up the receive window behind any leftover bytes from the
	 * last receive
	 */
	iov.iov_base = con->rx_buf + con->rx_leftover;
	iov.iov_len = con->rx_buflen - con->rx_leftover;

	memset(&msg, 0, sizeof(msg));
	msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
			     msg.msg_flags);
	if (ret <= 0)
		goto out_close;
	else if (ret == iov.iov_len)
		call_again_soon = 1;

	/* total buffer length is the bytes just read plus the leftover
	 * from the last receive
	 */
	buflen = ret + con->rx_leftover;
	ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen);
	if (ret < 0)
		goto out_close;

	/* move any bytes that were not consumed to the beginning of the
	 * receive buffer, so the next receive appends to the partial
	 * message at the start of the buffer
	 */
	con->rx_leftover = buflen - ret;
	if (con->rx_leftover) {
		memmove(con->rx_buf, con->rx_buf + ret,
			con->rx_leftover);
		call_again_soon = 1;
	}

	if (call_again_soon)
		goto out_resched;

	mutex_unlock(&con->sock_mutex);
	return 0;

out_resched:
	if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
		queue_work(recv_workqueue, &con->rwork);
	mutex_unlock(&con->sock_mutex);
	return -EAGAIN;

out_close:
	mutex_unlock(&con->sock_mutex);
	if (ret != -EAGAIN) {
		/* Reconnect when there is something to send */
		close_connection(con, false, true, false);
		if (ret == 0) {
			log_print("connection %p got EOF from %d",
				  con, con->nodeid);
			/* handling for tcp shutdown */
			clear_bit(CF_SHUTDOWN, &con->flags);
			wake_up(&con->shutdown_wait);
			/* make the receive worker break out of its loop */
			ret = -1;
		}
	}
	return ret;
}

/* Listening socket is busy, accept a connection */
static int accept_from_sock(struct connection *con)
{
	int result;
	struct sockaddr_storage peeraddr;
	struct socket *newsock = NULL;
	int len;
	int nodeid;
	struct connection *newcon;
	struct connection *addcon;
	unsigned int mark;

	if (!dlm_allow_conn)
		return -1;

	mutex_lock_nested(&con->sock_mutex, 0);

	if (!con->sock) {
		mutex_unlock(&con->sock_mutex);
		return -ENOTCONN;
	}

	result = kernel_accept(con->sock, &newsock, O_NONBLOCK);
	if (result < 0)
		goto accept_err;

	/* Get the connected socket's peer */
	memset(&peeraddr, 0, sizeof(peeraddr));
	len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2);
	if (len < 0) {
		result = -ECONNABORTED;
		goto accept_err;
	}

	/* Get the new node's NODEID */
	make_sockaddr(&peeraddr, 0, &len);
	if (addr_to_nodeid(&peeraddr, &nodeid)) {
		unsigned char *b = (unsigned char *)&peeraddr;

		log_print("connect from non cluster node");
		print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
				     b, sizeof(struct sockaddr_storage));
		sock_release(newsock);
		mutex_unlock(&con->sock_mutex);
		return -1;
	}

	dlm_comm_mark(nodeid, &mark);
	sock_set_mark(newsock->sk, mark);

	log_print("got connection from %d", nodeid);

	/* Check to see if we already have a connection to this node. This
	 * could happen if the two nodes initiate a connection at roughly
	 * the same time and the connections cross on the wire.
	 * In this case we store the incoming one in "othercon"
	 */
	newcon = nodeid2con(nodeid, GFP_NOFS);
	if (!newcon) {
		result = -ENOMEM;
		goto accept_err;
	}
	mutex_lock_nested(&newcon->sock_mutex, 1);
	if (newcon->sock) {
		struct connection *othercon = newcon->othercon;

		if (!othercon) {
			othercon = kzalloc(sizeof(*othercon), GFP_NOFS);
			if (!othercon) {
				log_print("failed to allocate incoming socket");
				mutex_unlock(&newcon->sock_mutex);
				result = -ENOMEM;
				goto accept_err;
			}

			othercon->rx_buflen = dlm_config.ci_buffer_size;
			othercon->rx_buf = kmalloc(othercon->rx_buflen, GFP_NOFS);
			if (!othercon->rx_buf) {
				mutex_unlock(&newcon->sock_mutex);
				kfree(othercon);
				log_print("failed to allocate incoming socket receive buffer");
				result = -ENOMEM;
				goto accept_err;
			}

			othercon->nodeid = nodeid;
			othercon->rx_action = receive_from_sock;
			mutex_init(&othercon->sock_mutex);
			INIT_LIST_HEAD(&othercon->writequeue);
			spin_lock_init(&othercon->writequeue_lock);
			INIT_WORK(&othercon->swork, process_send_sockets);
			INIT_WORK(&othercon->rwork, process_recv_sockets);
			init_waitqueue_head(&othercon->shutdown_wait);
			set_bit(CF_IS_OTHERCON, &othercon->flags);
		} else {
			/* close other sock con if we have something new */
			close_connection(othercon, false, true, false);
		}

		mutex_lock_nested(&othercon->sock_mutex, 2);
		newcon->othercon = othercon;
		add_sock(newsock, othercon);
		addcon = othercon;
		mutex_unlock(&othercon->sock_mutex);
	} else {
		newcon->rx_action = receive_from_sock;
		/* accept copies the sk after we've saved the callbacks, so we
		   don't want to save them a second time or comm errors will
		   result in calling sk_error_report recursively. */
		add_sock(newsock, newcon);
		addcon = newcon;
	}

	mutex_unlock(&newcon->sock_mutex);

	/*
	 * Add it to the active queue in case we got data
	 * between processing the accept and adding the socket
	 * to the read_sockets list
	 */
	if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
		queue_work(recv_workqueue, &addcon->rwork);
	mutex_unlock(&con->sock_mutex);

	return 0;

accept_err:
	mutex_unlock(&con->sock_mutex);
	if (newsock)
		sock_release(newsock);

	if (result != -EAGAIN)
		log_print("error accepting connection from node: %d", result);
	return result;
}

static void free_entry(struct writequeue_entry *e)
{
	__free_page(e->page);
	kfree(e);
}
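
/*
 * Lifecycle of a writequeue_entry: dlm_lowcomms_get_buffer() hands out
 * page space and increments 'users'; dlm_lowcomms_commit_buffer() drops
 * that reference; the entry is freed once everything on its page has
 * been sent (len == 0) and no writers remain (users == 0).
 */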

/*
 * writequeue_entry_complete - try to delete and free write queue entry
 * @e: write queue entry to try to delete
 * @completed: bytes completed
 *
 * writequeue_lock must be held.
 */
static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
{
	e->offset += completed;
	e->len -= completed;

	if (e->len == 0 && e->users == 0) {
		list_del(&e->list);
		free_entry(e);
	}
}

/*
 * sctp_bind_addrs - bind a SCTP socket to all our addresses
 */
static int sctp_bind_addrs(struct connection *con, uint16_t port)
{
	struct sockaddr_storage localaddr;
	struct sockaddr *addr = (struct sockaddr *)&localaddr;
	int i, addr_len, result = 0;

	for (i = 0; i < dlm_local_count; i++) {
		memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
		make_sockaddr(&localaddr, port, &addr_len);

		if (!i)
			result = kernel_bind(con->sock, addr, addr_len);
		else
			result = sock_bind_add(con->sock->sk, addr, addr_len);

		if (result < 0) {
			log_print("Can't bind to %d addr number %d, %d.\n",
				  port, i + 1, result);
			break;
		}
	}
	return result;
}
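
/*
 * The first local address is bound with kernel_bind(); any further
 * addresses are added to the socket with sock_bind_add(), which is how
 * a multi-homed SCTP endpoint advertises all of its interfaces.
 */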

/* Initiate an SCTP association.
   This is a special case of send_to_sock() in that we don't yet have a
   peeled-off socket for this association, so we use the listening socket
   and add the primary IP address of the remote node.
 */
static void sctp_connect_to_sock(struct connection *con)
{
	struct sockaddr_storage daddr;
	int result;
	int addr_len;
	struct socket *sock;
	unsigned int mark;

	if (con->nodeid == 0) {
		log_print("attempt to connect sock 0 foiled");
		return;
	}

	dlm_comm_mark(con->nodeid, &mark);

	mutex_lock(&con->sock_mutex);

	/* Some odd races can cause double-connects, ignore them */
	if (con->retries++ > MAX_CONNECT_RETRIES)
		goto out;

	if (con->sock) {
		log_print("node %d already connected.", con->nodeid);
		goto out;
	}

	memset(&daddr, 0, sizeof(daddr));
	result = nodeid_to_addr(con->nodeid, &daddr, NULL, true);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		goto out;
	}

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_SCTP, &sock);
	if (result < 0)
		goto socket_err;

	sock_set_mark(sock->sk, mark);

	con->rx_action = receive_from_sock;
	con->connect_action = sctp_connect_to_sock;
	add_sock(sock, con);

	/* Bind to all addresses. */
	if (sctp_bind_addrs(con, 0))
		goto bind_err;

	make_sockaddr(&daddr, dlm_config.ci_tcp_port, &addr_len);

	log_print("connecting to %d", con->nodeid);

	/* Turn off Nagle's algorithm */
	sctp_sock_set_nodelay(sock->sk);

	/*
	 * Make sock->ops->connect() return within a bounded time. The
	 * O_NONBLOCK flag to connect() has no effect here, so set a send
	 * timeout for the connect and restore the default (no timeout)
	 * afterwards.
	 */
	sock_set_sndtimeo(sock->sk, 5);
	result = sock->ops->connect(sock, (struct sockaddr *)&daddr, addr_len,
				    0);
	sock_set_sndtimeo(sock->sk, 0);

	if (result == -EINPROGRESS)
		result = 0;
	if (result == 0)
		goto out;

bind_err:
	con->sock = NULL;
	sock_release(sock);

socket_err:
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH &&
	    result != -ENETUNREACH &&
	    result != -ENETDOWN &&
	    result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		log_print("connect %d try %d error %d", con->nodeid,
			  con->retries, result);
		mutex_unlock(&con->sock_mutex);
		msleep(1000);
		lowcomms_connect_sock(con);
		return;
	}

out:
	mutex_unlock(&con->sock_mutex);
}

/* Connect a new socket to its peer */
static void tcp_connect_to_sock(struct connection *con)
{
	struct sockaddr_storage saddr, src_addr;
	int addr_len;
	struct socket *sock = NULL;
	unsigned int mark;
	int result;

	if (con->nodeid == 0) {
		log_print("attempt to connect sock 0 foiled");
		return;
	}

	dlm_comm_mark(con->nodeid, &mark);

	mutex_lock(&con->sock_mutex);
	if (con->retries++ > MAX_CONNECT_RETRIES)
		goto out;

	/* Some odd races can cause double-connects, ignore them */
	if (con->sock)
		goto out;

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_TCP, &sock);
	if (result < 0)
		goto out_err;

	sock_set_mark(sock->sk, mark);

	memset(&saddr, 0, sizeof(saddr));
	result = nodeid_to_addr(con->nodeid, &saddr, NULL, false);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		goto out_err;
	}

	con->rx_action = receive_from_sock;
	con->connect_action = tcp_connect_to_sock;
	con->shutdown_action = dlm_tcp_shutdown;
	add_sock(sock, con);

	/* Bind to our cluster-known address when connecting to avoid
	   routing problems */
	memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
	make_sockaddr(&src_addr, 0, &addr_len);
	result = sock->ops->bind(sock, (struct sockaddr *) &src_addr,
				 addr_len);
	if (result < 0) {
		log_print("could not bind for connect: %d", result);
		/* This *may* not indicate a critical error */
	}

	make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);

	log_print("connecting to %d", con->nodeid);

	/* Turn off Nagle's algorithm */
	tcp_sock_set_nodelay(sock->sk);

	result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
				    O_NONBLOCK);
	if (result == -EINPROGRESS)
		result = 0;
	if (result == 0)
		goto out;

out_err:
	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	} else if (sock) {
		sock_release(sock);
	}
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH &&
	    result != -ENETUNREACH &&
	    result != -ENETDOWN &&
	    result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		log_print("connect %d try %d error %d", con->nodeid,
			  con->retries, result);
		mutex_unlock(&con->sock_mutex);
		msleep(1000);
		lowcomms_connect_sock(con);
		return;
	}
out:
	mutex_unlock(&con->sock_mutex);
	return;
}

static struct socket *tcp_create_listen_sock(struct connection *con,
					     struct sockaddr_storage *saddr)
{
	struct socket *sock = NULL;
	int result = 0;
	int addr_len;

	if (dlm_local_addr[0]->ss_family == AF_INET)
		addr_len = sizeof(struct sockaddr_in);
	else
		addr_len = sizeof(struct sockaddr_in6);

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_TCP, &sock);
	if (result < 0) {
		log_print("Can't create listening comms socket");
		goto create_out;
	}

	sock_set_mark(sock->sk, dlm_config.ci_mark);

	/* Turn off Nagle's algorithm */
	tcp_sock_set_nodelay(sock->sk);

	sock_set_reuseaddr(sock->sk);

	write_lock_bh(&sock->sk->sk_callback_lock);
	sock->sk->sk_user_data = con;
	save_listen_callbacks(sock);
	con->rx_action = accept_from_sock;
	con->connect_action = tcp_connect_to_sock;
	write_unlock_bh(&sock->sk->sk_callback_lock);

	/* Bind to our port */
	make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
	result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
	if (result < 0) {
		log_print("Can't bind to port %d", dlm_config.ci_tcp_port);
		sock_release(sock);
		sock = NULL;
		con->sock = NULL;
		goto create_out;
	}
	sock_set_keepalive(sock->sk);

	result = sock->ops->listen(sock, 5);
	if (result < 0) {
		log_print("Can't listen on port %d", dlm_config.ci_tcp_port);
		sock_release(sock);
		sock = NULL;
		goto create_out;
	}

create_out:
	return sock;
}

/* Get local addresses */
static void init_local(void)
{
	struct sockaddr_storage sas, *addr;
	int i;

	dlm_local_count = 0;
	for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
		if (dlm_our_addr(&sas, i))
			break;

		addr = kmemdup(&sas, sizeof(*addr), GFP_NOFS);
		if (!addr)
			break;
		dlm_local_addr[dlm_local_count++] = addr;
	}
}

static void deinit_local(void)
{
	int i;

	for (i = 0; i < dlm_local_count; i++)
		kfree(dlm_local_addr[i]);
}

/* Initialise SCTP socket and bind to all interfaces */
static int sctp_listen_for_all(void)
{
	struct socket *sock = NULL;
	int result = -EINVAL;
	struct connection *con = nodeid2con(0, GFP_NOFS);

	if (!con)
		return -ENOMEM;

	log_print("Using SCTP for communications");

	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_SCTP, &sock);
	if (result < 0) {
		log_print("Can't create comms socket, check SCTP is loaded");
		goto out;
	}

	sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
	sock_set_mark(sock->sk, dlm_config.ci_mark);
	sctp_sock_set_nodelay(sock->sk);

	write_lock_bh(&sock->sk->sk_callback_lock);
	/* Init con struct */
	sock->sk->sk_user_data = con;
	save_listen_callbacks(sock);
	con->sock = sock;
	con->sock->sk->sk_data_ready = lowcomms_data_ready;
	con->rx_action = accept_from_sock;
	con->connect_action = sctp_connect_to_sock;

	write_unlock_bh(&sock->sk->sk_callback_lock);

	/* Bind to all addresses. */
	if (sctp_bind_addrs(con, dlm_config.ci_tcp_port))
		goto create_delsock;

	result = sock->ops->listen(sock, 5);
	if (result < 0) {
		log_print("Can't set socket listening");
		goto create_delsock;
	}

	return 0;

create_delsock:
	sock_release(sock);
	con->sock = NULL;
out:
	return result;
}

static int tcp_listen_for_all(void)
{
	struct socket *sock = NULL;
	struct connection *con = nodeid2con(0, GFP_NOFS);
	int result = -EINVAL;

	if (!con)
		return -ENOMEM;

	/* We don't support multi-homed hosts */
	if (dlm_local_addr[1] != NULL) {
		log_print("TCP protocol can't handle multi-homed hosts, "
			  "try SCTP");
		return -EINVAL;
	}

	log_print("Using TCP for communications");

	sock = tcp_create_listen_sock(con, dlm_local_addr[0]);
	if (sock) {
		add_sock(sock, con);
		result = 0;
	} else {
		result = -EADDRINUSE;
	}

	return result;
}

static struct writequeue_entry *new_writequeue_entry(struct connection *con,
						     gfp_t allocation)
{
	struct writequeue_entry *entry;

	entry = kmalloc(sizeof(struct writequeue_entry), allocation);
	if (!entry)
		return NULL;

	entry->page = alloc_page(allocation);
	if (!entry->page) {
		kfree(entry);
		return NULL;
	}

	entry->offset = 0;
	entry->len = 0;
	entry->end = 0;
	entry->users = 0;
	entry->con = con;

	return entry;
}

void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
{
	struct connection *con;
	struct writequeue_entry *e;
	int offset = 0;

	if (len > LOWCOMMS_MAX_TX_BUFFER_LEN) {
		BUILD_BUG_ON(PAGE_SIZE < LOWCOMMS_MAX_TX_BUFFER_LEN);
		log_print("failed to allocate a buffer of size %d", len);
		return NULL;
	}

	con = nodeid2con(nodeid, allocation);
	if (!con)
		return NULL;

	spin_lock(&con->writequeue_lock);
	e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
	if ((&e->list == &con->writequeue) ||
	    (PAGE_SIZE - e->end < len)) {
		e = NULL;
	} else {
		offset = e->end;
		e->end += len;
		e->users++;
	}
	spin_unlock(&con->writequeue_lock);

	if (e) {
	got_one:
		*ppc = page_address(e->page) + offset;
		return e;
	}

	e = new_writequeue_entry(con, allocation);
	if (e) {
		spin_lock(&con->writequeue_lock);
		offset = e->end;
		e->end += len;
		e->users++;
		list_add_tail(&e->list, &con->writequeue);
		spin_unlock(&con->writequeue_lock);
		goto got_one;
	}
	return NULL;
}

void dlm_lowcomms_commit_buffer(void *mh)
{
	struct writequeue_entry *e = (struct writequeue_entry *)mh;
	struct connection *con = e->con;
	int users;

	spin_lock(&con->writequeue_lock);
	users = --e->users;
	if (users)
		goto out;
	e->len = e->end - e->offset;
	spin_unlock(&con->writequeue_lock);

	queue_work(send_workqueue, &con->swork);
	return;

out:
	spin_unlock(&con->writequeue_lock);
	return;
}
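
/*
 * Typical caller pattern for the two functions above (a sketch only):
 * reserve space, build the message in place, then commit, which queues
 * the send work once the last writer has committed.
 *
 *	char *ppc;
 *	void *mh = dlm_lowcomms_get_buffer(nodeid, len, GFP_NOFS, &ppc);
 *	if (mh) {
 *		memcpy(ppc, msg, len);	// fill the reserved page space
 *		dlm_lowcomms_commit_buffer(mh);
 *	}
 */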

/* Send a message */
static void send_to_sock(struct connection *con)
{
	int ret = 0;
	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	struct writequeue_entry *e;
	int len, offset;
	int count = 0;

	mutex_lock(&con->sock_mutex);
	if (con->sock == NULL)
		goto out_connect;

	spin_lock(&con->writequeue_lock);
	for (;;) {
		e = list_entry(con->writequeue.next, struct writequeue_entry,
			       list);
		if ((struct list_head *) e == &con->writequeue)
			break;

		len = e->len;
		offset = e->offset;
		BUG_ON(len == 0 && e->users == 0);
		spin_unlock(&con->writequeue_lock);

		ret = 0;
		if (len) {
			ret = kernel_sendpage(con->sock, e->page, offset, len,
					      msg_flags);
			if (ret == -EAGAIN || ret == 0) {
				if (ret == -EAGAIN &&
				    test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
				    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
					/* Notify TCP that we're limited by the
					 * application window size.
					 */
					set_bit(SOCK_NOSPACE, &con->sock->flags);
					con->sock->sk->sk_write_pending++;
				}
				cond_resched();
				goto out;
			} else if (ret < 0)
				goto send_error;
		}

		/* Don't starve people filling buffers */
		if (++count >= MAX_SEND_MSG_COUNT) {
			cond_resched();
			count = 0;
		}

		spin_lock(&con->writequeue_lock);
		writequeue_entry_complete(e, ret);
	}
	spin_unlock(&con->writequeue_lock);
out:
	mutex_unlock(&con->sock_mutex);
	return;

send_error:
	mutex_unlock(&con->sock_mutex);
	close_connection(con, false, false, true);
	/* Requeue the send work. When the work daemon runs again, it will try
	   a new connection, then call this function again. */
	queue_work(send_workqueue, &con->swork);
	return;

out_connect:
	mutex_unlock(&con->sock_mutex);
	queue_work(send_workqueue, &con->swork);
	cond_resched();
}

static void clean_one_writequeue(struct connection *con)
{
	struct writequeue_entry *e, *safe;

	spin_lock(&con->writequeue_lock);
	list_for_each_entry_safe(e, safe, &con->writequeue, list) {
		list_del(&e->list);
		free_entry(e);
	}
	spin_unlock(&con->writequeue_lock);
}

/* Called from recovery when it knows that a node has
   left the cluster */
int dlm_lowcomms_close(int nodeid)
{
	struct connection *con;
	struct dlm_node_addr *na;

	log_print("closing connection to node %d", nodeid);
	con = nodeid2con(nodeid, 0);
	if (con) {
		set_bit(CF_CLOSE, &con->flags);
		close_connection(con, true, true, true);
		clean_one_writequeue(con);
	}

	spin_lock(&dlm_node_addrs_spin);
	na = find_node_addr(nodeid);
	if (na) {
		list_del(&na->list);
		while (na->addr_count--)
			kfree(na->addr[na->addr_count]);
		kfree(na);
	}
	spin_unlock(&dlm_node_addrs_spin);

	return 0;
}
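
/*
 * Both workqueue handlers below run in process context. The receive
 * handler keeps calling rx_action() until it returns nonzero; a
 * would-block read surfaces as -EAGAIN and ends the loop. The send
 * handler first connects if there is no socket yet, then flushes the
 * writequeue.
 */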

/* Receive workqueue function */
static void process_recv_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, rwork);
	int err;

	clear_bit(CF_READ_PENDING, &con->flags);
	do {
		err = con->rx_action(con);
	} while (!err);
}

/* Send workqueue function */
static void process_send_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, swork);

	clear_bit(CF_WRITE_PENDING, &con->flags);
	if (con->sock == NULL) /* not mutex protected so check it inside too */
		con->connect_action(con);
	if (!list_empty(&con->writequeue))
		send_to_sock(con);
}

static void work_stop(void)
{
	if (recv_workqueue)
		destroy_workqueue(recv_workqueue);
	if (send_workqueue)
		destroy_workqueue(send_workqueue);
}

static int work_start(void)
{
	recv_workqueue = alloc_workqueue("dlm_recv",
					 WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
	if (!recv_workqueue) {
		log_print("can't start dlm_recv");
		return -ENOMEM;
	}

	send_workqueue = alloc_workqueue("dlm_send",
					 WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
	if (!send_workqueue) {
		log_print("can't start dlm_send");
		destroy_workqueue(recv_workqueue);
		return -ENOMEM;
	}

	return 0;
}

static void _stop_conn(struct connection *con, bool and_other)
{
	mutex_lock(&con->sock_mutex);
	set_bit(CF_CLOSE, &con->flags);
	set_bit(CF_READ_PENDING, &con->flags);
	set_bit(CF_WRITE_PENDING, &con->flags);
	if (con->sock && con->sock->sk) {
		write_lock_bh(&con->sock->sk->sk_callback_lock);
		con->sock->sk->sk_user_data = NULL;
		write_unlock_bh(&con->sock->sk->sk_callback_lock);
	}
	if (con->othercon && and_other)
		_stop_conn(con->othercon, false);
	mutex_unlock(&con->sock_mutex);
}

static void stop_conn(struct connection *con)
{
	_stop_conn(con, true);
}

static void shutdown_conn(struct connection *con)
{
	if (con->shutdown_action)
		con->shutdown_action(con);
}

static void connection_release(struct rcu_head *rcu)
{
	struct connection *con = container_of(rcu, struct connection, rcu);

	kfree(con->rx_buf);
	kfree(con);
}

static void free_conn(struct connection *con)
{
	close_connection(con, true, true, true);
	spin_lock(&connections_lock);
	hlist_del_rcu(&con->list);
	spin_unlock(&connections_lock);
	if (con->othercon) {
		clean_one_writequeue(con->othercon);
		call_srcu(&connections_srcu, &con->othercon->rcu,
			  connection_release);
	}
	clean_one_writequeue(con);
	call_srcu(&connections_srcu, &con->rcu, connection_release);
}

static void work_flush(void)
{
	int ok, idx;
	int i;
	struct connection *con;

	do {
		ok = 1;
		foreach_conn(stop_conn);
		if (recv_workqueue)
			flush_workqueue(recv_workqueue);
		if (send_workqueue)
			flush_workqueue(send_workqueue);
		idx = srcu_read_lock(&connections_srcu);
		for (i = 0; i < CONN_HASH_SIZE && ok; i++) {
			hlist_for_each_entry_rcu(con, &connection_hash[i],
						 list) {
				ok &= test_bit(CF_READ_PENDING, &con->flags);
				ok &= test_bit(CF_WRITE_PENDING, &con->flags);
				if (con->othercon) {
					ok &= test_bit(CF_READ_PENDING,
						       &con->othercon->flags);
					ok &= test_bit(CF_WRITE_PENDING,
						       &con->othercon->flags);
				}
			}
		}
		srcu_read_unlock(&connections_srcu, idx);
	} while (!ok);
}
1667 */ 1668 dlm_allow_conn = 0; 1669 1670 if (recv_workqueue) 1671 flush_workqueue(recv_workqueue); 1672 if (send_workqueue) 1673 flush_workqueue(send_workqueue); 1674 1675 foreach_conn(shutdown_conn); 1676 work_flush(); 1677 foreach_conn(free_conn); 1678 work_stop(); 1679 deinit_local(); 1680 } 1681 1682 int dlm_lowcomms_start(void) 1683 { 1684 int error = -EINVAL; 1685 struct connection *con; 1686 int i; 1687 1688 for (i = 0; i < CONN_HASH_SIZE; i++) 1689 INIT_HLIST_HEAD(&connection_hash[i]); 1690 1691 init_local(); 1692 if (!dlm_local_count) { 1693 error = -ENOTCONN; 1694 log_print("no local IP address has been set"); 1695 goto fail; 1696 } 1697 1698 error = work_start(); 1699 if (error) 1700 goto fail; 1701 1702 dlm_allow_conn = 1; 1703 1704 /* Start listening */ 1705 if (dlm_config.ci_protocol == 0) 1706 error = tcp_listen_for_all(); 1707 else 1708 error = sctp_listen_for_all(); 1709 if (error) 1710 goto fail_unlisten; 1711 1712 return 0; 1713 1714 fail_unlisten: 1715 dlm_allow_conn = 0; 1716 con = nodeid2con(0,0); 1717 if (con) 1718 free_conn(con); 1719 fail: 1720 return error; 1721 } 1722 1723 void dlm_lowcomms_exit(void) 1724 { 1725 struct dlm_node_addr *na, *safe; 1726 1727 spin_lock(&dlm_node_addrs_spin); 1728 list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) { 1729 list_del(&na->list); 1730 while (na->addr_count--) 1731 kfree(na->addr[na->addr_count]); 1732 kfree(na); 1733 } 1734 spin_unlock(&dlm_node_addrs_spin); 1735 } 1736