xref: /openbmc/linux/drivers/block/drbd/drbd_receiver.c (revision 32bc7297d855608fcb13af62a95739a079b4f8e2)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3    drbd_receiver.c
4 
5    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6 
7    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
8    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
9    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10 
11  */
12 
13 
14 #include <linux/module.h>
15 
16 #include <linux/uaccess.h>
17 #include <net/sock.h>
18 
19 #include <linux/drbd.h>
20 #include <linux/fs.h>
21 #include <linux/file.h>
22 #include <linux/in.h>
23 #include <linux/mm.h>
24 #include <linux/memcontrol.h>
25 #include <linux/mm_inline.h>
26 #include <linux/slab.h>
27 #include <uapi/linux/sched/types.h>
28 #include <linux/sched/signal.h>
29 #include <linux/pkt_sched.h>
30 #include <linux/unistd.h>
31 #include <linux/vmalloc.h>
32 #include <linux/random.h>
33 #include <linux/string.h>
34 #include <linux/scatterlist.h>
35 #include <linux/part_stat.h>
36 #include "drbd_int.h"
37 #include "drbd_protocol.h"
38 #include "drbd_req.h"
39 #include "drbd_vli.h"
40 
41 #define PRO_FEATURES (DRBD_FF_TRIM|DRBD_FF_THIN_RESYNC|DRBD_FF_WSAME|DRBD_FF_WZEROES)
42 
43 struct packet_info {
44 	enum drbd_packet cmd;
45 	unsigned int size;
46 	unsigned int vnr;
47 	void *data;
48 };
49 
50 enum finish_epoch {
51 	FE_STILL_LIVE,
52 	FE_DESTROYED,
53 	FE_RECYCLED,
54 };
55 
56 static int drbd_do_features(struct drbd_connection *connection);
57 static int drbd_do_auth(struct drbd_connection *connection);
58 static int drbd_disconnected(struct drbd_peer_device *);
59 static void conn_wait_active_ee_empty(struct drbd_connection *connection);
60 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
61 static int e_end_block(struct drbd_work *, int);
62 
63 
64 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
65 
66 /*
67  * some helper functions to deal with single linked page lists,
68  * page->private being our "next" pointer.
69  */
70 
71 /* If at least n pages are linked at head, get n pages off.
72  * Otherwise, don't modify head, and return NULL.
73  * Locking is the responsibility of the caller.
74  */
75 static struct page *page_chain_del(struct page **head, int n)
76 {
77 	struct page *page;
78 	struct page *tmp;
79 
80 	BUG_ON(!n);
81 	BUG_ON(!head);
82 
83 	page = *head;
84 
85 	if (!page)
86 		return NULL;
87 
88 	while (page) {
89 		tmp = page_chain_next(page);
90 		if (--n == 0)
91 			break; /* found sufficient pages */
92 		if (tmp == NULL)
93 			/* insufficient pages, don't use any of them. */
94 			return NULL;
95 		page = tmp;
96 	}
97 
98 	/* add end of list marker for the returned list */
99 	set_page_private(page, 0);
100 	/* actual return value, and adjustment of head */
101 	page = *head;
102 	*head = tmp;
103 	return page;
104 }
105 
106 /* may be used outside of locks to find the tail of a (usually short)
107  * "private" page chain, before adding it back to a global chain head
108  * with page_chain_add() under a spinlock. */
109 static struct page *page_chain_tail(struct page *page, int *len)
110 {
111 	struct page *tmp;
112 	int i = 1;
113 	while ((tmp = page_chain_next(page))) {
114 		++i;
115 		page = tmp;
116 	}
117 	if (len)
118 		*len = i;
119 	return page;
120 }
121 
122 static int page_chain_free(struct page *page)
123 {
124 	struct page *tmp;
125 	int i = 0;
126 	page_chain_for_each_safe(page, tmp) {
127 		put_page(page);
128 		++i;
129 	}
130 	return i;
131 }
132 
133 static void page_chain_add(struct page **head,
134 		struct page *chain_first, struct page *chain_last)
135 {
136 #if 1
137 	struct page *tmp;
138 	tmp = page_chain_tail(chain_first, NULL);
139 	BUG_ON(tmp != chain_last);
140 #endif
141 
142 	/* add chain to head */
143 	set_page_private(chain_last, (unsigned long)*head);
144 	*head = chain_first;
145 }
146 
147 static struct page *__drbd_alloc_pages(struct drbd_device *device,
148 				       unsigned int number)
149 {
150 	struct page *page = NULL;
151 	struct page *tmp = NULL;
152 	unsigned int i = 0;
153 
154 	/* Yes, testing drbd_pp_vacant outside the lock is racy.
155 	 * So what. It saves a spin_lock. */
156 	if (drbd_pp_vacant >= number) {
157 		spin_lock(&drbd_pp_lock);
158 		page = page_chain_del(&drbd_pp_pool, number);
159 		if (page)
160 			drbd_pp_vacant -= number;
161 		spin_unlock(&drbd_pp_lock);
162 		if (page)
163 			return page;
164 	}
165 
166 	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
167 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
168 	 * which in turn might block on the other node at this very place.  */
169 	for (i = 0; i < number; i++) {
170 		tmp = alloc_page(GFP_TRY);
171 		if (!tmp)
172 			break;
173 		set_page_private(tmp, (unsigned long)page);
174 		page = tmp;
175 	}
176 
177 	if (i == number)
178 		return page;
179 
180 	/* Not enough pages immediately available this time.
181 	 * No need to jump around here, drbd_alloc_pages will retry this
182 	 * function "soon". */
183 	if (page) {
184 		tmp = page_chain_tail(page, NULL);
185 		spin_lock(&drbd_pp_lock);
186 		page_chain_add(&drbd_pp_pool, page, tmp);
187 		drbd_pp_vacant += i;
188 		spin_unlock(&drbd_pp_lock);
189 	}
190 	return NULL;
191 }
192 
193 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
194 					   struct list_head *to_be_freed)
195 {
196 	struct drbd_peer_request *peer_req, *tmp;
197 
198 	/* The EEs are always appended to the end of the list. Since
199 	   they are sent in order over the wire, they have to finish
200 	   in order. As soon as we see the first not finished we can
201 	   stop to examine the list... */
202 
203 	list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
204 		if (drbd_peer_req_has_active_page(peer_req))
205 			break;
206 		list_move(&peer_req->w.list, to_be_freed);
207 	}
208 }
209 
210 static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
211 {
212 	LIST_HEAD(reclaimed);
213 	struct drbd_peer_request *peer_req, *t;
214 
215 	spin_lock_irq(&device->resource->req_lock);
216 	reclaim_finished_net_peer_reqs(device, &reclaimed);
217 	spin_unlock_irq(&device->resource->req_lock);
218 	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
219 		drbd_free_net_peer_req(device, peer_req);
220 }
221 
222 static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
223 {
224 	struct drbd_peer_device *peer_device;
225 	int vnr;
226 
227 	rcu_read_lock();
228 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
229 		struct drbd_device *device = peer_device->device;
230 		if (!atomic_read(&device->pp_in_use_by_net))
231 			continue;
232 
233 		kref_get(&device->kref);
234 		rcu_read_unlock();
235 		drbd_reclaim_net_peer_reqs(device);
236 		kref_put(&device->kref, drbd_destroy_device);
237 		rcu_read_lock();
238 	}
239 	rcu_read_unlock();
240 }
241 
242 /**
243  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
244  * @peer_device:	DRBD device.
245  * @number:		number of pages requested
246  * @retry:		whether to retry, if not enough pages are available right now
247  *
248  * Tries to allocate number pages, first from our own page pool, then from
249  * the kernel.
250  * Possibly retry until DRBD frees sufficient pages somewhere else.
251  *
252  * If this allocation would exceed the max_buffers setting, we throttle
253  * allocation (schedule_timeout) to give the system some room to breathe.
254  *
255  * We do not use max-buffers as hard limit, because it could lead to
256  * congestion and further to a distributed deadlock during online-verify or
257  * (checksum based) resync, if the max-buffers, socket buffer sizes and
258  * resync-rate settings are mis-configured.
259  *
260  * Returns a page chain linked via page->private.
261  */
262 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
263 			      bool retry)
264 {
265 	struct drbd_device *device = peer_device->device;
266 	struct page *page = NULL;
267 	struct net_conf *nc;
268 	DEFINE_WAIT(wait);
269 	unsigned int mxb;
270 
271 	rcu_read_lock();
272 	nc = rcu_dereference(peer_device->connection->net_conf);
273 	mxb = nc ? nc->max_buffers : 1000000;
274 	rcu_read_unlock();
275 
276 	if (atomic_read(&device->pp_in_use) < mxb)
277 		page = __drbd_alloc_pages(device, number);
278 
279 	/* Try to keep the fast path fast, but occasionally we need
280 	 * to reclaim the pages we lended to the network stack. */
281 	if (page && atomic_read(&device->pp_in_use_by_net) > 512)
282 		drbd_reclaim_net_peer_reqs(device);
283 
284 	while (page == NULL) {
285 		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
286 
287 		drbd_reclaim_net_peer_reqs(device);
288 
289 		if (atomic_read(&device->pp_in_use) < mxb) {
290 			page = __drbd_alloc_pages(device, number);
291 			if (page)
292 				break;
293 		}
294 
295 		if (!retry)
296 			break;
297 
298 		if (signal_pending(current)) {
299 			drbd_warn(device, "drbd_alloc_pages interrupted!\n");
300 			break;
301 		}
302 
303 		if (schedule_timeout(HZ/10) == 0)
304 			mxb = UINT_MAX;
305 	}
306 	finish_wait(&drbd_pp_wait, &wait);
307 
308 	if (page)
309 		atomic_add(number, &device->pp_in_use);
310 	return page;
311 }
312 
313 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
314  * Is also used from inside an other spin_lock_irq(&resource->req_lock);
315  * Either links the page chain back to the global pool,
316  * or returns all pages to the system. */
317 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
318 {
319 	atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
320 	int i;
321 
322 	if (page == NULL)
323 		return;
324 
325 	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * drbd_minor_count)
326 		i = page_chain_free(page);
327 	else {
328 		struct page *tmp;
329 		tmp = page_chain_tail(page, &i);
330 		spin_lock(&drbd_pp_lock);
331 		page_chain_add(&drbd_pp_pool, page, tmp);
332 		drbd_pp_vacant += i;
333 		spin_unlock(&drbd_pp_lock);
334 	}
335 	i = atomic_sub_return(i, a);
336 	if (i < 0)
337 		drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
338 			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
339 	wake_up(&drbd_pp_wait);
340 }
341 
342 /*
343 You need to hold the req_lock:
344  _drbd_wait_ee_list_empty()
345 
346 You must not have the req_lock:
347  drbd_free_peer_req()
348  drbd_alloc_peer_req()
349  drbd_free_peer_reqs()
350  drbd_ee_fix_bhs()
351  drbd_finish_peer_reqs()
352  drbd_clear_done_ee()
353  drbd_wait_ee_list_empty()
354 */
355 
356 /* normal: payload_size == request size (bi_size)
357  * w_same: payload_size == logical_block_size
358  * trim: payload_size == 0 */
359 struct drbd_peer_request *
360 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
361 		    unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local)
362 {
363 	struct drbd_device *device = peer_device->device;
364 	struct drbd_peer_request *peer_req;
365 	struct page *page = NULL;
366 	unsigned int nr_pages = PFN_UP(payload_size);
367 
368 	if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
369 		return NULL;
370 
371 	peer_req = mempool_alloc(&drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
372 	if (!peer_req) {
373 		if (!(gfp_mask & __GFP_NOWARN))
374 			drbd_err(device, "%s: allocation failed\n", __func__);
375 		return NULL;
376 	}
377 
378 	if (nr_pages) {
379 		page = drbd_alloc_pages(peer_device, nr_pages,
380 					gfpflags_allow_blocking(gfp_mask));
381 		if (!page)
382 			goto fail;
383 	}
384 
385 	memset(peer_req, 0, sizeof(*peer_req));
386 	INIT_LIST_HEAD(&peer_req->w.list);
387 	drbd_clear_interval(&peer_req->i);
388 	peer_req->i.size = request_size;
389 	peer_req->i.sector = sector;
390 	peer_req->submit_jif = jiffies;
391 	peer_req->peer_device = peer_device;
392 	peer_req->pages = page;
393 	/*
394 	 * The block_id is opaque to the receiver.  It is not endianness
395 	 * converted, and sent back to the sender unchanged.
396 	 */
397 	peer_req->block_id = id;
398 
399 	return peer_req;
400 
401  fail:
402 	mempool_free(peer_req, &drbd_ee_mempool);
403 	return NULL;
404 }
405 
406 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
407 		       int is_net)
408 {
409 	might_sleep();
410 	if (peer_req->flags & EE_HAS_DIGEST)
411 		kfree(peer_req->digest);
412 	drbd_free_pages(device, peer_req->pages, is_net);
413 	D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
414 	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
415 	if (!expect(device, !(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
416 		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
417 		drbd_al_complete_io(device, &peer_req->i);
418 	}
419 	mempool_free(peer_req, &drbd_ee_mempool);
420 }
421 
422 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
423 {
424 	LIST_HEAD(work_list);
425 	struct drbd_peer_request *peer_req, *t;
426 	int count = 0;
427 	int is_net = list == &device->net_ee;
428 
429 	spin_lock_irq(&device->resource->req_lock);
430 	list_splice_init(list, &work_list);
431 	spin_unlock_irq(&device->resource->req_lock);
432 
433 	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
434 		__drbd_free_peer_req(device, peer_req, is_net);
435 		count++;
436 	}
437 	return count;
438 }
439 
440 /*
441  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
442  */
443 static int drbd_finish_peer_reqs(struct drbd_device *device)
444 {
445 	LIST_HEAD(work_list);
446 	LIST_HEAD(reclaimed);
447 	struct drbd_peer_request *peer_req, *t;
448 	int err = 0;
449 
450 	spin_lock_irq(&device->resource->req_lock);
451 	reclaim_finished_net_peer_reqs(device, &reclaimed);
452 	list_splice_init(&device->done_ee, &work_list);
453 	spin_unlock_irq(&device->resource->req_lock);
454 
455 	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
456 		drbd_free_net_peer_req(device, peer_req);
457 
458 	/* possible callbacks here:
459 	 * e_end_block, and e_end_resync_block, e_send_superseded.
460 	 * all ignore the last argument.
461 	 */
462 	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
463 		int err2;
464 
465 		/* list_del not necessary, next/prev members not touched */
466 		err2 = peer_req->w.cb(&peer_req->w, !!err);
467 		if (!err)
468 			err = err2;
469 		drbd_free_peer_req(device, peer_req);
470 	}
471 	wake_up(&device->ee_wait);
472 
473 	return err;
474 }
475 
476 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
477 				     struct list_head *head)
478 {
479 	DEFINE_WAIT(wait);
480 
481 	/* avoids spin_lock/unlock
482 	 * and calling prepare_to_wait in the fast path */
483 	while (!list_empty(head)) {
484 		prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
485 		spin_unlock_irq(&device->resource->req_lock);
486 		io_schedule();
487 		finish_wait(&device->ee_wait, &wait);
488 		spin_lock_irq(&device->resource->req_lock);
489 	}
490 }
491 
492 static void drbd_wait_ee_list_empty(struct drbd_device *device,
493 				    struct list_head *head)
494 {
495 	spin_lock_irq(&device->resource->req_lock);
496 	_drbd_wait_ee_list_empty(device, head);
497 	spin_unlock_irq(&device->resource->req_lock);
498 }
499 
500 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
501 {
502 	struct kvec iov = {
503 		.iov_base = buf,
504 		.iov_len = size,
505 	};
506 	struct msghdr msg = {
507 		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
508 	};
509 	iov_iter_kvec(&msg.msg_iter, ITER_DEST, &iov, 1, size);
510 	return sock_recvmsg(sock, &msg, msg.msg_flags);
511 }
512 
513 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
514 {
515 	int rv;
516 
517 	rv = drbd_recv_short(connection->data.socket, buf, size, 0);
518 
519 	if (rv < 0) {
520 		if (rv == -ECONNRESET)
521 			drbd_info(connection, "sock was reset by peer\n");
522 		else if (rv != -ERESTARTSYS)
523 			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
524 	} else if (rv == 0) {
525 		if (test_bit(DISCONNECT_SENT, &connection->flags)) {
526 			long t;
527 			rcu_read_lock();
528 			t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
529 			rcu_read_unlock();
530 
531 			t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
532 
533 			if (t)
534 				goto out;
535 		}
536 		drbd_info(connection, "sock was shut down by peer\n");
537 	}
538 
539 	if (rv != size)
540 		conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
541 
542 out:
543 	return rv;
544 }
545 
546 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
547 {
548 	int err;
549 
550 	err = drbd_recv(connection, buf, size);
551 	if (err != size) {
552 		if (err >= 0)
553 			err = -EIO;
554 	} else
555 		err = 0;
556 	return err;
557 }
558 
559 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
560 {
561 	int err;
562 
563 	err = drbd_recv_all(connection, buf, size);
564 	if (err && !signal_pending(current))
565 		drbd_warn(connection, "short read (expected size %d)\n", (int)size);
566 	return err;
567 }
568 
569 /* quoting tcp(7):
570  *   On individual connections, the socket buffer size must be set prior to the
571  *   listen(2) or connect(2) calls in order to have it take effect.
572  * This is our wrapper to do so.
573  */
574 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
575 		unsigned int rcv)
576 {
577 	/* open coded SO_SNDBUF, SO_RCVBUF */
578 	if (snd) {
579 		sock->sk->sk_sndbuf = snd;
580 		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
581 	}
582 	if (rcv) {
583 		sock->sk->sk_rcvbuf = rcv;
584 		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
585 	}
586 }
587 
588 static struct socket *drbd_try_connect(struct drbd_connection *connection)
589 {
590 	const char *what;
591 	struct socket *sock;
592 	struct sockaddr_in6 src_in6;
593 	struct sockaddr_in6 peer_in6;
594 	struct net_conf *nc;
595 	int err, peer_addr_len, my_addr_len;
596 	int sndbuf_size, rcvbuf_size, connect_int;
597 	int disconnect_on_error = 1;
598 
599 	rcu_read_lock();
600 	nc = rcu_dereference(connection->net_conf);
601 	if (!nc) {
602 		rcu_read_unlock();
603 		return NULL;
604 	}
605 	sndbuf_size = nc->sndbuf_size;
606 	rcvbuf_size = nc->rcvbuf_size;
607 	connect_int = nc->connect_int;
608 	rcu_read_unlock();
609 
610 	my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
611 	memcpy(&src_in6, &connection->my_addr, my_addr_len);
612 
613 	if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
614 		src_in6.sin6_port = 0;
615 	else
616 		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
617 
618 	peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
619 	memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
620 
621 	what = "sock_create_kern";
622 	err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
623 			       SOCK_STREAM, IPPROTO_TCP, &sock);
624 	if (err < 0) {
625 		sock = NULL;
626 		goto out;
627 	}
628 
629 	sock->sk->sk_rcvtimeo =
630 	sock->sk->sk_sndtimeo = connect_int * HZ;
631 	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
632 
633        /* explicitly bind to the configured IP as source IP
634 	*  for the outgoing connections.
635 	*  This is needed for multihomed hosts and to be
636 	*  able to use lo: interfaces for drbd.
637 	* Make sure to use 0 as port number, so linux selects
638 	*  a free one dynamically.
639 	*/
640 	what = "bind before connect";
641 	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
642 	if (err < 0)
643 		goto out;
644 
645 	/* connect may fail, peer not yet available.
646 	 * stay C_WF_CONNECTION, don't go Disconnecting! */
647 	disconnect_on_error = 0;
648 	what = "connect";
649 	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
650 
651 out:
652 	if (err < 0) {
653 		if (sock) {
654 			sock_release(sock);
655 			sock = NULL;
656 		}
657 		switch (-err) {
658 			/* timeout, busy, signal pending */
659 		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
660 		case EINTR: case ERESTARTSYS:
661 			/* peer not (yet) available, network problem */
662 		case ECONNREFUSED: case ENETUNREACH:
663 		case EHOSTDOWN:    case EHOSTUNREACH:
664 			disconnect_on_error = 0;
665 			break;
666 		default:
667 			drbd_err(connection, "%s failed, err = %d\n", what, err);
668 		}
669 		if (disconnect_on_error)
670 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
671 	}
672 
673 	return sock;
674 }
675 
676 struct accept_wait_data {
677 	struct drbd_connection *connection;
678 	struct socket *s_listen;
679 	struct completion door_bell;
680 	void (*original_sk_state_change)(struct sock *sk);
681 
682 };
683 
684 static void drbd_incoming_connection(struct sock *sk)
685 {
686 	struct accept_wait_data *ad = sk->sk_user_data;
687 	void (*state_change)(struct sock *sk);
688 
689 	state_change = ad->original_sk_state_change;
690 	if (sk->sk_state == TCP_ESTABLISHED)
691 		complete(&ad->door_bell);
692 	state_change(sk);
693 }
694 
695 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
696 {
697 	int err, sndbuf_size, rcvbuf_size, my_addr_len;
698 	struct sockaddr_in6 my_addr;
699 	struct socket *s_listen;
700 	struct net_conf *nc;
701 	const char *what;
702 
703 	rcu_read_lock();
704 	nc = rcu_dereference(connection->net_conf);
705 	if (!nc) {
706 		rcu_read_unlock();
707 		return -EIO;
708 	}
709 	sndbuf_size = nc->sndbuf_size;
710 	rcvbuf_size = nc->rcvbuf_size;
711 	rcu_read_unlock();
712 
713 	my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
714 	memcpy(&my_addr, &connection->my_addr, my_addr_len);
715 
716 	what = "sock_create_kern";
717 	err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
718 			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
719 	if (err) {
720 		s_listen = NULL;
721 		goto out;
722 	}
723 
724 	s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
725 	drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
726 
727 	what = "bind before listen";
728 	err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
729 	if (err < 0)
730 		goto out;
731 
732 	ad->s_listen = s_listen;
733 	write_lock_bh(&s_listen->sk->sk_callback_lock);
734 	ad->original_sk_state_change = s_listen->sk->sk_state_change;
735 	s_listen->sk->sk_state_change = drbd_incoming_connection;
736 	s_listen->sk->sk_user_data = ad;
737 	write_unlock_bh(&s_listen->sk->sk_callback_lock);
738 
739 	what = "listen";
740 	err = s_listen->ops->listen(s_listen, 5);
741 	if (err < 0)
742 		goto out;
743 
744 	return 0;
745 out:
746 	if (s_listen)
747 		sock_release(s_listen);
748 	if (err < 0) {
749 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
750 			drbd_err(connection, "%s failed, err = %d\n", what, err);
751 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
752 		}
753 	}
754 
755 	return -EIO;
756 }
757 
758 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
759 {
760 	write_lock_bh(&sk->sk_callback_lock);
761 	sk->sk_state_change = ad->original_sk_state_change;
762 	sk->sk_user_data = NULL;
763 	write_unlock_bh(&sk->sk_callback_lock);
764 }
765 
766 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
767 {
768 	int timeo, connect_int, err = 0;
769 	struct socket *s_estab = NULL;
770 	struct net_conf *nc;
771 
772 	rcu_read_lock();
773 	nc = rcu_dereference(connection->net_conf);
774 	if (!nc) {
775 		rcu_read_unlock();
776 		return NULL;
777 	}
778 	connect_int = nc->connect_int;
779 	rcu_read_unlock();
780 
781 	timeo = connect_int * HZ;
782 	/* 28.5% random jitter */
783 	timeo += get_random_u32_below(2) ? timeo / 7 : -timeo / 7;
784 
785 	err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
786 	if (err <= 0)
787 		return NULL;
788 
789 	err = kernel_accept(ad->s_listen, &s_estab, 0);
790 	if (err < 0) {
791 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
792 			drbd_err(connection, "accept failed, err = %d\n", err);
793 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
794 		}
795 	}
796 
797 	if (s_estab)
798 		unregister_state_change(s_estab->sk, ad);
799 
800 	return s_estab;
801 }
802 
803 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
804 
805 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
806 			     enum drbd_packet cmd)
807 {
808 	if (!conn_prepare_command(connection, sock))
809 		return -EIO;
810 	return conn_send_command(connection, sock, cmd, 0, NULL, 0);
811 }
812 
813 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
814 {
815 	unsigned int header_size = drbd_header_size(connection);
816 	struct packet_info pi;
817 	struct net_conf *nc;
818 	int err;
819 
820 	rcu_read_lock();
821 	nc = rcu_dereference(connection->net_conf);
822 	if (!nc) {
823 		rcu_read_unlock();
824 		return -EIO;
825 	}
826 	sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
827 	rcu_read_unlock();
828 
829 	err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
830 	if (err != header_size) {
831 		if (err >= 0)
832 			err = -EIO;
833 		return err;
834 	}
835 	err = decode_header(connection, connection->data.rbuf, &pi);
836 	if (err)
837 		return err;
838 	return pi.cmd;
839 }
840 
841 /**
842  * drbd_socket_okay() - Free the socket if its connection is not okay
843  * @sock:	pointer to the pointer to the socket.
844  */
845 static bool drbd_socket_okay(struct socket **sock)
846 {
847 	int rr;
848 	char tb[4];
849 
850 	if (!*sock)
851 		return false;
852 
853 	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
854 
855 	if (rr > 0 || rr == -EAGAIN) {
856 		return true;
857 	} else {
858 		sock_release(*sock);
859 		*sock = NULL;
860 		return false;
861 	}
862 }
863 
864 static bool connection_established(struct drbd_connection *connection,
865 				   struct socket **sock1,
866 				   struct socket **sock2)
867 {
868 	struct net_conf *nc;
869 	int timeout;
870 	bool ok;
871 
872 	if (!*sock1 || !*sock2)
873 		return false;
874 
875 	rcu_read_lock();
876 	nc = rcu_dereference(connection->net_conf);
877 	timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
878 	rcu_read_unlock();
879 	schedule_timeout_interruptible(timeout);
880 
881 	ok = drbd_socket_okay(sock1);
882 	ok = drbd_socket_okay(sock2) && ok;
883 
884 	return ok;
885 }
886 
887 /* Gets called if a connection is established, or if a new minor gets created
888    in a connection */
889 int drbd_connected(struct drbd_peer_device *peer_device)
890 {
891 	struct drbd_device *device = peer_device->device;
892 	int err;
893 
894 	atomic_set(&device->packet_seq, 0);
895 	device->peer_seq = 0;
896 
897 	device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
898 		&peer_device->connection->cstate_mutex :
899 		&device->own_state_mutex;
900 
901 	err = drbd_send_sync_param(peer_device);
902 	if (!err)
903 		err = drbd_send_sizes(peer_device, 0, 0);
904 	if (!err)
905 		err = drbd_send_uuids(peer_device);
906 	if (!err)
907 		err = drbd_send_current_state(peer_device);
908 	clear_bit(USE_DEGR_WFC_T, &device->flags);
909 	clear_bit(RESIZE_PENDING, &device->flags);
910 	atomic_set(&device->ap_in_flight, 0);
911 	mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
912 	return err;
913 }
914 
915 /*
916  * return values:
917  *   1 yes, we have a valid connection
918  *   0 oops, did not work out, please try again
919  *  -1 peer talks different language,
920  *     no point in trying again, please go standalone.
921  *  -2 We do not have a network config...
922  */
923 static int conn_connect(struct drbd_connection *connection)
924 {
925 	struct drbd_socket sock, msock;
926 	struct drbd_peer_device *peer_device;
927 	struct net_conf *nc;
928 	int vnr, timeout, h;
929 	bool discard_my_data, ok;
930 	enum drbd_state_rv rv;
931 	struct accept_wait_data ad = {
932 		.connection = connection,
933 		.door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
934 	};
935 
936 	clear_bit(DISCONNECT_SENT, &connection->flags);
937 	if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
938 		return -2;
939 
940 	mutex_init(&sock.mutex);
941 	sock.sbuf = connection->data.sbuf;
942 	sock.rbuf = connection->data.rbuf;
943 	sock.socket = NULL;
944 	mutex_init(&msock.mutex);
945 	msock.sbuf = connection->meta.sbuf;
946 	msock.rbuf = connection->meta.rbuf;
947 	msock.socket = NULL;
948 
949 	/* Assume that the peer only understands protocol 80 until we know better.  */
950 	connection->agreed_pro_version = 80;
951 
952 	if (prepare_listen_socket(connection, &ad))
953 		return 0;
954 
955 	do {
956 		struct socket *s;
957 
958 		s = drbd_try_connect(connection);
959 		if (s) {
960 			if (!sock.socket) {
961 				sock.socket = s;
962 				send_first_packet(connection, &sock, P_INITIAL_DATA);
963 			} else if (!msock.socket) {
964 				clear_bit(RESOLVE_CONFLICTS, &connection->flags);
965 				msock.socket = s;
966 				send_first_packet(connection, &msock, P_INITIAL_META);
967 			} else {
968 				drbd_err(connection, "Logic error in conn_connect()\n");
969 				goto out_release_sockets;
970 			}
971 		}
972 
973 		if (connection_established(connection, &sock.socket, &msock.socket))
974 			break;
975 
976 retry:
977 		s = drbd_wait_for_connect(connection, &ad);
978 		if (s) {
979 			int fp = receive_first_packet(connection, s);
980 			drbd_socket_okay(&sock.socket);
981 			drbd_socket_okay(&msock.socket);
982 			switch (fp) {
983 			case P_INITIAL_DATA:
984 				if (sock.socket) {
985 					drbd_warn(connection, "initial packet S crossed\n");
986 					sock_release(sock.socket);
987 					sock.socket = s;
988 					goto randomize;
989 				}
990 				sock.socket = s;
991 				break;
992 			case P_INITIAL_META:
993 				set_bit(RESOLVE_CONFLICTS, &connection->flags);
994 				if (msock.socket) {
995 					drbd_warn(connection, "initial packet M crossed\n");
996 					sock_release(msock.socket);
997 					msock.socket = s;
998 					goto randomize;
999 				}
1000 				msock.socket = s;
1001 				break;
1002 			default:
1003 				drbd_warn(connection, "Error receiving initial packet\n");
1004 				sock_release(s);
1005 randomize:
1006 				if (get_random_u32_below(2))
1007 					goto retry;
1008 			}
1009 		}
1010 
1011 		if (connection->cstate <= C_DISCONNECTING)
1012 			goto out_release_sockets;
1013 		if (signal_pending(current)) {
1014 			flush_signals(current);
1015 			smp_rmb();
1016 			if (get_t_state(&connection->receiver) == EXITING)
1017 				goto out_release_sockets;
1018 		}
1019 
1020 		ok = connection_established(connection, &sock.socket, &msock.socket);
1021 	} while (!ok);
1022 
1023 	if (ad.s_listen)
1024 		sock_release(ad.s_listen);
1025 
1026 	sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1027 	msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1028 
1029 	sock.socket->sk->sk_allocation = GFP_NOIO;
1030 	msock.socket->sk->sk_allocation = GFP_NOIO;
1031 
1032 	sock.socket->sk->sk_use_task_frag = false;
1033 	msock.socket->sk->sk_use_task_frag = false;
1034 
1035 	sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1036 	msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1037 
1038 	/* NOT YET ...
1039 	 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1040 	 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1041 	 * first set it to the P_CONNECTION_FEATURES timeout,
1042 	 * which we set to 4x the configured ping_timeout. */
1043 	rcu_read_lock();
1044 	nc = rcu_dereference(connection->net_conf);
1045 
1046 	sock.socket->sk->sk_sndtimeo =
1047 	sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1048 
1049 	msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1050 	timeout = nc->timeout * HZ / 10;
1051 	discard_my_data = nc->discard_my_data;
1052 	rcu_read_unlock();
1053 
1054 	msock.socket->sk->sk_sndtimeo = timeout;
1055 
1056 	/* we don't want delays.
1057 	 * we use TCP_CORK where appropriate, though */
1058 	tcp_sock_set_nodelay(sock.socket->sk);
1059 	tcp_sock_set_nodelay(msock.socket->sk);
1060 
1061 	connection->data.socket = sock.socket;
1062 	connection->meta.socket = msock.socket;
1063 	connection->last_received = jiffies;
1064 
1065 	h = drbd_do_features(connection);
1066 	if (h <= 0)
1067 		return h;
1068 
1069 	if (connection->cram_hmac_tfm) {
1070 		/* drbd_request_state(device, NS(conn, WFAuth)); */
1071 		switch (drbd_do_auth(connection)) {
1072 		case -1:
1073 			drbd_err(connection, "Authentication of peer failed\n");
1074 			return -1;
1075 		case 0:
1076 			drbd_err(connection, "Authentication of peer failed, trying again.\n");
1077 			return 0;
1078 		}
1079 	}
1080 
1081 	connection->data.socket->sk->sk_sndtimeo = timeout;
1082 	connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1083 
1084 	if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1085 		return -1;
1086 
1087 	/* Prevent a race between resync-handshake and
1088 	 * being promoted to Primary.
1089 	 *
1090 	 * Grab and release the state mutex, so we know that any current
1091 	 * drbd_set_role() is finished, and any incoming drbd_set_role
1092 	 * will see the STATE_SENT flag, and wait for it to be cleared.
1093 	 */
1094 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1095 		mutex_lock(peer_device->device->state_mutex);
1096 
1097 	/* avoid a race with conn_request_state( C_DISCONNECTING ) */
1098 	spin_lock_irq(&connection->resource->req_lock);
1099 	set_bit(STATE_SENT, &connection->flags);
1100 	spin_unlock_irq(&connection->resource->req_lock);
1101 
1102 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1103 		mutex_unlock(peer_device->device->state_mutex);
1104 
1105 	rcu_read_lock();
1106 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1107 		struct drbd_device *device = peer_device->device;
1108 		kref_get(&device->kref);
1109 		rcu_read_unlock();
1110 
1111 		if (discard_my_data)
1112 			set_bit(DISCARD_MY_DATA, &device->flags);
1113 		else
1114 			clear_bit(DISCARD_MY_DATA, &device->flags);
1115 
1116 		drbd_connected(peer_device);
1117 		kref_put(&device->kref, drbd_destroy_device);
1118 		rcu_read_lock();
1119 	}
1120 	rcu_read_unlock();
1121 
1122 	rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1123 	if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1124 		clear_bit(STATE_SENT, &connection->flags);
1125 		return 0;
1126 	}
1127 
1128 	drbd_thread_start(&connection->ack_receiver);
1129 	/* opencoded create_singlethread_workqueue(),
1130 	 * to be able to use format string arguments */
1131 	connection->ack_sender =
1132 		alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
1133 	if (!connection->ack_sender) {
1134 		drbd_err(connection, "Failed to create workqueue ack_sender\n");
1135 		return 0;
1136 	}
1137 
1138 	mutex_lock(&connection->resource->conf_update);
1139 	/* The discard_my_data flag is a single-shot modifier to the next
1140 	 * connection attempt, the handshake of which is now well underway.
1141 	 * No need for rcu style copying of the whole struct
1142 	 * just to clear a single value. */
1143 	connection->net_conf->discard_my_data = 0;
1144 	mutex_unlock(&connection->resource->conf_update);
1145 
1146 	return h;
1147 
1148 out_release_sockets:
1149 	if (ad.s_listen)
1150 		sock_release(ad.s_listen);
1151 	if (sock.socket)
1152 		sock_release(sock.socket);
1153 	if (msock.socket)
1154 		sock_release(msock.socket);
1155 	return -1;
1156 }
1157 
1158 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1159 {
1160 	unsigned int header_size = drbd_header_size(connection);
1161 
1162 	if (header_size == sizeof(struct p_header100) &&
1163 	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1164 		struct p_header100 *h = header;
1165 		if (h->pad != 0) {
1166 			drbd_err(connection, "Header padding is not zero\n");
1167 			return -EINVAL;
1168 		}
1169 		pi->vnr = be16_to_cpu(h->volume);
1170 		pi->cmd = be16_to_cpu(h->command);
1171 		pi->size = be32_to_cpu(h->length);
1172 	} else if (header_size == sizeof(struct p_header95) &&
1173 		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1174 		struct p_header95 *h = header;
1175 		pi->cmd = be16_to_cpu(h->command);
1176 		pi->size = be32_to_cpu(h->length);
1177 		pi->vnr = 0;
1178 	} else if (header_size == sizeof(struct p_header80) &&
1179 		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1180 		struct p_header80 *h = header;
1181 		pi->cmd = be16_to_cpu(h->command);
1182 		pi->size = be16_to_cpu(h->length);
1183 		pi->vnr = 0;
1184 	} else {
1185 		drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1186 			 be32_to_cpu(*(__be32 *)header),
1187 			 connection->agreed_pro_version);
1188 		return -EINVAL;
1189 	}
1190 	pi->data = header + header_size;
1191 	return 0;
1192 }
1193 
1194 static void drbd_unplug_all_devices(struct drbd_connection *connection)
1195 {
1196 	if (current->plug == &connection->receiver_plug) {
1197 		blk_finish_plug(&connection->receiver_plug);
1198 		blk_start_plug(&connection->receiver_plug);
1199 	} /* else: maybe just schedule() ?? */
1200 }
1201 
1202 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1203 {
1204 	void *buffer = connection->data.rbuf;
1205 	int err;
1206 
1207 	err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1208 	if (err)
1209 		return err;
1210 
1211 	err = decode_header(connection, buffer, pi);
1212 	connection->last_received = jiffies;
1213 
1214 	return err;
1215 }
1216 
1217 static int drbd_recv_header_maybe_unplug(struct drbd_connection *connection, struct packet_info *pi)
1218 {
1219 	void *buffer = connection->data.rbuf;
1220 	unsigned int size = drbd_header_size(connection);
1221 	int err;
1222 
1223 	err = drbd_recv_short(connection->data.socket, buffer, size, MSG_NOSIGNAL|MSG_DONTWAIT);
1224 	if (err != size) {
1225 		/* If we have nothing in the receive buffer now, to reduce
1226 		 * application latency, try to drain the backend queues as
1227 		 * quickly as possible, and let remote TCP know what we have
1228 		 * received so far. */
1229 		if (err == -EAGAIN) {
1230 			tcp_sock_set_quickack(connection->data.socket->sk, 2);
1231 			drbd_unplug_all_devices(connection);
1232 		}
1233 		if (err > 0) {
1234 			buffer += err;
1235 			size -= err;
1236 		}
1237 		err = drbd_recv_all_warn(connection, buffer, size);
1238 		if (err)
1239 			return err;
1240 	}
1241 
1242 	err = decode_header(connection, connection->data.rbuf, pi);
1243 	connection->last_received = jiffies;
1244 
1245 	return err;
1246 }
1247 /* This is blkdev_issue_flush, but asynchronous.
1248  * We want to submit to all component volumes in parallel,
1249  * then wait for all completions.
1250  */
1251 struct issue_flush_context {
1252 	atomic_t pending;
1253 	int error;
1254 	struct completion done;
1255 };
1256 struct one_flush_context {
1257 	struct drbd_device *device;
1258 	struct issue_flush_context *ctx;
1259 };
1260 
1261 static void one_flush_endio(struct bio *bio)
1262 {
1263 	struct one_flush_context *octx = bio->bi_private;
1264 	struct drbd_device *device = octx->device;
1265 	struct issue_flush_context *ctx = octx->ctx;
1266 
1267 	if (bio->bi_status) {
1268 		ctx->error = blk_status_to_errno(bio->bi_status);
1269 		drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_status);
1270 	}
1271 	kfree(octx);
1272 	bio_put(bio);
1273 
1274 	clear_bit(FLUSH_PENDING, &device->flags);
1275 	put_ldev(device);
1276 	kref_put(&device->kref, drbd_destroy_device);
1277 
1278 	if (atomic_dec_and_test(&ctx->pending))
1279 		complete(&ctx->done);
1280 }
1281 
1282 static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
1283 {
1284 	struct bio *bio = bio_alloc(device->ldev->backing_bdev, 0,
1285 				    REQ_OP_WRITE | REQ_PREFLUSH, GFP_NOIO);
1286 	struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);
1287 
1288 	if (!octx) {
1289 		drbd_warn(device, "Could not allocate a octx, CANNOT ISSUE FLUSH\n");
1290 		/* FIXME: what else can I do now?  disconnecting or detaching
1291 		 * really does not help to improve the state of the world, either.
1292 		 */
1293 		bio_put(bio);
1294 
1295 		ctx->error = -ENOMEM;
1296 		put_ldev(device);
1297 		kref_put(&device->kref, drbd_destroy_device);
1298 		return;
1299 	}
1300 
1301 	octx->device = device;
1302 	octx->ctx = ctx;
1303 	bio->bi_private = octx;
1304 	bio->bi_end_io = one_flush_endio;
1305 
1306 	device->flush_jif = jiffies;
1307 	set_bit(FLUSH_PENDING, &device->flags);
1308 	atomic_inc(&ctx->pending);
1309 	submit_bio(bio);
1310 }
1311 
1312 static void drbd_flush(struct drbd_connection *connection)
1313 {
1314 	if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1315 		struct drbd_peer_device *peer_device;
1316 		struct issue_flush_context ctx;
1317 		int vnr;
1318 
1319 		atomic_set(&ctx.pending, 1);
1320 		ctx.error = 0;
1321 		init_completion(&ctx.done);
1322 
1323 		rcu_read_lock();
1324 		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1325 			struct drbd_device *device = peer_device->device;
1326 
1327 			if (!get_ldev(device))
1328 				continue;
1329 			kref_get(&device->kref);
1330 			rcu_read_unlock();
1331 
1332 			submit_one_flush(device, &ctx);
1333 
1334 			rcu_read_lock();
1335 		}
1336 		rcu_read_unlock();
1337 
1338 		/* Do we want to add a timeout,
1339 		 * if disk-timeout is set? */
1340 		if (!atomic_dec_and_test(&ctx.pending))
1341 			wait_for_completion(&ctx.done);
1342 
1343 		if (ctx.error) {
1344 			/* would rather check on EOPNOTSUPP, but that is not reliable.
1345 			 * don't try again for ANY return value != 0
1346 			 * if (rv == -EOPNOTSUPP) */
1347 			/* Any error is already reported by bio_endio callback. */
1348 			drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1349 		}
1350 	}
1351 }
1352 
1353 /**
1354  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1355  * @connection:	DRBD connection.
1356  * @epoch:	Epoch object.
1357  * @ev:		Epoch event.
1358  */
1359 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1360 					       struct drbd_epoch *epoch,
1361 					       enum epoch_event ev)
1362 {
1363 	int epoch_size;
1364 	struct drbd_epoch *next_epoch;
1365 	enum finish_epoch rv = FE_STILL_LIVE;
1366 
1367 	spin_lock(&connection->epoch_lock);
1368 	do {
1369 		next_epoch = NULL;
1370 
1371 		epoch_size = atomic_read(&epoch->epoch_size);
1372 
1373 		switch (ev & ~EV_CLEANUP) {
1374 		case EV_PUT:
1375 			atomic_dec(&epoch->active);
1376 			break;
1377 		case EV_GOT_BARRIER_NR:
1378 			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1379 			break;
1380 		case EV_BECAME_LAST:
1381 			/* nothing to do*/
1382 			break;
1383 		}
1384 
1385 		if (epoch_size != 0 &&
1386 		    atomic_read(&epoch->active) == 0 &&
1387 		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1388 			if (!(ev & EV_CLEANUP)) {
1389 				spin_unlock(&connection->epoch_lock);
1390 				drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1391 				spin_lock(&connection->epoch_lock);
1392 			}
1393 #if 0
1394 			/* FIXME: dec unacked on connection, once we have
1395 			 * something to count pending connection packets in. */
1396 			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1397 				dec_unacked(epoch->connection);
1398 #endif
1399 
1400 			if (connection->current_epoch != epoch) {
1401 				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1402 				list_del(&epoch->list);
1403 				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1404 				connection->epochs--;
1405 				kfree(epoch);
1406 
1407 				if (rv == FE_STILL_LIVE)
1408 					rv = FE_DESTROYED;
1409 			} else {
1410 				epoch->flags = 0;
1411 				atomic_set(&epoch->epoch_size, 0);
1412 				/* atomic_set(&epoch->active, 0); is already zero */
1413 				if (rv == FE_STILL_LIVE)
1414 					rv = FE_RECYCLED;
1415 			}
1416 		}
1417 
1418 		if (!next_epoch)
1419 			break;
1420 
1421 		epoch = next_epoch;
1422 	} while (1);
1423 
1424 	spin_unlock(&connection->epoch_lock);
1425 
1426 	return rv;
1427 }
1428 
1429 static enum write_ordering_e
1430 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1431 {
1432 	struct disk_conf *dc;
1433 
1434 	dc = rcu_dereference(bdev->disk_conf);
1435 
1436 	if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1437 		wo = WO_DRAIN_IO;
1438 	if (wo == WO_DRAIN_IO && !dc->disk_drain)
1439 		wo = WO_NONE;
1440 
1441 	return wo;
1442 }
1443 
1444 /*
1445  * drbd_bump_write_ordering() - Fall back to an other write ordering method
1446  * @wo:		Write ordering method to try.
1447  */
1448 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1449 			      enum write_ordering_e wo)
1450 {
1451 	struct drbd_device *device;
1452 	enum write_ordering_e pwo;
1453 	int vnr;
1454 	static char *write_ordering_str[] = {
1455 		[WO_NONE] = "none",
1456 		[WO_DRAIN_IO] = "drain",
1457 		[WO_BDEV_FLUSH] = "flush",
1458 	};
1459 
1460 	pwo = resource->write_ordering;
1461 	if (wo != WO_BDEV_FLUSH)
1462 		wo = min(pwo, wo);
1463 	rcu_read_lock();
1464 	idr_for_each_entry(&resource->devices, device, vnr) {
1465 		if (get_ldev(device)) {
1466 			wo = max_allowed_wo(device->ldev, wo);
1467 			if (device->ldev == bdev)
1468 				bdev = NULL;
1469 			put_ldev(device);
1470 		}
1471 	}
1472 
1473 	if (bdev)
1474 		wo = max_allowed_wo(bdev, wo);
1475 
1476 	rcu_read_unlock();
1477 
1478 	resource->write_ordering = wo;
1479 	if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1480 		drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1481 }
1482 
1483 /*
1484  * Mapping "discard" to ZEROOUT with UNMAP does not work for us:
1485  * Drivers have to "announce" q->limits.max_write_zeroes_sectors, or it
1486  * will directly go to fallback mode, submitting normal writes, and
1487  * never even try to UNMAP.
1488  *
1489  * And dm-thin does not do this (yet), mostly because in general it has
1490  * to assume that "skip_block_zeroing" is set.  See also:
1491  * https://www.mail-archive.com/dm-devel%40redhat.com/msg07965.html
1492  * https://www.redhat.com/archives/dm-devel/2018-January/msg00271.html
1493  *
1494  * We *may* ignore the discard-zeroes-data setting, if so configured.
1495  *
1496  * Assumption is that this "discard_zeroes_data=0" is only because the backend
1497  * may ignore partial unaligned discards.
1498  *
1499  * LVM/DM thin as of at least
1500  *   LVM version:     2.02.115(2)-RHEL7 (2015-01-28)
1501  *   Library version: 1.02.93-RHEL7 (2015-01-28)
1502  *   Driver version:  4.29.0
1503  * still behaves this way.
1504  *
1505  * For unaligned (wrt. alignment and granularity) or too small discards,
1506  * we zero-out the initial (and/or) trailing unaligned partial chunks,
1507  * but discard all the aligned full chunks.
1508  *
1509  * At least for LVM/DM thin, with skip_block_zeroing=false,
1510  * the result is effectively "discard_zeroes_data=1".
1511  */
1512 /* flags: EE_TRIM|EE_ZEROOUT */
1513 int drbd_issue_discard_or_zero_out(struct drbd_device *device, sector_t start, unsigned int nr_sectors, int flags)
1514 {
1515 	struct block_device *bdev = device->ldev->backing_bdev;
1516 	sector_t tmp, nr;
1517 	unsigned int max_discard_sectors, granularity;
1518 	int alignment;
1519 	int err = 0;
1520 
1521 	if ((flags & EE_ZEROOUT) || !(flags & EE_TRIM))
1522 		goto zero_out;
1523 
1524 	/* Zero-sector (unknown) and one-sector granularities are the same.  */
1525 	granularity = max(bdev_discard_granularity(bdev) >> 9, 1U);
1526 	alignment = (bdev_discard_alignment(bdev) >> 9) % granularity;
1527 
1528 	max_discard_sectors = min(bdev_max_discard_sectors(bdev), (1U << 22));
1529 	max_discard_sectors -= max_discard_sectors % granularity;
1530 	if (unlikely(!max_discard_sectors))
1531 		goto zero_out;
1532 
1533 	if (nr_sectors < granularity)
1534 		goto zero_out;
1535 
1536 	tmp = start;
1537 	if (sector_div(tmp, granularity) != alignment) {
1538 		if (nr_sectors < 2*granularity)
1539 			goto zero_out;
1540 		/* start + gran - (start + gran - align) % gran */
1541 		tmp = start + granularity - alignment;
1542 		tmp = start + granularity - sector_div(tmp, granularity);
1543 
1544 		nr = tmp - start;
1545 		/* don't flag BLKDEV_ZERO_NOUNMAP, we don't know how many
1546 		 * layers are below us, some may have smaller granularity */
1547 		err |= blkdev_issue_zeroout(bdev, start, nr, GFP_NOIO, 0);
1548 		nr_sectors -= nr;
1549 		start = tmp;
1550 	}
1551 	while (nr_sectors >= max_discard_sectors) {
1552 		err |= blkdev_issue_discard(bdev, start, max_discard_sectors,
1553 					    GFP_NOIO);
1554 		nr_sectors -= max_discard_sectors;
1555 		start += max_discard_sectors;
1556 	}
1557 	if (nr_sectors) {
1558 		/* max_discard_sectors is unsigned int (and a multiple of
1559 		 * granularity, we made sure of that above already);
1560 		 * nr is < max_discard_sectors;
1561 		 * I don't need sector_div here, even though nr is sector_t */
1562 		nr = nr_sectors;
1563 		nr -= (unsigned int)nr % granularity;
1564 		if (nr) {
1565 			err |= blkdev_issue_discard(bdev, start, nr, GFP_NOIO);
1566 			nr_sectors -= nr;
1567 			start += nr;
1568 		}
1569 	}
1570  zero_out:
1571 	if (nr_sectors) {
1572 		err |= blkdev_issue_zeroout(bdev, start, nr_sectors, GFP_NOIO,
1573 				(flags & EE_TRIM) ? 0 : BLKDEV_ZERO_NOUNMAP);
1574 	}
1575 	return err != 0;
1576 }
1577 
1578 static bool can_do_reliable_discards(struct drbd_device *device)
1579 {
1580 	struct disk_conf *dc;
1581 	bool can_do;
1582 
1583 	if (!bdev_max_discard_sectors(device->ldev->backing_bdev))
1584 		return false;
1585 
1586 	rcu_read_lock();
1587 	dc = rcu_dereference(device->ldev->disk_conf);
1588 	can_do = dc->discard_zeroes_if_aligned;
1589 	rcu_read_unlock();
1590 	return can_do;
1591 }
1592 
1593 static void drbd_issue_peer_discard_or_zero_out(struct drbd_device *device, struct drbd_peer_request *peer_req)
1594 {
1595 	/* If the backend cannot discard, or does not guarantee
1596 	 * read-back zeroes in discarded ranges, we fall back to
1597 	 * zero-out.  Unless configuration specifically requested
1598 	 * otherwise. */
1599 	if (!can_do_reliable_discards(device))
1600 		peer_req->flags |= EE_ZEROOUT;
1601 
1602 	if (drbd_issue_discard_or_zero_out(device, peer_req->i.sector,
1603 	    peer_req->i.size >> 9, peer_req->flags & (EE_ZEROOUT|EE_TRIM)))
1604 		peer_req->flags |= EE_WAS_ERROR;
1605 	drbd_endio_write_sec_final(peer_req);
1606 }
1607 
1608 static int peer_request_fault_type(struct drbd_peer_request *peer_req)
1609 {
1610 	if (peer_req_op(peer_req) == REQ_OP_READ) {
1611 		return peer_req->flags & EE_APPLICATION ?
1612 			DRBD_FAULT_DT_RD : DRBD_FAULT_RS_RD;
1613 	} else {
1614 		return peer_req->flags & EE_APPLICATION ?
1615 			DRBD_FAULT_DT_WR : DRBD_FAULT_RS_WR;
1616 	}
1617 }
1618 
1619 /**
1620  * drbd_submit_peer_request()
1621  * @peer_req:	peer request
1622  *
1623  * May spread the pages to multiple bios,
1624  * depending on bio_add_page restrictions.
1625  *
1626  * Returns 0 if all bios have been submitted,
1627  * -ENOMEM if we could not allocate enough bios,
1628  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1629  *  single page to an empty bio (which should never happen and likely indicates
1630  *  that the lower level IO stack is in some way broken). This has been observed
1631  *  on certain Xen deployments.
1632  */
1633 /* TODO allocate from our own bio_set. */
1634 int drbd_submit_peer_request(struct drbd_peer_request *peer_req)
1635 {
1636 	struct drbd_device *device = peer_req->peer_device->device;
1637 	struct bio *bios = NULL;
1638 	struct bio *bio;
1639 	struct page *page = peer_req->pages;
1640 	sector_t sector = peer_req->i.sector;
1641 	unsigned int data_size = peer_req->i.size;
1642 	unsigned int n_bios = 0;
1643 	unsigned int nr_pages = PFN_UP(data_size);
1644 
1645 	/* TRIM/DISCARD: for now, always use the helper function
1646 	 * blkdev_issue_zeroout(..., discard=true).
1647 	 * It's synchronous, but it does the right thing wrt. bio splitting.
1648 	 * Correctness first, performance later.  Next step is to code an
1649 	 * asynchronous variant of the same.
1650 	 */
1651 	if (peer_req->flags & (EE_TRIM | EE_ZEROOUT)) {
1652 		/* wait for all pending IO completions, before we start
1653 		 * zeroing things out. */
1654 		conn_wait_active_ee_empty(peer_req->peer_device->connection);
1655 		/* add it to the active list now,
1656 		 * so we can find it to present it in debugfs */
1657 		peer_req->submit_jif = jiffies;
1658 		peer_req->flags |= EE_SUBMITTED;
1659 
1660 		/* If this was a resync request from receive_rs_deallocated(),
1661 		 * it is already on the sync_ee list */
1662 		if (list_empty(&peer_req->w.list)) {
1663 			spin_lock_irq(&device->resource->req_lock);
1664 			list_add_tail(&peer_req->w.list, &device->active_ee);
1665 			spin_unlock_irq(&device->resource->req_lock);
1666 		}
1667 
1668 		drbd_issue_peer_discard_or_zero_out(device, peer_req);
1669 		return 0;
1670 	}
1671 
1672 	/* In most cases, we will only need one bio.  But in case the lower
1673 	 * level restrictions happen to be different at this offset on this
1674 	 * side than those of the sending peer, we may need to submit the
1675 	 * request in more than one bio.
1676 	 *
1677 	 * Plain bio_alloc is good enough here, this is no DRBD internally
1678 	 * generated bio, but a bio allocated on behalf of the peer.
1679 	 */
1680 next_bio:
1681 	/* _DISCARD, _WRITE_ZEROES handled above.
1682 	 * REQ_OP_FLUSH (empty flush) not expected,
1683 	 * should have been mapped to a "drbd protocol barrier".
1684 	 * REQ_OP_SECURE_ERASE: I don't see how we could ever support that.
1685 	 */
1686 	if (!(peer_req_op(peer_req) == REQ_OP_WRITE ||
1687 				peer_req_op(peer_req) == REQ_OP_READ)) {
1688 		drbd_err(device, "Invalid bio op received: 0x%x\n", peer_req->opf);
1689 		return -EINVAL;
1690 	}
1691 
1692 	bio = bio_alloc(device->ldev->backing_bdev, nr_pages, peer_req->opf, GFP_NOIO);
1693 	/* > peer_req->i.sector, unless this is the first bio */
1694 	bio->bi_iter.bi_sector = sector;
1695 	bio->bi_private = peer_req;
1696 	bio->bi_end_io = drbd_peer_request_endio;
1697 
1698 	bio->bi_next = bios;
1699 	bios = bio;
1700 	++n_bios;
1701 
1702 	page_chain_for_each(page) {
1703 		unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1704 		if (!bio_add_page(bio, page, len, 0))
1705 			goto next_bio;
1706 		data_size -= len;
1707 		sector += len >> 9;
1708 		--nr_pages;
1709 	}
1710 	D_ASSERT(device, data_size == 0);
1711 	D_ASSERT(device, page == NULL);
1712 
1713 	atomic_set(&peer_req->pending_bios, n_bios);
1714 	/* for debugfs: update timestamp, mark as submitted */
1715 	peer_req->submit_jif = jiffies;
1716 	peer_req->flags |= EE_SUBMITTED;
1717 	do {
1718 		bio = bios;
1719 		bios = bios->bi_next;
1720 		bio->bi_next = NULL;
1721 
1722 		drbd_submit_bio_noacct(device, peer_request_fault_type(peer_req), bio);
1723 	} while (bios);
1724 	return 0;
1725 }
1726 
1727 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1728 					     struct drbd_peer_request *peer_req)
1729 {
1730 	struct drbd_interval *i = &peer_req->i;
1731 
1732 	drbd_remove_interval(&device->write_requests, i);
1733 	drbd_clear_interval(i);
1734 
1735 	/* Wake up any processes waiting for this peer request to complete.  */
1736 	if (i->waiting)
1737 		wake_up(&device->misc_wait);
1738 }
1739 
1740 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1741 {
1742 	struct drbd_peer_device *peer_device;
1743 	int vnr;
1744 
1745 	rcu_read_lock();
1746 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1747 		struct drbd_device *device = peer_device->device;
1748 
1749 		kref_get(&device->kref);
1750 		rcu_read_unlock();
1751 		drbd_wait_ee_list_empty(device, &device->active_ee);
1752 		kref_put(&device->kref, drbd_destroy_device);
1753 		rcu_read_lock();
1754 	}
1755 	rcu_read_unlock();
1756 }
1757 
1758 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1759 {
1760 	int rv;
1761 	struct p_barrier *p = pi->data;
1762 	struct drbd_epoch *epoch;
1763 
1764 	/* FIXME these are unacked on connection,
1765 	 * not a specific (peer)device.
1766 	 */
1767 	connection->current_epoch->barrier_nr = p->barrier;
1768 	connection->current_epoch->connection = connection;
1769 	rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1770 
1771 	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1772 	 * the activity log, which means it would not be resynced in case the
1773 	 * R_PRIMARY crashes now.
1774 	 * Therefore we must send the barrier_ack after the barrier request was
1775 	 * completed. */
1776 	switch (connection->resource->write_ordering) {
1777 	case WO_NONE:
1778 		if (rv == FE_RECYCLED)
1779 			return 0;
1780 
1781 		/* receiver context, in the writeout path of the other node.
1782 		 * avoid potential distributed deadlock */
1783 		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1784 		if (epoch)
1785 			break;
1786 		else
1787 			drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1788 		fallthrough;
1789 
1790 	case WO_BDEV_FLUSH:
1791 	case WO_DRAIN_IO:
1792 		conn_wait_active_ee_empty(connection);
1793 		drbd_flush(connection);
1794 
1795 		if (atomic_read(&connection->current_epoch->epoch_size)) {
1796 			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1797 			if (epoch)
1798 				break;
1799 		}
1800 
1801 		return 0;
1802 	default:
1803 		drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1804 			 connection->resource->write_ordering);
1805 		return -EIO;
1806 	}
1807 
1808 	epoch->flags = 0;
1809 	atomic_set(&epoch->epoch_size, 0);
1810 	atomic_set(&epoch->active, 0);
1811 
1812 	spin_lock(&connection->epoch_lock);
1813 	if (atomic_read(&connection->current_epoch->epoch_size)) {
1814 		list_add(&epoch->list, &connection->current_epoch->list);
1815 		connection->current_epoch = epoch;
1816 		connection->epochs++;
1817 	} else {
1818 		/* The current_epoch got recycled while we allocated this one... */
1819 		kfree(epoch);
1820 	}
1821 	spin_unlock(&connection->epoch_lock);
1822 
1823 	return 0;
1824 }
1825 
1826 /* quick wrapper in case payload size != request_size (write same) */
1827 static void drbd_csum_ee_size(struct crypto_shash *h,
1828 			      struct drbd_peer_request *r, void *d,
1829 			      unsigned int payload_size)
1830 {
1831 	unsigned int tmp = r->i.size;
1832 	r->i.size = payload_size;
1833 	drbd_csum_ee(h, r, d);
1834 	r->i.size = tmp;
1835 }
1836 
1837 /* used from receive_RSDataReply (recv_resync_read)
1838  * and from receive_Data.
1839  * data_size: actual payload ("data in")
1840  * 	for normal writes that is bi_size.
1841  * 	for discards, that is zero.
1842  * 	for write same, it is logical_block_size.
1843  * both trim and write same have the bi_size ("data len to be affected")
1844  * as extra argument in the packet header.
1845  */
1846 static struct drbd_peer_request *
1847 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1848 	      struct packet_info *pi) __must_hold(local)
1849 {
1850 	struct drbd_device *device = peer_device->device;
1851 	const sector_t capacity = get_capacity(device->vdisk);
1852 	struct drbd_peer_request *peer_req;
1853 	struct page *page;
1854 	int digest_size, err;
1855 	unsigned int data_size = pi->size, ds;
1856 	void *dig_in = peer_device->connection->int_dig_in;
1857 	void *dig_vv = peer_device->connection->int_dig_vv;
1858 	unsigned long *data;
1859 	struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1860 	struct p_trim *zeroes = (pi->cmd == P_ZEROES) ? pi->data : NULL;
1861 
1862 	digest_size = 0;
1863 	if (!trim && peer_device->connection->peer_integrity_tfm) {
1864 		digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
1865 		/*
1866 		 * FIXME: Receive the incoming digest into the receive buffer
1867 		 *	  here, together with its struct p_data?
1868 		 */
1869 		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1870 		if (err)
1871 			return NULL;
1872 		data_size -= digest_size;
1873 	}
1874 
1875 	/* assume request_size == data_size, but special case trim. */
1876 	ds = data_size;
1877 	if (trim) {
1878 		if (!expect(peer_device, data_size == 0))
1879 			return NULL;
1880 		ds = be32_to_cpu(trim->size);
1881 	} else if (zeroes) {
1882 		if (!expect(peer_device, data_size == 0))
1883 			return NULL;
1884 		ds = be32_to_cpu(zeroes->size);
1885 	}
1886 
1887 	if (!expect(peer_device, IS_ALIGNED(ds, 512)))
1888 		return NULL;
1889 	if (trim || zeroes) {
1890 		if (!expect(peer_device, ds <= (DRBD_MAX_BBIO_SECTORS << 9)))
1891 			return NULL;
1892 	} else if (!expect(peer_device, ds <= DRBD_MAX_BIO_SIZE))
1893 		return NULL;
1894 
1895 	/* even though we trust out peer,
1896 	 * we sometimes have to double check. */
1897 	if (sector + (ds>>9) > capacity) {
1898 		drbd_err(device, "request from peer beyond end of local disk: "
1899 			"capacity: %llus < sector: %llus + size: %u\n",
1900 			(unsigned long long)capacity,
1901 			(unsigned long long)sector, ds);
1902 		return NULL;
1903 	}
1904 
1905 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1906 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1907 	 * which in turn might block on the other node at this very place.  */
1908 	peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO);
1909 	if (!peer_req)
1910 		return NULL;
1911 
1912 	peer_req->flags |= EE_WRITE;
1913 	if (trim) {
1914 		peer_req->flags |= EE_TRIM;
1915 		return peer_req;
1916 	}
1917 	if (zeroes) {
1918 		peer_req->flags |= EE_ZEROOUT;
1919 		return peer_req;
1920 	}
1921 
1922 	/* receive payload size bytes into page chain */
1923 	ds = data_size;
1924 	page = peer_req->pages;
1925 	page_chain_for_each(page) {
1926 		unsigned len = min_t(int, ds, PAGE_SIZE);
1927 		data = kmap(page);
1928 		err = drbd_recv_all_warn(peer_device->connection, data, len);
1929 		if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1930 			drbd_err(device, "Fault injection: Corrupting data on receive\n");
1931 			data[0] = data[0] ^ (unsigned long)-1;
1932 		}
1933 		kunmap(page);
1934 		if (err) {
1935 			drbd_free_peer_req(device, peer_req);
1936 			return NULL;
1937 		}
1938 		ds -= len;
1939 	}
1940 
1941 	if (digest_size) {
1942 		drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size);
1943 		if (memcmp(dig_in, dig_vv, digest_size)) {
1944 			drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1945 				(unsigned long long)sector, data_size);
1946 			drbd_free_peer_req(device, peer_req);
1947 			return NULL;
1948 		}
1949 	}
1950 	device->recv_cnt += data_size >> 9;
1951 	return peer_req;
1952 }
1953 
1954 /* drbd_drain_block() just takes a data block
1955  * out of the socket input buffer, and discards it.
1956  */
1957 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1958 {
1959 	struct page *page;
1960 	int err = 0;
1961 	void *data;
1962 
1963 	if (!data_size)
1964 		return 0;
1965 
1966 	page = drbd_alloc_pages(peer_device, 1, 1);
1967 
1968 	data = kmap(page);
1969 	while (data_size) {
1970 		unsigned int len = min_t(int, data_size, PAGE_SIZE);
1971 
1972 		err = drbd_recv_all_warn(peer_device->connection, data, len);
1973 		if (err)
1974 			break;
1975 		data_size -= len;
1976 	}
1977 	kunmap(page);
1978 	drbd_free_pages(peer_device->device, page, 0);
1979 	return err;
1980 }
1981 
1982 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1983 			   sector_t sector, int data_size)
1984 {
1985 	struct bio_vec bvec;
1986 	struct bvec_iter iter;
1987 	struct bio *bio;
1988 	int digest_size, err, expect;
1989 	void *dig_in = peer_device->connection->int_dig_in;
1990 	void *dig_vv = peer_device->connection->int_dig_vv;
1991 
1992 	digest_size = 0;
1993 	if (peer_device->connection->peer_integrity_tfm) {
1994 		digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
1995 		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1996 		if (err)
1997 			return err;
1998 		data_size -= digest_size;
1999 	}
2000 
2001 	/* optimistically update recv_cnt.  if receiving fails below,
2002 	 * we disconnect anyways, and counters will be reset. */
2003 	peer_device->device->recv_cnt += data_size>>9;
2004 
2005 	bio = req->master_bio;
2006 	D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
2007 
2008 	bio_for_each_segment(bvec, bio, iter) {
2009 		void *mapped = bvec_kmap_local(&bvec);
2010 		expect = min_t(int, data_size, bvec.bv_len);
2011 		err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
2012 		kunmap_local(mapped);
2013 		if (err)
2014 			return err;
2015 		data_size -= expect;
2016 	}
2017 
2018 	if (digest_size) {
2019 		drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
2020 		if (memcmp(dig_in, dig_vv, digest_size)) {
2021 			drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
2022 			return -EINVAL;
2023 		}
2024 	}
2025 
2026 	D_ASSERT(peer_device->device, data_size == 0);
2027 	return 0;
2028 }
2029 
2030 /*
2031  * e_end_resync_block() is called in ack_sender context via
2032  * drbd_finish_peer_reqs().
2033  */
2034 static int e_end_resync_block(struct drbd_work *w, int unused)
2035 {
2036 	struct drbd_peer_request *peer_req =
2037 		container_of(w, struct drbd_peer_request, w);
2038 	struct drbd_peer_device *peer_device = peer_req->peer_device;
2039 	struct drbd_device *device = peer_device->device;
2040 	sector_t sector = peer_req->i.sector;
2041 	int err;
2042 
2043 	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2044 
2045 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2046 		drbd_set_in_sync(peer_device, sector, peer_req->i.size);
2047 		err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
2048 	} else {
2049 		/* Record failure to sync */
2050 		drbd_rs_failed_io(peer_device, sector, peer_req->i.size);
2051 
2052 		err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2053 	}
2054 	dec_unacked(device);
2055 
2056 	return err;
2057 }
2058 
2059 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
2060 			    struct packet_info *pi) __releases(local)
2061 {
2062 	struct drbd_device *device = peer_device->device;
2063 	struct drbd_peer_request *peer_req;
2064 
2065 	peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
2066 	if (!peer_req)
2067 		goto fail;
2068 
2069 	dec_rs_pending(peer_device);
2070 
2071 	inc_unacked(device);
2072 	/* corresponding dec_unacked() in e_end_resync_block()
2073 	 * respective _drbd_clear_done_ee */
2074 
2075 	peer_req->w.cb = e_end_resync_block;
2076 	peer_req->opf = REQ_OP_WRITE;
2077 	peer_req->submit_jif = jiffies;
2078 
2079 	spin_lock_irq(&device->resource->req_lock);
2080 	list_add_tail(&peer_req->w.list, &device->sync_ee);
2081 	spin_unlock_irq(&device->resource->req_lock);
2082 
2083 	atomic_add(pi->size >> 9, &device->rs_sect_ev);
2084 	if (drbd_submit_peer_request(peer_req) == 0)
2085 		return 0;
2086 
2087 	/* don't care for the reason here */
2088 	drbd_err(device, "submit failed, triggering re-connect\n");
2089 	spin_lock_irq(&device->resource->req_lock);
2090 	list_del(&peer_req->w.list);
2091 	spin_unlock_irq(&device->resource->req_lock);
2092 
2093 	drbd_free_peer_req(device, peer_req);
2094 fail:
2095 	put_ldev(device);
2096 	return -EIO;
2097 }
2098 
2099 static struct drbd_request *
2100 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
2101 	     sector_t sector, bool missing_ok, const char *func)
2102 {
2103 	struct drbd_request *req;
2104 
2105 	/* Request object according to our peer */
2106 	req = (struct drbd_request *)(unsigned long)id;
2107 	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
2108 		return req;
2109 	if (!missing_ok) {
2110 		drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
2111 			(unsigned long)id, (unsigned long long)sector);
2112 	}
2113 	return NULL;
2114 }
2115 
2116 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
2117 {
2118 	struct drbd_peer_device *peer_device;
2119 	struct drbd_device *device;
2120 	struct drbd_request *req;
2121 	sector_t sector;
2122 	int err;
2123 	struct p_data *p = pi->data;
2124 
2125 	peer_device = conn_peer_device(connection, pi->vnr);
2126 	if (!peer_device)
2127 		return -EIO;
2128 	device = peer_device->device;
2129 
2130 	sector = be64_to_cpu(p->sector);
2131 
2132 	spin_lock_irq(&device->resource->req_lock);
2133 	req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
2134 	spin_unlock_irq(&device->resource->req_lock);
2135 	if (unlikely(!req))
2136 		return -EIO;
2137 
2138 	err = recv_dless_read(peer_device, req, sector, pi->size);
2139 	if (!err)
2140 		req_mod(req, DATA_RECEIVED, peer_device);
2141 	/* else: nothing. handled from drbd_disconnect...
2142 	 * I don't think we may complete this just yet
2143 	 * in case we are "on-disconnect: freeze" */
2144 
2145 	return err;
2146 }
2147 
2148 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
2149 {
2150 	struct drbd_peer_device *peer_device;
2151 	struct drbd_device *device;
2152 	sector_t sector;
2153 	int err;
2154 	struct p_data *p = pi->data;
2155 
2156 	peer_device = conn_peer_device(connection, pi->vnr);
2157 	if (!peer_device)
2158 		return -EIO;
2159 	device = peer_device->device;
2160 
2161 	sector = be64_to_cpu(p->sector);
2162 	D_ASSERT(device, p->block_id == ID_SYNCER);
2163 
2164 	if (get_ldev(device)) {
2165 		/* data is submitted to disk within recv_resync_read.
2166 		 * corresponding put_ldev done below on error,
2167 		 * or in drbd_peer_request_endio. */
2168 		err = recv_resync_read(peer_device, sector, pi);
2169 	} else {
2170 		if (drbd_ratelimit())
2171 			drbd_err(device, "Can not write resync data to local disk.\n");
2172 
2173 		err = drbd_drain_block(peer_device, pi->size);
2174 
2175 		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2176 	}
2177 
2178 	atomic_add(pi->size >> 9, &device->rs_sect_in);
2179 
2180 	return err;
2181 }
2182 
2183 static void restart_conflicting_writes(struct drbd_device *device,
2184 				       sector_t sector, int size)
2185 {
2186 	struct drbd_interval *i;
2187 	struct drbd_request *req;
2188 
2189 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2190 		if (!i->local)
2191 			continue;
2192 		req = container_of(i, struct drbd_request, i);
2193 		if (req->rq_state & RQ_LOCAL_PENDING ||
2194 		    !(req->rq_state & RQ_POSTPONED))
2195 			continue;
2196 		/* as it is RQ_POSTPONED, this will cause it to
2197 		 * be queued on the retry workqueue. */
2198 		__req_mod(req, CONFLICT_RESOLVED, NULL, NULL);
2199 	}
2200 }
2201 
2202 /*
2203  * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
2204  */
2205 static int e_end_block(struct drbd_work *w, int cancel)
2206 {
2207 	struct drbd_peer_request *peer_req =
2208 		container_of(w, struct drbd_peer_request, w);
2209 	struct drbd_peer_device *peer_device = peer_req->peer_device;
2210 	struct drbd_device *device = peer_device->device;
2211 	sector_t sector = peer_req->i.sector;
2212 	int err = 0, pcmd;
2213 
2214 	if (peer_req->flags & EE_SEND_WRITE_ACK) {
2215 		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2216 			pcmd = (device->state.conn >= C_SYNC_SOURCE &&
2217 				device->state.conn <= C_PAUSED_SYNC_T &&
2218 				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
2219 				P_RS_WRITE_ACK : P_WRITE_ACK;
2220 			err = drbd_send_ack(peer_device, pcmd, peer_req);
2221 			if (pcmd == P_RS_WRITE_ACK)
2222 				drbd_set_in_sync(peer_device, sector, peer_req->i.size);
2223 		} else {
2224 			err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2225 			/* we expect it to be marked out of sync anyways...
2226 			 * maybe assert this?  */
2227 		}
2228 		dec_unacked(device);
2229 	}
2230 
2231 	/* we delete from the conflict detection hash _after_ we sent out the
2232 	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
2233 	if (peer_req->flags & EE_IN_INTERVAL_TREE) {
2234 		spin_lock_irq(&device->resource->req_lock);
2235 		D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
2236 		drbd_remove_epoch_entry_interval(device, peer_req);
2237 		if (peer_req->flags & EE_RESTART_REQUESTS)
2238 			restart_conflicting_writes(device, sector, peer_req->i.size);
2239 		spin_unlock_irq(&device->resource->req_lock);
2240 	} else
2241 		D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2242 
2243 	drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
2244 
2245 	return err;
2246 }
2247 
2248 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2249 {
2250 	struct drbd_peer_request *peer_req =
2251 		container_of(w, struct drbd_peer_request, w);
2252 	struct drbd_peer_device *peer_device = peer_req->peer_device;
2253 	int err;
2254 
2255 	err = drbd_send_ack(peer_device, ack, peer_req);
2256 	dec_unacked(peer_device->device);
2257 
2258 	return err;
2259 }
2260 
2261 static int e_send_superseded(struct drbd_work *w, int unused)
2262 {
2263 	return e_send_ack(w, P_SUPERSEDED);
2264 }
2265 
2266 static int e_send_retry_write(struct drbd_work *w, int unused)
2267 {
2268 	struct drbd_peer_request *peer_req =
2269 		container_of(w, struct drbd_peer_request, w);
2270 	struct drbd_connection *connection = peer_req->peer_device->connection;
2271 
2272 	return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2273 			     P_RETRY_WRITE : P_SUPERSEDED);
2274 }
2275 
2276 static bool seq_greater(u32 a, u32 b)
2277 {
2278 	/*
2279 	 * We assume 32-bit wrap-around here.
2280 	 * For 24-bit wrap-around, we would have to shift:
2281 	 *  a <<= 8; b <<= 8;
2282 	 */
2283 	return (s32)a - (s32)b > 0;
2284 }
2285 
2286 static u32 seq_max(u32 a, u32 b)
2287 {
2288 	return seq_greater(a, b) ? a : b;
2289 }
2290 
2291 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2292 {
2293 	struct drbd_device *device = peer_device->device;
2294 	unsigned int newest_peer_seq;
2295 
2296 	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2297 		spin_lock(&device->peer_seq_lock);
2298 		newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2299 		device->peer_seq = newest_peer_seq;
2300 		spin_unlock(&device->peer_seq_lock);
2301 		/* wake up only if we actually changed device->peer_seq */
2302 		if (peer_seq == newest_peer_seq)
2303 			wake_up(&device->seq_wait);
2304 	}
2305 }
2306 
2307 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2308 {
2309 	return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2310 }
2311 
2312 /* maybe change sync_ee into interval trees as well? */
2313 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2314 {
2315 	struct drbd_peer_request *rs_req;
2316 	bool rv = false;
2317 
2318 	spin_lock_irq(&device->resource->req_lock);
2319 	list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2320 		if (overlaps(peer_req->i.sector, peer_req->i.size,
2321 			     rs_req->i.sector, rs_req->i.size)) {
2322 			rv = true;
2323 			break;
2324 		}
2325 	}
2326 	spin_unlock_irq(&device->resource->req_lock);
2327 
2328 	return rv;
2329 }
2330 
2331 /* Called from receive_Data.
2332  * Synchronize packets on sock with packets on msock.
2333  *
2334  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2335  * packet traveling on msock, they are still processed in the order they have
2336  * been sent.
2337  *
2338  * Note: we don't care for Ack packets overtaking P_DATA packets.
2339  *
2340  * In case packet_seq is larger than device->peer_seq number, there are
2341  * outstanding packets on the msock. We wait for them to arrive.
2342  * In case we are the logically next packet, we update device->peer_seq
2343  * ourselves. Correctly handles 32bit wrap around.
2344  *
2345  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2346  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2347  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2348  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
2349  *
2350  * returns 0 if we may process the packet,
2351  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2352 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2353 {
2354 	struct drbd_device *device = peer_device->device;
2355 	DEFINE_WAIT(wait);
2356 	long timeout;
2357 	int ret = 0, tp;
2358 
2359 	if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2360 		return 0;
2361 
2362 	spin_lock(&device->peer_seq_lock);
2363 	for (;;) {
2364 		if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2365 			device->peer_seq = seq_max(device->peer_seq, peer_seq);
2366 			break;
2367 		}
2368 
2369 		if (signal_pending(current)) {
2370 			ret = -ERESTARTSYS;
2371 			break;
2372 		}
2373 
2374 		rcu_read_lock();
2375 		tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2376 		rcu_read_unlock();
2377 
2378 		if (!tp)
2379 			break;
2380 
2381 		/* Only need to wait if two_primaries is enabled */
2382 		prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2383 		spin_unlock(&device->peer_seq_lock);
2384 		rcu_read_lock();
2385 		timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2386 		rcu_read_unlock();
2387 		timeout = schedule_timeout(timeout);
2388 		spin_lock(&device->peer_seq_lock);
2389 		if (!timeout) {
2390 			ret = -ETIMEDOUT;
2391 			drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2392 			break;
2393 		}
2394 	}
2395 	spin_unlock(&device->peer_seq_lock);
2396 	finish_wait(&device->seq_wait, &wait);
2397 	return ret;
2398 }
2399 
2400 static enum req_op wire_flags_to_bio_op(u32 dpf)
2401 {
2402 	if (dpf & DP_ZEROES)
2403 		return REQ_OP_WRITE_ZEROES;
2404 	if (dpf & DP_DISCARD)
2405 		return REQ_OP_DISCARD;
2406 	else
2407 		return REQ_OP_WRITE;
2408 }
2409 
2410 /* see also bio_flags_to_wire() */
2411 static blk_opf_t wire_flags_to_bio(struct drbd_connection *connection, u32 dpf)
2412 {
2413 	return wire_flags_to_bio_op(dpf) |
2414 		(dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2415 		(dpf & DP_FUA ? REQ_FUA : 0) |
2416 		(dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
2417 }
2418 
2419 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2420 				    unsigned int size)
2421 {
2422 	struct drbd_peer_device *peer_device = first_peer_device(device);
2423 	struct drbd_interval *i;
2424 
2425     repeat:
2426 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2427 		struct drbd_request *req;
2428 		struct bio_and_error m;
2429 
2430 		if (!i->local)
2431 			continue;
2432 		req = container_of(i, struct drbd_request, i);
2433 		if (!(req->rq_state & RQ_POSTPONED))
2434 			continue;
2435 		req->rq_state &= ~RQ_POSTPONED;
2436 		__req_mod(req, NEG_ACKED, peer_device, &m);
2437 		spin_unlock_irq(&device->resource->req_lock);
2438 		if (m.bio)
2439 			complete_master_bio(device, &m);
2440 		spin_lock_irq(&device->resource->req_lock);
2441 		goto repeat;
2442 	}
2443 }
2444 
2445 static int handle_write_conflicts(struct drbd_device *device,
2446 				  struct drbd_peer_request *peer_req)
2447 {
2448 	struct drbd_connection *connection = peer_req->peer_device->connection;
2449 	bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2450 	sector_t sector = peer_req->i.sector;
2451 	const unsigned int size = peer_req->i.size;
2452 	struct drbd_interval *i;
2453 	bool equal;
2454 	int err;
2455 
2456 	/*
2457 	 * Inserting the peer request into the write_requests tree will prevent
2458 	 * new conflicting local requests from being added.
2459 	 */
2460 	drbd_insert_interval(&device->write_requests, &peer_req->i);
2461 
2462     repeat:
2463 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2464 		if (i == &peer_req->i)
2465 			continue;
2466 		if (i->completed)
2467 			continue;
2468 
2469 		if (!i->local) {
2470 			/*
2471 			 * Our peer has sent a conflicting remote request; this
2472 			 * should not happen in a two-node setup.  Wait for the
2473 			 * earlier peer request to complete.
2474 			 */
2475 			err = drbd_wait_misc(device, i);
2476 			if (err)
2477 				goto out;
2478 			goto repeat;
2479 		}
2480 
2481 		equal = i->sector == sector && i->size == size;
2482 		if (resolve_conflicts) {
2483 			/*
2484 			 * If the peer request is fully contained within the
2485 			 * overlapping request, it can be considered overwritten
2486 			 * and thus superseded; otherwise, it will be retried
2487 			 * once all overlapping requests have completed.
2488 			 */
2489 			bool superseded = i->sector <= sector && i->sector +
2490 				       (i->size >> 9) >= sector + (size >> 9);
2491 
2492 			if (!equal)
2493 				drbd_alert(device, "Concurrent writes detected: "
2494 					       "local=%llus +%u, remote=%llus +%u, "
2495 					       "assuming %s came first\n",
2496 					  (unsigned long long)i->sector, i->size,
2497 					  (unsigned long long)sector, size,
2498 					  superseded ? "local" : "remote");
2499 
2500 			peer_req->w.cb = superseded ? e_send_superseded :
2501 						   e_send_retry_write;
2502 			list_add_tail(&peer_req->w.list, &device->done_ee);
2503 			queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);
2504 
2505 			err = -ENOENT;
2506 			goto out;
2507 		} else {
2508 			struct drbd_request *req =
2509 				container_of(i, struct drbd_request, i);
2510 
2511 			if (!equal)
2512 				drbd_alert(device, "Concurrent writes detected: "
2513 					       "local=%llus +%u, remote=%llus +%u\n",
2514 					  (unsigned long long)i->sector, i->size,
2515 					  (unsigned long long)sector, size);
2516 
2517 			if (req->rq_state & RQ_LOCAL_PENDING ||
2518 			    !(req->rq_state & RQ_POSTPONED)) {
2519 				/*
2520 				 * Wait for the node with the discard flag to
2521 				 * decide if this request has been superseded
2522 				 * or needs to be retried.
2523 				 * Requests that have been superseded will
2524 				 * disappear from the write_requests tree.
2525 				 *
2526 				 * In addition, wait for the conflicting
2527 				 * request to finish locally before submitting
2528 				 * the conflicting peer request.
2529 				 */
2530 				err = drbd_wait_misc(device, &req->i);
2531 				if (err) {
2532 					_conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2533 					fail_postponed_requests(device, sector, size);
2534 					goto out;
2535 				}
2536 				goto repeat;
2537 			}
2538 			/*
2539 			 * Remember to restart the conflicting requests after
2540 			 * the new peer request has completed.
2541 			 */
2542 			peer_req->flags |= EE_RESTART_REQUESTS;
2543 		}
2544 	}
2545 	err = 0;
2546 
2547     out:
2548 	if (err)
2549 		drbd_remove_epoch_entry_interval(device, peer_req);
2550 	return err;
2551 }
2552 
2553 /* mirrored write */
2554 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2555 {
2556 	struct drbd_peer_device *peer_device;
2557 	struct drbd_device *device;
2558 	struct net_conf *nc;
2559 	sector_t sector;
2560 	struct drbd_peer_request *peer_req;
2561 	struct p_data *p = pi->data;
2562 	u32 peer_seq = be32_to_cpu(p->seq_num);
2563 	u32 dp_flags;
2564 	int err, tp;
2565 
2566 	peer_device = conn_peer_device(connection, pi->vnr);
2567 	if (!peer_device)
2568 		return -EIO;
2569 	device = peer_device->device;
2570 
2571 	if (!get_ldev(device)) {
2572 		int err2;
2573 
2574 		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2575 		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2576 		atomic_inc(&connection->current_epoch->epoch_size);
2577 		err2 = drbd_drain_block(peer_device, pi->size);
2578 		if (!err)
2579 			err = err2;
2580 		return err;
2581 	}
2582 
2583 	/*
2584 	 * Corresponding put_ldev done either below (on various errors), or in
2585 	 * drbd_peer_request_endio, if we successfully submit the data at the
2586 	 * end of this function.
2587 	 */
2588 
2589 	sector = be64_to_cpu(p->sector);
2590 	peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2591 	if (!peer_req) {
2592 		put_ldev(device);
2593 		return -EIO;
2594 	}
2595 
2596 	peer_req->w.cb = e_end_block;
2597 	peer_req->submit_jif = jiffies;
2598 	peer_req->flags |= EE_APPLICATION;
2599 
2600 	dp_flags = be32_to_cpu(p->dp_flags);
2601 	peer_req->opf = wire_flags_to_bio(connection, dp_flags);
2602 	if (pi->cmd == P_TRIM) {
2603 		D_ASSERT(peer_device, peer_req->i.size > 0);
2604 		D_ASSERT(peer_device, peer_req_op(peer_req) == REQ_OP_DISCARD);
2605 		D_ASSERT(peer_device, peer_req->pages == NULL);
2606 		/* need to play safe: an older DRBD sender
2607 		 * may mean zero-out while sending P_TRIM. */
2608 		if (0 == (connection->agreed_features & DRBD_FF_WZEROES))
2609 			peer_req->flags |= EE_ZEROOUT;
2610 	} else if (pi->cmd == P_ZEROES) {
2611 		D_ASSERT(peer_device, peer_req->i.size > 0);
2612 		D_ASSERT(peer_device, peer_req_op(peer_req) == REQ_OP_WRITE_ZEROES);
2613 		D_ASSERT(peer_device, peer_req->pages == NULL);
2614 		/* Do (not) pass down BLKDEV_ZERO_NOUNMAP? */
2615 		if (dp_flags & DP_DISCARD)
2616 			peer_req->flags |= EE_TRIM;
2617 	} else if (peer_req->pages == NULL) {
2618 		D_ASSERT(device, peer_req->i.size == 0);
2619 		D_ASSERT(device, dp_flags & DP_FLUSH);
2620 	}
2621 
2622 	if (dp_flags & DP_MAY_SET_IN_SYNC)
2623 		peer_req->flags |= EE_MAY_SET_IN_SYNC;
2624 
2625 	spin_lock(&connection->epoch_lock);
2626 	peer_req->epoch = connection->current_epoch;
2627 	atomic_inc(&peer_req->epoch->epoch_size);
2628 	atomic_inc(&peer_req->epoch->active);
2629 	spin_unlock(&connection->epoch_lock);
2630 
2631 	rcu_read_lock();
2632 	nc = rcu_dereference(peer_device->connection->net_conf);
2633 	tp = nc->two_primaries;
2634 	if (peer_device->connection->agreed_pro_version < 100) {
2635 		switch (nc->wire_protocol) {
2636 		case DRBD_PROT_C:
2637 			dp_flags |= DP_SEND_WRITE_ACK;
2638 			break;
2639 		case DRBD_PROT_B:
2640 			dp_flags |= DP_SEND_RECEIVE_ACK;
2641 			break;
2642 		}
2643 	}
2644 	rcu_read_unlock();
2645 
2646 	if (dp_flags & DP_SEND_WRITE_ACK) {
2647 		peer_req->flags |= EE_SEND_WRITE_ACK;
2648 		inc_unacked(device);
2649 		/* corresponding dec_unacked() in e_end_block()
2650 		 * respective _drbd_clear_done_ee */
2651 	}
2652 
2653 	if (dp_flags & DP_SEND_RECEIVE_ACK) {
2654 		/* I really don't like it that the receiver thread
2655 		 * sends on the msock, but anyways */
2656 		drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
2657 	}
2658 
2659 	if (tp) {
2660 		/* two primaries implies protocol C */
2661 		D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2662 		peer_req->flags |= EE_IN_INTERVAL_TREE;
2663 		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2664 		if (err)
2665 			goto out_interrupted;
2666 		spin_lock_irq(&device->resource->req_lock);
2667 		err = handle_write_conflicts(device, peer_req);
2668 		if (err) {
2669 			spin_unlock_irq(&device->resource->req_lock);
2670 			if (err == -ENOENT) {
2671 				put_ldev(device);
2672 				return 0;
2673 			}
2674 			goto out_interrupted;
2675 		}
2676 	} else {
2677 		update_peer_seq(peer_device, peer_seq);
2678 		spin_lock_irq(&device->resource->req_lock);
2679 	}
2680 	/* TRIM and is processed synchronously,
2681 	 * we wait for all pending requests, respectively wait for
2682 	 * active_ee to become empty in drbd_submit_peer_request();
2683 	 * better not add ourselves here. */
2684 	if ((peer_req->flags & (EE_TRIM | EE_ZEROOUT)) == 0)
2685 		list_add_tail(&peer_req->w.list, &device->active_ee);
2686 	spin_unlock_irq(&device->resource->req_lock);
2687 
2688 	if (device->state.conn == C_SYNC_TARGET)
2689 		wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2690 
2691 	if (device->state.pdsk < D_INCONSISTENT) {
2692 		/* In case we have the only disk of the cluster, */
2693 		drbd_set_out_of_sync(peer_device, peer_req->i.sector, peer_req->i.size);
2694 		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2695 		drbd_al_begin_io(device, &peer_req->i);
2696 		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2697 	}
2698 
2699 	err = drbd_submit_peer_request(peer_req);
2700 	if (!err)
2701 		return 0;
2702 
2703 	/* don't care for the reason here */
2704 	drbd_err(device, "submit failed, triggering re-connect\n");
2705 	spin_lock_irq(&device->resource->req_lock);
2706 	list_del(&peer_req->w.list);
2707 	drbd_remove_epoch_entry_interval(device, peer_req);
2708 	spin_unlock_irq(&device->resource->req_lock);
2709 	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2710 		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2711 		drbd_al_complete_io(device, &peer_req->i);
2712 	}
2713 
2714 out_interrupted:
2715 	drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
2716 	put_ldev(device);
2717 	drbd_free_peer_req(device, peer_req);
2718 	return err;
2719 }
2720 
2721 /* We may throttle resync, if the lower device seems to be busy,
2722  * and current sync rate is above c_min_rate.
2723  *
2724  * To decide whether or not the lower device is busy, we use a scheme similar
2725  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2726  * (more than 64 sectors) of activity we cannot account for with our own resync
2727  * activity, it obviously is "busy".
2728  *
2729  * The current sync rate used here uses only the most recent two step marks,
2730  * to have a short time average so we can react faster.
2731  */
2732 bool drbd_rs_should_slow_down(struct drbd_peer_device *peer_device, sector_t sector,
2733 		bool throttle_if_app_is_waiting)
2734 {
2735 	struct drbd_device *device = peer_device->device;
2736 	struct lc_element *tmp;
2737 	bool throttle = drbd_rs_c_min_rate_throttle(device);
2738 
2739 	if (!throttle || throttle_if_app_is_waiting)
2740 		return throttle;
2741 
2742 	spin_lock_irq(&device->al_lock);
2743 	tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2744 	if (tmp) {
2745 		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2746 		if (test_bit(BME_PRIORITY, &bm_ext->flags))
2747 			throttle = false;
2748 		/* Do not slow down if app IO is already waiting for this extent,
2749 		 * and our progress is necessary for application IO to complete. */
2750 	}
2751 	spin_unlock_irq(&device->al_lock);
2752 
2753 	return throttle;
2754 }
2755 
2756 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2757 {
2758 	struct gendisk *disk = device->ldev->backing_bdev->bd_disk;
2759 	unsigned long db, dt, dbdt;
2760 	unsigned int c_min_rate;
2761 	int curr_events;
2762 
2763 	rcu_read_lock();
2764 	c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2765 	rcu_read_unlock();
2766 
2767 	/* feature disabled? */
2768 	if (c_min_rate == 0)
2769 		return false;
2770 
2771 	curr_events = (int)part_stat_read_accum(disk->part0, sectors) -
2772 			atomic_read(&device->rs_sect_ev);
2773 
2774 	if (atomic_read(&device->ap_actlog_cnt)
2775 	    || curr_events - device->rs_last_events > 64) {
2776 		unsigned long rs_left;
2777 		int i;
2778 
2779 		device->rs_last_events = curr_events;
2780 
2781 		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2782 		 * approx. */
2783 		i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2784 
2785 		if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2786 			rs_left = device->ov_left;
2787 		else
2788 			rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2789 
2790 		dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2791 		if (!dt)
2792 			dt++;
2793 		db = device->rs_mark_left[i] - rs_left;
2794 		dbdt = Bit2KB(db/dt);
2795 
2796 		if (dbdt > c_min_rate)
2797 			return true;
2798 	}
2799 	return false;
2800 }
2801 
2802 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2803 {
2804 	struct drbd_peer_device *peer_device;
2805 	struct drbd_device *device;
2806 	sector_t sector;
2807 	sector_t capacity;
2808 	struct drbd_peer_request *peer_req;
2809 	struct digest_info *di = NULL;
2810 	int size, verb;
2811 	struct p_block_req *p =	pi->data;
2812 
2813 	peer_device = conn_peer_device(connection, pi->vnr);
2814 	if (!peer_device)
2815 		return -EIO;
2816 	device = peer_device->device;
2817 	capacity = get_capacity(device->vdisk);
2818 
2819 	sector = be64_to_cpu(p->sector);
2820 	size   = be32_to_cpu(p->blksize);
2821 
2822 	if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2823 		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2824 				(unsigned long long)sector, size);
2825 		return -EINVAL;
2826 	}
2827 	if (sector + (size>>9) > capacity) {
2828 		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2829 				(unsigned long long)sector, size);
2830 		return -EINVAL;
2831 	}
2832 
2833 	if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2834 		verb = 1;
2835 		switch (pi->cmd) {
2836 		case P_DATA_REQUEST:
2837 			drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2838 			break;
2839 		case P_RS_THIN_REQ:
2840 		case P_RS_DATA_REQUEST:
2841 		case P_CSUM_RS_REQUEST:
2842 		case P_OV_REQUEST:
2843 			drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2844 			break;
2845 		case P_OV_REPLY:
2846 			verb = 0;
2847 			dec_rs_pending(peer_device);
2848 			drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2849 			break;
2850 		default:
2851 			BUG();
2852 		}
2853 		if (verb && drbd_ratelimit())
2854 			drbd_err(device, "Can not satisfy peer's read request, "
2855 			    "no local data.\n");
2856 
2857 		/* drain possibly payload */
2858 		return drbd_drain_block(peer_device, pi->size);
2859 	}
2860 
2861 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2862 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
2863 	 * which in turn might block on the other node at this very place.  */
2864 	peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2865 			size, GFP_NOIO);
2866 	if (!peer_req) {
2867 		put_ldev(device);
2868 		return -ENOMEM;
2869 	}
2870 	peer_req->opf = REQ_OP_READ;
2871 
2872 	switch (pi->cmd) {
2873 	case P_DATA_REQUEST:
2874 		peer_req->w.cb = w_e_end_data_req;
2875 		/* application IO, don't drbd_rs_begin_io */
2876 		peer_req->flags |= EE_APPLICATION;
2877 		goto submit;
2878 
2879 	case P_RS_THIN_REQ:
2880 		/* If at some point in the future we have a smart way to
2881 		   find out if this data block is completely deallocated,
2882 		   then we would do something smarter here than reading
2883 		   the block... */
2884 		peer_req->flags |= EE_RS_THIN_REQ;
2885 		fallthrough;
2886 	case P_RS_DATA_REQUEST:
2887 		peer_req->w.cb = w_e_end_rsdata_req;
2888 		/* used in the sector offset progress display */
2889 		device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2890 		break;
2891 
2892 	case P_OV_REPLY:
2893 	case P_CSUM_RS_REQUEST:
2894 		di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2895 		if (!di)
2896 			goto out_free_e;
2897 
2898 		di->digest_size = pi->size;
2899 		di->digest = (((char *)di)+sizeof(struct digest_info));
2900 
2901 		peer_req->digest = di;
2902 		peer_req->flags |= EE_HAS_DIGEST;
2903 
2904 		if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2905 			goto out_free_e;
2906 
2907 		if (pi->cmd == P_CSUM_RS_REQUEST) {
2908 			D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2909 			peer_req->w.cb = w_e_end_csum_rs_req;
2910 			/* used in the sector offset progress display */
2911 			device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2912 			/* remember to report stats in drbd_resync_finished */
2913 			device->use_csums = true;
2914 		} else if (pi->cmd == P_OV_REPLY) {
2915 			/* track progress, we may need to throttle */
2916 			atomic_add(size >> 9, &device->rs_sect_in);
2917 			peer_req->w.cb = w_e_end_ov_reply;
2918 			dec_rs_pending(peer_device);
2919 			/* drbd_rs_begin_io done when we sent this request,
2920 			 * but accounting still needs to be done. */
2921 			goto submit_for_resync;
2922 		}
2923 		break;
2924 
2925 	case P_OV_REQUEST:
2926 		if (device->ov_start_sector == ~(sector_t)0 &&
2927 		    peer_device->connection->agreed_pro_version >= 90) {
2928 			unsigned long now = jiffies;
2929 			int i;
2930 			device->ov_start_sector = sector;
2931 			device->ov_position = sector;
2932 			device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2933 			device->rs_total = device->ov_left;
2934 			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2935 				device->rs_mark_left[i] = device->ov_left;
2936 				device->rs_mark_time[i] = now;
2937 			}
2938 			drbd_info(device, "Online Verify start sector: %llu\n",
2939 					(unsigned long long)sector);
2940 		}
2941 		peer_req->w.cb = w_e_end_ov_req;
2942 		break;
2943 
2944 	default:
2945 		BUG();
2946 	}
2947 
2948 	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
2949 	 * wrt the receiver, but it is not as straightforward as it may seem.
2950 	 * Various places in the resync start and stop logic assume resync
2951 	 * requests are processed in order, requeuing this on the worker thread
2952 	 * introduces a bunch of new code for synchronization between threads.
2953 	 *
2954 	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2955 	 * "forever", throttling after drbd_rs_begin_io will lock that extent
2956 	 * for application writes for the same time.  For now, just throttle
2957 	 * here, where the rest of the code expects the receiver to sleep for
2958 	 * a while, anyways.
2959 	 */
2960 
2961 	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
2962 	 * this defers syncer requests for some time, before letting at least
2963 	 * on request through.  The resync controller on the receiving side
2964 	 * will adapt to the incoming rate accordingly.
2965 	 *
2966 	 * We cannot throttle here if remote is Primary/SyncTarget:
2967 	 * we would also throttle its application reads.
2968 	 * In that case, throttling is done on the SyncTarget only.
2969 	 */
2970 
2971 	/* Even though this may be a resync request, we do add to "read_ee";
2972 	 * "sync_ee" is only used for resync WRITEs.
2973 	 * Add to list early, so debugfs can find this request
2974 	 * even if we have to sleep below. */
2975 	spin_lock_irq(&device->resource->req_lock);
2976 	list_add_tail(&peer_req->w.list, &device->read_ee);
2977 	spin_unlock_irq(&device->resource->req_lock);
2978 
2979 	update_receiver_timing_details(connection, drbd_rs_should_slow_down);
2980 	if (device->state.peer != R_PRIMARY
2981 	&& drbd_rs_should_slow_down(peer_device, sector, false))
2982 		schedule_timeout_uninterruptible(HZ/10);
2983 	update_receiver_timing_details(connection, drbd_rs_begin_io);
2984 	if (drbd_rs_begin_io(device, sector))
2985 		goto out_free_e;
2986 
2987 submit_for_resync:
2988 	atomic_add(size >> 9, &device->rs_sect_ev);
2989 
2990 submit:
2991 	update_receiver_timing_details(connection, drbd_submit_peer_request);
2992 	inc_unacked(device);
2993 	if (drbd_submit_peer_request(peer_req) == 0)
2994 		return 0;
2995 
2996 	/* don't care for the reason here */
2997 	drbd_err(device, "submit failed, triggering re-connect\n");
2998 
2999 out_free_e:
3000 	spin_lock_irq(&device->resource->req_lock);
3001 	list_del(&peer_req->w.list);
3002 	spin_unlock_irq(&device->resource->req_lock);
3003 	/* no drbd_rs_complete_io(), we are dropping the connection anyways */
3004 
3005 	put_ldev(device);
3006 	drbd_free_peer_req(device, peer_req);
3007 	return -EIO;
3008 }
3009 
3010 /*
3011  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
3012  */
3013 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
3014 {
3015 	struct drbd_device *device = peer_device->device;
3016 	int self, peer, rv = -100;
3017 	unsigned long ch_self, ch_peer;
3018 	enum drbd_after_sb_p after_sb_0p;
3019 
3020 	self = device->ldev->md.uuid[UI_BITMAP] & 1;
3021 	peer = device->p_uuid[UI_BITMAP] & 1;
3022 
3023 	ch_peer = device->p_uuid[UI_SIZE];
3024 	ch_self = device->comm_bm_set;
3025 
3026 	rcu_read_lock();
3027 	after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
3028 	rcu_read_unlock();
3029 	switch (after_sb_0p) {
3030 	case ASB_CONSENSUS:
3031 	case ASB_DISCARD_SECONDARY:
3032 	case ASB_CALL_HELPER:
3033 	case ASB_VIOLENTLY:
3034 		drbd_err(device, "Configuration error.\n");
3035 		break;
3036 	case ASB_DISCONNECT:
3037 		break;
3038 	case ASB_DISCARD_YOUNGER_PRI:
3039 		if (self == 0 && peer == 1) {
3040 			rv = -1;
3041 			break;
3042 		}
3043 		if (self == 1 && peer == 0) {
3044 			rv =  1;
3045 			break;
3046 		}
3047 		fallthrough;	/* to one of the other strategies */
3048 	case ASB_DISCARD_OLDER_PRI:
3049 		if (self == 0 && peer == 1) {
3050 			rv = 1;
3051 			break;
3052 		}
3053 		if (self == 1 && peer == 0) {
3054 			rv = -1;
3055 			break;
3056 		}
3057 		/* Else fall through to one of the other strategies... */
3058 		drbd_warn(device, "Discard younger/older primary did not find a decision\n"
3059 		     "Using discard-least-changes instead\n");
3060 		fallthrough;
3061 	case ASB_DISCARD_ZERO_CHG:
3062 		if (ch_peer == 0 && ch_self == 0) {
3063 			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
3064 				? -1 : 1;
3065 			break;
3066 		} else {
3067 			if (ch_peer == 0) { rv =  1; break; }
3068 			if (ch_self == 0) { rv = -1; break; }
3069 		}
3070 		if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
3071 			break;
3072 		fallthrough;
3073 	case ASB_DISCARD_LEAST_CHG:
3074 		if	(ch_self < ch_peer)
3075 			rv = -1;
3076 		else if (ch_self > ch_peer)
3077 			rv =  1;
3078 		else /* ( ch_self == ch_peer ) */
3079 		     /* Well, then use something else. */
3080 			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
3081 				? -1 : 1;
3082 		break;
3083 	case ASB_DISCARD_LOCAL:
3084 		rv = -1;
3085 		break;
3086 	case ASB_DISCARD_REMOTE:
3087 		rv =  1;
3088 	}
3089 
3090 	return rv;
3091 }
3092 
3093 /*
3094  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
3095  */
3096 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
3097 {
3098 	struct drbd_device *device = peer_device->device;
3099 	int hg, rv = -100;
3100 	enum drbd_after_sb_p after_sb_1p;
3101 
3102 	rcu_read_lock();
3103 	after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
3104 	rcu_read_unlock();
3105 	switch (after_sb_1p) {
3106 	case ASB_DISCARD_YOUNGER_PRI:
3107 	case ASB_DISCARD_OLDER_PRI:
3108 	case ASB_DISCARD_LEAST_CHG:
3109 	case ASB_DISCARD_LOCAL:
3110 	case ASB_DISCARD_REMOTE:
3111 	case ASB_DISCARD_ZERO_CHG:
3112 		drbd_err(device, "Configuration error.\n");
3113 		break;
3114 	case ASB_DISCONNECT:
3115 		break;
3116 	case ASB_CONSENSUS:
3117 		hg = drbd_asb_recover_0p(peer_device);
3118 		if (hg == -1 && device->state.role == R_SECONDARY)
3119 			rv = hg;
3120 		if (hg == 1  && device->state.role == R_PRIMARY)
3121 			rv = hg;
3122 		break;
3123 	case ASB_VIOLENTLY:
3124 		rv = drbd_asb_recover_0p(peer_device);
3125 		break;
3126 	case ASB_DISCARD_SECONDARY:
3127 		return device->state.role == R_PRIMARY ? 1 : -1;
3128 	case ASB_CALL_HELPER:
3129 		hg = drbd_asb_recover_0p(peer_device);
3130 		if (hg == -1 && device->state.role == R_PRIMARY) {
3131 			enum drbd_state_rv rv2;
3132 
3133 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3134 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
3135 			  * we do not need to wait for the after state change work either. */
3136 			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3137 			if (rv2 != SS_SUCCESS) {
3138 				drbd_khelper(device, "pri-lost-after-sb");
3139 			} else {
3140 				drbd_warn(device, "Successfully gave up primary role.\n");
3141 				rv = hg;
3142 			}
3143 		} else
3144 			rv = hg;
3145 	}
3146 
3147 	return rv;
3148 }
3149 
3150 /*
3151  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
3152  */
3153 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
3154 {
3155 	struct drbd_device *device = peer_device->device;
3156 	int hg, rv = -100;
3157 	enum drbd_after_sb_p after_sb_2p;
3158 
3159 	rcu_read_lock();
3160 	after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
3161 	rcu_read_unlock();
3162 	switch (after_sb_2p) {
3163 	case ASB_DISCARD_YOUNGER_PRI:
3164 	case ASB_DISCARD_OLDER_PRI:
3165 	case ASB_DISCARD_LEAST_CHG:
3166 	case ASB_DISCARD_LOCAL:
3167 	case ASB_DISCARD_REMOTE:
3168 	case ASB_CONSENSUS:
3169 	case ASB_DISCARD_SECONDARY:
3170 	case ASB_DISCARD_ZERO_CHG:
3171 		drbd_err(device, "Configuration error.\n");
3172 		break;
3173 	case ASB_VIOLENTLY:
3174 		rv = drbd_asb_recover_0p(peer_device);
3175 		break;
3176 	case ASB_DISCONNECT:
3177 		break;
3178 	case ASB_CALL_HELPER:
3179 		hg = drbd_asb_recover_0p(peer_device);
3180 		if (hg == -1) {
3181 			enum drbd_state_rv rv2;
3182 
3183 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3184 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
3185 			  * we do not need to wait for the after state change work either. */
3186 			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3187 			if (rv2 != SS_SUCCESS) {
3188 				drbd_khelper(device, "pri-lost-after-sb");
3189 			} else {
3190 				drbd_warn(device, "Successfully gave up primary role.\n");
3191 				rv = hg;
3192 			}
3193 		} else
3194 			rv = hg;
3195 	}
3196 
3197 	return rv;
3198 }
3199 
3200 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
3201 			   u64 bits, u64 flags)
3202 {
3203 	if (!uuid) {
3204 		drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
3205 		return;
3206 	}
3207 	drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
3208 	     text,
3209 	     (unsigned long long)uuid[UI_CURRENT],
3210 	     (unsigned long long)uuid[UI_BITMAP],
3211 	     (unsigned long long)uuid[UI_HISTORY_START],
3212 	     (unsigned long long)uuid[UI_HISTORY_END],
3213 	     (unsigned long long)bits,
3214 	     (unsigned long long)flags);
3215 }
3216 
3217 /*
3218   100	after split brain try auto recover
3219     2	C_SYNC_SOURCE set BitMap
3220     1	C_SYNC_SOURCE use BitMap
3221     0	no Sync
3222    -1	C_SYNC_TARGET use BitMap
3223    -2	C_SYNC_TARGET set BitMap
3224  -100	after split brain, disconnect
3225 -1000	unrelated data
3226 -1091   requires proto 91
3227 -1096   requires proto 96
3228  */
3229 
3230 static int drbd_uuid_compare(struct drbd_peer_device *const peer_device,
3231 		enum drbd_role const peer_role, int *rule_nr) __must_hold(local)
3232 {
3233 	struct drbd_connection *const connection = peer_device->connection;
3234 	struct drbd_device *device = peer_device->device;
3235 	u64 self, peer;
3236 	int i, j;
3237 
3238 	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3239 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3240 
3241 	*rule_nr = 10;
3242 	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
3243 		return 0;
3244 
3245 	*rule_nr = 20;
3246 	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
3247 	     peer != UUID_JUST_CREATED)
3248 		return -2;
3249 
3250 	*rule_nr = 30;
3251 	if (self != UUID_JUST_CREATED &&
3252 	    (peer == UUID_JUST_CREATED || peer == (u64)0))
3253 		return 2;
3254 
3255 	if (self == peer) {
3256 		int rct, dc; /* roles at crash time */
3257 
3258 		if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
3259 
3260 			if (connection->agreed_pro_version < 91)
3261 				return -1091;
3262 
3263 			if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
3264 			    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
3265 				drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
3266 				drbd_uuid_move_history(device);
3267 				device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
3268 				device->ldev->md.uuid[UI_BITMAP] = 0;
3269 
3270 				drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3271 					       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3272 				*rule_nr = 34;
3273 			} else {
3274 				drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
3275 				*rule_nr = 36;
3276 			}
3277 
3278 			return 1;
3279 		}
3280 
3281 		if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
3282 
3283 			if (connection->agreed_pro_version < 91)
3284 				return -1091;
3285 
3286 			if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
3287 			    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
3288 				drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
3289 
3290 				device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
3291 				device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
3292 				device->p_uuid[UI_BITMAP] = 0UL;
3293 
3294 				drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3295 				*rule_nr = 35;
3296 			} else {
3297 				drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3298 				*rule_nr = 37;
3299 			}
3300 
3301 			return -1;
3302 		}
3303 
3304 		/* Common power [off|failure] */
3305 		rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3306 			(device->p_uuid[UI_FLAGS] & 2);
3307 		/* lowest bit is set when we were primary,
3308 		 * next bit (weight 2) is set when peer was primary */
3309 		*rule_nr = 40;
3310 
3311 		/* Neither has the "crashed primary" flag set,
3312 		 * only a replication link hickup. */
3313 		if (rct == 0)
3314 			return 0;
3315 
3316 		/* Current UUID equal and no bitmap uuid; does not necessarily
3317 		 * mean this was a "simultaneous hard crash", maybe IO was
3318 		 * frozen, so no UUID-bump happened.
3319 		 * This is a protocol change, overload DRBD_FF_WSAME as flag
3320 		 * for "new-enough" peer DRBD version. */
3321 		if (device->state.role == R_PRIMARY || peer_role == R_PRIMARY) {
3322 			*rule_nr = 41;
3323 			if (!(connection->agreed_features & DRBD_FF_WSAME)) {
3324 				drbd_warn(peer_device, "Equivalent unrotated UUIDs, but current primary present.\n");
3325 				return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8));
3326 			}
3327 			if (device->state.role == R_PRIMARY && peer_role == R_PRIMARY) {
3328 				/* At least one has the "crashed primary" bit set,
3329 				 * both are primary now, but neither has rotated its UUIDs?
3330 				 * "Can not happen." */
3331 				drbd_err(peer_device, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n");
3332 				return -100;
3333 			}
3334 			if (device->state.role == R_PRIMARY)
3335 				return 1;
3336 			return -1;
3337 		}
3338 
3339 		/* Both are secondary.
3340 		 * Really looks like recovery from simultaneous hard crash.
3341 		 * Check which had been primary before, and arbitrate. */
3342 		switch (rct) {
3343 		case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */
3344 		case 1: /*  self_pri && !peer_pri */ return 1;
3345 		case 2: /* !self_pri &&  peer_pri */ return -1;
3346 		case 3: /*  self_pri &&  peer_pri */
3347 			dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3348 			return dc ? -1 : 1;
3349 		}
3350 	}
3351 
3352 	*rule_nr = 50;
3353 	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3354 	if (self == peer)
3355 		return -1;
3356 
3357 	*rule_nr = 51;
3358 	peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3359 	if (self == peer) {
3360 		if (connection->agreed_pro_version < 96 ?
3361 		    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3362 		    (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3363 		    peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3364 			/* The last P_SYNC_UUID did not get though. Undo the last start of
3365 			   resync as sync source modifications of the peer's UUIDs. */
3366 
3367 			if (connection->agreed_pro_version < 91)
3368 				return -1091;
3369 
3370 			device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3371 			device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3372 
3373 			drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3374 			drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3375 
3376 			return -1;
3377 		}
3378 	}
3379 
3380 	*rule_nr = 60;
3381 	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3382 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3383 		peer = device->p_uuid[i] & ~((u64)1);
3384 		if (self == peer)
3385 			return -2;
3386 	}
3387 
3388 	*rule_nr = 70;
3389 	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3390 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3391 	if (self == peer)
3392 		return 1;
3393 
3394 	*rule_nr = 71;
3395 	self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3396 	if (self == peer) {
3397 		if (connection->agreed_pro_version < 96 ?
3398 		    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3399 		    (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3400 		    self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3401 			/* The last P_SYNC_UUID did not get though. Undo the last start of
3402 			   resync as sync source modifications of our UUIDs. */
3403 
3404 			if (connection->agreed_pro_version < 91)
3405 				return -1091;
3406 
3407 			__drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3408 			__drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3409 
3410 			drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3411 			drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3412 				       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3413 
3414 			return 1;
3415 		}
3416 	}
3417 
3418 
3419 	*rule_nr = 80;
3420 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3421 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3422 		self = device->ldev->md.uuid[i] & ~((u64)1);
3423 		if (self == peer)
3424 			return 2;
3425 	}
3426 
3427 	*rule_nr = 90;
3428 	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3429 	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3430 	if (self == peer && self != ((u64)0))
3431 		return 100;
3432 
3433 	*rule_nr = 100;
3434 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3435 		self = device->ldev->md.uuid[i] & ~((u64)1);
3436 		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3437 			peer = device->p_uuid[j] & ~((u64)1);
3438 			if (self == peer)
3439 				return -100;
3440 		}
3441 	}
3442 
3443 	return -1000;
3444 }
3445 
3446 /* drbd_sync_handshake() returns the new conn state on success, or
3447    CONN_MASK (-1) on failure.
3448  */
3449 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3450 					   enum drbd_role peer_role,
3451 					   enum drbd_disk_state peer_disk) __must_hold(local)
3452 {
3453 	struct drbd_device *device = peer_device->device;
3454 	enum drbd_conns rv = C_MASK;
3455 	enum drbd_disk_state mydisk;
3456 	struct net_conf *nc;
3457 	int hg, rule_nr, rr_conflict, tentative, always_asbp;
3458 
3459 	mydisk = device->state.disk;
3460 	if (mydisk == D_NEGOTIATING)
3461 		mydisk = device->new_state_tmp.disk;
3462 
3463 	drbd_info(device, "drbd_sync_handshake:\n");
3464 
3465 	spin_lock_irq(&device->ldev->md.uuid_lock);
3466 	drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3467 	drbd_uuid_dump(device, "peer", device->p_uuid,
3468 		       device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3469 
3470 	hg = drbd_uuid_compare(peer_device, peer_role, &rule_nr);
3471 	spin_unlock_irq(&device->ldev->md.uuid_lock);
3472 
3473 	drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3474 
3475 	if (hg == -1000) {
3476 		drbd_alert(device, "Unrelated data, aborting!\n");
3477 		return C_MASK;
3478 	}
3479 	if (hg < -0x10000) {
3480 		int proto, fflags;
3481 		hg = -hg;
3482 		proto = hg & 0xff;
3483 		fflags = (hg >> 8) & 0xff;
3484 		drbd_alert(device, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n",
3485 					proto, fflags);
3486 		return C_MASK;
3487 	}
3488 	if (hg < -1000) {
3489 		drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3490 		return C_MASK;
3491 	}
3492 
3493 	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3494 	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3495 		int f = (hg == -100) || abs(hg) == 2;
3496 		hg = mydisk > D_INCONSISTENT ? 1 : -1;
3497 		if (f)
3498 			hg = hg*2;
3499 		drbd_info(device, "Becoming sync %s due to disk states.\n",
3500 		     hg > 0 ? "source" : "target");
3501 	}
3502 
3503 	if (abs(hg) == 100)
3504 		drbd_khelper(device, "initial-split-brain");
3505 
3506 	rcu_read_lock();
3507 	nc = rcu_dereference(peer_device->connection->net_conf);
3508 	always_asbp = nc->always_asbp;
3509 	rr_conflict = nc->rr_conflict;
3510 	tentative = nc->tentative;
3511 	rcu_read_unlock();
3512 
3513 	if (hg == 100 || (hg == -100 && always_asbp)) {
3514 		int pcount = (device->state.role == R_PRIMARY)
3515 			   + (peer_role == R_PRIMARY);
3516 		int forced = (hg == -100);
3517 
3518 		switch (pcount) {
3519 		case 0:
3520 			hg = drbd_asb_recover_0p(peer_device);
3521 			break;
3522 		case 1:
3523 			hg = drbd_asb_recover_1p(peer_device);
3524 			break;
3525 		case 2:
3526 			hg = drbd_asb_recover_2p(peer_device);
3527 			break;
3528 		}
3529 		if (abs(hg) < 100) {
3530 			drbd_warn(device, "Split-Brain detected, %d primaries, "
3531 			     "automatically solved. Sync from %s node\n",
3532 			     pcount, (hg < 0) ? "peer" : "this");
3533 			if (forced) {
3534 				drbd_warn(device, "Doing a full sync, since"
3535 				     " UUIDs where ambiguous.\n");
3536 				hg = hg*2;
3537 			}
3538 		}
3539 	}
3540 
3541 	if (hg == -100) {
3542 		if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3543 			hg = -1;
3544 		if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3545 			hg = 1;
3546 
3547 		if (abs(hg) < 100)
3548 			drbd_warn(device, "Split-Brain detected, manually solved. "
3549 			     "Sync from %s node\n",
3550 			     (hg < 0) ? "peer" : "this");
3551 	}
3552 
3553 	if (hg == -100) {
3554 		/* FIXME this log message is not correct if we end up here
3555 		 * after an attempted attach on a diskless node.
3556 		 * We just refuse to attach -- well, we drop the "connection"
3557 		 * to that disk, in a way... */
3558 		drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3559 		drbd_khelper(device, "split-brain");
3560 		return C_MASK;
3561 	}
3562 
3563 	if (hg > 0 && mydisk <= D_INCONSISTENT) {
3564 		drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3565 		return C_MASK;
3566 	}
3567 
3568 	if (hg < 0 && /* by intention we do not use mydisk here. */
3569 	    device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3570 		switch (rr_conflict) {
3571 		case ASB_CALL_HELPER:
3572 			drbd_khelper(device, "pri-lost");
3573 			fallthrough;
3574 		case ASB_DISCONNECT:
3575 			drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3576 			return C_MASK;
3577 		case ASB_VIOLENTLY:
3578 			drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3579 			     "assumption\n");
3580 		}
3581 	}
3582 
3583 	if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3584 		if (hg == 0)
3585 			drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3586 		else
3587 			drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3588 				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3589 				 abs(hg) >= 2 ? "full" : "bit-map based");
3590 		return C_MASK;
3591 	}
3592 
3593 	if (abs(hg) >= 2) {
3594 		drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3595 		if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3596 					BM_LOCKED_SET_ALLOWED, NULL))
3597 			return C_MASK;
3598 	}
3599 
3600 	if (hg > 0) { /* become sync source. */
3601 		rv = C_WF_BITMAP_S;
3602 	} else if (hg < 0) { /* become sync target */
3603 		rv = C_WF_BITMAP_T;
3604 	} else {
3605 		rv = C_CONNECTED;
3606 		if (drbd_bm_total_weight(device)) {
3607 			drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3608 			     drbd_bm_total_weight(device));
3609 		}
3610 	}
3611 
3612 	return rv;
3613 }
3614 
3615 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3616 {
3617 	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3618 	if (peer == ASB_DISCARD_REMOTE)
3619 		return ASB_DISCARD_LOCAL;
3620 
3621 	/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3622 	if (peer == ASB_DISCARD_LOCAL)
3623 		return ASB_DISCARD_REMOTE;
3624 
3625 	/* everything else is valid if they are equal on both sides. */
3626 	return peer;
3627 }
3628 
3629 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3630 {
3631 	struct p_protocol *p = pi->data;
3632 	enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3633 	int p_proto, p_discard_my_data, p_two_primaries, cf;
3634 	struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3635 	char integrity_alg[SHARED_SECRET_MAX] = "";
3636 	struct crypto_shash *peer_integrity_tfm = NULL;
3637 	void *int_dig_in = NULL, *int_dig_vv = NULL;
3638 
3639 	p_proto		= be32_to_cpu(p->protocol);
3640 	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
3641 	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
3642 	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
3643 	p_two_primaries = be32_to_cpu(p->two_primaries);
3644 	cf		= be32_to_cpu(p->conn_flags);
3645 	p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3646 
3647 	if (connection->agreed_pro_version >= 87) {
3648 		int err;
3649 
3650 		if (pi->size > sizeof(integrity_alg))
3651 			return -EIO;
3652 		err = drbd_recv_all(connection, integrity_alg, pi->size);
3653 		if (err)
3654 			return err;
3655 		integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3656 	}
3657 
3658 	if (pi->cmd != P_PROTOCOL_UPDATE) {
3659 		clear_bit(CONN_DRY_RUN, &connection->flags);
3660 
3661 		if (cf & CF_DRY_RUN)
3662 			set_bit(CONN_DRY_RUN, &connection->flags);
3663 
3664 		rcu_read_lock();
3665 		nc = rcu_dereference(connection->net_conf);
3666 
3667 		if (p_proto != nc->wire_protocol) {
3668 			drbd_err(connection, "incompatible %s settings\n", "protocol");
3669 			goto disconnect_rcu_unlock;
3670 		}
3671 
3672 		if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3673 			drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3674 			goto disconnect_rcu_unlock;
3675 		}
3676 
3677 		if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3678 			drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3679 			goto disconnect_rcu_unlock;
3680 		}
3681 
3682 		if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3683 			drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3684 			goto disconnect_rcu_unlock;
3685 		}
3686 
3687 		if (p_discard_my_data && nc->discard_my_data) {
3688 			drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3689 			goto disconnect_rcu_unlock;
3690 		}
3691 
3692 		if (p_two_primaries != nc->two_primaries) {
3693 			drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3694 			goto disconnect_rcu_unlock;
3695 		}
3696 
3697 		if (strcmp(integrity_alg, nc->integrity_alg)) {
3698 			drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3699 			goto disconnect_rcu_unlock;
3700 		}
3701 
3702 		rcu_read_unlock();
3703 	}
3704 
3705 	if (integrity_alg[0]) {
3706 		int hash_size;
3707 
3708 		/*
3709 		 * We can only change the peer data integrity algorithm
3710 		 * here.  Changing our own data integrity algorithm
3711 		 * requires that we send a P_PROTOCOL_UPDATE packet at
3712 		 * the same time; otherwise, the peer has no way to
3713 		 * tell between which packets the algorithm should
3714 		 * change.
3715 		 */
3716 
3717 		peer_integrity_tfm = crypto_alloc_shash(integrity_alg, 0, 0);
3718 		if (IS_ERR(peer_integrity_tfm)) {
3719 			peer_integrity_tfm = NULL;
3720 			drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3721 				 integrity_alg);
3722 			goto disconnect;
3723 		}
3724 
3725 		hash_size = crypto_shash_digestsize(peer_integrity_tfm);
3726 		int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3727 		int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3728 		if (!(int_dig_in && int_dig_vv)) {
3729 			drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3730 			goto disconnect;
3731 		}
3732 	}
3733 
3734 	new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3735 	if (!new_net_conf)
3736 		goto disconnect;
3737 
3738 	mutex_lock(&connection->data.mutex);
3739 	mutex_lock(&connection->resource->conf_update);
3740 	old_net_conf = connection->net_conf;
3741 	*new_net_conf = *old_net_conf;
3742 
3743 	new_net_conf->wire_protocol = p_proto;
3744 	new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3745 	new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3746 	new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3747 	new_net_conf->two_primaries = p_two_primaries;
3748 
3749 	rcu_assign_pointer(connection->net_conf, new_net_conf);
3750 	mutex_unlock(&connection->resource->conf_update);
3751 	mutex_unlock(&connection->data.mutex);
3752 
3753 	crypto_free_shash(connection->peer_integrity_tfm);
3754 	kfree(connection->int_dig_in);
3755 	kfree(connection->int_dig_vv);
3756 	connection->peer_integrity_tfm = peer_integrity_tfm;
3757 	connection->int_dig_in = int_dig_in;
3758 	connection->int_dig_vv = int_dig_vv;
3759 
3760 	if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3761 		drbd_info(connection, "peer data-integrity-alg: %s\n",
3762 			  integrity_alg[0] ? integrity_alg : "(none)");
3763 
3764 	kvfree_rcu_mightsleep(old_net_conf);
3765 	return 0;
3766 
3767 disconnect_rcu_unlock:
3768 	rcu_read_unlock();
3769 disconnect:
3770 	crypto_free_shash(peer_integrity_tfm);
3771 	kfree(int_dig_in);
3772 	kfree(int_dig_vv);
3773 	conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3774 	return -EIO;
3775 }
3776 
3777 /* helper function
3778  * input: alg name, feature name
3779  * return: NULL (alg name was "")
3780  *         ERR_PTR(error) if something goes wrong
3781  *         or the crypto hash ptr, if it worked out ok. */
3782 static struct crypto_shash *drbd_crypto_alloc_digest_safe(
3783 		const struct drbd_device *device,
3784 		const char *alg, const char *name)
3785 {
3786 	struct crypto_shash *tfm;
3787 
3788 	if (!alg[0])
3789 		return NULL;
3790 
3791 	tfm = crypto_alloc_shash(alg, 0, 0);
3792 	if (IS_ERR(tfm)) {
3793 		drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3794 			alg, name, PTR_ERR(tfm));
3795 		return tfm;
3796 	}
3797 	return tfm;
3798 }
3799 
3800 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3801 {
3802 	void *buffer = connection->data.rbuf;
3803 	int size = pi->size;
3804 
3805 	while (size) {
3806 		int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3807 		s = drbd_recv(connection, buffer, s);
3808 		if (s <= 0) {
3809 			if (s < 0)
3810 				return s;
3811 			break;
3812 		}
3813 		size -= s;
3814 	}
3815 	if (size)
3816 		return -EIO;
3817 	return 0;
3818 }
3819 
3820 /*
3821  * config_unknown_volume  -  device configuration command for unknown volume
3822  *
3823  * When a device is added to an existing connection, the node on which the
3824  * device is added first will send configuration commands to its peer but the
3825  * peer will not know about the device yet.  It will warn and ignore these
3826  * commands.  Once the device is added on the second node, the second node will
3827  * send the same device configuration commands, but in the other direction.
3828  *
3829  * (We can also end up here if drbd is misconfigured.)
3830  */
3831 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3832 {
3833 	drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3834 		  cmdname(pi->cmd), pi->vnr);
3835 	return ignore_remaining_packet(connection, pi);
3836 }
3837 
3838 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3839 {
3840 	struct drbd_peer_device *peer_device;
3841 	struct drbd_device *device;
3842 	struct p_rs_param_95 *p;
3843 	unsigned int header_size, data_size, exp_max_sz;
3844 	struct crypto_shash *verify_tfm = NULL;
3845 	struct crypto_shash *csums_tfm = NULL;
3846 	struct net_conf *old_net_conf, *new_net_conf = NULL;
3847 	struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3848 	const int apv = connection->agreed_pro_version;
3849 	struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3850 	unsigned int fifo_size = 0;
3851 	int err;
3852 
3853 	peer_device = conn_peer_device(connection, pi->vnr);
3854 	if (!peer_device)
3855 		return config_unknown_volume(connection, pi);
3856 	device = peer_device->device;
3857 
3858 	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3859 		    : apv == 88 ? sizeof(struct p_rs_param)
3860 					+ SHARED_SECRET_MAX
3861 		    : apv <= 94 ? sizeof(struct p_rs_param_89)
3862 		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3863 
3864 	if (pi->size > exp_max_sz) {
3865 		drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3866 		    pi->size, exp_max_sz);
3867 		return -EIO;
3868 	}
3869 
3870 	if (apv <= 88) {
3871 		header_size = sizeof(struct p_rs_param);
3872 		data_size = pi->size - header_size;
3873 	} else if (apv <= 94) {
3874 		header_size = sizeof(struct p_rs_param_89);
3875 		data_size = pi->size - header_size;
3876 		D_ASSERT(device, data_size == 0);
3877 	} else {
3878 		header_size = sizeof(struct p_rs_param_95);
3879 		data_size = pi->size - header_size;
3880 		D_ASSERT(device, data_size == 0);
3881 	}
3882 
3883 	/* initialize verify_alg and csums_alg */
3884 	p = pi->data;
3885 	BUILD_BUG_ON(sizeof(p->algs) != 2 * SHARED_SECRET_MAX);
3886 	memset(&p->algs, 0, sizeof(p->algs));
3887 
3888 	err = drbd_recv_all(peer_device->connection, p, header_size);
3889 	if (err)
3890 		return err;
3891 
3892 	mutex_lock(&connection->resource->conf_update);
3893 	old_net_conf = peer_device->connection->net_conf;
3894 	if (get_ldev(device)) {
3895 		new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3896 		if (!new_disk_conf) {
3897 			put_ldev(device);
3898 			mutex_unlock(&connection->resource->conf_update);
3899 			drbd_err(device, "Allocation of new disk_conf failed\n");
3900 			return -ENOMEM;
3901 		}
3902 
3903 		old_disk_conf = device->ldev->disk_conf;
3904 		*new_disk_conf = *old_disk_conf;
3905 
3906 		new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3907 	}
3908 
3909 	if (apv >= 88) {
3910 		if (apv == 88) {
3911 			if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3912 				drbd_err(device, "verify-alg of wrong size, "
3913 					"peer wants %u, accepting only up to %u byte\n",
3914 					data_size, SHARED_SECRET_MAX);
3915 				goto reconnect;
3916 			}
3917 
3918 			err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3919 			if (err)
3920 				goto reconnect;
3921 			/* we expect NUL terminated string */
3922 			/* but just in case someone tries to be evil */
3923 			D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3924 			p->verify_alg[data_size-1] = 0;
3925 
3926 		} else /* apv >= 89 */ {
3927 			/* we still expect NUL terminated strings */
3928 			/* but just in case someone tries to be evil */
3929 			D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3930 			D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3931 			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3932 			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3933 		}
3934 
3935 		if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3936 			if (device->state.conn == C_WF_REPORT_PARAMS) {
3937 				drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3938 				    old_net_conf->verify_alg, p->verify_alg);
3939 				goto disconnect;
3940 			}
3941 			verify_tfm = drbd_crypto_alloc_digest_safe(device,
3942 					p->verify_alg, "verify-alg");
3943 			if (IS_ERR(verify_tfm)) {
3944 				verify_tfm = NULL;
3945 				goto disconnect;
3946 			}
3947 		}
3948 
3949 		if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3950 			if (device->state.conn == C_WF_REPORT_PARAMS) {
3951 				drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3952 				    old_net_conf->csums_alg, p->csums_alg);
3953 				goto disconnect;
3954 			}
3955 			csums_tfm = drbd_crypto_alloc_digest_safe(device,
3956 					p->csums_alg, "csums-alg");
3957 			if (IS_ERR(csums_tfm)) {
3958 				csums_tfm = NULL;
3959 				goto disconnect;
3960 			}
3961 		}
3962 
3963 		if (apv > 94 && new_disk_conf) {
3964 			new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3965 			new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3966 			new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3967 			new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3968 
3969 			fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3970 			if (fifo_size != device->rs_plan_s->size) {
3971 				new_plan = fifo_alloc(fifo_size);
3972 				if (!new_plan) {
3973 					drbd_err(device, "kmalloc of fifo_buffer failed");
3974 					put_ldev(device);
3975 					goto disconnect;
3976 				}
3977 			}
3978 		}
3979 
3980 		if (verify_tfm || csums_tfm) {
3981 			new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3982 			if (!new_net_conf)
3983 				goto disconnect;
3984 
3985 			*new_net_conf = *old_net_conf;
3986 
3987 			if (verify_tfm) {
3988 				strcpy(new_net_conf->verify_alg, p->verify_alg);
3989 				new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3990 				crypto_free_shash(peer_device->connection->verify_tfm);
3991 				peer_device->connection->verify_tfm = verify_tfm;
3992 				drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3993 			}
3994 			if (csums_tfm) {
3995 				strcpy(new_net_conf->csums_alg, p->csums_alg);
3996 				new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3997 				crypto_free_shash(peer_device->connection->csums_tfm);
3998 				peer_device->connection->csums_tfm = csums_tfm;
3999 				drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
4000 			}
4001 			rcu_assign_pointer(connection->net_conf, new_net_conf);
4002 		}
4003 	}
4004 
4005 	if (new_disk_conf) {
4006 		rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
4007 		put_ldev(device);
4008 	}
4009 
4010 	if (new_plan) {
4011 		old_plan = device->rs_plan_s;
4012 		rcu_assign_pointer(device->rs_plan_s, new_plan);
4013 	}
4014 
4015 	mutex_unlock(&connection->resource->conf_update);
4016 	synchronize_rcu();
4017 	if (new_net_conf)
4018 		kfree(old_net_conf);
4019 	kfree(old_disk_conf);
4020 	kfree(old_plan);
4021 
4022 	return 0;
4023 
4024 reconnect:
4025 	if (new_disk_conf) {
4026 		put_ldev(device);
4027 		kfree(new_disk_conf);
4028 	}
4029 	mutex_unlock(&connection->resource->conf_update);
4030 	return -EIO;
4031 
4032 disconnect:
4033 	kfree(new_plan);
4034 	if (new_disk_conf) {
4035 		put_ldev(device);
4036 		kfree(new_disk_conf);
4037 	}
4038 	mutex_unlock(&connection->resource->conf_update);
4039 	/* just for completeness: actually not needed,
4040 	 * as this is not reached if csums_tfm was ok. */
4041 	crypto_free_shash(csums_tfm);
4042 	/* but free the verify_tfm again, if csums_tfm did not work out */
4043 	crypto_free_shash(verify_tfm);
4044 	conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4045 	return -EIO;
4046 }
4047 
4048 /* warn if the arguments differ by more than 12.5% */
4049 static void warn_if_differ_considerably(struct drbd_device *device,
4050 	const char *s, sector_t a, sector_t b)
4051 {
4052 	sector_t d;
4053 	if (a == 0 || b == 0)
4054 		return;
4055 	d = (a > b) ? (a - b) : (b - a);
4056 	if (d > (a>>3) || d > (b>>3))
4057 		drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
4058 		     (unsigned long long)a, (unsigned long long)b);
4059 }
4060 
4061 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
4062 {
4063 	struct drbd_peer_device *peer_device;
4064 	struct drbd_device *device;
4065 	struct p_sizes *p = pi->data;
4066 	struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ? p->qlim : NULL;
4067 	enum determine_dev_size dd = DS_UNCHANGED;
4068 	sector_t p_size, p_usize, p_csize, my_usize;
4069 	sector_t new_size, cur_size;
4070 	int ldsc = 0; /* local disk size changed */
4071 	enum dds_flags ddsf;
4072 
4073 	peer_device = conn_peer_device(connection, pi->vnr);
4074 	if (!peer_device)
4075 		return config_unknown_volume(connection, pi);
4076 	device = peer_device->device;
4077 	cur_size = get_capacity(device->vdisk);
4078 
4079 	p_size = be64_to_cpu(p->d_size);
4080 	p_usize = be64_to_cpu(p->u_size);
4081 	p_csize = be64_to_cpu(p->c_size);
4082 
4083 	/* just store the peer's disk size for now.
4084 	 * we still need to figure out whether we accept that. */
4085 	device->p_size = p_size;
4086 
4087 	if (get_ldev(device)) {
4088 		rcu_read_lock();
4089 		my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
4090 		rcu_read_unlock();
4091 
4092 		warn_if_differ_considerably(device, "lower level device sizes",
4093 			   p_size, drbd_get_max_capacity(device->ldev));
4094 		warn_if_differ_considerably(device, "user requested size",
4095 					    p_usize, my_usize);
4096 
4097 		/* if this is the first connect, or an otherwise expected
4098 		 * param exchange, choose the minimum */
4099 		if (device->state.conn == C_WF_REPORT_PARAMS)
4100 			p_usize = min_not_zero(my_usize, p_usize);
4101 
4102 		/* Never shrink a device with usable data during connect,
4103 		 * or "attach" on the peer.
4104 		 * But allow online shrinking if we are connected. */
4105 		new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0);
4106 		if (new_size < cur_size &&
4107 		    device->state.disk >= D_OUTDATED &&
4108 		    (device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS)) {
4109 			drbd_err(device, "The peer's disk size is too small! (%llu < %llu sectors)\n",
4110 					(unsigned long long)new_size, (unsigned long long)cur_size);
4111 			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4112 			put_ldev(device);
4113 			return -EIO;
4114 		}
4115 
4116 		if (my_usize != p_usize) {
4117 			struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
4118 
4119 			new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
4120 			if (!new_disk_conf) {
4121 				put_ldev(device);
4122 				return -ENOMEM;
4123 			}
4124 
4125 			mutex_lock(&connection->resource->conf_update);
4126 			old_disk_conf = device->ldev->disk_conf;
4127 			*new_disk_conf = *old_disk_conf;
4128 			new_disk_conf->disk_size = p_usize;
4129 
4130 			rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
4131 			mutex_unlock(&connection->resource->conf_update);
4132 			kvfree_rcu_mightsleep(old_disk_conf);
4133 
4134 			drbd_info(device, "Peer sets u_size to %lu sectors (old: %lu)\n",
4135 				 (unsigned long)p_usize, (unsigned long)my_usize);
4136 		}
4137 
4138 		put_ldev(device);
4139 	}
4140 
4141 	device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
4142 	/* Leave drbd_reconsider_queue_parameters() before drbd_determine_dev_size().
4143 	   In case we cleared the QUEUE_FLAG_DISCARD from our queue in
4144 	   drbd_reconsider_queue_parameters(), we can be sure that after
4145 	   drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
4146 
4147 	ddsf = be16_to_cpu(p->dds_flags);
4148 	if (get_ldev(device)) {
4149 		drbd_reconsider_queue_parameters(device, device->ldev, o);
4150 		dd = drbd_determine_dev_size(device, ddsf, NULL);
4151 		put_ldev(device);
4152 		if (dd == DS_ERROR)
4153 			return -EIO;
4154 		drbd_md_sync(device);
4155 	} else {
4156 		/*
4157 		 * I am diskless, need to accept the peer's *current* size.
4158 		 * I must NOT accept the peers backing disk size,
4159 		 * it may have been larger than mine all along...
4160 		 *
4161 		 * At this point, the peer knows more about my disk, or at
4162 		 * least about what we last agreed upon, than myself.
4163 		 * So if his c_size is less than his d_size, the most likely
4164 		 * reason is that *my* d_size was smaller last time we checked.
4165 		 *
4166 		 * However, if he sends a zero current size,
4167 		 * take his (user-capped or) backing disk size anyways.
4168 		 *
4169 		 * Unless of course he does not have a disk himself.
4170 		 * In which case we ignore this completely.
4171 		 */
4172 		sector_t new_size = p_csize ?: p_usize ?: p_size;
4173 		drbd_reconsider_queue_parameters(device, NULL, o);
4174 		if (new_size == 0) {
4175 			/* Ignore, peer does not know nothing. */
4176 		} else if (new_size == cur_size) {
4177 			/* nothing to do */
4178 		} else if (cur_size != 0 && p_size == 0) {
4179 			drbd_warn(device, "Ignored diskless peer device size (peer:%llu != me:%llu sectors)!\n",
4180 					(unsigned long long)new_size, (unsigned long long)cur_size);
4181 		} else if (new_size < cur_size && device->state.role == R_PRIMARY) {
4182 			drbd_err(device, "The peer's device size is too small! (%llu < %llu sectors); demote me first!\n",
4183 					(unsigned long long)new_size, (unsigned long long)cur_size);
4184 			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4185 			return -EIO;
4186 		} else {
4187 			/* I believe the peer, if
4188 			 *  - I don't have a current size myself
4189 			 *  - we agree on the size anyways
4190 			 *  - I do have a current size, am Secondary,
4191 			 *    and he has the only disk
4192 			 *  - I do have a current size, am Primary,
4193 			 *    and he has the only disk,
4194 			 *    which is larger than my current size
4195 			 */
4196 			drbd_set_my_capacity(device, new_size);
4197 		}
4198 	}
4199 
4200 	if (get_ldev(device)) {
4201 		if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
4202 			device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
4203 			ldsc = 1;
4204 		}
4205 
4206 		put_ldev(device);
4207 	}
4208 
4209 	if (device->state.conn > C_WF_REPORT_PARAMS) {
4210 		if (be64_to_cpu(p->c_size) != get_capacity(device->vdisk) ||
4211 		    ldsc) {
4212 			/* we have different sizes, probably peer
4213 			 * needs to know my new size... */
4214 			drbd_send_sizes(peer_device, 0, ddsf);
4215 		}
4216 		if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
4217 		    (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
4218 			if (device->state.pdsk >= D_INCONSISTENT &&
4219 			    device->state.disk >= D_INCONSISTENT) {
4220 				if (ddsf & DDSF_NO_RESYNC)
4221 					drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
4222 				else
4223 					resync_after_online_grow(device);
4224 			} else
4225 				set_bit(RESYNC_AFTER_NEG, &device->flags);
4226 		}
4227 	}
4228 
4229 	return 0;
4230 }
4231 
4232 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
4233 {
4234 	struct drbd_peer_device *peer_device;
4235 	struct drbd_device *device;
4236 	struct p_uuids *p = pi->data;
4237 	u64 *p_uuid;
4238 	int i, updated_uuids = 0;
4239 
4240 	peer_device = conn_peer_device(connection, pi->vnr);
4241 	if (!peer_device)
4242 		return config_unknown_volume(connection, pi);
4243 	device = peer_device->device;
4244 
4245 	p_uuid = kmalloc_array(UI_EXTENDED_SIZE, sizeof(*p_uuid), GFP_NOIO);
4246 	if (!p_uuid)
4247 		return false;
4248 
4249 	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
4250 		p_uuid[i] = be64_to_cpu(p->uuid[i]);
4251 
4252 	kfree(device->p_uuid);
4253 	device->p_uuid = p_uuid;
4254 
4255 	if ((device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS) &&
4256 	    device->state.disk < D_INCONSISTENT &&
4257 	    device->state.role == R_PRIMARY &&
4258 	    (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
4259 		drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
4260 		    (unsigned long long)device->ed_uuid);
4261 		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4262 		return -EIO;
4263 	}
4264 
4265 	if (get_ldev(device)) {
4266 		int skip_initial_sync =
4267 			device->state.conn == C_CONNECTED &&
4268 			peer_device->connection->agreed_pro_version >= 90 &&
4269 			device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
4270 			(p_uuid[UI_FLAGS] & 8);
4271 		if (skip_initial_sync) {
4272 			drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
4273 			drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
4274 					"clear_n_write from receive_uuids",
4275 					BM_LOCKED_TEST_ALLOWED, NULL);
4276 			_drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
4277 			_drbd_uuid_set(device, UI_BITMAP, 0);
4278 			_drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
4279 					CS_VERBOSE, NULL);
4280 			drbd_md_sync(device);
4281 			updated_uuids = 1;
4282 		}
4283 		put_ldev(device);
4284 	} else if (device->state.disk < D_INCONSISTENT &&
4285 		   device->state.role == R_PRIMARY) {
4286 		/* I am a diskless primary, the peer just created a new current UUID
4287 		   for me. */
4288 		updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4289 	}
4290 
4291 	/* Before we test for the disk state, we should wait until an eventually
4292 	   ongoing cluster wide state change is finished. That is important if
4293 	   we are primary and are detaching from our disk. We need to see the
4294 	   new disk state... */
4295 	mutex_lock(device->state_mutex);
4296 	mutex_unlock(device->state_mutex);
4297 	if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
4298 		updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4299 
4300 	if (updated_uuids)
4301 		drbd_print_uuids(device, "receiver updated UUIDs to");
4302 
4303 	return 0;
4304 }
4305 
4306 /**
4307  * convert_state() - Converts the peer's view of the cluster state to our point of view
4308  * @ps:		The state as seen by the peer.
4309  */
4310 static union drbd_state convert_state(union drbd_state ps)
4311 {
4312 	union drbd_state ms;
4313 
4314 	static enum drbd_conns c_tab[] = {
4315 		[C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
4316 		[C_CONNECTED] = C_CONNECTED,
4317 
4318 		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
4319 		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
4320 		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
4321 		[C_VERIFY_S]       = C_VERIFY_T,
4322 		[C_MASK]   = C_MASK,
4323 	};
4324 
4325 	ms.i = ps.i;
4326 
4327 	ms.conn = c_tab[ps.conn];
4328 	ms.peer = ps.role;
4329 	ms.role = ps.peer;
4330 	ms.pdsk = ps.disk;
4331 	ms.disk = ps.pdsk;
4332 	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
4333 
4334 	return ms;
4335 }
4336 
4337 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4338 {
4339 	struct drbd_peer_device *peer_device;
4340 	struct drbd_device *device;
4341 	struct p_req_state *p = pi->data;
4342 	union drbd_state mask, val;
4343 	enum drbd_state_rv rv;
4344 
4345 	peer_device = conn_peer_device(connection, pi->vnr);
4346 	if (!peer_device)
4347 		return -EIO;
4348 	device = peer_device->device;
4349 
4350 	mask.i = be32_to_cpu(p->mask);
4351 	val.i = be32_to_cpu(p->val);
4352 
4353 	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4354 	    mutex_is_locked(device->state_mutex)) {
4355 		drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4356 		return 0;
4357 	}
4358 
4359 	mask = convert_state(mask);
4360 	val = convert_state(val);
4361 
4362 	rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4363 	drbd_send_sr_reply(peer_device, rv);
4364 
4365 	drbd_md_sync(device);
4366 
4367 	return 0;
4368 }
4369 
4370 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4371 {
4372 	struct p_req_state *p = pi->data;
4373 	union drbd_state mask, val;
4374 	enum drbd_state_rv rv;
4375 
4376 	mask.i = be32_to_cpu(p->mask);
4377 	val.i = be32_to_cpu(p->val);
4378 
4379 	if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4380 	    mutex_is_locked(&connection->cstate_mutex)) {
4381 		conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4382 		return 0;
4383 	}
4384 
4385 	mask = convert_state(mask);
4386 	val = convert_state(val);
4387 
4388 	rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4389 	conn_send_sr_reply(connection, rv);
4390 
4391 	return 0;
4392 }
4393 
4394 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4395 {
4396 	struct drbd_peer_device *peer_device;
4397 	struct drbd_device *device;
4398 	struct p_state *p = pi->data;
4399 	union drbd_state os, ns, peer_state;
4400 	enum drbd_disk_state real_peer_disk;
4401 	enum chg_state_flags cs_flags;
4402 	int rv;
4403 
4404 	peer_device = conn_peer_device(connection, pi->vnr);
4405 	if (!peer_device)
4406 		return config_unknown_volume(connection, pi);
4407 	device = peer_device->device;
4408 
4409 	peer_state.i = be32_to_cpu(p->state);
4410 
4411 	real_peer_disk = peer_state.disk;
4412 	if (peer_state.disk == D_NEGOTIATING) {
4413 		real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4414 		drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4415 	}
4416 
4417 	spin_lock_irq(&device->resource->req_lock);
4418  retry:
4419 	os = ns = drbd_read_state(device);
4420 	spin_unlock_irq(&device->resource->req_lock);
4421 
4422 	/* If some other part of the code (ack_receiver thread, timeout)
4423 	 * already decided to close the connection again,
4424 	 * we must not "re-establish" it here. */
4425 	if (os.conn <= C_TEAR_DOWN)
4426 		return -ECONNRESET;
4427 
4428 	/* If this is the "end of sync" confirmation, usually the peer disk
4429 	 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4430 	 * set) resync started in PausedSyncT, or if the timing of pause-/
4431 	 * unpause-sync events has been "just right", the peer disk may
4432 	 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4433 	 */
4434 	if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4435 	    real_peer_disk == D_UP_TO_DATE &&
4436 	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4437 		/* If we are (becoming) SyncSource, but peer is still in sync
4438 		 * preparation, ignore its uptodate-ness to avoid flapping, it
4439 		 * will change to inconsistent once the peer reaches active
4440 		 * syncing states.
4441 		 * It may have changed syncer-paused flags, however, so we
4442 		 * cannot ignore this completely. */
4443 		if (peer_state.conn > C_CONNECTED &&
4444 		    peer_state.conn < C_SYNC_SOURCE)
4445 			real_peer_disk = D_INCONSISTENT;
4446 
4447 		/* if peer_state changes to connected at the same time,
4448 		 * it explicitly notifies us that it finished resync.
4449 		 * Maybe we should finish it up, too? */
4450 		else if (os.conn >= C_SYNC_SOURCE &&
4451 			 peer_state.conn == C_CONNECTED) {
4452 			if (drbd_bm_total_weight(device) <= device->rs_failed)
4453 				drbd_resync_finished(peer_device);
4454 			return 0;
4455 		}
4456 	}
4457 
4458 	/* explicit verify finished notification, stop sector reached. */
4459 	if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4460 	    peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4461 		ov_out_of_sync_print(peer_device);
4462 		drbd_resync_finished(peer_device);
4463 		return 0;
4464 	}
4465 
4466 	/* peer says his disk is inconsistent, while we think it is uptodate,
4467 	 * and this happens while the peer still thinks we have a sync going on,
4468 	 * but we think we are already done with the sync.
4469 	 * We ignore this to avoid flapping pdsk.
4470 	 * This should not happen, if the peer is a recent version of drbd. */
4471 	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4472 	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4473 		real_peer_disk = D_UP_TO_DATE;
4474 
4475 	if (ns.conn == C_WF_REPORT_PARAMS)
4476 		ns.conn = C_CONNECTED;
4477 
4478 	if (peer_state.conn == C_AHEAD)
4479 		ns.conn = C_BEHIND;
4480 
4481 	/* TODO:
4482 	 * if (primary and diskless and peer uuid != effective uuid)
4483 	 *     abort attach on peer;
4484 	 *
4485 	 * If this node does not have good data, was already connected, but
4486 	 * the peer did a late attach only now, trying to "negotiate" with me,
4487 	 * AND I am currently Primary, possibly frozen, with some specific
4488 	 * "effective" uuid, this should never be reached, really, because
4489 	 * we first send the uuids, then the current state.
4490 	 *
4491 	 * In this scenario, we already dropped the connection hard
4492 	 * when we received the unsuitable uuids (receive_uuids().
4493 	 *
4494 	 * Should we want to change this, that is: not drop the connection in
4495 	 * receive_uuids() already, then we would need to add a branch here
4496 	 * that aborts the attach of "unsuitable uuids" on the peer in case
4497 	 * this node is currently Diskless Primary.
4498 	 */
4499 
4500 	if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4501 	    get_ldev_if_state(device, D_NEGOTIATING)) {
4502 		int cr; /* consider resync */
4503 
4504 		/* if we established a new connection */
4505 		cr  = (os.conn < C_CONNECTED);
4506 		/* if we had an established connection
4507 		 * and one of the nodes newly attaches a disk */
4508 		cr |= (os.conn == C_CONNECTED &&
4509 		       (peer_state.disk == D_NEGOTIATING ||
4510 			os.disk == D_NEGOTIATING));
4511 		/* if we have both been inconsistent, and the peer has been
4512 		 * forced to be UpToDate with --force */
4513 		cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4514 		/* if we had been plain connected, and the admin requested to
4515 		 * start a sync by "invalidate" or "invalidate-remote" */
4516 		cr |= (os.conn == C_CONNECTED &&
4517 				(peer_state.conn >= C_STARTING_SYNC_S &&
4518 				 peer_state.conn <= C_WF_BITMAP_T));
4519 
4520 		if (cr)
4521 			ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4522 
4523 		put_ldev(device);
4524 		if (ns.conn == C_MASK) {
4525 			ns.conn = C_CONNECTED;
4526 			if (device->state.disk == D_NEGOTIATING) {
4527 				drbd_force_state(device, NS(disk, D_FAILED));
4528 			} else if (peer_state.disk == D_NEGOTIATING) {
4529 				drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4530 				peer_state.disk = D_DISKLESS;
4531 				real_peer_disk = D_DISKLESS;
4532 			} else {
4533 				if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4534 					return -EIO;
4535 				D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4536 				conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4537 				return -EIO;
4538 			}
4539 		}
4540 	}
4541 
4542 	spin_lock_irq(&device->resource->req_lock);
4543 	if (os.i != drbd_read_state(device).i)
4544 		goto retry;
4545 	clear_bit(CONSIDER_RESYNC, &device->flags);
4546 	ns.peer = peer_state.role;
4547 	ns.pdsk = real_peer_disk;
4548 	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4549 	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4550 		ns.disk = device->new_state_tmp.disk;
4551 	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4552 	if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4553 	    test_bit(NEW_CUR_UUID, &device->flags)) {
4554 		/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4555 		   for temporal network outages! */
4556 		spin_unlock_irq(&device->resource->req_lock);
4557 		drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4558 		tl_clear(peer_device->connection);
4559 		drbd_uuid_new_current(device);
4560 		clear_bit(NEW_CUR_UUID, &device->flags);
4561 		conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4562 		return -EIO;
4563 	}
4564 	rv = _drbd_set_state(device, ns, cs_flags, NULL);
4565 	ns = drbd_read_state(device);
4566 	spin_unlock_irq(&device->resource->req_lock);
4567 
4568 	if (rv < SS_SUCCESS) {
4569 		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4570 		return -EIO;
4571 	}
4572 
4573 	if (os.conn > C_WF_REPORT_PARAMS) {
4574 		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4575 		    peer_state.disk != D_NEGOTIATING ) {
4576 			/* we want resync, peer has not yet decided to sync... */
4577 			/* Nowadays only used when forcing a node into primary role and
4578 			   setting its disk to UpToDate with that */
4579 			drbd_send_uuids(peer_device);
4580 			drbd_send_current_state(peer_device);
4581 		}
4582 	}
4583 
4584 	clear_bit(DISCARD_MY_DATA, &device->flags);
4585 
4586 	drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4587 
4588 	return 0;
4589 }
4590 
4591 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4592 {
4593 	struct drbd_peer_device *peer_device;
4594 	struct drbd_device *device;
4595 	struct p_rs_uuid *p = pi->data;
4596 
4597 	peer_device = conn_peer_device(connection, pi->vnr);
4598 	if (!peer_device)
4599 		return -EIO;
4600 	device = peer_device->device;
4601 
4602 	wait_event(device->misc_wait,
4603 		   device->state.conn == C_WF_SYNC_UUID ||
4604 		   device->state.conn == C_BEHIND ||
4605 		   device->state.conn < C_CONNECTED ||
4606 		   device->state.disk < D_NEGOTIATING);
4607 
4608 	/* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4609 
4610 	/* Here the _drbd_uuid_ functions are right, current should
4611 	   _not_ be rotated into the history */
4612 	if (get_ldev_if_state(device, D_NEGOTIATING)) {
4613 		_drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4614 		_drbd_uuid_set(device, UI_BITMAP, 0UL);
4615 
4616 		drbd_print_uuids(device, "updated sync uuid");
4617 		drbd_start_resync(device, C_SYNC_TARGET);
4618 
4619 		put_ldev(device);
4620 	} else
4621 		drbd_err(device, "Ignoring SyncUUID packet!\n");
4622 
4623 	return 0;
4624 }
4625 
4626 /*
4627  * receive_bitmap_plain
4628  *
4629  * Return 0 when done, 1 when another iteration is needed, and a negative error
4630  * code upon failure.
4631  */
4632 static int
4633 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4634 		     unsigned long *p, struct bm_xfer_ctx *c)
4635 {
4636 	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4637 				 drbd_header_size(peer_device->connection);
4638 	unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4639 				       c->bm_words - c->word_offset);
4640 	unsigned int want = num_words * sizeof(*p);
4641 	int err;
4642 
4643 	if (want != size) {
4644 		drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4645 		return -EIO;
4646 	}
4647 	if (want == 0)
4648 		return 0;
4649 	err = drbd_recv_all(peer_device->connection, p, want);
4650 	if (err)
4651 		return err;
4652 
4653 	drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4654 
4655 	c->word_offset += num_words;
4656 	c->bit_offset = c->word_offset * BITS_PER_LONG;
4657 	if (c->bit_offset > c->bm_bits)
4658 		c->bit_offset = c->bm_bits;
4659 
4660 	return 1;
4661 }
4662 
4663 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4664 {
4665 	return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4666 }
4667 
4668 static int dcbp_get_start(struct p_compressed_bm *p)
4669 {
4670 	return (p->encoding & 0x80) != 0;
4671 }
4672 
4673 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4674 {
4675 	return (p->encoding >> 4) & 0x7;
4676 }
4677 
4678 /*
4679  * recv_bm_rle_bits
4680  *
4681  * Return 0 when done, 1 when another iteration is needed, and a negative error
4682  * code upon failure.
4683  */
4684 static int
4685 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4686 		struct p_compressed_bm *p,
4687 		 struct bm_xfer_ctx *c,
4688 		 unsigned int len)
4689 {
4690 	struct bitstream bs;
4691 	u64 look_ahead;
4692 	u64 rl;
4693 	u64 tmp;
4694 	unsigned long s = c->bit_offset;
4695 	unsigned long e;
4696 	int toggle = dcbp_get_start(p);
4697 	int have;
4698 	int bits;
4699 
4700 	bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4701 
4702 	bits = bitstream_get_bits(&bs, &look_ahead, 64);
4703 	if (bits < 0)
4704 		return -EIO;
4705 
4706 	for (have = bits; have > 0; s += rl, toggle = !toggle) {
4707 		bits = vli_decode_bits(&rl, look_ahead);
4708 		if (bits <= 0)
4709 			return -EIO;
4710 
4711 		if (toggle) {
4712 			e = s + rl -1;
4713 			if (e >= c->bm_bits) {
4714 				drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4715 				return -EIO;
4716 			}
4717 			_drbd_bm_set_bits(peer_device->device, s, e);
4718 		}
4719 
4720 		if (have < bits) {
4721 			drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4722 				have, bits, look_ahead,
4723 				(unsigned int)(bs.cur.b - p->code),
4724 				(unsigned int)bs.buf_len);
4725 			return -EIO;
4726 		}
4727 		/* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4728 		if (likely(bits < 64))
4729 			look_ahead >>= bits;
4730 		else
4731 			look_ahead = 0;
4732 		have -= bits;
4733 
4734 		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4735 		if (bits < 0)
4736 			return -EIO;
4737 		look_ahead |= tmp << have;
4738 		have += bits;
4739 	}
4740 
4741 	c->bit_offset = s;
4742 	bm_xfer_ctx_bit_to_word_offset(c);
4743 
4744 	return (s != c->bm_bits);
4745 }
4746 
4747 /*
4748  * decode_bitmap_c
4749  *
4750  * Return 0 when done, 1 when another iteration is needed, and a negative error
4751  * code upon failure.
4752  */
4753 static int
4754 decode_bitmap_c(struct drbd_peer_device *peer_device,
4755 		struct p_compressed_bm *p,
4756 		struct bm_xfer_ctx *c,
4757 		unsigned int len)
4758 {
4759 	if (dcbp_get_code(p) == RLE_VLI_Bits)
4760 		return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4761 
4762 	/* other variants had been implemented for evaluation,
4763 	 * but have been dropped as this one turned out to be "best"
4764 	 * during all our tests. */
4765 
4766 	drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4767 	conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4768 	return -EIO;
4769 }
4770 
4771 void INFO_bm_xfer_stats(struct drbd_peer_device *peer_device,
4772 		const char *direction, struct bm_xfer_ctx *c)
4773 {
4774 	/* what would it take to transfer it "plaintext" */
4775 	unsigned int header_size = drbd_header_size(peer_device->connection);
4776 	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4777 	unsigned int plain =
4778 		header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4779 		c->bm_words * sizeof(unsigned long);
4780 	unsigned int total = c->bytes[0] + c->bytes[1];
4781 	unsigned int r;
4782 
4783 	/* total can not be zero. but just in case: */
4784 	if (total == 0)
4785 		return;
4786 
4787 	/* don't report if not compressed */
4788 	if (total >= plain)
4789 		return;
4790 
4791 	/* total < plain. check for overflow, still */
4792 	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4793 		                    : (1000 * total / plain);
4794 
4795 	if (r > 1000)
4796 		r = 1000;
4797 
4798 	r = 1000 - r;
4799 	drbd_info(peer_device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4800 	     "total %u; compression: %u.%u%%\n",
4801 			direction,
4802 			c->bytes[1], c->packets[1],
4803 			c->bytes[0], c->packets[0],
4804 			total, r/10, r % 10);
4805 }
4806 
4807 /* Since we are processing the bitfield from lower addresses to higher,
4808    it does not matter if the process it in 32 bit chunks or 64 bit
4809    chunks as long as it is little endian. (Understand it as byte stream,
4810    beginning with the lowest byte...) If we would use big endian
4811    we would need to process it from the highest address to the lowest,
4812    in order to be agnostic to the 32 vs 64 bits issue.
4813 
4814    returns 0 on failure, 1 if we successfully received it. */
4815 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4816 {
4817 	struct drbd_peer_device *peer_device;
4818 	struct drbd_device *device;
4819 	struct bm_xfer_ctx c;
4820 	int err;
4821 
4822 	peer_device = conn_peer_device(connection, pi->vnr);
4823 	if (!peer_device)
4824 		return -EIO;
4825 	device = peer_device->device;
4826 
4827 	drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4828 	/* you are supposed to send additional out-of-sync information
4829 	 * if you actually set bits during this phase */
4830 
4831 	c = (struct bm_xfer_ctx) {
4832 		.bm_bits = drbd_bm_bits(device),
4833 		.bm_words = drbd_bm_words(device),
4834 	};
4835 
4836 	for(;;) {
4837 		if (pi->cmd == P_BITMAP)
4838 			err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4839 		else if (pi->cmd == P_COMPRESSED_BITMAP) {
4840 			/* MAYBE: sanity check that we speak proto >= 90,
4841 			 * and the feature is enabled! */
4842 			struct p_compressed_bm *p = pi->data;
4843 
4844 			if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4845 				drbd_err(device, "ReportCBitmap packet too large\n");
4846 				err = -EIO;
4847 				goto out;
4848 			}
4849 			if (pi->size <= sizeof(*p)) {
4850 				drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4851 				err = -EIO;
4852 				goto out;
4853 			}
4854 			err = drbd_recv_all(peer_device->connection, p, pi->size);
4855 			if (err)
4856 			       goto out;
4857 			err = decode_bitmap_c(peer_device, p, &c, pi->size);
4858 		} else {
4859 			drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4860 			err = -EIO;
4861 			goto out;
4862 		}
4863 
4864 		c.packets[pi->cmd == P_BITMAP]++;
4865 		c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4866 
4867 		if (err <= 0) {
4868 			if (err < 0)
4869 				goto out;
4870 			break;
4871 		}
4872 		err = drbd_recv_header(peer_device->connection, pi);
4873 		if (err)
4874 			goto out;
4875 	}
4876 
4877 	INFO_bm_xfer_stats(peer_device, "receive", &c);
4878 
4879 	if (device->state.conn == C_WF_BITMAP_T) {
4880 		enum drbd_state_rv rv;
4881 
4882 		err = drbd_send_bitmap(device, peer_device);
4883 		if (err)
4884 			goto out;
4885 		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4886 		rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4887 		D_ASSERT(device, rv == SS_SUCCESS);
4888 	} else if (device->state.conn != C_WF_BITMAP_S) {
4889 		/* admin may have requested C_DISCONNECTING,
4890 		 * other threads may have noticed network errors */
4891 		drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4892 		    drbd_conn_str(device->state.conn));
4893 	}
4894 	err = 0;
4895 
4896  out:
4897 	drbd_bm_unlock(device);
4898 	if (!err && device->state.conn == C_WF_BITMAP_S)
4899 		drbd_start_resync(device, C_SYNC_SOURCE);
4900 	return err;
4901 }
4902 
4903 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4904 {
4905 	drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4906 		 pi->cmd, pi->size);
4907 
4908 	return ignore_remaining_packet(connection, pi);
4909 }
4910 
4911 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4912 {
4913 	/* Make sure we've acked all the TCP data associated
4914 	 * with the data requests being unplugged */
4915 	tcp_sock_set_quickack(connection->data.socket->sk, 2);
4916 	return 0;
4917 }
4918 
4919 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4920 {
4921 	struct drbd_peer_device *peer_device;
4922 	struct drbd_device *device;
4923 	struct p_block_desc *p = pi->data;
4924 
4925 	peer_device = conn_peer_device(connection, pi->vnr);
4926 	if (!peer_device)
4927 		return -EIO;
4928 	device = peer_device->device;
4929 
4930 	switch (device->state.conn) {
4931 	case C_WF_SYNC_UUID:
4932 	case C_WF_BITMAP_T:
4933 	case C_BEHIND:
4934 			break;
4935 	default:
4936 		drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4937 				drbd_conn_str(device->state.conn));
4938 	}
4939 
4940 	drbd_set_out_of_sync(peer_device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4941 
4942 	return 0;
4943 }
4944 
4945 static int receive_rs_deallocated(struct drbd_connection *connection, struct packet_info *pi)
4946 {
4947 	struct drbd_peer_device *peer_device;
4948 	struct p_block_desc *p = pi->data;
4949 	struct drbd_device *device;
4950 	sector_t sector;
4951 	int size, err = 0;
4952 
4953 	peer_device = conn_peer_device(connection, pi->vnr);
4954 	if (!peer_device)
4955 		return -EIO;
4956 	device = peer_device->device;
4957 
4958 	sector = be64_to_cpu(p->sector);
4959 	size = be32_to_cpu(p->blksize);
4960 
4961 	dec_rs_pending(peer_device);
4962 
4963 	if (get_ldev(device)) {
4964 		struct drbd_peer_request *peer_req;
4965 
4966 		peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector,
4967 					       size, 0, GFP_NOIO);
4968 		if (!peer_req) {
4969 			put_ldev(device);
4970 			return -ENOMEM;
4971 		}
4972 
4973 		peer_req->w.cb = e_end_resync_block;
4974 		peer_req->opf = REQ_OP_DISCARD;
4975 		peer_req->submit_jif = jiffies;
4976 		peer_req->flags |= EE_TRIM;
4977 
4978 		spin_lock_irq(&device->resource->req_lock);
4979 		list_add_tail(&peer_req->w.list, &device->sync_ee);
4980 		spin_unlock_irq(&device->resource->req_lock);
4981 
4982 		atomic_add(pi->size >> 9, &device->rs_sect_ev);
4983 		err = drbd_submit_peer_request(peer_req);
4984 
4985 		if (err) {
4986 			spin_lock_irq(&device->resource->req_lock);
4987 			list_del(&peer_req->w.list);
4988 			spin_unlock_irq(&device->resource->req_lock);
4989 
4990 			drbd_free_peer_req(device, peer_req);
4991 			put_ldev(device);
4992 			err = 0;
4993 			goto fail;
4994 		}
4995 
4996 		inc_unacked(device);
4997 
4998 		/* No put_ldev() here. Gets called in drbd_endio_write_sec_final(),
4999 		   as well as drbd_rs_complete_io() */
5000 	} else {
5001 	fail:
5002 		drbd_rs_complete_io(device, sector);
5003 		drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER);
5004 	}
5005 
5006 	atomic_add(size >> 9, &device->rs_sect_in);
5007 
5008 	return err;
5009 }
5010 
5011 struct data_cmd {
5012 	int expect_payload;
5013 	unsigned int pkt_size;
5014 	int (*fn)(struct drbd_connection *, struct packet_info *);
5015 };
5016 
5017 static struct data_cmd drbd_cmd_handler[] = {
5018 	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
5019 	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
5020 	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
5021 	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier } ,
5022 	[P_BITMAP]	    = { 1, 0, receive_bitmap } ,
5023 	[P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
5024 	[P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
5025 	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
5026 	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
5027 	[P_SYNC_PARAM]	    = { 1, 0, receive_SyncParam },
5028 	[P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
5029 	[P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
5030 	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
5031 	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
5032 	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
5033 	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
5034 	[P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
5035 	[P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
5036 	[P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
5037 	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
5038 	[P_RS_THIN_REQ]     = { 0, sizeof(struct p_block_req), receive_DataRequest },
5039 	[P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
5040 	[P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
5041 	[P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
5042 	[P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
5043 	[P_TRIM]	    = { 0, sizeof(struct p_trim), receive_Data },
5044 	[P_ZEROES]	    = { 0, sizeof(struct p_trim), receive_Data },
5045 	[P_RS_DEALLOCATED]  = { 0, sizeof(struct p_block_desc), receive_rs_deallocated },
5046 };
5047 
5048 static void drbdd(struct drbd_connection *connection)
5049 {
5050 	struct packet_info pi;
5051 	size_t shs; /* sub header size */
5052 	int err;
5053 
5054 	while (get_t_state(&connection->receiver) == RUNNING) {
5055 		struct data_cmd const *cmd;
5056 
5057 		drbd_thread_current_set_cpu(&connection->receiver);
5058 		update_receiver_timing_details(connection, drbd_recv_header_maybe_unplug);
5059 		if (drbd_recv_header_maybe_unplug(connection, &pi))
5060 			goto err_out;
5061 
5062 		cmd = &drbd_cmd_handler[pi.cmd];
5063 		if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
5064 			drbd_err(connection, "Unexpected data packet %s (0x%04x)",
5065 				 cmdname(pi.cmd), pi.cmd);
5066 			goto err_out;
5067 		}
5068 
5069 		shs = cmd->pkt_size;
5070 		if (pi.cmd == P_SIZES && connection->agreed_features & DRBD_FF_WSAME)
5071 			shs += sizeof(struct o_qlim);
5072 		if (pi.size > shs && !cmd->expect_payload) {
5073 			drbd_err(connection, "No payload expected %s l:%d\n",
5074 				 cmdname(pi.cmd), pi.size);
5075 			goto err_out;
5076 		}
5077 		if (pi.size < shs) {
5078 			drbd_err(connection, "%s: unexpected packet size, expected:%d received:%d\n",
5079 				 cmdname(pi.cmd), (int)shs, pi.size);
5080 			goto err_out;
5081 		}
5082 
5083 		if (shs) {
5084 			update_receiver_timing_details(connection, drbd_recv_all_warn);
5085 			err = drbd_recv_all_warn(connection, pi.data, shs);
5086 			if (err)
5087 				goto err_out;
5088 			pi.size -= shs;
5089 		}
5090 
5091 		update_receiver_timing_details(connection, cmd->fn);
5092 		err = cmd->fn(connection, &pi);
5093 		if (err) {
5094 			drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
5095 				 cmdname(pi.cmd), err, pi.size);
5096 			goto err_out;
5097 		}
5098 	}
5099 	return;
5100 
5101     err_out:
5102 	conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
5103 }
5104 
5105 static void conn_disconnect(struct drbd_connection *connection)
5106 {
5107 	struct drbd_peer_device *peer_device;
5108 	enum drbd_conns oc;
5109 	int vnr;
5110 
5111 	if (connection->cstate == C_STANDALONE)
5112 		return;
5113 
5114 	/* We are about to start the cleanup after connection loss.
5115 	 * Make sure drbd_make_request knows about that.
5116 	 * Usually we should be in some network failure state already,
5117 	 * but just in case we are not, we fix it up here.
5118 	 */
5119 	conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5120 
5121 	/* ack_receiver does not clean up anything. it must not interfere, either */
5122 	drbd_thread_stop(&connection->ack_receiver);
5123 	if (connection->ack_sender) {
5124 		destroy_workqueue(connection->ack_sender);
5125 		connection->ack_sender = NULL;
5126 	}
5127 	drbd_free_sock(connection);
5128 
5129 	rcu_read_lock();
5130 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5131 		struct drbd_device *device = peer_device->device;
5132 		kref_get(&device->kref);
5133 		rcu_read_unlock();
5134 		drbd_disconnected(peer_device);
5135 		kref_put(&device->kref, drbd_destroy_device);
5136 		rcu_read_lock();
5137 	}
5138 	rcu_read_unlock();
5139 
5140 	if (!list_empty(&connection->current_epoch->list))
5141 		drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
5142 	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
5143 	atomic_set(&connection->current_epoch->epoch_size, 0);
5144 	connection->send.seen_any_write_yet = false;
5145 
5146 	drbd_info(connection, "Connection closed\n");
5147 
5148 	if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
5149 		conn_try_outdate_peer_async(connection);
5150 
5151 	spin_lock_irq(&connection->resource->req_lock);
5152 	oc = connection->cstate;
5153 	if (oc >= C_UNCONNECTED)
5154 		_conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
5155 
5156 	spin_unlock_irq(&connection->resource->req_lock);
5157 
5158 	if (oc == C_DISCONNECTING)
5159 		conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
5160 }
5161 
5162 static int drbd_disconnected(struct drbd_peer_device *peer_device)
5163 {
5164 	struct drbd_device *device = peer_device->device;
5165 	unsigned int i;
5166 
5167 	/* wait for current activity to cease. */
5168 	spin_lock_irq(&device->resource->req_lock);
5169 	_drbd_wait_ee_list_empty(device, &device->active_ee);
5170 	_drbd_wait_ee_list_empty(device, &device->sync_ee);
5171 	_drbd_wait_ee_list_empty(device, &device->read_ee);
5172 	spin_unlock_irq(&device->resource->req_lock);
5173 
5174 	/* We do not have data structures that would allow us to
5175 	 * get the rs_pending_cnt down to 0 again.
5176 	 *  * On C_SYNC_TARGET we do not have any data structures describing
5177 	 *    the pending RSDataRequest's we have sent.
5178 	 *  * On C_SYNC_SOURCE there is no data structure that tracks
5179 	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
5180 	 *  And no, it is not the sum of the reference counts in the
5181 	 *  resync_LRU. The resync_LRU tracks the whole operation including
5182 	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
5183 	 *  on the fly. */
5184 	drbd_rs_cancel_all(device);
5185 	device->rs_total = 0;
5186 	device->rs_failed = 0;
5187 	atomic_set(&device->rs_pending_cnt, 0);
5188 	wake_up(&device->misc_wait);
5189 
5190 	del_timer_sync(&device->resync_timer);
5191 	resync_timer_fn(&device->resync_timer);
5192 
5193 	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
5194 	 * w_make_resync_request etc. which may still be on the worker queue
5195 	 * to be "canceled" */
5196 	drbd_flush_workqueue(&peer_device->connection->sender_work);
5197 
5198 	drbd_finish_peer_reqs(device);
5199 
5200 	/* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
5201 	   might have issued a work again. The one before drbd_finish_peer_reqs() is
5202 	   necessary to reclain net_ee in drbd_finish_peer_reqs(). */
5203 	drbd_flush_workqueue(&peer_device->connection->sender_work);
5204 
5205 	/* need to do it again, drbd_finish_peer_reqs() may have populated it
5206 	 * again via drbd_try_clear_on_disk_bm(). */
5207 	drbd_rs_cancel_all(device);
5208 
5209 	kfree(device->p_uuid);
5210 	device->p_uuid = NULL;
5211 
5212 	if (!drbd_suspended(device))
5213 		tl_clear(peer_device->connection);
5214 
5215 	drbd_md_sync(device);
5216 
5217 	if (get_ldev(device)) {
5218 		drbd_bitmap_io(device, &drbd_bm_write_copy_pages,
5219 				"write from disconnected", BM_LOCKED_CHANGE_ALLOWED, NULL);
5220 		put_ldev(device);
5221 	}
5222 
5223 	/* tcp_close and release of sendpage pages can be deferred.  I don't
5224 	 * want to use SO_LINGER, because apparently it can be deferred for
5225 	 * more than 20 seconds (longest time I checked).
5226 	 *
5227 	 * Actually we don't care for exactly when the network stack does its
5228 	 * put_page(), but release our reference on these pages right here.
5229 	 */
5230 	i = drbd_free_peer_reqs(device, &device->net_ee);
5231 	if (i)
5232 		drbd_info(device, "net_ee not empty, killed %u entries\n", i);
5233 	i = atomic_read(&device->pp_in_use_by_net);
5234 	if (i)
5235 		drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
5236 	i = atomic_read(&device->pp_in_use);
5237 	if (i)
5238 		drbd_info(device, "pp_in_use = %d, expected 0\n", i);
5239 
5240 	D_ASSERT(device, list_empty(&device->read_ee));
5241 	D_ASSERT(device, list_empty(&device->active_ee));
5242 	D_ASSERT(device, list_empty(&device->sync_ee));
5243 	D_ASSERT(device, list_empty(&device->done_ee));
5244 
5245 	return 0;
5246 }
5247 
5248 /*
5249  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
5250  * we can agree on is stored in agreed_pro_version.
5251  *
5252  * feature flags and the reserved array should be enough room for future
5253  * enhancements of the handshake protocol, and possible plugins...
5254  *
5255  * for now, they are expected to be zero, but ignored.
5256  */
5257 static int drbd_send_features(struct drbd_connection *connection)
5258 {
5259 	struct drbd_socket *sock;
5260 	struct p_connection_features *p;
5261 
5262 	sock = &connection->data;
5263 	p = conn_prepare_command(connection, sock);
5264 	if (!p)
5265 		return -EIO;
5266 	memset(p, 0, sizeof(*p));
5267 	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
5268 	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
5269 	p->feature_flags = cpu_to_be32(PRO_FEATURES);
5270 	return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
5271 }
5272 
5273 /*
5274  * return values:
5275  *   1 yes, we have a valid connection
5276  *   0 oops, did not work out, please try again
5277  *  -1 peer talks different language,
5278  *     no point in trying again, please go standalone.
5279  */
5280 static int drbd_do_features(struct drbd_connection *connection)
5281 {
5282 	/* ASSERT current == connection->receiver ... */
5283 	struct p_connection_features *p;
5284 	const int expect = sizeof(struct p_connection_features);
5285 	struct packet_info pi;
5286 	int err;
5287 
5288 	err = drbd_send_features(connection);
5289 	if (err)
5290 		return 0;
5291 
5292 	err = drbd_recv_header(connection, &pi);
5293 	if (err)
5294 		return 0;
5295 
5296 	if (pi.cmd != P_CONNECTION_FEATURES) {
5297 		drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
5298 			 cmdname(pi.cmd), pi.cmd);
5299 		return -1;
5300 	}
5301 
5302 	if (pi.size != expect) {
5303 		drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
5304 		     expect, pi.size);
5305 		return -1;
5306 	}
5307 
5308 	p = pi.data;
5309 	err = drbd_recv_all_warn(connection, p, expect);
5310 	if (err)
5311 		return 0;
5312 
5313 	p->protocol_min = be32_to_cpu(p->protocol_min);
5314 	p->protocol_max = be32_to_cpu(p->protocol_max);
5315 	if (p->protocol_max == 0)
5316 		p->protocol_max = p->protocol_min;
5317 
5318 	if (PRO_VERSION_MAX < p->protocol_min ||
5319 	    PRO_VERSION_MIN > p->protocol_max)
5320 		goto incompat;
5321 
5322 	connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
5323 	connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
5324 
5325 	drbd_info(connection, "Handshake successful: "
5326 	     "Agreed network protocol version %d\n", connection->agreed_pro_version);
5327 
5328 	drbd_info(connection, "Feature flags enabled on protocol level: 0x%x%s%s%s%s.\n",
5329 		  connection->agreed_features,
5330 		  connection->agreed_features & DRBD_FF_TRIM ? " TRIM" : "",
5331 		  connection->agreed_features & DRBD_FF_THIN_RESYNC ? " THIN_RESYNC" : "",
5332 		  connection->agreed_features & DRBD_FF_WSAME ? " WRITE_SAME" : "",
5333 		  connection->agreed_features & DRBD_FF_WZEROES ? " WRITE_ZEROES" :
5334 		  connection->agreed_features ? "" : " none");
5335 
5336 	return 1;
5337 
5338  incompat:
5339 	drbd_err(connection, "incompatible DRBD dialects: "
5340 	    "I support %d-%d, peer supports %d-%d\n",
5341 	    PRO_VERSION_MIN, PRO_VERSION_MAX,
5342 	    p->protocol_min, p->protocol_max);
5343 	return -1;
5344 }
5345 
5346 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
5347 static int drbd_do_auth(struct drbd_connection *connection)
5348 {
5349 	drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
5350 	drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
5351 	return -1;
5352 }
5353 #else
5354 #define CHALLENGE_LEN 64
5355 
5356 /* Return value:
5357 	1 - auth succeeded,
5358 	0 - failed, try again (network error),
5359 	-1 - auth failed, don't try again.
5360 */
5361 
5362 static int drbd_do_auth(struct drbd_connection *connection)
5363 {
5364 	struct drbd_socket *sock;
5365 	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
5366 	char *response = NULL;
5367 	char *right_response = NULL;
5368 	char *peers_ch = NULL;
5369 	unsigned int key_len;
5370 	char secret[SHARED_SECRET_MAX]; /* 64 byte */
5371 	unsigned int resp_size;
5372 	struct shash_desc *desc;
5373 	struct packet_info pi;
5374 	struct net_conf *nc;
5375 	int err, rv;
5376 
5377 	/* FIXME: Put the challenge/response into the preallocated socket buffer.  */
5378 
5379 	rcu_read_lock();
5380 	nc = rcu_dereference(connection->net_conf);
5381 	key_len = strlen(nc->shared_secret);
5382 	memcpy(secret, nc->shared_secret, key_len);
5383 	rcu_read_unlock();
5384 
5385 	desc = kmalloc(sizeof(struct shash_desc) +
5386 		       crypto_shash_descsize(connection->cram_hmac_tfm),
5387 		       GFP_KERNEL);
5388 	if (!desc) {
5389 		rv = -1;
5390 		goto fail;
5391 	}
5392 	desc->tfm = connection->cram_hmac_tfm;
5393 
5394 	rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
5395 	if (rv) {
5396 		drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
5397 		rv = -1;
5398 		goto fail;
5399 	}
5400 
5401 	get_random_bytes(my_challenge, CHALLENGE_LEN);
5402 
5403 	sock = &connection->data;
5404 	if (!conn_prepare_command(connection, sock)) {
5405 		rv = 0;
5406 		goto fail;
5407 	}
5408 	rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
5409 				my_challenge, CHALLENGE_LEN);
5410 	if (!rv)
5411 		goto fail;
5412 
5413 	err = drbd_recv_header(connection, &pi);
5414 	if (err) {
5415 		rv = 0;
5416 		goto fail;
5417 	}
5418 
5419 	if (pi.cmd != P_AUTH_CHALLENGE) {
5420 		drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
5421 			 cmdname(pi.cmd), pi.cmd);
5422 		rv = -1;
5423 		goto fail;
5424 	}
5425 
5426 	if (pi.size > CHALLENGE_LEN * 2) {
5427 		drbd_err(connection, "expected AuthChallenge payload too big.\n");
5428 		rv = -1;
5429 		goto fail;
5430 	}
5431 
5432 	if (pi.size < CHALLENGE_LEN) {
5433 		drbd_err(connection, "AuthChallenge payload too small.\n");
5434 		rv = -1;
5435 		goto fail;
5436 	}
5437 
5438 	peers_ch = kmalloc(pi.size, GFP_NOIO);
5439 	if (!peers_ch) {
5440 		rv = -1;
5441 		goto fail;
5442 	}
5443 
5444 	err = drbd_recv_all_warn(connection, peers_ch, pi.size);
5445 	if (err) {
5446 		rv = 0;
5447 		goto fail;
5448 	}
5449 
5450 	if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
5451 		drbd_err(connection, "Peer presented the same challenge!\n");
5452 		rv = -1;
5453 		goto fail;
5454 	}
5455 
5456 	resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
5457 	response = kmalloc(resp_size, GFP_NOIO);
5458 	if (!response) {
5459 		rv = -1;
5460 		goto fail;
5461 	}
5462 
5463 	rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
5464 	if (rv) {
5465 		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5466 		rv = -1;
5467 		goto fail;
5468 	}
5469 
5470 	if (!conn_prepare_command(connection, sock)) {
5471 		rv = 0;
5472 		goto fail;
5473 	}
5474 	rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5475 				response, resp_size);
5476 	if (!rv)
5477 		goto fail;
5478 
5479 	err = drbd_recv_header(connection, &pi);
5480 	if (err) {
5481 		rv = 0;
5482 		goto fail;
5483 	}
5484 
5485 	if (pi.cmd != P_AUTH_RESPONSE) {
5486 		drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5487 			 cmdname(pi.cmd), pi.cmd);
5488 		rv = 0;
5489 		goto fail;
5490 	}
5491 
5492 	if (pi.size != resp_size) {
5493 		drbd_err(connection, "expected AuthResponse payload of wrong size\n");
5494 		rv = 0;
5495 		goto fail;
5496 	}
5497 
5498 	err = drbd_recv_all_warn(connection, response , resp_size);
5499 	if (err) {
5500 		rv = 0;
5501 		goto fail;
5502 	}
5503 
5504 	right_response = kmalloc(resp_size, GFP_NOIO);
5505 	if (!right_response) {
5506 		rv = -1;
5507 		goto fail;
5508 	}
5509 
5510 	rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
5511 				 right_response);
5512 	if (rv) {
5513 		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5514 		rv = -1;
5515 		goto fail;
5516 	}
5517 
5518 	rv = !memcmp(response, right_response, resp_size);
5519 
5520 	if (rv)
5521 		drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5522 		     resp_size);
5523 	else
5524 		rv = -1;
5525 
5526  fail:
5527 	kfree(peers_ch);
5528 	kfree(response);
5529 	kfree(right_response);
5530 	if (desc) {
5531 		shash_desc_zero(desc);
5532 		kfree(desc);
5533 	}
5534 
5535 	return rv;
5536 }
5537 #endif
5538 
5539 int drbd_receiver(struct drbd_thread *thi)
5540 {
5541 	struct drbd_connection *connection = thi->connection;
5542 	int h;
5543 
5544 	drbd_info(connection, "receiver (re)started\n");
5545 
5546 	do {
5547 		h = conn_connect(connection);
5548 		if (h == 0) {
5549 			conn_disconnect(connection);
5550 			schedule_timeout_interruptible(HZ);
5551 		}
5552 		if (h == -1) {
5553 			drbd_warn(connection, "Discarding network configuration.\n");
5554 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5555 		}
5556 	} while (h == 0);
5557 
5558 	if (h > 0) {
5559 		blk_start_plug(&connection->receiver_plug);
5560 		drbdd(connection);
5561 		blk_finish_plug(&connection->receiver_plug);
5562 	}
5563 
5564 	conn_disconnect(connection);
5565 
5566 	drbd_info(connection, "receiver terminated\n");
5567 	return 0;
5568 }
5569 
5570 /* ********* acknowledge sender ******** */
5571 
5572 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5573 {
5574 	struct p_req_state_reply *p = pi->data;
5575 	int retcode = be32_to_cpu(p->retcode);
5576 
5577 	if (retcode >= SS_SUCCESS) {
5578 		set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5579 	} else {
5580 		set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5581 		drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5582 			 drbd_set_st_err_str(retcode), retcode);
5583 	}
5584 	wake_up(&connection->ping_wait);
5585 
5586 	return 0;
5587 }
5588 
5589 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5590 {
5591 	struct drbd_peer_device *peer_device;
5592 	struct drbd_device *device;
5593 	struct p_req_state_reply *p = pi->data;
5594 	int retcode = be32_to_cpu(p->retcode);
5595 
5596 	peer_device = conn_peer_device(connection, pi->vnr);
5597 	if (!peer_device)
5598 		return -EIO;
5599 	device = peer_device->device;
5600 
5601 	if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5602 		D_ASSERT(device, connection->agreed_pro_version < 100);
5603 		return got_conn_RqSReply(connection, pi);
5604 	}
5605 
5606 	if (retcode >= SS_SUCCESS) {
5607 		set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5608 	} else {
5609 		set_bit(CL_ST_CHG_FAIL, &device->flags);
5610 		drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5611 			drbd_set_st_err_str(retcode), retcode);
5612 	}
5613 	wake_up(&device->state_wait);
5614 
5615 	return 0;
5616 }
5617 
5618 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
5619 {
5620 	return drbd_send_ping_ack(connection);
5621 
5622 }
5623 
5624 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5625 {
5626 	/* restore idle timeout */
5627 	connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5628 	if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5629 		wake_up(&connection->ping_wait);
5630 
5631 	return 0;
5632 }
5633 
5634 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5635 {
5636 	struct drbd_peer_device *peer_device;
5637 	struct drbd_device *device;
5638 	struct p_block_ack *p = pi->data;
5639 	sector_t sector = be64_to_cpu(p->sector);
5640 	int blksize = be32_to_cpu(p->blksize);
5641 
5642 	peer_device = conn_peer_device(connection, pi->vnr);
5643 	if (!peer_device)
5644 		return -EIO;
5645 	device = peer_device->device;
5646 
5647 	D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5648 
5649 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5650 
5651 	if (get_ldev(device)) {
5652 		drbd_rs_complete_io(device, sector);
5653 		drbd_set_in_sync(peer_device, sector, blksize);
5654 		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5655 		device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5656 		put_ldev(device);
5657 	}
5658 	dec_rs_pending(peer_device);
5659 	atomic_add(blksize >> 9, &device->rs_sect_in);
5660 
5661 	return 0;
5662 }
5663 
5664 static int
5665 validate_req_change_req_state(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
5666 			      struct rb_root *root, const char *func,
5667 			      enum drbd_req_event what, bool missing_ok)
5668 {
5669 	struct drbd_device *device = peer_device->device;
5670 	struct drbd_request *req;
5671 	struct bio_and_error m;
5672 
5673 	spin_lock_irq(&device->resource->req_lock);
5674 	req = find_request(device, root, id, sector, missing_ok, func);
5675 	if (unlikely(!req)) {
5676 		spin_unlock_irq(&device->resource->req_lock);
5677 		return -EIO;
5678 	}
5679 	__req_mod(req, what, peer_device, &m);
5680 	spin_unlock_irq(&device->resource->req_lock);
5681 
5682 	if (m.bio)
5683 		complete_master_bio(device, &m);
5684 	return 0;
5685 }
5686 
5687 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5688 {
5689 	struct drbd_peer_device *peer_device;
5690 	struct drbd_device *device;
5691 	struct p_block_ack *p = pi->data;
5692 	sector_t sector = be64_to_cpu(p->sector);
5693 	int blksize = be32_to_cpu(p->blksize);
5694 	enum drbd_req_event what;
5695 
5696 	peer_device = conn_peer_device(connection, pi->vnr);
5697 	if (!peer_device)
5698 		return -EIO;
5699 	device = peer_device->device;
5700 
5701 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5702 
5703 	if (p->block_id == ID_SYNCER) {
5704 		drbd_set_in_sync(peer_device, sector, blksize);
5705 		dec_rs_pending(peer_device);
5706 		return 0;
5707 	}
5708 	switch (pi->cmd) {
5709 	case P_RS_WRITE_ACK:
5710 		what = WRITE_ACKED_BY_PEER_AND_SIS;
5711 		break;
5712 	case P_WRITE_ACK:
5713 		what = WRITE_ACKED_BY_PEER;
5714 		break;
5715 	case P_RECV_ACK:
5716 		what = RECV_ACKED_BY_PEER;
5717 		break;
5718 	case P_SUPERSEDED:
5719 		what = CONFLICT_RESOLVED;
5720 		break;
5721 	case P_RETRY_WRITE:
5722 		what = POSTPONE_WRITE;
5723 		break;
5724 	default:
5725 		BUG();
5726 	}
5727 
5728 	return validate_req_change_req_state(peer_device, p->block_id, sector,
5729 					     &device->write_requests, __func__,
5730 					     what, false);
5731 }
5732 
5733 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5734 {
5735 	struct drbd_peer_device *peer_device;
5736 	struct drbd_device *device;
5737 	struct p_block_ack *p = pi->data;
5738 	sector_t sector = be64_to_cpu(p->sector);
5739 	int size = be32_to_cpu(p->blksize);
5740 	int err;
5741 
5742 	peer_device = conn_peer_device(connection, pi->vnr);
5743 	if (!peer_device)
5744 		return -EIO;
5745 	device = peer_device->device;
5746 
5747 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5748 
5749 	if (p->block_id == ID_SYNCER) {
5750 		dec_rs_pending(peer_device);
5751 		drbd_rs_failed_io(peer_device, sector, size);
5752 		return 0;
5753 	}
5754 
5755 	err = validate_req_change_req_state(peer_device, p->block_id, sector,
5756 					    &device->write_requests, __func__,
5757 					    NEG_ACKED, true);
5758 	if (err) {
5759 		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5760 		   The master bio might already be completed, therefore the
5761 		   request is no longer in the collision hash. */
5762 		/* In Protocol B we might already have got a P_RECV_ACK
5763 		   but then get a P_NEG_ACK afterwards. */
5764 		drbd_set_out_of_sync(peer_device, sector, size);
5765 	}
5766 	return 0;
5767 }
5768 
5769 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5770 {
5771 	struct drbd_peer_device *peer_device;
5772 	struct drbd_device *device;
5773 	struct p_block_ack *p = pi->data;
5774 	sector_t sector = be64_to_cpu(p->sector);
5775 
5776 	peer_device = conn_peer_device(connection, pi->vnr);
5777 	if (!peer_device)
5778 		return -EIO;
5779 	device = peer_device->device;
5780 
5781 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5782 
5783 	drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5784 	    (unsigned long long)sector, be32_to_cpu(p->blksize));
5785 
5786 	return validate_req_change_req_state(peer_device, p->block_id, sector,
5787 					     &device->read_requests, __func__,
5788 					     NEG_ACKED, false);
5789 }
5790 
5791 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5792 {
5793 	struct drbd_peer_device *peer_device;
5794 	struct drbd_device *device;
5795 	sector_t sector;
5796 	int size;
5797 	struct p_block_ack *p = pi->data;
5798 
5799 	peer_device = conn_peer_device(connection, pi->vnr);
5800 	if (!peer_device)
5801 		return -EIO;
5802 	device = peer_device->device;
5803 
5804 	sector = be64_to_cpu(p->sector);
5805 	size = be32_to_cpu(p->blksize);
5806 
5807 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5808 
5809 	dec_rs_pending(peer_device);
5810 
5811 	if (get_ldev_if_state(device, D_FAILED)) {
5812 		drbd_rs_complete_io(device, sector);
5813 		switch (pi->cmd) {
5814 		case P_NEG_RS_DREPLY:
5815 			drbd_rs_failed_io(peer_device, sector, size);
5816 			break;
5817 		case P_RS_CANCEL:
5818 			break;
5819 		default:
5820 			BUG();
5821 		}
5822 		put_ldev(device);
5823 	}
5824 
5825 	return 0;
5826 }
5827 
5828 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5829 {
5830 	struct p_barrier_ack *p = pi->data;
5831 	struct drbd_peer_device *peer_device;
5832 	int vnr;
5833 
5834 	tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5835 
5836 	rcu_read_lock();
5837 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5838 		struct drbd_device *device = peer_device->device;
5839 
5840 		if (device->state.conn == C_AHEAD &&
5841 		    atomic_read(&device->ap_in_flight) == 0 &&
5842 		    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5843 			device->start_resync_timer.expires = jiffies + HZ;
5844 			add_timer(&device->start_resync_timer);
5845 		}
5846 	}
5847 	rcu_read_unlock();
5848 
5849 	return 0;
5850 }
5851 
5852 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5853 {
5854 	struct drbd_peer_device *peer_device;
5855 	struct drbd_device *device;
5856 	struct p_block_ack *p = pi->data;
5857 	struct drbd_device_work *dw;
5858 	sector_t sector;
5859 	int size;
5860 
5861 	peer_device = conn_peer_device(connection, pi->vnr);
5862 	if (!peer_device)
5863 		return -EIO;
5864 	device = peer_device->device;
5865 
5866 	sector = be64_to_cpu(p->sector);
5867 	size = be32_to_cpu(p->blksize);
5868 
5869 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5870 
5871 	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5872 		drbd_ov_out_of_sync_found(peer_device, sector, size);
5873 	else
5874 		ov_out_of_sync_print(peer_device);
5875 
5876 	if (!get_ldev(device))
5877 		return 0;
5878 
5879 	drbd_rs_complete_io(device, sector);
5880 	dec_rs_pending(peer_device);
5881 
5882 	--device->ov_left;
5883 
5884 	/* let's advance progress step marks only for every other megabyte */
5885 	if ((device->ov_left & 0x200) == 0x200)
5886 		drbd_advance_rs_marks(peer_device, device->ov_left);
5887 
5888 	if (device->ov_left == 0) {
5889 		dw = kmalloc(sizeof(*dw), GFP_NOIO);
5890 		if (dw) {
5891 			dw->w.cb = w_ov_finished;
5892 			dw->device = device;
5893 			drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5894 		} else {
5895 			drbd_err(device, "kmalloc(dw) failed.");
5896 			ov_out_of_sync_print(peer_device);
5897 			drbd_resync_finished(peer_device);
5898 		}
5899 	}
5900 	put_ldev(device);
5901 	return 0;
5902 }
5903 
5904 static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
5905 {
5906 	return 0;
5907 }
5908 
5909 struct meta_sock_cmd {
5910 	size_t pkt_size;
5911 	int (*fn)(struct drbd_connection *connection, struct packet_info *);
5912 };
5913 
5914 static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
5915 {
5916 	long t;
5917 	struct net_conf *nc;
5918 
5919 	rcu_read_lock();
5920 	nc = rcu_dereference(connection->net_conf);
5921 	t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5922 	rcu_read_unlock();
5923 
5924 	t *= HZ;
5925 	if (ping_timeout)
5926 		t /= 10;
5927 
5928 	connection->meta.socket->sk->sk_rcvtimeo = t;
5929 }
5930 
5931 static void set_ping_timeout(struct drbd_connection *connection)
5932 {
5933 	set_rcvtimeo(connection, 1);
5934 }
5935 
5936 static void set_idle_timeout(struct drbd_connection *connection)
5937 {
5938 	set_rcvtimeo(connection, 0);
5939 }
5940 
5941 static struct meta_sock_cmd ack_receiver_tbl[] = {
5942 	[P_PING]	    = { 0, got_Ping },
5943 	[P_PING_ACK]	    = { 0, got_PingAck },
5944 	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5945 	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5946 	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5947 	[P_SUPERSEDED]   = { sizeof(struct p_block_ack), got_BlockAck },
5948 	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
5949 	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
5950 	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5951 	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
5952 	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
5953 	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5954 	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5955 	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5956 	[P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5957 	[P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5958 	[P_RETRY_WRITE]	    = { sizeof(struct p_block_ack), got_BlockAck },
5959 };
5960 
5961 int drbd_ack_receiver(struct drbd_thread *thi)
5962 {
5963 	struct drbd_connection *connection = thi->connection;
5964 	struct meta_sock_cmd *cmd = NULL;
5965 	struct packet_info pi;
5966 	unsigned long pre_recv_jif;
5967 	int rv;
5968 	void *buf    = connection->meta.rbuf;
5969 	int received = 0;
5970 	unsigned int header_size = drbd_header_size(connection);
5971 	int expect   = header_size;
5972 	bool ping_timeout_active = false;
5973 
5974 	sched_set_fifo_low(current);
5975 
5976 	while (get_t_state(thi) == RUNNING) {
5977 		drbd_thread_current_set_cpu(thi);
5978 
5979 		conn_reclaim_net_peer_reqs(connection);
5980 
5981 		if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5982 			if (drbd_send_ping(connection)) {
5983 				drbd_err(connection, "drbd_send_ping has failed\n");
5984 				goto reconnect;
5985 			}
5986 			set_ping_timeout(connection);
5987 			ping_timeout_active = true;
5988 		}
5989 
5990 		pre_recv_jif = jiffies;
5991 		rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5992 
5993 		/* Note:
5994 		 * -EINTR	 (on meta) we got a signal
5995 		 * -EAGAIN	 (on meta) rcvtimeo expired
5996 		 * -ECONNRESET	 other side closed the connection
5997 		 * -ERESTARTSYS  (on data) we got a signal
5998 		 * rv <  0	 other than above: unexpected error!
5999 		 * rv == expected: full header or command
6000 		 * rv <  expected: "woken" by signal during receive
6001 		 * rv == 0	 : "connection shut down by peer"
6002 		 */
6003 		if (likely(rv > 0)) {
6004 			received += rv;
6005 			buf	 += rv;
6006 		} else if (rv == 0) {
6007 			if (test_bit(DISCONNECT_SENT, &connection->flags)) {
6008 				long t;
6009 				rcu_read_lock();
6010 				t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
6011 				rcu_read_unlock();
6012 
6013 				t = wait_event_timeout(connection->ping_wait,
6014 						       connection->cstate < C_WF_REPORT_PARAMS,
6015 						       t);
6016 				if (t)
6017 					break;
6018 			}
6019 			drbd_err(connection, "meta connection shut down by peer.\n");
6020 			goto reconnect;
6021 		} else if (rv == -EAGAIN) {
6022 			/* If the data socket received something meanwhile,
6023 			 * that is good enough: peer is still alive. */
6024 			if (time_after(connection->last_received, pre_recv_jif))
6025 				continue;
6026 			if (ping_timeout_active) {
6027 				drbd_err(connection, "PingAck did not arrive in time.\n");
6028 				goto reconnect;
6029 			}
6030 			set_bit(SEND_PING, &connection->flags);
6031 			continue;
6032 		} else if (rv == -EINTR) {
6033 			/* maybe drbd_thread_stop(): the while condition will notice.
6034 			 * maybe woken for send_ping: we'll send a ping above,
6035 			 * and change the rcvtimeo */
6036 			flush_signals(current);
6037 			continue;
6038 		} else {
6039 			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
6040 			goto reconnect;
6041 		}
6042 
6043 		if (received == expect && cmd == NULL) {
6044 			if (decode_header(connection, connection->meta.rbuf, &pi))
6045 				goto reconnect;
6046 			cmd = &ack_receiver_tbl[pi.cmd];
6047 			if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
6048 				drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
6049 					 cmdname(pi.cmd), pi.cmd);
6050 				goto disconnect;
6051 			}
6052 			expect = header_size + cmd->pkt_size;
6053 			if (pi.size != expect - header_size) {
6054 				drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
6055 					pi.cmd, pi.size);
6056 				goto reconnect;
6057 			}
6058 		}
6059 		if (received == expect) {
6060 			bool err;
6061 
6062 			err = cmd->fn(connection, &pi);
6063 			if (err) {
6064 				drbd_err(connection, "%ps failed\n", cmd->fn);
6065 				goto reconnect;
6066 			}
6067 
6068 			connection->last_received = jiffies;
6069 
6070 			if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
6071 				set_idle_timeout(connection);
6072 				ping_timeout_active = false;
6073 			}
6074 
6075 			buf	 = connection->meta.rbuf;
6076 			received = 0;
6077 			expect	 = header_size;
6078 			cmd	 = NULL;
6079 		}
6080 	}
6081 
6082 	if (0) {
6083 reconnect:
6084 		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6085 		conn_md_sync(connection);
6086 	}
6087 	if (0) {
6088 disconnect:
6089 		conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
6090 	}
6091 
6092 	drbd_info(connection, "ack_receiver terminated\n");
6093 
6094 	return 0;
6095 }
6096 
6097 void drbd_send_acks_wf(struct work_struct *ws)
6098 {
6099 	struct drbd_peer_device *peer_device =
6100 		container_of(ws, struct drbd_peer_device, send_acks_work);
6101 	struct drbd_connection *connection = peer_device->connection;
6102 	struct drbd_device *device = peer_device->device;
6103 	struct net_conf *nc;
6104 	int tcp_cork, err;
6105 
6106 	rcu_read_lock();
6107 	nc = rcu_dereference(connection->net_conf);
6108 	tcp_cork = nc->tcp_cork;
6109 	rcu_read_unlock();
6110 
6111 	if (tcp_cork)
6112 		tcp_sock_set_cork(connection->meta.socket->sk, true);
6113 
6114 	err = drbd_finish_peer_reqs(device);
6115 	kref_put(&device->kref, drbd_destroy_device);
6116 	/* get is in drbd_endio_write_sec_final(). That is necessary to keep the
6117 	   struct work_struct send_acks_work alive, which is in the peer_device object */
6118 
6119 	if (err) {
6120 		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6121 		return;
6122 	}
6123 
6124 	if (tcp_cork)
6125 		tcp_sock_set_cork(connection->meta.socket->sk, false);
6126 
6127 	return;
6128 }
6129