1 /*
2    drbd_receiver.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24 
25 
26 #include <linux/module.h>
27 
28 #include <linux/uaccess.h>
29 #include <net/sock.h>
30 
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <uapi/linux/sched/types.h>
40 #include <linux/sched/signal.h>
41 #include <linux/pkt_sched.h>
42 #define __KERNEL_SYSCALLS__
43 #include <linux/unistd.h>
44 #include <linux/vmalloc.h>
45 #include <linux/random.h>
46 #include <linux/string.h>
47 #include <linux/scatterlist.h>
48 #include "drbd_int.h"
49 #include "drbd_protocol.h"
50 #include "drbd_req.h"
51 #include "drbd_vli.h"
52 
53 #define PRO_FEATURES (DRBD_FF_TRIM|DRBD_FF_THIN_RESYNC|DRBD_FF_WSAME)
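/* Optional protocol features we advertise in the initial
 * P_CONNECTION_FEATURES handshake; see drbd_do_features().  Only features
 * advertised by both sides end up being used on the connection. */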
54 
55 struct packet_info {
56 	enum drbd_packet cmd;
57 	unsigned int size;
58 	unsigned int vnr;
59 	void *data;
60 };
61 
62 enum finish_epoch {
63 	FE_STILL_LIVE,
64 	FE_DESTROYED,
65 	FE_RECYCLED,
66 };
67 
68 static int drbd_do_features(struct drbd_connection *connection);
69 static int drbd_do_auth(struct drbd_connection *connection);
70 static int drbd_disconnected(struct drbd_peer_device *);
71 static void conn_wait_active_ee_empty(struct drbd_connection *connection);
72 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
73 static int e_end_block(struct drbd_work *, int);
74 
75 
76 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
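/* GFP_TRY intentionally carries no reclaim flags: a failed allocation here is
 * acceptable and must not trigger memory write-out (see the comment in
 * __drbd_alloc_pages()); __GFP_NOWARN keeps those expected failures quiet. */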
77 
78 /*
79  * some helper functions to deal with single linked page lists,
80  * page->private being our "next" pointer.
81  */
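/* page_chain_next(), page_chain_for_each() and page_chain_for_each_safe()
 * (defined in drbd_int.h) follow page->private as that "next" pointer;
 * a value of 0 terminates the chain (see set_page_private(page, 0) below). */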
82 
83 /* If at least n pages are linked at head, get n pages off.
84  * Otherwise, don't modify head, and return NULL.
85  * Locking is the responsibility of the caller.
86  */
87 static struct page *page_chain_del(struct page **head, int n)
88 {
89 	struct page *page;
90 	struct page *tmp;
91 
92 	BUG_ON(!n);
93 	BUG_ON(!head);
94 
95 	page = *head;
96 
97 	if (!page)
98 		return NULL;
99 
100 	while (page) {
101 		tmp = page_chain_next(page);
102 		if (--n == 0)
103 			break; /* found sufficient pages */
104 		if (tmp == NULL)
105 			/* insufficient pages, don't use any of them. */
106 			return NULL;
107 		page = tmp;
108 	}
109 
110 	/* add end of list marker for the returned list */
111 	set_page_private(page, 0);
112 	/* actual return value, and adjustment of head */
113 	page = *head;
114 	*head = tmp;
115 	return page;
116 }
117 
118 /* may be used outside of locks to find the tail of a (usually short)
119  * "private" page chain, before adding it back to a global chain head
120  * with page_chain_add() under a spinlock. */
121 static struct page *page_chain_tail(struct page *page, int *len)
122 {
123 	struct page *tmp;
124 	int i = 1;
125 	while ((tmp = page_chain_next(page)))
126 		++i, page = tmp;
127 	if (len)
128 		*len = i;
129 	return page;
130 }
131 
132 static int page_chain_free(struct page *page)
133 {
134 	struct page *tmp;
135 	int i = 0;
136 	page_chain_for_each_safe(page, tmp) {
137 		put_page(page);
138 		++i;
139 	}
140 	return i;
141 }
142 
143 static void page_chain_add(struct page **head,
144 		struct page *chain_first, struct page *chain_last)
145 {
146 #if 1
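	/* Sanity check, currently always compiled in: verify that chain_last
	 * really is the tail of the chain starting at chain_first. */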
147 	struct page *tmp;
148 	tmp = page_chain_tail(chain_first, NULL);
149 	BUG_ON(tmp != chain_last);
150 #endif
151 
152 	/* add chain to head */
153 	set_page_private(chain_last, (unsigned long)*head);
154 	*head = chain_first;
155 }
156 
157 static struct page *__drbd_alloc_pages(struct drbd_device *device,
158 				       unsigned int number)
159 {
160 	struct page *page = NULL;
161 	struct page *tmp = NULL;
162 	unsigned int i = 0;
163 
164 	/* Yes, testing drbd_pp_vacant outside the lock is racy.
165 	 * So what. It saves a spin_lock. */
166 	if (drbd_pp_vacant >= number) {
167 		spin_lock(&drbd_pp_lock);
168 		page = page_chain_del(&drbd_pp_pool, number);
169 		if (page)
170 			drbd_pp_vacant -= number;
171 		spin_unlock(&drbd_pp_lock);
172 		if (page)
173 			return page;
174 	}
175 
176 	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
177 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
178 	 * which in turn might block on the other node at this very place.  */
179 	for (i = 0; i < number; i++) {
180 		tmp = alloc_page(GFP_TRY);
181 		if (!tmp)
182 			break;
183 		set_page_private(tmp, (unsigned long)page);
184 		page = tmp;
185 	}
186 
187 	if (i == number)
188 		return page;
189 
190 	/* Not enough pages immediately available this time.
191 	 * No need to jump around here, drbd_alloc_pages will retry this
192 	 * function "soon". */
193 	if (page) {
194 		tmp = page_chain_tail(page, NULL);
195 		spin_lock(&drbd_pp_lock);
196 		page_chain_add(&drbd_pp_pool, page, tmp);
197 		drbd_pp_vacant += i;
198 		spin_unlock(&drbd_pp_lock);
199 	}
200 	return NULL;
201 }
202 
203 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
204 					   struct list_head *to_be_freed)
205 {
206 	struct drbd_peer_request *peer_req, *tmp;
207 
	/* The EEs are always appended to the end of the list. Since
	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first unfinished one, we
	   can stop examining the list... */
212 
213 	list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
214 		if (drbd_peer_req_has_active_page(peer_req))
215 			break;
216 		list_move(&peer_req->w.list, to_be_freed);
217 	}
218 }
219 
220 static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
221 {
222 	LIST_HEAD(reclaimed);
223 	struct drbd_peer_request *peer_req, *t;
224 
225 	spin_lock_irq(&device->resource->req_lock);
226 	reclaim_finished_net_peer_reqs(device, &reclaimed);
227 	spin_unlock_irq(&device->resource->req_lock);
228 	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
229 		drbd_free_net_peer_req(device, peer_req);
230 }
231 
232 static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
233 {
234 	struct drbd_peer_device *peer_device;
235 	int vnr;
236 
237 	rcu_read_lock();
238 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
239 		struct drbd_device *device = peer_device->device;
240 		if (!atomic_read(&device->pp_in_use_by_net))
241 			continue;
242 
243 		kref_get(&device->kref);
244 		rcu_read_unlock();
245 		drbd_reclaim_net_peer_reqs(device);
246 		kref_put(&device->kref, drbd_destroy_device);
247 		rcu_read_lock();
248 	}
249 	rcu_read_unlock();
250 }
251 
252 /**
253  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @peer_device:	DRBD peer device.
255  * @number:	number of pages requested
256  * @retry:	whether to retry, if not enough pages are available right now
257  *
258  * Tries to allocate number pages, first from our own page pool, then from
259  * the kernel.
260  * Possibly retry until DRBD frees sufficient pages somewhere else.
261  *
262  * If this allocation would exceed the max_buffers setting, we throttle
263  * allocation (schedule_timeout) to give the system some room to breathe.
264  *
 * We do not use max-buffers as a hard limit, because it could lead to
 * congestion, and eventually to a distributed deadlock during online-verify
 * or (checksum based) resync, if the max-buffers, socket buffer sizes and
 * resync-rate settings are mis-configured.
269  *
270  * Returns a page chain linked via page->private.
271  */
272 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
273 			      bool retry)
274 {
275 	struct drbd_device *device = peer_device->device;
276 	struct page *page = NULL;
277 	struct net_conf *nc;
278 	DEFINE_WAIT(wait);
279 	unsigned int mxb;
280 
281 	rcu_read_lock();
282 	nc = rcu_dereference(peer_device->connection->net_conf);
283 	mxb = nc ? nc->max_buffers : 1000000;
284 	rcu_read_unlock();
285 
286 	if (atomic_read(&device->pp_in_use) < mxb)
287 		page = __drbd_alloc_pages(device, number);
288 
289 	/* Try to keep the fast path fast, but occasionally we need
	 * to reclaim the pages we lent to the network stack. */
291 	if (page && atomic_read(&device->pp_in_use_by_net) > 512)
292 		drbd_reclaim_net_peer_reqs(device);
293 
294 	while (page == NULL) {
295 		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
296 
297 		drbd_reclaim_net_peer_reqs(device);
298 
299 		if (atomic_read(&device->pp_in_use) < mxb) {
300 			page = __drbd_alloc_pages(device, number);
301 			if (page)
302 				break;
303 		}
304 
305 		if (!retry)
306 			break;
307 
308 		if (signal_pending(current)) {
309 			drbd_warn(device, "drbd_alloc_pages interrupted!\n");
310 			break;
311 		}
312 
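		/* Wait up to HZ/10 for someone to free pages; drbd_pp_wait is
		 * woken from drbd_free_pages().  If the full timeout expires
		 * without a wake-up, stop enforcing the max-buffers soft limit
		 * for this allocation (mxb is raised to UINT_MAX). */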
313 		if (schedule_timeout(HZ/10) == 0)
314 			mxb = UINT_MAX;
315 	}
316 	finish_wait(&drbd_pp_wait, &wait);
317 
318 	if (page)
319 		atomic_add(number, &device->pp_in_use);
320 	return page;
321 }
322 
323 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * It is also used from inside another spin_lock_irq(&resource->req_lock);
325  * Either links the page chain back to the global pool,
326  * or returns all pages to the system. */
327 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
328 {
329 	atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
330 	int i;
331 
332 	if (page == NULL)
333 		return;
334 
335 	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * drbd_minor_count)
336 		i = page_chain_free(page);
337 	else {
338 		struct page *tmp;
339 		tmp = page_chain_tail(page, &i);
340 		spin_lock(&drbd_pp_lock);
341 		page_chain_add(&drbd_pp_pool, page, tmp);
342 		drbd_pp_vacant += i;
343 		spin_unlock(&drbd_pp_lock);
344 	}
345 	i = atomic_sub_return(i, a);
346 	if (i < 0)
347 		drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
348 			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
349 	wake_up(&drbd_pp_wait);
350 }
351 
352 /*
353 You need to hold the req_lock:
354  _drbd_wait_ee_list_empty()
355 
356 You must not have the req_lock:
357  drbd_free_peer_req()
358  drbd_alloc_peer_req()
359  drbd_free_peer_reqs()
360  drbd_ee_fix_bhs()
361  drbd_finish_peer_reqs()
362  drbd_clear_done_ee()
363  drbd_wait_ee_list_empty()
364 */
365 
366 /* normal: payload_size == request size (bi_size)
367  * w_same: payload_size == logical_block_size
368  * trim: payload_size == 0 */
369 struct drbd_peer_request *
370 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
371 		    unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local)
372 {
373 	struct drbd_device *device = peer_device->device;
374 	struct drbd_peer_request *peer_req;
375 	struct page *page = NULL;
376 	unsigned nr_pages = (payload_size + PAGE_SIZE -1) >> PAGE_SHIFT;
377 
378 	if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
379 		return NULL;
380 
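	/* Mask off __GFP_HIGHMEM: it only makes sense for the data pages
	 * allocated further down, not for the peer request object itself,
	 * which comes from the drbd_ee_mempool. */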
381 	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
382 	if (!peer_req) {
383 		if (!(gfp_mask & __GFP_NOWARN))
384 			drbd_err(device, "%s: allocation failed\n", __func__);
385 		return NULL;
386 	}
387 
388 	if (nr_pages) {
389 		page = drbd_alloc_pages(peer_device, nr_pages,
390 					gfpflags_allow_blocking(gfp_mask));
391 		if (!page)
392 			goto fail;
393 	}
394 
395 	memset(peer_req, 0, sizeof(*peer_req));
396 	INIT_LIST_HEAD(&peer_req->w.list);
397 	drbd_clear_interval(&peer_req->i);
398 	peer_req->i.size = request_size;
399 	peer_req->i.sector = sector;
400 	peer_req->submit_jif = jiffies;
401 	peer_req->peer_device = peer_device;
402 	peer_req->pages = page;
403 	/*
	 * The block_id is opaque to the receiver.  It is not converted to or
	 * from network byte order, and is sent back to the sender unchanged.
406 	 */
407 	peer_req->block_id = id;
408 
409 	return peer_req;
410 
411  fail:
412 	mempool_free(peer_req, drbd_ee_mempool);
413 	return NULL;
414 }
415 
416 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
417 		       int is_net)
418 {
419 	might_sleep();
420 	if (peer_req->flags & EE_HAS_DIGEST)
421 		kfree(peer_req->digest);
422 	drbd_free_pages(device, peer_req->pages, is_net);
423 	D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
424 	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
425 	if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
426 		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
427 		drbd_al_complete_io(device, &peer_req->i);
428 	}
429 	mempool_free(peer_req, drbd_ee_mempool);
430 }
431 
432 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
433 {
434 	LIST_HEAD(work_list);
435 	struct drbd_peer_request *peer_req, *t;
436 	int count = 0;
437 	int is_net = list == &device->net_ee;
438 
439 	spin_lock_irq(&device->resource->req_lock);
440 	list_splice_init(list, &work_list);
441 	spin_unlock_irq(&device->resource->req_lock);
442 
443 	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
444 		__drbd_free_peer_req(device, peer_req, is_net);
445 		count++;
446 	}
447 	return count;
448 }
449 
450 /*
451  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
452  */
453 static int drbd_finish_peer_reqs(struct drbd_device *device)
454 {
455 	LIST_HEAD(work_list);
456 	LIST_HEAD(reclaimed);
457 	struct drbd_peer_request *peer_req, *t;
458 	int err = 0;
459 
460 	spin_lock_irq(&device->resource->req_lock);
461 	reclaim_finished_net_peer_reqs(device, &reclaimed);
462 	list_splice_init(&device->done_ee, &work_list);
463 	spin_unlock_irq(&device->resource->req_lock);
464 
465 	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
466 		drbd_free_net_peer_req(device, peer_req);
467 
468 	/* possible callbacks here:
469 	 * e_end_block, and e_end_resync_block, e_send_superseded.
470 	 * all ignore the last argument.
471 	 */
472 	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
473 		int err2;
474 
475 		/* list_del not necessary, next/prev members not touched */
476 		err2 = peer_req->w.cb(&peer_req->w, !!err);
477 		if (!err)
478 			err = err2;
479 		drbd_free_peer_req(device, peer_req);
480 	}
481 	wake_up(&device->ee_wait);
482 
483 	return err;
484 }
485 
486 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
487 				     struct list_head *head)
488 {
489 	DEFINE_WAIT(wait);
490 
491 	/* avoids spin_lock/unlock
492 	 * and calling prepare_to_wait in the fast path */
493 	while (!list_empty(head)) {
494 		prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
495 		spin_unlock_irq(&device->resource->req_lock);
496 		io_schedule();
497 		finish_wait(&device->ee_wait, &wait);
498 		spin_lock_irq(&device->resource->req_lock);
499 	}
500 }
501 
502 static void drbd_wait_ee_list_empty(struct drbd_device *device,
503 				    struct list_head *head)
504 {
505 	spin_lock_irq(&device->resource->req_lock);
506 	_drbd_wait_ee_list_empty(device, head);
507 	spin_unlock_irq(&device->resource->req_lock);
508 }
509 
510 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
511 {
512 	struct kvec iov = {
513 		.iov_base = buf,
514 		.iov_len = size,
515 	};
516 	struct msghdr msg = {
517 		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
518 	};
519 	return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
520 }
521 
522 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
523 {
524 	int rv;
525 
526 	rv = drbd_recv_short(connection->data.socket, buf, size, 0);
527 
528 	if (rv < 0) {
529 		if (rv == -ECONNRESET)
530 			drbd_info(connection, "sock was reset by peer\n");
531 		else if (rv != -ERESTARTSYS)
532 			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
533 	} else if (rv == 0) {
534 		if (test_bit(DISCONNECT_SENT, &connection->flags)) {
535 			long t;
536 			rcu_read_lock();
537 			t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
538 			rcu_read_unlock();
539 
540 			t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
541 
542 			if (t)
543 				goto out;
544 		}
545 		drbd_info(connection, "sock was shut down by peer\n");
546 	}
547 
548 	if (rv != size)
549 		conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
550 
551 out:
552 	return rv;
553 }
554 
555 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
556 {
557 	int err;
558 
559 	err = drbd_recv(connection, buf, size);
560 	if (err != size) {
561 		if (err >= 0)
562 			err = -EIO;
563 	} else
564 		err = 0;
565 	return err;
566 }
567 
568 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
569 {
570 	int err;
571 
572 	err = drbd_recv_all(connection, buf, size);
573 	if (err && !signal_pending(current))
574 		drbd_warn(connection, "short read (expected size %d)\n", (int)size);
575 	return err;
576 }
577 
578 /* quoting tcp(7):
579  *   On individual connections, the socket buffer size must be set prior to the
580  *   listen(2) or connect(2) calls in order to have it take effect.
581  * This is our wrapper to do so.
582  */
583 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
584 		unsigned int rcv)
585 {
586 	/* open coded SO_SNDBUF, SO_RCVBUF */
587 	if (snd) {
588 		sock->sk->sk_sndbuf = snd;
589 		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
590 	}
591 	if (rcv) {
592 		sock->sk->sk_rcvbuf = rcv;
593 		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
594 	}
595 }
596 
597 static struct socket *drbd_try_connect(struct drbd_connection *connection)
598 {
599 	const char *what;
600 	struct socket *sock;
601 	struct sockaddr_in6 src_in6;
602 	struct sockaddr_in6 peer_in6;
603 	struct net_conf *nc;
604 	int err, peer_addr_len, my_addr_len;
605 	int sndbuf_size, rcvbuf_size, connect_int;
606 	int disconnect_on_error = 1;
607 
608 	rcu_read_lock();
609 	nc = rcu_dereference(connection->net_conf);
610 	if (!nc) {
611 		rcu_read_unlock();
612 		return NULL;
613 	}
614 	sndbuf_size = nc->sndbuf_size;
615 	rcvbuf_size = nc->rcvbuf_size;
616 	connect_int = nc->connect_int;
617 	rcu_read_unlock();
618 
619 	my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
620 	memcpy(&src_in6, &connection->my_addr, my_addr_len);
621 
622 	if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
623 		src_in6.sin6_port = 0;
624 	else
625 		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
626 
627 	peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
628 	memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
629 
630 	what = "sock_create_kern";
631 	err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
632 			       SOCK_STREAM, IPPROTO_TCP, &sock);
633 	if (err < 0) {
634 		sock = NULL;
635 		goto out;
636 	}
637 
638 	sock->sk->sk_rcvtimeo =
639 	sock->sk->sk_sndtimeo = connect_int * HZ;
640 	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
641 
	/* Explicitly bind to the configured IP as source IP
	 * for the outgoing connections.
	 * This is needed for multihomed hosts and to be
	 * able to use lo: interfaces for DRBD.
	 * Make sure to use 0 as the port number, so Linux selects
	 * a free one dynamically.
	 */
649 	what = "bind before connect";
650 	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
651 	if (err < 0)
652 		goto out;
653 
654 	/* connect may fail, peer not yet available.
655 	 * stay C_WF_CONNECTION, don't go Disconnecting! */
656 	disconnect_on_error = 0;
657 	what = "connect";
658 	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
659 
660 out:
661 	if (err < 0) {
662 		if (sock) {
663 			sock_release(sock);
664 			sock = NULL;
665 		}
666 		switch (-err) {
667 			/* timeout, busy, signal pending */
668 		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
669 		case EINTR: case ERESTARTSYS:
670 			/* peer not (yet) available, network problem */
671 		case ECONNREFUSED: case ENETUNREACH:
672 		case EHOSTDOWN:    case EHOSTUNREACH:
673 			disconnect_on_error = 0;
674 			break;
675 		default:
676 			drbd_err(connection, "%s failed, err = %d\n", what, err);
677 		}
678 		if (disconnect_on_error)
679 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
680 	}
681 
682 	return sock;
683 }
684 
685 struct accept_wait_data {
686 	struct drbd_connection *connection;
687 	struct socket *s_listen;
688 	struct completion door_bell;
689 	void (*original_sk_state_change)(struct sock *sk);
690 
691 };
692 
693 static void drbd_incoming_connection(struct sock *sk)
694 {
695 	struct accept_wait_data *ad = sk->sk_user_data;
696 	void (*state_change)(struct sock *sk);
697 
698 	state_change = ad->original_sk_state_change;
699 	if (sk->sk_state == TCP_ESTABLISHED)
700 		complete(&ad->door_bell);
701 	state_change(sk);
702 }
703 
704 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
705 {
706 	int err, sndbuf_size, rcvbuf_size, my_addr_len;
707 	struct sockaddr_in6 my_addr;
708 	struct socket *s_listen;
709 	struct net_conf *nc;
710 	const char *what;
711 
712 	rcu_read_lock();
713 	nc = rcu_dereference(connection->net_conf);
714 	if (!nc) {
715 		rcu_read_unlock();
716 		return -EIO;
717 	}
718 	sndbuf_size = nc->sndbuf_size;
719 	rcvbuf_size = nc->rcvbuf_size;
720 	rcu_read_unlock();
721 
722 	my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
723 	memcpy(&my_addr, &connection->my_addr, my_addr_len);
724 
725 	what = "sock_create_kern";
726 	err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
727 			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
728 	if (err) {
729 		s_listen = NULL;
730 		goto out;
731 	}
732 
733 	s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
734 	drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
735 
736 	what = "bind before listen";
737 	err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
738 	if (err < 0)
739 		goto out;
740 
741 	ad->s_listen = s_listen;
742 	write_lock_bh(&s_listen->sk->sk_callback_lock);
743 	ad->original_sk_state_change = s_listen->sk->sk_state_change;
744 	s_listen->sk->sk_state_change = drbd_incoming_connection;
745 	s_listen->sk->sk_user_data = ad;
746 	write_unlock_bh(&s_listen->sk->sk_callback_lock);
747 
748 	what = "listen";
749 	err = s_listen->ops->listen(s_listen, 5);
750 	if (err < 0)
751 		goto out;
752 
753 	return 0;
754 out:
755 	if (s_listen)
756 		sock_release(s_listen);
757 	if (err < 0) {
758 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
759 			drbd_err(connection, "%s failed, err = %d\n", what, err);
760 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
761 		}
762 	}
763 
764 	return -EIO;
765 }
766 
767 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
768 {
769 	write_lock_bh(&sk->sk_callback_lock);
770 	sk->sk_state_change = ad->original_sk_state_change;
771 	sk->sk_user_data = NULL;
772 	write_unlock_bh(&sk->sk_callback_lock);
773 }
774 
775 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
776 {
777 	int timeo, connect_int, err = 0;
778 	struct socket *s_estab = NULL;
779 	struct net_conf *nc;
780 
781 	rcu_read_lock();
782 	nc = rcu_dereference(connection->net_conf);
783 	if (!nc) {
784 		rcu_read_unlock();
785 		return NULL;
786 	}
787 	connect_int = nc->connect_int;
788 	rcu_read_unlock();
789 
790 	timeo = connect_int * HZ;
791 	/* 28.5% random jitter */
792 	timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
793 
794 	err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
795 	if (err <= 0)
796 		return NULL;
797 
798 	err = kernel_accept(ad->s_listen, &s_estab, 0);
799 	if (err < 0) {
800 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
801 			drbd_err(connection, "accept failed, err = %d\n", err);
802 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
803 		}
804 	}
805 
806 	if (s_estab)
807 		unregister_state_change(s_estab->sk, ad);
808 
809 	return s_estab;
810 }
811 
812 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
813 
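/* conn_connect() below establishes two TCP connections per peer, one for data
 * and one for meta/ack traffic.  The very first packet sent on each socket,
 * P_INITIAL_DATA or P_INITIAL_META, tells the accepting side which of the two
 * roles the incoming connection is meant to take. */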
814 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
815 			     enum drbd_packet cmd)
816 {
817 	if (!conn_prepare_command(connection, sock))
818 		return -EIO;
819 	return conn_send_command(connection, sock, cmd, 0, NULL, 0);
820 }
821 
822 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
823 {
824 	unsigned int header_size = drbd_header_size(connection);
825 	struct packet_info pi;
826 	struct net_conf *nc;
827 	int err;
828 
829 	rcu_read_lock();
830 	nc = rcu_dereference(connection->net_conf);
831 	if (!nc) {
832 		rcu_read_unlock();
833 		return -EIO;
834 	}
835 	sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
836 	rcu_read_unlock();
837 
838 	err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
839 	if (err != header_size) {
840 		if (err >= 0)
841 			err = -EIO;
842 		return err;
843 	}
844 	err = decode_header(connection, connection->data.rbuf, &pi);
845 	if (err)
846 		return err;
847 	return pi.cmd;
848 }
849 
850 /**
851  * drbd_socket_okay() - Free the socket if its connection is not okay
852  * @sock:	pointer to the pointer to the socket.
853  */
854 static bool drbd_socket_okay(struct socket **sock)
855 {
856 	int rr;
857 	char tb[4];
858 
859 	if (!*sock)
860 		return false;
861 
862 	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
863 
864 	if (rr > 0 || rr == -EAGAIN) {
865 		return true;
866 	} else {
867 		sock_release(*sock);
868 		*sock = NULL;
869 		return false;
870 	}
871 }
872 
873 static bool connection_established(struct drbd_connection *connection,
874 				   struct socket **sock1,
875 				   struct socket **sock2)
876 {
877 	struct net_conf *nc;
878 	int timeout;
879 	bool ok;
880 
881 	if (!*sock1 || !*sock2)
882 		return false;
883 
884 	rcu_read_lock();
885 	nc = rcu_dereference(connection->net_conf);
886 	timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
887 	rcu_read_unlock();
888 	schedule_timeout_interruptible(timeout);
889 
890 	ok = drbd_socket_okay(sock1);
891 	ok = drbd_socket_okay(sock2) && ok;
892 
893 	return ok;
894 }
895 
896 /* Gets called if a connection is established, or if a new minor gets created
897    in a connection */
898 int drbd_connected(struct drbd_peer_device *peer_device)
899 {
900 	struct drbd_device *device = peer_device->device;
901 	int err;
902 
903 	atomic_set(&device->packet_seq, 0);
904 	device->peer_seq = 0;
905 
906 	device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
907 		&peer_device->connection->cstate_mutex :
908 		&device->own_state_mutex;
909 
910 	err = drbd_send_sync_param(peer_device);
911 	if (!err)
912 		err = drbd_send_sizes(peer_device, 0, 0);
913 	if (!err)
914 		err = drbd_send_uuids(peer_device);
915 	if (!err)
916 		err = drbd_send_current_state(peer_device);
917 	clear_bit(USE_DEGR_WFC_T, &device->flags);
918 	clear_bit(RESIZE_PENDING, &device->flags);
919 	atomic_set(&device->ap_in_flight, 0);
920 	mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
921 	return err;
922 }
923 
924 /*
925  * return values:
926  *   1 yes, we have a valid connection
927  *   0 oops, did not work out, please try again
928  *  -1 peer talks different language,
929  *     no point in trying again, please go standalone.
930  *  -2 We do not have a network config...
931  */
932 static int conn_connect(struct drbd_connection *connection)
933 {
934 	struct drbd_socket sock, msock;
935 	struct drbd_peer_device *peer_device;
936 	struct net_conf *nc;
937 	int vnr, timeout, h;
938 	bool discard_my_data, ok;
939 	enum drbd_state_rv rv;
940 	struct accept_wait_data ad = {
941 		.connection = connection,
942 		.door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
943 	};
944 
945 	clear_bit(DISCONNECT_SENT, &connection->flags);
946 	if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
947 		return -2;
948 
949 	mutex_init(&sock.mutex);
950 	sock.sbuf = connection->data.sbuf;
951 	sock.rbuf = connection->data.rbuf;
952 	sock.socket = NULL;
953 	mutex_init(&msock.mutex);
954 	msock.sbuf = connection->meta.sbuf;
955 	msock.rbuf = connection->meta.rbuf;
956 	msock.socket = NULL;
957 
958 	/* Assume that the peer only understands protocol 80 until we know better.  */
959 	connection->agreed_pro_version = 80;
960 
961 	if (prepare_listen_socket(connection, &ad))
962 		return 0;
963 
964 	do {
965 		struct socket *s;
966 
967 		s = drbd_try_connect(connection);
968 		if (s) {
969 			if (!sock.socket) {
970 				sock.socket = s;
971 				send_first_packet(connection, &sock, P_INITIAL_DATA);
972 			} else if (!msock.socket) {
973 				clear_bit(RESOLVE_CONFLICTS, &connection->flags);
974 				msock.socket = s;
975 				send_first_packet(connection, &msock, P_INITIAL_META);
976 			} else {
977 				drbd_err(connection, "Logic error in conn_connect()\n");
978 				goto out_release_sockets;
979 			}
980 		}
981 
982 		if (connection_established(connection, &sock.socket, &msock.socket))
983 			break;
984 
985 retry:
986 		s = drbd_wait_for_connect(connection, &ad);
987 		if (s) {
988 			int fp = receive_first_packet(connection, s);
989 			drbd_socket_okay(&sock.socket);
990 			drbd_socket_okay(&msock.socket);
991 			switch (fp) {
992 			case P_INITIAL_DATA:
993 				if (sock.socket) {
994 					drbd_warn(connection, "initial packet S crossed\n");
995 					sock_release(sock.socket);
996 					sock.socket = s;
997 					goto randomize;
998 				}
999 				sock.socket = s;
1000 				break;
1001 			case P_INITIAL_META:
1002 				set_bit(RESOLVE_CONFLICTS, &connection->flags);
1003 				if (msock.socket) {
1004 					drbd_warn(connection, "initial packet M crossed\n");
1005 					sock_release(msock.socket);
1006 					msock.socket = s;
1007 					goto randomize;
1008 				}
1009 				msock.socket = s;
1010 				break;
1011 			default:
1012 				drbd_warn(connection, "Error receiving initial packet\n");
1013 				sock_release(s);
1014 randomize:
1015 				if (prandom_u32() & 1)
1016 					goto retry;
1017 			}
1018 		}
1019 
1020 		if (connection->cstate <= C_DISCONNECTING)
1021 			goto out_release_sockets;
1022 		if (signal_pending(current)) {
1023 			flush_signals(current);
1024 			smp_rmb();
1025 			if (get_t_state(&connection->receiver) == EXITING)
1026 				goto out_release_sockets;
1027 		}
1028 
1029 		ok = connection_established(connection, &sock.socket, &msock.socket);
1030 	} while (!ok);
1031 
1032 	if (ad.s_listen)
1033 		sock_release(ad.s_listen);
1034 
1035 	sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1036 	msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1037 
1038 	sock.socket->sk->sk_allocation = GFP_NOIO;
1039 	msock.socket->sk->sk_allocation = GFP_NOIO;
1040 
1041 	sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1042 	msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1043 
1044 	/* NOT YET ...
1045 	 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1046 	 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1047 	 * first set it to the P_CONNECTION_FEATURES timeout,
1048 	 * which we set to 4x the configured ping_timeout. */
1049 	rcu_read_lock();
1050 	nc = rcu_dereference(connection->net_conf);
1051 
1052 	sock.socket->sk->sk_sndtimeo =
1053 	sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1054 
1055 	msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1056 	timeout = nc->timeout * HZ / 10;
1057 	discard_my_data = nc->discard_my_data;
1058 	rcu_read_unlock();
1059 
1060 	msock.socket->sk->sk_sndtimeo = timeout;
1061 
1062 	/* we don't want delays.
1063 	 * we use TCP_CORK where appropriate, though */
1064 	drbd_tcp_nodelay(sock.socket);
1065 	drbd_tcp_nodelay(msock.socket);
1066 
1067 	connection->data.socket = sock.socket;
1068 	connection->meta.socket = msock.socket;
1069 	connection->last_received = jiffies;
1070 
1071 	h = drbd_do_features(connection);
1072 	if (h <= 0)
1073 		return h;
1074 
1075 	if (connection->cram_hmac_tfm) {
1076 		/* drbd_request_state(device, NS(conn, WFAuth)); */
1077 		switch (drbd_do_auth(connection)) {
1078 		case -1:
1079 			drbd_err(connection, "Authentication of peer failed\n");
1080 			return -1;
1081 		case 0:
1082 			drbd_err(connection, "Authentication of peer failed, trying again.\n");
1083 			return 0;
1084 		}
1085 	}
1086 
1087 	connection->data.socket->sk->sk_sndtimeo = timeout;
1088 	connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1089 
1090 	if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1091 		return -1;
1092 
1093 	/* Prevent a race between resync-handshake and
1094 	 * being promoted to Primary.
1095 	 *
1096 	 * Grab and release the state mutex, so we know that any current
1097 	 * drbd_set_role() is finished, and any incoming drbd_set_role
1098 	 * will see the STATE_SENT flag, and wait for it to be cleared.
1099 	 */
1100 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1101 		mutex_lock(peer_device->device->state_mutex);
1102 
1103 	/* avoid a race with conn_request_state( C_DISCONNECTING ) */
1104 	spin_lock_irq(&connection->resource->req_lock);
1105 	set_bit(STATE_SENT, &connection->flags);
1106 	spin_unlock_irq(&connection->resource->req_lock);
1107 
1108 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1109 		mutex_unlock(peer_device->device->state_mutex);
1110 
1111 	rcu_read_lock();
1112 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1113 		struct drbd_device *device = peer_device->device;
1114 		kref_get(&device->kref);
1115 		rcu_read_unlock();
1116 
1117 		if (discard_my_data)
1118 			set_bit(DISCARD_MY_DATA, &device->flags);
1119 		else
1120 			clear_bit(DISCARD_MY_DATA, &device->flags);
1121 
1122 		drbd_connected(peer_device);
1123 		kref_put(&device->kref, drbd_destroy_device);
1124 		rcu_read_lock();
1125 	}
1126 	rcu_read_unlock();
1127 
1128 	rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1129 	if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1130 		clear_bit(STATE_SENT, &connection->flags);
1131 		return 0;
1132 	}
1133 
1134 	drbd_thread_start(&connection->ack_receiver);
1135 	/* opencoded create_singlethread_workqueue(),
1136 	 * to be able to use format string arguments */
1137 	connection->ack_sender =
1138 		alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
1139 	if (!connection->ack_sender) {
1140 		drbd_err(connection, "Failed to create workqueue ack_sender\n");
1141 		return 0;
1142 	}
1143 
1144 	mutex_lock(&connection->resource->conf_update);
1145 	/* The discard_my_data flag is a single-shot modifier to the next
1146 	 * connection attempt, the handshake of which is now well underway.
1147 	 * No need for rcu style copying of the whole struct
1148 	 * just to clear a single value. */
1149 	connection->net_conf->discard_my_data = 0;
1150 	mutex_unlock(&connection->resource->conf_update);
1151 
1152 	return h;
1153 
1154 out_release_sockets:
1155 	if (ad.s_listen)
1156 		sock_release(ad.s_listen);
1157 	if (sock.socket)
1158 		sock_release(sock.socket);
1159 	if (msock.socket)
1160 		sock_release(msock.socket);
1161 	return -1;
1162 }
1163 
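/* The on-the-wire header layout depends on the agreed protocol version:
 * struct p_header80 (magic DRBD_MAGIC, 16 bit length), p_header95
 * (DRBD_MAGIC_BIG, 32 bit length) or p_header100 (DRBD_MAGIC_100, which also
 * carries a volume number).  The magic value is used to cross-check that the
 * peer really speaks the agreed dialect. */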
1164 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1165 {
1166 	unsigned int header_size = drbd_header_size(connection);
1167 
1168 	if (header_size == sizeof(struct p_header100) &&
1169 	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1170 		struct p_header100 *h = header;
1171 		if (h->pad != 0) {
1172 			drbd_err(connection, "Header padding is not zero\n");
1173 			return -EINVAL;
1174 		}
1175 		pi->vnr = be16_to_cpu(h->volume);
1176 		pi->cmd = be16_to_cpu(h->command);
1177 		pi->size = be32_to_cpu(h->length);
1178 	} else if (header_size == sizeof(struct p_header95) &&
1179 		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1180 		struct p_header95 *h = header;
1181 		pi->cmd = be16_to_cpu(h->command);
1182 		pi->size = be32_to_cpu(h->length);
1183 		pi->vnr = 0;
1184 	} else if (header_size == sizeof(struct p_header80) &&
1185 		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1186 		struct p_header80 *h = header;
1187 		pi->cmd = be16_to_cpu(h->command);
1188 		pi->size = be16_to_cpu(h->length);
1189 		pi->vnr = 0;
1190 	} else {
1191 		drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1192 			 be32_to_cpu(*(__be32 *)header),
1193 			 connection->agreed_pro_version);
1194 		return -EINVAL;
1195 	}
1196 	pi->data = header + header_size;
1197 	return 0;
1198 }
1199 
1200 static void drbd_unplug_all_devices(struct drbd_connection *connection)
1201 {
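	/* If the receiver has a blk plug armed, flush it (submitting all bios
	 * queued on it so far) and immediately re-arm it for whatever follows. */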
1202 	if (current->plug == &connection->receiver_plug) {
1203 		blk_finish_plug(&connection->receiver_plug);
1204 		blk_start_plug(&connection->receiver_plug);
1205 	} /* else: maybe just schedule() ?? */
1206 }
1207 
1208 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1209 {
1210 	void *buffer = connection->data.rbuf;
1211 	int err;
1212 
1213 	err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1214 	if (err)
1215 		return err;
1216 
1217 	err = decode_header(connection, buffer, pi);
1218 	connection->last_received = jiffies;
1219 
1220 	return err;
1221 }
1222 
1223 static int drbd_recv_header_maybe_unplug(struct drbd_connection *connection, struct packet_info *pi)
1224 {
1225 	void *buffer = connection->data.rbuf;
1226 	unsigned int size = drbd_header_size(connection);
1227 	int err;
1228 
1229 	err = drbd_recv_short(connection->data.socket, buffer, size, MSG_NOSIGNAL|MSG_DONTWAIT);
1230 	if (err != size) {
1231 		/* If we have nothing in the receive buffer now, to reduce
1232 		 * application latency, try to drain the backend queues as
1233 		 * quickly as possible, and let remote TCP know what we have
1234 		 * received so far. */
1235 		if (err == -EAGAIN) {
1236 			drbd_tcp_quickack(connection->data.socket);
1237 			drbd_unplug_all_devices(connection);
1238 		}
1239 		if (err > 0) {
1240 			buffer += err;
1241 			size -= err;
1242 		}
1243 		err = drbd_recv_all_warn(connection, buffer, size);
1244 		if (err)
1245 			return err;
1246 	}
1247 
1248 	err = decode_header(connection, connection->data.rbuf, pi);
1249 	connection->last_received = jiffies;
1250 
1251 	return err;
1252 }
1253 /* This is blkdev_issue_flush, but asynchronous.
1254  * We want to submit to all component volumes in parallel,
1255  * then wait for all completions.
1256  */
1257 struct issue_flush_context {
1258 	atomic_t pending;
1259 	int error;
1260 	struct completion done;
1261 };
1262 struct one_flush_context {
1263 	struct drbd_device *device;
1264 	struct issue_flush_context *ctx;
1265 };
1266 
1267 static void one_flush_endio(struct bio *bio)
1268 {
1269 	struct one_flush_context *octx = bio->bi_private;
1270 	struct drbd_device *device = octx->device;
1271 	struct issue_flush_context *ctx = octx->ctx;
1272 
1273 	if (bio->bi_status) {
1274 		ctx->error = blk_status_to_errno(bio->bi_status);
1275 		drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_status);
1276 	}
1277 	kfree(octx);
1278 	bio_put(bio);
1279 
1280 	clear_bit(FLUSH_PENDING, &device->flags);
1281 	put_ldev(device);
1282 	kref_put(&device->kref, drbd_destroy_device);
1283 
1284 	if (atomic_dec_and_test(&ctx->pending))
1285 		complete(&ctx->done);
1286 }
1287 
1288 static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
1289 {
1290 	struct bio *bio = bio_alloc(GFP_NOIO, 0);
1291 	struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);
1292 	if (!bio || !octx) {
1293 		drbd_warn(device, "Could not allocate a bio, CANNOT ISSUE FLUSH\n");
1294 		/* FIXME: what else can I do now?  disconnecting or detaching
1295 		 * really does not help to improve the state of the world, either.
1296 		 */
1297 		kfree(octx);
1298 		if (bio)
1299 			bio_put(bio);
1300 
1301 		ctx->error = -ENOMEM;
1302 		put_ldev(device);
1303 		kref_put(&device->kref, drbd_destroy_device);
1304 		return;
1305 	}
1306 
1307 	octx->device = device;
1308 	octx->ctx = ctx;
1309 	bio_set_dev(bio, device->ldev->backing_bdev);
1310 	bio->bi_private = octx;
1311 	bio->bi_end_io = one_flush_endio;
1312 	bio->bi_opf = REQ_OP_FLUSH | REQ_PREFLUSH;
1313 
1314 	device->flush_jif = jiffies;
1315 	set_bit(FLUSH_PENDING, &device->flags);
1316 	atomic_inc(&ctx->pending);
1317 	submit_bio(bio);
1318 }
1319 
1320 static void drbd_flush(struct drbd_connection *connection)
1321 {
1322 	if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1323 		struct drbd_peer_device *peer_device;
1324 		struct issue_flush_context ctx;
1325 		int vnr;
1326 
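		/* Bias ->pending by one while submitting, so an early flush
		 * completion cannot complete ->done before all flushes have
		 * been issued; the extra count is dropped again below. */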
1327 		atomic_set(&ctx.pending, 1);
1328 		ctx.error = 0;
1329 		init_completion(&ctx.done);
1330 
1331 		rcu_read_lock();
1332 		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1333 			struct drbd_device *device = peer_device->device;
1334 
1335 			if (!get_ldev(device))
1336 				continue;
1337 			kref_get(&device->kref);
1338 			rcu_read_unlock();
1339 
1340 			submit_one_flush(device, &ctx);
1341 
1342 			rcu_read_lock();
1343 		}
1344 		rcu_read_unlock();
1345 
1346 		/* Do we want to add a timeout,
1347 		 * if disk-timeout is set? */
1348 		if (!atomic_dec_and_test(&ctx.pending))
1349 			wait_for_completion(&ctx.done);
1350 
1351 		if (ctx.error) {
1352 			/* would rather check on EOPNOTSUPP, but that is not reliable.
1353 			 * don't try again for ANY return value != 0
1354 			 * if (rv == -EOPNOTSUPP) */
1355 			/* Any error is already reported by bio_endio callback. */
1356 			drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1357 		}
1358 	}
1359 }
1360 
1361 /**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishes it.
 * @connection:	DRBD connection.
1364  * @epoch:	Epoch object.
1365  * @ev:		Epoch event.
1366  */
1367 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1368 					       struct drbd_epoch *epoch,
1369 					       enum epoch_event ev)
1370 {
1371 	int epoch_size;
1372 	struct drbd_epoch *next_epoch;
1373 	enum finish_epoch rv = FE_STILL_LIVE;
1374 
1375 	spin_lock(&connection->epoch_lock);
1376 	do {
1377 		next_epoch = NULL;
1378 
1379 		epoch_size = atomic_read(&epoch->epoch_size);
1380 
1381 		switch (ev & ~EV_CLEANUP) {
1382 		case EV_PUT:
1383 			atomic_dec(&epoch->active);
1384 			break;
1385 		case EV_GOT_BARRIER_NR:
1386 			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1387 			break;
1388 		case EV_BECAME_LAST:
1389 			/* nothing to do*/
1390 			break;
1391 		}
1392 
1393 		if (epoch_size != 0 &&
1394 		    atomic_read(&epoch->active) == 0 &&
1395 		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1396 			if (!(ev & EV_CLEANUP)) {
1397 				spin_unlock(&connection->epoch_lock);
1398 				drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1399 				spin_lock(&connection->epoch_lock);
1400 			}
1401 #if 0
1402 			/* FIXME: dec unacked on connection, once we have
1403 			 * something to count pending connection packets in. */
1404 			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1405 				dec_unacked(epoch->connection);
1406 #endif
1407 
1408 			if (connection->current_epoch != epoch) {
1409 				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1410 				list_del(&epoch->list);
1411 				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1412 				connection->epochs--;
1413 				kfree(epoch);
1414 
1415 				if (rv == FE_STILL_LIVE)
1416 					rv = FE_DESTROYED;
1417 			} else {
1418 				epoch->flags = 0;
1419 				atomic_set(&epoch->epoch_size, 0);
1420 				/* atomic_set(&epoch->active, 0); is already zero */
1421 				if (rv == FE_STILL_LIVE)
1422 					rv = FE_RECYCLED;
1423 			}
1424 		}
1425 
1426 		if (!next_epoch)
1427 			break;
1428 
1429 		epoch = next_epoch;
1430 	} while (1);
1431 
1432 	spin_unlock(&connection->epoch_lock);
1433 
1434 	return rv;
1435 }
1436 
1437 static enum write_ordering_e
1438 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1439 {
1440 	struct disk_conf *dc;
1441 
1442 	dc = rcu_dereference(bdev->disk_conf);
1443 
1444 	if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1445 		wo = WO_DRAIN_IO;
1446 	if (wo == WO_DRAIN_IO && !dc->disk_drain)
1447 		wo = WO_NONE;
1448 
1449 	return wo;
1450 }
1451 
1452 /**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @resource:	DRBD resource.
 * @bdev:	backing device to consider as well, may be NULL.
 * @wo:		Write ordering method to try.
1456  */
1457 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1458 			      enum write_ordering_e wo)
1459 {
1460 	struct drbd_device *device;
1461 	enum write_ordering_e pwo;
1462 	int vnr;
1463 	static char *write_ordering_str[] = {
1464 		[WO_NONE] = "none",
1465 		[WO_DRAIN_IO] = "drain",
1466 		[WO_BDEV_FLUSH] = "flush",
1467 	};
1468 
1469 	pwo = resource->write_ordering;
1470 	if (wo != WO_BDEV_FLUSH)
1471 		wo = min(pwo, wo);
1472 	rcu_read_lock();
1473 	idr_for_each_entry(&resource->devices, device, vnr) {
1474 		if (get_ldev(device)) {
1475 			wo = max_allowed_wo(device->ldev, wo);
1476 			if (device->ldev == bdev)
1477 				bdev = NULL;
1478 			put_ldev(device);
1479 		}
1480 	}
1481 
1482 	if (bdev)
1483 		wo = max_allowed_wo(bdev, wo);
1484 
1485 	rcu_read_unlock();
1486 
1487 	resource->write_ordering = wo;
1488 	if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1489 		drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1490 }
1491 
1492 static void drbd_issue_peer_discard(struct drbd_device *device, struct drbd_peer_request *peer_req)
1493 {
1494 	struct block_device *bdev = device->ldev->backing_bdev;
1495 
1496 	if (blkdev_issue_zeroout(bdev, peer_req->i.sector, peer_req->i.size >> 9,
1497 			GFP_NOIO, 0))
1498 		peer_req->flags |= EE_WAS_ERROR;
1499 
1500 	drbd_endio_write_sec_final(peer_req);
1501 }
1502 
1503 static void drbd_issue_peer_wsame(struct drbd_device *device,
1504 				  struct drbd_peer_request *peer_req)
1505 {
1506 	struct block_device *bdev = device->ldev->backing_bdev;
1507 	sector_t s = peer_req->i.sector;
1508 	sector_t nr = peer_req->i.size >> 9;
1509 	if (blkdev_issue_write_same(bdev, s, nr, GFP_NOIO, peer_req->pages))
1510 		peer_req->flags |= EE_WAS_ERROR;
1511 	drbd_endio_write_sec_final(peer_req);
1512 }
1513 
1514 
1515 /**
 * drbd_submit_peer_request() - submit the bio(s) for a peer request
 * @device:	DRBD device.
 * @peer_req:	peer request
 * @op:		REQ_OP_* operation, see bio->bi_opf
 * @op_flags:	additional request flags, see bio->bi_opf
 * @fault_type:	DRBD fault injection class
1520  *
1521  * May spread the pages to multiple bios,
1522  * depending on bio_add_page restrictions.
1523  *
1524  * Returns 0 if all bios have been submitted,
1525  * -ENOMEM if we could not allocate enough bios,
1526  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1527  *  single page to an empty bio (which should never happen and likely indicates
1528  *  that the lower level IO stack is in some way broken). This has been observed
1529  *  on certain Xen deployments.
1530  */
1531 /* TODO allocate from our own bio_set. */
1532 int drbd_submit_peer_request(struct drbd_device *device,
1533 			     struct drbd_peer_request *peer_req,
1534 			     const unsigned op, const unsigned op_flags,
1535 			     const int fault_type)
1536 {
1537 	struct bio *bios = NULL;
1538 	struct bio *bio;
1539 	struct page *page = peer_req->pages;
1540 	sector_t sector = peer_req->i.sector;
1541 	unsigned data_size = peer_req->i.size;
1542 	unsigned n_bios = 0;
1543 	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1544 	int err = -ENOMEM;
1545 
1546 	/* TRIM/DISCARD: for now, always use the helper function
1547 	 * blkdev_issue_zeroout(..., discard=true).
1548 	 * It's synchronous, but it does the right thing wrt. bio splitting.
1549 	 * Correctness first, performance later.  Next step is to code an
1550 	 * asynchronous variant of the same.
1551 	 */
1552 	if (peer_req->flags & (EE_IS_TRIM|EE_WRITE_SAME)) {
1553 		/* wait for all pending IO completions, before we start
1554 		 * zeroing things out. */
1555 		conn_wait_active_ee_empty(peer_req->peer_device->connection);
1556 		/* add it to the active list now,
1557 		 * so we can find it to present it in debugfs */
1558 		peer_req->submit_jif = jiffies;
1559 		peer_req->flags |= EE_SUBMITTED;
1560 
1561 		/* If this was a resync request from receive_rs_deallocated(),
1562 		 * it is already on the sync_ee list */
1563 		if (list_empty(&peer_req->w.list)) {
1564 			spin_lock_irq(&device->resource->req_lock);
1565 			list_add_tail(&peer_req->w.list, &device->active_ee);
1566 			spin_unlock_irq(&device->resource->req_lock);
1567 		}
1568 
1569 		if (peer_req->flags & EE_IS_TRIM)
1570 			drbd_issue_peer_discard(device, peer_req);
1571 		else /* EE_WRITE_SAME */
1572 			drbd_issue_peer_wsame(device, peer_req);
1573 		return 0;
1574 	}
1575 
1576 	/* In most cases, we will only need one bio.  But in case the lower
1577 	 * level restrictions happen to be different at this offset on this
1578 	 * side than those of the sending peer, we may need to submit the
1579 	 * request in more than one bio.
1580 	 *
1581 	 * Plain bio_alloc is good enough here, this is no DRBD internally
1582 	 * generated bio, but a bio allocated on behalf of the peer.
1583 	 */
1584 next_bio:
1585 	bio = bio_alloc(GFP_NOIO, nr_pages);
1586 	if (!bio) {
1587 		drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1588 		goto fail;
1589 	}
1590 	/* > peer_req->i.sector, unless this is the first bio */
1591 	bio->bi_iter.bi_sector = sector;
1592 	bio_set_dev(bio, device->ldev->backing_bdev);
1593 	bio_set_op_attrs(bio, op, op_flags);
1594 	bio->bi_private = peer_req;
1595 	bio->bi_end_io = drbd_peer_request_endio;
1596 
1597 	bio->bi_next = bios;
1598 	bios = bio;
1599 	++n_bios;
1600 
1601 	page_chain_for_each(page) {
1602 		unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1603 		if (!bio_add_page(bio, page, len, 0))
1604 			goto next_bio;
1605 		data_size -= len;
1606 		sector += len >> 9;
1607 		--nr_pages;
1608 	}
1609 	D_ASSERT(device, data_size == 0);
1610 	D_ASSERT(device, page == NULL);
1611 
1612 	atomic_set(&peer_req->pending_bios, n_bios);
1613 	/* for debugfs: update timestamp, mark as submitted */
1614 	peer_req->submit_jif = jiffies;
1615 	peer_req->flags |= EE_SUBMITTED;
1616 	do {
1617 		bio = bios;
1618 		bios = bios->bi_next;
1619 		bio->bi_next = NULL;
1620 
1621 		drbd_generic_make_request(device, fault_type, bio);
1622 	} while (bios);
1623 	return 0;
1624 
1625 fail:
1626 	while (bios) {
1627 		bio = bios;
1628 		bios = bios->bi_next;
1629 		bio_put(bio);
1630 	}
1631 	return err;
1632 }
1633 
1634 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1635 					     struct drbd_peer_request *peer_req)
1636 {
1637 	struct drbd_interval *i = &peer_req->i;
1638 
1639 	drbd_remove_interval(&device->write_requests, i);
1640 	drbd_clear_interval(i);
1641 
1642 	/* Wake up any processes waiting for this peer request to complete.  */
1643 	if (i->waiting)
1644 		wake_up(&device->misc_wait);
1645 }
1646 
1647 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1648 {
1649 	struct drbd_peer_device *peer_device;
1650 	int vnr;
1651 
1652 	rcu_read_lock();
1653 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1654 		struct drbd_device *device = peer_device->device;
1655 
1656 		kref_get(&device->kref);
1657 		rcu_read_unlock();
1658 		drbd_wait_ee_list_empty(device, &device->active_ee);
1659 		kref_put(&device->kref, drbd_destroy_device);
1660 		rcu_read_lock();
1661 	}
1662 	rcu_read_unlock();
1663 }
1664 
1665 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1666 {
1667 	int rv;
1668 	struct p_barrier *p = pi->data;
1669 	struct drbd_epoch *epoch;
1670 
1671 	/* FIXME these are unacked on connection,
1672 	 * not a specific (peer)device.
1673 	 */
1674 	connection->current_epoch->barrier_nr = p->barrier;
1675 	connection->current_epoch->connection = connection;
1676 	rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1677 
1678 	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1679 	 * the activity log, which means it would not be resynced in case the
1680 	 * R_PRIMARY crashes now.
1681 	 * Therefore we must send the barrier_ack after the barrier request was
1682 	 * completed. */
1683 	switch (connection->resource->write_ordering) {
1684 	case WO_NONE:
1685 		if (rv == FE_RECYCLED)
1686 			return 0;
1687 
1688 		/* receiver context, in the writeout path of the other node.
1689 		 * avoid potential distributed deadlock */
1690 		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1691 		if (epoch)
1692 			break;
1693 		else
1694 			drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1695 			/* Fall through */
1696 
1697 	case WO_BDEV_FLUSH:
1698 	case WO_DRAIN_IO:
1699 		conn_wait_active_ee_empty(connection);
1700 		drbd_flush(connection);
1701 
1702 		if (atomic_read(&connection->current_epoch->epoch_size)) {
1703 			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1704 			if (epoch)
1705 				break;
1706 		}
1707 
1708 		return 0;
1709 	default:
1710 		drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1711 			 connection->resource->write_ordering);
1712 		return -EIO;
1713 	}
1714 
1715 	epoch->flags = 0;
1716 	atomic_set(&epoch->epoch_size, 0);
1717 	atomic_set(&epoch->active, 0);
1718 
1719 	spin_lock(&connection->epoch_lock);
1720 	if (atomic_read(&connection->current_epoch->epoch_size)) {
1721 		list_add(&epoch->list, &connection->current_epoch->list);
1722 		connection->current_epoch = epoch;
1723 		connection->epochs++;
1724 	} else {
1725 		/* The current_epoch got recycled while we allocated this one... */
1726 		kfree(epoch);
1727 	}
1728 	spin_unlock(&connection->epoch_lock);
1729 
1730 	return 0;
1731 }
1732 
1733 /* quick wrapper in case payload size != request_size (write same) */
1734 static void drbd_csum_ee_size(struct crypto_ahash *h,
1735 			      struct drbd_peer_request *r, void *d,
1736 			      unsigned int payload_size)
1737 {
1738 	unsigned int tmp = r->i.size;
1739 	r->i.size = payload_size;
1740 	drbd_csum_ee(h, r, d);
1741 	r->i.size = tmp;
1742 }
1743 
1744 /* used from receive_RSDataReply (recv_resync_read)
1745  * and from receive_Data.
1746  * data_size: actual payload ("data in")
1747  * 	for normal writes that is bi_size.
1748  * 	for discards, that is zero.
1749  * 	for write same, it is logical_block_size.
1750  * both trim and write same have the bi_size ("data len to be affected")
1751  * as extra argument in the packet header.
1752  */
1753 static struct drbd_peer_request *
1754 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1755 	      struct packet_info *pi) __must_hold(local)
1756 {
1757 	struct drbd_device *device = peer_device->device;
1758 	const sector_t capacity = drbd_get_capacity(device->this_bdev);
1759 	struct drbd_peer_request *peer_req;
1760 	struct page *page;
1761 	int digest_size, err;
1762 	unsigned int data_size = pi->size, ds;
1763 	void *dig_in = peer_device->connection->int_dig_in;
1764 	void *dig_vv = peer_device->connection->int_dig_vv;
1765 	unsigned long *data;
1766 	struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1767 	struct p_trim *wsame = (pi->cmd == P_WSAME) ? pi->data : NULL;
1768 
1769 	digest_size = 0;
1770 	if (!trim && peer_device->connection->peer_integrity_tfm) {
1771 		digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1772 		/*
1773 		 * FIXME: Receive the incoming digest into the receive buffer
1774 		 *	  here, together with its struct p_data?
1775 		 */
1776 		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1777 		if (err)
1778 			return NULL;
1779 		data_size -= digest_size;
1780 	}
1781 
1782 	/* assume request_size == data_size, but special case trim and wsame. */
1783 	ds = data_size;
1784 	if (trim) {
1785 		if (!expect(data_size == 0))
1786 			return NULL;
1787 		ds = be32_to_cpu(trim->size);
1788 	} else if (wsame) {
1789 		if (data_size != queue_logical_block_size(device->rq_queue)) {
1790 			drbd_err(peer_device, "data size (%u) != drbd logical block size (%u)\n",
1791 				data_size, queue_logical_block_size(device->rq_queue));
1792 			return NULL;
1793 		}
1794 		if (data_size != bdev_logical_block_size(device->ldev->backing_bdev)) {
1795 			drbd_err(peer_device, "data size (%u) != backend logical block size (%u)\n",
1796 				data_size, bdev_logical_block_size(device->ldev->backing_bdev));
1797 			return NULL;
1798 		}
1799 		ds = be32_to_cpu(wsame->size);
1800 	}
1801 
1802 	if (!expect(IS_ALIGNED(ds, 512)))
1803 		return NULL;
1804 	if (trim || wsame) {
1805 		if (!expect(ds <= (DRBD_MAX_BBIO_SECTORS << 9)))
1806 			return NULL;
1807 	} else if (!expect(ds <= DRBD_MAX_BIO_SIZE))
1808 		return NULL;
1809 
1810 	/* even though we trust our peer,
1811 	 * we sometimes have to double check. */
1812 	if (sector + (ds>>9) > capacity) {
1813 		drbd_err(device, "request from peer beyond end of local disk: "
1814 			"capacity: %llus < sector: %llus + size: %u\n",
1815 			(unsigned long long)capacity,
1816 			(unsigned long long)sector, ds);
1817 		return NULL;
1818 	}
1819 
1820 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1821 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1822 	 * which in turn might block on the other node at this very place.  */
1823 	peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO);
1824 	if (!peer_req)
1825 		return NULL;
1826 
1827 	peer_req->flags |= EE_WRITE;
1828 	if (trim) {
1829 		peer_req->flags |= EE_IS_TRIM;
1830 		return peer_req;
1831 	}
1832 	if (wsame)
1833 		peer_req->flags |= EE_WRITE_SAME;
1834 
1835 	/* receive payload size bytes into page chain */
1836 	ds = data_size;
1837 	page = peer_req->pages;
1838 	page_chain_for_each(page) {
1839 		unsigned len = min_t(int, ds, PAGE_SIZE);
1840 		data = kmap(page);
1841 		err = drbd_recv_all_warn(peer_device->connection, data, len);
1842 		if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1843 			drbd_err(device, "Fault injection: Corrupting data on receive\n");
1844 			data[0] = data[0] ^ (unsigned long)-1;
1845 		}
1846 		kunmap(page);
1847 		if (err) {
1848 			drbd_free_peer_req(device, peer_req);
1849 			return NULL;
1850 		}
1851 		ds -= len;
1852 	}
1853 
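	/* Verify the data integrity digest the peer sent ahead of the payload
	 * against our own checksum of what actually arrived. */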
1854 	if (digest_size) {
1855 		drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size);
1856 		if (memcmp(dig_in, dig_vv, digest_size)) {
1857 			drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1858 				(unsigned long long)sector, data_size);
1859 			drbd_free_peer_req(device, peer_req);
1860 			return NULL;
1861 		}
1862 	}
1863 	device->recv_cnt += data_size >> 9;
1864 	return peer_req;
1865 }
1866 
1867 /* drbd_drain_block() just takes a data block
1868  * out of the socket input buffer, and discards it.
1869  */
1870 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1871 {
1872 	struct page *page;
1873 	int err = 0;
1874 	void *data;
1875 
1876 	if (!data_size)
1877 		return 0;
1878 
1879 	page = drbd_alloc_pages(peer_device, 1, 1);
1880 
1881 	data = kmap(page);
1882 	while (data_size) {
1883 		unsigned int len = min_t(int, data_size, PAGE_SIZE);
1884 
1885 		err = drbd_recv_all_warn(peer_device->connection, data, len);
1886 		if (err)
1887 			break;
1888 		data_size -= len;
1889 	}
1890 	kunmap(page);
1891 	drbd_free_pages(peer_device->device, page, 0);
1892 	return err;
1893 }
1894 
1895 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1896 			   sector_t sector, int data_size)
1897 {
1898 	struct bio_vec bvec;
1899 	struct bvec_iter iter;
1900 	struct bio *bio;
1901 	int digest_size, err, expect;
1902 	void *dig_in = peer_device->connection->int_dig_in;
1903 	void *dig_vv = peer_device->connection->int_dig_vv;
1904 
1905 	digest_size = 0;
1906 	if (peer_device->connection->peer_integrity_tfm) {
1907 		digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1908 		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1909 		if (err)
1910 			return err;
1911 		data_size -= digest_size;
1912 	}
1913 
1914 	/* optimistically update recv_cnt.  if receiving fails below,
1915 	 * we disconnect anyways, and counters will be reset. */
1916 	peer_device->device->recv_cnt += data_size>>9;
1917 
1918 	bio = req->master_bio;
1919 	D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1920 
1921 	bio_for_each_segment(bvec, bio, iter) {
1922 		void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1923 		expect = min_t(int, data_size, bvec.bv_len);
1924 		err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1925 		kunmap(bvec.bv_page);
1926 		if (err)
1927 			return err;
1928 		data_size -= expect;
1929 	}
1930 
1931 	if (digest_size) {
1932 		drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1933 		if (memcmp(dig_in, dig_vv, digest_size)) {
1934 			drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1935 			return -EINVAL;
1936 		}
1937 	}
1938 
1939 	D_ASSERT(peer_device->device, data_size == 0);
1940 	return 0;
1941 }
1942 
1943 /*
1944  * e_end_resync_block() is called in ack_sender context via
1945  * drbd_finish_peer_reqs().
1946  */
1947 static int e_end_resync_block(struct drbd_work *w, int unused)
1948 {
1949 	struct drbd_peer_request *peer_req =
1950 		container_of(w, struct drbd_peer_request, w);
1951 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1952 	struct drbd_device *device = peer_device->device;
1953 	sector_t sector = peer_req->i.sector;
1954 	int err;
1955 
1956 	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1957 
1958 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1959 		drbd_set_in_sync(device, sector, peer_req->i.size);
1960 		err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1961 	} else {
1962 		/* Record failure to sync */
1963 		drbd_rs_failed_io(device, sector, peer_req->i.size);
1964 
1965 		err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1966 	}
1967 	dec_unacked(device);
1968 
1969 	return err;
1970 }
1971 
1972 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1973 			    struct packet_info *pi) __releases(local)
1974 {
1975 	struct drbd_device *device = peer_device->device;
1976 	struct drbd_peer_request *peer_req;
1977 
1978 	peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
1979 	if (!peer_req)
1980 		goto fail;
1981 
1982 	dec_rs_pending(device);
1983 
1984 	inc_unacked(device);
1985 	/* corresponding dec_unacked() in e_end_resync_block()
1986 	 * respective _drbd_clear_done_ee */
1987 
1988 	peer_req->w.cb = e_end_resync_block;
1989 	peer_req->submit_jif = jiffies;
1990 
1991 	spin_lock_irq(&device->resource->req_lock);
1992 	list_add_tail(&peer_req->w.list, &device->sync_ee);
1993 	spin_unlock_irq(&device->resource->req_lock);
1994 
1995 	atomic_add(pi->size >> 9, &device->rs_sect_ev);
1996 	if (drbd_submit_peer_request(device, peer_req, REQ_OP_WRITE, 0,
1997 				     DRBD_FAULT_RS_WR) == 0)
1998 		return 0;
1999 
2000 	/* don't care for the reason here */
2001 	drbd_err(device, "submit failed, triggering re-connect\n");
2002 	spin_lock_irq(&device->resource->req_lock);
2003 	list_del(&peer_req->w.list);
2004 	spin_unlock_irq(&device->resource->req_lock);
2005 
2006 	drbd_free_peer_req(device, peer_req);
2007 fail:
2008 	put_ldev(device);
2009 	return -EIO;
2010 }
2011 
2012 static struct drbd_request *
2013 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
2014 	     sector_t sector, bool missing_ok, const char *func)
2015 {
2016 	struct drbd_request *req;
2017 
2018 	/* Request object according to our peer */
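	/* The peer echoes back the request pointer we handed out as block_id;
	 * validate it against the interval tree before trusting it. */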
2019 	req = (struct drbd_request *)(unsigned long)id;
2020 	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
2021 		return req;
2022 	if (!missing_ok) {
2023 		drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
2024 			(unsigned long)id, (unsigned long long)sector);
2025 	}
2026 	return NULL;
2027 }
2028 
2029 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
2030 {
2031 	struct drbd_peer_device *peer_device;
2032 	struct drbd_device *device;
2033 	struct drbd_request *req;
2034 	sector_t sector;
2035 	int err;
2036 	struct p_data *p = pi->data;
2037 
2038 	peer_device = conn_peer_device(connection, pi->vnr);
2039 	if (!peer_device)
2040 		return -EIO;
2041 	device = peer_device->device;
2042 
2043 	sector = be64_to_cpu(p->sector);
2044 
2045 	spin_lock_irq(&device->resource->req_lock);
2046 	req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
2047 	spin_unlock_irq(&device->resource->req_lock);
2048 	if (unlikely(!req))
2049 		return -EIO;
2050 
2051 	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
2052 	 * special casing it there for the various failure cases.
2053 	 * still no race with drbd_fail_pending_reads */
2054 	err = recv_dless_read(peer_device, req, sector, pi->size);
2055 	if (!err)
2056 		req_mod(req, DATA_RECEIVED);
2057 	/* else: nothing. handled from drbd_disconnect...
2058 	 * I don't think we may complete this just yet
2059 	 * in case we are "on-disconnect: freeze" */
2060 
2061 	return err;
2062 }
2063 
2064 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
2065 {
2066 	struct drbd_peer_device *peer_device;
2067 	struct drbd_device *device;
2068 	sector_t sector;
2069 	int err;
2070 	struct p_data *p = pi->data;
2071 
2072 	peer_device = conn_peer_device(connection, pi->vnr);
2073 	if (!peer_device)
2074 		return -EIO;
2075 	device = peer_device->device;
2076 
2077 	sector = be64_to_cpu(p->sector);
2078 	D_ASSERT(device, p->block_id == ID_SYNCER);
2079 
2080 	if (get_ldev(device)) {
2081 		/* data is submitted to disk within recv_resync_read.
2082 		 * corresponding put_ldev done below on error,
2083 		 * or in drbd_peer_request_endio. */
2084 		err = recv_resync_read(peer_device, sector, pi);
2085 	} else {
2086 		if (__ratelimit(&drbd_ratelimit_state))
2087 			drbd_err(device, "Can not write resync data to local disk.\n");
2088 
2089 		err = drbd_drain_block(peer_device, pi->size);
2090 
2091 		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2092 	}
2093 
2094 	atomic_add(pi->size >> 9, &device->rs_sect_in);
2095 
2096 	return err;
2097 }
2098 
2099 static void restart_conflicting_writes(struct drbd_device *device,
2100 				       sector_t sector, int size)
2101 {
2102 	struct drbd_interval *i;
2103 	struct drbd_request *req;
2104 
2105 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2106 		if (!i->local)
2107 			continue;
2108 		req = container_of(i, struct drbd_request, i);
2109 		if (req->rq_state & RQ_LOCAL_PENDING ||
2110 		    !(req->rq_state & RQ_POSTPONED))
2111 			continue;
2112 		/* as it is RQ_POSTPONED, this will cause it to
2113 		 * be queued on the retry workqueue. */
2114 		__req_mod(req, CONFLICT_RESOLVED, NULL);
2115 	}
2116 }
2117 
2118 /*
2119  * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
2120  */
2121 static int e_end_block(struct drbd_work *w, int cancel)
2122 {
2123 	struct drbd_peer_request *peer_req =
2124 		container_of(w, struct drbd_peer_request, w);
2125 	struct drbd_peer_device *peer_device = peer_req->peer_device;
2126 	struct drbd_device *device = peer_device->device;
2127 	sector_t sector = peer_req->i.sector;
2128 	int err = 0, pcmd;
2129 
2130 	if (peer_req->flags & EE_SEND_WRITE_ACK) {
2131 		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2132 			pcmd = (device->state.conn >= C_SYNC_SOURCE &&
2133 				device->state.conn <= C_PAUSED_SYNC_T &&
2134 				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
2135 				P_RS_WRITE_ACK : P_WRITE_ACK;
2136 			err = drbd_send_ack(peer_device, pcmd, peer_req);
2137 			if (pcmd == P_RS_WRITE_ACK)
2138 				drbd_set_in_sync(device, sector, peer_req->i.size);
2139 		} else {
2140 			err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2141 			/* we expect it to be marked out of sync anyways...
2142 			 * maybe assert this?  */
2143 		}
2144 		dec_unacked(device);
2145 	}
2146 
2147 	/* we delete from the conflict detection hash _after_ we sent out the
2148 	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
2149 	if (peer_req->flags & EE_IN_INTERVAL_TREE) {
2150 		spin_lock_irq(&device->resource->req_lock);
2151 		D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
2152 		drbd_remove_epoch_entry_interval(device, peer_req);
2153 		if (peer_req->flags & EE_RESTART_REQUESTS)
2154 			restart_conflicting_writes(device, sector, peer_req->i.size);
2155 		spin_unlock_irq(&device->resource->req_lock);
2156 	} else
2157 		D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2158 
2159 	drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
2160 
2161 	return err;
2162 }
2163 
2164 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2165 {
2166 	struct drbd_peer_request *peer_req =
2167 		container_of(w, struct drbd_peer_request, w);
2168 	struct drbd_peer_device *peer_device = peer_req->peer_device;
2169 	int err;
2170 
2171 	err = drbd_send_ack(peer_device, ack, peer_req);
2172 	dec_unacked(peer_device->device);
2173 
2174 	return err;
2175 }
2176 
2177 static int e_send_superseded(struct drbd_work *w, int unused)
2178 {
2179 	return e_send_ack(w, P_SUPERSEDED);
2180 }
2181 
2182 static int e_send_retry_write(struct drbd_work *w, int unused)
2183 {
2184 	struct drbd_peer_request *peer_req =
2185 		container_of(w, struct drbd_peer_request, w);
2186 	struct drbd_connection *connection = peer_req->peer_device->connection;
2187 
2188 	return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2189 			     P_RETRY_WRITE : P_SUPERSEDED);
2190 }
2191 
2192 static bool seq_greater(u32 a, u32 b)
2193 {
2194 	/*
2195 	 * We assume 32-bit wrap-around here.
2196 	 * For 24-bit wrap-around, we would have to shift:
2197 	 *  a <<= 8; b <<= 8;
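	 * Example: seq_greater(0, 0xffffffff) is true, because
	 * (s32)0 - (s32)0xffffffff == 0 - (-1) == 1 > 0,
	 * i.e. 0 is considered the newer sequence number after wrap-around.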
2198 	 */
2199 	return (s32)a - (s32)b > 0;
2200 }
2201 
2202 static u32 seq_max(u32 a, u32 b)
2203 {
2204 	return seq_greater(a, b) ? a : b;
2205 }
2206 
2207 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2208 {
2209 	struct drbd_device *device = peer_device->device;
2210 	unsigned int newest_peer_seq;
2211 
2212 	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2213 		spin_lock(&device->peer_seq_lock);
2214 		newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2215 		device->peer_seq = newest_peer_seq;
2216 		spin_unlock(&device->peer_seq_lock);
2217 		/* wake up only if we actually changed device->peer_seq */
2218 		if (peer_seq == newest_peer_seq)
2219 			wake_up(&device->seq_wait);
2220 	}
2221 }
2222 
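/* s1/s2 are start sectors, l1/l2 are lengths in bytes; the two ranges overlap
 * unless one of them ends before the other one starts. */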
2223 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2224 {
2225 	return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2226 }
2227 
2228 /* maybe change sync_ee into interval trees as well? */
2229 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2230 {
2231 	struct drbd_peer_request *rs_req;
2232 	bool rv = false;
2233 
2234 	spin_lock_irq(&device->resource->req_lock);
2235 	list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2236 		if (overlaps(peer_req->i.sector, peer_req->i.size,
2237 			     rs_req->i.sector, rs_req->i.size)) {
2238 			rv = true;
2239 			break;
2240 		}
2241 	}
2242 	spin_unlock_irq(&device->resource->req_lock);
2243 
2244 	return rv;
2245 }
2246 
2247 /* Called from receive_Data.
2248  * Synchronize packets on sock with packets on msock.
2249  *
2250  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2251  * packet traveling on msock, they are still processed in the order they have
2252  * been sent.
2253  *
2254  * Note: we don't care for Ack packets overtaking P_DATA packets.
2255  *
2256  * In case packet_seq is larger than device->peer_seq number, there are
2257  * outstanding packets on the msock. We wait for them to arrive.
2258  * In case we are the logically next packet, we update device->peer_seq
2259  * ourselves. Correctly handles 32bit wrap around.
2260  *
2261  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2262  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2263  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2264  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
2265  *
2266  * returns 0 if we may process the packet,
2267  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2268 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2269 {
2270 	struct drbd_device *device = peer_device->device;
2271 	DEFINE_WAIT(wait);
2272 	long timeout;
2273 	int ret = 0, tp;
2274 
2275 	if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2276 		return 0;
2277 
2278 	spin_lock(&device->peer_seq_lock);
2279 	for (;;) {
2280 		if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2281 			device->peer_seq = seq_max(device->peer_seq, peer_seq);
2282 			break;
2283 		}
2284 
2285 		if (signal_pending(current)) {
2286 			ret = -ERESTARTSYS;
2287 			break;
2288 		}
2289 
2290 		rcu_read_lock();
2291 		tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2292 		rcu_read_unlock();
2293 
2294 		if (!tp)
2295 			break;
2296 
2297 		/* Only need to wait if two_primaries is enabled */
2298 		prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2299 		spin_unlock(&device->peer_seq_lock);
2300 		rcu_read_lock();
2301 		timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2302 		rcu_read_unlock();
2303 		timeout = schedule_timeout(timeout);
2304 		spin_lock(&device->peer_seq_lock);
2305 		if (!timeout) {
2306 			ret = -ETIMEDOUT;
2307 			drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2308 			break;
2309 		}
2310 	}
2311 	spin_unlock(&device->peer_seq_lock);
2312 	finish_wait(&device->seq_wait, &wait);
2313 	return ret;
2314 }
2315 
2316 /* see also bio_flags_to_wire():
2317  * we need to map the request flags to data packet (DP_*) flags and back
2318  * semantically, because the peer may run a different kernel version. */
2319 static unsigned long wire_flags_to_bio_flags(u32 dpf)
2320 {
2321 	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2322 		(dpf & DP_FUA ? REQ_FUA : 0) |
2323 		(dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
2324 }
2325 
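/* A peer write carrying DP_DISCARD is submitted locally as a zero-out
 * request; everything else is submitted as a regular write. */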
2326 static unsigned long wire_flags_to_bio_op(u32 dpf)
2327 {
2328 	if (dpf & DP_DISCARD)
2329 		return REQ_OP_WRITE_ZEROES;
2330 	else
2331 		return REQ_OP_WRITE;
2332 }
2333 
2334 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2335 				    unsigned int size)
2336 {
2337 	struct drbd_interval *i;
2338 
2339     repeat:
2340 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2341 		struct drbd_request *req;
2342 		struct bio_and_error m;
2343 
2344 		if (!i->local)
2345 			continue;
2346 		req = container_of(i, struct drbd_request, i);
2347 		if (!(req->rq_state & RQ_POSTPONED))
2348 			continue;
2349 		req->rq_state &= ~RQ_POSTPONED;
2350 		__req_mod(req, NEG_ACKED, &m);
2351 		spin_unlock_irq(&device->resource->req_lock);
2352 		if (m.bio)
2353 			complete_master_bio(device, &m);
2354 		spin_lock_irq(&device->resource->req_lock);
2355 		goto repeat;
2356 	}
2357 }
2358 
2359 static int handle_write_conflicts(struct drbd_device *device,
2360 				  struct drbd_peer_request *peer_req)
2361 {
2362 	struct drbd_connection *connection = peer_req->peer_device->connection;
2363 	bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2364 	sector_t sector = peer_req->i.sector;
2365 	const unsigned int size = peer_req->i.size;
2366 	struct drbd_interval *i;
2367 	bool equal;
2368 	int err;
2369 
2370 	/*
2371 	 * Inserting the peer request into the write_requests tree will prevent
2372 	 * new conflicting local requests from being added.
2373 	 */
2374 	drbd_insert_interval(&device->write_requests, &peer_req->i);
2375 
2376     repeat:
2377 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2378 		if (i == &peer_req->i)
2379 			continue;
2380 		if (i->completed)
2381 			continue;
2382 
2383 		if (!i->local) {
2384 			/*
2385 			 * Our peer has sent a conflicting remote request; this
2386 			 * should not happen in a two-node setup.  Wait for the
2387 			 * earlier peer request to complete.
2388 			 */
2389 			err = drbd_wait_misc(device, i);
2390 			if (err)
2391 				goto out;
2392 			goto repeat;
2393 		}
2394 
2395 		equal = i->sector == sector && i->size == size;
2396 		if (resolve_conflicts) {
2397 			/*
2398 			 * If the peer request is fully contained within the
2399 			 * overlapping request, it can be considered overwritten
2400 			 * and thus superseded; otherwise, it will be retried
2401 			 * once all overlapping requests have completed.
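			 * Example: a local 8 KiB write at sector 0 fully covers a
			 * peer 4 KiB write at sector 8, so the peer write is superseded.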
2402 			 */
2403 			bool superseded = i->sector <= sector && i->sector +
2404 				       (i->size >> 9) >= sector + (size >> 9);
2405 
2406 			if (!equal)
2407 				drbd_alert(device, "Concurrent writes detected: "
2408 					       "local=%llus +%u, remote=%llus +%u, "
2409 					       "assuming %s came first\n",
2410 					  (unsigned long long)i->sector, i->size,
2411 					  (unsigned long long)sector, size,
2412 					  superseded ? "local" : "remote");
2413 
2414 			peer_req->w.cb = superseded ? e_send_superseded :
2415 						   e_send_retry_write;
2416 			list_add_tail(&peer_req->w.list, &device->done_ee);
2417 			queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);
2418 
2419 			err = -ENOENT;
2420 			goto out;
2421 		} else {
2422 			struct drbd_request *req =
2423 				container_of(i, struct drbd_request, i);
2424 
2425 			if (!equal)
2426 				drbd_alert(device, "Concurrent writes detected: "
2427 					       "local=%llus +%u, remote=%llus +%u\n",
2428 					  (unsigned long long)i->sector, i->size,
2429 					  (unsigned long long)sector, size);
2430 
2431 			if (req->rq_state & RQ_LOCAL_PENDING ||
2432 			    !(req->rq_state & RQ_POSTPONED)) {
2433 				/*
2434 				 * Wait for the node with the discard flag to
2435 				 * decide if this request has been superseded
2436 				 * or needs to be retried.
2437 				 * Requests that have been superseded will
2438 				 * disappear from the write_requests tree.
2439 				 *
2440 				 * In addition, wait for the conflicting
2441 				 * request to finish locally before submitting
2442 				 * the conflicting peer request.
2443 				 */
2444 				err = drbd_wait_misc(device, &req->i);
2445 				if (err) {
2446 					_conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2447 					fail_postponed_requests(device, sector, size);
2448 					goto out;
2449 				}
2450 				goto repeat;
2451 			}
2452 			/*
2453 			 * Remember to restart the conflicting requests after
2454 			 * the new peer request has completed.
2455 			 */
2456 			peer_req->flags |= EE_RESTART_REQUESTS;
2457 		}
2458 	}
2459 	err = 0;
2460 
2461     out:
2462 	if (err)
2463 		drbd_remove_epoch_entry_interval(device, peer_req);
2464 	return err;
2465 }
2466 
2467 /* mirrored write */
2468 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2469 {
2470 	struct drbd_peer_device *peer_device;
2471 	struct drbd_device *device;
2472 	struct net_conf *nc;
2473 	sector_t sector;
2474 	struct drbd_peer_request *peer_req;
2475 	struct p_data *p = pi->data;
2476 	u32 peer_seq = be32_to_cpu(p->seq_num);
2477 	int op, op_flags;
2478 	u32 dp_flags;
2479 	int err, tp;
2480 
2481 	peer_device = conn_peer_device(connection, pi->vnr);
2482 	if (!peer_device)
2483 		return -EIO;
2484 	device = peer_device->device;
2485 
2486 	if (!get_ldev(device)) {
2487 		int err2;
2488 
2489 		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2490 		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2491 		atomic_inc(&connection->current_epoch->epoch_size);
2492 		err2 = drbd_drain_block(peer_device, pi->size);
2493 		if (!err)
2494 			err = err2;
2495 		return err;
2496 	}
2497 
2498 	/*
2499 	 * Corresponding put_ldev done either below (on various errors), or in
2500 	 * drbd_peer_request_endio, if we successfully submit the data at the
2501 	 * end of this function.
2502 	 */
2503 
2504 	sector = be64_to_cpu(p->sector);
2505 	peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2506 	if (!peer_req) {
2507 		put_ldev(device);
2508 		return -EIO;
2509 	}
2510 
2511 	peer_req->w.cb = e_end_block;
2512 	peer_req->submit_jif = jiffies;
2513 	peer_req->flags |= EE_APPLICATION;
2514 
2515 	dp_flags = be32_to_cpu(p->dp_flags);
2516 	op = wire_flags_to_bio_op(dp_flags);
2517 	op_flags = wire_flags_to_bio_flags(dp_flags);
2518 	if (pi->cmd == P_TRIM) {
2519 		D_ASSERT(peer_device, peer_req->i.size > 0);
2520 		D_ASSERT(peer_device, op == REQ_OP_WRITE_ZEROES);
2521 		D_ASSERT(peer_device, peer_req->pages == NULL);
2522 	} else if (peer_req->pages == NULL) {
2523 		D_ASSERT(device, peer_req->i.size == 0);
2524 		D_ASSERT(device, dp_flags & DP_FLUSH);
2525 	}
2526 
2527 	if (dp_flags & DP_MAY_SET_IN_SYNC)
2528 		peer_req->flags |= EE_MAY_SET_IN_SYNC;
2529 
2530 	spin_lock(&connection->epoch_lock);
2531 	peer_req->epoch = connection->current_epoch;
2532 	atomic_inc(&peer_req->epoch->epoch_size);
2533 	atomic_inc(&peer_req->epoch->active);
2534 	spin_unlock(&connection->epoch_lock);
2535 
2536 	rcu_read_lock();
2537 	nc = rcu_dereference(peer_device->connection->net_conf);
2538 	tp = nc->two_primaries;
2539 	if (peer_device->connection->agreed_pro_version < 100) {
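		/* Peers before protocol 100 do not set these ack flags in
		 * dp_flags themselves; derive them from the wire protocol. */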
2540 		switch (nc->wire_protocol) {
2541 		case DRBD_PROT_C:
2542 			dp_flags |= DP_SEND_WRITE_ACK;
2543 			break;
2544 		case DRBD_PROT_B:
2545 			dp_flags |= DP_SEND_RECEIVE_ACK;
2546 			break;
2547 		}
2548 	}
2549 	rcu_read_unlock();
2550 
2551 	if (dp_flags & DP_SEND_WRITE_ACK) {
2552 		peer_req->flags |= EE_SEND_WRITE_ACK;
2553 		inc_unacked(device);
2554 		/* corresponding dec_unacked() in e_end_block()
2555 		 * respective _drbd_clear_done_ee */
2556 	}
2557 
2558 	if (dp_flags & DP_SEND_RECEIVE_ACK) {
2559 		/* I really don't like it that the receiver thread
2560 		 * sends on the msock, but anyways */
2561 		drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
2562 	}
2563 
2564 	if (tp) {
2565 		/* two primaries implies protocol C */
2566 		D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2567 		peer_req->flags |= EE_IN_INTERVAL_TREE;
2568 		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2569 		if (err)
2570 			goto out_interrupted;
2571 		spin_lock_irq(&device->resource->req_lock);
2572 		err = handle_write_conflicts(device, peer_req);
2573 		if (err) {
2574 			spin_unlock_irq(&device->resource->req_lock);
2575 			if (err == -ENOENT) {
2576 				put_ldev(device);
2577 				return 0;
2578 			}
2579 			goto out_interrupted;
2580 		}
2581 	} else {
2582 		update_peer_seq(peer_device, peer_seq);
2583 		spin_lock_irq(&device->resource->req_lock);
2584 	}
2585 	/* TRIM and WRITE_SAME are processed synchronously,
2586 	 * we wait for all pending requests, i.e. we wait for
2587 	 * active_ee to become empty in drbd_submit_peer_request();
2588 	 * better not add ourselves here. */
2589 	if ((peer_req->flags & (EE_IS_TRIM|EE_WRITE_SAME)) == 0)
2590 		list_add_tail(&peer_req->w.list, &device->active_ee);
2591 	spin_unlock_irq(&device->resource->req_lock);
2592 
2593 	if (device->state.conn == C_SYNC_TARGET)
2594 		wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2595 
2596 	if (device->state.pdsk < D_INCONSISTENT) {
2597 		/* In case we have the only disk of the cluster: the peer cannot
		 * store this write, so mark the area out of sync in our bitmap,
		 * don't allow it to be set in sync, and cover it in the activity log. */
2598 		drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2599 		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2600 		drbd_al_begin_io(device, &peer_req->i);
2601 		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2602 	}
2603 
2604 	err = drbd_submit_peer_request(device, peer_req, op, op_flags,
2605 				       DRBD_FAULT_DT_WR);
2606 	if (!err)
2607 		return 0;
2608 
2609 	/* don't care for the reason here */
2610 	drbd_err(device, "submit failed, triggering re-connect\n");
2611 	spin_lock_irq(&device->resource->req_lock);
2612 	list_del(&peer_req->w.list);
2613 	drbd_remove_epoch_entry_interval(device, peer_req);
2614 	spin_unlock_irq(&device->resource->req_lock);
2615 	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2616 		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2617 		drbd_al_complete_io(device, &peer_req->i);
2618 	}
2619 
2620 out_interrupted:
2621 	drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
2622 	put_ldev(device);
2623 	drbd_free_peer_req(device, peer_req);
2624 	return err;
2625 }
2626 
2627 /* We may throttle resync if the lower device seems to be busy
2628  * and the current sync rate is above c_min_rate.
2629  *
2630  * To decide whether or not the lower device is busy, we use a scheme similar
2631  * to MD RAID is_mddev_idle(): if the partition stats reveal a "significant"
2632  * amount (more than 64 sectors) of activity that we cannot account for with
2633  * our own resync activity, it obviously is "busy".
2634  *
2635  * The current sync rate used here uses only the most recent two step marks,
2636  * to have a short time average so we can react faster.
2637  */
2638 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2639 		bool throttle_if_app_is_waiting)
2640 {
2641 	struct lc_element *tmp;
2642 	bool throttle = drbd_rs_c_min_rate_throttle(device);
2643 
2644 	if (!throttle || throttle_if_app_is_waiting)
2645 		return throttle;
2646 
2647 	spin_lock_irq(&device->al_lock);
2648 	tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2649 	if (tmp) {
2650 		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2651 		if (test_bit(BME_PRIORITY, &bm_ext->flags))
2652 			throttle = false;
2653 		/* Do not slow down if app IO is already waiting for this extent,
2654 		 * and our progress is necessary for application IO to complete. */
2655 	}
2656 	spin_unlock_irq(&device->al_lock);
2657 
2658 	return throttle;
2659 }
2660 
2661 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2662 {
2663 	struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2664 	unsigned long db, dt, dbdt;
2665 	unsigned int c_min_rate;
2666 	int curr_events;
2667 
2668 	rcu_read_lock();
2669 	c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2670 	rcu_read_unlock();
2671 
2672 	/* feature disabled? */
2673 	if (c_min_rate == 0)
2674 		return false;
2675 
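	/* Total sectors read plus written on the backing device; subtracting
	 * rs_sect_ev (the resync IO we caused ourselves) leaves the activity
	 * we cannot account for. */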
2676 	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2677 		      (int)part_stat_read(&disk->part0, sectors[1]) -
2678 			atomic_read(&device->rs_sect_ev);
2679 
2680 	if (atomic_read(&device->ap_actlog_cnt)
2681 	    || curr_events - device->rs_last_events > 64) {
2682 		unsigned long rs_left;
2683 		int i;
2684 
2685 		device->rs_last_events = curr_events;
2686 
2687 		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2688 		 * approx. */
2689 		i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2690 
2691 		if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2692 			rs_left = device->ov_left;
2693 		else
2694 			rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2695 
2696 		dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2697 		if (!dt)
2698 			dt++;
2699 		db = device->rs_mark_left[i] - rs_left;
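		/* Short-term resync rate in KiB/s (one bitmap bit covers 4 KiB):
		 * e.g. db = 2048 bits cleared within dt = 4 seconds gives
		 * Bit2KB(512) == 2048 KiB/s. */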
2700 		dbdt = Bit2KB(db/dt);
2701 
2702 		if (dbdt > c_min_rate)
2703 			return true;
2704 	}
2705 	return false;
2706 }
2707 
2708 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2709 {
2710 	struct drbd_peer_device *peer_device;
2711 	struct drbd_device *device;
2712 	sector_t sector;
2713 	sector_t capacity;
2714 	struct drbd_peer_request *peer_req;
2715 	struct digest_info *di = NULL;
2716 	int size, verb;
2717 	unsigned int fault_type;
2718 	struct p_block_req *p =	pi->data;
2719 
2720 	peer_device = conn_peer_device(connection, pi->vnr);
2721 	if (!peer_device)
2722 		return -EIO;
2723 	device = peer_device->device;
2724 	capacity = drbd_get_capacity(device->this_bdev);
2725 
2726 	sector = be64_to_cpu(p->sector);
2727 	size   = be32_to_cpu(p->blksize);
2728 
2729 	if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2730 		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2731 				(unsigned long long)sector, size);
2732 		return -EINVAL;
2733 	}
2734 	if (sector + (size>>9) > capacity) {
2735 		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2736 				(unsigned long long)sector, size);
2737 		return -EINVAL;
2738 	}
2739 
2740 	if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2741 		verb = 1;
2742 		switch (pi->cmd) {
2743 		case P_DATA_REQUEST:
2744 			drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2745 			break;
2746 		case P_RS_THIN_REQ:
2747 		case P_RS_DATA_REQUEST:
2748 		case P_CSUM_RS_REQUEST:
2749 		case P_OV_REQUEST:
2750 			drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2751 			break;
2752 		case P_OV_REPLY:
2753 			verb = 0;
2754 			dec_rs_pending(device);
2755 			drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2756 			break;
2757 		default:
2758 			BUG();
2759 		}
2760 		if (verb && __ratelimit(&drbd_ratelimit_state))
2761 			drbd_err(device, "Can not satisfy peer's read request, "
2762 			    "no local data.\n");
2763 
2764 		/* drain the payload, if any */
2765 		return drbd_drain_block(peer_device, pi->size);
2766 	}
2767 
2768 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2769 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
2770 	 * which in turn might block on the other node at this very place.  */
2771 	peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2772 			size, GFP_NOIO);
2773 	if (!peer_req) {
2774 		put_ldev(device);
2775 		return -ENOMEM;
2776 	}
2777 
2778 	switch (pi->cmd) {
2779 	case P_DATA_REQUEST:
2780 		peer_req->w.cb = w_e_end_data_req;
2781 		fault_type = DRBD_FAULT_DT_RD;
2782 		/* application IO, don't drbd_rs_begin_io */
2783 		peer_req->flags |= EE_APPLICATION;
2784 		goto submit;
2785 
2786 	case P_RS_THIN_REQ:
2787 		/* If at some point in the future we have a smart way to
2788 		   find out if this data block is completely deallocated,
2789 		   then we would do something smarter here than reading
2790 		   the block... */
2791 		peer_req->flags |= EE_RS_THIN_REQ;
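		/* fall through */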
2792 	case P_RS_DATA_REQUEST:
2793 		peer_req->w.cb = w_e_end_rsdata_req;
2794 		fault_type = DRBD_FAULT_RS_RD;
2795 		/* used in the sector offset progress display */
2796 		device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2797 		break;
2798 
2799 	case P_OV_REPLY:
2800 	case P_CSUM_RS_REQUEST:
2801 		fault_type = DRBD_FAULT_RS_RD;
2802 		di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2803 		if (!di)
2804 			goto out_free_e;
2805 
2806 		di->digest_size = pi->size;
2807 		di->digest = (((char *)di)+sizeof(struct digest_info));
2808 
2809 		peer_req->digest = di;
2810 		peer_req->flags |= EE_HAS_DIGEST;
2811 
2812 		if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2813 			goto out_free_e;
2814 
2815 		if (pi->cmd == P_CSUM_RS_REQUEST) {
2816 			D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2817 			peer_req->w.cb = w_e_end_csum_rs_req;
2818 			/* used in the sector offset progress display */
2819 			device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2820 			/* remember to report stats in drbd_resync_finished */
2821 			device->use_csums = true;
2822 		} else if (pi->cmd == P_OV_REPLY) {
2823 			/* track progress, we may need to throttle */
2824 			atomic_add(size >> 9, &device->rs_sect_in);
2825 			peer_req->w.cb = w_e_end_ov_reply;
2826 			dec_rs_pending(device);
2827 			/* drbd_rs_begin_io done when we sent this request,
2828 			 * but accounting still needs to be done. */
2829 			goto submit_for_resync;
2830 		}
2831 		break;
2832 
2833 	case P_OV_REQUEST:
2834 		if (device->ov_start_sector == ~(sector_t)0 &&
2835 		    peer_device->connection->agreed_pro_version >= 90) {
2836 			unsigned long now = jiffies;
2837 			int i;
2838 			device->ov_start_sector = sector;
2839 			device->ov_position = sector;
2840 			device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2841 			device->rs_total = device->ov_left;
2842 			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2843 				device->rs_mark_left[i] = device->ov_left;
2844 				device->rs_mark_time[i] = now;
2845 			}
2846 			drbd_info(device, "Online Verify start sector: %llu\n",
2847 					(unsigned long long)sector);
2848 		}
2849 		peer_req->w.cb = w_e_end_ov_req;
2850 		fault_type = DRBD_FAULT_RS_RD;
2851 		break;
2852 
2853 	default:
2854 		BUG();
2855 	}
2856 
2857 	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
2858 	 * wrt the receiver, but it is not as straightforward as it may seem.
2859 	 * Various places in the resync start and stop logic assume resync
2860 	 * requests are processed in order, requeuing this on the worker thread
2861 	 * introduces a bunch of new code for synchronization between threads.
2862 	 *
2863 	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2864 	 * "forever", throttling after drbd_rs_begin_io will lock that extent
2865 	 * for application writes for the same time.  For now, just throttle
2866 	 * here, where the rest of the code expects the receiver to sleep for
2867 	 * a while, anyways.
2868 	 */
2869 
2870 	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
2871 	 * this defers syncer requests for some time, before letting at least
2872 	 * one request through.  The resync controller on the receiving side
2873 	 * will adapt to the incoming rate accordingly.
2874 	 *
2875 	 * We cannot throttle here if remote is Primary/SyncTarget:
2876 	 * we would also throttle its application reads.
2877 	 * In that case, throttling is done on the SyncTarget only.
2878 	 */
2879 
2880 	/* Even though this may be a resync request, we do add to "read_ee";
2881 	 * "sync_ee" is only used for resync WRITEs.
2882 	 * Add to list early, so debugfs can find this request
2883 	 * even if we have to sleep below. */
2884 	spin_lock_irq(&device->resource->req_lock);
2885 	list_add_tail(&peer_req->w.list, &device->read_ee);
2886 	spin_unlock_irq(&device->resource->req_lock);
2887 
2888 	update_receiver_timing_details(connection, drbd_rs_should_slow_down);
2889 	if (device->state.peer != R_PRIMARY
2890 	&& drbd_rs_should_slow_down(device, sector, false))
2891 		schedule_timeout_uninterruptible(HZ/10);
2892 	update_receiver_timing_details(connection, drbd_rs_begin_io);
2893 	if (drbd_rs_begin_io(device, sector))
2894 		goto out_free_e;
2895 
2896 submit_for_resync:
2897 	atomic_add(size >> 9, &device->rs_sect_ev);
2898 
2899 submit:
2900 	update_receiver_timing_details(connection, drbd_submit_peer_request);
2901 	inc_unacked(device);
2902 	if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
2903 				     fault_type) == 0)
2904 		return 0;
2905 
2906 	/* don't care for the reason here */
2907 	drbd_err(device, "submit failed, triggering re-connect\n");
2908 
2909 out_free_e:
2910 	spin_lock_irq(&device->resource->req_lock);
2911 	list_del(&peer_req->w.list);
2912 	spin_unlock_irq(&device->resource->req_lock);
2913 	/* no drbd_rs_complete_io(), we are dropping the connection anyways */
2914 
2915 	put_ldev(device);
2916 	drbd_free_peer_req(device, peer_req);
2917 	return -EIO;
2918 }
2919 
2920 /**
2921  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
2922  */
2923 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2924 {
2925 	struct drbd_device *device = peer_device->device;
2926 	int self, peer, rv = -100;
2927 	unsigned long ch_self, ch_peer;
2928 	enum drbd_after_sb_p after_sb_0p;
2929 
2930 	self = device->ldev->md.uuid[UI_BITMAP] & 1;
2931 	peer = device->p_uuid[UI_BITMAP] & 1;
2932 
2933 	ch_peer = device->p_uuid[UI_SIZE];
2934 	ch_self = device->comm_bm_set;
2935 
2936 	rcu_read_lock();
2937 	after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2938 	rcu_read_unlock();
2939 	switch (after_sb_0p) {
2940 	case ASB_CONSENSUS:
2941 	case ASB_DISCARD_SECONDARY:
2942 	case ASB_CALL_HELPER:
2943 	case ASB_VIOLENTLY:
2944 		drbd_err(device, "Configuration error.\n");
2945 		break;
2946 	case ASB_DISCONNECT:
2947 		break;
2948 	case ASB_DISCARD_YOUNGER_PRI:
2949 		if (self == 0 && peer == 1) {
2950 			rv = -1;
2951 			break;
2952 		}
2953 		if (self == 1 && peer == 0) {
2954 			rv =  1;
2955 			break;
2956 		}
2957 		/* Else fall through to one of the other strategies... */
2958 	case ASB_DISCARD_OLDER_PRI:
2959 		if (self == 0 && peer == 1) {
2960 			rv = 1;
2961 			break;
2962 		}
2963 		if (self == 1 && peer == 0) {
2964 			rv = -1;
2965 			break;
2966 		}
2967 		/* Else fall through to one of the other strategies... */
2968 		drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2969 		     "Using discard-least-changes instead\n");
2970 	case ASB_DISCARD_ZERO_CHG:
2971 		if (ch_peer == 0 && ch_self == 0) {
2972 			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2973 				? -1 : 1;
2974 			break;
2975 		} else {
2976 			if (ch_peer == 0) { rv =  1; break; }
2977 			if (ch_self == 0) { rv = -1; break; }
2978 		}
2979 		if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2980 			break;
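		/* else: fall through to discard-least-changes */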
2981 	case ASB_DISCARD_LEAST_CHG:
2982 		if	(ch_self < ch_peer)
2983 			rv = -1;
2984 		else if (ch_self > ch_peer)
2985 			rv =  1;
2986 		else /* ( ch_self == ch_peer ) */
2987 		     /* Well, then use something else. */
2988 			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2989 				? -1 : 1;
2990 		break;
2991 	case ASB_DISCARD_LOCAL:
2992 		rv = -1;
2993 		break;
2994 	case ASB_DISCARD_REMOTE:
2995 		rv =  1;
2996 	}
2997 
2998 	return rv;
2999 }
3000 
3001 /**
3002  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
3003  */
3004 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
3005 {
3006 	struct drbd_device *device = peer_device->device;
3007 	int hg, rv = -100;
3008 	enum drbd_after_sb_p after_sb_1p;
3009 
3010 	rcu_read_lock();
3011 	after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
3012 	rcu_read_unlock();
3013 	switch (after_sb_1p) {
3014 	case ASB_DISCARD_YOUNGER_PRI:
3015 	case ASB_DISCARD_OLDER_PRI:
3016 	case ASB_DISCARD_LEAST_CHG:
3017 	case ASB_DISCARD_LOCAL:
3018 	case ASB_DISCARD_REMOTE:
3019 	case ASB_DISCARD_ZERO_CHG:
3020 		drbd_err(device, "Configuration error.\n");
3021 		break;
3022 	case ASB_DISCONNECT:
3023 		break;
3024 	case ASB_CONSENSUS:
3025 		hg = drbd_asb_recover_0p(peer_device);
3026 		if (hg == -1 && device->state.role == R_SECONDARY)
3027 			rv = hg;
3028 		if (hg == 1  && device->state.role == R_PRIMARY)
3029 			rv = hg;
3030 		break;
3031 	case ASB_VIOLENTLY:
3032 		rv = drbd_asb_recover_0p(peer_device);
3033 		break;
3034 	case ASB_DISCARD_SECONDARY:
3035 		return device->state.role == R_PRIMARY ? 1 : -1;
3036 	case ASB_CALL_HELPER:
3037 		hg = drbd_asb_recover_0p(peer_device);
3038 		if (hg == -1 && device->state.role == R_PRIMARY) {
3039 			enum drbd_state_rv rv2;
3040 
3041 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3042 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
3043 			  * we do not need to wait for the after state change work either. */
3044 			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3045 			if (rv2 != SS_SUCCESS) {
3046 				drbd_khelper(device, "pri-lost-after-sb");
3047 			} else {
3048 				drbd_warn(device, "Successfully gave up primary role.\n");
3049 				rv = hg;
3050 			}
3051 		} else
3052 			rv = hg;
3053 	}
3054 
3055 	return rv;
3056 }
3057 
3058 /**
3059  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
3060  */
3061 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
3062 {
3063 	struct drbd_device *device = peer_device->device;
3064 	int hg, rv = -100;
3065 	enum drbd_after_sb_p after_sb_2p;
3066 
3067 	rcu_read_lock();
3068 	after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
3069 	rcu_read_unlock();
3070 	switch (after_sb_2p) {
3071 	case ASB_DISCARD_YOUNGER_PRI:
3072 	case ASB_DISCARD_OLDER_PRI:
3073 	case ASB_DISCARD_LEAST_CHG:
3074 	case ASB_DISCARD_LOCAL:
3075 	case ASB_DISCARD_REMOTE:
3076 	case ASB_CONSENSUS:
3077 	case ASB_DISCARD_SECONDARY:
3078 	case ASB_DISCARD_ZERO_CHG:
3079 		drbd_err(device, "Configuration error.\n");
3080 		break;
3081 	case ASB_VIOLENTLY:
3082 		rv = drbd_asb_recover_0p(peer_device);
3083 		break;
3084 	case ASB_DISCONNECT:
3085 		break;
3086 	case ASB_CALL_HELPER:
3087 		hg = drbd_asb_recover_0p(peer_device);
3088 		if (hg == -1) {
3089 			enum drbd_state_rv rv2;
3090 
3091 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3092 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
3093 			  * we do not need to wait for the after state change work either. */
3094 			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3095 			if (rv2 != SS_SUCCESS) {
3096 				drbd_khelper(device, "pri-lost-after-sb");
3097 			} else {
3098 				drbd_warn(device, "Successfully gave up primary role.\n");
3099 				rv = hg;
3100 			}
3101 		} else
3102 			rv = hg;
3103 	}
3104 
3105 	return rv;
3106 }
3107 
3108 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
3109 			   u64 bits, u64 flags)
3110 {
3111 	if (!uuid) {
3112 		drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
3113 		return;
3114 	}
3115 	drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
3116 	     text,
3117 	     (unsigned long long)uuid[UI_CURRENT],
3118 	     (unsigned long long)uuid[UI_BITMAP],
3119 	     (unsigned long long)uuid[UI_HISTORY_START],
3120 	     (unsigned long long)uuid[UI_HISTORY_END],
3121 	     (unsigned long long)bits,
3122 	     (unsigned long long)flags);
3123 }
3124 
3125 /*
3126   100	after split brain try auto recover
3127     2	C_SYNC_SOURCE set BitMap
3128     1	C_SYNC_SOURCE use BitMap
3129     0	no Sync
3130    -1	C_SYNC_TARGET use BitMap
3131    -2	C_SYNC_TARGET set BitMap
3132  -100	after split brain, disconnect
3133 -1000	unrelated data
3134 -1091   requires proto 91
3135 -1096   requires proto 96
3136  */
3137 
3138 static int drbd_uuid_compare(struct drbd_device *const device, enum drbd_role const peer_role, int *rule_nr) __must_hold(local)
3139 {
3140 	struct drbd_peer_device *const peer_device = first_peer_device(device);
3141 	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
3142 	u64 self, peer;
3143 	int i, j;
3144 
3145 	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3146 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3147 
3148 	*rule_nr = 10;
3149 	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
3150 		return 0;
3151 
3152 	*rule_nr = 20;
3153 	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
3154 	     peer != UUID_JUST_CREATED)
3155 		return -2;
3156 
3157 	*rule_nr = 30;
3158 	if (self != UUID_JUST_CREATED &&
3159 	    (peer == UUID_JUST_CREATED || peer == (u64)0))
3160 		return 2;
3161 
3162 	if (self == peer) {
3163 		int rct, dc; /* roles at crash time */
3164 
3165 		if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
3166 
3167 			if (connection->agreed_pro_version < 91)
3168 				return -1091;
3169 
3170 			if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
3171 			    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
3172 				drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
3173 				drbd_uuid_move_history(device);
3174 				device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
3175 				device->ldev->md.uuid[UI_BITMAP] = 0;
3176 
3177 				drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3178 					       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3179 				*rule_nr = 34;
3180 			} else {
3181 				drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
3182 				*rule_nr = 36;
3183 			}
3184 
3185 			return 1;
3186 		}
3187 
3188 		if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
3189 
3190 			if (connection->agreed_pro_version < 91)
3191 				return -1091;
3192 
3193 			if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
3194 			    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
3195 				drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
3196 
3197 				device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
3198 				device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
3199 				device->p_uuid[UI_BITMAP] = 0UL;
3200 
3201 				drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3202 				*rule_nr = 35;
3203 			} else {
3204 				drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3205 				*rule_nr = 37;
3206 			}
3207 
3208 			return -1;
3209 		}
3210 
3211 		/* Common power [off|failure] */
3212 		rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3213 			(device->p_uuid[UI_FLAGS] & 2);
3214 		/* lowest bit is set when we were primary,
3215 		 * next bit (weight 2) is set when peer was primary */
3216 		*rule_nr = 40;
3217 
3218 		/* Neither has the "crashed primary" flag set,
3219 		 * only a replication link hiccup. */
3220 		if (rct == 0)
3221 			return 0;
3222 
3223 		/* Current UUID equal and no bitmap uuid; does not necessarily
3224 		 * mean this was a "simultaneous hard crash", maybe IO was
3225 		 * frozen, so no UUID-bump happened.
3226 		 * This is a protocol change, overload DRBD_FF_WSAME as flag
3227 		 * for "new-enough" peer DRBD version. */
3228 		if (device->state.role == R_PRIMARY || peer_role == R_PRIMARY) {
3229 			*rule_nr = 41;
3230 			if (!(connection->agreed_features & DRBD_FF_WSAME)) {
3231 				drbd_warn(peer_device, "Equivalent unrotated UUIDs, but current primary present.\n");
3232 				return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8));
3233 			}
3234 			if (device->state.role == R_PRIMARY && peer_role == R_PRIMARY) {
3235 				/* At least one has the "crashed primary" bit set,
3236 				 * both are primary now, but neither has rotated its UUIDs?
3237 				 * "Can not happen." */
3238 				drbd_err(peer_device, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n");
3239 				return -100;
3240 			}
3241 			if (device->state.role == R_PRIMARY)
3242 				return 1;
3243 			return -1;
3244 		}
3245 
3246 		/* Both are secondary.
3247 		 * Really looks like recovery from simultaneous hard crash.
3248 		 * Check which had been primary before, and arbitrate. */
3249 		switch (rct) {
3250 		case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */
3251 		case 1: /*  self_pri && !peer_pri */ return 1;
3252 		case 2: /* !self_pri &&  peer_pri */ return -1;
3253 		case 3: /*  self_pri &&  peer_pri */
3254 			dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3255 			return dc ? -1 : 1;
3256 		}
3257 	}
3258 
3259 	*rule_nr = 50;
3260 	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3261 	if (self == peer)
3262 		return -1;
3263 
3264 	*rule_nr = 51;
3265 	peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3266 	if (self == peer) {
3267 		if (connection->agreed_pro_version < 96 ?
3268 		    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3269 		    (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3270 		    peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3271 			/* The last P_SYNC_UUID did not get through. Undo the modifications
3272 			   the peer made to its UUIDs when it last became sync source. */
3273 
3274 			if (connection->agreed_pro_version < 91)
3275 				return -1091;
3276 
3277 			device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3278 			device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3279 
3280 			drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3281 			drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3282 
3283 			return -1;
3284 		}
3285 	}
3286 
3287 	*rule_nr = 60;
3288 	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3289 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3290 		peer = device->p_uuid[i] & ~((u64)1);
3291 		if (self == peer)
3292 			return -2;
3293 	}
3294 
3295 	*rule_nr = 70;
3296 	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3297 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3298 	if (self == peer)
3299 		return 1;
3300 
3301 	*rule_nr = 71;
3302 	self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3303 	if (self == peer) {
3304 		if (connection->agreed_pro_version < 96 ?
3305 		    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3306 		    (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3307 		    self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3308 			/* The last P_SYNC_UUID did not get through. Undo the modifications
3309 			   we made to our UUIDs when we last became sync source. */
3310 
3311 			if (connection->agreed_pro_version < 91)
3312 				return -1091;
3313 
3314 			__drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3315 			__drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3316 
3317 			drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3318 			drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3319 				       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3320 
3321 			return 1;
3322 		}
3323 	}
3324 
3325 
3326 	*rule_nr = 80;
3327 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3328 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3329 		self = device->ldev->md.uuid[i] & ~((u64)1);
3330 		if (self == peer)
3331 			return 2;
3332 	}
3333 
3334 	*rule_nr = 90;
3335 	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3336 	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3337 	if (self == peer && self != ((u64)0))
3338 		return 100;
3339 
3340 	*rule_nr = 100;
3341 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3342 		self = device->ldev->md.uuid[i] & ~((u64)1);
3343 		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3344 			peer = device->p_uuid[j] & ~((u64)1);
3345 			if (self == peer)
3346 				return -100;
3347 		}
3348 	}
3349 
3350 	return -1000;
3351 }
3352 
3353 /* drbd_sync_handshake() returns the new conn state on success, or
3354    C_MASK (-1) on failure.
3355  */
3356 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3357 					   enum drbd_role peer_role,
3358 					   enum drbd_disk_state peer_disk) __must_hold(local)
3359 {
3360 	struct drbd_device *device = peer_device->device;
3361 	enum drbd_conns rv = C_MASK;
3362 	enum drbd_disk_state mydisk;
3363 	struct net_conf *nc;
3364 	int hg, rule_nr, rr_conflict, tentative;
3365 
3366 	mydisk = device->state.disk;
3367 	if (mydisk == D_NEGOTIATING)
3368 		mydisk = device->new_state_tmp.disk;
3369 
3370 	drbd_info(device, "drbd_sync_handshake:\n");
3371 
3372 	spin_lock_irq(&device->ldev->md.uuid_lock);
3373 	drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3374 	drbd_uuid_dump(device, "peer", device->p_uuid,
3375 		       device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3376 
3377 	hg = drbd_uuid_compare(device, peer_role, &rule_nr);
3378 	spin_unlock_irq(&device->ldev->md.uuid_lock);
3379 
3380 	drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3381 
3382 	if (hg == -1000) {
3383 		drbd_alert(device, "Unrelated data, aborting!\n");
3384 		return C_MASK;
3385 	}
3386 	if (hg < -0x10000) {
3387 		int proto, fflags;
3388 		hg = -hg;
3389 		proto = hg & 0xff;
3390 		fflags = (hg >> 8) & 0xff;
3391 		drbd_alert(device, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n",
3392 					proto, fflags);
3393 		return C_MASK;
3394 	}
3395 	if (hg < -1000) {
3396 		drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3397 		return C_MASK;
3398 	}
3399 
	if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
	    (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
3402 		int f = (hg == -100) || abs(hg) == 2;
3403 		hg = mydisk > D_INCONSISTENT ? 1 : -1;
3404 		if (f)
3405 			hg = hg*2;
3406 		drbd_info(device, "Becoming sync %s due to disk states.\n",
3407 		     hg > 0 ? "source" : "target");
3408 	}
3409 
3410 	if (abs(hg) == 100)
3411 		drbd_khelper(device, "initial-split-brain");
3412 
3413 	rcu_read_lock();
3414 	nc = rcu_dereference(peer_device->connection->net_conf);
3415 
3416 	if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3417 		int pcount = (device->state.role == R_PRIMARY)
3418 			   + (peer_role == R_PRIMARY);
3419 		int forced = (hg == -100);
3420 
3421 		switch (pcount) {
3422 		case 0:
3423 			hg = drbd_asb_recover_0p(peer_device);
3424 			break;
3425 		case 1:
3426 			hg = drbd_asb_recover_1p(peer_device);
3427 			break;
3428 		case 2:
3429 			hg = drbd_asb_recover_2p(peer_device);
3430 			break;
3431 		}
3432 		if (abs(hg) < 100) {
3433 			drbd_warn(device, "Split-Brain detected, %d primaries, "
3434 			     "automatically solved. Sync from %s node\n",
3435 			     pcount, (hg < 0) ? "peer" : "this");
3436 			if (forced) {
				drbd_warn(device, "Doing a full sync, since"
				     " UUIDs were ambiguous.\n");
3439 				hg = hg*2;
3440 			}
3441 		}
3442 	}
3443 
3444 	if (hg == -100) {
3445 		if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3446 			hg = -1;
3447 		if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3448 			hg = 1;
3449 
3450 		if (abs(hg) < 100)
3451 			drbd_warn(device, "Split-Brain detected, manually solved. "
3452 			     "Sync from %s node\n",
3453 			     (hg < 0) ? "peer" : "this");
3454 	}
3455 	rr_conflict = nc->rr_conflict;
3456 	tentative = nc->tentative;
3457 	rcu_read_unlock();
3458 
3459 	if (hg == -100) {
3460 		/* FIXME this log message is not correct if we end up here
3461 		 * after an attempted attach on a diskless node.
3462 		 * We just refuse to attach -- well, we drop the "connection"
3463 		 * to that disk, in a way... */
3464 		drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3465 		drbd_khelper(device, "split-brain");
3466 		return C_MASK;
3467 	}
3468 
3469 	if (hg > 0 && mydisk <= D_INCONSISTENT) {
3470 		drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3471 		return C_MASK;
3472 	}
3473 
3474 	if (hg < 0 && /* by intention we do not use mydisk here. */
3475 	    device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3476 		switch (rr_conflict) {
3477 		case ASB_CALL_HELPER:
3478 			drbd_khelper(device, "pri-lost");
3479 			/* fall through */
3480 		case ASB_DISCONNECT:
3481 			drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3482 			return C_MASK;
3483 		case ASB_VIOLENTLY:
			drbd_warn(device, "Becoming SyncTarget, violating the stable-data "
			     "assumption\n");
3486 		}
3487 	}
3488 
3489 	if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3490 		if (hg == 0)
3491 			drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3492 		else
			drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.\n",
3494 				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3495 				 abs(hg) >= 2 ? "full" : "bit-map based");
3496 		return C_MASK;
3497 	}
3498 
3499 	if (abs(hg) >= 2) {
3500 		drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3501 		if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3502 					BM_LOCKED_SET_ALLOWED))
3503 			return C_MASK;
3504 	}
3505 
3506 	if (hg > 0) { /* become sync source. */
3507 		rv = C_WF_BITMAP_S;
3508 	} else if (hg < 0) { /* become sync target */
3509 		rv = C_WF_BITMAP_T;
3510 	} else {
3511 		rv = C_CONNECTED;
3512 		if (drbd_bm_total_weight(device)) {
3513 			drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3514 			     drbd_bm_total_weight(device));
3515 		}
3516 	}
3517 
3518 	return rv;
3519 }
3520 
3521 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3522 {
	/* the peer's ASB_DISCARD_REMOTE is our ASB_DISCARD_LOCAL, and vice versa */
3524 	if (peer == ASB_DISCARD_REMOTE)
3525 		return ASB_DISCARD_LOCAL;
3526 
	/* any other combination involving ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL
	 * is invalid; the conversion makes it fail the compatibility checks in
	 * receive_protocol() */
3528 	if (peer == ASB_DISCARD_LOCAL)
3529 		return ASB_DISCARD_REMOTE;
3530 
3531 	/* everything else is valid if they are equal on both sides. */
3532 	return peer;
3533 }
3534 
3535 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3536 {
3537 	struct p_protocol *p = pi->data;
3538 	enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3539 	int p_proto, p_discard_my_data, p_two_primaries, cf;
3540 	struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3541 	char integrity_alg[SHARED_SECRET_MAX] = "";
3542 	struct crypto_ahash *peer_integrity_tfm = NULL;
3543 	void *int_dig_in = NULL, *int_dig_vv = NULL;
3544 
3545 	p_proto		= be32_to_cpu(p->protocol);
3546 	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
3547 	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
3548 	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
3549 	p_two_primaries = be32_to_cpu(p->two_primaries);
3550 	cf		= be32_to_cpu(p->conn_flags);
3551 	p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3552 
3553 	if (connection->agreed_pro_version >= 87) {
3554 		int err;
3555 
3556 		if (pi->size > sizeof(integrity_alg))
3557 			return -EIO;
3558 		err = drbd_recv_all(connection, integrity_alg, pi->size);
3559 		if (err)
3560 			return err;
3561 		integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3562 	}
3563 
3564 	if (pi->cmd != P_PROTOCOL_UPDATE) {
3565 		clear_bit(CONN_DRY_RUN, &connection->flags);
3566 
3567 		if (cf & CF_DRY_RUN)
3568 			set_bit(CONN_DRY_RUN, &connection->flags);
3569 
3570 		rcu_read_lock();
3571 		nc = rcu_dereference(connection->net_conf);
3572 
3573 		if (p_proto != nc->wire_protocol) {
3574 			drbd_err(connection, "incompatible %s settings\n", "protocol");
3575 			goto disconnect_rcu_unlock;
3576 		}
3577 
3578 		if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3579 			drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3580 			goto disconnect_rcu_unlock;
3581 		}
3582 
3583 		if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3584 			drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3585 			goto disconnect_rcu_unlock;
3586 		}
3587 
3588 		if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3589 			drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3590 			goto disconnect_rcu_unlock;
3591 		}
3592 
3593 		if (p_discard_my_data && nc->discard_my_data) {
3594 			drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3595 			goto disconnect_rcu_unlock;
3596 		}
3597 
3598 		if (p_two_primaries != nc->two_primaries) {
3599 			drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3600 			goto disconnect_rcu_unlock;
3601 		}
3602 
3603 		if (strcmp(integrity_alg, nc->integrity_alg)) {
3604 			drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3605 			goto disconnect_rcu_unlock;
3606 		}
3607 
3608 		rcu_read_unlock();
3609 	}
3610 
3611 	if (integrity_alg[0]) {
3612 		int hash_size;
3613 
3614 		/*
3615 		 * We can only change the peer data integrity algorithm
3616 		 * here.  Changing our own data integrity algorithm
3617 		 * requires that we send a P_PROTOCOL_UPDATE packet at
3618 		 * the same time; otherwise, the peer has no way to
3619 		 * tell between which packets the algorithm should
3620 		 * change.
3621 		 */
3622 
3623 		peer_integrity_tfm = crypto_alloc_ahash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3624 		if (IS_ERR(peer_integrity_tfm)) {
3625 			peer_integrity_tfm = NULL;
3626 			drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3627 				 integrity_alg);
3628 			goto disconnect;
3629 		}
3630 
3631 		hash_size = crypto_ahash_digestsize(peer_integrity_tfm);
3632 		int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3633 		int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3634 		if (!(int_dig_in && int_dig_vv)) {
3635 			drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3636 			goto disconnect;
3637 		}
3638 	}
3639 
3640 	new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3641 	if (!new_net_conf) {
3642 		drbd_err(connection, "Allocation of new net_conf failed\n");
3643 		goto disconnect;
3644 	}
3645 
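	/* Publish the negotiated settings with the usual RCU scheme: copy the
	 * current net_conf, update the copy, rcu_assign_pointer() it into
	 * place under conf_update, and only kfree() the old version after the
	 * synchronize_rcu() below guarantees that no reader still sees it. */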
3646 	mutex_lock(&connection->data.mutex);
3647 	mutex_lock(&connection->resource->conf_update);
3648 	old_net_conf = connection->net_conf;
3649 	*new_net_conf = *old_net_conf;
3650 
3651 	new_net_conf->wire_protocol = p_proto;
3652 	new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3653 	new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3654 	new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3655 	new_net_conf->two_primaries = p_two_primaries;
3656 
3657 	rcu_assign_pointer(connection->net_conf, new_net_conf);
3658 	mutex_unlock(&connection->resource->conf_update);
3659 	mutex_unlock(&connection->data.mutex);
3660 
3661 	crypto_free_ahash(connection->peer_integrity_tfm);
3662 	kfree(connection->int_dig_in);
3663 	kfree(connection->int_dig_vv);
3664 	connection->peer_integrity_tfm = peer_integrity_tfm;
3665 	connection->int_dig_in = int_dig_in;
3666 	connection->int_dig_vv = int_dig_vv;
3667 
3668 	if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3669 		drbd_info(connection, "peer data-integrity-alg: %s\n",
3670 			  integrity_alg[0] ? integrity_alg : "(none)");
3671 
3672 	synchronize_rcu();
3673 	kfree(old_net_conf);
3674 	return 0;
3675 
3676 disconnect_rcu_unlock:
3677 	rcu_read_unlock();
3678 disconnect:
3679 	crypto_free_ahash(peer_integrity_tfm);
3680 	kfree(int_dig_in);
3681 	kfree(int_dig_vv);
3682 	conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3683 	return -EIO;
3684 }
3685 
3686 /* helper function
3687  * input: alg name, feature name
3688  * return: NULL (alg name was "")
3689  *         ERR_PTR(error) if something goes wrong
3690  *         or the crypto hash ptr, if it worked out ok. */
3691 static struct crypto_ahash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3692 		const char *alg, const char *name)
3693 {
3694 	struct crypto_ahash *tfm;
3695 
3696 	if (!alg[0])
3697 		return NULL;
3698 
3699 	tfm = crypto_alloc_ahash(alg, 0, CRYPTO_ALG_ASYNC);
3700 	if (IS_ERR(tfm)) {
3701 		drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3702 			alg, name, PTR_ERR(tfm));
3703 		return tfm;
3704 	}
3705 	return tfm;
3706 }
3707 
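/* Drain and discard the payload of a packet we do not want to process,
 * DRBD_SOCKET_BUFFER_SIZE bytes at a time, so that the receive stream
 * stays aligned on the next packet header. */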
3708 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3709 {
3710 	void *buffer = connection->data.rbuf;
3711 	int size = pi->size;
3712 
3713 	while (size) {
3714 		int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3715 		s = drbd_recv(connection, buffer, s);
3716 		if (s <= 0) {
3717 			if (s < 0)
3718 				return s;
3719 			break;
3720 		}
3721 		size -= s;
3722 	}
3723 	if (size)
3724 		return -EIO;
3725 	return 0;
3726 }
3727 
3728 /*
3729  * config_unknown_volume  -  device configuration command for unknown volume
3730  *
3731  * When a device is added to an existing connection, the node on which the
3732  * device is added first will send configuration commands to its peer but the
3733  * peer will not know about the device yet.  It will warn and ignore these
3734  * commands.  Once the device is added on the second node, the second node will
3735  * send the same device configuration commands, but in the other direction.
3736  *
3737  * (We can also end up here if drbd is misconfigured.)
3738  */
3739 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3740 {
3741 	drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3742 		  cmdname(pi->cmd), pi->vnr);
3743 	return ignore_remaining_packet(connection, pi);
3744 }
3745 
3746 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3747 {
3748 	struct drbd_peer_device *peer_device;
3749 	struct drbd_device *device;
3750 	struct p_rs_param_95 *p;
3751 	unsigned int header_size, data_size, exp_max_sz;
3752 	struct crypto_ahash *verify_tfm = NULL;
3753 	struct crypto_ahash *csums_tfm = NULL;
3754 	struct net_conf *old_net_conf, *new_net_conf = NULL;
3755 	struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3756 	const int apv = connection->agreed_pro_version;
3757 	struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3758 	int fifo_size = 0;
3759 	int err;
3760 
3761 	peer_device = conn_peer_device(connection, pi->vnr);
3762 	if (!peer_device)
3763 		return config_unknown_volume(connection, pi);
3764 	device = peer_device->device;
3765 
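	/* The SyncParam packet grew over the protocol versions: apv <= 87
	 * carries only the resync rate, apv 88 appends a verify-alg string,
	 * apv 89..94 use the fixed size p_rs_param_89 layout (verify-alg and
	 * csums-alg), and apv >= 95 adds the dynamic resync controller
	 * settings (c_plan_ahead and friends). */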
3766 	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3767 		    : apv == 88 ? sizeof(struct p_rs_param)
3768 					+ SHARED_SECRET_MAX
3769 		    : apv <= 94 ? sizeof(struct p_rs_param_89)
3770 		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3771 
3772 	if (pi->size > exp_max_sz) {
3773 		drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3774 		    pi->size, exp_max_sz);
3775 		return -EIO;
3776 	}
3777 
3778 	if (apv <= 88) {
3779 		header_size = sizeof(struct p_rs_param);
3780 		data_size = pi->size - header_size;
3781 	} else if (apv <= 94) {
3782 		header_size = sizeof(struct p_rs_param_89);
3783 		data_size = pi->size - header_size;
3784 		D_ASSERT(device, data_size == 0);
3785 	} else {
3786 		header_size = sizeof(struct p_rs_param_95);
3787 		data_size = pi->size - header_size;
3788 		D_ASSERT(device, data_size == 0);
3789 	}
3790 
3791 	/* initialize verify_alg and csums_alg */
3792 	p = pi->data;
3793 	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3794 
3795 	err = drbd_recv_all(peer_device->connection, p, header_size);
3796 	if (err)
3797 		return err;
3798 
3799 	mutex_lock(&connection->resource->conf_update);
3800 	old_net_conf = peer_device->connection->net_conf;
3801 	if (get_ldev(device)) {
3802 		new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3803 		if (!new_disk_conf) {
3804 			put_ldev(device);
3805 			mutex_unlock(&connection->resource->conf_update);
3806 			drbd_err(device, "Allocation of new disk_conf failed\n");
3807 			return -ENOMEM;
3808 		}
3809 
3810 		old_disk_conf = device->ldev->disk_conf;
3811 		*new_disk_conf = *old_disk_conf;
3812 
3813 		new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3814 	}
3815 
3816 	if (apv >= 88) {
3817 		if (apv == 88) {
3818 			if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3819 				drbd_err(device, "verify-alg of wrong size, "
					"peer wants %u, accepting only up to %u bytes\n",
3821 					data_size, SHARED_SECRET_MAX);
3822 				err = -EIO;
3823 				goto reconnect;
3824 			}
3825 
3826 			err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3827 			if (err)
3828 				goto reconnect;
3829 			/* we expect NUL terminated string */
3830 			/* but just in case someone tries to be evil */
3831 			D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3832 			p->verify_alg[data_size-1] = 0;
3833 
3834 		} else /* apv >= 89 */ {
3835 			/* we still expect NUL terminated strings */
3836 			/* but just in case someone tries to be evil */
3837 			D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3838 			D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3839 			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3840 			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3841 		}
3842 
3843 		if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3844 			if (device->state.conn == C_WF_REPORT_PARAMS) {
3845 				drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3846 				    old_net_conf->verify_alg, p->verify_alg);
3847 				goto disconnect;
3848 			}
3849 			verify_tfm = drbd_crypto_alloc_digest_safe(device,
3850 					p->verify_alg, "verify-alg");
3851 			if (IS_ERR(verify_tfm)) {
3852 				verify_tfm = NULL;
3853 				goto disconnect;
3854 			}
3855 		}
3856 
3857 		if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3858 			if (device->state.conn == C_WF_REPORT_PARAMS) {
3859 				drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3860 				    old_net_conf->csums_alg, p->csums_alg);
3861 				goto disconnect;
3862 			}
3863 			csums_tfm = drbd_crypto_alloc_digest_safe(device,
3864 					p->csums_alg, "csums-alg");
3865 			if (IS_ERR(csums_tfm)) {
3866 				csums_tfm = NULL;
3867 				goto disconnect;
3868 			}
3869 		}
3870 
3871 		if (apv > 94 && new_disk_conf) {
3872 			new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3873 			new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3874 			new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3875 			new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3876 
3877 			fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3878 			if (fifo_size != device->rs_plan_s->size) {
3879 				new_plan = fifo_alloc(fifo_size);
3880 				if (!new_plan) {
					drbd_err(device, "kmalloc of fifo_buffer failed\n");
3882 					put_ldev(device);
3883 					goto disconnect;
3884 				}
3885 			}
3886 		}
3887 
3888 		if (verify_tfm || csums_tfm) {
3889 			new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3890 			if (!new_net_conf) {
3891 				drbd_err(device, "Allocation of new net_conf failed\n");
3892 				goto disconnect;
3893 			}
3894 
3895 			*new_net_conf = *old_net_conf;
3896 
3897 			if (verify_tfm) {
3898 				strcpy(new_net_conf->verify_alg, p->verify_alg);
3899 				new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3900 				crypto_free_ahash(peer_device->connection->verify_tfm);
3901 				peer_device->connection->verify_tfm = verify_tfm;
3902 				drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3903 			}
3904 			if (csums_tfm) {
3905 				strcpy(new_net_conf->csums_alg, p->csums_alg);
3906 				new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3907 				crypto_free_ahash(peer_device->connection->csums_tfm);
3908 				peer_device->connection->csums_tfm = csums_tfm;
3909 				drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3910 			}
3911 			rcu_assign_pointer(connection->net_conf, new_net_conf);
3912 		}
3913 	}
3914 
3915 	if (new_disk_conf) {
3916 		rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3917 		put_ldev(device);
3918 	}
3919 
3920 	if (new_plan) {
3921 		old_plan = device->rs_plan_s;
3922 		rcu_assign_pointer(device->rs_plan_s, new_plan);
3923 	}
3924 
3925 	mutex_unlock(&connection->resource->conf_update);
3926 	synchronize_rcu();
3927 	if (new_net_conf)
3928 		kfree(old_net_conf);
3929 	kfree(old_disk_conf);
3930 	kfree(old_plan);
3931 
3932 	return 0;
3933 
3934 reconnect:
3935 	if (new_disk_conf) {
3936 		put_ldev(device);
3937 		kfree(new_disk_conf);
3938 	}
3939 	mutex_unlock(&connection->resource->conf_update);
3940 	return -EIO;
3941 
3942 disconnect:
3943 	kfree(new_plan);
3944 	if (new_disk_conf) {
3945 		put_ldev(device);
3946 		kfree(new_disk_conf);
3947 	}
3948 	mutex_unlock(&connection->resource->conf_update);
	/* Free any digest transforms that were allocated above but not yet
	 * published in the connection; we also get here on later allocation
	 * failures, with csums_tfm and/or verify_tfm already set up. */
	crypto_free_ahash(csums_tfm);
	crypto_free_ahash(verify_tfm);
3954 	conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3955 	return -EIO;
3956 }
3957 
3958 /* warn if the arguments differ by more than 12.5% */
3959 static void warn_if_differ_considerably(struct drbd_device *device,
3960 	const char *s, sector_t a, sector_t b)
3961 {
3962 	sector_t d;
3963 	if (a == 0 || b == 0)
3964 		return;
3965 	d = (a > b) ? (a - b) : (b - a);
3966 	if (d > (a>>3) || d > (b>>3))
3967 		drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3968 		     (unsigned long long)a, (unsigned long long)b);
3969 }
3970 
3971 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3972 {
3973 	struct drbd_peer_device *peer_device;
3974 	struct drbd_device *device;
3975 	struct p_sizes *p = pi->data;
3976 	struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ? p->qlim : NULL;
3977 	enum determine_dev_size dd = DS_UNCHANGED;
3978 	sector_t p_size, p_usize, p_csize, my_usize;
3979 	int ldsc = 0; /* local disk size changed */
3980 	enum dds_flags ddsf;
3981 
3982 	peer_device = conn_peer_device(connection, pi->vnr);
3983 	if (!peer_device)
3984 		return config_unknown_volume(connection, pi);
3985 	device = peer_device->device;
3986 
3987 	p_size = be64_to_cpu(p->d_size);
3988 	p_usize = be64_to_cpu(p->u_size);
3989 	p_csize = be64_to_cpu(p->c_size);
3990 
3991 	/* just store the peer's disk size for now.
3992 	 * we still need to figure out whether we accept that. */
3993 	device->p_size = p_size;
3994 
3995 	if (get_ldev(device)) {
3996 		sector_t new_size, cur_size;
3997 		rcu_read_lock();
3998 		my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
3999 		rcu_read_unlock();
4000 
4001 		warn_if_differ_considerably(device, "lower level device sizes",
4002 			   p_size, drbd_get_max_capacity(device->ldev));
4003 		warn_if_differ_considerably(device, "user requested size",
4004 					    p_usize, my_usize);
4005 
4006 		/* if this is the first connect, or an otherwise expected
4007 		 * param exchange, choose the minimum */
4008 		if (device->state.conn == C_WF_REPORT_PARAMS)
4009 			p_usize = min_not_zero(my_usize, p_usize);
4010 
4011 		/* Never shrink a device with usable data during connect.
4012 		   But allow online shrinking if we are connected. */
4013 		new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0);
4014 		cur_size = drbd_get_capacity(device->this_bdev);
4015 		if (new_size < cur_size &&
4016 		    device->state.disk >= D_OUTDATED &&
4017 		    device->state.conn < C_CONNECTED) {
4018 			drbd_err(device, "The peer's disk size is too small! (%llu < %llu sectors)\n",
4019 					(unsigned long long)new_size, (unsigned long long)cur_size);
4020 			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4021 			put_ldev(device);
4022 			return -EIO;
4023 		}
4024 
4025 		if (my_usize != p_usize) {
4026 			struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
4027 
4028 			new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
4029 			if (!new_disk_conf) {
4030 				drbd_err(device, "Allocation of new disk_conf failed\n");
4031 				put_ldev(device);
4032 				return -ENOMEM;
4033 			}
4034 
4035 			mutex_lock(&connection->resource->conf_update);
4036 			old_disk_conf = device->ldev->disk_conf;
4037 			*new_disk_conf = *old_disk_conf;
4038 			new_disk_conf->disk_size = p_usize;
4039 
4040 			rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
4041 			mutex_unlock(&connection->resource->conf_update);
4042 			synchronize_rcu();
4043 			kfree(old_disk_conf);
4044 
			drbd_info(device, "Peer sets u_size to %lu sectors\n",
				 (unsigned long)p_usize);
4047 		}
4048 
4049 		put_ldev(device);
4050 	}
4051 
4052 	device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
	/* Keep the call to drbd_reconsider_queue_parameters() before drbd_determine_dev_size().
4054 	   In case we cleared the QUEUE_FLAG_DISCARD from our queue in
4055 	   drbd_reconsider_queue_parameters(), we can be sure that after
4056 	   drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
4057 
4058 	ddsf = be16_to_cpu(p->dds_flags);
4059 	if (get_ldev(device)) {
4060 		drbd_reconsider_queue_parameters(device, device->ldev, o);
4061 		dd = drbd_determine_dev_size(device, ddsf, NULL);
4062 		put_ldev(device);
4063 		if (dd == DS_ERROR)
4064 			return -EIO;
4065 		drbd_md_sync(device);
4066 	} else {
		/*
		 * I am diskless, need to accept the peer's *current* size.
		 * I must NOT accept the peer's backing disk size,
		 * it may have been larger than mine all along...
		 *
		 * At this point, the peer knows more about my disk, or at
		 * least about what we last agreed upon, than I do myself.
		 * So if its c_size is less than its d_size, the most likely
		 * reason is that *my* d_size was smaller last time we checked.
		 *
		 * However, if the peer sends a zero current size,
		 * take its (user-capped or) backing disk size anyway.
		 */
4080 		drbd_reconsider_queue_parameters(device, NULL, o);
4081 		drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size);
4082 	}
4083 
4084 	if (get_ldev(device)) {
4085 		if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
4086 			device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
4087 			ldsc = 1;
4088 		}
4089 
4090 		put_ldev(device);
4091 	}
4092 
4093 	if (device->state.conn > C_WF_REPORT_PARAMS) {
4094 		if (be64_to_cpu(p->c_size) !=
4095 		    drbd_get_capacity(device->this_bdev) || ldsc) {
4096 			/* we have different sizes, probably peer
4097 			 * needs to know my new size... */
4098 			drbd_send_sizes(peer_device, 0, ddsf);
4099 		}
4100 		if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
4101 		    (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
4102 			if (device->state.pdsk >= D_INCONSISTENT &&
4103 			    device->state.disk >= D_INCONSISTENT) {
4104 				if (ddsf & DDSF_NO_RESYNC)
4105 					drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
4106 				else
4107 					resync_after_online_grow(device);
4108 			} else
4109 				set_bit(RESYNC_AFTER_NEG, &device->flags);
4110 		}
4111 	}
4112 
4113 	return 0;
4114 }
4115 
4116 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
4117 {
4118 	struct drbd_peer_device *peer_device;
4119 	struct drbd_device *device;
4120 	struct p_uuids *p = pi->data;
4121 	u64 *p_uuid;
4122 	int i, updated_uuids = 0;
4123 
4124 	peer_device = conn_peer_device(connection, pi->vnr);
4125 	if (!peer_device)
4126 		return config_unknown_volume(connection, pi);
4127 	device = peer_device->device;
4128 
4129 	p_uuid = kmalloc_array(UI_EXTENDED_SIZE, sizeof(*p_uuid), GFP_NOIO);
4130 	if (!p_uuid) {
4131 		drbd_err(device, "kmalloc of p_uuid failed\n");
		return -ENOMEM;
4133 	}
4134 
4135 	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
4136 		p_uuid[i] = be64_to_cpu(p->uuid[i]);
4137 
4138 	kfree(device->p_uuid);
4139 	device->p_uuid = p_uuid;
4140 
4141 	if (device->state.conn < C_CONNECTED &&
4142 	    device->state.disk < D_INCONSISTENT &&
4143 	    device->state.role == R_PRIMARY &&
4144 	    (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
4145 		drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
4146 		    (unsigned long long)device->ed_uuid);
4147 		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4148 		return -EIO;
4149 	}
4150 
4151 	if (get_ldev(device)) {
4152 		int skip_initial_sync =
4153 			device->state.conn == C_CONNECTED &&
4154 			peer_device->connection->agreed_pro_version >= 90 &&
4155 			device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
4156 			(p_uuid[UI_FLAGS] & 8);
4157 		if (skip_initial_sync) {
4158 			drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
4159 			drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
4160 					"clear_n_write from receive_uuids",
4161 					BM_LOCKED_TEST_ALLOWED);
4162 			_drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
4163 			_drbd_uuid_set(device, UI_BITMAP, 0);
4164 			_drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
4165 					CS_VERBOSE, NULL);
4166 			drbd_md_sync(device);
4167 			updated_uuids = 1;
4168 		}
4169 		put_ldev(device);
4170 	} else if (device->state.disk < D_INCONSISTENT &&
4171 		   device->state.role == R_PRIMARY) {
4172 		/* I am a diskless primary, the peer just created a new current UUID
4173 		   for me. */
4174 		updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4175 	}
4176 
	/* Before we test for the disk state, we should wait until any cluster
	   wide state change that may still be ongoing has finished. That is
	   important if we are primary and are detaching from our disk. We need
	   to see the new disk state... */
4181 	mutex_lock(device->state_mutex);
4182 	mutex_unlock(device->state_mutex);
4183 	if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
4184 		updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4185 
4186 	if (updated_uuids)
4187 		drbd_print_uuids(device, "receiver updated UUIDs to");
4188 
4189 	return 0;
4190 }
4191 
4192 /**
4193  * convert_state() - Converts the peer's view of the cluster state to our point of view
4194  * @ps:		The state as seen by the peer.
4195  */
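/* The conversion simply mirrors the state: the peer's role becomes our
 * "peer", its disk becomes our "pdsk", and asymmetric connection states
 * such as StartingSyncS/StartingSyncT are mapped to their counterpart
 * via c_tab. */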
4196 static union drbd_state convert_state(union drbd_state ps)
4197 {
4198 	union drbd_state ms;
4199 
4200 	static enum drbd_conns c_tab[] = {
4201 		[C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
4202 		[C_CONNECTED] = C_CONNECTED,
4203 
4204 		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
4205 		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
4206 		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
4207 		[C_VERIFY_S]       = C_VERIFY_T,
4208 		[C_MASK]   = C_MASK,
4209 	};
4210 
4211 	ms.i = ps.i;
4212 
4213 	ms.conn = c_tab[ps.conn];
4214 	ms.peer = ps.role;
4215 	ms.role = ps.peer;
4216 	ms.pdsk = ps.disk;
4217 	ms.disk = ps.pdsk;
4218 	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
4219 
4220 	return ms;
4221 }
4222 
4223 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4224 {
4225 	struct drbd_peer_device *peer_device;
4226 	struct drbd_device *device;
4227 	struct p_req_state *p = pi->data;
4228 	union drbd_state mask, val;
4229 	enum drbd_state_rv rv;
4230 
4231 	peer_device = conn_peer_device(connection, pi->vnr);
4232 	if (!peer_device)
4233 		return -EIO;
4234 	device = peer_device->device;
4235 
4236 	mask.i = be32_to_cpu(p->mask);
4237 	val.i = be32_to_cpu(p->val);
4238 
4239 	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4240 	    mutex_is_locked(device->state_mutex)) {
4241 		drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4242 		return 0;
4243 	}
4244 
4245 	mask = convert_state(mask);
4246 	val = convert_state(val);
4247 
4248 	rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4249 	drbd_send_sr_reply(peer_device, rv);
4250 
4251 	drbd_md_sync(device);
4252 
4253 	return 0;
4254 }
4255 
4256 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4257 {
4258 	struct p_req_state *p = pi->data;
4259 	union drbd_state mask, val;
4260 	enum drbd_state_rv rv;
4261 
4262 	mask.i = be32_to_cpu(p->mask);
4263 	val.i = be32_to_cpu(p->val);
4264 
4265 	if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4266 	    mutex_is_locked(&connection->cstate_mutex)) {
4267 		conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4268 		return 0;
4269 	}
4270 
4271 	mask = convert_state(mask);
4272 	val = convert_state(val);
4273 
4274 	rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4275 	conn_send_sr_reply(connection, rv);
4276 
4277 	return 0;
4278 }
4279 
4280 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4281 {
4282 	struct drbd_peer_device *peer_device;
4283 	struct drbd_device *device;
4284 	struct p_state *p = pi->data;
4285 	union drbd_state os, ns, peer_state;
4286 	enum drbd_disk_state real_peer_disk;
4287 	enum chg_state_flags cs_flags;
4288 	int rv;
4289 
4290 	peer_device = conn_peer_device(connection, pi->vnr);
4291 	if (!peer_device)
4292 		return config_unknown_volume(connection, pi);
4293 	device = peer_device->device;
4294 
4295 	peer_state.i = be32_to_cpu(p->state);
4296 
4297 	real_peer_disk = peer_state.disk;
4298 	if (peer_state.disk == D_NEGOTIATING) {
4299 		real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4300 		drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4301 	}
4302 
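	/* Take a snapshot of our current state. The sync handshake below runs
	 * without the req_lock held; if the state changes in the meantime, the
	 * os.i != drbd_read_state(device).i check further down sends us back
	 * to the "retry" label to re-evaluate. */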
4303 	spin_lock_irq(&device->resource->req_lock);
4304  retry:
4305 	os = ns = drbd_read_state(device);
4306 	spin_unlock_irq(&device->resource->req_lock);
4307 
4308 	/* If some other part of the code (ack_receiver thread, timeout)
4309 	 * already decided to close the connection again,
4310 	 * we must not "re-establish" it here. */
4311 	if (os.conn <= C_TEAR_DOWN)
4312 		return -ECONNRESET;
4313 
4314 	/* If this is the "end of sync" confirmation, usually the peer disk
4315 	 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4316 	 * set) resync started in PausedSyncT, or if the timing of pause-/
4317 	 * unpause-sync events has been "just right", the peer disk may
4318 	 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4319 	 */
4320 	if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4321 	    real_peer_disk == D_UP_TO_DATE &&
4322 	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4323 		/* If we are (becoming) SyncSource, but peer is still in sync
4324 		 * preparation, ignore its uptodate-ness to avoid flapping, it
4325 		 * will change to inconsistent once the peer reaches active
4326 		 * syncing states.
4327 		 * It may have changed syncer-paused flags, however, so we
4328 		 * cannot ignore this completely. */
4329 		if (peer_state.conn > C_CONNECTED &&
4330 		    peer_state.conn < C_SYNC_SOURCE)
4331 			real_peer_disk = D_INCONSISTENT;
4332 
4333 		/* if peer_state changes to connected at the same time,
4334 		 * it explicitly notifies us that it finished resync.
4335 		 * Maybe we should finish it up, too? */
4336 		else if (os.conn >= C_SYNC_SOURCE &&
4337 			 peer_state.conn == C_CONNECTED) {
4338 			if (drbd_bm_total_weight(device) <= device->rs_failed)
4339 				drbd_resync_finished(device);
4340 			return 0;
4341 		}
4342 	}
4343 
4344 	/* explicit verify finished notification, stop sector reached. */
4345 	if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4346 	    peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4347 		ov_out_of_sync_print(device);
4348 		drbd_resync_finished(device);
4349 		return 0;
4350 	}
4351 
4352 	/* peer says his disk is inconsistent, while we think it is uptodate,
4353 	 * and this happens while the peer still thinks we have a sync going on,
4354 	 * but we think we are already done with the sync.
4355 	 * We ignore this to avoid flapping pdsk.
4356 	 * This should not happen, if the peer is a recent version of drbd. */
4357 	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4358 	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4359 		real_peer_disk = D_UP_TO_DATE;
4360 
4361 	if (ns.conn == C_WF_REPORT_PARAMS)
4362 		ns.conn = C_CONNECTED;
4363 
4364 	if (peer_state.conn == C_AHEAD)
4365 		ns.conn = C_BEHIND;
4366 
4367 	if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4368 	    get_ldev_if_state(device, D_NEGOTIATING)) {
4369 		int cr; /* consider resync */
4370 
4371 		/* if we established a new connection */
4372 		cr  = (os.conn < C_CONNECTED);
4373 		/* if we had an established connection
4374 		 * and one of the nodes newly attaches a disk */
4375 		cr |= (os.conn == C_CONNECTED &&
4376 		       (peer_state.disk == D_NEGOTIATING ||
4377 			os.disk == D_NEGOTIATING));
4378 		/* if we have both been inconsistent, and the peer has been
4379 		 * forced to be UpToDate with --overwrite-data */
4380 		cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4381 		/* if we had been plain connected, and the admin requested to
4382 		 * start a sync by "invalidate" or "invalidate-remote" */
4383 		cr |= (os.conn == C_CONNECTED &&
4384 				(peer_state.conn >= C_STARTING_SYNC_S &&
4385 				 peer_state.conn <= C_WF_BITMAP_T));
4386 
4387 		if (cr)
4388 			ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4389 
4390 		put_ldev(device);
4391 		if (ns.conn == C_MASK) {
4392 			ns.conn = C_CONNECTED;
4393 			if (device->state.disk == D_NEGOTIATING) {
4394 				drbd_force_state(device, NS(disk, D_FAILED));
4395 			} else if (peer_state.disk == D_NEGOTIATING) {
4396 				drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4397 				peer_state.disk = D_DISKLESS;
4398 				real_peer_disk = D_DISKLESS;
4399 			} else {
4400 				if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4401 					return -EIO;
4402 				D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4403 				conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4404 				return -EIO;
4405 			}
4406 		}
4407 	}
4408 
4409 	spin_lock_irq(&device->resource->req_lock);
4410 	if (os.i != drbd_read_state(device).i)
4411 		goto retry;
4412 	clear_bit(CONSIDER_RESYNC, &device->flags);
4413 	ns.peer = peer_state.role;
4414 	ns.pdsk = real_peer_disk;
4415 	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4416 	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4417 		ns.disk = device->new_state_tmp.disk;
4418 	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4419 	if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4420 	    test_bit(NEW_CUR_UUID, &device->flags)) {
		/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
		   for temporary network outages! */
4423 		spin_unlock_irq(&device->resource->req_lock);
		drbd_err(device, "Aborting Connect, can not thaw IO with a peer that is only Consistent\n");
4425 		tl_clear(peer_device->connection);
4426 		drbd_uuid_new_current(device);
4427 		clear_bit(NEW_CUR_UUID, &device->flags);
4428 		conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4429 		return -EIO;
4430 	}
4431 	rv = _drbd_set_state(device, ns, cs_flags, NULL);
4432 	ns = drbd_read_state(device);
4433 	spin_unlock_irq(&device->resource->req_lock);
4434 
4435 	if (rv < SS_SUCCESS) {
4436 		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4437 		return -EIO;
4438 	}
4439 
4440 	if (os.conn > C_WF_REPORT_PARAMS) {
4441 		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
		    peer_state.disk != D_NEGOTIATING) {
4443 			/* we want resync, peer has not yet decided to sync... */
4444 			/* Nowadays only used when forcing a node into primary role and
4445 			   setting its disk to UpToDate with that */
4446 			drbd_send_uuids(peer_device);
4447 			drbd_send_current_state(peer_device);
4448 		}
4449 	}
4450 
4451 	clear_bit(DISCARD_MY_DATA, &device->flags);
4452 
4453 	drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4454 
4455 	return 0;
4456 }
4457 
4458 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4459 {
4460 	struct drbd_peer_device *peer_device;
4461 	struct drbd_device *device;
4462 	struct p_rs_uuid *p = pi->data;
4463 
4464 	peer_device = conn_peer_device(connection, pi->vnr);
4465 	if (!peer_device)
4466 		return -EIO;
4467 	device = peer_device->device;
4468 
4469 	wait_event(device->misc_wait,
4470 		   device->state.conn == C_WF_SYNC_UUID ||
4471 		   device->state.conn == C_BEHIND ||
4472 		   device->state.conn < C_CONNECTED ||
4473 		   device->state.disk < D_NEGOTIATING);
4474 
4475 	/* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4476 
4477 	/* Here the _drbd_uuid_ functions are right, current should
4478 	   _not_ be rotated into the history */
4479 	if (get_ldev_if_state(device, D_NEGOTIATING)) {
4480 		_drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4481 		_drbd_uuid_set(device, UI_BITMAP, 0UL);
4482 
4483 		drbd_print_uuids(device, "updated sync uuid");
4484 		drbd_start_resync(device, C_SYNC_TARGET);
4485 
4486 		put_ldev(device);
4487 	} else
4488 		drbd_err(device, "Ignoring SyncUUID packet!\n");
4489 
4490 	return 0;
4491 }
4492 
4493 /**
4494  * receive_bitmap_plain
4495  *
4496  * Return 0 when done, 1 when another iteration is needed, and a negative error
4497  * code upon failure.
4498  */
4499 static int
4500 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4501 		     unsigned long *p, struct bm_xfer_ctx *c)
4502 {
4503 	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4504 				 drbd_header_size(peer_device->connection);
4505 	unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4506 				       c->bm_words - c->word_offset);
4507 	unsigned int want = num_words * sizeof(*p);
4508 	int err;
4509 
4510 	if (want != size) {
4511 		drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4512 		return -EIO;
4513 	}
4514 	if (want == 0)
4515 		return 0;
4516 	err = drbd_recv_all(peer_device->connection, p, want);
4517 	if (err)
4518 		return err;
4519 
4520 	drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4521 
4522 	c->word_offset += num_words;
4523 	c->bit_offset = c->word_offset * BITS_PER_LONG;
4524 	if (c->bit_offset > c->bm_bits)
4525 		c->bit_offset = c->bm_bits;
4526 
4527 	return 1;
4528 }
4529 
4530 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4531 {
4532 	return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4533 }
4534 
4535 static int dcbp_get_start(struct p_compressed_bm *p)
4536 {
4537 	return (p->encoding & 0x80) != 0;
4538 }
4539 
4540 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4541 {
4542 	return (p->encoding >> 4) & 0x7;
4543 }
4544 
4545 /**
4546  * recv_bm_rle_bits
4547  *
4548  * Return 0 when done, 1 when another iteration is needed, and a negative error
4549  * code upon failure.
4550  */
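/* The compressed bitmap is a sequence of variable-length-encoded run lengths
 * of alternating "clear" and "set" runs; dcbp_get_start() gives the polarity
 * of the first run. Only runs of set bits are applied to the local bitmap,
 * starting at the bit offset carried over in *c. */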
4551 static int
4552 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4553 		struct p_compressed_bm *p,
4554 		 struct bm_xfer_ctx *c,
4555 		 unsigned int len)
4556 {
4557 	struct bitstream bs;
4558 	u64 look_ahead;
4559 	u64 rl;
4560 	u64 tmp;
4561 	unsigned long s = c->bit_offset;
4562 	unsigned long e;
4563 	int toggle = dcbp_get_start(p);
4564 	int have;
4565 	int bits;
4566 
4567 	bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4568 
4569 	bits = bitstream_get_bits(&bs, &look_ahead, 64);
4570 	if (bits < 0)
4571 		return -EIO;
4572 
4573 	for (have = bits; have > 0; s += rl, toggle = !toggle) {
4574 		bits = vli_decode_bits(&rl, look_ahead);
4575 		if (bits <= 0)
4576 			return -EIO;
4577 
4578 		if (toggle) {
4579 			e = s + rl -1;
4580 			if (e >= c->bm_bits) {
4581 				drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4582 				return -EIO;
4583 			}
4584 			_drbd_bm_set_bits(peer_device->device, s, e);
4585 		}
4586 
4587 		if (have < bits) {
4588 			drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4589 				have, bits, look_ahead,
4590 				(unsigned int)(bs.cur.b - p->code),
4591 				(unsigned int)bs.buf_len);
4592 			return -EIO;
4593 		}
4594 		/* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4595 		if (likely(bits < 64))
4596 			look_ahead >>= bits;
4597 		else
4598 			look_ahead = 0;
4599 		have -= bits;
4600 
4601 		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4602 		if (bits < 0)
4603 			return -EIO;
4604 		look_ahead |= tmp << have;
4605 		have += bits;
4606 	}
4607 
4608 	c->bit_offset = s;
4609 	bm_xfer_ctx_bit_to_word_offset(c);
4610 
4611 	return (s != c->bm_bits);
4612 }
4613 
4614 /**
4615  * decode_bitmap_c
4616  *
4617  * Return 0 when done, 1 when another iteration is needed, and a negative error
4618  * code upon failure.
4619  */
4620 static int
4621 decode_bitmap_c(struct drbd_peer_device *peer_device,
4622 		struct p_compressed_bm *p,
4623 		struct bm_xfer_ctx *c,
4624 		unsigned int len)
4625 {
4626 	if (dcbp_get_code(p) == RLE_VLI_Bits)
4627 		return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4628 
4629 	/* other variants had been implemented for evaluation,
4630 	 * but have been dropped as this one turned out to be "best"
4631 	 * during all our tests. */
4632 
4633 	drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4634 	conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4635 	return -EIO;
4636 }
4637 
4638 void INFO_bm_xfer_stats(struct drbd_device *device,
4639 		const char *direction, struct bm_xfer_ctx *c)
4640 {
4641 	/* what would it take to transfer it "plaintext" */
4642 	unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4643 	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4644 	unsigned int plain =
4645 		header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4646 		c->bm_words * sizeof(unsigned long);
4647 	unsigned int total = c->bytes[0] + c->bytes[1];
4648 	unsigned int r;
4649 
4650 	/* total can not be zero. but just in case: */
4651 	if (total == 0)
4652 		return;
4653 
4654 	/* don't report if not compressed */
4655 	if (total >= plain)
4656 		return;
4657 
4658 	/* total < plain. check for overflow, still */
4659 	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4660 		                    : (1000 * total / plain);
4661 
4662 	if (r > 1000)
4663 		r = 1000;
4664 
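	/* convert to the fraction saved by compression, in tenths of a percent;
	 * printed below as "u.u%" via r/10 and r%10 */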
4665 	r = 1000 - r;
4666 	drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4667 	     "total %u; compression: %u.%u%%\n",
4668 			direction,
4669 			c->bytes[1], c->packets[1],
4670 			c->bytes[0], c->packets[0],
4671 			total, r/10, r % 10);
4672 }
4673 
/* Since we are processing the bitfield from lower addresses to higher,
   it does not matter whether we process it in 32 bit or 64 bit chunks,
   as long as it is little endian. (Understand it as a byte stream,
   beginning with the lowest byte...) If we used big endian,
   we would need to process it from the highest address to the lowest
   in order to be agnostic to the 32 vs 64 bit issue.

   Returns 0 on success, a negative error code otherwise. */
4682 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4683 {
4684 	struct drbd_peer_device *peer_device;
4685 	struct drbd_device *device;
4686 	struct bm_xfer_ctx c;
4687 	int err;
4688 
4689 	peer_device = conn_peer_device(connection, pi->vnr);
4690 	if (!peer_device)
4691 		return -EIO;
4692 	device = peer_device->device;
4693 
4694 	drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4695 	/* you are supposed to send additional out-of-sync information
4696 	 * if you actually set bits during this phase */
4697 
4698 	c = (struct bm_xfer_ctx) {
4699 		.bm_bits = drbd_bm_bits(device),
4700 		.bm_words = drbd_bm_words(device),
4701 	};
4702 
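	/* The bitmap arrives as a series of P_BITMAP / P_COMPRESSED_BITMAP
	 * packets; keep receiving and decoding until the decoder reports
	 * completion (0) or an error (< 0). */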
4703 	for(;;) {
4704 		if (pi->cmd == P_BITMAP)
4705 			err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4706 		else if (pi->cmd == P_COMPRESSED_BITMAP) {
4707 			/* MAYBE: sanity check that we speak proto >= 90,
4708 			 * and the feature is enabled! */
4709 			struct p_compressed_bm *p = pi->data;
4710 
4711 			if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4712 				drbd_err(device, "ReportCBitmap packet too large\n");
4713 				err = -EIO;
4714 				goto out;
4715 			}
4716 			if (pi->size <= sizeof(*p)) {
4717 				drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4718 				err = -EIO;
4719 				goto out;
4720 			}
4721 			err = drbd_recv_all(peer_device->connection, p, pi->size);
4722 			if (err)
4723 			       goto out;
4724 			err = decode_bitmap_c(peer_device, p, &c, pi->size);
4725 		} else {
4726 			drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4727 			err = -EIO;
4728 			goto out;
4729 		}
4730 
4731 		c.packets[pi->cmd == P_BITMAP]++;
4732 		c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4733 
4734 		if (err <= 0) {
4735 			if (err < 0)
4736 				goto out;
4737 			break;
4738 		}
4739 		err = drbd_recv_header(peer_device->connection, pi);
4740 		if (err)
4741 			goto out;
4742 	}
4743 
4744 	INFO_bm_xfer_stats(device, "receive", &c);
4745 
4746 	if (device->state.conn == C_WF_BITMAP_T) {
4747 		enum drbd_state_rv rv;
4748 
4749 		err = drbd_send_bitmap(device);
4750 		if (err)
4751 			goto out;
4752 		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4753 		rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4754 		D_ASSERT(device, rv == SS_SUCCESS);
4755 	} else if (device->state.conn != C_WF_BITMAP_S) {
4756 		/* admin may have requested C_DISCONNECTING,
4757 		 * other threads may have noticed network errors */
4758 		drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4759 		    drbd_conn_str(device->state.conn));
4760 	}
4761 	err = 0;
4762 
4763  out:
4764 	drbd_bm_unlock(device);
4765 	if (!err && device->state.conn == C_WF_BITMAP_S)
4766 		drbd_start_resync(device, C_SYNC_SOURCE);
4767 	return err;
4768 }
4769 
4770 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4771 {
4772 	drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4773 		 pi->cmd, pi->size);
4774 
4775 	return ignore_remaining_packet(connection, pi);
4776 }
4777 
4778 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4779 {
4780 	/* Make sure we've acked all the TCP data associated
4781 	 * with the data requests being unplugged */
4782 	drbd_tcp_quickack(connection->data.socket);
4783 
4784 	return 0;
4785 }
4786 
4787 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4788 {
4789 	struct drbd_peer_device *peer_device;
4790 	struct drbd_device *device;
4791 	struct p_block_desc *p = pi->data;
4792 
4793 	peer_device = conn_peer_device(connection, pi->vnr);
4794 	if (!peer_device)
4795 		return -EIO;
4796 	device = peer_device->device;
4797 
4798 	switch (device->state.conn) {
4799 	case C_WF_SYNC_UUID:
4800 	case C_WF_BITMAP_T:
4801 	case C_BEHIND:
		break;
4803 	default:
4804 		drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4805 				drbd_conn_str(device->state.conn));
4806 	}
4807 
4808 	drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4809 
4810 	return 0;
4811 }
4812 
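/* P_RS_DEALLOCATED: instead of sending resync data, the peer tells us that
 * this range is deallocated on its side (i.e. known to read back as zeroes).
 * We submit a local zero-out request and account it as resync traffic,
 * rather than transferring the actual zeroes over the wire. */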
4813 static int receive_rs_deallocated(struct drbd_connection *connection, struct packet_info *pi)
4814 {
4815 	struct drbd_peer_device *peer_device;
4816 	struct p_block_desc *p = pi->data;
4817 	struct drbd_device *device;
4818 	sector_t sector;
4819 	int size, err = 0;
4820 
4821 	peer_device = conn_peer_device(connection, pi->vnr);
4822 	if (!peer_device)
4823 		return -EIO;
4824 	device = peer_device->device;
4825 
4826 	sector = be64_to_cpu(p->sector);
4827 	size = be32_to_cpu(p->blksize);
4828 
4829 	dec_rs_pending(device);
4830 
4831 	if (get_ldev(device)) {
4832 		struct drbd_peer_request *peer_req;
4833 		const int op = REQ_OP_WRITE_ZEROES;
4834 
4835 		peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector,
4836 					       size, 0, GFP_NOIO);
4837 		if (!peer_req) {
4838 			put_ldev(device);
4839 			return -ENOMEM;
4840 		}
4841 
4842 		peer_req->w.cb = e_end_resync_block;
4843 		peer_req->submit_jif = jiffies;
4844 		peer_req->flags |= EE_IS_TRIM;
4845 
4846 		spin_lock_irq(&device->resource->req_lock);
4847 		list_add_tail(&peer_req->w.list, &device->sync_ee);
4848 		spin_unlock_irq(&device->resource->req_lock);
4849 
		atomic_add(size >> 9, &device->rs_sect_ev);
4851 		err = drbd_submit_peer_request(device, peer_req, op, 0, DRBD_FAULT_RS_WR);
4852 
4853 		if (err) {
4854 			spin_lock_irq(&device->resource->req_lock);
4855 			list_del(&peer_req->w.list);
4856 			spin_unlock_irq(&device->resource->req_lock);
4857 
4858 			drbd_free_peer_req(device, peer_req);
4859 			put_ldev(device);
4860 			err = 0;
4861 			goto fail;
4862 		}
4863 
4864 		inc_unacked(device);
4865 
4866 		/* No put_ldev() here. Gets called in drbd_endio_write_sec_final(),
4867 		   as well as drbd_rs_complete_io() */
4868 	} else {
4869 	fail:
4870 		drbd_rs_complete_io(device, sector);
4871 		drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER);
4872 	}
4873 
4874 	atomic_add(size >> 9, &device->rs_sect_in);
4875 
4876 	return err;
4877 }
4878 
4879 struct data_cmd {
4880 	int expect_payload;
4881 	unsigned int pkt_size;
4882 	int (*fn)(struct drbd_connection *, struct packet_info *);
4883 };
4884 
4885 static struct data_cmd drbd_cmd_handler[] = {
4886 	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
4887 	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
4888 	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4889 	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4890 	[P_BITMAP]	    = { 1, 0, receive_bitmap } ,
4891 	[P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4892 	[P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4893 	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4894 	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4895 	[P_SYNC_PARAM]	    = { 1, 0, receive_SyncParam },
4896 	[P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4897 	[P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4898 	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
4899 	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
4900 	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
4901 	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4902 	[P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4903 	[P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4904 	[P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4905 	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4906 	[P_RS_THIN_REQ]     = { 0, sizeof(struct p_block_req), receive_DataRequest },
4907 	[P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4908 	[P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4909 	[P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4910 	[P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4911 	[P_TRIM]	    = { 0, sizeof(struct p_trim), receive_Data },
4912 	[P_RS_DEALLOCATED]  = { 0, sizeof(struct p_block_desc), receive_rs_deallocated },
4913 	[P_WSAME]	    = { 1, sizeof(struct p_wsame), receive_Data },
4914 };
4915 
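/* Main receive loop: read a packet header, look the command up in
 * drbd_cmd_handler[], receive the fixed size sub-header (pkt_size), and hand
 * the remaining payload size to the handler. Commands with
 * expect_payload == 0 must not carry any additional payload. */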
4916 static void drbdd(struct drbd_connection *connection)
4917 {
4918 	struct packet_info pi;
4919 	size_t shs; /* sub header size */
4920 	int err;
4921 
4922 	while (get_t_state(&connection->receiver) == RUNNING) {
4923 		struct data_cmd const *cmd;
4924 
4925 		drbd_thread_current_set_cpu(&connection->receiver);
4926 		update_receiver_timing_details(connection, drbd_recv_header_maybe_unplug);
4927 		if (drbd_recv_header_maybe_unplug(connection, &pi))
4928 			goto err_out;
4929 
4930 		cmd = &drbd_cmd_handler[pi.cmd];
4931 		if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4932 			drbd_err(connection, "Unexpected data packet %s (0x%04x)",
4933 				 cmdname(pi.cmd), pi.cmd);
4934 			goto err_out;
4935 		}
4936 
4937 		shs = cmd->pkt_size;
4938 		if (pi.cmd == P_SIZES && connection->agreed_features & DRBD_FF_WSAME)
4939 			shs += sizeof(struct o_qlim);
4940 		if (pi.size > shs && !cmd->expect_payload) {
4941 			drbd_err(connection, "No payload expected %s l:%d\n",
4942 				 cmdname(pi.cmd), pi.size);
4943 			goto err_out;
4944 		}
4945 		if (pi.size < shs) {
4946 			drbd_err(connection, "%s: unexpected packet size, expected:%d received:%d\n",
4947 				 cmdname(pi.cmd), (int)shs, pi.size);
4948 			goto err_out;
4949 		}
4950 
4951 		if (shs) {
4952 			update_receiver_timing_details(connection, drbd_recv_all_warn);
4953 			err = drbd_recv_all_warn(connection, pi.data, shs);
4954 			if (err)
4955 				goto err_out;
4956 			pi.size -= shs;
4957 		}
4958 
4959 		update_receiver_timing_details(connection, cmd->fn);
4960 		err = cmd->fn(connection, &pi);
4961 		if (err) {
4962 			drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4963 				 cmdname(pi.cmd), err, pi.size);
4964 			goto err_out;
4965 		}
4966 	}
4967 	return;
4968 
4969     err_out:
4970 	conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4971 }
4972 
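/*
 * Clean up after a lost connection: stop the ack receiver and the ack
 * sender, close the sockets, run drbd_disconnected() for every volume,
 * try to outdate the peer if we stay Primary, and move the connection
 * towards C_UNCONNECTED (or C_STANDALONE if a disconnect was requested).
 */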
4973 static void conn_disconnect(struct drbd_connection *connection)
4974 {
4975 	struct drbd_peer_device *peer_device;
4976 	enum drbd_conns oc;
4977 	int vnr;
4978 
4979 	if (connection->cstate == C_STANDALONE)
4980 		return;
4981 
4982 	/* We are about to start the cleanup after connection loss.
4983 	 * Make sure drbd_make_request knows about that.
4984 	 * Usually we should be in some network failure state already,
4985 	 * but just in case we are not, we fix it up here.
4986 	 */
4987 	conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4988 
4989 	/* ack_receiver does not clean up anything. it must not interfere, either */
4990 	drbd_thread_stop(&connection->ack_receiver);
4991 	if (connection->ack_sender) {
4992 		destroy_workqueue(connection->ack_sender);
4993 		connection->ack_sender = NULL;
4994 	}
4995 	drbd_free_sock(connection);
4996 
4997 	rcu_read_lock();
4998 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4999 		struct drbd_device *device = peer_device->device;
5000 		kref_get(&device->kref);
5001 		rcu_read_unlock();
5002 		drbd_disconnected(peer_device);
5003 		kref_put(&device->kref, drbd_destroy_device);
5004 		rcu_read_lock();
5005 	}
5006 	rcu_read_unlock();
5007 
5008 	if (!list_empty(&connection->current_epoch->list))
5009 		drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
5010 	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
5011 	atomic_set(&connection->current_epoch->epoch_size, 0);
5012 	connection->send.seen_any_write_yet = false;
5013 
5014 	drbd_info(connection, "Connection closed\n");
5015 
5016 	if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
5017 		conn_try_outdate_peer_async(connection);
5018 
5019 	spin_lock_irq(&connection->resource->req_lock);
5020 	oc = connection->cstate;
5021 	if (oc >= C_UNCONNECTED)
5022 		_conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
5023 
5024 	spin_unlock_irq(&connection->resource->req_lock);
5025 
5026 	if (oc == C_DISCONNECTING)
5027 		conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
5028 }
5029 
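/* Per-volume part of the cleanup after connection loss: wait for
 * in-flight peer requests, cancel all resync bookkeeping, flush the
 * sender work queue, drop the peer's UUIDs and write out our bitmap. */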
5030 static int drbd_disconnected(struct drbd_peer_device *peer_device)
5031 {
5032 	struct drbd_device *device = peer_device->device;
5033 	unsigned int i;
5034 
5035 	/* wait for current activity to cease. */
5036 	spin_lock_irq(&device->resource->req_lock);
5037 	_drbd_wait_ee_list_empty(device, &device->active_ee);
5038 	_drbd_wait_ee_list_empty(device, &device->sync_ee);
5039 	_drbd_wait_ee_list_empty(device, &device->read_ee);
5040 	spin_unlock_irq(&device->resource->req_lock);
5041 
5042 	/* We do not have data structures that would allow us to
5043 	 * get the rs_pending_cnt down to 0 again.
5044 	 *  * On C_SYNC_TARGET we do not have any data structures describing
5045 	 *    the pending RSDataRequest's we have sent.
5046 	 *  * On C_SYNC_SOURCE there is no data structure that tracks
5047 	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
5048 	 *  And no, it is not the sum of the reference counts in the
5049 	 *  resync_LRU. The resync_LRU tracks the whole operation including
5050 	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
5051 	 *  on the fly. */
5052 	drbd_rs_cancel_all(device);
5053 	device->rs_total = 0;
5054 	device->rs_failed = 0;
5055 	atomic_set(&device->rs_pending_cnt, 0);
5056 	wake_up(&device->misc_wait);
5057 
5058 	del_timer_sync(&device->resync_timer);
5059 	resync_timer_fn(&device->resync_timer);
5060 
5061 	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
5062 	 * w_make_resync_request etc. which may still be on the worker queue
5063 	 * to be "canceled" */
5064 	drbd_flush_workqueue(&peer_device->connection->sender_work);
5065 
5066 	drbd_finish_peer_reqs(device);
5067 
5068 	/* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
5069 	   might have issued a work again. The one before drbd_finish_peer_reqs() is
5070 	   necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
5071 	drbd_flush_workqueue(&peer_device->connection->sender_work);
5072 
5073 	/* need to do it again, drbd_finish_peer_reqs() may have populated it
5074 	 * again via drbd_try_clear_on_disk_bm(). */
5075 	drbd_rs_cancel_all(device);
5076 
5077 	kfree(device->p_uuid);
5078 	device->p_uuid = NULL;
5079 
5080 	if (!drbd_suspended(device))
5081 		tl_clear(peer_device->connection);
5082 
5083 	drbd_md_sync(device);
5084 
5085 	if (get_ldev(device)) {
5086 		drbd_bitmap_io(device, &drbd_bm_write_copy_pages,
5087 				"write from disconnected", BM_LOCKED_CHANGE_ALLOWED);
5088 		put_ldev(device);
5089 	}
5090 
5091 	/* tcp_close and release of sendpage pages can be deferred.  I don't
5092 	 * want to use SO_LINGER, because apparently it can be deferred for
5093 	 * more than 20 seconds (longest time I checked).
5094 	 *
5095 	 * Actually we don't care for exactly when the network stack does its
5096 	 * put_page(), but release our reference on these pages right here.
5097 	 */
5098 	i = drbd_free_peer_reqs(device, &device->net_ee);
5099 	if (i)
5100 		drbd_info(device, "net_ee not empty, killed %u entries\n", i);
5101 	i = atomic_read(&device->pp_in_use_by_net);
5102 	if (i)
5103 		drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
5104 	i = atomic_read(&device->pp_in_use);
5105 	if (i)
5106 		drbd_info(device, "pp_in_use = %d, expected 0\n", i);
5107 
5108 	D_ASSERT(device, list_empty(&device->read_ee));
5109 	D_ASSERT(device, list_empty(&device->active_ee));
5110 	D_ASSERT(device, list_empty(&device->sync_ee));
5111 	D_ASSERT(device, list_empty(&device->done_ee));
5112 
5113 	return 0;
5114 }
5115 
5116 /*
5117  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
5118  * we can agree on is stored in agreed_pro_version.
5119  *
5120  * feature flags and the reserved array should be enough room for future
5121  * enhancements of the handshake protocol, and possible plugins...
5122  *
5123  * for now, they are expected to be zero, but ignored.
5124  */
5125 static int drbd_send_features(struct drbd_connection *connection)
5126 {
5127 	struct drbd_socket *sock;
5128 	struct p_connection_features *p;
5129 
5130 	sock = &connection->data;
5131 	p = conn_prepare_command(connection, sock);
5132 	if (!p)
5133 		return -EIO;
5134 	memset(p, 0, sizeof(*p));
5135 	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
5136 	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
5137 	p->feature_flags = cpu_to_be32(PRO_FEATURES);
5138 	return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
5139 }
5140 
5141 /*
5142  * return values:
5143  *   1 yes, we have a valid connection
5144  *   0 oops, did not work out, please try again
5145  *  -1 peer talks different language,
5146  *     no point in trying again, please go standalone.
5147  */
5148 static int drbd_do_features(struct drbd_connection *connection)
5149 {
5150 	/* ASSERT current == connection->receiver ... */
5151 	struct p_connection_features *p;
5152 	const int expect = sizeof(struct p_connection_features);
5153 	struct packet_info pi;
5154 	int err;
5155 
5156 	err = drbd_send_features(connection);
5157 	if (err)
5158 		return 0;
5159 
5160 	err = drbd_recv_header(connection, &pi);
5161 	if (err)
5162 		return 0;
5163 
5164 	if (pi.cmd != P_CONNECTION_FEATURES) {
5165 		drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
5166 			 cmdname(pi.cmd), pi.cmd);
5167 		return -1;
5168 	}
5169 
5170 	if (pi.size != expect) {
5171 		drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
5172 		     expect, pi.size);
5173 		return -1;
5174 	}
5175 
5176 	p = pi.data;
5177 	err = drbd_recv_all_warn(connection, p, expect);
5178 	if (err)
5179 		return 0;
5180 
5181 	p->protocol_min = be32_to_cpu(p->protocol_min);
5182 	p->protocol_max = be32_to_cpu(p->protocol_max);
5183 	if (p->protocol_max == 0)
5184 		p->protocol_max = p->protocol_min;
5185 
5186 	if (PRO_VERSION_MAX < p->protocol_min ||
5187 	    PRO_VERSION_MIN > p->protocol_max)
5188 		goto incompat;
5189 
5190 	connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
5191 	connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
5192 
5193 	drbd_info(connection, "Handshake successful: "
5194 	     "Agreed network protocol version %d\n", connection->agreed_pro_version);
5195 
5196 	drbd_info(connection, "Feature flags enabled on protocol level: 0x%x%s%s%s.\n",
5197 		  connection->agreed_features,
5198 		  connection->agreed_features & DRBD_FF_TRIM ? " TRIM" : "",
5199 		  connection->agreed_features & DRBD_FF_THIN_RESYNC ? " THIN_RESYNC" : "",
5200 		  connection->agreed_features & DRBD_FF_WSAME ? " WRITE_SAME" :
5201 		  connection->agreed_features ? "" : " none");
5202 
5203 	return 1;
5204 
5205  incompat:
5206 	drbd_err(connection, "incompatible DRBD dialects: "
5207 	    "I support %d-%d, peer supports %d-%d\n",
5208 	    PRO_VERSION_MIN, PRO_VERSION_MAX,
5209 	    p->protocol_min, p->protocol_max);
5210 	return -1;
5211 }
5212 
5213 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
5214 static int drbd_do_auth(struct drbd_connection *connection)
5215 {
5216 	drbd_err(connection, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
5217 	drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
5218 	return -1;
5219 }
5220 #else
5221 #define CHALLENGE_LEN 64
5222 
5223 /* Return value:
5224 	1 - auth succeeded,
5225 	0 - failed, try again (network error),
5226 	-1 - auth failed, don't try again.
5227 */
5228 
5229 static int drbd_do_auth(struct drbd_connection *connection)
5230 {
5231 	struct drbd_socket *sock;
5232 	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
5233 	char *response = NULL;
5234 	char *right_response = NULL;
5235 	char *peers_ch = NULL;
5236 	unsigned int key_len;
5237 	char secret[SHARED_SECRET_MAX]; /* 64 byte */
5238 	unsigned int resp_size;
5239 	SHASH_DESC_ON_STACK(desc, connection->cram_hmac_tfm);
5240 	struct packet_info pi;
5241 	struct net_conf *nc;
5242 	int err, rv;
5243 
5244 	/* FIXME: Put the challenge/response into the preallocated socket buffer.  */
5245 
5246 	rcu_read_lock();
5247 	nc = rcu_dereference(connection->net_conf);
5248 	key_len = strlen(nc->shared_secret);
5249 	memcpy(secret, nc->shared_secret, key_len);
5250 	rcu_read_unlock();
5251 
5252 	desc->tfm = connection->cram_hmac_tfm;
5253 	desc->flags = 0;
5254 
5255 	rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
5256 	if (rv) {
5257 		drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
5258 		rv = -1;
5259 		goto fail;
5260 	}
5261 
5262 	get_random_bytes(my_challenge, CHALLENGE_LEN);
5263 
5264 	sock = &connection->data;
5265 	if (!conn_prepare_command(connection, sock)) {
5266 		rv = 0;
5267 		goto fail;
5268 	}
5269 	rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
5270 				my_challenge, CHALLENGE_LEN);
5271 	if (!rv)
5272 		goto fail;
5273 
5274 	err = drbd_recv_header(connection, &pi);
5275 	if (err) {
5276 		rv = 0;
5277 		goto fail;
5278 	}
5279 
5280 	if (pi.cmd != P_AUTH_CHALLENGE) {
5281 		drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
5282 			 cmdname(pi.cmd), pi.cmd);
5283 		rv = 0;
5284 		goto fail;
5285 	}
5286 
5287 	if (pi.size > CHALLENGE_LEN * 2) {
5288 		drbd_err(connection, "AuthChallenge payload too big.\n");
5289 		rv = -1;
5290 		goto fail;
5291 	}
5292 
5293 	if (pi.size < CHALLENGE_LEN) {
5294 		drbd_err(connection, "AuthChallenge payload too small.\n");
5295 		rv = -1;
5296 		goto fail;
5297 	}
5298 
5299 	peers_ch = kmalloc(pi.size, GFP_NOIO);
5300 	if (peers_ch == NULL) {
5301 		drbd_err(connection, "kmalloc of peers_ch failed\n");
5302 		rv = -1;
5303 		goto fail;
5304 	}
5305 
5306 	err = drbd_recv_all_warn(connection, peers_ch, pi.size);
5307 	if (err) {
5308 		rv = 0;
5309 		goto fail;
5310 	}
5311 
5312 	if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
5313 		drbd_err(connection, "Peer presented the same challenge!\n");
5314 		rv = -1;
5315 		goto fail;
5316 	}
5317 
5318 	resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
5319 	response = kmalloc(resp_size, GFP_NOIO);
5320 	if (response == NULL) {
5321 		drbd_err(connection, "kmalloc of response failed\n");
5322 		rv = -1;
5323 		goto fail;
5324 	}
5325 
5326 	rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
5327 	if (rv) {
5328 		drbd_err(connection, "crypto_shash_digest() failed with %d\n", rv);
5329 		rv = -1;
5330 		goto fail;
5331 	}
5332 
5333 	if (!conn_prepare_command(connection, sock)) {
5334 		rv = 0;
5335 		goto fail;
5336 	}
5337 	rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5338 				response, resp_size);
5339 	if (!rv)
5340 		goto fail;
5341 
5342 	err = drbd_recv_header(connection, &pi);
5343 	if (err) {
5344 		rv = 0;
5345 		goto fail;
5346 	}
5347 
5348 	if (pi.cmd != P_AUTH_RESPONSE) {
5349 		drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5350 			 cmdname(pi.cmd), pi.cmd);
5351 		rv = 0;
5352 		goto fail;
5353 	}
5354 
5355 	if (pi.size != resp_size) {
5356 		drbd_err(connection, "AuthResponse payload has wrong size\n");
5357 		rv = 0;
5358 		goto fail;
5359 	}
5360 
5361 	err = drbd_recv_all_warn(connection, response, resp_size);
5362 	if (err) {
5363 		rv = 0;
5364 		goto fail;
5365 	}
5366 
5367 	right_response = kmalloc(resp_size, GFP_NOIO);
5368 	if (right_response == NULL) {
5369 		drbd_err(connection, "kmalloc of right_response failed\n");
5370 		rv = -1;
5371 		goto fail;
5372 	}
5373 
5374 	rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
5375 				 right_response);
5376 	if (rv) {
5377 		drbd_err(connection, "crypto_shash_digest() failed with %d\n", rv);
5378 		rv = -1;
5379 		goto fail;
5380 	}
5381 
5382 	rv = !memcmp(response, right_response, resp_size);
5383 
5384 	if (rv)
5385 		drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5386 		     resp_size);
5387 	else
5388 		rv = -1;
5389 
5390  fail:
5391 	kfree(peers_ch);
5392 	kfree(response);
5393 	kfree(right_response);
5394 	shash_desc_zero(desc);
5395 
5396 	return rv;
5397 }
5398 #endif
5399 
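/*
 * Entry point of the receiver thread: retry conn_connect() until we
 * either have a connection (then run drbdd()) or are told to give up,
 * and tear everything down again via conn_disconnect().
 */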
5400 int drbd_receiver(struct drbd_thread *thi)
5401 {
5402 	struct drbd_connection *connection = thi->connection;
5403 	int h;
5404 
5405 	drbd_info(connection, "receiver (re)started\n");
5406 
5407 	do {
5408 		h = conn_connect(connection);
5409 		if (h == 0) {
5410 			conn_disconnect(connection);
5411 			schedule_timeout_interruptible(HZ);
5412 		}
5413 		if (h == -1) {
5414 			drbd_warn(connection, "Discarding network configuration.\n");
5415 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5416 		}
5417 	} while (h == 0);
5418 
5419 	if (h > 0) {
5420 		blk_start_plug(&connection->receiver_plug);
5421 		drbdd(connection);
5422 		blk_finish_plug(&connection->receiver_plug);
5423 	}
5424 
5425 	conn_disconnect(connection);
5426 
5427 	drbd_info(connection, "receiver terminated\n");
5428 	return 0;
5429 }
5430 
5431 /* ********* acknowledge sender ******** */
5432 
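/* Reply to a connection-wide state change request: remember whether the
 * peer accepted it and wake up the waiter. */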
5433 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5434 {
5435 	struct p_req_state_reply *p = pi->data;
5436 	int retcode = be32_to_cpu(p->retcode);
5437 
5438 	if (retcode >= SS_SUCCESS) {
5439 		set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5440 	} else {
5441 		set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5442 		drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5443 			 drbd_set_st_err_str(retcode), retcode);
5444 	}
5445 	wake_up(&connection->ping_wait);
5446 
5447 	return 0;
5448 }
5449 
5450 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5451 {
5452 	struct drbd_peer_device *peer_device;
5453 	struct drbd_device *device;
5454 	struct p_req_state_reply *p = pi->data;
5455 	int retcode = be32_to_cpu(p->retcode);
5456 
5457 	peer_device = conn_peer_device(connection, pi->vnr);
5458 	if (!peer_device)
5459 		return -EIO;
5460 	device = peer_device->device;
5461 
5462 	if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5463 		D_ASSERT(device, connection->agreed_pro_version < 100);
5464 		return got_conn_RqSReply(connection, pi);
5465 	}
5466 
5467 	if (retcode >= SS_SUCCESS) {
5468 		set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5469 	} else {
5470 		set_bit(CL_ST_CHG_FAIL, &device->flags);
5471 		drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5472 			drbd_set_st_err_str(retcode), retcode);
5473 	}
5474 	wake_up(&device->state_wait);
5475 
5476 	return 0;
5477 }
5478 
5479 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
5480 {
5481 	return drbd_send_ping_ack(connection);
5483 }
5484 
5485 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5486 {
5487 	/* restore idle timeout */
5488 	connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5489 	if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5490 		wake_up(&connection->ping_wait);
5491 
5492 	return 0;
5493 }
5494 
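/* Checksum based resync: the peer found the block to be identical, so
 * mark it in sync without transferring any data. */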
5495 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5496 {
5497 	struct drbd_peer_device *peer_device;
5498 	struct drbd_device *device;
5499 	struct p_block_ack *p = pi->data;
5500 	sector_t sector = be64_to_cpu(p->sector);
5501 	int blksize = be32_to_cpu(p->blksize);
5502 
5503 	peer_device = conn_peer_device(connection, pi->vnr);
5504 	if (!peer_device)
5505 		return -EIO;
5506 	device = peer_device->device;
5507 
5508 	D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5509 
5510 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5511 
5512 	if (get_ldev(device)) {
5513 		drbd_rs_complete_io(device, sector);
5514 		drbd_set_in_sync(device, sector, blksize);
5515 		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5516 		device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5517 		put_ldev(device);
5518 	}
5519 	dec_rs_pending(device);
5520 	atomic_add(blksize >> 9, &device->rs_sect_in);
5521 
5522 	return 0;
5523 }
5524 
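/* Look up the request identified by block_id and sector in the given
 * tree, apply the request state transition "what" to it, and complete
 * the master bio if that transition finished the request. */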
5525 static int
5526 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5527 			      struct rb_root *root, const char *func,
5528 			      enum drbd_req_event what, bool missing_ok)
5529 {
5530 	struct drbd_request *req;
5531 	struct bio_and_error m;
5532 
5533 	spin_lock_irq(&device->resource->req_lock);
5534 	req = find_request(device, root, id, sector, missing_ok, func);
5535 	if (unlikely(!req)) {
5536 		spin_unlock_irq(&device->resource->req_lock);
5537 		return -EIO;
5538 	}
5539 	__req_mod(req, what, &m);
5540 	spin_unlock_irq(&device->resource->req_lock);
5541 
5542 	if (m.bio)
5543 		complete_master_bio(device, &m);
5544 	return 0;
5545 }
5546 
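/* Positive write acknowledgments: either a resync write completed on the
 * peer (ID_SYNCER), or an application write needs its request state
 * advanced according to the ack type. */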
5547 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5548 {
5549 	struct drbd_peer_device *peer_device;
5550 	struct drbd_device *device;
5551 	struct p_block_ack *p = pi->data;
5552 	sector_t sector = be64_to_cpu(p->sector);
5553 	int blksize = be32_to_cpu(p->blksize);
5554 	enum drbd_req_event what;
5555 
5556 	peer_device = conn_peer_device(connection, pi->vnr);
5557 	if (!peer_device)
5558 		return -EIO;
5559 	device = peer_device->device;
5560 
5561 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5562 
5563 	if (p->block_id == ID_SYNCER) {
5564 		drbd_set_in_sync(device, sector, blksize);
5565 		dec_rs_pending(device);
5566 		return 0;
5567 	}
5568 	switch (pi->cmd) {
5569 	case P_RS_WRITE_ACK:
5570 		what = WRITE_ACKED_BY_PEER_AND_SIS;
5571 		break;
5572 	case P_WRITE_ACK:
5573 		what = WRITE_ACKED_BY_PEER;
5574 		break;
5575 	case P_RECV_ACK:
5576 		what = RECV_ACKED_BY_PEER;
5577 		break;
5578 	case P_SUPERSEDED:
5579 		what = CONFLICT_RESOLVED;
5580 		break;
5581 	case P_RETRY_WRITE:
5582 		what = POSTPONE_WRITE;
5583 		break;
5584 	default:
5585 		BUG();
5586 	}
5587 
5588 	return validate_req_change_req_state(device, p->block_id, sector,
5589 					     &device->write_requests, __func__,
5590 					     what, false);
5591 }
5592 
5593 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5594 {
5595 	struct drbd_peer_device *peer_device;
5596 	struct drbd_device *device;
5597 	struct p_block_ack *p = pi->data;
5598 	sector_t sector = be64_to_cpu(p->sector);
5599 	int size = be32_to_cpu(p->blksize);
5600 	int err;
5601 
5602 	peer_device = conn_peer_device(connection, pi->vnr);
5603 	if (!peer_device)
5604 		return -EIO;
5605 	device = peer_device->device;
5606 
5607 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5608 
5609 	if (p->block_id == ID_SYNCER) {
5610 		dec_rs_pending(device);
5611 		drbd_rs_failed_io(device, sector, size);
5612 		return 0;
5613 	}
5614 
5615 	err = validate_req_change_req_state(device, p->block_id, sector,
5616 					    &device->write_requests, __func__,
5617 					    NEG_ACKED, true);
5618 	if (err) {
5619 		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5620 		   The master bio might already be completed, therefore the
5621 		   request is no longer in the collision hash. */
5622 		/* In Protocol B we might already have got a P_RECV_ACK
5623 		   but then get a P_NEG_ACK afterwards. */
5624 		drbd_set_out_of_sync(device, sector, size);
5625 	}
5626 	return 0;
5627 }
5628 
5629 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5630 {
5631 	struct drbd_peer_device *peer_device;
5632 	struct drbd_device *device;
5633 	struct p_block_ack *p = pi->data;
5634 	sector_t sector = be64_to_cpu(p->sector);
5635 
5636 	peer_device = conn_peer_device(connection, pi->vnr);
5637 	if (!peer_device)
5638 		return -EIO;
5639 	device = peer_device->device;
5640 
5641 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5642 
5643 	drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5644 	    (unsigned long long)sector, be32_to_cpu(p->blksize));
5645 
5646 	return validate_req_change_req_state(device, p->block_id, sector,
5647 					     &device->read_requests, __func__,
5648 					     NEG_ACKED, false);
5649 }
5650 
5651 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5652 {
5653 	struct drbd_peer_device *peer_device;
5654 	struct drbd_device *device;
5655 	sector_t sector;
5656 	int size;
5657 	struct p_block_ack *p = pi->data;
5658 
5659 	peer_device = conn_peer_device(connection, pi->vnr);
5660 	if (!peer_device)
5661 		return -EIO;
5662 	device = peer_device->device;
5663 
5664 	sector = be64_to_cpu(p->sector);
5665 	size = be32_to_cpu(p->blksize);
5666 
5667 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5668 
5669 	dec_rs_pending(device);
5670 
5671 	if (get_ldev_if_state(device, D_FAILED)) {
5672 		drbd_rs_complete_io(device, sector);
5673 		switch (pi->cmd) {
5674 		case P_NEG_RS_DREPLY:
5675 			drbd_rs_failed_io(device, sector, size);
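			/* fall through */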
5676 		case P_RS_CANCEL:
5677 			break;
5678 		default:
5679 			BUG();
5680 		}
5681 		put_ldev(device);
5682 	}
5683 
5684 	return 0;
5685 }
5686 
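/* The peer confirmed a whole epoch: release it from the transfer log
 * and, for volumes that are Ahead with no application I/O in flight,
 * arm the timer that switches us back to SyncSource. */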
5687 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5688 {
5689 	struct p_barrier_ack *p = pi->data;
5690 	struct drbd_peer_device *peer_device;
5691 	int vnr;
5692 
5693 	tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5694 
5695 	rcu_read_lock();
5696 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5697 		struct drbd_device *device = peer_device->device;
5698 
5699 		if (device->state.conn == C_AHEAD &&
5700 		    atomic_read(&device->ap_in_flight) == 0 &&
5701 		    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5702 			device->start_resync_timer.expires = jiffies + HZ;
5703 			add_timer(&device->start_resync_timer);
5704 		}
5705 	}
5706 	rcu_read_unlock();
5707 
5708 	return 0;
5709 }
5710 
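/* Result of one online verify request: note (or print) out-of-sync
 * blocks, update the verify progress, and queue w_ov_finished once the
 * last reply has arrived. */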
5711 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5712 {
5713 	struct drbd_peer_device *peer_device;
5714 	struct drbd_device *device;
5715 	struct p_block_ack *p = pi->data;
5716 	struct drbd_device_work *dw;
5717 	sector_t sector;
5718 	int size;
5719 
5720 	peer_device = conn_peer_device(connection, pi->vnr);
5721 	if (!peer_device)
5722 		return -EIO;
5723 	device = peer_device->device;
5724 
5725 	sector = be64_to_cpu(p->sector);
5726 	size = be32_to_cpu(p->blksize);
5727 
5728 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5729 
5730 	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5731 		drbd_ov_out_of_sync_found(device, sector, size);
5732 	else
5733 		ov_out_of_sync_print(device);
5734 
5735 	if (!get_ldev(device))
5736 		return 0;
5737 
5738 	drbd_rs_complete_io(device, sector);
5739 	dec_rs_pending(device);
5740 
5741 	--device->ov_left;
5742 
5743 	/* let's advance progress step marks only for every other megabyte */
5744 	if ((device->ov_left & 0x200) == 0x200)
5745 		drbd_advance_rs_marks(device, device->ov_left);
5746 
5747 	if (device->ov_left == 0) {
5748 		dw = kmalloc(sizeof(*dw), GFP_NOIO);
5749 		if (dw) {
5750 			dw->w.cb = w_ov_finished;
5751 			dw->device = device;
5752 			drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5753 		} else {
5754 			drbd_err(device, "kmalloc(dw) failed.\n");
5755 			ov_out_of_sync_print(device);
5756 			drbd_resync_finished(device);
5757 		}
5758 	}
5759 	put_ldev(device);
5760 	return 0;
5761 }
5762 
5763 static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
5764 {
5765 	return 0;
5766 }
5767 
5768 struct meta_sock_cmd {
5769 	size_t pkt_size;
5770 	int (*fn)(struct drbd_connection *connection, struct packet_info *);
5771 };
5772 
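/* Receive timeout on the meta socket: while we wait for a ping ack use
 * the ping timeout (configured in tenths of a second), otherwise the
 * ping interval (configured in seconds). */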
5773 static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
5774 {
5775 	long t;
5776 	struct net_conf *nc;
5777 
5778 	rcu_read_lock();
5779 	nc = rcu_dereference(connection->net_conf);
5780 	t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5781 	rcu_read_unlock();
5782 
5783 	t *= HZ;
5784 	if (ping_timeout)
5785 		t /= 10;
5786 
5787 	connection->meta.socket->sk->sk_rcvtimeo = t;
5788 }
5789 
5790 static void set_ping_timeout(struct drbd_connection *connection)
5791 {
5792 	set_rcvtimeo(connection, 1);
5793 }
5794 
5795 static void set_idle_timeout(struct drbd_connection *connection)
5796 {
5797 	set_rcvtimeo(connection, 0);
5798 }
5799 
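/* Packets handled on the meta socket, indexed by packet type:
 * expected payload size and the handler to call. */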
5800 static struct meta_sock_cmd ack_receiver_tbl[] = {
5801 	[P_PING]	    = { 0, got_Ping },
5802 	[P_PING_ACK]	    = { 0, got_PingAck },
5803 	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5804 	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5805 	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5806 	[P_SUPERSEDED]   = { sizeof(struct p_block_ack), got_BlockAck },
5807 	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
5808 	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
5809 	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5810 	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
5811 	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
5812 	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5813 	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5814 	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5815 	[P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5816 	[P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5817 	[P_RETRY_WRITE]	    = { sizeof(struct p_block_ack), got_BlockAck },
5818 };
5819 
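/*
 * The ack receiver thread: read packets from the meta socket and
 * dispatch them via ack_receiver_tbl[].  It sends pings when requested
 * and forces a reconnect if the ping ack does not arrive in time.
 */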
5820 int drbd_ack_receiver(struct drbd_thread *thi)
5821 {
5822 	struct drbd_connection *connection = thi->connection;
5823 	struct meta_sock_cmd *cmd = NULL;
5824 	struct packet_info pi;
5825 	unsigned long pre_recv_jif;
5826 	int rv;
5827 	void *buf    = connection->meta.rbuf;
5828 	int received = 0;
5829 	unsigned int header_size = drbd_header_size(connection);
5830 	int expect   = header_size;
5831 	bool ping_timeout_active = false;
5832 	struct sched_param param = { .sched_priority = 2 };
5833 
5834 	rv = sched_setscheduler(current, SCHED_RR, &param);
5835 	if (rv < 0)
5836 		drbd_err(connection, "drbd_ack_receiver: ERROR set priority, ret=%d\n", rv);
5837 
5838 	while (get_t_state(thi) == RUNNING) {
5839 		drbd_thread_current_set_cpu(thi);
5840 
5841 		conn_reclaim_net_peer_reqs(connection);
5842 
5843 		if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5844 			if (drbd_send_ping(connection)) {
5845 				drbd_err(connection, "drbd_send_ping has failed\n");
5846 				goto reconnect;
5847 			}
5848 			set_ping_timeout(connection);
5849 			ping_timeout_active = true;
5850 		}
5851 
5852 		pre_recv_jif = jiffies;
5853 		rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5854 
5855 		/* Note:
5856 		 * -EINTR	 (on meta) we got a signal
5857 		 * -EAGAIN	 (on meta) rcvtimeo expired
5858 		 * -ECONNRESET	 other side closed the connection
5859 		 * -ERESTARTSYS  (on data) we got a signal
5860 		 * rv <  0	 other than above: unexpected error!
5861 		 * rv == expected: full header or command
5862 		 * rv <  expected: "woken" by signal during receive
5863 		 * rv == 0	 : "connection shut down by peer"
5864 		 */
5865 		if (likely(rv > 0)) {
5866 			received += rv;
5867 			buf	 += rv;
5868 		} else if (rv == 0) {
5869 			if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5870 				long t;
5871 				rcu_read_lock();
5872 				t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5873 				rcu_read_unlock();
5874 
5875 				t = wait_event_timeout(connection->ping_wait,
5876 						       connection->cstate < C_WF_REPORT_PARAMS,
5877 						       t);
5878 				if (t)
5879 					break;
5880 			}
5881 			drbd_err(connection, "meta connection shut down by peer.\n");
5882 			goto reconnect;
5883 		} else if (rv == -EAGAIN) {
5884 			/* If the data socket received something meanwhile,
5885 			 * that is good enough: peer is still alive. */
5886 			if (time_after(connection->last_received, pre_recv_jif))
5887 				continue;
5888 			if (ping_timeout_active) {
5889 				drbd_err(connection, "PingAck did not arrive in time.\n");
5890 				goto reconnect;
5891 			}
5892 			set_bit(SEND_PING, &connection->flags);
5893 			continue;
5894 		} else if (rv == -EINTR) {
5895 			/* maybe drbd_thread_stop(): the while condition will notice.
5896 			 * maybe woken for send_ping: we'll send a ping above,
5897 			 * and change the rcvtimeo */
5898 			flush_signals(current);
5899 			continue;
5900 		} else {
5901 			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5902 			goto reconnect;
5903 		}
5904 
5905 		if (received == expect && cmd == NULL) {
5906 			if (decode_header(connection, connection->meta.rbuf, &pi))
5907 				goto reconnect;
5908 			cmd = &ack_receiver_tbl[pi.cmd];
5909 			if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
5910 				drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5911 					 cmdname(pi.cmd), pi.cmd);
5912 				goto disconnect;
5913 			}
5914 			expect = header_size + cmd->pkt_size;
5915 			if (pi.size != expect - header_size) {
5916 				drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5917 					pi.cmd, pi.size);
5918 				goto reconnect;
5919 			}
5920 		}
5921 		if (received == expect) {
5922 			int err;
5923 
5924 			err = cmd->fn(connection, &pi);
5925 			if (err) {
5926 				drbd_err(connection, "%pf failed\n", cmd->fn);
5927 				goto reconnect;
5928 			}
5929 
5930 			connection->last_received = jiffies;
5931 
5932 			if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
5933 				set_idle_timeout(connection);
5934 				ping_timeout_active = false;
5935 			}
5936 
5937 			buf	 = connection->meta.rbuf;
5938 			received = 0;
5939 			expect	 = header_size;
5940 			cmd	 = NULL;
5941 		}
5942 	}
5943 
5944 	if (0) {
5945 reconnect:
5946 		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5947 		conn_md_sync(connection);
5948 	}
5949 	if (0) {
5950 disconnect:
5951 		conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5952 	}
5953 
5954 	drbd_info(connection, "ack_receiver terminated\n");
5955 
5956 	return 0;
5957 }
5958 
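/* Work callback behind send_acks_work: process peer requests that have
 * completed locally and send the corresponding acks, corking the meta
 * socket around the burst if tcp_cork is enabled. */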
5959 void drbd_send_acks_wf(struct work_struct *ws)
5960 {
5961 	struct drbd_peer_device *peer_device =
5962 		container_of(ws, struct drbd_peer_device, send_acks_work);
5963 	struct drbd_connection *connection = peer_device->connection;
5964 	struct drbd_device *device = peer_device->device;
5965 	struct net_conf *nc;
5966 	int tcp_cork, err;
5967 
5968 	rcu_read_lock();
5969 	nc = rcu_dereference(connection->net_conf);
5970 	tcp_cork = nc->tcp_cork;
5971 	rcu_read_unlock();
5972 
5973 	if (tcp_cork)
5974 		drbd_tcp_cork(connection->meta.socket);
5975 
5976 	err = drbd_finish_peer_reqs(device);
5977 	kref_put(&device->kref, drbd_destroy_device);
5978 	/* get is in drbd_endio_write_sec_final(). That is necessary to keep the
5979 	   struct work_struct send_acks_work alive, which is in the peer_device object */
5980 
5981 	if (err) {
5982 		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5983 		return;
5984 	}
5985 
5986 	if (tcp_cork)
5987 		drbd_tcp_uncork(connection->meta.socket);
5990 }
5991