1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3    drbd_receiver.c
4 
5    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6 
7    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
8    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
9    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10 
11  */
12 
13 
14 #include <linux/module.h>
15 
16 #include <linux/uaccess.h>
17 #include <net/sock.h>
18 
19 #include <linux/drbd.h>
20 #include <linux/fs.h>
21 #include <linux/file.h>
22 #include <linux/in.h>
23 #include <linux/mm.h>
24 #include <linux/memcontrol.h>
25 #include <linux/mm_inline.h>
26 #include <linux/slab.h>
27 #include <uapi/linux/sched/types.h>
28 #include <linux/sched/signal.h>
29 #include <linux/pkt_sched.h>
30 #define __KERNEL_SYSCALLS__
31 #include <linux/unistd.h>
32 #include <linux/vmalloc.h>
33 #include <linux/random.h>
34 #include <linux/string.h>
35 #include <linux/scatterlist.h>
36 #include <linux/part_stat.h>
37 #include "drbd_int.h"
38 #include "drbd_protocol.h"
39 #include "drbd_req.h"
40 #include "drbd_vli.h"
41 
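/* Optional protocol features we support and announce to the peer during the
 * feature handshake (see drbd_do_features()). */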
42 #define PRO_FEATURES (DRBD_FF_TRIM|DRBD_FF_THIN_RESYNC|DRBD_FF_WSAME|DRBD_FF_WZEROES)
43 
44 struct packet_info {
45 	enum drbd_packet cmd;
46 	unsigned int size;
47 	unsigned int vnr;
48 	void *data;
49 };
50 
51 enum finish_epoch {
52 	FE_STILL_LIVE,
53 	FE_DESTROYED,
54 	FE_RECYCLED,
55 };
56 
57 static int drbd_do_features(struct drbd_connection *connection);
58 static int drbd_do_auth(struct drbd_connection *connection);
59 static int drbd_disconnected(struct drbd_peer_device *);
60 static void conn_wait_active_ee_empty(struct drbd_connection *connection);
61 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
62 static int e_end_block(struct drbd_work *, int);
63 
64 
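/* Opportunistic allocation mask: __GFP_HIGHMEM allows highmem pages,
 * __GFP_NOWARN suppresses failure warnings; without reclaim flags the
 * allocation fails fast instead of causing write-out (see the comment
 * in __drbd_alloc_pages()). */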
65 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
66 
67 /*
68  * some helper functions to deal with singly linked page lists,
69  * page->private being our "next" pointer.
70  */
71 
72 /* If at least n pages are linked at head, get n pages off.
73  * Otherwise, don't modify head, and return NULL.
74  * Locking is the responsibility of the caller.
75  */
76 static struct page *page_chain_del(struct page **head, int n)
77 {
78 	struct page *page;
79 	struct page *tmp;
80 
81 	BUG_ON(!n);
82 	BUG_ON(!head);
83 
84 	page = *head;
85 
86 	if (!page)
87 		return NULL;
88 
89 	while (page) {
90 		tmp = page_chain_next(page);
91 		if (--n == 0)
92 			break; /* found sufficient pages */
93 		if (tmp == NULL)
94 			/* insufficient pages, don't use any of them. */
95 			return NULL;
96 		page = tmp;
97 	}
98 
99 	/* add end of list marker for the returned list */
100 	set_page_private(page, 0);
101 	/* actual return value, and adjustment of head */
102 	page = *head;
103 	*head = tmp;
104 	return page;
105 }
106 
107 /* may be used outside of locks to find the tail of a (usually short)
108  * "private" page chain, before adding it back to a global chain head
109  * with page_chain_add() under a spinlock. */
110 static struct page *page_chain_tail(struct page *page, int *len)
111 {
112 	struct page *tmp;
113 	int i = 1;
114 	while ((tmp = page_chain_next(page))) {
115 		++i;
116 		page = tmp;
117 	}
118 	if (len)
119 		*len = i;
120 	return page;
121 }
122 
123 static int page_chain_free(struct page *page)
124 {
125 	struct page *tmp;
126 	int i = 0;
127 	page_chain_for_each_safe(page, tmp) {
128 		put_page(page);
129 		++i;
130 	}
131 	return i;
132 }
133 
134 static void page_chain_add(struct page **head,
135 		struct page *chain_first, struct page *chain_last)
136 {
137 #if 1
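	/* consistency check: chain_last must be the tail of chain_first's chain */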
138 	struct page *tmp;
139 	tmp = page_chain_tail(chain_first, NULL);
140 	BUG_ON(tmp != chain_last);
141 #endif
142 
143 	/* add chain to head */
144 	set_page_private(chain_last, (unsigned long)*head);
145 	*head = chain_first;
146 }
147 
148 static struct page *__drbd_alloc_pages(struct drbd_device *device,
149 				       unsigned int number)
150 {
151 	struct page *page = NULL;
152 	struct page *tmp = NULL;
153 	unsigned int i = 0;
154 
155 	/* Yes, testing drbd_pp_vacant outside the lock is racy.
156 	 * So what. It saves a spin_lock. */
157 	if (drbd_pp_vacant >= number) {
158 		spin_lock(&drbd_pp_lock);
159 		page = page_chain_del(&drbd_pp_pool, number);
160 		if (page)
161 			drbd_pp_vacant -= number;
162 		spin_unlock(&drbd_pp_lock);
163 		if (page)
164 			return page;
165 	}
166 
167 	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
168 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
169 	 * which in turn might block on the other node at this very place.  */
170 	for (i = 0; i < number; i++) {
171 		tmp = alloc_page(GFP_TRY);
172 		if (!tmp)
173 			break;
174 		set_page_private(tmp, (unsigned long)page);
175 		page = tmp;
176 	}
177 
178 	if (i == number)
179 		return page;
180 
181 	/* Not enough pages immediately available this time.
182 	 * No need to jump around here, drbd_alloc_pages will retry this
183 	 * function "soon". */
184 	if (page) {
185 		tmp = page_chain_tail(page, NULL);
186 		spin_lock(&drbd_pp_lock);
187 		page_chain_add(&drbd_pp_pool, page, tmp);
188 		drbd_pp_vacant += i;
189 		spin_unlock(&drbd_pp_lock);
190 	}
191 	return NULL;
192 }
193 
194 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
195 					   struct list_head *to_be_freed)
196 {
197 	struct drbd_peer_request *peer_req, *tmp;
198 
199 	/* The EEs are always appended to the end of the list. Since
200 	   they are sent in order over the wire, they have to finish
201 	   in order. As soon as we see the first unfinished one, we can
202 	   stop examining the list... */
203 
204 	list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
205 		if (drbd_peer_req_has_active_page(peer_req))
206 			break;
207 		list_move(&peer_req->w.list, to_be_freed);
208 	}
209 }
210 
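/* Move finished peer requests off the net_ee list while holding the req_lock,
 * then free them (and return their pages) outside of it. */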
211 static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
212 {
213 	LIST_HEAD(reclaimed);
214 	struct drbd_peer_request *peer_req, *t;
215 
216 	spin_lock_irq(&device->resource->req_lock);
217 	reclaim_finished_net_peer_reqs(device, &reclaimed);
218 	spin_unlock_irq(&device->resource->req_lock);
219 	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
220 		drbd_free_net_peer_req(device, peer_req);
221 }
222 
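/* Reclaim network-held pages for all volumes of this connection.  The
 * kref_get()/rcu_read_unlock() dance lets us sleep in
 * drbd_reclaim_net_peer_reqs() without holding the RCU read lock. */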
223 static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
224 {
225 	struct drbd_peer_device *peer_device;
226 	int vnr;
227 
228 	rcu_read_lock();
229 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
230 		struct drbd_device *device = peer_device->device;
231 		if (!atomic_read(&device->pp_in_use_by_net))
232 			continue;
233 
234 		kref_get(&device->kref);
235 		rcu_read_unlock();
236 		drbd_reclaim_net_peer_reqs(device);
237 		kref_put(&device->kref, drbd_destroy_device);
238 		rcu_read_lock();
239 	}
240 	rcu_read_unlock();
241 }
242 
243 /**
244  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
245  * @peer_device:	DRBD peer device.
246  * @number:	number of pages requested
247  * @retry:	whether to retry, if not enough pages are available right now
248  *
249  * Tries to allocate number pages, first from our own page pool, then from
250  * the kernel.
251  * Possibly retry until DRBD frees sufficient pages somewhere else.
252  *
253  * If this allocation would exceed the max_buffers setting, we throttle
254  * allocation (schedule_timeout) to give the system some room to breathe.
255  *
256  * We do not use max-buffers as hard limit, because it could lead to
257  * congestion and further to a distributed deadlock during online-verify or
258  * (checksum based) resync, if the max-buffers, socket buffer sizes and
259  * resync-rate settings are mis-configured.
260  *
261  * Returns a page chain linked via page->private.
262  */
263 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
264 			      bool retry)
265 {
266 	struct drbd_device *device = peer_device->device;
267 	struct page *page = NULL;
268 	struct net_conf *nc;
269 	DEFINE_WAIT(wait);
270 	unsigned int mxb;
271 
272 	rcu_read_lock();
273 	nc = rcu_dereference(peer_device->connection->net_conf);
274 	mxb = nc ? nc->max_buffers : 1000000;
275 	rcu_read_unlock();
276 
277 	if (atomic_read(&device->pp_in_use) < mxb)
278 		page = __drbd_alloc_pages(device, number);
279 
280 	/* Try to keep the fast path fast, but occasionally we need
281 	 * to reclaim the pages we lent to the network stack. */
282 	if (page && atomic_read(&device->pp_in_use_by_net) > 512)
283 		drbd_reclaim_net_peer_reqs(device);
284 
285 	while (page == NULL) {
286 		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
287 
288 		drbd_reclaim_net_peer_reqs(device);
289 
290 		if (atomic_read(&device->pp_in_use) < mxb) {
291 			page = __drbd_alloc_pages(device, number);
292 			if (page)
293 				break;
294 		}
295 
296 		if (!retry)
297 			break;
298 
299 		if (signal_pending(current)) {
300 			drbd_warn(device, "drbd_alloc_pages interrupted!\n");
301 			break;
302 		}
303 
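		/* We waited a full tick without being woken up: stop enforcing
		 * max-buffers as a hard limit (see the comment above), so we
		 * cannot end up in a distributed deadlock. */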
304 		if (schedule_timeout(HZ/10) == 0)
305 			mxb = UINT_MAX;
306 	}
307 	finish_wait(&drbd_pp_wait, &wait);
308 
309 	if (page)
310 		atomic_add(number, &device->pp_in_use);
311 	return page;
312 }
313 
314 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
315  * Is also used from inside another spin_lock_irq(&resource->req_lock);
316  * Either links the page chain back to the global pool,
317  * or returns all pages to the system. */
318 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
319 {
320 	atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
321 	int i;
322 
323 	if (page == NULL)
324 		return;
325 
326 	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * drbd_minor_count)
327 		i = page_chain_free(page);
328 	else {
329 		struct page *tmp;
330 		tmp = page_chain_tail(page, &i);
331 		spin_lock(&drbd_pp_lock);
332 		page_chain_add(&drbd_pp_pool, page, tmp);
333 		drbd_pp_vacant += i;
334 		spin_unlock(&drbd_pp_lock);
335 	}
336 	i = atomic_sub_return(i, a);
337 	if (i < 0)
338 		drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
339 			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
340 	wake_up(&drbd_pp_wait);
341 }
342 
343 /*
344 You need to hold the req_lock:
345  _drbd_wait_ee_list_empty()
346 
347 You must not have the req_lock:
348  drbd_free_peer_req()
349  drbd_alloc_peer_req()
350  drbd_free_peer_reqs()
351  drbd_ee_fix_bhs()
352  drbd_finish_peer_reqs()
353  drbd_clear_done_ee()
354  drbd_wait_ee_list_empty()
355 */
356 
357 /* normal: payload_size == request size (bi_size)
358  * w_same: payload_size == logical_block_size
359  * trim: payload_size == 0 */
360 struct drbd_peer_request *
361 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
362 		    unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local)
363 {
364 	struct drbd_device *device = peer_device->device;
365 	struct drbd_peer_request *peer_req;
366 	struct page *page = NULL;
367 	unsigned nr_pages = (payload_size + PAGE_SIZE -1) >> PAGE_SHIFT;
368 
369 	if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
370 		return NULL;
371 
372 	peer_req = mempool_alloc(&drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
373 	if (!peer_req) {
374 		if (!(gfp_mask & __GFP_NOWARN))
375 			drbd_err(device, "%s: allocation failed\n", __func__);
376 		return NULL;
377 	}
378 
379 	if (nr_pages) {
380 		page = drbd_alloc_pages(peer_device, nr_pages,
381 					gfpflags_allow_blocking(gfp_mask));
382 		if (!page)
383 			goto fail;
384 	}
385 
386 	memset(peer_req, 0, sizeof(*peer_req));
387 	INIT_LIST_HEAD(&peer_req->w.list);
388 	drbd_clear_interval(&peer_req->i);
389 	peer_req->i.size = request_size;
390 	peer_req->i.sector = sector;
391 	peer_req->submit_jif = jiffies;
392 	peer_req->peer_device = peer_device;
393 	peer_req->pages = page;
394 	/*
395 	 * The block_id is opaque to the receiver.  It is not endianness
396 	 * converted, and sent back to the sender unchanged.
397 	 */
398 	peer_req->block_id = id;
399 
400 	return peer_req;
401 
402  fail:
403 	mempool_free(peer_req, &drbd_ee_mempool);
404 	return NULL;
405 }
406 
407 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
408 		       int is_net)
409 {
410 	might_sleep();
411 	if (peer_req->flags & EE_HAS_DIGEST)
412 		kfree(peer_req->digest);
413 	drbd_free_pages(device, peer_req->pages, is_net);
414 	D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
415 	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
416 	if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
417 		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
418 		drbd_al_complete_io(device, &peer_req->i);
419 	}
420 	mempool_free(peer_req, &drbd_ee_mempool);
421 }
422 
423 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
424 {
425 	LIST_HEAD(work_list);
426 	struct drbd_peer_request *peer_req, *t;
427 	int count = 0;
428 	int is_net = list == &device->net_ee;
429 
430 	spin_lock_irq(&device->resource->req_lock);
431 	list_splice_init(list, &work_list);
432 	spin_unlock_irq(&device->resource->req_lock);
433 
434 	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
435 		__drbd_free_peer_req(device, peer_req, is_net);
436 		count++;
437 	}
438 	return count;
439 }
440 
441 /*
442  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
443  */
444 static int drbd_finish_peer_reqs(struct drbd_device *device)
445 {
446 	LIST_HEAD(work_list);
447 	LIST_HEAD(reclaimed);
448 	struct drbd_peer_request *peer_req, *t;
449 	int err = 0;
450 
451 	spin_lock_irq(&device->resource->req_lock);
452 	reclaim_finished_net_peer_reqs(device, &reclaimed);
453 	list_splice_init(&device->done_ee, &work_list);
454 	spin_unlock_irq(&device->resource->req_lock);
455 
456 	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
457 		drbd_free_net_peer_req(device, peer_req);
458 
459 	/* possible callbacks here:
460 	 * e_end_block, and e_end_resync_block, e_send_superseded.
461 	 * all ignore the last argument.
462 	 */
463 	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
464 		int err2;
465 
466 		/* list_del not necessary, next/prev members not touched */
467 		err2 = peer_req->w.cb(&peer_req->w, !!err);
468 		if (!err)
469 			err = err2;
470 		drbd_free_peer_req(device, peer_req);
471 	}
472 	wake_up(&device->ee_wait);
473 
474 	return err;
475 }
476 
477 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
478 				     struct list_head *head)
479 {
480 	DEFINE_WAIT(wait);
481 
482 	/* avoids spin_lock/unlock
483 	 * and calling prepare_to_wait in the fast path */
484 	while (!list_empty(head)) {
485 		prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
486 		spin_unlock_irq(&device->resource->req_lock);
487 		io_schedule();
488 		finish_wait(&device->ee_wait, &wait);
489 		spin_lock_irq(&device->resource->req_lock);
490 	}
491 }
492 
493 static void drbd_wait_ee_list_empty(struct drbd_device *device,
494 				    struct list_head *head)
495 {
496 	spin_lock_irq(&device->resource->req_lock);
497 	_drbd_wait_ee_list_empty(device, head);
498 	spin_unlock_irq(&device->resource->req_lock);
499 }
500 
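/* Plain receive on @sock.  Unless overridden by @flags, this blocks until
 * @size bytes have arrived (MSG_WAITALL).  Returns the number of bytes
 * received, or a negative error code. */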
501 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
502 {
503 	struct kvec iov = {
504 		.iov_base = buf,
505 		.iov_len = size,
506 	};
507 	struct msghdr msg = {
508 		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
509 	};
510 	iov_iter_kvec(&msg.msg_iter, READ, &iov, 1, size);
511 	return sock_recvmsg(sock, &msg, msg.msg_flags);
512 }
513 
514 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
515 {
516 	int rv;
517 
518 	rv = drbd_recv_short(connection->data.socket, buf, size, 0);
519 
520 	if (rv < 0) {
521 		if (rv == -ECONNRESET)
522 			drbd_info(connection, "sock was reset by peer\n");
523 		else if (rv != -ERESTARTSYS)
524 			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
525 	} else if (rv == 0) {
526 		if (test_bit(DISCONNECT_SENT, &connection->flags)) {
527 			long t;
528 			rcu_read_lock();
529 			t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
530 			rcu_read_unlock();
531 
532 			t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
533 
534 			if (t)
535 				goto out;
536 		}
537 		drbd_info(connection, "sock was shut down by peer\n");
538 	}
539 
540 	if (rv != size)
541 		conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
542 
543 out:
544 	return rv;
545 }
546 
547 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
548 {
549 	int err;
550 
551 	err = drbd_recv(connection, buf, size);
552 	if (err != size) {
553 		if (err >= 0)
554 			err = -EIO;
555 	} else
556 		err = 0;
557 	return err;
558 }
559 
560 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
561 {
562 	int err;
563 
564 	err = drbd_recv_all(connection, buf, size);
565 	if (err && !signal_pending(current))
566 		drbd_warn(connection, "short read (expected size %d)\n", (int)size);
567 	return err;
568 }
569 
570 /* quoting tcp(7):
571  *   On individual connections, the socket buffer size must be set prior to the
572  *   listen(2) or connect(2) calls in order to have it take effect.
573  * This is our wrapper to do so.
574  */
575 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
576 		unsigned int rcv)
577 {
578 	/* open coded SO_SNDBUF, SO_RCVBUF */
579 	if (snd) {
580 		sock->sk->sk_sndbuf = snd;
581 		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
582 	}
583 	if (rcv) {
584 		sock->sk->sk_rcvbuf = rcv;
585 		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
586 	}
587 }
588 
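/* One attempt at an outgoing connection: bind to the configured local address
 * (with an ephemeral port), then connect to the peer.  Returns the connected
 * socket, or NULL on failure. */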
589 static struct socket *drbd_try_connect(struct drbd_connection *connection)
590 {
591 	const char *what;
592 	struct socket *sock;
593 	struct sockaddr_in6 src_in6;
594 	struct sockaddr_in6 peer_in6;
595 	struct net_conf *nc;
596 	int err, peer_addr_len, my_addr_len;
597 	int sndbuf_size, rcvbuf_size, connect_int;
598 	int disconnect_on_error = 1;
599 
600 	rcu_read_lock();
601 	nc = rcu_dereference(connection->net_conf);
602 	if (!nc) {
603 		rcu_read_unlock();
604 		return NULL;
605 	}
606 	sndbuf_size = nc->sndbuf_size;
607 	rcvbuf_size = nc->rcvbuf_size;
608 	connect_int = nc->connect_int;
609 	rcu_read_unlock();
610 
611 	my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
612 	memcpy(&src_in6, &connection->my_addr, my_addr_len);
613 
614 	if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
615 		src_in6.sin6_port = 0;
616 	else
617 		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
618 
619 	peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
620 	memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
621 
622 	what = "sock_create_kern";
623 	err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
624 			       SOCK_STREAM, IPPROTO_TCP, &sock);
625 	if (err < 0) {
626 		sock = NULL;
627 		goto out;
628 	}
629 
630 	sock->sk->sk_rcvtimeo =
631 	sock->sk->sk_sndtimeo = connect_int * HZ;
632 	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
633 
634 	/* explicitly bind to the configured IP as source IP
635 	 * for the outgoing connections.
636 	 * This is needed for multihomed hosts and to be
637 	 * able to use lo: interfaces for drbd.
638 	 * Make sure to use 0 as port number, so linux selects
639 	 * a free one dynamically.
640 	 */
641 	what = "bind before connect";
642 	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
643 	if (err < 0)
644 		goto out;
645 
646 	/* connect may fail, peer not yet available.
647 	 * stay C_WF_CONNECTION, don't go Disconnecting! */
648 	disconnect_on_error = 0;
649 	what = "connect";
650 	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
651 
652 out:
653 	if (err < 0) {
654 		if (sock) {
655 			sock_release(sock);
656 			sock = NULL;
657 		}
658 		switch (-err) {
659 			/* timeout, busy, signal pending */
660 		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
661 		case EINTR: case ERESTARTSYS:
662 			/* peer not (yet) available, network problem */
663 		case ECONNREFUSED: case ENETUNREACH:
664 		case EHOSTDOWN:    case EHOSTUNREACH:
665 			disconnect_on_error = 0;
666 			break;
667 		default:
668 			drbd_err(connection, "%s failed, err = %d\n", what, err);
669 		}
670 		if (disconnect_on_error)
671 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
672 	}
673 
674 	return sock;
675 }
676 
677 struct accept_wait_data {
678 	struct drbd_connection *connection;
679 	struct socket *s_listen;
680 	struct completion door_bell;
681 	void (*original_sk_state_change)(struct sock *sk);
683 };
684 
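/* sk_state_change callback installed on the listen socket: ring the door bell
 * once an incoming connection reaches TCP_ESTABLISHED, then chain to the
 * original callback. */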
685 static void drbd_incoming_connection(struct sock *sk)
686 {
687 	struct accept_wait_data *ad = sk->sk_user_data;
688 	void (*state_change)(struct sock *sk);
689 
690 	state_change = ad->original_sk_state_change;
691 	if (sk->sk_state == TCP_ESTABLISHED)
692 		complete(&ad->door_bell);
693 	state_change(sk);
694 }
695 
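/* Create a listen socket bound to the configured local address and hook up
 * drbd_incoming_connection() so drbd_wait_for_connect() gets notified of
 * incoming connection attempts. */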
696 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
697 {
698 	int err, sndbuf_size, rcvbuf_size, my_addr_len;
699 	struct sockaddr_in6 my_addr;
700 	struct socket *s_listen;
701 	struct net_conf *nc;
702 	const char *what;
703 
704 	rcu_read_lock();
705 	nc = rcu_dereference(connection->net_conf);
706 	if (!nc) {
707 		rcu_read_unlock();
708 		return -EIO;
709 	}
710 	sndbuf_size = nc->sndbuf_size;
711 	rcvbuf_size = nc->rcvbuf_size;
712 	rcu_read_unlock();
713 
714 	my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
715 	memcpy(&my_addr, &connection->my_addr, my_addr_len);
716 
717 	what = "sock_create_kern";
718 	err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
719 			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
720 	if (err) {
721 		s_listen = NULL;
722 		goto out;
723 	}
724 
725 	s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
726 	drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
727 
728 	what = "bind before listen";
729 	err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
730 	if (err < 0)
731 		goto out;
732 
733 	ad->s_listen = s_listen;
734 	write_lock_bh(&s_listen->sk->sk_callback_lock);
735 	ad->original_sk_state_change = s_listen->sk->sk_state_change;
736 	s_listen->sk->sk_state_change = drbd_incoming_connection;
737 	s_listen->sk->sk_user_data = ad;
738 	write_unlock_bh(&s_listen->sk->sk_callback_lock);
739 
740 	what = "listen";
741 	err = s_listen->ops->listen(s_listen, 5);
742 	if (err < 0)
743 		goto out;
744 
745 	return 0;
746 out:
747 	if (s_listen)
748 		sock_release(s_listen);
749 	if (err < 0) {
750 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
751 			drbd_err(connection, "%s failed, err = %d\n", what, err);
752 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
753 		}
754 	}
755 
756 	return -EIO;
757 }
758 
759 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
760 {
761 	write_lock_bh(&sk->sk_callback_lock);
762 	sk->sk_state_change = ad->original_sk_state_change;
763 	sk->sk_user_data = NULL;
764 	write_unlock_bh(&sk->sk_callback_lock);
765 }
766 
767 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
768 {
769 	int timeo, connect_int, err = 0;
770 	struct socket *s_estab = NULL;
771 	struct net_conf *nc;
772 
773 	rcu_read_lock();
774 	nc = rcu_dereference(connection->net_conf);
775 	if (!nc) {
776 		rcu_read_unlock();
777 		return NULL;
778 	}
779 	connect_int = nc->connect_int;
780 	rcu_read_unlock();
781 
782 	timeo = connect_int * HZ;
783 	/* 28.5% random jitter */
784 	timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
785 
786 	err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
787 	if (err <= 0)
788 		return NULL;
789 
790 	err = kernel_accept(ad->s_listen, &s_estab, 0);
791 	if (err < 0) {
792 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
793 			drbd_err(connection, "accept failed, err = %d\n", err);
794 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
795 		}
796 	}
797 
798 	if (s_estab)
799 		unregister_state_change(s_estab->sk, ad);
800 
801 	return s_estab;
802 }
803 
804 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
805 
806 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
807 			     enum drbd_packet cmd)
808 {
809 	if (!conn_prepare_command(connection, sock))
810 		return -EIO;
811 	return conn_send_command(connection, sock, cmd, 0, NULL, 0);
812 }
813 
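/* Read and decode the peer's first packet on a freshly accepted socket, to
 * learn whether the peer intends it as the data (P_INITIAL_DATA) or meta
 * (P_INITIAL_META) socket.  Returns the packet command, or a negative error. */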
814 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
815 {
816 	unsigned int header_size = drbd_header_size(connection);
817 	struct packet_info pi;
818 	struct net_conf *nc;
819 	int err;
820 
821 	rcu_read_lock();
822 	nc = rcu_dereference(connection->net_conf);
823 	if (!nc) {
824 		rcu_read_unlock();
825 		return -EIO;
826 	}
827 	sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
828 	rcu_read_unlock();
829 
830 	err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
831 	if (err != header_size) {
832 		if (err >= 0)
833 			err = -EIO;
834 		return err;
835 	}
836 	err = decode_header(connection, connection->data.rbuf, &pi);
837 	if (err)
838 		return err;
839 	return pi.cmd;
840 }
841 
842 /**
843  * drbd_socket_okay() - Free the socket if its connection is not okay
844  * @sock:	pointer to the pointer to the socket.
845  */
846 static bool drbd_socket_okay(struct socket **sock)
847 {
848 	int rr;
849 	char tb[4];
850 
851 	if (!*sock)
852 		return false;
853 
854 	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
855 
856 	if (rr > 0 || rr == -EAGAIN) {
857 		return true;
858 	} else {
859 		sock_release(*sock);
860 		*sock = NULL;
861 		return false;
862 	}
863 }
864 
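/* A connection counts as established once both sockets exist and still pass
 * drbd_socket_okay() after a short settle delay (sock_check_timeo, defaulting
 * to ping_timeo). */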
865 static bool connection_established(struct drbd_connection *connection,
866 				   struct socket **sock1,
867 				   struct socket **sock2)
868 {
869 	struct net_conf *nc;
870 	int timeout;
871 	bool ok;
872 
873 	if (!*sock1 || !*sock2)
874 		return false;
875 
876 	rcu_read_lock();
877 	nc = rcu_dereference(connection->net_conf);
878 	timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
879 	rcu_read_unlock();
880 	schedule_timeout_interruptible(timeout);
881 
882 	ok = drbd_socket_okay(sock1);
883 	ok = drbd_socket_okay(sock2) && ok;
884 
885 	return ok;
886 }
887 
888 /* Gets called when a connection is established, or when a new minor gets
889    created in a connection */
890 int drbd_connected(struct drbd_peer_device *peer_device)
891 {
892 	struct drbd_device *device = peer_device->device;
893 	int err;
894 
895 	atomic_set(&device->packet_seq, 0);
896 	device->peer_seq = 0;
897 
898 	device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
899 		&peer_device->connection->cstate_mutex :
900 		&device->own_state_mutex;
901 
902 	err = drbd_send_sync_param(peer_device);
903 	if (!err)
904 		err = drbd_send_sizes(peer_device, 0, 0);
905 	if (!err)
906 		err = drbd_send_uuids(peer_device);
907 	if (!err)
908 		err = drbd_send_current_state(peer_device);
909 	clear_bit(USE_DEGR_WFC_T, &device->flags);
910 	clear_bit(RESIZE_PENDING, &device->flags);
911 	atomic_set(&device->ap_in_flight, 0);
912 	mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
913 	return err;
914 }
915 
916 /*
917  * return values:
918  *   1 yes, we have a valid connection
919  *   0 oops, did not work out, please try again
920  *  -1 peer talks different language,
921  *     no point in trying again, please go standalone.
922  *  -2 We do not have a network config...
923  */
924 static int conn_connect(struct drbd_connection *connection)
925 {
926 	struct drbd_socket sock, msock;
927 	struct drbd_peer_device *peer_device;
928 	struct net_conf *nc;
929 	int vnr, timeout, h;
930 	bool discard_my_data, ok;
931 	enum drbd_state_rv rv;
932 	struct accept_wait_data ad = {
933 		.connection = connection,
934 		.door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
935 	};
936 
937 	clear_bit(DISCONNECT_SENT, &connection->flags);
938 	if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
939 		return -2;
940 
941 	mutex_init(&sock.mutex);
942 	sock.sbuf = connection->data.sbuf;
943 	sock.rbuf = connection->data.rbuf;
944 	sock.socket = NULL;
945 	mutex_init(&msock.mutex);
946 	msock.sbuf = connection->meta.sbuf;
947 	msock.rbuf = connection->meta.rbuf;
948 	msock.socket = NULL;
949 
950 	/* Assume that the peer only understands protocol 80 until we know better.  */
951 	connection->agreed_pro_version = 80;
952 
953 	if (prepare_listen_socket(connection, &ad))
954 		return 0;
955 
956 	do {
957 		struct socket *s;
958 
959 		s = drbd_try_connect(connection);
960 		if (s) {
961 			if (!sock.socket) {
962 				sock.socket = s;
963 				send_first_packet(connection, &sock, P_INITIAL_DATA);
964 			} else if (!msock.socket) {
965 				clear_bit(RESOLVE_CONFLICTS, &connection->flags);
966 				msock.socket = s;
967 				send_first_packet(connection, &msock, P_INITIAL_META);
968 			} else {
969 				drbd_err(connection, "Logic error in conn_connect()\n");
970 				goto out_release_sockets;
971 			}
972 		}
973 
974 		if (connection_established(connection, &sock.socket, &msock.socket))
975 			break;
976 
977 retry:
978 		s = drbd_wait_for_connect(connection, &ad);
979 		if (s) {
980 			int fp = receive_first_packet(connection, s);
981 			drbd_socket_okay(&sock.socket);
982 			drbd_socket_okay(&msock.socket);
983 			switch (fp) {
984 			case P_INITIAL_DATA:
985 				if (sock.socket) {
986 					drbd_warn(connection, "initial packet S crossed\n");
987 					sock_release(sock.socket);
988 					sock.socket = s;
989 					goto randomize;
990 				}
991 				sock.socket = s;
992 				break;
993 			case P_INITIAL_META:
994 				set_bit(RESOLVE_CONFLICTS, &connection->flags);
995 				if (msock.socket) {
996 					drbd_warn(connection, "initial packet M crossed\n");
997 					sock_release(msock.socket);
998 					msock.socket = s;
999 					goto randomize;
1000 				}
1001 				msock.socket = s;
1002 				break;
1003 			default:
1004 				drbd_warn(connection, "Error receiving initial packet\n");
1005 				sock_release(s);
1006 randomize:
1007 				if (prandom_u32() & 1)
1008 					goto retry;
1009 			}
1010 		}
1011 
1012 		if (connection->cstate <= C_DISCONNECTING)
1013 			goto out_release_sockets;
1014 		if (signal_pending(current)) {
1015 			flush_signals(current);
1016 			smp_rmb();
1017 			if (get_t_state(&connection->receiver) == EXITING)
1018 				goto out_release_sockets;
1019 		}
1020 
1021 		ok = connection_established(connection, &sock.socket, &msock.socket);
1022 	} while (!ok);
1023 
1024 	if (ad.s_listen)
1025 		sock_release(ad.s_listen);
1026 
1027 	sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1028 	msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1029 
1030 	sock.socket->sk->sk_allocation = GFP_NOIO;
1031 	msock.socket->sk->sk_allocation = GFP_NOIO;
1032 
1033 	sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1034 	msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1035 
1036 	/* NOT YET ...
1037 	 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1038 	 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1039 	 * first set it to the P_CONNECTION_FEATURES timeout,
1040 	 * which we set to 4x the configured ping_timeout. */
1041 	rcu_read_lock();
1042 	nc = rcu_dereference(connection->net_conf);
1043 
1044 	sock.socket->sk->sk_sndtimeo =
1045 	sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1046 
1047 	msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1048 	timeout = nc->timeout * HZ / 10;
1049 	discard_my_data = nc->discard_my_data;
1050 	rcu_read_unlock();
1051 
1052 	msock.socket->sk->sk_sndtimeo = timeout;
1053 
1054 	/* we don't want delays.
1055 	 * we use TCP_CORK where appropriate, though */
1056 	tcp_sock_set_nodelay(sock.socket->sk);
1057 	tcp_sock_set_nodelay(msock.socket->sk);
1058 
1059 	connection->data.socket = sock.socket;
1060 	connection->meta.socket = msock.socket;
1061 	connection->last_received = jiffies;
1062 
1063 	h = drbd_do_features(connection);
1064 	if (h <= 0)
1065 		return h;
1066 
1067 	if (connection->cram_hmac_tfm) {
1068 		/* drbd_request_state(device, NS(conn, WFAuth)); */
1069 		switch (drbd_do_auth(connection)) {
1070 		case -1:
1071 			drbd_err(connection, "Authentication of peer failed\n");
1072 			return -1;
1073 		case 0:
1074 			drbd_err(connection, "Authentication of peer failed, trying again.\n");
1075 			return 0;
1076 		}
1077 	}
1078 
1079 	connection->data.socket->sk->sk_sndtimeo = timeout;
1080 	connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1081 
1082 	if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1083 		return -1;
1084 
1085 	/* Prevent a race between resync-handshake and
1086 	 * being promoted to Primary.
1087 	 *
1088 	 * Grab and release the state mutex, so we know that any current
1089 	 * drbd_set_role() is finished, and any incoming drbd_set_role
1090 	 * will see the STATE_SENT flag, and wait for it to be cleared.
1091 	 */
1092 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1093 		mutex_lock(peer_device->device->state_mutex);
1094 
1095 	/* avoid a race with conn_request_state( C_DISCONNECTING ) */
1096 	spin_lock_irq(&connection->resource->req_lock);
1097 	set_bit(STATE_SENT, &connection->flags);
1098 	spin_unlock_irq(&connection->resource->req_lock);
1099 
1100 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1101 		mutex_unlock(peer_device->device->state_mutex);
1102 
1103 	rcu_read_lock();
1104 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1105 		struct drbd_device *device = peer_device->device;
1106 		kref_get(&device->kref);
1107 		rcu_read_unlock();
1108 
1109 		if (discard_my_data)
1110 			set_bit(DISCARD_MY_DATA, &device->flags);
1111 		else
1112 			clear_bit(DISCARD_MY_DATA, &device->flags);
1113 
1114 		drbd_connected(peer_device);
1115 		kref_put(&device->kref, drbd_destroy_device);
1116 		rcu_read_lock();
1117 	}
1118 	rcu_read_unlock();
1119 
1120 	rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1121 	if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1122 		clear_bit(STATE_SENT, &connection->flags);
1123 		return 0;
1124 	}
1125 
1126 	drbd_thread_start(&connection->ack_receiver);
1127 	/* opencoded create_singlethread_workqueue(),
1128 	 * to be able to use format string arguments */
1129 	connection->ack_sender =
1130 		alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
1131 	if (!connection->ack_sender) {
1132 		drbd_err(connection, "Failed to create workqueue ack_sender\n");
1133 		return 0;
1134 	}
1135 
1136 	mutex_lock(&connection->resource->conf_update);
1137 	/* The discard_my_data flag is a single-shot modifier to the next
1138 	 * connection attempt, the handshake of which is now well underway.
1139 	 * No need for rcu style copying of the whole struct
1140 	 * just to clear a single value. */
1141 	connection->net_conf->discard_my_data = 0;
1142 	mutex_unlock(&connection->resource->conf_update);
1143 
1144 	return h;
1145 
1146 out_release_sockets:
1147 	if (ad.s_listen)
1148 		sock_release(ad.s_listen);
1149 	if (sock.socket)
1150 		sock_release(sock.socket);
1151 	if (msock.socket)
1152 		sock_release(msock.socket);
1153 	return -1;
1154 }
1155 
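/* Parse a packet header in any of the three on-the-wire formats (protocol
 * 80, 95 and 100), selected by the agreed protocol version via
 * drbd_header_size(). */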
1156 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1157 {
1158 	unsigned int header_size = drbd_header_size(connection);
1159 
1160 	if (header_size == sizeof(struct p_header100) &&
1161 	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1162 		struct p_header100 *h = header;
1163 		if (h->pad != 0) {
1164 			drbd_err(connection, "Header padding is not zero\n");
1165 			return -EINVAL;
1166 		}
1167 		pi->vnr = be16_to_cpu(h->volume);
1168 		pi->cmd = be16_to_cpu(h->command);
1169 		pi->size = be32_to_cpu(h->length);
1170 	} else if (header_size == sizeof(struct p_header95) &&
1171 		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1172 		struct p_header95 *h = header;
1173 		pi->cmd = be16_to_cpu(h->command);
1174 		pi->size = be32_to_cpu(h->length);
1175 		pi->vnr = 0;
1176 	} else if (header_size == sizeof(struct p_header80) &&
1177 		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1178 		struct p_header80 *h = header;
1179 		pi->cmd = be16_to_cpu(h->command);
1180 		pi->size = be16_to_cpu(h->length);
1181 		pi->vnr = 0;
1182 	} else {
1183 		drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1184 			 be32_to_cpu(*(__be32 *)header),
1185 			 connection->agreed_pro_version);
1186 		return -EINVAL;
1187 	}
1188 	pi->data = header + header_size;
1189 	return 0;
1190 }
1191 
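/* If we are running under our own receiver plug, flush it so already queued
 * bios are submitted to the backends, then re-open the plug. */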
1192 static void drbd_unplug_all_devices(struct drbd_connection *connection)
1193 {
1194 	if (current->plug == &connection->receiver_plug) {
1195 		blk_finish_plug(&connection->receiver_plug);
1196 		blk_start_plug(&connection->receiver_plug);
1197 	} /* else: maybe just schedule() ?? */
1198 }
1199 
1200 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1201 {
1202 	void *buffer = connection->data.rbuf;
1203 	int err;
1204 
1205 	err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1206 	if (err)
1207 		return err;
1208 
1209 	err = decode_header(connection, buffer, pi);
1210 	connection->last_received = jiffies;
1211 
1212 	return err;
1213 }
1214 
1215 static int drbd_recv_header_maybe_unplug(struct drbd_connection *connection, struct packet_info *pi)
1216 {
1217 	void *buffer = connection->data.rbuf;
1218 	unsigned int size = drbd_header_size(connection);
1219 	int err;
1220 
1221 	err = drbd_recv_short(connection->data.socket, buffer, size, MSG_NOSIGNAL|MSG_DONTWAIT);
1222 	if (err != size) {
1223 		/* If we have nothing in the receive buffer now, to reduce
1224 		 * application latency, try to drain the backend queues as
1225 		 * quickly as possible, and let remote TCP know what we have
1226 		 * received so far. */
1227 		if (err == -EAGAIN) {
1228 			tcp_sock_set_quickack(connection->data.socket->sk, 2);
1229 			drbd_unplug_all_devices(connection);
1230 		}
1231 		if (err > 0) {
1232 			buffer += err;
1233 			size -= err;
1234 		}
1235 		err = drbd_recv_all_warn(connection, buffer, size);
1236 		if (err)
1237 			return err;
1238 	}
1239 
1240 	err = decode_header(connection, connection->data.rbuf, pi);
1241 	connection->last_received = jiffies;
1242 
1243 	return err;
1244 }
1245 /* This is blkdev_issue_flush, but asynchronous.
1246  * We want to submit to all component volumes in parallel,
1247  * then wait for all completions.
1248  */
1249 struct issue_flush_context {
1250 	atomic_t pending;
1251 	int error;
1252 	struct completion done;
1253 };
1254 struct one_flush_context {
1255 	struct drbd_device *device;
1256 	struct issue_flush_context *ctx;
1257 };
1258 
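/* Completion handler for one backing-device flush: record any error, drop the
 * ldev and device references taken in drbd_flush(), and complete the context
 * once the last pending flush has finished. */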
1259 static void one_flush_endio(struct bio *bio)
1260 {
1261 	struct one_flush_context *octx = bio->bi_private;
1262 	struct drbd_device *device = octx->device;
1263 	struct issue_flush_context *ctx = octx->ctx;
1264 
1265 	if (bio->bi_status) {
1266 		ctx->error = blk_status_to_errno(bio->bi_status);
1267 		drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_status);
1268 	}
1269 	kfree(octx);
1270 	bio_put(bio);
1271 
1272 	clear_bit(FLUSH_PENDING, &device->flags);
1273 	put_ldev(device);
1274 	kref_put(&device->kref, drbd_destroy_device);
1275 
1276 	if (atomic_dec_and_test(&ctx->pending))
1277 		complete(&ctx->done);
1278 }
1279 
1280 static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
1281 {
1282 	struct bio *bio = bio_alloc(GFP_NOIO, 0);
1283 	struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);
1284 	if (!bio || !octx) {
1285 		drbd_warn(device, "Could not allocate a bio, CANNOT ISSUE FLUSH\n");
1286 		/* FIXME: what else can I do now?  disconnecting or detaching
1287 		 * really does not help to improve the state of the world, either.
1288 		 */
1289 		kfree(octx);
1290 		if (bio)
1291 			bio_put(bio);
1292 
1293 		ctx->error = -ENOMEM;
1294 		put_ldev(device);
1295 		kref_put(&device->kref, drbd_destroy_device);
1296 		return;
1297 	}
1298 
1299 	octx->device = device;
1300 	octx->ctx = ctx;
1301 	bio_set_dev(bio, device->ldev->backing_bdev);
1302 	bio->bi_private = octx;
1303 	bio->bi_end_io = one_flush_endio;
1304 	bio->bi_opf = REQ_OP_FLUSH | REQ_PREFLUSH;
1305 
1306 	device->flush_jif = jiffies;
1307 	set_bit(FLUSH_PENDING, &device->flags);
1308 	atomic_inc(&ctx->pending);
1309 	submit_bio(bio);
1310 }
1311 
1312 static void drbd_flush(struct drbd_connection *connection)
1313 {
1314 	if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1315 		struct drbd_peer_device *peer_device;
1316 		struct issue_flush_context ctx;
1317 		int vnr;
1318 
1319 		atomic_set(&ctx.pending, 1);
1320 		ctx.error = 0;
1321 		init_completion(&ctx.done);
1322 
1323 		rcu_read_lock();
1324 		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1325 			struct drbd_device *device = peer_device->device;
1326 
1327 			if (!get_ldev(device))
1328 				continue;
1329 			kref_get(&device->kref);
1330 			rcu_read_unlock();
1331 
1332 			submit_one_flush(device, &ctx);
1333 
1334 			rcu_read_lock();
1335 		}
1336 		rcu_read_unlock();
1337 
1338 		/* Do we want to add a timeout,
1339 		 * if disk-timeout is set? */
1340 		if (!atomic_dec_and_test(&ctx.pending))
1341 			wait_for_completion(&ctx.done);
1342 
1343 		if (ctx.error) {
1344 			/* would rather check on EOPNOTSUPP, but that is not reliable.
1345 			 * don't try again for ANY return value != 0
1346 			 * if (rv == -EOPNOTSUPP) */
1347 			/* Any error is already reported by bio_endio callback. */
1348 			drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1349 		}
1350 	}
1351 }
1352 
1353 /**
1354  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishes it.
1355  * @connection:	DRBD connection.
1356  * @epoch:	Epoch object.
1357  * @ev:		Epoch event.
1358  */
1359 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1360 					       struct drbd_epoch *epoch,
1361 					       enum epoch_event ev)
1362 {
1363 	int epoch_size;
1364 	struct drbd_epoch *next_epoch;
1365 	enum finish_epoch rv = FE_STILL_LIVE;
1366 
1367 	spin_lock(&connection->epoch_lock);
1368 	do {
1369 		next_epoch = NULL;
1370 
1371 		epoch_size = atomic_read(&epoch->epoch_size);
1372 
1373 		switch (ev & ~EV_CLEANUP) {
1374 		case EV_PUT:
1375 			atomic_dec(&epoch->active);
1376 			break;
1377 		case EV_GOT_BARRIER_NR:
1378 			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1379 			break;
1380 		case EV_BECAME_LAST:
1381 			/* nothing to do*/
1382 			break;
1383 		}
1384 
1385 		if (epoch_size != 0 &&
1386 		    atomic_read(&epoch->active) == 0 &&
1387 		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1388 			if (!(ev & EV_CLEANUP)) {
1389 				spin_unlock(&connection->epoch_lock);
1390 				drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1391 				spin_lock(&connection->epoch_lock);
1392 			}
1393 #if 0
1394 			/* FIXME: dec unacked on connection, once we have
1395 			 * something to count pending connection packets in. */
1396 			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1397 				dec_unacked(epoch->connection);
1398 #endif
1399 
1400 			if (connection->current_epoch != epoch) {
1401 				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1402 				list_del(&epoch->list);
1403 				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1404 				connection->epochs--;
1405 				kfree(epoch);
1406 
1407 				if (rv == FE_STILL_LIVE)
1408 					rv = FE_DESTROYED;
1409 			} else {
1410 				epoch->flags = 0;
1411 				atomic_set(&epoch->epoch_size, 0);
1412 				/* atomic_set(&epoch->active, 0); is already zero */
1413 				if (rv == FE_STILL_LIVE)
1414 					rv = FE_RECYCLED;
1415 			}
1416 		}
1417 
1418 		if (!next_epoch)
1419 			break;
1420 
1421 		epoch = next_epoch;
1422 	} while (1);
1423 
1424 	spin_unlock(&connection->epoch_lock);
1425 
1426 	return rv;
1427 }
1428 
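/* Degrade @wo according to the disk_conf of @bdev.  Called with
 * rcu_read_lock() held (for the rcu_dereference() of disk_conf). */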
1429 static enum write_ordering_e
1430 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1431 {
1432 	struct disk_conf *dc;
1433 
1434 	dc = rcu_dereference(bdev->disk_conf);
1435 
1436 	if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1437 		wo = WO_DRAIN_IO;
1438 	if (wo == WO_DRAIN_IO && !dc->disk_drain)
1439 		wo = WO_NONE;
1440 
1441 	return wo;
1442 }
1443 
1444 /**
1445  * drbd_bump_write_ordering() - Fall back to another write ordering method
1446  * @resource:	DRBD resource.
1447  * @wo:		Write ordering method to try.
1448  */
1449 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1450 			      enum write_ordering_e wo)
1451 {
1452 	struct drbd_device *device;
1453 	enum write_ordering_e pwo;
1454 	int vnr;
1455 	static char *write_ordering_str[] = {
1456 		[WO_NONE] = "none",
1457 		[WO_DRAIN_IO] = "drain",
1458 		[WO_BDEV_FLUSH] = "flush",
1459 	};
1460 
1461 	pwo = resource->write_ordering;
1462 	if (wo != WO_BDEV_FLUSH)
1463 		wo = min(pwo, wo);
1464 	rcu_read_lock();
1465 	idr_for_each_entry(&resource->devices, device, vnr) {
1466 		if (get_ldev(device)) {
1467 			wo = max_allowed_wo(device->ldev, wo);
1468 			if (device->ldev == bdev)
1469 				bdev = NULL;
1470 			put_ldev(device);
1471 		}
1472 	}
1473 
1474 	if (bdev)
1475 		wo = max_allowed_wo(bdev, wo);
1476 
1477 	rcu_read_unlock();
1478 
1479 	resource->write_ordering = wo;
1480 	if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1481 		drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1482 }
1483 
1484 /*
1485  * Mapping "discard" to ZEROOUT with UNMAP does not work for us:
1486  * Drivers have to "announce" q->limits.max_write_zeroes_sectors, or it
1487  * will directly go to fallback mode, submitting normal writes, and
1488  * never even try to UNMAP.
1489  *
1490  * And dm-thin does not do this (yet), mostly because in general it has
1491  * to assume that "skip_block_zeroing" is set.  See also:
1492  * https://www.mail-archive.com/dm-devel%40redhat.com/msg07965.html
1493  * https://www.redhat.com/archives/dm-devel/2018-January/msg00271.html
1494  *
1495  * We *may* ignore the discard-zeroes-data setting, if so configured.
1496  *
1497  * Assumption is that this "discard_zeroes_data=0" is only because the backend
1498  * may ignore partial unaligned discards.
1499  *
1500  * LVM/DM thin as of at least
1501  *   LVM version:     2.02.115(2)-RHEL7 (2015-01-28)
1502  *   Library version: 1.02.93-RHEL7 (2015-01-28)
1503  *   Driver version:  4.29.0
1504  * still behaves this way.
1505  *
1506  * For unaligned (wrt. alignment and granularity) or too small discards,
1507  * we zero-out the initial (and/or) trailing unaligned partial chunks,
1508  * but discard all the aligned full chunks.
1509  *
1510  * At least for LVM/DM thin, with skip_block_zeroing=false,
1511  * the result is effectively "discard_zeroes_data=1".
1512  */
1513 /* flags: EE_TRIM|EE_ZEROOUT */
1514 int drbd_issue_discard_or_zero_out(struct drbd_device *device, sector_t start, unsigned int nr_sectors, int flags)
1515 {
1516 	struct block_device *bdev = device->ldev->backing_bdev;
1517 	struct request_queue *q = bdev_get_queue(bdev);
1518 	sector_t tmp, nr;
1519 	unsigned int max_discard_sectors, granularity;
1520 	int alignment;
1521 	int err = 0;
1522 
1523 	if ((flags & EE_ZEROOUT) || !(flags & EE_TRIM))
1524 		goto zero_out;
1525 
1526 	/* Zero-sector (unknown) and one-sector granularities are the same.  */
1527 	granularity = max(q->limits.discard_granularity >> 9, 1U);
1528 	alignment = (bdev_discard_alignment(bdev) >> 9) % granularity;
1529 
1530 	max_discard_sectors = min(q->limits.max_discard_sectors, (1U << 22));
1531 	max_discard_sectors -= max_discard_sectors % granularity;
1532 	if (unlikely(!max_discard_sectors))
1533 		goto zero_out;
1534 
1535 	if (nr_sectors < granularity)
1536 		goto zero_out;
1537 
1538 	tmp = start;
1539 	if (sector_div(tmp, granularity) != alignment) {
1540 		if (nr_sectors < 2*granularity)
1541 			goto zero_out;
1542 		/* start + gran - (start + gran - align) % gran */
1543 		tmp = start + granularity - alignment;
1544 		tmp = start + granularity - sector_div(tmp, granularity);
1545 
1546 		nr = tmp - start;
1547 		/* don't flag BLKDEV_ZERO_NOUNMAP, we don't know how many
1548 		 * layers are below us, some may have smaller granularity */
1549 		err |= blkdev_issue_zeroout(bdev, start, nr, GFP_NOIO, 0);
1550 		nr_sectors -= nr;
1551 		start = tmp;
1552 	}
1553 	while (nr_sectors >= max_discard_sectors) {
1554 		err |= blkdev_issue_discard(bdev, start, max_discard_sectors, GFP_NOIO, 0);
1555 		nr_sectors -= max_discard_sectors;
1556 		start += max_discard_sectors;
1557 	}
1558 	if (nr_sectors) {
1559 		/* max_discard_sectors is unsigned int (and a multiple of
1560 		 * granularity, we made sure of that above already);
1561 		 * nr is < max_discard_sectors;
1562 		 * I don't need sector_div here, even though nr is sector_t */
1563 		nr = nr_sectors;
1564 		nr -= (unsigned int)nr % granularity;
1565 		if (nr) {
1566 			err |= blkdev_issue_discard(bdev, start, nr, GFP_NOIO, 0);
1567 			nr_sectors -= nr;
1568 			start += nr;
1569 		}
1570 	}
1571  zero_out:
1572 	if (nr_sectors) {
1573 		err |= blkdev_issue_zeroout(bdev, start, nr_sectors, GFP_NOIO,
1574 				(flags & EE_TRIM) ? 0 : BLKDEV_ZERO_NOUNMAP);
1575 	}
1576 	return err != 0;
1577 }
1578 
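/* A discard is considered reliable if the backend supports discards at all and
 * the admin promised (discard_zeroes_if_aligned) that aligned discarded ranges
 * read back as zeroes; see the long comment above
 * drbd_issue_discard_or_zero_out(). */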
1579 static bool can_do_reliable_discards(struct drbd_device *device)
1580 {
1581 	struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
1582 	struct disk_conf *dc;
1583 	bool can_do;
1584 
1585 	if (!blk_queue_discard(q))
1586 		return false;
1587 
1588 	rcu_read_lock();
1589 	dc = rcu_dereference(device->ldev->disk_conf);
1590 	can_do = dc->discard_zeroes_if_aligned;
1591 	rcu_read_unlock();
1592 	return can_do;
1593 }
1594 
1595 static void drbd_issue_peer_discard_or_zero_out(struct drbd_device *device, struct drbd_peer_request *peer_req)
1596 {
1597 	/* If the backend cannot discard, or does not guarantee
1598 	 * read-back zeroes in discarded ranges, we fall back to
1599 	 * zero-out.  Unless configuration specifically requested
1600 	 * otherwise. */
1601 	if (!can_do_reliable_discards(device))
1602 		peer_req->flags |= EE_ZEROOUT;
1603 
1604 	if (drbd_issue_discard_or_zero_out(device, peer_req->i.sector,
1605 	    peer_req->i.size >> 9, peer_req->flags & (EE_ZEROOUT|EE_TRIM)))
1606 		peer_req->flags |= EE_WAS_ERROR;
1607 	drbd_endio_write_sec_final(peer_req);
1608 }
1609 
1610 static void drbd_issue_peer_wsame(struct drbd_device *device,
1611 				  struct drbd_peer_request *peer_req)
1612 {
1613 	struct block_device *bdev = device->ldev->backing_bdev;
1614 	sector_t s = peer_req->i.sector;
1615 	sector_t nr = peer_req->i.size >> 9;
1616 	if (blkdev_issue_write_same(bdev, s, nr, GFP_NOIO, peer_req->pages))
1617 		peer_req->flags |= EE_WAS_ERROR;
1618 	drbd_endio_write_sec_final(peer_req);
1619 }
1620 
1621 
1622 /**
1623  * drbd_submit_peer_request() - submit the bio(s) for a peer request
1624  * @device:	DRBD device.
1625  * @peer_req:	peer request
1626  * @op:		REQ_OP_* operation; the REQ_* flags are passed separately in @op_flags
1627  *
1628  * May spread the pages to multiple bios,
1629  * depending on bio_add_page restrictions.
1630  *
1631  * Returns 0 if all bios have been submitted,
1632  * -ENOMEM if we could not allocate enough bios,
1633  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1634  *  single page to an empty bio (which should never happen and likely indicates
1635  *  that the lower level IO stack is in some way broken). This has been observed
1636  *  on certain Xen deployments.
1637  */
1638 /* TODO allocate from our own bio_set. */
1639 int drbd_submit_peer_request(struct drbd_device *device,
1640 			     struct drbd_peer_request *peer_req,
1641 			     const unsigned op, const unsigned op_flags,
1642 			     const int fault_type)
1643 {
1644 	struct bio *bios = NULL;
1645 	struct bio *bio;
1646 	struct page *page = peer_req->pages;
1647 	sector_t sector = peer_req->i.sector;
1648 	unsigned data_size = peer_req->i.size;
1649 	unsigned n_bios = 0;
1650 	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1651 	int err = -ENOMEM;
1652 
1653 	/* TRIM/DISCARD: for now, always use the helper function
1654 	 * blkdev_issue_zeroout(..., discard=true).
1655 	 * It's synchronous, but it does the right thing wrt. bio splitting.
1656 	 * Correctness first, performance later.  Next step is to code an
1657 	 * asynchronous variant of the same.
1658 	 */
1659 	if (peer_req->flags & (EE_TRIM|EE_WRITE_SAME|EE_ZEROOUT)) {
1660 		/* wait for all pending IO completions, before we start
1661 		 * zeroing things out. */
1662 		conn_wait_active_ee_empty(peer_req->peer_device->connection);
1663 		/* add it to the active list now,
1664 		 * so we can find it to present it in debugfs */
1665 		peer_req->submit_jif = jiffies;
1666 		peer_req->flags |= EE_SUBMITTED;
1667 
1668 		/* If this was a resync request from receive_rs_deallocated(),
1669 		 * it is already on the sync_ee list */
1670 		if (list_empty(&peer_req->w.list)) {
1671 			spin_lock_irq(&device->resource->req_lock);
1672 			list_add_tail(&peer_req->w.list, &device->active_ee);
1673 			spin_unlock_irq(&device->resource->req_lock);
1674 		}
1675 
1676 		if (peer_req->flags & (EE_TRIM|EE_ZEROOUT))
1677 			drbd_issue_peer_discard_or_zero_out(device, peer_req);
1678 		else /* EE_WRITE_SAME */
1679 			drbd_issue_peer_wsame(device, peer_req);
1680 		return 0;
1681 	}
1682 
1683 	/* In most cases, we will only need one bio.  But in case the lower
1684 	 * level restrictions happen to be different at this offset on this
1685 	 * side than those of the sending peer, we may need to submit the
1686 	 * request in more than one bio.
1687 	 *
1688 	 * Plain bio_alloc is good enough here, this is no DRBD internally
1689 	 * generated bio, but a bio allocated on behalf of the peer.
1690 	 */
1691 next_bio:
1692 	bio = bio_alloc(GFP_NOIO, nr_pages);
1693 	if (!bio) {
1694 		drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1695 		goto fail;
1696 	}
1697 	/* > peer_req->i.sector, unless this is the first bio */
1698 	bio->bi_iter.bi_sector = sector;
1699 	bio_set_dev(bio, device->ldev->backing_bdev);
1700 	bio_set_op_attrs(bio, op, op_flags);
1701 	bio->bi_private = peer_req;
1702 	bio->bi_end_io = drbd_peer_request_endio;
1703 
1704 	bio->bi_next = bios;
1705 	bios = bio;
1706 	++n_bios;
1707 
1708 	page_chain_for_each(page) {
1709 		unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1710 		if (!bio_add_page(bio, page, len, 0))
1711 			goto next_bio;
1712 		data_size -= len;
1713 		sector += len >> 9;
1714 		--nr_pages;
1715 	}
1716 	D_ASSERT(device, data_size == 0);
1717 	D_ASSERT(device, page == NULL);
1718 
1719 	atomic_set(&peer_req->pending_bios, n_bios);
1720 	/* for debugfs: update timestamp, mark as submitted */
1721 	peer_req->submit_jif = jiffies;
1722 	peer_req->flags |= EE_SUBMITTED;
1723 	do {
1724 		bio = bios;
1725 		bios = bios->bi_next;
1726 		bio->bi_next = NULL;
1727 
1728 		drbd_submit_bio_noacct(device, fault_type, bio);
1729 	} while (bios);
1730 	return 0;
1731 
1732 fail:
1733 	while (bios) {
1734 		bio = bios;
1735 		bios = bios->bi_next;
1736 		bio_put(bio);
1737 	}
1738 	return err;
1739 }
1740 
1741 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1742 					     struct drbd_peer_request *peer_req)
1743 {
1744 	struct drbd_interval *i = &peer_req->i;
1745 
1746 	drbd_remove_interval(&device->write_requests, i);
1747 	drbd_clear_interval(i);
1748 
1749 	/* Wake up any processes waiting for this peer request to complete.  */
1750 	if (i->waiting)
1751 		wake_up(&device->misc_wait);
1752 }
1753 
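/* Wait until the active_ee list of every volume of this connection has
 * drained. */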
1754 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1755 {
1756 	struct drbd_peer_device *peer_device;
1757 	int vnr;
1758 
1759 	rcu_read_lock();
1760 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1761 		struct drbd_device *device = peer_device->device;
1762 
1763 		kref_get(&device->kref);
1764 		rcu_read_unlock();
1765 		drbd_wait_ee_list_empty(device, &device->active_ee);
1766 		kref_put(&device->kref, drbd_destroy_device);
1767 		rcu_read_lock();
1768 	}
1769 	rcu_read_unlock();
1770 }
1771 
1772 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1773 {
1774 	int rv;
1775 	struct p_barrier *p = pi->data;
1776 	struct drbd_epoch *epoch;
1777 
1778 	/* FIXME these are unacked on connection,
1779 	 * not a specific (peer)device.
1780 	 */
1781 	connection->current_epoch->barrier_nr = p->barrier;
1782 	connection->current_epoch->connection = connection;
1783 	rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1784 
1785 	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1786 	 * the activity log, which means it would not be resynced in case the
1787 	 * R_PRIMARY crashes now.
1788 	 * Therefore we must send the barrier_ack after the barrier request was
1789 	 * completed. */
1790 	switch (connection->resource->write_ordering) {
1791 	case WO_NONE:
1792 		if (rv == FE_RECYCLED)
1793 			return 0;
1794 
1795 		/* receiver context, in the writeout path of the other node.
1796 		 * avoid potential distributed deadlock */
1797 		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1798 		if (epoch)
1799 			break;
1800 		else
1801 			drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1802 		fallthrough;
1803 
1804 	case WO_BDEV_FLUSH:
1805 	case WO_DRAIN_IO:
1806 		conn_wait_active_ee_empty(connection);
1807 		drbd_flush(connection);
1808 
1809 		if (atomic_read(&connection->current_epoch->epoch_size)) {
1810 			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1811 			if (epoch)
1812 				break;
1813 		}
1814 
1815 		return 0;
1816 	default:
1817 		drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1818 			 connection->resource->write_ordering);
1819 		return -EIO;
1820 	}
1821 
1822 	epoch->flags = 0;
1823 	atomic_set(&epoch->epoch_size, 0);
1824 	atomic_set(&epoch->active, 0);
1825 
1826 	spin_lock(&connection->epoch_lock);
1827 	if (atomic_read(&connection->current_epoch->epoch_size)) {
1828 		list_add(&epoch->list, &connection->current_epoch->list);
1829 		connection->current_epoch = epoch;
1830 		connection->epochs++;
1831 	} else {
1832 		/* The current_epoch got recycled while we allocated this one... */
1833 		kfree(epoch);
1834 	}
1835 	spin_unlock(&connection->epoch_lock);
1836 
1837 	return 0;
1838 }
1839 
1840 /* quick wrapper in case payload size != request_size (write same) */
1841 static void drbd_csum_ee_size(struct crypto_shash *h,
1842 			      struct drbd_peer_request *r, void *d,
1843 			      unsigned int payload_size)
1844 {
1845 	unsigned int tmp = r->i.size;
1846 	r->i.size = payload_size;
1847 	drbd_csum_ee(h, r, d);
1848 	r->i.size = tmp;
1849 }
1850 
1851 /* used from receive_RSDataReply (recv_resync_read)
1852  * and from receive_Data.
1853  * data_size: actual payload ("data in")
1854  * 	for normal writes that is bi_size.
1855  * 	for discards, that is zero.
1856  * 	for write same, it is logical_block_size.
1857  * both trim and write same have the bi_size ("data len to be affected")
1858  * as extra argument in the packet header.
1859  */
1860 static struct drbd_peer_request *
1861 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1862 	      struct packet_info *pi) __must_hold(local)
1863 {
1864 	struct drbd_device *device = peer_device->device;
1865 	const sector_t capacity = get_capacity(device->vdisk);
1866 	struct drbd_peer_request *peer_req;
1867 	struct page *page;
1868 	int digest_size, err;
1869 	unsigned int data_size = pi->size, ds;
1870 	void *dig_in = peer_device->connection->int_dig_in;
1871 	void *dig_vv = peer_device->connection->int_dig_vv;
1872 	unsigned long *data;
1873 	struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1874 	struct p_trim *zeroes = (pi->cmd == P_ZEROES) ? pi->data : NULL;
1875 	struct p_trim *wsame = (pi->cmd == P_WSAME) ? pi->data : NULL;
1876 
1877 	digest_size = 0;
1878 	if (!trim && peer_device->connection->peer_integrity_tfm) {
1879 		digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
1880 		/*
1881 		 * FIXME: Receive the incoming digest into the receive buffer
1882 		 *	  here, together with its struct p_data?
1883 		 */
1884 		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1885 		if (err)
1886 			return NULL;
1887 		data_size -= digest_size;
1888 	}
1889 
1890 	/* assume request_size == data_size, but special case trim and wsame. */
1891 	ds = data_size;
1892 	if (trim) {
1893 		if (!expect(data_size == 0))
1894 			return NULL;
1895 		ds = be32_to_cpu(trim->size);
1896 	} else if (zeroes) {
1897 		if (!expect(data_size == 0))
1898 			return NULL;
1899 		ds = be32_to_cpu(zeroes->size);
1900 	} else if (wsame) {
1901 		if (data_size != queue_logical_block_size(device->rq_queue)) {
1902 			drbd_err(peer_device, "data size (%u) != drbd logical block size (%u)\n",
1903 				data_size, queue_logical_block_size(device->rq_queue));
1904 			return NULL;
1905 		}
1906 		if (data_size != bdev_logical_block_size(device->ldev->backing_bdev)) {
1907 			drbd_err(peer_device, "data size (%u) != backend logical block size (%u)\n",
1908 				data_size, bdev_logical_block_size(device->ldev->backing_bdev));
1909 			return NULL;
1910 		}
1911 		ds = be32_to_cpu(wsame->size);
1912 	}
1913 
1914 	if (!expect(IS_ALIGNED(ds, 512)))
1915 		return NULL;
1916 	if (trim || wsame || zeroes) {
1917 		if (!expect(ds <= (DRBD_MAX_BBIO_SECTORS << 9)))
1918 			return NULL;
1919 	} else if (!expect(ds <= DRBD_MAX_BIO_SIZE))
1920 		return NULL;
1921 
1922 	/* even though we trust our peer,
1923 	 * we sometimes have to double check. */
1924 	if (sector + (ds>>9) > capacity) {
1925 		drbd_err(device, "request from peer beyond end of local disk: "
1926 			"capacity: %llus < sector: %llus + size: %u\n",
1927 			(unsigned long long)capacity,
1928 			(unsigned long long)sector, ds);
1929 		return NULL;
1930 	}
1931 
1932 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1933 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1934 	 * which in turn might block on the other node at this very place.  */
1935 	peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO);
1936 	if (!peer_req)
1937 		return NULL;
1938 
1939 	peer_req->flags |= EE_WRITE;
1940 	if (trim) {
1941 		peer_req->flags |= EE_TRIM;
1942 		return peer_req;
1943 	}
1944 	if (zeroes) {
1945 		peer_req->flags |= EE_ZEROOUT;
1946 		return peer_req;
1947 	}
1948 	if (wsame)
1949 		peer_req->flags |= EE_WRITE_SAME;
1950 
1951 	/* receive payload size bytes into page chain */
1952 	ds = data_size;
1953 	page = peer_req->pages;
1954 	page_chain_for_each(page) {
1955 		unsigned len = min_t(int, ds, PAGE_SIZE);
1956 		data = kmap(page);
1957 		err = drbd_recv_all_warn(peer_device->connection, data, len);
1958 		if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1959 			drbd_err(device, "Fault injection: Corrupting data on receive\n");
1960 			data[0] = data[0] ^ (unsigned long)-1;
1961 		}
1962 		kunmap(page);
1963 		if (err) {
1964 			drbd_free_peer_req(device, peer_req);
1965 			return NULL;
1966 		}
1967 		ds -= len;
1968 	}
1969 
1970 	if (digest_size) {
1971 		drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size);
1972 		if (memcmp(dig_in, dig_vv, digest_size)) {
1973 			drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1974 				(unsigned long long)sector, data_size);
1975 			drbd_free_peer_req(device, peer_req);
1976 			return NULL;
1977 		}
1978 	}
1979 	device->recv_cnt += data_size >> 9;
1980 	return peer_req;
1981 }
1982 
1983 /* drbd_drain_block() just takes a data block
1984  * out of the socket input buffer, and discards it.
1985  */
1986 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1987 {
1988 	struct page *page;
1989 	int err = 0;
1990 	void *data;
1991 
1992 	if (!data_size)
1993 		return 0;
1994 
1995 	page = drbd_alloc_pages(peer_device, 1, 1);
1996 
1997 	data = kmap(page);
1998 	while (data_size) {
1999 		unsigned int len = min_t(int, data_size, PAGE_SIZE);
2000 
2001 		err = drbd_recv_all_warn(peer_device->connection, data, len);
2002 		if (err)
2003 			break;
2004 		data_size -= len;
2005 	}
2006 	kunmap(page);
2007 	drbd_free_pages(peer_device->device, page, 0);
2008 	return err;
2009 }
2010 
2011 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
2012 			   sector_t sector, int data_size)
2013 {
2014 	struct bio_vec bvec;
2015 	struct bvec_iter iter;
2016 	struct bio *bio;
2017 	int digest_size, err, expect;
2018 	void *dig_in = peer_device->connection->int_dig_in;
2019 	void *dig_vv = peer_device->connection->int_dig_vv;
2020 
2021 	digest_size = 0;
2022 	if (peer_device->connection->peer_integrity_tfm) {
2023 		digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
2024 		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
2025 		if (err)
2026 			return err;
2027 		data_size -= digest_size;
2028 	}
2029 
2030 	/* optimistically update recv_cnt.  if receiving fails below,
2031 	 * we disconnect anyways, and counters will be reset. */
2032 	peer_device->device->recv_cnt += data_size>>9;
2033 
2034 	bio = req->master_bio;
2035 	D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
2036 
2037 	bio_for_each_segment(bvec, bio, iter) {
2038 		void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
2039 		expect = min_t(int, data_size, bvec.bv_len);
2040 		err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
2041 		kunmap(bvec.bv_page);
2042 		if (err)
2043 			return err;
2044 		data_size -= expect;
2045 	}
2046 
2047 	if (digest_size) {
2048 		drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
2049 		if (memcmp(dig_in, dig_vv, digest_size)) {
2050 			drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
2051 			return -EINVAL;
2052 		}
2053 	}
2054 
2055 	D_ASSERT(peer_device->device, data_size == 0);
2056 	return 0;
2057 }
2058 
2059 /*
2060  * e_end_resync_block() is called in ack_sender context via
2061  * drbd_finish_peer_reqs().
2062  */
2063 static int e_end_resync_block(struct drbd_work *w, int unused)
2064 {
2065 	struct drbd_peer_request *peer_req =
2066 		container_of(w, struct drbd_peer_request, w);
2067 	struct drbd_peer_device *peer_device = peer_req->peer_device;
2068 	struct drbd_device *device = peer_device->device;
2069 	sector_t sector = peer_req->i.sector;
2070 	int err;
2071 
2072 	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2073 
2074 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2075 		drbd_set_in_sync(device, sector, peer_req->i.size);
2076 		err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
2077 	} else {
2078 		/* Record failure to sync */
2079 		drbd_rs_failed_io(device, sector, peer_req->i.size);
2080 
2081 		err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2082 	}
2083 	dec_unacked(device);
2084 
2085 	return err;
2086 }
2087 
2088 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
2089 			    struct packet_info *pi) __releases(local)
2090 {
2091 	struct drbd_device *device = peer_device->device;
2092 	struct drbd_peer_request *peer_req;
2093 
2094 	peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
2095 	if (!peer_req)
2096 		goto fail;
2097 
2098 	dec_rs_pending(device);
2099 
2100 	inc_unacked(device);
2101 	/* corresponding dec_unacked() in e_end_resync_block()
2102 	 * respective _drbd_clear_done_ee */
2103 
2104 	peer_req->w.cb = e_end_resync_block;
2105 	peer_req->submit_jif = jiffies;
2106 
2107 	spin_lock_irq(&device->resource->req_lock);
2108 	list_add_tail(&peer_req->w.list, &device->sync_ee);
2109 	spin_unlock_irq(&device->resource->req_lock);
2110 
2111 	atomic_add(pi->size >> 9, &device->rs_sect_ev);
2112 	if (drbd_submit_peer_request(device, peer_req, REQ_OP_WRITE, 0,
2113 				     DRBD_FAULT_RS_WR) == 0)
2114 		return 0;
2115 
2116 	/* don't care for the reason here */
2117 	drbd_err(device, "submit failed, triggering re-connect\n");
2118 	spin_lock_irq(&device->resource->req_lock);
2119 	list_del(&peer_req->w.list);
2120 	spin_unlock_irq(&device->resource->req_lock);
2121 
2122 	drbd_free_peer_req(device, peer_req);
2123 fail:
2124 	put_ldev(device);
2125 	return -EIO;
2126 }
2127 
2128 static struct drbd_request *
2129 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
2130 	     sector_t sector, bool missing_ok, const char *func)
2131 {
2132 	struct drbd_request *req;
2133 
2134 	/* Request object according to our peer */
2135 	req = (struct drbd_request *)(unsigned long)id;
2136 	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
2137 		return req;
2138 	if (!missing_ok) {
2139 		drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
2140 			(unsigned long)id, (unsigned long long)sector);
2141 	}
2142 	return NULL;
2143 }
2144 
2145 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
2146 {
2147 	struct drbd_peer_device *peer_device;
2148 	struct drbd_device *device;
2149 	struct drbd_request *req;
2150 	sector_t sector;
2151 	int err;
2152 	struct p_data *p = pi->data;
2153 
2154 	peer_device = conn_peer_device(connection, pi->vnr);
2155 	if (!peer_device)
2156 		return -EIO;
2157 	device = peer_device->device;
2158 
2159 	sector = be64_to_cpu(p->sector);
2160 
2161 	spin_lock_irq(&device->resource->req_lock);
2162 	req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
2163 	spin_unlock_irq(&device->resource->req_lock);
2164 	if (unlikely(!req))
2165 		return -EIO;
2166 
2167 	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
2168 	 * special casing it there for the various failure cases.
2169 	 * still no race with drbd_fail_pending_reads */
2170 	err = recv_dless_read(peer_device, req, sector, pi->size);
2171 	if (!err)
2172 		req_mod(req, DATA_RECEIVED);
2173 	/* else: nothing. handled from drbd_disconnect...
2174 	 * I don't think we may complete this just yet
2175 	 * in case we are "on-disconnect: freeze" */
2176 
2177 	return err;
2178 }
2179 
2180 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
2181 {
2182 	struct drbd_peer_device *peer_device;
2183 	struct drbd_device *device;
2184 	sector_t sector;
2185 	int err;
2186 	struct p_data *p = pi->data;
2187 
2188 	peer_device = conn_peer_device(connection, pi->vnr);
2189 	if (!peer_device)
2190 		return -EIO;
2191 	device = peer_device->device;
2192 
2193 	sector = be64_to_cpu(p->sector);
2194 	D_ASSERT(device, p->block_id == ID_SYNCER);
2195 
2196 	if (get_ldev(device)) {
2197 		/* data is submitted to disk within recv_resync_read.
2198 		 * corresponding put_ldev done below on error,
2199 		 * or in drbd_peer_request_endio. */
2200 		err = recv_resync_read(peer_device, sector, pi);
2201 	} else {
2202 		if (__ratelimit(&drbd_ratelimit_state))
2203 			drbd_err(device, "Can not write resync data to local disk.\n");
2204 
2205 		err = drbd_drain_block(peer_device, pi->size);
2206 
2207 		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2208 	}
2209 
2210 	atomic_add(pi->size >> 9, &device->rs_sect_in);
2211 
2212 	return err;
2213 }
2214 
2215 static void restart_conflicting_writes(struct drbd_device *device,
2216 				       sector_t sector, int size)
2217 {
2218 	struct drbd_interval *i;
2219 	struct drbd_request *req;
2220 
2221 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2222 		if (!i->local)
2223 			continue;
2224 		req = container_of(i, struct drbd_request, i);
2225 		if (req->rq_state & RQ_LOCAL_PENDING ||
2226 		    !(req->rq_state & RQ_POSTPONED))
2227 			continue;
2228 		/* as it is RQ_POSTPONED, this will cause it to
2229 		 * be queued on the retry workqueue. */
2230 		__req_mod(req, CONFLICT_RESOLVED, NULL);
2231 	}
2232 }
2233 
2234 /*
2235  * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
2236  */
2237 static int e_end_block(struct drbd_work *w, int cancel)
2238 {
2239 	struct drbd_peer_request *peer_req =
2240 		container_of(w, struct drbd_peer_request, w);
2241 	struct drbd_peer_device *peer_device = peer_req->peer_device;
2242 	struct drbd_device *device = peer_device->device;
2243 	sector_t sector = peer_req->i.sector;
2244 	int err = 0, pcmd;
2245 
2246 	if (peer_req->flags & EE_SEND_WRITE_ACK) {
2247 		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2248 			pcmd = (device->state.conn >= C_SYNC_SOURCE &&
2249 				device->state.conn <= C_PAUSED_SYNC_T &&
2250 				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
2251 				P_RS_WRITE_ACK : P_WRITE_ACK;
2252 			err = drbd_send_ack(peer_device, pcmd, peer_req);
2253 			if (pcmd == P_RS_WRITE_ACK)
2254 				drbd_set_in_sync(device, sector, peer_req->i.size);
2255 		} else {
2256 			err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2257 			/* we expect it to be marked out of sync anyways...
2258 			 * maybe assert this?  */
2259 		}
2260 		dec_unacked(device);
2261 	}
2262 
2263 	/* we delete from the conflict detection hash _after_ we sent out the
2264 	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
2265 	if (peer_req->flags & EE_IN_INTERVAL_TREE) {
2266 		spin_lock_irq(&device->resource->req_lock);
2267 		D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
2268 		drbd_remove_epoch_entry_interval(device, peer_req);
2269 		if (peer_req->flags & EE_RESTART_REQUESTS)
2270 			restart_conflicting_writes(device, sector, peer_req->i.size);
2271 		spin_unlock_irq(&device->resource->req_lock);
2272 	} else
2273 		D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2274 
2275 	drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
2276 
2277 	return err;
2278 }
2279 
2280 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2281 {
2282 	struct drbd_peer_request *peer_req =
2283 		container_of(w, struct drbd_peer_request, w);
2284 	struct drbd_peer_device *peer_device = peer_req->peer_device;
2285 	int err;
2286 
2287 	err = drbd_send_ack(peer_device, ack, peer_req);
2288 	dec_unacked(peer_device->device);
2289 
2290 	return err;
2291 }
2292 
2293 static int e_send_superseded(struct drbd_work *w, int unused)
2294 {
2295 	return e_send_ack(w, P_SUPERSEDED);
2296 }
2297 
2298 static int e_send_retry_write(struct drbd_work *w, int unused)
2299 {
2300 	struct drbd_peer_request *peer_req =
2301 		container_of(w, struct drbd_peer_request, w);
2302 	struct drbd_connection *connection = peer_req->peer_device->connection;
2303 
2304 	return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2305 			     P_RETRY_WRITE : P_SUPERSEDED);
2306 }
2307 
2308 static bool seq_greater(u32 a, u32 b)
2309 {
2310 	/*
2311 	 * We assume 32-bit wrap-around here.
2312 	 * For 24-bit wrap-around, we would have to shift:
2313 	 *  a <<= 8; b <<= 8;
2314 	 */
2315 	return (s32)a - (s32)b > 0;
2316 }
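
/* Worked example of the wrap-around handling above (illustrative only,
 * assuming ordinary two's-complement s32 arithmetic):
 *   seq_greater(0x00000001, 0xffffffff)
 *     == ((s32)0x00000001 - (s32)0xffffffff > 0)
 *     == (1 - (-1) > 0) == (2 > 0) == true,
 * i.e. a sequence number that has just wrapped past 0xffffffff is still
 * considered "greater" than one taken shortly before the wrap. */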
2317 
2318 static u32 seq_max(u32 a, u32 b)
2319 {
2320 	return seq_greater(a, b) ? a : b;
2321 }
2322 
2323 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2324 {
2325 	struct drbd_device *device = peer_device->device;
2326 	unsigned int newest_peer_seq;
2327 
2328 	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2329 		spin_lock(&device->peer_seq_lock);
2330 		newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2331 		device->peer_seq = newest_peer_seq;
2332 		spin_unlock(&device->peer_seq_lock);
2333 		/* wake up only if we actually changed device->peer_seq */
2334 		if (peer_seq == newest_peer_seq)
2335 			wake_up(&device->seq_wait);
2336 	}
2337 }
2338 
2339 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2340 {
2341 	return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2342 }
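
/* Quick sanity example for overlaps() above (illustrative numbers only;
 * lengths l1/l2 are in bytes, sectors are 512 bytes):
 *   overlaps(8, 4096, 10, 4096)
 *     == !((8 + 8 <= 10) || (8 >= 10 + 8)) == true,
 * i.e. two 4 KiB requests starting at sectors 8 and 10 overlap, whereas
 *   overlaps(0, 4096, 8, 4096) == !((0 + 8 <= 8) || ...) == false,
 * i.e. adjacent but non-overlapping requests are not flagged. */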
2343 
2344 /* maybe change sync_ee into an interval tree as well? */
2345 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2346 {
2347 	struct drbd_peer_request *rs_req;
2348 	bool rv = false;
2349 
2350 	spin_lock_irq(&device->resource->req_lock);
2351 	list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2352 		if (overlaps(peer_req->i.sector, peer_req->i.size,
2353 			     rs_req->i.sector, rs_req->i.size)) {
2354 			rv = true;
2355 			break;
2356 		}
2357 	}
2358 	spin_unlock_irq(&device->resource->req_lock);
2359 
2360 	return rv;
2361 }
2362 
2363 /* Called from receive_Data.
2364  * Synchronize packets on sock with packets on msock.
2365  *
2366  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2367  * packet traveling on msock, they are still processed in the order they have
2368  * been sent.
2369  *
2370  * Note: we don't care for Ack packets overtaking P_DATA packets.
2371  *
2372  * In case peer_seq is larger than device->peer_seq, there are still
2373  * outstanding packets on the msock. We wait for them to arrive.
2374  * In case this is the logically next packet, we update device->peer_seq
2375  * ourselves. Correctly handles 32bit wrap around.
2376  *
2377  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2378  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2379  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2380  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
2381  *
2382  * returns 0 if we may process the packet,
2383  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
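/* Concrete illustration of the wait condition below (made-up numbers, not
 * taken from the protocol documentation): suppose device->peer_seq == 5.
 *   peer_seq == 6: seq_greater(6 - 1, 5) is false, so this is the logically
 *                  next packet and may be processed right away;
 *   peer_seq == 8: seq_greater(8 - 1, 5) is true, so the packets with
 *                  sequence numbers 6 and 7 are still outstanding on the
 *                  msock and we wait until device->peer_seq has caught up. */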
2384 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2385 {
2386 	struct drbd_device *device = peer_device->device;
2387 	DEFINE_WAIT(wait);
2388 	long timeout;
2389 	int ret = 0, tp;
2390 
2391 	if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2392 		return 0;
2393 
2394 	spin_lock(&device->peer_seq_lock);
2395 	for (;;) {
2396 		if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2397 			device->peer_seq = seq_max(device->peer_seq, peer_seq);
2398 			break;
2399 		}
2400 
2401 		if (signal_pending(current)) {
2402 			ret = -ERESTARTSYS;
2403 			break;
2404 		}
2405 
2406 		rcu_read_lock();
2407 		tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2408 		rcu_read_unlock();
2409 
2410 		if (!tp)
2411 			break;
2412 
2413 		/* Only need to wait if two_primaries is enabled */
2414 		prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2415 		spin_unlock(&device->peer_seq_lock);
2416 		rcu_read_lock();
2417 		timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2418 		rcu_read_unlock();
2419 		timeout = schedule_timeout(timeout);
2420 		spin_lock(&device->peer_seq_lock);
2421 		if (!timeout) {
2422 			ret = -ETIMEDOUT;
2423 			drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2424 			break;
2425 		}
2426 	}
2427 	spin_unlock(&device->peer_seq_lock);
2428 	finish_wait(&device->seq_wait, &wait);
2429 	return ret;
2430 }
2431 
2432 /* See also bio_flags_to_wire().
2433  * We need to semantically map the (DRBD_)REQ_* flags to data packet flags
2434  * and back, because we may replicate to peers running other kernel versions. */
2435 static unsigned long wire_flags_to_bio_flags(u32 dpf)
2436 {
2437 	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2438 		(dpf & DP_FUA ? REQ_FUA : 0) |
2439 		(dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
2440 }
2441 
2442 static unsigned long wire_flags_to_bio_op(u32 dpf)
2443 {
2444 	if (dpf & DP_ZEROES)
2445 		return REQ_OP_WRITE_ZEROES;
2446 	if (dpf & DP_DISCARD)
2447 		return REQ_OP_DISCARD;
2448 	if (dpf & DP_WSAME)
2449 		return REQ_OP_WRITE_SAME;
2450 	else
2451 		return REQ_OP_WRITE;
2452 }
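
/* Example mapping through the two helpers above (illustrative only): a peer
 * write sent with DP_FLUSH | DP_FUA becomes a local REQ_OP_WRITE with
 * REQ_PREFLUSH | REQ_FUA set, while DP_DISCARD (without DP_ZEROES) maps to
 * REQ_OP_DISCARD. */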
2453 
2454 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2455 				    unsigned int size)
2456 {
2457 	struct drbd_interval *i;
2458 
2459     repeat:
2460 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2461 		struct drbd_request *req;
2462 		struct bio_and_error m;
2463 
2464 		if (!i->local)
2465 			continue;
2466 		req = container_of(i, struct drbd_request, i);
2467 		if (!(req->rq_state & RQ_POSTPONED))
2468 			continue;
2469 		req->rq_state &= ~RQ_POSTPONED;
2470 		__req_mod(req, NEG_ACKED, &m);
2471 		spin_unlock_irq(&device->resource->req_lock);
2472 		if (m.bio)
2473 			complete_master_bio(device, &m);
2474 		spin_lock_irq(&device->resource->req_lock);
2475 		goto repeat;
2476 	}
2477 }
2478 
2479 static int handle_write_conflicts(struct drbd_device *device,
2480 				  struct drbd_peer_request *peer_req)
2481 {
2482 	struct drbd_connection *connection = peer_req->peer_device->connection;
2483 	bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2484 	sector_t sector = peer_req->i.sector;
2485 	const unsigned int size = peer_req->i.size;
2486 	struct drbd_interval *i;
2487 	bool equal;
2488 	int err;
2489 
2490 	/*
2491 	 * Inserting the peer request into the write_requests tree will prevent
2492 	 * new conflicting local requests from being added.
2493 	 */
2494 	drbd_insert_interval(&device->write_requests, &peer_req->i);
2495 
2496     repeat:
2497 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2498 		if (i == &peer_req->i)
2499 			continue;
2500 		if (i->completed)
2501 			continue;
2502 
2503 		if (!i->local) {
2504 			/*
2505 			 * Our peer has sent a conflicting remote request; this
2506 			 * should not happen in a two-node setup.  Wait for the
2507 			 * earlier peer request to complete.
2508 			 */
2509 			err = drbd_wait_misc(device, i);
2510 			if (err)
2511 				goto out;
2512 			goto repeat;
2513 		}
2514 
2515 		equal = i->sector == sector && i->size == size;
2516 		if (resolve_conflicts) {
2517 			/*
2518 			 * If the peer request is fully contained within the
2519 			 * overlapping request, it can be considered overwritten
2520 			 * and thus superseded; otherwise, it will be retried
2521 			 * once all overlapping requests have completed.
2522 			 */
2523 			bool superseded = i->sector <= sector && i->sector +
2524 				       (i->size >> 9) >= sector + (size >> 9);
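			/* Illustrative numbers for this containment test (not from an
			 * actual trace): a local write at sector 0 with i->size == 8192
			 * (16 sectors) fully covers a peer write at sector 4 with
			 * size == 4096 (8 sectors), since 0 <= 4 && 0 + 16 >= 4 + 8,
			 * so that peer write counts as superseded. */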
2525 
2526 			if (!equal)
2527 				drbd_alert(device, "Concurrent writes detected: "
2528 					       "local=%llus +%u, remote=%llus +%u, "
2529 					       "assuming %s came first\n",
2530 					  (unsigned long long)i->sector, i->size,
2531 					  (unsigned long long)sector, size,
2532 					  superseded ? "local" : "remote");
2533 
2534 			peer_req->w.cb = superseded ? e_send_superseded :
2535 						   e_send_retry_write;
2536 			list_add_tail(&peer_req->w.list, &device->done_ee);
2537 			queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);
2538 
2539 			err = -ENOENT;
2540 			goto out;
2541 		} else {
2542 			struct drbd_request *req =
2543 				container_of(i, struct drbd_request, i);
2544 
2545 			if (!equal)
2546 				drbd_alert(device, "Concurrent writes detected: "
2547 					       "local=%llus +%u, remote=%llus +%u\n",
2548 					  (unsigned long long)i->sector, i->size,
2549 					  (unsigned long long)sector, size);
2550 
2551 			if (req->rq_state & RQ_LOCAL_PENDING ||
2552 			    !(req->rq_state & RQ_POSTPONED)) {
2553 				/*
2554 				 * Wait for the node with the discard flag to
2555 				 * decide if this request has been superseded
2556 				 * or needs to be retried.
2557 				 * Requests that have been superseded will
2558 				 * disappear from the write_requests tree.
2559 				 *
2560 				 * In addition, wait for the conflicting
2561 				 * request to finish locally before submitting
2562 				 * the conflicting peer request.
2563 				 */
2564 				err = drbd_wait_misc(device, &req->i);
2565 				if (err) {
2566 					_conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2567 					fail_postponed_requests(device, sector, size);
2568 					goto out;
2569 				}
2570 				goto repeat;
2571 			}
2572 			/*
2573 			 * Remember to restart the conflicting requests after
2574 			 * the new peer request has completed.
2575 			 */
2576 			peer_req->flags |= EE_RESTART_REQUESTS;
2577 		}
2578 	}
2579 	err = 0;
2580 
2581     out:
2582 	if (err)
2583 		drbd_remove_epoch_entry_interval(device, peer_req);
2584 	return err;
2585 }
2586 
2587 /* mirrored write */
2588 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2589 {
2590 	struct drbd_peer_device *peer_device;
2591 	struct drbd_device *device;
2592 	struct net_conf *nc;
2593 	sector_t sector;
2594 	struct drbd_peer_request *peer_req;
2595 	struct p_data *p = pi->data;
2596 	u32 peer_seq = be32_to_cpu(p->seq_num);
2597 	int op, op_flags;
2598 	u32 dp_flags;
2599 	int err, tp;
2600 
2601 	peer_device = conn_peer_device(connection, pi->vnr);
2602 	if (!peer_device)
2603 		return -EIO;
2604 	device = peer_device->device;
2605 
2606 	if (!get_ldev(device)) {
2607 		int err2;
2608 
2609 		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2610 		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2611 		atomic_inc(&connection->current_epoch->epoch_size);
2612 		err2 = drbd_drain_block(peer_device, pi->size);
2613 		if (!err)
2614 			err = err2;
2615 		return err;
2616 	}
2617 
2618 	/*
2619 	 * Corresponding put_ldev done either below (on various errors), or in
2620 	 * drbd_peer_request_endio, if we successfully submit the data at the
2621 	 * end of this function.
2622 	 */
2623 
2624 	sector = be64_to_cpu(p->sector);
2625 	peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2626 	if (!peer_req) {
2627 		put_ldev(device);
2628 		return -EIO;
2629 	}
2630 
2631 	peer_req->w.cb = e_end_block;
2632 	peer_req->submit_jif = jiffies;
2633 	peer_req->flags |= EE_APPLICATION;
2634 
2635 	dp_flags = be32_to_cpu(p->dp_flags);
2636 	op = wire_flags_to_bio_op(dp_flags);
2637 	op_flags = wire_flags_to_bio_flags(dp_flags);
2638 	if (pi->cmd == P_TRIM) {
2639 		D_ASSERT(peer_device, peer_req->i.size > 0);
2640 		D_ASSERT(peer_device, op == REQ_OP_DISCARD);
2641 		D_ASSERT(peer_device, peer_req->pages == NULL);
2642 		/* need to play it safe: an older DRBD sender
2643 		 * may actually mean zero-out when it sends P_TRIM. */
2644 		if (0 == (connection->agreed_features & DRBD_FF_WZEROES))
2645 			peer_req->flags |= EE_ZEROOUT;
2646 	} else if (pi->cmd == P_ZEROES) {
2647 		D_ASSERT(peer_device, peer_req->i.size > 0);
2648 		D_ASSERT(peer_device, op == REQ_OP_WRITE_ZEROES);
2649 		D_ASSERT(peer_device, peer_req->pages == NULL);
2650 		/* Do (not) pass down BLKDEV_ZERO_NOUNMAP? */
2651 		if (dp_flags & DP_DISCARD)
2652 			peer_req->flags |= EE_TRIM;
2653 	} else if (peer_req->pages == NULL) {
2654 		D_ASSERT(device, peer_req->i.size == 0);
2655 		D_ASSERT(device, dp_flags & DP_FLUSH);
2656 	}
2657 
2658 	if (dp_flags & DP_MAY_SET_IN_SYNC)
2659 		peer_req->flags |= EE_MAY_SET_IN_SYNC;
2660 
2661 	spin_lock(&connection->epoch_lock);
2662 	peer_req->epoch = connection->current_epoch;
2663 	atomic_inc(&peer_req->epoch->epoch_size);
2664 	atomic_inc(&peer_req->epoch->active);
2665 	spin_unlock(&connection->epoch_lock);
2666 
2667 	rcu_read_lock();
2668 	nc = rcu_dereference(peer_device->connection->net_conf);
2669 	tp = nc->two_primaries;
2670 	if (peer_device->connection->agreed_pro_version < 100) {
2671 		switch (nc->wire_protocol) {
2672 		case DRBD_PROT_C:
2673 			dp_flags |= DP_SEND_WRITE_ACK;
2674 			break;
2675 		case DRBD_PROT_B:
2676 			dp_flags |= DP_SEND_RECEIVE_ACK;
2677 			break;
2678 		}
2679 	}
2680 	rcu_read_unlock();
2681 
2682 	if (dp_flags & DP_SEND_WRITE_ACK) {
2683 		peer_req->flags |= EE_SEND_WRITE_ACK;
2684 		inc_unacked(device);
2685 		/* corresponding dec_unacked() in e_end_block()
2686 		 * respective _drbd_clear_done_ee */
2687 	}
2688 
2689 	if (dp_flags & DP_SEND_RECEIVE_ACK) {
2690 		/* I really don't like it that the receiver thread
2691 		 * sends on the msock, but anyways */
2692 		drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
2693 	}
2694 
2695 	if (tp) {
2696 		/* two primaries implies protocol C */
2697 		D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2698 		peer_req->flags |= EE_IN_INTERVAL_TREE;
2699 		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2700 		if (err)
2701 			goto out_interrupted;
2702 		spin_lock_irq(&device->resource->req_lock);
2703 		err = handle_write_conflicts(device, peer_req);
2704 		if (err) {
2705 			spin_unlock_irq(&device->resource->req_lock);
2706 			if (err == -ENOENT) {
2707 				put_ldev(device);
2708 				return 0;
2709 			}
2710 			goto out_interrupted;
2711 		}
2712 	} else {
2713 		update_peer_seq(peer_device, peer_seq);
2714 		spin_lock_irq(&device->resource->req_lock);
2715 	}
2716 	/* TRIM and WRITE_SAME are processed synchronously:
2717 	 * drbd_submit_peer_request() waits for all pending requests,
2718 	 * i.e. for active_ee to become empty;
2719 	 * better not add ourselves here. */
2720 	if ((peer_req->flags & (EE_TRIM|EE_WRITE_SAME|EE_ZEROOUT)) == 0)
2721 		list_add_tail(&peer_req->w.list, &device->active_ee);
2722 	spin_unlock_irq(&device->resource->req_lock);
2723 
2724 	if (device->state.conn == C_SYNC_TARGET)
2725 		wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2726 
2727 	if (device->state.pdsk < D_INCONSISTENT) {
2728 		/* In case we have the only disk of the cluster, */
2729 		drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2730 		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2731 		drbd_al_begin_io(device, &peer_req->i);
2732 		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2733 	}
2734 
2735 	err = drbd_submit_peer_request(device, peer_req, op, op_flags,
2736 				       DRBD_FAULT_DT_WR);
2737 	if (!err)
2738 		return 0;
2739 
2740 	/* don't care for the reason here */
2741 	drbd_err(device, "submit failed, triggering re-connect\n");
2742 	spin_lock_irq(&device->resource->req_lock);
2743 	list_del(&peer_req->w.list);
2744 	drbd_remove_epoch_entry_interval(device, peer_req);
2745 	spin_unlock_irq(&device->resource->req_lock);
2746 	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2747 		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2748 		drbd_al_complete_io(device, &peer_req->i);
2749 	}
2750 
2751 out_interrupted:
2752 	drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
2753 	put_ldev(device);
2754 	drbd_free_peer_req(device, peer_req);
2755 	return err;
2756 }
2757 
2758 /* We may throttle resync if the lower device seems to be busy
2759  * and the current sync rate is above c_min_rate.
2760  *
2761  * To decide whether or not the lower device is busy, we use a scheme similar
2762  * to MD RAID's is_mddev_idle(): if the partition stats reveal "significant"
2763  * activity (more than 64 sectors) that we cannot account for with our own
2764  * resync activity, the device obviously is "busy".
2765  *
2766  * The current sync rate used here is based only on the most recent two step
2767  * marks, to get a short time average so we can react faster.
2768  */
2769 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2770 		bool throttle_if_app_is_waiting)
2771 {
2772 	struct lc_element *tmp;
2773 	bool throttle = drbd_rs_c_min_rate_throttle(device);
2774 
2775 	if (!throttle || throttle_if_app_is_waiting)
2776 		return throttle;
2777 
2778 	spin_lock_irq(&device->al_lock);
2779 	tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2780 	if (tmp) {
2781 		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2782 		if (test_bit(BME_PRIORITY, &bm_ext->flags))
2783 			throttle = false;
2784 		/* Do not slow down if app IO is already waiting for this extent,
2785 		 * and our progress is necessary for application IO to complete. */
2786 	}
2787 	spin_unlock_irq(&device->al_lock);
2788 
2789 	return throttle;
2790 }
2791 
2792 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2793 {
2794 	struct gendisk *disk = device->ldev->backing_bdev->bd_disk;
2795 	unsigned long db, dt, dbdt;
2796 	unsigned int c_min_rate;
2797 	int curr_events;
2798 
2799 	rcu_read_lock();
2800 	c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2801 	rcu_read_unlock();
2802 
2803 	/* feature disabled? */
2804 	if (c_min_rate == 0)
2805 		return false;
2806 
2807 	curr_events = (int)part_stat_read_accum(disk->part0, sectors) -
2808 			atomic_read(&device->rs_sect_ev);
2809 
2810 	if (atomic_read(&device->ap_actlog_cnt)
2811 	    || curr_events - device->rs_last_events > 64) {
2812 		unsigned long rs_left;
2813 		int i;
2814 
2815 		device->rs_last_events = curr_events;
2816 
2817 		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2818 		 * approx. */
2819 		i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2820 
2821 		if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2822 			rs_left = device->ov_left;
2823 		else
2824 			rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2825 
2826 		dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2827 		if (!dt)
2828 			dt++;
2829 		db = device->rs_mark_left[i] - rs_left;
2830 		dbdt = Bit2KB(db/dt);
2831 
2832 		if (dbdt > c_min_rate)
2833 			return true;
2834 	}
2835 	return false;
2836 }
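
/* Rough worked example for the rate check above (illustrative only; the
 * c_min_rate value is an assumed configuration, and the 4 KiB figure assumes
 * the usual DRBD bitmap granularity): with db == 256 bitmap bits resynced
 * over dt == 4 seconds, dbdt == Bit2KB(256 / 4) == 256 KiB/s; if c_min_rate
 * were configured to e.g. 250 KiB/s, this function would return true and
 * resync would be considered a candidate for throttling. */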
2837 
2838 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2839 {
2840 	struct drbd_peer_device *peer_device;
2841 	struct drbd_device *device;
2842 	sector_t sector;
2843 	sector_t capacity;
2844 	struct drbd_peer_request *peer_req;
2845 	struct digest_info *di = NULL;
2846 	int size, verb;
2847 	unsigned int fault_type;
2848 	struct p_block_req *p =	pi->data;
2849 
2850 	peer_device = conn_peer_device(connection, pi->vnr);
2851 	if (!peer_device)
2852 		return -EIO;
2853 	device = peer_device->device;
2854 	capacity = get_capacity(device->vdisk);
2855 
2856 	sector = be64_to_cpu(p->sector);
2857 	size   = be32_to_cpu(p->blksize);
2858 
2859 	if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2860 		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2861 				(unsigned long long)sector, size);
2862 		return -EINVAL;
2863 	}
2864 	if (sector + (size>>9) > capacity) {
2865 		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2866 				(unsigned long long)sector, size);
2867 		return -EINVAL;
2868 	}
2869 
2870 	if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2871 		verb = 1;
2872 		switch (pi->cmd) {
2873 		case P_DATA_REQUEST:
2874 			drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2875 			break;
2876 		case P_RS_THIN_REQ:
2877 		case P_RS_DATA_REQUEST:
2878 		case P_CSUM_RS_REQUEST:
2879 		case P_OV_REQUEST:
2880 			drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2881 			break;
2882 		case P_OV_REPLY:
2883 			verb = 0;
2884 			dec_rs_pending(device);
2885 			drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2886 			break;
2887 		default:
2888 			BUG();
2889 		}
2890 		if (verb && __ratelimit(&drbd_ratelimit_state))
2891 			drbd_err(device, "Can not satisfy peer's read request, "
2892 			    "no local data.\n");
2893 
2894 		/* drain possibly present payload */
2895 		return drbd_drain_block(peer_device, pi->size);
2896 	}
2897 
2898 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2899 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
2900 	 * which in turn might block on the other node at this very place.  */
2901 	peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2902 			size, GFP_NOIO);
2903 	if (!peer_req) {
2904 		put_ldev(device);
2905 		return -ENOMEM;
2906 	}
2907 
2908 	switch (pi->cmd) {
2909 	case P_DATA_REQUEST:
2910 		peer_req->w.cb = w_e_end_data_req;
2911 		fault_type = DRBD_FAULT_DT_RD;
2912 		/* application IO, don't drbd_rs_begin_io */
2913 		peer_req->flags |= EE_APPLICATION;
2914 		goto submit;
2915 
2916 	case P_RS_THIN_REQ:
2917 		/* If at some point in the future we have a smart way to
2918 		   find out if this data block is completely deallocated,
2919 		   then we would do something smarter here than reading
2920 		   the block... */
2921 		peer_req->flags |= EE_RS_THIN_REQ;
2922 		fallthrough;
2923 	case P_RS_DATA_REQUEST:
2924 		peer_req->w.cb = w_e_end_rsdata_req;
2925 		fault_type = DRBD_FAULT_RS_RD;
2926 		/* used in the sector offset progress display */
2927 		device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2928 		break;
2929 
2930 	case P_OV_REPLY:
2931 	case P_CSUM_RS_REQUEST:
2932 		fault_type = DRBD_FAULT_RS_RD;
2933 		di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2934 		if (!di)
2935 			goto out_free_e;
2936 
2937 		di->digest_size = pi->size;
2938 		di->digest = (((char *)di)+sizeof(struct digest_info));
2939 
2940 		peer_req->digest = di;
2941 		peer_req->flags |= EE_HAS_DIGEST;
2942 
2943 		if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2944 			goto out_free_e;
2945 
2946 		if (pi->cmd == P_CSUM_RS_REQUEST) {
2947 			D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2948 			peer_req->w.cb = w_e_end_csum_rs_req;
2949 			/* used in the sector offset progress display */
2950 			device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2951 			/* remember to report stats in drbd_resync_finished */
2952 			device->use_csums = true;
2953 		} else if (pi->cmd == P_OV_REPLY) {
2954 			/* track progress, we may need to throttle */
2955 			atomic_add(size >> 9, &device->rs_sect_in);
2956 			peer_req->w.cb = w_e_end_ov_reply;
2957 			dec_rs_pending(device);
2958 			/* drbd_rs_begin_io done when we sent this request,
2959 			 * but accounting still needs to be done. */
2960 			goto submit_for_resync;
2961 		}
2962 		break;
2963 
2964 	case P_OV_REQUEST:
2965 		if (device->ov_start_sector == ~(sector_t)0 &&
2966 		    peer_device->connection->agreed_pro_version >= 90) {
2967 			unsigned long now = jiffies;
2968 			int i;
2969 			device->ov_start_sector = sector;
2970 			device->ov_position = sector;
2971 			device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2972 			device->rs_total = device->ov_left;
2973 			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2974 				device->rs_mark_left[i] = device->ov_left;
2975 				device->rs_mark_time[i] = now;
2976 			}
2977 			drbd_info(device, "Online Verify start sector: %llu\n",
2978 					(unsigned long long)sector);
2979 		}
2980 		peer_req->w.cb = w_e_end_ov_req;
2981 		fault_type = DRBD_FAULT_RS_RD;
2982 		break;
2983 
2984 	default:
2985 		BUG();
2986 	}
2987 
2988 	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
2989 	 * wrt the receiver, but it is not as straightforward as it may seem.
2990 	 * Various places in the resync start and stop logic assume resync
2991 	 * requests are processed in order, requeuing this on the worker thread
2992 	 * introduces a bunch of new code for synchronization between threads.
2993 	 *
2994 	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2995 	 * "forever", throttling after drbd_rs_begin_io will lock that extent
2996 	 * for application writes for the same time.  For now, just throttle
2997 	 * here, where the rest of the code expects the receiver to sleep for
2998 	 * a while, anyways.
2999 	 */
3000 
3001 	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
3002 	 * this defers syncer requests for some time, before letting at least
3003 	 * one request through.  The resync controller on the receiving side
3004 	 * will adapt to the incoming rate accordingly.
3005 	 *
3006 	 * We cannot throttle here if remote is Primary/SyncTarget:
3007 	 * we would also throttle its application reads.
3008 	 * In that case, throttling is done on the SyncTarget only.
3009 	 */
3010 
3011 	/* Even though this may be a resync request, we do add to "read_ee";
3012 	 * "sync_ee" is only used for resync WRITEs.
3013 	 * Add to list early, so debugfs can find this request
3014 	 * even if we have to sleep below. */
3015 	spin_lock_irq(&device->resource->req_lock);
3016 	list_add_tail(&peer_req->w.list, &device->read_ee);
3017 	spin_unlock_irq(&device->resource->req_lock);
3018 
3019 	update_receiver_timing_details(connection, drbd_rs_should_slow_down);
3020 	if (device->state.peer != R_PRIMARY
3021 	&& drbd_rs_should_slow_down(device, sector, false))
3022 		schedule_timeout_uninterruptible(HZ/10);
3023 	update_receiver_timing_details(connection, drbd_rs_begin_io);
3024 	if (drbd_rs_begin_io(device, sector))
3025 		goto out_free_e;
3026 
3027 submit_for_resync:
3028 	atomic_add(size >> 9, &device->rs_sect_ev);
3029 
3030 submit:
3031 	update_receiver_timing_details(connection, drbd_submit_peer_request);
3032 	inc_unacked(device);
3033 	if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
3034 				     fault_type) == 0)
3035 		return 0;
3036 
3037 	/* don't care for the reason here */
3038 	drbd_err(device, "submit failed, triggering re-connect\n");
3039 
3040 out_free_e:
3041 	spin_lock_irq(&device->resource->req_lock);
3042 	list_del(&peer_req->w.list);
3043 	spin_unlock_irq(&device->resource->req_lock);
3044 	/* no drbd_rs_complete_io(), we are dropping the connection anyways */
3045 
3046 	put_ldev(device);
3047 	drbd_free_peer_req(device, peer_req);
3048 	return -EIO;
3049 }
3050 
3051 /**
3052  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
3053  */
3054 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
3055 {
3056 	struct drbd_device *device = peer_device->device;
3057 	int self, peer, rv = -100;
3058 	unsigned long ch_self, ch_peer;
3059 	enum drbd_after_sb_p after_sb_0p;
3060 
3061 	self = device->ldev->md.uuid[UI_BITMAP] & 1;
3062 	peer = device->p_uuid[UI_BITMAP] & 1;
3063 
3064 	ch_peer = device->p_uuid[UI_SIZE];
3065 	ch_self = device->comm_bm_set;
3066 
3067 	rcu_read_lock();
3068 	after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
3069 	rcu_read_unlock();
3070 	switch (after_sb_0p) {
3071 	case ASB_CONSENSUS:
3072 	case ASB_DISCARD_SECONDARY:
3073 	case ASB_CALL_HELPER:
3074 	case ASB_VIOLENTLY:
3075 		drbd_err(device, "Configuration error.\n");
3076 		break;
3077 	case ASB_DISCONNECT:
3078 		break;
3079 	case ASB_DISCARD_YOUNGER_PRI:
3080 		if (self == 0 && peer == 1) {
3081 			rv = -1;
3082 			break;
3083 		}
3084 		if (self == 1 && peer == 0) {
3085 			rv =  1;
3086 			break;
3087 		}
3088 		fallthrough;	/* to one of the other strategies */
3089 	case ASB_DISCARD_OLDER_PRI:
3090 		if (self == 0 && peer == 1) {
3091 			rv = 1;
3092 			break;
3093 		}
3094 		if (self == 1 && peer == 0) {
3095 			rv = -1;
3096 			break;
3097 		}
3098 		/* Else fall through to one of the other strategies... */
3099 		drbd_warn(device, "Discard younger/older primary did not find a decision\n"
3100 		     "Using discard-least-changes instead\n");
3101 		fallthrough;
3102 	case ASB_DISCARD_ZERO_CHG:
3103 		if (ch_peer == 0 && ch_self == 0) {
3104 			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
3105 				? -1 : 1;
3106 			break;
3107 		} else {
3108 			if (ch_peer == 0) { rv =  1; break; }
3109 			if (ch_self == 0) { rv = -1; break; }
3110 		}
3111 		if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
3112 			break;
3113 		fallthrough;
3114 	case ASB_DISCARD_LEAST_CHG:
3115 		if	(ch_self < ch_peer)
3116 			rv = -1;
3117 		else if (ch_self > ch_peer)
3118 			rv =  1;
3119 		else /* ( ch_self == ch_peer ) */
3120 		     /* Well, then use something else. */
3121 			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
3122 				? -1 : 1;
3123 		break;
3124 	case ASB_DISCARD_LOCAL:
3125 		rv = -1;
3126 		break;
3127 	case ASB_DISCARD_REMOTE:
3128 		rv =  1;
3129 	}
3130 
3131 	return rv;
3132 }
3133 
3134 /**
3135  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
3136  */
3137 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
3138 {
3139 	struct drbd_device *device = peer_device->device;
3140 	int hg, rv = -100;
3141 	enum drbd_after_sb_p after_sb_1p;
3142 
3143 	rcu_read_lock();
3144 	after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
3145 	rcu_read_unlock();
3146 	switch (after_sb_1p) {
3147 	case ASB_DISCARD_YOUNGER_PRI:
3148 	case ASB_DISCARD_OLDER_PRI:
3149 	case ASB_DISCARD_LEAST_CHG:
3150 	case ASB_DISCARD_LOCAL:
3151 	case ASB_DISCARD_REMOTE:
3152 	case ASB_DISCARD_ZERO_CHG:
3153 		drbd_err(device, "Configuration error.\n");
3154 		break;
3155 	case ASB_DISCONNECT:
3156 		break;
3157 	case ASB_CONSENSUS:
3158 		hg = drbd_asb_recover_0p(peer_device);
3159 		if (hg == -1 && device->state.role == R_SECONDARY)
3160 			rv = hg;
3161 		if (hg == 1  && device->state.role == R_PRIMARY)
3162 			rv = hg;
3163 		break;
3164 	case ASB_VIOLENTLY:
3165 		rv = drbd_asb_recover_0p(peer_device);
3166 		break;
3167 	case ASB_DISCARD_SECONDARY:
3168 		return device->state.role == R_PRIMARY ? 1 : -1;
3169 	case ASB_CALL_HELPER:
3170 		hg = drbd_asb_recover_0p(peer_device);
3171 		if (hg == -1 && device->state.role == R_PRIMARY) {
3172 			enum drbd_state_rv rv2;
3173 
3174 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3175 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
3176 			  * we do not need to wait for the after state change work either. */
3177 			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3178 			if (rv2 != SS_SUCCESS) {
3179 				drbd_khelper(device, "pri-lost-after-sb");
3180 			} else {
3181 				drbd_warn(device, "Successfully gave up primary role.\n");
3182 				rv = hg;
3183 			}
3184 		} else
3185 			rv = hg;
3186 	}
3187 
3188 	return rv;
3189 }
3190 
3191 /**
3192  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
3193  */
3194 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
3195 {
3196 	struct drbd_device *device = peer_device->device;
3197 	int hg, rv = -100;
3198 	enum drbd_after_sb_p after_sb_2p;
3199 
3200 	rcu_read_lock();
3201 	after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
3202 	rcu_read_unlock();
3203 	switch (after_sb_2p) {
3204 	case ASB_DISCARD_YOUNGER_PRI:
3205 	case ASB_DISCARD_OLDER_PRI:
3206 	case ASB_DISCARD_LEAST_CHG:
3207 	case ASB_DISCARD_LOCAL:
3208 	case ASB_DISCARD_REMOTE:
3209 	case ASB_CONSENSUS:
3210 	case ASB_DISCARD_SECONDARY:
3211 	case ASB_DISCARD_ZERO_CHG:
3212 		drbd_err(device, "Configuration error.\n");
3213 		break;
3214 	case ASB_VIOLENTLY:
3215 		rv = drbd_asb_recover_0p(peer_device);
3216 		break;
3217 	case ASB_DISCONNECT:
3218 		break;
3219 	case ASB_CALL_HELPER:
3220 		hg = drbd_asb_recover_0p(peer_device);
3221 		if (hg == -1) {
3222 			enum drbd_state_rv rv2;
3223 
3224 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3225 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
3226 			  * we do not need to wait for the after state change work either. */
3227 			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3228 			if (rv2 != SS_SUCCESS) {
3229 				drbd_khelper(device, "pri-lost-after-sb");
3230 			} else {
3231 				drbd_warn(device, "Successfully gave up primary role.\n");
3232 				rv = hg;
3233 			}
3234 		} else
3235 			rv = hg;
3236 	}
3237 
3238 	return rv;
3239 }
3240 
3241 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
3242 			   u64 bits, u64 flags)
3243 {
3244 	if (!uuid) {
3245 		drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
3246 		return;
3247 	}
3248 	drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
3249 	     text,
3250 	     (unsigned long long)uuid[UI_CURRENT],
3251 	     (unsigned long long)uuid[UI_BITMAP],
3252 	     (unsigned long long)uuid[UI_HISTORY_START],
3253 	     (unsigned long long)uuid[UI_HISTORY_END],
3254 	     (unsigned long long)bits,
3255 	     (unsigned long long)flags);
3256 }
3257 
3258 /*
3259   100	after split brain try auto recover
3260     2	C_SYNC_SOURCE set BitMap
3261     1	C_SYNC_SOURCE use BitMap
3262     0	no Sync
3263    -1	C_SYNC_TARGET use BitMap
3264    -2	C_SYNC_TARGET set BitMap
3265  -100	after split brain, disconnect
3266 -1000	unrelated data
3267 -1091   requires proto 91
3268 -1096   requires proto 96
3269  */
3270 
3271 static int drbd_uuid_compare(struct drbd_device *const device, enum drbd_role const peer_role, int *rule_nr) __must_hold(local)
3272 {
3273 	struct drbd_peer_device *const peer_device = first_peer_device(device);
3274 	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
3275 	u64 self, peer;
3276 	int i, j;
3277 
3278 	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3279 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3280 
3281 	*rule_nr = 10;
3282 	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
3283 		return 0;
3284 
3285 	*rule_nr = 20;
3286 	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
3287 	     peer != UUID_JUST_CREATED)
3288 		return -2;
3289 
3290 	*rule_nr = 30;
3291 	if (self != UUID_JUST_CREATED &&
3292 	    (peer == UUID_JUST_CREATED || peer == (u64)0))
3293 		return 2;
3294 
3295 	if (self == peer) {
3296 		int rct, dc; /* roles at crash time */
3297 
3298 		if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
3299 
3300 			if (connection->agreed_pro_version < 91)
3301 				return -1091;
3302 
3303 			if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
3304 			    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
3305 				drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
3306 				drbd_uuid_move_history(device);
3307 				device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
3308 				device->ldev->md.uuid[UI_BITMAP] = 0;
3309 
3310 				drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3311 					       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3312 				*rule_nr = 34;
3313 			} else {
3314 				drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
3315 				*rule_nr = 36;
3316 			}
3317 
3318 			return 1;
3319 		}
3320 
3321 		if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
3322 
3323 			if (connection->agreed_pro_version < 91)
3324 				return -1091;
3325 
3326 			if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
3327 			    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
3328 				drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
3329 
3330 				device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
3331 				device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
3332 				device->p_uuid[UI_BITMAP] = 0UL;
3333 
3334 				drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3335 				*rule_nr = 35;
3336 			} else {
3337 				drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3338 				*rule_nr = 37;
3339 			}
3340 
3341 			return -1;
3342 		}
3343 
3344 		/* Common power [off|failure] */
3345 		rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3346 			(device->p_uuid[UI_FLAGS] & 2);
3347 		/* lowest bit is set when we were primary,
3348 		 * next bit (weight 2) is set when peer was primary */
3349 		*rule_nr = 40;
3350 
3351 		/* Neither has the "crashed primary" flag set,
3352 		 * only a replication link hiccup. */
3353 		if (rct == 0)
3354 			return 0;
3355 
3356 		/* Current UUID equal and no bitmap uuid; does not necessarily
3357 		 * mean this was a "simultaneous hard crash", maybe IO was
3358 		 * frozen, so no UUID-bump happened.
3359 		 * This is a protocol change, overload DRBD_FF_WSAME as flag
3360 		 * for "new-enough" peer DRBD version. */
3361 		if (device->state.role == R_PRIMARY || peer_role == R_PRIMARY) {
3362 			*rule_nr = 41;
3363 			if (!(connection->agreed_features & DRBD_FF_WSAME)) {
3364 				drbd_warn(peer_device, "Equivalent unrotated UUIDs, but current primary present.\n");
3365 				return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8));
3366 			}
3367 			if (device->state.role == R_PRIMARY && peer_role == R_PRIMARY) {
3368 				/* At least one has the "crashed primary" bit set,
3369 				 * both are primary now, but neither has rotated its UUIDs?
3370 				 * "Can not happen." */
3371 				drbd_err(peer_device, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n");
3372 				return -100;
3373 			}
3374 			if (device->state.role == R_PRIMARY)
3375 				return 1;
3376 			return -1;
3377 		}
3378 
3379 		/* Both are secondary.
3380 		 * Really looks like recovery from simultaneous hard crash.
3381 		 * Check which had been primary before, and arbitrate. */
3382 		switch (rct) {
3383 		case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */
3384 		case 1: /*  self_pri && !peer_pri */ return 1;
3385 		case 2: /* !self_pri &&  peer_pri */ return -1;
3386 		case 3: /*  self_pri &&  peer_pri */
3387 			dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3388 			return dc ? -1 : 1;
3389 		}
3390 	}
3391 
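	/* The remaining rules compare individual UUIDs.  Return value
	 * convention, interpreted by drbd_sync_handshake():
	 * > 0 become sync source, < 0 become sync target, abs() >= 2 a full
	 * sync is needed, +-100 split brain, -1000 unrelated data,
	 * < -1000 a newer protocol version / feature is required. */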
3392 	*rule_nr = 50;
3393 	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3394 	if (self == peer)
3395 		return -1;
3396 
3397 	*rule_nr = 51;
3398 	peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3399 	if (self == peer) {
3400 		if (connection->agreed_pro_version < 96 ?
3401 		    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3402 		    (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3403 		    peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get through. Undo the
			   modifications of the peer's UUIDs made at the last
			   start of resync as sync source. */
3406 
3407 			if (connection->agreed_pro_version < 91)
3408 				return -1091;
3409 
3410 			device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3411 			device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3412 
3413 			drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3414 			drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3415 
3416 			return -1;
3417 		}
3418 	}
3419 
3420 	*rule_nr = 60;
3421 	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3422 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3423 		peer = device->p_uuid[i] & ~((u64)1);
3424 		if (self == peer)
3425 			return -2;
3426 	}
3427 
3428 	*rule_nr = 70;
3429 	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3430 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3431 	if (self == peer)
3432 		return 1;
3433 
3434 	*rule_nr = 71;
3435 	self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3436 	if (self == peer) {
3437 		if (connection->agreed_pro_version < 96 ?
3438 		    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3439 		    (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3440 		    self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get through. Undo the
			   modifications of our UUIDs made at the last start of
			   resync as sync source. */
3443 
3444 			if (connection->agreed_pro_version < 91)
3445 				return -1091;
3446 
3447 			__drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3448 			__drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3449 
3450 			drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3451 			drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3452 				       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3453 
3454 			return 1;
3455 		}
3456 	}
3457 
3458 
3459 	*rule_nr = 80;
3460 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3461 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3462 		self = device->ldev->md.uuid[i] & ~((u64)1);
3463 		if (self == peer)
3464 			return 2;
3465 	}
3466 
3467 	*rule_nr = 90;
3468 	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3469 	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3470 	if (self == peer && self != ((u64)0))
3471 		return 100;
3472 
3473 	*rule_nr = 100;
3474 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3475 		self = device->ldev->md.uuid[i] & ~((u64)1);
3476 		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3477 			peer = device->p_uuid[j] & ~((u64)1);
3478 			if (self == peer)
3479 				return -100;
3480 		}
3481 	}
3482 
3483 	return -1000;
3484 }
3485 
/* drbd_sync_handshake() returns the new connection state on success, or
   C_MASK on failure.
 */
3489 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3490 					   enum drbd_role peer_role,
3491 					   enum drbd_disk_state peer_disk) __must_hold(local)
3492 {
3493 	struct drbd_device *device = peer_device->device;
3494 	enum drbd_conns rv = C_MASK;
3495 	enum drbd_disk_state mydisk;
3496 	struct net_conf *nc;
3497 	int hg, rule_nr, rr_conflict, tentative, always_asbp;
3498 
3499 	mydisk = device->state.disk;
3500 	if (mydisk == D_NEGOTIATING)
3501 		mydisk = device->new_state_tmp.disk;
3502 
3503 	drbd_info(device, "drbd_sync_handshake:\n");
3504 
3505 	spin_lock_irq(&device->ldev->md.uuid_lock);
3506 	drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3507 	drbd_uuid_dump(device, "peer", device->p_uuid,
3508 		       device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3509 
3510 	hg = drbd_uuid_compare(device, peer_role, &rule_nr);
3511 	spin_unlock_irq(&device->ldev->md.uuid_lock);
3512 
3513 	drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3514 
3515 	if (hg == -1000) {
3516 		drbd_alert(device, "Unrelated data, aborting!\n");
3517 		return C_MASK;
3518 	}
3519 	if (hg < -0x10000) {
3520 		int proto, fflags;
3521 		hg = -hg;
3522 		proto = hg & 0xff;
3523 		fflags = (hg >> 8) & 0xff;
3524 		drbd_alert(device, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n",
3525 					proto, fflags);
3526 		return C_MASK;
3527 	}
3528 	if (hg < -1000) {
3529 		drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3530 		return C_MASK;
3531 	}
3532 
3533 	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3534 	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
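		/* f: a full sync is needed, either because of an unresolved
		 * split brain (hg == -100) or because the UUID rules already
		 * asked for a full sync (abs(hg) == 2); remember this before
		 * hg is overridden by the disk states. */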
3535 		int f = (hg == -100) || abs(hg) == 2;
3536 		hg = mydisk > D_INCONSISTENT ? 1 : -1;
3537 		if (f)
3538 			hg = hg*2;
3539 		drbd_info(device, "Becoming sync %s due to disk states.\n",
3540 		     hg > 0 ? "source" : "target");
3541 	}
3542 
3543 	if (abs(hg) == 100)
3544 		drbd_khelper(device, "initial-split-brain");
3545 
3546 	rcu_read_lock();
3547 	nc = rcu_dereference(peer_device->connection->net_conf);
3548 	always_asbp = nc->always_asbp;
3549 	rr_conflict = nc->rr_conflict;
3550 	tentative = nc->tentative;
3551 	rcu_read_unlock();
3552 
3553 	if (hg == 100 || (hg == -100 && always_asbp)) {
3554 		int pcount = (device->state.role == R_PRIMARY)
3555 			   + (peer_role == R_PRIMARY);
3556 		int forced = (hg == -100);
3557 
3558 		switch (pcount) {
3559 		case 0:
3560 			hg = drbd_asb_recover_0p(peer_device);
3561 			break;
3562 		case 1:
3563 			hg = drbd_asb_recover_1p(peer_device);
3564 			break;
3565 		case 2:
3566 			hg = drbd_asb_recover_2p(peer_device);
3567 			break;
3568 		}
3569 		if (abs(hg) < 100) {
3570 			drbd_warn(device, "Split-Brain detected, %d primaries, "
3571 			     "automatically solved. Sync from %s node\n",
3572 			     pcount, (hg < 0) ? "peer" : "this");
3573 			if (forced) {
3574 				drbd_warn(device, "Doing a full sync, since"
				     " UUIDs were ambiguous.\n");
3576 				hg = hg*2;
3577 			}
3578 		}
3579 	}
3580 
3581 	if (hg == -100) {
3582 		if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3583 			hg = -1;
3584 		if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3585 			hg = 1;
3586 
3587 		if (abs(hg) < 100)
3588 			drbd_warn(device, "Split-Brain detected, manually solved. "
3589 			     "Sync from %s node\n",
3590 			     (hg < 0) ? "peer" : "this");
3591 	}
3592 
3593 	if (hg == -100) {
3594 		/* FIXME this log message is not correct if we end up here
3595 		 * after an attempted attach on a diskless node.
3596 		 * We just refuse to attach -- well, we drop the "connection"
3597 		 * to that disk, in a way... */
3598 		drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3599 		drbd_khelper(device, "split-brain");
3600 		return C_MASK;
3601 	}
3602 
3603 	if (hg > 0 && mydisk <= D_INCONSISTENT) {
3604 		drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3605 		return C_MASK;
3606 	}
3607 
3608 	if (hg < 0 && /* by intention we do not use mydisk here. */
3609 	    device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3610 		switch (rr_conflict) {
3611 		case ASB_CALL_HELPER:
3612 			drbd_khelper(device, "pri-lost");
3613 			fallthrough;
3614 		case ASB_DISCONNECT:
3615 			drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3616 			return C_MASK;
3617 		case ASB_VIOLENTLY:
			drbd_warn(device, "Becoming SyncTarget, violating the stable-data "
3619 			     "assumption\n");
3620 		}
3621 	}
3622 
3623 	if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3624 		if (hg == 0)
3625 			drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3626 		else
			drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.\n",
3628 				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3629 				 abs(hg) >= 2 ? "full" : "bit-map based");
3630 		return C_MASK;
3631 	}
3632 
3633 	if (abs(hg) >= 2) {
3634 		drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3635 		if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3636 					BM_LOCKED_SET_ALLOWED))
3637 			return C_MASK;
3638 	}
3639 
3640 	if (hg > 0) { /* become sync source. */
3641 		rv = C_WF_BITMAP_S;
3642 	} else if (hg < 0) { /* become sync target */
3643 		rv = C_WF_BITMAP_T;
3644 	} else {
3645 		rv = C_CONNECTED;
3646 		if (drbd_bm_total_weight(device)) {
3647 			drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3648 			     drbd_bm_total_weight(device));
3649 		}
3650 	}
3651 
3652 	return rv;
3653 }
3654 
3655 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3656 {
3657 	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3658 	if (peer == ASB_DISCARD_REMOTE)
3659 		return ASB_DISCARD_LOCAL;
3660 
3661 	/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3662 	if (peer == ASB_DISCARD_LOCAL)
3663 		return ASB_DISCARD_REMOTE;
3664 
3665 	/* everything else is valid if they are equal on both sides. */
3666 	return peer;
3667 }
3668 
3669 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3670 {
3671 	struct p_protocol *p = pi->data;
3672 	enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3673 	int p_proto, p_discard_my_data, p_two_primaries, cf;
3674 	struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3675 	char integrity_alg[SHARED_SECRET_MAX] = "";
3676 	struct crypto_shash *peer_integrity_tfm = NULL;
3677 	void *int_dig_in = NULL, *int_dig_vv = NULL;
3678 
3679 	p_proto		= be32_to_cpu(p->protocol);
3680 	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
3681 	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
3682 	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
3683 	p_two_primaries = be32_to_cpu(p->two_primaries);
3684 	cf		= be32_to_cpu(p->conn_flags);
3685 	p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3686 
3687 	if (connection->agreed_pro_version >= 87) {
3688 		int err;
3689 
3690 		if (pi->size > sizeof(integrity_alg))
3691 			return -EIO;
3692 		err = drbd_recv_all(connection, integrity_alg, pi->size);
3693 		if (err)
3694 			return err;
3695 		integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3696 	}
3697 
3698 	if (pi->cmd != P_PROTOCOL_UPDATE) {
3699 		clear_bit(CONN_DRY_RUN, &connection->flags);
3700 
3701 		if (cf & CF_DRY_RUN)
3702 			set_bit(CONN_DRY_RUN, &connection->flags);
3703 
3704 		rcu_read_lock();
3705 		nc = rcu_dereference(connection->net_conf);
3706 
3707 		if (p_proto != nc->wire_protocol) {
3708 			drbd_err(connection, "incompatible %s settings\n", "protocol");
3709 			goto disconnect_rcu_unlock;
3710 		}
3711 
3712 		if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3713 			drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3714 			goto disconnect_rcu_unlock;
3715 		}
3716 
3717 		if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3718 			drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3719 			goto disconnect_rcu_unlock;
3720 		}
3721 
3722 		if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3723 			drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3724 			goto disconnect_rcu_unlock;
3725 		}
3726 
3727 		if (p_discard_my_data && nc->discard_my_data) {
3728 			drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3729 			goto disconnect_rcu_unlock;
3730 		}
3731 
3732 		if (p_two_primaries != nc->two_primaries) {
3733 			drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3734 			goto disconnect_rcu_unlock;
3735 		}
3736 
3737 		if (strcmp(integrity_alg, nc->integrity_alg)) {
3738 			drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3739 			goto disconnect_rcu_unlock;
3740 		}
3741 
3742 		rcu_read_unlock();
3743 	}
3744 
3745 	if (integrity_alg[0]) {
3746 		int hash_size;
3747 
3748 		/*
3749 		 * We can only change the peer data integrity algorithm
3750 		 * here.  Changing our own data integrity algorithm
3751 		 * requires that we send a P_PROTOCOL_UPDATE packet at
3752 		 * the same time; otherwise, the peer has no way to
3753 		 * tell between which packets the algorithm should
3754 		 * change.
3755 		 */
3756 
3757 		peer_integrity_tfm = crypto_alloc_shash(integrity_alg, 0, 0);
3758 		if (IS_ERR(peer_integrity_tfm)) {
3759 			peer_integrity_tfm = NULL;
3760 			drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3761 				 integrity_alg);
3762 			goto disconnect;
3763 		}
3764 
3765 		hash_size = crypto_shash_digestsize(peer_integrity_tfm);
3766 		int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3767 		int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3768 		if (!(int_dig_in && int_dig_vv)) {
3769 			drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3770 			goto disconnect;
3771 		}
3772 	}
3773 
3774 	new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3775 	if (!new_net_conf) {
3776 		drbd_err(connection, "Allocation of new net_conf failed\n");
3777 		goto disconnect;
3778 	}
3779 
3780 	mutex_lock(&connection->data.mutex);
3781 	mutex_lock(&connection->resource->conf_update);
3782 	old_net_conf = connection->net_conf;
3783 	*new_net_conf = *old_net_conf;
3784 
3785 	new_net_conf->wire_protocol = p_proto;
3786 	new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3787 	new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3788 	new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3789 	new_net_conf->two_primaries = p_two_primaries;
3790 
3791 	rcu_assign_pointer(connection->net_conf, new_net_conf);
3792 	mutex_unlock(&connection->resource->conf_update);
3793 	mutex_unlock(&connection->data.mutex);
3794 
3795 	crypto_free_shash(connection->peer_integrity_tfm);
3796 	kfree(connection->int_dig_in);
3797 	kfree(connection->int_dig_vv);
3798 	connection->peer_integrity_tfm = peer_integrity_tfm;
3799 	connection->int_dig_in = int_dig_in;
3800 	connection->int_dig_vv = int_dig_vv;
3801 
3802 	if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3803 		drbd_info(connection, "peer data-integrity-alg: %s\n",
3804 			  integrity_alg[0] ? integrity_alg : "(none)");
3805 
3806 	synchronize_rcu();
3807 	kfree(old_net_conf);
3808 	return 0;
3809 
3810 disconnect_rcu_unlock:
3811 	rcu_read_unlock();
3812 disconnect:
3813 	crypto_free_shash(peer_integrity_tfm);
3814 	kfree(int_dig_in);
3815 	kfree(int_dig_vv);
3816 	conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3817 	return -EIO;
3818 }
3819 
3820 /* helper function
3821  * input: alg name, feature name
3822  * return: NULL (alg name was "")
3823  *         ERR_PTR(error) if something goes wrong
3824  *         or the crypto hash ptr, if it worked out ok. */
3825 static struct crypto_shash *drbd_crypto_alloc_digest_safe(
3826 		const struct drbd_device *device,
3827 		const char *alg, const char *name)
3828 {
3829 	struct crypto_shash *tfm;
3830 
3831 	if (!alg[0])
3832 		return NULL;
3833 
3834 	tfm = crypto_alloc_shash(alg, 0, 0);
3835 	if (IS_ERR(tfm)) {
3836 		drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3837 			alg, name, PTR_ERR(tfm));
3838 		return tfm;
3839 	}
3840 	return tfm;
3841 }
3842 
3843 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3844 {
3845 	void *buffer = connection->data.rbuf;
3846 	int size = pi->size;
3847 
3848 	while (size) {
3849 		int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3850 		s = drbd_recv(connection, buffer, s);
3851 		if (s <= 0) {
3852 			if (s < 0)
3853 				return s;
3854 			break;
3855 		}
3856 		size -= s;
3857 	}
3858 	if (size)
3859 		return -EIO;
3860 	return 0;
3861 }
3862 
3863 /*
3864  * config_unknown_volume  -  device configuration command for unknown volume
3865  *
3866  * When a device is added to an existing connection, the node on which the
3867  * device is added first will send configuration commands to its peer but the
3868  * peer will not know about the device yet.  It will warn and ignore these
3869  * commands.  Once the device is added on the second node, the second node will
3870  * send the same device configuration commands, but in the other direction.
3871  *
3872  * (We can also end up here if drbd is misconfigured.)
3873  */
3874 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3875 {
3876 	drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3877 		  cmdname(pi->cmd), pi->vnr);
3878 	return ignore_remaining_packet(connection, pi);
3879 }
3880 
3881 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3882 {
3883 	struct drbd_peer_device *peer_device;
3884 	struct drbd_device *device;
3885 	struct p_rs_param_95 *p;
3886 	unsigned int header_size, data_size, exp_max_sz;
3887 	struct crypto_shash *verify_tfm = NULL;
3888 	struct crypto_shash *csums_tfm = NULL;
3889 	struct net_conf *old_net_conf, *new_net_conf = NULL;
3890 	struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3891 	const int apv = connection->agreed_pro_version;
3892 	struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3893 	unsigned int fifo_size = 0;
3894 	int err;
3895 
3896 	peer_device = conn_peer_device(connection, pi->vnr);
3897 	if (!peer_device)
3898 		return config_unknown_volume(connection, pi);
3899 	device = peer_device->device;
3900 
3901 	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3902 		    : apv == 88 ? sizeof(struct p_rs_param)
3903 					+ SHARED_SECRET_MAX
3904 		    : apv <= 94 ? sizeof(struct p_rs_param_89)
3905 		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3906 
3907 	if (pi->size > exp_max_sz) {
3908 		drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3909 		    pi->size, exp_max_sz);
3910 		return -EIO;
3911 	}
3912 
3913 	if (apv <= 88) {
3914 		header_size = sizeof(struct p_rs_param);
3915 		data_size = pi->size - header_size;
3916 	} else if (apv <= 94) {
3917 		header_size = sizeof(struct p_rs_param_89);
3918 		data_size = pi->size - header_size;
3919 		D_ASSERT(device, data_size == 0);
3920 	} else {
3921 		header_size = sizeof(struct p_rs_param_95);
3922 		data_size = pi->size - header_size;
3923 		D_ASSERT(device, data_size == 0);
3924 	}
3925 
	/* initialize verify_alg and csums_alg: they are adjacent in the packet,
	 * so a single memset clears both */
3927 	p = pi->data;
3928 	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3929 
3930 	err = drbd_recv_all(peer_device->connection, p, header_size);
3931 	if (err)
3932 		return err;
3933 
3934 	mutex_lock(&connection->resource->conf_update);
3935 	old_net_conf = peer_device->connection->net_conf;
3936 	if (get_ldev(device)) {
3937 		new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3938 		if (!new_disk_conf) {
3939 			put_ldev(device);
3940 			mutex_unlock(&connection->resource->conf_update);
3941 			drbd_err(device, "Allocation of new disk_conf failed\n");
3942 			return -ENOMEM;
3943 		}
3944 
3945 		old_disk_conf = device->ldev->disk_conf;
3946 		*new_disk_conf = *old_disk_conf;
3947 
3948 		new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3949 	}
3950 
3951 	if (apv >= 88) {
3952 		if (apv == 88) {
3953 			if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3954 				drbd_err(device, "verify-alg of wrong size, "
					"peer wants %u, accepting only up to %u bytes\n",
3956 					data_size, SHARED_SECRET_MAX);
3957 				err = -EIO;
3958 				goto reconnect;
3959 			}
3960 
3961 			err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3962 			if (err)
3963 				goto reconnect;
3964 			/* we expect NUL terminated string */
3965 			/* but just in case someone tries to be evil */
3966 			D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3967 			p->verify_alg[data_size-1] = 0;
3968 
3969 		} else /* apv >= 89 */ {
3970 			/* we still expect NUL terminated strings */
3971 			/* but just in case someone tries to be evil */
3972 			D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3973 			D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3974 			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3975 			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3976 		}
3977 
3978 		if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3979 			if (device->state.conn == C_WF_REPORT_PARAMS) {
3980 				drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3981 				    old_net_conf->verify_alg, p->verify_alg);
3982 				goto disconnect;
3983 			}
3984 			verify_tfm = drbd_crypto_alloc_digest_safe(device,
3985 					p->verify_alg, "verify-alg");
3986 			if (IS_ERR(verify_tfm)) {
3987 				verify_tfm = NULL;
3988 				goto disconnect;
3989 			}
3990 		}
3991 
3992 		if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3993 			if (device->state.conn == C_WF_REPORT_PARAMS) {
3994 				drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3995 				    old_net_conf->csums_alg, p->csums_alg);
3996 				goto disconnect;
3997 			}
3998 			csums_tfm = drbd_crypto_alloc_digest_safe(device,
3999 					p->csums_alg, "csums-alg");
4000 			if (IS_ERR(csums_tfm)) {
4001 				csums_tfm = NULL;
4002 				goto disconnect;
4003 			}
4004 		}
4005 
4006 		if (apv > 94 && new_disk_conf) {
4007 			new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
4008 			new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
4009 			new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
4010 			new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
4011 
4012 			fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
4013 			if (fifo_size != device->rs_plan_s->size) {
4014 				new_plan = fifo_alloc(fifo_size);
4015 				if (!new_plan) {
4016 					drbd_err(device, "kmalloc of fifo_buffer failed");
4017 					put_ldev(device);
4018 					goto disconnect;
4019 				}
4020 			}
4021 		}
4022 
4023 		if (verify_tfm || csums_tfm) {
4024 			new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
4025 			if (!new_net_conf) {
4026 				drbd_err(device, "Allocation of new net_conf failed\n");
4027 				goto disconnect;
4028 			}
4029 
4030 			*new_net_conf = *old_net_conf;
4031 
4032 			if (verify_tfm) {
4033 				strcpy(new_net_conf->verify_alg, p->verify_alg);
4034 				new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
4035 				crypto_free_shash(peer_device->connection->verify_tfm);
4036 				peer_device->connection->verify_tfm = verify_tfm;
4037 				drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
4038 			}
4039 			if (csums_tfm) {
4040 				strcpy(new_net_conf->csums_alg, p->csums_alg);
4041 				new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
4042 				crypto_free_shash(peer_device->connection->csums_tfm);
4043 				peer_device->connection->csums_tfm = csums_tfm;
4044 				drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
4045 			}
4046 			rcu_assign_pointer(connection->net_conf, new_net_conf);
4047 		}
4048 	}
4049 
4050 	if (new_disk_conf) {
4051 		rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
4052 		put_ldev(device);
4053 	}
4054 
4055 	if (new_plan) {
4056 		old_plan = device->rs_plan_s;
4057 		rcu_assign_pointer(device->rs_plan_s, new_plan);
4058 	}
4059 
4060 	mutex_unlock(&connection->resource->conf_update);
4061 	synchronize_rcu();
4062 	if (new_net_conf)
4063 		kfree(old_net_conf);
4064 	kfree(old_disk_conf);
4065 	kfree(old_plan);
4066 
4067 	return 0;
4068 
4069 reconnect:
4070 	if (new_disk_conf) {
4071 		put_ldev(device);
4072 		kfree(new_disk_conf);
4073 	}
4074 	mutex_unlock(&connection->resource->conf_update);
4075 	return -EIO;
4076 
4077 disconnect:
4078 	kfree(new_plan);
4079 	if (new_disk_conf) {
4080 		put_ldev(device);
4081 		kfree(new_disk_conf);
4082 	}
4083 	mutex_unlock(&connection->resource->conf_update);
4084 	/* just for completeness: actually not needed,
4085 	 * as this is not reached if csums_tfm was ok. */
4086 	crypto_free_shash(csums_tfm);
4087 	/* but free the verify_tfm again, if csums_tfm did not work out */
4088 	crypto_free_shash(verify_tfm);
4089 	conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4090 	return -EIO;
4091 }
4092 
4093 /* warn if the arguments differ by more than 12.5% */
4094 static void warn_if_differ_considerably(struct drbd_device *device,
4095 	const char *s, sector_t a, sector_t b)
4096 {
4097 	sector_t d;
4098 	if (a == 0 || b == 0)
4099 		return;
4100 	d = (a > b) ? (a - b) : (b - a);
4101 	if (d > (a>>3) || d > (b>>3))
4102 		drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
4103 		     (unsigned long long)a, (unsigned long long)b);
4104 }
4105 
4106 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
4107 {
4108 	struct drbd_peer_device *peer_device;
4109 	struct drbd_device *device;
4110 	struct p_sizes *p = pi->data;
4111 	struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ? p->qlim : NULL;
4112 	enum determine_dev_size dd = DS_UNCHANGED;
4113 	sector_t p_size, p_usize, p_csize, my_usize;
4114 	sector_t new_size, cur_size;
4115 	int ldsc = 0; /* local disk size changed */
4116 	enum dds_flags ddsf;
4117 
4118 	peer_device = conn_peer_device(connection, pi->vnr);
4119 	if (!peer_device)
4120 		return config_unknown_volume(connection, pi);
4121 	device = peer_device->device;
4122 	cur_size = get_capacity(device->vdisk);
4123 
4124 	p_size = be64_to_cpu(p->d_size);
4125 	p_usize = be64_to_cpu(p->u_size);
4126 	p_csize = be64_to_cpu(p->c_size);
4127 
4128 	/* just store the peer's disk size for now.
4129 	 * we still need to figure out whether we accept that. */
4130 	device->p_size = p_size;
4131 
4132 	if (get_ldev(device)) {
4133 		rcu_read_lock();
4134 		my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
4135 		rcu_read_unlock();
4136 
4137 		warn_if_differ_considerably(device, "lower level device sizes",
4138 			   p_size, drbd_get_max_capacity(device->ldev));
4139 		warn_if_differ_considerably(device, "user requested size",
4140 					    p_usize, my_usize);
4141 
4142 		/* if this is the first connect, or an otherwise expected
4143 		 * param exchange, choose the minimum */
4144 		if (device->state.conn == C_WF_REPORT_PARAMS)
4145 			p_usize = min_not_zero(my_usize, p_usize);
4146 
4147 		/* Never shrink a device with usable data during connect,
4148 		 * or "attach" on the peer.
4149 		 * But allow online shrinking if we are connected. */
4150 		new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0);
4151 		if (new_size < cur_size &&
4152 		    device->state.disk >= D_OUTDATED &&
4153 		    (device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS)) {
4154 			drbd_err(device, "The peer's disk size is too small! (%llu < %llu sectors)\n",
4155 					(unsigned long long)new_size, (unsigned long long)cur_size);
4156 			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4157 			put_ldev(device);
4158 			return -EIO;
4159 		}
4160 
4161 		if (my_usize != p_usize) {
4162 			struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
4163 
4164 			new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
4165 			if (!new_disk_conf) {
4166 				drbd_err(device, "Allocation of new disk_conf failed\n");
4167 				put_ldev(device);
4168 				return -ENOMEM;
4169 			}
4170 
4171 			mutex_lock(&connection->resource->conf_update);
4172 			old_disk_conf = device->ldev->disk_conf;
4173 			*new_disk_conf = *old_disk_conf;
4174 			new_disk_conf->disk_size = p_usize;
4175 
4176 			rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
4177 			mutex_unlock(&connection->resource->conf_update);
4178 			synchronize_rcu();
4179 			kfree(old_disk_conf);
4180 
4181 			drbd_info(device, "Peer sets u_size to %lu sectors (old: %lu)\n",
4182 				 (unsigned long)p_usize, (unsigned long)my_usize);
4183 		}
4184 
4185 		put_ldev(device);
4186 	}
4187 
4188 	device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
4189 	/* Leave drbd_reconsider_queue_parameters() before drbd_determine_dev_size().
4190 	   In case we cleared the QUEUE_FLAG_DISCARD from our queue in
4191 	   drbd_reconsider_queue_parameters(), we can be sure that after
4192 	   drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
4193 
4194 	ddsf = be16_to_cpu(p->dds_flags);
4195 	if (get_ldev(device)) {
4196 		drbd_reconsider_queue_parameters(device, device->ldev, o);
4197 		dd = drbd_determine_dev_size(device, ddsf, NULL);
4198 		put_ldev(device);
4199 		if (dd == DS_ERROR)
4200 			return -EIO;
4201 		drbd_md_sync(device);
4202 	} else {
4203 		/*
4204 		 * I am diskless, need to accept the peer's *current* size.
		 * I must NOT accept the peer's backing disk size,
4206 		 * it may have been larger than mine all along...
4207 		 *
4208 		 * At this point, the peer knows more about my disk, or at
4209 		 * least about what we last agreed upon, than myself.
4210 		 * So if his c_size is less than his d_size, the most likely
4211 		 * reason is that *my* d_size was smaller last time we checked.
4212 		 *
4213 		 * However, if he sends a zero current size,
4214 		 * take his (user-capped or) backing disk size anyways.
4215 		 *
4216 		 * Unless of course he does not have a disk himself.
4217 		 * In which case we ignore this completely.
4218 		 */
4219 		sector_t new_size = p_csize ?: p_usize ?: p_size;
4220 		drbd_reconsider_queue_parameters(device, NULL, o);
4221 		if (new_size == 0) {
			/* Ignore, the peer knows nothing. */
4223 		} else if (new_size == cur_size) {
4224 			/* nothing to do */
4225 		} else if (cur_size != 0 && p_size == 0) {
4226 			drbd_warn(device, "Ignored diskless peer device size (peer:%llu != me:%llu sectors)!\n",
4227 					(unsigned long long)new_size, (unsigned long long)cur_size);
4228 		} else if (new_size < cur_size && device->state.role == R_PRIMARY) {
4229 			drbd_err(device, "The peer's device size is too small! (%llu < %llu sectors); demote me first!\n",
4230 					(unsigned long long)new_size, (unsigned long long)cur_size);
4231 			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4232 			return -EIO;
4233 		} else {
4234 			/* I believe the peer, if
4235 			 *  - I don't have a current size myself
4236 			 *  - we agree on the size anyways
4237 			 *  - I do have a current size, am Secondary,
4238 			 *    and he has the only disk
4239 			 *  - I do have a current size, am Primary,
4240 			 *    and he has the only disk,
4241 			 *    which is larger than my current size
4242 			 */
4243 			drbd_set_my_capacity(device, new_size);
4244 		}
4245 	}
4246 
4247 	if (get_ldev(device)) {
4248 		if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
4249 			device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
4250 			ldsc = 1;
4251 		}
4252 
4253 		put_ldev(device);
4254 	}
4255 
4256 	if (device->state.conn > C_WF_REPORT_PARAMS) {
4257 		if (be64_to_cpu(p->c_size) != get_capacity(device->vdisk) ||
4258 		    ldsc) {
4259 			/* we have different sizes, probably peer
4260 			 * needs to know my new size... */
4261 			drbd_send_sizes(peer_device, 0, ddsf);
4262 		}
4263 		if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
4264 		    (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
4265 			if (device->state.pdsk >= D_INCONSISTENT &&
4266 			    device->state.disk >= D_INCONSISTENT) {
4267 				if (ddsf & DDSF_NO_RESYNC)
4268 					drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
4269 				else
4270 					resync_after_online_grow(device);
4271 			} else
4272 				set_bit(RESYNC_AFTER_NEG, &device->flags);
4273 		}
4274 	}
4275 
4276 	return 0;
4277 }
4278 
4279 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
4280 {
4281 	struct drbd_peer_device *peer_device;
4282 	struct drbd_device *device;
4283 	struct p_uuids *p = pi->data;
4284 	u64 *p_uuid;
4285 	int i, updated_uuids = 0;
4286 
4287 	peer_device = conn_peer_device(connection, pi->vnr);
4288 	if (!peer_device)
4289 		return config_unknown_volume(connection, pi);
4290 	device = peer_device->device;
4291 
4292 	p_uuid = kmalloc_array(UI_EXTENDED_SIZE, sizeof(*p_uuid), GFP_NOIO);
4293 	if (!p_uuid) {
4294 		drbd_err(device, "kmalloc of p_uuid failed\n");
4295 		return false;
4296 	}
4297 
4298 	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
4299 		p_uuid[i] = be64_to_cpu(p->uuid[i]);
4300 
4301 	kfree(device->p_uuid);
4302 	device->p_uuid = p_uuid;
4303 
4304 	if ((device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS) &&
4305 	    device->state.disk < D_INCONSISTENT &&
4306 	    device->state.role == R_PRIMARY &&
4307 	    (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
4308 		drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
4309 		    (unsigned long long)device->ed_uuid);
4310 		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4311 		return -EIO;
4312 	}
4313 
4314 	if (get_ldev(device)) {
4315 		int skip_initial_sync =
4316 			device->state.conn == C_CONNECTED &&
4317 			peer_device->connection->agreed_pro_version >= 90 &&
4318 			device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
4319 			(p_uuid[UI_FLAGS] & 8);
4320 		if (skip_initial_sync) {
4321 			drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
4322 			drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
4323 					"clear_n_write from receive_uuids",
4324 					BM_LOCKED_TEST_ALLOWED);
4325 			_drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
4326 			_drbd_uuid_set(device, UI_BITMAP, 0);
4327 			_drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
4328 					CS_VERBOSE, NULL);
4329 			drbd_md_sync(device);
4330 			updated_uuids = 1;
4331 		}
4332 		put_ldev(device);
4333 	} else if (device->state.disk < D_INCONSISTENT &&
4334 		   device->state.role == R_PRIMARY) {
4335 		/* I am a diskless primary, the peer just created a new current UUID
4336 		   for me. */
4337 		updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4338 	}
4339 
	/* Before we test the disk state, wait until a possibly ongoing
	   cluster-wide state change has finished. That is important if we are
	   primary and detaching from our disk: we need to see the new disk
	   state... */
4344 	mutex_lock(device->state_mutex);
4345 	mutex_unlock(device->state_mutex);
4346 	if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
4347 		updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4348 
4349 	if (updated_uuids)
4350 		drbd_print_uuids(device, "receiver updated UUIDs to");
4351 
4352 	return 0;
4353 }
4354 
4355 /**
4356  * convert_state() - Converts the peer's view of the cluster state to our point of view
4357  * @ps:		The state as seen by the peer.
4358  */
4359 static union drbd_state convert_state(union drbd_state ps)
4360 {
4361 	union drbd_state ms;
4362 
4363 	static enum drbd_conns c_tab[] = {
4364 		[C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
4365 		[C_CONNECTED] = C_CONNECTED,
4366 
4367 		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
4368 		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
4369 		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
4370 		[C_VERIFY_S]       = C_VERIFY_T,
4371 		[C_MASK]   = C_MASK,
4372 	};
4373 
4374 	ms.i = ps.i;
4375 
4376 	ms.conn = c_tab[ps.conn];
4377 	ms.peer = ps.role;
4378 	ms.role = ps.peer;
4379 	ms.pdsk = ps.disk;
4380 	ms.disk = ps.pdsk;
4381 	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
4382 
4383 	return ms;
4384 }
4385 
4386 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4387 {
4388 	struct drbd_peer_device *peer_device;
4389 	struct drbd_device *device;
4390 	struct p_req_state *p = pi->data;
4391 	union drbd_state mask, val;
4392 	enum drbd_state_rv rv;
4393 
4394 	peer_device = conn_peer_device(connection, pi->vnr);
4395 	if (!peer_device)
4396 		return -EIO;
4397 	device = peer_device->device;
4398 
4399 	mask.i = be32_to_cpu(p->mask);
4400 	val.i = be32_to_cpu(p->val);
4401 
4402 	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4403 	    mutex_is_locked(device->state_mutex)) {
4404 		drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4405 		return 0;
4406 	}
4407 
4408 	mask = convert_state(mask);
4409 	val = convert_state(val);
4410 
4411 	rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4412 	drbd_send_sr_reply(peer_device, rv);
4413 
4414 	drbd_md_sync(device);
4415 
4416 	return 0;
4417 }
4418 
4419 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4420 {
4421 	struct p_req_state *p = pi->data;
4422 	union drbd_state mask, val;
4423 	enum drbd_state_rv rv;
4424 
4425 	mask.i = be32_to_cpu(p->mask);
4426 	val.i = be32_to_cpu(p->val);
4427 
4428 	if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4429 	    mutex_is_locked(&connection->cstate_mutex)) {
4430 		conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4431 		return 0;
4432 	}
4433 
4434 	mask = convert_state(mask);
4435 	val = convert_state(val);
4436 
4437 	rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4438 	conn_send_sr_reply(connection, rv);
4439 
4440 	return 0;
4441 }
4442 
4443 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4444 {
4445 	struct drbd_peer_device *peer_device;
4446 	struct drbd_device *device;
4447 	struct p_state *p = pi->data;
4448 	union drbd_state os, ns, peer_state;
4449 	enum drbd_disk_state real_peer_disk;
4450 	enum chg_state_flags cs_flags;
4451 	int rv;
4452 
4453 	peer_device = conn_peer_device(connection, pi->vnr);
4454 	if (!peer_device)
4455 		return config_unknown_volume(connection, pi);
4456 	device = peer_device->device;
4457 
4458 	peer_state.i = be32_to_cpu(p->state);
4459 
4460 	real_peer_disk = peer_state.disk;
4461 	if (peer_state.disk == D_NEGOTIATING) {
4462 		real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4463 		drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4464 	}
4465 
4466 	spin_lock_irq(&device->resource->req_lock);
4467  retry:
4468 	os = ns = drbd_read_state(device);
4469 	spin_unlock_irq(&device->resource->req_lock);
4470 
4471 	/* If some other part of the code (ack_receiver thread, timeout)
4472 	 * already decided to close the connection again,
4473 	 * we must not "re-establish" it here. */
4474 	if (os.conn <= C_TEAR_DOWN)
4475 		return -ECONNRESET;
4476 
4477 	/* If this is the "end of sync" confirmation, usually the peer disk
4478 	 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4479 	 * set) resync started in PausedSyncT, or if the timing of pause-/
4480 	 * unpause-sync events has been "just right", the peer disk may
4481 	 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4482 	 */
4483 	if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4484 	    real_peer_disk == D_UP_TO_DATE &&
4485 	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4486 		/* If we are (becoming) SyncSource, but peer is still in sync
4487 		 * preparation, ignore its uptodate-ness to avoid flapping, it
4488 		 * will change to inconsistent once the peer reaches active
4489 		 * syncing states.
4490 		 * It may have changed syncer-paused flags, however, so we
4491 		 * cannot ignore this completely. */
4492 		if (peer_state.conn > C_CONNECTED &&
4493 		    peer_state.conn < C_SYNC_SOURCE)
4494 			real_peer_disk = D_INCONSISTENT;
4495 
4496 		/* if peer_state changes to connected at the same time,
4497 		 * it explicitly notifies us that it finished resync.
4498 		 * Maybe we should finish it up, too? */
4499 		else if (os.conn >= C_SYNC_SOURCE &&
4500 			 peer_state.conn == C_CONNECTED) {
4501 			if (drbd_bm_total_weight(device) <= device->rs_failed)
4502 				drbd_resync_finished(device);
4503 			return 0;
4504 		}
4505 	}
4506 
4507 	/* explicit verify finished notification, stop sector reached. */
4508 	if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4509 	    peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4510 		ov_out_of_sync_print(device);
4511 		drbd_resync_finished(device);
4512 		return 0;
4513 	}
4514 
4515 	/* peer says his disk is inconsistent, while we think it is uptodate,
4516 	 * and this happens while the peer still thinks we have a sync going on,
4517 	 * but we think we are already done with the sync.
4518 	 * We ignore this to avoid flapping pdsk.
4519 	 * This should not happen, if the peer is a recent version of drbd. */
4520 	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4521 	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4522 		real_peer_disk = D_UP_TO_DATE;
4523 
4524 	if (ns.conn == C_WF_REPORT_PARAMS)
4525 		ns.conn = C_CONNECTED;
4526 
4527 	if (peer_state.conn == C_AHEAD)
4528 		ns.conn = C_BEHIND;
4529 
4530 	/* TODO:
4531 	 * if (primary and diskless and peer uuid != effective uuid)
4532 	 *     abort attach on peer;
4533 	 *
4534 	 * If this node does not have good data, was already connected, but
4535 	 * the peer did a late attach only now, trying to "negotiate" with me,
4536 	 * AND I am currently Primary, possibly frozen, with some specific
4537 	 * "effective" uuid, this should never be reached, really, because
4538 	 * we first send the uuids, then the current state.
4539 	 *
4540 	 * In this scenario, we already dropped the connection hard
	 * when we received the unsuitable uuids (receive_uuids()).
4542 	 *
4543 	 * Should we want to change this, that is: not drop the connection in
4544 	 * receive_uuids() already, then we would need to add a branch here
4545 	 * that aborts the attach of "unsuitable uuids" on the peer in case
4546 	 * this node is currently Diskless Primary.
4547 	 */
4548 
4549 	if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4550 	    get_ldev_if_state(device, D_NEGOTIATING)) {
4551 		int cr; /* consider resync */
4552 
4553 		/* if we established a new connection */
4554 		cr  = (os.conn < C_CONNECTED);
4555 		/* if we had an established connection
4556 		 * and one of the nodes newly attaches a disk */
4557 		cr |= (os.conn == C_CONNECTED &&
4558 		       (peer_state.disk == D_NEGOTIATING ||
4559 			os.disk == D_NEGOTIATING));
4560 		/* if we have both been inconsistent, and the peer has been
4561 		 * forced to be UpToDate with --force */
4562 		cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4563 		/* if we had been plain connected, and the admin requested to
4564 		 * start a sync by "invalidate" or "invalidate-remote" */
4565 		cr |= (os.conn == C_CONNECTED &&
4566 				(peer_state.conn >= C_STARTING_SYNC_S &&
4567 				 peer_state.conn <= C_WF_BITMAP_T));
4568 
4569 		if (cr)
4570 			ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4571 
4572 		put_ldev(device);
4573 		if (ns.conn == C_MASK) {
4574 			ns.conn = C_CONNECTED;
4575 			if (device->state.disk == D_NEGOTIATING) {
4576 				drbd_force_state(device, NS(disk, D_FAILED));
4577 			} else if (peer_state.disk == D_NEGOTIATING) {
4578 				drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4579 				peer_state.disk = D_DISKLESS;
4580 				real_peer_disk = D_DISKLESS;
4581 			} else {
4582 				if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4583 					return -EIO;
4584 				D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4585 				conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4586 				return -EIO;
4587 			}
4588 		}
4589 	}
4590 
4591 	spin_lock_irq(&device->resource->req_lock);
4592 	if (os.i != drbd_read_state(device).i)
4593 		goto retry;
4594 	clear_bit(CONSIDER_RESYNC, &device->flags);
4595 	ns.peer = peer_state.role;
4596 	ns.pdsk = real_peer_disk;
4597 	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4598 	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4599 		ns.disk = device->new_state_tmp.disk;
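	/* only the state change that newly establishes the connection goes
	 * through the regular checks; everything else is forced (CS_HARD) */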
4600 	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4601 	if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4602 	    test_bit(NEW_CUR_UUID, &device->flags)) {
4603 		/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
		   for temporary network outages! */
4605 		spin_unlock_irq(&device->resource->req_lock);
4606 		drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4607 		tl_clear(peer_device->connection);
4608 		drbd_uuid_new_current(device);
4609 		clear_bit(NEW_CUR_UUID, &device->flags);
4610 		conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4611 		return -EIO;
4612 	}
4613 	rv = _drbd_set_state(device, ns, cs_flags, NULL);
4614 	ns = drbd_read_state(device);
4615 	spin_unlock_irq(&device->resource->req_lock);
4616 
4617 	if (rv < SS_SUCCESS) {
4618 		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4619 		return -EIO;
4620 	}
4621 
4622 	if (os.conn > C_WF_REPORT_PARAMS) {
4623 		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4624 		    peer_state.disk != D_NEGOTIATING ) {
4625 			/* we want resync, peer has not yet decided to sync... */
4626 			/* Nowadays only used when forcing a node into primary role and
4627 			   setting its disk to UpToDate with that */
4628 			drbd_send_uuids(peer_device);
4629 			drbd_send_current_state(peer_device);
4630 		}
4631 	}
4632 
4633 	clear_bit(DISCARD_MY_DATA, &device->flags);
4634 
4635 	drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4636 
4637 	return 0;
4638 }
4639 
4640 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4641 {
4642 	struct drbd_peer_device *peer_device;
4643 	struct drbd_device *device;
4644 	struct p_rs_uuid *p = pi->data;
4645 
4646 	peer_device = conn_peer_device(connection, pi->vnr);
4647 	if (!peer_device)
4648 		return -EIO;
4649 	device = peer_device->device;
4650 
4651 	wait_event(device->misc_wait,
4652 		   device->state.conn == C_WF_SYNC_UUID ||
4653 		   device->state.conn == C_BEHIND ||
4654 		   device->state.conn < C_CONNECTED ||
4655 		   device->state.disk < D_NEGOTIATING);
4656 
4657 	/* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4658 
4659 	/* Here the _drbd_uuid_ functions are right, current should
4660 	   _not_ be rotated into the history */
4661 	if (get_ldev_if_state(device, D_NEGOTIATING)) {
4662 		_drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4663 		_drbd_uuid_set(device, UI_BITMAP, 0UL);
4664 
4665 		drbd_print_uuids(device, "updated sync uuid");
4666 		drbd_start_resync(device, C_SYNC_TARGET);
4667 
4668 		put_ldev(device);
4669 	} else
4670 		drbd_err(device, "Ignoring SyncUUID packet!\n");
4671 
4672 	return 0;
4673 }
4674 
4675 /**
4676  * receive_bitmap_plain
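 * @peer_device:	DRBD peer device
 * @size:		on-the-wire payload size of this packet, in bytes
 * @p:			receive buffer for the plain bitmap words
 * @c:			bitmap transfer context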
4677  *
4678  * Return 0 when done, 1 when another iteration is needed, and a negative error
4679  * code upon failure.
4680  */
4681 static int
4682 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4683 		     unsigned long *p, struct bm_xfer_ctx *c)
4684 {
4685 	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4686 				 drbd_header_size(peer_device->connection);
4687 	unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4688 				       c->bm_words - c->word_offset);
4689 	unsigned int want = num_words * sizeof(*p);
4690 	int err;
4691 
4692 	if (want != size) {
4693 		drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4694 		return -EIO;
4695 	}
4696 	if (want == 0)
4697 		return 0;
4698 	err = drbd_recv_all(peer_device->connection, p, want);
4699 	if (err)
4700 		return err;
4701 
4702 	drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4703 
4704 	c->word_offset += num_words;
4705 	c->bit_offset = c->word_offset * BITS_PER_LONG;
4706 	if (c->bit_offset > c->bm_bits)
4707 		c->bit_offset = c->bm_bits;
4708 
4709 	return 1;
4710 }
4711 
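/* The "encoding" byte of a compressed bitmap packet packs three fields:
 * bits 0-3: the bitmap encoding (enum drbd_bitmap_code),
 * bits 4-6: number of padding bits in the last byte,
 * bit 7:    initial value of the run-length toggle.
 */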
4712 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4713 {
4714 	return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4715 }
4716 
4717 static int dcbp_get_start(struct p_compressed_bm *p)
4718 {
4719 	return (p->encoding & 0x80) != 0;
4720 }
4721 
4722 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4723 {
4724 	return (p->encoding >> 4) & 0x7;
4725 }
4726 
4727 /**
4728  * recv_bm_rle_bits
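 * @peer_device:	DRBD peer device
 * @p:			compressed bitmap packet, already received in full
 * @c:			bitmap transfer context
 * @len:		length of the RLE/VLI coded data in @p->code, in bytes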
4729  *
4730  * Return 0 when done, 1 when another iteration is needed, and a negative error
4731  * code upon failure.
4732  */
4733 static int
4734 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4735 		struct p_compressed_bm *p,
4736 		 struct bm_xfer_ctx *c,
4737 		 unsigned int len)
4738 {
4739 	struct bitstream bs;
4740 	u64 look_ahead;
4741 	u64 rl;
4742 	u64 tmp;
4743 	unsigned long s = c->bit_offset;
4744 	unsigned long e;
4745 	int toggle = dcbp_get_start(p);
4746 	int have;
4747 	int bits;
4748 
4749 	bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4750 
4751 	bits = bitstream_get_bits(&bs, &look_ahead, 64);
4752 	if (bits < 0)
4753 		return -EIO;
4754 
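	/* run lengths alternate between runs of clear and set bits; only the
	 * "set" runs (toggle != 0) are applied to the bitmap */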
4755 	for (have = bits; have > 0; s += rl, toggle = !toggle) {
4756 		bits = vli_decode_bits(&rl, look_ahead);
4757 		if (bits <= 0)
4758 			return -EIO;
4759 
4760 		if (toggle) {
4761 			e = s + rl -1;
4762 			if (e >= c->bm_bits) {
4763 				drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4764 				return -EIO;
4765 			}
4766 			_drbd_bm_set_bits(peer_device->device, s, e);
4767 		}
4768 
4769 		if (have < bits) {
4770 			drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4771 				have, bits, look_ahead,
4772 				(unsigned int)(bs.cur.b - p->code),
4773 				(unsigned int)bs.buf_len);
4774 			return -EIO;
4775 		}
4776 		/* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4777 		if (likely(bits < 64))
4778 			look_ahead >>= bits;
4779 		else
4780 			look_ahead = 0;
4781 		have -= bits;
4782 
4783 		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4784 		if (bits < 0)
4785 			return -EIO;
4786 		look_ahead |= tmp << have;
4787 		have += bits;
4788 	}
4789 
4790 	c->bit_offset = s;
4791 	bm_xfer_ctx_bit_to_word_offset(c);
4792 
4793 	return (s != c->bm_bits);
4794 }
4795 
4796 /**
4797  * decode_bitmap_c
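 * @peer_device:	DRBD peer device
 * @p:			compressed bitmap packet
 * @c:			bitmap transfer context
 * @len:		total payload length, including struct p_compressed_bm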
4798  *
4799  * Return 0 when done, 1 when another iteration is needed, and a negative error
4800  * code upon failure.
4801  */
4802 static int
4803 decode_bitmap_c(struct drbd_peer_device *peer_device,
4804 		struct p_compressed_bm *p,
4805 		struct bm_xfer_ctx *c,
4806 		unsigned int len)
4807 {
4808 	if (dcbp_get_code(p) == RLE_VLI_Bits)
4809 		return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4810 
4811 	/* other variants had been implemented for evaluation,
4812 	 * but have been dropped as this one turned out to be "best"
4813 	 * during all our tests. */
4814 
4815 	drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4816 	conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4817 	return -EIO;
4818 }
4819 
4820 void INFO_bm_xfer_stats(struct drbd_device *device,
4821 		const char *direction, struct bm_xfer_ctx *c)
4822 {
4823 	/* what would it take to transfer it "plaintext" */
4824 	unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4825 	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4826 	unsigned int plain =
4827 		header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4828 		c->bm_words * sizeof(unsigned long);
4829 	unsigned int total = c->bytes[0] + c->bytes[1];
4830 	unsigned int r;
4831 
4832 	/* total can not be zero. but just in case: */
4833 	if (total == 0)
4834 		return;
4835 
4836 	/* don't report if not compressed */
4837 	if (total >= plain)
4838 		return;
4839 
4840 	/* total < plain. check for overflow, still */
4841 	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4842 		                    : (1000 * total / plain);
4843 
4844 	if (r > 1000)
4845 		r = 1000;
4846 
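	/* turn "per mille transferred" into "per mille saved", which is what
	 * we report as compression */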
4847 	r = 1000 - r;
4848 	drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4849 	     "total %u; compression: %u.%u%%\n",
4850 			direction,
4851 			c->bytes[1], c->packets[1],
4852 			c->bytes[0], c->packets[0],
4853 			total, r/10, r % 10);
4854 }
4855 
/* Since we are processing the bitfield from lower addresses to higher,
   it does not matter whether we process it in 32 bit or 64 bit chunks,
   as long as it is little endian. (Understand it as a byte stream,
   beginning with the lowest byte...) If we used big endian,
   we would need to process it from the highest address to the lowest
   in order to be agnostic to the 32 vs 64 bit issue.

   Returns 0 on success, a negative error code otherwise. */
4864 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4865 {
4866 	struct drbd_peer_device *peer_device;
4867 	struct drbd_device *device;
4868 	struct bm_xfer_ctx c;
4869 	int err;
4870 
4871 	peer_device = conn_peer_device(connection, pi->vnr);
4872 	if (!peer_device)
4873 		return -EIO;
4874 	device = peer_device->device;
4875 
4876 	drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4877 	/* you are supposed to send additional out-of-sync information
4878 	 * if you actually set bits during this phase */
4879 
4880 	c = (struct bm_xfer_ctx) {
4881 		.bm_bits = drbd_bm_bits(device),
4882 		.bm_words = drbd_bm_words(device),
4883 	};
4884 
	for (;;) {
4886 		if (pi->cmd == P_BITMAP)
4887 			err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4888 		else if (pi->cmd == P_COMPRESSED_BITMAP) {
4889 			/* MAYBE: sanity check that we speak proto >= 90,
4890 			 * and the feature is enabled! */
4891 			struct p_compressed_bm *p = pi->data;
4892 
4893 			if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4894 				drbd_err(device, "ReportCBitmap packet too large\n");
4895 				err = -EIO;
4896 				goto out;
4897 			}
4898 			if (pi->size <= sizeof(*p)) {
4899 				drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4900 				err = -EIO;
4901 				goto out;
4902 			}
4903 			err = drbd_recv_all(peer_device->connection, p, pi->size);
4904 			if (err)
				goto out;
4906 			err = decode_bitmap_c(peer_device, p, &c, pi->size);
4907 		} else {
4908 			drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4909 			err = -EIO;
4910 			goto out;
4911 		}
4912 
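		/* index 1 counts plain bitmap packets, index 0 compressed
		 * (RLE) ones; see INFO_bm_xfer_stats() */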
4913 		c.packets[pi->cmd == P_BITMAP]++;
4914 		c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4915 
4916 		if (err <= 0) {
4917 			if (err < 0)
4918 				goto out;
4919 			break;
4920 		}
4921 		err = drbd_recv_header(peer_device->connection, pi);
4922 		if (err)
4923 			goto out;
4924 	}
4925 
4926 	INFO_bm_xfer_stats(device, "receive", &c);
4927 
4928 	if (device->state.conn == C_WF_BITMAP_T) {
4929 		enum drbd_state_rv rv;
4930 
4931 		err = drbd_send_bitmap(device);
4932 		if (err)
4933 			goto out;
4934 		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4935 		rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4936 		D_ASSERT(device, rv == SS_SUCCESS);
4937 	} else if (device->state.conn != C_WF_BITMAP_S) {
4938 		/* admin may have requested C_DISCONNECTING,
4939 		 * other threads may have noticed network errors */
4940 		drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4941 		    drbd_conn_str(device->state.conn));
4942 	}
4943 	err = 0;
4944 
4945  out:
4946 	drbd_bm_unlock(device);
4947 	if (!err && device->state.conn == C_WF_BITMAP_S)
4948 		drbd_start_resync(device, C_SYNC_SOURCE);
4949 	return err;
4950 }
4951 
4952 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4953 {
4954 	drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4955 		 pi->cmd, pi->size);
4956 
4957 	return ignore_remaining_packet(connection, pi);
4958 }
4959 
4960 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4961 {
4962 	/* Make sure we've acked all the TCP data associated
4963 	 * with the data requests being unplugged */
4964 	tcp_sock_set_quickack(connection->data.socket->sk, 2);
4965 	return 0;
4966 }
4967 
4968 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4969 {
4970 	struct drbd_peer_device *peer_device;
4971 	struct drbd_device *device;
4972 	struct p_block_desc *p = pi->data;
4973 
4974 	peer_device = conn_peer_device(connection, pi->vnr);
4975 	if (!peer_device)
4976 		return -EIO;
4977 	device = peer_device->device;
4978 
4979 	switch (device->state.conn) {
4980 	case C_WF_SYNC_UUID:
4981 	case C_WF_BITMAP_T:
4982 	case C_BEHIND:
4983 		break;
4984 	default:
4985 		drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4986 				drbd_conn_str(device->state.conn));
4987 	}
4988 
4989 	drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4990 
4991 	return 0;
4992 }
4993 
4994 static int receive_rs_deallocated(struct drbd_connection *connection, struct packet_info *pi)
4995 {
4996 	struct drbd_peer_device *peer_device;
4997 	struct p_block_desc *p = pi->data;
4998 	struct drbd_device *device;
4999 	sector_t sector;
5000 	int size, err = 0;
5001 
5002 	peer_device = conn_peer_device(connection, pi->vnr);
5003 	if (!peer_device)
5004 		return -EIO;
5005 	device = peer_device->device;
5006 
5007 	sector = be64_to_cpu(p->sector);
5008 	size = be32_to_cpu(p->blksize);
5009 
5010 	dec_rs_pending(device);
5011 
5012 	if (get_ldev(device)) {
5013 		struct drbd_peer_request *peer_req;
5014 		const int op = REQ_OP_WRITE_ZEROES;
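		/* The resync source reported this range as deallocated (thin
		 * provisioning); rather than receiving data, replicate that state
		 * locally with a zero-out request, flagged EE_TRIM so the backend
		 * may discard instead of writing zeroes. */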
5015 
5016 		peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector,
5017 					       size, 0, GFP_NOIO);
5018 		if (!peer_req) {
5019 			put_ldev(device);
5020 			return -ENOMEM;
5021 		}
5022 
5023 		peer_req->w.cb = e_end_resync_block;
5024 		peer_req->submit_jif = jiffies;
5025 		peer_req->flags |= EE_TRIM;
5026 
5027 		spin_lock_irq(&device->resource->req_lock);
5028 		list_add_tail(&peer_req->w.list, &device->sync_ee);
5029 		spin_unlock_irq(&device->resource->req_lock);
5030 
5031 		atomic_add(pi->size >> 9, &device->rs_sect_ev);
5032 		err = drbd_submit_peer_request(device, peer_req, op, 0, DRBD_FAULT_RS_WR);
5033 
5034 		if (err) {
5035 			spin_lock_irq(&device->resource->req_lock);
5036 			list_del(&peer_req->w.list);
5037 			spin_unlock_irq(&device->resource->req_lock);
5038 
5039 			drbd_free_peer_req(device, peer_req);
5040 			put_ldev(device);
5041 			err = 0;
5042 			goto fail;
5043 		}
5044 
5045 		inc_unacked(device);
5046 
5047 		/* No put_ldev() here. Gets called in drbd_endio_write_sec_final(),
5048 		   as well as drbd_rs_complete_io() */
5049 	} else {
5050 	fail:
5051 		drbd_rs_complete_io(device, sector);
5052 		drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER);
5053 	}
5054 
5055 	atomic_add(size >> 9, &device->rs_sect_in);
5056 
5057 	return err;
5058 }
5059 
5060 struct data_cmd {
5061 	int expect_payload;
5062 	unsigned int pkt_size;
5063 	int (*fn)(struct drbd_connection *, struct packet_info *);
5064 };
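/* pkt_size is the fixed part of each packet that drbdd() reads before calling
 * fn(); expect_payload states whether additional payload beyond that fixed
 * part is acceptable. */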
5065 
5066 static struct data_cmd drbd_cmd_handler[] = {
5067 	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
5068 	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
5069 	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply },
5070 	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier },
5071 	[P_BITMAP]	    = { 1, 0, receive_bitmap },
5072 	[P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap },
5073 	[P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
5074 	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
5075 	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
5076 	[P_SYNC_PARAM]	    = { 1, 0, receive_SyncParam },
5077 	[P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
5078 	[P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
5079 	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
5080 	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
5081 	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
5082 	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
5083 	[P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
5084 	[P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
5085 	[P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
5086 	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
5087 	[P_RS_THIN_REQ]     = { 0, sizeof(struct p_block_req), receive_DataRequest },
5088 	[P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
5089 	[P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
5090 	[P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
5091 	[P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
5092 	[P_TRIM]	    = { 0, sizeof(struct p_trim), receive_Data },
5093 	[P_ZEROES]	    = { 0, sizeof(struct p_trim), receive_Data },
5094 	[P_RS_DEALLOCATED]  = { 0, sizeof(struct p_block_desc), receive_rs_deallocated },
5095 	[P_WSAME]	    = { 1, sizeof(struct p_wsame), receive_Data },
5096 };
5097 
5098 static void drbdd(struct drbd_connection *connection)
5099 {
5100 	struct packet_info pi;
5101 	size_t shs; /* sub header size */
5102 	int err;
5103 
5104 	while (get_t_state(&connection->receiver) == RUNNING) {
5105 		struct data_cmd const *cmd;
5106 
5107 		drbd_thread_current_set_cpu(&connection->receiver);
5108 		update_receiver_timing_details(connection, drbd_recv_header_maybe_unplug);
5109 		if (drbd_recv_header_maybe_unplug(connection, &pi))
5110 			goto err_out;
5111 
5112 		cmd = &drbd_cmd_handler[pi.cmd];
5113 		if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
5114 			drbd_err(connection, "Unexpected data packet %s (0x%04x)\n",
5115 				 cmdname(pi.cmd), pi.cmd);
5116 			goto err_out;
5117 		}
5118 
5119 		shs = cmd->pkt_size;
5120 		if (pi.cmd == P_SIZES && connection->agreed_features & DRBD_FF_WSAME)
5121 			shs += sizeof(struct o_qlim);
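		/* With DRBD_FF_WSAME agreed, P_SIZES carries a trailing
		 * struct o_qlim (the peer's queue limits), hence the larger
		 * fixed size. */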
5122 		if (pi.size > shs && !cmd->expect_payload) {
5123 			drbd_err(connection, "No payload expected %s l:%d\n",
5124 				 cmdname(pi.cmd), pi.size);
5125 			goto err_out;
5126 		}
5127 		if (pi.size < shs) {
5128 			drbd_err(connection, "%s: unexpected packet size, expected:%d received:%d\n",
5129 				 cmdname(pi.cmd), (int)shs, pi.size);
5130 			goto err_out;
5131 		}
5132 
5133 		if (shs) {
5134 			update_receiver_timing_details(connection, drbd_recv_all_warn);
5135 			err = drbd_recv_all_warn(connection, pi.data, shs);
5136 			if (err)
5137 				goto err_out;
5138 			pi.size -= shs;
5139 		}
5140 
5141 		update_receiver_timing_details(connection, cmd->fn);
5142 		err = cmd->fn(connection, &pi);
5143 		if (err) {
5144 			drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
5145 				 cmdname(pi.cmd), err, pi.size);
5146 			goto err_out;
5147 		}
5148 	}
5149 	return;
5150 
5151     err_out:
5152 	conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
5153 }
5154 
5155 static void conn_disconnect(struct drbd_connection *connection)
5156 {
5157 	struct drbd_peer_device *peer_device;
5158 	enum drbd_conns oc;
5159 	int vnr;
5160 
5161 	if (connection->cstate == C_STANDALONE)
5162 		return;
5163 
5164 	/* We are about to start the cleanup after connection loss.
5165 	 * Make sure drbd_make_request knows about that.
5166 	 * Usually we should be in some network failure state already,
5167 	 * but just in case we are not, we fix it up here.
5168 	 */
5169 	conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5170 
5171 	/* ack_receiver does not clean up anything. it must not interfere, either */
5172 	drbd_thread_stop(&connection->ack_receiver);
5173 	if (connection->ack_sender) {
5174 		destroy_workqueue(connection->ack_sender);
5175 		connection->ack_sender = NULL;
5176 	}
5177 	drbd_free_sock(connection);
5178 
5179 	rcu_read_lock();
5180 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5181 		struct drbd_device *device = peer_device->device;
5182 		kref_get(&device->kref);
5183 		rcu_read_unlock();
5184 		drbd_disconnected(peer_device);
5185 		kref_put(&device->kref, drbd_destroy_device);
5186 		rcu_read_lock();
5187 	}
5188 	rcu_read_unlock();
5189 
5190 	if (!list_empty(&connection->current_epoch->list))
5191 		drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
5192 	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
5193 	atomic_set(&connection->current_epoch->epoch_size, 0);
5194 	connection->send.seen_any_write_yet = false;
5195 
5196 	drbd_info(connection, "Connection closed\n");
5197 
5198 	if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
5199 		conn_try_outdate_peer_async(connection);
5200 
5201 	spin_lock_irq(&connection->resource->req_lock);
5202 	oc = connection->cstate;
5203 	if (oc >= C_UNCONNECTED)
5204 		_conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
5205 
5206 	spin_unlock_irq(&connection->resource->req_lock);
5207 
5208 	if (oc == C_DISCONNECTING)
5209 		conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
5210 }
5211 
5212 static int drbd_disconnected(struct drbd_peer_device *peer_device)
5213 {
5214 	struct drbd_device *device = peer_device->device;
5215 	unsigned int i;
5216 
5217 	/* wait for current activity to cease. */
5218 	spin_lock_irq(&device->resource->req_lock);
5219 	_drbd_wait_ee_list_empty(device, &device->active_ee);
5220 	_drbd_wait_ee_list_empty(device, &device->sync_ee);
5221 	_drbd_wait_ee_list_empty(device, &device->read_ee);
5222 	spin_unlock_irq(&device->resource->req_lock);
5223 
5224 	/* We do not have data structures that would allow us to
5225 	 * get the rs_pending_cnt down to 0 again.
5226 	 *  * On C_SYNC_TARGET we do not have any data structures describing
5227 	 *    the pending RSDataRequest's we have sent.
5228 	 *  * On C_SYNC_SOURCE there is no data structure that tracks
5229 	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
5230 	 *  And no, it is not the sum of the reference counts in the
5231 	 *  resync_LRU. The resync_LRU tracks the whole operation including
5232 	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
5233 	 *  on the fly. */
5234 	drbd_rs_cancel_all(device);
5235 	device->rs_total = 0;
5236 	device->rs_failed = 0;
5237 	atomic_set(&device->rs_pending_cnt, 0);
5238 	wake_up(&device->misc_wait);
5239 
5240 	del_timer_sync(&device->resync_timer);
5241 	resync_timer_fn(&device->resync_timer);
5242 
5243 	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
5244 	 * w_make_resync_request etc. which may still be on the worker queue
5245 	 * to be "canceled" */
5246 	drbd_flush_workqueue(&peer_device->connection->sender_work);
5247 
5248 	drbd_finish_peer_reqs(device);
5249 
5250 	/* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
5251 	   might have issued a work again. The one before drbd_finish_peer_reqs() is
5252 	   necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
5253 	drbd_flush_workqueue(&peer_device->connection->sender_work);
5254 
5255 	/* need to do it again, drbd_finish_peer_reqs() may have populated it
5256 	 * again via drbd_try_clear_on_disk_bm(). */
5257 	drbd_rs_cancel_all(device);
5258 
5259 	kfree(device->p_uuid);
5260 	device->p_uuid = NULL;
5261 
5262 	if (!drbd_suspended(device))
5263 		tl_clear(peer_device->connection);
5264 
5265 	drbd_md_sync(device);
5266 
5267 	if (get_ldev(device)) {
5268 		drbd_bitmap_io(device, &drbd_bm_write_copy_pages,
5269 				"write from disconnected", BM_LOCKED_CHANGE_ALLOWED);
5270 		put_ldev(device);
5271 	}
5272 
5273 	/* tcp_close and release of sendpage pages can be deferred.  I don't
5274 	 * want to use SO_LINGER, because apparently it can be deferred for
5275 	 * more than 20 seconds (longest time I checked).
5276 	 *
5277 	 * Actually we don't care for exactly when the network stack does its
5278 	 * put_page(), but release our reference on these pages right here.
5279 	 */
5280 	i = drbd_free_peer_reqs(device, &device->net_ee);
5281 	if (i)
5282 		drbd_info(device, "net_ee not empty, killed %u entries\n", i);
5283 	i = atomic_read(&device->pp_in_use_by_net);
5284 	if (i)
5285 		drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
5286 	i = atomic_read(&device->pp_in_use);
5287 	if (i)
5288 		drbd_info(device, "pp_in_use = %d, expected 0\n", i);
5289 
5290 	D_ASSERT(device, list_empty(&device->read_ee));
5291 	D_ASSERT(device, list_empty(&device->active_ee));
5292 	D_ASSERT(device, list_empty(&device->sync_ee));
5293 	D_ASSERT(device, list_empty(&device->done_ee));
5294 
5295 	return 0;
5296 }
5297 
5298 /*
5299  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
5300  * we can agree on is stored in agreed_pro_version.
5301  *
5302  * feature flags and the reserved array should be enough room for future
5303  * enhancements of the handshake protocol, and possible plugins...
5304  *
5305  * for now, they are expected to be zero, but ignored.
5306  */
5307 static int drbd_send_features(struct drbd_connection *connection)
5308 {
5309 	struct drbd_socket *sock;
5310 	struct p_connection_features *p;
5311 
5312 	sock = &connection->data;
5313 	p = conn_prepare_command(connection, sock);
5314 	if (!p)
5315 		return -EIO;
5316 	memset(p, 0, sizeof(*p));
5317 	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
5318 	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
5319 	p->feature_flags = cpu_to_be32(PRO_FEATURES);
5320 	return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
5321 }
5322 
5323 /*
5324  * return values:
5325  *   1 yes, we have a valid connection
5326  *   0 oops, did not work out, please try again
5327  *  -1 peer talks different language,
5328  *     no point in trying again, please go standalone.
5329  */
5330 static int drbd_do_features(struct drbd_connection *connection)
5331 {
5332 	/* ASSERT current == connection->receiver ... */
5333 	struct p_connection_features *p;
5334 	const int expect = sizeof(struct p_connection_features);
5335 	struct packet_info pi;
5336 	int err;
5337 
5338 	err = drbd_send_features(connection);
5339 	if (err)
5340 		return 0;
5341 
5342 	err = drbd_recv_header(connection, &pi);
5343 	if (err)
5344 		return 0;
5345 
5346 	if (pi.cmd != P_CONNECTION_FEATURES) {
5347 		drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
5348 			 cmdname(pi.cmd), pi.cmd);
5349 		return -1;
5350 	}
5351 
5352 	if (pi.size != expect) {
5353 		drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
5354 		     expect, pi.size);
5355 		return -1;
5356 	}
5357 
5358 	p = pi.data;
5359 	err = drbd_recv_all_warn(connection, p, expect);
5360 	if (err)
5361 		return 0;
5362 
5363 	p->protocol_min = be32_to_cpu(p->protocol_min);
5364 	p->protocol_max = be32_to_cpu(p->protocol_max);
5365 	if (p->protocol_max == 0)
5366 		p->protocol_max = p->protocol_min;
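	/* A peer that leaves protocol_max at zero is taken to support only
	 * protocol_min. */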
5367 
5368 	if (PRO_VERSION_MAX < p->protocol_min ||
5369 	    PRO_VERSION_MIN > p->protocol_max)
5370 		goto incompat;
5371 
5372 	connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
5373 	connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
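	/* Agree on the highest protocol version and on exactly those feature
	 * flags that both sides advertise. */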
5374 
5375 	drbd_info(connection, "Handshake successful: "
5376 	     "Agreed network protocol version %d\n", connection->agreed_pro_version);
5377 
5378 	drbd_info(connection, "Feature flags enabled on protocol level: 0x%x%s%s%s%s.\n",
5379 		  connection->agreed_features,
5380 		  connection->agreed_features & DRBD_FF_TRIM ? " TRIM" : "",
5381 		  connection->agreed_features & DRBD_FF_THIN_RESYNC ? " THIN_RESYNC" : "",
5382 		  connection->agreed_features & DRBD_FF_WSAME ? " WRITE_SAME" : "",
5383 		  connection->agreed_features & DRBD_FF_WZEROES ? " WRITE_ZEROES" :
5384 		  connection->agreed_features ? "" : " none");
5385 
5386 	return 1;
5387 
5388  incompat:
5389 	drbd_err(connection, "incompatible DRBD dialects: "
5390 	    "I support %d-%d, peer supports %d-%d\n",
5391 	    PRO_VERSION_MIN, PRO_VERSION_MAX,
5392 	    p->protocol_min, p->protocol_max);
5393 	return -1;
5394 }
5395 
5396 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
5397 static int drbd_do_auth(struct drbd_connection *connection)
5398 {
5399 	drbd_err(connection, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
5400 	drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
5401 	return -1;
5402 }
5403 #else
5404 #define CHALLENGE_LEN 64
5405 
5406 /* Return value:
5407 	1 - auth succeeded,
5408 	0 - failed, try again (network error),
5409 	-1 - auth failed, don't try again.
5410 */
5411 
5412 static int drbd_do_auth(struct drbd_connection *connection)
5413 {
5414 	struct drbd_socket *sock;
5415 	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
5416 	char *response = NULL;
5417 	char *right_response = NULL;
5418 	char *peers_ch = NULL;
5419 	unsigned int key_len;
5420 	char secret[SHARED_SECRET_MAX]; /* 64 byte */
5421 	unsigned int resp_size;
5422 	struct shash_desc *desc;
5423 	struct packet_info pi;
5424 	struct net_conf *nc;
5425 	int err, rv;
5426 
5427 	/* FIXME: Put the challenge/response into the preallocated socket buffer.  */
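	/* Exchange outline: send our random challenge, receive the peer's
	 * challenge, reply with HMAC(shared secret, peer's challenge), then
	 * verify that the peer's reply equals HMAC(shared secret, our challenge). */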
5428 
5429 	rcu_read_lock();
5430 	nc = rcu_dereference(connection->net_conf);
5431 	key_len = strlen(nc->shared_secret);
5432 	memcpy(secret, nc->shared_secret, key_len);
5433 	rcu_read_unlock();
5434 
5435 	desc = kmalloc(sizeof(struct shash_desc) +
5436 		       crypto_shash_descsize(connection->cram_hmac_tfm),
5437 		       GFP_KERNEL);
5438 	if (!desc) {
5439 		rv = -1;
5440 		goto fail;
5441 	}
5442 	desc->tfm = connection->cram_hmac_tfm;
5443 
5444 	rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
5445 	if (rv) {
5446 		drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
5447 		rv = -1;
5448 		goto fail;
5449 	}
5450 
5451 	get_random_bytes(my_challenge, CHALLENGE_LEN);
5452 
5453 	sock = &connection->data;
5454 	if (!conn_prepare_command(connection, sock)) {
5455 		rv = 0;
5456 		goto fail;
5457 	}
5458 	rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
5459 				my_challenge, CHALLENGE_LEN);
5460 	if (!rv)
5461 		goto fail;
5462 
5463 	err = drbd_recv_header(connection, &pi);
5464 	if (err) {
5465 		rv = 0;
5466 		goto fail;
5467 	}
5468 
5469 	if (pi.cmd != P_AUTH_CHALLENGE) {
5470 		drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
5471 			 cmdname(pi.cmd), pi.cmd);
5472 		rv = -1;
5473 		goto fail;
5474 	}
5475 
5476 	if (pi.size > CHALLENGE_LEN * 2) {
5477 		drbd_err(connection, "AuthChallenge payload too big.\n");
5478 		rv = -1;
5479 		goto fail;
5480 	}
5481 
5482 	if (pi.size < CHALLENGE_LEN) {
5483 		drbd_err(connection, "AuthChallenge payload too small.\n");
5484 		rv = -1;
5485 		goto fail;
5486 	}
5487 
5488 	peers_ch = kmalloc(pi.size, GFP_NOIO);
5489 	if (peers_ch == NULL) {
5490 		drbd_err(connection, "kmalloc of peers_ch failed\n");
5491 		rv = -1;
5492 		goto fail;
5493 	}
5494 
5495 	err = drbd_recv_all_warn(connection, peers_ch, pi.size);
5496 	if (err) {
5497 		rv = 0;
5498 		goto fail;
5499 	}
5500 
5501 	if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
5502 		drbd_err(connection, "Peer presented the same challenge!\n");
5503 		rv = -1;
5504 		goto fail;
5505 	}
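	/* Refusing a peer that echoes our own challenge prevents a trivial
	 * reflection: the response we send below would be exactly the
	 * right_response we expect back later, so a peer without the secret
	 * could simply replay it. */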
5506 
5507 	resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
5508 	response = kmalloc(resp_size, GFP_NOIO);
5509 	if (response == NULL) {
5510 		drbd_err(connection, "kmalloc of response failed\n");
5511 		rv = -1;
5512 		goto fail;
5513 	}
5514 
5515 	rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
5516 	if (rv) {
5517 		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5518 		rv = -1;
5519 		goto fail;
5520 	}
5521 
5522 	if (!conn_prepare_command(connection, sock)) {
5523 		rv = 0;
5524 		goto fail;
5525 	}
5526 	rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5527 				response, resp_size);
5528 	if (!rv)
5529 		goto fail;
5530 
5531 	err = drbd_recv_header(connection, &pi);
5532 	if (err) {
5533 		rv = 0;
5534 		goto fail;
5535 	}
5536 
5537 	if (pi.cmd != P_AUTH_RESPONSE) {
5538 		drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5539 			 cmdname(pi.cmd), pi.cmd);
5540 		rv = 0;
5541 		goto fail;
5542 	}
5543 
5544 	if (pi.size != resp_size) {
5545 		drbd_err(connection, "AuthResponse payload has unexpected size\n");
5546 		rv = 0;
5547 		goto fail;
5548 	}
5549 
5550 	err = drbd_recv_all_warn(connection, response, resp_size);
5551 	if (err) {
5552 		rv = 0;
5553 		goto fail;
5554 	}
5555 
5556 	right_response = kmalloc(resp_size, GFP_NOIO);
5557 	if (right_response == NULL) {
5558 		drbd_err(connection, "kmalloc of right_response failed\n");
5559 		rv = -1;
5560 		goto fail;
5561 	}
5562 
5563 	rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
5564 				 right_response);
5565 	if (rv) {
5566 		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5567 		rv = -1;
5568 		goto fail;
5569 	}
5570 
5571 	rv = !memcmp(response, right_response, resp_size);
5572 
5573 	if (rv)
5574 		drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5575 		     resp_size);
5576 	else
5577 		rv = -1;
5578 
5579  fail:
5580 	kfree(peers_ch);
5581 	kfree(response);
5582 	kfree(right_response);
5583 	if (desc) {
5584 		shash_desc_zero(desc);
5585 		kfree(desc);
5586 	}
5587 
5588 	return rv;
5589 }
5590 #endif
5591 
5592 int drbd_receiver(struct drbd_thread *thi)
5593 {
5594 	struct drbd_connection *connection = thi->connection;
5595 	int h;
5596 
5597 	drbd_info(connection, "receiver (re)started\n");
5598 
5599 	do {
5600 		h = conn_connect(connection);
5601 		if (h == 0) {
5602 			conn_disconnect(connection);
5603 			schedule_timeout_interruptible(HZ);
5604 		}
5605 		if (h == -1) {
5606 			drbd_warn(connection, "Discarding network configuration.\n");
5607 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5608 		}
5609 	} while (h == 0);
5610 
5611 	if (h > 0) {
5612 		blk_start_plug(&connection->receiver_plug);
5613 		drbdd(connection);
5614 		blk_finish_plug(&connection->receiver_plug);
5615 	}
5616 
5617 	conn_disconnect(connection);
5618 
5619 	drbd_info(connection, "receiver terminated\n");
5620 	return 0;
5621 }
5622 
5623 /* ********* acknowledge sender ******** */
5624 
5625 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5626 {
5627 	struct p_req_state_reply *p = pi->data;
5628 	int retcode = be32_to_cpu(p->retcode);
5629 
5630 	if (retcode >= SS_SUCCESS) {
5631 		set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5632 	} else {
5633 		set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5634 		drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5635 			 drbd_set_st_err_str(retcode), retcode);
5636 	}
5637 	wake_up(&connection->ping_wait);
5638 
5639 	return 0;
5640 }
5641 
5642 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5643 {
5644 	struct drbd_peer_device *peer_device;
5645 	struct drbd_device *device;
5646 	struct p_req_state_reply *p = pi->data;
5647 	int retcode = be32_to_cpu(p->retcode);
5648 
5649 	peer_device = conn_peer_device(connection, pi->vnr);
5650 	if (!peer_device)
5651 		return -EIO;
5652 	device = peer_device->device;
5653 
5654 	if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5655 		D_ASSERT(device, connection->agreed_pro_version < 100);
5656 		return got_conn_RqSReply(connection, pi);
5657 	}
5658 
5659 	if (retcode >= SS_SUCCESS) {
5660 		set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5661 	} else {
5662 		set_bit(CL_ST_CHG_FAIL, &device->flags);
5663 		drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5664 			drbd_set_st_err_str(retcode), retcode);
5665 	}
5666 	wake_up(&device->state_wait);
5667 
5668 	return 0;
5669 }
5670 
5671 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
5672 {
5673 	return drbd_send_ping_ack(connection);
5675 }
5676 
5677 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5678 {
5679 	/* restore idle timeout */
5680 	connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5681 	if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5682 		wake_up(&connection->ping_wait);
5683 
5684 	return 0;
5685 }
5686 
5687 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5688 {
5689 	struct drbd_peer_device *peer_device;
5690 	struct drbd_device *device;
5691 	struct p_block_ack *p = pi->data;
5692 	sector_t sector = be64_to_cpu(p->sector);
5693 	int blksize = be32_to_cpu(p->blksize);
5694 
5695 	peer_device = conn_peer_device(connection, pi->vnr);
5696 	if (!peer_device)
5697 		return -EIO;
5698 	device = peer_device->device;
5699 
5700 	D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5701 
5702 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5703 
5704 	if (get_ldev(device)) {
5705 		drbd_rs_complete_io(device, sector);
5706 		drbd_set_in_sync(device, sector, blksize);
5707 		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5708 		device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5709 		put_ldev(device);
5710 	}
5711 	dec_rs_pending(device);
5712 	atomic_add(blksize >> 9, &device->rs_sect_in);
5713 
5714 	return 0;
5715 }
5716 
5717 static int
5718 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5719 			      struct rb_root *root, const char *func,
5720 			      enum drbd_req_event what, bool missing_ok)
5721 {
5722 	struct drbd_request *req;
5723 	struct bio_and_error m;
5724 
5725 	spin_lock_irq(&device->resource->req_lock);
5726 	req = find_request(device, root, id, sector, missing_ok, func);
5727 	if (unlikely(!req)) {
5728 		spin_unlock_irq(&device->resource->req_lock);
5729 		return -EIO;
5730 	}
5731 	__req_mod(req, what, &m);
5732 	spin_unlock_irq(&device->resource->req_lock);
5733 
5734 	if (m.bio)
5735 		complete_master_bio(device, &m);
5736 	return 0;
5737 }
5738 
5739 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5740 {
5741 	struct drbd_peer_device *peer_device;
5742 	struct drbd_device *device;
5743 	struct p_block_ack *p = pi->data;
5744 	sector_t sector = be64_to_cpu(p->sector);
5745 	int blksize = be32_to_cpu(p->blksize);
5746 	enum drbd_req_event what;
5747 
5748 	peer_device = conn_peer_device(connection, pi->vnr);
5749 	if (!peer_device)
5750 		return -EIO;
5751 	device = peer_device->device;
5752 
5753 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5754 
5755 	if (p->block_id == ID_SYNCER) {
5756 		drbd_set_in_sync(device, sector, blksize);
5757 		dec_rs_pending(device);
5758 		return 0;
5759 	}
5760 	switch (pi->cmd) {
5761 	case P_RS_WRITE_ACK:
5762 		what = WRITE_ACKED_BY_PEER_AND_SIS;
5763 		break;
5764 	case P_WRITE_ACK:
5765 		what = WRITE_ACKED_BY_PEER;
5766 		break;
5767 	case P_RECV_ACK:
5768 		what = RECV_ACKED_BY_PEER;
5769 		break;
5770 	case P_SUPERSEDED:
5771 		what = CONFLICT_RESOLVED;
5772 		break;
5773 	case P_RETRY_WRITE:
5774 		what = POSTPONE_WRITE;
5775 		break;
5776 	default:
5777 		BUG();
5778 	}
5779 
5780 	return validate_req_change_req_state(device, p->block_id, sector,
5781 					     &device->write_requests, __func__,
5782 					     what, false);
5783 }
5784 
5785 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5786 {
5787 	struct drbd_peer_device *peer_device;
5788 	struct drbd_device *device;
5789 	struct p_block_ack *p = pi->data;
5790 	sector_t sector = be64_to_cpu(p->sector);
5791 	int size = be32_to_cpu(p->blksize);
5792 	int err;
5793 
5794 	peer_device = conn_peer_device(connection, pi->vnr);
5795 	if (!peer_device)
5796 		return -EIO;
5797 	device = peer_device->device;
5798 
5799 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5800 
5801 	if (p->block_id == ID_SYNCER) {
5802 		dec_rs_pending(device);
5803 		drbd_rs_failed_io(device, sector, size);
5804 		return 0;
5805 	}
5806 
5807 	err = validate_req_change_req_state(device, p->block_id, sector,
5808 					    &device->write_requests, __func__,
5809 					    NEG_ACKED, true);
5810 	if (err) {
5811 		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5812 		   The master bio might already be completed, therefore the
5813 		   request is no longer in the collision hash. */
5814 		/* In Protocol B we might already have got a P_RECV_ACK
5815 		   but then get a P_NEG_ACK afterwards. */
5816 		drbd_set_out_of_sync(device, sector, size);
5817 	}
5818 	return 0;
5819 }
5820 
5821 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5822 {
5823 	struct drbd_peer_device *peer_device;
5824 	struct drbd_device *device;
5825 	struct p_block_ack *p = pi->data;
5826 	sector_t sector = be64_to_cpu(p->sector);
5827 
5828 	peer_device = conn_peer_device(connection, pi->vnr);
5829 	if (!peer_device)
5830 		return -EIO;
5831 	device = peer_device->device;
5832 
5833 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5834 
5835 	drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5836 	    (unsigned long long)sector, be32_to_cpu(p->blksize));
5837 
5838 	return validate_req_change_req_state(device, p->block_id, sector,
5839 					     &device->read_requests, __func__,
5840 					     NEG_ACKED, false);
5841 }
5842 
5843 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5844 {
5845 	struct drbd_peer_device *peer_device;
5846 	struct drbd_device *device;
5847 	sector_t sector;
5848 	int size;
5849 	struct p_block_ack *p = pi->data;
5850 
5851 	peer_device = conn_peer_device(connection, pi->vnr);
5852 	if (!peer_device)
5853 		return -EIO;
5854 	device = peer_device->device;
5855 
5856 	sector = be64_to_cpu(p->sector);
5857 	size = be32_to_cpu(p->blksize);
5858 
5859 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5860 
5861 	dec_rs_pending(device);
5862 
5863 	if (get_ldev_if_state(device, D_FAILED)) {
5864 		drbd_rs_complete_io(device, sector);
5865 		switch (pi->cmd) {
5866 		case P_NEG_RS_DREPLY:
5867 			drbd_rs_failed_io(device, sector, size);
			break;
5868 		case P_RS_CANCEL:
5869 			break;
5870 		default:
5871 			BUG();
5872 		}
5873 		put_ldev(device);
5874 	}
5875 
5876 	return 0;
5877 }
5878 
5879 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5880 {
5881 	struct p_barrier_ack *p = pi->data;
5882 	struct drbd_peer_device *peer_device;
5883 	int vnr;
5884 
5885 	tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5886 
5887 	rcu_read_lock();
5888 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5889 		struct drbd_device *device = peer_device->device;
5890 
5891 		if (device->state.conn == C_AHEAD &&
5892 		    atomic_read(&device->ap_in_flight) == 0 &&
5893 		    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5894 			device->start_resync_timer.expires = jiffies + HZ;
5895 			add_timer(&device->start_resync_timer);
5896 		}
5897 	}
5898 	rcu_read_unlock();
5899 
5900 	return 0;
5901 }
5902 
5903 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5904 {
5905 	struct drbd_peer_device *peer_device;
5906 	struct drbd_device *device;
5907 	struct p_block_ack *p = pi->data;
5908 	struct drbd_device_work *dw;
5909 	sector_t sector;
5910 	int size;
5911 
5912 	peer_device = conn_peer_device(connection, pi->vnr);
5913 	if (!peer_device)
5914 		return -EIO;
5915 	device = peer_device->device;
5916 
5917 	sector = be64_to_cpu(p->sector);
5918 	size = be32_to_cpu(p->blksize);
5919 
5920 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5921 
5922 	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5923 		drbd_ov_out_of_sync_found(device, sector, size);
5924 	else
5925 		ov_out_of_sync_print(device);
5926 
5927 	if (!get_ldev(device))
5928 		return 0;
5929 
5930 	drbd_rs_complete_io(device, sector);
5931 	dec_rs_pending(device);
5932 
5933 	--device->ov_left;
5934 
5935 	/* let's advance progress step marks only for every other megabyte */
5936 	if ((device->ov_left & 0x200) == 0x200)
5937 		drbd_advance_rs_marks(device, device->ov_left);
5938 
5939 	if (device->ov_left == 0) {
5940 		dw = kmalloc(sizeof(*dw), GFP_NOIO);
5941 		if (dw) {
5942 			dw->w.cb = w_ov_finished;
5943 			dw->device = device;
5944 			drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5945 		} else {
5946 			drbd_err(device, "kmalloc(dw) failed.\n");
5947 			ov_out_of_sync_print(device);
5948 			drbd_resync_finished(device);
5949 		}
5950 	}
5951 	put_ldev(device);
5952 	return 0;
5953 }
5954 
5955 static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
5956 {
5957 	return 0;
5958 }
5959 
5960 struct meta_sock_cmd {
5961 	size_t pkt_size;
5962 	int (*fn)(struct drbd_connection *connection, struct packet_info *);
5963 };
5964 
5965 static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
5966 {
5967 	long t;
5968 	struct net_conf *nc;
5969 
5970 	rcu_read_lock();
5971 	nc = rcu_dereference(connection->net_conf);
5972 	t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5973 	rcu_read_unlock();
5974 
5975 	t *= HZ;
5976 	if (ping_timeout)
5977 		t /= 10;
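	/* ping_timeo is configured in tenths of a second, ping_int in whole
	 * seconds; both were scaled by HZ above, so only the former needs the
	 * division by 10. */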
5978 
5979 	connection->meta.socket->sk->sk_rcvtimeo = t;
5980 }
5981 
5982 static void set_ping_timeout(struct drbd_connection *connection)
5983 {
5984 	set_rcvtimeo(connection, 1);
5985 }
5986 
5987 static void set_idle_timeout(struct drbd_connection *connection)
5988 {
5989 	set_rcvtimeo(connection, 0);
5990 }
5991 
5992 static struct meta_sock_cmd ack_receiver_tbl[] = {
5993 	[P_PING]	    = { 0, got_Ping },
5994 	[P_PING_ACK]	    = { 0, got_PingAck },
5995 	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5996 	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5997 	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5998 	[P_SUPERSEDED]   = { sizeof(struct p_block_ack), got_BlockAck },
5999 	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
6000 	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
6001 	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
6002 	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
6003 	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
6004 	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
6005 	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
6006 	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
6007 	[P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
6008 	[P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
6009 	[P_RETRY_WRITE]	    = { sizeof(struct p_block_ack), got_BlockAck },
6010 };
6011 
6012 int drbd_ack_receiver(struct drbd_thread *thi)
6013 {
6014 	struct drbd_connection *connection = thi->connection;
6015 	struct meta_sock_cmd *cmd = NULL;
6016 	struct packet_info pi;
6017 	unsigned long pre_recv_jif;
6018 	int rv;
6019 	void *buf    = connection->meta.rbuf;
6020 	int received = 0;
6021 	unsigned int header_size = drbd_header_size(connection);
6022 	int expect   = header_size;
6023 	bool ping_timeout_active = false;
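	/* The receive loop below works in two phases: first accumulate a full
	 * header (expect == header_size, cmd == NULL) and decode it, then
	 * accumulate the command's fixed payload (expect grows by cmd->pkt_size)
	 * before dispatching to cmd->fn(). */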
6024 
6025 	sched_set_fifo_low(current);
6026 
6027 	while (get_t_state(thi) == RUNNING) {
6028 		drbd_thread_current_set_cpu(thi);
6029 
6030 		conn_reclaim_net_peer_reqs(connection);
6031 
6032 		if (test_and_clear_bit(SEND_PING, &connection->flags)) {
6033 			if (drbd_send_ping(connection)) {
6034 				drbd_err(connection, "drbd_send_ping has failed\n");
6035 				goto reconnect;
6036 			}
6037 			set_ping_timeout(connection);
6038 			ping_timeout_active = true;
6039 		}
6040 
6041 		pre_recv_jif = jiffies;
6042 		rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
6043 
6044 		/* Note:
6045 		 * -EINTR	 (on meta) we got a signal
6046 		 * -EAGAIN	 (on meta) rcvtimeo expired
6047 		 * -ECONNRESET	 other side closed the connection
6048 		 * -ERESTARTSYS  (on data) we got a signal
6049 		 * rv <  0	 other than above: unexpected error!
6050 		 * rv == expected: full header or command
6051 		 * rv <  expected: "woken" by signal during receive
6052 		 * rv == 0	 : "connection shut down by peer"
6053 		 */
6054 		if (likely(rv > 0)) {
6055 			received += rv;
6056 			buf	 += rv;
6057 		} else if (rv == 0) {
6058 			if (test_bit(DISCONNECT_SENT, &connection->flags)) {
6059 				long t;
6060 				rcu_read_lock();
6061 				t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
6062 				rcu_read_unlock();
6063 
6064 				t = wait_event_timeout(connection->ping_wait,
6065 						       connection->cstate < C_WF_REPORT_PARAMS,
6066 						       t);
6067 				if (t)
6068 					break;
6069 			}
6070 			drbd_err(connection, "meta connection shut down by peer.\n");
6071 			goto reconnect;
6072 		} else if (rv == -EAGAIN) {
6073 			/* If the data socket received something meanwhile,
6074 			 * that is good enough: peer is still alive. */
6075 			if (time_after(connection->last_received, pre_recv_jif))
6076 				continue;
6077 			if (ping_timeout_active) {
6078 				drbd_err(connection, "PingAck did not arrive in time.\n");
6079 				goto reconnect;
6080 			}
6081 			set_bit(SEND_PING, &connection->flags);
6082 			continue;
6083 		} else if (rv == -EINTR) {
6084 			/* maybe drbd_thread_stop(): the while condition will notice.
6085 			 * maybe woken for send_ping: we'll send a ping above,
6086 			 * and change the rcvtimeo */
6087 			flush_signals(current);
6088 			continue;
6089 		} else {
6090 			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
6091 			goto reconnect;
6092 		}
6093 
6094 		if (received == expect && cmd == NULL) {
6095 			if (decode_header(connection, connection->meta.rbuf, &pi))
6096 				goto reconnect;
6097 			cmd = &ack_receiver_tbl[pi.cmd];
6098 			if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
6099 				drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
6100 					 cmdname(pi.cmd), pi.cmd);
6101 				goto disconnect;
6102 			}
6103 			expect = header_size + cmd->pkt_size;
6104 			if (pi.size != expect - header_size) {
6105 				drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
6106 					pi.cmd, pi.size);
6107 				goto reconnect;
6108 			}
6109 		}
6110 		if (received == expect) {
6111 			int err;
6112 
6113 			err = cmd->fn(connection, &pi);
6114 			if (err) {
6115 				drbd_err(connection, "%ps failed\n", cmd->fn);
6116 				goto reconnect;
6117 			}
6118 
6119 			connection->last_received = jiffies;
6120 
6121 			if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
6122 				set_idle_timeout(connection);
6123 				ping_timeout_active = false;
6124 			}
6125 
6126 			buf	 = connection->meta.rbuf;
6127 			received = 0;
6128 			expect	 = header_size;
6129 			cmd	 = NULL;
6130 		}
6131 	}
6132 
6133 	if (0) {
6134 reconnect:
6135 		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6136 		conn_md_sync(connection);
6137 	}
6138 	if (0) {
6139 disconnect:
6140 		conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
6141 	}
6142 
6143 	drbd_info(connection, "ack_receiver terminated\n");
6144 
6145 	return 0;
6146 }
6147 
6148 void drbd_send_acks_wf(struct work_struct *ws)
6149 {
6150 	struct drbd_peer_device *peer_device =
6151 		container_of(ws, struct drbd_peer_device, send_acks_work);
6152 	struct drbd_connection *connection = peer_device->connection;
6153 	struct drbd_device *device = peer_device->device;
6154 	struct net_conf *nc;
6155 	int tcp_cork, err;
6156 
6157 	rcu_read_lock();
6158 	nc = rcu_dereference(connection->net_conf);
6159 	tcp_cork = nc->tcp_cork;
6160 	rcu_read_unlock();
6161 
6162 	if (tcp_cork)
6163 		tcp_sock_set_cork(connection->meta.socket->sk, true);
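	/* Corking batches the burst of small ack packets generated by
	 * drbd_finish_peer_reqs() below into fewer TCP segments. */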
6164 
6165 	err = drbd_finish_peer_reqs(device);
6166 	kref_put(&device->kref, drbd_destroy_device);
6167 	/* The matching kref_get() is in drbd_endio_write_sec_final(). It is needed
6168 	   to keep send_acks_work alive, which is embedded in the peer_device object. */
6169 
6170 	if (err) {
6171 		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6172 		return;
6173 	}
6174 
6175 	if (tcp_cork)
6176 		tcp_sock_set_cork(connection->meta.socket->sk, false);
6179 }
6180