xref: /openbmc/linux/drivers/block/drbd/drbd_receiver.c (revision 2f0f2441b4a10948e2ec042b48fef13680387f7c)
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3    drbd_receiver.c
4 
5    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6 
7    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
8    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
9    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10 
11  */
12 
13 
14 #include <linux/module.h>
15 
16 #include <linux/uaccess.h>
17 #include <net/sock.h>
18 
19 #include <linux/drbd.h>
20 #include <linux/fs.h>
21 #include <linux/file.h>
22 #include <linux/in.h>
23 #include <linux/mm.h>
24 #include <linux/memcontrol.h>
25 #include <linux/mm_inline.h>
26 #include <linux/slab.h>
27 #include <uapi/linux/sched/types.h>
28 #include <linux/sched/signal.h>
29 #include <linux/pkt_sched.h>
30 #define __KERNEL_SYSCALLS__
31 #include <linux/unistd.h>
32 #include <linux/vmalloc.h>
33 #include <linux/random.h>
34 #include <linux/string.h>
35 #include <linux/scatterlist.h>
36 #include "drbd_int.h"
37 #include "drbd_protocol.h"
38 #include "drbd_req.h"
39 #include "drbd_vli.h"
40 
/* Bitwise OR of the DRBD_FF_* flags listed here. */
#define PRO_FEATURES \
	(DRBD_FF_TRIM | DRBD_FF_THIN_RESYNC | DRBD_FF_WSAME | DRBD_FF_WZEROES)
42 
43 struct packet_info {
44 	enum drbd_packet cmd;
45 	unsigned int size;
46 	unsigned int vnr;
47 	void *data;
48 };
49 
/* Result type of drbd_may_finish_epoch(). */
enum finish_epoch {
	FE_STILL_LIVE = 0,
	FE_DESTROYED = 1,
	FE_RECYCLED = 2,
};
55 
/* Forward declarations of file-local functions used before their definition. */
static int drbd_do_features(struct drbd_connection *connection);
static int drbd_do_auth(struct drbd_connection *connection);
static int drbd_disconnected(struct drbd_peer_device *peer_device);
static void conn_wait_active_ee_empty(struct drbd_connection *connection);
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
					       struct drbd_epoch *epoch,
					       enum epoch_event ev);
static int e_end_block(struct drbd_work *w, int cancel);


/* Flags used for the opportunistic page allocations in __drbd_alloc_pages(). */
#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
65 
/*
 * Some helper functions to deal with singly linked page lists,
 * page->private being our "next" pointer.
 */
70 
71 /* If at least n pages are linked at head, get n pages off.
72  * Otherwise, don't modify head, and return NULL.
73  * Locking is the responsibility of the caller.
74  */
75 static struct page *page_chain_del(struct page **head, int n)
76 {
77 	struct page *page;
78 	struct page *tmp;
79 
80 	BUG_ON(!n);
81 	BUG_ON(!head);
82 
83 	page = *head;
84 
85 	if (!page)
86 		return NULL;
87 
88 	while (page) {
89 		tmp = page_chain_next(page);
90 		if (--n == 0)
91 			break; /* found sufficient pages */
92 		if (tmp == NULL)
93 			/* insufficient pages, don't use any of them. */
94 			return NULL;
95 		page = tmp;
96 	}
97 
98 	/* add end of list marker for the returned list */
99 	set_page_private(page, 0);
100 	/* actual return value, and adjustment of head */
101 	page = *head;
102 	*head = tmp;
103 	return page;
104 }
105 
106 /* may be used outside of locks to find the tail of a (usually short)
107  * "private" page chain, before adding it back to a global chain head
108  * with page_chain_add() under a spinlock. */
109 static struct page *page_chain_tail(struct page *page, int *len)
110 {
111 	struct page *tmp;
112 	int i = 1;
113 	while ((tmp = page_chain_next(page)))
114 		++i, page = tmp;
115 	if (len)
116 		*len = i;
117 	return page;
118 }
119 
120 static int page_chain_free(struct page *page)
121 {
122 	struct page *tmp;
123 	int i = 0;
124 	page_chain_for_each_safe(page, tmp) {
125 		put_page(page);
126 		++i;
127 	}
128 	return i;
129 }
130 
131 static void page_chain_add(struct page **head,
132 		struct page *chain_first, struct page *chain_last)
133 {
134 #if 1
135 	struct page *tmp;
136 	tmp = page_chain_tail(chain_first, NULL);
137 	BUG_ON(tmp != chain_last);
138 #endif
139 
140 	/* add chain to head */
141 	set_page_private(chain_last, (unsigned long)*head);
142 	*head = chain_first;
143 }
144 
145 static struct page *__drbd_alloc_pages(struct drbd_device *device,
146 				       unsigned int number)
147 {
148 	struct page *page = NULL;
149 	struct page *tmp = NULL;
150 	unsigned int i = 0;
151 
152 	/* Yes, testing drbd_pp_vacant outside the lock is racy.
153 	 * So what. It saves a spin_lock. */
154 	if (drbd_pp_vacant >= number) {
155 		spin_lock(&drbd_pp_lock);
156 		page = page_chain_del(&drbd_pp_pool, number);
157 		if (page)
158 			drbd_pp_vacant -= number;
159 		spin_unlock(&drbd_pp_lock);
160 		if (page)
161 			return page;
162 	}
163 
164 	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
165 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
166 	 * which in turn might block on the other node at this very place.  */
167 	for (i = 0; i < number; i++) {
168 		tmp = alloc_page(GFP_TRY);
169 		if (!tmp)
170 			break;
171 		set_page_private(tmp, (unsigned long)page);
172 		page = tmp;
173 	}
174 
175 	if (i == number)
176 		return page;
177 
178 	/* Not enough pages immediately available this time.
179 	 * No need to jump around here, drbd_alloc_pages will retry this
180 	 * function "soon". */
181 	if (page) {
182 		tmp = page_chain_tail(page, NULL);
183 		spin_lock(&drbd_pp_lock);
184 		page_chain_add(&drbd_pp_pool, page, tmp);
185 		drbd_pp_vacant += i;
186 		spin_unlock(&drbd_pp_lock);
187 	}
188 	return NULL;
189 }
190 
191 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
192 					   struct list_head *to_be_freed)
193 {
194 	struct drbd_peer_request *peer_req, *tmp;
195 
196 	/* The EEs are always appended to the end of the list. Since
197 	   they are sent in order over the wire, they have to finish
198 	   in order. As soon as we see the first not finished we can
199 	   stop to examine the list... */
200 
201 	list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
202 		if (drbd_peer_req_has_active_page(peer_req))
203 			break;
204 		list_move(&peer_req->w.list, to_be_freed);
205 	}
206 }
207 
208 static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
209 {
210 	LIST_HEAD(reclaimed);
211 	struct drbd_peer_request *peer_req, *t;
212 
213 	spin_lock_irq(&device->resource->req_lock);
214 	reclaim_finished_net_peer_reqs(device, &reclaimed);
215 	spin_unlock_irq(&device->resource->req_lock);
216 	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
217 		drbd_free_net_peer_req(device, peer_req);
218 }
219 
220 static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
221 {
222 	struct drbd_peer_device *peer_device;
223 	int vnr;
224 
225 	rcu_read_lock();
226 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
227 		struct drbd_device *device = peer_device->device;
228 		if (!atomic_read(&device->pp_in_use_by_net))
229 			continue;
230 
231 		kref_get(&device->kref);
232 		rcu_read_unlock();
233 		drbd_reclaim_net_peer_reqs(device);
234 		kref_put(&device->kref, drbd_destroy_device);
235 		rcu_read_lock();
236 	}
237 	rcu_read_unlock();
238 }
239 
240 /**
241  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
242  * @device:	DRBD device.
243  * @number:	number of pages requested
244  * @retry:	whether to retry, if not enough pages are available right now
245  *
246  * Tries to allocate number pages, first from our own page pool, then from
247  * the kernel.
248  * Possibly retry until DRBD frees sufficient pages somewhere else.
249  *
250  * If this allocation would exceed the max_buffers setting, we throttle
251  * allocation (schedule_timeout) to give the system some room to breathe.
252  *
253  * We do not use max-buffers as hard limit, because it could lead to
254  * congestion and further to a distributed deadlock during online-verify or
255  * (checksum based) resync, if the max-buffers, socket buffer sizes and
256  * resync-rate settings are mis-configured.
257  *
258  * Returns a page chain linked via page->private.
259  */
260 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
261 			      bool retry)
262 {
263 	struct drbd_device *device = peer_device->device;
264 	struct page *page = NULL;
265 	struct net_conf *nc;
266 	DEFINE_WAIT(wait);
267 	unsigned int mxb;
268 
269 	rcu_read_lock();
270 	nc = rcu_dereference(peer_device->connection->net_conf);
271 	mxb = nc ? nc->max_buffers : 1000000;
272 	rcu_read_unlock();
273 
274 	if (atomic_read(&device->pp_in_use) < mxb)
275 		page = __drbd_alloc_pages(device, number);
276 
277 	/* Try to keep the fast path fast, but occasionally we need
278 	 * to reclaim the pages we lended to the network stack. */
279 	if (page && atomic_read(&device->pp_in_use_by_net) > 512)
280 		drbd_reclaim_net_peer_reqs(device);
281 
282 	while (page == NULL) {
283 		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
284 
285 		drbd_reclaim_net_peer_reqs(device);
286 
287 		if (atomic_read(&device->pp_in_use) < mxb) {
288 			page = __drbd_alloc_pages(device, number);
289 			if (page)
290 				break;
291 		}
292 
293 		if (!retry)
294 			break;
295 
296 		if (signal_pending(current)) {
297 			drbd_warn(device, "drbd_alloc_pages interrupted!\n");
298 			break;
299 		}
300 
301 		if (schedule_timeout(HZ/10) == 0)
302 			mxb = UINT_MAX;
303 	}
304 	finish_wait(&drbd_pp_wait, &wait);
305 
306 	if (page)
307 		atomic_add(number, &device->pp_in_use);
308 	return page;
309 }
310 
311 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
312  * Is also used from inside an other spin_lock_irq(&resource->req_lock);
313  * Either links the page chain back to the global pool,
314  * or returns all pages to the system. */
315 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
316 {
317 	atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
318 	int i;
319 
320 	if (page == NULL)
321 		return;
322 
323 	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * drbd_minor_count)
324 		i = page_chain_free(page);
325 	else {
326 		struct page *tmp;
327 		tmp = page_chain_tail(page, &i);
328 		spin_lock(&drbd_pp_lock);
329 		page_chain_add(&drbd_pp_pool, page, tmp);
330 		drbd_pp_vacant += i;
331 		spin_unlock(&drbd_pp_lock);
332 	}
333 	i = atomic_sub_return(i, a);
334 	if (i < 0)
335 		drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
336 			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
337 	wake_up(&drbd_pp_wait);
338 }
339 
340 /*
341 You need to hold the req_lock:
342  _drbd_wait_ee_list_empty()
343 
344 You must not have the req_lock:
345  drbd_free_peer_req()
346  drbd_alloc_peer_req()
347  drbd_free_peer_reqs()
348  drbd_ee_fix_bhs()
349  drbd_finish_peer_reqs()
350  drbd_clear_done_ee()
351  drbd_wait_ee_list_empty()
352 */
353 
354 /* normal: payload_size == request size (bi_size)
355  * w_same: payload_size == logical_block_size
356  * trim: payload_size == 0 */
357 struct drbd_peer_request *
358 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
359 		    unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local)
360 {
361 	struct drbd_device *device = peer_device->device;
362 	struct drbd_peer_request *peer_req;
363 	struct page *page = NULL;
364 	unsigned nr_pages = (payload_size + PAGE_SIZE -1) >> PAGE_SHIFT;
365 
366 	if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
367 		return NULL;
368 
369 	peer_req = mempool_alloc(&drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
370 	if (!peer_req) {
371 		if (!(gfp_mask & __GFP_NOWARN))
372 			drbd_err(device, "%s: allocation failed\n", __func__);
373 		return NULL;
374 	}
375 
376 	if (nr_pages) {
377 		page = drbd_alloc_pages(peer_device, nr_pages,
378 					gfpflags_allow_blocking(gfp_mask));
379 		if (!page)
380 			goto fail;
381 	}
382 
383 	memset(peer_req, 0, sizeof(*peer_req));
384 	INIT_LIST_HEAD(&peer_req->w.list);
385 	drbd_clear_interval(&peer_req->i);
386 	peer_req->i.size = request_size;
387 	peer_req->i.sector = sector;
388 	peer_req->submit_jif = jiffies;
389 	peer_req->peer_device = peer_device;
390 	peer_req->pages = page;
391 	/*
392 	 * The block_id is opaque to the receiver.  It is not endianness
393 	 * converted, and sent back to the sender unchanged.
394 	 */
395 	peer_req->block_id = id;
396 
397 	return peer_req;
398 
399  fail:
400 	mempool_free(peer_req, &drbd_ee_mempool);
401 	return NULL;
402 }
403 
404 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
405 		       int is_net)
406 {
407 	might_sleep();
408 	if (peer_req->flags & EE_HAS_DIGEST)
409 		kfree(peer_req->digest);
410 	drbd_free_pages(device, peer_req->pages, is_net);
411 	D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
412 	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
413 	if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
414 		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
415 		drbd_al_complete_io(device, &peer_req->i);
416 	}
417 	mempool_free(peer_req, &drbd_ee_mempool);
418 }
419 
420 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
421 {
422 	LIST_HEAD(work_list);
423 	struct drbd_peer_request *peer_req, *t;
424 	int count = 0;
425 	int is_net = list == &device->net_ee;
426 
427 	spin_lock_irq(&device->resource->req_lock);
428 	list_splice_init(list, &work_list);
429 	spin_unlock_irq(&device->resource->req_lock);
430 
431 	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
432 		__drbd_free_peer_req(device, peer_req, is_net);
433 		count++;
434 	}
435 	return count;
436 }
437 
438 /*
439  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
440  */
441 static int drbd_finish_peer_reqs(struct drbd_device *device)
442 {
443 	LIST_HEAD(work_list);
444 	LIST_HEAD(reclaimed);
445 	struct drbd_peer_request *peer_req, *t;
446 	int err = 0;
447 
448 	spin_lock_irq(&device->resource->req_lock);
449 	reclaim_finished_net_peer_reqs(device, &reclaimed);
450 	list_splice_init(&device->done_ee, &work_list);
451 	spin_unlock_irq(&device->resource->req_lock);
452 
453 	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
454 		drbd_free_net_peer_req(device, peer_req);
455 
456 	/* possible callbacks here:
457 	 * e_end_block, and e_end_resync_block, e_send_superseded.
458 	 * all ignore the last argument.
459 	 */
460 	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
461 		int err2;
462 
463 		/* list_del not necessary, next/prev members not touched */
464 		err2 = peer_req->w.cb(&peer_req->w, !!err);
465 		if (!err)
466 			err = err2;
467 		drbd_free_peer_req(device, peer_req);
468 	}
469 	wake_up(&device->ee_wait);
470 
471 	return err;
472 }
473 
474 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
475 				     struct list_head *head)
476 {
477 	DEFINE_WAIT(wait);
478 
479 	/* avoids spin_lock/unlock
480 	 * and calling prepare_to_wait in the fast path */
481 	while (!list_empty(head)) {
482 		prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
483 		spin_unlock_irq(&device->resource->req_lock);
484 		io_schedule();
485 		finish_wait(&device->ee_wait, &wait);
486 		spin_lock_irq(&device->resource->req_lock);
487 	}
488 }
489 
490 static void drbd_wait_ee_list_empty(struct drbd_device *device,
491 				    struct list_head *head)
492 {
493 	spin_lock_irq(&device->resource->req_lock);
494 	_drbd_wait_ee_list_empty(device, head);
495 	spin_unlock_irq(&device->resource->req_lock);
496 }
497 
498 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
499 {
500 	struct kvec iov = {
501 		.iov_base = buf,
502 		.iov_len = size,
503 	};
504 	struct msghdr msg = {
505 		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
506 	};
507 	iov_iter_kvec(&msg.msg_iter, READ, &iov, 1, size);
508 	return sock_recvmsg(sock, &msg, msg.msg_flags);
509 }
510 
511 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
512 {
513 	int rv;
514 
515 	rv = drbd_recv_short(connection->data.socket, buf, size, 0);
516 
517 	if (rv < 0) {
518 		if (rv == -ECONNRESET)
519 			drbd_info(connection, "sock was reset by peer\n");
520 		else if (rv != -ERESTARTSYS)
521 			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
522 	} else if (rv == 0) {
523 		if (test_bit(DISCONNECT_SENT, &connection->flags)) {
524 			long t;
525 			rcu_read_lock();
526 			t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
527 			rcu_read_unlock();
528 
529 			t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
530 
531 			if (t)
532 				goto out;
533 		}
534 		drbd_info(connection, "sock was shut down by peer\n");
535 	}
536 
537 	if (rv != size)
538 		conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
539 
540 out:
541 	return rv;
542 }
543 
544 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
545 {
546 	int err;
547 
548 	err = drbd_recv(connection, buf, size);
549 	if (err != size) {
550 		if (err >= 0)
551 			err = -EIO;
552 	} else
553 		err = 0;
554 	return err;
555 }
556 
557 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
558 {
559 	int err;
560 
561 	err = drbd_recv_all(connection, buf, size);
562 	if (err && !signal_pending(current))
563 		drbd_warn(connection, "short read (expected size %d)\n", (int)size);
564 	return err;
565 }
566 
567 /* quoting tcp(7):
568  *   On individual connections, the socket buffer size must be set prior to the
569  *   listen(2) or connect(2) calls in order to have it take effect.
570  * This is our wrapper to do so.
571  */
572 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
573 		unsigned int rcv)
574 {
575 	/* open coded SO_SNDBUF, SO_RCVBUF */
576 	if (snd) {
577 		sock->sk->sk_sndbuf = snd;
578 		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
579 	}
580 	if (rcv) {
581 		sock->sk->sk_rcvbuf = rcv;
582 		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
583 	}
584 }
585 
/*
 * Make one attempt to open an outgoing TCP connection to the peer.
 *
 * Buffer sizes and connect_int are copied from net_conf under RCU.  The
 * socket is bound to the configured local address with the port forced to
 * 0, and connect_int seconds are used as send and receive timeout.
 *
 * Returns the connected socket, or NULL on failure (also when there is no
 * net_conf).  Error codes listed in the switch below are silent "try again
 * later" conditions.  Any other error is logged; if it happened before the
 * connect() call, C_DISCONNECTING is requested as well.
 */
static struct socket *drbd_try_connect(struct drbd_connection *connection)
{
	struct sockaddr_in6 src_in6;
	struct sockaddr_in6 peer_in6;
	int sndbuf_size, rcvbuf_size, connect_int;
	int err, peer_addr_len, my_addr_len;
	int disconnect_on_error = 1;
	struct socket *sock;
	struct net_conf *nc;
	const char *what;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return NULL;
	}
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	connect_int = nc->connect_int;
	rcu_read_unlock();

	/* local copies of both addresses; the source port gets cleared */
	my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
	memcpy(&src_in6, &connection->my_addr, my_addr_len);

	if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(peer_in6));
	memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);

	what = "sock_create_kern";
	err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	sock->sk->sk_sndtimeo = connect_int * HZ;
	sock->sk->sk_rcvtimeo = sock->sk->sk_sndtimeo;
	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);

	/* explicitly bind to the configured IP as source IP
	 *  for the outgoing connections.
	 *  This is needed for multihomed hosts and to be
	 *  able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 *  a free one dynamically.
	 */
	what = "bind before connect";
	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	what = "connect";
	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);

out:
	if (err >= 0)
		return sock;

	if (sock)
		sock_release(sock);

	switch (-err) {
	/* timeout, busy, signal pending */
	case ETIMEDOUT:
	case EAGAIN:
	case EINPROGRESS:
	case EINTR:
	case ERESTARTSYS:
	/* peer not (yet) available, network problem */
	case ECONNREFUSED:
	case ENETUNREACH:
	case EHOSTDOWN:
	case EHOSTUNREACH:
		disconnect_on_error = 0;
		break;
	default:
		drbd_err(connection, "%s failed, err = %d\n", what, err);
	}

	if (disconnect_on_error)
		conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);

	return NULL;
}
673 
674 struct accept_wait_data {
675 	struct drbd_connection *connection;
676 	struct socket *s_listen;
677 	struct completion door_bell;
678 	void (*original_sk_state_change)(struct sock *sk);
679 
680 };
681 
682 static void drbd_incoming_connection(struct sock *sk)
683 {
684 	struct accept_wait_data *ad = sk->sk_user_data;
685 	void (*state_change)(struct sock *sk);
686 
687 	state_change = ad->original_sk_state_change;
688 	if (sk->sk_state == TCP_ESTABLISHED)
689 		complete(&ad->door_bell);
690 	state_change(sk);
691 }
692 
693 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
694 {
695 	int err, sndbuf_size, rcvbuf_size, my_addr_len;
696 	struct sockaddr_in6 my_addr;
697 	struct socket *s_listen;
698 	struct net_conf *nc;
699 	const char *what;
700 
701 	rcu_read_lock();
702 	nc = rcu_dereference(connection->net_conf);
703 	if (!nc) {
704 		rcu_read_unlock();
705 		return -EIO;
706 	}
707 	sndbuf_size = nc->sndbuf_size;
708 	rcvbuf_size = nc->rcvbuf_size;
709 	rcu_read_unlock();
710 
711 	my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
712 	memcpy(&my_addr, &connection->my_addr, my_addr_len);
713 
714 	what = "sock_create_kern";
715 	err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
716 			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
717 	if (err) {
718 		s_listen = NULL;
719 		goto out;
720 	}
721 
722 	s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
723 	drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
724 
725 	what = "bind before listen";
726 	err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
727 	if (err < 0)
728 		goto out;
729 
730 	ad->s_listen = s_listen;
731 	write_lock_bh(&s_listen->sk->sk_callback_lock);
732 	ad->original_sk_state_change = s_listen->sk->sk_state_change;
733 	s_listen->sk->sk_state_change = drbd_incoming_connection;
734 	s_listen->sk->sk_user_data = ad;
735 	write_unlock_bh(&s_listen->sk->sk_callback_lock);
736 
737 	what = "listen";
738 	err = s_listen->ops->listen(s_listen, 5);
739 	if (err < 0)
740 		goto out;
741 
742 	return 0;
743 out:
744 	if (s_listen)
745 		sock_release(s_listen);
746 	if (err < 0) {
747 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
748 			drbd_err(connection, "%s failed, err = %d\n", what, err);
749 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
750 		}
751 	}
752 
753 	return -EIO;
754 }
755 
756 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
757 {
758 	write_lock_bh(&sk->sk_callback_lock);
759 	sk->sk_state_change = ad->original_sk_state_change;
760 	sk->sk_user_data = NULL;
761 	write_unlock_bh(&sk->sk_callback_lock);
762 }
763 
764 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
765 {
766 	int timeo, connect_int, err = 0;
767 	struct socket *s_estab = NULL;
768 	struct net_conf *nc;
769 
770 	rcu_read_lock();
771 	nc = rcu_dereference(connection->net_conf);
772 	if (!nc) {
773 		rcu_read_unlock();
774 		return NULL;
775 	}
776 	connect_int = nc->connect_int;
777 	rcu_read_unlock();
778 
779 	timeo = connect_int * HZ;
780 	/* 28.5% random jitter */
781 	timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
782 
783 	err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
784 	if (err <= 0)
785 		return NULL;
786 
787 	err = kernel_accept(ad->s_listen, &s_estab, 0);
788 	if (err < 0) {
789 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
790 			drbd_err(connection, "accept failed, err = %d\n", err);
791 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
792 		}
793 	}
794 
795 	if (s_estab)
796 		unregister_state_change(s_estab->sk, ad);
797 
798 	return s_estab;
799 }
800 
/* Forward declaration; receive_first_packet() needs it before the definition. */
static int decode_header(struct drbd_connection *connection, void *header,
			 struct packet_info *pi);
802 
803 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
804 			     enum drbd_packet cmd)
805 {
806 	if (!conn_prepare_command(connection, sock))
807 		return -EIO;
808 	return conn_send_command(connection, sock, cmd, 0, NULL, 0);
809 }
810 
811 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
812 {
813 	unsigned int header_size = drbd_header_size(connection);
814 	struct packet_info pi;
815 	struct net_conf *nc;
816 	int err;
817 
818 	rcu_read_lock();
819 	nc = rcu_dereference(connection->net_conf);
820 	if (!nc) {
821 		rcu_read_unlock();
822 		return -EIO;
823 	}
824 	sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
825 	rcu_read_unlock();
826 
827 	err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
828 	if (err != header_size) {
829 		if (err >= 0)
830 			err = -EIO;
831 		return err;
832 	}
833 	err = decode_header(connection, connection->data.rbuf, &pi);
834 	if (err)
835 		return err;
836 	return pi.cmd;
837 }
838 
839 /**
840  * drbd_socket_okay() - Free the socket if its connection is not okay
841  * @sock:	pointer to the pointer to the socket.
842  */
843 static bool drbd_socket_okay(struct socket **sock)
844 {
845 	int rr;
846 	char tb[4];
847 
848 	if (!*sock)
849 		return false;
850 
851 	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
852 
853 	if (rr > 0 || rr == -EAGAIN) {
854 		return true;
855 	} else {
856 		sock_release(*sock);
857 		*sock = NULL;
858 		return false;
859 	}
860 }
861 
862 static bool connection_established(struct drbd_connection *connection,
863 				   struct socket **sock1,
864 				   struct socket **sock2)
865 {
866 	struct net_conf *nc;
867 	int timeout;
868 	bool ok;
869 
870 	if (!*sock1 || !*sock2)
871 		return false;
872 
873 	rcu_read_lock();
874 	nc = rcu_dereference(connection->net_conf);
875 	timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
876 	rcu_read_unlock();
877 	schedule_timeout_interruptible(timeout);
878 
879 	ok = drbd_socket_okay(sock1);
880 	ok = drbd_socket_okay(sock2) && ok;
881 
882 	return ok;
883 }
884 
885 /* Gets called if a connection is established, or if a new minor gets created
886    in a connection */
887 int drbd_connected(struct drbd_peer_device *peer_device)
888 {
889 	struct drbd_device *device = peer_device->device;
890 	int err;
891 
892 	atomic_set(&device->packet_seq, 0);
893 	device->peer_seq = 0;
894 
895 	device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
896 		&peer_device->connection->cstate_mutex :
897 		&device->own_state_mutex;
898 
899 	err = drbd_send_sync_param(peer_device);
900 	if (!err)
901 		err = drbd_send_sizes(peer_device, 0, 0);
902 	if (!err)
903 		err = drbd_send_uuids(peer_device);
904 	if (!err)
905 		err = drbd_send_current_state(peer_device);
906 	clear_bit(USE_DEGR_WFC_T, &device->flags);
907 	clear_bit(RESIZE_PENDING, &device->flags);
908 	atomic_set(&device->ap_in_flight, 0);
909 	mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
910 	return err;
911 }
912 
913 /*
914  * return values:
915  *   1 yes, we have a valid connection
916  *   0 oops, did not work out, please try again
917  *  -1 peer talks different language,
918  *     no point in trying again, please go standalone.
919  *  -2 We do not have a network config...
920  */
921 static int conn_connect(struct drbd_connection *connection)
922 {
923 	struct drbd_socket sock, msock;
924 	struct drbd_peer_device *peer_device;
925 	struct net_conf *nc;
926 	int vnr, timeout, h;
927 	bool discard_my_data, ok;
928 	enum drbd_state_rv rv;
929 	struct accept_wait_data ad = {
930 		.connection = connection,
931 		.door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
932 	};
933 
934 	clear_bit(DISCONNECT_SENT, &connection->flags);
935 	if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
936 		return -2;
937 
938 	mutex_init(&sock.mutex);
939 	sock.sbuf = connection->data.sbuf;
940 	sock.rbuf = connection->data.rbuf;
941 	sock.socket = NULL;
942 	mutex_init(&msock.mutex);
943 	msock.sbuf = connection->meta.sbuf;
944 	msock.rbuf = connection->meta.rbuf;
945 	msock.socket = NULL;
946 
947 	/* Assume that the peer only understands protocol 80 until we know better.  */
948 	connection->agreed_pro_version = 80;
949 
950 	if (prepare_listen_socket(connection, &ad))
951 		return 0;
952 
953 	do {
954 		struct socket *s;
955 
956 		s = drbd_try_connect(connection);
957 		if (s) {
958 			if (!sock.socket) {
959 				sock.socket = s;
960 				send_first_packet(connection, &sock, P_INITIAL_DATA);
961 			} else if (!msock.socket) {
962 				clear_bit(RESOLVE_CONFLICTS, &connection->flags);
963 				msock.socket = s;
964 				send_first_packet(connection, &msock, P_INITIAL_META);
965 			} else {
966 				drbd_err(connection, "Logic error in conn_connect()\n");
967 				goto out_release_sockets;
968 			}
969 		}
970 
971 		if (connection_established(connection, &sock.socket, &msock.socket))
972 			break;
973 
974 retry:
975 		s = drbd_wait_for_connect(connection, &ad);
976 		if (s) {
977 			int fp = receive_first_packet(connection, s);
978 			drbd_socket_okay(&sock.socket);
979 			drbd_socket_okay(&msock.socket);
980 			switch (fp) {
981 			case P_INITIAL_DATA:
982 				if (sock.socket) {
983 					drbd_warn(connection, "initial packet S crossed\n");
984 					sock_release(sock.socket);
985 					sock.socket = s;
986 					goto randomize;
987 				}
988 				sock.socket = s;
989 				break;
990 			case P_INITIAL_META:
991 				set_bit(RESOLVE_CONFLICTS, &connection->flags);
992 				if (msock.socket) {
993 					drbd_warn(connection, "initial packet M crossed\n");
994 					sock_release(msock.socket);
995 					msock.socket = s;
996 					goto randomize;
997 				}
998 				msock.socket = s;
999 				break;
1000 			default:
1001 				drbd_warn(connection, "Error receiving initial packet\n");
1002 				sock_release(s);
1003 randomize:
1004 				if (prandom_u32() & 1)
1005 					goto retry;
1006 			}
1007 		}
1008 
1009 		if (connection->cstate <= C_DISCONNECTING)
1010 			goto out_release_sockets;
1011 		if (signal_pending(current)) {
1012 			flush_signals(current);
1013 			smp_rmb();
1014 			if (get_t_state(&connection->receiver) == EXITING)
1015 				goto out_release_sockets;
1016 		}
1017 
1018 		ok = connection_established(connection, &sock.socket, &msock.socket);
1019 	} while (!ok);
1020 
1021 	if (ad.s_listen)
1022 		sock_release(ad.s_listen);
1023 
1024 	sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1025 	msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1026 
1027 	sock.socket->sk->sk_allocation = GFP_NOIO;
1028 	msock.socket->sk->sk_allocation = GFP_NOIO;
1029 
1030 	sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1031 	msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1032 
1033 	/* NOT YET ...
1034 	 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1035 	 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1036 	 * first set it to the P_CONNECTION_FEATURES timeout,
1037 	 * which we set to 4x the configured ping_timeout. */
1038 	rcu_read_lock();
1039 	nc = rcu_dereference(connection->net_conf);
1040 
1041 	sock.socket->sk->sk_sndtimeo =
1042 	sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1043 
1044 	msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1045 	timeout = nc->timeout * HZ / 10;
1046 	discard_my_data = nc->discard_my_data;
1047 	rcu_read_unlock();
1048 
1049 	msock.socket->sk->sk_sndtimeo = timeout;
1050 
1051 	/* we don't want delays.
1052 	 * we use TCP_CORK where appropriate, though */
1053 	drbd_tcp_nodelay(sock.socket);
1054 	drbd_tcp_nodelay(msock.socket);
1055 
1056 	connection->data.socket = sock.socket;
1057 	connection->meta.socket = msock.socket;
1058 	connection->last_received = jiffies;
1059 
1060 	h = drbd_do_features(connection);
1061 	if (h <= 0)
1062 		return h;
1063 
1064 	if (connection->cram_hmac_tfm) {
1065 		/* drbd_request_state(device, NS(conn, WFAuth)); */
1066 		switch (drbd_do_auth(connection)) {
1067 		case -1:
1068 			drbd_err(connection, "Authentication of peer failed\n");
1069 			return -1;
1070 		case 0:
1071 			drbd_err(connection, "Authentication of peer failed, trying again.\n");
1072 			return 0;
1073 		}
1074 	}
1075 
1076 	connection->data.socket->sk->sk_sndtimeo = timeout;
1077 	connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1078 
1079 	if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1080 		return -1;
1081 
1082 	/* Prevent a race between resync-handshake and
1083 	 * being promoted to Primary.
1084 	 *
1085 	 * Grab and release the state mutex, so we know that any current
1086 	 * drbd_set_role() is finished, and any incoming drbd_set_role
1087 	 * will see the STATE_SENT flag, and wait for it to be cleared.
1088 	 */
1089 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1090 		mutex_lock(peer_device->device->state_mutex);
1091 
1092 	/* avoid a race with conn_request_state( C_DISCONNECTING ) */
1093 	spin_lock_irq(&connection->resource->req_lock);
1094 	set_bit(STATE_SENT, &connection->flags);
1095 	spin_unlock_irq(&connection->resource->req_lock);
1096 
1097 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1098 		mutex_unlock(peer_device->device->state_mutex);
1099 
1100 	rcu_read_lock();
1101 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1102 		struct drbd_device *device = peer_device->device;
1103 		kref_get(&device->kref);
1104 		rcu_read_unlock();
1105 
1106 		if (discard_my_data)
1107 			set_bit(DISCARD_MY_DATA, &device->flags);
1108 		else
1109 			clear_bit(DISCARD_MY_DATA, &device->flags);
1110 
1111 		drbd_connected(peer_device);
1112 		kref_put(&device->kref, drbd_destroy_device);
1113 		rcu_read_lock();
1114 	}
1115 	rcu_read_unlock();
1116 
1117 	rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1118 	if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1119 		clear_bit(STATE_SENT, &connection->flags);
1120 		return 0;
1121 	}
1122 
1123 	drbd_thread_start(&connection->ack_receiver);
1124 	/* opencoded create_singlethread_workqueue(),
1125 	 * to be able to use format string arguments */
1126 	connection->ack_sender =
1127 		alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
1128 	if (!connection->ack_sender) {
1129 		drbd_err(connection, "Failed to create workqueue ack_sender\n");
1130 		return 0;
1131 	}
1132 
1133 	mutex_lock(&connection->resource->conf_update);
1134 	/* The discard_my_data flag is a single-shot modifier to the next
1135 	 * connection attempt, the handshake of which is now well underway.
1136 	 * No need for rcu style copying of the whole struct
1137 	 * just to clear a single value. */
1138 	connection->net_conf->discard_my_data = 0;
1139 	mutex_unlock(&connection->resource->conf_update);
1140 
1141 	return h;
1142 
1143 out_release_sockets:
1144 	if (ad.s_listen)
1145 		sock_release(ad.s_listen);
1146 	if (sock.socket)
1147 		sock_release(sock.socket);
1148 	if (msock.socket)
1149 		sock_release(msock.socket);
1150 	return -1;
1151 }
1152 
1153 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1154 {
1155 	unsigned int header_size = drbd_header_size(connection);
1156 
1157 	if (header_size == sizeof(struct p_header100) &&
1158 	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1159 		struct p_header100 *h = header;
1160 		if (h->pad != 0) {
1161 			drbd_err(connection, "Header padding is not zero\n");
1162 			return -EINVAL;
1163 		}
1164 		pi->vnr = be16_to_cpu(h->volume);
1165 		pi->cmd = be16_to_cpu(h->command);
1166 		pi->size = be32_to_cpu(h->length);
1167 	} else if (header_size == sizeof(struct p_header95) &&
1168 		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1169 		struct p_header95 *h = header;
1170 		pi->cmd = be16_to_cpu(h->command);
1171 		pi->size = be32_to_cpu(h->length);
1172 		pi->vnr = 0;
1173 	} else if (header_size == sizeof(struct p_header80) &&
1174 		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1175 		struct p_header80 *h = header;
1176 		pi->cmd = be16_to_cpu(h->command);
1177 		pi->size = be16_to_cpu(h->length);
1178 		pi->vnr = 0;
1179 	} else {
1180 		drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1181 			 be32_to_cpu(*(__be32 *)header),
1182 			 connection->agreed_pro_version);
1183 		return -EINVAL;
1184 	}
1185 	pi->data = header + header_size;
1186 	return 0;
1187 }
1188 
1189 static void drbd_unplug_all_devices(struct drbd_connection *connection)
1190 {
1191 	if (current->plug == &connection->receiver_plug) {
1192 		blk_finish_plug(&connection->receiver_plug);
1193 		blk_start_plug(&connection->receiver_plug);
1194 	} /* else: maybe just schedule() ?? */
1195 }
1196 
1197 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1198 {
1199 	void *buffer = connection->data.rbuf;
1200 	int err;
1201 
1202 	err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1203 	if (err)
1204 		return err;
1205 
1206 	err = decode_header(connection, buffer, pi);
1207 	connection->last_received = jiffies;
1208 
1209 	return err;
1210 }
1211 
1212 static int drbd_recv_header_maybe_unplug(struct drbd_connection *connection, struct packet_info *pi)
1213 {
1214 	void *buffer = connection->data.rbuf;
1215 	unsigned int size = drbd_header_size(connection);
1216 	int err;
1217 
1218 	err = drbd_recv_short(connection->data.socket, buffer, size, MSG_NOSIGNAL|MSG_DONTWAIT);
1219 	if (err != size) {
1220 		/* If we have nothing in the receive buffer now, to reduce
1221 		 * application latency, try to drain the backend queues as
1222 		 * quickly as possible, and let remote TCP know what we have
1223 		 * received so far. */
1224 		if (err == -EAGAIN) {
1225 			drbd_tcp_quickack(connection->data.socket);
1226 			drbd_unplug_all_devices(connection);
1227 		}
1228 		if (err > 0) {
1229 			buffer += err;
1230 			size -= err;
1231 		}
1232 		err = drbd_recv_all_warn(connection, buffer, size);
1233 		if (err)
1234 			return err;
1235 	}
1236 
1237 	err = decode_header(connection, connection->data.rbuf, pi);
1238 	connection->last_received = jiffies;
1239 
1240 	return err;
1241 }
/* This is blkdev_issue_flush, but asynchronous.
 * We want to submit to all component volumes in parallel,
 * then wait for all completions.
 */
/* State shared by all flush bios issued from one drbd_flush() call. */
struct issue_flush_context {
	/* drbd_flush() starts this at 1 and drops that count after submitting;
	 * submit_one_flush() adds one per submitted bio,
	 * one_flush_endio() drops one per completed bio. */
	atomic_t pending;
	/* 0, or an errno from a failed flush bio or a failed allocation */
	int error;
	/* completed by one_flush_endio() when pending drops to zero */
	struct completion done;
};
/* Per-bio context, passed to one_flush_endio() via bio->bi_private
 * and freed there. */
struct one_flush_context {
	/* device whose backing device gets flushed */
	struct drbd_device *device;
	/* context shared with the other flushes of the same drbd_flush() call */
	struct issue_flush_context *ctx;
};
1255 
1256 static void one_flush_endio(struct bio *bio)
1257 {
1258 	struct one_flush_context *octx = bio->bi_private;
1259 	struct drbd_device *device = octx->device;
1260 	struct issue_flush_context *ctx = octx->ctx;
1261 
1262 	if (bio->bi_status) {
1263 		ctx->error = blk_status_to_errno(bio->bi_status);
1264 		drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_status);
1265 	}
1266 	kfree(octx);
1267 	bio_put(bio);
1268 
1269 	clear_bit(FLUSH_PENDING, &device->flags);
1270 	put_ldev(device);
1271 	kref_put(&device->kref, drbd_destroy_device);
1272 
1273 	if (atomic_dec_and_test(&ctx->pending))
1274 		complete(&ctx->done);
1275 }
1276 
1277 static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
1278 {
1279 	struct bio *bio = bio_alloc(GFP_NOIO, 0);
1280 	struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);
1281 	if (!bio || !octx) {
1282 		drbd_warn(device, "Could not allocate a bio, CANNOT ISSUE FLUSH\n");
1283 		/* FIXME: what else can I do now?  disconnecting or detaching
1284 		 * really does not help to improve the state of the world, either.
1285 		 */
1286 		kfree(octx);
1287 		if (bio)
1288 			bio_put(bio);
1289 
1290 		ctx->error = -ENOMEM;
1291 		put_ldev(device);
1292 		kref_put(&device->kref, drbd_destroy_device);
1293 		return;
1294 	}
1295 
1296 	octx->device = device;
1297 	octx->ctx = ctx;
1298 	bio_set_dev(bio, device->ldev->backing_bdev);
1299 	bio->bi_private = octx;
1300 	bio->bi_end_io = one_flush_endio;
1301 	bio->bi_opf = REQ_OP_FLUSH | REQ_PREFLUSH;
1302 
1303 	device->flush_jif = jiffies;
1304 	set_bit(FLUSH_PENDING, &device->flags);
1305 	atomic_inc(&ctx->pending);
1306 	submit_bio(bio);
1307 }
1308 
1309 static void drbd_flush(struct drbd_connection *connection)
1310 {
1311 	if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1312 		struct drbd_peer_device *peer_device;
1313 		struct issue_flush_context ctx;
1314 		int vnr;
1315 
1316 		atomic_set(&ctx.pending, 1);
1317 		ctx.error = 0;
1318 		init_completion(&ctx.done);
1319 
1320 		rcu_read_lock();
1321 		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1322 			struct drbd_device *device = peer_device->device;
1323 
1324 			if (!get_ldev(device))
1325 				continue;
1326 			kref_get(&device->kref);
1327 			rcu_read_unlock();
1328 
1329 			submit_one_flush(device, &ctx);
1330 
1331 			rcu_read_lock();
1332 		}
1333 		rcu_read_unlock();
1334 
1335 		/* Do we want to add a timeout,
1336 		 * if disk-timeout is set? */
1337 		if (!atomic_dec_and_test(&ctx.pending))
1338 			wait_for_completion(&ctx.done);
1339 
1340 		if (ctx.error) {
1341 			/* would rather check on EOPNOTSUPP, but that is not reliable.
1342 			 * don't try again for ANY return value != 0
1343 			 * if (rv == -EOPNOTSUPP) */
1344 			/* Any error is already reported by bio_endio callback. */
1345 			drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1346 		}
1347 	}
1348 }
1349 
1350 /**
1351  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1352  * @device:	DRBD device.
1353  * @epoch:	Epoch object.
1354  * @ev:		Epoch event.
1355  */
1356 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1357 					       struct drbd_epoch *epoch,
1358 					       enum epoch_event ev)
1359 {
1360 	int epoch_size;
1361 	struct drbd_epoch *next_epoch;
1362 	enum finish_epoch rv = FE_STILL_LIVE;
1363 
1364 	spin_lock(&connection->epoch_lock);
1365 	do {
1366 		next_epoch = NULL;
1367 
1368 		epoch_size = atomic_read(&epoch->epoch_size);
1369 
1370 		switch (ev & ~EV_CLEANUP) {
1371 		case EV_PUT:
1372 			atomic_dec(&epoch->active);
1373 			break;
1374 		case EV_GOT_BARRIER_NR:
1375 			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1376 			break;
1377 		case EV_BECAME_LAST:
1378 			/* nothing to do*/
1379 			break;
1380 		}
1381 
1382 		if (epoch_size != 0 &&
1383 		    atomic_read(&epoch->active) == 0 &&
1384 		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1385 			if (!(ev & EV_CLEANUP)) {
1386 				spin_unlock(&connection->epoch_lock);
1387 				drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1388 				spin_lock(&connection->epoch_lock);
1389 			}
1390 #if 0
1391 			/* FIXME: dec unacked on connection, once we have
1392 			 * something to count pending connection packets in. */
1393 			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1394 				dec_unacked(epoch->connection);
1395 #endif
1396 
1397 			if (connection->current_epoch != epoch) {
1398 				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1399 				list_del(&epoch->list);
1400 				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1401 				connection->epochs--;
1402 				kfree(epoch);
1403 
1404 				if (rv == FE_STILL_LIVE)
1405 					rv = FE_DESTROYED;
1406 			} else {
1407 				epoch->flags = 0;
1408 				atomic_set(&epoch->epoch_size, 0);
1409 				/* atomic_set(&epoch->active, 0); is already zero */
1410 				if (rv == FE_STILL_LIVE)
1411 					rv = FE_RECYCLED;
1412 			}
1413 		}
1414 
1415 		if (!next_epoch)
1416 			break;
1417 
1418 		epoch = next_epoch;
1419 	} while (1);
1420 
1421 	spin_unlock(&connection->epoch_lock);
1422 
1423 	return rv;
1424 }
1425 
1426 static enum write_ordering_e
1427 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1428 {
1429 	struct disk_conf *dc;
1430 
1431 	dc = rcu_dereference(bdev->disk_conf);
1432 
1433 	if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1434 		wo = WO_DRAIN_IO;
1435 	if (wo == WO_DRAIN_IO && !dc->disk_drain)
1436 		wo = WO_NONE;
1437 
1438 	return wo;
1439 }
1440 
1441 /**
1442  * drbd_bump_write_ordering() - Fall back to an other write ordering method
1443  * @connection:	DRBD connection.
1444  * @wo:		Write ordering method to try.
1445  */
1446 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1447 			      enum write_ordering_e wo)
1448 {
1449 	struct drbd_device *device;
1450 	enum write_ordering_e pwo;
1451 	int vnr;
1452 	static char *write_ordering_str[] = {
1453 		[WO_NONE] = "none",
1454 		[WO_DRAIN_IO] = "drain",
1455 		[WO_BDEV_FLUSH] = "flush",
1456 	};
1457 
1458 	pwo = resource->write_ordering;
1459 	if (wo != WO_BDEV_FLUSH)
1460 		wo = min(pwo, wo);
1461 	rcu_read_lock();
1462 	idr_for_each_entry(&resource->devices, device, vnr) {
1463 		if (get_ldev(device)) {
1464 			wo = max_allowed_wo(device->ldev, wo);
1465 			if (device->ldev == bdev)
1466 				bdev = NULL;
1467 			put_ldev(device);
1468 		}
1469 	}
1470 
1471 	if (bdev)
1472 		wo = max_allowed_wo(bdev, wo);
1473 
1474 	rcu_read_unlock();
1475 
1476 	resource->write_ordering = wo;
1477 	if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1478 		drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1479 }
1480 
1481 /*
1482  * Mapping "discard" to ZEROOUT with UNMAP does not work for us:
1483  * Drivers have to "announce" q->limits.max_write_zeroes_sectors, or it
1484  * will directly go to fallback mode, submitting normal writes, and
1485  * never even try to UNMAP.
1486  *
1487  * And dm-thin does not do this (yet), mostly because in general it has
1488  * to assume that "skip_block_zeroing" is set.  See also:
1489  * https://www.mail-archive.com/dm-devel%40redhat.com/msg07965.html
1490  * https://www.redhat.com/archives/dm-devel/2018-January/msg00271.html
1491  *
1492  * We *may* ignore the discard-zeroes-data setting, if so configured.
1493  *
1494  * Assumption is that this "discard_zeroes_data=0" is only because the backend
1495  * may ignore partial unaligned discards.
1496  *
1497  * LVM/DM thin as of at least
1498  *   LVM version:     2.02.115(2)-RHEL7 (2015-01-28)
1499  *   Library version: 1.02.93-RHEL7 (2015-01-28)
1500  *   Driver version:  4.29.0
1501  * still behaves this way.
1502  *
1503  * For unaligned (wrt. alignment and granularity) or too small discards,
1504  * we zero-out the initial (and/or) trailing unaligned partial chunks,
1505  * but discard all the aligned full chunks.
1506  *
1507  * At least for LVM/DM thin, with skip_block_zeroing=false,
1508  * the result is effectively "discard_zeroes_data=1".
1509  */
1510 /* flags: EE_TRIM|EE_ZEROOUT */
1511 int drbd_issue_discard_or_zero_out(struct drbd_device *device, sector_t start, unsigned int nr_sectors, int flags)
1512 {
1513 	struct block_device *bdev = device->ldev->backing_bdev;
1514 	struct request_queue *q = bdev_get_queue(bdev);
1515 	sector_t tmp, nr;
1516 	unsigned int max_discard_sectors, granularity;
1517 	int alignment;
1518 	int err = 0;
1519 
1520 	if ((flags & EE_ZEROOUT) || !(flags & EE_TRIM))
1521 		goto zero_out;
1522 
1523 	/* Zero-sector (unknown) and one-sector granularities are the same.  */
1524 	granularity = max(q->limits.discard_granularity >> 9, 1U);
1525 	alignment = (bdev_discard_alignment(bdev) >> 9) % granularity;
1526 
1527 	max_discard_sectors = min(q->limits.max_discard_sectors, (1U << 22));
1528 	max_discard_sectors -= max_discard_sectors % granularity;
1529 	if (unlikely(!max_discard_sectors))
1530 		goto zero_out;
1531 
1532 	if (nr_sectors < granularity)
1533 		goto zero_out;
1534 
1535 	tmp = start;
1536 	if (sector_div(tmp, granularity) != alignment) {
1537 		if (nr_sectors < 2*granularity)
1538 			goto zero_out;
1539 		/* start + gran - (start + gran - align) % gran */
1540 		tmp = start + granularity - alignment;
1541 		tmp = start + granularity - sector_div(tmp, granularity);
1542 
1543 		nr = tmp - start;
1544 		/* don't flag BLKDEV_ZERO_NOUNMAP, we don't know how many
1545 		 * layers are below us, some may have smaller granularity */
1546 		err |= blkdev_issue_zeroout(bdev, start, nr, GFP_NOIO, 0);
1547 		nr_sectors -= nr;
1548 		start = tmp;
1549 	}
1550 	while (nr_sectors >= max_discard_sectors) {
1551 		err |= blkdev_issue_discard(bdev, start, max_discard_sectors, GFP_NOIO, 0);
1552 		nr_sectors -= max_discard_sectors;
1553 		start += max_discard_sectors;
1554 	}
1555 	if (nr_sectors) {
1556 		/* max_discard_sectors is unsigned int (and a multiple of
1557 		 * granularity, we made sure of that above already);
1558 		 * nr is < max_discard_sectors;
1559 		 * I don't need sector_div here, even though nr is sector_t */
1560 		nr = nr_sectors;
1561 		nr -= (unsigned int)nr % granularity;
1562 		if (nr) {
1563 			err |= blkdev_issue_discard(bdev, start, nr, GFP_NOIO, 0);
1564 			nr_sectors -= nr;
1565 			start += nr;
1566 		}
1567 	}
1568  zero_out:
1569 	if (nr_sectors) {
1570 		err |= blkdev_issue_zeroout(bdev, start, nr_sectors, GFP_NOIO,
1571 				(flags & EE_TRIM) ? 0 : BLKDEV_ZERO_NOUNMAP);
1572 	}
1573 	return err != 0;
1574 }
1575 
1576 static bool can_do_reliable_discards(struct drbd_device *device)
1577 {
1578 	struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
1579 	struct disk_conf *dc;
1580 	bool can_do;
1581 
1582 	if (!blk_queue_discard(q))
1583 		return false;
1584 
1585 	rcu_read_lock();
1586 	dc = rcu_dereference(device->ldev->disk_conf);
1587 	can_do = dc->discard_zeroes_if_aligned;
1588 	rcu_read_unlock();
1589 	return can_do;
1590 }
1591 
1592 static void drbd_issue_peer_discard_or_zero_out(struct drbd_device *device, struct drbd_peer_request *peer_req)
1593 {
1594 	/* If the backend cannot discard, or does not guarantee
1595 	 * read-back zeroes in discarded ranges, we fall back to
1596 	 * zero-out.  Unless configuration specifically requested
1597 	 * otherwise. */
1598 	if (!can_do_reliable_discards(device))
1599 		peer_req->flags |= EE_ZEROOUT;
1600 
1601 	if (drbd_issue_discard_or_zero_out(device, peer_req->i.sector,
1602 	    peer_req->i.size >> 9, peer_req->flags & (EE_ZEROOUT|EE_TRIM)))
1603 		peer_req->flags |= EE_WAS_ERROR;
1604 	drbd_endio_write_sec_final(peer_req);
1605 }
1606 
1607 static void drbd_issue_peer_wsame(struct drbd_device *device,
1608 				  struct drbd_peer_request *peer_req)
1609 {
1610 	struct block_device *bdev = device->ldev->backing_bdev;
1611 	sector_t s = peer_req->i.sector;
1612 	sector_t nr = peer_req->i.size >> 9;
1613 	if (blkdev_issue_write_same(bdev, s, nr, GFP_NOIO, peer_req->pages))
1614 		peer_req->flags |= EE_WAS_ERROR;
1615 	drbd_endio_write_sec_final(peer_req);
1616 }
1617 
1618 
1619 /**
1620  * drbd_submit_peer_request()
1621  * @device:	DRBD device.
1622  * @peer_req:	peer request
1623  * @rw:		flag field, see bio->bi_opf
1624  *
1625  * May spread the pages to multiple bios,
1626  * depending on bio_add_page restrictions.
1627  *
1628  * Returns 0 if all bios have been submitted,
1629  * -ENOMEM if we could not allocate enough bios,
1630  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1631  *  single page to an empty bio (which should never happen and likely indicates
1632  *  that the lower level IO stack is in some way broken). This has been observed
1633  *  on certain Xen deployments.
1634  */
1635 /* TODO allocate from our own bio_set. */
1636 int drbd_submit_peer_request(struct drbd_device *device,
1637 			     struct drbd_peer_request *peer_req,
1638 			     const unsigned op, const unsigned op_flags,
1639 			     const int fault_type)
1640 {
1641 	struct bio *bios = NULL;
1642 	struct bio *bio;
1643 	struct page *page = peer_req->pages;
1644 	sector_t sector = peer_req->i.sector;
1645 	unsigned data_size = peer_req->i.size;
1646 	unsigned n_bios = 0;
1647 	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1648 	int err = -ENOMEM;
1649 
1650 	/* TRIM/DISCARD: for now, always use the helper function
1651 	 * blkdev_issue_zeroout(..., discard=true).
1652 	 * It's synchronous, but it does the right thing wrt. bio splitting.
1653 	 * Correctness first, performance later.  Next step is to code an
1654 	 * asynchronous variant of the same.
1655 	 */
1656 	if (peer_req->flags & (EE_TRIM|EE_WRITE_SAME|EE_ZEROOUT)) {
1657 		/* wait for all pending IO completions, before we start
1658 		 * zeroing things out. */
1659 		conn_wait_active_ee_empty(peer_req->peer_device->connection);
1660 		/* add it to the active list now,
1661 		 * so we can find it to present it in debugfs */
1662 		peer_req->submit_jif = jiffies;
1663 		peer_req->flags |= EE_SUBMITTED;
1664 
1665 		/* If this was a resync request from receive_rs_deallocated(),
1666 		 * it is already on the sync_ee list */
1667 		if (list_empty(&peer_req->w.list)) {
1668 			spin_lock_irq(&device->resource->req_lock);
1669 			list_add_tail(&peer_req->w.list, &device->active_ee);
1670 			spin_unlock_irq(&device->resource->req_lock);
1671 		}
1672 
1673 		if (peer_req->flags & (EE_TRIM|EE_ZEROOUT))
1674 			drbd_issue_peer_discard_or_zero_out(device, peer_req);
1675 		else /* EE_WRITE_SAME */
1676 			drbd_issue_peer_wsame(device, peer_req);
1677 		return 0;
1678 	}
1679 
1680 	/* In most cases, we will only need one bio.  But in case the lower
1681 	 * level restrictions happen to be different at this offset on this
1682 	 * side than those of the sending peer, we may need to submit the
1683 	 * request in more than one bio.
1684 	 *
1685 	 * Plain bio_alloc is good enough here, this is no DRBD internally
1686 	 * generated bio, but a bio allocated on behalf of the peer.
1687 	 */
1688 next_bio:
1689 	bio = bio_alloc(GFP_NOIO, nr_pages);
1690 	if (!bio) {
1691 		drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1692 		goto fail;
1693 	}
1694 	/* > peer_req->i.sector, unless this is the first bio */
1695 	bio->bi_iter.bi_sector = sector;
1696 	bio_set_dev(bio, device->ldev->backing_bdev);
1697 	bio_set_op_attrs(bio, op, op_flags);
1698 	bio->bi_private = peer_req;
1699 	bio->bi_end_io = drbd_peer_request_endio;
1700 
1701 	bio->bi_next = bios;
1702 	bios = bio;
1703 	++n_bios;
1704 
1705 	page_chain_for_each(page) {
1706 		unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1707 		if (!bio_add_page(bio, page, len, 0))
1708 			goto next_bio;
1709 		data_size -= len;
1710 		sector += len >> 9;
1711 		--nr_pages;
1712 	}
1713 	D_ASSERT(device, data_size == 0);
1714 	D_ASSERT(device, page == NULL);
1715 
1716 	atomic_set(&peer_req->pending_bios, n_bios);
1717 	/* for debugfs: update timestamp, mark as submitted */
1718 	peer_req->submit_jif = jiffies;
1719 	peer_req->flags |= EE_SUBMITTED;
1720 	do {
1721 		bio = bios;
1722 		bios = bios->bi_next;
1723 		bio->bi_next = NULL;
1724 
1725 		drbd_generic_make_request(device, fault_type, bio);
1726 	} while (bios);
1727 	return 0;
1728 
1729 fail:
1730 	while (bios) {
1731 		bio = bios;
1732 		bios = bios->bi_next;
1733 		bio_put(bio);
1734 	}
1735 	return err;
1736 }
1737 
1738 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1739 					     struct drbd_peer_request *peer_req)
1740 {
1741 	struct drbd_interval *i = &peer_req->i;
1742 
1743 	drbd_remove_interval(&device->write_requests, i);
1744 	drbd_clear_interval(i);
1745 
1746 	/* Wake up any processes waiting for this peer request to complete.  */
1747 	if (i->waiting)
1748 		wake_up(&device->misc_wait);
1749 }
1750 
1751 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1752 {
1753 	struct drbd_peer_device *peer_device;
1754 	int vnr;
1755 
1756 	rcu_read_lock();
1757 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1758 		struct drbd_device *device = peer_device->device;
1759 
1760 		kref_get(&device->kref);
1761 		rcu_read_unlock();
1762 		drbd_wait_ee_list_empty(device, &device->active_ee);
1763 		kref_put(&device->kref, drbd_destroy_device);
1764 		rcu_read_lock();
1765 	}
1766 	rcu_read_unlock();
1767 }
1768 
1769 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1770 {
1771 	int rv;
1772 	struct p_barrier *p = pi->data;
1773 	struct drbd_epoch *epoch;
1774 
1775 	/* FIXME these are unacked on connection,
1776 	 * not a specific (peer)device.
1777 	 */
1778 	connection->current_epoch->barrier_nr = p->barrier;
1779 	connection->current_epoch->connection = connection;
1780 	rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1781 
1782 	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1783 	 * the activity log, which means it would not be resynced in case the
1784 	 * R_PRIMARY crashes now.
1785 	 * Therefore we must send the barrier_ack after the barrier request was
1786 	 * completed. */
1787 	switch (connection->resource->write_ordering) {
1788 	case WO_NONE:
1789 		if (rv == FE_RECYCLED)
1790 			return 0;
1791 
1792 		/* receiver context, in the writeout path of the other node.
1793 		 * avoid potential distributed deadlock */
1794 		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1795 		if (epoch)
1796 			break;
1797 		else
1798 			drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1799 			/* Fall through */
1800 
1801 	case WO_BDEV_FLUSH:
1802 	case WO_DRAIN_IO:
1803 		conn_wait_active_ee_empty(connection);
1804 		drbd_flush(connection);
1805 
1806 		if (atomic_read(&connection->current_epoch->epoch_size)) {
1807 			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1808 			if (epoch)
1809 				break;
1810 		}
1811 
1812 		return 0;
1813 	default:
1814 		drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1815 			 connection->resource->write_ordering);
1816 		return -EIO;
1817 	}
1818 
1819 	epoch->flags = 0;
1820 	atomic_set(&epoch->epoch_size, 0);
1821 	atomic_set(&epoch->active, 0);
1822 
1823 	spin_lock(&connection->epoch_lock);
1824 	if (atomic_read(&connection->current_epoch->epoch_size)) {
1825 		list_add(&epoch->list, &connection->current_epoch->list);
1826 		connection->current_epoch = epoch;
1827 		connection->epochs++;
1828 	} else {
1829 		/* The current_epoch got recycled while we allocated this one... */
1830 		kfree(epoch);
1831 	}
1832 	spin_unlock(&connection->epoch_lock);
1833 
1834 	return 0;
1835 }
1836 
1837 /* quick wrapper in case payload size != request_size (write same) */
1838 static void drbd_csum_ee_size(struct crypto_shash *h,
1839 			      struct drbd_peer_request *r, void *d,
1840 			      unsigned int payload_size)
1841 {
1842 	unsigned int tmp = r->i.size;
1843 	r->i.size = payload_size;
1844 	drbd_csum_ee(h, r, d);
1845 	r->i.size = tmp;
1846 }
1847 
1848 /* used from receive_RSDataReply (recv_resync_read)
1849  * and from receive_Data.
1850  * data_size: actual payload ("data in")
1851  * 	for normal writes that is bi_size.
1852  * 	for discards, that is zero.
1853  * 	for write same, it is logical_block_size.
1854  * both trim and write same have the bi_size ("data len to be affected")
1855  * as extra argument in the packet header.
1856  */
1857 static struct drbd_peer_request *
1858 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1859 	      struct packet_info *pi) __must_hold(local)
1860 {
1861 	struct drbd_device *device = peer_device->device;
1862 	const sector_t capacity = drbd_get_capacity(device->this_bdev);
1863 	struct drbd_peer_request *peer_req;
1864 	struct page *page;
1865 	int digest_size, err;
1866 	unsigned int data_size = pi->size, ds;
1867 	void *dig_in = peer_device->connection->int_dig_in;
1868 	void *dig_vv = peer_device->connection->int_dig_vv;
1869 	unsigned long *data;
1870 	struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1871 	struct p_trim *zeroes = (pi->cmd == P_ZEROES) ? pi->data : NULL;
1872 	struct p_trim *wsame = (pi->cmd == P_WSAME) ? pi->data : NULL;
1873 
1874 	digest_size = 0;
1875 	if (!trim && peer_device->connection->peer_integrity_tfm) {
1876 		digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
1877 		/*
1878 		 * FIXME: Receive the incoming digest into the receive buffer
1879 		 *	  here, together with its struct p_data?
1880 		 */
1881 		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1882 		if (err)
1883 			return NULL;
1884 		data_size -= digest_size;
1885 	}
1886 
1887 	/* assume request_size == data_size, but special case trim and wsame. */
1888 	ds = data_size;
1889 	if (trim) {
1890 		if (!expect(data_size == 0))
1891 			return NULL;
1892 		ds = be32_to_cpu(trim->size);
1893 	} else if (zeroes) {
1894 		if (!expect(data_size == 0))
1895 			return NULL;
1896 		ds = be32_to_cpu(zeroes->size);
1897 	} else if (wsame) {
1898 		if (data_size != queue_logical_block_size(device->rq_queue)) {
1899 			drbd_err(peer_device, "data size (%u) != drbd logical block size (%u)\n",
1900 				data_size, queue_logical_block_size(device->rq_queue));
1901 			return NULL;
1902 		}
1903 		if (data_size != bdev_logical_block_size(device->ldev->backing_bdev)) {
1904 			drbd_err(peer_device, "data size (%u) != backend logical block size (%u)\n",
1905 				data_size, bdev_logical_block_size(device->ldev->backing_bdev));
1906 			return NULL;
1907 		}
1908 		ds = be32_to_cpu(wsame->size);
1909 	}
1910 
1911 	if (!expect(IS_ALIGNED(ds, 512)))
1912 		return NULL;
1913 	if (trim || wsame || zeroes) {
1914 		if (!expect(ds <= (DRBD_MAX_BBIO_SECTORS << 9)))
1915 			return NULL;
1916 	} else if (!expect(ds <= DRBD_MAX_BIO_SIZE))
1917 		return NULL;
1918 
1919 	/* even though we trust out peer,
1920 	 * we sometimes have to double check. */
1921 	if (sector + (ds>>9) > capacity) {
1922 		drbd_err(device, "request from peer beyond end of local disk: "
1923 			"capacity: %llus < sector: %llus + size: %u\n",
1924 			(unsigned long long)capacity,
1925 			(unsigned long long)sector, ds);
1926 		return NULL;
1927 	}
1928 
1929 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1930 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1931 	 * which in turn might block on the other node at this very place.  */
1932 	peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO);
1933 	if (!peer_req)
1934 		return NULL;
1935 
1936 	peer_req->flags |= EE_WRITE;
1937 	if (trim) {
1938 		peer_req->flags |= EE_TRIM;
1939 		return peer_req;
1940 	}
1941 	if (zeroes) {
1942 		peer_req->flags |= EE_ZEROOUT;
1943 		return peer_req;
1944 	}
1945 	if (wsame)
1946 		peer_req->flags |= EE_WRITE_SAME;
1947 
1948 	/* receive payload size bytes into page chain */
1949 	ds = data_size;
1950 	page = peer_req->pages;
1951 	page_chain_for_each(page) {
1952 		unsigned len = min_t(int, ds, PAGE_SIZE);
1953 		data = kmap(page);
1954 		err = drbd_recv_all_warn(peer_device->connection, data, len);
1955 		if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1956 			drbd_err(device, "Fault injection: Corrupting data on receive\n");
1957 			data[0] = data[0] ^ (unsigned long)-1;
1958 		}
1959 		kunmap(page);
1960 		if (err) {
1961 			drbd_free_peer_req(device, peer_req);
1962 			return NULL;
1963 		}
1964 		ds -= len;
1965 	}
1966 
1967 	if (digest_size) {
1968 		drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size);
1969 		if (memcmp(dig_in, dig_vv, digest_size)) {
1970 			drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1971 				(unsigned long long)sector, data_size);
1972 			drbd_free_peer_req(device, peer_req);
1973 			return NULL;
1974 		}
1975 	}
1976 	device->recv_cnt += data_size >> 9;
1977 	return peer_req;
1978 }
1979 
1980 /* drbd_drain_block() just takes a data block
1981  * out of the socket input buffer, and discards it.
1982  */
1983 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1984 {
1985 	struct page *page;
1986 	int err = 0;
1987 	void *data;
1988 
1989 	if (!data_size)
1990 		return 0;
1991 
1992 	page = drbd_alloc_pages(peer_device, 1, 1);
1993 
1994 	data = kmap(page);
1995 	while (data_size) {
1996 		unsigned int len = min_t(int, data_size, PAGE_SIZE);
1997 
1998 		err = drbd_recv_all_warn(peer_device->connection, data, len);
1999 		if (err)
2000 			break;
2001 		data_size -= len;
2002 	}
2003 	kunmap(page);
2004 	drbd_free_pages(peer_device->device, page, 0);
2005 	return err;
2006 }
2007 
2008 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
2009 			   sector_t sector, int data_size)
2010 {
2011 	struct bio_vec bvec;
2012 	struct bvec_iter iter;
2013 	struct bio *bio;
2014 	int digest_size, err, expect;
2015 	void *dig_in = peer_device->connection->int_dig_in;
2016 	void *dig_vv = peer_device->connection->int_dig_vv;
2017 
2018 	digest_size = 0;
2019 	if (peer_device->connection->peer_integrity_tfm) {
2020 		digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
2021 		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
2022 		if (err)
2023 			return err;
2024 		data_size -= digest_size;
2025 	}
2026 
2027 	/* optimistically update recv_cnt.  if receiving fails below,
2028 	 * we disconnect anyways, and counters will be reset. */
2029 	peer_device->device->recv_cnt += data_size>>9;
2030 
2031 	bio = req->master_bio;
2032 	D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
2033 
2034 	bio_for_each_segment(bvec, bio, iter) {
2035 		void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
2036 		expect = min_t(int, data_size, bvec.bv_len);
2037 		err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
2038 		kunmap(bvec.bv_page);
2039 		if (err)
2040 			return err;
2041 		data_size -= expect;
2042 	}
2043 
2044 	if (digest_size) {
2045 		drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
2046 		if (memcmp(dig_in, dig_vv, digest_size)) {
2047 			drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
2048 			return -EINVAL;
2049 		}
2050 	}
2051 
2052 	D_ASSERT(peer_device->device, data_size == 0);
2053 	return 0;
2054 }
2055 
2056 /*
2057  * e_end_resync_block() is called in ack_sender context via
2058  * drbd_finish_peer_reqs().
2059  */
2060 static int e_end_resync_block(struct drbd_work *w, int unused)
2061 {
2062 	struct drbd_peer_request *peer_req =
2063 		container_of(w, struct drbd_peer_request, w);
2064 	struct drbd_peer_device *peer_device = peer_req->peer_device;
2065 	struct drbd_device *device = peer_device->device;
2066 	sector_t sector = peer_req->i.sector;
2067 	int err;
2068 
2069 	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2070 
2071 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2072 		drbd_set_in_sync(device, sector, peer_req->i.size);
2073 		err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
2074 	} else {
2075 		/* Record failure to sync */
2076 		drbd_rs_failed_io(device, sector, peer_req->i.size);
2077 
2078 		err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2079 	}
2080 	dec_unacked(device);
2081 
2082 	return err;
2083 }
2084 
2085 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
2086 			    struct packet_info *pi) __releases(local)
2087 {
2088 	struct drbd_device *device = peer_device->device;
2089 	struct drbd_peer_request *peer_req;
2090 
2091 	peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
2092 	if (!peer_req)
2093 		goto fail;
2094 
2095 	dec_rs_pending(device);
2096 
2097 	inc_unacked(device);
2098 	/* corresponding dec_unacked() in e_end_resync_block()
2099 	 * respective _drbd_clear_done_ee */
2100 
2101 	peer_req->w.cb = e_end_resync_block;
2102 	peer_req->submit_jif = jiffies;
2103 
2104 	spin_lock_irq(&device->resource->req_lock);
2105 	list_add_tail(&peer_req->w.list, &device->sync_ee);
2106 	spin_unlock_irq(&device->resource->req_lock);
2107 
2108 	atomic_add(pi->size >> 9, &device->rs_sect_ev);
2109 	if (drbd_submit_peer_request(device, peer_req, REQ_OP_WRITE, 0,
2110 				     DRBD_FAULT_RS_WR) == 0)
2111 		return 0;
2112 
2113 	/* don't care for the reason here */
2114 	drbd_err(device, "submit failed, triggering re-connect\n");
2115 	spin_lock_irq(&device->resource->req_lock);
2116 	list_del(&peer_req->w.list);
2117 	spin_unlock_irq(&device->resource->req_lock);
2118 
2119 	drbd_free_peer_req(device, peer_req);
2120 fail:
2121 	put_ldev(device);
2122 	return -EIO;
2123 }
2124 
2125 static struct drbd_request *
2126 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
2127 	     sector_t sector, bool missing_ok, const char *func)
2128 {
2129 	struct drbd_request *req;
2130 
2131 	/* Request object according to our peer */
2132 	req = (struct drbd_request *)(unsigned long)id;
2133 	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
2134 		return req;
2135 	if (!missing_ok) {
2136 		drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
2137 			(unsigned long)id, (unsigned long long)sector);
2138 	}
2139 	return NULL;
2140 }
2141 
2142 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
2143 {
2144 	struct drbd_peer_device *peer_device;
2145 	struct drbd_device *device;
2146 	struct drbd_request *req;
2147 	sector_t sector;
2148 	int err;
2149 	struct p_data *p = pi->data;
2150 
2151 	peer_device = conn_peer_device(connection, pi->vnr);
2152 	if (!peer_device)
2153 		return -EIO;
2154 	device = peer_device->device;
2155 
2156 	sector = be64_to_cpu(p->sector);
2157 
2158 	spin_lock_irq(&device->resource->req_lock);
2159 	req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
2160 	spin_unlock_irq(&device->resource->req_lock);
2161 	if (unlikely(!req))
2162 		return -EIO;
2163 
2164 	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
2165 	 * special casing it there for the various failure cases.
2166 	 * still no race with drbd_fail_pending_reads */
2167 	err = recv_dless_read(peer_device, req, sector, pi->size);
2168 	if (!err)
2169 		req_mod(req, DATA_RECEIVED);
2170 	/* else: nothing. handled from drbd_disconnect...
2171 	 * I don't think we may complete this just yet
2172 	 * in case we are "on-disconnect: freeze" */
2173 
2174 	return err;
2175 }
2176 
2177 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
2178 {
2179 	struct drbd_peer_device *peer_device;
2180 	struct drbd_device *device;
2181 	sector_t sector;
2182 	int err;
2183 	struct p_data *p = pi->data;
2184 
2185 	peer_device = conn_peer_device(connection, pi->vnr);
2186 	if (!peer_device)
2187 		return -EIO;
2188 	device = peer_device->device;
2189 
2190 	sector = be64_to_cpu(p->sector);
2191 	D_ASSERT(device, p->block_id == ID_SYNCER);
2192 
2193 	if (get_ldev(device)) {
2194 		/* data is submitted to disk within recv_resync_read.
2195 		 * corresponding put_ldev done below on error,
2196 		 * or in drbd_peer_request_endio. */
2197 		err = recv_resync_read(peer_device, sector, pi);
2198 	} else {
2199 		if (__ratelimit(&drbd_ratelimit_state))
2200 			drbd_err(device, "Can not write resync data to local disk.\n");
2201 
2202 		err = drbd_drain_block(peer_device, pi->size);
2203 
2204 		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2205 	}
2206 
2207 	atomic_add(pi->size >> 9, &device->rs_sect_in);
2208 
2209 	return err;
2210 }
2211 
2212 static void restart_conflicting_writes(struct drbd_device *device,
2213 				       sector_t sector, int size)
2214 {
2215 	struct drbd_interval *i;
2216 	struct drbd_request *req;
2217 
2218 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2219 		if (!i->local)
2220 			continue;
2221 		req = container_of(i, struct drbd_request, i);
2222 		if (req->rq_state & RQ_LOCAL_PENDING ||
2223 		    !(req->rq_state & RQ_POSTPONED))
2224 			continue;
2225 		/* as it is RQ_POSTPONED, this will cause it to
2226 		 * be queued on the retry workqueue. */
2227 		__req_mod(req, CONFLICT_RESOLVED, NULL);
2228 	}
2229 }
2230 
2231 /*
2232  * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
2233  */
2234 static int e_end_block(struct drbd_work *w, int cancel)
2235 {
2236 	struct drbd_peer_request *peer_req =
2237 		container_of(w, struct drbd_peer_request, w);
2238 	struct drbd_peer_device *peer_device = peer_req->peer_device;
2239 	struct drbd_device *device = peer_device->device;
2240 	sector_t sector = peer_req->i.sector;
2241 	int err = 0, pcmd;
2242 
2243 	if (peer_req->flags & EE_SEND_WRITE_ACK) {
2244 		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2245 			pcmd = (device->state.conn >= C_SYNC_SOURCE &&
2246 				device->state.conn <= C_PAUSED_SYNC_T &&
2247 				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
2248 				P_RS_WRITE_ACK : P_WRITE_ACK;
2249 			err = drbd_send_ack(peer_device, pcmd, peer_req);
2250 			if (pcmd == P_RS_WRITE_ACK)
2251 				drbd_set_in_sync(device, sector, peer_req->i.size);
2252 		} else {
2253 			err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2254 			/* we expect it to be marked out of sync anyways...
2255 			 * maybe assert this?  */
2256 		}
2257 		dec_unacked(device);
2258 	}
2259 
2260 	/* we delete from the conflict detection hash _after_ we sent out the
2261 	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
2262 	if (peer_req->flags & EE_IN_INTERVAL_TREE) {
2263 		spin_lock_irq(&device->resource->req_lock);
2264 		D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
2265 		drbd_remove_epoch_entry_interval(device, peer_req);
2266 		if (peer_req->flags & EE_RESTART_REQUESTS)
2267 			restart_conflicting_writes(device, sector, peer_req->i.size);
2268 		spin_unlock_irq(&device->resource->req_lock);
2269 	} else
2270 		D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2271 
2272 	drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
2273 
2274 	return err;
2275 }
2276 
2277 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2278 {
2279 	struct drbd_peer_request *peer_req =
2280 		container_of(w, struct drbd_peer_request, w);
2281 	struct drbd_peer_device *peer_device = peer_req->peer_device;
2282 	int err;
2283 
2284 	err = drbd_send_ack(peer_device, ack, peer_req);
2285 	dec_unacked(peer_device->device);
2286 
2287 	return err;
2288 }
2289 
2290 static int e_send_superseded(struct drbd_work *w, int unused)
2291 {
2292 	return e_send_ack(w, P_SUPERSEDED);
2293 }
2294 
2295 static int e_send_retry_write(struct drbd_work *w, int unused)
2296 {
2297 	struct drbd_peer_request *peer_req =
2298 		container_of(w, struct drbd_peer_request, w);
2299 	struct drbd_connection *connection = peer_req->peer_device->connection;
2300 
2301 	return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2302 			     P_RETRY_WRITE : P_SUPERSEDED);
2303 }
2304 
/*
 * seq_greater() - wrap-around aware "a is newer than b" for sequence numbers.
 *
 * Assumes 32-bit wrap-around.  For a 24-bit wrap-around both values would
 * have to be shifted first:  a <<= 8; b <<= 8;
 *
 * The difference is taken in unsigned arithmetic, where wrap-around is well
 * defined, and only then interpreted as signed.  Subtracting two s32 values
 * directly could overflow, which is undefined in ISO C.
 *
 * Returns true if @a is ahead of @b by less than half the number space.
 */
static bool seq_greater(u32 a, u32 b)
{
	return (s32)(a - b) > 0;
}
2314 
2315 static u32 seq_max(u32 a, u32 b)
2316 {
2317 	return seq_greater(a, b) ? a : b;
2318 }
2319 
2320 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2321 {
2322 	struct drbd_device *device = peer_device->device;
2323 	unsigned int newest_peer_seq;
2324 
2325 	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2326 		spin_lock(&device->peer_seq_lock);
2327 		newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2328 		device->peer_seq = newest_peer_seq;
2329 		spin_unlock(&device->peer_seq_lock);
2330 		/* wake up only if we actually changed device->peer_seq */
2331 		if (peer_seq == newest_peer_seq)
2332 			wake_up(&device->seq_wait);
2333 	}
2334 }
2335 
/*
 * overlaps() - do two extents share at least one sector?
 * @s1, @s2: start sectors
 * @l1, @l2: lengths in bytes (converted to sectors by shifting right by 9)
 *
 * Returns 1 if the extents intersect, 0 if they are disjoint or merely
 * adjacent.
 */
static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
{
	/* the first extent ends after the second starts, and vice versa */
	return (s1 + (l1>>9) > s2) && (s1 < s2 + (l2>>9));
}
2340 
2341 /* maybe change sync_ee into interval trees as well? */
2342 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2343 {
2344 	struct drbd_peer_request *rs_req;
2345 	bool rv = false;
2346 
2347 	spin_lock_irq(&device->resource->req_lock);
2348 	list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2349 		if (overlaps(peer_req->i.sector, peer_req->i.size,
2350 			     rs_req->i.sector, rs_req->i.size)) {
2351 			rv = true;
2352 			break;
2353 		}
2354 	}
2355 	spin_unlock_irq(&device->resource->req_lock);
2356 
2357 	return rv;
2358 }
2359 
2360 /* Called from receive_Data.
2361  * Synchronize packets on sock with packets on msock.
2362  *
2363  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2364  * packet traveling on msock, they are still processed in the order they have
2365  * been sent.
2366  *
2367  * Note: we don't care for Ack packets overtaking P_DATA packets.
2368  *
2369  * In case packet_seq is larger than device->peer_seq number, there are
2370  * outstanding packets on the msock. We wait for them to arrive.
2371  * In case we are the logically next packet, we update device->peer_seq
2372  * ourselves. Correctly handles 32bit wrap around.
2373  *
2374  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2375  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2376  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2377  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
2378  *
2379  * returns 0 if we may process the packet,
2380  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2381 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2382 {
2383 	struct drbd_device *device = peer_device->device;
2384 	DEFINE_WAIT(wait);
2385 	long timeout;
2386 	int ret = 0, tp;
2387 
2388 	if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2389 		return 0;
2390 
2391 	spin_lock(&device->peer_seq_lock);
2392 	for (;;) {
2393 		if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2394 			device->peer_seq = seq_max(device->peer_seq, peer_seq);
2395 			break;
2396 		}
2397 
2398 		if (signal_pending(current)) {
2399 			ret = -ERESTARTSYS;
2400 			break;
2401 		}
2402 
2403 		rcu_read_lock();
2404 		tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2405 		rcu_read_unlock();
2406 
2407 		if (!tp)
2408 			break;
2409 
2410 		/* Only need to wait if two_primaries is enabled */
2411 		prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2412 		spin_unlock(&device->peer_seq_lock);
2413 		rcu_read_lock();
2414 		timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2415 		rcu_read_unlock();
2416 		timeout = schedule_timeout(timeout);
2417 		spin_lock(&device->peer_seq_lock);
2418 		if (!timeout) {
2419 			ret = -ETIMEDOUT;
2420 			drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2421 			break;
2422 		}
2423 	}
2424 	spin_unlock(&device->peer_seq_lock);
2425 	finish_wait(&device->seq_wait, &wait);
2426 	return ret;
2427 }
2428 
2429 /* see also bio_flags_to_wire()
2430  * DRBD_REQ_*, because we need to semantically map the flags to data packet
2431  * flags and back. We may replicate to other kernel versions. */
2432 static unsigned long wire_flags_to_bio_flags(u32 dpf)
2433 {
2434 	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2435 		(dpf & DP_FUA ? REQ_FUA : 0) |
2436 		(dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
2437 }
2438 
2439 static unsigned long wire_flags_to_bio_op(u32 dpf)
2440 {
2441 	if (dpf & DP_ZEROES)
2442 		return REQ_OP_WRITE_ZEROES;
2443 	if (dpf & DP_DISCARD)
2444 		return REQ_OP_DISCARD;
2445 	if (dpf & DP_WSAME)
2446 		return REQ_OP_WRITE_SAME;
2447 	else
2448 		return REQ_OP_WRITE;
2449 }
2450 
2451 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2452 				    unsigned int size)
2453 {
2454 	struct drbd_interval *i;
2455 
2456     repeat:
2457 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2458 		struct drbd_request *req;
2459 		struct bio_and_error m;
2460 
2461 		if (!i->local)
2462 			continue;
2463 		req = container_of(i, struct drbd_request, i);
2464 		if (!(req->rq_state & RQ_POSTPONED))
2465 			continue;
2466 		req->rq_state &= ~RQ_POSTPONED;
2467 		__req_mod(req, NEG_ACKED, &m);
2468 		spin_unlock_irq(&device->resource->req_lock);
2469 		if (m.bio)
2470 			complete_master_bio(device, &m);
2471 		spin_lock_irq(&device->resource->req_lock);
2472 		goto repeat;
2473 	}
2474 }
2475 
2476 static int handle_write_conflicts(struct drbd_device *device,
2477 				  struct drbd_peer_request *peer_req)
2478 {
2479 	struct drbd_connection *connection = peer_req->peer_device->connection;
2480 	bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2481 	sector_t sector = peer_req->i.sector;
2482 	const unsigned int size = peer_req->i.size;
2483 	struct drbd_interval *i;
2484 	bool equal;
2485 	int err;
2486 
2487 	/*
2488 	 * Inserting the peer request into the write_requests tree will prevent
2489 	 * new conflicting local requests from being added.
2490 	 */
2491 	drbd_insert_interval(&device->write_requests, &peer_req->i);
2492 
2493     repeat:
2494 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2495 		if (i == &peer_req->i)
2496 			continue;
2497 		if (i->completed)
2498 			continue;
2499 
2500 		if (!i->local) {
2501 			/*
2502 			 * Our peer has sent a conflicting remote request; this
2503 			 * should not happen in a two-node setup.  Wait for the
2504 			 * earlier peer request to complete.
2505 			 */
2506 			err = drbd_wait_misc(device, i);
2507 			if (err)
2508 				goto out;
2509 			goto repeat;
2510 		}
2511 
2512 		equal = i->sector == sector && i->size == size;
2513 		if (resolve_conflicts) {
2514 			/*
2515 			 * If the peer request is fully contained within the
2516 			 * overlapping request, it can be considered overwritten
2517 			 * and thus superseded; otherwise, it will be retried
2518 			 * once all overlapping requests have completed.
2519 			 */
2520 			bool superseded = i->sector <= sector && i->sector +
2521 				       (i->size >> 9) >= sector + (size >> 9);
2522 
2523 			if (!equal)
2524 				drbd_alert(device, "Concurrent writes detected: "
2525 					       "local=%llus +%u, remote=%llus +%u, "
2526 					       "assuming %s came first\n",
2527 					  (unsigned long long)i->sector, i->size,
2528 					  (unsigned long long)sector, size,
2529 					  superseded ? "local" : "remote");
2530 
2531 			peer_req->w.cb = superseded ? e_send_superseded :
2532 						   e_send_retry_write;
2533 			list_add_tail(&peer_req->w.list, &device->done_ee);
2534 			queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);
2535 
2536 			err = -ENOENT;
2537 			goto out;
2538 		} else {
2539 			struct drbd_request *req =
2540 				container_of(i, struct drbd_request, i);
2541 
2542 			if (!equal)
2543 				drbd_alert(device, "Concurrent writes detected: "
2544 					       "local=%llus +%u, remote=%llus +%u\n",
2545 					  (unsigned long long)i->sector, i->size,
2546 					  (unsigned long long)sector, size);
2547 
2548 			if (req->rq_state & RQ_LOCAL_PENDING ||
2549 			    !(req->rq_state & RQ_POSTPONED)) {
2550 				/*
2551 				 * Wait for the node with the discard flag to
2552 				 * decide if this request has been superseded
2553 				 * or needs to be retried.
2554 				 * Requests that have been superseded will
2555 				 * disappear from the write_requests tree.
2556 				 *
2557 				 * In addition, wait for the conflicting
2558 				 * request to finish locally before submitting
2559 				 * the conflicting peer request.
2560 				 */
2561 				err = drbd_wait_misc(device, &req->i);
2562 				if (err) {
2563 					_conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2564 					fail_postponed_requests(device, sector, size);
2565 					goto out;
2566 				}
2567 				goto repeat;
2568 			}
2569 			/*
2570 			 * Remember to restart the conflicting requests after
2571 			 * the new peer request has completed.
2572 			 */
2573 			peer_req->flags |= EE_RESTART_REQUESTS;
2574 		}
2575 	}
2576 	err = 0;
2577 
2578     out:
2579 	if (err)
2580 		drbd_remove_epoch_entry_interval(device, peer_req);
2581 	return err;
2582 }
2583 
2584 /* mirrored write */
2585 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2586 {
2587 	struct drbd_peer_device *peer_device;
2588 	struct drbd_device *device;
2589 	struct net_conf *nc;
2590 	sector_t sector;
2591 	struct drbd_peer_request *peer_req;
2592 	struct p_data *p = pi->data;
2593 	u32 peer_seq = be32_to_cpu(p->seq_num);
2594 	int op, op_flags;
2595 	u32 dp_flags;
2596 	int err, tp;
2597 
2598 	peer_device = conn_peer_device(connection, pi->vnr);
2599 	if (!peer_device)
2600 		return -EIO;
2601 	device = peer_device->device;
2602 
2603 	if (!get_ldev(device)) {
2604 		int err2;
2605 
2606 		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2607 		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2608 		atomic_inc(&connection->current_epoch->epoch_size);
2609 		err2 = drbd_drain_block(peer_device, pi->size);
2610 		if (!err)
2611 			err = err2;
2612 		return err;
2613 	}
2614 
2615 	/*
2616 	 * Corresponding put_ldev done either below (on various errors), or in
2617 	 * drbd_peer_request_endio, if we successfully submit the data at the
2618 	 * end of this function.
2619 	 */
2620 
2621 	sector = be64_to_cpu(p->sector);
2622 	peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2623 	if (!peer_req) {
2624 		put_ldev(device);
2625 		return -EIO;
2626 	}
2627 
2628 	peer_req->w.cb = e_end_block;
2629 	peer_req->submit_jif = jiffies;
2630 	peer_req->flags |= EE_APPLICATION;
2631 
2632 	dp_flags = be32_to_cpu(p->dp_flags);
2633 	op = wire_flags_to_bio_op(dp_flags);
2634 	op_flags = wire_flags_to_bio_flags(dp_flags);
2635 	if (pi->cmd == P_TRIM) {
2636 		D_ASSERT(peer_device, peer_req->i.size > 0);
2637 		D_ASSERT(peer_device, op == REQ_OP_DISCARD);
2638 		D_ASSERT(peer_device, peer_req->pages == NULL);
2639 		/* need to play safe: an older DRBD sender
2640 		 * may mean zero-out while sending P_TRIM. */
2641 		if (0 == (connection->agreed_features & DRBD_FF_WZEROES))
2642 			peer_req->flags |= EE_ZEROOUT;
2643 	} else if (pi->cmd == P_ZEROES) {
2644 		D_ASSERT(peer_device, peer_req->i.size > 0);
2645 		D_ASSERT(peer_device, op == REQ_OP_WRITE_ZEROES);
2646 		D_ASSERT(peer_device, peer_req->pages == NULL);
2647 		/* Do (not) pass down BLKDEV_ZERO_NOUNMAP? */
2648 		if (dp_flags & DP_DISCARD)
2649 			peer_req->flags |= EE_TRIM;
2650 	} else if (peer_req->pages == NULL) {
2651 		D_ASSERT(device, peer_req->i.size == 0);
2652 		D_ASSERT(device, dp_flags & DP_FLUSH);
2653 	}
2654 
2655 	if (dp_flags & DP_MAY_SET_IN_SYNC)
2656 		peer_req->flags |= EE_MAY_SET_IN_SYNC;
2657 
2658 	spin_lock(&connection->epoch_lock);
2659 	peer_req->epoch = connection->current_epoch;
2660 	atomic_inc(&peer_req->epoch->epoch_size);
2661 	atomic_inc(&peer_req->epoch->active);
2662 	spin_unlock(&connection->epoch_lock);
2663 
2664 	rcu_read_lock();
2665 	nc = rcu_dereference(peer_device->connection->net_conf);
2666 	tp = nc->two_primaries;
2667 	if (peer_device->connection->agreed_pro_version < 100) {
2668 		switch (nc->wire_protocol) {
2669 		case DRBD_PROT_C:
2670 			dp_flags |= DP_SEND_WRITE_ACK;
2671 			break;
2672 		case DRBD_PROT_B:
2673 			dp_flags |= DP_SEND_RECEIVE_ACK;
2674 			break;
2675 		}
2676 	}
2677 	rcu_read_unlock();
2678 
2679 	if (dp_flags & DP_SEND_WRITE_ACK) {
2680 		peer_req->flags |= EE_SEND_WRITE_ACK;
2681 		inc_unacked(device);
2682 		/* corresponding dec_unacked() in e_end_block()
2683 		 * respective _drbd_clear_done_ee */
2684 	}
2685 
2686 	if (dp_flags & DP_SEND_RECEIVE_ACK) {
2687 		/* I really don't like it that the receiver thread
2688 		 * sends on the msock, but anyways */
2689 		drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
2690 	}
2691 
2692 	if (tp) {
2693 		/* two primaries implies protocol C */
2694 		D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2695 		peer_req->flags |= EE_IN_INTERVAL_TREE;
2696 		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2697 		if (err)
2698 			goto out_interrupted;
2699 		spin_lock_irq(&device->resource->req_lock);
2700 		err = handle_write_conflicts(device, peer_req);
2701 		if (err) {
2702 			spin_unlock_irq(&device->resource->req_lock);
2703 			if (err == -ENOENT) {
2704 				put_ldev(device);
2705 				return 0;
2706 			}
2707 			goto out_interrupted;
2708 		}
2709 	} else {
2710 		update_peer_seq(peer_device, peer_seq);
2711 		spin_lock_irq(&device->resource->req_lock);
2712 	}
2713 	/* TRIM and WRITE_SAME are processed synchronously,
2714 	 * we wait for all pending requests, respectively wait for
2715 	 * active_ee to become empty in drbd_submit_peer_request();
2716 	 * better not add ourselves here. */
2717 	if ((peer_req->flags & (EE_TRIM|EE_WRITE_SAME|EE_ZEROOUT)) == 0)
2718 		list_add_tail(&peer_req->w.list, &device->active_ee);
2719 	spin_unlock_irq(&device->resource->req_lock);
2720 
2721 	if (device->state.conn == C_SYNC_TARGET)
2722 		wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2723 
2724 	if (device->state.pdsk < D_INCONSISTENT) {
2725 		/* In case we have the only disk of the cluster, */
2726 		drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2727 		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2728 		drbd_al_begin_io(device, &peer_req->i);
2729 		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2730 	}
2731 
2732 	err = drbd_submit_peer_request(device, peer_req, op, op_flags,
2733 				       DRBD_FAULT_DT_WR);
2734 	if (!err)
2735 		return 0;
2736 
2737 	/* don't care for the reason here */
2738 	drbd_err(device, "submit failed, triggering re-connect\n");
2739 	spin_lock_irq(&device->resource->req_lock);
2740 	list_del(&peer_req->w.list);
2741 	drbd_remove_epoch_entry_interval(device, peer_req);
2742 	spin_unlock_irq(&device->resource->req_lock);
2743 	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2744 		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2745 		drbd_al_complete_io(device, &peer_req->i);
2746 	}
2747 
2748 out_interrupted:
2749 	drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
2750 	put_ldev(device);
2751 	drbd_free_peer_req(device, peer_req);
2752 	return err;
2753 }
2754 
2755 /* We may throttle resync, if the lower device seems to be busy,
2756  * and current sync rate is above c_min_rate.
2757  *
2758  * To decide whether or not the lower device is busy, we use a scheme similar
2759  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2760  * (more than 64 sectors) of activity we cannot account for with our own resync
2761  * activity, it obviously is "busy".
2762  *
2763  * The current sync rate used here uses only the most recent two step marks,
2764  * to have a short time average so we can react faster.
2765  */
2766 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2767 		bool throttle_if_app_is_waiting)
2768 {
2769 	struct lc_element *tmp;
2770 	bool throttle = drbd_rs_c_min_rate_throttle(device);
2771 
2772 	if (!throttle || throttle_if_app_is_waiting)
2773 		return throttle;
2774 
2775 	spin_lock_irq(&device->al_lock);
2776 	tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2777 	if (tmp) {
2778 		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2779 		if (test_bit(BME_PRIORITY, &bm_ext->flags))
2780 			throttle = false;
2781 		/* Do not slow down if app IO is already waiting for this extent,
2782 		 * and our progress is necessary for application IO to complete. */
2783 	}
2784 	spin_unlock_irq(&device->al_lock);
2785 
2786 	return throttle;
2787 }
2788 
2789 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2790 {
2791 	struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2792 	unsigned long db, dt, dbdt;
2793 	unsigned int c_min_rate;
2794 	int curr_events;
2795 
2796 	rcu_read_lock();
2797 	c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2798 	rcu_read_unlock();
2799 
2800 	/* feature disabled? */
2801 	if (c_min_rate == 0)
2802 		return false;
2803 
2804 	curr_events = (int)part_stat_read_accum(&disk->part0, sectors) -
2805 			atomic_read(&device->rs_sect_ev);
2806 
2807 	if (atomic_read(&device->ap_actlog_cnt)
2808 	    || curr_events - device->rs_last_events > 64) {
2809 		unsigned long rs_left;
2810 		int i;
2811 
2812 		device->rs_last_events = curr_events;
2813 
2814 		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2815 		 * approx. */
2816 		i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2817 
2818 		if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2819 			rs_left = device->ov_left;
2820 		else
2821 			rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2822 
2823 		dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2824 		if (!dt)
2825 			dt++;
2826 		db = device->rs_mark_left[i] - rs_left;
2827 		dbdt = Bit2KB(db/dt);
2828 
2829 		if (dbdt > c_min_rate)
2830 			return true;
2831 	}
2832 	return false;
2833 }
2834 
2835 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2836 {
2837 	struct drbd_peer_device *peer_device;
2838 	struct drbd_device *device;
2839 	sector_t sector;
2840 	sector_t capacity;
2841 	struct drbd_peer_request *peer_req;
2842 	struct digest_info *di = NULL;
2843 	int size, verb;
2844 	unsigned int fault_type;
2845 	struct p_block_req *p =	pi->data;
2846 
2847 	peer_device = conn_peer_device(connection, pi->vnr);
2848 	if (!peer_device)
2849 		return -EIO;
2850 	device = peer_device->device;
2851 	capacity = drbd_get_capacity(device->this_bdev);
2852 
2853 	sector = be64_to_cpu(p->sector);
2854 	size   = be32_to_cpu(p->blksize);
2855 
2856 	if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2857 		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2858 				(unsigned long long)sector, size);
2859 		return -EINVAL;
2860 	}
2861 	if (sector + (size>>9) > capacity) {
2862 		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2863 				(unsigned long long)sector, size);
2864 		return -EINVAL;
2865 	}
2866 
2867 	if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2868 		verb = 1;
2869 		switch (pi->cmd) {
2870 		case P_DATA_REQUEST:
2871 			drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2872 			break;
2873 		case P_RS_THIN_REQ:
2874 		case P_RS_DATA_REQUEST:
2875 		case P_CSUM_RS_REQUEST:
2876 		case P_OV_REQUEST:
2877 			drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2878 			break;
2879 		case P_OV_REPLY:
2880 			verb = 0;
2881 			dec_rs_pending(device);
2882 			drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2883 			break;
2884 		default:
2885 			BUG();
2886 		}
2887 		if (verb && __ratelimit(&drbd_ratelimit_state))
2888 			drbd_err(device, "Can not satisfy peer's read request, "
2889 			    "no local data.\n");
2890 
2891 		/* drain possibly payload */
2892 		return drbd_drain_block(peer_device, pi->size);
2893 	}
2894 
2895 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2896 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
2897 	 * which in turn might block on the other node at this very place.  */
2898 	peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2899 			size, GFP_NOIO);
2900 	if (!peer_req) {
2901 		put_ldev(device);
2902 		return -ENOMEM;
2903 	}
2904 
2905 	switch (pi->cmd) {
2906 	case P_DATA_REQUEST:
2907 		peer_req->w.cb = w_e_end_data_req;
2908 		fault_type = DRBD_FAULT_DT_RD;
2909 		/* application IO, don't drbd_rs_begin_io */
2910 		peer_req->flags |= EE_APPLICATION;
2911 		goto submit;
2912 
2913 	case P_RS_THIN_REQ:
2914 		/* If at some point in the future we have a smart way to
2915 		   find out if this data block is completely deallocated,
2916 		   then we would do something smarter here than reading
2917 		   the block... */
2918 		peer_req->flags |= EE_RS_THIN_REQ;
2919 		/* fall through */
2920 	case P_RS_DATA_REQUEST:
2921 		peer_req->w.cb = w_e_end_rsdata_req;
2922 		fault_type = DRBD_FAULT_RS_RD;
2923 		/* used in the sector offset progress display */
2924 		device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2925 		break;
2926 
2927 	case P_OV_REPLY:
2928 	case P_CSUM_RS_REQUEST:
2929 		fault_type = DRBD_FAULT_RS_RD;
2930 		di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2931 		if (!di)
2932 			goto out_free_e;
2933 
2934 		di->digest_size = pi->size;
2935 		di->digest = (((char *)di)+sizeof(struct digest_info));
2936 
2937 		peer_req->digest = di;
2938 		peer_req->flags |= EE_HAS_DIGEST;
2939 
2940 		if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2941 			goto out_free_e;
2942 
2943 		if (pi->cmd == P_CSUM_RS_REQUEST) {
2944 			D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2945 			peer_req->w.cb = w_e_end_csum_rs_req;
2946 			/* used in the sector offset progress display */
2947 			device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2948 			/* remember to report stats in drbd_resync_finished */
2949 			device->use_csums = true;
2950 		} else if (pi->cmd == P_OV_REPLY) {
2951 			/* track progress, we may need to throttle */
2952 			atomic_add(size >> 9, &device->rs_sect_in);
2953 			peer_req->w.cb = w_e_end_ov_reply;
2954 			dec_rs_pending(device);
2955 			/* drbd_rs_begin_io done when we sent this request,
2956 			 * but accounting still needs to be done. */
2957 			goto submit_for_resync;
2958 		}
2959 		break;
2960 
2961 	case P_OV_REQUEST:
2962 		if (device->ov_start_sector == ~(sector_t)0 &&
2963 		    peer_device->connection->agreed_pro_version >= 90) {
2964 			unsigned long now = jiffies;
2965 			int i;
2966 			device->ov_start_sector = sector;
2967 			device->ov_position = sector;
2968 			device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2969 			device->rs_total = device->ov_left;
2970 			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2971 				device->rs_mark_left[i] = device->ov_left;
2972 				device->rs_mark_time[i] = now;
2973 			}
2974 			drbd_info(device, "Online Verify start sector: %llu\n",
2975 					(unsigned long long)sector);
2976 		}
2977 		peer_req->w.cb = w_e_end_ov_req;
2978 		fault_type = DRBD_FAULT_RS_RD;
2979 		break;
2980 
2981 	default:
2982 		BUG();
2983 	}
2984 
2985 	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
2986 	 * wrt the receiver, but it is not as straightforward as it may seem.
2987 	 * Various places in the resync start and stop logic assume resync
2988 	 * requests are processed in order, requeuing this on the worker thread
2989 	 * introduces a bunch of new code for synchronization between threads.
2990 	 *
2991 	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2992 	 * "forever", throttling after drbd_rs_begin_io will lock that extent
2993 	 * for application writes for the same time.  For now, just throttle
2994 	 * here, where the rest of the code expects the receiver to sleep for
2995 	 * a while, anyways.
2996 	 */
2997 
2998 	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
2999 	 * this defers syncer requests for some time, before letting at least
3000 	 * on request through.  The resync controller on the receiving side
3001 	 * will adapt to the incoming rate accordingly.
3002 	 *
3003 	 * We cannot throttle here if remote is Primary/SyncTarget:
3004 	 * we would also throttle its application reads.
3005 	 * In that case, throttling is done on the SyncTarget only.
3006 	 */
3007 
3008 	/* Even though this may be a resync request, we do add to "read_ee";
3009 	 * "sync_ee" is only used for resync WRITEs.
3010 	 * Add to list early, so debugfs can find this request
3011 	 * even if we have to sleep below. */
3012 	spin_lock_irq(&device->resource->req_lock);
3013 	list_add_tail(&peer_req->w.list, &device->read_ee);
3014 	spin_unlock_irq(&device->resource->req_lock);
3015 
3016 	update_receiver_timing_details(connection, drbd_rs_should_slow_down);
3017 	if (device->state.peer != R_PRIMARY
3018 	&& drbd_rs_should_slow_down(device, sector, false))
3019 		schedule_timeout_uninterruptible(HZ/10);
3020 	update_receiver_timing_details(connection, drbd_rs_begin_io);
3021 	if (drbd_rs_begin_io(device, sector))
3022 		goto out_free_e;
3023 
3024 submit_for_resync:
3025 	atomic_add(size >> 9, &device->rs_sect_ev);
3026 
3027 submit:
3028 	update_receiver_timing_details(connection, drbd_submit_peer_request);
3029 	inc_unacked(device);
3030 	if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
3031 				     fault_type) == 0)
3032 		return 0;
3033 
3034 	/* don't care for the reason here */
3035 	drbd_err(device, "submit failed, triggering re-connect\n");
3036 
3037 out_free_e:
3038 	spin_lock_irq(&device->resource->req_lock);
3039 	list_del(&peer_req->w.list);
3040 	spin_unlock_irq(&device->resource->req_lock);
3041 	/* no drbd_rs_complete_io(), we are dropping the connection anyways */
3042 
3043 	put_ldev(device);
3044 	drbd_free_peer_req(device, peer_req);
3045 	return -EIO;
3046 }
3047 
3048 /**
3049  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
3050  */
3051 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
3052 {
3053 	struct drbd_device *device = peer_device->device;
3054 	int self, peer, rv = -100;
3055 	unsigned long ch_self, ch_peer;
3056 	enum drbd_after_sb_p after_sb_0p;
3057 
3058 	self = device->ldev->md.uuid[UI_BITMAP] & 1;
3059 	peer = device->p_uuid[UI_BITMAP] & 1;
3060 
3061 	ch_peer = device->p_uuid[UI_SIZE];
3062 	ch_self = device->comm_bm_set;
3063 
3064 	rcu_read_lock();
3065 	after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
3066 	rcu_read_unlock();
3067 	switch (after_sb_0p) {
3068 	case ASB_CONSENSUS:
3069 	case ASB_DISCARD_SECONDARY:
3070 	case ASB_CALL_HELPER:
3071 	case ASB_VIOLENTLY:
3072 		drbd_err(device, "Configuration error.\n");
3073 		break;
3074 	case ASB_DISCONNECT:
3075 		break;
3076 	case ASB_DISCARD_YOUNGER_PRI:
3077 		if (self == 0 && peer == 1) {
3078 			rv = -1;
3079 			break;
3080 		}
3081 		if (self == 1 && peer == 0) {
3082 			rv =  1;
3083 			break;
3084 		}
3085 		/* Else fall through - to one of the other strategies... */
3086 	case ASB_DISCARD_OLDER_PRI:
3087 		if (self == 0 && peer == 1) {
3088 			rv = 1;
3089 			break;
3090 		}
3091 		if (self == 1 && peer == 0) {
3092 			rv = -1;
3093 			break;
3094 		}
3095 		/* Else fall through to one of the other strategies... */
3096 		drbd_warn(device, "Discard younger/older primary did not find a decision\n"
3097 		     "Using discard-least-changes instead\n");
3098 		/* fall through */
3099 	case ASB_DISCARD_ZERO_CHG:
3100 		if (ch_peer == 0 && ch_self == 0) {
3101 			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
3102 				? -1 : 1;
3103 			break;
3104 		} else {
3105 			if (ch_peer == 0) { rv =  1; break; }
3106 			if (ch_self == 0) { rv = -1; break; }
3107 		}
3108 		if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
3109 			break;
3110 		/* else, fall through */
3111 	case ASB_DISCARD_LEAST_CHG:
3112 		if	(ch_self < ch_peer)
3113 			rv = -1;
3114 		else if (ch_self > ch_peer)
3115 			rv =  1;
3116 		else /* ( ch_self == ch_peer ) */
3117 		     /* Well, then use something else. */
3118 			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
3119 				? -1 : 1;
3120 		break;
3121 	case ASB_DISCARD_LOCAL:
3122 		rv = -1;
3123 		break;
3124 	case ASB_DISCARD_REMOTE:
3125 		rv =  1;
3126 	}
3127 
3128 	return rv;
3129 }
3130 
3131 /**
3132  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
3133  */
3134 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
3135 {
3136 	struct drbd_device *device = peer_device->device;
3137 	int hg, rv = -100;
3138 	enum drbd_after_sb_p after_sb_1p;
3139 
3140 	rcu_read_lock();
3141 	after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
3142 	rcu_read_unlock();
3143 	switch (after_sb_1p) {
3144 	case ASB_DISCARD_YOUNGER_PRI:
3145 	case ASB_DISCARD_OLDER_PRI:
3146 	case ASB_DISCARD_LEAST_CHG:
3147 	case ASB_DISCARD_LOCAL:
3148 	case ASB_DISCARD_REMOTE:
3149 	case ASB_DISCARD_ZERO_CHG:
3150 		drbd_err(device, "Configuration error.\n");
3151 		break;
3152 	case ASB_DISCONNECT:
3153 		break;
3154 	case ASB_CONSENSUS:
3155 		hg = drbd_asb_recover_0p(peer_device);
3156 		if (hg == -1 && device->state.role == R_SECONDARY)
3157 			rv = hg;
3158 		if (hg == 1  && device->state.role == R_PRIMARY)
3159 			rv = hg;
3160 		break;
3161 	case ASB_VIOLENTLY:
3162 		rv = drbd_asb_recover_0p(peer_device);
3163 		break;
3164 	case ASB_DISCARD_SECONDARY:
3165 		return device->state.role == R_PRIMARY ? 1 : -1;
3166 	case ASB_CALL_HELPER:
3167 		hg = drbd_asb_recover_0p(peer_device);
3168 		if (hg == -1 && device->state.role == R_PRIMARY) {
3169 			enum drbd_state_rv rv2;
3170 
3171 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3172 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
3173 			  * we do not need to wait for the after state change work either. */
3174 			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3175 			if (rv2 != SS_SUCCESS) {
3176 				drbd_khelper(device, "pri-lost-after-sb");
3177 			} else {
3178 				drbd_warn(device, "Successfully gave up primary role.\n");
3179 				rv = hg;
3180 			}
3181 		} else
3182 			rv = hg;
3183 	}
3184 
3185 	return rv;
3186 }
3187 
3188 /**
3189  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
3190  */
3191 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
3192 {
3193 	struct drbd_device *device = peer_device->device;
3194 	int hg, rv = -100;
3195 	enum drbd_after_sb_p after_sb_2p;
3196 
3197 	rcu_read_lock();
3198 	after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
3199 	rcu_read_unlock();
3200 	switch (after_sb_2p) {
3201 	case ASB_DISCARD_YOUNGER_PRI:
3202 	case ASB_DISCARD_OLDER_PRI:
3203 	case ASB_DISCARD_LEAST_CHG:
3204 	case ASB_DISCARD_LOCAL:
3205 	case ASB_DISCARD_REMOTE:
3206 	case ASB_CONSENSUS:
3207 	case ASB_DISCARD_SECONDARY:
3208 	case ASB_DISCARD_ZERO_CHG:
3209 		drbd_err(device, "Configuration error.\n");
3210 		break;
3211 	case ASB_VIOLENTLY:
3212 		rv = drbd_asb_recover_0p(peer_device);
3213 		break;
3214 	case ASB_DISCONNECT:
3215 		break;
3216 	case ASB_CALL_HELPER:
3217 		hg = drbd_asb_recover_0p(peer_device);
3218 		if (hg == -1) {
3219 			enum drbd_state_rv rv2;
3220 
3221 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3222 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
3223 			  * we do not need to wait for the after state change work either. */
3224 			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3225 			if (rv2 != SS_SUCCESS) {
3226 				drbd_khelper(device, "pri-lost-after-sb");
3227 			} else {
3228 				drbd_warn(device, "Successfully gave up primary role.\n");
3229 				rv = hg;
3230 			}
3231 		} else
3232 			rv = hg;
3233 	}
3234 
3235 	return rv;
3236 }
3237 
3238 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
3239 			   u64 bits, u64 flags)
3240 {
3241 	if (!uuid) {
3242 		drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
3243 		return;
3244 	}
3245 	drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
3246 	     text,
3247 	     (unsigned long long)uuid[UI_CURRENT],
3248 	     (unsigned long long)uuid[UI_BITMAP],
3249 	     (unsigned long long)uuid[UI_HISTORY_START],
3250 	     (unsigned long long)uuid[UI_HISTORY_END],
3251 	     (unsigned long long)bits,
3252 	     (unsigned long long)flags);
3253 }
3254 
3255 /*
3256   100	after split brain try auto recover
3257     2	C_SYNC_SOURCE set BitMap
3258     1	C_SYNC_SOURCE use BitMap
3259     0	no Sync
3260    -1	C_SYNC_TARGET use BitMap
3261    -2	C_SYNC_TARGET set BitMap
3262  -100	after split brain, disconnect
3263 -1000	unrelated data
3264 -1091   requires proto 91
3265 -1096   requires proto 96
3266  */
3267 
3268 static int drbd_uuid_compare(struct drbd_device *const device, enum drbd_role const peer_role, int *rule_nr) __must_hold(local)
3269 {
3270 	struct drbd_peer_device *const peer_device = first_peer_device(device);
3271 	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
3272 	u64 self, peer;
3273 	int i, j;
3274 
3275 	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3276 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3277 
3278 	*rule_nr = 10;
3279 	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
3280 		return 0;
3281 
3282 	*rule_nr = 20;
3283 	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
3284 	     peer != UUID_JUST_CREATED)
3285 		return -2;
3286 
3287 	*rule_nr = 30;
3288 	if (self != UUID_JUST_CREATED &&
3289 	    (peer == UUID_JUST_CREATED || peer == (u64)0))
3290 		return 2;
3291 
3292 	if (self == peer) {
3293 		int rct, dc; /* roles at crash time */
3294 
3295 		if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
3296 
3297 			if (connection->agreed_pro_version < 91)
3298 				return -1091;
3299 
3300 			if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
3301 			    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
3302 				drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
3303 				drbd_uuid_move_history(device);
3304 				device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
3305 				device->ldev->md.uuid[UI_BITMAP] = 0;
3306 
3307 				drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3308 					       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3309 				*rule_nr = 34;
3310 			} else {
3311 				drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
3312 				*rule_nr = 36;
3313 			}
3314 
3315 			return 1;
3316 		}
3317 
3318 		if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
3319 
3320 			if (connection->agreed_pro_version < 91)
3321 				return -1091;
3322 
3323 			if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
3324 			    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
3325 				drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
3326 
3327 				device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
3328 				device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
3329 				device->p_uuid[UI_BITMAP] = 0UL;
3330 
3331 				drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3332 				*rule_nr = 35;
3333 			} else {
3334 				drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3335 				*rule_nr = 37;
3336 			}
3337 
3338 			return -1;
3339 		}
3340 
3341 		/* Common power [off|failure] */
3342 		rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3343 			(device->p_uuid[UI_FLAGS] & 2);
3344 		/* lowest bit is set when we were primary,
3345 		 * next bit (weight 2) is set when peer was primary */
3346 		*rule_nr = 40;
3347 
3348 		/* Neither has the "crashed primary" flag set,
3349 		 * only a replication link hickup. */
3350 		if (rct == 0)
3351 			return 0;
3352 
3353 		/* Current UUID equal and no bitmap uuid; does not necessarily
3354 		 * mean this was a "simultaneous hard crash", maybe IO was
3355 		 * frozen, so no UUID-bump happened.
3356 		 * This is a protocol change, overload DRBD_FF_WSAME as flag
3357 		 * for "new-enough" peer DRBD version. */
3358 		if (device->state.role == R_PRIMARY || peer_role == R_PRIMARY) {
3359 			*rule_nr = 41;
3360 			if (!(connection->agreed_features & DRBD_FF_WSAME)) {
3361 				drbd_warn(peer_device, "Equivalent unrotated UUIDs, but current primary present.\n");
3362 				return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8));
3363 			}
3364 			if (device->state.role == R_PRIMARY && peer_role == R_PRIMARY) {
3365 				/* At least one has the "crashed primary" bit set,
3366 				 * both are primary now, but neither has rotated its UUIDs?
3367 				 * "Can not happen." */
3368 				drbd_err(peer_device, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n");
3369 				return -100;
3370 			}
3371 			if (device->state.role == R_PRIMARY)
3372 				return 1;
3373 			return -1;
3374 		}
3375 
3376 		/* Both are secondary.
3377 		 * Really looks like recovery from simultaneous hard crash.
3378 		 * Check which had been primary before, and arbitrate. */
3379 		switch (rct) {
3380 		case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */
3381 		case 1: /*  self_pri && !peer_pri */ return 1;
3382 		case 2: /* !self_pri &&  peer_pri */ return -1;
3383 		case 3: /*  self_pri &&  peer_pri */
3384 			dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3385 			return dc ? -1 : 1;
3386 		}
3387 	}
3388 
3389 	*rule_nr = 50;
3390 	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3391 	if (self == peer)
3392 		return -1;
3393 
3394 	*rule_nr = 51;
3395 	peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3396 	if (self == peer) {
3397 		if (connection->agreed_pro_version < 96 ?
3398 		    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3399 		    (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3400 		    peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3401 			/* The last P_SYNC_UUID did not get though. Undo the last start of
3402 			   resync as sync source modifications of the peer's UUIDs. */
3403 
3404 			if (connection->agreed_pro_version < 91)
3405 				return -1091;
3406 
3407 			device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3408 			device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3409 
3410 			drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3411 			drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3412 
3413 			return -1;
3414 		}
3415 	}
3416 
3417 	*rule_nr = 60;
3418 	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3419 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3420 		peer = device->p_uuid[i] & ~((u64)1);
3421 		if (self == peer)
3422 			return -2;
3423 	}
3424 
3425 	*rule_nr = 70;
3426 	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3427 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3428 	if (self == peer)
3429 		return 1;
3430 
3431 	*rule_nr = 71;
3432 	self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3433 	if (self == peer) {
3434 		if (connection->agreed_pro_version < 96 ?
3435 		    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3436 		    (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3437 		    self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3438 			/* The last P_SYNC_UUID did not get though. Undo the last start of
3439 			   resync as sync source modifications of our UUIDs. */
3440 
3441 			if (connection->agreed_pro_version < 91)
3442 				return -1091;
3443 
3444 			__drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3445 			__drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3446 
3447 			drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3448 			drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3449 				       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3450 
3451 			return 1;
3452 		}
3453 	}
3454 
3455 
3456 	*rule_nr = 80;
3457 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3458 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3459 		self = device->ldev->md.uuid[i] & ~((u64)1);
3460 		if (self == peer)
3461 			return 2;
3462 	}
3463 
3464 	*rule_nr = 90;
3465 	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3466 	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3467 	if (self == peer && self != ((u64)0))
3468 		return 100;
3469 
3470 	*rule_nr = 100;
3471 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3472 		self = device->ldev->md.uuid[i] & ~((u64)1);
3473 		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3474 			peer = device->p_uuid[j] & ~((u64)1);
3475 			if (self == peer)
3476 				return -100;
3477 		}
3478 	}
3479 
3480 	return -1000;
3481 }
3482 
3483 /* drbd_sync_handshake() returns the new conn state on success, or
3484    CONN_MASK (-1) on failure.
3485  */
3486 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3487 					   enum drbd_role peer_role,
3488 					   enum drbd_disk_state peer_disk) __must_hold(local)
3489 {
3490 	struct drbd_device *device = peer_device->device;
3491 	enum drbd_conns rv = C_MASK;
3492 	enum drbd_disk_state mydisk;
3493 	struct net_conf *nc;
3494 	int hg, rule_nr, rr_conflict, tentative, always_asbp;
3495 
3496 	mydisk = device->state.disk;
3497 	if (mydisk == D_NEGOTIATING)
3498 		mydisk = device->new_state_tmp.disk;
3499 
3500 	drbd_info(device, "drbd_sync_handshake:\n");
3501 
3502 	spin_lock_irq(&device->ldev->md.uuid_lock);
3503 	drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3504 	drbd_uuid_dump(device, "peer", device->p_uuid,
3505 		       device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3506 
3507 	hg = drbd_uuid_compare(device, peer_role, &rule_nr);
3508 	spin_unlock_irq(&device->ldev->md.uuid_lock);
3509 
3510 	drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3511 
3512 	if (hg == -1000) {
3513 		drbd_alert(device, "Unrelated data, aborting!\n");
3514 		return C_MASK;
3515 	}
3516 	if (hg < -0x10000) {
3517 		int proto, fflags;
3518 		hg = -hg;
3519 		proto = hg & 0xff;
3520 		fflags = (hg >> 8) & 0xff;
3521 		drbd_alert(device, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n",
3522 					proto, fflags);
3523 		return C_MASK;
3524 	}
3525 	if (hg < -1000) {
3526 		drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3527 		return C_MASK;
3528 	}
3529 
3530 	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3531 	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3532 		int f = (hg == -100) || abs(hg) == 2;
3533 		hg = mydisk > D_INCONSISTENT ? 1 : -1;
3534 		if (f)
3535 			hg = hg*2;
3536 		drbd_info(device, "Becoming sync %s due to disk states.\n",
3537 		     hg > 0 ? "source" : "target");
3538 	}
3539 
3540 	if (abs(hg) == 100)
3541 		drbd_khelper(device, "initial-split-brain");
3542 
3543 	rcu_read_lock();
3544 	nc = rcu_dereference(peer_device->connection->net_conf);
3545 	always_asbp = nc->always_asbp;
3546 	rr_conflict = nc->rr_conflict;
3547 	tentative = nc->tentative;
3548 	rcu_read_unlock();
3549 
3550 	if (hg == 100 || (hg == -100 && always_asbp)) {
3551 		int pcount = (device->state.role == R_PRIMARY)
3552 			   + (peer_role == R_PRIMARY);
3553 		int forced = (hg == -100);
3554 
3555 		switch (pcount) {
3556 		case 0:
3557 			hg = drbd_asb_recover_0p(peer_device);
3558 			break;
3559 		case 1:
3560 			hg = drbd_asb_recover_1p(peer_device);
3561 			break;
3562 		case 2:
3563 			hg = drbd_asb_recover_2p(peer_device);
3564 			break;
3565 		}
3566 		if (abs(hg) < 100) {
3567 			drbd_warn(device, "Split-Brain detected, %d primaries, "
3568 			     "automatically solved. Sync from %s node\n",
3569 			     pcount, (hg < 0) ? "peer" : "this");
3570 			if (forced) {
3571 				drbd_warn(device, "Doing a full sync, since"
3572 				     " UUIDs where ambiguous.\n");
3573 				hg = hg*2;
3574 			}
3575 		}
3576 	}
3577 
3578 	if (hg == -100) {
3579 		if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3580 			hg = -1;
3581 		if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3582 			hg = 1;
3583 
3584 		if (abs(hg) < 100)
3585 			drbd_warn(device, "Split-Brain detected, manually solved. "
3586 			     "Sync from %s node\n",
3587 			     (hg < 0) ? "peer" : "this");
3588 	}
3589 
3590 	if (hg == -100) {
3591 		/* FIXME this log message is not correct if we end up here
3592 		 * after an attempted attach on a diskless node.
3593 		 * We just refuse to attach -- well, we drop the "connection"
3594 		 * to that disk, in a way... */
3595 		drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3596 		drbd_khelper(device, "split-brain");
3597 		return C_MASK;
3598 	}
3599 
3600 	if (hg > 0 && mydisk <= D_INCONSISTENT) {
3601 		drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3602 		return C_MASK;
3603 	}
3604 
3605 	if (hg < 0 && /* by intention we do not use mydisk here. */
3606 	    device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3607 		switch (rr_conflict) {
3608 		case ASB_CALL_HELPER:
3609 			drbd_khelper(device, "pri-lost");
3610 			/* fall through */
3611 		case ASB_DISCONNECT:
3612 			drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3613 			return C_MASK;
3614 		case ASB_VIOLENTLY:
3615 			drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3616 			     "assumption\n");
3617 		}
3618 	}
3619 
3620 	if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3621 		if (hg == 0)
3622 			drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3623 		else
3624 			drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3625 				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3626 				 abs(hg) >= 2 ? "full" : "bit-map based");
3627 		return C_MASK;
3628 	}
3629 
3630 	if (abs(hg) >= 2) {
3631 		drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3632 		if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3633 					BM_LOCKED_SET_ALLOWED))
3634 			return C_MASK;
3635 	}
3636 
3637 	if (hg > 0) { /* become sync source. */
3638 		rv = C_WF_BITMAP_S;
3639 	} else if (hg < 0) { /* become sync target */
3640 		rv = C_WF_BITMAP_T;
3641 	} else {
3642 		rv = C_CONNECTED;
3643 		if (drbd_bm_total_weight(device)) {
3644 			drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3645 			     drbd_bm_total_weight(device));
3646 		}
3647 	}
3648 
3649 	return rv;
3650 }
3651 
3652 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3653 {
3654 	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3655 	if (peer == ASB_DISCARD_REMOTE)
3656 		return ASB_DISCARD_LOCAL;
3657 
3658 	/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3659 	if (peer == ASB_DISCARD_LOCAL)
3660 		return ASB_DISCARD_REMOTE;
3661 
3662 	/* everything else is valid if they are equal on both sides. */
3663 	return peer;
3664 }
3665 
3666 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3667 {
3668 	struct p_protocol *p = pi->data;
3669 	enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3670 	int p_proto, p_discard_my_data, p_two_primaries, cf;
3671 	struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3672 	char integrity_alg[SHARED_SECRET_MAX] = "";
3673 	struct crypto_shash *peer_integrity_tfm = NULL;
3674 	void *int_dig_in = NULL, *int_dig_vv = NULL;
3675 
3676 	p_proto		= be32_to_cpu(p->protocol);
3677 	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
3678 	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
3679 	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
3680 	p_two_primaries = be32_to_cpu(p->two_primaries);
3681 	cf		= be32_to_cpu(p->conn_flags);
3682 	p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3683 
3684 	if (connection->agreed_pro_version >= 87) {
3685 		int err;
3686 
3687 		if (pi->size > sizeof(integrity_alg))
3688 			return -EIO;
3689 		err = drbd_recv_all(connection, integrity_alg, pi->size);
3690 		if (err)
3691 			return err;
3692 		integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3693 	}
3694 
3695 	if (pi->cmd != P_PROTOCOL_UPDATE) {
3696 		clear_bit(CONN_DRY_RUN, &connection->flags);
3697 
3698 		if (cf & CF_DRY_RUN)
3699 			set_bit(CONN_DRY_RUN, &connection->flags);
3700 
3701 		rcu_read_lock();
3702 		nc = rcu_dereference(connection->net_conf);
3703 
3704 		if (p_proto != nc->wire_protocol) {
3705 			drbd_err(connection, "incompatible %s settings\n", "protocol");
3706 			goto disconnect_rcu_unlock;
3707 		}
3708 
3709 		if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3710 			drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3711 			goto disconnect_rcu_unlock;
3712 		}
3713 
3714 		if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3715 			drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3716 			goto disconnect_rcu_unlock;
3717 		}
3718 
3719 		if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3720 			drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3721 			goto disconnect_rcu_unlock;
3722 		}
3723 
3724 		if (p_discard_my_data && nc->discard_my_data) {
3725 			drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3726 			goto disconnect_rcu_unlock;
3727 		}
3728 
3729 		if (p_two_primaries != nc->two_primaries) {
3730 			drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3731 			goto disconnect_rcu_unlock;
3732 		}
3733 
3734 		if (strcmp(integrity_alg, nc->integrity_alg)) {
3735 			drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3736 			goto disconnect_rcu_unlock;
3737 		}
3738 
3739 		rcu_read_unlock();
3740 	}
3741 
3742 	if (integrity_alg[0]) {
3743 		int hash_size;
3744 
3745 		/*
3746 		 * We can only change the peer data integrity algorithm
3747 		 * here.  Changing our own data integrity algorithm
3748 		 * requires that we send a P_PROTOCOL_UPDATE packet at
3749 		 * the same time; otherwise, the peer has no way to
3750 		 * tell between which packets the algorithm should
3751 		 * change.
3752 		 */
3753 
3754 		peer_integrity_tfm = crypto_alloc_shash(integrity_alg, 0, 0);
3755 		if (IS_ERR(peer_integrity_tfm)) {
3756 			peer_integrity_tfm = NULL;
3757 			drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3758 				 integrity_alg);
3759 			goto disconnect;
3760 		}
3761 
3762 		hash_size = crypto_shash_digestsize(peer_integrity_tfm);
3763 		int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3764 		int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3765 		if (!(int_dig_in && int_dig_vv)) {
3766 			drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3767 			goto disconnect;
3768 		}
3769 	}
3770 
3771 	new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3772 	if (!new_net_conf) {
3773 		drbd_err(connection, "Allocation of new net_conf failed\n");
3774 		goto disconnect;
3775 	}
3776 
3777 	mutex_lock(&connection->data.mutex);
3778 	mutex_lock(&connection->resource->conf_update);
3779 	old_net_conf = connection->net_conf;
3780 	*new_net_conf = *old_net_conf;
3781 
3782 	new_net_conf->wire_protocol = p_proto;
3783 	new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3784 	new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3785 	new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3786 	new_net_conf->two_primaries = p_two_primaries;
3787 
3788 	rcu_assign_pointer(connection->net_conf, new_net_conf);
3789 	mutex_unlock(&connection->resource->conf_update);
3790 	mutex_unlock(&connection->data.mutex);
3791 
3792 	crypto_free_shash(connection->peer_integrity_tfm);
3793 	kfree(connection->int_dig_in);
3794 	kfree(connection->int_dig_vv);
3795 	connection->peer_integrity_tfm = peer_integrity_tfm;
3796 	connection->int_dig_in = int_dig_in;
3797 	connection->int_dig_vv = int_dig_vv;
3798 
3799 	if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3800 		drbd_info(connection, "peer data-integrity-alg: %s\n",
3801 			  integrity_alg[0] ? integrity_alg : "(none)");
3802 
3803 	synchronize_rcu();
3804 	kfree(old_net_conf);
3805 	return 0;
3806 
3807 disconnect_rcu_unlock:
3808 	rcu_read_unlock();
3809 disconnect:
3810 	crypto_free_shash(peer_integrity_tfm);
3811 	kfree(int_dig_in);
3812 	kfree(int_dig_vv);
3813 	conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3814 	return -EIO;
3815 }
3816 
3817 /* helper function
3818  * input: alg name, feature name
3819  * return: NULL (alg name was "")
3820  *         ERR_PTR(error) if something goes wrong
3821  *         or the crypto hash ptr, if it worked out ok. */
3822 static struct crypto_shash *drbd_crypto_alloc_digest_safe(
3823 		const struct drbd_device *device,
3824 		const char *alg, const char *name)
3825 {
3826 	struct crypto_shash *tfm;
3827 
3828 	if (!alg[0])
3829 		return NULL;
3830 
3831 	tfm = crypto_alloc_shash(alg, 0, 0);
3832 	if (IS_ERR(tfm)) {
3833 		drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3834 			alg, name, PTR_ERR(tfm));
3835 		return tfm;
3836 	}
3837 	return tfm;
3838 }
3839 
3840 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3841 {
3842 	void *buffer = connection->data.rbuf;
3843 	int size = pi->size;
3844 
3845 	while (size) {
3846 		int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3847 		s = drbd_recv(connection, buffer, s);
3848 		if (s <= 0) {
3849 			if (s < 0)
3850 				return s;
3851 			break;
3852 		}
3853 		size -= s;
3854 	}
3855 	if (size)
3856 		return -EIO;
3857 	return 0;
3858 }
3859 
3860 /*
3861  * config_unknown_volume  -  device configuration command for unknown volume
3862  *
3863  * When a device is added to an existing connection, the node on which the
3864  * device is added first will send configuration commands to its peer but the
3865  * peer will not know about the device yet.  It will warn and ignore these
3866  * commands.  Once the device is added on the second node, the second node will
3867  * send the same device configuration commands, but in the other direction.
3868  *
3869  * (We can also end up here if drbd is misconfigured.)
3870  */
3871 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3872 {
3873 	drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3874 		  cmdname(pi->cmd), pi->vnr);
3875 	return ignore_remaining_packet(connection, pi);
3876 }
3877 
3878 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3879 {
3880 	struct drbd_peer_device *peer_device;
3881 	struct drbd_device *device;
3882 	struct p_rs_param_95 *p;
3883 	unsigned int header_size, data_size, exp_max_sz;
3884 	struct crypto_shash *verify_tfm = NULL;
3885 	struct crypto_shash *csums_tfm = NULL;
3886 	struct net_conf *old_net_conf, *new_net_conf = NULL;
3887 	struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3888 	const int apv = connection->agreed_pro_version;
3889 	struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3890 	int fifo_size = 0;
3891 	int err;
3892 
3893 	peer_device = conn_peer_device(connection, pi->vnr);
3894 	if (!peer_device)
3895 		return config_unknown_volume(connection, pi);
3896 	device = peer_device->device;
3897 
3898 	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3899 		    : apv == 88 ? sizeof(struct p_rs_param)
3900 					+ SHARED_SECRET_MAX
3901 		    : apv <= 94 ? sizeof(struct p_rs_param_89)
3902 		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3903 
3904 	if (pi->size > exp_max_sz) {
3905 		drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3906 		    pi->size, exp_max_sz);
3907 		return -EIO;
3908 	}
3909 
3910 	if (apv <= 88) {
3911 		header_size = sizeof(struct p_rs_param);
3912 		data_size = pi->size - header_size;
3913 	} else if (apv <= 94) {
3914 		header_size = sizeof(struct p_rs_param_89);
3915 		data_size = pi->size - header_size;
3916 		D_ASSERT(device, data_size == 0);
3917 	} else {
3918 		header_size = sizeof(struct p_rs_param_95);
3919 		data_size = pi->size - header_size;
3920 		D_ASSERT(device, data_size == 0);
3921 	}
3922 
3923 	/* initialize verify_alg and csums_alg */
3924 	p = pi->data;
3925 	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3926 
3927 	err = drbd_recv_all(peer_device->connection, p, header_size);
3928 	if (err)
3929 		return err;
3930 
3931 	mutex_lock(&connection->resource->conf_update);
3932 	old_net_conf = peer_device->connection->net_conf;
3933 	if (get_ldev(device)) {
3934 		new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3935 		if (!new_disk_conf) {
3936 			put_ldev(device);
3937 			mutex_unlock(&connection->resource->conf_update);
3938 			drbd_err(device, "Allocation of new disk_conf failed\n");
3939 			return -ENOMEM;
3940 		}
3941 
3942 		old_disk_conf = device->ldev->disk_conf;
3943 		*new_disk_conf = *old_disk_conf;
3944 
3945 		new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3946 	}
3947 
3948 	if (apv >= 88) {
3949 		if (apv == 88) {
3950 			if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3951 				drbd_err(device, "verify-alg of wrong size, "
3952 					"peer wants %u, accepting only up to %u byte\n",
3953 					data_size, SHARED_SECRET_MAX);
3954 				err = -EIO;
3955 				goto reconnect;
3956 			}
3957 
3958 			err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3959 			if (err)
3960 				goto reconnect;
3961 			/* we expect NUL terminated string */
3962 			/* but just in case someone tries to be evil */
3963 			D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3964 			p->verify_alg[data_size-1] = 0;
3965 
3966 		} else /* apv >= 89 */ {
3967 			/* we still expect NUL terminated strings */
3968 			/* but just in case someone tries to be evil */
3969 			D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3970 			D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3971 			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3972 			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3973 		}
3974 
3975 		if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3976 			if (device->state.conn == C_WF_REPORT_PARAMS) {
3977 				drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3978 				    old_net_conf->verify_alg, p->verify_alg);
3979 				goto disconnect;
3980 			}
3981 			verify_tfm = drbd_crypto_alloc_digest_safe(device,
3982 					p->verify_alg, "verify-alg");
3983 			if (IS_ERR(verify_tfm)) {
3984 				verify_tfm = NULL;
3985 				goto disconnect;
3986 			}
3987 		}
3988 
3989 		if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3990 			if (device->state.conn == C_WF_REPORT_PARAMS) {
3991 				drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3992 				    old_net_conf->csums_alg, p->csums_alg);
3993 				goto disconnect;
3994 			}
3995 			csums_tfm = drbd_crypto_alloc_digest_safe(device,
3996 					p->csums_alg, "csums-alg");
3997 			if (IS_ERR(csums_tfm)) {
3998 				csums_tfm = NULL;
3999 				goto disconnect;
4000 			}
4001 		}
4002 
4003 		if (apv > 94 && new_disk_conf) {
4004 			new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
4005 			new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
4006 			new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
4007 			new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
4008 
4009 			fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
4010 			if (fifo_size != device->rs_plan_s->size) {
4011 				new_plan = fifo_alloc(fifo_size);
4012 				if (!new_plan) {
4013 					drbd_err(device, "kmalloc of fifo_buffer failed");
4014 					put_ldev(device);
4015 					goto disconnect;
4016 				}
4017 			}
4018 		}
4019 
4020 		if (verify_tfm || csums_tfm) {
4021 			new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
4022 			if (!new_net_conf) {
4023 				drbd_err(device, "Allocation of new net_conf failed\n");
4024 				goto disconnect;
4025 			}
4026 
4027 			*new_net_conf = *old_net_conf;
4028 
4029 			if (verify_tfm) {
4030 				strcpy(new_net_conf->verify_alg, p->verify_alg);
4031 				new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
4032 				crypto_free_shash(peer_device->connection->verify_tfm);
4033 				peer_device->connection->verify_tfm = verify_tfm;
4034 				drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
4035 			}
4036 			if (csums_tfm) {
4037 				strcpy(new_net_conf->csums_alg, p->csums_alg);
4038 				new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
4039 				crypto_free_shash(peer_device->connection->csums_tfm);
4040 				peer_device->connection->csums_tfm = csums_tfm;
4041 				drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
4042 			}
4043 			rcu_assign_pointer(connection->net_conf, new_net_conf);
4044 		}
4045 	}
4046 
4047 	if (new_disk_conf) {
4048 		rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
4049 		put_ldev(device);
4050 	}
4051 
4052 	if (new_plan) {
4053 		old_plan = device->rs_plan_s;
4054 		rcu_assign_pointer(device->rs_plan_s, new_plan);
4055 	}
4056 
4057 	mutex_unlock(&connection->resource->conf_update);
4058 	synchronize_rcu();
4059 	if (new_net_conf)
4060 		kfree(old_net_conf);
4061 	kfree(old_disk_conf);
4062 	kfree(old_plan);
4063 
4064 	return 0;
4065 
4066 reconnect:
4067 	if (new_disk_conf) {
4068 		put_ldev(device);
4069 		kfree(new_disk_conf);
4070 	}
4071 	mutex_unlock(&connection->resource->conf_update);
4072 	return -EIO;
4073 
4074 disconnect:
4075 	kfree(new_plan);
4076 	if (new_disk_conf) {
4077 		put_ldev(device);
4078 		kfree(new_disk_conf);
4079 	}
4080 	mutex_unlock(&connection->resource->conf_update);
4081 	/* just for completeness: actually not needed,
4082 	 * as this is not reached if csums_tfm was ok. */
4083 	crypto_free_shash(csums_tfm);
4084 	/* but free the verify_tfm again, if csums_tfm did not work out */
4085 	crypto_free_shash(verify_tfm);
4086 	conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4087 	return -EIO;
4088 }
4089 
4090 /* warn if the arguments differ by more than 12.5% */
4091 static void warn_if_differ_considerably(struct drbd_device *device,
4092 	const char *s, sector_t a, sector_t b)
4093 {
4094 	sector_t d;
4095 	if (a == 0 || b == 0)
4096 		return;
4097 	d = (a > b) ? (a - b) : (b - a);
4098 	if (d > (a>>3) || d > (b>>3))
4099 		drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
4100 		     (unsigned long long)a, (unsigned long long)b);
4101 }
4102 
4103 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
4104 {
4105 	struct drbd_peer_device *peer_device;
4106 	struct drbd_device *device;
4107 	struct p_sizes *p = pi->data;
4108 	struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ? p->qlim : NULL;
4109 	enum determine_dev_size dd = DS_UNCHANGED;
4110 	sector_t p_size, p_usize, p_csize, my_usize;
4111 	sector_t new_size, cur_size;
4112 	int ldsc = 0; /* local disk size changed */
4113 	enum dds_flags ddsf;
4114 
4115 	peer_device = conn_peer_device(connection, pi->vnr);
4116 	if (!peer_device)
4117 		return config_unknown_volume(connection, pi);
4118 	device = peer_device->device;
4119 	cur_size = drbd_get_capacity(device->this_bdev);
4120 
4121 	p_size = be64_to_cpu(p->d_size);
4122 	p_usize = be64_to_cpu(p->u_size);
4123 	p_csize = be64_to_cpu(p->c_size);
4124 
4125 	/* just store the peer's disk size for now.
4126 	 * we still need to figure out whether we accept that. */
4127 	device->p_size = p_size;
4128 
4129 	if (get_ldev(device)) {
4130 		rcu_read_lock();
4131 		my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
4132 		rcu_read_unlock();
4133 
4134 		warn_if_differ_considerably(device, "lower level device sizes",
4135 			   p_size, drbd_get_max_capacity(device->ldev));
4136 		warn_if_differ_considerably(device, "user requested size",
4137 					    p_usize, my_usize);
4138 
4139 		/* if this is the first connect, or an otherwise expected
4140 		 * param exchange, choose the minimum */
4141 		if (device->state.conn == C_WF_REPORT_PARAMS)
4142 			p_usize = min_not_zero(my_usize, p_usize);
4143 
4144 		/* Never shrink a device with usable data during connect,
4145 		 * or "attach" on the peer.
4146 		 * But allow online shrinking if we are connected. */
4147 		new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0);
4148 		if (new_size < cur_size &&
4149 		    device->state.disk >= D_OUTDATED &&
4150 		    (device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS)) {
4151 			drbd_err(device, "The peer's disk size is too small! (%llu < %llu sectors)\n",
4152 					(unsigned long long)new_size, (unsigned long long)cur_size);
4153 			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4154 			put_ldev(device);
4155 			return -EIO;
4156 		}
4157 
4158 		if (my_usize != p_usize) {
4159 			struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
4160 
4161 			new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
4162 			if (!new_disk_conf) {
4163 				drbd_err(device, "Allocation of new disk_conf failed\n");
4164 				put_ldev(device);
4165 				return -ENOMEM;
4166 			}
4167 
4168 			mutex_lock(&connection->resource->conf_update);
4169 			old_disk_conf = device->ldev->disk_conf;
4170 			*new_disk_conf = *old_disk_conf;
4171 			new_disk_conf->disk_size = p_usize;
4172 
4173 			rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
4174 			mutex_unlock(&connection->resource->conf_update);
4175 			synchronize_rcu();
4176 			kfree(old_disk_conf);
4177 
4178 			drbd_info(device, "Peer sets u_size to %lu sectors (old: %lu)\n",
4179 				 (unsigned long)p_usize, (unsigned long)my_usize);
4180 		}
4181 
4182 		put_ldev(device);
4183 	}
4184 
4185 	device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
4186 	/* Leave drbd_reconsider_queue_parameters() before drbd_determine_dev_size().
4187 	   In case we cleared the QUEUE_FLAG_DISCARD from our queue in
4188 	   drbd_reconsider_queue_parameters(), we can be sure that after
4189 	   drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
4190 
4191 	ddsf = be16_to_cpu(p->dds_flags);
4192 	if (get_ldev(device)) {
4193 		drbd_reconsider_queue_parameters(device, device->ldev, o);
4194 		dd = drbd_determine_dev_size(device, ddsf, NULL);
4195 		put_ldev(device);
4196 		if (dd == DS_ERROR)
4197 			return -EIO;
4198 		drbd_md_sync(device);
4199 	} else {
4200 		/*
4201 		 * I am diskless, need to accept the peer's *current* size.
4202 		 * I must NOT accept the peers backing disk size,
4203 		 * it may have been larger than mine all along...
4204 		 *
4205 		 * At this point, the peer knows more about my disk, or at
4206 		 * least about what we last agreed upon, than myself.
4207 		 * So if his c_size is less than his d_size, the most likely
4208 		 * reason is that *my* d_size was smaller last time we checked.
4209 		 *
4210 		 * However, if he sends a zero current size,
4211 		 * take his (user-capped or) backing disk size anyways.
4212 		 *
4213 		 * Unless of course he does not have a disk himself.
4214 		 * In which case we ignore this completely.
4215 		 */
4216 		sector_t new_size = p_csize ?: p_usize ?: p_size;
4217 		drbd_reconsider_queue_parameters(device, NULL, o);
4218 		if (new_size == 0) {
4219 			/* Ignore, peer does not know nothing. */
4220 		} else if (new_size == cur_size) {
4221 			/* nothing to do */
4222 		} else if (cur_size != 0 && p_size == 0) {
4223 			drbd_warn(device, "Ignored diskless peer device size (peer:%llu != me:%llu sectors)!\n",
4224 					(unsigned long long)new_size, (unsigned long long)cur_size);
4225 		} else if (new_size < cur_size && device->state.role == R_PRIMARY) {
4226 			drbd_err(device, "The peer's device size is too small! (%llu < %llu sectors); demote me first!\n",
4227 					(unsigned long long)new_size, (unsigned long long)cur_size);
4228 			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4229 			return -EIO;
4230 		} else {
4231 			/* I believe the peer, if
4232 			 *  - I don't have a current size myself
4233 			 *  - we agree on the size anyways
4234 			 *  - I do have a current size, am Secondary,
4235 			 *    and he has the only disk
4236 			 *  - I do have a current size, am Primary,
4237 			 *    and he has the only disk,
4238 			 *    which is larger than my current size
4239 			 */
4240 			drbd_set_my_capacity(device, new_size);
4241 		}
4242 	}
4243 
4244 	if (get_ldev(device)) {
4245 		if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
4246 			device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
4247 			ldsc = 1;
4248 		}
4249 
4250 		put_ldev(device);
4251 	}
4252 
4253 	if (device->state.conn > C_WF_REPORT_PARAMS) {
4254 		if (be64_to_cpu(p->c_size) !=
4255 		    drbd_get_capacity(device->this_bdev) || ldsc) {
4256 			/* we have different sizes, probably peer
4257 			 * needs to know my new size... */
4258 			drbd_send_sizes(peer_device, 0, ddsf);
4259 		}
4260 		if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
4261 		    (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
4262 			if (device->state.pdsk >= D_INCONSISTENT &&
4263 			    device->state.disk >= D_INCONSISTENT) {
4264 				if (ddsf & DDSF_NO_RESYNC)
4265 					drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
4266 				else
4267 					resync_after_online_grow(device);
4268 			} else
4269 				set_bit(RESYNC_AFTER_NEG, &device->flags);
4270 		}
4271 	}
4272 
4273 	return 0;
4274 }
4275 
4276 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
4277 {
4278 	struct drbd_peer_device *peer_device;
4279 	struct drbd_device *device;
4280 	struct p_uuids *p = pi->data;
4281 	u64 *p_uuid;
4282 	int i, updated_uuids = 0;
4283 
4284 	peer_device = conn_peer_device(connection, pi->vnr);
4285 	if (!peer_device)
4286 		return config_unknown_volume(connection, pi);
4287 	device = peer_device->device;
4288 
4289 	p_uuid = kmalloc_array(UI_EXTENDED_SIZE, sizeof(*p_uuid), GFP_NOIO);
4290 	if (!p_uuid) {
4291 		drbd_err(device, "kmalloc of p_uuid failed\n");
4292 		return false;
4293 	}
4294 
4295 	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
4296 		p_uuid[i] = be64_to_cpu(p->uuid[i]);
4297 
4298 	kfree(device->p_uuid);
4299 	device->p_uuid = p_uuid;
4300 
4301 	if ((device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS) &&
4302 	    device->state.disk < D_INCONSISTENT &&
4303 	    device->state.role == R_PRIMARY &&
4304 	    (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
4305 		drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
4306 		    (unsigned long long)device->ed_uuid);
4307 		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4308 		return -EIO;
4309 	}
4310 
4311 	if (get_ldev(device)) {
4312 		int skip_initial_sync =
4313 			device->state.conn == C_CONNECTED &&
4314 			peer_device->connection->agreed_pro_version >= 90 &&
4315 			device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
4316 			(p_uuid[UI_FLAGS] & 8);
4317 		if (skip_initial_sync) {
4318 			drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
4319 			drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
4320 					"clear_n_write from receive_uuids",
4321 					BM_LOCKED_TEST_ALLOWED);
4322 			_drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
4323 			_drbd_uuid_set(device, UI_BITMAP, 0);
4324 			_drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
4325 					CS_VERBOSE, NULL);
4326 			drbd_md_sync(device);
4327 			updated_uuids = 1;
4328 		}
4329 		put_ldev(device);
4330 	} else if (device->state.disk < D_INCONSISTENT &&
4331 		   device->state.role == R_PRIMARY) {
4332 		/* I am a diskless primary, the peer just created a new current UUID
4333 		   for me. */
4334 		updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4335 	}
4336 
4337 	/* Before we test for the disk state, we should wait until an eventually
4338 	   ongoing cluster wide state change is finished. That is important if
4339 	   we are primary and are detaching from our disk. We need to see the
4340 	   new disk state... */
4341 	mutex_lock(device->state_mutex);
4342 	mutex_unlock(device->state_mutex);
4343 	if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
4344 		updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4345 
4346 	if (updated_uuids)
4347 		drbd_print_uuids(device, "receiver updated UUIDs to");
4348 
4349 	return 0;
4350 }
4351 
4352 /**
4353  * convert_state() - Converts the peer's view of the cluster state to our point of view
4354  * @ps:		The state as seen by the peer.
4355  */
4356 static union drbd_state convert_state(union drbd_state ps)
4357 {
4358 	union drbd_state ms;
4359 
4360 	static enum drbd_conns c_tab[] = {
4361 		[C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
4362 		[C_CONNECTED] = C_CONNECTED,
4363 
4364 		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
4365 		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
4366 		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
4367 		[C_VERIFY_S]       = C_VERIFY_T,
4368 		[C_MASK]   = C_MASK,
4369 	};
4370 
4371 	ms.i = ps.i;
4372 
4373 	ms.conn = c_tab[ps.conn];
4374 	ms.peer = ps.role;
4375 	ms.role = ps.peer;
4376 	ms.pdsk = ps.disk;
4377 	ms.disk = ps.pdsk;
4378 	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
4379 
4380 	return ms;
4381 }
4382 
4383 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4384 {
4385 	struct drbd_peer_device *peer_device;
4386 	struct drbd_device *device;
4387 	struct p_req_state *p = pi->data;
4388 	union drbd_state mask, val;
4389 	enum drbd_state_rv rv;
4390 
4391 	peer_device = conn_peer_device(connection, pi->vnr);
4392 	if (!peer_device)
4393 		return -EIO;
4394 	device = peer_device->device;
4395 
4396 	mask.i = be32_to_cpu(p->mask);
4397 	val.i = be32_to_cpu(p->val);
4398 
4399 	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4400 	    mutex_is_locked(device->state_mutex)) {
4401 		drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4402 		return 0;
4403 	}
4404 
4405 	mask = convert_state(mask);
4406 	val = convert_state(val);
4407 
4408 	rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4409 	drbd_send_sr_reply(peer_device, rv);
4410 
4411 	drbd_md_sync(device);
4412 
4413 	return 0;
4414 }
4415 
4416 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4417 {
4418 	struct p_req_state *p = pi->data;
4419 	union drbd_state mask, val;
4420 	enum drbd_state_rv rv;
4421 
4422 	mask.i = be32_to_cpu(p->mask);
4423 	val.i = be32_to_cpu(p->val);
4424 
4425 	if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4426 	    mutex_is_locked(&connection->cstate_mutex)) {
4427 		conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4428 		return 0;
4429 	}
4430 
4431 	mask = convert_state(mask);
4432 	val = convert_state(val);
4433 
4434 	rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4435 	conn_send_sr_reply(connection, rv);
4436 
4437 	return 0;
4438 }
4439 
/*
 * receive_state() - process a state packet sent by the peer for one volume
 *
 * Decodes the peer's state word, takes a snapshot of the local state and
 * derives the new local state from both: peer role, peer disk state and the
 * peer's sync-pause flags are taken over, and the connection state may be
 * changed by the resync handshake.  The result is applied with
 * _drbd_set_state() under req_lock; if the local state changed while the
 * evaluation ran unlocked, the evaluation is repeated.
 *
 * Returns 0 on success (including the early "resync/verify finished" exits),
 * the result of config_unknown_volume() if the volume is not known locally,
 * -ECONNRESET if the local connection state is already C_TEAR_DOWN or below,
 * and -EIO on the various failure paths.
 */
4440 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4441 {
4442 	struct drbd_peer_device *peer_device;
4443 	struct drbd_device *device;
4444 	struct p_state *p = pi->data;
4445 	union drbd_state os, ns, peer_state;
4446 	enum drbd_disk_state real_peer_disk;
4447 	enum chg_state_flags cs_flags;
4448 	int rv;
4449 
4450 	peer_device = conn_peer_device(connection, pi->vnr);
4451 	if (!peer_device)
4452 		return config_unknown_volume(connection, pi);
4453 	device = peer_device->device;
4454 
4455 	peer_state.i = be32_to_cpu(p->state);
4456 
4457 	real_peer_disk = peer_state.disk;
	/* A negotiating peer is treated as Inconsistent or Consistent,
	 * depending on bit value 4 in the UUID flags it sent earlier.
	 * NOTE(review): device->p_uuid is dereferenced without a NULL check
	 * here, while the handshake condition further down does test it -
	 * confirm that p_uuid is always set when D_NEGOTIATING is reported. */
4458 	if (peer_state.disk == D_NEGOTIATING) {
4459 		real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4460 		drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4461 	}
4462 
	/* Snapshot the local state under req_lock.  Everything up to the
	 * re-check before _drbd_set_state() runs without the lock; that
	 * re-check jumps back here (lock held) if the state changed. */
4463 	spin_lock_irq(&device->resource->req_lock);
4464  retry:
4465 	os = ns = drbd_read_state(device);
4466 	spin_unlock_irq(&device->resource->req_lock);
4467 
4468 	/* If some other part of the code (ack_receiver thread, timeout)
4469 	 * already decided to close the connection again,
4470 	 * we must not "re-establish" it here. */
4471 	if (os.conn <= C_TEAR_DOWN)
4472 		return -ECONNRESET;
4473 
4474 	/* If this is the "end of sync" confirmation, usually the peer disk
4475 	 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4476 	 * set) resync started in PausedSyncT, or if the timing of pause-/
4477 	 * unpause-sync events has been "just right", the peer disk may
4478 	 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4479 	 */
4480 	if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4481 	    real_peer_disk == D_UP_TO_DATE &&
4482 	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4483 		/* If we are (becoming) SyncSource, but peer is still in sync
4484 		 * preparation, ignore its uptodate-ness to avoid flapping, it
4485 		 * will change to inconsistent once the peer reaches active
4486 		 * syncing states.
4487 		 * It may have changed syncer-paused flags, however, so we
4488 		 * cannot ignore this completely. */
4489 		if (peer_state.conn > C_CONNECTED &&
4490 		    peer_state.conn < C_SYNC_SOURCE)
4491 			real_peer_disk = D_INCONSISTENT;
4492 
4493 		/* if peer_state changes to connected at the same time,
4494 		 * it explicitly notifies us that it finished resync.
4495 		 * Maybe we should finish it up, too? */
4496 		else if (os.conn >= C_SYNC_SOURCE &&
4497 			 peer_state.conn == C_CONNECTED) {
4498 			if (drbd_bm_total_weight(device) <= device->rs_failed)
4499 				drbd_resync_finished(device);
4500 			return 0;
4501 		}
4502 	}
4503 
4504 	/* explicit verify finished notification, stop sector reached. */
4505 	if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4506 	    peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4507 		ov_out_of_sync_print(device);
4508 		drbd_resync_finished(device);
4509 		return 0;
4510 	}
4511 
4512 	/* peer says his disk is inconsistent, while we think it is uptodate,
4513 	 * and this happens while the peer still thinks we have a sync going on,
4514 	 * but we think we are already done with the sync.
4515 	 * We ignore this to avoid flapping pdsk.
4516 	 * This should not happen, if the peer is a recent version of drbd. */
4517 	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4518 	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4519 		real_peer_disk = D_UP_TO_DATE;
4520 
4521 	if (ns.conn == C_WF_REPORT_PARAMS)
4522 		ns.conn = C_CONNECTED;
4523 
4524 	if (peer_state.conn == C_AHEAD)
4525 		ns.conn = C_BEHIND;
4526 
4527 	/* TODO:
4528 	 * if (primary and diskless and peer uuid != effective uuid)
4529 	 *     abort attach on peer;
4530 	 *
4531 	 * If this node does not have good data, was already connected, but
4532 	 * the peer did a late attach only now, trying to "negotiate" with me,
4533 	 * AND I am currently Primary, possibly frozen, with some specific
4534 	 * "effective" uuid, this should never be reached, really, because
4535 	 * we first send the uuids, then the current state.
4536 	 *
4537 	 * In this scenario, we already dropped the connection hard
4538 	 * when we received the unsuitable uuids (receive_uuids().
4539 	 *
4540 	 * Should we want to change this, that is: not drop the connection in
4541 	 * receive_uuids() already, then we would need to add a branch here
4542 	 * that aborts the attach of "unsuitable uuids" on the peer in case
4543 	 * this node is currently Diskless Primary.
4544 	 */
4545 
4546 	if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4547 	    get_ldev_if_state(device, D_NEGOTIATING)) {
4548 		int cr; /* consider resync */
4549 
4550 		/* if we established a new connection */
4551 		cr  = (os.conn < C_CONNECTED);
4552 		/* if we had an established connection
4553 		 * and one of the nodes newly attaches a disk */
4554 		cr |= (os.conn == C_CONNECTED &&
4555 		       (peer_state.disk == D_NEGOTIATING ||
4556 			os.disk == D_NEGOTIATING));
4557 		/* if we have both been inconsistent, and the peer has been
4558 		 * forced to be UpToDate with --force */
4559 		cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4560 		/* if we had been plain connected, and the admin requested to
4561 		 * start a sync by "invalidate" or "invalidate-remote" */
4562 		cr |= (os.conn == C_CONNECTED &&
4563 				(peer_state.conn >= C_STARTING_SYNC_S &&
4564 				 peer_state.conn <= C_WF_BITMAP_T));
4565 
4566 		if (cr)
4567 			ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4568 
4569 		put_ldev(device);
		/* presumably C_MASK is how drbd_sync_handshake() reports that
		 * no sync decision was possible - verify against its
		 * definition */
4570 		if (ns.conn == C_MASK) {
4571 			ns.conn = C_CONNECTED;
4572 			if (device->state.disk == D_NEGOTIATING) {
4573 				drbd_force_state(device, NS(disk, D_FAILED));
4574 			} else if (peer_state.disk == D_NEGOTIATING) {
4575 				drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4576 				peer_state.disk = D_DISKLESS;
4577 				real_peer_disk = D_DISKLESS;
4578 			} else {
4579 				if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4580 					return -EIO;
4581 				D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4582 				conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4583 				return -EIO;
4584 			}
4585 		}
4586 	}
4587 
	/* The decisions above were based on the snapshot in "os"; start over
	 * (with req_lock held, as the retry label expects) if it is stale. */
4588 	spin_lock_irq(&device->resource->req_lock);
4589 	if (os.i != drbd_read_state(device).i)
4590 		goto retry;
4591 	clear_bit(CONSIDER_RESYNC, &device->flags);
4592 	ns.peer = peer_state.role;
4593 	ns.pdsk = real_peer_disk;
4594 	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4595 	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4596 		ns.disk = device->new_state_tmp.disk;
	/* CS_HARD is added unless this change takes the connection from
	 * below C_CONNECTED to C_CONNECTED or above */
4597 	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4598 	if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4599 	    test_bit(NEW_CUR_UUID, &device->flags)) {
4600 		/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4601 		   for temporal network outages! */
4602 		spin_unlock_irq(&device->resource->req_lock);
4603 		drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4604 		tl_clear(peer_device->connection);
4605 		drbd_uuid_new_current(device);
4606 		clear_bit(NEW_CUR_UUID, &device->flags);
4607 		conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4608 		return -EIO;
4609 	}
4610 	rv = _drbd_set_state(device, ns, cs_flags, NULL);
	/* re-read: the comparisons below use the state as actually set */
4611 	ns = drbd_read_state(device);
4612 	spin_unlock_irq(&device->resource->req_lock);
4613 
4614 	if (rv < SS_SUCCESS) {
4615 		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4616 		return -EIO;
4617 	}
4618 
4619 	if (os.conn > C_WF_REPORT_PARAMS) {
4620 		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4621 		    peer_state.disk != D_NEGOTIATING ) {
4622 			/* we want resync, peer has not yet decided to sync... */
4623 			/* Nowadays only used when forcing a node into primary role and
4624 			   setting its disk to UpToDate with that */
4625 			drbd_send_uuids(peer_device);
4626 			drbd_send_current_state(peer_device);
4627 		}
4628 	}
4629 
4630 	clear_bit(DISCARD_MY_DATA, &device->flags);
4631 
4632 	drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4633 
4634 	return 0;
4635 }
4636 
4637 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4638 {
4639 	struct drbd_peer_device *peer_device;
4640 	struct drbd_device *device;
4641 	struct p_rs_uuid *p = pi->data;
4642 
4643 	peer_device = conn_peer_device(connection, pi->vnr);
4644 	if (!peer_device)
4645 		return -EIO;
4646 	device = peer_device->device;
4647 
4648 	wait_event(device->misc_wait,
4649 		   device->state.conn == C_WF_SYNC_UUID ||
4650 		   device->state.conn == C_BEHIND ||
4651 		   device->state.conn < C_CONNECTED ||
4652 		   device->state.disk < D_NEGOTIATING);
4653 
4654 	/* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4655 
4656 	/* Here the _drbd_uuid_ functions are right, current should
4657 	   _not_ be rotated into the history */
4658 	if (get_ldev_if_state(device, D_NEGOTIATING)) {
4659 		_drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4660 		_drbd_uuid_set(device, UI_BITMAP, 0UL);
4661 
4662 		drbd_print_uuids(device, "updated sync uuid");
4663 		drbd_start_resync(device, C_SYNC_TARGET);
4664 
4665 		put_ldev(device);
4666 	} else
4667 		drbd_err(device, "Ignoring SyncUUID packet!\n");
4668 
4669 	return 0;
4670 }
4671 
4672 /**
4673  * receive_bitmap_plain
4674  *
4675  * Return 0 when done, 1 when another iteration is needed, and a negative error
4676  * code upon failure.
4677  */
4678 static int
4679 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4680 		     unsigned long *p, struct bm_xfer_ctx *c)
4681 {
4682 	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4683 				 drbd_header_size(peer_device->connection);
4684 	unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4685 				       c->bm_words - c->word_offset);
4686 	unsigned int want = num_words * sizeof(*p);
4687 	int err;
4688 
4689 	if (want != size) {
4690 		drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4691 		return -EIO;
4692 	}
4693 	if (want == 0)
4694 		return 0;
4695 	err = drbd_recv_all(peer_device->connection, p, want);
4696 	if (err)
4697 		return err;
4698 
4699 	drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4700 
4701 	c->word_offset += num_words;
4702 	c->bit_offset = c->word_offset * BITS_PER_LONG;
4703 	if (c->bit_offset > c->bm_bits)
4704 		c->bit_offset = c->bm_bits;
4705 
4706 	return 1;
4707 }
4708 
4709 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4710 {
4711 	return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4712 }
4713 
4714 static int dcbp_get_start(struct p_compressed_bm *p)
4715 {
4716 	return (p->encoding & 0x80) != 0;
4717 }
4718 
4719 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4720 {
4721 	return (p->encoding >> 4) & 0x7;
4722 }
4723 
4724 /**
4725  * recv_bm_rle_bits
4726  *
4727  * Return 0 when done, 1 when another iteration is needed, and a negative error
4728  * code upon failure.
4729  */
4730 static int
4731 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4732 		struct p_compressed_bm *p,
4733 		 struct bm_xfer_ctx *c,
4734 		 unsigned int len)
4735 {
4736 	struct bitstream bs;
4737 	u64 look_ahead;
4738 	u64 rl;
4739 	u64 tmp;
4740 	unsigned long s = c->bit_offset;
4741 	unsigned long e;
4742 	int toggle = dcbp_get_start(p);
4743 	int have;
4744 	int bits;
4745 
4746 	bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4747 
4748 	bits = bitstream_get_bits(&bs, &look_ahead, 64);
4749 	if (bits < 0)
4750 		return -EIO;
4751 
4752 	for (have = bits; have > 0; s += rl, toggle = !toggle) {
4753 		bits = vli_decode_bits(&rl, look_ahead);
4754 		if (bits <= 0)
4755 			return -EIO;
4756 
4757 		if (toggle) {
4758 			e = s + rl -1;
4759 			if (e >= c->bm_bits) {
4760 				drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4761 				return -EIO;
4762 			}
4763 			_drbd_bm_set_bits(peer_device->device, s, e);
4764 		}
4765 
4766 		if (have < bits) {
4767 			drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4768 				have, bits, look_ahead,
4769 				(unsigned int)(bs.cur.b - p->code),
4770 				(unsigned int)bs.buf_len);
4771 			return -EIO;
4772 		}
4773 		/* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4774 		if (likely(bits < 64))
4775 			look_ahead >>= bits;
4776 		else
4777 			look_ahead = 0;
4778 		have -= bits;
4779 
4780 		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4781 		if (bits < 0)
4782 			return -EIO;
4783 		look_ahead |= tmp << have;
4784 		have += bits;
4785 	}
4786 
4787 	c->bit_offset = s;
4788 	bm_xfer_ctx_bit_to_word_offset(c);
4789 
4790 	return (s != c->bm_bits);
4791 }
4792 
4793 /**
4794  * decode_bitmap_c
4795  *
4796  * Return 0 when done, 1 when another iteration is needed, and a negative error
4797  * code upon failure.
4798  */
4799 static int
4800 decode_bitmap_c(struct drbd_peer_device *peer_device,
4801 		struct p_compressed_bm *p,
4802 		struct bm_xfer_ctx *c,
4803 		unsigned int len)
4804 {
4805 	if (dcbp_get_code(p) == RLE_VLI_Bits)
4806 		return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4807 
4808 	/* other variants had been implemented for evaluation,
4809 	 * but have been dropped as this one turned out to be "best"
4810 	 * during all our tests. */
4811 
4812 	drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4813 	conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4814 	return -EIO;
4815 }
4816 
4817 void INFO_bm_xfer_stats(struct drbd_device *device,
4818 		const char *direction, struct bm_xfer_ctx *c)
4819 {
4820 	/* what would it take to transfer it "plaintext" */
4821 	unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4822 	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4823 	unsigned int plain =
4824 		header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4825 		c->bm_words * sizeof(unsigned long);
4826 	unsigned int total = c->bytes[0] + c->bytes[1];
4827 	unsigned int r;
4828 
4829 	/* total can not be zero. but just in case: */
4830 	if (total == 0)
4831 		return;
4832 
4833 	/* don't report if not compressed */
4834 	if (total >= plain)
4835 		return;
4836 
4837 	/* total < plain. check for overflow, still */
4838 	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4839 		                    : (1000 * total / plain);
4840 
4841 	if (r > 1000)
4842 		r = 1000;
4843 
4844 	r = 1000 - r;
4845 	drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4846 	     "total %u; compression: %u.%u%%\n",
4847 			direction,
4848 			c->bytes[1], c->packets[1],
4849 			c->bytes[0], c->packets[0],
4850 			total, r/10, r % 10);
4851 }
4852 
4853 /* Since we are processing the bitfield from lower addresses to higher,
4854    it does not matter if the process it in 32 bit chunks or 64 bit
4855    chunks as long as it is little endian. (Understand it as byte stream,
4856    beginning with the lowest byte...) If we would use big endian
4857    we would need to process it from the highest address to the lowest,
4858    in order to be agnostic to the 32 vs 64 bits issue.
4859 
4860    returns 0 on failure, 1 if we successfully received it. */
4861 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4862 {
4863 	struct drbd_peer_device *peer_device;
4864 	struct drbd_device *device;
4865 	struct bm_xfer_ctx c;
4866 	int err;
4867 
4868 	peer_device = conn_peer_device(connection, pi->vnr);
4869 	if (!peer_device)
4870 		return -EIO;
4871 	device = peer_device->device;
4872 
4873 	drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4874 	/* you are supposed to send additional out-of-sync information
4875 	 * if you actually set bits during this phase */
4876 
4877 	c = (struct bm_xfer_ctx) {
4878 		.bm_bits = drbd_bm_bits(device),
4879 		.bm_words = drbd_bm_words(device),
4880 	};
4881 
4882 	for(;;) {
4883 		if (pi->cmd == P_BITMAP)
4884 			err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4885 		else if (pi->cmd == P_COMPRESSED_BITMAP) {
4886 			/* MAYBE: sanity check that we speak proto >= 90,
4887 			 * and the feature is enabled! */
4888 			struct p_compressed_bm *p = pi->data;
4889 
4890 			if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4891 				drbd_err(device, "ReportCBitmap packet too large\n");
4892 				err = -EIO;
4893 				goto out;
4894 			}
4895 			if (pi->size <= sizeof(*p)) {
4896 				drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4897 				err = -EIO;
4898 				goto out;
4899 			}
4900 			err = drbd_recv_all(peer_device->connection, p, pi->size);
4901 			if (err)
4902 			       goto out;
4903 			err = decode_bitmap_c(peer_device, p, &c, pi->size);
4904 		} else {
4905 			drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4906 			err = -EIO;
4907 			goto out;
4908 		}
4909 
4910 		c.packets[pi->cmd == P_BITMAP]++;
4911 		c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4912 
4913 		if (err <= 0) {
4914 			if (err < 0)
4915 				goto out;
4916 			break;
4917 		}
4918 		err = drbd_recv_header(peer_device->connection, pi);
4919 		if (err)
4920 			goto out;
4921 	}
4922 
4923 	INFO_bm_xfer_stats(device, "receive", &c);
4924 
4925 	if (device->state.conn == C_WF_BITMAP_T) {
4926 		enum drbd_state_rv rv;
4927 
4928 		err = drbd_send_bitmap(device);
4929 		if (err)
4930 			goto out;
4931 		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4932 		rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4933 		D_ASSERT(device, rv == SS_SUCCESS);
4934 	} else if (device->state.conn != C_WF_BITMAP_S) {
4935 		/* admin may have requested C_DISCONNECTING,
4936 		 * other threads may have noticed network errors */
4937 		drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4938 		    drbd_conn_str(device->state.conn));
4939 	}
4940 	err = 0;
4941 
4942  out:
4943 	drbd_bm_unlock(device);
4944 	if (!err && device->state.conn == C_WF_BITMAP_S)
4945 		drbd_start_resync(device, C_SYNC_SOURCE);
4946 	return err;
4947 }
4948 
4949 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4950 {
4951 	drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4952 		 pi->cmd, pi->size);
4953 
4954 	return ignore_remaining_packet(connection, pi);
4955 }
4956 
4957 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4958 {
4959 	/* Make sure we've acked all the TCP data associated
4960 	 * with the data requests being unplugged */
4961 	drbd_tcp_quickack(connection->data.socket);
4962 
4963 	return 0;
4964 }
4965 
4966 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4967 {
4968 	struct drbd_peer_device *peer_device;
4969 	struct drbd_device *device;
4970 	struct p_block_desc *p = pi->data;
4971 
4972 	peer_device = conn_peer_device(connection, pi->vnr);
4973 	if (!peer_device)
4974 		return -EIO;
4975 	device = peer_device->device;
4976 
4977 	switch (device->state.conn) {
4978 	case C_WF_SYNC_UUID:
4979 	case C_WF_BITMAP_T:
4980 	case C_BEHIND:
4981 			break;
4982 	default:
4983 		drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4984 				drbd_conn_str(device->state.conn));
4985 	}
4986 
4987 	drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4988 
4989 	return 0;
4990 }
4991 
4992 static int receive_rs_deallocated(struct drbd_connection *connection, struct packet_info *pi)
4993 {
4994 	struct drbd_peer_device *peer_device;
4995 	struct p_block_desc *p = pi->data;
4996 	struct drbd_device *device;
4997 	sector_t sector;
4998 	int size, err = 0;
4999 
5000 	peer_device = conn_peer_device(connection, pi->vnr);
5001 	if (!peer_device)
5002 		return -EIO;
5003 	device = peer_device->device;
5004 
5005 	sector = be64_to_cpu(p->sector);
5006 	size = be32_to_cpu(p->blksize);
5007 
5008 	dec_rs_pending(device);
5009 
5010 	if (get_ldev(device)) {
5011 		struct drbd_peer_request *peer_req;
5012 		const int op = REQ_OP_WRITE_ZEROES;
5013 
5014 		peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector,
5015 					       size, 0, GFP_NOIO);
5016 		if (!peer_req) {
5017 			put_ldev(device);
5018 			return -ENOMEM;
5019 		}
5020 
5021 		peer_req->w.cb = e_end_resync_block;
5022 		peer_req->submit_jif = jiffies;
5023 		peer_req->flags |= EE_TRIM;
5024 
5025 		spin_lock_irq(&device->resource->req_lock);
5026 		list_add_tail(&peer_req->w.list, &device->sync_ee);
5027 		spin_unlock_irq(&device->resource->req_lock);
5028 
5029 		atomic_add(pi->size >> 9, &device->rs_sect_ev);
5030 		err = drbd_submit_peer_request(device, peer_req, op, 0, DRBD_FAULT_RS_WR);
5031 
5032 		if (err) {
5033 			spin_lock_irq(&device->resource->req_lock);
5034 			list_del(&peer_req->w.list);
5035 			spin_unlock_irq(&device->resource->req_lock);
5036 
5037 			drbd_free_peer_req(device, peer_req);
5038 			put_ldev(device);
5039 			err = 0;
5040 			goto fail;
5041 		}
5042 
5043 		inc_unacked(device);
5044 
5045 		/* No put_ldev() here. Gets called in drbd_endio_write_sec_final(),
5046 		   as well as drbd_rs_complete_io() */
5047 	} else {
5048 	fail:
5049 		drbd_rs_complete_io(device, sector);
5050 		drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER);
5051 	}
5052 
5053 	atomic_add(size >> 9, &device->rs_sect_in);
5054 
5055 	return err;
5056 }
5057 
/*
 * One entry of the data-socket dispatch table.
 * Field order matters to positional initializers; do not reorder.
 */
struct data_cmd {
	/* non-zero if the packet may be longer than pkt_size */
	int expect_payload;
	/* size of the fixed sub header received before fn is called */
	unsigned int pkt_size;
	/* handler; a non-zero return value is treated as an error */
	int (*fn)(struct drbd_connection *connection, struct packet_info *pi);
};
5063 
5064 static struct data_cmd drbd_cmd_handler[] = {
5065 	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
5066 	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
5067 	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
5068 	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier } ,
5069 	[P_BITMAP]	    = { 1, 0, receive_bitmap } ,
5070 	[P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
5071 	[P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
5072 	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
5073 	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
5074 	[P_SYNC_PARAM]	    = { 1, 0, receive_SyncParam },
5075 	[P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
5076 	[P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
5077 	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
5078 	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
5079 	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
5080 	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
5081 	[P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
5082 	[P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
5083 	[P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
5084 	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
5085 	[P_RS_THIN_REQ]     = { 0, sizeof(struct p_block_req), receive_DataRequest },
5086 	[P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
5087 	[P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
5088 	[P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
5089 	[P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
5090 	[P_TRIM]	    = { 0, sizeof(struct p_trim), receive_Data },
5091 	[P_ZEROES]	    = { 0, sizeof(struct p_trim), receive_Data },
5092 	[P_RS_DEALLOCATED]  = { 0, sizeof(struct p_block_desc), receive_rs_deallocated },
5093 	[P_WSAME]	    = { 1, sizeof(struct p_wsame), receive_Data },
5094 };
5095 
5096 static void drbdd(struct drbd_connection *connection)
5097 {
5098 	struct packet_info pi;
5099 	size_t shs; /* sub header size */
5100 	int err;
5101 
5102 	while (get_t_state(&connection->receiver) == RUNNING) {
5103 		struct data_cmd const *cmd;
5104 
5105 		drbd_thread_current_set_cpu(&connection->receiver);
5106 		update_receiver_timing_details(connection, drbd_recv_header_maybe_unplug);
5107 		if (drbd_recv_header_maybe_unplug(connection, &pi))
5108 			goto err_out;
5109 
5110 		cmd = &drbd_cmd_handler[pi.cmd];
5111 		if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
5112 			drbd_err(connection, "Unexpected data packet %s (0x%04x)",
5113 				 cmdname(pi.cmd), pi.cmd);
5114 			goto err_out;
5115 		}
5116 
5117 		shs = cmd->pkt_size;
5118 		if (pi.cmd == P_SIZES && connection->agreed_features & DRBD_FF_WSAME)
5119 			shs += sizeof(struct o_qlim);
5120 		if (pi.size > shs && !cmd->expect_payload) {
5121 			drbd_err(connection, "No payload expected %s l:%d\n",
5122 				 cmdname(pi.cmd), pi.size);
5123 			goto err_out;
5124 		}
5125 		if (pi.size < shs) {
5126 			drbd_err(connection, "%s: unexpected packet size, expected:%d received:%d\n",
5127 				 cmdname(pi.cmd), (int)shs, pi.size);
5128 			goto err_out;
5129 		}
5130 
5131 		if (shs) {
5132 			update_receiver_timing_details(connection, drbd_recv_all_warn);
5133 			err = drbd_recv_all_warn(connection, pi.data, shs);
5134 			if (err)
5135 				goto err_out;
5136 			pi.size -= shs;
5137 		}
5138 
5139 		update_receiver_timing_details(connection, cmd->fn);
5140 		err = cmd->fn(connection, &pi);
5141 		if (err) {
5142 			drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
5143 				 cmdname(pi.cmd), err, pi.size);
5144 			goto err_out;
5145 		}
5146 	}
5147 	return;
5148 
5149     err_out:
5150 	conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
5151 }
5152 
5153 static void conn_disconnect(struct drbd_connection *connection)
5154 {
5155 	struct drbd_peer_device *peer_device;
5156 	enum drbd_conns oc;
5157 	int vnr;
5158 
5159 	if (connection->cstate == C_STANDALONE)
5160 		return;
5161 
5162 	/* We are about to start the cleanup after connection loss.
5163 	 * Make sure drbd_make_request knows about that.
5164 	 * Usually we should be in some network failure state already,
5165 	 * but just in case we are not, we fix it up here.
5166 	 */
5167 	conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5168 
5169 	/* ack_receiver does not clean up anything. it must not interfere, either */
5170 	drbd_thread_stop(&connection->ack_receiver);
5171 	if (connection->ack_sender) {
5172 		destroy_workqueue(connection->ack_sender);
5173 		connection->ack_sender = NULL;
5174 	}
5175 	drbd_free_sock(connection);
5176 
5177 	rcu_read_lock();
5178 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5179 		struct drbd_device *device = peer_device->device;
5180 		kref_get(&device->kref);
5181 		rcu_read_unlock();
5182 		drbd_disconnected(peer_device);
5183 		kref_put(&device->kref, drbd_destroy_device);
5184 		rcu_read_lock();
5185 	}
5186 	rcu_read_unlock();
5187 
5188 	if (!list_empty(&connection->current_epoch->list))
5189 		drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
5190 	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
5191 	atomic_set(&connection->current_epoch->epoch_size, 0);
5192 	connection->send.seen_any_write_yet = false;
5193 
5194 	drbd_info(connection, "Connection closed\n");
5195 
5196 	if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
5197 		conn_try_outdate_peer_async(connection);
5198 
5199 	spin_lock_irq(&connection->resource->req_lock);
5200 	oc = connection->cstate;
5201 	if (oc >= C_UNCONNECTED)
5202 		_conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
5203 
5204 	spin_unlock_irq(&connection->resource->req_lock);
5205 
5206 	if (oc == C_DISCONNECTING)
5207 		conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
5208 }
5209 
5210 static int drbd_disconnected(struct drbd_peer_device *peer_device)
5211 {
5212 	struct drbd_device *device = peer_device->device;
5213 	unsigned int i;
5214 
5215 	/* wait for current activity to cease. */
5216 	spin_lock_irq(&device->resource->req_lock);
5217 	_drbd_wait_ee_list_empty(device, &device->active_ee);
5218 	_drbd_wait_ee_list_empty(device, &device->sync_ee);
5219 	_drbd_wait_ee_list_empty(device, &device->read_ee);
5220 	spin_unlock_irq(&device->resource->req_lock);
5221 
5222 	/* We do not have data structures that would allow us to
5223 	 * get the rs_pending_cnt down to 0 again.
5224 	 *  * On C_SYNC_TARGET we do not have any data structures describing
5225 	 *    the pending RSDataRequest's we have sent.
5226 	 *  * On C_SYNC_SOURCE there is no data structure that tracks
5227 	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
5228 	 *  And no, it is not the sum of the reference counts in the
5229 	 *  resync_LRU. The resync_LRU tracks the whole operation including
5230 	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
5231 	 *  on the fly. */
5232 	drbd_rs_cancel_all(device);
5233 	device->rs_total = 0;
5234 	device->rs_failed = 0;
5235 	atomic_set(&device->rs_pending_cnt, 0);
5236 	wake_up(&device->misc_wait);
5237 
5238 	del_timer_sync(&device->resync_timer);
5239 	resync_timer_fn(&device->resync_timer);
5240 
5241 	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
5242 	 * w_make_resync_request etc. which may still be on the worker queue
5243 	 * to be "canceled" */
5244 	drbd_flush_workqueue(&peer_device->connection->sender_work);
5245 
5246 	drbd_finish_peer_reqs(device);
5247 
5248 	/* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
5249 	   might have issued a work again. The one before drbd_finish_peer_reqs() is
5250 	   necessary to reclain net_ee in drbd_finish_peer_reqs(). */
5251 	drbd_flush_workqueue(&peer_device->connection->sender_work);
5252 
5253 	/* need to do it again, drbd_finish_peer_reqs() may have populated it
5254 	 * again via drbd_try_clear_on_disk_bm(). */
5255 	drbd_rs_cancel_all(device);
5256 
5257 	kfree(device->p_uuid);
5258 	device->p_uuid = NULL;
5259 
5260 	if (!drbd_suspended(device))
5261 		tl_clear(peer_device->connection);
5262 
5263 	drbd_md_sync(device);
5264 
5265 	if (get_ldev(device)) {
5266 		drbd_bitmap_io(device, &drbd_bm_write_copy_pages,
5267 				"write from disconnected", BM_LOCKED_CHANGE_ALLOWED);
5268 		put_ldev(device);
5269 	}
5270 
5271 	/* tcp_close and release of sendpage pages can be deferred.  I don't
5272 	 * want to use SO_LINGER, because apparently it can be deferred for
5273 	 * more than 20 seconds (longest time I checked).
5274 	 *
5275 	 * Actually we don't care for exactly when the network stack does its
5276 	 * put_page(), but release our reference on these pages right here.
5277 	 */
5278 	i = drbd_free_peer_reqs(device, &device->net_ee);
5279 	if (i)
5280 		drbd_info(device, "net_ee not empty, killed %u entries\n", i);
5281 	i = atomic_read(&device->pp_in_use_by_net);
5282 	if (i)
5283 		drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
5284 	i = atomic_read(&device->pp_in_use);
5285 	if (i)
5286 		drbd_info(device, "pp_in_use = %d, expected 0\n", i);
5287 
5288 	D_ASSERT(device, list_empty(&device->read_ee));
5289 	D_ASSERT(device, list_empty(&device->active_ee));
5290 	D_ASSERT(device, list_empty(&device->sync_ee));
5291 	D_ASSERT(device, list_empty(&device->done_ee));
5292 
5293 	return 0;
5294 }
5295 
5296 /*
5297  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
5298  * we can agree on is stored in agreed_pro_version.
5299  *
5300  * feature flags and the reserved array should be enough room for future
5301  * enhancements of the handshake protocol, and possible plugins...
5302  *
5303  * for now, they are expected to be zero, but ignored.
5304  */
5305 static int drbd_send_features(struct drbd_connection *connection)
5306 {
5307 	struct drbd_socket *sock;
5308 	struct p_connection_features *p;
5309 
5310 	sock = &connection->data;
5311 	p = conn_prepare_command(connection, sock);
5312 	if (!p)
5313 		return -EIO;
5314 	memset(p, 0, sizeof(*p));
5315 	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
5316 	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
5317 	p->feature_flags = cpu_to_be32(PRO_FEATURES);
5318 	return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
5319 }
5320 
5321 /*
5322  * return values:
5323  *   1 yes, we have a valid connection
5324  *   0 oops, did not work out, please try again
5325  *  -1 peer talks different language,
5326  *     no point in trying again, please go standalone.
5327  */
5328 static int drbd_do_features(struct drbd_connection *connection)
5329 {
5330 	/* ASSERT current == connection->receiver ... */
5331 	struct p_connection_features *p;
5332 	const int expect = sizeof(struct p_connection_features);
5333 	struct packet_info pi;
5334 	int err;
5335 
5336 	err = drbd_send_features(connection);
5337 	if (err)
5338 		return 0;
5339 
5340 	err = drbd_recv_header(connection, &pi);
5341 	if (err)
5342 		return 0;
5343 
5344 	if (pi.cmd != P_CONNECTION_FEATURES) {
5345 		drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
5346 			 cmdname(pi.cmd), pi.cmd);
5347 		return -1;
5348 	}
5349 
5350 	if (pi.size != expect) {
5351 		drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
5352 		     expect, pi.size);
5353 		return -1;
5354 	}
5355 
5356 	p = pi.data;
5357 	err = drbd_recv_all_warn(connection, p, expect);
5358 	if (err)
5359 		return 0;
5360 
5361 	p->protocol_min = be32_to_cpu(p->protocol_min);
5362 	p->protocol_max = be32_to_cpu(p->protocol_max);
5363 	if (p->protocol_max == 0)
5364 		p->protocol_max = p->protocol_min;
5365 
5366 	if (PRO_VERSION_MAX < p->protocol_min ||
5367 	    PRO_VERSION_MIN > p->protocol_max)
5368 		goto incompat;
5369 
5370 	connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
5371 	connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
5372 
5373 	drbd_info(connection, "Handshake successful: "
5374 	     "Agreed network protocol version %d\n", connection->agreed_pro_version);
5375 
5376 	drbd_info(connection, "Feature flags enabled on protocol level: 0x%x%s%s%s%s.\n",
5377 		  connection->agreed_features,
5378 		  connection->agreed_features & DRBD_FF_TRIM ? " TRIM" : "",
5379 		  connection->agreed_features & DRBD_FF_THIN_RESYNC ? " THIN_RESYNC" : "",
5380 		  connection->agreed_features & DRBD_FF_WSAME ? " WRITE_SAME" : "",
5381 		  connection->agreed_features & DRBD_FF_WZEROES ? " WRITE_ZEROES" :
5382 		  connection->agreed_features ? "" : " none");
5383 
5384 	return 1;
5385 
5386  incompat:
5387 	drbd_err(connection, "incompatible DRBD dialects: "
5388 	    "I support %d-%d, peer supports %d-%d\n",
5389 	    PRO_VERSION_MIN, PRO_VERSION_MAX,
5390 	    p->protocol_min, p->protocol_max);
5391 	return -1;
5392 }
5393 
#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
/* Stub for kernels without HMAC support: authentication always fails (-1). */
static int drbd_do_auth(struct drbd_connection *connection)
{
	drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
	drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
	return -1;
}
#else
#define CHALLENGE_LEN 64

/*
 * Mutual challenge/response authentication keyed with the shared secret
 * from net_conf.
 *
 * Sequence: send our random challenge, receive the peer's challenge, send
 * the digest of the peer's challenge, receive the peer's response and
 * compare it with the digest of our own challenge.
 *
 * Return value:
 *	1 - auth succeeded,
 *	0 - failed, try again (network error),
 *	-1 - auth failed, don't try again.
 */

static int drbd_do_auth(struct drbd_connection *connection)
{
	struct drbd_socket *sock;
	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
	char *response = NULL;
	char *right_response = NULL;
	char *peers_ch = NULL;
	unsigned int key_len;
	char secret[SHARED_SECRET_MAX]; /* 64 byte */
	unsigned int resp_size;
	SHASH_DESC_ON_STACK(desc, connection->cram_hmac_tfm);
	struct packet_info pi;
	struct net_conf *nc;
	int err, rv;

	/* FIXME: Put the challenge/response into the preallocated socket buffer.  */

	/* copy the secret out, net_conf may only be used under RCU */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	key_len = strlen(nc->shared_secret);
	memcpy(secret, nc->shared_secret, key_len);
	rcu_read_unlock();

	desc->tfm = connection->cram_hmac_tfm;

	rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
	if (rv) {
		drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	get_random_bytes(my_challenge, CHALLENGE_LEN);

	sock = &connection->data;
	if (!conn_prepare_command(connection, sock)) {
		rv = 0;
		goto fail;
	}
	/* conn_send_command() returns 0 on success; rv == 0 means "retry" */
	rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
				my_challenge, CHALLENGE_LEN);
	if (!rv)
		goto fail;

	err = drbd_recv_header(connection, &pi);
	if (err) {
		rv = 0;
		goto fail;
	}

	if (pi.cmd != P_AUTH_CHALLENGE) {
		drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		rv = -1;
		goto fail;
	}

	/* the peer's challenge has to be CHALLENGE_LEN .. 2 * CHALLENGE_LEN bytes */
	if (pi.size > CHALLENGE_LEN * 2) {
		drbd_err(connection, "expected AuthChallenge payload too big.\n");
		rv = -1;
		goto fail;
	}

	if (pi.size < CHALLENGE_LEN) {
		drbd_err(connection, "AuthChallenge payload too small.\n");
		rv = -1;
		goto fail;
	}

	peers_ch = kmalloc(pi.size, GFP_NOIO);
	if (peers_ch == NULL) {
		drbd_err(connection, "kmalloc of peers_ch failed\n");
		rv = -1;
		goto fail;
	}

	err = drbd_recv_all_warn(connection, peers_ch, pi.size);
	if (err) {
		rv = 0;
		goto fail;
	}

	/* a peer echoing our own challenge would get our response for free */
	if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
		drbd_err(connection, "Peer presented the same challenge!\n");
		rv = -1;
		goto fail;
	}

	resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
	response = kmalloc(resp_size, GFP_NOIO);
	if (response == NULL) {
		drbd_err(connection, "kmalloc of response failed\n");
		rv = -1;
		goto fail;
	}

	rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
	if (rv) {
		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	if (!conn_prepare_command(connection, sock)) {
		rv = 0;
		goto fail;
	}
	rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
				response, resp_size);
	if (!rv)
		goto fail;

	err = drbd_recv_header(connection, &pi);
	if (err) {
		rv = 0;
		goto fail;
	}

	if (pi.cmd != P_AUTH_RESPONSE) {
		drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		rv = 0;
		goto fail;
	}

	if (pi.size != resp_size) {
		drbd_err(connection, "expected AuthResponse payload of wrong size\n");
		rv = 0;
		goto fail;
	}

	/* the response buffer is reused for what the peer sent */
	err = drbd_recv_all_warn(connection, response , resp_size);
	if (err) {
		rv = 0;
		goto fail;
	}

	right_response = kmalloc(resp_size, GFP_NOIO);
	if (right_response == NULL) {
		drbd_err(connection, "kmalloc of right_response failed\n");
		rv = -1;
		goto fail;
	}

	rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
				 right_response);
	if (rv) {
		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	/* NOTE(review): memcmp() is not constant time; confirm whether a
	 * constant-time comparison is wanted here. */
	rv = !memcmp(response, right_response, resp_size);

	if (rv)
		drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
		     resp_size);
	else
		rv = -1;

 fail:
	kfree(peers_ch);
	kfree(response);
	kfree(right_response);
	shash_desc_zero(desc);
	/* do not leave a copy of the shared secret behind on the stack */
	memzero_explicit(secret, sizeof(secret));

	return rv;
}
#endif
5579 
5580 int drbd_receiver(struct drbd_thread *thi)
5581 {
5582 	struct drbd_connection *connection = thi->connection;
5583 	int h;
5584 
5585 	drbd_info(connection, "receiver (re)started\n");
5586 
5587 	do {
5588 		h = conn_connect(connection);
5589 		if (h == 0) {
5590 			conn_disconnect(connection);
5591 			schedule_timeout_interruptible(HZ);
5592 		}
5593 		if (h == -1) {
5594 			drbd_warn(connection, "Discarding network configuration.\n");
5595 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5596 		}
5597 	} while (h == 0);
5598 
5599 	if (h > 0) {
5600 		blk_start_plug(&connection->receiver_plug);
5601 		drbdd(connection);
5602 		blk_finish_plug(&connection->receiver_plug);
5603 	}
5604 
5605 	conn_disconnect(connection);
5606 
5607 	drbd_info(connection, "receiver terminated\n");
5608 	return 0;
5609 }
5610 
5611 /* ********* acknowledge sender ******** */
5612 
5613 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5614 {
5615 	struct p_req_state_reply *p = pi->data;
5616 	int retcode = be32_to_cpu(p->retcode);
5617 
5618 	if (retcode >= SS_SUCCESS) {
5619 		set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5620 	} else {
5621 		set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5622 		drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5623 			 drbd_set_st_err_str(retcode), retcode);
5624 	}
5625 	wake_up(&connection->ping_wait);
5626 
5627 	return 0;
5628 }
5629 
5630 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5631 {
5632 	struct drbd_peer_device *peer_device;
5633 	struct drbd_device *device;
5634 	struct p_req_state_reply *p = pi->data;
5635 	int retcode = be32_to_cpu(p->retcode);
5636 
5637 	peer_device = conn_peer_device(connection, pi->vnr);
5638 	if (!peer_device)
5639 		return -EIO;
5640 	device = peer_device->device;
5641 
5642 	if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5643 		D_ASSERT(device, connection->agreed_pro_version < 100);
5644 		return got_conn_RqSReply(connection, pi);
5645 	}
5646 
5647 	if (retcode >= SS_SUCCESS) {
5648 		set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5649 	} else {
5650 		set_bit(CL_ST_CHG_FAIL, &device->flags);
5651 		drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5652 			drbd_set_st_err_str(retcode), retcode);
5653 	}
5654 	wake_up(&device->state_wait);
5655 
5656 	return 0;
5657 }
5658 
/*
 * Handle an incoming ping by sending a ping ack.
 * @pi is unused.  Returns whatever drbd_send_ping_ack() returns.
 */
static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
{
	int err = drbd_send_ping_ack(connection);

	return err;
}
5664 
5665 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5666 {
5667 	/* restore idle timeout */
5668 	connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5669 	if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5670 		wake_up(&connection->ping_wait);
5671 
5672 	return 0;
5673 }
5674 
5675 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5676 {
5677 	struct drbd_peer_device *peer_device;
5678 	struct drbd_device *device;
5679 	struct p_block_ack *p = pi->data;
5680 	sector_t sector = be64_to_cpu(p->sector);
5681 	int blksize = be32_to_cpu(p->blksize);
5682 
5683 	peer_device = conn_peer_device(connection, pi->vnr);
5684 	if (!peer_device)
5685 		return -EIO;
5686 	device = peer_device->device;
5687 
5688 	D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5689 
5690 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5691 
5692 	if (get_ldev(device)) {
5693 		drbd_rs_complete_io(device, sector);
5694 		drbd_set_in_sync(device, sector, blksize);
5695 		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5696 		device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5697 		put_ldev(device);
5698 	}
5699 	dec_rs_pending(device);
5700 	atomic_add(blksize >> 9, &device->rs_sect_in);
5701 
5702 	return 0;
5703 }
5704 
5705 static int
5706 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5707 			      struct rb_root *root, const char *func,
5708 			      enum drbd_req_event what, bool missing_ok)
5709 {
5710 	struct drbd_request *req;
5711 	struct bio_and_error m;
5712 
5713 	spin_lock_irq(&device->resource->req_lock);
5714 	req = find_request(device, root, id, sector, missing_ok, func);
5715 	if (unlikely(!req)) {
5716 		spin_unlock_irq(&device->resource->req_lock);
5717 		return -EIO;
5718 	}
5719 	__req_mod(req, what, &m);
5720 	spin_unlock_irq(&device->resource->req_lock);
5721 
5722 	if (m.bio)
5723 		complete_master_bio(device, &m);
5724 	return 0;
5725 }
5726 
5727 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5728 {
5729 	struct drbd_peer_device *peer_device;
5730 	struct drbd_device *device;
5731 	struct p_block_ack *p = pi->data;
5732 	sector_t sector = be64_to_cpu(p->sector);
5733 	int blksize = be32_to_cpu(p->blksize);
5734 	enum drbd_req_event what;
5735 
5736 	peer_device = conn_peer_device(connection, pi->vnr);
5737 	if (!peer_device)
5738 		return -EIO;
5739 	device = peer_device->device;
5740 
5741 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5742 
5743 	if (p->block_id == ID_SYNCER) {
5744 		drbd_set_in_sync(device, sector, blksize);
5745 		dec_rs_pending(device);
5746 		return 0;
5747 	}
5748 	switch (pi->cmd) {
5749 	case P_RS_WRITE_ACK:
5750 		what = WRITE_ACKED_BY_PEER_AND_SIS;
5751 		break;
5752 	case P_WRITE_ACK:
5753 		what = WRITE_ACKED_BY_PEER;
5754 		break;
5755 	case P_RECV_ACK:
5756 		what = RECV_ACKED_BY_PEER;
5757 		break;
5758 	case P_SUPERSEDED:
5759 		what = CONFLICT_RESOLVED;
5760 		break;
5761 	case P_RETRY_WRITE:
5762 		what = POSTPONE_WRITE;
5763 		break;
5764 	default:
5765 		BUG();
5766 	}
5767 
5768 	return validate_req_change_req_state(device, p->block_id, sector,
5769 					     &device->write_requests, __func__,
5770 					     what, false);
5771 }
5772 
5773 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5774 {
5775 	struct drbd_peer_device *peer_device;
5776 	struct drbd_device *device;
5777 	struct p_block_ack *p = pi->data;
5778 	sector_t sector = be64_to_cpu(p->sector);
5779 	int size = be32_to_cpu(p->blksize);
5780 	int err;
5781 
5782 	peer_device = conn_peer_device(connection, pi->vnr);
5783 	if (!peer_device)
5784 		return -EIO;
5785 	device = peer_device->device;
5786 
5787 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5788 
5789 	if (p->block_id == ID_SYNCER) {
5790 		dec_rs_pending(device);
5791 		drbd_rs_failed_io(device, sector, size);
5792 		return 0;
5793 	}
5794 
5795 	err = validate_req_change_req_state(device, p->block_id, sector,
5796 					    &device->write_requests, __func__,
5797 					    NEG_ACKED, true);
5798 	if (err) {
5799 		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5800 		   The master bio might already be completed, therefore the
5801 		   request is no longer in the collision hash. */
5802 		/* In Protocol B we might already have got a P_RECV_ACK
5803 		   but then get a P_NEG_ACK afterwards. */
5804 		drbd_set_out_of_sync(device, sector, size);
5805 	}
5806 	return 0;
5807 }
5808 
5809 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5810 {
5811 	struct drbd_peer_device *peer_device;
5812 	struct drbd_device *device;
5813 	struct p_block_ack *p = pi->data;
5814 	sector_t sector = be64_to_cpu(p->sector);
5815 
5816 	peer_device = conn_peer_device(connection, pi->vnr);
5817 	if (!peer_device)
5818 		return -EIO;
5819 	device = peer_device->device;
5820 
5821 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5822 
5823 	drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5824 	    (unsigned long long)sector, be32_to_cpu(p->blksize));
5825 
5826 	return validate_req_change_req_state(device, p->block_id, sector,
5827 					     &device->read_requests, __func__,
5828 					     NEG_ACKED, false);
5829 }
5830 
5831 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5832 {
5833 	struct drbd_peer_device *peer_device;
5834 	struct drbd_device *device;
5835 	sector_t sector;
5836 	int size;
5837 	struct p_block_ack *p = pi->data;
5838 
5839 	peer_device = conn_peer_device(connection, pi->vnr);
5840 	if (!peer_device)
5841 		return -EIO;
5842 	device = peer_device->device;
5843 
5844 	sector = be64_to_cpu(p->sector);
5845 	size = be32_to_cpu(p->blksize);
5846 
5847 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5848 
5849 	dec_rs_pending(device);
5850 
5851 	if (get_ldev_if_state(device, D_FAILED)) {
5852 		drbd_rs_complete_io(device, sector);
5853 		switch (pi->cmd) {
5854 		case P_NEG_RS_DREPLY:
5855 			drbd_rs_failed_io(device, sector, size);
5856 		case P_RS_CANCEL:
5857 			break;
5858 		default:
5859 			BUG();
5860 		}
5861 		put_ldev(device);
5862 	}
5863 
5864 	return 0;
5865 }
5866 
5867 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5868 {
5869 	struct p_barrier_ack *p = pi->data;
5870 	struct drbd_peer_device *peer_device;
5871 	int vnr;
5872 
5873 	tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5874 
5875 	rcu_read_lock();
5876 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5877 		struct drbd_device *device = peer_device->device;
5878 
5879 		if (device->state.conn == C_AHEAD &&
5880 		    atomic_read(&device->ap_in_flight) == 0 &&
5881 		    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5882 			device->start_resync_timer.expires = jiffies + HZ;
5883 			add_timer(&device->start_resync_timer);
5884 		}
5885 	}
5886 	rcu_read_unlock();
5887 
5888 	return 0;
5889 }
5890 
5891 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5892 {
5893 	struct drbd_peer_device *peer_device;
5894 	struct drbd_device *device;
5895 	struct p_block_ack *p = pi->data;
5896 	struct drbd_device_work *dw;
5897 	sector_t sector;
5898 	int size;
5899 
5900 	peer_device = conn_peer_device(connection, pi->vnr);
5901 	if (!peer_device)
5902 		return -EIO;
5903 	device = peer_device->device;
5904 
5905 	sector = be64_to_cpu(p->sector);
5906 	size = be32_to_cpu(p->blksize);
5907 
5908 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5909 
5910 	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5911 		drbd_ov_out_of_sync_found(device, sector, size);
5912 	else
5913 		ov_out_of_sync_print(device);
5914 
5915 	if (!get_ldev(device))
5916 		return 0;
5917 
5918 	drbd_rs_complete_io(device, sector);
5919 	dec_rs_pending(device);
5920 
5921 	--device->ov_left;
5922 
5923 	/* let's advance progress step marks only for every other megabyte */
5924 	if ((device->ov_left & 0x200) == 0x200)
5925 		drbd_advance_rs_marks(device, device->ov_left);
5926 
5927 	if (device->ov_left == 0) {
5928 		dw = kmalloc(sizeof(*dw), GFP_NOIO);
5929 		if (dw) {
5930 			dw->w.cb = w_ov_finished;
5931 			dw->device = device;
5932 			drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5933 		} else {
5934 			drbd_err(device, "kmalloc(dw) failed.");
5935 			ov_out_of_sync_print(device);
5936 			drbd_resync_finished(device);
5937 		}
5938 	}
5939 	put_ldev(device);
5940 	return 0;
5941 }
5942 
/*
 * Handler for meta packets that are accepted but need no processing.
 * Both arguments are unused.  Always returns 0.
 */
static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
{
	/* nothing to do */
	return 0;
}
5947 
5948 struct meta_sock_cmd {
5949 	size_t pkt_size;
5950 	int (*fn)(struct drbd_connection *connection, struct packet_info *);
5951 };
5952 
5953 static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
5954 {
5955 	long t;
5956 	struct net_conf *nc;
5957 
5958 	rcu_read_lock();
5959 	nc = rcu_dereference(connection->net_conf);
5960 	t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5961 	rcu_read_unlock();
5962 
5963 	t *= HZ;
5964 	if (ping_timeout)
5965 		t /= 10;
5966 
5967 	connection->meta.socket->sk->sk_rcvtimeo = t;
5968 }
5969 
5970 static void set_ping_timeout(struct drbd_connection *connection)
5971 {
5972 	set_rcvtimeo(connection, 1);
5973 }
5974 
5975 static void set_idle_timeout(struct drbd_connection *connection)
5976 {
5977 	set_rcvtimeo(connection, 0);
5978 }
5979 
5980 static struct meta_sock_cmd ack_receiver_tbl[] = {
5981 	[P_PING]	    = { 0, got_Ping },
5982 	[P_PING_ACK]	    = { 0, got_PingAck },
5983 	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5984 	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5985 	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5986 	[P_SUPERSEDED]   = { sizeof(struct p_block_ack), got_BlockAck },
5987 	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
5988 	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
5989 	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5990 	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
5991 	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
5992 	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5993 	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5994 	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5995 	[P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5996 	[P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5997 	[P_RETRY_WRITE]	    = { sizeof(struct p_block_ack), got_BlockAck },
5998 };
5999 
6000 int drbd_ack_receiver(struct drbd_thread *thi)
6001 {
6002 	struct drbd_connection *connection = thi->connection;
6003 	struct meta_sock_cmd *cmd = NULL;
6004 	struct packet_info pi;
6005 	unsigned long pre_recv_jif;
6006 	int rv;
6007 	void *buf    = connection->meta.rbuf;
6008 	int received = 0;
6009 	unsigned int header_size = drbd_header_size(connection);
6010 	int expect   = header_size;
6011 	bool ping_timeout_active = false;
6012 	struct sched_param param = { .sched_priority = 2 };
6013 
6014 	rv = sched_setscheduler(current, SCHED_RR, &param);
6015 	if (rv < 0)
6016 		drbd_err(connection, "drbd_ack_receiver: ERROR set priority, ret=%d\n", rv);
6017 
6018 	while (get_t_state(thi) == RUNNING) {
6019 		drbd_thread_current_set_cpu(thi);
6020 
6021 		conn_reclaim_net_peer_reqs(connection);
6022 
6023 		if (test_and_clear_bit(SEND_PING, &connection->flags)) {
6024 			if (drbd_send_ping(connection)) {
6025 				drbd_err(connection, "drbd_send_ping has failed\n");
6026 				goto reconnect;
6027 			}
6028 			set_ping_timeout(connection);
6029 			ping_timeout_active = true;
6030 		}
6031 
6032 		pre_recv_jif = jiffies;
6033 		rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
6034 
6035 		/* Note:
6036 		 * -EINTR	 (on meta) we got a signal
6037 		 * -EAGAIN	 (on meta) rcvtimeo expired
6038 		 * -ECONNRESET	 other side closed the connection
6039 		 * -ERESTARTSYS  (on data) we got a signal
6040 		 * rv <  0	 other than above: unexpected error!
6041 		 * rv == expected: full header or command
6042 		 * rv <  expected: "woken" by signal during receive
6043 		 * rv == 0	 : "connection shut down by peer"
6044 		 */
6045 		if (likely(rv > 0)) {
6046 			received += rv;
6047 			buf	 += rv;
6048 		} else if (rv == 0) {
6049 			if (test_bit(DISCONNECT_SENT, &connection->flags)) {
6050 				long t;
6051 				rcu_read_lock();
6052 				t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
6053 				rcu_read_unlock();
6054 
6055 				t = wait_event_timeout(connection->ping_wait,
6056 						       connection->cstate < C_WF_REPORT_PARAMS,
6057 						       t);
6058 				if (t)
6059 					break;
6060 			}
6061 			drbd_err(connection, "meta connection shut down by peer.\n");
6062 			goto reconnect;
6063 		} else if (rv == -EAGAIN) {
6064 			/* If the data socket received something meanwhile,
6065 			 * that is good enough: peer is still alive. */
6066 			if (time_after(connection->last_received, pre_recv_jif))
6067 				continue;
6068 			if (ping_timeout_active) {
6069 				drbd_err(connection, "PingAck did not arrive in time.\n");
6070 				goto reconnect;
6071 			}
6072 			set_bit(SEND_PING, &connection->flags);
6073 			continue;
6074 		} else if (rv == -EINTR) {
6075 			/* maybe drbd_thread_stop(): the while condition will notice.
6076 			 * maybe woken for send_ping: we'll send a ping above,
6077 			 * and change the rcvtimeo */
6078 			flush_signals(current);
6079 			continue;
6080 		} else {
6081 			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
6082 			goto reconnect;
6083 		}
6084 
6085 		if (received == expect && cmd == NULL) {
6086 			if (decode_header(connection, connection->meta.rbuf, &pi))
6087 				goto reconnect;
6088 			cmd = &ack_receiver_tbl[pi.cmd];
6089 			if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
6090 				drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
6091 					 cmdname(pi.cmd), pi.cmd);
6092 				goto disconnect;
6093 			}
6094 			expect = header_size + cmd->pkt_size;
6095 			if (pi.size != expect - header_size) {
6096 				drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
6097 					pi.cmd, pi.size);
6098 				goto reconnect;
6099 			}
6100 		}
6101 		if (received == expect) {
6102 			bool err;
6103 
6104 			err = cmd->fn(connection, &pi);
6105 			if (err) {
6106 				drbd_err(connection, "%ps failed\n", cmd->fn);
6107 				goto reconnect;
6108 			}
6109 
6110 			connection->last_received = jiffies;
6111 
6112 			if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
6113 				set_idle_timeout(connection);
6114 				ping_timeout_active = false;
6115 			}
6116 
6117 			buf	 = connection->meta.rbuf;
6118 			received = 0;
6119 			expect	 = header_size;
6120 			cmd	 = NULL;
6121 		}
6122 	}
6123 
6124 	if (0) {
6125 reconnect:
6126 		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6127 		conn_md_sync(connection);
6128 	}
6129 	if (0) {
6130 disconnect:
6131 		conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
6132 	}
6133 
6134 	drbd_info(connection, "ack_receiver terminated\n");
6135 
6136 	return 0;
6137 }
6138 
6139 void drbd_send_acks_wf(struct work_struct *ws)
6140 {
6141 	struct drbd_peer_device *peer_device =
6142 		container_of(ws, struct drbd_peer_device, send_acks_work);
6143 	struct drbd_connection *connection = peer_device->connection;
6144 	struct drbd_device *device = peer_device->device;
6145 	struct net_conf *nc;
6146 	int tcp_cork, err;
6147 
6148 	rcu_read_lock();
6149 	nc = rcu_dereference(connection->net_conf);
6150 	tcp_cork = nc->tcp_cork;
6151 	rcu_read_unlock();
6152 
6153 	if (tcp_cork)
6154 		drbd_tcp_cork(connection->meta.socket);
6155 
6156 	err = drbd_finish_peer_reqs(device);
6157 	kref_put(&device->kref, drbd_destroy_device);
6158 	/* get is in drbd_endio_write_sec_final(). That is necessary to keep the
6159 	   struct work_struct send_acks_work alive, which is in the peer_device object */
6160 
6161 	if (err) {
6162 		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6163 		return;
6164 	}
6165 
6166 	if (tcp_cork)
6167 		drbd_tcp_uncork(connection->meta.socket);
6168 
6169 	return;
6170 }
6171