/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

/*
 * lowcomms.c
 *
 * This is the "low-level" comms layer.
 *
 * It is responsible for sending messages to, and receiving messages
 * from, other nodes in the cluster.
 *
 * Cluster nodes are referred to by their nodeids. nodeids are
 * simply 32 bit numbers to the locking module - if they need to
 * be expanded for the cluster infrastructure then that is its
 * responsibility. It is this layer's
 * responsibility to resolve these into IP addresses or
 * whatever else it needs for inter-node communication.
 *
 * The comms level consists of two kernel threads: one that deals mainly
 * with receiving messages from other nodes and passing them up to the
 * mid-level comms layer (which understands the message format) for
 * execution by the locking core, and a send thread which does all the
 * setting up of connections to remote nodes and the sending of data.
 * Threads are not allowed to send their own data because it may cause
 * them to wait in times of high load. Also, this way, the sending
 * thread can collect together messages bound for one node and send
 * them in one block.
 *
 * lowcomms will choose to use either TCP or SCTP as its transport layer
 * depending on the configuration variable 'protocol'. This should be set
 * to 0 (default) for TCP or 1 for SCTP. It must be configured using a
 * cluster-wide mechanism as it must be the same on all nodes of the cluster
 * for the DLM to function.
 *
 */
#include <asm/ioctls.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <linux/pagemap.h>
#include <linux/file.h>
#include <linux/mutex.h>
#include <linux/sctp.h>
#include <linux/slab.h>
#include <net/sctp/sctp.h>
#include <net/sctp/user.h>
#include <net/ipv6.h>

#include "dlm_internal.h"
#include "lowcomms.h"
#include "midcomms.h"
#include "config.h"

#define NEEDED_RMEM (4*1024*1024)
#define CONN_HASH_SIZE 32

/* Number of messages to send before rescheduling */
#define MAX_SEND_MSG_COUNT 25

/*
 * A window into a circular receive buffer.  The size passed to
 * cbuf_init() must be a power of two so that 'mask' can wrap
 * indices cheaply.
 */
struct cbuf {
	unsigned int base;
	unsigned int len;
	unsigned int mask;
};

/* Account for 'n' newly received bytes */
static void cbuf_add(struct cbuf *cb, int n)
{
	cb->len += n;
}

/* Index of the first free byte, where the next received data lands */
static int cbuf_data(struct cbuf *cb)
{
	return ((cb->base + cb->len) & cb->mask);
}

static void cbuf_init(struct cbuf *cb, int size)
{
	cb->base = cb->len = 0;
	cb->mask = size-1;
}

/* Consume 'n' bytes from the front of the buffer */
static void cbuf_eat(struct cbuf *cb, int n)
{
	cb->len  -= n;
	cb->base += n;
	cb->base &= cb->mask;
}

static bool cbuf_empty(struct cbuf *cb)
{
	return cb->len == 0;
}
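/*
 * Illustrative sketch of the cbuf helpers above (not called anywhere in
 * this file; the function name is hypothetical and exists only for this
 * worked example).  With a 4096-byte ring, indices wrap via the mask:
 */
static void __maybe_unused cbuf_example(void)
{
	struct cbuf cb;

	cbuf_init(&cb, 4096);	/* base = 0, len = 0, mask = 0xfff */
	cbuf_add(&cb, 3000);	/* 3000 unread bytes; cbuf_data() == 3000 */
	cbuf_eat(&cb, 2500);	/* consumer took 2500: base = 2500, len = 500 */
	cbuf_add(&cb, 2000);	/* len = 2500; the write position wraps:   */
				/* cbuf_data() == (2500 + 2500) & 0xfff == 904 */
}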
struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* So we know who we are in the list */
	struct mutex sock_mutex;
	unsigned long flags;
#define CF_READ_PENDING 1
#define CF_WRITE_PENDING 2
#define CF_CONNECT_PENDING 3
#define CF_INIT_PENDING 4
#define CF_IS_OTHERCON 5
#define CF_CLOSE 6
#define CF_APP_LIMITED 7
	struct list_head writequeue;  /* List of outgoing writequeue_entries */
	spinlock_t writequeue_lock;
	int (*rx_action) (struct connection *);	/* What to do when active */
	void (*connect_action) (struct connection *);	/* What to do to connect */
	struct page *rx_page;
	struct cbuf cb;
	int retries;
#define MAX_CONNECT_RETRIES 3
	int sctp_assoc;
	struct hlist_node list;
	struct connection *othercon;
	struct work_struct rwork; /* Receive workqueue */
	struct work_struct swork; /* Send workqueue */
};
#define sock2con(x) ((struct connection *)(x)->sk_user_data)

/* An entry waiting to be sent */
struct writequeue_entry {
	struct list_head list;
	struct page *page;
	int offset;
	int len;
	int end;
	int users;
	struct connection *con;
};

struct dlm_node_addr {
	struct list_head list;
	int nodeid;
	int addr_count;
	struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
};

static LIST_HEAD(dlm_node_addrs);
static DEFINE_SPINLOCK(dlm_node_addrs_spin);

static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;
static int dlm_allow_conn;

/* Work queues */
static struct workqueue_struct *recv_workqueue;
static struct workqueue_struct *send_workqueue;

static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_MUTEX(connections_lock);
static struct kmem_cache *con_cache;

static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);


/* This is deliberately very simple because most clusters have simple
   sequential nodeids, so we should be able to go straight to a connection
   struct in the array */
static inline int nodeid_hash(int nodeid)
{
	return nodeid & (CONN_HASH_SIZE-1);
}

static struct connection *__find_con(int nodeid)
{
	int r;
	struct hlist_node *h;
	struct connection *con;

	r = nodeid_hash(nodeid);

	hlist_for_each_entry(con, h, &connection_hash[r], list) {
		if (con->nodeid == nodeid)
			return con;
	}
	return NULL;
}

/*
 * If 'alloc' is zero then we don't attempt to create a new
 * connection structure for this node.
 */
static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
{
	struct connection *con = NULL;
	int r;

	con = __find_con(nodeid);
	if (con || !alloc)
		return con;

	con = kmem_cache_zalloc(con_cache, alloc);
	if (!con)
		return NULL;

	r = nodeid_hash(nodeid);
	hlist_add_head(&con->list, &connection_hash[r]);

	con->nodeid = nodeid;
	mutex_init(&con->sock_mutex);
	INIT_LIST_HEAD(&con->writequeue);
	spin_lock_init(&con->writequeue_lock);
	INIT_WORK(&con->swork, process_send_sockets);
	INIT_WORK(&con->rwork, process_recv_sockets);

	/* Setup action pointers for child sockets */
	if (con->nodeid) {
		struct connection *zerocon = __find_con(0);

		con->connect_action = zerocon->connect_action;
		if (!con->rx_action)
			con->rx_action = zerocon->rx_action;
	}

	return con;
}

/* Loop round all connections */
static void foreach_conn(void (*conn_func)(struct connection *c))
{
	int i;
	struct hlist_node *h, *n;
	struct connection *con;

	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry_safe(con, h, n, &connection_hash[i], list) {
			conn_func(con);
		}
	}
}

static struct connection *nodeid2con(int nodeid, gfp_t allocation)
{
	struct connection *con;

	mutex_lock(&connections_lock);
	con = __nodeid2con(nodeid, allocation);
	mutex_unlock(&connections_lock);

	return con;
}

/* This is a bit drastic, but only called when things go wrong */
static struct connection *assoc2con(int assoc_id)
{
	int i;
	struct hlist_node *h;
	struct connection *con;

	mutex_lock(&connections_lock);

	for (i = 0 ; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry(con, h, &connection_hash[i], list) {
			if (con->sctp_assoc == assoc_id) {
				mutex_unlock(&connections_lock);
				return con;
			}
		}
	}
	mutex_unlock(&connections_lock);
	return NULL;
}

static struct dlm_node_addr *find_node_addr(int nodeid)
{
	struct dlm_node_addr *na;

	list_for_each_entry(na, &dlm_node_addrs, list) {
		if (na->nodeid == nodeid)
			return na;
	}
	return NULL;
}

static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y)
{
	switch (x->ss_family) {
	case AF_INET: {
		struct sockaddr_in *sinx = (struct sockaddr_in *)x;
		struct sockaddr_in *siny = (struct sockaddr_in *)y;
		if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
			return 0;
		if (sinx->sin_port != siny->sin_port)
			return 0;
		break;
	}
	case AF_INET6: {
		struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
		struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
		if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
			return 0;
		if (sinx->sin6_port != siny->sin6_port)
			return 0;
		break;
	}
	default:
		return 0;
	}
	return 1;
}
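/*
 * Illustrative sketch (hypothetical helper, not used by the driver):
 * addr_compare() matches family, address and port, so two sockaddrs for
 * the same host but different ports do not compare equal.
 */
static bool __maybe_unused addr_compare_example(void)
{
	struct sockaddr_storage a, b;
	struct sockaddr_in *a4 = (struct sockaddr_in *)&a;
	struct sockaddr_in *b4 = (struct sockaddr_in *)&b;

	memset(&a, 0, sizeof(a));
	memset(&b, 0, sizeof(b));
	a4->sin_family = b4->sin_family = AF_INET;
	a4->sin_addr.s_addr = b4->sin_addr.s_addr = cpu_to_be32(0xc0a80001);
	a4->sin_port = cpu_to_be16(21064);	/* example ports */
	b4->sin_port = cpu_to_be16(21065);

	/* Same address, different port: addr_compare() returns 0 */
	return addr_compare(&a, &b) == 0;
}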
static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
			  struct sockaddr *sa_out)
{
	struct sockaddr_storage sas;
	struct dlm_node_addr *na;

	if (!dlm_local_count)
		return -1;

	spin_lock(&dlm_node_addrs_spin);
	na = find_node_addr(nodeid);
	if (na && na->addr_count)
		memcpy(&sas, na->addr[0], sizeof(struct sockaddr_storage));
	spin_unlock(&dlm_node_addrs_spin);

	if (!na)
		return -EEXIST;

	if (!na->addr_count)
		return -ENOENT;

	if (sas_out)
		memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));

	if (!sa_out)
		return 0;

	if (dlm_local_addr[0]->ss_family == AF_INET) {
		struct sockaddr_in *in4  = (struct sockaddr_in *) &sas;
		struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
		ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
	} else {
		struct sockaddr_in6 *in6  = (struct sockaddr_in6 *) &sas;
		struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
		ret6->sin6_addr = in6->sin6_addr;
	}

	return 0;
}

static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid)
{
	struct dlm_node_addr *na;
	int rv = -EEXIST;

	spin_lock(&dlm_node_addrs_spin);
	list_for_each_entry(na, &dlm_node_addrs, list) {
		if (!na->addr_count)
			continue;

		if (!addr_compare(na->addr[0], addr))
			continue;

		*nodeid = na->nodeid;
		rv = 0;
		break;
	}
	spin_unlock(&dlm_node_addrs_spin);
	return rv;
}

int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
{
	struct sockaddr_storage *new_addr;
	struct dlm_node_addr *new_node, *na;

	new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS);
	if (!new_node)
		return -ENOMEM;

	new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS);
	if (!new_addr) {
		kfree(new_node);
		return -ENOMEM;
	}

	memcpy(new_addr, addr, len);

	spin_lock(&dlm_node_addrs_spin);
	na = find_node_addr(nodeid);
	if (!na) {
		new_node->nodeid = nodeid;
		new_node->addr[0] = new_addr;
		new_node->addr_count = 1;
		list_add(&new_node->list, &dlm_node_addrs);
		spin_unlock(&dlm_node_addrs_spin);
		return 0;
	}

	if (na->addr_count >= DLM_MAX_ADDR_COUNT) {
		spin_unlock(&dlm_node_addrs_spin);
		kfree(new_addr);
		kfree(new_node);
		return -ENOSPC;
	}

	na->addr[na->addr_count++] = new_addr;
	spin_unlock(&dlm_node_addrs_spin);
	kfree(new_node);
	return 0;
}
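/*
 * Illustrative sketch of how the config layer is expected to register a
 * node's address before any connection is made.  The nodeid and address
 * here are made up; in the real flow this is driven from userspace via
 * configfs.  The stored address carries no port - make_sockaddr() stamps
 * the configured port on at connect time.
 */
static int __maybe_unused register_node_example(void)
{
	struct sockaddr_storage ss;
	struct sockaddr_in *sin = (struct sockaddr_in *)&ss;

	memset(&ss, 0, sizeof(ss));
	sin->sin_family = AF_INET;
	sin->sin_addr.s_addr = cpu_to_be32(0x0a000002);	/* 10.0.0.2 */

	return dlm_lowcomms_addr(2, &ss, sizeof(*sin));
}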
/* Data available on socket or listen socket received a connect */
static void lowcomms_data_ready(struct sock *sk, int count_unused)
{
	struct connection *con = sock2con(sk);
	if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
		queue_work(recv_workqueue, &con->rwork);
}

static void lowcomms_write_space(struct sock *sk)
{
	struct connection *con = sock2con(sk);

	if (!con)
		return;

	clear_bit(SOCK_NOSPACE, &con->sock->flags);

	if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
		con->sock->sk->sk_write_pending--;
		clear_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags);
	}

	if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
		queue_work(send_workqueue, &con->swork);
}

static inline void lowcomms_connect_sock(struct connection *con)
{
	if (test_bit(CF_CLOSE, &con->flags))
		return;
	if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
		queue_work(send_workqueue, &con->swork);
}

static void lowcomms_state_change(struct sock *sk)
{
	if (sk->sk_state == TCP_ESTABLISHED)
		lowcomms_write_space(sk);
}

int dlm_lowcomms_connect_node(int nodeid)
{
	struct connection *con;

	/* with sctp there's no connecting without sending */
	if (dlm_config.ci_protocol != 0)
		return 0;

	if (nodeid == dlm_our_nodeid())
		return 0;

	con = nodeid2con(nodeid, GFP_NOFS);
	if (!con)
		return -ENOMEM;
	lowcomms_connect_sock(con);
	return 0;
}

/* Make a socket active */
static void add_sock(struct socket *sock, struct connection *con)
{
	con->sock = sock;

	/* Install a data_ready callback */
	con->sock->sk->sk_data_ready = lowcomms_data_ready;
	con->sock->sk->sk_write_space = lowcomms_write_space;
	con->sock->sk->sk_state_change = lowcomms_state_change;
	con->sock->sk->sk_user_data = con;
	con->sock->sk->sk_allocation = GFP_NOFS;
}

/* Add the port number to an IPv4 or IPv6 sockaddr and return the address
   length */
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
			  int *addr_len)
{
	saddr->ss_family = dlm_local_addr[0]->ss_family;
	if (saddr->ss_family == AF_INET) {
		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
		in4_addr->sin_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in);
		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
	} else {
		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
		in6_addr->sin6_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in6);
	}
	memset((char *)saddr + *addr_len, 0,
	       sizeof(struct sockaddr_storage) - *addr_len);
}
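/*
 * Worked example of make_sockaddr() (sketch only, not called by the
 * driver; assumes init_local() has already populated dlm_local_addr):
 * it stamps the local family onto the storage, writes the port in
 * network byte order and zeroes the unused tail of the storage.
 */
static void __maybe_unused make_sockaddr_example(void)
{
	struct sockaddr_storage ss;
	int len;

	memset(&ss, 0, sizeof(ss));
	make_sockaddr(&ss, dlm_config.ci_tcp_port, &len);

	/*
	 * For an IPv4 cluster: len == sizeof(struct sockaddr_in) and
	 * ((struct sockaddr_in *)&ss)->sin_port ==
	 *	cpu_to_be16(dlm_config.ci_tcp_port).
	 */
}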
/* Close a remote connection and tidy up */
static void close_connection(struct connection *con, bool and_other)
{
	mutex_lock(&con->sock_mutex);

	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	}
	if (con->othercon && and_other) {
		/* Will only re-enter once. */
		close_connection(con->othercon, false);
	}
	if (con->rx_page) {
		__free_page(con->rx_page);
		con->rx_page = NULL;
	}

	con->retries = 0;
	mutex_unlock(&con->sock_mutex);
}

/* We only send shutdown messages to nodes that are not part of the cluster */
static void sctp_send_shutdown(sctp_assoc_t associd)
{
	static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
	struct msghdr outmessage;
	struct cmsghdr *cmsg;
	struct sctp_sndrcvinfo *sinfo;
	int ret;
	struct connection *con;

	con = nodeid2con(0, 0);
	BUG_ON(con == NULL);

	outmessage.msg_name = NULL;
	outmessage.msg_namelen = 0;
	outmessage.msg_control = outcmsg;
	outmessage.msg_controllen = sizeof(outcmsg);
	outmessage.msg_flags = MSG_EOR;

	cmsg = CMSG_FIRSTHDR(&outmessage);
	cmsg->cmsg_level = IPPROTO_SCTP;
	cmsg->cmsg_type = SCTP_SNDRCV;
	cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
	outmessage.msg_controllen = cmsg->cmsg_len;
	sinfo = CMSG_DATA(cmsg);
	memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));

	sinfo->sinfo_flags |= MSG_EOF;
	sinfo->sinfo_assoc_id = associd;

	ret = kernel_sendmsg(con->sock, &outmessage, NULL, 0, 0);

	if (ret != 0)
		log_print("send EOF to node failed: %d", ret);
}

static void sctp_init_failed_foreach(struct connection *con)
{
	con->sctp_assoc = 0;
	if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
		if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
			queue_work(send_workqueue, &con->swork);
	}
}

/* INIT failed but we don't know which node...
   restart INIT on all pending nodes */
static void sctp_init_failed(void)
{
	mutex_lock(&connections_lock);

	foreach_conn(sctp_init_failed_foreach);

	mutex_unlock(&connections_lock);
}
/* Something happened to an association */
static void process_sctp_notification(struct connection *con,
				      struct msghdr *msg, char *buf)
{
	union sctp_notification *sn = (union sctp_notification *)buf;

	if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) {
		switch (sn->sn_assoc_change.sac_state) {

		case SCTP_COMM_UP:
		case SCTP_RESTART:
		{
			/* Check that the new node is in the lockspace */
			struct sctp_prim prim;
			int nodeid;
			int prim_len, ret;
			int addr_len;
			struct connection *new_con;

			/*
			 * We get this before any data for an association.
			 * We verify that the node is in the cluster and
			 * then peel off a socket for it.
			 */
			if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {
				log_print("COMM_UP for invalid assoc ID %d",
					  (int)sn->sn_assoc_change.sac_assoc_id);
				sctp_init_failed();
				return;
			}
			memset(&prim, 0, sizeof(struct sctp_prim));
			prim_len = sizeof(struct sctp_prim);
			prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id;

			ret = kernel_getsockopt(con->sock,
						IPPROTO_SCTP,
						SCTP_PRIMARY_ADDR,
						(char *)&prim,
						&prim_len);
			if (ret < 0) {
				log_print("getsockopt/sctp_primary_addr on "
					  "new assoc %d failed : %d",
					  (int)sn->sn_assoc_change.sac_assoc_id,
					  ret);

				/* Retry INIT later */
				new_con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
				if (new_con)
					clear_bit(CF_CONNECT_PENDING,
						  &new_con->flags);
				return;
			}
			make_sockaddr(&prim.ssp_addr, 0, &addr_len);
			if (addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
				unsigned char *b = (unsigned char *)&prim.ssp_addr;
				log_print("reject connect from unknown addr");
				print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
						     b, sizeof(struct sockaddr_storage));
				sctp_send_shutdown(prim.ssp_assoc_id);
				return;
			}

			new_con = nodeid2con(nodeid, GFP_NOFS);
			if (!new_con)
				return;

			/* Peel off a new sock */
			sctp_lock_sock(con->sock->sk);
			ret = sctp_do_peeloff(con->sock->sk,
					      sn->sn_assoc_change.sac_assoc_id,
					      &new_con->sock);
			sctp_release_sock(con->sock->sk);
			if (ret < 0) {
				log_print("Can't peel off a socket for "
					  "connection %d to node %d: err=%d",
					  (int)sn->sn_assoc_change.sac_assoc_id,
					  nodeid, ret);
				return;
			}
			add_sock(new_con->sock, new_con);

			log_print("connecting to %d sctp association %d",
				  nodeid, (int)sn->sn_assoc_change.sac_assoc_id);

			/* Send any pending writes */
			clear_bit(CF_CONNECT_PENDING, &new_con->flags);
			clear_bit(CF_INIT_PENDING, &new_con->flags);
			if (!test_and_set_bit(CF_WRITE_PENDING, &new_con->flags)) {
				queue_work(send_workqueue, &new_con->swork);
			}
			if (!test_and_set_bit(CF_READ_PENDING, &new_con->flags))
				queue_work(recv_workqueue, &new_con->rwork);
		}
		break;

		case SCTP_COMM_LOST:
		case SCTP_SHUTDOWN_COMP:
		{
			con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
			if (con) {
				con->sctp_assoc = 0;
			}
		}
		break;

		/* We don't know which INIT failed, so clear the PENDING
		 * flags on all connections; they will then retry the
		 * INIT */
		case SCTP_CANT_STR_ASSOC:
		{
			log_print("Can't start SCTP association - retrying");
			sctp_init_failed();
		}
		break;

		default:
			log_print("unexpected SCTP assoc change id=%d state=%d",
				  (int)sn->sn_assoc_change.sac_assoc_id,
				  sn->sn_assoc_change.sac_state);
		}
	}
}

/* Data received from remote end */
static int receive_from_sock(struct connection *con)
{
	int ret = 0;
	struct msghdr msg = {};
	struct kvec iov[2];
	unsigned len;
	int r;
	int call_again_soon = 0;
	int nvec;
	char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];

	mutex_lock(&con->sock_mutex);

	if (con->sock == NULL) {
		ret = -EAGAIN;
		goto out_close;
	}

	if (con->rx_page == NULL) {
		/*
		 * This doesn't need to be atomic, but I think it should
		 * improve performance if it is.
		 */
		con->rx_page = alloc_page(GFP_ATOMIC);
		if (con->rx_page == NULL)
			goto out_resched;
		cbuf_init(&con->cb, PAGE_CACHE_SIZE);
	}

	/* Only SCTP needs these really */
	memset(&incmsg, 0, sizeof(incmsg));
	msg.msg_control = incmsg;
	msg.msg_controllen = sizeof(incmsg);

	/*
	 * iov[0] is the bit of the circular buffer between the current end
	 * point (cb.base + cb.len) and the end of the buffer.
	 */
	iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
	iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
	iov[1].iov_len = 0;
	nvec = 1;

	/*
	 * iov[1] is the bit of the circular buffer between the start of the
	 * buffer and the start of the currently used section (cb.base)
	 */
	if (cbuf_data(&con->cb) >= con->cb.base) {
		iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb);
		iov[1].iov_len = con->cb.base;
		iov[1].iov_base = page_address(con->rx_page);
		nvec = 2;
	}
	len = iov[0].iov_len + iov[1].iov_len;

	r = ret = kernel_recvmsg(con->sock, &msg, iov, nvec, len,
				 MSG_DONTWAIT | MSG_NOSIGNAL);
	if (ret <= 0)
		goto out_close;

	/* Process SCTP notifications */
	if (msg.msg_flags & MSG_NOTIFICATION) {
		msg.msg_control = incmsg;
		msg.msg_controllen = sizeof(incmsg);

		process_sctp_notification(con, &msg,
				page_address(con->rx_page) + con->cb.base);
		mutex_unlock(&con->sock_mutex);
		return 0;
	}
	BUG_ON(con->nodeid == 0);

	if (ret == len)
		call_again_soon = 1;
	cbuf_add(&con->cb, ret);
	ret = dlm_process_incoming_buffer(con->nodeid,
					  page_address(con->rx_page),
					  con->cb.base, con->cb.len,
					  PAGE_CACHE_SIZE);
	if (ret == -EBADMSG) {
		log_print("lowcomms: addr=%p, base=%u, len=%u, "
			  "iov_len=%u, iov_base[0]=%p, read=%d",
			  page_address(con->rx_page), con->cb.base, con->cb.len,
			  len, iov[0].iov_base, r);
	}
	if (ret < 0)
		goto out_close;
	cbuf_eat(&con->cb, ret);

	if (cbuf_empty(&con->cb) && !call_again_soon) {
		__free_page(con->rx_page);
		con->rx_page = NULL;
	}

	if (call_again_soon)
		goto out_resched;
	mutex_unlock(&con->sock_mutex);
	return 0;

out_resched:
	if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
		queue_work(recv_workqueue, &con->rwork);
	mutex_unlock(&con->sock_mutex);
	return -EAGAIN;

out_close:
	mutex_unlock(&con->sock_mutex);
	if (ret != -EAGAIN) {
		close_connection(con, false);
		/* Reconnect when there is something to send */
	}
	/* Don't return success if we really got EOF */
	if (ret == 0)
		ret = -EAGAIN;

	return ret;
}
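/*
 * Worked example of the iovec split in receive_from_sock() above
 * (numbers illustrative, assuming PAGE_CACHE_SIZE == 4096): suppose
 * cb.base == 3000 and cb.len == 500, so cbuf_data() == 3500.  The free
 * space wraps around the end of the page, so two iovecs are built:
 *
 *	iov[0]: page + 3500, len 4096 - 3500 = 596	(tail of the page)
 *	iov[1]: page + 0,    len 3000			(up to cb.base)
 *
 * If instead cbuf_data() < cb.base, the free region is contiguous and
 * the single iovec of cb.base - cbuf_data() bytes is enough.
 */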
/* Listening socket is busy, accept a connection */
static int tcp_accept_from_sock(struct connection *con)
{
	int result;
	struct sockaddr_storage peeraddr;
	struct socket *newsock;
	int len;
	int nodeid;
	struct connection *newcon;
	struct connection *addcon;

	mutex_lock(&connections_lock);
	if (!dlm_allow_conn) {
		mutex_unlock(&connections_lock);
		return -1;
	}
	mutex_unlock(&connections_lock);

	memset(&peeraddr, 0, sizeof(peeraddr));
	result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
				  IPPROTO_TCP, &newsock);
	if (result < 0)
		return -ENOMEM;

	mutex_lock_nested(&con->sock_mutex, 0);

	result = -ENOTCONN;
	if (con->sock == NULL)
		goto accept_err;

	newsock->type = con->sock->type;
	newsock->ops = con->sock->ops;

	result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK);
	if (result < 0)
		goto accept_err;

	/* Get the connected socket's peer */
	memset(&peeraddr, 0, sizeof(peeraddr));
	if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr,
				  &len, 2)) {
		result = -ECONNABORTED;
		goto accept_err;
	}

	/* Get the new node's NODEID */
	make_sockaddr(&peeraddr, 0, &len);
	if (addr_to_nodeid(&peeraddr, &nodeid)) {
		unsigned char *b = (unsigned char *)&peeraddr;
		log_print("connect from non cluster node");
		print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
				     b, sizeof(struct sockaddr_storage));
		sock_release(newsock);
		mutex_unlock(&con->sock_mutex);
		return -1;
	}

	log_print("got connection from %d", nodeid);

	/* Check to see if we already have a connection to this node. This
	 * could happen if the two nodes initiate a connection at roughly
	 * the same time and the connections cross on the wire.
	 * In this case we store the incoming one in "othercon"
	 */
	newcon = nodeid2con(nodeid, GFP_NOFS);
	if (!newcon) {
		result = -ENOMEM;
		goto accept_err;
	}
	mutex_lock_nested(&newcon->sock_mutex, 1);
	if (newcon->sock) {
		struct connection *othercon = newcon->othercon;

		if (!othercon) {
			othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
			if (!othercon) {
				log_print("failed to allocate incoming socket");
				mutex_unlock(&newcon->sock_mutex);
				result = -ENOMEM;
				goto accept_err;
			}
			othercon->nodeid = nodeid;
			othercon->rx_action = receive_from_sock;
			mutex_init(&othercon->sock_mutex);
			INIT_WORK(&othercon->swork, process_send_sockets);
			INIT_WORK(&othercon->rwork, process_recv_sockets);
			set_bit(CF_IS_OTHERCON, &othercon->flags);
		}
		if (!othercon->sock) {
			newcon->othercon = othercon;
			othercon->sock = newsock;
			newsock->sk->sk_user_data = othercon;
			add_sock(newsock, othercon);
			addcon = othercon;
		}
		else {
			printk("Extra connection from node %d attempted\n",
			       nodeid);
			result = -EAGAIN;
			mutex_unlock(&newcon->sock_mutex);
			goto accept_err;
		}
	}
	else {
		newsock->sk->sk_user_data = newcon;
		newcon->rx_action = receive_from_sock;
		add_sock(newsock, newcon);
		addcon = newcon;
	}

	mutex_unlock(&newcon->sock_mutex);

	/*
	 * Add it to the active queue in case we got data between
	 * processing the accept and adding the socket to the
	 * read_sockets list
	 */
	if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
		queue_work(recv_workqueue, &addcon->rwork);
	mutex_unlock(&con->sock_mutex);

	return 0;

accept_err:
	mutex_unlock(&con->sock_mutex);
	sock_release(newsock);

	if (result != -EAGAIN)
		log_print("error accepting connection from node: %d", result);
	return result;
}
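/*
 * Note on "othercon": when both nodes dial each other at the same
 * moment, a struct connection can end up with two live sockets - the
 * one this node initiated (newcon->sock) and the one just accepted.
 * The accepted socket is parked on a second, CF_IS_OTHERCON-flagged
 * connection struct that is only ever used for receiving; sends keep
 * going out over the socket this node initiated.
 */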
static void free_entry(struct writequeue_entry *e)
{
	__free_page(e->page);
	kfree(e);
}
/* Initiate an SCTP association.
   This is a special case of send_to_sock() in that we don't yet have a
   peeled-off socket for this association, so we use the listening socket
   and add the primary IP address of the remote node.
 */
static void sctp_init_assoc(struct connection *con)
{
	struct sockaddr_storage rem_addr;
	char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
	struct msghdr outmessage;
	struct cmsghdr *cmsg;
	struct sctp_sndrcvinfo *sinfo;
	struct connection *base_con;
	struct writequeue_entry *e;
	int len, offset;
	int ret;
	int addrlen;
	struct kvec iov[1];

	if (test_and_set_bit(CF_INIT_PENDING, &con->flags))
		return;

	if (con->retries++ > MAX_CONNECT_RETRIES)
		return;

	if (nodeid_to_addr(con->nodeid, NULL, (struct sockaddr *)&rem_addr)) {
		log_print("no address for nodeid %d", con->nodeid);
		return;
	}
	base_con = nodeid2con(0, 0);
	BUG_ON(base_con == NULL);

	make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen);

	outmessage.msg_name = &rem_addr;
	outmessage.msg_namelen = addrlen;
	outmessage.msg_control = outcmsg;
	outmessage.msg_controllen = sizeof(outcmsg);
	outmessage.msg_flags = MSG_EOR;

	spin_lock(&con->writequeue_lock);

	if (list_empty(&con->writequeue)) {
		spin_unlock(&con->writequeue_lock);
		log_print("writequeue empty for nodeid %d", con->nodeid);
		return;
	}

	e = list_first_entry(&con->writequeue, struct writequeue_entry, list);
	len = e->len;
	offset = e->offset;
	spin_unlock(&con->writequeue_lock);

	/* Send the first block off the write queue */
	iov[0].iov_base = page_address(e->page)+offset;
	iov[0].iov_len = len;

	cmsg = CMSG_FIRSTHDR(&outmessage);
	cmsg->cmsg_level = IPPROTO_SCTP;
	cmsg->cmsg_type = SCTP_SNDRCV;
	cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
	sinfo = CMSG_DATA(cmsg);
	memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
	sinfo->sinfo_ppid = cpu_to_le32(dlm_our_nodeid());
	outmessage.msg_controllen = cmsg->cmsg_len;

	ret = kernel_sendmsg(base_con->sock, &outmessage, iov, 1, len);
	if (ret < 0) {
		log_print("Send first packet to node %d failed: %d",
			  con->nodeid, ret);

		/* Try again later */
		clear_bit(CF_CONNECT_PENDING, &con->flags);
		clear_bit(CF_INIT_PENDING, &con->flags);
	}
	else {
		spin_lock(&con->writequeue_lock);
		e->offset += ret;
		e->len -= ret;

		if (e->len == 0 && e->users == 0) {
			list_del(&e->list);
			free_entry(e);
		}
		spin_unlock(&con->writequeue_lock);
	}
}
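/*
 * Note: with a one-to-many (SOCK_SEQPACKET) SCTP socket there is no
 * explicit connect step.  The sendmsg() above, carrying a msg_name,
 * makes the SCTP stack start the association (the INIT handshake) and
 * queues the first message behind it; success or failure is reported
 * asynchronously through the SCTP_ASSOC_CHANGE notifications handled
 * in process_sctp_notification().
 */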
/* Connect a new socket to its peer */
static void tcp_connect_to_sock(struct connection *con)
{
	struct sockaddr_storage saddr, src_addr;
	int addr_len;
	struct socket *sock = NULL;
	int one = 1;
	int result;

	if (con->nodeid == 0) {
		log_print("attempt to connect sock 0 foiled");
		return;
	}

	mutex_lock(&con->sock_mutex);
	if (con->retries++ > MAX_CONNECT_RETRIES)
		goto out;

	/* Some odd races can cause double-connects, ignore them */
	if (con->sock)
		goto out;

	/* Create a socket to communicate with */
	result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
				  IPPROTO_TCP, &sock);
	if (result < 0)
		goto out_err;

	memset(&saddr, 0, sizeof(saddr));
	result = nodeid_to_addr(con->nodeid, &saddr, NULL);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		goto out_err;
	}

	sock->sk->sk_user_data = con;
	con->rx_action = receive_from_sock;
	con->connect_action = tcp_connect_to_sock;
	add_sock(sock, con);

	/* Bind to our cluster-known address when connecting to avoid
	   routing problems */
	memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
	make_sockaddr(&src_addr, 0, &addr_len);
	result = sock->ops->bind(sock, (struct sockaddr *) &src_addr,
				 addr_len);
	if (result < 0) {
		log_print("could not bind for connect: %d", result);
		/* This *may* not indicate a critical error */
	}

	make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);

	log_print("connecting to %d", con->nodeid);

	/* Turn off Nagle's algorithm */
	kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
			  sizeof(one));

	result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
				    O_NONBLOCK);
	if (result == -EINPROGRESS)
		result = 0;
	if (result == 0)
		goto out;

out_err:
	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	} else if (sock) {
		sock_release(sock);
	}
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH &&
	    result != -ENETUNREACH &&
	    result != -ENETDOWN &&
	    result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		log_print("connect %d try %d error %d", con->nodeid,
			  con->retries, result);
		mutex_unlock(&con->sock_mutex);
		msleep(1000);
		lowcomms_connect_sock(con);
		return;
	}
out:
	mutex_unlock(&con->sock_mutex);
	return;
}
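/*
 * Note on the nonblocking connect above: -EINPROGRESS is the normal
 * result for an O_NONBLOCK connect, so it is treated as success.
 * Completion is signalled asynchronously: the socket's sk_state_change
 * callback (lowcomms_state_change) fires when the state reaches
 * TCP_ESTABLISHED and queues the send worker via lowcomms_write_space().
 */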
static struct socket *tcp_create_listen_sock(struct connection *con,
					     struct sockaddr_storage *saddr)
{
	struct socket *sock = NULL;
	int result = 0;
	int one = 1;
	int addr_len;

	if (dlm_local_addr[0]->ss_family == AF_INET)
		addr_len = sizeof(struct sockaddr_in);
	else
		addr_len = sizeof(struct sockaddr_in6);

	/* Create a socket to communicate with */
	result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
				  IPPROTO_TCP, &sock);
	if (result < 0) {
		log_print("Can't create listening comms socket");
		goto create_out;
	}

	/* Turn off Nagle's algorithm */
	kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
			  sizeof(one));

	result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
				   (char *)&one, sizeof(one));

	if (result < 0) {
		log_print("Failed to set SO_REUSEADDR on socket: %d", result);
	}
	con->rx_action = tcp_accept_from_sock;
	con->connect_action = tcp_connect_to_sock;

	/* Bind to our port */
	make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
	result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
	if (result < 0) {
		log_print("Can't bind to port %d", dlm_config.ci_tcp_port);
		sock_release(sock);
		sock = NULL;
		con->sock = NULL;
		goto create_out;
	}
	result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
				   (char *)&one, sizeof(one));
	if (result < 0) {
		log_print("Set keepalive failed: %d", result);
	}

	result = sock->ops->listen(sock, 5);
	if (result < 0) {
		log_print("Can't listen on port %d", dlm_config.ci_tcp_port);
		sock_release(sock);
		sock = NULL;
		goto create_out;
	}

create_out:
	return sock;
}

/* Get local addresses */
static void init_local(void)
{
	struct sockaddr_storage sas, *addr;
	int i;

	dlm_local_count = 0;
	for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
		if (dlm_our_addr(&sas, i))
			break;

		addr = kmalloc(sizeof(*addr), GFP_NOFS);
		if (!addr)
			break;
		memcpy(addr, &sas, sizeof(*addr));
		dlm_local_addr[dlm_local_count++] = addr;
	}
}

/* Bind to an IP address. SCTP allows multiple addresses so it can do
   multi-homing */
static int add_sctp_bind_addr(struct connection *sctp_con,
			      struct sockaddr_storage *addr,
			      int addr_len, int num)
{
	int result = 0;

	if (num == 1)
		result = kernel_bind(sctp_con->sock,
				     (struct sockaddr *) addr,
				     addr_len);
	else
		result = kernel_setsockopt(sctp_con->sock, SOL_SCTP,
					   SCTP_SOCKOPT_BINDX_ADD,
					   (char *)addr, addr_len);

	if (result < 0)
		log_print("Can't bind to port %d addr number %d",
			  dlm_config.ci_tcp_port, num);

	return result;
}
/* Initialise SCTP socket and bind to all interfaces */
static int sctp_listen_for_all(void)
{
	struct socket *sock = NULL;
	struct sockaddr_storage localaddr;
	struct sctp_event_subscribe subscribe;
	int result = -EINVAL, num = 1, i, addr_len;
	struct connection *con = nodeid2con(0, GFP_NOFS);
	int bufsize = NEEDED_RMEM;

	if (!con)
		return -ENOMEM;

	log_print("Using SCTP for communications");

	result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET,
				  IPPROTO_SCTP, &sock);
	if (result < 0) {
		log_print("Can't create comms socket, check SCTP is loaded");
		goto out;
	}

	/* Listen for events */
	memset(&subscribe, 0, sizeof(subscribe));
	subscribe.sctp_data_io_event = 1;
	subscribe.sctp_association_event = 1;
	subscribe.sctp_send_failure_event = 1;
	subscribe.sctp_shutdown_event = 1;
	subscribe.sctp_partial_delivery_event = 1;

	result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE,
				   (char *)&bufsize, sizeof(bufsize));
	if (result)
		log_print("Error increasing buffer space on socket %d", result);

	result = kernel_setsockopt(sock, SOL_SCTP, SCTP_EVENTS,
				   (char *)&subscribe, sizeof(subscribe));
	if (result < 0) {
		log_print("Failed to set SCTP_EVENTS on socket: result=%d",
			  result);
		goto create_delsock;
	}

	/* Init con struct */
	sock->sk->sk_user_data = con;
	con->sock = sock;
	con->sock->sk->sk_data_ready = lowcomms_data_ready;
	con->rx_action = receive_from_sock;
	con->connect_action = sctp_init_assoc;

	/* Bind to all interfaces. */
	for (i = 0; i < dlm_local_count; i++) {
		memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
		make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len);

		result = add_sctp_bind_addr(con, &localaddr, addr_len, num);
		if (result)
			goto create_delsock;
		++num;
	}

	result = sock->ops->listen(sock, 5);
	if (result < 0) {
		log_print("Can't set socket listening");
		goto create_delsock;
	}

	return 0;

create_delsock:
	sock_release(sock);
	con->sock = NULL;
out:
	return result;
}

static int tcp_listen_for_all(void)
{
	struct socket *sock = NULL;
	struct connection *con = nodeid2con(0, GFP_NOFS);
	int result = -EINVAL;

	if (!con)
		return -ENOMEM;

	/* We don't support multi-homed hosts */
	if (dlm_local_addr[1] != NULL) {
		log_print("TCP protocol can't handle multi-homed hosts, "
			  "try SCTP");
		return -EINVAL;
	}

	log_print("Using TCP for communications");

	sock = tcp_create_listen_sock(con, dlm_local_addr[0]);
	if (sock) {
		add_sock(sock, con);
		result = 0;
	}
	else {
		result = -EADDRINUSE;
	}

	return result;
}


static struct writequeue_entry *new_writequeue_entry(struct connection *con,
						     gfp_t allocation)
{
	struct writequeue_entry *entry;

	entry = kmalloc(sizeof(struct writequeue_entry), allocation);
	if (!entry)
		return NULL;

	entry->page = alloc_page(allocation);
	if (!entry->page) {
		kfree(entry);
		return NULL;
	}

	entry->offset = 0;
	entry->len = 0;
	entry->end = 0;
	entry->users = 0;
	entry->con = con;

	return entry;
}

void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
{
	struct connection *con;
	struct writequeue_entry *e;
	int offset = 0;
	int users = 0;

	con = nodeid2con(nodeid, allocation);
	if (!con)
		return NULL;

	spin_lock(&con->writequeue_lock);
	e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
	if ((&e->list == &con->writequeue) ||
	    (PAGE_CACHE_SIZE - e->end < len)) {
		e = NULL;
	} else {
		offset = e->end;
		e->end += len;
		users = e->users++;
	}
	spin_unlock(&con->writequeue_lock);

	if (e) {
	got_one:
		*ppc = page_address(e->page) + offset;
		return e;
	}

	e = new_writequeue_entry(con, allocation);
	if (e) {
		spin_lock(&con->writequeue_lock);
		offset = e->end;
		e->end += len;
		users = e->users++;
		list_add_tail(&e->list, &con->writequeue);
		spin_unlock(&con->writequeue_lock);
		goto got_one;
	}
	return NULL;
}

void dlm_lowcomms_commit_buffer(void *mh)
{
	struct writequeue_entry *e = (struct writequeue_entry *)mh;
	struct connection *con = e->con;
	int users;

	spin_lock(&con->writequeue_lock);
	users = --e->users;
	if (users)
		goto out;
	e->len = e->end - e->offset;
	spin_unlock(&con->writequeue_lock);

	if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
		queue_work(send_workqueue, &con->swork);
	}
	return;

out:
	spin_unlock(&con->writequeue_lock);
	return;
}
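/*
 * Illustrative sketch of the two-step send API above, as the midcomms
 * layer is expected to use it: reserve space on the node's writequeue,
 * fill it in place, then commit to kick the send worker.  The function
 * name, nodeid and message size here are hypothetical.
 */
static int __maybe_unused send_example(int nodeid)
{
	char *p;
	void *mh;

	mh = dlm_lowcomms_get_buffer(nodeid, 64, GFP_NOFS, &p);
	if (!mh)
		return -ENOMEM;

	memset(p, 0, 64);		/* build the real message here */
	dlm_lowcomms_commit_buffer(mh);	/* queues work on send_workqueue */
	return 0;
}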
/* Send a message */
static void send_to_sock(struct connection *con)
{
	int ret = 0;
	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	struct writequeue_entry *e;
	int len, offset;
	int count = 0;

	mutex_lock(&con->sock_mutex);
	if (con->sock == NULL)
		goto out_connect;

	spin_lock(&con->writequeue_lock);
	for (;;) {
		e = list_entry(con->writequeue.next, struct writequeue_entry,
			       list);
		if ((struct list_head *) e == &con->writequeue)
			break;

		len = e->len;
		offset = e->offset;
		BUG_ON(len == 0 && e->users == 0);
		spin_unlock(&con->writequeue_lock);

		ret = 0;
		if (len) {
			ret = kernel_sendpage(con->sock, e->page, offset, len,
					      msg_flags);
			if (ret == -EAGAIN || ret == 0) {
				if (ret == -EAGAIN &&
				    test_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags) &&
				    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
					/* Notify TCP that we're limited by the
					 * application window size.
					 */
					set_bit(SOCK_NOSPACE, &con->sock->flags);
					con->sock->sk->sk_write_pending++;
				}
				cond_resched();
				goto out;
			} else if (ret < 0)
				goto send_error;
		}

		/* Don't starve people filling buffers */
		if (++count >= MAX_SEND_MSG_COUNT) {
			cond_resched();
			count = 0;
		}

		spin_lock(&con->writequeue_lock);
		e->offset += ret;
		e->len -= ret;

		if (e->len == 0 && e->users == 0) {
			list_del(&e->list);
			free_entry(e);
		}
	}
	spin_unlock(&con->writequeue_lock);
out:
	mutex_unlock(&con->sock_mutex);
	return;

send_error:
	mutex_unlock(&con->sock_mutex);
	close_connection(con, false);
	lowcomms_connect_sock(con);
	return;

out_connect:
	mutex_unlock(&con->sock_mutex);
	if (!test_bit(CF_INIT_PENDING, &con->flags))
		lowcomms_connect_sock(con);
}

static void clean_one_writequeue(struct connection *con)
{
	struct writequeue_entry *e, *safe;

	spin_lock(&con->writequeue_lock);
	list_for_each_entry_safe(e, safe, &con->writequeue, list) {
		list_del(&e->list);
		free_entry(e);
	}
	spin_unlock(&con->writequeue_lock);
}

/* Called from recovery when it knows that a node has
   left the cluster */
int dlm_lowcomms_close(int nodeid)
{
	struct connection *con;
	struct dlm_node_addr *na;

	log_print("closing connection to node %d", nodeid);
	con = nodeid2con(nodeid, 0);
	if (con) {
		clear_bit(CF_CONNECT_PENDING, &con->flags);
		clear_bit(CF_WRITE_PENDING, &con->flags);
		set_bit(CF_CLOSE, &con->flags);
		if (cancel_work_sync(&con->swork))
			log_print("canceled swork for node %d", nodeid);
		if (cancel_work_sync(&con->rwork))
			log_print("canceled rwork for node %d", nodeid);
		clean_one_writequeue(con);
		close_connection(con, true);
	}

	spin_lock(&dlm_node_addrs_spin);
	na = find_node_addr(nodeid);
	if (na) {
		list_del(&na->list);
		while (na->addr_count--)
			kfree(na->addr[na->addr_count]);
		kfree(na);
	}
	spin_unlock(&dlm_node_addrs_spin);

	return 0;
}

/* Receive workqueue function */
static void process_recv_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, rwork);
	int err;

	clear_bit(CF_READ_PENDING, &con->flags);
	do {
		err = con->rx_action(con);
	} while (!err);
}
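/*
 * Note: the loop above relies on the rx_action return convention:
 * receive_from_sock() returns 0 after consuming data (so the worker
 * polls the socket again) and an error - typically -EAGAIN - once the
 * socket would block, which ends the loop until sk_data_ready queues
 * the work again.
 */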
/* Send workqueue function */
static void process_send_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, swork);

	if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
		con->connect_action(con);
		set_bit(CF_WRITE_PENDING, &con->flags);
	}
	if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags))
		send_to_sock(con);
}


/* Discard all entries on the write queues */
static void clean_writequeues(void)
{
	foreach_conn(clean_one_writequeue);
}

static void work_stop(void)
{
	destroy_workqueue(recv_workqueue);
	destroy_workqueue(send_workqueue);
}

static int work_start(void)
{
	recv_workqueue = alloc_workqueue("dlm_recv",
					 WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
	if (!recv_workqueue) {
		log_print("can't start dlm_recv");
		return -ENOMEM;
	}

	send_workqueue = alloc_workqueue("dlm_send",
					 WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
	if (!send_workqueue) {
		log_print("can't start dlm_send");
		destroy_workqueue(recv_workqueue);
		return -ENOMEM;
	}

	return 0;
}

static void stop_conn(struct connection *con)
{
	/* Set the low four flag bits (the PENDING bits among them) so
	   no new work gets queued for this connection */
	con->flags |= 0x0F;
	if (con->sock && con->sock->sk)
		con->sock->sk->sk_user_data = NULL;
}

static void free_conn(struct connection *con)
{
	close_connection(con, true);
	if (con->othercon)
		kmem_cache_free(con_cache, con->othercon);
	hlist_del(&con->list);
	kmem_cache_free(con_cache, con);
}

void dlm_lowcomms_stop(void)
{
	/* Set all the flags to prevent any
	   socket activity.
	*/
	mutex_lock(&connections_lock);
	dlm_allow_conn = 0;
	foreach_conn(stop_conn);
	mutex_unlock(&connections_lock);

	work_stop();

	mutex_lock(&connections_lock);
	clean_writequeues();

	foreach_conn(free_conn);

	mutex_unlock(&connections_lock);
	kmem_cache_destroy(con_cache);
}

int dlm_lowcomms_start(void)
{
	int error = -EINVAL;
	struct connection *con;
	int i;

	for (i = 0; i < CONN_HASH_SIZE; i++)
		INIT_HLIST_HEAD(&connection_hash[i]);

	init_local();
	if (!dlm_local_count) {
		error = -ENOTCONN;
		log_print("no local IP address has been set");
		goto fail;
	}

	error = -ENOMEM;
	con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
				      __alignof__(struct connection), 0,
				      NULL);
	if (!con_cache)
		goto fail;

	error = work_start();
	if (error)
		goto fail_destroy;

	dlm_allow_conn = 1;

	/* Start listening */
	if (dlm_config.ci_protocol == 0)
		error = tcp_listen_for_all();
	else
		error = sctp_listen_for_all();
	if (error)
		goto fail_unlisten;

	return 0;

fail_unlisten:
	dlm_allow_conn = 0;
	con = nodeid2con(0, 0);
	if (con) {
		close_connection(con, false);
		kmem_cache_free(con_cache, con);
	}
fail_destroy:
	kmem_cache_destroy(con_cache);
fail:
	return error;
}

void dlm_lowcomms_exit(void)
{
	struct dlm_node_addr *na, *safe;

	spin_lock(&dlm_node_addrs_spin);
	list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) {
		list_del(&na->list);
		while (na->addr_count--)
			kfree(na->addr[na->addr_count]);
		kfree(na);
	}
	spin_unlock(&dlm_node_addrs_spin);
}