1 /*
2    drbd_receiver.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24 
25 
26 #include <linux/module.h>
27 
28 #include <linux/uaccess.h>
29 #include <net/sock.h>
30 
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_protocol.h"
48 #include "drbd_req.h"
49 #include "drbd_vli.h"
50 
51 #define PRO_FEATURES (DRBD_FF_TRIM|DRBD_FF_THIN_RESYNC|DRBD_FF_WSAME)
52 
53 struct packet_info {
54 	enum drbd_packet cmd;
55 	unsigned int size;
56 	unsigned int vnr;
57 	void *data;
58 };
59 
60 enum finish_epoch {
61 	FE_STILL_LIVE,
62 	FE_DESTROYED,
63 	FE_RECYCLED,
64 };
65 
66 static int drbd_do_features(struct drbd_connection *connection);
67 static int drbd_do_auth(struct drbd_connection *connection);
68 static int drbd_disconnected(struct drbd_peer_device *);
69 static void conn_wait_active_ee_empty(struct drbd_connection *connection);
70 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
71 static int e_end_block(struct drbd_work *, int);
72 
73 
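/* GFP_TRY: opportunistic allocation; __GFP_NOWARN keeps the expected
 * failures quiet, and the absence of __GFP_RECLAIM/__GFP_IO/__GFP_FS means
 * we fail fast instead of triggering write-out, which could deadlock in a
 * "criss-cross" setup (see the comment in __drbd_alloc_pages()). */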
74 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
75 
76 /*
77  * some helper functions to deal with single linked page lists,
78  * page->private being our "next" pointer.
79  */
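/* A minimal sketch of such a chain:
 *
 *   *head -> [page A] -> [page B] -> [page C] -> NULL
 *
 * where each arrow is the page_private() "next" pointer of the page before
 * it, and a page_private() of 0 (see the set_page_private(page, 0)
 * end-of-list marker below) terminates the chain. */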
80 
81 /* If at least n pages are linked at head, get n pages off.
82  * Otherwise, don't modify head, and return NULL.
83  * Locking is the responsibility of the caller.
84  */
85 static struct page *page_chain_del(struct page **head, int n)
86 {
87 	struct page *page;
88 	struct page *tmp;
89 
90 	BUG_ON(!n);
91 	BUG_ON(!head);
92 
93 	page = *head;
94 
95 	if (!page)
96 		return NULL;
97 
98 	while (page) {
99 		tmp = page_chain_next(page);
100 		if (--n == 0)
101 			break; /* found sufficient pages */
102 		if (tmp == NULL)
103 			/* insufficient pages, don't use any of them. */
104 			return NULL;
105 		page = tmp;
106 	}
107 
108 	/* add end of list marker for the returned list */
109 	set_page_private(page, 0);
110 	/* actual return value, and adjustment of head */
111 	page = *head;
112 	*head = tmp;
113 	return page;
114 }
115 
116 /* may be used outside of locks to find the tail of a (usually short)
117  * "private" page chain, before adding it back to a global chain head
118  * with page_chain_add() under a spinlock. */
119 static struct page *page_chain_tail(struct page *page, int *len)
120 {
121 	struct page *tmp;
122 	int i = 1;
123 	while ((tmp = page_chain_next(page)))
124 		++i, page = tmp;
125 	if (len)
126 		*len = i;
127 	return page;
128 }
129 
130 static int page_chain_free(struct page *page)
131 {
132 	struct page *tmp;
133 	int i = 0;
134 	page_chain_for_each_safe(page, tmp) {
135 		put_page(page);
136 		++i;
137 	}
138 	return i;
139 }
140 
141 static void page_chain_add(struct page **head,
142 		struct page *chain_first, struct page *chain_last)
143 {
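	/* Paranoia (enabled via "#if 1"): walking the chain from chain_first
	 * must end exactly at chain_last, otherwise the caller handed us an
	 * inconsistent chain. */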
144 #if 1
145 	struct page *tmp;
146 	tmp = page_chain_tail(chain_first, NULL);
147 	BUG_ON(tmp != chain_last);
148 #endif
149 
150 	/* add chain to head */
151 	set_page_private(chain_last, (unsigned long)*head);
152 	*head = chain_first;
153 }
154 
155 static struct page *__drbd_alloc_pages(struct drbd_device *device,
156 				       unsigned int number)
157 {
158 	struct page *page = NULL;
159 	struct page *tmp = NULL;
160 	unsigned int i = 0;
161 
162 	/* Yes, testing drbd_pp_vacant outside the lock is racy.
163 	 * So what. It saves a spin_lock. */
164 	if (drbd_pp_vacant >= number) {
165 		spin_lock(&drbd_pp_lock);
166 		page = page_chain_del(&drbd_pp_pool, number);
167 		if (page)
168 			drbd_pp_vacant -= number;
169 		spin_unlock(&drbd_pp_lock);
170 		if (page)
171 			return page;
172 	}
173 
174 	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
175 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
176 	 * which in turn might block on the other node at this very place.  */
177 	for (i = 0; i < number; i++) {
178 		tmp = alloc_page(GFP_TRY);
179 		if (!tmp)
180 			break;
181 		set_page_private(tmp, (unsigned long)page);
182 		page = tmp;
183 	}
184 
185 	if (i == number)
186 		return page;
187 
188 	/* Not enough pages immediately available this time.
189 	 * No need to jump around here, drbd_alloc_pages will retry this
190 	 * function "soon". */
191 	if (page) {
192 		tmp = page_chain_tail(page, NULL);
193 		spin_lock(&drbd_pp_lock);
194 		page_chain_add(&drbd_pp_pool, page, tmp);
195 		drbd_pp_vacant += i;
196 		spin_unlock(&drbd_pp_lock);
197 	}
198 	return NULL;
199 }
200 
201 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
202 					   struct list_head *to_be_freed)
203 {
204 	struct drbd_peer_request *peer_req, *tmp;
205 
	/* The EEs are always appended to the end of the list. Since
	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first one that has not
	   finished, we can stop examining the list... */
210 
211 	list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
212 		if (drbd_peer_req_has_active_page(peer_req))
213 			break;
214 		list_move(&peer_req->w.list, to_be_freed);
215 	}
216 }
217 
218 static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
219 {
220 	LIST_HEAD(reclaimed);
221 	struct drbd_peer_request *peer_req, *t;
222 
223 	spin_lock_irq(&device->resource->req_lock);
224 	reclaim_finished_net_peer_reqs(device, &reclaimed);
225 	spin_unlock_irq(&device->resource->req_lock);
226 	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
227 		drbd_free_net_peer_req(device, peer_req);
228 }
229 
230 static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
231 {
232 	struct drbd_peer_device *peer_device;
233 	int vnr;
234 
235 	rcu_read_lock();
236 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
237 		struct drbd_device *device = peer_device->device;
238 		if (!atomic_read(&device->pp_in_use_by_net))
239 			continue;
240 
241 		kref_get(&device->kref);
242 		rcu_read_unlock();
243 		drbd_reclaim_net_peer_reqs(device);
244 		kref_put(&device->kref, drbd_destroy_device);
245 		rcu_read_lock();
246 	}
247 	rcu_read_unlock();
248 }
249 
/**
 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @peer_device:	DRBD peer device.
 * @number:	number of pages requested
 * @retry:	whether to retry, if not enough pages are available right now
 *
 * Tries to allocate @number pages, first from our own page pool, then from
 * the kernel.
 * Possibly retries until DRBD frees sufficient pages somewhere else.
259  *
260  * If this allocation would exceed the max_buffers setting, we throttle
261  * allocation (schedule_timeout) to give the system some room to breathe.
262  *
263  * We do not use max-buffers as hard limit, because it could lead to
264  * congestion and further to a distributed deadlock during online-verify or
265  * (checksum based) resync, if the max-buffers, socket buffer sizes and
266  * resync-rate settings are mis-configured.
267  *
268  * Returns a page chain linked via page->private.
269  */
270 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
271 			      bool retry)
272 {
273 	struct drbd_device *device = peer_device->device;
274 	struct page *page = NULL;
275 	struct net_conf *nc;
276 	DEFINE_WAIT(wait);
277 	unsigned int mxb;
278 
279 	rcu_read_lock();
280 	nc = rcu_dereference(peer_device->connection->net_conf);
281 	mxb = nc ? nc->max_buffers : 1000000;
282 	rcu_read_unlock();
283 
284 	if (atomic_read(&device->pp_in_use) < mxb)
285 		page = __drbd_alloc_pages(device, number);
286 
	/* Try to keep the fast path fast, but occasionally we need
	 * to reclaim the pages we lent to the network stack. */
289 	if (page && atomic_read(&device->pp_in_use_by_net) > 512)
290 		drbd_reclaim_net_peer_reqs(device);
291 
292 	while (page == NULL) {
293 		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
294 
295 		drbd_reclaim_net_peer_reqs(device);
296 
297 		if (atomic_read(&device->pp_in_use) < mxb) {
298 			page = __drbd_alloc_pages(device, number);
299 			if (page)
300 				break;
301 		}
302 
303 		if (!retry)
304 			break;
305 
306 		if (signal_pending(current)) {
307 			drbd_warn(device, "drbd_alloc_pages interrupted!\n");
308 			break;
309 		}
310 
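		/* Throttled because of max-buffers: sleep up to 100ms waiting
		 * for someone to free pages.  If the full timeout expired
		 * without a wake-up, drop the soft limit (mxb = UINT_MAX) so
		 * the next iteration may allocate anyway instead of hanging
		 * on a mis-configured max-buffers setting. */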
311 		if (schedule_timeout(HZ/10) == 0)
312 			mxb = UINT_MAX;
313 	}
314 	finish_wait(&drbd_pp_wait, &wait);
315 
316 	if (page)
317 		atomic_add(number, &device->pp_in_use);
318 	return page;
319 }
320 
321 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * Is also used from inside another spin_lock_irq(&resource->req_lock);
323  * Either links the page chain back to the global pool,
324  * or returns all pages to the system. */
325 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
326 {
327 	atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
328 	int i;
329 
330 	if (page == NULL)
331 		return;
332 
333 	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
334 		i = page_chain_free(page);
335 	else {
336 		struct page *tmp;
337 		tmp = page_chain_tail(page, &i);
338 		spin_lock(&drbd_pp_lock);
339 		page_chain_add(&drbd_pp_pool, page, tmp);
340 		drbd_pp_vacant += i;
341 		spin_unlock(&drbd_pp_lock);
342 	}
343 	i = atomic_sub_return(i, a);
344 	if (i < 0)
345 		drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
346 			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
347 	wake_up(&drbd_pp_wait);
348 }
349 
350 /*
351 You need to hold the req_lock:
352  _drbd_wait_ee_list_empty()
353 
354 You must not have the req_lock:
355  drbd_free_peer_req()
356  drbd_alloc_peer_req()
357  drbd_free_peer_reqs()
358  drbd_ee_fix_bhs()
359  drbd_finish_peer_reqs()
360  drbd_clear_done_ee()
361  drbd_wait_ee_list_empty()
362 */
363 
364 /* normal: payload_size == request size (bi_size)
365  * w_same: payload_size == logical_block_size
366  * trim: payload_size == 0 */
367 struct drbd_peer_request *
368 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
369 		    unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local)
370 {
371 	struct drbd_device *device = peer_device->device;
372 	struct drbd_peer_request *peer_req;
373 	struct page *page = NULL;
374 	unsigned nr_pages = (payload_size + PAGE_SIZE -1) >> PAGE_SHIFT;
375 
376 	if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
377 		return NULL;
378 
379 	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
380 	if (!peer_req) {
381 		if (!(gfp_mask & __GFP_NOWARN))
382 			drbd_err(device, "%s: allocation failed\n", __func__);
383 		return NULL;
384 	}
385 
386 	if (nr_pages) {
387 		page = drbd_alloc_pages(peer_device, nr_pages,
388 					gfpflags_allow_blocking(gfp_mask));
389 		if (!page)
390 			goto fail;
391 	}
392 
393 	memset(peer_req, 0, sizeof(*peer_req));
394 	INIT_LIST_HEAD(&peer_req->w.list);
395 	drbd_clear_interval(&peer_req->i);
396 	peer_req->i.size = request_size;
397 	peer_req->i.sector = sector;
398 	peer_req->submit_jif = jiffies;
399 	peer_req->peer_device = peer_device;
400 	peer_req->pages = page;
401 	/*
402 	 * The block_id is opaque to the receiver.  It is not endianness
403 	 * converted, and sent back to the sender unchanged.
404 	 */
405 	peer_req->block_id = id;
406 
407 	return peer_req;
408 
409  fail:
410 	mempool_free(peer_req, drbd_ee_mempool);
411 	return NULL;
412 }
413 
414 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
415 		       int is_net)
416 {
417 	might_sleep();
418 	if (peer_req->flags & EE_HAS_DIGEST)
419 		kfree(peer_req->digest);
420 	drbd_free_pages(device, peer_req->pages, is_net);
421 	D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
422 	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
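	/* EE_CALL_AL_COMPLETE_IO should have been acted upon (and cleared)
	 * before the request is freed; expect() warns if it is still set,
	 * and we call drbd_al_complete_io() here so the activity log
	 * reference is not leaked. */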
423 	if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
424 		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
425 		drbd_al_complete_io(device, &peer_req->i);
426 	}
427 	mempool_free(peer_req, drbd_ee_mempool);
428 }
429 
430 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
431 {
432 	LIST_HEAD(work_list);
433 	struct drbd_peer_request *peer_req, *t;
434 	int count = 0;
435 	int is_net = list == &device->net_ee;
436 
437 	spin_lock_irq(&device->resource->req_lock);
438 	list_splice_init(list, &work_list);
439 	spin_unlock_irq(&device->resource->req_lock);
440 
441 	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
442 		__drbd_free_peer_req(device, peer_req, is_net);
443 		count++;
444 	}
445 	return count;
446 }
447 
448 /*
449  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
450  */
451 static int drbd_finish_peer_reqs(struct drbd_device *device)
452 {
453 	LIST_HEAD(work_list);
454 	LIST_HEAD(reclaimed);
455 	struct drbd_peer_request *peer_req, *t;
456 	int err = 0;
457 
458 	spin_lock_irq(&device->resource->req_lock);
459 	reclaim_finished_net_peer_reqs(device, &reclaimed);
460 	list_splice_init(&device->done_ee, &work_list);
461 	spin_unlock_irq(&device->resource->req_lock);
462 
463 	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
464 		drbd_free_net_peer_req(device, peer_req);
465 
466 	/* possible callbacks here:
	 * e_end_block, e_end_resync_block, and e_send_superseded.
468 	 * all ignore the last argument.
469 	 */
470 	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
471 		int err2;
472 
473 		/* list_del not necessary, next/prev members not touched */
474 		err2 = peer_req->w.cb(&peer_req->w, !!err);
475 		if (!err)
476 			err = err2;
477 		drbd_free_peer_req(device, peer_req);
478 	}
479 	wake_up(&device->ee_wait);
480 
481 	return err;
482 }
483 
484 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
485 				     struct list_head *head)
486 {
487 	DEFINE_WAIT(wait);
488 
489 	/* avoids spin_lock/unlock
490 	 * and calling prepare_to_wait in the fast path */
491 	while (!list_empty(head)) {
492 		prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
493 		spin_unlock_irq(&device->resource->req_lock);
494 		io_schedule();
495 		finish_wait(&device->ee_wait, &wait);
496 		spin_lock_irq(&device->resource->req_lock);
497 	}
498 }
499 
500 static void drbd_wait_ee_list_empty(struct drbd_device *device,
501 				    struct list_head *head)
502 {
503 	spin_lock_irq(&device->resource->req_lock);
504 	_drbd_wait_ee_list_empty(device, head);
505 	spin_unlock_irq(&device->resource->req_lock);
506 }
507 
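/* Receive up to @size bytes from @sock into @buf.  With @flags == 0, the
 * default MSG_WAITALL | MSG_NOSIGNAL makes this block until the full amount
 * has arrived (or the connection failed).  Returns the number of bytes
 * received, or a negative error code from kernel_recvmsg(). */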
508 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
509 {
510 	struct kvec iov = {
511 		.iov_base = buf,
512 		.iov_len = size,
513 	};
514 	struct msghdr msg = {
515 		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
516 	};
517 	return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
518 }
519 
520 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
521 {
522 	int rv;
523 
524 	rv = drbd_recv_short(connection->data.socket, buf, size, 0);
525 
526 	if (rv < 0) {
527 		if (rv == -ECONNRESET)
528 			drbd_info(connection, "sock was reset by peer\n");
529 		else if (rv != -ERESTARTSYS)
530 			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
531 	} else if (rv == 0) {
532 		if (test_bit(DISCONNECT_SENT, &connection->flags)) {
533 			long t;
534 			rcu_read_lock();
535 			t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
536 			rcu_read_unlock();
537 
538 			t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
539 
540 			if (t)
541 				goto out;
542 		}
543 		drbd_info(connection, "sock was shut down by peer\n");
544 	}
545 
546 	if (rv != size)
547 		conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
548 
549 out:
550 	return rv;
551 }
552 
553 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
554 {
555 	int err;
556 
557 	err = drbd_recv(connection, buf, size);
558 	if (err != size) {
559 		if (err >= 0)
560 			err = -EIO;
561 	} else
562 		err = 0;
563 	return err;
564 }
565 
566 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
567 {
568 	int err;
569 
570 	err = drbd_recv_all(connection, buf, size);
571 	if (err && !signal_pending(current))
572 		drbd_warn(connection, "short read (expected size %d)\n", (int)size);
573 	return err;
574 }
575 
576 /* quoting tcp(7):
577  *   On individual connections, the socket buffer size must be set prior to the
578  *   listen(2) or connect(2) calls in order to have it take effect.
579  * This is our wrapper to do so.
580  */
581 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
582 		unsigned int rcv)
583 {
584 	/* open coded SO_SNDBUF, SO_RCVBUF */
585 	if (snd) {
586 		sock->sk->sk_sndbuf = snd;
587 		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
588 	}
589 	if (rcv) {
590 		sock->sk->sk_rcvbuf = rcv;
591 		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
592 	}
593 }
594 
595 static struct socket *drbd_try_connect(struct drbd_connection *connection)
596 {
597 	const char *what;
598 	struct socket *sock;
599 	struct sockaddr_in6 src_in6;
600 	struct sockaddr_in6 peer_in6;
601 	struct net_conf *nc;
602 	int err, peer_addr_len, my_addr_len;
603 	int sndbuf_size, rcvbuf_size, connect_int;
604 	int disconnect_on_error = 1;
605 
606 	rcu_read_lock();
607 	nc = rcu_dereference(connection->net_conf);
608 	if (!nc) {
609 		rcu_read_unlock();
610 		return NULL;
611 	}
612 	sndbuf_size = nc->sndbuf_size;
613 	rcvbuf_size = nc->rcvbuf_size;
614 	connect_int = nc->connect_int;
615 	rcu_read_unlock();
616 
617 	my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
618 	memcpy(&src_in6, &connection->my_addr, my_addr_len);
619 
620 	if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
621 		src_in6.sin6_port = 0;
622 	else
623 		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
624 
625 	peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
626 	memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
627 
628 	what = "sock_create_kern";
629 	err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
630 			       SOCK_STREAM, IPPROTO_TCP, &sock);
631 	if (err < 0) {
632 		sock = NULL;
633 		goto out;
634 	}
635 
636 	sock->sk->sk_rcvtimeo =
637 	sock->sk->sk_sndtimeo = connect_int * HZ;
638 	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
639 
	/* explicitly bind to the configured IP as source IP
	 * for the outgoing connections.
	 * This is needed for multihomed hosts and to be
	 * able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 * a free one dynamically.
	 */
647 	what = "bind before connect";
648 	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
649 	if (err < 0)
650 		goto out;
651 
652 	/* connect may fail, peer not yet available.
653 	 * stay C_WF_CONNECTION, don't go Disconnecting! */
654 	disconnect_on_error = 0;
655 	what = "connect";
656 	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
657 
658 out:
659 	if (err < 0) {
660 		if (sock) {
661 			sock_release(sock);
662 			sock = NULL;
663 		}
664 		switch (-err) {
665 			/* timeout, busy, signal pending */
666 		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
667 		case EINTR: case ERESTARTSYS:
668 			/* peer not (yet) available, network problem */
669 		case ECONNREFUSED: case ENETUNREACH:
670 		case EHOSTDOWN:    case EHOSTUNREACH:
671 			disconnect_on_error = 0;
672 			break;
673 		default:
674 			drbd_err(connection, "%s failed, err = %d\n", what, err);
675 		}
676 		if (disconnect_on_error)
677 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
678 	}
679 
680 	return sock;
681 }
682 
683 struct accept_wait_data {
684 	struct drbd_connection *connection;
685 	struct socket *s_listen;
686 	struct completion door_bell;
687 	void (*original_sk_state_change)(struct sock *sk);
688 
689 };
690 
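/* sk_state_change callback installed on the listen socket by
 * prepare_listen_socket(): once an incoming connection reaches
 * TCP_ESTABLISHED, ring the door_bell completion so that
 * drbd_wait_for_connect() can accept() it, then chain to the original
 * callback. */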
691 static void drbd_incoming_connection(struct sock *sk)
692 {
693 	struct accept_wait_data *ad = sk->sk_user_data;
694 	void (*state_change)(struct sock *sk);
695 
696 	state_change = ad->original_sk_state_change;
697 	if (sk->sk_state == TCP_ESTABLISHED)
698 		complete(&ad->door_bell);
699 	state_change(sk);
700 }
701 
702 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
703 {
704 	int err, sndbuf_size, rcvbuf_size, my_addr_len;
705 	struct sockaddr_in6 my_addr;
706 	struct socket *s_listen;
707 	struct net_conf *nc;
708 	const char *what;
709 
710 	rcu_read_lock();
711 	nc = rcu_dereference(connection->net_conf);
712 	if (!nc) {
713 		rcu_read_unlock();
714 		return -EIO;
715 	}
716 	sndbuf_size = nc->sndbuf_size;
717 	rcvbuf_size = nc->rcvbuf_size;
718 	rcu_read_unlock();
719 
720 	my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
721 	memcpy(&my_addr, &connection->my_addr, my_addr_len);
722 
723 	what = "sock_create_kern";
724 	err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
725 			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
726 	if (err) {
727 		s_listen = NULL;
728 		goto out;
729 	}
730 
731 	s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
732 	drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
733 
734 	what = "bind before listen";
735 	err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
736 	if (err < 0)
737 		goto out;
738 
739 	ad->s_listen = s_listen;
740 	write_lock_bh(&s_listen->sk->sk_callback_lock);
741 	ad->original_sk_state_change = s_listen->sk->sk_state_change;
742 	s_listen->sk->sk_state_change = drbd_incoming_connection;
743 	s_listen->sk->sk_user_data = ad;
744 	write_unlock_bh(&s_listen->sk->sk_callback_lock);
745 
746 	what = "listen";
747 	err = s_listen->ops->listen(s_listen, 5);
748 	if (err < 0)
749 		goto out;
750 
751 	return 0;
752 out:
753 	if (s_listen)
754 		sock_release(s_listen);
755 	if (err < 0) {
756 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
757 			drbd_err(connection, "%s failed, err = %d\n", what, err);
758 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
759 		}
760 	}
761 
762 	return -EIO;
763 }
764 
765 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
766 {
767 	write_lock_bh(&sk->sk_callback_lock);
768 	sk->sk_state_change = ad->original_sk_state_change;
769 	sk->sk_user_data = NULL;
770 	write_unlock_bh(&sk->sk_callback_lock);
771 }
772 
773 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
774 {
775 	int timeo, connect_int, err = 0;
776 	struct socket *s_estab = NULL;
777 	struct net_conf *nc;
778 
779 	rcu_read_lock();
780 	nc = rcu_dereference(connection->net_conf);
781 	if (!nc) {
782 		rcu_read_unlock();
783 		return NULL;
784 	}
785 	connect_int = nc->connect_int;
786 	rcu_read_unlock();
787 
788 	timeo = connect_int * HZ;
789 	/* 28.5% random jitter */
790 	timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
791 
792 	err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
793 	if (err <= 0)
794 		return NULL;
795 
796 	err = kernel_accept(ad->s_listen, &s_estab, 0);
797 	if (err < 0) {
798 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
799 			drbd_err(connection, "accept failed, err = %d\n", err);
800 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
801 		}
802 	}
803 
804 	if (s_estab)
805 		unregister_state_change(s_estab->sk, ad);
806 
807 	return s_estab;
808 }
809 
810 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
811 
812 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
813 			     enum drbd_packet cmd)
814 {
815 	if (!conn_prepare_command(connection, sock))
816 		return -EIO;
817 	return conn_send_command(connection, sock, cmd, 0, NULL, 0);
818 }
819 
820 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
821 {
822 	unsigned int header_size = drbd_header_size(connection);
823 	struct packet_info pi;
824 	struct net_conf *nc;
825 	int err;
826 
827 	rcu_read_lock();
828 	nc = rcu_dereference(connection->net_conf);
829 	if (!nc) {
830 		rcu_read_unlock();
831 		return -EIO;
832 	}
833 	sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
834 	rcu_read_unlock();
835 
836 	err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
837 	if (err != header_size) {
838 		if (err >= 0)
839 			err = -EIO;
840 		return err;
841 	}
842 	err = decode_header(connection, connection->data.rbuf, &pi);
843 	if (err)
844 		return err;
845 	return pi.cmd;
846 }
847 
848 /**
849  * drbd_socket_okay() - Free the socket if its connection is not okay
850  * @sock:	pointer to the pointer to the socket.
851  */
852 static bool drbd_socket_okay(struct socket **sock)
853 {
854 	int rr;
855 	char tb[4];
856 
857 	if (!*sock)
858 		return false;
859 
860 	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
861 
862 	if (rr > 0 || rr == -EAGAIN) {
863 		return true;
864 	} else {
865 		sock_release(*sock);
866 		*sock = NULL;
867 		return false;
868 	}
869 }
870 
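/* Both sockets must exist before we consider the connection established.
 * Sleep briefly (sock_check_timeo, or ping_timeo as a fallback) before
 * probing them, so that a socket the peer has meanwhile closed (for example
 * after crossed connection attempts) is noticed, then verify that both are
 * still usable. */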
871 static bool connection_established(struct drbd_connection *connection,
872 				   struct socket **sock1,
873 				   struct socket **sock2)
874 {
875 	struct net_conf *nc;
876 	int timeout;
877 	bool ok;
878 
879 	if (!*sock1 || !*sock2)
880 		return false;
881 
882 	rcu_read_lock();
883 	nc = rcu_dereference(connection->net_conf);
884 	timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
885 	rcu_read_unlock();
886 	schedule_timeout_interruptible(timeout);
887 
888 	ok = drbd_socket_okay(sock1);
889 	ok = drbd_socket_okay(sock2) && ok;
890 
891 	return ok;
892 }
893 
894 /* Gets called if a connection is established, or if a new minor gets created
895    in a connection */
896 int drbd_connected(struct drbd_peer_device *peer_device)
897 {
898 	struct drbd_device *device = peer_device->device;
899 	int err;
900 
901 	atomic_set(&device->packet_seq, 0);
902 	device->peer_seq = 0;
903 
904 	device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
905 		&peer_device->connection->cstate_mutex :
906 		&device->own_state_mutex;
907 
908 	err = drbd_send_sync_param(peer_device);
909 	if (!err)
910 		err = drbd_send_sizes(peer_device, 0, 0);
911 	if (!err)
912 		err = drbd_send_uuids(peer_device);
913 	if (!err)
914 		err = drbd_send_current_state(peer_device);
915 	clear_bit(USE_DEGR_WFC_T, &device->flags);
916 	clear_bit(RESIZE_PENDING, &device->flags);
917 	atomic_set(&device->ap_in_flight, 0);
918 	mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
919 	return err;
920 }
921 
922 /*
923  * return values:
924  *   1 yes, we have a valid connection
925  *   0 oops, did not work out, please try again
926  *  -1 peer talks different language,
927  *     no point in trying again, please go standalone.
928  *  -2 We do not have a network config...
929  */
930 static int conn_connect(struct drbd_connection *connection)
931 {
932 	struct drbd_socket sock, msock;
933 	struct drbd_peer_device *peer_device;
934 	struct net_conf *nc;
935 	int vnr, timeout, h;
936 	bool discard_my_data, ok;
937 	enum drbd_state_rv rv;
938 	struct accept_wait_data ad = {
939 		.connection = connection,
940 		.door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
941 	};
942 
943 	clear_bit(DISCONNECT_SENT, &connection->flags);
944 	if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
945 		return -2;
946 
947 	mutex_init(&sock.mutex);
948 	sock.sbuf = connection->data.sbuf;
949 	sock.rbuf = connection->data.rbuf;
950 	sock.socket = NULL;
951 	mutex_init(&msock.mutex);
952 	msock.sbuf = connection->meta.sbuf;
953 	msock.rbuf = connection->meta.rbuf;
954 	msock.socket = NULL;
955 
956 	/* Assume that the peer only understands protocol 80 until we know better.  */
957 	connection->agreed_pro_version = 80;
958 
959 	if (prepare_listen_socket(connection, &ad))
960 		return 0;
961 
962 	do {
963 		struct socket *s;
964 
965 		s = drbd_try_connect(connection);
966 		if (s) {
967 			if (!sock.socket) {
968 				sock.socket = s;
969 				send_first_packet(connection, &sock, P_INITIAL_DATA);
970 			} else if (!msock.socket) {
971 				clear_bit(RESOLVE_CONFLICTS, &connection->flags);
972 				msock.socket = s;
973 				send_first_packet(connection, &msock, P_INITIAL_META);
974 			} else {
975 				drbd_err(connection, "Logic error in conn_connect()\n");
976 				goto out_release_sockets;
977 			}
978 		}
979 
980 		if (connection_established(connection, &sock.socket, &msock.socket))
981 			break;
982 
983 retry:
984 		s = drbd_wait_for_connect(connection, &ad);
985 		if (s) {
986 			int fp = receive_first_packet(connection, s);
987 			drbd_socket_okay(&sock.socket);
988 			drbd_socket_okay(&msock.socket);
989 			switch (fp) {
990 			case P_INITIAL_DATA:
991 				if (sock.socket) {
992 					drbd_warn(connection, "initial packet S crossed\n");
993 					sock_release(sock.socket);
994 					sock.socket = s;
995 					goto randomize;
996 				}
997 				sock.socket = s;
998 				break;
999 			case P_INITIAL_META:
1000 				set_bit(RESOLVE_CONFLICTS, &connection->flags);
1001 				if (msock.socket) {
1002 					drbd_warn(connection, "initial packet M crossed\n");
1003 					sock_release(msock.socket);
1004 					msock.socket = s;
1005 					goto randomize;
1006 				}
1007 				msock.socket = s;
1008 				break;
1009 			default:
1010 				drbd_warn(connection, "Error receiving initial packet\n");
1011 				sock_release(s);
1012 randomize:
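				/* Crossed connection attempts: both nodes may
				 * get here; a coin flip decides whether this
				 * side keeps waiting, so the tie is broken
				 * eventually. */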
1013 				if (prandom_u32() & 1)
1014 					goto retry;
1015 			}
1016 		}
1017 
1018 		if (connection->cstate <= C_DISCONNECTING)
1019 			goto out_release_sockets;
1020 		if (signal_pending(current)) {
1021 			flush_signals(current);
1022 			smp_rmb();
1023 			if (get_t_state(&connection->receiver) == EXITING)
1024 				goto out_release_sockets;
1025 		}
1026 
1027 		ok = connection_established(connection, &sock.socket, &msock.socket);
1028 	} while (!ok);
1029 
1030 	if (ad.s_listen)
1031 		sock_release(ad.s_listen);
1032 
1033 	sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1034 	msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1035 
1036 	sock.socket->sk->sk_allocation = GFP_NOIO;
1037 	msock.socket->sk->sk_allocation = GFP_NOIO;
1038 
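	/* Bulk data travels on the "data" socket, acks and pings on the
	 * "meta" socket; give the latter the higher packet scheduling
	 * priority so small control packets are not queued behind bulk
	 * transfers. */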
1039 	sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1040 	msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1041 
1042 	/* NOT YET ...
1043 	 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1044 	 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1045 	 * first set it to the P_CONNECTION_FEATURES timeout,
1046 	 * which we set to 4x the configured ping_timeout. */
1047 	rcu_read_lock();
1048 	nc = rcu_dereference(connection->net_conf);
1049 
1050 	sock.socket->sk->sk_sndtimeo =
1051 	sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1052 
1053 	msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1054 	timeout = nc->timeout * HZ / 10;
1055 	discard_my_data = nc->discard_my_data;
1056 	rcu_read_unlock();
1057 
1058 	msock.socket->sk->sk_sndtimeo = timeout;
1059 
1060 	/* we don't want delays.
1061 	 * we use TCP_CORK where appropriate, though */
1062 	drbd_tcp_nodelay(sock.socket);
1063 	drbd_tcp_nodelay(msock.socket);
1064 
1065 	connection->data.socket = sock.socket;
1066 	connection->meta.socket = msock.socket;
1067 	connection->last_received = jiffies;
1068 
1069 	h = drbd_do_features(connection);
1070 	if (h <= 0)
1071 		return h;
1072 
1073 	if (connection->cram_hmac_tfm) {
1074 		/* drbd_request_state(device, NS(conn, WFAuth)); */
1075 		switch (drbd_do_auth(connection)) {
1076 		case -1:
1077 			drbd_err(connection, "Authentication of peer failed\n");
1078 			return -1;
1079 		case 0:
1080 			drbd_err(connection, "Authentication of peer failed, trying again.\n");
1081 			return 0;
1082 		}
1083 	}
1084 
1085 	connection->data.socket->sk->sk_sndtimeo = timeout;
1086 	connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1087 
1088 	if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1089 		return -1;
1090 
1091 	/* Prevent a race between resync-handshake and
1092 	 * being promoted to Primary.
1093 	 *
1094 	 * Grab and release the state mutex, so we know that any current
1095 	 * drbd_set_role() is finished, and any incoming drbd_set_role
1096 	 * will see the STATE_SENT flag, and wait for it to be cleared.
1097 	 */
1098 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1099 		mutex_lock(peer_device->device->state_mutex);
1100 
1101 	set_bit(STATE_SENT, &connection->flags);
1102 
1103 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1104 		mutex_unlock(peer_device->device->state_mutex);
1105 
1106 	rcu_read_lock();
1107 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1108 		struct drbd_device *device = peer_device->device;
1109 		kref_get(&device->kref);
1110 		rcu_read_unlock();
1111 
1112 		if (discard_my_data)
1113 			set_bit(DISCARD_MY_DATA, &device->flags);
1114 		else
1115 			clear_bit(DISCARD_MY_DATA, &device->flags);
1116 
1117 		drbd_connected(peer_device);
1118 		kref_put(&device->kref, drbd_destroy_device);
1119 		rcu_read_lock();
1120 	}
1121 	rcu_read_unlock();
1122 
1123 	rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1124 	if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1125 		clear_bit(STATE_SENT, &connection->flags);
1126 		return 0;
1127 	}
1128 
1129 	drbd_thread_start(&connection->ack_receiver);
1130 	/* opencoded create_singlethread_workqueue(),
1131 	 * to be able to use format string arguments */
1132 	connection->ack_sender =
1133 		alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
1134 	if (!connection->ack_sender) {
1135 		drbd_err(connection, "Failed to create workqueue ack_sender\n");
1136 		return 0;
1137 	}
1138 
1139 	mutex_lock(&connection->resource->conf_update);
1140 	/* The discard_my_data flag is a single-shot modifier to the next
1141 	 * connection attempt, the handshake of which is now well underway.
1142 	 * No need for rcu style copying of the whole struct
1143 	 * just to clear a single value. */
1144 	connection->net_conf->discard_my_data = 0;
1145 	mutex_unlock(&connection->resource->conf_update);
1146 
1147 	return h;
1148 
1149 out_release_sockets:
1150 	if (ad.s_listen)
1151 		sock_release(ad.s_listen);
1152 	if (sock.socket)
1153 		sock_release(sock.socket);
1154 	if (msock.socket)
1155 		sock_release(msock.socket);
1156 	return -1;
1157 }
1158 
1159 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1160 {
1161 	unsigned int header_size = drbd_header_size(connection);
1162 
1163 	if (header_size == sizeof(struct p_header100) &&
1164 	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1165 		struct p_header100 *h = header;
1166 		if (h->pad != 0) {
1167 			drbd_err(connection, "Header padding is not zero\n");
1168 			return -EINVAL;
1169 		}
1170 		pi->vnr = be16_to_cpu(h->volume);
1171 		pi->cmd = be16_to_cpu(h->command);
1172 		pi->size = be32_to_cpu(h->length);
1173 	} else if (header_size == sizeof(struct p_header95) &&
1174 		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1175 		struct p_header95 *h = header;
1176 		pi->cmd = be16_to_cpu(h->command);
1177 		pi->size = be32_to_cpu(h->length);
1178 		pi->vnr = 0;
1179 	} else if (header_size == sizeof(struct p_header80) &&
1180 		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1181 		struct p_header80 *h = header;
1182 		pi->cmd = be16_to_cpu(h->command);
1183 		pi->size = be16_to_cpu(h->length);
1184 		pi->vnr = 0;
1185 	} else {
1186 		drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1187 			 be32_to_cpu(*(__be32 *)header),
1188 			 connection->agreed_pro_version);
1189 		return -EINVAL;
1190 	}
1191 	pi->data = header + header_size;
1192 	return 0;
1193 }
1194 
1195 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1196 {
1197 	void *buffer = connection->data.rbuf;
1198 	int err;
1199 
1200 	err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1201 	if (err)
1202 		return err;
1203 
1204 	err = decode_header(connection, buffer, pi);
1205 	connection->last_received = jiffies;
1206 
1207 	return err;
1208 }
1209 
1210 /* This is blkdev_issue_flush, but asynchronous.
1211  * We want to submit to all component volumes in parallel,
1212  * then wait for all completions.
1213  */
1214 struct issue_flush_context {
1215 	atomic_t pending;
1216 	int error;
1217 	struct completion done;
1218 };
1219 struct one_flush_context {
1220 	struct drbd_device *device;
1221 	struct issue_flush_context *ctx;
1222 };
1223 
1224 void one_flush_endio(struct bio *bio)
1225 {
1226 	struct one_flush_context *octx = bio->bi_private;
1227 	struct drbd_device *device = octx->device;
1228 	struct issue_flush_context *ctx = octx->ctx;
1229 
1230 	if (bio->bi_error) {
1231 		ctx->error = bio->bi_error;
1232 		drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_error);
1233 	}
1234 	kfree(octx);
1235 	bio_put(bio);
1236 
1237 	clear_bit(FLUSH_PENDING, &device->flags);
1238 	put_ldev(device);
1239 	kref_put(&device->kref, drbd_destroy_device);
1240 
1241 	if (atomic_dec_and_test(&ctx->pending))
1242 		complete(&ctx->done);
1243 }
1244 
1245 static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
1246 {
1247 	struct bio *bio = bio_alloc(GFP_NOIO, 0);
1248 	struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);
1249 	if (!bio || !octx) {
1250 		drbd_warn(device, "Could not allocate a bio, CANNOT ISSUE FLUSH\n");
1251 		/* FIXME: what else can I do now?  disconnecting or detaching
1252 		 * really does not help to improve the state of the world, either.
1253 		 */
1254 		kfree(octx);
1255 		if (bio)
1256 			bio_put(bio);
1257 
1258 		ctx->error = -ENOMEM;
1259 		put_ldev(device);
1260 		kref_put(&device->kref, drbd_destroy_device);
1261 		return;
1262 	}
1263 
1264 	octx->device = device;
1265 	octx->ctx = ctx;
1266 	bio->bi_bdev = device->ldev->backing_bdev;
1267 	bio->bi_private = octx;
1268 	bio->bi_end_io = one_flush_endio;
1269 	bio->bi_opf = REQ_OP_FLUSH | REQ_PREFLUSH;
1270 
1271 	device->flush_jif = jiffies;
1272 	set_bit(FLUSH_PENDING, &device->flags);
1273 	atomic_inc(&ctx->pending);
1274 	submit_bio(bio);
1275 }
1276 
1277 static void drbd_flush(struct drbd_connection *connection)
1278 {
1279 	if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1280 		struct drbd_peer_device *peer_device;
1281 		struct issue_flush_context ctx;
1282 		int vnr;
1283 
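		/* Start pending at 1 as a bias, so the completion cannot fire
		 * until we drop that bias below, after all flush bios have
		 * been submitted (each submit_one_flush() adds one). */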
1284 		atomic_set(&ctx.pending, 1);
1285 		ctx.error = 0;
1286 		init_completion(&ctx.done);
1287 
1288 		rcu_read_lock();
1289 		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1290 			struct drbd_device *device = peer_device->device;
1291 
1292 			if (!get_ldev(device))
1293 				continue;
1294 			kref_get(&device->kref);
1295 			rcu_read_unlock();
1296 
1297 			submit_one_flush(device, &ctx);
1298 
1299 			rcu_read_lock();
1300 		}
1301 		rcu_read_unlock();
1302 
1303 		/* Do we want to add a timeout,
1304 		 * if disk-timeout is set? */
1305 		if (!atomic_dec_and_test(&ctx.pending))
1306 			wait_for_completion(&ctx.done);
1307 
1308 		if (ctx.error) {
1309 			/* would rather check on EOPNOTSUPP, but that is not reliable.
1310 			 * don't try again for ANY return value != 0
1311 			 * if (rv == -EOPNOTSUPP) */
1312 			/* Any error is already reported by bio_endio callback. */
1313 			drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1314 		}
1315 	}
1316 }
1317 
1318 /**
1319  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @connection:	DRBD connection.
1321  * @epoch:	Epoch object.
1322  * @ev:		Epoch event.
1323  */
1324 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1325 					       struct drbd_epoch *epoch,
1326 					       enum epoch_event ev)
1327 {
1328 	int epoch_size;
1329 	struct drbd_epoch *next_epoch;
1330 	enum finish_epoch rv = FE_STILL_LIVE;
1331 
1332 	spin_lock(&connection->epoch_lock);
1333 	do {
1334 		next_epoch = NULL;
1335 
1336 		epoch_size = atomic_read(&epoch->epoch_size);
1337 
1338 		switch (ev & ~EV_CLEANUP) {
1339 		case EV_PUT:
1340 			atomic_dec(&epoch->active);
1341 			break;
1342 		case EV_GOT_BARRIER_NR:
1343 			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1344 			break;
1345 		case EV_BECAME_LAST:
			/* nothing to do */
1347 			break;
1348 		}
1349 
1350 		if (epoch_size != 0 &&
1351 		    atomic_read(&epoch->active) == 0 &&
1352 		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1353 			if (!(ev & EV_CLEANUP)) {
1354 				spin_unlock(&connection->epoch_lock);
1355 				drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1356 				spin_lock(&connection->epoch_lock);
1357 			}
1358 #if 0
1359 			/* FIXME: dec unacked on connection, once we have
1360 			 * something to count pending connection packets in. */
1361 			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1362 				dec_unacked(epoch->connection);
1363 #endif
1364 
1365 			if (connection->current_epoch != epoch) {
1366 				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1367 				list_del(&epoch->list);
1368 				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1369 				connection->epochs--;
1370 				kfree(epoch);
1371 
1372 				if (rv == FE_STILL_LIVE)
1373 					rv = FE_DESTROYED;
1374 			} else {
1375 				epoch->flags = 0;
1376 				atomic_set(&epoch->epoch_size, 0);
1377 				/* atomic_set(&epoch->active, 0); is already zero */
1378 				if (rv == FE_STILL_LIVE)
1379 					rv = FE_RECYCLED;
1380 			}
1381 		}
1382 
1383 		if (!next_epoch)
1384 			break;
1385 
1386 		epoch = next_epoch;
1387 	} while (1);
1388 
1389 	spin_unlock(&connection->epoch_lock);
1390 
1391 	return rv;
1392 }
1393 
1394 static enum write_ordering_e
1395 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1396 {
1397 	struct disk_conf *dc;
1398 
1399 	dc = rcu_dereference(bdev->disk_conf);
1400 
1401 	if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1402 		wo = WO_DRAIN_IO;
1403 	if (wo == WO_DRAIN_IO && !dc->disk_drain)
1404 		wo = WO_NONE;
1405 
1406 	return wo;
1407 }
1408 
1409 /**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @resource:	DRBD resource.
 * @bdev:	Backing device to take into account as well, may be NULL.
 * @wo:		Write ordering method to try.
1413  */
1414 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1415 			      enum write_ordering_e wo)
1416 {
1417 	struct drbd_device *device;
1418 	enum write_ordering_e pwo;
1419 	int vnr;
1420 	static char *write_ordering_str[] = {
1421 		[WO_NONE] = "none",
1422 		[WO_DRAIN_IO] = "drain",
1423 		[WO_BDEV_FLUSH] = "flush",
1424 	};
1425 
1426 	pwo = resource->write_ordering;
1427 	if (wo != WO_BDEV_FLUSH)
1428 		wo = min(pwo, wo);
1429 	rcu_read_lock();
1430 	idr_for_each_entry(&resource->devices, device, vnr) {
1431 		if (get_ldev(device)) {
1432 			wo = max_allowed_wo(device->ldev, wo);
1433 			if (device->ldev == bdev)
1434 				bdev = NULL;
1435 			put_ldev(device);
1436 		}
1437 	}
1438 
1439 	if (bdev)
1440 		wo = max_allowed_wo(bdev, wo);
1441 
1442 	rcu_read_unlock();
1443 
1444 	resource->write_ordering = wo;
1445 	if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1446 		drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1447 }
1448 
1449 /*
1450  * We *may* ignore the discard-zeroes-data setting, if so configured.
1451  *
 * Assumption is that "discard_zeroes_data=0" is only set because the backend
1453  * may ignore partial unaligned discards.
1454  *
1455  * LVM/DM thin as of at least
1456  *   LVM version:     2.02.115(2)-RHEL7 (2015-01-28)
1457  *   Library version: 1.02.93-RHEL7 (2015-01-28)
1458  *   Driver version:  4.29.0
1459  * still behaves this way.
1460  *
1461  * For unaligned (wrt. alignment and granularity) or too small discards,
 * we zero-out the initial and/or trailing unaligned partial chunks,
1463  * but discard all the aligned full chunks.
1464  *
1465  * At least for LVM/DM thin, the result is effectively "discard_zeroes_data=1".
1466  */
1467 int drbd_issue_discard_or_zero_out(struct drbd_device *device, sector_t start, unsigned int nr_sectors, bool discard)
1468 {
1469 	struct block_device *bdev = device->ldev->backing_bdev;
1470 	struct request_queue *q = bdev_get_queue(bdev);
1471 	sector_t tmp, nr;
1472 	unsigned int max_discard_sectors, granularity;
1473 	int alignment;
1474 	int err = 0;
1475 
1476 	if (!discard)
1477 		goto zero_out;
1478 
1479 	/* Zero-sector (unknown) and one-sector granularities are the same.  */
1480 	granularity = max(q->limits.discard_granularity >> 9, 1U);
1481 	alignment = (bdev_discard_alignment(bdev) >> 9) % granularity;
1482 
1483 	max_discard_sectors = min(q->limits.max_discard_sectors, (1U << 22));
1484 	max_discard_sectors -= max_discard_sectors % granularity;
1485 	if (unlikely(!max_discard_sectors))
1486 		goto zero_out;
1487 
1488 	if (nr_sectors < granularity)
1489 		goto zero_out;
1490 
1491 	tmp = start;
1492 	if (sector_div(tmp, granularity) != alignment) {
1493 		if (nr_sectors < 2*granularity)
1494 			goto zero_out;
1495 		/* start + gran - (start + gran - align) % gran */
1496 		tmp = start + granularity - alignment;
1497 		tmp = start + granularity - sector_div(tmp, granularity);
1498 
1499 		nr = tmp - start;
1500 		err |= blkdev_issue_zeroout(bdev, start, nr, GFP_NOIO, 0);
1501 		nr_sectors -= nr;
1502 		start = tmp;
1503 	}
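	/* start now sits on a discard-aligned boundary: discard the bulk in
	 * granularity-aligned chunks of at most max_discard_sectors each,
	 * and finally zero out whatever unaligned tail remains. */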
1504 	while (nr_sectors >= granularity) {
1505 		nr = min_t(sector_t, nr_sectors, max_discard_sectors);
1506 		err |= blkdev_issue_discard(bdev, start, nr, GFP_NOIO, 0);
1507 		nr_sectors -= nr;
1508 		start += nr;
1509 	}
1510  zero_out:
1511 	if (nr_sectors) {
1512 		err |= blkdev_issue_zeroout(bdev, start, nr_sectors, GFP_NOIO, 0);
1513 	}
1514 	return err != 0;
1515 }
1516 
1517 static bool can_do_reliable_discards(struct drbd_device *device)
1518 {
1519 	struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
1520 	struct disk_conf *dc;
1521 	bool can_do;
1522 
1523 	if (!blk_queue_discard(q))
1524 		return false;
1525 
1526 	if (q->limits.discard_zeroes_data)
1527 		return true;
1528 
1529 	rcu_read_lock();
1530 	dc = rcu_dereference(device->ldev->disk_conf);
1531 	can_do = dc->discard_zeroes_if_aligned;
1532 	rcu_read_unlock();
1533 	return can_do;
1534 }
1535 
1536 static void drbd_issue_peer_discard(struct drbd_device *device, struct drbd_peer_request *peer_req)
1537 {
1538 	/* If the backend cannot discard, or does not guarantee
1539 	 * read-back zeroes in discarded ranges, we fall back to
1540 	 * zero-out.  Unless configuration specifically requested
1541 	 * otherwise. */
1542 	if (!can_do_reliable_discards(device))
1543 		peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT;
1544 
1545 	if (drbd_issue_discard_or_zero_out(device, peer_req->i.sector,
1546 	    peer_req->i.size >> 9, !(peer_req->flags & EE_IS_TRIM_USE_ZEROOUT)))
1547 		peer_req->flags |= EE_WAS_ERROR;
1548 	drbd_endio_write_sec_final(peer_req);
1549 }
1550 
1551 static void drbd_issue_peer_wsame(struct drbd_device *device,
1552 				  struct drbd_peer_request *peer_req)
1553 {
1554 	struct block_device *bdev = device->ldev->backing_bdev;
1555 	sector_t s = peer_req->i.sector;
1556 	sector_t nr = peer_req->i.size >> 9;
1557 	if (blkdev_issue_write_same(bdev, s, nr, GFP_NOIO, peer_req->pages))
1558 		peer_req->flags |= EE_WAS_ERROR;
1559 	drbd_endio_write_sec_final(peer_req);
1560 }
1561 
1562 
1563 /**
 * drbd_submit_peer_request() - Submit the I/O for a peer request
 * @device:	DRBD device.
 * @peer_req:	peer request
 * @op:		REQ_OP_* operation, see bio->bi_opf
 * @op_flags:	additional request flags, see bio->bi_opf
 * @fault_type:	DRBD fault insertion type, passed to drbd_generic_make_request()
1568  *
1569  * May spread the pages to multiple bios,
1570  * depending on bio_add_page restrictions.
1571  *
1572  * Returns 0 if all bios have been submitted,
1573  * -ENOMEM if we could not allocate enough bios,
1574  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1575  *  single page to an empty bio (which should never happen and likely indicates
1576  *  that the lower level IO stack is in some way broken). This has been observed
1577  *  on certain Xen deployments.
1578  */
1579 /* TODO allocate from our own bio_set. */
1580 int drbd_submit_peer_request(struct drbd_device *device,
1581 			     struct drbd_peer_request *peer_req,
1582 			     const unsigned op, const unsigned op_flags,
1583 			     const int fault_type)
1584 {
1585 	struct bio *bios = NULL;
1586 	struct bio *bio;
1587 	struct page *page = peer_req->pages;
1588 	sector_t sector = peer_req->i.sector;
1589 	unsigned data_size = peer_req->i.size;
1590 	unsigned n_bios = 0;
1591 	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1592 	int err = -ENOMEM;
1593 
1594 	/* TRIM/DISCARD: for now, always use the helper function
1595 	 * blkdev_issue_zeroout(..., discard=true).
1596 	 * It's synchronous, but it does the right thing wrt. bio splitting.
1597 	 * Correctness first, performance later.  Next step is to code an
1598 	 * asynchronous variant of the same.
1599 	 */
1600 	if (peer_req->flags & (EE_IS_TRIM|EE_WRITE_SAME)) {
1601 		/* wait for all pending IO completions, before we start
1602 		 * zeroing things out. */
1603 		conn_wait_active_ee_empty(peer_req->peer_device->connection);
1604 		/* add it to the active list now,
1605 		 * so we can find it to present it in debugfs */
1606 		peer_req->submit_jif = jiffies;
1607 		peer_req->flags |= EE_SUBMITTED;
1608 
1609 		/* If this was a resync request from receive_rs_deallocated(),
1610 		 * it is already on the sync_ee list */
1611 		if (list_empty(&peer_req->w.list)) {
1612 			spin_lock_irq(&device->resource->req_lock);
1613 			list_add_tail(&peer_req->w.list, &device->active_ee);
1614 			spin_unlock_irq(&device->resource->req_lock);
1615 		}
1616 
1617 		if (peer_req->flags & EE_IS_TRIM)
1618 			drbd_issue_peer_discard(device, peer_req);
1619 		else /* EE_WRITE_SAME */
1620 			drbd_issue_peer_wsame(device, peer_req);
1621 		return 0;
1622 	}
1623 
1624 	/* In most cases, we will only need one bio.  But in case the lower
1625 	 * level restrictions happen to be different at this offset on this
1626 	 * side than those of the sending peer, we may need to submit the
1627 	 * request in more than one bio.
1628 	 *
1629 	 * Plain bio_alloc is good enough here, this is no DRBD internally
1630 	 * generated bio, but a bio allocated on behalf of the peer.
1631 	 */
1632 next_bio:
1633 	bio = bio_alloc(GFP_NOIO, nr_pages);
1634 	if (!bio) {
1635 		drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1636 		goto fail;
1637 	}
1638 	/* > peer_req->i.sector, unless this is the first bio */
1639 	bio->bi_iter.bi_sector = sector;
1640 	bio->bi_bdev = device->ldev->backing_bdev;
1641 	bio_set_op_attrs(bio, op, op_flags);
1642 	bio->bi_private = peer_req;
1643 	bio->bi_end_io = drbd_peer_request_endio;
1644 
1645 	bio->bi_next = bios;
1646 	bios = bio;
1647 	++n_bios;
1648 
1649 	page_chain_for_each(page) {
1650 		unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
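		/* If the lower device's queue limits refuse this page, open a
		 * new bio via next_bio and retry the same page there. */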
1651 		if (!bio_add_page(bio, page, len, 0))
1652 			goto next_bio;
1653 		data_size -= len;
1654 		sector += len >> 9;
1655 		--nr_pages;
1656 	}
1657 	D_ASSERT(device, data_size == 0);
1658 	D_ASSERT(device, page == NULL);
1659 
1660 	atomic_set(&peer_req->pending_bios, n_bios);
1661 	/* for debugfs: update timestamp, mark as submitted */
1662 	peer_req->submit_jif = jiffies;
1663 	peer_req->flags |= EE_SUBMITTED;
1664 	do {
1665 		bio = bios;
1666 		bios = bios->bi_next;
1667 		bio->bi_next = NULL;
1668 
1669 		drbd_generic_make_request(device, fault_type, bio);
1670 	} while (bios);
1671 	return 0;
1672 
1673 fail:
1674 	while (bios) {
1675 		bio = bios;
1676 		bios = bios->bi_next;
1677 		bio_put(bio);
1678 	}
1679 	return err;
1680 }
1681 
1682 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1683 					     struct drbd_peer_request *peer_req)
1684 {
1685 	struct drbd_interval *i = &peer_req->i;
1686 
1687 	drbd_remove_interval(&device->write_requests, i);
1688 	drbd_clear_interval(i);
1689 
1690 	/* Wake up any processes waiting for this peer request to complete.  */
1691 	if (i->waiting)
1692 		wake_up(&device->misc_wait);
1693 }
1694 
1695 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1696 {
1697 	struct drbd_peer_device *peer_device;
1698 	int vnr;
1699 
1700 	rcu_read_lock();
1701 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1702 		struct drbd_device *device = peer_device->device;
1703 
1704 		kref_get(&device->kref);
1705 		rcu_read_unlock();
1706 		drbd_wait_ee_list_empty(device, &device->active_ee);
1707 		kref_put(&device->kref, drbd_destroy_device);
1708 		rcu_read_lock();
1709 	}
1710 	rcu_read_unlock();
1711 }
1712 
1713 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1714 {
1715 	int rv;
1716 	struct p_barrier *p = pi->data;
1717 	struct drbd_epoch *epoch;
1718 
1719 	/* FIXME these are unacked on connection,
1720 	 * not a specific (peer)device.
1721 	 */
1722 	connection->current_epoch->barrier_nr = p->barrier;
1723 	connection->current_epoch->connection = connection;
1724 	rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1725 
1726 	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1727 	 * the activity log, which means it would not be resynced in case the
1728 	 * R_PRIMARY crashes now.
1729 	 * Therefore we must send the barrier_ack after the barrier request was
1730 	 * completed. */
1731 	switch (connection->resource->write_ordering) {
1732 	case WO_NONE:
1733 		if (rv == FE_RECYCLED)
1734 			return 0;
1735 
1736 		/* receiver context, in the writeout path of the other node.
1737 		 * avoid potential distributed deadlock */
1738 		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1739 		if (epoch)
1740 			break;
1741 		else
1742 			drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1743 			/* Fall through */
1744 
1745 	case WO_BDEV_FLUSH:
1746 	case WO_DRAIN_IO:
1747 		conn_wait_active_ee_empty(connection);
1748 		drbd_flush(connection);
1749 
1750 		if (atomic_read(&connection->current_epoch->epoch_size)) {
1751 			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1752 			if (epoch)
1753 				break;
1754 		}
1755 
1756 		return 0;
1757 	default:
1758 		drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1759 			 connection->resource->write_ordering);
1760 		return -EIO;
1761 	}
1762 
1763 	epoch->flags = 0;
1764 	atomic_set(&epoch->epoch_size, 0);
1765 	atomic_set(&epoch->active, 0);
1766 
1767 	spin_lock(&connection->epoch_lock);
1768 	if (atomic_read(&connection->current_epoch->epoch_size)) {
1769 		list_add(&epoch->list, &connection->current_epoch->list);
1770 		connection->current_epoch = epoch;
1771 		connection->epochs++;
1772 	} else {
1773 		/* The current_epoch got recycled while we allocated this one... */
1774 		kfree(epoch);
1775 	}
1776 	spin_unlock(&connection->epoch_lock);
1777 
1778 	return 0;
1779 }
1780 
1781 /* quick wrapper in case payload size != request_size (write same) */
1782 static void drbd_csum_ee_size(struct crypto_ahash *h,
1783 			      struct drbd_peer_request *r, void *d,
1784 			      unsigned int payload_size)
1785 {
1786 	unsigned int tmp = r->i.size;
1787 	r->i.size = payload_size;
1788 	drbd_csum_ee(h, r, d);
1789 	r->i.size = tmp;
1790 }
1791 
1792 /* used from receive_RSDataReply (recv_resync_read)
1793  * and from receive_Data.
1794  * data_size: actual payload ("data in")
1795  * 	for normal writes that is bi_size.
1796  * 	for discards, that is zero.
1797  * 	for write same, it is logical_block_size.
1798  * both trim and write same have the bi_size ("data len to be affected")
1799  * as extra argument in the packet header.
1800  */
1801 static struct drbd_peer_request *
1802 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1803 	      struct packet_info *pi) __must_hold(local)
1804 {
1805 	struct drbd_device *device = peer_device->device;
1806 	const sector_t capacity = drbd_get_capacity(device->this_bdev);
1807 	struct drbd_peer_request *peer_req;
1808 	struct page *page;
1809 	int digest_size, err;
1810 	unsigned int data_size = pi->size, ds;
1811 	void *dig_in = peer_device->connection->int_dig_in;
1812 	void *dig_vv = peer_device->connection->int_dig_vv;
1813 	unsigned long *data;
1814 	struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1815 	struct p_trim *wsame = (pi->cmd == P_WSAME) ? pi->data : NULL;
1816 
1817 	digest_size = 0;
1818 	if (!trim && peer_device->connection->peer_integrity_tfm) {
1819 		digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1820 		/*
1821 		 * FIXME: Receive the incoming digest into the receive buffer
1822 		 *	  here, together with its struct p_data?
1823 		 */
1824 		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1825 		if (err)
1826 			return NULL;
1827 		data_size -= digest_size;
1828 	}
1829 
1830 	/* assume request_size == data_size, but special case trim and wsame. */
1831 	ds = data_size;
1832 	if (trim) {
1833 		if (!expect(data_size == 0))
1834 			return NULL;
1835 		ds = be32_to_cpu(trim->size);
1836 	} else if (wsame) {
1837 		if (data_size != queue_logical_block_size(device->rq_queue)) {
1838 			drbd_err(peer_device, "data size (%u) != drbd logical block size (%u)\n",
1839 				data_size, queue_logical_block_size(device->rq_queue));
1840 			return NULL;
1841 		}
1842 		if (data_size != bdev_logical_block_size(device->ldev->backing_bdev)) {
1843 			drbd_err(peer_device, "data size (%u) != backend logical block size (%u)\n",
1844 				data_size, bdev_logical_block_size(device->ldev->backing_bdev));
1845 			return NULL;
1846 		}
1847 		ds = be32_to_cpu(wsame->size);
1848 	}
1849 
1850 	if (!expect(IS_ALIGNED(ds, 512)))
1851 		return NULL;
1852 	if (trim || wsame) {
1853 		if (!expect(ds <= (DRBD_MAX_BBIO_SECTORS << 9)))
1854 			return NULL;
1855 	} else if (!expect(ds <= DRBD_MAX_BIO_SIZE))
1856 		return NULL;
1857 
1858 	/* even though we trust our peer,
1859 	 * we sometimes have to double check. */
1860 	if (sector + (ds>>9) > capacity) {
1861 		drbd_err(device, "request from peer beyond end of local disk: "
1862 			"capacity: %llus < sector: %llus + size: %u\n",
1863 			(unsigned long long)capacity,
1864 			(unsigned long long)sector, ds);
1865 		return NULL;
1866 	}
1867 
1868 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1869 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1870 	 * which in turn might block on the other node at this very place.  */
1871 	peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO);
1872 	if (!peer_req)
1873 		return NULL;
1874 
1875 	peer_req->flags |= EE_WRITE;
1876 	if (trim) {
1877 		peer_req->flags |= EE_IS_TRIM;
1878 		return peer_req;
1879 	}
1880 	if (wsame)
1881 		peer_req->flags |= EE_WRITE_SAME;
1882 
1883 	/* receive payload size bytes into page chain */
1884 	ds = data_size;
1885 	page = peer_req->pages;
1886 	page_chain_for_each(page) {
1887 		unsigned len = min_t(int, ds, PAGE_SIZE);
1888 		data = kmap(page);
1889 		err = drbd_recv_all_warn(peer_device->connection, data, len);
1890 		if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1891 			drbd_err(device, "Fault injection: Corrupting data on receive\n");
1892 			data[0] = data[0] ^ (unsigned long)-1;
1893 		}
1894 		kunmap(page);
1895 		if (err) {
1896 			drbd_free_peer_req(device, peer_req);
1897 			return NULL;
1898 		}
1899 		ds -= len;
1900 	}
1901 
1902 	if (digest_size) {
1903 		drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size);
1904 		if (memcmp(dig_in, dig_vv, digest_size)) {
1905 			drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1906 				(unsigned long long)sector, data_size);
1907 			drbd_free_peer_req(device, peer_req);
1908 			return NULL;
1909 		}
1910 	}
1911 	device->recv_cnt += data_size >> 9;
1912 	return peer_req;
1913 }
1914 
1915 /* drbd_drain_block() just takes a data block
1916  * out of the socket input buffer, and discards it.
1917  */
1918 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1919 {
1920 	struct page *page;
1921 	int err = 0;
1922 	void *data;
1923 
1924 	if (!data_size)
1925 		return 0;
1926 
1927 	page = drbd_alloc_pages(peer_device, 1, 1);
1928 
1929 	data = kmap(page);
1930 	while (data_size) {
1931 		unsigned int len = min_t(int, data_size, PAGE_SIZE);
1932 
1933 		err = drbd_recv_all_warn(peer_device->connection, data, len);
1934 		if (err)
1935 			break;
1936 		data_size -= len;
1937 	}
1938 	kunmap(page);
1939 	drbd_free_pages(peer_device->device, page, 0);
1940 	return err;
1941 }
1942 
1943 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1944 			   sector_t sector, int data_size)
1945 {
1946 	struct bio_vec bvec;
1947 	struct bvec_iter iter;
1948 	struct bio *bio;
1949 	int digest_size, err, expect;
1950 	void *dig_in = peer_device->connection->int_dig_in;
1951 	void *dig_vv = peer_device->connection->int_dig_vv;
1952 
1953 	digest_size = 0;
1954 	if (peer_device->connection->peer_integrity_tfm) {
1955 		digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1956 		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1957 		if (err)
1958 			return err;
1959 		data_size -= digest_size;
1960 	}
1961 
1962 	/* optimistically update recv_cnt.  if receiving fails below,
1963 	 * we disconnect anyways, and counters will be reset. */
1964 	peer_device->device->recv_cnt += data_size>>9;
1965 
1966 	bio = req->master_bio;
1967 	D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1968 
1969 	bio_for_each_segment(bvec, bio, iter) {
1970 		void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1971 		expect = min_t(int, data_size, bvec.bv_len);
1972 		err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1973 		kunmap(bvec.bv_page);
1974 		if (err)
1975 			return err;
1976 		data_size -= expect;
1977 	}
1978 
1979 	if (digest_size) {
1980 		drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1981 		if (memcmp(dig_in, dig_vv, digest_size)) {
1982 			drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1983 			return -EINVAL;
1984 		}
1985 	}
1986 
1987 	D_ASSERT(peer_device->device, data_size == 0);
1988 	return 0;
1989 }
1990 
1991 /*
1992  * e_end_resync_block() is called in ack_sender context via
1993  * drbd_finish_peer_reqs().
1994  */
1995 static int e_end_resync_block(struct drbd_work *w, int unused)
1996 {
1997 	struct drbd_peer_request *peer_req =
1998 		container_of(w, struct drbd_peer_request, w);
1999 	struct drbd_peer_device *peer_device = peer_req->peer_device;
2000 	struct drbd_device *device = peer_device->device;
2001 	sector_t sector = peer_req->i.sector;
2002 	int err;
2003 
2004 	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2005 
2006 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2007 		drbd_set_in_sync(device, sector, peer_req->i.size);
2008 		err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
2009 	} else {
2010 		/* Record failure to sync */
2011 		drbd_rs_failed_io(device, sector, peer_req->i.size);
2012 
2013 		err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2014 	}
2015 	dec_unacked(device);
2016 
2017 	return err;
2018 }
2019 
2020 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
2021 			    struct packet_info *pi) __releases(local)
2022 {
2023 	struct drbd_device *device = peer_device->device;
2024 	struct drbd_peer_request *peer_req;
2025 
2026 	peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
2027 	if (!peer_req)
2028 		goto fail;
2029 
2030 	dec_rs_pending(device);
2031 
2032 	inc_unacked(device);
2033 	/* corresponding dec_unacked() in e_end_resync_block()
2034 	 * respective _drbd_clear_done_ee */
2035 
2036 	peer_req->w.cb = e_end_resync_block;
2037 	peer_req->submit_jif = jiffies;
2038 
2039 	spin_lock_irq(&device->resource->req_lock);
2040 	list_add_tail(&peer_req->w.list, &device->sync_ee);
2041 	spin_unlock_irq(&device->resource->req_lock);
2042 
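	/* account the incoming resync write as self-generated resync activity,
	 * so drbd_rs_c_min_rate_throttle() does not mistake it for application IO */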
2043 	atomic_add(pi->size >> 9, &device->rs_sect_ev);
2044 	if (drbd_submit_peer_request(device, peer_req, REQ_OP_WRITE, 0,
2045 				     DRBD_FAULT_RS_WR) == 0)
2046 		return 0;
2047 
2048 	/* don't care for the reason here */
2049 	drbd_err(device, "submit failed, triggering re-connect\n");
2050 	spin_lock_irq(&device->resource->req_lock);
2051 	list_del(&peer_req->w.list);
2052 	spin_unlock_irq(&device->resource->req_lock);
2053 
2054 	drbd_free_peer_req(device, peer_req);
2055 fail:
2056 	put_ldev(device);
2057 	return -EIO;
2058 }
2059 
2060 static struct drbd_request *
2061 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
2062 	     sector_t sector, bool missing_ok, const char *func)
2063 {
2064 	struct drbd_request *req;
2065 
2066 	/* Request object according to our peer: block_id echoes back the address of our own request */
2067 	req = (struct drbd_request *)(unsigned long)id;
2068 	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
2069 		return req;
2070 	if (!missing_ok) {
2071 		drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
2072 			(unsigned long)id, (unsigned long long)sector);
2073 	}
2074 	return NULL;
2075 }
2076 
2077 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
2078 {
2079 	struct drbd_peer_device *peer_device;
2080 	struct drbd_device *device;
2081 	struct drbd_request *req;
2082 	sector_t sector;
2083 	int err;
2084 	struct p_data *p = pi->data;
2085 
2086 	peer_device = conn_peer_device(connection, pi->vnr);
2087 	if (!peer_device)
2088 		return -EIO;
2089 	device = peer_device->device;
2090 
2091 	sector = be64_to_cpu(p->sector);
2092 
2093 	spin_lock_irq(&device->resource->req_lock);
2094 	req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
2095 	spin_unlock_irq(&device->resource->req_lock);
2096 	if (unlikely(!req))
2097 		return -EIO;
2098 
2099 	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
2100 	 * special casing it there for the various failure cases.
2101 	 * still no race with drbd_fail_pending_reads */
2102 	err = recv_dless_read(peer_device, req, sector, pi->size);
2103 	if (!err)
2104 		req_mod(req, DATA_RECEIVED);
2105 	/* else: nothing. handled from drbd_disconnect...
2106 	 * I don't think we may complete this just yet
2107 	 * in case we are "on-disconnect: freeze" */
2108 
2109 	return err;
2110 }
2111 
2112 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
2113 {
2114 	struct drbd_peer_device *peer_device;
2115 	struct drbd_device *device;
2116 	sector_t sector;
2117 	int err;
2118 	struct p_data *p = pi->data;
2119 
2120 	peer_device = conn_peer_device(connection, pi->vnr);
2121 	if (!peer_device)
2122 		return -EIO;
2123 	device = peer_device->device;
2124 
2125 	sector = be64_to_cpu(p->sector);
2126 	D_ASSERT(device, p->block_id == ID_SYNCER);
2127 
2128 	if (get_ldev(device)) {
2129 		/* data is submitted to disk within recv_resync_read.
2130 		 * corresponding put_ldev done below on error,
2131 		 * or in drbd_peer_request_endio. */
2132 		err = recv_resync_read(peer_device, sector, pi);
2133 	} else {
2134 		if (__ratelimit(&drbd_ratelimit_state))
2135 			drbd_err(device, "Can not write resync data to local disk.\n");
2136 
2137 		err = drbd_drain_block(peer_device, pi->size);
2138 
2139 		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2140 	}
2141 
2142 	atomic_add(pi->size >> 9, &device->rs_sect_in);
2143 
2144 	return err;
2145 }
2146 
2147 static void restart_conflicting_writes(struct drbd_device *device,
2148 				       sector_t sector, int size)
2149 {
2150 	struct drbd_interval *i;
2151 	struct drbd_request *req;
2152 
2153 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2154 		if (!i->local)
2155 			continue;
2156 		req = container_of(i, struct drbd_request, i);
2157 		if (req->rq_state & RQ_LOCAL_PENDING ||
2158 		    !(req->rq_state & RQ_POSTPONED))
2159 			continue;
2160 		/* as it is RQ_POSTPONED, this will cause it to
2161 		 * be queued on the retry workqueue. */
2162 		__req_mod(req, CONFLICT_RESOLVED, NULL);
2163 	}
2164 }
2165 
2166 /*
2167  * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
2168  */
2169 static int e_end_block(struct drbd_work *w, int cancel)
2170 {
2171 	struct drbd_peer_request *peer_req =
2172 		container_of(w, struct drbd_peer_request, w);
2173 	struct drbd_peer_device *peer_device = peer_req->peer_device;
2174 	struct drbd_device *device = peer_device->device;
2175 	sector_t sector = peer_req->i.sector;
2176 	int err = 0, pcmd;
2177 
2178 	if (peer_req->flags & EE_SEND_WRITE_ACK) {
2179 		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2180 			pcmd = (device->state.conn >= C_SYNC_SOURCE &&
2181 				device->state.conn <= C_PAUSED_SYNC_T &&
2182 				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
2183 				P_RS_WRITE_ACK : P_WRITE_ACK;
2184 			err = drbd_send_ack(peer_device, pcmd, peer_req);
2185 			if (pcmd == P_RS_WRITE_ACK)
2186 				drbd_set_in_sync(device, sector, peer_req->i.size);
2187 		} else {
2188 			err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2189 			/* we expect it to be marked out of sync anyways...
2190 			 * maybe assert this?  */
2191 		}
2192 		dec_unacked(device);
2193 	}
2194 
2195 	/* we delete from the conflict detection hash _after_ we sent out the
2196 	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
2197 	if (peer_req->flags & EE_IN_INTERVAL_TREE) {
2198 		spin_lock_irq(&device->resource->req_lock);
2199 		D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
2200 		drbd_remove_epoch_entry_interval(device, peer_req);
2201 		if (peer_req->flags & EE_RESTART_REQUESTS)
2202 			restart_conflicting_writes(device, sector, peer_req->i.size);
2203 		spin_unlock_irq(&device->resource->req_lock);
2204 	} else
2205 		D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2206 
2207 	drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
2208 
2209 	return err;
2210 }
2211 
2212 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2213 {
2214 	struct drbd_peer_request *peer_req =
2215 		container_of(w, struct drbd_peer_request, w);
2216 	struct drbd_peer_device *peer_device = peer_req->peer_device;
2217 	int err;
2218 
2219 	err = drbd_send_ack(peer_device, ack, peer_req);
2220 	dec_unacked(peer_device->device);
2221 
2222 	return err;
2223 }
2224 
2225 static int e_send_superseded(struct drbd_work *w, int unused)
2226 {
2227 	return e_send_ack(w, P_SUPERSEDED);
2228 }
2229 
2230 static int e_send_retry_write(struct drbd_work *w, int unused)
2231 {
2232 	struct drbd_peer_request *peer_req =
2233 		container_of(w, struct drbd_peer_request, w);
2234 	struct drbd_connection *connection = peer_req->peer_device->connection;
2235 
2236 	return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2237 			     P_RETRY_WRITE : P_SUPERSEDED);
2238 }
2239 
2240 static bool seq_greater(u32 a, u32 b)
2241 {
2242 	/*
2243 	 * We assume 32-bit wrap-around here.
2244 	 * For 24-bit wrap-around, we would have to shift:
2245 	 *  a <<= 8; b <<= 8;
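	 *
	 * For example, a = 5 and b = 0xfffffffe (-2 as s32) give
	 * (s32)a - (s32)b == 7 > 0, so 'a' counts as newer even though it is
	 * numerically smaller after the 32-bit wrap.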
2246 	 */
2247 	return (s32)a - (s32)b > 0;
2248 }
2249 
2250 static u32 seq_max(u32 a, u32 b)
2251 {
2252 	return seq_greater(a, b) ? a : b;
2253 }
2254 
2255 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2256 {
2257 	struct drbd_device *device = peer_device->device;
2258 	unsigned int newest_peer_seq;
2259 
2260 	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2261 		spin_lock(&device->peer_seq_lock);
2262 		newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2263 		device->peer_seq = newest_peer_seq;
2264 		spin_unlock(&device->peer_seq_lock);
2265 		/* wake up only if we actually changed device->peer_seq */
2266 		if (peer_seq == newest_peer_seq)
2267 			wake_up(&device->seq_wait);
2268 	}
2269 }
2270 
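/* s1/s2 are sector numbers; l1/l2 are lengths in bytes, hence the >>9. */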
2271 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2272 {
2273 	return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2274 }
2275 
2276 /* maybe change sync_ee into interval trees as well? */
2277 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2278 {
2279 	struct drbd_peer_request *rs_req;
2280 	bool rv = false;
2281 
2282 	spin_lock_irq(&device->resource->req_lock);
2283 	list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2284 		if (overlaps(peer_req->i.sector, peer_req->i.size,
2285 			     rs_req->i.sector, rs_req->i.size)) {
2286 			rv = true;
2287 			break;
2288 		}
2289 	}
2290 	spin_unlock_irq(&device->resource->req_lock);
2291 
2292 	return rv;
2293 }
2294 
2295 /* Called from receive_Data.
2296  * Synchronize packets on sock with packets on msock.
2297  *
2298  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2299  * packet traveling on msock, they are still processed in the order they have
2300  * been sent.
2301  *
2302  * Note: we don't care for Ack packets overtaking P_DATA packets.
2303  *
2304  * In case packet_seq is larger than device->peer_seq number, there are
2305  * outstanding packets on the msock. We wait for them to arrive.
2306  * In case we are the logically next packet, we update device->peer_seq
2307  * ourselves. Correctly handles 32bit wrap around.
2308  *
2309  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2310  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2311  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2312  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
2313  *
2314  * returns 0 if we may process the packet,
2315  * -ERESTARTSYS if we were interrupted (by disconnect signal), or -ETIMEDOUT if the missing ack packets did not arrive in time. */
2316 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2317 {
2318 	struct drbd_device *device = peer_device->device;
2319 	DEFINE_WAIT(wait);
2320 	long timeout;
2321 	int ret = 0, tp;
2322 
2323 	if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2324 		return 0;
2325 
2326 	spin_lock(&device->peer_seq_lock);
2327 	for (;;) {
2328 		if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2329 			device->peer_seq = seq_max(device->peer_seq, peer_seq);
2330 			break;
2331 		}
2332 
2333 		if (signal_pending(current)) {
2334 			ret = -ERESTARTSYS;
2335 			break;
2336 		}
2337 
2338 		rcu_read_lock();
2339 		tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2340 		rcu_read_unlock();
2341 
2342 		if (!tp)
2343 			break;
2344 
2345 		/* Only need to wait if two_primaries is enabled */
2346 		prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2347 		spin_unlock(&device->peer_seq_lock);
2348 		rcu_read_lock();
2349 		timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2350 		rcu_read_unlock();
2351 		timeout = schedule_timeout(timeout);
2352 		spin_lock(&device->peer_seq_lock);
2353 		if (!timeout) {
2354 			ret = -ETIMEDOUT;
2355 			drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2356 			break;
2357 		}
2358 	}
2359 	spin_unlock(&device->peer_seq_lock);
2360 	finish_wait(&device->seq_wait, &wait);
2361 	return ret;
2362 }
2363 
2364 /* see also bio_flags_to_wire():
2365  * we need to semantically map REQ_* bio flags to DP_* data packet flags
2366  * and back, because we may replicate to other kernel versions. */
2367 static unsigned long wire_flags_to_bio_flags(u32 dpf)
2368 {
2369 	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2370 		(dpf & DP_FUA ? REQ_FUA : 0) |
2371 		(dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
2372 }
2373 
2374 static unsigned long wire_flags_to_bio_op(u32 dpf)
2375 {
2376 	if (dpf & DP_DISCARD)
2377 		return REQ_OP_DISCARD;
2378 	else
2379 		return REQ_OP_WRITE;
2380 }
2381 
2382 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2383 				    unsigned int size)
2384 {
2385 	struct drbd_interval *i;
2386 
2387     repeat:
2388 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2389 		struct drbd_request *req;
2390 		struct bio_and_error m;
2391 
2392 		if (!i->local)
2393 			continue;
2394 		req = container_of(i, struct drbd_request, i);
2395 		if (!(req->rq_state & RQ_POSTPONED))
2396 			continue;
2397 		req->rq_state &= ~RQ_POSTPONED;
2398 		__req_mod(req, NEG_ACKED, &m);
2399 		spin_unlock_irq(&device->resource->req_lock);
2400 		if (m.bio)
2401 			complete_master_bio(device, &m);
2402 		spin_lock_irq(&device->resource->req_lock);
2403 		goto repeat;
2404 	}
2405 }
2406 
2407 static int handle_write_conflicts(struct drbd_device *device,
2408 				  struct drbd_peer_request *peer_req)
2409 {
2410 	struct drbd_connection *connection = peer_req->peer_device->connection;
2411 	bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2412 	sector_t sector = peer_req->i.sector;
2413 	const unsigned int size = peer_req->i.size;
2414 	struct drbd_interval *i;
2415 	bool equal;
2416 	int err;
2417 
2418 	/*
2419 	 * Inserting the peer request into the write_requests tree will prevent
2420 	 * new conflicting local requests from being added.
2421 	 */
2422 	drbd_insert_interval(&device->write_requests, &peer_req->i);
2423 
2424     repeat:
2425 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2426 		if (i == &peer_req->i)
2427 			continue;
2428 		if (i->completed)
2429 			continue;
2430 
2431 		if (!i->local) {
2432 			/*
2433 			 * Our peer has sent a conflicting remote request; this
2434 			 * should not happen in a two-node setup.  Wait for the
2435 			 * earlier peer request to complete.
2436 			 */
2437 			err = drbd_wait_misc(device, i);
2438 			if (err)
2439 				goto out;
2440 			goto repeat;
2441 		}
2442 
2443 		equal = i->sector == sector && i->size == size;
2444 		if (resolve_conflicts) {
2445 			/*
2446 			 * If the peer request is fully contained within the
2447 			 * overlapping request, it can be considered overwritten
2448 			 * and thus superseded; otherwise, it will be retried
2449 			 * once all overlapping requests have completed.
2450 			 */
2451 			bool superseded = i->sector <= sector && i->sector +
2452 				       (i->size >> 9) >= sector + (size >> 9);
2453 
2454 			if (!equal)
2455 				drbd_alert(device, "Concurrent writes detected: "
2456 					       "local=%llus +%u, remote=%llus +%u, "
2457 					       "assuming %s came first\n",
2458 					  (unsigned long long)i->sector, i->size,
2459 					  (unsigned long long)sector, size,
2460 					  superseded ? "local" : "remote");
2461 
2462 			peer_req->w.cb = superseded ? e_send_superseded :
2463 						   e_send_retry_write;
2464 			list_add_tail(&peer_req->w.list, &device->done_ee);
2465 			queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);
2466 
2467 			err = -ENOENT;
2468 			goto out;
2469 		} else {
2470 			struct drbd_request *req =
2471 				container_of(i, struct drbd_request, i);
2472 
2473 			if (!equal)
2474 				drbd_alert(device, "Concurrent writes detected: "
2475 					       "local=%llus +%u, remote=%llus +%u\n",
2476 					  (unsigned long long)i->sector, i->size,
2477 					  (unsigned long long)sector, size);
2478 
2479 			if (req->rq_state & RQ_LOCAL_PENDING ||
2480 			    !(req->rq_state & RQ_POSTPONED)) {
2481 				/*
2482 				 * Wait for the node with the discard flag to
2483 				 * decide if this request has been superseded
2484 				 * or needs to be retried.
2485 				 * Requests that have been superseded will
2486 				 * disappear from the write_requests tree.
2487 				 *
2488 				 * In addition, wait for the conflicting
2489 				 * request to finish locally before submitting
2490 				 * the conflicting peer request.
2491 				 */
2492 				err = drbd_wait_misc(device, &req->i);
2493 				if (err) {
2494 					_conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2495 					fail_postponed_requests(device, sector, size);
2496 					goto out;
2497 				}
2498 				goto repeat;
2499 			}
2500 			/*
2501 			 * Remember to restart the conflicting requests after
2502 			 * the new peer request has completed.
2503 			 */
2504 			peer_req->flags |= EE_RESTART_REQUESTS;
2505 		}
2506 	}
2507 	err = 0;
2508 
2509     out:
2510 	if (err)
2511 		drbd_remove_epoch_entry_interval(device, peer_req);
2512 	return err;
2513 }
2514 
2515 /* mirrored write */
2516 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2517 {
2518 	struct drbd_peer_device *peer_device;
2519 	struct drbd_device *device;
2520 	struct net_conf *nc;
2521 	sector_t sector;
2522 	struct drbd_peer_request *peer_req;
2523 	struct p_data *p = pi->data;
2524 	u32 peer_seq = be32_to_cpu(p->seq_num);
2525 	int op, op_flags;
2526 	u32 dp_flags;
2527 	int err, tp;
2528 
2529 	peer_device = conn_peer_device(connection, pi->vnr);
2530 	if (!peer_device)
2531 		return -EIO;
2532 	device = peer_device->device;
2533 
2534 	if (!get_ldev(device)) {
2535 		int err2;
2536 
2537 		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2538 		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2539 		atomic_inc(&connection->current_epoch->epoch_size);
2540 		err2 = drbd_drain_block(peer_device, pi->size);
2541 		if (!err)
2542 			err = err2;
2543 		return err;
2544 	}
2545 
2546 	/*
2547 	 * Corresponding put_ldev done either below (on various errors), or in
2548 	 * drbd_peer_request_endio, if we successfully submit the data at the
2549 	 * end of this function.
2550 	 */
2551 
2552 	sector = be64_to_cpu(p->sector);
2553 	peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2554 	if (!peer_req) {
2555 		put_ldev(device);
2556 		return -EIO;
2557 	}
2558 
2559 	peer_req->w.cb = e_end_block;
2560 	peer_req->submit_jif = jiffies;
2561 	peer_req->flags |= EE_APPLICATION;
2562 
2563 	dp_flags = be32_to_cpu(p->dp_flags);
2564 	op = wire_flags_to_bio_op(dp_flags);
2565 	op_flags = wire_flags_to_bio_flags(dp_flags);
2566 	if (pi->cmd == P_TRIM) {
2567 		D_ASSERT(peer_device, peer_req->i.size > 0);
2568 		D_ASSERT(peer_device, op == REQ_OP_DISCARD);
2569 		D_ASSERT(peer_device, peer_req->pages == NULL);
2570 	} else if (peer_req->pages == NULL) {
2571 		D_ASSERT(device, peer_req->i.size == 0);
2572 		D_ASSERT(device, dp_flags & DP_FLUSH);
2573 	}
2574 
2575 	if (dp_flags & DP_MAY_SET_IN_SYNC)
2576 		peer_req->flags |= EE_MAY_SET_IN_SYNC;
2577 
2578 	spin_lock(&connection->epoch_lock);
2579 	peer_req->epoch = connection->current_epoch;
2580 	atomic_inc(&peer_req->epoch->epoch_size);
2581 	atomic_inc(&peer_req->epoch->active);
2582 	spin_unlock(&connection->epoch_lock);
2583 
2584 	rcu_read_lock();
2585 	nc = rcu_dereference(peer_device->connection->net_conf);
2586 	tp = nc->two_primaries;
2587 	if (peer_device->connection->agreed_pro_version < 100) {
2588 		switch (nc->wire_protocol) {
2589 		case DRBD_PROT_C:
2590 			dp_flags |= DP_SEND_WRITE_ACK;
2591 			break;
2592 		case DRBD_PROT_B:
2593 			dp_flags |= DP_SEND_RECEIVE_ACK;
2594 			break;
2595 		}
2596 	}
2597 	rcu_read_unlock();
2598 
2599 	if (dp_flags & DP_SEND_WRITE_ACK) {
2600 		peer_req->flags |= EE_SEND_WRITE_ACK;
2601 		inc_unacked(device);
2602 		/* corresponding dec_unacked() in e_end_block()
2603 		 * respective _drbd_clear_done_ee */
2604 	}
2605 
2606 	if (dp_flags & DP_SEND_RECEIVE_ACK) {
2607 		/* I really don't like it that the receiver thread
2608 		 * sends on the msock, but anyways */
2609 		drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
2610 	}
2611 
2612 	if (tp) {
2613 		/* two primaries implies protocol C */
2614 		D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2615 		peer_req->flags |= EE_IN_INTERVAL_TREE;
2616 		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2617 		if (err)
2618 			goto out_interrupted;
2619 		spin_lock_irq(&device->resource->req_lock);
2620 		err = handle_write_conflicts(device, peer_req);
2621 		if (err) {
2622 			spin_unlock_irq(&device->resource->req_lock);
2623 			if (err == -ENOENT) {
2624 				put_ldev(device);
2625 				return 0;
2626 			}
2627 			goto out_interrupted;
2628 		}
2629 	} else {
2630 		update_peer_seq(peer_device, peer_seq);
2631 		spin_lock_irq(&device->resource->req_lock);
2632 	}
2633 	/* TRIM and WRITE_SAME are processed synchronously:
2634 	 * drbd_submit_peer_request() waits for all pending requests,
2635 	 * i.e. for active_ee to become empty, before submitting them,
2636 	 * so better not add ourselves to that list here. */
2637 	if ((peer_req->flags & (EE_IS_TRIM|EE_WRITE_SAME)) == 0)
2638 		list_add_tail(&peer_req->w.list, &device->active_ee);
2639 	spin_unlock_irq(&device->resource->req_lock);
2640 
2641 	if (device->state.conn == C_SYNC_TARGET)
2642 		wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2643 
2644 	if (device->state.pdsk < D_INCONSISTENT) {
2645 		/* In case we have the only disk of the cluster: mark the range out of
		 * sync and cover it by the activity log, so it gets resynced later. */
2646 		drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2647 		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2648 		drbd_al_begin_io(device, &peer_req->i);
2649 		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2650 	}
2651 
2652 	err = drbd_submit_peer_request(device, peer_req, op, op_flags,
2653 				       DRBD_FAULT_DT_WR);
2654 	if (!err)
2655 		return 0;
2656 
2657 	/* don't care for the reason here */
2658 	drbd_err(device, "submit failed, triggering re-connect\n");
2659 	spin_lock_irq(&device->resource->req_lock);
2660 	list_del(&peer_req->w.list);
2661 	drbd_remove_epoch_entry_interval(device, peer_req);
2662 	spin_unlock_irq(&device->resource->req_lock);
2663 	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2664 		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2665 		drbd_al_complete_io(device, &peer_req->i);
2666 	}
2667 
2668 out_interrupted:
2669 	drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
2670 	put_ldev(device);
2671 	drbd_free_peer_req(device, peer_req);
2672 	return err;
2673 }
2674 
2675 /* We may throttle resync, if the lower device seems to be busy,
2676  * and current sync rate is above c_min_rate.
2677  *
2678  * To decide whether or not the lower device is busy, we use a scheme similar
2679  * to MD RAID's is_mddev_idle(): if the partition stats reveal "significant"
2680  * activity (more than 64 sectors) that we cannot account for with our own
2681  * resync activity, the lower device obviously is "busy".
2682  *
2683  * The current sync rate used here uses only the most recent two step marks,
2684  * to have a short time average so we can react faster.
2685  */
2686 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2687 		bool throttle_if_app_is_waiting)
2688 {
2689 	struct lc_element *tmp;
2690 	bool throttle = drbd_rs_c_min_rate_throttle(device);
2691 
2692 	if (!throttle || throttle_if_app_is_waiting)
2693 		return throttle;
2694 
2695 	spin_lock_irq(&device->al_lock);
2696 	tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2697 	if (tmp) {
2698 		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2699 		if (test_bit(BME_PRIORITY, &bm_ext->flags))
2700 			throttle = false;
2701 		/* Do not slow down if app IO is already waiting for this extent,
2702 		 * and our progress is necessary for application IO to complete. */
2703 	}
2704 	spin_unlock_irq(&device->al_lock);
2705 
2706 	return throttle;
2707 }
2708 
2709 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2710 {
2711 	struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2712 	unsigned long db, dt, dbdt;
2713 	unsigned int c_min_rate;
2714 	int curr_events;
2715 
2716 	rcu_read_lock();
2717 	c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2718 	rcu_read_unlock();
2719 
2720 	/* feature disabled? */
2721 	if (c_min_rate == 0)
2722 		return false;
2723 
2724 	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2725 		      (int)part_stat_read(&disk->part0, sectors[1]) -
2726 			atomic_read(&device->rs_sect_ev);
2727 
2728 	if (atomic_read(&device->ap_actlog_cnt)
2729 	    || curr_events - device->rs_last_events > 64) {
2730 		unsigned long rs_left;
2731 		int i;
2732 
2733 		device->rs_last_events = curr_events;
2734 
2735 		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2736 		 * approx. */
2737 		i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2738 
2739 		if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2740 			rs_left = device->ov_left;
2741 		else
2742 			rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2743 
2744 		dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2745 		if (!dt)
2746 			dt++;
2747 		db = device->rs_mark_left[i] - rs_left;
2748 		dbdt = Bit2KB(db/dt);
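		/* dbdt: recent resync rate in KiB/s; throttle while it exceeds c_min_rate */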
2749 
2750 		if (dbdt > c_min_rate)
2751 			return true;
2752 	}
2753 	return false;
2754 }
2755 
2756 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2757 {
2758 	struct drbd_peer_device *peer_device;
2759 	struct drbd_device *device;
2760 	sector_t sector;
2761 	sector_t capacity;
2762 	struct drbd_peer_request *peer_req;
2763 	struct digest_info *di = NULL;
2764 	int size, verb;
2765 	unsigned int fault_type;
2766 	struct p_block_req *p = pi->data;
2767 
2768 	peer_device = conn_peer_device(connection, pi->vnr);
2769 	if (!peer_device)
2770 		return -EIO;
2771 	device = peer_device->device;
2772 	capacity = drbd_get_capacity(device->this_bdev);
2773 
2774 	sector = be64_to_cpu(p->sector);
2775 	size   = be32_to_cpu(p->blksize);
2776 
2777 	if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2778 		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2779 				(unsigned long long)sector, size);
2780 		return -EINVAL;
2781 	}
2782 	if (sector + (size>>9) > capacity) {
2783 		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2784 				(unsigned long long)sector, size);
2785 		return -EINVAL;
2786 	}
2787 
2788 	if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2789 		verb = 1;
2790 		switch (pi->cmd) {
2791 		case P_DATA_REQUEST:
2792 			drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2793 			break;
2794 		case P_RS_THIN_REQ:
2795 		case P_RS_DATA_REQUEST:
2796 		case P_CSUM_RS_REQUEST:
2797 		case P_OV_REQUEST:
2798 			drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2799 			break;
2800 		case P_OV_REPLY:
2801 			verb = 0;
2802 			dec_rs_pending(device);
2803 			drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2804 			break;
2805 		default:
2806 			BUG();
2807 		}
2808 		if (verb && __ratelimit(&drbd_ratelimit_state))
2809 			drbd_err(device, "Can not satisfy peer's read request, "
2810 			    "no local data.\n");
2811 
2812 		/* drain the possibly present payload */
2813 		return drbd_drain_block(peer_device, pi->size);
2814 	}
2815 
2816 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2817 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
2818 	 * which in turn might block on the other node at this very place.  */
2819 	peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2820 			size, GFP_NOIO);
2821 	if (!peer_req) {
2822 		put_ldev(device);
2823 		return -ENOMEM;
2824 	}
2825 
2826 	switch (pi->cmd) {
2827 	case P_DATA_REQUEST:
2828 		peer_req->w.cb = w_e_end_data_req;
2829 		fault_type = DRBD_FAULT_DT_RD;
2830 		/* application IO, don't drbd_rs_begin_io */
2831 		peer_req->flags |= EE_APPLICATION;
2832 		goto submit;
2833 
2834 	case P_RS_THIN_REQ:
2835 		/* If at some point in the future we have a smart way to
2836 		   find out if this data block is completely deallocated,
2837 		   then we would do something smarter here than reading
2838 		   the block... */
2839 		peer_req->flags |= EE_RS_THIN_REQ;
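		/* fall through */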
2840 	case P_RS_DATA_REQUEST:
2841 		peer_req->w.cb = w_e_end_rsdata_req;
2842 		fault_type = DRBD_FAULT_RS_RD;
2843 		/* used in the sector offset progress display */
2844 		device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2845 		break;
2846 
2847 	case P_OV_REPLY:
2848 	case P_CSUM_RS_REQUEST:
2849 		fault_type = DRBD_FAULT_RS_RD;
2850 		di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2851 		if (!di)
2852 			goto out_free_e;
2853 
2854 		di->digest_size = pi->size;
2855 		di->digest = (((char *)di)+sizeof(struct digest_info));
2856 
2857 		peer_req->digest = di;
2858 		peer_req->flags |= EE_HAS_DIGEST;
2859 
2860 		if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2861 			goto out_free_e;
2862 
2863 		if (pi->cmd == P_CSUM_RS_REQUEST) {
2864 			D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2865 			peer_req->w.cb = w_e_end_csum_rs_req;
2866 			/* used in the sector offset progress display */
2867 			device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2868 			/* remember to report stats in drbd_resync_finished */
2869 			device->use_csums = true;
2870 		} else if (pi->cmd == P_OV_REPLY) {
2871 			/* track progress, we may need to throttle */
2872 			atomic_add(size >> 9, &device->rs_sect_in);
2873 			peer_req->w.cb = w_e_end_ov_reply;
2874 			dec_rs_pending(device);
2875 			/* drbd_rs_begin_io done when we sent this request,
2876 			 * but accounting still needs to be done. */
2877 			goto submit_for_resync;
2878 		}
2879 		break;
2880 
2881 	case P_OV_REQUEST:
2882 		if (device->ov_start_sector == ~(sector_t)0 &&
2883 		    peer_device->connection->agreed_pro_version >= 90) {
2884 			unsigned long now = jiffies;
2885 			int i;
2886 			device->ov_start_sector = sector;
2887 			device->ov_position = sector;
2888 			device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2889 			device->rs_total = device->ov_left;
2890 			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2891 				device->rs_mark_left[i] = device->ov_left;
2892 				device->rs_mark_time[i] = now;
2893 			}
2894 			drbd_info(device, "Online Verify start sector: %llu\n",
2895 					(unsigned long long)sector);
2896 		}
2897 		peer_req->w.cb = w_e_end_ov_req;
2898 		fault_type = DRBD_FAULT_RS_RD;
2899 		break;
2900 
2901 	default:
2902 		BUG();
2903 	}
2904 
2905 	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
2906 	 * wrt the receiver, but it is not as straightforward as it may seem.
2907 	 * Various places in the resync start and stop logic assume resync
2908 	 * requests are processed in order; requeuing this on the worker thread
2909 	 * would introduce a bunch of new code for synchronization between threads.
2910 	 *
2911 	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2912 	 * "forever", throttling after drbd_rs_begin_io will lock that extent
2913 	 * for application writes for the same time.  For now, just throttle
2914 	 * here, where the rest of the code expects the receiver to sleep for
2915 	 * a while, anyways.
2916 	 */
2917 
2918 	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
2919 	 * this defers syncer requests for some time, before letting at least
2920 	 * one request through.  The resync controller on the receiving side
2921 	 * will adapt to the incoming rate accordingly.
2922 	 *
2923 	 * We cannot throttle here if remote is Primary/SyncTarget:
2924 	 * we would also throttle its application reads.
2925 	 * In that case, throttling is done on the SyncTarget only.
2926 	 */
2927 
2928 	/* Even though this may be a resync request, we do add to "read_ee";
2929 	 * "sync_ee" is only used for resync WRITEs.
2930 	 * Add to list early, so debugfs can find this request
2931 	 * even if we have to sleep below. */
2932 	spin_lock_irq(&device->resource->req_lock);
2933 	list_add_tail(&peer_req->w.list, &device->read_ee);
2934 	spin_unlock_irq(&device->resource->req_lock);
2935 
2936 	update_receiver_timing_details(connection, drbd_rs_should_slow_down);
2937 	if (device->state.peer != R_PRIMARY
2938 	&& drbd_rs_should_slow_down(device, sector, false))
2939 		schedule_timeout_uninterruptible(HZ/10);
2940 	update_receiver_timing_details(connection, drbd_rs_begin_io);
2941 	if (drbd_rs_begin_io(device, sector))
2942 		goto out_free_e;
2943 
2944 submit_for_resync:
2945 	atomic_add(size >> 9, &device->rs_sect_ev);
2946 
2947 submit:
2948 	update_receiver_timing_details(connection, drbd_submit_peer_request);
2949 	inc_unacked(device);
2950 	if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
2951 				     fault_type) == 0)
2952 		return 0;
2953 
2954 	/* don't care for the reason here */
2955 	drbd_err(device, "submit failed, triggering re-connect\n");
2956 
2957 out_free_e:
2958 	spin_lock_irq(&device->resource->req_lock);
2959 	list_del(&peer_req->w.list);
2960 	spin_unlock_irq(&device->resource->req_lock);
2961 	/* no drbd_rs_complete_io(), we are dropping the connection anyways */
2962 
2963 	put_ldev(device);
2964 	drbd_free_peer_req(device, peer_req);
2965 	return -EIO;
2966 }
2967 
2968 /**
2969  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
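 * @peer_device: DRBD peer device on the split-brained connection.
 *
 * Returns 1 if the peer's data should be discarded (we become sync source),
 * -1 if our data should be discarded (we become sync target), or -100 if the
 * split brain cannot be resolved automatically.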
2970  */
2971 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2972 {
2973 	struct drbd_device *device = peer_device->device;
2974 	int self, peer, rv = -100;
2975 	unsigned long ch_self, ch_peer;
2976 	enum drbd_after_sb_p after_sb_0p;
2977 
2978 	self = device->ldev->md.uuid[UI_BITMAP] & 1;
2979 	peer = device->p_uuid[UI_BITMAP] & 1;
2980 
2981 	ch_peer = device->p_uuid[UI_SIZE];
2982 	ch_self = device->comm_bm_set;
2983 
2984 	rcu_read_lock();
2985 	after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2986 	rcu_read_unlock();
2987 	switch (after_sb_0p) {
2988 	case ASB_CONSENSUS:
2989 	case ASB_DISCARD_SECONDARY:
2990 	case ASB_CALL_HELPER:
2991 	case ASB_VIOLENTLY:
2992 		drbd_err(device, "Configuration error.\n");
2993 		break;
2994 	case ASB_DISCONNECT:
2995 		break;
2996 	case ASB_DISCARD_YOUNGER_PRI:
2997 		if (self == 0 && peer == 1) {
2998 			rv = -1;
2999 			break;
3000 		}
3001 		if (self == 1 && peer == 0) {
3002 			rv =  1;
3003 			break;
3004 		}
3005 		/* Else fall through to one of the other strategies... */
3006 	case ASB_DISCARD_OLDER_PRI:
3007 		if (self == 0 && peer == 1) {
3008 			rv = 1;
3009 			break;
3010 		}
3011 		if (self == 1 && peer == 0) {
3012 			rv = -1;
3013 			break;
3014 		}
3015 		/* Else fall through to one of the other strategies... */
3016 		drbd_warn(device, "Discard younger/older primary did not find a decision\n"
3017 		     "Using discard-least-changes instead\n");
3018 	case ASB_DISCARD_ZERO_CHG:
3019 		if (ch_peer == 0 && ch_self == 0) {
3020 			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
3021 				? -1 : 1;
3022 			break;
3023 		} else {
3024 			if (ch_peer == 0) { rv =  1; break; }
3025 			if (ch_self == 0) { rv = -1; break; }
3026 		}
3027 		if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
3028 			break;
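		/* else: fall through to discard-least-changes */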
3029 	case ASB_DISCARD_LEAST_CHG:
3030 		if	(ch_self < ch_peer)
3031 			rv = -1;
3032 		else if (ch_self > ch_peer)
3033 			rv =  1;
3034 		else /* ( ch_self == ch_peer ) */
3035 		     /* Well, then use something else. */
3036 			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
3037 				? -1 : 1;
3038 		break;
3039 	case ASB_DISCARD_LOCAL:
3040 		rv = -1;
3041 		break;
3042 	case ASB_DISCARD_REMOTE:
3043 		rv =  1;
3044 	}
3045 
3046 	return rv;
3047 }
3048 
3049 /**
3050  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
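 * @peer_device: DRBD peer device on the split-brained connection.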
3051  */
3052 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
3053 {
3054 	struct drbd_device *device = peer_device->device;
3055 	int hg, rv = -100;
3056 	enum drbd_after_sb_p after_sb_1p;
3057 
3058 	rcu_read_lock();
3059 	after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
3060 	rcu_read_unlock();
3061 	switch (after_sb_1p) {
3062 	case ASB_DISCARD_YOUNGER_PRI:
3063 	case ASB_DISCARD_OLDER_PRI:
3064 	case ASB_DISCARD_LEAST_CHG:
3065 	case ASB_DISCARD_LOCAL:
3066 	case ASB_DISCARD_REMOTE:
3067 	case ASB_DISCARD_ZERO_CHG:
3068 		drbd_err(device, "Configuration error.\n");
3069 		break;
3070 	case ASB_DISCONNECT:
3071 		break;
3072 	case ASB_CONSENSUS:
3073 		hg = drbd_asb_recover_0p(peer_device);
3074 		if (hg == -1 && device->state.role == R_SECONDARY)
3075 			rv = hg;
3076 		if (hg == 1  && device->state.role == R_PRIMARY)
3077 			rv = hg;
3078 		break;
3079 	case ASB_VIOLENTLY:
3080 		rv = drbd_asb_recover_0p(peer_device);
3081 		break;
3082 	case ASB_DISCARD_SECONDARY:
3083 		return device->state.role == R_PRIMARY ? 1 : -1;
3084 	case ASB_CALL_HELPER:
3085 		hg = drbd_asb_recover_0p(peer_device);
3086 		if (hg == -1 && device->state.role == R_PRIMARY) {
3087 			enum drbd_state_rv rv2;
3088 
3089 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3090 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
3091 			  * we do not need to wait for the after state change work either. */
3092 			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3093 			if (rv2 != SS_SUCCESS) {
3094 				drbd_khelper(device, "pri-lost-after-sb");
3095 			} else {
3096 				drbd_warn(device, "Successfully gave up primary role.\n");
3097 				rv = hg;
3098 			}
3099 		} else
3100 			rv = hg;
3101 	}
3102 
3103 	return rv;
3104 }
3105 
3106 /**
3107  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
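 * @peer_device: DRBD peer device on the split-brained connection.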
3108  */
3109 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
3110 {
3111 	struct drbd_device *device = peer_device->device;
3112 	int hg, rv = -100;
3113 	enum drbd_after_sb_p after_sb_2p;
3114 
3115 	rcu_read_lock();
3116 	after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
3117 	rcu_read_unlock();
3118 	switch (after_sb_2p) {
3119 	case ASB_DISCARD_YOUNGER_PRI:
3120 	case ASB_DISCARD_OLDER_PRI:
3121 	case ASB_DISCARD_LEAST_CHG:
3122 	case ASB_DISCARD_LOCAL:
3123 	case ASB_DISCARD_REMOTE:
3124 	case ASB_CONSENSUS:
3125 	case ASB_DISCARD_SECONDARY:
3126 	case ASB_DISCARD_ZERO_CHG:
3127 		drbd_err(device, "Configuration error.\n");
3128 		break;
3129 	case ASB_VIOLENTLY:
3130 		rv = drbd_asb_recover_0p(peer_device);
3131 		break;
3132 	case ASB_DISCONNECT:
3133 		break;
3134 	case ASB_CALL_HELPER:
3135 		hg = drbd_asb_recover_0p(peer_device);
3136 		if (hg == -1) {
3137 			enum drbd_state_rv rv2;
3138 
3139 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3140 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
3141 			  * we do not need to wait for the after state change work either. */
3142 			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3143 			if (rv2 != SS_SUCCESS) {
3144 				drbd_khelper(device, "pri-lost-after-sb");
3145 			} else {
3146 				drbd_warn(device, "Successfully gave up primary role.\n");
3147 				rv = hg;
3148 			}
3149 		} else
3150 			rv = hg;
3151 	}
3152 
3153 	return rv;
3154 }
3155 
3156 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
3157 			   u64 bits, u64 flags)
3158 {
3159 	if (!uuid) {
3160 		drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
3161 		return;
3162 	}
3163 	drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
3164 	     text,
3165 	     (unsigned long long)uuid[UI_CURRENT],
3166 	     (unsigned long long)uuid[UI_BITMAP],
3167 	     (unsigned long long)uuid[UI_HISTORY_START],
3168 	     (unsigned long long)uuid[UI_HISTORY_END],
3169 	     (unsigned long long)bits,
3170 	     (unsigned long long)flags);
3171 }
3172 
3173 /*
3174   100	after split brain try auto recover
3175     2	C_SYNC_SOURCE set BitMap
3176     1	C_SYNC_SOURCE use BitMap
3177     0	no Sync
3178    -1	C_SYNC_TARGET use BitMap
3179    -2	C_SYNC_TARGET set BitMap
3180  -100	after split brain, disconnect
3181 -1000	unrelated data
3182 -1091   requires proto 91
3183 -1096   requires proto 96
3184  */
3185 
3186 static int drbd_uuid_compare(struct drbd_device *const device, enum drbd_role const peer_role, int *rule_nr) __must_hold(local)
3187 {
3188 	struct drbd_peer_device *const peer_device = first_peer_device(device);
3189 	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
3190 	u64 self, peer;
3191 	int i, j;
3192 
3193 	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3194 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3195 
3196 	*rule_nr = 10;
3197 	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
3198 		return 0;
3199 
3200 	*rule_nr = 20;
3201 	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
3202 	     peer != UUID_JUST_CREATED)
3203 		return -2;
3204 
3205 	*rule_nr = 30;
3206 	if (self != UUID_JUST_CREATED &&
3207 	    (peer == UUID_JUST_CREATED || peer == (u64)0))
3208 		return 2;
3209 
3210 	if (self == peer) {
3211 		int rct, dc; /* roles at crash time */
3212 
3213 		if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
3214 
3215 			if (connection->agreed_pro_version < 91)
3216 				return -1091;
3217 
3218 			if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
3219 			    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
3220 				drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
3221 				drbd_uuid_move_history(device);
3222 				device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
3223 				device->ldev->md.uuid[UI_BITMAP] = 0;
3224 
3225 				drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3226 					       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3227 				*rule_nr = 34;
3228 			} else {
3229 				drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
3230 				*rule_nr = 36;
3231 			}
3232 
3233 			return 1;
3234 		}
3235 
3236 		if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
3237 
3238 			if (connection->agreed_pro_version < 91)
3239 				return -1091;
3240 
3241 			if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
3242 			    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
3243 				drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
3244 
3245 				device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
3246 				device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
3247 				device->p_uuid[UI_BITMAP] = 0UL;
3248 
3249 				drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3250 				*rule_nr = 35;
3251 			} else {
3252 				drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3253 				*rule_nr = 37;
3254 			}
3255 
3256 			return -1;
3257 		}
3258 
3259 		/* Common power [off|failure] */
3260 		rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3261 			(device->p_uuid[UI_FLAGS] & 2);
3262 		/* lowest bit is set when we were primary,
3263 		 * next bit (weight 2) is set when peer was primary */
3264 		*rule_nr = 40;
3265 
3266 		/* Neither has the "crashed primary" flag set,
3267 		 * only a replication link hiccup. */
3268 		if (rct == 0)
3269 			return 0;
3270 
3271 		/* Current UUID equal and no bitmap uuid; does not necessarily
3272 		 * mean this was a "simultaneous hard crash", maybe IO was
3273 		 * frozen, so no UUID-bump happened.
3274 		 * This is a protocol change, overload DRBD_FF_WSAME as flag
3275 		 * for "new-enough" peer DRBD version. */
3276 		if (device->state.role == R_PRIMARY || peer_role == R_PRIMARY) {
3277 			*rule_nr = 41;
3278 			if (!(connection->agreed_features & DRBD_FF_WSAME)) {
3279 				drbd_warn(peer_device, "Equivalent unrotated UUIDs, but current primary present.\n");
3280 				return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8));
3281 			}
3282 			if (device->state.role == R_PRIMARY && peer_role == R_PRIMARY) {
3283 				/* At least one has the "crashed primary" bit set,
3284 				 * both are primary now, but neither has rotated its UUIDs?
3285 				 * "Can not happen." */
3286 				drbd_err(peer_device, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n");
3287 				return -100;
3288 			}
3289 			if (device->state.role == R_PRIMARY)
3290 				return 1;
3291 			return -1;
3292 		}
3293 
3294 		/* Both are secondary.
3295 		 * Really looks like recovery from simultaneous hard crash.
3296 		 * Check which had been primary before, and arbitrate. */
3297 		switch (rct) {
3298 		case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */
3299 		case 1: /*  self_pri && !peer_pri */ return 1;
3300 		case 2: /* !self_pri &&  peer_pri */ return -1;
3301 		case 3: /*  self_pri &&  peer_pri */
3302 			dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3303 			return dc ? -1 : 1;
3304 		}
3305 	}
3306 
3307 	*rule_nr = 50;
3308 	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3309 	if (self == peer)
3310 		return -1;
3311 
3312 	*rule_nr = 51;
3313 	peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3314 	if (self == peer) {
3315 		if (connection->agreed_pro_version < 96 ?
3316 		    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3317 		    (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3318 		    peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3319 			/* The last P_SYNC_UUID did not get through. Undo the modifications
3320 			   the peer made to its UUIDs when it last started a resync as sync source. */
3321 
3322 			if (connection->agreed_pro_version < 91)
3323 				return -1091;
3324 
3325 			device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3326 			device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3327 
3328 			drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3329 			drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3330 
3331 			return -1;
3332 		}
3333 	}
3334 
3335 	*rule_nr = 60;
3336 	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3337 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3338 		peer = device->p_uuid[i] & ~((u64)1);
3339 		if (self == peer)
3340 			return -2;
3341 	}
3342 
3343 	*rule_nr = 70;
3344 	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3345 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3346 	if (self == peer)
3347 		return 1;
3348 
3349 	*rule_nr = 71;
3350 	self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3351 	if (self == peer) {
3352 		if (connection->agreed_pro_version < 96 ?
3353 		    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3354 		    (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3355 		    self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3356 			/* The last P_SYNC_UUID did not get through. Undo the modifications
3357 			   we made to our own UUIDs when we last started a resync as sync source. */
3358 
3359 			if (connection->agreed_pro_version < 91)
3360 				return -1091;
3361 
3362 			__drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3363 			__drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3364 
3365 			drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3366 			drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3367 				       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3368 
3369 			return 1;
3370 		}
3371 	}
3372 
3373 
3374 	*rule_nr = 80;
3375 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3376 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3377 		self = device->ldev->md.uuid[i] & ~((u64)1);
3378 		if (self == peer)
3379 			return 2;
3380 	}
3381 
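	/* Rule 90: both sides carry the same non-zero bitmap UUID, i.e. both
	 * diverged from the same base while disconnected -> split brain.
	 * Rule 100: only history UUIDs match -> the data is related, but the
	 * split happened further back; also reported as split brain (-100). */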
3382 	*rule_nr = 90;
3383 	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3384 	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3385 	if (self == peer && self != ((u64)0))
3386 		return 100;
3387 
3388 	*rule_nr = 100;
3389 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3390 		self = device->ldev->md.uuid[i] & ~((u64)1);
3391 		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3392 			peer = device->p_uuid[j] & ~((u64)1);
3393 			if (self == peer)
3394 				return -100;
3395 		}
3396 	}
3397 
3398 	return -1000;
3399 }
3400 
3401 /* drbd_sync_handshake() returns the new conn state on success, or
3402    CONN_MASK (-1) on failure.
3403  */
3404 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3405 					   enum drbd_role peer_role,
3406 					   enum drbd_disk_state peer_disk) __must_hold(local)
3407 {
3408 	struct drbd_device *device = peer_device->device;
3409 	enum drbd_conns rv = C_MASK;
3410 	enum drbd_disk_state mydisk;
3411 	struct net_conf *nc;
3412 	int hg, rule_nr, rr_conflict, tentative;
3413 
3414 	mydisk = device->state.disk;
3415 	if (mydisk == D_NEGOTIATING)
3416 		mydisk = device->new_state_tmp.disk;
3417 
3418 	drbd_info(device, "drbd_sync_handshake:\n");
3419 
3420 	spin_lock_irq(&device->ldev->md.uuid_lock);
3421 	drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3422 	drbd_uuid_dump(device, "peer", device->p_uuid,
3423 		       device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3424 
3425 	hg = drbd_uuid_compare(device, peer_role, &rule_nr);
3426 	spin_unlock_irq(&device->ldev->md.uuid_lock);
3427 
3428 	drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3429 
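	/* hg encodes the handshake result: 0 = in sync, +/-1 = bitmap based
	 * resync (we become source/target), +/-2 = full sync needed,
	 * +/-100 = split brain, -1000 = unrelated data; values below -1000
	 * encode a minimum protocol version (and, below -0x10000, feature
	 * flags) the peer would need to support to resolve the situation. */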
3430 	if (hg == -1000) {
3431 		drbd_alert(device, "Unrelated data, aborting!\n");
3432 		return C_MASK;
3433 	}
3434 	if (hg < -0x10000) {
3435 		int proto, fflags;
3436 		hg = -hg;
3437 		proto = hg & 0xff;
3438 		fflags = (hg >> 8) & 0xff;
3439 		drbd_alert(device, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n",
3440 					proto, fflags);
3441 		return C_MASK;
3442 	}
3443 	if (hg < -1000) {
3444 		drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3445 		return C_MASK;
3446 	}
3447 
3448 	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3449 	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3450 		int f = (hg == -100) || abs(hg) == 2;
3451 		hg = mydisk > D_INCONSISTENT ? 1 : -1;
3452 		if (f)
3453 			hg = hg*2;
3454 		drbd_info(device, "Becoming sync %s due to disk states.\n",
3455 		     hg > 0 ? "source" : "target");
3456 	}
3457 
3458 	if (abs(hg) == 100)
3459 		drbd_khelper(device, "initial-split-brain");
3460 
3461 	rcu_read_lock();
3462 	nc = rcu_dereference(peer_device->connection->net_conf);
3463 
3464 	if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3465 		int pcount = (device->state.role == R_PRIMARY)
3466 			   + (peer_role == R_PRIMARY);
3467 		int forced = (hg == -100);
3468 
3469 		switch (pcount) {
3470 		case 0:
3471 			hg = drbd_asb_recover_0p(peer_device);
3472 			break;
3473 		case 1:
3474 			hg = drbd_asb_recover_1p(peer_device);
3475 			break;
3476 		case 2:
3477 			hg = drbd_asb_recover_2p(peer_device);
3478 			break;
3479 		}
3480 		if (abs(hg) < 100) {
3481 			drbd_warn(device, "Split-Brain detected, %d primaries, "
3482 			     "automatically solved. Sync from %s node\n",
3483 			     pcount, (hg < 0) ? "peer" : "this");
3484 			if (forced) {
3485 				drbd_warn(device, "Doing a full sync, since"
				     " UUIDs were ambiguous.\n");
3487 				hg = hg*2;
3488 			}
3489 		}
3490 	}
3491 
3492 	if (hg == -100) {
3493 		if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3494 			hg = -1;
3495 		if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3496 			hg = 1;
3497 
3498 		if (abs(hg) < 100)
3499 			drbd_warn(device, "Split-Brain detected, manually solved. "
3500 			     "Sync from %s node\n",
3501 			     (hg < 0) ? "peer" : "this");
3502 	}
3503 	rr_conflict = nc->rr_conflict;
3504 	tentative = nc->tentative;
3505 	rcu_read_unlock();
3506 
3507 	if (hg == -100) {
3508 		/* FIXME this log message is not correct if we end up here
3509 		 * after an attempted attach on a diskless node.
3510 		 * We just refuse to attach -- well, we drop the "connection"
3511 		 * to that disk, in a way... */
3512 		drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3513 		drbd_khelper(device, "split-brain");
3514 		return C_MASK;
3515 	}
3516 
3517 	if (hg > 0 && mydisk <= D_INCONSISTENT) {
3518 		drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3519 		return C_MASK;
3520 	}
3521 
3522 	if (hg < 0 && /* by intention we do not use mydisk here. */
3523 	    device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3524 		switch (rr_conflict) {
3525 		case ASB_CALL_HELPER:
3526 			drbd_khelper(device, "pri-lost");
3527 			/* fall through */
3528 		case ASB_DISCONNECT:
3529 			drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3530 			return C_MASK;
3531 		case ASB_VIOLENTLY:
3532 			drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
			     " assumption\n");
3534 		}
3535 	}
3536 
3537 	if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3538 		if (hg == 0)
3539 			drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3540 		else
			drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.\n",
3542 				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3543 				 abs(hg) >= 2 ? "full" : "bit-map based");
3544 		return C_MASK;
3545 	}
3546 
3547 	if (abs(hg) >= 2) {
3548 		drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3549 		if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3550 					BM_LOCKED_SET_ALLOWED))
3551 			return C_MASK;
3552 	}
3553 
3554 	if (hg > 0) { /* become sync source. */
3555 		rv = C_WF_BITMAP_S;
3556 	} else if (hg < 0) { /* become sync target */
3557 		rv = C_WF_BITMAP_T;
3558 	} else {
3559 		rv = C_CONNECTED;
3560 		if (drbd_bm_total_weight(device)) {
3561 			drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3562 			     drbd_bm_total_weight(device));
3563 		}
3564 	}
3565 
3566 	return rv;
3567 }
3568 
3569 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3570 {
	/* our ASB_DISCARD_REMOTE paired with the peer's ASB_DISCARD_LOCAL is a valid combination */
3572 	if (peer == ASB_DISCARD_REMOTE)
3573 		return ASB_DISCARD_LOCAL;
3574 
	/* any other combination involving ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL is invalid */
3576 	if (peer == ASB_DISCARD_LOCAL)
3577 		return ASB_DISCARD_REMOTE;
3578 
3579 	/* everything else is valid if they are equal on both sides. */
3580 	return peer;
3581 }
3582 
3583 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3584 {
3585 	struct p_protocol *p = pi->data;
3586 	enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3587 	int p_proto, p_discard_my_data, p_two_primaries, cf;
3588 	struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3589 	char integrity_alg[SHARED_SECRET_MAX] = "";
3590 	struct crypto_ahash *peer_integrity_tfm = NULL;
3591 	void *int_dig_in = NULL, *int_dig_vv = NULL;
3592 
3593 	p_proto		= be32_to_cpu(p->protocol);
3594 	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
3595 	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
3596 	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
3597 	p_two_primaries = be32_to_cpu(p->two_primaries);
3598 	cf		= be32_to_cpu(p->conn_flags);
3599 	p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3600 
3601 	if (connection->agreed_pro_version >= 87) {
3602 		int err;
3603 
3604 		if (pi->size > sizeof(integrity_alg))
3605 			return -EIO;
3606 		err = drbd_recv_all(connection, integrity_alg, pi->size);
3607 		if (err)
3608 			return err;
3609 		integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3610 	}
3611 
3612 	if (pi->cmd != P_PROTOCOL_UPDATE) {
3613 		clear_bit(CONN_DRY_RUN, &connection->flags);
3614 
3615 		if (cf & CF_DRY_RUN)
3616 			set_bit(CONN_DRY_RUN, &connection->flags);
3617 
3618 		rcu_read_lock();
3619 		nc = rcu_dereference(connection->net_conf);
3620 
3621 		if (p_proto != nc->wire_protocol) {
3622 			drbd_err(connection, "incompatible %s settings\n", "protocol");
3623 			goto disconnect_rcu_unlock;
3624 		}
3625 
3626 		if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3627 			drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3628 			goto disconnect_rcu_unlock;
3629 		}
3630 
3631 		if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3632 			drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3633 			goto disconnect_rcu_unlock;
3634 		}
3635 
3636 		if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3637 			drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3638 			goto disconnect_rcu_unlock;
3639 		}
3640 
3641 		if (p_discard_my_data && nc->discard_my_data) {
3642 			drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3643 			goto disconnect_rcu_unlock;
3644 		}
3645 
3646 		if (p_two_primaries != nc->two_primaries) {
3647 			drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3648 			goto disconnect_rcu_unlock;
3649 		}
3650 
3651 		if (strcmp(integrity_alg, nc->integrity_alg)) {
3652 			drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3653 			goto disconnect_rcu_unlock;
3654 		}
3655 
3656 		rcu_read_unlock();
3657 	}
3658 
3659 	if (integrity_alg[0]) {
3660 		int hash_size;
3661 
3662 		/*
3663 		 * We can only change the peer data integrity algorithm
3664 		 * here.  Changing our own data integrity algorithm
3665 		 * requires that we send a P_PROTOCOL_UPDATE packet at
3666 		 * the same time; otherwise, the peer has no way to
3667 		 * tell between which packets the algorithm should
3668 		 * change.
3669 		 */
3670 
3671 		peer_integrity_tfm = crypto_alloc_ahash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3672 		if (IS_ERR(peer_integrity_tfm)) {
3673 			peer_integrity_tfm = NULL;
3674 			drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3675 				 integrity_alg);
3676 			goto disconnect;
3677 		}
3678 
3679 		hash_size = crypto_ahash_digestsize(peer_integrity_tfm);
3680 		int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3681 		int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3682 		if (!(int_dig_in && int_dig_vv)) {
3683 			drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3684 			goto disconnect;
3685 		}
3686 	}
3687 
3688 	new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3689 	if (!new_net_conf) {
3690 		drbd_err(connection, "Allocation of new net_conf failed\n");
3691 		goto disconnect;
3692 	}
3693 
3694 	mutex_lock(&connection->data.mutex);
3695 	mutex_lock(&connection->resource->conf_update);
3696 	old_net_conf = connection->net_conf;
3697 	*new_net_conf = *old_net_conf;
3698 
3699 	new_net_conf->wire_protocol = p_proto;
3700 	new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3701 	new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3702 	new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3703 	new_net_conf->two_primaries = p_two_primaries;
3704 
3705 	rcu_assign_pointer(connection->net_conf, new_net_conf);
3706 	mutex_unlock(&connection->resource->conf_update);
3707 	mutex_unlock(&connection->data.mutex);
3708 
3709 	crypto_free_ahash(connection->peer_integrity_tfm);
3710 	kfree(connection->int_dig_in);
3711 	kfree(connection->int_dig_vv);
3712 	connection->peer_integrity_tfm = peer_integrity_tfm;
3713 	connection->int_dig_in = int_dig_in;
3714 	connection->int_dig_vv = int_dig_vv;
3715 
3716 	if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3717 		drbd_info(connection, "peer data-integrity-alg: %s\n",
3718 			  integrity_alg[0] ? integrity_alg : "(none)");
3719 
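	/* wait for any readers still using the old net_conf under
	 * rcu_read_lock() before freeing it */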
3720 	synchronize_rcu();
3721 	kfree(old_net_conf);
3722 	return 0;
3723 
3724 disconnect_rcu_unlock:
3725 	rcu_read_unlock();
3726 disconnect:
3727 	crypto_free_ahash(peer_integrity_tfm);
3728 	kfree(int_dig_in);
3729 	kfree(int_dig_vv);
3730 	conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3731 	return -EIO;
3732 }
3733 
3734 /* helper function
3735  * input: alg name, feature name
3736  * return: NULL (alg name was "")
3737  *         ERR_PTR(error) if something goes wrong
3738  *         or the crypto hash ptr, if it worked out ok. */
3739 static struct crypto_ahash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3740 		const char *alg, const char *name)
3741 {
3742 	struct crypto_ahash *tfm;
3743 
3744 	if (!alg[0])
3745 		return NULL;
3746 
3747 	tfm = crypto_alloc_ahash(alg, 0, CRYPTO_ALG_ASYNC);
3748 	if (IS_ERR(tfm)) {
3749 		drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3750 			alg, name, PTR_ERR(tfm));
3751 		return tfm;
3752 	}
3753 	return tfm;
3754 }
3755 
3756 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3757 {
3758 	void *buffer = connection->data.rbuf;
3759 	int size = pi->size;
3760 
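	/* drain the remaining payload in receive-buffer sized chunks and
	 * discard it */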
3761 	while (size) {
3762 		int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3763 		s = drbd_recv(connection, buffer, s);
3764 		if (s <= 0) {
3765 			if (s < 0)
3766 				return s;
3767 			break;
3768 		}
3769 		size -= s;
3770 	}
3771 	if (size)
3772 		return -EIO;
3773 	return 0;
3774 }
3775 
3776 /*
3777  * config_unknown_volume  -  device configuration command for unknown volume
3778  *
3779  * When a device is added to an existing connection, the node on which the
3780  * device is added first will send configuration commands to its peer but the
3781  * peer will not know about the device yet.  It will warn and ignore these
3782  * commands.  Once the device is added on the second node, the second node will
3783  * send the same device configuration commands, but in the other direction.
3784  *
3785  * (We can also end up here if drbd is misconfigured.)
3786  */
3787 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3788 {
3789 	drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3790 		  cmdname(pi->cmd), pi->vnr);
3791 	return ignore_remaining_packet(connection, pi);
3792 }
3793 
3794 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3795 {
3796 	struct drbd_peer_device *peer_device;
3797 	struct drbd_device *device;
3798 	struct p_rs_param_95 *p;
3799 	unsigned int header_size, data_size, exp_max_sz;
3800 	struct crypto_ahash *verify_tfm = NULL;
3801 	struct crypto_ahash *csums_tfm = NULL;
3802 	struct net_conf *old_net_conf, *new_net_conf = NULL;
3803 	struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3804 	const int apv = connection->agreed_pro_version;
3805 	struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3806 	int fifo_size = 0;
3807 	int err;
3808 
3809 	peer_device = conn_peer_device(connection, pi->vnr);
3810 	if (!peer_device)
3811 		return config_unknown_volume(connection, pi);
3812 	device = peer_device->device;
3813 
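	/* the on-the-wire size of the SyncParam packet grew with the agreed
	 * protocol version: p_rs_param, p_rs_param plus shared secret (apv 88),
	 * p_rs_param_89, p_rs_param_95 */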
3814 	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3815 		    : apv == 88 ? sizeof(struct p_rs_param)
3816 					+ SHARED_SECRET_MAX
3817 		    : apv <= 94 ? sizeof(struct p_rs_param_89)
3818 		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3819 
3820 	if (pi->size > exp_max_sz) {
3821 		drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3822 		    pi->size, exp_max_sz);
3823 		return -EIO;
3824 	}
3825 
3826 	if (apv <= 88) {
3827 		header_size = sizeof(struct p_rs_param);
3828 		data_size = pi->size - header_size;
3829 	} else if (apv <= 94) {
3830 		header_size = sizeof(struct p_rs_param_89);
3831 		data_size = pi->size - header_size;
3832 		D_ASSERT(device, data_size == 0);
3833 	} else {
3834 		header_size = sizeof(struct p_rs_param_95);
3835 		data_size = pi->size - header_size;
3836 		D_ASSERT(device, data_size == 0);
3837 	}
3838 
3839 	/* initialize verify_alg and csums_alg */
3840 	p = pi->data;
3841 	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3842 
3843 	err = drbd_recv_all(peer_device->connection, p, header_size);
3844 	if (err)
3845 		return err;
3846 
3847 	mutex_lock(&connection->resource->conf_update);
3848 	old_net_conf = peer_device->connection->net_conf;
3849 	if (get_ldev(device)) {
3850 		new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3851 		if (!new_disk_conf) {
3852 			put_ldev(device);
3853 			mutex_unlock(&connection->resource->conf_update);
3854 			drbd_err(device, "Allocation of new disk_conf failed\n");
3855 			return -ENOMEM;
3856 		}
3857 
3858 		old_disk_conf = device->ldev->disk_conf;
3859 		*new_disk_conf = *old_disk_conf;
3860 
3861 		new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3862 	}
3863 
3864 	if (apv >= 88) {
3865 		if (apv == 88) {
3866 			if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3867 				drbd_err(device, "verify-alg of wrong size, "
					"peer wants %u, accepting only up to %u bytes\n",
3869 					data_size, SHARED_SECRET_MAX);
3870 				err = -EIO;
3871 				goto reconnect;
3872 			}
3873 
3874 			err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3875 			if (err)
3876 				goto reconnect;
3877 			/* we expect NUL terminated string */
3878 			/* but just in case someone tries to be evil */
3879 			D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3880 			p->verify_alg[data_size-1] = 0;
3881 
3882 		} else /* apv >= 89 */ {
3883 			/* we still expect NUL terminated strings */
3884 			/* but just in case someone tries to be evil */
3885 			D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3886 			D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3887 			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3888 			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3889 		}
3890 
3891 		if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3892 			if (device->state.conn == C_WF_REPORT_PARAMS) {
3893 				drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3894 				    old_net_conf->verify_alg, p->verify_alg);
3895 				goto disconnect;
3896 			}
3897 			verify_tfm = drbd_crypto_alloc_digest_safe(device,
3898 					p->verify_alg, "verify-alg");
3899 			if (IS_ERR(verify_tfm)) {
3900 				verify_tfm = NULL;
3901 				goto disconnect;
3902 			}
3903 		}
3904 
3905 		if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3906 			if (device->state.conn == C_WF_REPORT_PARAMS) {
3907 				drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3908 				    old_net_conf->csums_alg, p->csums_alg);
3909 				goto disconnect;
3910 			}
3911 			csums_tfm = drbd_crypto_alloc_digest_safe(device,
3912 					p->csums_alg, "csums-alg");
3913 			if (IS_ERR(csums_tfm)) {
3914 				csums_tfm = NULL;
3915 				goto disconnect;
3916 			}
3917 		}
3918 
3919 		if (apv > 94 && new_disk_conf) {
3920 			new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3921 			new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3922 			new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3923 			new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3924 
3925 			fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3926 			if (fifo_size != device->rs_plan_s->size) {
3927 				new_plan = fifo_alloc(fifo_size);
3928 				if (!new_plan) {
					drbd_err(device, "kmalloc of fifo_buffer failed\n");
3930 					put_ldev(device);
3931 					goto disconnect;
3932 				}
3933 			}
3934 		}
3935 
3936 		if (verify_tfm || csums_tfm) {
3937 			new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3938 			if (!new_net_conf) {
3939 				drbd_err(device, "Allocation of new net_conf failed\n");
3940 				goto disconnect;
3941 			}
3942 
3943 			*new_net_conf = *old_net_conf;
3944 
3945 			if (verify_tfm) {
3946 				strcpy(new_net_conf->verify_alg, p->verify_alg);
3947 				new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3948 				crypto_free_ahash(peer_device->connection->verify_tfm);
3949 				peer_device->connection->verify_tfm = verify_tfm;
3950 				drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3951 			}
3952 			if (csums_tfm) {
3953 				strcpy(new_net_conf->csums_alg, p->csums_alg);
3954 				new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3955 				crypto_free_ahash(peer_device->connection->csums_tfm);
3956 				peer_device->connection->csums_tfm = csums_tfm;
3957 				drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3958 			}
3959 			rcu_assign_pointer(connection->net_conf, new_net_conf);
3960 		}
3961 	}
3962 
3963 	if (new_disk_conf) {
3964 		rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3965 		put_ldev(device);
3966 	}
3967 
3968 	if (new_plan) {
3969 		old_plan = device->rs_plan_s;
3970 		rcu_assign_pointer(device->rs_plan_s, new_plan);
3971 	}
3972 
3973 	mutex_unlock(&connection->resource->conf_update);
3974 	synchronize_rcu();
3975 	if (new_net_conf)
3976 		kfree(old_net_conf);
3977 	kfree(old_disk_conf);
3978 	kfree(old_plan);
3979 
3980 	return 0;
3981 
3982 reconnect:
3983 	if (new_disk_conf) {
3984 		put_ldev(device);
3985 		kfree(new_disk_conf);
3986 	}
3987 	mutex_unlock(&connection->resource->conf_update);
3988 	return -EIO;
3989 
3990 disconnect:
3991 	kfree(new_plan);
3992 	if (new_disk_conf) {
3993 		put_ldev(device);
3994 		kfree(new_disk_conf);
3995 	}
3996 	mutex_unlock(&connection->resource->conf_update);
	/* free csums_tfm and verify_tfm in case they were allocated above
	 * but not (yet) installed in the connection, e.g. because a later
	 * allocation failed */
	crypto_free_ahash(csums_tfm);
	crypto_free_ahash(verify_tfm);
4002 	conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4003 	return -EIO;
4004 }
4005 
4006 /* warn if the arguments differ by more than 12.5% */
4007 static void warn_if_differ_considerably(struct drbd_device *device,
4008 	const char *s, sector_t a, sector_t b)
4009 {
4010 	sector_t d;
4011 	if (a == 0 || b == 0)
4012 		return;
4013 	d = (a > b) ? (a - b) : (b - a);
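	/* a>>3 resp. b>>3 is 1/8th, i.e. 12.5%, of the respective size */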
4014 	if (d > (a>>3) || d > (b>>3))
4015 		drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
4016 		     (unsigned long long)a, (unsigned long long)b);
4017 }
4018 
4019 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
4020 {
4021 	struct drbd_peer_device *peer_device;
4022 	struct drbd_device *device;
4023 	struct p_sizes *p = pi->data;
4024 	struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ? p->qlim : NULL;
4025 	enum determine_dev_size dd = DS_UNCHANGED;
4026 	sector_t p_size, p_usize, p_csize, my_usize;
4027 	int ldsc = 0; /* local disk size changed */
4028 	enum dds_flags ddsf;
4029 
4030 	peer_device = conn_peer_device(connection, pi->vnr);
4031 	if (!peer_device)
4032 		return config_unknown_volume(connection, pi);
4033 	device = peer_device->device;
4034 
4035 	p_size = be64_to_cpu(p->d_size);
4036 	p_usize = be64_to_cpu(p->u_size);
4037 	p_csize = be64_to_cpu(p->c_size);
4038 
4039 	/* just store the peer's disk size for now.
4040 	 * we still need to figure out whether we accept that. */
4041 	device->p_size = p_size;
4042 
4043 	if (get_ldev(device)) {
4044 		sector_t new_size, cur_size;
4045 		rcu_read_lock();
4046 		my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
4047 		rcu_read_unlock();
4048 
4049 		warn_if_differ_considerably(device, "lower level device sizes",
4050 			   p_size, drbd_get_max_capacity(device->ldev));
4051 		warn_if_differ_considerably(device, "user requested size",
4052 					    p_usize, my_usize);
4053 
4054 		/* if this is the first connect, or an otherwise expected
4055 		 * param exchange, choose the minimum */
4056 		if (device->state.conn == C_WF_REPORT_PARAMS)
4057 			p_usize = min_not_zero(my_usize, p_usize);
4058 
4059 		/* Never shrink a device with usable data during connect.
4060 		   But allow online shrinking if we are connected. */
4061 		new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0);
4062 		cur_size = drbd_get_capacity(device->this_bdev);
4063 		if (new_size < cur_size &&
4064 		    device->state.disk >= D_OUTDATED &&
4065 		    device->state.conn < C_CONNECTED) {
4066 			drbd_err(device, "The peer's disk size is too small! (%llu < %llu sectors)\n",
4067 					(unsigned long long)new_size, (unsigned long long)cur_size);
4068 			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4069 			put_ldev(device);
4070 			return -EIO;
4071 		}
4072 
4073 		if (my_usize != p_usize) {
4074 			struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
4075 
4076 			new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
4077 			if (!new_disk_conf) {
4078 				drbd_err(device, "Allocation of new disk_conf failed\n");
4079 				put_ldev(device);
4080 				return -ENOMEM;
4081 			}
4082 
4083 			mutex_lock(&connection->resource->conf_update);
4084 			old_disk_conf = device->ldev->disk_conf;
4085 			*new_disk_conf = *old_disk_conf;
4086 			new_disk_conf->disk_size = p_usize;
4087 
4088 			rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
4089 			mutex_unlock(&connection->resource->conf_update);
4090 			synchronize_rcu();
4091 			kfree(old_disk_conf);
4092 
			drbd_info(device, "Peer sets u_size to %lu sectors\n",
				 (unsigned long)p_usize);
4095 		}
4096 
4097 		put_ldev(device);
4098 	}
4099 
4100 	device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
	/* Keep the call to drbd_reconsider_queue_parameters() before
	   drbd_determine_dev_size().  If we cleared QUEUE_FLAG_DISCARD from
	   our queue in drbd_reconsider_queue_parameters(), we can then be
	   sure that after drbd_determine_dev_size() no REQ_DISCARDs are in
	   the queue. */
4105 
4106 	ddsf = be16_to_cpu(p->dds_flags);
4107 	if (get_ldev(device)) {
4108 		drbd_reconsider_queue_parameters(device, device->ldev, o);
4109 		dd = drbd_determine_dev_size(device, ddsf, NULL);
4110 		put_ldev(device);
4111 		if (dd == DS_ERROR)
4112 			return -EIO;
4113 		drbd_md_sync(device);
4114 	} else {
4115 		/*
4116 		 * I am diskless, need to accept the peer's *current* size.
	 * I must NOT accept the peer's backing disk size,
4118 		 * it may have been larger than mine all along...
4119 		 *
4120 		 * At this point, the peer knows more about my disk, or at
4121 		 * least about what we last agreed upon, than myself.
4122 		 * So if his c_size is less than his d_size, the most likely
4123 		 * reason is that *my* d_size was smaller last time we checked.
4124 		 *
4125 		 * However, if he sends a zero current size,
	 * take his (user-capped or) backing disk size anyway.
4127 		 */
4128 		drbd_reconsider_queue_parameters(device, NULL, o);
4129 		drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size);
4130 	}
4131 
4132 	if (get_ldev(device)) {
4133 		if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
4134 			device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
4135 			ldsc = 1;
4136 		}
4137 
4138 		put_ldev(device);
4139 	}
4140 
4141 	if (device->state.conn > C_WF_REPORT_PARAMS) {
4142 		if (be64_to_cpu(p->c_size) !=
4143 		    drbd_get_capacity(device->this_bdev) || ldsc) {
4144 			/* we have different sizes, probably peer
4145 			 * needs to know my new size... */
4146 			drbd_send_sizes(peer_device, 0, ddsf);
4147 		}
4148 		if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
4149 		    (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
4150 			if (device->state.pdsk >= D_INCONSISTENT &&
4151 			    device->state.disk >= D_INCONSISTENT) {
4152 				if (ddsf & DDSF_NO_RESYNC)
4153 					drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
4154 				else
4155 					resync_after_online_grow(device);
4156 			} else
4157 				set_bit(RESYNC_AFTER_NEG, &device->flags);
4158 		}
4159 	}
4160 
4161 	return 0;
4162 }
4163 
4164 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
4165 {
4166 	struct drbd_peer_device *peer_device;
4167 	struct drbd_device *device;
4168 	struct p_uuids *p = pi->data;
4169 	u64 *p_uuid;
4170 	int i, updated_uuids = 0;
4171 
4172 	peer_device = conn_peer_device(connection, pi->vnr);
4173 	if (!peer_device)
4174 		return config_unknown_volume(connection, pi);
4175 	device = peer_device->device;
4176 
4177 	p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
4178 	if (!p_uuid) {
4179 		drbd_err(device, "kmalloc of p_uuid failed\n");
4180 		return false;
4181 	}
4182 
4183 	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
4184 		p_uuid[i] = be64_to_cpu(p->uuid[i]);
4185 
4186 	kfree(device->p_uuid);
4187 	device->p_uuid = p_uuid;
4188 
4189 	if (device->state.conn < C_CONNECTED &&
4190 	    device->state.disk < D_INCONSISTENT &&
4191 	    device->state.role == R_PRIMARY &&
4192 	    (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
4193 		drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
4194 		    (unsigned long long)device->ed_uuid);
4195 		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4196 		return -EIO;
4197 	}
4198 
4199 	if (get_ldev(device)) {
4200 		int skip_initial_sync =
4201 			device->state.conn == C_CONNECTED &&
4202 			peer_device->connection->agreed_pro_version >= 90 &&
4203 			device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
4204 			(p_uuid[UI_FLAGS] & 8);
4205 		if (skip_initial_sync) {
4206 			drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
4207 			drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
4208 					"clear_n_write from receive_uuids",
4209 					BM_LOCKED_TEST_ALLOWED);
4210 			_drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
4211 			_drbd_uuid_set(device, UI_BITMAP, 0);
4212 			_drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
4213 					CS_VERBOSE, NULL);
4214 			drbd_md_sync(device);
4215 			updated_uuids = 1;
4216 		}
4217 		put_ldev(device);
4218 	} else if (device->state.disk < D_INCONSISTENT &&
4219 		   device->state.role == R_PRIMARY) {
4220 		/* I am a diskless primary, the peer just created a new current UUID
4221 		   for me. */
4222 		updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4223 	}
4224 
	/* Before we test the disk state, we should wait until any ongoing
	   cluster wide state change has finished.  That is important if we
	   are primary and are detaching from our disk: we need to see the
	   new disk state.  Taking and releasing state_mutex below is just
	   such a wait. */
4229 	mutex_lock(device->state_mutex);
4230 	mutex_unlock(device->state_mutex);
4231 	if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
4232 		updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4233 
4234 	if (updated_uuids)
4235 		drbd_print_uuids(device, "receiver updated UUIDs to");
4236 
4237 	return 0;
4238 }
4239 
4240 /**
4241  * convert_state() - Converts the peer's view of the cluster state to our point of view
4242  * @ps:		The state as seen by the peer.
4243  */
4244 static union drbd_state convert_state(union drbd_state ps)
4245 {
4246 	union drbd_state ms;
4247 
4248 	static enum drbd_conns c_tab[] = {
4249 		[C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
4250 		[C_CONNECTED] = C_CONNECTED,
4251 
4252 		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
4253 		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
4254 		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
4255 		[C_VERIFY_S]       = C_VERIFY_T,
4256 		[C_MASK]   = C_MASK,
4257 	};
4258 
4259 	ms.i = ps.i;
4260 
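	/* swap the "local" and "peer" halves of the state: the peer's role
	 * is our view of the peer, its disk is our pdsk, and vice versa */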
4261 	ms.conn = c_tab[ps.conn];
4262 	ms.peer = ps.role;
4263 	ms.role = ps.peer;
4264 	ms.pdsk = ps.disk;
4265 	ms.disk = ps.pdsk;
4266 	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
4267 
4268 	return ms;
4269 }
4270 
4271 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4272 {
4273 	struct drbd_peer_device *peer_device;
4274 	struct drbd_device *device;
4275 	struct p_req_state *p = pi->data;
4276 	union drbd_state mask, val;
4277 	enum drbd_state_rv rv;
4278 
4279 	peer_device = conn_peer_device(connection, pi->vnr);
4280 	if (!peer_device)
4281 		return -EIO;
4282 	device = peer_device->device;
4283 
4284 	mask.i = be32_to_cpu(p->mask);
4285 	val.i = be32_to_cpu(p->val);
4286 
4287 	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4288 	    mutex_is_locked(device->state_mutex)) {
4289 		drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4290 		return 0;
4291 	}
4292 
4293 	mask = convert_state(mask);
4294 	val = convert_state(val);
4295 
4296 	rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4297 	drbd_send_sr_reply(peer_device, rv);
4298 
4299 	drbd_md_sync(device);
4300 
4301 	return 0;
4302 }
4303 
4304 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4305 {
4306 	struct p_req_state *p = pi->data;
4307 	union drbd_state mask, val;
4308 	enum drbd_state_rv rv;
4309 
4310 	mask.i = be32_to_cpu(p->mask);
4311 	val.i = be32_to_cpu(p->val);
4312 
4313 	if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4314 	    mutex_is_locked(&connection->cstate_mutex)) {
4315 		conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4316 		return 0;
4317 	}
4318 
4319 	mask = convert_state(mask);
4320 	val = convert_state(val);
4321 
4322 	rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4323 	conn_send_sr_reply(connection, rv);
4324 
4325 	return 0;
4326 }
4327 
4328 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4329 {
4330 	struct drbd_peer_device *peer_device;
4331 	struct drbd_device *device;
4332 	struct p_state *p = pi->data;
4333 	union drbd_state os, ns, peer_state;
4334 	enum drbd_disk_state real_peer_disk;
4335 	enum chg_state_flags cs_flags;
4336 	int rv;
4337 
4338 	peer_device = conn_peer_device(connection, pi->vnr);
4339 	if (!peer_device)
4340 		return config_unknown_volume(connection, pi);
4341 	device = peer_device->device;
4342 
4343 	peer_state.i = be32_to_cpu(p->state);
4344 
4345 	real_peer_disk = peer_state.disk;
4346 	if (peer_state.disk == D_NEGOTIATING) {
4347 		real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4348 		drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4349 	}
4350 
4351 	spin_lock_irq(&device->resource->req_lock);
4352  retry:
4353 	os = ns = drbd_read_state(device);
4354 	spin_unlock_irq(&device->resource->req_lock);
4355 
4356 	/* If some other part of the code (ack_receiver thread, timeout)
4357 	 * already decided to close the connection again,
4358 	 * we must not "re-establish" it here. */
4359 	if (os.conn <= C_TEAR_DOWN)
4360 		return -ECONNRESET;
4361 
4362 	/* If this is the "end of sync" confirmation, usually the peer disk
4363 	 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4364 	 * set) resync started in PausedSyncT, or if the timing of pause-/
4365 	 * unpause-sync events has been "just right", the peer disk may
4366 	 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4367 	 */
4368 	if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4369 	    real_peer_disk == D_UP_TO_DATE &&
4370 	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4371 		/* If we are (becoming) SyncSource, but peer is still in sync
4372 		 * preparation, ignore its uptodate-ness to avoid flapping, it
4373 		 * will change to inconsistent once the peer reaches active
4374 		 * syncing states.
4375 		 * It may have changed syncer-paused flags, however, so we
4376 		 * cannot ignore this completely. */
4377 		if (peer_state.conn > C_CONNECTED &&
4378 		    peer_state.conn < C_SYNC_SOURCE)
4379 			real_peer_disk = D_INCONSISTENT;
4380 
4381 		/* if peer_state changes to connected at the same time,
4382 		 * it explicitly notifies us that it finished resync.
4383 		 * Maybe we should finish it up, too? */
4384 		else if (os.conn >= C_SYNC_SOURCE &&
4385 			 peer_state.conn == C_CONNECTED) {
4386 			if (drbd_bm_total_weight(device) <= device->rs_failed)
4387 				drbd_resync_finished(device);
4388 			return 0;
4389 		}
4390 	}
4391 
4392 	/* explicit verify finished notification, stop sector reached. */
4393 	if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4394 	    peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4395 		ov_out_of_sync_print(device);
4396 		drbd_resync_finished(device);
4397 		return 0;
4398 	}
4399 
4400 	/* peer says his disk is inconsistent, while we think it is uptodate,
4401 	 * and this happens while the peer still thinks we have a sync going on,
4402 	 * but we think we are already done with the sync.
4403 	 * We ignore this to avoid flapping pdsk.
4404 	 * This should not happen, if the peer is a recent version of drbd. */
4405 	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4406 	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4407 		real_peer_disk = D_UP_TO_DATE;
4408 
4409 	if (ns.conn == C_WF_REPORT_PARAMS)
4410 		ns.conn = C_CONNECTED;
4411 
4412 	if (peer_state.conn == C_AHEAD)
4413 		ns.conn = C_BEHIND;
4414 
4415 	if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4416 	    get_ldev_if_state(device, D_NEGOTIATING)) {
4417 		int cr; /* consider resync */
4418 
4419 		/* if we established a new connection */
4420 		cr  = (os.conn < C_CONNECTED);
4421 		/* if we had an established connection
4422 		 * and one of the nodes newly attaches a disk */
4423 		cr |= (os.conn == C_CONNECTED &&
4424 		       (peer_state.disk == D_NEGOTIATING ||
4425 			os.disk == D_NEGOTIATING));
4426 		/* if we have both been inconsistent, and the peer has been
4427 		 * forced to be UpToDate with --overwrite-data */
4428 		cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4429 		/* if we had been plain connected, and the admin requested to
4430 		 * start a sync by "invalidate" or "invalidate-remote" */
4431 		cr |= (os.conn == C_CONNECTED &&
4432 				(peer_state.conn >= C_STARTING_SYNC_S &&
4433 				 peer_state.conn <= C_WF_BITMAP_T));
4434 
4435 		if (cr)
4436 			ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4437 
4438 		put_ldev(device);
4439 		if (ns.conn == C_MASK) {
4440 			ns.conn = C_CONNECTED;
4441 			if (device->state.disk == D_NEGOTIATING) {
4442 				drbd_force_state(device, NS(disk, D_FAILED));
4443 			} else if (peer_state.disk == D_NEGOTIATING) {
4444 				drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4445 				peer_state.disk = D_DISKLESS;
4446 				real_peer_disk = D_DISKLESS;
4447 			} else {
4448 				if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4449 					return -EIO;
4450 				D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4451 				conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4452 				return -EIO;
4453 			}
4454 		}
4455 	}
4456 
4457 	spin_lock_irq(&device->resource->req_lock);
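	/* the device state may have changed while we were evaluating it
	 * without holding req_lock; if so, start over */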
4458 	if (os.i != drbd_read_state(device).i)
4459 		goto retry;
4460 	clear_bit(CONSIDER_RESYNC, &device->flags);
4461 	ns.peer = peer_state.role;
4462 	ns.pdsk = real_peer_disk;
4463 	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4464 	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4465 		ns.disk = device->new_state_tmp.disk;
4466 	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4467 	if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4468 	    test_bit(NEW_CUR_UUID, &device->flags)) {
4469 		/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
		   for temporary network outages! */
4471 		spin_unlock_irq(&device->resource->req_lock);
4472 		drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4473 		tl_clear(peer_device->connection);
4474 		drbd_uuid_new_current(device);
4475 		clear_bit(NEW_CUR_UUID, &device->flags);
4476 		conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4477 		return -EIO;
4478 	}
4479 	rv = _drbd_set_state(device, ns, cs_flags, NULL);
4480 	ns = drbd_read_state(device);
4481 	spin_unlock_irq(&device->resource->req_lock);
4482 
4483 	if (rv < SS_SUCCESS) {
4484 		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4485 		return -EIO;
4486 	}
4487 
4488 	if (os.conn > C_WF_REPORT_PARAMS) {
4489 		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4490 		    peer_state.disk != D_NEGOTIATING ) {
4491 			/* we want resync, peer has not yet decided to sync... */
4492 			/* Nowadays only used when forcing a node into primary role and
4493 			   setting its disk to UpToDate with that */
4494 			drbd_send_uuids(peer_device);
4495 			drbd_send_current_state(peer_device);
4496 		}
4497 	}
4498 
4499 	clear_bit(DISCARD_MY_DATA, &device->flags);
4500 
4501 	drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4502 
4503 	return 0;
4504 }
4505 
4506 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4507 {
4508 	struct drbd_peer_device *peer_device;
4509 	struct drbd_device *device;
4510 	struct p_rs_uuid *p = pi->data;
4511 
4512 	peer_device = conn_peer_device(connection, pi->vnr);
4513 	if (!peer_device)
4514 		return -EIO;
4515 	device = peer_device->device;
4516 
4517 	wait_event(device->misc_wait,
4518 		   device->state.conn == C_WF_SYNC_UUID ||
4519 		   device->state.conn == C_BEHIND ||
4520 		   device->state.conn < C_CONNECTED ||
4521 		   device->state.disk < D_NEGOTIATING);
4522 
4523 	/* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4524 
4525 	/* Here the _drbd_uuid_ functions are right, current should
4526 	   _not_ be rotated into the history */
4527 	if (get_ldev_if_state(device, D_NEGOTIATING)) {
4528 		_drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4529 		_drbd_uuid_set(device, UI_BITMAP, 0UL);
4530 
4531 		drbd_print_uuids(device, "updated sync uuid");
4532 		drbd_start_resync(device, C_SYNC_TARGET);
4533 
4534 		put_ldev(device);
4535 	} else
4536 		drbd_err(device, "Ignoring SyncUUID packet!\n");
4537 
4538 	return 0;
4539 }
4540 
4541 /**
 * receive_bitmap_plain() - receive one uncompressed chunk of the bitmap
4543  *
4544  * Return 0 when done, 1 when another iteration is needed, and a negative error
4545  * code upon failure.
4546  */
4547 static int
4548 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4549 		     unsigned long *p, struct bm_xfer_ctx *c)
4550 {
4551 	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4552 				 drbd_header_size(peer_device->connection);
4553 	unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4554 				       c->bm_words - c->word_offset);
4555 	unsigned int want = num_words * sizeof(*p);
4556 	int err;
4557 
4558 	if (want != size) {
4559 		drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4560 		return -EIO;
4561 	}
4562 	if (want == 0)
4563 		return 0;
4564 	err = drbd_recv_all(peer_device->connection, p, want);
4565 	if (err)
4566 		return err;
4567 
4568 	drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4569 
4570 	c->word_offset += num_words;
4571 	c->bit_offset = c->word_offset * BITS_PER_LONG;
4572 	if (c->bit_offset > c->bm_bits)
4573 		c->bit_offset = c->bm_bits;
4574 
4575 	return 1;
4576 }
4577 
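/* layout of the compressed bitmap "encoding" byte:
 * bits 0-3: bitmap encoding (enum drbd_bitmap_code),
 * bits 4-6: number of pad bits at the end of the bit stream,
 * bit 7:    value (set/cleared) of the first run */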
4578 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4579 {
4580 	return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4581 }
4582 
4583 static int dcbp_get_start(struct p_compressed_bm *p)
4584 {
4585 	return (p->encoding & 0x80) != 0;
4586 }
4587 
4588 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4589 {
4590 	return (p->encoding >> 4) & 0x7;
4591 }
4592 
4593 /**
 * recv_bm_rle_bits() - decode one run-length encoded chunk of the bitmap
4595  *
4596  * Return 0 when done, 1 when another iteration is needed, and a negative error
4597  * code upon failure.
4598  */
4599 static int
4600 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4601 		struct p_compressed_bm *p,
4602 		 struct bm_xfer_ctx *c,
4603 		 unsigned int len)
4604 {
4605 	struct bitstream bs;
4606 	u64 look_ahead;
4607 	u64 rl;
4608 	u64 tmp;
4609 	unsigned long s = c->bit_offset;
4610 	unsigned long e;
4611 	int toggle = dcbp_get_start(p);
4612 	int have;
4613 	int bits;
4614 
4615 	bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4616 
4617 	bits = bitstream_get_bits(&bs, &look_ahead, 64);
4618 	if (bits < 0)
4619 		return -EIO;
4620 
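	/* look_ahead buffers up to 64 not yet consumed bits.  Each iteration
	 * decodes one variable-length-encoded run length, applies it starting
	 * at bit offset s either as a run of bits to set (toggle on) or to
	 * skip (toggle off), then refills look_ahead from the bit stream. */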
4621 	for (have = bits; have > 0; s += rl, toggle = !toggle) {
4622 		bits = vli_decode_bits(&rl, look_ahead);
4623 		if (bits <= 0)
4624 			return -EIO;
4625 
4626 		if (toggle) {
4627 			e = s + rl -1;
4628 			if (e >= c->bm_bits) {
4629 				drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4630 				return -EIO;
4631 			}
4632 			_drbd_bm_set_bits(peer_device->device, s, e);
4633 		}
4634 
4635 		if (have < bits) {
4636 			drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4637 				have, bits, look_ahead,
4638 				(unsigned int)(bs.cur.b - p->code),
4639 				(unsigned int)bs.buf_len);
4640 			return -EIO;
4641 		}
4642 		/* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4643 		if (likely(bits < 64))
4644 			look_ahead >>= bits;
4645 		else
4646 			look_ahead = 0;
4647 		have -= bits;
4648 
4649 		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4650 		if (bits < 0)
4651 			return -EIO;
4652 		look_ahead |= tmp << have;
4653 		have += bits;
4654 	}
4655 
4656 	c->bit_offset = s;
4657 	bm_xfer_ctx_bit_to_word_offset(c);
4658 
4659 	return (s != c->bm_bits);
4660 }
4661 
4662 /**
 * decode_bitmap_c() - decode one compressed bitmap chunk
4664  *
4665  * Return 0 when done, 1 when another iteration is needed, and a negative error
4666  * code upon failure.
4667  */
4668 static int
4669 decode_bitmap_c(struct drbd_peer_device *peer_device,
4670 		struct p_compressed_bm *p,
4671 		struct bm_xfer_ctx *c,
4672 		unsigned int len)
4673 {
4674 	if (dcbp_get_code(p) == RLE_VLI_Bits)
4675 		return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4676 
4677 	/* other variants had been implemented for evaluation,
4678 	 * but have been dropped as this one turned out to be "best"
4679 	 * during all our tests. */
4680 
4681 	drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4682 	conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4683 	return -EIO;
4684 }
4685 
4686 void INFO_bm_xfer_stats(struct drbd_device *device,
4687 		const char *direction, struct bm_xfer_ctx *c)
4688 {
4689 	/* what would it take to transfer it "plaintext" */
4690 	unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4691 	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4692 	unsigned int plain =
4693 		header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4694 		c->bm_words * sizeof(unsigned long);
4695 	unsigned int total = c->bytes[0] + c->bytes[1];
4696 	unsigned int r;
4697 
	/* total cannot be zero, but just in case: */
4699 	if (total == 0)
4700 		return;
4701 
4702 	/* don't report if not compressed */
4703 	if (total >= plain)
4704 		return;
4705 
4706 	/* total < plain. check for overflow, still */
4707 	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4708 		                    : (1000 * total / plain);
4709 
4710 	if (r > 1000)
4711 		r = 1000;
4712 
4713 	r = 1000 - r;
4714 	drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4715 	     "total %u; compression: %u.%u%%\n",
4716 			direction,
4717 			c->bytes[1], c->packets[1],
4718 			c->bytes[0], c->packets[0],
4719 			total, r/10, r % 10);
4720 }
4721 
/* Since we are processing the bitfield from lower addresses to higher,
   it does not matter whether we process it in 32 bit or 64 bit chunks,
   as long as it is little endian.  (Understand it as a byte stream,
   beginning with the lowest byte...)  If we used big endian instead,
   we would need to process it from the highest address to the lowest,
   in order to be agnostic to the 32 vs 64 bit issue.

   Returns 0 on success, a negative error code otherwise. */
4730 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4731 {
4732 	struct drbd_peer_device *peer_device;
4733 	struct drbd_device *device;
4734 	struct bm_xfer_ctx c;
4735 	int err;
4736 
4737 	peer_device = conn_peer_device(connection, pi->vnr);
4738 	if (!peer_device)
4739 		return -EIO;
4740 	device = peer_device->device;
4741 
4742 	drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4743 	/* you are supposed to send additional out-of-sync information
4744 	 * if you actually set bits during this phase */
4745 
4746 	c = (struct bm_xfer_ctx) {
4747 		.bm_bits = drbd_bm_bits(device),
4748 		.bm_words = drbd_bm_words(device),
4749 	};
4750 
	for (;;) {
4752 		if (pi->cmd == P_BITMAP)
4753 			err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4754 		else if (pi->cmd == P_COMPRESSED_BITMAP) {
4755 			/* MAYBE: sanity check that we speak proto >= 90,
4756 			 * and the feature is enabled! */
4757 			struct p_compressed_bm *p = pi->data;
4758 
4759 			if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4760 				drbd_err(device, "ReportCBitmap packet too large\n");
4761 				err = -EIO;
4762 				goto out;
4763 			}
4764 			if (pi->size <= sizeof(*p)) {
4765 				drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4766 				err = -EIO;
4767 				goto out;
4768 			}
4769 			err = drbd_recv_all(peer_device->connection, p, pi->size);
4770 			if (err)
				goto out;
4772 			err = decode_bitmap_c(peer_device, p, &c, pi->size);
4773 		} else {
			drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4775 			err = -EIO;
4776 			goto out;
4777 		}
4778 
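		/* index 1 accumulates plain P_BITMAP packets, index 0 the
		 * compressed ones; INFO_bm_xfer_stats() relies on that. */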
4779 		c.packets[pi->cmd == P_BITMAP]++;
4780 		c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4781 
4782 		if (err <= 0) {
4783 			if (err < 0)
4784 				goto out;
4785 			break;
4786 		}
4787 		err = drbd_recv_header(peer_device->connection, pi);
4788 		if (err)
4789 			goto out;
4790 	}
4791 
4792 	INFO_bm_xfer_stats(device, "receive", &c);
4793 
4794 	if (device->state.conn == C_WF_BITMAP_T) {
4795 		enum drbd_state_rv rv;
4796 
4797 		err = drbd_send_bitmap(device);
4798 		if (err)
4799 			goto out;
4800 		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4801 		rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4802 		D_ASSERT(device, rv == SS_SUCCESS);
4803 	} else if (device->state.conn != C_WF_BITMAP_S) {
4804 		/* admin may have requested C_DISCONNECTING,
4805 		 * other threads may have noticed network errors */
4806 		drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4807 		    drbd_conn_str(device->state.conn));
4808 	}
4809 	err = 0;
4810 
4811  out:
4812 	drbd_bm_unlock(device);
4813 	if (!err && device->state.conn == C_WF_BITMAP_S)
4814 		drbd_start_resync(device, C_SYNC_SOURCE);
4815 	return err;
4816 }
4817 
4818 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4819 {
4820 	drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4821 		 pi->cmd, pi->size);
4822 
4823 	return ignore_remaining_packet(connection, pi);
4824 }
4825 
4826 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4827 {
4828 	/* Make sure we've acked all the TCP data associated
4829 	 * with the data requests being unplugged */
4830 	drbd_tcp_quickack(connection->data.socket);
4831 
4832 	return 0;
4833 }
4834 
4835 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4836 {
4837 	struct drbd_peer_device *peer_device;
4838 	struct drbd_device *device;
4839 	struct p_block_desc *p = pi->data;
4840 
4841 	peer_device = conn_peer_device(connection, pi->vnr);
4842 	if (!peer_device)
4843 		return -EIO;
4844 	device = peer_device->device;
4845 
4846 	switch (device->state.conn) {
4847 	case C_WF_SYNC_UUID:
4848 	case C_WF_BITMAP_T:
4849 	case C_BEHIND:
4850 			break;
4851 	default:
4852 		drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4853 				drbd_conn_str(device->state.conn));
4854 	}
4855 
4856 	drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4857 
4858 	return 0;
4859 }
4860 
4861 static int receive_rs_deallocated(struct drbd_connection *connection, struct packet_info *pi)
4862 {
4863 	struct drbd_peer_device *peer_device;
4864 	struct p_block_desc *p = pi->data;
4865 	struct drbd_device *device;
4866 	sector_t sector;
4867 	int size, err = 0;
4868 
4869 	peer_device = conn_peer_device(connection, pi->vnr);
4870 	if (!peer_device)
4871 		return -EIO;
4872 	device = peer_device->device;
4873 
4874 	sector = be64_to_cpu(p->sector);
4875 	size = be32_to_cpu(p->blksize);
4876 
4877 	dec_rs_pending(device);
4878 
4879 	if (get_ldev(device)) {
4880 		struct drbd_peer_request *peer_req;
4881 		const int op = REQ_OP_DISCARD;
4882 
4883 		peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector,
4884 					       size, 0, GFP_NOIO);
4885 		if (!peer_req) {
4886 			put_ldev(device);
4887 			return -ENOMEM;
4888 		}
4889 
4890 		peer_req->w.cb = e_end_resync_block;
4891 		peer_req->submit_jif = jiffies;
4892 		peer_req->flags |= EE_IS_TRIM;
4893 
4894 		spin_lock_irq(&device->resource->req_lock);
4895 		list_add_tail(&peer_req->w.list, &device->sync_ee);
4896 		spin_unlock_irq(&device->resource->req_lock);
4897 
4898 		atomic_add(pi->size >> 9, &device->rs_sect_ev);
4899 		err = drbd_submit_peer_request(device, peer_req, op, 0, DRBD_FAULT_RS_WR);
4900 
4901 		if (err) {
4902 			spin_lock_irq(&device->resource->req_lock);
4903 			list_del(&peer_req->w.list);
4904 			spin_unlock_irq(&device->resource->req_lock);
4905 
4906 			drbd_free_peer_req(device, peer_req);
4907 			put_ldev(device);
4908 			err = 0;
4909 			goto fail;
4910 		}
4911 
4912 		inc_unacked(device);
4913 
4914 		/* No put_ldev() here. Gets called in drbd_endio_write_sec_final(),
4915 		   as well as drbd_rs_complete_io() */
4916 	} else {
4917 	fail:
4918 		drbd_rs_complete_io(device, sector);
4919 		drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER);
4920 	}
4921 
4922 	atomic_add(size >> 9, &device->rs_sect_in);
4923 
4924 	return err;
4925 }
4926 
4927 struct data_cmd {
4928 	int expect_payload;
4929 	unsigned int pkt_size;
4930 	int (*fn)(struct drbd_connection *, struct packet_info *);
4931 };
4932 
4933 static struct data_cmd drbd_cmd_handler[] = {
4934 	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
4935 	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
4936 	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4937 	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4938 	[P_BITMAP]	    = { 1, 0, receive_bitmap } ,
4939 	[P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4940 	[P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4941 	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4942 	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4943 	[P_SYNC_PARAM]	    = { 1, 0, receive_SyncParam },
4944 	[P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4945 	[P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4946 	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
4947 	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
4948 	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
4949 	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4950 	[P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4951 	[P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4952 	[P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4953 	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4954 	[P_RS_THIN_REQ]     = { 0, sizeof(struct p_block_req), receive_DataRequest },
4955 	[P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4956 	[P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4957 	[P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4958 	[P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4959 	[P_TRIM]	    = { 0, sizeof(struct p_trim), receive_Data },
4960 	[P_RS_DEALLOCATED]  = { 0, sizeof(struct p_block_desc), receive_rs_deallocated },
4961 	[P_WSAME]	    = { 1, sizeof(struct p_wsame), receive_Data },
4962 };
4963 
4964 static void drbdd(struct drbd_connection *connection)
4965 {
4966 	struct packet_info pi;
4967 	size_t shs; /* sub header size */
4968 	int err;
4969 
4970 	while (get_t_state(&connection->receiver) == RUNNING) {
4971 		struct data_cmd const *cmd;
4972 
4973 		drbd_thread_current_set_cpu(&connection->receiver);
4974 		update_receiver_timing_details(connection, drbd_recv_header);
4975 		if (drbd_recv_header(connection, &pi))
4976 			goto err_out;
4977 
		if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) ||
			     !drbd_cmd_handler[pi.cmd].fn)) {
			drbd_err(connection, "Unexpected data packet %s (0x%04x)\n",
				 cmdname(pi.cmd), pi.cmd);
			goto err_out;
		}
		cmd = &drbd_cmd_handler[pi.cmd];
4984 
4985 		shs = cmd->pkt_size;
4986 		if (pi.cmd == P_SIZES && connection->agreed_features & DRBD_FF_WSAME)
4987 			shs += sizeof(struct o_qlim);
4988 		if (pi.size > shs && !cmd->expect_payload) {
4989 			drbd_err(connection, "No payload expected %s l:%d\n",
4990 				 cmdname(pi.cmd), pi.size);
4991 			goto err_out;
4992 		}
4993 		if (pi.size < shs) {
4994 			drbd_err(connection, "%s: unexpected packet size, expected:%d received:%d\n",
4995 				 cmdname(pi.cmd), (int)shs, pi.size);
4996 			goto err_out;
4997 		}
4998 
4999 		if (shs) {
5000 			update_receiver_timing_details(connection, drbd_recv_all_warn);
5001 			err = drbd_recv_all_warn(connection, pi.data, shs);
5002 			if (err)
5003 				goto err_out;
5004 			pi.size -= shs;
5005 		}
5006 
5007 		update_receiver_timing_details(connection, cmd->fn);
5008 		err = cmd->fn(connection, &pi);
5009 		if (err) {
5010 			drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
5011 				 cmdname(pi.cmd), err, pi.size);
5012 			goto err_out;
5013 		}
5014 	}
5015 	return;
5016 
5017     err_out:
5018 	conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
5019 }
5020 
5021 static void conn_disconnect(struct drbd_connection *connection)
5022 {
5023 	struct drbd_peer_device *peer_device;
5024 	enum drbd_conns oc;
5025 	int vnr;
5026 
5027 	if (connection->cstate == C_STANDALONE)
5028 		return;
5029 
5030 	/* We are about to start the cleanup after connection loss.
5031 	 * Make sure drbd_make_request knows about that.
5032 	 * Usually we should be in some network failure state already,
5033 	 * but just in case we are not, we fix it up here.
5034 	 */
5035 	conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5036 
5037 	/* ack_receiver does not clean up anything. it must not interfere, either */
5038 	drbd_thread_stop(&connection->ack_receiver);
5039 	if (connection->ack_sender) {
5040 		destroy_workqueue(connection->ack_sender);
5041 		connection->ack_sender = NULL;
5042 	}
5043 	drbd_free_sock(connection);
5044 
5045 	rcu_read_lock();
5046 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5047 		struct drbd_device *device = peer_device->device;
5048 		kref_get(&device->kref);
5049 		rcu_read_unlock();
5050 		drbd_disconnected(peer_device);
5051 		kref_put(&device->kref, drbd_destroy_device);
5052 		rcu_read_lock();
5053 	}
5054 	rcu_read_unlock();
5055 
5056 	if (!list_empty(&connection->current_epoch->list))
5057 		drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
5058 	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
5059 	atomic_set(&connection->current_epoch->epoch_size, 0);
5060 	connection->send.seen_any_write_yet = false;
5061 
5062 	drbd_info(connection, "Connection closed\n");
5063 
5064 	if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
5065 		conn_try_outdate_peer_async(connection);
5066 
5067 	spin_lock_irq(&connection->resource->req_lock);
5068 	oc = connection->cstate;
5069 	if (oc >= C_UNCONNECTED)
5070 		_conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
5071 
5072 	spin_unlock_irq(&connection->resource->req_lock);
5073 
5074 	if (oc == C_DISCONNECTING)
5075 		conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
5076 }
5077 
5078 static int drbd_disconnected(struct drbd_peer_device *peer_device)
5079 {
5080 	struct drbd_device *device = peer_device->device;
5081 	unsigned int i;
5082 
5083 	/* wait for current activity to cease. */
5084 	spin_lock_irq(&device->resource->req_lock);
5085 	_drbd_wait_ee_list_empty(device, &device->active_ee);
5086 	_drbd_wait_ee_list_empty(device, &device->sync_ee);
5087 	_drbd_wait_ee_list_empty(device, &device->read_ee);
5088 	spin_unlock_irq(&device->resource->req_lock);
5089 
5090 	/* We do not have data structures that would allow us to
5091 	 * get the rs_pending_cnt down to 0 again.
5092 	 *  * On C_SYNC_TARGET we do not have any data structures describing
5093 	 *    the pending RSDataRequest's we have sent.
5094 	 *  * On C_SYNC_SOURCE there is no data structure that tracks
5095 	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
5096 	 *  And no, it is not the sum of the reference counts in the
5097 	 *  resync_LRU. The resync_LRU tracks the whole operation including
5098 	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
5099 	 *  on the fly. */
5100 	drbd_rs_cancel_all(device);
5101 	device->rs_total = 0;
5102 	device->rs_failed = 0;
5103 	atomic_set(&device->rs_pending_cnt, 0);
5104 	wake_up(&device->misc_wait);
5105 
5106 	del_timer_sync(&device->resync_timer);
5107 	resync_timer_fn((unsigned long)device);
5108 
5109 	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
5110 	 * w_make_resync_request etc. which may still be on the worker queue
5111 	 * to be "canceled" */
5112 	drbd_flush_workqueue(&peer_device->connection->sender_work);
5113 
5114 	drbd_finish_peer_reqs(device);
5115 
5116 	/* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
5117 	   might have issued a work again. The one before drbd_finish_peer_reqs() is
	   necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
5119 	drbd_flush_workqueue(&peer_device->connection->sender_work);
5120 
5121 	/* need to do it again, drbd_finish_peer_reqs() may have populated it
5122 	 * again via drbd_try_clear_on_disk_bm(). */
5123 	drbd_rs_cancel_all(device);
5124 
5125 	kfree(device->p_uuid);
5126 	device->p_uuid = NULL;
5127 
5128 	if (!drbd_suspended(device))
5129 		tl_clear(peer_device->connection);
5130 
5131 	drbd_md_sync(device);
5132 
5133 	if (get_ldev(device)) {
5134 		drbd_bitmap_io(device, &drbd_bm_write_copy_pages,
5135 				"write from disconnected", BM_LOCKED_CHANGE_ALLOWED);
5136 		put_ldev(device);
5137 	}
5138 
5139 	/* tcp_close and release of sendpage pages can be deferred.  I don't
5140 	 * want to use SO_LINGER, because apparently it can be deferred for
5141 	 * more than 20 seconds (longest time I checked).
5142 	 *
5143 	 * Actually we don't care for exactly when the network stack does its
5144 	 * put_page(), but release our reference on these pages right here.
5145 	 */
5146 	i = drbd_free_peer_reqs(device, &device->net_ee);
5147 	if (i)
5148 		drbd_info(device, "net_ee not empty, killed %u entries\n", i);
5149 	i = atomic_read(&device->pp_in_use_by_net);
5150 	if (i)
5151 		drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
5152 	i = atomic_read(&device->pp_in_use);
5153 	if (i)
5154 		drbd_info(device, "pp_in_use = %d, expected 0\n", i);
5155 
5156 	D_ASSERT(device, list_empty(&device->read_ee));
5157 	D_ASSERT(device, list_empty(&device->active_ee));
5158 	D_ASSERT(device, list_empty(&device->sync_ee));
5159 	D_ASSERT(device, list_empty(&device->done_ee));
5160 
5161 	return 0;
5162 }
5163 
5164 /*
5165  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
5166  * we can agree on is stored in agreed_pro_version.
5167  *
5168  * feature flags and the reserved array should be enough room for future
5169  * enhancements of the handshake protocol, and possible plugins...
5170  *
5171  * for now, they are expected to be zero, but ignored.
5172  */
5173 static int drbd_send_features(struct drbd_connection *connection)
5174 {
5175 	struct drbd_socket *sock;
5176 	struct p_connection_features *p;
5177 
5178 	sock = &connection->data;
5179 	p = conn_prepare_command(connection, sock);
5180 	if (!p)
5181 		return -EIO;
5182 	memset(p, 0, sizeof(*p));
5183 	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
5184 	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
5185 	p->feature_flags = cpu_to_be32(PRO_FEATURES);
5186 	return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
5187 }
5188 
5189 /*
5190  * return values:
5191  *   1 yes, we have a valid connection
5192  *   0 oops, did not work out, please try again
5193  *  -1 peer talks different language,
5194  *     no point in trying again, please go standalone.
5195  */
5196 static int drbd_do_features(struct drbd_connection *connection)
5197 {
5198 	/* ASSERT current == connection->receiver ... */
5199 	struct p_connection_features *p;
5200 	const int expect = sizeof(struct p_connection_features);
5201 	struct packet_info pi;
5202 	int err;
5203 
5204 	err = drbd_send_features(connection);
5205 	if (err)
5206 		return 0;
5207 
5208 	err = drbd_recv_header(connection, &pi);
5209 	if (err)
5210 		return 0;
5211 
5212 	if (pi.cmd != P_CONNECTION_FEATURES) {
5213 		drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
5214 			 cmdname(pi.cmd), pi.cmd);
5215 		return -1;
5216 	}
5217 
5218 	if (pi.size != expect) {
5219 		drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
5220 		     expect, pi.size);
5221 		return -1;
5222 	}
5223 
5224 	p = pi.data;
5225 	err = drbd_recv_all_warn(connection, p, expect);
5226 	if (err)
5227 		return 0;
5228 
5229 	p->protocol_min = be32_to_cpu(p->protocol_min);
5230 	p->protocol_max = be32_to_cpu(p->protocol_max);
5231 	if (p->protocol_max == 0)
5232 		p->protocol_max = p->protocol_min;
5233 
5234 	if (PRO_VERSION_MAX < p->protocol_min ||
5235 	    PRO_VERSION_MIN > p->protocol_max)
5236 		goto incompat;
5237 
5238 	connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
5239 	connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
5240 
5241 	drbd_info(connection, "Handshake successful: "
5242 	     "Agreed network protocol version %d\n", connection->agreed_pro_version);
5243 
5244 	drbd_info(connection, "Feature flags enabled on protocol level: 0x%x%s%s%s.\n",
5245 		  connection->agreed_features,
5246 		  connection->agreed_features & DRBD_FF_TRIM ? " TRIM" : "",
5247 		  connection->agreed_features & DRBD_FF_THIN_RESYNC ? " THIN_RESYNC" : "",
5248 		  connection->agreed_features & DRBD_FF_WSAME ? " WRITE_SAME" :
5249 		  connection->agreed_features ? "" : " none");
5250 
5251 	return 1;
5252 
5253  incompat:
5254 	drbd_err(connection, "incompatible DRBD dialects: "
5255 	    "I support %d-%d, peer supports %d-%d\n",
5256 	    PRO_VERSION_MIN, PRO_VERSION_MAX,
5257 	    p->protocol_min, p->protocol_max);
5258 	return -1;
5259 }
5260 
5261 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
5262 static int drbd_do_auth(struct drbd_connection *connection)
5263 {
5264 	drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
5265 	drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
5266 	return -1;
5267 }
5268 #else
5269 #define CHALLENGE_LEN 64
5270 
5271 /* Return value:
5272 	1 - auth succeeded,
5273 	0 - failed, try again (network error),
5274 	-1 - auth failed, don't try again.
5275 */
5276 
5277 static int drbd_do_auth(struct drbd_connection *connection)
5278 {
5279 	struct drbd_socket *sock;
5280 	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
5281 	char *response = NULL;
5282 	char *right_response = NULL;
5283 	char *peers_ch = NULL;
5284 	unsigned int key_len;
5285 	char secret[SHARED_SECRET_MAX]; /* 64 byte */
5286 	unsigned int resp_size;
5287 	SHASH_DESC_ON_STACK(desc, connection->cram_hmac_tfm);
5288 	struct packet_info pi;
5289 	struct net_conf *nc;
5290 	int err, rv;
5291 
5292 	/* FIXME: Put the challenge/response into the preallocated socket buffer.  */
5293 
5294 	rcu_read_lock();
5295 	nc = rcu_dereference(connection->net_conf);
5296 	key_len = strlen(nc->shared_secret);
5297 	memcpy(secret, nc->shared_secret, key_len);
5298 	rcu_read_unlock();
5299 
5300 	desc->tfm = connection->cram_hmac_tfm;
5301 	desc->flags = 0;
5302 
5303 	rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
5304 	if (rv) {
5305 		drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
5306 		rv = -1;
5307 		goto fail;
5308 	}
5309 
5310 	get_random_bytes(my_challenge, CHALLENGE_LEN);
5311 
5312 	sock = &connection->data;
5313 	if (!conn_prepare_command(connection, sock)) {
5314 		rv = 0;
5315 		goto fail;
5316 	}
5317 	rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
5318 				my_challenge, CHALLENGE_LEN);
5319 	if (!rv)
5320 		goto fail;
5321 
5322 	err = drbd_recv_header(connection, &pi);
5323 	if (err) {
5324 		rv = 0;
5325 		goto fail;
5326 	}
5327 
5328 	if (pi.cmd != P_AUTH_CHALLENGE) {
5329 		drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
5330 			 cmdname(pi.cmd), pi.cmd);
5331 		rv = 0;
5332 		goto fail;
5333 	}
5334 
5335 	if (pi.size > CHALLENGE_LEN * 2) {
5336 		drbd_err(connection, "expected AuthChallenge payload too big.\n");
5337 		rv = -1;
5338 		goto fail;
5339 	}
5340 
5341 	if (pi.size < CHALLENGE_LEN) {
5342 		drbd_err(connection, "AuthChallenge payload too small.\n");
5343 		rv = -1;
5344 		goto fail;
5345 	}
5346 
5347 	peers_ch = kmalloc(pi.size, GFP_NOIO);
5348 	if (peers_ch == NULL) {
5349 		drbd_err(connection, "kmalloc of peers_ch failed\n");
5350 		rv = -1;
5351 		goto fail;
5352 	}
5353 
5354 	err = drbd_recv_all_warn(connection, peers_ch, pi.size);
5355 	if (err) {
5356 		rv = 0;
5357 		goto fail;
5358 	}
5359 
5360 	if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
5361 		drbd_err(connection, "Peer presented the same challenge!\n");
5362 		rv = -1;
5363 		goto fail;
5364 	}
5365 
5366 	resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
5367 	response = kmalloc(resp_size, GFP_NOIO);
5368 	if (response == NULL) {
5369 		drbd_err(connection, "kmalloc of response failed\n");
5370 		rv = -1;
5371 		goto fail;
5372 	}
5373 
5374 	rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
5375 	if (rv) {
5376 		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5377 		rv = -1;
5378 		goto fail;
5379 	}
5380 
5381 	if (!conn_prepare_command(connection, sock)) {
5382 		rv = 0;
5383 		goto fail;
5384 	}
5385 	rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5386 				response, resp_size);
5387 	if (!rv)
5388 		goto fail;
5389 
5390 	err = drbd_recv_header(connection, &pi);
5391 	if (err) {
5392 		rv = 0;
5393 		goto fail;
5394 	}
5395 
5396 	if (pi.cmd != P_AUTH_RESPONSE) {
5397 		drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5398 			 cmdname(pi.cmd), pi.cmd);
5399 		rv = 0;
5400 		goto fail;
5401 	}
5402 
5403 	if (pi.size != resp_size) {
5404 		drbd_err(connection, "expected AuthResponse payload of wrong size\n");
5405 		rv = 0;
5406 		goto fail;
5407 	}
5408 
	err = drbd_recv_all_warn(connection, response, resp_size);
5410 	if (err) {
5411 		rv = 0;
5412 		goto fail;
5413 	}
5414 
5415 	right_response = kmalloc(resp_size, GFP_NOIO);
5416 	if (right_response == NULL) {
5417 		drbd_err(connection, "kmalloc of right_response failed\n");
5418 		rv = -1;
5419 		goto fail;
5420 	}
5421 
5422 	rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
5423 				 right_response);
5424 	if (rv) {
5425 		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5426 		rv = -1;
5427 		goto fail;
5428 	}
5429 
5430 	rv = !memcmp(response, right_response, resp_size);
5431 
5432 	if (rv)
5433 		drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5434 		     resp_size);
5435 	else
5436 		rv = -1;
5437 
5438  fail:
5439 	kfree(peers_ch);
5440 	kfree(response);
5441 	kfree(right_response);
5442 	shash_desc_zero(desc);
5443 
5444 	return rv;
5445 }
5446 #endif
5447 
5448 int drbd_receiver(struct drbd_thread *thi)
5449 {
5450 	struct drbd_connection *connection = thi->connection;
5451 	int h;
5452 
5453 	drbd_info(connection, "receiver (re)started\n");
5454 
5455 	do {
5456 		h = conn_connect(connection);
5457 		if (h == 0) {
5458 			conn_disconnect(connection);
5459 			schedule_timeout_interruptible(HZ);
5460 		}
5461 		if (h == -1) {
5462 			drbd_warn(connection, "Discarding network configuration.\n");
5463 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5464 		}
5465 	} while (h == 0);
5466 
5467 	if (h > 0)
5468 		drbdd(connection);
5469 
5470 	conn_disconnect(connection);
5471 
5472 	drbd_info(connection, "receiver terminated\n");
5473 	return 0;
5474 }
5475 
5476 /* ********* acknowledge sender ******** */
5477 
5478 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5479 {
5480 	struct p_req_state_reply *p = pi->data;
5481 	int retcode = be32_to_cpu(p->retcode);
5482 
5483 	if (retcode >= SS_SUCCESS) {
5484 		set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5485 	} else {
5486 		set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5487 		drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5488 			 drbd_set_st_err_str(retcode), retcode);
5489 	}
5490 	wake_up(&connection->ping_wait);
5491 
5492 	return 0;
5493 }
5494 
5495 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5496 {
5497 	struct drbd_peer_device *peer_device;
5498 	struct drbd_device *device;
5499 	struct p_req_state_reply *p = pi->data;
5500 	int retcode = be32_to_cpu(p->retcode);
5501 
5502 	peer_device = conn_peer_device(connection, pi->vnr);
5503 	if (!peer_device)
5504 		return -EIO;
5505 	device = peer_device->device;
5506 
5507 	if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5508 		D_ASSERT(device, connection->agreed_pro_version < 100);
5509 		return got_conn_RqSReply(connection, pi);
5510 	}
5511 
5512 	if (retcode >= SS_SUCCESS) {
5513 		set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5514 	} else {
5515 		set_bit(CL_ST_CHG_FAIL, &device->flags);
5516 		drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5517 			drbd_set_st_err_str(retcode), retcode);
5518 	}
5519 	wake_up(&device->state_wait);
5520 
5521 	return 0;
5522 }
5523 
5524 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
5525 {
5526 	return drbd_send_ping_ack(connection);
5527 
5528 }
5529 
5530 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5531 {
5534 	if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5535 		wake_up(&connection->ping_wait);
5536 
5537 	return 0;
5538 }
5539 
5540 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5541 {
5542 	struct drbd_peer_device *peer_device;
5543 	struct drbd_device *device;
5544 	struct p_block_ack *p = pi->data;
5545 	sector_t sector = be64_to_cpu(p->sector);
5546 	int blksize = be32_to_cpu(p->blksize);
5547 
5548 	peer_device = conn_peer_device(connection, pi->vnr);
5549 	if (!peer_device)
5550 		return -EIO;
5551 	device = peer_device->device;
5552 
5553 	D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5554 
5555 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5556 
5557 	if (get_ldev(device)) {
5558 		drbd_rs_complete_io(device, sector);
5559 		drbd_set_in_sync(device, sector, blksize);
5560 		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5561 		device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5562 		put_ldev(device);
5563 	}
5564 	dec_rs_pending(device);
5565 	atomic_add(blksize >> 9, &device->rs_sect_in);
5566 
5567 	return 0;
5568 }
5569 
5570 static int
5571 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5572 			      struct rb_root *root, const char *func,
5573 			      enum drbd_req_event what, bool missing_ok)
5574 {
5575 	struct drbd_request *req;
5576 	struct bio_and_error m;
5577 
5578 	spin_lock_irq(&device->resource->req_lock);
5579 	req = find_request(device, root, id, sector, missing_ok, func);
5580 	if (unlikely(!req)) {
5581 		spin_unlock_irq(&device->resource->req_lock);
5582 		return -EIO;
5583 	}
5584 	__req_mod(req, what, &m);
5585 	spin_unlock_irq(&device->resource->req_lock);
5586 
5587 	if (m.bio)
5588 		complete_master_bio(device, &m);
5589 	return 0;
5590 }
5591 
5592 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5593 {
5594 	struct drbd_peer_device *peer_device;
5595 	struct drbd_device *device;
5596 	struct p_block_ack *p = pi->data;
5597 	sector_t sector = be64_to_cpu(p->sector);
5598 	int blksize = be32_to_cpu(p->blksize);
5599 	enum drbd_req_event what;
5600 
5601 	peer_device = conn_peer_device(connection, pi->vnr);
5602 	if (!peer_device)
5603 		return -EIO;
5604 	device = peer_device->device;
5605 
5606 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5607 
5608 	if (p->block_id == ID_SYNCER) {
5609 		drbd_set_in_sync(device, sector, blksize);
5610 		dec_rs_pending(device);
5611 		return 0;
5612 	}
5613 	switch (pi->cmd) {
5614 	case P_RS_WRITE_ACK:
5615 		what = WRITE_ACKED_BY_PEER_AND_SIS;
5616 		break;
5617 	case P_WRITE_ACK:
5618 		what = WRITE_ACKED_BY_PEER;
5619 		break;
5620 	case P_RECV_ACK:
5621 		what = RECV_ACKED_BY_PEER;
5622 		break;
5623 	case P_SUPERSEDED:
5624 		what = CONFLICT_RESOLVED;
5625 		break;
5626 	case P_RETRY_WRITE:
5627 		what = POSTPONE_WRITE;
5628 		break;
5629 	default:
5630 		BUG();
5631 	}
5632 
5633 	return validate_req_change_req_state(device, p->block_id, sector,
5634 					     &device->write_requests, __func__,
5635 					     what, false);
5636 }
5637 
5638 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5639 {
5640 	struct drbd_peer_device *peer_device;
5641 	struct drbd_device *device;
5642 	struct p_block_ack *p = pi->data;
5643 	sector_t sector = be64_to_cpu(p->sector);
5644 	int size = be32_to_cpu(p->blksize);
5645 	int err;
5646 
5647 	peer_device = conn_peer_device(connection, pi->vnr);
5648 	if (!peer_device)
5649 		return -EIO;
5650 	device = peer_device->device;
5651 
5652 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5653 
5654 	if (p->block_id == ID_SYNCER) {
5655 		dec_rs_pending(device);
5656 		drbd_rs_failed_io(device, sector, size);
5657 		return 0;
5658 	}
5659 
5660 	err = validate_req_change_req_state(device, p->block_id, sector,
5661 					    &device->write_requests, __func__,
5662 					    NEG_ACKED, true);
5663 	if (err) {
5664 		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5665 		   The master bio might already be completed, therefore the
5666 		   request is no longer in the collision hash. */
5667 		/* In Protocol B we might already have got a P_RECV_ACK
5668 		   but then get a P_NEG_ACK afterwards. */
5669 		drbd_set_out_of_sync(device, sector, size);
5670 	}
5671 	return 0;
5672 }
5673 
5674 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5675 {
5676 	struct drbd_peer_device *peer_device;
5677 	struct drbd_device *device;
5678 	struct p_block_ack *p = pi->data;
5679 	sector_t sector = be64_to_cpu(p->sector);
5680 
5681 	peer_device = conn_peer_device(connection, pi->vnr);
5682 	if (!peer_device)
5683 		return -EIO;
5684 	device = peer_device->device;
5685 
5686 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5687 
5688 	drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5689 	    (unsigned long long)sector, be32_to_cpu(p->blksize));
5690 
5691 	return validate_req_change_req_state(device, p->block_id, sector,
5692 					     &device->read_requests, __func__,
5693 					     NEG_ACKED, false);
5694 }
5695 
5696 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5697 {
5698 	struct drbd_peer_device *peer_device;
5699 	struct drbd_device *device;
5700 	sector_t sector;
5701 	int size;
5702 	struct p_block_ack *p = pi->data;
5703 
5704 	peer_device = conn_peer_device(connection, pi->vnr);
5705 	if (!peer_device)
5706 		return -EIO;
5707 	device = peer_device->device;
5708 
5709 	sector = be64_to_cpu(p->sector);
5710 	size = be32_to_cpu(p->blksize);
5711 
5712 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5713 
5714 	dec_rs_pending(device);
5715 
5716 	if (get_ldev_if_state(device, D_FAILED)) {
5717 		drbd_rs_complete_io(device, sector);
5718 		switch (pi->cmd) {
5719 		case P_NEG_RS_DREPLY:
			drbd_rs_failed_io(device, sector, size);
			/* fall through */
		case P_RS_CANCEL:
5722 			break;
5723 		default:
5724 			BUG();
5725 		}
5726 		put_ldev(device);
5727 	}
5728 
5729 	return 0;
5730 }
5731 
5732 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5733 {
5734 	struct p_barrier_ack *p = pi->data;
5735 	struct drbd_peer_device *peer_device;
5736 	int vnr;
5737 
5738 	tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5739 
5740 	rcu_read_lock();
5741 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5742 		struct drbd_device *device = peer_device->device;
5743 
5744 		if (device->state.conn == C_AHEAD &&
5745 		    atomic_read(&device->ap_in_flight) == 0 &&
5746 		    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5747 			device->start_resync_timer.expires = jiffies + HZ;
5748 			add_timer(&device->start_resync_timer);
5749 		}
5750 	}
5751 	rcu_read_unlock();
5752 
5753 	return 0;
5754 }
5755 
5756 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5757 {
5758 	struct drbd_peer_device *peer_device;
5759 	struct drbd_device *device;
5760 	struct p_block_ack *p = pi->data;
5761 	struct drbd_device_work *dw;
5762 	sector_t sector;
5763 	int size;
5764 
5765 	peer_device = conn_peer_device(connection, pi->vnr);
5766 	if (!peer_device)
5767 		return -EIO;
5768 	device = peer_device->device;
5769 
5770 	sector = be64_to_cpu(p->sector);
5771 	size = be32_to_cpu(p->blksize);
5772 
5773 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5774 
5775 	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5776 		drbd_ov_out_of_sync_found(device, sector, size);
5777 	else
5778 		ov_out_of_sync_print(device);
5779 
5780 	if (!get_ldev(device))
5781 		return 0;
5782 
5783 	drbd_rs_complete_io(device, sector);
5784 	dec_rs_pending(device);
5785 
5786 	--device->ov_left;
5787 
5788 	/* let's advance progress step marks only for every other megabyte */
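	/* (ov_left counts verify blocks of BM_BLOCK_SIZE, i.e. 4 KiB each,
	 * so 0x200 of them correspond to 2 MiB) */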
5789 	if ((device->ov_left & 0x200) == 0x200)
5790 		drbd_advance_rs_marks(device, device->ov_left);
5791 
5792 	if (device->ov_left == 0) {
5793 		dw = kmalloc(sizeof(*dw), GFP_NOIO);
5794 		if (dw) {
5795 			dw->w.cb = w_ov_finished;
5796 			dw->device = device;
5797 			drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5798 		} else {
5799 			drbd_err(device, "kmalloc(dw) failed.");
5800 			ov_out_of_sync_print(device);
5801 			drbd_resync_finished(device);
5802 		}
5803 	}
5804 	put_ldev(device);
5805 	return 0;
5806 }
5807 
5808 static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
5809 {
5810 	return 0;
5811 }
5812 
5813 struct meta_sock_cmd {
5814 	size_t pkt_size;
5815 	int (*fn)(struct drbd_connection *connection, struct packet_info *);
5816 };
5817 
5818 static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
5819 {
5820 	long t;
5821 	struct net_conf *nc;
5822 
5823 	rcu_read_lock();
5824 	nc = rcu_dereference(connection->net_conf);
5825 	t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5826 	rcu_read_unlock();
5827 
5828 	t *= HZ;
5829 	if (ping_timeout)
5830 		t /= 10;
5831 
5832 	connection->meta.socket->sk->sk_rcvtimeo = t;
5833 }
5834 
5835 static void set_ping_timeout(struct drbd_connection *connection)
5836 {
5837 	set_rcvtimeo(connection, 1);
5838 }
5839 
5840 static void set_idle_timeout(struct drbd_connection *connection)
5841 {
5842 	set_rcvtimeo(connection, 0);
5843 }
5844 
5845 static struct meta_sock_cmd ack_receiver_tbl[] = {
5846 	[P_PING]	    = { 0, got_Ping },
5847 	[P_PING_ACK]	    = { 0, got_PingAck },
5848 	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5849 	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5850 	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_SUPERSEDED]	    = { sizeof(struct p_block_ack), got_BlockAck },
5852 	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
5853 	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
5854 	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5855 	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
5856 	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
5857 	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5858 	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5859 	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5860 	[P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
	[P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5862 	[P_RETRY_WRITE]	    = { sizeof(struct p_block_ack), got_BlockAck },
5863 };
5864 
5865 int drbd_ack_receiver(struct drbd_thread *thi)
5866 {
5867 	struct drbd_connection *connection = thi->connection;
5868 	struct meta_sock_cmd *cmd = NULL;
5869 	struct packet_info pi;
5870 	unsigned long pre_recv_jif;
5871 	int rv;
5872 	void *buf    = connection->meta.rbuf;
5873 	int received = 0;
5874 	unsigned int header_size = drbd_header_size(connection);
5875 	int expect   = header_size;
5876 	bool ping_timeout_active = false;
5877 	struct sched_param param = { .sched_priority = 2 };
5878 
5879 	rv = sched_setscheduler(current, SCHED_RR, &param);
5880 	if (rv < 0)
5881 		drbd_err(connection, "drbd_ack_receiver: ERROR set priority, ret=%d\n", rv);
5882 
5883 	while (get_t_state(thi) == RUNNING) {
5884 		drbd_thread_current_set_cpu(thi);
5885 
5886 		conn_reclaim_net_peer_reqs(connection);
5887 
5888 		if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5889 			if (drbd_send_ping(connection)) {
5890 				drbd_err(connection, "drbd_send_ping has failed\n");
5891 				goto reconnect;
5892 			}
5893 			set_ping_timeout(connection);
5894 			ping_timeout_active = true;
5895 		}
5896 
5897 		pre_recv_jif = jiffies;
5898 		rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5899 
5900 		/* Note:
5901 		 * -EINTR	 (on meta) we got a signal
5902 		 * -EAGAIN	 (on meta) rcvtimeo expired
5903 		 * -ECONNRESET	 other side closed the connection
5904 		 * -ERESTARTSYS  (on data) we got a signal
5905 		 * rv <  0	 other than above: unexpected error!
5906 		 * rv == expected: full header or command
5907 		 * rv <  expected: "woken" by signal during receive
5908 		 * rv == 0	 : "connection shut down by peer"
5909 		 */
5910 		if (likely(rv > 0)) {
5911 			received += rv;
5912 			buf	 += rv;
5913 		} else if (rv == 0) {
5914 			if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5915 				long t;
5916 				rcu_read_lock();
5917 				t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5918 				rcu_read_unlock();
5919 
5920 				t = wait_event_timeout(connection->ping_wait,
5921 						       connection->cstate < C_WF_REPORT_PARAMS,
5922 						       t);
5923 				if (t)
5924 					break;
5925 			}
5926 			drbd_err(connection, "meta connection shut down by peer.\n");
5927 			goto reconnect;
5928 		} else if (rv == -EAGAIN) {
5929 			/* If the data socket received something meanwhile,
5930 			 * that is good enough: peer is still alive. */
5931 			if (time_after(connection->last_received, pre_recv_jif))
5932 				continue;
5933 			if (ping_timeout_active) {
5934 				drbd_err(connection, "PingAck did not arrive in time.\n");
5935 				goto reconnect;
5936 			}
5937 			set_bit(SEND_PING, &connection->flags);
5938 			continue;
5939 		} else if (rv == -EINTR) {
5940 			/* maybe drbd_thread_stop(): the while condition will notice.
5941 			 * maybe woken for send_ping: we'll send a ping above,
5942 			 * and change the rcvtimeo */
5943 			flush_signals(current);
5944 			continue;
5945 		} else {
5946 			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5947 			goto reconnect;
5948 		}
5949 
5950 		if (received == expect && cmd == NULL) {
5951 			if (decode_header(connection, connection->meta.rbuf, &pi))
5952 				goto reconnect;
			if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) ||
			    !ack_receiver_tbl[pi.cmd].fn) {
				drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
					 cmdname(pi.cmd), pi.cmd);
				goto disconnect;
			}
			cmd = &ack_receiver_tbl[pi.cmd];
			expect = header_size + cmd->pkt_size;
5960 			if (pi.size != expect - header_size) {
5961 				drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5962 					pi.cmd, pi.size);
5963 				goto reconnect;
5964 			}
5965 		}
5966 		if (received == expect) {
5967 			bool err;
5968 
5969 			err = cmd->fn(connection, &pi);
5970 			if (err) {
5971 				drbd_err(connection, "%pf failed\n", cmd->fn);
5972 				goto reconnect;
5973 			}
5974 
5975 			connection->last_received = jiffies;
5976 
5977 			if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
5978 				set_idle_timeout(connection);
5979 				ping_timeout_active = false;
5980 			}
5981 
5982 			buf	 = connection->meta.rbuf;
5983 			received = 0;
5984 			expect	 = header_size;
5985 			cmd	 = NULL;
5986 		}
5987 	}
5988 
5989 	if (0) {
5990 reconnect:
5991 		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5992 		conn_md_sync(connection);
5993 	}
5994 	if (0) {
5995 disconnect:
5996 		conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5997 	}
5998 
5999 	drbd_info(connection, "ack_receiver terminated\n");
6000 
6001 	return 0;
6002 }
6003 
6004 void drbd_send_acks_wf(struct work_struct *ws)
6005 {
6006 	struct drbd_peer_device *peer_device =
6007 		container_of(ws, struct drbd_peer_device, send_acks_work);
6008 	struct drbd_connection *connection = peer_device->connection;
6009 	struct drbd_device *device = peer_device->device;
6010 	struct net_conf *nc;
6011 	int tcp_cork, err;
6012 
6013 	rcu_read_lock();
6014 	nc = rcu_dereference(connection->net_conf);
6015 	tcp_cork = nc->tcp_cork;
6016 	rcu_read_unlock();
6017 
6018 	if (tcp_cork)
6019 		drbd_tcp_cork(connection->meta.socket);
6020 
6021 	err = drbd_finish_peer_reqs(device);
6022 	kref_put(&device->kref, drbd_destroy_device);
6023 	/* get is in drbd_endio_write_sec_final(). That is necessary to keep the
6024 	   struct work_struct send_acks_work alive, which is in the peer_device object */
6025 
6026 	if (err) {
6027 		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6028 		return;
6029 	}
6030 
	if (tcp_cork)
		drbd_tcp_uncork(connection->meta.socket);
6035 }
6036