1 /*
2    drbd_receiver.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24 
25 
26 #include <linux/module.h>
27 
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30 
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48 
49 #include "drbd_vli.h"
50 
51 struct packet_info {
52 	enum drbd_packet cmd;
53 	unsigned int size;
54 	unsigned int vnr;
55 	void *data;
56 };
57 
58 enum finish_epoch {
59 	FE_STILL_LIVE,
60 	FE_DESTROYED,
61 	FE_RECYCLED,
62 };
63 
64 static int drbd_do_features(struct drbd_tconn *tconn);
65 static int drbd_do_auth(struct drbd_tconn *tconn);
66 static int drbd_disconnected(struct drbd_conf *mdev);
67 
68 static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *, struct drbd_epoch *, enum epoch_event);
69 static int e_end_block(struct drbd_work *, int);
70 
71 
72 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
73 
74 /*
 * some helper functions to deal with singly linked page lists,
76  * page->private being our "next" pointer.
77  */
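
/*
 * Illustrative sketch (not driver code): a chain is walked by following
 * page->private until it reads 0.  The real page_chain_next() and
 * page_chain_for_each() helpers live in drbd_int.h; the definitions below
 * merely restate the idiom for clarity and are assumptions, not the
 * authoritative ones.
 *
 *	static inline struct page *example_chain_next(struct page *page)
 *	{
 *		return (struct page *)page_private(page);
 *	}
 *
 *	// count the pages of a chain, e.g. one returned by page_chain_del()
 *	static int example_chain_len(struct page *page)
 *	{
 *		int n = 0;
 *
 *		for (; page; page = example_chain_next(page))
 *			n++;
 *		return n;
 *	}
 */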
78 
79 /* If at least n pages are linked at head, get n pages off.
80  * Otherwise, don't modify head, and return NULL.
81  * Locking is the responsibility of the caller.
82  */
83 static struct page *page_chain_del(struct page **head, int n)
84 {
85 	struct page *page;
86 	struct page *tmp;
87 
88 	BUG_ON(!n);
89 	BUG_ON(!head);
90 
91 	page = *head;
92 
93 	if (!page)
94 		return NULL;
95 
96 	while (page) {
97 		tmp = page_chain_next(page);
98 		if (--n == 0)
99 			break; /* found sufficient pages */
100 		if (tmp == NULL)
101 			/* insufficient pages, don't use any of them. */
102 			return NULL;
103 		page = tmp;
104 	}
105 
106 	/* add end of list marker for the returned list */
107 	set_page_private(page, 0);
108 	/* actual return value, and adjustment of head */
109 	page = *head;
110 	*head = tmp;
111 	return page;
112 }
113 
114 /* may be used outside of locks to find the tail of a (usually short)
115  * "private" page chain, before adding it back to a global chain head
116  * with page_chain_add() under a spinlock. */
117 static struct page *page_chain_tail(struct page *page, int *len)
118 {
119 	struct page *tmp;
120 	int i = 1;
121 	while ((tmp = page_chain_next(page)))
122 		++i, page = tmp;
123 	if (len)
124 		*len = i;
125 	return page;
126 }
127 
128 static int page_chain_free(struct page *page)
129 {
130 	struct page *tmp;
131 	int i = 0;
132 	page_chain_for_each_safe(page, tmp) {
133 		put_page(page);
134 		++i;
135 	}
136 	return i;
137 }
138 
139 static void page_chain_add(struct page **head,
140 		struct page *chain_first, struct page *chain_last)
141 {
142 #if 1
143 	struct page *tmp;
144 	tmp = page_chain_tail(chain_first, NULL);
145 	BUG_ON(tmp != chain_last);
146 #endif
147 
148 	/* add chain to head */
149 	set_page_private(chain_last, (unsigned long)*head);
150 	*head = chain_first;
151 }
152 
153 static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
154 				       unsigned int number)
155 {
156 	struct page *page = NULL;
157 	struct page *tmp = NULL;
158 	unsigned int i = 0;
159 
160 	/* Yes, testing drbd_pp_vacant outside the lock is racy.
161 	 * So what. It saves a spin_lock. */
162 	if (drbd_pp_vacant >= number) {
163 		spin_lock(&drbd_pp_lock);
164 		page = page_chain_del(&drbd_pp_pool, number);
165 		if (page)
166 			drbd_pp_vacant -= number;
167 		spin_unlock(&drbd_pp_lock);
168 		if (page)
169 			return page;
170 	}
171 
172 	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
173 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
174 	 * which in turn might block on the other node at this very place.  */
175 	for (i = 0; i < number; i++) {
176 		tmp = alloc_page(GFP_TRY);
177 		if (!tmp)
178 			break;
179 		set_page_private(tmp, (unsigned long)page);
180 		page = tmp;
181 	}
182 
183 	if (i == number)
184 		return page;
185 
186 	/* Not enough pages immediately available this time.
187 	 * No need to jump around here, drbd_alloc_pages will retry this
188 	 * function "soon". */
189 	if (page) {
190 		tmp = page_chain_tail(page, NULL);
191 		spin_lock(&drbd_pp_lock);
192 		page_chain_add(&drbd_pp_pool, page, tmp);
193 		drbd_pp_vacant += i;
194 		spin_unlock(&drbd_pp_lock);
195 	}
196 	return NULL;
197 }
198 
199 static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
200 					   struct list_head *to_be_freed)
201 {
202 	struct drbd_peer_request *peer_req;
203 	struct list_head *le, *tle;
204 
	/* The EEs are always appended to the end of the list. Since
	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first one that has not
	   finished, we can stop examining the list... */
209 
210 	list_for_each_safe(le, tle, &mdev->net_ee) {
211 		peer_req = list_entry(le, struct drbd_peer_request, w.list);
212 		if (drbd_peer_req_has_active_page(peer_req))
213 			break;
214 		list_move(le, to_be_freed);
215 	}
216 }
217 
218 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
219 {
220 	LIST_HEAD(reclaimed);
221 	struct drbd_peer_request *peer_req, *t;
222 
223 	spin_lock_irq(&mdev->tconn->req_lock);
224 	reclaim_finished_net_peer_reqs(mdev, &reclaimed);
225 	spin_unlock_irq(&mdev->tconn->req_lock);
226 
227 	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228 		drbd_free_net_peer_req(mdev, peer_req);
229 }
230 
231 /**
232  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
233  * @mdev:	DRBD device.
234  * @number:	number of pages requested
235  * @retry:	whether to retry, if not enough pages are available right now
236  *
 * Tries to allocate @number pages, first from our own page pool, then from
238  * the kernel, unless this allocation would exceed the max_buffers setting.
239  * Possibly retry until DRBD frees sufficient pages somewhere else.
240  *
241  * Returns a page chain linked via page->private.
242  */
243 struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
244 			      bool retry)
245 {
246 	struct page *page = NULL;
247 	struct net_conf *nc;
248 	DEFINE_WAIT(wait);
249 	int mxb;
250 
251 	/* Yes, we may run up to @number over max_buffers. If we
 * follow it strictly, the admin will get it wrong anyway. */
253 	rcu_read_lock();
254 	nc = rcu_dereference(mdev->tconn->net_conf);
255 	mxb = nc ? nc->max_buffers : 1000000;
256 	rcu_read_unlock();
257 
258 	if (atomic_read(&mdev->pp_in_use) < mxb)
259 		page = __drbd_alloc_pages(mdev, number);
260 
261 	while (page == NULL) {
262 		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
263 
264 		drbd_kick_lo_and_reclaim_net(mdev);
265 
266 		if (atomic_read(&mdev->pp_in_use) < mxb) {
267 			page = __drbd_alloc_pages(mdev, number);
268 			if (page)
269 				break;
270 		}
271 
272 		if (!retry)
273 			break;
274 
275 		if (signal_pending(current)) {
276 			dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
277 			break;
278 		}
279 
280 		schedule();
281 	}
282 	finish_wait(&drbd_pp_wait, &wait);
283 
284 	if (page)
285 		atomic_add(number, &mdev->pp_in_use);
286 	return page;
287 }
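
/*
 * Usage sketch (illustrative only, not taken from a caller in this file):
 * allocate a small chain from a context that may sleep, touch each page,
 * then hand the chain back so waiters on drbd_pp_wait can make progress.
 *
 *	struct page *chain = drbd_alloc_pages(mdev, 4, true);
 *	struct page *p;
 *
 *	if (chain) {
 *		for (p = chain; p; p = (struct page *)page_private(p))
 *			clear_highpage(p);	// needs linux/highmem.h
 *		drbd_free_pages(mdev, chain, 0);	// 0: not accounted as net
 *	}
 */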
288 
289 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * It is also used from inside another spin_lock_irq(&mdev->tconn->req_lock) section.
291  * Either links the page chain back to the global pool,
292  * or returns all pages to the system. */
293 static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
294 {
295 	atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
296 	int i;
297 
298 	if (page == NULL)
299 		return;
300 
301 	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
302 		i = page_chain_free(page);
303 	else {
304 		struct page *tmp;
305 		tmp = page_chain_tail(page, &i);
306 		spin_lock(&drbd_pp_lock);
307 		page_chain_add(&drbd_pp_pool, page, tmp);
308 		drbd_pp_vacant += i;
309 		spin_unlock(&drbd_pp_lock);
310 	}
311 	i = atomic_sub_return(i, a);
312 	if (i < 0)
313 		dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
314 			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
315 	wake_up(&drbd_pp_wait);
316 }
317 
318 /*
319 You need to hold the req_lock:
320  _drbd_wait_ee_list_empty()
321 
322 You must not have the req_lock:
323  drbd_free_peer_req()
324  drbd_alloc_peer_req()
325  drbd_free_peer_reqs()
326  drbd_ee_fix_bhs()
327  drbd_finish_peer_reqs()
328  drbd_clear_done_ee()
329  drbd_wait_ee_list_empty()
330 */
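
/*
 * Illustrative example of the rule above (not driver code): the underscore
 * variant expects req_lock to be held by the caller, the plain variant
 * takes and releases it itself, so exactly one of the two patterns below
 * is correct in a given context.
 *
 *	// caller does not hold req_lock:
 *	drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
 *
 *	// caller already holds req_lock:
 *	spin_lock_irq(&mdev->tconn->req_lock);
 *	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
 *	spin_unlock_irq(&mdev->tconn->req_lock);
 */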
331 
332 struct drbd_peer_request *
333 drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
334 		    unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
335 {
336 	struct drbd_peer_request *peer_req;
337 	struct page *page = NULL;
338 	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
339 
340 	if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
341 		return NULL;
342 
343 	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
344 	if (!peer_req) {
345 		if (!(gfp_mask & __GFP_NOWARN))
346 			dev_err(DEV, "%s: allocation failed\n", __func__);
347 		return NULL;
348 	}
349 
350 	if (data_size) {
351 		page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
352 		if (!page)
353 			goto fail;
354 	}
355 
356 	drbd_clear_interval(&peer_req->i);
357 	peer_req->i.size = data_size;
358 	peer_req->i.sector = sector;
359 	peer_req->i.local = false;
360 	peer_req->i.waiting = false;
361 
362 	peer_req->epoch = NULL;
363 	peer_req->w.mdev = mdev;
364 	peer_req->pages = page;
365 	atomic_set(&peer_req->pending_bios, 0);
366 	peer_req->flags = 0;
367 	/*
368 	 * The block_id is opaque to the receiver.  It is not endianness
369 	 * converted, and sent back to the sender unchanged.
370 	 */
371 	peer_req->block_id = id;
372 
373 	return peer_req;
374 
375  fail:
376 	mempool_free(peer_req, drbd_ee_mempool);
377 	return NULL;
378 }
379 
380 void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
381 		       int is_net)
382 {
383 	if (peer_req->flags & EE_HAS_DIGEST)
384 		kfree(peer_req->digest);
385 	drbd_free_pages(mdev, peer_req->pages, is_net);
386 	D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
387 	D_ASSERT(drbd_interval_empty(&peer_req->i));
388 	mempool_free(peer_req, drbd_ee_mempool);
389 }
390 
391 int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
392 {
393 	LIST_HEAD(work_list);
394 	struct drbd_peer_request *peer_req, *t;
395 	int count = 0;
396 	int is_net = list == &mdev->net_ee;
397 
398 	spin_lock_irq(&mdev->tconn->req_lock);
399 	list_splice_init(list, &work_list);
400 	spin_unlock_irq(&mdev->tconn->req_lock);
401 
402 	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
403 		__drbd_free_peer_req(mdev, peer_req, is_net);
404 		count++;
405 	}
406 	return count;
407 }
408 
409 /*
410  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
411  */
412 static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
413 {
414 	LIST_HEAD(work_list);
415 	LIST_HEAD(reclaimed);
416 	struct drbd_peer_request *peer_req, *t;
417 	int err = 0;
418 
419 	spin_lock_irq(&mdev->tconn->req_lock);
420 	reclaim_finished_net_peer_reqs(mdev, &reclaimed);
421 	list_splice_init(&mdev->done_ee, &work_list);
422 	spin_unlock_irq(&mdev->tconn->req_lock);
423 
424 	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
425 		drbd_free_net_peer_req(mdev, peer_req);
426 
427 	/* possible callbacks here:
428 	 * e_end_block, and e_end_resync_block, e_send_superseded.
429 	 * all ignore the last argument.
430 	 */
431 	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
432 		int err2;
433 
434 		/* list_del not necessary, next/prev members not touched */
435 		err2 = peer_req->w.cb(&peer_req->w, !!err);
436 		if (!err)
437 			err = err2;
438 		drbd_free_peer_req(mdev, peer_req);
439 	}
440 	wake_up(&mdev->ee_wait);
441 
442 	return err;
443 }
444 
445 static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
446 				     struct list_head *head)
447 {
448 	DEFINE_WAIT(wait);
449 
450 	/* avoids spin_lock/unlock
451 	 * and calling prepare_to_wait in the fast path */
452 	while (!list_empty(head)) {
453 		prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
454 		spin_unlock_irq(&mdev->tconn->req_lock);
455 		io_schedule();
456 		finish_wait(&mdev->ee_wait, &wait);
457 		spin_lock_irq(&mdev->tconn->req_lock);
458 	}
459 }
460 
461 static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
462 				    struct list_head *head)
463 {
464 	spin_lock_irq(&mdev->tconn->req_lock);
465 	_drbd_wait_ee_list_empty(mdev, head);
466 	spin_unlock_irq(&mdev->tconn->req_lock);
467 }
468 
469 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
470 {
471 	mm_segment_t oldfs;
472 	struct kvec iov = {
473 		.iov_base = buf,
474 		.iov_len = size,
475 	};
476 	struct msghdr msg = {
477 		.msg_iovlen = 1,
478 		.msg_iov = (struct iovec *)&iov,
479 		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
480 	};
481 	int rv;
482 
483 	oldfs = get_fs();
484 	set_fs(KERNEL_DS);
485 	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
486 	set_fs(oldfs);
487 
488 	return rv;
489 }
490 
491 static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
492 {
493 	int rv;
494 
495 	rv = drbd_recv_short(tconn->data.socket, buf, size, 0);
496 
497 	if (rv < 0) {
498 		if (rv == -ECONNRESET)
499 			conn_info(tconn, "sock was reset by peer\n");
500 		else if (rv != -ERESTARTSYS)
501 			conn_err(tconn, "sock_recvmsg returned %d\n", rv);
502 	} else if (rv == 0) {
503 		if (test_bit(DISCONNECT_SENT, &tconn->flags)) {
504 			long t;
505 			rcu_read_lock();
506 			t = rcu_dereference(tconn->net_conf)->ping_timeo * HZ/10;
507 			rcu_read_unlock();
508 
509 			t = wait_event_timeout(tconn->ping_wait, tconn->cstate < C_WF_REPORT_PARAMS, t);
510 
511 			if (t)
512 				goto out;
513 		}
514 		conn_info(tconn, "sock was shut down by peer\n");
515 	}
516 
517 	if (rv != size)
518 		conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
519 
520 out:
521 	return rv;
522 }
523 
524 static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
525 {
526 	int err;
527 
528 	err = drbd_recv(tconn, buf, size);
529 	if (err != size) {
530 		if (err >= 0)
531 			err = -EIO;
532 	} else
533 		err = 0;
534 	return err;
535 }
536 
537 static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
538 {
539 	int err;
540 
541 	err = drbd_recv_all(tconn, buf, size);
542 	if (err && !signal_pending(current))
543 		conn_warn(tconn, "short read (expected size %d)\n", (int)size);
544 	return err;
545 }
546 
547 /* quoting tcp(7):
548  *   On individual connections, the socket buffer size must be set prior to the
549  *   listen(2) or connect(2) calls in order to have it take effect.
550  * This is our wrapper to do so.
551  */
552 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
553 		unsigned int rcv)
554 {
555 	/* open coded SO_SNDBUF, SO_RCVBUF */
556 	if (snd) {
557 		sock->sk->sk_sndbuf = snd;
558 		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
559 	}
560 	if (rcv) {
561 		sock->sk->sk_rcvbuf = rcv;
562 		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
563 	}
564 }
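
/*
 * Call-order sketch (illustrative, mirroring drbd_try_connect() and
 * prepare_listen_socket() below): per the tcp(7) rule quoted above, the
 * buffer sizes must be set after sock_create_kern() but before the socket
 * is connected or put into listening state.
 *
 *	err = sock_create_kern(AF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);
 *	if (!err) {
 *		drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
 *		// only now: bind(), then connect() or listen()
 *	}
 */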
565 
566 static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
567 {
568 	const char *what;
569 	struct socket *sock;
570 	struct sockaddr_in6 src_in6;
571 	struct sockaddr_in6 peer_in6;
572 	struct net_conf *nc;
573 	int err, peer_addr_len, my_addr_len;
574 	int sndbuf_size, rcvbuf_size, connect_int;
575 	int disconnect_on_error = 1;
576 
577 	rcu_read_lock();
578 	nc = rcu_dereference(tconn->net_conf);
579 	if (!nc) {
580 		rcu_read_unlock();
581 		return NULL;
582 	}
583 	sndbuf_size = nc->sndbuf_size;
584 	rcvbuf_size = nc->rcvbuf_size;
585 	connect_int = nc->connect_int;
586 	rcu_read_unlock();
587 
588 	my_addr_len = min_t(int, tconn->my_addr_len, sizeof(src_in6));
589 	memcpy(&src_in6, &tconn->my_addr, my_addr_len);
590 
591 	if (((struct sockaddr *)&tconn->my_addr)->sa_family == AF_INET6)
592 		src_in6.sin6_port = 0;
593 	else
594 		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
595 
596 	peer_addr_len = min_t(int, tconn->peer_addr_len, sizeof(src_in6));
597 	memcpy(&peer_in6, &tconn->peer_addr, peer_addr_len);
598 
599 	what = "sock_create_kern";
600 	err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
601 			       SOCK_STREAM, IPPROTO_TCP, &sock);
602 	if (err < 0) {
603 		sock = NULL;
604 		goto out;
605 	}
606 
607 	sock->sk->sk_rcvtimeo =
608 	sock->sk->sk_sndtimeo = connect_int * HZ;
609 	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
610 
611        /* explicitly bind to the configured IP as source IP
612 	*  for the outgoing connections.
613 	*  This is needed for multihomed hosts and to be
614 	*  able to use lo: interfaces for drbd.
615 	* Make sure to use 0 as port number, so linux selects
616 	*  a free one dynamically.
617 	*/
618 	what = "bind before connect";
619 	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
620 	if (err < 0)
621 		goto out;
622 
623 	/* connect may fail, peer not yet available.
624 	 * stay C_WF_CONNECTION, don't go Disconnecting! */
625 	disconnect_on_error = 0;
626 	what = "connect";
627 	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
628 
629 out:
630 	if (err < 0) {
631 		if (sock) {
632 			sock_release(sock);
633 			sock = NULL;
634 		}
635 		switch (-err) {
636 			/* timeout, busy, signal pending */
637 		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
638 		case EINTR: case ERESTARTSYS:
639 			/* peer not (yet) available, network problem */
640 		case ECONNREFUSED: case ENETUNREACH:
641 		case EHOSTDOWN:    case EHOSTUNREACH:
642 			disconnect_on_error = 0;
643 			break;
644 		default:
645 			conn_err(tconn, "%s failed, err = %d\n", what, err);
646 		}
647 		if (disconnect_on_error)
648 			conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
649 	}
650 
651 	return sock;
652 }
653 
654 struct accept_wait_data {
655 	struct drbd_tconn *tconn;
656 	struct socket *s_listen;
657 	struct completion door_bell;
658 	void (*original_sk_state_change)(struct sock *sk);
659 
660 };
661 
662 static void drbd_incoming_connection(struct sock *sk)
663 {
664 	struct accept_wait_data *ad = sk->sk_user_data;
665 	void (*state_change)(struct sock *sk);
666 
667 	state_change = ad->original_sk_state_change;
668 	if (sk->sk_state == TCP_ESTABLISHED)
669 		complete(&ad->door_bell);
670 	state_change(sk);
671 }
672 
673 static int prepare_listen_socket(struct drbd_tconn *tconn, struct accept_wait_data *ad)
674 {
675 	int err, sndbuf_size, rcvbuf_size, my_addr_len;
676 	struct sockaddr_in6 my_addr;
677 	struct socket *s_listen;
678 	struct net_conf *nc;
679 	const char *what;
680 
681 	rcu_read_lock();
682 	nc = rcu_dereference(tconn->net_conf);
683 	if (!nc) {
684 		rcu_read_unlock();
685 		return -EIO;
686 	}
687 	sndbuf_size = nc->sndbuf_size;
688 	rcvbuf_size = nc->rcvbuf_size;
689 	rcu_read_unlock();
690 
691 	my_addr_len = min_t(int, tconn->my_addr_len, sizeof(struct sockaddr_in6));
692 	memcpy(&my_addr, &tconn->my_addr, my_addr_len);
693 
694 	what = "sock_create_kern";
695 	err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
696 			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
697 	if (err) {
698 		s_listen = NULL;
699 		goto out;
700 	}
701 
702 	s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
703 	drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
704 
705 	what = "bind before listen";
706 	err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
707 	if (err < 0)
708 		goto out;
709 
710 	ad->s_listen = s_listen;
711 	write_lock_bh(&s_listen->sk->sk_callback_lock);
712 	ad->original_sk_state_change = s_listen->sk->sk_state_change;
713 	s_listen->sk->sk_state_change = drbd_incoming_connection;
714 	s_listen->sk->sk_user_data = ad;
715 	write_unlock_bh(&s_listen->sk->sk_callback_lock);
716 
717 	what = "listen";
718 	err = s_listen->ops->listen(s_listen, 5);
719 	if (err < 0)
720 		goto out;
721 
722 	return 0;
723 out:
724 	if (s_listen)
725 		sock_release(s_listen);
726 	if (err < 0) {
727 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
728 			conn_err(tconn, "%s failed, err = %d\n", what, err);
729 			conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
730 		}
731 	}
732 
733 	return -EIO;
734 }
735 
736 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
737 {
738 	write_lock_bh(&sk->sk_callback_lock);
739 	sk->sk_state_change = ad->original_sk_state_change;
740 	sk->sk_user_data = NULL;
741 	write_unlock_bh(&sk->sk_callback_lock);
742 }
743 
744 static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn, struct accept_wait_data *ad)
745 {
746 	int timeo, connect_int, err = 0;
747 	struct socket *s_estab = NULL;
748 	struct net_conf *nc;
749 
750 	rcu_read_lock();
751 	nc = rcu_dereference(tconn->net_conf);
752 	if (!nc) {
753 		rcu_read_unlock();
754 		return NULL;
755 	}
756 	connect_int = nc->connect_int;
757 	rcu_read_unlock();
758 
759 	timeo = connect_int * HZ;
760 	timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
761 
762 	err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
763 	if (err <= 0)
764 		return NULL;
765 
766 	err = kernel_accept(ad->s_listen, &s_estab, 0);
767 	if (err < 0) {
768 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
769 			conn_err(tconn, "accept failed, err = %d\n", err);
770 			conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
771 		}
772 	}
773 
774 	if (s_estab)
775 		unregister_state_change(s_estab->sk, ad);
776 
777 	return s_estab;
778 }
779 
780 static int decode_header(struct drbd_tconn *, void *, struct packet_info *);
781 
782 static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
783 			     enum drbd_packet cmd)
784 {
785 	if (!conn_prepare_command(tconn, sock))
786 		return -EIO;
787 	return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
788 }
789 
790 static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
791 {
792 	unsigned int header_size = drbd_header_size(tconn);
793 	struct packet_info pi;
794 	int err;
795 
796 	err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
797 	if (err != header_size) {
798 		if (err >= 0)
799 			err = -EIO;
800 		return err;
801 	}
802 	err = decode_header(tconn, tconn->data.rbuf, &pi);
803 	if (err)
804 		return err;
805 	return pi.cmd;
806 }
807 
808 /**
809  * drbd_socket_okay() - Free the socket if its connection is not okay
810  * @sock:	pointer to the pointer to the socket.
811  */
812 static int drbd_socket_okay(struct socket **sock)
813 {
814 	int rr;
815 	char tb[4];
816 
817 	if (!*sock)
818 		return false;
819 
820 	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
821 
822 	if (rr > 0 || rr == -EAGAIN) {
823 		return true;
824 	} else {
825 		sock_release(*sock);
826 		*sock = NULL;
827 		return false;
828 	}
829 }
830 /* Gets called if a connection is established, or if a new minor gets created
831    in a connection */
832 int drbd_connected(struct drbd_conf *mdev)
833 {
834 	int err;
835 
836 	atomic_set(&mdev->packet_seq, 0);
837 	mdev->peer_seq = 0;
838 
839 	mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
840 		&mdev->tconn->cstate_mutex :
841 		&mdev->own_state_mutex;
842 
843 	err = drbd_send_sync_param(mdev);
844 	if (!err)
845 		err = drbd_send_sizes(mdev, 0, 0);
846 	if (!err)
847 		err = drbd_send_uuids(mdev);
848 	if (!err)
849 		err = drbd_send_current_state(mdev);
850 	clear_bit(USE_DEGR_WFC_T, &mdev->flags);
851 	clear_bit(RESIZE_PENDING, &mdev->flags);
852 	mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
853 	return err;
854 }
855 
856 /*
857  * return values:
858  *   1 yes, we have a valid connection
859  *   0 oops, did not work out, please try again
 *  -1 peer talks a different language,
861  *     no point in trying again, please go standalone.
862  *  -2 We do not have a network config...
863  */
864 static int conn_connect(struct drbd_tconn *tconn)
865 {
866 	struct drbd_socket sock, msock;
867 	struct drbd_conf *mdev;
868 	struct net_conf *nc;
869 	int vnr, timeout, h, ok;
870 	bool discard_my_data;
871 	enum drbd_state_rv rv;
872 	struct accept_wait_data ad = {
873 		.tconn = tconn,
874 		.door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
875 	};
876 
877 	clear_bit(DISCONNECT_SENT, &tconn->flags);
878 	if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
879 		return -2;
880 
881 	mutex_init(&sock.mutex);
882 	sock.sbuf = tconn->data.sbuf;
883 	sock.rbuf = tconn->data.rbuf;
884 	sock.socket = NULL;
885 	mutex_init(&msock.mutex);
886 	msock.sbuf = tconn->meta.sbuf;
887 	msock.rbuf = tconn->meta.rbuf;
888 	msock.socket = NULL;
889 
890 	/* Assume that the peer only understands protocol 80 until we know better.  */
891 	tconn->agreed_pro_version = 80;
892 
893 	if (prepare_listen_socket(tconn, &ad))
894 		return 0;
895 
896 	do {
897 		struct socket *s;
898 
899 		s = drbd_try_connect(tconn);
900 		if (s) {
901 			if (!sock.socket) {
902 				sock.socket = s;
903 				send_first_packet(tconn, &sock, P_INITIAL_DATA);
904 			} else if (!msock.socket) {
905 				clear_bit(RESOLVE_CONFLICTS, &tconn->flags);
906 				msock.socket = s;
907 				send_first_packet(tconn, &msock, P_INITIAL_META);
908 			} else {
909 				conn_err(tconn, "Logic error in conn_connect()\n");
910 				goto out_release_sockets;
911 			}
912 		}
913 
914 		if (sock.socket && msock.socket) {
915 			rcu_read_lock();
916 			nc = rcu_dereference(tconn->net_conf);
917 			timeout = nc->ping_timeo * HZ / 10;
918 			rcu_read_unlock();
919 			schedule_timeout_interruptible(timeout);
920 			ok = drbd_socket_okay(&sock.socket);
921 			ok = drbd_socket_okay(&msock.socket) && ok;
922 			if (ok)
923 				break;
924 		}
925 
926 retry:
927 		s = drbd_wait_for_connect(tconn, &ad);
928 		if (s) {
929 			int fp = receive_first_packet(tconn, s);
930 			drbd_socket_okay(&sock.socket);
931 			drbd_socket_okay(&msock.socket);
932 			switch (fp) {
933 			case P_INITIAL_DATA:
934 				if (sock.socket) {
935 					conn_warn(tconn, "initial packet S crossed\n");
936 					sock_release(sock.socket);
937 					sock.socket = s;
938 					goto randomize;
939 				}
940 				sock.socket = s;
941 				break;
942 			case P_INITIAL_META:
943 				set_bit(RESOLVE_CONFLICTS, &tconn->flags);
944 				if (msock.socket) {
945 					conn_warn(tconn, "initial packet M crossed\n");
946 					sock_release(msock.socket);
947 					msock.socket = s;
948 					goto randomize;
949 				}
950 				msock.socket = s;
951 				break;
952 			default:
953 				conn_warn(tconn, "Error receiving initial packet\n");
954 				sock_release(s);
955 randomize:
956 				if (random32() & 1)
957 					goto retry;
958 			}
959 		}
960 
961 		if (tconn->cstate <= C_DISCONNECTING)
962 			goto out_release_sockets;
963 		if (signal_pending(current)) {
964 			flush_signals(current);
965 			smp_rmb();
966 			if (get_t_state(&tconn->receiver) == EXITING)
967 				goto out_release_sockets;
968 		}
969 
970 		ok = drbd_socket_okay(&sock.socket);
971 		ok = drbd_socket_okay(&msock.socket) && ok;
972 	} while (!ok);
973 
974 	if (ad.s_listen)
975 		sock_release(ad.s_listen);
976 
977 	sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
978 	msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
979 
980 	sock.socket->sk->sk_allocation = GFP_NOIO;
981 	msock.socket->sk->sk_allocation = GFP_NOIO;
982 
983 	sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
984 	msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
985 
986 	/* NOT YET ...
987 	 * sock.socket->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
988 	 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
989 	 * first set it to the P_CONNECTION_FEATURES timeout,
990 	 * which we set to 4x the configured ping_timeout. */
991 	rcu_read_lock();
992 	nc = rcu_dereference(tconn->net_conf);
993 
994 	sock.socket->sk->sk_sndtimeo =
995 	sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
996 
997 	msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
998 	timeout = nc->timeout * HZ / 10;
999 	discard_my_data = nc->discard_my_data;
1000 	rcu_read_unlock();
1001 
1002 	msock.socket->sk->sk_sndtimeo = timeout;
1003 
1004 	/* we don't want delays.
1005 	 * we use TCP_CORK where appropriate, though */
1006 	drbd_tcp_nodelay(sock.socket);
1007 	drbd_tcp_nodelay(msock.socket);
1008 
1009 	tconn->data.socket = sock.socket;
1010 	tconn->meta.socket = msock.socket;
1011 	tconn->last_received = jiffies;
1012 
1013 	h = drbd_do_features(tconn);
1014 	if (h <= 0)
1015 		return h;
1016 
1017 	if (tconn->cram_hmac_tfm) {
1018 		/* drbd_request_state(mdev, NS(conn, WFAuth)); */
1019 		switch (drbd_do_auth(tconn)) {
1020 		case -1:
1021 			conn_err(tconn, "Authentication of peer failed\n");
1022 			return -1;
1023 		case 0:
1024 			conn_err(tconn, "Authentication of peer failed, trying again.\n");
1025 			return 0;
1026 		}
1027 	}
1028 
1029 	tconn->data.socket->sk->sk_sndtimeo = timeout;
1030 	tconn->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1031 
1032 	if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
1033 		return -1;
1034 
1035 	set_bit(STATE_SENT, &tconn->flags);
1036 
1037 	rcu_read_lock();
1038 	idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1039 		kref_get(&mdev->kref);
1040 		/* Prevent a race between resync-handshake and
1041 		 * being promoted to Primary.
1042 		 *
1043 		 * Grab and release the state mutex, so we know that any current
1044 		 * drbd_set_role() is finished, and any incoming drbd_set_role
1045 		 * will see the STATE_SENT flag, and wait for it to be cleared.
1046 		 */
1047 		mutex_lock(mdev->state_mutex);
1048 		mutex_unlock(mdev->state_mutex);
1049 
1050 		rcu_read_unlock();
1051 
1052 		if (discard_my_data)
1053 			set_bit(DISCARD_MY_DATA, &mdev->flags);
1054 		else
1055 			clear_bit(DISCARD_MY_DATA, &mdev->flags);
1056 
1057 		drbd_connected(mdev);
1058 		kref_put(&mdev->kref, &drbd_minor_destroy);
1059 		rcu_read_lock();
1060 	}
1061 	rcu_read_unlock();
1062 
1063 	rv = conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1064 	if (rv < SS_SUCCESS || tconn->cstate != C_WF_REPORT_PARAMS) {
1065 		clear_bit(STATE_SENT, &tconn->flags);
1066 		return 0;
1067 	}
1068 
1069 	drbd_thread_start(&tconn->asender);
1070 
1071 	mutex_lock(&tconn->conf_update);
1072 	/* The discard_my_data flag is a single-shot modifier to the next
1073 	 * connection attempt, the handshake of which is now well underway.
1074 	 * No need for rcu style copying of the whole struct
1075 	 * just to clear a single value. */
1076 	tconn->net_conf->discard_my_data = 0;
1077 	mutex_unlock(&tconn->conf_update);
1078 
1079 	return h;
1080 
1081 out_release_sockets:
1082 	if (ad.s_listen)
1083 		sock_release(ad.s_listen);
1084 	if (sock.socket)
1085 		sock_release(sock.socket);
1086 	if (msock.socket)
1087 		sock_release(msock.socket);
1088 	return -1;
1089 }
1090 
1091 static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
1092 {
1093 	unsigned int header_size = drbd_header_size(tconn);
1094 
1095 	if (header_size == sizeof(struct p_header100) &&
1096 	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1097 		struct p_header100 *h = header;
1098 		if (h->pad != 0) {
1099 			conn_err(tconn, "Header padding is not zero\n");
1100 			return -EINVAL;
1101 		}
1102 		pi->vnr = be16_to_cpu(h->volume);
1103 		pi->cmd = be16_to_cpu(h->command);
1104 		pi->size = be32_to_cpu(h->length);
1105 	} else if (header_size == sizeof(struct p_header95) &&
1106 		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1107 		struct p_header95 *h = header;
1108 		pi->cmd = be16_to_cpu(h->command);
1109 		pi->size = be32_to_cpu(h->length);
1110 		pi->vnr = 0;
1111 	} else if (header_size == sizeof(struct p_header80) &&
1112 		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1113 		struct p_header80 *h = header;
1114 		pi->cmd = be16_to_cpu(h->command);
1115 		pi->size = be16_to_cpu(h->length);
1116 		pi->vnr = 0;
1117 	} else {
1118 		conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
1119 			 be32_to_cpu(*(__be32 *)header),
1120 			 tconn->agreed_pro_version);
1121 		return -EINVAL;
1122 	}
1123 	pi->data = header + header_size;
1124 	return 0;
1125 }
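
/*
 * For orientation, a sketch of the newest on-wire header layout handled
 * above (all fields big endian; field order restated from drbd_int.h for
 * illustration -- the struct definitions there are authoritative):
 *
 *	struct p_header100 {
 *		__be32 magic;	// DRBD_MAGIC_100
 *		__be16 volume;	// maps to packet_info.vnr
 *		__be16 command;	// maps to packet_info.cmd
 *		__be32 length;	// payload bytes following the header
 *		__be32 pad;	// must be zero, checked above
 *	} __packed;
 *
 * p_header95 (u16 magic, u16 command, u32 length) and p_header80 (u32 magic,
 * u16 command, u16 length) are the older, shorter variants; decode_header()
 * tells them apart by the agreed header size plus the magic value.
 */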
1126 
1127 static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
1128 {
1129 	void *buffer = tconn->data.rbuf;
1130 	int err;
1131 
1132 	err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
1133 	if (err)
1134 		return err;
1135 
1136 	err = decode_header(tconn, buffer, pi);
1137 	tconn->last_received = jiffies;
1138 
1139 	return err;
1140 }
1141 
1142 static void drbd_flush(struct drbd_tconn *tconn)
1143 {
1144 	int rv;
1145 	struct drbd_conf *mdev;
1146 	int vnr;
1147 
1148 	if (tconn->write_ordering >= WO_bdev_flush) {
1149 		rcu_read_lock();
1150 		idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1151 			if (!get_ldev(mdev))
1152 				continue;
1153 			kref_get(&mdev->kref);
1154 			rcu_read_unlock();
1155 
1156 			rv = blkdev_issue_flush(mdev->ldev->backing_bdev,
1157 					GFP_NOIO, NULL);
1158 			if (rv) {
1159 				dev_info(DEV, "local disk flush failed with status %d\n", rv);
1160 				/* would rather check on EOPNOTSUPP, but that is not reliable.
1161 				 * don't try again for ANY return value != 0
1162 				 * if (rv == -EOPNOTSUPP) */
1163 				drbd_bump_write_ordering(tconn, WO_drain_io);
1164 			}
1165 			put_ldev(mdev);
1166 			kref_put(&mdev->kref, &drbd_minor_destroy);
1167 
1168 			rcu_read_lock();
1169 			if (rv)
1170 				break;
1171 		}
1172 		rcu_read_unlock();
1173 	}
1174 }
1175 
1176 /**
1177  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @tconn:	DRBD connection.
1179  * @epoch:	Epoch object.
1180  * @ev:		Epoch event.
1181  */
1182 static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *tconn,
1183 					       struct drbd_epoch *epoch,
1184 					       enum epoch_event ev)
1185 {
1186 	int epoch_size;
1187 	struct drbd_epoch *next_epoch;
1188 	enum finish_epoch rv = FE_STILL_LIVE;
1189 
1190 	spin_lock(&tconn->epoch_lock);
1191 	do {
1192 		next_epoch = NULL;
1193 
1194 		epoch_size = atomic_read(&epoch->epoch_size);
1195 
1196 		switch (ev & ~EV_CLEANUP) {
1197 		case EV_PUT:
1198 			atomic_dec(&epoch->active);
1199 			break;
1200 		case EV_GOT_BARRIER_NR:
1201 			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1202 			break;
1203 		case EV_BECAME_LAST:
1204 			/* nothing to do*/
1205 			break;
1206 		}
1207 
1208 		if (epoch_size != 0 &&
1209 		    atomic_read(&epoch->active) == 0 &&
1210 		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1211 			if (!(ev & EV_CLEANUP)) {
1212 				spin_unlock(&tconn->epoch_lock);
1213 				drbd_send_b_ack(epoch->tconn, epoch->barrier_nr, epoch_size);
1214 				spin_lock(&tconn->epoch_lock);
1215 			}
1216 #if 0
1217 			/* FIXME: dec unacked on connection, once we have
1218 			 * something to count pending connection packets in. */
1219 			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1220 				dec_unacked(epoch->tconn);
1221 #endif
1222 
1223 			if (tconn->current_epoch != epoch) {
1224 				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1225 				list_del(&epoch->list);
1226 				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1227 				tconn->epochs--;
1228 				kfree(epoch);
1229 
1230 				if (rv == FE_STILL_LIVE)
1231 					rv = FE_DESTROYED;
1232 			} else {
1233 				epoch->flags = 0;
1234 				atomic_set(&epoch->epoch_size, 0);
1235 				/* atomic_set(&epoch->active, 0); is already zero */
1236 				if (rv == FE_STILL_LIVE)
1237 					rv = FE_RECYCLED;
1238 			}
1239 		}
1240 
1241 		if (!next_epoch)
1242 			break;
1243 
1244 		epoch = next_epoch;
1245 	} while (1);
1246 
1247 	spin_unlock(&tconn->epoch_lock);
1248 
1249 	return rv;
1250 }
1251 
1252 /**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
1254  * @tconn:	DRBD connection.
1255  * @wo:		Write ordering method to try.
1256  */
1257 void drbd_bump_write_ordering(struct drbd_tconn *tconn, enum write_ordering_e wo)
1258 {
1259 	struct disk_conf *dc;
1260 	struct drbd_conf *mdev;
1261 	enum write_ordering_e pwo;
1262 	int vnr;
1263 	static char *write_ordering_str[] = {
1264 		[WO_none] = "none",
1265 		[WO_drain_io] = "drain",
1266 		[WO_bdev_flush] = "flush",
1267 	};
1268 
1269 	pwo = tconn->write_ordering;
1270 	wo = min(pwo, wo);
1271 	rcu_read_lock();
1272 	idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1273 		if (!get_ldev_if_state(mdev, D_ATTACHING))
1274 			continue;
1275 		dc = rcu_dereference(mdev->ldev->disk_conf);
1276 
1277 		if (wo == WO_bdev_flush && !dc->disk_flushes)
1278 			wo = WO_drain_io;
1279 		if (wo == WO_drain_io && !dc->disk_drain)
1280 			wo = WO_none;
1281 		put_ldev(mdev);
1282 	}
1283 	rcu_read_unlock();
1284 	tconn->write_ordering = wo;
1285 	if (pwo != tconn->write_ordering || wo == WO_bdev_flush)
1286 		conn_info(tconn, "Method to ensure write ordering: %s\n", write_ordering_str[tconn->write_ordering]);
1287 }
1288 
1289 /**
1290  * drbd_submit_peer_request()
1291  * @mdev:	DRBD device.
1292  * @peer_req:	peer request
1293  * @rw:		flag field, see bio->bi_rw
1294  *
1295  * May spread the pages to multiple bios,
1296  * depending on bio_add_page restrictions.
1297  *
1298  * Returns 0 if all bios have been submitted,
1299  * -ENOMEM if we could not allocate enough bios,
1300  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1301  *  single page to an empty bio (which should never happen and likely indicates
1302  *  that the lower level IO stack is in some way broken). This has been observed
1303  *  on certain Xen deployments.
1304  */
1305 /* TODO allocate from our own bio_set. */
1306 int drbd_submit_peer_request(struct drbd_conf *mdev,
1307 			     struct drbd_peer_request *peer_req,
1308 			     const unsigned rw, const int fault_type)
1309 {
1310 	struct bio *bios = NULL;
1311 	struct bio *bio;
1312 	struct page *page = peer_req->pages;
1313 	sector_t sector = peer_req->i.sector;
1314 	unsigned ds = peer_req->i.size;
1315 	unsigned n_bios = 0;
1316 	unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1317 	int err = -ENOMEM;
1318 
1319 	/* In most cases, we will only need one bio.  But in case the lower
1320 	 * level restrictions happen to be different at this offset on this
1321 	 * side than those of the sending peer, we may need to submit the
1322 	 * request in more than one bio.
1323 	 *
1324 	 * Plain bio_alloc is good enough here, this is no DRBD internally
1325 	 * generated bio, but a bio allocated on behalf of the peer.
1326 	 */
1327 next_bio:
1328 	bio = bio_alloc(GFP_NOIO, nr_pages);
1329 	if (!bio) {
1330 		dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1331 		goto fail;
1332 	}
1333 	/* > peer_req->i.sector, unless this is the first bio */
1334 	bio->bi_sector = sector;
1335 	bio->bi_bdev = mdev->ldev->backing_bdev;
1336 	bio->bi_rw = rw;
1337 	bio->bi_private = peer_req;
1338 	bio->bi_end_io = drbd_peer_request_endio;
1339 
1340 	bio->bi_next = bios;
1341 	bios = bio;
1342 	++n_bios;
1343 
1344 	page_chain_for_each(page) {
1345 		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1346 		if (!bio_add_page(bio, page, len, 0)) {
1347 			/* A single page must always be possible!
1348 			 * But in case it fails anyways,
1349 			 * we deal with it, and complain (below). */
1350 			if (bio->bi_vcnt == 0) {
1351 				dev_err(DEV,
1352 					"bio_add_page failed for len=%u, "
1353 					"bi_vcnt=0 (bi_sector=%llu)\n",
1354 					len, (unsigned long long)bio->bi_sector);
1355 				err = -ENOSPC;
1356 				goto fail;
1357 			}
1358 			goto next_bio;
1359 		}
1360 		ds -= len;
1361 		sector += len >> 9;
1362 		--nr_pages;
1363 	}
1364 	D_ASSERT(page == NULL);
1365 	D_ASSERT(ds == 0);
1366 
1367 	atomic_set(&peer_req->pending_bios, n_bios);
1368 	do {
1369 		bio = bios;
1370 		bios = bios->bi_next;
1371 		bio->bi_next = NULL;
1372 
1373 		drbd_generic_make_request(mdev, fault_type, bio);
1374 	} while (bios);
1375 	return 0;
1376 
1377 fail:
1378 	while (bios) {
1379 		bio = bios;
1380 		bios = bios->bi_next;
1381 		bio_put(bio);
1382 	}
1383 	return err;
1384 }
1385 
1386 static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
1387 					     struct drbd_peer_request *peer_req)
1388 {
1389 	struct drbd_interval *i = &peer_req->i;
1390 
1391 	drbd_remove_interval(&mdev->write_requests, i);
1392 	drbd_clear_interval(i);
1393 
1394 	/* Wake up any processes waiting for this peer request to complete.  */
1395 	if (i->waiting)
1396 		wake_up(&mdev->misc_wait);
1397 }
1398 
1399 void conn_wait_active_ee_empty(struct drbd_tconn *tconn)
1400 {
1401 	struct drbd_conf *mdev;
1402 	int vnr;
1403 
1404 	rcu_read_lock();
1405 	idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1406 		kref_get(&mdev->kref);
1407 		rcu_read_unlock();
1408 		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1409 		kref_put(&mdev->kref, &drbd_minor_destroy);
1410 		rcu_read_lock();
1411 	}
1412 	rcu_read_unlock();
1413 }
1414 
1415 static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
1416 {
1417 	int rv;
1418 	struct p_barrier *p = pi->data;
1419 	struct drbd_epoch *epoch;
1420 
1421 	/* FIXME these are unacked on connection,
1422 	 * not a specific (peer)device.
1423 	 */
1424 	tconn->current_epoch->barrier_nr = p->barrier;
1425 	tconn->current_epoch->tconn = tconn;
1426 	rv = drbd_may_finish_epoch(tconn, tconn->current_epoch, EV_GOT_BARRIER_NR);
1427 
1428 	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1429 	 * the activity log, which means it would not be resynced in case the
1430 	 * R_PRIMARY crashes now.
1431 	 * Therefore we must send the barrier_ack after the barrier request was
1432 	 * completed. */
1433 	switch (tconn->write_ordering) {
1434 	case WO_none:
1435 		if (rv == FE_RECYCLED)
1436 			return 0;
1437 
1438 		/* receiver context, in the writeout path of the other node.
1439 		 * avoid potential distributed deadlock */
1440 		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1441 		if (epoch)
1442 			break;
1443 		else
1444 			conn_warn(tconn, "Allocation of an epoch failed, slowing down\n");
1445 			/* Fall through */
1446 
1447 	case WO_bdev_flush:
1448 	case WO_drain_io:
1449 		conn_wait_active_ee_empty(tconn);
1450 		drbd_flush(tconn);
1451 
1452 		if (atomic_read(&tconn->current_epoch->epoch_size)) {
1453 			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1454 			if (epoch)
1455 				break;
1456 		}
1457 
1458 		return 0;
1459 	default:
1460 		conn_err(tconn, "Strangeness in tconn->write_ordering %d\n", tconn->write_ordering);
1461 		return -EIO;
1462 	}
1463 
1464 	epoch->flags = 0;
1465 	atomic_set(&epoch->epoch_size, 0);
1466 	atomic_set(&epoch->active, 0);
1467 
1468 	spin_lock(&tconn->epoch_lock);
1469 	if (atomic_read(&tconn->current_epoch->epoch_size)) {
1470 		list_add(&epoch->list, &tconn->current_epoch->list);
1471 		tconn->current_epoch = epoch;
1472 		tconn->epochs++;
1473 	} else {
1474 		/* The current_epoch got recycled while we allocated this one... */
1475 		kfree(epoch);
1476 	}
1477 	spin_unlock(&tconn->epoch_lock);
1478 
1479 	return 0;
1480 }
1481 
1482 /* used from receive_RSDataReply (recv_resync_read)
1483  * and from receive_Data */
1484 static struct drbd_peer_request *
1485 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1486 	      int data_size) __must_hold(local)
1487 {
1488 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1489 	struct drbd_peer_request *peer_req;
1490 	struct page *page;
1491 	int dgs, ds, err;
1492 	void *dig_in = mdev->tconn->int_dig_in;
1493 	void *dig_vv = mdev->tconn->int_dig_vv;
1494 	unsigned long *data;
1495 
1496 	dgs = 0;
1497 	if (mdev->tconn->peer_integrity_tfm) {
1498 		dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1499 		/*
1500 		 * FIXME: Receive the incoming digest into the receive buffer
1501 		 *	  here, together with its struct p_data?
1502 		 */
1503 		err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1504 		if (err)
1505 			return NULL;
1506 		data_size -= dgs;
1507 	}
1508 
1509 	if (!expect(IS_ALIGNED(data_size, 512)))
1510 		return NULL;
1511 	if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1512 		return NULL;
1513 
	/* even though we trust our peer,
1515 	 * we sometimes have to double check. */
1516 	if (sector + (data_size>>9) > capacity) {
1517 		dev_err(DEV, "request from peer beyond end of local disk: "
1518 			"capacity: %llus < sector: %llus + size: %u\n",
1519 			(unsigned long long)capacity,
1520 			(unsigned long long)sector, data_size);
1521 		return NULL;
1522 	}
1523 
1524 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1525 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1526 	 * which in turn might block on the other node at this very place.  */
1527 	peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
1528 	if (!peer_req)
1529 		return NULL;
1530 
1531 	if (!data_size)
1532 		return peer_req;
1533 
1534 	ds = data_size;
1535 	page = peer_req->pages;
1536 	page_chain_for_each(page) {
1537 		unsigned len = min_t(int, ds, PAGE_SIZE);
1538 		data = kmap(page);
1539 		err = drbd_recv_all_warn(mdev->tconn, data, len);
1540 		if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1541 			dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1542 			data[0] = data[0] ^ (unsigned long)-1;
1543 		}
1544 		kunmap(page);
1545 		if (err) {
1546 			drbd_free_peer_req(mdev, peer_req);
1547 			return NULL;
1548 		}
1549 		ds -= len;
1550 	}
1551 
1552 	if (dgs) {
1553 		drbd_csum_ee(mdev, mdev->tconn->peer_integrity_tfm, peer_req, dig_vv);
1554 		if (memcmp(dig_in, dig_vv, dgs)) {
1555 			dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1556 				(unsigned long long)sector, data_size);
1557 			drbd_free_peer_req(mdev, peer_req);
1558 			return NULL;
1559 		}
1560 	}
1561 	mdev->recv_cnt += data_size>>9;
1562 	return peer_req;
1563 }
1564 
1565 /* drbd_drain_block() just takes a data block
1566  * out of the socket input buffer, and discards it.
1567  */
1568 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1569 {
1570 	struct page *page;
1571 	int err = 0;
1572 	void *data;
1573 
1574 	if (!data_size)
1575 		return 0;
1576 
	page = drbd_alloc_pages(mdev, 1, 1);
	if (!page) /* only happens if we got signalled while waiting */
		return -ENOMEM;
1578 
1579 	data = kmap(page);
1580 	while (data_size) {
1581 		unsigned int len = min_t(int, data_size, PAGE_SIZE);
1582 
1583 		err = drbd_recv_all_warn(mdev->tconn, data, len);
1584 		if (err)
1585 			break;
1586 		data_size -= len;
1587 	}
1588 	kunmap(page);
1589 	drbd_free_pages(mdev, page, 0);
1590 	return err;
1591 }
1592 
1593 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1594 			   sector_t sector, int data_size)
1595 {
1596 	struct bio_vec *bvec;
1597 	struct bio *bio;
1598 	int dgs, err, i, expect;
1599 	void *dig_in = mdev->tconn->int_dig_in;
1600 	void *dig_vv = mdev->tconn->int_dig_vv;
1601 
1602 	dgs = 0;
1603 	if (mdev->tconn->peer_integrity_tfm) {
1604 		dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1605 		err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1606 		if (err)
1607 			return err;
1608 		data_size -= dgs;
1609 	}
1610 
1611 	/* optimistically update recv_cnt.  if receiving fails below,
	 * we disconnect anyway, and counters will be reset. */
1613 	mdev->recv_cnt += data_size>>9;
1614 
1615 	bio = req->master_bio;
1616 	D_ASSERT(sector == bio->bi_sector);
1617 
1618 	bio_for_each_segment(bvec, bio, i) {
1619 		void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1620 		expect = min_t(int, data_size, bvec->bv_len);
1621 		err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1622 		kunmap(bvec->bv_page);
1623 		if (err)
1624 			return err;
1625 		data_size -= expect;
1626 	}
1627 
1628 	if (dgs) {
1629 		drbd_csum_bio(mdev, mdev->tconn->peer_integrity_tfm, bio, dig_vv);
1630 		if (memcmp(dig_in, dig_vv, dgs)) {
1631 			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1632 			return -EINVAL;
1633 		}
1634 	}
1635 
1636 	D_ASSERT(data_size == 0);
1637 	return 0;
1638 }
1639 
1640 /*
1641  * e_end_resync_block() is called in asender context via
1642  * drbd_finish_peer_reqs().
1643  */
1644 static int e_end_resync_block(struct drbd_work *w, int unused)
1645 {
1646 	struct drbd_peer_request *peer_req =
1647 		container_of(w, struct drbd_peer_request, w);
1648 	struct drbd_conf *mdev = w->mdev;
1649 	sector_t sector = peer_req->i.sector;
1650 	int err;
1651 
1652 	D_ASSERT(drbd_interval_empty(&peer_req->i));
1653 
1654 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1655 		drbd_set_in_sync(mdev, sector, peer_req->i.size);
1656 		err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1657 	} else {
1658 		/* Record failure to sync */
1659 		drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1660 
1661 		err  = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1662 	}
1663 	dec_unacked(mdev);
1664 
1665 	return err;
1666 }
1667 
1668 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1669 {
1670 	struct drbd_peer_request *peer_req;
1671 
1672 	peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1673 	if (!peer_req)
1674 		goto fail;
1675 
1676 	dec_rs_pending(mdev);
1677 
1678 	inc_unacked(mdev);
1679 	/* corresponding dec_unacked() in e_end_resync_block()
1680 	 * respective _drbd_clear_done_ee */
1681 
1682 	peer_req->w.cb = e_end_resync_block;
1683 
1684 	spin_lock_irq(&mdev->tconn->req_lock);
1685 	list_add(&peer_req->w.list, &mdev->sync_ee);
1686 	spin_unlock_irq(&mdev->tconn->req_lock);
1687 
1688 	atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1689 	if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1690 		return 0;
1691 
1692 	/* don't care for the reason here */
1693 	dev_err(DEV, "submit failed, triggering re-connect\n");
1694 	spin_lock_irq(&mdev->tconn->req_lock);
1695 	list_del(&peer_req->w.list);
1696 	spin_unlock_irq(&mdev->tconn->req_lock);
1697 
1698 	drbd_free_peer_req(mdev, peer_req);
1699 fail:
1700 	put_ldev(mdev);
1701 	return -EIO;
1702 }
1703 
1704 static struct drbd_request *
1705 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1706 	     sector_t sector, bool missing_ok, const char *func)
1707 {
1708 	struct drbd_request *req;
1709 
1710 	/* Request object according to our peer */
1711 	req = (struct drbd_request *)(unsigned long)id;
1712 	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1713 		return req;
1714 	if (!missing_ok) {
1715 		dev_err(DEV, "%s: failed to find request 0x%lx, sector %llus\n", func,
1716 			(unsigned long)id, (unsigned long long)sector);
1717 	}
1718 	return NULL;
1719 }
1720 
1721 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1722 {
1723 	struct drbd_conf *mdev;
1724 	struct drbd_request *req;
1725 	sector_t sector;
1726 	int err;
1727 	struct p_data *p = pi->data;
1728 
1729 	mdev = vnr_to_mdev(tconn, pi->vnr);
1730 	if (!mdev)
1731 		return -EIO;
1732 
1733 	sector = be64_to_cpu(p->sector);
1734 
1735 	spin_lock_irq(&mdev->tconn->req_lock);
1736 	req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1737 	spin_unlock_irq(&mdev->tconn->req_lock);
1738 	if (unlikely(!req))
1739 		return -EIO;
1740 
1741 	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1742 	 * special casing it there for the various failure cases.
1743 	 * still no race with drbd_fail_pending_reads */
1744 	err = recv_dless_read(mdev, req, sector, pi->size);
1745 	if (!err)
1746 		req_mod(req, DATA_RECEIVED);
1747 	/* else: nothing. handled from drbd_disconnect...
1748 	 * I don't think we may complete this just yet
1749 	 * in case we are "on-disconnect: freeze" */
1750 
1751 	return err;
1752 }
1753 
1754 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1755 {
1756 	struct drbd_conf *mdev;
1757 	sector_t sector;
1758 	int err;
1759 	struct p_data *p = pi->data;
1760 
1761 	mdev = vnr_to_mdev(tconn, pi->vnr);
1762 	if (!mdev)
1763 		return -EIO;
1764 
1765 	sector = be64_to_cpu(p->sector);
1766 	D_ASSERT(p->block_id == ID_SYNCER);
1767 
1768 	if (get_ldev(mdev)) {
1769 		/* data is submitted to disk within recv_resync_read.
1770 		 * corresponding put_ldev done below on error,
1771 		 * or in drbd_peer_request_endio. */
1772 		err = recv_resync_read(mdev, sector, pi->size);
1773 	} else {
1774 		if (__ratelimit(&drbd_ratelimit_state))
1775 			dev_err(DEV, "Can not write resync data to local disk.\n");
1776 
1777 		err = drbd_drain_block(mdev, pi->size);
1778 
1779 		drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1780 	}
1781 
1782 	atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1783 
1784 	return err;
1785 }
1786 
1787 static void restart_conflicting_writes(struct drbd_conf *mdev,
1788 				       sector_t sector, int size)
1789 {
1790 	struct drbd_interval *i;
1791 	struct drbd_request *req;
1792 
1793 	drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1794 		if (!i->local)
1795 			continue;
1796 		req = container_of(i, struct drbd_request, i);
1797 		if (req->rq_state & RQ_LOCAL_PENDING ||
1798 		    !(req->rq_state & RQ_POSTPONED))
1799 			continue;
1800 		/* as it is RQ_POSTPONED, this will cause it to
1801 		 * be queued on the retry workqueue. */
1802 		__req_mod(req, CONFLICT_RESOLVED, NULL);
1803 	}
1804 }
1805 
1806 /*
1807  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1808  */
1809 static int e_end_block(struct drbd_work *w, int cancel)
1810 {
1811 	struct drbd_peer_request *peer_req =
1812 		container_of(w, struct drbd_peer_request, w);
1813 	struct drbd_conf *mdev = w->mdev;
1814 	sector_t sector = peer_req->i.sector;
1815 	int err = 0, pcmd;
1816 
1817 	if (peer_req->flags & EE_SEND_WRITE_ACK) {
1818 		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1819 			pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1820 				mdev->state.conn <= C_PAUSED_SYNC_T &&
1821 				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1822 				P_RS_WRITE_ACK : P_WRITE_ACK;
1823 			err = drbd_send_ack(mdev, pcmd, peer_req);
1824 			if (pcmd == P_RS_WRITE_ACK)
1825 				drbd_set_in_sync(mdev, sector, peer_req->i.size);
1826 		} else {
1827 			err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
			/* we expect it to be marked out of sync anyway...
1829 			 * maybe assert this?  */
1830 		}
1831 		dec_unacked(mdev);
1832 	}
1833 	/* we delete from the conflict detection hash _after_ we sent out the
1834 	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1835 	if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1836 		spin_lock_irq(&mdev->tconn->req_lock);
1837 		D_ASSERT(!drbd_interval_empty(&peer_req->i));
1838 		drbd_remove_epoch_entry_interval(mdev, peer_req);
1839 		if (peer_req->flags & EE_RESTART_REQUESTS)
1840 			restart_conflicting_writes(mdev, sector, peer_req->i.size);
1841 		spin_unlock_irq(&mdev->tconn->req_lock);
1842 	} else
1843 		D_ASSERT(drbd_interval_empty(&peer_req->i));
1844 
1845 	drbd_may_finish_epoch(mdev->tconn, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1846 
1847 	return err;
1848 }
1849 
1850 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1851 {
1852 	struct drbd_conf *mdev = w->mdev;
1853 	struct drbd_peer_request *peer_req =
1854 		container_of(w, struct drbd_peer_request, w);
1855 	int err;
1856 
1857 	err = drbd_send_ack(mdev, ack, peer_req);
1858 	dec_unacked(mdev);
1859 
1860 	return err;
1861 }
1862 
1863 static int e_send_superseded(struct drbd_work *w, int unused)
1864 {
1865 	return e_send_ack(w, P_SUPERSEDED);
1866 }
1867 
1868 static int e_send_retry_write(struct drbd_work *w, int unused)
1869 {
1870 	struct drbd_tconn *tconn = w->mdev->tconn;
1871 
1872 	return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1873 			     P_RETRY_WRITE : P_SUPERSEDED);
1874 }
1875 
1876 static bool seq_greater(u32 a, u32 b)
1877 {
1878 	/*
1879 	 * We assume 32-bit wrap-around here.
1880 	 * For 24-bit wrap-around, we would have to shift:
1881 	 *  a <<= 8; b <<= 8;
1882 	 */
1883 	return (s32)a - (s32)b > 0;
1884 }
1885 
1886 static u32 seq_max(u32 a, u32 b)
1887 {
1888 	return seq_greater(a, b) ? a : b;
1889 }
1890 
1891 static bool need_peer_seq(struct drbd_conf *mdev)
1892 {
1893 	struct drbd_tconn *tconn = mdev->tconn;
1894 	int tp;
1895 
1896 	/*
1897 	 * We only need to keep track of the last packet_seq number of our peer
1898 	 * if we are in dual-primary mode and we have the resolve-conflicts flag set; see
1899 	 * handle_write_conflicts().
1900 	 */
1901 
1902 	rcu_read_lock();
1903 	tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
1904 	rcu_read_unlock();
1905 
1906 	return tp && test_bit(RESOLVE_CONFLICTS, &tconn->flags);
1907 }
1908 
1909 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1910 {
1911 	unsigned int newest_peer_seq;
1912 
1913 	if (need_peer_seq(mdev)) {
1914 		spin_lock(&mdev->peer_seq_lock);
1915 		newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1916 		mdev->peer_seq = newest_peer_seq;
1917 		spin_unlock(&mdev->peer_seq_lock);
1918 		/* wake up only if we actually changed mdev->peer_seq */
1919 		if (peer_seq == newest_peer_seq)
1920 			wake_up(&mdev->seq_wait);
1921 	}
1922 }
1923 
1924 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
1925 {
1926 	return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
1927 }
1928 
1929 /* maybe change sync_ee into interval trees as well? */
1930 static bool overlapping_resync_write(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
1931 {
1932 	struct drbd_peer_request *rs_req;
	bool rv = false;
1934 
1935 	spin_lock_irq(&mdev->tconn->req_lock);
1936 	list_for_each_entry(rs_req, &mdev->sync_ee, w.list) {
1937 		if (overlaps(peer_req->i.sector, peer_req->i.size,
1938 			     rs_req->i.sector, rs_req->i.size)) {
			rv = true;
1940 			break;
1941 		}
1942 	}
1943 	spin_unlock_irq(&mdev->tconn->req_lock);
1944 
1945 	return rv;
1946 }
1947 
1948 /* Called from receive_Data.
1949  * Synchronize packets on sock with packets on msock.
1950  *
1951  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1952  * packet traveling on msock, they are still processed in the order they have
1953  * been sent.
1954  *
1955  * Note: we don't care for Ack packets overtaking P_DATA packets.
1956  *
1957  * In case packet_seq is larger than mdev->peer_seq number, there are
1958  * outstanding packets on the msock. We wait for them to arrive.
1959  * In case we are the logically next packet, we update mdev->peer_seq
1960  * ourselves. Correctly handles 32bit wrap around.
1961  *
1962  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1963  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
 * for the 24bit wrap (historical atomic_t guarantee on some archs), and
 * 1<<11 == 2048 seconds, aka ages, for the 32bit wrap around...
1966  *
1967  * returns 0 if we may process the packet,
1968  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1969 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1970 {
1971 	DEFINE_WAIT(wait);
1972 	long timeout;
1973 	int ret;
1974 
1975 	if (!need_peer_seq(mdev))
1976 		return 0;
1977 
1978 	spin_lock(&mdev->peer_seq_lock);
1979 	for (;;) {
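		/* If peer_seq - 1 is not "greater" than mdev->peer_seq, every
		 * packet logically preceding this one has already been
		 * accounted for, so this is the next packet in sequence and
		 * we may proceed (recording the new maximum below). */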
1980 		if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1981 			mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1982 			ret = 0;
1983 			break;
1984 		}
1985 		if (signal_pending(current)) {
1986 			ret = -ERESTARTSYS;
1987 			break;
1988 		}
1989 		prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1990 		spin_unlock(&mdev->peer_seq_lock);
1991 		rcu_read_lock();
1992 		timeout = rcu_dereference(mdev->tconn->net_conf)->ping_timeo*HZ/10;
1993 		rcu_read_unlock();
1994 		timeout = schedule_timeout(timeout);
1995 		spin_lock(&mdev->peer_seq_lock);
1996 		if (!timeout) {
1997 			ret = -ETIMEDOUT;
1998 			dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
1999 			break;
2000 		}
2001 	}
2002 	spin_unlock(&mdev->peer_seq_lock);
2003 	finish_wait(&mdev->seq_wait, &wait);
2004 	return ret;
2005 }
2006 
/* See also bio_flags_to_wire().
 * The DP_* wire flags are mapped to bio REQ_* flags semantically (and back
 * in bio_flags_to_wire()), because we may replicate to peers running other
 * kernel versions, where the numeric flag values may differ. */
2010 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
2011 {
2012 	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2013 		(dpf & DP_FUA ? REQ_FUA : 0) |
2014 		(dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2015 		(dpf & DP_DISCARD ? REQ_DISCARD : 0);
2016 }
2017 
2018 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
2019 				    unsigned int size)
2020 {
2021 	struct drbd_interval *i;
2022 
2023     repeat:
2024 	drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
2025 		struct drbd_request *req;
2026 		struct bio_and_error m;
2027 
2028 		if (!i->local)
2029 			continue;
2030 		req = container_of(i, struct drbd_request, i);
2031 		if (!(req->rq_state & RQ_POSTPONED))
2032 			continue;
2033 		req->rq_state &= ~RQ_POSTPONED;
2034 		__req_mod(req, NEG_ACKED, &m);
2035 		spin_unlock_irq(&mdev->tconn->req_lock);
2036 		if (m.bio)
2037 			complete_master_bio(mdev, &m);
2038 		spin_lock_irq(&mdev->tconn->req_lock);
2039 		goto repeat;
2040 	}
2041 }
2042 
2043 static int handle_write_conflicts(struct drbd_conf *mdev,
2044 				  struct drbd_peer_request *peer_req)
2045 {
2046 	struct drbd_tconn *tconn = mdev->tconn;
2047 	bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &tconn->flags);
2048 	sector_t sector = peer_req->i.sector;
2049 	const unsigned int size = peer_req->i.size;
2050 	struct drbd_interval *i;
2051 	bool equal;
2052 	int err;
2053 
2054 	/*
2055 	 * Inserting the peer request into the write_requests tree will prevent
2056 	 * new conflicting local requests from being added.
2057 	 */
2058 	drbd_insert_interval(&mdev->write_requests, &peer_req->i);
2059 
2060     repeat:
2061 	drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
2062 		if (i == &peer_req->i)
2063 			continue;
2064 
2065 		if (!i->local) {
2066 			/*
2067 			 * Our peer has sent a conflicting remote request; this
2068 			 * should not happen in a two-node setup.  Wait for the
2069 			 * earlier peer request to complete.
2070 			 */
2071 			err = drbd_wait_misc(mdev, i);
2072 			if (err)
2073 				goto out;
2074 			goto repeat;
2075 		}
2076 
2077 		equal = i->sector == sector && i->size == size;
2078 		if (resolve_conflicts) {
2079 			/*
2080 			 * If the peer request is fully contained within the
2081 			 * overlapping request, it can be considered overwritten
2082 			 * and thus superseded; otherwise, it will be retried
2083 			 * once all overlapping requests have completed.
2084 			 */
2085 			bool superseded = i->sector <= sector && i->sector +
2086 				       (i->size >> 9) >= sector + (size >> 9);
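
			/* Example: a local write covering sectors 0..15 fully
			 * contains a peer write covering sectors 4..11, so
			 * that peer write is superseded; a peer write covering
			 * sectors 12..19 would only partially overlap and
			 * would be retried instead. */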
2087 
2088 			if (!equal)
2089 				dev_alert(DEV, "Concurrent writes detected: "
2090 					       "local=%llus +%u, remote=%llus +%u, "
2091 					       "assuming %s came first\n",
2092 					  (unsigned long long)i->sector, i->size,
2093 					  (unsigned long long)sector, size,
2094 					  superseded ? "local" : "remote");
2095 
2096 			inc_unacked(mdev);
2097 			peer_req->w.cb = superseded ? e_send_superseded :
2098 						   e_send_retry_write;
2099 			list_add_tail(&peer_req->w.list, &mdev->done_ee);
2100 			wake_asender(mdev->tconn);
2101 
2102 			err = -ENOENT;
2103 			goto out;
2104 		} else {
2105 			struct drbd_request *req =
2106 				container_of(i, struct drbd_request, i);
2107 
2108 			if (!equal)
2109 				dev_alert(DEV, "Concurrent writes detected: "
2110 					       "local=%llus +%u, remote=%llus +%u\n",
2111 					  (unsigned long long)i->sector, i->size,
2112 					  (unsigned long long)sector, size);
2113 
2114 			if (req->rq_state & RQ_LOCAL_PENDING ||
2115 			    !(req->rq_state & RQ_POSTPONED)) {
2116 				/*
2117 				 * Wait for the node with the discard flag to
2118 				 * decide if this request has been superseded
2119 				 * or needs to be retried.
2120 				 * Requests that have been superseded will
2121 				 * disappear from the write_requests tree.
2122 				 *
2123 				 * In addition, wait for the conflicting
2124 				 * request to finish locally before submitting
2125 				 * the conflicting peer request.
2126 				 */
2127 				err = drbd_wait_misc(mdev, &req->i);
2128 				if (err) {
2129 					_conn_request_state(mdev->tconn,
2130 							    NS(conn, C_TIMEOUT),
2131 							    CS_HARD);
2132 					fail_postponed_requests(mdev, sector, size);
2133 					goto out;
2134 				}
2135 				goto repeat;
2136 			}
2137 			/*
2138 			 * Remember to restart the conflicting requests after
2139 			 * the new peer request has completed.
2140 			 */
2141 			peer_req->flags |= EE_RESTART_REQUESTS;
2142 		}
2143 	}
2144 	err = 0;
2145 
2146     out:
2147 	if (err)
2148 		drbd_remove_epoch_entry_interval(mdev, peer_req);
2149 	return err;
2150 }
2151 
/* mirrored write: receive a P_DATA packet, i.e. a write replicated from the peer */
2153 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
2154 {
2155 	struct drbd_conf *mdev;
2156 	sector_t sector;
2157 	struct drbd_peer_request *peer_req;
2158 	struct p_data *p = pi->data;
2159 	u32 peer_seq = be32_to_cpu(p->seq_num);
2160 	int rw = WRITE;
2161 	u32 dp_flags;
2162 	int err, tp;
2163 
2164 	mdev = vnr_to_mdev(tconn, pi->vnr);
2165 	if (!mdev)
2166 		return -EIO;
2167 
2168 	if (!get_ldev(mdev)) {
2169 		int err2;
2170 
2171 		err = wait_for_and_update_peer_seq(mdev, peer_seq);
2172 		drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2173 		atomic_inc(&tconn->current_epoch->epoch_size);
2174 		err2 = drbd_drain_block(mdev, pi->size);
2175 		if (!err)
2176 			err = err2;
2177 		return err;
2178 	}
2179 
2180 	/*
2181 	 * Corresponding put_ldev done either below (on various errors), or in
2182 	 * drbd_peer_request_endio, if we successfully submit the data at the
2183 	 * end of this function.
2184 	 */
2185 
2186 	sector = be64_to_cpu(p->sector);
2187 	peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2188 	if (!peer_req) {
2189 		put_ldev(mdev);
2190 		return -EIO;
2191 	}
2192 
2193 	peer_req->w.cb = e_end_block;
2194 
2195 	dp_flags = be32_to_cpu(p->dp_flags);
2196 	rw |= wire_flags_to_bio(mdev, dp_flags);
2197 	if (peer_req->pages == NULL) {
2198 		D_ASSERT(peer_req->i.size == 0);
2199 		D_ASSERT(dp_flags & DP_FLUSH);
2200 	}
2201 
2202 	if (dp_flags & DP_MAY_SET_IN_SYNC)
2203 		peer_req->flags |= EE_MAY_SET_IN_SYNC;
2204 
2205 	spin_lock(&tconn->epoch_lock);
2206 	peer_req->epoch = tconn->current_epoch;
2207 	atomic_inc(&peer_req->epoch->epoch_size);
2208 	atomic_inc(&peer_req->epoch->active);
2209 	spin_unlock(&tconn->epoch_lock);
2210 
2211 	rcu_read_lock();
2212 	tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
2213 	rcu_read_unlock();
2214 	if (tp) {
2215 		peer_req->flags |= EE_IN_INTERVAL_TREE;
2216 		err = wait_for_and_update_peer_seq(mdev, peer_seq);
2217 		if (err)
2218 			goto out_interrupted;
2219 		spin_lock_irq(&mdev->tconn->req_lock);
2220 		err = handle_write_conflicts(mdev, peer_req);
2221 		if (err) {
2222 			spin_unlock_irq(&mdev->tconn->req_lock);
2223 			if (err == -ENOENT) {
2224 				put_ldev(mdev);
2225 				return 0;
2226 			}
2227 			goto out_interrupted;
2228 		}
2229 	} else
2230 		spin_lock_irq(&mdev->tconn->req_lock);
2231 	list_add(&peer_req->w.list, &mdev->active_ee);
2232 	spin_unlock_irq(&mdev->tconn->req_lock);
2233 
2234 	if (mdev->state.conn == C_SYNC_TARGET)
2235 		wait_event(mdev->ee_wait, !overlapping_resync_write(mdev, peer_req));
2236 
2237 	if (mdev->tconn->agreed_pro_version < 100) {
2238 		rcu_read_lock();
2239 		switch (rcu_dereference(mdev->tconn->net_conf)->wire_protocol) {
2240 		case DRBD_PROT_C:
2241 			dp_flags |= DP_SEND_WRITE_ACK;
2242 			break;
2243 		case DRBD_PROT_B:
2244 			dp_flags |= DP_SEND_RECEIVE_ACK;
2245 			break;
2246 		}
2247 		rcu_read_unlock();
2248 	}
2249 
2250 	if (dp_flags & DP_SEND_WRITE_ACK) {
2251 		peer_req->flags |= EE_SEND_WRITE_ACK;
2252 		inc_unacked(mdev);
2253 		/* corresponding dec_unacked() in e_end_block()
2254 		 * respective _drbd_clear_done_ee */
2255 	}
2256 
2257 	if (dp_flags & DP_SEND_RECEIVE_ACK) {
2258 		/* I really don't like it that the receiver thread
2259 		 * sends on the msock, but anyways */
2260 		drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2261 	}
2262 
2263 	if (mdev->state.pdsk < D_INCONSISTENT) {
		/* In case we have the only disk of the cluster, mark the
		 * affected blocks out of sync and track the write in our
		 * activity log. */
2265 		drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2266 		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2267 		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2268 		drbd_al_begin_io(mdev, &peer_req->i);
2269 	}
2270 
2271 	err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2272 	if (!err)
2273 		return 0;
2274 
2275 	/* don't care for the reason here */
2276 	dev_err(DEV, "submit failed, triggering re-connect\n");
2277 	spin_lock_irq(&mdev->tconn->req_lock);
2278 	list_del(&peer_req->w.list);
2279 	drbd_remove_epoch_entry_interval(mdev, peer_req);
2280 	spin_unlock_irq(&mdev->tconn->req_lock);
2281 	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2282 		drbd_al_complete_io(mdev, &peer_req->i);
2283 
2284 out_interrupted:
2285 	drbd_may_finish_epoch(tconn, peer_req->epoch, EV_PUT + EV_CLEANUP);
2286 	put_ldev(mdev);
2287 	drbd_free_peer_req(mdev, peer_req);
2288 	return err;
2289 }
2290 
2291 /* We may throttle resync, if the lower device seems to be busy,
2292  * and current sync rate is above c_min_rate.
2293  *
 * To decide whether or not the lower device is busy, we use a scheme similar
 * to MD RAID's is_mddev_idle(): if the partition stats reveal a "significant"
 * amount of activity (more than 64 sectors) that we cannot account for with
 * our own resync activity, it obviously is "busy".
2298  *
2299  * The current sync rate used here uses only the most recent two step marks,
2300  * to have a short time average so we can react faster.
2301  */
2302 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2303 {
2304 	struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2305 	unsigned long db, dt, dbdt;
2306 	struct lc_element *tmp;
2307 	int curr_events;
2308 	int throttle = 0;
2309 	unsigned int c_min_rate;
2310 
2311 	rcu_read_lock();
2312 	c_min_rate = rcu_dereference(mdev->ldev->disk_conf)->c_min_rate;
2313 	rcu_read_unlock();
2314 
2315 	/* feature disabled? */
2316 	if (c_min_rate == 0)
2317 		return 0;
2318 
2319 	spin_lock_irq(&mdev->al_lock);
2320 	tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2321 	if (tmp) {
2322 		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2323 		if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2324 			spin_unlock_irq(&mdev->al_lock);
2325 			return 0;
2326 		}
2327 		/* Do not slow down if app IO is already waiting for this extent */
2328 	}
2329 	spin_unlock_irq(&mdev->al_lock);
2330 
2331 	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2332 		      (int)part_stat_read(&disk->part0, sectors[1]) -
2333 			atomic_read(&mdev->rs_sect_ev);
2334 
2335 	if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2336 		unsigned long rs_left;
2337 		int i;
2338 
2339 		mdev->rs_last_events = curr_events;
2340 
2341 		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2342 		 * approx. */
2343 		i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2344 
2345 		if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2346 			rs_left = mdev->ov_left;
2347 		else
2348 			rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2349 
2350 		dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2351 		if (!dt)
2352 			dt++;
2353 		db = mdev->rs_mark_left[i] - rs_left;
2354 		dbdt = Bit2KB(db/dt);
2355 
2356 		if (dbdt > c_min_rate)
2357 			throttle = 1;
2358 	}
2359 	return throttle;
2360 }
2361 
2362 
2363 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2364 {
2365 	struct drbd_conf *mdev;
2366 	sector_t sector;
2367 	sector_t capacity;
2368 	struct drbd_peer_request *peer_req;
2369 	struct digest_info *di = NULL;
2370 	int size, verb;
2371 	unsigned int fault_type;
2372 	struct p_block_req *p =	pi->data;
2373 
2374 	mdev = vnr_to_mdev(tconn, pi->vnr);
2375 	if (!mdev)
2376 		return -EIO;
2377 	capacity = drbd_get_capacity(mdev->this_bdev);
2378 
2379 	sector = be64_to_cpu(p->sector);
2380 	size   = be32_to_cpu(p->blksize);
2381 
2382 	if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2383 		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2384 				(unsigned long long)sector, size);
2385 		return -EINVAL;
2386 	}
2387 	if (sector + (size>>9) > capacity) {
2388 		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2389 				(unsigned long long)sector, size);
2390 		return -EINVAL;
2391 	}
2392 
2393 	if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2394 		verb = 1;
2395 		switch (pi->cmd) {
2396 		case P_DATA_REQUEST:
2397 			drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2398 			break;
2399 		case P_RS_DATA_REQUEST:
2400 		case P_CSUM_RS_REQUEST:
2401 		case P_OV_REQUEST:
2402 			drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2403 			break;
2404 		case P_OV_REPLY:
2405 			verb = 0;
2406 			dec_rs_pending(mdev);
2407 			drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2408 			break;
2409 		default:
2410 			BUG();
2411 		}
2412 		if (verb && __ratelimit(&drbd_ratelimit_state))
2413 			dev_err(DEV, "Can not satisfy peer's read request, "
2414 			    "no local data.\n");
2415 
		/* drain the payload, if any */
2417 		return drbd_drain_block(mdev, pi->size);
2418 	}
2419 
2420 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2421 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
2422 	 * which in turn might block on the other node at this very place.  */
2423 	peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2424 	if (!peer_req) {
2425 		put_ldev(mdev);
2426 		return -ENOMEM;
2427 	}
2428 
2429 	switch (pi->cmd) {
2430 	case P_DATA_REQUEST:
2431 		peer_req->w.cb = w_e_end_data_req;
2432 		fault_type = DRBD_FAULT_DT_RD;
2433 		/* application IO, don't drbd_rs_begin_io */
2434 		goto submit;
2435 
2436 	case P_RS_DATA_REQUEST:
2437 		peer_req->w.cb = w_e_end_rsdata_req;
2438 		fault_type = DRBD_FAULT_RS_RD;
2439 		/* used in the sector offset progress display */
2440 		mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2441 		break;
2442 
2443 	case P_OV_REPLY:
2444 	case P_CSUM_RS_REQUEST:
2445 		fault_type = DRBD_FAULT_RS_RD;
2446 		di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2447 		if (!di)
2448 			goto out_free_e;
2449 
2450 		di->digest_size = pi->size;
2451 		di->digest = (((char *)di)+sizeof(struct digest_info));
2452 
2453 		peer_req->digest = di;
2454 		peer_req->flags |= EE_HAS_DIGEST;
2455 
2456 		if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2457 			goto out_free_e;
2458 
2459 		if (pi->cmd == P_CSUM_RS_REQUEST) {
2460 			D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2461 			peer_req->w.cb = w_e_end_csum_rs_req;
2462 			/* used in the sector offset progress display */
2463 			mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2464 		} else if (pi->cmd == P_OV_REPLY) {
2465 			/* track progress, we may need to throttle */
2466 			atomic_add(size >> 9, &mdev->rs_sect_in);
2467 			peer_req->w.cb = w_e_end_ov_reply;
2468 			dec_rs_pending(mdev);
2469 			/* drbd_rs_begin_io done when we sent this request,
2470 			 * but accounting still needs to be done. */
2471 			goto submit_for_resync;
2472 		}
2473 		break;
2474 
2475 	case P_OV_REQUEST:
2476 		if (mdev->ov_start_sector == ~(sector_t)0 &&
2477 		    mdev->tconn->agreed_pro_version >= 90) {
2478 			unsigned long now = jiffies;
2479 			int i;
2480 			mdev->ov_start_sector = sector;
2481 			mdev->ov_position = sector;
2482 			mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2483 			mdev->rs_total = mdev->ov_left;
2484 			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2485 				mdev->rs_mark_left[i] = mdev->ov_left;
2486 				mdev->rs_mark_time[i] = now;
2487 			}
2488 			dev_info(DEV, "Online Verify start sector: %llu\n",
2489 					(unsigned long long)sector);
2490 		}
2491 		peer_req->w.cb = w_e_end_ov_req;
2492 		fault_type = DRBD_FAULT_RS_RD;
2493 		break;
2494 
2495 	default:
2496 		BUG();
2497 	}
2498 
2499 	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
2500 	 * wrt the receiver, but it is not as straightforward as it may seem.
2501 	 * Various places in the resync start and stop logic assume resync
	 * requests are processed in order; requeuing this on the worker thread
	 * would introduce a bunch of new code for synchronization between threads.
2504 	 *
2505 	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2506 	 * "forever", throttling after drbd_rs_begin_io will lock that extent
2507 	 * for application writes for the same time.  For now, just throttle
2508 	 * here, where the rest of the code expects the receiver to sleep for
2509 	 * a while, anyways.
2510 	 */
2511 
2512 	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
2513 	 * this defers syncer requests for some time, before letting at least
	 * one request through.  The resync controller on the receiving side
2515 	 * will adapt to the incoming rate accordingly.
2516 	 *
2517 	 * We cannot throttle here if remote is Primary/SyncTarget:
2518 	 * we would also throttle its application reads.
2519 	 * In that case, throttling is done on the SyncTarget only.
2520 	 */
2521 	if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2522 		schedule_timeout_uninterruptible(HZ/10);
2523 	if (drbd_rs_begin_io(mdev, sector))
2524 		goto out_free_e;
2525 
2526 submit_for_resync:
2527 	atomic_add(size >> 9, &mdev->rs_sect_ev);
2528 
2529 submit:
2530 	inc_unacked(mdev);
2531 	spin_lock_irq(&mdev->tconn->req_lock);
2532 	list_add_tail(&peer_req->w.list, &mdev->read_ee);
2533 	spin_unlock_irq(&mdev->tconn->req_lock);
2534 
2535 	if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2536 		return 0;
2537 
2538 	/* don't care for the reason here */
2539 	dev_err(DEV, "submit failed, triggering re-connect\n");
2540 	spin_lock_irq(&mdev->tconn->req_lock);
2541 	list_del(&peer_req->w.list);
2542 	spin_unlock_irq(&mdev->tconn->req_lock);
2543 	/* no drbd_rs_complete_io(), we are dropping the connection anyways */
2544 
2545 out_free_e:
2546 	put_ldev(mdev);
2547 	drbd_free_peer_req(mdev, peer_req);
2548 	return -EIO;
2549 }
2550 
2551 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2552 {
2553 	int self, peer, rv = -100;
2554 	unsigned long ch_self, ch_peer;
2555 	enum drbd_after_sb_p after_sb_0p;
2556 
2557 	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2558 	peer = mdev->p_uuid[UI_BITMAP] & 1;
2559 
2560 	ch_peer = mdev->p_uuid[UI_SIZE];
2561 	ch_self = mdev->comm_bm_set;
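	/* "ch" is the number of changed (out-of-sync) bitmap bits each side
	 * communicated when the connection was established; it is only
	 * consulted by the discard-least-changes / discard-zero-changes
	 * policies below. */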
2562 
2563 	rcu_read_lock();
2564 	after_sb_0p = rcu_dereference(mdev->tconn->net_conf)->after_sb_0p;
2565 	rcu_read_unlock();
2566 	switch (after_sb_0p) {
2567 	case ASB_CONSENSUS:
2568 	case ASB_DISCARD_SECONDARY:
2569 	case ASB_CALL_HELPER:
2570 	case ASB_VIOLENTLY:
2571 		dev_err(DEV, "Configuration error.\n");
2572 		break;
2573 	case ASB_DISCONNECT:
2574 		break;
2575 	case ASB_DISCARD_YOUNGER_PRI:
2576 		if (self == 0 && peer == 1) {
2577 			rv = -1;
2578 			break;
2579 		}
2580 		if (self == 1 && peer == 0) {
2581 			rv =  1;
2582 			break;
2583 		}
2584 		/* Else fall through to one of the other strategies... */
2585 	case ASB_DISCARD_OLDER_PRI:
2586 		if (self == 0 && peer == 1) {
2587 			rv = 1;
2588 			break;
2589 		}
2590 		if (self == 1 && peer == 0) {
2591 			rv = -1;
2592 			break;
2593 		}
2594 		/* Else fall through to one of the other strategies... */
2595 		dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2596 		     "Using discard-least-changes instead\n");
2597 	case ASB_DISCARD_ZERO_CHG:
2598 		if (ch_peer == 0 && ch_self == 0) {
2599 			rv = test_bit(RESOLVE_CONFLICTS, &mdev->tconn->flags)
2600 				? -1 : 1;
2601 			break;
2602 		} else {
2603 			if (ch_peer == 0) { rv =  1; break; }
2604 			if (ch_self == 0) { rv = -1; break; }
2605 		}
2606 		if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2607 			break;
2608 	case ASB_DISCARD_LEAST_CHG:
2609 		if	(ch_self < ch_peer)
2610 			rv = -1;
2611 		else if (ch_self > ch_peer)
2612 			rv =  1;
2613 		else /* ( ch_self == ch_peer ) */
2614 		     /* Well, then use something else. */
2615 			rv = test_bit(RESOLVE_CONFLICTS, &mdev->tconn->flags)
2616 				? -1 : 1;
2617 		break;
2618 	case ASB_DISCARD_LOCAL:
2619 		rv = -1;
2620 		break;
2621 	case ASB_DISCARD_REMOTE:
2622 		rv =  1;
2623 	}
2624 
2625 	return rv;
2626 }
2627 
2628 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2629 {
2630 	int hg, rv = -100;
2631 	enum drbd_after_sb_p after_sb_1p;
2632 
2633 	rcu_read_lock();
2634 	after_sb_1p = rcu_dereference(mdev->tconn->net_conf)->after_sb_1p;
2635 	rcu_read_unlock();
2636 	switch (after_sb_1p) {
2637 	case ASB_DISCARD_YOUNGER_PRI:
2638 	case ASB_DISCARD_OLDER_PRI:
2639 	case ASB_DISCARD_LEAST_CHG:
2640 	case ASB_DISCARD_LOCAL:
2641 	case ASB_DISCARD_REMOTE:
2642 	case ASB_DISCARD_ZERO_CHG:
2643 		dev_err(DEV, "Configuration error.\n");
2644 		break;
2645 	case ASB_DISCONNECT:
2646 		break;
2647 	case ASB_CONSENSUS:
2648 		hg = drbd_asb_recover_0p(mdev);
2649 		if (hg == -1 && mdev->state.role == R_SECONDARY)
2650 			rv = hg;
2651 		if (hg == 1  && mdev->state.role == R_PRIMARY)
2652 			rv = hg;
2653 		break;
2654 	case ASB_VIOLENTLY:
2655 		rv = drbd_asb_recover_0p(mdev);
2656 		break;
2657 	case ASB_DISCARD_SECONDARY:
2658 		return mdev->state.role == R_PRIMARY ? 1 : -1;
2659 	case ASB_CALL_HELPER:
2660 		hg = drbd_asb_recover_0p(mdev);
2661 		if (hg == -1 && mdev->state.role == R_PRIMARY) {
2662 			enum drbd_state_rv rv2;
2663 
2664 			drbd_set_role(mdev, R_SECONDARY, 0);
2665 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2666 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2667 			  * we do not need to wait for the after state change work either. */
2668 			rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2669 			if (rv2 != SS_SUCCESS) {
2670 				drbd_khelper(mdev, "pri-lost-after-sb");
2671 			} else {
2672 				dev_warn(DEV, "Successfully gave up primary role.\n");
2673 				rv = hg;
2674 			}
2675 		} else
2676 			rv = hg;
2677 	}
2678 
2679 	return rv;
2680 }
2681 
2682 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2683 {
2684 	int hg, rv = -100;
2685 	enum drbd_after_sb_p after_sb_2p;
2686 
2687 	rcu_read_lock();
2688 	after_sb_2p = rcu_dereference(mdev->tconn->net_conf)->after_sb_2p;
2689 	rcu_read_unlock();
2690 	switch (after_sb_2p) {
2691 	case ASB_DISCARD_YOUNGER_PRI:
2692 	case ASB_DISCARD_OLDER_PRI:
2693 	case ASB_DISCARD_LEAST_CHG:
2694 	case ASB_DISCARD_LOCAL:
2695 	case ASB_DISCARD_REMOTE:
2696 	case ASB_CONSENSUS:
2697 	case ASB_DISCARD_SECONDARY:
2698 	case ASB_DISCARD_ZERO_CHG:
2699 		dev_err(DEV, "Configuration error.\n");
2700 		break;
2701 	case ASB_VIOLENTLY:
2702 		rv = drbd_asb_recover_0p(mdev);
2703 		break;
2704 	case ASB_DISCONNECT:
2705 		break;
2706 	case ASB_CALL_HELPER:
2707 		hg = drbd_asb_recover_0p(mdev);
2708 		if (hg == -1) {
2709 			enum drbd_state_rv rv2;
2710 
2711 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2712 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2713 			  * we do not need to wait for the after state change work either. */
2714 			rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2715 			if (rv2 != SS_SUCCESS) {
2716 				drbd_khelper(mdev, "pri-lost-after-sb");
2717 			} else {
2718 				dev_warn(DEV, "Successfully gave up primary role.\n");
2719 				rv = hg;
2720 			}
2721 		} else
2722 			rv = hg;
2723 	}
2724 
2725 	return rv;
2726 }
2727 
2728 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2729 			   u64 bits, u64 flags)
2730 {
2731 	if (!uuid) {
2732 		dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2733 		return;
2734 	}
2735 	dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2736 	     text,
2737 	     (unsigned long long)uuid[UI_CURRENT],
2738 	     (unsigned long long)uuid[UI_BITMAP],
2739 	     (unsigned long long)uuid[UI_HISTORY_START],
2740 	     (unsigned long long)uuid[UI_HISTORY_END],
2741 	     (unsigned long long)bits,
2742 	     (unsigned long long)flags);
2743 }
2744 
2745 /*
2746   100	after split brain try auto recover
2747     2	C_SYNC_SOURCE set BitMap
2748     1	C_SYNC_SOURCE use BitMap
2749     0	no Sync
2750    -1	C_SYNC_TARGET use BitMap
2751    -2	C_SYNC_TARGET set BitMap
2752  -100	after split brain, disconnect
2753 -1000	unrelated data
2754 -1091   requires proto 91
2755 -1096   requires proto 96
2756  */
2757 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2758 {
2759 	u64 self, peer;
2760 	int i, j;
2761 
2762 	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2763 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2764 
2765 	*rule_nr = 10;
2766 	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2767 		return 0;
2768 
2769 	*rule_nr = 20;
2770 	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2771 	     peer != UUID_JUST_CREATED)
2772 		return -2;
2773 
2774 	*rule_nr = 30;
2775 	if (self != UUID_JUST_CREATED &&
2776 	    (peer == UUID_JUST_CREATED || peer == (u64)0))
2777 		return 2;
2778 
2779 	if (self == peer) {
2780 		int rct, dc; /* roles at crash time */
2781 
2782 		if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2783 
2784 			if (mdev->tconn->agreed_pro_version < 91)
2785 				return -1091;
2786 
2787 			if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2788 			    (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2789 				dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2790 				drbd_uuid_move_history(mdev);
2791 				mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
2792 				mdev->ldev->md.uuid[UI_BITMAP] = 0;
2793 
2794 				drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2795 					       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2796 				*rule_nr = 34;
2797 			} else {
2798 				dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2799 				*rule_nr = 36;
2800 			}
2801 
2802 			return 1;
2803 		}
2804 
2805 		if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2806 
2807 			if (mdev->tconn->agreed_pro_version < 91)
2808 				return -1091;
2809 
2810 			if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2811 			    (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2812 				dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2813 
2814 				mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2815 				mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2816 				mdev->p_uuid[UI_BITMAP] = 0UL;
2817 
2818 				drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2819 				*rule_nr = 35;
2820 			} else {
2821 				dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2822 				*rule_nr = 37;
2823 			}
2824 
2825 			return -1;
2826 		}
2827 
2828 		/* Common power [off|failure] */
2829 		rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2830 			(mdev->p_uuid[UI_FLAGS] & 2);
2831 		/* lowest bit is set when we were primary,
2832 		 * next bit (weight 2) is set when peer was primary */
2833 		*rule_nr = 40;
2834 
2835 		switch (rct) {
2836 		case 0: /* !self_pri && !peer_pri */ return 0;
2837 		case 1: /*  self_pri && !peer_pri */ return 1;
2838 		case 2: /* !self_pri &&  peer_pri */ return -1;
2839 		case 3: /*  self_pri &&  peer_pri */
2840 			dc = test_bit(RESOLVE_CONFLICTS, &mdev->tconn->flags);
2841 			return dc ? -1 : 1;
2842 		}
2843 	}
2844 
2845 	*rule_nr = 50;
2846 	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2847 	if (self == peer)
2848 		return -1;
2849 
2850 	*rule_nr = 51;
2851 	peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2852 	if (self == peer) {
2853 		if (mdev->tconn->agreed_pro_version < 96 ?
2854 		    (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2855 		    (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2856 		    peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get through. Undo the
			   modifications the peer made to its UUIDs when it
			   last started a resync as sync source. */
2859 
2860 			if (mdev->tconn->agreed_pro_version < 91)
2861 				return -1091;
2862 
2863 			mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2864 			mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2865 
2866 			dev_info(DEV, "Lost last syncUUID packet, corrected:\n");
2867 			drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2868 
2869 			return -1;
2870 		}
2871 	}
2872 
2873 	*rule_nr = 60;
2874 	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2875 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2876 		peer = mdev->p_uuid[i] & ~((u64)1);
2877 		if (self == peer)
2878 			return -2;
2879 	}
2880 
2881 	*rule_nr = 70;
2882 	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2883 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2884 	if (self == peer)
2885 		return 1;
2886 
2887 	*rule_nr = 71;
2888 	self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2889 	if (self == peer) {
2890 		if (mdev->tconn->agreed_pro_version < 96 ?
2891 		    (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2892 		    (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2893 		    self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get through. Undo the
			   modifications we made to our UUIDs when we last
			   started a resync as sync source. */
2896 
2897 			if (mdev->tconn->agreed_pro_version < 91)
2898 				return -1091;
2899 
2900 			__drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2901 			__drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2902 
2903 			dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2904 			drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2905 				       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2906 
2907 			return 1;
2908 		}
2909 	}
2910 
2911 
2912 	*rule_nr = 80;
2913 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2914 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2915 		self = mdev->ldev->md.uuid[i] & ~((u64)1);
2916 		if (self == peer)
2917 			return 2;
2918 	}
2919 
2920 	*rule_nr = 90;
2921 	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2922 	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2923 	if (self == peer && self != ((u64)0))
2924 		return 100;
2925 
2926 	*rule_nr = 100;
2927 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2928 		self = mdev->ldev->md.uuid[i] & ~((u64)1);
2929 		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2930 			peer = mdev->p_uuid[j] & ~((u64)1);
2931 			if (self == peer)
2932 				return -100;
2933 		}
2934 	}
2935 
2936 	return -1000;
2937 }
2938 
2939 /* drbd_sync_handshake() returns the new conn state on success, or
2940    CONN_MASK (-1) on failure.
2941  */
2942 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2943 					   enum drbd_disk_state peer_disk) __must_hold(local)
2944 {
2945 	enum drbd_conns rv = C_MASK;
2946 	enum drbd_disk_state mydisk;
2947 	struct net_conf *nc;
2948 	int hg, rule_nr, rr_conflict, tentative;
2949 
2950 	mydisk = mdev->state.disk;
2951 	if (mydisk == D_NEGOTIATING)
2952 		mydisk = mdev->new_state_tmp.disk;
2953 
2954 	dev_info(DEV, "drbd_sync_handshake:\n");
2955 
2956 	spin_lock_irq(&mdev->ldev->md.uuid_lock);
2957 	drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2958 	drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2959 		       mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2960 
2961 	hg = drbd_uuid_compare(mdev, &rule_nr);
2962 	spin_unlock_irq(&mdev->ldev->md.uuid_lock);
2963 
2964 	dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2965 
2966 	if (hg == -1000) {
2967 		dev_alert(DEV, "Unrelated data, aborting!\n");
2968 		return C_MASK;
2969 	}
2970 	if (hg < -1000) {
2971 		dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2972 		return C_MASK;
2973 	}
2974 
2975 	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2976 	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2977 		int f = (hg == -100) || abs(hg) == 2;
2978 		hg = mydisk > D_INCONSISTENT ? 1 : -1;
2979 		if (f)
2980 			hg = hg*2;
2981 		dev_info(DEV, "Becoming sync %s due to disk states.\n",
2982 		     hg > 0 ? "source" : "target");
2983 	}
2984 
2985 	if (abs(hg) == 100)
2986 		drbd_khelper(mdev, "initial-split-brain");
2987 
2988 	rcu_read_lock();
2989 	nc = rcu_dereference(mdev->tconn->net_conf);
2990 
2991 	if (hg == 100 || (hg == -100 && nc->always_asbp)) {
2992 		int pcount = (mdev->state.role == R_PRIMARY)
2993 			   + (peer_role == R_PRIMARY);
2994 		int forced = (hg == -100);
2995 
2996 		switch (pcount) {
2997 		case 0:
2998 			hg = drbd_asb_recover_0p(mdev);
2999 			break;
3000 		case 1:
3001 			hg = drbd_asb_recover_1p(mdev);
3002 			break;
3003 		case 2:
3004 			hg = drbd_asb_recover_2p(mdev);
3005 			break;
3006 		}
3007 		if (abs(hg) < 100) {
3008 			dev_warn(DEV, "Split-Brain detected, %d primaries, "
3009 			     "automatically solved. Sync from %s node\n",
3010 			     pcount, (hg < 0) ? "peer" : "this");
3011 			if (forced) {
3012 				dev_warn(DEV, "Doing a full sync, since"
3013 				     " UUIDs where ambiguous.\n");
3014 				hg = hg*2;
3015 			}
3016 		}
3017 	}
3018 
3019 	if (hg == -100) {
3020 		if (test_bit(DISCARD_MY_DATA, &mdev->flags) && !(mdev->p_uuid[UI_FLAGS]&1))
3021 			hg = -1;
3022 		if (!test_bit(DISCARD_MY_DATA, &mdev->flags) && (mdev->p_uuid[UI_FLAGS]&1))
3023 			hg = 1;
3024 
3025 		if (abs(hg) < 100)
3026 			dev_warn(DEV, "Split-Brain detected, manually solved. "
3027 			     "Sync from %s node\n",
3028 			     (hg < 0) ? "peer" : "this");
3029 	}
3030 	rr_conflict = nc->rr_conflict;
3031 	tentative = nc->tentative;
3032 	rcu_read_unlock();
3033 
3034 	if (hg == -100) {
3035 		/* FIXME this log message is not correct if we end up here
3036 		 * after an attempted attach on a diskless node.
3037 		 * We just refuse to attach -- well, we drop the "connection"
3038 		 * to that disk, in a way... */
3039 		dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
3040 		drbd_khelper(mdev, "split-brain");
3041 		return C_MASK;
3042 	}
3043 
3044 	if (hg > 0 && mydisk <= D_INCONSISTENT) {
3045 		dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
3046 		return C_MASK;
3047 	}
3048 
3049 	if (hg < 0 && /* by intention we do not use mydisk here. */
3050 	    mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
3051 		switch (rr_conflict) {
3052 		case ASB_CALL_HELPER:
3053 			drbd_khelper(mdev, "pri-lost");
3054 			/* fall through */
3055 		case ASB_DISCONNECT:
3056 			dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
3057 			return C_MASK;
3058 		case ASB_VIOLENTLY:
3059 			dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
3060 			     "assumption\n");
3061 		}
3062 	}
3063 
3064 	if (tentative || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
3065 		if (hg == 0)
3066 			dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
3067 		else
3068 			dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
3069 				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3070 				 abs(hg) >= 2 ? "full" : "bit-map based");
3071 		return C_MASK;
3072 	}
3073 
3074 	if (abs(hg) >= 2) {
3075 		dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3076 		if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3077 					BM_LOCKED_SET_ALLOWED))
3078 			return C_MASK;
3079 	}
3080 
3081 	if (hg > 0) { /* become sync source. */
3082 		rv = C_WF_BITMAP_S;
3083 	} else if (hg < 0) { /* become sync target */
3084 		rv = C_WF_BITMAP_T;
3085 	} else {
3086 		rv = C_CONNECTED;
3087 		if (drbd_bm_total_weight(mdev)) {
3088 			dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
3089 			     drbd_bm_total_weight(mdev));
3090 		}
3091 	}
3092 
3093 	return rv;
3094 }
3095 
3096 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3097 {
	/* a local ASB_DISCARD_REMOTE paired with the peer's ASB_DISCARD_LOCAL is valid */
3099 	if (peer == ASB_DISCARD_REMOTE)
3100 		return ASB_DISCARD_LOCAL;
3101 
	/* any other combination involving ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL is invalid */
3103 	if (peer == ASB_DISCARD_LOCAL)
3104 		return ASB_DISCARD_REMOTE;
3105 
3106 	/* everything else is valid if they are equal on both sides. */
3107 	return peer;
3108 }
3109 
3110 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
3111 {
3112 	struct p_protocol *p = pi->data;
3113 	enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3114 	int p_proto, p_discard_my_data, p_two_primaries, cf;
3115 	struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3116 	char integrity_alg[SHARED_SECRET_MAX] = "";
3117 	struct crypto_hash *peer_integrity_tfm = NULL;
3118 	void *int_dig_in = NULL, *int_dig_vv = NULL;
3119 
3120 	p_proto		= be32_to_cpu(p->protocol);
3121 	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
3122 	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
3123 	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
3124 	p_two_primaries = be32_to_cpu(p->two_primaries);
3125 	cf		= be32_to_cpu(p->conn_flags);
3126 	p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3127 
3128 	if (tconn->agreed_pro_version >= 87) {
3129 		int err;
3130 
3131 		if (pi->size > sizeof(integrity_alg))
3132 			return -EIO;
3133 		err = drbd_recv_all(tconn, integrity_alg, pi->size);
3134 		if (err)
3135 			return err;
3136 		integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3137 	}
3138 
3139 	if (pi->cmd != P_PROTOCOL_UPDATE) {
3140 		clear_bit(CONN_DRY_RUN, &tconn->flags);
3141 
3142 		if (cf & CF_DRY_RUN)
3143 			set_bit(CONN_DRY_RUN, &tconn->flags);
3144 
3145 		rcu_read_lock();
3146 		nc = rcu_dereference(tconn->net_conf);
3147 
3148 		if (p_proto != nc->wire_protocol) {
3149 			conn_err(tconn, "incompatible %s settings\n", "protocol");
3150 			goto disconnect_rcu_unlock;
3151 		}
3152 
3153 		if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3154 			conn_err(tconn, "incompatible %s settings\n", "after-sb-0pri");
3155 			goto disconnect_rcu_unlock;
3156 		}
3157 
3158 		if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3159 			conn_err(tconn, "incompatible %s settings\n", "after-sb-1pri");
3160 			goto disconnect_rcu_unlock;
3161 		}
3162 
3163 		if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3164 			conn_err(tconn, "incompatible %s settings\n", "after-sb-2pri");
3165 			goto disconnect_rcu_unlock;
3166 		}
3167 
3168 		if (p_discard_my_data && nc->discard_my_data) {
3169 			conn_err(tconn, "incompatible %s settings\n", "discard-my-data");
3170 			goto disconnect_rcu_unlock;
3171 		}
3172 
3173 		if (p_two_primaries != nc->two_primaries) {
3174 			conn_err(tconn, "incompatible %s settings\n", "allow-two-primaries");
3175 			goto disconnect_rcu_unlock;
3176 		}
3177 
3178 		if (strcmp(integrity_alg, nc->integrity_alg)) {
3179 			conn_err(tconn, "incompatible %s settings\n", "data-integrity-alg");
3180 			goto disconnect_rcu_unlock;
3181 		}
3182 
3183 		rcu_read_unlock();
3184 	}
3185 
3186 	if (integrity_alg[0]) {
3187 		int hash_size;
3188 
3189 		/*
3190 		 * We can only change the peer data integrity algorithm
3191 		 * here.  Changing our own data integrity algorithm
3192 		 * requires that we send a P_PROTOCOL_UPDATE packet at
3193 		 * the same time; otherwise, the peer has no way to
3194 		 * tell between which packets the algorithm should
3195 		 * change.
3196 		 */
3197 
3198 		peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3199 		if (!peer_integrity_tfm) {
3200 			conn_err(tconn, "peer data-integrity-alg %s not supported\n",
3201 				 integrity_alg);
3202 			goto disconnect;
3203 		}
3204 
3205 		hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3206 		int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3207 		int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3208 		if (!(int_dig_in && int_dig_vv)) {
3209 			conn_err(tconn, "Allocation of buffers for data integrity checking failed\n");
3210 			goto disconnect;
3211 		}
3212 	}
3213 
3214 	new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3215 	if (!new_net_conf) {
3216 		conn_err(tconn, "Allocation of new net_conf failed\n");
3217 		goto disconnect;
3218 	}
3219 
3220 	mutex_lock(&tconn->data.mutex);
3221 	mutex_lock(&tconn->conf_update);
3222 	old_net_conf = tconn->net_conf;
3223 	*new_net_conf = *old_net_conf;
3224 
3225 	new_net_conf->wire_protocol = p_proto;
3226 	new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3227 	new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3228 	new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3229 	new_net_conf->two_primaries = p_two_primaries;
3230 
3231 	rcu_assign_pointer(tconn->net_conf, new_net_conf);
3232 	mutex_unlock(&tconn->conf_update);
3233 	mutex_unlock(&tconn->data.mutex);
3234 
3235 	crypto_free_hash(tconn->peer_integrity_tfm);
3236 	kfree(tconn->int_dig_in);
3237 	kfree(tconn->int_dig_vv);
3238 	tconn->peer_integrity_tfm = peer_integrity_tfm;
3239 	tconn->int_dig_in = int_dig_in;
3240 	tconn->int_dig_vv = int_dig_vv;
3241 
3242 	if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3243 		conn_info(tconn, "peer data-integrity-alg: %s\n",
3244 			  integrity_alg[0] ? integrity_alg : "(none)");
3245 
3246 	synchronize_rcu();
3247 	kfree(old_net_conf);
3248 	return 0;
3249 
3250 disconnect_rcu_unlock:
3251 	rcu_read_unlock();
3252 disconnect:
3253 	crypto_free_hash(peer_integrity_tfm);
3254 	kfree(int_dig_in);
3255 	kfree(int_dig_vv);
3256 	conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3257 	return -EIO;
3258 }
3259 
3260 /* helper function
3261  * input: alg name, feature name
3262  * return: NULL (alg name was "")
3263  *         ERR_PTR(error) if something goes wrong
3264  *         or the crypto hash ptr, if it worked out ok. */
3265 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
3266 		const char *alg, const char *name)
3267 {
3268 	struct crypto_hash *tfm;
3269 
3270 	if (!alg[0])
3271 		return NULL;
3272 
3273 	tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3274 	if (IS_ERR(tfm)) {
3275 		dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3276 			alg, name, PTR_ERR(tfm));
3277 		return tfm;
3278 	}
3279 	return tfm;
3280 }
3281 
3282 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3283 {
3284 	void *buffer = tconn->data.rbuf;
3285 	int size = pi->size;
3286 
3287 	while (size) {
3288 		int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3289 		s = drbd_recv(tconn, buffer, s);
3290 		if (s <= 0) {
3291 			if (s < 0)
3292 				return s;
3293 			break;
3294 		}
3295 		size -= s;
3296 	}
3297 	if (size)
3298 		return -EIO;
3299 	return 0;
3300 }
3301 
3302 /*
3303  * config_unknown_volume  -  device configuration command for unknown volume
3304  *
3305  * When a device is added to an existing connection, the node on which the
3306  * device is added first will send configuration commands to its peer but the
3307  * peer will not know about the device yet.  It will warn and ignore these
3308  * commands.  Once the device is added on the second node, the second node will
3309  * send the same device configuration commands, but in the other direction.
3310  *
3311  * (We can also end up here if drbd is misconfigured.)
3312  */
3313 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3314 {
3315 	conn_warn(tconn, "%s packet received for volume %u, which is not configured locally\n",
3316 		  cmdname(pi->cmd), pi->vnr);
3317 	return ignore_remaining_packet(tconn, pi);
3318 }
3319 
3320 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3321 {
3322 	struct drbd_conf *mdev;
3323 	struct p_rs_param_95 *p;
3324 	unsigned int header_size, data_size, exp_max_sz;
3325 	struct crypto_hash *verify_tfm = NULL;
3326 	struct crypto_hash *csums_tfm = NULL;
3327 	struct net_conf *old_net_conf, *new_net_conf = NULL;
3328 	struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3329 	const int apv = tconn->agreed_pro_version;
3330 	struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3331 	int fifo_size = 0;
3332 	int err;
3333 
3334 	mdev = vnr_to_mdev(tconn, pi->vnr);
3335 	if (!mdev)
3336 		return config_unknown_volume(tconn, pi);
3337 
3338 	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3339 		    : apv == 88 ? sizeof(struct p_rs_param)
3340 					+ SHARED_SECRET_MAX
3341 		    : apv <= 94 ? sizeof(struct p_rs_param_89)
3342 		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
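
	/* Packet layout per protocol version: apv <= 87 carries only the
	 * resync rate; apv 88 appends the verify-alg name as a string of up
	 * to SHARED_SECRET_MAX bytes; apv 89..94 (p_rs_param_89) carries
	 * fixed-size verify-alg and csums-alg fields; apv >= 95
	 * (p_rs_param_95) additionally carries the resync controller
	 * parameters parsed further below. */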
3343 
3344 	if (pi->size > exp_max_sz) {
3345 		dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3346 		    pi->size, exp_max_sz);
3347 		return -EIO;
3348 	}
3349 
3350 	if (apv <= 88) {
3351 		header_size = sizeof(struct p_rs_param);
3352 		data_size = pi->size - header_size;
3353 	} else if (apv <= 94) {
3354 		header_size = sizeof(struct p_rs_param_89);
3355 		data_size = pi->size - header_size;
3356 		D_ASSERT(data_size == 0);
3357 	} else {
3358 		header_size = sizeof(struct p_rs_param_95);
3359 		data_size = pi->size - header_size;
3360 		D_ASSERT(data_size == 0);
3361 	}
3362 
3363 	/* initialize verify_alg and csums_alg */
3364 	p = pi->data;
3365 	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3366 
3367 	err = drbd_recv_all(mdev->tconn, p, header_size);
3368 	if (err)
3369 		return err;
3370 
3371 	mutex_lock(&mdev->tconn->conf_update);
3372 	old_net_conf = mdev->tconn->net_conf;
3373 	if (get_ldev(mdev)) {
3374 		new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3375 		if (!new_disk_conf) {
3376 			put_ldev(mdev);
3377 			mutex_unlock(&mdev->tconn->conf_update);
3378 			dev_err(DEV, "Allocation of new disk_conf failed\n");
3379 			return -ENOMEM;
3380 		}
3381 
3382 		old_disk_conf = mdev->ldev->disk_conf;
3383 		*new_disk_conf = *old_disk_conf;
3384 
3385 		new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3386 	}
3387 
3388 	if (apv >= 88) {
3389 		if (apv == 88) {
3390 			if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3391 				dev_err(DEV, "verify-alg of wrong size, "
3392 					"peer wants %u, accepting only up to %u byte\n",
3393 					data_size, SHARED_SECRET_MAX);
3394 				err = -EIO;
3395 				goto reconnect;
3396 			}
3397 
3398 			err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3399 			if (err)
3400 				goto reconnect;
			/* we expect a NUL-terminated string,
			 * but just in case someone tries to be evil: */
3403 			D_ASSERT(p->verify_alg[data_size-1] == 0);
3404 			p->verify_alg[data_size-1] = 0;
3405 
3406 		} else /* apv >= 89 */ {
			/* we still expect NUL-terminated strings,
			 * but just in case someone tries to be evil: */
3409 			D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3410 			D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3411 			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3412 			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3413 		}
3414 
3415 		if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3416 			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3417 				dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3418 				    old_net_conf->verify_alg, p->verify_alg);
3419 				goto disconnect;
3420 			}
3421 			verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3422 					p->verify_alg, "verify-alg");
3423 			if (IS_ERR(verify_tfm)) {
3424 				verify_tfm = NULL;
3425 				goto disconnect;
3426 			}
3427 		}
3428 
3429 		if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3430 			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3431 				dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3432 				    old_net_conf->csums_alg, p->csums_alg);
3433 				goto disconnect;
3434 			}
3435 			csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3436 					p->csums_alg, "csums-alg");
3437 			if (IS_ERR(csums_tfm)) {
3438 				csums_tfm = NULL;
3439 				goto disconnect;
3440 			}
3441 		}
3442 
3443 		if (apv > 94 && new_disk_conf) {
3444 			new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3445 			new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3446 			new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3447 			new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3448 
3449 			fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3450 			if (fifo_size != mdev->rs_plan_s->size) {
3451 				new_plan = fifo_alloc(fifo_size);
3452 				if (!new_plan) {
3453 					dev_err(DEV, "kmalloc of fifo_buffer failed");
3454 					put_ldev(mdev);
3455 					goto disconnect;
3456 				}
3457 			}
3458 		}
3459 
3460 		if (verify_tfm || csums_tfm) {
3461 			new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3462 			if (!new_net_conf) {
3463 				dev_err(DEV, "Allocation of new net_conf failed\n");
3464 				goto disconnect;
3465 			}
3466 
3467 			*new_net_conf = *old_net_conf;
3468 
3469 			if (verify_tfm) {
3470 				strcpy(new_net_conf->verify_alg, p->verify_alg);
3471 				new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3472 				crypto_free_hash(mdev->tconn->verify_tfm);
3473 				mdev->tconn->verify_tfm = verify_tfm;
3474 				dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3475 			}
3476 			if (csums_tfm) {
3477 				strcpy(new_net_conf->csums_alg, p->csums_alg);
3478 				new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3479 				crypto_free_hash(mdev->tconn->csums_tfm);
3480 				mdev->tconn->csums_tfm = csums_tfm;
3481 				dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3482 			}
3483 			rcu_assign_pointer(tconn->net_conf, new_net_conf);
3484 		}
3485 	}
3486 
3487 	if (new_disk_conf) {
3488 		rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3489 		put_ldev(mdev);
3490 	}
3491 
3492 	if (new_plan) {
3493 		old_plan = mdev->rs_plan_s;
3494 		rcu_assign_pointer(mdev->rs_plan_s, new_plan);
3495 	}
3496 
3497 	mutex_unlock(&mdev->tconn->conf_update);
3498 	synchronize_rcu();
3499 	if (new_net_conf)
3500 		kfree(old_net_conf);
3501 	kfree(old_disk_conf);
3502 	kfree(old_plan);
3503 
3504 	return 0;
3505 
3506 reconnect:
3507 	if (new_disk_conf) {
3508 		put_ldev(mdev);
3509 		kfree(new_disk_conf);
3510 	}
3511 	mutex_unlock(&mdev->tconn->conf_update);
3512 	return -EIO;
3513 
3514 disconnect:
3515 	kfree(new_plan);
3516 	if (new_disk_conf) {
3517 		put_ldev(mdev);
3518 		kfree(new_disk_conf);
3519 	}
3520 	mutex_unlock(&mdev->tconn->conf_update);
3521 	/* just for completeness: actually not needed,
3522 	 * as this is not reached if csums_tfm was ok. */
3523 	crypto_free_hash(csums_tfm);
3524 	/* but free the verify_tfm again, if csums_tfm did not work out */
3525 	crypto_free_hash(verify_tfm);
3526 	conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3527 	return -EIO;
3528 }
3529 
3530 /* warn if the arguments differ by more than 12.5% */
3531 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3532 	const char *s, sector_t a, sector_t b)
3533 {
3534 	sector_t d;
3535 	if (a == 0 || b == 0)
3536 		return;
3537 	d = (a > b) ? (a - b) : (b - a);
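	/* a>>3 is a/8, i.e. 12.5% of a */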
3538 	if (d > (a>>3) || d > (b>>3))
3539 		dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3540 		     (unsigned long long)a, (unsigned long long)b);
3541 }
3542 
3543 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3544 {
3545 	struct drbd_conf *mdev;
3546 	struct p_sizes *p = pi->data;
3547 	enum determine_dev_size dd = unchanged;
3548 	sector_t p_size, p_usize, my_usize;
3549 	int ldsc = 0; /* local disk size changed */
3550 	enum dds_flags ddsf;
3551 
3552 	mdev = vnr_to_mdev(tconn, pi->vnr);
3553 	if (!mdev)
3554 		return config_unknown_volume(tconn, pi);
3555 
3556 	p_size = be64_to_cpu(p->d_size);
3557 	p_usize = be64_to_cpu(p->u_size);
3558 
3559 	/* just store the peer's disk size for now.
3560 	 * we still need to figure out whether we accept that. */
3561 	mdev->p_size = p_size;
3562 
3563 	if (get_ldev(mdev)) {
3564 		rcu_read_lock();
3565 		my_usize = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
3566 		rcu_read_unlock();
3567 
3568 		warn_if_differ_considerably(mdev, "lower level device sizes",
3569 			   p_size, drbd_get_max_capacity(mdev->ldev));
3570 		warn_if_differ_considerably(mdev, "user requested size",
3571 					    p_usize, my_usize);
3572 
3573 		/* if this is the first connect, or an otherwise expected
3574 		 * param exchange, choose the minimum */
3575 		if (mdev->state.conn == C_WF_REPORT_PARAMS)
3576 			p_usize = min_not_zero(my_usize, p_usize);
3577 
3578 		/* Never shrink a device with usable data during connect.
3579 		   But allow online shrinking if we are connected. */
3580 		if (drbd_new_dev_size(mdev, mdev->ldev, p_usize, 0) <
3581 		    drbd_get_capacity(mdev->this_bdev) &&
3582 		    mdev->state.disk >= D_OUTDATED &&
3583 		    mdev->state.conn < C_CONNECTED) {
3584 			dev_err(DEV, "The peer's disk size is too small!\n");
3585 			conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3586 			put_ldev(mdev);
3587 			return -EIO;
3588 		}
3589 
3590 		if (my_usize != p_usize) {
3591 			struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3592 
3593 			new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3594 			if (!new_disk_conf) {
3595 				dev_err(DEV, "Allocation of new disk_conf failed\n");
3596 				put_ldev(mdev);
3597 				return -ENOMEM;
3598 			}
3599 
3600 			mutex_lock(&mdev->tconn->conf_update);
3601 			old_disk_conf = mdev->ldev->disk_conf;
3602 			*new_disk_conf = *old_disk_conf;
3603 			new_disk_conf->disk_size = p_usize;
3604 
3605 			rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3606 			mutex_unlock(&mdev->tconn->conf_update);
3607 			synchronize_rcu();
3608 			kfree(old_disk_conf);
3609 
			dev_info(DEV, "Peer sets u_size to %lu sectors (was %lu)\n",
				 (unsigned long)p_usize, (unsigned long)my_usize);
3612 		}
3613 
3614 		put_ldev(mdev);
3615 	}
3616 
3617 	ddsf = be16_to_cpu(p->dds_flags);
3618 	if (get_ldev(mdev)) {
3619 		dd = drbd_determine_dev_size(mdev, ddsf);
3620 		put_ldev(mdev);
3621 		if (dd == dev_size_error)
3622 			return -EIO;
3623 		drbd_md_sync(mdev);
3624 	} else {
3625 		/* I am diskless, need to accept the peer's size. */
3626 		drbd_set_my_capacity(mdev, p_size);
3627 	}
3628 
3629 	mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3630 	drbd_reconsider_max_bio_size(mdev);
3631 
3632 	if (get_ldev(mdev)) {
3633 		if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3634 			mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3635 			ldsc = 1;
3636 		}
3637 
3638 		put_ldev(mdev);
3639 	}
3640 
3641 	if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3642 		if (be64_to_cpu(p->c_size) !=
3643 		    drbd_get_capacity(mdev->this_bdev) || ldsc) {
3644 			/* we have different sizes, probably peer
3645 			 * needs to know my new size... */
3646 			drbd_send_sizes(mdev, 0, ddsf);
3647 		}
3648 		if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3649 		    (dd == grew && mdev->state.conn == C_CONNECTED)) {
3650 			if (mdev->state.pdsk >= D_INCONSISTENT &&
3651 			    mdev->state.disk >= D_INCONSISTENT) {
3652 				if (ddsf & DDSF_NO_RESYNC)
3653 					dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3654 				else
3655 					resync_after_online_grow(mdev);
3656 			} else
3657 				set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3658 		}
3659 	}
3660 
3661 	return 0;
3662 }
3663 
3664 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3665 {
3666 	struct drbd_conf *mdev;
3667 	struct p_uuids *p = pi->data;
3668 	u64 *p_uuid;
3669 	int i, updated_uuids = 0;
3670 
3671 	mdev = vnr_to_mdev(tconn, pi->vnr);
3672 	if (!mdev)
3673 		return config_unknown_volume(tconn, pi);
3674 
3675 	p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3676 	if (!p_uuid) {
3677 		dev_err(DEV, "kmalloc of p_uuid failed\n");
3678 		return false;
3679 	}
3680 
3681 	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3682 		p_uuid[i] = be64_to_cpu(p->uuid[i]);
3683 
3684 	kfree(mdev->p_uuid);
3685 	mdev->p_uuid = p_uuid;
3686 
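	/* While not yet connected, a primary without usable local data must
	 * only accept peer data whose current UUID matches our exposed data
	 * UUID (bit 0 is masked out for the comparison). */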
3687 	if (mdev->state.conn < C_CONNECTED &&
3688 	    mdev->state.disk < D_INCONSISTENT &&
3689 	    mdev->state.role == R_PRIMARY &&
3690 	    (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3691 		dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3692 		    (unsigned long long)mdev->ed_uuid);
3693 		conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3694 		return -EIO;
3695 	}
3696 
3697 	if (get_ldev(mdev)) {
3698 		int skip_initial_sync =
3699 			mdev->state.conn == C_CONNECTED &&
3700 			mdev->tconn->agreed_pro_version >= 90 &&
3701 			mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3702 			(p_uuid[UI_FLAGS] & 8);
3703 		if (skip_initial_sync) {
3704 			dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3705 			drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3706 					"clear_n_write from receive_uuids",
3707 					BM_LOCKED_TEST_ALLOWED);
3708 			_drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3709 			_drbd_uuid_set(mdev, UI_BITMAP, 0);
3710 			_drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3711 					CS_VERBOSE, NULL);
3712 			drbd_md_sync(mdev);
3713 			updated_uuids = 1;
3714 		}
3715 		put_ldev(mdev);
3716 	} else if (mdev->state.disk < D_INCONSISTENT &&
3717 		   mdev->state.role == R_PRIMARY) {
3718 		/* I am a diskless primary, the peer just created a new current UUID
3719 		   for me. */
3720 		updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3721 	}
3722 
	/* Before we test the disk state, wait until any ongoing cluster-wide
	   state change has finished. That is important if we are primary and
	   are detaching from our disk: we need to see the new disk state... */
3727 	mutex_lock(mdev->state_mutex);
3728 	mutex_unlock(mdev->state_mutex);
3729 	if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3730 		updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3731 
3732 	if (updated_uuids)
3733 		drbd_print_uuids(mdev, "receiver updated UUIDs to");
3734 
3735 	return 0;
3736 }
3737 
3738 /**
3739  * convert_state() - Converts the peer's view of the cluster state to our point of view
3740  * @ps:		The state as seen by the peer.
3741  */
3742 static union drbd_state convert_state(union drbd_state ps)
3743 {
3744 	union drbd_state ms;
3745 
3746 	static enum drbd_conns c_tab[] = {
3747 		[C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3748 		[C_CONNECTED] = C_CONNECTED,
3749 
3750 		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3751 		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3752 		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3753 		[C_VERIFY_S]       = C_VERIFY_T,
3754 		[C_MASK]   = C_MASK,
3755 	};
3756 
3757 	ms.i = ps.i;
3758 
3759 	ms.conn = c_tab[ps.conn];
3760 	ms.peer = ps.role;
3761 	ms.role = ps.peer;
3762 	ms.pdsk = ps.disk;
3763 	ms.disk = ps.pdsk;
3764 	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3765 
3766 	return ms;
3767 }
3768 
3769 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3770 {
3771 	struct drbd_conf *mdev;
3772 	struct p_req_state *p = pi->data;
3773 	union drbd_state mask, val;
3774 	enum drbd_state_rv rv;
3775 
3776 	mdev = vnr_to_mdev(tconn, pi->vnr);
3777 	if (!mdev)
3778 		return -EIO;
3779 
3780 	mask.i = be32_to_cpu(p->mask);
3781 	val.i = be32_to_cpu(p->val);
3782 
3783 	if (test_bit(RESOLVE_CONFLICTS, &mdev->tconn->flags) &&
3784 	    mutex_is_locked(mdev->state_mutex)) {
3785 		drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3786 		return 0;
3787 	}
3788 
3789 	mask = convert_state(mask);
3790 	val = convert_state(val);
3791 
3792 	rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3793 	drbd_send_sr_reply(mdev, rv);
3794 
3795 	drbd_md_sync(mdev);
3796 
3797 	return 0;
3798 }
3799 
3800 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3801 {
3802 	struct p_req_state *p = pi->data;
3803 	union drbd_state mask, val;
3804 	enum drbd_state_rv rv;
3805 
3806 	mask.i = be32_to_cpu(p->mask);
3807 	val.i = be32_to_cpu(p->val);
3808 
3809 	if (test_bit(RESOLVE_CONFLICTS, &tconn->flags) &&
3810 	    mutex_is_locked(&tconn->cstate_mutex)) {
3811 		conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3812 		return 0;
3813 	}
3814 
3815 	mask = convert_state(mask);
3816 	val = convert_state(val);
3817 
3818 	rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3819 	conn_send_sr_reply(tconn, rv);
3820 
3821 	return 0;
3822 }
3823 
3824 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3825 {
3826 	struct drbd_conf *mdev;
3827 	struct p_state *p = pi->data;
3828 	union drbd_state os, ns, peer_state;
3829 	enum drbd_disk_state real_peer_disk;
3830 	enum chg_state_flags cs_flags;
3831 	int rv;
3832 
3833 	mdev = vnr_to_mdev(tconn, pi->vnr);
3834 	if (!mdev)
3835 		return config_unknown_volume(tconn, pi);
3836 
3837 	peer_state.i = be32_to_cpu(p->state);
3838 
3839 	real_peer_disk = peer_state.disk;
3840 	if (peer_state.disk == D_NEGOTIATING) {
3841 		real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3842 		dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3843 	}
3844 
3845 	spin_lock_irq(&mdev->tconn->req_lock);
3846  retry:
3847 	os = ns = drbd_read_state(mdev);
3848 	spin_unlock_irq(&mdev->tconn->req_lock);
3849 
3850 	/* If some other part of the code (asender thread, timeout)
3851 	 * already decided to close the connection again,
3852 	 * we must not "re-establish" it here. */
3853 	if (os.conn <= C_TEAR_DOWN)
3854 		return -ECONNRESET;
3855 
	/* If this is the "end of sync" confirmation, usually the peer disk
	 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For an empty
	 * (0 bits set) resync that started in PausedSyncT, or if the timing
	 * of pause-/unpause-sync events happened to be "just right", the peer
	 * disk may transition from D_CONSISTENT to D_UP_TO_DATE as well.
	 */
3862 	if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3863 	    real_peer_disk == D_UP_TO_DATE &&
3864 	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3865 		/* If we are (becoming) SyncSource, but peer is still in sync
3866 		 * preparation, ignore its uptodate-ness to avoid flapping, it
3867 		 * will change to inconsistent once the peer reaches active
3868 		 * syncing states.
3869 		 * It may have changed syncer-paused flags, however, so we
3870 		 * cannot ignore this completely. */
3871 		if (peer_state.conn > C_CONNECTED &&
3872 		    peer_state.conn < C_SYNC_SOURCE)
3873 			real_peer_disk = D_INCONSISTENT;
3874 
3875 		/* if peer_state changes to connected at the same time,
3876 		 * it explicitly notifies us that it finished resync.
3877 		 * Maybe we should finish it up, too? */
3878 		else if (os.conn >= C_SYNC_SOURCE &&
3879 			 peer_state.conn == C_CONNECTED) {
3880 			if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3881 				drbd_resync_finished(mdev);
3882 			return 0;
3883 		}
3884 	}
3885 
3886 	/* explicit verify finished notification, stop sector reached. */
3887 	if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
3888 	    peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
3889 		ov_out_of_sync_print(mdev);
3890 		drbd_resync_finished(mdev);
3891 		return 0;
3892 	}
3893 
3894 	/* peer says his disk is inconsistent, while we think it is uptodate,
3895 	 * and this happens while the peer still thinks we have a sync going on,
3896 	 * but we think we are already done with the sync.
3897 	 * We ignore this to avoid flapping pdsk.
3898 	 * This should not happen, if the peer is a recent version of drbd. */
3899 	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3900 	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3901 		real_peer_disk = D_UP_TO_DATE;
3902 
3903 	if (ns.conn == C_WF_REPORT_PARAMS)
3904 		ns.conn = C_CONNECTED;
3905 
3906 	if (peer_state.conn == C_AHEAD)
3907 		ns.conn = C_BEHIND;
3908 
3909 	if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3910 	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
3911 		int cr; /* consider resync */
3912 
3913 		/* if we established a new connection */
3914 		cr  = (os.conn < C_CONNECTED);
3915 		/* if we had an established connection
3916 		 * and one of the nodes newly attaches a disk */
3917 		cr |= (os.conn == C_CONNECTED &&
3918 		       (peer_state.disk == D_NEGOTIATING ||
3919 			os.disk == D_NEGOTIATING));
3920 		/* if we have both been inconsistent, and the peer has been
3921 		 * forced to be UpToDate with --overwrite-data */
3922 		cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3923 		/* if we had been plain connected, and the admin requested to
3924 		 * start a sync by "invalidate" or "invalidate-remote" */
3925 		cr |= (os.conn == C_CONNECTED &&
3926 				(peer_state.conn >= C_STARTING_SYNC_S &&
3927 				 peer_state.conn <= C_WF_BITMAP_T));
3928 
3929 		if (cr)
3930 			ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3931 
3932 		put_ldev(mdev);
3933 		if (ns.conn == C_MASK) {
3934 			ns.conn = C_CONNECTED;
3935 			if (mdev->state.disk == D_NEGOTIATING) {
3936 				drbd_force_state(mdev, NS(disk, D_FAILED));
3937 			} else if (peer_state.disk == D_NEGOTIATING) {
3938 				dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3939 				peer_state.disk = D_DISKLESS;
3940 				real_peer_disk = D_DISKLESS;
3941 			} else {
3942 				if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3943 					return -EIO;
3944 				D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3945 				conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3946 				return -EIO;
3947 			}
3948 		}
3949 	}
3950 
3951 	spin_lock_irq(&mdev->tconn->req_lock);
3952 	if (os.i != drbd_read_state(mdev).i)
3953 		goto retry;
3954 	clear_bit(CONSIDER_RESYNC, &mdev->flags);
3955 	ns.peer = peer_state.role;
3956 	ns.pdsk = real_peer_disk;
3957 	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3958 	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3959 		ns.disk = mdev->new_state_tmp.disk;
3960 	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3961 	if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3962 	    test_bit(NEW_CUR_UUID, &mdev->flags)) {
		/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
		   for temporary network outages! */
3965 		spin_unlock_irq(&mdev->tconn->req_lock);
		dev_err(DEV, "Aborting Connect, cannot thaw IO with a peer that is only Consistent\n");
3967 		tl_clear(mdev->tconn);
3968 		drbd_uuid_new_current(mdev);
3969 		clear_bit(NEW_CUR_UUID, &mdev->flags);
3970 		conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3971 		return -EIO;
3972 	}
3973 	rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3974 	ns = drbd_read_state(mdev);
3975 	spin_unlock_irq(&mdev->tconn->req_lock);
3976 
3977 	if (rv < SS_SUCCESS) {
3978 		conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3979 		return -EIO;
3980 	}
3981 
3982 	if (os.conn > C_WF_REPORT_PARAMS) {
3983 		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3984 		    peer_state.disk != D_NEGOTIATING ) {
3985 			/* we want resync, peer has not yet decided to sync... */
3986 			/* Nowadays only used when forcing a node into primary role and
3987 			   setting its disk to UpToDate with that */
3988 			drbd_send_uuids(mdev);
3989 			drbd_send_current_state(mdev);
3990 		}
3991 	}
3992 
3993 	clear_bit(DISCARD_MY_DATA, &mdev->flags);
3994 
3995 	drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3996 
3997 	return 0;
3998 }
3999 
4000 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
4001 {
4002 	struct drbd_conf *mdev;
4003 	struct p_rs_uuid *p = pi->data;
4004 
4005 	mdev = vnr_to_mdev(tconn, pi->vnr);
4006 	if (!mdev)
4007 		return -EIO;
4008 
4009 	wait_event(mdev->misc_wait,
4010 		   mdev->state.conn == C_WF_SYNC_UUID ||
4011 		   mdev->state.conn == C_BEHIND ||
4012 		   mdev->state.conn < C_CONNECTED ||
4013 		   mdev->state.disk < D_NEGOTIATING);
4014 
4015 	/* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
4016 
4017 	/* Here the _drbd_uuid_ functions are right, current should
4018 	   _not_ be rotated into the history */
4019 	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
4020 		_drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
4021 		_drbd_uuid_set(mdev, UI_BITMAP, 0UL);
4022 
4023 		drbd_print_uuids(mdev, "updated sync uuid");
4024 		drbd_start_resync(mdev, C_SYNC_TARGET);
4025 
4026 		put_ldev(mdev);
4027 	} else
4028 		dev_err(DEV, "Ignoring SyncUUID packet!\n");
4029 
4030 	return 0;
4031 }
4032 
4033 /**
4034  * receive_bitmap_plain
4035  *
4036  * Return 0 when done, 1 when another iteration is needed, and a negative error
4037  * code upon failure.
4038  */
4039 static int
4040 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
4041 		     unsigned long *p, struct bm_xfer_ctx *c)
4042 {
4043 	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4044 				 drbd_header_size(mdev->tconn);
4045 	unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4046 				       c->bm_words - c->word_offset);
4047 	unsigned int want = num_words * sizeof(*p);
4048 	int err;
4049 
4050 	if (want != size) {
4051 		dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
4052 		return -EIO;
4053 	}
4054 	if (want == 0)
4055 		return 0;
4056 	err = drbd_recv_all(mdev->tconn, p, want);
4057 	if (err)
4058 		return err;
4059 
4060 	drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
4061 
4062 	c->word_offset += num_words;
4063 	c->bit_offset = c->word_offset * BITS_PER_LONG;
4064 	if (c->bit_offset > c->bm_bits)
4065 		c->bit_offset = c->bm_bits;
4066 
4067 	return 1;
4068 }
4069 
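/* Layout of the compressed-bitmap "encoding" byte, as decoded below:
 * bits 0-3 select the code (enum drbd_bitmap_code), bits 4-6 carry the
 * pad-bit count handed to bitstream_init(), and bit 7 is the value of
 * the first run (the "start" toggle). */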
4070 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4071 {
4072 	return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4073 }
4074 
4075 static int dcbp_get_start(struct p_compressed_bm *p)
4076 {
4077 	return (p->encoding & 0x80) != 0;
4078 }
4079 
4080 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4081 {
4082 	return (p->encoding >> 4) & 0x7;
4083 }
4084 
4085 /**
4086  * recv_bm_rle_bits
4087  *
4088  * Return 0 when done, 1 when another iteration is needed, and a negative error
4089  * code upon failure.
4090  */
4091 static int
4092 recv_bm_rle_bits(struct drbd_conf *mdev,
4093 		struct p_compressed_bm *p,
4094 		 struct bm_xfer_ctx *c,
4095 		 unsigned int len)
4096 {
4097 	struct bitstream bs;
4098 	u64 look_ahead;
4099 	u64 rl;
4100 	u64 tmp;
4101 	unsigned long s = c->bit_offset;
4102 	unsigned long e;
4103 	int toggle = dcbp_get_start(p);
4104 	int have;
4105 	int bits;
4106 
4107 	bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4108 
4109 	bits = bitstream_get_bits(&bs, &look_ahead, 64);
4110 	if (bits < 0)
4111 		return -EIO;
4112 
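	/* Decode VLI-encoded run lengths; runs alternate between clear and
	 * set bits, starting with the value given by the start bit above.
	 * Only runs of set bits need to touch the bitmap. */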
4113 	for (have = bits; have > 0; s += rl, toggle = !toggle) {
4114 		bits = vli_decode_bits(&rl, look_ahead);
4115 		if (bits <= 0)
4116 			return -EIO;
4117 
4118 		if (toggle) {
			e = s + rl - 1;
4120 			if (e >= c->bm_bits) {
4121 				dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4122 				return -EIO;
4123 			}
4124 			_drbd_bm_set_bits(mdev, s, e);
4125 		}
4126 
4127 		if (have < bits) {
4128 			dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4129 				have, bits, look_ahead,
4130 				(unsigned int)(bs.cur.b - p->code),
4131 				(unsigned int)bs.buf_len);
4132 			return -EIO;
4133 		}
4134 		look_ahead >>= bits;
4135 		have -= bits;
4136 
4137 		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4138 		if (bits < 0)
4139 			return -EIO;
4140 		look_ahead |= tmp << have;
4141 		have += bits;
4142 	}
4143 
4144 	c->bit_offset = s;
4145 	bm_xfer_ctx_bit_to_word_offset(c);
4146 
4147 	return (s != c->bm_bits);
4148 }
4149 
4150 /**
4151  * decode_bitmap_c
4152  *
4153  * Return 0 when done, 1 when another iteration is needed, and a negative error
4154  * code upon failure.
4155  */
4156 static int
4157 decode_bitmap_c(struct drbd_conf *mdev,
4158 		struct p_compressed_bm *p,
4159 		struct bm_xfer_ctx *c,
4160 		unsigned int len)
4161 {
4162 	if (dcbp_get_code(p) == RLE_VLI_Bits)
4163 		return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
4164 
4165 	/* other variants had been implemented for evaluation,
4166 	 * but have been dropped as this one turned out to be "best"
4167 	 * during all our tests. */
4168 
4169 	dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4170 	conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4171 	return -EIO;
4172 }
4173 
4174 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
4175 		const char *direction, struct bm_xfer_ctx *c)
4176 {
4177 	/* what would it take to transfer it "plaintext" */
4178 	unsigned int header_size = drbd_header_size(mdev->tconn);
4179 	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4180 	unsigned int plain =
4181 		header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4182 		c->bm_words * sizeof(unsigned long);
4183 	unsigned int total = c->bytes[0] + c->bytes[1];
4184 	unsigned int r;
4185 
4186 	/* total can not be zero. but just in case: */
4187 	if (total == 0)
4188 		return;
4189 
4190 	/* don't report if not compressed */
4191 	if (total >= plain)
4192 		return;
4193 
4194 	/* total < plain. check for overflow, still */
4195 	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4196 		                    : (1000 * total / plain);
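	/* r is now the compressed size in per mille of the plain size; it is
	 * inverted below and printed as "percent saved", e.g. total = 250 and
	 * plain = 1000 gives r = 250 here, reported as 75.0% compression. */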
4197 
4198 	if (r > 1000)
4199 		r = 1000;
4200 
4201 	r = 1000 - r;
4202 	dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4203 	     "total %u; compression: %u.%u%%\n",
4204 			direction,
4205 			c->bytes[1], c->packets[1],
4206 			c->bytes[0], c->packets[0],
4207 			total, r/10, r % 10);
4208 }
4209 
/* Since we process the bitfield from lower addresses to higher, it does not
   matter whether we do so in 32 bit or 64 bit chunks, as long as it is
   little endian. (Think of it as a byte stream, beginning with the lowest
   byte...) If we used big endian, we would have to process it from the
   highest address to the lowest in order to be agnostic to the 32 vs 64 bit
   issue.

   Returns 0 on success, a negative error code otherwise. */
4218 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
4219 {
4220 	struct drbd_conf *mdev;
4221 	struct bm_xfer_ctx c;
4222 	int err;
4223 
4224 	mdev = vnr_to_mdev(tconn, pi->vnr);
4225 	if (!mdev)
4226 		return -EIO;
4227 
4228 	drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4229 	/* you are supposed to send additional out-of-sync information
4230 	 * if you actually set bits during this phase */
4231 
4232 	c = (struct bm_xfer_ctx) {
4233 		.bm_bits = drbd_bm_bits(mdev),
4234 		.bm_words = drbd_bm_words(mdev),
4235 	};
4236 
	for (;;) {
4238 		if (pi->cmd == P_BITMAP)
4239 			err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
4240 		else if (pi->cmd == P_COMPRESSED_BITMAP) {
4241 			/* MAYBE: sanity check that we speak proto >= 90,
4242 			 * and the feature is enabled! */
4243 			struct p_compressed_bm *p = pi->data;
4244 
4245 			if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
4246 				dev_err(DEV, "ReportCBitmap packet too large\n");
4247 				err = -EIO;
4248 				goto out;
4249 			}
4250 			if (pi->size <= sizeof(*p)) {
4251 				dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4252 				err = -EIO;
4253 				goto out;
4254 			}
4255 			err = drbd_recv_all(mdev->tconn, p, pi->size);
4256 			if (err)
4257 			       goto out;
4258 			err = decode_bitmap_c(mdev, p, &c, pi->size);
4259 		} else {
			dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4261 			err = -EIO;
4262 			goto out;
4263 		}
4264 
4265 		c.packets[pi->cmd == P_BITMAP]++;
4266 		c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
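		/* index 1 counts plain bitmap packets, index 0 compressed ones;
		 * INFO_bm_xfer_stats() reports them in that order. */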
4267 
4268 		if (err <= 0) {
4269 			if (err < 0)
4270 				goto out;
4271 			break;
4272 		}
4273 		err = drbd_recv_header(mdev->tconn, pi);
4274 		if (err)
4275 			goto out;
4276 	}
4277 
4278 	INFO_bm_xfer_stats(mdev, "receive", &c);
4279 
4280 	if (mdev->state.conn == C_WF_BITMAP_T) {
4281 		enum drbd_state_rv rv;
4282 
4283 		err = drbd_send_bitmap(mdev);
4284 		if (err)
4285 			goto out;
4286 		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4287 		rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4288 		D_ASSERT(rv == SS_SUCCESS);
4289 	} else if (mdev->state.conn != C_WF_BITMAP_S) {
4290 		/* admin may have requested C_DISCONNECTING,
4291 		 * other threads may have noticed network errors */
4292 		dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
4293 		    drbd_conn_str(mdev->state.conn));
4294 	}
4295 	err = 0;
4296 
4297  out:
4298 	drbd_bm_unlock(mdev);
4299 	if (!err && mdev->state.conn == C_WF_BITMAP_S)
4300 		drbd_start_resync(mdev, C_SYNC_SOURCE);
4301 	return err;
4302 }
4303 
4304 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4305 {
4306 	conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
4307 		 pi->cmd, pi->size);
4308 
4309 	return ignore_remaining_packet(tconn, pi);
4310 }
4311 
4312 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
4313 {
4314 	/* Make sure we've acked all the TCP data associated
4315 	 * with the data requests being unplugged */
4316 	drbd_tcp_quickack(tconn->data.socket);
4317 
4318 	return 0;
4319 }
4320 
4321 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
4322 {
4323 	struct drbd_conf *mdev;
4324 	struct p_block_desc *p = pi->data;
4325 
4326 	mdev = vnr_to_mdev(tconn, pi->vnr);
4327 	if (!mdev)
4328 		return -EIO;
4329 
4330 	switch (mdev->state.conn) {
4331 	case C_WF_SYNC_UUID:
4332 	case C_WF_BITMAP_T:
4333 	case C_BEHIND:
		break;
4335 	default:
4336 		dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4337 				drbd_conn_str(mdev->state.conn));
4338 	}
4339 
4340 	drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4341 
4342 	return 0;
4343 }
4344 
4345 struct data_cmd {
4346 	int expect_payload;
4347 	size_t pkt_size;
4348 	int (*fn)(struct drbd_tconn *, struct packet_info *);
4349 };
4350 
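/* pkt_size is the fixed part of each packet that drbdd() reads into the
 * socket buffer before calling fn(); expect_payload says whether a larger,
 * variable-sized payload may follow. */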
4351 static struct data_cmd drbd_cmd_handler[] = {
4352 	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
4353 	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply },
	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier },
	[P_BITMAP]	    = { 1, 0, receive_bitmap },
	[P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap },
4358 	[P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4359 	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4360 	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4361 	[P_SYNC_PARAM]	    = { 1, 0, receive_SyncParam },
4362 	[P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4363 	[P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4364 	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
4365 	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
4366 	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
4367 	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4368 	[P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4369 	[P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4370 	[P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4371 	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4372 	[P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4373 	[P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4374 	[P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4375 	[P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4376 };
4377 
4378 static void drbdd(struct drbd_tconn *tconn)
4379 {
4380 	struct packet_info pi;
4381 	size_t shs; /* sub header size */
4382 	int err;
4383 
4384 	while (get_t_state(&tconn->receiver) == RUNNING) {
4385 		struct data_cmd *cmd;
4386 
4387 		drbd_thread_current_set_cpu(&tconn->receiver);
4388 		if (drbd_recv_header(tconn, &pi))
4389 			goto err_out;
4390 
4391 		cmd = &drbd_cmd_handler[pi.cmd];
4392 		if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4393 			conn_err(tconn, "Unexpected data packet %s (0x%04x)",
4394 				 cmdname(pi.cmd), pi.cmd);
4395 			goto err_out;
4396 		}
4397 
4398 		shs = cmd->pkt_size;
4399 		if (pi.size > shs && !cmd->expect_payload) {
4400 			conn_err(tconn, "No payload expected %s l:%d\n",
4401 				 cmdname(pi.cmd), pi.size);
4402 			goto err_out;
4403 		}
4404 
4405 		if (shs) {
4406 			err = drbd_recv_all_warn(tconn, pi.data, shs);
4407 			if (err)
4408 				goto err_out;
4409 			pi.size -= shs;
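			/* fn() now sees only the size of the variable payload
			 * that is still to be read from the socket */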
4410 		}
4411 
4412 		err = cmd->fn(tconn, &pi);
4413 		if (err) {
4414 			conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4415 				 cmdname(pi.cmd), err, pi.size);
4416 			goto err_out;
4417 		}
4418 	}
4419 	return;
4420 
4421     err_out:
4422 	conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4423 }
4424 
4425 void conn_flush_workqueue(struct drbd_tconn *tconn)
4426 {
4427 	struct drbd_wq_barrier barr;
4428 
4429 	barr.w.cb = w_prev_work_done;
4430 	barr.w.tconn = tconn;
4431 	init_completion(&barr.done);
4432 	drbd_queue_work(&tconn->sender_work, &barr.w);
4433 	wait_for_completion(&barr.done);
4434 }
4435 
4436 static void conn_disconnect(struct drbd_tconn *tconn)
4437 {
4438 	struct drbd_conf *mdev;
4439 	enum drbd_conns oc;
4440 	int vnr;
4441 
4442 	if (tconn->cstate == C_STANDALONE)
4443 		return;
4444 
4445 	/* We are about to start the cleanup after connection loss.
4446 	 * Make sure drbd_make_request knows about that.
4447 	 * Usually we should be in some network failure state already,
4448 	 * but just in case we are not, we fix it up here.
4449 	 */
4450 	conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4451 
4452 	/* asender does not clean up anything. it must not interfere, either */
4453 	drbd_thread_stop(&tconn->asender);
4454 	drbd_free_sock(tconn);
4455 
4456 	rcu_read_lock();
4457 	idr_for_each_entry(&tconn->volumes, mdev, vnr) {
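		/* Hold a reference on mdev so it cannot go away while we drop
		 * the RCU read lock to call the (sleeping) cleanup below. */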
4458 		kref_get(&mdev->kref);
4459 		rcu_read_unlock();
4460 		drbd_disconnected(mdev);
4461 		kref_put(&mdev->kref, &drbd_minor_destroy);
4462 		rcu_read_lock();
4463 	}
4464 	rcu_read_unlock();
4465 
4466 	if (!list_empty(&tconn->current_epoch->list))
4467 		conn_err(tconn, "ASSERTION FAILED: tconn->current_epoch->list not empty\n");
4468 	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4469 	atomic_set(&tconn->current_epoch->epoch_size, 0);
4470 	tconn->send.seen_any_write_yet = false;
4471 
4472 	conn_info(tconn, "Connection closed\n");
4473 
4474 	if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4475 		conn_try_outdate_peer_async(tconn);
4476 
4477 	spin_lock_irq(&tconn->req_lock);
4478 	oc = tconn->cstate;
4479 	if (oc >= C_UNCONNECTED)
4480 		_conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4481 
4482 	spin_unlock_irq(&tconn->req_lock);
4483 
4484 	if (oc == C_DISCONNECTING)
4485 		conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4486 }
4487 
4488 static int drbd_disconnected(struct drbd_conf *mdev)
4489 {
4490 	unsigned int i;
4491 
4492 	/* wait for current activity to cease. */
4493 	spin_lock_irq(&mdev->tconn->req_lock);
4494 	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4495 	_drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4496 	_drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4497 	spin_unlock_irq(&mdev->tconn->req_lock);
4498 
4499 	/* We do not have data structures that would allow us to
4500 	 * get the rs_pending_cnt down to 0 again.
4501 	 *  * On C_SYNC_TARGET we do not have any data structures describing
4502 	 *    the pending RSDataRequest's we have sent.
4503 	 *  * On C_SYNC_SOURCE there is no data structure that tracks
4504 	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4505 	 *  And no, it is not the sum of the reference counts in the
4506 	 *  resync_LRU. The resync_LRU tracks the whole operation including
4507 	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4508 	 *  on the fly. */
4509 	drbd_rs_cancel_all(mdev);
4510 	mdev->rs_total = 0;
4511 	mdev->rs_failed = 0;
4512 	atomic_set(&mdev->rs_pending_cnt, 0);
4513 	wake_up(&mdev->misc_wait);
4514 
4515 	del_timer_sync(&mdev->resync_timer);
4516 	resync_timer_fn((unsigned long)mdev);
4517 
4518 	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4519 	 * w_make_resync_request etc. which may still be on the worker queue
4520 	 * to be "canceled" */
4521 	drbd_flush_workqueue(mdev);
4522 
4523 	drbd_finish_peer_reqs(mdev);
4524 
4525 	/* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4526 	   might have issued a work again. The one before drbd_finish_peer_reqs() is
	   necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
4528 	drbd_flush_workqueue(mdev);
4529 
4530 	/* need to do it again, drbd_finish_peer_reqs() may have populated it
4531 	 * again via drbd_try_clear_on_disk_bm(). */
4532 	drbd_rs_cancel_all(mdev);
4533 
4534 	kfree(mdev->p_uuid);
4535 	mdev->p_uuid = NULL;
4536 
4537 	if (!drbd_suspended(mdev))
4538 		tl_clear(mdev->tconn);
4539 
4540 	drbd_md_sync(mdev);
4541 
4542 	/* serialize with bitmap writeout triggered by the state change,
4543 	 * if any. */
4544 	wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4545 
4546 	/* tcp_close and release of sendpage pages can be deferred.  I don't
4547 	 * want to use SO_LINGER, because apparently it can be deferred for
4548 	 * more than 20 seconds (longest time I checked).
4549 	 *
4550 	 * Actually we don't care for exactly when the network stack does its
4551 	 * put_page(), but release our reference on these pages right here.
4552 	 */
4553 	i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
4554 	if (i)
4555 		dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4556 	i = atomic_read(&mdev->pp_in_use_by_net);
4557 	if (i)
4558 		dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4559 	i = atomic_read(&mdev->pp_in_use);
4560 	if (i)
4561 		dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4562 
4563 	D_ASSERT(list_empty(&mdev->read_ee));
4564 	D_ASSERT(list_empty(&mdev->active_ee));
4565 	D_ASSERT(list_empty(&mdev->sync_ee));
4566 	D_ASSERT(list_empty(&mdev->done_ee));
4567 
4568 	return 0;
4569 }
4570 
4571 /*
4572  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4573  * we can agree on is stored in agreed_pro_version.
4574  *
4575  * feature flags and the reserved array should be enough room for future
4576  * enhancements of the handshake protocol, and possible plugins...
4577  *
4578  * for now, they are expected to be zero, but ignored.
4579  */
4580 static int drbd_send_features(struct drbd_tconn *tconn)
4581 {
4582 	struct drbd_socket *sock;
4583 	struct p_connection_features *p;
4584 
4585 	sock = &tconn->data;
4586 	p = conn_prepare_command(tconn, sock);
4587 	if (!p)
4588 		return -EIO;
4589 	memset(p, 0, sizeof(*p));
4590 	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4591 	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4592 	return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4593 }
4594 
4595 /*
4596  * return values:
4597  *   1 yes, we have a valid connection
4598  *   0 oops, did not work out, please try again
4599  *  -1 peer talks different language,
4600  *     no point in trying again, please go standalone.
4601  */
4602 static int drbd_do_features(struct drbd_tconn *tconn)
4603 {
4604 	/* ASSERT current == tconn->receiver ... */
4605 	struct p_connection_features *p;
4606 	const int expect = sizeof(struct p_connection_features);
4607 	struct packet_info pi;
4608 	int err;
4609 
4610 	err = drbd_send_features(tconn);
4611 	if (err)
4612 		return 0;
4613 
4614 	err = drbd_recv_header(tconn, &pi);
4615 	if (err)
4616 		return 0;
4617 
4618 	if (pi.cmd != P_CONNECTION_FEATURES) {
4619 		conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4620 			 cmdname(pi.cmd), pi.cmd);
4621 		return -1;
4622 	}
4623 
4624 	if (pi.size != expect) {
4625 		conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4626 		     expect, pi.size);
4627 		return -1;
4628 	}
4629 
4630 	p = pi.data;
4631 	err = drbd_recv_all_warn(tconn, p, expect);
4632 	if (err)
4633 		return 0;
4634 
4635 	p->protocol_min = be32_to_cpu(p->protocol_min);
4636 	p->protocol_max = be32_to_cpu(p->protocol_max);
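	/* a peer that reports protocol_max == 0 is treated as supporting
	 * exactly protocol_min */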
4637 	if (p->protocol_max == 0)
4638 		p->protocol_max = p->protocol_min;
4639 
4640 	if (PRO_VERSION_MAX < p->protocol_min ||
4641 	    PRO_VERSION_MIN > p->protocol_max)
4642 		goto incompat;
4643 
4644 	tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4645 
4646 	conn_info(tconn, "Handshake successful: "
4647 	     "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4648 
4649 	return 1;
4650 
4651  incompat:
4652 	conn_err(tconn, "incompatible DRBD dialects: "
4653 	    "I support %d-%d, peer supports %d-%d\n",
4654 	    PRO_VERSION_MIN, PRO_VERSION_MAX,
4655 	    p->protocol_min, p->protocol_max);
4656 	return -1;
4657 }
4658 
4659 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4660 static int drbd_do_auth(struct drbd_tconn *tconn)
4661 {
	conn_err(tconn, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
	conn_err(tconn, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4664 	return -1;
4665 }
4666 #else
4667 #define CHALLENGE_LEN 64
4668 
4669 /* Return value:
4670 	1 - auth succeeded,
4671 	0 - failed, try again (network error),
4672 	-1 - auth failed, don't try again.
4673 */
4674 
4675 static int drbd_do_auth(struct drbd_tconn *tconn)
4676 {
4677 	struct drbd_socket *sock;
4678 	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4679 	struct scatterlist sg;
4680 	char *response = NULL;
4681 	char *right_response = NULL;
4682 	char *peers_ch = NULL;
4683 	unsigned int key_len;
4684 	char secret[SHARED_SECRET_MAX]; /* 64 byte */
4685 	unsigned int resp_size;
4686 	struct hash_desc desc;
4687 	struct packet_info pi;
4688 	struct net_conf *nc;
4689 	int err, rv;
4690 
4691 	/* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4692 
4693 	rcu_read_lock();
4694 	nc = rcu_dereference(tconn->net_conf);
4695 	key_len = strlen(nc->shared_secret);
4696 	memcpy(secret, nc->shared_secret, key_len);
4697 	rcu_read_unlock();
4698 
4699 	desc.tfm = tconn->cram_hmac_tfm;
4700 	desc.flags = 0;
4701 
4702 	rv = crypto_hash_setkey(tconn->cram_hmac_tfm, (u8 *)secret, key_len);
4703 	if (rv) {
4704 		conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4705 		rv = -1;
4706 		goto fail;
4707 	}
4708 
4709 	get_random_bytes(my_challenge, CHALLENGE_LEN);
4710 
4711 	sock = &tconn->data;
4712 	if (!conn_prepare_command(tconn, sock)) {
4713 		rv = 0;
4714 		goto fail;
4715 	}
4716 	rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4717 				my_challenge, CHALLENGE_LEN);
4718 	if (!rv)
4719 		goto fail;
4720 
4721 	err = drbd_recv_header(tconn, &pi);
4722 	if (err) {
4723 		rv = 0;
4724 		goto fail;
4725 	}
4726 
4727 	if (pi.cmd != P_AUTH_CHALLENGE) {
4728 		conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4729 			 cmdname(pi.cmd), pi.cmd);
4730 		rv = 0;
4731 		goto fail;
4732 	}
4733 
4734 	if (pi.size > CHALLENGE_LEN * 2) {
		conn_err(tconn, "AuthChallenge payload too big.\n");
4736 		rv = -1;
4737 		goto fail;
4738 	}
4739 
4740 	peers_ch = kmalloc(pi.size, GFP_NOIO);
4741 	if (peers_ch == NULL) {
4742 		conn_err(tconn, "kmalloc of peers_ch failed\n");
4743 		rv = -1;
4744 		goto fail;
4745 	}
4746 
4747 	err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4748 	if (err) {
4749 		rv = 0;
4750 		goto fail;
4751 	}
4752 
4753 	resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4754 	response = kmalloc(resp_size, GFP_NOIO);
4755 	if (response == NULL) {
4756 		conn_err(tconn, "kmalloc of response failed\n");
4757 		rv = -1;
4758 		goto fail;
4759 	}
4760 
4761 	sg_init_table(&sg, 1);
4762 	sg_set_buf(&sg, peers_ch, pi.size);
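	/* our response: HMAC(shared secret, peer's challenge); the tfm was
	 * keyed with the shared secret via crypto_hash_setkey() above */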
4763 
4764 	rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4765 	if (rv) {
4766 		conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4767 		rv = -1;
4768 		goto fail;
4769 	}
4770 
4771 	if (!conn_prepare_command(tconn, sock)) {
4772 		rv = 0;
4773 		goto fail;
4774 	}
4775 	rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4776 				response, resp_size);
4777 	if (!rv)
4778 		goto fail;
4779 
4780 	err = drbd_recv_header(tconn, &pi);
4781 	if (err) {
4782 		rv = 0;
4783 		goto fail;
4784 	}
4785 
4786 	if (pi.cmd != P_AUTH_RESPONSE) {
4787 		conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4788 			 cmdname(pi.cmd), pi.cmd);
4789 		rv = 0;
4790 		goto fail;
4791 	}
4792 
4793 	if (pi.size != resp_size) {
		conn_err(tconn, "AuthResponse payload has wrong size.\n");
4795 		rv = 0;
4796 		goto fail;
4797 	}
4798 
	err = drbd_recv_all_warn(tconn, response, resp_size);
4800 	if (err) {
4801 		rv = 0;
4802 		goto fail;
4803 	}
4804 
4805 	right_response = kmalloc(resp_size, GFP_NOIO);
4806 	if (right_response == NULL) {
4807 		conn_err(tconn, "kmalloc of right_response failed\n");
4808 		rv = -1;
4809 		goto fail;
4810 	}
4811 
4812 	sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
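	/* what the peer must have answered: HMAC(shared secret, our challenge) */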
4813 
4814 	rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4815 	if (rv) {
4816 		conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4817 		rv = -1;
4818 		goto fail;
4819 	}
4820 
4821 	rv = !memcmp(response, right_response, resp_size);
4822 
4823 	if (rv)
4824 		conn_info(tconn, "Peer authenticated using %d bytes HMAC\n",
4825 		     resp_size);
4826 	else
4827 		rv = -1;
4828 
4829  fail:
4830 	kfree(peers_ch);
4831 	kfree(response);
4832 	kfree(right_response);
4833 
4834 	return rv;
4835 }
4836 #endif
4837 
4838 int drbdd_init(struct drbd_thread *thi)
4839 {
4840 	struct drbd_tconn *tconn = thi->tconn;
4841 	int h;
4842 
4843 	conn_info(tconn, "receiver (re)started\n");
4844 
4845 	do {
4846 		h = conn_connect(tconn);
4847 		if (h == 0) {
4848 			conn_disconnect(tconn);
4849 			schedule_timeout_interruptible(HZ);
4850 		}
4851 		if (h == -1) {
4852 			conn_warn(tconn, "Discarding network configuration.\n");
4853 			conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4854 		}
4855 	} while (h == 0);
4856 
4857 	if (h > 0)
4858 		drbdd(tconn);
4859 
4860 	conn_disconnect(tconn);
4861 
4862 	conn_info(tconn, "receiver terminated\n");
4863 	return 0;
4864 }
4865 
4866 /* ********* acknowledge sender ******** */
4867 
4868 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4869 {
4870 	struct p_req_state_reply *p = pi->data;
4871 	int retcode = be32_to_cpu(p->retcode);
4872 
4873 	if (retcode >= SS_SUCCESS) {
4874 		set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4875 	} else {
4876 		set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4877 		conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4878 			 drbd_set_st_err_str(retcode), retcode);
4879 	}
4880 	wake_up(&tconn->ping_wait);
4881 
4882 	return 0;
4883 }
4884 
4885 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4886 {
4887 	struct drbd_conf *mdev;
4888 	struct p_req_state_reply *p = pi->data;
4889 	int retcode = be32_to_cpu(p->retcode);
4890 
4891 	mdev = vnr_to_mdev(tconn, pi->vnr);
4892 	if (!mdev)
4893 		return -EIO;
4894 
4895 	if (test_bit(CONN_WD_ST_CHG_REQ, &tconn->flags)) {
4896 		D_ASSERT(tconn->agreed_pro_version < 100);
4897 		return got_conn_RqSReply(tconn, pi);
4898 	}
4899 
4900 	if (retcode >= SS_SUCCESS) {
4901 		set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4902 	} else {
4903 		set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4904 		dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4905 			drbd_set_st_err_str(retcode), retcode);
4906 	}
4907 	wake_up(&mdev->state_wait);
4908 
4909 	return 0;
4910 }
4911 
4912 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4913 {
4914 	return drbd_send_ping_ack(tconn);
4915 
4916 }
4917 
4918 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4919 {
4920 	/* restore idle timeout */
4921 	tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4922 	if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4923 		wake_up(&tconn->ping_wait);
4924 
4925 	return 0;
4926 }
4927 
4928 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4929 {
4930 	struct drbd_conf *mdev;
4931 	struct p_block_ack *p = pi->data;
4932 	sector_t sector = be64_to_cpu(p->sector);
4933 	int blksize = be32_to_cpu(p->blksize);
4934 
4935 	mdev = vnr_to_mdev(tconn, pi->vnr);
4936 	if (!mdev)
4937 		return -EIO;
4938 
4939 	D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4940 
4941 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4942 
4943 	if (get_ldev(mdev)) {
4944 		drbd_rs_complete_io(mdev, sector);
4945 		drbd_set_in_sync(mdev, sector, blksize);
4946 		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4947 		mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4948 		put_ldev(mdev);
4949 	}
4950 	dec_rs_pending(mdev);
4951 	atomic_add(blksize >> 9, &mdev->rs_sect_in);
4952 
4953 	return 0;
4954 }
4955 
4956 static int
4957 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4958 			      struct rb_root *root, const char *func,
4959 			      enum drbd_req_event what, bool missing_ok)
4960 {
4961 	struct drbd_request *req;
4962 	struct bio_and_error m;
4963 
4964 	spin_lock_irq(&mdev->tconn->req_lock);
4965 	req = find_request(mdev, root, id, sector, missing_ok, func);
4966 	if (unlikely(!req)) {
4967 		spin_unlock_irq(&mdev->tconn->req_lock);
4968 		return -EIO;
4969 	}
4970 	__req_mod(req, what, &m);
4971 	spin_unlock_irq(&mdev->tconn->req_lock);
4972 
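	/* complete the upper layer bio, if any, outside of the req_lock */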
4973 	if (m.bio)
4974 		complete_master_bio(mdev, &m);
4975 	return 0;
4976 }
4977 
4978 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4979 {
4980 	struct drbd_conf *mdev;
4981 	struct p_block_ack *p = pi->data;
4982 	sector_t sector = be64_to_cpu(p->sector);
4983 	int blksize = be32_to_cpu(p->blksize);
4984 	enum drbd_req_event what;
4985 
4986 	mdev = vnr_to_mdev(tconn, pi->vnr);
4987 	if (!mdev)
4988 		return -EIO;
4989 
4990 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4991 
4992 	if (p->block_id == ID_SYNCER) {
4993 		drbd_set_in_sync(mdev, sector, blksize);
4994 		dec_rs_pending(mdev);
4995 		return 0;
4996 	}
4997 	switch (pi->cmd) {
4998 	case P_RS_WRITE_ACK:
4999 		what = WRITE_ACKED_BY_PEER_AND_SIS;
5000 		break;
5001 	case P_WRITE_ACK:
5002 		what = WRITE_ACKED_BY_PEER;
5003 		break;
5004 	case P_RECV_ACK:
5005 		what = RECV_ACKED_BY_PEER;
5006 		break;
5007 	case P_SUPERSEDED:
5008 		what = CONFLICT_RESOLVED;
5009 		break;
5010 	case P_RETRY_WRITE:
5011 		what = POSTPONE_WRITE;
5012 		break;
5013 	default:
5014 		BUG();
5015 	}
5016 
5017 	return validate_req_change_req_state(mdev, p->block_id, sector,
5018 					     &mdev->write_requests, __func__,
5019 					     what, false);
5020 }
5021 
5022 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
5023 {
5024 	struct drbd_conf *mdev;
5025 	struct p_block_ack *p = pi->data;
5026 	sector_t sector = be64_to_cpu(p->sector);
5027 	int size = be32_to_cpu(p->blksize);
5028 	int err;
5029 
5030 	mdev = vnr_to_mdev(tconn, pi->vnr);
5031 	if (!mdev)
5032 		return -EIO;
5033 
5034 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5035 
5036 	if (p->block_id == ID_SYNCER) {
5037 		dec_rs_pending(mdev);
5038 		drbd_rs_failed_io(mdev, sector, size);
5039 		return 0;
5040 	}
5041 
5042 	err = validate_req_change_req_state(mdev, p->block_id, sector,
5043 					    &mdev->write_requests, __func__,
5044 					    NEG_ACKED, true);
5045 	if (err) {
5046 		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5047 		   The master bio might already be completed, therefore the
5048 		   request is no longer in the collision hash. */
5049 		/* In Protocol B we might already have got a P_RECV_ACK
5050 		   but then get a P_NEG_ACK afterwards. */
5051 		drbd_set_out_of_sync(mdev, sector, size);
5052 	}
5053 	return 0;
5054 }
5055 
5056 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
5057 {
5058 	struct drbd_conf *mdev;
5059 	struct p_block_ack *p = pi->data;
5060 	sector_t sector = be64_to_cpu(p->sector);
5061 
5062 	mdev = vnr_to_mdev(tconn, pi->vnr);
5063 	if (!mdev)
5064 		return -EIO;
5065 
5066 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5067 
5068 	dev_err(DEV, "Got NegDReply; Sector %llus, len %u.\n",
5069 	    (unsigned long long)sector, be32_to_cpu(p->blksize));
5070 
5071 	return validate_req_change_req_state(mdev, p->block_id, sector,
5072 					     &mdev->read_requests, __func__,
5073 					     NEG_ACKED, false);
5074 }
5075 
5076 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
5077 {
5078 	struct drbd_conf *mdev;
5079 	sector_t sector;
5080 	int size;
5081 	struct p_block_ack *p = pi->data;
5082 
5083 	mdev = vnr_to_mdev(tconn, pi->vnr);
5084 	if (!mdev)
5085 		return -EIO;
5086 
5087 	sector = be64_to_cpu(p->sector);
5088 	size = be32_to_cpu(p->blksize);
5089 
5090 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5091 
5092 	dec_rs_pending(mdev);
5093 
5094 	if (get_ldev_if_state(mdev, D_FAILED)) {
5095 		drbd_rs_complete_io(mdev, sector);
5096 		switch (pi->cmd) {
5097 		case P_NEG_RS_DREPLY:
5098 			drbd_rs_failed_io(mdev, sector, size);
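			/* fall through */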
5099 		case P_RS_CANCEL:
5100 			break;
5101 		default:
5102 			BUG();
5103 		}
5104 		put_ldev(mdev);
5105 	}
5106 
5107 	return 0;
5108 }
5109 
5110 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
5111 {
5112 	struct p_barrier_ack *p = pi->data;
5113 	struct drbd_conf *mdev;
5114 	int vnr;
5115 
5116 	tl_release(tconn, p->barrier, be32_to_cpu(p->set_size));
5117 
5118 	rcu_read_lock();
5119 	idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5120 		if (mdev->state.conn == C_AHEAD &&
5121 		    atomic_read(&mdev->ap_in_flight) == 0 &&
5122 		    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags)) {
5123 			mdev->start_resync_timer.expires = jiffies + HZ;
5124 			add_timer(&mdev->start_resync_timer);
5125 		}
5126 	}
5127 	rcu_read_unlock();
5128 
5129 	return 0;
5130 }
5131 
5132 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
5133 {
5134 	struct drbd_conf *mdev;
5135 	struct p_block_ack *p = pi->data;
5136 	struct drbd_work *w;
5137 	sector_t sector;
5138 	int size;
5139 
5140 	mdev = vnr_to_mdev(tconn, pi->vnr);
5141 	if (!mdev)
5142 		return -EIO;
5143 
5144 	sector = be64_to_cpu(p->sector);
5145 	size = be32_to_cpu(p->blksize);
5146 
5147 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5148 
5149 	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5150 		drbd_ov_out_of_sync_found(mdev, sector, size);
5151 	else
5152 		ov_out_of_sync_print(mdev);
5153 
5154 	if (!get_ldev(mdev))
5155 		return 0;
5156 
5157 	drbd_rs_complete_io(mdev, sector);
5158 	dec_rs_pending(mdev);
5159 
5160 	--mdev->ov_left;
5161 
5162 	/* let's advance progress step marks only for every other megabyte */
5163 	if ((mdev->ov_left & 0x200) == 0x200)
5164 		drbd_advance_rs_marks(mdev, mdev->ov_left);
5165 
5166 	if (mdev->ov_left == 0) {
5167 		w = kmalloc(sizeof(*w), GFP_NOIO);
5168 		if (w) {
5169 			w->cb = w_ov_finished;
5170 			w->mdev = mdev;
5171 			drbd_queue_work(&mdev->tconn->sender_work, w);
5172 		} else {
			dev_err(DEV, "kmalloc(w) failed.\n");
5174 			ov_out_of_sync_print(mdev);
5175 			drbd_resync_finished(mdev);
5176 		}
5177 	}
5178 	put_ldev(mdev);
5179 	return 0;
5180 }
5181 
5182 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
5183 {
5184 	return 0;
5185 }
5186 
5187 static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
5188 {
5189 	struct drbd_conf *mdev;
5190 	int vnr, not_empty = 0;
5191 
5192 	do {
5193 		clear_bit(SIGNAL_ASENDER, &tconn->flags);
5194 		flush_signals(current);
5195 
5196 		rcu_read_lock();
5197 		idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5198 			kref_get(&mdev->kref);
5199 			rcu_read_unlock();
5200 			if (drbd_finish_peer_reqs(mdev)) {
5201 				kref_put(&mdev->kref, &drbd_minor_destroy);
5202 				return 1;
5203 			}
5204 			kref_put(&mdev->kref, &drbd_minor_destroy);
5205 			rcu_read_lock();
5206 		}
5207 		set_bit(SIGNAL_ASENDER, &tconn->flags);
5208 
5209 		spin_lock_irq(&tconn->req_lock);
5210 		idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5211 			not_empty = !list_empty(&mdev->done_ee);
5212 			if (not_empty)
5213 				break;
5214 		}
5215 		spin_unlock_irq(&tconn->req_lock);
5216 		rcu_read_unlock();
5217 	} while (not_empty);
5218 
5219 	return 0;
5220 }
5221 
5222 struct asender_cmd {
5223 	size_t pkt_size;
5224 	int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
5225 };
5226 
5227 static struct asender_cmd asender_tbl[] = {
5228 	[P_PING]	    = { 0, got_Ping },
5229 	[P_PING_ACK]	    = { 0, got_PingAck },
5230 	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5231 	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5232 	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5233 	[P_SUPERSEDED]   = { sizeof(struct p_block_ack), got_BlockAck },
5234 	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
5235 	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
5236 	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5237 	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
5238 	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
5239 	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5240 	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5241 	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5242 	[P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
	[P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5244 	[P_RETRY_WRITE]	    = { sizeof(struct p_block_ack), got_BlockAck },
5245 };
5246 
int drbd_asender(struct drbd_thread *thi)
{
	struct drbd_tconn *tconn = thi->tconn;
	struct asender_cmd *cmd = NULL;
	struct packet_info pi;
	int rv;
	void *buf    = tconn->meta.rbuf;
	int received = 0;
	unsigned int header_size = drbd_header_size(tconn);
	int expect   = header_size;
	bool ping_timeout_active = false;
	struct net_conf *nc;
	int ping_timeo, tcp_cork, ping_int;

	current->policy = SCHED_RR;  /* Make this a realtime task! */
	current->rt_priority = 2;    /* more important than all other tasks */

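	/* Receive state: 'expect' is the number of bytes wanted for the
	 * current packet (first just the header, then header plus the
	 * payload size taken from asender_tbl), 'received' is how much of
	 * it has arrived so far. */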
	while (get_t_state(thi) == RUNNING) {
		drbd_thread_current_set_cpu(thi);

		rcu_read_lock();
		nc = rcu_dereference(tconn->net_conf);
		ping_timeo = nc->ping_timeo;
		tcp_cork = nc->tcp_cork;
		ping_int = nc->ping_int;
		rcu_read_unlock();

		if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
			if (drbd_send_ping(tconn)) {
				conn_err(tconn, "drbd_send_ping has failed\n");
				goto reconnect;
			}
			tconn->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
			ping_timeout_active = true;
		}

		/* TODO: conditionally cork; it may hurt latency if we cork without
		   much to send */
		if (tcp_cork)
			drbd_tcp_cork(tconn->meta.socket);
		if (tconn_finish_peer_reqs(tconn)) {
			conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
			goto reconnect;
		}
		/* but unconditionally uncork unless disabled */
		if (tcp_cork)
			drbd_tcp_uncork(tconn->meta.socket);

		/* short circuit, recvmsg would return EINTR anyway. */
		if (signal_pending(current))
			continue;

		rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
		clear_bit(SIGNAL_ASENDER, &tconn->flags);

		flush_signals(current);

		/* Note:
		 * -EINTR	 (on meta) we got a signal
		 * -EAGAIN	 (on meta) rcvtimeo expired
		 * -ECONNRESET	 other side closed the connection
		 * -ERESTARTSYS  (on data) we got a signal
		 * rv <  0	 other than above: unexpected error!
		 * rv == expected: full header or command
		 * rv <  expected: "woken" by signal during receive
		 * rv == 0	 : "connection shut down by peer"
		 */
		if (likely(rv > 0)) {
			received += rv;
			buf	 += rv;
		} else if (rv == 0) {
			if (test_bit(DISCONNECT_SENT, &tconn->flags)) {
				long t;
				rcu_read_lock();
				t = rcu_dereference(tconn->net_conf)->ping_timeo * HZ/10;
				rcu_read_unlock();

				t = wait_event_timeout(tconn->ping_wait,
						       tconn->cstate < C_WF_REPORT_PARAMS,
						       t);
				if (t)
					break;
			}
			conn_err(tconn, "meta connection shut down by peer.\n");
			goto reconnect;
		} else if (rv == -EAGAIN) {
			/* If the data socket received something meanwhile,
			 * that is good enough: peer is still alive. */
			if (time_after(tconn->last_received,
				jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
				continue;
			if (ping_timeout_active) {
				conn_err(tconn, "PingAck did not arrive in time.\n");
				goto reconnect;
			}
			set_bit(SEND_PING, &tconn->flags);
			continue;
		} else if (rv == -EINTR) {
			continue;
		} else {
			conn_err(tconn, "sock_recvmsg returned %d\n", rv);
			goto reconnect;
		}

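		/* The header is complete: decode it, look up the handler in
		 * asender_tbl, and extend 'expect' to cover the payload. */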
		if (received == expect && cmd == NULL) {
			if (decode_header(tconn, tconn->meta.rbuf, &pi))
				goto reconnect;
			if (pi.cmd >= ARRAY_SIZE(asender_tbl) ||
			    !asender_tbl[pi.cmd].fn) {
				conn_err(tconn, "Unexpected meta packet %s (0x%04x)\n",
					 cmdname(pi.cmd), pi.cmd);
				goto disconnect;
			}
			cmd = &asender_tbl[pi.cmd];
			expect = header_size + cmd->pkt_size;
			if (pi.size != expect - header_size) {
				conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
					pi.cmd, pi.size);
				goto reconnect;
			}
		}
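		/* Header and payload are complete: run the handler, then
		 * reset the receive state for the next packet. */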
		if (received == expect) {
			bool err;

			err = cmd->fn(tconn, &pi);
			if (err) {
				conn_err(tconn, "%pf failed\n", cmd->fn);
				goto reconnect;
			}

			tconn->last_received = jiffies;

			if (cmd == &asender_tbl[P_PING_ACK]) {
				/* restore idle timeout */
				tconn->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
				ping_timeout_active = false;
			}

			buf	 = tconn->meta.rbuf;
			received = 0;
			expect	 = header_size;
			cmd	 = NULL;
		}
	}

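	/* The error exits below are reached via goto only; the if (0)
	 * wrappers keep the regular loop exit from falling through them. */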
	if (0) {
reconnect:
		conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
		conn_md_sync(tconn);
	}
	if (0) {
disconnect:
		conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
	}
	clear_bit(SIGNAL_ASENDER, &tconn->flags);

	conn_info(tconn, "asender terminated\n");

	return 0;
}