1 /*
2    drbd_receiver.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24 
25 
26 #include <linux/module.h>
27 
28 #include <linux/uaccess.h>
29 #include <net/sock.h>
30 
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <uapi/linux/sched/types.h>
40 #include <linux/sched/signal.h>
41 #include <linux/pkt_sched.h>
42 #define __KERNEL_SYSCALLS__
43 #include <linux/unistd.h>
44 #include <linux/vmalloc.h>
45 #include <linux/random.h>
46 #include <linux/string.h>
47 #include <linux/scatterlist.h>
48 #include "drbd_int.h"
49 #include "drbd_protocol.h"
50 #include "drbd_req.h"
51 #include "drbd_vli.h"
52 
53 #define PRO_FEATURES (DRBD_FF_TRIM|DRBD_FF_THIN_RESYNC|DRBD_FF_WSAME)
54 
55 struct packet_info {
56 	enum drbd_packet cmd;
57 	unsigned int size;
58 	unsigned int vnr;
59 	void *data;
60 };
61 
62 enum finish_epoch {
63 	FE_STILL_LIVE,
64 	FE_DESTROYED,
65 	FE_RECYCLED,
66 };
67 
68 static int drbd_do_features(struct drbd_connection *connection);
69 static int drbd_do_auth(struct drbd_connection *connection);
70 static int drbd_disconnected(struct drbd_peer_device *);
71 static void conn_wait_active_ee_empty(struct drbd_connection *connection);
72 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
73 static int e_end_block(struct drbd_work *, int);
74 
75 
76 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
77 
78 /*
 * some helper functions to deal with singly linked page lists,
80  * page->private being our "next" pointer.
81  */
82 
83 /* If at least n pages are linked at head, get n pages off.
84  * Otherwise, don't modify head, and return NULL.
85  * Locking is the responsibility of the caller.
86  */
87 static struct page *page_chain_del(struct page **head, int n)
88 {
89 	struct page *page;
90 	struct page *tmp;
91 
92 	BUG_ON(!n);
93 	BUG_ON(!head);
94 
95 	page = *head;
96 
97 	if (!page)
98 		return NULL;
99 
100 	while (page) {
101 		tmp = page_chain_next(page);
102 		if (--n == 0)
103 			break; /* found sufficient pages */
104 		if (tmp == NULL)
105 			/* insufficient pages, don't use any of them. */
106 			return NULL;
107 		page = tmp;
108 	}
109 
110 	/* add end of list marker for the returned list */
111 	set_page_private(page, 0);
112 	/* actual return value, and adjustment of head */
113 	page = *head;
114 	*head = tmp;
115 	return page;
116 }
117 
118 /* may be used outside of locks to find the tail of a (usually short)
119  * "private" page chain, before adding it back to a global chain head
120  * with page_chain_add() under a spinlock. */
121 static struct page *page_chain_tail(struct page *page, int *len)
122 {
123 	struct page *tmp;
124 	int i = 1;
125 	while ((tmp = page_chain_next(page)))
126 		++i, page = tmp;
127 	if (len)
128 		*len = i;
129 	return page;
130 }
131 
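/* Release all pages of a chain back to the system.
 * Returns the number of pages freed. */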
132 static int page_chain_free(struct page *page)
133 {
134 	struct page *tmp;
135 	int i = 0;
136 	page_chain_for_each_safe(page, tmp) {
137 		put_page(page);
138 		++i;
139 	}
140 	return i;
141 }
142 
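/* Prepend the chain chain_first .. chain_last to *head.
 * chain_last must be the tail of the chain starting at chain_first;
 * the sanity check below verifies that. */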
143 static void page_chain_add(struct page **head,
144 		struct page *chain_first, struct page *chain_last)
145 {
146 #if 1
147 	struct page *tmp;
148 	tmp = page_chain_tail(chain_first, NULL);
149 	BUG_ON(tmp != chain_last);
150 #endif
151 
152 	/* add chain to head */
153 	set_page_private(chain_last, (unsigned long)*head);
154 	*head = chain_first;
155 }
156 
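/* Try to grab @number pages at once: first from the global drbd_pp_pool,
 * then by allocating new pages with GFP_TRY.  If not all @number pages can
 * be obtained, any partially allocated chain is returned to the pool and
 * NULL is returned. */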
157 static struct page *__drbd_alloc_pages(struct drbd_device *device,
158 				       unsigned int number)
159 {
160 	struct page *page = NULL;
161 	struct page *tmp = NULL;
162 	unsigned int i = 0;
163 
164 	/* Yes, testing drbd_pp_vacant outside the lock is racy.
165 	 * So what. It saves a spin_lock. */
166 	if (drbd_pp_vacant >= number) {
167 		spin_lock(&drbd_pp_lock);
168 		page = page_chain_del(&drbd_pp_pool, number);
169 		if (page)
170 			drbd_pp_vacant -= number;
171 		spin_unlock(&drbd_pp_lock);
172 		if (page)
173 			return page;
174 	}
175 
176 	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
177 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
178 	 * which in turn might block on the other node at this very place.  */
179 	for (i = 0; i < number; i++) {
180 		tmp = alloc_page(GFP_TRY);
181 		if (!tmp)
182 			break;
183 		set_page_private(tmp, (unsigned long)page);
184 		page = tmp;
185 	}
186 
187 	if (i == number)
188 		return page;
189 
190 	/* Not enough pages immediately available this time.
191 	 * No need to jump around here, drbd_alloc_pages will retry this
192 	 * function "soon". */
193 	if (page) {
194 		tmp = page_chain_tail(page, NULL);
195 		spin_lock(&drbd_pp_lock);
196 		page_chain_add(&drbd_pp_pool, page, tmp);
197 		drbd_pp_vacant += i;
198 		spin_unlock(&drbd_pp_lock);
199 	}
200 	return NULL;
201 }
202 
203 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
204 					   struct list_head *to_be_freed)
205 {
206 	struct drbd_peer_request *peer_req, *tmp;
207 
208 	/* The EEs are always appended to the end of the list. Since
209 	   they are sent in order over the wire, they have to finish
   in order. As soon as we see the first one that has not finished,
   we can stop examining the list... */
212 
213 	list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
214 		if (drbd_peer_req_has_active_page(peer_req))
215 			break;
216 		list_move(&peer_req->w.list, to_be_freed);
217 	}
218 }
219 
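/* Grab the req_lock, move all already finished entries off device->net_ee,
 * and free them, returning their pages to the pool. */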
220 static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
221 {
222 	LIST_HEAD(reclaimed);
223 	struct drbd_peer_request *peer_req, *t;
224 
225 	spin_lock_irq(&device->resource->req_lock);
226 	reclaim_finished_net_peer_reqs(device, &reclaimed);
227 	spin_unlock_irq(&device->resource->req_lock);
228 	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
229 		drbd_free_net_peer_req(device, peer_req);
230 }
231 
232 static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
233 {
234 	struct drbd_peer_device *peer_device;
235 	int vnr;
236 
237 	rcu_read_lock();
238 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
239 		struct drbd_device *device = peer_device->device;
240 		if (!atomic_read(&device->pp_in_use_by_net))
241 			continue;
242 
243 		kref_get(&device->kref);
244 		rcu_read_unlock();
245 		drbd_reclaim_net_peer_reqs(device);
246 		kref_put(&device->kref, drbd_destroy_device);
247 		rcu_read_lock();
248 	}
249 	rcu_read_unlock();
250 }
251 
252 /**
253  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @peer_device:	DRBD peer device.
255  * @number:	number of pages requested
256  * @retry:	whether to retry, if not enough pages are available right now
257  *
 * Tries to allocate @number pages, first from our own page pool, then from
259  * the kernel.
260  * Possibly retry until DRBD frees sufficient pages somewhere else.
261  *
262  * If this allocation would exceed the max_buffers setting, we throttle
263  * allocation (schedule_timeout) to give the system some room to breathe.
264  *
 * We do not use max-buffers as a hard limit, because it could lead to
266  * congestion and further to a distributed deadlock during online-verify or
267  * (checksum based) resync, if the max-buffers, socket buffer sizes and
268  * resync-rate settings are mis-configured.
269  *
270  * Returns a page chain linked via page->private.
271  */
272 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
273 			      bool retry)
274 {
275 	struct drbd_device *device = peer_device->device;
276 	struct page *page = NULL;
277 	struct net_conf *nc;
278 	DEFINE_WAIT(wait);
279 	unsigned int mxb;
280 
281 	rcu_read_lock();
282 	nc = rcu_dereference(peer_device->connection->net_conf);
283 	mxb = nc ? nc->max_buffers : 1000000;
284 	rcu_read_unlock();
285 
286 	if (atomic_read(&device->pp_in_use) < mxb)
287 		page = __drbd_alloc_pages(device, number);
288 
289 	/* Try to keep the fast path fast, but occasionally we need
	 * to reclaim the pages we lent to the network stack. */
291 	if (page && atomic_read(&device->pp_in_use_by_net) > 512)
292 		drbd_reclaim_net_peer_reqs(device);
293 
294 	while (page == NULL) {
295 		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
296 
297 		drbd_reclaim_net_peer_reqs(device);
298 
299 		if (atomic_read(&device->pp_in_use) < mxb) {
300 			page = __drbd_alloc_pages(device, number);
301 			if (page)
302 				break;
303 		}
304 
305 		if (!retry)
306 			break;
307 
308 		if (signal_pending(current)) {
309 			drbd_warn(device, "drbd_alloc_pages interrupted!\n");
310 			break;
311 		}
312 
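		/* If we slept the full HZ/10 without being woken up, stop
		 * honoring the max-buffers soft limit, so a mis-configured
		 * max-buffers cannot dead-lock us here (see comment above). */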
313 		if (schedule_timeout(HZ/10) == 0)
314 			mxb = UINT_MAX;
315 	}
316 	finish_wait(&drbd_pp_wait, &wait);
317 
318 	if (page)
319 		atomic_add(number, &device->pp_in_use);
320 	return page;
321 }
322 
323 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * Is also used from inside an other spin_lock_irq(&resource->req_lock);
 * Either links the page chain back to the global pool,
326  * or returns all pages to the system. */
327 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
328 {
329 	atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
330 	int i;
331 
332 	if (page == NULL)
333 		return;
334 
335 	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
336 		i = page_chain_free(page);
337 	else {
338 		struct page *tmp;
339 		tmp = page_chain_tail(page, &i);
340 		spin_lock(&drbd_pp_lock);
341 		page_chain_add(&drbd_pp_pool, page, tmp);
342 		drbd_pp_vacant += i;
343 		spin_unlock(&drbd_pp_lock);
344 	}
345 	i = atomic_sub_return(i, a);
346 	if (i < 0)
347 		drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
348 			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
349 	wake_up(&drbd_pp_wait);
350 }
351 
352 /*
353 You need to hold the req_lock:
354  _drbd_wait_ee_list_empty()
355 
356 You must not have the req_lock:
357  drbd_free_peer_req()
358  drbd_alloc_peer_req()
359  drbd_free_peer_reqs()
360  drbd_ee_fix_bhs()
361  drbd_finish_peer_reqs()
362  drbd_clear_done_ee()
363  drbd_wait_ee_list_empty()
364 */
365 
366 /* normal: payload_size == request size (bi_size)
367  * w_same: payload_size == logical_block_size
368  * trim: payload_size == 0 */
369 struct drbd_peer_request *
370 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
371 		    unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local)
372 {
373 	struct drbd_device *device = peer_device->device;
374 	struct drbd_peer_request *peer_req;
375 	struct page *page = NULL;
376 	unsigned nr_pages = (payload_size + PAGE_SIZE -1) >> PAGE_SHIFT;
377 
378 	if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
379 		return NULL;
380 
381 	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
382 	if (!peer_req) {
383 		if (!(gfp_mask & __GFP_NOWARN))
384 			drbd_err(device, "%s: allocation failed\n", __func__);
385 		return NULL;
386 	}
387 
388 	if (nr_pages) {
389 		page = drbd_alloc_pages(peer_device, nr_pages,
390 					gfpflags_allow_blocking(gfp_mask));
391 		if (!page)
392 			goto fail;
393 	}
394 
395 	memset(peer_req, 0, sizeof(*peer_req));
396 	INIT_LIST_HEAD(&peer_req->w.list);
397 	drbd_clear_interval(&peer_req->i);
398 	peer_req->i.size = request_size;
399 	peer_req->i.sector = sector;
400 	peer_req->submit_jif = jiffies;
401 	peer_req->peer_device = peer_device;
402 	peer_req->pages = page;
403 	/*
404 	 * The block_id is opaque to the receiver.  It is not endianness
405 	 * converted, and sent back to the sender unchanged.
406 	 */
407 	peer_req->block_id = id;
408 
409 	return peer_req;
410 
411  fail:
412 	mempool_free(peer_req, drbd_ee_mempool);
413 	return NULL;
414 }
415 
416 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
417 		       int is_net)
418 {
419 	might_sleep();
420 	if (peer_req->flags & EE_HAS_DIGEST)
421 		kfree(peer_req->digest);
422 	drbd_free_pages(device, peer_req->pages, is_net);
423 	D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
424 	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
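	/* EE_CALL_AL_COMPLETE_IO is expected to have been handled before we
	 * get here; if it unexpectedly is still set, release the activity log
	 * reference now so it does not leak. */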
425 	if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
426 		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
427 		drbd_al_complete_io(device, &peer_req->i);
428 	}
429 	mempool_free(peer_req, drbd_ee_mempool);
430 }
431 
432 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
433 {
434 	LIST_HEAD(work_list);
435 	struct drbd_peer_request *peer_req, *t;
436 	int count = 0;
437 	int is_net = list == &device->net_ee;
438 
439 	spin_lock_irq(&device->resource->req_lock);
440 	list_splice_init(list, &work_list);
441 	spin_unlock_irq(&device->resource->req_lock);
442 
443 	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
444 		__drbd_free_peer_req(device, peer_req, is_net);
445 		count++;
446 	}
447 	return count;
448 }
449 
450 /*
451  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
452  */
453 static int drbd_finish_peer_reqs(struct drbd_device *device)
454 {
455 	LIST_HEAD(work_list);
456 	LIST_HEAD(reclaimed);
457 	struct drbd_peer_request *peer_req, *t;
458 	int err = 0;
459 
460 	spin_lock_irq(&device->resource->req_lock);
461 	reclaim_finished_net_peer_reqs(device, &reclaimed);
462 	list_splice_init(&device->done_ee, &work_list);
463 	spin_unlock_irq(&device->resource->req_lock);
464 
465 	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
466 		drbd_free_net_peer_req(device, peer_req);
467 
468 	/* possible callbacks here:
	 * e_end_block, e_end_resync_block, and e_send_superseded.
470 	 * all ignore the last argument.
471 	 */
472 	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
473 		int err2;
474 
475 		/* list_del not necessary, next/prev members not touched */
476 		err2 = peer_req->w.cb(&peer_req->w, !!err);
477 		if (!err)
478 			err = err2;
479 		drbd_free_peer_req(device, peer_req);
480 	}
481 	wake_up(&device->ee_wait);
482 
483 	return err;
484 }
485 
486 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
487 				     struct list_head *head)
488 {
489 	DEFINE_WAIT(wait);
490 
491 	/* avoids spin_lock/unlock
492 	 * and calling prepare_to_wait in the fast path */
493 	while (!list_empty(head)) {
494 		prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
495 		spin_unlock_irq(&device->resource->req_lock);
496 		io_schedule();
497 		finish_wait(&device->ee_wait, &wait);
498 		spin_lock_irq(&device->resource->req_lock);
499 	}
500 }
501 
502 static void drbd_wait_ee_list_empty(struct drbd_device *device,
503 				    struct list_head *head)
504 {
505 	spin_lock_irq(&device->resource->req_lock);
506 	_drbd_wait_ee_list_empty(device, head);
507 	spin_unlock_irq(&device->resource->req_lock);
508 }
509 
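/* Receive into @buf.  With no explicit @flags this blocks until @size bytes
 * have arrived (MSG_WAITALL) and suppresses SIGPIPE (MSG_NOSIGNAL).
 * Returns the number of bytes received, or a negative error code. */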
510 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
511 {
512 	struct kvec iov = {
513 		.iov_base = buf,
514 		.iov_len = size,
515 	};
516 	struct msghdr msg = {
517 		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
518 	};
519 	return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
520 }
521 
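/* Like drbd_recv_short() on the data socket, but logs unexpected errors and
 * forces the connection into C_BROKEN_PIPE on incomplete reads. */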
522 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
523 {
524 	int rv;
525 
526 	rv = drbd_recv_short(connection->data.socket, buf, size, 0);
527 
528 	if (rv < 0) {
529 		if (rv == -ECONNRESET)
530 			drbd_info(connection, "sock was reset by peer\n");
531 		else if (rv != -ERESTARTSYS)
532 			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
533 	} else if (rv == 0) {
534 		if (test_bit(DISCONNECT_SENT, &connection->flags)) {
535 			long t;
536 			rcu_read_lock();
537 			t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
538 			rcu_read_unlock();
539 
540 			t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
541 
542 			if (t)
543 				goto out;
544 		}
545 		drbd_info(connection, "sock was shut down by peer\n");
546 	}
547 
548 	if (rv != size)
549 		conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
550 
551 out:
552 	return rv;
553 }
554 
555 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
556 {
557 	int err;
558 
559 	err = drbd_recv(connection, buf, size);
560 	if (err != size) {
561 		if (err >= 0)
562 			err = -EIO;
563 	} else
564 		err = 0;
565 	return err;
566 }
567 
568 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
569 {
570 	int err;
571 
572 	err = drbd_recv_all(connection, buf, size);
573 	if (err && !signal_pending(current))
574 		drbd_warn(connection, "short read (expected size %d)\n", (int)size);
575 	return err;
576 }
577 
578 /* quoting tcp(7):
579  *   On individual connections, the socket buffer size must be set prior to the
580  *   listen(2) or connect(2) calls in order to have it take effect.
581  * This is our wrapper to do so.
582  */
583 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
584 		unsigned int rcv)
585 {
586 	/* open coded SO_SNDBUF, SO_RCVBUF */
587 	if (snd) {
588 		sock->sk->sk_sndbuf = snd;
589 		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
590 	}
591 	if (rcv) {
592 		sock->sk->sk_rcvbuf = rcv;
593 		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
594 	}
595 }
596 
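/* Actively try to open a TCP connection to the peer, bound to our configured
 * source address.  Returns the connected socket, or NULL.  "Expected" errors
 * (timeout, refused, unreachable, ...) keep the connection state untouched;
 * anything else forces C_DISCONNECTING. */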
597 static struct socket *drbd_try_connect(struct drbd_connection *connection)
598 {
599 	const char *what;
600 	struct socket *sock;
601 	struct sockaddr_in6 src_in6;
602 	struct sockaddr_in6 peer_in6;
603 	struct net_conf *nc;
604 	int err, peer_addr_len, my_addr_len;
605 	int sndbuf_size, rcvbuf_size, connect_int;
606 	int disconnect_on_error = 1;
607 
608 	rcu_read_lock();
609 	nc = rcu_dereference(connection->net_conf);
610 	if (!nc) {
611 		rcu_read_unlock();
612 		return NULL;
613 	}
614 	sndbuf_size = nc->sndbuf_size;
615 	rcvbuf_size = nc->rcvbuf_size;
616 	connect_int = nc->connect_int;
617 	rcu_read_unlock();
618 
619 	my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
620 	memcpy(&src_in6, &connection->my_addr, my_addr_len);
621 
622 	if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
623 		src_in6.sin6_port = 0;
624 	else
625 		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
626 
627 	peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
628 	memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
629 
630 	what = "sock_create_kern";
631 	err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
632 			       SOCK_STREAM, IPPROTO_TCP, &sock);
633 	if (err < 0) {
634 		sock = NULL;
635 		goto out;
636 	}
637 
638 	sock->sk->sk_rcvtimeo =
639 	sock->sk->sk_sndtimeo = connect_int * HZ;
640 	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
641 
	/* explicitly bind to the configured IP as source IP
	 * for the outgoing connections.
	 * This is needed for multihomed hosts and to be
	 * able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 * a free one dynamically.
	 */
649 	what = "bind before connect";
650 	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
651 	if (err < 0)
652 		goto out;
653 
654 	/* connect may fail, peer not yet available.
655 	 * stay C_WF_CONNECTION, don't go Disconnecting! */
656 	disconnect_on_error = 0;
657 	what = "connect";
658 	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
659 
660 out:
661 	if (err < 0) {
662 		if (sock) {
663 			sock_release(sock);
664 			sock = NULL;
665 		}
666 		switch (-err) {
667 			/* timeout, busy, signal pending */
668 		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
669 		case EINTR: case ERESTARTSYS:
670 			/* peer not (yet) available, network problem */
671 		case ECONNREFUSED: case ENETUNREACH:
672 		case EHOSTDOWN:    case EHOSTUNREACH:
673 			disconnect_on_error = 0;
674 			break;
675 		default:
676 			drbd_err(connection, "%s failed, err = %d\n", what, err);
677 		}
678 		if (disconnect_on_error)
679 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
680 	}
681 
682 	return sock;
683 }
684 
685 struct accept_wait_data {
686 	struct drbd_connection *connection;
687 	struct socket *s_listen;
688 	struct completion door_bell;
	void (*original_sk_state_change)(struct sock *sk);
};
692 
693 static void drbd_incoming_connection(struct sock *sk)
694 {
695 	struct accept_wait_data *ad = sk->sk_user_data;
696 	void (*state_change)(struct sock *sk);
697 
698 	state_change = ad->original_sk_state_change;
699 	if (sk->sk_state == TCP_ESTABLISHED)
700 		complete(&ad->door_bell);
701 	state_change(sk);
702 }
703 
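/* Create, bind and listen on the socket for the passive side of the
 * connection attempt, and install drbd_incoming_connection() as
 * sk_state_change callback so incoming peers ring ad->door_bell.
 * Returns 0 on success, -EIO otherwise. */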
704 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
705 {
706 	int err, sndbuf_size, rcvbuf_size, my_addr_len;
707 	struct sockaddr_in6 my_addr;
708 	struct socket *s_listen;
709 	struct net_conf *nc;
710 	const char *what;
711 
712 	rcu_read_lock();
713 	nc = rcu_dereference(connection->net_conf);
714 	if (!nc) {
715 		rcu_read_unlock();
716 		return -EIO;
717 	}
718 	sndbuf_size = nc->sndbuf_size;
719 	rcvbuf_size = nc->rcvbuf_size;
720 	rcu_read_unlock();
721 
722 	my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
723 	memcpy(&my_addr, &connection->my_addr, my_addr_len);
724 
725 	what = "sock_create_kern";
726 	err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
727 			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
728 	if (err) {
729 		s_listen = NULL;
730 		goto out;
731 	}
732 
733 	s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
734 	drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
735 
736 	what = "bind before listen";
737 	err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
738 	if (err < 0)
739 		goto out;
740 
741 	ad->s_listen = s_listen;
742 	write_lock_bh(&s_listen->sk->sk_callback_lock);
743 	ad->original_sk_state_change = s_listen->sk->sk_state_change;
744 	s_listen->sk->sk_state_change = drbd_incoming_connection;
745 	s_listen->sk->sk_user_data = ad;
746 	write_unlock_bh(&s_listen->sk->sk_callback_lock);
747 
748 	what = "listen";
749 	err = s_listen->ops->listen(s_listen, 5);
750 	if (err < 0)
751 		goto out;
752 
753 	return 0;
754 out:
755 	if (s_listen)
756 		sock_release(s_listen);
757 	if (err < 0) {
758 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
759 			drbd_err(connection, "%s failed, err = %d\n", what, err);
760 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
761 		}
762 	}
763 
764 	return -EIO;
765 }
766 
767 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
768 {
769 	write_lock_bh(&sk->sk_callback_lock);
770 	sk->sk_state_change = ad->original_sk_state_change;
771 	sk->sk_user_data = NULL;
772 	write_unlock_bh(&sk->sk_callback_lock);
773 }
774 
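/* Wait (with some jitter on the timeout) for the peer to connect to our
 * listening socket, then accept the connection.  Returns the established
 * socket, or NULL if the wait timed out, was interrupted, or accept failed. */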
775 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
776 {
777 	int timeo, connect_int, err = 0;
778 	struct socket *s_estab = NULL;
779 	struct net_conf *nc;
780 
781 	rcu_read_lock();
782 	nc = rcu_dereference(connection->net_conf);
783 	if (!nc) {
784 		rcu_read_unlock();
785 		return NULL;
786 	}
787 	connect_int = nc->connect_int;
788 	rcu_read_unlock();
789 
790 	timeo = connect_int * HZ;
791 	/* 28.5% random jitter */
792 	timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
793 
794 	err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
795 	if (err <= 0)
796 		return NULL;
797 
798 	err = kernel_accept(ad->s_listen, &s_estab, 0);
799 	if (err < 0) {
800 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
801 			drbd_err(connection, "accept failed, err = %d\n", err);
802 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
803 		}
804 	}
805 
806 	if (s_estab)
807 		unregister_state_change(s_estab->sk, ad);
808 
809 	return s_estab;
810 }
811 
812 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
813 
814 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
815 			     enum drbd_packet cmd)
816 {
817 	if (!conn_prepare_command(connection, sock))
818 		return -EIO;
819 	return conn_send_command(connection, sock, cmd, 0, NULL, 0);
820 }
821 
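/* Read and decode the very first packet on a freshly accepted socket.
 * Returns the packet command (expected: P_INITIAL_DATA or P_INITIAL_META),
 * or a negative error code. */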
822 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
823 {
824 	unsigned int header_size = drbd_header_size(connection);
825 	struct packet_info pi;
826 	struct net_conf *nc;
827 	int err;
828 
829 	rcu_read_lock();
830 	nc = rcu_dereference(connection->net_conf);
831 	if (!nc) {
832 		rcu_read_unlock();
833 		return -EIO;
834 	}
835 	sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
836 	rcu_read_unlock();
837 
838 	err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
839 	if (err != header_size) {
840 		if (err >= 0)
841 			err = -EIO;
842 		return err;
843 	}
844 	err = decode_header(connection, connection->data.rbuf, &pi);
845 	if (err)
846 		return err;
847 	return pi.cmd;
848 }
849 
850 /**
851  * drbd_socket_okay() - Free the socket if its connection is not okay
852  * @sock:	pointer to the pointer to the socket.
853  */
854 static bool drbd_socket_okay(struct socket **sock)
855 {
856 	int rr;
857 	char tb[4];
858 
859 	if (!*sock)
860 		return false;
861 
862 	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
863 
864 	if (rr > 0 || rr == -EAGAIN) {
865 		return true;
866 	} else {
867 		sock_release(*sock);
868 		*sock = NULL;
869 		return false;
870 	}
871 }
872 
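/* If both sockets are in place, give them a moment to settle, then verify
 * that neither of them has been closed by the peer in the meantime. */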
873 static bool connection_established(struct drbd_connection *connection,
874 				   struct socket **sock1,
875 				   struct socket **sock2)
876 {
877 	struct net_conf *nc;
878 	int timeout;
879 	bool ok;
880 
881 	if (!*sock1 || !*sock2)
882 		return false;
883 
884 	rcu_read_lock();
885 	nc = rcu_dereference(connection->net_conf);
886 	timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
887 	rcu_read_unlock();
888 	schedule_timeout_interruptible(timeout);
889 
890 	ok = drbd_socket_okay(sock1);
891 	ok = drbd_socket_okay(sock2) && ok;
892 
893 	return ok;
894 }
895 
896 /* Gets called if a connection is established, or if a new minor gets created
897    in a connection */
898 int drbd_connected(struct drbd_peer_device *peer_device)
899 {
900 	struct drbd_device *device = peer_device->device;
901 	int err;
902 
903 	atomic_set(&device->packet_seq, 0);
904 	device->peer_seq = 0;
905 
906 	device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
907 		&peer_device->connection->cstate_mutex :
908 		&device->own_state_mutex;
909 
910 	err = drbd_send_sync_param(peer_device);
911 	if (!err)
912 		err = drbd_send_sizes(peer_device, 0, 0);
913 	if (!err)
914 		err = drbd_send_uuids(peer_device);
915 	if (!err)
916 		err = drbd_send_current_state(peer_device);
917 	clear_bit(USE_DEGR_WFC_T, &device->flags);
918 	clear_bit(RESIZE_PENDING, &device->flags);
919 	atomic_set(&device->ap_in_flight, 0);
920 	mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
921 	return err;
922 }
923 
924 /*
925  * return values:
926  *   1 yes, we have a valid connection
927  *   0 oops, did not work out, please try again
928  *  -1 peer talks different language,
929  *     no point in trying again, please go standalone.
930  *  -2 We do not have a network config...
931  */
932 static int conn_connect(struct drbd_connection *connection)
933 {
934 	struct drbd_socket sock, msock;
935 	struct drbd_peer_device *peer_device;
936 	struct net_conf *nc;
937 	int vnr, timeout, h;
938 	bool discard_my_data, ok;
939 	enum drbd_state_rv rv;
940 	struct accept_wait_data ad = {
941 		.connection = connection,
942 		.door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
943 	};
944 
945 	clear_bit(DISCONNECT_SENT, &connection->flags);
946 	if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
947 		return -2;
948 
949 	mutex_init(&sock.mutex);
950 	sock.sbuf = connection->data.sbuf;
951 	sock.rbuf = connection->data.rbuf;
952 	sock.socket = NULL;
953 	mutex_init(&msock.mutex);
954 	msock.sbuf = connection->meta.sbuf;
955 	msock.rbuf = connection->meta.rbuf;
956 	msock.socket = NULL;
957 
958 	/* Assume that the peer only understands protocol 80 until we know better.  */
959 	connection->agreed_pro_version = 80;
960 
961 	if (prepare_listen_socket(connection, &ad))
962 		return 0;
963 
964 	do {
965 		struct socket *s;
966 
967 		s = drbd_try_connect(connection);
968 		if (s) {
969 			if (!sock.socket) {
970 				sock.socket = s;
971 				send_first_packet(connection, &sock, P_INITIAL_DATA);
972 			} else if (!msock.socket) {
973 				clear_bit(RESOLVE_CONFLICTS, &connection->flags);
974 				msock.socket = s;
975 				send_first_packet(connection, &msock, P_INITIAL_META);
976 			} else {
977 				drbd_err(connection, "Logic error in conn_connect()\n");
978 				goto out_release_sockets;
979 			}
980 		}
981 
982 		if (connection_established(connection, &sock.socket, &msock.socket))
983 			break;
984 
985 retry:
986 		s = drbd_wait_for_connect(connection, &ad);
987 		if (s) {
988 			int fp = receive_first_packet(connection, s);
989 			drbd_socket_okay(&sock.socket);
990 			drbd_socket_okay(&msock.socket);
991 			switch (fp) {
992 			case P_INITIAL_DATA:
993 				if (sock.socket) {
994 					drbd_warn(connection, "initial packet S crossed\n");
995 					sock_release(sock.socket);
996 					sock.socket = s;
997 					goto randomize;
998 				}
999 				sock.socket = s;
1000 				break;
1001 			case P_INITIAL_META:
1002 				set_bit(RESOLVE_CONFLICTS, &connection->flags);
1003 				if (msock.socket) {
1004 					drbd_warn(connection, "initial packet M crossed\n");
1005 					sock_release(msock.socket);
1006 					msock.socket = s;
1007 					goto randomize;
1008 				}
1009 				msock.socket = s;
1010 				break;
1011 			default:
1012 				drbd_warn(connection, "Error receiving initial packet\n");
1013 				sock_release(s);
1014 randomize:
1015 				if (prandom_u32() & 1)
1016 					goto retry;
1017 			}
1018 		}
1019 
1020 		if (connection->cstate <= C_DISCONNECTING)
1021 			goto out_release_sockets;
1022 		if (signal_pending(current)) {
1023 			flush_signals(current);
1024 			smp_rmb();
1025 			if (get_t_state(&connection->receiver) == EXITING)
1026 				goto out_release_sockets;
1027 		}
1028 
1029 		ok = connection_established(connection, &sock.socket, &msock.socket);
1030 	} while (!ok);
1031 
1032 	if (ad.s_listen)
1033 		sock_release(ad.s_listen);
1034 
1035 	sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1036 	msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1037 
1038 	sock.socket->sk->sk_allocation = GFP_NOIO;
1039 	msock.socket->sk->sk_allocation = GFP_NOIO;
1040 
1041 	sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1042 	msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1043 
1044 	/* NOT YET ...
1045 	 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1046 	 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1047 	 * first set it to the P_CONNECTION_FEATURES timeout,
1048 	 * which we set to 4x the configured ping_timeout. */
1049 	rcu_read_lock();
1050 	nc = rcu_dereference(connection->net_conf);
1051 
1052 	sock.socket->sk->sk_sndtimeo =
1053 	sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1054 
1055 	msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1056 	timeout = nc->timeout * HZ / 10;
1057 	discard_my_data = nc->discard_my_data;
1058 	rcu_read_unlock();
1059 
1060 	msock.socket->sk->sk_sndtimeo = timeout;
1061 
1062 	/* we don't want delays.
1063 	 * we use TCP_CORK where appropriate, though */
1064 	drbd_tcp_nodelay(sock.socket);
1065 	drbd_tcp_nodelay(msock.socket);
1066 
1067 	connection->data.socket = sock.socket;
1068 	connection->meta.socket = msock.socket;
1069 	connection->last_received = jiffies;
1070 
1071 	h = drbd_do_features(connection);
1072 	if (h <= 0)
1073 		return h;
1074 
1075 	if (connection->cram_hmac_tfm) {
1076 		/* drbd_request_state(device, NS(conn, WFAuth)); */
1077 		switch (drbd_do_auth(connection)) {
1078 		case -1:
1079 			drbd_err(connection, "Authentication of peer failed\n");
1080 			return -1;
1081 		case 0:
1082 			drbd_err(connection, "Authentication of peer failed, trying again.\n");
1083 			return 0;
1084 		}
1085 	}
1086 
1087 	connection->data.socket->sk->sk_sndtimeo = timeout;
1088 	connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1089 
1090 	if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1091 		return -1;
1092 
1093 	/* Prevent a race between resync-handshake and
1094 	 * being promoted to Primary.
1095 	 *
1096 	 * Grab and release the state mutex, so we know that any current
1097 	 * drbd_set_role() is finished, and any incoming drbd_set_role
1098 	 * will see the STATE_SENT flag, and wait for it to be cleared.
1099 	 */
1100 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1101 		mutex_lock(peer_device->device->state_mutex);
1102 
1103 	set_bit(STATE_SENT, &connection->flags);
1104 
1105 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1106 		mutex_unlock(peer_device->device->state_mutex);
1107 
1108 	rcu_read_lock();
1109 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1110 		struct drbd_device *device = peer_device->device;
1111 		kref_get(&device->kref);
1112 		rcu_read_unlock();
1113 
1114 		if (discard_my_data)
1115 			set_bit(DISCARD_MY_DATA, &device->flags);
1116 		else
1117 			clear_bit(DISCARD_MY_DATA, &device->flags);
1118 
1119 		drbd_connected(peer_device);
1120 		kref_put(&device->kref, drbd_destroy_device);
1121 		rcu_read_lock();
1122 	}
1123 	rcu_read_unlock();
1124 
1125 	rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1126 	if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1127 		clear_bit(STATE_SENT, &connection->flags);
1128 		return 0;
1129 	}
1130 
1131 	drbd_thread_start(&connection->ack_receiver);
1132 	/* opencoded create_singlethread_workqueue(),
1133 	 * to be able to use format string arguments */
1134 	connection->ack_sender =
1135 		alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
1136 	if (!connection->ack_sender) {
1137 		drbd_err(connection, "Failed to create workqueue ack_sender\n");
1138 		return 0;
1139 	}
1140 
1141 	mutex_lock(&connection->resource->conf_update);
1142 	/* The discard_my_data flag is a single-shot modifier to the next
1143 	 * connection attempt, the handshake of which is now well underway.
1144 	 * No need for rcu style copying of the whole struct
1145 	 * just to clear a single value. */
1146 	connection->net_conf->discard_my_data = 0;
1147 	mutex_unlock(&connection->resource->conf_update);
1148 
1149 	return h;
1150 
1151 out_release_sockets:
1152 	if (ad.s_listen)
1153 		sock_release(ad.s_listen);
1154 	if (sock.socket)
1155 		sock_release(sock.socket);
1156 	if (msock.socket)
1157 		sock_release(msock.socket);
1158 	return -1;
1159 }
1160 
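/* Decode one of the three on-the-wire header formats (protocol 100, 95, 80)
 * into @pi.  The payload, if any, starts right after the header. */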
1161 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1162 {
1163 	unsigned int header_size = drbd_header_size(connection);
1164 
1165 	if (header_size == sizeof(struct p_header100) &&
1166 	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1167 		struct p_header100 *h = header;
1168 		if (h->pad != 0) {
1169 			drbd_err(connection, "Header padding is not zero\n");
1170 			return -EINVAL;
1171 		}
1172 		pi->vnr = be16_to_cpu(h->volume);
1173 		pi->cmd = be16_to_cpu(h->command);
1174 		pi->size = be32_to_cpu(h->length);
1175 	} else if (header_size == sizeof(struct p_header95) &&
1176 		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1177 		struct p_header95 *h = header;
1178 		pi->cmd = be16_to_cpu(h->command);
1179 		pi->size = be32_to_cpu(h->length);
1180 		pi->vnr = 0;
1181 	} else if (header_size == sizeof(struct p_header80) &&
1182 		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1183 		struct p_header80 *h = header;
1184 		pi->cmd = be16_to_cpu(h->command);
1185 		pi->size = be16_to_cpu(h->length);
1186 		pi->vnr = 0;
1187 	} else {
1188 		drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1189 			 be32_to_cpu(*(__be32 *)header),
1190 			 connection->agreed_pro_version);
1191 		return -EINVAL;
1192 	}
1193 	pi->data = header + header_size;
1194 	return 0;
1195 }
1196 
1197 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1198 {
1199 	void *buffer = connection->data.rbuf;
1200 	int err;
1201 
1202 	err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1203 	if (err)
1204 		return err;
1205 
1206 	err = decode_header(connection, buffer, pi);
1207 	connection->last_received = jiffies;
1208 
1209 	return err;
1210 }
1211 
1212 /* This is blkdev_issue_flush, but asynchronous.
1213  * We want to submit to all component volumes in parallel,
1214  * then wait for all completions.
1215  */
1216 struct issue_flush_context {
1217 	atomic_t pending;
1218 	int error;
1219 	struct completion done;
1220 };
1221 struct one_flush_context {
1222 	struct drbd_device *device;
1223 	struct issue_flush_context *ctx;
1224 };
1225 
1226 void one_flush_endio(struct bio *bio)
1227 {
1228 	struct one_flush_context *octx = bio->bi_private;
1229 	struct drbd_device *device = octx->device;
1230 	struct issue_flush_context *ctx = octx->ctx;
1231 
1232 	if (bio->bi_error) {
1233 		ctx->error = bio->bi_error;
1234 		drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_error);
1235 	}
1236 	kfree(octx);
1237 	bio_put(bio);
1238 
1239 	clear_bit(FLUSH_PENDING, &device->flags);
1240 	put_ldev(device);
1241 	kref_put(&device->kref, drbd_destroy_device);
1242 
1243 	if (atomic_dec_and_test(&ctx->pending))
1244 		complete(&ctx->done);
1245 }
1246 
1247 static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
1248 {
1249 	struct bio *bio = bio_alloc(GFP_NOIO, 0);
1250 	struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);
1251 	if (!bio || !octx) {
1252 		drbd_warn(device, "Could not allocate a bio, CANNOT ISSUE FLUSH\n");
1253 		/* FIXME: what else can I do now?  disconnecting or detaching
1254 		 * really does not help to improve the state of the world, either.
1255 		 */
1256 		kfree(octx);
1257 		if (bio)
1258 			bio_put(bio);
1259 
1260 		ctx->error = -ENOMEM;
1261 		put_ldev(device);
1262 		kref_put(&device->kref, drbd_destroy_device);
1263 		return;
1264 	}
1265 
1266 	octx->device = device;
1267 	octx->ctx = ctx;
1268 	bio->bi_bdev = device->ldev->backing_bdev;
1269 	bio->bi_private = octx;
1270 	bio->bi_end_io = one_flush_endio;
1271 	bio->bi_opf = REQ_OP_FLUSH | REQ_PREFLUSH;
1272 
1273 	device->flush_jif = jiffies;
1274 	set_bit(FLUSH_PENDING, &device->flags);
1275 	atomic_inc(&ctx->pending);
1276 	submit_bio(bio);
1277 }
1278 
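/* If the "flush" write ordering method is in use, issue a flush to the
 * backing device of every attached volume of this connection (in parallel,
 * see submit_one_flush()) and wait for all of them to complete.  On error,
 * fall back to the "drain" write ordering method. */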
1279 static void drbd_flush(struct drbd_connection *connection)
1280 {
1281 	if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1282 		struct drbd_peer_device *peer_device;
1283 		struct issue_flush_context ctx;
1284 		int vnr;
1285 
1286 		atomic_set(&ctx.pending, 1);
1287 		ctx.error = 0;
1288 		init_completion(&ctx.done);
1289 
1290 		rcu_read_lock();
1291 		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1292 			struct drbd_device *device = peer_device->device;
1293 
1294 			if (!get_ldev(device))
1295 				continue;
1296 			kref_get(&device->kref);
1297 			rcu_read_unlock();
1298 
1299 			submit_one_flush(device, &ctx);
1300 
1301 			rcu_read_lock();
1302 		}
1303 		rcu_read_unlock();
1304 
1305 		/* Do we want to add a timeout,
1306 		 * if disk-timeout is set? */
1307 		if (!atomic_dec_and_test(&ctx.pending))
1308 			wait_for_completion(&ctx.done);
1309 
1310 		if (ctx.error) {
1311 			/* would rather check on EOPNOTSUPP, but that is not reliable.
1312 			 * don't try again for ANY return value != 0
1313 			 * if (rv == -EOPNOTSUPP) */
1314 			/* Any error is already reported by bio_endio callback. */
1315 			drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1316 		}
1317 	}
1318 }
1319 
1320 /**
1321  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @connection:	DRBD connection.
1323  * @epoch:	Epoch object.
1324  * @ev:		Epoch event.
1325  */
1326 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1327 					       struct drbd_epoch *epoch,
1328 					       enum epoch_event ev)
1329 {
1330 	int epoch_size;
1331 	struct drbd_epoch *next_epoch;
1332 	enum finish_epoch rv = FE_STILL_LIVE;
1333 
1334 	spin_lock(&connection->epoch_lock);
1335 	do {
1336 		next_epoch = NULL;
1337 
1338 		epoch_size = atomic_read(&epoch->epoch_size);
1339 
1340 		switch (ev & ~EV_CLEANUP) {
1341 		case EV_PUT:
1342 			atomic_dec(&epoch->active);
1343 			break;
1344 		case EV_GOT_BARRIER_NR:
1345 			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1346 			break;
1347 		case EV_BECAME_LAST:
			/* nothing to do */
1349 			break;
1350 		}
1351 
1352 		if (epoch_size != 0 &&
1353 		    atomic_read(&epoch->active) == 0 &&
1354 		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1355 			if (!(ev & EV_CLEANUP)) {
1356 				spin_unlock(&connection->epoch_lock);
1357 				drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1358 				spin_lock(&connection->epoch_lock);
1359 			}
1360 #if 0
1361 			/* FIXME: dec unacked on connection, once we have
1362 			 * something to count pending connection packets in. */
1363 			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1364 				dec_unacked(epoch->connection);
1365 #endif
1366 
1367 			if (connection->current_epoch != epoch) {
1368 				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1369 				list_del(&epoch->list);
1370 				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1371 				connection->epochs--;
1372 				kfree(epoch);
1373 
1374 				if (rv == FE_STILL_LIVE)
1375 					rv = FE_DESTROYED;
1376 			} else {
1377 				epoch->flags = 0;
1378 				atomic_set(&epoch->epoch_size, 0);
1379 				/* atomic_set(&epoch->active, 0); is already zero */
1380 				if (rv == FE_STILL_LIVE)
1381 					rv = FE_RECYCLED;
1382 			}
1383 		}
1384 
1385 		if (!next_epoch)
1386 			break;
1387 
1388 		epoch = next_epoch;
1389 	} while (1);
1390 
1391 	spin_unlock(&connection->epoch_lock);
1392 
1393 	return rv;
1394 }
1395 
1396 static enum write_ordering_e
1397 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1398 {
1399 	struct disk_conf *dc;
1400 
1401 	dc = rcu_dereference(bdev->disk_conf);
1402 
1403 	if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1404 		wo = WO_DRAIN_IO;
1405 	if (wo == WO_DRAIN_IO && !dc->disk_drain)
1406 		wo = WO_NONE;
1407 
1408 	return wo;
1409 }
1410 
1411 /**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @resource:	DRBD resource.
 * @bdev:	backing device to also take into account; may be NULL.
1414  * @wo:		Write ordering method to try.
1415  */
1416 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1417 			      enum write_ordering_e wo)
1418 {
1419 	struct drbd_device *device;
1420 	enum write_ordering_e pwo;
1421 	int vnr;
1422 	static char *write_ordering_str[] = {
1423 		[WO_NONE] = "none",
1424 		[WO_DRAIN_IO] = "drain",
1425 		[WO_BDEV_FLUSH] = "flush",
1426 	};
1427 
1428 	pwo = resource->write_ordering;
1429 	if (wo != WO_BDEV_FLUSH)
1430 		wo = min(pwo, wo);
1431 	rcu_read_lock();
1432 	idr_for_each_entry(&resource->devices, device, vnr) {
1433 		if (get_ldev(device)) {
1434 			wo = max_allowed_wo(device->ldev, wo);
1435 			if (device->ldev == bdev)
1436 				bdev = NULL;
1437 			put_ldev(device);
1438 		}
1439 	}
1440 
1441 	if (bdev)
1442 		wo = max_allowed_wo(bdev, wo);
1443 
1444 	rcu_read_unlock();
1445 
1446 	resource->write_ordering = wo;
1447 	if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1448 		drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1449 }
1450 
1451 /*
1452  * We *may* ignore the discard-zeroes-data setting, if so configured.
1453  *
 * Assumption is that "discard_zeroes_data=0" is only set because the backend
1455  * may ignore partial unaligned discards.
1456  *
1457  * LVM/DM thin as of at least
1458  *   LVM version:     2.02.115(2)-RHEL7 (2015-01-28)
1459  *   Library version: 1.02.93-RHEL7 (2015-01-28)
1460  *   Driver version:  4.29.0
1461  * still behaves this way.
1462  *
1463  * For unaligned (wrt. alignment and granularity) or too small discards,
 * we zero out the initial and/or trailing unaligned partial chunks,
1465  * but discard all the aligned full chunks.
1466  *
1467  * At least for LVM/DM thin, the result is effectively "discard_zeroes_data=1".
1468  */
1469 int drbd_issue_discard_or_zero_out(struct drbd_device *device, sector_t start, unsigned int nr_sectors, bool discard)
1470 {
1471 	struct block_device *bdev = device->ldev->backing_bdev;
1472 	struct request_queue *q = bdev_get_queue(bdev);
1473 	sector_t tmp, nr;
1474 	unsigned int max_discard_sectors, granularity;
1475 	int alignment;
1476 	int err = 0;
1477 
1478 	if (!discard)
1479 		goto zero_out;
1480 
1481 	/* Zero-sector (unknown) and one-sector granularities are the same.  */
1482 	granularity = max(q->limits.discard_granularity >> 9, 1U);
1483 	alignment = (bdev_discard_alignment(bdev) >> 9) % granularity;
1484 
1485 	max_discard_sectors = min(q->limits.max_discard_sectors, (1U << 22));
1486 	max_discard_sectors -= max_discard_sectors % granularity;
1487 	if (unlikely(!max_discard_sectors))
1488 		goto zero_out;
1489 
1490 	if (nr_sectors < granularity)
1491 		goto zero_out;
1492 
1493 	tmp = start;
1494 	if (sector_div(tmp, granularity) != alignment) {
1495 		if (nr_sectors < 2*granularity)
1496 			goto zero_out;
1497 		/* start + gran - (start + gran - align) % gran */
1498 		tmp = start + granularity - alignment;
1499 		tmp = start + granularity - sector_div(tmp, granularity);
1500 
1501 		nr = tmp - start;
1502 		err |= blkdev_issue_zeroout(bdev, start, nr, GFP_NOIO, 0);
1503 		nr_sectors -= nr;
1504 		start = tmp;
1505 	}
1506 	while (nr_sectors >= granularity) {
1507 		nr = min_t(sector_t, nr_sectors, max_discard_sectors);
1508 		err |= blkdev_issue_discard(bdev, start, nr, GFP_NOIO, 0);
1509 		nr_sectors -= nr;
1510 		start += nr;
1511 	}
1512  zero_out:
1513 	if (nr_sectors) {
1514 		err |= blkdev_issue_zeroout(bdev, start, nr_sectors, GFP_NOIO, 0);
1515 	}
1516 	return err != 0;
1517 }
1518 
1519 static bool can_do_reliable_discards(struct drbd_device *device)
1520 {
1521 	struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
1522 	struct disk_conf *dc;
1523 	bool can_do;
1524 
1525 	if (!blk_queue_discard(q))
1526 		return false;
1527 
1528 	if (q->limits.discard_zeroes_data)
1529 		return true;
1530 
1531 	rcu_read_lock();
1532 	dc = rcu_dereference(device->ldev->disk_conf);
1533 	can_do = dc->discard_zeroes_if_aligned;
1534 	rcu_read_unlock();
1535 	return can_do;
1536 }
1537 
1538 static void drbd_issue_peer_discard(struct drbd_device *device, struct drbd_peer_request *peer_req)
1539 {
1540 	/* If the backend cannot discard, or does not guarantee
1541 	 * read-back zeroes in discarded ranges, we fall back to
1542 	 * zero-out.  Unless configuration specifically requested
1543 	 * otherwise. */
1544 	if (!can_do_reliable_discards(device))
1545 		peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT;
1546 
1547 	if (drbd_issue_discard_or_zero_out(device, peer_req->i.sector,
1548 	    peer_req->i.size >> 9, !(peer_req->flags & EE_IS_TRIM_USE_ZEROOUT)))
1549 		peer_req->flags |= EE_WAS_ERROR;
1550 	drbd_endio_write_sec_final(peer_req);
1551 }
1552 
1553 static void drbd_issue_peer_wsame(struct drbd_device *device,
1554 				  struct drbd_peer_request *peer_req)
1555 {
1556 	struct block_device *bdev = device->ldev->backing_bdev;
1557 	sector_t s = peer_req->i.sector;
1558 	sector_t nr = peer_req->i.size >> 9;
1559 	if (blkdev_issue_write_same(bdev, s, nr, GFP_NOIO, peer_req->pages))
1560 		peer_req->flags |= EE_WAS_ERROR;
1561 	drbd_endio_write_sec_final(peer_req);
1562 }
1563 
1564 
1565 /**
 * drbd_submit_peer_request() - submit the bios for a peer request to the local disk
 * @device:	DRBD device.
 * @peer_req:	peer request
 * @op:		REQ_OP_* operation
 * @op_flags:	additional flag bits, see bio->bi_opf
 * @fault_type:	fault injection class, passed on to drbd_generic_make_request()
1570  *
1571  * May spread the pages to multiple bios,
1572  * depending on bio_add_page restrictions.
1573  *
1574  * Returns 0 if all bios have been submitted,
1575  * -ENOMEM if we could not allocate enough bios,
1576  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1577  *  single page to an empty bio (which should never happen and likely indicates
1578  *  that the lower level IO stack is in some way broken). This has been observed
1579  *  on certain Xen deployments.
1580  */
1581 /* TODO allocate from our own bio_set. */
1582 int drbd_submit_peer_request(struct drbd_device *device,
1583 			     struct drbd_peer_request *peer_req,
1584 			     const unsigned op, const unsigned op_flags,
1585 			     const int fault_type)
1586 {
1587 	struct bio *bios = NULL;
1588 	struct bio *bio;
1589 	struct page *page = peer_req->pages;
1590 	sector_t sector = peer_req->i.sector;
1591 	unsigned data_size = peer_req->i.size;
1592 	unsigned n_bios = 0;
1593 	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1594 	int err = -ENOMEM;
1595 
1596 	/* TRIM/DISCARD: for now, always use the helper function
1597 	 * blkdev_issue_zeroout(..., discard=true).
1598 	 * It's synchronous, but it does the right thing wrt. bio splitting.
1599 	 * Correctness first, performance later.  Next step is to code an
1600 	 * asynchronous variant of the same.
1601 	 */
1602 	if (peer_req->flags & (EE_IS_TRIM|EE_WRITE_SAME)) {
1603 		/* wait for all pending IO completions, before we start
1604 		 * zeroing things out. */
1605 		conn_wait_active_ee_empty(peer_req->peer_device->connection);
1606 		/* add it to the active list now,
1607 		 * so we can find it to present it in debugfs */
1608 		peer_req->submit_jif = jiffies;
1609 		peer_req->flags |= EE_SUBMITTED;
1610 
1611 		/* If this was a resync request from receive_rs_deallocated(),
1612 		 * it is already on the sync_ee list */
1613 		if (list_empty(&peer_req->w.list)) {
1614 			spin_lock_irq(&device->resource->req_lock);
1615 			list_add_tail(&peer_req->w.list, &device->active_ee);
1616 			spin_unlock_irq(&device->resource->req_lock);
1617 		}
1618 
1619 		if (peer_req->flags & EE_IS_TRIM)
1620 			drbd_issue_peer_discard(device, peer_req);
1621 		else /* EE_WRITE_SAME */
1622 			drbd_issue_peer_wsame(device, peer_req);
1623 		return 0;
1624 	}
1625 
1626 	/* In most cases, we will only need one bio.  But in case the lower
1627 	 * level restrictions happen to be different at this offset on this
1628 	 * side than those of the sending peer, we may need to submit the
1629 	 * request in more than one bio.
1630 	 *
1631 	 * Plain bio_alloc is good enough here, this is no DRBD internally
1632 	 * generated bio, but a bio allocated on behalf of the peer.
1633 	 */
1634 next_bio:
1635 	bio = bio_alloc(GFP_NOIO, nr_pages);
1636 	if (!bio) {
1637 		drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1638 		goto fail;
1639 	}
1640 	/* > peer_req->i.sector, unless this is the first bio */
1641 	bio->bi_iter.bi_sector = sector;
1642 	bio->bi_bdev = device->ldev->backing_bdev;
1643 	bio_set_op_attrs(bio, op, op_flags);
1644 	bio->bi_private = peer_req;
1645 	bio->bi_end_io = drbd_peer_request_endio;
1646 
1647 	bio->bi_next = bios;
1648 	bios = bio;
1649 	++n_bios;
1650 
1651 	page_chain_for_each(page) {
1652 		unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1653 		if (!bio_add_page(bio, page, len, 0))
1654 			goto next_bio;
1655 		data_size -= len;
1656 		sector += len >> 9;
1657 		--nr_pages;
1658 	}
1659 	D_ASSERT(device, data_size == 0);
1660 	D_ASSERT(device, page == NULL);
1661 
1662 	atomic_set(&peer_req->pending_bios, n_bios);
1663 	/* for debugfs: update timestamp, mark as submitted */
1664 	peer_req->submit_jif = jiffies;
1665 	peer_req->flags |= EE_SUBMITTED;
1666 	do {
1667 		bio = bios;
1668 		bios = bios->bi_next;
1669 		bio->bi_next = NULL;
1670 
1671 		drbd_generic_make_request(device, fault_type, bio);
1672 	} while (bios);
1673 	return 0;
1674 
1675 fail:
1676 	while (bios) {
1677 		bio = bios;
1678 		bios = bios->bi_next;
1679 		bio_put(bio);
1680 	}
1681 	return err;
1682 }
1683 
1684 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1685 					     struct drbd_peer_request *peer_req)
1686 {
1687 	struct drbd_interval *i = &peer_req->i;
1688 
1689 	drbd_remove_interval(&device->write_requests, i);
1690 	drbd_clear_interval(i);
1691 
1692 	/* Wake up any processes waiting for this peer request to complete.  */
1693 	if (i->waiting)
1694 		wake_up(&device->misc_wait);
1695 }
1696 
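/* Wait until the active_ee list of every volume of this connection is empty,
 * i.e. all peer write requests currently submitted to the backing devices
 * have completed. */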
1697 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1698 {
1699 	struct drbd_peer_device *peer_device;
1700 	int vnr;
1701 
1702 	rcu_read_lock();
1703 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1704 		struct drbd_device *device = peer_device->device;
1705 
1706 		kref_get(&device->kref);
1707 		rcu_read_unlock();
1708 		drbd_wait_ee_list_empty(device, &device->active_ee);
1709 		kref_put(&device->kref, drbd_destroy_device);
1710 		rcu_read_lock();
1711 	}
1712 	rcu_read_unlock();
1713 }
1714 
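/* Handle P_BARRIER from the peer: record the barrier number on the current
 * epoch and, depending on the configured write ordering method, drain or
 * flush before starting a new epoch. */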
1715 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1716 {
1717 	int rv;
1718 	struct p_barrier *p = pi->data;
1719 	struct drbd_epoch *epoch;
1720 
1721 	/* FIXME these are unacked on connection,
1722 	 * not a specific (peer)device.
1723 	 */
1724 	connection->current_epoch->barrier_nr = p->barrier;
1725 	connection->current_epoch->connection = connection;
1726 	rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1727 
1728 	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1729 	 * the activity log, which means it would not be resynced in case the
1730 	 * R_PRIMARY crashes now.
1731 	 * Therefore we must send the barrier_ack after the barrier request was
1732 	 * completed. */
1733 	switch (connection->resource->write_ordering) {
1734 	case WO_NONE:
1735 		if (rv == FE_RECYCLED)
1736 			return 0;
1737 
1738 		/* receiver context, in the writeout path of the other node.
1739 		 * avoid potential distributed deadlock */
1740 		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1741 		if (epoch)
1742 			break;
1743 		else
1744 			drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1745 			/* Fall through */
1746 
1747 	case WO_BDEV_FLUSH:
1748 	case WO_DRAIN_IO:
1749 		conn_wait_active_ee_empty(connection);
1750 		drbd_flush(connection);
1751 
1752 		if (atomic_read(&connection->current_epoch->epoch_size)) {
1753 			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1754 			if (epoch)
1755 				break;
1756 		}
1757 
1758 		return 0;
1759 	default:
1760 		drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1761 			 connection->resource->write_ordering);
1762 		return -EIO;
1763 	}
1764 
1765 	epoch->flags = 0;
1766 	atomic_set(&epoch->epoch_size, 0);
1767 	atomic_set(&epoch->active, 0);
1768 
1769 	spin_lock(&connection->epoch_lock);
1770 	if (atomic_read(&connection->current_epoch->epoch_size)) {
1771 		list_add(&epoch->list, &connection->current_epoch->list);
1772 		connection->current_epoch = epoch;
1773 		connection->epochs++;
1774 	} else {
1775 		/* The current_epoch got recycled while we allocated this one... */
1776 		kfree(epoch);
1777 	}
1778 	spin_unlock(&connection->epoch_lock);
1779 
1780 	return 0;
1781 }
1782 
1783 /* quick wrapper in case payload size != request_size (write same) */
1784 static void drbd_csum_ee_size(struct crypto_ahash *h,
1785 			      struct drbd_peer_request *r, void *d,
1786 			      unsigned int payload_size)
1787 {
1788 	unsigned int tmp = r->i.size;
1789 	r->i.size = payload_size;
1790 	drbd_csum_ee(h, r, d);
1791 	r->i.size = tmp;
1792 }
1793 
1794 /* used from receive_RSDataReply (recv_resync_read)
1795  * and from receive_Data.
1796  * data_size: actual payload ("data in")
1797  * 	for normal writes that is bi_size.
1798  * 	for discards, that is zero.
1799  * 	for write same, it is logical_block_size.
1800  * both trim and write same have the bi_size ("data len to be affected")
1801  * as extra argument in the packet header.
1802  */
1803 static struct drbd_peer_request *
1804 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1805 	      struct packet_info *pi) __must_hold(local)
1806 {
1807 	struct drbd_device *device = peer_device->device;
1808 	const sector_t capacity = drbd_get_capacity(device->this_bdev);
1809 	struct drbd_peer_request *peer_req;
1810 	struct page *page;
1811 	int digest_size, err;
1812 	unsigned int data_size = pi->size, ds;
1813 	void *dig_in = peer_device->connection->int_dig_in;
1814 	void *dig_vv = peer_device->connection->int_dig_vv;
1815 	unsigned long *data;
1816 	struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1817 	struct p_trim *wsame = (pi->cmd == P_WSAME) ? pi->data : NULL;
1818 
1819 	digest_size = 0;
1820 	if (!trim && peer_device->connection->peer_integrity_tfm) {
1821 		digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1822 		/*
1823 		 * FIXME: Receive the incoming digest into the receive buffer
1824 		 *	  here, together with its struct p_data?
1825 		 */
1826 		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1827 		if (err)
1828 			return NULL;
1829 		data_size -= digest_size;
1830 	}
1831 
1832 	/* assume request_size == data_size, but special case trim and wsame. */
1833 	ds = data_size;
1834 	if (trim) {
1835 		if (!expect(data_size == 0))
1836 			return NULL;
1837 		ds = be32_to_cpu(trim->size);
1838 	} else if (wsame) {
1839 		if (data_size != queue_logical_block_size(device->rq_queue)) {
1840 			drbd_err(peer_device, "data size (%u) != drbd logical block size (%u)\n",
1841 				data_size, queue_logical_block_size(device->rq_queue));
1842 			return NULL;
1843 		}
1844 		if (data_size != bdev_logical_block_size(device->ldev->backing_bdev)) {
1845 			drbd_err(peer_device, "data size (%u) != backend logical block size (%u)\n",
1846 				data_size, bdev_logical_block_size(device->ldev->backing_bdev));
1847 			return NULL;
1848 		}
1849 		ds = be32_to_cpu(wsame->size);
1850 	}
1851 
1852 	if (!expect(IS_ALIGNED(ds, 512)))
1853 		return NULL;
1854 	if (trim || wsame) {
1855 		if (!expect(ds <= (DRBD_MAX_BBIO_SECTORS << 9)))
1856 			return NULL;
1857 	} else if (!expect(ds <= DRBD_MAX_BIO_SIZE))
1858 		return NULL;
1859 
1860 	/* even though we trust our peer,
1861 	 * we sometimes have to double check. */
1862 	if (sector + (ds>>9) > capacity) {
1863 		drbd_err(device, "request from peer beyond end of local disk: "
1864 			"capacity: %llus < sector: %llus + size: %u\n",
1865 			(unsigned long long)capacity,
1866 			(unsigned long long)sector, ds);
1867 		return NULL;
1868 	}
1869 
1870 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1871 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1872 	 * which in turn might block on the other node at this very place.  */
1873 	peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO);
1874 	if (!peer_req)
1875 		return NULL;
1876 
1877 	peer_req->flags |= EE_WRITE;
1878 	if (trim) {
1879 		peer_req->flags |= EE_IS_TRIM;
1880 		return peer_req;
1881 	}
1882 	if (wsame)
1883 		peer_req->flags |= EE_WRITE_SAME;
1884 
1885 	/* receive payload size bytes into page chain */
1886 	ds = data_size;
1887 	page = peer_req->pages;
1888 	page_chain_for_each(page) {
1889 		unsigned len = min_t(int, ds, PAGE_SIZE);
1890 		data = kmap(page);
1891 		err = drbd_recv_all_warn(peer_device->connection, data, len);
1892 		if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1893 			drbd_err(device, "Fault injection: Corrupting data on receive\n");
1894 			data[0] = data[0] ^ (unsigned long)-1;
1895 		}
1896 		kunmap(page);
1897 		if (err) {
1898 			drbd_free_peer_req(device, peer_req);
1899 			return NULL;
1900 		}
1901 		ds -= len;
1902 	}
1903 
1904 	if (digest_size) {
1905 		drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size);
1906 		if (memcmp(dig_in, dig_vv, digest_size)) {
1907 			drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1908 				(unsigned long long)sector, data_size);
1909 			drbd_free_peer_req(device, peer_req);
1910 			return NULL;
1911 		}
1912 	}
1913 	device->recv_cnt += data_size >> 9;
1914 	return peer_req;
1915 }
1916 
1917 /* drbd_drain_block() just takes a data block
1918  * out of the socket input buffer, and discards it.
1919  */
1920 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1921 {
1922 	struct page *page;
1923 	int err = 0;
1924 	void *data;
1925 
1926 	if (!data_size)
1927 		return 0;
1928 
1929 	page = drbd_alloc_pages(peer_device, 1, 1);
1930 
1931 	data = kmap(page);
1932 	while (data_size) {
1933 		unsigned int len = min_t(int, data_size, PAGE_SIZE);
1934 
1935 		err = drbd_recv_all_warn(peer_device->connection, data, len);
1936 		if (err)
1937 			break;
1938 		data_size -= len;
1939 	}
1940 	kunmap(page);
1941 	drbd_free_pages(peer_device->device, page, 0);
1942 	return err;
1943 }
1944 
1945 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1946 			   sector_t sector, int data_size)
1947 {
1948 	struct bio_vec bvec;
1949 	struct bvec_iter iter;
1950 	struct bio *bio;
1951 	int digest_size, err, expect;
1952 	void *dig_in = peer_device->connection->int_dig_in;
1953 	void *dig_vv = peer_device->connection->int_dig_vv;
1954 
1955 	digest_size = 0;
1956 	if (peer_device->connection->peer_integrity_tfm) {
1957 		digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1958 		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1959 		if (err)
1960 			return err;
1961 		data_size -= digest_size;
1962 	}
1963 
1964 	/* optimistically update recv_cnt.  if receiving fails below,
1965 	 * we disconnect anyways, and counters will be reset. */
1966 	peer_device->device->recv_cnt += data_size>>9;
1967 
1968 	bio = req->master_bio;
1969 	D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1970 
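	/* Receive the payload directly into the pages of the original read
	 * request's bio, one segment at a time. */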
1971 	bio_for_each_segment(bvec, bio, iter) {
1972 		void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1973 		expect = min_t(int, data_size, bvec.bv_len);
1974 		err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1975 		kunmap(bvec.bv_page);
1976 		if (err)
1977 			return err;
1978 		data_size -= expect;
1979 	}
1980 
1981 	if (digest_size) {
1982 		drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1983 		if (memcmp(dig_in, dig_vv, digest_size)) {
1984 			drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1985 			return -EINVAL;
1986 		}
1987 	}
1988 
1989 	D_ASSERT(peer_device->device, data_size == 0);
1990 	return 0;
1991 }
1992 
1993 /*
1994  * e_end_resync_block() is called in ack_sender context via
1995  * drbd_finish_peer_reqs().
1996  */
1997 static int e_end_resync_block(struct drbd_work *w, int unused)
1998 {
1999 	struct drbd_peer_request *peer_req =
2000 		container_of(w, struct drbd_peer_request, w);
2001 	struct drbd_peer_device *peer_device = peer_req->peer_device;
2002 	struct drbd_device *device = peer_device->device;
2003 	sector_t sector = peer_req->i.sector;
2004 	int err;
2005 
2006 	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2007 
2008 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2009 		drbd_set_in_sync(device, sector, peer_req->i.size);
2010 		err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
2011 	} else {
2012 		/* Record failure to sync */
2013 		drbd_rs_failed_io(device, sector, peer_req->i.size);
2014 
2015 		err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2016 	}
2017 	dec_unacked(device);
2018 
2019 	return err;
2020 }
2021 
2022 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
2023 			    struct packet_info *pi) __releases(local)
2024 {
2025 	struct drbd_device *device = peer_device->device;
2026 	struct drbd_peer_request *peer_req;
2027 
2028 	peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
2029 	if (!peer_req)
2030 		goto fail;
2031 
2032 	dec_rs_pending(device);
2033 
2034 	inc_unacked(device);
2035 	/* corresponding dec_unacked() in e_end_resync_block()
2036 	 * respective _drbd_clear_done_ee */
2037 
2038 	peer_req->w.cb = e_end_resync_block;
2039 	peer_req->submit_jif = jiffies;
2040 
2041 	spin_lock_irq(&device->resource->req_lock);
2042 	list_add_tail(&peer_req->w.list, &device->sync_ee);
2043 	spin_unlock_irq(&device->resource->req_lock);
2044 
2045 	atomic_add(pi->size >> 9, &device->rs_sect_ev);
2046 	if (drbd_submit_peer_request(device, peer_req, REQ_OP_WRITE, 0,
2047 				     DRBD_FAULT_RS_WR) == 0)
2048 		return 0;
2049 
2050 	/* don't care for the reason here */
2051 	drbd_err(device, "submit failed, triggering re-connect\n");
2052 	spin_lock_irq(&device->resource->req_lock);
2053 	list_del(&peer_req->w.list);
2054 	spin_unlock_irq(&device->resource->req_lock);
2055 
2056 	drbd_free_peer_req(device, peer_req);
2057 fail:
2058 	put_ldev(device);
2059 	return -EIO;
2060 }
2061 
2062 static struct drbd_request *
2063 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
2064 	     sector_t sector, bool missing_ok, const char *func)
2065 {
2066 	struct drbd_request *req;
2067 
2068 	/* Request object according to our peer */
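	/* The peer echoed back the pointer we sent as block_id; only trust it
	 * after drbd_contains_interval() confirms this exact interval is still
	 * registered in the tree for the given sector. */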
2069 	req = (struct drbd_request *)(unsigned long)id;
2070 	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
2071 		return req;
2072 	if (!missing_ok) {
2073 		drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
2074 			(unsigned long)id, (unsigned long long)sector);
2075 	}
2076 	return NULL;
2077 }
2078 
2079 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
2080 {
2081 	struct drbd_peer_device *peer_device;
2082 	struct drbd_device *device;
2083 	struct drbd_request *req;
2084 	sector_t sector;
2085 	int err;
2086 	struct p_data *p = pi->data;
2087 
2088 	peer_device = conn_peer_device(connection, pi->vnr);
2089 	if (!peer_device)
2090 		return -EIO;
2091 	device = peer_device->device;
2092 
2093 	sector = be64_to_cpu(p->sector);
2094 
2095 	spin_lock_irq(&device->resource->req_lock);
2096 	req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
2097 	spin_unlock_irq(&device->resource->req_lock);
2098 	if (unlikely(!req))
2099 		return -EIO;
2100 
2101 	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
2102 	 * special casing it there for the various failure cases.
2103 	 * still no race with drbd_fail_pending_reads */
2104 	err = recv_dless_read(peer_device, req, sector, pi->size);
2105 	if (!err)
2106 		req_mod(req, DATA_RECEIVED);
2107 	/* else: nothing. handled from drbd_disconnect...
2108 	 * I don't think we may complete this just yet
2109 	 * in case we are "on-disconnect: freeze" */
2110 
2111 	return err;
2112 }
2113 
2114 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
2115 {
2116 	struct drbd_peer_device *peer_device;
2117 	struct drbd_device *device;
2118 	sector_t sector;
2119 	int err;
2120 	struct p_data *p = pi->data;
2121 
2122 	peer_device = conn_peer_device(connection, pi->vnr);
2123 	if (!peer_device)
2124 		return -EIO;
2125 	device = peer_device->device;
2126 
2127 	sector = be64_to_cpu(p->sector);
2128 	D_ASSERT(device, p->block_id == ID_SYNCER);
2129 
2130 	if (get_ldev(device)) {
2131 		/* data is submitted to disk within recv_resync_read.
2132 		 * corresponding put_ldev done below on error,
2133 		 * or in drbd_peer_request_endio. */
2134 		err = recv_resync_read(peer_device, sector, pi);
2135 	} else {
2136 		if (__ratelimit(&drbd_ratelimit_state))
2137 			drbd_err(device, "Can not write resync data to local disk.\n");
2138 
2139 		err = drbd_drain_block(peer_device, pi->size);
2140 
2141 		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2142 	}
2143 
2144 	atomic_add(pi->size >> 9, &device->rs_sect_in);
2145 
2146 	return err;
2147 }
2148 
2149 static void restart_conflicting_writes(struct drbd_device *device,
2150 				       sector_t sector, int size)
2151 {
2152 	struct drbd_interval *i;
2153 	struct drbd_request *req;
2154 
2155 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2156 		if (!i->local)
2157 			continue;
2158 		req = container_of(i, struct drbd_request, i);
2159 		if (req->rq_state & RQ_LOCAL_PENDING ||
2160 		    !(req->rq_state & RQ_POSTPONED))
2161 			continue;
2162 		/* as it is RQ_POSTPONED, this will cause it to
2163 		 * be queued on the retry workqueue. */
2164 		__req_mod(req, CONFLICT_RESOLVED, NULL);
2165 	}
2166 }
2167 
2168 /*
2169  * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
2170  */
2171 static int e_end_block(struct drbd_work *w, int cancel)
2172 {
2173 	struct drbd_peer_request *peer_req =
2174 		container_of(w, struct drbd_peer_request, w);
2175 	struct drbd_peer_device *peer_device = peer_req->peer_device;
2176 	struct drbd_device *device = peer_device->device;
2177 	sector_t sector = peer_req->i.sector;
2178 	int err = 0, pcmd;
2179 
2180 	if (peer_req->flags & EE_SEND_WRITE_ACK) {
2181 		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
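			/* During resync (SyncSource ... PausedSyncT), a write that may
			 * set the block in sync is acked with P_RS_WRITE_ACK instead of
			 * P_WRITE_ACK, and we also mark it in sync locally below. */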
2182 			pcmd = (device->state.conn >= C_SYNC_SOURCE &&
2183 				device->state.conn <= C_PAUSED_SYNC_T &&
2184 				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
2185 				P_RS_WRITE_ACK : P_WRITE_ACK;
2186 			err = drbd_send_ack(peer_device, pcmd, peer_req);
2187 			if (pcmd == P_RS_WRITE_ACK)
2188 				drbd_set_in_sync(device, sector, peer_req->i.size);
2189 		} else {
2190 			err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2191 			/* we expect it to be marked out of sync anyways...
2192 			 * maybe assert this?  */
2193 		}
2194 		dec_unacked(device);
2195 	}
2196 
2197 	/* we delete from the conflict detection hash _after_ we sent out the
2198 	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
2199 	if (peer_req->flags & EE_IN_INTERVAL_TREE) {
2200 		spin_lock_irq(&device->resource->req_lock);
2201 		D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
2202 		drbd_remove_epoch_entry_interval(device, peer_req);
2203 		if (peer_req->flags & EE_RESTART_REQUESTS)
2204 			restart_conflicting_writes(device, sector, peer_req->i.size);
2205 		spin_unlock_irq(&device->resource->req_lock);
2206 	} else
2207 		D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2208 
2209 	drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
2210 
2211 	return err;
2212 }
2213 
2214 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2215 {
2216 	struct drbd_peer_request *peer_req =
2217 		container_of(w, struct drbd_peer_request, w);
2218 	struct drbd_peer_device *peer_device = peer_req->peer_device;
2219 	int err;
2220 
2221 	err = drbd_send_ack(peer_device, ack, peer_req);
2222 	dec_unacked(peer_device->device);
2223 
2224 	return err;
2225 }
2226 
2227 static int e_send_superseded(struct drbd_work *w, int unused)
2228 {
2229 	return e_send_ack(w, P_SUPERSEDED);
2230 }
2231 
2232 static int e_send_retry_write(struct drbd_work *w, int unused)
2233 {
2234 	struct drbd_peer_request *peer_req =
2235 		container_of(w, struct drbd_peer_request, w);
2236 	struct drbd_connection *connection = peer_req->peer_device->connection;
2237 
2238 	return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2239 			     P_RETRY_WRITE : P_SUPERSEDED);
2240 }
2241 
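/* Wrap-around safe "a is newer than b", e.g. seq_greater(2, 0xfffffffd):
 * (s32)2 - (s32)0xfffffffd == 5 > 0, so 2 counts as newer even though it is
 * numerically smaller as an unsigned value. */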
2242 static bool seq_greater(u32 a, u32 b)
2243 {
2244 	/*
2245 	 * We assume 32-bit wrap-around here.
2246 	 * For 24-bit wrap-around, we would have to shift:
2247 	 *  a <<= 8; b <<= 8;
2248 	 */
2249 	return (s32)a - (s32)b > 0;
2250 }
2251 
2252 static u32 seq_max(u32 a, u32 b)
2253 {
2254 	return seq_greater(a, b) ? a : b;
2255 }
2256 
2257 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2258 {
2259 	struct drbd_device *device = peer_device->device;
2260 	unsigned int newest_peer_seq;
2261 
2262 	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2263 		spin_lock(&device->peer_seq_lock);
2264 		newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2265 		device->peer_seq = newest_peer_seq;
2266 		spin_unlock(&device->peer_seq_lock);
2267 		/* wake up only if we actually changed device->peer_seq */
2268 		if (peer_seq == newest_peer_seq)
2269 			wake_up(&device->seq_wait);
2270 	}
2271 }
2272 
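/* s1/s2 are start sectors, l1/l2 are byte lengths (hence the >>9);
 * two ranges overlap unless one ends at or before the other starts. */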
2273 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2274 {
2275 	return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2276 }
2277 
2278 /* maybe change sync_ee into interval trees as well? */
2279 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2280 {
2281 	struct drbd_peer_request *rs_req;
2282 	bool rv = false;
2283 
2284 	spin_lock_irq(&device->resource->req_lock);
2285 	list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2286 		if (overlaps(peer_req->i.sector, peer_req->i.size,
2287 			     rs_req->i.sector, rs_req->i.size)) {
2288 			rv = true;
2289 			break;
2290 		}
2291 	}
2292 	spin_unlock_irq(&device->resource->req_lock);
2293 
2294 	return rv;
2295 }
2296 
2297 /* Called from receive_Data.
2298  * Synchronize packets on sock with packets on msock.
2299  *
2300  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2301  * packet traveling on msock, they are still processed in the order they have
2302  * been sent.
2303  *
2304  * Note: we don't care for Ack packets overtaking P_DATA packets.
2305  *
2306  * In case packet_seq is larger than device->peer_seq number, there are
2307  * outstanding packets on the msock. We wait for them to arrive.
2308  * In case we are the logically next packet, we update device->peer_seq
2309  * ourselves. Correctly handles 32bit wrap around.
2310  *
2311  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2312  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2313  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2314  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
2315  *
2316  * returns 0 if we may process the packet,
2317  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2318 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2319 {
2320 	struct drbd_device *device = peer_device->device;
2321 	DEFINE_WAIT(wait);
2322 	long timeout;
2323 	int ret = 0, tp;
2324 
2325 	if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2326 		return 0;
2327 
2328 	spin_lock(&device->peer_seq_lock);
2329 	for (;;) {
2330 		if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2331 			device->peer_seq = seq_max(device->peer_seq, peer_seq);
2332 			break;
2333 		}
2334 
2335 		if (signal_pending(current)) {
2336 			ret = -ERESTARTSYS;
2337 			break;
2338 		}
2339 
2340 		rcu_read_lock();
2341 		tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2342 		rcu_read_unlock();
2343 
2344 		if (!tp)
2345 			break;
2346 
2347 		/* Only need to wait if two_primaries is enabled */
2348 		prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2349 		spin_unlock(&device->peer_seq_lock);
2350 		rcu_read_lock();
2351 		timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2352 		rcu_read_unlock();
2353 		timeout = schedule_timeout(timeout);
2354 		spin_lock(&device->peer_seq_lock);
2355 		if (!timeout) {
2356 			ret = -ETIMEDOUT;
2357 			drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2358 			break;
2359 		}
2360 	}
2361 	spin_unlock(&device->peer_seq_lock);
2362 	finish_wait(&device->seq_wait, &wait);
2363 	return ret;
2364 }
2365 
2366 /* see also bio_flags_to_wire()
2367  * We need to map wire (DP_*) flags to bio (REQ_*) flags semantically and
2368  * back, because the peer may be running a different kernel version. */
2369 static unsigned long wire_flags_to_bio_flags(u32 dpf)
2370 {
2371 	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2372 		(dpf & DP_FUA ? REQ_FUA : 0) |
2373 		(dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
2374 }
2375 
2376 static unsigned long wire_flags_to_bio_op(u32 dpf)
2377 {
2378 	if (dpf & DP_DISCARD)
2379 		return REQ_OP_DISCARD;
2380 	else
2381 		return REQ_OP_WRITE;
2382 }
2383 
2384 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2385 				    unsigned int size)
2386 {
2387 	struct drbd_interval *i;
2388 
2389     repeat:
2390 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2391 		struct drbd_request *req;
2392 		struct bio_and_error m;
2393 
2394 		if (!i->local)
2395 			continue;
2396 		req = container_of(i, struct drbd_request, i);
2397 		if (!(req->rq_state & RQ_POSTPONED))
2398 			continue;
2399 		req->rq_state &= ~RQ_POSTPONED;
2400 		__req_mod(req, NEG_ACKED, &m);
2401 		spin_unlock_irq(&device->resource->req_lock);
2402 		if (m.bio)
2403 			complete_master_bio(device, &m);
2404 		spin_lock_irq(&device->resource->req_lock);
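		/* The tree may have changed while req_lock was dropped above;
		 * restart the overlap scan from the beginning. */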
2405 		goto repeat;
2406 	}
2407 }
2408 
2409 static int handle_write_conflicts(struct drbd_device *device,
2410 				  struct drbd_peer_request *peer_req)
2411 {
2412 	struct drbd_connection *connection = peer_req->peer_device->connection;
2413 	bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2414 	sector_t sector = peer_req->i.sector;
2415 	const unsigned int size = peer_req->i.size;
2416 	struct drbd_interval *i;
2417 	bool equal;
2418 	int err;
2419 
2420 	/*
2421 	 * Inserting the peer request into the write_requests tree will prevent
2422 	 * new conflicting local requests from being added.
2423 	 */
2424 	drbd_insert_interval(&device->write_requests, &peer_req->i);
2425 
2426     repeat:
2427 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2428 		if (i == &peer_req->i)
2429 			continue;
2430 		if (i->completed)
2431 			continue;
2432 
2433 		if (!i->local) {
2434 			/*
2435 			 * Our peer has sent a conflicting remote request; this
2436 			 * should not happen in a two-node setup.  Wait for the
2437 			 * earlier peer request to complete.
2438 			 */
2439 			err = drbd_wait_misc(device, i);
2440 			if (err)
2441 				goto out;
2442 			goto repeat;
2443 		}
2444 
2445 		equal = i->sector == sector && i->size == size;
2446 		if (resolve_conflicts) {
2447 			/*
2448 			 * If the peer request is fully contained within the
2449 			 * overlapping request, it can be considered overwritten
2450 			 * and thus superseded; otherwise, it will be retried
2451 			 * once all overlapping requests have completed.
2452 			 */
2453 			bool superseded = i->sector <= sector && i->sector +
2454 				       (i->size >> 9) >= sector + (size >> 9);
2455 
2456 			if (!equal)
2457 				drbd_alert(device, "Concurrent writes detected: "
2458 					       "local=%llus +%u, remote=%llus +%u, "
2459 					       "assuming %s came first\n",
2460 					  (unsigned long long)i->sector, i->size,
2461 					  (unsigned long long)sector, size,
2462 					  superseded ? "local" : "remote");
2463 
2464 			peer_req->w.cb = superseded ? e_send_superseded :
2465 						   e_send_retry_write;
2466 			list_add_tail(&peer_req->w.list, &device->done_ee);
2467 			queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);
2468 
2469 			err = -ENOENT;
2470 			goto out;
2471 		} else {
2472 			struct drbd_request *req =
2473 				container_of(i, struct drbd_request, i);
2474 
2475 			if (!equal)
2476 				drbd_alert(device, "Concurrent writes detected: "
2477 					       "local=%llus +%u, remote=%llus +%u\n",
2478 					  (unsigned long long)i->sector, i->size,
2479 					  (unsigned long long)sector, size);
2480 
2481 			if (req->rq_state & RQ_LOCAL_PENDING ||
2482 			    !(req->rq_state & RQ_POSTPONED)) {
2483 				/*
2484 				 * Wait for the node with the discard flag to
2485 				 * decide if this request has been superseded
2486 				 * or needs to be retried.
2487 				 * Requests that have been superseded will
2488 				 * disappear from the write_requests tree.
2489 				 *
2490 				 * In addition, wait for the conflicting
2491 				 * request to finish locally before submitting
2492 				 * the conflicting peer request.
2493 				 */
2494 				err = drbd_wait_misc(device, &req->i);
2495 				if (err) {
2496 					_conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2497 					fail_postponed_requests(device, sector, size);
2498 					goto out;
2499 				}
2500 				goto repeat;
2501 			}
2502 			/*
2503 			 * Remember to restart the conflicting requests after
2504 			 * the new peer request has completed.
2505 			 */
2506 			peer_req->flags |= EE_RESTART_REQUESTS;
2507 		}
2508 	}
2509 	err = 0;
2510 
2511     out:
2512 	if (err)
2513 		drbd_remove_epoch_entry_interval(device, peer_req);
2514 	return err;
2515 }
2516 
2517 /* mirrored write */
2518 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2519 {
2520 	struct drbd_peer_device *peer_device;
2521 	struct drbd_device *device;
2522 	struct net_conf *nc;
2523 	sector_t sector;
2524 	struct drbd_peer_request *peer_req;
2525 	struct p_data *p = pi->data;
2526 	u32 peer_seq = be32_to_cpu(p->seq_num);
2527 	int op, op_flags;
2528 	u32 dp_flags;
2529 	int err, tp;
2530 
2531 	peer_device = conn_peer_device(connection, pi->vnr);
2532 	if (!peer_device)
2533 		return -EIO;
2534 	device = peer_device->device;
2535 
2536 	if (!get_ldev(device)) {
2537 		int err2;
2538 
2539 		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2540 		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2541 		atomic_inc(&connection->current_epoch->epoch_size);
2542 		err2 = drbd_drain_block(peer_device, pi->size);
2543 		if (!err)
2544 			err = err2;
2545 		return err;
2546 	}
2547 
2548 	/*
2549 	 * Corresponding put_ldev done either below (on various errors), or in
2550 	 * drbd_peer_request_endio, if we successfully submit the data at the
2551 	 * end of this function.
2552 	 */
2553 
2554 	sector = be64_to_cpu(p->sector);
2555 	peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2556 	if (!peer_req) {
2557 		put_ldev(device);
2558 		return -EIO;
2559 	}
2560 
2561 	peer_req->w.cb = e_end_block;
2562 	peer_req->submit_jif = jiffies;
2563 	peer_req->flags |= EE_APPLICATION;
2564 
2565 	dp_flags = be32_to_cpu(p->dp_flags);
2566 	op = wire_flags_to_bio_op(dp_flags);
2567 	op_flags = wire_flags_to_bio_flags(dp_flags);
2568 	if (pi->cmd == P_TRIM) {
2569 		D_ASSERT(peer_device, peer_req->i.size > 0);
2570 		D_ASSERT(peer_device, op == REQ_OP_DISCARD);
2571 		D_ASSERT(peer_device, peer_req->pages == NULL);
2572 	} else if (peer_req->pages == NULL) {
2573 		D_ASSERT(device, peer_req->i.size == 0);
2574 		D_ASSERT(device, dp_flags & DP_FLUSH);
2575 	}
2576 
2577 	if (dp_flags & DP_MAY_SET_IN_SYNC)
2578 		peer_req->flags |= EE_MAY_SET_IN_SYNC;
2579 
2580 	spin_lock(&connection->epoch_lock);
2581 	peer_req->epoch = connection->current_epoch;
2582 	atomic_inc(&peer_req->epoch->epoch_size);
2583 	atomic_inc(&peer_req->epoch->active);
2584 	spin_unlock(&connection->epoch_lock);
2585 
2586 	rcu_read_lock();
2587 	nc = rcu_dereference(peer_device->connection->net_conf);
2588 	tp = nc->two_primaries;
2589 	if (peer_device->connection->agreed_pro_version < 100) {
2590 		switch (nc->wire_protocol) {
2591 		case DRBD_PROT_C:
2592 			dp_flags |= DP_SEND_WRITE_ACK;
2593 			break;
2594 		case DRBD_PROT_B:
2595 			dp_flags |= DP_SEND_RECEIVE_ACK;
2596 			break;
2597 		}
2598 	}
2599 	rcu_read_unlock();
2600 
2601 	if (dp_flags & DP_SEND_WRITE_ACK) {
2602 		peer_req->flags |= EE_SEND_WRITE_ACK;
2603 		inc_unacked(device);
2604 		/* corresponding dec_unacked() in e_end_block()
2605 		 * respective _drbd_clear_done_ee */
2606 	}
2607 
2608 	if (dp_flags & DP_SEND_RECEIVE_ACK) {
2609 		/* I really don't like it that the receiver thread
2610 		 * sends on the msock, but anyways */
2611 		drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
2612 	}
2613 
2614 	if (tp) {
2615 		/* two primaries implies protocol C */
2616 		D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2617 		peer_req->flags |= EE_IN_INTERVAL_TREE;
2618 		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2619 		if (err)
2620 			goto out_interrupted;
2621 		spin_lock_irq(&device->resource->req_lock);
2622 		err = handle_write_conflicts(device, peer_req);
2623 		if (err) {
2624 			spin_unlock_irq(&device->resource->req_lock);
2625 			if (err == -ENOENT) {
2626 				put_ldev(device);
2627 				return 0;
2628 			}
2629 			goto out_interrupted;
2630 		}
2631 	} else {
2632 		update_peer_seq(peer_device, peer_seq);
2633 		spin_lock_irq(&device->resource->req_lock);
2634 	}
2635 	/* TRIM and WRITE_SAME are processed synchronously,
2636 	 * we wait for all pending requests, respectively wait for
2637 	 * active_ee to become empty in drbd_submit_peer_request();
2638 	 * better not add ourselves here. */
2639 	if ((peer_req->flags & (EE_IS_TRIM|EE_WRITE_SAME)) == 0)
2640 		list_add_tail(&peer_req->w.list, &device->active_ee);
2641 	spin_unlock_irq(&device->resource->req_lock);
2642 
2643 	if (device->state.conn == C_SYNC_TARGET)
2644 		wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2645 
2646 	if (device->state.pdsk < D_INCONSISTENT) {
2647 		/* In case we have the only disk of the cluster, the peer cannot keep a copy: mark the range out of sync and cover it with the activity log. */
2648 		drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2649 		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2650 		drbd_al_begin_io(device, &peer_req->i);
2651 		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2652 	}
2653 
2654 	err = drbd_submit_peer_request(device, peer_req, op, op_flags,
2655 				       DRBD_FAULT_DT_WR);
2656 	if (!err)
2657 		return 0;
2658 
2659 	/* don't care for the reason here */
2660 	drbd_err(device, "submit failed, triggering re-connect\n");
2661 	spin_lock_irq(&device->resource->req_lock);
2662 	list_del(&peer_req->w.list);
2663 	drbd_remove_epoch_entry_interval(device, peer_req);
2664 	spin_unlock_irq(&device->resource->req_lock);
2665 	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2666 		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2667 		drbd_al_complete_io(device, &peer_req->i);
2668 	}
2669 
2670 out_interrupted:
2671 	drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
2672 	put_ldev(device);
2673 	drbd_free_peer_req(device, peer_req);
2674 	return err;
2675 }
2676 
2677 /* We may throttle resync, if the lower device seems to be busy,
2678  * and current sync rate is above c_min_rate.
2679  *
2680  * To decide whether or not the lower device is busy, we use a scheme similar
2681  * to MD RAID is_mddev_idle(): if the partition stats reveal a "significant"
2682  * amount of activity (more than 64 sectors) that we cannot account for with
2683  * our own resync activity, the device obviously is "busy".
2684  *
2685  * The current sync rate used here uses only the most recent two step marks,
2686  * to have a short time average so we can react faster.
2687  */
2688 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2689 		bool throttle_if_app_is_waiting)
2690 {
2691 	struct lc_element *tmp;
2692 	bool throttle = drbd_rs_c_min_rate_throttle(device);
2693 
2694 	if (!throttle || throttle_if_app_is_waiting)
2695 		return throttle;
2696 
2697 	spin_lock_irq(&device->al_lock);
2698 	tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2699 	if (tmp) {
2700 		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2701 		if (test_bit(BME_PRIORITY, &bm_ext->flags))
2702 			throttle = false;
2703 		/* Do not slow down if app IO is already waiting for this extent,
2704 		 * and our progress is necessary for application IO to complete. */
2705 	}
2706 	spin_unlock_irq(&device->al_lock);
2707 
2708 	return throttle;
2709 }
2710 
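/*
 * A rough sketch of the heuristic below: "curr_events" is the total sector
 * traffic on the backing device minus the traffic we caused ourselves for
 * resync (rs_sect_ev).  If application IO is waiting on the activity log, or
 * the unaccounted traffic exceeds 64 sectors, compare the short-term resync
 * rate (in KB/s, averaged over roughly the last two sync-mark steps) against
 * c_min_rate and throttle only while we are still faster than that minimum.
 */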
2711 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2712 {
2713 	struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2714 	unsigned long db, dt, dbdt;
2715 	unsigned int c_min_rate;
2716 	int curr_events;
2717 
2718 	rcu_read_lock();
2719 	c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2720 	rcu_read_unlock();
2721 
2722 	/* feature disabled? */
2723 	if (c_min_rate == 0)
2724 		return false;
2725 
2726 	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2727 		      (int)part_stat_read(&disk->part0, sectors[1]) -
2728 			atomic_read(&device->rs_sect_ev);
2729 
2730 	if (atomic_read(&device->ap_actlog_cnt)
2731 	    || curr_events - device->rs_last_events > 64) {
2732 		unsigned long rs_left;
2733 		int i;
2734 
2735 		device->rs_last_events = curr_events;
2736 
2737 		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2738 		 * approx. */
2739 		i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2740 
2741 		if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2742 			rs_left = device->ov_left;
2743 		else
2744 			rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2745 
2746 		dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2747 		if (!dt)
2748 			dt++;
2749 		db = device->rs_mark_left[i] - rs_left;
2750 		dbdt = Bit2KB(db/dt);
2751 
2752 		if (dbdt > c_min_rate)
2753 			return true;
2754 	}
2755 	return false;
2756 }
2757 
2758 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2759 {
2760 	struct drbd_peer_device *peer_device;
2761 	struct drbd_device *device;
2762 	sector_t sector;
2763 	sector_t capacity;
2764 	struct drbd_peer_request *peer_req;
2765 	struct digest_info *di = NULL;
2766 	int size, verb;
2767 	unsigned int fault_type;
2768 	struct p_block_req *p =	pi->data;
2769 
2770 	peer_device = conn_peer_device(connection, pi->vnr);
2771 	if (!peer_device)
2772 		return -EIO;
2773 	device = peer_device->device;
2774 	capacity = drbd_get_capacity(device->this_bdev);
2775 
2776 	sector = be64_to_cpu(p->sector);
2777 	size   = be32_to_cpu(p->blksize);
2778 
2779 	if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2780 		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2781 				(unsigned long long)sector, size);
2782 		return -EINVAL;
2783 	}
2784 	if (sector + (size>>9) > capacity) {
2785 		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2786 				(unsigned long long)sector, size);
2787 		return -EINVAL;
2788 	}
2789 
2790 	if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2791 		verb = 1;
2792 		switch (pi->cmd) {
2793 		case P_DATA_REQUEST:
2794 			drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2795 			break;
2796 		case P_RS_THIN_REQ:
2797 		case P_RS_DATA_REQUEST:
2798 		case P_CSUM_RS_REQUEST:
2799 		case P_OV_REQUEST:
2800 			drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY, p);
2801 			break;
2802 		case P_OV_REPLY:
2803 			verb = 0;
2804 			dec_rs_pending(device);
2805 			drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2806 			break;
2807 		default:
2808 			BUG();
2809 		}
2810 		if (verb && __ratelimit(&drbd_ratelimit_state))
2811 			drbd_err(device, "Can not satisfy peer's read request, "
2812 			    "no local data.\n");
2813 
2814 		/* drain the payload, if any */
2815 		return drbd_drain_block(peer_device, pi->size);
2816 	}
2817 
2818 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2819 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
2820 	 * which in turn might block on the other node at this very place.  */
2821 	peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2822 			size, GFP_NOIO);
2823 	if (!peer_req) {
2824 		put_ldev(device);
2825 		return -ENOMEM;
2826 	}
2827 
2828 	switch (pi->cmd) {
2829 	case P_DATA_REQUEST:
2830 		peer_req->w.cb = w_e_end_data_req;
2831 		fault_type = DRBD_FAULT_DT_RD;
2832 		/* application IO, don't drbd_rs_begin_io */
2833 		peer_req->flags |= EE_APPLICATION;
2834 		goto submit;
2835 
2836 	case P_RS_THIN_REQ:
2837 		/* If at some point in the future we have a smart way to
2838 		   find out if this data block is completely deallocated,
2839 		   then we would do something smarter here than reading
2840 		   the block... */
2841 		peer_req->flags |= EE_RS_THIN_REQ;
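		/* fall through */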
2842 	case P_RS_DATA_REQUEST:
2843 		peer_req->w.cb = w_e_end_rsdata_req;
2844 		fault_type = DRBD_FAULT_RS_RD;
2845 		/* used in the sector offset progress display */
2846 		device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2847 		break;
2848 
2849 	case P_OV_REPLY:
2850 	case P_CSUM_RS_REQUEST:
2851 		fault_type = DRBD_FAULT_RS_RD;
2852 		di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2853 		if (!di)
2854 			goto out_free_e;
2855 
2856 		di->digest_size = pi->size;
2857 		di->digest = (((char *)di)+sizeof(struct digest_info));
2858 
2859 		peer_req->digest = di;
2860 		peer_req->flags |= EE_HAS_DIGEST;
2861 
2862 		if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2863 			goto out_free_e;
2864 
2865 		if (pi->cmd == P_CSUM_RS_REQUEST) {
2866 			D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2867 			peer_req->w.cb = w_e_end_csum_rs_req;
2868 			/* used in the sector offset progress display */
2869 			device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2870 			/* remember to report stats in drbd_resync_finished */
2871 			device->use_csums = true;
2872 		} else if (pi->cmd == P_OV_REPLY) {
2873 			/* track progress, we may need to throttle */
2874 			atomic_add(size >> 9, &device->rs_sect_in);
2875 			peer_req->w.cb = w_e_end_ov_reply;
2876 			dec_rs_pending(device);
2877 			/* drbd_rs_begin_io done when we sent this request,
2878 			 * but accounting still needs to be done. */
2879 			goto submit_for_resync;
2880 		}
2881 		break;
2882 
2883 	case P_OV_REQUEST:
2884 		if (device->ov_start_sector == ~(sector_t)0 &&
2885 		    peer_device->connection->agreed_pro_version >= 90) {
2886 			unsigned long now = jiffies;
2887 			int i;
2888 			device->ov_start_sector = sector;
2889 			device->ov_position = sector;
2890 			device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2891 			device->rs_total = device->ov_left;
2892 			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2893 				device->rs_mark_left[i] = device->ov_left;
2894 				device->rs_mark_time[i] = now;
2895 			}
2896 			drbd_info(device, "Online Verify start sector: %llu\n",
2897 					(unsigned long long)sector);
2898 		}
2899 		peer_req->w.cb = w_e_end_ov_req;
2900 		fault_type = DRBD_FAULT_RS_RD;
2901 		break;
2902 
2903 	default:
2904 		BUG();
2905 	}
2906 
2907 	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
2908 	 * wrt the receiver, but it is not as straightforward as it may seem.
2909 	 * Various places in the resync start and stop logic assume resync
2910 	 * requests are processed in order, requeuing this on the worker thread
2911 	 * introduces a bunch of new code for synchronization between threads.
2912 	 *
2913 	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2914 	 * "forever", throttling after drbd_rs_begin_io will lock that extent
2915 	 * for application writes for the same time.  For now, just throttle
2916 	 * here, where the rest of the code expects the receiver to sleep for
2917 	 * a while, anyways.
2918 	 */
2919 
2920 	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
2921 	 * this defers syncer requests for some time, before letting at least
2922 	 * one request through.  The resync controller on the receiving side
2923 	 * will adapt to the incoming rate accordingly.
2924 	 *
2925 	 * We cannot throttle here if remote is Primary/SyncTarget:
2926 	 * we would also throttle its application reads.
2927 	 * In that case, throttling is done on the SyncTarget only.
2928 	 */
2929 
2930 	/* Even though this may be a resync request, we do add to "read_ee";
2931 	 * "sync_ee" is only used for resync WRITEs.
2932 	 * Add to list early, so debugfs can find this request
2933 	 * even if we have to sleep below. */
2934 	spin_lock_irq(&device->resource->req_lock);
2935 	list_add_tail(&peer_req->w.list, &device->read_ee);
2936 	spin_unlock_irq(&device->resource->req_lock);
2937 
2938 	update_receiver_timing_details(connection, drbd_rs_should_slow_down);
2939 	if (device->state.peer != R_PRIMARY
2940 	&& drbd_rs_should_slow_down(device, sector, false))
2941 		schedule_timeout_uninterruptible(HZ/10);
2942 	update_receiver_timing_details(connection, drbd_rs_begin_io);
2943 	if (drbd_rs_begin_io(device, sector))
2944 		goto out_free_e;
2945 
2946 submit_for_resync:
2947 	atomic_add(size >> 9, &device->rs_sect_ev);
2948 
2949 submit:
2950 	update_receiver_timing_details(connection, drbd_submit_peer_request);
2951 	inc_unacked(device);
2952 	if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
2953 				     fault_type) == 0)
2954 		return 0;
2955 
2956 	/* don't care for the reason here */
2957 	drbd_err(device, "submit failed, triggering re-connect\n");
2958 
2959 out_free_e:
2960 	spin_lock_irq(&device->resource->req_lock);
2961 	list_del(&peer_req->w.list);
2962 	spin_unlock_irq(&device->resource->req_lock);
2963 	/* no drbd_rs_complete_io(), we are dropping the connection anyways */
2964 
2965 	put_ldev(device);
2966 	drbd_free_peer_req(device, peer_req);
2967 	return -EIO;
2968 }
2969 
2970 /**
2971  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
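 *
 * Returns 1 if the peer's data should be discarded (we become sync source),
 * -1 if our data should be discarded (we become sync target), or -100 if the
 * configured policy could not reach a decision.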
2972  */
2973 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2974 {
2975 	struct drbd_device *device = peer_device->device;
2976 	int self, peer, rv = -100;
2977 	unsigned long ch_self, ch_peer;
2978 	enum drbd_after_sb_p after_sb_0p;
2979 
2980 	self = device->ldev->md.uuid[UI_BITMAP] & 1;
2981 	peer = device->p_uuid[UI_BITMAP] & 1;
2982 
2983 	ch_peer = device->p_uuid[UI_SIZE];
2984 	ch_self = device->comm_bm_set;
2985 
2986 	rcu_read_lock();
2987 	after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2988 	rcu_read_unlock();
2989 	switch (after_sb_0p) {
2990 	case ASB_CONSENSUS:
2991 	case ASB_DISCARD_SECONDARY:
2992 	case ASB_CALL_HELPER:
2993 	case ASB_VIOLENTLY:
2994 		drbd_err(device, "Configuration error.\n");
2995 		break;
2996 	case ASB_DISCONNECT:
2997 		break;
2998 	case ASB_DISCARD_YOUNGER_PRI:
2999 		if (self == 0 && peer == 1) {
3000 			rv = -1;
3001 			break;
3002 		}
3003 		if (self == 1 && peer == 0) {
3004 			rv =  1;
3005 			break;
3006 		}
3007 		/* Else fall through to one of the other strategies... */
3008 	case ASB_DISCARD_OLDER_PRI:
3009 		if (self == 0 && peer == 1) {
3010 			rv = 1;
3011 			break;
3012 		}
3013 		if (self == 1 && peer == 0) {
3014 			rv = -1;
3015 			break;
3016 		}
3017 		/* Else fall through to one of the other strategies... */
3018 		drbd_warn(device, "Discard younger/older primary did not find a decision\n"
3019 		     "Using discard-least-changes instead\n");
3020 	case ASB_DISCARD_ZERO_CHG:
3021 		if (ch_peer == 0 && ch_self == 0) {
3022 			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
3023 				? -1 : 1;
3024 			break;
3025 		} else {
3026 			if (ch_peer == 0) { rv =  1; break; }
3027 			if (ch_self == 0) { rv = -1; break; }
3028 		}
3029 		if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
3030 			break;
3031 	case ASB_DISCARD_LEAST_CHG:
3032 		if	(ch_self < ch_peer)
3033 			rv = -1;
3034 		else if (ch_self > ch_peer)
3035 			rv =  1;
3036 		else /* ( ch_self == ch_peer ) */
3037 		     /* Well, then use something else. */
3038 			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
3039 				? -1 : 1;
3040 		break;
3041 	case ASB_DISCARD_LOCAL:
3042 		rv = -1;
3043 		break;
3044 	case ASB_DISCARD_REMOTE:
3045 		rv =  1;
3046 	}
3047 
3048 	return rv;
3049 }
3050 
3051 /**
3052  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
3053  */
3054 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
3055 {
3056 	struct drbd_device *device = peer_device->device;
3057 	int hg, rv = -100;
3058 	enum drbd_after_sb_p after_sb_1p;
3059 
3060 	rcu_read_lock();
3061 	after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
3062 	rcu_read_unlock();
3063 	switch (after_sb_1p) {
3064 	case ASB_DISCARD_YOUNGER_PRI:
3065 	case ASB_DISCARD_OLDER_PRI:
3066 	case ASB_DISCARD_LEAST_CHG:
3067 	case ASB_DISCARD_LOCAL:
3068 	case ASB_DISCARD_REMOTE:
3069 	case ASB_DISCARD_ZERO_CHG:
3070 		drbd_err(device, "Configuration error.\n");
3071 		break;
3072 	case ASB_DISCONNECT:
3073 		break;
3074 	case ASB_CONSENSUS:
3075 		hg = drbd_asb_recover_0p(peer_device);
3076 		if (hg == -1 && device->state.role == R_SECONDARY)
3077 			rv = hg;
3078 		if (hg == 1  && device->state.role == R_PRIMARY)
3079 			rv = hg;
3080 		break;
3081 	case ASB_VIOLENTLY:
3082 		rv = drbd_asb_recover_0p(peer_device);
3083 		break;
3084 	case ASB_DISCARD_SECONDARY:
3085 		return device->state.role == R_PRIMARY ? 1 : -1;
3086 	case ASB_CALL_HELPER:
3087 		hg = drbd_asb_recover_0p(peer_device);
3088 		if (hg == -1 && device->state.role == R_PRIMARY) {
3089 			enum drbd_state_rv rv2;
3090 
3091 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3092 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
3093 			  * we do not need to wait for the after state change work either. */
3094 			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3095 			if (rv2 != SS_SUCCESS) {
3096 				drbd_khelper(device, "pri-lost-after-sb");
3097 			} else {
3098 				drbd_warn(device, "Successfully gave up primary role.\n");
3099 				rv = hg;
3100 			}
3101 		} else
3102 			rv = hg;
3103 	}
3104 
3105 	return rv;
3106 }
3107 
3108 /**
3109  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
3110  */
3111 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
3112 {
3113 	struct drbd_device *device = peer_device->device;
3114 	int hg, rv = -100;
3115 	enum drbd_after_sb_p after_sb_2p;
3116 
3117 	rcu_read_lock();
3118 	after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
3119 	rcu_read_unlock();
3120 	switch (after_sb_2p) {
3121 	case ASB_DISCARD_YOUNGER_PRI:
3122 	case ASB_DISCARD_OLDER_PRI:
3123 	case ASB_DISCARD_LEAST_CHG:
3124 	case ASB_DISCARD_LOCAL:
3125 	case ASB_DISCARD_REMOTE:
3126 	case ASB_CONSENSUS:
3127 	case ASB_DISCARD_SECONDARY:
3128 	case ASB_DISCARD_ZERO_CHG:
3129 		drbd_err(device, "Configuration error.\n");
3130 		break;
3131 	case ASB_VIOLENTLY:
3132 		rv = drbd_asb_recover_0p(peer_device);
3133 		break;
3134 	case ASB_DISCONNECT:
3135 		break;
3136 	case ASB_CALL_HELPER:
3137 		hg = drbd_asb_recover_0p(peer_device);
3138 		if (hg == -1) {
3139 			enum drbd_state_rv rv2;
3140 
3141 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3142 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
3143 			  * we do not need to wait for the after state change work either. */
3144 			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3145 			if (rv2 != SS_SUCCESS) {
3146 				drbd_khelper(device, "pri-lost-after-sb");
3147 			} else {
3148 				drbd_warn(device, "Successfully gave up primary role.\n");
3149 				rv = hg;
3150 			}
3151 		} else
3152 			rv = hg;
3153 	}
3154 
3155 	return rv;
3156 }
3157 
3158 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
3159 			   u64 bits, u64 flags)
3160 {
3161 	if (!uuid) {
3162 		drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
3163 		return;
3164 	}
3165 	drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
3166 	     text,
3167 	     (unsigned long long)uuid[UI_CURRENT],
3168 	     (unsigned long long)uuid[UI_BITMAP],
3169 	     (unsigned long long)uuid[UI_HISTORY_START],
3170 	     (unsigned long long)uuid[UI_HISTORY_END],
3171 	     (unsigned long long)bits,
3172 	     (unsigned long long)flags);
3173 }
3174 
3175 /*
3176   100	after split brain try auto recover
3177     2	C_SYNC_SOURCE set BitMap
3178     1	C_SYNC_SOURCE use BitMap
3179     0	no Sync
3180    -1	C_SYNC_TARGET use BitMap
3181    -2	C_SYNC_TARGET set BitMap
3182  -100	after split brain, disconnect
3183 -1000	unrelated data
3184 -1091   requires proto 91
3185 -1096   requires proto 96
3186  */
3187 
3188 static int drbd_uuid_compare(struct drbd_device *const device, enum drbd_role const peer_role, int *rule_nr) __must_hold(local)
3189 {
3190 	struct drbd_peer_device *const peer_device = first_peer_device(device);
3191 	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
3192 	u64 self, peer;
3193 	int i, j;
3194 
3195 	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3196 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3197 
3198 	*rule_nr = 10;
3199 	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
3200 		return 0;
3201 
3202 	*rule_nr = 20;
3203 	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
3204 	     peer != UUID_JUST_CREATED)
3205 		return -2;
3206 
3207 	*rule_nr = 30;
3208 	if (self != UUID_JUST_CREATED &&
3209 	    (peer == UUID_JUST_CREATED || peer == (u64)0))
3210 		return 2;
3211 
3212 	if (self == peer) {
3213 		int rct, dc; /* roles at crash time */
3214 
3215 		if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
3216 
3217 			if (connection->agreed_pro_version < 91)
3218 				return -1091;
3219 
3220 			if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
3221 			    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
3222 				drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
3223 				drbd_uuid_move_history(device);
3224 				device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
3225 				device->ldev->md.uuid[UI_BITMAP] = 0;
3226 
3227 				drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3228 					       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3229 				*rule_nr = 34;
3230 			} else {
3231 				drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
3232 				*rule_nr = 36;
3233 			}
3234 
3235 			return 1;
3236 		}
3237 
3238 		if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
3239 
3240 			if (connection->agreed_pro_version < 91)
3241 				return -1091;
3242 
3243 			if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
3244 			    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
3245 				drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
3246 
3247 				device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
3248 				device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
3249 				device->p_uuid[UI_BITMAP] = 0UL;
3250 
3251 				drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3252 				*rule_nr = 35;
3253 			} else {
3254 				drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3255 				*rule_nr = 37;
3256 			}
3257 
3258 			return -1;
3259 		}
3260 
3261 		/* Common power [off|failure] */
3262 		rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3263 			(device->p_uuid[UI_FLAGS] & 2);
3264 		/* lowest bit is set when we were primary,
3265 		 * next bit (weight 2) is set when peer was primary */
3266 		*rule_nr = 40;
3267 
3268 		/* Neither has the "crashed primary" flag set,
3269 		 * only a replication link hiccup. */
3270 		if (rct == 0)
3271 			return 0;
3272 
3273 		/* Current UUID equal and no bitmap uuid; does not necessarily
3274 		 * mean this was a "simultaneous hard crash", maybe IO was
3275 		 * frozen, so no UUID-bump happened.
3276 		 * This is a protocol change, overload DRBD_FF_WSAME as flag
3277 		 * for "new-enough" peer DRBD version. */
3278 		if (device->state.role == R_PRIMARY || peer_role == R_PRIMARY) {
3279 			*rule_nr = 41;
3280 			if (!(connection->agreed_features & DRBD_FF_WSAME)) {
3281 				drbd_warn(peer_device, "Equivalent unrotated UUIDs, but current primary present.\n");
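				/* Encode "need at least this protocol and feature flag":
				 * bits 0-7 carry the protocol version, bits 8-15 the feature
				 * flags, bit 16 marks the encoding; decoded in drbd_sync_handshake(). */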
3282 				return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8));
3283 			}
3284 			if (device->state.role == R_PRIMARY && peer_role == R_PRIMARY) {
3285 				/* At least one has the "crashed primary" bit set,
3286 				 * both are primary now, but neither has rotated its UUIDs?
3287 				 * "Can not happen." */
3288 				drbd_err(peer_device, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n");
3289 				return -100;
3290 			}
3291 			if (device->state.role == R_PRIMARY)
3292 				return 1;
3293 			return -1;
3294 		}
3295 
3296 		/* Both are secondary.
3297 		 * Really looks like recovery from simultaneous hard crash.
3298 		 * Check which had been primary before, and arbitrate. */
3299 		switch (rct) {
3300 		case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */
3301 		case 1: /*  self_pri && !peer_pri */ return 1;
3302 		case 2: /* !self_pri &&  peer_pri */ return -1;
3303 		case 3: /*  self_pri &&  peer_pri */
3304 			dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3305 			return dc ? -1 : 1;
3306 		}
3307 	}
3308 
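	/* The peer's bitmap UUID equals our current UUID: the peer has been
	 * tracking changes against exactly our data generation, so we become
	 * the sync target and the resync can use that bitmap. */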
3309 	*rule_nr = 50;
3310 	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3311 	if (self == peer)
3312 		return -1;
3313 
3314 	*rule_nr = 51;
3315 	peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3316 	if (self == peer) {
3317 		if (connection->agreed_pro_version < 96 ?
3318 		    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3319 		    (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3320 		    peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3321 			/* The last P_SYNC_UUID did not get through. Undo the modifications
3322 			   the peer made to its UUIDs when it last started a resync as sync source. */
3323 
3324 			if (connection->agreed_pro_version < 91)
3325 				return -1091;
3326 
3327 			device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3328 			device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3329 
3330 			drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3331 			drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3332 
3333 			return -1;
3334 		}
3335 	}
3336 
3337 	*rule_nr = 60;
3338 	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3339 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3340 		peer = device->p_uuid[i] & ~((u64)1);
3341 		if (self == peer)
3342 			return -2;
3343 	}
3344 
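	/* Mirror image of rule 50: our bitmap UUID equals the peer's current
	 * UUID, so we have been tracking changes against the peer's data
	 * generation and become the sync source. */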
3345 	*rule_nr = 70;
3346 	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3347 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3348 	if (self == peer)
3349 		return 1;
3350 
3351 	*rule_nr = 71;
3352 	self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3353 	if (self == peer) {
3354 		if (connection->agreed_pro_version < 96 ?
3355 		    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3356 		    (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3357 		    self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3358 			/* The last P_SYNC_UUID did not get through. Undo the modifications
3359 			   we made to our UUIDs when we last started a resync as sync source. */
3360 
3361 			if (connection->agreed_pro_version < 91)
3362 				return -1091;
3363 
3364 			__drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3365 			__drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3366 
3367 			drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3368 			drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3369 				       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3370 
3371 			return 1;
3372 		}
3373 	}
3374 
3375 
3376 	*rule_nr = 80;
3377 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3378 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3379 		self = device->ldev->md.uuid[i] & ~((u64)1);
3380 		if (self == peer)
3381 			return 2;
3382 	}
3383 
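	/* Both sides carry the same non-zero bitmap UUID: split brain between
	   these two nodes. */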
3384 	*rule_nr = 90;
3385 	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3386 	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3387 	if (self == peer && self != ((u64)0))
3388 		return 100;
3389 
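	/* The data sets are related only through history UUIDs: split brain
	   with data that diverged a while ago; auto-recovery is applied only
	   with always-asbp (see drbd_sync_handshake()). */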
3390 	*rule_nr = 100;
3391 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3392 		self = device->ldev->md.uuid[i] & ~((u64)1);
3393 		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3394 			peer = device->p_uuid[j] & ~((u64)1);
3395 			if (self == peer)
3396 				return -100;
3397 		}
3398 	}
3399 
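	/* No UUID matched at all: the two data sets are unrelated. */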
3400 	return -1000;
3401 }
3402 
3403 /* drbd_sync_handshake() returns the new conn state on success, or
3404    C_MASK (-1) on failure.
3405  */
3406 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3407 					   enum drbd_role peer_role,
3408 					   enum drbd_disk_state peer_disk) __must_hold(local)
3409 {
3410 	struct drbd_device *device = peer_device->device;
3411 	enum drbd_conns rv = C_MASK;
3412 	enum drbd_disk_state mydisk;
3413 	struct net_conf *nc;
3414 	int hg, rule_nr, rr_conflict, tentative;
3415 
3416 	mydisk = device->state.disk;
3417 	if (mydisk == D_NEGOTIATING)
3418 		mydisk = device->new_state_tmp.disk;
3419 
3420 	drbd_info(device, "drbd_sync_handshake:\n");
3421 
3422 	spin_lock_irq(&device->ldev->md.uuid_lock);
3423 	drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3424 	drbd_uuid_dump(device, "peer", device->p_uuid,
3425 		       device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3426 
3427 	hg = drbd_uuid_compare(device, peer_role, &rule_nr);
3428 	spin_unlock_irq(&device->ldev->md.uuid_lock);
3429 
3430 	drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3431 
3432 	if (hg == -1000) {
3433 		drbd_alert(device, "Unrelated data, aborting!\n");
3434 		return C_MASK;
3435 	}
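	/* Return codes below -1000 from drbd_uuid_compare() report a missing
	   peer capability: -1000 - proto encodes a required minimum protocol
	   version, values below -0x10000 additionally encode feature flags. */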
3436 	if (hg < -0x10000) {
3437 		int proto, fflags;
3438 		hg = -hg;
3439 		proto = hg & 0xff;
3440 		fflags = (hg >> 8) & 0xff;
3441 		drbd_alert(device, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n",
3442 					proto, fflags);
3443 		return C_MASK;
3444 	}
3445 	if (hg < -1000) {
3446 		drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3447 		return C_MASK;
3448 	}
3449 
3450 	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3451 	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3452 		int f = (hg == -100) || abs(hg) == 2;
3453 		hg = mydisk > D_INCONSISTENT ? 1 : -1;
3454 		if (f)
3455 			hg = hg*2;
3456 		drbd_info(device, "Becoming sync %s due to disk states.\n",
3457 		     hg > 0 ? "source" : "target");
3458 	}
3459 
3460 	if (abs(hg) == 100)
3461 		drbd_khelper(device, "initial-split-brain");
3462 
3463 	rcu_read_lock();
3464 	nc = rcu_dereference(peer_device->connection->net_conf);
3465 
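	/* Attempt automatic split brain recovery: pick the after-sb-0pri,
	   -1pri or -2pri policy according to how many nodes are Primary. */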
3466 	if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3467 		int pcount = (device->state.role == R_PRIMARY)
3468 			   + (peer_role == R_PRIMARY);
3469 		int forced = (hg == -100);
3470 
3471 		switch (pcount) {
3472 		case 0:
3473 			hg = drbd_asb_recover_0p(peer_device);
3474 			break;
3475 		case 1:
3476 			hg = drbd_asb_recover_1p(peer_device);
3477 			break;
3478 		case 2:
3479 			hg = drbd_asb_recover_2p(peer_device);
3480 			break;
3481 		}
3482 		if (abs(hg) < 100) {
3483 			drbd_warn(device, "Split-Brain detected, %d primaries, "
3484 			     "automatically solved. Sync from %s node\n",
3485 			     pcount, (hg < 0) ? "peer" : "this");
3486 			if (forced) {
3487 				drbd_warn(device, "Doing a full sync, since"
3488 				     " UUIDs were ambiguous.\n");
3489 				hg = hg*2;
3490 			}
3491 		}
3492 	}
3493 
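	/* Manual split brain resolution: the side configured with
	   discard-my-data becomes sync target (UI_FLAGS bit 0 carries the
	   peer's discard-my-data setting). */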
3494 	if (hg == -100) {
3495 		if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3496 			hg = -1;
3497 		if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3498 			hg = 1;
3499 
3500 		if (abs(hg) < 100)
3501 			drbd_warn(device, "Split-Brain detected, manually solved. "
3502 			     "Sync from %s node\n",
3503 			     (hg < 0) ? "peer" : "this");
3504 	}
3505 	rr_conflict = nc->rr_conflict;
3506 	tentative = nc->tentative;
3507 	rcu_read_unlock();
3508 
3509 	if (hg == -100) {
3510 		/* FIXME this log message is not correct if we end up here
3511 		 * after an attempted attach on a diskless node.
3512 		 * We just refuse to attach -- well, we drop the "connection"
3513 		 * to that disk, in a way... */
3514 		drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3515 		drbd_khelper(device, "split-brain");
3516 		return C_MASK;
3517 	}
3518 
3519 	if (hg > 0 && mydisk <= D_INCONSISTENT) {
3520 		drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3521 		return C_MASK;
3522 	}
3523 
3524 	if (hg < 0 && /* by intention we do not use mydisk here. */
3525 	    device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3526 		switch (rr_conflict) {
3527 		case ASB_CALL_HELPER:
3528 			drbd_khelper(device, "pri-lost");
3529 			/* fall through */
3530 		case ASB_DISCONNECT:
3531 			drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3532 			return C_MASK;
3533 		case ASB_VIOLENTLY:
3534 			drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3535 			     " assumption\n");
3536 		}
3537 	}
3538 
3539 	if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3540 		if (hg == 0)
3541 			drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3542 		else
3543 			drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.\n",
3544 				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3545 				 abs(hg) >= 2 ? "full" : "bit-map based");
3546 		return C_MASK;
3547 	}
3548 
3549 	if (abs(hg) >= 2) {
3550 		drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3551 		if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3552 					BM_LOCKED_SET_ALLOWED))
3553 			return C_MASK;
3554 	}
3555 
3556 	if (hg > 0) { /* become sync source. */
3557 		rv = C_WF_BITMAP_S;
3558 	} else if (hg < 0) { /* become sync target */
3559 		rv = C_WF_BITMAP_T;
3560 	} else {
3561 		rv = C_CONNECTED;
3562 		if (drbd_bm_total_weight(device)) {
3563 			drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3564 			     drbd_bm_total_weight(device));
3565 		}
3566 	}
3567 
3568 	return rv;
3569 }
3570 
3571 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3572 {
3573 	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3574 	if (peer == ASB_DISCARD_REMOTE)
3575 		return ASB_DISCARD_LOCAL;
3576 
3577 	/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3578 	if (peer == ASB_DISCARD_LOCAL)
3579 		return ASB_DISCARD_REMOTE;
3580 
3581 	/* everything else is valid if they are equal on both sides. */
3582 	return peer;
3583 }
3584 
3585 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3586 {
3587 	struct p_protocol *p = pi->data;
3588 	enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3589 	int p_proto, p_discard_my_data, p_two_primaries, cf;
3590 	struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3591 	char integrity_alg[SHARED_SECRET_MAX] = "";
3592 	struct crypto_ahash *peer_integrity_tfm = NULL;
3593 	void *int_dig_in = NULL, *int_dig_vv = NULL;
3594 
3595 	p_proto		= be32_to_cpu(p->protocol);
3596 	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
3597 	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
3598 	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
3599 	p_two_primaries = be32_to_cpu(p->two_primaries);
3600 	cf		= be32_to_cpu(p->conn_flags);
3601 	p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3602 
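	/* Since protocol 87 the packet carries the peer's data-integrity-alg
	   name as a trailing string. */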
3603 	if (connection->agreed_pro_version >= 87) {
3604 		int err;
3605 
3606 		if (pi->size > sizeof(integrity_alg))
3607 			return -EIO;
3608 		err = drbd_recv_all(connection, integrity_alg, pi->size);
3609 		if (err)
3610 			return err;
3611 		integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3612 	}
3613 
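	/* For an initial P_PROTOCOL packet, verify that the peer's settings
	   match our configuration; P_PROTOCOL_UPDATE skips these checks and
	   simply applies the new values. */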
3614 	if (pi->cmd != P_PROTOCOL_UPDATE) {
3615 		clear_bit(CONN_DRY_RUN, &connection->flags);
3616 
3617 		if (cf & CF_DRY_RUN)
3618 			set_bit(CONN_DRY_RUN, &connection->flags);
3619 
3620 		rcu_read_lock();
3621 		nc = rcu_dereference(connection->net_conf);
3622 
3623 		if (p_proto != nc->wire_protocol) {
3624 			drbd_err(connection, "incompatible %s settings\n", "protocol");
3625 			goto disconnect_rcu_unlock;
3626 		}
3627 
3628 		if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3629 			drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3630 			goto disconnect_rcu_unlock;
3631 		}
3632 
3633 		if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3634 			drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3635 			goto disconnect_rcu_unlock;
3636 		}
3637 
3638 		if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3639 			drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3640 			goto disconnect_rcu_unlock;
3641 		}
3642 
3643 		if (p_discard_my_data && nc->discard_my_data) {
3644 			drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3645 			goto disconnect_rcu_unlock;
3646 		}
3647 
3648 		if (p_two_primaries != nc->two_primaries) {
3649 			drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3650 			goto disconnect_rcu_unlock;
3651 		}
3652 
3653 		if (strcmp(integrity_alg, nc->integrity_alg)) {
3654 			drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3655 			goto disconnect_rcu_unlock;
3656 		}
3657 
3658 		rcu_read_unlock();
3659 	}
3660 
3661 	if (integrity_alg[0]) {
3662 		int hash_size;
3663 
3664 		/*
3665 		 * We can only change the peer data integrity algorithm
3666 		 * here.  Changing our own data integrity algorithm
3667 		 * requires that we send a P_PROTOCOL_UPDATE packet at
3668 		 * the same time; otherwise, the peer has no way to
3669 		 * tell between which packets the algorithm should
3670 		 * change.
3671 		 */
3672 
3673 		peer_integrity_tfm = crypto_alloc_ahash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3674 		if (IS_ERR(peer_integrity_tfm)) {
3675 			peer_integrity_tfm = NULL;
3676 			drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3677 				 integrity_alg);
3678 			goto disconnect;
3679 		}
3680 
3681 		hash_size = crypto_ahash_digestsize(peer_integrity_tfm);
3682 		int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3683 		int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3684 		if (!(int_dig_in && int_dig_vv)) {
3685 			drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3686 			goto disconnect;
3687 		}
3688 	}
3689 
3690 	new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3691 	if (!new_net_conf) {
3692 		drbd_err(connection, "Allocation of new net_conf failed\n");
3693 		goto disconnect;
3694 	}
3695 
3696 	mutex_lock(&connection->data.mutex);
3697 	mutex_lock(&connection->resource->conf_update);
3698 	old_net_conf = connection->net_conf;
3699 	*new_net_conf = *old_net_conf;
3700 
3701 	new_net_conf->wire_protocol = p_proto;
3702 	new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3703 	new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3704 	new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3705 	new_net_conf->two_primaries = p_two_primaries;
3706 
3707 	rcu_assign_pointer(connection->net_conf, new_net_conf);
3708 	mutex_unlock(&connection->resource->conf_update);
3709 	mutex_unlock(&connection->data.mutex);
3710 
3711 	crypto_free_ahash(connection->peer_integrity_tfm);
3712 	kfree(connection->int_dig_in);
3713 	kfree(connection->int_dig_vv);
3714 	connection->peer_integrity_tfm = peer_integrity_tfm;
3715 	connection->int_dig_in = int_dig_in;
3716 	connection->int_dig_vv = int_dig_vv;
3717 
3718 	if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3719 		drbd_info(connection, "peer data-integrity-alg: %s\n",
3720 			  integrity_alg[0] ? integrity_alg : "(none)");
3721 
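	/* Wait until no reader can still see old_net_conf before freeing it. */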
3722 	synchronize_rcu();
3723 	kfree(old_net_conf);
3724 	return 0;
3725 
3726 disconnect_rcu_unlock:
3727 	rcu_read_unlock();
3728 disconnect:
3729 	crypto_free_ahash(peer_integrity_tfm);
3730 	kfree(int_dig_in);
3731 	kfree(int_dig_vv);
3732 	conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3733 	return -EIO;
3734 }
3735 
3736 /* helper function
3737  * input: alg name, feature name
3738  * return: NULL (alg name was "")
3739  *         ERR_PTR(error) if something goes wrong
3740  *         or the crypto hash ptr, if it worked out ok. */
3741 static struct crypto_ahash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3742 		const char *alg, const char *name)
3743 {
3744 	struct crypto_ahash *tfm;
3745 
3746 	if (!alg[0])
3747 		return NULL;
3748 
3749 	tfm = crypto_alloc_ahash(alg, 0, CRYPTO_ALG_ASYNC);
3750 	if (IS_ERR(tfm)) {
3751 		drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3752 			alg, name, PTR_ERR(tfm));
3753 		return tfm;
3754 	}
3755 	return tfm;
3756 }
3757 
3758 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3759 {
3760 	void *buffer = connection->data.rbuf;
3761 	int size = pi->size;
3762 
3763 	while (size) {
3764 		int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3765 		s = drbd_recv(connection, buffer, s);
3766 		if (s <= 0) {
3767 			if (s < 0)
3768 				return s;
3769 			break;
3770 		}
3771 		size -= s;
3772 	}
3773 	if (size)
3774 		return -EIO;
3775 	return 0;
3776 }
3777 
3778 /*
3779  * config_unknown_volume  -  device configuration command for unknown volume
3780  *
3781  * When a device is added to an existing connection, the node on which the
3782  * device is added first will send configuration commands to its peer but the
3783  * peer will not know about the device yet.  It will warn and ignore these
3784  * commands.  Once the device is added on the second node, the second node will
3785  * send the same device configuration commands, but in the other direction.
3786  *
3787  * (We can also end up here if drbd is misconfigured.)
3788  */
3789 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3790 {
3791 	drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3792 		  cmdname(pi->cmd), pi->vnr);
3793 	return ignore_remaining_packet(connection, pi);
3794 }
3795 
3796 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3797 {
3798 	struct drbd_peer_device *peer_device;
3799 	struct drbd_device *device;
3800 	struct p_rs_param_95 *p;
3801 	unsigned int header_size, data_size, exp_max_sz;
3802 	struct crypto_ahash *verify_tfm = NULL;
3803 	struct crypto_ahash *csums_tfm = NULL;
3804 	struct net_conf *old_net_conf, *new_net_conf = NULL;
3805 	struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3806 	const int apv = connection->agreed_pro_version;
3807 	struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3808 	int fifo_size = 0;
3809 	int err;
3810 
3811 	peer_device = conn_peer_device(connection, pi->vnr);
3812 	if (!peer_device)
3813 		return config_unknown_volume(connection, pi);
3814 	device = peer_device->device;
3815 
3816 	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3817 		    : apv == 88 ? sizeof(struct p_rs_param)
3818 					+ SHARED_SECRET_MAX
3819 		    : apv <= 94 ? sizeof(struct p_rs_param_89)
3820 		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3821 
3822 	if (pi->size > exp_max_sz) {
3823 		drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3824 		    pi->size, exp_max_sz);
3825 		return -EIO;
3826 	}
3827 
3828 	if (apv <= 88) {
3829 		header_size = sizeof(struct p_rs_param);
3830 		data_size = pi->size - header_size;
3831 	} else if (apv <= 94) {
3832 		header_size = sizeof(struct p_rs_param_89);
3833 		data_size = pi->size - header_size;
3834 		D_ASSERT(device, data_size == 0);
3835 	} else {
3836 		header_size = sizeof(struct p_rs_param_95);
3837 		data_size = pi->size - header_size;
3838 		D_ASSERT(device, data_size == 0);
3839 	}
3840 
3841 	/* initialize verify_alg and csums_alg */
3842 	p = pi->data;
3843 	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3844 
3845 	err = drbd_recv_all(peer_device->connection, p, header_size);
3846 	if (err)
3847 		return err;
3848 
3849 	mutex_lock(&connection->resource->conf_update);
3850 	old_net_conf = peer_device->connection->net_conf;
3851 	if (get_ldev(device)) {
3852 		new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3853 		if (!new_disk_conf) {
3854 			put_ldev(device);
3855 			mutex_unlock(&connection->resource->conf_update);
3856 			drbd_err(device, "Allocation of new disk_conf failed\n");
3857 			return -ENOMEM;
3858 		}
3859 
3860 		old_disk_conf = device->ldev->disk_conf;
3861 		*new_disk_conf = *old_disk_conf;
3862 
3863 		new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3864 	}
3865 
3866 	if (apv >= 88) {
3867 		if (apv == 88) {
3868 			if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3869 				drbd_err(device, "verify-alg of wrong size, "
3870 					"peer wants %u, accepting only up to %u byte\n",
3871 					data_size, SHARED_SECRET_MAX);
3872 				err = -EIO;
3873 				goto reconnect;
3874 			}
3875 
3876 			err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3877 			if (err)
3878 				goto reconnect;
3879 			/* we expect NUL terminated string */
3880 			/* but just in case someone tries to be evil */
3881 			D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3882 			p->verify_alg[data_size-1] = 0;
3883 
3884 		} else /* apv >= 89 */ {
3885 			/* we still expect NUL terminated strings */
3886 			/* but just in case someone tries to be evil */
3887 			D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3888 			D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3889 			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3890 			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3891 		}
3892 
3893 		if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3894 			if (device->state.conn == C_WF_REPORT_PARAMS) {
3895 				drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3896 				    old_net_conf->verify_alg, p->verify_alg);
3897 				goto disconnect;
3898 			}
3899 			verify_tfm = drbd_crypto_alloc_digest_safe(device,
3900 					p->verify_alg, "verify-alg");
3901 			if (IS_ERR(verify_tfm)) {
3902 				verify_tfm = NULL;
3903 				goto disconnect;
3904 			}
3905 		}
3906 
3907 		if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3908 			if (device->state.conn == C_WF_REPORT_PARAMS) {
3909 				drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3910 				    old_net_conf->csums_alg, p->csums_alg);
3911 				goto disconnect;
3912 			}
3913 			csums_tfm = drbd_crypto_alloc_digest_safe(device,
3914 					p->csums_alg, "csums-alg");
3915 			if (IS_ERR(csums_tfm)) {
3916 				csums_tfm = NULL;
3917 				goto disconnect;
3918 			}
3919 		}
3920 
3921 		if (apv > 94 && new_disk_conf) {
3922 			new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3923 			new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3924 			new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3925 			new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3926 
3927 			fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3928 			if (fifo_size != device->rs_plan_s->size) {
3929 				new_plan = fifo_alloc(fifo_size);
3930 				if (!new_plan) {
3931 					drbd_err(device, "kmalloc of fifo_buffer failed\n");
3932 					put_ldev(device);
3933 					goto disconnect;
3934 				}
3935 			}
3936 		}
3937 
3938 		if (verify_tfm || csums_tfm) {
3939 			new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3940 			if (!new_net_conf) {
3941 				drbd_err(device, "Allocation of new net_conf failed\n");
3942 				goto disconnect;
3943 			}
3944 
3945 			*new_net_conf = *old_net_conf;
3946 
3947 			if (verify_tfm) {
3948 				strcpy(new_net_conf->verify_alg, p->verify_alg);
3949 				new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3950 				crypto_free_ahash(peer_device->connection->verify_tfm);
3951 				peer_device->connection->verify_tfm = verify_tfm;
3952 				drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3953 			}
3954 			if (csums_tfm) {
3955 				strcpy(new_net_conf->csums_alg, p->csums_alg);
3956 				new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3957 				crypto_free_ahash(peer_device->connection->csums_tfm);
3958 				peer_device->connection->csums_tfm = csums_tfm;
3959 				drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3960 			}
3961 			rcu_assign_pointer(connection->net_conf, new_net_conf);
3962 		}
3963 	}
3964 
3965 	if (new_disk_conf) {
3966 		rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3967 		put_ldev(device);
3968 	}
3969 
3970 	if (new_plan) {
3971 		old_plan = device->rs_plan_s;
3972 		rcu_assign_pointer(device->rs_plan_s, new_plan);
3973 	}
3974 
3975 	mutex_unlock(&connection->resource->conf_update);
3976 	synchronize_rcu();
3977 	if (new_net_conf)
3978 		kfree(old_net_conf);
3979 	kfree(old_disk_conf);
3980 	kfree(old_plan);
3981 
3982 	return 0;
3983 
3984 reconnect:
3985 	if (new_disk_conf) {
3986 		put_ldev(device);
3987 		kfree(new_disk_conf);
3988 	}
3989 	mutex_unlock(&connection->resource->conf_update);
3990 	return -EIO;
3991 
3992 disconnect:
3993 	kfree(new_plan);
3994 	if (new_disk_conf) {
3995 		put_ldev(device);
3996 		kfree(new_disk_conf);
3997 	}
3998 	mutex_unlock(&connection->resource->conf_update);
3999 	/* just for completeness: actually not needed,
4000 	 * as this is not reached if csums_tfm was ok. */
4001 	crypto_free_ahash(csums_tfm);
4002 	/* but free the verify_tfm again, if csums_tfm did not work out */
4003 	crypto_free_ahash(verify_tfm);
4004 	conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4005 	return -EIO;
4006 }
4007 
4008 /* warn if the arguments differ by more than 12.5% */
4009 static void warn_if_differ_considerably(struct drbd_device *device,
4010 	const char *s, sector_t a, sector_t b)
4011 {
4012 	sector_t d;
4013 	if (a == 0 || b == 0)
4014 		return;
4015 	d = (a > b) ? (a - b) : (b - a);
4016 	if (d > (a>>3) || d > (b>>3))
4017 		drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
4018 		     (unsigned long long)a, (unsigned long long)b);
4019 }
4020 
4021 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
4022 {
4023 	struct drbd_peer_device *peer_device;
4024 	struct drbd_device *device;
4025 	struct p_sizes *p = pi->data;
4026 	struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ? p->qlim : NULL;
4027 	enum determine_dev_size dd = DS_UNCHANGED;
4028 	sector_t p_size, p_usize, p_csize, my_usize;
4029 	int ldsc = 0; /* local disk size changed */
4030 	enum dds_flags ddsf;
4031 
4032 	peer_device = conn_peer_device(connection, pi->vnr);
4033 	if (!peer_device)
4034 		return config_unknown_volume(connection, pi);
4035 	device = peer_device->device;
4036 
4037 	p_size = be64_to_cpu(p->d_size);
4038 	p_usize = be64_to_cpu(p->u_size);
4039 	p_csize = be64_to_cpu(p->c_size);
4040 
4041 	/* just store the peer's disk size for now.
4042 	 * we still need to figure out whether we accept that. */
4043 	device->p_size = p_size;
4044 
4045 	if (get_ldev(device)) {
4046 		sector_t new_size, cur_size;
4047 		rcu_read_lock();
4048 		my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
4049 		rcu_read_unlock();
4050 
4051 		warn_if_differ_considerably(device, "lower level device sizes",
4052 			   p_size, drbd_get_max_capacity(device->ldev));
4053 		warn_if_differ_considerably(device, "user requested size",
4054 					    p_usize, my_usize);
4055 
4056 		/* if this is the first connect, or an otherwise expected
4057 		 * param exchange, choose the minimum */
4058 		if (device->state.conn == C_WF_REPORT_PARAMS)
4059 			p_usize = min_not_zero(my_usize, p_usize);
4060 
4061 		/* Never shrink a device with usable data during connect.
4062 		   But allow online shrinking if we are connected. */
4063 		new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0);
4064 		cur_size = drbd_get_capacity(device->this_bdev);
4065 		if (new_size < cur_size &&
4066 		    device->state.disk >= D_OUTDATED &&
4067 		    device->state.conn < C_CONNECTED) {
4068 			drbd_err(device, "The peer's disk size is too small! (%llu < %llu sectors)\n",
4069 					(unsigned long long)new_size, (unsigned long long)cur_size);
4070 			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4071 			put_ldev(device);
4072 			return -EIO;
4073 		}
4074 
4075 		if (my_usize != p_usize) {
4076 			struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
4077 
4078 			new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
4079 			if (!new_disk_conf) {
4080 				drbd_err(device, "Allocation of new disk_conf failed\n");
4081 				put_ldev(device);
4082 				return -ENOMEM;
4083 			}
4084 
4085 			mutex_lock(&connection->resource->conf_update);
4086 			old_disk_conf = device->ldev->disk_conf;
4087 			*new_disk_conf = *old_disk_conf;
4088 			new_disk_conf->disk_size = p_usize;
4089 
4090 			rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
4091 			mutex_unlock(&connection->resource->conf_update);
4092 			synchronize_rcu();
4093 			kfree(old_disk_conf);
4094 
4095 			drbd_info(device, "Peer sets u_size to %lu sectors\n",
4096 				 (unsigned long)my_usize);
4097 		}
4098 
4099 		put_ldev(device);
4100 	}
4101 
4102 	device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
4103 	/* Leave drbd_reconsider_queue_parameters() before drbd_determine_dev_size().
4104 	   In case we cleared the QUEUE_FLAG_DISCARD from our queue in
4105 	   drbd_reconsider_queue_parameters(), we can be sure that after
4106 	   drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
4107 
4108 	ddsf = be16_to_cpu(p->dds_flags);
4109 	if (get_ldev(device)) {
4110 		drbd_reconsider_queue_parameters(device, device->ldev, o);
4111 		dd = drbd_determine_dev_size(device, ddsf, NULL);
4112 		put_ldev(device);
4113 		if (dd == DS_ERROR)
4114 			return -EIO;
4115 		drbd_md_sync(device);
4116 	} else {
4117 		/*
4118 		 * I am diskless, need to accept the peer's *current* size.
4119 		 * I must NOT accept the peer's backing disk size,
4120 		 * it may have been larger than mine all along...
4121 		 *
4122 		 * At this point, the peer knows more about my disk, or at
4123 		 * least about what we last agreed upon, than myself.
4124 		 * So if his c_size is less than his d_size, the most likely
4125 		 * reason is that *my* d_size was smaller last time we checked.
4126 		 *
4127 		 * However, if he sends a zero current size,
4128 		 * take his (user-capped or) backing disk size anyway.
4129 		 */
4130 		drbd_reconsider_queue_parameters(device, NULL, o);
4131 		drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size);
4132 	}
4133 
4134 	if (get_ldev(device)) {
4135 		if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
4136 			device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
4137 			ldsc = 1;
4138 		}
4139 
4140 		put_ldev(device);
4141 	}
4142 
4143 	if (device->state.conn > C_WF_REPORT_PARAMS) {
4144 		if (be64_to_cpu(p->c_size) !=
4145 		    drbd_get_capacity(device->this_bdev) || ldsc) {
4146 			/* we have different sizes, probably peer
4147 			 * needs to know my new size... */
4148 			drbd_send_sizes(peer_device, 0, ddsf);
4149 		}
4150 		if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
4151 		    (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
4152 			if (device->state.pdsk >= D_INCONSISTENT &&
4153 			    device->state.disk >= D_INCONSISTENT) {
4154 				if (ddsf & DDSF_NO_RESYNC)
4155 					drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
4156 				else
4157 					resync_after_online_grow(device);
4158 			} else
4159 				set_bit(RESYNC_AFTER_NEG, &device->flags);
4160 		}
4161 	}
4162 
4163 	return 0;
4164 }
4165 
4166 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
4167 {
4168 	struct drbd_peer_device *peer_device;
4169 	struct drbd_device *device;
4170 	struct p_uuids *p = pi->data;
4171 	u64 *p_uuid;
4172 	int i, updated_uuids = 0;
4173 
4174 	peer_device = conn_peer_device(connection, pi->vnr);
4175 	if (!peer_device)
4176 		return config_unknown_volume(connection, pi);
4177 	device = peer_device->device;
4178 
4179 	p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
4180 	if (!p_uuid) {
4181 		drbd_err(device, "kmalloc of p_uuid failed\n");
4182 		return false;
4183 	}
4184 
4185 	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
4186 		p_uuid[i] = be64_to_cpu(p->uuid[i]);
4187 
4188 	kfree(device->p_uuid);
4189 	device->p_uuid = p_uuid;
4190 
4191 	if (device->state.conn < C_CONNECTED &&
4192 	    device->state.disk < D_INCONSISTENT &&
4193 	    device->state.role == R_PRIMARY &&
4194 	    (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
4195 		drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
4196 		    (unsigned long long)device->ed_uuid);
4197 		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4198 		return -EIO;
4199 	}
4200 
4201 	if (get_ldev(device)) {
4202 		int skip_initial_sync =
4203 			device->state.conn == C_CONNECTED &&
4204 			peer_device->connection->agreed_pro_version >= 90 &&
4205 			device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
4206 			(p_uuid[UI_FLAGS] & 8);
4207 		if (skip_initial_sync) {
4208 			drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
4209 			drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
4210 					"clear_n_write from receive_uuids",
4211 					BM_LOCKED_TEST_ALLOWED);
4212 			_drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
4213 			_drbd_uuid_set(device, UI_BITMAP, 0);
4214 			_drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
4215 					CS_VERBOSE, NULL);
4216 			drbd_md_sync(device);
4217 			updated_uuids = 1;
4218 		}
4219 		put_ldev(device);
4220 	} else if (device->state.disk < D_INCONSISTENT &&
4221 		   device->state.role == R_PRIMARY) {
4222 		/* I am a diskless primary, the peer just created a new current UUID
4223 		   for me. */
4224 		updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4225 	}
4226 
4227 	/* Before we test for the disk state, we should wait until a possibly
4228 	   ongoing cluster-wide state change has finished. That is important if
4229 	   we are primary and are detaching from our disk. We need to see the
4230 	   new disk state... */
4231 	mutex_lock(device->state_mutex);
4232 	mutex_unlock(device->state_mutex);
4233 	if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
4234 		updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4235 
4236 	if (updated_uuids)
4237 		drbd_print_uuids(device, "receiver updated UUIDs to");
4238 
4239 	return 0;
4240 }
4241 
4242 /**
4243  * convert_state() - Converts the peer's view of the cluster state to our point of view
4244  * @ps:		The state as seen by the peer.
4245  */
4246 static union drbd_state convert_state(union drbd_state ps)
4247 {
4248 	union drbd_state ms;
4249 
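	/* Connection states that look different from each side are mirrored
	   (e.g. the peer's StartingSyncS is our StartingSyncT); symmetric
	   states map to themselves. */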
4250 	static enum drbd_conns c_tab[] = {
4251 		[C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
4252 		[C_CONNECTED] = C_CONNECTED,
4253 
4254 		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
4255 		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
4256 		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
4257 		[C_VERIFY_S]       = C_VERIFY_T,
4258 		[C_MASK]   = C_MASK,
4259 	};
4260 
4261 	ms.i = ps.i;
4262 
4263 	ms.conn = c_tab[ps.conn];
4264 	ms.peer = ps.role;
4265 	ms.role = ps.peer;
4266 	ms.pdsk = ps.disk;
4267 	ms.disk = ps.pdsk;
4268 	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
4269 
4270 	return ms;
4271 }
4272 
4273 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4274 {
4275 	struct drbd_peer_device *peer_device;
4276 	struct drbd_device *device;
4277 	struct p_req_state *p = pi->data;
4278 	union drbd_state mask, val;
4279 	enum drbd_state_rv rv;
4280 
4281 	peer_device = conn_peer_device(connection, pi->vnr);
4282 	if (!peer_device)
4283 		return -EIO;
4284 	device = peer_device->device;
4285 
4286 	mask.i = be32_to_cpu(p->mask);
4287 	val.i = be32_to_cpu(p->val);
4288 
4289 	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4290 	    mutex_is_locked(device->state_mutex)) {
4291 		drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4292 		return 0;
4293 	}
4294 
4295 	mask = convert_state(mask);
4296 	val = convert_state(val);
4297 
4298 	rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4299 	drbd_send_sr_reply(peer_device, rv);
4300 
4301 	drbd_md_sync(device);
4302 
4303 	return 0;
4304 }
4305 
4306 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4307 {
4308 	struct p_req_state *p = pi->data;
4309 	union drbd_state mask, val;
4310 	enum drbd_state_rv rv;
4311 
4312 	mask.i = be32_to_cpu(p->mask);
4313 	val.i = be32_to_cpu(p->val);
4314 
4315 	if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4316 	    mutex_is_locked(&connection->cstate_mutex)) {
4317 		conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4318 		return 0;
4319 	}
4320 
4321 	mask = convert_state(mask);
4322 	val = convert_state(val);
4323 
4324 	rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4325 	conn_send_sr_reply(connection, rv);
4326 
4327 	return 0;
4328 }
4329 
4330 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4331 {
4332 	struct drbd_peer_device *peer_device;
4333 	struct drbd_device *device;
4334 	struct p_state *p = pi->data;
4335 	union drbd_state os, ns, peer_state;
4336 	enum drbd_disk_state real_peer_disk;
4337 	enum chg_state_flags cs_flags;
4338 	int rv;
4339 
4340 	peer_device = conn_peer_device(connection, pi->vnr);
4341 	if (!peer_device)
4342 		return config_unknown_volume(connection, pi);
4343 	device = peer_device->device;
4344 
4345 	peer_state.i = be32_to_cpu(p->state);
4346 
4347 	real_peer_disk = peer_state.disk;
4348 	if (peer_state.disk == D_NEGOTIATING) {
4349 		real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4350 		drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4351 	}
4352 
4353 	spin_lock_irq(&device->resource->req_lock);
4354  retry:
4355 	os = ns = drbd_read_state(device);
4356 	spin_unlock_irq(&device->resource->req_lock);
4357 
4358 	/* If some other part of the code (ack_receiver thread, timeout)
4359 	 * already decided to close the connection again,
4360 	 * we must not "re-establish" it here. */
4361 	if (os.conn <= C_TEAR_DOWN)
4362 		return -ECONNRESET;
4363 
4364 	/* If this is the "end of sync" confirmation, usually the peer disk
4365 	 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4366 	 * set) resync started in PausedSyncT, or if the timing of pause-/
4367 	 * unpause-sync events has been "just right", the peer disk may
4368 	 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4369 	 */
4370 	if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4371 	    real_peer_disk == D_UP_TO_DATE &&
4372 	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4373 		/* If we are (becoming) SyncSource, but peer is still in sync
4374 		 * preparation, ignore its uptodate-ness to avoid flapping, it
4375 		 * will change to inconsistent once the peer reaches active
4376 		 * syncing states.
4377 		 * It may have changed syncer-paused flags, however, so we
4378 		 * cannot ignore this completely. */
4379 		if (peer_state.conn > C_CONNECTED &&
4380 		    peer_state.conn < C_SYNC_SOURCE)
4381 			real_peer_disk = D_INCONSISTENT;
4382 
4383 		/* if peer_state changes to connected at the same time,
4384 		 * it explicitly notifies us that it finished resync.
4385 		 * Maybe we should finish it up, too? */
4386 		else if (os.conn >= C_SYNC_SOURCE &&
4387 			 peer_state.conn == C_CONNECTED) {
4388 			if (drbd_bm_total_weight(device) <= device->rs_failed)
4389 				drbd_resync_finished(device);
4390 			return 0;
4391 		}
4392 	}
4393 
4394 	/* explicit verify finished notification, stop sector reached. */
4395 	if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4396 	    peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4397 		ov_out_of_sync_print(device);
4398 		drbd_resync_finished(device);
4399 		return 0;
4400 	}
4401 
4402 	/* peer says his disk is inconsistent, while we think it is uptodate,
4403 	 * and this happens while the peer still thinks we have a sync going on,
4404 	 * but we think we are already done with the sync.
4405 	 * We ignore this to avoid flapping pdsk.
4406 	 * This should not happen if the peer is a recent version of drbd. */
4407 	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4408 	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4409 		real_peer_disk = D_UP_TO_DATE;
4410 
4411 	if (ns.conn == C_WF_REPORT_PARAMS)
4412 		ns.conn = C_CONNECTED;
4413 
4414 	if (peer_state.conn == C_AHEAD)
4415 		ns.conn = C_BEHIND;
4416 
4417 	if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4418 	    get_ldev_if_state(device, D_NEGOTIATING)) {
4419 		int cr; /* consider resync */
4420 
4421 		/* if we established a new connection */
4422 		cr  = (os.conn < C_CONNECTED);
4423 		/* if we had an established connection
4424 		 * and one of the nodes newly attaches a disk */
4425 		cr |= (os.conn == C_CONNECTED &&
4426 		       (peer_state.disk == D_NEGOTIATING ||
4427 			os.disk == D_NEGOTIATING));
4428 		/* if we have both been inconsistent, and the peer has been
4429 		 * forced to be UpToDate with --overwrite-data */
4430 		cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4431 		/* if we had been plain connected, and the admin requested to
4432 		 * start a sync by "invalidate" or "invalidate-remote" */
4433 		cr |= (os.conn == C_CONNECTED &&
4434 				(peer_state.conn >= C_STARTING_SYNC_S &&
4435 				 peer_state.conn <= C_WF_BITMAP_T));
4436 
4437 		if (cr)
4438 			ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4439 
4440 		put_ldev(device);
4441 		if (ns.conn == C_MASK) {
4442 			ns.conn = C_CONNECTED;
4443 			if (device->state.disk == D_NEGOTIATING) {
4444 				drbd_force_state(device, NS(disk, D_FAILED));
4445 			} else if (peer_state.disk == D_NEGOTIATING) {
4446 				drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4447 				peer_state.disk = D_DISKLESS;
4448 				real_peer_disk = D_DISKLESS;
4449 			} else {
4450 				if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4451 					return -EIO;
4452 				D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4453 				conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4454 				return -EIO;
4455 			}
4456 		}
4457 	}
4458 
4459 	spin_lock_irq(&device->resource->req_lock);
4460 	if (os.i != drbd_read_state(device).i)
4461 		goto retry;
4462 	clear_bit(CONSIDER_RESYNC, &device->flags);
4463 	ns.peer = peer_state.role;
4464 	ns.pdsk = real_peer_disk;
4465 	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4466 	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4467 		ns.disk = device->new_state_tmp.disk;
4468 	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4469 	if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4470 	    test_bit(NEW_CUR_UUID, &device->flags)) {
4471 		/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4472 		   for temporary network outages! */
4473 		spin_unlock_irq(&device->resource->req_lock);
4474 		drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4475 		tl_clear(peer_device->connection);
4476 		drbd_uuid_new_current(device);
4477 		clear_bit(NEW_CUR_UUID, &device->flags);
4478 		conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4479 		return -EIO;
4480 	}
4481 	rv = _drbd_set_state(device, ns, cs_flags, NULL);
4482 	ns = drbd_read_state(device);
4483 	spin_unlock_irq(&device->resource->req_lock);
4484 
4485 	if (rv < SS_SUCCESS) {
4486 		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4487 		return -EIO;
4488 	}
4489 
4490 	if (os.conn > C_WF_REPORT_PARAMS) {
4491 		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4492 		    peer_state.disk != D_NEGOTIATING ) {
4493 			/* we want resync, peer has not yet decided to sync... */
4494 			/* Nowadays only used when forcing a node into primary role and
4495 			   setting its disk to UpToDate with that */
4496 			drbd_send_uuids(peer_device);
4497 			drbd_send_current_state(peer_device);
4498 		}
4499 	}
4500 
4501 	clear_bit(DISCARD_MY_DATA, &device->flags);
4502 
4503 	drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4504 
4505 	return 0;
4506 }
4507 
4508 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4509 {
4510 	struct drbd_peer_device *peer_device;
4511 	struct drbd_device *device;
4512 	struct p_rs_uuid *p = pi->data;
4513 
4514 	peer_device = conn_peer_device(connection, pi->vnr);
4515 	if (!peer_device)
4516 		return -EIO;
4517 	device = peer_device->device;
4518 
4519 	wait_event(device->misc_wait,
4520 		   device->state.conn == C_WF_SYNC_UUID ||
4521 		   device->state.conn == C_BEHIND ||
4522 		   device->state.conn < C_CONNECTED ||
4523 		   device->state.disk < D_NEGOTIATING);
4524 
4525 	/* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4526 
4527 	/* Here the _drbd_uuid_ functions are right, current should
4528 	   _not_ be rotated into the history */
4529 	if (get_ldev_if_state(device, D_NEGOTIATING)) {
4530 		_drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4531 		_drbd_uuid_set(device, UI_BITMAP, 0UL);
4532 
4533 		drbd_print_uuids(device, "updated sync uuid");
4534 		drbd_start_resync(device, C_SYNC_TARGET);
4535 
4536 		put_ldev(device);
4537 	} else
4538 		drbd_err(device, "Ignoring SyncUUID packet!\n");
4539 
4540 	return 0;
4541 }
4542 
4543 /**
4544  * receive_bitmap_plain
4545  *
4546  * Return 0 when done, 1 when another iteration is needed, and a negative error
4547  * code upon failure.
4548  */
4549 static int
4550 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4551 		     unsigned long *p, struct bm_xfer_ctx *c)
4552 {
4553 	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4554 				 drbd_header_size(peer_device->connection);
4555 	unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4556 				       c->bm_words - c->word_offset);
4557 	unsigned int want = num_words * sizeof(*p);
4558 	int err;
4559 
4560 	if (want != size) {
4561 		drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4562 		return -EIO;
4563 	}
4564 	if (want == 0)
4565 		return 0;
4566 	err = drbd_recv_all(peer_device->connection, p, want);
4567 	if (err)
4568 		return err;
4569 
4570 	drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4571 
4572 	c->word_offset += num_words;
4573 	c->bit_offset = c->word_offset * BITS_PER_LONG;
4574 	if (c->bit_offset > c->bm_bits)
4575 		c->bit_offset = c->bm_bits;
4576 
4577 	return 1;
4578 }
4579 
4580 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4581 {
4582 	return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4583 }
4584 
4585 static int dcbp_get_start(struct p_compressed_bm *p)
4586 {
4587 	return (p->encoding & 0x80) != 0;
4588 }
4589 
4590 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4591 {
4592 	return (p->encoding >> 4) & 0x7;
4593 }
4594 
4595 /**
4596  * recv_bm_rle_bits
4597  *
4598  * Return 0 when done, 1 when another iteration is needed, and a negative error
4599  * code upon failure.
4600  */
4601 static int
4602 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4603 		struct p_compressed_bm *p,
4604 		 struct bm_xfer_ctx *c,
4605 		 unsigned int len)
4606 {
4607 	struct bitstream bs;
4608 	u64 look_ahead;
4609 	u64 rl;
4610 	u64 tmp;
4611 	unsigned long s = c->bit_offset;
4612 	unsigned long e;
4613 	int toggle = dcbp_get_start(p);
4614 	int have;
4615 	int bits;
4616 
4617 	bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4618 
4619 	bits = bitstream_get_bits(&bs, &look_ahead, 64);
4620 	if (bits < 0)
4621 		return -EIO;
4622 
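	/* Decode alternating run lengths: toggle flips on every run; runs with
	   toggle set describe out-of-sync bits and are applied to the bitmap,
	   the other runs are only skipped over. */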
4623 	for (have = bits; have > 0; s += rl, toggle = !toggle) {
4624 		bits = vli_decode_bits(&rl, look_ahead);
4625 		if (bits <= 0)
4626 			return -EIO;
4627 
4628 		if (toggle) {
4629 			e = s + rl -1;
4630 			if (e >= c->bm_bits) {
4631 				drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4632 				return -EIO;
4633 			}
4634 			_drbd_bm_set_bits(peer_device->device, s, e);
4635 		}
4636 
4637 		if (have < bits) {
4638 			drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4639 				have, bits, look_ahead,
4640 				(unsigned int)(bs.cur.b - p->code),
4641 				(unsigned int)bs.buf_len);
4642 			return -EIO;
4643 		}
4644 		/* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4645 		if (likely(bits < 64))
4646 			look_ahead >>= bits;
4647 		else
4648 			look_ahead = 0;
4649 		have -= bits;
4650 
4651 		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4652 		if (bits < 0)
4653 			return -EIO;
4654 		look_ahead |= tmp << have;
4655 		have += bits;
4656 	}
4657 
4658 	c->bit_offset = s;
4659 	bm_xfer_ctx_bit_to_word_offset(c);
4660 
4661 	return (s != c->bm_bits);
4662 }
4663 
4664 /**
4665  * decode_bitmap_c
4666  *
4667  * Return 0 when done, 1 when another iteration is needed, and a negative error
4668  * code upon failure.
4669  */
4670 static int
4671 decode_bitmap_c(struct drbd_peer_device *peer_device,
4672 		struct p_compressed_bm *p,
4673 		struct bm_xfer_ctx *c,
4674 		unsigned int len)
4675 {
4676 	if (dcbp_get_code(p) == RLE_VLI_Bits)
4677 		return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4678 
4679 	/* other variants had been implemented for evaluation,
4680 	 * but have been dropped as this one turned out to be "best"
4681 	 * during all our tests. */
4682 
4683 	drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4684 	conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4685 	return -EIO;
4686 }
4687 
4688 void INFO_bm_xfer_stats(struct drbd_device *device,
4689 		const char *direction, struct bm_xfer_ctx *c)
4690 {
4691 	/* what would it take to transfer it "plaintext" */
4692 	unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4693 	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4694 	unsigned int plain =
4695 		header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4696 		c->bm_words * sizeof(unsigned long);
4697 	unsigned int total = c->bytes[0] + c->bytes[1];
4698 	unsigned int r;
4699 
4700 	/* total can not be zero. but just in case: */
4701 	if (total == 0)
4702 		return;
4703 
4704 	/* don't report if not compressed */
4705 	if (total >= plain)
4706 		return;
4707 
4708 	/* total < plain. check for overflow, still */
4709 	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4710 		                    : (1000 * total / plain);
4711 
4712 	if (r > 1000)
4713 		r = 1000;
4714 
4715 	r = 1000 - r;
4716 	drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4717 	     "total %u; compression: %u.%u%%\n",
4718 			direction,
4719 			c->bytes[1], c->packets[1],
4720 			c->bytes[0], c->packets[0],
4721 			total, r/10, r % 10);
4722 }
4723 
4724 /* Since we are processing the bitfield from lower addresses to higher,
4725    it does not matter whether we process it in 32 bit chunks or 64 bit
4726    chunks as long as it is little endian. (Understand it as byte stream,
4727    beginning with the lowest byte...) If we used big endian
4728    we would need to process it from the highest address to the lowest,
4729    in order to be agnostic to the 32 vs 64 bits issue.
4730 
4731    Returns 0 on success, non-zero on failure. */
4732 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4733 {
4734 	struct drbd_peer_device *peer_device;
4735 	struct drbd_device *device;
4736 	struct bm_xfer_ctx c;
4737 	int err;
4738 
4739 	peer_device = conn_peer_device(connection, pi->vnr);
4740 	if (!peer_device)
4741 		return -EIO;
4742 	device = peer_device->device;
4743 
4744 	drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4745 	/* you are supposed to send additional out-of-sync information
4746 	 * if you actually set bits during this phase */
4747 
4748 	c = (struct bm_xfer_ctx) {
4749 		.bm_bits = drbd_bm_bits(device),
4750 		.bm_words = drbd_bm_words(device),
4751 	};
4752 
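	/* Keep receiving bitmap packets until the decoder reports completion
	   (0), needs another packet (1), or fails (< 0). */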
4753 	for(;;) {
4754 		if (pi->cmd == P_BITMAP)
4755 			err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4756 		else if (pi->cmd == P_COMPRESSED_BITMAP) {
4757 			/* MAYBE: sanity check that we speak proto >= 90,
4758 			 * and the feature is enabled! */
4759 			struct p_compressed_bm *p = pi->data;
4760 
4761 			if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4762 				drbd_err(device, "ReportCBitmap packet too large\n");
4763 				err = -EIO;
4764 				goto out;
4765 			}
4766 			if (pi->size <= sizeof(*p)) {
4767 				drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4768 				err = -EIO;
4769 				goto out;
4770 			}
4771 			err = drbd_recv_all(peer_device->connection, p, pi->size);
4772 			if (err)
4773 			       goto out;
4774 			err = decode_bitmap_c(peer_device, p, &c, pi->size);
4775 		} else {
4776 			drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4777 			err = -EIO;
4778 			goto out;
4779 		}
4780 
4781 		c.packets[pi->cmd == P_BITMAP]++;
4782 		c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4783 
4784 		if (err <= 0) {
4785 			if (err < 0)
4786 				goto out;
4787 			break;
4788 		}
4789 		err = drbd_recv_header(peer_device->connection, pi);
4790 		if (err)
4791 			goto out;
4792 	}
4793 
4794 	INFO_bm_xfer_stats(device, "receive", &c);
4795 
4796 	if (device->state.conn == C_WF_BITMAP_T) {
4797 		enum drbd_state_rv rv;
4798 
4799 		err = drbd_send_bitmap(device);
4800 		if (err)
4801 			goto out;
4802 		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4803 		rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4804 		D_ASSERT(device, rv == SS_SUCCESS);
4805 	} else if (device->state.conn != C_WF_BITMAP_S) {
4806 		/* admin may have requested C_DISCONNECTING,
4807 		 * other threads may have noticed network errors */
4808 		drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4809 		    drbd_conn_str(device->state.conn));
4810 	}
4811 	err = 0;
4812 
4813  out:
4814 	drbd_bm_unlock(device);
4815 	if (!err && device->state.conn == C_WF_BITMAP_S)
4816 		drbd_start_resync(device, C_SYNC_SOURCE);
4817 	return err;
4818 }
4819 
4820 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4821 {
4822 	drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4823 		 pi->cmd, pi->size);
4824 
4825 	return ignore_remaining_packet(connection, pi);
4826 }
4827 
4828 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4829 {
4830 	/* Make sure we've acked all the TCP data associated
4831 	 * with the data requests being unplugged */
4832 	drbd_tcp_quickack(connection->data.socket);
4833 
4834 	return 0;
4835 }
4836 
4837 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4838 {
4839 	struct drbd_peer_device *peer_device;
4840 	struct drbd_device *device;
4841 	struct p_block_desc *p = pi->data;
4842 
4843 	peer_device = conn_peer_device(connection, pi->vnr);
4844 	if (!peer_device)
4845 		return -EIO;
4846 	device = peer_device->device;
4847 
4848 	switch (device->state.conn) {
4849 	case C_WF_SYNC_UUID:
4850 	case C_WF_BITMAP_T:
4851 	case C_BEHIND:
4852 		break;
4853 	default:
4854 		drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4855 				drbd_conn_str(device->state.conn));
4856 	}
4857 
4858 	drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4859 
4860 	return 0;
4861 }
4862 
4863 static int receive_rs_deallocated(struct drbd_connection *connection, struct packet_info *pi)
4864 {
4865 	struct drbd_peer_device *peer_device;
4866 	struct p_block_desc *p = pi->data;
4867 	struct drbd_device *device;
4868 	sector_t sector;
4869 	int size, err = 0;
4870 
4871 	peer_device = conn_peer_device(connection, pi->vnr);
4872 	if (!peer_device)
4873 		return -EIO;
4874 	device = peer_device->device;
4875 
4876 	sector = be64_to_cpu(p->sector);
4877 	size = be32_to_cpu(p->blksize);
4878 
4879 	dec_rs_pending(device);
4880 
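	/* The peer deallocated (trimmed) this range during resync; mirror it
	   locally as a discard if we have a backing device, otherwise send a
	   negative ack. */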
4881 	if (get_ldev(device)) {
4882 		struct drbd_peer_request *peer_req;
4883 		const int op = REQ_OP_DISCARD;
4884 
4885 		peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector,
4886 					       size, 0, GFP_NOIO);
4887 		if (!peer_req) {
4888 			put_ldev(device);
4889 			return -ENOMEM;
4890 		}
4891 
4892 		peer_req->w.cb = e_end_resync_block;
4893 		peer_req->submit_jif = jiffies;
4894 		peer_req->flags |= EE_IS_TRIM;
4895 
4896 		spin_lock_irq(&device->resource->req_lock);
4897 		list_add_tail(&peer_req->w.list, &device->sync_ee);
4898 		spin_unlock_irq(&device->resource->req_lock);
4899 
4900 		atomic_add(pi->size >> 9, &device->rs_sect_ev);
4901 		err = drbd_submit_peer_request(device, peer_req, op, 0, DRBD_FAULT_RS_WR);
4902 
4903 		if (err) {
4904 			spin_lock_irq(&device->resource->req_lock);
4905 			list_del(&peer_req->w.list);
4906 			spin_unlock_irq(&device->resource->req_lock);
4907 
4908 			drbd_free_peer_req(device, peer_req);
4909 			put_ldev(device);
4910 			err = 0;
4911 			goto fail;
4912 		}
4913 
4914 		inc_unacked(device);
4915 
4916 		/* No put_ldev() here. Gets called in drbd_endio_write_sec_final(),
4917 		   as well as drbd_rs_complete_io() */
4918 	} else {
4919 	fail:
4920 		drbd_rs_complete_io(device, sector);
4921 		drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER);
4922 	}
4923 
4924 	atomic_add(size >> 9, &device->rs_sect_in);
4925 
4926 	return err;
4927 }
4928 
4929 struct data_cmd {
4930 	int expect_payload;
4931 	unsigned int pkt_size;
4932 	int (*fn)(struct drbd_connection *, struct packet_info *);
4933 };
4934 
4935 static struct data_cmd drbd_cmd_handler[] = {
4936 	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
4937 	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
4938 	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4939 	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4940 	[P_BITMAP]	    = { 1, 0, receive_bitmap } ,
4941 	[P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4942 	[P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4943 	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4944 	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4945 	[P_SYNC_PARAM]	    = { 1, 0, receive_SyncParam },
4946 	[P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4947 	[P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4948 	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
4949 	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
4950 	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
4951 	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4952 	[P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4953 	[P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4954 	[P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4955 	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4956 	[P_RS_THIN_REQ]     = { 0, sizeof(struct p_block_req), receive_DataRequest },
4957 	[P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4958 	[P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4959 	[P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4960 	[P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4961 	[P_TRIM]	    = { 0, sizeof(struct p_trim), receive_Data },
4962 	[P_RS_DEALLOCATED]  = { 0, sizeof(struct p_block_desc), receive_rs_deallocated },
4963 	[P_WSAME]	    = { 1, sizeof(struct p_wsame), receive_Data },
4964 };
4965 
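/*
 * Main loop of the receiver thread while the connection is established:
 * read a packet header from the data socket, look up the handler in
 * drbd_cmd_handler[], validate the advertised size against the expected
 * sub-header size, read that sub-header, and dispatch.  Any error takes
 * the connection to C_PROTOCOL_ERROR.
 */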
4966 static void drbdd(struct drbd_connection *connection)
4967 {
4968 	struct packet_info pi;
4969 	size_t shs; /* sub header size */
4970 	int err;
4971 
4972 	while (get_t_state(&connection->receiver) == RUNNING) {
4973 		struct data_cmd const *cmd;
4974 
4975 		drbd_thread_current_set_cpu(&connection->receiver);
4976 		update_receiver_timing_details(connection, drbd_recv_header);
4977 		if (drbd_recv_header(connection, &pi))
4978 			goto err_out;
4979 
4980 		cmd = &drbd_cmd_handler[pi.cmd];
4981 		if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
			drbd_err(connection, "Unexpected data packet %s (0x%04x)\n",
4983 				 cmdname(pi.cmd), pi.cmd);
4984 			goto err_out;
4985 		}
4986 
4987 		shs = cmd->pkt_size;
4988 		if (pi.cmd == P_SIZES && connection->agreed_features & DRBD_FF_WSAME)
4989 			shs += sizeof(struct o_qlim);
4990 		if (pi.size > shs && !cmd->expect_payload) {
4991 			drbd_err(connection, "No payload expected %s l:%d\n",
4992 				 cmdname(pi.cmd), pi.size);
4993 			goto err_out;
4994 		}
4995 		if (pi.size < shs) {
4996 			drbd_err(connection, "%s: unexpected packet size, expected:%d received:%d\n",
4997 				 cmdname(pi.cmd), (int)shs, pi.size);
4998 			goto err_out;
4999 		}
5000 
5001 		if (shs) {
5002 			update_receiver_timing_details(connection, drbd_recv_all_warn);
5003 			err = drbd_recv_all_warn(connection, pi.data, shs);
5004 			if (err)
5005 				goto err_out;
5006 			pi.size -= shs;
5007 		}
5008 
5009 		update_receiver_timing_details(connection, cmd->fn);
5010 		err = cmd->fn(connection, &pi);
5011 		if (err) {
5012 			drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
5013 				 cmdname(pi.cmd), err, pi.size);
5014 			goto err_out;
5015 		}
5016 	}
5017 	return;
5018 
5019     err_out:
5020 	conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
5021 }
5022 
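/*
 * Tear down an established connection: stop the ack receiver and ack sender,
 * close the sockets, run the per-volume cleanup in drbd_disconnected(), and
 * finally move the connection state to C_UNCONNECTED (or to C_STANDALONE if
 * we were deliberately disconnecting).
 */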
5023 static void conn_disconnect(struct drbd_connection *connection)
5024 {
5025 	struct drbd_peer_device *peer_device;
5026 	enum drbd_conns oc;
5027 	int vnr;
5028 
5029 	if (connection->cstate == C_STANDALONE)
5030 		return;
5031 
5032 	/* We are about to start the cleanup after connection loss.
5033 	 * Make sure drbd_make_request knows about that.
5034 	 * Usually we should be in some network failure state already,
5035 	 * but just in case we are not, we fix it up here.
5036 	 */
5037 	conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5038 
	/* ack_receiver does not clean up anything. It must not interfere, either. */
5040 	drbd_thread_stop(&connection->ack_receiver);
5041 	if (connection->ack_sender) {
5042 		destroy_workqueue(connection->ack_sender);
5043 		connection->ack_sender = NULL;
5044 	}
5045 	drbd_free_sock(connection);
5046 
5047 	rcu_read_lock();
5048 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5049 		struct drbd_device *device = peer_device->device;
5050 		kref_get(&device->kref);
5051 		rcu_read_unlock();
5052 		drbd_disconnected(peer_device);
5053 		kref_put(&device->kref, drbd_destroy_device);
5054 		rcu_read_lock();
5055 	}
5056 	rcu_read_unlock();
5057 
5058 	if (!list_empty(&connection->current_epoch->list))
5059 		drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
5060 	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
5061 	atomic_set(&connection->current_epoch->epoch_size, 0);
5062 	connection->send.seen_any_write_yet = false;
5063 
5064 	drbd_info(connection, "Connection closed\n");
5065 
5066 	if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
5067 		conn_try_outdate_peer_async(connection);
5068 
5069 	spin_lock_irq(&connection->resource->req_lock);
5070 	oc = connection->cstate;
5071 	if (oc >= C_UNCONNECTED)
5072 		_conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
5073 
5074 	spin_unlock_irq(&connection->resource->req_lock);
5075 
5076 	if (oc == C_DISCONNECTING)
5077 		conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
5078 }
5079 
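/*
 * Per-volume cleanup after the connection was lost: wait for in-flight peer
 * requests to drain, cancel all resync activity, flush the sender work queue,
 * clear the transfer log (unless IO is suspended), and write out the bitmap.
 */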
5080 static int drbd_disconnected(struct drbd_peer_device *peer_device)
5081 {
5082 	struct drbd_device *device = peer_device->device;
5083 	unsigned int i;
5084 
5085 	/* wait for current activity to cease. */
5086 	spin_lock_irq(&device->resource->req_lock);
5087 	_drbd_wait_ee_list_empty(device, &device->active_ee);
5088 	_drbd_wait_ee_list_empty(device, &device->sync_ee);
5089 	_drbd_wait_ee_list_empty(device, &device->read_ee);
5090 	spin_unlock_irq(&device->resource->req_lock);
5091 
5092 	/* We do not have data structures that would allow us to
5093 	 * get the rs_pending_cnt down to 0 again.
5094 	 *  * On C_SYNC_TARGET we do not have any data structures describing
	 *    the pending RSDataRequests we have sent.
5096 	 *  * On C_SYNC_SOURCE there is no data structure that tracks
5097 	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
5098 	 *  And no, it is not the sum of the reference counts in the
5099 	 *  resync_LRU. The resync_LRU tracks the whole operation including
5100 	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
5101 	 *  on the fly. */
5102 	drbd_rs_cancel_all(device);
5103 	device->rs_total = 0;
5104 	device->rs_failed = 0;
5105 	atomic_set(&device->rs_pending_cnt, 0);
5106 	wake_up(&device->misc_wait);
5107 
5108 	del_timer_sync(&device->resync_timer);
5109 	resync_timer_fn((unsigned long)device);
5110 
5111 	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
5112 	 * w_make_resync_request etc. which may still be on the worker queue
5113 	 * to be "canceled" */
5114 	drbd_flush_workqueue(&peer_device->connection->sender_work);
5115 
5116 	drbd_finish_peer_reqs(device);
5117 
	/* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
	   may have queued new work. The flush before drbd_finish_peer_reqs() is
	   necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
5121 	drbd_flush_workqueue(&peer_device->connection->sender_work);
5122 
5123 	/* need to do it again, drbd_finish_peer_reqs() may have populated it
5124 	 * again via drbd_try_clear_on_disk_bm(). */
5125 	drbd_rs_cancel_all(device);
5126 
5127 	kfree(device->p_uuid);
5128 	device->p_uuid = NULL;
5129 
5130 	if (!drbd_suspended(device))
5131 		tl_clear(peer_device->connection);
5132 
5133 	drbd_md_sync(device);
5134 
5135 	if (get_ldev(device)) {
5136 		drbd_bitmap_io(device, &drbd_bm_write_copy_pages,
5137 				"write from disconnected", BM_LOCKED_CHANGE_ALLOWED);
5138 		put_ldev(device);
5139 	}
5140 
5141 	/* tcp_close and release of sendpage pages can be deferred.  I don't
5142 	 * want to use SO_LINGER, because apparently it can be deferred for
5143 	 * more than 20 seconds (longest time I checked).
5144 	 *
	 * Actually we don't care exactly when the network stack does its
	 * put_page(); we just release our reference on these pages right here.
5147 	 */
5148 	i = drbd_free_peer_reqs(device, &device->net_ee);
5149 	if (i)
5150 		drbd_info(device, "net_ee not empty, killed %u entries\n", i);
5151 	i = atomic_read(&device->pp_in_use_by_net);
5152 	if (i)
5153 		drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
5154 	i = atomic_read(&device->pp_in_use);
5155 	if (i)
5156 		drbd_info(device, "pp_in_use = %d, expected 0\n", i);
5157 
5158 	D_ASSERT(device, list_empty(&device->read_ee));
5159 	D_ASSERT(device, list_empty(&device->active_ee));
5160 	D_ASSERT(device, list_empty(&device->sync_ee));
5161 	D_ASSERT(device, list_empty(&device->done_ee));
5162 
5163 	return 0;
5164 }
5165 
5166 /*
5167  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
5168  * we can agree on is stored in agreed_pro_version.
5169  *
5170  * feature flags and the reserved array should be enough room for future
5171  * enhancements of the handshake protocol, and possible plugins...
5172  *
 * for now, they are expected to be zero, and are ignored in any case.
5174  */
5175 static int drbd_send_features(struct drbd_connection *connection)
5176 {
5177 	struct drbd_socket *sock;
5178 	struct p_connection_features *p;
5179 
5180 	sock = &connection->data;
5181 	p = conn_prepare_command(connection, sock);
5182 	if (!p)
5183 		return -EIO;
5184 	memset(p, 0, sizeof(*p));
5185 	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
5186 	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
5187 	p->feature_flags = cpu_to_be32(PRO_FEATURES);
5188 	return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
5189 }
5190 
5191 /*
5192  * return values:
5193  *   1 yes, we have a valid connection
5194  *   0 oops, did not work out, please try again
5195  *  -1 peer talks different language,
5196  *     no point in trying again, please go standalone.
5197  */
5198 static int drbd_do_features(struct drbd_connection *connection)
5199 {
5200 	/* ASSERT current == connection->receiver ... */
5201 	struct p_connection_features *p;
5202 	const int expect = sizeof(struct p_connection_features);
5203 	struct packet_info pi;
5204 	int err;
5205 
5206 	err = drbd_send_features(connection);
5207 	if (err)
5208 		return 0;
5209 
5210 	err = drbd_recv_header(connection, &pi);
5211 	if (err)
5212 		return 0;
5213 
5214 	if (pi.cmd != P_CONNECTION_FEATURES) {
5215 		drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
5216 			 cmdname(pi.cmd), pi.cmd);
5217 		return -1;
5218 	}
5219 
5220 	if (pi.size != expect) {
5221 		drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
5222 		     expect, pi.size);
5223 		return -1;
5224 	}
5225 
5226 	p = pi.data;
5227 	err = drbd_recv_all_warn(connection, p, expect);
5228 	if (err)
5229 		return 0;
5230 
5231 	p->protocol_min = be32_to_cpu(p->protocol_min);
5232 	p->protocol_max = be32_to_cpu(p->protocol_max);
5233 	if (p->protocol_max == 0)
5234 		p->protocol_max = p->protocol_min;
5235 
5236 	if (PRO_VERSION_MAX < p->protocol_min ||
5237 	    PRO_VERSION_MIN > p->protocol_max)
5238 		goto incompat;
5239 
5240 	connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
5241 	connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
5242 
5243 	drbd_info(connection, "Handshake successful: "
5244 	     "Agreed network protocol version %d\n", connection->agreed_pro_version);
5245 
5246 	drbd_info(connection, "Feature flags enabled on protocol level: 0x%x%s%s%s.\n",
5247 		  connection->agreed_features,
5248 		  connection->agreed_features & DRBD_FF_TRIM ? " TRIM" : "",
5249 		  connection->agreed_features & DRBD_FF_THIN_RESYNC ? " THIN_RESYNC" : "",
5250 		  connection->agreed_features & DRBD_FF_WSAME ? " WRITE_SAME" :
5251 		  connection->agreed_features ? "" : " none");
5252 
5253 	return 1;
5254 
5255  incompat:
5256 	drbd_err(connection, "incompatible DRBD dialects: "
5257 	    "I support %d-%d, peer supports %d-%d\n",
5258 	    PRO_VERSION_MIN, PRO_VERSION_MAX,
5259 	    p->protocol_min, p->protocol_max);
5260 	return -1;
5261 }
5262 
5263 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
5264 static int drbd_do_auth(struct drbd_connection *connection)
5265 {
	drbd_err(connection, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
5267 	drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
5268 	return -1;
5269 }
5270 #else
5271 #define CHALLENGE_LEN 64
5272 
5273 /* Return value:
5274 	1 - auth succeeded,
5275 	0 - failed, try again (network error),
5276 	-1 - auth failed, don't try again.
5277 */
5278 
5279 static int drbd_do_auth(struct drbd_connection *connection)
5280 {
5281 	struct drbd_socket *sock;
5282 	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
5283 	char *response = NULL;
5284 	char *right_response = NULL;
5285 	char *peers_ch = NULL;
5286 	unsigned int key_len;
5287 	char secret[SHARED_SECRET_MAX]; /* 64 byte */
5288 	unsigned int resp_size;
5289 	SHASH_DESC_ON_STACK(desc, connection->cram_hmac_tfm);
5290 	struct packet_info pi;
5291 	struct net_conf *nc;
5292 	int err, rv;
5293 
5294 	/* FIXME: Put the challenge/response into the preallocated socket buffer.  */
5295 
5296 	rcu_read_lock();
5297 	nc = rcu_dereference(connection->net_conf);
5298 	key_len = strlen(nc->shared_secret);
5299 	memcpy(secret, nc->shared_secret, key_len);
5300 	rcu_read_unlock();
5301 
5302 	desc->tfm = connection->cram_hmac_tfm;
5303 	desc->flags = 0;
5304 
5305 	rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
5306 	if (rv) {
5307 		drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
5308 		rv = -1;
5309 		goto fail;
5310 	}
5311 
5312 	get_random_bytes(my_challenge, CHALLENGE_LEN);
5313 
5314 	sock = &connection->data;
5315 	if (!conn_prepare_command(connection, sock)) {
5316 		rv = 0;
5317 		goto fail;
5318 	}
5319 	rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
5320 				my_challenge, CHALLENGE_LEN);
5321 	if (!rv)
5322 		goto fail;
5323 
5324 	err = drbd_recv_header(connection, &pi);
5325 	if (err) {
5326 		rv = 0;
5327 		goto fail;
5328 	}
5329 
5330 	if (pi.cmd != P_AUTH_CHALLENGE) {
5331 		drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
5332 			 cmdname(pi.cmd), pi.cmd);
5333 		rv = 0;
5334 		goto fail;
5335 	}
5336 
5337 	if (pi.size > CHALLENGE_LEN * 2) {
		drbd_err(connection, "AuthChallenge payload too big.\n");
5339 		rv = -1;
5340 		goto fail;
5341 	}
5342 
5343 	if (pi.size < CHALLENGE_LEN) {
5344 		drbd_err(connection, "AuthChallenge payload too small.\n");
5345 		rv = -1;
5346 		goto fail;
5347 	}
5348 
5349 	peers_ch = kmalloc(pi.size, GFP_NOIO);
5350 	if (peers_ch == NULL) {
5351 		drbd_err(connection, "kmalloc of peers_ch failed\n");
5352 		rv = -1;
5353 		goto fail;
5354 	}
5355 
5356 	err = drbd_recv_all_warn(connection, peers_ch, pi.size);
5357 	if (err) {
5358 		rv = 0;
5359 		goto fail;
5360 	}
5361 
5362 	if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
5363 		drbd_err(connection, "Peer presented the same challenge!\n");
5364 		rv = -1;
5365 		goto fail;
5366 	}
5367 
5368 	resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
5369 	response = kmalloc(resp_size, GFP_NOIO);
5370 	if (response == NULL) {
5371 		drbd_err(connection, "kmalloc of response failed\n");
5372 		rv = -1;
5373 		goto fail;
5374 	}
5375 
5376 	rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
5377 	if (rv) {
5378 		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5379 		rv = -1;
5380 		goto fail;
5381 	}
5382 
5383 	if (!conn_prepare_command(connection, sock)) {
5384 		rv = 0;
5385 		goto fail;
5386 	}
5387 	rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5388 				response, resp_size);
5389 	if (!rv)
5390 		goto fail;
5391 
5392 	err = drbd_recv_header(connection, &pi);
5393 	if (err) {
5394 		rv = 0;
5395 		goto fail;
5396 	}
5397 
5398 	if (pi.cmd != P_AUTH_RESPONSE) {
5399 		drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5400 			 cmdname(pi.cmd), pi.cmd);
5401 		rv = 0;
5402 		goto fail;
5403 	}
5404 
5405 	if (pi.size != resp_size) {
		drbd_err(connection, "AuthResponse payload has wrong size\n");
5407 		rv = 0;
5408 		goto fail;
5409 	}
5410 
	err = drbd_recv_all_warn(connection, response, resp_size);
5412 	if (err) {
5413 		rv = 0;
5414 		goto fail;
5415 	}
5416 
5417 	right_response = kmalloc(resp_size, GFP_NOIO);
5418 	if (right_response == NULL) {
5419 		drbd_err(connection, "kmalloc of right_response failed\n");
5420 		rv = -1;
5421 		goto fail;
5422 	}
5423 
5424 	rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
5425 				 right_response);
5426 	if (rv) {
5427 		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5428 		rv = -1;
5429 		goto fail;
5430 	}
5431 
5432 	rv = !memcmp(response, right_response, resp_size);
5433 
5434 	if (rv)
5435 		drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5436 		     resp_size);
5437 	else
5438 		rv = -1;
5439 
5440  fail:
5441 	kfree(peers_ch);
5442 	kfree(response);
5443 	kfree(right_response);
5444 	shash_desc_zero(desc);
5445 
5446 	return rv;
5447 }
5448 #endif
5449 
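/*
 * Entry point of the receiver thread: (re)establish the connection, run the
 * receive loop in drbdd(), and clean up again once the connection is lost or
 * the thread is asked to stop.
 */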
5450 int drbd_receiver(struct drbd_thread *thi)
5451 {
5452 	struct drbd_connection *connection = thi->connection;
5453 	int h;
5454 
5455 	drbd_info(connection, "receiver (re)started\n");
5456 
5457 	do {
5458 		h = conn_connect(connection);
5459 		if (h == 0) {
5460 			conn_disconnect(connection);
5461 			schedule_timeout_interruptible(HZ);
5462 		}
5463 		if (h == -1) {
5464 			drbd_warn(connection, "Discarding network configuration.\n");
5465 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5466 		}
5467 	} while (h == 0);
5468 
5469 	if (h > 0)
5470 		drbdd(connection);
5471 
5472 	conn_disconnect(connection);
5473 
5474 	drbd_info(connection, "receiver terminated\n");
5475 	return 0;
5476 }
5477 
5478 /* ********* acknowledge sender ******** */
5479 
5480 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5481 {
5482 	struct p_req_state_reply *p = pi->data;
5483 	int retcode = be32_to_cpu(p->retcode);
5484 
5485 	if (retcode >= SS_SUCCESS) {
5486 		set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5487 	} else {
5488 		set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5489 		drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5490 			 drbd_set_st_err_str(retcode), retcode);
5491 	}
5492 	wake_up(&connection->ping_wait);
5493 
5494 	return 0;
5495 }
5496 
5497 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5498 {
5499 	struct drbd_peer_device *peer_device;
5500 	struct drbd_device *device;
5501 	struct p_req_state_reply *p = pi->data;
5502 	int retcode = be32_to_cpu(p->retcode);
5503 
5504 	peer_device = conn_peer_device(connection, pi->vnr);
5505 	if (!peer_device)
5506 		return -EIO;
5507 	device = peer_device->device;
5508 
5509 	if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5510 		D_ASSERT(device, connection->agreed_pro_version < 100);
5511 		return got_conn_RqSReply(connection, pi);
5512 	}
5513 
5514 	if (retcode >= SS_SUCCESS) {
5515 		set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5516 	} else {
5517 		set_bit(CL_ST_CHG_FAIL, &device->flags);
5518 		drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5519 			drbd_set_st_err_str(retcode), retcode);
5520 	}
5521 	wake_up(&device->state_wait);
5522 
5523 	return 0;
5524 }
5525 
5526 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
5527 {
5528 	return drbd_send_ping_ack(connection);
5530 }
5531 
static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
{
	/* The idle receive timeout is restored by the ack receiver main loop
	 * via set_idle_timeout(); dereferencing net_conf here without rcu
	 * protection would be racy, so don't touch the socket in this handler. */
	if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
		wake_up(&connection->ping_wait);

	return 0;
}
5541 
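/* P_RS_IS_IN_SYNC: during checksum based resync the peer found the block to
 * be identical, so mark it in sync locally without transferring any data. */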
5542 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5543 {
5544 	struct drbd_peer_device *peer_device;
5545 	struct drbd_device *device;
5546 	struct p_block_ack *p = pi->data;
5547 	sector_t sector = be64_to_cpu(p->sector);
5548 	int blksize = be32_to_cpu(p->blksize);
5549 
5550 	peer_device = conn_peer_device(connection, pi->vnr);
5551 	if (!peer_device)
5552 		return -EIO;
5553 	device = peer_device->device;
5554 
5555 	D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5556 
5557 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5558 
5559 	if (get_ldev(device)) {
5560 		drbd_rs_complete_io(device, sector);
5561 		drbd_set_in_sync(device, sector, blksize);
5562 		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5563 		device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5564 		put_ldev(device);
5565 	}
5566 	dec_rs_pending(device);
5567 	atomic_add(blksize >> 9, &device->rs_sect_in);
5568 
5569 	return 0;
5570 }
5571 
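/* Look up the request identified by (id, sector) in the given rb tree,
 * apply the request state transition "what" to it, and complete the master
 * bio if that transition finished the request. */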
5572 static int
5573 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5574 			      struct rb_root *root, const char *func,
5575 			      enum drbd_req_event what, bool missing_ok)
5576 {
5577 	struct drbd_request *req;
5578 	struct bio_and_error m;
5579 
5580 	spin_lock_irq(&device->resource->req_lock);
5581 	req = find_request(device, root, id, sector, missing_ok, func);
5582 	if (unlikely(!req)) {
5583 		spin_unlock_irq(&device->resource->req_lock);
5584 		return -EIO;
5585 	}
5586 	__req_mod(req, what, &m);
5587 	spin_unlock_irq(&device->resource->req_lock);
5588 
5589 	if (m.bio)
5590 		complete_master_bio(device, &m);
5591 	return 0;
5592 }
5593 
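/* Positive write acknowledgments.  Acks for resync writes (block_id ==
 * ID_SYNCER) just update the bitmap and resync counters; acks for application
 * writes are translated into the corresponding request events. */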
5594 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5595 {
5596 	struct drbd_peer_device *peer_device;
5597 	struct drbd_device *device;
5598 	struct p_block_ack *p = pi->data;
5599 	sector_t sector = be64_to_cpu(p->sector);
5600 	int blksize = be32_to_cpu(p->blksize);
5601 	enum drbd_req_event what;
5602 
5603 	peer_device = conn_peer_device(connection, pi->vnr);
5604 	if (!peer_device)
5605 		return -EIO;
5606 	device = peer_device->device;
5607 
5608 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5609 
5610 	if (p->block_id == ID_SYNCER) {
5611 		drbd_set_in_sync(device, sector, blksize);
5612 		dec_rs_pending(device);
5613 		return 0;
5614 	}
5615 	switch (pi->cmd) {
5616 	case P_RS_WRITE_ACK:
5617 		what = WRITE_ACKED_BY_PEER_AND_SIS;
5618 		break;
5619 	case P_WRITE_ACK:
5620 		what = WRITE_ACKED_BY_PEER;
5621 		break;
5622 	case P_RECV_ACK:
5623 		what = RECV_ACKED_BY_PEER;
5624 		break;
5625 	case P_SUPERSEDED:
5626 		what = CONFLICT_RESOLVED;
5627 		break;
5628 	case P_RETRY_WRITE:
5629 		what = POSTPONE_WRITE;
5630 		break;
5631 	default:
5632 		BUG();
5633 	}
5634 
5635 	return validate_req_change_req_state(device, p->block_id, sector,
5636 					     &device->write_requests, __func__,
5637 					     what, false);
5638 }
5639 
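/* Negative acknowledgment: the peer did not write the block.  Resync requests
 * are accounted as failed resync IO; for application writes the request gets
 * the NEG_ACKED event, or the block is marked out of sync if the request is
 * already gone. */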
5640 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5641 {
5642 	struct drbd_peer_device *peer_device;
5643 	struct drbd_device *device;
5644 	struct p_block_ack *p = pi->data;
5645 	sector_t sector = be64_to_cpu(p->sector);
5646 	int size = be32_to_cpu(p->blksize);
5647 	int err;
5648 
5649 	peer_device = conn_peer_device(connection, pi->vnr);
5650 	if (!peer_device)
5651 		return -EIO;
5652 	device = peer_device->device;
5653 
5654 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5655 
5656 	if (p->block_id == ID_SYNCER) {
5657 		dec_rs_pending(device);
5658 		drbd_rs_failed_io(device, sector, size);
5659 		return 0;
5660 	}
5661 
5662 	err = validate_req_change_req_state(device, p->block_id, sector,
5663 					    &device->write_requests, __func__,
5664 					    NEG_ACKED, true);
5665 	if (err) {
5666 		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5667 		   The master bio might already be completed, therefore the
5668 		   request is no longer in the collision hash. */
5669 		/* In Protocol B we might already have got a P_RECV_ACK
5670 		   but then get a P_NEG_ACK afterwards. */
5671 		drbd_set_out_of_sync(device, sector, size);
5672 	}
5673 	return 0;
5674 }
5675 
5676 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5677 {
5678 	struct drbd_peer_device *peer_device;
5679 	struct drbd_device *device;
5680 	struct p_block_ack *p = pi->data;
5681 	sector_t sector = be64_to_cpu(p->sector);
5682 
5683 	peer_device = conn_peer_device(connection, pi->vnr);
5684 	if (!peer_device)
5685 		return -EIO;
5686 	device = peer_device->device;
5687 
5688 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5689 
5690 	drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5691 	    (unsigned long long)sector, be32_to_cpu(p->blksize));
5692 
5693 	return validate_req_change_req_state(device, p->block_id, sector,
5694 					     &device->read_requests, __func__,
5695 					     NEG_ACKED, false);
5696 }
5697 
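/* The peer could not serve a resync or online verify read request
 * (P_NEG_RS_DREPLY), or it canceled the request (P_RS_CANCEL). */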
5698 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5699 {
5700 	struct drbd_peer_device *peer_device;
5701 	struct drbd_device *device;
5702 	sector_t sector;
5703 	int size;
5704 	struct p_block_ack *p = pi->data;
5705 
5706 	peer_device = conn_peer_device(connection, pi->vnr);
5707 	if (!peer_device)
5708 		return -EIO;
5709 	device = peer_device->device;
5710 
5711 	sector = be64_to_cpu(p->sector);
5712 	size = be32_to_cpu(p->blksize);
5713 
5714 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5715 
5716 	dec_rs_pending(device);
5717 
5718 	if (get_ldev_if_state(device, D_FAILED)) {
5719 		drbd_rs_complete_io(device, sector);
5720 		switch (pi->cmd) {
5721 		case P_NEG_RS_DREPLY:
5722 			drbd_rs_failed_io(device, sector, size);
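			/* fall through */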
5723 		case P_RS_CANCEL:
5724 			break;
5725 		default:
5726 			BUG();
5727 		}
5728 		put_ldev(device);
5729 	}
5730 
5731 	return 0;
5732 }
5733 
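/* A barrier ack confirms that the peer has seen all writes of an epoch.
 * Release the corresponding section of the transfer log and, for devices in
 * Ahead mode with no more application IO in flight, arm the timer that
 * switches them back to resync. */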
5734 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5735 {
5736 	struct p_barrier_ack *p = pi->data;
5737 	struct drbd_peer_device *peer_device;
5738 	int vnr;
5739 
5740 	tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5741 
5742 	rcu_read_lock();
5743 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5744 		struct drbd_device *device = peer_device->device;
5745 
5746 		if (device->state.conn == C_AHEAD &&
5747 		    atomic_read(&device->ap_in_flight) == 0 &&
5748 		    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5749 			device->start_resync_timer.expires = jiffies + HZ;
5750 			add_timer(&device->start_resync_timer);
5751 		}
5752 	}
5753 	rcu_read_unlock();
5754 
5755 	return 0;
5756 }
5757 
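/* Result of a single online verify request: record blocks found out of sync,
 * advance the progress marks, and queue w_ov_finished once the last reply
 * has arrived. */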
5758 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5759 {
5760 	struct drbd_peer_device *peer_device;
5761 	struct drbd_device *device;
5762 	struct p_block_ack *p = pi->data;
5763 	struct drbd_device_work *dw;
5764 	sector_t sector;
5765 	int size;
5766 
5767 	peer_device = conn_peer_device(connection, pi->vnr);
5768 	if (!peer_device)
5769 		return -EIO;
5770 	device = peer_device->device;
5771 
5772 	sector = be64_to_cpu(p->sector);
5773 	size = be32_to_cpu(p->blksize);
5774 
5775 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5776 
5777 	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5778 		drbd_ov_out_of_sync_found(device, sector, size);
5779 	else
5780 		ov_out_of_sync_print(device);
5781 
5782 	if (!get_ldev(device))
5783 		return 0;
5784 
5785 	drbd_rs_complete_io(device, sector);
5786 	dec_rs_pending(device);
5787 
5788 	--device->ov_left;
5789 
5790 	/* let's advance progress step marks only for every other megabyte */
5791 	if ((device->ov_left & 0x200) == 0x200)
5792 		drbd_advance_rs_marks(device, device->ov_left);
5793 
5794 	if (device->ov_left == 0) {
5795 		dw = kmalloc(sizeof(*dw), GFP_NOIO);
5796 		if (dw) {
5797 			dw->w.cb = w_ov_finished;
5798 			dw->device = device;
5799 			drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5800 		} else {
5801 			drbd_err(device, "kmalloc(dw) failed.");
5802 			ov_out_of_sync_print(device);
5803 			drbd_resync_finished(device);
5804 		}
5805 	}
5806 	put_ldev(device);
5807 	return 0;
5808 }
5809 
5810 static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
5811 {
5812 	return 0;
5813 }
5814 
5815 struct meta_sock_cmd {
5816 	size_t pkt_size;
5817 	int (*fn)(struct drbd_connection *connection, struct packet_info *);
5818 };
5819 
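/* Adjust the receive timeout of the meta socket: while waiting for a ping ack
 * use ping_timeo (configured in tenths of a second, hence the division by 10),
 * otherwise the idle timeout ping_int (in seconds). */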
5820 static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
5821 {
5822 	long t;
5823 	struct net_conf *nc;
5824 
5825 	rcu_read_lock();
5826 	nc = rcu_dereference(connection->net_conf);
5827 	t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5828 	rcu_read_unlock();
5829 
5830 	t *= HZ;
5831 	if (ping_timeout)
5832 		t /= 10;
5833 
5834 	connection->meta.socket->sk->sk_rcvtimeo = t;
5835 }
5836 
5837 static void set_ping_timeout(struct drbd_connection *connection)
5838 {
5839 	set_rcvtimeo(connection, 1);
5840 }
5841 
5842 static void set_idle_timeout(struct drbd_connection *connection)
5843 {
5844 	set_rcvtimeo(connection, 0);
5845 }
5846 
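/* Dispatch table for the small, fixed-size packets handled on the meta socket
 * by the ack receiver thread. */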
5847 static struct meta_sock_cmd ack_receiver_tbl[] = {
5848 	[P_PING]	    = { 0, got_Ping },
5849 	[P_PING_ACK]	    = { 0, got_PingAck },
5850 	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5851 	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5852 	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5853 	[P_SUPERSEDED]   = { sizeof(struct p_block_ack), got_BlockAck },
5854 	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
5855 	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
5856 	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5857 	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
5858 	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
5859 	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5860 	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5861 	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5862 	[P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
	[P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5864 	[P_RETRY_WRITE]	    = { sizeof(struct p_block_ack), got_BlockAck },
5865 };
5866 
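/*
 * Entry point of the ack receiver thread.  It runs with elevated (SCHED_RR)
 * priority, sends a ping whenever SEND_PING is set, and reads header plus
 * fixed-size payload for each packet on the meta socket before dispatching
 * it through ack_receiver_tbl[].
 */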
5867 int drbd_ack_receiver(struct drbd_thread *thi)
5868 {
5869 	struct drbd_connection *connection = thi->connection;
5870 	struct meta_sock_cmd *cmd = NULL;
5871 	struct packet_info pi;
5872 	unsigned long pre_recv_jif;
5873 	int rv;
5874 	void *buf    = connection->meta.rbuf;
5875 	int received = 0;
5876 	unsigned int header_size = drbd_header_size(connection);
5877 	int expect   = header_size;
5878 	bool ping_timeout_active = false;
5879 	struct sched_param param = { .sched_priority = 2 };
5880 
5881 	rv = sched_setscheduler(current, SCHED_RR, &param);
5882 	if (rv < 0)
		drbd_err(connection, "drbd_ack_receiver: failed to set scheduler priority, ret=%d\n", rv);
5884 
5885 	while (get_t_state(thi) == RUNNING) {
5886 		drbd_thread_current_set_cpu(thi);
5887 
5888 		conn_reclaim_net_peer_reqs(connection);
5889 
5890 		if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5891 			if (drbd_send_ping(connection)) {
5892 				drbd_err(connection, "drbd_send_ping has failed\n");
5893 				goto reconnect;
5894 			}
5895 			set_ping_timeout(connection);
5896 			ping_timeout_active = true;
5897 		}
5898 
5899 		pre_recv_jif = jiffies;
5900 		rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5901 
5902 		/* Note:
5903 		 * -EINTR	 (on meta) we got a signal
5904 		 * -EAGAIN	 (on meta) rcvtimeo expired
5905 		 * -ECONNRESET	 other side closed the connection
5906 		 * -ERESTARTSYS  (on data) we got a signal
5907 		 * rv <  0	 other than above: unexpected error!
5908 		 * rv == expected: full header or command
5909 		 * rv <  expected: "woken" by signal during receive
5910 		 * rv == 0	 : "connection shut down by peer"
5911 		 */
5912 		if (likely(rv > 0)) {
5913 			received += rv;
5914 			buf	 += rv;
5915 		} else if (rv == 0) {
5916 			if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5917 				long t;
5918 				rcu_read_lock();
5919 				t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5920 				rcu_read_unlock();
5921 
5922 				t = wait_event_timeout(connection->ping_wait,
5923 						       connection->cstate < C_WF_REPORT_PARAMS,
5924 						       t);
5925 				if (t)
5926 					break;
5927 			}
5928 			drbd_err(connection, "meta connection shut down by peer.\n");
5929 			goto reconnect;
5930 		} else if (rv == -EAGAIN) {
5931 			/* If the data socket received something meanwhile,
5932 			 * that is good enough: peer is still alive. */
5933 			if (time_after(connection->last_received, pre_recv_jif))
5934 				continue;
5935 			if (ping_timeout_active) {
5936 				drbd_err(connection, "PingAck did not arrive in time.\n");
5937 				goto reconnect;
5938 			}
5939 			set_bit(SEND_PING, &connection->flags);
5940 			continue;
5941 		} else if (rv == -EINTR) {
5942 			/* maybe drbd_thread_stop(): the while condition will notice.
5943 			 * maybe woken for send_ping: we'll send a ping above,
5944 			 * and change the rcvtimeo */
5945 			flush_signals(current);
5946 			continue;
5947 		} else {
5948 			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5949 			goto reconnect;
5950 		}
5951 
5952 		if (received == expect && cmd == NULL) {
5953 			if (decode_header(connection, connection->meta.rbuf, &pi))
5954 				goto reconnect;
5955 			cmd = &ack_receiver_tbl[pi.cmd];
5956 			if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
5957 				drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5958 					 cmdname(pi.cmd), pi.cmd);
5959 				goto disconnect;
5960 			}
5961 			expect = header_size + cmd->pkt_size;
5962 			if (pi.size != expect - header_size) {
5963 				drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5964 					pi.cmd, pi.size);
5965 				goto reconnect;
5966 			}
5967 		}
5968 		if (received == expect) {
5969 			bool err;
5970 
5971 			err = cmd->fn(connection, &pi);
5972 			if (err) {
5973 				drbd_err(connection, "%pf failed\n", cmd->fn);
5974 				goto reconnect;
5975 			}
5976 
5977 			connection->last_received = jiffies;
5978 
5979 			if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
5980 				set_idle_timeout(connection);
5981 				ping_timeout_active = false;
5982 			}
5983 
5984 			buf	 = connection->meta.rbuf;
5985 			received = 0;
5986 			expect	 = header_size;
5987 			cmd	 = NULL;
5988 		}
5989 	}
5990 
5991 	if (0) {
5992 reconnect:
5993 		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5994 		conn_md_sync(connection);
5995 	}
5996 	if (0) {
5997 disconnect:
5998 		conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5999 	}
6000 
6001 	drbd_info(connection, "ack_receiver terminated\n");
6002 
6003 	return 0;
6004 }
6005 
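/* Work callback of the per-connection ack_sender workqueue: send out the
 * pending peer request acknowledgments for one device, corking the meta
 * socket around the burst if tcp_cork is configured. */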
6006 void drbd_send_acks_wf(struct work_struct *ws)
6007 {
6008 	struct drbd_peer_device *peer_device =
6009 		container_of(ws, struct drbd_peer_device, send_acks_work);
6010 	struct drbd_connection *connection = peer_device->connection;
6011 	struct drbd_device *device = peer_device->device;
6012 	struct net_conf *nc;
6013 	int tcp_cork, err;
6014 
6015 	rcu_read_lock();
6016 	nc = rcu_dereference(connection->net_conf);
6017 	tcp_cork = nc->tcp_cork;
6018 	rcu_read_unlock();
6019 
6020 	if (tcp_cork)
6021 		drbd_tcp_cork(connection->meta.socket);
6022 
6023 	err = drbd_finish_peer_reqs(device);
6024 	kref_put(&device->kref, drbd_destroy_device);
	/* The matching kref_get() is in drbd_endio_write_sec_final(). It is needed
	   to keep the struct work_struct send_acks_work alive, which is embedded in
	   the peer_device object. */
6027 
6028 	if (err) {
6029 		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6030 		return;
6031 	}
6032 
6033 	if (tcp_cork)
6034 		drbd_tcp_uncork(connection->meta.socket);
6035 
6036 	return;
6037 }
6038