xref: /openbmc/linux/fs/dlm/lowcomms.c (revision 0d456bad)
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

/*
 * lowcomms.c
 *
 * This is the "low-level" comms layer.
 *
 * It is responsible for sending/receiving messages
 * from other nodes in the cluster.
 *
 * Cluster nodes are referred to by their nodeids. nodeids are
 * simply 32 bit numbers to the locking module - if they need to
 * be expanded for the cluster infrastructure then that is its
 * responsibility. It is this layer's
 * responsibility to resolve these into IP addresses or
 * whatever it needs for inter-node communication.
 *
 * The comms level is two kernel workqueues that deal mainly with
 * the receiving of messages from other nodes and passing them
 * up to the mid-level comms layer (which understands the
 * message format) for execution by the locking core, and
 * a send workqueue which does all the setting up of connections
 * to remote nodes and the sending of data. Threads are not allowed
 * to send their own data because it may cause them to wait in times
 * of high load. Also, this way, the send workqueue can collect
 * together messages bound for one node and send them in one block.
 *
 * lowcomms will choose to use either TCP or SCTP as its transport layer
 * depending on the configuration variable 'protocol'. This should be set
 * to 0 (default) for TCP or 1 for SCTP. It must be configured using a
 * cluster-wide mechanism as it must be the same on all nodes of the cluster
 * for the DLM to function.
 *
 */

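/*
 * For example, with the standard dlm configfs layout (the exact path is an
 * assumption here, defined in config.c rather than in this file), userspace
 * cluster tooling selects the protocol cluster-wide with something like:
 *
 *	echo 0 > /sys/kernel/config/dlm/cluster/protocol	(TCP)
 *	echo 1 > /sys/kernel/config/dlm/cluster/protocol	(SCTP)
 *
 * which this file reads back as dlm_config.ci_protocol.
 */
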
#include <asm/ioctls.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <linux/pagemap.h>
#include <linux/file.h>
#include <linux/mutex.h>
#include <linux/sctp.h>
#include <linux/slab.h>
#include <net/sctp/sctp.h>
#include <net/sctp/user.h>
#include <net/ipv6.h>

#include "dlm_internal.h"
#include "lowcomms.h"
#include "midcomms.h"
#include "config.h"

#define NEEDED_RMEM (4*1024*1024)
#define CONN_HASH_SIZE 32

/* Number of messages to send before rescheduling */
#define MAX_SEND_MSG_COUNT 25

struct cbuf {
	unsigned int base;
	unsigned int len;
	unsigned int mask;
};

static void cbuf_add(struct cbuf *cb, int n)
{
	cb->len += n;
}

static int cbuf_data(struct cbuf *cb)
{
	return ((cb->base + cb->len) & cb->mask);
}

static void cbuf_init(struct cbuf *cb, int size)
{
	cb->base = cb->len = 0;
	cb->mask = size-1;
}

static void cbuf_eat(struct cbuf *cb, int n)
{
	cb->len  -= n;
	cb->base += n;
	cb->base &= cb->mask;
}

static bool cbuf_empty(struct cbuf *cb)
{
	return cb->len == 0;
}

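/*
 * A minimal worked example of the circular buffer helpers above, assuming
 * a 16-byte buffer (sizes must be a power of two for the mask to work):
 *
 *	struct cbuf cb;
 *
 *	cbuf_init(&cb, 16);	// base = 0, len = 0, mask = 15
 *	cbuf_add(&cb, 12);	// 12 bytes received: valid data is [0..12)
 *	cbuf_eat(&cb, 8);	// 8 bytes consumed:  base = 8, len = 4
 *	cbuf_add(&cb, 8);	// len = 12, so cbuf_data(&cb) = (8+12) & 15 = 4,
 *				// i.e. the write position has wrapped around
 */
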
struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* So we know who we are in the list */
	struct mutex sock_mutex;
	unsigned long flags;
#define CF_READ_PENDING 1
#define CF_WRITE_PENDING 2
#define CF_CONNECT_PENDING 3
#define CF_INIT_PENDING 4
#define CF_IS_OTHERCON 5
#define CF_CLOSE 6
#define CF_APP_LIMITED 7
	struct list_head writequeue;  /* List of outgoing writequeue_entries */
	spinlock_t writequeue_lock;
	int (*rx_action) (struct connection *);	/* What to do when active */
	void (*connect_action) (struct connection *);	/* What to do to connect */
	struct page *rx_page;
	struct cbuf cb;
	int retries;
#define MAX_CONNECT_RETRIES 3
	int sctp_assoc;
	struct hlist_node list;
	struct connection *othercon;
	struct work_struct rwork; /* Receive workqueue */
	struct work_struct swork; /* Send workqueue */
};
#define sock2con(x) ((struct connection *)(x)->sk_user_data)

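/*
 * Sketch of the usual flag flow for one outgoing message, pieced together
 * from the helpers later in this file:
 *
 *	dlm_lowcomms_commit_buffer(mh)
 *	  -> test_and_set_bit(CF_WRITE_PENDING, &con->flags)
 *	  -> queue_work(send_workqueue, &con->swork)
 *	process_send_sockets(&con->swork)
 *	  -> connect_action(con), if CF_CONNECT_PENDING was set
 *	  -> send_to_sock(con), which drains con->writequeue
 *
 * The test_and_set/test_and_clear pairs ensure each work item is queued
 * at most once per pending event.
 */
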
/* An entry waiting to be sent */
struct writequeue_entry {
	struct list_head list;
	struct page *page;
	int offset;
	int len;
	int end;
	int users;
	struct connection *con;
};

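/*
 * Layout of a writequeue_entry's page (a sketch of the invariants kept by
 * dlm_lowcomms_get_buffer(), dlm_lowcomms_commit_buffer() and send_to_sock()
 * below):
 *
 *	0        offset        offset+len          end        PAGE_SIZE
 *	|  sent  |  committed,  |  handed out but   |  free    |
 *	|        |  unsent      |  not committed    |          |
 *
 * 'end' is the allocation high-water mark and 'users' counts buffers handed
 * out by get_buffer but not yet committed; 'len' is only recomputed (as
 * end - offset) once the last user commits.
 */
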
struct dlm_node_addr {
	struct list_head list;
	int nodeid;
	int addr_count;
	struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
};

static LIST_HEAD(dlm_node_addrs);
static DEFINE_SPINLOCK(dlm_node_addrs_spin);

static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;
static int dlm_allow_conn;

/* Work queues */
static struct workqueue_struct *recv_workqueue;
static struct workqueue_struct *send_workqueue;

static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_MUTEX(connections_lock);
static struct kmem_cache *con_cache;

static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);


/* This is deliberately very simple because most clusters have simple
   sequential nodeids, so we should be able to go straight to a connection
   struct in the array */
static inline int nodeid_hash(int nodeid)
{
	return nodeid & (CONN_HASH_SIZE-1);
}

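/*
 * For example, with CONN_HASH_SIZE = 32, nodeids 1..31 each land in their
 * own bucket (nodeid & 31), so lookup is effectively an array index; only
 * larger or sparse nodeid spaces start chaining, e.g. nodeid 33 shares
 * bucket 1 with nodeid 1, and nodeid 32 shares bucket 0 with the listening
 * connection (nodeid 0).
 */
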
static struct connection *__find_con(int nodeid)
{
	int r;
	struct hlist_node *h;
	struct connection *con;

	r = nodeid_hash(nodeid);

	hlist_for_each_entry(con, h, &connection_hash[r], list) {
		if (con->nodeid == nodeid)
			return con;
	}
	return NULL;
}

/*
 * If 'alloc' is zero then we don't attempt to create a new
 * connection structure for this node.
 */
static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
{
	struct connection *con = NULL;
	int r;

	con = __find_con(nodeid);
	if (con || !alloc)
		return con;

	con = kmem_cache_zalloc(con_cache, alloc);
	if (!con)
		return NULL;

	r = nodeid_hash(nodeid);
	hlist_add_head(&con->list, &connection_hash[r]);

	con->nodeid = nodeid;
	mutex_init(&con->sock_mutex);
	INIT_LIST_HEAD(&con->writequeue);
	spin_lock_init(&con->writequeue_lock);
	INIT_WORK(&con->swork, process_send_sockets);
	INIT_WORK(&con->rwork, process_recv_sockets);

	/* Setup action pointers for child sockets */
	if (con->nodeid) {
		struct connection *zerocon = __find_con(0);

		con->connect_action = zerocon->connect_action;
		if (!con->rx_action)
			con->rx_action = zerocon->rx_action;
	}

	return con;
}

/* Loop round all connections */
static void foreach_conn(void (*conn_func)(struct connection *c))
{
	int i;
	struct hlist_node *h, *n;
	struct connection *con;

	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry_safe(con, h, n, &connection_hash[i], list) {
			conn_func(con);
		}
	}
}

static struct connection *nodeid2con(int nodeid, gfp_t allocation)
{
	struct connection *con;

	mutex_lock(&connections_lock);
	con = __nodeid2con(nodeid, allocation);
	mutex_unlock(&connections_lock);

	return con;
}

/* This is a bit drastic, but only called when things go wrong */
static struct connection *assoc2con(int assoc_id)
{
	int i;
	struct hlist_node *h;
	struct connection *con;

	mutex_lock(&connections_lock);

	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry(con, h, &connection_hash[i], list) {
			if (con->sctp_assoc == assoc_id) {
				mutex_unlock(&connections_lock);
				return con;
			}
		}
	}
	mutex_unlock(&connections_lock);
	return NULL;
}

static struct dlm_node_addr *find_node_addr(int nodeid)
{
	struct dlm_node_addr *na;

	list_for_each_entry(na, &dlm_node_addrs, list) {
		if (na->nodeid == nodeid)
			return na;
	}
	return NULL;
}

static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y)
{
	switch (x->ss_family) {
	case AF_INET: {
		struct sockaddr_in *sinx = (struct sockaddr_in *)x;
		struct sockaddr_in *siny = (struct sockaddr_in *)y;
		if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
			return 0;
		if (sinx->sin_port != siny->sin_port)
			return 0;
		break;
	}
	case AF_INET6: {
		struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
		struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
		if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
			return 0;
		if (sinx->sin6_port != siny->sin6_port)
			return 0;
		break;
	}
	default:
		return 0;
	}
	return 1;
}

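/*
 * addr_compare() matches on both address and port, comparing ports in
 * network byte order on both sides so no swapping is needed.  A sketch
 * with illustrative values (10.0.0.1 and the default DLM port):
 *
 *	struct sockaddr_in a = { .sin_family = AF_INET };
 *	struct sockaddr_in b = { .sin_family = AF_INET };
 *
 *	a.sin_addr.s_addr = b.sin_addr.s_addr = htonl(0x0a000001);
 *	a.sin_port = b.sin_port = htons(21064);
 *	// addr_compare((struct sockaddr_storage *)&a,
 *	//		(struct sockaddr_storage *)&b) == 1
 */
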
static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
			  struct sockaddr *sa_out)
{
	struct sockaddr_storage sas;
	struct dlm_node_addr *na;

	if (!dlm_local_count)
		return -1;

	spin_lock(&dlm_node_addrs_spin);
	na = find_node_addr(nodeid);
	if (na && na->addr_count)
		memcpy(&sas, na->addr[0], sizeof(struct sockaddr_storage));
	spin_unlock(&dlm_node_addrs_spin);

	if (!na)
		return -EEXIST;

	if (!na->addr_count)
		return -ENOENT;

	if (sas_out)
		memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));

	if (!sa_out)
		return 0;

	if (dlm_local_addr[0]->ss_family == AF_INET) {
		struct sockaddr_in *in4  = (struct sockaddr_in *) &sas;
		struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
		ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
	} else {
		struct sockaddr_in6 *in6  = (struct sockaddr_in6 *) &sas;
		struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
		ret6->sin6_addr = in6->sin6_addr;
	}

	return 0;
}

static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid)
{
	struct dlm_node_addr *na;
	int rv = -EEXIST;

	spin_lock(&dlm_node_addrs_spin);
	list_for_each_entry(na, &dlm_node_addrs, list) {
		if (!na->addr_count)
			continue;

		if (!addr_compare(na->addr[0], addr))
			continue;

		*nodeid = na->nodeid;
		rv = 0;
		break;
	}
	spin_unlock(&dlm_node_addrs_spin);
	return rv;
}

int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
{
	struct sockaddr_storage *new_addr;
	struct dlm_node_addr *new_node, *na;

	new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS);
	if (!new_node)
		return -ENOMEM;

	new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS);
	if (!new_addr) {
		kfree(new_node);
		return -ENOMEM;
	}

	memcpy(new_addr, addr, len);

	spin_lock(&dlm_node_addrs_spin);
	na = find_node_addr(nodeid);
	if (!na) {
		new_node->nodeid = nodeid;
		new_node->addr[0] = new_addr;
		new_node->addr_count = 1;
		list_add(&new_node->list, &dlm_node_addrs);
		spin_unlock(&dlm_node_addrs_spin);
		return 0;
	}

	if (na->addr_count >= DLM_MAX_ADDR_COUNT) {
		spin_unlock(&dlm_node_addrs_spin);
		kfree(new_addr);
		kfree(new_node);
		return -ENOSPC;
	}

	na->addr[na->addr_count++] = new_addr;
	spin_unlock(&dlm_node_addrs_spin);
	kfree(new_node);
	return 0;
}

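/*
 * dlm_lowcomms_addr() is how the configuration layer tells lowcomms which
 * addresses belong to which nodeid, before any connection is attempted.
 * A minimal sketch of a caller (the nodeid and address are illustrative):
 *
 *	struct sockaddr_in sin = {
 *		.sin_family = AF_INET,
 *		.sin_addr.s_addr = htonl(0x0a000002),	// 10.0.0.2
 *	};
 *	int error;
 *
 *	error = dlm_lowcomms_addr(2, (struct sockaddr_storage *)&sin,
 *				  sizeof(sin));
 *
 * Repeated calls for the same nodeid append further addresses (used for
 * SCTP multi-homing), up to DLM_MAX_ADDR_COUNT.
 */
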
/* Data available on socket or listen socket received a connect */
static void lowcomms_data_ready(struct sock *sk, int count_unused)
{
	struct connection *con = sock2con(sk);
	if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
		queue_work(recv_workqueue, &con->rwork);
}

static void lowcomms_write_space(struct sock *sk)
{
	struct connection *con = sock2con(sk);

	if (!con)
		return;

	clear_bit(SOCK_NOSPACE, &con->sock->flags);

	if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
		con->sock->sk->sk_write_pending--;
		clear_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags);
	}

	if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
		queue_work(send_workqueue, &con->swork);
}

static inline void lowcomms_connect_sock(struct connection *con)
{
	if (test_bit(CF_CLOSE, &con->flags))
		return;
	if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
		queue_work(send_workqueue, &con->swork);
}

static void lowcomms_state_change(struct sock *sk)
{
	if (sk->sk_state == TCP_ESTABLISHED)
		lowcomms_write_space(sk);
}

int dlm_lowcomms_connect_node(int nodeid)
{
	struct connection *con;

	/* with sctp there's no connecting without sending */
	if (dlm_config.ci_protocol != 0)
		return 0;

	if (nodeid == dlm_our_nodeid())
		return 0;

	con = nodeid2con(nodeid, GFP_NOFS);
	if (!con)
		return -ENOMEM;
	lowcomms_connect_sock(con);
	return 0;
}

/* Make a socket active */
static void add_sock(struct socket *sock, struct connection *con)
{
	con->sock = sock;

	/* Install a data_ready callback */
	con->sock->sk->sk_data_ready = lowcomms_data_ready;
	con->sock->sk->sk_write_space = lowcomms_write_space;
	con->sock->sk->sk_state_change = lowcomms_state_change;
	con->sock->sk->sk_user_data = con;
	con->sock->sk->sk_allocation = GFP_NOFS;
}

/* Add the port number to an IPv6 or IPv4 sockaddr and return the address
   length */
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
			  int *addr_len)
{
	saddr->ss_family = dlm_local_addr[0]->ss_family;
	if (saddr->ss_family == AF_INET) {
		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
		in4_addr->sin_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in);
		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
	} else {
		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
		in6_addr->sin6_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in6);
	}
	memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
}

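/*
 * Example (a sketch, assuming an IPv4-configured cluster): filling in the
 * DLM port gives a sockaddr ready for bind() or connect():
 *
 *	struct sockaddr_storage ss;	// address part already set by caller
 *	int addr_len;
 *
 *	make_sockaddr(&ss, dlm_config.ci_tcp_port, &addr_len);
 *	// ss.ss_family == AF_INET, sin_port == htons(21064) by default,
 *	// addr_len == sizeof(struct sockaddr_in), trailing bytes zeroed
 */
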
/* Close a remote connection and tidy up */
static void close_connection(struct connection *con, bool and_other)
{
	mutex_lock(&con->sock_mutex);

	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	}
	if (con->othercon && and_other) {
		/* Will only re-enter once. */
		close_connection(con->othercon, false);
	}
	if (con->rx_page) {
		__free_page(con->rx_page);
		con->rx_page = NULL;
	}

	con->retries = 0;
	mutex_unlock(&con->sock_mutex);
}

/* We only send shutdown messages to nodes that are not part of the cluster */
static void sctp_send_shutdown(sctp_assoc_t associd)
{
	static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
	struct msghdr outmessage;
	struct cmsghdr *cmsg;
	struct sctp_sndrcvinfo *sinfo;
	int ret;
	struct connection *con;

	con = nodeid2con(0, 0);
	BUG_ON(con == NULL);

	outmessage.msg_name = NULL;
	outmessage.msg_namelen = 0;
	outmessage.msg_control = outcmsg;
	outmessage.msg_controllen = sizeof(outcmsg);
	outmessage.msg_flags = MSG_EOR;

	cmsg = CMSG_FIRSTHDR(&outmessage);
	cmsg->cmsg_level = IPPROTO_SCTP;
	cmsg->cmsg_type = SCTP_SNDRCV;
	cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
	outmessage.msg_controllen = cmsg->cmsg_len;
	sinfo = CMSG_DATA(cmsg);
	memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));

	sinfo->sinfo_flags |= MSG_EOF;
	sinfo->sinfo_assoc_id = associd;

	ret = kernel_sendmsg(con->sock, &outmessage, NULL, 0, 0);

	if (ret != 0)
		log_print("send EOF to node failed: %d", ret);
}

static void sctp_init_failed_foreach(struct connection *con)
{
	con->sctp_assoc = 0;
	if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
		if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
			queue_work(send_workqueue, &con->swork);
	}
}

/* INIT failed but we don't know which node...
   restart INIT on all pending nodes */
static void sctp_init_failed(void)
{
	mutex_lock(&connections_lock);

	foreach_conn(sctp_init_failed_foreach);

	mutex_unlock(&connections_lock);
}

/* Something happened to an association */
static void process_sctp_notification(struct connection *con,
				      struct msghdr *msg, char *buf)
{
	union sctp_notification *sn = (union sctp_notification *)buf;

	if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) {
		switch (sn->sn_assoc_change.sac_state) {

		case SCTP_COMM_UP:
		case SCTP_RESTART:
		{
			/* Check that the new node is in the lockspace */
			struct sctp_prim prim;
			int nodeid;
			int prim_len, ret;
			int addr_len;
			struct connection *new_con;

			/*
			 * We get this before any data for an association.
			 * We verify that the node is in the cluster and
			 * then peel off a socket for it.
			 */
			if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {
				log_print("COMM_UP for invalid assoc ID %d",
					 (int)sn->sn_assoc_change.sac_assoc_id);
				sctp_init_failed();
				return;
			}
			memset(&prim, 0, sizeof(struct sctp_prim));
			prim_len = sizeof(struct sctp_prim);
			prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id;

			ret = kernel_getsockopt(con->sock,
						IPPROTO_SCTP,
						SCTP_PRIMARY_ADDR,
						(char *)&prim,
						&prim_len);
			if (ret < 0) {
				log_print("getsockopt/sctp_primary_addr on "
					  "new assoc %d failed : %d",
					  (int)sn->sn_assoc_change.sac_assoc_id,
					  ret);

				/* Retry INIT later */
				new_con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
				if (new_con)
					clear_bit(CF_CONNECT_PENDING, &new_con->flags);
				return;
			}
			make_sockaddr(&prim.ssp_addr, 0, &addr_len);
			if (addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
				unsigned char *b = (unsigned char *)&prim.ssp_addr;
				log_print("reject connect from unknown addr");
				print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
						     b, sizeof(struct sockaddr_storage));
				sctp_send_shutdown(prim.ssp_assoc_id);
				return;
			}

			new_con = nodeid2con(nodeid, GFP_NOFS);
			if (!new_con)
				return;

			/* Peel off a new sock */
			sctp_lock_sock(con->sock->sk);
			ret = sctp_do_peeloff(con->sock->sk,
				sn->sn_assoc_change.sac_assoc_id,
				&new_con->sock);
			sctp_release_sock(con->sock->sk);
			if (ret < 0) {
				log_print("Can't peel off a socket for "
					  "connection %d to node %d: err=%d",
					  (int)sn->sn_assoc_change.sac_assoc_id,
					  nodeid, ret);
				return;
			}
			add_sock(new_con->sock, new_con);

			log_print("connecting to %d sctp association %d",
				 nodeid, (int)sn->sn_assoc_change.sac_assoc_id);

			/* Send any pending writes */
			clear_bit(CF_CONNECT_PENDING, &new_con->flags);
			clear_bit(CF_INIT_PENDING, &new_con->flags);
			if (!test_and_set_bit(CF_WRITE_PENDING, &new_con->flags)) {
				queue_work(send_workqueue, &new_con->swork);
			}
			if (!test_and_set_bit(CF_READ_PENDING, &new_con->flags))
				queue_work(recv_workqueue, &new_con->rwork);
		}
		break;

		case SCTP_COMM_LOST:
		case SCTP_SHUTDOWN_COMP:
		{
			con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
			if (con) {
				con->sctp_assoc = 0;
			}
		}
		break;

		/* We don't know which INIT failed, so clear the PENDING flags
		 * on them all.  If assoc_id is zero then it will then try
		 * again. */

		case SCTP_CANT_STR_ASSOC:
		{
			log_print("Can't start SCTP association - retrying");
			sctp_init_failed();
		}
		break;

		default:
			log_print("unexpected SCTP assoc change id=%d state=%d",
				  (int)sn->sn_assoc_change.sac_assoc_id,
				  sn->sn_assoc_change.sac_state);
		}
	}
}

/* Data received from remote end */
static int receive_from_sock(struct connection *con)
{
	int ret = 0;
	struct msghdr msg = {};
	struct kvec iov[2];
	unsigned len;
	int r;
	int call_again_soon = 0;
	int nvec;
	char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];

	mutex_lock(&con->sock_mutex);

	if (con->sock == NULL) {
		ret = -EAGAIN;
		goto out_close;
	}

	if (con->rx_page == NULL) {
		/*
		 * This doesn't need to be atomic, but I think it should
		 * improve performance if it is.
		 */
		con->rx_page = alloc_page(GFP_ATOMIC);
		if (con->rx_page == NULL)
			goto out_resched;
		cbuf_init(&con->cb, PAGE_CACHE_SIZE);
	}

	/* Only SCTP needs these really */
	memset(&incmsg, 0, sizeof(incmsg));
	msg.msg_control = incmsg;
	msg.msg_controllen = sizeof(incmsg);

	/*
	 * iov[0] is the bit of the circular buffer between the current end
	 * point (cb.base + cb.len) and the end of the buffer.
	 */
	iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
	iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
	iov[1].iov_len = 0;
	nvec = 1;

	/*
	 * iov[1] is the bit of the circular buffer between the start of the
	 * buffer and the start of the currently used section (cb.base)
	 */
	if (cbuf_data(&con->cb) >= con->cb.base) {
		iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb);
		iov[1].iov_len = con->cb.base;
		iov[1].iov_base = page_address(con->rx_page);
		nvec = 2;
	}
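	/*
	 * For example, with PAGE_CACHE_SIZE = 4096, base = 100 and len = 50
	 * (so cbuf_data() = 150, no wrap yet): iov[0] covers [150, 4096) and
	 * iov[1] covers [0, 100), leaving only the 50 unconsumed bytes at
	 * [100, 150) untouched.
	 */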
	len = iov[0].iov_len + iov[1].iov_len;

	r = ret = kernel_recvmsg(con->sock, &msg, iov, nvec, len,
			       MSG_DONTWAIT | MSG_NOSIGNAL);
	if (ret <= 0)
		goto out_close;

	/* Process SCTP notifications */
	if (msg.msg_flags & MSG_NOTIFICATION) {
		msg.msg_control = incmsg;
		msg.msg_controllen = sizeof(incmsg);

		process_sctp_notification(con, &msg,
				page_address(con->rx_page) + con->cb.base);
		mutex_unlock(&con->sock_mutex);
		return 0;
	}
	BUG_ON(con->nodeid == 0);

	if (ret == len)
		call_again_soon = 1;
	cbuf_add(&con->cb, ret);
	ret = dlm_process_incoming_buffer(con->nodeid,
					  page_address(con->rx_page),
					  con->cb.base, con->cb.len,
					  PAGE_CACHE_SIZE);
	if (ret == -EBADMSG) {
		log_print("lowcomms: addr=%p, base=%u, len=%u, "
			  "iov_len=%u, iov_base[0]=%p, read=%d",
			  page_address(con->rx_page), con->cb.base, con->cb.len,
			  len, iov[0].iov_base, r);
	}
	if (ret < 0)
		goto out_close;
	cbuf_eat(&con->cb, ret);

	if (cbuf_empty(&con->cb) && !call_again_soon) {
		__free_page(con->rx_page);
		con->rx_page = NULL;
	}

	if (call_again_soon)
		goto out_resched;
	mutex_unlock(&con->sock_mutex);
	return 0;

out_resched:
	if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
		queue_work(recv_workqueue, &con->rwork);
	mutex_unlock(&con->sock_mutex);
	return -EAGAIN;

out_close:
	mutex_unlock(&con->sock_mutex);
	if (ret != -EAGAIN) {
		close_connection(con, false);
		/* Reconnect when there is something to send */
	}
	/* Don't return success if we really got EOF */
	if (ret == 0)
		ret = -EAGAIN;

	return ret;
}

/* Listening socket is busy, accept a connection */
static int tcp_accept_from_sock(struct connection *con)
{
	int result;
	struct sockaddr_storage peeraddr;
	struct socket *newsock;
	int len;
	int nodeid;
	struct connection *newcon;
	struct connection *addcon;

	mutex_lock(&connections_lock);
	if (!dlm_allow_conn) {
		mutex_unlock(&connections_lock);
		return -1;
	}
	mutex_unlock(&connections_lock);

	memset(&peeraddr, 0, sizeof(peeraddr));
	result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
				  IPPROTO_TCP, &newsock);
	if (result < 0)
		return -ENOMEM;

	mutex_lock_nested(&con->sock_mutex, 0);

	result = -ENOTCONN;
	if (con->sock == NULL)
		goto accept_err;

	newsock->type = con->sock->type;
	newsock->ops = con->sock->ops;

	result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK);
	if (result < 0)
		goto accept_err;

	/* Get the connected socket's peer */
	memset(&peeraddr, 0, sizeof(peeraddr));
	if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr,
				  &len, 2)) {
		result = -ECONNABORTED;
		goto accept_err;
	}

	/* Get the new node's NODEID */
	make_sockaddr(&peeraddr, 0, &len);
	if (addr_to_nodeid(&peeraddr, &nodeid)) {
		unsigned char *b = (unsigned char *)&peeraddr;
		log_print("connect from non cluster node");
		print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
				     b, sizeof(struct sockaddr_storage));
		sock_release(newsock);
		mutex_unlock(&con->sock_mutex);
		return -1;
	}

	log_print("got connection from %d", nodeid);

	/*  Check to see if we already have a connection to this node. This
	 *  could happen if the two nodes initiate a connection at roughly
	 *  the same time and the connections cross on the wire.
	 *  In this case we store the incoming one in "othercon"
	 */
	newcon = nodeid2con(nodeid, GFP_NOFS);
	if (!newcon) {
		result = -ENOMEM;
		goto accept_err;
	}
	mutex_lock_nested(&newcon->sock_mutex, 1);
	if (newcon->sock) {
		struct connection *othercon = newcon->othercon;

		if (!othercon) {
			othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
			if (!othercon) {
				log_print("failed to allocate incoming socket");
				mutex_unlock(&newcon->sock_mutex);
				result = -ENOMEM;
				goto accept_err;
			}
			othercon->nodeid = nodeid;
			othercon->rx_action = receive_from_sock;
			mutex_init(&othercon->sock_mutex);
			INIT_WORK(&othercon->swork, process_send_sockets);
			INIT_WORK(&othercon->rwork, process_recv_sockets);
			set_bit(CF_IS_OTHERCON, &othercon->flags);
		}
		if (!othercon->sock) {
			newcon->othercon = othercon;
			othercon->sock = newsock;
			newsock->sk->sk_user_data = othercon;
			add_sock(newsock, othercon);
			addcon = othercon;
		} else {
			printk("Extra connection from node %d attempted\n", nodeid);
			result = -EAGAIN;
			mutex_unlock(&newcon->sock_mutex);
			goto accept_err;
		}
	} else {
		newsock->sk->sk_user_data = newcon;
		newcon->rx_action = receive_from_sock;
		add_sock(newsock, newcon);
		addcon = newcon;
	}

	mutex_unlock(&newcon->sock_mutex);

	/*
	 * Add it to the active queue in case we got data
	 * between processing the accept and adding the socket
	 * to the read_sockets list
	 */
	if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
		queue_work(recv_workqueue, &addcon->rwork);
	mutex_unlock(&con->sock_mutex);

	return 0;

accept_err:
	mutex_unlock(&con->sock_mutex);
	sock_release(newsock);

	if (result != -EAGAIN)
		log_print("error accepting connection from node: %d", result);
	return result;
}

static void free_entry(struct writequeue_entry *e)
{
	__free_page(e->page);
	kfree(e);
}

/* Initiate an SCTP association.
   This is a special case of send_to_sock() in that we don't yet have a
   peeled-off socket for this association, so we use the listening socket
   and add the primary IP address of the remote node.
 */
static void sctp_init_assoc(struct connection *con)
{
	struct sockaddr_storage rem_addr;
	char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
	struct msghdr outmessage;
	struct cmsghdr *cmsg;
	struct sctp_sndrcvinfo *sinfo;
	struct connection *base_con;
	struct writequeue_entry *e;
	int len, offset;
	int ret;
	int addrlen;
	struct kvec iov[1];

	if (test_and_set_bit(CF_INIT_PENDING, &con->flags))
		return;

	if (con->retries++ > MAX_CONNECT_RETRIES)
		return;

	if (nodeid_to_addr(con->nodeid, NULL, (struct sockaddr *)&rem_addr)) {
		log_print("no address for nodeid %d", con->nodeid);
		return;
	}
	base_con = nodeid2con(0, 0);
	BUG_ON(base_con == NULL);

	make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen);

	outmessage.msg_name = &rem_addr;
	outmessage.msg_namelen = addrlen;
	outmessage.msg_control = outcmsg;
	outmessage.msg_controllen = sizeof(outcmsg);
	outmessage.msg_flags = MSG_EOR;

	spin_lock(&con->writequeue_lock);

	if (list_empty(&con->writequeue)) {
		spin_unlock(&con->writequeue_lock);
		log_print("writequeue empty for nodeid %d", con->nodeid);
		return;
	}

	e = list_first_entry(&con->writequeue, struct writequeue_entry, list);
	len = e->len;
	offset = e->offset;
	spin_unlock(&con->writequeue_lock);

	/* Send the first block off the write queue */
	iov[0].iov_base = page_address(e->page) + offset;
	iov[0].iov_len = len;

	cmsg = CMSG_FIRSTHDR(&outmessage);
	cmsg->cmsg_level = IPPROTO_SCTP;
	cmsg->cmsg_type = SCTP_SNDRCV;
	cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
	sinfo = CMSG_DATA(cmsg);
	memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
	sinfo->sinfo_ppid = cpu_to_le32(dlm_our_nodeid());
	outmessage.msg_controllen = cmsg->cmsg_len;

	ret = kernel_sendmsg(base_con->sock, &outmessage, iov, 1, len);
	if (ret < 0) {
		log_print("Send first packet to node %d failed: %d",
			  con->nodeid, ret);

		/* Try again later */
		clear_bit(CF_CONNECT_PENDING, &con->flags);
		clear_bit(CF_INIT_PENDING, &con->flags);
	} else {
		spin_lock(&con->writequeue_lock);
		e->offset += ret;
		e->len -= ret;

		if (e->len == 0 && e->users == 0) {
			list_del(&e->list);
			free_entry(e);
		}
		spin_unlock(&con->writequeue_lock);
	}
}

/* Connect a new socket to its peer */
static void tcp_connect_to_sock(struct connection *con)
{
	struct sockaddr_storage saddr, src_addr;
	int addr_len;
	struct socket *sock = NULL;
	int one = 1;
	int result;

	if (con->nodeid == 0) {
		log_print("attempt to connect sock 0 foiled");
		return;
	}

	mutex_lock(&con->sock_mutex);
	if (con->retries++ > MAX_CONNECT_RETRIES)
		goto out;

	/* Some odd races can cause double-connects, ignore them */
	if (con->sock)
		goto out;

	/* Create a socket to communicate with */
	result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
				  IPPROTO_TCP, &sock);
	if (result < 0)
		goto out_err;

	memset(&saddr, 0, sizeof(saddr));
	result = nodeid_to_addr(con->nodeid, &saddr, NULL);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		goto out_err;
	}

	sock->sk->sk_user_data = con;
	con->rx_action = receive_from_sock;
	con->connect_action = tcp_connect_to_sock;
	add_sock(sock, con);

	/* Bind to our cluster-known address when connecting to avoid
	   routing problems */
	memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
	make_sockaddr(&src_addr, 0, &addr_len);
	result = sock->ops->bind(sock, (struct sockaddr *) &src_addr,
				 addr_len);
	if (result < 0) {
		log_print("could not bind for connect: %d", result);
		/* This *may* not indicate a critical error */
	}

	make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);

	log_print("connecting to %d", con->nodeid);

	/* Turn off Nagle's algorithm */
	kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
			  sizeof(one));

	result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
				   O_NONBLOCK);
	if (result == -EINPROGRESS)
		result = 0;
	if (result == 0)
		goto out;

out_err:
	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	} else if (sock) {
		sock_release(sock);
	}
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH &&
	    result != -ENETUNREACH &&
	    result != -ENETDOWN &&
	    result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		log_print("connect %d try %d error %d", con->nodeid,
			  con->retries, result);
		mutex_unlock(&con->sock_mutex);
		msleep(1000);
		lowcomms_connect_sock(con);
		return;
	}
out:
	mutex_unlock(&con->sock_mutex);
	return;
}

static struct socket *tcp_create_listen_sock(struct connection *con,
					     struct sockaddr_storage *saddr)
{
	struct socket *sock = NULL;
	int result = 0;
	int one = 1;
	int addr_len;

	if (dlm_local_addr[0]->ss_family == AF_INET)
		addr_len = sizeof(struct sockaddr_in);
	else
		addr_len = sizeof(struct sockaddr_in6);

	/* Create a socket to communicate with */
	result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
				  IPPROTO_TCP, &sock);
	if (result < 0) {
		log_print("Can't create listening comms socket");
		goto create_out;
	}

	/* Turn off Nagle's algorithm */
	kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
			  sizeof(one));

	result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
				   (char *)&one, sizeof(one));

	if (result < 0) {
		log_print("Failed to set SO_REUSEADDR on socket: %d", result);
	}
	con->rx_action = tcp_accept_from_sock;
	con->connect_action = tcp_connect_to_sock;

	/* Bind to our port */
	make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
	result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
	if (result < 0) {
		log_print("Can't bind to port %d", dlm_config.ci_tcp_port);
		sock_release(sock);
		sock = NULL;
		con->sock = NULL;
		goto create_out;
	}
	result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
				 (char *)&one, sizeof(one));
	if (result < 0) {
		log_print("Set keepalive failed: %d", result);
	}

	result = sock->ops->listen(sock, 5);
	if (result < 0) {
		log_print("Can't listen on port %d", dlm_config.ci_tcp_port);
		sock_release(sock);
		sock = NULL;
		goto create_out;
	}

create_out:
	return sock;
}

/* Get local addresses */
static void init_local(void)
{
	struct sockaddr_storage sas, *addr;
	int i;

	dlm_local_count = 0;
	for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
		if (dlm_our_addr(&sas, i))
			break;

		addr = kmalloc(sizeof(*addr), GFP_NOFS);
		if (!addr)
			break;
		memcpy(addr, &sas, sizeof(*addr));
		dlm_local_addr[dlm_local_count++] = addr;
	}
}

/* Bind to an IP address. SCTP allows multiple addresses so it can do
   multi-homing */
static int add_sctp_bind_addr(struct connection *sctp_con,
			      struct sockaddr_storage *addr,
			      int addr_len, int num)
{
	int result = 0;

	if (num == 1)
		result = kernel_bind(sctp_con->sock,
				     (struct sockaddr *) addr,
				     addr_len);
	else
		result = kernel_setsockopt(sctp_con->sock, SOL_SCTP,
					   SCTP_SOCKOPT_BINDX_ADD,
					   (char *)addr, addr_len);

	if (result < 0)
		log_print("Can't bind to port %d addr number %d",
			  dlm_config.ci_tcp_port, num);

	return result;
}

/* Initialise SCTP socket and bind to all interfaces */
static int sctp_listen_for_all(void)
{
	struct socket *sock = NULL;
	struct sockaddr_storage localaddr;
	struct sctp_event_subscribe subscribe;
	int result = -EINVAL, num = 1, i, addr_len;
	struct connection *con = nodeid2con(0, GFP_NOFS);
	int bufsize = NEEDED_RMEM;

	if (!con)
		return -ENOMEM;

	log_print("Using SCTP for communications");

	result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET,
				  IPPROTO_SCTP, &sock);
	if (result < 0) {
		log_print("Can't create comms socket, check SCTP is loaded");
		goto out;
	}

	/* Listen for events */
	memset(&subscribe, 0, sizeof(subscribe));
	subscribe.sctp_data_io_event = 1;
	subscribe.sctp_association_event = 1;
	subscribe.sctp_send_failure_event = 1;
	subscribe.sctp_shutdown_event = 1;
	subscribe.sctp_partial_delivery_event = 1;

	result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE,
				 (char *)&bufsize, sizeof(bufsize));
	if (result)
		log_print("Error increasing buffer space on socket %d", result);

	result = kernel_setsockopt(sock, SOL_SCTP, SCTP_EVENTS,
				   (char *)&subscribe, sizeof(subscribe));
	if (result < 0) {
		log_print("Failed to set SCTP_EVENTS on socket: result=%d",
			  result);
		goto create_delsock;
	}

	/* Init con struct */
	sock->sk->sk_user_data = con;
	con->sock = sock;
	con->sock->sk->sk_data_ready = lowcomms_data_ready;
	con->rx_action = receive_from_sock;
	con->connect_action = sctp_init_assoc;

	/* Bind to all interfaces. */
	for (i = 0; i < dlm_local_count; i++) {
		memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
		make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len);

		result = add_sctp_bind_addr(con, &localaddr, addr_len, num);
		if (result)
			goto create_delsock;
		++num;
	}

	result = sock->ops->listen(sock, 5);
	if (result < 0) {
		log_print("Can't set socket listening");
		goto create_delsock;
	}

	return 0;

create_delsock:
	sock_release(sock);
	con->sock = NULL;
out:
	return result;
}

static int tcp_listen_for_all(void)
{
	struct socket *sock = NULL;
	struct connection *con = nodeid2con(0, GFP_NOFS);
	int result = -EINVAL;

	if (!con)
		return -ENOMEM;

	/* We don't support multi-homed hosts */
	if (dlm_local_addr[1] != NULL) {
		log_print("TCP protocol can't handle multi-homed hosts, "
			  "try SCTP");
		return -EINVAL;
	}

	log_print("Using TCP for communications");

	sock = tcp_create_listen_sock(con, dlm_local_addr[0]);
	if (sock) {
		add_sock(sock, con);
		result = 0;
	} else {
		result = -EADDRINUSE;
	}

	return result;
}

static struct writequeue_entry *new_writequeue_entry(struct connection *con,
						     gfp_t allocation)
{
	struct writequeue_entry *entry;

	entry = kmalloc(sizeof(struct writequeue_entry), allocation);
	if (!entry)
		return NULL;

	entry->page = alloc_page(allocation);
	if (!entry->page) {
		kfree(entry);
		return NULL;
	}

	entry->offset = 0;
	entry->len = 0;
	entry->end = 0;
	entry->users = 0;
	entry->con = con;

	return entry;
}

void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
{
	struct connection *con;
	struct writequeue_entry *e;
	int offset = 0;

	con = nodeid2con(nodeid, allocation);
	if (!con)
		return NULL;

	spin_lock(&con->writequeue_lock);
	e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
	if ((&e->list == &con->writequeue) ||
	    (PAGE_CACHE_SIZE - e->end < len)) {
		e = NULL;
	} else {
		offset = e->end;
		e->end += len;
		e->users++;
	}
	spin_unlock(&con->writequeue_lock);

	if (e) {
	got_one:
		*ppc = page_address(e->page) + offset;
		return e;
	}

	e = new_writequeue_entry(con, allocation);
	if (e) {
		spin_lock(&con->writequeue_lock);
		offset = e->end;
		e->end += len;
		e->users++;
		list_add_tail(&e->list, &con->writequeue);
		spin_unlock(&con->writequeue_lock);
		goto got_one;
	}
	return NULL;
}

void dlm_lowcomms_commit_buffer(void *mh)
{
	struct writequeue_entry *e = (struct writequeue_entry *)mh;
	struct connection *con = e->con;
	int users;

	spin_lock(&con->writequeue_lock);
	users = --e->users;
	if (users)
		goto out;
	e->len = e->end - e->offset;
	spin_unlock(&con->writequeue_lock);

	if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
		queue_work(send_workqueue, &con->swork);
	}
	return;

out:
	spin_unlock(&con->writequeue_lock);
	return;
}

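/*
 * Typical caller pattern (a sketch of how dlm's message-building paths use
 * this pair; error handling trimmed, and 'msg'/'msglen' are illustrative):
 *
 *	char *ppc;
 *	void *mh;
 *
 *	mh = dlm_lowcomms_get_buffer(nodeid, msglen, GFP_NOFS, &ppc);
 *	if (!mh)
 *		return -ENOBUFS;
 *	memcpy(ppc, msg, msglen);		// build the message in place
 *	dlm_lowcomms_commit_buffer(mh);		// queue it for the send worker
 *
 * get_buffer reserves msglen bytes in the tail page of the node's write
 * queue; the commit makes them visible to send_to_sock() once every
 * concurrent user of that page has committed.
 */
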
/* Send a message */
static void send_to_sock(struct connection *con)
{
	int ret = 0;
	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	struct writequeue_entry *e;
	int len, offset;
	int count = 0;

	mutex_lock(&con->sock_mutex);
	if (con->sock == NULL)
		goto out_connect;

	spin_lock(&con->writequeue_lock);
	for (;;) {
		e = list_entry(con->writequeue.next, struct writequeue_entry,
			       list);
		if ((struct list_head *) e == &con->writequeue)
			break;

		len = e->len;
		offset = e->offset;
		BUG_ON(len == 0 && e->users == 0);
		spin_unlock(&con->writequeue_lock);

		ret = 0;
		if (len) {
			ret = kernel_sendpage(con->sock, e->page, offset, len,
					      msg_flags);
			if (ret == -EAGAIN || ret == 0) {
				if (ret == -EAGAIN &&
				    test_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags) &&
				    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
					/* Notify TCP that we're limited by the
					 * application window size.
					 */
					set_bit(SOCK_NOSPACE, &con->sock->flags);
					con->sock->sk->sk_write_pending++;
				}
				cond_resched();
				goto out;
			} else if (ret < 0)
				goto send_error;
		}

		/* Don't starve people filling buffers */
		if (++count >= MAX_SEND_MSG_COUNT) {
			cond_resched();
			count = 0;
		}

		spin_lock(&con->writequeue_lock);
		e->offset += ret;
		e->len -= ret;

		if (e->len == 0 && e->users == 0) {
			list_del(&e->list);
			free_entry(e);
		}
	}
	spin_unlock(&con->writequeue_lock);
out:
	mutex_unlock(&con->sock_mutex);
	return;

send_error:
	mutex_unlock(&con->sock_mutex);
	close_connection(con, false);
	lowcomms_connect_sock(con);
	return;

out_connect:
	mutex_unlock(&con->sock_mutex);
	if (!test_bit(CF_INIT_PENDING, &con->flags))
		lowcomms_connect_sock(con);
}

static void clean_one_writequeue(struct connection *con)
{
	struct writequeue_entry *e, *safe;

	spin_lock(&con->writequeue_lock);
	list_for_each_entry_safe(e, safe, &con->writequeue, list) {
		list_del(&e->list);
		free_entry(e);
	}
	spin_unlock(&con->writequeue_lock);
}

/* Called from recovery when it knows that a node has
   left the cluster */
int dlm_lowcomms_close(int nodeid)
{
	struct connection *con;
	struct dlm_node_addr *na;

	log_print("closing connection to node %d", nodeid);
	con = nodeid2con(nodeid, 0);
	if (con) {
		clear_bit(CF_CONNECT_PENDING, &con->flags);
		clear_bit(CF_WRITE_PENDING, &con->flags);
		set_bit(CF_CLOSE, &con->flags);
		if (cancel_work_sync(&con->swork))
			log_print("canceled swork for node %d", nodeid);
		if (cancel_work_sync(&con->rwork))
			log_print("canceled rwork for node %d", nodeid);
		clean_one_writequeue(con);
		close_connection(con, true);
	}

	spin_lock(&dlm_node_addrs_spin);
	na = find_node_addr(nodeid);
	if (na) {
		list_del(&na->list);
		while (na->addr_count--)
			kfree(na->addr[na->addr_count]);
		kfree(na);
	}
	spin_unlock(&dlm_node_addrs_spin);

	return 0;
}

/* Receive workqueue function */
static void process_recv_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, rwork);
	int err;

	clear_bit(CF_READ_PENDING, &con->flags);
	do {
		err = con->rx_action(con);
	} while (!err);
}

/* Send workqueue function */
static void process_send_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, swork);

	if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
		con->connect_action(con);
		set_bit(CF_WRITE_PENDING, &con->flags);
	}
	if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags))
		send_to_sock(con);
}

/* Discard all entries on the write queues */
static void clean_writequeues(void)
{
	foreach_conn(clean_one_writequeue);
}

static void work_stop(void)
{
	destroy_workqueue(recv_workqueue);
	destroy_workqueue(send_workqueue);
}

static int work_start(void)
{
	recv_workqueue = alloc_workqueue("dlm_recv",
					 WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
	if (!recv_workqueue) {
		log_print("can't start dlm_recv");
		return -ENOMEM;
	}

	send_workqueue = alloc_workqueue("dlm_send",
					 WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
	if (!send_workqueue) {
		log_print("can't start dlm_send");
		destroy_workqueue(recv_workqueue);
		return -ENOMEM;
	}

	return 0;
}

static void stop_conn(struct connection *con)
{
	con->flags |= 0x0F;
	if (con->sock && con->sock->sk)
		con->sock->sk->sk_user_data = NULL;
}

static void free_conn(struct connection *con)
{
	close_connection(con, true);
	if (con->othercon)
		kmem_cache_free(con_cache, con->othercon);
	hlist_del(&con->list);
	kmem_cache_free(con_cache, con);
}

void dlm_lowcomms_stop(void)
{
	/* Set all the flags to prevent any
	   socket activity.
	*/
	mutex_lock(&connections_lock);
	dlm_allow_conn = 0;
	foreach_conn(stop_conn);
	mutex_unlock(&connections_lock);

	work_stop();

	mutex_lock(&connections_lock);
	clean_writequeues();

	foreach_conn(free_conn);

	mutex_unlock(&connections_lock);
	kmem_cache_destroy(con_cache);
}

int dlm_lowcomms_start(void)
{
	int error = -EINVAL;
	struct connection *con;
	int i;

	for (i = 0; i < CONN_HASH_SIZE; i++)
		INIT_HLIST_HEAD(&connection_hash[i]);

	init_local();
	if (!dlm_local_count) {
		error = -ENOTCONN;
		log_print("no local IP address has been set");
		goto fail;
	}

	error = -ENOMEM;
	con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
				      __alignof__(struct connection), 0,
				      NULL);
	if (!con_cache)
		goto fail;

	error = work_start();
	if (error)
		goto fail_destroy;

	dlm_allow_conn = 1;

	/* Start listening */
	if (dlm_config.ci_protocol == 0)
		error = tcp_listen_for_all();
	else
		error = sctp_listen_for_all();
	if (error)
		goto fail_unlisten;

	return 0;

fail_unlisten:
	dlm_allow_conn = 0;
	con = nodeid2con(0, 0);
	if (con) {
		close_connection(con, false);
		kmem_cache_free(con_cache, con);
	}
fail_destroy:
	kmem_cache_destroy(con_cache);
fail:
	return error;
}

void dlm_lowcomms_exit(void)
{
	struct dlm_node_addr *na, *safe;

	spin_lock(&dlm_node_addrs_spin);
	list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) {
		list_del(&na->list);
		while (na->addr_count--)
			kfree(na->addr[na->addr_count]);
		kfree(na);
	}
	spin_unlock(&dlm_node_addrs_spin);
}