xref: /openbmc/linux/fs/dlm/lowcomms.c (revision c852a6d7)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6 **  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11 
12 /*
13  * lowcomms.c
14  *
15  * This is the "low-level" comms layer.
16  *
17  * It is responsible for sending/receiving messages
18  * from other nodes in the cluster.
19  *
20  * Cluster nodes are referred to by their nodeids. nodeids are
21  * simply 32 bit numbers to the locking module - if they need to
22  * be expanded for the cluster infrastructure then that is its
23  * responsibility. It is this layer's
24  * responsibility to resolve these into IP address or
25  * whatever it needs for inter-node communication.
26  *
27  * The comms level is two kernel threads that deal mainly with
28  * the receiving of messages from other nodes and passing them
29  * up to the mid-level comms layer (which understands the
30  * message format) for execution by the locking core, and
31  * a send thread which does all the setting up of connections
32  * to remote nodes and the sending of data. Threads are not allowed
33  * to send their own data because it may cause them to wait in times
34  * of high load. Also, this way, the sending thread can collect together
35  * messages bound for one node and send them in one block.
36  *
37  * lowcomms will choose to use either TCP or SCTP as its transport layer
38  * depending on the configuration variable 'protocol'. This should be set
39  * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
40  * cluster-wide mechanism as it must be the same on all nodes of the cluster
41  * for the DLM to function.
42  *
43  */
44 
45 #include <asm/ioctls.h>
46 #include <net/sock.h>
47 #include <net/tcp.h>
48 #include <linux/pagemap.h>
49 #include <linux/file.h>
50 #include <linux/mutex.h>
51 #include <linux/sctp.h>
52 #include <linux/slab.h>
53 #include <net/sctp/sctp.h>
54 #include <net/ipv6.h>
55 
56 #include <trace/events/dlm.h>
57 
58 #include "dlm_internal.h"
59 #include "lowcomms.h"
60 #include "midcomms.h"
61 #include "memory.h"
62 #include "config.h"
63 
64 #define NEEDED_RMEM (4*1024*1024)
65 
66 /* Number of messages to send before rescheduling */
67 #define MAX_SEND_MSG_COUNT 25
68 
/* Per-peer connection state. Hashed by nodeid in connection_hash[],
 * looked up under RCU/SRCU protection and freed via ->rcu.
 */
struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* So we know who we are in the list */
	struct mutex sock_mutex; /* serializes socket attach/detach and use */
	unsigned long flags;	/* CF_* bit numbers below */
#define CF_READ_PENDING 1
#define CF_WRITE_PENDING 2
#define CF_INIT_PENDING 4
#define CF_IS_OTHERCON 5
#define CF_CLOSE 6
#define CF_APP_LIMITED 7
#define CF_CLOSING 8
#define CF_CONNECTED 9
#define CF_RECONNECT 10
#define CF_DELAY_CONNECT 11
	struct list_head writequeue;  /* List of outgoing writequeue_entries */
	spinlock_t writequeue_lock;	/* protects writequeue */
	int retries;		/* connect attempts so far */
#define MAX_CONNECT_RETRIES 3
	struct hlist_node list;	/* membership in connection_hash[] */
	struct connection *othercon; /* second con when connects cross on the wire */
	struct connection *sendcon;  /* back-pointer from an othercon to its sender */
	struct work_struct rwork; /* Receive workqueue */
	struct work_struct swork; /* Send workqueue */
	unsigned char *rx_buf;	/* receive staging buffer */
	int rx_buflen;		/* current size of rx_buf */
	int rx_leftover;	/* unparsed bytes kept at the start of rx_buf */
	int mark;		/* socket mark applied to sockets for this peer */
	int addr_count;		/* number of valid entries in addr[] */
	int curr_addr_index;	/* index of the address used for connecting */
	struct sockaddr_storage addr[DLM_MAX_ADDR_COUNT];
	spinlock_t addrs_lock;	/* protects addr[], addr_count, curr_addr_index, mark */
	struct rcu_head rcu;	/* deferred free of this structure */
};
/* fetch the connection stored in a struct sock's user data */
#define sock2con(x) ((struct connection *)(x)->sk_user_data)
104 
/* State for the single listening socket; rwork is queued from
 * lowcomms_listen_data_ready() to accept incoming connects.
 */
struct listen_connection {
	struct socket *sock;
	struct work_struct rwork;
};

/* free space remaining on / bytes queued in a writequeue entry's page */
#define DLM_WQ_REMAIN_BYTES(e) (PAGE_SIZE - e->end)
#define DLM_WQ_LENGTH_BYTES(e) (e->end - e->offset)
112 
/* An entry waiting to be sent */
struct writequeue_entry {
	struct list_head list;	/* position in con->writequeue */
	struct page *page;	/* backing page holding the outgoing bytes */
	int offset;		/* first byte not yet sent */
	int len;		/* bytes committed but not yet sent */
	int end;		/* end of used space; PAGE_SIZE - end is free */
	int users;		/* writers still filling this entry */
	bool dirty;		/* set once the entry was partially transmitted */
	struct connection *con;	/* owning connection */
	struct list_head msgs;	/* dlm_msgs carved out of this entry */
	struct kref ref;	/* released via dlm_page_release() */
};
126 
/* A single message reservation inside a writequeue entry. */
struct dlm_msg {
	struct writequeue_entry *entry;	/* entry that holds our bytes */
	struct dlm_msg *orig_msg;	/* original msg when this is a retransmit copy */
	bool retransmit;	/* a retransmit copy of this msg exists */
	void *ppc;		/* start of this message's data within the page */
	int len;		/* reserved length in bytes */
	int idx; /* new()/commit() idx exchange */

	struct list_head list;	/* membership in entry->msgs */
	struct kref ref;	/* released via dlm_msg_release() */
};
138 
/* Transport-specific hooks; the file header says the transport is
 * either TCP or SCTP selected by the 'protocol' config variable.
 */
struct dlm_proto_ops {
	bool try_new_addr;	/* rotate peer addresses between connect attempts */
	const char *name;	/* protocol name for logging */
	int proto;		/* protocol number passed to socket creation */

	int (*connect)(struct connection *con, struct socket *sock,
		       struct sockaddr *addr, int addr_len);
	void (*sockopts)(struct socket *sock);
	int (*bind)(struct socket *sock);
	int (*listen_validate)(void);
	void (*listen_sockopts)(struct socket *sock);
	int (*listen_bind)(struct socket *sock);
};
152 
/* Saved original socket callbacks; restored by restore_callbacks()
 * when a socket is detached from lowcomms.
 */
static struct listen_sock_callbacks {
	void (*sk_error_report)(struct sock *);
	void (*sk_data_ready)(struct sock *);
	void (*sk_state_change)(struct sock *);
	void (*sk_write_space)(struct sock *);
} listen_sock;
159 
160 static struct listen_connection listen_con;
161 static struct sockaddr_storage dlm_local_addr[DLM_MAX_ADDR_COUNT];
162 static int dlm_local_count;
163 
164 /* Work queues */
165 static struct workqueue_struct *recv_workqueue;
166 static struct workqueue_struct *send_workqueue;
167 
168 static struct hlist_head connection_hash[CONN_HASH_SIZE];
169 static DEFINE_SPINLOCK(connections_lock);
170 DEFINE_STATIC_SRCU(connections_srcu);
171 
172 static const struct dlm_proto_ops *dlm_proto_ops;
173 
174 static void process_recv_sockets(struct work_struct *work);
175 static void process_send_sockets(struct work_struct *work);
176 
177 bool dlm_lowcomms_is_running(void)
178 {
179 	return !!listen_con.sock;
180 }
181 
182 static void writequeue_entry_ctor(void *data)
183 {
184 	struct writequeue_entry *entry = data;
185 
186 	INIT_LIST_HEAD(&entry->msgs);
187 }
188 
189 struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void)
190 {
191 	return kmem_cache_create("dlm_writequeue", sizeof(struct writequeue_entry),
192 				 0, 0, writequeue_entry_ctor);
193 }
194 
195 struct kmem_cache *dlm_lowcomms_msg_cache_create(void)
196 {
197 	return kmem_cache_create("dlm_msg", sizeof(struct dlm_msg), 0, 0, NULL);
198 }
199 
200 /* need to held writequeue_lock */
201 static struct writequeue_entry *con_next_wq(struct connection *con)
202 {
203 	struct writequeue_entry *e;
204 
205 	e = list_first_entry_or_null(&con->writequeue, struct writequeue_entry,
206 				     list);
207 	/* if len is zero nothing is to send, if there are users filling
208 	 * buffers we wait until the users are done so we can send more.
209 	 */
210 	if (!e || e->users || e->len == 0)
211 		return NULL;
212 
213 	return e;
214 }
215 
216 static struct connection *__find_con(int nodeid, int r)
217 {
218 	struct connection *con;
219 
220 	hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
221 		if (con->nodeid == nodeid)
222 			return con;
223 	}
224 
225 	return NULL;
226 }
227 
228 static int dlm_con_init(struct connection *con, int nodeid)
229 {
230 	con->rx_buflen = dlm_config.ci_buffer_size;
231 	con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS);
232 	if (!con->rx_buf)
233 		return -ENOMEM;
234 
235 	con->nodeid = nodeid;
236 	mutex_init(&con->sock_mutex);
237 	INIT_LIST_HEAD(&con->writequeue);
238 	spin_lock_init(&con->writequeue_lock);
239 	INIT_WORK(&con->swork, process_send_sockets);
240 	INIT_WORK(&con->rwork, process_recv_sockets);
241 
242 	return 0;
243 }
244 
/*
 * If 'allocation' is zero then we don't attempt to create a new
 * connection structure for this node.
 *
 * Returns the existing connection for @nodeid, or a newly created one
 * (allocated with @alloc), or NULL on allocation failure.
 */
static struct connection *nodeid2con(int nodeid, gfp_t alloc)
{
	struct connection *con, *tmp;
	int r, ret;

	r = nodeid_hash(nodeid);
	con = __find_con(nodeid, r);
	if (con || !alloc)
		return con;

	con = kzalloc(sizeof(*con), alloc);
	if (!con)
		return NULL;

	ret = dlm_con_init(con, nodeid);
	if (ret) {
		kfree(con);
		return NULL;
	}

	spin_lock(&connections_lock);
	/* Because multiple workqueues/threads calls this function it can
	 * race on multiple cpu's. Instead of locking hot path __find_con()
	 * we just check in rare cases of recently added nodes again
	 * under protection of connections_lock. If this is the case we
	 * abort our connection creation and return the existing connection.
	 */
	tmp = __find_con(nodeid, r);
	if (tmp) {
		spin_unlock(&connections_lock);
		/* undo dlm_con_init()'s allocation before dropping con */
		kfree(con->rx_buf);
		kfree(con);
		return tmp;
	}

	hlist_add_head_rcu(&con->list, &connection_hash[r]);
	spin_unlock(&connections_lock);

	return con;
}
289 
290 /* Loop round all connections */
291 static void foreach_conn(void (*conn_func)(struct connection *c))
292 {
293 	int i;
294 	struct connection *con;
295 
296 	for (i = 0; i < CONN_HASH_SIZE; i++) {
297 		hlist_for_each_entry_rcu(con, &connection_hash[i], list)
298 			conn_func(con);
299 	}
300 }
301 
302 static int addr_compare(const struct sockaddr_storage *x,
303 			const struct sockaddr_storage *y)
304 {
305 	switch (x->ss_family) {
306 	case AF_INET: {
307 		struct sockaddr_in *sinx = (struct sockaddr_in *)x;
308 		struct sockaddr_in *siny = (struct sockaddr_in *)y;
309 		if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
310 			return 0;
311 		if (sinx->sin_port != siny->sin_port)
312 			return 0;
313 		break;
314 	}
315 	case AF_INET6: {
316 		struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
317 		struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
318 		if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
319 			return 0;
320 		if (sinx->sin6_port != siny->sin6_port)
321 			return 0;
322 		break;
323 	}
324 	default:
325 		return 0;
326 	}
327 	return 1;
328 }
329 
/* Resolve @nodeid to one of its stored peer addresses.
 *
 * The current address is copied into @sas_out (full sockaddr_storage)
 * and/or its bare IP into @sa_out; @mark receives the connection's
 * socket mark. With @try_new_addr the per-connection index advances so
 * a subsequent call uses the next stored address.
 *
 * Returns 0 on success, -1 when no local addresses are configured, or
 * -ENOENT when the node is unknown or has no addresses.
 */
static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
			  struct sockaddr *sa_out, bool try_new_addr,
			  unsigned int *mark)
{
	struct sockaddr_storage sas;
	struct connection *con;
	int idx;

	if (!dlm_local_count)
		return -1;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (!con) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	spin_lock(&con->addrs_lock);
	if (!con->addr_count) {
		spin_unlock(&con->addrs_lock);
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOENT;
	}

	/* snapshot the address under the lock, then work on the copy */
	memcpy(&sas, &con->addr[con->curr_addr_index],
	       sizeof(struct sockaddr_storage));

	if (try_new_addr) {
		con->curr_addr_index++;
		if (con->curr_addr_index == con->addr_count)
			con->curr_addr_index = 0;
	}

	*mark = con->mark;
	spin_unlock(&con->addrs_lock);

	if (sas_out)
		memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));

	if (!sa_out) {
		srcu_read_unlock(&connections_srcu, idx);
		return 0;
	}

	/* only the address part is copied into @sa_out; family and port
	 * are left to the caller
	 */
	if (dlm_local_addr[0].ss_family == AF_INET) {
		struct sockaddr_in *in4  = (struct sockaddr_in *) &sas;
		struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
		ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
	} else {
		struct sockaddr_in6 *in6  = (struct sockaddr_in6 *) &sas;
		struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
		ret6->sin6_addr = in6->sin6_addr;
	}

	srcu_read_unlock(&connections_srcu, idx);
	return 0;
}
388 
/* Reverse lookup: find the node that owns @addr.
 *
 * Walks every connection's address list under SRCU plus the per
 * connection addrs_lock. On a match fills @nodeid and @mark and
 * returns 0; returns -ENOENT when no node has this address.
 */
static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid,
			  unsigned int *mark)
{
	struct connection *con;
	int i, idx, addr_i;

	idx = srcu_read_lock(&connections_srcu);
	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
			/* every hashed connection should have at least one address */
			WARN_ON_ONCE(!con->addr_count);

			spin_lock(&con->addrs_lock);
			for (addr_i = 0; addr_i < con->addr_count; addr_i++) {
				if (addr_compare(&con->addr[addr_i], addr)) {
					*nodeid = con->nodeid;
					*mark = con->mark;
					spin_unlock(&con->addrs_lock);
					srcu_read_unlock(&connections_srcu, idx);
					return 0;
				}
			}
			spin_unlock(&con->addrs_lock);
		}
	}
	srcu_read_unlock(&connections_srcu, idx);

	return -ENOENT;
}
417 
418 static bool dlm_lowcomms_con_has_addr(const struct connection *con,
419 				      const struct sockaddr_storage *addr)
420 {
421 	int i;
422 
423 	for (i = 0; i < con->addr_count; i++) {
424 		if (addr_compare(&con->addr[i], addr))
425 			return true;
426 	}
427 
428 	return false;
429 }
430 
431 int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
432 {
433 	struct connection *con;
434 	bool ret, idx;
435 
436 	idx = srcu_read_lock(&connections_srcu);
437 	con = nodeid2con(nodeid, GFP_NOFS);
438 	if (!con) {
439 		srcu_read_unlock(&connections_srcu, idx);
440 		return -ENOMEM;
441 	}
442 
443 	spin_lock(&con->addrs_lock);
444 	if (!con->addr_count) {
445 		memcpy(&con->addr[0], addr, sizeof(*addr));
446 		con->addr_count = 1;
447 		con->mark = dlm_config.ci_mark;
448 		spin_unlock(&con->addrs_lock);
449 		srcu_read_unlock(&connections_srcu, idx);
450 		return 0;
451 	}
452 
453 	ret = dlm_lowcomms_con_has_addr(con, addr);
454 	if (ret) {
455 		spin_unlock(&con->addrs_lock);
456 		srcu_read_unlock(&connections_srcu, idx);
457 		return -EEXIST;
458 	}
459 
460 	if (con->addr_count >= DLM_MAX_ADDR_COUNT) {
461 		spin_unlock(&con->addrs_lock);
462 		srcu_read_unlock(&connections_srcu, idx);
463 		return -ENOSPC;
464 	}
465 
466 	memcpy(&con->addr[con->addr_count++], addr, sizeof(*addr));
467 	srcu_read_unlock(&connections_srcu, idx);
468 	spin_unlock(&con->addrs_lock);
469 	return 0;
470 }
471 
/* Data available on socket or listen socket received a connect */
static void lowcomms_data_ready(struct sock *sk)
{
	struct connection *con = sock2con(sk);

	/* queue the receive worker only once until it clears the bit */
	if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
		queue_work(recv_workqueue, &con->rwork);
}
480 
481 static void lowcomms_listen_data_ready(struct sock *sk)
482 {
483 	queue_work(recv_workqueue, &listen_con.rwork);
484 }
485 
/* sk_write_space callback: send space is available again. */
static void lowcomms_write_space(struct sock *sk)
{
	struct connection *con = sock2con(sk);

	/* first write space after connecting: log it and start sending */
	if (!test_and_set_bit(CF_CONNECTED, &con->flags)) {
		log_print("connected to node %d", con->nodeid);
		queue_work(send_workqueue, &con->swork);
		return;
	}

	clear_bit(SOCK_NOSPACE, &con->sock->flags);

	/* CF_APP_LIMITED is presumably set by the send path when the socket
	 * filled up (send path not in view); undo its accounting here
	 */
	if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
		con->sock->sk->sk_write_pending--;
		clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
	}

	queue_work(send_workqueue, &con->swork);
}
505 
506 static inline void lowcomms_connect_sock(struct connection *con)
507 {
508 	if (test_bit(CF_CLOSE, &con->flags))
509 		return;
510 	queue_work(send_workqueue, &con->swork);
511 	cond_resched();
512 }
513 
/* sk_state_change callback. */
static void lowcomms_state_change(struct sock *sk)
{
	/* SCTP layer is not calling sk_data_ready when the connection
	 * is done, so we catch the signal through here. Also, it
	 * doesn't switch socket state when entering shutdown, so we
	 * skip the write in that case.
	 */
	if (sk->sk_shutdown) {
		if (sk->sk_shutdown == RCV_SHUTDOWN)
			lowcomms_data_ready(sk);
	} else if (sk->sk_state == TCP_ESTABLISHED) {
		lowcomms_write_space(sk);
	}
}
528 
529 int dlm_lowcomms_connect_node(int nodeid)
530 {
531 	struct connection *con;
532 	int idx;
533 
534 	if (nodeid == dlm_our_nodeid())
535 		return 0;
536 
537 	idx = srcu_read_lock(&connections_srcu);
538 	con = nodeid2con(nodeid, 0);
539 	if (WARN_ON_ONCE(!con)) {
540 		srcu_read_unlock(&connections_srcu, idx);
541 		return -ENOENT;
542 	}
543 
544 	lowcomms_connect_sock(con);
545 	srcu_read_unlock(&connections_srcu, idx);
546 
547 	return 0;
548 }
549 
550 int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark)
551 {
552 	struct connection *con;
553 	int idx;
554 
555 	idx = srcu_read_lock(&connections_srcu);
556 	con = nodeid2con(nodeid, 0);
557 	if (!con) {
558 		srcu_read_unlock(&connections_srcu, idx);
559 		return -ENOENT;
560 	}
561 
562 	spin_lock(&con->addrs_lock);
563 	con->mark = mark;
564 	spin_unlock(&con->addrs_lock);
565 	srcu_read_unlock(&connections_srcu, idx);
566 	return 0;
567 }
568 
/* sk_error_report callback: log the socket error, flag a reconnect on
 * the sending connection (othercon errors are redirected to their
 * sendcon), then chain to the saved original report callback.
 */
static void lowcomms_error_report(struct sock *sk)
{
	struct connection *con = sock2con(sk);
	struct inet_sock *inet;

	inet = inet_sk(sk);
	switch (sk->sk_family) {
	case AF_INET:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI4, dport %d, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &inet->inet_daddr,
				   ntohs(inet->inet_dport), sk->sk_err,
				   sk->sk_err_soft);
		break;
#if IS_ENABLED(CONFIG_IPV6)
	case AF_INET6:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI6c, "
				   "dport %d, sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &sk->sk_v6_daddr,
				   ntohs(inet->inet_dport), sk->sk_err,
				   sk->sk_err_soft);
		break;
#endif
	default:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "invalid socket family %d set, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   sk->sk_family, sk->sk_err, sk->sk_err_soft);
		goto out;
	}

	/* below sendcon only handling */
	if (test_bit(CF_IS_OTHERCON, &con->flags))
		con = con->sendcon;

	switch (sk->sk_err) {
	case ECONNREFUSED:
		/* peer refused the connect; delay the next attempt */
		set_bit(CF_DELAY_CONNECT, &con->flags);
		break;
	default:
		break;
	}

	/* schedule one reconnect; the bit prevents duplicate queuing */
	if (!test_and_set_bit(CF_RECONNECT, &con->flags))
		queue_work(send_workqueue, &con->swork);

out:
	listen_sock.sk_error_report(sk);
}
620 
621 static void restore_callbacks(struct socket *sock)
622 {
623 	struct sock *sk = sock->sk;
624 
625 	lock_sock(sk);
626 	sk->sk_user_data = NULL;
627 	sk->sk_data_ready = listen_sock.sk_data_ready;
628 	sk->sk_state_change = listen_sock.sk_state_change;
629 	sk->sk_write_space = listen_sock.sk_write_space;
630 	sk->sk_error_report = listen_sock.sk_error_report;
631 	release_sock(sk);
632 }
633 
634 /* Make a socket active */
635 static void add_sock(struct socket *sock, struct connection *con)
636 {
637 	struct sock *sk = sock->sk;
638 
639 	lock_sock(sk);
640 	con->sock = sock;
641 
642 	sk->sk_user_data = con;
643 	/* Install a data_ready callback */
644 	sk->sk_data_ready = lowcomms_data_ready;
645 	sk->sk_write_space = lowcomms_write_space;
646 	sk->sk_state_change = lowcomms_state_change;
647 	sk->sk_allocation = GFP_NOFS;
648 	sk->sk_error_report = lowcomms_error_report;
649 	release_sock(sk);
650 }
651 
652 /* Add the port number to an IPv6 or 4 sockaddr and return the address
653    length */
654 static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
655 			  int *addr_len)
656 {
657 	saddr->ss_family =  dlm_local_addr[0].ss_family;
658 	if (saddr->ss_family == AF_INET) {
659 		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
660 		in4_addr->sin_port = cpu_to_be16(port);
661 		*addr_len = sizeof(struct sockaddr_in);
662 		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
663 	} else {
664 		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
665 		in6_addr->sin6_port = cpu_to_be16(port);
666 		*addr_len = sizeof(struct sockaddr_in6);
667 	}
668 	memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
669 }
670 
671 static void dlm_page_release(struct kref *kref)
672 {
673 	struct writequeue_entry *e = container_of(kref, struct writequeue_entry,
674 						  ref);
675 
676 	__free_page(e->page);
677 	dlm_free_writequeue(e);
678 }
679 
680 static void dlm_msg_release(struct kref *kref)
681 {
682 	struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);
683 
684 	kref_put(&msg->entry->ref, dlm_page_release);
685 	dlm_free_msg(msg);
686 }
687 
/* Unlink @e from its connection's writequeue and release every message
 * reference it still carries; dropping the final entry kref frees the
 * backing page via dlm_page_release().
 */
static void free_entry(struct writequeue_entry *e)
{
	struct dlm_msg *msg, *tmp;

	list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
		if (msg->orig_msg) {
			/* this msg was a retransmit copy; let the original
			 * be released again
			 */
			msg->orig_msg->retransmit = false;
			kref_put(&msg->orig_msg->ref, dlm_msg_release);
		}

		list_del(&msg->list);
		kref_put(&msg->ref, dlm_msg_release);
	}

	list_del(&e->list);
	kref_put(&e->ref, dlm_page_release);
}
705 
706 static void dlm_close_sock(struct socket **sock)
707 {
708 	if (*sock) {
709 		restore_callbacks(*sock);
710 		sock_release(*sock);
711 		*sock = NULL;
712 	}
713 }
714 
/* Close a remote connection and tidy up */
static void close_connection(struct connection *con, bool and_other,
			     bool tx, bool rx)
{
	/* only the first closer cancels the work items; if CF_CLOSING was
	 * already set another close is in flight
	 */
	bool closing = test_and_set_bit(CF_CLOSING, &con->flags);
	struct writequeue_entry *e;

	if (tx && !closing && cancel_work_sync(&con->swork)) {
		log_print("canceled swork for node %d", con->nodeid);
		clear_bit(CF_WRITE_PENDING, &con->flags);
	}
	if (rx && !closing && cancel_work_sync(&con->rwork)) {
		log_print("canceled rwork for node %d", con->nodeid);
		clear_bit(CF_READ_PENDING, &con->flags);
	}

	mutex_lock(&con->sock_mutex);
	dlm_close_sock(&con->sock);

	if (con->othercon && and_other) {
		/* Will only re-enter once. */
		close_connection(con->othercon, false, tx, rx);
	}

	/* if we send a writequeue entry only a half way, we drop the
	 * whole entry because reconnection and that we not start of the
	 * middle of a msg which will confuse the other end.
	 *
	 * we can always drop messages because retransmits, but what we
	 * cannot allow is to transmit half messages which may be processed
	 * at the other side.
	 *
	 * our policy is to start on a clean state when disconnects, we don't
	 * know what's send/received on transport layer in this case.
	 */
	spin_lock(&con->writequeue_lock);
	if (!list_empty(&con->writequeue)) {
		e = list_first_entry(&con->writequeue, struct writequeue_entry,
				     list);
		if (e->dirty)
			free_entry(e);
	}
	spin_unlock(&con->writequeue_lock);

	/* reset receive and retry state for the next connect attempt */
	con->rx_leftover = 0;
	con->retries = 0;
	clear_bit(CF_APP_LIMITED, &con->flags);
	clear_bit(CF_CONNECTED, &con->flags);
	clear_bit(CF_DELAY_CONNECT, &con->flags);
	clear_bit(CF_RECONNECT, &con->flags);
	mutex_unlock(&con->sock_mutex);
	clear_bit(CF_CLOSING, &con->flags);
}
768 
769 static int con_realloc_receive_buf(struct connection *con, int newlen)
770 {
771 	unsigned char *newbuf;
772 
773 	newbuf = kmalloc(newlen, GFP_NOFS);
774 	if (!newbuf)
775 		return -ENOMEM;
776 
777 	/* copy any leftover from last receive */
778 	if (con->rx_leftover)
779 		memmove(newbuf, con->rx_buf, con->rx_leftover);
780 
781 	/* swap to new buffer space */
782 	kfree(con->rx_buf);
783 	con->rx_buflen = newlen;
784 	con->rx_buf = newbuf;
785 
786 	return 0;
787 }
788 
/* Data received from remote end */
static int receive_from_sock(struct connection *con)
{
	struct msghdr msg;
	struct kvec iov;
	int ret, buflen;

	mutex_lock(&con->sock_mutex);

	if (con->sock == NULL) {
		ret = -EAGAIN;
		goto out_close;
	}

	/* realloc if we get new buffer size to read out */
	buflen = dlm_config.ci_buffer_size;
	if (con->rx_buflen != buflen && con->rx_leftover <= buflen) {
		ret = con_realloc_receive_buf(con, buflen);
		if (ret < 0)
			goto out_resched;
	}

	for (;;) {
		/* calculate new buffer parameter regarding last receive and
		 * possible leftover bytes
		 */
		iov.iov_base = con->rx_buf + con->rx_leftover;
		iov.iov_len = con->rx_buflen - con->rx_leftover;

		memset(&msg, 0, sizeof(msg));
		msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
		ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
				     msg.msg_flags);
		trace_dlm_recv(con->nodeid, ret);
		if (ret == -EAGAIN)
			break;	/* socket drained for now */
		else if (ret <= 0)
			goto out_close;	/* EOF (0) or hard error */

		/* new buflen according readed bytes and leftover from last receive */
		buflen = ret + con->rx_leftover;
		ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen);
		if (ret < 0)
			goto out_close;

		/* calculate leftover bytes from process and put it into begin of
		 * the receive buffer, so next receive we have the full message
		 * at the start address of the receive buffer.
		 */
		con->rx_leftover = buflen - ret;
		if (con->rx_leftover) {
			memmove(con->rx_buf, con->rx_buf + ret,
				con->rx_leftover);
		}
	}

	dlm_midcomms_receive_done(con->nodeid);
	mutex_unlock(&con->sock_mutex);
	return 0;

out_resched:
	/* buffer resize failed; let the receive worker retry later */
	if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
		queue_work(recv_workqueue, &con->rwork);
	mutex_unlock(&con->sock_mutex);
	return -EAGAIN;

out_close:
	if (ret == 0) {
		log_print("connection %p got EOF from %d",
			  con, con->nodeid);

		mutex_unlock(&con->sock_mutex);
		close_connection(con, false, true, false);
		/* signal to breaking receive worker */
		ret = -1;
	} else {
		mutex_unlock(&con->sock_mutex);
	}
	return ret;
}
869 
870 /* Listening socket is busy, accept a connection */
871 static int accept_from_sock(struct listen_connection *con)
872 {
873 	int result;
874 	struct sockaddr_storage peeraddr;
875 	struct socket *newsock;
876 	int len, idx;
877 	int nodeid;
878 	struct connection *newcon;
879 	struct connection *addcon;
880 	unsigned int mark;
881 
882 	if (!con->sock)
883 		return -ENOTCONN;
884 
885 	result = kernel_accept(con->sock, &newsock, O_NONBLOCK);
886 	if (result < 0)
887 		goto accept_err;
888 
889 	/* Get the connected socket's peer */
890 	memset(&peeraddr, 0, sizeof(peeraddr));
891 	len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2);
892 	if (len < 0) {
893 		result = -ECONNABORTED;
894 		goto accept_err;
895 	}
896 
897 	/* Get the new node's NODEID */
898 	make_sockaddr(&peeraddr, 0, &len);
899 	if (addr_to_nodeid(&peeraddr, &nodeid, &mark)) {
900 		switch (peeraddr.ss_family) {
901 		case AF_INET: {
902 			struct sockaddr_in *sin = (struct sockaddr_in *)&peeraddr;
903 
904 			log_print("connect from non cluster IPv4 node %pI4",
905 				  &sin->sin_addr);
906 			break;
907 		}
908 #if IS_ENABLED(CONFIG_IPV6)
909 		case AF_INET6: {
910 			struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&peeraddr;
911 
912 			log_print("connect from non cluster IPv6 node %pI6c",
913 				  &sin6->sin6_addr);
914 			break;
915 		}
916 #endif
917 		default:
918 			log_print("invalid family from non cluster node");
919 			break;
920 		}
921 
922 		sock_release(newsock);
923 		return -1;
924 	}
925 
926 	log_print("got connection from %d", nodeid);
927 
928 	/*  Check to see if we already have a connection to this node. This
929 	 *  could happen if the two nodes initiate a connection at roughly
930 	 *  the same time and the connections cross on the wire.
931 	 *  In this case we store the incoming one in "othercon"
932 	 */
933 	idx = srcu_read_lock(&connections_srcu);
934 	newcon = nodeid2con(nodeid, 0);
935 	if (WARN_ON_ONCE(!newcon)) {
936 		srcu_read_unlock(&connections_srcu, idx);
937 		result = -ENOENT;
938 		goto accept_err;
939 	}
940 
941 	sock_set_mark(newsock->sk, mark);
942 
943 	mutex_lock(&newcon->sock_mutex);
944 	if (newcon->sock) {
945 		struct connection *othercon = newcon->othercon;
946 
947 		if (!othercon) {
948 			othercon = kzalloc(sizeof(*othercon), GFP_NOFS);
949 			if (!othercon) {
950 				log_print("failed to allocate incoming socket");
951 				mutex_unlock(&newcon->sock_mutex);
952 				srcu_read_unlock(&connections_srcu, idx);
953 				result = -ENOMEM;
954 				goto accept_err;
955 			}
956 
957 			result = dlm_con_init(othercon, nodeid);
958 			if (result < 0) {
959 				kfree(othercon);
960 				mutex_unlock(&newcon->sock_mutex);
961 				srcu_read_unlock(&connections_srcu, idx);
962 				goto accept_err;
963 			}
964 
965 			lockdep_set_subclass(&othercon->sock_mutex, 1);
966 			set_bit(CF_IS_OTHERCON, &othercon->flags);
967 			newcon->othercon = othercon;
968 			othercon->sendcon = newcon;
969 		} else {
970 			/* close other sock con if we have something new */
971 			close_connection(othercon, false, true, false);
972 		}
973 
974 		mutex_lock(&othercon->sock_mutex);
975 		add_sock(newsock, othercon);
976 		addcon = othercon;
977 		mutex_unlock(&othercon->sock_mutex);
978 	}
979 	else {
980 		/* accept copies the sk after we've saved the callbacks, so we
981 		   don't want to save them a second time or comm errors will
982 		   result in calling sk_error_report recursively. */
983 		add_sock(newsock, newcon);
984 		addcon = newcon;
985 	}
986 
987 	set_bit(CF_CONNECTED, &addcon->flags);
988 	mutex_unlock(&newcon->sock_mutex);
989 
990 	/*
991 	 * Add it to the active queue in case we got data
992 	 * between processing the accept adding the socket
993 	 * to the read_sockets list
994 	 */
995 	if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
996 		queue_work(recv_workqueue, &addcon->rwork);
997 
998 	srcu_read_unlock(&connections_srcu, idx);
999 
1000 	return 0;
1001 
1002 accept_err:
1003 	if (newsock)
1004 		sock_release(newsock);
1005 
1006 	if (result != -EAGAIN)
1007 		log_print("error accepting connection from node: %d", result);
1008 	return result;
1009 }
1010 
1011 /*
1012  * writequeue_entry_complete - try to delete and free write queue entry
1013  * @e: write queue entry to try to delete
1014  * @completed: bytes completed
1015  *
1016  * writequeue_lock must be held.
1017  */
1018 static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
1019 {
1020 	e->offset += completed;
1021 	e->len -= completed;
1022 	/* signal that page was half way transmitted */
1023 	e->dirty = true;
1024 
1025 	if (e->len == 0 && e->users == 0)
1026 		free_entry(e);
1027 }
1028 
1029 /*
1030  * sctp_bind_addrs - bind a SCTP socket to all our addresses
1031  */
1032 static int sctp_bind_addrs(struct socket *sock, uint16_t port)
1033 {
1034 	struct sockaddr_storage localaddr;
1035 	struct sockaddr *addr = (struct sockaddr *)&localaddr;
1036 	int i, addr_len, result = 0;
1037 
1038 	for (i = 0; i < dlm_local_count; i++) {
1039 		memcpy(&localaddr, &dlm_local_addr[i], sizeof(localaddr));
1040 		make_sockaddr(&localaddr, port, &addr_len);
1041 
1042 		if (!i)
1043 			result = kernel_bind(sock, addr, addr_len);
1044 		else
1045 			result = sock_bind_add(sock->sk, addr, addr_len);
1046 
1047 		if (result < 0) {
1048 			log_print("Can't bind to %d addr number %d, %d.\n",
1049 				  port, i + 1, result);
1050 			break;
1051 		}
1052 	}
1053 	return result;
1054 }
1055 
1056 /* Get local addresses */
1057 static void init_local(void)
1058 {
1059 	struct sockaddr_storage sas;
1060 	int i;
1061 
1062 	dlm_local_count = 0;
1063 	for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
1064 		if (dlm_our_addr(&sas, i))
1065 			break;
1066 
1067 		memcpy(&dlm_local_addr[dlm_local_count++], &sas, sizeof(sas));
1068 	}
1069 }
1070 
1071 static struct writequeue_entry *new_writequeue_entry(struct connection *con)
1072 {
1073 	struct writequeue_entry *entry;
1074 
1075 	entry = dlm_allocate_writequeue();
1076 	if (!entry)
1077 		return NULL;
1078 
1079 	entry->page = alloc_page(GFP_ATOMIC | __GFP_ZERO);
1080 	if (!entry->page) {
1081 		dlm_free_writequeue(entry);
1082 		return NULL;
1083 	}
1084 
1085 	entry->offset = 0;
1086 	entry->len = 0;
1087 	entry->end = 0;
1088 	entry->dirty = false;
1089 	entry->con = con;
1090 	entry->users = 1;
1091 	kref_init(&entry->ref);
1092 	return entry;
1093 }
1094 
1095 static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
1096 					     char **ppc, void (*cb)(void *data),
1097 					     void *data)
1098 {
1099 	struct writequeue_entry *e;
1100 
1101 	spin_lock(&con->writequeue_lock);
1102 	if (!list_empty(&con->writequeue)) {
1103 		e = list_last_entry(&con->writequeue, struct writequeue_entry, list);
1104 		if (DLM_WQ_REMAIN_BYTES(e) >= len) {
1105 			kref_get(&e->ref);
1106 
1107 			*ppc = page_address(e->page) + e->end;
1108 			if (cb)
1109 				cb(data);
1110 
1111 			e->end += len;
1112 			e->users++;
1113 			goto out;
1114 		}
1115 	}
1116 
1117 	e = new_writequeue_entry(con);
1118 	if (!e)
1119 		goto out;
1120 
1121 	kref_get(&e->ref);
1122 	*ppc = page_address(e->page);
1123 	e->end += len;
1124 	if (cb)
1125 		cb(data);
1126 
1127 	list_add_tail(&e->list, &con->writequeue);
1128 
1129 out:
1130 	spin_unlock(&con->writequeue_lock);
1131 	return e;
1132 };
1133 
1134 static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
1135 						gfp_t allocation, char **ppc,
1136 						void (*cb)(void *data),
1137 						void *data)
1138 {
1139 	struct writequeue_entry *e;
1140 	struct dlm_msg *msg;
1141 
1142 	msg = dlm_allocate_msg(allocation);
1143 	if (!msg)
1144 		return NULL;
1145 
1146 	kref_init(&msg->ref);
1147 
1148 	e = new_wq_entry(con, len, ppc, cb, data);
1149 	if (!e) {
1150 		dlm_free_msg(msg);
1151 		return NULL;
1152 	}
1153 
1154 	msg->retransmit = false;
1155 	msg->orig_msg = NULL;
1156 	msg->ppc = *ppc;
1157 	msg->len = len;
1158 	msg->entry = e;
1159 
1160 	return msg;
1161 }
1162 
/* avoid a sparse false positive for connections_srcu: the read lock taken
 * here is released in dlm_lowcomms_commit_msg(), which must be called if
 * this function succeeds
 */
#ifndef __CHECKER__
struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
				     char **ppc, void (*cb)(void *data),
				     void *data)
{
	struct connection *con;
	struct dlm_msg *msg;
	int idx;

	/* a message must fit on one page and carry at least a dlm_header */
	if (len > DLM_MAX_SOCKET_BUFSIZE ||
	    len < sizeof(struct dlm_header)) {
		BUILD_BUG_ON(PAGE_SIZE < DLM_MAX_SOCKET_BUFSIZE);
		log_print("failed to allocate a buffer of size %d", len);
		WARN_ON(1);
		return NULL;
	}

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (WARN_ON_ONCE(!con)) {
		srcu_read_unlock(&connections_srcu, idx);
		return NULL;
	}

	msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, data);
	if (!msg) {
		srcu_read_unlock(&connections_srcu, idx);
		return NULL;
	}

	/* extra reference, dropped by dlm_lowcomms_commit_msg() */
	kref_get(&msg->ref);
	/* commit must be called if this succeeded; it unlocks via msg->idx */
	msg->idx = idx;
	return msg;
}
#endif
1203 
1204 static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
1205 {
1206 	struct writequeue_entry *e = msg->entry;
1207 	struct connection *con = e->con;
1208 	int users;
1209 
1210 	spin_lock(&con->writequeue_lock);
1211 	kref_get(&msg->ref);
1212 	list_add(&msg->list, &e->msgs);
1213 
1214 	users = --e->users;
1215 	if (users)
1216 		goto out;
1217 
1218 	e->len = DLM_WQ_LENGTH_BYTES(e);
1219 	spin_unlock(&con->writequeue_lock);
1220 
1221 	queue_work(send_workqueue, &con->swork);
1222 	return;
1223 
1224 out:
1225 	spin_unlock(&con->writequeue_lock);
1226 	return;
1227 }
1228 
/* avoid a sparse false positive for connections_srcu: the matching read
 * lock was taken in dlm_lowcomms_new_msg()
 */
#ifndef __CHECKER__
void dlm_lowcomms_commit_msg(struct dlm_msg *msg)
{
	_dlm_lowcomms_commit_msg(msg);
	srcu_read_unlock(&connections_srcu, msg->idx);
	/* drop the reference taken by dlm_lowcomms_new_msg() */
	kref_put(&msg->ref, dlm_msg_release);
}
#endif
1241 
/* Drop the caller's reference on @msg; released via dlm_msg_release()
 * once the last reference is gone.
 */
void dlm_lowcomms_put_msg(struct dlm_msg *msg)
{
	kref_put(&msg->ref, dlm_msg_release);
}
1246 
1247 /* does not held connections_srcu, usage workqueue only */
1248 int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
1249 {
1250 	struct dlm_msg *msg_resend;
1251 	char *ppc;
1252 
1253 	if (msg->retransmit)
1254 		return 1;
1255 
1256 	msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len,
1257 					      GFP_ATOMIC, &ppc, NULL, NULL);
1258 	if (!msg_resend)
1259 		return -ENOMEM;
1260 
1261 	msg->retransmit = true;
1262 	kref_get(&msg->ref);
1263 	msg_resend->orig_msg = msg;
1264 
1265 	memcpy(ppc, msg->ppc, msg->len);
1266 	_dlm_lowcomms_commit_msg(msg_resend);
1267 	dlm_lowcomms_put_msg(msg_resend);
1268 
1269 	return 0;
1270 }
1271 
/* Send a message: drain @con's writequeue over its socket.  Drops the
 * writequeue_lock around each kernel_sendpage() call and re-queues the
 * send work if the socket is not connected yet.
 */
static void send_to_sock(struct connection *con)
{
	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	struct writequeue_entry *e;
	int len, offset, ret;
	int count;

again:
	count = 0;

	mutex_lock(&con->sock_mutex);
	if (con->sock == NULL)
		goto out_connect;

	spin_lock(&con->writequeue_lock);
	for (;;) {
		e = con_next_wq(con);
		if (!e)
			break;

		/* snapshot the entry's window under the lock, then drop the
		 * lock for the (possibly blocking-ish) page send
		 */
		len = e->len;
		offset = e->offset;
		BUG_ON(len == 0 && e->users == 0);
		spin_unlock(&con->writequeue_lock);

		ret = kernel_sendpage(con->sock, e->page, offset, len,
				      msg_flags);
		trace_dlm_send(con->nodeid, ret);
		if (ret == -EAGAIN || ret == 0) {
			if (ret == -EAGAIN &&
			    test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
			    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
				/* Notify TCP that we're limited by the
				 * application window size.
				 */
				set_bit(SOCK_NOSPACE, &con->sock->flags);
				con->sock->sk->sk_write_pending++;
			}
			cond_resched();
			goto out;
		} else if (ret < 0)
			goto out;

		spin_lock(&con->writequeue_lock);
		writequeue_entry_complete(e, ret);

		/* Don't starve people filling buffers */
		if (++count >= MAX_SEND_MSG_COUNT) {
			spin_unlock(&con->writequeue_lock);
			mutex_unlock(&con->sock_mutex);
			cond_resched();
			goto again;
		}
	}
	spin_unlock(&con->writequeue_lock);

out:
	mutex_unlock(&con->sock_mutex);
	return;

out_connect:
	/* no socket yet: let the send worker establish the connection */
	mutex_unlock(&con->sock_mutex);
	queue_work(send_workqueue, &con->swork);
	cond_resched();
}
1338 
1339 static void clean_one_writequeue(struct connection *con)
1340 {
1341 	struct writequeue_entry *e, *safe;
1342 
1343 	spin_lock(&con->writequeue_lock);
1344 	list_for_each_entry_safe(e, safe, &con->writequeue, list) {
1345 		free_entry(e);
1346 	}
1347 	spin_unlock(&con->writequeue_lock);
1348 }
1349 
/* SRCU callback: free a connection once all SRCU readers are done */
static void connection_release(struct rcu_head *rcu)
{
	struct connection *con = container_of(rcu, struct connection, rcu);

	kfree(con->rx_buf);
	kfree(con);
}
1357 
1358 /* Called from recovery when it knows that a node has
1359    left the cluster */
1360 int dlm_lowcomms_close(int nodeid)
1361 {
1362 	struct connection *con;
1363 	int idx;
1364 
1365 	log_print("closing connection to node %d", nodeid);
1366 
1367 	idx = srcu_read_lock(&connections_srcu);
1368 	con = nodeid2con(nodeid, 0);
1369 	if (WARN_ON_ONCE(!con)) {
1370 		srcu_read_unlock(&connections_srcu, idx);
1371 		return -ENOENT;
1372 	}
1373 
1374 	spin_lock(&connections_lock);
1375 	hlist_del_rcu(&con->list);
1376 	spin_unlock(&connections_lock);
1377 
1378 	close_connection(con, true, true, true);
1379 
1380 	clean_one_writequeue(con);
1381 	call_srcu(&connections_srcu, &con->rcu, connection_release);
1382 	if (con->othercon) {
1383 		clean_one_writequeue(con->othercon);
1384 		if (con->othercon)
1385 			call_srcu(&connections_srcu, &con->othercon->rcu, connection_release);
1386 	}
1387 	srcu_read_unlock(&connections_srcu, idx);
1388 
1389 	return 0;
1390 }
1391 
/* Receive workqueue function: clear the pending flag, then drain the
 * connection's socket.
 */
static void process_recv_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, rwork);

	clear_bit(CF_READ_PENDING, &con->flags);
	receive_from_sock(con);
}
1400 
1401 static void process_listen_recv_socket(struct work_struct *work)
1402 {
1403 	int ret;
1404 
1405 	do {
1406 		ret = accept_from_sock(&listen_con);
1407 	} while (!ret);
1408 }
1409 
/* Establish an outgoing connection to con->nodeid using the configured
 * protocol ops; called with con->sock_mutex held (see
 * process_send_sockets()).  On non-fatal errors the connect is retried
 * after a delay, up to MAX_CONNECT_RETRIES.
 */
static void dlm_connect(struct connection *con)
{
	struct sockaddr_storage addr;
	int result, addr_len;
	struct socket *sock;
	unsigned int mark;

	/* Some odd races can cause double-connects, ignore them */
	if (con->retries++ > MAX_CONNECT_RETRIES)
		return;

	if (con->sock) {
		log_print("node %d already connected.", con->nodeid);
		return;
	}

	memset(&addr, 0, sizeof(addr));
	result = nodeid_to_addr(con->nodeid, &addr, NULL,
				dlm_proto_ops->try_new_addr, &mark);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		return;
	}

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family,
				  SOCK_STREAM, dlm_proto_ops->proto, &sock);
	if (result < 0)
		goto socket_err;

	sock_set_mark(sock->sk, mark);
	dlm_proto_ops->sockopts(sock);

	add_sock(sock, con);

	result = dlm_proto_ops->bind(sock);
	if (result < 0)
		goto add_sock_err;

	log_print_ratelimited("connecting to %d", con->nodeid);
	make_sockaddr(&addr, dlm_config.ci_tcp_port, &addr_len);
	result = dlm_proto_ops->connect(con, sock, (struct sockaddr *)&addr,
					addr_len);
	if (result < 0)
		goto add_sock_err;

	return;

add_sock_err:
	dlm_close_sock(&con->sock);

socket_err:
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH &&
	    result != -ENETUNREACH &&
	    result != -ENETDOWN &&
	    result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		log_print("connect %d try %d error %d", con->nodeid,
			  con->retries, result);
		msleep(1000);
		lowcomms_connect_sock(con);
	}
}
1477 
/* Send workqueue function: handle a pending reconnect, establish the
 * socket if necessary, then drain the writequeue.
 */
static void process_send_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, swork);

	/* send work is only ever queued on the primary connection */
	WARN_ON(test_bit(CF_IS_OTHERCON, &con->flags));

	clear_bit(CF_WRITE_PENDING, &con->flags);

	if (test_and_clear_bit(CF_RECONNECT, &con->flags)) {
		close_connection(con, false, false, true);
		dlm_midcomms_unack_msg_resend(con->nodeid);
	}

	if (con->sock == NULL) {
		/* an optional back-off before re-connecting */
		if (test_and_clear_bit(CF_DELAY_CONNECT, &con->flags))
			msleep(1000);

		mutex_lock(&con->sock_mutex);
		dlm_connect(con);
		mutex_unlock(&con->sock_mutex);
	}

	if (!list_empty(&con->writequeue))
		send_to_sock(con);
}
1504 
/* Destroy the receive and send workqueues, if they were created */
static void work_stop(void)
{
	if (recv_workqueue) {
		destroy_workqueue(recv_workqueue);
		recv_workqueue = NULL;
	}

	if (send_workqueue) {
		destroy_workqueue(send_workqueue);
		send_workqueue = NULL;
	}
}
1517 
1518 static int work_start(void)
1519 {
1520 	recv_workqueue = alloc_ordered_workqueue("dlm_recv", WQ_MEM_RECLAIM);
1521 	if (!recv_workqueue) {
1522 		log_print("can't start dlm_recv");
1523 		return -ENOMEM;
1524 	}
1525 
1526 	send_workqueue = alloc_ordered_workqueue("dlm_send", WQ_MEM_RECLAIM);
1527 	if (!send_workqueue) {
1528 		log_print("can't start dlm_send");
1529 		destroy_workqueue(recv_workqueue);
1530 		recv_workqueue = NULL;
1531 		return -ENOMEM;
1532 	}
1533 
1534 	return 0;
1535 }
1536 
/* Stop accepting new connections: restore the socket's original
 * data_ready callback, cancel any pending accept work, then close the
 * listen socket.
 */
void dlm_lowcomms_shutdown(void)
{
	/* stop lowcomms_listen_data_ready calls */
	lock_sock(listen_con.sock->sk);
	listen_con.sock->sk->sk_data_ready = listen_sock.sk_data_ready;
	release_sock(listen_con.sock->sk);

	cancel_work_sync(&listen_con.rwork);
	dlm_close_sock(&listen_con.sock);
}
1547 
1548 void dlm_lowcomms_shutdown_node(int nodeid, bool force)
1549 {
1550 	struct connection *con;
1551 	int idx;
1552 
1553 	idx = srcu_read_lock(&connections_srcu);
1554 	con = nodeid2con(nodeid, 0);
1555 	if (WARN_ON_ONCE(!con)) {
1556 		srcu_read_unlock(&connections_srcu, idx);
1557 		return;
1558 	}
1559 
1560 	flush_work(&con->swork);
1561 	WARN_ON_ONCE(!force && !list_empty(&con->writequeue));
1562 	clean_one_writequeue(con);
1563 	if (con->othercon)
1564 		clean_one_writequeue(con->othercon);
1565 	close_connection(con, true, true, true);
1566 	srcu_read_unlock(&connections_srcu, idx);
1567 }
1568 
/* Mark a connection as closing and detach its socket callback data.
 * Recurses once into con->othercon when @and_other is set.  The pending
 * flags set here are polled by work_flush() to detect quiescence.
 */
static void _stop_conn(struct connection *con, bool and_other)
{
	mutex_lock(&con->sock_mutex);
	set_bit(CF_CLOSE, &con->flags);
	set_bit(CF_READ_PENDING, &con->flags);
	set_bit(CF_WRITE_PENDING, &con->flags);
	if (con->sock && con->sock->sk) {
		lock_sock(con->sock->sk);
		con->sock->sk->sk_user_data = NULL;
		release_sock(con->sock->sk);
	}
	if (con->othercon && and_other)
		_stop_conn(con->othercon, false);
	mutex_unlock(&con->sock_mutex);
}
1584 
/* foreach_conn() callback: stop a connection including its othercon */
static void stop_conn(struct connection *con)
{
	_stop_conn(con, true);
}
1589 
/* foreach_conn() callback: final close of a connection's socket(s) */
static void free_conn(struct connection *con)
{
	close_connection(con, true, true, true);
}
1594 
/* Repeatedly stop every connection and flush both workqueues until all
 * connections (and their othercons) still have their pending flags set,
 * i.e. no work item ran and cleared them in between — the system is
 * quiescent.
 */
static void work_flush(void)
{
	int ok;
	int i;
	struct connection *con;

	do {
		ok = 1;
		foreach_conn(stop_conn);
		if (recv_workqueue)
			flush_workqueue(recv_workqueue);
		if (send_workqueue)
			flush_workqueue(send_workqueue);
		for (i = 0; i < CONN_HASH_SIZE && ok; i++) {
			hlist_for_each_entry_rcu(con, &connection_hash[i],
						 list) {
				ok &= test_bit(CF_READ_PENDING, &con->flags);
				ok &= test_bit(CF_WRITE_PENDING, &con->flags);
				if (con->othercon) {
					ok &= test_bit(CF_READ_PENDING,
						       &con->othercon->flags);
					ok &= test_bit(CF_WRITE_PENDING,
						       &con->othercon->flags);
				}
			}
		}
	} while (!ok);
}
1623 
/* Full teardown: quiesce and close all connections under the SRCU read
 * lock, then destroy the workqueues and forget the protocol ops.
 */
void dlm_lowcomms_stop(void)
{
	int idx;

	idx = srcu_read_lock(&connections_srcu);
	work_flush();
	foreach_conn(free_conn);
	srcu_read_unlock(&connections_srcu, idx);
	work_stop();

	dlm_proto_ops = NULL;
}
1636 
/* Create, configure and start the listening socket for the selected
 * protocol.  Saves the socket's original callbacks in listen_sock before
 * installing our data_ready hook.  Returns 0 or a negative errno.
 */
static int dlm_listen_for_all(void)
{
	struct socket *sock;
	int result;

	log_print("Using %s for communications",
		  dlm_proto_ops->name);

	result = dlm_proto_ops->listen_validate();
	if (result < 0)
		return result;

	result = sock_create_kern(&init_net, dlm_local_addr[0].ss_family,
				  SOCK_STREAM, dlm_proto_ops->proto, &sock);
	if (result < 0) {
		log_print("Can't create comms socket: %d", result);
		return result;
	}

	sock_set_mark(sock->sk, dlm_config.ci_mark);
	dlm_proto_ops->listen_sockopts(sock);

	result = dlm_proto_ops->listen_bind(sock);
	if (result < 0)
		goto out;

	/* remember the original callbacks so shutdown can restore them */
	lock_sock(sock->sk);
	listen_sock.sk_data_ready = sock->sk->sk_data_ready;
	listen_sock.sk_write_space = sock->sk->sk_write_space;
	listen_sock.sk_error_report = sock->sk->sk_error_report;
	listen_sock.sk_state_change = sock->sk->sk_state_change;

	listen_con.sock = sock;

	sock->sk->sk_allocation = GFP_NOFS;
	sock->sk->sk_data_ready = lowcomms_listen_data_ready;
	release_sock(sock->sk);

	/* backlog of 5 pending connections */
	result = sock->ops->listen(sock, 5);
	if (result < 0) {
		dlm_close_sock(&listen_con.sock);
		return result;
	}

	return 0;

out:
	sock_release(sock);
	return result;
}
1687 
1688 static int dlm_tcp_bind(struct socket *sock)
1689 {
1690 	struct sockaddr_storage src_addr;
1691 	int result, addr_len;
1692 
1693 	/* Bind to our cluster-known address connecting to avoid
1694 	 * routing problems.
1695 	 */
1696 	memcpy(&src_addr, &dlm_local_addr[0], sizeof(src_addr));
1697 	make_sockaddr(&src_addr, 0, &addr_len);
1698 
1699 	result = sock->ops->bind(sock, (struct sockaddr *)&src_addr,
1700 				 addr_len);
1701 	if (result < 0) {
1702 		/* This *may* not indicate a critical error */
1703 		log_print("could not bind for connect: %d", result);
1704 	}
1705 
1706 	return 0;
1707 }
1708 
1709 static int dlm_tcp_connect(struct connection *con, struct socket *sock,
1710 			   struct sockaddr *addr, int addr_len)
1711 {
1712 	int ret;
1713 
1714 	ret = sock->ops->connect(sock, addr, addr_len, O_NONBLOCK);
1715 	switch (ret) {
1716 	case -EINPROGRESS:
1717 		fallthrough;
1718 	case 0:
1719 		return 0;
1720 	}
1721 
1722 	return ret;
1723 }
1724 
1725 static int dlm_tcp_listen_validate(void)
1726 {
1727 	/* We don't support multi-homed hosts */
1728 	if (dlm_local_count > 1) {
1729 		log_print("TCP protocol can't handle multi-homed hosts, try SCTP");
1730 		return -EINVAL;
1731 	}
1732 
1733 	return 0;
1734 }
1735 
/* Per-socket TCP options for DLM traffic */
static void dlm_tcp_sockopts(struct socket *sock)
{
	/* Turn off Nagle's algorithm */
	tcp_sock_set_nodelay(sock->sk);
}
1741 
/* Listen-socket options: the usual TCP options plus address reuse */
static void dlm_tcp_listen_sockopts(struct socket *sock)
{
	dlm_tcp_sockopts(sock);
	sock_set_reuseaddr(sock->sk);
}
1747 
/* Bind the TCP listen socket to our first local address and the
 * configured DLM port.
 */
static int dlm_tcp_listen_bind(struct socket *sock)
{
	int addr_len;

	/* Bind to our port */
	make_sockaddr(&dlm_local_addr[0], dlm_config.ci_tcp_port, &addr_len);
	return sock->ops->bind(sock, (struct sockaddr *)&dlm_local_addr[0],
			       addr_len);
}
1757 
/* Protocol operations used when ci_protocol selects TCP */
static const struct dlm_proto_ops dlm_tcp_ops = {
	.name = "TCP",
	.proto = IPPROTO_TCP,
	.connect = dlm_tcp_connect,
	.sockopts = dlm_tcp_sockopts,
	.bind = dlm_tcp_bind,
	.listen_validate = dlm_tcp_listen_validate,
	.listen_sockopts = dlm_tcp_listen_sockopts,
	.listen_bind = dlm_tcp_listen_bind,
};
1768 
/* Bind an outgoing SCTP socket to all local addresses, ephemeral port */
static int dlm_sctp_bind(struct socket *sock)
{
	return sctp_bind_addrs(sock, 0);
}
1773 
1774 static int dlm_sctp_connect(struct connection *con, struct socket *sock,
1775 			    struct sockaddr *addr, int addr_len)
1776 {
1777 	int ret;
1778 
1779 	/*
1780 	 * Make sock->ops->connect() function return in specified time,
1781 	 * since O_NONBLOCK argument in connect() function does not work here,
1782 	 * then, we should restore the default value of this attribute.
1783 	 */
1784 	sock_set_sndtimeo(sock->sk, 5);
1785 	ret = sock->ops->connect(sock, addr, addr_len, 0);
1786 	sock_set_sndtimeo(sock->sk, 0);
1787 	if (ret < 0)
1788 		return ret;
1789 
1790 	if (!test_and_set_bit(CF_CONNECTED, &con->flags))
1791 		log_print("connected to node %d", con->nodeid);
1792 
1793 	return 0;
1794 }
1795 
1796 static int dlm_sctp_listen_validate(void)
1797 {
1798 	if (!IS_ENABLED(CONFIG_IP_SCTP)) {
1799 		log_print("SCTP is not enabled by this kernel");
1800 		return -EOPNOTSUPP;
1801 	}
1802 
1803 	request_module("sctp");
1804 	return 0;
1805 }
1806 
/* Bind the SCTP listen socket to all local addresses on the DLM port */
static int dlm_sctp_bind_listen(struct socket *sock)
{
	return sctp_bind_addrs(sock, dlm_config.ci_tcp_port);
}
1811 
/* Per-socket SCTP options for DLM traffic */
static void dlm_sctp_sockopts(struct socket *sock)
{
	/* Turn off Nagle's algorithm */
	sctp_sock_set_nodelay(sock->sk);
	sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
}
1818 
/* Protocol operations used when ci_protocol selects SCTP */
static const struct dlm_proto_ops dlm_sctp_ops = {
	.name = "SCTP",
	.proto = IPPROTO_SCTP,
	.try_new_addr = true,
	.connect = dlm_sctp_connect,
	.sockopts = dlm_sctp_sockopts,
	.bind = dlm_sctp_bind,
	.listen_validate = dlm_sctp_listen_validate,
	.listen_sockopts = dlm_sctp_sockopts,
	.listen_bind = dlm_sctp_bind_listen,
};
1830 
1831 int dlm_lowcomms_start(void)
1832 {
1833 	int error = -EINVAL;
1834 
1835 	init_local();
1836 	if (!dlm_local_count) {
1837 		error = -ENOTCONN;
1838 		log_print("no local IP address has been set");
1839 		goto fail;
1840 	}
1841 
1842 	error = work_start();
1843 	if (error)
1844 		goto fail;
1845 
1846 	/* Start listening */
1847 	switch (dlm_config.ci_protocol) {
1848 	case DLM_PROTO_TCP:
1849 		dlm_proto_ops = &dlm_tcp_ops;
1850 		break;
1851 	case DLM_PROTO_SCTP:
1852 		dlm_proto_ops = &dlm_sctp_ops;
1853 		break;
1854 	default:
1855 		log_print("Invalid protocol identifier %d set",
1856 			  dlm_config.ci_protocol);
1857 		error = -EINVAL;
1858 		goto fail_proto_ops;
1859 	}
1860 
1861 	error = dlm_listen_for_all();
1862 	if (error)
1863 		goto fail_listen;
1864 
1865 	return 0;
1866 
1867 fail_listen:
1868 	dlm_proto_ops = NULL;
1869 fail_proto_ops:
1870 	work_stop();
1871 fail:
1872 	return error;
1873 }
1874 
/* One-time initialisation of the connection hash table and the listen
 * socket's accept work item.
 */
void dlm_lowcomms_init(void)
{
	int i;

	for (i = 0; i < CONN_HASH_SIZE; i++)
		INIT_HLIST_HEAD(&connection_hash[i]);

	INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
}
1884 
/* Module exit: unhash every remaining connection and schedule its free
 * (and its othercon's) after the SRCU grace period.
 */
void dlm_lowcomms_exit(void)
{
	struct connection *con;
	int i, idx;

	idx = srcu_read_lock(&connections_srcu);
	for (i = 0; i < CONN_HASH_SIZE; i++) {
		hlist_for_each_entry_rcu(con, &connection_hash[i], list) {
			spin_lock(&connections_lock);
			hlist_del_rcu(&con->list);
			spin_unlock(&connections_lock);

			if (con->othercon)
				call_srcu(&connections_srcu, &con->othercon->rcu,
					  connection_release);
			call_srcu(&connections_srcu, &con->rcu, connection_release);
		}
	}
	srcu_read_unlock(&connections_srcu, idx);
}
1905