// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

/*
 * lowcomms.c
 *
 * This is the "low-level" comms layer.
 *
 * It is responsible for sending/receiving messages
 * from other nodes in the cluster.
 *
 * Cluster nodes are referred to by their nodeids. nodeids are
 * simply 32 bit numbers to the locking module - if they need to
 * be expanded for the cluster infrastructure then that is its
 * responsibility. It is this layer's
 * responsibility to resolve these into IP addresses or
 * whatever it needs for inter-node communication.
 *
 * The comms level is two kernel threads that deal mainly with
 * the receiving of messages from other nodes and passing them
 * up to the mid-level comms layer (which understands the
 * message format) for execution by the locking core, and
 * a send thread which does all the setting up of connections
 * to remote nodes and the sending of data. Threads are not allowed
 * to send their own data because it may cause them to wait in times
 * of high load. Also, this way, the sending thread can collect together
 * messages bound for one node and send them in one block.
 *
 * lowcomms will choose to use either TCP or SCTP as its transport layer
 * depending on the configuration variable 'protocol'. This should be set
 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
 * cluster-wide mechanism as it must be the same on all nodes of the cluster
 * for the DLM to function.
 *
 */
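/*
 * A rough sketch of how a caller above this layer typically drives the
 * send path (hypothetical caller code, for illustration only; in-tree
 * the midcomms layer is the real user of this API):
 *
 *	char *ppc;
 *	struct dlm_msg *msg;
 *
 *	// reserve 'len' bytes on nodeid's writequeue
 *	msg = dlm_lowcomms_new_msg(nodeid, len, GFP_NOFS, &ppc, NULL, NULL);
 *	if (!msg)
 *		return -ENOMEM;
 *	memcpy(ppc, buf, len);		// fill the reserved area
 *	dlm_lowcomms_commit_msg(msg);	// hand it to the send worker
 *	dlm_lowcomms_put_msg(msg);	// drop our reference
 *
 * The send worker then batches whatever has accumulated for that node
 * and transmits it in one block, as described above.
 */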
#include <asm/ioctls.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <linux/pagemap.h>
#include <linux/file.h>
#include <linux/mutex.h>
#include <linux/sctp.h>
#include <linux/slab.h>
#include <net/sctp/sctp.h>
#include <net/ipv6.h>

#include "dlm_internal.h"
#include "lowcomms.h"
#include "midcomms.h"
#include "config.h"

#define NEEDED_RMEM (4*1024*1024)

/* Number of messages to send before rescheduling */
#define MAX_SEND_MSG_COUNT 25
#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000)

struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* So we know who we are in the list */
	struct mutex sock_mutex;
	unsigned long flags;
#define CF_READ_PENDING 1
#define CF_WRITE_PENDING 2
#define CF_INIT_PENDING 4
#define CF_IS_OTHERCON 5
#define CF_CLOSE 6
#define CF_APP_LIMITED 7
#define CF_CLOSING 8
#define CF_SHUTDOWN 9
#define CF_CONNECTED 10
#define CF_RECONNECT 11
#define CF_DELAY_CONNECT 12
#define CF_EOF 13
	struct list_head writequeue;  /* List of outgoing writequeue_entries */
	spinlock_t writequeue_lock;
	atomic_t writequeue_cnt;
	void (*connect_action) (struct connection *);	/* What to do to connect */
	void (*shutdown_action)(struct connection *con); /* What to do to shutdown */
	bool (*eof_condition)(struct connection *con); /* What to do on EOF check */
	int retries;
#define MAX_CONNECT_RETRIES 3
	struct hlist_node list;
	struct connection *othercon;
	struct connection *sendcon;
	struct work_struct rwork; /* Receive workqueue */
	struct work_struct swork; /* Send workqueue */
	wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */
	unsigned char *rx_buf;
	int rx_buflen;
	int rx_leftover;
	struct rcu_head rcu;
};
#define sock2con(x) ((struct connection *)(x)->sk_user_data)

struct listen_connection {
	struct socket *sock;
	struct work_struct rwork;
};

#define DLM_WQ_REMAIN_BYTES(e) (PAGE_SIZE - e->end)
#define DLM_WQ_LENGTH_BYTES(e) (e->end - e->offset)

/* An entry waiting to be sent */
struct writequeue_entry {
	struct list_head list;
	struct page *page;
	int offset;
	int len;
	int end;
	int users;
	bool dirty;
	struct connection *con;
	struct list_head msgs;
	struct kref ref;
};

struct dlm_msg {
	struct writequeue_entry *entry;
	struct dlm_msg *orig_msg;
	bool retransmit;
	void *ppc;
	int len;
	int idx; /* new()/commit() idx exchange */

	struct list_head list;
	struct kref ref;
};

struct dlm_node_addr {
	struct list_head list;
	int nodeid;
	int mark;
	int addr_count;
	int curr_addr_index;
	struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
};

static struct listen_sock_callbacks {
	void (*sk_error_report)(struct sock *);
	void (*sk_data_ready)(struct sock *);
	void (*sk_state_change)(struct sock *);
	void (*sk_write_space)(struct sock *);
} listen_sock;

static LIST_HEAD(dlm_node_addrs);
static DEFINE_SPINLOCK(dlm_node_addrs_spin);

static struct listen_connection listen_con;
static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;
int dlm_allow_conn;

/* Work queues */
static struct workqueue_struct *recv_workqueue;
static struct workqueue_struct *send_workqueue;

static struct hlist_head connection_hash[CONN_HASH_SIZE];
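/*
 * Illustration (not executable, values are made up): how a writequeue
 * entry above packs several messages into one page. With PAGE_SIZE ==
 * 4096, after two messages of 96 and 160 bytes have been reserved:
 *
 *	e->offset == 0		// first unsent byte
 *	e->end    == 256	// first free byte, 96 + 160
 *	e->users  == 2		// reservations not yet committed
 *	DLM_WQ_REMAIN_BYTES(e) == 4096 - 256 == 3840
 *	DLM_WQ_LENGTH_BYTES(e) == 256 - 0 == 256
 *
 * A new reservation fits as long as DLM_WQ_REMAIN_BYTES(e) >= len;
 * otherwise a fresh page-backed entry is allocated (see new_wq_entry()).
 */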
static DEFINE_SPINLOCK(connections_lock);
DEFINE_STATIC_SRCU(connections_srcu);

static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);

static void sctp_connect_to_sock(struct connection *con);
static void tcp_connect_to_sock(struct connection *con);
static void dlm_tcp_shutdown(struct connection *con);

static struct connection *__find_con(int nodeid, int r)
{
	struct connection *con;

	hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
		if (con->nodeid == nodeid)
			return con;
	}

	return NULL;
}

static bool tcp_eof_condition(struct connection *con)
{
	return atomic_read(&con->writequeue_cnt);
}

static int dlm_con_init(struct connection *con, int nodeid)
{
	con->rx_buflen = dlm_config.ci_buffer_size;
	con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS);
	if (!con->rx_buf)
		return -ENOMEM;

	con->nodeid = nodeid;
	mutex_init(&con->sock_mutex);
	INIT_LIST_HEAD(&con->writequeue);
	spin_lock_init(&con->writequeue_lock);
	atomic_set(&con->writequeue_cnt, 0);
	INIT_WORK(&con->swork, process_send_sockets);
	INIT_WORK(&con->rwork, process_recv_sockets);
	init_waitqueue_head(&con->shutdown_wait);

	switch (dlm_config.ci_protocol) {
	case DLM_PROTO_TCP:
		con->connect_action = tcp_connect_to_sock;
		con->shutdown_action = dlm_tcp_shutdown;
		con->eof_condition = tcp_eof_condition;
		break;
	case DLM_PROTO_SCTP:
		con->connect_action = sctp_connect_to_sock;
		break;
	default:
		kfree(con->rx_buf);
		return -EINVAL;
	}

	return 0;
}

/*
 * If 'alloc' is zero then we don't attempt to create a new
 * connection structure for this node.
 */
static struct connection *nodeid2con(int nodeid, gfp_t alloc)
{
	struct connection *con, *tmp;
	int r, ret;

	r = nodeid_hash(nodeid);
	con = __find_con(nodeid, r);
	if (con || !alloc)
		return con;

	con = kzalloc(sizeof(*con), alloc);
	if (!con)
		return NULL;

	ret = dlm_con_init(con, nodeid);
	if (ret) {
		kfree(con);
		return NULL;
	}

	spin_lock(&connections_lock);
	/* Because multiple workqueues/threads call this function it can
	 * race on multiple CPUs. Instead of locking the hot path
	 * __find_con() we just check, in the rare case of a recently
	 * added node, again under protection of connections_lock. If
	 * that is the case we abort our connection creation and return
	 * the existing connection.
	 */
	tmp = __find_con(nodeid, r);
	if (tmp) {
		spin_unlock(&connections_lock);
		kfree(con->rx_buf);
		kfree(con);
		return tmp;
	}

	hlist_add_head_rcu(&con->list, &connection_hash[r]);
	spin_unlock(&connections_lock);

	return con;
}

/* Loop round all connections */
static void foreach_conn(void (*conn_func)(struct connection *c))
{
	int i;
	struct connection *con;

	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry_rcu(con, &connection_hash[i], list)
			conn_func(con);
	}
}

static struct dlm_node_addr *find_node_addr(int nodeid)
{
	struct dlm_node_addr *na;

	list_for_each_entry(na, &dlm_node_addrs, list) {
		if (na->nodeid == nodeid)
			return na;
	}
	return NULL;
}

static int addr_compare(const struct sockaddr_storage *x,
			const struct sockaddr_storage *y)
{
	switch (x->ss_family) {
	case AF_INET: {
		struct sockaddr_in *sinx = (struct sockaddr_in *)x;
		struct sockaddr_in *siny = (struct sockaddr_in *)y;
		if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
			return 0;
		if (sinx->sin_port != siny->sin_port)
			return 0;
		break;
	}
	case AF_INET6: {
		struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
		struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
		if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
			return 0;
		if (sinx->sin6_port != siny->sin6_port)
			return 0;
		break;
	}
	default:
		return 0;
	}
	return 1;
}

static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
			  struct sockaddr *sa_out, bool try_new_addr,
			  unsigned int *mark)
{
	struct sockaddr_storage sas;
	struct dlm_node_addr *na;

	if (!dlm_local_count)
		return -1;

	spin_lock(&dlm_node_addrs_spin);
	na = find_node_addr(nodeid);
	if (na && na->addr_count) {
		memcpy(&sas, na->addr[na->curr_addr_index],
		       sizeof(struct sockaddr_storage));

		if (try_new_addr) {
			na->curr_addr_index++;
			if (na->curr_addr_index == na->addr_count)
				na->curr_addr_index = 0;
		}
	}
	spin_unlock(&dlm_node_addrs_spin);

	if (!na)
		return -EEXIST;

	if (!na->addr_count)
		return -ENOENT;

	*mark = na->mark;

	if (sas_out)
		memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));

	if (!sa_out)
		return 0;

	if (dlm_local_addr[0]->ss_family == AF_INET) {
		struct sockaddr_in *in4 = (struct sockaddr_in *) &sas;
		struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
		ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
	} else {
		struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &sas;
		struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
		ret6->sin6_addr = in6->sin6_addr;
	}

	return 0;
}

static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid,
			  unsigned int *mark)
{
	struct dlm_node_addr *na;
	int rv = -EEXIST;
	int addr_i;

	spin_lock(&dlm_node_addrs_spin);
	list_for_each_entry(na, &dlm_node_addrs, list) {
		if (!na->addr_count)
			continue;

		for (addr_i = 0; addr_i < na->addr_count; addr_i++) {
			if (addr_compare(na->addr[addr_i], addr)) {
				*nodeid = na->nodeid;
				*mark = na->mark;
				rv = 0;
				goto unlock;
			}
		}
	}
unlock:
	spin_unlock(&dlm_node_addrs_spin);
	return rv;
}
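/*
 * Worked example (hypothetical numbers): a node registered with three
 * addresses via dlm_lowcomms_addr() starts with curr_addr_index == 0.
 * Successive nodeid_to_addr(..., try_new_addr = true, ...) calls hand
 * out addr[0], addr[1], addr[2] and then wrap back to addr[0], so SCTP
 * connect attempts (which pass try_new_addr == true) rotate through
 * the node's known addresses after a failure.
 */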
/* caller must hold the dlm_node_addrs_spin lock */
static bool dlm_lowcomms_na_has_addr(const struct dlm_node_addr *na,
				     const struct sockaddr_storage *addr)
{
	int i;

	for (i = 0; i < na->addr_count; i++) {
		if (addr_compare(na->addr[i], addr))
			return true;
	}

	return false;
}

int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
{
	struct sockaddr_storage *new_addr;
	struct dlm_node_addr *new_node, *na;
	bool ret;

	new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS);
	if (!new_node)
		return -ENOMEM;

	new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS);
	if (!new_addr) {
		kfree(new_node);
		return -ENOMEM;
	}

	memcpy(new_addr, addr, len);

	spin_lock(&dlm_node_addrs_spin);
	na = find_node_addr(nodeid);
	if (!na) {
		new_node->nodeid = nodeid;
		new_node->addr[0] = new_addr;
		new_node->addr_count = 1;
		new_node->mark = dlm_config.ci_mark;
		list_add(&new_node->list, &dlm_node_addrs);
		spin_unlock(&dlm_node_addrs_spin);
		return 0;
	}

	ret = dlm_lowcomms_na_has_addr(na, addr);
	if (ret) {
		spin_unlock(&dlm_node_addrs_spin);
		kfree(new_addr);
		kfree(new_node);
		return -EEXIST;
	}

	if (na->addr_count >= DLM_MAX_ADDR_COUNT) {
		spin_unlock(&dlm_node_addrs_spin);
		kfree(new_addr);
		kfree(new_node);
		return -ENOSPC;
	}

	na->addr[na->addr_count++] = new_addr;
	spin_unlock(&dlm_node_addrs_spin);
	kfree(new_node);
	return 0;
}

/* Data available on socket or listen socket received a connect */
static void lowcomms_data_ready(struct sock *sk)
{
	struct connection *con;

	read_lock_bh(&sk->sk_callback_lock);
	con = sock2con(sk);
	if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
		queue_work(recv_workqueue, &con->rwork);
	read_unlock_bh(&sk->sk_callback_lock);
}

static void lowcomms_listen_data_ready(struct sock *sk)
{
	if (!dlm_allow_conn)
		return;

	queue_work(recv_workqueue, &listen_con.rwork);
}

static void lowcomms_write_space(struct sock *sk)
{
	struct connection *con;

	read_lock_bh(&sk->sk_callback_lock);
	con = sock2con(sk);
	if (!con)
		goto out;

	if (!test_and_set_bit(CF_CONNECTED, &con->flags)) {
		log_print("successfully connected to node %d", con->nodeid);
		queue_work(send_workqueue, &con->swork);
		goto out;
	}

	clear_bit(SOCK_NOSPACE, &con->sock->flags);

	if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
		con->sock->sk->sk_write_pending--;
		clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
	}

	queue_work(send_workqueue, &con->swork);
out:
	read_unlock_bh(&sk->sk_callback_lock);
}

static inline void lowcomms_connect_sock(struct connection *con)
{
	if (test_bit(CF_CLOSE, &con->flags))
		return;
	queue_work(send_workqueue, &con->swork);
	cond_resched();
}

static void lowcomms_state_change(struct sock *sk)
{
	/* SCTP layer is not calling sk_data_ready when the connection
	 * is done, so we catch the signal through here. Also, it
	 * doesn't switch socket state when entering shutdown, so we
	 * skip the write in that case.
	 */
	if (sk->sk_shutdown) {
		if (sk->sk_shutdown == RCV_SHUTDOWN)
			lowcomms_data_ready(sk);
	} else if (sk->sk_state == TCP_ESTABLISHED) {
		lowcomms_write_space(sk);
	}
}

int dlm_lowcomms_connect_node(int nodeid)
{
	struct connection *con;
	int idx;

	if (nodeid == dlm_our_nodeid())
		return 0;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, GFP_NOFS);
	if (!con) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOMEM;
	}

	lowcomms_connect_sock(con);
	srcu_read_unlock(&connections_srcu, idx);

	return 0;
}

int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark)
{
	struct dlm_node_addr *na;

	spin_lock(&dlm_node_addrs_spin);
	na = find_node_addr(nodeid);
	if (!na) {
		spin_unlock(&dlm_node_addrs_spin);
		return -ENOENT;
	}

	na->mark = mark;
	spin_unlock(&dlm_node_addrs_spin);

	return 0;
}

static void lowcomms_error_report(struct sock *sk)
{
	struct connection *con;
	struct sockaddr_storage saddr;
	void (*orig_report)(struct sock *) = NULL;

	read_lock_bh(&sk->sk_callback_lock);
	con = sock2con(sk);
	if (con == NULL)
		goto out;

	orig_report = listen_sock.sk_error_report;
	if (con->sock == NULL ||
	    kernel_getpeername(con->sock, (struct sockaddr *)&saddr) < 0) {
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d, port %d, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, dlm_config.ci_tcp_port,
				   sk->sk_err, sk->sk_err_soft);
	} else if (saddr.ss_family == AF_INET) {
		struct sockaddr_in *sin4 = (struct sockaddr_in *)&saddr;

		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI4, port %d, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &sin4->sin_addr.s_addr,
				   dlm_config.ci_tcp_port, sk->sk_err,
				   sk->sk_err_soft);
	} else {
		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&saddr;

		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %u.%u.%u.%u, "
				   "port %d, sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, sin6->sin6_addr.s6_addr32[0],
				   sin6->sin6_addr.s6_addr32[1],
				   sin6->sin6_addr.s6_addr32[2],
				   sin6->sin6_addr.s6_addr32[3],
				   dlm_config.ci_tcp_port, sk->sk_err,
				   sk->sk_err_soft);
	}

	/* from here on we handle only the sendcon */
	if (test_bit(CF_IS_OTHERCON, &con->flags))
		con = con->sendcon;

	switch (sk->sk_err) {
	case ECONNREFUSED:
		set_bit(CF_DELAY_CONNECT, &con->flags);
		break;
	default:
		break;
	}

	if (!test_and_set_bit(CF_RECONNECT, &con->flags))
		queue_work(send_workqueue, &con->swork);

out:
	read_unlock_bh(&sk->sk_callback_lock);
	if (orig_report)
		orig_report(sk);
}
/* Note: sk_callback_lock must be locked before calling this function. */
static void save_listen_callbacks(struct socket *sock)
{
	struct sock *sk = sock->sk;

	listen_sock.sk_data_ready = sk->sk_data_ready;
	listen_sock.sk_state_change = sk->sk_state_change;
	listen_sock.sk_write_space = sk->sk_write_space;
	listen_sock.sk_error_report = sk->sk_error_report;
}

static void restore_callbacks(struct socket *sock)
{
	struct sock *sk = sock->sk;

	write_lock_bh(&sk->sk_callback_lock);
	sk->sk_user_data = NULL;
	sk->sk_data_ready = listen_sock.sk_data_ready;
	sk->sk_state_change = listen_sock.sk_state_change;
	sk->sk_write_space = listen_sock.sk_write_space;
	sk->sk_error_report = listen_sock.sk_error_report;
	write_unlock_bh(&sk->sk_callback_lock);
}

static void add_listen_sock(struct socket *sock, struct listen_connection *con)
{
	struct sock *sk = sock->sk;

	write_lock_bh(&sk->sk_callback_lock);
	save_listen_callbacks(sock);
	con->sock = sock;

	sk->sk_user_data = con;
	sk->sk_allocation = GFP_NOFS;
	/* Install a data_ready callback */
	sk->sk_data_ready = lowcomms_listen_data_ready;
	write_unlock_bh(&sk->sk_callback_lock);
}

/* Make a socket active */
static void add_sock(struct socket *sock, struct connection *con)
{
	struct sock *sk = sock->sk;

	write_lock_bh(&sk->sk_callback_lock);
	con->sock = sock;

	sk->sk_user_data = con;
	/* Install a data_ready callback */
	sk->sk_data_ready = lowcomms_data_ready;
	sk->sk_write_space = lowcomms_write_space;
	sk->sk_state_change = lowcomms_state_change;
	sk->sk_allocation = GFP_NOFS;
	sk->sk_error_report = lowcomms_error_report;
	write_unlock_bh(&sk->sk_callback_lock);
}

/* Add the port number to an IPv6 or 4 sockaddr and return the address
   length */
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
			  int *addr_len)
{
	saddr->ss_family = dlm_local_addr[0]->ss_family;
	if (saddr->ss_family == AF_INET) {
		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
		in4_addr->sin_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in);
		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
	} else {
		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
		in6_addr->sin6_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in6);
	}
	memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
}

static void dlm_page_release(struct kref *kref)
{
	struct writequeue_entry *e = container_of(kref, struct writequeue_entry,
						  ref);

	__free_page(e->page);
	kfree(e);
}

static void dlm_msg_release(struct kref *kref)
{
	struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);

	kref_put(&msg->entry->ref, dlm_page_release);
	kfree(msg);
}

static void free_entry(struct writequeue_entry *e)
{
	struct dlm_msg *msg, *tmp;

	list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
		if (msg->orig_msg) {
			msg->orig_msg->retransmit = false;
			kref_put(&msg->orig_msg->ref, dlm_msg_release);
		}

		list_del(&msg->list);
		kref_put(&msg->ref, dlm_msg_release);
	}

	list_del(&e->list);
	atomic_dec(&e->con->writequeue_cnt);
	kref_put(&e->ref, dlm_page_release);
}

static void dlm_close_sock(struct socket **sock)
{
	if (*sock) {
		restore_callbacks(*sock);
		sock_release(*sock);
		*sock = NULL;
	}
}
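/*
 * Reference-count sketch for the release helpers above (informal,
 * derived from the kref_get()/kref_put() pairs in this file):
 *
 *	writequeue_entry: one ref from the writequeue list, plus one
 *		per dlm_msg carved out of its page (dlm_msg_release()
 *		drops that one).
 *	dlm_msg: one ref held by the entry's msgs list (taken at
 *		commit), one by the caller between new_msg() and
 *		put_msg(), and one by a resend copy via orig_msg while
 *		a retransmit is pending.
 *
 * When the last msg ref goes, dlm_msg_release() drops the entry ref;
 * when the last entry ref goes, dlm_page_release() frees the page.
 */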
/* Close a remote connection and tidy up */
static void close_connection(struct connection *con, bool and_other,
			     bool tx, bool rx)
{
	bool closing = test_and_set_bit(CF_CLOSING, &con->flags);
	struct writequeue_entry *e;

	if (tx && !closing && cancel_work_sync(&con->swork)) {
		log_print("canceled swork for node %d", con->nodeid);
		clear_bit(CF_WRITE_PENDING, &con->flags);
	}
	if (rx && !closing && cancel_work_sync(&con->rwork)) {
		log_print("canceled rwork for node %d", con->nodeid);
		clear_bit(CF_READ_PENDING, &con->flags);
	}

	mutex_lock(&con->sock_mutex);
	dlm_close_sock(&con->sock);

	if (con->othercon && and_other) {
		/* Will only re-enter once. */
		close_connection(con->othercon, false, tx, rx);
	}

	/* if we have sent a writequeue entry only half way, we drop the
	 * whole entry on reconnection so that we do not restart in the
	 * middle of a message, which would confuse the other end.
	 *
	 * we can always drop messages because of retransmits, but what
	 * we cannot allow is to transmit half messages which may be
	 * processed at the other side.
	 *
	 * our policy is to start from a clean state on disconnect, as
	 * we don't know what was sent/received on the transport layer
	 * in this case.
	 */
	spin_lock(&con->writequeue_lock);
	if (!list_empty(&con->writequeue)) {
		e = list_first_entry(&con->writequeue, struct writequeue_entry,
				     list);
		if (e->dirty)
			free_entry(e);
	}
	spin_unlock(&con->writequeue_lock);

	con->rx_leftover = 0;
	con->retries = 0;
	clear_bit(CF_CONNECTED, &con->flags);
	clear_bit(CF_DELAY_CONNECT, &con->flags);
	clear_bit(CF_RECONNECT, &con->flags);
	clear_bit(CF_EOF, &con->flags);
	mutex_unlock(&con->sock_mutex);
	clear_bit(CF_CLOSING, &con->flags);
}

static void shutdown_connection(struct connection *con)
{
	int ret;

	flush_work(&con->swork);

	mutex_lock(&con->sock_mutex);
	/* nothing to shutdown */
	if (!con->sock) {
		mutex_unlock(&con->sock_mutex);
		return;
	}

	set_bit(CF_SHUTDOWN, &con->flags);
	ret = kernel_sock_shutdown(con->sock, SHUT_WR);
	mutex_unlock(&con->sock_mutex);
	if (ret) {
		log_print("Connection %p failed to shutdown: %d will force close",
			  con, ret);
		goto force_close;
	} else {
		ret = wait_event_timeout(con->shutdown_wait,
					 !test_bit(CF_SHUTDOWN, &con->flags),
					 DLM_SHUTDOWN_WAIT_TIMEOUT);
		if (ret == 0) {
			log_print("Connection %p shutdown timed out, will force close",
				  con);
			goto force_close;
		}
	}

	return;

force_close:
	clear_bit(CF_SHUTDOWN, &con->flags);
	close_connection(con, false, true, true);
}

static void dlm_tcp_shutdown(struct connection *con)
{
	if (con->othercon)
		shutdown_connection(con->othercon);
	shutdown_connection(con);
}

static int con_realloc_receive_buf(struct connection *con, int newlen)
{
	unsigned char *newbuf;

	newbuf = kmalloc(newlen, GFP_NOFS);
	if (!newbuf)
		return -ENOMEM;

	/* copy any leftover from last receive */
	if (con->rx_leftover)
		memmove(newbuf, con->rx_buf, con->rx_leftover);

	/* swap to new buffer space */
	kfree(con->rx_buf);
	con->rx_buflen = newlen;
	con->rx_buf = newbuf;

	return 0;
}
/* Data received from remote end */
static int receive_from_sock(struct connection *con)
{
	int call_again_soon = 0;
	struct msghdr msg;
	struct kvec iov;
	int ret, buflen;

	mutex_lock(&con->sock_mutex);

	if (con->sock == NULL) {
		ret = -EAGAIN;
		goto out_close;
	}

	/* realloc if we get new buffer size to read out */
	buflen = dlm_config.ci_buffer_size;
	if (con->rx_buflen != buflen && con->rx_leftover <= buflen) {
		ret = con_realloc_receive_buf(con, buflen);
		if (ret < 0)
			goto out_resched;
	}

	/* calculate new buffer parameters regarding last receive and
	 * possible leftover bytes
	 */
	iov.iov_base = con->rx_buf + con->rx_leftover;
	iov.iov_len = con->rx_buflen - con->rx_leftover;

	memset(&msg, 0, sizeof(msg));
	msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
			     msg.msg_flags);
	if (ret <= 0)
		goto out_close;
	else if (ret == iov.iov_len)
		call_again_soon = 1;

	/* new buflen according to received bytes and leftover from last receive */
	buflen = ret + con->rx_leftover;
	ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen);
	if (ret < 0)
		goto out_close;

	/* calculate leftover bytes from processing and put them at the
	 * beginning of the receive buffer, so on the next receive we
	 * have the full message at the start address of the receive
	 * buffer.
	 */
	con->rx_leftover = buflen - ret;
	if (con->rx_leftover) {
		memmove(con->rx_buf, con->rx_buf + ret,
			con->rx_leftover);
		call_again_soon = true;
	}

	if (call_again_soon)
		goto out_resched;

	mutex_unlock(&con->sock_mutex);
	return 0;

out_resched:
	if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
		queue_work(recv_workqueue, &con->rwork);
	mutex_unlock(&con->sock_mutex);
	return -EAGAIN;

out_close:
	if (ret == 0) {
		log_print("connection %p got EOF from %d",
			  con, con->nodeid);

		if (con->eof_condition && con->eof_condition(con)) {
			set_bit(CF_EOF, &con->flags);
			mutex_unlock(&con->sock_mutex);
		} else {
			mutex_unlock(&con->sock_mutex);
			close_connection(con, false, true, false);

			/* handling for tcp shutdown */
			clear_bit(CF_SHUTDOWN, &con->flags);
			wake_up(&con->shutdown_wait);
		}

		/* signal the receive worker to break out of its loop */
		ret = -1;
	} else {
		mutex_unlock(&con->sock_mutex);
	}
	return ret;
}
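/*
 * Worked example for the leftover handling above (made-up sizes): say
 * the buffer holds 100 valid bytes after a receive and
 * dlm_process_incoming_buffer() consumes only 80 because the last
 * message arrived incomplete. Then rx_leftover becomes 100 - 80 == 20,
 * those 20 bytes are moved to the front of rx_buf, and the next
 * kernel_recvmsg() appends at rx_buf + 20, so the partial message is
 * completed in place before being parsed again.
 */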
/* Listening socket is busy, accept a connection */
static int accept_from_sock(struct listen_connection *con)
{
	int result;
	struct sockaddr_storage peeraddr;
	struct socket *newsock;
	int len, idx;
	int nodeid;
	struct connection *newcon;
	struct connection *addcon;
	unsigned int mark;

	if (!con->sock)
		return -ENOTCONN;

	result = kernel_accept(con->sock, &newsock, O_NONBLOCK);
	if (result < 0)
		goto accept_err;

	/* Get the connected socket's peer */
	memset(&peeraddr, 0, sizeof(peeraddr));
	len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2);
	if (len < 0) {
		result = -ECONNABORTED;
		goto accept_err;
	}

	/* Get the new node's NODEID */
	make_sockaddr(&peeraddr, 0, &len);
	if (addr_to_nodeid(&peeraddr, &nodeid, &mark)) {
		unsigned char *b = (unsigned char *)&peeraddr;

		log_print("connect from non cluster node");
		print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
				     b, sizeof(struct sockaddr_storage));
		sock_release(newsock);
		return -1;
	}

	log_print("got connection from %d", nodeid);

	/*  Check to see if we already have a connection to this node. This
	 *  could happen if the two nodes initiate a connection at roughly
	 *  the same time and the connections cross on the wire.
	 *  In this case we store the incoming one in "othercon"
	 */
	idx = srcu_read_lock(&connections_srcu);
	newcon = nodeid2con(nodeid, GFP_NOFS);
	if (!newcon) {
		srcu_read_unlock(&connections_srcu, idx);
		result = -ENOMEM;
		goto accept_err;
	}

	sock_set_mark(newsock->sk, mark);

	mutex_lock(&newcon->sock_mutex);
	if (newcon->sock) {
		struct connection *othercon = newcon->othercon;

		if (!othercon) {
			othercon = kzalloc(sizeof(*othercon), GFP_NOFS);
			if (!othercon) {
				log_print("failed to allocate incoming socket");
				mutex_unlock(&newcon->sock_mutex);
				srcu_read_unlock(&connections_srcu, idx);
				result = -ENOMEM;
				goto accept_err;
			}

			result = dlm_con_init(othercon, nodeid);
			if (result < 0) {
				kfree(othercon);
				mutex_unlock(&newcon->sock_mutex);
				srcu_read_unlock(&connections_srcu, idx);
				goto accept_err;
			}

			lockdep_set_subclass(&othercon->sock_mutex, 1);
			set_bit(CF_IS_OTHERCON, &othercon->flags);
			newcon->othercon = othercon;
			othercon->sendcon = newcon;
		} else {
			/* close other sock con if we have something new */
			close_connection(othercon, false, true, false);
		}

		mutex_lock(&othercon->sock_mutex);
		add_sock(newsock, othercon);
		addcon = othercon;
		mutex_unlock(&othercon->sock_mutex);
	} else {
		/* accept copies the sk after we've saved the callbacks, so we
		   don't want to save them a second time or comm errors will
		   result in calling sk_error_report recursively. */
		add_sock(newsock, newcon);
		addcon = newcon;
	}

	set_bit(CF_CONNECTED, &addcon->flags);
	mutex_unlock(&newcon->sock_mutex);

	/*
	 * Add it to the active queue in case we got data
	 * between processing the accept and adding the socket
	 * to the read_sockets list
	 */
	if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
		queue_work(recv_workqueue, &addcon->rwork);

	srcu_read_unlock(&connections_srcu, idx);

	return 0;

accept_err:
	if (newsock)
		sock_release(newsock);

	if (result != -EAGAIN)
		log_print("error accepting connection from node: %d", result);
	return result;
}

/*
 * writequeue_entry_complete - try to delete and free write queue entry
 * @e: write queue entry to try to delete
 * @completed: bytes completed
 *
 * writequeue_lock must be held.
 */
static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
{
	e->offset += completed;
	e->len -= completed;
	/* signal that page was half way transmitted */
	e->dirty = true;

	if (e->len == 0 && e->users == 0)
		free_entry(e);
}

/*
 * sctp_bind_addrs - bind a SCTP socket to all our addresses
 */
static int sctp_bind_addrs(struct socket *sock, uint16_t port)
{
	struct sockaddr_storage localaddr;
	struct sockaddr *addr = (struct sockaddr *)&localaddr;
	int i, addr_len, result = 0;

	for (i = 0; i < dlm_local_count; i++) {
		memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
		make_sockaddr(&localaddr, port, &addr_len);

		if (!i)
			result = kernel_bind(sock, addr, addr_len);
		else
			result = sock_bind_add(sock->sk, addr, addr_len);

		if (result < 0) {
			log_print("Can't bind to %d addr number %d, %d.\n",
				  port, i + 1, result);
			break;
		}
	}
	return result;
}

/* Initiate an SCTP association.
   This is a special case of send_to_sock() in that we don't yet have a
   peeled-off socket for this association, so we use the listening socket
   and add the primary IP address of the remote node.
 */
static void sctp_connect_to_sock(struct connection *con)
{
	struct sockaddr_storage daddr;
	int result;
	int addr_len;
	struct socket *sock;
	unsigned int mark;

	mutex_lock(&con->sock_mutex);

	/* Some odd races can cause double-connects, ignore them */
	if (con->retries++ > MAX_CONNECT_RETRIES)
		goto out;

	if (con->sock) {
		log_print("node %d already connected.", con->nodeid);
		goto out;
	}

	memset(&daddr, 0, sizeof(daddr));
	result = nodeid_to_addr(con->nodeid, &daddr, NULL, true, &mark);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		goto out;
	}

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_SCTP, &sock);
	if (result < 0)
		goto socket_err;

	sock_set_mark(sock->sk, mark);

	add_sock(sock, con);

	/* Bind to all addresses. */
	if (sctp_bind_addrs(con->sock, 0))
		goto bind_err;

	make_sockaddr(&daddr, dlm_config.ci_tcp_port, &addr_len);

	log_print_ratelimited("connecting to %d", con->nodeid);

	/* Turn off Nagle's algorithm */
	sctp_sock_set_nodelay(sock->sk);

	/*
	 * Make sock->ops->connect() function return in specified time,
	 * since O_NONBLOCK argument in connect() function does not work here,
	 * then, we should restore the default value of this attribute.
	 */
	sock_set_sndtimeo(sock->sk, 5);
	result = sock->ops->connect(sock, (struct sockaddr *)&daddr, addr_len,
				    0);
	sock_set_sndtimeo(sock->sk, 0);

	if (result == -EINPROGRESS)
		result = 0;
	if (result == 0) {
		if (!test_and_set_bit(CF_CONNECTED, &con->flags))
			log_print("successfully connected to node %d", con->nodeid);
		goto out;
	}

bind_err:
	con->sock = NULL;
	sock_release(sock);

socket_err:
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH &&
	    result != -ENETUNREACH &&
	    result != -ENETDOWN &&
	    result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		log_print("connect %d try %d error %d", con->nodeid,
			  con->retries, result);
		mutex_unlock(&con->sock_mutex);
		msleep(1000);
		lowcomms_connect_sock(con);
		return;
	}

out:
	mutex_unlock(&con->sock_mutex);
}

/* Connect a new socket to its peer */
static void tcp_connect_to_sock(struct connection *con)
{
	struct sockaddr_storage saddr, src_addr;
	unsigned int mark;
	int addr_len;
	struct socket *sock = NULL;
	int result;

	mutex_lock(&con->sock_mutex);
	if (con->retries++ > MAX_CONNECT_RETRIES)
		goto out;

	/* Some odd races can cause double-connects, ignore them */
	if (con->sock)
		goto out;

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_TCP, &sock);
	if (result < 0)
		goto out_err;

	memset(&saddr, 0, sizeof(saddr));
	result = nodeid_to_addr(con->nodeid, &saddr, NULL, false, &mark);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		goto out_err;
	}

	sock_set_mark(sock->sk, mark);

	add_sock(sock, con);

	/* Bind to our cluster-known address when connecting to avoid
	 * routing problems.
	 */
	memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
	make_sockaddr(&src_addr, 0, &addr_len);
	result = sock->ops->bind(sock, (struct sockaddr *)&src_addr,
				 addr_len);
	if (result < 0) {
		log_print("could not bind for connect: %d", result);
		/* This *may* not indicate a critical error */
	}

	make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);

	log_print_ratelimited("connecting to %d", con->nodeid);

	/* Turn off Nagle's algorithm */
	tcp_sock_set_nodelay(sock->sk);

	result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
				    O_NONBLOCK);
	if (result == -EINPROGRESS)
		result = 0;
	if (result == 0)
		goto out;

out_err:
	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	} else if (sock) {
		sock_release(sock);
	}
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH &&
	    result != -ENETUNREACH &&
	    result != -ENETDOWN &&
	    result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		log_print("connect %d try %d error %d", con->nodeid,
			  con->retries, result);
		mutex_unlock(&con->sock_mutex);
		msleep(1000);
		lowcomms_connect_sock(con);
		return;
	}
out:
	mutex_unlock(&con->sock_mutex);
	return;
}

/* On error caller must run dlm_close_sock() for the
 * listen connection socket.
 */
static int tcp_create_listen_sock(struct listen_connection *con,
				  struct sockaddr_storage *saddr)
{
	struct socket *sock = NULL;
	int result = 0;
	int addr_len;

	if (dlm_local_addr[0]->ss_family == AF_INET)
		addr_len = sizeof(struct sockaddr_in);
	else
		addr_len = sizeof(struct sockaddr_in6);

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_TCP, &sock);
	if (result < 0) {
		log_print("Can't create listening comms socket");
		goto create_out;
	}

	sock_set_mark(sock->sk, dlm_config.ci_mark);

	/* Turn off Nagle's algorithm */
	tcp_sock_set_nodelay(sock->sk);

	sock_set_reuseaddr(sock->sk);

	add_listen_sock(sock, con);

	/* Bind to our port */
	make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
	result = sock->ops->bind(sock, (struct sockaddr *)saddr, addr_len);
	if (result < 0) {
		log_print("Can't bind to port %d", dlm_config.ci_tcp_port);
		goto create_out;
	}
	sock_set_keepalive(sock->sk);

	result = sock->ops->listen(sock, 5);
	if (result < 0) {
		log_print("Can't listen on port %d", dlm_config.ci_tcp_port);
		goto create_out;
	}

	return 0;

create_out:
	return result;
}

/* Get local addresses */
static void init_local(void)
{
	struct sockaddr_storage sas, *addr;
	int i;

	dlm_local_count = 0;
	for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
		if (dlm_our_addr(&sas, i))
			break;

		addr = kmemdup(&sas, sizeof(*addr), GFP_NOFS);
		if (!addr)
			break;
		dlm_local_addr[dlm_local_count++] = addr;
	}
}

static void deinit_local(void)
{
	int i;

	for (i = 0; i < dlm_local_count; i++)
		kfree(dlm_local_addr[i]);
}

/* Initialise SCTP socket and bind to all interfaces
 * On error caller must run dlm_close_sock() for the
 * listen connection socket.
 */
static int sctp_listen_for_all(struct listen_connection *con)
{
	struct socket *sock = NULL;
	int result = -EINVAL;

	log_print("Using SCTP for communications");

	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_SCTP, &sock);
	if (result < 0) {
		log_print("Can't create comms socket, check SCTP is loaded");
		goto out;
	}

	sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
	sock_set_mark(sock->sk, dlm_config.ci_mark);
	sctp_sock_set_nodelay(sock->sk);

	add_listen_sock(sock, con);

	/* Bind to all addresses. */
	result = sctp_bind_addrs(con->sock, dlm_config.ci_tcp_port);
	if (result < 0)
		goto out;

	result = sock->ops->listen(sock, 5);
	if (result < 0) {
		log_print("Can't set socket listening");
		goto out;
	}

	return 0;

out:
	return result;
}

static int tcp_listen_for_all(void)
{
	/* We don't support multi-homed hosts */
	if (dlm_local_count > 1) {
		log_print("TCP protocol can't handle multi-homed hosts, "
			  "try SCTP");
		return -EINVAL;
	}

	log_print("Using TCP for communications");

	return tcp_create_listen_sock(&listen_con, dlm_local_addr[0]);
}

static struct writequeue_entry *new_writequeue_entry(struct connection *con,
						     gfp_t allocation)
{
	struct writequeue_entry *entry;

	entry = kzalloc(sizeof(*entry), allocation);
	if (!entry)
		return NULL;

	entry->page = alloc_page(allocation | __GFP_ZERO);
	if (!entry->page) {
		kfree(entry);
		return NULL;
	}

	entry->con = con;
	entry->users = 1;
	kref_init(&entry->ref);
	INIT_LIST_HEAD(&entry->msgs);

	return entry;
}

static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
					     gfp_t allocation, char **ppc,
					     void (*cb)(struct dlm_mhandle *mh),
					     struct dlm_mhandle *mh)
{
	struct writequeue_entry *e;

	spin_lock(&con->writequeue_lock);
	if (!list_empty(&con->writequeue)) {
		e = list_last_entry(&con->writequeue, struct writequeue_entry, list);
		if (DLM_WQ_REMAIN_BYTES(e) >= len) {
			kref_get(&e->ref);

			*ppc = page_address(e->page) + e->end;
			if (cb)
				cb(mh);

			e->end += len;
			e->users++;
			spin_unlock(&con->writequeue_lock);

			return e;
		}
	}
	spin_unlock(&con->writequeue_lock);

	e = new_writequeue_entry(con, allocation);
	if (!e)
		return NULL;

	kref_get(&e->ref);
	*ppc = page_address(e->page);
	e->end += len;
	atomic_inc(&con->writequeue_cnt);

	spin_lock(&con->writequeue_lock);
	if (cb)
		cb(mh);

	list_add_tail(&e->list, &con->writequeue);
	spin_unlock(&con->writequeue_lock);

	return e;
}

static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
						gfp_t allocation, char **ppc,
						void (*cb)(struct dlm_mhandle *mh),
						struct dlm_mhandle *mh)
{
	struct writequeue_entry *e;
	struct dlm_msg *msg;

	msg = kzalloc(sizeof(*msg), allocation);
	if (!msg)
		return NULL;

	kref_init(&msg->ref);

	e = new_wq_entry(con, len, allocation, ppc, cb, mh);
	if (!e) {
		kfree(msg);
		return NULL;
	}

	msg->ppc = *ppc;
	msg->len = len;
	msg->entry = e;

	return msg;
}

struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
				     char **ppc, void (*cb)(struct dlm_mhandle *mh),
				     struct dlm_mhandle *mh)
{
	struct connection *con;
	struct dlm_msg *msg;
	int idx;

	if (len > DLM_MAX_SOCKET_BUFSIZE ||
	    len < sizeof(struct dlm_header)) {
		BUILD_BUG_ON(PAGE_SIZE < DLM_MAX_SOCKET_BUFSIZE);
		log_print("failed to allocate a buffer of size %d", len);
		WARN_ON(1);
		return NULL;
	}

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, allocation);
	if (!con) {
		srcu_read_unlock(&connections_srcu, idx);
		return NULL;
	}

	msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, mh);
	if (!msg) {
		srcu_read_unlock(&connections_srcu, idx);
		return NULL;
	}

	/* on success, the caller must call commit */
	msg->idx = idx;
	return msg;
}

static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
{
	struct writequeue_entry *e = msg->entry;
	struct connection *con = e->con;
	int users;

	spin_lock(&con->writequeue_lock);
	kref_get(&msg->ref);
	list_add(&msg->list, &e->msgs);

	users = --e->users;
	if (users)
		goto out;

	e->len = DLM_WQ_LENGTH_BYTES(e);
	spin_unlock(&con->writequeue_lock);

	queue_work(send_workqueue, &con->swork);
	return;

out:
	spin_unlock(&con->writequeue_lock);
	return;
}

void dlm_lowcomms_commit_msg(struct dlm_msg *msg)
{
	_dlm_lowcomms_commit_msg(msg);
	srcu_read_unlock(&connections_srcu, msg->idx);
}

void dlm_lowcomms_put_msg(struct dlm_msg *msg)
{
	kref_put(&msg->ref, dlm_msg_release);
}

/* does not hold connections_srcu, for use from the workqueue only */
int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
{
	struct dlm_msg *msg_resend;
	char *ppc;

	if (msg->retransmit)
		return 1;

	msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len,
					      GFP_ATOMIC, &ppc, NULL, NULL);
	if (!msg_resend)
		return -ENOMEM;

	msg->retransmit = true;
	kref_get(&msg->ref);
	msg_resend->orig_msg = msg;

	memcpy(ppc, msg->ppc, msg->len);
	_dlm_lowcomms_commit_msg(msg_resend);
	dlm_lowcomms_put_msg(msg_resend);

	return 0;
}
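/*
 * Sketch of the srcu idx handoff noted in struct dlm_msg (informal):
 * dlm_lowcomms_new_msg() returns with connections_srcu still read-held
 * and stashes the lock cookie in msg->idx; dlm_lowcomms_commit_msg()
 * is what finally drops it. The connection therefore cannot be freed
 * while the caller is writing into the reserved page area between the
 * two calls, which is why a successful new_msg() must always be paired
 * with a commit.
 */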
/* Send a message */
static void send_to_sock(struct connection *con)
{
	int ret = 0;
	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	struct writequeue_entry *e;
	int len, offset;
	int count = 0;

	mutex_lock(&con->sock_mutex);
	if (con->sock == NULL)
		goto out_connect;

	spin_lock(&con->writequeue_lock);
	for (;;) {
		if (list_empty(&con->writequeue))
			break;

		e = list_first_entry(&con->writequeue, struct writequeue_entry, list);
		len = e->len;
		offset = e->offset;
		BUG_ON(len == 0 && e->users == 0);
		spin_unlock(&con->writequeue_lock);

		ret = 0;
		if (len) {
			ret = kernel_sendpage(con->sock, e->page, offset, len,
					      msg_flags);
			if (ret == -EAGAIN || ret == 0) {
				if (ret == -EAGAIN &&
				    test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
				    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
					/* Notify TCP that we're limited by the
					 * application window size.
					 */
					set_bit(SOCK_NOSPACE, &con->sock->flags);
					con->sock->sk->sk_write_pending++;
				}
				cond_resched();
				goto out;
			} else if (ret < 0)
				goto out;
		}

		/* Don't starve people filling buffers */
		if (++count >= MAX_SEND_MSG_COUNT) {
			cond_resched();
			count = 0;
		}

		spin_lock(&con->writequeue_lock);
		writequeue_entry_complete(e, ret);
	}
	spin_unlock(&con->writequeue_lock);

	/* close if we got EOF */
	if (test_and_clear_bit(CF_EOF, &con->flags)) {
		mutex_unlock(&con->sock_mutex);
		close_connection(con, false, false, true);

		/* handling for tcp shutdown */
		clear_bit(CF_SHUTDOWN, &con->flags);
		wake_up(&con->shutdown_wait);
	} else {
		mutex_unlock(&con->sock_mutex);
	}

	return;

out:
	mutex_unlock(&con->sock_mutex);
	return;

out_connect:
	mutex_unlock(&con->sock_mutex);
	queue_work(send_workqueue, &con->swork);
	cond_resched();
}

static void clean_one_writequeue(struct connection *con)
{
	struct writequeue_entry *e, *safe;

	spin_lock(&con->writequeue_lock);
	list_for_each_entry_safe(e, safe, &con->writequeue, list) {
		free_entry(e);
	}
	spin_unlock(&con->writequeue_lock);
}

/* Called from recovery when it knows that a node has
   left the cluster */
int dlm_lowcomms_close(int nodeid)
{
	struct connection *con;
	struct dlm_node_addr *na;
	int idx;

	log_print("closing connection to node %d", nodeid);
	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (con) {
		set_bit(CF_CLOSE, &con->flags);
		close_connection(con, true, true, true);
		clean_one_writequeue(con);
		if (con->othercon)
			clean_one_writequeue(con->othercon);
	}
	srcu_read_unlock(&connections_srcu, idx);

	spin_lock(&dlm_node_addrs_spin);
	na = find_node_addr(nodeid);
	if (na) {
		list_del(&na->list);
		while (na->addr_count--)
			kfree(na->addr[na->addr_count]);
		kfree(na);
	}
	spin_unlock(&dlm_node_addrs_spin);

	return 0;
}

/* Receive workqueue function */
static void process_recv_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, rwork);
	int err;

	clear_bit(CF_READ_PENDING, &con->flags);
	do {
		err = receive_from_sock(con);
	} while (!err);
}

static void process_listen_recv_socket(struct work_struct *work)
{
	accept_from_sock(&listen_con);
}

/* Send workqueue function */
static void process_send_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, swork);

	WARN_ON(test_bit(CF_IS_OTHERCON, &con->flags));

	clear_bit(CF_WRITE_PENDING, &con->flags);

	if (test_and_clear_bit(CF_RECONNECT, &con->flags)) {
		close_connection(con, false, false, true);
		dlm_midcomms_unack_msg_resend(con->nodeid);
	}

	if (con->sock == NULL) { /* not mutex protected so check it inside too */
		if (test_and_clear_bit(CF_DELAY_CONNECT, &con->flags))
			msleep(1000);
		con->connect_action(con);
	}
	if (!list_empty(&con->writequeue))
		send_to_sock(con);
}

static void work_stop(void)
{
	if (recv_workqueue) {
		destroy_workqueue(recv_workqueue);
		recv_workqueue = NULL;
	}

	if (send_workqueue) {
		destroy_workqueue(send_workqueue);
		send_workqueue = NULL;
	}
}
static int work_start(void)
{
	recv_workqueue = alloc_ordered_workqueue("dlm_recv", WQ_MEM_RECLAIM);
	if (!recv_workqueue) {
		log_print("can't start dlm_recv");
		return -ENOMEM;
	}

	send_workqueue = alloc_ordered_workqueue("dlm_send", WQ_MEM_RECLAIM);
	if (!send_workqueue) {
		log_print("can't start dlm_send");
		destroy_workqueue(recv_workqueue);
		recv_workqueue = NULL;
		return -ENOMEM;
	}

	return 0;
}

static void shutdown_conn(struct connection *con)
{
	if (con->shutdown_action)
		con->shutdown_action(con);
}

void dlm_lowcomms_shutdown(void)
{
	int idx;

	/* Set all the flags to prevent any
	 * socket activity.
	 */
	dlm_allow_conn = 0;

	if (recv_workqueue)
		flush_workqueue(recv_workqueue);
	if (send_workqueue)
		flush_workqueue(send_workqueue);

	dlm_close_sock(&listen_con.sock);

	idx = srcu_read_lock(&connections_srcu);
	foreach_conn(shutdown_conn);
	srcu_read_unlock(&connections_srcu, idx);
}

static void _stop_conn(struct connection *con, bool and_other)
{
	mutex_lock(&con->sock_mutex);
	set_bit(CF_CLOSE, &con->flags);
	set_bit(CF_READ_PENDING, &con->flags);
	set_bit(CF_WRITE_PENDING, &con->flags);
	if (con->sock && con->sock->sk) {
		write_lock_bh(&con->sock->sk->sk_callback_lock);
		con->sock->sk->sk_user_data = NULL;
		write_unlock_bh(&con->sock->sk->sk_callback_lock);
	}
	if (con->othercon && and_other)
		_stop_conn(con->othercon, false);
	mutex_unlock(&con->sock_mutex);
}

static void stop_conn(struct connection *con)
{
	_stop_conn(con, true);
}

static void connection_release(struct rcu_head *rcu)
{
	struct connection *con = container_of(rcu, struct connection, rcu);

	kfree(con->rx_buf);
	kfree(con);
}

static void free_conn(struct connection *con)
{
	close_connection(con, true, true, true);
	spin_lock(&connections_lock);
	hlist_del_rcu(&con->list);
	spin_unlock(&connections_lock);
	if (con->othercon) {
		clean_one_writequeue(con->othercon);
		call_srcu(&connections_srcu, &con->othercon->rcu,
			  connection_release);
	}
	clean_one_writequeue(con);
	call_srcu(&connections_srcu, &con->rcu, connection_release);
}

static void work_flush(void)
{
	int ok;
	int i;
	struct connection *con;

	do {
		ok = 1;
		foreach_conn(stop_conn);
		if (recv_workqueue)
			flush_workqueue(recv_workqueue);
		if (send_workqueue)
			flush_workqueue(send_workqueue);
		for (i = 0; i < CONN_HASH_SIZE && ok; i++) {
			hlist_for_each_entry_rcu(con, &connection_hash[i],
						 list) {
				ok &= test_bit(CF_READ_PENDING, &con->flags);
				ok &= test_bit(CF_WRITE_PENDING, &con->flags);
				if (con->othercon) {
					ok &= test_bit(CF_READ_PENDING,
						       &con->othercon->flags);
					ok &= test_bit(CF_WRITE_PENDING,
						       &con->othercon->flags);
				}
			}
		}
	} while (!ok);
}

void dlm_lowcomms_stop(void)
{
	int idx;

	idx = srcu_read_lock(&connections_srcu);
	work_flush();
	foreach_conn(free_conn);
	srcu_read_unlock(&connections_srcu, idx);
	work_stop();
	deinit_local();
}
int dlm_lowcomms_start(void)
{
	int error = -EINVAL;
	int i;

	for (i = 0; i < CONN_HASH_SIZE; i++)
		INIT_HLIST_HEAD(&connection_hash[i]);

	init_local();
	if (!dlm_local_count) {
		error = -ENOTCONN;
		log_print("no local IP address has been set");
		goto fail;
	}

	INIT_WORK(&listen_con.rwork, process_listen_recv_socket);

	error = work_start();
	if (error)
		goto fail_local;

	dlm_allow_conn = 1;

	/* Start listening */
	switch (dlm_config.ci_protocol) {
	case DLM_PROTO_TCP:
		error = tcp_listen_for_all();
		break;
	case DLM_PROTO_SCTP:
		error = sctp_listen_for_all(&listen_con);
		break;
	default:
		log_print("Invalid protocol identifier %d set",
			  dlm_config.ci_protocol);
		error = -EINVAL;
		break;
	}
	if (error)
		goto fail_unlisten;

	return 0;

fail_unlisten:
	dlm_allow_conn = 0;
	dlm_close_sock(&listen_con.sock);
	work_stop();
fail_local:
	deinit_local();
fail:
	return error;
}

void dlm_lowcomms_exit(void)
{
	struct dlm_node_addr *na, *safe;

	spin_lock(&dlm_node_addrs_spin);
	list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) {
		list_del(&na->list);
		while (na->addr_count--)
			kfree(na->addr[na->addr_count]);
		kfree(na);
	}
	spin_unlock(&dlm_node_addrs_spin);
}
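/*
 * Lifecycle sketch, pieced together from the exported entry points in
 * this file (the exact caller is the DLM config/midcomms code, not
 * shown here, so treat the ordering as illustrative):
 *
 *	dlm_lowcomms_addr(nodeid, addr, len);	// once per node address
 *	dlm_lowcomms_start();			// create workqueues, listen
 *	...
 *	dlm_lowcomms_close(nodeid);		// node left the cluster
 *	...
 *	dlm_lowcomms_shutdown();		// stop accepting, flush sends
 *	dlm_lowcomms_stop();			// tear down all connections
 *	dlm_lowcomms_exit();			// free the node address list
 */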