1 // SPDX-License-Identifier: GPL-2.0-only 2 /****************************************************************************** 3 ******************************************************************************* 4 ** 5 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 6 ** Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved. 7 ** 8 ** 9 ******************************************************************************* 10 ******************************************************************************/ 11 12 /* 13 * lowcomms.c 14 * 15 * This is the "low-level" comms layer. 16 * 17 * It is responsible for sending/receiving messages 18 * from other nodes in the cluster. 19 * 20 * Cluster nodes are referred to by their nodeids. nodeids are 21 * simply 32 bit numbers to the locking module - if they need to 22 * be expanded for the cluster infrastructure then that is its 23 * responsibility. It is this layer's 24 * responsibility to resolve these into IP address or 25 * whatever it needs for inter-node communication. 26 * 27 * The comms level is two kernel threads that deal mainly with 28 * the receiving of messages from other nodes and passing them 29 * up to the mid-level comms layer (which understands the 30 * message format) for execution by the locking core, and 31 * a send thread which does all the setting up of connections 32 * to remote nodes and the sending of data. Threads are not allowed 33 * to send their own data because it may cause them to wait in times 34 * of high load. Also, this way, the sending thread can collect together 35 * messages bound for one node and send them in one block. 36 * 37 * lowcomms will choose to use either TCP or SCTP as its transport layer 38 * depending on the configuration variable 'protocol'. This should be set 39 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a 40 * cluster-wide mechanism as it must be the same on all nodes of the cluster 41 * for the DLM to function. 42 * 43 */ 44 45 #include <asm/ioctls.h> 46 #include <net/sock.h> 47 #include <net/tcp.h> 48 #include <linux/pagemap.h> 49 #include <linux/file.h> 50 #include <linux/mutex.h> 51 #include <linux/sctp.h> 52 #include <linux/slab.h> 53 #include <net/sctp/sctp.h> 54 #include <net/ipv6.h> 55 56 #include "dlm_internal.h" 57 #include "lowcomms.h" 58 #include "midcomms.h" 59 #include "config.h" 60 61 #define NEEDED_RMEM (4*1024*1024) 62 #define CONN_HASH_SIZE 32 63 64 /* Number of messages to send before rescheduling */ 65 #define MAX_SEND_MSG_COUNT 25 66 #define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000) 67 68 struct connection { 69 struct socket *sock; /* NULL if not connected */ 70 uint32_t nodeid; /* So we know who we are in the list */ 71 struct mutex sock_mutex; 72 unsigned long flags; 73 #define CF_READ_PENDING 1 74 #define CF_WRITE_PENDING 2 75 #define CF_INIT_PENDING 4 76 #define CF_IS_OTHERCON 5 77 #define CF_CLOSE 6 78 #define CF_APP_LIMITED 7 79 #define CF_CLOSING 8 80 #define CF_SHUTDOWN 9 81 struct list_head writequeue; /* List of outgoing writequeue_entries */ 82 spinlock_t writequeue_lock; 83 int (*rx_action) (struct connection *); /* What to do when active */ 84 void (*connect_action) (struct connection *); /* What to do to connect */ 85 void (*shutdown_action)(struct connection *con); /* What to do to shutdown */ 86 int retries; 87 #define MAX_CONNECT_RETRIES 3 88 struct hlist_node list; 89 struct connection *othercon; 90 struct work_struct rwork; /* Receive workqueue */ 91 struct work_struct swork; /* Send workqueue */ 92 wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */ 93 unsigned char *rx_buf; 94 int rx_buflen; 95 int rx_leftover; 96 struct rcu_head rcu; 97 }; 98 #define sock2con(x) ((struct connection *)(x)->sk_user_data) 99 100 /* An entry waiting to be sent */ 101 struct writequeue_entry { 102 struct list_head list; 103 struct page *page; 104 int offset; 105 int len; 106 int end; 107 int users; 108 struct connection *con; 109 }; 110 111 struct dlm_node_addr { 112 struct list_head list; 113 int nodeid; 114 int addr_count; 115 int curr_addr_index; 116 struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT]; 117 }; 118 119 static struct listen_sock_callbacks { 120 void (*sk_error_report)(struct sock *); 121 void (*sk_data_ready)(struct sock *); 122 void (*sk_state_change)(struct sock *); 123 void (*sk_write_space)(struct sock *); 124 } listen_sock; 125 126 static LIST_HEAD(dlm_node_addrs); 127 static DEFINE_SPINLOCK(dlm_node_addrs_spin); 128 129 static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT]; 130 static int dlm_local_count; 131 static int dlm_allow_conn; 132 133 /* Work queues */ 134 static struct workqueue_struct *recv_workqueue; 135 static struct workqueue_struct *send_workqueue; 136 137 static struct hlist_head connection_hash[CONN_HASH_SIZE]; 138 static DEFINE_SPINLOCK(connections_lock); 139 DEFINE_STATIC_SRCU(connections_srcu); 140 141 static void process_recv_sockets(struct work_struct *work); 142 static void process_send_sockets(struct work_struct *work); 143 144 145 /* This is deliberately very simple because most clusters have simple 146 sequential nodeids, so we should be able to go straight to a connection 147 struct in the array */ 148 static inline int nodeid_hash(int nodeid) 149 { 150 return nodeid & (CONN_HASH_SIZE-1); 151 } 152 153 static struct connection *__find_con(int nodeid) 154 { 155 int r, idx; 156 struct connection *con; 157 158 r = nodeid_hash(nodeid); 159 160 idx = srcu_read_lock(&connections_srcu); 161 hlist_for_each_entry_rcu(con, &connection_hash[r], list) { 162 if (con->nodeid == nodeid) { 163 srcu_read_unlock(&connections_srcu, idx); 164 return con; 165 } 166 } 167 srcu_read_unlock(&connections_srcu, idx); 168 169 return NULL; 170 } 171 172 /* 173 * If 'allocation' is zero then we don't attempt to create a new 174 * connection structure for this node. 175 */ 176 static struct connection *nodeid2con(int nodeid, gfp_t alloc) 177 { 178 struct connection *con, *tmp; 179 int r; 180 181 con = __find_con(nodeid); 182 if (con || !alloc) 183 return con; 184 185 con = kzalloc(sizeof(*con), alloc); 186 if (!con) 187 return NULL; 188 189 con->rx_buflen = dlm_config.ci_buffer_size; 190 con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS); 191 if (!con->rx_buf) { 192 kfree(con); 193 return NULL; 194 } 195 196 con->nodeid = nodeid; 197 mutex_init(&con->sock_mutex); 198 INIT_LIST_HEAD(&con->writequeue); 199 spin_lock_init(&con->writequeue_lock); 200 INIT_WORK(&con->swork, process_send_sockets); 201 INIT_WORK(&con->rwork, process_recv_sockets); 202 init_waitqueue_head(&con->shutdown_wait); 203 204 /* Setup action pointers for child sockets */ 205 if (con->nodeid) { 206 struct connection *zerocon = __find_con(0); 207 208 con->connect_action = zerocon->connect_action; 209 if (!con->rx_action) 210 con->rx_action = zerocon->rx_action; 211 } 212 213 r = nodeid_hash(nodeid); 214 215 spin_lock(&connections_lock); 216 /* Because multiple workqueues/threads calls this function it can 217 * race on multiple cpu's. Instead of locking hot path __find_con() 218 * we just check in rare cases of recently added nodes again 219 * under protection of connections_lock. If this is the case we 220 * abort our connection creation and return the existing connection. 221 */ 222 tmp = __find_con(nodeid); 223 if (tmp) { 224 spin_unlock(&connections_lock); 225 kfree(con->rx_buf); 226 kfree(con); 227 return tmp; 228 } 229 230 hlist_add_head_rcu(&con->list, &connection_hash[r]); 231 spin_unlock(&connections_lock); 232 233 return con; 234 } 235 236 /* Loop round all connections */ 237 static void foreach_conn(void (*conn_func)(struct connection *c)) 238 { 239 int i, idx; 240 struct connection *con; 241 242 idx = srcu_read_lock(&connections_srcu); 243 for (i = 0; i < CONN_HASH_SIZE; i++) { 244 hlist_for_each_entry_rcu(con, &connection_hash[i], list) 245 conn_func(con); 246 } 247 srcu_read_unlock(&connections_srcu, idx); 248 } 249 250 static struct dlm_node_addr *find_node_addr(int nodeid) 251 { 252 struct dlm_node_addr *na; 253 254 list_for_each_entry(na, &dlm_node_addrs, list) { 255 if (na->nodeid == nodeid) 256 return na; 257 } 258 return NULL; 259 } 260 261 static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y) 262 { 263 switch (x->ss_family) { 264 case AF_INET: { 265 struct sockaddr_in *sinx = (struct sockaddr_in *)x; 266 struct sockaddr_in *siny = (struct sockaddr_in *)y; 267 if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr) 268 return 0; 269 if (sinx->sin_port != siny->sin_port) 270 return 0; 271 break; 272 } 273 case AF_INET6: { 274 struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x; 275 struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y; 276 if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr)) 277 return 0; 278 if (sinx->sin6_port != siny->sin6_port) 279 return 0; 280 break; 281 } 282 default: 283 return 0; 284 } 285 return 1; 286 } 287 288 static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out, 289 struct sockaddr *sa_out, bool try_new_addr) 290 { 291 struct sockaddr_storage sas; 292 struct dlm_node_addr *na; 293 294 if (!dlm_local_count) 295 return -1; 296 297 spin_lock(&dlm_node_addrs_spin); 298 na = find_node_addr(nodeid); 299 if (na && na->addr_count) { 300 memcpy(&sas, na->addr[na->curr_addr_index], 301 sizeof(struct sockaddr_storage)); 302 303 if (try_new_addr) { 304 na->curr_addr_index++; 305 if (na->curr_addr_index == na->addr_count) 306 na->curr_addr_index = 0; 307 } 308 } 309 spin_unlock(&dlm_node_addrs_spin); 310 311 if (!na) 312 return -EEXIST; 313 314 if (!na->addr_count) 315 return -ENOENT; 316 317 if (sas_out) 318 memcpy(sas_out, &sas, sizeof(struct sockaddr_storage)); 319 320 if (!sa_out) 321 return 0; 322 323 if (dlm_local_addr[0]->ss_family == AF_INET) { 324 struct sockaddr_in *in4 = (struct sockaddr_in *) &sas; 325 struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out; 326 ret4->sin_addr.s_addr = in4->sin_addr.s_addr; 327 } else { 328 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &sas; 329 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out; 330 ret6->sin6_addr = in6->sin6_addr; 331 } 332 333 return 0; 334 } 335 336 static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid) 337 { 338 struct dlm_node_addr *na; 339 int rv = -EEXIST; 340 int addr_i; 341 342 spin_lock(&dlm_node_addrs_spin); 343 list_for_each_entry(na, &dlm_node_addrs, list) { 344 if (!na->addr_count) 345 continue; 346 347 for (addr_i = 0; addr_i < na->addr_count; addr_i++) { 348 if (addr_compare(na->addr[addr_i], addr)) { 349 *nodeid = na->nodeid; 350 rv = 0; 351 goto unlock; 352 } 353 } 354 } 355 unlock: 356 spin_unlock(&dlm_node_addrs_spin); 357 return rv; 358 } 359 360 int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len) 361 { 362 struct sockaddr_storage *new_addr; 363 struct dlm_node_addr *new_node, *na; 364 365 new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS); 366 if (!new_node) 367 return -ENOMEM; 368 369 new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS); 370 if (!new_addr) { 371 kfree(new_node); 372 return -ENOMEM; 373 } 374 375 memcpy(new_addr, addr, len); 376 377 spin_lock(&dlm_node_addrs_spin); 378 na = find_node_addr(nodeid); 379 if (!na) { 380 new_node->nodeid = nodeid; 381 new_node->addr[0] = new_addr; 382 new_node->addr_count = 1; 383 list_add(&new_node->list, &dlm_node_addrs); 384 spin_unlock(&dlm_node_addrs_spin); 385 return 0; 386 } 387 388 if (na->addr_count >= DLM_MAX_ADDR_COUNT) { 389 spin_unlock(&dlm_node_addrs_spin); 390 kfree(new_addr); 391 kfree(new_node); 392 return -ENOSPC; 393 } 394 395 na->addr[na->addr_count++] = new_addr; 396 spin_unlock(&dlm_node_addrs_spin); 397 kfree(new_node); 398 return 0; 399 } 400 401 /* Data available on socket or listen socket received a connect */ 402 static void lowcomms_data_ready(struct sock *sk) 403 { 404 struct connection *con; 405 406 read_lock_bh(&sk->sk_callback_lock); 407 con = sock2con(sk); 408 if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags)) 409 queue_work(recv_workqueue, &con->rwork); 410 read_unlock_bh(&sk->sk_callback_lock); 411 } 412 413 static void lowcomms_write_space(struct sock *sk) 414 { 415 struct connection *con; 416 417 read_lock_bh(&sk->sk_callback_lock); 418 con = sock2con(sk); 419 if (!con) 420 goto out; 421 422 clear_bit(SOCK_NOSPACE, &con->sock->flags); 423 424 if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) { 425 con->sock->sk->sk_write_pending--; 426 clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags); 427 } 428 429 queue_work(send_workqueue, &con->swork); 430 out: 431 read_unlock_bh(&sk->sk_callback_lock); 432 } 433 434 static inline void lowcomms_connect_sock(struct connection *con) 435 { 436 if (test_bit(CF_CLOSE, &con->flags)) 437 return; 438 queue_work(send_workqueue, &con->swork); 439 cond_resched(); 440 } 441 442 static void lowcomms_state_change(struct sock *sk) 443 { 444 /* SCTP layer is not calling sk_data_ready when the connection 445 * is done, so we catch the signal through here. Also, it 446 * doesn't switch socket state when entering shutdown, so we 447 * skip the write in that case. 448 */ 449 if (sk->sk_shutdown) { 450 if (sk->sk_shutdown == RCV_SHUTDOWN) 451 lowcomms_data_ready(sk); 452 } else if (sk->sk_state == TCP_ESTABLISHED) { 453 lowcomms_write_space(sk); 454 } 455 } 456 457 int dlm_lowcomms_connect_node(int nodeid) 458 { 459 struct connection *con; 460 461 if (nodeid == dlm_our_nodeid()) 462 return 0; 463 464 con = nodeid2con(nodeid, GFP_NOFS); 465 if (!con) 466 return -ENOMEM; 467 lowcomms_connect_sock(con); 468 return 0; 469 } 470 471 static void lowcomms_error_report(struct sock *sk) 472 { 473 struct connection *con; 474 struct sockaddr_storage saddr; 475 void (*orig_report)(struct sock *) = NULL; 476 477 read_lock_bh(&sk->sk_callback_lock); 478 con = sock2con(sk); 479 if (con == NULL) 480 goto out; 481 482 orig_report = listen_sock.sk_error_report; 483 if (con->sock == NULL || 484 kernel_getpeername(con->sock, (struct sockaddr *)&saddr) < 0) { 485 printk_ratelimited(KERN_ERR "dlm: node %d: socket error " 486 "sending to node %d, port %d, " 487 "sk_err=%d/%d\n", dlm_our_nodeid(), 488 con->nodeid, dlm_config.ci_tcp_port, 489 sk->sk_err, sk->sk_err_soft); 490 } else if (saddr.ss_family == AF_INET) { 491 struct sockaddr_in *sin4 = (struct sockaddr_in *)&saddr; 492 493 printk_ratelimited(KERN_ERR "dlm: node %d: socket error " 494 "sending to node %d at %pI4, port %d, " 495 "sk_err=%d/%d\n", dlm_our_nodeid(), 496 con->nodeid, &sin4->sin_addr.s_addr, 497 dlm_config.ci_tcp_port, sk->sk_err, 498 sk->sk_err_soft); 499 } else { 500 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&saddr; 501 502 printk_ratelimited(KERN_ERR "dlm: node %d: socket error " 503 "sending to node %d at %u.%u.%u.%u, " 504 "port %d, sk_err=%d/%d\n", dlm_our_nodeid(), 505 con->nodeid, sin6->sin6_addr.s6_addr32[0], 506 sin6->sin6_addr.s6_addr32[1], 507 sin6->sin6_addr.s6_addr32[2], 508 sin6->sin6_addr.s6_addr32[3], 509 dlm_config.ci_tcp_port, sk->sk_err, 510 sk->sk_err_soft); 511 } 512 out: 513 read_unlock_bh(&sk->sk_callback_lock); 514 if (orig_report) 515 orig_report(sk); 516 } 517 518 /* Note: sk_callback_lock must be locked before calling this function. */ 519 static void save_listen_callbacks(struct socket *sock) 520 { 521 struct sock *sk = sock->sk; 522 523 listen_sock.sk_data_ready = sk->sk_data_ready; 524 listen_sock.sk_state_change = sk->sk_state_change; 525 listen_sock.sk_write_space = sk->sk_write_space; 526 listen_sock.sk_error_report = sk->sk_error_report; 527 } 528 529 static void restore_callbacks(struct socket *sock) 530 { 531 struct sock *sk = sock->sk; 532 533 write_lock_bh(&sk->sk_callback_lock); 534 sk->sk_user_data = NULL; 535 sk->sk_data_ready = listen_sock.sk_data_ready; 536 sk->sk_state_change = listen_sock.sk_state_change; 537 sk->sk_write_space = listen_sock.sk_write_space; 538 sk->sk_error_report = listen_sock.sk_error_report; 539 write_unlock_bh(&sk->sk_callback_lock); 540 } 541 542 /* Make a socket active */ 543 static void add_sock(struct socket *sock, struct connection *con) 544 { 545 struct sock *sk = sock->sk; 546 547 write_lock_bh(&sk->sk_callback_lock); 548 con->sock = sock; 549 550 sk->sk_user_data = con; 551 /* Install a data_ready callback */ 552 sk->sk_data_ready = lowcomms_data_ready; 553 sk->sk_write_space = lowcomms_write_space; 554 sk->sk_state_change = lowcomms_state_change; 555 sk->sk_allocation = GFP_NOFS; 556 sk->sk_error_report = lowcomms_error_report; 557 write_unlock_bh(&sk->sk_callback_lock); 558 } 559 560 /* Add the port number to an IPv6 or 4 sockaddr and return the address 561 length */ 562 static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port, 563 int *addr_len) 564 { 565 saddr->ss_family = dlm_local_addr[0]->ss_family; 566 if (saddr->ss_family == AF_INET) { 567 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr; 568 in4_addr->sin_port = cpu_to_be16(port); 569 *addr_len = sizeof(struct sockaddr_in); 570 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero)); 571 } else { 572 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr; 573 in6_addr->sin6_port = cpu_to_be16(port); 574 *addr_len = sizeof(struct sockaddr_in6); 575 } 576 memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len); 577 } 578 579 /* Close a remote connection and tidy up */ 580 static void close_connection(struct connection *con, bool and_other, 581 bool tx, bool rx) 582 { 583 bool closing = test_and_set_bit(CF_CLOSING, &con->flags); 584 585 if (tx && !closing && cancel_work_sync(&con->swork)) { 586 log_print("canceled swork for node %d", con->nodeid); 587 clear_bit(CF_WRITE_PENDING, &con->flags); 588 } 589 if (rx && !closing && cancel_work_sync(&con->rwork)) { 590 log_print("canceled rwork for node %d", con->nodeid); 591 clear_bit(CF_READ_PENDING, &con->flags); 592 } 593 594 mutex_lock(&con->sock_mutex); 595 if (con->sock) { 596 restore_callbacks(con->sock); 597 sock_release(con->sock); 598 con->sock = NULL; 599 } 600 if (con->othercon && and_other) { 601 /* Will only re-enter once. */ 602 close_connection(con->othercon, false, tx, rx); 603 } 604 605 con->rx_leftover = 0; 606 con->retries = 0; 607 mutex_unlock(&con->sock_mutex); 608 clear_bit(CF_CLOSING, &con->flags); 609 } 610 611 static void shutdown_connection(struct connection *con) 612 { 613 int ret; 614 615 flush_work(&con->swork); 616 617 mutex_lock(&con->sock_mutex); 618 /* nothing to shutdown */ 619 if (!con->sock) { 620 mutex_unlock(&con->sock_mutex); 621 return; 622 } 623 624 set_bit(CF_SHUTDOWN, &con->flags); 625 ret = kernel_sock_shutdown(con->sock, SHUT_WR); 626 mutex_unlock(&con->sock_mutex); 627 if (ret) { 628 log_print("Connection %p failed to shutdown: %d will force close", 629 con, ret); 630 goto force_close; 631 } else { 632 ret = wait_event_timeout(con->shutdown_wait, 633 !test_bit(CF_SHUTDOWN, &con->flags), 634 DLM_SHUTDOWN_WAIT_TIMEOUT); 635 if (ret == 0) { 636 log_print("Connection %p shutdown timed out, will force close", 637 con); 638 goto force_close; 639 } 640 } 641 642 return; 643 644 force_close: 645 clear_bit(CF_SHUTDOWN, &con->flags); 646 close_connection(con, false, true, true); 647 } 648 649 static void dlm_tcp_shutdown(struct connection *con) 650 { 651 if (con->othercon) 652 shutdown_connection(con->othercon); 653 shutdown_connection(con); 654 } 655 656 static int con_realloc_receive_buf(struct connection *con, int newlen) 657 { 658 unsigned char *newbuf; 659 660 newbuf = kmalloc(newlen, GFP_NOFS); 661 if (!newbuf) 662 return -ENOMEM; 663 664 /* copy any leftover from last receive */ 665 if (con->rx_leftover) 666 memmove(newbuf, con->rx_buf, con->rx_leftover); 667 668 /* swap to new buffer space */ 669 kfree(con->rx_buf); 670 con->rx_buflen = newlen; 671 con->rx_buf = newbuf; 672 673 return 0; 674 } 675 676 /* Data received from remote end */ 677 static int receive_from_sock(struct connection *con) 678 { 679 int call_again_soon = 0; 680 struct msghdr msg; 681 struct kvec iov; 682 int ret, buflen; 683 684 mutex_lock(&con->sock_mutex); 685 686 if (con->sock == NULL) { 687 ret = -EAGAIN; 688 goto out_close; 689 } 690 691 if (con->nodeid == 0) { 692 ret = -EINVAL; 693 goto out_close; 694 } 695 696 /* realloc if we get new buffer size to read out */ 697 buflen = dlm_config.ci_buffer_size; 698 if (con->rx_buflen != buflen && con->rx_leftover <= buflen) { 699 ret = con_realloc_receive_buf(con, buflen); 700 if (ret < 0) 701 goto out_resched; 702 } 703 704 /* calculate new buffer parameter regarding last receive and 705 * possible leftover bytes 706 */ 707 iov.iov_base = con->rx_buf + con->rx_leftover; 708 iov.iov_len = con->rx_buflen - con->rx_leftover; 709 710 memset(&msg, 0, sizeof(msg)); 711 msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; 712 ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, 713 msg.msg_flags); 714 if (ret <= 0) 715 goto out_close; 716 else if (ret == iov.iov_len) 717 call_again_soon = 1; 718 719 /* new buflen according readed bytes and leftover from last receive */ 720 buflen = ret + con->rx_leftover; 721 ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen); 722 if (ret < 0) 723 goto out_close; 724 725 /* calculate leftover bytes from process and put it into begin of 726 * the receive buffer, so next receive we have the full message 727 * at the start address of the receive buffer. 728 */ 729 con->rx_leftover = buflen - ret; 730 if (con->rx_leftover) { 731 memmove(con->rx_buf, con->rx_buf + ret, 732 con->rx_leftover); 733 call_again_soon = true; 734 } 735 736 if (call_again_soon) 737 goto out_resched; 738 739 mutex_unlock(&con->sock_mutex); 740 return 0; 741 742 out_resched: 743 if (!test_and_set_bit(CF_READ_PENDING, &con->flags)) 744 queue_work(recv_workqueue, &con->rwork); 745 mutex_unlock(&con->sock_mutex); 746 return -EAGAIN; 747 748 out_close: 749 mutex_unlock(&con->sock_mutex); 750 if (ret != -EAGAIN) { 751 /* Reconnect when there is something to send */ 752 close_connection(con, false, true, false); 753 if (ret == 0) { 754 log_print("connection %p got EOF from %d", 755 con, con->nodeid); 756 /* handling for tcp shutdown */ 757 clear_bit(CF_SHUTDOWN, &con->flags); 758 wake_up(&con->shutdown_wait); 759 /* signal to breaking receive worker */ 760 ret = -1; 761 } 762 } 763 return ret; 764 } 765 766 /* Listening socket is busy, accept a connection */ 767 static int accept_from_sock(struct connection *con) 768 { 769 int result; 770 struct sockaddr_storage peeraddr; 771 struct socket *newsock; 772 int len; 773 int nodeid; 774 struct connection *newcon; 775 struct connection *addcon; 776 unsigned int mark; 777 778 if (!dlm_allow_conn) { 779 return -1; 780 } 781 782 mutex_lock_nested(&con->sock_mutex, 0); 783 784 if (!con->sock) { 785 mutex_unlock(&con->sock_mutex); 786 return -ENOTCONN; 787 } 788 789 result = kernel_accept(con->sock, &newsock, O_NONBLOCK); 790 if (result < 0) 791 goto accept_err; 792 793 /* Get the connected socket's peer */ 794 memset(&peeraddr, 0, sizeof(peeraddr)); 795 len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2); 796 if (len < 0) { 797 result = -ECONNABORTED; 798 goto accept_err; 799 } 800 801 /* Get the new node's NODEID */ 802 make_sockaddr(&peeraddr, 0, &len); 803 if (addr_to_nodeid(&peeraddr, &nodeid)) { 804 unsigned char *b=(unsigned char *)&peeraddr; 805 log_print("connect from non cluster node"); 806 print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, 807 b, sizeof(struct sockaddr_storage)); 808 sock_release(newsock); 809 mutex_unlock(&con->sock_mutex); 810 return -1; 811 } 812 813 dlm_comm_mark(nodeid, &mark); 814 sock_set_mark(newsock->sk, mark); 815 816 log_print("got connection from %d", nodeid); 817 818 /* Check to see if we already have a connection to this node. This 819 * could happen if the two nodes initiate a connection at roughly 820 * the same time and the connections cross on the wire. 821 * In this case we store the incoming one in "othercon" 822 */ 823 newcon = nodeid2con(nodeid, GFP_NOFS); 824 if (!newcon) { 825 result = -ENOMEM; 826 goto accept_err; 827 } 828 mutex_lock_nested(&newcon->sock_mutex, 1); 829 if (newcon->sock) { 830 struct connection *othercon = newcon->othercon; 831 832 if (!othercon) { 833 othercon = kzalloc(sizeof(*othercon), GFP_NOFS); 834 if (!othercon) { 835 log_print("failed to allocate incoming socket"); 836 mutex_unlock(&newcon->sock_mutex); 837 result = -ENOMEM; 838 goto accept_err; 839 } 840 841 othercon->rx_buflen = dlm_config.ci_buffer_size; 842 othercon->rx_buf = kmalloc(othercon->rx_buflen, GFP_NOFS); 843 if (!othercon->rx_buf) { 844 mutex_unlock(&newcon->sock_mutex); 845 kfree(othercon); 846 log_print("failed to allocate incoming socket receive buffer"); 847 result = -ENOMEM; 848 goto accept_err; 849 } 850 851 othercon->nodeid = nodeid; 852 othercon->rx_action = receive_from_sock; 853 mutex_init(&othercon->sock_mutex); 854 INIT_LIST_HEAD(&othercon->writequeue); 855 spin_lock_init(&othercon->writequeue_lock); 856 INIT_WORK(&othercon->swork, process_send_sockets); 857 INIT_WORK(&othercon->rwork, process_recv_sockets); 858 init_waitqueue_head(&othercon->shutdown_wait); 859 set_bit(CF_IS_OTHERCON, &othercon->flags); 860 } else { 861 /* close other sock con if we have something new */ 862 close_connection(othercon, false, true, false); 863 } 864 865 mutex_lock_nested(&othercon->sock_mutex, 2); 866 newcon->othercon = othercon; 867 add_sock(newsock, othercon); 868 addcon = othercon; 869 mutex_unlock(&othercon->sock_mutex); 870 } 871 else { 872 newcon->rx_action = receive_from_sock; 873 /* accept copies the sk after we've saved the callbacks, so we 874 don't want to save them a second time or comm errors will 875 result in calling sk_error_report recursively. */ 876 add_sock(newsock, newcon); 877 addcon = newcon; 878 } 879 880 mutex_unlock(&newcon->sock_mutex); 881 882 /* 883 * Add it to the active queue in case we got data 884 * between processing the accept adding the socket 885 * to the read_sockets list 886 */ 887 if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags)) 888 queue_work(recv_workqueue, &addcon->rwork); 889 mutex_unlock(&con->sock_mutex); 890 891 return 0; 892 893 accept_err: 894 mutex_unlock(&con->sock_mutex); 895 if (newsock) 896 sock_release(newsock); 897 898 if (result != -EAGAIN) 899 log_print("error accepting connection from node: %d", result); 900 return result; 901 } 902 903 static void free_entry(struct writequeue_entry *e) 904 { 905 __free_page(e->page); 906 kfree(e); 907 } 908 909 /* 910 * writequeue_entry_complete - try to delete and free write queue entry 911 * @e: write queue entry to try to delete 912 * @completed: bytes completed 913 * 914 * writequeue_lock must be held. 915 */ 916 static void writequeue_entry_complete(struct writequeue_entry *e, int completed) 917 { 918 e->offset += completed; 919 e->len -= completed; 920 921 if (e->len == 0 && e->users == 0) { 922 list_del(&e->list); 923 free_entry(e); 924 } 925 } 926 927 /* 928 * sctp_bind_addrs - bind a SCTP socket to all our addresses 929 */ 930 static int sctp_bind_addrs(struct connection *con, uint16_t port) 931 { 932 struct sockaddr_storage localaddr; 933 struct sockaddr *addr = (struct sockaddr *)&localaddr; 934 int i, addr_len, result = 0; 935 936 for (i = 0; i < dlm_local_count; i++) { 937 memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr)); 938 make_sockaddr(&localaddr, port, &addr_len); 939 940 if (!i) 941 result = kernel_bind(con->sock, addr, addr_len); 942 else 943 result = sock_bind_add(con->sock->sk, addr, addr_len); 944 945 if (result < 0) { 946 log_print("Can't bind to %d addr number %d, %d.\n", 947 port, i + 1, result); 948 break; 949 } 950 } 951 return result; 952 } 953 954 /* Initiate an SCTP association. 955 This is a special case of send_to_sock() in that we don't yet have a 956 peeled-off socket for this association, so we use the listening socket 957 and add the primary IP address of the remote node. 958 */ 959 static void sctp_connect_to_sock(struct connection *con) 960 { 961 struct sockaddr_storage daddr; 962 int result; 963 int addr_len; 964 struct socket *sock; 965 unsigned int mark; 966 967 if (con->nodeid == 0) { 968 log_print("attempt to connect sock 0 foiled"); 969 return; 970 } 971 972 dlm_comm_mark(con->nodeid, &mark); 973 974 mutex_lock(&con->sock_mutex); 975 976 /* Some odd races can cause double-connects, ignore them */ 977 if (con->retries++ > MAX_CONNECT_RETRIES) 978 goto out; 979 980 if (con->sock) { 981 log_print("node %d already connected.", con->nodeid); 982 goto out; 983 } 984 985 memset(&daddr, 0, sizeof(daddr)); 986 result = nodeid_to_addr(con->nodeid, &daddr, NULL, true); 987 if (result < 0) { 988 log_print("no address for nodeid %d", con->nodeid); 989 goto out; 990 } 991 992 /* Create a socket to communicate with */ 993 result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family, 994 SOCK_STREAM, IPPROTO_SCTP, &sock); 995 if (result < 0) 996 goto socket_err; 997 998 sock_set_mark(sock->sk, mark); 999 1000 con->rx_action = receive_from_sock; 1001 con->connect_action = sctp_connect_to_sock; 1002 add_sock(sock, con); 1003 1004 /* Bind to all addresses. */ 1005 if (sctp_bind_addrs(con, 0)) 1006 goto bind_err; 1007 1008 make_sockaddr(&daddr, dlm_config.ci_tcp_port, &addr_len); 1009 1010 log_print("connecting to %d", con->nodeid); 1011 1012 /* Turn off Nagle's algorithm */ 1013 sctp_sock_set_nodelay(sock->sk); 1014 1015 /* 1016 * Make sock->ops->connect() function return in specified time, 1017 * since O_NONBLOCK argument in connect() function does not work here, 1018 * then, we should restore the default value of this attribute. 1019 */ 1020 sock_set_sndtimeo(sock->sk, 5); 1021 result = sock->ops->connect(sock, (struct sockaddr *)&daddr, addr_len, 1022 0); 1023 sock_set_sndtimeo(sock->sk, 0); 1024 1025 if (result == -EINPROGRESS) 1026 result = 0; 1027 if (result == 0) 1028 goto out; 1029 1030 bind_err: 1031 con->sock = NULL; 1032 sock_release(sock); 1033 1034 socket_err: 1035 /* 1036 * Some errors are fatal and this list might need adjusting. For other 1037 * errors we try again until the max number of retries is reached. 1038 */ 1039 if (result != -EHOSTUNREACH && 1040 result != -ENETUNREACH && 1041 result != -ENETDOWN && 1042 result != -EINVAL && 1043 result != -EPROTONOSUPPORT) { 1044 log_print("connect %d try %d error %d", con->nodeid, 1045 con->retries, result); 1046 mutex_unlock(&con->sock_mutex); 1047 msleep(1000); 1048 lowcomms_connect_sock(con); 1049 return; 1050 } 1051 1052 out: 1053 mutex_unlock(&con->sock_mutex); 1054 } 1055 1056 /* Connect a new socket to its peer */ 1057 static void tcp_connect_to_sock(struct connection *con) 1058 { 1059 struct sockaddr_storage saddr, src_addr; 1060 int addr_len; 1061 struct socket *sock = NULL; 1062 unsigned int mark; 1063 int result; 1064 1065 if (con->nodeid == 0) { 1066 log_print("attempt to connect sock 0 foiled"); 1067 return; 1068 } 1069 1070 dlm_comm_mark(con->nodeid, &mark); 1071 1072 mutex_lock(&con->sock_mutex); 1073 if (con->retries++ > MAX_CONNECT_RETRIES) 1074 goto out; 1075 1076 /* Some odd races can cause double-connects, ignore them */ 1077 if (con->sock) 1078 goto out; 1079 1080 /* Create a socket to communicate with */ 1081 result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family, 1082 SOCK_STREAM, IPPROTO_TCP, &sock); 1083 if (result < 0) 1084 goto out_err; 1085 1086 sock_set_mark(sock->sk, mark); 1087 1088 memset(&saddr, 0, sizeof(saddr)); 1089 result = nodeid_to_addr(con->nodeid, &saddr, NULL, false); 1090 if (result < 0) { 1091 log_print("no address for nodeid %d", con->nodeid); 1092 goto out_err; 1093 } 1094 1095 con->rx_action = receive_from_sock; 1096 con->connect_action = tcp_connect_to_sock; 1097 con->shutdown_action = dlm_tcp_shutdown; 1098 add_sock(sock, con); 1099 1100 /* Bind to our cluster-known address connecting to avoid 1101 routing problems */ 1102 memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr)); 1103 make_sockaddr(&src_addr, 0, &addr_len); 1104 result = sock->ops->bind(sock, (struct sockaddr *) &src_addr, 1105 addr_len); 1106 if (result < 0) { 1107 log_print("could not bind for connect: %d", result); 1108 /* This *may* not indicate a critical error */ 1109 } 1110 1111 make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len); 1112 1113 log_print("connecting to %d", con->nodeid); 1114 1115 /* Turn off Nagle's algorithm */ 1116 tcp_sock_set_nodelay(sock->sk); 1117 1118 result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len, 1119 O_NONBLOCK); 1120 if (result == -EINPROGRESS) 1121 result = 0; 1122 if (result == 0) 1123 goto out; 1124 1125 out_err: 1126 if (con->sock) { 1127 sock_release(con->sock); 1128 con->sock = NULL; 1129 } else if (sock) { 1130 sock_release(sock); 1131 } 1132 /* 1133 * Some errors are fatal and this list might need adjusting. For other 1134 * errors we try again until the max number of retries is reached. 1135 */ 1136 if (result != -EHOSTUNREACH && 1137 result != -ENETUNREACH && 1138 result != -ENETDOWN && 1139 result != -EINVAL && 1140 result != -EPROTONOSUPPORT) { 1141 log_print("connect %d try %d error %d", con->nodeid, 1142 con->retries, result); 1143 mutex_unlock(&con->sock_mutex); 1144 msleep(1000); 1145 lowcomms_connect_sock(con); 1146 return; 1147 } 1148 out: 1149 mutex_unlock(&con->sock_mutex); 1150 return; 1151 } 1152 1153 static struct socket *tcp_create_listen_sock(struct connection *con, 1154 struct sockaddr_storage *saddr) 1155 { 1156 struct socket *sock = NULL; 1157 int result = 0; 1158 int addr_len; 1159 1160 if (dlm_local_addr[0]->ss_family == AF_INET) 1161 addr_len = sizeof(struct sockaddr_in); 1162 else 1163 addr_len = sizeof(struct sockaddr_in6); 1164 1165 /* Create a socket to communicate with */ 1166 result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family, 1167 SOCK_STREAM, IPPROTO_TCP, &sock); 1168 if (result < 0) { 1169 log_print("Can't create listening comms socket"); 1170 goto create_out; 1171 } 1172 1173 sock_set_mark(sock->sk, dlm_config.ci_mark); 1174 1175 /* Turn off Nagle's algorithm */ 1176 tcp_sock_set_nodelay(sock->sk); 1177 1178 sock_set_reuseaddr(sock->sk); 1179 1180 write_lock_bh(&sock->sk->sk_callback_lock); 1181 sock->sk->sk_user_data = con; 1182 save_listen_callbacks(sock); 1183 con->rx_action = accept_from_sock; 1184 con->connect_action = tcp_connect_to_sock; 1185 write_unlock_bh(&sock->sk->sk_callback_lock); 1186 1187 /* Bind to our port */ 1188 make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len); 1189 result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len); 1190 if (result < 0) { 1191 log_print("Can't bind to port %d", dlm_config.ci_tcp_port); 1192 sock_release(sock); 1193 sock = NULL; 1194 con->sock = NULL; 1195 goto create_out; 1196 } 1197 sock_set_keepalive(sock->sk); 1198 1199 result = sock->ops->listen(sock, 5); 1200 if (result < 0) { 1201 log_print("Can't listen on port %d", dlm_config.ci_tcp_port); 1202 sock_release(sock); 1203 sock = NULL; 1204 goto create_out; 1205 } 1206 1207 create_out: 1208 return sock; 1209 } 1210 1211 /* Get local addresses */ 1212 static void init_local(void) 1213 { 1214 struct sockaddr_storage sas, *addr; 1215 int i; 1216 1217 dlm_local_count = 0; 1218 for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) { 1219 if (dlm_our_addr(&sas, i)) 1220 break; 1221 1222 addr = kmemdup(&sas, sizeof(*addr), GFP_NOFS); 1223 if (!addr) 1224 break; 1225 dlm_local_addr[dlm_local_count++] = addr; 1226 } 1227 } 1228 1229 static void deinit_local(void) 1230 { 1231 int i; 1232 1233 for (i = 0; i < dlm_local_count; i++) 1234 kfree(dlm_local_addr[i]); 1235 } 1236 1237 /* Initialise SCTP socket and bind to all interfaces */ 1238 static int sctp_listen_for_all(void) 1239 { 1240 struct socket *sock = NULL; 1241 int result = -EINVAL; 1242 struct connection *con = nodeid2con(0, GFP_NOFS); 1243 1244 if (!con) 1245 return -ENOMEM; 1246 1247 log_print("Using SCTP for communications"); 1248 1249 result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family, 1250 SOCK_STREAM, IPPROTO_SCTP, &sock); 1251 if (result < 0) { 1252 log_print("Can't create comms socket, check SCTP is loaded"); 1253 goto out; 1254 } 1255 1256 sock_set_rcvbuf(sock->sk, NEEDED_RMEM); 1257 sock_set_mark(sock->sk, dlm_config.ci_mark); 1258 sctp_sock_set_nodelay(sock->sk); 1259 1260 write_lock_bh(&sock->sk->sk_callback_lock); 1261 /* Init con struct */ 1262 sock->sk->sk_user_data = con; 1263 save_listen_callbacks(sock); 1264 con->sock = sock; 1265 con->sock->sk->sk_data_ready = lowcomms_data_ready; 1266 con->rx_action = accept_from_sock; 1267 con->connect_action = sctp_connect_to_sock; 1268 1269 write_unlock_bh(&sock->sk->sk_callback_lock); 1270 1271 /* Bind to all addresses. */ 1272 if (sctp_bind_addrs(con, dlm_config.ci_tcp_port)) 1273 goto create_delsock; 1274 1275 result = sock->ops->listen(sock, 5); 1276 if (result < 0) { 1277 log_print("Can't set socket listening"); 1278 goto create_delsock; 1279 } 1280 1281 return 0; 1282 1283 create_delsock: 1284 sock_release(sock); 1285 con->sock = NULL; 1286 out: 1287 return result; 1288 } 1289 1290 static int tcp_listen_for_all(void) 1291 { 1292 struct socket *sock = NULL; 1293 struct connection *con = nodeid2con(0, GFP_NOFS); 1294 int result = -EINVAL; 1295 1296 if (!con) 1297 return -ENOMEM; 1298 1299 /* We don't support multi-homed hosts */ 1300 if (dlm_local_addr[1] != NULL) { 1301 log_print("TCP protocol can't handle multi-homed hosts, " 1302 "try SCTP"); 1303 return -EINVAL; 1304 } 1305 1306 log_print("Using TCP for communications"); 1307 1308 sock = tcp_create_listen_sock(con, dlm_local_addr[0]); 1309 if (sock) { 1310 add_sock(sock, con); 1311 result = 0; 1312 } 1313 else { 1314 result = -EADDRINUSE; 1315 } 1316 1317 return result; 1318 } 1319 1320 1321 1322 static struct writequeue_entry *new_writequeue_entry(struct connection *con, 1323 gfp_t allocation) 1324 { 1325 struct writequeue_entry *entry; 1326 1327 entry = kmalloc(sizeof(struct writequeue_entry), allocation); 1328 if (!entry) 1329 return NULL; 1330 1331 entry->page = alloc_page(allocation); 1332 if (!entry->page) { 1333 kfree(entry); 1334 return NULL; 1335 } 1336 1337 entry->offset = 0; 1338 entry->len = 0; 1339 entry->end = 0; 1340 entry->users = 0; 1341 entry->con = con; 1342 1343 return entry; 1344 } 1345 1346 void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc) 1347 { 1348 struct connection *con; 1349 struct writequeue_entry *e; 1350 int offset = 0; 1351 1352 con = nodeid2con(nodeid, allocation); 1353 if (!con) 1354 return NULL; 1355 1356 spin_lock(&con->writequeue_lock); 1357 e = list_entry(con->writequeue.prev, struct writequeue_entry, list); 1358 if ((&e->list == &con->writequeue) || 1359 (PAGE_SIZE - e->end < len)) { 1360 e = NULL; 1361 } else { 1362 offset = e->end; 1363 e->end += len; 1364 e->users++; 1365 } 1366 spin_unlock(&con->writequeue_lock); 1367 1368 if (e) { 1369 got_one: 1370 *ppc = page_address(e->page) + offset; 1371 return e; 1372 } 1373 1374 e = new_writequeue_entry(con, allocation); 1375 if (e) { 1376 spin_lock(&con->writequeue_lock); 1377 offset = e->end; 1378 e->end += len; 1379 e->users++; 1380 list_add_tail(&e->list, &con->writequeue); 1381 spin_unlock(&con->writequeue_lock); 1382 goto got_one; 1383 } 1384 return NULL; 1385 } 1386 1387 void dlm_lowcomms_commit_buffer(void *mh) 1388 { 1389 struct writequeue_entry *e = (struct writequeue_entry *)mh; 1390 struct connection *con = e->con; 1391 int users; 1392 1393 spin_lock(&con->writequeue_lock); 1394 users = --e->users; 1395 if (users) 1396 goto out; 1397 e->len = e->end - e->offset; 1398 spin_unlock(&con->writequeue_lock); 1399 1400 queue_work(send_workqueue, &con->swork); 1401 return; 1402 1403 out: 1404 spin_unlock(&con->writequeue_lock); 1405 return; 1406 } 1407 1408 /* Send a message */ 1409 static void send_to_sock(struct connection *con) 1410 { 1411 int ret = 0; 1412 const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; 1413 struct writequeue_entry *e; 1414 int len, offset; 1415 int count = 0; 1416 1417 mutex_lock(&con->sock_mutex); 1418 if (con->sock == NULL) 1419 goto out_connect; 1420 1421 spin_lock(&con->writequeue_lock); 1422 for (;;) { 1423 e = list_entry(con->writequeue.next, struct writequeue_entry, 1424 list); 1425 if ((struct list_head *) e == &con->writequeue) 1426 break; 1427 1428 len = e->len; 1429 offset = e->offset; 1430 BUG_ON(len == 0 && e->users == 0); 1431 spin_unlock(&con->writequeue_lock); 1432 1433 ret = 0; 1434 if (len) { 1435 ret = kernel_sendpage(con->sock, e->page, offset, len, 1436 msg_flags); 1437 if (ret == -EAGAIN || ret == 0) { 1438 if (ret == -EAGAIN && 1439 test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) && 1440 !test_and_set_bit(CF_APP_LIMITED, &con->flags)) { 1441 /* Notify TCP that we're limited by the 1442 * application window size. 1443 */ 1444 set_bit(SOCK_NOSPACE, &con->sock->flags); 1445 con->sock->sk->sk_write_pending++; 1446 } 1447 cond_resched(); 1448 goto out; 1449 } else if (ret < 0) 1450 goto send_error; 1451 } 1452 1453 /* Don't starve people filling buffers */ 1454 if (++count >= MAX_SEND_MSG_COUNT) { 1455 cond_resched(); 1456 count = 0; 1457 } 1458 1459 spin_lock(&con->writequeue_lock); 1460 writequeue_entry_complete(e, ret); 1461 } 1462 spin_unlock(&con->writequeue_lock); 1463 out: 1464 mutex_unlock(&con->sock_mutex); 1465 return; 1466 1467 send_error: 1468 mutex_unlock(&con->sock_mutex); 1469 close_connection(con, false, false, true); 1470 /* Requeue the send work. When the work daemon runs again, it will try 1471 a new connection, then call this function again. */ 1472 queue_work(send_workqueue, &con->swork); 1473 return; 1474 1475 out_connect: 1476 mutex_unlock(&con->sock_mutex); 1477 queue_work(send_workqueue, &con->swork); 1478 cond_resched(); 1479 } 1480 1481 static void clean_one_writequeue(struct connection *con) 1482 { 1483 struct writequeue_entry *e, *safe; 1484 1485 spin_lock(&con->writequeue_lock); 1486 list_for_each_entry_safe(e, safe, &con->writequeue, list) { 1487 list_del(&e->list); 1488 free_entry(e); 1489 } 1490 spin_unlock(&con->writequeue_lock); 1491 } 1492 1493 /* Called from recovery when it knows that a node has 1494 left the cluster */ 1495 int dlm_lowcomms_close(int nodeid) 1496 { 1497 struct connection *con; 1498 struct dlm_node_addr *na; 1499 1500 log_print("closing connection to node %d", nodeid); 1501 con = nodeid2con(nodeid, 0); 1502 if (con) { 1503 set_bit(CF_CLOSE, &con->flags); 1504 close_connection(con, true, true, true); 1505 clean_one_writequeue(con); 1506 } 1507 1508 spin_lock(&dlm_node_addrs_spin); 1509 na = find_node_addr(nodeid); 1510 if (na) { 1511 list_del(&na->list); 1512 while (na->addr_count--) 1513 kfree(na->addr[na->addr_count]); 1514 kfree(na); 1515 } 1516 spin_unlock(&dlm_node_addrs_spin); 1517 1518 return 0; 1519 } 1520 1521 /* Receive workqueue function */ 1522 static void process_recv_sockets(struct work_struct *work) 1523 { 1524 struct connection *con = container_of(work, struct connection, rwork); 1525 int err; 1526 1527 clear_bit(CF_READ_PENDING, &con->flags); 1528 do { 1529 err = con->rx_action(con); 1530 } while (!err); 1531 } 1532 1533 /* Send workqueue function */ 1534 static void process_send_sockets(struct work_struct *work) 1535 { 1536 struct connection *con = container_of(work, struct connection, swork); 1537 1538 clear_bit(CF_WRITE_PENDING, &con->flags); 1539 if (con->sock == NULL) /* not mutex protected so check it inside too */ 1540 con->connect_action(con); 1541 if (!list_empty(&con->writequeue)) 1542 send_to_sock(con); 1543 } 1544 1545 static void work_stop(void) 1546 { 1547 if (recv_workqueue) 1548 destroy_workqueue(recv_workqueue); 1549 if (send_workqueue) 1550 destroy_workqueue(send_workqueue); 1551 } 1552 1553 static int work_start(void) 1554 { 1555 recv_workqueue = alloc_workqueue("dlm_recv", 1556 WQ_UNBOUND | WQ_MEM_RECLAIM, 1); 1557 if (!recv_workqueue) { 1558 log_print("can't start dlm_recv"); 1559 return -ENOMEM; 1560 } 1561 1562 send_workqueue = alloc_workqueue("dlm_send", 1563 WQ_UNBOUND | WQ_MEM_RECLAIM, 1); 1564 if (!send_workqueue) { 1565 log_print("can't start dlm_send"); 1566 destroy_workqueue(recv_workqueue); 1567 return -ENOMEM; 1568 } 1569 1570 return 0; 1571 } 1572 1573 static void _stop_conn(struct connection *con, bool and_other) 1574 { 1575 mutex_lock(&con->sock_mutex); 1576 set_bit(CF_CLOSE, &con->flags); 1577 set_bit(CF_READ_PENDING, &con->flags); 1578 set_bit(CF_WRITE_PENDING, &con->flags); 1579 if (con->sock && con->sock->sk) { 1580 write_lock_bh(&con->sock->sk->sk_callback_lock); 1581 con->sock->sk->sk_user_data = NULL; 1582 write_unlock_bh(&con->sock->sk->sk_callback_lock); 1583 } 1584 if (con->othercon && and_other) 1585 _stop_conn(con->othercon, false); 1586 mutex_unlock(&con->sock_mutex); 1587 } 1588 1589 static void stop_conn(struct connection *con) 1590 { 1591 _stop_conn(con, true); 1592 } 1593 1594 static void shutdown_conn(struct connection *con) 1595 { 1596 if (con->shutdown_action) 1597 con->shutdown_action(con); 1598 } 1599 1600 static void connection_release(struct rcu_head *rcu) 1601 { 1602 struct connection *con = container_of(rcu, struct connection, rcu); 1603 1604 kfree(con->rx_buf); 1605 kfree(con); 1606 } 1607 1608 static void free_conn(struct connection *con) 1609 { 1610 close_connection(con, true, true, true); 1611 spin_lock(&connections_lock); 1612 hlist_del_rcu(&con->list); 1613 spin_unlock(&connections_lock); 1614 if (con->othercon) { 1615 clean_one_writequeue(con->othercon); 1616 call_rcu(&con->othercon->rcu, connection_release); 1617 } 1618 clean_one_writequeue(con); 1619 call_rcu(&con->rcu, connection_release); 1620 } 1621 1622 static void work_flush(void) 1623 { 1624 int ok, idx; 1625 int i; 1626 struct connection *con; 1627 1628 do { 1629 ok = 1; 1630 foreach_conn(stop_conn); 1631 if (recv_workqueue) 1632 flush_workqueue(recv_workqueue); 1633 if (send_workqueue) 1634 flush_workqueue(send_workqueue); 1635 idx = srcu_read_lock(&connections_srcu); 1636 for (i = 0; i < CONN_HASH_SIZE && ok; i++) { 1637 hlist_for_each_entry_rcu(con, &connection_hash[i], 1638 list) { 1639 ok &= test_bit(CF_READ_PENDING, &con->flags); 1640 ok &= test_bit(CF_WRITE_PENDING, &con->flags); 1641 if (con->othercon) { 1642 ok &= test_bit(CF_READ_PENDING, 1643 &con->othercon->flags); 1644 ok &= test_bit(CF_WRITE_PENDING, 1645 &con->othercon->flags); 1646 } 1647 } 1648 } 1649 srcu_read_unlock(&connections_srcu, idx); 1650 } while (!ok); 1651 } 1652 1653 void dlm_lowcomms_stop(void) 1654 { 1655 /* Set all the flags to prevent any 1656 socket activity. 1657 */ 1658 dlm_allow_conn = 0; 1659 1660 if (recv_workqueue) 1661 flush_workqueue(recv_workqueue); 1662 if (send_workqueue) 1663 flush_workqueue(send_workqueue); 1664 1665 foreach_conn(shutdown_conn); 1666 work_flush(); 1667 foreach_conn(free_conn); 1668 work_stop(); 1669 deinit_local(); 1670 } 1671 1672 int dlm_lowcomms_start(void) 1673 { 1674 int error = -EINVAL; 1675 struct connection *con; 1676 int i; 1677 1678 for (i = 0; i < CONN_HASH_SIZE; i++) 1679 INIT_HLIST_HEAD(&connection_hash[i]); 1680 1681 init_local(); 1682 if (!dlm_local_count) { 1683 error = -ENOTCONN; 1684 log_print("no local IP address has been set"); 1685 goto fail; 1686 } 1687 1688 error = work_start(); 1689 if (error) 1690 goto fail; 1691 1692 dlm_allow_conn = 1; 1693 1694 /* Start listening */ 1695 if (dlm_config.ci_protocol == 0) 1696 error = tcp_listen_for_all(); 1697 else 1698 error = sctp_listen_for_all(); 1699 if (error) 1700 goto fail_unlisten; 1701 1702 return 0; 1703 1704 fail_unlisten: 1705 dlm_allow_conn = 0; 1706 con = nodeid2con(0,0); 1707 if (con) 1708 free_conn(con); 1709 fail: 1710 return error; 1711 } 1712 1713 void dlm_lowcomms_exit(void) 1714 { 1715 struct dlm_node_addr *na, *safe; 1716 1717 spin_lock(&dlm_node_addrs_spin); 1718 list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) { 1719 list_del(&na->list); 1720 while (na->addr_count--) 1721 kfree(na->addr[na->addr_count]); 1722 kfree(na); 1723 } 1724 spin_unlock(&dlm_node_addrs_spin); 1725 } 1726