/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

/*
 * lowcomms.c
 *
 * This is the "low-level" comms layer.
 *
 * It is responsible for sending/receiving messages
 * from other nodes in the cluster.
 *
 * Cluster nodes are referred to by their nodeids. nodeids are
 * simply 32 bit numbers to the locking module - if they need to
 * be expanded for the cluster infrastructure then that is its
 * responsibility. It is this layer's responsibility to resolve
 * these into IP addresses or whatever else it needs for
 * inter-node communication.
 *
 * The comms level consists of two kernel threads: a receive thread,
 * which deals mainly with receiving messages from other nodes and
 * passing them up to the mid-level comms layer (which understands the
 * message format) for execution by the locking core, and a send
 * thread, which does all the setting up of connections to remote
 * nodes and the sending of data. Threads are not allowed to send
 * their own data because it may cause them to wait in times of high
 * load. Also, this way, the sending thread can collect together
 * messages bound for one node and send them in one block.
 *
 * lowcomms will choose to use either TCP or SCTP as its transport layer
 * depending on the configuration variable 'protocol'. This should be set
 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
 * cluster-wide mechanism as it must be the same on all nodes of the cluster
 * for the DLM to function.
 *
 */
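
/*
 * Illustration only, not part of this file: with the usual dlm configfs
 * layout (path assumed here, and normally managed by the cluster manager
 * rather than written by hand), the protocol would be chosen before any
 * lockspace is created, e.g.
 *
 *	echo 1 > /sys/kernel/config/dlm/cluster/protocol	(1 = SCTP)
 *
 * with the same value written on every node.
 */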

#include <asm/ioctls.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <linux/pagemap.h>
#include <linux/file.h>
#include <linux/mutex.h>
#include <linux/sctp.h>
#include <linux/slab.h>
#include <net/sctp/sctp.h>
#include <net/ipv6.h>

#include "dlm_internal.h"
#include "lowcomms.h"
#include "midcomms.h"
#include "config.h"

#define NEEDED_RMEM (4*1024*1024)
#define CONN_HASH_SIZE 32

/* Number of messages to send before rescheduling */
#define MAX_SEND_MSG_COUNT 25

struct cbuf {
	unsigned int base;
	unsigned int len;
	unsigned int mask;
};

static void cbuf_add(struct cbuf *cb, int n)
{
	cb->len += n;
}

static int cbuf_data(struct cbuf *cb)
{
	return ((cb->base + cb->len) & cb->mask);
}

static void cbuf_init(struct cbuf *cb, int size)
{
	cb->base = cb->len = 0;
	cb->mask = size-1;
}

static void cbuf_eat(struct cbuf *cb, int n)
{
	cb->len  -= n;
	cb->base += n;
	cb->base &= cb->mask;
}

static bool cbuf_empty(struct cbuf *cb)
{
	return cb->len == 0;
}
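
/*
 * Worked example of the helpers above, assuming cbuf_init(cb, 4096)
 * (so mask == 0xfff): initially base == len == 0 and cbuf_data() == 0,
 * i.e. received data lands at the start of the page. After
 * cbuf_add(cb, 100) the next free byte is 100; after cbuf_eat(cb, 60)
 * base == 60 and len == 40. Because base and cbuf_data() are masked,
 * the live window can wrap around the end of the page, which is why
 * receive_from_sock() below sometimes needs two iovecs.
 */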

struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* So we know who we are in the list */
	struct mutex sock_mutex;
	unsigned long flags;
#define CF_READ_PENDING 1
#define CF_WRITE_PENDING 2
#define CF_CONNECT_PENDING 3
#define CF_INIT_PENDING 4
#define CF_IS_OTHERCON 5
#define CF_CLOSE 6
#define CF_APP_LIMITED 7
	struct list_head writequeue;  /* List of outgoing writequeue_entries */
	spinlock_t writequeue_lock;
	int (*rx_action) (struct connection *);	/* What to do when active */
	void (*connect_action) (struct connection *);	/* What to do to connect */
	struct page *rx_page;
	struct cbuf cb;
	int retries;
#define MAX_CONNECT_RETRIES 3
	struct hlist_node list;
	struct connection *othercon;
	struct work_struct rwork; /* Receive workqueue */
	struct work_struct swork; /* Send workqueue */
};
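
/*
 * Note that the CF_* values above are bit numbers rather than masks:
 * they are passed to set_bit()/test_and_set_bit() on con->flags, so
 * e.g. CF_READ_PENDING occupies bit 1 (mask 0x02), not bit 0.
 */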
#define sock2con(x) ((struct connection *)(x)->sk_user_data)

/* An entry waiting to be sent */
struct writequeue_entry {
	struct list_head list;
	struct page *page;
	int offset;
	int len;
	int end;
	int users;
	struct connection *con;
};

struct dlm_node_addr {
	struct list_head list;
	int nodeid;
	int addr_count;
	int curr_addr_index;
	struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
};

static LIST_HEAD(dlm_node_addrs);
static DEFINE_SPINLOCK(dlm_node_addrs_spin);

static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;
static int dlm_allow_conn;

/* Work queues */
static struct workqueue_struct *recv_workqueue;
static struct workqueue_struct *send_workqueue;

static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_MUTEX(connections_lock);
static struct kmem_cache *con_cache;

static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);

/* This is deliberately very simple because most clusters have simple
   sequential nodeids, so we should be able to go straight to a connection
   struct in the array */
static inline int nodeid_hash(int nodeid)
{
	return nodeid & (CONN_HASH_SIZE-1);
}
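
/* For example, with CONN_HASH_SIZE == 32, nodeids 1..31 each get their
   own bucket, while nodeid 33 hashes to bucket 1 alongside nodeid 1. */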

static struct connection *__find_con(int nodeid)
{
	int r;
	struct connection *con;

	r = nodeid_hash(nodeid);

	hlist_for_each_entry(con, &connection_hash[r], list) {
		if (con->nodeid == nodeid)
			return con;
	}
	return NULL;
}

/*
 * If 'alloc' is zero then we don't attempt to create a new
 * connection structure for this node.
 */
static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
{
	struct connection *con = NULL;
	int r;

	con = __find_con(nodeid);
	if (con || !alloc)
		return con;

	con = kmem_cache_zalloc(con_cache, alloc);
	if (!con)
		return NULL;

	r = nodeid_hash(nodeid);
	hlist_add_head(&con->list, &connection_hash[r]);

	con->nodeid = nodeid;
	mutex_init(&con->sock_mutex);
	INIT_LIST_HEAD(&con->writequeue);
	spin_lock_init(&con->writequeue_lock);
	INIT_WORK(&con->swork, process_send_sockets);
	INIT_WORK(&con->rwork, process_recv_sockets);

	/* Setup action pointers for child sockets */
	if (con->nodeid) {
		struct connection *zerocon = __find_con(0);

		con->connect_action = zerocon->connect_action;
		if (!con->rx_action)
			con->rx_action = zerocon->rx_action;
	}

	return con;
}

/* Loop round all connections */
static void foreach_conn(void (*conn_func)(struct connection *c))
{
	int i;
	struct hlist_node *n;
	struct connection *con;

	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry_safe(con, n, &connection_hash[i], list)
			conn_func(con);
	}
}

static struct connection *nodeid2con(int nodeid, gfp_t allocation)
{
	struct connection *con;

	mutex_lock(&connections_lock);
	con = __nodeid2con(nodeid, allocation);
	mutex_unlock(&connections_lock);

	return con;
}

static struct dlm_node_addr *find_node_addr(int nodeid)
{
	struct dlm_node_addr *na;

	list_for_each_entry(na, &dlm_node_addrs, list) {
		if (na->nodeid == nodeid)
			return na;
	}
	return NULL;
}

static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y)
{
	switch (x->ss_family) {
	case AF_INET: {
		struct sockaddr_in *sinx = (struct sockaddr_in *)x;
		struct sockaddr_in *siny = (struct sockaddr_in *)y;
		if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
			return 0;
		if (sinx->sin_port != siny->sin_port)
			return 0;
		break;
	}
	case AF_INET6: {
		struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
		struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
		if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
			return 0;
		if (sinx->sin6_port != siny->sin6_port)
			return 0;
		break;
	}
	default:
		return 0;
	}
	return 1;
}
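
/*
 * Note that ports take part in addr_compare() above. The accept paths
 * below therefore call make_sockaddr(..., 0, ...) on the peer address
 * before looking it up, so the comparison is effectively made with the
 * port zeroed on both sides.
 */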

static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
			  struct sockaddr *sa_out, bool try_new_addr)
{
	struct sockaddr_storage sas;
	struct dlm_node_addr *na;

	if (!dlm_local_count)
		return -1;

	spin_lock(&dlm_node_addrs_spin);
	na = find_node_addr(nodeid);
	if (na && na->addr_count) {
		memcpy(&sas, na->addr[na->curr_addr_index],
		       sizeof(struct sockaddr_storage));

		if (try_new_addr) {
			na->curr_addr_index++;
			if (na->curr_addr_index == na->addr_count)
				na->curr_addr_index = 0;
		}
	}
	spin_unlock(&dlm_node_addrs_spin);

	if (!na)
		return -EEXIST;

	if (!na->addr_count)
		return -ENOENT;

	if (sas_out)
		memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));

	if (!sa_out)
		return 0;

	if (dlm_local_addr[0]->ss_family == AF_INET) {
		struct sockaddr_in *in4  = (struct sockaddr_in *) &sas;
		struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
		ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
	} else {
		struct sockaddr_in6 *in6  = (struct sockaddr_in6 *) &sas;
		struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
		ret6->sin6_addr = in6->sin6_addr;
	}

	return 0;
}
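
/*
 * When try_new_addr is set, curr_addr_index advances round-robin, so a
 * node configured with addresses A, B and C is tried in the order
 * A, B, C, A, ... across successive SCTP connect attempts.
 */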

static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid)
{
	struct dlm_node_addr *na;
	int rv = -EEXIST;
	int addr_i;

	spin_lock(&dlm_node_addrs_spin);
	list_for_each_entry(na, &dlm_node_addrs, list) {
		if (!na->addr_count)
			continue;

		for (addr_i = 0; addr_i < na->addr_count; addr_i++) {
			if (addr_compare(na->addr[addr_i], addr)) {
				*nodeid = na->nodeid;
				rv = 0;
				goto unlock;
			}
		}
	}
unlock:
	spin_unlock(&dlm_node_addrs_spin);
	return rv;
}

int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
{
	struct sockaddr_storage *new_addr;
	struct dlm_node_addr *new_node, *na;

	new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS);
	if (!new_node)
		return -ENOMEM;

	new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS);
	if (!new_addr) {
		kfree(new_node);
		return -ENOMEM;
	}

	memcpy(new_addr, addr, len);

	spin_lock(&dlm_node_addrs_spin);
	na = find_node_addr(nodeid);
	if (!na) {
		new_node->nodeid = nodeid;
		new_node->addr[0] = new_addr;
		new_node->addr_count = 1;
		list_add(&new_node->list, &dlm_node_addrs);
		spin_unlock(&dlm_node_addrs_spin);
		return 0;
	}

	if (na->addr_count >= DLM_MAX_ADDR_COUNT) {
		spin_unlock(&dlm_node_addrs_spin);
		kfree(new_addr);
		kfree(new_node);
		return -ENOSPC;
	}

	na->addr[na->addr_count++] = new_addr;
	spin_unlock(&dlm_node_addrs_spin);
	kfree(new_node);
	return 0;
}
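
/*
 * Sketch of how this is normally driven (configfs names assumed, and
 * usually written by dlm_controld rather than by hand): userspace
 * stores each node's address(es) under comms/<nodeid>/addr in the dlm
 * configfs tree, and the store handler in config.c forwards every
 * address here as a struct sockaddr_storage blob, one call per address
 * of a multi-homed node.
 */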

/* Data available on socket or listen socket received a connect */
static void lowcomms_data_ready(struct sock *sk)
{
	struct connection *con = sock2con(sk);

	if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
		queue_work(recv_workqueue, &con->rwork);
}

static void lowcomms_write_space(struct sock *sk)
{
	struct connection *con = sock2con(sk);

	if (!con)
		return;

	clear_bit(SOCK_NOSPACE, &con->sock->flags);

	if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
		con->sock->sk->sk_write_pending--;
		clear_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags);
	}

	if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
		queue_work(send_workqueue, &con->swork);
}

static inline void lowcomms_connect_sock(struct connection *con)
{
	if (test_bit(CF_CLOSE, &con->flags))
		return;
	if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
		queue_work(send_workqueue, &con->swork);
}

static void lowcomms_state_change(struct sock *sk)
{
	/* SCTP layer is not calling sk_data_ready when the connection
	 * is done, so we catch the signal through here. Also, it
	 * doesn't switch socket state when entering shutdown, so we
	 * skip the write in that case.
	 */
	if (sk->sk_shutdown) {
		if (sk->sk_shutdown == RCV_SHUTDOWN)
			lowcomms_data_ready(sk);
	} else if (sk->sk_state == TCP_ESTABLISHED) {
		lowcomms_write_space(sk);
	}
}

int dlm_lowcomms_connect_node(int nodeid)
{
	struct connection *con;

	if (nodeid == dlm_our_nodeid())
		return 0;

	con = nodeid2con(nodeid, GFP_NOFS);
	if (!con)
		return -ENOMEM;
	lowcomms_connect_sock(con);
	return 0;
}

/* Make a socket active */
static void add_sock(struct socket *sock, struct connection *con)
{
	con->sock = sock;

	/* Install a data_ready callback */
	con->sock->sk->sk_data_ready = lowcomms_data_ready;
	con->sock->sk->sk_write_space = lowcomms_write_space;
	con->sock->sk->sk_state_change = lowcomms_state_change;
	con->sock->sk->sk_user_data = con;
	con->sock->sk->sk_allocation = GFP_NOFS;
}

/* Add the port number to an IPv4 or IPv6 sockaddr and return the address
   length */
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
			  int *addr_len)
{
	saddr->ss_family = dlm_local_addr[0]->ss_family;
	if (saddr->ss_family == AF_INET) {
		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
		in4_addr->sin_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in);
		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
	} else {
		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
		in6_addr->sin6_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in6);
	}
	memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
}
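
/*
 * For example, for an IPv4 local address and port 21064 (the usual
 * default for ci_tcp_port), this sets ss_family to AF_INET, sin_port
 * to htons(21064), *addr_len to sizeof(struct sockaddr_in), and zeroes
 * everything beyond that length, keeping the unused tail of the
 * sockaddr_storage deterministic.
 */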

/* Close a remote connection and tidy up */
static void close_connection(struct connection *con, bool and_other,
			     bool tx, bool rx)
{
	clear_bit(CF_CONNECT_PENDING, &con->flags);
	clear_bit(CF_WRITE_PENDING, &con->flags);
	if (tx && cancel_work_sync(&con->swork))
		log_print("canceled swork for node %d", con->nodeid);
	if (rx && cancel_work_sync(&con->rwork))
		log_print("canceled rwork for node %d", con->nodeid);

	mutex_lock(&con->sock_mutex);
	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	}
	if (con->othercon && and_other) {
		/* Will only re-enter once. */
		close_connection(con->othercon, false, true, true);
	}
	if (con->rx_page) {
		__free_page(con->rx_page);
		con->rx_page = NULL;
	}

	con->retries = 0;
	mutex_unlock(&con->sock_mutex);
}

/* Data received from remote end */
static int receive_from_sock(struct connection *con)
{
	int ret = 0;
	struct msghdr msg = {};
	struct kvec iov[2];
	unsigned len;
	int r;
	int call_again_soon = 0;
	int nvec;

	mutex_lock(&con->sock_mutex);

	if (con->sock == NULL) {
		ret = -EAGAIN;
		goto out_close;
	}
	if (con->nodeid == 0) {
		ret = -EINVAL;
		goto out_close;
	}

	if (con->rx_page == NULL) {
		/*
		 * This doesn't need to be atomic, but I think it should
		 * improve performance if it is.
		 */
		con->rx_page = alloc_page(GFP_ATOMIC);
		if (con->rx_page == NULL)
			goto out_resched;
		cbuf_init(&con->cb, PAGE_CACHE_SIZE);
	}

	/*
	 * iov[0] is the bit of the circular buffer between the current end
	 * point (cb.base + cb.len) and the end of the buffer.
	 */
	iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
	iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
	iov[1].iov_len = 0;
	nvec = 1;

	/*
	 * iov[1] is the bit of the circular buffer between the start of the
	 * buffer and the start of the currently used section (cb.base)
	 */
	if (cbuf_data(&con->cb) >= con->cb.base) {
		iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb);
		iov[1].iov_len = con->cb.base;
		iov[1].iov_base = page_address(con->rx_page);
		nvec = 2;
	}
	len = iov[0].iov_len + iov[1].iov_len;
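
	/*
	 * Worked example: with a 4096 byte page, base == 3000 and
	 * len == 500, cbuf_data() is 3500, so iov[0] covers bytes
	 * 3500..4095 (596 bytes) and iov[1] covers bytes 0..2999;
	 * together that is every byte except the 500 already buffered.
	 */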

	r = ret = kernel_recvmsg(con->sock, &msg, iov, nvec, len,
			       MSG_DONTWAIT | MSG_NOSIGNAL);
	if (ret <= 0)
		goto out_close;
	else if (ret == len)
		call_again_soon = 1;

	cbuf_add(&con->cb, ret);
	ret = dlm_process_incoming_buffer(con->nodeid,
					  page_address(con->rx_page),
					  con->cb.base, con->cb.len,
					  PAGE_CACHE_SIZE);
	if (ret == -EBADMSG) {
		log_print("lowcomms: addr=%p, base=%u, len=%u, read=%d",
			  page_address(con->rx_page), con->cb.base,
			  con->cb.len, r);
	}
	if (ret < 0)
		goto out_close;
	cbuf_eat(&con->cb, ret);

	if (cbuf_empty(&con->cb) && !call_again_soon) {
		__free_page(con->rx_page);
		con->rx_page = NULL;
	}

	if (call_again_soon)
		goto out_resched;
	mutex_unlock(&con->sock_mutex);
	return 0;

out_resched:
	if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
		queue_work(recv_workqueue, &con->rwork);
	mutex_unlock(&con->sock_mutex);
	return -EAGAIN;

out_close:
	mutex_unlock(&con->sock_mutex);
	if (ret != -EAGAIN) {
		close_connection(con, false, true, false);
		/* Reconnect when there is something to send */
	}
	/* Don't return success if we really got EOF */
	if (ret == 0)
		ret = -EAGAIN;

	return ret;
}

/* Listening socket is busy, accept a connection */
static int tcp_accept_from_sock(struct connection *con)
{
	int result;
	struct sockaddr_storage peeraddr;
	struct socket *newsock;
	int len;
	int nodeid;
	struct connection *newcon;
	struct connection *addcon;

	mutex_lock(&connections_lock);
	if (!dlm_allow_conn) {
		mutex_unlock(&connections_lock);
		return -1;
	}
	mutex_unlock(&connections_lock);

	memset(&peeraddr, 0, sizeof(peeraddr));
	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_TCP, &newsock);
	if (result < 0)
		return -ENOMEM;

	mutex_lock_nested(&con->sock_mutex, 0);

	result = -ENOTCONN;
	if (con->sock == NULL)
		goto accept_err;

	newsock->type = con->sock->type;
	newsock->ops = con->sock->ops;

	result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK);
	if (result < 0)
		goto accept_err;

	/* Get the connected socket's peer */
	memset(&peeraddr, 0, sizeof(peeraddr));
	if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr,
				  &len, 2)) {
		result = -ECONNABORTED;
		goto accept_err;
	}

	/* Get the new node's NODEID */
	make_sockaddr(&peeraddr, 0, &len);
	if (addr_to_nodeid(&peeraddr, &nodeid)) {
		unsigned char *b = (unsigned char *)&peeraddr;

		log_print("connect from non cluster node");
		print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
				     b, sizeof(struct sockaddr_storage));
		sock_release(newsock);
		mutex_unlock(&con->sock_mutex);
		return -1;
	}

	log_print("got connection from %d", nodeid);

	/*  Check to see if we already have a connection to this node. This
	 *  could happen if the two nodes initiate a connection at roughly
	 *  the same time and the connections cross on the wire.
	 *  In this case we store the incoming one in "othercon"
	 */
	newcon = nodeid2con(nodeid, GFP_NOFS);
	if (!newcon) {
		result = -ENOMEM;
		goto accept_err;
	}
	mutex_lock_nested(&newcon->sock_mutex, 1);
	if (newcon->sock) {
		struct connection *othercon = newcon->othercon;

		if (!othercon) {
			othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
			if (!othercon) {
				log_print("failed to allocate incoming socket");
				mutex_unlock(&newcon->sock_mutex);
				result = -ENOMEM;
				goto accept_err;
			}
			othercon->nodeid = nodeid;
			othercon->rx_action = receive_from_sock;
			mutex_init(&othercon->sock_mutex);
			INIT_WORK(&othercon->swork, process_send_sockets);
			INIT_WORK(&othercon->rwork, process_recv_sockets);
			set_bit(CF_IS_OTHERCON, &othercon->flags);
		}
		if (!othercon->sock) {
			newcon->othercon = othercon;
			othercon->sock = newsock;
			newsock->sk->sk_user_data = othercon;
			add_sock(newsock, othercon);
			addcon = othercon;
		} else {
			printk("Extra connection from node %d attempted\n", nodeid);
			result = -EAGAIN;
			mutex_unlock(&newcon->sock_mutex);
			goto accept_err;
		}
	} else {
		newsock->sk->sk_user_data = newcon;
		newcon->rx_action = receive_from_sock;
		add_sock(newsock, newcon);
		addcon = newcon;
	}

	mutex_unlock(&newcon->sock_mutex);

	/*
	 * Add it to the active queue in case we got data
	 * between processing the accept and adding the socket
	 * to the read_sockets list
	 */
	if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
		queue_work(recv_workqueue, &addcon->rwork);
	mutex_unlock(&con->sock_mutex);

	return 0;

accept_err:
	mutex_unlock(&con->sock_mutex);
	sock_release(newsock);

	if (result != -EAGAIN)
		log_print("error accepting connection from node: %d", result);
	return result;
}

static int sctp_accept_from_sock(struct connection *con)
{
	/* Check that the new node is in the lockspace */
	struct sctp_prim prim;
	int nodeid;
	int prim_len, ret;
	int addr_len;
	struct connection *newcon;
	struct connection *addcon;
	struct socket *newsock = NULL;

	mutex_lock(&connections_lock);
	if (!dlm_allow_conn) {
		mutex_unlock(&connections_lock);
		return -1;
	}
	mutex_unlock(&connections_lock);

	mutex_lock_nested(&con->sock_mutex, 0);

	ret = kernel_accept(con->sock, &newsock, O_NONBLOCK);
	if (ret < 0)
		goto accept_err;

	memset(&prim, 0, sizeof(struct sctp_prim));
	prim_len = sizeof(struct sctp_prim);

	ret = kernel_getsockopt(newsock, IPPROTO_SCTP, SCTP_PRIMARY_ADDR,
				(char *)&prim, &prim_len);
	if (ret < 0) {
		log_print("getsockopt/sctp_primary_addr failed: %d", ret);
		goto accept_err;
	}

	make_sockaddr(&prim.ssp_addr, 0, &addr_len);
	if (addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
		unsigned char *b = (unsigned char *)&prim.ssp_addr;

		log_print("reject connect from unknown addr");
		print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
				     b, sizeof(struct sockaddr_storage));
		goto accept_err;
	}

	newcon = nodeid2con(nodeid, GFP_NOFS);
	if (!newcon) {
		ret = -ENOMEM;
		goto accept_err;
	}

	mutex_lock_nested(&newcon->sock_mutex, 1);

	if (newcon->sock) {
		struct connection *othercon = newcon->othercon;

		if (!othercon) {
			othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
			if (!othercon) {
				log_print("failed to allocate incoming socket");
				mutex_unlock(&newcon->sock_mutex);
				ret = -ENOMEM;
				goto accept_err;
			}
			othercon->nodeid = nodeid;
			othercon->rx_action = receive_from_sock;
			mutex_init(&othercon->sock_mutex);
			INIT_WORK(&othercon->swork, process_send_sockets);
			INIT_WORK(&othercon->rwork, process_recv_sockets);
			set_bit(CF_IS_OTHERCON, &othercon->flags);
		}
		if (!othercon->sock) {
			newcon->othercon = othercon;
			othercon->sock = newsock;
			newsock->sk->sk_user_data = othercon;
			add_sock(newsock, othercon);
			addcon = othercon;
		} else {
			printk("Extra connection from node %d attempted\n", nodeid);
			ret = -EAGAIN;
			mutex_unlock(&newcon->sock_mutex);
			goto accept_err;
		}
	} else {
		newsock->sk->sk_user_data = newcon;
		newcon->rx_action = receive_from_sock;
		add_sock(newsock, newcon);
		addcon = newcon;
	}

	log_print("connected to %d", nodeid);

	mutex_unlock(&newcon->sock_mutex);

	/*
	 * Add it to the active queue in case we got data
	 * between processing the accept and adding the socket
	 * to the read_sockets list
	 */
	if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
		queue_work(recv_workqueue, &addcon->rwork);
	mutex_unlock(&con->sock_mutex);

	return 0;

accept_err:
	mutex_unlock(&con->sock_mutex);
	if (newsock)
		sock_release(newsock);
	if (ret != -EAGAIN)
		log_print("error accepting connection from node: %d", ret);

	return ret;
}

static void free_entry(struct writequeue_entry *e)
{
	__free_page(e->page);
	kfree(e);
}

/*
 * writequeue_entry_complete - try to delete and free write queue entry
 * @e: write queue entry to try to delete
 * @completed: bytes completed
 *
 * writequeue_lock must be held.
 */
static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
{
	e->offset += completed;
	e->len -= completed;

	if (e->len == 0 && e->users == 0) {
		list_del(&e->list);
		free_entry(e);
	}
}

/*
 * sctp_bind_addrs - bind a SCTP socket to all our addresses
 */
static int sctp_bind_addrs(struct connection *con, uint16_t port)
{
	struct sockaddr_storage localaddr;
	int i, addr_len, result = 0;

	for (i = 0; i < dlm_local_count; i++) {
		memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
		make_sockaddr(&localaddr, port, &addr_len);

		if (!i)
			result = kernel_bind(con->sock,
					     (struct sockaddr *)&localaddr,
					     addr_len);
		else
			result = kernel_setsockopt(con->sock, SOL_SCTP,
						   SCTP_SOCKOPT_BINDX_ADD,
						   (char *)&localaddr, addr_len);

		if (result < 0) {
			log_print("Can't bind to port %d addr number %d, error %d",
				  port, i + 1, result);
			break;
		}
	}
	return result;
}
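
/*
 * Binding every local address, rather than only the first one, is what
 * enables SCTP multi-homing: the association can fail over between the
 * bound interfaces without lowcomms having to notice or reconnect.
 */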

/* Initiate an SCTP association.
   This is a special case of send_to_sock() in that we don't yet have a
   peeled-off socket for this association, so we use the listening socket
   and add the primary IP address of the remote node.
 */
static void sctp_connect_to_sock(struct connection *con)
{
	struct sockaddr_storage daddr;
	int one = 1;
	int result;
	int addr_len;
	struct socket *sock;

	if (con->nodeid == 0) {
		log_print("attempt to connect sock 0 foiled");
		return;
	}

	mutex_lock(&con->sock_mutex);

	/* Some odd races can cause double-connects, ignore them */
	if (con->retries++ > MAX_CONNECT_RETRIES)
		goto out;

	if (con->sock) {
		log_print("node %d already connected.", con->nodeid);
		goto out;
	}

	memset(&daddr, 0, sizeof(daddr));
	result = nodeid_to_addr(con->nodeid, &daddr, NULL, true);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		goto out;
	}

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_SCTP, &sock);
	if (result < 0)
		goto socket_err;

	sock->sk->sk_user_data = con;
	con->rx_action = receive_from_sock;
	con->connect_action = sctp_connect_to_sock;
	add_sock(sock, con);

	/* Bind to all addresses. */
	if (sctp_bind_addrs(con, 0))
		goto bind_err;

	make_sockaddr(&daddr, dlm_config.ci_tcp_port, &addr_len);

	log_print("connecting to %d", con->nodeid);

	/* Turn off Nagle's algorithm (this is an SCTP socket, so use the
	   SCTP-level option rather than TCP_NODELAY) */
	kernel_setsockopt(sock, SOL_SCTP, SCTP_NODELAY, (char *)&one,
			  sizeof(one));

	result = sock->ops->connect(sock, (struct sockaddr *)&daddr, addr_len,
				   O_NONBLOCK);
	if (result == -EINPROGRESS)
		result = 0;
	if (result == 0)
		goto out;

bind_err:
	con->sock = NULL;
	sock_release(sock);

socket_err:
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH &&
	    result != -ENETUNREACH &&
	    result != -ENETDOWN &&
	    result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		log_print("connect %d try %d error %d", con->nodeid,
			  con->retries, result);
		mutex_unlock(&con->sock_mutex);
		msleep(1000);
		clear_bit(CF_CONNECT_PENDING, &con->flags);
		lowcomms_connect_sock(con);
		return;
	}

out:
	mutex_unlock(&con->sock_mutex);
	set_bit(CF_WRITE_PENDING, &con->flags);
}

/* Connect a new socket to its peer */
static void tcp_connect_to_sock(struct connection *con)
{
	struct sockaddr_storage saddr, src_addr;
	int addr_len;
	struct socket *sock = NULL;
	int one = 1;
	int result;

	if (con->nodeid == 0) {
		log_print("attempt to connect sock 0 foiled");
		return;
	}

	mutex_lock(&con->sock_mutex);
	if (con->retries++ > MAX_CONNECT_RETRIES)
		goto out;

	/* Some odd races can cause double-connects, ignore them */
	if (con->sock)
		goto out;

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_TCP, &sock);
	if (result < 0)
		goto out_err;

	memset(&saddr, 0, sizeof(saddr));
	result = nodeid_to_addr(con->nodeid, &saddr, NULL, false);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		goto out_err;
	}

	sock->sk->sk_user_data = con;
	con->rx_action = receive_from_sock;
	con->connect_action = tcp_connect_to_sock;
	add_sock(sock, con);

	/* Bind to our cluster-known address when connecting to avoid
	   routing problems */
	memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
	make_sockaddr(&src_addr, 0, &addr_len);
	result = sock->ops->bind(sock, (struct sockaddr *) &src_addr,
				 addr_len);
	if (result < 0) {
		log_print("could not bind for connect: %d", result);
		/* This *may* not indicate a critical error */
	}

	make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);

	log_print("connecting to %d", con->nodeid);

	/* Turn off Nagle's algorithm */
	kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
			  sizeof(one));

	result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
				   O_NONBLOCK);
	if (result == -EINPROGRESS)
		result = 0;
	if (result == 0)
		goto out;

out_err:
	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	} else if (sock) {
		sock_release(sock);
	}
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH &&
	    result != -ENETUNREACH &&
	    result != -ENETDOWN &&
	    result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		log_print("connect %d try %d error %d", con->nodeid,
			  con->retries, result);
		mutex_unlock(&con->sock_mutex);
		msleep(1000);
		clear_bit(CF_CONNECT_PENDING, &con->flags);
		lowcomms_connect_sock(con);
		return;
	}
out:
	mutex_unlock(&con->sock_mutex);
	set_bit(CF_WRITE_PENDING, &con->flags);
}

static struct socket *tcp_create_listen_sock(struct connection *con,
					     struct sockaddr_storage *saddr)
{
	struct socket *sock = NULL;
	int result = 0;
	int one = 1;
	int addr_len;

	if (dlm_local_addr[0]->ss_family == AF_INET)
		addr_len = sizeof(struct sockaddr_in);
	else
		addr_len = sizeof(struct sockaddr_in6);

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_TCP, &sock);
	if (result < 0) {
		log_print("Can't create listening comms socket");
		goto create_out;
	}

	/* Turn off Nagle's algorithm */
	kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
			  sizeof(one));

	result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
				   (char *)&one, sizeof(one));

	if (result < 0) {
		log_print("Failed to set SO_REUSEADDR on socket: %d", result);
	}
	con->rx_action = tcp_accept_from_sock;
	con->connect_action = tcp_connect_to_sock;

	/* Bind to our port */
	make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
	result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
	if (result < 0) {
		log_print("Can't bind to port %d", dlm_config.ci_tcp_port);
		sock_release(sock);
		sock = NULL;
		con->sock = NULL;
		goto create_out;
	}
	result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
				 (char *)&one, sizeof(one));
	if (result < 0) {
		log_print("Set keepalive failed: %d", result);
	}

	result = sock->ops->listen(sock, 5);
	if (result < 0) {
		log_print("Can't listen on port %d", dlm_config.ci_tcp_port);
		sock_release(sock);
		sock = NULL;
		goto create_out;
	}

create_out:
	return sock;
}

/* Get local addresses */
static void init_local(void)
{
	struct sockaddr_storage sas, *addr;
	int i;

	dlm_local_count = 0;
	for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
		if (dlm_our_addr(&sas, i))
			break;

		addr = kmalloc(sizeof(*addr), GFP_NOFS);
		if (!addr)
			break;
		memcpy(addr, &sas, sizeof(*addr));
		dlm_local_addr[dlm_local_count++] = addr;
	}
}

/* Initialise SCTP socket and bind to all interfaces */
static int sctp_listen_for_all(void)
{
	struct socket *sock = NULL;
	int result = -EINVAL;
	struct connection *con = nodeid2con(0, GFP_NOFS);
	int bufsize = NEEDED_RMEM;
	int one = 1;

	if (!con)
		return -ENOMEM;

	log_print("Using SCTP for communications");

	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_SCTP, &sock);
	if (result < 0) {
		log_print("Can't create comms socket, check SCTP is loaded");
		goto out;
	}

	result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE,
				 (char *)&bufsize, sizeof(bufsize));
	if (result)
		log_print("Error increasing buffer space on socket %d", result);

	result = kernel_setsockopt(sock, SOL_SCTP, SCTP_NODELAY, (char *)&one,
				   sizeof(one));
	if (result < 0)
		log_print("Could not set SCTP NODELAY error %d", result);

	/* Init con struct */
	sock->sk->sk_user_data = con;
	con->sock = sock;
	con->sock->sk->sk_data_ready = lowcomms_data_ready;
	con->rx_action = sctp_accept_from_sock;
	con->connect_action = sctp_connect_to_sock;

	/* Bind to all addresses. */
	if (sctp_bind_addrs(con, dlm_config.ci_tcp_port))
		goto create_delsock;

	result = sock->ops->listen(sock, 5);
	if (result < 0) {
		log_print("Can't set socket listening");
		goto create_delsock;
	}

	return 0;

create_delsock:
	sock_release(sock);
	con->sock = NULL;
out:
	return result;
}

static int tcp_listen_for_all(void)
{
	struct socket *sock = NULL;
	struct connection *con = nodeid2con(0, GFP_NOFS);
	int result = -EINVAL;

	if (!con)
		return -ENOMEM;

	/* We don't support multi-homed hosts */
	if (dlm_local_addr[1] != NULL) {
		log_print("TCP protocol can't handle multi-homed hosts, "
			  "try SCTP");
		return -EINVAL;
	}

	log_print("Using TCP for communications");

	sock = tcp_create_listen_sock(con, dlm_local_addr[0]);
	if (sock) {
		add_sock(sock, con);
		result = 0;
	} else {
		result = -EADDRINUSE;
	}

	return result;
}

static struct writequeue_entry *new_writequeue_entry(struct connection *con,
						     gfp_t allocation)
{
	struct writequeue_entry *entry;

	entry = kmalloc(sizeof(struct writequeue_entry), allocation);
	if (!entry)
		return NULL;

	entry->page = alloc_page(allocation);
	if (!entry->page) {
		kfree(entry);
		return NULL;
	}

	entry->offset = 0;
	entry->len = 0;
	entry->end = 0;
	entry->users = 0;
	entry->con = con;

	return entry;
}

void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
{
	struct connection *con;
	struct writequeue_entry *e;
	int offset = 0;

	con = nodeid2con(nodeid, allocation);
	if (!con)
		return NULL;

	spin_lock(&con->writequeue_lock);
	e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
	if ((&e->list == &con->writequeue) ||
	    (PAGE_CACHE_SIZE - e->end < len)) {
		e = NULL;
	} else {
		offset = e->end;
		e->end += len;
		e->users++;
	}
	spin_unlock(&con->writequeue_lock);

	if (e) {
	got_one:
		*ppc = page_address(e->page) + offset;
		return e;
	}

	e = new_writequeue_entry(con, allocation);
	if (e) {
		spin_lock(&con->writequeue_lock);
		offset = e->end;
		e->end += len;
		e->users++;
		list_add_tail(&e->list, &con->writequeue);
		spin_unlock(&con->writequeue_lock);
		goto got_one;
	}
	return NULL;
}

void dlm_lowcomms_commit_buffer(void *mh)
{
	struct writequeue_entry *e = (struct writequeue_entry *)mh;
	struct connection *con = e->con;
	int users;

	spin_lock(&con->writequeue_lock);
	users = --e->users;
	if (users)
		goto out;
	e->len = e->end - e->offset;
	spin_unlock(&con->writequeue_lock);

	if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
		queue_work(send_workqueue, &con->swork);
	}
	return;

out:
	spin_unlock(&con->writequeue_lock);
	return;
}
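
/*
 * Sketch of the intended calling pattern for the two functions above
 * (illustration only, not lifted from the real callers elsewhere in
 * dlm):
 *
 *	char *p;
 *	void *mh = dlm_lowcomms_get_buffer(nodeid, len, GFP_NOFS, &p);
 *	if (!mh)
 *		return -ENOMEM;
 *	memcpy(p, msg, len);		(fill the reserved space)
 *	dlm_lowcomms_commit_buffer(mh);
 *
 * The e->users count lets several such sequences share one page before
 * anything is pushed down the socket.
 */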

/* Send a message */
static void send_to_sock(struct connection *con)
{
	int ret = 0;
	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	struct writequeue_entry *e;
	int len, offset;
	int count = 0;

	mutex_lock(&con->sock_mutex);
	if (con->sock == NULL)
		goto out_connect;

	spin_lock(&con->writequeue_lock);
	for (;;) {
		e = list_entry(con->writequeue.next, struct writequeue_entry,
			       list);
		if ((struct list_head *) e == &con->writequeue)
			break;

		len = e->len;
		offset = e->offset;
		BUG_ON(len == 0 && e->users == 0);
		spin_unlock(&con->writequeue_lock);

		ret = 0;
		if (len) {
			ret = kernel_sendpage(con->sock, e->page, offset, len,
					      msg_flags);
			if (ret == -EAGAIN || ret == 0) {
				if (ret == -EAGAIN &&
				    test_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags) &&
				    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
					/* Notify TCP that we're limited by the
					 * application window size.
					 */
					set_bit(SOCK_NOSPACE, &con->sock->flags);
					con->sock->sk->sk_write_pending++;
				}
				cond_resched();
				goto out;
			} else if (ret < 0)
				goto send_error;
		}

		/* Don't starve people filling buffers */
		if (++count >= MAX_SEND_MSG_COUNT) {
			cond_resched();
			count = 0;
		}

		spin_lock(&con->writequeue_lock);
		writequeue_entry_complete(e, ret);
	}
	spin_unlock(&con->writequeue_lock);
out:
	mutex_unlock(&con->sock_mutex);
	return;

send_error:
	mutex_unlock(&con->sock_mutex);
	close_connection(con, false, false, true);
	lowcomms_connect_sock(con);
	return;

out_connect:
	mutex_unlock(&con->sock_mutex);
	lowcomms_connect_sock(con);
}

static void clean_one_writequeue(struct connection *con)
{
	struct writequeue_entry *e, *safe;

	spin_lock(&con->writequeue_lock);
	list_for_each_entry_safe(e, safe, &con->writequeue, list) {
		list_del(&e->list);
		free_entry(e);
	}
	spin_unlock(&con->writequeue_lock);
}

/* Called from recovery when it knows that a node has
   left the cluster */
int dlm_lowcomms_close(int nodeid)
{
	struct connection *con;
	struct dlm_node_addr *na;

	log_print("closing connection to node %d", nodeid);
	con = nodeid2con(nodeid, 0);
	if (con) {
		set_bit(CF_CLOSE, &con->flags);
		close_connection(con, true, true, true);
		clean_one_writequeue(con);
	}

	spin_lock(&dlm_node_addrs_spin);
	na = find_node_addr(nodeid);
	if (na) {
		list_del(&na->list);
		while (na->addr_count--)
			kfree(na->addr[na->addr_count]);
		kfree(na);
	}
	spin_unlock(&dlm_node_addrs_spin);

	return 0;
}

/* Receive workqueue function */
static void process_recv_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, rwork);
	int err;

	clear_bit(CF_READ_PENDING, &con->flags);
	do {
		err = con->rx_action(con);
	} while (!err);
}

/* Send workqueue function */
static void process_send_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, swork);

	if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags))
		con->connect_action(con);
	if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags))
		send_to_sock(con);
}

/* Discard all entries on the write queues */
static void clean_writequeues(void)
{
	foreach_conn(clean_one_writequeue);
}

static void work_stop(void)
{
	destroy_workqueue(recv_workqueue);
	destroy_workqueue(send_workqueue);
}

static int work_start(void)
{
	recv_workqueue = alloc_workqueue("dlm_recv",
					 WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
	if (!recv_workqueue) {
		log_print("can't start dlm_recv");
		return -ENOMEM;
	}

	send_workqueue = alloc_workqueue("dlm_send",
					 WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
	if (!send_workqueue) {
		log_print("can't start dlm_send");
		destroy_workqueue(recv_workqueue);
		return -ENOMEM;
	}

	return 0;
}

static void stop_conn(struct connection *con)
{
	con->flags |= 0x0F;
	if (con->sock && con->sock->sk)
		con->sock->sk->sk_user_data = NULL;
}

static void free_conn(struct connection *con)
{
	close_connection(con, true, true, true);
	if (con->othercon)
		kmem_cache_free(con_cache, con->othercon);
	hlist_del(&con->list);
	kmem_cache_free(con_cache, con);
}

void dlm_lowcomms_stop(void)
{
	/* Set all the flags to prevent any
	   socket activity.
	*/
	mutex_lock(&connections_lock);
	dlm_allow_conn = 0;
	foreach_conn(stop_conn);
	mutex_unlock(&connections_lock);

	work_stop();

	mutex_lock(&connections_lock);
	clean_writequeues();

	foreach_conn(free_conn);

	mutex_unlock(&connections_lock);
	kmem_cache_destroy(con_cache);
}

int dlm_lowcomms_start(void)
{
	int error = -EINVAL;
	struct connection *con;
	int i;

	for (i = 0; i < CONN_HASH_SIZE; i++)
		INIT_HLIST_HEAD(&connection_hash[i]);

	init_local();
	if (!dlm_local_count) {
		error = -ENOTCONN;
		log_print("no local IP address has been set");
		goto fail;
	}

	error = -ENOMEM;
	con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
				      __alignof__(struct connection), 0,
				      NULL);
	if (!con_cache)
		goto fail;

	error = work_start();
	if (error)
		goto fail_destroy;

	dlm_allow_conn = 1;

	/* Start listening */
	if (dlm_config.ci_protocol == 0)
		error = tcp_listen_for_all();
	else
		error = sctp_listen_for_all();
	if (error)
		goto fail_unlisten;

	return 0;

fail_unlisten:
	dlm_allow_conn = 0;
	con = nodeid2con(0, 0);
	if (con) {
		close_connection(con, false, true, true);
		kmem_cache_free(con_cache, con);
	}
fail_destroy:
	kmem_cache_destroy(con_cache);
fail:
	return error;
}
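
/*
 * Rough lifecycle as seen from the rest of dlm (order inferred from the
 * functions in this file, not a verbatim caller):
 *
 *	dlm_lowcomms_addr(nodeid, &addr, len);	(once per node address)
 *	dlm_lowcomms_start();			(bind and listen)
 *	dlm_lowcomms_connect_node(nodeid);	(optional eager connect)
 *	...
 *	dlm_lowcomms_close(nodeid);		(node left the cluster)
 *	dlm_lowcomms_stop();			(full teardown)
 */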

void dlm_lowcomms_exit(void)
{
	struct dlm_node_addr *na, *safe;

	spin_lock(&dlm_node_addrs_spin);
	list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) {
		list_del(&na->list);
		while (na->addr_count--)
			kfree(na->addr[na->addr_count]);
		kfree(na);
	}
	spin_unlock(&dlm_node_addrs_spin);
}