1 /*
2    drbd_receiver.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24 
25 
26 #include <linux/module.h>
27 
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30 
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/smp_lock.h>
40 #include <linux/pkt_sched.h>
41 #define __KERNEL_SYSCALLS__
42 #include <linux/unistd.h>
43 #include <linux/vmalloc.h>
44 #include <linux/random.h>
45 #include <linux/string.h>
46 #include <linux/scatterlist.h>
47 #include "drbd_int.h"
48 #include "drbd_req.h"
49 
50 #include "drbd_vli.h"
51 
52 struct flush_work {
53 	struct drbd_work w;
54 	struct drbd_epoch *epoch;
55 };
56 
57 enum finish_epoch {
58 	FE_STILL_LIVE,
59 	FE_DESTROYED,
60 	FE_RECYCLED,
61 };
62 
63 static int drbd_do_handshake(struct drbd_conf *mdev);
64 static int drbd_do_auth(struct drbd_conf *mdev);
65 
66 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
67 static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
68 
69 static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
70 {
71 	struct drbd_epoch *prev;
72 	spin_lock(&mdev->epoch_lock);
73 	prev = list_entry(epoch->list.prev, struct drbd_epoch, list);
74 	if (prev == epoch || prev == mdev->current_epoch)
75 		prev = NULL;
76 	spin_unlock(&mdev->epoch_lock);
77 	return prev;
78 }
79 
80 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
81 
82 /*
83  * some helper functions to deal with singly linked page lists,
84  * page->private being our "next" pointer.
85  */
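/*
 * Illustrative sketch only: page_chain_next(), page_chain_for_each() and
 * page_chain_for_each_safe(), as used below, come from drbd_int.h; they are
 * assumed to look roughly like this, with page->private holding the pointer
 * to the next page in the chain:
 *
 *	#define page_chain_next(page) \
 *		((struct page *)page_private(page))
 *	#define page_chain_for_each(page) \
 *		for (; page; page = page_chain_next(page))
 *	#define page_chain_for_each_safe(page, n) \
 *		for (; page && ((n = page_chain_next(page)), 1); page = n)
 *
 * Prepending an element to a chain then is simply
 *	set_page_private(new_page, (unsigned long)chain);
 *	chain = new_page;
 * which is what drbd_pp_first_pages_or_try_alloc() below does page by page.
 */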
86 
87 /* If at least n pages are linked at head, get n pages off.
88  * Otherwise, don't modify head, and return NULL.
89  * Locking is the responsibility of the caller.
90  */
91 static struct page *page_chain_del(struct page **head, int n)
92 {
93 	struct page *page;
94 	struct page *tmp;
95 
96 	BUG_ON(!n);
97 	BUG_ON(!head);
98 
99 	page = *head;
100 
101 	if (!page)
102 		return NULL;
103 
104 	while (page) {
105 		tmp = page_chain_next(page);
106 		if (--n == 0)
107 			break; /* found sufficient pages */
108 		if (tmp == NULL)
109 			/* insufficient pages, don't use any of them. */
110 			return NULL;
111 		page = tmp;
112 	}
113 
114 	/* add end of list marker for the returned list */
115 	set_page_private(page, 0);
116 	/* actual return value, and adjustment of head */
117 	page = *head;
118 	*head = tmp;
119 	return page;
120 }
121 
122 /* may be used outside of locks to find the tail of a (usually short)
123  * "private" page chain, before adding it back to a global chain head
124  * with page_chain_add() under a spinlock. */
125 static struct page *page_chain_tail(struct page *page, int *len)
126 {
127 	struct page *tmp;
128 	int i = 1;
129 	while ((tmp = page_chain_next(page)))
130 		++i, page = tmp;
131 	if (len)
132 		*len = i;
133 	return page;
134 }
135 
136 static int page_chain_free(struct page *page)
137 {
138 	struct page *tmp;
139 	int i = 0;
140 	page_chain_for_each_safe(page, tmp) {
141 		put_page(page);
142 		++i;
143 	}
144 	return i;
145 }
146 
147 static void page_chain_add(struct page **head,
148 		struct page *chain_first, struct page *chain_last)
149 {
150 #if 1
151 	struct page *tmp;
152 	tmp = page_chain_tail(chain_first, NULL);
153 	BUG_ON(tmp != chain_last);
154 #endif
155 
156 	/* add chain to head */
157 	set_page_private(chain_last, (unsigned long)*head);
158 	*head = chain_first;
159 }
160 
161 static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
162 {
163 	struct page *page = NULL;
164 	struct page *tmp = NULL;
165 	int i = 0;
166 
167 	/* Yes, testing drbd_pp_vacant outside the lock is racy.
168 	 * So what. It saves a spin_lock. */
169 	if (drbd_pp_vacant >= number) {
170 		spin_lock(&drbd_pp_lock);
171 		page = page_chain_del(&drbd_pp_pool, number);
172 		if (page)
173 			drbd_pp_vacant -= number;
174 		spin_unlock(&drbd_pp_lock);
175 		if (page)
176 			return page;
177 	}
178 
179 	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
180 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
181 	 * which in turn might block on the other node at this very place.  */
182 	for (i = 0; i < number; i++) {
183 		tmp = alloc_page(GFP_TRY);
184 		if (!tmp)
185 			break;
186 		set_page_private(tmp, (unsigned long)page);
187 		page = tmp;
188 	}
189 
190 	if (i == number)
191 		return page;
192 
193 	/* Not enough pages immediately available this time.
194 	 * No need to jump around here, drbd_pp_alloc will retry this
195 	 * function "soon". */
196 	if (page) {
197 		tmp = page_chain_tail(page, NULL);
198 		spin_lock(&drbd_pp_lock);
199 		page_chain_add(&drbd_pp_pool, page, tmp);
200 		drbd_pp_vacant += i;
201 		spin_unlock(&drbd_pp_lock);
202 	}
203 	return NULL;
204 }
205 
206 /* kick lower level device, if we have more than (arbitrary number)
207  * reference counts on it, which typically are locally submitted io
208  * requests.  don't use unacked_cnt, so we speed up proto A and B, too. */
209 static void maybe_kick_lo(struct drbd_conf *mdev)
210 {
211 	if (atomic_read(&mdev->local_cnt) >= mdev->net_conf->unplug_watermark)
212 		drbd_kick_lo(mdev);
213 }
214 
215 static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
216 {
217 	struct drbd_epoch_entry *e;
218 	struct list_head *le, *tle;
219 
220 	/* The EEs are always appended to the end of the list. Since
221 	   they are sent in order over the wire, they have to finish
222 	   in order. As soon as we see the first one that has not
223 	   finished, we can stop examining the list... */
224 
225 	list_for_each_safe(le, tle, &mdev->net_ee) {
226 		e = list_entry(le, struct drbd_epoch_entry, w.list);
227 		if (drbd_ee_has_active_page(e))
228 			break;
229 		list_move(le, to_be_freed);
230 	}
231 }
232 
233 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
234 {
235 	LIST_HEAD(reclaimed);
236 	struct drbd_epoch_entry *e, *t;
237 
238 	maybe_kick_lo(mdev);
239 	spin_lock_irq(&mdev->req_lock);
240 	reclaim_net_ee(mdev, &reclaimed);
241 	spin_unlock_irq(&mdev->req_lock);
242 
243 	list_for_each_entry_safe(e, t, &reclaimed, w.list)
244 		drbd_free_ee(mdev, e);
245 }
246 
247 /**
248  * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
249  * @mdev:	DRBD device.
250  * @number:	number of pages requested
251  * @retry:	whether to retry, if not enough pages are available right now
252  *
253  * Tries to allocate @number pages, first from our own page pool, then from
254  * the kernel, unless this allocation would exceed the max_buffers setting.
255  * Possibly retry until DRBD frees sufficient pages somewhere else.
256  *
257  * Returns a page chain linked via page->private.
258  */
259 static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
260 {
261 	struct page *page = NULL;
262 	DEFINE_WAIT(wait);
263 
264 	/* Yes, we may run up to @number over max_buffers. If we
265 	 * follow it strictly, the admin will get it wrong anyways. */
266 	if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
267 		page = drbd_pp_first_pages_or_try_alloc(mdev, number);
268 
269 	while (page == NULL) {
270 		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
271 
272 		drbd_kick_lo_and_reclaim_net(mdev);
273 
274 		if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
275 			page = drbd_pp_first_pages_or_try_alloc(mdev, number);
276 			if (page)
277 				break;
278 		}
279 
280 		if (!retry)
281 			break;
282 
283 		if (signal_pending(current)) {
284 			dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
285 			break;
286 		}
287 
288 		schedule();
289 	}
290 	finish_wait(&drbd_pp_wait, &wait);
291 
292 	if (page)
293 		atomic_add(number, &mdev->pp_in_use);
294 	return page;
295 }
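/*
 * Usage sketch (illustrative only; drbd_alloc_ee()/drbd_free_ee() below are
 * the real consumers): the chain returned by drbd_pp_alloc() is linked via
 * page->private and must eventually go back through drbd_pp_free(), which
 * also wakes up any other waiters.
 *
 *	struct page *chain = drbd_pp_alloc(mdev, nr_pages, true);
 *	struct page *p = chain;
 *
 *	if (!chain)
 *		return -ENOMEM;
 *	page_chain_for_each(p) {
 *		void *data = kmap(p);
 *		memset(data, 0, PAGE_SIZE);
 *		kunmap(p);
 *	}
 *	drbd_pp_free(mdev, chain);
 */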
296 
297 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
298  * Is also used from inside another spin_lock_irq(&mdev->req_lock);
299  * Either links the page chain back to the global pool,
300  * or returns all pages to the system. */
301 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
302 {
303 	int i;
304 	if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count)
305 		i = page_chain_free(page);
306 	else {
307 		struct page *tmp;
308 		tmp = page_chain_tail(page, &i);
309 		spin_lock(&drbd_pp_lock);
310 		page_chain_add(&drbd_pp_pool, page, tmp);
311 		drbd_pp_vacant += i;
312 		spin_unlock(&drbd_pp_lock);
313 	}
314 	atomic_sub(i, &mdev->pp_in_use);
315 	i = atomic_read(&mdev->pp_in_use);
316 	if (i < 0)
317 		dev_warn(DEV, "ASSERTION FAILED: pp_in_use: %d < 0\n", i);
318 	wake_up(&drbd_pp_wait);
319 }
320 
321 /*
322 You need to hold the req_lock:
323  _drbd_wait_ee_list_empty()
324 
325 You must not have the req_lock:
326  drbd_free_ee()
327  drbd_alloc_ee()
328  drbd_init_ee()
329  drbd_release_ee()
330  drbd_ee_fix_bhs()
331  drbd_process_done_ee()
332  drbd_clear_done_ee()
333  drbd_wait_ee_list_empty()
334 */
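/*
 * Caller pattern sketch (illustrative; drbd_wait_ee_list_empty() further down
 * is the canonical wrapper): the underscore variant expects req_lock to be
 * held, the others acquire it themselves.
 *
 *	spin_lock_irq(&mdev->req_lock);
 *	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
 *	spin_unlock_irq(&mdev->req_lock);
 *
 *	drbd_release_ee(mdev, &mdev->net_ee);
 */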
335 
336 struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
337 				     u64 id,
338 				     sector_t sector,
339 				     unsigned int data_size,
340 				     gfp_t gfp_mask) __must_hold(local)
341 {
342 	struct drbd_epoch_entry *e;
343 	struct page *page;
344 	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
345 
346 	if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
347 		return NULL;
348 
349 	e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
350 	if (!e) {
351 		if (!(gfp_mask & __GFP_NOWARN))
352 			dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
353 		return NULL;
354 	}
355 
356 	page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
357 	if (!page)
358 		goto fail;
359 
360 	INIT_HLIST_NODE(&e->colision);
361 	e->epoch = NULL;
362 	e->mdev = mdev;
363 	e->pages = page;
364 	atomic_set(&e->pending_bios, 0);
365 	e->size = data_size;
366 	e->flags = 0;
367 	e->sector = sector;
369 	e->block_id = id;
370 
371 	return e;
372 
373  fail:
374 	mempool_free(e, drbd_ee_mempool);
375 	return NULL;
376 }
377 
378 void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
379 {
380 	drbd_pp_free(mdev, e->pages);
381 	D_ASSERT(atomic_read(&e->pending_bios) == 0);
382 	D_ASSERT(hlist_unhashed(&e->colision));
383 	mempool_free(e, drbd_ee_mempool);
384 }
385 
386 int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
387 {
388 	LIST_HEAD(work_list);
389 	struct drbd_epoch_entry *e, *t;
390 	int count = 0;
391 
392 	spin_lock_irq(&mdev->req_lock);
393 	list_splice_init(list, &work_list);
394 	spin_unlock_irq(&mdev->req_lock);
395 
396 	list_for_each_entry_safe(e, t, &work_list, w.list) {
397 		drbd_free_ee(mdev, e);
398 		count++;
399 	}
400 	return count;
401 }
402 
403 
404 /*
405  * This function is called from _asender only_
406  * but see also comments in _req_mod(,barrier_acked)
407  * and receive_Barrier.
408  *
409  * Move entries from net_ee to done_ee, if ready.
410  * Grab done_ee, call all callbacks, free the entries.
411  * The callbacks typically send out ACKs.
412  */
413 static int drbd_process_done_ee(struct drbd_conf *mdev)
414 {
415 	LIST_HEAD(work_list);
416 	LIST_HEAD(reclaimed);
417 	struct drbd_epoch_entry *e, *t;
418 	int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);
419 
420 	spin_lock_irq(&mdev->req_lock);
421 	reclaim_net_ee(mdev, &reclaimed);
422 	list_splice_init(&mdev->done_ee, &work_list);
423 	spin_unlock_irq(&mdev->req_lock);
424 
425 	list_for_each_entry_safe(e, t, &reclaimed, w.list)
426 		drbd_free_ee(mdev, e);
427 
428 	/* possible callbacks here:
429 	 * e_end_block, e_end_resync_block, and e_send_discard_ack.
430 	 * All of them ignore the last argument.
431 	 */
432 	list_for_each_entry_safe(e, t, &work_list, w.list) {
433 		/* list_del not necessary, next/prev members not touched */
434 		ok = e->w.cb(mdev, &e->w, !ok) && ok;
435 		drbd_free_ee(mdev, e);
436 	}
437 	wake_up(&mdev->ee_wait);
438 
439 	return ok;
440 }
441 
442 void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
443 {
444 	DEFINE_WAIT(wait);
445 
446 	/* avoids spin_lock/unlock
447 	 * and calling prepare_to_wait in the fast path */
448 	while (!list_empty(head)) {
449 		prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
450 		spin_unlock_irq(&mdev->req_lock);
451 		drbd_kick_lo(mdev);
452 		schedule();
453 		finish_wait(&mdev->ee_wait, &wait);
454 		spin_lock_irq(&mdev->req_lock);
455 	}
456 }
457 
458 void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
459 {
460 	spin_lock_irq(&mdev->req_lock);
461 	_drbd_wait_ee_list_empty(mdev, head);
462 	spin_unlock_irq(&mdev->req_lock);
463 }
464 
465 /* see also kernel_accept(), which is only present since 2.6.18.
466  * Also, we want to log exactly which part of it failed. */
467 static int drbd_accept(struct drbd_conf *mdev, const char **what,
468 		struct socket *sock, struct socket **newsock)
469 {
470 	struct sock *sk = sock->sk;
471 	int err = 0;
472 
473 	*what = "listen";
474 	err = sock->ops->listen(sock, 5);
475 	if (err < 0)
476 		goto out;
477 
478 	*what = "sock_create_lite";
479 	err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
480 			       newsock);
481 	if (err < 0)
482 		goto out;
483 
484 	*what = "accept";
485 	err = sock->ops->accept(sock, *newsock, 0);
486 	if (err < 0) {
487 		sock_release(*newsock);
488 		*newsock = NULL;
489 		goto out;
490 	}
491 	(*newsock)->ops  = sock->ops;
492 
493 out:
494 	return err;
495 }
496 
497 static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
498 		    void *buf, size_t size, int flags)
499 {
500 	mm_segment_t oldfs;
501 	struct kvec iov = {
502 		.iov_base = buf,
503 		.iov_len = size,
504 	};
505 	struct msghdr msg = {
506 		.msg_iovlen = 1,
507 		.msg_iov = (struct iovec *)&iov,
508 		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
509 	};
510 	int rv;
511 
512 	oldfs = get_fs();
513 	set_fs(KERNEL_DS);
514 	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
515 	set_fs(oldfs);
516 
517 	return rv;
518 }
519 
520 static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
521 {
522 	mm_segment_t oldfs;
523 	struct kvec iov = {
524 		.iov_base = buf,
525 		.iov_len = size,
526 	};
527 	struct msghdr msg = {
528 		.msg_iovlen = 1,
529 		.msg_iov = (struct iovec *)&iov,
530 		.msg_flags = MSG_WAITALL | MSG_NOSIGNAL
531 	};
532 	int rv;
533 
534 	oldfs = get_fs();
535 	set_fs(KERNEL_DS);
536 
537 	for (;;) {
538 		rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
539 		if (rv == size)
540 			break;
541 
542 		/* Note:
543 		 * ECONNRESET	other side closed the connection
544 		 * ERESTARTSYS	(on  sock) we got a signal
545 		 */
546 
547 		if (rv < 0) {
548 			if (rv == -ECONNRESET)
549 				dev_info(DEV, "sock was reset by peer\n");
550 			else if (rv != -ERESTARTSYS)
551 				dev_err(DEV, "sock_recvmsg returned %d\n", rv);
552 			break;
553 		} else if (rv == 0) {
554 			dev_info(DEV, "sock was shut down by peer\n");
555 			break;
556 		} else	{
557 			/* signal came in, or peer/link went down,
558 			 * after we read a partial message
559 			 */
560 			/* D_ASSERT(signal_pending(current)); */
561 			break;
562 		}
563 	}
564 
565 	set_fs(oldfs);
566 
567 	if (rv != size)
568 		drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
569 
570 	return rv;
571 }
572 
573 /* quoting tcp(7):
574  *   On individual connections, the socket buffer size must be set prior to the
575  *   listen(2) or connect(2) calls in order to have it take effect.
576  * This is our wrapper to do so.
577  */
578 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
579 		unsigned int rcv)
580 {
581 	/* open coded SO_SNDBUF, SO_RCVBUF */
582 	if (snd) {
583 		sock->sk->sk_sndbuf = snd;
584 		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
585 	}
586 	if (rcv) {
587 		sock->sk->sk_rcvbuf = rcv;
588 		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
589 	}
590 }
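/*
 * Ordering sketch (illustrative; drbd_try_connect() and
 * drbd_wait_for_connect() below follow this pattern, and peer_addr /
 * peer_addr_len here merely stand in for the net_conf fields they use):
 * per the tcp(7) quote above, the buffer sizes only take effect when set
 * before connect() or listen().
 *
 *	drbd_setbufsize(sock, mdev->net_conf->sndbuf_size,
 *			mdev->net_conf->rcvbuf_size);
 *	err = sock->ops->connect(sock, peer_addr, peer_addr_len, 0);
 */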
591 
592 static struct socket *drbd_try_connect(struct drbd_conf *mdev)
593 {
594 	const char *what;
595 	struct socket *sock;
596 	struct sockaddr_in6 src_in6;
597 	int err;
598 	int disconnect_on_error = 1;
599 
600 	if (!get_net_conf(mdev))
601 		return NULL;
602 
603 	what = "sock_create_kern";
604 	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
605 		SOCK_STREAM, IPPROTO_TCP, &sock);
606 	if (err < 0) {
607 		sock = NULL;
608 		goto out;
609 	}
610 
611 	sock->sk->sk_rcvtimeo =
612 	sock->sk->sk_sndtimeo =  mdev->net_conf->try_connect_int*HZ;
613 	drbd_setbufsize(sock, mdev->net_conf->sndbuf_size,
614 			mdev->net_conf->rcvbuf_size);
615 
616 	/* explicitly bind to the configured IP as source IP
617 	 * for the outgoing connections.
618 	 * This is needed for multihomed hosts and to be
619 	 * able to use lo: interfaces for drbd.
620 	 * Make sure to use 0 as port number, so Linux selects
621 	 * a free one dynamically.
622 	 */
623 	memcpy(&src_in6, mdev->net_conf->my_addr,
624 	       min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
625 	if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
626 		src_in6.sin6_port = 0;
627 	else
628 		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
629 
630 	what = "bind before connect";
631 	err = sock->ops->bind(sock,
632 			      (struct sockaddr *) &src_in6,
633 			      mdev->net_conf->my_addr_len);
634 	if (err < 0)
635 		goto out;
636 
637 	/* connect may fail, peer not yet available.
638 	 * stay C_WF_CONNECTION, don't go Disconnecting! */
639 	disconnect_on_error = 0;
640 	what = "connect";
641 	err = sock->ops->connect(sock,
642 				 (struct sockaddr *)mdev->net_conf->peer_addr,
643 				 mdev->net_conf->peer_addr_len, 0);
644 
645 out:
646 	if (err < 0) {
647 		if (sock) {
648 			sock_release(sock);
649 			sock = NULL;
650 		}
651 		switch (-err) {
652 			/* timeout, busy, signal pending */
653 		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
654 		case EINTR: case ERESTARTSYS:
655 			/* peer not (yet) available, network problem */
656 		case ECONNREFUSED: case ENETUNREACH:
657 		case EHOSTDOWN:    case EHOSTUNREACH:
658 			disconnect_on_error = 0;
659 			break;
660 		default:
661 			dev_err(DEV, "%s failed, err = %d\n", what, err);
662 		}
663 		if (disconnect_on_error)
664 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
665 	}
666 	put_net_conf(mdev);
667 	return sock;
668 }
669 
670 static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
671 {
672 	int timeo, err;
673 	struct socket *s_estab = NULL, *s_listen;
674 	const char *what;
675 
676 	if (!get_net_conf(mdev))
677 		return NULL;
678 
679 	what = "sock_create_kern";
680 	err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
681 		SOCK_STREAM, IPPROTO_TCP, &s_listen);
682 	if (err) {
683 		s_listen = NULL;
684 		goto out;
685 	}
686 
687 	timeo = mdev->net_conf->try_connect_int * HZ;
688 	timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
689 
690 	s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
691 	s_listen->sk->sk_rcvtimeo = timeo;
692 	s_listen->sk->sk_sndtimeo = timeo;
693 	drbd_setbufsize(s_listen, mdev->net_conf->sndbuf_size,
694 			mdev->net_conf->rcvbuf_size);
695 
696 	what = "bind before listen";
697 	err = s_listen->ops->bind(s_listen,
698 			      (struct sockaddr *) mdev->net_conf->my_addr,
699 			      mdev->net_conf->my_addr_len);
700 	if (err < 0)
701 		goto out;
702 
703 	err = drbd_accept(mdev, &what, s_listen, &s_estab);
704 
705 out:
706 	if (s_listen)
707 		sock_release(s_listen);
708 	if (err < 0) {
709 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
710 			dev_err(DEV, "%s failed, err = %d\n", what, err);
711 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
712 		}
713 	}
714 	put_net_conf(mdev);
715 
716 	return s_estab;
717 }
718 
719 static int drbd_send_fp(struct drbd_conf *mdev,
720 	struct socket *sock, enum drbd_packets cmd)
721 {
722 	struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
723 
724 	return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
725 }
726 
727 static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
728 {
729 	struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
730 	int rr;
731 
732 	rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);
733 
734 	if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
735 		return be16_to_cpu(h->command);
736 
737 	return 0xffff;
738 }
739 
740 /**
741  * drbd_socket_okay() - Free the socket if its connection is not okay
742  * @mdev:	DRBD device.
743  * @sock:	pointer to the pointer to the socket.
744  */
745 static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
746 {
747 	int rr;
748 	char tb[4];
749 
750 	if (!*sock)
751 		return FALSE;
752 
753 	rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
754 
755 	if (rr > 0 || rr == -EAGAIN) {
756 		return TRUE;
757 	} else {
758 		sock_release(*sock);
759 		*sock = NULL;
760 		return FALSE;
761 	}
762 }
763 
764 /*
765  * return values:
766  *   1 yes, we have a valid connection
767  *   0 oops, did not work out, please try again
768  *  -1 peer talks different language,
769  *     no point in trying again, please go standalone.
770  *  -2 We do not have a network config...
771  */
772 static int drbd_connect(struct drbd_conf *mdev)
773 {
774 	struct socket *s, *sock, *msock;
775 	int try, h, ok;
776 
777 	D_ASSERT(!mdev->data.socket);
778 
779 	if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags))
780 		dev_err(DEV, "CREATE_BARRIER flag was set in drbd_connect - now cleared!\n");
781 
782 	if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
783 		return -2;
784 
785 	clear_bit(DISCARD_CONCURRENT, &mdev->flags);
786 
787 	sock  = NULL;
788 	msock = NULL;
789 
790 	do {
791 		for (try = 0;;) {
792 			/* 3 tries, this should take less than a second! */
793 			s = drbd_try_connect(mdev);
794 			if (s || ++try >= 3)
795 				break;
796 			/* give the other side time to call bind() & listen() */
797 			__set_current_state(TASK_INTERRUPTIBLE);
798 			schedule_timeout(HZ / 10);
799 		}
800 
801 		if (s) {
802 			if (!sock) {
803 				drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
804 				sock = s;
805 				s = NULL;
806 			} else if (!msock) {
807 				drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
808 				msock = s;
809 				s = NULL;
810 			} else {
811 				dev_err(DEV, "Logic error in drbd_connect()\n");
812 				goto out_release_sockets;
813 			}
814 		}
815 
816 		if (sock && msock) {
817 			__set_current_state(TASK_INTERRUPTIBLE);
818 			schedule_timeout(HZ / 10);
819 			ok = drbd_socket_okay(mdev, &sock);
820 			ok = drbd_socket_okay(mdev, &msock) && ok;
821 			if (ok)
822 				break;
823 		}
824 
825 retry:
826 		s = drbd_wait_for_connect(mdev);
827 		if (s) {
828 			try = drbd_recv_fp(mdev, s);
829 			drbd_socket_okay(mdev, &sock);
830 			drbd_socket_okay(mdev, &msock);
831 			switch (try) {
832 			case P_HAND_SHAKE_S:
833 				if (sock) {
834 					dev_warn(DEV, "initial packet S crossed\n");
835 					sock_release(sock);
836 				}
837 				sock = s;
838 				break;
839 			case P_HAND_SHAKE_M:
840 				if (msock) {
841 					dev_warn(DEV, "initial packet M crossed\n");
842 					sock_release(msock);
843 				}
844 				msock = s;
845 				set_bit(DISCARD_CONCURRENT, &mdev->flags);
846 				break;
847 			default:
848 				dev_warn(DEV, "Error receiving initial packet\n");
849 				sock_release(s);
850 				if (random32() & 1)
851 					goto retry;
852 			}
853 		}
854 
855 		if (mdev->state.conn <= C_DISCONNECTING)
856 			goto out_release_sockets;
857 		if (signal_pending(current)) {
858 			flush_signals(current);
859 			smp_rmb();
860 			if (get_t_state(&mdev->receiver) == Exiting)
861 				goto out_release_sockets;
862 		}
863 
864 		if (sock && msock) {
865 			ok = drbd_socket_okay(mdev, &sock);
866 			ok = drbd_socket_okay(mdev, &msock) && ok;
867 			if (ok)
868 				break;
869 		}
870 	} while (1);
871 
872 	msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
873 	sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
874 
875 	sock->sk->sk_allocation = GFP_NOIO;
876 	msock->sk->sk_allocation = GFP_NOIO;
877 
878 	sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
879 	msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
880 
881 	/* NOT YET ...
882 	 * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
883 	 * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
884 	 * first set it to the P_HAND_SHAKE timeout,
885 	 * which we set to 4x the configured ping_timeout. */
886 	sock->sk->sk_sndtimeo =
887 	sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;
888 
889 	msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
890 	msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
891 
892 	/* we don't want delays.
893 	 * we use TCP_CORK where appropriate, though */
894 	drbd_tcp_nodelay(sock);
895 	drbd_tcp_nodelay(msock);
896 
897 	mdev->data.socket = sock;
898 	mdev->meta.socket = msock;
899 	mdev->last_received = jiffies;
900 
901 	D_ASSERT(mdev->asender.task == NULL);
902 
903 	h = drbd_do_handshake(mdev);
904 	if (h <= 0)
905 		return h;
906 
907 	if (mdev->cram_hmac_tfm) {
908 		/* drbd_request_state(mdev, NS(conn, WFAuth)); */
909 		switch (drbd_do_auth(mdev)) {
910 		case -1:
911 			dev_err(DEV, "Authentication of peer failed\n");
912 			return -1;
913 		case 0:
914 			dev_err(DEV, "Authentication of peer failed, trying again.\n");
915 			return 0;
916 		}
917 	}
918 
919 	if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
920 		return 0;
921 
922 	sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
923 	sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
924 
925 	atomic_set(&mdev->packet_seq, 0);
926 	mdev->peer_seq = 0;
927 
928 	drbd_thread_start(&mdev->asender);
929 
930 	if (!drbd_send_protocol(mdev))
931 		return -1;
932 	drbd_send_sync_param(mdev, &mdev->sync_conf);
933 	drbd_send_sizes(mdev, 0, 0);
934 	drbd_send_uuids(mdev);
935 	drbd_send_state(mdev);
936 	clear_bit(USE_DEGR_WFC_T, &mdev->flags);
937 	clear_bit(RESIZE_PENDING, &mdev->flags);
938 
939 	return 1;
940 
941 out_release_sockets:
942 	if (sock)
943 		sock_release(sock);
944 	if (msock)
945 		sock_release(msock);
946 	return -1;
947 }
948 
949 static int drbd_recv_header(struct drbd_conf *mdev, struct p_header *h)
950 {
951 	int r;
952 
953 	r = drbd_recv(mdev, h, sizeof(*h));
954 
955 	if (unlikely(r != sizeof(*h))) {
956 		dev_err(DEV, "short read expecting header on sock: r=%d\n", r);
957 		return FALSE;
958 	}
959 	h->command = be16_to_cpu(h->command);
960 	h->length  = be16_to_cpu(h->length);
961 	if (unlikely(h->magic != BE_DRBD_MAGIC)) {
962 		dev_err(DEV, "magic?? on data m: 0x%lx c: %d l: %d\n",
963 		    (long)be32_to_cpu(h->magic),
964 		    h->command, h->length);
965 		return FALSE;
966 	}
967 	mdev->last_received = jiffies;
968 
969 	return TRUE;
970 }
971 
972 static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
973 {
974 	int rv;
975 
976 	if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
977 		rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
978 					NULL, BLKDEV_IFL_WAIT);
979 		if (rv) {
980 			dev_err(DEV, "local disk flush failed with status %d\n", rv);
981 			/* would rather check on EOPNOTSUPP, but that is not reliable.
982 			 * don't try again for ANY return value != 0
983 			 * if (rv == -EOPNOTSUPP) */
984 			drbd_bump_write_ordering(mdev, WO_drain_io);
985 		}
986 		put_ldev(mdev);
987 	}
988 
989 	return drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
990 }
991 
992 static int w_flush(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
993 {
994 	struct flush_work *fw = (struct flush_work *)w;
995 	struct drbd_epoch *epoch = fw->epoch;
996 
997 	kfree(w);
998 
999 	if (!test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags))
1000 		drbd_flush_after_epoch(mdev, epoch);
1001 
1002 	drbd_may_finish_epoch(mdev, epoch, EV_PUT |
1003 			      (mdev->state.conn < C_CONNECTED ? EV_CLEANUP : 0));
1004 
1005 	return 1;
1006 }
1007 
1008 /**
1009  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishes it.
1010  * @mdev:	DRBD device.
1011  * @epoch:	Epoch object.
1012  * @ev:		Epoch event.
1013  */
1014 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1015 					       struct drbd_epoch *epoch,
1016 					       enum epoch_event ev)
1017 {
1018 	int finish, epoch_size;
1019 	struct drbd_epoch *next_epoch;
1020 	int schedule_flush = 0;
1021 	enum finish_epoch rv = FE_STILL_LIVE;
1022 
1023 	spin_lock(&mdev->epoch_lock);
1024 	do {
1025 		next_epoch = NULL;
1026 		finish = 0;
1027 
1028 		epoch_size = atomic_read(&epoch->epoch_size);
1029 
1030 		switch (ev & ~EV_CLEANUP) {
1031 		case EV_PUT:
1032 			atomic_dec(&epoch->active);
1033 			break;
1034 		case EV_GOT_BARRIER_NR:
1035 			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1036 
1037 			/* Special case: If we just switched from WO_bio_barrier to
1038 			   WO_bdev_flush we should not finish the current epoch */
1039 			if (test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags) && epoch_size == 1 &&
1040 			    mdev->write_ordering != WO_bio_barrier &&
1041 			    epoch == mdev->current_epoch)
1042 				clear_bit(DE_CONTAINS_A_BARRIER, &epoch->flags);
1043 			break;
1044 		case EV_BARRIER_DONE:
1045 			set_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags);
1046 			break;
1047 		case EV_BECAME_LAST:
1048 			/* nothing to do */
1049 			break;
1050 		}
1051 
1052 		if (epoch_size != 0 &&
1053 		    atomic_read(&epoch->active) == 0 &&
1054 		    test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) &&
1055 		    epoch->list.prev == &mdev->current_epoch->list &&
1056 		    !test_bit(DE_IS_FINISHING, &epoch->flags)) {
1057 			/* Nearly all conditions are met to finish that epoch... */
1058 			if (test_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags) ||
1059 			    mdev->write_ordering == WO_none ||
1060 			    (epoch_size == 1 && test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) ||
1061 			    ev & EV_CLEANUP) {
1062 				finish = 1;
1063 				set_bit(DE_IS_FINISHING, &epoch->flags);
1064 			} else if (!test_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags) &&
1065 				 mdev->write_ordering == WO_bio_barrier) {
1066 				atomic_inc(&epoch->active);
1067 				schedule_flush = 1;
1068 			}
1069 		}
1070 		if (finish) {
1071 			if (!(ev & EV_CLEANUP)) {
1072 				spin_unlock(&mdev->epoch_lock);
1073 				drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1074 				spin_lock(&mdev->epoch_lock);
1075 			}
1076 			dec_unacked(mdev);
1077 
1078 			if (mdev->current_epoch != epoch) {
1079 				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1080 				list_del(&epoch->list);
1081 				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1082 				mdev->epochs--;
1083 				kfree(epoch);
1084 
1085 				if (rv == FE_STILL_LIVE)
1086 					rv = FE_DESTROYED;
1087 			} else {
1088 				epoch->flags = 0;
1089 				atomic_set(&epoch->epoch_size, 0);
1090 				/* atomic_set(&epoch->active, 0); is already zero */
1091 				if (rv == FE_STILL_LIVE)
1092 					rv = FE_RECYCLED;
1093 			}
1094 		}
1095 
1096 		if (!next_epoch)
1097 			break;
1098 
1099 		epoch = next_epoch;
1100 	} while (1);
1101 
1102 	spin_unlock(&mdev->epoch_lock);
1103 
1104 	if (schedule_flush) {
1105 		struct flush_work *fw;
1106 		fw = kmalloc(sizeof(*fw), GFP_ATOMIC);
1107 		if (fw) {
1108 			fw->w.cb = w_flush;
1109 			fw->epoch = epoch;
1110 			drbd_queue_work(&mdev->data.work, &fw->w);
1111 		} else {
1112 			dev_warn(DEV, "Could not kmalloc a flush_work obj\n");
1113 			set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1114 			/* This is not real recursion, it goes only one level deep */
1115 			drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
1116 			drbd_may_finish_epoch(mdev, epoch, EV_PUT);
1117 		}
1118 	}
1119 
1120 	return rv;
1121 }
1122 
1123 /**
1124  * drbd_bump_write_ordering() - Fall back to another write ordering method
1125  * @mdev:	DRBD device.
1126  * @wo:		Write ordering method to try.
1127  */
1128 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1129 {
1130 	enum write_ordering_e pwo;
1131 	static char *write_ordering_str[] = {
1132 		[WO_none] = "none",
1133 		[WO_drain_io] = "drain",
1134 		[WO_bdev_flush] = "flush",
1135 		[WO_bio_barrier] = "barrier",
1136 	};
1137 
1138 	pwo = mdev->write_ordering;
1139 	wo = min(pwo, wo);
1140 	if (wo == WO_bio_barrier && mdev->ldev->dc.no_disk_barrier)
1141 		wo = WO_bdev_flush;
1142 	if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1143 		wo = WO_drain_io;
1144 	if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1145 		wo = WO_none;
1146 	mdev->write_ordering = wo;
1147 	if (pwo != mdev->write_ordering || wo == WO_bio_barrier)
1148 		dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1149 }
1150 
1151 /**
1152  * drbd_submit_ee() - Submit the page chain of an epoch entry via one or more bios
1153  * @mdev:	DRBD device.
1154  * @e:		epoch entry
1155  * @rw:		flag field, see bio->bi_rw
1156  */
1157 /* TODO allocate from our own bio_set. */
1158 int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
1159 		const unsigned rw, const int fault_type)
1160 {
1161 	struct bio *bios = NULL;
1162 	struct bio *bio;
1163 	struct page *page = e->pages;
1164 	sector_t sector = e->sector;
1165 	unsigned ds = e->size;
1166 	unsigned n_bios = 0;
1167 	unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1168 
1169 	/* In most cases, we will only need one bio.  But in case the lower
1170 	 * level restrictions happen to be different at this offset on this
1171 	 * side than those of the sending peer, we may need to submit the
1172 	 * request in more than one bio. */
1173 next_bio:
1174 	bio = bio_alloc(GFP_NOIO, nr_pages);
1175 	if (!bio) {
1176 		dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1177 		goto fail;
1178 	}
1179 	/* > e->sector, unless this is the first bio */
1180 	bio->bi_sector = sector;
1181 	bio->bi_bdev = mdev->ldev->backing_bdev;
1182 	/* we special case some flags in the multi-bio case, see below
1183 	 * (BIO_RW_UNPLUG, BIO_RW_BARRIER) */
1184 	bio->bi_rw = rw;
1185 	bio->bi_private = e;
1186 	bio->bi_end_io = drbd_endio_sec;
1187 
1188 	bio->bi_next = bios;
1189 	bios = bio;
1190 	++n_bios;
1191 
1192 	page_chain_for_each(page) {
1193 		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1194 		if (!bio_add_page(bio, page, len, 0)) {
1195 			/* a single page must always be possible! */
1196 			BUG_ON(bio->bi_vcnt == 0);
1197 			goto next_bio;
1198 		}
1199 		ds -= len;
1200 		sector += len >> 9;
1201 		--nr_pages;
1202 	}
1203 	D_ASSERT(page == NULL);
1204 	D_ASSERT(ds == 0);
1205 
1206 	atomic_set(&e->pending_bios, n_bios);
1207 	do {
1208 		bio = bios;
1209 		bios = bios->bi_next;
1210 		bio->bi_next = NULL;
1211 
1212 		/* strip off BIO_RW_UNPLUG unless it is the last bio */
1213 		if (bios)
1214 			bio->bi_rw &= ~(1<<BIO_RW_UNPLUG);
1215 
1216 		drbd_generic_make_request(mdev, fault_type, bio);
1217 
1218 		/* strip off BIO_RW_BARRIER,
1219 		 * unless it is the first or last bio */
1220 		if (bios && bios->bi_next)
1221 			bios->bi_rw &= ~(1<<BIO_RW_BARRIER);
1222 	} while (bios);
1223 	maybe_kick_lo(mdev);
1224 	return 0;
1225 
1226 fail:
1227 	while (bios) {
1228 		bio = bios;
1229 		bios = bios->bi_next;
1230 		bio_put(bio);
1231 	}
1232 	return -ENOMEM;
1233 }
1234 
1235 /**
1236  * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set
1237  * @mdev:	DRBD device.
1238  * @w:		work object.
1239  * @cancel:	The connection will be closed anyways (unused in this callback)
1240  */
1241 int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
1242 {
1243 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1244 	/* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
1245 	   (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
1246 	   so that we can finish that epoch in drbd_may_finish_epoch().
1247 	   That is necessary if we already have a long chain of Epochs, before
1248 	   we realize that BIO_RW_BARRIER is actually not supported */
1249 
1250 	/* As long as the -ENOTSUPP on the barrier is reported immediately
1251 	   that will never trigger. If it is reported late, we will just
1252 	   print that warning and continue correctly for all future requests
1253 	   with WO_bdev_flush */
1254 	if (previous_epoch(mdev, e->epoch))
1255 		dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
1256 
1257 	/* we still have a local reference,
1258 	 * get_ldev was done in receive_Data. */
1259 
1260 	e->w.cb = e_end_block;
1261 	if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_DT_WR) != 0) {
1262 		/* drbd_submit_ee fails for one reason only:
1263 		 * if it was not able to allocate sufficient bios.
1264 		 * requeue, try again later. */
1265 		e->w.cb = w_e_reissue;
1266 		drbd_queue_work(&mdev->data.work, &e->w);
1267 	}
1268 	return 1;
1269 }
1270 
1271 static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h)
1272 {
1273 	int rv, issue_flush;
1274 	struct p_barrier *p = (struct p_barrier *)h;
1275 	struct drbd_epoch *epoch;
1276 
1277 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
1278 
1279 	rv = drbd_recv(mdev, h->payload, h->length);
1280 	ERR_IF(rv != h->length) return FALSE;
1281 
1282 	inc_unacked(mdev);
1283 
1284 	if (mdev->net_conf->wire_protocol != DRBD_PROT_C)
1285 		drbd_kick_lo(mdev);
1286 
1287 	mdev->current_epoch->barrier_nr = p->barrier;
1288 	rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1289 
1290 	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1291 	 * the activity log, which means it would not be resynced in case the
1292 	 * R_PRIMARY crashes now.
1293 	 * Therefore we must send the barrier_ack after the barrier request was
1294 	 * completed. */
1295 	switch (mdev->write_ordering) {
1296 	case WO_bio_barrier:
1297 	case WO_none:
1298 		if (rv == FE_RECYCLED)
1299 			return TRUE;
1300 		break;
1301 
1302 	case WO_bdev_flush:
1303 	case WO_drain_io:
1304 		if (rv == FE_STILL_LIVE) {
1305 			set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1306 			drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1307 			rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1308 		}
1309 		if (rv == FE_RECYCLED)
1310 			return TRUE;
1311 
1312 		/* The asender will send all the ACKs and barrier ACKs out, since
1313 		   all EEs moved from the active_ee to the done_ee. We need to
1314 		   provide a new epoch object for the EEs that come in soon */
1315 		break;
1316 	}
1317 
1318 	/* receiver context, in the writeout path of the other node.
1319 	 * avoid potential distributed deadlock */
1320 	epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1321 	if (!epoch) {
1322 		dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1323 		issue_flush = !test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
1324 		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1325 		if (issue_flush) {
1326 			rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
1327 			if (rv == FE_RECYCLED)
1328 				return TRUE;
1329 		}
1330 
1331 		drbd_wait_ee_list_empty(mdev, &mdev->done_ee);
1332 
1333 		return TRUE;
1334 	}
1335 
1336 	epoch->flags = 0;
1337 	atomic_set(&epoch->epoch_size, 0);
1338 	atomic_set(&epoch->active, 0);
1339 
1340 	spin_lock(&mdev->epoch_lock);
1341 	if (atomic_read(&mdev->current_epoch->epoch_size)) {
1342 		list_add(&epoch->list, &mdev->current_epoch->list);
1343 		mdev->current_epoch = epoch;
1344 		mdev->epochs++;
1345 	} else {
1346 		/* The current_epoch got recycled while we allocated this one... */
1347 		kfree(epoch);
1348 	}
1349 	spin_unlock(&mdev->epoch_lock);
1350 
1351 	return TRUE;
1352 }
1353 
1354 /* used from receive_RSDataReply (recv_resync_read)
1355  * and from receive_Data */
1356 static struct drbd_epoch_entry *
1357 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
1358 {
1359 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1360 	struct drbd_epoch_entry *e;
1361 	struct page *page;
1362 	int dgs, ds, rr;
1363 	void *dig_in = mdev->int_dig_in;
1364 	void *dig_vv = mdev->int_dig_vv;
1365 	unsigned long *data;
1366 
1367 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1368 		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1369 
1370 	if (dgs) {
1371 		rr = drbd_recv(mdev, dig_in, dgs);
1372 		if (rr != dgs) {
1373 			dev_warn(DEV, "short read receiving data digest: read %d expected %d\n",
1374 			     rr, dgs);
1375 			return NULL;
1376 		}
1377 	}
1378 
1379 	data_size -= dgs;
1380 
1381 	ERR_IF(data_size &  0x1ff) return NULL;
1382 	ERR_IF(data_size >  DRBD_MAX_SEGMENT_SIZE) return NULL;
1383 
1384 	/* even though we trust our peer,
1385 	 * we sometimes have to double check. */
1386 	if (sector + (data_size>>9) > capacity) {
1387 		dev_err(DEV, "capacity: %llus < sector: %llus + size: %u\n",
1388 			(unsigned long long)capacity,
1389 			(unsigned long long)sector, data_size);
1390 		return NULL;
1391 	}
1392 
1393 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1394 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1395 	 * which in turn might block on the other node at this very place.  */
1396 	e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1397 	if (!e)
1398 		return NULL;
1399 
1400 	ds = data_size;
1401 	page = e->pages;
1402 	page_chain_for_each(page) {
1403 		unsigned len = min_t(int, ds, PAGE_SIZE);
1404 		data = kmap(page);
1405 		rr = drbd_recv(mdev, data, len);
1406 		if (FAULT_ACTIVE(mdev, DRBD_FAULT_RECEIVE)) {
1407 			dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1408 			data[0] = data[0] ^ (unsigned long)-1;
1409 		}
1410 		kunmap(page);
1411 		if (rr != len) {
1412 			drbd_free_ee(mdev, e);
1413 			dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1414 			     rr, len);
1415 			return NULL;
1416 		}
1417 		ds -= rr;
1418 	}
1419 
1420 	if (dgs) {
1421 		drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
1422 		if (memcmp(dig_in, dig_vv, dgs)) {
1423 			dev_err(DEV, "Digest integrity check FAILED.\n");
1424 			drbd_bcast_ee(mdev, "digest failed",
1425 					dgs, dig_in, dig_vv, e);
1426 			drbd_free_ee(mdev, e);
1427 			return NULL;
1428 		}
1429 	}
1430 	mdev->recv_cnt += data_size>>9;
1431 	return e;
1432 }
1433 
1434 /* drbd_drain_block() just takes a data block
1435  * out of the socket input buffer, and discards it.
1436  */
1437 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1438 {
1439 	struct page *page;
1440 	int rr, rv = 1;
1441 	void *data;
1442 
1443 	if (!data_size)
1444 		return TRUE;
1445 
1446 	page = drbd_pp_alloc(mdev, 1, 1);
1447 
1448 	data = kmap(page);
1449 	while (data_size) {
1450 		rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1451 		if (rr != min_t(int, data_size, PAGE_SIZE)) {
1452 			rv = 0;
1453 			dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1454 			     rr, min_t(int, data_size, PAGE_SIZE));
1455 			break;
1456 		}
1457 		data_size -= rr;
1458 	}
1459 	kunmap(page);
1460 	drbd_pp_free(mdev, page);
1461 	return rv;
1462 }
1463 
1464 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1465 			   sector_t sector, int data_size)
1466 {
1467 	struct bio_vec *bvec;
1468 	struct bio *bio;
1469 	int dgs, rr, i, expect;
1470 	void *dig_in = mdev->int_dig_in;
1471 	void *dig_vv = mdev->int_dig_vv;
1472 
1473 	dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1474 		crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1475 
1476 	if (dgs) {
1477 		rr = drbd_recv(mdev, dig_in, dgs);
1478 		if (rr != dgs) {
1479 			dev_warn(DEV, "short read receiving data reply digest: read %d expected %d\n",
1480 			     rr, dgs);
1481 			return 0;
1482 		}
1483 	}
1484 
1485 	data_size -= dgs;
1486 
1487 	/* optimistically update recv_cnt.  if receiving fails below,
1488 	 * we disconnect anyways, and counters will be reset. */
1489 	mdev->recv_cnt += data_size>>9;
1490 
1491 	bio = req->master_bio;
1492 	D_ASSERT(sector == bio->bi_sector);
1493 
1494 	bio_for_each_segment(bvec, bio, i) {
1495 		expect = min_t(int, data_size, bvec->bv_len);
1496 		rr = drbd_recv(mdev,
1497 			     kmap(bvec->bv_page)+bvec->bv_offset,
1498 			     expect);
1499 		kunmap(bvec->bv_page);
1500 		if (rr != expect) {
1501 			dev_warn(DEV, "short read receiving data reply: "
1502 			     "read %d expected %d\n",
1503 			     rr, expect);
1504 			return 0;
1505 		}
1506 		data_size -= rr;
1507 	}
1508 
1509 	if (dgs) {
1510 		drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1511 		if (memcmp(dig_in, dig_vv, dgs)) {
1512 			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1513 			return 0;
1514 		}
1515 	}
1516 
1517 	D_ASSERT(data_size == 0);
1518 	return 1;
1519 }
1520 
1521 /* e_end_resync_block() is called via
1522  * drbd_process_done_ee() by asender only */
1523 static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1524 {
1525 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1526 	sector_t sector = e->sector;
1527 	int ok;
1528 
1529 	D_ASSERT(hlist_unhashed(&e->colision));
1530 
1531 	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1532 		drbd_set_in_sync(mdev, sector, e->size);
1533 		ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1534 	} else {
1535 		/* Record failure to sync */
1536 		drbd_rs_failed_io(mdev, sector, e->size);
1537 
1538 		ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1539 	}
1540 	dec_unacked(mdev);
1541 
1542 	return ok;
1543 }
1544 
1545 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1546 {
1547 	struct drbd_epoch_entry *e;
1548 
1549 	e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1550 	if (!e)
1551 		goto fail;
1552 
1553 	dec_rs_pending(mdev);
1554 
1555 	inc_unacked(mdev);
1556 	/* corresponding dec_unacked() in e_end_resync_block()
1557 	 * or in _drbd_clear_done_ee, respectively */
1558 
1559 	e->w.cb = e_end_resync_block;
1560 
1561 	spin_lock_irq(&mdev->req_lock);
1562 	list_add(&e->w.list, &mdev->sync_ee);
1563 	spin_unlock_irq(&mdev->req_lock);
1564 
1565 	if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
1566 		return TRUE;
1567 
1568 	drbd_free_ee(mdev, e);
1569 fail:
1570 	put_ldev(mdev);
1571 	return FALSE;
1572 }
1573 
1574 static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
1575 {
1576 	struct drbd_request *req;
1577 	sector_t sector;
1578 	unsigned int header_size, data_size;
1579 	int ok;
1580 	struct p_data *p = (struct p_data *)h;
1581 
1582 	header_size = sizeof(*p) - sizeof(*h);
1583 	data_size   = h->length  - header_size;
1584 
1585 	ERR_IF(data_size == 0) return FALSE;
1586 
1587 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
1588 		return FALSE;
1589 
1590 	sector = be64_to_cpu(p->sector);
1591 
1592 	spin_lock_irq(&mdev->req_lock);
1593 	req = _ar_id_to_req(mdev, p->block_id, sector);
1594 	spin_unlock_irq(&mdev->req_lock);
1595 	if (unlikely(!req)) {
1596 		dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1597 		return FALSE;
1598 	}
1599 
1600 	/* hlist_del(&req->colision) is done in _req_may_be_done, to avoid
1601 	 * special casing it there for the various failure cases.
1602 	 * still no race with drbd_fail_pending_reads */
1603 	ok = recv_dless_read(mdev, req, sector, data_size);
1604 
1605 	if (ok)
1606 		req_mod(req, data_received);
1607 	/* else: nothing. handled from drbd_disconnect...
1608 	 * I don't think we may complete this just yet
1609 	 * in case we are "on-disconnect: freeze" */
1610 
1611 	return ok;
1612 }
1613 
1614 static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h)
1615 {
1616 	sector_t sector;
1617 	unsigned int header_size, data_size;
1618 	int ok;
1619 	struct p_data *p = (struct p_data *)h;
1620 
1621 	header_size = sizeof(*p) - sizeof(*h);
1622 	data_size   = h->length  - header_size;
1623 
1624 	ERR_IF(data_size == 0) return FALSE;
1625 
1626 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
1627 		return FALSE;
1628 
1629 	sector = be64_to_cpu(p->sector);
1630 	D_ASSERT(p->block_id == ID_SYNCER);
1631 
1632 	if (get_ldev(mdev)) {
1633 		/* data is submitted to disk within recv_resync_read.
1634 		 * corresponding put_ldev done below on error,
1635 		 * or in drbd_endio_write_sec. */
1636 		ok = recv_resync_read(mdev, sector, data_size);
1637 	} else {
1638 		if (__ratelimit(&drbd_ratelimit_state))
1639 			dev_err(DEV, "Can not write resync data to local disk.\n");
1640 
1641 		ok = drbd_drain_block(mdev, data_size);
1642 
1643 		drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1644 	}
1645 
1646 	return ok;
1647 }
1648 
1649 /* e_end_block() is called via drbd_process_done_ee().
1650  * this means this function only runs in the asender thread
1651  */
1652 static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1653 {
1654 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1655 	sector_t sector = e->sector;
1656 	struct drbd_epoch *epoch;
1657 	int ok = 1, pcmd;
1658 
1659 	if (e->flags & EE_IS_BARRIER) {
1660 		epoch = previous_epoch(mdev, e->epoch);
1661 		if (epoch)
1662 			drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE + (cancel ? EV_CLEANUP : 0));
1663 	}
1664 
1665 	if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1666 		if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1667 			pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1668 				mdev->state.conn <= C_PAUSED_SYNC_T &&
1669 				e->flags & EE_MAY_SET_IN_SYNC) ?
1670 				P_RS_WRITE_ACK : P_WRITE_ACK;
1671 			ok &= drbd_send_ack(mdev, pcmd, e);
1672 			if (pcmd == P_RS_WRITE_ACK)
1673 				drbd_set_in_sync(mdev, sector, e->size);
1674 		} else {
1675 			ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1676 			/* we expect it to be marked out of sync anyways...
1677 			 * maybe assert this?  */
1678 		}
1679 		dec_unacked(mdev);
1680 	}
1681 	/* we delete from the conflict detection hash _after_ we sent out the
1682 	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1683 	if (mdev->net_conf->two_primaries) {
1684 		spin_lock_irq(&mdev->req_lock);
1685 		D_ASSERT(!hlist_unhashed(&e->colision));
1686 		hlist_del_init(&e->colision);
1687 		spin_unlock_irq(&mdev->req_lock);
1688 	} else {
1689 		D_ASSERT(hlist_unhashed(&e->colision));
1690 	}
1691 
1692 	drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1693 
1694 	return ok;
1695 }
1696 
1697 static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1698 {
1699 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1700 	int ok = 1;
1701 
1702 	D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1703 	ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1704 
1705 	spin_lock_irq(&mdev->req_lock);
1706 	D_ASSERT(!hlist_unhashed(&e->colision));
1707 	hlist_del_init(&e->colision);
1708 	spin_unlock_irq(&mdev->req_lock);
1709 
1710 	dec_unacked(mdev);
1711 
1712 	return ok;
1713 }
1714 
1715 /* Called from receive_Data.
1716  * Synchronize packets on sock with packets on msock.
1717  *
1718  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1719  * packet traveling on msock, they are still processed in the order they have
1720  * been sent.
1721  *
1722  * Note: we don't care for Ack packets overtaking P_DATA packets.
1723  *
1724  * In case packet_seq is larger than mdev->peer_seq number, there are
1725  * outstanding packets on the msock. We wait for them to arrive.
1726  * In case we are the logically next packet, we update mdev->peer_seq
1727  * ourselves. Correctly handles 32bit wrap around.
1728  *
1729  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1730  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1731  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1732  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1733  * (See the illustrative wrap-around comparison sketch right after this function.)
1734  * returns 0 if we may process the packet,
1735  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1736 static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1737 {
1738 	DEFINE_WAIT(wait);
1739 	unsigned int p_seq;
1740 	long timeout;
1741 	int ret = 0;
1742 	spin_lock(&mdev->peer_seq_lock);
1743 	for (;;) {
1744 		prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1745 		if (seq_le(packet_seq, mdev->peer_seq+1))
1746 			break;
1747 		if (signal_pending(current)) {
1748 			ret = -ERESTARTSYS;
1749 			break;
1750 		}
1751 		p_seq = mdev->peer_seq;
1752 		spin_unlock(&mdev->peer_seq_lock);
1753 		timeout = schedule_timeout(30*HZ);
1754 		spin_lock(&mdev->peer_seq_lock);
1755 		if (timeout == 0 && p_seq == mdev->peer_seq) {
1756 			ret = -ETIMEDOUT;
1757 			dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1758 			break;
1759 		}
1760 	}
1761 	finish_wait(&mdev->seq_wait, &wait);
1762 	if (mdev->peer_seq+1 == packet_seq)
1763 		mdev->peer_seq++;
1764 	spin_unlock(&mdev->peer_seq_lock);
1765 	return ret;
1766 }
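/*
 * Wrap-around sketch referenced in the comment above (illustrative only; the
 * real seq_le() used by drbd_wait_peer_seq() is provided outside this file,
 * presumably in drbd_int.h): comparing sequence numbers via a signed
 * difference keeps the ordering correct across the 32bit wrap.
 *
 *	static inline int example_seq_le(u32 a, u32 b)
 *	{
 *		return (s32)(a - b) <= 0;
 *	}
 *
 * E.g. a = 0xfffffffe, b = 0x00000002: (s32)(a - b) == -4, so a still
 * compares as "not later than" b, even though a is numerically larger.
 */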
1767 
1768 /* mirrored write */
1769 static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
1770 {
1771 	sector_t sector;
1772 	struct drbd_epoch_entry *e;
1773 	struct p_data *p = (struct p_data *)h;
1774 	int header_size, data_size;
1775 	int rw = WRITE;
1776 	u32 dp_flags;
1777 
1778 	header_size = sizeof(*p) - sizeof(*h);
1779 	data_size   = h->length  - header_size;
1780 
1781 	ERR_IF(data_size == 0) return FALSE;
1782 
1783 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
1784 		return FALSE;
1785 
1786 	if (!get_ldev(mdev)) {
1787 		if (__ratelimit(&drbd_ratelimit_state))
1788 			dev_err(DEV, "Can not write mirrored data block "
1789 			    "to local disk.\n");
1790 		spin_lock(&mdev->peer_seq_lock);
1791 		if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1792 			mdev->peer_seq++;
1793 		spin_unlock(&mdev->peer_seq_lock);
1794 
1795 		drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1796 		atomic_inc(&mdev->current_epoch->epoch_size);
1797 		return drbd_drain_block(mdev, data_size);
1798 	}
1799 
1800 	/* get_ldev(mdev) successful.
1801 	 * Corresponding put_ldev done either below (on various errors),
1802 	 * or in drbd_endio_write_sec, if we successfully submit the data at
1803 	 * the end of this function. */
1804 
1805 	sector = be64_to_cpu(p->sector);
1806 	e = read_in_block(mdev, p->block_id, sector, data_size);
1807 	if (!e) {
1808 		put_ldev(mdev);
1809 		return FALSE;
1810 	}
1811 
1812 	e->w.cb = e_end_block;
1813 
1814 	spin_lock(&mdev->epoch_lock);
1815 	e->epoch = mdev->current_epoch;
1816 	atomic_inc(&e->epoch->epoch_size);
1817 	atomic_inc(&e->epoch->active);
1818 
1819 	if (mdev->write_ordering == WO_bio_barrier && atomic_read(&e->epoch->epoch_size) == 1) {
1820 		struct drbd_epoch *epoch;
1821 		/* Issue a barrier if we start a new epoch, and the previous epoch
1822 		   was not an epoch containing a single request which already was
1823 		   a Barrier. */
1824 		epoch = list_entry(e->epoch->list.prev, struct drbd_epoch, list);
1825 		if (epoch == e->epoch) {
1826 			set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1827 			rw |= (1<<BIO_RW_BARRIER);
1828 			e->flags |= EE_IS_BARRIER;
1829 		} else {
1830 			if (atomic_read(&epoch->epoch_size) > 1 ||
1831 			    !test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) {
1832 				set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1833 				set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1834 				rw |= (1<<BIO_RW_BARRIER);
1835 				e->flags |= EE_IS_BARRIER;
1836 			}
1837 		}
1838 	}
1839 	spin_unlock(&mdev->epoch_lock);
1840 
1841 	dp_flags = be32_to_cpu(p->dp_flags);
1842 	if (dp_flags & DP_HARDBARRIER) {
1843 		dev_err(DEV, "ASSERT FAILED would have submitted barrier request\n");
1844 		/* rw |= (1<<BIO_RW_BARRIER); */
1845 	}
1846 	if (dp_flags & DP_RW_SYNC)
1847 		rw |= (1<<BIO_RW_SYNCIO) | (1<<BIO_RW_UNPLUG);
1848 	if (dp_flags & DP_MAY_SET_IN_SYNC)
1849 		e->flags |= EE_MAY_SET_IN_SYNC;
1850 
1851 	/* I'm the receiver, I do hold a net_cnt reference. */
1852 	if (!mdev->net_conf->two_primaries) {
1853 		spin_lock_irq(&mdev->req_lock);
1854 	} else {
1855 		/* don't get the req_lock yet,
1856 		 * we may sleep in drbd_wait_peer_seq */
1857 		const int size = e->size;
1858 		const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1859 		DEFINE_WAIT(wait);
1860 		struct drbd_request *i;
1861 		struct hlist_node *n;
1862 		struct hlist_head *slot;
1863 		int first;
1864 
1865 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1866 		BUG_ON(mdev->ee_hash == NULL);
1867 		BUG_ON(mdev->tl_hash == NULL);
1868 
1869 		/* conflict detection and handling:
1870 		 * 1. wait on the sequence number,
1871 		 *    in case this data packet overtook ACK packets.
1872 		 * 2. check our hash tables for conflicting requests.
1873 		 *    we only need to walk the tl_hash, since an ee can not
1874 		 *    have a conflict with another ee: on the submitting
1875 		 *    node, the corresponding req had already been conflicting,
1876 		 *    and a conflicting req is never sent.
1877 		 *
1878 		 * Note: for two_primaries, we are protocol C,
1879 		 * so there cannot be any request that is DONE
1880 		 * but still on the transfer log.
1881 		 *
1882 		 * unconditionally add to the ee_hash.
1883 		 *
1884 		 * if no conflicting request is found:
1885 		 *    submit.
1886 		 *
1887 		 * if any conflicting request is found
1888 		 * that has not yet been acked,
1889 		 * AND I have the "discard concurrent writes" flag:
1890 		 *	 queue (via done_ee) the P_DISCARD_ACK; OUT.
1891 		 *
1892 		 * if any conflicting request is found:
1893 		 *	 block the receiver, waiting on misc_wait
1894 		 *	 until no more conflicting requests are there,
1895 		 *	 or we get interrupted (disconnect).
1896 		 *
1897 		 *	 we do not just write after local io completion of those
1898 		 *	 requests, but only after req is done completely, i.e.
1899 		 *	 we wait for the P_DISCARD_ACK to arrive!
1900 		 *
1901 		 *	 then proceed normally, i.e. submit.
1902 		 */
1903 		if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1904 			goto out_interrupted;
1905 
1906 		spin_lock_irq(&mdev->req_lock);
1907 
1908 		hlist_add_head(&e->colision, ee_hash_slot(mdev, sector));
1909 
1910 #define OVERLAPS overlaps(i->sector, i->size, sector, size)
1911 		slot = tl_hash_slot(mdev, sector);
1912 		first = 1;
1913 		for (;;) {
1914 			int have_unacked = 0;
1915 			int have_conflict = 0;
1916 			prepare_to_wait(&mdev->misc_wait, &wait,
1917 				TASK_INTERRUPTIBLE);
1918 			hlist_for_each_entry(i, n, slot, colision) {
1919 				if (OVERLAPS) {
1920 					/* only ALERT on first iteration,
1921 					 * we may be woken up early... */
1922 					if (first)
1923 						dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1924 						      "	new: %llus +%u; pending: %llus +%u\n",
1925 						      current->comm, current->pid,
1926 						      (unsigned long long)sector, size,
1927 						      (unsigned long long)i->sector, i->size);
1928 					if (i->rq_state & RQ_NET_PENDING)
1929 						++have_unacked;
1930 					++have_conflict;
1931 				}
1932 			}
1933 #undef OVERLAPS
1934 			if (!have_conflict)
1935 				break;
1936 
1937 			/* Discard Ack only for the _first_ iteration */
1938 			if (first && discard && have_unacked) {
1939 				dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1940 				     (unsigned long long)sector);
1941 				inc_unacked(mdev);
1942 				e->w.cb = e_send_discard_ack;
1943 				list_add_tail(&e->w.list, &mdev->done_ee);
1944 
1945 				spin_unlock_irq(&mdev->req_lock);
1946 
1947 				/* we could probably send that P_DISCARD_ACK ourselves,
1948 				 * but I don't like the receiver using the msock */
1949 
1950 				put_ldev(mdev);
1951 				wake_asender(mdev);
1952 				finish_wait(&mdev->misc_wait, &wait);
1953 				return TRUE;
1954 			}
1955 
1956 			if (signal_pending(current)) {
1957 				hlist_del_init(&e->colision);
1958 
1959 				spin_unlock_irq(&mdev->req_lock);
1960 
1961 				finish_wait(&mdev->misc_wait, &wait);
1962 				goto out_interrupted;
1963 			}
1964 
1965 			spin_unlock_irq(&mdev->req_lock);
1966 			if (first) {
1967 				first = 0;
1968 				dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1969 				     "sec=%llus\n", (unsigned long long)sector);
1970 			} else if (discard) {
1971 				/* we had none on the first iteration.
1972 				 * there must be none now. */
1973 				D_ASSERT(have_unacked == 0);
1974 			}
1975 			schedule();
1976 			spin_lock_irq(&mdev->req_lock);
1977 		}
1978 		finish_wait(&mdev->misc_wait, &wait);
1979 	}
1980 
1981 	list_add(&e->w.list, &mdev->active_ee);
1982 	spin_unlock_irq(&mdev->req_lock);
1983 
1984 	switch (mdev->net_conf->wire_protocol) {
1985 	case DRBD_PROT_C:
1986 		inc_unacked(mdev);
1987 		/* corresponding dec_unacked() in e_end_block()
1988 		 * respective _drbd_clear_done_ee */
1989 		break;
1990 	case DRBD_PROT_B:
1991 		/* I really don't like it that the receiver thread
1992 		 * sends on the msock, but anyways */
1993 		drbd_send_ack(mdev, P_RECV_ACK, e);
1994 		break;
1995 	case DRBD_PROT_A:
1996 		/* nothing to do */
1997 		break;
1998 	}
1999 
2000 	if (mdev->state.pdsk == D_DISKLESS) {
2001 		/* In case we have the only disk of the cluster, */
2002 		drbd_set_out_of_sync(mdev, e->sector, e->size);
2003 		e->flags |= EE_CALL_AL_COMPLETE_IO;
2004 		drbd_al_begin_io(mdev, e->sector);
2005 	}
2006 
2007 	if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
2008 		return TRUE;
2009 
2010 out_interrupted:
2011 	/* yes, the epoch_size now is imbalanced.
2012 	 * but we drop the connection anyways, so we don't have a chance to
2013 	 * receive a barrier... atomic_inc(&mdev->epoch_size); */
2014 	put_ldev(mdev);
2015 	drbd_free_ee(mdev, e);
2016 	return FALSE;
2017 }
2018 
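/* Read request from the peer: handles P_DATA_REQUEST, P_RS_DATA_REQUEST,
 * P_OV_REQUEST, P_OV_REPLY and P_CSUM_RS_REQUEST.  Submits a local read
 * for the requested block; the completion path then runs the matching
 * w_e_end_* callback to send the reply to the peer. */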
2019 static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
2020 {
2021 	sector_t sector;
2022 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
2023 	struct drbd_epoch_entry *e;
2024 	struct digest_info *di = NULL;
2025 	int size, digest_size;
2026 	unsigned int fault_type;
2027 	struct p_block_req *p =
2028 		(struct p_block_req *)h;
2029 	const int brps = sizeof(*p)-sizeof(*h);
2030 
2031 	if (drbd_recv(mdev, h->payload, brps) != brps)
2032 		return FALSE;
2033 
2034 	sector = be64_to_cpu(p->sector);
2035 	size   = be32_to_cpu(p->blksize);
2036 
2037 	if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_SEGMENT_SIZE) {
2038 		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2039 				(unsigned long long)sector, size);
2040 		return FALSE;
2041 	}
2042 	if (sector + (size>>9) > capacity) {
2043 		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2044 				(unsigned long long)sector, size);
2045 		return FALSE;
2046 	}
2047 
2048 	if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2049 		if (__ratelimit(&drbd_ratelimit_state))
2050 			dev_err(DEV, "Can not satisfy peer's read request, "
2051 			    "no local data.\n");
2052 		drbd_send_ack_rp(mdev, h->command == P_DATA_REQUEST ? P_NEG_DREPLY :
2053 				 P_NEG_RS_DREPLY , p);
2054 		return drbd_drain_block(mdev, h->length - brps);
2055 	}
2056 
2057 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2058 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
2059 	 * which in turn might block on the other node at this very place.  */
2060 	e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
2061 	if (!e) {
2062 		put_ldev(mdev);
2063 		return FALSE;
2064 	}
2065 
2066 	switch (h->command) {
2067 	case P_DATA_REQUEST:
2068 		e->w.cb = w_e_end_data_req;
2069 		fault_type = DRBD_FAULT_DT_RD;
2070 		break;
2071 	case P_RS_DATA_REQUEST:
2072 		e->w.cb = w_e_end_rsdata_req;
2073 		fault_type = DRBD_FAULT_RS_RD;
2074 		/* Eventually this should become asynchronous. Currently it
2075 		 * blocks the whole receiver just to delay the reading of a
2076 		 * resync data block.
2077 		 * the drbd_work_queue mechanism is made for this...
2078 		 */
2079 		if (!drbd_rs_begin_io(mdev, sector)) {
2080 			/* we have been interrupted,
2081 			 * probably connection lost! */
2082 			D_ASSERT(signal_pending(current));
2083 			goto out_free_e;
2084 		}
2085 		break;
2086 
2087 	case P_OV_REPLY:
2088 	case P_CSUM_RS_REQUEST:
2089 		fault_type = DRBD_FAULT_RS_RD;
2090 		digest_size = h->length - brps;
2091 		di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2092 		if (!di)
2093 			goto out_free_e;
2094 
2095 		di->digest_size = digest_size;
2096 		di->digest = (((char *)di)+sizeof(struct digest_info));
2097 
2098 		if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2099 			goto out_free_e;
2100 
2101 		e->block_id = (u64)(unsigned long)di;
2102 		if (h->command == P_CSUM_RS_REQUEST) {
2103 			D_ASSERT(mdev->agreed_pro_version >= 89);
2104 			e->w.cb = w_e_end_csum_rs_req;
2105 		} else if (h->command == P_OV_REPLY) {
2106 			e->w.cb = w_e_end_ov_reply;
2107 			dec_rs_pending(mdev);
2108 			break;
2109 		}
2110 
2111 		if (!drbd_rs_begin_io(mdev, sector)) {
2112 			/* we have been interrupted, probably connection lost! */
2113 			D_ASSERT(signal_pending(current));
2114 			goto out_free_e;
2115 		}
2116 		break;
2117 
2118 	case P_OV_REQUEST:
2119 		if (mdev->state.conn >= C_CONNECTED &&
2120 		    mdev->state.conn != C_VERIFY_T)
2121 			dev_warn(DEV, "ASSERT FAILED: got P_OV_REQUEST while being %s\n",
2122 				drbd_conn_str(mdev->state.conn));
2123 		if (mdev->ov_start_sector == ~(sector_t)0 &&
2124 		    mdev->agreed_pro_version >= 90) {
2125 			mdev->ov_start_sector = sector;
2126 			mdev->ov_position = sector;
2127 			mdev->ov_left = mdev->rs_total - BM_SECT_TO_BIT(sector);
2128 			dev_info(DEV, "Online Verify start sector: %llu\n",
2129 					(unsigned long long)sector);
2130 		}
2131 		e->w.cb = w_e_end_ov_req;
2132 		fault_type = DRBD_FAULT_RS_RD;
2133 		/* Eventually this should become asynchronous. Currently it
2134 		 * blocks the whole receiver just to delay the reading of a
2135 		 * resync data block.
2136 		 * the drbd_work_queue mechanism is made for this...
2137 		 */
2138 		if (!drbd_rs_begin_io(mdev, sector)) {
2139 			/* we have been interrupted,
2140 			 * probably connection lost! */
2141 			D_ASSERT(signal_pending(current));
2142 			goto out_free_e;
2143 		}
2144 		break;
2145 
2146 
2147 	default:
2148 		dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2149 		    cmdname(h->command));
2150 		fault_type = DRBD_FAULT_MAX;
2151 	}
2152 
2153 	spin_lock_irq(&mdev->req_lock);
2154 	list_add(&e->w.list, &mdev->read_ee);
2155 	spin_unlock_irq(&mdev->req_lock);
2156 
2157 	inc_unacked(mdev);
2158 
2159 	if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
2160 		return TRUE;
2161 
2162 out_free_e:
2163 	kfree(di);
2164 	put_ldev(mdev);
2165 	drbd_free_ee(mdev, e);
2166 	return FALSE;
2167 }
2168 
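/* Automatic after-split-brain recovery policies.  The three
 * drbd_asb_recover_Np() helpers (N = number of nodes currently in the
 * primary role) share a return convention:
 *    1	discard the peer's data (this node becomes sync source)
 *   -1	discard the local data (this node becomes sync target)
 * -100	no automatic decision possible
 */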
2169 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2170 {
2171 	int self, peer, rv = -100;
2172 	unsigned long ch_self, ch_peer;
2173 
2174 	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2175 	peer = mdev->p_uuid[UI_BITMAP] & 1;
2176 
2177 	ch_peer = mdev->p_uuid[UI_SIZE];
2178 	ch_self = mdev->comm_bm_set;
2179 
2180 	switch (mdev->net_conf->after_sb_0p) {
2181 	case ASB_CONSENSUS:
2182 	case ASB_DISCARD_SECONDARY:
2183 	case ASB_CALL_HELPER:
2184 		dev_err(DEV, "Configuration error.\n");
2185 		break;
2186 	case ASB_DISCONNECT:
2187 		break;
2188 	case ASB_DISCARD_YOUNGER_PRI:
2189 		if (self == 0 && peer == 1) {
2190 			rv = -1;
2191 			break;
2192 		}
2193 		if (self == 1 && peer == 0) {
2194 			rv =  1;
2195 			break;
2196 		}
2197 		/* Else fall through to one of the other strategies... */
2198 	case ASB_DISCARD_OLDER_PRI:
2199 		if (self == 0 && peer == 1) {
2200 			rv = 1;
2201 			break;
2202 		}
2203 		if (self == 1 && peer == 0) {
2204 			rv = -1;
2205 			break;
2206 		}
2207 		/* Else fall through to one of the other strategies... */
2208 		dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2209 		     "Using discard-least-changes instead\n");
2210 	case ASB_DISCARD_ZERO_CHG:
2211 		if (ch_peer == 0 && ch_self == 0) {
2212 			rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2213 				? -1 : 1;
2214 			break;
2215 		} else {
2216 			if (ch_peer == 0) { rv =  1; break; }
2217 			if (ch_self == 0) { rv = -1; break; }
2218 		}
2219 		if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2220 			break;
2221 	case ASB_DISCARD_LEAST_CHG:
2222 		if	(ch_self < ch_peer)
2223 			rv = -1;
2224 		else if (ch_self > ch_peer)
2225 			rv =  1;
2226 		else /* ( ch_self == ch_peer ) */
2227 		     /* Well, then use something else. */
2228 			rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2229 				? -1 : 1;
2230 		break;
2231 	case ASB_DISCARD_LOCAL:
2232 		rv = -1;
2233 		break;
2234 	case ASB_DISCARD_REMOTE:
2235 		rv =  1;
2236 	}
2237 
2238 	return rv;
2239 }
2240 
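/* after-split-brain policy with exactly one primary;
 * same return convention as drbd_asb_recover_0p() above */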
2241 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2242 {
2243 	int self, peer, hg, rv = -100;
2244 
2245 	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2246 	peer = mdev->p_uuid[UI_BITMAP] & 1;
2247 
2248 	switch (mdev->net_conf->after_sb_1p) {
2249 	case ASB_DISCARD_YOUNGER_PRI:
2250 	case ASB_DISCARD_OLDER_PRI:
2251 	case ASB_DISCARD_LEAST_CHG:
2252 	case ASB_DISCARD_LOCAL:
2253 	case ASB_DISCARD_REMOTE:
2254 		dev_err(DEV, "Configuration error.\n");
2255 		break;
2256 	case ASB_DISCONNECT:
2257 		break;
2258 	case ASB_CONSENSUS:
2259 		hg = drbd_asb_recover_0p(mdev);
2260 		if (hg == -1 && mdev->state.role == R_SECONDARY)
2261 			rv = hg;
2262 		if (hg == 1  && mdev->state.role == R_PRIMARY)
2263 			rv = hg;
2264 		break;
2265 	case ASB_VIOLENTLY:
2266 		rv = drbd_asb_recover_0p(mdev);
2267 		break;
2268 	case ASB_DISCARD_SECONDARY:
2269 		return mdev->state.role == R_PRIMARY ? 1 : -1;
2270 	case ASB_CALL_HELPER:
2271 		hg = drbd_asb_recover_0p(mdev);
2272 		if (hg == -1 && mdev->state.role == R_PRIMARY) {
2273 			self = drbd_set_role(mdev, R_SECONDARY, 0);
2274 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2275 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2276 			  * we do not need to wait for the after state change work either. */
2277 			self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2278 			if (self != SS_SUCCESS) {
2279 				drbd_khelper(mdev, "pri-lost-after-sb");
2280 			} else {
2281 				dev_warn(DEV, "Successfully gave up primary role.\n");
2282 				rv = hg;
2283 			}
2284 		} else
2285 			rv = hg;
2286 	}
2287 
2288 	return rv;
2289 }
2290 
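/* after-split-brain policy with two primaries;
 * same return convention as drbd_asb_recover_0p() above */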
2291 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2292 {
2293 	int self, peer, hg, rv = -100;
2294 
2295 	self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2296 	peer = mdev->p_uuid[UI_BITMAP] & 1;
2297 
2298 	switch (mdev->net_conf->after_sb_2p) {
2299 	case ASB_DISCARD_YOUNGER_PRI:
2300 	case ASB_DISCARD_OLDER_PRI:
2301 	case ASB_DISCARD_LEAST_CHG:
2302 	case ASB_DISCARD_LOCAL:
2303 	case ASB_DISCARD_REMOTE:
2304 	case ASB_CONSENSUS:
2305 	case ASB_DISCARD_SECONDARY:
2306 		dev_err(DEV, "Configuration error.\n");
2307 		break;
2308 	case ASB_VIOLENTLY:
2309 		rv = drbd_asb_recover_0p(mdev);
2310 		break;
2311 	case ASB_DISCONNECT:
2312 		break;
2313 	case ASB_CALL_HELPER:
2314 		hg = drbd_asb_recover_0p(mdev);
2315 		if (hg == -1) {
2316 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2317 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
2318 			  * we do not need to wait for the after state change work either. */
2319 			self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2320 			if (self != SS_SUCCESS) {
2321 				drbd_khelper(mdev, "pri-lost-after-sb");
2322 			} else {
2323 				dev_warn(DEV, "Successfully gave up primary role.\n");
2324 				rv = hg;
2325 			}
2326 		} else
2327 			rv = hg;
2328 	}
2329 
2330 	return rv;
2331 }
2332 
2333 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2334 			   u64 bits, u64 flags)
2335 {
2336 	if (!uuid) {
2337 		dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2338 		return;
2339 	}
2340 	dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2341 	     text,
2342 	     (unsigned long long)uuid[UI_CURRENT],
2343 	     (unsigned long long)uuid[UI_BITMAP],
2344 	     (unsigned long long)uuid[UI_HISTORY_START],
2345 	     (unsigned long long)uuid[UI_HISTORY_END],
2346 	     (unsigned long long)bits,
2347 	     (unsigned long long)flags);
2348 }
2349 
2350 /*
2351   100	after split brain try auto recover
2352     2	C_SYNC_SOURCE set BitMap
2353     1	C_SYNC_SOURCE use BitMap
2354     0	no Sync
2355    -1	C_SYNC_TARGET use BitMap
2356    -2	C_SYNC_TARGET set BitMap
2357  -100	after split brain, disconnect
2358 -1000	unrelated data
-1001	needs agreed protocol >= 91 to resolve
2359  */
2360 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2361 {
2362 	u64 self, peer;
2363 	int i, j;
2364 
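	/* Note: bit 0 of the UUIDs is used as a flag bit (compare the "& 1"
	 * tests in the drbd_asb_recover_*() helpers above), so every value
	 * comparison below masks it out with & ~((u64)1). */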
2365 	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2366 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2367 
2368 	*rule_nr = 10;
2369 	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2370 		return 0;
2371 
2372 	*rule_nr = 20;
2373 	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2374 	     peer != UUID_JUST_CREATED)
2375 		return -2;
2376 
2377 	*rule_nr = 30;
2378 	if (self != UUID_JUST_CREATED &&
2379 	    (peer == UUID_JUST_CREATED || peer == (u64)0))
2380 		return 2;
2381 
2382 	if (self == peer) {
2383 		int rct, dc; /* roles at crash time */
2384 
2385 		if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2386 
2387 			if (mdev->agreed_pro_version < 91)
2388 				return -1001;
2389 
2390 			if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2391 			    (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2392 				dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2393 				drbd_uuid_set_bm(mdev, 0UL);
2394 
2395 				drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2396 					       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2397 				*rule_nr = 34;
2398 			} else {
2399 				dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2400 				*rule_nr = 36;
2401 			}
2402 
2403 			return 1;
2404 		}
2405 
2406 		if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2407 
2408 			if (mdev->agreed_pro_version < 91)
2409 				return -1001;
2410 
2411 			if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2412 			    (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2413 				dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2414 
2415 				mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2416 				mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2417 				mdev->p_uuid[UI_BITMAP] = 0UL;
2418 
2419 				drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2420 				*rule_nr = 35;
2421 			} else {
2422 				dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2423 				*rule_nr = 37;
2424 			}
2425 
2426 			return -1;
2427 		}
2428 
2429 		/* Common power [off|failure] */
2430 		rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2431 			(mdev->p_uuid[UI_FLAGS] & 2);
2432 		/* lowest bit is set when we were primary,
2433 		 * next bit (weight 2) is set when peer was primary */
2434 		*rule_nr = 40;
2435 
2436 		switch (rct) {
2437 		case 0: /* !self_pri && !peer_pri */ return 0;
2438 		case 1: /*  self_pri && !peer_pri */ return 1;
2439 		case 2: /* !self_pri &&  peer_pri */ return -1;
2440 		case 3: /*  self_pri &&  peer_pri */
2441 			dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2442 			return dc ? -1 : 1;
2443 		}
2444 	}
2445 
2446 	*rule_nr = 50;
2447 	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2448 	if (self == peer)
2449 		return -1;
2450 
2451 	*rule_nr = 51;
2452 	peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2453 	if (self == peer) {
2454 		self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2455 		peer = mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1);
2456 		if (self == peer) {
2457 			/* The last P_SYNC_UUID did not get through. Undo the peer's
2458 			   UUID modifications from its last start of resync as sync source. */
2459 
2460 			if (mdev->agreed_pro_version < 91)
2461 				return -1001;
2462 
2463 			mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2464 			mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2465 			return -1;
2466 		}
2467 	}
2468 
2469 	*rule_nr = 60;
2470 	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2471 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2472 		peer = mdev->p_uuid[i] & ~((u64)1);
2473 		if (self == peer)
2474 			return -2;
2475 	}
2476 
2477 	*rule_nr = 70;
2478 	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2479 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2480 	if (self == peer)
2481 		return 1;
2482 
2483 	*rule_nr = 71;
2484 	self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2485 	if (self == peer) {
2486 		self = mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1);
2487 		peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2488 		if (self == peer) {
2489 			/* The last P_SYNC_UUID did not get through. Undo our own
2490 			   UUID modifications from our last start of resync as sync source. */
2491 
2492 			if (mdev->agreed_pro_version < 91)
2493 				return -1001;
2494 
2495 			_drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2496 			_drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2497 
2498 			dev_info(DEV, "Undid last start of resync:\n");
2499 
2500 			drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2501 				       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2502 
2503 			return 1;
2504 		}
2505 	}
2506 
2507 
2508 	*rule_nr = 80;
2509 	peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2510 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2511 		self = mdev->ldev->md.uuid[i] & ~((u64)1);
2512 		if (self == peer)
2513 			return 2;
2514 	}
2515 
2516 	*rule_nr = 90;
2517 	self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2518 	peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2519 	if (self == peer && self != ((u64)0))
2520 		return 100;
2521 
2522 	*rule_nr = 100;
2523 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2524 		self = mdev->ldev->md.uuid[i] & ~((u64)1);
2525 		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2526 			peer = mdev->p_uuid[j] & ~((u64)1);
2527 			if (self == peer)
2528 				return -100;
2529 		}
2530 	}
2531 
2532 	return -1000;
2533 }
2534 
2535 /* drbd_sync_handshake() returns the new conn state on success, or
2536    C_MASK on failure.
2537  */
2538 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2539 					   enum drbd_disk_state peer_disk) __must_hold(local)
2540 {
2541 	int hg, rule_nr;
2542 	enum drbd_conns rv = C_MASK;
2543 	enum drbd_disk_state mydisk;
2544 
2545 	mydisk = mdev->state.disk;
2546 	if (mydisk == D_NEGOTIATING)
2547 		mydisk = mdev->new_state_tmp.disk;
2548 
2549 	dev_info(DEV, "drbd_sync_handshake:\n");
2550 	drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2551 	drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2552 		       mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2553 
2554 	hg = drbd_uuid_compare(mdev, &rule_nr);
2555 
2556 	dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2557 
2558 	if (hg == -1000) {
2559 		dev_alert(DEV, "Unrelated data, aborting!\n");
2560 		return C_MASK;
2561 	}
2562 	if (hg == -1001) {
2563 		dev_alert(DEV, "To resolve this both sides have to support at least protocol 91\n");
2564 		return C_MASK;
2565 	}
2566 
2567 	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2568 	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2569 		int f = (hg == -100) || abs(hg) == 2;
2570 		hg = mydisk > D_INCONSISTENT ? 1 : -1;
2571 		if (f)
2572 			hg = hg*2;
2573 		dev_info(DEV, "Becoming sync %s due to disk states.\n",
2574 		     hg > 0 ? "source" : "target");
2575 	}
2576 
2577 	if (abs(hg) == 100)
2578 		drbd_khelper(mdev, "initial-split-brain");
2579 
2580 	if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2581 		int pcount = (mdev->state.role == R_PRIMARY)
2582 			   + (peer_role == R_PRIMARY);
2583 		int forced = (hg == -100);
2584 
2585 		switch (pcount) {
2586 		case 0:
2587 			hg = drbd_asb_recover_0p(mdev);
2588 			break;
2589 		case 1:
2590 			hg = drbd_asb_recover_1p(mdev);
2591 			break;
2592 		case 2:
2593 			hg = drbd_asb_recover_2p(mdev);
2594 			break;
2595 		}
2596 		if (abs(hg) < 100) {
2597 			dev_warn(DEV, "Split-Brain detected, %d primaries, "
2598 			     "automatically solved. Sync from %s node\n",
2599 			     pcount, (hg < 0) ? "peer" : "this");
2600 			if (forced) {
2601 				dev_warn(DEV, "Doing a full sync, since"
2602 				     " UUIDs were ambiguous.\n");
2603 				hg = hg*2;
2604 			}
2605 		}
2606 	}
2607 
2608 	if (hg == -100) {
2609 		if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2610 			hg = -1;
2611 		if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2612 			hg = 1;
2613 
2614 		if (abs(hg) < 100)
2615 			dev_warn(DEV, "Split-Brain detected, manually solved. "
2616 			     "Sync from %s node\n",
2617 			     (hg < 0) ? "peer" : "this");
2618 	}
2619 
2620 	if (hg == -100) {
2621 		/* FIXME this log message is not correct if we end up here
2622 		 * after an attempted attach on a diskless node.
2623 		 * We just refuse to attach -- well, we drop the "connection"
2624 		 * to that disk, in a way... */
2625 		dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2626 		drbd_khelper(mdev, "split-brain");
2627 		return C_MASK;
2628 	}
2629 
2630 	if (hg > 0 && mydisk <= D_INCONSISTENT) {
2631 		dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2632 		return C_MASK;
2633 	}
2634 
2635 	if (hg < 0 && /* by intention we do not use mydisk here. */
2636 	    mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2637 		switch (mdev->net_conf->rr_conflict) {
2638 		case ASB_CALL_HELPER:
2639 			drbd_khelper(mdev, "pri-lost");
2640 			/* fall through */
2641 		case ASB_DISCONNECT:
2642 			dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2643 			return C_MASK;
2644 		case ASB_VIOLENTLY:
2645 			dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2646 			     " assumption\n");
2647 		}
2648 	}
2649 
2650 	if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2651 		if (hg == 0)
2652 			dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2653 		else
2654 			dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
2655 				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2656 				 abs(hg) >= 2 ? "full" : "bit-map based");
2657 		return C_MASK;
2658 	}
2659 
2660 	if (abs(hg) >= 2) {
2661 		dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2662 		if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake"))
2663 			return C_MASK;
2664 	}
2665 
2666 	if (hg > 0) { /* become sync source. */
2667 		rv = C_WF_BITMAP_S;
2668 	} else if (hg < 0) { /* become sync target */
2669 		rv = C_WF_BITMAP_T;
2670 	} else {
2671 		rv = C_CONNECTED;
2672 		if (drbd_bm_total_weight(mdev)) {
2673 			dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2674 			     drbd_bm_total_weight(mdev));
2675 		}
2676 	}
2677 
2678 	return rv;
2679 }
2680 
2681 /* returns 1 if invalid */
2682 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2683 {
2684 	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2685 	if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2686 	    (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2687 		return 0;
2688 
2689 	/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2690 	if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2691 	    self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2692 		return 1;
2693 
2694 	/* everything else is valid if they are equal on both sides. */
2695 	if (peer == self)
2696 		return 0;
2697 
2698 	/* everything else is invalid. */
2699 	return 1;
2700 }
2701 
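/* P_PROTOCOL: verify that both nodes agree on the wire protocol, the
 * after-split-brain policies, the two-primaries setting and (for protocol
 * versions >= 87) the data-integrity-alg; force a disconnect on any
 * mismatch. */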
2702 static int receive_protocol(struct drbd_conf *mdev, struct p_header *h)
2703 {
2704 	struct p_protocol *p = (struct p_protocol *)h;
2705 	int header_size, data_size;
2706 	int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2707 	int p_want_lose, p_two_primaries, cf;
2708 	char p_integrity_alg[SHARED_SECRET_MAX] = "";
2709 
2710 	header_size = sizeof(*p) - sizeof(*h);
2711 	data_size   = h->length  - header_size;
2712 
2713 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
2714 		return FALSE;
2715 
2716 	p_proto		= be32_to_cpu(p->protocol);
2717 	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
2718 	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
2719 	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
2720 	p_two_primaries = be32_to_cpu(p->two_primaries);
2721 	cf		= be32_to_cpu(p->conn_flags);
2722 	p_want_lose = cf & CF_WANT_LOSE;
2723 
2724 	clear_bit(CONN_DRY_RUN, &mdev->flags);
2725 
2726 	if (cf & CF_DRY_RUN)
2727 		set_bit(CONN_DRY_RUN, &mdev->flags);
2728 
2729 	if (p_proto != mdev->net_conf->wire_protocol) {
2730 		dev_err(DEV, "incompatible communication protocols\n");
2731 		goto disconnect;
2732 	}
2733 
2734 	if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2735 		dev_err(DEV, "incompatible after-sb-0pri settings\n");
2736 		goto disconnect;
2737 	}
2738 
2739 	if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2740 		dev_err(DEV, "incompatible after-sb-1pri settings\n");
2741 		goto disconnect;
2742 	}
2743 
2744 	if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2745 		dev_err(DEV, "incompatible after-sb-2pri settings\n");
2746 		goto disconnect;
2747 	}
2748 
2749 	if (p_want_lose && mdev->net_conf->want_lose) {
2750 		dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2751 		goto disconnect;
2752 	}
2753 
2754 	if (p_two_primaries != mdev->net_conf->two_primaries) {
2755 		dev_err(DEV, "incompatible setting of the two-primaries options\n");
2756 		goto disconnect;
2757 	}
2758 
2759 	if (mdev->agreed_pro_version >= 87) {
2760 		unsigned char *my_alg = mdev->net_conf->integrity_alg;
2761 
2762 		if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2763 			return FALSE;
2764 
2765 		p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2766 		if (strcmp(p_integrity_alg, my_alg)) {
2767 			dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2768 			goto disconnect;
2769 		}
2770 		dev_info(DEV, "data-integrity-alg: %s\n",
2771 		     my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2772 	}
2773 
2774 	return TRUE;
2775 
2776 disconnect:
2777 	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2778 	return FALSE;
2779 }
2780 
2781 /* helper function
2782  * input: alg name, feature name
2783  * return: NULL (alg name was "")
2784  *         ERR_PTR(error) if something goes wrong
2785  *         or the crypto hash ptr, if it worked out ok. */
2786 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2787 		const char *alg, const char *name)
2788 {
2789 	struct crypto_hash *tfm;
2790 
2791 	if (!alg[0])
2792 		return NULL;
2793 
2794 	tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2795 	if (IS_ERR(tfm)) {
2796 		dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2797 			alg, name, PTR_ERR(tfm));
2798 		return tfm;
2799 	}
2800 	if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2801 		crypto_free_hash(tfm);
2802 		dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2803 		return ERR_PTR(-EINVAL);
2804 	}
2805 	return tfm;
2806 }
2807 
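/* P_SYNC_PARAM(_89): take over the peer's sync rate and, depending on the
 * agreed protocol version, the verify-alg (>= 88) and csums-alg (>= 89)
 * settings, allocating the corresponding hash transforms. */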
2808 static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h)
2809 {
2810 	int ok = TRUE;
2811 	struct p_rs_param_89 *p = (struct p_rs_param_89 *)h;
2812 	unsigned int header_size, data_size, exp_max_sz;
2813 	struct crypto_hash *verify_tfm = NULL;
2814 	struct crypto_hash *csums_tfm = NULL;
2815 	const int apv = mdev->agreed_pro_version;
2816 
2817 	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
2818 		    : apv == 88 ? sizeof(struct p_rs_param)
2819 					+ SHARED_SECRET_MAX
2820 		    : /* 89 */    sizeof(struct p_rs_param_89);
2821 
2822 	if (h->length > exp_max_sz) {
2823 		dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2824 		    h->length, exp_max_sz);
2825 		return FALSE;
2826 	}
2827 
2828 	if (apv <= 88) {
2829 		header_size = sizeof(struct p_rs_param) - sizeof(*h);
2830 		data_size   = h->length  - header_size;
2831 	} else /* apv >= 89 */ {
2832 		header_size = sizeof(struct p_rs_param_89) - sizeof(*h);
2833 		data_size   = h->length  - header_size;
2834 		D_ASSERT(data_size == 0);
2835 	}
2836 
2837 	/* initialize verify_alg and csums_alg */
2838 	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2839 
2840 	if (drbd_recv(mdev, h->payload, header_size) != header_size)
2841 		return FALSE;
2842 
2843 	mdev->sync_conf.rate	  = be32_to_cpu(p->rate);
2844 
2845 	if (apv >= 88) {
2846 		if (apv == 88) {
2847 			if (data_size > SHARED_SECRET_MAX) {
2848 				dev_err(DEV, "verify-alg too long, "
2849 				    "peer wants %u, accepting only %u bytes\n",
2850 						data_size, SHARED_SECRET_MAX);
2851 				return FALSE;
2852 			}
2853 
2854 			if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2855 				return FALSE;
2856 
2857 			/* we expect NUL terminated string */
2858 			/* but just in case someone tries to be evil */
2859 			D_ASSERT(p->verify_alg[data_size-1] == 0);
2860 			p->verify_alg[data_size-1] = 0;
2861 
2862 		} else /* apv >= 89 */ {
2863 			/* we still expect NUL terminated strings */
2864 			/* but just in case someone tries to be evil */
2865 			D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2866 			D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2867 			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2868 			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2869 		}
2870 
2871 		if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2872 			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2873 				dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2874 				    mdev->sync_conf.verify_alg, p->verify_alg);
2875 				goto disconnect;
2876 			}
2877 			verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2878 					p->verify_alg, "verify-alg");
2879 			if (IS_ERR(verify_tfm)) {
2880 				verify_tfm = NULL;
2881 				goto disconnect;
2882 			}
2883 		}
2884 
2885 		if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2886 			if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2887 				dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2888 				    mdev->sync_conf.csums_alg, p->csums_alg);
2889 				goto disconnect;
2890 			}
2891 			csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2892 					p->csums_alg, "csums-alg");
2893 			if (IS_ERR(csums_tfm)) {
2894 				csums_tfm = NULL;
2895 				goto disconnect;
2896 			}
2897 		}
2898 
2899 
2900 		spin_lock(&mdev->peer_seq_lock);
2901 		/* lock against drbd_nl_syncer_conf() */
2902 		if (verify_tfm) {
2903 			strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2904 			mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2905 			crypto_free_hash(mdev->verify_tfm);
2906 			mdev->verify_tfm = verify_tfm;
2907 			dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2908 		}
2909 		if (csums_tfm) {
2910 			strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2911 			mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2912 			crypto_free_hash(mdev->csums_tfm);
2913 			mdev->csums_tfm = csums_tfm;
2914 			dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2915 		}
2916 		spin_unlock(&mdev->peer_seq_lock);
2917 	}
2918 
2919 	return ok;
2920 disconnect:
2921 	/* just for completeness: actually not needed,
2922 	 * as this is not reached if csums_tfm was ok. */
2923 	crypto_free_hash(csums_tfm);
2924 	/* but free the verify_tfm again, if csums_tfm did not work out */
2925 	crypto_free_hash(verify_tfm);
2926 	drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2927 	return FALSE;
2928 }
2929 
2930 static void drbd_setup_order_type(struct drbd_conf *mdev, int peer)
2931 {
2932 	/* sorry, we currently have no working implementation
2933 	 * of distributed TCQ */
2934 }
2935 
2936 /* warn if the arguments differ by more than 12.5% */
2937 static void warn_if_differ_considerably(struct drbd_conf *mdev,
2938 	const char *s, sector_t a, sector_t b)
2939 {
2940 	sector_t d;
2941 	if (a == 0 || b == 0)
2942 		return;
2943 	d = (a > b) ? (a - b) : (b - a);
2944 	if (d > (a>>3) || d > (b>>3))
2945 		dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2946 		     (unsigned long long)a, (unsigned long long)b);
2947 }
2948 
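/* P_SIZES: the peer reports its backing device size, user configured size
 * and current capacity.  Negotiate the resulting device size, adjust our
 * request queue limits, and kick off a resync of the new area if we just
 * grew while connected (unless --assume-clean was given). */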
2949 static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
2950 {
2951 	struct p_sizes *p = (struct p_sizes *)h;
2952 	enum determine_dev_size dd = unchanged;
2953 	unsigned int max_seg_s;
2954 	sector_t p_size, p_usize, my_usize;
2955 	int ldsc = 0; /* local disk size changed */
2956 	enum dds_flags ddsf;
2957 
2958 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2959 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
2960 		return FALSE;
2961 
2962 	p_size = be64_to_cpu(p->d_size);
2963 	p_usize = be64_to_cpu(p->u_size);
2964 
2965 	if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2966 		dev_err(DEV, "some backing storage is needed\n");
2967 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2968 		return FALSE;
2969 	}
2970 
2971 	/* just store the peer's disk size for now.
2972 	 * we still need to figure out whether we accept that. */
2973 	mdev->p_size = p_size;
2974 
2975 #define min_not_zero(l, r) ((l) == 0 ? (r) : ((r) == 0 ? (l) : min(l, r)))
2976 	if (get_ldev(mdev)) {
2977 		warn_if_differ_considerably(mdev, "lower level device sizes",
2978 			   p_size, drbd_get_max_capacity(mdev->ldev));
2979 		warn_if_differ_considerably(mdev, "user requested size",
2980 					    p_usize, mdev->ldev->dc.disk_size);
2981 
2982 		/* if this is the first connect, or an otherwise expected
2983 		 * param exchange, choose the minimum */
2984 		if (mdev->state.conn == C_WF_REPORT_PARAMS)
2985 			p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2986 					     p_usize);
2987 
2988 		my_usize = mdev->ldev->dc.disk_size;
2989 
2990 		if (mdev->ldev->dc.disk_size != p_usize) {
2991 			mdev->ldev->dc.disk_size = p_usize;
2992 			dev_info(DEV, "Peer sets u_size to %lu sectors\n",
2993 			     (unsigned long)mdev->ldev->dc.disk_size);
2994 		}
2995 
2996 		/* Never shrink a device with usable data during connect.
2997 		   But allow online shrinking if we are connected. */
2998 		if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
2999 		   drbd_get_capacity(mdev->this_bdev) &&
3000 		   mdev->state.disk >= D_OUTDATED &&
3001 		   mdev->state.conn < C_CONNECTED) {
3002 			dev_err(DEV, "The peer's disk size is too small!\n");
3003 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3004 			mdev->ldev->dc.disk_size = my_usize;
3005 			put_ldev(mdev);
3006 			return FALSE;
3007 		}
3008 		put_ldev(mdev);
3009 	}
3010 #undef min_not_zero
3011 
3012 	ddsf = be16_to_cpu(p->dds_flags);
3013 	if (get_ldev(mdev)) {
3014 		dd = drbd_determin_dev_size(mdev, ddsf);
3015 		put_ldev(mdev);
3016 		if (dd == dev_size_error)
3017 			return FALSE;
3018 		drbd_md_sync(mdev);
3019 	} else {
3020 		/* I am diskless, need to accept the peer's size. */
3021 		drbd_set_my_capacity(mdev, p_size);
3022 	}
3023 
3024 	if (get_ldev(mdev)) {
3025 		if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3026 			mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3027 			ldsc = 1;
3028 		}
3029 
3030 		if (mdev->agreed_pro_version < 94)
3031 			max_seg_s = be32_to_cpu(p->max_segment_size);
3032 		else /* drbd 8.3.8 onwards */
3033 			max_seg_s = DRBD_MAX_SEGMENT_SIZE;
3034 
3035 		if (max_seg_s != queue_max_segment_size(mdev->rq_queue))
3036 			drbd_setup_queue_param(mdev, max_seg_s);
3037 
3038 		drbd_setup_order_type(mdev, be16_to_cpu(p->queue_order_type));
3039 		put_ldev(mdev);
3040 	}
3041 
3042 	if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3043 		if (be64_to_cpu(p->c_size) !=
3044 		    drbd_get_capacity(mdev->this_bdev) || ldsc) {
3045 			/* we have different sizes, probably peer
3046 			 * needs to know my new size... */
3047 			drbd_send_sizes(mdev, 0, ddsf);
3048 		}
3049 		if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3050 		    (dd == grew && mdev->state.conn == C_CONNECTED)) {
3051 			if (mdev->state.pdsk >= D_INCONSISTENT &&
3052 			    mdev->state.disk >= D_INCONSISTENT) {
3053 				if (ddsf & DDSF_NO_RESYNC)
3054 					dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3055 				else
3056 					resync_after_online_grow(mdev);
3057 			} else
3058 				set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3059 		}
3060 	}
3061 
3062 	return TRUE;
3063 }
3064 
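/* P_UUIDS: store the peer's UUID set for the handshake; the initial full
 * sync may be skipped when the current UUID indicates a freshly created
 * device on both sides. */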
3065 static int receive_uuids(struct drbd_conf *mdev, struct p_header *h)
3066 {
3067 	struct p_uuids *p = (struct p_uuids *)h;
3068 	u64 *p_uuid;
3069 	int i;
3070 
3071 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3072 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
3073 		return FALSE;
3074 
3075 	p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
	if (!p_uuid)
		return FALSE;
3076 
3077 	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3078 		p_uuid[i] = be64_to_cpu(p->uuid[i]);
3079 
3080 	kfree(mdev->p_uuid);
3081 	mdev->p_uuid = p_uuid;
3082 
3083 	if (mdev->state.conn < C_CONNECTED &&
3084 	    mdev->state.disk < D_INCONSISTENT &&
3085 	    mdev->state.role == R_PRIMARY &&
3086 	    (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3087 		dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3088 		    (unsigned long long)mdev->ed_uuid);
3089 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3090 		return FALSE;
3091 	}
3092 
3093 	if (get_ldev(mdev)) {
3094 		int skip_initial_sync =
3095 			mdev->state.conn == C_CONNECTED &&
3096 			mdev->agreed_pro_version >= 90 &&
3097 			mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3098 			(p_uuid[UI_FLAGS] & 8);
3099 		if (skip_initial_sync) {
3100 			dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3101 			drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3102 					"clear_n_write from receive_uuids");
3103 			_drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3104 			_drbd_uuid_set(mdev, UI_BITMAP, 0);
3105 			_drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3106 					CS_VERBOSE, NULL);
3107 			drbd_md_sync(mdev);
3108 		}
3109 		put_ldev(mdev);
3110 	}
3111 
3112 	/* Before we test the disk state, wait until any ongoing cluster-wide
3113 	   state change has finished. That is important if we are primary and
3114 	   are detaching from our disk. We need to see the
3115 	   new disk state... */
3116 	wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3117 	if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3118 		drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3119 
3120 	return TRUE;
3121 }
3122 
3123 /**
3124  * convert_state() - Converts the peer's view of the cluster state to our point of view
3125  * @ps:		The state as seen by the peer.
3126  */
3127 static union drbd_state convert_state(union drbd_state ps)
3128 {
3129 	union drbd_state ms;
3130 
3131 	static enum drbd_conns c_tab[] = {
3132 		[C_CONNECTED] = C_CONNECTED,
3133 
3134 		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3135 		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3136 		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3137 		[C_VERIFY_S]       = C_VERIFY_T,
3138 		[C_MASK]   = C_MASK,
3139 	};
3140 
3141 	ms.i = ps.i;
3142 
3143 	ms.conn = c_tab[ps.conn];
3144 	ms.peer = ps.role;
3145 	ms.role = ps.peer;
3146 	ms.pdsk = ps.disk;
3147 	ms.disk = ps.pdsk;
3148 	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3149 
3150 	return ms;
3151 }
3152 
3153 static int receive_req_state(struct drbd_conf *mdev, struct p_header *h)
3154 {
3155 	struct p_req_state *p = (struct p_req_state *)h;
3156 	union drbd_state mask, val;
3157 	int rv;
3158 
3159 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3160 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
3161 		return FALSE;
3162 
3163 	mask.i = be32_to_cpu(p->mask);
3164 	val.i = be32_to_cpu(p->val);
3165 
3166 	if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3167 	    test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3168 		drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3169 		return TRUE;
3170 	}
3171 
3172 	mask = convert_state(mask);
3173 	val = convert_state(val);
3174 
3175 	rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3176 
3177 	drbd_send_sr_reply(mdev, rv);
3178 	drbd_md_sync(mdev);
3179 
3180 	return TRUE;
3181 }
3182 
3183 static int receive_state(struct drbd_conf *mdev, struct p_header *h)
3184 {
3185 	struct p_state *p = (struct p_state *)h;
3186 	enum drbd_conns nconn, oconn;
3187 	union drbd_state ns, peer_state;
3188 	enum drbd_disk_state real_peer_disk;
3189 	int rv;
3190 
3191 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h)))
3192 		return FALSE;
3193 
3194 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
3195 		return FALSE;
3196 
3197 	peer_state.i = be32_to_cpu(p->state);
3198 
3199 	real_peer_disk = peer_state.disk;
3200 	if (peer_state.disk == D_NEGOTIATING) {
3201 		real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3202 		dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3203 	}
3204 
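	/* drbd_sync_handshake() below may sleep, and req_lock is dropped while
	 * it runs; if the connection state changed in the meantime we start
	 * over (see the retry label and the re-check further down). */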
3205 	spin_lock_irq(&mdev->req_lock);
3206  retry:
3207 	oconn = nconn = mdev->state.conn;
3208 	spin_unlock_irq(&mdev->req_lock);
3209 
3210 	if (nconn == C_WF_REPORT_PARAMS)
3211 		nconn = C_CONNECTED;
3212 
3213 	if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3214 	    get_ldev_if_state(mdev, D_NEGOTIATING)) {
3215 		int cr; /* consider resync */
3216 
3217 		/* if we established a new connection */
3218 		cr  = (oconn < C_CONNECTED);
3219 		/* if we had an established connection
3220 		 * and one of the nodes newly attaches a disk */
3221 		cr |= (oconn == C_CONNECTED &&
3222 		       (peer_state.disk == D_NEGOTIATING ||
3223 			mdev->state.disk == D_NEGOTIATING));
3224 		/* if we have both been inconsistent, and the peer has been
3225 		 * forced to be UpToDate with --overwrite-data */
3226 		cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3227 		/* if we had been plain connected, and the admin requested to
3228 		 * start a sync by "invalidate" or "invalidate-remote" */
3229 		cr |= (oconn == C_CONNECTED &&
3230 				(peer_state.conn >= C_STARTING_SYNC_S &&
3231 				 peer_state.conn <= C_WF_BITMAP_T));
3232 
3233 		if (cr)
3234 			nconn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3235 
3236 		put_ldev(mdev);
3237 		if (nconn == C_MASK) {
3238 			nconn = C_CONNECTED;
3239 			if (mdev->state.disk == D_NEGOTIATING) {
3240 				drbd_force_state(mdev, NS(disk, D_DISKLESS));
3241 			} else if (peer_state.disk == D_NEGOTIATING) {
3242 				dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3243 				peer_state.disk = D_DISKLESS;
3244 				real_peer_disk = D_DISKLESS;
3245 			} else {
3246 				if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3247 					return FALSE;
3248 				D_ASSERT(oconn == C_WF_REPORT_PARAMS);
3249 				drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3250 				return FALSE;
3251 			}
3252 		}
3253 	}
3254 
3255 	spin_lock_irq(&mdev->req_lock);
3256 	if (mdev->state.conn != oconn)
3257 		goto retry;
3258 	clear_bit(CONSIDER_RESYNC, &mdev->flags);
3259 	ns.i = mdev->state.i;
3260 	ns.conn = nconn;
3261 	ns.peer = peer_state.role;
3262 	ns.pdsk = real_peer_disk;
3263 	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3264 	if ((nconn == C_CONNECTED || nconn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3265 		ns.disk = mdev->new_state_tmp.disk;
3266 
3267 	rv = _drbd_set_state(mdev, ns, CS_VERBOSE | CS_HARD, NULL);
3268 	ns = mdev->state;
3269 	spin_unlock_irq(&mdev->req_lock);
3270 
3271 	if (rv < SS_SUCCESS) {
3272 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3273 		return FALSE;
3274 	}
3275 
3276 	if (oconn > C_WF_REPORT_PARAMS) {
3277 		if (nconn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3278 		    peer_state.disk != D_NEGOTIATING ) {
3279 			/* we want resync, peer has not yet decided to sync... */
3280 			/* Nowadays only used when forcing a node into primary role and
3281 			   setting its disk to UpToDate with that */
3282 			drbd_send_uuids(mdev);
3283 			drbd_send_state(mdev);
3284 		}
3285 	}
3286 
3287 	mdev->net_conf->want_lose = 0;
3288 
3289 	drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3290 
3291 	return TRUE;
3292 }
3293 
3294 static int receive_sync_uuid(struct drbd_conf *mdev, struct p_header *h)
3295 {
3296 	struct p_rs_uuid *p = (struct p_rs_uuid *)h;
3297 
3298 	wait_event(mdev->misc_wait,
3299 		   mdev->state.conn == C_WF_SYNC_UUID ||
3300 		   mdev->state.conn < C_CONNECTED ||
3301 		   mdev->state.disk < D_NEGOTIATING);
3302 
3303 	/* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3304 
3305 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3306 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
3307 		return FALSE;
3308 
3309 	/* Here the _drbd_uuid_ functions are right, current should
3310 	   _not_ be rotated into the history */
3311 	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3312 		_drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3313 		_drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3314 
3315 		drbd_start_resync(mdev, C_SYNC_TARGET);
3316 
3317 		put_ldev(mdev);
3318 	} else
3319 		dev_err(DEV, "Ignoring SyncUUID packet!\n");
3320 
3321 	return TRUE;
3322 }
3323 
3324 enum receive_bitmap_ret { OK, DONE, FAILED };
3325 
3326 static enum receive_bitmap_ret
3327 receive_bitmap_plain(struct drbd_conf *mdev, struct p_header *h,
3328 	unsigned long *buffer, struct bm_xfer_ctx *c)
3329 {
3330 	unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3331 	unsigned want = num_words * sizeof(long);
3332 
3333 	if (want != h->length) {
3334 		dev_err(DEV, "%s:want (%u) != h->length (%u)\n", __func__, want, h->length);
3335 		return FAILED;
3336 	}
3337 	if (want == 0)
3338 		return DONE;
3339 	if (drbd_recv(mdev, buffer, want) != want)
3340 		return FAILED;
3341 
3342 	drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3343 
3344 	c->word_offset += num_words;
3345 	c->bit_offset = c->word_offset * BITS_PER_LONG;
3346 	if (c->bit_offset > c->bm_bits)
3347 		c->bit_offset = c->bm_bits;
3348 
3349 	return OK;
3350 }
3351 
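/* Decode one compressed (run-length + VLI encoded) bitmap chunk into the
 * local bitmap.  Runs alternate between "bits clear" and "bits set",
 * starting with DCBP_get_start(p); only the "set" runs are applied via
 * _drbd_bm_set_bits(). */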
3352 static enum receive_bitmap_ret
3353 recv_bm_rle_bits(struct drbd_conf *mdev,
3354 		struct p_compressed_bm *p,
3355 		struct bm_xfer_ctx *c)
3356 {
3357 	struct bitstream bs;
3358 	u64 look_ahead;
3359 	u64 rl;
3360 	u64 tmp;
3361 	unsigned long s = c->bit_offset;
3362 	unsigned long e;
3363 	int len = p->head.length - (sizeof(*p) - sizeof(p->head));
3364 	int toggle = DCBP_get_start(p);
3365 	int have;
3366 	int bits;
3367 
3368 	bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3369 
3370 	bits = bitstream_get_bits(&bs, &look_ahead, 64);
3371 	if (bits < 0)
3372 		return FAILED;
3373 
3374 	for (have = bits; have > 0; s += rl, toggle = !toggle) {
3375 		bits = vli_decode_bits(&rl, look_ahead);
3376 		if (bits <= 0)
3377 			return FAILED;
3378 
3379 		if (toggle) {
3380 			e = s + rl -1;
3381 			if (e >= c->bm_bits) {
3382 				dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3383 				return FAILED;
3384 			}
3385 			_drbd_bm_set_bits(mdev, s, e);
3386 		}
3387 
3388 		if (have < bits) {
3389 			dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3390 				have, bits, look_ahead,
3391 				(unsigned int)(bs.cur.b - p->code),
3392 				(unsigned int)bs.buf_len);
3393 			return FAILED;
3394 		}
3395 		look_ahead >>= bits;
3396 		have -= bits;
3397 
3398 		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3399 		if (bits < 0)
3400 			return FAILED;
3401 		look_ahead |= tmp << have;
3402 		have += bits;
3403 	}
3404 
3405 	c->bit_offset = s;
3406 	bm_xfer_ctx_bit_to_word_offset(c);
3407 
3408 	return (s == c->bm_bits) ? DONE : OK;
3409 }
3410 
3411 static enum receive_bitmap_ret
3412 decode_bitmap_c(struct drbd_conf *mdev,
3413 		struct p_compressed_bm *p,
3414 		struct bm_xfer_ctx *c)
3415 {
3416 	if (DCBP_get_code(p) == RLE_VLI_Bits)
3417 		return recv_bm_rle_bits(mdev, p, c);
3418 
3419 	/* other variants had been implemented for evaluation,
3420 	 * but have been dropped as this one turned out to be "best"
3421 	 * during all our tests. */
3422 
3423 	dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3424 	drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3425 	return FAILED;
3426 }
3427 
3428 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3429 		const char *direction, struct bm_xfer_ctx *c)
3430 {
3431 	/* what would it take to transfer it "plaintext" */
3432 	unsigned plain = sizeof(struct p_header) *
3433 		((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3434 		+ c->bm_words * sizeof(long);
3435 	unsigned total = c->bytes[0] + c->bytes[1];
3436 	unsigned r;
3437 
3438 	/* total cannot be zero, but just in case: */
3439 	if (total == 0)
3440 		return;
3441 
3442 	/* don't report if not compressed */
3443 	if (total >= plain)
3444 		return;
3445 
3446 	/* total < plain. check for overflow, still */
3447 	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3448 		                    : (1000 * total / plain);
3449 
3450 	if (r > 1000)
3451 		r = 1000;
3452 
3453 	r = 1000 - r;
3454 	dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3455 	     "total %u; compression: %u.%u%%\n",
3456 			direction,
3457 			c->bytes[1], c->packets[1],
3458 			c->bytes[0], c->packets[0],
3459 			total, r/10, r % 10);
3460 }
3461 
3462 /* Since we are processing the bitfield from lower addresses to higher,
3463    it does not matter whether we process it in 32 bit chunks or 64 bit
3464    chunks as long as it is little endian. (Understand it as a byte stream,
3465    beginning with the lowest byte...) If we used big endian,
3466    we would need to process it from the highest address to the lowest,
3467    in order to be agnostic to the 32 vs 64 bit issue.
3468 
3469    returns 0 on failure, 1 if we successfully received it. */
3470 static int receive_bitmap(struct drbd_conf *mdev, struct p_header *h)
3471 {
3472 	struct bm_xfer_ctx c;
3473 	void *buffer;
3474 	enum receive_bitmap_ret ret;
3475 	int ok = FALSE;
3476 
3477 	wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
3478 
3479 	drbd_bm_lock(mdev, "receive bitmap");
3480 
3481 	/* maybe we should use some per thread scratch page,
3482 	 * and allocate that during initial device creation? */
3483 	buffer	 = (unsigned long *) __get_free_page(GFP_NOIO);
3484 	if (!buffer) {
3485 		dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3486 		goto out;
3487 	}
3488 
3489 	c = (struct bm_xfer_ctx) {
3490 		.bm_bits = drbd_bm_bits(mdev),
3491 		.bm_words = drbd_bm_words(mdev),
3492 	};
3493 
3494 	do {
3495 		if (h->command == P_BITMAP) {
3496 			ret = receive_bitmap_plain(mdev, h, buffer, &c);
3497 		} else if (h->command == P_COMPRESSED_BITMAP) {
3498 			/* MAYBE: sanity check that we speak proto >= 90,
3499 			 * and the feature is enabled! */
3500 			struct p_compressed_bm *p;
3501 
3502 			if (h->length > BM_PACKET_PAYLOAD_BYTES) {
3503 				dev_err(DEV, "ReportCBitmap packet too large\n");
3504 				goto out;
3505 			}
3506 			/* use the page buff */
3507 			p = buffer;
3508 			memcpy(p, h, sizeof(*h));
3509 			if (drbd_recv(mdev, p->head.payload, h->length) != h->length)
3510 				goto out;
3511 			if (p->head.length <= (sizeof(*p) - sizeof(p->head))) {
3512 				dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", p->head.length);
3513 				goto out;
3514 			}
3515 			ret = decode_bitmap_c(mdev, p, &c);
3516 		} else {
3517 			dev_warn(DEV, "receive_bitmap: h->command neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", h->command);
3518 			goto out;
3519 		}
3520 
3521 		c.packets[h->command == P_BITMAP]++;
3522 		c.bytes[h->command == P_BITMAP] += sizeof(struct p_header) + h->length;
3523 
3524 		if (ret != OK)
3525 			break;
3526 
3527 		if (!drbd_recv_header(mdev, h))
3528 			goto out;
3529 	} while (ret == OK);
3530 	if (ret == FAILED)
3531 		goto out;
3532 
3533 	INFO_bm_xfer_stats(mdev, "receive", &c);
3534 
3535 	if (mdev->state.conn == C_WF_BITMAP_T) {
3536 		ok = !drbd_send_bitmap(mdev);
3537 		if (!ok)
3538 			goto out;
3539 		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3540 		ok = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3541 		D_ASSERT(ok == SS_SUCCESS);
3542 	} else if (mdev->state.conn != C_WF_BITMAP_S) {
3543 		/* admin may have requested C_DISCONNECTING,
3544 		 * other threads may have noticed network errors */
3545 		dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3546 		    drbd_conn_str(mdev->state.conn));
3547 	}
3548 
3549 	ok = TRUE;
3550  out:
3551 	drbd_bm_unlock(mdev);
3552 	if (ok && mdev->state.conn == C_WF_BITMAP_S)
3553 		drbd_start_resync(mdev, C_SYNC_SOURCE);
3554 	free_page((unsigned long) buffer);
3555 	return ok;
3556 }
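
/*
 * Illustrative sketch, not part of the driver: the chunk size agnosticism
 * described in the comment above receive_bitmap().  Viewed as a little
 * endian byte stream, bit N of the bitmap always lives in byte N/8 at bit
 * position N%8, no matter whether the words were copied around in 32 bit
 * or 64 bit chunks.
 */
static int __maybe_unused bm_stream_test_bit_le(const u8 *stream, unsigned long n)
{
	return (stream[n / 8] >> (n % 8)) & 1;
}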
3557 
3558 static int receive_skip(struct drbd_conf *mdev, struct p_header *h)
3559 {
3560 	/* TODO zero copy sink :) */
3561 	static char sink[128];
3562 	int size, want, r;
3563 
3564 	dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3565 	     h->command, h->length);
3566 
3567 	size = h->length;
3568 	while (size > 0) {
3569 		want = min_t(int, size, sizeof(sink));
3570 		r = drbd_recv(mdev, sink, want);
3571 		ERR_IF(r <= 0) break;
3572 		size -= r;
3573 	}
3574 	return size == 0;
3575 }
3576 
3577 static int receive_UnplugRemote(struct drbd_conf *mdev, struct p_header *h)
3578 {
3579 	if (mdev->state.disk >= D_INCONSISTENT)
3580 		drbd_kick_lo(mdev);
3581 
3582 	/* Make sure we've acked all the TCP data associated
3583 	 * with the data requests being unplugged */
3584 	drbd_tcp_quickack(mdev->data.socket);
3585 
3586 	return TRUE;
3587 }
3588 
3589 static void timeval_sub_us(struct timeval* tv, unsigned int us)
3590 {
3591 	tv->tv_sec -= us / 1000000;
3592 	us = us % 1000000;
3593 	if (tv->tv_usec < us) {
3594 		tv->tv_usec += 1000000;
3595 		tv->tv_sec--;
3596 	}
3597 	tv->tv_usec -= us;
3598 }
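
/*
 * Illustrative sketch, not part of the driver: the borrow case of
 * timeval_sub_us() above.  Subtracting 300us from 5s + 100us borrows one
 * second and yields 4s + 999800us, keeping tv_usec within [0, 1000000).
 */
static void __maybe_unused timeval_sub_us_example(void)
{
	struct timeval tv = { .tv_sec = 5, .tv_usec = 100 };

	timeval_sub_us(&tv, 300);
	/* here tv.tv_sec == 4 and tv.tv_usec == 999800 */
}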
3599 
3600 static void got_delay_probe(struct drbd_conf *mdev, int from, struct p_delay_probe *p)
3601 {
3602 	struct delay_probe *dp;
3603 	struct list_head *le;
3604 	struct timeval now;
3605 	int seq_num;
3606 	int offset;
3607 	int data_delay;
3608 
3609 	seq_num = be32_to_cpu(p->seq_num);
3610 	offset  = be32_to_cpu(p->offset);
3611 
3612 	spin_lock(&mdev->peer_seq_lock);
3613 	if (!list_empty(&mdev->delay_probes)) {
3614 		if (from == USE_DATA_SOCKET)
3615 			le = mdev->delay_probes.next;
3616 		else
3617 			le = mdev->delay_probes.prev;
3618 
3619 		dp = list_entry(le, struct delay_probe, list);
3620 
3621 		if (dp->seq_num == seq_num) {
3622 			list_del(le);
3623 			spin_unlock(&mdev->peer_seq_lock);
3624 			do_gettimeofday(&now);
3625 			timeval_sub_us(&now, offset);
3626 			data_delay =
3627 				now.tv_usec - dp->time.tv_usec +
3628 				(now.tv_sec - dp->time.tv_sec) * 1000000;
3629 
3630 			if (data_delay > 0)
3631 				mdev->data_delay = data_delay;
3632 
3633 			kfree(dp);
3634 			return;
3635 		}
3636 
3637 		if (dp->seq_num > seq_num) {
3638 			spin_unlock(&mdev->peer_seq_lock);
3639 			dev_warn(DEV, "Previous allocation failure of struct delay_probe?\n");
3640 			return; /* Do not allocate a struct delay_probe... */
3641 		}
3642 	}
3643 	spin_unlock(&mdev->peer_seq_lock);
3644 
3645 	dp = kmalloc(sizeof(struct delay_probe), GFP_NOIO);
3646 	if (!dp) {
3647 		dev_warn(DEV, "Failed to allocate a struct delay_probe, do not worry.\n");
3648 		return;
3649 	}
3650 
3651 	dp->seq_num = seq_num;
3652 	do_gettimeofday(&dp->time);
3653 	timeval_sub_us(&dp->time, offset);
3654 
3655 	spin_lock(&mdev->peer_seq_lock);
3656 	if (from == USE_DATA_SOCKET)
3657 		list_add(&dp->list, &mdev->delay_probes);
3658 	else
3659 		list_add_tail(&dp->list, &mdev->delay_probes);
3660 	spin_unlock(&mdev->peer_seq_lock);
3661 }
3662 
3663 static int receive_delay_probe(struct drbd_conf *mdev, struct p_header *h)
3664 {
3665 	struct p_delay_probe *p = (struct p_delay_probe *)h;
3666 
3667 	ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
3668 	if (drbd_recv(mdev, h->payload, h->length) != h->length)
3669 		return FALSE;
3670 
3671 	got_delay_probe(mdev, USE_DATA_SOCKET, p);
3672 	return TRUE;
3673 }
3674 
3675 typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, struct p_header *);
3676 
3677 static drbd_cmd_handler_f drbd_default_handler[] = {
3678 	[P_DATA]	    = receive_Data,
3679 	[P_DATA_REPLY]	    = receive_DataReply,
3680 	[P_RS_DATA_REPLY]   = receive_RSDataReply,
3681 	[P_BARRIER]	    = receive_Barrier,
3682 	[P_BITMAP]	    = receive_bitmap,
3683 	[P_COMPRESSED_BITMAP]    = receive_bitmap,
3684 	[P_UNPLUG_REMOTE]   = receive_UnplugRemote,
3685 	[P_DATA_REQUEST]    = receive_DataRequest,
3686 	[P_RS_DATA_REQUEST] = receive_DataRequest,
3687 	[P_SYNC_PARAM]	    = receive_SyncParam,
3688 	[P_SYNC_PARAM89]	   = receive_SyncParam,
3689 	[P_PROTOCOL]        = receive_protocol,
3690 	[P_UUIDS]	    = receive_uuids,
3691 	[P_SIZES]	    = receive_sizes,
3692 	[P_STATE]	    = receive_state,
3693 	[P_STATE_CHG_REQ]   = receive_req_state,
3694 	[P_SYNC_UUID]       = receive_sync_uuid,
3695 	[P_OV_REQUEST]      = receive_DataRequest,
3696 	[P_OV_REPLY]        = receive_DataRequest,
3697 	[P_CSUM_RS_REQUEST]    = receive_DataRequest,
3698 	[P_DELAY_PROBE]     = receive_delay_probe,
3699 	/* anything missing from this table is in
3700 	 * the asender_tbl, see get_asender_cmd */
3701 	[P_MAX_CMD]	    = NULL,
3702 };
3703 
3704 static drbd_cmd_handler_f *drbd_cmd_handler = drbd_default_handler;
3705 static drbd_cmd_handler_f *drbd_opt_cmd_handler;
3706 
3707 static void drbdd(struct drbd_conf *mdev)
3708 {
3709 	drbd_cmd_handler_f handler;
3710 	struct p_header *header = &mdev->data.rbuf.header;
3711 
3712 	while (get_t_state(&mdev->receiver) == Running) {
3713 		drbd_thread_current_set_cpu(mdev);
3714 		if (!drbd_recv_header(mdev, header)) {
3715 			drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3716 			break;
3717 		}
3718 
3719 		if (header->command < P_MAX_CMD)
3720 			handler = drbd_cmd_handler[header->command];
3721 		else if (P_MAY_IGNORE < header->command
3722 		     && header->command < P_MAX_OPT_CMD)
3723 			handler = drbd_opt_cmd_handler[header->command-P_MAY_IGNORE];
3724 		else if (header->command > P_MAX_OPT_CMD)
3725 			handler = receive_skip;
3726 		else
3727 			handler = NULL;
3728 
3729 		if (unlikely(!handler)) {
3730 			dev_err(DEV, "unknown packet type %d, l: %d!\n",
3731 			    header->command, header->length);
3732 			drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3733 			break;
3734 		}
3735 		if (unlikely(!handler(mdev, header))) {
3736 			dev_err(DEV, "error receiving %s, l: %d!\n",
3737 			    cmdname(header->command), header->length);
3738 			drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3739 			break;
3740 		}
3741 	}
3742 }
3743 
3744 static void drbd_fail_pending_reads(struct drbd_conf *mdev)
3745 {
3746 	struct hlist_head *slot;
3747 	struct hlist_node *pos;
3748 	struct hlist_node *tmp;
3749 	struct drbd_request *req;
3750 	int i;
3751 
3752 	/*
3753 	 * Application READ requests
3754 	 */
3755 	spin_lock_irq(&mdev->req_lock);
3756 	for (i = 0; i < APP_R_HSIZE; i++) {
3757 		slot = mdev->app_reads_hash+i;
3758 		hlist_for_each_entry_safe(req, pos, tmp, slot, colision) {
3759 			/* it may (but should not any longer!)
3760 			 * be on the work queue; if that assert triggers,
3761 			 * we need to also grab the
3762 			 * spin_lock_irq(&mdev->data.work.q_lock);
3763 			 * and list_del_init here. */
3764 			D_ASSERT(list_empty(&req->w.list));
3765 			/* It would be nice to complete outside of spinlock.
3766 			 * But this is easier for now. */
3767 			_req_mod(req, connection_lost_while_pending);
3768 		}
3769 	}
3770 	for (i = 0; i < APP_R_HSIZE; i++)
3771 		if (!hlist_empty(mdev->app_reads_hash+i))
3772 			dev_warn(DEV, "ASSERT FAILED: app_reads_hash[%d].first: "
3773 				"%p, should be NULL\n", i, mdev->app_reads_hash[i].first);
3774 
3775 	memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
3776 	spin_unlock_irq(&mdev->req_lock);
3777 }
3778 
3779 void drbd_flush_workqueue(struct drbd_conf *mdev)
3780 {
3781 	struct drbd_wq_barrier barr;
3782 
3783 	barr.w.cb = w_prev_work_done;
3784 	init_completion(&barr.done);
3785 	drbd_queue_work(&mdev->data.work, &barr.w);
3786 	wait_for_completion(&barr.done);
3787 }
3788 
3789 static void drbd_disconnect(struct drbd_conf *mdev)
3790 {
3791 	enum drbd_fencing_p fp;
3792 	union drbd_state os, ns;
3793 	int rv = SS_UNKNOWN_ERROR;
3794 	unsigned int i;
3795 
3796 	if (mdev->state.conn == C_STANDALONE)
3797 		return;
3798 	if (mdev->state.conn >= C_WF_CONNECTION)
3799 		dev_err(DEV, "ASSERT FAILED cstate = %s, expected < WFConnection\n",
3800 				drbd_conn_str(mdev->state.conn));
3801 
3802 	/* asender does not clean up anything. it must not interfere, either */
3803 	drbd_thread_stop(&mdev->asender);
3804 	drbd_free_sock(mdev);
3805 
3806 	spin_lock_irq(&mdev->req_lock);
3807 	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3808 	_drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3809 	_drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3810 	spin_unlock_irq(&mdev->req_lock);
3811 
3812 	/* We do not have data structures that would allow us to
3813 	 * get the rs_pending_cnt down to 0 again.
3814 	 *  * On C_SYNC_TARGET we do not have any data structures describing
3815 	 *    the pending RSDataRequest's we have sent.
3816 	 *  * On C_SYNC_SOURCE there is no data structure that tracks
3817 	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3818 	 *  And no, it is not the sum of the reference counts in the
3819 	 *  resync_LRU. The resync_LRU tracks the whole operation including
3820 	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
3821 	 *  on the fly. */
3822 	drbd_rs_cancel_all(mdev);
3823 	mdev->rs_total = 0;
3824 	mdev->rs_failed = 0;
3825 	atomic_set(&mdev->rs_pending_cnt, 0);
3826 	wake_up(&mdev->misc_wait);
3827 
3828 	/* make sure syncer is stopped and w_resume_next_sg queued */
3829 	del_timer_sync(&mdev->resync_timer);
3830 	set_bit(STOP_SYNC_TIMER, &mdev->flags);
3831 	resync_timer_fn((unsigned long)mdev);
3832 
3833 	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3834 	 * w_make_resync_request etc. which may still be on the worker queue
3835 	 * to be "canceled" */
3836 	drbd_flush_workqueue(mdev);
3837 
3838 	/* This also does reclaim_net_ee().  If we do this too early, we might
3839 	 * miss some resync ee and pages. */
3840 	drbd_process_done_ee(mdev);
3841 
3842 	kfree(mdev->p_uuid);
3843 	mdev->p_uuid = NULL;
3844 
3845 	if (!mdev->state.susp)
3846 		tl_clear(mdev);
3847 
3848 	drbd_fail_pending_reads(mdev);
3849 
3850 	dev_info(DEV, "Connection closed\n");
3851 
3852 	drbd_md_sync(mdev);
3853 
3854 	fp = FP_DONT_CARE;
3855 	if (get_ldev(mdev)) {
3856 		fp = mdev->ldev->dc.fencing;
3857 		put_ldev(mdev);
3858 	}
3859 
3860 	if (mdev->state.role == R_PRIMARY) {
3861 		if (fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN) {
3862 			enum drbd_disk_state nps = drbd_try_outdate_peer(mdev);
3863 			drbd_request_state(mdev, NS(pdsk, nps));
3864 		}
3865 	}
3866 
3867 	spin_lock_irq(&mdev->req_lock);
3868 	os = mdev->state;
3869 	if (os.conn >= C_UNCONNECTED) {
3870 		/* Do not restart in case we are C_DISCONNECTING */
3871 		ns = os;
3872 		ns.conn = C_UNCONNECTED;
3873 		rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3874 	}
3875 	spin_unlock_irq(&mdev->req_lock);
3876 
3877 	if (os.conn == C_DISCONNECTING) {
3878 		struct hlist_head *h;
3879 		wait_event(mdev->misc_wait, atomic_read(&mdev->net_cnt) == 0);
3880 
3881 		/* we must not free the tl_hash
3882 		 * while application io is still on the fly */
3883 		wait_event(mdev->misc_wait, atomic_read(&mdev->ap_bio_cnt) == 0);
3884 
3885 		spin_lock_irq(&mdev->req_lock);
3886 		/* paranoia code */
3887 		for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3888 			if (h->first)
3889 				dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3890 						(int)(h - mdev->ee_hash), h->first);
3891 		kfree(mdev->ee_hash);
3892 		mdev->ee_hash = NULL;
3893 		mdev->ee_hash_s = 0;
3894 
3895 		/* paranoia code */
3896 		for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3897 			if (h->first)
3898 				dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
3899 						(int)(h - mdev->tl_hash), h->first);
3900 		kfree(mdev->tl_hash);
3901 		mdev->tl_hash = NULL;
3902 		mdev->tl_hash_s = 0;
3903 		spin_unlock_irq(&mdev->req_lock);
3904 
3905 		crypto_free_hash(mdev->cram_hmac_tfm);
3906 		mdev->cram_hmac_tfm = NULL;
3907 
3908 		kfree(mdev->net_conf);
3909 		mdev->net_conf = NULL;
3910 		drbd_request_state(mdev, NS(conn, C_STANDALONE));
3911 	}
3912 
3913 	/* tcp_close and release of sendpage pages can be deferred.  I don't
3914 	 * want to use SO_LINGER, because apparently it can be deferred for
3915 	 * more than 20 seconds (longest time I checked).
3916 	 *
3917 	 * Actually we don't care exactly when the network stack does its
3918 	 * put_page(); we just release our reference on these pages right here.
3919 	 */
3920 	i = drbd_release_ee(mdev, &mdev->net_ee);
3921 	if (i)
3922 		dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3923 	i = atomic_read(&mdev->pp_in_use);
3924 	if (i)
3925 		dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
3926 
3927 	D_ASSERT(list_empty(&mdev->read_ee));
3928 	D_ASSERT(list_empty(&mdev->active_ee));
3929 	D_ASSERT(list_empty(&mdev->sync_ee));
3930 	D_ASSERT(list_empty(&mdev->done_ee));
3931 
3932 	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3933 	atomic_set(&mdev->current_epoch->epoch_size, 0);
3934 	D_ASSERT(list_empty(&mdev->current_epoch->list));
3935 }
3936 
3937 /*
3938  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3939  * we can agree on is stored in agreed_pro_version.
3940  *
3941  * feature flags and the reserved array should be enough room for future
3942  * enhancements of the handshake protocol, and possible plugins...
3943  *
3944  * for now, they are expected to be zero, but ignored.
3945  */
3946 static int drbd_send_handshake(struct drbd_conf *mdev)
3947 {
3948 	/* ASSERT current == mdev->receiver ... */
3949 	struct p_handshake *p = &mdev->data.sbuf.handshake;
3950 	int ok;
3951 
3952 	if (mutex_lock_interruptible(&mdev->data.mutex)) {
3953 		dev_err(DEV, "interrupted during initial handshake\n");
3954 		return 0; /* interrupted. not ok. */
3955 	}
3956 
3957 	if (mdev->data.socket == NULL) {
3958 		mutex_unlock(&mdev->data.mutex);
3959 		return 0;
3960 	}
3961 
3962 	memset(p, 0, sizeof(*p));
3963 	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3964 	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3965 	ok = _drbd_send_cmd( mdev, mdev->data.socket, P_HAND_SHAKE,
3966 			     (struct p_header *)p, sizeof(*p), 0 );
3967 	mutex_unlock(&mdev->data.mutex);
3968 	return ok;
3969 }
3970 
3971 /*
3972  * return values:
3973  *   1 yes, we have a valid connection
3974  *   0 oops, did not work out, please try again
3975  *  -1 peer talks different language,
3976  *     no point in trying again, please go standalone.
3977  */
3978 static int drbd_do_handshake(struct drbd_conf *mdev)
3979 {
3980 	/* ASSERT current == mdev->receiver ... */
3981 	struct p_handshake *p = &mdev->data.rbuf.handshake;
3982 	const int expect = sizeof(struct p_handshake)
3983 			  -sizeof(struct p_header);
3984 	int rv;
3985 
3986 	rv = drbd_send_handshake(mdev);
3987 	if (!rv)
3988 		return 0;
3989 
3990 	rv = drbd_recv_header(mdev, &p->head);
3991 	if (!rv)
3992 		return 0;
3993 
3994 	if (p->head.command != P_HAND_SHAKE) {
3995 		dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
3996 		     cmdname(p->head.command), p->head.command);
3997 		return -1;
3998 	}
3999 
4000 	if (p->head.length != expect) {
4001 		dev_err(DEV, "expected HandShake length: %u, received: %u\n",
4002 		     expect, p->head.length);
4003 		return -1;
4004 	}
4005 
4006 	rv = drbd_recv(mdev, &p->head.payload, expect);
4007 
4008 	if (rv != expect) {
4009 		dev_err(DEV, "short read receiving handshake packet: l=%u\n", rv);
4010 		return 0;
4011 	}
4012 
4013 	p->protocol_min = be32_to_cpu(p->protocol_min);
4014 	p->protocol_max = be32_to_cpu(p->protocol_max);
4015 	if (p->protocol_max == 0)
4016 		p->protocol_max = p->protocol_min;
4017 
4018 	if (PRO_VERSION_MAX < p->protocol_min ||
4019 	    PRO_VERSION_MIN > p->protocol_max)
4020 		goto incompat;
4021 
4022 	mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4023 
4024 	dev_info(DEV, "Handshake successful: "
4025 	     "Agreed network protocol version %d\n", mdev->agreed_pro_version);
4026 
4027 	return 1;
4028 
4029  incompat:
4030 	dev_err(DEV, "incompatible DRBD dialects: "
4031 	    "I support %d-%d, peer supports %d-%d\n",
4032 	    PRO_VERSION_MIN, PRO_VERSION_MAX,
4033 	    p->protocol_min, p->protocol_max);
4034 	return -1;
4035 }
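
/*
 * Illustrative sketch, not part of the driver: the version negotiation
 * performed by drbd_do_handshake() above.  Each side advertises a
 * [min, max] range; a connection is possible only if the ranges overlap,
 * and the agreed version is the highest one both sides support.
 */
static int __maybe_unused negotiate_pro_version(int my_min, int my_max,
						int peer_min, int peer_max)
{
	if (my_max < peer_min || my_min > peer_max)
		return -1;			/* incompatible dialects */
	return min_t(int, my_max, peer_max);	/* highest common version */
}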
4036 
4037 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4038 static int drbd_do_auth(struct drbd_conf *mdev)
4039 {
4040 	dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4041 	dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4042 	return -1;
4043 }
4044 #else
4045 #define CHALLENGE_LEN 64
4046 
4047 /* Return value:
4048 	1 - auth succeeded,
4049 	0 - failed, try again (network error),
4050 	-1 - auth failed, don't try again.
4051 */
4052 
4053 static int drbd_do_auth(struct drbd_conf *mdev)
4054 {
4055 	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4056 	struct scatterlist sg;
4057 	char *response = NULL;
4058 	char *right_response = NULL;
4059 	char *peers_ch = NULL;
4060 	struct p_header p;
4061 	unsigned int key_len = strlen(mdev->net_conf->shared_secret);
4062 	unsigned int resp_size;
4063 	struct hash_desc desc;
4064 	int rv;
4065 
4066 	desc.tfm = mdev->cram_hmac_tfm;
4067 	desc.flags = 0;
4068 
4069 	rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
4070 				(u8 *)mdev->net_conf->shared_secret, key_len);
4071 	if (rv) {
4072 		dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
4073 		rv = -1;
4074 		goto fail;
4075 	}
4076 
4077 	get_random_bytes(my_challenge, CHALLENGE_LEN);
4078 
4079 	rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
4080 	if (!rv)
4081 		goto fail;
4082 
4083 	rv = drbd_recv_header(mdev, &p);
4084 	if (!rv)
4085 		goto fail;
4086 
4087 	if (p.command != P_AUTH_CHALLENGE) {
4088 		dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4089 		    cmdname(p.command), p.command);
4090 		rv = 0;
4091 		goto fail;
4092 	}
4093 
4094 	if (p.length > CHALLENGE_LEN*2) {
4095 		dev_err(DEV, "AuthChallenge payload too big.\n");
4096 		rv = -1;
4097 		goto fail;
4098 	}
4099 
4100 	peers_ch = kmalloc(p.length, GFP_NOIO);
4101 	if (peers_ch == NULL) {
4102 		dev_err(DEV, "kmalloc of peers_ch failed\n");
4103 		rv = -1;
4104 		goto fail;
4105 	}
4106 
4107 	rv = drbd_recv(mdev, peers_ch, p.length);
4108 
4109 	if (rv != p.length) {
4110 		dev_err(DEV, "short read AuthChallenge: l=%u\n", rv);
4111 		rv = 0;
4112 		goto fail;
4113 	}
4114 
4115 	resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4116 	response = kmalloc(resp_size, GFP_NOIO);
4117 	if (response == NULL) {
4118 		dev_err(DEV, "kmalloc of response failed\n");
4119 		rv = -1;
4120 		goto fail;
4121 	}
4122 
4123 	sg_init_table(&sg, 1);
4124 	sg_set_buf(&sg, peers_ch, p.length);
4125 
4126 	rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4127 	if (rv) {
4128 		dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4129 		rv = -1;
4130 		goto fail;
4131 	}
4132 
4133 	rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4134 	if (!rv)
4135 		goto fail;
4136 
4137 	rv = drbd_recv_header(mdev, &p);
4138 	if (!rv)
4139 		goto fail;
4140 
4141 	if (p.command != P_AUTH_RESPONSE) {
4142 		dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
4143 		    cmdname(p.command), p.command);
4144 		rv = 0;
4145 		goto fail;
4146 	}
4147 
4148 	if (p.length != resp_size) {
4149 		dev_err(DEV, "AuthResponse payload has wrong size\n");
4150 		rv = 0;
4151 		goto fail;
4152 	}
4153 
4154 	rv = drbd_recv(mdev, response, resp_size);
4155 
4156 	if (rv != resp_size) {
4157 		dev_err(DEV, "short read receiving AuthResponse: l=%u\n", rv);
4158 		rv = 0;
4159 		goto fail;
4160 	}
4161 
4162 	right_response = kmalloc(resp_size, GFP_NOIO);
4163 	if (right_response == NULL) {
4164 		dev_err(DEV, "kmalloc of right_response failed\n");
4165 		rv = -1;
4166 		goto fail;
4167 	}
4168 
4169 	sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4170 
4171 	rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4172 	if (rv) {
4173 		dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4174 		rv = -1;
4175 		goto fail;
4176 	}
4177 
4178 	rv = !memcmp(response, right_response, resp_size);
4179 
4180 	if (rv)
4181 		dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4182 		     resp_size, mdev->net_conf->cram_hmac_alg);
4183 	else
4184 		rv = -1;
4185 
4186  fail:
4187 	kfree(peers_ch);
4188 	kfree(response);
4189 	kfree(right_response);
4190 
4191 	return rv;
4192 }
4193 #endif
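
/*
 * Illustrative sketch, not part of the driver: the challenge/response
 * scheme of drbd_do_auth() in miniature.  toy_mac() is a purely
 * hypothetical stand-in for the HMAC computed via crypto_hash_digest()
 * above; the point is only that each side answers the *other* side's
 * challenge and verifies the peer's answer against its *own* challenge.
 */
static u32 __maybe_unused toy_mac(u32 secret, u32 challenge)
{
	return secret ^ challenge;	/* placeholder, not cryptographic */
}

static int __maybe_unused toy_auth_check(u32 secret, u32 my_challenge,
					 u32 peers_response)
{
	/* the peer is authenticated iff its response matches our own
	 * computation over the challenge we sent out */
	return peers_response == toy_mac(secret, my_challenge);
}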
4194 
4195 int drbdd_init(struct drbd_thread *thi)
4196 {
4197 	struct drbd_conf *mdev = thi->mdev;
4198 	unsigned int minor = mdev_to_minor(mdev);
4199 	int h;
4200 
4201 	sprintf(current->comm, "drbd%d_receiver", minor);
4202 
4203 	dev_info(DEV, "receiver (re)started\n");
4204 
4205 	do {
4206 		h = drbd_connect(mdev);
4207 		if (h == 0) {
4208 			drbd_disconnect(mdev);
4209 			__set_current_state(TASK_INTERRUPTIBLE);
4210 			schedule_timeout(HZ);
4211 		}
4212 		if (h == -1) {
4213 			dev_warn(DEV, "Discarding network configuration.\n");
4214 			drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4215 		}
4216 	} while (h == 0);
4217 
4218 	if (h > 0) {
4219 		if (get_net_conf(mdev)) {
4220 			drbdd(mdev);
4221 			put_net_conf(mdev);
4222 		}
4223 	}
4224 
4225 	drbd_disconnect(mdev);
4226 
4227 	dev_info(DEV, "receiver terminated\n");
4228 	return 0;
4229 }
4230 
4231 /* ********* acknowledge sender ******** */
4232 
4233 static int got_RqSReply(struct drbd_conf *mdev, struct p_header *h)
4234 {
4235 	struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4236 
4237 	int retcode = be32_to_cpu(p->retcode);
4238 
4239 	if (retcode >= SS_SUCCESS) {
4240 		set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4241 	} else {
4242 		set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4243 		dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4244 		    drbd_set_st_err_str(retcode), retcode);
4245 	}
4246 	wake_up(&mdev->state_wait);
4247 
4248 	return TRUE;
4249 }
4250 
4251 static int got_Ping(struct drbd_conf *mdev, struct p_header *h)
4252 {
4253 	return drbd_send_ping_ack(mdev);
4255 }
4256 
4257 static int got_PingAck(struct drbd_conf *mdev, struct p_header *h)
4258 {
4259 	/* restore idle timeout */
4260 	mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4261 	if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4262 		wake_up(&mdev->misc_wait);
4263 
4264 	return TRUE;
4265 }
4266 
4267 static int got_IsInSync(struct drbd_conf *mdev, struct p_header *h)
4268 {
4269 	struct p_block_ack *p = (struct p_block_ack *)h;
4270 	sector_t sector = be64_to_cpu(p->sector);
4271 	int blksize = be32_to_cpu(p->blksize);
4272 
4273 	D_ASSERT(mdev->agreed_pro_version >= 89);
4274 
4275 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4276 
4277 	drbd_rs_complete_io(mdev, sector);
4278 	drbd_set_in_sync(mdev, sector, blksize);
4279 	/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4280 	mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4281 	dec_rs_pending(mdev);
4282 
4283 	return TRUE;
4284 }
4285 
4286 /* when we receive the ACK for a write request,
4287  * verify that we actually know about it */
4288 static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4289 	u64 id, sector_t sector)
4290 {
4291 	struct hlist_head *slot = tl_hash_slot(mdev, sector);
4292 	struct hlist_node *n;
4293 	struct drbd_request *req;
4294 
4295 	hlist_for_each_entry(req, n, slot, colision) {
4296 		if ((unsigned long)req == (unsigned long)id) {
4297 			if (req->sector != sector) {
4298 				dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4299 				    "wrong sector (%llus versus %llus)\n", req,
4300 				    (unsigned long long)req->sector,
4301 				    (unsigned long long)sector);
4302 				break;
4303 			}
4304 			return req;
4305 		}
4306 	}
4307 	dev_err(DEV, "_ack_id_to_req: failed to find req %p, sector %llus in list\n",
4308 		(void *)(unsigned long)id, (unsigned long long)sector);
4309 	return NULL;
4310 }
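
/*
 * Illustrative sketch, not part of the driver: the block_id convention
 * that _ack_id_to_req() above relies on.  For application writes, the
 * block_id put on the wire is simply the originating struct drbd_request
 * pointer cast to u64, so an ACK identifies the request directly; the
 * sector is cross checked on top of that.
 */
static int __maybe_unused block_id_matches(struct drbd_request *req, u64 id)
{
	return (unsigned long)req == (unsigned long)id;
}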
4311 
4312 typedef struct drbd_request *(req_validator_fn)
4313 	(struct drbd_conf *mdev, u64 id, sector_t sector);
4314 
4315 static int validate_req_change_req_state(struct drbd_conf *mdev,
4316 	u64 id, sector_t sector, req_validator_fn validator,
4317 	const char *func, enum drbd_req_event what)
4318 {
4319 	struct drbd_request *req;
4320 	struct bio_and_error m;
4321 
4322 	spin_lock_irq(&mdev->req_lock);
4323 	req = validator(mdev, id, sector);
4324 	if (unlikely(!req)) {
4325 		spin_unlock_irq(&mdev->req_lock);
4326 		dev_err(DEV, "%s: got a corrupt block_id/sector pair\n", func);
4327 		return FALSE;
4328 	}
4329 	__req_mod(req, what, &m);
4330 	spin_unlock_irq(&mdev->req_lock);
4331 
4332 	if (m.bio)
4333 		complete_master_bio(mdev, &m);
4334 	return TRUE;
4335 }
4336 
4337 static int got_BlockAck(struct drbd_conf *mdev, struct p_header *h)
4338 {
4339 	struct p_block_ack *p = (struct p_block_ack *)h;
4340 	sector_t sector = be64_to_cpu(p->sector);
4341 	int blksize = be32_to_cpu(p->blksize);
4342 	enum drbd_req_event what;
4343 
4344 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4345 
4346 	if (is_syncer_block_id(p->block_id)) {
4347 		drbd_set_in_sync(mdev, sector, blksize);
4348 		dec_rs_pending(mdev);
4349 		return TRUE;
4350 	}
4351 	switch (be16_to_cpu(h->command)) {
4352 	case P_RS_WRITE_ACK:
4353 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4354 		what = write_acked_by_peer_and_sis;
4355 		break;
4356 	case P_WRITE_ACK:
4357 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4358 		what = write_acked_by_peer;
4359 		break;
4360 	case P_RECV_ACK:
4361 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4362 		what = recv_acked_by_peer;
4363 		break;
4364 	case P_DISCARD_ACK:
4365 		D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4366 		what = conflict_discarded_by_peer;
4367 		break;
4368 	default:
4369 		D_ASSERT(0);
4370 		return FALSE;
4371 	}
4372 
4373 	return validate_req_change_req_state(mdev, p->block_id, sector,
4374 		_ack_id_to_req, __func__ , what);
4375 }
4376 
4377 static int got_NegAck(struct drbd_conf *mdev, struct p_header *h)
4378 {
4379 	struct p_block_ack *p = (struct p_block_ack *)h;
4380 	sector_t sector = be64_to_cpu(p->sector);
4381 
4382 	if (__ratelimit(&drbd_ratelimit_state))
4383 		dev_warn(DEV, "Got NegAck packet. Peer is in trouble?\n");
4384 
4385 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4386 
4387 	if (is_syncer_block_id(p->block_id)) {
4388 		int size = be32_to_cpu(p->blksize);
4389 		dec_rs_pending(mdev);
4390 		drbd_rs_failed_io(mdev, sector, size);
4391 		return TRUE;
4392 	}
4393 	return validate_req_change_req_state(mdev, p->block_id, sector,
4394 		_ack_id_to_req, __func__ , neg_acked);
4395 }
4396 
4397 static int got_NegDReply(struct drbd_conf *mdev, struct p_header *h)
4398 {
4399 	struct p_block_ack *p = (struct p_block_ack *)h;
4400 	sector_t sector = be64_to_cpu(p->sector);
4401 
4402 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4403 	dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4404 	    (unsigned long long)sector, be32_to_cpu(p->blksize));
4405 
4406 	return validate_req_change_req_state(mdev, p->block_id, sector,
4407 		_ar_id_to_req, __func__ , neg_acked);
4408 }
4409 
4410 static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header *h)
4411 {
4412 	sector_t sector;
4413 	int size;
4414 	struct p_block_ack *p = (struct p_block_ack *)h;
4415 
4416 	sector = be64_to_cpu(p->sector);
4417 	size = be32_to_cpu(p->blksize);
4418 
4419 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4420 
4421 	dec_rs_pending(mdev);
4422 
4423 	if (get_ldev_if_state(mdev, D_FAILED)) {
4424 		drbd_rs_complete_io(mdev, sector);
4425 		drbd_rs_failed_io(mdev, sector, size);
4426 		put_ldev(mdev);
4427 	}
4428 
4429 	return TRUE;
4430 }
4431 
4432 static int got_BarrierAck(struct drbd_conf *mdev, struct p_header *h)
4433 {
4434 	struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4435 
4436 	tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4437 
4438 	return TRUE;
4439 }
4440 
4441 static int got_OVResult(struct drbd_conf *mdev, struct p_header *h)
4442 {
4443 	struct p_block_ack *p = (struct p_block_ack *)h;
4444 	struct drbd_work *w;
4445 	sector_t sector;
4446 	int size;
4447 
4448 	sector = be64_to_cpu(p->sector);
4449 	size = be32_to_cpu(p->blksize);
4450 
4451 	update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4452 
4453 	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4454 		drbd_ov_oos_found(mdev, sector, size);
4455 	else
4456 		ov_oos_print(mdev);
4457 
4458 	drbd_rs_complete_io(mdev, sector);
4459 	dec_rs_pending(mdev);
4460 
4461 	if (--mdev->ov_left == 0) {
4462 		w = kmalloc(sizeof(*w), GFP_NOIO);
4463 		if (w) {
4464 			w->cb = w_ov_finished;
4465 			drbd_queue_work_front(&mdev->data.work, w);
4466 		} else {
4467 			dev_err(DEV, "kmalloc(w) failed.\n");
4468 			ov_oos_print(mdev);
4469 			drbd_resync_finished(mdev);
4470 		}
4471 	}
4472 	return TRUE;
4473 }
4474 
4475 static int got_delay_probe_m(struct drbd_conf *mdev, struct p_header *h)
4476 {
4477 	struct p_delay_probe *p = (struct p_delay_probe *)h;
4478 
4479 	got_delay_probe(mdev, USE_META_SOCKET, p);
4480 	return TRUE;
4481 }
4482 
4483 struct asender_cmd {
4484 	size_t pkt_size;
4485 	int (*process)(struct drbd_conf *mdev, struct p_header *h);
4486 };
4487 
4488 static struct asender_cmd *get_asender_cmd(int cmd)
4489 {
4490 	static struct asender_cmd asender_tbl[] = {
4491 		/* anything missing from this table is in
4492 		 * the drbd_cmd_handler (drbd_default_handler) table,
4493 		 * see the beginning of drbdd() */
4494 	[P_PING]	    = { sizeof(struct p_header), got_Ping },
4495 	[P_PING_ACK]	    = { sizeof(struct p_header), got_PingAck },
4496 	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4497 	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4498 	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4499 	[P_DISCARD_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
4500 	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
4501 	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
4502 	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply},
4503 	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
4504 	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
4505 	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4506 	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4507 	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe), got_delay_probe_m },
4508 	[P_MAX_CMD]	    = { 0, NULL },
4509 	};
4510 	if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4511 		return NULL;
4512 	return &asender_tbl[cmd];
4513 }
4514 
4515 int drbd_asender(struct drbd_thread *thi)
4516 {
4517 	struct drbd_conf *mdev = thi->mdev;
4518 	struct p_header *h = &mdev->meta.rbuf.header;
4519 	struct asender_cmd *cmd = NULL;
4520 
4521 	int rv, len;
4522 	void *buf    = h;
4523 	int received = 0;
4524 	int expect   = sizeof(struct p_header);
4525 	int empty;
4526 
4527 	sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4528 
4529 	current->policy = SCHED_RR;  /* Make this a realtime task! */
4530 	current->rt_priority = 2;    /* more important than all other tasks */
4531 
4532 	while (get_t_state(thi) == Running) {
4533 		drbd_thread_current_set_cpu(mdev);
4534 		if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4535 			ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4536 			mdev->meta.socket->sk->sk_rcvtimeo =
4537 				mdev->net_conf->ping_timeo*HZ/10;
4538 		}
4539 
4540 		/* conditionally cork;
4541 		 * it may hurt latency if we cork without much to send */
4542 		if (!mdev->net_conf->no_cork &&
4543 			3 < atomic_read(&mdev->unacked_cnt))
4544 			drbd_tcp_cork(mdev->meta.socket);
4545 		while (1) {
4546 			clear_bit(SIGNAL_ASENDER, &mdev->flags);
4547 			flush_signals(current);
4548 			if (!drbd_process_done_ee(mdev)) {
4549 				dev_err(DEV, "process_done_ee() = NOT_OK\n");
4550 				goto reconnect;
4551 			}
4552 			/* to avoid race with newly queued ACKs */
4553 			set_bit(SIGNAL_ASENDER, &mdev->flags);
4554 			spin_lock_irq(&mdev->req_lock);
4555 			empty = list_empty(&mdev->done_ee);
4556 			spin_unlock_irq(&mdev->req_lock);
4557 			/* new ack may have been queued right here,
4558 			 * but then there is also a signal pending,
4559 			 * and we start over... */
4560 			if (empty)
4561 				break;
4562 		}
4563 		/* but unconditionally uncork unless disabled */
4564 		if (!mdev->net_conf->no_cork)
4565 			drbd_tcp_uncork(mdev->meta.socket);
4566 
4567 		/* short circuit, recv_msg would return EINTR anyways. */
4568 		if (signal_pending(current))
4569 			continue;
4570 
4571 		rv = drbd_recv_short(mdev, mdev->meta.socket,
4572 				     buf, expect-received, 0);
4573 		clear_bit(SIGNAL_ASENDER, &mdev->flags);
4574 
4575 		flush_signals(current);
4576 
4577 		/* Note:
4578 		 * -EINTR	 (on meta) we got a signal
4579 		 * -EAGAIN	 (on meta) rcvtimeo expired
4580 		 * -ECONNRESET	 other side closed the connection
4581 		 * -ERESTARTSYS  (on data) we got a signal
4582 		 * rv <  0	 other than above: unexpected error!
4583 		 * rv == expected: full header or command
4584 		 * rv <  expected: "woken" by signal during receive
4585 		 * rv == 0	 : "connection shut down by peer"
4586 		 */
4587 		if (likely(rv > 0)) {
4588 			received += rv;
4589 			buf	 += rv;
4590 		} else if (rv == 0) {
4591 			dev_err(DEV, "meta connection shut down by peer.\n");
4592 			goto reconnect;
4593 		} else if (rv == -EAGAIN) {
4594 			if (mdev->meta.socket->sk->sk_rcvtimeo ==
4595 			    mdev->net_conf->ping_timeo*HZ/10) {
4596 				dev_err(DEV, "PingAck did not arrive in time.\n");
4597 				goto reconnect;
4598 			}
4599 			set_bit(SEND_PING, &mdev->flags);
4600 			continue;
4601 		} else if (rv == -EINTR) {
4602 			continue;
4603 		} else {
4604 			dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4605 			goto reconnect;
4606 		}
4607 
4608 		if (received == expect && cmd == NULL) {
4609 			if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4610 				dev_err(DEV, "magic?? on meta m: 0x%lx c: %d l: %d\n",
4611 				    (long)be32_to_cpu(h->magic),
4612 				    h->command, h->length);
4613 				goto reconnect;
4614 			}
4615 			cmd = get_asender_cmd(be16_to_cpu(h->command));
4616 			len = be16_to_cpu(h->length);
4617 			if (unlikely(cmd == NULL)) {
4618 				dev_err(DEV, "unknown command?? on meta m: 0x%lx c: %d l: %d\n",
4619 				    (long)be32_to_cpu(h->magic),
4620 				    h->command, h->length);
4621 				goto disconnect;
4622 			}
4623 			expect = cmd->pkt_size;
4624 			ERR_IF(len != expect-sizeof(struct p_header))
4625 				goto reconnect;
4626 		}
4627 		if (received == expect) {
4628 			D_ASSERT(cmd != NULL);
4629 			if (!cmd->process(mdev, h))
4630 				goto reconnect;
4631 
4632 			buf	 = h;
4633 			received = 0;
4634 			expect	 = sizeof(struct p_header);
4635 			cmd	 = NULL;
4636 		}
4637 	}
4638 
4639 	if (0) {
4640 reconnect:
4641 		drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4642 	}
4643 	if (0) {
4644 disconnect:
4645 		drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4646 	}
4647 	clear_bit(SIGNAL_ASENDER, &mdev->flags);
4648 
4649 	D_ASSERT(mdev->state.conn < C_CONNECTED);
4650 	dev_info(DEV, "asender terminated\n");
4651 
4652 	return 0;
4653 }
4654