1 /*
2    drbd_receiver.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24 
25 
26 #include <linux/module.h>
27 
28 #include <linux/uaccess.h>
29 #include <net/sock.h>
30 
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <uapi/linux/sched/types.h>
40 #include <linux/sched/signal.h>
41 #include <linux/pkt_sched.h>
42 #define __KERNEL_SYSCALLS__
43 #include <linux/unistd.h>
44 #include <linux/vmalloc.h>
45 #include <linux/random.h>
46 #include <linux/string.h>
47 #include <linux/scatterlist.h>
48 #include "drbd_int.h"
49 #include "drbd_protocol.h"
50 #include "drbd_req.h"
51 #include "drbd_vli.h"
52 
/* Bitwise OR of the DRBD_FF_* feature flags TRIM, THIN_RESYNC, WSAME and
 * WZEROES.
 * NOTE(review): presumably the feature set exchanged by drbd_do_features();
 * its definition is outside this view -- confirm. */
#define PRO_FEATURES (DRBD_FF_TRIM|DRBD_FF_THIN_RESYNC|DRBD_FF_WSAME|DRBD_FF_WZEROES)
54 
/* Decoded header information for one received packet.
 * decode_header() takes a pointer to this structure; receive_first_packet()
 * returns the cmd member after a successful decode. */
struct packet_info {
	/* packet type, as returned by receive_first_packet() */
	enum drbd_packet cmd;
	/* NOTE(review): presumably the payload size in bytes -- confirm */
	unsigned int size;
	/* NOTE(review): presumably the volume number -- confirm */
	unsigned int vnr;
	/* NOTE(review): presumably points into the receive buffer -- confirm */
	void *data;
};
61 
/* Result codes returned by drbd_may_finish_epoch().
 * NOTE(review): the names suggest the epoch object is still in use, was
 * freed, or was reused, respectively -- confirm at the definition. */
enum finish_epoch {
	FE_STILL_LIVE,
	FE_DESTROYED,
	FE_RECYCLED,
};
67 
/* Forward declarations of static functions that are used before their
 * definition in this translation unit. */
static int drbd_do_features(struct drbd_connection *connection);
static int drbd_do_auth(struct drbd_connection *connection);
static int drbd_disconnected(struct drbd_peer_device *);
static void conn_wait_active_ee_empty(struct drbd_connection *connection);
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_work *, int);
74 
75 
/* Allocation flags passed to alloc_page() in __drbd_alloc_pages(): only
 * __GFP_HIGHMEM and __GFP_NOWARN are set, because that allocation must not
 * cause arbitrary write-out (see the comment at the call site). */
#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
77 
/*
 * Some helper functions to deal with singly linked page lists,
 * page->private being our "next" pointer.
 */
82 
83 /* If at least n pages are linked at head, get n pages off.
84  * Otherwise, don't modify head, and return NULL.
85  * Locking is the responsibility of the caller.
86  */
87 static struct page *page_chain_del(struct page **head, int n)
88 {
89 	struct page *page;
90 	struct page *tmp;
91 
92 	BUG_ON(!n);
93 	BUG_ON(!head);
94 
95 	page = *head;
96 
97 	if (!page)
98 		return NULL;
99 
100 	while (page) {
101 		tmp = page_chain_next(page);
102 		if (--n == 0)
103 			break; /* found sufficient pages */
104 		if (tmp == NULL)
105 			/* insufficient pages, don't use any of them. */
106 			return NULL;
107 		page = tmp;
108 	}
109 
110 	/* add end of list marker for the returned list */
111 	set_page_private(page, 0);
112 	/* actual return value, and adjustment of head */
113 	page = *head;
114 	*head = tmp;
115 	return page;
116 }
117 
118 /* may be used outside of locks to find the tail of a (usually short)
119  * "private" page chain, before adding it back to a global chain head
120  * with page_chain_add() under a spinlock. */
121 static struct page *page_chain_tail(struct page *page, int *len)
122 {
123 	struct page *tmp;
124 	int i = 1;
125 	while ((tmp = page_chain_next(page)))
126 		++i, page = tmp;
127 	if (len)
128 		*len = i;
129 	return page;
130 }
131 
132 static int page_chain_free(struct page *page)
133 {
134 	struct page *tmp;
135 	int i = 0;
136 	page_chain_for_each_safe(page, tmp) {
137 		put_page(page);
138 		++i;
139 	}
140 	return i;
141 }
142 
143 static void page_chain_add(struct page **head,
144 		struct page *chain_first, struct page *chain_last)
145 {
146 #if 1
147 	struct page *tmp;
148 	tmp = page_chain_tail(chain_first, NULL);
149 	BUG_ON(tmp != chain_last);
150 #endif
151 
152 	/* add chain to head */
153 	set_page_private(chain_last, (unsigned long)*head);
154 	*head = chain_first;
155 }
156 
157 static struct page *__drbd_alloc_pages(struct drbd_device *device,
158 				       unsigned int number)
159 {
160 	struct page *page = NULL;
161 	struct page *tmp = NULL;
162 	unsigned int i = 0;
163 
164 	/* Yes, testing drbd_pp_vacant outside the lock is racy.
165 	 * So what. It saves a spin_lock. */
166 	if (drbd_pp_vacant >= number) {
167 		spin_lock(&drbd_pp_lock);
168 		page = page_chain_del(&drbd_pp_pool, number);
169 		if (page)
170 			drbd_pp_vacant -= number;
171 		spin_unlock(&drbd_pp_lock);
172 		if (page)
173 			return page;
174 	}
175 
176 	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
177 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
178 	 * which in turn might block on the other node at this very place.  */
179 	for (i = 0; i < number; i++) {
180 		tmp = alloc_page(GFP_TRY);
181 		if (!tmp)
182 			break;
183 		set_page_private(tmp, (unsigned long)page);
184 		page = tmp;
185 	}
186 
187 	if (i == number)
188 		return page;
189 
190 	/* Not enough pages immediately available this time.
191 	 * No need to jump around here, drbd_alloc_pages will retry this
192 	 * function "soon". */
193 	if (page) {
194 		tmp = page_chain_tail(page, NULL);
195 		spin_lock(&drbd_pp_lock);
196 		page_chain_add(&drbd_pp_pool, page, tmp);
197 		drbd_pp_vacant += i;
198 		spin_unlock(&drbd_pp_lock);
199 	}
200 	return NULL;
201 }
202 
203 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
204 					   struct list_head *to_be_freed)
205 {
206 	struct drbd_peer_request *peer_req, *tmp;
207 
208 	/* The EEs are always appended to the end of the list. Since
209 	   they are sent in order over the wire, they have to finish
210 	   in order. As soon as we see the first not finished we can
211 	   stop to examine the list... */
212 
213 	list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
214 		if (drbd_peer_req_has_active_page(peer_req))
215 			break;
216 		list_move(&peer_req->w.list, to_be_freed);
217 	}
218 }
219 
220 static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
221 {
222 	LIST_HEAD(reclaimed);
223 	struct drbd_peer_request *peer_req, *t;
224 
225 	spin_lock_irq(&device->resource->req_lock);
226 	reclaim_finished_net_peer_reqs(device, &reclaimed);
227 	spin_unlock_irq(&device->resource->req_lock);
228 	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
229 		drbd_free_net_peer_req(device, peer_req);
230 }
231 
232 static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
233 {
234 	struct drbd_peer_device *peer_device;
235 	int vnr;
236 
237 	rcu_read_lock();
238 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
239 		struct drbd_device *device = peer_device->device;
240 		if (!atomic_read(&device->pp_in_use_by_net))
241 			continue;
242 
243 		kref_get(&device->kref);
244 		rcu_read_unlock();
245 		drbd_reclaim_net_peer_reqs(device);
246 		kref_put(&device->kref, drbd_destroy_device);
247 		rcu_read_lock();
248 	}
249 	rcu_read_unlock();
250 }
251 
/**
 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @peer_device:	DRBD peer device; its device and connection are used.
 * @number:		number of pages requested
 * @retry:		whether to retry, if not enough pages are available right now
 *
 * Tries to allocate number pages, first from our own page pool, then from
 * the kernel.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * If this allocation would exceed the max_buffers setting, we throttle
 * allocation (schedule_timeout) to give the system some room to breathe.
 *
 * We do not use max-buffers as hard limit, because it could lead to
 * congestion and further to a distributed deadlock during online-verify or
 * (checksum based) resync, if the max-buffers, socket buffer sizes and
 * resync-rate settings are mis-configured.
 *
 * On success, @number is added to device->pp_in_use.
 *
 * Returns a page chain linked via page->private, or NULL if no pages could
 * be obtained and either @retry is false or a signal is pending.
 */
struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
			      bool retry)
{
	struct drbd_device *device = peer_device->device;
	struct page *page = NULL;
	struct net_conf *nc;
	DEFINE_WAIT(wait);
	unsigned int mxb;

	/* Snapshot max_buffers; fall back to a large limit without net_conf. */
	rcu_read_lock();
	nc = rcu_dereference(peer_device->connection->net_conf);
	mxb = nc ? nc->max_buffers : 1000000;
	rcu_read_unlock();

	if (atomic_read(&device->pp_in_use) < mxb)
		page = __drbd_alloc_pages(device, number);

	/* Try to keep the fast path fast, but occasionally we need
	 * to reclaim the pages we lent to the network stack. */
	if (page && atomic_read(&device->pp_in_use_by_net) > 512)
		drbd_reclaim_net_peer_reqs(device);

	while (page == NULL) {
		/* Get on the wait queue before re-checking, so that a
		 * wake_up(&drbd_pp_wait) issued in between is not lost. */
		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

		drbd_reclaim_net_peer_reqs(device);

		if (atomic_read(&device->pp_in_use) < mxb) {
			page = __drbd_alloc_pages(device, number);
			if (page)
				break;
		}

		if (!retry)
			break;

		if (signal_pending(current)) {
			drbd_warn(device, "drbd_alloc_pages interrupted!\n");
			break;
		}

		/* A return value of 0 means the full timeout elapsed without
		 * a wake-up: stop enforcing the max_buffers limit. */
		if (schedule_timeout(HZ/10) == 0)
			mxb = UINT_MAX;
	}
	finish_wait(&drbd_pp_wait, &wait);

	if (page)
		atomic_add(number, &device->pp_in_use);
	return page;
}
322 
323 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
324  * Is also used from inside an other spin_lock_irq(&resource->req_lock);
325  * Either links the page chain back to the global pool,
326  * or returns all pages to the system. */
327 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
328 {
329 	atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
330 	int i;
331 
332 	if (page == NULL)
333 		return;
334 
335 	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * drbd_minor_count)
336 		i = page_chain_free(page);
337 	else {
338 		struct page *tmp;
339 		tmp = page_chain_tail(page, &i);
340 		spin_lock(&drbd_pp_lock);
341 		page_chain_add(&drbd_pp_pool, page, tmp);
342 		drbd_pp_vacant += i;
343 		spin_unlock(&drbd_pp_lock);
344 	}
345 	i = atomic_sub_return(i, a);
346 	if (i < 0)
347 		drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
348 			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
349 	wake_up(&drbd_pp_wait);
350 }
351 
352 /*
353 You need to hold the req_lock:
354  _drbd_wait_ee_list_empty()
355 
356 You must not have the req_lock:
357  drbd_free_peer_req()
358  drbd_alloc_peer_req()
359  drbd_free_peer_reqs()
360  drbd_ee_fix_bhs()
361  drbd_finish_peer_reqs()
362  drbd_clear_done_ee()
363  drbd_wait_ee_list_empty()
364 */
365 
366 /* normal: payload_size == request size (bi_size)
367  * w_same: payload_size == logical_block_size
368  * trim: payload_size == 0 */
369 struct drbd_peer_request *
370 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
371 		    unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local)
372 {
373 	struct drbd_device *device = peer_device->device;
374 	struct drbd_peer_request *peer_req;
375 	struct page *page = NULL;
376 	unsigned nr_pages = (payload_size + PAGE_SIZE -1) >> PAGE_SHIFT;
377 
378 	if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
379 		return NULL;
380 
381 	peer_req = mempool_alloc(&drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
382 	if (!peer_req) {
383 		if (!(gfp_mask & __GFP_NOWARN))
384 			drbd_err(device, "%s: allocation failed\n", __func__);
385 		return NULL;
386 	}
387 
388 	if (nr_pages) {
389 		page = drbd_alloc_pages(peer_device, nr_pages,
390 					gfpflags_allow_blocking(gfp_mask));
391 		if (!page)
392 			goto fail;
393 	}
394 
395 	memset(peer_req, 0, sizeof(*peer_req));
396 	INIT_LIST_HEAD(&peer_req->w.list);
397 	drbd_clear_interval(&peer_req->i);
398 	peer_req->i.size = request_size;
399 	peer_req->i.sector = sector;
400 	peer_req->submit_jif = jiffies;
401 	peer_req->peer_device = peer_device;
402 	peer_req->pages = page;
403 	/*
404 	 * The block_id is opaque to the receiver.  It is not endianness
405 	 * converted, and sent back to the sender unchanged.
406 	 */
407 	peer_req->block_id = id;
408 
409 	return peer_req;
410 
411  fail:
412 	mempool_free(peer_req, &drbd_ee_mempool);
413 	return NULL;
414 }
415 
416 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
417 		       int is_net)
418 {
419 	might_sleep();
420 	if (peer_req->flags & EE_HAS_DIGEST)
421 		kfree(peer_req->digest);
422 	drbd_free_pages(device, peer_req->pages, is_net);
423 	D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
424 	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
425 	if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
426 		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
427 		drbd_al_complete_io(device, &peer_req->i);
428 	}
429 	mempool_free(peer_req, &drbd_ee_mempool);
430 }
431 
432 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
433 {
434 	LIST_HEAD(work_list);
435 	struct drbd_peer_request *peer_req, *t;
436 	int count = 0;
437 	int is_net = list == &device->net_ee;
438 
439 	spin_lock_irq(&device->resource->req_lock);
440 	list_splice_init(list, &work_list);
441 	spin_unlock_irq(&device->resource->req_lock);
442 
443 	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
444 		__drbd_free_peer_req(device, peer_req, is_net);
445 		count++;
446 	}
447 	return count;
448 }
449 
450 /*
451  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
452  */
453 static int drbd_finish_peer_reqs(struct drbd_device *device)
454 {
455 	LIST_HEAD(work_list);
456 	LIST_HEAD(reclaimed);
457 	struct drbd_peer_request *peer_req, *t;
458 	int err = 0;
459 
460 	spin_lock_irq(&device->resource->req_lock);
461 	reclaim_finished_net_peer_reqs(device, &reclaimed);
462 	list_splice_init(&device->done_ee, &work_list);
463 	spin_unlock_irq(&device->resource->req_lock);
464 
465 	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
466 		drbd_free_net_peer_req(device, peer_req);
467 
468 	/* possible callbacks here:
469 	 * e_end_block, and e_end_resync_block, e_send_superseded.
470 	 * all ignore the last argument.
471 	 */
472 	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
473 		int err2;
474 
475 		/* list_del not necessary, next/prev members not touched */
476 		err2 = peer_req->w.cb(&peer_req->w, !!err);
477 		if (!err)
478 			err = err2;
479 		drbd_free_peer_req(device, peer_req);
480 	}
481 	wake_up(&device->ee_wait);
482 
483 	return err;
484 }
485 
/* Sleep until the list @head is empty.
 * Must be called with req_lock held (spin_lock_irq); the lock is dropped
 * while sleeping and held again when this function returns. */
static void _drbd_wait_ee_list_empty(struct drbd_device *device,
				     struct list_head *head)
{
	DEFINE_WAIT(wait);

	/* avoids spin_lock/unlock
	 * and calling prepare_to_wait in the fast path */
	while (!list_empty(head)) {
		/* Queue on ee_wait while still holding the lock, then drop
		 * the lock before sleeping. */
		prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
		spin_unlock_irq(&device->resource->req_lock);
		io_schedule();
		finish_wait(&device->ee_wait, &wait);
		/* re-take the lock before re-checking the list */
		spin_lock_irq(&device->resource->req_lock);
	}
}
501 
/* Locking wrapper around _drbd_wait_ee_list_empty(): takes req_lock, waits
 * until @head is empty, and releases the lock again.
 * Must be called without req_lock held. */
static void drbd_wait_ee_list_empty(struct drbd_device *device,
				    struct list_head *head)
{
	spin_lock_irq(&device->resource->req_lock);
	_drbd_wait_ee_list_empty(device, head);
	spin_unlock_irq(&device->resource->req_lock);
}
509 
510 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
511 {
512 	struct kvec iov = {
513 		.iov_base = buf,
514 		.iov_len = size,
515 	};
516 	struct msghdr msg = {
517 		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
518 	};
519 	iov_iter_kvec(&msg.msg_iter, READ, &iov, 1, size);
520 	return sock_recvmsg(sock, &msg, msg.msg_flags);
521 }
522 
523 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
524 {
525 	int rv;
526 
527 	rv = drbd_recv_short(connection->data.socket, buf, size, 0);
528 
529 	if (rv < 0) {
530 		if (rv == -ECONNRESET)
531 			drbd_info(connection, "sock was reset by peer\n");
532 		else if (rv != -ERESTARTSYS)
533 			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
534 	} else if (rv == 0) {
535 		if (test_bit(DISCONNECT_SENT, &connection->flags)) {
536 			long t;
537 			rcu_read_lock();
538 			t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
539 			rcu_read_unlock();
540 
541 			t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
542 
543 			if (t)
544 				goto out;
545 		}
546 		drbd_info(connection, "sock was shut down by peer\n");
547 	}
548 
549 	if (rv != size)
550 		conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
551 
552 out:
553 	return rv;
554 }
555 
556 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
557 {
558 	int err;
559 
560 	err = drbd_recv(connection, buf, size);
561 	if (err != size) {
562 		if (err >= 0)
563 			err = -EIO;
564 	} else
565 		err = 0;
566 	return err;
567 }
568 
569 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
570 {
571 	int err;
572 
573 	err = drbd_recv_all(connection, buf, size);
574 	if (err && !signal_pending(current))
575 		drbd_warn(connection, "short read (expected size %d)\n", (int)size);
576 	return err;
577 }
578 
579 /* quoting tcp(7):
580  *   On individual connections, the socket buffer size must be set prior to the
581  *   listen(2) or connect(2) calls in order to have it take effect.
582  * This is our wrapper to do so.
583  */
584 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
585 		unsigned int rcv)
586 {
587 	/* open coded SO_SNDBUF, SO_RCVBUF */
588 	if (snd) {
589 		sock->sk->sk_sndbuf = snd;
590 		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
591 	}
592 	if (rcv) {
593 		sock->sk->sk_rcvbuf = rcv;
594 		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
595 	}
596 }
597 
/* Actively try to open one TCP connection to the peer of @connection.
 *
 * The socket is bound to the configured local address (with port 0) and
 * then connected to the configured peer address, using connect_int seconds
 * as send/receive timeout.
 *
 * Returns the connected socket, or NULL on failure (including the case
 * that there is no net_conf).  Failures that merely mean "peer not
 * reachable right now" or "interrupted" are silent; any other error is
 * logged.  C_DISCONNECTING is requested only for such other errors that
 * happen before the connect() step.
 */
static struct socket *drbd_try_connect(struct drbd_connection *connection)
{
	const char *what;
	struct socket *sock;
	struct sockaddr_in6 src_in6;
	struct sockaddr_in6 peer_in6;
	struct net_conf *nc;
	int err, peer_addr_len, my_addr_len;
	int sndbuf_size, rcvbuf_size, connect_int;
	int disconnect_on_error = 1;

	/* snapshot the settings we need, so we do not hold the RCU read
	 * lock across the socket calls */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return NULL;
	}
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	connect_int = nc->connect_int;
	rcu_read_unlock();

	my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
	memcpy(&src_in6, &connection->my_addr, my_addr_len);

	if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	/* bound the copy by the size of its own destination buffer */
	peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(peer_in6));
	memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);

	what = "sock_create_kern";
	err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo = connect_int * HZ;
	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);

	/* explicitly bind to the configured IP as source IP
	 *  for the outgoing connections.
	 *  This is needed for multihomed hosts and to be
	 *  able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 *  a free one dynamically.
	 */
	what = "bind before connect";
	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	what = "connect";
	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);

out:
	if (err < 0) {
		if (sock) {
			sock_release(sock);
			sock = NULL;
		}
		switch (-err) {
			/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
			/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN:    case EHOSTUNREACH:
			disconnect_on_error = 0;
			break;
		default:
			drbd_err(connection, "%s failed, err = %d\n", what, err);
		}
		if (disconnect_on_error)
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
	}

	return sock;
}
685 
/* State shared between the code that waits for an incoming connection and
 * the sk_state_change callback drbd_incoming_connection(). */
struct accept_wait_data {
	/* connection this listen socket belongs to (set by conn_connect()) */
	struct drbd_connection *connection;
	/* listening socket, set by prepare_listen_socket() */
	struct socket *s_listen;
	/* completed by drbd_incoming_connection() for a socket in
	 * TCP_ESTABLISHED; waited on in drbd_wait_for_connect() */
	struct completion door_bell;
	/* callback that was installed before ours; chained to and restored */
	void (*original_sk_state_change)(struct sock *sk);

};
693 
/* sk_state_change callback installed by prepare_listen_socket().
 * Rings the door bell if @sk is in TCP_ESTABLISHED, then chains to the
 * callback that was installed before. */
static void drbd_incoming_connection(struct sock *sk)
{
	struct accept_wait_data *ad = sk->sk_user_data;
	void (*state_change)(struct sock *sk);

	/* Fetch the original callback before completing.
	 * NOTE(review): presumably because *ad lives on conn_connect()'s
	 * stack and must not be touched once the waiter may have been
	 * woken -- confirm. */
	state_change = ad->original_sk_state_change;
	if (sk->sk_state == TCP_ESTABLISHED)
		complete(&ad->door_bell);
	state_change(sk);
}
704 
705 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
706 {
707 	int err, sndbuf_size, rcvbuf_size, my_addr_len;
708 	struct sockaddr_in6 my_addr;
709 	struct socket *s_listen;
710 	struct net_conf *nc;
711 	const char *what;
712 
713 	rcu_read_lock();
714 	nc = rcu_dereference(connection->net_conf);
715 	if (!nc) {
716 		rcu_read_unlock();
717 		return -EIO;
718 	}
719 	sndbuf_size = nc->sndbuf_size;
720 	rcvbuf_size = nc->rcvbuf_size;
721 	rcu_read_unlock();
722 
723 	my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
724 	memcpy(&my_addr, &connection->my_addr, my_addr_len);
725 
726 	what = "sock_create_kern";
727 	err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
728 			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
729 	if (err) {
730 		s_listen = NULL;
731 		goto out;
732 	}
733 
734 	s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
735 	drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
736 
737 	what = "bind before listen";
738 	err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
739 	if (err < 0)
740 		goto out;
741 
742 	ad->s_listen = s_listen;
743 	write_lock_bh(&s_listen->sk->sk_callback_lock);
744 	ad->original_sk_state_change = s_listen->sk->sk_state_change;
745 	s_listen->sk->sk_state_change = drbd_incoming_connection;
746 	s_listen->sk->sk_user_data = ad;
747 	write_unlock_bh(&s_listen->sk->sk_callback_lock);
748 
749 	what = "listen";
750 	err = s_listen->ops->listen(s_listen, 5);
751 	if (err < 0)
752 		goto out;
753 
754 	return 0;
755 out:
756 	if (s_listen)
757 		sock_release(s_listen);
758 	if (err < 0) {
759 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
760 			drbd_err(connection, "%s failed, err = %d\n", what, err);
761 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
762 		}
763 	}
764 
765 	return -EIO;
766 }
767 
/* Undo the callback redirection on @sk: restore the sk_state_change
 * callback saved in @ad and clear sk_user_data, under sk_callback_lock.
 * Called by drbd_wait_for_connect() on the freshly accepted socket. */
static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
{
	write_lock_bh(&sk->sk_callback_lock);
	sk->sk_state_change = ad->original_sk_state_change;
	sk->sk_user_data = NULL;
	write_unlock_bh(&sk->sk_callback_lock);
}
775 
776 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
777 {
778 	int timeo, connect_int, err = 0;
779 	struct socket *s_estab = NULL;
780 	struct net_conf *nc;
781 
782 	rcu_read_lock();
783 	nc = rcu_dereference(connection->net_conf);
784 	if (!nc) {
785 		rcu_read_unlock();
786 		return NULL;
787 	}
788 	connect_int = nc->connect_int;
789 	rcu_read_unlock();
790 
791 	timeo = connect_int * HZ;
792 	/* 28.5% random jitter */
793 	timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
794 
795 	err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
796 	if (err <= 0)
797 		return NULL;
798 
799 	err = kernel_accept(ad->s_listen, &s_estab, 0);
800 	if (err < 0) {
801 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
802 			drbd_err(connection, "accept failed, err = %d\n", err);
803 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
804 		}
805 	}
806 
807 	if (s_estab)
808 		unregister_state_change(s_estab->sk, ad);
809 
810 	return s_estab;
811 }
812 
/* Forward declaration; receive_first_packet() below uses it to decode the
 * received header into a struct packet_info. */
static int decode_header(struct drbd_connection *, void *, struct packet_info *);
814 
815 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
816 			     enum drbd_packet cmd)
817 {
818 	if (!conn_prepare_command(connection, sock))
819 		return -EIO;
820 	return conn_send_command(connection, sock, cmd, 0, NULL, 0);
821 }
822 
823 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
824 {
825 	unsigned int header_size = drbd_header_size(connection);
826 	struct packet_info pi;
827 	struct net_conf *nc;
828 	int err;
829 
830 	rcu_read_lock();
831 	nc = rcu_dereference(connection->net_conf);
832 	if (!nc) {
833 		rcu_read_unlock();
834 		return -EIO;
835 	}
836 	sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
837 	rcu_read_unlock();
838 
839 	err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
840 	if (err != header_size) {
841 		if (err >= 0)
842 			err = -EIO;
843 		return err;
844 	}
845 	err = decode_header(connection, connection->data.rbuf, &pi);
846 	if (err)
847 		return err;
848 	return pi.cmd;
849 }
850 
851 /**
852  * drbd_socket_okay() - Free the socket if its connection is not okay
853  * @sock:	pointer to the pointer to the socket.
854  */
855 static bool drbd_socket_okay(struct socket **sock)
856 {
857 	int rr;
858 	char tb[4];
859 
860 	if (!*sock)
861 		return false;
862 
863 	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
864 
865 	if (rr > 0 || rr == -EAGAIN) {
866 		return true;
867 	} else {
868 		sock_release(*sock);
869 		*sock = NULL;
870 		return false;
871 	}
872 }
873 
874 static bool connection_established(struct drbd_connection *connection,
875 				   struct socket **sock1,
876 				   struct socket **sock2)
877 {
878 	struct net_conf *nc;
879 	int timeout;
880 	bool ok;
881 
882 	if (!*sock1 || !*sock2)
883 		return false;
884 
885 	rcu_read_lock();
886 	nc = rcu_dereference(connection->net_conf);
887 	timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
888 	rcu_read_unlock();
889 	schedule_timeout_interruptible(timeout);
890 
891 	ok = drbd_socket_okay(sock1);
892 	ok = drbd_socket_okay(sock2) && ok;
893 
894 	return ok;
895 }
896 
897 /* Gets called if a connection is established, or if a new minor gets created
898    in a connection */
899 int drbd_connected(struct drbd_peer_device *peer_device)
900 {
901 	struct drbd_device *device = peer_device->device;
902 	int err;
903 
904 	atomic_set(&device->packet_seq, 0);
905 	device->peer_seq = 0;
906 
907 	device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
908 		&peer_device->connection->cstate_mutex :
909 		&device->own_state_mutex;
910 
911 	err = drbd_send_sync_param(peer_device);
912 	if (!err)
913 		err = drbd_send_sizes(peer_device, 0, 0);
914 	if (!err)
915 		err = drbd_send_uuids(peer_device);
916 	if (!err)
917 		err = drbd_send_current_state(peer_device);
918 	clear_bit(USE_DEGR_WFC_T, &device->flags);
919 	clear_bit(RESIZE_PENDING, &device->flags);
920 	atomic_set(&device->ap_in_flight, 0);
921 	mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
922 	return err;
923 }
924 
925 /*
926  * return values:
927  *   1 yes, we have a valid connection
928  *   0 oops, did not work out, please try again
929  *  -1 peer talks different language,
930  *     no point in trying again, please go standalone.
931  *  -2 We do not have a network config...
932  */
933 static int conn_connect(struct drbd_connection *connection)
934 {
935 	struct drbd_socket sock, msock;
936 	struct drbd_peer_device *peer_device;
937 	struct net_conf *nc;
938 	int vnr, timeout, h;
939 	bool discard_my_data, ok;
940 	enum drbd_state_rv rv;
941 	struct accept_wait_data ad = {
942 		.connection = connection,
943 		.door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
944 	};
945 
946 	clear_bit(DISCONNECT_SENT, &connection->flags);
947 	if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
948 		return -2;
949 
950 	mutex_init(&sock.mutex);
951 	sock.sbuf = connection->data.sbuf;
952 	sock.rbuf = connection->data.rbuf;
953 	sock.socket = NULL;
954 	mutex_init(&msock.mutex);
955 	msock.sbuf = connection->meta.sbuf;
956 	msock.rbuf = connection->meta.rbuf;
957 	msock.socket = NULL;
958 
959 	/* Assume that the peer only understands protocol 80 until we know better.  */
960 	connection->agreed_pro_version = 80;
961 
962 	if (prepare_listen_socket(connection, &ad))
963 		return 0;
964 
965 	do {
966 		struct socket *s;
967 
968 		s = drbd_try_connect(connection);
969 		if (s) {
970 			if (!sock.socket) {
971 				sock.socket = s;
972 				send_first_packet(connection, &sock, P_INITIAL_DATA);
973 			} else if (!msock.socket) {
974 				clear_bit(RESOLVE_CONFLICTS, &connection->flags);
975 				msock.socket = s;
976 				send_first_packet(connection, &msock, P_INITIAL_META);
977 			} else {
978 				drbd_err(connection, "Logic error in conn_connect()\n");
979 				goto out_release_sockets;
980 			}
981 		}
982 
983 		if (connection_established(connection, &sock.socket, &msock.socket))
984 			break;
985 
986 retry:
987 		s = drbd_wait_for_connect(connection, &ad);
988 		if (s) {
989 			int fp = receive_first_packet(connection, s);
990 			drbd_socket_okay(&sock.socket);
991 			drbd_socket_okay(&msock.socket);
992 			switch (fp) {
993 			case P_INITIAL_DATA:
994 				if (sock.socket) {
995 					drbd_warn(connection, "initial packet S crossed\n");
996 					sock_release(sock.socket);
997 					sock.socket = s;
998 					goto randomize;
999 				}
1000 				sock.socket = s;
1001 				break;
1002 			case P_INITIAL_META:
1003 				set_bit(RESOLVE_CONFLICTS, &connection->flags);
1004 				if (msock.socket) {
1005 					drbd_warn(connection, "initial packet M crossed\n");
1006 					sock_release(msock.socket);
1007 					msock.socket = s;
1008 					goto randomize;
1009 				}
1010 				msock.socket = s;
1011 				break;
1012 			default:
1013 				drbd_warn(connection, "Error receiving initial packet\n");
1014 				sock_release(s);
1015 randomize:
1016 				if (prandom_u32() & 1)
1017 					goto retry;
1018 			}
1019 		}
1020 
1021 		if (connection->cstate <= C_DISCONNECTING)
1022 			goto out_release_sockets;
1023 		if (signal_pending(current)) {
1024 			flush_signals(current);
1025 			smp_rmb();
1026 			if (get_t_state(&connection->receiver) == EXITING)
1027 				goto out_release_sockets;
1028 		}
1029 
1030 		ok = connection_established(connection, &sock.socket, &msock.socket);
1031 	} while (!ok);
1032 
1033 	if (ad.s_listen)
1034 		sock_release(ad.s_listen);
1035 
1036 	sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1037 	msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1038 
1039 	sock.socket->sk->sk_allocation = GFP_NOIO;
1040 	msock.socket->sk->sk_allocation = GFP_NOIO;
1041 
1042 	sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1043 	msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1044 
1045 	/* NOT YET ...
1046 	 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1047 	 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1048 	 * first set it to the P_CONNECTION_FEATURES timeout,
1049 	 * which we set to 4x the configured ping_timeout. */
1050 	rcu_read_lock();
1051 	nc = rcu_dereference(connection->net_conf);
1052 
1053 	sock.socket->sk->sk_sndtimeo =
1054 	sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1055 
1056 	msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1057 	timeout = nc->timeout * HZ / 10;
1058 	discard_my_data = nc->discard_my_data;
1059 	rcu_read_unlock();
1060 
1061 	msock.socket->sk->sk_sndtimeo = timeout;
1062 
1063 	/* we don't want delays.
1064 	 * we use TCP_CORK where appropriate, though */
1065 	drbd_tcp_nodelay(sock.socket);
1066 	drbd_tcp_nodelay(msock.socket);
1067 
1068 	connection->data.socket = sock.socket;
1069 	connection->meta.socket = msock.socket;
1070 	connection->last_received = jiffies;
1071 
1072 	h = drbd_do_features(connection);
1073 	if (h <= 0)
1074 		return h;
1075 
1076 	if (connection->cram_hmac_tfm) {
1077 		/* drbd_request_state(device, NS(conn, WFAuth)); */
1078 		switch (drbd_do_auth(connection)) {
1079 		case -1:
1080 			drbd_err(connection, "Authentication of peer failed\n");
1081 			return -1;
1082 		case 0:
1083 			drbd_err(connection, "Authentication of peer failed, trying again.\n");
1084 			return 0;
1085 		}
1086 	}
1087 
1088 	connection->data.socket->sk->sk_sndtimeo = timeout;
1089 	connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1090 
1091 	if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1092 		return -1;
1093 
1094 	/* Prevent a race between resync-handshake and
1095 	 * being promoted to Primary.
1096 	 *
1097 	 * Grab and release the state mutex, so we know that any current
1098 	 * drbd_set_role() is finished, and any incoming drbd_set_role
1099 	 * will see the STATE_SENT flag, and wait for it to be cleared.
1100 	 */
1101 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1102 		mutex_lock(peer_device->device->state_mutex);
1103 
1104 	/* avoid a race with conn_request_state( C_DISCONNECTING ) */
1105 	spin_lock_irq(&connection->resource->req_lock);
1106 	set_bit(STATE_SENT, &connection->flags);
1107 	spin_unlock_irq(&connection->resource->req_lock);
1108 
1109 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1110 		mutex_unlock(peer_device->device->state_mutex);
1111 
1112 	rcu_read_lock();
1113 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1114 		struct drbd_device *device = peer_device->device;
1115 		kref_get(&device->kref);
1116 		rcu_read_unlock();
1117 
1118 		if (discard_my_data)
1119 			set_bit(DISCARD_MY_DATA, &device->flags);
1120 		else
1121 			clear_bit(DISCARD_MY_DATA, &device->flags);
1122 
1123 		drbd_connected(peer_device);
1124 		kref_put(&device->kref, drbd_destroy_device);
1125 		rcu_read_lock();
1126 	}
1127 	rcu_read_unlock();
1128 
1129 	rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1130 	if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1131 		clear_bit(STATE_SENT, &connection->flags);
1132 		return 0;
1133 	}
1134 
1135 	drbd_thread_start(&connection->ack_receiver);
1136 	/* opencoded create_singlethread_workqueue(),
1137 	 * to be able to use format string arguments */
1138 	connection->ack_sender =
1139 		alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
1140 	if (!connection->ack_sender) {
1141 		drbd_err(connection, "Failed to create workqueue ack_sender\n");
1142 		return 0;
1143 	}
1144 
1145 	mutex_lock(&connection->resource->conf_update);
1146 	/* The discard_my_data flag is a single-shot modifier to the next
1147 	 * connection attempt, the handshake of which is now well underway.
1148 	 * No need for rcu style copying of the whole struct
1149 	 * just to clear a single value. */
1150 	connection->net_conf->discard_my_data = 0;
1151 	mutex_unlock(&connection->resource->conf_update);
1152 
1153 	return h;
1154 
1155 out_release_sockets:
1156 	if (ad.s_listen)
1157 		sock_release(ad.s_listen);
1158 	if (sock.socket)
1159 		sock_release(sock.socket);
1160 	if (msock.socket)
1161 		sock_release(msock.socket);
1162 	return -1;
1163 }
1164 
1165 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1166 {
1167 	unsigned int header_size = drbd_header_size(connection);
1168 
1169 	if (header_size == sizeof(struct p_header100) &&
1170 	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1171 		struct p_header100 *h = header;
1172 		if (h->pad != 0) {
1173 			drbd_err(connection, "Header padding is not zero\n");
1174 			return -EINVAL;
1175 		}
1176 		pi->vnr = be16_to_cpu(h->volume);
1177 		pi->cmd = be16_to_cpu(h->command);
1178 		pi->size = be32_to_cpu(h->length);
1179 	} else if (header_size == sizeof(struct p_header95) &&
1180 		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1181 		struct p_header95 *h = header;
1182 		pi->cmd = be16_to_cpu(h->command);
1183 		pi->size = be32_to_cpu(h->length);
1184 		pi->vnr = 0;
1185 	} else if (header_size == sizeof(struct p_header80) &&
1186 		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1187 		struct p_header80 *h = header;
1188 		pi->cmd = be16_to_cpu(h->command);
1189 		pi->size = be16_to_cpu(h->length);
1190 		pi->vnr = 0;
1191 	} else {
1192 		drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1193 			 be32_to_cpu(*(__be32 *)header),
1194 			 connection->agreed_pro_version);
1195 		return -EINVAL;
1196 	}
1197 	pi->data = header + header_size;
1198 	return 0;
1199 }
1200 
1201 static void drbd_unplug_all_devices(struct drbd_connection *connection)
1202 {
1203 	if (current->plug == &connection->receiver_plug) {
1204 		blk_finish_plug(&connection->receiver_plug);
1205 		blk_start_plug(&connection->receiver_plug);
1206 	} /* else: maybe just schedule() ?? */
1207 }
1208 
1209 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1210 {
1211 	void *buffer = connection->data.rbuf;
1212 	int err;
1213 
1214 	err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1215 	if (err)
1216 		return err;
1217 
1218 	err = decode_header(connection, buffer, pi);
1219 	connection->last_received = jiffies;
1220 
1221 	return err;
1222 }
1223 
1224 static int drbd_recv_header_maybe_unplug(struct drbd_connection *connection, struct packet_info *pi)
1225 {
1226 	void *buffer = connection->data.rbuf;
1227 	unsigned int size = drbd_header_size(connection);
1228 	int err;
1229 
1230 	err = drbd_recv_short(connection->data.socket, buffer, size, MSG_NOSIGNAL|MSG_DONTWAIT);
1231 	if (err != size) {
1232 		/* If we have nothing in the receive buffer now, to reduce
1233 		 * application latency, try to drain the backend queues as
1234 		 * quickly as possible, and let remote TCP know what we have
1235 		 * received so far. */
1236 		if (err == -EAGAIN) {
1237 			drbd_tcp_quickack(connection->data.socket);
1238 			drbd_unplug_all_devices(connection);
1239 		}
1240 		if (err > 0) {
1241 			buffer += err;
1242 			size -= err;
1243 		}
1244 		err = drbd_recv_all_warn(connection, buffer, size);
1245 		if (err)
1246 			return err;
1247 	}
1248 
1249 	err = decode_header(connection, connection->data.rbuf, pi);
1250 	connection->last_received = jiffies;
1251 
1252 	return err;
1253 }
/* This is blkdev_issue_flush, but asynchronous.
 * We want to submit to all component volumes in parallel,
 * then wait for all completions.
 */
/* Bookkeeping shared by all flush bios issued from one drbd_flush() call. */
struct issue_flush_context {
	/* Reference count: drbd_flush() initializes it to 1,
	 * submit_one_flush() increments it for every bio it submits,
	 * one_flush_endio() and drbd_flush() each decrement it. */
	atomic_t pending;
	/* 0, or the most recently recorded failure: the errno of a failed
	 * flush bio, or -ENOMEM if a flush could not be allocated. */
	int error;
	/* Completed by one_flush_endio() once pending drops to zero;
	 * drbd_flush() waits on it. */
	struct completion done;
};
/* Per-bio context: attached to a flush bio as bi_private by
 * submit_one_flush(), freed again by one_flush_endio(). */
struct one_flush_context {
	/* device whose backing device is being flushed */
	struct drbd_device *device;
	/* shared bookkeeping of the drbd_flush() call this bio belongs to */
	struct issue_flush_context *ctx;
};
1267 
1268 static void one_flush_endio(struct bio *bio)
1269 {
1270 	struct one_flush_context *octx = bio->bi_private;
1271 	struct drbd_device *device = octx->device;
1272 	struct issue_flush_context *ctx = octx->ctx;
1273 
1274 	if (bio->bi_status) {
1275 		ctx->error = blk_status_to_errno(bio->bi_status);
1276 		drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_status);
1277 	}
1278 	kfree(octx);
1279 	bio_put(bio);
1280 
1281 	clear_bit(FLUSH_PENDING, &device->flags);
1282 	put_ldev(device);
1283 	kref_put(&device->kref, drbd_destroy_device);
1284 
1285 	if (atomic_dec_and_test(&ctx->pending))
1286 		complete(&ctx->done);
1287 }
1288 
1289 static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
1290 {
1291 	struct bio *bio = bio_alloc(GFP_NOIO, 0);
1292 	struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);
1293 	if (!bio || !octx) {
1294 		drbd_warn(device, "Could not allocate a bio, CANNOT ISSUE FLUSH\n");
1295 		/* FIXME: what else can I do now?  disconnecting or detaching
1296 		 * really does not help to improve the state of the world, either.
1297 		 */
1298 		kfree(octx);
1299 		if (bio)
1300 			bio_put(bio);
1301 
1302 		ctx->error = -ENOMEM;
1303 		put_ldev(device);
1304 		kref_put(&device->kref, drbd_destroy_device);
1305 		return;
1306 	}
1307 
1308 	octx->device = device;
1309 	octx->ctx = ctx;
1310 	bio_set_dev(bio, device->ldev->backing_bdev);
1311 	bio->bi_private = octx;
1312 	bio->bi_end_io = one_flush_endio;
1313 	bio->bi_opf = REQ_OP_FLUSH | REQ_PREFLUSH;
1314 
1315 	device->flush_jif = jiffies;
1316 	set_bit(FLUSH_PENDING, &device->flags);
1317 	atomic_inc(&ctx->pending);
1318 	submit_bio(bio);
1319 }
1320 
1321 static void drbd_flush(struct drbd_connection *connection)
1322 {
1323 	if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1324 		struct drbd_peer_device *peer_device;
1325 		struct issue_flush_context ctx;
1326 		int vnr;
1327 
1328 		atomic_set(&ctx.pending, 1);
1329 		ctx.error = 0;
1330 		init_completion(&ctx.done);
1331 
1332 		rcu_read_lock();
1333 		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1334 			struct drbd_device *device = peer_device->device;
1335 
1336 			if (!get_ldev(device))
1337 				continue;
1338 			kref_get(&device->kref);
1339 			rcu_read_unlock();
1340 
1341 			submit_one_flush(device, &ctx);
1342 
1343 			rcu_read_lock();
1344 		}
1345 		rcu_read_unlock();
1346 
1347 		/* Do we want to add a timeout,
1348 		 * if disk-timeout is set? */
1349 		if (!atomic_dec_and_test(&ctx.pending))
1350 			wait_for_completion(&ctx.done);
1351 
1352 		if (ctx.error) {
1353 			/* would rather check on EOPNOTSUPP, but that is not reliable.
1354 			 * don't try again for ANY return value != 0
1355 			 * if (rv == -EOPNOTSUPP) */
1356 			/* Any error is already reported by bio_endio callback. */
1357 			drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1358 		}
1359 	}
1360 }
1361 
1362 /**
1363  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1364  * @device:	DRBD device.
1365  * @epoch:	Epoch object.
1366  * @ev:		Epoch event.
1367  */
1368 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1369 					       struct drbd_epoch *epoch,
1370 					       enum epoch_event ev)
1371 {
1372 	int epoch_size;
1373 	struct drbd_epoch *next_epoch;
1374 	enum finish_epoch rv = FE_STILL_LIVE;
1375 
1376 	spin_lock(&connection->epoch_lock);
1377 	do {
1378 		next_epoch = NULL;
1379 
1380 		epoch_size = atomic_read(&epoch->epoch_size);
1381 
1382 		switch (ev & ~EV_CLEANUP) {
1383 		case EV_PUT:
1384 			atomic_dec(&epoch->active);
1385 			break;
1386 		case EV_GOT_BARRIER_NR:
1387 			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1388 			break;
1389 		case EV_BECAME_LAST:
1390 			/* nothing to do*/
1391 			break;
1392 		}
1393 
1394 		if (epoch_size != 0 &&
1395 		    atomic_read(&epoch->active) == 0 &&
1396 		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1397 			if (!(ev & EV_CLEANUP)) {
1398 				spin_unlock(&connection->epoch_lock);
1399 				drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1400 				spin_lock(&connection->epoch_lock);
1401 			}
1402 #if 0
1403 			/* FIXME: dec unacked on connection, once we have
1404 			 * something to count pending connection packets in. */
1405 			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1406 				dec_unacked(epoch->connection);
1407 #endif
1408 
1409 			if (connection->current_epoch != epoch) {
1410 				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1411 				list_del(&epoch->list);
1412 				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1413 				connection->epochs--;
1414 				kfree(epoch);
1415 
1416 				if (rv == FE_STILL_LIVE)
1417 					rv = FE_DESTROYED;
1418 			} else {
1419 				epoch->flags = 0;
1420 				atomic_set(&epoch->epoch_size, 0);
1421 				/* atomic_set(&epoch->active, 0); is already zero */
1422 				if (rv == FE_STILL_LIVE)
1423 					rv = FE_RECYCLED;
1424 			}
1425 		}
1426 
1427 		if (!next_epoch)
1428 			break;
1429 
1430 		epoch = next_epoch;
1431 	} while (1);
1432 
1433 	spin_unlock(&connection->epoch_lock);
1434 
1435 	return rv;
1436 }
1437 
1438 static enum write_ordering_e
1439 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1440 {
1441 	struct disk_conf *dc;
1442 
1443 	dc = rcu_dereference(bdev->disk_conf);
1444 
1445 	if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1446 		wo = WO_DRAIN_IO;
1447 	if (wo == WO_DRAIN_IO && !dc->disk_drain)
1448 		wo = WO_NONE;
1449 
1450 	return wo;
1451 }
1452 
1453 /**
1454  * drbd_bump_write_ordering() - Fall back to an other write ordering method
1455  * @connection:	DRBD connection.
1456  * @wo:		Write ordering method to try.
1457  */
1458 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1459 			      enum write_ordering_e wo)
1460 {
1461 	struct drbd_device *device;
1462 	enum write_ordering_e pwo;
1463 	int vnr;
1464 	static char *write_ordering_str[] = {
1465 		[WO_NONE] = "none",
1466 		[WO_DRAIN_IO] = "drain",
1467 		[WO_BDEV_FLUSH] = "flush",
1468 	};
1469 
1470 	pwo = resource->write_ordering;
1471 	if (wo != WO_BDEV_FLUSH)
1472 		wo = min(pwo, wo);
1473 	rcu_read_lock();
1474 	idr_for_each_entry(&resource->devices, device, vnr) {
1475 		if (get_ldev(device)) {
1476 			wo = max_allowed_wo(device->ldev, wo);
1477 			if (device->ldev == bdev)
1478 				bdev = NULL;
1479 			put_ldev(device);
1480 		}
1481 	}
1482 
1483 	if (bdev)
1484 		wo = max_allowed_wo(bdev, wo);
1485 
1486 	rcu_read_unlock();
1487 
1488 	resource->write_ordering = wo;
1489 	if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1490 		drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1491 }
1492 
1493 /*
1494  * Mapping "discard" to ZEROOUT with UNMAP does not work for us:
1495  * Drivers have to "announce" q->limits.max_write_zeroes_sectors, or it
1496  * will directly go to fallback mode, submitting normal writes, and
1497  * never even try to UNMAP.
1498  *
1499  * And dm-thin does not do this (yet), mostly because in general it has
1500  * to assume that "skip_block_zeroing" is set.  See also:
1501  * https://www.mail-archive.com/dm-devel%40redhat.com/msg07965.html
1502  * https://www.redhat.com/archives/dm-devel/2018-January/msg00271.html
1503  *
1504  * We *may* ignore the discard-zeroes-data setting, if so configured.
1505  *
1506  * Assumption is that this "discard_zeroes_data=0" is only because the backend
1507  * may ignore partial unaligned discards.
1508  *
1509  * LVM/DM thin as of at least
1510  *   LVM version:     2.02.115(2)-RHEL7 (2015-01-28)
1511  *   Library version: 1.02.93-RHEL7 (2015-01-28)
1512  *   Driver version:  4.29.0
1513  * still behaves this way.
1514  *
1515  * For unaligned (wrt. alignment and granularity) or too small discards,
1516  * we zero-out the initial (and/or) trailing unaligned partial chunks,
1517  * but discard all the aligned full chunks.
1518  *
1519  * At least for LVM/DM thin, with skip_block_zeroing=false,
1520  * the result is effectively "discard_zeroes_data=1".
1521  */
1522 /* flags: EE_TRIM|EE_ZEROOUT */
1523 int drbd_issue_discard_or_zero_out(struct drbd_device *device, sector_t start, unsigned int nr_sectors, int flags)
1524 {
1525 	struct block_device *bdev = device->ldev->backing_bdev;
1526 	struct request_queue *q = bdev_get_queue(bdev);
1527 	sector_t tmp, nr;
1528 	unsigned int max_discard_sectors, granularity;
1529 	int alignment;
1530 	int err = 0;
1531 
1532 	if ((flags & EE_ZEROOUT) || !(flags & EE_TRIM))
1533 		goto zero_out;
1534 
1535 	/* Zero-sector (unknown) and one-sector granularities are the same.  */
1536 	granularity = max(q->limits.discard_granularity >> 9, 1U);
1537 	alignment = (bdev_discard_alignment(bdev) >> 9) % granularity;
1538 
1539 	max_discard_sectors = min(q->limits.max_discard_sectors, (1U << 22));
1540 	max_discard_sectors -= max_discard_sectors % granularity;
1541 	if (unlikely(!max_discard_sectors))
1542 		goto zero_out;
1543 
1544 	if (nr_sectors < granularity)
1545 		goto zero_out;
1546 
1547 	tmp = start;
1548 	if (sector_div(tmp, granularity) != alignment) {
1549 		if (nr_sectors < 2*granularity)
1550 			goto zero_out;
1551 		/* start + gran - (start + gran - align) % gran */
1552 		tmp = start + granularity - alignment;
1553 		tmp = start + granularity - sector_div(tmp, granularity);
1554 
1555 		nr = tmp - start;
1556 		/* don't flag BLKDEV_ZERO_NOUNMAP, we don't know how many
1557 		 * layers are below us, some may have smaller granularity */
1558 		err |= blkdev_issue_zeroout(bdev, start, nr, GFP_NOIO, 0);
1559 		nr_sectors -= nr;
1560 		start = tmp;
1561 	}
1562 	while (nr_sectors >= max_discard_sectors) {
1563 		err |= blkdev_issue_discard(bdev, start, max_discard_sectors, GFP_NOIO, 0);
1564 		nr_sectors -= max_discard_sectors;
1565 		start += max_discard_sectors;
1566 	}
1567 	if (nr_sectors) {
1568 		/* max_discard_sectors is unsigned int (and a multiple of
1569 		 * granularity, we made sure of that above already);
1570 		 * nr is < max_discard_sectors;
1571 		 * I don't need sector_div here, even though nr is sector_t */
1572 		nr = nr_sectors;
1573 		nr -= (unsigned int)nr % granularity;
1574 		if (nr) {
1575 			err |= blkdev_issue_discard(bdev, start, nr, GFP_NOIO, 0);
1576 			nr_sectors -= nr;
1577 			start += nr;
1578 		}
1579 	}
1580  zero_out:
1581 	if (nr_sectors) {
1582 		err |= blkdev_issue_zeroout(bdev, start, nr_sectors, GFP_NOIO,
1583 				(flags & EE_TRIM) ? 0 : BLKDEV_ZERO_NOUNMAP);
1584 	}
1585 	return err != 0;
1586 }
1587 
1588 static bool can_do_reliable_discards(struct drbd_device *device)
1589 {
1590 	struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
1591 	struct disk_conf *dc;
1592 	bool can_do;
1593 
1594 	if (!blk_queue_discard(q))
1595 		return false;
1596 
1597 	rcu_read_lock();
1598 	dc = rcu_dereference(device->ldev->disk_conf);
1599 	can_do = dc->discard_zeroes_if_aligned;
1600 	rcu_read_unlock();
1601 	return can_do;
1602 }
1603 
1604 static void drbd_issue_peer_discard_or_zero_out(struct drbd_device *device, struct drbd_peer_request *peer_req)
1605 {
1606 	/* If the backend cannot discard, or does not guarantee
1607 	 * read-back zeroes in discarded ranges, we fall back to
1608 	 * zero-out.  Unless configuration specifically requested
1609 	 * otherwise. */
1610 	if (!can_do_reliable_discards(device))
1611 		peer_req->flags |= EE_ZEROOUT;
1612 
1613 	if (drbd_issue_discard_or_zero_out(device, peer_req->i.sector,
1614 	    peer_req->i.size >> 9, peer_req->flags & (EE_ZEROOUT|EE_TRIM)))
1615 		peer_req->flags |= EE_WAS_ERROR;
1616 	drbd_endio_write_sec_final(peer_req);
1617 }
1618 
1619 static void drbd_issue_peer_wsame(struct drbd_device *device,
1620 				  struct drbd_peer_request *peer_req)
1621 {
1622 	struct block_device *bdev = device->ldev->backing_bdev;
1623 	sector_t s = peer_req->i.sector;
1624 	sector_t nr = peer_req->i.size >> 9;
1625 	if (blkdev_issue_write_same(bdev, s, nr, GFP_NOIO, peer_req->pages))
1626 		peer_req->flags |= EE_WAS_ERROR;
1627 	drbd_endio_write_sec_final(peer_req);
1628 }
1629 
1630 
1631 /**
1632  * drbd_submit_peer_request()
1633  * @device:	DRBD device.
1634  * @peer_req:	peer request
1635  * @rw:		flag field, see bio->bi_opf
1636  *
1637  * May spread the pages to multiple bios,
1638  * depending on bio_add_page restrictions.
1639  *
1640  * Returns 0 if all bios have been submitted,
1641  * -ENOMEM if we could not allocate enough bios,
1642  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1643  *  single page to an empty bio (which should never happen and likely indicates
1644  *  that the lower level IO stack is in some way broken). This has been observed
1645  *  on certain Xen deployments.
1646  */
1647 /* TODO allocate from our own bio_set. */
1648 int drbd_submit_peer_request(struct drbd_device *device,
1649 			     struct drbd_peer_request *peer_req,
1650 			     const unsigned op, const unsigned op_flags,
1651 			     const int fault_type)
1652 {
1653 	struct bio *bios = NULL;
1654 	struct bio *bio;
1655 	struct page *page = peer_req->pages;
1656 	sector_t sector = peer_req->i.sector;
1657 	unsigned data_size = peer_req->i.size;
1658 	unsigned n_bios = 0;
1659 	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1660 	int err = -ENOMEM;
1661 
1662 	/* TRIM/DISCARD: for now, always use the helper function
1663 	 * blkdev_issue_zeroout(..., discard=true).
1664 	 * It's synchronous, but it does the right thing wrt. bio splitting.
1665 	 * Correctness first, performance later.  Next step is to code an
1666 	 * asynchronous variant of the same.
1667 	 */
1668 	if (peer_req->flags & (EE_TRIM|EE_WRITE_SAME|EE_ZEROOUT)) {
1669 		/* wait for all pending IO completions, before we start
1670 		 * zeroing things out. */
1671 		conn_wait_active_ee_empty(peer_req->peer_device->connection);
1672 		/* add it to the active list now,
1673 		 * so we can find it to present it in debugfs */
1674 		peer_req->submit_jif = jiffies;
1675 		peer_req->flags |= EE_SUBMITTED;
1676 
1677 		/* If this was a resync request from receive_rs_deallocated(),
1678 		 * it is already on the sync_ee list */
1679 		if (list_empty(&peer_req->w.list)) {
1680 			spin_lock_irq(&device->resource->req_lock);
1681 			list_add_tail(&peer_req->w.list, &device->active_ee);
1682 			spin_unlock_irq(&device->resource->req_lock);
1683 		}
1684 
1685 		if (peer_req->flags & (EE_TRIM|EE_ZEROOUT))
1686 			drbd_issue_peer_discard_or_zero_out(device, peer_req);
1687 		else /* EE_WRITE_SAME */
1688 			drbd_issue_peer_wsame(device, peer_req);
1689 		return 0;
1690 	}
1691 
1692 	/* In most cases, we will only need one bio.  But in case the lower
1693 	 * level restrictions happen to be different at this offset on this
1694 	 * side than those of the sending peer, we may need to submit the
1695 	 * request in more than one bio.
1696 	 *
1697 	 * Plain bio_alloc is good enough here, this is no DRBD internally
1698 	 * generated bio, but a bio allocated on behalf of the peer.
1699 	 */
1700 next_bio:
1701 	bio = bio_alloc(GFP_NOIO, nr_pages);
1702 	if (!bio) {
1703 		drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1704 		goto fail;
1705 	}
1706 	/* > peer_req->i.sector, unless this is the first bio */
1707 	bio->bi_iter.bi_sector = sector;
1708 	bio_set_dev(bio, device->ldev->backing_bdev);
1709 	bio_set_op_attrs(bio, op, op_flags);
1710 	bio->bi_private = peer_req;
1711 	bio->bi_end_io = drbd_peer_request_endio;
1712 
1713 	bio->bi_next = bios;
1714 	bios = bio;
1715 	++n_bios;
1716 
1717 	page_chain_for_each(page) {
1718 		unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1719 		if (!bio_add_page(bio, page, len, 0))
1720 			goto next_bio;
1721 		data_size -= len;
1722 		sector += len >> 9;
1723 		--nr_pages;
1724 	}
1725 	D_ASSERT(device, data_size == 0);
1726 	D_ASSERT(device, page == NULL);
1727 
1728 	atomic_set(&peer_req->pending_bios, n_bios);
1729 	/* for debugfs: update timestamp, mark as submitted */
1730 	peer_req->submit_jif = jiffies;
1731 	peer_req->flags |= EE_SUBMITTED;
1732 	do {
1733 		bio = bios;
1734 		bios = bios->bi_next;
1735 		bio->bi_next = NULL;
1736 
1737 		drbd_generic_make_request(device, fault_type, bio);
1738 	} while (bios);
1739 	return 0;
1740 
1741 fail:
1742 	while (bios) {
1743 		bio = bios;
1744 		bios = bios->bi_next;
1745 		bio_put(bio);
1746 	}
1747 	return err;
1748 }
1749 
1750 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1751 					     struct drbd_peer_request *peer_req)
1752 {
1753 	struct drbd_interval *i = &peer_req->i;
1754 
1755 	drbd_remove_interval(&device->write_requests, i);
1756 	drbd_clear_interval(i);
1757 
1758 	/* Wake up any processes waiting for this peer request to complete.  */
1759 	if (i->waiting)
1760 		wake_up(&device->misc_wait);
1761 }
1762 
1763 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1764 {
1765 	struct drbd_peer_device *peer_device;
1766 	int vnr;
1767 
1768 	rcu_read_lock();
1769 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1770 		struct drbd_device *device = peer_device->device;
1771 
1772 		kref_get(&device->kref);
1773 		rcu_read_unlock();
1774 		drbd_wait_ee_list_empty(device, &device->active_ee);
1775 		kref_put(&device->kref, drbd_destroy_device);
1776 		rcu_read_lock();
1777 	}
1778 	rcu_read_unlock();
1779 }
1780 
1781 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1782 {
1783 	int rv;
1784 	struct p_barrier *p = pi->data;
1785 	struct drbd_epoch *epoch;
1786 
1787 	/* FIXME these are unacked on connection,
1788 	 * not a specific (peer)device.
1789 	 */
1790 	connection->current_epoch->barrier_nr = p->barrier;
1791 	connection->current_epoch->connection = connection;
1792 	rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1793 
1794 	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1795 	 * the activity log, which means it would not be resynced in case the
1796 	 * R_PRIMARY crashes now.
1797 	 * Therefore we must send the barrier_ack after the barrier request was
1798 	 * completed. */
1799 	switch (connection->resource->write_ordering) {
1800 	case WO_NONE:
1801 		if (rv == FE_RECYCLED)
1802 			return 0;
1803 
1804 		/* receiver context, in the writeout path of the other node.
1805 		 * avoid potential distributed deadlock */
1806 		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1807 		if (epoch)
1808 			break;
1809 		else
1810 			drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1811 			/* Fall through */
1812 
1813 	case WO_BDEV_FLUSH:
1814 	case WO_DRAIN_IO:
1815 		conn_wait_active_ee_empty(connection);
1816 		drbd_flush(connection);
1817 
1818 		if (atomic_read(&connection->current_epoch->epoch_size)) {
1819 			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1820 			if (epoch)
1821 				break;
1822 		}
1823 
1824 		return 0;
1825 	default:
1826 		drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1827 			 connection->resource->write_ordering);
1828 		return -EIO;
1829 	}
1830 
1831 	epoch->flags = 0;
1832 	atomic_set(&epoch->epoch_size, 0);
1833 	atomic_set(&epoch->active, 0);
1834 
1835 	spin_lock(&connection->epoch_lock);
1836 	if (atomic_read(&connection->current_epoch->epoch_size)) {
1837 		list_add(&epoch->list, &connection->current_epoch->list);
1838 		connection->current_epoch = epoch;
1839 		connection->epochs++;
1840 	} else {
1841 		/* The current_epoch got recycled while we allocated this one... */
1842 		kfree(epoch);
1843 	}
1844 	spin_unlock(&connection->epoch_lock);
1845 
1846 	return 0;
1847 }
1848 
1849 /* quick wrapper in case payload size != request_size (write same) */
1850 static void drbd_csum_ee_size(struct crypto_shash *h,
1851 			      struct drbd_peer_request *r, void *d,
1852 			      unsigned int payload_size)
1853 {
1854 	unsigned int tmp = r->i.size;
1855 	r->i.size = payload_size;
1856 	drbd_csum_ee(h, r, d);
1857 	r->i.size = tmp;
1858 }
1859 
/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data.
 * data_size: actual payload ("data in")
 * 	for normal writes that is bi_size.
 * 	for discards, that is zero.
 * 	for write same, it is logical_block_size.
 * both trim and write same have the bi_size ("data len to be affected")
 * as extra argument in the packet header.
 *
 * Validates the size and position of the request, allocates a peer
 * request for it and, unless this is a trim or zeroes packet, receives
 * the payload from the connection into the peer request's page chain.
 * If a peer integrity transform is configured, the digest that precedes
 * the payload is received and checked against the payload.
 *
 * Returns the new peer request (flagged EE_WRITE, plus EE_TRIM,
 * EE_ZEROOUT or EE_WRITE_SAME as applicable), or NULL on any validation,
 * allocation, receive or digest failure.
 */
static struct drbd_peer_request *
read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
	      struct packet_info *pi) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	const sector_t capacity = drbd_get_capacity(device->this_bdev);
	struct drbd_peer_request *peer_req;
	struct page *page;
	int digest_size, err;
	unsigned int data_size = pi->size, ds;
	void *dig_in = peer_device->connection->int_dig_in;
	void *dig_vv = peer_device->connection->int_dig_vv;
	unsigned long *data;
	/* at most one of these three is non-NULL, depending on the packet type */
	struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
	struct p_trim *zeroes = (pi->cmd == P_ZEROES) ? pi->data : NULL;
	struct p_trim *wsame = (pi->cmd == P_WSAME) ? pi->data : NULL;

	digest_size = 0;
	/* NOTE(review): only P_TRIM is exempt from the digest here, P_ZEROES
	 * is not -- confirm that the sender puts a digest in front of
	 * P_ZEROES when an integrity transform is configured. */
	if (!trim && peer_device->connection->peer_integrity_tfm) {
		digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
		/*
		 * FIXME: Receive the incoming digest into the receive buffer
		 *	  here, together with its struct p_data?
		 */
		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
		if (err)
			return NULL;
		/* the digest is part of pi->size, but not of the payload */
		data_size -= digest_size;
	}

	/* assume request_size == data_size, but special case trim and wsame. */
	ds = data_size;
	if (trim) {
		if (!expect(data_size == 0))
			return NULL;
		ds = be32_to_cpu(trim->size);
	} else if (zeroes) {
		if (!expect(data_size == 0))
			return NULL;
		ds = be32_to_cpu(zeroes->size);
	} else if (wsame) {
		/* the payload of a write same has to be exactly one logical
		 * block, both of the drbd device and of its backing device */
		if (data_size != queue_logical_block_size(device->rq_queue)) {
			drbd_err(peer_device, "data size (%u) != drbd logical block size (%u)\n",
				data_size, queue_logical_block_size(device->rq_queue));
			return NULL;
		}
		if (data_size != bdev_logical_block_size(device->ldev->backing_bdev)) {
			drbd_err(peer_device, "data size (%u) != backend logical block size (%u)\n",
				data_size, bdev_logical_block_size(device->ldev->backing_bdev));
			return NULL;
		}
		ds = be32_to_cpu(wsame->size);
	}

	/* from here on, ds is the request size, data_size the payload size */
	if (!expect(IS_ALIGNED(ds, 512)))
		return NULL;
	if (trim || wsame || zeroes) {
		if (!expect(ds <= (DRBD_MAX_BBIO_SECTORS << 9)))
			return NULL;
	} else if (!expect(ds <= DRBD_MAX_BIO_SIZE))
		return NULL;

	/* even though we trust our peer,
	 * we sometimes have to double check. */
	if (sector + (ds>>9) > capacity) {
		drbd_err(device, "request from peer beyond end of local disk: "
			"capacity: %llus < sector: %llus + size: %u\n",
			(unsigned long long)capacity,
			(unsigned long long)sector, ds);
		return NULL;
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place.  */
	peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO);
	if (!peer_req)
		return NULL;

	peer_req->flags |= EE_WRITE;
	/* trim and zeroes carry no payload, so there is nothing to receive */
	if (trim) {
		peer_req->flags |= EE_TRIM;
		return peer_req;
	}
	if (zeroes) {
		peer_req->flags |= EE_ZEROOUT;
		return peer_req;
	}
	if (wsame)
		peer_req->flags |= EE_WRITE_SAME;

	/* receive payload size bytes into page chain */
	ds = data_size;
	page = peer_req->pages;
	page_chain_for_each(page) {
		unsigned len = min_t(int, ds, PAGE_SIZE);
		data = kmap(page);
		err = drbd_recv_all_warn(peer_device->connection, data, len);
		/* fault injection: flip all bits of the first word of the page */
		if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
			drbd_err(device, "Fault injection: Corrupting data on receive\n");
			data[0] = data[0] ^ (unsigned long)-1;
		}
		kunmap(page);
		if (err) {
			drbd_free_peer_req(device, peer_req);
			return NULL;
		}
		ds -= len;
	}

	if (digest_size) {
		/* checksum the payload only; it may be smaller than the request */
		drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size);
		if (memcmp(dig_in, dig_vv, digest_size)) {
			drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
				(unsigned long long)sector, data_size);
			drbd_free_peer_req(device, peer_req);
			return NULL;
		}
	}
	/* accounted in units of 512 byte sectors */
	device->recv_cnt += data_size >> 9;
	return peer_req;
}
1991 
1992 /* drbd_drain_block() just takes a data block
1993  * out of the socket input buffer, and discards it.
1994  */
1995 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1996 {
1997 	struct page *page;
1998 	int err = 0;
1999 	void *data;
2000 
2001 	if (!data_size)
2002 		return 0;
2003 
2004 	page = drbd_alloc_pages(peer_device, 1, 1);
2005 
2006 	data = kmap(page);
2007 	while (data_size) {
2008 		unsigned int len = min_t(int, data_size, PAGE_SIZE);
2009 
2010 		err = drbd_recv_all_warn(peer_device->connection, data, len);
2011 		if (err)
2012 			break;
2013 		data_size -= len;
2014 	}
2015 	kunmap(page);
2016 	drbd_free_pages(peer_device->device, page, 0);
2017 	return err;
2018 }
2019 
2020 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
2021 			   sector_t sector, int data_size)
2022 {
2023 	struct bio_vec bvec;
2024 	struct bvec_iter iter;
2025 	struct bio *bio;
2026 	int digest_size, err, expect;
2027 	void *dig_in = peer_device->connection->int_dig_in;
2028 	void *dig_vv = peer_device->connection->int_dig_vv;
2029 
2030 	digest_size = 0;
2031 	if (peer_device->connection->peer_integrity_tfm) {
2032 		digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
2033 		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
2034 		if (err)
2035 			return err;
2036 		data_size -= digest_size;
2037 	}
2038 
2039 	/* optimistically update recv_cnt.  if receiving fails below,
2040 	 * we disconnect anyways, and counters will be reset. */
2041 	peer_device->device->recv_cnt += data_size>>9;
2042 
2043 	bio = req->master_bio;
2044 	D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
2045 
2046 	bio_for_each_segment(bvec, bio, iter) {
2047 		void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
2048 		expect = min_t(int, data_size, bvec.bv_len);
2049 		err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
2050 		kunmap(bvec.bv_page);
2051 		if (err)
2052 			return err;
2053 		data_size -= expect;
2054 	}
2055 
2056 	if (digest_size) {
2057 		drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
2058 		if (memcmp(dig_in, dig_vv, digest_size)) {
2059 			drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
2060 			return -EINVAL;
2061 		}
2062 	}
2063 
2064 	D_ASSERT(peer_device->device, data_size == 0);
2065 	return 0;
2066 }
2067 
2068 /*
2069  * e_end_resync_block() is called in ack_sender context via
2070  * drbd_finish_peer_reqs().
2071  */
2072 static int e_end_resync_block(struct drbd_work *w, int unused)
2073 {
2074 	struct drbd_peer_request *peer_req =
2075 		container_of(w, struct drbd_peer_request, w);
2076 	struct drbd_peer_device *peer_device = peer_req->peer_device;
2077 	struct drbd_device *device = peer_device->device;
2078 	sector_t sector = peer_req->i.sector;
2079 	int err;
2080 
2081 	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2082 
2083 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2084 		drbd_set_in_sync(device, sector, peer_req->i.size);
2085 		err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
2086 	} else {
2087 		/* Record failure to sync */
2088 		drbd_rs_failed_io(device, sector, peer_req->i.size);
2089 
2090 		err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2091 	}
2092 	dec_unacked(device);
2093 
2094 	return err;
2095 }
2096 
2097 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
2098 			    struct packet_info *pi) __releases(local)
2099 {
2100 	struct drbd_device *device = peer_device->device;
2101 	struct drbd_peer_request *peer_req;
2102 
2103 	peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
2104 	if (!peer_req)
2105 		goto fail;
2106 
2107 	dec_rs_pending(device);
2108 
2109 	inc_unacked(device);
2110 	/* corresponding dec_unacked() in e_end_resync_block()
2111 	 * respective _drbd_clear_done_ee */
2112 
2113 	peer_req->w.cb = e_end_resync_block;
2114 	peer_req->submit_jif = jiffies;
2115 
2116 	spin_lock_irq(&device->resource->req_lock);
2117 	list_add_tail(&peer_req->w.list, &device->sync_ee);
2118 	spin_unlock_irq(&device->resource->req_lock);
2119 
2120 	atomic_add(pi->size >> 9, &device->rs_sect_ev);
2121 	if (drbd_submit_peer_request(device, peer_req, REQ_OP_WRITE, 0,
2122 				     DRBD_FAULT_RS_WR) == 0)
2123 		return 0;
2124 
2125 	/* don't care for the reason here */
2126 	drbd_err(device, "submit failed, triggering re-connect\n");
2127 	spin_lock_irq(&device->resource->req_lock);
2128 	list_del(&peer_req->w.list);
2129 	spin_unlock_irq(&device->resource->req_lock);
2130 
2131 	drbd_free_peer_req(device, peer_req);
2132 fail:
2133 	put_ldev(device);
2134 	return -EIO;
2135 }
2136 
2137 static struct drbd_request *
2138 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
2139 	     sector_t sector, bool missing_ok, const char *func)
2140 {
2141 	struct drbd_request *req;
2142 
2143 	/* Request object according to our peer */
2144 	req = (struct drbd_request *)(unsigned long)id;
2145 	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
2146 		return req;
2147 	if (!missing_ok) {
2148 		drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
2149 			(unsigned long)id, (unsigned long long)sector);
2150 	}
2151 	return NULL;
2152 }
2153 
2154 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
2155 {
2156 	struct drbd_peer_device *peer_device;
2157 	struct drbd_device *device;
2158 	struct drbd_request *req;
2159 	sector_t sector;
2160 	int err;
2161 	struct p_data *p = pi->data;
2162 
2163 	peer_device = conn_peer_device(connection, pi->vnr);
2164 	if (!peer_device)
2165 		return -EIO;
2166 	device = peer_device->device;
2167 
2168 	sector = be64_to_cpu(p->sector);
2169 
2170 	spin_lock_irq(&device->resource->req_lock);
2171 	req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
2172 	spin_unlock_irq(&device->resource->req_lock);
2173 	if (unlikely(!req))
2174 		return -EIO;
2175 
2176 	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
2177 	 * special casing it there for the various failure cases.
2178 	 * still no race with drbd_fail_pending_reads */
2179 	err = recv_dless_read(peer_device, req, sector, pi->size);
2180 	if (!err)
2181 		req_mod(req, DATA_RECEIVED);
2182 	/* else: nothing. handled from drbd_disconnect...
2183 	 * I don't think we may complete this just yet
2184 	 * in case we are "on-disconnect: freeze" */
2185 
2186 	return err;
2187 }
2188 
2189 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
2190 {
2191 	struct drbd_peer_device *peer_device;
2192 	struct drbd_device *device;
2193 	sector_t sector;
2194 	int err;
2195 	struct p_data *p = pi->data;
2196 
2197 	peer_device = conn_peer_device(connection, pi->vnr);
2198 	if (!peer_device)
2199 		return -EIO;
2200 	device = peer_device->device;
2201 
2202 	sector = be64_to_cpu(p->sector);
2203 	D_ASSERT(device, p->block_id == ID_SYNCER);
2204 
2205 	if (get_ldev(device)) {
2206 		/* data is submitted to disk within recv_resync_read.
2207 		 * corresponding put_ldev done below on error,
2208 		 * or in drbd_peer_request_endio. */
2209 		err = recv_resync_read(peer_device, sector, pi);
2210 	} else {
2211 		if (__ratelimit(&drbd_ratelimit_state))
2212 			drbd_err(device, "Can not write resync data to local disk.\n");
2213 
2214 		err = drbd_drain_block(peer_device, pi->size);
2215 
2216 		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2217 	}
2218 
2219 	atomic_add(pi->size >> 9, &device->rs_sect_in);
2220 
2221 	return err;
2222 }
2223 
2224 static void restart_conflicting_writes(struct drbd_device *device,
2225 				       sector_t sector, int size)
2226 {
2227 	struct drbd_interval *i;
2228 	struct drbd_request *req;
2229 
2230 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2231 		if (!i->local)
2232 			continue;
2233 		req = container_of(i, struct drbd_request, i);
2234 		if (req->rq_state & RQ_LOCAL_PENDING ||
2235 		    !(req->rq_state & RQ_POSTPONED))
2236 			continue;
2237 		/* as it is RQ_POSTPONED, this will cause it to
2238 		 * be queued on the retry workqueue. */
2239 		__req_mod(req, CONFLICT_RESOLVED, NULL);
2240 	}
2241 }
2242 
2243 /*
2244  * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
2245  */
2246 static int e_end_block(struct drbd_work *w, int cancel)
2247 {
2248 	struct drbd_peer_request *peer_req =
2249 		container_of(w, struct drbd_peer_request, w);
2250 	struct drbd_peer_device *peer_device = peer_req->peer_device;
2251 	struct drbd_device *device = peer_device->device;
2252 	sector_t sector = peer_req->i.sector;
2253 	int err = 0, pcmd;
2254 
2255 	if (peer_req->flags & EE_SEND_WRITE_ACK) {
2256 		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2257 			pcmd = (device->state.conn >= C_SYNC_SOURCE &&
2258 				device->state.conn <= C_PAUSED_SYNC_T &&
2259 				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
2260 				P_RS_WRITE_ACK : P_WRITE_ACK;
2261 			err = drbd_send_ack(peer_device, pcmd, peer_req);
2262 			if (pcmd == P_RS_WRITE_ACK)
2263 				drbd_set_in_sync(device, sector, peer_req->i.size);
2264 		} else {
2265 			err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2266 			/* we expect it to be marked out of sync anyways...
2267 			 * maybe assert this?  */
2268 		}
2269 		dec_unacked(device);
2270 	}
2271 
2272 	/* we delete from the conflict detection hash _after_ we sent out the
2273 	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
2274 	if (peer_req->flags & EE_IN_INTERVAL_TREE) {
2275 		spin_lock_irq(&device->resource->req_lock);
2276 		D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
2277 		drbd_remove_epoch_entry_interval(device, peer_req);
2278 		if (peer_req->flags & EE_RESTART_REQUESTS)
2279 			restart_conflicting_writes(device, sector, peer_req->i.size);
2280 		spin_unlock_irq(&device->resource->req_lock);
2281 	} else
2282 		D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2283 
2284 	drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
2285 
2286 	return err;
2287 }
2288 
2289 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2290 {
2291 	struct drbd_peer_request *peer_req =
2292 		container_of(w, struct drbd_peer_request, w);
2293 	struct drbd_peer_device *peer_device = peer_req->peer_device;
2294 	int err;
2295 
2296 	err = drbd_send_ack(peer_device, ack, peer_req);
2297 	dec_unacked(peer_device->device);
2298 
2299 	return err;
2300 }
2301 
2302 static int e_send_superseded(struct drbd_work *w, int unused)
2303 {
2304 	return e_send_ack(w, P_SUPERSEDED);
2305 }
2306 
2307 static int e_send_retry_write(struct drbd_work *w, int unused)
2308 {
2309 	struct drbd_peer_request *peer_req =
2310 		container_of(w, struct drbd_peer_request, w);
2311 	struct drbd_connection *connection = peer_req->peer_device->connection;
2312 
2313 	return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2314 			     P_RETRY_WRITE : P_SUPERSEDED);
2315 }
2316 
/*
 * seq_greater() - wrap-around aware "a is newer than b" for sequence numbers.
 *
 * Returns true if @a is ahead of @b by 1 .. 2^31 - 1 (modulo 2^32).
 */
static bool seq_greater(u32 a, u32 b)
{
	/*
	 * We assume 32-bit wrap-around here.
	 * For 24-bit wrap-around, we would have to shift:
	 *  a <<= 8; b <<= 8;
	 *
	 * Subtract in unsigned arithmetic, where wrap-around is well
	 * defined, and only then look at the sign.  Subtracting two s32
	 * values can overflow, which is undefined behaviour in C.
	 */
	return (s32)(a - b) > 0;
}
2326 
2327 static u32 seq_max(u32 a, u32 b)
2328 {
2329 	return seq_greater(a, b) ? a : b;
2330 }
2331 
2332 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2333 {
2334 	struct drbd_device *device = peer_device->device;
2335 	unsigned int newest_peer_seq;
2336 
2337 	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2338 		spin_lock(&device->peer_seq_lock);
2339 		newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2340 		device->peer_seq = newest_peer_seq;
2341 		spin_unlock(&device->peer_seq_lock);
2342 		/* wake up only if we actually changed device->peer_seq */
2343 		if (peer_seq == newest_peer_seq)
2344 			wake_up(&device->seq_wait);
2345 	}
2346 }
2347 
/*
 * overlaps() - do two ranges share at least one sector?
 * @s1, @s2: start sectors
 * @l1, @l2: lengths in bytes, converted to sectors by shifting right by 9
 *
 * Returns 1 if the ranges intersect, 0 otherwise.
 */
static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
{
	/* each range has to end behind the start of the other one */
	return s1 + (l1>>9) > s2 && s1 < s2 + (l2>>9);
}
2352 
2353 /* maybe change sync_ee into interval trees as well? */
2354 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2355 {
2356 	struct drbd_peer_request *rs_req;
2357 	bool rv = false;
2358 
2359 	spin_lock_irq(&device->resource->req_lock);
2360 	list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2361 		if (overlaps(peer_req->i.sector, peer_req->i.size,
2362 			     rs_req->i.sector, rs_req->i.size)) {
2363 			rv = true;
2364 			break;
2365 		}
2366 	}
2367 	spin_unlock_irq(&device->resource->req_lock);
2368 
2369 	return rv;
2370 }
2371 
/* Called from receive_Data.
 * Synchronize packets on sock with packets on msock.
 *
 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
 * packet traveling on msock, they are still processed in the order they have
 * been sent.
 *
 * Note: we don't care for Ack packets overtaking P_DATA packets.
 *
 * In case packet_seq is larger than device->peer_seq number, there are
 * outstanding packets on the msock. We wait for them to arrive.
 * In case we are the logically next packet, we update device->peer_seq
 * ourselves. Correctly handles 32bit wrap around.
 *
 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
 *
 * Nothing is done (and 0 is returned) unless RESOLVE_CONFLICTS is set on the
 * connection.
 *
 * returns 0 if we may process the packet,
 * -ERESTARTSYS if we were interrupted (by disconnect signal),
 * -ETIMEDOUT if the missing packets did not show up within one wait of
 * ping_timeo * HZ / 10 jiffies. */
static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
{
	struct drbd_device *device = peer_device->device;
	DEFINE_WAIT(wait);
	long timeout;
	int ret = 0, tp;

	if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
		return 0;

	spin_lock(&device->peer_seq_lock);
	for (;;) {
		/* peer_seq is at most one ahead of what we have seen so far:
		 * nothing is missing, take over the newer number and go on */
		if (!seq_greater(peer_seq - 1, device->peer_seq)) {
			device->peer_seq = seq_max(device->peer_seq, peer_seq);
			break;
		}

		if (signal_pending(current)) {
			ret = -ERESTARTSYS;
			break;
		}

		rcu_read_lock();
		tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
		rcu_read_unlock();

		/* without two_primaries we leave with ret == 0 and
		 * device->peer_seq unchanged */
		if (!tp)
			break;

		/* Only need to wait if two_primaries is enabled */
		/* Get onto the wait queue before the lock is dropped; the
		 * lock is not held while sleeping in schedule_timeout(). */
		prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
		spin_unlock(&device->peer_seq_lock);
		rcu_read_lock();
		timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
		rcu_read_unlock();
		timeout = schedule_timeout(timeout);
		spin_lock(&device->peer_seq_lock);
		/* a return value of 0 means the whole timeout has elapsed */
		if (!timeout) {
			ret = -ETIMEDOUT;
			drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
			break;
		}
	}
	spin_unlock(&device->peer_seq_lock);
	finish_wait(&device->seq_wait, &wait);
	return ret;
}
2440 
2441 /* see also bio_flags_to_wire()
2442  * DRBD_REQ_*, because we need to semantically map the flags to data packet
2443  * flags and back. We may replicate to other kernel versions. */
2444 static unsigned long wire_flags_to_bio_flags(u32 dpf)
2445 {
2446 	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2447 		(dpf & DP_FUA ? REQ_FUA : 0) |
2448 		(dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
2449 }
2450 
2451 static unsigned long wire_flags_to_bio_op(u32 dpf)
2452 {
2453 	if (dpf & DP_ZEROES)
2454 		return REQ_OP_WRITE_ZEROES;
2455 	if (dpf & DP_DISCARD)
2456 		return REQ_OP_DISCARD;
2457 	if (dpf & DP_WSAME)
2458 		return REQ_OP_WRITE_SAME;
2459 	else
2460 		return REQ_OP_WRITE;
2461 }
2462 
2463 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2464 				    unsigned int size)
2465 {
2466 	struct drbd_interval *i;
2467 
2468     repeat:
2469 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2470 		struct drbd_request *req;
2471 		struct bio_and_error m;
2472 
2473 		if (!i->local)
2474 			continue;
2475 		req = container_of(i, struct drbd_request, i);
2476 		if (!(req->rq_state & RQ_POSTPONED))
2477 			continue;
2478 		req->rq_state &= ~RQ_POSTPONED;
2479 		__req_mod(req, NEG_ACKED, &m);
2480 		spin_unlock_irq(&device->resource->req_lock);
2481 		if (m.bio)
2482 			complete_master_bio(device, &m);
2483 		spin_lock_irq(&device->resource->req_lock);
2484 		goto repeat;
2485 	}
2486 }
2487 
/*
 * handle_write_conflicts() - resolve overlaps between an incoming peer write
 * and the requests already in device->write_requests.
 * @device: device the write is for
 * @peer_req: the incoming peer request
 *
 * The interval of @peer_req is inserted into the write_requests tree, then
 * all overlapping intervals are examined.  receive_Data() calls this with
 * resource->req_lock held.
 *
 * Returns
 *   0        if no unresolved conflict is left; @peer_req stays in the tree,
 *   -ENOENT  if @peer_req conflicts with a local request and this node
 *            resolves conflicts: @peer_req was queued on done_ee to be
 *            answered as superseded or with a retry request,
 *   or the error of drbd_wait_misc().
 * On any non-zero return the interval has been removed from the tree again.
 */
static int handle_write_conflicts(struct drbd_device *device,
				  struct drbd_peer_request *peer_req)
{
	struct drbd_connection *connection = peer_req->peer_device->connection;
	bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
	sector_t sector = peer_req->i.sector;
	const unsigned int size = peer_req->i.size;
	struct drbd_interval *i;
	bool equal;
	int err;

	/*
	 * Inserting the peer request into the write_requests tree will prevent
	 * new conflicting local requests from being added.
	 */
	drbd_insert_interval(&device->write_requests, &peer_req->i);

    repeat:
	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
		/* skip ourselves and intervals that are already completed */
		if (i == &peer_req->i)
			continue;
		if (i->completed)
			continue;

		if (!i->local) {
			/*
			 * Our peer has sent a conflicting remote request; this
			 * should not happen in a two-node setup.  Wait for the
			 * earlier peer request to complete.
			 */
			err = drbd_wait_misc(device, i);
			if (err)
				goto out;
			/* after waiting, scan the overlaps from the start */
			goto repeat;
		}

		/* identical ranges are not reported as concurrent writes below */
		equal = i->sector == sector && i->size == size;
		if (resolve_conflicts) {
			/*
			 * If the peer request is fully contained within the
			 * overlapping request, it can be considered overwritten
			 * and thus superseded; otherwise, it will be retried
			 * once all overlapping requests have completed.
			 */
			bool superseded = i->sector <= sector && i->sector +
				       (i->size >> 9) >= sector + (size >> 9);

			if (!equal)
				drbd_alert(device, "Concurrent writes detected: "
					       "local=%llus +%u, remote=%llus +%u, "
					       "assuming %s came first\n",
					  (unsigned long long)i->sector, i->size,
					  (unsigned long long)sector, size,
					  superseded ? "local" : "remote");

			/* the peer request is not submitted; it is only
			 * answered from the ack_sender work queue */
			peer_req->w.cb = superseded ? e_send_superseded :
						   e_send_retry_write;
			list_add_tail(&peer_req->w.list, &device->done_ee);
			queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);

			err = -ENOENT;
			goto out;
		} else {
			struct drbd_request *req =
				container_of(i, struct drbd_request, i);

			if (!equal)
				drbd_alert(device, "Concurrent writes detected: "
					       "local=%llus +%u, remote=%llus +%u\n",
					  (unsigned long long)i->sector, i->size,
					  (unsigned long long)sector, size);

			if (req->rq_state & RQ_LOCAL_PENDING ||
			    !(req->rq_state & RQ_POSTPONED)) {
				/*
				 * Wait for the node with the discard flag to
				 * decide if this request has been superseded
				 * or needs to be retried.
				 * Requests that have been superseded will
				 * disappear from the write_requests tree.
				 *
				 * In addition, wait for the conflicting
				 * request to finish locally before submitting
				 * the conflicting peer request.
				 */
				err = drbd_wait_misc(device, &req->i);
				if (err) {
					/* waiting failed: request C_TIMEOUT for
					 * the connection and fail whatever is
					 * still postponed in this range */
					_conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
					fail_postponed_requests(device, sector, size);
					goto out;
				}
				goto repeat;
			}
			/*
			 * Remember to restart the conflicting requests after
			 * the new peer request has completed.
			 */
			peer_req->flags |= EE_RESTART_REQUESTS;
		}
	}
	err = 0;

    out:
	/* on failure, undo the insertion done at the top */
	if (err)
		drbd_remove_epoch_entry_interval(device, peer_req);
	return err;
}
2595 
/* mirrored write
 *
 * receive_Data() - receive a write sent by the peer and submit it to the
 * local disk.
 *
 * Without a local disk the payload is drained and answered with P_NEG_ACK.
 * Otherwise the data is read into a peer request, the request is attached
 * to the current epoch, acks are prepared or sent as the data packet flags
 * (or, for agreed protocol versions below 100, the configured wire
 * protocol) demand, write conflicts are handled if two_primaries is
 * configured, and finally the request is submitted.
 *
 * Returns 0 if the request was submitted, or if it was handed to the ack
 * sender because of a write conflict (-ENOENT from
 * handle_write_conflicts()); a negative error code otherwise.
 */
static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct net_conf *nc;
	sector_t sector;
	struct drbd_peer_request *peer_req;
	struct p_data *p = pi->data;
	u32 peer_seq = be32_to_cpu(p->seq_num);
	int op, op_flags;
	u32 dp_flags;
	int err, tp;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	if (!get_ldev(device)) {
		int err2;

		/* no local disk: still keep peer_seq and the epoch size up
		 * to date, refuse the write and throw the payload away */
		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
		atomic_inc(&connection->current_epoch->epoch_size);
		err2 = drbd_drain_block(peer_device, pi->size);
		/* report the first error that occurred */
		if (!err)
			err = err2;
		return err;
	}

	/*
	 * Corresponding put_ldev done either below (on various errors), or in
	 * drbd_peer_request_endio, if we successfully submit the data at the
	 * end of this function.
	 */

	sector = be64_to_cpu(p->sector);
	peer_req = read_in_block(peer_device, p->block_id, sector, pi);
	if (!peer_req) {
		put_ldev(device);
		return -EIO;
	}

	peer_req->w.cb = e_end_block;
	peer_req->submit_jif = jiffies;
	peer_req->flags |= EE_APPLICATION;

	dp_flags = be32_to_cpu(p->dp_flags);
	op = wire_flags_to_bio_op(dp_flags);
	op_flags = wire_flags_to_bio_flags(dp_flags);
	if (pi->cmd == P_TRIM) {
		D_ASSERT(peer_device, peer_req->i.size > 0);
		D_ASSERT(peer_device, op == REQ_OP_DISCARD);
		D_ASSERT(peer_device, peer_req->pages == NULL);
		/* need to play safe: an older DRBD sender
		 * may mean zero-out while sending P_TRIM. */
		if (0 == (connection->agreed_features & DRBD_FF_WZEROES))
			peer_req->flags |= EE_ZEROOUT;
	} else if (pi->cmd == P_ZEROES) {
		D_ASSERT(peer_device, peer_req->i.size > 0);
		D_ASSERT(peer_device, op == REQ_OP_WRITE_ZEROES);
		D_ASSERT(peer_device, peer_req->pages == NULL);
		/* Do (not) pass down BLKDEV_ZERO_NOUNMAP? */
		if (dp_flags & DP_DISCARD)
			peer_req->flags |= EE_TRIM;
	} else if (peer_req->pages == NULL) {
		/* a request without payload is expected to be an empty flush */
		D_ASSERT(device, peer_req->i.size == 0);
		D_ASSERT(device, dp_flags & DP_FLUSH);
	}

	if (dp_flags & DP_MAY_SET_IN_SYNC)
		peer_req->flags |= EE_MAY_SET_IN_SYNC;

	/* attach the request to the current epoch and account for it */
	spin_lock(&connection->epoch_lock);
	peer_req->epoch = connection->current_epoch;
	atomic_inc(&peer_req->epoch->epoch_size);
	atomic_inc(&peer_req->epoch->active);
	spin_unlock(&connection->epoch_lock);

	rcu_read_lock();
	nc = rcu_dereference(peer_device->connection->net_conf);
	tp = nc->two_primaries;
	/* for agreed protocol versions below 100 the ack flags are derived
	 * from the configured wire protocol */
	if (peer_device->connection->agreed_pro_version < 100) {
		switch (nc->wire_protocol) {
		case DRBD_PROT_C:
			dp_flags |= DP_SEND_WRITE_ACK;
			break;
		case DRBD_PROT_B:
			dp_flags |= DP_SEND_RECEIVE_ACK;
			break;
		}
	}
	rcu_read_unlock();

	if (dp_flags & DP_SEND_WRITE_ACK) {
		peer_req->flags |= EE_SEND_WRITE_ACK;
		inc_unacked(device);
		/* corresponding dec_unacked() in e_end_block()
		 * respective _drbd_clear_done_ee */
	}

	if (dp_flags & DP_SEND_RECEIVE_ACK) {
		/* I really don't like it that the receiver thread
		 * sends on the msock, but anyways */
		drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
	}

	/* both branches leave with resource->req_lock held, unless they
	 * bail out */
	if (tp) {
		/* two primaries implies protocol C */
		D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
		peer_req->flags |= EE_IN_INTERVAL_TREE;
		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
		if (err)
			goto out_interrupted;
		spin_lock_irq(&device->resource->req_lock);
		err = handle_write_conflicts(device, peer_req);
		if (err) {
			spin_unlock_irq(&device->resource->req_lock);
			/* -ENOENT: the request was queued on done_ee and is
			 * answered from there; it is not freed here */
			if (err == -ENOENT) {
				put_ldev(device);
				return 0;
			}
			goto out_interrupted;
		}
	} else {
		update_peer_seq(peer_device, peer_seq);
		spin_lock_irq(&device->resource->req_lock);
	}
	/* TRIM and WRITE_SAME are processed synchronously,
	 * we wait for all pending requests, respectively wait for
	 * active_ee to become empty in drbd_submit_peer_request();
	 * better not add ourselves here. */
	if ((peer_req->flags & (EE_TRIM|EE_WRITE_SAME|EE_ZEROOUT)) == 0)
		list_add_tail(&peer_req->w.list, &device->active_ee);
	spin_unlock_irq(&device->resource->req_lock);

	/* as sync target, wait until no request on sync_ee overlaps this one */
	if (device->state.conn == C_SYNC_TARGET)
		wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));

	if (device->state.pdsk < D_INCONSISTENT) {
		/* In case we have the only disk of the cluster, */
		drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
		drbd_al_begin_io(device, &peer_req->i);
		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
	}

	err = drbd_submit_peer_request(device, peer_req, op, op_flags,
				       DRBD_FAULT_DT_WR);
	if (!err)
		return 0;

	/* don't care for the reason here */
	drbd_err(device, "submit failed, triggering re-connect\n");
	/* undo list and interval tree membership and the activity log
	 * reference before the request is freed below */
	spin_lock_irq(&device->resource->req_lock);
	list_del(&peer_req->w.list);
	drbd_remove_epoch_entry_interval(device, peer_req);
	spin_unlock_irq(&device->resource->req_lock);
	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
		drbd_al_complete_io(device, &peer_req->i);
	}

out_interrupted:
	/* drop the epoch reference, the ldev reference and the request */
	drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
	put_ldev(device);
	drbd_free_peer_req(device, peer_req);
	return err;
}
2766 
2767 /* We may throttle resync, if the lower device seems to be busy,
2768  * and current sync rate is above c_min_rate.
2769  *
2770  * To decide whether or not the lower device is busy, we use a scheme similar
2771  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2772  * (more than 64 sectors) of activity we cannot account for with our own resync
2773  * activity, it obviously is "busy".
2774  *
2775  * The current sync rate used here uses only the most recent two step marks,
2776  * to have a short time average so we can react faster.
2777  */
2778 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2779 		bool throttle_if_app_is_waiting)
2780 {
2781 	struct lc_element *tmp;
2782 	bool throttle = drbd_rs_c_min_rate_throttle(device);
2783 
2784 	if (!throttle || throttle_if_app_is_waiting)
2785 		return throttle;
2786 
2787 	spin_lock_irq(&device->al_lock);
2788 	tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2789 	if (tmp) {
2790 		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2791 		if (test_bit(BME_PRIORITY, &bm_ext->flags))
2792 			throttle = false;
2793 		/* Do not slow down if app IO is already waiting for this extent,
2794 		 * and our progress is necessary for application IO to complete. */
2795 	}
2796 	spin_unlock_irq(&device->al_lock);
2797 
2798 	return throttle;
2799 }
2800 
2801 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2802 {
2803 	struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2804 	unsigned long db, dt, dbdt;
2805 	unsigned int c_min_rate;
2806 	int curr_events;
2807 
2808 	rcu_read_lock();
2809 	c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2810 	rcu_read_unlock();
2811 
2812 	/* feature disabled? */
2813 	if (c_min_rate == 0)
2814 		return false;
2815 
2816 	curr_events = (int)part_stat_read_accum(&disk->part0, sectors) -
2817 			atomic_read(&device->rs_sect_ev);
2818 
2819 	if (atomic_read(&device->ap_actlog_cnt)
2820 	    || curr_events - device->rs_last_events > 64) {
2821 		unsigned long rs_left;
2822 		int i;
2823 
2824 		device->rs_last_events = curr_events;
2825 
2826 		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2827 		 * approx. */
2828 		i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2829 
2830 		if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2831 			rs_left = device->ov_left;
2832 		else
2833 			rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2834 
2835 		dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2836 		if (!dt)
2837 			dt++;
2838 		db = device->rs_mark_left[i] - rs_left;
2839 		dbdt = Bit2KB(db/dt);
2840 
2841 		if (dbdt > c_min_rate)
2842 			return true;
2843 	}
2844 	return false;
2845 }
2846 
2847 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2848 {
2849 	struct drbd_peer_device *peer_device;
2850 	struct drbd_device *device;
2851 	sector_t sector;
2852 	sector_t capacity;
2853 	struct drbd_peer_request *peer_req;
2854 	struct digest_info *di = NULL;
2855 	int size, verb;
2856 	unsigned int fault_type;
2857 	struct p_block_req *p =	pi->data;
2858 
2859 	peer_device = conn_peer_device(connection, pi->vnr);
2860 	if (!peer_device)
2861 		return -EIO;
2862 	device = peer_device->device;
2863 	capacity = drbd_get_capacity(device->this_bdev);
2864 
2865 	sector = be64_to_cpu(p->sector);
2866 	size   = be32_to_cpu(p->blksize);
2867 
2868 	if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2869 		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2870 				(unsigned long long)sector, size);
2871 		return -EINVAL;
2872 	}
2873 	if (sector + (size>>9) > capacity) {
2874 		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2875 				(unsigned long long)sector, size);
2876 		return -EINVAL;
2877 	}
2878 
2879 	if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2880 		verb = 1;
2881 		switch (pi->cmd) {
2882 		case P_DATA_REQUEST:
2883 			drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2884 			break;
2885 		case P_RS_THIN_REQ:
2886 		case P_RS_DATA_REQUEST:
2887 		case P_CSUM_RS_REQUEST:
2888 		case P_OV_REQUEST:
2889 			drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2890 			break;
2891 		case P_OV_REPLY:
2892 			verb = 0;
2893 			dec_rs_pending(device);
2894 			drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2895 			break;
2896 		default:
2897 			BUG();
2898 		}
2899 		if (verb && __ratelimit(&drbd_ratelimit_state))
2900 			drbd_err(device, "Can not satisfy peer's read request, "
2901 			    "no local data.\n");
2902 
2903 		/* drain possibly payload */
2904 		return drbd_drain_block(peer_device, pi->size);
2905 	}
2906 
2907 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2908 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
2909 	 * which in turn might block on the other node at this very place.  */
2910 	peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2911 			size, GFP_NOIO);
2912 	if (!peer_req) {
2913 		put_ldev(device);
2914 		return -ENOMEM;
2915 	}
2916 
2917 	switch (pi->cmd) {
2918 	case P_DATA_REQUEST:
2919 		peer_req->w.cb = w_e_end_data_req;
2920 		fault_type = DRBD_FAULT_DT_RD;
2921 		/* application IO, don't drbd_rs_begin_io */
2922 		peer_req->flags |= EE_APPLICATION;
2923 		goto submit;
2924 
2925 	case P_RS_THIN_REQ:
2926 		/* If at some point in the future we have a smart way to
2927 		   find out if this data block is completely deallocated,
2928 		   then we would do something smarter here than reading
2929 		   the block... */
2930 		peer_req->flags |= EE_RS_THIN_REQ;
2931 		/* fall through */
2932 	case P_RS_DATA_REQUEST:
2933 		peer_req->w.cb = w_e_end_rsdata_req;
2934 		fault_type = DRBD_FAULT_RS_RD;
2935 		/* used in the sector offset progress display */
2936 		device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2937 		break;
2938 
2939 	case P_OV_REPLY:
2940 	case P_CSUM_RS_REQUEST:
2941 		fault_type = DRBD_FAULT_RS_RD;
2942 		di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2943 		if (!di)
2944 			goto out_free_e;
2945 
2946 		di->digest_size = pi->size;
2947 		di->digest = (((char *)di)+sizeof(struct digest_info));
2948 
2949 		peer_req->digest = di;
2950 		peer_req->flags |= EE_HAS_DIGEST;
2951 
2952 		if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2953 			goto out_free_e;
2954 
2955 		if (pi->cmd == P_CSUM_RS_REQUEST) {
2956 			D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2957 			peer_req->w.cb = w_e_end_csum_rs_req;
2958 			/* used in the sector offset progress display */
2959 			device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2960 			/* remember to report stats in drbd_resync_finished */
2961 			device->use_csums = true;
2962 		} else if (pi->cmd == P_OV_REPLY) {
2963 			/* track progress, we may need to throttle */
2964 			atomic_add(size >> 9, &device->rs_sect_in);
2965 			peer_req->w.cb = w_e_end_ov_reply;
2966 			dec_rs_pending(device);
2967 			/* drbd_rs_begin_io done when we sent this request,
2968 			 * but accounting still needs to be done. */
2969 			goto submit_for_resync;
2970 		}
2971 		break;
2972 
2973 	case P_OV_REQUEST:
2974 		if (device->ov_start_sector == ~(sector_t)0 &&
2975 		    peer_device->connection->agreed_pro_version >= 90) {
2976 			unsigned long now = jiffies;
2977 			int i;
2978 			device->ov_start_sector = sector;
2979 			device->ov_position = sector;
2980 			device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2981 			device->rs_total = device->ov_left;
2982 			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2983 				device->rs_mark_left[i] = device->ov_left;
2984 				device->rs_mark_time[i] = now;
2985 			}
2986 			drbd_info(device, "Online Verify start sector: %llu\n",
2987 					(unsigned long long)sector);
2988 		}
2989 		peer_req->w.cb = w_e_end_ov_req;
2990 		fault_type = DRBD_FAULT_RS_RD;
2991 		break;
2992 
2993 	default:
2994 		BUG();
2995 	}
2996 
2997 	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
2998 	 * wrt the receiver, but it is not as straightforward as it may seem.
2999 	 * Various places in the resync start and stop logic assume resync
3000 	 * requests are processed in order, requeuing this on the worker thread
3001 	 * introduces a bunch of new code for synchronization between threads.
3002 	 *
3003 	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
3004 	 * "forever", throttling after drbd_rs_begin_io will lock that extent
3005 	 * for application writes for the same time.  For now, just throttle
3006 	 * here, where the rest of the code expects the receiver to sleep for
3007 	 * a while, anyways.
3008 	 */
3009 
3010 	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
3011 	 * this defers syncer requests for some time, before letting at least
3012 	 * on request through.  The resync controller on the receiving side
3013 	 * will adapt to the incoming rate accordingly.
3014 	 *
3015 	 * We cannot throttle here if remote is Primary/SyncTarget:
3016 	 * we would also throttle its application reads.
3017 	 * In that case, throttling is done on the SyncTarget only.
3018 	 */
3019 
3020 	/* Even though this may be a resync request, we do add to "read_ee";
3021 	 * "sync_ee" is only used for resync WRITEs.
3022 	 * Add to list early, so debugfs can find this request
3023 	 * even if we have to sleep below. */
3024 	spin_lock_irq(&device->resource->req_lock);
3025 	list_add_tail(&peer_req->w.list, &device->read_ee);
3026 	spin_unlock_irq(&device->resource->req_lock);
3027 
3028 	update_receiver_timing_details(connection, drbd_rs_should_slow_down);
3029 	if (device->state.peer != R_PRIMARY
3030 	&& drbd_rs_should_slow_down(device, sector, false))
3031 		schedule_timeout_uninterruptible(HZ/10);
3032 	update_receiver_timing_details(connection, drbd_rs_begin_io);
3033 	if (drbd_rs_begin_io(device, sector))
3034 		goto out_free_e;
3035 
3036 submit_for_resync:
3037 	atomic_add(size >> 9, &device->rs_sect_ev);
3038 
3039 submit:
3040 	update_receiver_timing_details(connection, drbd_submit_peer_request);
3041 	inc_unacked(device);
3042 	if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
3043 				     fault_type) == 0)
3044 		return 0;
3045 
3046 	/* don't care for the reason here */
3047 	drbd_err(device, "submit failed, triggering re-connect\n");
3048 
3049 out_free_e:
3050 	spin_lock_irq(&device->resource->req_lock);
3051 	list_del(&peer_req->w.list);
3052 	spin_unlock_irq(&device->resource->req_lock);
3053 	/* no drbd_rs_complete_io(), we are dropping the connection anyways */
3054 
3055 	put_ldev(device);
3056 	drbd_free_peer_req(device, peer_req);
3057 	return -EIO;
3058 }
3059 
3060 /**
3061  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
3062  */
3063 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
3064 {
3065 	struct drbd_device *device = peer_device->device;
3066 	int self, peer, rv = -100;
3067 	unsigned long ch_self, ch_peer;
3068 	enum drbd_after_sb_p after_sb_0p;
3069 
3070 	self = device->ldev->md.uuid[UI_BITMAP] & 1;
3071 	peer = device->p_uuid[UI_BITMAP] & 1;
3072 
3073 	ch_peer = device->p_uuid[UI_SIZE];
3074 	ch_self = device->comm_bm_set;
3075 
3076 	rcu_read_lock();
3077 	after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
3078 	rcu_read_unlock();
3079 	switch (after_sb_0p) {
3080 	case ASB_CONSENSUS:
3081 	case ASB_DISCARD_SECONDARY:
3082 	case ASB_CALL_HELPER:
3083 	case ASB_VIOLENTLY:
3084 		drbd_err(device, "Configuration error.\n");
3085 		break;
3086 	case ASB_DISCONNECT:
3087 		break;
3088 	case ASB_DISCARD_YOUNGER_PRI:
3089 		if (self == 0 && peer == 1) {
3090 			rv = -1;
3091 			break;
3092 		}
3093 		if (self == 1 && peer == 0) {
3094 			rv =  1;
3095 			break;
3096 		}
3097 		/* Else fall through - to one of the other strategies... */
3098 	case ASB_DISCARD_OLDER_PRI:
3099 		if (self == 0 && peer == 1) {
3100 			rv = 1;
3101 			break;
3102 		}
3103 		if (self == 1 && peer == 0) {
3104 			rv = -1;
3105 			break;
3106 		}
3107 		/* Else fall through to one of the other strategies... */
3108 		drbd_warn(device, "Discard younger/older primary did not find a decision\n"
3109 		     "Using discard-least-changes instead\n");
3110 		/* fall through */
3111 	case ASB_DISCARD_ZERO_CHG:
3112 		if (ch_peer == 0 && ch_self == 0) {
3113 			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
3114 				? -1 : 1;
3115 			break;
3116 		} else {
3117 			if (ch_peer == 0) { rv =  1; break; }
3118 			if (ch_self == 0) { rv = -1; break; }
3119 		}
3120 		if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
3121 			break;
3122 		/* else, fall through */
3123 	case ASB_DISCARD_LEAST_CHG:
3124 		if	(ch_self < ch_peer)
3125 			rv = -1;
3126 		else if (ch_self > ch_peer)
3127 			rv =  1;
3128 		else /* ( ch_self == ch_peer ) */
3129 		     /* Well, then use something else. */
3130 			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
3131 				? -1 : 1;
3132 		break;
3133 	case ASB_DISCARD_LOCAL:
3134 		rv = -1;
3135 		break;
3136 	case ASB_DISCARD_REMOTE:
3137 		rv =  1;
3138 	}
3139 
3140 	return rv;
3141 }
3142 
3143 /**
3144  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
3145  */
3146 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
3147 {
3148 	struct drbd_device *device = peer_device->device;
3149 	int hg, rv = -100;
3150 	enum drbd_after_sb_p after_sb_1p;
3151 
3152 	rcu_read_lock();
3153 	after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
3154 	rcu_read_unlock();
3155 	switch (after_sb_1p) {
3156 	case ASB_DISCARD_YOUNGER_PRI:
3157 	case ASB_DISCARD_OLDER_PRI:
3158 	case ASB_DISCARD_LEAST_CHG:
3159 	case ASB_DISCARD_LOCAL:
3160 	case ASB_DISCARD_REMOTE:
3161 	case ASB_DISCARD_ZERO_CHG:
3162 		drbd_err(device, "Configuration error.\n");
3163 		break;
3164 	case ASB_DISCONNECT:
3165 		break;
3166 	case ASB_CONSENSUS:
3167 		hg = drbd_asb_recover_0p(peer_device);
3168 		if (hg == -1 && device->state.role == R_SECONDARY)
3169 			rv = hg;
3170 		if (hg == 1  && device->state.role == R_PRIMARY)
3171 			rv = hg;
3172 		break;
3173 	case ASB_VIOLENTLY:
3174 		rv = drbd_asb_recover_0p(peer_device);
3175 		break;
3176 	case ASB_DISCARD_SECONDARY:
3177 		return device->state.role == R_PRIMARY ? 1 : -1;
3178 	case ASB_CALL_HELPER:
3179 		hg = drbd_asb_recover_0p(peer_device);
3180 		if (hg == -1 && device->state.role == R_PRIMARY) {
3181 			enum drbd_state_rv rv2;
3182 
3183 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3184 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
3185 			  * we do not need to wait for the after state change work either. */
3186 			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3187 			if (rv2 != SS_SUCCESS) {
3188 				drbd_khelper(device, "pri-lost-after-sb");
3189 			} else {
3190 				drbd_warn(device, "Successfully gave up primary role.\n");
3191 				rv = hg;
3192 			}
3193 		} else
3194 			rv = hg;
3195 	}
3196 
3197 	return rv;
3198 }
3199 
3200 /**
3201  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
3202  */
3203 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
3204 {
3205 	struct drbd_device *device = peer_device->device;
3206 	int hg, rv = -100;
3207 	enum drbd_after_sb_p after_sb_2p;
3208 
3209 	rcu_read_lock();
3210 	after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
3211 	rcu_read_unlock();
3212 	switch (after_sb_2p) {
3213 	case ASB_DISCARD_YOUNGER_PRI:
3214 	case ASB_DISCARD_OLDER_PRI:
3215 	case ASB_DISCARD_LEAST_CHG:
3216 	case ASB_DISCARD_LOCAL:
3217 	case ASB_DISCARD_REMOTE:
3218 	case ASB_CONSENSUS:
3219 	case ASB_DISCARD_SECONDARY:
3220 	case ASB_DISCARD_ZERO_CHG:
3221 		drbd_err(device, "Configuration error.\n");
3222 		break;
3223 	case ASB_VIOLENTLY:
3224 		rv = drbd_asb_recover_0p(peer_device);
3225 		break;
3226 	case ASB_DISCONNECT:
3227 		break;
3228 	case ASB_CALL_HELPER:
3229 		hg = drbd_asb_recover_0p(peer_device);
3230 		if (hg == -1) {
3231 			enum drbd_state_rv rv2;
3232 
3233 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3234 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
3235 			  * we do not need to wait for the after state change work either. */
3236 			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3237 			if (rv2 != SS_SUCCESS) {
3238 				drbd_khelper(device, "pri-lost-after-sb");
3239 			} else {
3240 				drbd_warn(device, "Successfully gave up primary role.\n");
3241 				rv = hg;
3242 			}
3243 		} else
3244 			rv = hg;
3245 	}
3246 
3247 	return rv;
3248 }
3249 
3250 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
3251 			   u64 bits, u64 flags)
3252 {
3253 	if (!uuid) {
3254 		drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
3255 		return;
3256 	}
3257 	drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
3258 	     text,
3259 	     (unsigned long long)uuid[UI_CURRENT],
3260 	     (unsigned long long)uuid[UI_BITMAP],
3261 	     (unsigned long long)uuid[UI_HISTORY_START],
3262 	     (unsigned long long)uuid[UI_HISTORY_END],
3263 	     (unsigned long long)bits,
3264 	     (unsigned long long)flags);
3265 }
3266 
3267 /*
3268   100	after split brain try auto recover
3269     2	C_SYNC_SOURCE set BitMap
3270     1	C_SYNC_SOURCE use BitMap
3271     0	no Sync
3272    -1	C_SYNC_TARGET use BitMap
3273    -2	C_SYNC_TARGET set BitMap
3274  -100	after split brain, disconnect
3275 -1000	unrelated data
3276 -1091   requires proto 91
3277 -1096   requires proto 96
3278  */
3279 
/**
 * drbd_uuid_compare  -  Compare local and peer UUID sets to pick a resync direction
 * @device:	device whose local UUIDs (device->ldev->md.uuid[]) and peer
 *		UUIDs (device->p_uuid[]) are compared
 * @peer_role:	role of the peer, used by rule 41
 * @rule_nr:	out parameter; holds the number of the rule most recently
 *		entered when the function returns
 *
 * The lowest bit of every UUID is masked off before comparing.
 *
 * Returns 0 (no sync), 1 / -1 (become sync source / target using the bitmap),
 * 2 / -2 (become sync source / target with the bitmap set), 100 (split brain,
 * try auto recovery), -100 (split brain, disconnect), -1000 (unrelated data),
 * -1091 (needs protocol 91), or -(0x10000 | protocol | feature flags << 8)
 * when a newer peer is required.
 *
 * Side effects: rules 34 and 71 rewrite entries of the local UUID set,
 * rules 35 and 51 rewrite entries of device->p_uuid[].
 *
 * drbd_sync_handshake() calls this with device->ldev->md.uuid_lock held.
 *
 * NOTE(review): @connection is NULL when first_peer_device() returns NULL,
 * yet it is dereferenced without a check below -- confirm callers guarantee
 * a peer device.
 */
static int drbd_uuid_compare(struct drbd_device *const device, enum drbd_role const peer_role, int *rule_nr) __must_hold(local)
{
	struct drbd_peer_device *const peer_device = first_peer_device(device);
	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
	u64 self, peer;
	int i, j;

	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);

	/* Rule 10: both sides freshly created, nothing to sync. */
	*rule_nr = 10;
	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
		return 0;

	/* Rule 20: only we are freshly created (or zero): full sync as target. */
	*rule_nr = 20;
	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
	     peer != UUID_JUST_CREATED)
		return -2;

	/* Rule 30: only the peer is freshly created (or zero): full sync as source. */
	*rule_nr = 30;
	if (self != UUID_JUST_CREATED &&
	    (peer == UUID_JUST_CREATED || peer == (u64)0))
		return 2;

	/* Current UUIDs are equal. */
	if (self == peer) {
		int rct, dc; /* roles at crash time */

		/* Peer has no bitmap UUID but we still do. */
		if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {

			if (connection->agreed_pro_version < 91)
				return -1091;

			if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
			    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
				drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
				/* rotate our bitmap UUID into the history, as the peer already did */
				drbd_uuid_move_history(device);
				device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
				device->ldev->md.uuid[UI_BITMAP] = 0;

				drbd_uuid_dump(device, "self", device->ldev->md.uuid,
					       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
				*rule_nr = 34;
			} else {
				drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
				*rule_nr = 36;
			}

			return 1;
		}

		/* Mirror case: we have no bitmap UUID but the peer still does. */
		if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {

			if (connection->agreed_pro_version < 91)
				return -1091;

			if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
			    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
				drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");

				/* rotate the peer's bitmap UUID into our copy of its history */
				device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
				device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
				device->p_uuid[UI_BITMAP] = 0UL;

				drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
				*rule_nr = 35;
			} else {
				drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
				*rule_nr = 37;
			}

			return -1;
		}

		/* Common power [off|failure] */
		rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
			(device->p_uuid[UI_FLAGS] & 2);
		/* lowest bit is set when we were primary,
		 * next bit (weight 2) is set when peer was primary */
		*rule_nr = 40;

		/* Neither has the "crashed primary" flag set,
		 * only a replication link hiccup. */
		if (rct == 0)
			return 0;

		/* Current UUID equal and no bitmap uuid; does not necessarily
		 * mean this was a "simultaneous hard crash", maybe IO was
		 * frozen, so no UUID-bump happened.
		 * This is a protocol change, overload DRBD_FF_WSAME as flag
		 * for "new-enough" peer DRBD version. */
		if (device->state.role == R_PRIMARY || peer_role == R_PRIMARY) {
			*rule_nr = 41;
			if (!(connection->agreed_features & DRBD_FF_WSAME)) {
				drbd_warn(peer_device, "Equivalent unrotated UUIDs, but current primary present.\n");
				/* encodes required protocol (low byte) and feature flags (next byte) */
				return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8));
			}
			if (device->state.role == R_PRIMARY && peer_role == R_PRIMARY) {
				/* At least one has the "crashed primary" bit set,
				 * both are primary now, but neither has rotated its UUIDs?
				 * "Can not happen." */
				drbd_err(peer_device, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n");
				return -100;
			}
			if (device->state.role == R_PRIMARY)
				return 1;
			return -1;
		}

		/* Both are secondary.
		 * Really looks like recovery from simultaneous hard crash.
		 * Check which had been primary before, and arbitrate. */
		switch (rct) {
		case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */
		case 1: /*  self_pri && !peer_pri */ return 1;
		case 2: /* !self_pri &&  peer_pri */ return -1;
		case 3: /*  self_pri &&  peer_pri */
			dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
			return dc ? -1 : 1;
		}
	}

	/* Rule 50: our current UUID equals the peer's bitmap UUID. */
	*rule_nr = 50;
	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
	if (self == peer)
		return -1;

	/* Rule 51: our current UUID equals the peer's newest history UUID. */
	*rule_nr = 51;
	peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
	if (self == peer) {
		if (connection->agreed_pro_version < 96 ?
		    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
		    (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
		    peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get through. Undo the last start of
			   resync as sync source modifications of the peer's UUIDs. */

			if (connection->agreed_pro_version < 91)
				return -1091;

			device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
			device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];

			drbd_info(device, "Lost last syncUUID packet, corrected:\n");
			drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);

			return -1;
		}
	}

	/* Rule 60: our current UUID appears anywhere in the peer's history. */
	*rule_nr = 60;
	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		peer = device->p_uuid[i] & ~((u64)1);
		if (self == peer)
			return -2;
	}

	/* Rule 70: our bitmap UUID equals the peer's current UUID. */
	*rule_nr = 70;
	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
	if (self == peer)
		return 1;

	/* Rule 71: our newest history UUID equals the peer's current UUID. */
	*rule_nr = 71;
	self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
	if (self == peer) {
		if (connection->agreed_pro_version < 96 ?
		    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
		    (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
		    self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get through. Undo the last start of
			   resync as sync source modifications of our UUIDs. */

			if (connection->agreed_pro_version < 91)
				return -1091;

			__drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
			__drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);

			drbd_info(device, "Last syncUUID did not get through, corrected:\n");
			drbd_uuid_dump(device, "self", device->ldev->md.uuid,
				       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);

			return 1;
		}
	}


	/* Rule 80: the peer's current UUID appears anywhere in our history. */
	*rule_nr = 80;
	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		self = device->ldev->md.uuid[i] & ~((u64)1);
		if (self == peer)
			return 2;
	}

	/* Rule 90: both bitmap UUIDs are equal and non-zero: split brain. */
	*rule_nr = 90;
	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
	if (self == peer && self != ((u64)0))
		return 100;

	/* Rule 100: any history UUID in common at all. */
	*rule_nr = 100;
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		self = device->ldev->md.uuid[i] & ~((u64)1);
		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
			peer = device->p_uuid[j] & ~((u64)1);
			if (self == peer)
				return -100;
		}
	}

	/* Nothing in common. */
	return -1000;
}
3494 
3495 /* drbd_sync_handshake() returns the new conn state on success, or
3496    CONN_MASK (-1) on failure.
3497  */
3498 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3499 					   enum drbd_role peer_role,
3500 					   enum drbd_disk_state peer_disk) __must_hold(local)
3501 {
3502 	struct drbd_device *device = peer_device->device;
3503 	enum drbd_conns rv = C_MASK;
3504 	enum drbd_disk_state mydisk;
3505 	struct net_conf *nc;
3506 	int hg, rule_nr, rr_conflict, tentative, always_asbp;
3507 
3508 	mydisk = device->state.disk;
3509 	if (mydisk == D_NEGOTIATING)
3510 		mydisk = device->new_state_tmp.disk;
3511 
3512 	drbd_info(device, "drbd_sync_handshake:\n");
3513 
3514 	spin_lock_irq(&device->ldev->md.uuid_lock);
3515 	drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3516 	drbd_uuid_dump(device, "peer", device->p_uuid,
3517 		       device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3518 
3519 	hg = drbd_uuid_compare(device, peer_role, &rule_nr);
3520 	spin_unlock_irq(&device->ldev->md.uuid_lock);
3521 
3522 	drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3523 
3524 	if (hg == -1000) {
3525 		drbd_alert(device, "Unrelated data, aborting!\n");
3526 		return C_MASK;
3527 	}
3528 	if (hg < -0x10000) {
3529 		int proto, fflags;
3530 		hg = -hg;
3531 		proto = hg & 0xff;
3532 		fflags = (hg >> 8) & 0xff;
3533 		drbd_alert(device, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n",
3534 					proto, fflags);
3535 		return C_MASK;
3536 	}
3537 	if (hg < -1000) {
3538 		drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3539 		return C_MASK;
3540 	}
3541 
3542 	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3543 	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3544 		int f = (hg == -100) || abs(hg) == 2;
3545 		hg = mydisk > D_INCONSISTENT ? 1 : -1;
3546 		if (f)
3547 			hg = hg*2;
3548 		drbd_info(device, "Becoming sync %s due to disk states.\n",
3549 		     hg > 0 ? "source" : "target");
3550 	}
3551 
3552 	if (abs(hg) == 100)
3553 		drbd_khelper(device, "initial-split-brain");
3554 
3555 	rcu_read_lock();
3556 	nc = rcu_dereference(peer_device->connection->net_conf);
3557 	always_asbp = nc->always_asbp;
3558 	rr_conflict = nc->rr_conflict;
3559 	tentative = nc->tentative;
3560 	rcu_read_unlock();
3561 
3562 	if (hg == 100 || (hg == -100 && always_asbp)) {
3563 		int pcount = (device->state.role == R_PRIMARY)
3564 			   + (peer_role == R_PRIMARY);
3565 		int forced = (hg == -100);
3566 
3567 		switch (pcount) {
3568 		case 0:
3569 			hg = drbd_asb_recover_0p(peer_device);
3570 			break;
3571 		case 1:
3572 			hg = drbd_asb_recover_1p(peer_device);
3573 			break;
3574 		case 2:
3575 			hg = drbd_asb_recover_2p(peer_device);
3576 			break;
3577 		}
3578 		if (abs(hg) < 100) {
3579 			drbd_warn(device, "Split-Brain detected, %d primaries, "
3580 			     "automatically solved. Sync from %s node\n",
3581 			     pcount, (hg < 0) ? "peer" : "this");
3582 			if (forced) {
3583 				drbd_warn(device, "Doing a full sync, since"
3584 				     " UUIDs where ambiguous.\n");
3585 				hg = hg*2;
3586 			}
3587 		}
3588 	}
3589 
3590 	if (hg == -100) {
3591 		if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3592 			hg = -1;
3593 		if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3594 			hg = 1;
3595 
3596 		if (abs(hg) < 100)
3597 			drbd_warn(device, "Split-Brain detected, manually solved. "
3598 			     "Sync from %s node\n",
3599 			     (hg < 0) ? "peer" : "this");
3600 	}
3601 
3602 	if (hg == -100) {
3603 		/* FIXME this log message is not correct if we end up here
3604 		 * after an attempted attach on a diskless node.
3605 		 * We just refuse to attach -- well, we drop the "connection"
3606 		 * to that disk, in a way... */
3607 		drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3608 		drbd_khelper(device, "split-brain");
3609 		return C_MASK;
3610 	}
3611 
3612 	if (hg > 0 && mydisk <= D_INCONSISTENT) {
3613 		drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3614 		return C_MASK;
3615 	}
3616 
3617 	if (hg < 0 && /* by intention we do not use mydisk here. */
3618 	    device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3619 		switch (rr_conflict) {
3620 		case ASB_CALL_HELPER:
3621 			drbd_khelper(device, "pri-lost");
3622 			/* fall through */
3623 		case ASB_DISCONNECT:
3624 			drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3625 			return C_MASK;
3626 		case ASB_VIOLENTLY:
3627 			drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3628 			     "assumption\n");
3629 		}
3630 	}
3631 
3632 	if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3633 		if (hg == 0)
3634 			drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3635 		else
3636 			drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3637 				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3638 				 abs(hg) >= 2 ? "full" : "bit-map based");
3639 		return C_MASK;
3640 	}
3641 
3642 	if (abs(hg) >= 2) {
3643 		drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3644 		if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3645 					BM_LOCKED_SET_ALLOWED))
3646 			return C_MASK;
3647 	}
3648 
3649 	if (hg > 0) { /* become sync source. */
3650 		rv = C_WF_BITMAP_S;
3651 	} else if (hg < 0) { /* become sync target */
3652 		rv = C_WF_BITMAP_T;
3653 	} else {
3654 		rv = C_CONNECTED;
3655 		if (drbd_bm_total_weight(device)) {
3656 			drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3657 			     drbd_bm_total_weight(device));
3658 		}
3659 	}
3660 
3661 	return rv;
3662 }
3663 
3664 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3665 {
3666 	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3667 	if (peer == ASB_DISCARD_REMOTE)
3668 		return ASB_DISCARD_LOCAL;
3669 
3670 	/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3671 	if (peer == ASB_DISCARD_LOCAL)
3672 		return ASB_DISCARD_REMOTE;
3673 
3674 	/* everything else is valid if they are equal on both sides. */
3675 	return peer;
3676 }
3677 
3678 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3679 {
3680 	struct p_protocol *p = pi->data;
3681 	enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3682 	int p_proto, p_discard_my_data, p_two_primaries, cf;
3683 	struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3684 	char integrity_alg[SHARED_SECRET_MAX] = "";
3685 	struct crypto_shash *peer_integrity_tfm = NULL;
3686 	void *int_dig_in = NULL, *int_dig_vv = NULL;
3687 
3688 	p_proto		= be32_to_cpu(p->protocol);
3689 	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
3690 	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
3691 	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
3692 	p_two_primaries = be32_to_cpu(p->two_primaries);
3693 	cf		= be32_to_cpu(p->conn_flags);
3694 	p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3695 
3696 	if (connection->agreed_pro_version >= 87) {
3697 		int err;
3698 
3699 		if (pi->size > sizeof(integrity_alg))
3700 			return -EIO;
3701 		err = drbd_recv_all(connection, integrity_alg, pi->size);
3702 		if (err)
3703 			return err;
3704 		integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3705 	}
3706 
3707 	if (pi->cmd != P_PROTOCOL_UPDATE) {
3708 		clear_bit(CONN_DRY_RUN, &connection->flags);
3709 
3710 		if (cf & CF_DRY_RUN)
3711 			set_bit(CONN_DRY_RUN, &connection->flags);
3712 
3713 		rcu_read_lock();
3714 		nc = rcu_dereference(connection->net_conf);
3715 
3716 		if (p_proto != nc->wire_protocol) {
3717 			drbd_err(connection, "incompatible %s settings\n", "protocol");
3718 			goto disconnect_rcu_unlock;
3719 		}
3720 
3721 		if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3722 			drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3723 			goto disconnect_rcu_unlock;
3724 		}
3725 
3726 		if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3727 			drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3728 			goto disconnect_rcu_unlock;
3729 		}
3730 
3731 		if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3732 			drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3733 			goto disconnect_rcu_unlock;
3734 		}
3735 
3736 		if (p_discard_my_data && nc->discard_my_data) {
3737 			drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3738 			goto disconnect_rcu_unlock;
3739 		}
3740 
3741 		if (p_two_primaries != nc->two_primaries) {
3742 			drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3743 			goto disconnect_rcu_unlock;
3744 		}
3745 
3746 		if (strcmp(integrity_alg, nc->integrity_alg)) {
3747 			drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3748 			goto disconnect_rcu_unlock;
3749 		}
3750 
3751 		rcu_read_unlock();
3752 	}
3753 
3754 	if (integrity_alg[0]) {
3755 		int hash_size;
3756 
3757 		/*
3758 		 * We can only change the peer data integrity algorithm
3759 		 * here.  Changing our own data integrity algorithm
3760 		 * requires that we send a P_PROTOCOL_UPDATE packet at
3761 		 * the same time; otherwise, the peer has no way to
3762 		 * tell between which packets the algorithm should
3763 		 * change.
3764 		 */
3765 
3766 		peer_integrity_tfm = crypto_alloc_shash(integrity_alg, 0, 0);
3767 		if (IS_ERR(peer_integrity_tfm)) {
3768 			peer_integrity_tfm = NULL;
3769 			drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3770 				 integrity_alg);
3771 			goto disconnect;
3772 		}
3773 
3774 		hash_size = crypto_shash_digestsize(peer_integrity_tfm);
3775 		int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3776 		int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3777 		if (!(int_dig_in && int_dig_vv)) {
3778 			drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3779 			goto disconnect;
3780 		}
3781 	}
3782 
3783 	new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3784 	if (!new_net_conf) {
3785 		drbd_err(connection, "Allocation of new net_conf failed\n");
3786 		goto disconnect;
3787 	}
3788 
3789 	mutex_lock(&connection->data.mutex);
3790 	mutex_lock(&connection->resource->conf_update);
3791 	old_net_conf = connection->net_conf;
3792 	*new_net_conf = *old_net_conf;
3793 
3794 	new_net_conf->wire_protocol = p_proto;
3795 	new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3796 	new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3797 	new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3798 	new_net_conf->two_primaries = p_two_primaries;
3799 
3800 	rcu_assign_pointer(connection->net_conf, new_net_conf);
3801 	mutex_unlock(&connection->resource->conf_update);
3802 	mutex_unlock(&connection->data.mutex);
3803 
3804 	crypto_free_shash(connection->peer_integrity_tfm);
3805 	kfree(connection->int_dig_in);
3806 	kfree(connection->int_dig_vv);
3807 	connection->peer_integrity_tfm = peer_integrity_tfm;
3808 	connection->int_dig_in = int_dig_in;
3809 	connection->int_dig_vv = int_dig_vv;
3810 
3811 	if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3812 		drbd_info(connection, "peer data-integrity-alg: %s\n",
3813 			  integrity_alg[0] ? integrity_alg : "(none)");
3814 
3815 	synchronize_rcu();
3816 	kfree(old_net_conf);
3817 	return 0;
3818 
3819 disconnect_rcu_unlock:
3820 	rcu_read_unlock();
3821 disconnect:
3822 	crypto_free_shash(peer_integrity_tfm);
3823 	kfree(int_dig_in);
3824 	kfree(int_dig_vv);
3825 	conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3826 	return -EIO;
3827 }
3828 
3829 /* helper function
3830  * input: alg name, feature name
3831  * return: NULL (alg name was "")
3832  *         ERR_PTR(error) if something goes wrong
3833  *         or the crypto hash ptr, if it worked out ok. */
3834 static struct crypto_shash *drbd_crypto_alloc_digest_safe(
3835 		const struct drbd_device *device,
3836 		const char *alg, const char *name)
3837 {
3838 	struct crypto_shash *tfm;
3839 
3840 	if (!alg[0])
3841 		return NULL;
3842 
3843 	tfm = crypto_alloc_shash(alg, 0, 0);
3844 	if (IS_ERR(tfm)) {
3845 		drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3846 			alg, name, PTR_ERR(tfm));
3847 		return tfm;
3848 	}
3849 	return tfm;
3850 }
3851 
3852 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3853 {
3854 	void *buffer = connection->data.rbuf;
3855 	int size = pi->size;
3856 
3857 	while (size) {
3858 		int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3859 		s = drbd_recv(connection, buffer, s);
3860 		if (s <= 0) {
3861 			if (s < 0)
3862 				return s;
3863 			break;
3864 		}
3865 		size -= s;
3866 	}
3867 	if (size)
3868 		return -EIO;
3869 	return 0;
3870 }
3871 
3872 /*
3873  * config_unknown_volume  -  device configuration command for unknown volume
3874  *
3875  * When a device is added to an existing connection, the node on which the
3876  * device is added first will send configuration commands to its peer but the
3877  * peer will not know about the device yet.  It will warn and ignore these
3878  * commands.  Once the device is added on the second node, the second node will
3879  * send the same device configuration commands, but in the other direction.
3880  *
3881  * (We can also end up here if drbd is misconfigured.)
3882  */
3883 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3884 {
3885 	drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3886 		  cmdname(pi->cmd), pi->vnr);
3887 	return ignore_remaining_packet(connection, pi);
3888 }
3889 
3890 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3891 {
3892 	struct drbd_peer_device *peer_device;
3893 	struct drbd_device *device;
3894 	struct p_rs_param_95 *p;
3895 	unsigned int header_size, data_size, exp_max_sz;
3896 	struct crypto_shash *verify_tfm = NULL;
3897 	struct crypto_shash *csums_tfm = NULL;
3898 	struct net_conf *old_net_conf, *new_net_conf = NULL;
3899 	struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3900 	const int apv = connection->agreed_pro_version;
3901 	struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3902 	int fifo_size = 0;
3903 	int err;
3904 
3905 	peer_device = conn_peer_device(connection, pi->vnr);
3906 	if (!peer_device)
3907 		return config_unknown_volume(connection, pi);
3908 	device = peer_device->device;
3909 
3910 	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3911 		    : apv == 88 ? sizeof(struct p_rs_param)
3912 					+ SHARED_SECRET_MAX
3913 		    : apv <= 94 ? sizeof(struct p_rs_param_89)
3914 		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3915 
3916 	if (pi->size > exp_max_sz) {
3917 		drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3918 		    pi->size, exp_max_sz);
3919 		return -EIO;
3920 	}
3921 
3922 	if (apv <= 88) {
3923 		header_size = sizeof(struct p_rs_param);
3924 		data_size = pi->size - header_size;
3925 	} else if (apv <= 94) {
3926 		header_size = sizeof(struct p_rs_param_89);
3927 		data_size = pi->size - header_size;
3928 		D_ASSERT(device, data_size == 0);
3929 	} else {
3930 		header_size = sizeof(struct p_rs_param_95);
3931 		data_size = pi->size - header_size;
3932 		D_ASSERT(device, data_size == 0);
3933 	}
3934 
3935 	/* initialize verify_alg and csums_alg */
3936 	p = pi->data;
3937 	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3938 
3939 	err = drbd_recv_all(peer_device->connection, p, header_size);
3940 	if (err)
3941 		return err;
3942 
3943 	mutex_lock(&connection->resource->conf_update);
3944 	old_net_conf = peer_device->connection->net_conf;
3945 	if (get_ldev(device)) {
3946 		new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3947 		if (!new_disk_conf) {
3948 			put_ldev(device);
3949 			mutex_unlock(&connection->resource->conf_update);
3950 			drbd_err(device, "Allocation of new disk_conf failed\n");
3951 			return -ENOMEM;
3952 		}
3953 
3954 		old_disk_conf = device->ldev->disk_conf;
3955 		*new_disk_conf = *old_disk_conf;
3956 
3957 		new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3958 	}
3959 
3960 	if (apv >= 88) {
3961 		if (apv == 88) {
3962 			if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3963 				drbd_err(device, "verify-alg of wrong size, "
3964 					"peer wants %u, accepting only up to %u byte\n",
3965 					data_size, SHARED_SECRET_MAX);
3966 				err = -EIO;
3967 				goto reconnect;
3968 			}
3969 
3970 			err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3971 			if (err)
3972 				goto reconnect;
3973 			/* we expect NUL terminated string */
3974 			/* but just in case someone tries to be evil */
3975 			D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3976 			p->verify_alg[data_size-1] = 0;
3977 
3978 		} else /* apv >= 89 */ {
3979 			/* we still expect NUL terminated strings */
3980 			/* but just in case someone tries to be evil */
3981 			D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3982 			D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3983 			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3984 			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3985 		}
3986 
3987 		if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3988 			if (device->state.conn == C_WF_REPORT_PARAMS) {
3989 				drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3990 				    old_net_conf->verify_alg, p->verify_alg);
3991 				goto disconnect;
3992 			}
3993 			verify_tfm = drbd_crypto_alloc_digest_safe(device,
3994 					p->verify_alg, "verify-alg");
3995 			if (IS_ERR(verify_tfm)) {
3996 				verify_tfm = NULL;
3997 				goto disconnect;
3998 			}
3999 		}
4000 
4001 		if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
4002 			if (device->state.conn == C_WF_REPORT_PARAMS) {
4003 				drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
4004 				    old_net_conf->csums_alg, p->csums_alg);
4005 				goto disconnect;
4006 			}
4007 			csums_tfm = drbd_crypto_alloc_digest_safe(device,
4008 					p->csums_alg, "csums-alg");
4009 			if (IS_ERR(csums_tfm)) {
4010 				csums_tfm = NULL;
4011 				goto disconnect;
4012 			}
4013 		}
4014 
4015 		if (apv > 94 && new_disk_conf) {
4016 			new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
4017 			new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
4018 			new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
4019 			new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
4020 
4021 			fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
4022 			if (fifo_size != device->rs_plan_s->size) {
4023 				new_plan = fifo_alloc(fifo_size);
4024 				if (!new_plan) {
4025 					drbd_err(device, "kmalloc of fifo_buffer failed");
4026 					put_ldev(device);
4027 					goto disconnect;
4028 				}
4029 			}
4030 		}
4031 
4032 		if (verify_tfm || csums_tfm) {
4033 			new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
4034 			if (!new_net_conf) {
4035 				drbd_err(device, "Allocation of new net_conf failed\n");
4036 				goto disconnect;
4037 			}
4038 
4039 			*new_net_conf = *old_net_conf;
4040 
4041 			if (verify_tfm) {
4042 				strcpy(new_net_conf->verify_alg, p->verify_alg);
4043 				new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
4044 				crypto_free_shash(peer_device->connection->verify_tfm);
4045 				peer_device->connection->verify_tfm = verify_tfm;
4046 				drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
4047 			}
4048 			if (csums_tfm) {
4049 				strcpy(new_net_conf->csums_alg, p->csums_alg);
4050 				new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
4051 				crypto_free_shash(peer_device->connection->csums_tfm);
4052 				peer_device->connection->csums_tfm = csums_tfm;
4053 				drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
4054 			}
4055 			rcu_assign_pointer(connection->net_conf, new_net_conf);
4056 		}
4057 	}
4058 
4059 	if (new_disk_conf) {
4060 		rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
4061 		put_ldev(device);
4062 	}
4063 
4064 	if (new_plan) {
4065 		old_plan = device->rs_plan_s;
4066 		rcu_assign_pointer(device->rs_plan_s, new_plan);
4067 	}
4068 
4069 	mutex_unlock(&connection->resource->conf_update);
4070 	synchronize_rcu();
4071 	if (new_net_conf)
4072 		kfree(old_net_conf);
4073 	kfree(old_disk_conf);
4074 	kfree(old_plan);
4075 
4076 	return 0;
4077 
4078 reconnect:
4079 	if (new_disk_conf) {
4080 		put_ldev(device);
4081 		kfree(new_disk_conf);
4082 	}
4083 	mutex_unlock(&connection->resource->conf_update);
4084 	return -EIO;
4085 
4086 disconnect:
4087 	kfree(new_plan);
4088 	if (new_disk_conf) {
4089 		put_ldev(device);
4090 		kfree(new_disk_conf);
4091 	}
4092 	mutex_unlock(&connection->resource->conf_update);
4093 	/* just for completeness: actually not needed,
4094 	 * as this is not reached if csums_tfm was ok. */
4095 	crypto_free_shash(csums_tfm);
4096 	/* but free the verify_tfm again, if csums_tfm did not work out */
4097 	crypto_free_shash(verify_tfm);
4098 	conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4099 	return -EIO;
4100 }
4101 
4102 /* warn if the arguments differ by more than 12.5% */
4103 static void warn_if_differ_considerably(struct drbd_device *device,
4104 	const char *s, sector_t a, sector_t b)
4105 {
4106 	sector_t d;
4107 	if (a == 0 || b == 0)
4108 		return;
4109 	d = (a > b) ? (a - b) : (b - a);
4110 	if (d > (a>>3) || d > (b>>3))
4111 		drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
4112 		     (unsigned long long)a, (unsigned long long)b);
4113 }
4114 
4115 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
4116 {
4117 	struct drbd_peer_device *peer_device;
4118 	struct drbd_device *device;
4119 	struct p_sizes *p = pi->data;
4120 	struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ? p->qlim : NULL;
4121 	enum determine_dev_size dd = DS_UNCHANGED;
4122 	sector_t p_size, p_usize, p_csize, my_usize;
4123 	sector_t new_size, cur_size;
4124 	int ldsc = 0; /* local disk size changed */
4125 	enum dds_flags ddsf;
4126 
4127 	peer_device = conn_peer_device(connection, pi->vnr);
4128 	if (!peer_device)
4129 		return config_unknown_volume(connection, pi);
4130 	device = peer_device->device;
4131 	cur_size = drbd_get_capacity(device->this_bdev);
4132 
4133 	p_size = be64_to_cpu(p->d_size);
4134 	p_usize = be64_to_cpu(p->u_size);
4135 	p_csize = be64_to_cpu(p->c_size);
4136 
4137 	/* just store the peer's disk size for now.
4138 	 * we still need to figure out whether we accept that. */
4139 	device->p_size = p_size;
4140 
4141 	if (get_ldev(device)) {
4142 		rcu_read_lock();
4143 		my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
4144 		rcu_read_unlock();
4145 
4146 		warn_if_differ_considerably(device, "lower level device sizes",
4147 			   p_size, drbd_get_max_capacity(device->ldev));
4148 		warn_if_differ_considerably(device, "user requested size",
4149 					    p_usize, my_usize);
4150 
4151 		/* if this is the first connect, or an otherwise expected
4152 		 * param exchange, choose the minimum */
4153 		if (device->state.conn == C_WF_REPORT_PARAMS)
4154 			p_usize = min_not_zero(my_usize, p_usize);
4155 
4156 		/* Never shrink a device with usable data during connect,
4157 		 * or "attach" on the peer.
4158 		 * But allow online shrinking if we are connected. */
4159 		new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0);
4160 		if (new_size < cur_size &&
4161 		    device->state.disk >= D_OUTDATED &&
4162 		    (device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS)) {
4163 			drbd_err(device, "The peer's disk size is too small! (%llu < %llu sectors)\n",
4164 					(unsigned long long)new_size, (unsigned long long)cur_size);
4165 			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4166 			put_ldev(device);
4167 			return -EIO;
4168 		}
4169 
4170 		if (my_usize != p_usize) {
4171 			struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
4172 
4173 			new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
4174 			if (!new_disk_conf) {
4175 				drbd_err(device, "Allocation of new disk_conf failed\n");
4176 				put_ldev(device);
4177 				return -ENOMEM;
4178 			}
4179 
4180 			mutex_lock(&connection->resource->conf_update);
4181 			old_disk_conf = device->ldev->disk_conf;
4182 			*new_disk_conf = *old_disk_conf;
4183 			new_disk_conf->disk_size = p_usize;
4184 
4185 			rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
4186 			mutex_unlock(&connection->resource->conf_update);
4187 			synchronize_rcu();
4188 			kfree(old_disk_conf);
4189 
4190 			drbd_info(device, "Peer sets u_size to %lu sectors (old: %lu)\n",
4191 				 (unsigned long)p_usize, (unsigned long)my_usize);
4192 		}
4193 
4194 		put_ldev(device);
4195 	}
4196 
4197 	device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
4198 	/* Leave drbd_reconsider_queue_parameters() before drbd_determine_dev_size().
4199 	   In case we cleared the QUEUE_FLAG_DISCARD from our queue in
4200 	   drbd_reconsider_queue_parameters(), we can be sure that after
4201 	   drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
4202 
4203 	ddsf = be16_to_cpu(p->dds_flags);
4204 	if (get_ldev(device)) {
4205 		drbd_reconsider_queue_parameters(device, device->ldev, o);
4206 		dd = drbd_determine_dev_size(device, ddsf, NULL);
4207 		put_ldev(device);
4208 		if (dd == DS_ERROR)
4209 			return -EIO;
4210 		drbd_md_sync(device);
4211 	} else {
4212 		/*
4213 		 * I am diskless, need to accept the peer's *current* size.
4214 		 * I must NOT accept the peers backing disk size,
4215 		 * it may have been larger than mine all along...
4216 		 *
4217 		 * At this point, the peer knows more about my disk, or at
4218 		 * least about what we last agreed upon, than myself.
4219 		 * So if his c_size is less than his d_size, the most likely
4220 		 * reason is that *my* d_size was smaller last time we checked.
4221 		 *
4222 		 * However, if he sends a zero current size,
4223 		 * take his (user-capped or) backing disk size anyways.
4224 		 *
4225 		 * Unless of course he does not have a disk himself.
4226 		 * In which case we ignore this completely.
4227 		 */
4228 		sector_t new_size = p_csize ?: p_usize ?: p_size;
4229 		drbd_reconsider_queue_parameters(device, NULL, o);
4230 		if (new_size == 0) {
4231 			/* Ignore, peer does not know nothing. */
4232 		} else if (new_size == cur_size) {
4233 			/* nothing to do */
4234 		} else if (cur_size != 0 && p_size == 0) {
4235 			drbd_warn(device, "Ignored diskless peer device size (peer:%llu != me:%llu sectors)!\n",
4236 					(unsigned long long)new_size, (unsigned long long)cur_size);
4237 		} else if (new_size < cur_size && device->state.role == R_PRIMARY) {
4238 			drbd_err(device, "The peer's device size is too small! (%llu < %llu sectors); demote me first!\n",
4239 					(unsigned long long)new_size, (unsigned long long)cur_size);
4240 			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4241 			return -EIO;
4242 		} else {
4243 			/* I believe the peer, if
4244 			 *  - I don't have a current size myself
4245 			 *  - we agree on the size anyways
4246 			 *  - I do have a current size, am Secondary,
4247 			 *    and he has the only disk
4248 			 *  - I do have a current size, am Primary,
4249 			 *    and he has the only disk,
4250 			 *    which is larger than my current size
4251 			 */
4252 			drbd_set_my_capacity(device, new_size);
4253 		}
4254 	}
4255 
4256 	if (get_ldev(device)) {
4257 		if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
4258 			device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
4259 			ldsc = 1;
4260 		}
4261 
4262 		put_ldev(device);
4263 	}
4264 
4265 	if (device->state.conn > C_WF_REPORT_PARAMS) {
4266 		if (be64_to_cpu(p->c_size) !=
4267 		    drbd_get_capacity(device->this_bdev) || ldsc) {
4268 			/* we have different sizes, probably peer
4269 			 * needs to know my new size... */
4270 			drbd_send_sizes(peer_device, 0, ddsf);
4271 		}
4272 		if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
4273 		    (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
4274 			if (device->state.pdsk >= D_INCONSISTENT &&
4275 			    device->state.disk >= D_INCONSISTENT) {
4276 				if (ddsf & DDSF_NO_RESYNC)
4277 					drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
4278 				else
4279 					resync_after_online_grow(device);
4280 			} else
4281 				set_bit(RESYNC_AFTER_NEG, &device->flags);
4282 		}
4283 	}
4284 
4285 	return 0;
4286 }
4287 
4288 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
4289 {
4290 	struct drbd_peer_device *peer_device;
4291 	struct drbd_device *device;
4292 	struct p_uuids *p = pi->data;
4293 	u64 *p_uuid;
4294 	int i, updated_uuids = 0;
4295 
4296 	peer_device = conn_peer_device(connection, pi->vnr);
4297 	if (!peer_device)
4298 		return config_unknown_volume(connection, pi);
4299 	device = peer_device->device;
4300 
4301 	p_uuid = kmalloc_array(UI_EXTENDED_SIZE, sizeof(*p_uuid), GFP_NOIO);
4302 	if (!p_uuid) {
4303 		drbd_err(device, "kmalloc of p_uuid failed\n");
4304 		return false;
4305 	}
4306 
4307 	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
4308 		p_uuid[i] = be64_to_cpu(p->uuid[i]);
4309 
4310 	kfree(device->p_uuid);
4311 	device->p_uuid = p_uuid;
4312 
4313 	if ((device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS) &&
4314 	    device->state.disk < D_INCONSISTENT &&
4315 	    device->state.role == R_PRIMARY &&
4316 	    (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
4317 		drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
4318 		    (unsigned long long)device->ed_uuid);
4319 		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4320 		return -EIO;
4321 	}
4322 
4323 	if (get_ldev(device)) {
4324 		int skip_initial_sync =
4325 			device->state.conn == C_CONNECTED &&
4326 			peer_device->connection->agreed_pro_version >= 90 &&
4327 			device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
4328 			(p_uuid[UI_FLAGS] & 8);
4329 		if (skip_initial_sync) {
4330 			drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
4331 			drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
4332 					"clear_n_write from receive_uuids",
4333 					BM_LOCKED_TEST_ALLOWED);
4334 			_drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
4335 			_drbd_uuid_set(device, UI_BITMAP, 0);
4336 			_drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
4337 					CS_VERBOSE, NULL);
4338 			drbd_md_sync(device);
4339 			updated_uuids = 1;
4340 		}
4341 		put_ldev(device);
4342 	} else if (device->state.disk < D_INCONSISTENT &&
4343 		   device->state.role == R_PRIMARY) {
4344 		/* I am a diskless primary, the peer just created a new current UUID
4345 		   for me. */
4346 		updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4347 	}
4348 
4349 	/* Before we test for the disk state, we should wait until an eventually
4350 	   ongoing cluster wide state change is finished. That is important if
4351 	   we are primary and are detaching from our disk. We need to see the
4352 	   new disk state... */
4353 	mutex_lock(device->state_mutex);
4354 	mutex_unlock(device->state_mutex);
4355 	if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
4356 		updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4357 
4358 	if (updated_uuids)
4359 		drbd_print_uuids(device, "receiver updated UUIDs to");
4360 
4361 	return 0;
4362 }
4363 
4364 /**
4365  * convert_state() - Converts the peer's view of the cluster state to our point of view
4366  * @ps:		The state as seen by the peer.
4367  */
4368 static union drbd_state convert_state(union drbd_state ps)
4369 {
4370 	union drbd_state ms;
4371 
4372 	static enum drbd_conns c_tab[] = {
4373 		[C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
4374 		[C_CONNECTED] = C_CONNECTED,
4375 
4376 		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
4377 		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
4378 		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
4379 		[C_VERIFY_S]       = C_VERIFY_T,
4380 		[C_MASK]   = C_MASK,
4381 	};
4382 
4383 	ms.i = ps.i;
4384 
4385 	ms.conn = c_tab[ps.conn];
4386 	ms.peer = ps.role;
4387 	ms.role = ps.peer;
4388 	ms.pdsk = ps.disk;
4389 	ms.disk = ps.pdsk;
4390 	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
4391 
4392 	return ms;
4393 }
4394 
4395 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4396 {
4397 	struct drbd_peer_device *peer_device;
4398 	struct drbd_device *device;
4399 	struct p_req_state *p = pi->data;
4400 	union drbd_state mask, val;
4401 	enum drbd_state_rv rv;
4402 
4403 	peer_device = conn_peer_device(connection, pi->vnr);
4404 	if (!peer_device)
4405 		return -EIO;
4406 	device = peer_device->device;
4407 
4408 	mask.i = be32_to_cpu(p->mask);
4409 	val.i = be32_to_cpu(p->val);
4410 
4411 	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4412 	    mutex_is_locked(device->state_mutex)) {
4413 		drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4414 		return 0;
4415 	}
4416 
4417 	mask = convert_state(mask);
4418 	val = convert_state(val);
4419 
4420 	rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4421 	drbd_send_sr_reply(peer_device, rv);
4422 
4423 	drbd_md_sync(device);
4424 
4425 	return 0;
4426 }
4427 
4428 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4429 {
4430 	struct p_req_state *p = pi->data;
4431 	union drbd_state mask, val;
4432 	enum drbd_state_rv rv;
4433 
4434 	mask.i = be32_to_cpu(p->mask);
4435 	val.i = be32_to_cpu(p->val);
4436 
4437 	if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4438 	    mutex_is_locked(&connection->cstate_mutex)) {
4439 		conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4440 		return 0;
4441 	}
4442 
4443 	mask = convert_state(mask);
4444 	val = convert_state(val);
4445 
4446 	rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4447 	conn_send_sr_reply(connection, rv);
4448 
4449 	return 0;
4450 }
4451 
/*
 * receive_state() - handle the peer's state packet (struct p_state)
 * @connection: connection the packet arrived on
 * @pi:         packet info; pi->data points to the received struct p_state
 *
 * Derives a new local state (ns) from the current one (os) and the state
 * reported by the peer, optionally running drbd_sync_handshake() to decide
 * on a resync, and applies it with _drbd_set_state().  The current state is
 * sampled under resource->req_lock; if it changed by the time the new state
 * is about to be applied, the whole evaluation is redone ("retry").
 *
 * Returns 0 on success (including the "resync/verify finished" shortcuts),
 * the result of config_unknown_volume() for an unknown volume, -ECONNRESET
 * if the connection is already at or below C_TEAR_DOWN, and -EIO on the
 * failure paths, most of which also request a connection state change.
 */
static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_state *p = pi->data;
	union drbd_state os, ns, peer_state;
	enum drbd_disk_state real_peer_disk;
	enum chg_state_flags cs_flags;
	int rv;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return config_unknown_volume(connection, pi);
	device = peer_device->device;

	peer_state.i = be32_to_cpu(p->state);

	real_peer_disk = peer_state.disk;
	if (peer_state.disk == D_NEGOTIATING) {
		/*
		 * NOTE(review): device->p_uuid is dereferenced here without
		 * the NULL check that is applied further down before the
		 * sync handshake -- presumably the UUIDs packet is always
		 * received before the state packet; confirm.
		 */
		real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
		drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
	}

	spin_lock_irq(&device->resource->req_lock);
 retry:
	/* Entered with req_lock held, both initially and via "goto retry". */
	os = ns = drbd_read_state(device);
	spin_unlock_irq(&device->resource->req_lock);

	/* If some other part of the code (ack_receiver thread, timeout)
	 * already decided to close the connection again,
	 * we must not "re-establish" it here. */
	if (os.conn <= C_TEAR_DOWN)
		return -ECONNRESET;

	/* If this is the "end of sync" confirmation, usually the peer disk
	 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
	 * set) resync started in PausedSyncT, or if the timing of pause-/
	 * unpause-sync events has been "just right", the peer disk may
	 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
	 */
	if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
	    real_peer_disk == D_UP_TO_DATE &&
	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
		/* If we are (becoming) SyncSource, but peer is still in sync
		 * preparation, ignore its uptodate-ness to avoid flapping, it
		 * will change to inconsistent once the peer reaches active
		 * syncing states.
		 * It may have changed syncer-paused flags, however, so we
		 * cannot ignore this completely. */
		if (peer_state.conn > C_CONNECTED &&
		    peer_state.conn < C_SYNC_SOURCE)
			real_peer_disk = D_INCONSISTENT;

		/* if peer_state changes to connected at the same time,
		 * it explicitly notifies us that it finished resync.
		 * Maybe we should finish it up, too? */
		else if (os.conn >= C_SYNC_SOURCE &&
			 peer_state.conn == C_CONNECTED) {
			if (drbd_bm_total_weight(device) <= device->rs_failed)
				drbd_resync_finished(device);
			return 0;
		}
	}

	/* explicit verify finished notification, stop sector reached. */
	if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
	    peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
		ov_out_of_sync_print(device);
		drbd_resync_finished(device);
		return 0;
	}

	/* peer says his disk is inconsistent, while we think it is uptodate,
	 * and this happens while the peer still thinks we have a sync going on,
	 * but we think we are already done with the sync.
	 * We ignore this to avoid flapping pdsk.
	 * This should not happen, if the peer is a recent version of drbd. */
	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
		real_peer_disk = D_UP_TO_DATE;

	if (ns.conn == C_WF_REPORT_PARAMS)
		ns.conn = C_CONNECTED;

	if (peer_state.conn == C_AHEAD)
		ns.conn = C_BEHIND;

	/* TODO:
	 * if (primary and diskless and peer uuid != effective uuid)
	 *     abort attach on peer;
	 *
	 * If this node does not have good data, was already connected, but
	 * the peer did a late attach only now, trying to "negotiate" with me,
	 * AND I am currently Primary, possibly frozen, with some specific
	 * "effective" uuid, this should never be reached, really, because
	 * we first send the uuids, then the current state.
	 *
	 * In this scenario, we already dropped the connection hard
	 * when we received the unsuitable uuids (receive_uuids()).
	 *
	 * Should we want to change this, that is: not drop the connection in
	 * receive_uuids() already, then we would need to add a branch here
	 * that aborts the attach of "unsuitable uuids" on the peer in case
	 * this node is currently Diskless Primary.
	 */

	if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
	    get_ldev_if_state(device, D_NEGOTIATING)) {
		int cr; /* consider resync */

		/* if we established a new connection */
		cr  = (os.conn < C_CONNECTED);
		/* if we had an established connection
		 * and one of the nodes newly attaches a disk */
		cr |= (os.conn == C_CONNECTED &&
		       (peer_state.disk == D_NEGOTIATING ||
			os.disk == D_NEGOTIATING));
		/* if we have both been inconsistent, and the peer has been
		 * forced to be UpToDate with --force */
		cr |= test_bit(CONSIDER_RESYNC, &device->flags);
		/* if we had been plain connected, and the admin requested to
		 * start a sync by "invalidate" or "invalidate-remote" */
		cr |= (os.conn == C_CONNECTED &&
				(peer_state.conn >= C_STARTING_SYNC_S &&
				 peer_state.conn <= C_WF_BITMAP_T));

		if (cr)
			ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);

		/* The local disk reference is dropped before any of the
		 * returns below. */
		put_ldev(device);
		/* ns.conn == C_MASK is treated as "no usable result" below. */
		if (ns.conn == C_MASK) {
			ns.conn = C_CONNECTED;
			if (device->state.disk == D_NEGOTIATING) {
				drbd_force_state(device, NS(disk, D_FAILED));
			} else if (peer_state.disk == D_NEGOTIATING) {
				drbd_err(device, "Disk attach process on the peer node was aborted.\n");
				peer_state.disk = D_DISKLESS;
				real_peer_disk = D_DISKLESS;
			} else {
				if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
					return -EIO;
				D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
				conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
				return -EIO;
			}
		}
	}

	spin_lock_irq(&device->resource->req_lock);
	/* The state changed while req_lock was dropped: evaluate again. */
	if (os.i != drbd_read_state(device).i)
		goto retry;
	clear_bit(CONSIDER_RESYNC, &device->flags);
	ns.peer = peer_state.role;
	ns.pdsk = real_peer_disk;
	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
		ns.disk = device->new_state_tmp.disk;
	/* CS_HARD is added unless this change moves os.conn from below
	 * C_CONNECTED to C_CONNECTED or above. */
	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
	if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
	    test_bit(NEW_CUR_UUID, &device->flags)) {
		/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
		   for temporal network outages! */
		spin_unlock_irq(&device->resource->req_lock);
		drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
		tl_clear(peer_device->connection);
		drbd_uuid_new_current(device);
		clear_bit(NEW_CUR_UUID, &device->flags);
		conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
		return -EIO;
	}
	rv = _drbd_set_state(device, ns, cs_flags, NULL);
	/* Re-read: from here on ns is the state that is actually in effect. */
	ns = drbd_read_state(device);
	spin_unlock_irq(&device->resource->req_lock);

	if (rv < SS_SUCCESS) {
		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
		return -EIO;
	}

	if (os.conn > C_WF_REPORT_PARAMS) {
		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
		    peer_state.disk != D_NEGOTIATING ) {
			/* we want resync, peer has not yet decided to sync... */
			/* Nowadays only used when forcing a node into primary role and
			   setting its disk to UpToDate with that */
			drbd_send_uuids(peer_device);
			drbd_send_current_state(peer_device);
		}
	}

	clear_bit(DISCARD_MY_DATA, &device->flags);

	drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */

	return 0;
}
4648 
4649 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4650 {
4651 	struct drbd_peer_device *peer_device;
4652 	struct drbd_device *device;
4653 	struct p_rs_uuid *p = pi->data;
4654 
4655 	peer_device = conn_peer_device(connection, pi->vnr);
4656 	if (!peer_device)
4657 		return -EIO;
4658 	device = peer_device->device;
4659 
4660 	wait_event(device->misc_wait,
4661 		   device->state.conn == C_WF_SYNC_UUID ||
4662 		   device->state.conn == C_BEHIND ||
4663 		   device->state.conn < C_CONNECTED ||
4664 		   device->state.disk < D_NEGOTIATING);
4665 
4666 	/* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4667 
4668 	/* Here the _drbd_uuid_ functions are right, current should
4669 	   _not_ be rotated into the history */
4670 	if (get_ldev_if_state(device, D_NEGOTIATING)) {
4671 		_drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4672 		_drbd_uuid_set(device, UI_BITMAP, 0UL);
4673 
4674 		drbd_print_uuids(device, "updated sync uuid");
4675 		drbd_start_resync(device, C_SYNC_TARGET);
4676 
4677 		put_ldev(device);
4678 	} else
4679 		drbd_err(device, "Ignoring SyncUUID packet!\n");
4680 
4681 	return 0;
4682 }
4683 
4684 /**
4685  * receive_bitmap_plain
4686  *
4687  * Return 0 when done, 1 when another iteration is needed, and a negative error
4688  * code upon failure.
4689  */
4690 static int
4691 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4692 		     unsigned long *p, struct bm_xfer_ctx *c)
4693 {
4694 	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4695 				 drbd_header_size(peer_device->connection);
4696 	unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4697 				       c->bm_words - c->word_offset);
4698 	unsigned int want = num_words * sizeof(*p);
4699 	int err;
4700 
4701 	if (want != size) {
4702 		drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4703 		return -EIO;
4704 	}
4705 	if (want == 0)
4706 		return 0;
4707 	err = drbd_recv_all(peer_device->connection, p, want);
4708 	if (err)
4709 		return err;
4710 
4711 	drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4712 
4713 	c->word_offset += num_words;
4714 	c->bit_offset = c->word_offset * BITS_PER_LONG;
4715 	if (c->bit_offset > c->bm_bits)
4716 		c->bit_offset = c->bm_bits;
4717 
4718 	return 1;
4719 }
4720 
4721 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4722 {
4723 	return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4724 }
4725 
4726 static int dcbp_get_start(struct p_compressed_bm *p)
4727 {
4728 	return (p->encoding & 0x80) != 0;
4729 }
4730 
4731 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4732 {
4733 	return (p->encoding >> 4) & 0x7;
4734 }
4735 
4736 /**
4737  * recv_bm_rle_bits
4738  *
4739  * Return 0 when done, 1 when another iteration is needed, and a negative error
4740  * code upon failure.
4741  */
4742 static int
4743 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4744 		struct p_compressed_bm *p,
4745 		 struct bm_xfer_ctx *c,
4746 		 unsigned int len)
4747 {
4748 	struct bitstream bs;
4749 	u64 look_ahead;
4750 	u64 rl;
4751 	u64 tmp;
4752 	unsigned long s = c->bit_offset;
4753 	unsigned long e;
4754 	int toggle = dcbp_get_start(p);
4755 	int have;
4756 	int bits;
4757 
4758 	bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4759 
4760 	bits = bitstream_get_bits(&bs, &look_ahead, 64);
4761 	if (bits < 0)
4762 		return -EIO;
4763 
4764 	for (have = bits; have > 0; s += rl, toggle = !toggle) {
4765 		bits = vli_decode_bits(&rl, look_ahead);
4766 		if (bits <= 0)
4767 			return -EIO;
4768 
4769 		if (toggle) {
4770 			e = s + rl -1;
4771 			if (e >= c->bm_bits) {
4772 				drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4773 				return -EIO;
4774 			}
4775 			_drbd_bm_set_bits(peer_device->device, s, e);
4776 		}
4777 
4778 		if (have < bits) {
4779 			drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4780 				have, bits, look_ahead,
4781 				(unsigned int)(bs.cur.b - p->code),
4782 				(unsigned int)bs.buf_len);
4783 			return -EIO;
4784 		}
4785 		/* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4786 		if (likely(bits < 64))
4787 			look_ahead >>= bits;
4788 		else
4789 			look_ahead = 0;
4790 		have -= bits;
4791 
4792 		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4793 		if (bits < 0)
4794 			return -EIO;
4795 		look_ahead |= tmp << have;
4796 		have += bits;
4797 	}
4798 
4799 	c->bit_offset = s;
4800 	bm_xfer_ctx_bit_to_word_offset(c);
4801 
4802 	return (s != c->bm_bits);
4803 }
4804 
4805 /**
4806  * decode_bitmap_c
4807  *
4808  * Return 0 when done, 1 when another iteration is needed, and a negative error
4809  * code upon failure.
4810  */
4811 static int
4812 decode_bitmap_c(struct drbd_peer_device *peer_device,
4813 		struct p_compressed_bm *p,
4814 		struct bm_xfer_ctx *c,
4815 		unsigned int len)
4816 {
4817 	if (dcbp_get_code(p) == RLE_VLI_Bits)
4818 		return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4819 
4820 	/* other variants had been implemented for evaluation,
4821 	 * but have been dropped as this one turned out to be "best"
4822 	 * during all our tests. */
4823 
4824 	drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4825 	conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4826 	return -EIO;
4827 }
4828 
4829 void INFO_bm_xfer_stats(struct drbd_device *device,
4830 		const char *direction, struct bm_xfer_ctx *c)
4831 {
4832 	/* what would it take to transfer it "plaintext" */
4833 	unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4834 	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4835 	unsigned int plain =
4836 		header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4837 		c->bm_words * sizeof(unsigned long);
4838 	unsigned int total = c->bytes[0] + c->bytes[1];
4839 	unsigned int r;
4840 
4841 	/* total can not be zero. but just in case: */
4842 	if (total == 0)
4843 		return;
4844 
4845 	/* don't report if not compressed */
4846 	if (total >= plain)
4847 		return;
4848 
4849 	/* total < plain. check for overflow, still */
4850 	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4851 		                    : (1000 * total / plain);
4852 
4853 	if (r > 1000)
4854 		r = 1000;
4855 
4856 	r = 1000 - r;
4857 	drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4858 	     "total %u; compression: %u.%u%%\n",
4859 			direction,
4860 			c->bytes[1], c->packets[1],
4861 			c->bytes[0], c->packets[0],
4862 			total, r/10, r % 10);
4863 }
4864 
4865 /* Since we are processing the bitfield from lower addresses to higher,
4866    it does not matter if the process it in 32 bit chunks or 64 bit
4867    chunks as long as it is little endian. (Understand it as byte stream,
4868    beginning with the lowest byte...) If we would use big endian
4869    we would need to process it from the highest address to the lowest,
4870    in order to be agnostic to the 32 vs 64 bits issue.
4871 
4872    returns 0 on failure, 1 if we successfully received it. */
4873 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4874 {
4875 	struct drbd_peer_device *peer_device;
4876 	struct drbd_device *device;
4877 	struct bm_xfer_ctx c;
4878 	int err;
4879 
4880 	peer_device = conn_peer_device(connection, pi->vnr);
4881 	if (!peer_device)
4882 		return -EIO;
4883 	device = peer_device->device;
4884 
4885 	drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4886 	/* you are supposed to send additional out-of-sync information
4887 	 * if you actually set bits during this phase */
4888 
4889 	c = (struct bm_xfer_ctx) {
4890 		.bm_bits = drbd_bm_bits(device),
4891 		.bm_words = drbd_bm_words(device),
4892 	};
4893 
4894 	for(;;) {
4895 		if (pi->cmd == P_BITMAP)
4896 			err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4897 		else if (pi->cmd == P_COMPRESSED_BITMAP) {
4898 			/* MAYBE: sanity check that we speak proto >= 90,
4899 			 * and the feature is enabled! */
4900 			struct p_compressed_bm *p = pi->data;
4901 
4902 			if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4903 				drbd_err(device, "ReportCBitmap packet too large\n");
4904 				err = -EIO;
4905 				goto out;
4906 			}
4907 			if (pi->size <= sizeof(*p)) {
4908 				drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4909 				err = -EIO;
4910 				goto out;
4911 			}
4912 			err = drbd_recv_all(peer_device->connection, p, pi->size);
4913 			if (err)
4914 			       goto out;
4915 			err = decode_bitmap_c(peer_device, p, &c, pi->size);
4916 		} else {
4917 			drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4918 			err = -EIO;
4919 			goto out;
4920 		}
4921 
4922 		c.packets[pi->cmd == P_BITMAP]++;
4923 		c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4924 
4925 		if (err <= 0) {
4926 			if (err < 0)
4927 				goto out;
4928 			break;
4929 		}
4930 		err = drbd_recv_header(peer_device->connection, pi);
4931 		if (err)
4932 			goto out;
4933 	}
4934 
4935 	INFO_bm_xfer_stats(device, "receive", &c);
4936 
4937 	if (device->state.conn == C_WF_BITMAP_T) {
4938 		enum drbd_state_rv rv;
4939 
4940 		err = drbd_send_bitmap(device);
4941 		if (err)
4942 			goto out;
4943 		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4944 		rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4945 		D_ASSERT(device, rv == SS_SUCCESS);
4946 	} else if (device->state.conn != C_WF_BITMAP_S) {
4947 		/* admin may have requested C_DISCONNECTING,
4948 		 * other threads may have noticed network errors */
4949 		drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4950 		    drbd_conn_str(device->state.conn));
4951 	}
4952 	err = 0;
4953 
4954  out:
4955 	drbd_bm_unlock(device);
4956 	if (!err && device->state.conn == C_WF_BITMAP_S)
4957 		drbd_start_resync(device, C_SYNC_SOURCE);
4958 	return err;
4959 }
4960 
4961 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4962 {
4963 	drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4964 		 pi->cmd, pi->size);
4965 
4966 	return ignore_remaining_packet(connection, pi);
4967 }
4968 
4969 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4970 {
4971 	/* Make sure we've acked all the TCP data associated
4972 	 * with the data requests being unplugged */
4973 	drbd_tcp_quickack(connection->data.socket);
4974 
4975 	return 0;
4976 }
4977 
4978 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4979 {
4980 	struct drbd_peer_device *peer_device;
4981 	struct drbd_device *device;
4982 	struct p_block_desc *p = pi->data;
4983 
4984 	peer_device = conn_peer_device(connection, pi->vnr);
4985 	if (!peer_device)
4986 		return -EIO;
4987 	device = peer_device->device;
4988 
4989 	switch (device->state.conn) {
4990 	case C_WF_SYNC_UUID:
4991 	case C_WF_BITMAP_T:
4992 	case C_BEHIND:
4993 			break;
4994 	default:
4995 		drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4996 				drbd_conn_str(device->state.conn));
4997 	}
4998 
4999 	drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
5000 
5001 	return 0;
5002 }
5003 
5004 static int receive_rs_deallocated(struct drbd_connection *connection, struct packet_info *pi)
5005 {
5006 	struct drbd_peer_device *peer_device;
5007 	struct p_block_desc *p = pi->data;
5008 	struct drbd_device *device;
5009 	sector_t sector;
5010 	int size, err = 0;
5011 
5012 	peer_device = conn_peer_device(connection, pi->vnr);
5013 	if (!peer_device)
5014 		return -EIO;
5015 	device = peer_device->device;
5016 
5017 	sector = be64_to_cpu(p->sector);
5018 	size = be32_to_cpu(p->blksize);
5019 
5020 	dec_rs_pending(device);
5021 
5022 	if (get_ldev(device)) {
5023 		struct drbd_peer_request *peer_req;
5024 		const int op = REQ_OP_WRITE_ZEROES;
5025 
5026 		peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector,
5027 					       size, 0, GFP_NOIO);
5028 		if (!peer_req) {
5029 			put_ldev(device);
5030 			return -ENOMEM;
5031 		}
5032 
5033 		peer_req->w.cb = e_end_resync_block;
5034 		peer_req->submit_jif = jiffies;
5035 		peer_req->flags |= EE_TRIM;
5036 
5037 		spin_lock_irq(&device->resource->req_lock);
5038 		list_add_tail(&peer_req->w.list, &device->sync_ee);
5039 		spin_unlock_irq(&device->resource->req_lock);
5040 
5041 		atomic_add(pi->size >> 9, &device->rs_sect_ev);
5042 		err = drbd_submit_peer_request(device, peer_req, op, 0, DRBD_FAULT_RS_WR);
5043 
5044 		if (err) {
5045 			spin_lock_irq(&device->resource->req_lock);
5046 			list_del(&peer_req->w.list);
5047 			spin_unlock_irq(&device->resource->req_lock);
5048 
5049 			drbd_free_peer_req(device, peer_req);
5050 			put_ldev(device);
5051 			err = 0;
5052 			goto fail;
5053 		}
5054 
5055 		inc_unacked(device);
5056 
5057 		/* No put_ldev() here. Gets called in drbd_endio_write_sec_final(),
5058 		   as well as drbd_rs_complete_io() */
5059 	} else {
5060 	fail:
5061 		drbd_rs_complete_io(device, sector);
5062 		drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER);
5063 	}
5064 
5065 	atomic_add(size >> 9, &device->rs_sect_in);
5066 
5067 	return err;
5068 }
5069 
5070 struct data_cmd {
5071 	int expect_payload;
5072 	unsigned int pkt_size;
5073 	int (*fn)(struct drbd_connection *, struct packet_info *);
5074 };
5075 
5076 static struct data_cmd drbd_cmd_handler[] = {
5077 	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
5078 	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
5079 	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
5080 	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier } ,
5081 	[P_BITMAP]	    = { 1, 0, receive_bitmap } ,
5082 	[P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
5083 	[P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
5084 	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
5085 	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
5086 	[P_SYNC_PARAM]	    = { 1, 0, receive_SyncParam },
5087 	[P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
5088 	[P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
5089 	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
5090 	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
5091 	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
5092 	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
5093 	[P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
5094 	[P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
5095 	[P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
5096 	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
5097 	[P_RS_THIN_REQ]     = { 0, sizeof(struct p_block_req), receive_DataRequest },
5098 	[P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
5099 	[P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
5100 	[P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
5101 	[P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
5102 	[P_TRIM]	    = { 0, sizeof(struct p_trim), receive_Data },
5103 	[P_ZEROES]	    = { 0, sizeof(struct p_trim), receive_Data },
5104 	[P_RS_DEALLOCATED]  = { 0, sizeof(struct p_block_desc), receive_rs_deallocated },
5105 	[P_WSAME]	    = { 1, sizeof(struct p_wsame), receive_Data },
5106 };
5107 
5108 static void drbdd(struct drbd_connection *connection)
5109 {
5110 	struct packet_info pi;
5111 	size_t shs; /* sub header size */
5112 	int err;
5113 
5114 	while (get_t_state(&connection->receiver) == RUNNING) {
5115 		struct data_cmd const *cmd;
5116 
5117 		drbd_thread_current_set_cpu(&connection->receiver);
5118 		update_receiver_timing_details(connection, drbd_recv_header_maybe_unplug);
5119 		if (drbd_recv_header_maybe_unplug(connection, &pi))
5120 			goto err_out;
5121 
5122 		cmd = &drbd_cmd_handler[pi.cmd];
5123 		if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
5124 			drbd_err(connection, "Unexpected data packet %s (0x%04x)",
5125 				 cmdname(pi.cmd), pi.cmd);
5126 			goto err_out;
5127 		}
5128 
5129 		shs = cmd->pkt_size;
5130 		if (pi.cmd == P_SIZES && connection->agreed_features & DRBD_FF_WSAME)
5131 			shs += sizeof(struct o_qlim);
5132 		if (pi.size > shs && !cmd->expect_payload) {
5133 			drbd_err(connection, "No payload expected %s l:%d\n",
5134 				 cmdname(pi.cmd), pi.size);
5135 			goto err_out;
5136 		}
5137 		if (pi.size < shs) {
5138 			drbd_err(connection, "%s: unexpected packet size, expected:%d received:%d\n",
5139 				 cmdname(pi.cmd), (int)shs, pi.size);
5140 			goto err_out;
5141 		}
5142 
5143 		if (shs) {
5144 			update_receiver_timing_details(connection, drbd_recv_all_warn);
5145 			err = drbd_recv_all_warn(connection, pi.data, shs);
5146 			if (err)
5147 				goto err_out;
5148 			pi.size -= shs;
5149 		}
5150 
5151 		update_receiver_timing_details(connection, cmd->fn);
5152 		err = cmd->fn(connection, &pi);
5153 		if (err) {
5154 			drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
5155 				 cmdname(pi.cmd), err, pi.size);
5156 			goto err_out;
5157 		}
5158 	}
5159 	return;
5160 
5161     err_out:
5162 	conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
5163 }
5164 
5165 static void conn_disconnect(struct drbd_connection *connection)
5166 {
5167 	struct drbd_peer_device *peer_device;
5168 	enum drbd_conns oc;
5169 	int vnr;
5170 
5171 	if (connection->cstate == C_STANDALONE)
5172 		return;
5173 
5174 	/* We are about to start the cleanup after connection loss.
5175 	 * Make sure drbd_make_request knows about that.
5176 	 * Usually we should be in some network failure state already,
5177 	 * but just in case we are not, we fix it up here.
5178 	 */
5179 	conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5180 
5181 	/* ack_receiver does not clean up anything. it must not interfere, either */
5182 	drbd_thread_stop(&connection->ack_receiver);
5183 	if (connection->ack_sender) {
5184 		destroy_workqueue(connection->ack_sender);
5185 		connection->ack_sender = NULL;
5186 	}
5187 	drbd_free_sock(connection);
5188 
5189 	rcu_read_lock();
5190 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5191 		struct drbd_device *device = peer_device->device;
5192 		kref_get(&device->kref);
5193 		rcu_read_unlock();
5194 		drbd_disconnected(peer_device);
5195 		kref_put(&device->kref, drbd_destroy_device);
5196 		rcu_read_lock();
5197 	}
5198 	rcu_read_unlock();
5199 
5200 	if (!list_empty(&connection->current_epoch->list))
5201 		drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
5202 	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
5203 	atomic_set(&connection->current_epoch->epoch_size, 0);
5204 	connection->send.seen_any_write_yet = false;
5205 
5206 	drbd_info(connection, "Connection closed\n");
5207 
5208 	if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
5209 		conn_try_outdate_peer_async(connection);
5210 
5211 	spin_lock_irq(&connection->resource->req_lock);
5212 	oc = connection->cstate;
5213 	if (oc >= C_UNCONNECTED)
5214 		_conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
5215 
5216 	spin_unlock_irq(&connection->resource->req_lock);
5217 
5218 	if (oc == C_DISCONNECTING)
5219 		conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
5220 }
5221 
5222 static int drbd_disconnected(struct drbd_peer_device *peer_device)
5223 {
5224 	struct drbd_device *device = peer_device->device;
5225 	unsigned int i;
5226 
5227 	/* wait for current activity to cease. */
5228 	spin_lock_irq(&device->resource->req_lock);
5229 	_drbd_wait_ee_list_empty(device, &device->active_ee);
5230 	_drbd_wait_ee_list_empty(device, &device->sync_ee);
5231 	_drbd_wait_ee_list_empty(device, &device->read_ee);
5232 	spin_unlock_irq(&device->resource->req_lock);
5233 
5234 	/* We do not have data structures that would allow us to
5235 	 * get the rs_pending_cnt down to 0 again.
5236 	 *  * On C_SYNC_TARGET we do not have any data structures describing
5237 	 *    the pending RSDataRequest's we have sent.
5238 	 *  * On C_SYNC_SOURCE there is no data structure that tracks
5239 	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
5240 	 *  And no, it is not the sum of the reference counts in the
5241 	 *  resync_LRU. The resync_LRU tracks the whole operation including
5242 	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
5243 	 *  on the fly. */
5244 	drbd_rs_cancel_all(device);
5245 	device->rs_total = 0;
5246 	device->rs_failed = 0;
5247 	atomic_set(&device->rs_pending_cnt, 0);
5248 	wake_up(&device->misc_wait);
5249 
5250 	del_timer_sync(&device->resync_timer);
5251 	resync_timer_fn(&device->resync_timer);
5252 
5253 	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
5254 	 * w_make_resync_request etc. which may still be on the worker queue
5255 	 * to be "canceled" */
5256 	drbd_flush_workqueue(&peer_device->connection->sender_work);
5257 
5258 	drbd_finish_peer_reqs(device);
5259 
5260 	/* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
5261 	   might have issued a work again. The one before drbd_finish_peer_reqs() is
5262 	   necessary to reclain net_ee in drbd_finish_peer_reqs(). */
5263 	drbd_flush_workqueue(&peer_device->connection->sender_work);
5264 
5265 	/* need to do it again, drbd_finish_peer_reqs() may have populated it
5266 	 * again via drbd_try_clear_on_disk_bm(). */
5267 	drbd_rs_cancel_all(device);
5268 
5269 	kfree(device->p_uuid);
5270 	device->p_uuid = NULL;
5271 
5272 	if (!drbd_suspended(device))
5273 		tl_clear(peer_device->connection);
5274 
5275 	drbd_md_sync(device);
5276 
5277 	if (get_ldev(device)) {
5278 		drbd_bitmap_io(device, &drbd_bm_write_copy_pages,
5279 				"write from disconnected", BM_LOCKED_CHANGE_ALLOWED);
5280 		put_ldev(device);
5281 	}
5282 
5283 	/* tcp_close and release of sendpage pages can be deferred.  I don't
5284 	 * want to use SO_LINGER, because apparently it can be deferred for
5285 	 * more than 20 seconds (longest time I checked).
5286 	 *
5287 	 * Actually we don't care for exactly when the network stack does its
5288 	 * put_page(), but release our reference on these pages right here.
5289 	 */
5290 	i = drbd_free_peer_reqs(device, &device->net_ee);
5291 	if (i)
5292 		drbd_info(device, "net_ee not empty, killed %u entries\n", i);
5293 	i = atomic_read(&device->pp_in_use_by_net);
5294 	if (i)
5295 		drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
5296 	i = atomic_read(&device->pp_in_use);
5297 	if (i)
5298 		drbd_info(device, "pp_in_use = %d, expected 0\n", i);
5299 
5300 	D_ASSERT(device, list_empty(&device->read_ee));
5301 	D_ASSERT(device, list_empty(&device->active_ee));
5302 	D_ASSERT(device, list_empty(&device->sync_ee));
5303 	D_ASSERT(device, list_empty(&device->done_ee));
5304 
5305 	return 0;
5306 }
5307 
5308 /*
5309  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
5310  * we can agree on is stored in agreed_pro_version.
5311  *
5312  * feature flags and the reserved array should be enough room for future
5313  * enhancements of the handshake protocol, and possible plugins...
5314  *
5315  * for now, they are expected to be zero, but ignored.
5316  */
5317 static int drbd_send_features(struct drbd_connection *connection)
5318 {
5319 	struct drbd_socket *sock;
5320 	struct p_connection_features *p;
5321 
5322 	sock = &connection->data;
5323 	p = conn_prepare_command(connection, sock);
5324 	if (!p)
5325 		return -EIO;
5326 	memset(p, 0, sizeof(*p));
5327 	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
5328 	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
5329 	p->feature_flags = cpu_to_be32(PRO_FEATURES);
5330 	return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
5331 }
5332 
5333 /*
5334  * return values:
5335  *   1 yes, we have a valid connection
5336  *   0 oops, did not work out, please try again
5337  *  -1 peer talks different language,
5338  *     no point in trying again, please go standalone.
5339  */
5340 static int drbd_do_features(struct drbd_connection *connection)
5341 {
5342 	/* ASSERT current == connection->receiver ... */
5343 	struct p_connection_features *p;
5344 	const int expect = sizeof(struct p_connection_features);
5345 	struct packet_info pi;
5346 	int err;
5347 
5348 	err = drbd_send_features(connection);
5349 	if (err)
5350 		return 0;
5351 
5352 	err = drbd_recv_header(connection, &pi);
5353 	if (err)
5354 		return 0;
5355 
5356 	if (pi.cmd != P_CONNECTION_FEATURES) {
5357 		drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
5358 			 cmdname(pi.cmd), pi.cmd);
5359 		return -1;
5360 	}
5361 
5362 	if (pi.size != expect) {
5363 		drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
5364 		     expect, pi.size);
5365 		return -1;
5366 	}
5367 
5368 	p = pi.data;
5369 	err = drbd_recv_all_warn(connection, p, expect);
5370 	if (err)
5371 		return 0;
5372 
5373 	p->protocol_min = be32_to_cpu(p->protocol_min);
5374 	p->protocol_max = be32_to_cpu(p->protocol_max);
5375 	if (p->protocol_max == 0)
5376 		p->protocol_max = p->protocol_min;
5377 
5378 	if (PRO_VERSION_MAX < p->protocol_min ||
5379 	    PRO_VERSION_MIN > p->protocol_max)
5380 		goto incompat;
5381 
5382 	connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
5383 	connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
5384 
5385 	drbd_info(connection, "Handshake successful: "
5386 	     "Agreed network protocol version %d\n", connection->agreed_pro_version);
5387 
5388 	drbd_info(connection, "Feature flags enabled on protocol level: 0x%x%s%s%s%s.\n",
5389 		  connection->agreed_features,
5390 		  connection->agreed_features & DRBD_FF_TRIM ? " TRIM" : "",
5391 		  connection->agreed_features & DRBD_FF_THIN_RESYNC ? " THIN_RESYNC" : "",
5392 		  connection->agreed_features & DRBD_FF_WSAME ? " WRITE_SAME" : "",
5393 		  connection->agreed_features & DRBD_FF_WZEROES ? " WRITE_ZEROES" :
5394 		  connection->agreed_features ? "" : " none");
5395 
5396 	return 1;
5397 
5398  incompat:
5399 	drbd_err(connection, "incompatible DRBD dialects: "
5400 	    "I support %d-%d, peer supports %d-%d\n",
5401 	    PRO_VERSION_MIN, PRO_VERSION_MAX,
5402 	    p->protocol_min, p->protocol_max);
5403 	return -1;
5404 }
5405 
#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
/*
 * Without HMAC support in the kernel, authentication can not work:
 * log why and return -1 (auth failed, don't try again).
 */
static int drbd_do_auth(struct drbd_connection *connection)
{
	drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
	drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
	return -1;
}
#else
#define CHALLENGE_LEN 64

/*
 * Mutual challenge-response authentication, keyed with the shared secret
 * from net_conf:
 *  - send a random challenge of CHALLENGE_LEN bytes,
 *  - receive the peer's challenge and answer it with its keyed digest,
 *  - receive the peer's answer and compare it with the keyed digest of
 *    our own challenge.
 *
 * Return value:
 *	1 - auth succeeded,
 *	0 - failed, try again (network error),
 *	-1 - auth failed, don't try again.
 */
static int drbd_do_auth(struct drbd_connection *connection)
{
	struct drbd_socket *sock;
	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
	char *response = NULL;
	char *right_response = NULL;
	char *peers_ch = NULL;
	unsigned int key_len;
	char secret[SHARED_SECRET_MAX]; /* 64 byte */
	unsigned int resp_size;
	SHASH_DESC_ON_STACK(desc, connection->cram_hmac_tfm);
	struct packet_info pi;
	struct net_conf *nc;
	int err, rv;

	/* FIXME: Put the challenge/response into the preallocated socket buffer.  */

	/* copy the secret, so it can be used outside the RCU read section */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	key_len = strlen(nc->shared_secret);
	memcpy(secret, nc->shared_secret, key_len);
	rcu_read_unlock();

	desc->tfm = connection->cram_hmac_tfm;

	rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
	if (rv) {
		drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	get_random_bytes(my_challenge, CHALLENGE_LEN);

	sock = &connection->data;
	if (!conn_prepare_command(connection, sock)) {
		rv = 0;
		goto fail;
	}
	/* conn_send_command() returns 0 on success; rv == 0 means "try again" */
	rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
				my_challenge, CHALLENGE_LEN);
	if (!rv)
		goto fail;

	err = drbd_recv_header(connection, &pi);
	if (err) {
		rv = 0;
		goto fail;
	}

	if (pi.cmd != P_AUTH_CHALLENGE) {
		drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		rv = -1;
		goto fail;
	}

	if (pi.size > CHALLENGE_LEN * 2) {
		drbd_err(connection, "expected AuthChallenge payload too big.\n");
		rv = -1;
		goto fail;
	}

	/* the memcmp() against my_challenge below reads CHALLENGE_LEN bytes */
	if (pi.size < CHALLENGE_LEN) {
		drbd_err(connection, "AuthChallenge payload too small.\n");
		rv = -1;
		goto fail;
	}

	peers_ch = kmalloc(pi.size, GFP_NOIO);
	if (peers_ch == NULL) {
		drbd_err(connection, "kmalloc of peers_ch failed\n");
		rv = -1;
		goto fail;
	}

	err = drbd_recv_all_warn(connection, peers_ch, pi.size);
	if (err) {
		rv = 0;
		goto fail;
	}

	/* a peer echoing our own challenge would get our own answer back */
	if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
		drbd_err(connection, "Peer presented the same challenge!\n");
		rv = -1;
		goto fail;
	}

	resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
	response = kmalloc(resp_size, GFP_NOIO);
	if (response == NULL) {
		drbd_err(connection, "kmalloc of response failed\n");
		rv = -1;
		goto fail;
	}

	rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
	if (rv) {
		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	if (!conn_prepare_command(connection, sock)) {
		rv = 0;
		goto fail;
	}
	rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
				response, resp_size);
	if (!rv)
		goto fail;

	err = drbd_recv_header(connection, &pi);
	if (err) {
		rv = 0;
		goto fail;
	}

	if (pi.cmd != P_AUTH_RESPONSE) {
		drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		rv = 0;
		goto fail;
	}

	if (pi.size != resp_size) {
		drbd_err(connection, "expected AuthResponse payload of wrong size\n");
		rv = 0;
		goto fail;
	}

	/* the response buffer is reused for the peer's answer */
	err = drbd_recv_all_warn(connection, response , resp_size);
	if (err) {
		rv = 0;
		goto fail;
	}

	right_response = kmalloc(resp_size, GFP_NOIO);
	if (right_response == NULL) {
		drbd_err(connection, "kmalloc of right_response failed\n");
		rv = -1;
		goto fail;
	}

	rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
				 right_response);
	if (rv) {
		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	/* NOTE(review): memcmp() is not constant time; consider
	 * crypto_memneq() for comparing the digests, once it is confirmed
	 * that its declaration is in scope for this file. */
	rv = !memcmp(response, right_response, resp_size);

	if (rv)
		drbd_info(connection, "Peer authenticated using %u bytes HMAC\n",
		     resp_size);
	else
		rv = -1;

 fail:
	kfree(peers_ch);
	kfree(response);
	kfree(right_response);
	shash_desc_zero(desc);
	/* do not leave a copy of the shared secret behind on the stack */
	memzero_explicit(secret, sizeof(secret));

	return rv;
}
#endif
5591 
5592 int drbd_receiver(struct drbd_thread *thi)
5593 {
5594 	struct drbd_connection *connection = thi->connection;
5595 	int h;
5596 
5597 	drbd_info(connection, "receiver (re)started\n");
5598 
5599 	do {
5600 		h = conn_connect(connection);
5601 		if (h == 0) {
5602 			conn_disconnect(connection);
5603 			schedule_timeout_interruptible(HZ);
5604 		}
5605 		if (h == -1) {
5606 			drbd_warn(connection, "Discarding network configuration.\n");
5607 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5608 		}
5609 	} while (h == 0);
5610 
5611 	if (h > 0) {
5612 		blk_start_plug(&connection->receiver_plug);
5613 		drbdd(connection);
5614 		blk_finish_plug(&connection->receiver_plug);
5615 	}
5616 
5617 	conn_disconnect(connection);
5618 
5619 	drbd_info(connection, "receiver terminated\n");
5620 	return 0;
5621 }
5622 
5623 /* ********* acknowledge sender ******** */
5624 
5625 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5626 {
5627 	struct p_req_state_reply *p = pi->data;
5628 	int retcode = be32_to_cpu(p->retcode);
5629 
5630 	if (retcode >= SS_SUCCESS) {
5631 		set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5632 	} else {
5633 		set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5634 		drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5635 			 drbd_set_st_err_str(retcode), retcode);
5636 	}
5637 	wake_up(&connection->ping_wait);
5638 
5639 	return 0;
5640 }
5641 
5642 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5643 {
5644 	struct drbd_peer_device *peer_device;
5645 	struct drbd_device *device;
5646 	struct p_req_state_reply *p = pi->data;
5647 	int retcode = be32_to_cpu(p->retcode);
5648 
5649 	peer_device = conn_peer_device(connection, pi->vnr);
5650 	if (!peer_device)
5651 		return -EIO;
5652 	device = peer_device->device;
5653 
5654 	if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5655 		D_ASSERT(device, connection->agreed_pro_version < 100);
5656 		return got_conn_RqSReply(connection, pi);
5657 	}
5658 
5659 	if (retcode >= SS_SUCCESS) {
5660 		set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5661 	} else {
5662 		set_bit(CL_ST_CHG_FAIL, &device->flags);
5663 		drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5664 			drbd_set_st_err_str(retcode), retcode);
5665 	}
5666 	wake_up(&device->state_wait);
5667 
5668 	return 0;
5669 }
5670 
/*
 * Answer a ping from the peer with a ping ack.
 * The packet itself (@pi) is not looked at.
 * Returns the result of drbd_send_ping_ack().
 */
static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
{
	int err = drbd_send_ping_ack(connection);

	return err;
}
5676 
5677 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5678 {
5679 	/* restore idle timeout */
5680 	connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5681 	if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5682 		wake_up(&connection->ping_wait);
5683 
5684 	return 0;
5685 }
5686 
5687 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5688 {
5689 	struct drbd_peer_device *peer_device;
5690 	struct drbd_device *device;
5691 	struct p_block_ack *p = pi->data;
5692 	sector_t sector = be64_to_cpu(p->sector);
5693 	int blksize = be32_to_cpu(p->blksize);
5694 
5695 	peer_device = conn_peer_device(connection, pi->vnr);
5696 	if (!peer_device)
5697 		return -EIO;
5698 	device = peer_device->device;
5699 
5700 	D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5701 
5702 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5703 
5704 	if (get_ldev(device)) {
5705 		drbd_rs_complete_io(device, sector);
5706 		drbd_set_in_sync(device, sector, blksize);
5707 		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5708 		device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5709 		put_ldev(device);
5710 	}
5711 	dec_rs_pending(device);
5712 	atomic_add(blksize >> 9, &device->rs_sect_in);
5713 
5714 	return 0;
5715 }
5716 
5717 static int
5718 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5719 			      struct rb_root *root, const char *func,
5720 			      enum drbd_req_event what, bool missing_ok)
5721 {
5722 	struct drbd_request *req;
5723 	struct bio_and_error m;
5724 
5725 	spin_lock_irq(&device->resource->req_lock);
5726 	req = find_request(device, root, id, sector, missing_ok, func);
5727 	if (unlikely(!req)) {
5728 		spin_unlock_irq(&device->resource->req_lock);
5729 		return -EIO;
5730 	}
5731 	__req_mod(req, what, &m);
5732 	spin_unlock_irq(&device->resource->req_lock);
5733 
5734 	if (m.bio)
5735 		complete_master_bio(device, &m);
5736 	return 0;
5737 }
5738 
5739 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5740 {
5741 	struct drbd_peer_device *peer_device;
5742 	struct drbd_device *device;
5743 	struct p_block_ack *p = pi->data;
5744 	sector_t sector = be64_to_cpu(p->sector);
5745 	int blksize = be32_to_cpu(p->blksize);
5746 	enum drbd_req_event what;
5747 
5748 	peer_device = conn_peer_device(connection, pi->vnr);
5749 	if (!peer_device)
5750 		return -EIO;
5751 	device = peer_device->device;
5752 
5753 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5754 
5755 	if (p->block_id == ID_SYNCER) {
5756 		drbd_set_in_sync(device, sector, blksize);
5757 		dec_rs_pending(device);
5758 		return 0;
5759 	}
5760 	switch (pi->cmd) {
5761 	case P_RS_WRITE_ACK:
5762 		what = WRITE_ACKED_BY_PEER_AND_SIS;
5763 		break;
5764 	case P_WRITE_ACK:
5765 		what = WRITE_ACKED_BY_PEER;
5766 		break;
5767 	case P_RECV_ACK:
5768 		what = RECV_ACKED_BY_PEER;
5769 		break;
5770 	case P_SUPERSEDED:
5771 		what = CONFLICT_RESOLVED;
5772 		break;
5773 	case P_RETRY_WRITE:
5774 		what = POSTPONE_WRITE;
5775 		break;
5776 	default:
5777 		BUG();
5778 	}
5779 
5780 	return validate_req_change_req_state(device, p->block_id, sector,
5781 					     &device->write_requests, __func__,
5782 					     what, false);
5783 }
5784 
5785 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5786 {
5787 	struct drbd_peer_device *peer_device;
5788 	struct drbd_device *device;
5789 	struct p_block_ack *p = pi->data;
5790 	sector_t sector = be64_to_cpu(p->sector);
5791 	int size = be32_to_cpu(p->blksize);
5792 	int err;
5793 
5794 	peer_device = conn_peer_device(connection, pi->vnr);
5795 	if (!peer_device)
5796 		return -EIO;
5797 	device = peer_device->device;
5798 
5799 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5800 
5801 	if (p->block_id == ID_SYNCER) {
5802 		dec_rs_pending(device);
5803 		drbd_rs_failed_io(device, sector, size);
5804 		return 0;
5805 	}
5806 
5807 	err = validate_req_change_req_state(device, p->block_id, sector,
5808 					    &device->write_requests, __func__,
5809 					    NEG_ACKED, true);
5810 	if (err) {
5811 		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5812 		   The master bio might already be completed, therefore the
5813 		   request is no longer in the collision hash. */
5814 		/* In Protocol B we might already have got a P_RECV_ACK
5815 		   but then get a P_NEG_ACK afterwards. */
5816 		drbd_set_out_of_sync(device, sector, size);
5817 	}
5818 	return 0;
5819 }
5820 
5821 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5822 {
5823 	struct drbd_peer_device *peer_device;
5824 	struct drbd_device *device;
5825 	struct p_block_ack *p = pi->data;
5826 	sector_t sector = be64_to_cpu(p->sector);
5827 
5828 	peer_device = conn_peer_device(connection, pi->vnr);
5829 	if (!peer_device)
5830 		return -EIO;
5831 	device = peer_device->device;
5832 
5833 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5834 
5835 	drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5836 	    (unsigned long long)sector, be32_to_cpu(p->blksize));
5837 
5838 	return validate_req_change_req_state(device, p->block_id, sector,
5839 					     &device->read_requests, __func__,
5840 					     NEG_ACKED, false);
5841 }
5842 
5843 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5844 {
5845 	struct drbd_peer_device *peer_device;
5846 	struct drbd_device *device;
5847 	sector_t sector;
5848 	int size;
5849 	struct p_block_ack *p = pi->data;
5850 
5851 	peer_device = conn_peer_device(connection, pi->vnr);
5852 	if (!peer_device)
5853 		return -EIO;
5854 	device = peer_device->device;
5855 
5856 	sector = be64_to_cpu(p->sector);
5857 	size = be32_to_cpu(p->blksize);
5858 
5859 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5860 
5861 	dec_rs_pending(device);
5862 
5863 	if (get_ldev_if_state(device, D_FAILED)) {
5864 		drbd_rs_complete_io(device, sector);
5865 		switch (pi->cmd) {
5866 		case P_NEG_RS_DREPLY:
5867 			drbd_rs_failed_io(device, sector, size);
5868 		case P_RS_CANCEL:
5869 			break;
5870 		default:
5871 			BUG();
5872 		}
5873 		put_ldev(device);
5874 	}
5875 
5876 	return 0;
5877 }
5878 
5879 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5880 {
5881 	struct p_barrier_ack *p = pi->data;
5882 	struct drbd_peer_device *peer_device;
5883 	int vnr;
5884 
5885 	tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5886 
5887 	rcu_read_lock();
5888 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5889 		struct drbd_device *device = peer_device->device;
5890 
5891 		if (device->state.conn == C_AHEAD &&
5892 		    atomic_read(&device->ap_in_flight) == 0 &&
5893 		    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5894 			device->start_resync_timer.expires = jiffies + HZ;
5895 			add_timer(&device->start_resync_timer);
5896 		}
5897 	}
5898 	rcu_read_unlock();
5899 
5900 	return 0;
5901 }
5902 
5903 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5904 {
5905 	struct drbd_peer_device *peer_device;
5906 	struct drbd_device *device;
5907 	struct p_block_ack *p = pi->data;
5908 	struct drbd_device_work *dw;
5909 	sector_t sector;
5910 	int size;
5911 
5912 	peer_device = conn_peer_device(connection, pi->vnr);
5913 	if (!peer_device)
5914 		return -EIO;
5915 	device = peer_device->device;
5916 
5917 	sector = be64_to_cpu(p->sector);
5918 	size = be32_to_cpu(p->blksize);
5919 
5920 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5921 
5922 	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5923 		drbd_ov_out_of_sync_found(device, sector, size);
5924 	else
5925 		ov_out_of_sync_print(device);
5926 
5927 	if (!get_ldev(device))
5928 		return 0;
5929 
5930 	drbd_rs_complete_io(device, sector);
5931 	dec_rs_pending(device);
5932 
5933 	--device->ov_left;
5934 
5935 	/* let's advance progress step marks only for every other megabyte */
5936 	if ((device->ov_left & 0x200) == 0x200)
5937 		drbd_advance_rs_marks(device, device->ov_left);
5938 
5939 	if (device->ov_left == 0) {
5940 		dw = kmalloc(sizeof(*dw), GFP_NOIO);
5941 		if (dw) {
5942 			dw->w.cb = w_ov_finished;
5943 			dw->device = device;
5944 			drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5945 		} else {
5946 			drbd_err(device, "kmalloc(dw) failed.");
5947 			ov_out_of_sync_print(device);
5948 			drbd_resync_finished(device);
5949 		}
5950 	}
5951 	put_ldev(device);
5952 	return 0;
5953 }
5954 
/*
 * Handler for packets that are received but deliberately ignored.
 * Neither argument is used.  Always returns 0.
 */
static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
{
	const int ignored_ok = 0;

	return ignored_ok;
}
5959 
/* One entry of the dispatch table for packets arriving on the meta socket. */
struct meta_sock_cmd {
	/* payload size the packet must have, not counting the header */
	size_t pkt_size;
	/* handler for the packet; a non-zero return is treated as failure */
	int (*fn)(struct drbd_connection *connection, struct packet_info *pi);
};
5964 
5965 static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
5966 {
5967 	long t;
5968 	struct net_conf *nc;
5969 
5970 	rcu_read_lock();
5971 	nc = rcu_dereference(connection->net_conf);
5972 	t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5973 	rcu_read_unlock();
5974 
5975 	t *= HZ;
5976 	if (ping_timeout)
5977 		t /= 10;
5978 
5979 	connection->meta.socket->sk->sk_rcvtimeo = t;
5980 }
5981 
5982 static void set_ping_timeout(struct drbd_connection *connection)
5983 {
5984 	set_rcvtimeo(connection, 1);
5985 }
5986 
5987 static void set_idle_timeout(struct drbd_connection *connection)
5988 {
5989 	set_rcvtimeo(connection, 0);
5990 }
5991 
5992 static struct meta_sock_cmd ack_receiver_tbl[] = {
5993 	[P_PING]	    = { 0, got_Ping },
5994 	[P_PING_ACK]	    = { 0, got_PingAck },
5995 	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5996 	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5997 	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5998 	[P_SUPERSEDED]   = { sizeof(struct p_block_ack), got_BlockAck },
5999 	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
6000 	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
6001 	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
6002 	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
6003 	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
6004 	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
6005 	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
6006 	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
6007 	[P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
6008 	[P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
6009 	[P_RETRY_WRITE]	    = { sizeof(struct p_block_ack), got_BlockAck },
6010 };
6011 
6012 int drbd_ack_receiver(struct drbd_thread *thi)
6013 {
6014 	struct drbd_connection *connection = thi->connection;
6015 	struct meta_sock_cmd *cmd = NULL;
6016 	struct packet_info pi;
6017 	unsigned long pre_recv_jif;
6018 	int rv;
6019 	void *buf    = connection->meta.rbuf;
6020 	int received = 0;
6021 	unsigned int header_size = drbd_header_size(connection);
6022 	int expect   = header_size;
6023 	bool ping_timeout_active = false;
6024 	struct sched_param param = { .sched_priority = 2 };
6025 
6026 	rv = sched_setscheduler(current, SCHED_RR, &param);
6027 	if (rv < 0)
6028 		drbd_err(connection, "drbd_ack_receiver: ERROR set priority, ret=%d\n", rv);
6029 
6030 	while (get_t_state(thi) == RUNNING) {
6031 		drbd_thread_current_set_cpu(thi);
6032 
6033 		conn_reclaim_net_peer_reqs(connection);
6034 
6035 		if (test_and_clear_bit(SEND_PING, &connection->flags)) {
6036 			if (drbd_send_ping(connection)) {
6037 				drbd_err(connection, "drbd_send_ping has failed\n");
6038 				goto reconnect;
6039 			}
6040 			set_ping_timeout(connection);
6041 			ping_timeout_active = true;
6042 		}
6043 
6044 		pre_recv_jif = jiffies;
6045 		rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
6046 
6047 		/* Note:
6048 		 * -EINTR	 (on meta) we got a signal
6049 		 * -EAGAIN	 (on meta) rcvtimeo expired
6050 		 * -ECONNRESET	 other side closed the connection
6051 		 * -ERESTARTSYS  (on data) we got a signal
6052 		 * rv <  0	 other than above: unexpected error!
6053 		 * rv == expected: full header or command
6054 		 * rv <  expected: "woken" by signal during receive
6055 		 * rv == 0	 : "connection shut down by peer"
6056 		 */
6057 		if (likely(rv > 0)) {
6058 			received += rv;
6059 			buf	 += rv;
6060 		} else if (rv == 0) {
6061 			if (test_bit(DISCONNECT_SENT, &connection->flags)) {
6062 				long t;
6063 				rcu_read_lock();
6064 				t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
6065 				rcu_read_unlock();
6066 
6067 				t = wait_event_timeout(connection->ping_wait,
6068 						       connection->cstate < C_WF_REPORT_PARAMS,
6069 						       t);
6070 				if (t)
6071 					break;
6072 			}
6073 			drbd_err(connection, "meta connection shut down by peer.\n");
6074 			goto reconnect;
6075 		} else if (rv == -EAGAIN) {
6076 			/* If the data socket received something meanwhile,
6077 			 * that is good enough: peer is still alive. */
6078 			if (time_after(connection->last_received, pre_recv_jif))
6079 				continue;
6080 			if (ping_timeout_active) {
6081 				drbd_err(connection, "PingAck did not arrive in time.\n");
6082 				goto reconnect;
6083 			}
6084 			set_bit(SEND_PING, &connection->flags);
6085 			continue;
6086 		} else if (rv == -EINTR) {
6087 			/* maybe drbd_thread_stop(): the while condition will notice.
6088 			 * maybe woken for send_ping: we'll send a ping above,
6089 			 * and change the rcvtimeo */
6090 			flush_signals(current);
6091 			continue;
6092 		} else {
6093 			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
6094 			goto reconnect;
6095 		}
6096 
6097 		if (received == expect && cmd == NULL) {
6098 			if (decode_header(connection, connection->meta.rbuf, &pi))
6099 				goto reconnect;
6100 			cmd = &ack_receiver_tbl[pi.cmd];
6101 			if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
6102 				drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
6103 					 cmdname(pi.cmd), pi.cmd);
6104 				goto disconnect;
6105 			}
6106 			expect = header_size + cmd->pkt_size;
6107 			if (pi.size != expect - header_size) {
6108 				drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
6109 					pi.cmd, pi.size);
6110 				goto reconnect;
6111 			}
6112 		}
6113 		if (received == expect) {
6114 			bool err;
6115 
6116 			err = cmd->fn(connection, &pi);
6117 			if (err) {
6118 				drbd_err(connection, "%ps failed\n", cmd->fn);
6119 				goto reconnect;
6120 			}
6121 
6122 			connection->last_received = jiffies;
6123 
6124 			if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
6125 				set_idle_timeout(connection);
6126 				ping_timeout_active = false;
6127 			}
6128 
6129 			buf	 = connection->meta.rbuf;
6130 			received = 0;
6131 			expect	 = header_size;
6132 			cmd	 = NULL;
6133 		}
6134 	}
6135 
6136 	if (0) {
6137 reconnect:
6138 		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6139 		conn_md_sync(connection);
6140 	}
6141 	if (0) {
6142 disconnect:
6143 		conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
6144 	}
6145 
6146 	drbd_info(connection, "ack_receiver terminated\n");
6147 
6148 	return 0;
6149 }
6150 
6151 void drbd_send_acks_wf(struct work_struct *ws)
6152 {
6153 	struct drbd_peer_device *peer_device =
6154 		container_of(ws, struct drbd_peer_device, send_acks_work);
6155 	struct drbd_connection *connection = peer_device->connection;
6156 	struct drbd_device *device = peer_device->device;
6157 	struct net_conf *nc;
6158 	int tcp_cork, err;
6159 
6160 	rcu_read_lock();
6161 	nc = rcu_dereference(connection->net_conf);
6162 	tcp_cork = nc->tcp_cork;
6163 	rcu_read_unlock();
6164 
6165 	if (tcp_cork)
6166 		drbd_tcp_cork(connection->meta.socket);
6167 
6168 	err = drbd_finish_peer_reqs(device);
6169 	kref_put(&device->kref, drbd_destroy_device);
6170 	/* get is in drbd_endio_write_sec_final(). That is necessary to keep the
6171 	   struct work_struct send_acks_work alive, which is in the peer_device object */
6172 
6173 	if (err) {
6174 		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6175 		return;
6176 	}
6177 
6178 	if (tcp_cork)
6179 		drbd_tcp_uncork(connection->meta.socket);
6180 
6181 	return;
6182 }
6183