/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

/*
 * lowcomms.c
 *
 * This is the "low-level" comms layer.
 *
 * It is responsible for sending/receiving messages
 * from other nodes in the cluster.
 *
 * Cluster nodes are referred to by their nodeids. nodeids are
 * simply 32 bit numbers to the locking module - if they need to
 * be expanded for the cluster infrastructure then that is its
 * responsibility. It is this layer's responsibility to resolve
 * these into IP addresses or whatever else it needs for
 * inter-node communication.
 *
 * The comms level is two kernel threads that deal mainly with
 * the receiving of messages from other nodes and passing them
 * up to the mid-level comms layer (which understands the
 * message format) for execution by the locking core, and
 * a send thread which does all the setting up of connections
 * to remote nodes and the sending of data. Threads are not allowed
 * to send their own data because it may cause them to wait in times
 * of high load. Also, this way, the sending thread can collect together
 * messages bound for one node and send them in one block.
 *
 * lowcomms will choose to use either TCP or SCTP as its transport layer
 * depending on the configuration variable 'protocol'. This should be set
 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
 * cluster-wide mechanism as it must be the same on all nodes of the cluster
 * for the DLM to function.
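 *
 * (With the standard dlm configfs layout, 'protocol' is the attribute at
 * /sys/kernel/config/dlm/cluster/protocol; whichever mechanism writes it,
 * the same value has to be set on every node before lockspaces are created.)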
 *
 */

#include <asm/ioctls.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <linux/pagemap.h>
#include <linux/file.h>
#include <linux/mutex.h>
#include <linux/sctp.h>
#include <linux/slab.h>
#include <net/sctp/sctp.h>
#include <net/sctp/user.h>
#include <net/ipv6.h>

#include "dlm_internal.h"
#include "lowcomms.h"
#include "midcomms.h"
#include "config.h"

#define NEEDED_RMEM (4*1024*1024)
#define CONN_HASH_SIZE 32

/* Number of messages to send before rescheduling */
#define MAX_SEND_MSG_COUNT 25
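
/*
 * A cbuf tracks the used region of a power-of-two sized circular buffer
 * (here always one page): 'base' is where unconsumed data starts, 'len' is
 * how much of it there is, and 'mask' (size-1) wraps offsets cheaply, so
 * cbuf_data() yields the offset at which new data should be written.
 */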
struct cbuf {
        unsigned int base;
        unsigned int len;
        unsigned int mask;
};

static void cbuf_add(struct cbuf *cb, int n)
{
        cb->len += n;
}

static int cbuf_data(struct cbuf *cb)
{
        return ((cb->base + cb->len) & cb->mask);
}

static void cbuf_init(struct cbuf *cb, int size)
{
        cb->base = cb->len = 0;
        cb->mask = size-1;
}

static void cbuf_eat(struct cbuf *cb, int n)
{
        cb->len -= n;
        cb->base += n;
        cb->base &= cb->mask;
}

static bool cbuf_empty(struct cbuf *cb)
{
        return cb->len == 0;
}

struct connection {
        struct socket *sock;    /* NULL if not connected */
        uint32_t nodeid;        /* So we know who we are in the list */
        struct mutex sock_mutex;
        unsigned long flags;
#define CF_READ_PENDING 1
#define CF_WRITE_PENDING 2
#define CF_CONNECT_PENDING 3
#define CF_INIT_PENDING 4
#define CF_IS_OTHERCON 5
#define CF_CLOSE 6
#define CF_APP_LIMITED 7
        struct list_head writequeue;    /* List of outgoing writequeue_entries */
        spinlock_t writequeue_lock;
        int (*rx_action) (struct connection *); /* What to do when active */
        void (*connect_action) (struct connection *); /* What to do to connect */
        struct page *rx_page;
        struct cbuf cb;
        int retries;
#define MAX_CONNECT_RETRIES 3
        int sctp_assoc;
        struct hlist_node list;
        struct connection *othercon;
        struct work_struct rwork; /* Receive workqueue */
        struct work_struct swork; /* Send workqueue */
};
#define sock2con(x) ((struct connection *)(x)->sk_user_data)

/* An entry waiting to be sent */
struct writequeue_entry {
        struct list_head list;
        struct page *page;
        int offset;
        int len;
        int end;
        int users;
        struct connection *con;
};

static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;
static int dlm_allow_conn;

/* Work queues */
static struct workqueue_struct *recv_workqueue;
static struct workqueue_struct *send_workqueue;

static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_MUTEX(connections_lock);
static struct kmem_cache *con_cache;

static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);


/* This is deliberately very simple because most clusters have simple
   sequential nodeids, so we should be able to go straight to a connection
   struct in the array */
static inline int nodeid_hash(int nodeid)
{
        return nodeid & (CONN_HASH_SIZE-1);
}

static struct connection *__find_con(int nodeid)
{
        int r;
        struct hlist_node *h;
        struct connection *con;

        r = nodeid_hash(nodeid);

        hlist_for_each_entry(con, h, &connection_hash[r], list) {
                if (con->nodeid == nodeid)
                        return con;
        }
        return NULL;
}

/*
 * If 'alloc' is zero then we don't attempt to create a new
 * connection structure for this node.
 */
static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
{
        struct connection *con = NULL;
        int r;

        con = __find_con(nodeid);
        if (con || !alloc)
                return con;

        con = kmem_cache_zalloc(con_cache, alloc);
        if (!con)
                return NULL;

        r = nodeid_hash(nodeid);
        hlist_add_head(&con->list, &connection_hash[r]);

        con->nodeid = nodeid;
        mutex_init(&con->sock_mutex);
        INIT_LIST_HEAD(&con->writequeue);
        spin_lock_init(&con->writequeue_lock);
        INIT_WORK(&con->swork, process_send_sockets);
        INIT_WORK(&con->rwork, process_recv_sockets);

        /* Setup action pointers for child sockets */
        if (con->nodeid) {
                struct connection *zerocon = __find_con(0);

                con->connect_action = zerocon->connect_action;
                if (!con->rx_action)
                        con->rx_action = zerocon->rx_action;
        }

        return con;
}

/* Loop round all connections */
static void foreach_conn(void (*conn_func)(struct connection *c))
{
        int i;
        struct hlist_node *h, *n;
        struct connection *con;

        for (i = 0; i < CONN_HASH_SIZE; i++) {
                hlist_for_each_entry_safe(con, h, n, &connection_hash[i], list) {
                        conn_func(con);
                }
        }
}

static struct connection *nodeid2con(int nodeid, gfp_t allocation)
{
        struct connection *con;

        mutex_lock(&connections_lock);
        con = __nodeid2con(nodeid, allocation);
        mutex_unlock(&connections_lock);

        return con;
}

/* This is a bit drastic, but only called when things go wrong */
static struct connection *assoc2con(int assoc_id)
{
        int i;
        struct hlist_node *h;
        struct connection *con;

        mutex_lock(&connections_lock);

        for (i = 0 ; i < CONN_HASH_SIZE; i++) {
                hlist_for_each_entry(con, h, &connection_hash[i], list) {
                        if (con->sctp_assoc == assoc_id) {
                                mutex_unlock(&connections_lock);
                                return con;
                        }
                }
        }
        mutex_unlock(&connections_lock);
        return NULL;
}

static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
{
        struct sockaddr_storage addr;
        int error;

        if (!dlm_local_count)
                return -1;

        error = dlm_nodeid_to_addr(nodeid, &addr);
        if (error)
                return error;

        if (dlm_local_addr[0]->ss_family == AF_INET) {
                struct sockaddr_in *in4 = (struct sockaddr_in *) &addr;
                struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr;
                ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
        } else {
                struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr;
                struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr;
                ret6->sin6_addr = in6->sin6_addr;
        }

        return 0;
}

/* Data available on socket or listen socket received a connect */
static void lowcomms_data_ready(struct sock *sk, int count_unused)
{
        struct connection *con = sock2con(sk);
        if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
                queue_work(recv_workqueue, &con->rwork);
}

static void lowcomms_write_space(struct sock *sk)
{
        struct connection *con = sock2con(sk);

        if (!con)
                return;

        clear_bit(SOCK_NOSPACE, &con->sock->flags);

        if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
                con->sock->sk->sk_write_pending--;
                clear_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags);
        }

        if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
                queue_work(send_workqueue, &con->swork);
}

static inline void lowcomms_connect_sock(struct connection *con)
{
        if (test_bit(CF_CLOSE, &con->flags))
                return;
        if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
                queue_work(send_workqueue, &con->swork);
}

static void lowcomms_state_change(struct sock *sk)
{
        if (sk->sk_state == TCP_ESTABLISHED)
                lowcomms_write_space(sk);
}
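
/* Ensure a connection to the given node exists, kicking the connect work
   if it doesn't. Only meaningful for TCP: an SCTP association is set up
   implicitly by the first send (see sctp_init_assoc() below). */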
int dlm_lowcomms_connect_node(int nodeid)
{
        struct connection *con;

        /* with sctp there's no connecting without sending */
        if (dlm_config.ci_protocol != 0)
                return 0;

        if (nodeid == dlm_our_nodeid())
                return 0;

        con = nodeid2con(nodeid, GFP_NOFS);
        if (!con)
                return -ENOMEM;
        lowcomms_connect_sock(con);
        return 0;
}

/* Make a socket active */
static int add_sock(struct socket *sock, struct connection *con)
{
        con->sock = sock;

        /* Install a data_ready callback */
        con->sock->sk->sk_data_ready = lowcomms_data_ready;
        con->sock->sk->sk_write_space = lowcomms_write_space;
        con->sock->sk->sk_state_change = lowcomms_state_change;
        con->sock->sk->sk_user_data = con;
        con->sock->sk->sk_allocation = GFP_NOFS;
        return 0;
}

/* Add the port number to an IPv6 or 4 sockaddr and return the address
   length */
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
                          int *addr_len)
{
        saddr->ss_family = dlm_local_addr[0]->ss_family;
        if (saddr->ss_family == AF_INET) {
                struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
                in4_addr->sin_port = cpu_to_be16(port);
                *addr_len = sizeof(struct sockaddr_in);
                memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
        } else {
                struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
                in6_addr->sin6_port = cpu_to_be16(port);
                *addr_len = sizeof(struct sockaddr_in6);
        }
        memset((char *)saddr + *addr_len, 0,
               sizeof(struct sockaddr_storage) - *addr_len);
}

/* Close a remote connection and tidy up */
static void close_connection(struct connection *con, bool and_other)
{
        mutex_lock(&con->sock_mutex);

        if (con->sock) {
                sock_release(con->sock);
                con->sock = NULL;
        }
        if (con->othercon && and_other) {
                /* Will only re-enter once. */
                close_connection(con->othercon, false);
        }
        if (con->rx_page) {
                __free_page(con->rx_page);
                con->rx_page = NULL;
        }

        con->retries = 0;
        mutex_unlock(&con->sock_mutex);
}

/* We only send shutdown messages to nodes that are not part of the cluster */
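/* Sending a zero-length message whose sinfo_flags include MSG_EOF asks the
   SCTP stack to gracefully shut down the given association rather than
   transfer any payload. */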
static void sctp_send_shutdown(sctp_assoc_t associd)
{
        static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
        struct msghdr outmessage;
        struct cmsghdr *cmsg;
        struct sctp_sndrcvinfo *sinfo;
        int ret;
        struct connection *con;

        con = nodeid2con(0,0);
        BUG_ON(con == NULL);

        outmessage.msg_name = NULL;
        outmessage.msg_namelen = 0;
        outmessage.msg_control = outcmsg;
        outmessage.msg_controllen = sizeof(outcmsg);
        outmessage.msg_flags = MSG_EOR;

        cmsg = CMSG_FIRSTHDR(&outmessage);
        cmsg->cmsg_level = IPPROTO_SCTP;
        cmsg->cmsg_type = SCTP_SNDRCV;
        cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
        outmessage.msg_controllen = cmsg->cmsg_len;
        sinfo = CMSG_DATA(cmsg);
        memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));

        sinfo->sinfo_flags |= MSG_EOF;
        sinfo->sinfo_assoc_id = associd;

        ret = kernel_sendmsg(con->sock, &outmessage, NULL, 0, 0);

        if (ret != 0)
                log_print("send EOF to node failed: %d", ret);
}

static void sctp_init_failed_foreach(struct connection *con)
{
        con->sctp_assoc = 0;
        if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
                if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
                        queue_work(send_workqueue, &con->swork);
        }
}

/* INIT failed but we don't know which node...
   restart INIT on all pending nodes */
static void sctp_init_failed(void)
{
        mutex_lock(&connections_lock);

        foreach_conn(sctp_init_failed_foreach);

        mutex_unlock(&connections_lock);
}

/* Something happened to an association */
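/* Notifications arrive in-band on the listening socket because
   sctp_listen_for_all() subscribes to association events with SCTP_EVENTS;
   receive_from_sock() spots MSG_NOTIFICATION and routes them here. */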
static void process_sctp_notification(struct connection *con,
                                      struct msghdr *msg, char *buf)
{
        union sctp_notification *sn = (union sctp_notification *)buf;

        if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) {
                switch (sn->sn_assoc_change.sac_state) {

                case SCTP_COMM_UP:
                case SCTP_RESTART:
                {
                        /* Check that the new node is in the lockspace */
                        struct sctp_prim prim;
                        int nodeid;
                        int prim_len, ret;
                        int addr_len;
                        struct connection *new_con;

                        /*
                         * We get this before any data for an association.
                         * We verify that the node is in the cluster and
                         * then peel off a socket for it.
                         */
                        if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {
                                log_print("COMM_UP for invalid assoc ID %d",
                                          (int)sn->sn_assoc_change.sac_assoc_id);
                                sctp_init_failed();
                                return;
                        }
                        memset(&prim, 0, sizeof(struct sctp_prim));
                        prim_len = sizeof(struct sctp_prim);
                        prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id;

                        ret = kernel_getsockopt(con->sock,
                                                IPPROTO_SCTP,
                                                SCTP_PRIMARY_ADDR,
                                                (char*)&prim,
                                                &prim_len);
                        if (ret < 0) {
                                log_print("getsockopt/sctp_primary_addr on "
                                          "new assoc %d failed : %d",
                                          (int)sn->sn_assoc_change.sac_assoc_id,
                                          ret);

                                /* Retry INIT later */
                                new_con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
                                if (new_con)
                                        clear_bit(CF_CONNECT_PENDING, &con->flags);
                                return;
                        }
                        make_sockaddr(&prim.ssp_addr, 0, &addr_len);
                        if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
                                unsigned char *b = (unsigned char *)&prim.ssp_addr;
                                log_print("reject connect from unknown addr");
                                print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
                                                     b, sizeof(struct sockaddr_storage));
                                sctp_send_shutdown(prim.ssp_assoc_id);
                                return;
                        }

                        new_con = nodeid2con(nodeid, GFP_NOFS);
                        if (!new_con)
                                return;

                        /* Peel off a new sock */
                        sctp_lock_sock(con->sock->sk);
                        ret = sctp_do_peeloff(con->sock->sk,
                                              sn->sn_assoc_change.sac_assoc_id,
                                              &new_con->sock);
                        sctp_release_sock(con->sock->sk);
                        if (ret < 0) {
                                log_print("Can't peel off a socket for "
                                          "connection %d to node %d: err=%d",
                                          (int)sn->sn_assoc_change.sac_assoc_id,
                                          nodeid, ret);
                                return;
                        }
                        add_sock(new_con->sock, new_con);

                        log_print("connecting to %d sctp association %d",
                                  nodeid, (int)sn->sn_assoc_change.sac_assoc_id);

                        /* Send any pending writes */
                        clear_bit(CF_CONNECT_PENDING, &new_con->flags);
                        clear_bit(CF_INIT_PENDING, &con->flags);
                        if (!test_and_set_bit(CF_WRITE_PENDING, &new_con->flags)) {
                                queue_work(send_workqueue, &new_con->swork);
                        }
                        if (!test_and_set_bit(CF_READ_PENDING, &new_con->flags))
                                queue_work(recv_workqueue, &new_con->rwork);
                }
                break;

                case SCTP_COMM_LOST:
                case SCTP_SHUTDOWN_COMP:
                {
                        con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
                        if (con) {
                                con->sctp_assoc = 0;
                        }
                }
                break;

                /* We don't know which INIT failed, so clear the PENDING flags
                 * on them all. If assoc_id is zero then it will try again */

                case SCTP_CANT_STR_ASSOC:
                {
                        log_print("Can't start SCTP association - retrying");
                        sctp_init_failed();
                }
                break;

                default:
                        log_print("unexpected SCTP assoc change id=%d state=%d",
                                  (int)sn->sn_assoc_change.sac_assoc_id,
                                  sn->sn_assoc_change.sac_state);
                }
        }
}

/* Data received from remote end */
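/* Runs in recv_workqueue context; process_recv_sockets() keeps calling this
   until it returns nonzero, so returning 0 means "there may be more to
   read". */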
static int receive_from_sock(struct connection *con)
{
        int ret = 0;
        struct msghdr msg = {};
        struct kvec iov[2];
        unsigned len;
        int r;
        int call_again_soon = 0;
        int nvec;
        char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];

        mutex_lock(&con->sock_mutex);

        if (con->sock == NULL) {
                ret = -EAGAIN;
                goto out_close;
        }

        if (con->rx_page == NULL) {
                /*
                 * This doesn't need to be atomic, but I think it should
                 * improve performance if it is.
                 */
                con->rx_page = alloc_page(GFP_ATOMIC);
                if (con->rx_page == NULL)
                        goto out_resched;
                cbuf_init(&con->cb, PAGE_CACHE_SIZE);
        }

        /* Only SCTP needs these really */
        memset(&incmsg, 0, sizeof(incmsg));
        msg.msg_control = incmsg;
        msg.msg_controllen = sizeof(incmsg);

        /*
         * iov[0] is the bit of the circular buffer between the current end
         * point (cb.base + cb.len) and the end of the buffer.
         */
        iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
        iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
        iov[1].iov_len = 0;
        nvec = 1;

        /*
         * iov[1] is the bit of the circular buffer between the start of the
         * buffer and the start of the currently used section (cb.base)
         */
        if (cbuf_data(&con->cb) >= con->cb.base) {
                iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb);
                iov[1].iov_len = con->cb.base;
                iov[1].iov_base = page_address(con->rx_page);
                nvec = 2;
        }
        len = iov[0].iov_len + iov[1].iov_len;

        r = ret = kernel_recvmsg(con->sock, &msg, iov, nvec, len,
                                 MSG_DONTWAIT | MSG_NOSIGNAL);
        if (ret <= 0)
                goto out_close;

        /* Process SCTP notifications */
        if (msg.msg_flags & MSG_NOTIFICATION) {
                msg.msg_control = incmsg;
                msg.msg_controllen = sizeof(incmsg);

                process_sctp_notification(con, &msg,
                                          page_address(con->rx_page) + con->cb.base);
                mutex_unlock(&con->sock_mutex);
                return 0;
        }
        BUG_ON(con->nodeid == 0);

        if (ret == len)
                call_again_soon = 1;
        cbuf_add(&con->cb, ret);
        ret = dlm_process_incoming_buffer(con->nodeid,
                                          page_address(con->rx_page),
                                          con->cb.base, con->cb.len,
                                          PAGE_CACHE_SIZE);
        if (ret == -EBADMSG) {
                log_print("lowcomms: addr=%p, base=%u, len=%u, "
                          "iov_len=%u, iov_base[0]=%p, read=%d",
                          page_address(con->rx_page), con->cb.base, con->cb.len,
                          len, iov[0].iov_base, r);
        }
        if (ret < 0)
                goto out_close;
        cbuf_eat(&con->cb, ret);

        if (cbuf_empty(&con->cb) && !call_again_soon) {
                __free_page(con->rx_page);
                con->rx_page = NULL;
        }

        if (call_again_soon)
                goto out_resched;
        mutex_unlock(&con->sock_mutex);
        return 0;

out_resched:
        if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
                queue_work(recv_workqueue, &con->rwork);
        mutex_unlock(&con->sock_mutex);
        return -EAGAIN;

out_close:
        mutex_unlock(&con->sock_mutex);
        if (ret != -EAGAIN) {
                close_connection(con, false);
                /* Reconnect when there is something to send */
        }
        /* Don't return success if we really got EOF */
        if (ret == 0)
                ret = -EAGAIN;

        return ret;
}

/* Listening socket is busy, accept a connection */
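/* This can end up holding the sock_mutex of two connections at once (the
   listening one and the peer's); the mutex_lock_nested() annotations give
   the second acquisition its own lockdep subclass to avoid a false
   deadlock warning. */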
static int tcp_accept_from_sock(struct connection *con)
{
        int result;
        struct sockaddr_storage peeraddr;
        struct socket *newsock;
        int len;
        int nodeid;
        struct connection *newcon;
        struct connection *addcon;

        mutex_lock(&connections_lock);
        if (!dlm_allow_conn) {
                mutex_unlock(&connections_lock);
                return -1;
        }
        mutex_unlock(&connections_lock);

        memset(&peeraddr, 0, sizeof(peeraddr));
        result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
                                  IPPROTO_TCP, &newsock);
        if (result < 0)
                return -ENOMEM;

        mutex_lock_nested(&con->sock_mutex, 0);

        result = -ENOTCONN;
        if (con->sock == NULL)
                goto accept_err;

        newsock->type = con->sock->type;
        newsock->ops = con->sock->ops;

        result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK);
        if (result < 0)
                goto accept_err;

        /* Get the connected socket's peer */
        memset(&peeraddr, 0, sizeof(peeraddr));
        if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr,
                                  &len, 2)) {
                result = -ECONNABORTED;
                goto accept_err;
        }

        /* Get the new node's NODEID */
        make_sockaddr(&peeraddr, 0, &len);
        if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) {
                unsigned char *b = (unsigned char *)&peeraddr;
                log_print("connect from non cluster node");
                print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
                                     b, sizeof(struct sockaddr_storage));
                sock_release(newsock);
                mutex_unlock(&con->sock_mutex);
                return -1;
        }

        log_print("got connection from %d", nodeid);

        /* Check to see if we already have a connection to this node. This
         * could happen if the two nodes initiate a connection at roughly
         * the same time and the connections cross on the wire.
         * In this case we store the incoming one in "othercon"
         */
        newcon = nodeid2con(nodeid, GFP_NOFS);
        if (!newcon) {
                result = -ENOMEM;
                goto accept_err;
        }
        mutex_lock_nested(&newcon->sock_mutex, 1);
        if (newcon->sock) {
                struct connection *othercon = newcon->othercon;

                if (!othercon) {
                        othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
                        if (!othercon) {
                                log_print("failed to allocate incoming socket");
                                mutex_unlock(&newcon->sock_mutex);
                                result = -ENOMEM;
                                goto accept_err;
                        }
                        othercon->nodeid = nodeid;
                        othercon->rx_action = receive_from_sock;
                        mutex_init(&othercon->sock_mutex);
                        INIT_WORK(&othercon->swork, process_send_sockets);
                        INIT_WORK(&othercon->rwork, process_recv_sockets);
                        set_bit(CF_IS_OTHERCON, &othercon->flags);
                }
                if (!othercon->sock) {
                        newcon->othercon = othercon;
                        othercon->sock = newsock;
                        newsock->sk->sk_user_data = othercon;
                        add_sock(newsock, othercon);
                        addcon = othercon;
                }
                else {
                        printk("Extra connection from node %d attempted\n", nodeid);
                        result = -EAGAIN;
                        mutex_unlock(&newcon->sock_mutex);
                        goto accept_err;
                }
        }
        else {
                newsock->sk->sk_user_data = newcon;
                newcon->rx_action = receive_from_sock;
                add_sock(newsock, newcon);
                addcon = newcon;
        }

        mutex_unlock(&newcon->sock_mutex);

        /*
         * Add it to the active queue in case we got data
         * between processing the accept and adding the socket
         * to the read_sockets list
         */
        if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
                queue_work(recv_workqueue, &addcon->rwork);
        mutex_unlock(&con->sock_mutex);

        return 0;

accept_err:
        mutex_unlock(&con->sock_mutex);
        sock_release(newsock);

        if (result != -EAGAIN)
                log_print("error accepting connection from node: %d", result);
        return result;
}

static void free_entry(struct writequeue_entry *e)
{
        __free_page(e->page);
        kfree(e);
}

/* Initiate an SCTP association.
   This is a special case of send_to_sock() in that we don't yet have a
   peeled-off socket for this association, so we use the listening socket
   and add the primary IP address of the remote node.
 */
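/* On a one-to-many SCTP socket, addressing a sendmsg() to a peer with no
   existing association makes the stack send an INIT first (an "implicit
   connect"); the data goes out once the association is up, and the COMM_UP
   event is handled in process_sctp_notification(). */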
static void sctp_init_assoc(struct connection *con)
{
        struct sockaddr_storage rem_addr;
        char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
        struct msghdr outmessage;
        struct cmsghdr *cmsg;
        struct sctp_sndrcvinfo *sinfo;
        struct connection *base_con;
        struct writequeue_entry *e;
        int len, offset;
        int ret;
        int addrlen;
        struct kvec iov[1];

        if (test_and_set_bit(CF_INIT_PENDING, &con->flags))
                return;

        if (con->retries++ > MAX_CONNECT_RETRIES)
                return;

        if (nodeid_to_addr(con->nodeid, (struct sockaddr *)&rem_addr)) {
                log_print("no address for nodeid %d", con->nodeid);
                return;
        }
        base_con = nodeid2con(0, 0);
        BUG_ON(base_con == NULL);

        make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen);

        outmessage.msg_name = &rem_addr;
        outmessage.msg_namelen = addrlen;
        outmessage.msg_control = outcmsg;
        outmessage.msg_controllen = sizeof(outcmsg);
        outmessage.msg_flags = MSG_EOR;

        spin_lock(&con->writequeue_lock);

        if (list_empty(&con->writequeue)) {
                spin_unlock(&con->writequeue_lock);
                log_print("writequeue empty for nodeid %d", con->nodeid);
                return;
        }

        e = list_first_entry(&con->writequeue, struct writequeue_entry, list);
        len = e->len;
        offset = e->offset;
        spin_unlock(&con->writequeue_lock);

        /* Send the first block off the write queue */
        iov[0].iov_base = page_address(e->page)+offset;
        iov[0].iov_len = len;

        cmsg = CMSG_FIRSTHDR(&outmessage);
        cmsg->cmsg_level = IPPROTO_SCTP;
        cmsg->cmsg_type = SCTP_SNDRCV;
        cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
        sinfo = CMSG_DATA(cmsg);
        memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
        sinfo->sinfo_ppid = cpu_to_le32(dlm_our_nodeid());
        outmessage.msg_controllen = cmsg->cmsg_len;

        ret = kernel_sendmsg(base_con->sock, &outmessage, iov, 1, len);
        if (ret < 0) {
                log_print("Send first packet to node %d failed: %d",
                          con->nodeid, ret);

                /* Try again later */
                clear_bit(CF_CONNECT_PENDING, &con->flags);
                clear_bit(CF_INIT_PENDING, &con->flags);
        }
        else {
                spin_lock(&con->writequeue_lock);
                e->offset += ret;
                e->len -= ret;

                if (e->len == 0 && e->users == 0) {
                        list_del(&e->list);
                        free_entry(e);
                }
                spin_unlock(&con->writequeue_lock);
        }
}

/* Connect a new socket to its peer */
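/* The connect() below is issued O_NONBLOCK, so -EINPROGRESS is the normal
   result; completion arrives via sk_state_change (lowcomms_state_change),
   which kicks the send work once the socket reaches TCP_ESTABLISHED. */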
static void tcp_connect_to_sock(struct connection *con)
{
        int result = -EHOSTUNREACH;
        struct sockaddr_storage saddr, src_addr;
        int addr_len;
        struct socket *sock = NULL;
        int one = 1;

        if (con->nodeid == 0) {
                log_print("attempt to connect sock 0 foiled");
                return;
        }

        mutex_lock(&con->sock_mutex);
        if (con->retries++ > MAX_CONNECT_RETRIES)
                goto out;

        /* Some odd races can cause double-connects, ignore them */
        if (con->sock) {
                result = 0;
                goto out;
        }

        /* Create a socket to communicate with */
        result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
                                  IPPROTO_TCP, &sock);
        if (result < 0)
                goto out_err;

        memset(&saddr, 0, sizeof(saddr));
        if (dlm_nodeid_to_addr(con->nodeid, &saddr))
                goto out_err;

        sock->sk->sk_user_data = con;
        con->rx_action = receive_from_sock;
        con->connect_action = tcp_connect_to_sock;
        add_sock(sock, con);

        /* Bind to our cluster-known address when connecting to avoid
           routing problems */
        memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
        make_sockaddr(&src_addr, 0, &addr_len);
        result = sock->ops->bind(sock, (struct sockaddr *) &src_addr,
                                 addr_len);
        if (result < 0) {
                log_print("could not bind for connect: %d", result);
                /* This *may* not indicate a critical error */
        }

        make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);

        log_print("connecting to %d", con->nodeid);

        /* Turn off Nagle's algorithm */
        kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
                          sizeof(one));

        result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
                                    O_NONBLOCK);
        if (result == -EINPROGRESS)
                result = 0;
        if (result == 0)
                goto out;

out_err:
        if (con->sock) {
                sock_release(con->sock);
                con->sock = NULL;
        } else if (sock) {
                sock_release(sock);
        }
        /*
         * Some errors are fatal and this list might need adjusting. For other
         * errors we try again until the max number of retries is reached.
         */
        if (result != -EHOSTUNREACH && result != -ENETUNREACH &&
            result != -ENETDOWN && result != -EINVAL &&
            result != -EPROTONOSUPPORT) {
                lowcomms_connect_sock(con);
                result = 0;
        }
out:
        mutex_unlock(&con->sock_mutex);
        return;
}
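
/* Create, bind and listen on the TCP socket that accepts connections from
   the other cluster nodes. */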
static struct socket *tcp_create_listen_sock(struct connection *con,
                                             struct sockaddr_storage *saddr)
{
        struct socket *sock = NULL;
        int result = 0;
        int one = 1;
        int addr_len;

        if (dlm_local_addr[0]->ss_family == AF_INET)
                addr_len = sizeof(struct sockaddr_in);
        else
                addr_len = sizeof(struct sockaddr_in6);

        /* Create a socket to communicate with */
        result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
                                  IPPROTO_TCP, &sock);
        if (result < 0) {
                log_print("Can't create listening comms socket");
                goto create_out;
        }

        /* Turn off Nagle's algorithm */
        kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
                          sizeof(one));

        result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
                                   (char *)&one, sizeof(one));

        if (result < 0) {
                log_print("Failed to set SO_REUSEADDR on socket: %d", result);
        }
        sock->sk->sk_user_data = con;
        con->rx_action = tcp_accept_from_sock;
        con->connect_action = tcp_connect_to_sock;
        con->sock = sock;

        /* Bind to our port */
        make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
        result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
        if (result < 0) {
                log_print("Can't bind to port %d", dlm_config.ci_tcp_port);
                sock_release(sock);
                sock = NULL;
                con->sock = NULL;
                goto create_out;
        }
        result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
                                   (char *)&one, sizeof(one));
        if (result < 0) {
                log_print("Set keepalive failed: %d", result);
        }

        result = sock->ops->listen(sock, 5);
        if (result < 0) {
                log_print("Can't listen on port %d", dlm_config.ci_tcp_port);
                sock_release(sock);
                sock = NULL;
                goto create_out;
        }

create_out:
        return sock;
}

/* Get local addresses */
static void init_local(void)
{
        struct sockaddr_storage sas, *addr;
        int i;

        dlm_local_count = 0;
        for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
                if (dlm_our_addr(&sas, i))
                        break;

                addr = kmalloc(sizeof(*addr), GFP_NOFS);
                if (!addr)
                        break;
                memcpy(addr, &sas, sizeof(*addr));
                dlm_local_addr[dlm_local_count++] = addr;
        }
}

/* Bind to an IP address. SCTP allows multiple addresses so it can do
   multi-homing */
static int add_sctp_bind_addr(struct connection *sctp_con,
                              struct sockaddr_storage *addr,
                              int addr_len, int num)
{
        int result = 0;

        if (num == 1)
                result = kernel_bind(sctp_con->sock,
                                     (struct sockaddr *) addr,
                                     addr_len);
        else
                result = kernel_setsockopt(sctp_con->sock, SOL_SCTP,
                                           SCTP_SOCKOPT_BINDX_ADD,
                                           (char *)addr, addr_len);

        if (result < 0)
                log_print("Can't bind to port %d addr number %d",
                          dlm_config.ci_tcp_port, num);

        return result;
}

/* Initialise SCTP socket and bind to all interfaces */
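/* A single one-to-many SOCK_SEQPACKET socket carries traffic for every
   peer; per-node sockets are only peeled off it once an association
   reaches COMM_UP (see process_sctp_notification()). */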
static int sctp_listen_for_all(void)
{
        struct socket *sock = NULL;
        struct sockaddr_storage localaddr;
        struct sctp_event_subscribe subscribe;
        int result = -EINVAL, num = 1, i, addr_len;
        struct connection *con = nodeid2con(0, GFP_NOFS);
        int bufsize = NEEDED_RMEM;

        if (!con)
                return -ENOMEM;

        log_print("Using SCTP for communications");

        result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET,
                                  IPPROTO_SCTP, &sock);
        if (result < 0) {
                log_print("Can't create comms socket, check SCTP is loaded");
                goto out;
        }

        /* Listen for events */
        memset(&subscribe, 0, sizeof(subscribe));
        subscribe.sctp_data_io_event = 1;
        subscribe.sctp_association_event = 1;
        subscribe.sctp_send_failure_event = 1;
        subscribe.sctp_shutdown_event = 1;
        subscribe.sctp_partial_delivery_event = 1;

        result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE,
                                   (char *)&bufsize, sizeof(bufsize));
        if (result)
                log_print("Error increasing buffer space on socket %d", result);

        result = kernel_setsockopt(sock, SOL_SCTP, SCTP_EVENTS,
                                   (char *)&subscribe, sizeof(subscribe));
        if (result < 0) {
                log_print("Failed to set SCTP_EVENTS on socket: result=%d",
                          result);
                goto create_delsock;
        }

        /* Init con struct */
        sock->sk->sk_user_data = con;
        con->sock = sock;
        con->sock->sk->sk_data_ready = lowcomms_data_ready;
        con->rx_action = receive_from_sock;
        con->connect_action = sctp_init_assoc;

        /* Bind to all interfaces. */
        for (i = 0; i < dlm_local_count; i++) {
                memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
                make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len);

                result = add_sctp_bind_addr(con, &localaddr, addr_len, num);
                if (result)
                        goto create_delsock;
                ++num;
        }

        result = sock->ops->listen(sock, 5);
        if (result < 0) {
                log_print("Can't set socket listening");
                goto create_delsock;
        }

        return 0;

create_delsock:
        sock_release(sock);
        con->sock = NULL;
out:
        return result;
}

static int tcp_listen_for_all(void)
{
        struct socket *sock = NULL;
        struct connection *con = nodeid2con(0, GFP_NOFS);
        int result = -EINVAL;

        if (!con)
                return -ENOMEM;

        /* We don't support multi-homed hosts */
        if (dlm_local_addr[1] != NULL) {
                log_print("TCP protocol can't handle multi-homed hosts, "
                          "try SCTP");
                return -EINVAL;
        }

        log_print("Using TCP for communications");

        sock = tcp_create_listen_sock(con, dlm_local_addr[0]);
        if (sock) {
                add_sock(sock, con);
                result = 0;
        }
        else {
                result = -EADDRINUSE;
        }

        return result;
}



static struct writequeue_entry *new_writequeue_entry(struct connection *con,
                                                     gfp_t allocation)
{
        struct writequeue_entry *entry;

        entry = kmalloc(sizeof(struct writequeue_entry), allocation);
        if (!entry)
                return NULL;

        entry->page = alloc_page(allocation);
        if (!entry->page) {
                kfree(entry);
                return NULL;
        }

        entry->offset = 0;
        entry->len = 0;
        entry->end = 0;
        entry->users = 0;
        entry->con = con;

        return entry;
}
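
/* Reserve space for an outgoing message: the caller gets a pointer (*ppc)
   into a writequeue page, fills in the message, then hands it back with
   dlm_lowcomms_commit_buffer(). 'users' counts reservations still being
   filled in, so the send path won't transmit a partially written block. */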
void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
{
        struct connection *con;
        struct writequeue_entry *e;
        int offset = 0;
        int users = 0;

        con = nodeid2con(nodeid, allocation);
        if (!con)
                return NULL;

        spin_lock(&con->writequeue_lock);
        e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
        if ((&e->list == &con->writequeue) ||
            (PAGE_CACHE_SIZE - e->end < len)) {
                e = NULL;
        } else {
                offset = e->end;
                e->end += len;
                users = e->users++;
        }
        spin_unlock(&con->writequeue_lock);

        if (e) {
        got_one:
                *ppc = page_address(e->page) + offset;
                return e;
        }

        e = new_writequeue_entry(con, allocation);
        if (e) {
                spin_lock(&con->writequeue_lock);
                offset = e->end;
                e->end += len;
                users = e->users++;
                list_add_tail(&e->list, &con->writequeue);
                spin_unlock(&con->writequeue_lock);
                goto got_one;
        }
        return NULL;
}

void dlm_lowcomms_commit_buffer(void *mh)
{
        struct writequeue_entry *e = (struct writequeue_entry *)mh;
        struct connection *con = e->con;
        int users;

        spin_lock(&con->writequeue_lock);
        users = --e->users;
        if (users)
                goto out;
        e->len = e->end - e->offset;
        spin_unlock(&con->writequeue_lock);

        if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
                queue_work(send_workqueue, &con->swork);
        }
        return;

out:
        spin_unlock(&con->writequeue_lock);
        return;
}

/* Send a message */
static void send_to_sock(struct connection *con)
{
        int ret = 0;
        const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
        struct writequeue_entry *e;
        int len, offset;
        int count = 0;

        mutex_lock(&con->sock_mutex);
        if (con->sock == NULL)
                goto out_connect;

        spin_lock(&con->writequeue_lock);
        for (;;) {
                e = list_entry(con->writequeue.next, struct writequeue_entry,
                               list);
                if ((struct list_head *) e == &con->writequeue)
                        break;

                len = e->len;
                offset = e->offset;
                BUG_ON(len == 0 && e->users == 0);
                spin_unlock(&con->writequeue_lock);

                ret = 0;
                if (len) {
                        ret = kernel_sendpage(con->sock, e->page, offset, len,
                                              msg_flags);
                        if (ret == -EAGAIN || ret == 0) {
                                if (ret == -EAGAIN &&
                                    test_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags) &&
                                    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
                                        /* Notify TCP that we're limited by the
                                         * application window size.
                                         */
                                        set_bit(SOCK_NOSPACE, &con->sock->flags);
                                        con->sock->sk->sk_write_pending++;
                                }
                                cond_resched();
                                goto out;
                        }
                        if (ret <= 0)
                                goto send_error;
                }

                /* Don't starve people filling buffers */
                if (++count >= MAX_SEND_MSG_COUNT) {
                        cond_resched();
                        count = 0;
                }

                spin_lock(&con->writequeue_lock);
                e->offset += ret;
                e->len -= ret;

                if (e->len == 0 && e->users == 0) {
                        list_del(&e->list);
                        free_entry(e);
                        continue;
                }
        }
        spin_unlock(&con->writequeue_lock);
out:
        mutex_unlock(&con->sock_mutex);
        return;

send_error:
        mutex_unlock(&con->sock_mutex);
        close_connection(con, false);
        lowcomms_connect_sock(con);
        return;

out_connect:
        mutex_unlock(&con->sock_mutex);
        if (!test_bit(CF_INIT_PENDING, &con->flags))
                lowcomms_connect_sock(con);
        return;
}

static void clean_one_writequeue(struct connection *con)
{
        struct writequeue_entry *e, *safe;

        spin_lock(&con->writequeue_lock);
        list_for_each_entry_safe(e, safe, &con->writequeue, list) {
                list_del(&e->list);
                free_entry(e);
        }
        spin_unlock(&con->writequeue_lock);
}

/* Called from recovery when it knows that a node has
   left the cluster */
int dlm_lowcomms_close(int nodeid)
{
        struct connection *con;

        log_print("closing connection to node %d", nodeid);
        con = nodeid2con(nodeid, 0);
        if (con) {
                clear_bit(CF_CONNECT_PENDING, &con->flags);
                clear_bit(CF_WRITE_PENDING, &con->flags);
                set_bit(CF_CLOSE, &con->flags);
                if (cancel_work_sync(&con->swork))
                        log_print("canceled swork for node %d", nodeid);
                if (cancel_work_sync(&con->rwork))
                        log_print("canceled rwork for node %d", nodeid);
                clean_one_writequeue(con);
                close_connection(con, true);
        }
        return 0;
}

/* Receive workqueue function */
static void process_recv_sockets(struct work_struct *work)
{
        struct connection *con = container_of(work, struct connection, rwork);
        int err;

        clear_bit(CF_READ_PENDING, &con->flags);
        do {
                err = con->rx_action(con);
        } while (!err);
}

/* Send workqueue function */
static void process_send_sockets(struct work_struct *work)
{
        struct connection *con = container_of(work, struct connection, swork);

        if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
                con->connect_action(con);
                set_bit(CF_WRITE_PENDING, &con->flags);
        }
        if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags))
                send_to_sock(con);
}


/* Discard all entries on the write queues */
static void clean_writequeues(void)
{
        foreach_conn(clean_one_writequeue);
}

static void work_stop(void)
{
        destroy_workqueue(recv_workqueue);
        destroy_workqueue(send_workqueue);
}

static int work_start(void)
{
        recv_workqueue = alloc_workqueue("dlm_recv",
                                         WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
        if (!recv_workqueue) {
                log_print("can't start dlm_recv");
                return -ENOMEM;
        }

        send_workqueue = alloc_workqueue("dlm_send",
                                         WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
        if (!send_workqueue) {
                log_print("can't start dlm_send");
                destroy_workqueue(recv_workqueue);
                return -ENOMEM;
        }

        return 0;
}
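
/* Setting all the low flag bits at once marks the connection as already
   having read/write/connect work pending, so the socket callbacks'
   test_and_set_bit() checks fail and no new work gets queued while we
   shut down. */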
static void stop_conn(struct connection *con)
{
        con->flags |= 0x0F;
        if (con->sock && con->sock->sk)
                con->sock->sk->sk_user_data = NULL;
}

static void free_conn(struct connection *con)
{
        close_connection(con, true);
        if (con->othercon)
                kmem_cache_free(con_cache, con->othercon);
        hlist_del(&con->list);
        kmem_cache_free(con_cache, con);
}

void dlm_lowcomms_stop(void)
{
        /* Set all the flags to prevent any
           socket activity.
        */
        mutex_lock(&connections_lock);
        dlm_allow_conn = 0;
        foreach_conn(stop_conn);
        mutex_unlock(&connections_lock);

        work_stop();

        mutex_lock(&connections_lock);
        clean_writequeues();

        foreach_conn(free_conn);

        mutex_unlock(&connections_lock);
        kmem_cache_destroy(con_cache);
}

int dlm_lowcomms_start(void)
{
        int error = -EINVAL;
        struct connection *con;
        int i;

        for (i = 0; i < CONN_HASH_SIZE; i++)
                INIT_HLIST_HEAD(&connection_hash[i]);

        init_local();
        if (!dlm_local_count) {
                error = -ENOTCONN;
                log_print("no local IP address has been set");
                goto fail;
        }

        error = -ENOMEM;
        con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
                                      __alignof__(struct connection), 0,
                                      NULL);
        if (!con_cache)
                goto fail;

        error = work_start();
        if (error)
                goto fail_destroy;

        dlm_allow_conn = 1;

        /* Start listening */
        if (dlm_config.ci_protocol == 0)
                error = tcp_listen_for_all();
        else
                error = sctp_listen_for_all();
        if (error)
                goto fail_unlisten;

        return 0;

fail_unlisten:
        dlm_allow_conn = 0;
        con = nodeid2con(0,0);
        if (con) {
                close_connection(con, false);
                kmem_cache_free(con_cache, con);
        }
fail_destroy:
        kmem_cache_destroy(con_cache);
fail:
        return error;
}