1 /*
2    drbd_receiver.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24 
25 
26 #include <linux/module.h>
27 
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30 
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48 
49 #include "drbd_vli.h"
50 
51 enum finish_epoch {
52 	FE_STILL_LIVE,
53 	FE_DESTROYED,
54 	FE_RECYCLED,
55 };
56 
57 static int drbd_do_handshake(struct drbd_conf *mdev);
58 static int drbd_do_auth(struct drbd_conf *mdev);
59 
60 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
61 static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
62 
63 
64 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
65 
66 /*
67  * some helper functions to deal with single linked page lists,
68  * page->private being our "next" pointer.
69  */
70 
71 /* If at least n pages are linked at head, get n pages off.
72  * Otherwise, don't modify head, and return NULL.
73  * Locking is the responsibility of the caller.
74  */
75 static struct page *page_chain_del(struct page **head, int n)
76 {
77 	struct page *page;
78 	struct page *tmp;
79 
80 	BUG_ON(!n);
81 	BUG_ON(!head);
82 
83 	page = *head;
84 
85 	if (!page)
86 		return NULL;
87 
88 	while (page) {
89 		tmp = page_chain_next(page);
90 		if (--n == 0)
91 			break; /* found sufficient pages */
92 		if (tmp == NULL)
93 			/* insufficient pages, don't use any of them. */
94 			return NULL;
95 		page = tmp;
96 	}
97 
98 	/* add end of list marker for the returned list */
99 	set_page_private(page, 0);
100 	/* actual return value, and adjustment of head */
101 	page = *head;
102 	*head = tmp;
103 	return page;
104 }
105 
106 /* may be used outside of locks to find the tail of a (usually short)
107  * "private" page chain, before adding it back to a global chain head
108  * with page_chain_add() under a spinlock. */
109 static struct page *page_chain_tail(struct page *page, int *len)
110 {
111 	struct page *tmp;
112 	int i = 1;
113 	while ((tmp = page_chain_next(page)))
114 		++i, page = tmp;
115 	if (len)
116 		*len = i;
117 	return page;
118 }
119 
120 static int page_chain_free(struct page *page)
121 {
122 	struct page *tmp;
123 	int i = 0;
124 	page_chain_for_each_safe(page, tmp) {
125 		put_page(page);
126 		++i;
127 	}
128 	return i;
129 }
130 
131 static void page_chain_add(struct page **head,
132 		struct page *chain_first, struct page *chain_last)
133 {
134 #if 1
135 	struct page *tmp;
136 	tmp = page_chain_tail(chain_first, NULL);
137 	BUG_ON(tmp != chain_last);
138 #endif
139 
140 	/* add chain to head */
141 	set_page_private(chain_last, (unsigned long)*head);
142 	*head = chain_first;
143 }
144 
145 static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
146 {
147 	struct page *page = NULL;
148 	struct page *tmp = NULL;
149 	int i = 0;
150 
151 	/* Yes, testing drbd_pp_vacant outside the lock is racy.
152 	 * So what. It saves a spin_lock. */
153 	if (drbd_pp_vacant >= number) {
154 		spin_lock(&drbd_pp_lock);
155 		page = page_chain_del(&drbd_pp_pool, number);
156 		if (page)
157 			drbd_pp_vacant -= number;
158 		spin_unlock(&drbd_pp_lock);
159 		if (page)
160 			return page;
161 	}
162 
163 	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
164 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
165 	 * which in turn might block on the other node at this very place.  */
166 	for (i = 0; i < number; i++) {
167 		tmp = alloc_page(GFP_TRY);
168 		if (!tmp)
169 			break;
170 		set_page_private(tmp, (unsigned long)page);
171 		page = tmp;
172 	}
173 
174 	if (i == number)
175 		return page;
176 
177 	/* Not enough pages immediately available this time.
178 	 * No need to jump around here, drbd_pp_alloc will retry this
179 	 * function "soon". */
180 	if (page) {
181 		tmp = page_chain_tail(page, NULL);
182 		spin_lock(&drbd_pp_lock);
183 		page_chain_add(&drbd_pp_pool, page, tmp);
184 		drbd_pp_vacant += i;
185 		spin_unlock(&drbd_pp_lock);
186 	}
187 	return NULL;
188 }
189 
190 static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
191 {
192 	struct drbd_epoch_entry *e;
193 	struct list_head *le, *tle;
194 
195 	/* The EEs are always appended to the end of the list. Since
196 	   they are sent in order over the wire, they have to finish
197 	   in order. As soon as we see the first not finished we can
198 	   stop to examine the list... */
199 
200 	list_for_each_safe(le, tle, &mdev->net_ee) {
201 		e = list_entry(le, struct drbd_epoch_entry, w.list);
202 		if (drbd_ee_has_active_page(e))
203 			break;
204 		list_move(le, to_be_freed);
205 	}
206 }
207 
208 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
209 {
210 	LIST_HEAD(reclaimed);
211 	struct drbd_epoch_entry *e, *t;
212 
213 	spin_lock_irq(&mdev->req_lock);
214 	reclaim_net_ee(mdev, &reclaimed);
215 	spin_unlock_irq(&mdev->req_lock);
216 
217 	list_for_each_entry_safe(e, t, &reclaimed, w.list)
218 		drbd_free_net_ee(mdev, e);
219 }
220 
221 /**
222  * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
223  * @mdev:	DRBD device.
224  * @number:	number of pages requested
225  * @retry:	whether to retry, if not enough pages are available right now
226  *
227  * Tries to allocate number pages, first from our own page pool, then from
228  * the kernel, unless this allocation would exceed the max_buffers setting.
229  * Possibly retry until DRBD frees sufficient pages somewhere else.
230  *
231  * Returns a page chain linked via page->private.
232  */
233 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
234 {
235 	struct page *page = NULL;
236 	DEFINE_WAIT(wait);
237 
238 	/* Yes, we may run up to @number over max_buffers. If we
239 	 * follow it strictly, the admin will get it wrong anyways. */
240 	if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
241 		page = drbd_pp_first_pages_or_try_alloc(mdev, number);
242 
243 	while (page == NULL) {
244 		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
245 
246 		drbd_kick_lo_and_reclaim_net(mdev);
247 
248 		if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
249 			page = drbd_pp_first_pages_or_try_alloc(mdev, number);
250 			if (page)
251 				break;
252 		}
253 
254 		if (!retry)
255 			break;
256 
257 		if (signal_pending(current)) {
258 			dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
259 			break;
260 		}
261 
262 		schedule();
263 	}
264 	finish_wait(&drbd_pp_wait, &wait);
265 
266 	if (page)
267 		atomic_add(number, &mdev->pp_in_use);
268 	return page;
269 }
270 
271 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
272  * Is also used from inside an other spin_lock_irq(&mdev->req_lock);
273  * Either links the page chain back to the global pool,
274  * or returns all pages to the system. */
275 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
276 {
277 	atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
278 	int i;
279 
280 	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE)*minor_count)
281 		i = page_chain_free(page);
282 	else {
283 		struct page *tmp;
284 		tmp = page_chain_tail(page, &i);
285 		spin_lock(&drbd_pp_lock);
286 		page_chain_add(&drbd_pp_pool, page, tmp);
287 		drbd_pp_vacant += i;
288 		spin_unlock(&drbd_pp_lock);
289 	}
290 	i = atomic_sub_return(i, a);
291 	if (i < 0)
292 		dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
293 			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
294 	wake_up(&drbd_pp_wait);
295 }
296 
297 /*
298 You need to hold the req_lock:
299  _drbd_wait_ee_list_empty()
300 
301 You must not have the req_lock:
302  drbd_free_ee()
303  drbd_alloc_ee()
304  drbd_init_ee()
305  drbd_release_ee()
306  drbd_ee_fix_bhs()
307  drbd_process_done_ee()
308  drbd_clear_done_ee()
309  drbd_wait_ee_list_empty()
310 */
311 
312 struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
313 				     u64 id,
314 				     sector_t sector,
315 				     unsigned int data_size,
316 				     gfp_t gfp_mask) __must_hold(local)
317 {
318 	struct drbd_epoch_entry *e;
319 	struct page *page;
320 	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
321 
322 	if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
323 		return NULL;
324 
325 	e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
326 	if (!e) {
327 		if (!(gfp_mask & __GFP_NOWARN))
328 			dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
329 		return NULL;
330 	}
331 
332 	page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
333 	if (!page)
334 		goto fail;
335 
336 	INIT_HLIST_NODE(&e->collision);
337 	e->epoch = NULL;
338 	e->mdev = mdev;
339 	e->pages = page;
340 	atomic_set(&e->pending_bios, 0);
341 	e->size = data_size;
342 	e->flags = 0;
343 	e->sector = sector;
344 	e->block_id = id;
345 
346 	return e;
347 
348  fail:
349 	mempool_free(e, drbd_ee_mempool);
350 	return NULL;
351 }
352 
353 void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e, int is_net)
354 {
355 	if (e->flags & EE_HAS_DIGEST)
356 		kfree(e->digest);
357 	drbd_pp_free(mdev, e->pages, is_net);
358 	D_ASSERT(atomic_read(&e->pending_bios) == 0);
359 	D_ASSERT(hlist_unhashed(&e->collision));
360 	mempool_free(e, drbd_ee_mempool);
361 }
362 
363 int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
364 {
365 	LIST_HEAD(work_list);
366 	struct drbd_epoch_entry *e, *t;
367 	int count = 0;
368 	int is_net = list == &mdev->net_ee;
369 
370 	spin_lock_irq(&mdev->req_lock);
371 	list_splice_init(list, &work_list);
372 	spin_unlock_irq(&mdev->req_lock);
373 
374 	list_for_each_entry_safe(e, t, &work_list, w.list) {
375 		drbd_free_some_ee(mdev, e, is_net);
376 		count++;
377 	}
378 	return count;
379 }
380 
381 
382 /*
383  * This function is called from _asender only_
384  * but see also comments in _req_mod(,barrier_acked)
385  * and receive_Barrier.
386  *
387  * Move entries from net_ee to done_ee, if ready.
388  * Grab done_ee, call all callbacks, free the entries.
389  * The callbacks typically send out ACKs.
390  */
391 static int drbd_process_done_ee(struct drbd_conf *mdev)
392 {
393 	LIST_HEAD(work_list);
394 	LIST_HEAD(reclaimed);
395 	struct drbd_epoch_entry *e, *t;
396 	int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
397 
398 	spin_lock_irq(&mdev->req_lock);
399 	reclaim_net_ee(mdev, &reclaimed);
400 	list_splice_init(&mdev->done_ee, &work_list);
401 	spin_unlock_irq(&mdev->req_lock);
402 
403 	list_for_each_entry_safe(e, t, &reclaimed, w.list)
404 		drbd_free_net_ee(mdev, e);
405 
406 	/* possible callbacks here:
407 	 * e_end_block, and e_end_resync_block, e_send_discard_ack.
408 	 * all ignore the last argument.
409 	 */
410 	list_for_each_entry_safe(e, t, &work_list, w.list) {
411 		/* list_del not necessary, next/prev members not touched */
412 		ok = e->w.cb(mdev, &e->w, !ok) && ok;
413 		drbd_free_ee(mdev, e);
414 	}
415 	wake_up(&mdev->ee_wait);
416 
417 	return ok;
418 }
419 
420 void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
421 {
422 	DEFINE_WAIT(wait);
423 
424 	/* avoids spin_lock/unlock
425 	 * and calling prepare_to_wait in the fast path */
426 	while (!list_empty(head)) {
427 		prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
428 		spin_unlock_irq(&mdev->req_lock);
429 		io_schedule();
430 		finish_wait(&mdev->ee_wait, &wait);
431 		spin_lock_irq(&mdev->req_lock);
432 	}
433 }
434 
435 void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
436 {
437 	spin_lock_irq(&mdev->req_lock);
438 	_drbd_wait_ee_list_empty(mdev, head);
439 	spin_unlock_irq(&mdev->req_lock);
440 }
441 
442 /* see also kernel_accept; which is only present since 2.6.18.
443  * also we want to log which part of it failed, exactly */
444 static int drbd_accept(struct drbd_conf *mdev, const char **what,
445 		struct socket *sock, struct socket **newsock)
446 {
447 	struct sock *sk = sock->sk;
448 	int err = 0;
449 
450 	*what = "listen";
451 	err = sock->ops->listen(sock, 5);
452 	if (err < 0)
453 		goto out;
454 
455 	*what = "sock_create_lite";
456 	err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
457 			       newsock);
458 	if (err < 0)
459 		goto out;
460 
461 	*what = "accept";
462 	err = sock->ops->accept(sock, *newsock, 0);
463 	if (err < 0) {
464 		sock_release(*newsock);
465 		*newsock = NULL;
466 		goto out;
467 	}
468 	(*newsock)->ops  = sock->ops;
469 	__module_get((*newsock)->ops->owner);
470 
471 out:
472 	return err;
473 }
474 
475 static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
476 		    void *buf, size_t size, int flags)
477 {
478 	mm_segment_t oldfs;
479 	struct kvec iov = {
480 		.iov_base = buf,
481 		.iov_len = size,
482 	};
483 	struct msghdr msg = {
484 		.msg_iovlen = 1,
485 		.msg_iov = (struct iovec *)&iov,
486 		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
487 	};
488 	int rv;
489 
490 	oldfs = get_fs();
491 	set_fs(KERNEL_DS);
492 	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
493 	set_fs(oldfs);
494 
495 	return rv;
496 }
497 
498 static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
499 {
500 	mm_segment_t oldfs;
501 	struct kvec iov = {
502 		.iov_base = buf,
503 		.iov_len = size,
504 	};
505 	struct msghdr msg = {
506 		.msg_iovlen = 1,
507 		.msg_iov = (struct iovec *)&iov,
508 		.msg_flags = MSG_WAITALL | MSG_NOSIGNAL
509 	};
510 	int rv;
511 
512 	oldfs = get_fs();
513 	set_fs(KERNEL_DS);
514 
515 	for (;;) {
516 		rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
517 		if (rv == size)
518 			break;
519 
520 		/* Note:
521 		 * ECONNRESET	other side closed the connection
522 		 * ERESTARTSYS	(on  sock) we got a signal
523 		 */
524 
525 		if (rv < 0) {
526 			if (rv == -ECONNRESET)
527 				dev_info(DEV, "sock was reset by peer\n");
528 			else if (rv != -ERESTARTSYS)
529 				dev_err(DEV, "sock_recvmsg returned %d\n", rv);
530 			break;
531 		} else if (rv == 0) {
532 			dev_info(DEV, "sock was shut down by peer\n");
533 			break;
534 		} else	{
535 			/* signal came in, or peer/link went down,
536 			 * after we read a partial message
537 			 */
538 			/* D_ASSERT(signal_pending(current)); */
539 			break;
540 		}
541 	};
542 
543 	set_fs(oldfs);
544 
545 	if (rv != size)
546 		drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
547 
548 	return rv;
549 }
550 
551 /* quoting tcp(7):
552  *   On individual connections, the socket buffer size must be set prior to the
553  *   listen(2) or connect(2) calls in order to have it take effect.
554  * This is our wrapper to do so.
555  */
556 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
557 		unsigned int rcv)
558 {
559 	/* open coded SO_SNDBUF, SO_RCVBUF */
560 	if (snd) {
561 		sock->sk->sk_sndbuf = snd;
562 		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
563 	}
564 	if (rcv) {
565 		sock->sk->sk_rcvbuf = rcv;
566 		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
567 	}
568 }
569 
570 static struct socket *drbd_try_connect(struct drbd_conf *mdev)
571 {
572 	const char *what;
573 	struct socket *sock;
574 	struct sockaddr_in6 src_in6;
575 	int err;
576 	int disconnect_on_error = 1;
577 
578 	if (!get_net_conf(mdev))
579 		return NULL;
580 
581 	what = "sock_create_kern";
582 	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
583 		SOCK_STREAM, IPPROTO_TCP, &sock);
584 	if (err < 0) {
585 		sock = NULL;
586 		goto out;
587 	}
588 
589 	sock->sk->sk_rcvtimeo =
590 	sock->sk->sk_sndtimeo =  mdev->net_conf->try_connect_int*HZ;
591 	drbd_setbufsize(sock, mdev->net_conf->sndbuf_size,
592 			mdev->net_conf->rcvbuf_size);
593 
594        /* explicitly bind to the configured IP as source IP
595 	*  for the outgoing connections.
596 	*  This is needed for multihomed hosts and to be
597 	*  able to use lo: interfaces for drbd.
598 	* Make sure to use 0 as port number, so linux selects
599 	*  a free one dynamically.
600 	*/
601 	memcpy(&src_in6, mdev->net_conf->my_addr,
602 	       min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
603 	if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
604 		src_in6.sin6_port = 0;
605 	else
606 		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
607 
608 	what = "bind before connect";
609 	err = sock->ops->bind(sock,
610 			      (struct sockaddr *) &src_in6,
611 			      mdev->net_conf->my_addr_len);
612 	if (err < 0)
613 		goto out;
614 
615 	/* connect may fail, peer not yet available.
616 	 * stay C_WF_CONNECTION, don't go Disconnecting! */
617 	disconnect_on_error = 0;
618 	what = "connect";
619 	err = sock->ops->connect(sock,
620 				 (struct sockaddr *)mdev->net_conf->peer_addr,
621 				 mdev->net_conf->peer_addr_len, 0);
622 
623 out:
624 	if (err < 0) {
625 		if (sock) {
626 			sock_release(sock);
627 			sock = NULL;
628 		}
629 		switch (-err) {
630 			/* timeout, busy, signal pending */
631 		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
632 		case EINTR: case ERESTARTSYS:
633 			/* peer not (yet) available, network problem */
634 		case ECONNREFUSED: case ENETUNREACH:
635 		case EHOSTDOWN:    case EHOSTUNREACH:
636 			disconnect_on_error = 0;
637 			break;
638 		default:
639 			dev_err(DEV, "%s failed, err = %d\n", what, err);
640 		}
641 		if (disconnect_on_error)
642 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
643 	}
644 	put_net_conf(mdev);
645 	return sock;
646 }
647 
648 static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
649 {
650 	int timeo, err;
651 	struct socket *s_estab = NULL, *s_listen;
652 	const char *what;
653 
654 	if (!get_net_conf(mdev))
655 		return NULL;
656 
657 	what = "sock_create_kern";
658 	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
659 		SOCK_STREAM, IPPROTO_TCP, &s_listen);
660 	if (err) {
661 		s_listen = NULL;
662 		goto out;
663 	}
664 
665 	timeo = mdev->net_conf->try_connect_int * HZ;
666 	timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
667 
668 	s_listen->sk->sk_reuse    = SK_CAN_REUSE; /* SO_REUSEADDR */
669 	s_listen->sk->sk_rcvtimeo = timeo;
670 	s_listen->sk->sk_sndtimeo = timeo;
671 	drbd_setbufsize(s_listen, mdev->net_conf->sndbuf_size,
672 			mdev->net_conf->rcvbuf_size);
673 
674 	what = "bind before listen";
675 	err = s_listen->ops->bind(s_listen,
676 			      (struct sockaddr *) mdev->net_conf->my_addr,
677 			      mdev->net_conf->my_addr_len);
678 	if (err < 0)
679 		goto out;
680 
681 	err = drbd_accept(mdev, &what, s_listen, &s_estab);
682 
683 out:
684 	if (s_listen)
685 		sock_release(s_listen);
686 	if (err < 0) {
687 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
688 			dev_err(DEV, "%s failed, err = %d\n", what, err);
689 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
690 		}
691 	}
692 	put_net_conf(mdev);
693 
694 	return s_estab;
695 }
696 
697 static int drbd_send_fp(struct drbd_conf *mdev,
698 	struct socket *sock, enum drbd_packets cmd)
699 {
700 	struct p_header80 *h = &mdev->data.sbuf.header.h80;
701 
702 	return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
703 }
704 
705 static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
706 {
707 	struct p_header80 *h = &mdev->data.rbuf.header.h80;
708 	int rr;
709 
710 	rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
711 
712 	if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
713 		return be16_to_cpu(h->command);
714 
715 	return 0xffff;
716 }
717 
718 /**
719  * drbd_socket_okay() - Free the socket if its connection is not okay
720  * @mdev:	DRBD device.
721  * @sock:	pointer to the pointer to the socket.
722  */
723 static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
724 {
725 	int rr;
726 	char tb[4];
727 
728 	if (!*sock)
729 		return false;
730 
731 	rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
732 
733 	if (rr > 0 || rr == -EAGAIN) {
734 		return true;
735 	} else {
736 		sock_release(*sock);
737 		*sock = NULL;
738 		return false;
739 	}
740 }
741 
742 /*
743  * return values:
744  *   1 yes, we have a valid connection
745  *   0 oops, did not work out, please try again
746  *  -1 peer talks different language,
747  *     no point in trying again, please go standalone.
748  *  -2 We do not have a network config...
749  */
750 static int drbd_connect(struct drbd_conf *mdev)
751 {
752 	struct socket *s, *sock, *msock;
753 	int try, h, ok;
754 	enum drbd_state_rv rv;
755 
756 	D_ASSERT(!mdev->data.socket);
757 
758 	if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
759 		return -2;
760 
761 	clear_bit(DISCARD_CONCURRENT, &mdev->flags);
762 
763 	sock  = NULL;
764 	msock = NULL;
765 
766 	do {
767 		for (try = 0;;) {
768 			/* 3 tries, this should take less than a second! */
769 			s = drbd_try_connect(mdev);
770 			if (s || ++try >= 3)
771 				break;
772 			/* give the other side time to call bind() & listen() */
773 			schedule_timeout_interruptible(HZ / 10);
774 		}
775 
776 		if (s) {
777 			if (!sock) {
778 				drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
779 				sock = s;
780 				s = NULL;
781 			} else if (!msock) {
782 				drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
783 				msock = s;
784 				s = NULL;
785 			} else {
786 				dev_err(DEV, "Logic error in drbd_connect()\n");
787 				goto out_release_sockets;
788 			}
789 		}
790 
791 		if (sock && msock) {
792 			schedule_timeout_interruptible(mdev->net_conf->ping_timeo*HZ/10);
793 			ok = drbd_socket_okay(mdev, &sock);
794 			ok = drbd_socket_okay(mdev, &msock) && ok;
795 			if (ok)
796 				break;
797 		}
798 
799 retry:
800 		s = drbd_wait_for_connect(mdev);
801 		if (s) {
802 			try = drbd_recv_fp(mdev, s);
803 			drbd_socket_okay(mdev, &sock);
804 			drbd_socket_okay(mdev, &msock);
805 			switch (try) {
806 			case P_HAND_SHAKE_S:
807 				if (sock) {
808 					dev_warn(DEV, "initial packet S crossed\n");
809 					sock_release(sock);
810 				}
811 				sock = s;
812 				break;
813 			case P_HAND_SHAKE_M:
814 				if (msock) {
815 					dev_warn(DEV, "initial packet M crossed\n");
816 					sock_release(msock);
817 				}
818 				msock = s;
819 				set_bit(DISCARD_CONCURRENT, &mdev->flags);
820 				break;
821 			default:
822 				dev_warn(DEV, "Error receiving initial packet\n");
823 				sock_release(s);
824 				if (random32() & 1)
825 					goto retry;
826 			}
827 		}
828 
829 		if (mdev->state.conn <= C_DISCONNECTING)
830 			goto out_release_sockets;
831 		if (signal_pending(current)) {
832 			flush_signals(current);
833 			smp_rmb();
834 			if (get_t_state(&mdev->receiver) == Exiting)
835 				goto out_release_sockets;
836 		}
837 
838 		if (sock && msock) {
839 			ok = drbd_socket_okay(mdev, &sock);
840 			ok = drbd_socket_okay(mdev, &msock) && ok;
841 			if (ok)
842 				break;
843 		}
844 	} while (1);
845 
846 	msock->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
847 	sock->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
848 
849 	sock->sk->sk_allocation = GFP_NOIO;
850 	msock->sk->sk_allocation = GFP_NOIO;
851 
852 	sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
853 	msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
854 
855 	/* NOT YET ...
856 	 * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
857 	 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
858 	 * first set it to the P_HAND_SHAKE timeout,
859 	 * which we set to 4x the configured ping_timeout. */
860 	sock->sk->sk_sndtimeo =
861 	sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
862 
863 	msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
864 	msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
865 
866 	/* we don't want delays.
867 	 * we use TCP_CORK where appropriate, though */
868 	drbd_tcp_nodelay(sock);
869 	drbd_tcp_nodelay(msock);
870 
871 	mdev->data.socket = sock;
872 	mdev->meta.socket = msock;
873 	mdev->last_received = jiffies;
874 
875 	D_ASSERT(mdev->asender.task == NULL);
876 
877 	h = drbd_do_handshake(mdev);
878 	if (h <= 0)
879 		return h;
880 
881 	if (mdev->cram_hmac_tfm) {
882 		/* drbd_request_state(mdev, NS(conn, WFAuth)); */
883 		switch (drbd_do_auth(mdev)) {
884 		case -1:
885 			dev_err(DEV, "Authentication of peer failed\n");
886 			return -1;
887 		case 0:
888 			dev_err(DEV, "Authentication of peer failed, trying again.\n");
889 			return 0;
890 		}
891 	}
892 
893 	sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
894 	sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
895 
896 	atomic_set(&mdev->packet_seq, 0);
897 	mdev->peer_seq = 0;
898 
899 	if (drbd_send_protocol(mdev) == -1)
900 		return -1;
901 	set_bit(STATE_SENT, &mdev->flags);
902 	drbd_send_sync_param(mdev, &mdev->sync_conf);
903 	drbd_send_sizes(mdev, 0, 0);
904 	drbd_send_uuids(mdev);
905 	drbd_send_current_state(mdev);
906 	clear_bit(USE_DEGR_WFC_T, &mdev->flags);
907 	clear_bit(RESIZE_PENDING, &mdev->flags);
908 
909 	spin_lock_irq(&mdev->req_lock);
910 	rv = _drbd_set_state(_NS(mdev, conn, C_WF_REPORT_PARAMS), CS_VERBOSE, NULL);
911 	if (mdev->state.conn != C_WF_REPORT_PARAMS)
912 		clear_bit(STATE_SENT, &mdev->flags);
913 	spin_unlock_irq(&mdev->req_lock);
914 
915 	if (rv < SS_SUCCESS)
916 		return 0;
917 
918 	drbd_thread_start(&mdev->asender);
919 	mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
920 
921 	return 1;
922 
923 out_release_sockets:
924 	if (sock)
925 		sock_release(sock);
926 	if (msock)
927 		sock_release(msock);
928 	return -1;
929 }
930 
931 static int drbd_recv_header(struct drbd_conf *mdev, enum drbd_packets *cmd, unsigned int *packet_size)
932 {
933 	union p_header *h = &mdev->data.rbuf.header;
934 	int r;
935 
936 	r = drbd_recv(mdev, h, sizeof(*h));
937 	if (unlikely(r != sizeof(*h))) {
938 		if (!signal_pending(current))
939 			dev_warn(DEV, "short read expecting header on sock: r=%d\n", r);
940 		return false;
941 	}
942 
943 	if (likely(h->h80.magic == BE_DRBD_MAGIC)) {
944 		*cmd = be16_to_cpu(h->h80.command);
945 		*packet_size = be16_to_cpu(h->h80.length);
946 	} else if (h->h95.magic == BE_DRBD_MAGIC_BIG) {
947 		*cmd = be16_to_cpu(h->h95.command);
948 		*packet_size = be32_to_cpu(h->h95.length);
949 	} else {
950 		dev_err(DEV, "magic?? on data m: 0x%08x c: %d l: %d\n",
951 		    be32_to_cpu(h->h80.magic),
952 		    be16_to_cpu(h->h80.command),
953 		    be16_to_cpu(h->h80.length));
954 		return false;
955 	}
956 	mdev->last_received = jiffies;
957 
958 	return true;
959 }
960 
961 static void drbd_flush(struct drbd_conf *mdev)
962 {
963 	int rv;
964 
965 	if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
966 		rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
967 					NULL);
968 		if (rv) {
969 			dev_info(DEV, "local disk flush failed with status %d\n", rv);
970 			/* would rather check on EOPNOTSUPP, but that is not reliable.
971 			 * don't try again for ANY return value != 0
972 			 * if (rv == -EOPNOTSUPP) */
973 			drbd_bump_write_ordering(mdev, WO_drain_io);
974 		}
975 		put_ldev(mdev);
976 	}
977 }
978 
979 /**
980  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
981  * @mdev:	DRBD device.
982  * @epoch:	Epoch object.
983  * @ev:		Epoch event.
984  */
985 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
986 					       struct drbd_epoch *epoch,
987 					       enum epoch_event ev)
988 {
989 	int epoch_size;
990 	struct drbd_epoch *next_epoch;
991 	enum finish_epoch rv = FE_STILL_LIVE;
992 
993 	spin_lock(&mdev->epoch_lock);
994 	do {
995 		next_epoch = NULL;
996 
997 		epoch_size = atomic_read(&epoch->epoch_size);
998 
999 		switch (ev & ~EV_CLEANUP) {
1000 		case EV_PUT:
1001 			atomic_dec(&epoch->active);
1002 			break;
1003 		case EV_GOT_BARRIER_NR:
1004 			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1005 			break;
1006 		case EV_BECAME_LAST:
1007 			/* nothing to do*/
1008 			break;
1009 		}
1010 
1011 		if (epoch_size != 0 &&
1012 		    atomic_read(&epoch->active) == 0 &&
1013 		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1014 			if (!(ev & EV_CLEANUP)) {
1015 				spin_unlock(&mdev->epoch_lock);
1016 				drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1017 				spin_lock(&mdev->epoch_lock);
1018 			}
1019 			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1020 				dec_unacked(mdev);
1021 
1022 			if (mdev->current_epoch != epoch) {
1023 				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1024 				list_del(&epoch->list);
1025 				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1026 				mdev->epochs--;
1027 				kfree(epoch);
1028 
1029 				if (rv == FE_STILL_LIVE)
1030 					rv = FE_DESTROYED;
1031 			} else {
1032 				epoch->flags = 0;
1033 				atomic_set(&epoch->epoch_size, 0);
1034 				/* atomic_set(&epoch->active, 0); is already zero */
1035 				if (rv == FE_STILL_LIVE)
1036 					rv = FE_RECYCLED;
1037 				wake_up(&mdev->ee_wait);
1038 			}
1039 		}
1040 
1041 		if (!next_epoch)
1042 			break;
1043 
1044 		epoch = next_epoch;
1045 	} while (1);
1046 
1047 	spin_unlock(&mdev->epoch_lock);
1048 
1049 	return rv;
1050 }
1051 
1052 /**
1053  * drbd_bump_write_ordering() - Fall back to an other write ordering method
1054  * @mdev:	DRBD device.
1055  * @wo:		Write ordering method to try.
1056  */
1057 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1058 {
1059 	enum write_ordering_e pwo;
1060 	static char *write_ordering_str[] = {
1061 		[WO_none] = "none",
1062 		[WO_drain_io] = "drain",
1063 		[WO_bdev_flush] = "flush",
1064 	};
1065 
1066 	pwo = mdev->write_ordering;
1067 	wo = min(pwo, wo);
1068 	if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1069 		wo = WO_drain_io;
1070 	if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1071 		wo = WO_none;
1072 	mdev->write_ordering = wo;
1073 	if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1074 		dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1075 }
1076 
1077 /**
1078  * drbd_submit_ee()
1079  * @mdev:	DRBD device.
1080  * @e:		epoch entry
1081  * @rw:		flag field, see bio->bi_rw
1082  *
1083  * May spread the pages to multiple bios,
1084  * depending on bio_add_page restrictions.
1085  *
1086  * Returns 0 if all bios have been submitted,
1087  * -ENOMEM if we could not allocate enough bios,
1088  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1089  *  single page to an empty bio (which should never happen and likely indicates
1090  *  that the lower level IO stack is in some way broken). This has been observed
1091  *  on certain Xen deployments.
1092  */
1093 /* TODO allocate from our own bio_set. */
1094 int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
1095 		const unsigned rw, const int fault_type)
1096 {
1097 	struct bio *bios = NULL;
1098 	struct bio *bio;
1099 	struct page *page = e->pages;
1100 	sector_t sector = e->sector;
1101 	unsigned ds = e->size;
1102 	unsigned n_bios = 0;
1103 	unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1104 	int err = -ENOMEM;
1105 
1106 	/* In most cases, we will only need one bio.  But in case the lower
1107 	 * level restrictions happen to be different at this offset on this
1108 	 * side than those of the sending peer, we may need to submit the
1109 	 * request in more than one bio.
1110 	 *
1111 	 * Plain bio_alloc is good enough here, this is no DRBD internally
1112 	 * generated bio, but a bio allocated on behalf of the peer.
1113 	 */
1114 next_bio:
1115 	bio = bio_alloc(GFP_NOIO, nr_pages);
1116 	if (!bio) {
1117 		dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1118 		goto fail;
1119 	}
1120 	/* > e->sector, unless this is the first bio */
1121 	bio->bi_sector = sector;
1122 	bio->bi_bdev = mdev->ldev->backing_bdev;
1123 	bio->bi_rw = rw;
1124 	bio->bi_private = e;
1125 	bio->bi_end_io = drbd_endio_sec;
1126 
1127 	bio->bi_next = bios;
1128 	bios = bio;
1129 	++n_bios;
1130 
1131 	page_chain_for_each(page) {
1132 		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1133 		if (!bio_add_page(bio, page, len, 0)) {
1134 			/* A single page must always be possible!
1135 			 * But in case it fails anyways,
1136 			 * we deal with it, and complain (below). */
1137 			if (bio->bi_vcnt == 0) {
1138 				dev_err(DEV,
1139 					"bio_add_page failed for len=%u, "
1140 					"bi_vcnt=0 (bi_sector=%llu)\n",
1141 					len, (unsigned long long)bio->bi_sector);
1142 				err = -ENOSPC;
1143 				goto fail;
1144 			}
1145 			goto next_bio;
1146 		}
1147 		ds -= len;
1148 		sector += len >> 9;
1149 		--nr_pages;
1150 	}
1151 	D_ASSERT(page == NULL);
1152 	D_ASSERT(ds == 0);
1153 
1154 	atomic_set(&e->pending_bios, n_bios);
1155 	do {
1156 		bio = bios;
1157 		bios = bios->bi_next;
1158 		bio->bi_next = NULL;
1159 
1160 		drbd_generic_make_request(mdev, fault_type, bio);
1161 	} while (bios);
1162 	return 0;
1163 
1164 fail:
1165 	while (bios) {
1166 		bio = bios;
1167 		bios = bios->bi_next;
1168 		bio_put(bio);
1169 	}
1170 	return err;
1171 }
1172 
1173 static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1174 {
1175 	int rv;
1176 	struct p_barrier *p = &mdev->data.rbuf.barrier;
1177 	struct drbd_epoch *epoch;
1178 
1179 	inc_unacked(mdev);
1180 
1181 	mdev->current_epoch->barrier_nr = p->barrier;
1182 	rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1183 
1184 	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1185 	 * the activity log, which means it would not be resynced in case the
1186 	 * R_PRIMARY crashes now.
1187 	 * Therefore we must send the barrier_ack after the barrier request was
1188 	 * completed. */
1189 	switch (mdev->write_ordering) {
1190 	case WO_none:
1191 		if (rv == FE_RECYCLED)
1192 			return true;
1193 
1194 		/* receiver context, in the writeout path of the other node.
1195 		 * avoid potential distributed deadlock */
1196 		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1197 		if (epoch)
1198 			break;
1199 		else
1200 			dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1201 			/* Fall through */
1202 
1203 	case WO_bdev_flush:
1204 	case WO_drain_io:
1205 		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1206 		drbd_flush(mdev);
1207 
1208 		if (atomic_read(&mdev->current_epoch->epoch_size)) {
1209 			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1210 			if (epoch)
1211 				break;
1212 		}
1213 
1214 		epoch = mdev->current_epoch;
1215 		wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1216 
1217 		D_ASSERT(atomic_read(&epoch->active) == 0);
1218 		D_ASSERT(epoch->flags == 0);
1219 
1220 		return true;
1221 	default:
1222 		dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1223 		return false;
1224 	}
1225 
1226 	epoch->flags = 0;
1227 	atomic_set(&epoch->epoch_size, 0);
1228 	atomic_set(&epoch->active, 0);
1229 
1230 	spin_lock(&mdev->epoch_lock);
1231 	if (atomic_read(&mdev->current_epoch->epoch_size)) {
1232 		list_add(&epoch->list, &mdev->current_epoch->list);
1233 		mdev->current_epoch = epoch;
1234 		mdev->epochs++;
1235 	} else {
1236 		/* The current_epoch got recycled while we allocated this one... */
1237 		kfree(epoch);
1238 	}
1239 	spin_unlock(&mdev->epoch_lock);
1240 
1241 	return true;
1242 }
1243 
1244 /* used from receive_RSDataReply (recv_resync_read)
1245  * and from receive_Data */
1246 static struct drbd_epoch_entry *
1247 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1248 {
1249 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1250 	struct drbd_epoch_entry *e;
1251 	struct page *page;
1252 	int dgs, ds, rr;
1253 	void *dig_in = mdev->int_dig_in;
1254 	void *dig_vv = mdev->int_dig_vv;
1255 	unsigned long *data;
1256 
1257 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1258 		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1259 
1260 	if (dgs) {
1261 		rr = drbd_recv(mdev, dig_in, dgs);
1262 		if (rr != dgs) {
1263 			if (!signal_pending(current))
1264 				dev_warn(DEV,
1265 					"short read receiving data digest: read %d expected %d\n",
1266 					rr, dgs);
1267 			return NULL;
1268 		}
1269 	}
1270 
1271 	data_size -= dgs;
1272 
1273 	ERR_IF(data_size == 0) return NULL;
1274 	ERR_IF(data_size &  0x1ff) return NULL;
1275 	ERR_IF(data_size >  DRBD_MAX_BIO_SIZE) return NULL;
1276 
1277 	/* even though we trust out peer,
1278 	 * we sometimes have to double check. */
1279 	if (sector + (data_size>>9) > capacity) {
1280 		dev_err(DEV, "request from peer beyond end of local disk: "
1281 			"capacity: %llus < sector: %llus + size: %u\n",
1282 			(unsigned long long)capacity,
1283 			(unsigned long long)sector, data_size);
1284 		return NULL;
1285 	}
1286 
1287 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1288 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1289 	 * which in turn might block on the other node at this very place.  */
1290 	e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1291 	if (!e)
1292 		return NULL;
1293 
1294 	ds = data_size;
1295 	page = e->pages;
1296 	page_chain_for_each(page) {
1297 		unsigned len = min_t(int, ds, PAGE_SIZE);
1298 		data = kmap(page);
1299 		rr = drbd_recv(mdev, data, len);
1300 		if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1301 			dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1302 			data[0] = data[0] ^ (unsigned long)-1;
1303 		}
1304 		kunmap(page);
1305 		if (rr != len) {
1306 			drbd_free_ee(mdev, e);
1307 			if (!signal_pending(current))
1308 				dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1309 				rr, len);
1310 			return NULL;
1311 		}
1312 		ds -= rr;
1313 	}
1314 
1315 	if (dgs) {
1316 		drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
1317 		if (memcmp(dig_in, dig_vv, dgs)) {
1318 			dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1319 				(unsigned long long)sector, data_size);
1320 			drbd_bcast_ee(mdev, "digest failed",
1321 					dgs, dig_in, dig_vv, e);
1322 			drbd_free_ee(mdev, e);
1323 			return NULL;
1324 		}
1325 	}
1326 	mdev->recv_cnt += data_size>>9;
1327 	return e;
1328 }
1329 
1330 /* drbd_drain_block() just takes a data block
1331  * out of the socket input buffer, and discards it.
1332  */
1333 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1334 {
1335 	struct page *page;
1336 	int rr, rv = 1;
1337 	void *data;
1338 
1339 	if (!data_size)
1340 		return true;
1341 
1342 	page = drbd_pp_alloc(mdev, 1, 1);
1343 
1344 	data = kmap(page);
1345 	while (data_size) {
1346 		rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1347 		if (rr != min_t(int, data_size, PAGE_SIZE)) {
1348 			rv = 0;
1349 			if (!signal_pending(current))
1350 				dev_warn(DEV,
1351 					"short read receiving data: read %d expected %d\n",
1352 					rr, min_t(int, data_size, PAGE_SIZE));
1353 			break;
1354 		}
1355 		data_size -= rr;
1356 	}
1357 	kunmap(page);
1358 	drbd_pp_free(mdev, page, 0);
1359 	return rv;
1360 }
1361 
1362 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1363 			   sector_t sector, int data_size)
1364 {
1365 	struct bio_vec *bvec;
1366 	struct bio *bio;
1367 	int dgs, rr, i, expect;
1368 	void *dig_in = mdev->int_dig_in;
1369 	void *dig_vv = mdev->int_dig_vv;
1370 
1371 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1372 		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1373 
1374 	if (dgs) {
1375 		rr = drbd_recv(mdev, dig_in, dgs);
1376 		if (rr != dgs) {
1377 			if (!signal_pending(current))
1378 				dev_warn(DEV,
1379 					"short read receiving data reply digest: read %d expected %d\n",
1380 					rr, dgs);
1381 			return 0;
1382 		}
1383 	}
1384 
1385 	data_size -= dgs;
1386 
1387 	/* optimistically update recv_cnt.  if receiving fails below,
1388 	 * we disconnect anyways, and counters will be reset. */
1389 	mdev->recv_cnt += data_size>>9;
1390 
1391 	bio = req->master_bio;
1392 	D_ASSERT(sector == bio->bi_sector);
1393 
1394 	bio_for_each_segment(bvec, bio, i) {
1395 		expect = min_t(int, data_size, bvec->bv_len);
1396 		rr = drbd_recv(mdev,
1397 			     kmap(bvec->bv_page)+bvec->bv_offset,
1398 			     expect);
1399 		kunmap(bvec->bv_page);
1400 		if (rr != expect) {
1401 			if (!signal_pending(current))
1402 				dev_warn(DEV, "short read receiving data reply: "
1403 					"read %d expected %d\n",
1404 					rr, expect);
1405 			return 0;
1406 		}
1407 		data_size -= rr;
1408 	}
1409 
1410 	if (dgs) {
1411 		drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1412 		if (memcmp(dig_in, dig_vv, dgs)) {
1413 			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1414 			return 0;
1415 		}
1416 	}
1417 
1418 	D_ASSERT(data_size == 0);
1419 	return 1;
1420 }
1421 
1422 /* e_end_resync_block() is called via
1423  * drbd_process_done_ee() by asender only */
1424 static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1425 {
1426 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1427 	sector_t sector = e->sector;
1428 	int ok;
1429 
1430 	D_ASSERT(hlist_unhashed(&e->collision));
1431 
1432 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1433 		drbd_set_in_sync(mdev, sector, e->size);
1434 		ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1435 	} else {
1436 		/* Record failure to sync */
1437 		drbd_rs_failed_io(mdev, sector, e->size);
1438 
1439 		ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1440 	}
1441 	dec_unacked(mdev);
1442 
1443 	return ok;
1444 }
1445 
1446 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1447 {
1448 	struct drbd_epoch_entry *e;
1449 
1450 	e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1451 	if (!e)
1452 		goto fail;
1453 
1454 	dec_rs_pending(mdev);
1455 
1456 	inc_unacked(mdev);
1457 	/* corresponding dec_unacked() in e_end_resync_block()
1458 	 * respective _drbd_clear_done_ee */
1459 
1460 	e->w.cb = e_end_resync_block;
1461 
1462 	spin_lock_irq(&mdev->req_lock);
1463 	list_add(&e->w.list, &mdev->sync_ee);
1464 	spin_unlock_irq(&mdev->req_lock);
1465 
1466 	atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1467 	if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
1468 		return true;
1469 
1470 	/* don't care for the reason here */
1471 	dev_err(DEV, "submit failed, triggering re-connect\n");
1472 	spin_lock_irq(&mdev->req_lock);
1473 	list_del(&e->w.list);
1474 	spin_unlock_irq(&mdev->req_lock);
1475 
1476 	drbd_free_ee(mdev, e);
1477 fail:
1478 	put_ldev(mdev);
1479 	return false;
1480 }
1481 
1482 static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1483 {
1484 	struct drbd_request *req;
1485 	sector_t sector;
1486 	int ok;
1487 	struct p_data *p = &mdev->data.rbuf.data;
1488 
1489 	sector = be64_to_cpu(p->sector);
1490 
1491 	spin_lock_irq(&mdev->req_lock);
1492 	req = _ar_id_to_req(mdev, p->block_id, sector);
1493 	spin_unlock_irq(&mdev->req_lock);
1494 	if (unlikely(!req)) {
1495 		dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1496 		return false;
1497 	}
1498 
1499 	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1500 	 * special casing it there for the various failure cases.
1501 	 * still no race with drbd_fail_pending_reads */
1502 	ok = recv_dless_read(mdev, req, sector, data_size);
1503 
1504 	if (ok)
1505 		req_mod(req, data_received);
1506 	/* else: nothing. handled from drbd_disconnect...
1507 	 * I don't think we may complete this just yet
1508 	 * in case we are "on-disconnect: freeze" */
1509 
1510 	return ok;
1511 }
1512 
1513 static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1514 {
1515 	sector_t sector;
1516 	int ok;
1517 	struct p_data *p = &mdev->data.rbuf.data;
1518 
1519 	sector = be64_to_cpu(p->sector);
1520 	D_ASSERT(p->block_id == ID_SYNCER);
1521 
1522 	if (get_ldev(mdev)) {
1523 		/* data is submitted to disk within recv_resync_read.
1524 		 * corresponding put_ldev done below on error,
1525 		 * or in drbd_endio_write_sec. */
1526 		ok = recv_resync_read(mdev, sector, data_size);
1527 	} else {
1528 		if (__ratelimit(&drbd_ratelimit_state))
1529 			dev_err(DEV, "Can not write resync data to local disk.\n");
1530 
1531 		ok = drbd_drain_block(mdev, data_size);
1532 
1533 		drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1534 	}
1535 
1536 	atomic_add(data_size >> 9, &mdev->rs_sect_in);
1537 
1538 	return ok;
1539 }
1540 
1541 /* e_end_block() is called via drbd_process_done_ee().
1542  * this means this function only runs in the asender thread
1543  */
1544 static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1545 {
1546 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1547 	sector_t sector = e->sector;
1548 	int ok = 1, pcmd;
1549 
1550 	if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1551 		if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1552 			pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1553 				mdev->state.conn <= C_PAUSED_SYNC_T &&
1554 				e->flags & EE_MAY_SET_IN_SYNC) ?
1555 				P_RS_WRITE_ACK : P_WRITE_ACK;
1556 			ok &= drbd_send_ack(mdev, pcmd, e);
1557 			if (pcmd == P_RS_WRITE_ACK)
1558 				drbd_set_in_sync(mdev, sector, e->size);
1559 		} else {
1560 			ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1561 			/* we expect it to be marked out of sync anyways...
1562 			 * maybe assert this?  */
1563 		}
1564 		dec_unacked(mdev);
1565 	}
1566 	/* we delete from the conflict detection hash _after_ we sent out the
1567 	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1568 	if (mdev->net_conf->two_primaries) {
1569 		spin_lock_irq(&mdev->req_lock);
1570 		D_ASSERT(!hlist_unhashed(&e->collision));
1571 		hlist_del_init(&e->collision);
1572 		spin_unlock_irq(&mdev->req_lock);
1573 	} else {
1574 		D_ASSERT(hlist_unhashed(&e->collision));
1575 	}
1576 
1577 	drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1578 
1579 	return ok;
1580 }
1581 
1582 static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1583 {
1584 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1585 	int ok = 1;
1586 
1587 	D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1588 	ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1589 
1590 	spin_lock_irq(&mdev->req_lock);
1591 	D_ASSERT(!hlist_unhashed(&e->collision));
1592 	hlist_del_init(&e->collision);
1593 	spin_unlock_irq(&mdev->req_lock);
1594 
1595 	dec_unacked(mdev);
1596 
1597 	return ok;
1598 }
1599 
1600 static bool overlapping_resync_write(struct drbd_conf *mdev, struct drbd_epoch_entry *data_e)
1601 {
1602 
1603 	struct drbd_epoch_entry *rs_e;
1604 	bool rv = 0;
1605 
1606 	spin_lock_irq(&mdev->req_lock);
1607 	list_for_each_entry(rs_e, &mdev->sync_ee, w.list) {
1608 		if (overlaps(data_e->sector, data_e->size, rs_e->sector, rs_e->size)) {
1609 			rv = 1;
1610 			break;
1611 		}
1612 	}
1613 	spin_unlock_irq(&mdev->req_lock);
1614 
1615 	return rv;
1616 }
1617 
1618 /* Called from receive_Data.
1619  * Synchronize packets on sock with packets on msock.
1620  *
1621  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1622  * packet traveling on msock, they are still processed in the order they have
1623  * been sent.
1624  *
1625  * Note: we don't care for Ack packets overtaking P_DATA packets.
1626  *
1627  * In case packet_seq is larger than mdev->peer_seq number, there are
1628  * outstanding packets on the msock. We wait for them to arrive.
1629  * In case we are the logically next packet, we update mdev->peer_seq
1630  * ourselves. Correctly handles 32bit wrap around.
1631  *
1632  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1633  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1634  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1635  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1636  *
1637  * returns 0 if we may process the packet,
1638  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1639 static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1640 {
1641 	DEFINE_WAIT(wait);
1642 	unsigned int p_seq;
1643 	long timeout;
1644 	int ret = 0;
1645 	spin_lock(&mdev->peer_seq_lock);
1646 	for (;;) {
1647 		prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1648 		if (seq_le(packet_seq, mdev->peer_seq+1))
1649 			break;
1650 		if (signal_pending(current)) {
1651 			ret = -ERESTARTSYS;
1652 			break;
1653 		}
1654 		p_seq = mdev->peer_seq;
1655 		spin_unlock(&mdev->peer_seq_lock);
1656 		timeout = schedule_timeout(30*HZ);
1657 		spin_lock(&mdev->peer_seq_lock);
1658 		if (timeout == 0 && p_seq == mdev->peer_seq) {
1659 			ret = -ETIMEDOUT;
1660 			dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1661 			break;
1662 		}
1663 	}
1664 	finish_wait(&mdev->seq_wait, &wait);
1665 	if (mdev->peer_seq+1 == packet_seq)
1666 		mdev->peer_seq++;
1667 	spin_unlock(&mdev->peer_seq_lock);
1668 	return ret;
1669 }
1670 
1671 /* see also bio_flags_to_wire()
1672  * DRBD_REQ_*, because we need to semantically map the flags to data packet
1673  * flags and back. We may replicate to other kernel versions. */
1674 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1675 {
1676 	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1677 		(dpf & DP_FUA ? REQ_FUA : 0) |
1678 		(dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1679 		(dpf & DP_DISCARD ? REQ_DISCARD : 0);
1680 }
1681 
1682 /* mirrored write */
1683 static int receive_Data(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1684 {
1685 	sector_t sector;
1686 	struct drbd_epoch_entry *e;
1687 	struct p_data *p = &mdev->data.rbuf.data;
1688 	int rw = WRITE;
1689 	u32 dp_flags;
1690 
1691 	if (!get_ldev(mdev)) {
1692 		spin_lock(&mdev->peer_seq_lock);
1693 		if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1694 			mdev->peer_seq++;
1695 		spin_unlock(&mdev->peer_seq_lock);
1696 
1697 		drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1698 		atomic_inc(&mdev->current_epoch->epoch_size);
1699 		return drbd_drain_block(mdev, data_size);
1700 	}
1701 
1702 	/* get_ldev(mdev) successful.
1703 	 * Corresponding put_ldev done either below (on various errors),
1704 	 * or in drbd_endio_write_sec, if we successfully submit the data at
1705 	 * the end of this function. */
1706 
1707 	sector = be64_to_cpu(p->sector);
1708 	e = read_in_block(mdev, p->block_id, sector, data_size);
1709 	if (!e) {
1710 		put_ldev(mdev);
1711 		return false;
1712 	}
1713 
1714 	e->w.cb = e_end_block;
1715 
1716 	dp_flags = be32_to_cpu(p->dp_flags);
1717 	rw |= wire_flags_to_bio(mdev, dp_flags);
1718 
1719 	if (dp_flags & DP_MAY_SET_IN_SYNC)
1720 		e->flags |= EE_MAY_SET_IN_SYNC;
1721 
1722 	spin_lock(&mdev->epoch_lock);
1723 	e->epoch = mdev->current_epoch;
1724 	atomic_inc(&e->epoch->epoch_size);
1725 	atomic_inc(&e->epoch->active);
1726 	spin_unlock(&mdev->epoch_lock);
1727 
1728 	/* I'm the receiver, I do hold a net_cnt reference. */
1729 	if (!mdev->net_conf->two_primaries) {
1730 		spin_lock_irq(&mdev->req_lock);
1731 	} else {
1732 		/* don't get the req_lock yet,
1733 		 * we may sleep in drbd_wait_peer_seq */
1734 		const int size = e->size;
1735 		const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1736 		DEFINE_WAIT(wait);
1737 		struct drbd_request *i;
1738 		struct hlist_node *n;
1739 		struct hlist_head *slot;
1740 		int first;
1741 
1742 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1743 		BUG_ON(mdev->ee_hash == NULL);
1744 		BUG_ON(mdev->tl_hash == NULL);
1745 
1746 		/* conflict detection and handling:
1747 		 * 1. wait on the sequence number,
1748 		 *    in case this data packet overtook ACK packets.
1749 		 * 2. check our hash tables for conflicting requests.
1750 		 *    we only need to walk the tl_hash, since an ee can not
1751 		 *    have a conflict with an other ee: on the submitting
1752 		 *    node, the corresponding req had already been conflicting,
1753 		 *    and a conflicting req is never sent.
1754 		 *
1755 		 * Note: for two_primaries, we are protocol C,
1756 		 * so there cannot be any request that is DONE
1757 		 * but still on the transfer log.
1758 		 *
1759 		 * unconditionally add to the ee_hash.
1760 		 *
1761 		 * if no conflicting request is found:
1762 		 *    submit.
1763 		 *
1764 		 * if any conflicting request is found
1765 		 * that has not yet been acked,
1766 		 * AND I have the "discard concurrent writes" flag:
1767 		 *	 queue (via done_ee) the P_DISCARD_ACK; OUT.
1768 		 *
1769 		 * if any conflicting request is found:
1770 		 *	 block the receiver, waiting on misc_wait
1771 		 *	 until no more conflicting requests are there,
1772 		 *	 or we get interrupted (disconnect).
1773 		 *
1774 		 *	 we do not just write after local io completion of those
1775 		 *	 requests, but only after req is done completely, i.e.
1776 		 *	 we wait for the P_DISCARD_ACK to arrive!
1777 		 *
1778 		 *	 then proceed normally, i.e. submit.
1779 		 */
1780 		if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1781 			goto out_interrupted;
1782 
1783 		spin_lock_irq(&mdev->req_lock);
1784 
1785 		hlist_add_head(&e->collision, ee_hash_slot(mdev, sector));
1786 
1787 #define OVERLAPS overlaps(i->sector, i->size, sector, size)
1788 		slot = tl_hash_slot(mdev, sector);
1789 		first = 1;
1790 		for (;;) {
1791 			int have_unacked = 0;
1792 			int have_conflict = 0;
1793 			prepare_to_wait(&mdev->misc_wait, &wait,
1794 				TASK_INTERRUPTIBLE);
1795 			hlist_for_each_entry(i, n, slot, collision) {
1796 				if (OVERLAPS) {
1797 					/* only ALERT on first iteration,
1798 					 * we may be woken up early... */
1799 					if (first)
1800 						dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1801 						      "	new: %llus +%u; pending: %llus +%u\n",
1802 						      current->comm, current->pid,
1803 						      (unsigned long long)sector, size,
1804 						      (unsigned long long)i->sector, i->size);
1805 					if (i->rq_state & RQ_NET_PENDING)
1806 						++have_unacked;
1807 					++have_conflict;
1808 				}
1809 			}
1810 #undef OVERLAPS
1811 			if (!have_conflict)
1812 				break;
1813 
1814 			/* Discard Ack only for the _first_ iteration */
1815 			if (first && discard && have_unacked) {
1816 				dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1817 				     (unsigned long long)sector);
1818 				inc_unacked(mdev);
1819 				e->w.cb = e_send_discard_ack;
1820 				list_add_tail(&e->w.list, &mdev->done_ee);
1821 
1822 				spin_unlock_irq(&mdev->req_lock);
1823 
1824 				/* we could probably send that P_DISCARD_ACK ourselves,
1825 				 * but I don't like the receiver using the msock */
1826 
1827 				put_ldev(mdev);
1828 				wake_asender(mdev);
1829 				finish_wait(&mdev->misc_wait, &wait);
1830 				return true;
1831 			}
1832 
1833 			if (signal_pending(current)) {
1834 				hlist_del_init(&e->collision);
1835 
1836 				spin_unlock_irq(&mdev->req_lock);
1837 
1838 				finish_wait(&mdev->misc_wait, &wait);
1839 				goto out_interrupted;
1840 			}
1841 
1842 			spin_unlock_irq(&mdev->req_lock);
1843 			if (first) {
1844 				first = 0;
1845 				dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1846 				     "sec=%llus\n", (unsigned long long)sector);
1847 			} else if (discard) {
1848 				/* we had none on the first iteration.
1849 				 * there must be none now. */
1850 				D_ASSERT(have_unacked == 0);
1851 			}
1852 			schedule();
1853 			spin_lock_irq(&mdev->req_lock);
1854 		}
1855 		finish_wait(&mdev->misc_wait, &wait);
1856 	}
1857 
1858 	list_add(&e->w.list, &mdev->active_ee);
1859 	spin_unlock_irq(&mdev->req_lock);
1860 
1861 	if (mdev->state.conn == C_SYNC_TARGET)
1862 		wait_event(mdev->ee_wait, !overlapping_resync_write(mdev, e));
1863 
1864 	switch (mdev->net_conf->wire_protocol) {
1865 	case DRBD_PROT_C:
1866 		inc_unacked(mdev);
1867 		/* corresponding dec_unacked() in e_end_block()
1868 		 * respective _drbd_clear_done_ee */
1869 		break;
1870 	case DRBD_PROT_B:
1871 		/* I really don't like it that the receiver thread
		 * sends on the msock, but anyway */
1873 		drbd_send_ack(mdev, P_RECV_ACK, e);
1874 		break;
1875 	case DRBD_PROT_A:
1876 		/* nothing to do */
1877 		break;
1878 	}
1879 
1880 	if (mdev->state.pdsk < D_INCONSISTENT) {
		/* The peer has no usable disk: ours is the only copy in the
		 * cluster.  Mark the block out of sync and make sure the write
		 * is covered by the activity log. */
1882 		drbd_set_out_of_sync(mdev, e->sector, e->size);
1883 		e->flags |= EE_CALL_AL_COMPLETE_IO;
1884 		e->flags &= ~EE_MAY_SET_IN_SYNC;
1885 		drbd_al_begin_io(mdev, e->sector);
1886 	}
1887 
1888 	if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
1889 		return true;
1890 
1891 	/* don't care for the reason here */
1892 	dev_err(DEV, "submit failed, triggering re-connect\n");
1893 	spin_lock_irq(&mdev->req_lock);
1894 	list_del(&e->w.list);
1895 	hlist_del_init(&e->collision);
1896 	spin_unlock_irq(&mdev->req_lock);
1897 	if (e->flags & EE_CALL_AL_COMPLETE_IO)
1898 		drbd_al_complete_io(mdev, e->sector);
1899 
1900 out_interrupted:
1901 	drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + EV_CLEANUP);
1902 	put_ldev(mdev);
1903 	drbd_free_ee(mdev, e);
1904 	return false;
1905 }
1906 
1907 /* We may throttle resync, if the lower device seems to be busy,
1908  * and current sync rate is above c_min_rate.
1909  *
 * To decide whether or not the lower device is busy, we use a scheme similar
 * to MD RAID's is_mddev_idle(): if the partition stats reveal "significant"
 * activity (more than 64 sectors) that we cannot account for with our own
 * resync activity, the device obviously is "busy".
 *
 * The current sync rate used here is computed over the two most recent step
 * marks only, giving a short-time average so we can react faster.
1917  */
1918 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
1919 {
1920 	struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
1921 	unsigned long db, dt, dbdt;
1922 	struct lc_element *tmp;
1923 	int curr_events;
1924 	int throttle = 0;
1925 
1926 	/* feature disabled? */
1927 	if (mdev->sync_conf.c_min_rate == 0)
1928 		return 0;
1929 
1930 	spin_lock_irq(&mdev->al_lock);
1931 	tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
1932 	if (tmp) {
1933 		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
1934 		if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
1935 			spin_unlock_irq(&mdev->al_lock);
1936 			return 0;
1937 		}
1938 		/* Do not slow down if app IO is already waiting for this extent */
1939 	}
1940 	spin_unlock_irq(&mdev->al_lock);
1941 
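	/* curr_events: sectors read and written on the backing device, minus
	 * the sectors we submitted for resync ourselves (rs_sect_ev); what
	 * remains is presumably application or other unrelated I/O. */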
1942 	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
1943 		      (int)part_stat_read(&disk->part0, sectors[1]) -
1944 			atomic_read(&mdev->rs_sect_ev);
1945 
1946 	if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
1947 		unsigned long rs_left;
1948 		int i;
1949 
1950 		mdev->rs_last_events = curr_events;
1951 
1952 		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
1953 		 * approx. */
1954 		i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
1955 
1956 		if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
1957 			rs_left = mdev->ov_left;
1958 		else
1959 			rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
1960 
1961 		dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
1962 		if (!dt)
1963 			dt++;
1964 		db = mdev->rs_mark_left[i] - rs_left;
1965 		dbdt = Bit2KB(db/dt);
1966 
1967 		if (dbdt > mdev->sync_conf.c_min_rate)
1968 			throttle = 1;
1969 	}
1970 	return throttle;
1971 }
1972 
1973 
1974 static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int digest_size)
1975 {
1976 	sector_t sector;
1977 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1978 	struct drbd_epoch_entry *e;
1979 	struct digest_info *di = NULL;
1980 	int size, verb;
1981 	unsigned int fault_type;
1982 	struct p_block_req *p =	&mdev->data.rbuf.block_req;
1983 
1984 	sector = be64_to_cpu(p->sector);
1985 	size   = be32_to_cpu(p->blksize);
1986 
1987 	if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_BIO_SIZE) {
1988 		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1989 				(unsigned long long)sector, size);
1990 		return false;
1991 	}
1992 	if (sector + (size>>9) > capacity) {
1993 		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1994 				(unsigned long long)sector, size);
1995 		return false;
1996 	}
1997 
1998 	if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
1999 		verb = 1;
2000 		switch (cmd) {
2001 		case P_DATA_REQUEST:
2002 			drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2003 			break;
2004 		case P_RS_DATA_REQUEST:
2005 		case P_CSUM_RS_REQUEST:
2006 		case P_OV_REQUEST:
2007 			drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2008 			break;
2009 		case P_OV_REPLY:
2010 			verb = 0;
2011 			dec_rs_pending(mdev);
2012 			drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2013 			break;
2014 		default:
2015 			dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2016 				cmdname(cmd));
2017 		}
2018 		if (verb && __ratelimit(&drbd_ratelimit_state))
2019 			dev_err(DEV, "Can not satisfy peer's read request, "
2020 			    "no local data.\n");
2021 
		/* drain a possibly present payload */
2023 		return drbd_drain_block(mdev, digest_size);
2024 	}
2025 
2026 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2027 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
2028 	 * which in turn might block on the other node at this very place.  */
2029 	e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2030 	if (!e) {
2031 		put_ldev(mdev);
2032 		return false;
2033 	}
2034 
2035 	switch (cmd) {
2036 	case P_DATA_REQUEST:
2037 		e->w.cb = w_e_end_data_req;
2038 		fault_type = DRBD_FAULT_DT_RD;
2039 		/* application IO, don't drbd_rs_begin_io */
2040 		goto submit;
2041 
2042 	case P_RS_DATA_REQUEST:
2043 		e->w.cb = w_e_end_rsdata_req;
2044 		fault_type = DRBD_FAULT_RS_RD;
2045 		/* used in the sector offset progress display */
2046 		mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2047 		break;
2048 
2049 	case P_OV_REPLY:
2050 	case P_CSUM_RS_REQUEST:
2051 		fault_type = DRBD_FAULT_RS_RD;
2052 		di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2053 		if (!di)
2054 			goto out_free_e;
2055 
2056 		di->digest_size = digest_size;
2057 		di->digest = (((char *)di)+sizeof(struct digest_info));
2058 
2059 		e->digest = di;
2060 		e->flags |= EE_HAS_DIGEST;
2061 
2062 		if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2063 			goto out_free_e;
2064 
2065 		if (cmd == P_CSUM_RS_REQUEST) {
2066 			D_ASSERT(mdev->agreed_pro_version >= 89);
2067 			e->w.cb = w_e_end_csum_rs_req;
2068 			/* used in the sector offset progress display */
2069 			mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2070 		} else if (cmd == P_OV_REPLY) {
2071 			/* track progress, we may need to throttle */
2072 			atomic_add(size >> 9, &mdev->rs_sect_in);
2073 			e->w.cb = w_e_end_ov_reply;
2074 			dec_rs_pending(mdev);
2075 			/* drbd_rs_begin_io done when we sent this request,
2076 			 * but accounting still needs to be done. */
2077 			goto submit_for_resync;
2078 		}
2079 		break;
2080 
2081 	case P_OV_REQUEST:
2082 		if (mdev->ov_start_sector == ~(sector_t)0 &&
2083 		    mdev->agreed_pro_version >= 90) {
2084 			unsigned long now = jiffies;
2085 			int i;
2086 			mdev->ov_start_sector = sector;
2087 			mdev->ov_position = sector;
2088 			mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2089 			mdev->rs_total = mdev->ov_left;
2090 			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2091 				mdev->rs_mark_left[i] = mdev->ov_left;
2092 				mdev->rs_mark_time[i] = now;
2093 			}
2094 			dev_info(DEV, "Online Verify start sector: %llu\n",
2095 					(unsigned long long)sector);
2096 		}
2097 		e->w.cb = w_e_end_ov_req;
2098 		fault_type = DRBD_FAULT_RS_RD;
2099 		break;
2100 
2101 	default:
2102 		dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2103 		    cmdname(cmd));
2104 		fault_type = DRBD_FAULT_MAX;
2105 		goto out_free_e;
2106 	}
2107 
2108 	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
2109 	 * wrt the receiver, but it is not as straightforward as it may seem.
2110 	 * Various places in the resync start and stop logic assume resync
2111 	 * requests are processed in order, requeuing this on the worker thread
2112 	 * introduces a bunch of new code for synchronization between threads.
2113 	 *
2114 	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2115 	 * "forever", throttling after drbd_rs_begin_io will lock that extent
2116 	 * for application writes for the same time.  For now, just throttle
2117 	 * here, where the rest of the code expects the receiver to sleep for
	 * a while anyway.
2119 	 */
2120 
2121 	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
2122 	 * this defers syncer requests for some time, before letting at least
	 * one request through.  The resync controller on the receiving side
2124 	 * will adapt to the incoming rate accordingly.
2125 	 *
2126 	 * We cannot throttle here if remote is Primary/SyncTarget:
2127 	 * we would also throttle its application reads.
2128 	 * In that case, throttling is done on the SyncTarget only.
2129 	 */
2130 	if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2131 		schedule_timeout_uninterruptible(HZ/10);
2132 	if (drbd_rs_begin_io(mdev, sector))
2133 		goto out_free_e;
2134 
2135 submit_for_resync:
2136 	atomic_add(size >> 9, &mdev->rs_sect_ev);
2137 
2138 submit:
2139 	inc_unacked(mdev);
2140 	spin_lock_irq(&mdev->req_lock);
2141 	list_add_tail(&e->w.list, &mdev->read_ee);
2142 	spin_unlock_irq(&mdev->req_lock);
2143 
2144 	if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
2145 		return true;
2146 
2147 	/* don't care for the reason here */
2148 	dev_err(DEV, "submit failed, triggering re-connect\n");
2149 	spin_lock_irq(&mdev->req_lock);
2150 	list_del(&e->w.list);
2151 	spin_unlock_irq(&mdev->req_lock);
2152 	/* no drbd_rs_complete_io(), we are dropping the connection anyways */
2153 
2154 out_free_e:
2155 	put_ldev(mdev);
2156 	drbd_free_ee(mdev, e);
2157 	return false;
2158 }
2159 
2160 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2161 {
2162 	int self, peer, rv = -100;
2163 	unsigned long ch_self, ch_peer;
2164 
2165 	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2166 	peer = mdev->p_uuid[UI_BITMAP] & 1;
2167 
2168 	ch_peer = mdev->p_uuid[UI_SIZE];
2169 	ch_self = mdev->comm_bm_set;
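	/* ch_*: number of out-of-sync bits ("changes") each node reports; the
	 * peer's count arrives in the UI_SIZE slot of the extended UUID array.
	 * Return convention: -1 discards the local data (sync from the peer),
	 * 1 keeps it (the peer syncs from us), -100 means no automatic decision. */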
2170 
2171 	switch (mdev->net_conf->after_sb_0p) {
2172 	case ASB_CONSENSUS:
2173 	case ASB_DISCARD_SECONDARY:
2174 	case ASB_CALL_HELPER:
2175 		dev_err(DEV, "Configuration error.\n");
2176 		break;
2177 	case ASB_DISCONNECT:
2178 		break;
2179 	case ASB_DISCARD_YOUNGER_PRI:
2180 		if (self == 0 && peer == 1) {
2181 			rv = -1;
2182 			break;
2183 		}
2184 		if (self == 1 && peer == 0) {
2185 			rv =  1;
2186 			break;
2187 		}
2188 		/* Else fall through to one of the other strategies... */
2189 	case ASB_DISCARD_OLDER_PRI:
2190 		if (self == 0 && peer == 1) {
2191 			rv = 1;
2192 			break;
2193 		}
2194 		if (self == 1 && peer == 0) {
2195 			rv = -1;
2196 			break;
2197 		}
2198 		/* Else fall through to one of the other strategies... */
2199 		dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2200 		     "Using discard-least-changes instead\n");
2201 	case ASB_DISCARD_ZERO_CHG:
2202 		if (ch_peer == 0 && ch_self == 0) {
2203 			rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2204 				? -1 : 1;
2205 			break;
2206 		} else {
2207 			if (ch_peer == 0) { rv =  1; break; }
2208 			if (ch_self == 0) { rv = -1; break; }
2209 		}
2210 		if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2211 			break;
2212 	case ASB_DISCARD_LEAST_CHG:
2213 		if	(ch_self < ch_peer)
2214 			rv = -1;
2215 		else if (ch_self > ch_peer)
2216 			rv =  1;
2217 		else /* ( ch_self == ch_peer ) */
2218 		     /* Well, then use something else. */
2219 			rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2220 				? -1 : 1;
2221 		break;
2222 	case ASB_DISCARD_LOCAL:
2223 		rv = -1;
2224 		break;
2225 	case ASB_DISCARD_REMOTE:
2226 		rv =  1;
2227 	}
2228 
2229 	return rv;
2230 }
2231 
2232 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2233 {
2234 	int hg, rv = -100;
2235 
2236 	switch (mdev->net_conf->after_sb_1p) {
2237 	case ASB_DISCARD_YOUNGER_PRI:
2238 	case ASB_DISCARD_OLDER_PRI:
2239 	case ASB_DISCARD_LEAST_CHG:
2240 	case ASB_DISCARD_LOCAL:
2241 	case ASB_DISCARD_REMOTE:
2242 		dev_err(DEV, "Configuration error.\n");
2243 		break;
2244 	case ASB_DISCONNECT:
2245 		break;
2246 	case ASB_CONSENSUS:
2247 		hg = drbd_asb_recover_0p(mdev);
2248 		if (hg == -1 && mdev->state.role == R_SECONDARY)
2249 			rv = hg;
2250 		if (hg == 1  && mdev->state.role == R_PRIMARY)
2251 			rv = hg;
2252 		break;
2253 	case ASB_VIOLENTLY:
2254 		rv = drbd_asb_recover_0p(mdev);
2255 		break;
2256 	case ASB_DISCARD_SECONDARY:
2257 		return mdev->state.role == R_PRIMARY ? 1 : -1;
2258 	case ASB_CALL_HELPER:
2259 		hg = drbd_asb_recover_0p(mdev);
2260 		if (hg == -1 && mdev->state.role == R_PRIMARY) {
2261 			enum drbd_state_rv rv2;
2262 
2263 			drbd_set_role(mdev, R_SECONDARY, 0);
2264 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2265 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2266 			  * we do not need to wait for the after state change work either. */
2267 			rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2268 			if (rv2 != SS_SUCCESS) {
2269 				drbd_khelper(mdev, "pri-lost-after-sb");
2270 			} else {
2271 				dev_warn(DEV, "Successfully gave up primary role.\n");
2272 				rv = hg;
2273 			}
2274 		} else
2275 			rv = hg;
2276 	}
2277 
2278 	return rv;
2279 }
2280 
2281 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2282 {
2283 	int hg, rv = -100;
2284 
2285 	switch (mdev->net_conf->after_sb_2p) {
2286 	case ASB_DISCARD_YOUNGER_PRI:
2287 	case ASB_DISCARD_OLDER_PRI:
2288 	case ASB_DISCARD_LEAST_CHG:
2289 	case ASB_DISCARD_LOCAL:
2290 	case ASB_DISCARD_REMOTE:
2291 	case ASB_CONSENSUS:
2292 	case ASB_DISCARD_SECONDARY:
2293 		dev_err(DEV, "Configuration error.\n");
2294 		break;
2295 	case ASB_VIOLENTLY:
2296 		rv = drbd_asb_recover_0p(mdev);
2297 		break;
2298 	case ASB_DISCONNECT:
2299 		break;
2300 	case ASB_CALL_HELPER:
2301 		hg = drbd_asb_recover_0p(mdev);
2302 		if (hg == -1) {
2303 			enum drbd_state_rv rv2;
2304 
2305 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2306 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2307 			  * we do not need to wait for the after state change work either. */
2308 			rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2309 			if (rv2 != SS_SUCCESS) {
2310 				drbd_khelper(mdev, "pri-lost-after-sb");
2311 			} else {
2312 				dev_warn(DEV, "Successfully gave up primary role.\n");
2313 				rv = hg;
2314 			}
2315 		} else
2316 			rv = hg;
2317 	}
2318 
2319 	return rv;
2320 }
2321 
2322 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2323 			   u64 bits, u64 flags)
2324 {
2325 	if (!uuid) {
2326 		dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2327 		return;
2328 	}
2329 	dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2330 	     text,
2331 	     (unsigned long long)uuid[UI_CURRENT],
2332 	     (unsigned long long)uuid[UI_BITMAP],
2333 	     (unsigned long long)uuid[UI_HISTORY_START],
2334 	     (unsigned long long)uuid[UI_HISTORY_END],
2335 	     (unsigned long long)bits,
2336 	     (unsigned long long)flags);
2337 }
2338 
2339 /*
2340   100	after split brain try auto recover
2341     2	C_SYNC_SOURCE set BitMap
2342     1	C_SYNC_SOURCE use BitMap
2343     0	no Sync
2344    -1	C_SYNC_TARGET use BitMap
2345    -2	C_SYNC_TARGET set BitMap
2346  -100	after split brain, disconnect
2347 -1000	unrelated data
2348 -1091   requires proto 91
2349 -1096   requires proto 96
2350  */
2351 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2352 {
2353 	u64 self, peer;
2354 	int i, j;
2355 
2356 	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2357 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2358 
2359 	*rule_nr = 10;
2360 	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2361 		return 0;
2362 
2363 	*rule_nr = 20;
2364 	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2365 	     peer != UUID_JUST_CREATED)
2366 		return -2;
2367 
2368 	*rule_nr = 30;
2369 	if (self != UUID_JUST_CREATED &&
2370 	    (peer == UUID_JUST_CREATED || peer == (u64)0))
2371 		return 2;
2372 
2373 	if (self == peer) {
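		/* Current UUIDs match: both nodes hold the same data generation.
		 * First handle the cases where one side merely missed the
		 * "resync finished" bookkeeping, then fall back to the roles
		 * at crash time to pick a direction. */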
2374 		int rct, dc; /* roles at crash time */
2375 
2376 		if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2377 
2378 			if (mdev->agreed_pro_version < 91)
2379 				return -1091;
2380 
2381 			if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2382 			    (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2383 				dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2384 				drbd_uuid_set_bm(mdev, 0UL);
2385 
2386 				drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2387 					       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2388 				*rule_nr = 34;
2389 			} else {
2390 				dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2391 				*rule_nr = 36;
2392 			}
2393 
2394 			return 1;
2395 		}
2396 
2397 		if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2398 
2399 			if (mdev->agreed_pro_version < 91)
2400 				return -1091;
2401 
2402 			if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2403 			    (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2404 				dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2405 
2406 				mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2407 				mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2408 				mdev->p_uuid[UI_BITMAP] = 0UL;
2409 
2410 				drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2411 				*rule_nr = 35;
2412 			} else {
2413 				dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2414 				*rule_nr = 37;
2415 			}
2416 
2417 			return -1;
2418 		}
2419 
2420 		/* Common power [off|failure] */
2421 		rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2422 			(mdev->p_uuid[UI_FLAGS] & 2);
2423 		/* lowest bit is set when we were primary,
2424 		 * next bit (weight 2) is set when peer was primary */
2425 		*rule_nr = 40;
2426 
2427 		switch (rct) {
2428 		case 0: /* !self_pri && !peer_pri */ return 0;
2429 		case 1: /*  self_pri && !peer_pri */ return 1;
2430 		case 2: /* !self_pri &&  peer_pri */ return -1;
2431 		case 3: /*  self_pri &&  peer_pri */
2432 			dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2433 			return dc ? -1 : 1;
2434 		}
2435 	}
2436 
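	/* rule 50: the peer's bitmap UUID matches our current UUID, so the peer
	 * apparently tracks changes relative to our data in its bitmap:
	 * become SyncTarget for a bitmap-based resync. */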
2437 	*rule_nr = 50;
2438 	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2439 	if (self == peer)
2440 		return -1;
2441 
2442 	*rule_nr = 51;
2443 	peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2444 	if (self == peer) {
2445 		if (mdev->agreed_pro_version < 96 ?
2446 		    (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2447 		    (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2448 		    peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get through.  Undo the
			   modifications of the peer's UUIDs that it made, as sync
			   source, at the last start of resync. */
2451 
2452 			if (mdev->agreed_pro_version < 91)
2453 				return -1091;
2454 
2455 			mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2456 			mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2457 
2458 			dev_info(DEV, "Lost last syncUUID packet, corrected:\n");
2459 			drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2460 
2461 			return -1;
2462 		}
2463 	}
2464 
2465 	*rule_nr = 60;
2466 	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2467 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2468 		peer = mdev->p_uuid[i] & ~((u64)1);
2469 		if (self == peer)
2470 			return -2;
2471 	}
2472 
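	/* rule 70: mirror image of rule 50; our bitmap UUID matches the peer's
	 * current UUID, so our bitmap apparently covers the changes we made
	 * since that data generation: become SyncSource for a bitmap-based
	 * resync. */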
2473 	*rule_nr = 70;
2474 	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2475 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2476 	if (self == peer)
2477 		return 1;
2478 
2479 	*rule_nr = 71;
2480 	self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2481 	if (self == peer) {
2482 		if (mdev->agreed_pro_version < 96 ?
2483 		    (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2484 		    (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2485 		    self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get through.  Undo the
			   modifications of our UUIDs that we made, as sync source,
			   at the last start of resync. */
2488 
2489 			if (mdev->agreed_pro_version < 91)
2490 				return -1091;
2491 
2492 			_drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2493 			_drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2494 
2495 			dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2496 			drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2497 				       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2498 
2499 			return 1;
2500 		}
2501 	}
2502 
2503 
2504 	*rule_nr = 80;
2505 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2506 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2507 		self = mdev->ldev->md.uuid[i] & ~((u64)1);
2508 		if (self == peer)
2509 			return 2;
2510 	}
2511 
2512 	*rule_nr = 90;
2513 	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2514 	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2515 	if (self == peer && self != ((u64)0))
2516 		return 100;
2517 
2518 	*rule_nr = 100;
2519 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2520 		self = mdev->ldev->md.uuid[i] & ~((u64)1);
2521 		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2522 			peer = mdev->p_uuid[j] & ~((u64)1);
2523 			if (self == peer)
2524 				return -100;
2525 		}
2526 	}
2527 
2528 	return -1000;
2529 }
2530 
2531 /* drbd_sync_handshake() returns the new conn state on success, or
2532    CONN_MASK (-1) on failure.
2533  */
2534 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2535 					   enum drbd_disk_state peer_disk) __must_hold(local)
2536 {
2537 	int hg, rule_nr;
2538 	enum drbd_conns rv = C_MASK;
2539 	enum drbd_disk_state mydisk;
2540 
2541 	mydisk = mdev->state.disk;
2542 	if (mydisk == D_NEGOTIATING)
2543 		mydisk = mdev->new_state_tmp.disk;
2544 
2545 	dev_info(DEV, "drbd_sync_handshake:\n");
2546 	drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2547 	drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2548 		       mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2549 
2550 	hg = drbd_uuid_compare(mdev, &rule_nr);
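	/* hg convention (see the table above drbd_uuid_compare()): the sign gives
	 * the direction (> 0 we become SyncSource, < 0 SyncTarget), magnitude 1
	 * means a bitmap-based resync, 2 a full sync, 100 split brain. */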
2551 
2552 	dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2553 
2554 	if (hg == -1000) {
2555 		dev_alert(DEV, "Unrelated data, aborting!\n");
2556 		return C_MASK;
2557 	}
2558 	if (hg < -1000) {
2559 		dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2560 		return C_MASK;
2561 	}
2562 
2563 	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2564 	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2565 		int f = (hg == -100) || abs(hg) == 2;
2566 		hg = mydisk > D_INCONSISTENT ? 1 : -1;
2567 		if (f)
2568 			hg = hg*2;
2569 		dev_info(DEV, "Becoming sync %s due to disk states.\n",
2570 		     hg > 0 ? "source" : "target");
2571 	}
2572 
2573 	if (abs(hg) == 100)
2574 		drbd_khelper(mdev, "initial-split-brain");
2575 
2576 	if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2577 		int pcount = (mdev->state.role == R_PRIMARY)
2578 			   + (peer_role == R_PRIMARY);
2579 		int forced = (hg == -100);
2580 
2581 		switch (pcount) {
2582 		case 0:
2583 			hg = drbd_asb_recover_0p(mdev);
2584 			break;
2585 		case 1:
2586 			hg = drbd_asb_recover_1p(mdev);
2587 			break;
2588 		case 2:
2589 			hg = drbd_asb_recover_2p(mdev);
2590 			break;
2591 		}
2592 		if (abs(hg) < 100) {
2593 			dev_warn(DEV, "Split-Brain detected, %d primaries, "
2594 			     "automatically solved. Sync from %s node\n",
2595 			     pcount, (hg < 0) ? "peer" : "this");
2596 			if (forced) {
2597 				dev_warn(DEV, "Doing a full sync, since"
2598 				     " UUIDs where ambiguous.\n");
2599 				hg = hg*2;
2600 			}
2601 		}
2602 	}
2603 
2604 	if (hg == -100) {
2605 		if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2606 			hg = -1;
2607 		if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2608 			hg = 1;
2609 
2610 		if (abs(hg) < 100)
2611 			dev_warn(DEV, "Split-Brain detected, manually solved. "
2612 			     "Sync from %s node\n",
2613 			     (hg < 0) ? "peer" : "this");
2614 	}
2615 
2616 	if (hg == -100) {
2617 		/* FIXME this log message is not correct if we end up here
2618 		 * after an attempted attach on a diskless node.
2619 		 * We just refuse to attach -- well, we drop the "connection"
2620 		 * to that disk, in a way... */
2621 		dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2622 		drbd_khelper(mdev, "split-brain");
2623 		return C_MASK;
2624 	}
2625 
2626 	if (hg > 0 && mydisk <= D_INCONSISTENT) {
2627 		dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2628 		return C_MASK;
2629 	}
2630 
2631 	if (hg < 0 && /* by intention we do not use mydisk here. */
2632 	    mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2633 		switch (mdev->net_conf->rr_conflict) {
2634 		case ASB_CALL_HELPER:
2635 			drbd_khelper(mdev, "pri-lost");
2636 			/* fall through */
2637 		case ASB_DISCONNECT:
2638 			dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2639 			return C_MASK;
2640 		case ASB_VIOLENTLY:
2641 			dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2642 			     "assumption\n");
2643 		}
2644 	}
2645 
2646 	if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2647 		if (hg == 0)
2648 			dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2649 		else
2650 			dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2651 				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2652 				 abs(hg) >= 2 ? "full" : "bit-map based");
2653 		return C_MASK;
2654 	}
2655 
2656 	if (abs(hg) >= 2) {
2657 		dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2658 		if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2659 					BM_LOCKED_SET_ALLOWED))
2660 			return C_MASK;
2661 	}
2662 
2663 	if (hg > 0) { /* become sync source. */
2664 		rv = C_WF_BITMAP_S;
2665 	} else if (hg < 0) { /* become sync target */
2666 		rv = C_WF_BITMAP_T;
2667 	} else {
2668 		rv = C_CONNECTED;
2669 		if (drbd_bm_total_weight(mdev)) {
2670 			dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2671 			     drbd_bm_total_weight(mdev));
2672 		}
2673 	}
2674 
2675 	return rv;
2676 }
2677 
2678 /* returns 1 if invalid */
2679 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2680 {
2681 	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2682 	if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2683 	    (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2684 		return 0;
2685 
2686 	/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2687 	if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2688 	    self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2689 		return 1;
2690 
2691 	/* everything else is valid if they are equal on both sides. */
2692 	if (peer == self)
2693 		return 0;
2694 
	/* everything else is invalid. */
2696 	return 1;
2697 }
2698 
2699 static int receive_protocol(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2700 {
2701 	struct p_protocol *p = &mdev->data.rbuf.protocol;
2702 	int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2703 	int p_want_lose, p_two_primaries, cf;
2704 	char p_integrity_alg[SHARED_SECRET_MAX] = "";
2705 
2706 	p_proto		= be32_to_cpu(p->protocol);
2707 	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
2708 	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
2709 	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
2710 	p_two_primaries = be32_to_cpu(p->two_primaries);
2711 	cf		= be32_to_cpu(p->conn_flags);
2712 	p_want_lose = cf & CF_WANT_LOSE;
2713 
2714 	clear_bit(CONN_DRY_RUN, &mdev->flags);
2715 
2716 	if (cf & CF_DRY_RUN)
2717 		set_bit(CONN_DRY_RUN, &mdev->flags);
2718 
2719 	if (p_proto != mdev->net_conf->wire_protocol) {
2720 		dev_err(DEV, "incompatible communication protocols\n");
2721 		goto disconnect;
2722 	}
2723 
2724 	if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2725 		dev_err(DEV, "incompatible after-sb-0pri settings\n");
2726 		goto disconnect;
2727 	}
2728 
2729 	if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2730 		dev_err(DEV, "incompatible after-sb-1pri settings\n");
2731 		goto disconnect;
2732 	}
2733 
2734 	if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2735 		dev_err(DEV, "incompatible after-sb-2pri settings\n");
2736 		goto disconnect;
2737 	}
2738 
2739 	if (p_want_lose && mdev->net_conf->want_lose) {
2740 		dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2741 		goto disconnect;
2742 	}
2743 
2744 	if (p_two_primaries != mdev->net_conf->two_primaries) {
2745 		dev_err(DEV, "incompatible setting of the two-primaries options\n");
2746 		goto disconnect;
2747 	}
2748 
2749 	if (mdev->agreed_pro_version >= 87) {
2750 		unsigned char *my_alg = mdev->net_conf->integrity_alg;
2751 
2752 		if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2753 			return false;
2754 
2755 		p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2756 		if (strcmp(p_integrity_alg, my_alg)) {
2757 			dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2758 			goto disconnect;
2759 		}
2760 		dev_info(DEV, "data-integrity-alg: %s\n",
2761 		     my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2762 	}
2763 
2764 	return true;
2765 
2766 disconnect:
2767 	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2768 	return false;
2769 }
2770 
2771 /* helper function
2772  * input: alg name, feature name
2773  * return: NULL (alg name was "")
2774  *         ERR_PTR(error) if something goes wrong
2775  *         or the crypto hash ptr, if it worked out ok. */
2776 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2777 		const char *alg, const char *name)
2778 {
2779 	struct crypto_hash *tfm;
2780 
2781 	if (!alg[0])
2782 		return NULL;
2783 
2784 	tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2785 	if (IS_ERR(tfm)) {
2786 		dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2787 			alg, name, PTR_ERR(tfm));
2788 		return tfm;
2789 	}
2790 	if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2791 		crypto_free_hash(tfm);
2792 		dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2793 		return ERR_PTR(-EINVAL);
2794 	}
2795 	return tfm;
2796 }
2797 
2798 static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int packet_size)
2799 {
2800 	int ok = true;
2801 	struct p_rs_param_95 *p = &mdev->data.rbuf.rs_param_95;
2802 	unsigned int header_size, data_size, exp_max_sz;
2803 	struct crypto_hash *verify_tfm = NULL;
2804 	struct crypto_hash *csums_tfm = NULL;
2805 	const int apv = mdev->agreed_pro_version;
2806 	int *rs_plan_s = NULL;
2807 	int fifo_size = 0;
2808 
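	/* The SyncParam packet grew over the protocol versions: apv 88 appended
	 * the verify-alg name, 89 added csums-alg, 95 the settings of the
	 * dynamic resync controller.  Accept at most the newest layout we know. */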
2809 	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
2810 		    : apv == 88 ? sizeof(struct p_rs_param)
2811 					+ SHARED_SECRET_MAX
2812 		    : apv <= 94 ? sizeof(struct p_rs_param_89)
2813 		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
2814 
2815 	if (packet_size > exp_max_sz) {
2816 		dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2817 		    packet_size, exp_max_sz);
2818 		return false;
2819 	}
2820 
2821 	if (apv <= 88) {
2822 		header_size = sizeof(struct p_rs_param) - sizeof(struct p_header80);
2823 		data_size   = packet_size  - header_size;
2824 	} else if (apv <= 94) {
2825 		header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header80);
2826 		data_size   = packet_size  - header_size;
2827 		D_ASSERT(data_size == 0);
2828 	} else {
2829 		header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header80);
2830 		data_size   = packet_size  - header_size;
2831 		D_ASSERT(data_size == 0);
2832 	}
2833 
2834 	/* initialize verify_alg and csums_alg */
2835 	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2836 
2837 	if (drbd_recv(mdev, &p->head.payload, header_size) != header_size)
2838 		return false;
2839 
2840 	mdev->sync_conf.rate	  = be32_to_cpu(p->rate);
2841 
2842 	if (apv >= 88) {
2843 		if (apv == 88) {
2844 			if (data_size > SHARED_SECRET_MAX || data_size == 0) {
2845 				dev_err(DEV, "verify-alg of wrong size, "
2846 					"peer wants %u, accepting only up to %u byte\n",
2847 					data_size, SHARED_SECRET_MAX);
2848 				return false;
2849 			}
2850 
2851 			if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2852 				return false;
2853 
2854 			/* we expect NUL terminated string */
2855 			/* but just in case someone tries to be evil */
2856 			D_ASSERT(p->verify_alg[data_size-1] == 0);
2857 			p->verify_alg[data_size-1] = 0;
2858 
2859 		} else /* apv >= 89 */ {
2860 			/* we still expect NUL terminated strings */
2861 			/* but just in case someone tries to be evil */
2862 			D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2863 			D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2864 			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2865 			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2866 		}
2867 
2868 		if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2869 			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2870 				dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2871 				    mdev->sync_conf.verify_alg, p->verify_alg);
2872 				goto disconnect;
2873 			}
2874 			verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2875 					p->verify_alg, "verify-alg");
2876 			if (IS_ERR(verify_tfm)) {
2877 				verify_tfm = NULL;
2878 				goto disconnect;
2879 			}
2880 		}
2881 
2882 		if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2883 			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2884 				dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2885 				    mdev->sync_conf.csums_alg, p->csums_alg);
2886 				goto disconnect;
2887 			}
2888 			csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2889 					p->csums_alg, "csums-alg");
2890 			if (IS_ERR(csums_tfm)) {
2891 				csums_tfm = NULL;
2892 				goto disconnect;
2893 			}
2894 		}
2895 
2896 		if (apv > 94) {
2897 			mdev->sync_conf.rate	  = be32_to_cpu(p->rate);
2898 			mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
2899 			mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
2900 			mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
2901 			mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);
2902 
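			/* Size the plan-ahead fifo to one slot per resync-worker
			 * step (SLEEP_TIME) over the configured c_plan_ahead
			 * window; it is (re)allocated only if the size changed. */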
2903 			fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
2904 			if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
2905 				rs_plan_s   = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
2906 				if (!rs_plan_s) {
2907 					dev_err(DEV, "kmalloc of fifo_buffer failed");
2908 					goto disconnect;
2909 				}
2910 			}
2911 		}
2912 
2913 		spin_lock(&mdev->peer_seq_lock);
2914 		/* lock against drbd_nl_syncer_conf() */
2915 		if (verify_tfm) {
2916 			strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2917 			mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2918 			crypto_free_hash(mdev->verify_tfm);
2919 			mdev->verify_tfm = verify_tfm;
2920 			dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2921 		}
2922 		if (csums_tfm) {
2923 			strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2924 			mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2925 			crypto_free_hash(mdev->csums_tfm);
2926 			mdev->csums_tfm = csums_tfm;
2927 			dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2928 		}
2929 		if (fifo_size != mdev->rs_plan_s.size) {
2930 			kfree(mdev->rs_plan_s.values);
2931 			mdev->rs_plan_s.values = rs_plan_s;
2932 			mdev->rs_plan_s.size   = fifo_size;
2933 			mdev->rs_planed = 0;
2934 		}
2935 		spin_unlock(&mdev->peer_seq_lock);
2936 	}
2937 
2938 	return ok;
2939 disconnect:
2940 	/* just for completeness: actually not needed,
2941 	 * as this is not reached if csums_tfm was ok. */
2942 	crypto_free_hash(csums_tfm);
2943 	/* but free the verify_tfm again, if csums_tfm did not work out */
2944 	crypto_free_hash(verify_tfm);
2945 	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2946 	return false;
2947 }
2948 
2949 /* warn if the arguments differ by more than 12.5% */
2950 static void warn_if_differ_considerably(struct drbd_conf *mdev,
2951 	const char *s, sector_t a, sector_t b)
2952 {
2953 	sector_t d;
2954 	if (a == 0 || b == 0)
2955 		return;
2956 	d = (a > b) ? (a - b) : (b - a);
2957 	if (d > (a>>3) || d > (b>>3))
2958 		dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2959 		     (unsigned long long)a, (unsigned long long)b);
2960 }
2961 
2962 static int receive_sizes(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2963 {
2964 	struct p_sizes *p = &mdev->data.rbuf.sizes;
2965 	enum determine_dev_size dd = unchanged;
2966 	sector_t p_size, p_usize, my_usize;
2967 	int ldsc = 0; /* local disk size changed */
2968 	enum dds_flags ddsf;
2969 
2970 	p_size = be64_to_cpu(p->d_size);
2971 	p_usize = be64_to_cpu(p->u_size);
2972 
2973 	if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2974 		dev_err(DEV, "some backing storage is needed\n");
2975 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2976 		return false;
2977 	}
2978 
2979 	/* just store the peer's disk size for now.
2980 	 * we still need to figure out whether we accept that. */
2981 	mdev->p_size = p_size;
2982 
2983 	if (get_ldev(mdev)) {
2984 		warn_if_differ_considerably(mdev, "lower level device sizes",
2985 			   p_size, drbd_get_max_capacity(mdev->ldev));
2986 		warn_if_differ_considerably(mdev, "user requested size",
2987 					    p_usize, mdev->ldev->dc.disk_size);
2988 
2989 		/* if this is the first connect, or an otherwise expected
2990 		 * param exchange, choose the minimum */
2991 		if (mdev->state.conn == C_WF_REPORT_PARAMS)
2992 			p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2993 					     p_usize);
2994 
2995 		my_usize = mdev->ldev->dc.disk_size;
2996 
2997 		if (mdev->ldev->dc.disk_size != p_usize) {
2998 			mdev->ldev->dc.disk_size = p_usize;
2999 			dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3000 			     (unsigned long)mdev->ldev->dc.disk_size);
3001 		}
3002 
3003 		/* Never shrink a device with usable data during connect.
3004 		   But allow online shrinking if we are connected. */
3005 		if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
3006 		   drbd_get_capacity(mdev->this_bdev) &&
3007 		   mdev->state.disk >= D_OUTDATED &&
3008 		   mdev->state.conn < C_CONNECTED) {
3009 			dev_err(DEV, "The peer's disk size is too small!\n");
3010 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3011 			mdev->ldev->dc.disk_size = my_usize;
3012 			put_ldev(mdev);
3013 			return false;
3014 		}
3015 		put_ldev(mdev);
3016 	}
3017 
3018 	ddsf = be16_to_cpu(p->dds_flags);
3019 	if (get_ldev(mdev)) {
3020 		dd = drbd_determine_dev_size(mdev, ddsf);
3021 		put_ldev(mdev);
3022 		if (dd == dev_size_error)
3023 			return false;
3024 		drbd_md_sync(mdev);
3025 	} else {
3026 		/* I am diskless, need to accept the peer's size. */
3027 		drbd_set_my_capacity(mdev, p_size);
3028 	}
3029 
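	/* Remember the peer's bio size limit and recompute our effective
	 * maximum (presumably the minimum of the local and the peer limit). */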
3030 	mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3031 	drbd_reconsider_max_bio_size(mdev);
3032 
3033 	if (get_ldev(mdev)) {
3034 		if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3035 			mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3036 			ldsc = 1;
3037 		}
3038 
3039 		put_ldev(mdev);
3040 	}
3041 
3042 	if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3043 		if (be64_to_cpu(p->c_size) !=
3044 		    drbd_get_capacity(mdev->this_bdev) || ldsc) {
			/* we have different sizes, probably the peer
			 * needs to know my new size... */
3047 			drbd_send_sizes(mdev, 0, ddsf);
3048 		}
3049 		if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3050 		    (dd == grew && mdev->state.conn == C_CONNECTED)) {
3051 			if (mdev->state.pdsk >= D_INCONSISTENT &&
3052 			    mdev->state.disk >= D_INCONSISTENT) {
3053 				if (ddsf & DDSF_NO_RESYNC)
3054 					dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3055 				else
3056 					resync_after_online_grow(mdev);
3057 			} else
3058 				set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3059 		}
3060 	}
3061 
3062 	return true;
3063 }
3064 
3065 static int receive_uuids(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3066 {
3067 	struct p_uuids *p = &mdev->data.rbuf.uuids;
3068 	u64 *p_uuid;
3069 	int i, updated_uuids = 0;
3070 
	p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
	if (!p_uuid) {
		dev_err(DEV, "kmalloc of p_uuid failed\n");
		return false;
	}

	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
		p_uuid[i] = be64_to_cpu(p->uuid[i]);
3075 
3076 	kfree(mdev->p_uuid);
3077 	mdev->p_uuid = p_uuid;
3078 
3079 	if (mdev->state.conn < C_CONNECTED &&
3080 	    mdev->state.disk < D_INCONSISTENT &&
3081 	    mdev->state.role == R_PRIMARY &&
3082 	    (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3083 		dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3084 		    (unsigned long long)mdev->ed_uuid);
3085 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3086 		return false;
3087 	}
3088 
3089 	if (get_ldev(mdev)) {
3090 		int skip_initial_sync =
3091 			mdev->state.conn == C_CONNECTED &&
3092 			mdev->agreed_pro_version >= 90 &&
3093 			mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3094 			(p_uuid[UI_FLAGS] & 8);
3095 		if (skip_initial_sync) {
3096 			dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3097 			drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3098 					"clear_n_write from receive_uuids",
3099 					BM_LOCKED_TEST_ALLOWED);
3100 			_drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3101 			_drbd_uuid_set(mdev, UI_BITMAP, 0);
3102 			_drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3103 					CS_VERBOSE, NULL);
3104 			drbd_md_sync(mdev);
3105 			updated_uuids = 1;
3106 		}
3107 		put_ldev(mdev);
3108 	} else if (mdev->state.disk < D_INCONSISTENT &&
3109 		   mdev->state.role == R_PRIMARY) {
3110 		/* I am a diskless primary, the peer just created a new current UUID
3111 		   for me. */
3112 		updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3113 	}
3114 
	/* Before we test for the disk state, we should wait until a possibly
	   ongoing cluster-wide state change has finished. That is important if
	   we are primary and are detaching from our disk: we need to see the
	   new disk state... */
3119 	wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3120 	if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3121 		updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3122 
3123 	if (updated_uuids)
3124 		drbd_print_uuids(mdev, "receiver updated UUIDs to");
3125 
3126 	return true;
3127 }
3128 
3129 /**
3130  * convert_state() - Converts the peer's view of the cluster state to our point of view
3131  * @ps:		The state as seen by the peer.
3132  */
3133 static union drbd_state convert_state(union drbd_state ps)
3134 {
3135 	union drbd_state ms;
3136 
3137 	static enum drbd_conns c_tab[] = {
3138 		[C_CONNECTED] = C_CONNECTED,
3139 
3140 		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3141 		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3142 		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3143 		[C_VERIFY_S]       = C_VERIFY_T,
3144 		[C_MASK]   = C_MASK,
3145 	};
3146 
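	/* Mirror the peer's point of view: swap role <-> peer and disk <-> pdsk,
	 * and translate the connection states that have a directed counterpart
	 * via c_tab above. */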
3147 	ms.i = ps.i;
3148 
3149 	ms.conn = c_tab[ps.conn];
3150 	ms.peer = ps.role;
3151 	ms.role = ps.peer;
3152 	ms.pdsk = ps.disk;
3153 	ms.disk = ps.pdsk;
3154 	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3155 
3156 	return ms;
3157 }
3158 
3159 static int receive_req_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3160 {
3161 	struct p_req_state *p = &mdev->data.rbuf.req_state;
3162 	union drbd_state mask, val;
3163 	enum drbd_state_rv rv;
3164 
3165 	mask.i = be32_to_cpu(p->mask);
3166 	val.i = be32_to_cpu(p->val);
3167 
3168 	if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3169 	    test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3170 		drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3171 		return true;
3172 	}
3173 
3174 	mask = convert_state(mask);
3175 	val = convert_state(val);
3176 
3177 	rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3178 
3179 	drbd_send_sr_reply(mdev, rv);
3180 	drbd_md_sync(mdev);
3181 
3182 	return true;
3183 }
3184 
3185 static int receive_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3186 {
3187 	struct p_state *p = &mdev->data.rbuf.state;
3188 	union drbd_state os, ns, peer_state;
3189 	enum drbd_disk_state real_peer_disk;
3190 	enum chg_state_flags cs_flags;
3191 	int rv;
3192 
3193 	peer_state.i = be32_to_cpu(p->state);
3194 
3195 	real_peer_disk = peer_state.disk;
3196 	if (peer_state.disk == D_NEGOTIATING) {
3197 		real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3198 		dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3199 	}
3200 
3201 	spin_lock_irq(&mdev->req_lock);
3202  retry:
3203 	os = ns = mdev->state;
3204 	spin_unlock_irq(&mdev->req_lock);
3205 
3206 	/* If some other part of the code (asender thread, timeout)
3207 	 * already decided to close the connection again,
3208 	 * we must not "re-establish" it here. */
3209 	if (os.conn <= C_TEAR_DOWN)
3210 		return false;
3211 
3212 	/* If this is the "end of sync" confirmation, usually the peer disk
3213 	 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
3214 	 * set) resync started in PausedSyncT, or if the timing of pause-/
3215 	 * unpause-sync events has been "just right", the peer disk may
3216 	 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3217 	 */
3218 	if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3219 	    real_peer_disk == D_UP_TO_DATE &&
3220 	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3221 		/* If we are (becoming) SyncSource, but peer is still in sync
3222 		 * preparation, ignore its uptodate-ness to avoid flapping, it
3223 		 * will change to inconsistent once the peer reaches active
3224 		 * syncing states.
3225 		 * It may have changed syncer-paused flags, however, so we
3226 		 * cannot ignore this completely. */
3227 		if (peer_state.conn > C_CONNECTED &&
3228 		    peer_state.conn < C_SYNC_SOURCE)
3229 			real_peer_disk = D_INCONSISTENT;
3230 
3231 		/* if peer_state changes to connected at the same time,
3232 		 * it explicitly notifies us that it finished resync.
3233 		 * Maybe we should finish it up, too? */
3234 		else if (os.conn >= C_SYNC_SOURCE &&
3235 			 peer_state.conn == C_CONNECTED) {
3236 			if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3237 				drbd_resync_finished(mdev);
3238 			return true;
3239 		}
3240 	}
3241 
3242 	/* peer says his disk is inconsistent, while we think it is uptodate,
3243 	 * and this happens while the peer still thinks we have a sync going on,
3244 	 * but we think we are already done with the sync.
3245 	 * We ignore this to avoid flapping pdsk.
3246 	 * This should not happen, if the peer is a recent version of drbd. */
3247 	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3248 	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3249 		real_peer_disk = D_UP_TO_DATE;
3250 
3251 	if (ns.conn == C_WF_REPORT_PARAMS)
3252 		ns.conn = C_CONNECTED;
3253 
3254 	if (peer_state.conn == C_AHEAD)
3255 		ns.conn = C_BEHIND;
3256 
3257 	if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3258 	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
3259 		int cr; /* consider resync */
3260 
3261 		/* if we established a new connection */
3262 		cr  = (os.conn < C_CONNECTED);
3263 		/* if we had an established connection
3264 		 * and one of the nodes newly attaches a disk */
3265 		cr |= (os.conn == C_CONNECTED &&
3266 		       (peer_state.disk == D_NEGOTIATING ||
3267 			os.disk == D_NEGOTIATING));
3268 		/* if we have both been inconsistent, and the peer has been
3269 		 * forced to be UpToDate with --overwrite-data */
3270 		cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3271 		/* if we had been plain connected, and the admin requested to
3272 		 * start a sync by "invalidate" or "invalidate-remote" */
3273 		cr |= (os.conn == C_CONNECTED &&
3274 				(peer_state.conn >= C_STARTING_SYNC_S &&
3275 				 peer_state.conn <= C_WF_BITMAP_T));
3276 
3277 		if (cr)
3278 			ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3279 
3280 		put_ldev(mdev);
3281 		if (ns.conn == C_MASK) {
3282 			ns.conn = C_CONNECTED;
3283 			if (mdev->state.disk == D_NEGOTIATING) {
3284 				drbd_force_state(mdev, NS(disk, D_FAILED));
3285 			} else if (peer_state.disk == D_NEGOTIATING) {
3286 				dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3287 				peer_state.disk = D_DISKLESS;
3288 				real_peer_disk = D_DISKLESS;
3289 			} else {
3290 				if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3291 					return false;
3292 				D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3293 				drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3294 				return false;
3295 			}
3296 		}
3297 	}
3298 
3299 	spin_lock_irq(&mdev->req_lock);
3300 	if (mdev->state.i != os.i)
3301 		goto retry;
3302 	clear_bit(CONSIDER_RESYNC, &mdev->flags);
3303 	ns.peer = peer_state.role;
3304 	ns.pdsk = real_peer_disk;
3305 	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3306 	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3307 		ns.disk = mdev->new_state_tmp.disk;
3308 	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3309 	if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3310 	    test_bit(NEW_CUR_UUID, &mdev->flags)) {
		/* Do not allow tl_restart(resend) for a rebooted peer. We can only allow this
		   for temporary network outages! */
3313 		spin_unlock_irq(&mdev->req_lock);
3314 		dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3315 		tl_clear(mdev);
3316 		drbd_uuid_new_current(mdev);
3317 		clear_bit(NEW_CUR_UUID, &mdev->flags);
3318 		drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));
3319 		return false;
3320 	}
3321 	rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3322 	ns = mdev->state;
3323 	spin_unlock_irq(&mdev->req_lock);
3324 
3325 	if (rv < SS_SUCCESS) {
3326 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3327 		return false;
3328 	}
3329 
3330 	if (os.conn > C_WF_REPORT_PARAMS) {
3331 		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3332 		    peer_state.disk != D_NEGOTIATING ) {
3333 			/* we want resync, peer has not yet decided to sync... */
3334 			/* Nowadays only used when forcing a node into primary role and
3335 			   setting its disk to UpToDate with that */
3336 			drbd_send_uuids(mdev);
3337 			drbd_send_current_state(mdev);
3338 		}
3339 	}
3340 
3341 	mdev->net_conf->want_lose = 0;
3342 
3343 	drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3344 
3345 	return true;
3346 }
3347 
3348 static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3349 {
3350 	struct p_rs_uuid *p = &mdev->data.rbuf.rs_uuid;
3351 
3352 	wait_event(mdev->misc_wait,
3353 		   mdev->state.conn == C_WF_SYNC_UUID ||
3354 		   mdev->state.conn == C_BEHIND ||
3355 		   mdev->state.conn < C_CONNECTED ||
3356 		   mdev->state.disk < D_NEGOTIATING);
3357 
3358 	/* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3359 
3360 	/* Here the _drbd_uuid_ functions are right, current should
3361 	   _not_ be rotated into the history */
3362 	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3363 		_drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3364 		_drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3365 
3366 		drbd_print_uuids(mdev, "updated sync uuid");
3367 		drbd_start_resync(mdev, C_SYNC_TARGET);
3368 
3369 		put_ldev(mdev);
3370 	} else
3371 		dev_err(DEV, "Ignoring SyncUUID packet!\n");
3372 
3373 	return true;
3374 }
3375 
3376 /**
3377  * receive_bitmap_plain
3378  *
3379  * Return 0 when done, 1 when another iteration is needed, and a negative error
3380  * code upon failure.
3381  */
3382 static int
3383 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3384 		     unsigned long *buffer, struct bm_xfer_ctx *c)
3385 {
3386 	unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3387 	unsigned want = num_words * sizeof(long);
3388 	int err;
3389 
3390 	if (want != data_size) {
3391 		dev_err(DEV, "%s:want (%u) != data_size (%u)\n", __func__, want, data_size);
3392 		return -EIO;
3393 	}
3394 	if (want == 0)
3395 		return 0;
3396 	err = drbd_recv(mdev, buffer, want);
3397 	if (err != want) {
3398 		if (err >= 0)
3399 			err = -EIO;
3400 		return err;
3401 	}
3402 
3403 	drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3404 
3405 	c->word_offset += num_words;
3406 	c->bit_offset = c->word_offset * BITS_PER_LONG;
3407 	if (c->bit_offset > c->bm_bits)
3408 		c->bit_offset = c->bm_bits;
3409 
3410 	return 1;
3411 }
3412 
3413 /**
3414  * recv_bm_rle_bits
3415  *
3416  * Return 0 when done, 1 when another iteration is needed, and a negative error
3417  * code upon failure.
3418  */
3419 static int
3420 recv_bm_rle_bits(struct drbd_conf *mdev,
3421 		struct p_compressed_bm *p,
3422 		struct bm_xfer_ctx *c)
3423 {
3424 	struct bitstream bs;
3425 	u64 look_ahead;
3426 	u64 rl;
3427 	u64 tmp;
3428 	unsigned long s = c->bit_offset;
3429 	unsigned long e;
3430 	int len = be16_to_cpu(p->head.length) - (sizeof(*p) - sizeof(p->head));
3431 	int toggle = DCBP_get_start(p);
3432 	int have;
3433 	int bits;
3434 
3435 	bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3436 
3437 	bits = bitstream_get_bits(&bs, &look_ahead, 64);
3438 	if (bits < 0)
3439 		return -EIO;
3440 
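	/* look_ahead buffers up to 64 bits of the VLI encoded stream.  Each
	 * decoded run length rl describes a run of equal bits starting at s,
	 * alternately clear and set (toggle); only the "set" runs are applied
	 * to the bitmap, then the look-ahead buffer is refilled. */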
3441 	for (have = bits; have > 0; s += rl, toggle = !toggle) {
3442 		bits = vli_decode_bits(&rl, look_ahead);
3443 		if (bits <= 0)
3444 			return -EIO;
3445 
3446 		if (toggle) {
3447 			e = s + rl -1;
3448 			if (e >= c->bm_bits) {
3449 				dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3450 				return -EIO;
3451 			}
3452 			_drbd_bm_set_bits(mdev, s, e);
3453 		}
3454 
3455 		if (have < bits) {
3456 			dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3457 				have, bits, look_ahead,
3458 				(unsigned int)(bs.cur.b - p->code),
3459 				(unsigned int)bs.buf_len);
3460 			return -EIO;
3461 		}
3462 		look_ahead >>= bits;
3463 		have -= bits;
3464 
3465 		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3466 		if (bits < 0)
3467 			return -EIO;
3468 		look_ahead |= tmp << have;
3469 		have += bits;
3470 	}
3471 
3472 	c->bit_offset = s;
3473 	bm_xfer_ctx_bit_to_word_offset(c);
3474 
3475 	return (s != c->bm_bits);
3476 }
3477 
3478 /**
3479  * decode_bitmap_c
3480  *
3481  * Return 0 when done, 1 when another iteration is needed, and a negative error
3482  * code upon failure.
3483  */
3484 static int
3485 decode_bitmap_c(struct drbd_conf *mdev,
3486 		struct p_compressed_bm *p,
3487 		struct bm_xfer_ctx *c)
3488 {
3489 	if (DCBP_get_code(p) == RLE_VLI_Bits)
3490 		return recv_bm_rle_bits(mdev, p, c);
3491 
3492 	/* other variants had been implemented for evaluation,
3493 	 * but have been dropped as this one turned out to be "best"
3494 	 * during all our tests. */
3495 
3496 	dev_err(DEV, "%s: unknown encoding %u\n", __func__, p->encoding);
3497 	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3498 	return -EIO;
3499 }
3500 
3501 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3502 		const char *direction, struct bm_xfer_ctx *c)
3503 {
3504 	/* what would it take to transfer it "plaintext" */
3505 	unsigned plain = sizeof(struct p_header80) *
3506 		((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3507 		+ c->bm_words * sizeof(long);
3508 	unsigned total = c->bytes[0] + c->bytes[1];
3509 	unsigned r;
3510 
3511 	/* total can not be zero. but just in case: */
3512 	if (total == 0)
3513 		return;
3514 
3515 	/* don't report if not compressed */
3516 	if (total >= plain)
3517 		return;
3518 
3519 	/* total < plain. check for overflow, still */
3520 	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3521 		                    : (1000 * total / plain);
3522 
3523 	if (r > 1000)
3524 		r = 1000;
3525 
3526 	r = 1000 - r;
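	/* For illustration: with plain == 4096 and total == 1024 this gives
	 * r = 1000 - (1000 * 1024 / 4096) = 750, reported as "75.0%". */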
3527 	dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3528 	     "total %u; compression: %u.%u%%\n",
3529 			direction,
3530 			c->bytes[1], c->packets[1],
3531 			c->bytes[0], c->packets[0],
3532 			total, r/10, r % 10);
3533 }
3534 
3535 /* Since we are processing the bitfield from lower addresses to higher,
3536    it does not matter whether we process it in 32 bit or 64 bit chunks,
3537    as long as it is little endian. (Understand it as a byte stream,
3538    beginning with the lowest byte...) If we used big endian, we would
3539    have to process it from the highest address to the lowest in order
3540    to be agnostic to the 32 vs 64 bit issue.
3541 
3542    Returns 0 on failure, 1 if we successfully received it. */
3543 static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3544 {
3545 	struct bm_xfer_ctx c;
3546 	void *buffer;
3547 	int err;
3548 	int ok = false;
3549 	struct p_header80 *h = &mdev->data.rbuf.header.h80;
3550 
3551 	drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3552 	/* you are supposed to send additional out-of-sync information
3553 	 * if you actually set bits during this phase */
3554 
3555 	/* maybe we should use some per thread scratch page,
3556 	 * and allocate that during initial device creation? */
3557 	buffer	 = (unsigned long *) __get_free_page(GFP_NOIO);
3558 	if (!buffer) {
3559 		dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3560 		goto out;
3561 	}
3562 
3563 	c = (struct bm_xfer_ctx) {
3564 		.bm_bits = drbd_bm_bits(mdev),
3565 		.bm_words = drbd_bm_words(mdev),
3566 	};
3567 
3568 	for (;;) {
3569 		if (cmd == P_BITMAP) {
3570 			err = receive_bitmap_plain(mdev, data_size, buffer, &c);
3571 		} else if (cmd == P_COMPRESSED_BITMAP) {
3572 			/* MAYBE: sanity check that we speak proto >= 90,
3573 			 * and the feature is enabled! */
3574 			struct p_compressed_bm *p;
3575 
3576 			if (data_size > BM_PACKET_PAYLOAD_BYTES) {
3577 				dev_err(DEV, "ReportCBitmap packet too large\n");
3578 				goto out;
3579 			}
3580 			/* use the page buffer */
3581 			p = buffer;
3582 			memcpy(p, h, sizeof(*h));
3583 			if (drbd_recv(mdev, p->head.payload, data_size) != data_size)
3584 				goto out;
3585 			if (data_size <= (sizeof(*p) - sizeof(p->head))) {
3586 				dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
3587 				goto out;
3588 			}
3589 			err = decode_bitmap_c(mdev, p, &c);
3590 		} else {
3591 			dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", cmd);
3592 			goto out;
3593 		}
3594 
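		/* index 1 accounts plain bitmap packets, index 0 the RLE
		 * compressed ones; INFO_bm_xfer_stats() above reports both. */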
3595 		c.packets[cmd == P_BITMAP]++;
3596 		c.bytes[cmd == P_BITMAP] += sizeof(struct p_header80) + data_size;
3597 
3598 		if (err <= 0) {
3599 			if (err < 0)
3600 				goto out;
3601 			break;
3602 		}
3603 		if (!drbd_recv_header(mdev, &cmd, &data_size))
3604 			goto out;
3605 	}
3606 
3607 	INFO_bm_xfer_stats(mdev, "receive", &c);
3608 
3609 	if (mdev->state.conn == C_WF_BITMAP_T) {
3610 		enum drbd_state_rv rv;
3611 
3612 		ok = !drbd_send_bitmap(mdev);
3613 		if (!ok)
3614 			goto out;
3615 		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3616 		rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3617 		D_ASSERT(rv == SS_SUCCESS);
3618 	} else if (mdev->state.conn != C_WF_BITMAP_S) {
3619 		/* admin may have requested C_DISCONNECTING,
3620 		 * other threads may have noticed network errors */
3621 		dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3622 		    drbd_conn_str(mdev->state.conn));
3623 	}
3624 
3625 	ok = true;
3626  out:
3627 	drbd_bm_unlock(mdev);
3628 	if (ok && mdev->state.conn == C_WF_BITMAP_S)
3629 		drbd_start_resync(mdev, C_SYNC_SOURCE);
3630 	free_page((unsigned long) buffer);
3631 	return ok;
3632 }
3633 
3634 static int receive_skip(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3635 {
3636 	/* TODO zero copy sink :) */
3637 	static char sink[128];
3638 	int size, want, r;
3639 
3640 	dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3641 		 cmd, data_size);
3642 
3643 	size = data_size;
3644 	while (size > 0) {
3645 		want = min_t(int, size, sizeof(sink));
3646 		r = drbd_recv(mdev, sink, want);
3647 		ERR_IF(r <= 0) break;
3648 		size -= r;
3649 	}
3650 	return size == 0;
3651 }
3652 
3653 static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3654 {
3655 	/* Make sure we've acked all the TCP data associated
3656 	 * with the data requests being unplugged */
3657 	drbd_tcp_quickack(mdev->data.socket);
3658 
3659 	return true;
3660 }
3661 
3662 static int receive_out_of_sync(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3663 {
3664 	struct p_block_desc *p = &mdev->data.rbuf.block_desc;
3665 
3666 	switch (mdev->state.conn) {
3667 	case C_WF_SYNC_UUID:
3668 	case C_WF_BITMAP_T:
3669 	case C_BEHIND:
3670 		break;
3671 	default:
3672 		dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
3673 				drbd_conn_str(mdev->state.conn));
3674 	}
3675 
3676 	drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
3677 
3678 	return true;
3679 }
3680 
3681 typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packets cmd, unsigned int to_receive);
3682 
3683 struct data_cmd {
3684 	int expect_payload;
3685 	size_t pkt_size;
3686 	drbd_cmd_handler_f function;
3687 };
3688 
3689 static struct data_cmd drbd_cmd_handler[] = {
3690 	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
3691 	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
3692 	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
3693 	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier } ,
3694 	[P_BITMAP]	    = { 1, sizeof(struct p_header80), receive_bitmap } ,
3695 	[P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap } ,
3696 	[P_UNPLUG_REMOTE]   = { 0, sizeof(struct p_header80), receive_UnplugRemote },
3697 	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
3698 	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3699 	[P_SYNC_PARAM]	    = { 1, sizeof(struct p_header80), receive_SyncParam },
3700 	[P_SYNC_PARAM89]    = { 1, sizeof(struct p_header80), receive_SyncParam },
3701 	[P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
3702 	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
3703 	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
3704 	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
3705 	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
3706 	[P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
3707 	[P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
3708 	[P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
3709 	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3710 	[P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
3711 	[P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
3712 	/* anything missing from this table is in
3713 	 * the asender_tbl, see get_asender_cmd */
3714 	[P_MAX_CMD]	    = { 0, 0, NULL },
3715 };
3716 
3717 /* All handler functions that expect a sub-header get that sub-header in
3718    mdev->data.rbuf.header.head.payload.
3719 
3720    Usually the callback can find the usual p_header in
3721    mdev->data.rbuf.header.head, but it may not rely on that,
3722    since there is also p_header95. */
3723 
3724 static void drbdd(struct drbd_conf *mdev)
3725 {
3726 	union p_header *header = &mdev->data.rbuf.header;
3727 	unsigned int packet_size;
3728 	enum drbd_packets cmd;
3729 	size_t shs; /* sub header size */
3730 	int rv;
3731 
3732 	while (get_t_state(&mdev->receiver) == Running) {
3733 		drbd_thread_current_set_cpu(mdev);
3734 		if (!drbd_recv_header(mdev, &cmd, &packet_size))
3735 			goto err_out;
3736 
3737 		if (unlikely(cmd >= P_MAX_CMD || !drbd_cmd_handler[cmd].function)) {
3738 			dev_err(DEV, "unknown packet type %d, l: %d!\n", cmd, packet_size);
3739 			goto err_out;
3740 		}
3741 
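		/* sub-header size: the command specific part of the declared
		 * packet, i.e. everything beyond the common header that
		 * drbd_recv_header() has already consumed. */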
3742 		shs = drbd_cmd_handler[cmd].pkt_size - sizeof(union p_header);
3743 		if (packet_size - shs > 0 && !drbd_cmd_handler[cmd].expect_payload) {
3744 			dev_err(DEV, "No payload expected %s l:%d\n", cmdname(cmd), packet_size);
3745 			goto err_out;
3746 		}
3747 
3748 		if (shs) {
3749 			rv = drbd_recv(mdev, &header->h80.payload, shs);
3750 			if (unlikely(rv != shs)) {
3751 				if (!signal_pending(current))
3752 					dev_warn(DEV, "short read while reading sub header: rv=%d\n", rv);
3753 				goto err_out;
3754 			}
3755 		}
3756 
3757 		rv = drbd_cmd_handler[cmd].function(mdev, cmd, packet_size - shs);
3758 
3759 		if (unlikely(!rv)) {
3760 			dev_err(DEV, "error receiving %s, l: %d!\n",
3761 			    cmdname(cmd), packet_size);
3762 			goto err_out;
3763 		}
3764 	}
3765 
3766 	if (0) {
3767 	err_out:
3768 		drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3769 	}
3770 	/* If we leave here, we probably want to update at least the
3771 	 * "Connected" indicator on stable storage. Do so explicitly here. */
3772 	drbd_md_sync(mdev);
3773 }
3774 
3775 void drbd_flush_workqueue(struct drbd_conf *mdev)
3776 {
3777 	struct drbd_wq_barrier barr;
3778 
3779 	barr.w.cb = w_prev_work_done;
3780 	init_completion(&barr.done);
3781 	drbd_queue_work(&mdev->data.work, &barr.w);
3782 	wait_for_completion(&barr.done);
3783 }
3784 
3785 void drbd_free_tl_hash(struct drbd_conf *mdev)
3786 {
3787 	struct hlist_head *h;
3788 
3789 	spin_lock_irq(&mdev->req_lock);
3790 
3791 	if (!mdev->tl_hash || mdev->state.conn != C_STANDALONE) {
3792 		spin_unlock_irq(&mdev->req_lock);
3793 		return;
3794 	}
3795 	/* paranoia code */
3796 	for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3797 		if (h->first)
3798 			dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3799 				(int)(h - mdev->ee_hash), h->first);
3800 	kfree(mdev->ee_hash);
3801 	mdev->ee_hash = NULL;
3802 	mdev->ee_hash_s = 0;
3803 
3804 	/* paranoia code */
3805 	for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3806 		if (h->first)
3807 			dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
3808 				(int)(h - mdev->tl_hash), h->first);
3809 	kfree(mdev->tl_hash);
3810 	mdev->tl_hash = NULL;
3811 	mdev->tl_hash_s = 0;
3812 	spin_unlock_irq(&mdev->req_lock);
3813 }
3814 
3815 static void drbd_disconnect(struct drbd_conf *mdev)
3816 {
3817 	enum drbd_fencing_p fp;
3818 	union drbd_state os, ns;
3819 	int rv = SS_UNKNOWN_ERROR;
3820 	unsigned int i;
3821 
3822 	if (mdev->state.conn == C_STANDALONE)
3823 		return;
3824 
3825 	/* We are about to start the cleanup after connection loss.
3826 	 * Make sure drbd_make_request knows about that.
3827 	 * Usually we should be in some network failure state already,
3828 	 * but just in case we are not, we fix it up here.
3829 	 */
3830 	drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
3831 
3832 	/* asender does not clean up anything. it must not interfere, either */
3833 	drbd_thread_stop(&mdev->asender);
3834 	drbd_free_sock(mdev);
3835 
3836 	/* wait for current activity to cease. */
3837 	spin_lock_irq(&mdev->req_lock);
3838 	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3839 	_drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3840 	_drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3841 	spin_unlock_irq(&mdev->req_lock);
3842 
3843 	/* We do not have data structures that would allow us to
3844 	 * get the rs_pending_cnt down to 0 again.
3845 	 *  * On C_SYNC_TARGET we do not have any data structures describing
3846 	 *    the pending RSDataRequest's we have sent.
3847 	 *  * On C_SYNC_SOURCE there is no data structure that tracks
3848 	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3849 	 *  And no, it is not the sum of the reference counts in the
3850 	 *  resync_LRU. The resync_LRU tracks the whole operation including
3851 	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
3852 	 *  on the fly. */
3853 	drbd_rs_cancel_all(mdev);
3854 	mdev->rs_total = 0;
3855 	mdev->rs_failed = 0;
3856 	atomic_set(&mdev->rs_pending_cnt, 0);
3857 	wake_up(&mdev->misc_wait);
3858 
3859 	/* make sure syncer is stopped and w_resume_next_sg queued */
3860 	del_timer_sync(&mdev->resync_timer);
3861 	resync_timer_fn((unsigned long)mdev);
3862 
3863 	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3864 	 * w_make_resync_request etc. which may still be on the worker queue
3865 	 * to be "canceled" */
3866 	drbd_flush_workqueue(mdev);
3867 
3868 	/* This also does reclaim_net_ee().  If we do this too early, we might
3869 	 * miss some resync ee and pages.*/
3870 	drbd_process_done_ee(mdev);
3871 
3872 	kfree(mdev->p_uuid);
3873 	mdev->p_uuid = NULL;
3874 
3875 	if (!is_susp(mdev->state))
3876 		tl_clear(mdev);
3877 
3878 	dev_info(DEV, "Connection closed\n");
3879 
3880 	drbd_md_sync(mdev);
3881 
3882 	fp = FP_DONT_CARE;
3883 	if (get_ldev(mdev)) {
3884 		fp = mdev->ldev->dc.fencing;
3885 		put_ldev(mdev);
3886 	}
3887 
3888 	if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
3889 		drbd_try_outdate_peer_async(mdev);
3890 
3891 	spin_lock_irq(&mdev->req_lock);
3892 	os = mdev->state;
3893 	if (os.conn >= C_UNCONNECTED) {
3894 		/* Do not restart in case we are C_DISCONNECTING */
3895 		ns = os;
3896 		ns.conn = C_UNCONNECTED;
3897 		rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3898 	}
3899 	spin_unlock_irq(&mdev->req_lock);
3900 
3901 	if (os.conn == C_DISCONNECTING) {
3902 		wait_event(mdev->net_cnt_wait, atomic_read(&mdev->net_cnt) == 0);
3903 
3904 		crypto_free_hash(mdev->cram_hmac_tfm);
3905 		mdev->cram_hmac_tfm = NULL;
3906 
3907 		kfree(mdev->net_conf);
3908 		mdev->net_conf = NULL;
3909 		drbd_request_state(mdev, NS(conn, C_STANDALONE));
3910 	}
3911 
3912 	/* serialize with bitmap writeout triggered by the state change,
3913 	 * if any. */
3914 	wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
3915 
3916 	/* tcp_close and release of sendpage pages can be deferred.  I don't
3917 	 * want to use SO_LINGER, because apparently it can be deferred for
3918 	 * more than 20 seconds (longest time I checked).
3919 	 *
3920 	 * Actually we don't care for exactly when the network stack does its
3921 	 * put_page(), but release our reference on these pages right here.
3922 	 */
3923 	i = drbd_release_ee(mdev, &mdev->net_ee);
3924 	if (i)
3925 		dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3926 	i = atomic_read(&mdev->pp_in_use_by_net);
3927 	if (i)
3928 		dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
3929 	i = atomic_read(&mdev->pp_in_use);
3930 	if (i)
3931 		dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
3932 
3933 	D_ASSERT(list_empty(&mdev->read_ee));
3934 	D_ASSERT(list_empty(&mdev->active_ee));
3935 	D_ASSERT(list_empty(&mdev->sync_ee));
3936 	D_ASSERT(list_empty(&mdev->done_ee));
3937 
3938 	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3939 	atomic_set(&mdev->current_epoch->epoch_size, 0);
3940 	D_ASSERT(list_empty(&mdev->current_epoch->list));
3941 }
3942 
3943 /*
3944  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3945  * we can agree on is stored in agreed_pro_version.
3946  *
3947  * feature flags and the reserved array should be enough room for future
3948  * enhancements of the handshake protocol, and possible plugins...
3949  *
3950  * for now, they are expected to be zero, but ignored.
3951  */
3952 static int drbd_send_handshake(struct drbd_conf *mdev)
3953 {
3954 	/* ASSERT current == mdev->receiver ... */
3955 	struct p_handshake *p = &mdev->data.sbuf.handshake;
3956 	int ok;
3957 
3958 	if (mutex_lock_interruptible(&mdev->data.mutex)) {
3959 		dev_err(DEV, "interrupted during initial handshake\n");
3960 		return 0; /* interrupted. not ok. */
3961 	}
3962 
3963 	if (mdev->data.socket == NULL) {
3964 		mutex_unlock(&mdev->data.mutex);
3965 		return 0;
3966 	}
3967 
3968 	memset(p, 0, sizeof(*p));
3969 	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3970 	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3971 	ok = _drbd_send_cmd(mdev, mdev->data.socket, P_HAND_SHAKE,
3972 			    (struct p_header80 *)p, sizeof(*p), 0);
3973 	mutex_unlock(&mdev->data.mutex);
3974 	return ok;
3975 }
3976 
3977 /*
3978  * return values:
3979  *   1 yes, we have a valid connection
3980  *   0 oops, did not work out, please try again
3981  *  -1 peer talks different language,
3982  *     no point in trying again, please go standalone.
3983  */
3984 static int drbd_do_handshake(struct drbd_conf *mdev)
3985 {
3986 	/* ASSERT current == mdev->receiver ... */
3987 	struct p_handshake *p = &mdev->data.rbuf.handshake;
3988 	const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
3989 	unsigned int length;
3990 	enum drbd_packets cmd;
3991 	int rv;
3992 
3993 	rv = drbd_send_handshake(mdev);
3994 	if (!rv)
3995 		return 0;
3996 
3997 	rv = drbd_recv_header(mdev, &cmd, &length);
3998 	if (!rv)
3999 		return 0;
4000 
4001 	if (cmd != P_HAND_SHAKE) {
4002 		dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
4003 		     cmdname(cmd), cmd);
4004 		return -1;
4005 	}
4006 
4007 	if (length != expect) {
4008 		dev_err(DEV, "expected HandShake length: %u, received: %u\n",
4009 		     expect, length);
4010 		return -1;
4011 	}
4012 
4013 	rv = drbd_recv(mdev, &p->head.payload, expect);
4014 
4015 	if (rv != expect) {
4016 		if (!signal_pending(current))
4017 			dev_warn(DEV, "short read receiving handshake packet: l=%u\n", rv);
4018 		return 0;
4019 	}
4020 
4021 	p->protocol_min = be32_to_cpu(p->protocol_min);
4022 	p->protocol_max = be32_to_cpu(p->protocol_max);
4023 	if (p->protocol_max == 0)
4024 		p->protocol_max = p->protocol_min;
4025 
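	/* The version ranges must overlap; we then agree on the highest
	 * version both sides support.  E.g. (hypothetical numbers): if we
	 * support 86..96 and the peer announces 90..100, we agree on 96. */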
4026 	if (PRO_VERSION_MAX < p->protocol_min ||
4027 	    PRO_VERSION_MIN > p->protocol_max)
4028 		goto incompat;
4029 
4030 	mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4031 
4032 	dev_info(DEV, "Handshake successful: "
4033 	     "Agreed network protocol version %d\n", mdev->agreed_pro_version);
4034 
4035 	return 1;
4036 
4037  incompat:
4038 	dev_err(DEV, "incompatible DRBD dialects: "
4039 	    "I support %d-%d, peer supports %d-%d\n",
4040 	    PRO_VERSION_MIN, PRO_VERSION_MAX,
4041 	    p->protocol_min, p->protocol_max);
4042 	return -1;
4043 }
4044 
4045 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4046 static int drbd_do_auth(struct drbd_conf *mdev)
4047 {
4048 	dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4049 	dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4050 	return -1;
4051 }
4052 #else
4053 #define CHALLENGE_LEN 64
4054 
4055 /* Return value:
4056 	1 - auth succeeded,
4057 	0 - failed, try again (network error),
4058 	-1 - auth failed, don't try again.
4059 */
4060 
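/* Rough outline of the challenge-response exchange implemented below:
 *  1. send our random challenge as P_AUTH_CHALLENGE;
 *  2. receive the peer's challenge, HMAC it with the shared secret and
 *     send the digest back as P_AUTH_RESPONSE;
 *  3. receive the peer's response to our challenge and compare it with
 *     the digest we compute locally over my_challenge.
 * Only if the two digests match is the peer considered authenticated. */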
4061 static int drbd_do_auth(struct drbd_conf *mdev)
4062 {
4063 	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4064 	struct scatterlist sg;
4065 	char *response = NULL;
4066 	char *right_response = NULL;
4067 	char *peers_ch = NULL;
4068 	unsigned int key_len = strlen(mdev->net_conf->shared_secret);
4069 	unsigned int resp_size;
4070 	struct hash_desc desc;
4071 	enum drbd_packets cmd;
4072 	unsigned int length;
4073 	int rv;
4074 
4075 	desc.tfm = mdev->cram_hmac_tfm;
4076 	desc.flags = 0;
4077 
4078 	rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
4079 				(u8 *)mdev->net_conf->shared_secret, key_len);
4080 	if (rv) {
4081 		dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
4082 		rv = -1;
4083 		goto fail;
4084 	}
4085 
4086 	get_random_bytes(my_challenge, CHALLENGE_LEN);
4087 
4088 	rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4089 	if (!rv)
4090 		goto fail;
4091 
4092 	rv = drbd_recv_header(mdev, &cmd, &length);
4093 	if (!rv)
4094 		goto fail;
4095 
4096 	if (cmd != P_AUTH_CHALLENGE) {
4097 		dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4098 		    cmdname(cmd), cmd);
4099 		rv = 0;
4100 		goto fail;
4101 	}
4102 
4103 	if (length > CHALLENGE_LEN * 2) {
4104 		dev_err(DEV, "AuthChallenge payload too big.\n");
4105 		rv = -1;
4106 		goto fail;
4107 	}
4108 
4109 	peers_ch = kmalloc(length, GFP_NOIO);
4110 	if (peers_ch == NULL) {
4111 		dev_err(DEV, "kmalloc of peers_ch failed\n");
4112 		rv = -1;
4113 		goto fail;
4114 	}
4115 
4116 	rv = drbd_recv(mdev, peers_ch, length);
4117 
4118 	if (rv != length) {
4119 		if (!signal_pending(current))
4120 			dev_warn(DEV, "short read AuthChallenge: l=%u\n", rv);
4121 		rv = 0;
4122 		goto fail;
4123 	}
4124 
4125 	resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4126 	response = kmalloc(resp_size, GFP_NOIO);
4127 	if (response == NULL) {
4128 		dev_err(DEV, "kmalloc of response failed\n");
4129 		rv = -1;
4130 		goto fail;
4131 	}
4132 
4133 	sg_init_table(&sg, 1);
4134 	sg_set_buf(&sg, peers_ch, length);
4135 
4136 	rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4137 	if (rv) {
4138 		dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4139 		rv = -1;
4140 		goto fail;
4141 	}
4142 
4143 	rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4144 	if (!rv)
4145 		goto fail;
4146 
4147 	rv = drbd_recv_header(mdev, &cmd, &length);
4148 	if (!rv)
4149 		goto fail;
4150 
4151 	if (cmd != P_AUTH_RESPONSE) {
4152 		dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
4153 			cmdname(cmd), cmd);
4154 		rv = 0;
4155 		goto fail;
4156 	}
4157 
4158 	if (length != resp_size) {
4159 		dev_err(DEV, "AuthResponse payload has unexpected size\n");
4160 		rv = 0;
4161 		goto fail;
4162 	}
4163 
4164 	rv = drbd_recv(mdev, response, resp_size);
4165 
4166 	if (rv != resp_size) {
4167 		if (!signal_pending(current))
4168 			dev_warn(DEV, "short read receiving AuthResponse: l=%u\n", rv);
4169 		rv = 0;
4170 		goto fail;
4171 	}
4172 
4173 	right_response = kmalloc(resp_size, GFP_NOIO);
4174 	if (right_response == NULL) {
4175 		dev_err(DEV, "kmalloc of right_response failed\n");
4176 		rv = -1;
4177 		goto fail;
4178 	}
4179 
4180 	sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4181 
4182 	rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4183 	if (rv) {
4184 		dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4185 		rv = -1;
4186 		goto fail;
4187 	}
4188 
4189 	rv = !memcmp(response, right_response, resp_size);
4190 
4191 	if (rv)
4192 		dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4193 		     resp_size, mdev->net_conf->cram_hmac_alg);
4194 	else
4195 		rv = -1;
4196 
4197  fail:
4198 	kfree(peers_ch);
4199 	kfree(response);
4200 	kfree(right_response);
4201 
4202 	return rv;
4203 }
4204 #endif
4205 
4206 int drbdd_init(struct drbd_thread *thi)
4207 {
4208 	struct drbd_conf *mdev = thi->mdev;
4209 	unsigned int minor = mdev_to_minor(mdev);
4210 	int h;
4211 
4212 	sprintf(current->comm, "drbd%d_receiver", minor);
4213 
4214 	dev_info(DEV, "receiver (re)started\n");
4215 
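	/* drbd_connect() result, as handled below:
	 *  > 0: connection established, enter the receive loop;
	 *    0: transient failure, clean up and retry after a second;
	 *   -1: handshake/auth says "do not retry", drop the net config. */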
4216 	do {
4217 		h = drbd_connect(mdev);
4218 		if (h == 0) {
4219 			drbd_disconnect(mdev);
4220 			schedule_timeout_interruptible(HZ);
4221 		}
4222 		if (h == -1) {
4223 			dev_warn(DEV, "Discarding network configuration.\n");
4224 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4225 		}
4226 	} while (h == 0);
4227 
4228 	if (h > 0) {
4229 		if (get_net_conf(mdev)) {
4230 			drbdd(mdev);
4231 			put_net_conf(mdev);
4232 		}
4233 	}
4234 
4235 	drbd_disconnect(mdev);
4236 
4237 	dev_info(DEV, "receiver terminated\n");
4238 	return 0;
4239 }
4240 
4241 /* ********* acknowledge sender ******** */
4242 
4243 static int got_RqSReply(struct drbd_conf *mdev, struct p_header80 *h)
4244 {
4245 	struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4246 
4247 	int retcode = be32_to_cpu(p->retcode);
4248 
4249 	if (retcode >= SS_SUCCESS) {
4250 		set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4251 	} else {
4252 		set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4253 		dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4254 		    drbd_set_st_err_str(retcode), retcode);
4255 	}
4256 	wake_up(&mdev->state_wait);
4257 
4258 	return true;
4259 }
4260 
4261 static int got_Ping(struct drbd_conf *mdev, struct p_header80 *h)
4262 {
4263 	return drbd_send_ping_ack(mdev);
4264 
4265 }
4266 
4267 static int got_PingAck(struct drbd_conf *mdev, struct p_header80 *h)
4268 {
4269 	/* restore idle timeout */
4270 	mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4271 	if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4272 		wake_up(&mdev->misc_wait);
4273 
4274 	return true;
4275 }
4276 
4277 static int got_IsInSync(struct drbd_conf *mdev, struct p_header80 *h)
4278 {
4279 	struct p_block_ack *p = (struct p_block_ack *)h;
4280 	sector_t sector = be64_to_cpu(p->sector);
4281 	int blksize = be32_to_cpu(p->blksize);
4282 
4283 	D_ASSERT(mdev->agreed_pro_version >= 89);
4284 
4285 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4286 
4287 	if (get_ldev(mdev)) {
4288 		drbd_rs_complete_io(mdev, sector);
4289 		drbd_set_in_sync(mdev, sector, blksize);
4290 		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4291 		mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4292 		put_ldev(mdev);
4293 	}
4294 	dec_rs_pending(mdev);
4295 	atomic_add(blksize >> 9, &mdev->rs_sect_in);
4296 
4297 	return true;
4298 }
4299 
4300 /* when we receive the ACK for a write request,
4301  * verify that we actually know about it */
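/* The block_id we send with a data packet is the kernel address of the
 * corresponding drbd_request; the peer echoes it back unchanged, so it
 * can be matched here by comparing that cookie (and the sector). */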
4302 static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4303 	u64 id, sector_t sector)
4304 {
4305 	struct hlist_head *slot = tl_hash_slot(mdev, sector);
4306 	struct hlist_node *n;
4307 	struct drbd_request *req;
4308 
4309 	hlist_for_each_entry(req, n, slot, collision) {
4310 		if ((unsigned long)req == (unsigned long)id) {
4311 			if (req->sector != sector) {
4312 				dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4313 				    "wrong sector (%llus versus %llus)\n", req,
4314 				    (unsigned long long)req->sector,
4315 				    (unsigned long long)sector);
4316 				break;
4317 			}
4318 			return req;
4319 		}
4320 	}
4321 	return NULL;
4322 }
4323 
4324 typedef struct drbd_request *(req_validator_fn)
4325 	(struct drbd_conf *mdev, u64 id, sector_t sector);
4326 
4327 static int validate_req_change_req_state(struct drbd_conf *mdev,
4328 	u64 id, sector_t sector, req_validator_fn validator,
4329 	const char *func, enum drbd_req_event what)
4330 {
4331 	struct drbd_request *req;
4332 	struct bio_and_error m;
4333 
4334 	spin_lock_irq(&mdev->req_lock);
4335 	req = validator(mdev, id, sector);
4336 	if (unlikely(!req)) {
4337 		spin_unlock_irq(&mdev->req_lock);
4338 
4339 		dev_err(DEV, "%s: failed to find req %p, sector %llus\n", func,
4340 			(void *)(unsigned long)id, (unsigned long long)sector);
4341 		return false;
4342 	}
4343 	__req_mod(req, what, &m);
4344 	spin_unlock_irq(&mdev->req_lock);
4345 
4346 	if (m.bio)
4347 		complete_master_bio(mdev, &m);
4348 	return true;
4349 }
4350 
4351 static int got_BlockAck(struct drbd_conf *mdev, struct p_header80 *h)
4352 {
4353 	struct p_block_ack *p = (struct p_block_ack *)h;
4354 	sector_t sector = be64_to_cpu(p->sector);
4355 	int blksize = be32_to_cpu(p->blksize);
4356 	enum drbd_req_event what;
4357 
4358 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4359 
4360 	if (is_syncer_block_id(p->block_id)) {
4361 		drbd_set_in_sync(mdev, sector, blksize);
4362 		dec_rs_pending(mdev);
4363 		return true;
4364 	}
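	/* Which ack arrives for a data write depends on the wire protocol:
	 * protocol B acks on receipt (P_RECV_ACK), protocol C acks once the
	 * write reached the peer's disk (P_WRITE_ACK and friends); the
	 * D_ASSERTs below merely double check that mapping. */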
4365 	switch (be16_to_cpu(h->command)) {
4366 	case P_RS_WRITE_ACK:
4367 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4368 		what = write_acked_by_peer_and_sis;
4369 		break;
4370 	case P_WRITE_ACK:
4371 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4372 		what = write_acked_by_peer;
4373 		break;
4374 	case P_RECV_ACK:
4375 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4376 		what = recv_acked_by_peer;
4377 		break;
4378 	case P_DISCARD_ACK:
4379 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4380 		what = conflict_discarded_by_peer;
4381 		break;
4382 	default:
4383 		D_ASSERT(0);
4384 		return false;
4385 	}
4386 
4387 	return validate_req_change_req_state(mdev, p->block_id, sector,
4388 		_ack_id_to_req, __func__ , what);
4389 }
4390 
4391 static int got_NegAck(struct drbd_conf *mdev, struct p_header80 *h)
4392 {
4393 	struct p_block_ack *p = (struct p_block_ack *)h;
4394 	sector_t sector = be64_to_cpu(p->sector);
4395 	int size = be32_to_cpu(p->blksize);
4396 	struct drbd_request *req;
4397 	struct bio_and_error m;
4398 
4399 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4400 
4401 	if (is_syncer_block_id(p->block_id)) {
4402 		dec_rs_pending(mdev);
4403 		drbd_rs_failed_io(mdev, sector, size);
4404 		return true;
4405 	}
4406 
4407 	spin_lock_irq(&mdev->req_lock);
4408 	req = _ack_id_to_req(mdev, p->block_id, sector);
4409 	if (!req) {
4410 		spin_unlock_irq(&mdev->req_lock);
4411 		if (mdev->net_conf->wire_protocol == DRBD_PROT_A ||
4412 		    mdev->net_conf->wire_protocol == DRBD_PROT_B) {
4413 			/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4414 			   The master bio might already be completed, therefore the
4415 			   request is no longer in the collision hash.
4416 			   => Do not try to validate block_id as request. */
4417 			/* In Protocol B we might already have got a P_RECV_ACK
4418 			   but then get a P_NEG_ACK afterwards. */
4419 			drbd_set_out_of_sync(mdev, sector, size);
4420 			return true;
4421 		} else {
4422 			dev_err(DEV, "%s: failed to find req %p, sector %llus\n", __func__,
4423 				(void *)(unsigned long)p->block_id, (unsigned long long)sector);
4424 			return false;
4425 		}
4426 	}
4427 	__req_mod(req, neg_acked, &m);
4428 	spin_unlock_irq(&mdev->req_lock);
4429 
4430 	if (m.bio)
4431 		complete_master_bio(mdev, &m);
4432 	return true;
4433 }
4434 
4435 static int got_NegDReply(struct drbd_conf *mdev, struct p_header80 *h)
4436 {
4437 	struct p_block_ack *p = (struct p_block_ack *)h;
4438 	sector_t sector = be64_to_cpu(p->sector);
4439 
4440 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4441 	dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4442 	    (unsigned long long)sector, be32_to_cpu(p->blksize));
4443 
4444 	return validate_req_change_req_state(mdev, p->block_id, sector,
4445 		_ar_id_to_req, __func__ , neg_acked);
4446 }
4447 
4448 static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header80 *h)
4449 {
4450 	sector_t sector;
4451 	int size;
4452 	struct p_block_ack *p = (struct p_block_ack *)h;
4453 
4454 	sector = be64_to_cpu(p->sector);
4455 	size = be32_to_cpu(p->blksize);
4456 
4457 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4458 
4459 	dec_rs_pending(mdev);
4460 
4461 	if (get_ldev_if_state(mdev, D_FAILED)) {
4462 		drbd_rs_complete_io(mdev, sector);
4463 		switch (be16_to_cpu(h->command)) {
4464 		case P_NEG_RS_DREPLY:
4465 			drbd_rs_failed_io(mdev, sector, size);
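			/* fall through */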
4466 		case P_RS_CANCEL:
4467 			break;
4468 		default:
4469 			D_ASSERT(0);
4470 			put_ldev(mdev);
4471 			return false;
4472 		}
4473 		put_ldev(mdev);
4474 	}
4475 
4476 	return true;
4477 }
4478 
4479 static int got_BarrierAck(struct drbd_conf *mdev, struct p_header80 *h)
4480 {
4481 	struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4482 
4483 	tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4484 
4485 	if (mdev->state.conn == C_AHEAD &&
4486 	    atomic_read(&mdev->ap_in_flight) == 0 &&
4487 	    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags)) {
4488 		mdev->start_resync_timer.expires = jiffies + HZ;
4489 		add_timer(&mdev->start_resync_timer);
4490 	}
4491 
4492 	return true;
4493 }
4494 
4495 static int got_OVResult(struct drbd_conf *mdev, struct p_header80 *h)
4496 {
4497 	struct p_block_ack *p = (struct p_block_ack *)h;
4498 	struct drbd_work *w;
4499 	sector_t sector;
4500 	int size;
4501 
4502 	sector = be64_to_cpu(p->sector);
4503 	size = be32_to_cpu(p->blksize);
4504 
4505 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4506 
4507 	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4508 		drbd_ov_oos_found(mdev, sector, size);
4509 	else
4510 		ov_oos_print(mdev);
4511 
4512 	if (!get_ldev(mdev))
4513 		return true;
4514 
4515 	drbd_rs_complete_io(mdev, sector);
4516 	dec_rs_pending(mdev);
4517 
4518 	--mdev->ov_left;
4519 
4520 	/* let's advance progress step marks only for every other megabyte */
4521 	if ((mdev->ov_left & 0x200) == 0x200)
4522 		drbd_advance_rs_marks(mdev, mdev->ov_left);
4523 
4524 	if (mdev->ov_left == 0) {
4525 		w = kmalloc(sizeof(*w), GFP_NOIO);
4526 		if (w) {
4527 			w->cb = w_ov_finished;
4528 			drbd_queue_work_front(&mdev->data.work, w);
4529 		} else {
4530 			dev_err(DEV, "kmalloc(w) failed.\n");
4531 			ov_oos_print(mdev);
4532 			drbd_resync_finished(mdev);
4533 		}
4534 	}
4535 	put_ldev(mdev);
4536 	return true;
4537 }
4538 
4539 static int got_skip(struct drbd_conf *mdev, struct p_header80 *h)
4540 {
4541 	return true;
4542 }
4543 
4544 struct asender_cmd {
4545 	size_t pkt_size;
4546 	int (*process)(struct drbd_conf *mdev, struct p_header80 *h);
4547 };
4548 
4549 static struct asender_cmd *get_asender_cmd(int cmd)
4550 {
4551 	static struct asender_cmd asender_tbl[] = {
4552 		/* anything missing from this table is in
4553 		 * the drbd_cmd_handler (drbd_default_handler) table,
4554 		 * see the beginning of drbdd() */
4555 	[P_PING]	    = { sizeof(struct p_header80), got_Ping },
4556 	[P_PING_ACK]	    = { sizeof(struct p_header80), got_PingAck },
4557 	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4558 	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4559 	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4560 	[P_DISCARD_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4561 	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
4562 	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
4563 	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply},
4564 	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
4565 	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
4566 	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4567 	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4568 	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
4569 	[P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply},
4570 	[P_MAX_CMD]	    = { 0, NULL },
4571 	};
4572 	if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4573 		return NULL;
4574 	return &asender_tbl[cmd];
4575 }
4576 
4577 int drbd_asender(struct drbd_thread *thi)
4578 {
4579 	struct drbd_conf *mdev = thi->mdev;
4580 	struct p_header80 *h = &mdev->meta.rbuf.header.h80;
4581 	struct asender_cmd *cmd = NULL;
4582 
4583 	int rv, len;
4584 	void *buf    = h;
4585 	int received = 0;
4586 	int expect   = sizeof(struct p_header80);
4587 	int empty;
4588 	int ping_timeout_active = 0;
4589 
4590 	sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4591 
4592 	current->policy = SCHED_RR;  /* Make this a realtime task! */
4593 	current->rt_priority = 2;    /* more important than all other tasks */
4594 
4595 	while (get_t_state(thi) == Running) {
4596 		drbd_thread_current_set_cpu(mdev);
4597 		if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4598 			ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4599 			mdev->meta.socket->sk->sk_rcvtimeo =
4600 				mdev->net_conf->ping_timeo*HZ/10;
4601 			ping_timeout_active = 1;
4602 		}
4603 
4604 		/* conditionally cork;
4605 		 * it may hurt latency if we cork without much to send */
4606 		if (!mdev->net_conf->no_cork &&
4607 			3 < atomic_read(&mdev->unacked_cnt))
4608 			drbd_tcp_cork(mdev->meta.socket);
4609 		while (1) {
4610 			clear_bit(SIGNAL_ASENDER, &mdev->flags);
4611 			flush_signals(current);
4612 			if (!drbd_process_done_ee(mdev))
4613 				goto reconnect;
4614 			/* to avoid race with newly queued ACKs */
4615 			set_bit(SIGNAL_ASENDER, &mdev->flags);
4616 			spin_lock_irq(&mdev->req_lock);
4617 			empty = list_empty(&mdev->done_ee);
4618 			spin_unlock_irq(&mdev->req_lock);
4619 			/* new ack may have been queued right here,
4620 			 * but then there is also a signal pending,
4621 			 * and we start over... */
4622 			if (empty)
4623 				break;
4624 		}
4625 		/* but unconditionally uncork unless disabled */
4626 		if (!mdev->net_conf->no_cork)
4627 			drbd_tcp_uncork(mdev->meta.socket);
4628 
4629 		/* short circuit, recv_msg would return EINTR anyways. */
4630 		if (signal_pending(current))
4631 			continue;
4632 
4633 		rv = drbd_recv_short(mdev, mdev->meta.socket,
4634 				     buf, expect-received, 0);
4635 		clear_bit(SIGNAL_ASENDER, &mdev->flags);
4636 
4637 		flush_signals(current);
4638 
4639 		/* Note:
4640 		 * -EINTR	 (on meta) we got a signal
4641 		 * -EAGAIN	 (on meta) rcvtimeo expired
4642 		 * -ECONNRESET	 other side closed the connection
4643 		 * -ERESTARTSYS  (on data) we got a signal
4644 		 * rv <  0	 other than above: unexpected error!
4645 		 * rv == expected: full header or command
4646 		 * rv <  expected: "woken" by signal during receive
4647 		 * rv == 0	 : "connection shut down by peer"
4648 		 */
4649 		if (likely(rv > 0)) {
4650 			received += rv;
4651 			buf	 += rv;
4652 		} else if (rv == 0) {
4653 			dev_err(DEV, "meta connection shut down by peer.\n");
4654 			goto reconnect;
4655 		} else if (rv == -EAGAIN) {
4656 			/* If the data socket received something meanwhile,
4657 			 * that is good enough: peer is still alive. */
4658 			if (time_after(mdev->last_received,
4659 				jiffies - mdev->meta.socket->sk->sk_rcvtimeo))
4660 				continue;
4661 			if (ping_timeout_active) {
4662 				dev_err(DEV, "PingAck did not arrive in time.\n");
4663 				goto reconnect;
4664 			}
4665 			set_bit(SEND_PING, &mdev->flags);
4666 			continue;
4667 		} else if (rv == -EINTR) {
4668 			continue;
4669 		} else {
4670 			dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4671 			goto reconnect;
4672 		}
4673 
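		/* Two phase receive: first gather a complete p_header80;
		 * once the header is in (and cmd is still NULL), look up the
		 * handler and extend "expect" to the full command specific
		 * packet size before processing it. */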
4674 		if (received == expect && cmd == NULL) {
4675 			if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4676 				dev_err(DEV, "magic?? on meta m: 0x%08x c: %d l: %d\n",
4677 				    be32_to_cpu(h->magic),
4678 				    be16_to_cpu(h->command),
4679 				    be16_to_cpu(h->length));
4680 				goto reconnect;
4681 			}
4682 			cmd = get_asender_cmd(be16_to_cpu(h->command));
4683 			len = be16_to_cpu(h->length);
4684 			if (unlikely(cmd == NULL)) {
4685 				dev_err(DEV, "unknown command?? on meta m: 0x%08x c: %d l: %d\n",
4686 				    be32_to_cpu(h->magic),
4687 				    be16_to_cpu(h->command),
4688 				    be16_to_cpu(h->length));
4689 				goto disconnect;
4690 			}
4691 			expect = cmd->pkt_size;
4692 			ERR_IF(len != expect-sizeof(struct p_header80))
4693 				goto reconnect;
4694 		}
4695 		if (received == expect) {
4696 			mdev->last_received = jiffies;
4697 			D_ASSERT(cmd != NULL);
4698 			if (!cmd->process(mdev, h))
4699 				goto reconnect;
4700 
4701 			/* the idle_timeout (ping-int)
4702 			 * has been restored in got_PingAck() */
4703 			if (cmd == get_asender_cmd(P_PING_ACK))
4704 				ping_timeout_active = 0;
4705 
4706 			buf	 = h;
4707 			received = 0;
4708 			expect	 = sizeof(struct p_header80);
4709 			cmd	 = NULL;
4710 		}
4711 	}
4712 
4713 	if (0) {
4714 reconnect:
4715 		drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4716 		drbd_md_sync(mdev);
4717 	}
4718 	if (0) {
4719 disconnect:
4720 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4721 		drbd_md_sync(mdev);
4722 	}
4723 	clear_bit(SIGNAL_ASENDER, &mdev->flags);
4724 
4725 	D_ASSERT(mdev->state.conn < C_CONNECTED);
4726 	dev_info(DEV, "asender terminated\n");
4727 
4728 	return 0;
4729 }
4730