xref: /openbmc/linux/drivers/block/drbd/drbd_receiver.c (revision 4da722ca19f30f7db250db808d1ab1703607a932)
1 /*
2    drbd_receiver.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24 
25 
26 #include <linux/module.h>
27 
28 #include <linux/uaccess.h>
29 #include <net/sock.h>
30 
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <uapi/linux/sched/types.h>
40 #include <linux/sched/signal.h>
41 #include <linux/pkt_sched.h>
42 #define __KERNEL_SYSCALLS__
43 #include <linux/unistd.h>
44 #include <linux/vmalloc.h>
45 #include <linux/random.h>
46 #include <linux/string.h>
47 #include <linux/scatterlist.h>
48 #include "drbd_int.h"
49 #include "drbd_protocol.h"
50 #include "drbd_req.h"
51 #include "drbd_vli.h"
52 
53 #define PRO_FEATURES (DRBD_FF_TRIM|DRBD_FF_THIN_RESYNC|DRBD_FF_WSAME)
54 
55 struct packet_info {
56 	enum drbd_packet cmd;
57 	unsigned int size;
58 	unsigned int vnr;
59 	void *data;
60 };
61 
62 enum finish_epoch {
63 	FE_STILL_LIVE,
64 	FE_DESTROYED,
65 	FE_RECYCLED,
66 };
67 
68 static int drbd_do_features(struct drbd_connection *connection);
69 static int drbd_do_auth(struct drbd_connection *connection);
70 static int drbd_disconnected(struct drbd_peer_device *);
71 static void conn_wait_active_ee_empty(struct drbd_connection *connection);
72 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
73 static int e_end_block(struct drbd_work *, int);
74 
75 
76 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
77 
78 /*
79  * some helper functions to deal with single linked page lists,
80  * page->private being our "next" pointer.
81  */
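
/*
 * Illustrative sketch only (not driver code): such a chain is walked with the
 * page_chain_next()/page_chain_for_each() helpers (presumably from
 * drbd_int.h), roughly
 *
 *	for (p = *head; p; p = page_chain_next(p))
 *		visit(p);
 *
 * and links are created with set_page_private(page, (unsigned long)next);
 * a private value of 0 marks the end of the chain.
 */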
82 
83 /* If at least n pages are linked at head, get n pages off.
84  * Otherwise, don't modify head, and return NULL.
85  * Locking is the responsibility of the caller.
86  */
87 static struct page *page_chain_del(struct page **head, int n)
88 {
89 	struct page *page;
90 	struct page *tmp;
91 
92 	BUG_ON(!n);
93 	BUG_ON(!head);
94 
95 	page = *head;
96 
97 	if (!page)
98 		return NULL;
99 
100 	while (page) {
101 		tmp = page_chain_next(page);
102 		if (--n == 0)
103 			break; /* found sufficient pages */
104 		if (tmp == NULL)
105 			/* insufficient pages, don't use any of them. */
106 			return NULL;
107 		page = tmp;
108 	}
109 
110 	/* add end of list marker for the returned list */
111 	set_page_private(page, 0);
112 	/* actual return value, and adjustment of head */
113 	page = *head;
114 	*head = tmp;
115 	return page;
116 }
117 
118 /* may be used outside of locks to find the tail of a (usually short)
119  * "private" page chain, before adding it back to a global chain head
120  * with page_chain_add() under a spinlock. */
121 static struct page *page_chain_tail(struct page *page, int *len)
122 {
123 	struct page *tmp;
124 	int i = 1;
125 	while ((tmp = page_chain_next(page)))
126 		++i, page = tmp;
127 	if (len)
128 		*len = i;
129 	return page;
130 }
131 
132 static int page_chain_free(struct page *page)
133 {
134 	struct page *tmp;
135 	int i = 0;
136 	page_chain_for_each_safe(page, tmp) {
137 		put_page(page);
138 		++i;
139 	}
140 	return i;
141 }
142 
143 static void page_chain_add(struct page **head,
144 		struct page *chain_first, struct page *chain_last)
145 {
146 #if 1
147 	struct page *tmp;
148 	tmp = page_chain_tail(chain_first, NULL);
149 	BUG_ON(tmp != chain_last);
150 #endif
151 
152 	/* add chain to head */
153 	set_page_private(chain_last, (unsigned long)*head);
154 	*head = chain_first;
155 }
156 
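/* Try to take a chain of @number pages from the global drbd_pp_pool first;
 * otherwise allocate them one by one with alloc_page(GFP_TRY).  If not all
 * @number pages can be had, the partial chain goes back into the pool and
 * NULL is returned; drbd_alloc_pages() will simply retry later. */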
157 static struct page *__drbd_alloc_pages(struct drbd_device *device,
158 				       unsigned int number)
159 {
160 	struct page *page = NULL;
161 	struct page *tmp = NULL;
162 	unsigned int i = 0;
163 
164 	/* Yes, testing drbd_pp_vacant outside the lock is racy.
165 	 * So what. It saves a spin_lock. */
166 	if (drbd_pp_vacant >= number) {
167 		spin_lock(&drbd_pp_lock);
168 		page = page_chain_del(&drbd_pp_pool, number);
169 		if (page)
170 			drbd_pp_vacant -= number;
171 		spin_unlock(&drbd_pp_lock);
172 		if (page)
173 			return page;
174 	}
175 
176 	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
177 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
178 	 * which in turn might block on the other node at this very place.  */
179 	for (i = 0; i < number; i++) {
180 		tmp = alloc_page(GFP_TRY);
181 		if (!tmp)
182 			break;
183 		set_page_private(tmp, (unsigned long)page);
184 		page = tmp;
185 	}
186 
187 	if (i == number)
188 		return page;
189 
190 	/* Not enough pages immediately available this time.
191 	 * No need to jump around here, drbd_alloc_pages will retry this
192 	 * function "soon". */
193 	if (page) {
194 		tmp = page_chain_tail(page, NULL);
195 		spin_lock(&drbd_pp_lock);
196 		page_chain_add(&drbd_pp_pool, page, tmp);
197 		drbd_pp_vacant += i;
198 		spin_unlock(&drbd_pp_lock);
199 	}
200 	return NULL;
201 }
202 
203 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
204 					   struct list_head *to_be_freed)
205 {
206 	struct drbd_peer_request *peer_req, *tmp;
207 
208 	/* The EEs are always appended to the end of the list. Since
209 	   they are sent in order over the wire, they have to finish
210 	   in order. As soon as we see the first unfinished one, we can
211 	   stop examining the list... */
212 
213 	list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
214 		if (drbd_peer_req_has_active_page(peer_req))
215 			break;
216 		list_move(&peer_req->w.list, to_be_freed);
217 	}
218 }
219 
220 static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
221 {
222 	LIST_HEAD(reclaimed);
223 	struct drbd_peer_request *peer_req, *t;
224 
225 	spin_lock_irq(&device->resource->req_lock);
226 	reclaim_finished_net_peer_reqs(device, &reclaimed);
227 	spin_unlock_irq(&device->resource->req_lock);
228 	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
229 		drbd_free_net_peer_req(device, peer_req);
230 }
231 
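/* Walk all volumes of this connection and reclaim pages that are still
 * accounted to the network stack (pp_in_use_by_net).  The kref/rcu dance
 * lets us drop the rcu read lock while working on an individual device. */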
232 static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
233 {
234 	struct drbd_peer_device *peer_device;
235 	int vnr;
236 
237 	rcu_read_lock();
238 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
239 		struct drbd_device *device = peer_device->device;
240 		if (!atomic_read(&device->pp_in_use_by_net))
241 			continue;
242 
243 		kref_get(&device->kref);
244 		rcu_read_unlock();
245 		drbd_reclaim_net_peer_reqs(device);
246 		kref_put(&device->kref, drbd_destroy_device);
247 		rcu_read_lock();
248 	}
249 	rcu_read_unlock();
250 }
251 
252 /**
253  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
254  * @peer_device: DRBD peer device (determines device and connection).
255  * @number:	number of pages requested
256  * @retry:	whether to retry, if not enough pages are available right now
257  *
258  * Tries to allocate number pages, first from our own page pool, then from
259  * the kernel.
260  * Possibly retry until DRBD frees sufficient pages somewhere else.
261  *
262  * If this allocation would exceed the max_buffers setting, we throttle
263  * allocation (schedule_timeout) to give the system some room to breathe.
264  *
265  * We do not use max-buffers as hard limit, because it could lead to
266  * congestion and further to a distributed deadlock during online-verify or
267  * (checksum based) resync, if the max-buffers, socket buffer sizes and
268  * resync-rate settings are mis-configured.
269  *
270  * Returns a page chain linked via page->private.
271  */
272 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
273 			      bool retry)
274 {
275 	struct drbd_device *device = peer_device->device;
276 	struct page *page = NULL;
277 	struct net_conf *nc;
278 	DEFINE_WAIT(wait);
279 	unsigned int mxb;
280 
281 	rcu_read_lock();
282 	nc = rcu_dereference(peer_device->connection->net_conf);
283 	mxb = nc ? nc->max_buffers : 1000000;
284 	rcu_read_unlock();
285 
286 	if (atomic_read(&device->pp_in_use) < mxb)
287 		page = __drbd_alloc_pages(device, number);
288 
289 	/* Try to keep the fast path fast, but occasionally we need
290 	 * to reclaim the pages we lent to the network stack. */
291 	if (page && atomic_read(&device->pp_in_use_by_net) > 512)
292 		drbd_reclaim_net_peer_reqs(device);
293 
294 	while (page == NULL) {
295 		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
296 
297 		drbd_reclaim_net_peer_reqs(device);
298 
299 		if (atomic_read(&device->pp_in_use) < mxb) {
300 			page = __drbd_alloc_pages(device, number);
301 			if (page)
302 				break;
303 		}
304 
305 		if (!retry)
306 			break;
307 
308 		if (signal_pending(current)) {
309 			drbd_warn(device, "drbd_alloc_pages interrupted!\n");
310 			break;
311 		}
312 
313 		if (schedule_timeout(HZ/10) == 0)
314 			mxb = UINT_MAX;
315 	}
316 	finish_wait(&drbd_pp_wait, &wait);
317 
318 	if (page)
319 		atomic_add(number, &device->pp_in_use);
320 	return page;
321 }
322 
323 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
324  * Is also used from inside another spin_lock_irq(&resource->req_lock);
325  * Either links the page chain back to the global pool,
326  * or returns all pages to the system. */
327 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
328 {
329 	atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
330 	int i;
331 
332 	if (page == NULL)
333 		return;
334 
335 	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
336 		i = page_chain_free(page);
337 	else {
338 		struct page *tmp;
339 		tmp = page_chain_tail(page, &i);
340 		spin_lock(&drbd_pp_lock);
341 		page_chain_add(&drbd_pp_pool, page, tmp);
342 		drbd_pp_vacant += i;
343 		spin_unlock(&drbd_pp_lock);
344 	}
345 	i = atomic_sub_return(i, a);
346 	if (i < 0)
347 		drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
348 			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
349 	wake_up(&drbd_pp_wait);
350 }
351 
352 /*
353 You need to hold the req_lock:
354  _drbd_wait_ee_list_empty()
355 
356 You must not have the req_lock:
357  drbd_free_peer_req()
358  drbd_alloc_peer_req()
359  drbd_free_peer_reqs()
360  drbd_ee_fix_bhs()
361  drbd_finish_peer_reqs()
362  drbd_clear_done_ee()
363  drbd_wait_ee_list_empty()
364 */
365 
366 /* normal: payload_size == request size (bi_size)
367  * w_same: payload_size == logical_block_size
368  * trim: payload_size == 0 */
369 struct drbd_peer_request *
370 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
371 		    unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local)
372 {
373 	struct drbd_device *device = peer_device->device;
374 	struct drbd_peer_request *peer_req;
375 	struct page *page = NULL;
376 	unsigned nr_pages = (payload_size + PAGE_SIZE -1) >> PAGE_SHIFT;
377 
378 	if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
379 		return NULL;
380 
381 	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
382 	if (!peer_req) {
383 		if (!(gfp_mask & __GFP_NOWARN))
384 			drbd_err(device, "%s: allocation failed\n", __func__);
385 		return NULL;
386 	}
387 
388 	if (nr_pages) {
389 		page = drbd_alloc_pages(peer_device, nr_pages,
390 					gfpflags_allow_blocking(gfp_mask));
391 		if (!page)
392 			goto fail;
393 	}
394 
395 	memset(peer_req, 0, sizeof(*peer_req));
396 	INIT_LIST_HEAD(&peer_req->w.list);
397 	drbd_clear_interval(&peer_req->i);
398 	peer_req->i.size = request_size;
399 	peer_req->i.sector = sector;
400 	peer_req->submit_jif = jiffies;
401 	peer_req->peer_device = peer_device;
402 	peer_req->pages = page;
403 	/*
404 	 * The block_id is opaque to the receiver.  It is not endianness
405 	 * converted, and sent back to the sender unchanged.
406 	 */
407 	peer_req->block_id = id;
408 
409 	return peer_req;
410 
411  fail:
412 	mempool_free(peer_req, drbd_ee_mempool);
413 	return NULL;
414 }
415 
416 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
417 		       int is_net)
418 {
419 	might_sleep();
420 	if (peer_req->flags & EE_HAS_DIGEST)
421 		kfree(peer_req->digest);
422 	drbd_free_pages(device, peer_req->pages, is_net);
423 	D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
424 	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
425 	if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
426 		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
427 		drbd_al_complete_io(device, &peer_req->i);
428 	}
429 	mempool_free(peer_req, drbd_ee_mempool);
430 }
431 
432 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
433 {
434 	LIST_HEAD(work_list);
435 	struct drbd_peer_request *peer_req, *t;
436 	int count = 0;
437 	int is_net = list == &device->net_ee;
438 
439 	spin_lock_irq(&device->resource->req_lock);
440 	list_splice_init(list, &work_list);
441 	spin_unlock_irq(&device->resource->req_lock);
442 
443 	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
444 		__drbd_free_peer_req(device, peer_req, is_net);
445 		count++;
446 	}
447 	return count;
448 }
449 
450 /*
451  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
452  */
453 static int drbd_finish_peer_reqs(struct drbd_device *device)
454 {
455 	LIST_HEAD(work_list);
456 	LIST_HEAD(reclaimed);
457 	struct drbd_peer_request *peer_req, *t;
458 	int err = 0;
459 
460 	spin_lock_irq(&device->resource->req_lock);
461 	reclaim_finished_net_peer_reqs(device, &reclaimed);
462 	list_splice_init(&device->done_ee, &work_list);
463 	spin_unlock_irq(&device->resource->req_lock);
464 
465 	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
466 		drbd_free_net_peer_req(device, peer_req);
467 
468 	/* possible callbacks here:
469 	 * e_end_block, and e_end_resync_block, e_send_superseded.
470 	 * all ignore the last argument.
471 	 */
472 	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
473 		int err2;
474 
475 		/* list_del not necessary, next/prev members not touched */
476 		err2 = peer_req->w.cb(&peer_req->w, !!err);
477 		if (!err)
478 			err = err2;
479 		drbd_free_peer_req(device, peer_req);
480 	}
481 	wake_up(&device->ee_wait);
482 
483 	return err;
484 }
485 
486 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
487 				     struct list_head *head)
488 {
489 	DEFINE_WAIT(wait);
490 
491 	/* avoids spin_lock/unlock
492 	 * and calling prepare_to_wait in the fast path */
493 	while (!list_empty(head)) {
494 		prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
495 		spin_unlock_irq(&device->resource->req_lock);
496 		io_schedule();
497 		finish_wait(&device->ee_wait, &wait);
498 		spin_lock_irq(&device->resource->req_lock);
499 	}
500 }
501 
502 static void drbd_wait_ee_list_empty(struct drbd_device *device,
503 				    struct list_head *head)
504 {
505 	spin_lock_irq(&device->resource->req_lock);
506 	_drbd_wait_ee_list_empty(device, head);
507 	spin_unlock_irq(&device->resource->req_lock);
508 }
509 
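/* Thin wrapper around kernel_recvmsg().  With @flags == 0 this defaults to
 * MSG_WAITALL | MSG_NOSIGNAL, i.e. it blocks until @size bytes arrived;
 * callers may pass e.g. MSG_DONTWAIT | MSG_PEEK instead, see
 * drbd_socket_okay(). */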
510 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
511 {
512 	struct kvec iov = {
513 		.iov_base = buf,
514 		.iov_len = size,
515 	};
516 	struct msghdr msg = {
517 		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
518 	};
519 	return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
520 }
521 
522 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
523 {
524 	int rv;
525 
526 	rv = drbd_recv_short(connection->data.socket, buf, size, 0);
527 
528 	if (rv < 0) {
529 		if (rv == -ECONNRESET)
530 			drbd_info(connection, "sock was reset by peer\n");
531 		else if (rv != -ERESTARTSYS)
532 			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
533 	} else if (rv == 0) {
534 		if (test_bit(DISCONNECT_SENT, &connection->flags)) {
535 			long t;
536 			rcu_read_lock();
537 			t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
538 			rcu_read_unlock();
539 
540 			t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
541 
542 			if (t)
543 				goto out;
544 		}
545 		drbd_info(connection, "sock was shut down by peer\n");
546 	}
547 
548 	if (rv != size)
549 		conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
550 
551 out:
552 	return rv;
553 }
554 
555 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
556 {
557 	int err;
558 
559 	err = drbd_recv(connection, buf, size);
560 	if (err != size) {
561 		if (err >= 0)
562 			err = -EIO;
563 	} else
564 		err = 0;
565 	return err;
566 }
567 
568 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
569 {
570 	int err;
571 
572 	err = drbd_recv_all(connection, buf, size);
573 	if (err && !signal_pending(current))
574 		drbd_warn(connection, "short read (expected size %d)\n", (int)size);
575 	return err;
576 }
577 
578 /* quoting tcp(7):
579  *   On individual connections, the socket buffer size must be set prior to the
580  *   listen(2) or connect(2) calls in order to have it take effect.
581  * This is our wrapper to do so.
582  */
583 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
584 		unsigned int rcv)
585 {
586 	/* open coded SO_SNDBUF, SO_RCVBUF */
587 	if (snd) {
588 		sock->sk->sk_sndbuf = snd;
589 		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
590 	}
591 	if (rcv) {
592 		sock->sk->sk_rcvbuf = rcv;
593 		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
594 	}
595 }
596 
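/* Actively establish one TCP connection to the peer: bind to the configured
 * source address (with port 0, so the kernel picks a free one) and connect
 * to the peer address from net_conf.  Returns the connected socket or NULL.
 * "Peer not reachable (yet)" style errors do not force C_DISCONNECTING. */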
597 static struct socket *drbd_try_connect(struct drbd_connection *connection)
598 {
599 	const char *what;
600 	struct socket *sock;
601 	struct sockaddr_in6 src_in6;
602 	struct sockaddr_in6 peer_in6;
603 	struct net_conf *nc;
604 	int err, peer_addr_len, my_addr_len;
605 	int sndbuf_size, rcvbuf_size, connect_int;
606 	int disconnect_on_error = 1;
607 
608 	rcu_read_lock();
609 	nc = rcu_dereference(connection->net_conf);
610 	if (!nc) {
611 		rcu_read_unlock();
612 		return NULL;
613 	}
614 	sndbuf_size = nc->sndbuf_size;
615 	rcvbuf_size = nc->rcvbuf_size;
616 	connect_int = nc->connect_int;
617 	rcu_read_unlock();
618 
619 	my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
620 	memcpy(&src_in6, &connection->my_addr, my_addr_len);
621 
622 	if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
623 		src_in6.sin6_port = 0;
624 	else
625 		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
626 
627 	peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
628 	memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
629 
630 	what = "sock_create_kern";
631 	err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
632 			       SOCK_STREAM, IPPROTO_TCP, &sock);
633 	if (err < 0) {
634 		sock = NULL;
635 		goto out;
636 	}
637 
638 	sock->sk->sk_rcvtimeo =
639 	sock->sk->sk_sndtimeo = connect_int * HZ;
640 	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
641 
642        /* explicitly bind to the configured IP as source IP
643 	*  for the outgoing connections.
644 	*  This is needed for multihomed hosts and to be
645 	*  able to use lo: interfaces for drbd.
646 	* Make sure to use 0 as port number, so linux selects
647 	*  a free one dynamically.
648 	*/
649 	what = "bind before connect";
650 	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
651 	if (err < 0)
652 		goto out;
653 
654 	/* connect may fail, peer not yet available.
655 	 * stay C_WF_CONNECTION, don't go Disconnecting! */
656 	disconnect_on_error = 0;
657 	what = "connect";
658 	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
659 
660 out:
661 	if (err < 0) {
662 		if (sock) {
663 			sock_release(sock);
664 			sock = NULL;
665 		}
666 		switch (-err) {
667 			/* timeout, busy, signal pending */
668 		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
669 		case EINTR: case ERESTARTSYS:
670 			/* peer not (yet) available, network problem */
671 		case ECONNREFUSED: case ENETUNREACH:
672 		case EHOSTDOWN:    case EHOSTUNREACH:
673 			disconnect_on_error = 0;
674 			break;
675 		default:
676 			drbd_err(connection, "%s failed, err = %d\n", what, err);
677 		}
678 		if (disconnect_on_error)
679 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
680 	}
681 
682 	return sock;
683 }
684 
685 struct accept_wait_data {
686 	struct drbd_connection *connection;
687 	struct socket *s_listen;
688 	struct completion door_bell;
689 	void (*original_sk_state_change)(struct sock *sk);
690 
691 };
692 
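/* sk_state_change callback installed on the listen socket: ring the
 * accept_wait_data door_bell once a connection reaches TCP_ESTABLISHED,
 * then chain to the original callback. */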
693 static void drbd_incoming_connection(struct sock *sk)
694 {
695 	struct accept_wait_data *ad = sk->sk_user_data;
696 	void (*state_change)(struct sock *sk);
697 
698 	state_change = ad->original_sk_state_change;
699 	if (sk->sk_state == TCP_ESTABLISHED)
700 		complete(&ad->door_bell);
701 	state_change(sk);
702 }
703 
704 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
705 {
706 	int err, sndbuf_size, rcvbuf_size, my_addr_len;
707 	struct sockaddr_in6 my_addr;
708 	struct socket *s_listen;
709 	struct net_conf *nc;
710 	const char *what;
711 
712 	rcu_read_lock();
713 	nc = rcu_dereference(connection->net_conf);
714 	if (!nc) {
715 		rcu_read_unlock();
716 		return -EIO;
717 	}
718 	sndbuf_size = nc->sndbuf_size;
719 	rcvbuf_size = nc->rcvbuf_size;
720 	rcu_read_unlock();
721 
722 	my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
723 	memcpy(&my_addr, &connection->my_addr, my_addr_len);
724 
725 	what = "sock_create_kern";
726 	err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
727 			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
728 	if (err) {
729 		s_listen = NULL;
730 		goto out;
731 	}
732 
733 	s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
734 	drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
735 
736 	what = "bind before listen";
737 	err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
738 	if (err < 0)
739 		goto out;
740 
741 	ad->s_listen = s_listen;
742 	write_lock_bh(&s_listen->sk->sk_callback_lock);
743 	ad->original_sk_state_change = s_listen->sk->sk_state_change;
744 	s_listen->sk->sk_state_change = drbd_incoming_connection;
745 	s_listen->sk->sk_user_data = ad;
746 	write_unlock_bh(&s_listen->sk->sk_callback_lock);
747 
748 	what = "listen";
749 	err = s_listen->ops->listen(s_listen, 5);
750 	if (err < 0)
751 		goto out;
752 
753 	return 0;
754 out:
755 	if (s_listen)
756 		sock_release(s_listen);
757 	if (err < 0) {
758 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
759 			drbd_err(connection, "%s failed, err = %d\n", what, err);
760 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
761 		}
762 	}
763 
764 	return -EIO;
765 }
766 
767 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
768 {
769 	write_lock_bh(&sk->sk_callback_lock);
770 	sk->sk_state_change = ad->original_sk_state_change;
771 	sk->sk_user_data = NULL;
772 	write_unlock_bh(&sk->sk_callback_lock);
773 }
774 
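/* Passive side of connection setup: wait (connect_int plus some random
 * jitter) for drbd_incoming_connection() to ring the door_bell, then accept
 * the pending connection.  Returns the established socket or NULL. */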
775 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
776 {
777 	int timeo, connect_int, err = 0;
778 	struct socket *s_estab = NULL;
779 	struct net_conf *nc;
780 
781 	rcu_read_lock();
782 	nc = rcu_dereference(connection->net_conf);
783 	if (!nc) {
784 		rcu_read_unlock();
785 		return NULL;
786 	}
787 	connect_int = nc->connect_int;
788 	rcu_read_unlock();
789 
790 	timeo = connect_int * HZ;
791 	/* 28.5% random jitter */
792 	timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
793 
794 	err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
795 	if (err <= 0)
796 		return NULL;
797 
798 	err = kernel_accept(ad->s_listen, &s_estab, 0);
799 	if (err < 0) {
800 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
801 			drbd_err(connection, "accept failed, err = %d\n", err);
802 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
803 		}
804 	}
805 
806 	if (s_estab)
807 		unregister_state_change(s_estab->sk, ad);
808 
809 	return s_estab;
810 }
811 
812 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
813 
814 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
815 			     enum drbd_packet cmd)
816 {
817 	if (!conn_prepare_command(connection, sock))
818 		return -EIO;
819 	return conn_send_command(connection, sock, cmd, 0, NULL, 0);
820 }
821 
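/* Read and decode the very first packet on a freshly accepted socket.
 * Returns the packet command (P_INITIAL_DATA or P_INITIAL_META is expected)
 * or a negative error; conn_connect() uses this to tell the data socket
 * from the meta socket. */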
822 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
823 {
824 	unsigned int header_size = drbd_header_size(connection);
825 	struct packet_info pi;
826 	struct net_conf *nc;
827 	int err;
828 
829 	rcu_read_lock();
830 	nc = rcu_dereference(connection->net_conf);
831 	if (!nc) {
832 		rcu_read_unlock();
833 		return -EIO;
834 	}
835 	sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
836 	rcu_read_unlock();
837 
838 	err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
839 	if (err != header_size) {
840 		if (err >= 0)
841 			err = -EIO;
842 		return err;
843 	}
844 	err = decode_header(connection, connection->data.rbuf, &pi);
845 	if (err)
846 		return err;
847 	return pi.cmd;
848 }
849 
850 /**
851  * drbd_socket_okay() - Free the socket if its connection is not okay
852  * @sock:	pointer to the pointer to the socket.
853  */
854 static bool drbd_socket_okay(struct socket **sock)
855 {
856 	int rr;
857 	char tb[4];
858 
859 	if (!*sock)
860 		return false;
861 
862 	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
863 
864 	if (rr > 0 || rr == -EAGAIN) {
865 		return true;
866 	} else {
867 		sock_release(*sock);
868 		*sock = NULL;
869 		return false;
870 	}
871 }
872 
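/* Both sockets must exist and still look usable after waiting
 * sock_check_timeo (or ping_timeo as fallback) tenths of a second. */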
873 static bool connection_established(struct drbd_connection *connection,
874 				   struct socket **sock1,
875 				   struct socket **sock2)
876 {
877 	struct net_conf *nc;
878 	int timeout;
879 	bool ok;
880 
881 	if (!*sock1 || !*sock2)
882 		return false;
883 
884 	rcu_read_lock();
885 	nc = rcu_dereference(connection->net_conf);
886 	timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
887 	rcu_read_unlock();
888 	schedule_timeout_interruptible(timeout);
889 
890 	ok = drbd_socket_okay(sock1);
891 	ok = drbd_socket_okay(sock2) && ok;
892 
893 	return ok;
894 }
895 
896 /* Gets called if a connection is established, or if a new minor gets created
897    in a connection */
898 int drbd_connected(struct drbd_peer_device *peer_device)
899 {
900 	struct drbd_device *device = peer_device->device;
901 	int err;
902 
903 	atomic_set(&device->packet_seq, 0);
904 	device->peer_seq = 0;
905 
906 	device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
907 		&peer_device->connection->cstate_mutex :
908 		&device->own_state_mutex;
909 
910 	err = drbd_send_sync_param(peer_device);
911 	if (!err)
912 		err = drbd_send_sizes(peer_device, 0, 0);
913 	if (!err)
914 		err = drbd_send_uuids(peer_device);
915 	if (!err)
916 		err = drbd_send_current_state(peer_device);
917 	clear_bit(USE_DEGR_WFC_T, &device->flags);
918 	clear_bit(RESIZE_PENDING, &device->flags);
919 	atomic_set(&device->ap_in_flight, 0);
920 	mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
921 	return err;
922 }
923 
924 /*
925  * return values:
926  *   1 yes, we have a valid connection
927  *   0 oops, did not work out, please try again
928  *  -1 peer talks different language,
929  *     no point in trying again, please go standalone.
930  *  -2 We do not have a network config...
931  */
932 static int conn_connect(struct drbd_connection *connection)
933 {
934 	struct drbd_socket sock, msock;
935 	struct drbd_peer_device *peer_device;
936 	struct net_conf *nc;
937 	int vnr, timeout, h;
938 	bool discard_my_data, ok;
939 	enum drbd_state_rv rv;
940 	struct accept_wait_data ad = {
941 		.connection = connection,
942 		.door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
943 	};
944 
945 	clear_bit(DISCONNECT_SENT, &connection->flags);
946 	if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
947 		return -2;
948 
949 	mutex_init(&sock.mutex);
950 	sock.sbuf = connection->data.sbuf;
951 	sock.rbuf = connection->data.rbuf;
952 	sock.socket = NULL;
953 	mutex_init(&msock.mutex);
954 	msock.sbuf = connection->meta.sbuf;
955 	msock.rbuf = connection->meta.rbuf;
956 	msock.socket = NULL;
957 
958 	/* Assume that the peer only understands protocol 80 until we know better.  */
959 	connection->agreed_pro_version = 80;
960 
961 	if (prepare_listen_socket(connection, &ad))
962 		return 0;
963 
964 	do {
965 		struct socket *s;
966 
967 		s = drbd_try_connect(connection);
968 		if (s) {
969 			if (!sock.socket) {
970 				sock.socket = s;
971 				send_first_packet(connection, &sock, P_INITIAL_DATA);
972 			} else if (!msock.socket) {
973 				clear_bit(RESOLVE_CONFLICTS, &connection->flags);
974 				msock.socket = s;
975 				send_first_packet(connection, &msock, P_INITIAL_META);
976 			} else {
977 				drbd_err(connection, "Logic error in conn_connect()\n");
978 				goto out_release_sockets;
979 			}
980 		}
981 
982 		if (connection_established(connection, &sock.socket, &msock.socket))
983 			break;
984 
985 retry:
986 		s = drbd_wait_for_connect(connection, &ad);
987 		if (s) {
988 			int fp = receive_first_packet(connection, s);
989 			drbd_socket_okay(&sock.socket);
990 			drbd_socket_okay(&msock.socket);
991 			switch (fp) {
992 			case P_INITIAL_DATA:
993 				if (sock.socket) {
994 					drbd_warn(connection, "initial packet S crossed\n");
995 					sock_release(sock.socket);
996 					sock.socket = s;
997 					goto randomize;
998 				}
999 				sock.socket = s;
1000 				break;
1001 			case P_INITIAL_META:
1002 				set_bit(RESOLVE_CONFLICTS, &connection->flags);
1003 				if (msock.socket) {
1004 					drbd_warn(connection, "initial packet M crossed\n");
1005 					sock_release(msock.socket);
1006 					msock.socket = s;
1007 					goto randomize;
1008 				}
1009 				msock.socket = s;
1010 				break;
1011 			default:
1012 				drbd_warn(connection, "Error receiving initial packet\n");
1013 				sock_release(s);
1014 randomize:
1015 				if (prandom_u32() & 1)
1016 					goto retry;
1017 			}
1018 		}
1019 
1020 		if (connection->cstate <= C_DISCONNECTING)
1021 			goto out_release_sockets;
1022 		if (signal_pending(current)) {
1023 			flush_signals(current);
1024 			smp_rmb();
1025 			if (get_t_state(&connection->receiver) == EXITING)
1026 				goto out_release_sockets;
1027 		}
1028 
1029 		ok = connection_established(connection, &sock.socket, &msock.socket);
1030 	} while (!ok);
1031 
1032 	if (ad.s_listen)
1033 		sock_release(ad.s_listen);
1034 
1035 	sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1036 	msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1037 
1038 	sock.socket->sk->sk_allocation = GFP_NOIO;
1039 	msock.socket->sk->sk_allocation = GFP_NOIO;
1040 
1041 	sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1042 	msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1043 
1044 	/* NOT YET ...
1045 	 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1046 	 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1047 	 * first set it to the P_CONNECTION_FEATURES timeout,
1048 	 * which we set to 4x the configured ping_timeout. */
1049 	rcu_read_lock();
1050 	nc = rcu_dereference(connection->net_conf);
1051 
1052 	sock.socket->sk->sk_sndtimeo =
1053 	sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1054 
1055 	msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1056 	timeout = nc->timeout * HZ / 10;
1057 	discard_my_data = nc->discard_my_data;
1058 	rcu_read_unlock();
1059 
1060 	msock.socket->sk->sk_sndtimeo = timeout;
1061 
1062 	/* we don't want delays.
1063 	 * we use TCP_CORK where appropriate, though */
1064 	drbd_tcp_nodelay(sock.socket);
1065 	drbd_tcp_nodelay(msock.socket);
1066 
1067 	connection->data.socket = sock.socket;
1068 	connection->meta.socket = msock.socket;
1069 	connection->last_received = jiffies;
1070 
1071 	h = drbd_do_features(connection);
1072 	if (h <= 0)
1073 		return h;
1074 
1075 	if (connection->cram_hmac_tfm) {
1076 		/* drbd_request_state(device, NS(conn, WFAuth)); */
1077 		switch (drbd_do_auth(connection)) {
1078 		case -1:
1079 			drbd_err(connection, "Authentication of peer failed\n");
1080 			return -1;
1081 		case 0:
1082 			drbd_err(connection, "Authentication of peer failed, trying again.\n");
1083 			return 0;
1084 		}
1085 	}
1086 
1087 	connection->data.socket->sk->sk_sndtimeo = timeout;
1088 	connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1089 
1090 	if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1091 		return -1;
1092 
1093 	/* Prevent a race between resync-handshake and
1094 	 * being promoted to Primary.
1095 	 *
1096 	 * Grab and release the state mutex, so we know that any current
1097 	 * drbd_set_role() is finished, and any incoming drbd_set_role
1098 	 * will see the STATE_SENT flag, and wait for it to be cleared.
1099 	 */
1100 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1101 		mutex_lock(peer_device->device->state_mutex);
1102 
1103 	set_bit(STATE_SENT, &connection->flags);
1104 
1105 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1106 		mutex_unlock(peer_device->device->state_mutex);
1107 
1108 	rcu_read_lock();
1109 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1110 		struct drbd_device *device = peer_device->device;
1111 		kref_get(&device->kref);
1112 		rcu_read_unlock();
1113 
1114 		if (discard_my_data)
1115 			set_bit(DISCARD_MY_DATA, &device->flags);
1116 		else
1117 			clear_bit(DISCARD_MY_DATA, &device->flags);
1118 
1119 		drbd_connected(peer_device);
1120 		kref_put(&device->kref, drbd_destroy_device);
1121 		rcu_read_lock();
1122 	}
1123 	rcu_read_unlock();
1124 
1125 	rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1126 	if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1127 		clear_bit(STATE_SENT, &connection->flags);
1128 		return 0;
1129 	}
1130 
1131 	drbd_thread_start(&connection->ack_receiver);
1132 	/* opencoded create_singlethread_workqueue(),
1133 	 * to be able to use format string arguments */
1134 	connection->ack_sender =
1135 		alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
1136 	if (!connection->ack_sender) {
1137 		drbd_err(connection, "Failed to create workqueue ack_sender\n");
1138 		return 0;
1139 	}
1140 
1141 	mutex_lock(&connection->resource->conf_update);
1142 	/* The discard_my_data flag is a single-shot modifier to the next
1143 	 * connection attempt, the handshake of which is now well underway.
1144 	 * No need for rcu style copying of the whole struct
1145 	 * just to clear a single value. */
1146 	connection->net_conf->discard_my_data = 0;
1147 	mutex_unlock(&connection->resource->conf_update);
1148 
1149 	return h;
1150 
1151 out_release_sockets:
1152 	if (ad.s_listen)
1153 		sock_release(ad.s_listen);
1154 	if (sock.socket)
1155 		sock_release(sock.socket);
1156 	if (msock.socket)
1157 		sock_release(msock.socket);
1158 	return -1;
1159 }
1160 
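/* Parse an on-wire packet header.  Depending on the agreed protocol version
 * this is a struct p_header100, p_header95 or p_header80; fill in @pi and
 * let pi->data point just past the header.  Returns 0, or -EINVAL on a bad
 * magic or non-zero padding. */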
1161 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1162 {
1163 	unsigned int header_size = drbd_header_size(connection);
1164 
1165 	if (header_size == sizeof(struct p_header100) &&
1166 	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1167 		struct p_header100 *h = header;
1168 		if (h->pad != 0) {
1169 			drbd_err(connection, "Header padding is not zero\n");
1170 			return -EINVAL;
1171 		}
1172 		pi->vnr = be16_to_cpu(h->volume);
1173 		pi->cmd = be16_to_cpu(h->command);
1174 		pi->size = be32_to_cpu(h->length);
1175 	} else if (header_size == sizeof(struct p_header95) &&
1176 		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1177 		struct p_header95 *h = header;
1178 		pi->cmd = be16_to_cpu(h->command);
1179 		pi->size = be32_to_cpu(h->length);
1180 		pi->vnr = 0;
1181 	} else if (header_size == sizeof(struct p_header80) &&
1182 		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1183 		struct p_header80 *h = header;
1184 		pi->cmd = be16_to_cpu(h->command);
1185 		pi->size = be16_to_cpu(h->length);
1186 		pi->vnr = 0;
1187 	} else {
1188 		drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1189 			 be32_to_cpu(*(__be32 *)header),
1190 			 connection->agreed_pro_version);
1191 		return -EINVAL;
1192 	}
1193 	pi->data = header + header_size;
1194 	return 0;
1195 }
1196 
1197 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1198 {
1199 	void *buffer = connection->data.rbuf;
1200 	int err;
1201 
1202 	err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1203 	if (err)
1204 		return err;
1205 
1206 	err = decode_header(connection, buffer, pi);
1207 	connection->last_received = jiffies;
1208 
1209 	return err;
1210 }
1211 
1212 /* This is blkdev_issue_flush, but asynchronous.
1213  * We want to submit to all component volumes in parallel,
1214  * then wait for all completions.
1215  */
1216 struct issue_flush_context {
1217 	atomic_t pending;
1218 	int error;
1219 	struct completion done;
1220 };
1221 struct one_flush_context {
1222 	struct drbd_device *device;
1223 	struct issue_flush_context *ctx;
1224 };
1225 
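/* Completion handler for the per-device flush bio submitted by
 * submit_one_flush(): record a possible error in the shared
 * issue_flush_context and complete ctx->done when the last pending flush
 * has finished. */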
1226 void one_flush_endio(struct bio *bio)
1227 {
1228 	struct one_flush_context *octx = bio->bi_private;
1229 	struct drbd_device *device = octx->device;
1230 	struct issue_flush_context *ctx = octx->ctx;
1231 
1232 	if (bio->bi_status) {
1233 		ctx->error = blk_status_to_errno(bio->bi_status);
1234 		drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_status);
1235 	}
1236 	kfree(octx);
1237 	bio_put(bio);
1238 
1239 	clear_bit(FLUSH_PENDING, &device->flags);
1240 	put_ldev(device);
1241 	kref_put(&device->kref, drbd_destroy_device);
1242 
1243 	if (atomic_dec_and_test(&ctx->pending))
1244 		complete(&ctx->done);
1245 }
1246 
1247 static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
1248 {
1249 	struct bio *bio = bio_alloc(GFP_NOIO, 0);
1250 	struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);
1251 	if (!bio || !octx) {
1252 		drbd_warn(device, "Could not allocate a bio, CANNOT ISSUE FLUSH\n");
1253 		/* FIXME: what else can I do now?  disconnecting or detaching
1254 		 * really does not help to improve the state of the world, either.
1255 		 */
1256 		kfree(octx);
1257 		if (bio)
1258 			bio_put(bio);
1259 
1260 		ctx->error = -ENOMEM;
1261 		put_ldev(device);
1262 		kref_put(&device->kref, drbd_destroy_device);
1263 		return;
1264 	}
1265 
1266 	octx->device = device;
1267 	octx->ctx = ctx;
1268 	bio->bi_bdev = device->ldev->backing_bdev;
1269 	bio->bi_private = octx;
1270 	bio->bi_end_io = one_flush_endio;
1271 	bio->bi_opf = REQ_OP_FLUSH | REQ_PREFLUSH;
1272 
1273 	device->flush_jif = jiffies;
1274 	set_bit(FLUSH_PENDING, &device->flags);
1275 	atomic_inc(&ctx->pending);
1276 	submit_bio(bio);
1277 }
1278 
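/* With write ordering WO_BDEV_FLUSH, issue an (empty) preflush bio to every
 * attached backing device in parallel and wait for all completions.  Any
 * error degrades the resource's write ordering to WO_DRAIN_IO. */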
1279 static void drbd_flush(struct drbd_connection *connection)
1280 {
1281 	if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1282 		struct drbd_peer_device *peer_device;
1283 		struct issue_flush_context ctx;
1284 		int vnr;
1285 
1286 		atomic_set(&ctx.pending, 1);
1287 		ctx.error = 0;
1288 		init_completion(&ctx.done);
1289 
1290 		rcu_read_lock();
1291 		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1292 			struct drbd_device *device = peer_device->device;
1293 
1294 			if (!get_ldev(device))
1295 				continue;
1296 			kref_get(&device->kref);
1297 			rcu_read_unlock();
1298 
1299 			submit_one_flush(device, &ctx);
1300 
1301 			rcu_read_lock();
1302 		}
1303 		rcu_read_unlock();
1304 
1305 		/* Do we want to add a timeout,
1306 		 * if disk-timeout is set? */
1307 		if (!atomic_dec_and_test(&ctx.pending))
1308 			wait_for_completion(&ctx.done);
1309 
1310 		if (ctx.error) {
1311 			/* would rather check on EOPNOTSUPP, but that is not reliable.
1312 			 * don't try again for ANY return value != 0
1313 			 * if (rv == -EOPNOTSUPP) */
1314 			/* Any error is already reported by bio_endio callback. */
1315 			drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1316 		}
1317 	}
1318 }
1319 
1320 /**
1321  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishes it.
1322  * @connection:	DRBD connection.
1323  * @epoch:	Epoch object.
1324  * @ev:		Epoch event.
1325  */
1326 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1327 					       struct drbd_epoch *epoch,
1328 					       enum epoch_event ev)
1329 {
1330 	int epoch_size;
1331 	struct drbd_epoch *next_epoch;
1332 	enum finish_epoch rv = FE_STILL_LIVE;
1333 
1334 	spin_lock(&connection->epoch_lock);
1335 	do {
1336 		next_epoch = NULL;
1337 
1338 		epoch_size = atomic_read(&epoch->epoch_size);
1339 
1340 		switch (ev & ~EV_CLEANUP) {
1341 		case EV_PUT:
1342 			atomic_dec(&epoch->active);
1343 			break;
1344 		case EV_GOT_BARRIER_NR:
1345 			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1346 			break;
1347 		case EV_BECAME_LAST:
1348 			/* nothing to do*/
1349 			break;
1350 		}
1351 
1352 		if (epoch_size != 0 &&
1353 		    atomic_read(&epoch->active) == 0 &&
1354 		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1355 			if (!(ev & EV_CLEANUP)) {
1356 				spin_unlock(&connection->epoch_lock);
1357 				drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1358 				spin_lock(&connection->epoch_lock);
1359 			}
1360 #if 0
1361 			/* FIXME: dec unacked on connection, once we have
1362 			 * something to count pending connection packets in. */
1363 			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1364 				dec_unacked(epoch->connection);
1365 #endif
1366 
1367 			if (connection->current_epoch != epoch) {
1368 				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1369 				list_del(&epoch->list);
1370 				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1371 				connection->epochs--;
1372 				kfree(epoch);
1373 
1374 				if (rv == FE_STILL_LIVE)
1375 					rv = FE_DESTROYED;
1376 			} else {
1377 				epoch->flags = 0;
1378 				atomic_set(&epoch->epoch_size, 0);
1379 				/* atomic_set(&epoch->active, 0); is already zero */
1380 				if (rv == FE_STILL_LIVE)
1381 					rv = FE_RECYCLED;
1382 			}
1383 		}
1384 
1385 		if (!next_epoch)
1386 			break;
1387 
1388 		epoch = next_epoch;
1389 	} while (1);
1390 
1391 	spin_unlock(&connection->epoch_lock);
1392 
1393 	return rv;
1394 }
1395 
1396 static enum write_ordering_e
1397 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1398 {
1399 	struct disk_conf *dc;
1400 
1401 	dc = rcu_dereference(bdev->disk_conf);
1402 
1403 	if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1404 		wo = WO_DRAIN_IO;
1405 	if (wo == WO_DRAIN_IO && !dc->disk_drain)
1406 		wo = WO_NONE;
1407 
1408 	return wo;
1409 }
1410 
1411 /**
1412  * drbd_bump_write_ordering() - Fall back to another write ordering method
1413  * @resource:	DRBD resource.
1414  * @wo:		Write ordering method to try.
1415  */
1416 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1417 			      enum write_ordering_e wo)
1418 {
1419 	struct drbd_device *device;
1420 	enum write_ordering_e pwo;
1421 	int vnr;
1422 	static char *write_ordering_str[] = {
1423 		[WO_NONE] = "none",
1424 		[WO_DRAIN_IO] = "drain",
1425 		[WO_BDEV_FLUSH] = "flush",
1426 	};
1427 
1428 	pwo = resource->write_ordering;
1429 	if (wo != WO_BDEV_FLUSH)
1430 		wo = min(pwo, wo);
1431 	rcu_read_lock();
1432 	idr_for_each_entry(&resource->devices, device, vnr) {
1433 		if (get_ldev(device)) {
1434 			wo = max_allowed_wo(device->ldev, wo);
1435 			if (device->ldev == bdev)
1436 				bdev = NULL;
1437 			put_ldev(device);
1438 		}
1439 	}
1440 
1441 	if (bdev)
1442 		wo = max_allowed_wo(bdev, wo);
1443 
1444 	rcu_read_unlock();
1445 
1446 	resource->write_ordering = wo;
1447 	if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1448 		drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1449 }
1450 
1451 static void drbd_issue_peer_discard(struct drbd_device *device, struct drbd_peer_request *peer_req)
1452 {
1453 	struct block_device *bdev = device->ldev->backing_bdev;
1454 
1455 	if (blkdev_issue_zeroout(bdev, peer_req->i.sector, peer_req->i.size >> 9,
1456 			GFP_NOIO, 0))
1457 		peer_req->flags |= EE_WAS_ERROR;
1458 
1459 	drbd_endio_write_sec_final(peer_req);
1460 }
1461 
1462 static void drbd_issue_peer_wsame(struct drbd_device *device,
1463 				  struct drbd_peer_request *peer_req)
1464 {
1465 	struct block_device *bdev = device->ldev->backing_bdev;
1466 	sector_t s = peer_req->i.sector;
1467 	sector_t nr = peer_req->i.size >> 9;
1468 	if (blkdev_issue_write_same(bdev, s, nr, GFP_NOIO, peer_req->pages))
1469 		peer_req->flags |= EE_WAS_ERROR;
1470 	drbd_endio_write_sec_final(peer_req);
1471 }
1472 
1473 
1474 /**
1475  * drbd_submit_peer_request()
1476  * drbd_submit_peer_request() - submit a peer request to the local backing device
1477  * @device:	DRBD device.
1478  * @peer_req:	peer request
1479  * @op, @op_flags:	REQ_OP_* operation and flags, see bio->bi_opf
1480  * May spread the pages to multiple bios,
1481  * depending on bio_add_page restrictions.
1482  *
1483  * Returns 0 if all bios have been submitted,
1484  * -ENOMEM if we could not allocate enough bios,
1485  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1486  *  single page to an empty bio (which should never happen and likely indicates
1487  *  that the lower level IO stack is in some way broken). This has been observed
1488  *  on certain Xen deployments.
1489  */
1490 /* TODO allocate from our own bio_set. */
1491 int drbd_submit_peer_request(struct drbd_device *device,
1492 			     struct drbd_peer_request *peer_req,
1493 			     const unsigned op, const unsigned op_flags,
1494 			     const int fault_type)
1495 {
1496 	struct bio *bios = NULL;
1497 	struct bio *bio;
1498 	struct page *page = peer_req->pages;
1499 	sector_t sector = peer_req->i.sector;
1500 	unsigned data_size = peer_req->i.size;
1501 	unsigned n_bios = 0;
1502 	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1503 	int err = -ENOMEM;
1504 
1505 	/* TRIM/DISCARD: for now, always use the helper function
1506 	 * blkdev_issue_zeroout(..., discard=true).
1507 	 * It's synchronous, but it does the right thing wrt. bio splitting.
1508 	 * Correctness first, performance later.  Next step is to code an
1509 	 * asynchronous variant of the same.
1510 	 */
1511 	if (peer_req->flags & (EE_IS_TRIM|EE_WRITE_SAME)) {
1512 		/* wait for all pending IO completions, before we start
1513 		 * zeroing things out. */
1514 		conn_wait_active_ee_empty(peer_req->peer_device->connection);
1515 		/* add it to the active list now,
1516 		 * so we can find it to present it in debugfs */
1517 		peer_req->submit_jif = jiffies;
1518 		peer_req->flags |= EE_SUBMITTED;
1519 
1520 		/* If this was a resync request from receive_rs_deallocated(),
1521 		 * it is already on the sync_ee list */
1522 		if (list_empty(&peer_req->w.list)) {
1523 			spin_lock_irq(&device->resource->req_lock);
1524 			list_add_tail(&peer_req->w.list, &device->active_ee);
1525 			spin_unlock_irq(&device->resource->req_lock);
1526 		}
1527 
1528 		if (peer_req->flags & EE_IS_TRIM)
1529 			drbd_issue_peer_discard(device, peer_req);
1530 		else /* EE_WRITE_SAME */
1531 			drbd_issue_peer_wsame(device, peer_req);
1532 		return 0;
1533 	}
1534 
1535 	/* In most cases, we will only need one bio.  But in case the lower
1536 	 * level restrictions happen to be different at this offset on this
1537 	 * side than those of the sending peer, we may need to submit the
1538 	 * request in more than one bio.
1539 	 *
1540 	 * Plain bio_alloc is good enough here, this is no DRBD internally
1541 	 * generated bio, but a bio allocated on behalf of the peer.
1542 	 */
1543 next_bio:
1544 	bio = bio_alloc(GFP_NOIO, nr_pages);
1545 	if (!bio) {
1546 		drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1547 		goto fail;
1548 	}
1549 	/* > peer_req->i.sector, unless this is the first bio */
1550 	bio->bi_iter.bi_sector = sector;
1551 	bio->bi_bdev = device->ldev->backing_bdev;
1552 	bio_set_op_attrs(bio, op, op_flags);
1553 	bio->bi_private = peer_req;
1554 	bio->bi_end_io = drbd_peer_request_endio;
1555 
1556 	bio->bi_next = bios;
1557 	bios = bio;
1558 	++n_bios;
1559 
1560 	page_chain_for_each(page) {
1561 		unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1562 		if (!bio_add_page(bio, page, len, 0))
1563 			goto next_bio;
1564 		data_size -= len;
1565 		sector += len >> 9;
1566 		--nr_pages;
1567 	}
1568 	D_ASSERT(device, data_size == 0);
1569 	D_ASSERT(device, page == NULL);
1570 
1571 	atomic_set(&peer_req->pending_bios, n_bios);
1572 	/* for debugfs: update timestamp, mark as submitted */
1573 	peer_req->submit_jif = jiffies;
1574 	peer_req->flags |= EE_SUBMITTED;
1575 	do {
1576 		bio = bios;
1577 		bios = bios->bi_next;
1578 		bio->bi_next = NULL;
1579 
1580 		drbd_generic_make_request(device, fault_type, bio);
1581 	} while (bios);
1582 	return 0;
1583 
1584 fail:
1585 	while (bios) {
1586 		bio = bios;
1587 		bios = bios->bi_next;
1588 		bio_put(bio);
1589 	}
1590 	return err;
1591 }
1592 
1593 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1594 					     struct drbd_peer_request *peer_req)
1595 {
1596 	struct drbd_interval *i = &peer_req->i;
1597 
1598 	drbd_remove_interval(&device->write_requests, i);
1599 	drbd_clear_interval(i);
1600 
1601 	/* Wake up any processes waiting for this peer request to complete.  */
1602 	if (i->waiting)
1603 		wake_up(&device->misc_wait);
1604 }
1605 
1606 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1607 {
1608 	struct drbd_peer_device *peer_device;
1609 	int vnr;
1610 
1611 	rcu_read_lock();
1612 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1613 		struct drbd_device *device = peer_device->device;
1614 
1615 		kref_get(&device->kref);
1616 		rcu_read_unlock();
1617 		drbd_wait_ee_list_empty(device, &device->active_ee);
1618 		kref_put(&device->kref, drbd_destroy_device);
1619 		rcu_read_lock();
1620 	}
1621 	rcu_read_unlock();
1622 }
1623 
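/* Handle P_BARRIER: remember the barrier number in the current epoch and,
 * depending on the configured write ordering, drain or flush outstanding
 * writes and start a new epoch.  The corresponding P_BARRIER_ACK is sent
 * once the epoch is finished, see drbd_may_finish_epoch(). */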
1624 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1625 {
1626 	int rv;
1627 	struct p_barrier *p = pi->data;
1628 	struct drbd_epoch *epoch;
1629 
1630 	/* FIXME these are unacked on connection,
1631 	 * not a specific (peer)device.
1632 	 */
1633 	connection->current_epoch->barrier_nr = p->barrier;
1634 	connection->current_epoch->connection = connection;
1635 	rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1636 
1637 	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1638 	 * the activity log, which means it would not be resynced in case the
1639 	 * R_PRIMARY crashes now.
1640 	 * Therefore we must send the barrier_ack after the barrier request was
1641 	 * completed. */
1642 	switch (connection->resource->write_ordering) {
1643 	case WO_NONE:
1644 		if (rv == FE_RECYCLED)
1645 			return 0;
1646 
1647 		/* receiver context, in the writeout path of the other node.
1648 		 * avoid potential distributed deadlock */
1649 		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1650 		if (epoch)
1651 			break;
1652 		else
1653 			drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1654 			/* Fall through */
1655 
1656 	case WO_BDEV_FLUSH:
1657 	case WO_DRAIN_IO:
1658 		conn_wait_active_ee_empty(connection);
1659 		drbd_flush(connection);
1660 
1661 		if (atomic_read(&connection->current_epoch->epoch_size)) {
1662 			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1663 			if (epoch)
1664 				break;
1665 		}
1666 
1667 		return 0;
1668 	default:
1669 		drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1670 			 connection->resource->write_ordering);
1671 		return -EIO;
1672 	}
1673 
1674 	epoch->flags = 0;
1675 	atomic_set(&epoch->epoch_size, 0);
1676 	atomic_set(&epoch->active, 0);
1677 
1678 	spin_lock(&connection->epoch_lock);
1679 	if (atomic_read(&connection->current_epoch->epoch_size)) {
1680 		list_add(&epoch->list, &connection->current_epoch->list);
1681 		connection->current_epoch = epoch;
1682 		connection->epochs++;
1683 	} else {
1684 		/* The current_epoch got recycled while we allocated this one... */
1685 		kfree(epoch);
1686 	}
1687 	spin_unlock(&connection->epoch_lock);
1688 
1689 	return 0;
1690 }
1691 
1692 /* quick wrapper in case payload size != request_size (write same) */
1693 static void drbd_csum_ee_size(struct crypto_ahash *h,
1694 			      struct drbd_peer_request *r, void *d,
1695 			      unsigned int payload_size)
1696 {
1697 	unsigned int tmp = r->i.size;
1698 	r->i.size = payload_size;
1699 	drbd_csum_ee(h, r, d);
1700 	r->i.size = tmp;
1701 }
1702 
1703 /* used from receive_RSDataReply (recv_resync_read)
1704  * and from receive_Data.
1705  * data_size: actual payload ("data in")
1706  * 	for normal writes that is bi_size.
1707  * 	for discards, that is zero.
1708  * 	for write same, it is logical_block_size.
1709  * both trim and write same have the bi_size ("data len to be affected")
1710  * as extra argument in the packet header.
1711  */
1712 static struct drbd_peer_request *
1713 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1714 	      struct packet_info *pi) __must_hold(local)
1715 {
1716 	struct drbd_device *device = peer_device->device;
1717 	const sector_t capacity = drbd_get_capacity(device->this_bdev);
1718 	struct drbd_peer_request *peer_req;
1719 	struct page *page;
1720 	int digest_size, err;
1721 	unsigned int data_size = pi->size, ds;
1722 	void *dig_in = peer_device->connection->int_dig_in;
1723 	void *dig_vv = peer_device->connection->int_dig_vv;
1724 	unsigned long *data;
1725 	struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1726 	struct p_trim *wsame = (pi->cmd == P_WSAME) ? pi->data : NULL;
1727 
1728 	digest_size = 0;
1729 	if (!trim && peer_device->connection->peer_integrity_tfm) {
1730 		digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1731 		/*
1732 		 * FIXME: Receive the incoming digest into the receive buffer
1733 		 *	  here, together with its struct p_data?
1734 		 */
1735 		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1736 		if (err)
1737 			return NULL;
1738 		data_size -= digest_size;
1739 	}
1740 
1741 	/* assume request_size == data_size, but special case trim and wsame. */
1742 	ds = data_size;
1743 	if (trim) {
1744 		if (!expect(data_size == 0))
1745 			return NULL;
1746 		ds = be32_to_cpu(trim->size);
1747 	} else if (wsame) {
1748 		if (data_size != queue_logical_block_size(device->rq_queue)) {
1749 			drbd_err(peer_device, "data size (%u) != drbd logical block size (%u)\n",
1750 				data_size, queue_logical_block_size(device->rq_queue));
1751 			return NULL;
1752 		}
1753 		if (data_size != bdev_logical_block_size(device->ldev->backing_bdev)) {
1754 			drbd_err(peer_device, "data size (%u) != backend logical block size (%u)\n",
1755 				data_size, bdev_logical_block_size(device->ldev->backing_bdev));
1756 			return NULL;
1757 		}
1758 		ds = be32_to_cpu(wsame->size);
1759 	}
1760 
1761 	if (!expect(IS_ALIGNED(ds, 512)))
1762 		return NULL;
1763 	if (trim || wsame) {
1764 		if (!expect(ds <= (DRBD_MAX_BBIO_SECTORS << 9)))
1765 			return NULL;
1766 	} else if (!expect(ds <= DRBD_MAX_BIO_SIZE))
1767 		return NULL;
1768 
1769 	/* even though we trust our peer,
1770 	 * we sometimes have to double check. */
1771 	if (sector + (ds>>9) > capacity) {
1772 		drbd_err(device, "request from peer beyond end of local disk: "
1773 			"capacity: %llus < sector: %llus + size: %u\n",
1774 			(unsigned long long)capacity,
1775 			(unsigned long long)sector, ds);
1776 		return NULL;
1777 	}
1778 
1779 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1780 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1781 	 * which in turn might block on the other node at this very place.  */
1782 	peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO);
1783 	if (!peer_req)
1784 		return NULL;
1785 
1786 	peer_req->flags |= EE_WRITE;
1787 	if (trim) {
1788 		peer_req->flags |= EE_IS_TRIM;
1789 		return peer_req;
1790 	}
1791 	if (wsame)
1792 		peer_req->flags |= EE_WRITE_SAME;
1793 
1794 	/* receive payload size bytes into page chain */
1795 	ds = data_size;
1796 	page = peer_req->pages;
1797 	page_chain_for_each(page) {
1798 		unsigned len = min_t(int, ds, PAGE_SIZE);
1799 		data = kmap(page);
1800 		err = drbd_recv_all_warn(peer_device->connection, data, len);
1801 		if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1802 			drbd_err(device, "Fault injection: Corrupting data on receive\n");
1803 			data[0] = data[0] ^ (unsigned long)-1;
1804 		}
1805 		kunmap(page);
1806 		if (err) {
1807 			drbd_free_peer_req(device, peer_req);
1808 			return NULL;
1809 		}
1810 		ds -= len;
1811 	}
1812 
1813 	if (digest_size) {
1814 		drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size);
1815 		if (memcmp(dig_in, dig_vv, digest_size)) {
1816 			drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1817 				(unsigned long long)sector, data_size);
1818 			drbd_free_peer_req(device, peer_req);
1819 			return NULL;
1820 		}
1821 	}
1822 	device->recv_cnt += data_size >> 9;
1823 	return peer_req;
1824 }
1825 
1826 /* drbd_drain_block() just takes a data block
1827  * out of the socket input buffer, and discards it.
1828  */
1829 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1830 {
1831 	struct page *page;
1832 	int err = 0;
1833 	void *data;
1834 
1835 	if (!data_size)
1836 		return 0;
1837 
1838 	page = drbd_alloc_pages(peer_device, 1, 1);
1839 
1840 	data = kmap(page);
1841 	while (data_size) {
1842 		unsigned int len = min_t(int, data_size, PAGE_SIZE);
1843 
1844 		err = drbd_recv_all_warn(peer_device->connection, data, len);
1845 		if (err)
1846 			break;
1847 		data_size -= len;
1848 	}
1849 	kunmap(page);
1850 	drbd_free_pages(peer_device->device, page, 0);
1851 	return err;
1852 }
1853 
1854 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1855 			   sector_t sector, int data_size)
1856 {
1857 	struct bio_vec bvec;
1858 	struct bvec_iter iter;
1859 	struct bio *bio;
1860 	int digest_size, err, expect;
1861 	void *dig_in = peer_device->connection->int_dig_in;
1862 	void *dig_vv = peer_device->connection->int_dig_vv;
1863 
1864 	digest_size = 0;
1865 	if (peer_device->connection->peer_integrity_tfm) {
1866 		digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1867 		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1868 		if (err)
1869 			return err;
1870 		data_size -= digest_size;
1871 	}
1872 
1873 	/* optimistically update recv_cnt.  if receiving fails below,
1874 	 * we disconnect anyways, and counters will be reset. */
1875 	peer_device->device->recv_cnt += data_size>>9;
1876 
1877 	bio = req->master_bio;
1878 	D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1879 
1880 	bio_for_each_segment(bvec, bio, iter) {
1881 		void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1882 		expect = min_t(int, data_size, bvec.bv_len);
1883 		err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1884 		kunmap(bvec.bv_page);
1885 		if (err)
1886 			return err;
1887 		data_size -= expect;
1888 	}
1889 
1890 	if (digest_size) {
1891 		drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1892 		if (memcmp(dig_in, dig_vv, digest_size)) {
1893 			drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1894 			return -EINVAL;
1895 		}
1896 	}
1897 
1898 	D_ASSERT(peer_device->device, data_size == 0);
1899 	return 0;
1900 }
1901 
1902 /*
1903  * e_end_resync_block() is called in ack_sender context via
1904  * drbd_finish_peer_reqs().
1905  */
1906 static int e_end_resync_block(struct drbd_work *w, int unused)
1907 {
1908 	struct drbd_peer_request *peer_req =
1909 		container_of(w, struct drbd_peer_request, w);
1910 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1911 	struct drbd_device *device = peer_device->device;
1912 	sector_t sector = peer_req->i.sector;
1913 	int err;
1914 
1915 	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1916 
1917 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1918 		drbd_set_in_sync(device, sector, peer_req->i.size);
1919 		err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1920 	} else {
1921 		/* Record failure to sync */
1922 		drbd_rs_failed_io(device, sector, peer_req->i.size);
1923 
1924 		err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1925 	}
1926 	dec_unacked(device);
1927 
1928 	return err;
1929 }
1930 
1931 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1932 			    struct packet_info *pi) __releases(local)
1933 {
1934 	struct drbd_device *device = peer_device->device;
1935 	struct drbd_peer_request *peer_req;
1936 
1937 	peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
1938 	if (!peer_req)
1939 		goto fail;
1940 
1941 	dec_rs_pending(device);
1942 
1943 	inc_unacked(device);
1944 	/* corresponding dec_unacked() in e_end_resync_block()
1945 	 * or in _drbd_clear_done_ee(), respectively. */
1946 
1947 	peer_req->w.cb = e_end_resync_block;
1948 	peer_req->submit_jif = jiffies;
1949 
1950 	spin_lock_irq(&device->resource->req_lock);
1951 	list_add_tail(&peer_req->w.list, &device->sync_ee);
1952 	spin_unlock_irq(&device->resource->req_lock);
1953 
1954 	atomic_add(pi->size >> 9, &device->rs_sect_ev);
1955 	if (drbd_submit_peer_request(device, peer_req, REQ_OP_WRITE, 0,
1956 				     DRBD_FAULT_RS_WR) == 0)
1957 		return 0;
1958 
1959 	/* don't care for the reason here */
1960 	drbd_err(device, "submit failed, triggering re-connect\n");
1961 	spin_lock_irq(&device->resource->req_lock);
1962 	list_del(&peer_req->w.list);
1963 	spin_unlock_irq(&device->resource->req_lock);
1964 
1965 	drbd_free_peer_req(device, peer_req);
1966 fail:
1967 	put_ldev(device);
1968 	return -EIO;
1969 }
1970 
1971 static struct drbd_request *
1972 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1973 	     sector_t sector, bool missing_ok, const char *func)
1974 {
1975 	struct drbd_request *req;
1976 
1977 	/* Request object according to our peer */
1978 	req = (struct drbd_request *)(unsigned long)id;
1979 	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1980 		return req;
1981 	if (!missing_ok) {
1982 		drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1983 			(unsigned long)id, (unsigned long long)sector);
1984 	}
1985 	return NULL;
1986 }
1987 
1988 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
1989 {
1990 	struct drbd_peer_device *peer_device;
1991 	struct drbd_device *device;
1992 	struct drbd_request *req;
1993 	sector_t sector;
1994 	int err;
1995 	struct p_data *p = pi->data;
1996 
1997 	peer_device = conn_peer_device(connection, pi->vnr);
1998 	if (!peer_device)
1999 		return -EIO;
2000 	device = peer_device->device;
2001 
2002 	sector = be64_to_cpu(p->sector);
2003 
2004 	spin_lock_irq(&device->resource->req_lock);
2005 	req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
2006 	spin_unlock_irq(&device->resource->req_lock);
2007 	if (unlikely(!req))
2008 		return -EIO;
2009 
2010 	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
2011 	 * special casing it there for the various failure cases.
2012 	 * still no race with drbd_fail_pending_reads */
2013 	err = recv_dless_read(peer_device, req, sector, pi->size);
2014 	if (!err)
2015 		req_mod(req, DATA_RECEIVED);
2016 	/* else: nothing. handled from drbd_disconnect...
2017 	 * I don't think we may complete this just yet
2018 	 * in case we are "on-disconnect: freeze" */
2019 
2020 	return err;
2021 }
2022 
2023 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
2024 {
2025 	struct drbd_peer_device *peer_device;
2026 	struct drbd_device *device;
2027 	sector_t sector;
2028 	int err;
2029 	struct p_data *p = pi->data;
2030 
2031 	peer_device = conn_peer_device(connection, pi->vnr);
2032 	if (!peer_device)
2033 		return -EIO;
2034 	device = peer_device->device;
2035 
2036 	sector = be64_to_cpu(p->sector);
2037 	D_ASSERT(device, p->block_id == ID_SYNCER);
2038 
2039 	if (get_ldev(device)) {
2040 		/* data is submitted to disk within recv_resync_read.
2041 		 * corresponding put_ldev done below on error,
2042 		 * or in drbd_peer_request_endio. */
2043 		err = recv_resync_read(peer_device, sector, pi);
2044 	} else {
2045 		if (__ratelimit(&drbd_ratelimit_state))
2046 			drbd_err(device, "Can not write resync data to local disk.\n");
2047 
2048 		err = drbd_drain_block(peer_device, pi->size);
2049 
2050 		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2051 	}
2052 
2053 	atomic_add(pi->size >> 9, &device->rs_sect_in);
2054 
2055 	return err;
2056 }
2057 
2058 static void restart_conflicting_writes(struct drbd_device *device,
2059 				       sector_t sector, int size)
2060 {
2061 	struct drbd_interval *i;
2062 	struct drbd_request *req;
2063 
2064 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2065 		if (!i->local)
2066 			continue;
2067 		req = container_of(i, struct drbd_request, i);
2068 		if (req->rq_state & RQ_LOCAL_PENDING ||
2069 		    !(req->rq_state & RQ_POSTPONED))
2070 			continue;
2071 		/* as it is RQ_POSTPONED, this will cause it to
2072 		 * be queued on the retry workqueue. */
2073 		__req_mod(req, CONFLICT_RESOLVED, NULL);
2074 	}
2075 }
2076 
2077 /*
2078  * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
2079  */
2080 static int e_end_block(struct drbd_work *w, int cancel)
2081 {
2082 	struct drbd_peer_request *peer_req =
2083 		container_of(w, struct drbd_peer_request, w);
2084 	struct drbd_peer_device *peer_device = peer_req->peer_device;
2085 	struct drbd_device *device = peer_device->device;
2086 	sector_t sector = peer_req->i.sector;
2087 	int err = 0, pcmd;
2088 
2089 	if (peer_req->flags & EE_SEND_WRITE_ACK) {
2090 		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2091 			pcmd = (device->state.conn >= C_SYNC_SOURCE &&
2092 				device->state.conn <= C_PAUSED_SYNC_T &&
2093 				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
2094 				P_RS_WRITE_ACK : P_WRITE_ACK;
2095 			err = drbd_send_ack(peer_device, pcmd, peer_req);
2096 			if (pcmd == P_RS_WRITE_ACK)
2097 				drbd_set_in_sync(device, sector, peer_req->i.size);
2098 		} else {
2099 			err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2100 			/* we expect it to be marked out of sync anyways...
2101 			 * maybe assert this?  */
2102 		}
2103 		dec_unacked(device);
2104 	}
2105 
2106 	/* we delete from the conflict detection hash _after_ we sent out the
2107 	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
2108 	if (peer_req->flags & EE_IN_INTERVAL_TREE) {
2109 		spin_lock_irq(&device->resource->req_lock);
2110 		D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
2111 		drbd_remove_epoch_entry_interval(device, peer_req);
2112 		if (peer_req->flags & EE_RESTART_REQUESTS)
2113 			restart_conflicting_writes(device, sector, peer_req->i.size);
2114 		spin_unlock_irq(&device->resource->req_lock);
2115 	} else
2116 		D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2117 
2118 	drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
2119 
2120 	return err;
2121 }
2122 
2123 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2124 {
2125 	struct drbd_peer_request *peer_req =
2126 		container_of(w, struct drbd_peer_request, w);
2127 	struct drbd_peer_device *peer_device = peer_req->peer_device;
2128 	int err;
2129 
2130 	err = drbd_send_ack(peer_device, ack, peer_req);
2131 	dec_unacked(peer_device->device);
2132 
2133 	return err;
2134 }
2135 
2136 static int e_send_superseded(struct drbd_work *w, int unused)
2137 {
2138 	return e_send_ack(w, P_SUPERSEDED);
2139 }
2140 
2141 static int e_send_retry_write(struct drbd_work *w, int unused)
2142 {
2143 	struct drbd_peer_request *peer_req =
2144 		container_of(w, struct drbd_peer_request, w);
2145 	struct drbd_connection *connection = peer_req->peer_device->connection;
2146 
2147 	return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2148 			     P_RETRY_WRITE : P_SUPERSEDED);
2149 }
2150 
2151 static bool seq_greater(u32 a, u32 b)
2152 {
2153 	/*
2154 	 * We assume 32-bit wrap-around here.
2155 	 * For 24-bit wrap-around, we would have to shift:
2156 	 *  a <<= 8; b <<= 8;
2157 	 */
2158 	return (s32)a - (s32)b > 0;
2159 }
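
/* Worked example (illustrative values): right after a 32-bit wrap, a == 2 and
 * b == 0xfffffffe.  Then (s32)a - (s32)b == 2 - (-2) == 4 > 0, so
 * seq_greater(2, 0xfffffffe) is true and the wrapped sequence number 2 is
 * correctly treated as the newer one. */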
2160 
2161 static u32 seq_max(u32 a, u32 b)
2162 {
2163 	return seq_greater(a, b) ? a : b;
2164 }
2165 
2166 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2167 {
2168 	struct drbd_device *device = peer_device->device;
2169 	unsigned int newest_peer_seq;
2170 
2171 	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2172 		spin_lock(&device->peer_seq_lock);
2173 		newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2174 		device->peer_seq = newest_peer_seq;
2175 		spin_unlock(&device->peer_seq_lock);
2176 		/* wake up only if we actually changed device->peer_seq */
2177 		if (peer_seq == newest_peer_seq)
2178 			wake_up(&device->seq_wait);
2179 	}
2180 }
2181 
2182 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2183 {
2184 	return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2185 }
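
/* Example (illustrative values): the interval is half-open,
 * [sector, sector + (len>>9)).  A 4 KiB request at sector 0 spans [0, 8),
 * so it does not overlap a 512-byte request at sector 8 ([8, 9)),
 * but it does overlap one at sector 7 ([7, 8)). */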
2186 
2187 /* maybe change sync_ee into interval trees as well? */
2188 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2189 {
2190 	struct drbd_peer_request *rs_req;
2191 	bool rv = false;
2192 
2193 	spin_lock_irq(&device->resource->req_lock);
2194 	list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2195 		if (overlaps(peer_req->i.sector, peer_req->i.size,
2196 			     rs_req->i.sector, rs_req->i.size)) {
2197 			rv = true;
2198 			break;
2199 		}
2200 	}
2201 	spin_unlock_irq(&device->resource->req_lock);
2202 
2203 	return rv;
2204 }
2205 
2206 /* Called from receive_Data.
2207  * Synchronize packets on sock with packets on msock.
2208  *
2209  * This is here so even when a P_DATA packet traveling via sock overtakes an Ack
2210  * packet traveling on msock, they are still processed in the order they have
2211  * been sent.
2212  *
2213  * Note: we don't care for Ack packets overtaking P_DATA packets.
2214  *
2215  * In case peer_seq is larger than device->peer_seq, there are
2216  * outstanding packets on the msock. We wait for them to arrive.
2217  * In case we are the logically next packet, we update device->peer_seq
2218  * ourselves. Correctly handles 32bit wrap around.
2219  *
2220  * Assume we have a 10 GBit connection, that is about 1<<30 bytes per second,
2221  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2222  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2223  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
2224  *
2225  * returns 0 if we may process the packet,
2226  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2227 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2228 {
2229 	struct drbd_device *device = peer_device->device;
2230 	DEFINE_WAIT(wait);
2231 	long timeout;
2232 	int ret = 0, tp;
2233 
2234 	if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2235 		return 0;
2236 
2237 	spin_lock(&device->peer_seq_lock);
2238 	for (;;) {
2239 		if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2240 			device->peer_seq = seq_max(device->peer_seq, peer_seq);
2241 			break;
2242 		}
2243 
2244 		if (signal_pending(current)) {
2245 			ret = -ERESTARTSYS;
2246 			break;
2247 		}
2248 
2249 		rcu_read_lock();
2250 		tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2251 		rcu_read_unlock();
2252 
2253 		if (!tp)
2254 			break;
2255 
2256 		/* Only need to wait if two_primaries is enabled */
2257 		prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2258 		spin_unlock(&device->peer_seq_lock);
2259 		rcu_read_lock();
2260 		timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2261 		rcu_read_unlock();
2262 		timeout = schedule_timeout(timeout);
2263 		spin_lock(&device->peer_seq_lock);
2264 		if (!timeout) {
2265 			ret = -ETIMEDOUT;
2266 			drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2267 			break;
2268 		}
2269 	}
2270 	spin_unlock(&device->peer_seq_lock);
2271 	finish_wait(&device->seq_wait, &wait);
2272 	return ret;
2273 }
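
/* Illustration (numbers assumed): with device->peer_seq == 5, a packet
 * carrying peer_seq == 6 is processed immediately (seq_greater(5, 5) is
 * false) and bumps device->peer_seq to 6, while a packet carrying
 * peer_seq == 7 first sleeps on seq_wait until device->peer_seq has
 * reached at least 6. */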
2274 
2275 /* see also bio_flags_to_wire()
2276  * DRBD_REQ_*, because we need to semantically map the flags to data packet
2277  * flags and back. We may replicate to other kernel versions. */
2278 static unsigned long wire_flags_to_bio_flags(u32 dpf)
2279 {
2280 	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2281 		(dpf & DP_FUA ? REQ_FUA : 0) |
2282 		(dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
2283 }
2284 
2285 static unsigned long wire_flags_to_bio_op(u32 dpf)
2286 {
2287 	if (dpf & DP_DISCARD)
2288 		return REQ_OP_WRITE_ZEROES;
2289 	else
2290 		return REQ_OP_WRITE;
2291 }
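
/* Example mapping, derived from the two helpers above: a peer write sent with
 * DP_FUA | DP_FLUSH is submitted locally as REQ_OP_WRITE with
 * REQ_FUA | REQ_PREFLUSH; a packet flagged DP_DISCARD becomes
 * REQ_OP_WRITE_ZEROES. */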
2292 
2293 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2294 				    unsigned int size)
2295 {
2296 	struct drbd_interval *i;
2297 
2298     repeat:
2299 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2300 		struct drbd_request *req;
2301 		struct bio_and_error m;
2302 
2303 		if (!i->local)
2304 			continue;
2305 		req = container_of(i, struct drbd_request, i);
2306 		if (!(req->rq_state & RQ_POSTPONED))
2307 			continue;
2308 		req->rq_state &= ~RQ_POSTPONED;
2309 		__req_mod(req, NEG_ACKED, &m);
2310 		spin_unlock_irq(&device->resource->req_lock);
2311 		if (m.bio)
2312 			complete_master_bio(device, &m);
2313 		spin_lock_irq(&device->resource->req_lock);
2314 		goto repeat;
2315 	}
2316 }
2317 
2318 static int handle_write_conflicts(struct drbd_device *device,
2319 				  struct drbd_peer_request *peer_req)
2320 {
2321 	struct drbd_connection *connection = peer_req->peer_device->connection;
2322 	bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2323 	sector_t sector = peer_req->i.sector;
2324 	const unsigned int size = peer_req->i.size;
2325 	struct drbd_interval *i;
2326 	bool equal;
2327 	int err;
2328 
2329 	/*
2330 	 * Inserting the peer request into the write_requests tree will prevent
2331 	 * new conflicting local requests from being added.
2332 	 */
2333 	drbd_insert_interval(&device->write_requests, &peer_req->i);
2334 
2335     repeat:
2336 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2337 		if (i == &peer_req->i)
2338 			continue;
2339 		if (i->completed)
2340 			continue;
2341 
2342 		if (!i->local) {
2343 			/*
2344 			 * Our peer has sent a conflicting remote request; this
2345 			 * should not happen in a two-node setup.  Wait for the
2346 			 * earlier peer request to complete.
2347 			 */
2348 			err = drbd_wait_misc(device, i);
2349 			if (err)
2350 				goto out;
2351 			goto repeat;
2352 		}
2353 
2354 		equal = i->sector == sector && i->size == size;
2355 		if (resolve_conflicts) {
2356 			/*
2357 			 * If the peer request is fully contained within the
2358 			 * overlapping request, it can be considered overwritten
2359 			 * and thus superseded; otherwise, it will be retried
2360 			 * once all overlapping requests have completed.
2361 			 */
2362 			bool superseded = i->sector <= sector && i->sector +
2363 				       (i->size >> 9) >= sector + (size >> 9);
2364 
2365 			if (!equal)
2366 				drbd_alert(device, "Concurrent writes detected: "
2367 					       "local=%llus +%u, remote=%llus +%u, "
2368 					       "assuming %s came first\n",
2369 					  (unsigned long long)i->sector, i->size,
2370 					  (unsigned long long)sector, size,
2371 					  superseded ? "local" : "remote");
2372 
2373 			peer_req->w.cb = superseded ? e_send_superseded :
2374 						   e_send_retry_write;
2375 			list_add_tail(&peer_req->w.list, &device->done_ee);
2376 			queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);
2377 
2378 			err = -ENOENT;
2379 			goto out;
2380 		} else {
2381 			struct drbd_request *req =
2382 				container_of(i, struct drbd_request, i);
2383 
2384 			if (!equal)
2385 				drbd_alert(device, "Concurrent writes detected: "
2386 					       "local=%llus +%u, remote=%llus +%u\n",
2387 					  (unsigned long long)i->sector, i->size,
2388 					  (unsigned long long)sector, size);
2389 
2390 			if (req->rq_state & RQ_LOCAL_PENDING ||
2391 			    !(req->rq_state & RQ_POSTPONED)) {
2392 				/*
2393 				 * Wait for the node with the discard flag to
2394 				 * decide if this request has been superseded
2395 				 * or needs to be retried.
2396 				 * Requests that have been superseded will
2397 				 * disappear from the write_requests tree.
2398 				 *
2399 				 * In addition, wait for the conflicting
2400 				 * request to finish locally before submitting
2401 				 * the conflicting peer request.
2402 				 */
2403 				err = drbd_wait_misc(device, &req->i);
2404 				if (err) {
2405 					_conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2406 					fail_postponed_requests(device, sector, size);
2407 					goto out;
2408 				}
2409 				goto repeat;
2410 			}
2411 			/*
2412 			 * Remember to restart the conflicting requests after
2413 			 * the new peer request has completed.
2414 			 */
2415 			peer_req->flags |= EE_RESTART_REQUESTS;
2416 		}
2417 	}
2418 	err = 0;
2419 
2420     out:
2421 	if (err)
2422 		drbd_remove_epoch_entry_interval(device, peer_req);
2423 	return err;
2424 }
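
/* Illustration of the "superseded" test above (numbers assumed): an
 * overlapping request covering sectors [0, 16) fully contains a conflicting
 * peer write to [4, 8), so that peer write is answered with P_SUPERSEDED;
 * a peer write to [12, 20) is not fully contained and is asked to be
 * retried (P_RETRY_WRITE with protocol >= 100) once the overlapping
 * requests have completed. */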
2425 
2426 /* mirrored write */
2427 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2428 {
2429 	struct drbd_peer_device *peer_device;
2430 	struct drbd_device *device;
2431 	struct net_conf *nc;
2432 	sector_t sector;
2433 	struct drbd_peer_request *peer_req;
2434 	struct p_data *p = pi->data;
2435 	u32 peer_seq = be32_to_cpu(p->seq_num);
2436 	int op, op_flags;
2437 	u32 dp_flags;
2438 	int err, tp;
2439 
2440 	peer_device = conn_peer_device(connection, pi->vnr);
2441 	if (!peer_device)
2442 		return -EIO;
2443 	device = peer_device->device;
2444 
2445 	if (!get_ldev(device)) {
2446 		int err2;
2447 
2448 		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2449 		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2450 		atomic_inc(&connection->current_epoch->epoch_size);
2451 		err2 = drbd_drain_block(peer_device, pi->size);
2452 		if (!err)
2453 			err = err2;
2454 		return err;
2455 	}
2456 
2457 	/*
2458 	 * Corresponding put_ldev done either below (on various errors), or in
2459 	 * drbd_peer_request_endio, if we successfully submit the data at the
2460 	 * end of this function.
2461 	 */
2462 
2463 	sector = be64_to_cpu(p->sector);
2464 	peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2465 	if (!peer_req) {
2466 		put_ldev(device);
2467 		return -EIO;
2468 	}
2469 
2470 	peer_req->w.cb = e_end_block;
2471 	peer_req->submit_jif = jiffies;
2472 	peer_req->flags |= EE_APPLICATION;
2473 
2474 	dp_flags = be32_to_cpu(p->dp_flags);
2475 	op = wire_flags_to_bio_op(dp_flags);
2476 	op_flags = wire_flags_to_bio_flags(dp_flags);
2477 	if (pi->cmd == P_TRIM) {
2478 		D_ASSERT(peer_device, peer_req->i.size > 0);
2479 		D_ASSERT(peer_device, op == REQ_OP_WRITE_ZEROES);
2480 		D_ASSERT(peer_device, peer_req->pages == NULL);
2481 	} else if (peer_req->pages == NULL) {
2482 		D_ASSERT(device, peer_req->i.size == 0);
2483 		D_ASSERT(device, dp_flags & DP_FLUSH);
2484 	}
2485 
2486 	if (dp_flags & DP_MAY_SET_IN_SYNC)
2487 		peer_req->flags |= EE_MAY_SET_IN_SYNC;
2488 
2489 	spin_lock(&connection->epoch_lock);
2490 	peer_req->epoch = connection->current_epoch;
2491 	atomic_inc(&peer_req->epoch->epoch_size);
2492 	atomic_inc(&peer_req->epoch->active);
2493 	spin_unlock(&connection->epoch_lock);
2494 
2495 	rcu_read_lock();
2496 	nc = rcu_dereference(peer_device->connection->net_conf);
2497 	tp = nc->two_primaries;
2498 	if (peer_device->connection->agreed_pro_version < 100) {
2499 		switch (nc->wire_protocol) {
2500 		case DRBD_PROT_C:
2501 			dp_flags |= DP_SEND_WRITE_ACK;
2502 			break;
2503 		case DRBD_PROT_B:
2504 			dp_flags |= DP_SEND_RECEIVE_ACK;
2505 			break;
2506 		}
2507 	}
2508 	rcu_read_unlock();
2509 
2510 	if (dp_flags & DP_SEND_WRITE_ACK) {
2511 		peer_req->flags |= EE_SEND_WRITE_ACK;
2512 		inc_unacked(device);
2513 		/* corresponding dec_unacked() in e_end_block()
2514 		 * or in _drbd_clear_done_ee(), respectively. */
2515 	}
2516 
2517 	if (dp_flags & DP_SEND_RECEIVE_ACK) {
2518 		/* I really don't like it that the receiver thread
2519 		 * sends on the msock, but anyways */
2520 		drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
2521 	}
2522 
2523 	if (tp) {
2524 		/* two primaries implies protocol C */
2525 		D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2526 		peer_req->flags |= EE_IN_INTERVAL_TREE;
2527 		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2528 		if (err)
2529 			goto out_interrupted;
2530 		spin_lock_irq(&device->resource->req_lock);
2531 		err = handle_write_conflicts(device, peer_req);
2532 		if (err) {
2533 			spin_unlock_irq(&device->resource->req_lock);
2534 			if (err == -ENOENT) {
2535 				put_ldev(device);
2536 				return 0;
2537 			}
2538 			goto out_interrupted;
2539 		}
2540 	} else {
2541 		update_peer_seq(peer_device, peer_seq);
2542 		spin_lock_irq(&device->resource->req_lock);
2543 	}
2544 	/* TRIM and WRITE_SAME are processed synchronously,
2545 	 * we wait for all pending requests, i.e. for active_ee to become
2546 	 * empty, in drbd_submit_peer_request();
2547 	 * better not add ourselves here. */
2548 	if ((peer_req->flags & (EE_IS_TRIM|EE_WRITE_SAME)) == 0)
2549 		list_add_tail(&peer_req->w.list, &device->active_ee);
2550 	spin_unlock_irq(&device->resource->req_lock);
2551 
2552 	if (device->state.conn == C_SYNC_TARGET)
2553 		wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2554 
2555 	if (device->state.pdsk < D_INCONSISTENT) {
2556 		/* In case we have the only disk of the cluster, */
2557 		drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2558 		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2559 		drbd_al_begin_io(device, &peer_req->i);
2560 		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2561 	}
2562 
2563 	err = drbd_submit_peer_request(device, peer_req, op, op_flags,
2564 				       DRBD_FAULT_DT_WR);
2565 	if (!err)
2566 		return 0;
2567 
2568 	/* don't care for the reason here */
2569 	drbd_err(device, "submit failed, triggering re-connect\n");
2570 	spin_lock_irq(&device->resource->req_lock);
2571 	list_del(&peer_req->w.list);
2572 	drbd_remove_epoch_entry_interval(device, peer_req);
2573 	spin_unlock_irq(&device->resource->req_lock);
2574 	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2575 		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2576 		drbd_al_complete_io(device, &peer_req->i);
2577 	}
2578 
2579 out_interrupted:
2580 	drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
2581 	put_ldev(device);
2582 	drbd_free_peer_req(device, peer_req);
2583 	return err;
2584 }
2585 
2586 /* We may throttle resync, if the lower device seems to be busy,
2587  * and current sync rate is above c_min_rate.
2588  *
2589  * To decide whether or not the lower device is busy, we use a scheme similar
2590  * to MD RAID is_mddev_idle(): if the partition stats reveal a "significant"
2591  * amount (more than 64 sectors) of activity we cannot account for with our own resync
2592  * activity, it obviously is "busy".
2593  *
2594  * The current sync rate used here uses only the most recent two step marks,
2595  * to have a short time average so we can react faster.
2596  */
2597 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2598 		bool throttle_if_app_is_waiting)
2599 {
2600 	struct lc_element *tmp;
2601 	bool throttle = drbd_rs_c_min_rate_throttle(device);
2602 
2603 	if (!throttle || throttle_if_app_is_waiting)
2604 		return throttle;
2605 
2606 	spin_lock_irq(&device->al_lock);
2607 	tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2608 	if (tmp) {
2609 		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2610 		if (test_bit(BME_PRIORITY, &bm_ext->flags))
2611 			throttle = false;
2612 		/* Do not slow down if app IO is already waiting for this extent,
2613 		 * and our progress is necessary for application IO to complete. */
2614 	}
2615 	spin_unlock_irq(&device->al_lock);
2616 
2617 	return throttle;
2618 }
2619 
2620 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2621 {
2622 	struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2623 	unsigned long db, dt, dbdt;
2624 	unsigned int c_min_rate;
2625 	int curr_events;
2626 
2627 	rcu_read_lock();
2628 	c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2629 	rcu_read_unlock();
2630 
2631 	/* feature disabled? */
2632 	if (c_min_rate == 0)
2633 		return false;
2634 
2635 	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2636 		      (int)part_stat_read(&disk->part0, sectors[1]) -
2637 			atomic_read(&device->rs_sect_ev);
2638 
2639 	if (atomic_read(&device->ap_actlog_cnt)
2640 	    || curr_events - device->rs_last_events > 64) {
2641 		unsigned long rs_left;
2642 		int i;
2643 
2644 		device->rs_last_events = curr_events;
2645 
2646 		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2647 		 * approx. */
2648 		i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2649 
2650 		if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2651 			rs_left = device->ov_left;
2652 		else
2653 			rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2654 
2655 		dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2656 		if (!dt)
2657 			dt++;
2658 		db = device->rs_mark_left[i] - rs_left;
2659 		dbdt = Bit2KB(db/dt);
2660 
2661 		if (dbdt > c_min_rate)
2662 			return true;
2663 	}
2664 	return false;
2665 }
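
/* Worked example (numbers assumed): if the recent sync marks show
 * db == 30000 bitmap bits cleared over dt == 3 seconds, then
 * db/dt == 10000 bits/s and, with one bitmap bit covering 4 KiB,
 * dbdt == Bit2KB(10000) == 40000 KiB/s.  With c_min_rate configured to,
 * say, 25000 KiB/s this returns true: resync already runs faster than the
 * configured minimum rate and may be throttled. */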
2666 
2667 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2668 {
2669 	struct drbd_peer_device *peer_device;
2670 	struct drbd_device *device;
2671 	sector_t sector;
2672 	sector_t capacity;
2673 	struct drbd_peer_request *peer_req;
2674 	struct digest_info *di = NULL;
2675 	int size, verb;
2676 	unsigned int fault_type;
2677 	struct p_block_req *p =	pi->data;
2678 
2679 	peer_device = conn_peer_device(connection, pi->vnr);
2680 	if (!peer_device)
2681 		return -EIO;
2682 	device = peer_device->device;
2683 	capacity = drbd_get_capacity(device->this_bdev);
2684 
2685 	sector = be64_to_cpu(p->sector);
2686 	size   = be32_to_cpu(p->blksize);
2687 
2688 	if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2689 		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2690 				(unsigned long long)sector, size);
2691 		return -EINVAL;
2692 	}
2693 	if (sector + (size>>9) > capacity) {
2694 		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2695 				(unsigned long long)sector, size);
2696 		return -EINVAL;
2697 	}
2698 
2699 	if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2700 		verb = 1;
2701 		switch (pi->cmd) {
2702 		case P_DATA_REQUEST:
2703 			drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2704 			break;
2705 		case P_RS_THIN_REQ:
2706 		case P_RS_DATA_REQUEST:
2707 		case P_CSUM_RS_REQUEST:
2708 		case P_OV_REQUEST:
2709 			drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2710 			break;
2711 		case P_OV_REPLY:
2712 			verb = 0;
2713 			dec_rs_pending(device);
2714 			drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2715 			break;
2716 		default:
2717 			BUG();
2718 		}
2719 		if (verb && __ratelimit(&drbd_ratelimit_state))
2720 			drbd_err(device, "Can not satisfy peer's read request, "
2721 			    "no local data.\n");
2722 
2723 		/* drain possible payload */
2724 		return drbd_drain_block(peer_device, pi->size);
2725 	}
2726 
2727 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2728 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
2729 	 * which in turn might block on the other node at this very place.  */
2730 	peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2731 			size, GFP_NOIO);
2732 	if (!peer_req) {
2733 		put_ldev(device);
2734 		return -ENOMEM;
2735 	}
2736 
2737 	switch (pi->cmd) {
2738 	case P_DATA_REQUEST:
2739 		peer_req->w.cb = w_e_end_data_req;
2740 		fault_type = DRBD_FAULT_DT_RD;
2741 		/* application IO, don't drbd_rs_begin_io */
2742 		peer_req->flags |= EE_APPLICATION;
2743 		goto submit;
2744 
2745 	case P_RS_THIN_REQ:
2746 		/* If at some point in the future we have a smart way to
2747 		   find out if this data block is completely deallocated,
2748 		   then we would do something smarter here than reading
2749 		   the block... */
2750 		peer_req->flags |= EE_RS_THIN_REQ;
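		/* fall through */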
2751 	case P_RS_DATA_REQUEST:
2752 		peer_req->w.cb = w_e_end_rsdata_req;
2753 		fault_type = DRBD_FAULT_RS_RD;
2754 		/* used in the sector offset progress display */
2755 		device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2756 		break;
2757 
2758 	case P_OV_REPLY:
2759 	case P_CSUM_RS_REQUEST:
2760 		fault_type = DRBD_FAULT_RS_RD;
2761 		di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2762 		if (!di)
2763 			goto out_free_e;
2764 
2765 		di->digest_size = pi->size;
2766 		di->digest = (((char *)di)+sizeof(struct digest_info));
2767 
2768 		peer_req->digest = di;
2769 		peer_req->flags |= EE_HAS_DIGEST;
2770 
2771 		if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2772 			goto out_free_e;
2773 
2774 		if (pi->cmd == P_CSUM_RS_REQUEST) {
2775 			D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2776 			peer_req->w.cb = w_e_end_csum_rs_req;
2777 			/* used in the sector offset progress display */
2778 			device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2779 			/* remember to report stats in drbd_resync_finished */
2780 			device->use_csums = true;
2781 		} else if (pi->cmd == P_OV_REPLY) {
2782 			/* track progress, we may need to throttle */
2783 			atomic_add(size >> 9, &device->rs_sect_in);
2784 			peer_req->w.cb = w_e_end_ov_reply;
2785 			dec_rs_pending(device);
2786 			/* drbd_rs_begin_io done when we sent this request,
2787 			 * but accounting still needs to be done. */
2788 			goto submit_for_resync;
2789 		}
2790 		break;
2791 
2792 	case P_OV_REQUEST:
2793 		if (device->ov_start_sector == ~(sector_t)0 &&
2794 		    peer_device->connection->agreed_pro_version >= 90) {
2795 			unsigned long now = jiffies;
2796 			int i;
2797 			device->ov_start_sector = sector;
2798 			device->ov_position = sector;
2799 			device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2800 			device->rs_total = device->ov_left;
2801 			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2802 				device->rs_mark_left[i] = device->ov_left;
2803 				device->rs_mark_time[i] = now;
2804 			}
2805 			drbd_info(device, "Online Verify start sector: %llu\n",
2806 					(unsigned long long)sector);
2807 		}
2808 		peer_req->w.cb = w_e_end_ov_req;
2809 		fault_type = DRBD_FAULT_RS_RD;
2810 		break;
2811 
2812 	default:
2813 		BUG();
2814 	}
2815 
2816 	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
2817 	 * wrt the receiver, but it is not as straightforward as it may seem.
2818 	 * Various places in the resync start and stop logic assume resync
2819 	 * requests are processed in order, requeuing this on the worker thread
2820 	 * introduces a bunch of new code for synchronization between threads.
2821 	 *
2822 	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2823 	 * "forever", throttling after drbd_rs_begin_io will lock that extent
2824 	 * for application writes for the same time.  For now, just throttle
2825 	 * here, where the rest of the code expects the receiver to sleep for
2826 	 * a while, anyways.
2827 	 */
2828 
2829 	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
2830 	 * this defers syncer requests for some time, before letting at least
2831 	 * one request through.  The resync controller on the receiving side
2832 	 * will adapt to the incoming rate accordingly.
2833 	 *
2834 	 * We cannot throttle here if remote is Primary/SyncTarget:
2835 	 * we would also throttle its application reads.
2836 	 * In that case, throttling is done on the SyncTarget only.
2837 	 */
2838 
2839 	/* Even though this may be a resync request, we do add to "read_ee";
2840 	 * "sync_ee" is only used for resync WRITEs.
2841 	 * Add to list early, so debugfs can find this request
2842 	 * even if we have to sleep below. */
2843 	spin_lock_irq(&device->resource->req_lock);
2844 	list_add_tail(&peer_req->w.list, &device->read_ee);
2845 	spin_unlock_irq(&device->resource->req_lock);
2846 
2847 	update_receiver_timing_details(connection, drbd_rs_should_slow_down);
2848 	if (device->state.peer != R_PRIMARY
2849 	&& drbd_rs_should_slow_down(device, sector, false))
2850 		schedule_timeout_uninterruptible(HZ/10);
2851 	update_receiver_timing_details(connection, drbd_rs_begin_io);
2852 	if (drbd_rs_begin_io(device, sector))
2853 		goto out_free_e;
2854 
2855 submit_for_resync:
2856 	atomic_add(size >> 9, &device->rs_sect_ev);
2857 
2858 submit:
2859 	update_receiver_timing_details(connection, drbd_submit_peer_request);
2860 	inc_unacked(device);
2861 	if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
2862 				     fault_type) == 0)
2863 		return 0;
2864 
2865 	/* don't care for the reason here */
2866 	drbd_err(device, "submit failed, triggering re-connect\n");
2867 
2868 out_free_e:
2869 	spin_lock_irq(&device->resource->req_lock);
2870 	list_del(&peer_req->w.list);
2871 	spin_unlock_irq(&device->resource->req_lock);
2872 	/* no drbd_rs_complete_io(), we are dropping the connection anyways */
2873 
2874 	put_ldev(device);
2875 	drbd_free_peer_req(device, peer_req);
2876 	return -EIO;
2877 }
2878 
2879 /**
2880  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
2881  */
2882 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2883 {
2884 	struct drbd_device *device = peer_device->device;
2885 	int self, peer, rv = -100;
2886 	unsigned long ch_self, ch_peer;
2887 	enum drbd_after_sb_p after_sb_0p;
2888 
2889 	self = device->ldev->md.uuid[UI_BITMAP] & 1;
2890 	peer = device->p_uuid[UI_BITMAP] & 1;
2891 
2892 	ch_peer = device->p_uuid[UI_SIZE];
2893 	ch_self = device->comm_bm_set;
2894 
2895 	rcu_read_lock();
2896 	after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2897 	rcu_read_unlock();
2898 	switch (after_sb_0p) {
2899 	case ASB_CONSENSUS:
2900 	case ASB_DISCARD_SECONDARY:
2901 	case ASB_CALL_HELPER:
2902 	case ASB_VIOLENTLY:
2903 		drbd_err(device, "Configuration error.\n");
2904 		break;
2905 	case ASB_DISCONNECT:
2906 		break;
2907 	case ASB_DISCARD_YOUNGER_PRI:
2908 		if (self == 0 && peer == 1) {
2909 			rv = -1;
2910 			break;
2911 		}
2912 		if (self == 1 && peer == 0) {
2913 			rv =  1;
2914 			break;
2915 		}
2916 		/* Else fall through to one of the other strategies... */
2917 	case ASB_DISCARD_OLDER_PRI:
2918 		if (self == 0 && peer == 1) {
2919 			rv = 1;
2920 			break;
2921 		}
2922 		if (self == 1 && peer == 0) {
2923 			rv = -1;
2924 			break;
2925 		}
2926 		/* Else fall through to one of the other strategies... */
2927 		drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2928 		     "Using discard-least-changes instead\n");
2929 	case ASB_DISCARD_ZERO_CHG:
2930 		if (ch_peer == 0 && ch_self == 0) {
2931 			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2932 				? -1 : 1;
2933 			break;
2934 		} else {
2935 			if (ch_peer == 0) { rv =  1; break; }
2936 			if (ch_self == 0) { rv = -1; break; }
2937 		}
2938 		if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2939 			break;
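		/* else: fall through to discard-least-changes */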
2940 	case ASB_DISCARD_LEAST_CHG:
2941 		if	(ch_self < ch_peer)
2942 			rv = -1;
2943 		else if (ch_self > ch_peer)
2944 			rv =  1;
2945 		else /* ( ch_self == ch_peer ) */
2946 		     /* Well, then use something else. */
2947 			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2948 				? -1 : 1;
2949 		break;
2950 	case ASB_DISCARD_LOCAL:
2951 		rv = -1;
2952 		break;
2953 	case ASB_DISCARD_REMOTE:
2954 		rv =  1;
2955 	}
2956 
2957 	return rv;
2958 }
2959 
2960 /**
2961  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
2962  */
2963 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
2964 {
2965 	struct drbd_device *device = peer_device->device;
2966 	int hg, rv = -100;
2967 	enum drbd_after_sb_p after_sb_1p;
2968 
2969 	rcu_read_lock();
2970 	after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
2971 	rcu_read_unlock();
2972 	switch (after_sb_1p) {
2973 	case ASB_DISCARD_YOUNGER_PRI:
2974 	case ASB_DISCARD_OLDER_PRI:
2975 	case ASB_DISCARD_LEAST_CHG:
2976 	case ASB_DISCARD_LOCAL:
2977 	case ASB_DISCARD_REMOTE:
2978 	case ASB_DISCARD_ZERO_CHG:
2979 		drbd_err(device, "Configuration error.\n");
2980 		break;
2981 	case ASB_DISCONNECT:
2982 		break;
2983 	case ASB_CONSENSUS:
2984 		hg = drbd_asb_recover_0p(peer_device);
2985 		if (hg == -1 && device->state.role == R_SECONDARY)
2986 			rv = hg;
2987 		if (hg == 1  && device->state.role == R_PRIMARY)
2988 			rv = hg;
2989 		break;
2990 	case ASB_VIOLENTLY:
2991 		rv = drbd_asb_recover_0p(peer_device);
2992 		break;
2993 	case ASB_DISCARD_SECONDARY:
2994 		return device->state.role == R_PRIMARY ? 1 : -1;
2995 	case ASB_CALL_HELPER:
2996 		hg = drbd_asb_recover_0p(peer_device);
2997 		if (hg == -1 && device->state.role == R_PRIMARY) {
2998 			enum drbd_state_rv rv2;
2999 
3000 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3001 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
3002 			  * we do not need to wait for the after state change work either. */
3003 			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3004 			if (rv2 != SS_SUCCESS) {
3005 				drbd_khelper(device, "pri-lost-after-sb");
3006 			} else {
3007 				drbd_warn(device, "Successfully gave up primary role.\n");
3008 				rv = hg;
3009 			}
3010 		} else
3011 			rv = hg;
3012 	}
3013 
3014 	return rv;
3015 }
3016 
3017 /**
3018  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
3019  */
3020 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
3021 {
3022 	struct drbd_device *device = peer_device->device;
3023 	int hg, rv = -100;
3024 	enum drbd_after_sb_p after_sb_2p;
3025 
3026 	rcu_read_lock();
3027 	after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
3028 	rcu_read_unlock();
3029 	switch (after_sb_2p) {
3030 	case ASB_DISCARD_YOUNGER_PRI:
3031 	case ASB_DISCARD_OLDER_PRI:
3032 	case ASB_DISCARD_LEAST_CHG:
3033 	case ASB_DISCARD_LOCAL:
3034 	case ASB_DISCARD_REMOTE:
3035 	case ASB_CONSENSUS:
3036 	case ASB_DISCARD_SECONDARY:
3037 	case ASB_DISCARD_ZERO_CHG:
3038 		drbd_err(device, "Configuration error.\n");
3039 		break;
3040 	case ASB_VIOLENTLY:
3041 		rv = drbd_asb_recover_0p(peer_device);
3042 		break;
3043 	case ASB_DISCONNECT:
3044 		break;
3045 	case ASB_CALL_HELPER:
3046 		hg = drbd_asb_recover_0p(peer_device);
3047 		if (hg == -1) {
3048 			enum drbd_state_rv rv2;
3049 
3050 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3051 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
3052 			  * we do not need to wait for the after state change work either. */
3053 			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3054 			if (rv2 != SS_SUCCESS) {
3055 				drbd_khelper(device, "pri-lost-after-sb");
3056 			} else {
3057 				drbd_warn(device, "Successfully gave up primary role.\n");
3058 				rv = hg;
3059 			}
3060 		} else
3061 			rv = hg;
3062 	}
3063 
3064 	return rv;
3065 }
3066 
3067 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
3068 			   u64 bits, u64 flags)
3069 {
3070 	if (!uuid) {
3071 		drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
3072 		return;
3073 	}
3074 	drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
3075 	     text,
3076 	     (unsigned long long)uuid[UI_CURRENT],
3077 	     (unsigned long long)uuid[UI_BITMAP],
3078 	     (unsigned long long)uuid[UI_HISTORY_START],
3079 	     (unsigned long long)uuid[UI_HISTORY_END],
3080 	     (unsigned long long)bits,
3081 	     (unsigned long long)flags);
3082 }
3083 
3084 /*
3085   100	after split brain try auto recover
3086     2	C_SYNC_SOURCE set BitMap
3087     1	C_SYNC_SOURCE use BitMap
3088     0	no Sync
3089    -1	C_SYNC_TARGET use BitMap
3090    -2	C_SYNC_TARGET set BitMap
3091  -100	after split brain, disconnect
3092 -1000	unrelated data
3093 -1091   requires proto 91
3094 -1096   requires proto 96
3095  */
3096 
3097 static int drbd_uuid_compare(struct drbd_device *const device, enum drbd_role const peer_role, int *rule_nr) __must_hold(local)
3098 {
3099 	struct drbd_peer_device *const peer_device = first_peer_device(device);
3100 	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
3101 	u64 self, peer;
3102 	int i, j;
3103 
3104 	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3105 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3106 
3107 	*rule_nr = 10;
3108 	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
3109 		return 0;
3110 
3111 	*rule_nr = 20;
3112 	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
3113 	     peer != UUID_JUST_CREATED)
3114 		return -2;
3115 
3116 	*rule_nr = 30;
3117 	if (self != UUID_JUST_CREATED &&
3118 	    (peer == UUID_JUST_CREATED || peer == (u64)0))
3119 		return 2;
3120 
3121 	if (self == peer) {
3122 		int rct, dc; /* roles at crash time */
3123 
3124 		if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
3125 
3126 			if (connection->agreed_pro_version < 91)
3127 				return -1091;
3128 
3129 			if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
3130 			    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
3131 				drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
3132 				drbd_uuid_move_history(device);
3133 				device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
3134 				device->ldev->md.uuid[UI_BITMAP] = 0;
3135 
3136 				drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3137 					       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3138 				*rule_nr = 34;
3139 			} else {
3140 				drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
3141 				*rule_nr = 36;
3142 			}
3143 
3144 			return 1;
3145 		}
3146 
3147 		if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
3148 
3149 			if (connection->agreed_pro_version < 91)
3150 				return -1091;
3151 
3152 			if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
3153 			    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
3154 				drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
3155 
3156 				device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
3157 				device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
3158 				device->p_uuid[UI_BITMAP] = 0UL;
3159 
3160 				drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3161 				*rule_nr = 35;
3162 			} else {
3163 				drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3164 				*rule_nr = 37;
3165 			}
3166 
3167 			return -1;
3168 		}
3169 
3170 		/* Common power [off|failure] */
3171 		rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3172 			(device->p_uuid[UI_FLAGS] & 2);
3173 		/* lowest bit is set when we were primary,
3174 		 * next bit (weight 2) is set when peer was primary */
3175 		*rule_nr = 40;
3176 
3177 		/* Neither has the "crashed primary" flag set,
3178 		 * only a replication link hiccup. */
3179 		if (rct == 0)
3180 			return 0;
3181 
3182 		/* Current UUID equal and no bitmap uuid; does not necessarily
3183 		 * mean this was a "simultaneous hard crash", maybe IO was
3184 		 * frozen, so no UUID-bump happened.
3185 		 * This is a protocol change, overload DRBD_FF_WSAME as flag
3186 		 * for "new-enough" peer DRBD version. */
3187 		if (device->state.role == R_PRIMARY || peer_role == R_PRIMARY) {
3188 			*rule_nr = 41;
3189 			if (!(connection->agreed_features & DRBD_FF_WSAME)) {
3190 				drbd_warn(peer_device, "Equivalent unrotated UUIDs, but current primary present.\n");
3191 				return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8));
3192 			}
3193 			if (device->state.role == R_PRIMARY && peer_role == R_PRIMARY) {
3194 				/* At least one has the "crashed primary" bit set,
3195 				 * both are primary now, but neither has rotated its UUIDs?
3196 				 * "Can not happen." */
3197 				drbd_err(peer_device, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n");
3198 				return -100;
3199 			}
3200 			if (device->state.role == R_PRIMARY)
3201 				return 1;
3202 			return -1;
3203 		}
3204 
3205 		/* Both are secondary.
3206 		 * Really looks like recovery from simultaneous hard crash.
3207 		 * Check which had been primary before, and arbitrate. */
3208 		switch (rct) {
3209 		case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */
3210 		case 1: /*  self_pri && !peer_pri */ return 1;
3211 		case 2: /* !self_pri &&  peer_pri */ return -1;
3212 		case 3: /*  self_pri &&  peer_pri */
3213 			dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3214 			return dc ? -1 : 1;
3215 		}
3216 	}
3217 
3218 	*rule_nr = 50;
3219 	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3220 	if (self == peer)
3221 		return -1;
3222 
3223 	*rule_nr = 51;
3224 	peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3225 	if (self == peer) {
3226 		if (connection->agreed_pro_version < 96 ?
3227 		    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3228 		    (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3229 		    peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3230 			/* The last P_SYNC_UUID did not get through. Undo the modifications
3231 			   the peer made to its UUIDs when it last started a resync as sync source. */
3232 
3233 			if (connection->agreed_pro_version < 91)
3234 				return -1091;
3235 
3236 			device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3237 			device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3238 
3239 			drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3240 			drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3241 
3242 			return -1;
3243 		}
3244 	}
3245 
3246 	*rule_nr = 60;
3247 	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3248 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3249 		peer = device->p_uuid[i] & ~((u64)1);
3250 		if (self == peer)
3251 			return -2;
3252 	}
3253 
3254 	*rule_nr = 70;
3255 	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3256 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3257 	if (self == peer)
3258 		return 1;
3259 
3260 	*rule_nr = 71;
3261 	self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3262 	if (self == peer) {
3263 		if (connection->agreed_pro_version < 96 ?
3264 		    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3265 		    (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3266 		    self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3267 			/* The last P_SYNC_UUID did not get through. Undo the modifications
3268 			   we made to our UUIDs when we last started a resync as sync source. */
3269 
3270 			if (connection->agreed_pro_version < 91)
3271 				return -1091;
3272 
3273 			__drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3274 			__drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3275 
3276 			drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3277 			drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3278 				       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3279 
3280 			return 1;
3281 		}
3282 	}
3283 
3284 
3285 	*rule_nr = 80;
3286 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3287 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3288 		self = device->ldev->md.uuid[i] & ~((u64)1);
3289 		if (self == peer)
3290 			return 2;
3291 	}
3292 
3293 	*rule_nr = 90;
3294 	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3295 	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3296 	if (self == peer && self != ((u64)0))
3297 		return 100;
3298 
3299 	*rule_nr = 100;
3300 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3301 		self = device->ldev->md.uuid[i] & ~((u64)1);
3302 		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3303 			peer = device->p_uuid[j] & ~((u64)1);
3304 			if (self == peer)
3305 				return -100;
3306 		}
3307 	}
3308 
3309 	return -1000;
3310 }
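
/* Informal reading example for the rule table above: rule 70 fires when our
 * bitmap UUID equals the peer's current UUID, i.e. we started tracking
 * changes against exactly the data generation the peer still carries, so we
 * become C_SYNC_SOURCE using our bitmap (return 1).  Rule 50 is the mirror
 * image as seen from the peer's side and makes us C_SYNC_TARGET (return -1). */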
3311 
3312 /* drbd_sync_handshake() returns the new conn state on success, or
3313    C_MASK (-1) on failure.
3314  */
3315 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3316 					   enum drbd_role peer_role,
3317 					   enum drbd_disk_state peer_disk) __must_hold(local)
3318 {
3319 	struct drbd_device *device = peer_device->device;
3320 	enum drbd_conns rv = C_MASK;
3321 	enum drbd_disk_state mydisk;
3322 	struct net_conf *nc;
3323 	int hg, rule_nr, rr_conflict, tentative;
3324 
3325 	mydisk = device->state.disk;
3326 	if (mydisk == D_NEGOTIATING)
3327 		mydisk = device->new_state_tmp.disk;
3328 
3329 	drbd_info(device, "drbd_sync_handshake:\n");
3330 
3331 	spin_lock_irq(&device->ldev->md.uuid_lock);
3332 	drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3333 	drbd_uuid_dump(device, "peer", device->p_uuid,
3334 		       device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3335 
3336 	hg = drbd_uuid_compare(device, peer_role, &rule_nr);
3337 	spin_unlock_irq(&device->ldev->md.uuid_lock);
3338 
3339 	drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3340 
3341 	if (hg == -1000) {
3342 		drbd_alert(device, "Unrelated data, aborting!\n");
3343 		return C_MASK;
3344 	}
3345 	if (hg < -0x10000) {
3346 		int proto, fflags;
3347 		hg = -hg;
3348 		proto = hg & 0xff;
3349 		fflags = (hg >> 8) & 0xff;
3350 		drbd_alert(device, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n",
3351 					proto, fflags);
3352 		return C_MASK;
3353 	}
3354 	if (hg < -1000) {
3355 		drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3356 		return C_MASK;
3357 	}
3358 
3359 	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3360 	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3361 		int f = (hg == -100) || abs(hg) == 2;
3362 		hg = mydisk > D_INCONSISTENT ? 1 : -1;
3363 		if (f)
3364 			hg = hg*2;
3365 		drbd_info(device, "Becoming sync %s due to disk states.\n",
3366 		     hg > 0 ? "source" : "target");
3367 	}
3368 
3369 	if (abs(hg) == 100)
3370 		drbd_khelper(device, "initial-split-brain");
3371 
3372 	rcu_read_lock();
3373 	nc = rcu_dereference(peer_device->connection->net_conf);
3374 
3375 	if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3376 		int pcount = (device->state.role == R_PRIMARY)
3377 			   + (peer_role == R_PRIMARY);
3378 		int forced = (hg == -100);
3379 
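		/* pcount: how many of the two nodes are currently Primary;
		 * pick the matching after-split-brain auto recovery policy.
		 * "forced" means the UUIDs were ambiguous (hg == -100) and we
		 * only try auto recovery because always-asbp is set, so any
		 * automatic resolution requires a full sync. */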
3380 		switch (pcount) {
3381 		case 0:
3382 			hg = drbd_asb_recover_0p(peer_device);
3383 			break;
3384 		case 1:
3385 			hg = drbd_asb_recover_1p(peer_device);
3386 			break;
3387 		case 2:
3388 			hg = drbd_asb_recover_2p(peer_device);
3389 			break;
3390 		}
3391 		if (abs(hg) < 100) {
3392 			drbd_warn(device, "Split-Brain detected, %d primaries, "
3393 			     "automatically solved. Sync from %s node\n",
3394 			     pcount, (hg < 0) ? "peer" : "this");
3395 			if (forced) {
3396 				drbd_warn(device, "Doing a full sync, since"
3397 				     " UUIDs were ambiguous.\n");
3398 				hg = hg*2;
3399 			}
3400 		}
3401 	}
3402 
3403 	if (hg == -100) {
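		/* Still unresolved: honor an explicit discard-my-data setting.
		 * Bit 0 of the peer's UI_FLAGS mirrors the peer's
		 * discard-my-data flag. */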
3404 		if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3405 			hg = -1;
3406 		if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3407 			hg = 1;
3408 
3409 		if (abs(hg) < 100)
3410 			drbd_warn(device, "Split-Brain detected, manually solved. "
3411 			     "Sync from %s node\n",
3412 			     (hg < 0) ? "peer" : "this");
3413 	}
3414 	rr_conflict = nc->rr_conflict;
3415 	tentative = nc->tentative;
3416 	rcu_read_unlock();
3417 
3418 	if (hg == -100) {
3419 		/* FIXME this log message is not correct if we end up here
3420 		 * after an attempted attach on a diskless node.
3421 		 * We just refuse to attach -- well, we drop the "connection"
3422 		 * to that disk, in a way... */
3423 		drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3424 		drbd_khelper(device, "split-brain");
3425 		return C_MASK;
3426 	}
3427 
3428 	if (hg > 0 && mydisk <= D_INCONSISTENT) {
3429 		drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3430 		return C_MASK;
3431 	}
3432 
3433 	if (hg < 0 && /* by intention we do not use mydisk here. */
3434 	    device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3435 		switch (rr_conflict) {
3436 		case ASB_CALL_HELPER:
3437 			drbd_khelper(device, "pri-lost");
3438 			/* fall through */
3439 		case ASB_DISCONNECT:
3440 			drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3441 			return C_MASK;
3442 		case ASB_VIOLENTLY:
3443 			drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3444 			     " assumption\n");
3445 		}
3446 	}
3447 
3448 	if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3449 		if (hg == 0)
3450 			drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3451 		else
3452 			drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3453 				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3454 				 abs(hg) >= 2 ? "full" : "bit-map based");
3455 		return C_MASK;
3456 	}
3457 
3458 	if (abs(hg) >= 2) {
3459 		drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3460 		if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3461 					BM_LOCKED_SET_ALLOWED))
3462 			return C_MASK;
3463 	}
3464 
3465 	if (hg > 0) { /* become sync source. */
3466 		rv = C_WF_BITMAP_S;
3467 	} else if (hg < 0) { /* become sync target */
3468 		rv = C_WF_BITMAP_T;
3469 	} else {
3470 		rv = C_CONNECTED;
3471 		if (drbd_bm_total_weight(device)) {
3472 			drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3473 			     drbd_bm_total_weight(device));
3474 		}
3475 	}
3476 
3477 	return rv;
3478 }
3479 
3480 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3481 {
3482 	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3483 	if (peer == ASB_DISCARD_REMOTE)
3484 		return ASB_DISCARD_LOCAL;
3485 
3486 	/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3487 	if (peer == ASB_DISCARD_LOCAL)
3488 		return ASB_DISCARD_REMOTE;
3489 
3490 	/* everything else is valid if they are equal on both sides. */
3491 	return peer;
3492 }
3493 
3494 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3495 {
3496 	struct p_protocol *p = pi->data;
3497 	enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3498 	int p_proto, p_discard_my_data, p_two_primaries, cf;
3499 	struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3500 	char integrity_alg[SHARED_SECRET_MAX] = "";
3501 	struct crypto_ahash *peer_integrity_tfm = NULL;
3502 	void *int_dig_in = NULL, *int_dig_vv = NULL;
3503 
3504 	p_proto		= be32_to_cpu(p->protocol);
3505 	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
3506 	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
3507 	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
3508 	p_two_primaries = be32_to_cpu(p->two_primaries);
3509 	cf		= be32_to_cpu(p->conn_flags);
3510 	p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3511 
3512 	if (connection->agreed_pro_version >= 87) {
3513 		int err;
3514 
3515 		if (pi->size > sizeof(integrity_alg))
3516 			return -EIO;
3517 		err = drbd_recv_all(connection, integrity_alg, pi->size);
3518 		if (err)
3519 			return err;
3520 		integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3521 	}
3522 
3523 	if (pi->cmd != P_PROTOCOL_UPDATE) {
3524 		clear_bit(CONN_DRY_RUN, &connection->flags);
3525 
3526 		if (cf & CF_DRY_RUN)
3527 			set_bit(CONN_DRY_RUN, &connection->flags);
3528 
3529 		rcu_read_lock();
3530 		nc = rcu_dereference(connection->net_conf);
3531 
3532 		if (p_proto != nc->wire_protocol) {
3533 			drbd_err(connection, "incompatible %s settings\n", "protocol");
3534 			goto disconnect_rcu_unlock;
3535 		}
3536 
3537 		if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3538 			drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3539 			goto disconnect_rcu_unlock;
3540 		}
3541 
3542 		if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3543 			drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3544 			goto disconnect_rcu_unlock;
3545 		}
3546 
3547 		if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3548 			drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3549 			goto disconnect_rcu_unlock;
3550 		}
3551 
3552 		if (p_discard_my_data && nc->discard_my_data) {
3553 			drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3554 			goto disconnect_rcu_unlock;
3555 		}
3556 
3557 		if (p_two_primaries != nc->two_primaries) {
3558 			drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3559 			goto disconnect_rcu_unlock;
3560 		}
3561 
3562 		if (strcmp(integrity_alg, nc->integrity_alg)) {
3563 			drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3564 			goto disconnect_rcu_unlock;
3565 		}
3566 
3567 		rcu_read_unlock();
3568 	}
3569 
3570 	if (integrity_alg[0]) {
3571 		int hash_size;
3572 
3573 		/*
3574 		 * We can only change the peer data integrity algorithm
3575 		 * here.  Changing our own data integrity algorithm
3576 		 * requires that we send a P_PROTOCOL_UPDATE packet at
3577 		 * the same time; otherwise, the peer has no way to
3578 		 * tell at which point in the packet stream the
3579 		 * algorithm changes.
3580 		 */
3581 
3582 		peer_integrity_tfm = crypto_alloc_ahash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3583 		if (IS_ERR(peer_integrity_tfm)) {
3584 			peer_integrity_tfm = NULL;
3585 			drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3586 				 integrity_alg);
3587 			goto disconnect;
3588 		}
3589 
3590 		hash_size = crypto_ahash_digestsize(peer_integrity_tfm);
3591 		int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3592 		int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3593 		if (!(int_dig_in && int_dig_vv)) {
3594 			drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3595 			goto disconnect;
3596 		}
3597 	}
3598 
3599 	new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3600 	if (!new_net_conf) {
3601 		drbd_err(connection, "Allocation of new net_conf failed\n");
3602 		goto disconnect;
3603 	}
3604 
3605 	mutex_lock(&connection->data.mutex);
3606 	mutex_lock(&connection->resource->conf_update);
3607 	old_net_conf = connection->net_conf;
3608 	*new_net_conf = *old_net_conf;
3609 
3610 	new_net_conf->wire_protocol = p_proto;
3611 	new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3612 	new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3613 	new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3614 	new_net_conf->two_primaries = p_two_primaries;
3615 
3616 	rcu_assign_pointer(connection->net_conf, new_net_conf);
3617 	mutex_unlock(&connection->resource->conf_update);
3618 	mutex_unlock(&connection->data.mutex);
3619 
3620 	crypto_free_ahash(connection->peer_integrity_tfm);
3621 	kfree(connection->int_dig_in);
3622 	kfree(connection->int_dig_vv);
3623 	connection->peer_integrity_tfm = peer_integrity_tfm;
3624 	connection->int_dig_in = int_dig_in;
3625 	connection->int_dig_vv = int_dig_vv;
3626 
3627 	if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3628 		drbd_info(connection, "peer data-integrity-alg: %s\n",
3629 			  integrity_alg[0] ? integrity_alg : "(none)");
3630 
3631 	synchronize_rcu();
3632 	kfree(old_net_conf);
3633 	return 0;
3634 
3635 disconnect_rcu_unlock:
3636 	rcu_read_unlock();
3637 disconnect:
3638 	crypto_free_ahash(peer_integrity_tfm);
3639 	kfree(int_dig_in);
3640 	kfree(int_dig_vv);
3641 	conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3642 	return -EIO;
3643 }
3644 
3645 /* helper function
3646  * input: alg name, feature name
3647  * return: NULL (alg name was "")
3648  *         ERR_PTR(error) if something goes wrong
3649  *         or the crypto hash ptr, if it worked out ok. */
3650 static struct crypto_ahash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3651 		const char *alg, const char *name)
3652 {
3653 	struct crypto_ahash *tfm;
3654 
3655 	if (!alg[0])
3656 		return NULL;
3657 
3658 	tfm = crypto_alloc_ahash(alg, 0, CRYPTO_ALG_ASYNC);
3659 	if (IS_ERR(tfm)) {
3660 		drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3661 			alg, name, PTR_ERR(tfm));
3662 		return tfm;
3663 	}
3664 	return tfm;
3665 }
3666 
3667 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3668 {
3669 	void *buffer = connection->data.rbuf;
3670 	int size = pi->size;
3671 
3672 	while (size) {
3673 		int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3674 		s = drbd_recv(connection, buffer, s);
3675 		if (s <= 0) {
3676 			if (s < 0)
3677 				return s;
3678 			break;
3679 		}
3680 		size -= s;
3681 	}
3682 	if (size)
3683 		return -EIO;
3684 	return 0;
3685 }
3686 
3687 /*
3688  * config_unknown_volume  -  device configuration command for unknown volume
3689  *
3690  * When a device is added to an existing connection, the node on which the
3691  * device is added first will send configuration commands to its peer but the
3692  * peer will not know about the device yet.  It will warn and ignore these
3693  * commands.  Once the device is added on the second node, the second node will
3694  * send the same device configuration commands, but in the other direction.
3695  *
3696  * (We can also end up here if drbd is misconfigured.)
3697  */
3698 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3699 {
3700 	drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3701 		  cmdname(pi->cmd), pi->vnr);
3702 	return ignore_remaining_packet(connection, pi);
3703 }
3704 
3705 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3706 {
3707 	struct drbd_peer_device *peer_device;
3708 	struct drbd_device *device;
3709 	struct p_rs_param_95 *p;
3710 	unsigned int header_size, data_size, exp_max_sz;
3711 	struct crypto_ahash *verify_tfm = NULL;
3712 	struct crypto_ahash *csums_tfm = NULL;
3713 	struct net_conf *old_net_conf, *new_net_conf = NULL;
3714 	struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3715 	const int apv = connection->agreed_pro_version;
3716 	struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3717 	int fifo_size = 0;
3718 	int err;
3719 
3720 	peer_device = conn_peer_device(connection, pi->vnr);
3721 	if (!peer_device)
3722 		return config_unknown_volume(connection, pi);
3723 	device = peer_device->device;
3724 
3725 	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3726 		    : apv == 88 ? sizeof(struct p_rs_param)
3727 					+ SHARED_SECRET_MAX
3728 		    : apv <= 94 ? sizeof(struct p_rs_param_89)
3729 		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3730 
3731 	if (pi->size > exp_max_sz) {
3732 		drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3733 		    pi->size, exp_max_sz);
3734 		return -EIO;
3735 	}
3736 
3737 	if (apv <= 88) {
3738 		header_size = sizeof(struct p_rs_param);
3739 		data_size = pi->size - header_size;
3740 	} else if (apv <= 94) {
3741 		header_size = sizeof(struct p_rs_param_89);
3742 		data_size = pi->size - header_size;
3743 		D_ASSERT(device, data_size == 0);
3744 	} else {
3745 		header_size = sizeof(struct p_rs_param_95);
3746 		data_size = pi->size - header_size;
3747 		D_ASSERT(device, data_size == 0);
3748 	}
3749 
3750 	/* initialize verify_alg and csums_alg */
3751 	p = pi->data;
3752 	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3753 
3754 	err = drbd_recv_all(peer_device->connection, p, header_size);
3755 	if (err)
3756 		return err;
3757 
3758 	mutex_lock(&connection->resource->conf_update);
3759 	old_net_conf = peer_device->connection->net_conf;
3760 	if (get_ldev(device)) {
3761 		new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3762 		if (!new_disk_conf) {
3763 			put_ldev(device);
3764 			mutex_unlock(&connection->resource->conf_update);
3765 			drbd_err(device, "Allocation of new disk_conf failed\n");
3766 			return -ENOMEM;
3767 		}
3768 
3769 		old_disk_conf = device->ldev->disk_conf;
3770 		*new_disk_conf = *old_disk_conf;
3771 
3772 		new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3773 	}
3774 
3775 	if (apv >= 88) {
3776 		if (apv == 88) {
3777 			if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3778 				drbd_err(device, "verify-alg of wrong size, "
3779 					"peer wants %u, accepting only up to %u bytes\n",
3780 					data_size, SHARED_SECRET_MAX);
3781 				err = -EIO;
3782 				goto reconnect;
3783 			}
3784 
3785 			err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3786 			if (err)
3787 				goto reconnect;
3788 			/* we expect NUL terminated string */
3789 			/* but just in case someone tries to be evil */
3790 			D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3791 			p->verify_alg[data_size-1] = 0;
3792 
3793 		} else /* apv >= 89 */ {
3794 			/* we still expect NUL terminated strings */
3795 			/* but just in case someone tries to be evil */
3796 			D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3797 			D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3798 			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3799 			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3800 		}
3801 
3802 		if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3803 			if (device->state.conn == C_WF_REPORT_PARAMS) {
3804 				drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3805 				    old_net_conf->verify_alg, p->verify_alg);
3806 				goto disconnect;
3807 			}
3808 			verify_tfm = drbd_crypto_alloc_digest_safe(device,
3809 					p->verify_alg, "verify-alg");
3810 			if (IS_ERR(verify_tfm)) {
3811 				verify_tfm = NULL;
3812 				goto disconnect;
3813 			}
3814 		}
3815 
3816 		if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3817 			if (device->state.conn == C_WF_REPORT_PARAMS) {
3818 				drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3819 				    old_net_conf->csums_alg, p->csums_alg);
3820 				goto disconnect;
3821 			}
3822 			csums_tfm = drbd_crypto_alloc_digest_safe(device,
3823 					p->csums_alg, "csums-alg");
3824 			if (IS_ERR(csums_tfm)) {
3825 				csums_tfm = NULL;
3826 				goto disconnect;
3827 			}
3828 		}
3829 
3830 		if (apv > 94 && new_disk_conf) {
3831 			new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3832 			new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3833 			new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3834 			new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3835 
3836 			fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3837 			if (fifo_size != device->rs_plan_s->size) {
3838 				new_plan = fifo_alloc(fifo_size);
3839 				if (!new_plan) {
3840 					drbd_err(device, "kmalloc of fifo_buffer failed");
3841 					put_ldev(device);
3842 					goto disconnect;
3843 				}
3844 			}
3845 		}
3846 
3847 		if (verify_tfm || csums_tfm) {
3848 			new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3849 			if (!new_net_conf) {
3850 				drbd_err(device, "Allocation of new net_conf failed\n");
3851 				goto disconnect;
3852 			}
3853 
3854 			*new_net_conf = *old_net_conf;
3855 
3856 			if (verify_tfm) {
3857 				strcpy(new_net_conf->verify_alg, p->verify_alg);
3858 				new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3859 				crypto_free_ahash(peer_device->connection->verify_tfm);
3860 				peer_device->connection->verify_tfm = verify_tfm;
3861 				drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3862 			}
3863 			if (csums_tfm) {
3864 				strcpy(new_net_conf->csums_alg, p->csums_alg);
3865 				new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3866 				crypto_free_ahash(peer_device->connection->csums_tfm);
3867 				peer_device->connection->csums_tfm = csums_tfm;
3868 				drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3869 			}
3870 			rcu_assign_pointer(connection->net_conf, new_net_conf);
3871 		}
3872 	}
3873 
3874 	if (new_disk_conf) {
3875 		rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3876 		put_ldev(device);
3877 	}
3878 
3879 	if (new_plan) {
3880 		old_plan = device->rs_plan_s;
3881 		rcu_assign_pointer(device->rs_plan_s, new_plan);
3882 	}
3883 
3884 	mutex_unlock(&connection->resource->conf_update);
3885 	synchronize_rcu();
3886 	if (new_net_conf)
3887 		kfree(old_net_conf);
3888 	kfree(old_disk_conf);
3889 	kfree(old_plan);
3890 
3891 	return 0;
3892 
3893 reconnect:
3894 	if (new_disk_conf) {
3895 		put_ldev(device);
3896 		kfree(new_disk_conf);
3897 	}
3898 	mutex_unlock(&connection->resource->conf_update);
3899 	return -EIO;
3900 
3901 disconnect:
3902 	kfree(new_plan);
3903 	if (new_disk_conf) {
3904 		put_ldev(device);
3905 		kfree(new_disk_conf);
3906 	}
3907 	mutex_unlock(&connection->resource->conf_update);
3908 	/* just for completeness: actually not needed,
3909 	 * as this is not reached if csums_tfm was ok. */
3910 	crypto_free_ahash(csums_tfm);
3911 	/* but free the verify_tfm again, if csums_tfm did not work out */
3912 	crypto_free_ahash(verify_tfm);
3913 	conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3914 	return -EIO;
3915 }
3916 
3917 /* warn if the arguments differ by more than 12.5% */
3918 static void warn_if_differ_considerably(struct drbd_device *device,
3919 	const char *s, sector_t a, sector_t b)
3920 {
3921 	sector_t d;
3922 	if (a == 0 || b == 0)
3923 		return;
3924 	d = (a > b) ? (a - b) : (b - a);
3925 	if (d > (a>>3) || d > (b>>3))
3926 		drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3927 		     (unsigned long long)a, (unsigned long long)b);
3928 }
3929 
3930 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3931 {
3932 	struct drbd_peer_device *peer_device;
3933 	struct drbd_device *device;
3934 	struct p_sizes *p = pi->data;
3935 	struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ? p->qlim : NULL;
3936 	enum determine_dev_size dd = DS_UNCHANGED;
3937 	sector_t p_size, p_usize, p_csize, my_usize;
3938 	int ldsc = 0; /* local disk size changed */
3939 	enum dds_flags ddsf;
3940 
3941 	peer_device = conn_peer_device(connection, pi->vnr);
3942 	if (!peer_device)
3943 		return config_unknown_volume(connection, pi);
3944 	device = peer_device->device;
3945 
3946 	p_size = be64_to_cpu(p->d_size);
3947 	p_usize = be64_to_cpu(p->u_size);
3948 	p_csize = be64_to_cpu(p->c_size);
3949 
3950 	/* just store the peer's disk size for now.
3951 	 * we still need to figure out whether we accept that. */
3952 	device->p_size = p_size;
3953 
3954 	if (get_ldev(device)) {
3955 		sector_t new_size, cur_size;
3956 		rcu_read_lock();
3957 		my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
3958 		rcu_read_unlock();
3959 
3960 		warn_if_differ_considerably(device, "lower level device sizes",
3961 			   p_size, drbd_get_max_capacity(device->ldev));
3962 		warn_if_differ_considerably(device, "user requested size",
3963 					    p_usize, my_usize);
3964 
3965 		/* if this is the first connect, or an otherwise expected
3966 		 * param exchange, choose the minimum */
3967 		if (device->state.conn == C_WF_REPORT_PARAMS)
3968 			p_usize = min_not_zero(my_usize, p_usize);
3969 
3970 		/* Never shrink a device with usable data during connect.
3971 		   But allow online shrinking if we are connected. */
3972 		new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0);
3973 		cur_size = drbd_get_capacity(device->this_bdev);
3974 		if (new_size < cur_size &&
3975 		    device->state.disk >= D_OUTDATED &&
3976 		    device->state.conn < C_CONNECTED) {
3977 			drbd_err(device, "The peer's disk size is too small! (%llu < %llu sectors)\n",
3978 					(unsigned long long)new_size, (unsigned long long)cur_size);
3979 			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3980 			put_ldev(device);
3981 			return -EIO;
3982 		}
3983 
3984 		if (my_usize != p_usize) {
3985 			struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3986 
3987 			new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3988 			if (!new_disk_conf) {
3989 				drbd_err(device, "Allocation of new disk_conf failed\n");
3990 				put_ldev(device);
3991 				return -ENOMEM;
3992 			}
3993 
3994 			mutex_lock(&connection->resource->conf_update);
3995 			old_disk_conf = device->ldev->disk_conf;
3996 			*new_disk_conf = *old_disk_conf;
3997 			new_disk_conf->disk_size = p_usize;
3998 
3999 			rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
4000 			mutex_unlock(&connection->resource->conf_update);
4001 			synchronize_rcu();
4002 			kfree(old_disk_conf);
4003 
4004 			drbd_info(device, "Peer sets u_size to %lu sectors (old: %lu)\n",
4005 				 (unsigned long)p_usize, (unsigned long)my_usize);
4006 		}
4007 
4008 		put_ldev(device);
4009 	}
4010 
4011 	device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
4012 	/* Keep the call to drbd_reconsider_queue_parameters() before drbd_determine_dev_size().
4013 	   In case we cleared the QUEUE_FLAG_DISCARD from our queue in
4014 	   drbd_reconsider_queue_parameters(), we can be sure that after
4015 	   drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
4016 
4017 	ddsf = be16_to_cpu(p->dds_flags);
4018 	if (get_ldev(device)) {
4019 		drbd_reconsider_queue_parameters(device, device->ldev, o);
4020 		dd = drbd_determine_dev_size(device, ddsf, NULL);
4021 		put_ldev(device);
4022 		if (dd == DS_ERROR)
4023 			return -EIO;
4024 		drbd_md_sync(device);
4025 	} else {
4026 		/*
4027 		 * I am diskless, need to accept the peer's *current* size.
4028 		 * I must NOT accept the peers backing disk size,
4029 	 * I must NOT accept the peer's backing disk size,
4030 		 *
4031 		 * At this point, the peer knows more about my disk, or at
4032 		 * least about what we last agreed upon, than myself.
4033 		 * So if his c_size is less than his d_size, the most likely
4034 		 * reason is that *my* d_size was smaller last time we checked.
4035 		 *
4036 		 * However, if he sends a zero current size,
4037 	 * take his (user-capped or) backing disk size anyway.
4038 		 */
4039 		drbd_reconsider_queue_parameters(device, NULL, o);
4040 		drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size);
4041 	}
4042 
4043 	if (get_ldev(device)) {
4044 		if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
4045 			device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
4046 			ldsc = 1;
4047 		}
4048 
4049 		put_ldev(device);
4050 	}
4051 
4052 	if (device->state.conn > C_WF_REPORT_PARAMS) {
4053 		if (be64_to_cpu(p->c_size) !=
4054 		    drbd_get_capacity(device->this_bdev) || ldsc) {
4055 			/* we have different sizes, probably peer
4056 			 * needs to know my new size... */
4057 			drbd_send_sizes(peer_device, 0, ddsf);
4058 		}
4059 		if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
4060 		    (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
4061 			if (device->state.pdsk >= D_INCONSISTENT &&
4062 			    device->state.disk >= D_INCONSISTENT) {
4063 				if (ddsf & DDSF_NO_RESYNC)
4064 					drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
4065 				else
4066 					resync_after_online_grow(device);
4067 			} else
4068 				set_bit(RESYNC_AFTER_NEG, &device->flags);
4069 		}
4070 	}
4071 
4072 	return 0;
4073 }
4074 
4075 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
4076 {
4077 	struct drbd_peer_device *peer_device;
4078 	struct drbd_device *device;
4079 	struct p_uuids *p = pi->data;
4080 	u64 *p_uuid;
4081 	int i, updated_uuids = 0;
4082 
4083 	peer_device = conn_peer_device(connection, pi->vnr);
4084 	if (!peer_device)
4085 		return config_unknown_volume(connection, pi);
4086 	device = peer_device->device;
4087 
4088 	p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
4089 	if (!p_uuid) {
4090 		drbd_err(device, "kmalloc of p_uuid failed\n");
4091 		return -ENOMEM;
4092 	}
4093 
4094 	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
4095 		p_uuid[i] = be64_to_cpu(p->uuid[i]);
4096 
4097 	kfree(device->p_uuid);
4098 	device->p_uuid = p_uuid;
4099 
4100 	if (device->state.conn < C_CONNECTED &&
4101 	    device->state.disk < D_INCONSISTENT &&
4102 	    device->state.role == R_PRIMARY &&
4103 	    (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
4104 		drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
4105 		    (unsigned long long)device->ed_uuid);
4106 		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4107 		return -EIO;
4108 	}
4109 
4110 	if (get_ldev(device)) {
4111 		int skip_initial_sync =
4112 			device->state.conn == C_CONNECTED &&
4113 			peer_device->connection->agreed_pro_version >= 90 &&
4114 			device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
4115 			(p_uuid[UI_FLAGS] & 8);
4116 		if (skip_initial_sync) {
4117 			drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
4118 			drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
4119 					"clear_n_write from receive_uuids",
4120 					BM_LOCKED_TEST_ALLOWED);
4121 			_drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
4122 			_drbd_uuid_set(device, UI_BITMAP, 0);
4123 			_drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
4124 					CS_VERBOSE, NULL);
4125 			drbd_md_sync(device);
4126 			updated_uuids = 1;
4127 		}
4128 		put_ldev(device);
4129 	} else if (device->state.disk < D_INCONSISTENT &&
4130 		   device->state.role == R_PRIMARY) {
4131 		/* I am a diskless primary, the peer just created a new current UUID
4132 		   for me. */
4133 		updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4134 	}
4135 
4136 	/* Before we test the disk state, wait until a possibly ongoing
4137 	   cluster wide state change has finished. That is important if
4138 	   we are primary and are detaching from our disk. We need to see the
4139 	   new disk state... */
4140 	mutex_lock(device->state_mutex);
4141 	mutex_unlock(device->state_mutex);
4142 	if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
4143 		updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4144 
4145 	if (updated_uuids)
4146 		drbd_print_uuids(device, "receiver updated UUIDs to");
4147 
4148 	return 0;
4149 }
4150 
4151 /**
4152  * convert_state() - Converts the peer's view of the cluster state to our point of view
4153  * @ps:		The state as seen by the peer.
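 *
 * Swaps the local and peer roles and disk states, and maps asymmetric
 * connection states (e.g. StartingSyncS <-> StartingSyncT) onto their
 * mirror image.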
4154  */
4155 static union drbd_state convert_state(union drbd_state ps)
4156 {
4157 	union drbd_state ms;
4158 
4159 	static enum drbd_conns c_tab[] = {
4160 		[C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
4161 		[C_CONNECTED] = C_CONNECTED,
4162 
4163 		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
4164 		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
4165 		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
4166 		[C_VERIFY_S]       = C_VERIFY_T,
4167 		[C_MASK]   = C_MASK,
4168 	};
4169 
4170 	ms.i = ps.i;
4171 
4172 	ms.conn = c_tab[ps.conn];
4173 	ms.peer = ps.role;
4174 	ms.role = ps.peer;
4175 	ms.pdsk = ps.disk;
4176 	ms.disk = ps.pdsk;
4177 	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
4178 
4179 	return ms;
4180 }
4181 
4182 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4183 {
4184 	struct drbd_peer_device *peer_device;
4185 	struct drbd_device *device;
4186 	struct p_req_state *p = pi->data;
4187 	union drbd_state mask, val;
4188 	enum drbd_state_rv rv;
4189 
4190 	peer_device = conn_peer_device(connection, pi->vnr);
4191 	if (!peer_device)
4192 		return -EIO;
4193 	device = peer_device->device;
4194 
4195 	mask.i = be32_to_cpu(p->mask);
4196 	val.i = be32_to_cpu(p->val);
4197 
4198 	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4199 	    mutex_is_locked(device->state_mutex)) {
4200 		drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4201 		return 0;
4202 	}
4203 
4204 	mask = convert_state(mask);
4205 	val = convert_state(val);
4206 
4207 	rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4208 	drbd_send_sr_reply(peer_device, rv);
4209 
4210 	drbd_md_sync(device);
4211 
4212 	return 0;
4213 }
4214 
4215 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4216 {
4217 	struct p_req_state *p = pi->data;
4218 	union drbd_state mask, val;
4219 	enum drbd_state_rv rv;
4220 
4221 	mask.i = be32_to_cpu(p->mask);
4222 	val.i = be32_to_cpu(p->val);
4223 
4224 	if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4225 	    mutex_is_locked(&connection->cstate_mutex)) {
4226 		conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4227 		return 0;
4228 	}
4229 
4230 	mask = convert_state(mask);
4231 	val = convert_state(val);
4232 
4233 	rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4234 	conn_send_sr_reply(connection, rv);
4235 
4236 	return 0;
4237 }
4238 
4239 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4240 {
4241 	struct drbd_peer_device *peer_device;
4242 	struct drbd_device *device;
4243 	struct p_state *p = pi->data;
4244 	union drbd_state os, ns, peer_state;
4245 	enum drbd_disk_state real_peer_disk;
4246 	enum chg_state_flags cs_flags;
4247 	int rv;
4248 
4249 	peer_device = conn_peer_device(connection, pi->vnr);
4250 	if (!peer_device)
4251 		return config_unknown_volume(connection, pi);
4252 	device = peer_device->device;
4253 
4254 	peer_state.i = be32_to_cpu(p->state);
4255 
4256 	real_peer_disk = peer_state.disk;
4257 	if (peer_state.disk == D_NEGOTIATING) {
4258 		real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4259 		drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4260 	}
4261 
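	/* Sample the current state here; drbd_sync_handshake() below may
	 * sleep, so the state is re-read under req_lock further down and we
	 * jump back to retry if it changed in the meantime. */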
4262 	spin_lock_irq(&device->resource->req_lock);
4263  retry:
4264 	os = ns = drbd_read_state(device);
4265 	spin_unlock_irq(&device->resource->req_lock);
4266 
4267 	/* If some other part of the code (ack_receiver thread, timeout)
4268 	 * already decided to close the connection again,
4269 	 * we must not "re-establish" it here. */
4270 	if (os.conn <= C_TEAR_DOWN)
4271 		return -ECONNRESET;
4272 
4273 	/* If this is the "end of sync" confirmation, usually the peer disk
4274 	 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4275 	 * set) resync started in PausedSyncT, or if the timing of pause-/
4276 	 * unpause-sync events has been "just right", the peer disk may
4277 	 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4278 	 */
4279 	if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4280 	    real_peer_disk == D_UP_TO_DATE &&
4281 	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4282 		/* If we are (becoming) SyncSource, but peer is still in sync
4283 		 * preparation, ignore its uptodate-ness to avoid flapping, it
4284 		 * will change to inconsistent once the peer reaches active
4285 		 * syncing states.
4286 		 * It may have changed syncer-paused flags, however, so we
4287 		 * cannot ignore this completely. */
4288 		if (peer_state.conn > C_CONNECTED &&
4289 		    peer_state.conn < C_SYNC_SOURCE)
4290 			real_peer_disk = D_INCONSISTENT;
4291 
4292 		/* if peer_state changes to connected at the same time,
4293 		 * it explicitly notifies us that it finished resync.
4294 		 * Maybe we should finish it up, too? */
4295 		else if (os.conn >= C_SYNC_SOURCE &&
4296 			 peer_state.conn == C_CONNECTED) {
4297 			if (drbd_bm_total_weight(device) <= device->rs_failed)
4298 				drbd_resync_finished(device);
4299 			return 0;
4300 		}
4301 	}
4302 
4303 	/* explicit verify finished notification, stop sector reached. */
4304 	if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4305 	    peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4306 		ov_out_of_sync_print(device);
4307 		drbd_resync_finished(device);
4308 		return 0;
4309 	}
4310 
4311 	/* peer says his disk is inconsistent, while we think it is uptodate,
4312 	 * and this happens while the peer still thinks we have a sync going on,
4313 	 * but we think we are already done with the sync.
4314 	 * We ignore this to avoid flapping pdsk.
4315 	 * This should not happen, if the peer is a recent version of drbd. */
4316 	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4317 	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4318 		real_peer_disk = D_UP_TO_DATE;
4319 
4320 	if (ns.conn == C_WF_REPORT_PARAMS)
4321 		ns.conn = C_CONNECTED;
4322 
4323 	if (peer_state.conn == C_AHEAD)
4324 		ns.conn = C_BEHIND;
4325 
4326 	if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4327 	    get_ldev_if_state(device, D_NEGOTIATING)) {
4328 		int cr; /* consider resync */
4329 
4330 		/* if we established a new connection */
4331 		cr  = (os.conn < C_CONNECTED);
4332 		/* if we had an established connection
4333 		 * and one of the nodes newly attaches a disk */
4334 		cr |= (os.conn == C_CONNECTED &&
4335 		       (peer_state.disk == D_NEGOTIATING ||
4336 			os.disk == D_NEGOTIATING));
4337 		/* if we have both been inconsistent, and the peer has been
4338 		 * forced to be UpToDate with --overwrite-data */
4339 		cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4340 		/* if we had been plain connected, and the admin requested to
4341 		 * start a sync by "invalidate" or "invalidate-remote" */
4342 		cr |= (os.conn == C_CONNECTED &&
4343 				(peer_state.conn >= C_STARTING_SYNC_S &&
4344 				 peer_state.conn <= C_WF_BITMAP_T));
4345 
4346 		if (cr)
4347 			ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4348 
4349 		put_ldev(device);
4350 		if (ns.conn == C_MASK) {
4351 			ns.conn = C_CONNECTED;
4352 			if (device->state.disk == D_NEGOTIATING) {
4353 				drbd_force_state(device, NS(disk, D_FAILED));
4354 			} else if (peer_state.disk == D_NEGOTIATING) {
4355 				drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4356 				peer_state.disk = D_DISKLESS;
4357 				real_peer_disk = D_DISKLESS;
4358 			} else {
4359 				if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4360 					return -EIO;
4361 				D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4362 				conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4363 				return -EIO;
4364 			}
4365 		}
4366 	}
4367 
4368 	spin_lock_irq(&device->resource->req_lock);
4369 	if (os.i != drbd_read_state(device).i)
4370 		goto retry;
4371 	clear_bit(CONSIDER_RESYNC, &device->flags);
4372 	ns.peer = peer_state.role;
4373 	ns.pdsk = real_peer_disk;
4374 	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4375 	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4376 		ns.disk = device->new_state_tmp.disk;
4377 	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4378 	if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4379 	    test_bit(NEW_CUR_UUID, &device->flags)) {
4380 		/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4381 		   for temporary network outages! */
4382 		spin_unlock_irq(&device->resource->req_lock);
4383 		drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4384 		tl_clear(peer_device->connection);
4385 		drbd_uuid_new_current(device);
4386 		clear_bit(NEW_CUR_UUID, &device->flags);
4387 		conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4388 		return -EIO;
4389 	}
4390 	rv = _drbd_set_state(device, ns, cs_flags, NULL);
4391 	ns = drbd_read_state(device);
4392 	spin_unlock_irq(&device->resource->req_lock);
4393 
4394 	if (rv < SS_SUCCESS) {
4395 		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4396 		return -EIO;
4397 	}
4398 
4399 	if (os.conn > C_WF_REPORT_PARAMS) {
4400 		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4401 		    peer_state.disk != D_NEGOTIATING) {
4402 			/* we want resync, peer has not yet decided to sync... */
4403 			/* Nowadays only used when forcing a node into primary role and
4404 			   setting its disk to UpToDate with that */
4405 			drbd_send_uuids(peer_device);
4406 			drbd_send_current_state(peer_device);
4407 		}
4408 	}
4409 
4410 	clear_bit(DISCARD_MY_DATA, &device->flags);
4411 
4412 	drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4413 
4414 	return 0;
4415 }
4416 
4417 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4418 {
4419 	struct drbd_peer_device *peer_device;
4420 	struct drbd_device *device;
4421 	struct p_rs_uuid *p = pi->data;
4422 
4423 	peer_device = conn_peer_device(connection, pi->vnr);
4424 	if (!peer_device)
4425 		return -EIO;
4426 	device = peer_device->device;
4427 
4428 	wait_event(device->misc_wait,
4429 		   device->state.conn == C_WF_SYNC_UUID ||
4430 		   device->state.conn == C_BEHIND ||
4431 		   device->state.conn < C_CONNECTED ||
4432 		   device->state.disk < D_NEGOTIATING);
4433 
4434 	/* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4435 
4436 	/* Here the _drbd_uuid_ functions are right, current should
4437 	   _not_ be rotated into the history */
4438 	if (get_ldev_if_state(device, D_NEGOTIATING)) {
4439 		_drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4440 		_drbd_uuid_set(device, UI_BITMAP, 0UL);
4441 
4442 		drbd_print_uuids(device, "updated sync uuid");
4443 		drbd_start_resync(device, C_SYNC_TARGET);
4444 
4445 		put_ldev(device);
4446 	} else
4447 		drbd_err(device, "Ignoring SyncUUID packet!\n");
4448 
4449 	return 0;
4450 }
4451 
4452 /**
4453  * receive_bitmap_plain
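 * @peer_device:	DRBD peer device the bitmap is received for
 * @size:	number of payload bytes announced in the packet header
 * @p:	buffer the plain bitmap words are received into
 * @c:	bitmap transfer context, tracks the word/bit offset reached so far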
4454  *
4455  * Return 0 when done, 1 when another iteration is needed, and a negative error
4456  * code upon failure.
4457  */
4458 static int
4459 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4460 		     unsigned long *p, struct bm_xfer_ctx *c)
4461 {
4462 	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4463 				 drbd_header_size(peer_device->connection);
4464 	unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4465 				       c->bm_words - c->word_offset);
4466 	unsigned int want = num_words * sizeof(*p);
4467 	int err;
4468 
4469 	if (want != size) {
4470 		drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4471 		return -EIO;
4472 	}
4473 	if (want == 0)
4474 		return 0;
4475 	err = drbd_recv_all(peer_device->connection, p, want);
4476 	if (err)
4477 		return err;
4478 
4479 	drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4480 
4481 	c->word_offset += num_words;
4482 	c->bit_offset = c->word_offset * BITS_PER_LONG;
4483 	if (c->bit_offset > c->bm_bits)
4484 		c->bit_offset = c->bm_bits;
4485 
4486 	return 1;
4487 }
4488 
4489 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4490 {
4491 	return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4492 }
4493 
4494 static int dcbp_get_start(struct p_compressed_bm *p)
4495 {
4496 	return (p->encoding & 0x80) != 0;
4497 }
4498 
4499 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4500 {
4501 	return (p->encoding >> 4) & 0x7;
4502 }
4503 
4504 /**
4505  * recv_bm_rle_bits
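 * @peer_device:	DRBD peer device the bitmap is received for
 * @p:	compressed bitmap packet; p->code holds the VLI encoded run lengths
 * @c:	bitmap transfer context, tracks the bit offset reached so far
 * @len:	number of code bytes in @p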
4506  *
4507  * Return 0 when done, 1 when another iteration is needed, and a negative error
4508  * code upon failure.
4509  */
4510 static int
4511 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4512 		struct p_compressed_bm *p,
4513 		 struct bm_xfer_ctx *c,
4514 		 unsigned int len)
4515 {
4516 	struct bitstream bs;
4517 	u64 look_ahead;
4518 	u64 rl;
4519 	u64 tmp;
4520 	unsigned long s = c->bit_offset;
4521 	unsigned long e;
4522 	int toggle = dcbp_get_start(p);
4523 	int have;
4524 	int bits;
4525 
4526 	bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4527 
4528 	bits = bitstream_get_bits(&bs, &look_ahead, 64);
4529 	if (bits < 0)
4530 		return -EIO;
4531 
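	/* The code stream is a sequence of VLI encoded run lengths.  Runs
	 * alternate between "bits clear" and "bits set", starting with the
	 * value given by the start bit of the packet; for a "set" run the
	 * bits [s, s+rl-1] are marked in the local bitmap.  look_ahead is a
	 * 64 bit decode window that is refilled after each code word. */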
4532 	for (have = bits; have > 0; s += rl, toggle = !toggle) {
4533 		bits = vli_decode_bits(&rl, look_ahead);
4534 		if (bits <= 0)
4535 			return -EIO;
4536 
4537 		if (toggle) {
4538 			e = s + rl -1;
4539 			if (e >= c->bm_bits) {
4540 				drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4541 				return -EIO;
4542 			}
4543 			_drbd_bm_set_bits(peer_device->device, s, e);
4544 		}
4545 
4546 		if (have < bits) {
4547 			drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4548 				have, bits, look_ahead,
4549 				(unsigned int)(bs.cur.b - p->code),
4550 				(unsigned int)bs.buf_len);
4551 			return -EIO;
4552 		}
4553 		/* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4554 		if (likely(bits < 64))
4555 			look_ahead >>= bits;
4556 		else
4557 			look_ahead = 0;
4558 		have -= bits;
4559 
4560 		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4561 		if (bits < 0)
4562 			return -EIO;
4563 		look_ahead |= tmp << have;
4564 		have += bits;
4565 	}
4566 
4567 	c->bit_offset = s;
4568 	bm_xfer_ctx_bit_to_word_offset(c);
4569 
4570 	return (s != c->bm_bits);
4571 }
4572 
4573 /**
4574  * decode_bitmap_c
4575  *
4576  * Return 0 when done, 1 when another iteration is needed, and a negative error
4577  * code upon failure.
4578  */
4579 static int
4580 decode_bitmap_c(struct drbd_peer_device *peer_device,
4581 		struct p_compressed_bm *p,
4582 		struct bm_xfer_ctx *c,
4583 		unsigned int len)
4584 {
4585 	if (dcbp_get_code(p) == RLE_VLI_Bits)
4586 		return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4587 
4588 	/* other variants had been implemented for evaluation,
4589 	 * but have been dropped as this one turned out to be "best"
4590 	 * during all our tests. */
4591 
4592 	drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4593 	conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4594 	return -EIO;
4595 }
4596 
4597 void INFO_bm_xfer_stats(struct drbd_device *device,
4598 		const char *direction, struct bm_xfer_ctx *c)
4599 {
4600 	/* what would it take to transfer it "plaintext" */
4601 	unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4602 	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4603 	unsigned int plain =
4604 		header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4605 		c->bm_words * sizeof(unsigned long);
4606 	unsigned int total = c->bytes[0] + c->bytes[1];
4607 	unsigned int r;
4608 
4609 	/* total cannot be zero, but just in case: */
4610 	if (total == 0)
4611 		return;
4612 
4613 	/* don't report if not compressed */
4614 	if (total >= plain)
4615 		return;
4616 
4617 	/* total < plain. check for overflow, still */
4618 	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4619 		                    : (1000 * total / plain);
4620 
4621 	if (r > 1000)
4622 		r = 1000;
4623 
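	/* turn the used/plain ratio into the per mille saving that is
	 * printed as "compression: x.y%" below */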
4624 	r = 1000 - r;
4625 	drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4626 	     "total %u; compression: %u.%u%%\n",
4627 			direction,
4628 			c->bytes[1], c->packets[1],
4629 			c->bytes[0], c->packets[0],
4630 			total, r/10, r % 10);
4631 }
4632 
4633 /* Since we are processing the bitfield from lower addresses to higher,
4634    it does not matter whether we process it in 32 bit chunks or 64 bit
4635    chunks, as long as it is little endian. (Understand it as a byte stream,
4636    beginning with the lowest byte...) If we used big endian instead,
4637    we would need to process it from the highest address to the lowest,
4638    in order to be agnostic to the 32 vs 64 bits issue.
4639 
4640    returns 0 on failure, 1 if we successfully received it. */
4641 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4642 {
4643 	struct drbd_peer_device *peer_device;
4644 	struct drbd_device *device;
4645 	struct bm_xfer_ctx c;
4646 	int err;
4647 
4648 	peer_device = conn_peer_device(connection, pi->vnr);
4649 	if (!peer_device)
4650 		return -EIO;
4651 	device = peer_device->device;
4652 
4653 	drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4654 	/* you are supposed to send additional out-of-sync information
4655 	 * if you actually set bits during this phase */
4656 
4657 	c = (struct bm_xfer_ctx) {
4658 		.bm_bits = drbd_bm_bits(device),
4659 		.bm_words = drbd_bm_words(device),
4660 	};
4661 
4662 	for (;;) {
4663 		if (pi->cmd == P_BITMAP)
4664 			err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4665 		else if (pi->cmd == P_COMPRESSED_BITMAP) {
4666 			/* MAYBE: sanity check that we speak proto >= 90,
4667 			 * and the feature is enabled! */
4668 			struct p_compressed_bm *p = pi->data;
4669 
4670 			if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4671 				drbd_err(device, "ReportCBitmap packet too large\n");
4672 				err = -EIO;
4673 				goto out;
4674 			}
4675 			if (pi->size <= sizeof(*p)) {
4676 				drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4677 				err = -EIO;
4678 				goto out;
4679 			}
4680 			err = drbd_recv_all(peer_device->connection, p, pi->size);
4681 			if (err)
4682 				goto out;
4683 			err = decode_bitmap_c(peer_device, p, &c, pi->size);
4684 		} else {
4685 			drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4686 			err = -EIO;
4687 			goto out;
4688 		}
4689 
4690 		c.packets[pi->cmd == P_BITMAP]++;
4691 		c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4692 
4693 		if (err <= 0) {
4694 			if (err < 0)
4695 				goto out;
4696 			break;
4697 		}
4698 		err = drbd_recv_header(peer_device->connection, pi);
4699 		if (err)
4700 			goto out;
4701 	}
4702 
4703 	INFO_bm_xfer_stats(device, "receive", &c);
4704 
4705 	if (device->state.conn == C_WF_BITMAP_T) {
4706 		enum drbd_state_rv rv;
4707 
4708 		err = drbd_send_bitmap(device);
4709 		if (err)
4710 			goto out;
4711 		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4712 		rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4713 		D_ASSERT(device, rv == SS_SUCCESS);
4714 	} else if (device->state.conn != C_WF_BITMAP_S) {
4715 		/* admin may have requested C_DISCONNECTING,
4716 		 * other threads may have noticed network errors */
4717 		drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4718 		    drbd_conn_str(device->state.conn));
4719 	}
4720 	err = 0;
4721 
4722  out:
4723 	drbd_bm_unlock(device);
4724 	if (!err && device->state.conn == C_WF_BITMAP_S)
4725 		drbd_start_resync(device, C_SYNC_SOURCE);
4726 	return err;
4727 }
4728 
4729 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4730 {
4731 	drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4732 		 pi->cmd, pi->size);
4733 
4734 	return ignore_remaining_packet(connection, pi);
4735 }
4736 
4737 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4738 {
4739 	/* Make sure we've acked all the TCP data associated
4740 	 * with the data requests being unplugged */
4741 	drbd_tcp_quickack(connection->data.socket);
4742 
4743 	return 0;
4744 }
4745 
4746 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4747 {
4748 	struct drbd_peer_device *peer_device;
4749 	struct drbd_device *device;
4750 	struct p_block_desc *p = pi->data;
4751 
4752 	peer_device = conn_peer_device(connection, pi->vnr);
4753 	if (!peer_device)
4754 		return -EIO;
4755 	device = peer_device->device;
4756 
4757 	switch (device->state.conn) {
4758 	case C_WF_SYNC_UUID:
4759 	case C_WF_BITMAP_T:
4760 	case C_BEHIND:
4761 		break;
4762 	default:
4763 		drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4764 				drbd_conn_str(device->state.conn));
4765 	}
4766 
4767 	drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4768 
4769 	return 0;
4770 }
4771 
4772 static int receive_rs_deallocated(struct drbd_connection *connection, struct packet_info *pi)
4773 {
4774 	struct drbd_peer_device *peer_device;
4775 	struct p_block_desc *p = pi->data;
4776 	struct drbd_device *device;
4777 	sector_t sector;
4778 	int size, err = 0;
4779 
4780 	peer_device = conn_peer_device(connection, pi->vnr);
4781 	if (!peer_device)
4782 		return -EIO;
4783 	device = peer_device->device;
4784 
4785 	sector = be64_to_cpu(p->sector);
4786 	size = be32_to_cpu(p->blksize);
4787 
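	/* The peer reports this resync range as deallocated (thin resync).
	 * Instead of receiving the data, write out zeroes locally and
	 * acknowledge once that request has completed. */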
4788 	dec_rs_pending(device);
4789 
4790 	if (get_ldev(device)) {
4791 		struct drbd_peer_request *peer_req;
4792 		const int op = REQ_OP_WRITE_ZEROES;
4793 
4794 		peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector,
4795 					       size, 0, GFP_NOIO);
4796 		if (!peer_req) {
4797 			put_ldev(device);
4798 			return -ENOMEM;
4799 		}
4800 
4801 		peer_req->w.cb = e_end_resync_block;
4802 		peer_req->submit_jif = jiffies;
4803 		peer_req->flags |= EE_IS_TRIM;
4804 
4805 		spin_lock_irq(&device->resource->req_lock);
4806 		list_add_tail(&peer_req->w.list, &device->sync_ee);
4807 		spin_unlock_irq(&device->resource->req_lock);
4808 
4809 		atomic_add(pi->size >> 9, &device->rs_sect_ev);
4810 		err = drbd_submit_peer_request(device, peer_req, op, 0, DRBD_FAULT_RS_WR);
4811 
4812 		if (err) {
4813 			spin_lock_irq(&device->resource->req_lock);
4814 			list_del(&peer_req->w.list);
4815 			spin_unlock_irq(&device->resource->req_lock);
4816 
4817 			drbd_free_peer_req(device, peer_req);
4818 			put_ldev(device);
4819 			err = 0;
4820 			goto fail;
4821 		}
4822 
4823 		inc_unacked(device);
4824 
4825 		/* No put_ldev() here. Gets called in drbd_endio_write_sec_final(),
4826 		   as well as drbd_rs_complete_io() */
4827 	} else {
4828 	fail:
4829 		drbd_rs_complete_io(device, sector);
4830 		drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER);
4831 	}
4832 
4833 	atomic_add(size >> 9, &device->rs_sect_in);
4834 
4835 	return err;
4836 }
4837 
4838 struct data_cmd {
4839 	int expect_payload;
4840 	unsigned int pkt_size;
4841 	int (*fn)(struct drbd_connection *, struct packet_info *);
4842 };
4843 
4844 static struct data_cmd drbd_cmd_handler[] = {
4845 	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
4846 	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
4847 	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply },
4848 	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier },
4849 	[P_BITMAP]	    = { 1, 0, receive_bitmap },
4850 	[P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap },
4851 	[P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4852 	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4853 	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4854 	[P_SYNC_PARAM]	    = { 1, 0, receive_SyncParam },
4855 	[P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4856 	[P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4857 	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
4858 	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
4859 	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
4860 	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4861 	[P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4862 	[P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4863 	[P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4864 	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4865 	[P_RS_THIN_REQ]     = { 0, sizeof(struct p_block_req), receive_DataRequest },
4866 	[P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4867 	[P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4868 	[P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4869 	[P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4870 	[P_TRIM]	    = { 0, sizeof(struct p_trim), receive_Data },
4871 	[P_RS_DEALLOCATED]  = { 0, sizeof(struct p_block_desc), receive_rs_deallocated },
4872 	[P_WSAME]	    = { 1, sizeof(struct p_wsame), receive_Data },
4873 };
4874 
4875 static void drbdd(struct drbd_connection *connection)
4876 {
4877 	struct packet_info pi;
4878 	size_t shs; /* sub header size */
4879 	int err;
4880 
4881 	while (get_t_state(&connection->receiver) == RUNNING) {
4882 		struct data_cmd const *cmd;
4883 
4884 		drbd_thread_current_set_cpu(&connection->receiver);
4885 		update_receiver_timing_details(connection, drbd_recv_header);
4886 		if (drbd_recv_header(connection, &pi))
4887 			goto err_out;
4888 
4889 		cmd = &drbd_cmd_handler[pi.cmd];
4890 		if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4891 			drbd_err(connection, "Unexpected data packet %s (0x%04x)",
4892 				 cmdname(pi.cmd), pi.cmd);
4893 			goto err_out;
4894 		}
4895 
4896 		shs = cmd->pkt_size;
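		/* P_SIZES carries an additional struct o_qlim when the WSAME
		 * feature has been negotiated; account for it in the expected
		 * sub header size. */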
4897 		if (pi.cmd == P_SIZES && connection->agreed_features & DRBD_FF_WSAME)
4898 			shs += sizeof(struct o_qlim);
4899 		if (pi.size > shs && !cmd->expect_payload) {
4900 			drbd_err(connection, "No payload expected %s l:%d\n",
4901 				 cmdname(pi.cmd), pi.size);
4902 			goto err_out;
4903 		}
4904 		if (pi.size < shs) {
4905 			drbd_err(connection, "%s: unexpected packet size, expected:%d received:%d\n",
4906 				 cmdname(pi.cmd), (int)shs, pi.size);
4907 			goto err_out;
4908 		}
4909 
4910 		if (shs) {
4911 			update_receiver_timing_details(connection, drbd_recv_all_warn);
4912 			err = drbd_recv_all_warn(connection, pi.data, shs);
4913 			if (err)
4914 				goto err_out;
4915 			pi.size -= shs;
4916 		}
4917 
4918 		update_receiver_timing_details(connection, cmd->fn);
4919 		err = cmd->fn(connection, &pi);
4920 		if (err) {
4921 			drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4922 				 cmdname(pi.cmd), err, pi.size);
4923 			goto err_out;
4924 		}
4925 	}
4926 	return;
4927 
4928     err_out:
4929 	conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4930 }
4931 
4932 static void conn_disconnect(struct drbd_connection *connection)
4933 {
4934 	struct drbd_peer_device *peer_device;
4935 	enum drbd_conns oc;
4936 	int vnr;
4937 
4938 	if (connection->cstate == C_STANDALONE)
4939 		return;
4940 
4941 	/* We are about to start the cleanup after connection loss.
4942 	 * Make sure drbd_make_request knows about that.
4943 	 * Usually we should be in some network failure state already,
4944 	 * but just in case we are not, we fix it up here.
4945 	 */
4946 	conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4947 
4948 	/* ack_receiver does not clean up anything. it must not interfere, either */
4949 	drbd_thread_stop(&connection->ack_receiver);
4950 	if (connection->ack_sender) {
4951 		destroy_workqueue(connection->ack_sender);
4952 		connection->ack_sender = NULL;
4953 	}
4954 	drbd_free_sock(connection);
4955 
4956 	rcu_read_lock();
4957 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4958 		struct drbd_device *device = peer_device->device;
4959 		kref_get(&device->kref);
4960 		rcu_read_unlock();
4961 		drbd_disconnected(peer_device);
4962 		kref_put(&device->kref, drbd_destroy_device);
4963 		rcu_read_lock();
4964 	}
4965 	rcu_read_unlock();
4966 
4967 	if (!list_empty(&connection->current_epoch->list))
4968 		drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
4969 	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4970 	atomic_set(&connection->current_epoch->epoch_size, 0);
4971 	connection->send.seen_any_write_yet = false;
4972 
4973 	drbd_info(connection, "Connection closed\n");
4974 
4975 	if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
4976 		conn_try_outdate_peer_async(connection);
4977 
4978 	spin_lock_irq(&connection->resource->req_lock);
4979 	oc = connection->cstate;
4980 	if (oc >= C_UNCONNECTED)
4981 		_conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4982 
4983 	spin_unlock_irq(&connection->resource->req_lock);
4984 
4985 	if (oc == C_DISCONNECTING)
4986 		conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4987 }
4988 
4989 static int drbd_disconnected(struct drbd_peer_device *peer_device)
4990 {
4991 	struct drbd_device *device = peer_device->device;
4992 	unsigned int i;
4993 
4994 	/* wait for current activity to cease. */
4995 	spin_lock_irq(&device->resource->req_lock);
4996 	_drbd_wait_ee_list_empty(device, &device->active_ee);
4997 	_drbd_wait_ee_list_empty(device, &device->sync_ee);
4998 	_drbd_wait_ee_list_empty(device, &device->read_ee);
4999 	spin_unlock_irq(&device->resource->req_lock);
5000 
5001 	/* We do not have data structures that would allow us to
5002 	 * get the rs_pending_cnt down to 0 again.
5003 	 *  * On C_SYNC_TARGET we do not have any data structures describing
5004 	 *    the pending RSDataRequest's we have sent.
5005 	 *  * On C_SYNC_SOURCE there is no data structure that tracks
5006 	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
5007 	 *  And no, it is not the sum of the reference counts in the
5008 	 *  resync_LRU. The resync_LRU tracks the whole operation including
5009 	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
5010 	 *  on the fly. */
5011 	drbd_rs_cancel_all(device);
5012 	device->rs_total = 0;
5013 	device->rs_failed = 0;
5014 	atomic_set(&device->rs_pending_cnt, 0);
5015 	wake_up(&device->misc_wait);
5016 
5017 	del_timer_sync(&device->resync_timer);
5018 	resync_timer_fn((unsigned long)device);
5019 
5020 	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
5021 	 * w_make_resync_request etc. which may still be on the worker queue
5022 	 * to be "canceled" */
5023 	drbd_flush_workqueue(&peer_device->connection->sender_work);
5024 
5025 	drbd_finish_peer_reqs(device);
5026 
5027 	/* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
5028 	   might have queued more work. The flush before drbd_finish_peer_reqs() is
5029 	   necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
5030 	drbd_flush_workqueue(&peer_device->connection->sender_work);
5031 
5032 	/* need to do it again, drbd_finish_peer_reqs() may have populated it
5033 	 * again via drbd_try_clear_on_disk_bm(). */
5034 	drbd_rs_cancel_all(device);
5035 
5036 	kfree(device->p_uuid);
5037 	device->p_uuid = NULL;
5038 
5039 	if (!drbd_suspended(device))
5040 		tl_clear(peer_device->connection);
5041 
5042 	drbd_md_sync(device);
5043 
5044 	if (get_ldev(device)) {
5045 		drbd_bitmap_io(device, &drbd_bm_write_copy_pages,
5046 				"write from disconnected", BM_LOCKED_CHANGE_ALLOWED);
5047 		put_ldev(device);
5048 	}
5049 
5050 	/* tcp_close and release of sendpage pages can be deferred.  I don't
5051 	 * want to use SO_LINGER, because apparently it can be deferred for
5052 	 * more than 20 seconds (longest time I checked).
5053 	 *
5054 	 * Actually we don't care for exactly when the network stack does its
5055 	 * put_page(), but release our reference on these pages right here.
5056 	 */
5057 	i = drbd_free_peer_reqs(device, &device->net_ee);
5058 	if (i)
5059 		drbd_info(device, "net_ee not empty, killed %u entries\n", i);
5060 	i = atomic_read(&device->pp_in_use_by_net);
5061 	if (i)
5062 		drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
5063 	i = atomic_read(&device->pp_in_use);
5064 	if (i)
5065 		drbd_info(device, "pp_in_use = %d, expected 0\n", i);
5066 
5067 	D_ASSERT(device, list_empty(&device->read_ee));
5068 	D_ASSERT(device, list_empty(&device->active_ee));
5069 	D_ASSERT(device, list_empty(&device->sync_ee));
5070 	D_ASSERT(device, list_empty(&device->done_ee));
5071 
5072 	return 0;
5073 }
5074 
5075 /*
5076  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
5077  * we can agree on is stored in agreed_pro_version.
5078  *
5079  * feature flags and the reserved array should be enough room for future
5080  * enhancements of the handshake protocol, and possible plugins...
5081  *
5082  * for now, they are expected to be zero, but ignored.
5083  */
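
/*
 * Illustrative sketch (not part of the wire format): on a successful
 * handshake, drbd_do_features() below effectively computes
 *
 *	agreed_pro_version = min(PRO_VERSION_MAX, peer->protocol_max);
 *	agreed_features    = PRO_FEATURES & be32_to_cpu(peer->feature_flags);
 *
 * i.e. the highest protocol version and the intersection of feature flags
 * that both sides support.
 */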
5084 static int drbd_send_features(struct drbd_connection *connection)
5085 {
5086 	struct drbd_socket *sock;
5087 	struct p_connection_features *p;
5088 
5089 	sock = &connection->data;
5090 	p = conn_prepare_command(connection, sock);
5091 	if (!p)
5092 		return -EIO;
5093 	memset(p, 0, sizeof(*p));
5094 	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
5095 	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
5096 	p->feature_flags = cpu_to_be32(PRO_FEATURES);
5097 	return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
5098 }
5099 
5100 /*
5101  * return values:
5102  *   1 yes, we have a valid connection
5103  *   0 oops, did not work out, please try again
5104  *  -1 peer talks different language,
5105  *     no point in trying again, please go standalone.
5106  */
5107 static int drbd_do_features(struct drbd_connection *connection)
5108 {
5109 	/* ASSERT current == connection->receiver ... */
5110 	struct p_connection_features *p;
5111 	const int expect = sizeof(struct p_connection_features);
5112 	struct packet_info pi;
5113 	int err;
5114 
5115 	err = drbd_send_features(connection);
5116 	if (err)
5117 		return 0;
5118 
5119 	err = drbd_recv_header(connection, &pi);
5120 	if (err)
5121 		return 0;
5122 
5123 	if (pi.cmd != P_CONNECTION_FEATURES) {
5124 		drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
5125 			 cmdname(pi.cmd), pi.cmd);
5126 		return -1;
5127 	}
5128 
5129 	if (pi.size != expect) {
5130 		drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
5131 		     expect, pi.size);
5132 		return -1;
5133 	}
5134 
5135 	p = pi.data;
5136 	err = drbd_recv_all_warn(connection, p, expect);
5137 	if (err)
5138 		return 0;
5139 
5140 	p->protocol_min = be32_to_cpu(p->protocol_min);
5141 	p->protocol_max = be32_to_cpu(p->protocol_max);
5142 	if (p->protocol_max == 0)
5143 		p->protocol_max = p->protocol_min;
5144 
5145 	if (PRO_VERSION_MAX < p->protocol_min ||
5146 	    PRO_VERSION_MIN > p->protocol_max)
5147 		goto incompat;
5148 
5149 	connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
5150 	connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
5151 
5152 	drbd_info(connection, "Handshake successful: "
5153 	     "Agreed network protocol version %d\n", connection->agreed_pro_version);
5154 
5155 	drbd_info(connection, "Feature flags enabled on protocol level: 0x%x%s%s%s.\n",
5156 		  connection->agreed_features,
5157 		  connection->agreed_features & DRBD_FF_TRIM ? " TRIM" : "",
5158 		  connection->agreed_features & DRBD_FF_THIN_RESYNC ? " THIN_RESYNC" : "",
5159 		  connection->agreed_features & DRBD_FF_WSAME ? " WRITE_SAME" :
5160 		  connection->agreed_features ? "" : " none");
5161 
5162 	return 1;
5163 
5164  incompat:
5165 	drbd_err(connection, "incompatible DRBD dialects: "
5166 	    "I support %d-%d, peer supports %d-%d\n",
5167 	    PRO_VERSION_MIN, PRO_VERSION_MAX,
5168 	    p->protocol_min, p->protocol_max);
5169 	return -1;
5170 }
5171 
5172 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
5173 static int drbd_do_auth(struct drbd_connection *connection)
5174 {
5175 	drbd_err(connection, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
5176 	drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
5177 	return -1;
5178 }
5179 #else
5180 #define CHALLENGE_LEN 64
5181 
5182 /* Return value:
5183 	1 - auth succeeded,
5184 	0 - failed, try again (network error),
5185 	-1 - auth failed, don't try again.
5186 */
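
/*
 * Rough outline of the exchange implemented below; both peers run the same
 * code symmetrically:
 *
 *	send P_AUTH_CHALLENGE  (my_challenge, CHALLENGE_LEN random bytes)
 *	recv P_AUTH_CHALLENGE  (peers_ch)
 *	send P_AUTH_RESPONSE   (HMAC(shared_secret, peers_ch))
 *	recv P_AUTH_RESPONSE   (response)
 *	verify response == HMAC(shared_secret, my_challenge)
 */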
5187 
5188 static int drbd_do_auth(struct drbd_connection *connection)
5189 {
5190 	struct drbd_socket *sock;
5191 	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
5192 	char *response = NULL;
5193 	char *right_response = NULL;
5194 	char *peers_ch = NULL;
5195 	unsigned int key_len;
5196 	char secret[SHARED_SECRET_MAX]; /* 64 byte */
5197 	unsigned int resp_size;
5198 	SHASH_DESC_ON_STACK(desc, connection->cram_hmac_tfm);
5199 	struct packet_info pi;
5200 	struct net_conf *nc;
5201 	int err, rv;
5202 
5203 	/* FIXME: Put the challenge/response into the preallocated socket buffer.  */
5204 
5205 	rcu_read_lock();
5206 	nc = rcu_dereference(connection->net_conf);
5207 	key_len = strlen(nc->shared_secret);
5208 	memcpy(secret, nc->shared_secret, key_len);
5209 	rcu_read_unlock();
5210 
5211 	desc->tfm = connection->cram_hmac_tfm;
5212 	desc->flags = 0;
5213 
5214 	rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
5215 	if (rv) {
5216 		drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
5217 		rv = -1;
5218 		goto fail;
5219 	}
5220 
5221 	get_random_bytes(my_challenge, CHALLENGE_LEN);
5222 
5223 	sock = &connection->data;
5224 	if (!conn_prepare_command(connection, sock)) {
5225 		rv = 0;
5226 		goto fail;
5227 	}
5228 	rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
5229 				my_challenge, CHALLENGE_LEN);
5230 	if (!rv)
5231 		goto fail;
5232 
5233 	err = drbd_recv_header(connection, &pi);
5234 	if (err) {
5235 		rv = 0;
5236 		goto fail;
5237 	}
5238 
5239 	if (pi.cmd != P_AUTH_CHALLENGE) {
5240 		drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
5241 			 cmdname(pi.cmd), pi.cmd);
5242 		rv = 0;
5243 		goto fail;
5244 	}
5245 
5246 	if (pi.size > CHALLENGE_LEN * 2) {
5247 		drbd_err(connection, "AuthChallenge payload too big.\n");
5248 		rv = -1;
5249 		goto fail;
5250 	}
5251 
5252 	if (pi.size < CHALLENGE_LEN) {
5253 		drbd_err(connection, "AuthChallenge payload too small.\n");
5254 		rv = -1;
5255 		goto fail;
5256 	}
5257 
5258 	peers_ch = kmalloc(pi.size, GFP_NOIO);
5259 	if (peers_ch == NULL) {
5260 		drbd_err(connection, "kmalloc of peers_ch failed\n");
5261 		rv = -1;
5262 		goto fail;
5263 	}
5264 
5265 	err = drbd_recv_all_warn(connection, peers_ch, pi.size);
5266 	if (err) {
5267 		rv = 0;
5268 		goto fail;
5269 	}
5270 
5271 	if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
5272 		drbd_err(connection, "Peer presented the same challenge!\n");
5273 		rv = -1;
5274 		goto fail;
5275 	}
5276 
5277 	resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
5278 	response = kmalloc(resp_size, GFP_NOIO);
5279 	if (response == NULL) {
5280 		drbd_err(connection, "kmalloc of response failed\n");
5281 		rv = -1;
5282 		goto fail;
5283 	}
5284 
5285 	rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
5286 	if (rv) {
5287 		drbd_err(connection, "crypto_shash_digest() failed with %d\n", rv);
5288 		rv = -1;
5289 		goto fail;
5290 	}
5291 
5292 	if (!conn_prepare_command(connection, sock)) {
5293 		rv = 0;
5294 		goto fail;
5295 	}
5296 	rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5297 				response, resp_size);
5298 	if (!rv)
5299 		goto fail;
5300 
5301 	err = drbd_recv_header(connection, &pi);
5302 	if (err) {
5303 		rv = 0;
5304 		goto fail;
5305 	}
5306 
5307 	if (pi.cmd != P_AUTH_RESPONSE) {
5308 		drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5309 			 cmdname(pi.cmd), pi.cmd);
5310 		rv = 0;
5311 		goto fail;
5312 	}
5313 
5314 	if (pi.size != resp_size) {
5315 		drbd_err(connection, "AuthResponse payload has wrong size\n");
5316 		rv = 0;
5317 		goto fail;
5318 	}
5319 
5320 	err = drbd_recv_all_warn(connection, response, resp_size);
5321 	if (err) {
5322 		rv = 0;
5323 		goto fail;
5324 	}
5325 
5326 	right_response = kmalloc(resp_size, GFP_NOIO);
5327 	if (right_response == NULL) {
5328 		drbd_err(connection, "kmalloc of right_response failed\n");
5329 		rv = -1;
5330 		goto fail;
5331 	}
5332 
5333 	rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
5334 				 right_response);
5335 	if (rv) {
5336 		drbd_err(connection, "crypto_shash_digest() failed with %d\n", rv);
5337 		rv = -1;
5338 		goto fail;
5339 	}
5340 
5341 	rv = !memcmp(response, right_response, resp_size);
5342 
5343 	if (rv)
5344 		drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5345 		     resp_size);
5346 	else
5347 		rv = -1;
5348 
5349  fail:
5350 	kfree(peers_ch);
5351 	kfree(response);
5352 	kfree(right_response);
5353 	shash_desc_zero(desc);
5354 
5355 	return rv;
5356 }
5357 #endif
5358 
5359 int drbd_receiver(struct drbd_thread *thi)
5360 {
5361 	struct drbd_connection *connection = thi->connection;
5362 	int h;
5363 
5364 	drbd_info(connection, "receiver (re)started\n");
5365 
5366 	do {
5367 		h = conn_connect(connection);
5368 		if (h == 0) {
5369 			conn_disconnect(connection);
5370 			schedule_timeout_interruptible(HZ);
5371 		}
5372 		if (h == -1) {
5373 			drbd_warn(connection, "Discarding network configuration.\n");
5374 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5375 		}
5376 	} while (h == 0);
5377 
5378 	if (h > 0)
5379 		drbdd(connection);
5380 
5381 	conn_disconnect(connection);
5382 
5383 	drbd_info(connection, "receiver terminated\n");
5384 	return 0;
5385 }
5386 
5387 /* ********* acknowledge sender ******** */
5388 
5389 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5390 {
5391 	struct p_req_state_reply *p = pi->data;
5392 	int retcode = be32_to_cpu(p->retcode);
5393 
5394 	if (retcode >= SS_SUCCESS) {
5395 		set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5396 	} else {
5397 		set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5398 		drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5399 			 drbd_set_st_err_str(retcode), retcode);
5400 	}
5401 	wake_up(&connection->ping_wait);
5402 
5403 	return 0;
5404 }
5405 
5406 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5407 {
5408 	struct drbd_peer_device *peer_device;
5409 	struct drbd_device *device;
5410 	struct p_req_state_reply *p = pi->data;
5411 	int retcode = be32_to_cpu(p->retcode);
5412 
5413 	peer_device = conn_peer_device(connection, pi->vnr);
5414 	if (!peer_device)
5415 		return -EIO;
5416 	device = peer_device->device;
5417 
5418 	if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5419 		D_ASSERT(device, connection->agreed_pro_version < 100);
5420 		return got_conn_RqSReply(connection, pi);
5421 	}
5422 
5423 	if (retcode >= SS_SUCCESS) {
5424 		set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5425 	} else {
5426 		set_bit(CL_ST_CHG_FAIL, &device->flags);
5427 		drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5428 			drbd_set_st_err_str(retcode), retcode);
5429 	}
5430 	wake_up(&device->state_wait);
5431 
5432 	return 0;
5433 }
5434 
5435 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
5436 {
5437 	return drbd_send_ping_ack(connection);
5438 
5439 }
5440 
5441 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5442 {
5443 	/* restore idle timeout */
5444 	connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5445 	if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5446 		wake_up(&connection->ping_wait);
5447 
5448 	return 0;
5449 }
5450 
5451 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5452 {
5453 	struct drbd_peer_device *peer_device;
5454 	struct drbd_device *device;
5455 	struct p_block_ack *p = pi->data;
5456 	sector_t sector = be64_to_cpu(p->sector);
5457 	int blksize = be32_to_cpu(p->blksize);
5458 
5459 	peer_device = conn_peer_device(connection, pi->vnr);
5460 	if (!peer_device)
5461 		return -EIO;
5462 	device = peer_device->device;
5463 
5464 	D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5465 
5466 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5467 
5468 	if (get_ldev(device)) {
5469 		drbd_rs_complete_io(device, sector);
5470 		drbd_set_in_sync(device, sector, blksize);
5471 		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5472 		device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5473 		put_ldev(device);
5474 	}
5475 	dec_rs_pending(device);
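	/* blksize is in bytes; >> 9 converts it to 512 byte sectors */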
5476 	atomic_add(blksize >> 9, &device->rs_sect_in);
5477 
5478 	return 0;
5479 }
5480 
5481 static int
5482 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5483 			      struct rb_root *root, const char *func,
5484 			      enum drbd_req_event what, bool missing_ok)
5485 {
5486 	struct drbd_request *req;
5487 	struct bio_and_error m;
5488 
5489 	spin_lock_irq(&device->resource->req_lock);
5490 	req = find_request(device, root, id, sector, missing_ok, func);
5491 	if (unlikely(!req)) {
5492 		spin_unlock_irq(&device->resource->req_lock);
5493 		return -EIO;
5494 	}
5495 	__req_mod(req, what, &m);
5496 	spin_unlock_irq(&device->resource->req_lock);
5497 
5498 	if (m.bio)
5499 		complete_master_bio(device, &m);
5500 	return 0;
5501 }
5502 
5503 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5504 {
5505 	struct drbd_peer_device *peer_device;
5506 	struct drbd_device *device;
5507 	struct p_block_ack *p = pi->data;
5508 	sector_t sector = be64_to_cpu(p->sector);
5509 	int blksize = be32_to_cpu(p->blksize);
5510 	enum drbd_req_event what;
5511 
5512 	peer_device = conn_peer_device(connection, pi->vnr);
5513 	if (!peer_device)
5514 		return -EIO;
5515 	device = peer_device->device;
5516 
5517 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5518 
5519 	if (p->block_id == ID_SYNCER) {
5520 		drbd_set_in_sync(device, sector, blksize);
5521 		dec_rs_pending(device);
5522 		return 0;
5523 	}
5524 	switch (pi->cmd) {
5525 	case P_RS_WRITE_ACK:
5526 		what = WRITE_ACKED_BY_PEER_AND_SIS;
5527 		break;
5528 	case P_WRITE_ACK:
5529 		what = WRITE_ACKED_BY_PEER;
5530 		break;
5531 	case P_RECV_ACK:
5532 		what = RECV_ACKED_BY_PEER;
5533 		break;
5534 	case P_SUPERSEDED:
5535 		what = CONFLICT_RESOLVED;
5536 		break;
5537 	case P_RETRY_WRITE:
5538 		what = POSTPONE_WRITE;
5539 		break;
5540 	default:
5541 		BUG();
5542 	}
5543 
5544 	return validate_req_change_req_state(device, p->block_id, sector,
5545 					     &device->write_requests, __func__,
5546 					     what, false);
5547 }
5548 
5549 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5550 {
5551 	struct drbd_peer_device *peer_device;
5552 	struct drbd_device *device;
5553 	struct p_block_ack *p = pi->data;
5554 	sector_t sector = be64_to_cpu(p->sector);
5555 	int size = be32_to_cpu(p->blksize);
5556 	int err;
5557 
5558 	peer_device = conn_peer_device(connection, pi->vnr);
5559 	if (!peer_device)
5560 		return -EIO;
5561 	device = peer_device->device;
5562 
5563 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5564 
5565 	if (p->block_id == ID_SYNCER) {
5566 		dec_rs_pending(device);
5567 		drbd_rs_failed_io(device, sector, size);
5568 		return 0;
5569 	}
5570 
5571 	err = validate_req_change_req_state(device, p->block_id, sector,
5572 					    &device->write_requests, __func__,
5573 					    NEG_ACKED, true);
5574 	if (err) {
5575 		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5576 		   The master bio might already be completed, therefore the
5577 		   request is no longer in the collision hash. */
5578 		/* In Protocol B we might already have got a P_RECV_ACK
5579 		   but then get a P_NEG_ACK afterwards. */
5580 		drbd_set_out_of_sync(device, sector, size);
5581 	}
5582 	return 0;
5583 }
5584 
5585 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5586 {
5587 	struct drbd_peer_device *peer_device;
5588 	struct drbd_device *device;
5589 	struct p_block_ack *p = pi->data;
5590 	sector_t sector = be64_to_cpu(p->sector);
5591 
5592 	peer_device = conn_peer_device(connection, pi->vnr);
5593 	if (!peer_device)
5594 		return -EIO;
5595 	device = peer_device->device;
5596 
5597 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5598 
5599 	drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5600 	    (unsigned long long)sector, be32_to_cpu(p->blksize));
5601 
5602 	return validate_req_change_req_state(device, p->block_id, sector,
5603 					     &device->read_requests, __func__,
5604 					     NEG_ACKED, false);
5605 }
5606 
5607 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5608 {
5609 	struct drbd_peer_device *peer_device;
5610 	struct drbd_device *device;
5611 	sector_t sector;
5612 	int size;
5613 	struct p_block_ack *p = pi->data;
5614 
5615 	peer_device = conn_peer_device(connection, pi->vnr);
5616 	if (!peer_device)
5617 		return -EIO;
5618 	device = peer_device->device;
5619 
5620 	sector = be64_to_cpu(p->sector);
5621 	size = be32_to_cpu(p->blksize);
5622 
5623 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5624 
5625 	dec_rs_pending(device);
5626 
5627 	if (get_ldev_if_state(device, D_FAILED)) {
5628 		drbd_rs_complete_io(device, sector);
5629 		switch (pi->cmd) {
5630 		case P_NEG_RS_DREPLY:
5631 			drbd_rs_failed_io(device, sector, size);
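			/* fall through */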
5632 		case P_RS_CANCEL:
5633 			break;
5634 		default:
5635 			BUG();
5636 		}
5637 		put_ldev(device);
5638 	}
5639 
5640 	return 0;
5641 }
5642 
5643 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5644 {
5645 	struct p_barrier_ack *p = pi->data;
5646 	struct drbd_peer_device *peer_device;
5647 	int vnr;
5648 
5649 	tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5650 
5651 	rcu_read_lock();
5652 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5653 		struct drbd_device *device = peer_device->device;
5654 
5655 		if (device->state.conn == C_AHEAD &&
5656 		    atomic_read(&device->ap_in_flight) == 0 &&
5657 		    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5658 			device->start_resync_timer.expires = jiffies + HZ;
5659 			add_timer(&device->start_resync_timer);
5660 		}
5661 	}
5662 	rcu_read_unlock();
5663 
5664 	return 0;
5665 }
5666 
5667 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5668 {
5669 	struct drbd_peer_device *peer_device;
5670 	struct drbd_device *device;
5671 	struct p_block_ack *p = pi->data;
5672 	struct drbd_device_work *dw;
5673 	sector_t sector;
5674 	int size;
5675 
5676 	peer_device = conn_peer_device(connection, pi->vnr);
5677 	if (!peer_device)
5678 		return -EIO;
5679 	device = peer_device->device;
5680 
5681 	sector = be64_to_cpu(p->sector);
5682 	size = be32_to_cpu(p->blksize);
5683 
5684 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5685 
5686 	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5687 		drbd_ov_out_of_sync_found(device, sector, size);
5688 	else
5689 		ov_out_of_sync_print(device);
5690 
5691 	if (!get_ldev(device))
5692 		return 0;
5693 
5694 	drbd_rs_complete_io(device, sector);
5695 	dec_rs_pending(device);
5696 
5697 	--device->ov_left;
5698 
5699 	/* let's advance progress step marks only for every other megabyte */
5700 	if ((device->ov_left & 0x200) == 0x200)
5701 		drbd_advance_rs_marks(device, device->ov_left);
5702 
5703 	if (device->ov_left == 0) {
5704 		dw = kmalloc(sizeof(*dw), GFP_NOIO);
5705 		if (dw) {
5706 			dw->w.cb = w_ov_finished;
5707 			dw->device = device;
5708 			drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5709 		} else {
5710 			drbd_err(device, "kmalloc(dw) failed.");
5711 			ov_out_of_sync_print(device);
5712 			drbd_resync_finished(device);
5713 		}
5714 	}
5715 	put_ldev(device);
5716 	return 0;
5717 }
5718 
5719 static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
5720 {
5721 	return 0;
5722 }
5723 
5724 struct meta_sock_cmd {
5725 	size_t pkt_size;
5726 	int (*fn)(struct drbd_connection *connection, struct packet_info *);
5727 };
5728 
5729 static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
5730 {
5731 	long t;
5732 	struct net_conf *nc;
5733 
5734 	rcu_read_lock();
5735 	nc = rcu_dereference(connection->net_conf);
5736 	t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5737 	rcu_read_unlock();
5738 
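	/* ping_int is configured in seconds, ping_timeo in tenths of a second,
	 * hence the additional /10 for the ping timeout below */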
5739 	t *= HZ;
5740 	if (ping_timeout)
5741 		t /= 10;
5742 
5743 	connection->meta.socket->sk->sk_rcvtimeo = t;
5744 }
5745 
5746 static void set_ping_timeout(struct drbd_connection *connection)
5747 {
5748 	set_rcvtimeo(connection, 1);
5749 }
5750 
5751 static void set_idle_timeout(struct drbd_connection *connection)
5752 {
5753 	set_rcvtimeo(connection, 0);
5754 }
5755 
5756 static struct meta_sock_cmd ack_receiver_tbl[] = {
5757 	[P_PING]	    = { 0, got_Ping },
5758 	[P_PING_ACK]	    = { 0, got_PingAck },
5759 	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5760 	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5761 	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5762 	[P_SUPERSEDED]      = { sizeof(struct p_block_ack), got_BlockAck },
5763 	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
5764 	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
5765 	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5766 	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
5767 	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
5768 	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5769 	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5770 	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5771 	[P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5772 	[P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5773 	[P_RETRY_WRITE]	    = { sizeof(struct p_block_ack), got_BlockAck },
5774 };
5775 
5776 int drbd_ack_receiver(struct drbd_thread *thi)
5777 {
5778 	struct drbd_connection *connection = thi->connection;
5779 	struct meta_sock_cmd *cmd = NULL;
5780 	struct packet_info pi;
5781 	unsigned long pre_recv_jif;
5782 	int rv;
5783 	void *buf    = connection->meta.rbuf;
5784 	int received = 0;
5785 	unsigned int header_size = drbd_header_size(connection);
5786 	int expect   = header_size;
5787 	bool ping_timeout_active = false;
5788 	struct sched_param param = { .sched_priority = 2 };
5789 
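	/* Give the ack receiver soft-realtime priority (SCHED_RR, priority 2),
	 * presumably so that peer acknowledgements are still processed promptly
	 * while the machine is busy. */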
5790 	rv = sched_setscheduler(current, SCHED_RR, &param);
5791 	if (rv < 0)
5792 		drbd_err(connection, "drbd_ack_receiver: ERROR set priority, ret=%d\n", rv);
5793 
5794 	while (get_t_state(thi) == RUNNING) {
5795 		drbd_thread_current_set_cpu(thi);
5796 
5797 		conn_reclaim_net_peer_reqs(connection);
5798 
5799 		if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5800 			if (drbd_send_ping(connection)) {
5801 				drbd_err(connection, "drbd_send_ping has failed\n");
5802 				goto reconnect;
5803 			}
5804 			set_ping_timeout(connection);
5805 			ping_timeout_active = true;
5806 		}
5807 
5808 		pre_recv_jif = jiffies;
5809 		rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5810 
5811 		/* Note:
5812 		 * -EINTR	 (on meta) we got a signal
5813 		 * -EAGAIN	 (on meta) rcvtimeo expired
5814 		 * -ECONNRESET	 other side closed the connection
5815 		 * -ERESTARTSYS  (on data) we got a signal
5816 		 * rv <  0	 other than above: unexpected error!
5817 		 * rv == expected: full header or command
5818 		 * rv <  expected: "woken" by signal during receive
5819 		 * rv == 0	 : "connection shut down by peer"
5820 		 */
5821 		if (likely(rv > 0)) {
5822 			received += rv;
5823 			buf	 += rv;
5824 		} else if (rv == 0) {
5825 			if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5826 				long t;
5827 				rcu_read_lock();
5828 				t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5829 				rcu_read_unlock();
5830 
5831 				t = wait_event_timeout(connection->ping_wait,
5832 						       connection->cstate < C_WF_REPORT_PARAMS,
5833 						       t);
5834 				if (t)
5835 					break;
5836 			}
5837 			drbd_err(connection, "meta connection shut down by peer.\n");
5838 			goto reconnect;
5839 		} else if (rv == -EAGAIN) {
5840 			/* If the data socket received something meanwhile,
5841 			 * that is good enough: peer is still alive. */
5842 			if (time_after(connection->last_received, pre_recv_jif))
5843 				continue;
5844 			if (ping_timeout_active) {
5845 				drbd_err(connection, "PingAck did not arrive in time.\n");
5846 				goto reconnect;
5847 			}
5848 			set_bit(SEND_PING, &connection->flags);
5849 			continue;
5850 		} else if (rv == -EINTR) {
5851 			/* maybe drbd_thread_stop(): the while condition will notice.
5852 			 * maybe woken for send_ping: we'll send a ping above,
5853 			 * and change the rcvtimeo */
5854 			flush_signals(current);
5855 			continue;
5856 		} else {
5857 			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5858 			goto reconnect;
5859 		}
5860 
5861 		if (received == expect && cmd == NULL) {
5862 			if (decode_header(connection, connection->meta.rbuf, &pi))
5863 				goto reconnect;
5864 			cmd = &ack_receiver_tbl[pi.cmd];
5865 			if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
5866 				drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5867 					 cmdname(pi.cmd), pi.cmd);
5868 				goto disconnect;
5869 			}
5870 			expect = header_size + cmd->pkt_size;
5871 			if (pi.size != expect - header_size) {
5872 				drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5873 					pi.cmd, pi.size);
5874 				goto reconnect;
5875 			}
5876 		}
5877 		if (received == expect) {
5878 			bool err;
5879 
5880 			err = cmd->fn(connection, &pi);
5881 			if (err) {
5882 				drbd_err(connection, "%pf failed\n", cmd->fn);
5883 				goto reconnect;
5884 			}
5885 
5886 			connection->last_received = jiffies;
5887 
5888 			if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
5889 				set_idle_timeout(connection);
5890 				ping_timeout_active = false;
5891 			}
5892 
5893 			buf	 = connection->meta.rbuf;
5894 			received = 0;
5895 			expect	 = header_size;
5896 			cmd	 = NULL;
5897 		}
5898 	}
5899 
5900 	if (0) {
5901 reconnect:
5902 		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5903 		conn_md_sync(connection);
5904 	}
5905 	if (0) {
5906 disconnect:
5907 		conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5908 	}
5909 
5910 	drbd_info(connection, "ack_receiver terminated\n");
5911 
5912 	return 0;
5913 }
5914 
5915 void drbd_send_acks_wf(struct work_struct *ws)
5916 {
5917 	struct drbd_peer_device *peer_device =
5918 		container_of(ws, struct drbd_peer_device, send_acks_work);
5919 	struct drbd_connection *connection = peer_device->connection;
5920 	struct drbd_device *device = peer_device->device;
5921 	struct net_conf *nc;
5922 	int tcp_cork, err;
5923 
5924 	rcu_read_lock();
5925 	nc = rcu_dereference(connection->net_conf);
5926 	tcp_cork = nc->tcp_cork;
5927 	rcu_read_unlock();
5928 
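	/* If tcp_cork is configured, cork the meta socket while the acks are
	 * generated so that several small ack packets can be coalesced, and
	 * uncork again once drbd_finish_peer_reqs() is done. */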
5929 	if (tcp_cork)
5930 		drbd_tcp_cork(connection->meta.socket);
5931 
5932 	err = drbd_finish_peer_reqs(device);
5933 	kref_put(&device->kref, drbd_destroy_device);
5934 	/* The matching kref_get() is in drbd_endio_write_sec_final(). It keeps the
5935 	   struct work_struct send_acks_work alive, which lives in the peer_device object */
5936 
5937 	if (err) {
5938 		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5939 		return;
5940 	}
5941 
5942 	if (tcp_cork)
5943 		drbd_tcp_uncork(connection->meta.socket);
5944 
5945 	return;
5946 }
5947